Throttle endorsement and deliver service #529
Conversation
Force-pushed from a6bc344 to 82cb54c
common/deliver/deliver.go
Outdated
// A goroutine is pinned to an OS thread for file I/O.
// Throttling limits the number of OS threads created when getting blocks from the ledger.
if h.ThrottleSemaphore != nil {
	if err := h.ThrottleSemaphore.Acquire(ctx); err != nil {
I remember we had a no-op semaphore somewhere, didn't we? Can't we inject it instead and then we don't need to do this if statement here?
Add a no-op semaphore.
core/endorser/endorser.go
Outdated
@@ -368,6 +372,15 @@ func (e *Endorser) ProcessProposalSuccessfullyOrError(up *UnpackedProposal) (*pb
	txParams.HistoryQueryExecutor = hqe
}

// A goroutine is pinned to an OS thread for file I/O.
// Throttle the concurrent calls to limit the number of OS threads created for file I/O (via ledger APIs).
if e.ThrottleSemaphore != nil {
ditto the above comment
Add a no-op semaphore.
@@ -334,7 +336,9 @@ func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedPro
	return pResp, nil
}

// ProcessProposalSuccessfullyOrError processes the proposal and returns a response or an error.
func (e *Endorser) ProcessProposalSuccessfullyOrError(up *UnpackedProposal) (*pb.ProposalResponse, error) {
There is an alternative, and in my opinion cleaner, way of doing this for the endorser gRPC:
We already have several authentication filters, which are essentially a chain of endorsement functions.
Line 386 in f20eeb4
name: ExpirationCheck # This filter checks identity x509 certificate expiration
To add a new built-in filter, we:
- Put a construction of it in https://github.com/hyperledger/fabric/blob/master/core/handlers/library/library.go, similarly to the existing functions
- Append it to the handlers.authFilters list in core.yaml
Alternatively, we can always manually append it to the list at construction time, like we do here:
https://github.com/hyperledger/fabric/blob/master/internal/peer/node/start.go#L835
This leaves the endorser code intact.
We will not use the filter approach because it is better to throttle as little code as necessary; otherwise, it may unnecessarily limit concurrency.
Force-pushed from 3c82ab0 to 29d4c4d
/ci-run
AZP build triggered!
common/deliver/deliver.go
Outdated
// Throttling limits the number of OS threads created when getting blocks (file I/O) from the ledger.
if err := h.ThrottleSemaphore.Acquire(ctx); err != nil {
	logger.Errorf("[channel: %s] Failed to acquire semaphore to get a block", chdr.ChannelId)
	return cb.Status_BAD_REQUEST, errors.WithMessage(err, "failed to acquire semaphore to get a block")
I'm not sure that this should be Status_BAD_REQUEST; shouldn't it be Status_INTERNAL_SERVER_ERROR?
If semaphore.Acquire returns an error, it means the user context was cancelled (e.g., the client closed the connection). That is why I returned a bad request; however, the existing code returns Status_INTERNAL_SERVER_ERROR in this situation, so I will change it to Status_INTERNAL_SERVER_ERROR.
Force-pushed from 29d4c4d to b2a3764
It would be good to gather some PTE numbers for this kind of change, to get an estimate of the performance impact.
internal/peer/node/start.go
Outdated
// Create a semaphore for throttle control
var throttleSemaphore Semaphore = &semaphore.NoopSemaphore{}
if viper.IsSet("peer.limits.concurrency.clients") {
	throttleSemaphore = semaphore.New(viper.GetInt("peer.limits.concurrency.clients"))
I think there was an attempt to remove scattered viper.GetXXX references from the code; these calls were consolidated and wrapped in component-specific functions. Can you check whether this can be added to one of the existing components' config (perhaps peerConfig)?
Done.
core/endorser/endorser.go
Outdated
if err := e.ThrottleSemaphore.Acquire(ctx); err != nil {
	return nil, errors.WithMessage(err, "failed to acquire semaphore for endorsement")
}
defer e.ThrottleSemaphore.Release()
Any reason to lock it for the entire function? This should be released right after step 1, when the simulation is done at line 394.
Both e.Support.ChaincodeEndorsementInfo and e.SimulateProposal call ledger APIs. I added the throttle in this function right before ChaincodeEndorsementInfo for simplicity. To limit the throttling scope, I can move ChaincodeEndorsementInfo and SimulateProposal into a single method.
My comment was more about the "defer" statement, as opposed to explicitly releasing after line 394.
Manish, thank you for the comment. I moved the related code to a separate method because otherwise we would have to call Release multiple times due to the error branches.
common/deliver/deliver.go
Outdated
logger.Errorf("[channel: %s] Failed to acquire semaphore to get a block", chdr.ChannelId)
return cb.Status_INTERNAL_SERVER_ERROR, errors.WithMessage(err, "failed to acquire semaphore to get a block")
}
defer h.ThrottleSemaphore.Release()
This function could be very long-running, depending on how many blocks the client is pulling. I am not sure whether holding the lock for the entire function is a good strategy. Would there be an issue if you moved the locking around the "Next" function below?
Again, this is for simplicity. There are multiple places calling ledger APIs: (1) chain.Reader().Iterator; (2) cursor.Next(); (3) srv.SendBlockResponse (only for deliver with private data). Since we decided to throttle at a high level, I didn't break it into multiple locks.
Yes, I agree with the trade-off, and that's why it's hard to make these choices without being backed by performance measurements. To me, it would be like shooting in the dark.
To avoid a potential deadlock, I changed the code to throttle in each of the above three places that call ledger APIs. I also exported a WaitForNextBlock method from the block iterator.
Force-pushed from 1556a11 to f8a5b84
/ci-run
AZP build triggered!
@@ -68,6 +68,16 @@ func (itr *blocksItr) shouldClose() bool {
	return itr.closeMarker
}

// WaitForNextBlock waits until the next block is available or the iterator should close.
// It returns bools to indicate if the next block is available and if the iterator should close.
func (itr *blocksItr) WaitForNextBlock() (bool, bool) {
As this is a new exported function, it would be better to add a UT for it.
Done
ResultsIterator
// WaitForNextBlock waits until the next block is available or the iterator should close.
// It returns bools to indicate if the next block is available and if the iterator should close.
WaitForNextBlock() (bool, bool)
This new function name suggests that the type ResultsIteratorWithWait would better be called BlocksIteratorWithWait. In fact, it can simply be called "BlocksIterator", as this is the only type for iterating over blocks.
Renamed to BlocksIterator.
common/deliver/deliver.go
Outdated
@@ -346,6 +366,28 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
	return cb.Status_SUCCESS, nil
}

func (h *Handler) getNextBlock(ctx context.Context, channelID string, it blockledger.Iterator) (*cb.Block, cb.Status) {
	nextBlockAvailable, shouldClose := it.WaitForNextBlock()
"shouldClose" is a misleading name. The itr is already closed (if this variable is set to true) by the consumer itself. "closed" is a better name.
Done.
common/deliver/deliver.go
Outdated
logger.Debugf("[channel: %s] no block is returned because the iterator should close", channelID)
return nil, cb.Status_SERVICE_UNAVAILABLE
}
if !nextBlockAvailable {
I would rather remove this if block. The contract of the WaitForNextBlock API is very clear.
Removed
common/deliver/deliver.go
Outdated
nextBlockAvailable, shouldClose := it.WaitForNextBlock()
if shouldClose {
	logger.Debugf("[channel: %s] no block is returned because the iterator should close", channelID)
	return nil, cb.Status_SERVICE_UNAVAILABLE
You don't need to do anything for this; just take a note that this return status is never sent to the client, because this branch executes only if the context is canceled in the caller goroutine.
common/deliver/deliver_test.go
Outdated
It("fails to acquire the semaphore", func() {
	err := handler.Handle(ctx, server)
	Expect(err).To(MatchError("failed to acquire semaphore to get a block: context canceled"))
Isn't this a potential test flake? In the code for handler.Handle, the context cancellation could in fact be trapped in the main flow instead of in the goroutine, no?
Agree. ctx.Done may be caught in either the goroutine or the main flow, depending on the timing. Updated the test accordingly.
func (e *Endorser) performSimulationWithThrottling(ctx context.Context, up *UnpackedProposal, txParams *ccprovider.TransactionParams) (
	*lifecycle.ChaincodeEndorsementInfo, *pb.Response, []byte, *pb.ChaincodeEvent, error) {

	if err := e.ThrottleSemaphore.Acquire(ctx); err != nil {
Just like I commented in the deliver service code, can you verify that ctx.Done is not already being handled in a separate goroutine somewhere in the caller path?
Once the code path gets to endorser.ProcessProposal, other than in the semaphore, ctx is only used to extract a context value, which doesn't call ctx.Done. The gRPC layer calls ctx.Done in DialContext, but it will return an error before getting into the endorser.
Force-pushed from f8a5b84 to 1e664a3
Add a new config option to throttle the concurrent client requests to a peer. The throttle is added in the endorser and event deliver handler. Signed-off-by: Wenjian Qiao <[email protected]>
Force-pushed from 1e664a3 to 3c4ca87
To throttle the number of active requests, the best method is to use a worker pool. Most transactional systems, such as PostgreSQL, MySQL, and CouchDB, take max_connections as input from the user and allow only that many connections or active requests (using a worker pool). When all connections are active, the user receives a connection-refused error. The exact same problem could occur with PostgreSQL or MySQL when we set a high max_connections and a very low memory limit. Is it not possible to implement a connection pool at the endorser?
Max connections and worker pool are two different settings; typically, max connections is significantly higher than the worker pool size. While max connections controls the number of clients, the worker pool controls the number of threads. In Go, since the runtime automatically manages the worker pool and lets users launch goroutines freely (think of this as a job queue with multiplexing), you don't implement an explicit worker pool. Having said that, given that we are using a very large number for the semaphore in this PR, it would definitely make sense to control this at the gRPC level, and that was the first option discussed. But it appears that Matt ran into some issue (mentioned in FAB-14761) when he tried it at the gRPC level; I don't know the details. @sykesm - do you have more details on why it was performing badly at the gRPC layer (assuming that we are not supporting that many concurrent clients)?
I am not sure whether they are two different settings. At least in PostgreSQL, max_connections maps to the max number of processes that handle user requests. We do have max_user_connections, which is different from the worker pool size. I meant a goroutine worker pool, not a pthread worker pool. The user can create any number of goroutines; the Go runtime maps goroutines onto pthreads, and when a pthread blocks, the runtime creates a new one. The number of active pthreads cannot exceed GOMAXPROCS, but there is no restriction on the number of blocked threads. As a result, the peer could create a large number of pthreads, which would consume most of the memory (possible OOM). See a sample implementation of a worker pool in Go. From grpc-java (grpc/grpc-java#1886), there is no way to set max connections; I am not sure whether we have such a setting in grpc-go.
The MaxConcurrentStreams parameter might be the one that controls max connections on the server side: https://github.com/grpc/grpc-go/blob/master/server.go#L121
@cendhu This PR uses a counting semaphore (which is really a buffered channel) to limit concurrent access from clients. The semaphore blocks endorsement/deliver requests once concurrency exceeds a limit. It effectively limits the number of goroutines created in a peer to handle client requests, and therefore limits the number of OS threads (blocked and active). Since Go manages goroutines, I don't see a need for Fabric itself to create and manage a goroutine worker pool. I agree that the concurrency control for the deliver service is more complicated than initially thought; we had to break it up into three places due to a potential deadlock. As mentioned in the defect, there was some issue with limiting concurrency at the gRPC level. I will follow up with Matt to get more details.
@wenjianqiao Yes, a thread pool is not needed. I discussed with @manish-sethi, and he convinced me that a pool is not required when the cost of creating a goroutine is much lower than the cost of creating a process or a pthread. I also found two open-source implementations of goroutine limiters; https://github.com/korovkin/limiter/blob/master/limiter.go is a good one in my view, and it uses a buffered channel. As you mentioned, the semaphore is implemented using a buffered channel, so this part of my question is resolved. I am curious to know the issue with introducing the throttle at the gRPC level. Some of the discussions I found in the grpc-go repo seem to suggest that throttling can be done at the gRPC server. Similar to other established system software, we can expose max_connections (i.e., a throttle limit) as a user config. Depending on the amount of memory allocated, the user can set max_connections (or allocate memory depending on the required max_connections).
This can horribly backfire. The deliver service and the gossip service are both stream-based and share the same gRPC server. To my understanding, the max streams configuration is per gRPC server, not per service type, so this might starve gossip, which only needs a single bidirectional stream between every two peers. The semaphore approach can also backfire: a malicious client can open many concurrent deliver streams and thus starve other clients. I think the ideal is to keep track of the clients connected through the identity extracted from the request, and kick old sessions once new sessions with the same identity are authenticated.
I talked to Matt about the issue with limiting concurrency at the gRPC level. It is basically the same problem Yacov mentioned: it may starve gossip and decrease tx throughput when the semaphore is held by other requests (e.g., the deliver service and qscc).
Yes, Yacov. Max connections and max per-client connections are the usual config parameters; Manish and I discussed this offline, but I forgot to mention it in my earlier comment. What you suggested is used by most existing database systems, where the client has to log in with credentials. Thanks, Wenjian, for the clarification. So the major reason is the lack of a per-service connection limit in gRPC.
Closing this PR due to a design change: throttling will be done in a gRPC interceptor for the endorser and deliver services. See #647.
Signed-off-by: Wenjian Qiao [email protected]
Type of change
Description
Add a new config option to throttle the concurrent client requests
to a peer. The throttle is added to the endorser and event deliver handler.
Additional details
Related issues
Release Note
TBD