diff --git a/go.mod b/go.mod index a4a4bec..601b0a9 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/davecgh/go-spew v1.1.1 github.com/fatih/color v1.7.0 // indirect + github.com/golang/protobuf v1.3.1 github.com/google/go-cmp v0.3.1 // indirect github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06 github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b // indirect diff --git a/pkg/pulsar/admin.go b/pkg/pulsar/admin.go index e1ad829..0c7b868 100644 --- a/pkg/pulsar/admin.go +++ b/pkg/pulsar/admin.go @@ -73,6 +73,7 @@ type Client interface { Functions() Functions Tenants() Tenants Topics() Topics + Subscriptions() Subscriptions Sources() Sources Sinks() Sinks Namespaces() Namespaces diff --git a/pkg/pulsar/message.go b/pkg/pulsar/message.go new file mode 100644 index 0000000..0e36434 --- /dev/null +++ b/pkg/pulsar/message.go @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import "github.com/golang/protobuf/proto" + +type Message struct { + messageID MessageID + payload []byte + topic string + properties map[string]string +} + +func NewMessage(topic string, id MessageID, payload []byte, properties map[string]string) *Message { + return &Message{ + messageID: id, + payload: payload, + topic: topic, + properties: properties, + } +} + +func (m *Message) GetMessageID() MessageID { + return m.messageID +} + +func (m *Message) GetProperties() map[string]string { + return m.properties +} + +func (m *Message) GetPayload() []byte { + return m.payload +} + +// nolint +type SingleMessageMetadata struct { + Properties []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"` + PartitionKey *string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"` + PayloadSize *int32 `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"` + CompactedOut *bool `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"` + // the timestamp that this event occurs. it is typically set by applications. + // if this field is omitted, `publish_time` can be used for the purpose of `event_time`. + EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"` + PartitionKeyB64Encoded *bool `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"` + // Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode. + OrderingKey []byte `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SingleMessageMetadata) Reset() { *m = SingleMessageMetadata{} } +func (m *SingleMessageMetadata) String() string { return proto.CompactTextString(m) } +func (*SingleMessageMetadata) ProtoMessage() {} +func (m *SingleMessageMetadata) GetPayloadSize() int32 { + if m != nil && m.PayloadSize != nil { + return *m.PayloadSize + } + return 0 +} + +// nolint +type KeyValue struct { + Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"` + Value *string `protobuf:"bytes,2,req,name=value" json:"value,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *KeyValue) Reset() { *m = KeyValue{} } +func (m *KeyValue) String() string { return proto.CompactTextString(m) } +func (*KeyValue) ProtoMessage() {} diff --git a/pkg/pulsar/message_id.go b/pkg/pulsar/message_id.go index 42e3fc7..898f15b 100644 --- a/pkg/pulsar/message_id.go +++ b/pkg/pulsar/message_id.go @@ -17,8 +17,66 @@ package pulsar +import ( + "strconv" + "strings" + + "github.com/pkg/errors" +) + type MessageID struct { LedgerID int64 `json:"ledgerId"` EntryID int64 `json:"entryId"` PartitionedIndex int `json:"partitionedIndex"` + BatchIndex int `json:"-"` +} + +var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1} +var Earliest = MessageID{-1, -1, -1, -1} + +func ParseMessageID(str string) (*MessageID, error) { + s := strings.Split(str, ":") + + m := Earliest + + if len(s) < 2 || len(s) > 4 { + return nil, errors.Errorf("invalid message id string. %s", str) + } + + ledgerID, err := strconv.ParseInt(s[0], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid ledger id. %s", str) + } + m.LedgerID = ledgerID + + entryID, err := strconv.ParseInt(s[1], 10, 64) + if err != nil { + return nil, errors.Errorf("invalid entry id. %s", str) + } + m.EntryID = entryID + + if len(s) > 2 { + pi, err := strconv.Atoi(s[2]) + if err != nil { + return nil, errors.Errorf("invalid partition index. %s", str) + } + m.PartitionedIndex = pi + } + + if len(s) == 4 { + bi, err := strconv.Atoi(s[3]) + if err != nil { + return nil, errors.Errorf("invalid batch index. %s", str) + } + m.BatchIndex = bi + } + + return &m, nil +} + +func (m MessageID) String() string { + return strconv.FormatInt(m.LedgerID, 10) + ":" + + strconv.FormatInt(m.EntryID, 10) + ":" + + strconv.Itoa(m.PartitionedIndex) + ":" + + strconv.Itoa(m.BatchIndex) } diff --git a/pkg/pulsar/message_id_test.go b/pkg/pulsar/message_id_test.go new file mode 100644 index 0000000..46c76b7 --- /dev/null +++ b/pkg/pulsar/message_id_test.go @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseMessageId(t *testing.T) { + id, err := ParseMessageID("1:1") + assert.Nil(t, err) + assert.Equal(t, MessageID{LedgerID: 1, EntryID: 1, PartitionedIndex: -1, BatchIndex: -1}, *id) + + id, err = ParseMessageID("1:2:3") + assert.Nil(t, err) + assert.Equal(t, MessageID{LedgerID: 1, EntryID: 2, PartitionedIndex: 3, BatchIndex: -1}, *id) + + id, err = ParseMessageID("1:2:3:4") + assert.Nil(t, err) + assert.Equal(t, MessageID{LedgerID: 1, EntryID: 2, PartitionedIndex: 3, BatchIndex: 4}, *id) +} + +func TestParseMessageIdErrors(t *testing.T) { + id, err := ParseMessageID("1;1") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid message id string. 1;1", err.Error()) + + id, err = ParseMessageID("a:1") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid ledger id. a:1", err.Error()) + + id, err = ParseMessageID("1:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid entry id. 1:a", err.Error()) + + id, err = ParseMessageID("1:2:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid partition index. 1:2:a", err.Error()) + + id, err = ParseMessageID("1:2:3:a") + assert.Nil(t, id) + assert.NotNil(t, err) + assert.Equal(t, "invalid batch index. 1:2:3:a", err.Error()) +} diff --git a/pkg/pulsar/namespace.go b/pkg/pulsar/namespace.go index a1f96ef..9ec2700 100644 --- a/pkg/pulsar/namespace.go +++ b/pkg/pulsar/namespace.go @@ -144,7 +144,7 @@ type Namespaces interface { DeleteNamespaceAntiAffinityGroup(namespace string) error // Set the deduplication status for all topics within a namespace - // When deduplication is enabled, the broker will prevent to store the same message multiple times + // When deduplication is enabled, the broker will prevent to store the same Message multiple times SetDeduplicationStatus(namespace string, enableDeduplication bool) error // Set the persistence configuration for all the topics on a namespace @@ -202,19 +202,19 @@ type Namespaces interface { // Clear backlog for all topics on a namespace ClearNamespaceBacklog(namespace NameSpaceName) error - // Set replicator-message-dispatch-rate (Replicators under this namespace + // Set replicator-Message-dispatch-rate (Replicators under this namespace // can dispatch this many messages per second) SetReplicatorDispatchRate(namespace NameSpaceName, rate DispatchRate) error - // Get replicator-message-dispatch-rate (Replicators under this namespace + // Get replicator-Message-dispatch-rate (Replicators under this namespace // can dispatch this many messages per second) GetReplicatorDispatchRate(namespace NameSpaceName) (DispatchRate, error) - // Set subscription-message-dispatch-rate (subscriptions under this namespace + // Set subscription-Message-dispatch-rate (subscriptions under this namespace // can dispatch this many messages per second) SetSubscriptionDispatchRate(namespace NameSpaceName, rate DispatchRate) error - // Get subscription-message-dispatch-rate (subscriptions under this namespace + // Get subscription-Message-dispatch-rate (subscriptions under this namespace // can dispatch this many messages per second) GetSubscriptionDispatchRate(namespace NameSpaceName) (DispatchRate, error) @@ -224,10 +224,10 @@ type Namespaces interface { // Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period) GetSubscribeRate(namespace NameSpaceName) (SubscribeRate, error) - // Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second) + // Set Message-dispatch-rate (topics under this namespace can dispatch this many messages per second) SetDispatchRate(namespace NameSpaceName, rate DispatchRate) error - // Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second) + // Get Message-dispatch-rate (topics under this namespace can dispatch this many messages per second) GetDispatchRate(namespace NameSpaceName) (DispatchRate, error) } diff --git a/pkg/pulsar/sink_status.go b/pkg/pulsar/sink_status.go index 68fa27d..5629914 100644 --- a/pkg/pulsar/sink_status.go +++ b/pkg/pulsar/sink_status.go @@ -60,7 +60,7 @@ type SinkInstanceStatusData struct { // Number of messages written to sink NumWrittenToSink int64 `json:"numWrittenToSink"` - // When was the last time we received a message from Pulsar + // When was the last time we received a Message from Pulsar LastReceivedTime int64 `json:"lastReceivedTime"` WorkerID string `json:"workerId"` diff --git a/pkg/pulsar/subscription.go b/pkg/pulsar/subscription.go new file mode 100644 index 0000000..8b8e4d1 --- /dev/null +++ b/pkg/pulsar/subscription.go @@ -0,0 +1,246 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "bytes" + "encoding/binary" + "io" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + + "github.com/golang/protobuf/proto" +) + +type Subscriptions interface { + Create(TopicName, string, MessageID) error + Delete(TopicName, string) error + List(TopicName) ([]string, error) + ResetCursorToMessageID(TopicName, string, MessageID) error + ResetCursorToTimestamp(TopicName, string, int64) error + ClearBacklog(TopicName, string) error + SkipMessages(TopicName, string, int64) error + ExpireMessages(TopicName, string, int64) error + ExpireAllMessages(TopicName, int64) error + PeekMessages(TopicName, string, int) ([]*Message, error) +} + +type subscriptions struct { + client *client + basePath string + SubPath string +} + +func (c *client) Subscriptions() Subscriptions { + return &subscriptions{ + client: c, + basePath: "", + SubPath: "subscription", + } +} + +func (s *subscriptions) Create(topic TopicName, sName string, messageID MessageID) error { + endpoint := s.client.endpoint(s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName)) + return s.client.put(endpoint, messageID) +} + +func (s *subscriptions) Delete(topic TopicName, sName string) error { + endpoint := s.client.endpoint(s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName)) + return s.client.delete(endpoint) +} + +func (s *subscriptions) List(topic TopicName) ([]string, error) { + endpoint := s.client.endpoint(s.basePath, topic.GetRestPath(), "subscriptions") + var list []string + return list, s.client.get(endpoint, &list) +} + +func (s *subscriptions) ResetCursorToMessageID(topic TopicName, sName string, id MessageID) error { + endpoint := s.client.endpoint(s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName), "resetcursor") + return s.client.post(endpoint, id) +} + +func (s *subscriptions) ResetCursorToTimestamp(topic TopicName, sName string, timestamp int64) error { + endpoint := s.client.endpoint( + s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName), + "resetcursor", strconv.FormatInt(timestamp, 10)) + return s.client.post(endpoint, "") +} + +func (s *subscriptions) ClearBacklog(topic TopicName, sName string) error { + endpoint := s.client.endpoint( + s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName), "skip_all") + return s.client.post(endpoint, "") +} + +func (s *subscriptions) SkipMessages(topic TopicName, sName string, n int64) error { + endpoint := s.client.endpoint( + s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName), + "skip", strconv.FormatInt(n, 10)) + return s.client.post(endpoint, "") +} + +func (s *subscriptions) ExpireMessages(topic TopicName, sName string, expire int64) error { + endpoint := s.client.endpoint( + s.basePath, topic.GetRestPath(), s.SubPath, url.QueryEscape(sName), + "expireMessages", strconv.FormatInt(expire, 10)) + return s.client.post(endpoint, "") +} + +func (s *subscriptions) ExpireAllMessages(topic TopicName, expire int64) error { + endpoint := s.client.endpoint( + s.basePath, topic.GetRestPath(), "all_subscription", + "expireMessages", strconv.FormatInt(expire, 10)) + return s.client.post(endpoint, "") +} + +func (s *subscriptions) PeekMessages(topic TopicName, sName string, n int) ([]*Message, error) { + var msgs []*Message + + count := 1 + for n > 0 { + m, err := s.peekNthMessage(topic, sName, count) + if err != nil { + return nil, err + } + msgs = append(msgs, m...) + n -= len(m) + count++ + } + + return msgs, nil +} + +func (s *subscriptions) peekNthMessage(topic TopicName, sName string, pos int) ([]*Message, error) { + endpoint := s.client.endpoint(s.basePath, topic.GetRestPath(), "subscription", url.QueryEscape(sName), + "position", strconv.Itoa(pos)) + req, err := s.client.newRequest(http.MethodGet, endpoint) + if err != nil { + return nil, err + } + + resp, err := checkSuccessful(s.client.doRequest(req)) + if err != nil { + return nil, err + } + defer safeRespClose(resp) + + return handleResp(topic, resp) +} + +const ( + PublishTimeHeader = "X-Pulsar-Publish-Time" + BatchHeader = "X-Pulsar-Num-Batch-Message" + PropertyPrefix = "X-Pulsar-PROPERTY-" +) + +func handleResp(topic TopicName, resp *http.Response) ([]*Message, error) { + msgID := resp.Header.Get("X-Pulsar-Message-ID") + ID, err := ParseMessageID(msgID) + if err != nil { + return nil, err + } + + // read data + payload, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + properties := make(map[string]string) + for k := range resp.Header { + switch { + case k == PublishTimeHeader: + h := resp.Header.Get(k) + if h != "" { + properties["publish-time"] = h + } + case k == BatchHeader: + h := resp.Header.Get(k) + if h != "" { + properties[BatchHeader] = h + } + return getIndividualMsgsFromBatch(topic, ID, payload, properties) + case strings.Contains(k, PropertyPrefix): + key := strings.TrimPrefix(k, PropertyPrefix) + properties[key] = resp.Header.Get(k) + } + } + + return []*Message{NewMessage(topic.String(), *ID, payload, properties)}, nil +} + +func getIndividualMsgsFromBatch(topic TopicName, msgID *MessageID, data []byte, + properties map[string]string) ([]*Message, error) { + + batchSize, err := strconv.Atoi(properties[BatchHeader]) + if err != nil { + return nil, nil + } + + msgs := make([]*Message, 0, batchSize) + + // read all messages in batch + buf32 := make([]byte, 4) + rdBuf := bytes.NewReader(data) + for i := 0; i < batchSize; i++ { + msgID.BatchIndex = i + // singleMetaSize + if _, err := io.ReadFull(rdBuf, buf32); err != nil { + return nil, err + } + singleMetaSize := binary.BigEndian.Uint32(buf32) + + // singleMeta + singleMetaBuf := make([]byte, singleMetaSize) + if _, err := io.ReadFull(rdBuf, singleMetaBuf); err != nil { + return nil, err + } + + singleMeta := new(SingleMessageMetadata) + if err := proto.Unmarshal(singleMetaBuf, singleMeta); err != nil { + return nil, err + } + + if len(singleMeta.Properties) > 0 { + for _, v := range singleMeta.Properties { + k := *v.Key + property := *v.Value + properties[k] = property + } + } + + //payload + singlePayload := make([]byte, singleMeta.GetPayloadSize()) + if _, err := io.ReadFull(rdBuf, singlePayload); err != nil { + return nil, err + } + + msgs = append(msgs, &Message{ + topic: topic.String(), + messageID: *msgID, + payload: singlePayload, + properties: properties, + }) + } + + return msgs, nil +}