Skip to content

Commit

Permalink
support-pip-368
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed Oct 30, 2024
1 parent 310fb94 commit a54a11b
Show file tree
Hide file tree
Showing 9 changed files with 1,308 additions and 1,202 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#

IMAGE_NAME = pulsar-client-go-test:latest
PULSAR_VERSION ?= 3.2.2
PULSAR_VERSION ?= 4.0.0
PULSAR_IMAGE = apachepulsar/pulsar:$(PULSAR_VERSION)
GO_VERSION ?= 1.22
CONTAINER_ARCH ?= $(shell uname -m | sed s/x86_64/amd64/)
Expand Down
8 changes: 8 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ type ClientOptions struct {
// Limit of client memory usage (in byte). The 64M default can guarantee a high producer throughput.
// Config less than 0 indicates off memory limit.
MemoryLimitBytes int64

// Set the properties used for topic lookup.
// When the broker performs topic lookup, these lookup properties will be taken into consideration in a customized
// load manager.
// Note: The lookup properties are only used in topic lookup when:
// The protocol is binary protocol, i.e. the service URL starts with "pulsar://" or "pulsar+ssl://"
// The `loadManagerClassName` config in broker is a class that implements the `ExtensibleLoadManager` interface
LookupProperties map[string]string
}

// Client represents a pulsar client
Expand Down
2 changes: 1 addition & 1 deletion pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func newClient(options ClientOptions) (Client, error) {
}

c.rpcClient = internal.NewRPCClient(url, c.cnxPool, operationTimeout, logger, metrics,
options.ListenerName, tlsConfig, authProvider)
options.ListenerName, tlsConfig, authProvider, toKeyValues(options.LookupProperties))

c.lookupService = c.rpcClient.LookupService("")

Expand Down
63 changes: 63 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4691,3 +4691,66 @@ func TestPartitionConsumerGetLastMessageIDs(t *testing.T) {
}

}

func TestLookupConsumer(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
LookupProperties: map[string]string{
"broker.id": "1",
},
})

assert.Nil(t, err)
defer client.Close()

topic := "my-topic"
ctx := context.Background()

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Exclusive,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: false,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
Key: "pulsar",
Properties: map[string]string{
"key-1": "pulsar-1",
},
}); err != nil {
log.Fatal(err)
}
}

// receive 10 messages
for i := 0; i < 10; i++ {
msg, err := consumer.Receive(context.Background())
if err != nil {
log.Fatal(err)
}

expectMsg := fmt.Sprintf("hello-%d", i)
expectProperties := map[string]string{
"key-1": "pulsar-1",
}
assert.Equal(t, []byte(expectMsg), msg.Payload())
assert.Equal(t, "pulsar", msg.Key())
assert.Equal(t, expectProperties, msg.Properties())
// ack message
consumer.Ack(msg)
}
}
6 changes: 5 additions & 1 deletion pulsar/internal/lookup_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,21 @@ type lookupService struct {
serviceNameResolver ServiceNameResolver
tlsEnabled bool
listenerName string
lookupProperties []*pb.KeyValue
log log.Logger
metrics *Metrics
}

// NewLookupService init a lookup service struct and return an object of LookupService.
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver,
tlsEnabled bool, listenerName string, logger log.Logger, metrics *Metrics) LookupService {
tlsEnabled bool, listenerName string,
lookupProperties []*pb.KeyValue, logger log.Logger, metrics *Metrics) LookupService {
return &lookupService{
rpcClient: rpcClient,
serviceNameResolver: serviceNameResolver,
tlsEnabled: tlsEnabled,
log: logger.SubLogger(log.Fields{"serviceURL": serviceURL}),
lookupProperties: lookupProperties,
metrics: metrics,
listenerName: listenerName,
}
Expand Down Expand Up @@ -146,6 +149,7 @@ func (ls *lookupService) Lookup(topic string) (*LookupResult, error) {
Topic: &topic,
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(ls.listenerName),
Properties: ls.lookupProperties,
})
if err != nil {
return nil, err
Expand Down
46 changes: 29 additions & 17 deletions pulsar/internal/lookup_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -138,6 +138,7 @@ func TestLookupSuccess(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -149,9 +150,8 @@ func TestLookupSuccess(t *testing.T) {
},
},
}

metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -165,6 +165,7 @@ func TestTlsLookupSuccess(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -174,6 +175,7 @@ func TestTlsLookupSuccess(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -187,7 +189,7 @@ func TestTlsLookupSuccess(t *testing.T) {
}
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)

ls := NewLookupService(mockedClient, url, serviceNameResolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -201,6 +203,7 @@ func TestLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)
kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -210,6 +213,7 @@ func TestLookupWithProxy(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -223,7 +227,7 @@ func TestLookupWithProxy(t *testing.T) {
},
}
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -236,7 +240,7 @@ func TestLookupWithProxy(t *testing.T) {
func TestTlsLookupWithProxy(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -246,6 +250,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -260,7 +265,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -273,7 +278,7 @@ func TestTlsLookupWithProxy(t *testing.T) {
func TestLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
expectedURL: "pulsar://broker-2:6650",
Expand All @@ -284,6 +289,7 @@ func TestLookupWithRedirect(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
{
RequestId: proto.Uint64(2),
Expand All @@ -309,7 +315,7 @@ func TestLookupWithRedirect(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -322,7 +328,7 @@ func TestLookupWithRedirect(t *testing.T) {
func TestTlsLookupWithRedirect(t *testing.T) {
url, err := url.Parse("pulsar+ssl://example:6651")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,
expectedURL: "pulsar+ssl://broker-2:6651",
Expand All @@ -333,6 +339,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
{
RequestId: proto.Uint64(2),
Expand All @@ -359,7 +366,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {

resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, true, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, true, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.NoError(t, err)
Expand All @@ -372,7 +379,7 @@ func TestTlsLookupWithRedirect(t *testing.T) {
func TestLookupWithInvalidUrlResponse(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -382,6 +389,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -396,7 +404,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
}
resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand All @@ -406,7 +414,7 @@ func TestLookupWithInvalidUrlResponse(t *testing.T) {
func TestLookupWithLookupFailure(t *testing.T) {
url, err := url.Parse("pulsar://example:6650")
assert.NoError(t, err)

kvs := make([]*pb.KeyValue, 0)
mockedClient := &mockedLookupRPCClient{
t: t,

Expand All @@ -416,6 +424,7 @@ func TestLookupWithLookupFailure(t *testing.T) {
Topic: proto.String("my-topic"),
Authoritative: proto.Bool(false),
AdvertisedListenerName: proto.String(""),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -429,7 +438,7 @@ func TestLookupWithLookupFailure(t *testing.T) {

resolver := NewPulsarServiceNameResolver(url)
metricsProvider := NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer)
ls := NewLookupService(mockedClient, url, resolver, false, "", log.DefaultNopLogger(), metricsProvider)
ls := NewLookupService(mockedClient, url, resolver, false, "", kvs, log.DefaultNopLogger(), metricsProvider)

lr, err := ls.Lookup("my-topic")
assert.Error(t, err)
Expand Down Expand Up @@ -509,6 +518,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedPartitionedTopicMetadataRPCClient{
t: t,

Expand All @@ -525,7 +535,7 @@ func TestGetPartitionedTopicMetadataSuccess(t *testing.T) {
Response: pb.CommandPartitionedTopicMetadataResponse_Success.Enum(),
},
},
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(),
}, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer))

metadata, err := ls.GetPartitionedTopicMetadata("my-topic")
Expand All @@ -539,6 +549,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
assert.NoError(t, err)
serviceNameResolver := NewPulsarServiceNameResolver(url)

kvs := make([]*pb.KeyValue, 0)
ls := NewLookupService(&mockedLookupRPCClient{
t: t,

Expand All @@ -548,6 +559,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
Topic: proto.String("my-topic"),
AdvertisedListenerName: proto.String(""),
Authoritative: proto.Bool(false),
Properties: kvs,
},
},
mockedResponses: []pb.CommandLookupTopicResponse{
Expand All @@ -558,7 +570,7 @@ func TestLookupSuccessWithMultipleHosts(t *testing.T) {
BrokerServiceUrl: proto.String("pulsar://broker-1:6650"),
},
},
}, url, serviceNameResolver, false, "", log.DefaultNopLogger(),
}, url, serviceNameResolver, false, "", kvs, log.DefaultNopLogger(),
NewMetricsProvider(4, map[string]string{}, prometheus.DefaultRegisterer))

lr, err := ls.Lookup("my-topic")
Expand Down
Loading

0 comments on commit a54a11b

Please sign in to comment.