Skip to content

Commit

Permalink
integration tests: add fakepubsub service
Browse files Browse the repository at this point in the history
  • Loading branch information
Linus Arver committed Jun 10, 2022
1 parent 02b5d7f commit 19076f3
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 0 deletions.
1 change: 1 addition & 0 deletions .ko.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ baseImageOverrides:
# prow integration test
k8s.io/test-infra/prow/test/integration/cmd/fakeghserver: gcr.io/k8s-prow/alpine:v20200713-e9b3d9d
k8s.io/test-infra/prow/test/integration/cmd/fakegitserver: gcr.io/k8s-prow/git:v20220523-6026203ca9
k8s.io/test-infra/prow/test/integration/cmd/fakepubsub: google/cloud-sdk:389.0.0

# https://pkg.go.dev/cmd/link
# -s: omit symbol/debug info
Expand Down
165 changes: 165 additions & 0 deletions prow/test/integration/cmd/fakepubsub/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// fakepubsub wraps around the official gcloud Pub/Sub emulator.
package main

import (
"context"
"flag"
"fmt"
"os"
"os/exec"
"strings"
"syscall"

"github.com/sirupsen/logrus"

configflagutil "k8s.io/test-infra/prow/flagutil/config"
"k8s.io/test-infra/prow/interrupts"
"k8s.io/test-infra/prow/logrusutil"
"k8s.io/test-infra/prow/pjutil"
"k8s.io/test-infra/prow/test/integration/internal/fakepubsub"
)

type options struct {
// config is the Prow configuration. We need this to read in the
// 'pubsub_subscriptions' field set in the integration test's Prow
// configuration, because we have to initialize (create) these subscriptions
// before sub can start listening to them.
config configflagutil.ConfigOptions
emulatorHostPort string
}

func (o *options) validate() error {
return nil
}

func flagOptions() *options {
// When the KIND cluster starts, the Prow configs get loaded into a
// Kubernetes ConfigMap object. This object is then mounted as a volume into
// the fakepubsub container at the path "/etc/config/config.yaml".
o := &options{config: configflagutil.ConfigOptions{ConfigPath: "/etc/config/config.yaml"}}
fs := flag.NewFlagSet(os.Args[0], flag.ExitOnError)

fs.StringVar(&o.emulatorHostPort, "emulator-host-port", "0.0.0.0:8085", "Host and port of the running Pub/Sub emulator.")
o.config.AddFlags(fs)

fs.Parse(os.Args[1:])

return o
}

func main() {
logrusutil.ComponentInit()

o := flagOptions()
if err := o.validate(); err != nil {
logrus.WithError(err).Fatal("Invalid arguments.")
}

health := pjutil.NewHealth()
health.ServeReady()

defer interrupts.WaitForGracefulShutdown()

if err := startPubSubEmulator(o); err != nil {
logrus.WithError(err).Fatal("could not start Pub/Sub emulator")
}

if err := initEmulatorState(o); err != nil {
logrus.WithError(err).Fatal("Could not initialize emulator state")
}
}

// startPubSubEmulator starts the Pub/Sub Emulator. It's a Java server, so the
// host system needs the JRE installed as well as gcloud cli (this the
// recommended way to start the emulator).
func startPubSubEmulator(o *options) error {
logrus.Info("Starting Pub/Sub emulator...")

args := []string{"beta", "emulators", "pubsub", "start",
fmt.Sprintf("--host-port=%s", o.emulatorHostPort)}
cmd := exec.Command("gcloud", args...)

// Unfortunately the emulator does not really give useful messages about
// what type of gRPC request is being served. Still, this is better than
// nothing.
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
if err := cmd.Start(); err != nil {
return fmt.Errorf("Could not start process: %v", err)
}
logrus.Info("Started Pub/Sub emulator")

pgid, err := syscall.Getpgid(cmd.Process.Pid)
if err != nil {
return fmt.Errorf("Could not get pid: %v", err)
}

// Cleanup. Kill child processes (in our case, the emulator) if we detect
// that we're getting shut down. See
// https://stackoverflow.com/a/29552044/437583.
interrupts.Run(func(ctx context.Context) {
for {
if _, ok := <-ctx.Done(); ok {
syscall.Kill(-pgid, syscall.SIGTERM)
cmd.Wait()
logrus.Info("Pub/Sub emulator exited.")
return
}
}
})

return nil
}

// initEmulatorState creates Pub/Sub topics and subscriptions, because
// every time the emulator starts, it starts off from a clean slate (no topics
// or subscriptions).
func initEmulatorState(o *options) error {
configAgent, err := o.config.ConfigAgent()
if err != nil {
return fmt.Errorf("Error starting config agent: %v", err)
}

subs := configAgent.Config().PubSubSubscriptions

logrus.Info("Initializing Pub/Sub emulator state...")

ctx := context.Background()

for projectID, subscriptionIDs := range subs {
client, err := fakepubsub.NewClient(projectID, o.emulatorHostPort)
if err != nil {
return err
}
for _, subscriptionID := range subscriptionIDs {
// Extract the number part from the subscriptionID. The pattern we use
// for tests is "subscriptionN" where the trailing N is a number.
// Example: For "subscription1", we create "topic1".
numberPart := strings.TrimPrefix(subscriptionID, "subscription")
topicID := "topic" + numberPart
if err := client.CreateSubscription(ctx, projectID, topicID, subscriptionID); err != nil {
return err
}
}
}

return nil
}
60 changes: 60 additions & 0 deletions prow/test/integration/config/prow/cluster/fakepubsub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: default
name: fakepubsub
labels:
app: fakepubsub
spec:
selector:
matchLabels:
app: fakepubsub
replicas: 1
template:
metadata:
labels:
app: fakepubsub
spec:
containers:
- name: fakepubsub
image: localhost:5001/fakepubsub
args:
- --config-path=/etc/config/config.yaml
ports:
- containerPort: 8085
volumeMounts:
- name: config
mountPath: /etc/config
readOnly: true
volumes:
- name: config
configMap:
name: config
---
apiVersion: v1
kind: Service
metadata:
labels:
app: fakepubsub
namespace: default
name: fakepubsub
spec:
ports:
- name: main
# Allow the test code to bypass ingress-nginx, because we don't want to deal
# with TLS or mess with the ingress-nginx configuration. By using this
# nodePort (also configured in KIND's settings), the test code can just use
# localhost:30303 to talk to the Pub/Sub emulator running in port 8085 in
# the fakepubsub container.
nodePort: 30303
# This is the port for the *service* from within the cluster. It's used by
# sub (that is, sub talks to the fakepubsub service via port 80 --- i.e.,
# "fakepubusb.default:80").
port: 80
# 8085 is the default port used by the Pub/Sub emulator that ships with
# gcloud. It is the port exposed by the pod.
targetPort: 8085
protocol: TCP
selector:
app: fakepubsub
type: NodePort
114 changes: 114 additions & 0 deletions prow/test/integration/internal/fakepubsub/fakepubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fakepubsub

import (
"context"
"encoding/json"
"fmt"
"time"

"cloud.google.com/go/pubsub"
"github.com/sirupsen/logrus"
"google.golang.org/api/option"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"k8s.io/test-infra/prow/pubsub/subscriber"
)

type PubSubMessageForSub struct {
Attributes map[string]string
Data subscriber.ProwJobEvent
}

type Client struct {
projectID string
pubsubClient *pubsub.Client
}

func NewClient(projectID, pubsubEmulatorHost string) (*Client, error) {
client, err := newClientOnEmulator(projectID, pubsubEmulatorHost)
if err != nil {
return nil, fmt.Errorf("Unable to create pubsub client to project %q: %v", projectID, err)
}

return &Client{
projectID: projectID,
pubsubClient: client,
}, nil
}

// newClientOnEmulator returns a pubsub client that is hardcoded to always talk
// to the fakepubsub service running in the test KIND cluster via the
// pubsubEmulatorHost parameter. This is taken from
// https://github.com/googleapis/google-cloud-go/blob/e43c095c94e44a95c618861f9da8f2469b53be16/pubsub/pubsub.go#L126.
// This is better than getting the PUBSUB_EMULATOR_HOST environment variable
// because this makes the code thread-safe (we no longer rely on a global
// environment variable).
func newClientOnEmulator(projectID, pubsubEmulatorHost string) (*pubsub.Client, error) {
conn, err := grpc.Dial(pubsubEmulatorHost, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("grpc.Dial: %v", err)
}
o := []option.ClientOption{option.WithGRPCConn(conn)}
o = append(o, option.WithTelemetryDisabled())
return pubsub.NewClientWithConfig(context.Background(), projectID, nil, o...)
}

// PublishMessage creates a Pub/Sub message that sub understands (to create a
// ProwJob). The podName parameter is used by the integration tests;
// specifically, each test case invocation generates a UUID which is used as the
// name of the ProwJob CR. Then when the test pod is created, it is also named
// with the same UUID. This makes checking for the creation of jobs and pods
// very easy in the tests.
func (c *Client) PublishMessage(ctx context.Context, msg PubSubMessageForSub, topicID string) error {
bytes, err := json.Marshal(msg.Data)
if err != nil {
return fmt.Errorf("failed to marshal: %v", err)
}

t := c.pubsubClient.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{Data: bytes, Attributes: msg.Attributes})

id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("failed to publish: %v", err)
}

logrus.Infof("successfully published message %v; msg ID: %v", string(bytes), id)

return nil
}

// CreateSubscription creates a Pub/Sub topic and a corresponding subscription.
func (c *Client) CreateSubscription(ctx context.Context, projectID, topicID, subscriptionID string) error {
topic, err := c.pubsubClient.CreateTopic(ctx, topicID)
if err != nil {
return err
}

if _, err := c.pubsubClient.CreateSubscription(ctx, subscriptionID, pubsub.SubscriptionConfig{
Topic: topic,
AckDeadline: 10 * time.Second,
ExpirationPolicy: 25 * time.Hour,
}); err != nil {
return err
}

return nil
}
3 changes: 3 additions & 0 deletions prow/test/integration/lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ declare -ra PROW_COMPONENTS=(
fakegerritserver
fakegitserver
fakeghserver
fakepubsub
gerrit
hook
horologium
Expand Down Expand Up @@ -67,6 +68,7 @@ declare -rA PROW_IMAGES=(
[fakegerritserver]=prow/test/integration/cmd/fakegerritserver
[fakegitserver]=prow/test/integration/cmd/fakegitserver
[fakeghserver]=prow/test/integration/cmd/fakeghserver
[fakepubsub]=prow/test/integration/cmd/fakepubsub
# Utility images. These images are not Prow components per se, and so do not
# have corresponding Kubernetes configurations.
[clonerefs]=prow/cmd/clonerefs
Expand All @@ -91,6 +93,7 @@ declare -rA PROW_IMAGES_TO_COMPONENTS=(
[fakegerritserver]=fakegerritserver
[fakegitserver]=fakegitserver
[fakeghserver]=fakeghserver
[fakepubsub]=fakepubsub
)

function do_kubectl() {
Expand Down
3 changes: 3 additions & 0 deletions prow/test/integration/setup-kind-cluster.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ nodes:
- containerPort: 443
hostPort: 443
protocol: TCP
- containerPort: 30303
hostPort: 30303
protocol: TCP
EOF

}
Expand Down

0 comments on commit 19076f3

Please sign in to comment.