From 184510b62d43a98490cbdf1f6c7dbf2c223a9191 Mon Sep 17 00:00:00 2001 From: Victor Nosov Date: Wed, 27 May 2020 08:55:47 +0300 Subject: [PATCH] events service (#49) --- gateway/events/Makefile | 22 +++++ gateway/events/event.pb.go | 144 ++++++++++++++++++++++++++++++ gateway/events/event.pb.gw.go | 116 ++++++++++++++++++++++++ gateway/events/event.pb.md | 62 +++++++++++++ gateway/events/event.proto | 17 ++++ gateway/events/event.swagger.json | 107 ++++++++++++++++++++++ gateway/events/service.go | 42 +++++++++ 7 files changed, 510 insertions(+) create mode 100644 gateway/events/Makefile create mode 100644 gateway/events/event.pb.go create mode 100644 gateway/events/event.pb.gw.go create mode 100644 gateway/events/event.pb.md create mode 100644 gateway/events/event.proto create mode 100644 gateway/events/event.swagger.json create mode 100644 gateway/events/service.go diff --git a/gateway/events/Makefile b/gateway/events/Makefile new file mode 100644 index 00000000..ee0e8bec --- /dev/null +++ b/gateway/events/Makefile @@ -0,0 +1,22 @@ + +.: generate + +GOFLAGS ?= -mod=vendor + +test: + @go test -ginkgo.failFast + +generate: clean + @echo "event service proto generation" + @protoc -I=. \ + -I=../../../ \ + -I=../../vendor \ + -I=../../third_party/googleapis \ + --go_out=plugins=grpc:. \ + --grpc-gateway_out=logtostderr=true:. \ + --swagger_out=logtostderr=true:. \ + --doc_out=markdown,event.pb.md:. \ + ./*.proto + +clean: + @rm -f *.swagger.json *.pb.md *.validator.pb.go *.pb.go *.pb.gw.go *.pb.cc.go diff --git a/gateway/events/event.pb.go b/gateway/events/event.pb.go new file mode 100644 index 00000000..ed442cea --- /dev/null +++ b/gateway/events/event.pb.go @@ -0,0 +1,144 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: event.proto + +package events + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + empty "github.com/golang/protobuf/ptypes/empty" + peer "github.com/hyperledger/fabric/protos/peer" + _ "google.golang.org/genproto/googleapis/api/annotations" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +func init() { proto.RegisterFile("event.proto", fileDescriptor_2d17a9d3f0ddf27e) } + +var fileDescriptor_2d17a9d3f0ddf27e = []byte{ + // 206 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x4e, 0x2d, 0x4b, 0xcd, + 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x03, 0x73, 0x8a, 0xa5, 0x64, 0xd2, 0xf3, + 0xf3, 0xd3, 0x73, 0x52, 0xf5, 0x13, 0x0b, 0x32, 0xf5, 0x13, 0xf3, 0xf2, 0xf2, 0x4b, 0x12, 0x4b, + 0x32, 0xf3, 0xf3, 0x8a, 0x21, 0xaa, 0xa4, 0xa4, 0xa1, 0xb2, 0x60, 0x5e, 0x52, 0x69, 0x9a, 0x7e, + 0x6a, 0x6e, 0x41, 0x49, 0x25, 0x54, 0xd2, 0x3e, 0x3d, 0xb3, 0x24, 0xa3, 0x34, 0x49, 0x2f, 0x39, + 0x3f, 0x57, 0x3f, 0xa3, 0xb2, 0x20, 0xb5, 0x28, 0x27, 0x35, 0x25, 0x3d, 0xb5, 0x48, 0x3f, 0x2d, + 0x31, 0xa9, 0x28, 0x33, 0x19, 0xa2, 0xa7, 0x58, 0xbf, 0x20, 0x35, 0xb5, 0x48, 0x3f, 0x39, 0x23, + 0x31, 0x33, 0x2f, 0x39, 0x3f, 0x25, 0x35, 0x1e, 0xc9, 0x0d, 0x46, 0x89, 0x5c, 0x7c, 0xce, 0x30, + 0x09, 0x57, 0x90, 0xb8, 0x90, 0x3f, 0x17, 0x37, 0x98, 0x11, 0x5c, 0x52, 0x94, 0x9a, 0x98, 0x2b, + 0x24, 0xa6, 0x07, 0xb1, 0x5f, 0x0f, 0x66, 0xbf, 0x9e, 0x2b, 0xc8, 0x7e, 0x29, 0x31, 0x88, 0x40, + 0xb1, 0x1e, 0xaa, 0x76, 0x25, 0xbe, 0xa6, 0xcb, 0x4f, 0x26, 0x33, 0x71, 0x08, 0xb1, 0xe9, 0x83, + 0xad, 0x31, 0x60, 0x74, 0xe2, 0x8a, 0xe2, 0xd0, 0xb3, 0x86, 0x78, 0x35, 0x89, 0x0d, 0xac, 0xc9, + 0x18, 0x10, 0x00, 0x00, 0xff, 0xff, 0x0f, 0x64, 0xa3, 0xd0, 0x08, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// ChaincodeEventClient is the client API for ChaincodeEvent service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type ChaincodeEventClient interface { + EventStream(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (ChaincodeEvent_EventStreamClient, error) +} + +type chaincodeEventClient struct { + cc *grpc.ClientConn +} + +func NewChaincodeEventClient(cc *grpc.ClientConn) ChaincodeEventClient { + return &chaincodeEventClient{cc} +} + +func (c *chaincodeEventClient) EventStream(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (ChaincodeEvent_EventStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_ChaincodeEvent_serviceDesc.Streams[0], "/events.ChaincodeEvent/EventStream", opts...) + if err != nil { + return nil, err + } + x := &chaincodeEventEventStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type ChaincodeEvent_EventStreamClient interface { + Recv() (*peer.ChaincodeEvent, error) + grpc.ClientStream +} + +type chaincodeEventEventStreamClient struct { + grpc.ClientStream +} + +func (x *chaincodeEventEventStreamClient) Recv() (*peer.ChaincodeEvent, error) { + m := new(peer.ChaincodeEvent) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// ChaincodeEventServer is the server API for ChaincodeEvent service. +type ChaincodeEventServer interface { + EventStream(*empty.Empty, ChaincodeEvent_EventStreamServer) error +} + +func RegisterChaincodeEventServer(s *grpc.Server, srv ChaincodeEventServer) { + s.RegisterService(&_ChaincodeEvent_serviceDesc, srv) +} + +func _ChaincodeEvent_EventStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(empty.Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(ChaincodeEventServer).EventStream(m, &chaincodeEventEventStreamServer{stream}) +} + +type ChaincodeEvent_EventStreamServer interface { + Send(*peer.ChaincodeEvent) error + grpc.ServerStream +} + +type chaincodeEventEventStreamServer struct { + grpc.ServerStream +} + +func (x *chaincodeEventEventStreamServer) Send(m *peer.ChaincodeEvent) error { + return x.ServerStream.SendMsg(m) +} + +var _ChaincodeEvent_serviceDesc = grpc.ServiceDesc{ + ServiceName: "events.ChaincodeEvent", + HandlerType: (*ChaincodeEventServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "EventStream", + Handler: _ChaincodeEvent_EventStream_Handler, + ServerStreams: true, + }, + }, + Metadata: "event.proto", +} diff --git a/gateway/events/event.pb.gw.go b/gateway/events/event.pb.gw.go new file mode 100644 index 00000000..a1a0fc16 --- /dev/null +++ b/gateway/events/event.pb.gw.go @@ -0,0 +1,116 @@ +// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT. +// source: event.proto + +/* +Package events is a reverse proxy. + +It translates gRPC into RESTful JSON APIs. +*/ +package events + +import ( + "context" + "io" + "net/http" + + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes/empty" + "github.com/grpc-ecosystem/grpc-gateway/runtime" + "github.com/grpc-ecosystem/grpc-gateway/utilities" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/status" +) + +var _ codes.Code +var _ io.Reader +var _ status.Status +var _ = runtime.String +var _ = utilities.NewDoubleArray + +func request_ChaincodeEvent_EventStream_0(ctx context.Context, marshaler runtime.Marshaler, client ChaincodeEventClient, req *http.Request, pathParams map[string]string) (ChaincodeEvent_EventStreamClient, runtime.ServerMetadata, error) { + var protoReq empty.Empty + var metadata runtime.ServerMetadata + + stream, err := client.EventStream(ctx, &protoReq) + if err != nil { + return nil, metadata, err + } + header, err := stream.Header() + if err != nil { + return nil, metadata, err + } + metadata.HeaderMD = header + return stream, metadata, nil + +} + +// RegisterChaincodeEventHandlerFromEndpoint is same as RegisterChaincodeEventHandler but +// automatically dials to "endpoint" and closes the connection when "ctx" gets done. +func RegisterChaincodeEventHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) { + conn, err := grpc.Dial(endpoint, opts...) + if err != nil { + return err + } + defer func() { + if err != nil { + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + return + } + go func() { + <-ctx.Done() + if cerr := conn.Close(); cerr != nil { + grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr) + } + }() + }() + + return RegisterChaincodeEventHandler(ctx, mux, conn) +} + +// RegisterChaincodeEventHandler registers the http handlers for service ChaincodeEvent to "mux". +// The handlers forward requests to the grpc endpoint over "conn". +func RegisterChaincodeEventHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error { + return RegisterChaincodeEventHandlerClient(ctx, mux, NewChaincodeEventClient(conn)) +} + +// RegisterChaincodeEventHandlerClient registers the http handlers for service ChaincodeEvent +// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "ChaincodeEventClient". +// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "ChaincodeEventClient" +// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in +// "ChaincodeEventClient" to call the correct interceptors. +func RegisterChaincodeEventHandlerClient(ctx context.Context, mux *runtime.ServeMux, client ChaincodeEventClient) error { + + mux.Handle("GET", pattern_ChaincodeEvent_EventStream_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { + ctx, cancel := context.WithCancel(req.Context()) + defer cancel() + inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) + rctx, err := runtime.AnnotateContext(ctx, mux, req) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + resp, md, err := request_ChaincodeEvent_EventStream_0(rctx, inboundMarshaler, client, req, pathParams) + ctx = runtime.NewServerMetadataContext(ctx, md) + if err != nil { + runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) + return + } + + forward_ChaincodeEvent_EventStream_0(ctx, mux, outboundMarshaler, w, req, func() (proto.Message, error) { return resp.Recv() }, mux.GetForwardResponseOptions()...) + + }) + + return nil +} + +var ( + pattern_ChaincodeEvent_EventStream_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0}, []string{"event"}, "", runtime.AssumeColonVerbOpt(true))) +) + +var ( + forward_ChaincodeEvent_EventStream_0 = runtime.ForwardResponseStream +) diff --git a/gateway/events/event.pb.md b/gateway/events/event.pb.md new file mode 100644 index 00000000..6fedac85 --- /dev/null +++ b/gateway/events/event.pb.md @@ -0,0 +1,62 @@ +# Protocol Documentation + + +## Table of Contents + +- [event.proto](#event.proto) + + + + - [ChaincodeEvent](#events.ChaincodeEvent) + + +- [Scalar Value Types](#scalar-value-types) + + + + +

Top

+ +## event.proto + + + + + + + + + + + +### ChaincodeEvent + + +| Method Name | Request Type | Response Type | Description | +| ----------- | ------------ | ------------- | ------------| +| EventStream | [.google.protobuf.Empty](#google.protobuf.Empty) | [.protos.ChaincodeEvent](#protos.ChaincodeEvent) stream | | + + + + + +## Scalar Value Types + +| .proto Type | Notes | C++ Type | Java Type | Python Type | +| ----------- | ----- | -------- | --------- | ----------- | +| double | | double | double | float | +| float | | float | float | float | +| int32 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint32 instead. | int32 | int | int | +| int64 | Uses variable-length encoding. Inefficient for encoding negative numbers – if your field is likely to have negative values, use sint64 instead. | int64 | long | int/long | +| uint32 | Uses variable-length encoding. | uint32 | int | int/long | +| uint64 | Uses variable-length encoding. | uint64 | long | int/long | +| sint32 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int32s. | int32 | int | int | +| sint64 | Uses variable-length encoding. Signed int value. These more efficiently encode negative numbers than regular int64s. | int64 | long | int/long | +| fixed32 | Always four bytes. More efficient than uint32 if values are often greater than 2^28. | uint32 | int | int | +| fixed64 | Always eight bytes. More efficient than uint64 if values are often greater than 2^56. | uint64 | long | int/long | +| sfixed32 | Always four bytes. | int32 | int | int | +| sfixed64 | Always eight bytes. | int64 | long | int/long | +| bool | | bool | boolean | boolean | +| string | A string must always contain UTF-8 encoded or 7-bit ASCII text. | string | String | str/unicode | +| bytes | May contain any arbitrary sequence of bytes. | string | ByteString | str | + diff --git a/gateway/events/event.proto b/gateway/events/event.proto new file mode 100644 index 00000000..357c1e10 --- /dev/null +++ b/gateway/events/event.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package events; + +option go_package = ".;events"; + +import "google/api/annotations.proto"; +import "google/protobuf/empty.proto"; +import "github.com/hyperledger/fabric/protos/peer/chaincode_event.proto"; + +service ChaincodeEvent { + rpc EventStream (google.protobuf.Empty) returns (stream protos.ChaincodeEvent) { + option (google.api.http) = { + get: "/event" + }; + }; +} \ No newline at end of file diff --git a/gateway/events/event.swagger.json b/gateway/events/event.swagger.json new file mode 100644 index 00000000..9c33b913 --- /dev/null +++ b/gateway/events/event.swagger.json @@ -0,0 +1,107 @@ +{ + "swagger": "2.0", + "info": { + "title": "event.proto", + "version": "version not set" + }, + "schemes": [ + "http", + "https" + ], + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "paths": { + "/event": { + "get": { + "operationId": "EventStream", + "responses": { + "200": { + "description": "A successful response.(streaming responses)", + "schema": { + "$ref": "#/x-stream-definitions/protosChaincodeEvent" + } + } + }, + "tags": [ + "ChaincodeEvent" + ] + } + } + }, + "definitions": { + "protobufAny": { + "type": "object", + "properties": { + "type_url": { + "type": "string" + }, + "value": { + "type": "string", + "format": "byte" + } + } + }, + "protosChaincodeEvent": { + "type": "object", + "properties": { + "chaincode_id": { + "type": "string" + }, + "tx_id": { + "type": "string" + }, + "event_name": { + "type": "string" + }, + "payload": { + "type": "string", + "format": "byte" + } + }, + "title": "ChaincodeEvent is used for events and registrations that are specific to chaincode\nstring type - \"chaincode\"" + }, + "runtimeStreamError": { + "type": "object", + "properties": { + "grpc_code": { + "type": "integer", + "format": "int32" + }, + "http_code": { + "type": "integer", + "format": "int32" + }, + "message": { + "type": "string" + }, + "http_status": { + "type": "string" + }, + "details": { + "type": "array", + "items": { + "$ref": "#/definitions/protobufAny" + } + } + } + } + }, + "x-stream-definitions": { + "protosChaincodeEvent": { + "type": "object", + "properties": { + "result": { + "$ref": "#/definitions/protosChaincodeEvent" + }, + "error": { + "$ref": "#/definitions/runtimeStreamError" + } + }, + "title": "Stream result of protosChaincodeEvent" + } + } +} diff --git a/gateway/events/service.go b/gateway/events/service.go new file mode 100644 index 00000000..84ac7a03 --- /dev/null +++ b/gateway/events/service.go @@ -0,0 +1,42 @@ +package events + +import ( + "io" + + "github.com/golang/protobuf/ptypes/empty" + "github.com/s7techlab/cckit/gateway" +) + +//go:generate make generate + +type ChaincodeEventGateway struct { + gateway.Chaincode +} + +// ApiDef returns service definition +func (c *ChaincodeEventGateway) ApiDef() gateway.ServiceDef { + return gateway.ServiceDef{ + Desc: &_ChaincodeEvent_serviceDesc, + Service: c, + HandlerFromEndpointRegister: RegisterChaincodeEventHandlerFromEndpoint, + } +} + +func (s *ChaincodeEventGateway) EventStream(_ *empty.Empty, stream ChaincodeEvent_EventStreamServer) error { + sub, err := s.Chaincode.Events(stream.Context()) + if err != nil { + return err + } + + defer sub.Close() + for ev := range sub.Events() { + if ev != nil { + errS := stream.Send(ev) + if errS == io.EOF { + return nil + } + } + } + + return nil +}