diff --git a/api/event-source.html b/api/event-source.html index 1b5f84b8f4..3431098950 100644 --- a/api/event-source.html +++ b/api/event-source.html @@ -722,7 +722,7 @@

EventSourceSpec type
-Argo Events common.EventSourceType +github.com/argoproj/argo-events/pkg/apis/common.EventSourceType @@ -2272,5 +2272,5 @@

StripeEventSource

Generated with gen-crd-api-reference-docs -on git commit 02c6134. +on git commit b0ce469.

diff --git a/api/event-source.md b/api/event-source.md index db65b58b5c..198b87bc89 100644 --- a/api/event-source.md +++ b/api/event-source.md @@ -1397,7 +1397,8 @@ Generic event source -type
Argo Events common.EventSourceType +type
+github.com/argoproj/argo-events/pkg/apis/common.EventSourceType @@ -4526,6 +4527,6 @@ all types of events will be processed. More info at

Generated with gen-crd-api-reference-docs on git -commit 02c6134. +commit b0ce469.

diff --git a/api/gateway.html b/api/gateway.html index a02c7628ba..e150e88191 100644 --- a/api/gateway.html +++ b/api/gateway.html @@ -136,7 +136,7 @@

Gateway type
-Argo Events common.EventSourceType +github.com/argoproj/argo-events/pkg/apis/common.EventSourceType @@ -186,7 +186,7 @@

Gateway eventProtocol
-Argo Events common.EventProtocol +github.com/argoproj/argo-events/pkg/apis/common.EventProtocol @@ -303,7 +303,7 @@

GatewaySpec type
-Argo Events common.EventSourceType +github.com/argoproj/argo-events/pkg/apis/common.EventSourceType @@ -353,7 +353,7 @@

GatewaySpec eventProtocol
-Argo Events common.EventProtocol +github.com/argoproj/argo-events/pkg/apis/common.EventProtocol @@ -669,5 +669,5 @@

Subscribers

Generated with gen-crd-api-reference-docs -on git commit 02c6134. +on git commit b0ce469.

diff --git a/api/gateway.md b/api/gateway.md index fc177985df..65d2f742ce 100644 --- a/api/gateway.md +++ b/api/gateway.md @@ -270,7 +270,8 @@ configurations for the gateway -type
Argo Events common.EventSourceType +type
+github.com/argoproj/argo-events/pkg/apis/common.EventSourceType @@ -357,8 +358,8 @@ Port on which the gateway event source processor is running on. -eventProtocol
Argo Events common.EventProtocol - +eventProtocol
+github.com/argoproj/argo-events/pkg/apis/common.EventProtocol @@ -600,7 +601,8 @@ configurations for the gateway -type
Argo Events common.EventSourceType +type
+github.com/argoproj/argo-events/pkg/apis/common.EventSourceType @@ -687,8 +689,8 @@ Port on which the gateway event source processor is running on. -eventProtocol
Argo Events common.EventProtocol - +eventProtocol
+github.com/argoproj/argo-events/pkg/apis/common.EventProtocol @@ -1321,6 +1323,6 @@ NATS refers to the subscribers over NATS protocol.

Generated with gen-crd-api-reference-docs on git -commit 02c6134. +commit b0ce469.

diff --git a/api/sensor.html b/api/sensor.html index f2b50b12b9..8edb877318 100644 --- a/api/sensor.html +++ b/api/sensor.html @@ -225,7 +225,7 @@

ArtifactLocation s3
-Argo Events common.S3Artifact +github.com/argoproj/argo-events/pkg/apis/common.S3Artifact @@ -421,6 +421,93 @@

ConfigmapArtifact +

CustomTrigger +

+

+(Appears on: +TriggerTemplate) +

+

+

CustomTrigger refers to the specification of the custom trigger.

+

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
FieldDescription
+serverURL
+ +string + +
+

ServerURL is the url of the gRPC server that executes custom trigger

+
+secure
+ +bool + +
+

Secure refers to type of the connection between sensor to custom trigger gRPC

+
+certFilePath
+ +string + +
+

CertFilePath is path to the cert file within sensor for secure connection between sensor and custom trigger gRPC server.

+
+serverNameOverride
+ +string + +
+

ServerNameOverride for the secure connection between sensor and custom trigger gRPC server.

+
+triggerBody
+ +string + +
+

TriggerBody is the custom trigger resource specification that custom trigger gRPC server knows how to interpret.

+
+parameters
+ + +[]TriggerParameter + + +
+

Parameters is the list of parameters that is applied to resolved custom trigger trigger object.

+

DataFilter

@@ -635,7 +722,7 @@

EventDependencyFilter context
-Argo Events common.EventContext +github.com/argoproj/argo-events/pkg/apis/common.EventContext @@ -1336,7 +1423,7 @@

NodeStatus event
-Argo Events common.Event +github.com/argoproj/argo-events/pkg/apis/common.Event @@ -2234,6 +2321,7 @@

TriggerParameter (Appears on: AWSLambdaTrigger, ArgoWorkflowTrigger, +CustomTrigger, HTTPTrigger, OpenFaasTrigger, StandardK8sTrigger, @@ -2570,9 +2658,24 @@

TriggerTemplate +(Optional)

AWSLambda refers to the trigger designed to invoke AWS Lambda function with with on-the-fly constructable payload.

+ + +customTrigger
+ + +CustomTrigger + + + + +(Optional) +

CustomTrigger refers to the trigger designed to connect to a gRPC trigger server and execute a custom trigger.

+ +

URLArtifact @@ -2619,5 +2722,5 @@

URLArtifact

Generated with gen-crd-api-reference-docs -on git commit 02c6134. +on git commit b0ce469.

diff --git a/api/sensor.md b/api/sensor.md index 397eb00e52..f4060ef8ce 100644 --- a/api/sensor.md +++ b/api/sensor.md @@ -473,7 +473,8 @@ Description -s3
Argo Events common.S3Artifact +s3
+github.com/argoproj/argo-events/pkg/apis/common.S3Artifact @@ -855,6 +856,184 @@ Key within configmap data which contains trigger resource definition +

+ +CustomTrigger + +

+ +

+ +(Appears on: +TriggerTemplate) + +

+ +

+ +

+ +CustomTrigger refers to the specification of the custom trigger. + +

+ +

+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ +Field + + + +Description + +
+ +serverURL
string + +
+ +

+ +ServerURL is the url of the gRPC server that executes custom trigger + +

+ +
+ +secure
bool + +
+ +

+ +Secure refers to type of the connection between sensor to custom trigger +gRPC + +

+ +
+ +certFilePath
string + +
+ +

+ +CertFilePath is path to the cert file within sensor for secure +connection between sensor and custom trigger gRPC server. + +

+ +
+ +serverNameOverride
string + +
+ +

+ +ServerNameOverride for the secure connection between sensor and custom +trigger gRPC server. + +

+ +
+ +triggerBody
string + +
+ +

+ +TriggerBody is the custom trigger resource specification that custom +trigger gRPC server knows how to interpret. + +

+ +
+ +parameters
+ \[\]TriggerParameter + + +
+ +

+ +Parameters is the list of parameters that is applied to resolved custom +trigger trigger object. + +

+ +
+

DataFilter @@ -1294,7 +1473,8 @@ Time filter on the event with escalation -context
Argo Events common.EventContext +context
+github.com/argoproj/argo-events/pkg/apis/common.EventContext @@ -2725,7 +2905,8 @@ events -event
Argo Events common.Event +event
+github.com/argoproj/argo-events/pkg/apis/common.Event @@ -4481,6 +4662,7 @@ TriggerParameter (Appears on: AWSLambdaTrigger, ArgoWorkflowTrigger, +CustomTrigger, HTTPTrigger, OpenFaasTrigger, StandardK8sTrigger, @@ -5160,6 +5342,8 @@ with with on-the-fly constructable payload. +(Optional) +

AWSLambda refers to the trigger designed to invoke AWS Lambda function @@ -5171,6 +5355,30 @@ with with on-the-fly constructable payload. + + + + +customTrigger
+ CustomTrigger + + + + + +(Optional) + +

+ +CustomTrigger refers to the trigger designed to connect to a gRPC +trigger server and execute a custom trigger. + +

+ + + + + @@ -5271,6 +5479,6 @@ VerifyCert decides whether the connection is secure or not

Generated with gen-crd-api-reference-docs on git -commit 02c6134. +commit b0ce469.

diff --git a/controllers/sensor/validate.go b/controllers/sensor/validate.go index b5139bf692..5531b04c70 100644 --- a/controllers/sensor/validate.go +++ b/controllers/sensor/validate.go @@ -18,13 +18,13 @@ package sensor import ( "fmt" - "github.com/pkg/errors" "net/http" "time" "github.com/Knetic/govaluate" "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/pkg/errors" ) // ValidateSensor accepts a sensor and performs validation against it @@ -141,6 +141,11 @@ func validateTriggerTemplate(template *v1alpha1.TriggerTemplate) error { return errors.Wrapf(err, "template %s is invalid", template.Name) } } + if template.CustomTrigger != nil { + if err := validateCustomTrigger(template.CustomTrigger); err != nil { + return errors.Wrapf(err, "template %s is invalid", template.Name) + } + } return nil } @@ -211,6 +216,13 @@ func validateHTTPTrigger(trigger *v1alpha1.HTTPTrigger) error { return errors.New("only GET, DELETE, PATCH, POST and PUT methods are supported") } } + if trigger.Parameters != nil { + for i, parameter := range trigger.Parameters { + if err := validateTriggerParameter(¶meter); err != nil { + return errors.Errorf("resource parameter index: %d. err: %+v", i, err) + } + } + } return nil } @@ -228,6 +240,13 @@ func validateOpenFaasTrigger(trigger *v1alpha1.OpenFaasTrigger) error { if trigger.Password != nil && trigger.Namespace == "" { return errors.New("namespace can't be empty when password secret selector is specified") } + if trigger.Parameters != nil { + for i, parameter := range trigger.Parameters { + if err := validateTriggerParameter(¶meter); err != nil { + return errors.Errorf("resource parameter index: %d. err: %+v", i, err) + } + } + } return nil } @@ -251,6 +270,39 @@ func validateAWSLambdaTrigger(trigger *v1alpha1.AWSLambdaTrigger) error { if trigger.Payload == nil { return errors.New("payload parameters are not specified") } + if trigger.Parameters != nil { + for i, parameter := range trigger.Parameters { + if err := validateTriggerParameter(¶meter); err != nil { + return errors.Errorf("resource parameter index: %d. err: %+v", i, err) + } + } + } + return nil +} + +// validateCustomTrigger validates the custom trigger. +func validateCustomTrigger(trigger *v1alpha1.CustomTrigger) error { + if trigger == nil { + return errors.New("custom trigger for can't be nil") + } + if trigger.ServerURL == "" { + return errors.New("custom trigger gRPC server url is not defined") + } + if trigger.TriggerBody == "" { + return errors.New("trigger body can't be empty") + } + if trigger.Secure { + if trigger.CertFilePath == "" { + return errors.New("cert file path can't be nil when the trigger server connection is secure") + } + } + if trigger.Parameters != nil { + for i, parameter := range trigger.Parameters { + if err := validateTriggerParameter(¶meter); err != nil { + return errors.Errorf("resource parameter index: %d. err: %+v", i, err) + } + } + } return nil } diff --git a/gateways/server/amqp/start.go b/gateways/server/amqp/start.go index 7bc48699f3..202f5167cd 100644 --- a/gateways/server/amqp/start.go +++ b/gateways/server/amqp/start.go @@ -22,7 +22,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/pkg/errors" @@ -96,7 +96,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c select { case msg := <-delivery: logger.WithField("message-id", msg.MessageId).Infoln("received the message") - body := &apicommon.AMQPEventData{ + body := &events.AMQPEventData{ ContentType: msg.ContentType, ContentEncoding: msg.ContentEncoding, DeliveryMode: int(msg.DeliveryMode), diff --git a/gateways/server/aws-sns/start.go b/gateways/server/aws-sns/start.go index 833f3e5dc3..440ef1653a 100644 --- a/gateways/server/aws-sns/start.go +++ b/gateways/server/aws-sns/start.go @@ -27,7 +27,7 @@ import ( "github.com/argoproj/argo-events/gateways/server" commonaws "github.com/argoproj/argo-events/gateways/server/common/aws" "github.com/argoproj/argo-events/gateways/server/common/webhook" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" snslib "github.com/aws/aws-sdk-go/service/sns" "github.com/ghodss/yaml" @@ -109,7 +109,7 @@ func (router *Router) HandleRoute(writer http.ResponseWriter, request *http.Requ case messageTypeNotification: logger.Infoln("dispatching notification on route's data channel") - eventData := &apicommon.SNSEventData{Body: body} + eventData := &events.SNSEventData{Body: body} eventBytes, err := json.Marshal(eventData) if err != nil { logger.WithError(err).Error("failed to marshal the event data") diff --git a/gateways/server/aws-sqs/start.go b/gateways/server/aws-sqs/start.go index bd490bb9cf..9819bdec50 100644 --- a/gateways/server/aws-sqs/start.go +++ b/gateways/server/aws-sqs/start.go @@ -18,11 +18,12 @@ package aws_sqs import ( "encoding/json" + "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" commonaws "github.com/argoproj/argo-events/gateways/server/common/aws" - common2 "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" @@ -107,7 +108,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c if msg != nil && len(msg.Messages) > 0 { message := *msg.Messages[0] - data := &common2.SQSEventData{ + data := &events.SQSEventData{ MessageId: *message.MessageId, MessageAttributes: message.MessageAttributes, Body: []byte(*message.Body), diff --git a/gateways/server/azure-events-hub/start.go b/gateways/server/azure-events-hub/start.go index 957ef8d1a0..277a682b2f 100644 --- a/gateways/server/azure-events-hub/start.go +++ b/gateways/server/azure-events-hub/start.go @@ -25,7 +25,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - common2 "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/pkg/errors" @@ -88,7 +88,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c } handler := func(c context.Context, event *eventhub.Event) error { - eventData := &common2.AzureEventsHubEventData{ + eventData := &events.AzureEventsHubEventData{ Id: event.ID, PartitionKey: *event.PartitionKey, Body: event.Data, diff --git a/gateways/server/calendar/start.go b/gateways/server/calendar/start.go index 314fa6a2eb..dae9b1bfbb 100644 --- a/gateways/server/calendar/start.go +++ b/gateways/server/calendar/start.go @@ -23,7 +23,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/pkg/errors" @@ -118,7 +118,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c if location != nil { lastT = lastT.In(location) } - response := &apicommon.CalendarEventData{ + response := &events.CalendarEventData{ EventTime: tx.String(), UserPayload: calendarEventSource.UserPayload, } diff --git a/gateways/server/calendar/start_test.go b/gateways/server/calendar/start_test.go index d46a3e005a..5367930bae 100644 --- a/gateways/server/calendar/start_test.go +++ b/gateways/server/calendar/start_test.go @@ -24,6 +24,7 @@ import ( "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/stretchr/testify/assert" @@ -59,7 +60,7 @@ func TestListenEvents(t *testing.T) { } go func() { data := <-channels.Data - var cal *apicommon.CalendarEventData + var cal *events.CalendarEventData err = yaml.Unmarshal(data, &cal) assert.Nil(t, err) diff --git a/gateways/server/emitter/start.go b/gateways/server/emitter/start.go index ae57bc83fd..14ca80d895 100644 --- a/gateways/server/emitter/start.go +++ b/gateways/server/emitter/start.go @@ -22,7 +22,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" emitter "github.com/emitter-io/go/v2" "github.com/ghodss/yaml" @@ -107,7 +107,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c } if err := client.Subscribe(channelKey, emitterEventSource.ChannelName, func(_ *emitter.Client, message emitter.Message) { - eventBytes, err := json.Marshal(&apicommon.EmitterEventData{ + eventBytes, err := json.Marshal(&events.EmitterEventData{ Topic: message.Topic(), Body: message.Payload(), }) diff --git a/gateways/server/gcp-pubsub/start.go b/gateways/server/gcp-pubsub/start.go index aaecb1ca8a..40f9e77548 100644 --- a/gateways/server/gcp-pubsub/start.go +++ b/gateways/server/gcp-pubsub/start.go @@ -25,7 +25,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/pkg/errors" @@ -132,7 +132,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c logger.Infoln("listening for messages from PubSub...") err = subscription.Receive(ctx, func(msgCtx context.Context, m *pubsub.Message) { logger.Info("received GCP PubSub Message from topic") - eventData := &apicommon.PubSubEventData{ + eventData := &events.PubSubEventData{ ID: m.ID, Body: m.Data, Attributes: m.Attributes, diff --git a/gateways/server/kafka/start.go b/gateways/server/kafka/start.go index 5df828ba54..8f5ec7122c 100644 --- a/gateways/server/kafka/start.go +++ b/gateways/server/kafka/start.go @@ -24,7 +24,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/pkg/errors" @@ -120,7 +120,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c select { case msg := <-partitionConsumer.Messages(): logger.Infoln("dispatching event on the data channel...") - eventData := &apicommon.KafkaEventData{ + eventData := &events.KafkaEventData{ Topic: msg.Topic, Partition: int(msg.Partition), Body: msg.Value, diff --git a/gateways/server/minio/start.go b/gateways/server/minio/start.go index 56b74e6af7..2acac00639 100644 --- a/gateways/server/minio/start.go +++ b/gateways/server/minio/start.go @@ -23,6 +23,7 @@ import ( "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/store" "github.com/ghodss/yaml" "github.com/minio/minio-go" @@ -99,7 +100,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c continue } - eventData := &apicommon.MinioEventData{Notification: notification.Records} + eventData := &events.MinioEventData{Notification: notification.Records} eventBytes, err := json.Marshal(eventData) if err != nil { logger.WithError(notification.Err).Errorln("failed to marshal the event data, rejecting the event...") diff --git a/gateways/server/mqtt/start.go b/gateways/server/mqtt/start.go index 6f148fdd3e..d5fe0ea7b0 100644 --- a/gateways/server/mqtt/start.go +++ b/gateways/server/mqtt/start.go @@ -22,7 +22,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" mqttlib "github.com/eclipse/paho.mqtt.golang" "github.com/ghodss/yaml" @@ -68,7 +68,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c logger.Infoln("setting up the message handler...") handler := func(c mqttlib.Client, msg mqttlib.Message) { - eventData := &apicommon.MQTTEventData{ + eventData := &events.MQTTEventData{ Topic: msg.Topic(), MessageId: int(msg.MessageID()), Body: msg.Payload(), diff --git a/gateways/server/nats/start.go b/gateways/server/nats/start.go index 895103373b..3039a3a30d 100644 --- a/gateways/server/nats/start.go +++ b/gateways/server/nats/start.go @@ -22,7 +22,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" natslib "github.com/nats-io/go-nats" @@ -81,7 +81,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c logger.Info("subscribing to messages on the queue...") _, err := conn.Subscribe(natsEventSource.Subject, func(msg *natslib.Msg) { - eventData := &apicommon.NATSEventData{ + eventData := &events.NATSEventData{ Subject: msg.Subject, Body: msg.Data, } diff --git a/gateways/server/nsq/start.go b/gateways/server/nsq/start.go index 8e3f2e39ef..c418f5704f 100644 --- a/gateways/server/nsq/start.go +++ b/gateways/server/nsq/start.go @@ -23,7 +23,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/nsqio/go-nsq" @@ -102,7 +102,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c // HandleMessage implements the Handler interface. func (h *messageHandler) HandleMessage(m *nsq.Message) error { h.logger.Infoln("received a message") - eventData := &apicommon.NSQEventData{ + eventData := &events.NSQEventData{ Body: m.Body, Timestamp: strconv.Itoa(int(m.Timestamp)), NSQDAddress: m.NSQDAddress, diff --git a/gateways/server/redis/start.go b/gateways/server/redis/start.go index c6cba05ce9..41a6646dc3 100644 --- a/gateways/server/redis/start.go +++ b/gateways/server/redis/start.go @@ -22,7 +22,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/go-redis/redis" @@ -98,7 +98,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c select { case message := <-ch: logger.WithField("channel", message.Channel).Infoln("received a message") - eventData := &apicommon.RedisEventData{ + eventData := &events.RedisEventData{ Channel: message.Channel, Pattern: message.Pattern, Body: message.Payload, diff --git a/gateways/server/resource/start.go b/gateways/server/resource/start.go index cd002f145a..d73e1f03be 100644 --- a/gateways/server/resource/start.go +++ b/gateways/server/resource/start.go @@ -24,7 +24,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1" "github.com/ghodss/yaml" "github.com/pkg/errors" @@ -138,7 +138,7 @@ func (listener *EventListener) listenEvents(eventSource *gateways.EventSource, c continue } - eventData := &apicommon.ResourceEventData{ + eventData := &events.ResourceEventData{ EventType: string(event.Type), Body: objBody, Group: resourceEventSource.Group, diff --git a/gateways/server/webhook/start.go b/gateways/server/webhook/start.go index e76de21d49..38acedc588 100644 --- a/gateways/server/webhook/start.go +++ b/gateways/server/webhook/start.go @@ -24,7 +24,7 @@ import ( "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/gateways" "github.com/argoproj/argo-events/gateways/server/common/webhook" - apicommon "github.com/argoproj/argo-events/pkg/apis/common" + "github.com/argoproj/argo-events/pkg/apis/events" "github.com/ghodss/yaml" "github.com/sirupsen/logrus" ) @@ -89,7 +89,7 @@ func (router *Router) HandleRoute(writer http.ResponseWriter, request *http.Requ return } - payload := &apicommon.WebhookEventData{ + payload := &events.WebhookEventData{ Header: request.Header, Body: (*json.RawMessage)(&body), } diff --git a/hack/update-api-docs.sh b/hack/update-api-docs.sh old mode 100644 new mode 100755 diff --git a/hack/update-openapigen.sh b/hack/update-openapigen.sh index c8f04a3dd2..92fc82f823 100755 --- a/hack/update-openapigen.sh +++ b/hack/update-openapigen.sh @@ -28,11 +28,3 @@ go run ${CODEGEN_PKG}/cmd/openapi-gen/openapi-gen.go \ --input-dirs github.com/argoproj/argo-events/pkg/apis/eventsources/${VERSION} \ --output-package github.com/argoproj/argo-events/pkg/apis/eventsources/${VERSION} \ $@ - -# Common -go run ${CODEGEN_PKG}/cmd/openapi-gen/openapi-gen.go \ - --go-header-file ${PROJECT_ROOT}/hack/custom-boilerplate.go.txt \ - --input-dirs github.com/argoproj/argo-events/pkg/apis/common \ - --output-package github.com/argoproj/argo-events/pkg/apis/common \ - $@ - diff --git a/pkg/apis/common/openapi_generated.go b/pkg/apis/common/openapi_generated.go index 14eaf67425..bcf604aaa5 100644 --- a/pkg/apis/common/openapi_generated.go +++ b/pkg/apis/common/openapi_generated.go @@ -29,14 +29,234 @@ import ( func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenAPIDefinition { return map[string]common.OpenAPIDefinition{ - "github.com/argoproj/argo-events/pkg/apis/common.Event": schema_argo_events_pkg_apis_common_Event(ref), - "github.com/argoproj/argo-events/pkg/apis/common.EventContext": schema_argo_events_pkg_apis_common_EventContext(ref), - "github.com/argoproj/argo-events/pkg/apis/common.EventProtocol": schema_argo_events_pkg_apis_common_EventProtocol(ref), - "github.com/argoproj/argo-events/pkg/apis/common.Http": schema_argo_events_pkg_apis_common_Http(ref), - "github.com/argoproj/argo-events/pkg/apis/common.Nats": schema_argo_events_pkg_apis_common_Nats(ref), - "github.com/argoproj/argo-events/pkg/apis/common.S3Artifact": schema_argo_events_pkg_apis_common_S3Artifact(ref), - "github.com/argoproj/argo-events/pkg/apis/common.S3Bucket": schema_argo_events_pkg_apis_common_S3Bucket(ref), - "github.com/argoproj/argo-events/pkg/apis/common.S3Filter": schema_argo_events_pkg_apis_common_S3Filter(ref), + "github.com/argoproj/argo-events/pkg/apis/common.AMQPEventData": schema_argo_events_pkg_apis_common_AMQPEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.AzureEventsHubEventData": schema_argo_events_pkg_apis_common_AzureEventsHubEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.CalendarEventData": schema_argo_events_pkg_apis_common_CalendarEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.EmitterEventData": schema_argo_events_pkg_apis_common_EmitterEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.Event": schema_argo_events_pkg_apis_common_Event(ref), + "github.com/argoproj/argo-events/pkg/apis/common.EventContext": schema_argo_events_pkg_apis_common_EventContext(ref), + "github.com/argoproj/argo-events/pkg/apis/common.EventProtocol": schema_argo_events_pkg_apis_common_EventProtocol(ref), + "github.com/argoproj/argo-events/pkg/apis/common.GitLabEventData": schema_argo_events_pkg_apis_common_GitLabEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.GithubEventData": schema_argo_events_pkg_apis_common_GithubEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.Http": schema_argo_events_pkg_apis_common_Http(ref), + "github.com/argoproj/argo-events/pkg/apis/common.KafkaEventData": schema_argo_events_pkg_apis_common_KafkaEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.MQTTEventData": schema_argo_events_pkg_apis_common_MQTTEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.MinioEventData": schema_argo_events_pkg_apis_common_MinioEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.NATSEventData": schema_argo_events_pkg_apis_common_NATSEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.NSQEventData": schema_argo_events_pkg_apis_common_NSQEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.Nats": schema_argo_events_pkg_apis_common_Nats(ref), + "github.com/argoproj/argo-events/pkg/apis/common.PubSubEventData": schema_argo_events_pkg_apis_common_PubSubEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.RedisEventData": schema_argo_events_pkg_apis_common_RedisEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.ResourceEventData": schema_argo_events_pkg_apis_common_ResourceEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.S3Artifact": schema_argo_events_pkg_apis_common_S3Artifact(ref), + "github.com/argoproj/argo-events/pkg/apis/common.S3Bucket": schema_argo_events_pkg_apis_common_S3Bucket(ref), + "github.com/argoproj/argo-events/pkg/apis/common.S3Filter": schema_argo_events_pkg_apis_common_S3Filter(ref), + "github.com/argoproj/argo-events/pkg/apis/common.SNSEventData": schema_argo_events_pkg_apis_common_SNSEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.SQSEventData": schema_argo_events_pkg_apis_common_SQSEventData(ref), + "github.com/argoproj/argo-events/pkg/apis/common.WebhookEventData": schema_argo_events_pkg_apis_common_WebhookEventData(ref), + } +} + +func schema_argo_events_pkg_apis_common_AMQPEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "AMQPEventData represents the event data generated by AMQP gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "contentType": { + SchemaProps: spec.SchemaProps{ + Description: "ContentType is the MIME content type", + Type: []string{"string"}, + Format: "", + }, + }, + "contentEncoding": { + SchemaProps: spec.SchemaProps{ + Description: "ContentEncoding is the MIME content encoding", + Type: []string{"string"}, + Format: "", + }, + }, + "deliveryMode": { + SchemaProps: spec.SchemaProps{ + Description: "Delivery mode can be either - non-persistent (1) or persistent (2)", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "priority": { + SchemaProps: spec.SchemaProps{ + Description: "Priority refers to the use - 0 to 9", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "correlationId": { + SchemaProps: spec.SchemaProps{ + Description: "CorrelationId is the correlation identifier", + Type: []string{"string"}, + Format: "", + }, + }, + "replyTo": { + SchemaProps: spec.SchemaProps{ + Description: "ReplyTo is the address to reply to (ex: RPC)", + Type: []string{"string"}, + Format: "", + }, + }, + "expiration": { + SchemaProps: spec.SchemaProps{ + Description: "Expiration refers to message expiration spec", + Type: []string{"string"}, + Format: "", + }, + }, + "messageId": { + SchemaProps: spec.SchemaProps{ + Description: "MessageId is message identifier", + Type: []string{"string"}, + Format: "", + }, + }, + "timestamp": { + SchemaProps: spec.SchemaProps{ + Description: "Timestamp refers to the message timestamp", + Type: []string{"string"}, + Format: "", + }, + }, + "type": { + SchemaProps: spec.SchemaProps{ + Description: "Type refers to the message type name", + Type: []string{"string"}, + Format: "", + }, + }, + "appId": { + SchemaProps: spec.SchemaProps{ + Description: "AppId refers to the application id", + Type: []string{"string"}, + Format: "", + }, + }, + "exchange": { + SchemaProps: spec.SchemaProps{ + Description: "Exchange is basic.publish exchange", + Type: []string{"string"}, + Format: "", + }, + }, + "routingKey": { + SchemaProps: spec.SchemaProps{ + Description: "RoutingKey is basic.publish routing key", + Type: []string{"string"}, + Format: "", + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body represents the messsage body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"contentType", "contentEncoding", "deliveryMode", "priority", "correlationId", "replyTo", "expiration", "messageId", "timestamp", "type", "appId", "exchange", "routingKey", "body"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_AzureEventsHubEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "AzureEventsHubEventData represents to the event data generated by Azure Events Hub gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "id": { + SchemaProps: spec.SchemaProps{ + Description: "Id of the message", + Type: []string{"string"}, + Format: "", + }, + }, + "partitionKey": { + SchemaProps: spec.SchemaProps{ + Description: "PartitionKey", + Type: []string{"string"}, + Format: "", + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Message body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"id", "partitionKey", "body"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_CalendarEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "CalendarEventData represents the event data generated by the Calendar gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "eventTime": { + SchemaProps: spec.SchemaProps{ + Description: "EventTime is time at which event occurred", + Type: []string{"string"}, + Format: "", + }, + }, + "userPayload": { + SchemaProps: spec.SchemaProps{ + Description: "UserPayload if any", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"eventTime"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_EmitterEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "EmitterEventData represents the event data generated by the Emitter gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "topic": { + SchemaProps: spec.SchemaProps{ + Description: "Topic name", + Type: []string{"string"}, + Format: "", + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body represents the message body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"topic", "body"}, + }, + }, } } @@ -168,6 +388,48 @@ func schema_argo_events_pkg_apis_common_EventProtocol(ref common.ReferenceCallba } } +func schema_argo_events_pkg_apis_common_GitLabEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "GitLabEventData represents the event data generated by the GitLab gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body represents the message body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"body"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_GithubEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "GithubEventData represents the event data generated by the GitHub gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body represents the message body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"body"}, + }, + }, + } +} + func schema_argo_events_pkg_apis_common_Http(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -219,6 +481,174 @@ func schema_argo_events_pkg_apis_common_Http(ref common.ReferenceCallback) commo } } +func schema_argo_events_pkg_apis_common_KafkaEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "KafkaEventData represents the event data generated by the Kafka gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "topic": { + SchemaProps: spec.SchemaProps{ + Description: "Topic refers to the Kafka topic", + Type: []string{"string"}, + Format: "", + }, + }, + "partition": { + SchemaProps: spec.SchemaProps{ + Description: "Partition refers to the Kafka partition", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "value": { + SchemaProps: spec.SchemaProps{ + Description: "Body refers to the message value", + Type: []string{"string"}, + Format: "byte", + }, + }, + "timestamp": { + SchemaProps: spec.SchemaProps{ + Description: "Timestamp of the message", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"topic", "partition", "value", "timestamp"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_MQTTEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "MQTTEventData represents the event data generated by the MQTT gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "topic": { + SchemaProps: spec.SchemaProps{ + Description: "Topic refers to the MQTT topic name.", + Type: []string{"string"}, + Format: "", + }, + }, + "messageId": { + SchemaProps: spec.SchemaProps{ + Description: "MessageId is the unique ID for the message", + Type: []string{"integer"}, + Format: "int32", + }, + }, + "payload": { + SchemaProps: spec.SchemaProps{ + Description: "Payload is the message payload.", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"topic", "messageId", "payload"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_MinioEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "MinioEventData represents the event data generated by the Minio gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "notification": { + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/minio/minio-go.NotificationEvent"), + }, + }, + }, + }, + }, + }, + Required: []string{"notification"}, + }, + }, + Dependencies: []string{ + "github.com/minio/minio-go.NotificationEvent"}, + } +} + +func schema_argo_events_pkg_apis_common_NATSEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "NATSEventData represents the event data generated by the NATS gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "subject": { + SchemaProps: spec.SchemaProps{ + Description: "Name of the subject.", + Type: []string{"string"}, + Format: "", + }, + }, + "data": { + SchemaProps: spec.SchemaProps{ + Description: "Message data.", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"subject", "data"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_NSQEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "NSQEventData represents the event data generated by the NSQ gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "Body": { + SchemaProps: spec.SchemaProps{ + Description: "Body is the message data.", + Type: []string{"string"}, + Format: "byte", + }, + }, + "Timestamp": { + SchemaProps: spec.SchemaProps{ + Description: "Timestamp of the message.", + Type: []string{"string"}, + Format: "", + }, + }, + "NSQDAddress": { + SchemaProps: spec.SchemaProps{ + Description: "NSQDAddress is the address of the nsq host.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"Body", "Timestamp", "NSQDAddress"}, + }, + }, + } +} + func schema_argo_events_pkg_apis_common_Nats(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -303,6 +733,140 @@ func schema_argo_events_pkg_apis_common_Nats(ref common.ReferenceCallback) commo } } +func schema_argo_events_pkg_apis_common_PubSubEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "PubSubEventData represents the event data generated by the GCP PubSub gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "id": { + SchemaProps: spec.SchemaProps{ + Description: "ID of the message", + Type: []string{"string"}, + Format: "", + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body is the actual data in the message.", + Type: []string{"string"}, + Format: "byte", + }, + }, + "attributes": { + SchemaProps: spec.SchemaProps{ + Description: "Attributes represents the key-value pairs the current message is labelled with.", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + "publishTime": { + SchemaProps: spec.SchemaProps{ + Description: "The time at which the message was published.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"id", "body", "attributes", "publishTime"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_RedisEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "RedisEventData represents the event data generated by the Redis gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "channel": { + SchemaProps: spec.SchemaProps{ + Description: "Subscription channel.", + Type: []string{"string"}, + Format: "", + }, + }, + "pattern": { + SchemaProps: spec.SchemaProps{ + Description: "Message pattern", + Type: []string{"string"}, + Format: "", + }, + }, + "payload": { + SchemaProps: spec.SchemaProps{ + Description: "Message body", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"channel", "pattern", "payload"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_ResourceEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "ResourceEventData represents the event data generated by the Resource gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "type": { + SchemaProps: spec.SchemaProps{ + Description: "EventType of the type of the event.", + Type: []string{"string"}, + Format: "", + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Resource body.", + Type: []string{"string"}, + Format: "byte", + }, + }, + "group": { + SchemaProps: spec.SchemaProps{ + Description: "Resource group name.", + Type: []string{"string"}, + Format: "", + }, + }, + "version": { + SchemaProps: spec.SchemaProps{ + Description: "Resource version.", + Type: []string{"string"}, + Format: "", + }, + }, + "resource": { + SchemaProps: spec.SchemaProps{ + Description: "Resource name.", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"type", "body", "group", "version", "resource"}, + }, + }, + } +} + func schema_argo_events_pkg_apis_common_S3Artifact(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -426,3 +990,111 @@ func schema_argo_events_pkg_apis_common_S3Filter(ref common.ReferenceCallback) c }, } } + +func schema_argo_events_pkg_apis_common_SNSEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "SNSEventData represents the event data generated by SNS gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body represents the SNS message body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"body"}, + }, + }, + } +} + +func schema_argo_events_pkg_apis_common_SQSEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "SQSEventData represents the event data generated by SQS gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "messageId": { + SchemaProps: spec.SchemaProps{ + Description: "A unique identifier for the message. A MessageId is considered unique across all AWS accounts for an extended period of time.", + Type: []string{"string"}, + Format: "", + }, + }, + "messageAttributes": { + SchemaProps: spec.SchemaProps{ + Description: "Each message attribute consists of a Name, Type, and Value. For more information, see Amazon SQS Message Attributes (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-attributes.html) in the Amazon Simple Queue Service Developer Guide.", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/aws/aws-sdk-go/service/sqs.MessageAttributeValue"), + }, + }, + }, + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "The message's contents (not URL-encoded).", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"messageId", "messageAttributes", "body"}, + }, + }, + Dependencies: []string{ + "github.com/aws/aws-sdk-go/service/sqs.MessageAttributeValue"}, + } +} + +func schema_argo_events_pkg_apis_common_WebhookEventData(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "WebhookEventData represents the event data generated by the Webhook gateway.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "header": { + SchemaProps: spec.SchemaProps{ + Description: "Header is the http request header", + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, + }, + }, + }, + "body": { + SchemaProps: spec.SchemaProps{ + Description: "Body is http request body", + Type: []string{"string"}, + Format: "byte", + }, + }, + }, + Required: []string{"header", "body"}, + }, + }, + } +} diff --git a/pkg/apis/common/event-data.go b/pkg/apis/events/event-data.go similarity index 99% rename from pkg/apis/common/event-data.go rename to pkg/apis/events/event-data.go index 6c956f3033..00c17d5c46 100644 --- a/pkg/apis/common/event-data.go +++ b/pkg/apis/events/event-data.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package common +package events import ( "encoding/json" @@ -24,7 +24,6 @@ import ( ) // AMQPEventData represents the event data generated by AMQP gateway. -// +k8s:openapi-gen=true type AMQPEventData struct { // ContentType is the MIME content type ContentType string `json:"contentType"` diff --git a/pkg/apis/eventsources/v1alpha1/openapi_generated.go b/pkg/apis/eventsources/v1alpha1/openapi_generated.go index c8cff44682..2da7f12003 100644 --- a/pkg/apis/eventsources/v1alpha1/openapi_generated.go +++ b/pkg/apis/eventsources/v1alpha1/openapi_generated.go @@ -38,6 +38,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.EventSourceSpec": schema_pkg_apis_eventsources_v1alpha1_EventSourceSpec(ref), "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.EventSourceStatus": schema_pkg_apis_eventsources_v1alpha1_EventSourceStatus(ref), "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.FileEventSource": schema_pkg_apis_eventsources_v1alpha1_FileEventSource(ref), + "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GenericEventSource": schema_pkg_apis_eventsources_v1alpha1_GenericEventSource(ref), "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GithubEventSource": schema_pkg_apis_eventsources_v1alpha1_GithubEventSource(ref), "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GitlabEventSource": schema_pkg_apis_eventsources_v1alpha1_GitlabEventSource(ref), "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.HDFSEventSource": schema_pkg_apis_eventsources_v1alpha1_HDFSEventSource(ref), @@ -699,8 +700,7 @@ func schema_pkg_apis_eventsources_v1alpha1_EventSourceSpec(ref common.ReferenceC Allows: true, Schema: &spec.Schema{ SchemaProps: spec.SchemaProps{ - Type: []string{"object"}, - Format: "", + Ref: ref("github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GenericEventSource"), }, }, }, @@ -718,7 +718,7 @@ func schema_pkg_apis_eventsources_v1alpha1_EventSourceSpec(ref common.ReferenceC }, }, Dependencies: []string{ - "github.com/argoproj/argo-events/gateways/server/common/webhook.Context", "github.com/argoproj/argo-events/pkg/apis/common.S3Artifact", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.AMQPEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.AzureEventsHubEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.CalendarEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.EmitterEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.FileEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GithubEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GitlabEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.HDFSEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.KafkaEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.MQTTEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.NATSEventsSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.NSQEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.PubSubEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.RedisEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.ResourceEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.SNSEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.SQSEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.SlackEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.StorageGridEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.StripeEventSource"}, + "github.com/argoproj/argo-events/gateways/server/common/webhook.Context", "github.com/argoproj/argo-events/pkg/apis/common.S3Artifact", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.AMQPEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.AzureEventsHubEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.CalendarEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.EmitterEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.FileEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GenericEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GithubEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.GitlabEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.HDFSEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.KafkaEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.MQTTEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.NATSEventsSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.NSQEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.PubSubEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.RedisEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.ResourceEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.SNSEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.SQSEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.SlackEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.StorageGridEventSource", "github.com/argoproj/argo-events/pkg/apis/eventsources/v1alpha1.StripeEventSource"}, } } @@ -771,6 +771,27 @@ func schema_pkg_apis_eventsources_v1alpha1_FileEventSource(ref common.ReferenceC } } +func schema_pkg_apis_eventsources_v1alpha1_GenericEventSource(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "GenericEventSource refers to a generic event source. It can be used to implement a custom event source.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "value": { + SchemaProps: spec.SchemaProps{ + Description: "Value of the event source", + Type: []string{"string"}, + Format: "", + }, + }, + }, + Required: []string{"value"}, + }, + }, + } +} + func schema_pkg_apis_eventsources_v1alpha1_GithubEventSource(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ diff --git a/pkg/apis/sensor/v1alpha1/openapi_generated.go b/pkg/apis/sensor/v1alpha1/openapi_generated.go index b71c2448f2..c5cf14cacc 100644 --- a/pkg/apis/sensor/v1alpha1/openapi_generated.go +++ b/pkg/apis/sensor/v1alpha1/openapi_generated.go @@ -34,6 +34,7 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.ArtifactLocation": schema_pkg_apis_sensor_v1alpha1_ArtifactLocation(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.Backoff": schema_pkg_apis_sensor_v1alpha1_Backoff(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.ConfigmapArtifact": schema_pkg_apis_sensor_v1alpha1_ConfigmapArtifact(ref), + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.CustomTrigger": schema_pkg_apis_sensor_v1alpha1_CustomTrigger(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.DataFilter": schema_pkg_apis_sensor_v1alpha1_DataFilter(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.DependencyGroup": schema_pkg_apis_sensor_v1alpha1_DependencyGroup(ref), "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.EventDependency": schema_pkg_apis_sensor_v1alpha1_EventDependency(ref), @@ -335,6 +336,75 @@ func schema_pkg_apis_sensor_v1alpha1_ConfigmapArtifact(ref common.ReferenceCallb } } +func schema_pkg_apis_sensor_v1alpha1_CustomTrigger(ref common.ReferenceCallback) common.OpenAPIDefinition { + return common.OpenAPIDefinition{ + Schema: spec.Schema{ + SchemaProps: spec.SchemaProps{ + Description: "CustomTrigger refers to the specification of the custom trigger.", + Type: []string{"object"}, + Properties: map[string]spec.Schema{ + "serverURL": { + SchemaProps: spec.SchemaProps{ + Description: "ServerURL is the url of the gRPC server that executes custom trigger", + Type: []string{"string"}, + Format: "", + }, + }, + "secure": { + SchemaProps: spec.SchemaProps{ + Description: "Secure refers to type of the connection between sensor to custom trigger gRPC", + Type: []string{"boolean"}, + Format: "", + }, + }, + "certFilePath": { + SchemaProps: spec.SchemaProps{ + Description: "CertFilePath is path to the cert file within sensor for secure connection between sensor and custom trigger gRPC server.", + Type: []string{"string"}, + Format: "", + }, + }, + "serverNameOverride": { + SchemaProps: spec.SchemaProps{ + Description: "ServerNameOverride for the secure connection between sensor and custom trigger gRPC server.", + Type: []string{"string"}, + Format: "", + }, + }, + "triggerBody": { + SchemaProps: spec.SchemaProps{ + Description: "TriggerBody is the custom trigger resource specification that custom trigger gRPC server knows how to interpret.", + Type: []string{"string"}, + Format: "", + }, + }, + "parameters": { + VendorExtensible: spec.VendorExtensible{ + Extensions: spec.Extensions{ + "x-kubernetes-list-type": "triggerParameters", + }, + }, + SchemaProps: spec.SchemaProps{ + Description: "Parameters is the list of parameters that is applied to resolved custom trigger trigger object.", + Type: []string{"array"}, + Items: &spec.SchemaOrArray{ + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"), + }, + }, + }, + }, + }, + }, + Required: []string{"serverURL", "secure", "triggerBody"}, + }, + }, + Dependencies: []string{ + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerParameter"}, + } +} + func schema_pkg_apis_sensor_v1alpha1_DataFilter(ref common.ReferenceCallback) common.OpenAPIDefinition { return common.OpenAPIDefinition{ Schema: spec.Schema{ @@ -1780,12 +1850,18 @@ func schema_pkg_apis_sensor_v1alpha1_TriggerTemplate(ref common.ReferenceCallbac Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.AWSLambdaTrigger"), }, }, + "customTrigger": { + SchemaProps: spec.SchemaProps{ + Description: "CustomTrigger refers to the trigger designed to connect to a gRPC trigger server and execute a custom trigger.", + Ref: ref("github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.CustomTrigger"), + }, + }, }, Required: []string{"name"}, }, }, Dependencies: []string{ - "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.AWSLambdaTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.ArgoWorkflowTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.OpenFaasTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.StandardK8sTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerSwitch"}, + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.AWSLambdaTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.ArgoWorkflowTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.CustomTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.HTTPTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.OpenFaasTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.StandardK8sTrigger", "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1.TriggerSwitch"}, } } diff --git a/pkg/apis/sensor/v1alpha1/types.go b/pkg/apis/sensor/v1alpha1/types.go index 9f889f8eca..34bf7d754d 100644 --- a/pkg/apis/sensor/v1alpha1/types.go +++ b/pkg/apis/sensor/v1alpha1/types.go @@ -276,7 +276,11 @@ type TriggerTemplate struct { // +optional OpenFaas *OpenFaasTrigger `json:"openFaas,omitempty" protobuf:"bytes,5,opt,name=openFaas"` // AWSLambda refers to the trigger designed to invoke AWS Lambda function with with on-the-fly constructable payload. + // +optional AWSLambda *AWSLambdaTrigger `json:"awsLambda,omitempty" protobuf:"bytes,6,opt,name=awsLambda"` + // CustomTrigger refers to the trigger designed to connect to a gRPC trigger server and execute a custom trigger. + // +optional + CustomTrigger *CustomTrigger `json:"customTrigger,omitempty" protobuf:"bytes,7,opt,name=customTrigger"` } // TriggerSwitch describes condition which must be satisfied in order to execute a trigger. @@ -403,6 +407,26 @@ type AWSLambdaTrigger struct { Parameters []TriggerParameter `json:"parameters,omitempty" protobuf:"bytes,7,rep,name=parameters"` } +// CustomTrigger refers to the specification of the custom trigger. +type CustomTrigger struct { + // ServerURL is the url of the gRPC server that executes custom trigger + ServerURL string `json:"serverURL" protobuf:"bytes,1,name=serverURL"` + // Secure refers to type of the connection between sensor to custom trigger gRPC + Secure bool `json:"secure" protobuf:"bytes,2,name=secure"` + // CertFilePath is path to the cert file within sensor for secure connection between sensor and custom trigger gRPC server. + CertFilePath string `json:"certFilePath,omitempty" protobuf:"bytes,3,opt,name=certFilePath"` + // ServerNameOverride for the secure connection between sensor and custom trigger gRPC server. + ServerNameOverride string `json:"serverNameOverride,omitempty" protobuf:"bytes,4,opt,name=serverNameOverride"` + // TriggerBody is the custom trigger resource specification that custom trigger gRPC server knows how to interpret. + TriggerBody string `json:"triggerBody" protobuf:"bytes,5,name=triggerBody"` + // Parameters is the list of parameters that is applied to resolved custom trigger trigger object. + // +listType=triggerParameters + Parameters []TriggerParameter `json:"parameters,omitempty" protobuf:"bytes,6,rep,name=parameters"` + // Payload is the list of key-value extracted from an event payload to construct the request payload. + // +listType=payloadParameters + Payload []TriggerParameter `json:"payload" protobuf:"bytes,7,rep,name=payload"` +} + // TriggerParameterOperation represents how to set a trigger destination // resource key type TriggerParameterOperation string diff --git a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go index 0b8a85affe..69c6207120 100644 --- a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go @@ -182,6 +182,29 @@ func (in *ConfigmapArtifact) DeepCopy() *ConfigmapArtifact { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CustomTrigger) DeepCopyInto(out *CustomTrigger) { + *out = *in + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + *out = make([]TriggerParameter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CustomTrigger. +func (in *CustomTrigger) DeepCopy() *CustomTrigger { + if in == nil { + return nil + } + out := new(CustomTrigger) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DataFilter) DeepCopyInto(out *DataFilter) { *out = *in @@ -969,6 +992,11 @@ func (in *TriggerTemplate) DeepCopyInto(out *TriggerTemplate) { *out = new(AWSLambdaTrigger) (*in).DeepCopyInto(*out) } + if in.CustomTrigger != nil { + in, out := &in.CustomTrigger, &out.CustomTrigger + *out = new(CustomTrigger) + (*in).DeepCopyInto(*out) + } return } diff --git a/sensors/context.go b/sensors/context.go index d610b25f11..ee74c8055a 100644 --- a/sensors/context.go +++ b/sensors/context.go @@ -22,6 +22,7 @@ import ( sensorclientset "github.com/argoproj/argo-events/pkg/client/sensor/clientset/versioned" "github.com/argoproj/argo-events/sensors/types" "github.com/sirupsen/logrus" + "google.golang.org/grpc" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" ) @@ -44,6 +45,8 @@ type SensorContext struct { ControllerInstanceID string // Updated indicates update to Sensor resource Updated bool + // customTriggerClients holds the references to the gRPC clients for the custom trigger servers + customTriggerClients map[string]*grpc.ClientConn } // NewSensorContext returns a new sensor execution context. @@ -56,5 +59,6 @@ func NewSensorContext(sensorClient sensorclientset.Interface, kubeClient kuberne Logger: common.NewArgoEventsLogger().WithField(common.LabelSensorName, sensor.Name).Logger, NotificationQueue: make(chan *types.Notification), ControllerInstanceID: controllerInstanceID, + customTriggerClients: make(map[string]*grpc.ClientConn), } } diff --git a/sensors/trigger.go b/sensors/trigger.go index 5caa0b67e5..7945fad904 100644 --- a/sensors/trigger.go +++ b/sensors/trigger.go @@ -18,7 +18,8 @@ package sensors import ( "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" argoworkflow "github.com/argoproj/argo-events/sensors/triggers/argo-workflow" - aws_lambda "github.com/argoproj/argo-events/sensors/triggers/aws-lambda" + awslambda "github.com/argoproj/argo-events/sensors/triggers/aws-lambda" + customtrigger "github.com/argoproj/argo-events/sensors/triggers/custom-trigger" "github.com/argoproj/argo-events/sensors/triggers/http" "github.com/argoproj/argo-events/sensors/triggers/openfaas" standardk8s "github.com/argoproj/argo-events/sensors/triggers/standard-k8s" @@ -51,7 +52,15 @@ func (sensorCtx *SensorContext) GetTrigger(trigger *v1alpha1.Trigger) Trigger { return http.NewHTTPTrigger(sensorCtx.Sensor, trigger, sensorCtx.Logger) } if trigger.Template.AWSLambda != nil { - return aws_lambda.NewAWSLambdaTrigger(sensorCtx.KubeClient, sensorCtx.Sensor, trigger, sensorCtx.Logger) + return awslambda.NewAWSLambdaTrigger(sensorCtx.KubeClient, sensorCtx.Sensor, trigger, sensorCtx.Logger) + } + if trigger.Template.CustomTrigger != nil { + result, err := customtrigger.NewCustomTrigger(sensorCtx.Sensor, trigger, sensorCtx.Logger, sensorCtx.customTriggerClients) + if err != nil { + sensorCtx.Logger.WithError(err).WithField("trigger", trigger.Template.Name).Errorln("failed to invoke the trigger") + return nil + } + return result } return nil } diff --git a/sensors/triggers/aws-lambda/aws-lambda.go b/sensors/triggers/aws-lambda/aws-lambda.go index 3f33d98ccd..1b41689ef5 100644 --- a/sensors/triggers/aws-lambda/aws-lambda.go +++ b/sensors/triggers/aws-lambda/aws-lambda.go @@ -88,24 +88,9 @@ func (t *AWSLambdaTrigger) Execute(resource interface{}) (interface{}, error) { return nil, errors.New("payload parameters are not specified") } - payload := make(map[string][]byte) - - events := triggers.ExtractEvents(t.Sensor, trigger.Payload) - if events == nil { - return nil, errors.New("payload can't be constructed as there are not events to extract data from") - } - - for _, parameter := range trigger.Payload { - value, err := triggers.ResolveParamValue(parameter.Src, events) - if err != nil { - return nil, err - } - payload[parameter.Dest] = []byte(value) - } - - payloadBody, err := json.Marshal(payload) + payload, err := triggers.ConstructPayload(t.Sensor, trigger.Payload) if err != nil { - return nil, errors.Wrap(err, "failed to marshal payload") + return nil, err } awsSession, err := commonaws.CreateAWSSession(t.K8sClient, trigger.Namespace, trigger.Region, trigger.AccessKey, trigger.SecretKey) @@ -117,7 +102,7 @@ func (t *AWSLambdaTrigger) Execute(resource interface{}) (interface{}, error) { response, err := client.Invoke(&lambda.InvokeInput{ FunctionName: &trigger.FunctionName, - Payload: payloadBody, + Payload: payload, }) if err != nil { return nil, err diff --git a/sensors/triggers/custom-trigger/custom-trigger.go b/sensors/triggers/custom-trigger/custom-trigger.go new file mode 100644 index 0000000000..017339eadd --- /dev/null +++ b/sensors/triggers/custom-trigger/custom-trigger.go @@ -0,0 +1,169 @@ +/* +Copyright 2020 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package custom_trigger + +import ( + "context" + + "github.com/argoproj/argo-events/common" + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/argoproj/argo-events/sensors/triggers" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + "k8s.io/apimachinery/pkg/util/wait" +) + +// CustomTrigger implements Trigger interface for custom trigger resource +type CustomTrigger struct { + // Sensor object + Sensor *v1alpha1.Sensor + // Trigger definition + Trigger *v1alpha1.Trigger + // logger to log stuff + Logger *logrus.Logger + // triggerClient is the gRPC client for the custom trigger server + triggerClient triggers.TriggerClient +} + +func NewCustomTrigger(sensor *v1alpha1.Sensor, trigger *v1alpha1.Trigger, logger *logrus.Logger, customTriggerClients map[string]*grpc.ClientConn) (*CustomTrigger, error) { + customTrigger := &CustomTrigger{ + Sensor: sensor, + Trigger: trigger, + Logger: logger, + } + + ct := trigger.Template.CustomTrigger + + if conn, ok := customTriggerClients[trigger.Template.Name]; ok { + if conn.GetState() == connectivity.Ready { + customTrigger.triggerClient = triggers.NewTriggerClient(conn) + return customTrigger, nil + } + delete(customTriggerClients, trigger.Template.Name) + } + + opt := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithInsecure(), + } + + if ct.Secure { + creds, err := credentials.NewClientTLSFromFile(ct.CertFilePath, ct.ServerNameOverride) + if err != nil { + return nil, err + } + opt = append(opt, grpc.WithTransportCredentials(creds)) + } + + conn, err := grpc.Dial( + ct.ServerURL, + opt..., + ) + if err != nil { + return nil, err + } + + connBackoff := common.GetConnectionBackoff(nil) + + if err = wait.ExponentialBackoff(*connBackoff, func() (done bool, err error) { + if conn.GetState() == connectivity.Ready { + return true, nil + } + return false, nil + }); err != nil { + return nil, err + } + + customTrigger.triggerClient = triggers.NewTriggerClient(conn) + customTriggerClients[trigger.Template.Name] = conn + return customTrigger, nil +} + +// FetchResource fetches the trigger resource from external source +func (ct *CustomTrigger) FetchResource() (interface{}, error) { + resource, err := ct.triggerClient.FetchResource(context.Background(), &triggers.FetchResourceRequest{ + Resource: []byte(ct.Trigger.Template.CustomTrigger.TriggerBody), + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to fetch the custom trigger resource for %s", ct.Trigger.Template.Name) + } + return resource, nil +} + +// ApplyResourceParameters applies parameters to the trigger resource +func (ct *CustomTrigger) ApplyResourceParameters(sensor *v1alpha1.Sensor, resource interface{}) (interface{}, error) { + obj, ok := resource.([]byte) + if !ok { + return nil, errors.New("failed to interpret the trigger resource for resource parameters application") + } + parameters := ct.Trigger.Template.CustomTrigger.Parameters + + if parameters != nil && len(parameters) > 0 { + resource, err := triggers.ApplyParams(obj, ct.Trigger.Template.OpenFaas.Parameters, triggers.ExtractEvents(sensor, parameters)) + if err != nil { + return nil, errors.Wrapf(err, "failed to apply the parameters to the custom trigger resource for %s", ct.Trigger.Template.Name) + } + return resource, nil + } + + return resource, nil +} + +// Execute executes the trigger +func (ct *CustomTrigger) Execute(resource interface{}) (interface{}, error) { + obj, ok := resource.([]byte) + if !ok { + return nil, errors.New("failed to interpret the trigger resource for the execution") + } + trigger := ct.Trigger.Template.CustomTrigger + if trigger.Payload == nil { + return nil, errors.New("payload parameters are not specified") + } + payload, err := triggers.ConstructPayload(ct.Sensor, trigger.Payload) + if err != nil { + return nil, err + } + result, err := ct.triggerClient.Execute(context.Background(), &triggers.ExecuteRequest{ + Resource: obj, + Payload: payload, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to execute the custom trigger resource for %s", ct.Trigger.Template.Name) + } + return result, nil +} + +// ApplyPolicy applies the policy on the trigger +func (ct *CustomTrigger) ApplyPolicy(resource interface{}) error { + obj, ok := resource.([]byte) + if !ok { + return errors.New("failed to interpret the trigger resource for the policy application") + } + result, err := ct.triggerClient.ApplyPolicy(context.Background(), &triggers.ApplyPolicyRequest{ + Request: obj, + }) + if err != nil { + return errors.Wrapf(err, "failed to apply the policy for the custom trigger resource for %s", ct.Trigger.Template.Name) + } + ct.Logger.WithFields(logrus.Fields{ + "success": result.Success, + "message": result.Message, + }).Infoln("policy application result") + return err +} diff --git a/sensors/triggers/custom-trigger/custom-trigger_test.go b/sensors/triggers/custom-trigger/custom-trigger_test.go new file mode 100644 index 0000000000..70058a2453 --- /dev/null +++ b/sensors/triggers/custom-trigger/custom-trigger_test.go @@ -0,0 +1,16 @@ +/* +Copyright 2020 BlackRock, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package custom_trigger diff --git a/sensors/triggers/openfaas/openfaas.go b/sensors/triggers/openfaas/openfaas.go index d1f950369b..5e467fdb48 100644 --- a/sensors/triggers/openfaas/openfaas.go +++ b/sensors/triggers/openfaas/openfaas.go @@ -151,6 +151,8 @@ func (t *OpenFaasTrigger) Execute(resource interface{}) (interface{}, error) { return nil, errors.Wrap(err, "failed to read the response") } + t.Logger.WithField("body", string(body)).Infoln("response body") + return body, nil } diff --git a/sensors/triggers/params.go b/sensors/triggers/params.go index ec8ffac661..8415298736 100644 --- a/sensors/triggers/params.go +++ b/sensors/triggers/params.go @@ -19,6 +19,7 @@ package triggers import ( "encoding/json" "fmt" + "github.com/argoproj/argo-events/common" snctrl "github.com/argoproj/argo-events/controllers/sensor" apicommon "github.com/argoproj/argo-events/pkg/apis/common" @@ -32,7 +33,7 @@ import ( // ConstructPayload constructs a payload for operations involving request and responses like HTTP request. func ConstructPayload(sensor *v1alpha1.Sensor, parameters []v1alpha1.TriggerParameter) ([]byte, error) { - var result []byte + payload := make(map[string]string) events := ExtractEvents(sensor, parameters) if events == nil { @@ -44,14 +45,10 @@ func ConstructPayload(sensor *v1alpha1.Sensor, parameters []v1alpha1.TriggerPara if err != nil { return nil, err } - - result, err = sjson.SetBytes(result, parameter.Dest, []byte(value)) - if err != nil { - return nil, errors.Wrapf(err, "failed to construct the JSON payload") - } + payload[parameter.Dest] = value } - return result, nil + return json.Marshal(payload) } // ApplyTemplateParameters applies parameters to trigger template diff --git a/sensors/triggers/trigger.pb.go b/sensors/triggers/trigger.pb.go new file mode 100644 index 0000000000..f3d0d554c9 --- /dev/null +++ b/sensors/triggers/trigger.pb.go @@ -0,0 +1,455 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: trigger.proto + +package triggers + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +// FetchResourceRequest is the request to fetch trigger resource +type FetchResourceRequest struct { + Resource []byte `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FetchResourceRequest) Reset() { *m = FetchResourceRequest{} } +func (m *FetchResourceRequest) String() string { return proto.CompactTextString(m) } +func (*FetchResourceRequest) ProtoMessage() {} +func (*FetchResourceRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_8c31e6d8b4368946, []int{0} +} + +func (m *FetchResourceRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FetchResourceRequest.Unmarshal(m, b) +} +func (m *FetchResourceRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FetchResourceRequest.Marshal(b, m, deterministic) +} +func (m *FetchResourceRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FetchResourceRequest.Merge(m, src) +} +func (m *FetchResourceRequest) XXX_Size() int { + return xxx_messageInfo_FetchResourceRequest.Size(m) +} +func (m *FetchResourceRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FetchResourceRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FetchResourceRequest proto.InternalMessageInfo + +func (m *FetchResourceRequest) GetResource() []byte { + if m != nil { + return m.Resource + } + return nil +} + +// FetchResourceRequest contains the fetched resource. +type FetchResourceResponse struct { + Resource []byte `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FetchResourceResponse) Reset() { *m = FetchResourceResponse{} } +func (m *FetchResourceResponse) String() string { return proto.CompactTextString(m) } +func (*FetchResourceResponse) ProtoMessage() {} +func (*FetchResourceResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_8c31e6d8b4368946, []int{1} +} + +func (m *FetchResourceResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FetchResourceResponse.Unmarshal(m, b) +} +func (m *FetchResourceResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FetchResourceResponse.Marshal(b, m, deterministic) +} +func (m *FetchResourceResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_FetchResourceResponse.Merge(m, src) +} +func (m *FetchResourceResponse) XXX_Size() int { + return xxx_messageInfo_FetchResourceResponse.Size(m) +} +func (m *FetchResourceResponse) XXX_DiscardUnknown() { + xxx_messageInfo_FetchResourceResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_FetchResourceResponse proto.InternalMessageInfo + +func (m *FetchResourceResponse) GetResource() []byte { + if m != nil { + return m.Resource + } + return nil +} + +// ExecuteRequest is a request to execute a trigger resource. +type ExecuteRequest struct { + Resource []byte `protobuf:"bytes,1,opt,name=resource,proto3" json:"resource,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ExecuteRequest) Reset() { *m = ExecuteRequest{} } +func (m *ExecuteRequest) String() string { return proto.CompactTextString(m) } +func (*ExecuteRequest) ProtoMessage() {} +func (*ExecuteRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_8c31e6d8b4368946, []int{2} +} + +func (m *ExecuteRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ExecuteRequest.Unmarshal(m, b) +} +func (m *ExecuteRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ExecuteRequest.Marshal(b, m, deterministic) +} +func (m *ExecuteRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecuteRequest.Merge(m, src) +} +func (m *ExecuteRequest) XXX_Size() int { + return xxx_messageInfo_ExecuteRequest.Size(m) +} +func (m *ExecuteRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ExecuteRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecuteRequest proto.InternalMessageInfo + +func (m *ExecuteRequest) GetResource() []byte { + if m != nil { + return m.Resource + } + return nil +} + +func (m *ExecuteRequest) GetPayload() []byte { + if m != nil { + return m.Payload + } + return nil +} + +// ExecuteResponse is the response of the trigger execution +type ExecuteResponse struct { + Response []byte `protobuf:"bytes,1,opt,name=response,proto3" json:"response,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ExecuteResponse) Reset() { *m = ExecuteResponse{} } +func (m *ExecuteResponse) String() string { return proto.CompactTextString(m) } +func (*ExecuteResponse) ProtoMessage() {} +func (*ExecuteResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_8c31e6d8b4368946, []int{3} +} + +func (m *ExecuteResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ExecuteResponse.Unmarshal(m, b) +} +func (m *ExecuteResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ExecuteResponse.Marshal(b, m, deterministic) +} +func (m *ExecuteResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ExecuteResponse.Merge(m, src) +} +func (m *ExecuteResponse) XXX_Size() int { + return xxx_messageInfo_ExecuteResponse.Size(m) +} +func (m *ExecuteResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ExecuteResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ExecuteResponse proto.InternalMessageInfo + +func (m *ExecuteResponse) GetResponse() []byte { + if m != nil { + return m.Response + } + return nil +} + +// ApplyPolicyRequest is the request to apply policy on the trigger execution result. +type ApplyPolicyRequest struct { + Request []byte `protobuf:"bytes,1,opt,name=request,proto3" json:"request,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ApplyPolicyRequest) Reset() { *m = ApplyPolicyRequest{} } +func (m *ApplyPolicyRequest) String() string { return proto.CompactTextString(m) } +func (*ApplyPolicyRequest) ProtoMessage() {} +func (*ApplyPolicyRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_8c31e6d8b4368946, []int{4} +} + +func (m *ApplyPolicyRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ApplyPolicyRequest.Unmarshal(m, b) +} +func (m *ApplyPolicyRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ApplyPolicyRequest.Marshal(b, m, deterministic) +} +func (m *ApplyPolicyRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ApplyPolicyRequest.Merge(m, src) +} +func (m *ApplyPolicyRequest) XXX_Size() int { + return xxx_messageInfo_ApplyPolicyRequest.Size(m) +} +func (m *ApplyPolicyRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ApplyPolicyRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ApplyPolicyRequest proto.InternalMessageInfo + +func (m *ApplyPolicyRequest) GetRequest() []byte { + if m != nil { + return m.Request + } + return nil +} + +// ApplyPolicyResponse is the response of the application of the trigger policy. +type ApplyPolicyResponse struct { + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ApplyPolicyResponse) Reset() { *m = ApplyPolicyResponse{} } +func (m *ApplyPolicyResponse) String() string { return proto.CompactTextString(m) } +func (*ApplyPolicyResponse) ProtoMessage() {} +func (*ApplyPolicyResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_8c31e6d8b4368946, []int{5} +} + +func (m *ApplyPolicyResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ApplyPolicyResponse.Unmarshal(m, b) +} +func (m *ApplyPolicyResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ApplyPolicyResponse.Marshal(b, m, deterministic) +} +func (m *ApplyPolicyResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_ApplyPolicyResponse.Merge(m, src) +} +func (m *ApplyPolicyResponse) XXX_Size() int { + return xxx_messageInfo_ApplyPolicyResponse.Size(m) +} +func (m *ApplyPolicyResponse) XXX_DiscardUnknown() { + xxx_messageInfo_ApplyPolicyResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_ApplyPolicyResponse proto.InternalMessageInfo + +func (m *ApplyPolicyResponse) GetSuccess() bool { + if m != nil { + return m.Success + } + return false +} + +func (m *ApplyPolicyResponse) GetMessage() string { + if m != nil { + return m.Message + } + return "" +} + +func init() { + proto.RegisterType((*FetchResourceRequest)(nil), "triggers.FetchResourceRequest") + proto.RegisterType((*FetchResourceResponse)(nil), "triggers.FetchResourceResponse") + proto.RegisterType((*ExecuteRequest)(nil), "triggers.ExecuteRequest") + proto.RegisterType((*ExecuteResponse)(nil), "triggers.ExecuteResponse") + proto.RegisterType((*ApplyPolicyRequest)(nil), "triggers.ApplyPolicyRequest") + proto.RegisterType((*ApplyPolicyResponse)(nil), "triggers.ApplyPolicyResponse") +} + +func init() { proto.RegisterFile("trigger.proto", fileDescriptor_8c31e6d8b4368946) } + +var fileDescriptor_8c31e6d8b4368946 = []byte{ + // 272 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x52, 0x4d, 0x4b, 0x03, 0x31, + 0x10, 0x65, 0x3d, 0x98, 0x75, 0xb4, 0x0a, 0x51, 0x21, 0x2e, 0x7e, 0x91, 0x93, 0x17, 0xf7, 0xd0, + 0xde, 0x05, 0x0f, 0x16, 0xf4, 0x54, 0x82, 0x7f, 0x60, 0x8d, 0xc3, 0x5a, 0x58, 0x4d, 0xcc, 0x64, + 0xc1, 0xfd, 0xc5, 0xfe, 0x0d, 0x31, 0x9b, 0xb4, 0x5d, 0x6d, 0xc5, 0xdb, 0xbc, 0xbc, 0x79, 0xf3, + 0x26, 0x2f, 0x81, 0x91, 0x77, 0xf3, 0xba, 0x46, 0x57, 0x5a, 0x67, 0xbc, 0xe1, 0x79, 0x84, 0x24, + 0xc7, 0x70, 0x34, 0x45, 0xaf, 0x5f, 0x14, 0x92, 0x69, 0x9d, 0x46, 0x85, 0xef, 0x2d, 0x92, 0xe7, + 0x05, 0xe4, 0x2e, 0x1e, 0x89, 0xec, 0x32, 0xbb, 0xda, 0x53, 0x0b, 0x2c, 0x27, 0x70, 0xfc, 0x43, + 0x43, 0xd6, 0xbc, 0x11, 0xfe, 0x29, 0x9a, 0xc2, 0xfe, 0xdd, 0x07, 0xea, 0xd6, 0xff, 0xc7, 0x82, + 0x0b, 0x60, 0xb6, 0xea, 0x1a, 0x53, 0x3d, 0x8b, 0xad, 0x40, 0x25, 0x28, 0xaf, 0xe1, 0x60, 0x31, + 0x67, 0x60, 0x1b, 0xea, 0x95, 0x41, 0x01, 0xcb, 0x12, 0xf8, 0xad, 0xb5, 0x4d, 0x37, 0x33, 0xcd, + 0x5c, 0x77, 0xc9, 0x5a, 0x00, 0x73, 0x7d, 0x19, 0x05, 0x09, 0xca, 0x7b, 0x38, 0x1c, 0xf4, 0x47, + 0x0b, 0x01, 0x8c, 0x5a, 0xad, 0x91, 0x28, 0x08, 0x72, 0x95, 0xe0, 0x37, 0xf3, 0x8a, 0x44, 0x55, + 0x8d, 0x61, 0xd3, 0x1d, 0x95, 0xe0, 0xf8, 0x33, 0x03, 0xf6, 0xd8, 0xe7, 0xcc, 0x67, 0x30, 0x1a, + 0x44, 0xc6, 0xcf, 0xcb, 0xf4, 0x04, 0xe5, 0xba, 0xfc, 0x8b, 0x8b, 0x8d, 0x7c, 0xdc, 0xe8, 0x06, + 0x58, 0xcc, 0x81, 0x8b, 0x65, 0xef, 0x30, 0xe2, 0xe2, 0x64, 0x0d, 0x13, 0xf5, 0x0f, 0xb0, 0xbb, + 0x72, 0x51, 0x7e, 0xba, 0xec, 0xfc, 0x9d, 0x57, 0x71, 0xb6, 0x81, 0xed, 0x67, 0x3d, 0x6d, 0x87, + 0x5f, 0x35, 0xf9, 0x0a, 0x00, 0x00, 0xff, 0xff, 0x4b, 0x24, 0xc7, 0x9f, 0x66, 0x02, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// TriggerClient is the client API for Trigger service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type TriggerClient interface { + // FetchResource fetches the resource to be triggered. + FetchResource(ctx context.Context, in *FetchResourceRequest, opts ...grpc.CallOption) (*FetchResourceResponse, error) + // Execute executes the requested trigger resource. + Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) + // ApplyPolicy applies policies on the trigger execution result. + ApplyPolicy(ctx context.Context, in *ApplyPolicyRequest, opts ...grpc.CallOption) (*ApplyPolicyResponse, error) +} + +type triggerClient struct { + cc *grpc.ClientConn +} + +func NewTriggerClient(cc *grpc.ClientConn) TriggerClient { + return &triggerClient{cc} +} + +func (c *triggerClient) FetchResource(ctx context.Context, in *FetchResourceRequest, opts ...grpc.CallOption) (*FetchResourceResponse, error) { + out := new(FetchResourceResponse) + err := c.cc.Invoke(ctx, "/triggers.Trigger/FetchResource", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *triggerClient) Execute(ctx context.Context, in *ExecuteRequest, opts ...grpc.CallOption) (*ExecuteResponse, error) { + out := new(ExecuteResponse) + err := c.cc.Invoke(ctx, "/triggers.Trigger/Execute", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *triggerClient) ApplyPolicy(ctx context.Context, in *ApplyPolicyRequest, opts ...grpc.CallOption) (*ApplyPolicyResponse, error) { + out := new(ApplyPolicyResponse) + err := c.cc.Invoke(ctx, "/triggers.Trigger/ApplyPolicy", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TriggerServer is the server API for Trigger service. +type TriggerServer interface { + // FetchResource fetches the resource to be triggered. + FetchResource(context.Context, *FetchResourceRequest) (*FetchResourceResponse, error) + // Execute executes the requested trigger resource. + Execute(context.Context, *ExecuteRequest) (*ExecuteResponse, error) + // ApplyPolicy applies policies on the trigger execution result. + ApplyPolicy(context.Context, *ApplyPolicyRequest) (*ApplyPolicyResponse, error) +} + +func RegisterTriggerServer(s *grpc.Server, srv TriggerServer) { + s.RegisterService(&_Trigger_serviceDesc, srv) +} + +func _Trigger_FetchResource_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(FetchResourceRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TriggerServer).FetchResource(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/triggers.Trigger/FetchResource", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TriggerServer).FetchResource(ctx, req.(*FetchResourceRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Trigger_Execute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ExecuteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TriggerServer).Execute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/triggers.Trigger/Execute", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TriggerServer).Execute(ctx, req.(*ExecuteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Trigger_ApplyPolicy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ApplyPolicyRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TriggerServer).ApplyPolicy(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/triggers.Trigger/ApplyPolicy", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TriggerServer).ApplyPolicy(ctx, req.(*ApplyPolicyRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Trigger_serviceDesc = grpc.ServiceDesc{ + ServiceName: "triggers.Trigger", + HandlerType: (*TriggerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "FetchResource", + Handler: _Trigger_FetchResource_Handler, + }, + { + MethodName: "Execute", + Handler: _Trigger_Execute_Handler, + }, + { + MethodName: "ApplyPolicy", + Handler: _Trigger_ApplyPolicy_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "trigger.proto", +} diff --git a/sensors/triggers/trigger.proto b/sensors/triggers/trigger.proto new file mode 100644 index 0000000000..6033744442 --- /dev/null +++ b/sensors/triggers/trigger.proto @@ -0,0 +1,48 @@ +/** +* Trigger related messages and services +*/ +syntax = "proto3"; + +package triggers; + +// Trigger offers services to build a custom trigger +service Trigger { + // FetchResource fetches the resource to be triggered. + rpc FetchResource(FetchResourceRequest) returns (FetchResourceResponse); + // Execute executes the requested trigger resource. + rpc Execute(ExecuteRequest) returns (ExecuteResponse); + // ApplyPolicy applies policies on the trigger execution result. + rpc ApplyPolicy(ApplyPolicyRequest) returns (ApplyPolicyResponse); +} + +// FetchResourceRequest is the request to fetch trigger resource +message FetchResourceRequest { + bytes resource = 1; +} + +// FetchResourceRequest contains the fetched resource. +message FetchResourceResponse { + bytes resource = 1; +} + +// ExecuteRequest is a request to execute a trigger resource. +message ExecuteRequest { + bytes resource = 1; + bytes payload = 2; +} + +// ExecuteResponse is the response of the trigger execution +message ExecuteResponse { + bytes response = 1; +} + +// ApplyPolicyRequest is the request to apply policy on the trigger execution result. +message ApplyPolicyRequest { + bytes request = 1; +} + +// ApplyPolicyResponse is the response of the application of the trigger policy. +message ApplyPolicyResponse { + bool success = 1; + string message = 2; +}