WIP distributed FATE #3964
base: main
Conversation
Adds support to FATE for running multiple instances in different processes. Three major changes were made to support this. First, reserving a fate operation was moved from memory to ZooKeeper. Second, support for partitioning operations between multiple FATE instances was added. Partitioning is accomplished via a new class at the manager level named PartitionData. This class provides the total number of manager processes and the unique number assigned to a given process. Third, a new thread was added to FATE instances whose sole job is finding work.

This change supports multiple FATE instances, but does not make any changes to the manager to make this an actuality. The manager would need broader changes to support multiple manager processes. When those changes are made, FATE can simply be passed a PartitionData supplier. This change is one part of a larger set of changes to support running multiple manager processes.

Extensive changes were made to the internal interface used to store FATE data. These changes were required to move reservation data from in-memory data structures to ZooKeeper. The interface was modified to return a new per-fate-operation type. With the old interface, one continually passed around a fate operation id. Now the following is done instead.

```java
FatesStore allFates = ...;
FateStore individualFateStore = allFates.reserve(123);
individualFateStore.setState(FAILED);
individualFateStore.unreserve();
```

With the above changes to the FATE storage interface, it is easy to have an internal UUID associated with the reserved FATE operation; this was not possible with the old interface. This change also made it easier to handle deleting a reserved fate operation.

The way operations to work on are found was changed substantially in this PR. There is a new single thread that looks for work using the partitioning information. If this thread does not find any work, it starts doing exponential backoff. This change was made for two reasons: first, to avoid all threads scanning the FATE data store looking for work; second, now that there are multiple processes adding and running FATE ops, a process cannot rely on in-memory signals to know when to look for work. Having a single thread poll with exponential backoff is an efficient way to find work in this new scenario. If there are 10 FATE instances each with 32 threads, this avoids having 320 threads scanning the store looking for work; instead only 10 threads would be scanning. Eventually, when FATE supports storing data in an Accumulo table, this scanning can use range partitioning, making it very efficient. The scanning done in the ZooKeeper store in this PR uses hash partitioning, so each FATE instance scans all data and filters out anything outside of its partition.

To get an overview of the functionality these changes offer, look at the new MultipleFateInstancesIT.testMultipleFateInstances() test.

FATE's age-off functionality was dropped in this PR. The functionality is still needed, however the way it was implemented would not work well for multiple instances of FATE. Rather than attempt to change something that needed to be reimplemented, I just removed it for now. I think it will be cleaner to add a new implementation later.

Future work

* Multiple managers that start FATE and pass in partition data
* Rework admin tools to provide information about locations where fate ops have run and are running
* Billion splits test. When we can have multiple managers with 1000s of total fate threads, run a test on a small cluster to create a table with 1 billion+ tablets. Would probably also need FATE data stored in an Accumulo table for this test.
* Re-implement the age-off functionality in such a way that it does not store data in memory
* Implement finding FATE ops that are reserved by dead processes and remove the reservation
* Fate metrics and upgrade code may have been broken by these changes; may fix that in this PR
* Maybe other functional services running in the manager like TGW can use the PartitionData class to distribute work.
* Within FATE operations, singleton manager services are used, like the event coordinator. This will need to be addressed to support multiple managers.
* The thread looking for work should consider the minimum deferment time when deciding how long to sleep in the case where nothing was found.
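As a rough sketch of the partitioning and work-finding approach described above, the code below shows one possible shape for a PartitionData-style class, a hash-partition filter, and a single polling thread with exponential backoff. All names, methods, and constants here are illustrative assumptions, not the actual classes in this PR.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch only; the real PartitionData and work-finder code in the PR may differ.
public class PartitionedWorkFinderSketch {

  static class PartitionData {
    final int partitionIndex;   // unique number assigned to this manager process
    final int totalPartitions;  // total number of manager processes

    PartitionData(int partitionIndex, int totalPartitions) {
      this.partitionIndex = partitionIndex;
      this.totalPartitions = totalPartitions;
    }

    // Hash partitioning: keep only the transaction ids that fall in this partition.
    boolean shouldProcess(long fateTxId) {
      return Math.floorMod(Long.hashCode(fateTxId), totalPartitions) == partitionIndex;
    }
  }

  interface FateTxScanner {
    List<Long> scanAllTxIds(); // stand-in for scanning the FATE store
  }

  // Single thread polling for work, backing off exponentially when nothing is found.
  static void findWork(FateTxScanner scanner, PartitionData partition, Queue<Long> workQueue)
      throws InterruptedException {
    long sleepMillis = 50;
    while (!Thread.currentThread().isInterrupted()) {
      boolean foundWork = false;
      for (long txId : scanner.scanAllTxIds()) {
        if (partition.shouldProcess(txId)) {
          workQueue.add(txId);
          foundWork = true;
        }
      }
      if (foundWork) {
        sleepMillis = 50; // reset backoff once work is found
      } else {
        sleepMillis = Math.min(sleepMillis * 2, 30_000); // exponential backoff, capped
      }
      TimeUnit.MILLISECONDS.sleep(sleepMillis);
    }
  }
}
```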
I haven't looked at the code yet, but this implies that we have the ability to have multiple managers. If we are not going to have multiple managers, then these changes are not necessary. If we are going to support multiple managers, then it might make sense to finish #3262 and get it merged first. There are modifications in that PR that will have a direct impact on this PR. #3262 is currently in conflict; I'll work on getting it back in a good state.
Yeah, will definitely need #3262 to make this work overall. When looking at this, take a look at the new test MultipleFateInstancesIT.testMultipleFateInstances() and PartitionData. Currently the manager always returns a PartitionData(0,1) in this PR. When there are multiple managers, the manager will need to compute the partition data. Thinking it can get the list of all managers from ZK and sort them, then find its position in the sorted list to compute this. Hoping we can push this partition data object to other things in the manager like TGW and the compaction coordinator, and they can all use it to decide how to behave in the multiple-manager case.
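A minimal sketch of that idea, assuming each manager registers a node under a hypothetical /managers path in ZooKeeper; the path, node naming, and return shape are assumptions for illustration.

```java
import java.util.Collections;
import java.util.List;
import org.apache.zookeeper.ZooKeeper;

// Rough sketch of deriving partition data from ZooKeeper; the "/managers" path
// and node naming are assumptions, not the actual layout used by the manager.
public class PartitionFromZkSketch {
  // Returns {partition index of this manager, total number of managers}.
  public static int[] computePartition(ZooKeeper zk, String myNodeName) throws Exception {
    List<String> managers = zk.getChildren("/managers", false);
    Collections.sort(managers);
    int index = managers.indexOf(myNodeName);
    return new int[] {index, managers.size()};
  }
}
```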
One reason I am hoping this can be done is to simplify the manager code. We have been moving code that is specific to TGW from the manager class into TGW as we have changed functionality. Would like to continue this migration of code out of the manager into the functional components it runs. Hoping eventually the manager can have little code other than creating functional components like TGW, the compaction coordinator, fate, etc. and wiring them together. Hoping the partition data concept can help with this, as how different components partition their work/data will differ. But not sure if it will actually work at this point.
Dealing with the merge conflicts on #3262 was a pain. I decided to rebase instead. It's up to date and working at this point. I need to refresh myself more on it as I submitted it over 6 months ago.
Refactored the storage layer of FATE to return an object when reserving a fate transaction. This object allows mutating the storage related to that FATE transaction. This replaces methods where the fate transaction id had to always be passed. These changes are a subset of the changes in #3964, but only focusing on the storage layer refactoring and nothing else. Co-authored-by: Christopher L. Shannon (cshannon) <[email protected]>
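As a rough sketch of the storage-layer shape this describes, reserving returns a per-transaction object whose methods mutate only that transaction; the interface and method names below are illustrative assumptions, not the actual API in the PR.

```java
// Illustrative sketch only; the real interface and method names in the PR may differ.
interface FatesStoreSketch {
  // Old style passed the fate transaction id to every storage method;
  // the new style reserves once and gets back a per-transaction object.
  ReservedFateTxSketch reserve(long fateTxId);
}

interface ReservedFateTxSketch {
  void setStatus(String status); // mutates storage for the reserved transaction only
  void delete();                 // deleting a reserved transaction becomes a local operation
  void unreserve();              // releases the reservation
}
```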
This change modifies FATE to use a single thread to find work. It also cleans up some of the signaling between threads in FATE and fixes a synchronization bug in FATE that was introduced in #4017. The bug introduced in #4017 is that some things are synchronizing on the wrong object because a new inner class was introduced. These changes were pulled from #3964 and cleaned up and improved.
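The class of bug described here can be sketched generically: when code moves into a new inner class, an unqualified `synchronized (this)` starts locking the inner instance instead of the object the rest of the code locks on. This is a generic Java illustration, not the actual FATE code.

```java
// Generic illustration of the class of synchronization bug described above.
public class Outer {
  private final Object workQueueLock = new Object();

  class Worker {
    void take() {
      // Bug: this locks the Worker instance, not the lock other Outer code uses.
      synchronized (this) {
        // ... mutate shared state ...
      }
      // Fix: synchronize on the shared lock object (or Outer.this) instead.
      synchronized (workQueueLock) {
        // ... mutate shared state ...
      }
    }
  }
}
```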