WIP distributed FATE #3964

Draft
wants to merge 14 commits into main

Conversation

keith-turner
Contributor

Adds support to FATE for running multiple instances in different processes. Three major changes were made to support this. First, reserving a fate operation was moved from memory to zookeeper. Second, support for partitioning operations between multiple FATE instances was added. Partitioning is accomplished via a new class at the manager level named PartitionData. This class provides the total number of manager processes and the unique number assigned to a given process. Third, a new thread was added to fate instances whose sole job is finding work.
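
A rough sketch of what a PartitionData-style value could hold, based only on the description above; the method names and the hash-based shouldProcess helper are assumptions for illustration, not the exact class added in this PR.

```java
// Hypothetical sketch of a PartitionData-like value; names are illustrative
// assumptions, not the class added in this PR.
public class PartitionData {

  private final int partition;        // unique number assigned to this manager process
  private final int totalPartitions;  // total number of manager processes

  public PartitionData(int partition, int totalPartitions) {
    if (totalPartitions <= 0 || partition < 0 || partition >= totalPartitions) {
      throw new IllegalArgumentException(
          "invalid partition " + partition + " of " + totalPartitions);
    }
    this.partition = partition;
    this.totalPartitions = totalPartitions;
  }

  public int getPartition() {
    return partition;
  }

  public int getTotalPartitions() {
    return totalPartitions;
  }

  /** Hash partitioning: decide if this process should work on the given fate op id. */
  public boolean shouldProcess(long fateTxId) {
    return Math.floorMod(Long.hashCode(fateTxId), totalPartitions) == partition;
  }
}
```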

This change supports multiple fate instances, but does not make any changes to the manager to make this an actuality. The manager would need broader changes to support multiple manager processes. When those changes are made, FATE can simply be passed a PartitionData supplier. This change is one part of a larger set of changes to support running multiple manager processes.

Extensive changes were made to the internal interface used to store FATE data. These changes were required to move reservation data from in-memory data structures to zookeeper. The interface was modified so that reserving returns a new per-operation type. With the old interface, a fate operation id had to be continually passed around. Now the following is done instead.

```java
FatesStore allFates = ...;
FateStore individualFateStore = allFates.reserve(123);
individualFateStore.setState(FAILED);
individualFateStore.unreserve();
```

With the above changes to the FATE storage interface, it's easy to have an internal UUID associated with the reserved FATE operation; this was not possible with the old interface. This change also made it easier to handle deleting a reserved fate operation.
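
To make the UUID point concrete, here is an illustrative shape for the reservation-object style of interface, using the names from the example above. The method names, such as listRunnable() and getReservationUUID(), are assumptions and are not taken verbatim from the PR.

```java
import java.util.Collection;
import java.util.UUID;

// Illustrative sketch only; not the PR's exact API.
interface FatesStore {

  /** Reserve a fate operation, recording the reservation in zookeeper. */
  FateStore reserve(long fateTxId);

  /** List ids of fate operations that are ready to run (assumed method). */
  Collection<Long> listRunnable();
}

interface FateStore {

  long getTxId();

  /**
   * A unique id generated for this reservation and stored with it in
   * zookeeper. It distinguishes this process's reservation from a stale one
   * left by a dead process and simplifies deleting a reserved operation.
   */
  UUID getReservationUUID();

  void setState(TxState state);

  void delete();

  void unreserve();
}

enum TxState {
  NEW, IN_PROGRESS, FAILED, SUCCESSFUL
}
```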

The way operations to work on are found was changed substantially in this PR. There is a new single thread that looks for work using the partitioning information. If this thread does not find any work, it starts doing exponential backoff. This change was made for two reasons. First, to avoid all threads scanning the FATE data store looking for work. Second, now that there are multiple processes adding and running FATE ops, a process cannot rely on in-memory signals to know when to look for work. So having a single thread poll with exponential backoff is an efficient way to find work in this new scenario. If there are 10 fate instances, each with 32 threads, this avoids having 320 threads scanning the store looking for work; instead only 10 threads would be scanning in this scenario. Eventually, when fate supports storing data in an Accumulo table, this scanning can use range partitioning, making it very efficient. The scanning done in the zookeeper store in this PR uses hash partitioning, so each fate instance scans all data and filters out things outside of its partition.
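
A rough sketch of such a single work-finder loop, building on the PartitionData and FatesStore sketches above; the collaborators (listRunnable(), the PartitionData supplier, the hand-off queue) are assumptions for illustration, not the PR's exact classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Sketch of the single thread that finds runnable fate ops for this instance.
class WorkFinder implements Runnable {

  private static final long MIN_BACKOFF_MS = 50;
  private static final long MAX_BACKOFF_MS = 30_000;

  private final FatesStore allFates;
  private final Supplier<PartitionData> partitionSupplier;
  private final BlockingQueue<Long> workQueue; // consumed by the worker threads

  WorkFinder(FatesStore allFates, Supplier<PartitionData> partitionSupplier,
      BlockingQueue<Long> workQueue) {
    this.allFates = allFates;
    this.partitionSupplier = partitionSupplier;
    this.workQueue = workQueue;
  }

  @Override
  public void run() {
    long backoffMs = MIN_BACKOFF_MS;
    try {
      while (!Thread.currentThread().isInterrupted()) {
        PartitionData pd = partitionSupplier.get();
        boolean foundWork = false;
        // Hash partitioning: scan everything in the store and keep only the
        // fate ops that fall in this instance's partition.
        for (Long txId : allFates.listRunnable()) {
          if (pd.shouldProcess(txId)) {
            workQueue.put(txId); // blocks if the worker threads are all busy
            foundWork = true;
          }
        }
        if (foundWork) {
          backoffMs = MIN_BACKOFF_MS;
        } else {
          // Nothing found: sleep with exponential backoff instead of busy polling.
          TimeUnit.MILLISECONDS.sleep(backoffMs);
          backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}
```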

To get an overview of the functionality these changes offer, look at the new MultipleFateInstancesIT.testMultipleFateInstances() test.

FATE's age-off functionality was dropped in this PR. The functionality is still needed, however the way it was implemented would not work well for multiple instances of FATE. Rather than attempt to change something that needed to be reimplemented, I just removed it for now. I think it will be cleaner to add a new implementation later.

Future work

  • Multiple managers that start fate and pass in partition data
  • Rework admin tools to provide information about locations where fate ops have run and are running
  • Billion splits test. When we can have multiple managers with 1000s of total fate threads, run a test on a small cluster to create a table with 1 billion+ tablets. Would probably also need FATE data stored in an Accumulo table for this test.
  • Re-implement the age off functionality in such a way that it does not store data in memory
  • Implement finding FATE ops that are reserved by dead processes and remove the reservation
  • Fate metrics and upgrade code may have been broken by these changes; that may be fixed in this PR
  • Maybe other functional services running in the manager like TGW can use the PartitionData class to distribute work.
  • Within FATE operations, singleton manager services are used like the event coordinator. This will need to be addressed to support multiple managers.
  • The thread looking for work should consider the minimum deferment time when deciding how long to sleep in the case where nothing was found.

@dlmarion
Contributor

I haven't looked at the code yet, but this implies that we have the ability to have multiple managers. If we are not going to have multiple managers, then these changes are not necessary. If we are going to support multiple managers, then it might make sense to finish #3262 and get it merged first. There are modifications in that PR that will have a direct impact on this PR. #3262 is currently in conflict, I'll work on getting it back in a good state.

@keith-turner
Contributor Author

> I haven't looked at the code yet, but this implies that we have the ability to have multiple managers. If we are not going to have multiple managers, then these changes are not necessary. If we are going to support multiple managers, then it might make sense to finish #3262 and get it merged first. There are modifications in that PR that will have a direct impact on this PR. #3262 is currently in conflict, I'll work on getting it back in a good state.

Yeah, we will definitely need 3262 to make this work overall. When looking at this, take a look at the new test MultipleFateInstancesIT.testMultipleFateInstances() and PartitionData. Currently the manager always returns a PartitionData(0,1) in this PR. When there are multiple managers, the manager will need to compute the partition data. Thinking it can get the list of all managers from ZK and sort them, then find its position in the sorted list to compute this. Hoping we can push this partition data object to other things in the manager, like TGW and the compaction coordinator, and they can all use it to decide how to behave in the multiple manager case.
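
A hedged sketch of the approach described here, reusing the PartitionData sketch from the description above: list the manager nodes from ZooKeeper, sort them, and use this process's position as its partition. The zookeeper path and node-name arguments are assumptions for illustration.

```java
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// Sketch only; the path layout and node naming are assumed, not taken from the PR.
class PartitionDataSupplier {

  static PartitionData compute(ZooKeeper zk, String managersPath, String myNodeName)
      throws KeeperException, InterruptedException {
    // Each running manager is assumed to register a child node under managersPath.
    List<String> managers = zk.getChildren(managersPath, false);
    Collections.sort(managers);

    int myPosition = managers.indexOf(myNodeName);
    if (myPosition < 0) {
      throw new IllegalStateException("This manager is not registered under " + managersPath);
    }

    // Position in the sorted list is this manager's partition; the list size
    // is the total number of manager processes.
    return new PartitionData(myPosition, managers.size());
  }
}
```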

@keith-turner
Contributor Author

> Hoping we can push this partition data object to other things in the manager, like TGW and the compaction coordinator, and they can all use it to decide how to behave in the multiple manager case.

One reason I am hoping this can be done is to simplify the manager code. We have been moving code that is specific to TGW from the manager class into TGW as we have changed functionality. Would like to continue this migration of code out of the manager into the functional components it runs. Hoping eventually the manager can have little code other than creating functional components like TGW, compaction coordinator, fate, etc. and wiring them together. Hoping the partition data concept can help with this, as how different components partition their work/data will differ. But not sure if it will actually work at this point.

@dlmarion
Contributor

Dealing with the merge conflicts on #3262 was a pain. I decided to rebase instead. It's up to date and working at this point. I need to refresh myself more on it as I submitted it over 6 months ago.

keith-turner added a commit to keith-turner/accumulo that referenced this pull request Dec 4, 2023
Refactored the storage layer of FATE to return an object when reserving
a fate transaction.  This object allows mutating the storage related to
that FATE transaction.  This replaces methods where the fate transaction
id had to always be passed.

These changes are a subset of the changes in apache#3964, but only focusing on
the storage layer refactoring and nothing else.
keith-turner added a commit that referenced this pull request Dec 6, 2023
Refactored the storage layer of FATE to return an object when reserving
a fate transaction.  This object allows mutating the storage related to
that FATE transaction.  This replaces methods where the fate transaction
id had to always be passed.

These changes are a subset of the changes in #3964, but only focusing on
the storage layer refactoring and nothing else.

Co-authored-by: Christopher L. Shannon (cshannon) <[email protected]>
keith-turner added a commit to keith-turner/accumulo that referenced this pull request Dec 7, 2023
This change modifies FATE to use a single thread to find work.  It also
cleans up some of the signaling between threads in FATE and fixes a
synchronization bug in FATE that was introduced in apache#4017.  The bug
introduced in apache#4017 is that some things are synchronizing on the wrong
object because a new inner class was introduced.

These changes were pulled from apache#3964 and cleaned up and improved.
keith-turner added a commit that referenced this pull request Dec 8, 2023
This change modifies FATE to use a single thread to find work.  It also
cleans up some of the signaling between threads in FATE and fixes a
synchronization bug in FATE that was introduced in #4017.  The bug
introduced in #4017 is that some things are synchronizing on the wrong
object because a new inner class was introduced.

These changes were pulled from #3964 and cleaned up and improved.
@ctubbsii ctubbsii added this to the 4.0.0 milestone Jul 12, 2024
@dlmarion dlmarion changed the base branch from elasticity to main August 26, 2024 12:17