Skip to content

Commit

Permalink
feat: Support SubscribeTopicEvents API (#1088)
Browse files Browse the repository at this point in the history
  • Loading branch information
CrazyHZM authored Dec 17, 2024
1 parent f9dfcaf commit 20b531e
Show file tree
Hide file tree
Showing 10 changed files with 2,345 additions and 984 deletions.
3 changes: 2 additions & 1 deletion demo/pubsub/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ You can run server/client demo with different component names.
It is worth noting that both server and client demo should set the same store name by param `-s`.
For example:
```shell
cd ${project_path}/demo/pubsub/server/
#################### Run pubsub demo with appcallback ####################
cd ${project_path}/demo/pubsub/appcallback/
# 1. start subscriber
go build -o subscriber
/.subscriber -s pub_subs_demo
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions pkg/grpc/default_api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type api struct {
// app callback
AppCallbackConn *grpc.ClientConn
topicPerComponent map[string]TopicSubscriptions
streamer *streamer
// json
json jsoniter.API
}
Expand Down Expand Up @@ -148,6 +149,7 @@ func NewAPI(
secretStores: secretStores,
json: jsoniter.ConfigFastest,
}

}

func (a *api) SayHello(ctx context.Context, in *runtimev1pb.SayHelloRequest) (*runtimev1pb.SayHelloResponse, error) {
Expand Down
94 changes: 54 additions & 40 deletions pkg/grpc/default_api/api_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,56 +151,23 @@ func (a *api) getInterestedTopics() (map[string]TopicSubscriptions, error) {
}

func (a *api) publishMessageGRPC(ctx context.Context, msg *pubsub.NewMessage) error {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)

// TODO tracing
envelope, cloudEvent, err := a.envelopeFromSubscriptionMessage(ctx, msg)

if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return err
}

// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
if envelope == nil {
return nil
}

// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}

// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return err
}

envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil

if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
// TODO tracing

// 4. Call appcallback
// Call appcallback
clientV1 := runtimev1pb.NewAppCallbackClient(a.AppCallbackConn)
res, err := clientV1.OnTopicEvent(ctx, envelope)

// 5. Check result
// Check result
return retryStrategy(err, res, cloudEvent)
}

Expand Down Expand Up @@ -246,3 +213,50 @@ func listTopicSubscriptions(client runtimev1pb.AppCallbackClient, log log.ErrorL
}
return make([]*runtimev1pb.TopicSubscription, 0)
}

func (a *api) envelopeFromSubscriptionMessage(ctx context.Context, msg *pubsub.NewMessage) (*runtimev1pb.TopicEventRequest, map[string]interface{}, error) {
// 1. Unmarshal to cloudEvent model
var cloudEvent map[string]interface{}
err := a.json.Unmarshal(msg.Data, &cloudEvent)
if err != nil {
log.DefaultLogger.Debugf("[runtime]error deserializing cloud events proto: %s", err)
return nil, cloudEvent, err
}

// 2. Drop msg if the current cloud event has expired
if pubsub.HasExpired(cloudEvent) {
log.DefaultLogger.Warnf("[runtime]dropping expired pub/sub event %v as of %v", cloudEvent[pubsub.IDField].(string), cloudEvent[pubsub.ExpirationField].(string))
return nil, cloudEvent, nil
}

// 3. Convert to proto domain struct
envelope := &runtimev1pb.TopicEventRequest{
Id: cloudEvent[pubsub.IDField].(string),
Source: cloudEvent[pubsub.SourceField].(string),
DataContentType: cloudEvent[pubsub.DataContentTypeField].(string),
Type: cloudEvent[pubsub.TypeField].(string),
SpecVersion: cloudEvent[pubsub.SpecVersionField].(string),
Topic: msg.Topic,
PubsubName: msg.Metadata[Metadata_key_pubsubName],
}

// set data field
if data, ok := cloudEvent[pubsub.DataBase64Field]; ok && data != nil {
decoded, decodeErr := base64.StdEncoding.DecodeString(data.(string))
if decodeErr != nil {
log.DefaultLogger.Debugf("unable to base64 decode cloudEvent field data_base64: %s", decodeErr)
return nil, cloudEvent, err
}

envelope.Data = decoded
} else if data, ok := cloudEvent[pubsub.DataField]; ok && data != nil {
envelope.Data = nil

if contenttype.IsStringContentType(envelope.DataContentType) {
envelope.Data = []byte(data.(string))
} else if contenttype.IsJSONContentType(envelope.DataContentType) {
envelope.Data, _ = a.json.Marshal(data)
}
}
return envelope, cloudEvent, nil
}
Loading

0 comments on commit 20b531e

Please sign in to comment.