Skip to content
This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Permalink
Add subscription commands (#58)
Browse files Browse the repository at this point in the history
Master issue: #2
  • Loading branch information
zymap authored and maxsxu committed Mar 14, 2023
1 parent 67ccfb6 commit 16a50dc
Show file tree
Hide file tree
Showing 8 changed files with 467 additions and 8 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.12
require (
github.com/davecgh/go-spew v1.1.1
github.com/fatih/color v1.7.0 // indirect
github.com/golang/protobuf v1.3.1
github.com/google/go-cmp v0.3.1 // indirect
github.com/kris-nova/logger v0.0.0-20181127235838-fd0d87064b06
github.com/kris-nova/lolgopher v0.0.0-20180921204813-313b3abb0d9b // indirect
Expand Down
1 change: 1 addition & 0 deletions pkg/pulsar/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type Client interface {
Functions() Functions
Tenants() Tenants
Topics() Topics
Subscriptions() Subscriptions
Sources() Sources
Sinks() Sinks
Namespaces() Namespaces
Expand Down
88 changes: 88 additions & 0 deletions pkg/pulsar/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import "github.com/golang/protobuf/proto"

type Message struct {
messageID MessageID
payload []byte
topic string
properties map[string]string
}

func NewMessage(topic string, id MessageID, payload []byte, properties map[string]string) *Message {
return &Message{
messageID: id,
payload: payload,
topic: topic,
properties: properties,
}
}

func (m *Message) GetMessageID() MessageID {
return m.messageID
}

func (m *Message) GetProperties() map[string]string {
return m.properties
}

func (m *Message) GetPayload() []byte {
return m.payload
}

// nolint
type SingleMessageMetadata struct {
Properties []*KeyValue `protobuf:"bytes,1,rep,name=properties" json:"properties,omitempty"`
PartitionKey *string `protobuf:"bytes,2,opt,name=partition_key,json=partitionKey" json:"partition_key,omitempty"`
PayloadSize *int32 `protobuf:"varint,3,req,name=payload_size,json=payloadSize" json:"payload_size,omitempty"`
CompactedOut *bool `protobuf:"varint,4,opt,name=compacted_out,json=compactedOut,def=0" json:"compacted_out,omitempty"`
// the timestamp that this event occurs. it is typically set by applications.
// if this field is omitted, `publish_time` can be used for the purpose of `event_time`.
EventTime *uint64 `protobuf:"varint,5,opt,name=event_time,json=eventTime,def=0" json:"event_time,omitempty"`
PartitionKeyB64Encoded *bool `protobuf:"varint,6,opt,name=partition_key_b64_encoded,json=partitionKeyB64Encoded,def=0" json:"partition_key_b64_encoded,omitempty"`
// Specific a key to overwrite the message key which used for ordering dispatch in Key_Shared mode.
OrderingKey []byte `protobuf:"bytes,7,opt,name=ordering_key,json=orderingKey" json:"ordering_key,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

func (m *SingleMessageMetadata) Reset() { *m = SingleMessageMetadata{} }
func (m *SingleMessageMetadata) String() string { return proto.CompactTextString(m) }
func (*SingleMessageMetadata) ProtoMessage() {}
func (m *SingleMessageMetadata) GetPayloadSize() int32 {
if m != nil && m.PayloadSize != nil {
return *m.PayloadSize
}
return 0
}

// nolint
type KeyValue struct {
Key *string `protobuf:"bytes,1,req,name=key" json:"key,omitempty"`
Value *string `protobuf:"bytes,2,req,name=value" json:"value,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}

func (m *KeyValue) Reset() { *m = KeyValue{} }
func (m *KeyValue) String() string { return proto.CompactTextString(m) }
func (*KeyValue) ProtoMessage() {}
58 changes: 58 additions & 0 deletions pkg/pulsar/message_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,66 @@

package pulsar

import (
"strconv"
"strings"

"github.com/pkg/errors"
)

type MessageID struct {
LedgerID int64 `json:"ledgerId"`
EntryID int64 `json:"entryId"`
PartitionedIndex int `json:"partitionedIndex"`
BatchIndex int `json:"-"`
}

var Latest = MessageID{0x7fffffffffffffff, 0x7fffffffffffffff, -1, -1}
var Earliest = MessageID{-1, -1, -1, -1}

func ParseMessageID(str string) (*MessageID, error) {
s := strings.Split(str, ":")

m := Earliest

if len(s) < 2 || len(s) > 4 {
return nil, errors.Errorf("invalid message id string. %s", str)
}

ledgerID, err := strconv.ParseInt(s[0], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid ledger id. %s", str)
}
m.LedgerID = ledgerID

entryID, err := strconv.ParseInt(s[1], 10, 64)
if err != nil {
return nil, errors.Errorf("invalid entry id. %s", str)
}
m.EntryID = entryID

if len(s) > 2 {
pi, err := strconv.Atoi(s[2])
if err != nil {
return nil, errors.Errorf("invalid partition index. %s", str)
}
m.PartitionedIndex = pi
}

if len(s) == 4 {
bi, err := strconv.Atoi(s[3])
if err != nil {
return nil, errors.Errorf("invalid batch index. %s", str)
}
m.BatchIndex = bi
}

return &m, nil
}

func (m MessageID) String() string {
return strconv.FormatInt(m.LedgerID, 10) + ":" +
strconv.FormatInt(m.EntryID, 10) + ":" +
strconv.Itoa(m.PartitionedIndex) + ":" +
strconv.Itoa(m.BatchIndex)
}
65 changes: 65 additions & 0 deletions pkg/pulsar/message_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseMessageId(t *testing.T) {
id, err := ParseMessageID("1:1")
assert.Nil(t, err)
assert.Equal(t, MessageID{LedgerID: 1, EntryID: 1, PartitionedIndex: -1, BatchIndex: -1}, *id)

id, err = ParseMessageID("1:2:3")
assert.Nil(t, err)
assert.Equal(t, MessageID{LedgerID: 1, EntryID: 2, PartitionedIndex: 3, BatchIndex: -1}, *id)

id, err = ParseMessageID("1:2:3:4")
assert.Nil(t, err)
assert.Equal(t, MessageID{LedgerID: 1, EntryID: 2, PartitionedIndex: 3, BatchIndex: 4}, *id)
}

func TestParseMessageIdErrors(t *testing.T) {
id, err := ParseMessageID("1;1")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid message id string. 1;1", err.Error())

id, err = ParseMessageID("a:1")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid ledger id. a:1", err.Error())

id, err = ParseMessageID("1:a")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid entry id. 1:a", err.Error())

id, err = ParseMessageID("1:2:a")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid partition index. 1:2:a", err.Error())

id, err = ParseMessageID("1:2:3:a")
assert.Nil(t, id)
assert.NotNil(t, err)
assert.Equal(t, "invalid batch index. 1:2:3:a", err.Error())
}
14 changes: 7 additions & 7 deletions pkg/pulsar/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type Namespaces interface {
DeleteNamespaceAntiAffinityGroup(namespace string) error

// Set the deduplication status for all topics within a namespace
// When deduplication is enabled, the broker will prevent to store the same message multiple times
// When deduplication is enabled, the broker will prevent to store the same Message multiple times
SetDeduplicationStatus(namespace string, enableDeduplication bool) error

// Set the persistence configuration for all the topics on a namespace
Expand Down Expand Up @@ -202,19 +202,19 @@ type Namespaces interface {
// Clear backlog for all topics on a namespace
ClearNamespaceBacklog(namespace NameSpaceName) error

// Set replicator-message-dispatch-rate (Replicators under this namespace
// Set replicator-Message-dispatch-rate (Replicators under this namespace
// can dispatch this many messages per second)
SetReplicatorDispatchRate(namespace NameSpaceName, rate DispatchRate) error

// Get replicator-message-dispatch-rate (Replicators under this namespace
// Get replicator-Message-dispatch-rate (Replicators under this namespace
// can dispatch this many messages per second)
GetReplicatorDispatchRate(namespace NameSpaceName) (DispatchRate, error)

// Set subscription-message-dispatch-rate (subscriptions under this namespace
// Set subscription-Message-dispatch-rate (subscriptions under this namespace
// can dispatch this many messages per second)
SetSubscriptionDispatchRate(namespace NameSpaceName, rate DispatchRate) error

// Get subscription-message-dispatch-rate (subscriptions under this namespace
// Get subscription-Message-dispatch-rate (subscriptions under this namespace
// can dispatch this many messages per second)
GetSubscriptionDispatchRate(namespace NameSpaceName) (DispatchRate, error)

Expand All @@ -224,10 +224,10 @@ type Namespaces interface {
// Get namespace-subscribe-rate (topics under this namespace allow subscribe times per consumer in a period)
GetSubscribeRate(namespace NameSpaceName) (SubscribeRate, error)

// Set message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
// Set Message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
SetDispatchRate(namespace NameSpaceName, rate DispatchRate) error

// Get message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
// Get Message-dispatch-rate (topics under this namespace can dispatch this many messages per second)
GetDispatchRate(namespace NameSpaceName) (DispatchRate, error)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/pulsar/sink_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type SinkInstanceStatusData struct {
// Number of messages written to sink
NumWrittenToSink int64 `json:"numWrittenToSink"`

// When was the last time we received a message from Pulsar
// When was the last time we received a Message from Pulsar
LastReceivedTime int64 `json:"lastReceivedTime"`

WorkerID string `json:"workerId"`
Expand Down
Loading

0 comments on commit 16a50dc

Please sign in to comment.