From c5ac5aae64d8b4f04028cc5f554fc22dc8a8d378 Mon Sep 17 00:00:00 2001 From: Nicola Ferraro Date: Fri, 24 Jul 2020 19:34:54 +0200 Subject: [PATCH] Fix #365: change sinkbinding to be a customizer unrelated to the knative-source loader --- .../KnativeSinkBindingContextCustomizer.java | 133 +++++++++++++ .../KnativeSourceLoaderInterceptor.java | 52 ----- .../KnativeSinkBindingCustomizerTest.java | 185 ++++++++++++++++++ .../KnativeSourceRoutesLoaderTest.java | 72 ------- .../camel/component/knative/knative.json | 2 +- 5 files changed, 319 insertions(+), 125 deletions(-) create mode 100644 camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java create mode 100644 camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java new file mode 100644 index 000000000..051195e9a --- /dev/null +++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingContextCustomizer.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.k.knative.customizer; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.util.HashMap; +import java.util.Optional; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.camel.CamelContext; +import org.apache.camel.component.knative.spi.Knative; +import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.k.ContextCustomizer; +import org.apache.camel.k.annotation.Customizer; +import org.apache.camel.util.ObjectHelper; + +@Customizer("sinkbinding") +public class KnativeSinkBindingContextCustomizer implements ContextCustomizer { + + private String name; + + private Knative.Type type; + + private String kind; + + private String apiVersion; + + @Override + public void apply(CamelContext camelContext) { + createSyntheticDefinition(camelContext, name).ifPresent(serviceDefinition -> { + // publish the synthetic service definition + camelContext.getRegistry().bind(name, serviceDefinition); + }); + } + + private Optional createSyntheticDefinition( + CamelContext camelContext, + String sinkName) { + + final String kSinkUrl = camelContext.resolvePropertyPlaceholders("{{k.sink:}}"); + final String kCeOverride = camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}"); + + if (ObjectHelper.isNotEmpty(kSinkUrl)) { + // create a synthetic service definition to target the K_SINK url + var serviceBuilder = KnativeEnvironment.serviceBuilder(type, sinkName) + .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) + .withMeta(Knative.SERVICE_META_URL, kSinkUrl); + + if (ObjectHelper.isNotEmpty(kind)) { + serviceBuilder = serviceBuilder.withMeta(Knative.KNATIVE_KIND, kind); + } + + if (ObjectHelper.isNotEmpty(apiVersion)) { + serviceBuilder = serviceBuilder.withMeta(Knative.KNATIVE_API_VERSION, apiVersion); + } + + if (ObjectHelper.isNotEmpty(kCeOverride)) { + try (Reader reader = new StringReader(kCeOverride)) { + // assume K_CE_OVERRIDES is defined as simple key/val json + var overrides = Knative.MAPPER.readValue( + reader, + new TypeReference>() { + } + ); + + for (var entry : overrides.entrySet()) { + // generate proper ce-override meta-data for the service + // definition + serviceBuilder.withMeta( + Knative.KNATIVE_CE_OVERRIDE_PREFIX + entry.getKey(), + entry.getValue() + ); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + return Optional.of(serviceBuilder.build()); + } + + return Optional.empty(); + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Knative.Type getType() { + return type; + } + + public void setType(Knative.Type type) { + this.type = type; + } + + public String getKind() { + return kind; + } + + public void setKind(String kind) { + this.kind = kind; + } + + public String getApiVersion() { + return apiVersion; + } + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + } + +} diff --git a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java index 07f211342..ef303b84b 100644 --- a/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java +++ b/camel-k-runtime-knative/src/main/java/org/apache/camel/k/loader/knative/KnativeSourceLoaderInterceptor.java @@ -16,25 +16,17 @@ */ package org.apache.camel.k.loader.knative; -import java.io.IOException; -import java.io.Reader; -import java.io.StringReader; -import java.util.HashMap; import java.util.List; import java.util.Optional; -import com.fasterxml.jackson.core.type.TypeReference; import org.apache.camel.CamelContext; import org.apache.camel.RoutesBuilder; -import org.apache.camel.component.knative.spi.Knative; -import org.apache.camel.component.knative.spi.KnativeEnvironment; import org.apache.camel.k.Source; import org.apache.camel.k.SourceLoader; import org.apache.camel.k.annotation.LoaderInterceptor; import org.apache.camel.k.support.RuntimeSupport; import org.apache.camel.model.RouteDefinition; import org.apache.camel.model.ToDefinition; -import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,11 +52,6 @@ public Optional builder() { final String sinkUri = String.format("knative://endpoint/%s", sinkName); final RouteDefinition definition = definitions.get(0); - createSyntheticDefinition(camelContext, sinkName).ifPresent(serviceDefinition -> { - // publish the synthetic service definition - camelContext.getRegistry().bind(sinkName, serviceDefinition); - }); - LOGGER.info("Add sink:{} to route:{}", sinkUri, definition.getId()); // assuming that route is linear like there's no content based routing @@ -83,43 +70,4 @@ public Optional configuration() { }; } - private static Optional createSyntheticDefinition( - CamelContext camelContext, - String sinkName) { - - final String kSinkUrl = camelContext.resolvePropertyPlaceholders("{{k.sink:}}"); - final String kCeOverride = camelContext.resolvePropertyPlaceholders("{{k.ce.overrides:}}"); - - if (ObjectHelper.isNotEmpty(kSinkUrl)) { - // create a synthetic service definition to target the K_SINK url - var serviceBuilder = KnativeEnvironment.serviceBuilder(Knative.Type.endpoint, sinkName) - .withMeta(Knative.CAMEL_ENDPOINT_KIND, Knative.EndpointKind.sink) - .withMeta(Knative.SERVICE_META_URL, kSinkUrl); - - if (ObjectHelper.isNotEmpty(kCeOverride)) { - try (Reader reader = new StringReader(kCeOverride)) { - // assume K_CE_OVERRIDES is defined as simple key/val json - var overrides = Knative.MAPPER.readValue( - reader, - new TypeReference>() { } - ); - - for (var entry: overrides.entrySet()) { - // generate proper ce-override meta-data for the service - // definition - serviceBuilder.withMeta( - Knative.KNATIVE_CE_OVERRIDE_PREFIX + entry.getKey(), - entry.getValue() - ); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - return Optional.of(serviceBuilder.build()); - } - - return Optional.empty(); - } } diff --git a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java new file mode 100644 index 000000000..7dab46d51 --- /dev/null +++ b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/knative/customizer/KnativeSinkBindingCustomizerTest.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.k.knative.customizer; + +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +import org.apache.camel.CamelContext; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.knative.KnativeComponent; +import org.apache.camel.component.knative.KnativeConstants; +import org.apache.camel.component.knative.spi.CloudEvent; +import org.apache.camel.component.knative.spi.CloudEvents; +import org.apache.camel.component.knative.spi.Knative; +import org.apache.camel.component.knative.spi.KnativeEnvironment; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.k.Runtime; +import org.apache.camel.k.Source; +import org.apache.camel.k.SourceLoader; +import org.apache.camel.k.Sources; +import org.apache.camel.k.http.PlatformHttpServiceContextCustomizer; +import org.apache.camel.k.listener.RoutesConfigurer; +import org.apache.camel.k.support.RuntimeSupport; +import org.apache.camel.k.test.AvailablePortFinder; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class KnativeSinkBindingCustomizerTest { + + @Test + public void testSinkBindingRegistration() throws Exception { + Runtime runtime = Runtime.on(new DefaultCamelContext()); + runtime.setProperties( + "k.sink", "http://theurl", + "camel.k.customizer.sinkbinding.enabled", "true", + "camel.k.customizer.sinkbinding.name", "mychannel", + "camel.k.customizer.sinkbinding.type", "channel", + "camel.k.customizer.sinkbinding.kind", "InMemoryChannel", + "camel.k.customizer.sinkbinding.api-version", "messaging.knative.dev/v1beta1"); + + + assertThat(RuntimeSupport.configureContextCustomizers(runtime)).hasOnlyOneElementSatisfying(customizer -> { + assertThat(customizer).isInstanceOfSatisfying(KnativeSinkBindingContextCustomizer.class, sc -> { + assertThat(sc.getName()).isEqualTo("mychannel"); + assertThat(sc.getType()).isEqualTo(Knative.Type.channel); + assertThat(sc.getApiVersion()).isEqualTo("messaging.knative.dev/v1beta1"); + assertThat(sc.getKind()).isEqualTo("InMemoryChannel"); + }); + + var svc = runtime.getRegistry().lookupByNameAndType("mychannel", KnativeEnvironment.KnativeServiceDefinition.class); + assertThat(svc).isNotNull(); + assertThat(svc.getUrl()).isEqualTo("http://theurl"); + assertThat(svc.getName()).isEqualTo("mychannel"); + assertThat(svc.getType()).isEqualTo(Knative.Type.channel); + assertThat(svc.getMetadata(Knative.KNATIVE_API_VERSION)).isEqualTo("messaging.knative.dev/v1beta1"); + assertThat(svc.getMetadata(Knative.KNATIVE_KIND)).isEqualTo("InMemoryChannel"); + }); + } + + @Test + public void testWrapLoaderWithSyntheticServiceDefinition() throws Exception { + + final String data = UUID.randomUUID().toString(); + final TestRuntime runtime = new TestRuntime(); + final String typeHeaderKey = CloudEvents.v1_0.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); + final String typeHeaderVal = UUID.randomUUID().toString(); + final String url = String.format("http://localhost:%d", runtime.port); + + KnativeComponent component = new KnativeComponent(); + component.setEnvironment(new KnativeEnvironment(Collections.emptyList())); + + Properties properties = new Properties(); + properties.put("camel.k.customizer.sinkbinding.enabled", "true"); + properties.put("camel.k.customizer.sinkbinding.name", "mySynk"); + properties.put("camel.k.customizer.sinkbinding.type", "endpoint"); + properties.put("k.sink", String.format("http://localhost:%d", runtime.port)); + properties.put("k.ce.overrides", Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal))); + + CamelContext context = runtime.getCamelContext(); + context.getPropertiesComponent().setInitialProperties(properties); + context.addComponent(KnativeConstants.SCHEME, component); + + RuntimeSupport.configureContextCustomizers(runtime); + + Source source = Sources.fromBytes("groovy", "from('direct:start').setBody().header('MyHeader').to('knative://endpoint/mySynk')".getBytes(StandardCharsets.UTF_8)); + SourceLoader loader = RoutesConfigurer.load(runtime, source); + + assertThat(loader.getSupportedLanguages()).contains(source.getLanguage()); + assertThat(runtime.builders).hasSize(1); + + try { + context.addRoutes(runtime.builders.get(0)); + context.addRoutes(new RouteBuilder() { + @Override + public void configure() throws Exception { + fromF("platform-http:/") + .routeId("http") + .to("mock:result"); + } + }); + context.start(); + + var services = context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class); + + assertThat(services).hasSize(1); + assertThat(services).first().hasFieldOrPropertyWithValue("name", "mySynk"); + assertThat(services).first().hasFieldOrPropertyWithValue("url", url); + + MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); + mock.expectedMessageCount(1); + mock.expectedBodiesReceived(data); + mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal); + + context.createFluentProducerTemplate() + .to("direct:start") + .withHeader("MyHeader", data) + .send(); + + mock.assertIsSatisfied(); + } finally { + context.stop(); + } + } + + static class TestRuntime implements Runtime { + private final CamelContext camelContext; + private final List builders; + private final int port; + + public TestRuntime() { + this.camelContext = new DefaultCamelContext(); + this.camelContext.disableJMX(); + this.camelContext.setStreamCaching(true); + + this.builders = new ArrayList<>(); + this.port = AvailablePortFinder.getNextAvailable(); + + PlatformHttpServiceContextCustomizer httpService = new PlatformHttpServiceContextCustomizer(); + httpService.setBindPort(this.port); + httpService.apply(this.camelContext); + } + + public int getPort() { + return port; + } + + @Override + public CamelContext getCamelContext() { + return this.camelContext; + } + + @Override + public void addRoutes(RoutesBuilder builder) { + this.builders.add(builder); + } + + @Override + public void setPropertiesLocations(Collection locations) { + } + } + +} diff --git a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java index 7af4acd68..513005eba 100644 --- a/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java +++ b/camel-k-runtime-knative/src/test/java/org/apache/camel/k/loader/knative/KnativeSourceRoutesLoaderTest.java @@ -126,78 +126,6 @@ public void configure() throws Exception { } } - @ParameterizedTest - @MethodSource("parameters") - public void testWrapLoaderWithSyntheticServiceDefinition(String uri) throws Exception { - LOGGER.info("uri: {}", uri); - - final String data = UUID.randomUUID().toString(); - final TestRuntime runtime = new TestRuntime(); - final String typeHeaderKey = CloudEvents.v1_0.mandatoryAttribute(CloudEvent.CAMEL_CLOUD_EVENT_TYPE).http(); - final String typeHeaderVal = UUID.randomUUID().toString(); - final String url = String.format("http://localhost:%d", runtime.port); - - KnativeComponent component = new KnativeComponent(); - component.setEnvironment(new KnativeEnvironment(Collections.emptyList())); - - Properties properties = new Properties(); - properties.put("knative.sink", "mySynk"); - properties.put("k.sink", String.format("http://localhost:%d", runtime.port)); - properties.put("k.ce.overrides", Knative.MAPPER.writeValueAsString(Map.of(typeHeaderKey, typeHeaderVal))); - - CamelContext context = runtime.getCamelContext(); - context.getPropertiesComponent().setInitialProperties(properties); - context.addComponent(KnativeConstants.SCHEME, component); - - Source source = Sources.fromURI(uri); - SourceLoader loader = RoutesConfigurer.load(runtime, source); - - assertThat(loader.getSupportedLanguages()).contains(source.getLanguage()); - assertThat(runtime.builders).hasSize(1); - - try { - context.addRoutes(runtime.builders.get(0)); - context.addRoutes(new RouteBuilder() { - @Override - public void configure() throws Exception { - fromF("platform-http:/") - .routeId("http") - .to("mock:result"); - } - }); - context.start(); - - var definitions = context.adapt(ModelCamelContext.class).getRouteDefinitions(); - var services = context.getRegistry().findByType(KnativeEnvironment.KnativeServiceDefinition.class); - - assertThat(definitions).hasSize(2); - assertThat(definitions).first().satisfies(d -> { - assertThat(d.getOutputs()).last().hasFieldOrPropertyWithValue( - "endpointUri", - "knative://endpoint/mySynk" - ); - }); - - assertThat(services).hasSize(1); - assertThat(services).first().hasFieldOrPropertyWithValue("name", "mySynk"); - assertThat(services).first().hasFieldOrPropertyWithValue("url", url); - - MockEndpoint mock = context.getEndpoint("mock:result", MockEndpoint.class); - mock.expectedMessageCount(1); - mock.expectedBodiesReceived(data); - mock.expectedHeaderReceived(typeHeaderKey, typeHeaderVal); - - context.createFluentProducerTemplate() - .to("direct:start") - .withHeader("MyHeader", data) - .send(); - - mock.assertIsSatisfied(); - } finally { - context.stop(); - } - } - static class TestRuntime implements Runtime { private final CamelContext camelContext; private final List builders; diff --git a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json index 5d81d0619..4954d47c5 100644 --- a/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json +++ b/camel-knative/camel-knative/src/generated/resources/org/apache/camel/component/knative/knative.json @@ -11,7 +11,7 @@ "supportLevel": "Preview", "groupId": "org.apache.camel.k", "artifactId": "camel-knative", - "version": "1.4.1-SNAPSHOT", + "version": "1.5.0-SNAPSHOT", "scheme": "knative", "extendsScheme": "", "syntax": "knative:type\/name",