Expected behavior
Consume the sendRequests before closing the producer: send and flush them, and invoke the callbacks of the input messages, so that the application knows whether producing succeeded or failed.
Actual behavior
Currently, when we close the producer, we forget to consume the sendRequests remaining in partitionProducer.dataChan before closing.
When producing is faster than consuming in partitionProducer, many sendRequests have been pushed into partitionProducer.dataChan; at close time, the ones not yet consumed are effectively lost. Their callbacks are never invoked, so the application never learns the producing result of those messages.
Steps to reproduce
Review the code of partitionProducer
```go
func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
	callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
	// Register transaction operation to transaction and the transaction coordinator.
	var newCallback func(MessageID, *ProducerMessage, error)
	if msg.Transaction != nil {
		transactionImpl := (msg.Transaction).(*transaction)
		if transactionImpl.state != TxnOpen {
			p.log.WithField("state", transactionImpl.state).Error("Failed to send message" +
				" by a non-open transaction.")
			callback(nil, msg, newError(InvalidStatus, "Failed to send message by a non-open transaction."))
			return
		}

		if err := transactionImpl.registerProducerTopic(p.topic); err != nil {
			callback(nil, msg, err)
			return
		}
		if err := transactionImpl.registerSendOrAckOp(); err != nil {
			callback(nil, msg, err)
		}
		newCallback = func(id MessageID, producerMessage *ProducerMessage, err error) {
			callback(id, producerMessage, err)
			transactionImpl.endSendOrAckOp(err)
		}
	} else {
		newCallback = callback
	}
	if p.getProducerState() != producerReady {
		// Producer is closing
		newCallback(nil, msg, errProducerClosed)
		return
	}

	// bc only works when DisableBlockIfQueueFull is false
	bc := make(chan struct{})

	// callbackOnce make sure the callback is only invoked once in chunking
	callbackOnce := &sync.Once{}
	var txn *transaction
	if msg.Transaction != nil {
		txn = (msg.Transaction).(*transaction)
	}
	sr := &sendRequest{
		ctx:              ctx,
		msg:              msg,
		callback:         newCallback,
		callbackOnce:     callbackOnce,
		flushImmediately: flushImmediately,
		publishTime:      time.Now(),
		blockCh:          bc,
		closeBlockChOnce: &sync.Once{},
		transaction:      txn,
	}
	p.options.Interceptors.BeforeSend(p, msg)

	p.dataChan <- sr

	if !p.options.DisableBlockIfQueueFull {
		// block if queue full
		<-bc
	}
}
```
```go
func (p *partitionProducer) internalClose(req *closeProducer) {
	defer close(req.doneCh)
	if !p.casProducerState(producerReady, producerClosing) {
		return
	}

	p.log.Info("Closing producer")

	id := p.client.rpcClient.NewRequestID()
	_, err := p.client.rpcClient.RequestOnCnx(p._getConn(), id, pb.BaseCommand_CLOSE_PRODUCER, &pb.CommandCloseProducer{
		ProducerId: &p.producerID,
		RequestId:  &id,
	})
	if err != nil {
		p.log.WithError(err).Warn("Failed to close producer")
	} else {
		p.log.Info("Closed producer")
	}

	if p.batchBuilder != nil {
		if err = p.batchBuilder.Close(); err != nil {
			p.log.WithError(err).Warn("Failed to close batch builder")
		}
	}

	p.setProducerState(producerClosed)
	p._getConn().UnregisterListener(p.producerID)
	p.batchFlushTicker.Stop()
}
```
And in the interface of the Producer, we said "Waits until all pending write request are persisted. In case of errors, pending writes will not be retried." I think the messages that have been submitted via SendAsync should be treated as pending write requests, so we should send and flush them to make sure each of them is completed with a result.
```go
// Producer is used to publish messages on a topic
type Producer interface {
	...
	// Close the producer and releases resources allocated
	// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
	// of errors, pending writes will not be retried.
	Close()
}
```
System configuration
Pulsar version: x.y
@zengguan @merlimat @wolfstudy