Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 6c5d5e5

Browse files
authored
Add unit test for event persister sharding (#8433)
1 parent 05ee048 commit 6c5d5e5

File tree

8 files changed

+371
-27
lines changed

8 files changed

+371
-27
lines changed

changelog.d/8433.misc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add unit test for event persister sharding.

mypy.ini

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,6 @@ ignore_missing_imports = True
143143

144144
[mypy-nacl.*]
145145
ignore_missing_imports = True
146+
147+
[mypy-hiredis]
148+
ignore_missing_imports = True

stubs/txredisapi.pyi

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"""Contains *incomplete* type hints for txredisapi.
1717
"""
1818

19-
from typing import List, Optional, Union
19+
from typing import List, Optional, Union, Type
2020

2121
class RedisProtocol:
2222
def publish(self, channel: str, message: bytes): ...
@@ -42,3 +42,21 @@ def lazyConnection(
4242

4343
class SubscriberFactory:
4444
def buildProtocol(self, addr): ...
45+
46+
class ConnectionHandler: ...
47+
48+
class RedisFactory:
49+
continueTrying: bool
50+
handler: RedisProtocol
51+
def __init__(
52+
self,
53+
uuid: str,
54+
dbid: Optional[int],
55+
poolsize: int,
56+
isLazy: bool = False,
57+
handler: Type = ConnectionHandler,
58+
charset: str = "utf-8",
59+
password: Optional[str] = None,
60+
replyTimeout: Optional[int] = None,
61+
convertNumbers: Optional[int] = True,
62+
): ...

synapse/replication/tcp/handler.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -251,10 +251,9 @@ def start_replication(self, hs):
251251
using TCP.
252252
"""
253253
if hs.config.redis.redis_enabled:
254-
import txredisapi
255-
256254
from synapse.replication.tcp.redis import (
257255
RedisDirectTcpReplicationClientFactory,
256+
lazyConnection,
258257
)
259258

260259
logger.info(
@@ -271,7 +270,8 @@ def start_replication(self, hs):
271270
# connection after SUBSCRIBE is called).
272271

273272
# First create the connection for sending commands.
274-
outbound_redis_connection = txredisapi.lazyConnection(
273+
outbound_redis_connection = lazyConnection(
274+
reactor=hs.get_reactor(),
275275
host=hs.config.redis_host,
276276
port=hs.config.redis_port,
277277
password=hs.config.redis.redis_password,

synapse/replication/tcp/redis.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import logging
1717
from inspect import isawaitable
18-
from typing import TYPE_CHECKING
18+
from typing import TYPE_CHECKING, Optional
1919

2020
import txredisapi
2121

@@ -228,3 +228,41 @@ def buildProtocol(self, addr):
228228
p.password = self.password
229229

230230
return p
231+
232+
233+
def lazyConnection(
234+
reactor,
235+
host: str = "localhost",
236+
port: int = 6379,
237+
dbid: Optional[int] = None,
238+
reconnect: bool = True,
239+
charset: str = "utf-8",
240+
password: Optional[str] = None,
241+
connectTimeout: Optional[int] = None,
242+
replyTimeout: Optional[int] = None,
243+
convertNumbers: bool = True,
244+
) -> txredisapi.RedisProtocol:
245+
"""Equivalent to `txredisapi.lazyConnection`, except allows specifying a
246+
reactor.
247+
"""
248+
249+
isLazy = True
250+
poolsize = 1
251+
252+
uuid = "%s:%d" % (host, port)
253+
factory = txredisapi.RedisFactory(
254+
uuid,
255+
dbid,
256+
poolsize,
257+
isLazy,
258+
txredisapi.ConnectionHandler,
259+
charset,
260+
password,
261+
replyTimeout,
262+
convertNumbers,
263+
)
264+
factory.continueTrying = reconnect
265+
for x in range(poolsize):
266+
reactor.connectTCP(host, port, factory, connectTimeout)
267+
268+
return factory.handler

0 commit comments

Comments
 (0)