Skip to content

Commit

Permalink
Merge pull request #28 from softonic/fix/middleware-message-handling
Browse files Browse the repository at this point in the history
fix: middleware message handling
  • Loading branch information
mindhells authored Feb 6, 2025
2 parents 646dbc5 + 0f9634c commit 740e489
Show file tree
Hide file tree
Showing 8 changed files with 108 additions and 55 deletions.
8 changes: 7 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ FROM golang:1.23-bookworm AS build
RUN apt-get update \
&& apt-get install -y protobuf-compiler

RUN groupadd --gid 1000 app && useradd -u 1000 -g app app

WORKDIR /go/src/github.com/softonic/homing-pigeon

COPY . .
Expand All @@ -12,6 +14,10 @@ RUN make build &&\

FROM scratch

COPY --from=build /go/src/github.com/softonic/homing-pigeon/bin/homing-pigeon /
COPY --from=build /etc/passwd /etc/passwd
COPY --from=build /etc/group /etc/group
COPY --chown=app:app --from=build /go/src/github.com/softonic/homing-pigeon/bin/homing-pigeon /

USER app

ENTRYPOINT ["/homing-pigeon", "-logtostderr"]
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,15 @@ build: dep generate-proto
stress-build: dep generate-proto
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -ldflags="-w -s" -o bin/stress-pigeon pkg/stress/main.go
docker-build:
docker build -t softonic/homing-pigeon:${TAG} .
docker buildx create --name hp-image-builder --driver docker-container --bootstrap 2> /dev/null || true
docker buildx use hp-image-builder
docker buildx build \
--pull \
--push \
-f ./Dockerfile \
. \
--platform linux/arm64,linux/amd64 \
-t softonic/homing-pigeon:${TAG}
mock:
mockery --name=WriteAdapter -r
mockery --name=Channel -r
Expand Down
27 changes: 16 additions & 11 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,35 @@ services:
ACK_BUFFER_LENGTH: "500"
GRPC_GO_LOG_VERBOSITY_LEVEL: 99
GRPC_GO_LOG_SEVERITY_LEVEL: info
REQUEST_MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/hprq"
RESPONSE_MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/hprp"
REQUEST_MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/req1"
RESPONSE_MIDDLEWARES_SOCKET: "passthrough:///unix:///tmp/res1"
READ_ADAPTER: "AMQP"
WRITE_ADAPTER: "ELASTIC"
command: ["-stderrthreshold=INFO"]
depends_on:
rabbit-mq:
condition: service_healthy
request-middleware-pass:
platform: linux/amd64
volumes:
- shared_socket:/tmp
image: softonic/hp-throttling:0.1.0
image: softonic/hp-pass-middleware:dev
environment:
IN_SOCKET: "/tmp/hprq"
THROTTLE_LIMIT: "10"
THROTTLE_BURST: "10"
IN_SOCKET: "/tmp/req1"
command: ["-stderrthreshold=INFO"]
reponse-middleware-pass:
platform: linux/amd64
response-middleware-pass:
volumes:
- shared_socket:/tmp
image: softonic/hp-pass-middleware:dev
environment:
IN_SOCKET: "/tmp/res1"
OUT_SOCKET: "passthrough:///unix:///tmp/res2"
command: ["-stderrthreshold=INFO"]
response-middleware-pass-2:
volumes:
- shared_socket:/tmp
image: softonic/hp-pass-middleware:0.1.0
image: softonic/hp-pass-middleware:dev
environment:
IN_SOCKET: "/tmp/hprp"
IN_SOCKET: "/tmp/res2"
command: ["-stderrthreshold=INFO"]
rabbit-mq:
image: rabbitmq:3.8-management
Expand Down
7 changes: 3 additions & 4 deletions pkg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import (
"os"
"strconv"

// Third-party packages, with a blank line separator
"k8s.io/klog"
_ "go.uber.org/automaxprocs"
// Third-party packages, with a blank line separator
_ "go.uber.org/automaxprocs"
"k8s.io/klog"

"github.com/softonic/homing-pigeon/pkg/helpers"
"github.com/softonic/homing-pigeon/pkg/messages"
"github.com/softonic/homing-pigeon/pkg/middleware"
"github.com/softonic/homing-pigeon/pkg/readers"
"github.com/softonic/homing-pigeon/pkg/writers"

)

func main() {
Expand Down
15 changes: 15 additions & 0 deletions pkg/middleware/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,18 @@ type Middleware interface {
proto.MiddlewareServer
Next(req *proto.Data) (*proto.Data, error)
}

var (
// see https://github.com/grpc/grpc/blob/master/doc/service_config.md to know more about service config
defaultRetryPolicy = `{
"methodConfig": [{
"name": [{"service": "proto.Middleware"}],
"retryPolicy": {
"MaxAttempts": 5,
"InitialBackoff": "1s",
"MaxBackoff": "9s",
"BackoffMultiplier": 3.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]}`
)
18 changes: 12 additions & 6 deletions pkg/middleware/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ func (m *MiddlwareManager) Start() {
klog.V(1).Infof("Middlewares available")

var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
opts = append(opts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(defaultRetryPolicy))

conn, err := grpc.NewClient(m.MiddlewareAddress, opts...)
if err != nil {
Expand All @@ -46,18 +48,22 @@ func (m *MiddlwareManager) Start() {
klog.V(5).Infof("Sending message to proto")
start := time.Now()

data, err := client.Handle(context.Background(), &proto.Data{
// wait for ready up to 12 seconds (including retries)
// to handle service discontinuity (external middlewares) or startup order
ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 31*time.Second)
handleData, err := client.Handle(ctxTimeout, &proto.Data{
Body: message.Body,
})
}, grpc.WaitForReady(true))
cancelTimeout()
if err != nil {
klog.Errorf("What happens!? %v", err)
klog.Errorf("Error calling middleware %v", err)
} else {
message.Body = handleData.GetBody()
}

elapsed := time.Since(start)
klog.V(5).Infof("Middlewares took %s", elapsed)

message.Body = data.GetBody()

m.OutputChannel <- message
}
}
Expand Down
67 changes: 39 additions & 28 deletions pkg/middleware/unimplemented.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"net"
"os"
"path/filepath"
"time"

. "github.com/softonic/homing-pigeon/pkg/helpers"
"github.com/softonic/homing-pigeon/proto"
Expand All @@ -20,14 +21,20 @@ type UnimplementedMiddleware struct {
}

func (b *UnimplementedMiddleware) Next(req *proto.Data) (*proto.Data, error) {
resp := req
if b.client != nil {
var err error
_, err = (*b.client).Handle(context.Background(), req)
return nil, err
klog.V(0).Info("processing next middleware")
ctxTimeout, cancelTimeout := context.WithTimeout(context.Background(), 31*time.Second)
nextResp, err := (*b.client).Handle(ctxTimeout, req, grpc.WaitForReady(true))
cancelTimeout()
if err != nil {
klog.Errorf("Next middleware error %v", err)
} else {
klog.V(0).Info("next middleware processed")
return nextResp, nil
}
}

return resp, nil
// if there is no next middleware or it fails, return the same request
return req, nil
}

func (b *UnimplementedMiddleware) Listen(middleware proto.MiddlewareServer) {
Expand All @@ -53,35 +60,39 @@ func (b *UnimplementedMiddleware) Listen(middleware proto.MiddlewareServer) {

func (b *UnimplementedMiddleware) getInputSocket() string {
socket := GetEnv("IN_SOCKET", "")

err := os.RemoveAll(socket)
if err != nil {
klog.Errorf("Failed to remove socket: %v", err)
}

err = os.MkdirAll(filepath.Dir(socket), 0775)
if err != nil {
klog.Errorf("Error creating socket directory: %v", err)
if _, err := os.Stat(socket); err == nil {
klog.Infof("Removing existing socket %s", socket)
err := os.Remove(socket)
if err != nil {
klog.Errorf("Failed to remove socket: %v", err)
}
} else {
err = os.MkdirAll(filepath.Dir(socket), 0775)
if err != nil {
klog.Errorf("Error creating socket directory: %v", err)
}
}
return socket
}

func (b *UnimplementedMiddleware) getOutputGrpc() (*grpc.ClientConn, *proto.MiddlewareClient) {
nextSocketAddr := GetEnv("OUT_SOCKET", "")
if nextSocketAddr != "" {
var opts []grpc.DialOption
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

conn, err := grpc.NewClient(nextSocketAddr, opts...)
if err != nil {
klog.Errorf("fail to dial: %v", err)
}

klog.V(0).Info("Connected to the next middleware")
if nextSocketAddr == "" {
return nil, nil
}
var opts []grpc.DialOption
opts = append(opts,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultServiceConfig(defaultRetryPolicy))

client := proto.NewMiddlewareClient(conn)
return conn, &client
conn, err := grpc.NewClient(nextSocketAddr, opts...)
if err != nil {
klog.Errorf("failed to create OUT_SOCKET client: %v", err)
return nil, nil
}

return nil, nil
klog.V(0).Info("Connected to the next middleware")

client := proto.NewMiddlewareClient(conn)
return conn, &client
}
11 changes: 7 additions & 4 deletions send.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import string
import random
import pika
import sys
import time
Expand All @@ -8,8 +10,8 @@
RABBITMQ_PASSWORD = 'guest'
EXCHANGE_NAME = 'homing-pigeon'
ROUTING_KEY = '' # Leave empty or set appropriately
MESSAGE_COUNT = 100000 # Number of messages to send
MESSAGE_BODY = '{"meta": { "index" : { "_index" : "test", "_id" : "1" } },"data": { "field1" : "value1" }}'
MESSAGE_COUNT = 10 # Number of messages to send
MESSAGE_BODY = '{"meta": { "index" : { "_index" : "test", "_id" :"$$ID_PLACE_HOLDER" } },"data": { "field1" : "value1" }}'

def send_messages():
try:
Expand All @@ -24,13 +26,14 @@ def send_messages():

for i in range(1, MESSAGE_COUNT + 1):
# Publish message
messageId = ''.join(random.choices(string.ascii_letters + string.digits, k=30))
channel.basic_publish(
exchange=EXCHANGE_NAME,
routing_key=ROUTING_KEY,
body=MESSAGE_BODY,
body=MESSAGE_BODY.replace("$$ID_PLACE_HOLDER", messageId),
properties=pika.BasicProperties(delivery_mode=2), # Make messages persistent
)

# Print progress every 10,000 messages
if i % 10000 == 0:
print(f"Sent {i} messages...")
Expand Down

0 comments on commit 740e489

Please sign in to comment.