Skip to content

Commit

Permalink
Support for compressed source blob #265
Browse files Browse the repository at this point in the history
  • Loading branch information
lburgazzoli authored and nicolaferraro committed Dec 7, 2018
1 parent e2cb3f0 commit ae04a8e
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 43 deletions.
12 changes: 9 additions & 3 deletions pkg/apis/camel/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,17 @@ func (is *IntegrationSpec) AddSource(name string, content string, language Langu
is.Sources = append(is.Sources, SourceSpec{Name: name, Content: content, Language: language})
}

// AddSources --
func (is *IntegrationSpec) AddSources(sources ...SourceSpec) {
is.Sources = append(is.Sources, sources...)
}

// SourceSpec --
type SourceSpec struct {
Name string `json:"name,omitempty"`
Content string `json:"content,omitempty"`
Language Language `json:"language,omitempty"`
Name string `json:"name,omitempty"`
Content string `json:"content,omitempty"`
Language Language `json:"language,omitempty"`
Compression bool `json:"compression,omitempty"`
}

// Language --
Expand Down
22 changes: 21 additions & 1 deletion pkg/client/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ limitations under the License.
package cmd

import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
Expand All @@ -30,6 +32,8 @@ import (
"strings"
"syscall"

"github.com/apache/camel-k/pkg/gzip"

"github.com/operator-framework/operator-sdk/pkg/util/k8sutil"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -86,6 +90,7 @@ func newCmdRun(rootCmdOptions *RootCmdOptions) *cobra.Command {
cmd.Flags().StringSliceVar(&options.LoggingLevels, "logging-level", nil, "Configure the logging level. "+
"E.g. \"--logging-level org.apache.camel=DEBUG\"")
cmd.Flags().StringVarP(&options.OutputFormat, "output", "o", "", "Output format. One of: json|yaml")
cmd.Flags().BoolVar(&options.Compression, "compression", false, "Enable store source as a compressed binary blob")

// completion support
configureKnownCompletions(&cmd)
Expand All @@ -111,6 +116,7 @@ type runCmdOptions struct {
Traits []string
LoggingLevels []string
OutputFormat string
Compression bool
}

func (o *runCmdOptions) validateArgs(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -291,7 +297,21 @@ func (o *runCmdOptions) updateIntegrationCode(sources []string) (*v1alpha1.Integ
return nil, err
}

integration.Spec.AddSource(path.Base(source), code, "")
if o.Compression {
var b bytes.Buffer

if err := gzip.Compress(&b, []byte(code)); err != nil {
return nil, err
}

code = base64.StdEncoding.EncodeToString(b.Bytes())
}

integration.Spec.AddSources(v1alpha1.SourceSpec{
Name: path.Base(source),
Content: code,
Compression: o.Compression,
})
}

for _, item := range o.Dependencies {
Expand Down
59 changes: 59 additions & 0 deletions pkg/gzip/compress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gzip

import (
"bytes"
g "compress/gzip"
"io"
"io/ioutil"
)

// Compress --
func Compress(buffer io.Writer, data []byte) error {
gz := g.NewWriter(buffer)

if _, err := gz.Write(data); err != nil {
return err
}
if err := gz.Flush(); err != nil {
return err
}
if err := gz.Close(); err != nil {
return err
}

return nil
}

// Uncompress --
func Uncompress(buffer io.Writer, data []byte) error {
b := bytes.NewBuffer(data)
gz, err := g.NewReader(b)

defer gz.Close()

data, err = ioutil.ReadAll(gz)
if err != nil {
return err
}

buffer.Write(data)

return nil
}
51 changes: 30 additions & 21 deletions pkg/trait/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package trait
import (
"fmt"
"path"
"strconv"
"strings"

"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
Expand Down Expand Up @@ -116,29 +117,29 @@ func (d *deploymentTrait) getConfigMapsFor(e *Environment) []runtime.Object {
// do not create 'source' ConfigMap if a docker images for deployment
// is required
for i, s := range e.Integration.Spec.Sources {
maps = append(
maps,
&corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-source-%03d", e.Integration.Name, i),
Namespace: e.Integration.Namespace,
Labels: map[string]string{
"camel.apache.org/integration": e.Integration.Name,
},
Annotations: map[string]string{
"camel.apache.org/source.language": string(s.Language),
"camel.apache.org/source.name": s.Name,
},
cm := corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-source-%03d", e.Integration.Name, i),
Namespace: e.Integration.Namespace,
Labels: map[string]string{
"camel.apache.org/integration": e.Integration.Name,
},
Data: map[string]string{
"integration": s.Content,
Annotations: map[string]string{
"camel.apache.org/source.language": string(s.Language),
"camel.apache.org/source.name": s.Name,
"camel.apache.org/source.compression": strconv.FormatBool(s.Compression),
},
},
)
Data: map[string]string{
"integration": s.Content,
},
}

maps = append(maps, &cm)
}
}

Expand Down Expand Up @@ -166,8 +167,16 @@ func (d *deploymentTrait) getSources(e *Environment) []string {
src := path.Join(root, s.Name)
src = "file:" + src

params := make([]string, 0)
if s.Language != "" {
src = fmt.Sprintf("%s?language=%s", src, string(s.Language))
params = append(params, "language="+string(s.Language))
}
if s.Compression {
params = append(params, "compression=true")
}

if len(params) > 0 {
src = fmt.Sprintf("%s?%s", src, strings.Join(params, "&"))
}

sources = append(sources, src)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class GroovyRoutesLoader implements RoutesLoader {

def cl = Thread.currentThread().getContextClassLoader()
def sh = new GroovyShell(cl, new Binding(), cc)
def is = URIResolver.resolve(context, source.location)
def is = URIResolver.resolve(context, source)

is.withCloseable {
def reader = new InputStreamReader(is)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public RouteBuilder load(RuntimeRegistry registry, Source source) throws Excepti
return new RouteBuilder() {
@Override
public void configure() throws Exception {
try (InputStream is = URIResolver.resolve(getContext(), source.getLocation())) {
try (InputStream is = URIResolver.resolve(getContext(), source)) {
String name = StringUtils.substringAfter(source.getLocation(), ":");
name = StringUtils.removeEnd(name, ".java");

Expand Down Expand Up @@ -125,7 +125,7 @@ public void configure() throws Exception {
bindings.put("rest", (Supplier<RestDefinition>) () -> rest());
bindings.put("restConfiguration", (Supplier<RestConfigurationDefinition>) () -> restConfiguration());

try (InputStream is = URIResolver.resolve(context, source.getLocation())) {
try (InputStream is = URIResolver.resolve(context, source)) {
engine.eval(new InputStreamReader(is), bindings);
}
}
Expand All @@ -144,7 +144,7 @@ public RouteBuilder load(RuntimeRegistry registry, Source source) throws Excepti
return new RouteBuilder() {
@Override
public void configure() throws Exception {
try (InputStream is = URIResolver.resolve(getContext(), source.getLocation())) {
try (InputStream is = URIResolver.resolve(getContext(), source)) {
try {
setRouteCollection(
getContext().loadRoutesDefinition(is)
Expand Down
20 changes: 16 additions & 4 deletions runtime/jvm/src/main/java/org/apache/camel/k/jvm/Source.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
*/
package org.apache.camel.k.jvm;

import java.util.Map;

import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.apache.commons.lang3.StringUtils;

public class Source {
private final String location;
private final Language language;
private final boolean compressed;

private Source(String location, Language language) {
private Source(String location, Language language, boolean compression) {
this.location = location;
this.language = language;
this.compressed = compression;
}

public String getLocation() {
Expand All @@ -37,29 +41,37 @@ public Language getLanguage() {
return language;
}

public boolean isCompressed() {
return compressed;
}

@Override
public String toString() {
return "Source{" +
"location='" + location + '\'' +
", language=" + language +
", compressed=" + compressed +
'}';
}

public static Source create(String uri) throws Exception {
final String location = StringUtils.substringBefore(uri, "?");
final String query = StringUtils.substringAfter(uri, "?");
final String languageName = (String) URISupport.parseQuery(query).get("language");

if (!location.startsWith(Constants.SCHEME_CLASSPATH) &&
!location.startsWith(Constants.SCHEME_FILE) &&
!location.startsWith(Constants.SCHEME_ENV)) {
throw new IllegalArgumentException("No valid resource format, expected scheme:path, found " + uri);
}

final String query = StringUtils.substringAfter(uri, "?");
final Map<String, Object> params = URISupport.parseQuery(query);
final String languageName = (String) params.get("language");
final boolean compression = Boolean.valueOf((String) params.get("compression"));

Language language = ObjectHelper.isNotEmpty(languageName)
? Language.fromLanguageName(languageName)
: Language.fromResource(location);

return new Source(location, language);
return new Source(location, language, compression);
}
}
18 changes: 12 additions & 6 deletions runtime/jvm/src/main/java/org/apache/camel/k/jvm/URIResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.util.Base64;
import java.util.zip.GZIPInputStream;

import org.apache.camel.CamelContext;
import org.apache.camel.util.ResourceHelper;
Expand All @@ -28,20 +30,24 @@

public class URIResolver {

public static InputStream resolve(CamelContext ctx, String uri) throws Exception {
if (uri == null) {
public static InputStream resolve(CamelContext ctx, Source source) throws Exception {
if (source.getLocation() == null) {
throw new IllegalArgumentException("Cannot resolve null URI");
}

if (uri.startsWith(Constants.SCHEME_ENV)) {
final String envvar = StringHelper.after(uri, ":");
final InputStream is;

if (source.getLocation().startsWith(Constants.SCHEME_ENV)) {
final String envvar = StringHelper.after(source.getLocation(), ":");
final String content = System.getenv(envvar);

// Using platform encoding on purpose
return new ByteArrayInputStream(content.getBytes());
is = new ByteArrayInputStream(content.getBytes());
} else {
is = ResourceHelper.resolveMandatoryResourceAsInputStream(ctx, source.getLocation());
}

return ResourceHelper.resolveMandatoryResourceAsInputStream(ctx, uri);
return source.isCompressed() ? new GZIPInputStream(Base64.getDecoder().wrap(is)) : is;
}

public static Reader resolveEnv(String uri) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public void testLoadJava() throws Exception {
assertThat(routes.get(0).getOutputs().get(0)).isInstanceOf(ToDefinition.class);
}


@Test
public void testLoadJavaWithNestedClass() throws Exception {
Source source = Source.create("classpath:MyRoutesWithNestedClass.java");
Expand All @@ -84,7 +83,6 @@ public void testLoadJavaWithNestedClass() throws Exception {
assertThat(routes.get(0).getOutputs().get(2)).isInstanceOf(ToDefinition.class);
}


@Test
public void testLoadJavaScript() throws Exception {
Source source = Source.create("classpath:routes.js");
Expand All @@ -102,6 +100,23 @@ public void testLoadJavaScript() throws Exception {
assertThat(routes.get(0).getOutputs().get(0)).isInstanceOf(ToDefinition.class);
}

@Test
public void testLoadCompressedRoute() throws Exception {
Source source = Source.create("classpath:routes-compressed.js.gz.b64?language=js&compression=true");
RoutesLoader loader = RoutesLoaders.loaderFor(source);
RouteBuilder builder = loader.load(new SimpleRuntimeRegistry(), source);

assertThat(loader).isInstanceOf(RoutesLoaders.JavaScript.class);
assertThat(builder).isNotNull();

builder.configure();

List<RouteDefinition> routes = builder.getRouteCollection().getRoutes();
assertThat(routes).hasSize(1);
assertThat(routes.get(0).getInputs().get(0).getEndpointUri()).isEqualTo("timer:tick");
assertThat(routes.get(0).getOutputs().get(0)).isInstanceOf(ToDefinition.class);
}

@Test
public void testLoadJavaScriptWithCustomExtension() throws Exception {
Source source = Source.create("classpath:routes.mytype?language=js");
Expand Down
1 change: 1 addition & 0 deletions runtime/jvm/src/test/resources/routes-compressed.js.gz.b64
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
H4sIAAAAAAAA/+JKK8rP1VAvycxNLbIqyUzOVtfkUlBQUNAryddQz8lPt8rMS8tX1+QCAAAA//8BAAD//3wZ4pUoAAAA
Loading

0 comments on commit ae04a8e

Please sign in to comment.