17
17
from typing import TYPE_CHECKING , Any , Generic , List , Optional , Type , TypeVar , cast
18
18
19
19
import attr
20
- import txredisapi
20
+ from txredisapi import (
21
+ ConnectionHandler ,
22
+ RedisFactory ,
23
+ SubscriberProtocol ,
24
+ UnixConnectionHandler ,
25
+ )
21
26
from zope .interface import implementer
22
27
23
28
from twisted .internet .address import IPv4Address , IPv6Address
@@ -68,7 +73,7 @@ def __set__(self, obj: Optional[T], value: V) -> None:
68
73
69
74
70
75
@implementer (IReplicationConnection )
71
- class RedisSubscriber (txredisapi . SubscriberProtocol ):
76
+ class RedisSubscriber (SubscriberProtocol ):
72
77
"""Connection to redis subscribed to replication stream.
73
78
74
79
This class fulfils two functions:
@@ -95,7 +100,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
95
100
synapse_handler : "ReplicationCommandHandler"
96
101
synapse_stream_prefix : str
97
102
synapse_channel_names : List [str ]
98
- synapse_outbound_redis_connection : txredisapi . ConnectionHandler
103
+ synapse_outbound_redis_connection : ConnectionHandler
99
104
100
105
def __init__ (self , * args : Any , ** kwargs : Any ):
101
106
super ().__init__ (* args , ** kwargs )
@@ -229,7 +234,7 @@ async def _async_send_command(self, cmd: Command) -> None:
229
234
)
230
235
231
236
232
- class SynapseRedisFactory (txredisapi . RedisFactory ):
237
+ class SynapseRedisFactory (RedisFactory ):
233
238
"""A subclass of RedisFactory that periodically sends pings to ensure that
234
239
we detect dead connections.
235
240
"""
@@ -245,7 +250,7 @@ def __init__(
245
250
dbid : Optional [int ],
246
251
poolsize : int ,
247
252
isLazy : bool = False ,
248
- handler : Type = txredisapi . ConnectionHandler ,
253
+ handler : Type = ConnectionHandler ,
249
254
charset : str = "utf-8" ,
250
255
password : Optional [str ] = None ,
251
256
replyTimeout : int = 30 ,
@@ -326,7 +331,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
326
331
def __init__ (
327
332
self ,
328
333
hs : "HomeServer" ,
329
- outbound_redis_connection : txredisapi . ConnectionHandler ,
334
+ outbound_redis_connection : ConnectionHandler ,
330
335
channel_names : List [str ],
331
336
):
332
337
super ().__init__ (
@@ -368,7 +373,7 @@ def lazyConnection(
368
373
reconnect : bool = True ,
369
374
password : Optional [str ] = None ,
370
375
replyTimeout : int = 30 ,
371
- ) -> txredisapi . ConnectionHandler :
376
+ ) -> ConnectionHandler :
372
377
"""Creates a connection to Redis that is lazily set up and reconnects if the
373
378
connections is lost.
374
379
"""
@@ -380,7 +385,7 @@ def lazyConnection(
380
385
dbid = dbid ,
381
386
poolsize = 1 ,
382
387
isLazy = True ,
383
- handler = txredisapi . ConnectionHandler ,
388
+ handler = ConnectionHandler ,
384
389
password = password ,
385
390
replyTimeout = replyTimeout ,
386
391
)
@@ -408,3 +413,44 @@ def lazyConnection(
408
413
)
409
414
410
415
return factory .handler
416
+
417
+
418
+ def lazyUnixConnection (
419
+ hs : "HomeServer" ,
420
+ path : str = "/tmp/redis.sock" ,
421
+ dbid : Optional [int ] = None ,
422
+ reconnect : bool = True ,
423
+ password : Optional [str ] = None ,
424
+ replyTimeout : int = 30 ,
425
+ ) -> ConnectionHandler :
426
+ """Creates a connection to Redis that is lazily set up and reconnects if the
427
+ connection is lost.
428
+
429
+ Returns:
430
+ A subclass of ConnectionHandler, which is a UnixConnectionHandler in this case.
431
+ """
432
+
433
+ uuid = path
434
+
435
+ factory = SynapseRedisFactory (
436
+ hs ,
437
+ uuid = uuid ,
438
+ dbid = dbid ,
439
+ poolsize = 1 ,
440
+ isLazy = True ,
441
+ handler = UnixConnectionHandler ,
442
+ password = password ,
443
+ replyTimeout = replyTimeout ,
444
+ )
445
+ factory .continueTrying = reconnect
446
+
447
+ reactor = hs .get_reactor ()
448
+
449
+ reactor .connectUNIX (
450
+ path ,
451
+ factory ,
452
+ timeout = 30 ,
453
+ checkPID = False ,
454
+ )
455
+
456
+ return factory .handler
0 commit comments