Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.
Merged
63 changes: 54 additions & 9 deletions synapse/replication/tcp/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import logging
from inspect import isawaitable
from typing import TYPE_CHECKING, Any, Generic, Optional, Type, TypeVar, cast
from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast

import attr
import txredisapi
Expand All @@ -24,6 +24,7 @@
from twisted.internet.interfaces import IAddress, IConnector
from twisted.python.failure import Failure

from synapse.config.homeserver import HomeServerConfig
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.metrics.background_process_metrics import (
BackgroundProcessLoggingContext,
Expand Down Expand Up @@ -85,14 +86,15 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):

Attributes:
synapse_handler: The command handler to handle incoming commands.
synapse_stream_name: The *redis* stream name to subscribe to and publish
synapse_stream_prefix: The *redis* stream name to subscribe to and publish
from (not anything to do with Synapse replication streams).
synapse_outbound_redis_connection: The connection to redis to use to send
commands.
"""

synapse_handler: "ReplicationCommandHandler"
synapse_stream_name: str
synapse_stream_prefix: str
synapse_subscribed_channels: List[str]
synapse_outbound_redis_connection: txredisapi.ConnectionHandler

def __init__(self, *args: Any, **kwargs: Any):
Expand All @@ -117,8 +119,13 @@ async def _send_subscribe(self) -> None:
# it's important to make sure that we only send the REPLICATE command once we
# have successfully subscribed to the stream - otherwise we might miss the
# POSITION response sent back by the other end.
logger.info("Sending redis SUBSCRIBE for %s", self.synapse_stream_name)
await make_deferred_yieldable(self.subscribe(self.synapse_stream_name))
fully_qualified_stream_names = [
f"{self.synapse_stream_prefix}/{stream_suffix}"
for stream_suffix in self.synapse_subscribed_channels
] + [self.synapse_stream_prefix]
logger.info("Sending redis SUBSCRIBE for %r", fully_qualified_stream_names)
await make_deferred_yieldable(self.subscribe(fully_qualified_stream_names))

logger.info(
"Successfully subscribed to redis stream, sending REPLICATE command"
)
Expand Down Expand Up @@ -217,7 +224,7 @@ async def _async_send_command(self, cmd: Command) -> None:

await make_deferred_yieldable(
self.synapse_outbound_redis_connection.publish(
self.synapse_stream_name, encoded_string
self.synapse_stream_prefix, encoded_string
)
)

Expand Down Expand Up @@ -300,7 +307,7 @@ def format_address(address: IAddress) -> str:

class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
"""This is a reconnecting factory that connects to redis and immediately
subscribes to a stream.
subscribes to some streams.

Args:
hs
Expand All @@ -326,10 +333,47 @@ def __init__(
)

self.synapse_handler = hs.get_replication_command_handler()
self.synapse_stream_name = hs.hostname
self.synapse_stream_prefix = hs.hostname
self.synapse_subscribed_channels = (
RedisDirectTcpReplicationClientFactory.channels_to_subscribe_to_for_config(
hs.config
)
)

self.synapse_outbound_redis_connection = outbound_redis_connection

@staticmethod
def channels_to_subscribe_to_for_config(config: HomeServerConfig) -> List[str]:
subscribe_to = []

if config.worker.run_background_tasks or config.worker.worker_app is None:
# If we're the main process or the background worker, we want to process
# User IP addresses
subscribe_to.append("USER_IP")

# Subscribe to the following RDATA channels.
# We may be able to reduce this in the future.
subscribe_to += [
"RDATA/account_data",
"RDATA/backfill",
"RDATA/caches",
"RDATA/device_lists",
"RDATA/events",
"RDATA/federation",
"RDATA/groups",
"RDATA/presence",
"RDATA/presence_federation",
"RDATA/push_rules",
"RDATA/pushers",
"RDATA/receipts",
"RDATA/tag_account_data",
"RDATA/to_device",
"RDATA/typing",
"RDATA/user_signature",
]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a huge fan of hardcoding the list of streams here. Can we either:

  1. Leave RDATA on the main channel for now; or
  2. Subscribe to RDATA/* and use PSUBSCRIBE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leave RDATA on the main channel for now; or

I think I'd prefer this (mind you, it's still transmitting on the main channel). How do you suggest this work without specifying a list of streams here?
I guess we could have them be subscribed on-demand somewhere else, but I was sort of keen to subscribe them all in one place to be sure we're not sending REPLICATE before we are listening on all the desired channels.

Subscribe to RDATA/* and use PSUBSCRIBE

I am somewhat reluctant to do this because it's not really sorting us out to be able to unsubscribe from select streams in the future (and not to mention that it seems like it's more work for Redis to do pattern matches on each receiver rather than having a concrete set of listeners per channel; not sure how it's implemented in practice though).

Copy link
Member

@erikjohnston erikjohnston May 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It sort of depends on what we want the final code to look like. One option that I've been considering is have it so that every handler that wants to listen to a stream has to call something like ReplicationSubscriber.subscribe_to_stream(Stream.NAME, func) in the handler's __init__. Then, when we come to connect to Redis the ReplicationSubscriber has the list of streams the worker is interested in.

What I really want to avoid is to have to manually list these stream names, its a recipe for it getting out of date and its very non-obvious how it all fits together.

Using PSUBSCRIBE would allow us to set up the channels and write to those channels before putting all the logic in for doing ReplicationSubscriber. OTOH, you could implement the ReplicationSubscriber logic before adding the streams

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll give that a go. Otherwise it may be worth cutting out any mention of RDATA in this PR and leaving that rewiring for another PR.

its very non-obvious how it all fits together.

yeees. That can be said about much of the replication stuff, I don't want to make it much worse though, so will give this a try

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. I think its worth time boxing the RDATA stuff, as just getting the UserIP stuff split out will provide a bunch of value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've de-scoped the RDATA stuff a bit for now (mostly because I'd appreciate a few thoughts on where exactly to hook it in; the code processing RDATA commands is a bit convoluted and I thought it was going to take a while to untangle), but I've restructured things in a way that I think makes it a bit more approachable to do cleanly.


return subscribe_to

def buildProtocol(self, addr: IAddress) -> RedisSubscriber:
p = super().buildProtocol(addr)
p = cast(RedisSubscriber, p)
Expand All @@ -340,7 +384,8 @@ def buildProtocol(self, addr: IAddress) -> RedisSubscriber:
# protocol.
p.synapse_handler = self.synapse_handler
p.synapse_outbound_redis_connection = self.synapse_outbound_redis_connection
p.synapse_stream_name = self.synapse_stream_name
p.synapse_stream_prefix = self.synapse_stream_prefix
p.synapse_subscribed_channels = self.synapse_subscribed_channels

return p

Expand Down