diff --git a/examples/s3-sensor-with-param.yaml b/examples/s3-sensor-with-param.yaml new file mode 100644 index 0000000000..80348a077b --- /dev/null +++ b/examples/s3-sensor-with-param.yaml @@ -0,0 +1,60 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Sensor +metadata: + name: s3-example + labels: + sensors.argoproj.io/controller-instanceid: axis +spec: + signals: + - name: minioS3 + artifact: + s3: + bucket: hello + event: s3:ObjectCreated:Put + endpoint: artifacts-minio.default:9000 + target: + type: NATS + url: nats://example-nats-cluster:4222 + attributes: + subject: hello + triggers: + - name: argo-workflow + resource: + namespace: default + group: argoproj.io + version: v1alpha1 + kind: Workflow + # The artifact key from the workflow are overridden by the s3 notification key + parameters: + - src: + signal: minioS3 + path: s3.object.key + dest: spec.templates.0.inputs.artifacts.0.key + source: + inline: | + apiVersion: argoproj.io/v1alpha1 + kind: Workflow + metadata: + generateName: input-artifact-s3- + spec: + entrypoint: input-artifact-s3-example + templates: + - name: input-artifact-s3-example + inputs: + artifacts: + - name: my-art + path: /my-artifact + s3: + endpoint: artifacts-minio.default:9000 + bucket: hello + key: path/in/bucket + accessKey: + key: accesskey + name: artifacts-minio + secretKey: + key: secretkey + name: artifacts-minio + container: + image: debian:latest + command: [sh, -c] + args: ["ls -l /my-artifact"] diff --git a/pkg/apis/sensor/v1alpha1/types.go b/pkg/apis/sensor/v1alpha1/types.go index e7e2676254..5053b7245c 100644 --- a/pkg/apis/sensor/v1alpha1/types.go +++ b/pkg/apis/sensor/v1alpha1/types.go @@ -466,7 +466,7 @@ type ArtifactLocation struct { // S3Artifact contains information about an artifact in S3 type S3Artifact struct { - S3Bucket `json:",inline" protobuf:"bytes,4,opt,name=s3Bucket"` + S3Bucket `json:",inline"` Key string `json:"key,omitempty" protobuf:"bytes,1,opt,name=key"` Event minio.NotificationEventType `json:"event,omitempty" protobuf:"bytes,2,opt,name=event"` Filter *S3Filter `json:"filter,omitempty" protobuf:"bytes,3,opt,name=filter"` diff --git a/sdk/fake/signal-client.go b/sdk/fake/signal-client.go new file mode 100644 index 0000000000..2d6c624fde --- /dev/null +++ b/sdk/fake/signal-client.go @@ -0,0 +1,78 @@ +package fake + +import ( + "context" + "io" + + "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/argoproj/argo-events/sdk" + "github.com/micro/go-micro/client" +) + +// SignalClient implements the sdk.SignalClient +// also contains a Send() method which should be invoked via a separate go-routine +// if your test is single-threaded to prevent blocking channels +type SignalClient interface { + sdk.SignalClient + Generate(*v1alpha1.Event) +} + +// implements sdk.SignalService_ListenService as a simple loop +type simpleLoopListenService struct { + events chan *v1alpha1.Event +} + +func (*simpleLoopListenService) SendMsg(interface{}) error { + return nil +} + +func (*simpleLoopListenService) RecvMsg(interface{}) error { + return nil +} + +func (f *simpleLoopListenService) Close() error { + close(f.events) + return nil +} + +func (*simpleLoopListenService) Send(*sdk.SignalContext) error { + return nil +} + +func (f *simpleLoopListenService) Recv() (*sdk.EventContext, error) { + event, ok := <-f.events + if !ok { + return nil, io.EOF + } + return &sdk.EventContext{Event: event}, nil +} + +type fakeSignalClient struct { + loop *simpleLoopListenService +} + +// NewClient returns a new fake signal client +func NewClient() SignalClient { + return &fakeSignalClient{ + loop: &simpleLoopListenService{ + events: make(chan *v1alpha1.Event), + }, + } +} + +func (*fakeSignalClient) Ping(context.Context) error { + return nil +} + +func (f *fakeSignalClient) Listen(context.Context, *v1alpha1.Signal, ...client.CallOption) (sdk.SignalService_ListenService, error) { + return f.loop, nil +} + +func (*fakeSignalClient) Handshake(*v1alpha1.Signal, sdk.SignalService_ListenService) error { + return nil +} + +// Generate allows us to produce fake events for testing purposes +func (f *fakeSignalClient) Generate(e *v1alpha1.Event) { + go func() { f.loop.events <- e.DeepCopy() }() +} diff --git a/sdk/interface.go b/sdk/interface.go index 06459fb06d..f811ee43b0 100644 --- a/sdk/interface.go +++ b/sdk/interface.go @@ -46,20 +46,11 @@ type Listener interface { Listen(*v1alpha1.Signal, <-chan struct{}) (<-chan *v1alpha1.Event, error) } -// ArtifactListener is the interface for listening with artifacts -// In addition to including the basic Listener interface, this also -// enables access to read an artifact object to include in the event data payload -type ArtifactListener interface { - Listener - // TODO: change to use io.Reader and io.Closer interfaces? - Read(loc *v1alpha1.ArtifactLocation, key string) ([]byte, error) -} - // SignalClient is the interface for signal clients type SignalClient interface { Ping(context.Context) error Listen(context.Context, *v1alpha1.Signal, ...client.CallOption) (SignalService_ListenService, error) - handshake(*v1alpha1.Signal, SignalService_ListenService) error + Handshake(*v1alpha1.Signal, SignalService_ListenService) error } // SignalServer is the interface for signal servers diff --git a/sdk/micro_client.go b/sdk/micro_client.go index b2b29ade82..6c3bd60665 100644 --- a/sdk/micro_client.go +++ b/sdk/micro_client.go @@ -35,7 +35,7 @@ func (m *microSignalClient) Listen(ctx context.Context, signal *v1alpha1.Signal, if err != nil { return nil, err } - err = m.handshake(signal, stream) + err = m.Handshake(signal, stream) if err != nil { return nil, err } @@ -43,7 +43,7 @@ func (m *microSignalClient) Listen(ctx context.Context, signal *v1alpha1.Signal, } // Handshake performs the initial signal handshaking with the server -func (m *microSignalClient) handshake(signal *v1alpha1.Signal, stream SignalService_ListenService) error { +func (m *microSignalClient) Handshake(signal *v1alpha1.Signal, stream SignalService_ListenService) error { err := stream.Send(&SignalContext{Signal: signal}) if err != nil { return err diff --git a/signals/artifact/micro/artifact_service.go b/signals/artifact/micro/artifact_service.go index 38c66e36a7..5288c29a32 100644 --- a/signals/artifact/micro/artifact_service.go +++ b/signals/artifact/micro/artifact_service.go @@ -19,38 +19,26 @@ package main import ( "os" - "github.com/argoproj/argo-events/common" "github.com/argoproj/argo-events/sdk" "github.com/argoproj/argo-events/signals/artifact" "github.com/micro/go-micro" k8s "github.com/micro/kubernetes/go/micro" - "k8s.io/client-go/kubernetes" ) func main() { svc := k8s.NewService(micro.Name("artifact"), micro.Metadata(sdk.SignalMetadata)) svc.Init() - // kubernetes configuration - kubeConfig, _ := os.LookupEnv(common.EnvVarKubeConfig) - rest, err := common.GetClientConfig(kubeConfig) - if err != nil { - panic(err) - } - kubeclient := kubernetes.NewForConfigOrDie(rest) - - // namespace configuration - nm := common.DefaultSensorControllerNamespace - // stream configuration // TODO: make this configurable while running through github.com/micro/go-config + // or use google/go-cloud runtimeVars stream, ok := os.LookupEnv("stream-signal") if !ok { stream = "nats" } streamClient := sdk.NewMicroSignalClient(stream, svc.Client()) - sdk.RegisterSignalServiceHandler(svc.Server(), sdk.NewMicroSignalServer(artifact.New(streamClient, kubeclient, nm))) + sdk.RegisterSignalServiceHandler(svc.Server(), sdk.NewMicroSignalServer(artifact.New(streamClient))) if err := svc.Run(); err != nil { panic(err) diff --git a/signals/artifact/s3.go b/signals/artifact/s3.go index 58d3e5425f..a11b7c65b4 100644 --- a/signals/artifact/s3.go +++ b/signals/artifact/s3.go @@ -22,19 +22,15 @@ import ( "errors" "fmt" "io" - "io/ioutil" "strconv" "strings" "time" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" "github.com/argoproj/argo-events/sdk" - "github.com/argoproj/argo-events/store" - "github.com/golang/protobuf/proto" minio "github.com/minio/minio-go" log "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" ) const ( @@ -46,18 +42,12 @@ const ( // receive struct to save or modify state. // Listen() methods CAN retrieve fields from the s3 struct. type s3 struct { - kubeClient kubernetes.Interface - namespace string streamClient sdk.SignalClient } // New creates a new S3 signal -func New(client sdk.SignalClient, kubeClient kubernetes.Interface, nm string) sdk.ArtifactListener { - return &s3{ - streamClient: client, - kubeClient: kubeClient, - namespace: nm, - } +func New(client sdk.SignalClient) sdk.Listener { + return &s3{streamClient: client} } func (s *s3) Listen(signal *v1alpha1.Signal, done <-chan struct{}) (<-chan *v1alpha1.Event, error) { @@ -95,7 +85,6 @@ func (s *s3) Listen(signal *v1alpha1.Signal, done <-chan struct{}) (<-chan *v1al // wait for stop signal go func() { <-done - close(events) // TODO: should we cancel or gracefully shutdown and first send Terminate msg err := stream.Send(sdk.Terminate) if err != nil { @@ -105,7 +94,9 @@ func (s *s3) Listen(signal *v1alpha1.Signal, done <-chan struct{}) (<-chan *v1al if err != nil { log.Panicf("failed to close stream: %s", err) } + log.Printf("shut down signal '%s'", signal.Name) }() + log.Printf("signal '%s' listening for S3 [%s] for bucket [%s]...", signal.Name, signal.Artifact.S3.Event, signal.Artifact.S3.Bucket) return events, nil } @@ -113,23 +104,30 @@ func (s *s3) Listen(signal *v1alpha1.Signal, done <-chan struct{}) (<-chan *v1al // intercepts the receive-only msgs off the stream, filters them, and writes artifact events // to the sendCh. func (s *s3) interceptFilterAndEnhanceEvents(sig *v1alpha1.Signal, sendCh chan *v1alpha1.Event, recvCh <-chan *v1alpha1.Event) { + loc := sig.Artifact.ArtifactLocation defer close(sendCh) for streamEvent := range recvCh { - // todo: apply general filtering on cloudEvents - event := proto.Clone(streamEvent).(*v1alpha1.Event) - notification := &minio.NotificationInfo{} err := json.Unmarshal(streamEvent.Data, notification) if err != nil { // we ignore this - as this stream could be in use by another publisher of different notifications + log.Warnf("failed to unmarshal notification %s: %s", streamEvent.Data, err) continue } if notification.Err != nil { + event := streamEvent.DeepCopy() event.Context.Extensions[sdk.ContextExtensionErrorKey] = notification.Err.Error() + sendCh <- event } for _, record := range notification.Records { - if ok := applyFilter(&record, sig.Artifact.ArtifactLocation); !ok { + event := streamEvent.DeepCopy() + + if ok := applyFilter(&record, loc); !ok { // this record failed to pass the filter so we ignore it + log.Debugf("filtered event - record metadata [bucket: %s, event: %s, key: %s] "+ + "does not match expected s3 [bucket: %s, event: %s, filter: %v]", + record.S3.Bucket.Name, record.EventName, record.S3.Object.Key, + loc.S3.Bucket, loc.S3.Event, loc.S3.Filter) continue } port, _ := strconv.ParseInt(record.Source.Port, 10, 32) @@ -146,13 +144,17 @@ func (s *s3) interceptFilterAndEnhanceEvents(sig *v1alpha1.Signal, sendCh chan * Scheme: record.S3.SchemaVersion, } event.Context.EventID = record.S3.Object.ETag + event.Context.ContentType = "application/json" - // read the actual s3 artifact to put into the event data - b, err := s.Read(&sig.Artifact.ArtifactLocation, record.S3.Object.Key) + // re-marshal each record back into json + recordEvent := new(minio.NotificationEvent) + recordEventBytes, err := json.Marshal(recordEvent) if err != nil { - event.Context.Extensions[sdk.ContextExtensionErrorKey] = err.Error() + log.Warnf("failed to re-marshal notification event into json: %s. falling back to the stream event's original data", err) + event.Data = streamEvent.Data + } else { + event.Data = recordEventBytes } - event.Data = b sendCh <- event } } @@ -195,25 +197,3 @@ func getMetaTimestamp(tStr string) metav1.Time { } return metav1.Time{Time: t} } - -func (s *s3) Read(loc *v1alpha1.ArtifactLocation, key string) ([]byte, error) { - creds, err := store.GetCredentials(s.kubeClient, s.namespace, loc) - if err != nil { - return nil, err - } - client, err := store.NewMinioClient(loc.S3, *creds) - if err != nil { - return nil, err - } - - obj, err := client.GetObject(loc.S3.Bucket, key, minio.GetObjectOptions{}) - if err != nil { - return nil, err - } - defer obj.Close() - b, err := ioutil.ReadAll(obj) - if err != nil { - return nil, err - } - return b, nil -} diff --git a/signals/artifact/s3_test.go b/signals/artifact/s3_test.go index 2ab83fa490..3268c6aa16 100644 --- a/signals/artifact/s3_test.go +++ b/signals/artifact/s3_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -21,6 +21,8 @@ import ( "testing" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" + "github.com/argoproj/argo-events/sdk/fake" + minio "github.com/minio/minio-go" ) func TestExtractAndCreateStreamSignal(t *testing.T) { @@ -62,16 +64,9 @@ func TestExtractAndCreateStreamSignal(t *testing.T) { } } -/* func TestSignal(t *testing.T) { - natsEmbeddedServerOpts := server.Options{ - Host: "localhost", - Port: 4223, - NoLog: true, - NoSigs: true, - MaxControlLine: 256, - } - s3 := New(nats.New(), fake.NewSimpleClientset(), "default") + fakeClient := fake.NewClient() + s3 := New(fakeClient) signal := v1alpha1.Signal{ Name: "s3-test", @@ -79,210 +74,99 @@ func TestSignal(t *testing.T) { ArtifactLocation: v1alpha1.ArtifactLocation{ S3: &v1alpha1.S3Artifact{ S3Bucket: v1alpha1.S3Bucket{ - Bucket: "bucket", + Bucket: "images", }, + Key: "myObj.txt", + Event: minio.ObjectCreatedPut, }, }, - NotificationStream: v1alpha1.Stream{ - Type: "NATS", - URL: "nats://" + natsEmbeddedServerOpts.Host + ":" + strconv.Itoa(natsEmbeddedServerOpts.Port), - Attributes: map[string]string{"subject": "test"}, + Target: v1alpha1.Stream{ + Type: "TEST", + URL: "http://unit-tests-are-awesome.com", }, }, } - // start the signal - _, err := s3.Start(&signal) - if err == nil { - t.Errorf("expected: failed to connect to nats cluster\nfound: %s", err) - } - - // run an embedded gnats server - testServer := test.RunServer(&natsEmbeddedServerOpts) - defer testServer.Shutdown() - events, err := s3.Start(&signal) + done := make(chan struct{}) + events, err := s3.Listen(&signal, done) if err != nil { t.Error(err) } - // send a misformed message - should not get anything on channel - conn, err := natsio.Connect(signal.Artifact.NotificationStream.URL) - if err != nil { - t.Fatalf("failed to connect to embedded nats server. cause: %s", err) - } - defer conn.Close() - err = conn.Publish("test", []byte("hello, world")) - if err != nil { - t.Fatalf("failed to publish misformed msg. cause: %s", err) - } - - // send a minio bucket notification with error - err = conn.Publish("test", []byte(notificationInfoWithErr)) - if err != nil { - t.Fatalf("failed to publish notification msg. cause: %s", err) + // send a proper notification + notificationEvent := v1alpha1.Event{ + Context: v1alpha1.EventContext{}, + Data: []byte(notificationInfo), } + fakeClient.Generate(¬ificationEvent) event, ok := <-events if !ok { t.Errorf("expected an event but found none") } - // verify the error extension - errMsg := event.Context.Extensions[shared.ContextExtensionErrorKey] - if errMsg != "this is an error" { - t.Errorf("event context error:\nexpected: %s\nactual: %s", "this is an error", errMsg) - } - - // send a valid minio bucket notification - err = conn.Publish("test", []byte(notificationInfo)) - if err != nil { - t.Fatalf("failed to publish notification msg. cause: %s", err) - } - event, ok = <-events - if !ok { - t.Errorf("expected an event but found none") - } // verify the event data if event.Context.EventType != EventType { t.Errorf("event context EventType:\nexpected: %s\nactual: %s", EventType, event.Context.EventID) } - err = s3.Stop() - if err != nil { - t.Errorf("failed to stop signal. cause: %s", err) - } - + close(done) // ensure events channel is closed if _, ok := <-events; ok { t.Errorf("expected read-only events channel to be closed after signal stop") } } -*/ var notificationInfo = ` { - "EventType": "s3:ObjectCreated:Put", - "Key": "images/myphoto.jpg", - "Records": [ - { - "eventVersion": "2.0", - "eventSource": "minio:s3", - "awsRegion": "", - "eventTime": "2017-07-07T18:46:37Z", - "eventName": "s3:ObjectCreated:Put", - "userIdentity": { - "principalId": "minio" - }, - "requestParameters": { - "sourceIPAddress": "192.168.1.80:55328" - }, - "responseElements": { - "x-amz-request-id": "14CF20BD1EFD5B93", - "x-minio-origin-endpoint": "http://127.0.0.1:9000" - }, - "s3": { - "s3SchemaVersion": "1.0", - "configurationId": "Config", - "bucket": { - "name": "images", - "ownerIdentity": { - "principalId": "minio" - }, - "arn": "arn:aws:s3:::images" - }, - "object": { - "key": "myphoto.jpg", - "size": 248682, - "eTag": "f1671feacb8bbf7b0397c6e9364e8c92", - "contentType": "image/jpeg", - "userDefined": { - "content-type": "image/jpeg" - }, - "versionId": "1", - "sequencer": "14CF20BD1EFD5B93" - } - }, - "source": { - "host": "192.168.1.80", - "port": "55328", - "userAgent": "Minio (linux; amd64) minio-go/2.0.4 mc/DEVELOPMENT.GOGET" - } - } - ], - "level": "info", - "msg": "", - "time": "2017-07-07T11:46:37-07:00" -} -` - -var notificationInfoWithErr = ` -{ - "EventType": "s3:ObjectCreated:Put", - "Key": "images/myphoto.jpg", - "Records": [ - { - "eventVersion": "2.0", - "eventSource": "minio:s3", - "awsRegion": "", - "eventTime": "2017-07-07T18:46:37Z", - "eventName": "s3:ObjectCreated:Put", - "userIdentity": { - "principalId": "minio" - }, - "requestParameters": { - "sourceIPAddress": "192.168.1.80:55328" - }, - "responseElements": { - "x-amz-request-id": "14CF20BD1EFD5B93", - "x-minio-origin-endpoint": "http://127.0.0.1:9000" - }, - "s3": { - "s3SchemaVersion": "1.0", - "configurationId": "Config", - "bucket": { - "name": "images", - "ownerIdentity": { - "principalId": "minio" - }, - "arn": "arn:aws:s3:::images" - }, - "object": { - "key": "myphoto.jpg", - "size": 248682, - "eTag": "f1671feacb8bbf7b0397c6e9364e8c92", - "contentType": "image/jpeg", - "userDefined": { - "content-type": "image/jpeg" - }, - "versionId": "1", - "sequencer": "14CF20BD1EFD5B93" - } - }, - "source": { - "host": "192.168.1.80", - "port": "55328", - "userAgent": "Minio (linux; amd64) minio-go/2.0.4 mc/DEVELOPMENT.GOGET" - } - } - ], - "Err": "this is an error", - "level": "info", - "msg": "", - "time": "2017-07-07T11:46:37-07:00" + "EventType": "s3:ObjectCreated:Put", + "Key": "images/myphoto.jpg", + "Records": [ + { + "eventVersion": "2.0", + "eventSource": "minio:s3", + "awsRegion": "", + "eventTime": "2017-07-07T18:46:37Z", + "eventName": "s3:ObjectCreated:Put", + "userIdentity": { + "principalId": "minio" + }, + "requestParameters": { + "sourceIPAddress": "192.168.1.80:55328" + }, + "responseElements": { + "x-amz-request-id": "14CF20BD1EFD5B93", + "x-minio-origin-endpoint": "http://127.0.0.1:9000" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "Config", + "bucket": { + "name": "images", + "ownerIdentity": { + "principalId": "minio" + }, + "arn": "arn:aws:s3:::images" + }, + "object": { + "key": "myphoto.jpg", + "size": 248682, + "eTag": "f1671feacb8bbf7b0397c6e9364e8c92", + "contentType": "image/jpeg", + "userDefined": { + "content-type": "image/jpeg" + }, + "versionId": "1", + "sequencer": "14CF20BD1EFD5B93" + } + }, + "source": { + "host": "192.168.1.80", + "port": "55328", + "userAgent": "Minio (linux; amd64) minio-go/2.0.4 mc/DEVELOPMENT.GOGET" + } + } + ], + "level": "info", + "msg": "", + "time": "2017-07-07T11:46:37-07:00" } ` - -/* -func TestGetProtoTimestamp(t *testing.T) { - tStr := "1970-01-01T00:00:00.00Z" - //"2006-01-02T15:04:05:07Z" - timestamp1 := getProtoTimestamp(tStr) - - t1, err := ptypes.Timestamp(timestamp1) - if err != nil { - t.Error(err) - } - expected := time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC) - if !t1.Equal(expected) { - t.Errorf("times are not equal\nexpected: %s\nactual: %s", expected, t1) - } -} -*/