
Throttle endorsement and deliver service #529

Closed

Conversation

@wenjianqiao (Contributor) commented Jan 21, 2020

Signed-off-by: Wenjian Qiao [email protected]

Type of change

  • Improvement (limit concurrent client requests)

Description

Add a new config option to throttle the number of concurrent client requests
to a peer. The throttle is added to the endorser and the event deliver handler.

Additional details

Related issues

Release Note

TBD

@wenjianqiao wenjianqiao requested a review from a team as a code owner January 21, 2020 21:13
@wenjianqiao wenjianqiao changed the title [FAB-14761] Throttle endorsement and deliver service Throttle endorsement and deliver service Jan 21, 2020
// A goroutine is pinned to an os thread for file i/o.
// Throttling limits the number of os threads created when getting blocks from ledger.
if h.ThrottleSemaphore != nil {
if err := h.ThrottleSemaphore.Acquire(ctx); err != nil {
Contributor

I remember we had a no-op semaphore somewhere, didn't we? Can't we inject it instead and then we don't need to do this if statement here?

@wenjianqiao (Contributor Author), Jan 22, 2020

Added a no-op semaphore.
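For context, a no-op semaphore satisfying the same Acquire/Release contract is tiny, which is why injecting one removes the nil checks. A minimal sketch follows; the interface shown here is an assumption for illustration and may differ from the exact one in the codebase.

```go
package semaphore

import "context"

// Semaphore is the throttling contract assumed by the endorser and the
// deliver handler in this PR: block until a permit is free or the caller's
// context is done, and return the permit when finished.
type Semaphore interface {
	Acquire(ctx context.Context) error
	Release()
}

// NoopSemaphore satisfies Semaphore but never blocks. Injecting it when
// throttling is disabled lets callers drop the `!= nil` checks.
type NoopSemaphore struct{}

// Acquire always succeeds immediately.
func (s *NoopSemaphore) Acquire(ctx context.Context) error { return nil }

// Release does nothing.
func (s *NoopSemaphore) Release() {}
```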

@@ -368,6 +372,15 @@ func (e *Endorser) ProcessProposalSuccessfullyOrError(up *UnpackedProposal) (*pb
txParams.HistoryQueryExecutor = hqe
}

// A goroutine is pinned to an os thread for file i/o.
// Throttle the concurrent calls to limit the number of os threads created due to file i/o's (via ledger APIs).
if e.ThrottleSemaphore != nil {
Contributor

ditto the above comment

Contributor Author

Added a no-op semaphore.

@@ -334,7 +336,9 @@ func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedPro
return pResp, nil
}

// ProcessProposalSuccessfullyOrError processes the proposal and returns a response or error.
func (e *Endorser) ProcessProposalSuccessfullyOrError(up *UnpackedProposal) (*pb.ProposalResponse, error) {
Contributor

There is an alternative, cleaner (in my opinion) way of doing this for the endorser gRPC:

We already have several authentication filters, which are essentially a chain of endorsement functions.

name: ExpirationCheck # This filter checks identity x509 certificate expiration

To add a new built-in filter, we:

  1. Put a constructor for it in https://github.com/hyperledger/fabric/blob/master/core/handlers/library/library.go, similar to the existing functions
  2. Append it to handlers.authFilters in core.yaml

Or, alternatively, we can always manually append it to the list at construction time like we do here:
https://github.com/hyperledger/fabric/blob/master/internal/peer/node/start.go#L835

This leaves the endorser code intact.
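To make the suggestion concrete, a throttling auth filter might look roughly like the sketch below. It assumes Fabric's auth.Filter pattern (implement peer.EndorserServer and wrap the next server via Init); the NewFilter name, the limit parameter, and the buffered-channel permit scheme are illustrative, not code from this PR.

```go
package throttlefilter

import (
	"context"

	"github.com/hyperledger/fabric-protos-go/peer"
	"github.com/hyperledger/fabric/core/handlers/auth"
)

// filter limits the number of in-flight ProcessProposal calls before they
// ever reach the endorser, keeping the endorser code itself untouched.
type filter struct {
	next    peer.EndorserServer
	permits chan struct{}
}

// NewFilter returns an auth filter allowing at most limit concurrent proposals.
func NewFilter(limit int) auth.Filter {
	return &filter{permits: make(chan struct{}, limit)}
}

// Init chains this filter in front of the next EndorserServer.
func (f *filter) Init(next peer.EndorserServer) {
	f.next = next
}

// ProcessProposal takes a permit (or fails once the client context is done)
// and then delegates to the wrapped endorser.
func (f *filter) ProcessProposal(ctx context.Context, sp *peer.SignedProposal) (*peer.ProposalResponse, error) {
	select {
	case f.permits <- struct{}{}:
		defer func() { <-f.permits }()
		return f.next.ProcessProposal(ctx, sp)
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
```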

Contributor Author

I will not use the filter approach because it is better to throttle as little code as necessary. Otherwise, it may unnecessarily limit concurrency.

@wenjianqiao wenjianqiao force-pushed the fab14761h branch 2 times, most recently from 3c82ab0 to 29d4c4d Compare January 22, 2020 21:41
@wenjianqiao (Contributor Author)

/ci-run

@github-actions

AZP build triggered!

// Throttling limits the number of os threads created when getting blocks (file i/o's) from ledger.
if err := h.ThrottleSemaphore.Acquire(ctx); err != nil {
logger.Errorf("[channel: %s] Failed to acquire semaphore to get a block", chdr.ChannelId)
return cb.Status_BAD_REQUEST, errors.WithMessage(err, "failed to acquire semaphore to get a block")
Contributor
I'm not sure that this should be Status_BAD_REQUEST; shouldn't it be Status_INTERNAL_SERVER_ERROR?

Contributor Author

If semaphore.Acquire returns an error, it means the user context is cancelled (e.g., the client closed the connection). Therefore I returned a bad request, but looking at the existing code, it actually returns Status_INTERNAL_SERVER_ERROR in such cases. I will change it to Status_INTERNAL_SERVER_ERROR.

@manish-sethi (Contributor) left a comment

It would be good to gather some PTE numbers for this kind of change to get a sense of the performance impact.

// Create a semaphore for throttle control
var throttleSemaphore Semaphore = &semaphore.NoopSemaphore{}
if viper.IsSet("peer.limits.concurrency.clients") {
throttleSemaphore = semaphore.New(viper.GetInt("peer.limits.concurrency.clients"))
Contributor

I think there was an attempt to remove references to viper.GetXXX from the main code flow, and these calls were consolidated/wrapped in component-specific functions. Can you check whether this can be added to one of the existing components' config (perhaps peerConfig)?

Contributor Author

Done.
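For illustration, folding the viper lookup into a component config might look like the sketch below; the package, struct, and field names are assumptions, not necessarily what the PR ended up using.

```go
package peerconfig

import "github.com/spf13/viper"

// Config gathers peer-level settings that are read once at startup, so the
// main code flow does not call viper.GetXXX directly.
type Config struct {
	// ThrottleLimit caps concurrent client requests; 0 means no throttling.
	ThrottleLimit int
}

// Load reads the throttle limit (peer.limits.concurrency.clients in core.yaml).
func Load() *Config {
	c := &Config{}
	if viper.IsSet("peer.limits.concurrency.clients") {
		c.ThrottleLimit = viper.GetInt("peer.limits.concurrency.clients")
	}
	return c
}
```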

if err := e.ThrottleSemaphore.Acquire(ctx); err != nil {
return nil, errors.WithMessage(err, "failed to acquire semaphore for endorsement")
}
defer e.ThrottleSemaphore.Release()
Contributor

Any reason to hold it for the entire function? This should be released right after step 1, when the simulation is done at line 394.

@wenjianqiao (Contributor Author), Jan 23, 2020

Both e.Support.ChaincodeEndorsementInfo and e.SimulateProposal call ledger APIs. I added the throttle in this function right before ChaincodeEndorsementInfo for simplicity. To limit the throttling scope, I can move ChaincodeEndorsementInfo and SimulateProposal into a single method.

Contributor

My comment was more about the use of the "defer" statement (which holds it until the function returns) instead of explicitly releasing after line 394.

Contributor Author

Manish, thank you for the comment. I moved the related code into a separate method because otherwise we would have to call Release multiple times due to the error branches.
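To illustrate the point, the sketch below shows the shape of such a method: one Acquire paired with one deferred Release covers every error branch, so no branch needs its own Release call. The helper names and types are stand-ins, not the actual endorser code.

```go
package example

import (
	"context"
	"errors"
	"fmt"
)

// Semaphore mirrors the Acquire/Release contract used in the PR.
type Semaphore interface {
	Acquire(ctx context.Context) error
	Release()
}

// endorsementInfo and the two step functions are stand-ins for
// ChaincodeEndorsementInfo and SimulateProposal.
type endorsementInfo struct{ name string }

func lookupEndorsementInfo() (*endorsementInfo, error) {
	return &endorsementInfo{name: "mycc"}, nil
}

func simulate(info *endorsementInfo) ([]byte, error) {
	if info == nil {
		return nil, errors.New("nil endorsement info")
	}
	return []byte("simulation result for " + info.name), nil
}

// doThrottledSimulation wraps both ledger-touching steps so a single defer
// releases the permit on success or on any early error return.
func doThrottledSimulation(ctx context.Context, sem Semaphore) ([]byte, error) {
	if err := sem.Acquire(ctx); err != nil {
		return nil, fmt.Errorf("failed to acquire semaphore for endorsement: %w", err)
	}
	defer sem.Release() // runs exactly once, whichever branch returns

	info, err := lookupEndorsementInfo()
	if err != nil {
		return nil, err // no explicit Release needed here
	}
	return simulate(info) // nor here
}
```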

logger.Errorf("[channel: %s] Failed to acquire semaphore to get a block", chdr.ChannelId)
return cb.Status_INTERNAL_SERVER_ERROR, errors.WithMessage(err, "failed to acquire semaphore to get a block")
}
defer h.ThrottleSemaphore.Release()
Contributor

This function could be very long-running depending on how many blocks the client is pulling. I'm not sure whether keeping the lock for the entire function is a good strategy. Would there be an issue if you moved the locking to be around the "Next" function below?

Contributor Author

Again, this is for simplicity. There are multiple places calling ledger APIs: (1) chain.Reader().Iterator; (2) cursor.Next(); (3) srv.SendBlockResponse (only for deliver with pvtdata). Since we decided to throttle at a high level, I didn't break it into multiple locks.

Contributor

Yes, I agree with the trade-off, and that's why it's hard to make these choices without being backed by some performance measurements. To me, it would be like shooting in the dark.

Contributor Author

To solve a potential deadlock, I changed the code to acquire the throttle multiple times, in each of the above three places that call ledger APIs. I also exported a WaitForNextBlock method from the block iterator.
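A simplified sketch of the resulting pattern is shown below: wait for a block to become available without holding a permit, and hold a permit only around the short ledger read itself. The interfaces here are assumptions for illustration and are not the exact Fabric types.

```go
package example

import (
	"context"
	"errors"
)

// Block and Status stand in for *cb.Block and cb.Status.
type Block struct{ Number uint64 }
type Status int

const (
	StatusSuccess Status = iota
	StatusServiceUnavailable
)

// BlocksIterator approximates the iterator surface discussed above.
type BlocksIterator interface {
	// WaitForNextBlock blocks until the next block is available or the
	// iterator is closed; it does not touch the block files.
	WaitForNextBlock() (available bool, closed bool)
	// Next returns the next block; after WaitForNextBlock it should not park.
	Next() (*Block, Status)
}

type Semaphore interface {
	Acquire(ctx context.Context) error
	Release()
}

// getNextBlockThrottled waits WITHOUT a permit, then throttles only the
// actual ledger file i/o. Holding a permit while parked on an empty ledger
// could starve other requests that need the semaphore to make progress.
func getNextBlockThrottled(ctx context.Context, sem Semaphore, it BlocksIterator) (*Block, error) {
	if _, closed := it.WaitForNextBlock(); closed {
		return nil, errors.New("iterator closed before a block became available")
	}
	if err := sem.Acquire(ctx); err != nil {
		return nil, errors.New("failed to acquire semaphore to get a block: " + err.Error())
	}
	defer sem.Release()

	block, status := it.Next() // file i/o happens here, under the throttle
	if status != StatusSuccess {
		return nil, errors.New("error reading next block from ledger")
	}
	return block, nil
}
```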

@wenjianqiao wenjianqiao force-pushed the fab14761h branch 3 times, most recently from 1556a11 to f8a5b84 Compare January 27, 2020 03:37
@wenjianqiao (Contributor Author)

/ci-run

@github-actions

AZP build triggered!

common/ledger/blkstorage/fsblkstorage/blocks_itr.go (outdated, resolved)
@@ -68,6 +68,16 @@ func (itr *blocksItr) shouldClose() bool {
return itr.closeMarker
}

// WaitForNextBlock waits until next block is available or the iterator should close
// It returns bools to indicate if next block is available and if the iterator should close
func (itr *blocksItr) WaitForNextBlock() (bool, bool) {
Contributor

As this is a new exported function, it would be better to add a UT for it.

Contributor Author

Done

ResultsIterator
// WaitForNextBlock waits until next block is available or the iterator should close
// It returns bools to indicate if next block is available and if the iterator should close
WaitForNextBlock() (bool, bool)
Contributor

This new function name suggests that the type ResultsIteratorWithWait would better be called BlocksIteratorWithWait... In fact, it can simply be called "BlocksIterator", as this is the only type for iterating over blocks.

Contributor Author

Renamed to BlocksIterator.

@@ -346,6 +366,28 @@ func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.E
return cb.Status_SUCCESS, nil
}

func (h *Handler) getNextBlock(ctx context.Context, channelID string, it blockledger.Iterator) (*cb.Block, cb.Status) {
nextBlockAvailable, shouldClose := it.WaitForNextBlock()
Contributor

"shouldClose" is a misleading name. The iterator is already closed (if this variable is set to true) by the consumer itself. "closed" is a better name.

Contributor Author

Done.

logger.Debugf("[channel: %s] no block is returned because the iterator should close", channelID)
return nil, cb.Status_SERVICE_UNAVAILABLE
}
if !nextBlockAvailable {
Contributor

I would rather remove this if block. The contract of the WaitForNextBlock API is very clear.

Contributor Author

Removed

nextBlockAvailable, shouldClose := it.WaitForNextBlock()
if shouldClose {
logger.Debugf("[channel: %s] no block is returned because the iterator should close", channelID)
return nil, cb.Status_SERVICE_UNAVAILABLE
Contributor

You don't need to do anything for this, but just take a note that this return status is never sent to the client... as this if block will be executed only if the context is canceled in the caller goroutine.


It("fails to acquire the semaphore", func() {
err := handler.Handle(ctx, server)
Expect(err).To(MatchError("failed to acquire semaphore to get a block: context canceled"))
Contributor

Isn't this a potential test flake? In the code for handler.Handle, the context cancellation could in fact be trapped in the main flow instead of in the goroutine... no?

Contributor Author

Agreed. ctx.Done may be caught in either the goroutine or the main flow depending on the timing. I updated the test accordingly.

func (e *Endorser) performSimulationWithThrottling(ctx context.Context, up *UnpackedProposal, txParams *ccprovider.TransactionParams) (
*lifecycle.ChaincodeEndorsementInfo, *pb.Response, []byte, *pb.ChaincodeEvent, error) {

if err := e.ThrottleSemaphore.Acquire(ctx); err != nil {
Contributor

Just like I commented on the deliver service code, can you verify that this ctx.Done is not already being handled in a separate goroutine earlier in the caller path?

Contributor Author

Once the code path gets to endorser.ProcessProposal, other than the semaphore, ctx is only used to extract a context value, which doesn't call ctx.Done. The gRPC layer calls ctx.Done in DialContext, but it will return an error before getting into the endorser.

Add a new config option to throttle the concurrent client requests
to a peer. The throttle is added in the endorser and the event
deliver handler.

Signed-off-by: Wenjian Qiao <[email protected]>
@cendhu (Contributor) commented Jan 30, 2020

To throttle the number of active requests, the best method is to use a worker pool. Most transactional systems, such as PostgreSQL, MySQL, and CouchDB, take max_connections as input from the user and allow only that many connections or active requests (using a worker pool). When all connections are active, the user receives a connection refused error. The exact same problem could occur with PostgreSQL or MySQL if we set a high max connections value and a very low amount of memory.

Is it not possible to implement a connection pool at the endorser?

@manish-sethi (Contributor)

Max connections and worker pool are two different settings, with max connections typically being significantly higher than the worker pool size. While max connections controls the number of clients, the worker pool controls the number of threads. In golang, since the runtime automatically manages the worker pool and lets users launch goroutines freely (think of this as a job queue/multiplexing), you don't implement an explicit worker pool.

Having said that, given that we are using a very large number in the semaphore in this PR, it would definitely make sense to control this just at the gRPC level, and that was the first option discussed. But it appears that Matt ran into some issue (he has mentioned it in FAB-14761) when he tried it at the grpc level; I don't know the details... @sykesm - do you have more details on why it was performing badly at the grpc layer (assuming that we are not supporting that many concurrent clients)?

@cendhu (Contributor) commented Jan 31, 2020

I am not sure whether they are two different settings. At least in PostgreSQL, max_connections maps to the max number of processes that handle user requests. We do have max_user_connections, which is different from the worker pool size.

I meant a goroutine worker pool, not a pthread worker pool. The user can create any number of goroutines. The Go runtime maps goroutines to pthreads. When a pthread is blocked, the runtime creates a new one. The number of actively running pthreads cannot exceed GOMAXPROCS, but there is no restriction on the number of blocked threads. As a result, the peer could create a large number of pthreads, which would consume most of the memory (possible OOM).

A sample implementation of a worker pool in golang can be found here -- https://golangbot.com/buffered-channels-worker-pools/. I was thinking of implementing such a worker pool.
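For readers unfamiliar with the pattern, a minimal buffered-channel worker pool in Go looks like the sketch below; this is a generic illustration of the linked article's idea, not code proposed for the peer.

```go
package example

import "sync"

// RunWorkerPool processes jobs with a fixed number of worker goroutines,
// bounding concurrency much like max_connections bounds backend processes.
func RunWorkerPool(workers int, jobs []int, process func(int) int) []int {
	jobCh := make(chan int, len(jobs))
	results := make([]int, 0, len(jobs))

	var mu sync.Mutex
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobCh {
				r := process(j)
				mu.Lock()
				results = append(results, r)
				mu.Unlock()
			}
		}()
	}

	for _, j := range jobs {
		jobCh <- j
	}
	close(jobCh) // workers drain the channel and then exit
	wg.Wait()
	return results
}
```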

From grpc-java grpc/grpc-java#1886, there is no way to set max connections. Not sure whether we have such a setting in grpc-go.

@cendhu (Contributor) commented Jan 31, 2020

The MaxConcurrentStreams parameter might be the one that controls max connections on the server side: https://github.com/grpc/grpc-go/blob/master/server.go#L121
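For reference, grpc-go exposes this as a ServerOption; a typical usage is sketched below (the port and the limit of 100 are just examples). Note that the option applies to the whole gRPC server, i.e. to every service registered on it, which is part of the concern raised later in this thread.

```go
package main

import (
	"log"
	"net"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":7051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Cap the number of concurrent streams this server will handle.
	srv := grpc.NewServer(grpc.MaxConcurrentStreams(100))

	// ... register endorser/deliver/gossip services here ...

	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve failed: %v", err)
	}
}
```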

@hyperledger hyperledger deleted a comment from wenjianqiao Feb 3, 2020
@hyperledger hyperledger deleted a comment from guoger Feb 3, 2020
@wenjianqiao (Contributor Author)

> A sample implementation of worker pool in golang
> can be found here -- https://golangbot.com/buffered-channels-worker-pools/
> I was thinking of implementing such a worker pool.

@cendhu This PR is using a counting semaphore (which is really a buffered channel) to limit concurrent access from clients. The semaphore is used to block endorsement/deliver requests once the concurrency exceeds a limit. It effectively limits the number of goroutines created in a peer to handle client requests, and therefore limits the number of OS threads (blocked and active). Since Go manages goroutines, I don't see a need for Fabric itself to create and manage a goroutine worker pool.
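For illustration, a counting semaphore built on a buffered channel, with the context-aware Acquire described above, can be sketched as follows; this is a generic sketch and not necessarily identical to Fabric's implementation.

```go
package semaphore

import "context"

// CountingSemaphore limits concurrency to the capacity of a buffered channel.
type CountingSemaphore struct {
	permits chan struct{}
}

// New returns a semaphore allowing up to limit concurrent holders.
func New(limit int) *CountingSemaphore {
	return &CountingSemaphore{permits: make(chan struct{}, limit)}
}

// Acquire takes a permit, blocking until one is free or the context is done
// (for example, because the client cancelled the request).
func (s *CountingSemaphore) Acquire(ctx context.Context) error {
	select {
	case s.permits <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Release returns a permit taken by a previous successful Acquire.
func (s *CountingSemaphore) Release() {
	select {
	case <-s.permits:
	default:
		panic("semaphore: Release called without a matching Acquire")
	}
}
```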

I agree that the concurrency control for the deliver service is more complicated than initially thought - we had to break it up into three places due to a potential deadlock. As mentioned in the defect, there was some issue with limiting the concurrency at the grpc level. I will follow up with Matt to get more details.

@cendhu (Contributor) commented Feb 3, 2020

@wenjianqiao Yes, a thread pool is not needed. I discussed with @manish-sethi and he convinced me that a pool is not required when the cost of creating a goroutine is much lower than the cost of creating a process or a pthread. I also found two open-source implementations of a goroutine limiter. https://github.com/korovkin/limiter/blob/master/limiter.go is a good one in my view and uses a buffered channel. As you mentioned, since the semaphore is implemented using a buffered channel, this part of my question is resolved.

I am curious to know what the issue is with introducing the throttle at the gRPC level. Some of the discussions I found in the grpc-go repo seem to suggest that throttling can be done at the grpc server:
grpc/grpc-go#1986
grpc/grpc-go#2412

Similar to other established system software, we can expose max_connections (i.e., a throttle limit) as a user config. Depending on the amount of memory allocated, the user can set max_connections (or they can allocate memory depending on the required max_connections).

@yacovm (Contributor) commented Feb 4, 2020

> I am curious to know the issue with introducing the throttle at the grpc. Some of the discussions I found in the grpc-go repo seem to suggest that throttling can be done at the grpc server.

This can horribly backfire. The deliver service and the gossip service are both stream-based and share the same gRPC server. This max-streams configuration, to my understanding, is per gRPC server and not per service type, so this might starve gossip, which only needs a single bi-directional stream between every 2 peers.

This semaphore approach can also backfire - a malicious client can open many concurrent deliver streams and thus starve other clients.

I think the ideal approach is to keep track of the clients connected through their identity, extracted from the request, and kick old sessions once new sessions with the same identity are authenticated.

@wenjianqiao (Contributor Author)

I talked to Matt about the issue with limiting concurrency at the grpc level. It is basically the same problem as mentioned by Yacov - it may starve gossip and decrease tx throughput when the semaphore is acquired by other requests (e.g., deliver service and qscc).

@cendhu (Contributor) commented Feb 5, 2020

Yes, Yacov. Max connections and max per-client connections are the standard config parameters. Manish and I discussed this offline but I forgot to mention it in my earlier comment. What you suggested is used by most existing database systems, where the client has to log in with credentials.

Thanks, Wenjian, for the clarification. So the major reason is the lack of a connection limit per gRPC service.

@wenjianqiao (Contributor Author)

Closing this PR due to a design change - throttling will be done in a gRPC interceptor for the endorser and deliver services. See #647.

@wenjianqiao wenjianqiao closed this Feb 7, 2020
@wenjianqiao wenjianqiao deleted the fab14761h branch May 29, 2020 13:11