[Improve][Producer] Refactor internalSend() and resouce managment #1071
@Gleiphir2769 @RobertIndie @graysonzeng @shibd Would you please review this PR?
4b6c0c2
to
68f1a0d
Compare
Hi @gunli, I left some comments.
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	err := p.validateMsg(msg)
Could we make it inline?
Inlining would make internalSendAsync a big function, about 200 lines, which is hard to read. Splitting it into small functions is clearer and more readable.
Sorry, maybe my description was not clear. Could we write L1191-L1192 as a one-liner if...else statement?
Hmm, I think that is OK. Inlining reduces the number of lines, while not inlining is better for debugging.
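For readers unfamiliar with the "one-liner" being discussed, here is a minimal sketch of the two forms in Go. The `validate` function and both `send...` wrappers are hypothetical stand-ins, not code from this PR; the real call under review is `p.validateMsg(msg)`.

```go
package main

import (
	"errors"
	"fmt"
)

// validate is a hypothetical stand-in for p.validateMsg(msg).
func validate(payload []byte) error {
	if payload == nil {
		return errors.New("message is nil")
	}
	return nil
}

// sendTwoLines assigns the error first and checks it on the next line.
// err stays in scope for the rest of the function, which makes it easy
// to inspect in a debugger.
func sendTwoLines(payload []byte) error {
	err := validate(payload)
	if err != nil {
		return err
	}
	return nil
}

// sendOneLiner uses an if statement with an init clause.
// err is scoped to the if statement only.
func sendOneLiner(payload []byte) error {
	if err := validate(payload); err != nil {
		return err
	}
	return nil
}

func main() {
	fmt.Println(sendTwoLines(nil) != nil, sendOneLiner(nil) != nil)
	fmt.Println(sendTwoLines([]byte("x")) == nil, sendOneLiner([]byte("x")) == nil)
}
```

Both forms behave identically; the difference is only the scope of `err` and the number of lines, which matches the trade-off mentioned in the reply above.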
p.dataChan <- sr
err = p.updateSchema(sr)
Could we make it inline?
Inlining would make internalSendAsync a big function, about 200 lines, which is hard to read. Splitting it into small functions is clearer and more readable.
Same as above
func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
	// read payload from message
	sr.uncompressedPayload = sr.msg.Payload
It seems msg.Payload is cloned into sr.uncompressedPayload, which takes up unnecessary memory. I think it may be better for uncompressedPayload to have the type *[]byte.
In Go, assigning a []byte only copies the slice header (pointer, length, and capacity); the underlying bytes are not cloned.
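The point about slice assignment can be checked with a small, self-contained sketch. The `request` type and the `sharesBackingArray` helper are hypothetical and only model the `sr.uncompressedPayload = sr.msg.Payload` assignment from the diff.

```go
package main

import "fmt"

// request is a hypothetical stand-in for sendRequest.
type request struct {
	uncompressedPayload []byte
}

// sharesBackingArray reports whether two non-empty slices start at the
// same element, i.e. whether they view the same underlying array.
func sharesBackingArray(a, b []byte) bool {
	return len(a) > 0 && len(b) > 0 && &a[0] == &b[0]
}

func main() {
	payload := make([]byte, 1<<20) // 1 MiB payload

	r := request{}
	r.uncompressedPayload = payload // copies only the slice header

	fmt.Println(sharesBackingArray(payload, r.uncompressedPayload)) // true

	// A write through one slice is visible through the other.
	r.uncompressedPayload[0] = 42
	fmt.Println(payload[0]) // 42

	// An explicit clone, by contrast, allocates a new backing array.
	clone := append([]byte(nil), payload...)
	fmt.Println(sharesBackingArray(payload, clone)) // false
}
```

Because only the header is copied, switching the field to `*[]byte` would not save the payload memory the reviewer was worried about.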
pulsar/producer_partition.go
Outdated
	return nil
}

func (p *partitionProducer) updateUncompressPayload(sr *sendRequest) error {
Typo: updateUncompressPayload -> updateUncompressedPayload
OK, I have renamed it.
	checkSize = int64(sr.compressedSize)
}

sr.maxMessageSize = int32(int64(p._getConn().GetMaxMessageSize()))
p._getConn().GetMaxMessageSize() makes an RPC call to the broker. This breaks the semantics of async. For example, when a user invokes producer.SendAsync, they must wait for an RPC to return.
What do you think?
MaxMessageSize is cached in the conn when the conn becomes ready; you can check the code of connection and connectionPool, so this is OK.
Actually, p.getOrCreateSchema() will trigger a blocking RPC call. What keeps SendAsync from being a truly async func is the fixed-length PendingQueue: if the pending queue could be expanded at runtime, schema encoding, compression, and getOrCreateSchema could be done in internalSend(), which would make it truly async.
I don't think memLimit and a fixed-length queue are necessary in a language with GC like Java or Go, because we have the semaphore and dataChan to control how many messages can be pending, as I mentioned in #1043.
> MaxMessageSize is cached in the conn when a conn is ready, you can check the code of connection and connectionPool, this is OK.
You're right. This point looks good to me.
> Actually, p.getOrCreateSchema() will trigger a blocking RPC call. What makes SendAsync not a real async func is the fixed-length PendingQueue
But the size of PendingQueue is decided by the user. And if the size of dataChan is set to MaxPendingMessages, it will not be the limit on async behavior.
> I don't think memLimit and fixed-length queue is necessary in a language with GC like Java and Go
This is related not only to memory but also to the broker. Here is the interface description from the Java client:
> Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker
I think we should find a way to solve the p.getOrCreateSchema() problem. What do you think?
> But the size of PendingQueue is decided by user. And if the size of dataChan is set as MaxPendingMessages, it will not be the async limit.
dataChan and the semaphore can work together to make it async: when a semaphore permit is available, add the message to the queue; otherwise wait until a response comes back from the broker and releases a permit.
> I think we should find a way to solve the p.getOrCreateSchema() problem.
Since the schema can be set by the user at runtime, we can only solve it by moving the blocking logic to `internalSend()`. But at the same time, we have to reserve memory and pending queue space before adding a message to dataChan, so if we want to solve it, we must drop memLimit and the fixed-length pending queue.
> when there is enough semaphore, add to the queue, or wait until the response back from the broker and release a semaphore.
I support the idea of reserving resources before internalSend. And if we reserve the semaphore first, dataChan will not block if it is created with capacity MaxPendingMessages.
> we must drop memLimit and the fixed length pending queue
Sorry, I don't get why we should drop memLimit. It's a useful feature for users who are short of resources.
And I don't think the fixed-length pending queue will become a problem for SendAsync. Why should we make it flexible?
> I support the idea which reserve resource before internalSend. And if we reserve the semaphore firstly, the block of dataChan will not happen if it's made with capacity MaxPendingMessages.
We do not block on dataChan, only on the semaphore. We treat dataChan as a channel between the main goroutine (the user's goroutine) and the partitionProducer's goroutine (the IO goroutine). The semaphore represents the available resource (pendingItem). When acquiring the semaphore succeeds, we can add the request to dataChan; otherwise, we block until a semaphore is released (one message has been done).
> Sorry, I am not get the point why we should drop the memLimit? It's a useful feature for the users who is lack of resources.
> And I don't think the fixed length pending queue will become a problem to SendAsync. Why should we make it flexible?
Because we limit the memory and the pending queue, we have to reserve memory and pending queue space before adding a message to dataChan. That forces us to do schema encoding and compression first; otherwise we have no idea how much memory and how many pending items we need. These are blocking operations. When they are done in the user's goroutine, they block the user's logic, which makes SendAsync a non-async method.
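The semaphore-then-dataChan ordering described in this thread can be sketched as follows. This is a heavily simplified, hypothetical model and not the pulsar-client-go implementation: `producer`, `permits`, `trySend`, and `done` are illustrative names, and the sketch uses a non-blocking acquire so it can be run deterministically, whereas the discussion above describes blocking on the semaphore.

```go
package main

import "fmt"

// producer is a hypothetical, simplified model of a partition producer.
type producer struct {
	permits  chan struct{} // counting semaphore: one slot per pending message
	dataChan chan string   // hand-off from the caller to the IO goroutine
}

func newProducer(maxPending int) *producer {
	return &producer{
		permits:  make(chan struct{}, maxPending),
		dataChan: make(chan string, maxPending), // same capacity as the semaphore
	}
}

// trySend reserves a permit first and only then enqueues the message.
// A blocking variant would use a plain `p.permits <- struct{}{}` instead.
func (p *producer) trySend(msg string) bool {
	select {
	case p.permits <- struct{}{}:
	default:
		return false // no permit available: the pending limit is reached
	}
	// Cannot block: queued messages never outnumber held permits,
	// and permits never outnumber cap(dataChan).
	p.dataChan <- msg
	return true
}

// done models a completed message (for example, a broker ack) releasing its permit.
func (p *producer) done() { <-p.permits }

func main() {
	p := newProducer(2)
	fmt.Println(p.trySend("a"), p.trySend("b"), p.trySend("c")) // true true false

	msg := <-p.dataChan // the IO goroutine would pick up "a"
	p.done()            // "a" is acknowledged, one permit is free again
	fmt.Println(msg, p.trySend("c")) // a true
}
```

The sketch only covers the permit count. Reserving memory up front, which is the part that forces encoding and compression into the caller's goroutine, is deliberately left out.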
68f1a0d
to
3bd5c0e
Compare
@RobertIndie Would you please take a look at this PR? It has been pending for a long time.
63b2726
to
632f367
Compare
@tisonkun @RobertIndie I think there is a bug in
done.
I noticed that you did a lot of refactoring work in this PR, including changes to the critical path for publishing messages. I’m concerned that these changes may impact publishing performance. While there are many modifications, it’s difficult for the reviewer to see the relationship between these modifications and the bug fixes. Could you provide more detail in the PR description, particularly regarding why you’re making these changes to fix bugs? I recommend separating the refactoring work from the bug fixes. Otherwise, it will be challenging for us to cherry-pick the bug fixes to other release branches.
@RobertIndie We have discussed the details in #1059, #1043, and #1060. The bug fix only fixes a bug in this PR itself, not in the existing code.
@gunli Could you summarize them and add the detailed explanation to the PR description? They are very important context for this PR. A well-written PR description not only helps with review but also enables other developers to learn about the context of this PR.
@RobertIndie I have updated the motivation.
Hi @gunli. It looks like the PR does not pass the CI tests now. You can check the CI log and fix it. It seems to be related to chunking.
@Gleiphir2769 Thank you. I am busy these days; I will fix it ASAP.
@Gleiphir2769 I have fixed the test case, and it passes now, but since only the last chunk will trigger the callback, I commented out the wait logic in
Hi @gunli. I left some comments about sendSingleChunk.
mm.ChunkId = proto.Int32(int32(chunkID))
producerImpl.updateMetadataSeqID(mm, msg)

doneCh := make(chan struct{})
I think we can remove this doneCh. The goal of this UT is to verify whether the consumer can discard the oldest chunked message. Whether the callback of sendRequest is called has no impact on that.
OK, I have deleted these lines in a new commit.
@@ -548,30 +556,57 @@ func createTestMessagePayload(size int) []byte {
 }

 //nolint:all
-func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int) {
+func sendSingleChunk(p Producer, uuid string, chunkID int, totalChunks int, wholePayload string, callbackOnce *sync.Once, cr *chunkRecorder) {
Should we add the parameters wholePayload, callbackOnce, and chunkRecorder? I think it's more appropriate to create them inside sendSingleChunk.
Hmm, in the current implementation, all the sendRequests of a chunked message should share the whole message payload, callbackOnce, and chunkRecorder; see internalSend() and addRequestToBatch(). I added these params just to make the send procedure work correctly, so I think it is better to keep them.
@RobertIndie @tisonkun Would you please review this PR again?
36edd30
to
dc03dd6
Compare
message_chunking_test-ac9c1a6399336461d2d3ce1cdd31cac6debd5ed5.txt
This PR looks complex, could you split this PR?
@nodece OK, I will do it today.
(If this PR fixes a github issue, please add Fixes #<xyz>.)
Fixes #1043
(or if this PR is one task of a github issue, please add Master Issue: #<xyz> to link to the master issue.)
Master Issue: #1043, #1055 #1059 #1060 #1067, #1068
Motivation
sendRequest to the dataChan, but currently, we do schema-encoding/compressing in internalSend(), this may lead to inaccurate memory limit controlling, and as described in [Improve][Producer]Simplify the MaxPendingMessages controlling #1043, make the code complicated and difficult to maintain, we need to simplify the send logic;
Modifications
internalSend() to internalSendAsync();
internalSend() to internalSendAsync();
sendRequest store the semaphore and memory it holds;
internalSendAsync() clearer;
sendRequest.done() to release the resources it holds;
sendRequest is done, call sendRequest.done();
sendRequest.done() run callback, update metrics, end transaction, run interceptors callback...
Verifying this change
This change is already covered by existing tests, such as (please describe tests).
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation