Skip to content

Commit

Permalink
Add TableView support (#743)
Browse files Browse the repository at this point in the history
* Add TableView for Pulsar Golang client

* cckellogg

* Return a copy of the underlying map in tv.Entries()
  • Loading branch information
ZiyaoWei authored Mar 31, 2022
1 parent ae400ca commit 3e1b39d
Show file tree
Hide file tree
Showing 5 changed files with 442 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pulsar/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ type Client interface {
// This method will block until the reader is created successfully.
CreateReader(ReaderOptions) (Reader, error)

// CreateTableView creates a table view instance.
// This method will block until the table view is created successfully.
CreateTableView(TableViewOptions) (TableView, error)

// TopicPartitions Fetches the list of partitions for a given topic
//
// If the topic is partitioned, this will return a list of partition names.
Expand Down
9 changes: 9 additions & 0 deletions pulsar/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ func (c *client) CreateReader(options ReaderOptions) (Reader, error) {
return reader, nil
}

func (c *client) CreateTableView(options TableViewOptions) (TableView, error) {
tableView, err := newTableView(c, options)
if err != nil {
return nil, err
}
c.handlers.Add(tableView)
return tableView, nil
}

func (c *client) TopicPartitions(topic string) ([]string, error) {
topicName, err := internal.ParseTopicName(topic)
if err != nil {
Expand Down
78 changes: 78 additions & 0 deletions pulsar/table_view.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"reflect"
"time"

"github.com/apache/pulsar-client-go/pulsar/log"
)

// TableViewOptions contains the options for creating a TableView
type TableViewOptions struct {
// Topic specifies the topic this table view will subscribe on.
// This argument is required when constructing the table view.
Topic string

// Set the interval of updating partitions. Default to 1 minute.
AutoUpdatePartitionsInterval time.Duration

// Schema represents the schema implementation.
Schema Schema

// SchemaValueType represents the type of values for the given schema.
SchemaValueType reflect.Type

// Configure the logger used by the TableView.
// By default, a wrapped logrus.StandardLogger will be used, namely,
// log.NewLoggerWithLogrus(logrus.StandardLogger())
Logger log.Logger
}

// TableView provides a key-value map view of a compacted topic. Messages without keys will be ignored.
type TableView interface {
// Size returns the number of key-value mappings in the TableView.
Size() int

// IsEmpty returns true if this TableView contains no key-value mappings.
IsEmpty() bool

// ContainsKey returns true if this TableView contains a mapping for the specified key.
ContainsKey(key string) bool

// Get returns the value to which the specified key is mapped, or nil if this map contains no mapping for the key.
Get(key string) interface{}

// Entries returns a map view of the mappings contained in this TableView.
Entries() map[string]interface{}

// Keys returns a slice of the keys contained in this TableView.
Keys() []string

// ForEach performs the give action for each entry in this map until all entries have been processed or the action
// returns an error.
ForEach(func(string, interface{}) error) error

// ForEachAndListen performs the give action for each entry in this map until all entries have been processed or
// the action returns an error.
ForEachAndListen(func(string, interface{}) error) error

// Close closes the table view and releases resources allocated.
Close()
}
271 changes: 271 additions & 0 deletions pulsar/table_view_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,271 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsar

import (
"context"
"errors"
"fmt"
"reflect"
"sync"
"time"

"github.com/apache/pulsar-client-go/pulsar/log"
"github.com/sirupsen/logrus"
)

type cancelReader struct {
reader Reader
cancelFunc context.CancelFunc
}

type TableViewImpl struct {
client *client
options TableViewOptions

dataMu sync.Mutex
data map[string]interface{}

readersMu sync.Mutex
cancelRaders map[string]cancelReader

listenersMu sync.Mutex
listeners []func(string, interface{}) error

logger log.Logger
closed bool
closedCh chan struct{}
}

func newTableView(client *client, options TableViewOptions) (TableView, error) {
if options.Topic == "" {
return nil, newError(TopicNotFound, "topic is required")
}

if options.Schema != nil && options.SchemaValueType == nil {
return nil, newError(InvalidConfiguration, "SchemaValueType is required when Schema is present")
}

var logger log.Logger
if options.Logger != nil {
logger = options.Logger
} else {
logger = log.NewLoggerWithLogrus(logrus.StandardLogger())
}

if options.AutoUpdatePartitionsInterval == 0 {
options.AutoUpdatePartitionsInterval = time.Minute
}

tv := TableViewImpl{
client: client,
options: options,
data: make(map[string]interface{}),
cancelRaders: make(map[string]cancelReader),
logger: logger,
closedCh: make(chan struct{}),
}

// Do an initial round of partition update check to make sure we can populate the partition readers
if err := tv.partitionUpdateCheck(); err != nil {
return nil, err
}
go tv.periodicPartitionUpdateCheck()

return &tv, nil
}

func (tv *TableViewImpl) partitionUpdateCheck() error {
partitionsArray, err := tv.client.TopicPartitions(tv.options.Topic)
if err != nil {
return fmt.Errorf("tv.client.TopicPartitions(%s) failed: %w", tv.options.Topic, err)
}

partitions := make(map[string]bool, len(partitionsArray))
for _, partition := range partitionsArray {
partitions[partition] = true
}

tv.readersMu.Lock()
defer tv.readersMu.Unlock()

for partition, cancelReader := range tv.cancelRaders {
if _, ok := partitions[partition]; !ok {
cancelReader.cancelFunc()
cancelReader.reader.Close()
delete(tv.cancelRaders, partition)
}
}

for partition := range partitions {
if _, ok := tv.cancelRaders[partition]; !ok {
reader, err := newReader(tv.client, ReaderOptions{
Topic: partition,
StartMessageID: EarliestMessageID(),
ReadCompacted: true,
// TODO: Pooling?
Schema: tv.options.Schema,
})
if err != nil {
return fmt.Errorf("create new reader failed for %s: %w", partition, err)
}
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
tv.logger.Errorf("read next message failed for %s: %w", partition, err)
}
tv.handleMessage(msg)
}
ctx, cancelFunc := context.WithCancel(context.Background())
tv.cancelRaders[partition] = cancelReader{
reader: reader,
cancelFunc: cancelFunc,
}
go tv.watchReaderForNewMessages(ctx, reader)
}
}

return nil
}

func (tv *TableViewImpl) periodicPartitionUpdateCheck() {
for {
if err := tv.partitionUpdateCheck(); err != nil {
tv.logger.Errorf("failed to check for changes in number of partitions: %w", err)
}
select {
case <-tv.closedCh:
// If the TableViewImpl has been closed, stop checking for partition updates
return
case <-time.After(tv.options.AutoUpdatePartitionsInterval):
continue
}
}
}

func (tv *TableViewImpl) Size() int {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
return len(tv.data)
}

func (tv *TableViewImpl) IsEmpty() bool {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
return tv.Size() == 0
}

func (tv *TableViewImpl) ContainsKey(key string) bool {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
_, ok := tv.data[key]
return ok
}

func (tv *TableViewImpl) Get(key string) interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
return tv.data[key]
}

func (tv *TableViewImpl) Entries() map[string]interface{} {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
data := make(map[string]interface{}, len(tv.data))
for k, v := range tv.data {
data[k] = v
}
return tv.data
}

func (tv *TableViewImpl) Keys() []string {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
keys := make([]string, len(tv.data))
i := 0
for k := range tv.data {
keys[i] = k
i++
}
return keys
}

func (tv *TableViewImpl) ForEach(action func(string, interface{}) error) error {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()
for k, v := range tv.data {
if err := action(k, v); err != nil {
return err
}
}
return nil
}

func (tv *TableViewImpl) ForEachAndListen(action func(string, interface{}) error) error {
tv.listenersMu.Lock()
defer tv.listenersMu.Unlock()

if err := tv.ForEach(action); err != nil {
return err
}

tv.listeners = append(tv.listeners, action)
return nil
}

func (tv *TableViewImpl) Close() {
tv.readersMu.Lock()
defer tv.readersMu.Unlock()

if !tv.closed {
tv.closed = true
for _, cancelReader := range tv.cancelRaders {
cancelReader.reader.Close()
}
close(tv.closedCh)
}
}

func (tv *TableViewImpl) handleMessage(msg Message) {
tv.dataMu.Lock()
defer tv.dataMu.Unlock()

payload := reflect.Indirect(reflect.New(tv.options.SchemaValueType)).Interface()
if err := msg.GetSchemaValue(&payload); err != nil {
tv.logger.Errorf("msg.GetSchemaValue() failed with %w; msg is %v", msg, err)
}
tv.data[msg.Key()] = payload
for _, listener := range tv.listeners {
if err := listener(msg.Key(), payload); err != nil {
tv.logger.Errorf("table view listener failed for %v: %w", msg, err)
}
}
}

func (tv *TableViewImpl) watchReaderForNewMessages(ctx context.Context, reader Reader) {
for {
msg, err := reader.Next(ctx)
if err != nil {
tv.logger.Errorf("read next message failed for %s: %w", reader.Topic(), err)
}
if errors.Is(err, context.Canceled) {
return
}
tv.handleMessage(msg)
}
}
Loading

0 comments on commit 3e1b39d

Please sign in to comment.