Skip to content

Commit

Permalink
Fix #1548: initial work on sinkbinding
Browse files Browse the repository at this point in the history
  • Loading branch information
nicolaferraro committed Aug 7, 2020
1 parent fa33423 commit d07ea17
Show file tree
Hide file tree
Showing 15 changed files with 235 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
apiVersion: camel.apache.org/v1
kind: CamelCatalog
metadata:
name: camel-catalog-1.4.1-main
name: camel-catalog-1.5.0-snapshot-main
labels:
app: camel-k
camel.apache.org/catalog.version: 3.4.0
camel.apache.org/catalog.loader.version: 3.4.0
camel.apache.org/runtime.version: 1.4.1
camel.apache.org/runtime.version: 1.5.0-SNAPSHOT
camel.apache.org/runtime.provider: main
spec:
runtime:
version: 1.4.1
version: 1.5.0-SNAPSHOT
provider: main
applicationClass: org.apache.camel.k.main.Application
metadata:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@
apiVersion: camel.apache.org/v1
kind: CamelCatalog
metadata:
name: camel-catalog-1.4.1-quarkus
name: camel-catalog-1.5.0-snapshot-quarkus
labels:
app: camel-k
camel.apache.org/catalog.version: 3.4.0
camel.apache.org/catalog.loader.version: 3.4.0
camel.apache.org/runtime.version: 1.4.1
camel.apache.org/runtime.version: 1.5.0-SNAPSHOT
camel.apache.org/runtime.provider: quarkus
spec:
runtime:
version: 1.4.1
version: 1.5.0-SNAPSHOT
provider: quarkus
applicationClass: io.quarkus.runner.GeneratedMain
metadata:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ spec:
- apiGroups:
- eventing.knative.dev
- messaging.knative.dev
- sources.knative.dev
resources:
- '*'
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/operator-role-knative.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ rules:
- apiGroups:
- eventing.knative.dev
- messaging.knative.dev
- sources.knative.dev
resources:
- "*"
verbs:
Expand Down
24 changes: 12 additions & 12 deletions deploy/resources.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion examples/knative/messages-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: messaging.knative.dev/v1alpha1
apiVersion: messaging.knative.dev/v1beta1
kind: InMemoryChannel
metadata:
name: messages
2 changes: 1 addition & 1 deletion examples/knative/words-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: messaging.knative.dev/v1alpha1
apiVersion: messaging.knative.dev/v1beta1
kind: InMemoryChannel
metadata:
name: words
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,9 @@ github.com/radovskyb/watcher v1.0.6/go.mod h1:78okwvY5wPdzcb1UYnip1pvrZNIVEIh/Cm
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uYEyJGbgTkfkS4+E/PavXkNJcbFIpEtjt2B0KDQ5+9M=
github.com/robfig/cron v0.0.0-20170526150127-736158dc09e1 h1:NZInwlJPD/G44mJDgBEMFvBfbv/QQKCrpo+az/QXn8c=
github.com/robfig/cron v0.0.0-20170526150127-736158dc09e1/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down
1 change: 1 addition & 0 deletions helm/camel-k/templates/operator-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ rules:
- apiGroups:
- eventing.knative.dev
- messaging.knative.dev
- sources.knative.dev
resources:
- "*"
verbs:
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/addtoscheme_knative_eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1"
)

func init() {
Expand All @@ -30,4 +31,5 @@ func init() {
AddToSchemes = append(AddToSchemes, eventingv1beta1.AddToScheme)
AddToSchemes = append(AddToSchemes, messagingv1alpha1.AddToScheme)
AddToSchemes = append(AddToSchemes, messagingv1beta1.AddToScheme)
AddToSchemes = append(AddToSchemes, sourcesv1alpha1.AddToScheme)
}
178 changes: 133 additions & 45 deletions pkg/trait/knative.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@ limitations under the License.
package trait

import (
"fmt"
"net/url"
"reflect"
"strings"

"github.com/pkg/errors"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"

eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
serving "knative.dev/serving/pkg/apis/serving/v1"

v1 "github.com/apache/camel-k/pkg/apis/camel/v1"
knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/util"
"github.com/apache/camel-k/pkg/util/envvar"
knativeutil "github.com/apache/camel-k/pkg/util/knative"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
serving "knative.dev/serving/pkg/apis/serving/v1"
)

// The Knative trait automatically discovers addresses of Knative resources and inject them into the
Expand Down Expand Up @@ -73,6 +73,10 @@ type knativeTrait struct {
FilterSourceChannels *bool `property:"filter-source-channels" json:"filterSourceChannels,omitempty"`
// Enables Knative CamelSource pre 0.15 compatibility fixes (will be removed in future versions).
CamelSourceCompat *bool `property:"camel-source-compat" json:"camelSourceCompat,omitempty"`
// Allows binding the integration to a sink via a Knative SinkBinding resource.
// This can be used when the integration targets a single sink.
// It's disabled by default.
SinkBinding *bool `property:"sink-binding" json:"sinkBinding,omitempty"`
// Enable automatic discovery of all trait properties.
Auto *bool `property:"auto" json:"auto,omitempty"`
}
Expand Down Expand Up @@ -201,6 +205,10 @@ func (t *knativeTrait) Apply(e *Environment) error {
}
}

if t.SinkBinding != nil && *t.SinkBinding {
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-knative")
}

if len(t.ChannelSources) > 0 || len(t.EndpointSources) > 0 || len(t.EventSources) > 0 {
util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP)
}
Expand All @@ -225,6 +233,9 @@ func (t *knativeTrait) Apply(e *Environment) error {
if err := t.configureEvents(e, &env); err != nil {
return err
}
if err := t.configureSinkBinding(e, &env); err != nil {
return err
}

conf, err := env.Serialize()
if err != nil {
Expand Down Expand Up @@ -268,19 +279,21 @@ func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEn
return err
}

// Sinks
err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
}
env.Services = append(env.Services, svc)
return nil
})
if err != nil {
return err
if t.SinkBinding == nil || !*t.SinkBinding {
// Sinks
err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
}
env.Services = append(env.Services, svc)
return nil
})
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -319,18 +332,20 @@ func (t *knativeTrait) configureEndpoints(e *Environment, env *knativeapi.CamelE
}

// Sinks
err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
}
env.Services = append(env.Services, svc)
return nil
})
if err != nil {
return err
if t.SinkBinding == nil || !*t.SinkBinding {
err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
}
env.Services = append(env.Services, svc)
return nil
})
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -366,23 +381,96 @@ func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvi
}

// Sinks
err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
}
env.Services = append(env.Services, svc)
return nil
})
if err != nil {
return err
if t.SinkBinding == nil || !*t.SinkBinding {
err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, loc *url.URL, serviceURI string) error {
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
}
env.Services = append(env.Services, svc)
return nil
})
if err != nil {
return err
}
}

return nil
}

func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error {
if t.SinkBinding == nil || !*t.SinkBinding {
return nil
}
var serviceType knativeapi.CamelServiceType
services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel)
if len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeChannel
}
services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...)
if len(serviceType) == 0 && len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeEndpoint
}
services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...)
if len(serviceType) == 0 && len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeEvent
}

if len(services) != 1 {
return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", len(services))
}

err := t.withServiceDo(false, e, env, services, serviceType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, url *url.URL, serviceURI string) error {
util.StringSliceUniqueAdd(&e.Interceptors, "knative-sink-binding")
e.ApplicationProperties["loader.interceptor.knative-sink-binding.name"] = ref.Name
e.ApplicationProperties["loader.interceptor.knative-sink-binding.type"] = string(serviceType)
e.ApplicationProperties["loader.interceptor.knative-sink-binding.kind"] = ref.Kind
e.ApplicationProperties["loader.interceptor.knative-sink-binding.api-version"] = ref.APIVersion

if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
e.PostStepProcessors = append(e.PostStepProcessors, func(e *Environment) error {
sinkBindingInjected := false
e.Resources.Visit(func(object runtime.Object) {
gvk := object.GetObjectKind().GroupVersionKind()
if gvk.Kind == "SinkBinding" && strings.Contains(gvk.Group, "knative") {
sinkBindingInjected = true
}
})
if sinkBindingInjected {
return nil
}

controller := e.Resources.GetController(func(object runtime.Object) bool {
return true
})
if controller != nil && !reflect.ValueOf(controller).IsNil() {
gvk := controller.GetObjectKind().GroupVersionKind()
av, k := gvk.ToAPIVersionAndKind()
source := corev1.ObjectReference{
Kind: k,
Namespace: e.Integration.Namespace,
Name: e.Integration.Name,
APIVersion: av,
}
target := corev1.ObjectReference{
Kind: ref.Kind,
Namespace: e.Integration.Namespace,
Name: ref.Name,
APIVersion: ref.APIVersion,
}
e.Resources.Add(knativeutil.CreateSinkBinding(source, target))
}
return nil
})
}
return nil
})

return err
}

func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string) {
// TODO extend to additional filters too, to filter them at source and not at destination
found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool {
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
Version = "1.1.0-SNAPSHOT"

// DefaultRuntimeVersion --
DefaultRuntimeVersion = "1.4.1"
DefaultRuntimeVersion = "1.5.0-SNAPSHOT"

// BuildahVersion --
BuildahVersion = "1.14.0"
Expand Down
Loading

0 comments on commit d07ea17

Please sign in to comment.