AnonSec Shell
Server IP : 92.204.138.22  /  Your IP : 3.16.47.65
Web Server : Apache
System : Linux ns1009439.ip-92-204-138.us 4.18.0-553.8.1.el8_10.x86_64 #1 SMP Tue Jul 2 07:26:33 EDT 2024 x86_64
User : internationaljou ( 1019)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /var/opt/nydus/ops/google/protobuf/internal/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME ]     

Current File : /var/opt/nydus/ops/google/protobuf/internal/unknown_fields_test.py
# -*- coding: utf-8 -*-
# Protocol Buffers - Google's data interchange format
# Copyright 2008 Google Inc.  All rights reserved.
# https://developers.google.com/protocol-buffers/
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

"""Test for preservation of unknown fields in the pure Python implementation."""

__author__ = 'bohdank@google.com (Bohdan Koval)'

import sys
import unittest

from google.protobuf import map_unittest_pb2
from google.protobuf import unittest_mset_pb2
from google.protobuf import unittest_pb2
from google.protobuf import unittest_proto3_arena_pb2
from google.protobuf.internal import api_implementation
from google.protobuf.internal import encoder
from google.protobuf.internal import message_set_extensions_pb2
from google.protobuf.internal import missing_enum_values_pb2
from google.protobuf.internal import test_util
from google.protobuf.internal import testing_refleaks
from google.protobuf.internal import type_checkers
from google.protobuf.internal import wire_format
from google.protobuf import descriptor
from google.protobuf import unknown_fields
try:
  import tracemalloc  # pylint: disable=g-import-not-at-top
except ImportError:
  # Requires python 3.4+
  pass


@testing_refleaks.TestCase
class UnknownFieldsTest(unittest.TestCase):

  def setUp(self):
    self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    self.all_fields = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(self.all_fields)
    self.all_fields_data = self.all_fields.SerializeToString()
    self.empty_message = unittest_pb2.TestEmptyMessage()
    self.empty_message.ParseFromString(self.all_fields_data)

  def testSerialize(self):
    data = self.empty_message.SerializeToString()

    # Don't use assertEqual because we don't want to dump raw binary data to
    # stdout.
    self.assertTrue(data == self.all_fields_data)

  def testSerializeProto3(self):
    # Verify proto3 unknown fields behavior.
    message = unittest_proto3_arena_pb2.TestEmptyMessage()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(self.all_fields_data, message.SerializeToString())

  def testByteSize(self):
    self.assertEqual(self.all_fields.ByteSize(), self.empty_message.ByteSize())

  def testListFields(self):
    # Make sure ListFields doesn't return unknown fields.
    self.assertEqual(0, len(self.empty_message.ListFields()))

  def testSerializeMessageSetWireFormatUnknownExtension(self):
    # Create a message using the message set wire format with an unknown
    # message.
    raw = unittest_mset_pb2.RawMessageSet()

    # Add an unknown extension.
    item = raw.item.add()
    item.type_id = 98218603
    message1 = message_set_extensions_pb2.TestMessageSetExtension1()
    message1.i = 12345
    item.message = message1.SerializeToString()

    serialized = raw.SerializeToString()

    # Parse message using the message set wire format.
    proto = message_set_extensions_pb2.TestMessageSet()
    proto.MergeFromString(serialized)

    unknown_field_set = unknown_fields.UnknownFieldSet(proto)
    self.assertEqual(len(unknown_field_set), 1)
    # Unknown field should have wire format data which can be parsed back to
    # original message.
    self.assertEqual(unknown_field_set[0].field_number, item.type_id)
    self.assertEqual(unknown_field_set[0].wire_type,
                     wire_format.WIRETYPE_LENGTH_DELIMITED)
    d = unknown_field_set[0].data
    message_new = message_set_extensions_pb2.TestMessageSetExtension1()
    message_new.ParseFromString(d)
    self.assertEqual(message1, message_new)

    # Verify that the unknown extension is serialized unchanged
    reserialized = proto.SerializeToString()
    new_raw = unittest_mset_pb2.RawMessageSet()
    new_raw.MergeFromString(reserialized)
    self.assertEqual(raw, new_raw)

  def testEquals(self):
    message = unittest_pb2.TestEmptyMessage()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(self.empty_message, message)

    self.all_fields.ClearField('optional_string')
    message.ParseFromString(self.all_fields.SerializeToString())
    self.assertNotEqual(self.empty_message, message)

  def testDiscardUnknownFields(self):
    self.empty_message.DiscardUnknownFields()
    self.assertEqual(b'', self.empty_message.SerializeToString())
    # Test message field and repeated message field.
    message = unittest_pb2.TestAllTypes()
    other_message = unittest_pb2.TestAllTypes()
    other_message.optional_string = 'discard'
    message.optional_nested_message.ParseFromString(
        other_message.SerializeToString())
    message.repeated_nested_message.add().ParseFromString(
        other_message.SerializeToString())
    self.assertNotEqual(
        b'', message.optional_nested_message.SerializeToString())
    self.assertNotEqual(
        b'', message.repeated_nested_message[0].SerializeToString())
    message.DiscardUnknownFields()
    self.assertEqual(b'', message.optional_nested_message.SerializeToString())
    self.assertEqual(
        b'', message.repeated_nested_message[0].SerializeToString())

    msg = map_unittest_pb2.TestMap()
    msg.map_int32_all_types[1].optional_nested_message.ParseFromString(
        other_message.SerializeToString())
    msg.map_string_string['1'] = 'test'
    self.assertNotEqual(
        b'',
        msg.map_int32_all_types[1].optional_nested_message.SerializeToString())
    msg.DiscardUnknownFields()
    self.assertEqual(
        b'',
        msg.map_int32_all_types[1].optional_nested_message.SerializeToString())


@testing_refleaks.TestCase
class UnknownFieldsAccessorsTest(unittest.TestCase):

  def setUp(self):
    self.descriptor = unittest_pb2.TestAllTypes.DESCRIPTOR
    self.all_fields = unittest_pb2.TestAllTypes()
    test_util.SetAllFields(self.all_fields)
    self.all_fields_data = self.all_fields.SerializeToString()
    self.empty_message = unittest_pb2.TestEmptyMessage()
    self.empty_message.ParseFromString(self.all_fields_data)

  # InternalCheckUnknownField() is an additional Pure Python check which checks
  # a detail of unknown fields. It cannot be used by the C++
  # implementation because some protect members are called.
  # The test is added for historical reasons. It is not necessary as
  # serialized string is checked.
  # TODO(jieluo): Remove message._unknown_fields.
  def InternalCheckUnknownField(self, name, expected_value):
    if api_implementation.Type() != 'python':
      return
    field_descriptor = self.descriptor.fields_by_name[name]
    wire_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[field_descriptor.type]
    field_tag = encoder.TagBytes(field_descriptor.number, wire_type)
    result_dict = {}
    for tag_bytes, value in self.empty_message._unknown_fields:
      if tag_bytes == field_tag:
        decoder = unittest_pb2.TestAllTypes._decoders_by_tag[tag_bytes][0]
        decoder(memoryview(value), 0, len(value), self.all_fields, result_dict)
    self.assertEqual(expected_value, result_dict[field_descriptor])

  def CheckUnknownField(self, name, unknown_field_set, expected_value):
    field_descriptor = self.descriptor.fields_by_name[name]
    expected_type = type_checkers.FIELD_TYPE_TO_WIRE_TYPE[
        field_descriptor.type]
    for unknown_field in unknown_field_set:
      if unknown_field.field_number == field_descriptor.number:
        self.assertEqual(expected_type, unknown_field.wire_type)
        if expected_type == 3:
          # Check group
          self.assertEqual(expected_value[0],
                           unknown_field.data[0].field_number)
          self.assertEqual(expected_value[1], unknown_field.data[0].wire_type)
          self.assertEqual(expected_value[2], unknown_field.data[0].data)
          continue
        if expected_type == wire_format.WIRETYPE_LENGTH_DELIMITED:
          self.assertIn(type(unknown_field.data), (str, bytes))
        if field_descriptor.label == descriptor.FieldDescriptor.LABEL_REPEATED:
          self.assertIn(unknown_field.data, expected_value)
        else:
          self.assertEqual(expected_value, unknown_field.data)

  def testCheckUnknownFieldValue(self):
    unknown_field_set = unknown_fields.UnknownFieldSet(self.empty_message)
    # Test enum.
    self.CheckUnknownField('optional_nested_enum',
                           unknown_field_set,
                           self.all_fields.optional_nested_enum)
    self.InternalCheckUnknownField('optional_nested_enum',
                                   self.all_fields.optional_nested_enum)

    # Test repeated enum.
    self.CheckUnknownField('repeated_nested_enum',
                           unknown_field_set,
                           self.all_fields.repeated_nested_enum)
    self.InternalCheckUnknownField('repeated_nested_enum',
                                   self.all_fields.repeated_nested_enum)

    # Test varint.
    self.CheckUnknownField('optional_int32',
                           unknown_field_set,
                           self.all_fields.optional_int32)
    self.InternalCheckUnknownField('optional_int32',
                                   self.all_fields.optional_int32)

    # Test fixed32.
    self.CheckUnknownField('optional_fixed32',
                           unknown_field_set,
                           self.all_fields.optional_fixed32)
    self.InternalCheckUnknownField('optional_fixed32',
                                   self.all_fields.optional_fixed32)

    # Test fixed64.
    self.CheckUnknownField('optional_fixed64',
                           unknown_field_set,
                           self.all_fields.optional_fixed64)
    self.InternalCheckUnknownField('optional_fixed64',
                                   self.all_fields.optional_fixed64)

    # Test length delimited.
    self.CheckUnknownField('optional_string',
                           unknown_field_set,
                           self.all_fields.optional_string.encode('utf-8'))
    self.InternalCheckUnknownField('optional_string',
                                   self.all_fields.optional_string)

    # Test group.
    self.CheckUnknownField('optionalgroup',
                           unknown_field_set,
                           (17, 0, 117))
    self.InternalCheckUnknownField('optionalgroup',
                                   self.all_fields.optionalgroup)

    self.assertEqual(98, len(unknown_field_set))

  def testCopyFrom(self):
    message = unittest_pb2.TestEmptyMessage()
    message.CopyFrom(self.empty_message)
    self.assertEqual(message.SerializeToString(), self.all_fields_data)

  def testMergeFrom(self):
    message = unittest_pb2.TestAllTypes()
    message.optional_int32 = 1
    message.optional_uint32 = 2
    source = unittest_pb2.TestEmptyMessage()
    source.ParseFromString(message.SerializeToString())

    message.ClearField('optional_int32')
    message.optional_int64 = 3
    message.optional_uint32 = 4
    destination = unittest_pb2.TestEmptyMessage()
    unknown_field_set = unknown_fields.UnknownFieldSet(destination)
    self.assertEqual(0, len(unknown_field_set))
    destination.ParseFromString(message.SerializeToString())
    self.assertEqual(0, len(unknown_field_set))
    unknown_field_set = unknown_fields.UnknownFieldSet(destination)
    self.assertEqual(2, len(unknown_field_set))
    destination.MergeFrom(source)
    self.assertEqual(2, len(unknown_field_set))
    # Check that the fields where correctly merged, even stored in the unknown
    # fields set.
    message.ParseFromString(destination.SerializeToString())
    self.assertEqual(message.optional_int32, 1)
    self.assertEqual(message.optional_uint32, 2)
    self.assertEqual(message.optional_int64, 3)

  def testClear(self):
    unknown_field_set = unknown_fields.UnknownFieldSet(self.empty_message)
    self.empty_message.Clear()
    # All cleared, even unknown fields.
    self.assertEqual(self.empty_message.SerializeToString(), b'')
    self.assertEqual(len(unknown_field_set), 98)

  @unittest.skipIf((sys.version_info.major, sys.version_info.minor) < (3, 4),
                   'tracemalloc requires python 3.4+')
  def testUnknownFieldsNoMemoryLeak(self):
    # Call to UnknownFields must not leak memory
    nb_leaks = 1234

    def leaking_function():
      for _ in range(nb_leaks):
        unknown_fields.UnknownFieldSet(self.empty_message)

    tracemalloc.start()
    snapshot1 = tracemalloc.take_snapshot()
    leaking_function()
    snapshot2 = tracemalloc.take_snapshot()
    top_stats = snapshot2.compare_to(snapshot1, 'lineno')
    tracemalloc.stop()
    # There's no easy way to look for a precise leak source.
    # Rely on a "marker" count value while checking allocated memory.
    self.assertEqual([], [x for x in top_stats if x.count_diff == nb_leaks])

  def testSubUnknownFields(self):
    message = unittest_pb2.TestAllTypes()
    message.optionalgroup.a = 123
    destination = unittest_pb2.TestEmptyMessage()
    destination.ParseFromString(message.SerializeToString())
    sub_unknown_fields = unknown_fields.UnknownFieldSet(destination)[0].data
    self.assertEqual(1, len(sub_unknown_fields))
    self.assertEqual(sub_unknown_fields[0].data, 123)
    destination.Clear()
    self.assertEqual(1, len(sub_unknown_fields))
    self.assertEqual(sub_unknown_fields[0].data, 123)
    message.Clear()
    message.optional_uint32 = 456
    nested_message = unittest_pb2.NestedTestAllTypes()
    nested_message.payload.optional_nested_message.ParseFromString(
        message.SerializeToString())
    unknown_field_set = unknown_fields.UnknownFieldSet(
        nested_message.payload.optional_nested_message)
    self.assertEqual(unknown_field_set[0].data, 456)
    nested_message.ClearField('payload')
    self.assertEqual(unknown_field_set[0].data, 456)
    unknown_field_set = unknown_fields.UnknownFieldSet(
        nested_message.payload.optional_nested_message)
    self.assertEqual(0, len(unknown_field_set))

  def testUnknownField(self):
    message = unittest_pb2.TestAllTypes()
    message.optional_int32 = 123
    destination = unittest_pb2.TestEmptyMessage()
    destination.ParseFromString(message.SerializeToString())
    unknown_field = unknown_fields.UnknownFieldSet(destination)[0]
    destination.Clear()
    self.assertEqual(unknown_field.data, 123)

  def testUnknownExtensions(self):
    message = unittest_pb2.TestEmptyMessageWithExtensions()
    message.ParseFromString(self.all_fields_data)
    self.assertEqual(len(unknown_fields.UnknownFieldSet(message)), 98)
    self.assertEqual(message.SerializeToString(), self.all_fields_data)


@testing_refleaks.TestCase
class UnknownEnumValuesTest(unittest.TestCase):

  def setUp(self):
    self.descriptor = missing_enum_values_pb2.TestEnumValues.DESCRIPTOR

    self.message = missing_enum_values_pb2.TestEnumValues()
    # TestEnumValues.ZERO = 0, but does not exist in the other NestedEnum.
    self.message.optional_nested_enum = (
        missing_enum_values_pb2.TestEnumValues.ZERO)
    self.message.repeated_nested_enum.extend([
        missing_enum_values_pb2.TestEnumValues.ZERO,
        missing_enum_values_pb2.TestEnumValues.ONE,
        ])
    self.message.packed_nested_enum.extend([
        missing_enum_values_pb2.TestEnumValues.ZERO,
        missing_enum_values_pb2.TestEnumValues.ONE,
        ])
    self.message_data = self.message.SerializeToString()
    self.missing_message = missing_enum_values_pb2.TestMissingEnumValues()
    self.missing_message.ParseFromString(self.message_data)

  # CheckUnknownField() is an additional Pure Python check which checks
  # a detail of unknown fields. It cannot be used by the C++
  # implementation because some protect members are called.
  # The test is added for historical reasons. It is not necessary as
  # serialized string is checked.

  def CheckUnknownField(self, name, expected_value):
    field_descriptor = self.descriptor.fields_by_name[name]
    unknown_field_set = unknown_fields.UnknownFieldSet(self.missing_message)
    self.assertIsInstance(unknown_field_set, unknown_fields.UnknownFieldSet)
    count = 0
    for field in unknown_field_set:
      if field.field_number == field_descriptor.number:
        count += 1
        if field_descriptor.label == descriptor.FieldDescriptor.LABEL_REPEATED:
          self.assertIn(field.data, expected_value)
        else:
          self.assertEqual(expected_value, field.data)
    if field_descriptor.label == descriptor.FieldDescriptor.LABEL_REPEATED:
      self.assertEqual(count, len(expected_value))
    else:
      self.assertEqual(count, 1)

  def testUnknownParseMismatchEnumValue(self):
    just_string = missing_enum_values_pb2.JustString()
    just_string.dummy = 'blah'

    missing = missing_enum_values_pb2.TestEnumValues()
    # The parse is invalid, storing the string proto into the set of
    # unknown fields.
    missing.ParseFromString(just_string.SerializeToString())

    # Fetching the enum field shouldn't crash, instead returning the
    # default value.
    self.assertEqual(missing.optional_nested_enum, 0)

  def testUnknownEnumValue(self):
    self.assertFalse(self.missing_message.HasField('optional_nested_enum'))
    self.assertEqual(self.missing_message.optional_nested_enum, 2)
    # Clear does not do anything.
    serialized = self.missing_message.SerializeToString()
    self.missing_message.ClearField('optional_nested_enum')
    self.assertEqual(self.missing_message.SerializeToString(), serialized)

  def testUnknownRepeatedEnumValue(self):
    self.assertEqual([], self.missing_message.repeated_nested_enum)

  def testUnknownPackedEnumValue(self):
    self.assertEqual([], self.missing_message.packed_nested_enum)

  def testCheckUnknownFieldValueForEnum(self):
    unknown_field_set = unknown_fields.UnknownFieldSet(self.missing_message)
    self.assertEqual(len(unknown_field_set), 5)
    self.CheckUnknownField('optional_nested_enum',
                           self.message.optional_nested_enum)
    self.CheckUnknownField('repeated_nested_enum',
                           self.message.repeated_nested_enum)
    self.CheckUnknownField('packed_nested_enum',
                           self.message.packed_nested_enum)

  def testRoundTrip(self):
    new_message = missing_enum_values_pb2.TestEnumValues()
    new_message.ParseFromString(self.missing_message.SerializeToString())
    self.assertEqual(self.message, new_message)


if __name__ == '__main__':
  unittest.main()

Anon7 - 2022
AnonSec Team