Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
document the REPLICATE command a bit better (#6305)
Browse files Browse the repository at this point in the history
* commit 'cc6243b4c':
  document the REPLICATE command a bit better (#6305)
  • Loading branch information
anoadragon453 committed Mar 16, 2020
2 parents b883522 + cc6243b commit 4a634ef
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 10 deletions.
1 change: 1 addition & 0 deletions changelog.d/6305.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add some documentation about worker replication.
15 changes: 14 additions & 1 deletion docs/tcp_replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,20 @@ client (C):

#### REPLICATE (C)

Asks the server to replicate a given stream
Asks the server to replicate a given stream. The syntax is:

```
REPLICATE <stream_name> <token>
```

Where `<token>` may be either:
* a numeric stream_id, to stream all updates after (but not including) that position
* `NOW`, to stream only the updates that happen from now on.

The `<stream_name>` is the name of a replication stream to subscribe
to (see [here](../synapse/replication/tcp/streams/_base.py) for a list
of streams). It can also be `ALL` to subscribe to all known streams,
in which case the `<token>` must be set to `NOW`.

#### USER_SYNC (C)

Expand Down
10 changes: 9 additions & 1 deletion synapse/replication/slave/storage/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

import logging
from typing import Dict

import six

Expand Down Expand Up @@ -44,7 +45,14 @@ def __init__(self, db_conn, hs):

self.hs = hs

def stream_positions(self):
def stream_positions(self) -> Dict[str, int]:
"""
Get the current positions of all the streams this store wants to subscribe to
Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
pos = {}
if self._cache_id_gen:
pos["caches"] = self._cache_id_gen.get_current_token()
Expand Down
20 changes: 14 additions & 6 deletions synapse/replication/tcp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,24 @@
"""

import logging
from typing import Dict

from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory

from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.tcp.protocol import (
AbstractReplicationClientHandler,
ClientReplicationStreamProtocol,
)

from .commands import (
FederationAckCommand,
InvalidateCacheCommand,
RemovePusherCommand,
UserIpCommand,
UserSyncCommand,
)
from .protocol import ClientReplicationStreamProtocol

logger = logging.getLogger(__name__)

Expand All @@ -42,7 +48,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):

maxDelay = 30 # Try at least once every N seconds

def __init__(self, hs, client_name, handler):
def __init__(self, hs, client_name, handler: AbstractReplicationClientHandler):
self.client_name = client_name
self.handler = handler
self.server_name = hs.config.server_name
Expand All @@ -68,13 +74,13 @@ def clientConnectionFailed(self, connector, reason):
ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)


class ReplicationClientHandler(object):
class ReplicationClientHandler(AbstractReplicationClientHandler):
"""A base handler that can be passed to the ReplicationClientFactory.
By default proxies incoming replication data to the SlaveStore.
"""

def __init__(self, store):
def __init__(self, store: BaseSlavedStore):
self.store = store

# The current connection. None if we are currently (re)connecting
Expand Down Expand Up @@ -138,11 +144,13 @@ def on_sync(self, data):
if d:
d.callback(data)

def get_streams_to_replicate(self):
def get_streams_to_replicate(self) -> Dict[str, int]:
"""Called when a new connection has been established and we need to
subscribe to streams.
Returns a dictionary of stream name to token.
Returns:
map from stream name to the most recent update we have for
that stream (ie, the point we want to start replicating from)
"""
args = self.store.stream_positions()
user_account_data = args.pop("user_account_data", None)
Expand Down
74 changes: 72 additions & 2 deletions synapse/replication/tcp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
> ERROR server stopping
* connection closed by server *
"""

import abc
import fcntl
import logging
import struct
Expand All @@ -65,6 +65,7 @@
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util import Clock
from synapse.util.stringutils import random_string

from .commands import (
Expand Down Expand Up @@ -558,11 +559,80 @@ def on_connection_closed(self):
self.streamer.lost_connection(self)


class AbstractReplicationClientHandler(metaclass=abc.ABCMeta):
    """
    The interface for the handler that should be passed to
    ClientReplicationStreamProtocol.

    Every method below is abstract: a concrete handler must override all of
    them before it can be instantiated.
    """

    @abc.abstractmethod
    def on_rdata(self, stream_name: str, token: int, rows: list):
        """Called to handle a batch of replication data with a given stream token.

        Args:
            stream_name (str): name of the replication stream for this batch of rows
            token (int): stream token for this batch of rows
            rows (list): a list of Stream.ROW_TYPE objects as returned by
                Stream.parse_row.

        Returns:
            Deferred|None
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def on_position(self, stream_name, token):
        """Called when we get new position data."""
        raise NotImplementedError()

    @abc.abstractmethod
    def on_sync(self, data):
        """Called when we get a new SYNC command."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_streams_to_replicate(self):
        """Called when a new connection has been established and we need to
        subscribe to streams.

        Returns:
            map from stream name to the most recent update we have for
            that stream (ie, the point we want to start replicating from)
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def get_currently_syncing_users(self):
        """Get the list of currently syncing users (if any). This is called
        when a connection has been established and we need to send the
        currently syncing users.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def update_connection(self, connection):
        """Called when a connection has been established, or with None when
        the connection has been lost.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def finished_connecting(self):
        """Called when we have successfully subscribed and caught up to all
        streams we're interested in.
        """
        raise NotImplementedError()


class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_SERVER_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_CLIENT_COMMANDS

def __init__(self, client_name, server_name, clock, handler):
def __init__(
self,
client_name: str,
server_name: str,
clock: Clock,
handler: AbstractReplicationClientHandler,
):
BaseReplicationStreamProtocol.__init__(self, clock)

self.client_name = client_name
Expand Down

0 comments on commit 4a634ef

Please sign in to comment.