Skip to content

TestShardAware.test_advanced_shard_aware_port can fail #565

@dkropachev

Description

@dkropachev

Error:

________________ TestShardAware.test_advanced_shard_aware_port _________________
  
  self = <tests.unit.test_shard_aware.TestShardAware testMethod=test_advanced_shard_aware_port>
  
      def test_advanced_shard_aware_port(self):
          """
          Test that on given a `shard_aware_port` on the OPTIONS message (ShardInfo class)
          the next connections would be open using this port
          """
          class MockSession(MagicMock):
              is_shutdown = False
              keyspace = "ks1"
      
              def __init__(self, is_ssl=False, *args, **kwargs):
                  super(MockSession, self).__init__(*args, **kwargs)
                  self.cluster = MagicMock()
                  if is_ssl:
                      self.cluster.ssl_options = {'some_ssl_options': True}
                  else:
                      self.cluster.ssl_options = None
                  self.cluster.shard_aware_options = ShardAwareOptions()
                  self.cluster.executor = ThreadPoolExecutor(max_workers=2)
                  self.cluster.signal_connection_failure = lambda *args, **kwargs: False
                  self.cluster.connection_factory = self.mock_connection_factory
                  self.connection_counter = 0
                  self.futures = []
      
              def submit(self, fn, *args, **kwargs):
                  logging.info("Scheduling %s with args: %s, kwargs: %s", fn, args, kwargs)
                  if not self.is_shutdown:
                      f = self.cluster.executor.submit(fn, *args, **kwargs)
                      self.futures += [f]
                      return f
      
              def mock_connection_factory(self, *args, **kwargs):
                  connection = MagicMock()
                  connection.is_shutdown = False
                  connection.is_defunct = False
                  connection.is_closed = False
                  connection.orphaned_threshold_reached = False
                  connection.endpoint = args[0]
                  sharding_info = ShardingInfo(shard_id=1, shards_count=4, partitioner="", sharding_algorithm="", sharding_ignore_msb=0, shard_aware_port=19042, shard_aware_port_ssl=19045)
                  connection.features = ProtocolFeatures(shard_id=kwargs.get('shard_id', self.connection_counter), sharding_info=sharding_info)
                  self.connection_counter += 1
      
                  return connection
      
          host = MagicMock()
          host.endpoint = DefaultEndPoint("1.2.3.4")
      
          for port, is_ssl in [(19042, False), (19045, True)]:
              session = MockSession(is_ssl=is_ssl)
              pool = HostConnection(host=host, host_distance=HostDistance.REMOTE, session=session)
              for f in session.futures:
  >               f.result()
  
  /Users/runner/work/python-driver/python-driver/tests/unit/test_shard_aware.py:106: 
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  /Users/runner/Library/Caches/cibuildwheel/pypy3.11-v7.3.20-macos_arm64/lib/pypy3.11/concurrent/futures/_base.py:449: in result
      return self.__get_result()
             ^^^^^^^^^^^^^^^^^^^
  /Users/runner/Library/Caches/cibuildwheel/pypy3.11-v7.3.20-macos_arm64/lib/pypy3.11/concurrent/futures/_base.py:401: in __get_result
      raise self._exception
  /Users/runner/Library/Caches/cibuildwheel/pypy3.11-v7.3.20-macos_arm64/lib/pypy3.11/concurrent/futures/thread.py:58: in run
      result = self.fn(*self.args, **self.kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  /Users/runner/work/python-driver/python-driver/cassandra/pool.py:804: in _open_connection_to_missing_shard
      num_missing_or_needing_replacement = self.num_missing_or_needing_replacement
                                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  /Users/runner/work/python-driver/python-driver/cassandra/pool.py:920: in num_missing_or_needing_replacement
      - sum(1 for c in self._connections.values() if not c.orphaned_threshold_reached)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
  
  .0 = <dict_valueiterator object at 0x00000001195456e0>
  
  >   - sum(1 for c in self._connections.values() if not c.orphaned_threshold_reached)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  E   RuntimeError: dictionary changed size during iteration
  
  /Users/runner/work/python-driver/python-driver/cassandra/pool.py:920: RuntimeError

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions