From aae86a433fc9c0c92eb3a2c08e38116c0be46dd1 Mon Sep 17 00:00:00 2001 From: shewitt Date: Wed, 20 Apr 2022 16:44:58 -0400 Subject: [PATCH 1/2] Release semaphore and execute callback when message fails to encode. Add tests for producer schema encode. --- pulsar/producer_partition.go | 2 ++ pulsar/producer_test.go | 44 ++++++++++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index bc775e9270..fbebfba53f 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -419,6 +419,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) { var schemaPayload []byte schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { + p.publishSemaphore.Release() + request.callback(nil, request.msg, err) p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return } diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 3c526bbbed..541c1fe506 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -924,6 +924,50 @@ func TestMaxMessageSize(t *testing.T) { } } +func TestFailedSchemaEncode(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Schema: NewAvroSchema("{\"type\":\"string\"}", nil), + }) + + assert.Nil(t, err) + defer producer.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + // producer should send return an error as message is Int64, but schema is String + mid, err := producer.Send(ctx, &ProducerMessage{ + Value: int64(1), + }) + assert.NotNil(t, err) + assert.Nil(t, mid) + wg.Done() + }() + + wg.Add(1) + // producer should send return an error as message is Int64, but schema is String + producer.SendAsync(ctx, &ProducerMessage{ + Value: int64(1), + }, func(messageID MessageID, producerMessage *ProducerMessage, err error) { + assert.NotNil(t, err) + assert.Nil(t, messageID) + wg.Done() + }) + wg.Wait() +} + func TestSendTimeout(t *testing.T) { quotaURL := adminURL + "/admin/v2/namespaces/public/default/backlogQuota" quotaFmt := `{"limit": "%d", "policy": "producer_request_hold"}` From 242232f66ab0ef5554cbd718054c700e21ad2690 Mon Sep 17 00:00:00 2001 From: shewitt Date: Wed, 20 Apr 2022 17:36:52 -0400 Subject: [PATCH 2/2] Use well-defined error code. --- pulsar/error.go | 4 ++++ pulsar/producer_partition.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar/error.go b/pulsar/error.go index f433bfc973..ead5cf9436 100644 --- a/pulsar/error.go +++ b/pulsar/error.go @@ -101,6 +101,8 @@ const ( SeekFailed // ProducerClosed means producer already been closed ProducerClosed + // SchemaFailure means the payload could not be encoded using the Schema + SchemaFailure ) // Error implement error interface, composed of two parts: msg and result. @@ -205,6 +207,8 @@ func getResultStr(r Result) string { return "SeekFailed" case ProducerClosed: return "ProducerClosed" + case SchemaFailure: + return "SchemaFailure" default: return fmt.Sprintf("Result(%d)", r) } diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index fbebfba53f..43ae68fdef 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -420,7 +420,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { schemaPayload, err = p.options.Schema.Encode(msg.Value) if err != nil { p.publishSemaphore.Release() - request.callback(nil, request.msg, err) + request.callback(nil, request.msg, newError(SchemaFailure, err.Error())) p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value) return }