diff --git a/primitive/message.go b/primitive/message.go index b8e8f83a..11faf5ea 100644 --- a/primitive/message.go +++ b/primitive/message.go @@ -63,12 +63,13 @@ const ( ) type Message struct { - Topic string - Body []byte - Flag int32 - TransactionId string - Batch bool - Compress bool + Topic string + Body []byte + CompressedBody []byte + Flag int32 + TransactionId string + Batch bool + Compress bool // Queue is the queue that messages will be sent to. the value must be set if want to custom the queue of message, // just ignore if not. Queue *MessageQueue diff --git a/producer/producer.go b/producer/producer.go index 3c875c62..45c3afb1 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -335,7 +335,7 @@ func (p *defaultProducer) tryCompressMsg(msg *primitive.Message) bool { if e != nil { return false } - msg.Body = compressedBody + msg.CompressedBody = compressedBody msg.Compress = true return true } @@ -345,8 +345,14 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" { msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID()) } - sysFlag := 0 + + var ( + sysFlag = 0 + transferBody = msg.Body + ) + if p.tryCompressMsg(msg) { + transferBody = msg.CompressedBody sysFlag = primitive.SetCompressedFlag(sysFlag) } v := msg.GetProperty(primitive.PropertyTransactionPrepared) @@ -373,10 +379,10 @@ func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, if msg.Batch { cmd = internal.ReqSendBatchMessage reqv2 := &internal.SendMessageRequestV2Header{SendMessageRequestHeader: req} - return remote.NewRemotingCommand(cmd, reqv2, msg.Body) + return remote.NewRemotingCommand(cmd, reqv2, transferBody) } - return remote.NewRemotingCommand(cmd, req, msg.Body) + return remote.NewRemotingCommand(cmd, req, transferBody) } func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.MessageQueue {