Skip to content

Commit

Permalink
fix: gateway client http requests (#684)
Browse files Browse the repository at this point in the history
  • Loading branch information
VaibhavPage authored Jun 7, 2020
1 parent e4456e1 commit b0537d6
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 91 deletions.
4 changes: 2 additions & 2 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ const (
EnvVarGatewayServerPort = "GATEWAY_SERVER_PORT"
// ProcessorPort is the default port for the gateway event processor server to run on.
GatewayProcessorPort = "9300"
//LabelGatewayName is the label for gateway name
LabelGatewayName = "gateway-name"
)

const (
Expand All @@ -108,8 +110,6 @@ const (
LabelOwnerName = "owner-name"
// LabelObjectName is the label for object name
LabelObjectName = "object-name"
//LabelGatewayName is the label for gateway name
LabelGatewayName = "gateway-name"
)

// various supported media types
Expand Down
4 changes: 3 additions & 1 deletion controllers/sensor/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ func (ctx *sensorContext) generateServiceSpec() *corev1.Service {
},
Type: corev1.ServiceTypeClusterIP,
Selector: map[string]string{
common.LabelOwnerName: ctx.sensor.Name,
common.LabelSensorName: ctx.sensor.Name,
common.LabelOwnerName: ctx.sensor.Name,
},
},
}
Expand Down Expand Up @@ -88,6 +89,7 @@ func (ctx *sensorContext) makeDeploymentSpec() (*appv1.DeploymentSpec, error) {

replicas := int32(1)
labels := map[string]string{
common.LabelSensorName: ctx.sensor.Name,
common.LabelObjectName: ctx.sensor.Name,
}
sensorContainer := corev1.Container{
Expand Down
77 changes: 33 additions & 44 deletions gateways/client/cloud-events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ limitations under the License.
package main

import (
"context"
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/gateways"
cloudevents "github.com/cloudevents/sdk-go"
cloudeventsnats "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/nats"
"github.com/google/uuid"
"github.com/nats-io/go-nats"
"github.com/sirupsen/logrus"
)

// updateSubscriberClients updates the active clients for event subscribers
Expand All @@ -34,51 +37,21 @@ func (gatewayContext *GatewayContext) updateSubscriberClients() {
return
}

if gatewayContext.httpSubscribers == nil {
gatewayContext.httpSubscribers = make(map[string]cloudevents.Client)
}
if gatewayContext.natsSubscribers == nil {
gatewayContext.natsSubscribers = make(map[string]cloudevents.Client)
}

// http subscribers
for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.HTTP {
if _, ok := gatewayContext.httpSubscribers[subscriber]; !ok {
t, err := cloudevents.NewHTTPTransport(
cloudevents.WithTarget(subscriber),
cloudevents.WithEncoding(cloudevents.HTTPBinaryV03),
)
if err != nil {
gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a transport")
continue
}

client, err := cloudevents.NewClient(t)
if err != nil {
gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a client")
continue
}
gatewayContext.logger.WithField("subscriber", subscriber).Infoln("added a client for the subscriber")
gatewayContext.httpSubscribers[subscriber] = client
}
gatewayContext.natsSubscribers = make(map[string]*nats.Conn)
}

// nats subscribers
for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.NATS {
if _, ok := gatewayContext.natsSubscribers[subscriber.Name]; !ok {
t, err := cloudeventsnats.New(subscriber.ServerURL, subscriber.Subject)
conn, err := nats.Connect(subscriber.ServerURL)
if err != nil {
gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a transport")
gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to connect to subscriber")
continue
}

client, err := cloudevents.NewClient(t)
if err != nil {
gatewayContext.logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to create a client")
continue
}
gatewayContext.logger.WithField("subscriber", subscriber).Infoln("added a client for the subscriber")
gatewayContext.natsSubscribers[subscriber.Name] = client
gatewayContext.natsSubscribers[subscriber.Name] = conn
}
}
}
Expand All @@ -100,37 +73,53 @@ func (gatewayContext *GatewayContext) dispatchEvent(gatewayEvent *gateways.Event

completeSuccess := true

eventBody, err := json.Marshal(cloudEvent)
if err != nil {
logger.WithError(err).Errorln("failed to marshal the event")
return err
}

// http subscribers
for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.HTTP {
client, ok := gatewayContext.httpSubscribers[subscriber]
if !ok {
gatewayContext.logger.WithField("subscriber", subscriber).Warnln("unable to send event. no client found for the subscriber")
request, err := http.NewRequest(http.MethodPost, subscriber, bytes.NewReader(eventBody))
if err != nil {
logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to construct http request for the event")
completeSuccess = false
continue
}

_, _, err := client.Send(context.Background(), *cloudEvent)
response, err := gatewayContext.httpClient.Do(request)
if err != nil {
logger.WithError(err).WithField("target", subscriber).Warnln("failed to send the event")
logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to send http request for the event")
completeSuccess = false
continue
}

logger.WithFields(logrus.Fields{
"status": response.Status,
"subscriber": subscriber,
}).Infoln("successfully sent event to the subscriber")
}

// NATS subscribers
for _, subscriber := range gatewayContext.gateway.Spec.Subscribers.NATS {
client, ok := gatewayContext.natsSubscribers[subscriber.Name]
conn, ok := gatewayContext.natsSubscribers[subscriber.Name]
if !ok {
gatewayContext.logger.WithField("subscriber", subscriber).Warnln("unable to send event. no client found for the subscriber")
completeSuccess = false
continue
}

if _, _, err := client.Send(context.Background(), *cloudEvent); err != nil {
logger.WithError(err).WithField("target", subscriber).Warnln("failed to send the event")
if err := conn.Publish(subscriber.Subject, eventBody); err != nil {
logger.WithError(err).WithField("subscriber", subscriber).Warnln("failed to publish the event")
completeSuccess = false
continue
}

logger.WithFields(logrus.Fields{
"subscriber": subscriber.Name,
"subject": subscriber.Subject,
}).Infoln("successfully published event on the subject")
}

response := "dispatched event"
Expand Down
13 changes: 7 additions & 6 deletions gateways/client/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ package main

import (
"context"
"net/http"
"os"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/gateways"
"github.com/argoproj/argo-events/pkg/apis/gateway/v1alpha1"
eventsourceClientset "github.com/argoproj/argo-events/pkg/client/eventsources/clientset/versioned"
gwclientset "github.com/argoproj/argo-events/pkg/client/gateway/clientset/versioned"
cloudevents "github.com/cloudevents/sdk-go"
"github.com/nats-io/go-nats"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -60,10 +61,10 @@ type GatewayContext struct {
controllerInstanceID string
// statusCh is used to communicate the status of an event source
statusCh chan notification
// httpSubscribers holds the active clients for HTTP subscribers
httpSubscribers map[string]cloudevents.Client
// http client to send cloud events to subscribers
httpClient *http.Client
// natsSubscribers holds the active clients for NATS subscribers
natsSubscribers map[string]cloudevents.Client
natsSubscribers map[string]*nats.Conn
}

// EventSourceContext contains information of a event source for gateway to run.
Expand Down Expand Up @@ -130,8 +131,8 @@ func NewGatewayContext() *GatewayContext {
controllerInstanceID: controllerInstanceID,
serverPort: serverPort,
statusCh: make(chan notification),
httpSubscribers: make(map[string]cloudevents.Client),
natsSubscribers: make(map[string]cloudevents.Client),
httpClient: &http.Client{},
natsSubscribers: make(map[string]*nats.Conn),
}

return gatewayConfig
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ require (
github.com/mitchellh/reflectwalk v1.0.1 // indirect
github.com/nats-io/gnatsd v1.4.1 // indirect
github.com/nats-io/go-nats v1.7.2
github.com/nats-io/nats.go v1.9.2 // indirect
github.com/nats-io/nats.go v1.10.0 // indirect
github.com/nicksnyder/go-i18n v1.10.1-0.20190510212457-b280125b035a // indirect
github.com/nlopes/slack v0.6.1-0.20200219171353-c05e07b0a5de
github.com/nsqio/go-nsq v1.0.8
Expand Down
Loading

0 comments on commit b0537d6

Please sign in to comment.