Skip to content

Commit

Permalink
Add authzHandler to sdk-validator's grpc server (#797)
Browse files Browse the repository at this point in the history
### Description of change
Add authzHandler to sdk-validator's grpc server

- Add CommonHandler
- Refactor FlowControlHandler with CommonHandler

##### Checklist

- [ ] Tested in playground or other setup
- [ ] Documentation is changed or added
- [ ] Tests and/or benchmarks are included
- [ ] Breaking changes

<!-- Reviewable:start -->
- - -
This change is [<img src="https://reviewable.io/review_button.svg"
height="34" align="absmiddle"
alt="Reviewable"/>](https://reviewable.io/reviews/fluxninja/aperture/797)
<!-- Reviewable:end -->
  • Loading branch information
hasit authored Nov 7, 2022
1 parent 993ccc8 commit 2bbfe8d
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 58 deletions.
64 changes: 49 additions & 15 deletions cmd/sdk-validator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,23 @@ import (
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3"
tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"

flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/check/v1"
"github.com/fluxninja/aperture/cmd/sdk-validator/validator"
"github.com/fluxninja/aperture/pkg/log"
"github.com/fluxninja/aperture/pkg/policies/flowcontrol/resources/classifier"
"github.com/fluxninja/aperture/pkg/policies/flowcontrol/service/envoy"
"github.com/fluxninja/aperture/pkg/status"
)

var (
logger *log.Logger
failed bool
logger *log.Logger
spanFailed bool
authzFailed bool
)

func init() {
Expand Down Expand Up @@ -60,6 +65,8 @@ func main() {
log.Info().Str("image", *sdkDockerImage).Str("id", id).Msg("Container started")
}

sdkURL := fmt.Sprintf("http://localhost:%s", *sdkPort)

// create listener for grpc server
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%s", *port))
if err != nil {
Expand All @@ -70,13 +77,21 @@ func main() {
grpcServer := grpc.NewServer(grpc.UnaryInterceptor(serverInterceptor))
reflection.Register(grpcServer)

// instantiate and register flowcontrol handler
flowcontrolHandler := &validator.FlowControlHandler{
commonHandler := &validator.CommonHandler{
Rejects: *rejects,
Rejected: 0,
}

// instantiate and register flowcontrol handler
flowcontrolHandler := &validator.FlowControlHandler{
CommonHandler: commonHandler,
}
flowcontrolv1.RegisterFlowControlServiceServer(grpcServer, flowcontrolHandler)

reg := status.NewRegistry(log.GetGlobalLogger())
authzHandler := envoy.NewHandler(classifier.NewClassificationEngine(reg), nil, commonHandler)
authv3.RegisterAuthorizationServer(grpcServer, authzHandler)

// initiate and register otel trace handler
traceHandler := &validator.TraceHandler{}
tracev1.RegisterTraceServiceServer(grpcServer, traceHandler)
Expand All @@ -90,31 +105,45 @@ func main() {
go func() {
s := <-sigCh
log.Info().Interface("signal", s).Msg("Got signal, attempting graceful shutdown")
grpcServer.GracefulStop()

log.Info().Msg("Validating fail-open behavior")
rejected := startTraffic(sdkURL, *requests)
l := log.With().Int("total requests", *requests).Int64("expected rejections", 0).Int("got rejections", rejected).Logger()
if rejected != 0 {
l.Error().Msg("Fail-open validation failed")
} else {
l.Info().Msg("Fail-open validation successful")
}

if *sdkDockerImage != "" {
log.Info().Interface("id", id).Msg("Stopping Docker container")
err = stopDockerContainer(id)
if err != nil {
log.Fatal().Err(err).Msg("Failed to stop Docker container")
}
}
grpcServer.GracefulStop()
wg.Done()
}()

if *sdkDockerImage != "" {
wg.Add(1)
go func() {
rejected := confirmConnectedAndStartTraffic(*sdkPort, *requests)
rejected := confirmConnectedAndStartTraffic(sdkURL, *requests)
l := log.With().Int("total requests", *requests).Int64("expected rejections", *rejects).Int("got rejections", rejected).Logger()
if rejected != int(*rejects) {
l.Error().Msg("FlowControl validation failed")
validation = 1
}

if failed {
if spanFailed {
l.Error().Msg("Span attributes validation failed")
validation = 1
}
if authzFailed {
l.Error().Msg("Authz validation failed")
validation = 1
}

if validation == 0 {
l.Info().Msg("Validation successful")
Expand Down Expand Up @@ -143,9 +172,11 @@ func serverInterceptor(ctx context.Context, req interface{}, info *grpc.UnarySer
log.Info().Str("method", info.FullMethod).Dur("latency", time.Since(start)).Msg("Request served")
if err != nil {
log.Error().Err(err).Msg("Handler returned error")
}
if err != nil {
failed = true
if info.FullMethod == "/opentelemetry.proto.collector.trace.v1.TraceService/Export" {
spanFailed = true
} else if info.FullMethod == "/envoy.service.auth.v3.Authorization/Check" {
authzFailed = true
}
}
return h, err
}
Expand Down Expand Up @@ -218,10 +249,7 @@ func stopDockerContainer(id string) error {
return nil
}

func confirmConnectedAndStartTraffic(port string, requests int) int {
rejected := 0
url := fmt.Sprintf("http://localhost:%s", port)

func confirmConnectedAndStartTraffic(url string, requests int) int {
for {
req, err := http.NewRequest(http.MethodGet, url+"/connected", nil)
if err != nil {
Expand All @@ -238,6 +266,12 @@ func confirmConnectedAndStartTraffic(port string, requests int) int {
}
log.Info().Msg("SDK example successfully connected to validator")

rejected := startTraffic(url, requests)
return rejected
}

func startTraffic(url string, requests int) int {
rejected := 0
superReq, err := http.NewRequest(http.MethodGet, url+"/super", nil)
if err != nil {
log.Error().Err(err).Str("url", superReq.URL.String()).Msg("Failed to create http request")
Expand All @@ -248,7 +282,7 @@ func confirmConnectedAndStartTraffic(port string, requests int) int {
log.Error().Err(err).Str("url", superReq.URL.String()).Msg("Failed to make http request")
}
res.Body.Close()
if res.StatusCode != http.StatusAccepted {
if (res.StatusCode > 400 && res.StatusCode < 500) || (res.StatusCode > 500 && res.StatusCode < 600) {
rejected += 1
}
}
Expand Down
41 changes: 41 additions & 0 deletions cmd/sdk-validator/validator/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package validator

import (
"context"
"sync/atomic"

"golang.org/x/exp/maps"

flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/check/v1"
"github.com/fluxninja/aperture/pkg/log"
"github.com/fluxninja/aperture/pkg/policies/flowcontrol/selectors"
"github.com/fluxninja/aperture/pkg/policies/flowcontrol/service/check"
)

// CommonHandler implements common.HandlerWithValues.
type CommonHandler struct {
check.HandlerWithValues

Rejects int64
Rejected int64
}

// CheckWithValues is a dummy function for creating *flowcontrolv1.CheckResponse from given parameters.
func (c *CommonHandler) CheckWithValues(ctx context.Context, services []string, controlPoint selectors.ControlPoint, labels map[string]string) *flowcontrolv1.CheckResponse {
resp := &flowcontrolv1.CheckResponse{
DecisionType: flowcontrolv1.CheckResponse_DECISION_TYPE_ACCEPTED,
FlowLabelKeys: maps.Keys(labels),
Services: services,
ControlPointInfo: controlPoint.ToControlPointInfoProto(),
RejectReason: flowcontrolv1.CheckResponse_REJECT_REASON_NONE,
}

if c.Rejected != c.Rejects {
log.Trace().Msg("Rejecting call")
resp.DecisionType = flowcontrolv1.CheckResponse_DECISION_TYPE_REJECTED
resp.RejectReason = flowcontrolv1.CheckResponse_REJECT_REASON_RATE_LIMITED
atomic.AddInt64(&c.Rejected, 1)
}

return resp
}
35 changes: 5 additions & 30 deletions cmd/sdk-validator/validator/flowcontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@ package validator

import (
"context"
"sync/atomic"
"time"

"golang.org/x/exp/maps"
"google.golang.org/grpc/peer"
"google.golang.org/protobuf/types/known/timestamppb"

flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/check/v1"
"github.com/fluxninja/aperture/pkg/log"
"github.com/fluxninja/aperture/pkg/policies/flowcontrol/selectors"
"github.com/fluxninja/aperture/pkg/policies/flowcontrol/service/check"
)

// FlowControlHandler implements FlowControlService.
type FlowControlHandler struct {
flowcontrolv1.UnimplementedFlowControlServiceServer

Rejects int64
Rejected int64
CommonHandler check.HandlerWithValues
}

// Check is a dummy Check handler.
func (f *FlowControlHandler) Check(ctx context.Context, req *flowcontrolv1.CheckRequest) (*flowcontrolv1.CheckResponse, error) {
log.Trace().Msg("Received Check request")
log.Trace().Msg("Received FlowControl Check request")

services := []string{}
rpcPeer, peerExists := peer.FromContext(ctx)
Expand All @@ -32,35 +31,11 @@ func (f *FlowControlHandler) Check(ctx context.Context, req *flowcontrolv1.Check
}

start := time.Now()
resp := f.check(ctx, req.Feature, req.Labels, services)
resp := f.CommonHandler.CheckWithValues(ctx, services, selectors.NewControlPoint(flowcontrolv1.ControlPointInfo_TYPE_FEATURE, req.Feature), req.Labels)
end := time.Now()

resp.Start = timestamppb.New(start)
resp.End = timestamppb.New(end)

return resp, nil
}

func (f *FlowControlHandler) check(ctx context.Context, feature string, labels map[string]string, services []string) *flowcontrolv1.CheckResponse {
resp := &flowcontrolv1.CheckResponse{
DecisionType: flowcontrolv1.CheckResponse_DECISION_TYPE_ACCEPTED,
FlowLabelKeys: maps.Keys(labels),
Services: services,
ControlPointInfo: &flowcontrolv1.ControlPointInfo{
Feature: feature,
Type: flowcontrolv1.ControlPointInfo_TYPE_FEATURE,
},
RejectReason: flowcontrolv1.CheckResponse_REJECT_REASON_NONE,
}

// randomly reject requests based on rejectRatio
// nolint:gosec
if f.Rejected != f.Rejects {
log.Trace().Msg("Rejecting call")
resp.DecisionType = flowcontrolv1.CheckResponse_DECISION_TYPE_REJECTED
resp.RejectReason = flowcontrolv1.CheckResponse_REJECT_REASON_RATE_LIMITED
atomic.AddInt64(&f.Rejected, 1)
}

return resp
}
24 changes: 20 additions & 4 deletions cmd/sdk-validator/validator/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package validator

import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"

tracev1 "go.opentelemetry.io/proto/otlp/collector/trace/v1"
"go.uber.org/multierr"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

flowcontrolv1 "github.com/fluxninja/aperture/api/gen/proto/go/aperture/flowcontrol/check/v1"
"github.com/fluxninja/aperture/pkg/log"
Expand Down Expand Up @@ -35,9 +38,22 @@ func (t TraceHandler) Export(ctx context.Context, req *tracev1.ExportTraceServic
log.Trace().Str("attribute", otelcollector.ApertureCheckResponseLabel).Msg("Validating attribute")
v := attribute.Value.GetStringValue()
checkResponse := &flowcontrolv1.CheckResponse{}
perr := protojson.Unmarshal([]byte(v), checkResponse)
var wireMsg []byte
if !strings.HasPrefix(v, "{") {
wireMsg, err = base64.StdEncoding.DecodeString(v)
if err != nil {
log.Error().Err(err).Msg("Failed to unmarshal as base64")
}
perr := proto.Unmarshal(wireMsg, checkResponse)
if err != nil {
log.Error().Err(err).Msg("Failed to unmarshal as protobuf")
err = multierr.Append(err, fmt.Errorf("invalid %s: %w", otelcollector.ApertureCheckResponseLabel, perr))
}
continue
}
perr := json.Unmarshal([]byte(v), checkResponse)
if perr != nil {
log.Error().Err(perr).Msg("Failed to validate flowcontrol CheckResponse")
log.Error().Err(err).Msg("Failed to unmarshal as json")
err = multierr.Append(err, fmt.Errorf("invalid %s: %w", otelcollector.ApertureCheckResponseLabel, perr))
}
case otelcollector.ApertureSourceLabel:
Expand All @@ -50,7 +66,7 @@ func (t TraceHandler) Export(ctx context.Context, req *tracev1.ExportTraceServic
case otelcollector.ApertureFeatureStatusLabel:
log.Trace().Str("attribute", otelcollector.ApertureFeatureStatusLabel).Msg("Validating attribute")
v := attribute.Value.GetStringValue()
if v != otelcollector.ApertureFeatureStatusOK && v != otelcollector.ApertureFeatureStatusError {
if v != otelcollector.ApertureFeatureStatusOK && v != otelcollector.ApertureFeatureStatusError && v != "Unset" {
log.Error().Msg("Failed to validate feature status")
err = multierr.Append(err, fmt.Errorf("invalid %s", otelcollector.ApertureFeatureStatusLabel))
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/aperture-java/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ plugins {
}

application {
mainClass.set("com.fluxninja.aperture.example.App")
mainClass.set("com.fluxninja.aperture.example.ArmeriaServer")
}

apply(from = "version.gradle.kts")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ public static void main(String[] args) {
return;
}

WebClient client = Clients.builder("http://localhost:10101")
WebClient client = Clients.builder("http://localhost:8080")
.decorator(ApertureHTTPClient.newDecorator(apertureSDK))
.build(WebClient.class);

HttpResponse res = client.get("http/base");
HttpResponse res = client.get("notsuper");
System.out.println(res);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,23 @@ protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
}
};
}
public static HttpService createHealthService() {
return new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
return HttpResponse.of("Healthy");
}
};
}
public static HttpService createConnectedHTTPService() {
return new AbstractHttpService() {
@Override
protected HttpResponse doGet(ServiceRequestContext ctx, HttpRequest req) {
return HttpResponse.of("");
}
};
}

public static void main(String[] args) {
final String agentHost = "localhost";
final int agentPort = 8089;
Expand All @@ -35,17 +52,17 @@ public static void main(String[] args) {
return;
}
ServerBuilder serverBuilder = Server.builder();
serverBuilder.http(10101);
serverBuilder.service("/http/base", createHelloHTTPService());
serverBuilder.http(8080);
serverBuilder.service("/notsuper", createHelloHTTPService());
serverBuilder.service("/health", createHealthService());
serverBuilder.service("/connected", createConnectedHTTPService());

ApertureHTTPService decoratedService = createHelloHTTPService()
.decorate(ApertureHTTPService.newDecorator(apertureSDK));
serverBuilder.service("/http/decorated", decoratedService);
serverBuilder.service("/super", decoratedService);

Server server = serverBuilder.build();
CompletableFuture<Void> future = server.start();
future.join();
}


}

0 comments on commit 2bbfe8d

Please sign in to comment.