diff --git a/changelog.d/17047.misc b/changelog.d/17047.misc
new file mode 100644
index 00000000000..941c1a90119
--- /dev/null
+++ b/changelog.d/17047.misc
@@ -0,0 +1,2 @@
+Remove the need for the `stream_positions` table.
+
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 563450a97ed..e23c0cd3082 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -80,7 +80,6 @@ def __init__(
             db_conn=db_conn,
             db=database,
             notifier=hs.get_replication_notifier(),
-            stream_name="account_data",
             instance_name=self._instance_name,
             tables=[
                 ("room_account_data", "instance_name", "stream_id"),
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index bfd492d95d3..dab1b111425 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -107,7 +107,6 @@ def __init__(
             db_conn,
             database,
             notifier=hs.get_replication_notifier(),
-            stream_name="caches",
             instance_name=hs.get_instance_name(),
             tables=[
                 (
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index e17821ff6ea..d2e983519e2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -99,7 +99,6 @@ def __init__(
             db_conn=db_conn,
             db=database,
             notifier=hs.get_replication_notifier(),
-            stream_name="to_device",
             instance_name=self._instance_name,
             tables=[
                 ("device_inbox", "instance_name", "stream_id"),
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e39d4b96242..9b94efe514e 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -202,7 +202,6 @@ def __init__(
                 db_conn=db_conn,
                 db=database,
                 notifier=hs.get_replication_notifier(),
-                stream_name="events",
                 instance_name=hs.get_instance_name(),
                 tables=[("events", "instance_name", "stream_ordering")],
                 sequence_name="events_stream_seq",
@@ -212,7 +211,6 @@ def __init__(
                 db_conn=db_conn,
                 db=database,
                 notifier=hs.get_replication_notifier(),
-                stream_name="backfill",
                 instance_name=hs.get_instance_name(),
                 tables=[("events", "instance_name", "stream_ordering")],
                 sequence_name="events_backfill_stream_seq",
@@ -314,7 +312,6 @@ def get_chain_id_txn(txn: Cursor) -> int:
                 db_conn=db_conn,
                 db=database,
                 notifier=hs.get_replication_notifier(),
-                stream_name="un_partial_stated_event_stream",
                 instance_name=hs.get_instance_name(),
                 tables=[
                     ("un_partial_stated_event_stream", "instance_name", "stream_id")
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 567c2d30bd2..c5f8b20de59 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -96,7 +96,6 @@ def __init__(
             db_conn=db_conn,
             db=database,
             notifier=hs.get_replication_notifier(),
-            stream_name="presence_stream",
             instance_name=self._instance_name,
             tables=[("presence_stream", "instance_name", "stream_id")],
             sequence_name="presence_stream_sequence",
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index d513c425301..266d5e2f07c 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -89,7 +89,6 @@ def __init__(
             db_conn=db_conn,
             db=database,
             notifier=hs.get_replication_notifier(),
-            stream_name="receipts",
             instance_name=self._instance_name,
tables=[("receipts_linearized", "instance_name", "stream_id")], sequence_name="receipts_sequence", diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 81c7bf37122..357356e3e92 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -162,7 +162,6 @@ def __init__( db_conn=db_conn, db=database, notifier=hs.get_replication_notifier(), - stream_name="un_partial_stated_room_stream", instance_name=self._instance_name, tables=[ ("un_partial_stated_room_stream", "instance_name", "stream_id") diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index fadc75cc803..bb36bdc74e4 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -48,7 +48,6 @@ import attr from sortedcontainers import SortedList, SortedSet -from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -343,8 +342,6 @@ class MultiWriterIdGenerator(AbstractStreamIdGenerator): Args: db_conn db - stream_name: A name for the stream, for use in the `stream_positions` - table. (Does not need to be the same as the replication stream name) instance_name: The name of this instance. tables: List of tables associated with the stream. Tuple of table name, column name that stores the writer's instance name, and @@ -363,7 +360,6 @@ def __init__( db_conn: LoggingDatabaseConnection, db: DatabasePool, notifier: "ReplicationNotifier", - stream_name: str, instance_name: str, tables: List[Tuple[str, str, str]], sequence_name: str, @@ -372,7 +368,6 @@ def __init__( ) -> None: self._db = db self._notifier = notifier - self._stream_name = stream_name self._instance_name = instance_name self._positive = positive self._writers = writers @@ -412,16 +407,14 @@ def __init__( # will be gapless; gaps can form when e.g. a transaction was rolled # back. This means that sometimes we won't be able to skip forward the # position even though everything has been persisted. However, since - # gaps should be relatively rare it's still worth doing the book keeping + # gaps should be relatively rare it's still worth doing the bookkeeping # that allows us to skip forwards when there are gapless runs of # positions. # # We start at 1 here as a) the first generated stream ID will be 2, and # b) other parts of the code assume that stream IDs are strictly greater # than 0. - self._persisted_upto_position = ( - min(self._current_positions.values()) if self._current_positions else 1 - ) + self._persisted_upto_position = 1 self._known_persisted_positions: List[int] = [] # The maximum stream ID that we have seen been allocated across any writer. @@ -440,7 +433,6 @@ def __init__( db_conn, table=table, id_column=id_column, - stream_name=stream_name, positive=positive, ) @@ -451,7 +443,8 @@ def __init__( self._current_positions.values(), default=1 ) - # For the case where `stream_positions` is not up to date, + # TODO: this needs updating or verifying + # For the case where `stream_positions` is not up-to-date, # `_persisted_upto_position` may be higher. self._max_seen_allocated_stream_id = max( self._max_seen_allocated_stream_id, self._persisted_upto_position @@ -463,7 +456,7 @@ def __init__( if not writers: # If there have been no explicit writers given then any instance can - # write to the stream. In which case, let's pre-seed our own + # write to the stream. 
@@ -473,57 +466,66 @@ def _load_current_ids(
         tables: List[Tuple[str, str, str]],
     ) -> None:
         cur = db_conn.cursor(txn_name="_load_current_ids")
-
-        # Load the current positions of all writers for the stream.
-        if self._writers:
-            # We delete any stale entries in the positions table. This is
-            # important if we add back a writer after a long time; we want to
-            # consider that a "new" writer, rather than using the old stale
-            # entry here.
-            sql = """
-                DELETE FROM stream_positions
-                WHERE
-                    stream_name = ?
-                    AND instance_name != ALL(?)
-            """
-            cur.execute(sql, (self._stream_name, self._writers))
-
+        max_stream_id = 1
+
+        # Load the current position of every writer for the stream from the
+        # stream's own table. Differentiate between (potentially) multiple
+        # workers for a stream by the instance_name. For example, with
+        # multiple event persisters:
+        #     event_persister1 could be at 6
+        #     but event_persister2 could be at 3
+        # When there are no explicit writers, this still pulls the data needed
+        # to populate max_stream_id.
+        rows: List[Tuple[str, int]] = []
+        for table, instance_column, id_column in tables:
             sql = """
-                SELECT instance_name, stream_id FROM stream_positions
-                WHERE stream_name = ?
-            """
-            cur.execute(sql, (self._stream_name,))
-
-            self._current_positions = {
-                instance: stream_id * self._return_factor
-                for instance, stream_id in cur
-                if instance in self._writers
+                SELECT %(instance)s, %(agg)s(%(id)s) FROM %(table)s
+                GROUP BY %(instance)s
+            """ % {
+                "id": id_column,
+                "table": table,
+                "instance": instance_column,
+                "agg": "MAX" if self._positive else "MIN",
             }
+            cur.execute(sql)
+
+            # Cast safety: this corresponds to the types returned by the query above.
+            rows.extend(cast(Iterable[Tuple[str, int]], cur))
+
+        # This is filtered by writer...
+        self._current_positions = {
+            instance: stream_id * self._return_factor
+            for instance, stream_id in rows
+            if instance in self._writers
+        }
+
+        # ...whereas this is not.
+        for _, stream_id in rows:
+            max_stream_id = max(max_stream_id, stream_id)
 
         # We set the `_persisted_upto_position` to be the minimum of all current
         # positions. If empty we use the max stream ID from the DB table.
+
+        # Note some oddities around writer instances that disappear and then
+        # reappear. For example, with an events stream writer:
+        #     1. Using the main process to persist events
+        #     2. Declaring a separate events worker for some time
+        #     3. Resetting back to the main process
+        # This should resolve itself the first time (3) writes a row, thereby
+        # advancing its position.
+
         min_stream_id = min(self._current_positions.values(), default=None)
 
         if min_stream_id is None:
-            # We add a GREATEST here to ensure that the result is always
-            # positive. (This can be a problem for e.g. backfill streams where
-            # the server has never backfilled).
-            max_stream_id = 1
-            for table, _, id_column in tables:
-                sql = """
-                    SELECT GREATEST(COALESCE(%(agg)s(%(id)s), 1), 1)
-                    FROM %(table)s
-                """ % {
-                    "id": id_column,
-                    "table": table,
-                    "agg": "MAX" if self._positive else "-MIN",
-                }
-                cur.execute(sql)
-                result = cur.fetchone()
-                assert result is not None
-                (stream_id,) = result
-
-                max_stream_id = max(max_stream_id, stream_id)
+            # A min_stream_id of None means one of:
+            #     1. This table has never been written to
+            #     2. Whatever stream writer wrote to it is no longer available
+            #     3. It doesn't matter whether the writer position is tracked
+            #        (see the cache invalidation stream, for example)
+            # Use max_stream_id, as all of those stream IDs are guaranteed to
+            # have been persisted; otherwise it defaults to 1.
+            # TODO: is this next comment still valid?
+            # (This can be a problem for e.g. backfill streams where the server
+            # has never backfilled).
             self._persisted_upto_position = max_stream_id
 
         else:
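To make the new recovery strategy concrete: a minimal, standalone sketch of what the per-table query computes, using SQLite and invented table/writer names rather than Synapse's actual code:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE foobar (stream_id INTEGER, instance_name TEXT)")
conn.executemany(
    "INSERT INTO foobar VALUES (?, ?)",
    [
        (1, "event_persister1"),
        (2, "event_persister1"),
        (3, "event_persister2"),
        (4, "event_persister1"),
        (5, "event_persister1"),
        (6, "event_persister1"),
    ],
)

writers = ["event_persister1", "event_persister2"]

# One query per stream table: the newest row each instance ever wrote.
rows = conn.execute(
    "SELECT instance_name, MAX(stream_id) FROM foobar GROUP BY instance_name"
).fetchall()

# Positions are only tracked for configured writers...
current_positions = {instance: pos for instance, pos in rows if instance in writers}

# ...but max_stream_id considers every row, so a stream whose writers are
# untracked (e.g. cache invalidation) still gets a sane position.
max_stream_id = max((pos for _, pos in rows), default=1)

assert current_positions == {"event_persister1": 6, "event_persister2": 3}
assert max_stream_id == 6
```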
@@ -531,16 +533,10 @@ def _load_current_ids(
             # than it from the DB so that we can prefill
             # `_known_persisted_positions` and get a more accurate
             # `_persisted_upto_position`.
-            #
-            # We also check if any of the later rows are from this instance, in
-            # which case we use that for this instance's current position. This
-            # is to handle the case where we didn't finish persisting to the
-            # stream positions table before restart (or the stream position
-            # table otherwise got out of date).
 
             self._persisted_upto_position = min_stream_id
 
-            rows: List[Tuple[str, int]] = []
+            rows = []
             for table, instance_column, id_column in tables:
                 sql = """
                     SELECT %(instance)s, %(id)s FROM %(table)s
@@ -553,32 +549,17 @@ def _load_current_ids(
                 }
                 cur.execute(sql, (min_stream_id * self._return_factor,))
 
-                # Cast safety: this corresponds to the types returned by the query above
+                # Cast safety: this corresponds to the types returned by the query above.
                 rows.extend(cast(Iterable[Tuple[str, int]], cur))
 
-            # Sort by stream_id (ascending, lowest -> highest) so that we handle
-            # rows in order for each instance because we don't want to overwrite
-            # the current_position of an instance to a lower stream ID than
-            # we're actually at.
-            def sort_by_stream_id_key_func(row: Tuple[str, int]) -> int:
-                (instance, stream_id) = row
-                # If `stream_id` is ever `None`, we will see a `TypeError: '<'
-                # not supported between instances of 'NoneType' and 'X'` error.
-                return stream_id
-
-            rows.sort(key=sort_by_stream_id_key_func)
-
             with self._lock:
                 for (
-                    instance,
+                    _,
                     stream_id,
                 ) in rows:
                     stream_id = self._return_factor * stream_id
                     self._add_persisted_position(stream_id)
 
-                    if instance == self._instance_name:
-                        self._current_positions[instance] = stream_id
-
         if self._writers:
             # If we have explicit writers then make sure that each instance has
             # a position.
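The prefill above feeds `_add_persisted_position`, whose gap bookkeeping can be pictured with a small standalone model (plain Python plus `sortedcontainers`, which this module already imports; `advance` is a simplified stand-in, not the real implementation):

```python
from sortedcontainers import SortedList

def advance(persisted_upto: int, known: SortedList, new_id: int) -> int:
    """Record a persisted stream ID and skip forward over gapless runs."""
    known.add(new_id)
    while known and known[0] == persisted_upto + 1:
        persisted_upto = known.pop(0)
    return persisted_upto

# e.g. positions recovered by _load_current_ids: {"first": 3, "third": 6}
persisted_upto = 3            # the minimum current position
known = SortedList()

for stream_id in (5, 6):      # the rows with stream_id > 3
    persisted_upto = advance(persisted_upto, known, stream_id)
print(persisted_upto)         # still 3: nothing is known about ID 4

persisted_upto = advance(persisted_upto, known, 4)
print(persisted_upto)         # 6: filling in ID 4 closed the gap
```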
@@ -661,21 +642,6 @@ def get_next_txn(self, txn: LoggingTransaction) -> int:
         txn.call_on_exception(self._mark_ids_as_finished, [next_id])
         txn.call_after(self._notifier.notify_replication)
 
-        # Update the `stream_positions` table with newly updated stream
-        # ID (unless self._writers is not set in which case we don't
-        # bother, as nothing will read it).
-        #
-        # We only do this on the success path so that the persisted current
-        # position points to a persisted row with the correct instance name.
-        if self._writers:
-            txn.call_after(
-                run_as_background_process,
-                "MultiWriterIdGenerator._update_table",
-                self._db.runInteraction,
-                "MultiWriterIdGenerator._update_table",
-                self._update_stream_positions_table_txn,
-            )
-
         return self._return_factor * next_id
 
     def get_next_mult_txn(self, txn: LoggingTransaction, n: int) -> List[int]:
@@ -697,21 +663,6 @@ def get_next_mult_txn(self, txn: LoggingTransaction, n: int) -> List[int]:
         txn.call_on_exception(self._mark_ids_as_finished, next_ids)
         txn.call_after(self._notifier.notify_replication)
 
-        # Update the `stream_positions` table with newly updated stream
-        # ID (unless self._writers is not set in which case we don't
-        # bother, as nothing will read it).
-        #
-        # We only do this on the success path so that the persisted current
-        # position points to a persisted row with the correct instance name.
-        if self._writers:
-            txn.call_after(
-                run_as_background_process,
-                "MultiWriterIdGenerator._update_table",
-                self._db.runInteraction,
-                "MultiWriterIdGenerator._update_table",
-                self._update_stream_positions_table_txn,
-            )
-
         return [self._return_factor * next_id for next_id in next_ids]
 
     def _mark_ids_as_finished(self, next_ids: List[int]) -> None:
@@ -905,27 +856,6 @@ def _add_persisted_position(self, new_id: int) -> None:
             # do.
             break
 
-    def _update_stream_positions_table_txn(self, txn: Cursor) -> None:
-        """Update the `stream_positions` table with newly persisted position."""
-
-        if not self._writers:
-            return
-
-        # We upsert the value, ensuring on conflict that we always increase the
-        # value (or decrease if stream goes backwards).
-        sql = """
-            INSERT INTO stream_positions (stream_name, instance_name, stream_id)
-            VALUES (?, ?, ?)
-            ON CONFLICT (stream_name, instance_name)
-            DO UPDATE SET
-                stream_id = %(agg)s(stream_positions.stream_id, EXCLUDED.stream_id)
-        """ % {
-            "agg": "GREATEST" if self._positive else "LEAST",
-        }
-
-        pos = (self.get_current_token_for_writer(self._instance_name),)
-        txn.execute(sql, (self._stream_name, self._instance_name, pos))
-
 
 @attr.s(frozen=True, auto_attribs=True)
 class _AsyncCtxManagerWrapper(Generic[T]):
@@ -986,22 +916,4 @@ async def __aexit__(
         if exc_type is not None:
             return False
 
-        # Update the `stream_positions` table with newly updated stream
-        # ID (unless self._writers is not set in which case we don't
-        # bother, as nothing will read it).
-        #
-        # We only do this on the success path so that the persisted current
-        # position points to a persisted row with the correct instance name.
-        #
-        # We do this in autocommit mode as a) the upsert works correctly outside
-        # transactions and b) reduces the amount of time the rows are locked
-        # for. If we don't do this then we'll often hit serialization errors due
-        # to the fact we default to REPEATABLE READ isolation levels.
-        if self.id_gen._writers:
-            await self.id_gen._db.runInteraction(
-                "MultiWriterIdGenerator._update_table",
-                self.id_gen._update_stream_positions_table_txn,
-                db_autocommit=True,
-            )
-
         return False
-""" - class SequenceGenerator(metaclass=abc.ABCMeta): """A class which generates a unique sequence of integers""" @@ -86,7 +71,6 @@ def check_consistency( db_conn: "LoggingDatabaseConnection", table: str, id_column: str, - stream_name: Optional[str] = None, positive: bool = True, ) -> None: """Should be called during start up to test that the current value of @@ -95,11 +79,6 @@ def check_consistency( This is to handle various cases where the sequence value can get out of sync with the table, e.g. if Synapse gets rolled back to a previous version and the rolled forwards again. - - If a stream name is given then this will check that any value in the - `stream_positions` table is less than or equal to the current sequence - value. If it isn't then it's likely that streams have been crossed - somewhere (e.g. two ID generators have the same stream name). """ ... @@ -127,7 +106,6 @@ def check_consistency( db_conn: "LoggingDatabaseConnection", table: str, id_column: str, - stream_name: Optional[str] = None, positive: bool = True, ) -> None: """See SequenceGenerator.check_consistency for docstring.""" @@ -158,17 +136,6 @@ def check_consistency( assert fetch_res is not None last_value, is_called = fetch_res - # If we have an associated stream check the stream_positions table. - max_in_stream_positions = None - if stream_name: - txn.execute( - "SELECT MAX(stream_id) FROM stream_positions WHERE stream_name = ?", - (stream_name,), - ) - row = txn.fetchone() - if row: - max_in_stream_positions = row[0] - txn.close() # If `is_called` is False then `last_value` is actually the value that @@ -189,14 +156,6 @@ def check_consistency( % {"seq": self._sequence_name, "table": table, "max_id_sql": table_sql} ) - # If we have values in the stream positions table then they have to be - # less than or equal to `last_value` - if max_in_stream_positions and max_in_stream_positions > last_value: - raise IncorrectDatabaseSetup( - _INCONSISTENT_STREAM_ERROR - % {"seq": self._sequence_name, "stream_name": stream_name} - ) - GetFirstCallbackType = Callable[[Cursor], int] @@ -249,7 +208,6 @@ def check_consistency( db_conn: Connection, table: str, id_column: str, - stream_name: Optional[str] = None, positive: bool = True, ) -> None: # There is nothing to do for in memory sequences @@ -263,7 +221,6 @@ def build_sequence_generator( sequence_name: str, table: Optional[str], id_column: Optional[str], - stream_name: Optional[str] = None, positive: bool = True, ) -> SequenceGenerator: """Get the best impl of SequenceGenerator available @@ -291,7 +248,6 @@ def build_sequence_generator( db_conn=db_conn, table=table, id_column=id_column, - stream_name=stream_name, positive=positive, ) diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py index 409d856ab9c..f511cccf95a 100644 --- a/tests/storage/test_id_generators.py +++ b/tests/storage/test_id_generators.py @@ -205,7 +205,6 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator: conn, self.db_pool, notifier=self.hs.get_replication_notifier(), - stream_name="test_stream", instance_name=instance_name, tables=[("foobar", "instance_name", "stream_id")], sequence_name="foobar_seq", @@ -225,13 +224,6 @@ def _insert(txn: LoggingTransaction) -> None: "INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)", (instance_name,), ) - txn.execute( - """ - INSERT INTO stream_positions VALUES ('test_stream', ?, lastval()) - ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = lastval() - """, - (instance_name,), - ) 
diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py
index 409d856ab9c..f511cccf95a 100644
--- a/tests/storage/test_id_generators.py
+++ b/tests/storage/test_id_generators.py
@@ -205,7 +205,6 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator:
                conn,
                self.db_pool,
                notifier=self.hs.get_replication_notifier(),
-                stream_name="test_stream",
                instance_name=instance_name,
                tables=[("foobar", "instance_name", "stream_id")],
                sequence_name="foobar_seq",
@@ -225,13 +224,6 @@ def _insert(txn: LoggingTransaction) -> None:
                    "INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)",
                    (instance_name,),
                )
-                txn.execute(
-                    """
-                    INSERT INTO stream_positions VALUES ('test_stream', ?, lastval())
-                    ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = lastval()
-                    """,
-                    (instance_name,),
-                )
 
         self.get_success(self.db_pool.runInteraction("_insert_rows", _insert))
 
@@ -249,13 +241,6 @@ def _insert(txn: LoggingTransaction) -> None:
                ),
            )
            txn.execute("SELECT setval('foobar_seq', ?)", (stream_id,))
-            txn.execute(
-                """
-                INSERT INTO stream_positions VALUES ('test_stream', ?, ?)
-                ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = ?
-                """,
-                (instance_name, stream_id, stream_id),
-            )
 
         self.get_success(self.db_pool.runInteraction("_insert_row_with_id", _insert))
 
@@ -604,6 +589,10 @@ def test_writer_config_change(self) -> None:
         self.assertEqual(id_gen.get_current_token_for_writer("first"), 3)
         self.assertEqual(id_gen.get_current_token_for_writer("second"), 5)
 
+        # We test gap closing at the bottom of this test; save this generator
+        # for later, since otherwise we cannot write to "first".
+        id_gen_1 = self._create_id_generator("first", writers=["first", "third"])
+
         # New config removes one of the configs. Note that if the writer is
         # removed from config we assume that it has been shut down and has
         # finished persisting, hence why the persisted upto position is 5.
@@ -628,17 +617,41 @@ def test_writer_config_change(self) -> None:
         async def _get_next_async() -> None:
             async with id_gen_3.get_next() as stream_id:
                 self.assertEqual(stream_id, 6)
+                # Actually write a row, or the position won't advance.
+                self._insert_row_with_id("third", stream_id)
 
         self.get_success(_get_next_async())
 
         self.assertEqual(id_gen_3.get_persisted_upto_position(), 6)
 
-        # If we add back the old "first" then we shouldn't see the persisted up
-        # to position revert back to 3.
+        # If we add back the old "first" then we should see the persisted up
+        # to position revert back to 3, as this writer hasn't written anything since.
         id_gen_5 = self._create_id_generator("five", writers=["first", "third"])
-        self.assertEqual(id_gen_5.get_persisted_upto_position(), 6)
-        self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6)
+        self.assertEqual(id_gen_5.get_persisted_upto_position(), 3)
+        self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 3)
         self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6)
 
+        # Gap closing.
+        # Check that writing what should be the next token (7) to "first"
+        # actually skips over the gap correctly.
+        async def _get_next_async_gap_closer() -> None:
+            # Recall that we saved the "first" writer above.
+            async with id_gen_1.get_next() as stream_id:
+                self.assertEqual(stream_id, 7)
+                # Actually write a row, or the position won't advance.
+                self._insert_row_with_id("first", stream_id)
+
+        self.get_success(_get_next_async_gap_closer())
+
+        # Since this is not a full Synapse homeserver test case, there is no
+        # replication. We have to create yet another ID generator to load the
+        # new values.
+        id_gen_6 = self._create_id_generator("six", writers=["first", "third"])
+
+        # Updating "first" does advance the position...
+        self.assertEqual(id_gen_6.get_persisted_upto_position(), 7)
+        # ...and both tokens are updated correctly.
+        self.assertEqual(id_gen_6.get_current_token_for_writer("first"), 7)
+        self.assertEqual(id_gen_6.get_current_token_for_writer("third"), 7)
+
     def test_sequence_consistency(self) -> None:
         """Test that we error out if the table and sequence diverges."""
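The token arithmetic behind those gap-closing assertions, traced by hand (plain Python; the row history assumed here follows the test's setup, where "first" wrote rows 1-3, "second" wrote 4-5, and "third" wrote 6):

```python
# After "first" is re-added without having written since, its recovered
# position is 3 while "third" sits at 6, so persisted-upto reverts to 3.
positions = {"first": 3, "third": 6}
assert min(positions.values()) == 3

# "first" then persists row 7, leaving rows 1..7 with no holes.
persisted_ids = {1, 2, 3, 4, 5, 6, 7}

# The persisted-upto position is the top of the gapless run from 1.
persisted_upto = 0
while persisted_upto + 1 in persisted_ids:
    persisted_upto += 1
assert persisted_upto == 7

# Every writer's token can therefore be reported as at least 7, which is
# what id_gen_6 asserts for both "first" and "third".
```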
@@ -752,7 +765,6 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator:
                conn,
                self.db_pool,
                notifier=self.hs.get_replication_notifier(),
-                stream_name="test_stream",
                instance_name=instance_name,
                tables=[("foobar", "instance_name", "stream_id")],
                sequence_name="foobar_seq",
@@ -773,13 +785,6 @@ def _insert(txn: LoggingTransaction) -> None:
                    instance_name,
                ),
            )
-            txn.execute(
-                """
-                INSERT INTO stream_positions VALUES ('test_stream', ?, ?)
-                ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = ?
-                """,
-                (instance_name, -stream_id, -stream_id),
-            )
 
         self.get_success(self.db_pool.runInteraction("_insert_row", _insert))
 
@@ -889,7 +894,6 @@ def _create(conn: LoggingDatabaseConnection) -> MultiWriterIdGenerator:
                conn,
                self.db_pool,
                notifier=self.hs.get_replication_notifier(),
-                stream_name="test_stream",
                instance_name=instance_name,
                tables=[
                    ("foobar1", "instance_name", "stream_id"),
@@ -906,7 +910,6 @@ def _insert_rows(
         table: str,
         instance_name: str,
         number: int,
-        update_stream_table: bool = True,
     ) -> None:
         """Insert N rows as the given instance, inserting with stream IDs pulled
         from the postgres sequence.
@@ -918,29 +921,18 @@ def _insert(txn: LoggingTransaction) -> None:
                "INSERT INTO %s VALUES (nextval('foobar_seq'), ?)" % (table,),
                (instance_name,),
            )
-            if update_stream_table:
-                txn.execute(
-                    """
-                    INSERT INTO stream_positions VALUES ('test_stream', ?, lastval())
-                    ON CONFLICT (stream_name, instance_name) DO UPDATE SET stream_id = lastval()
-                    """,
-                    (instance_name,),
-                )
 
         self.get_success(self.db_pool.runInteraction("_insert_rows", _insert))
 
     def test_load_existing_stream(self) -> None:
-        """Test creating ID gens with multiple tables that have rows from after
-        the position in `stream_positions` table.
-        """
+        """Test that creating ID gens with multiple tables loads positions correctly."""
         self._insert_rows("foobar1", "first", 3)
-        self._insert_rows("foobar2", "second", 3)
-        self._insert_rows("foobar2", "second", 1, update_stream_table=False)
+        self._insert_rows("foobar2", "second", 4)
 
         first_id_gen = self._create_id_generator("first", writers=["first", "second"])
         second_id_gen = self._create_id_generator("second", writers=["first", "second"])
 
-        self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 6})
+        self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
         self.assertEqual(first_id_gen.get_current_token_for_writer("first"), 7)
         self.assertEqual(first_id_gen.get_current_token_for_writer("second"), 7)
         self.assertEqual(first_id_gen.get_persisted_upto_position(), 7)
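For `test_load_existing_stream`, the expected numbers fall out of the recovery logic directly; a worked example in plain Python mirroring the asserts above:

```python
# "first" wrote rows 1-3 to foobar1; "second" wrote rows 4-7 to foobar2.
foobar1 = [(1, "first"), (2, "first"), (3, "first")]
foobar2 = [(4, "second"), (5, "second"), (6, "second"), (7, "second")]

# Per-table MAX(stream_id) GROUP BY instance_name, merged across tables:
positions = {}
for table in (foobar1, foobar2):
    for stream_id, instance in table:
        positions[instance] = max(positions.get(instance, 0), stream_id)
assert positions == {"first": 3, "second": 7}

# persisted_upto starts at min(3, 7) == 3; rows 4-7 then form a gapless run,
# so it (and hence every writer's reported token) advances to 7.
persisted_upto = min(positions.values())
for stream_id in sorted(s for s, _ in foobar1 + foobar2):
    if stream_id == persisted_upto + 1:
        persisted_upto = stream_id
assert persisted_upto == 7
```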