
WIP: VReplication parallel copy #8934

Closed
wants to merge 24 commits

Conversation

shlomi-noach
Contributor

Follow-up to #8056

This is an initial attempt at parallelizing the VReplication copy; the main applicability is for MoveTables or Migrate, with multiple tables involved.

Recap

As a quick recap of "The general VReplication flow" in #8056:

  • VReplication currently only ever copies one table at a time

  • Rows are read by rowstreamer using a LOCK TABLES READ + get GTID + START TRANSACTION WITH CONSISTENT SNAPSHOT + UNLOCK TABLES

    • from there on, table rows are read in an open transaction
  • vcopier invokes the above via gRPC, receives the rows, writes them to the target table, and updates copy_state

  • catchup and fast forward steps follow, applying events from the binary log

    • to save resources, VReplication does not apply changes to rows that are not yet within the copied range

Why parallelize and what's the premise

Trivially, we want to parallelize the copy to save time; we have seen mass imports of data take as long as weeks.

Parallelization can occur in two places:

  • By reading multiple tables concurrently in rowstreamer
  • By writing to multiple tables concurrently in vcopier
    • Note: parallelizing writes to a single table can also yield gains when the table does not have an AUTO_INCREMENT column. If an AUTO_INCREMENT column exists, basic tests show almost no gain with 2+ concurrent writers. We wish to focus on multi-table concurrency.

We know gRPC is a major source of overhead, so we want to avoid multiple gRPC calls. We also assume that parallel data transfer across the network is not faster than serial data transfer, provided we are able to keep the network busy/utilized.

How not to parallelize VReplication copy

We do not take the current behavior and simply run it n times in parallel. If we did that:

  • We'd have n gRPCs
  • We'd create n LOCK TABLES statements
  • We'd pull n times the same binary log events
  • There'd be n VPlayers processing those duplicate events

Proposed solution

We want a single gRPC call that parallelizes into n workers on both ends: rowstreamer and vcopier; and we want a single vplayer to process all binlog events.

gRPC

We add a VStreamRowsParallel() function, with a VStreamRowsParallelRequest message. In essence, it is similar to VStreamRows, but:

  • It provides multiple queries
  • It provides multiple lastPk values

The queries and the lastPK values correspond to each other positionally.

VStreamRowsResponse is extended to include TableName. vcopier will need this to differentiate between responses of different queries/tables.
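
For illustration only, here is roughly what the new shapes look like when expressed as Go structs; the real definitions would live in the protobufs, and the field names here are assumptions rather than the final API:

```go
// Hypothetical Go-side shape of the new gRPC messages; illustrative only.
package sketch

// VStreamRowsParallelRequest carries one query and one lastPk per table,
// where Queries[i] corresponds to LastPKs[i].
type VStreamRowsParallelRequest struct {
	Queries []string // one SELECT per table to be copied
	LastPKs [][]byte // serialized last-seen PK per query; empty means "start from the beginning"
}

// VStreamRowsResponse gains a TableName so vcopier can route each batch
// of rows to the worker handling that table.
type VStreamRowsResponse struct {
	TableName string   // new: which table these rows belong to
	Rows      [][]byte // row payloads (placeholder for the real Row type)
	Gtid      string   // GTID captured under the table lock (sent on the first response)
}
```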

rowstreamer

  • Receives a VStreamRowsParallel request with multiple queries
  • Creates plans for all queries
  • Generates a sendQuery for each query
  • Creates n+1 DB connections: one for each table/plan, plus one global connection that takes the lock
  • Runs a single LOCK TABLES t1 READ, t2 READ, t3 READ, ... query
  • Captures the GTID
  • Calls back to the streamer
    • A new goroutine is created for each table/plan
    • Each has its own connection, issuing a START TRANSACTION WITH CONSISTENT SNAPSHOT
  • Runs UNLOCK TABLES
  • Each table/plan proceeds to SELECT FROM t(i) concurrently. Each maintains its own pktsize
  • Each calls back to the "main" orchestrating mechanism to send rows. This is serialized. (A rough sketch of this flow follows the list.)
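
Here is a minimal sketch of that choreography, assuming hypothetical helpers (Conn, newConn, readLockClause, streamTable) in place of the actual rowstreamer plumbing, and the VStreamRowsResponse shape from the earlier sketch; it is meant to show the lock/snapshot ordering and the serialized send, not the real implementation:

```go
package sketch

import (
	"context"
	"sync"
)

// streamRowsParallel sketches the parallel rowstreamer flow described above.
func streamRowsParallel(ctx context.Context, tables []string, send func(*VStreamRowsResponse) error) error {
	lockConn, err := newConn(ctx)
	if err != nil {
		return err
	}
	defer lockConn.Close()

	// One lock statement covering every table in the batch.
	if err := lockConn.Exec(ctx, "LOCK TABLES "+readLockClause(tables)); err != nil {
		return err
	}
	// Capture the GTID while the lock is held.
	gtid, err := lockConn.CurrentGTID(ctx)
	if err != nil {
		return err
	}

	// One connection and one consistent snapshot per table, opened under the lock.
	conns := make([]*Conn, len(tables))
	for i := range tables {
		conn, err := newConn(ctx)
		if err != nil {
			return err
		}
		if err := conn.Exec(ctx, "START TRANSACTION WITH CONSISTENT SNAPSHOT"); err != nil {
			return err
		}
		conns[i] = conn
	}

	// All snapshots are established; the lock is no longer needed.
	if err := lockConn.Exec(ctx, "UNLOCK TABLES"); err != nil {
		return err
	}

	// Stream all tables concurrently. send() is wrapped in a mutex so only one
	// goroutine writes to the gRPC stream at a time (the serialized callback).
	var mu sync.Mutex
	serializedSend := func(resp *VStreamRowsResponse) error {
		mu.Lock()
		defer mu.Unlock()
		return send(resp)
	}

	var wg sync.WaitGroup
	errs := make(chan error, len(tables))
	for i, table := range tables {
		wg.Add(1)
		go func(conn *Conn, table string) {
			defer wg.Done()
			errs <- streamTable(ctx, conn, table, gtid, serializedSend)
		}(conns[i], table)
	}
	wg.Wait()
	close(errs)
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}
```

The point of the single LOCK TABLES statement is that every per-table snapshot is opened while the lock is held, so all of them correspond to the same captured GTID.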

vreplication/vcopier

Much of the logic is already implicitly supported, by virtue of the copy_state backend table. vcopier supports multiple tables in a workflow (as MoveTables supports the -all flag), and so catchup/fastforward already know how to handle the existence of multiple plans and multiple lastPks.

To simplify things, we will parallelize by running batches of n tables at a time. This can, and will, cause fragmentation: one or two of the tables will be larger than the others, and some tables will complete first, but the batch only completes when all n tables are processed.

We do it this way because it is what allows us to take a single table lock for all the tables involved, and to keep our sanity while reasoning about the GTID value.

The best approach would be to use a greedy algorithm: pick tables by size, descending. This parallelizes more tables of similar size at a time, which optimizes for less fragmentation (fragmentation == time wasted not parallelizing when we have an available slot).
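
A minimal sketch of that greedy batching, assuming we already have per-table size estimates (e.g. from information_schema); the names here are illustrative:

```go
package sketch

import "sort"

// tableInfo pairs a table name with a size estimate (e.g. a row-count estimate
// from information_schema); it only needs to be good enough for ordering.
type tableInfo struct {
	name string
	size int64
}

// greedyBatches orders tables by size, descending, and cuts them into batches
// of at most n. Each batch groups tables of roughly similar size, which reduces
// the time a copy slot sits idle waiting for a much larger table to finish.
func greedyBatches(tables []tableInfo, n int) [][]tableInfo {
	sorted := make([]tableInfo, len(tables))
	copy(sorted, tables)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].size > sorted[j].size })

	var batches [][]tableInfo
	for len(sorted) > 0 {
		k := n
		if k > len(sorted) {
			k = len(sorted)
		}
		batches = append(batches, sorted[:k])
		sorted = sorted[k:]
	}
	return batches
}
```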

vcopier needs to pick n tables at a time, compute plans for all of those tables, and invoke VStreamRowsParallel. Possibly there will already be a lastPk for some of those tables; this is trivially read from copy_state, with no significant changes other than reorganizing the data.

We will create n workers. Ideally, each worker will write to a different table (we have n tables), but it is possible that rowstreamer sends results from one table more frequently than from the others. The logic to parallelize the writes on the vcopier side is not trivial, I think. We can allow for occasional parallelization of writes to the same table if that simplifies the code.

vcopier needs to both parallelize the writes (via goroutines) and, at the same time, be able to respond to the send function with an error result. As I'm writing this, the problem grows more complex in my mind... :( One possible shape is sketched below.
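
A sketch of one possible shape, under the assumption that responses are routed by TableName to a per-table worker, and that a hypothetical applyRows helper does the actual write plus the copy_state update; this illustrates how the send callback can still return an error, and is not the actual vcopier code:

```go
package sketch

import (
	"context"
	"sync"
)

// copyWorkers gives each table its own worker goroutine and channel; the send
// callback routes a response by TableName and reports the first apply error.
type copyWorkers struct {
	mu       sync.Mutex
	firstErr error
	chans    map[string]chan *VStreamRowsResponse
	wg       sync.WaitGroup
}

func newCopyWorkers(ctx context.Context, tables []string) *copyWorkers {
	cw := &copyWorkers{chans: make(map[string]chan *VStreamRowsResponse)}
	for _, table := range tables {
		ch := make(chan *VStreamRowsResponse, 1)
		cw.chans[table] = ch
		cw.wg.Add(1)
		go func(table string, ch chan *VStreamRowsResponse) {
			defer cw.wg.Done()
			for resp := range ch {
				// applyRows is a hypothetical helper: writes rows, updates copy_state.
				if err := applyRows(ctx, table, resp); err != nil {
					cw.setErr(err)
					return
				}
			}
		}(table, ch)
	}
	return cw
}

func (cw *copyWorkers) setErr(err error) {
	cw.mu.Lock()
	defer cw.mu.Unlock()
	if cw.firstErr == nil {
		cw.firstErr = err
	}
}

// send is the callback handed to the gRPC stream: it fails fast if any worker
// has already failed, otherwise hands the response to that table's worker.
func (cw *copyWorkers) send(resp *VStreamRowsResponse) error {
	cw.mu.Lock()
	err := cw.firstErr
	cw.mu.Unlock()
	if err != nil {
		return err
	}
	cw.chans[resp.TableName] <- resp
	return nil
}

// wait closes all worker channels, waits for the workers to drain, and returns
// the first error, if any, so the whole batch can be failed and retried.
func (cw *copyWorkers) wait() error {
	for _, ch := range cw.chans {
		close(ch)
	}
	cw.wg.Wait()
	cw.mu.Lock()
	defer cw.mu.Unlock()
	return cw.firstErr
}
```

Returning an error from send gives the rowstreamer a signal to abort the stream for the whole batch rather than keep pushing rows that nobody will apply.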

Then, calls to catchup/fastforward converge again; there's only one thread to run those.


Initial PR status:

  • New gRPC and collateral interfaces are implemented
  • Parallelization is implemented on the streamer side via parallel_rowstreamer.go and single_rowstreamer.go
  • Work has begun on parallel_vcopier.go; there is an initial refactor to support multiple concurrent plans; there is no parallelization yet, and vplayer needs to be extracted/encapsulated.

Checklist

  • Should this PR be backported?
  • Tests were added or are not required
  • Documentation was added or is not required

cc @rohit-nayak-ps @sougou @deepthi ; no need for code review right now, though you're welcome to, of course.

@shlomi-noach added the Type: Enhancement and Component: VReplication labels on Oct 5, 2021
@shlomi-noach
Contributor Author

Updates:

  • Calls to the send() function in parallel_vstreamer.go are now concurrent; serialization did not make sense, because we are waiting on parallel execution and return codes from vcopier
  • vcopier now applies vstream responses concurrently. Invocations of vplayer/fastforward are serialized; catchup is outside the concurrency logic anyway, and is obviously serialized.
  • I've actually activated the parallel vcopier as the default in vreplication.go; let's see if all the tests break.

@shlomi-noach
Contributor Author

Whoa. All pre-existing tests are passing with the new logic.

I'll start crafting specialized tests.

@shlomi-noach
Contributor Author

Of course all tests passed. They weren't running the new flow after all.

@shlomi-noach
Contributor Author

Most test failures right now seem to originate, again, from the type of testing we do: our endtoend tests look for a specific sequence of queries, and parallelism has now ruined it all...

@shlomi-noach
Contributor Author

Thoughts on the design are welcome

@deepthi
Member

deepthi commented Oct 20, 2021

Any guesses / predictions as to how this might affect the memory usage on the vttablets? Both source and target.

@shlomi-noach
Contributor Author

Great question. It will add n*vstream_packet_size of buffering on the vstreamer; the default vstream_packet_size is 250000, so with n=4, for example, that's roughly an extra 1MB of buffered rows.
Probably the same amount of memory on the vcopier/target side.
I'm trying to think whether there's anything else that is obvious, but I can't find anything else.

@github-actions
Contributor

This PR is being marked as stale because it has been open for 30 days with no activity. To rectify, you may do any of the following:

  • Push additional commits to the associated branch.
  • Remove the stale label.
  • Add a comment indicating why it is not stale.

If no action is taken within 7 days, this PR will be closed.

@github-actions bot added the Stale label on Jul 16, 2022
@shlomi-noach removed the Stale label on Jul 17, 2022
@github-actions
Contributor

This PR is being marked as stale because it has been open for 30 days with no activity. To rectify, you may do any of the following:

  • Push additional commits to the associated branch.
  • Remove the stale label.
  • Add a comment indicating why it is not stale.

If no action is taken within 7 days, this PR will be closed.

@github-actions bot added the Stale label on Aug 24, 2022
@github-actions
Contributor

This PR was closed because it has been stale for 7 days with no activity.

@github-actions bot closed this on Aug 31, 2022