Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[#1289] Very first version for new adapter interface
Browse files Browse the repository at this point in the history
tenthe committed Mar 31, 2023

Unverified

This user has not yet uploaded their public signing key.
1 parent b1d4591 commit 3c851b3
Showing 36 changed files with 1,105 additions and 59 deletions.
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

package org.apache.streampipes.extensions.api.connect;

@Deprecated
public interface Connector {

String getId();
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@

package org.apache.streampipes.extensions.api.connect;

@Deprecated
public interface EmitBinaryEvent {
Boolean emit(byte[] event);
}
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.grounding.TransportProtocol;

@Deprecated
public interface IAdapter<T extends AdapterDescription> extends Connector {

T declareModel();
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@

import java.util.Map;

@Deprecated
public interface IFormat {

IFormat getInstance(FormatDescription formatDescription);
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import java.io.InputStream;
import java.util.List;

@Deprecated
public interface IParser {

IParser getInstance(FormatDescription formatDescription);
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;

@Deprecated
public interface IProtocol extends Connector {

IProtocol getInstance(ProtocolDescription protocolDescription,
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.management.connect;


// TODO rename to `Adapter` once the old class can be deleted
public abstract class AdapterInstance implements AdapterInterface {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.management.connect;

import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.model.connect.adapter.AdapterConfiguration;
import org.apache.streampipes.model.connect.adapter.IEventCollector;
import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
import org.apache.streampipes.sdk.extractor.AdapterParameterExtractor;

// TODO Rename to IAdapter once the old class can be deleted
public interface AdapterInterface {
AdapterConfiguration declareConfig();

void onAdapterStarted(AdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException;

void onAdapterStopped(AdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException;

AdapterGuessInfo onSchemaRequested(AdapterParameterExtractor extractor,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException;
}
Original file line number Diff line number Diff line change
@@ -39,6 +39,27 @@ public class ConnectWorkerDescriptionProvider {

private static final Logger LOG = LoggerFactory.getLogger(ConnectWorkerDescriptionProvider.class);

/**
* Retrieves a list of all adapter descriptions that are currently registered.
* @return a list of {@link AdapterDescription} objects representing the registered adapters
*/
public List<AdapterDescription> getAdapterDescriptions() {
var adapters = getRegisteredAdapters()
.stream()
.map(adapter -> adapter.declareConfig().getAdapterDescription())
.toList();
return adapters;
}

/**
* This is a helper method to mock the Declarer Singleton in unit tests
* @return the registered adapters from the DeclarerSingleton
*/
public List<AdapterInterface> getRegisteredAdapters() {
return DeclarersSingleton.getInstance().getAdapters();
}

@Deprecated
public List<AdapterDescription> getContainerDescription(String serviceGroup) {

List<AdapterDescription> allAdapterDescriptions = new ArrayList<>();
@@ -48,6 +69,7 @@ public List<AdapterDescription> getContainerDescription(String serviceGroup) {
return allAdapterDescriptions;
}

@Deprecated
public Optional<AdapterDescription> getAdapterDescription(String appId) {
List<AdapterDescription> allAdapterDescriptions = getContainerDescription("");
return allAdapterDescriptions
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.extensions.management.connect;

import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.extensions.management.config.ConfigExtractor;
import org.apache.streampipes.extensions.management.monitoring.SpMonitoringManager;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;

import java.util.List;

public interface IAdapterRuntimeContext {
SpMonitoringManager getLogger();

List<SchemaInfo> getInputSchemaInfo();

List<SourceInfo> getInputSourceInfo();

String getCorrespondingUser();

ConfigExtractor getConfigStore();

StreamPipesClient getStreamPipesClient();


}
Original file line number Diff line number Diff line change
@@ -29,6 +29,7 @@
import org.apache.streampipes.extensions.api.declarer.PipelineTemplateDeclarer;
import org.apache.streampipes.extensions.api.declarer.SemanticEventConsumerDeclarer;
import org.apache.streampipes.extensions.api.declarer.SemanticEventProcessingAgentDeclarer;
import org.apache.streampipes.extensions.management.connect.AdapterInterface;
import org.apache.streampipes.extensions.management.model.SpServiceDefinition;
import org.apache.streampipes.messaging.SpProtocolDefinitionFactory;
import org.apache.streampipes.messaging.SpProtocolManager;
@@ -38,12 +39,11 @@
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
import org.apache.streampipes.vocabulary.StreamPipes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
@@ -69,9 +69,13 @@ public class DeclarersSingleton {
private Map<String, TransportProtocol> supportedProtocols;
private Map<String, TransportFormat> supportedFormats;

@Deprecated
private Map<String, IProtocol> allProtocols;
@Deprecated
private Map<String, IAdapter> allAdapters;

private List<AdapterInterface> adapters;

private String serviceId;

private int port;
@@ -88,6 +92,7 @@ private DeclarersSingleton() {
this.supportedFormats = new HashMap<>();
this.allProtocols = new HashMap<>();
this.allAdapters = new HashMap<>();
this.adapters = new ArrayList<>();
this.functions = new HashMap<>();
this.route = "/";
}
@@ -111,8 +116,8 @@ public void populate(String host, Integer port, SpServiceDefinition serviceDef)
this.registerDataFormats(serviceDef.getDataFormatFactories());
this.allAdapters = serviceDef.getSpecificAdapters();
this.allProtocols = serviceDef.getAdapterProtocols();
this.adapters = serviceDef.getAdapters();
serviceDef.getFunctions().forEach(f -> this.functions.put(f.getFunctionConfig().getFunctionId().getId(), f));

}

private void registerConfigs(String serviceGroup,
@@ -155,37 +160,12 @@ public Map<String, Declarer<?>> getDeclarers() {
return result;
}

public void supportedProtocols(TransportProtocol... protocols) {
Arrays.asList(protocols).forEach(protocol ->
this.supportedProtocols.put(protocol.getClass().getCanonicalName(), protocol));
}

public void supportedFormats(TransportFormat... formats) {
Arrays.asList(formats).forEach(format ->
this.supportedFormats.put(getFormatUri(format), format));
}

private String getFormatUri(TransportFormat format) {
return format
.getRdfType()
.stream()
.map(URI::toString)
.filter(t -> !t.equals("http://www.w3.org/2000/01/rdf-schema#"))
.filter(t -> !t.equals(StreamPipes.TRANSPORT_FORMAT))
.findFirst()
.get();
}

public void registerProtocol(SpProtocolDefinitionFactory<?> protocol) {
SpProtocolManager.INSTANCE.register(protocol);
this.supportedProtocols.put(protocol.getTransportProtocolClass(),
protocol.getTransportProtocol());
}

public void registerProtocols(SpProtocolDefinitionFactory<?>... protocols) {
registerProtocols(Arrays.asList(protocols));
}

public void registerProtocols(List<SpProtocolDefinitionFactory<?>> protocols) {
protocols.forEach(this::registerProtocol);
}
@@ -326,6 +306,10 @@ public Map<String, IStreamPipesFunctionDeclarer> getFunctions() {
return functions;
}

public List<AdapterInterface> getAdapters() {
return adapters;
}

private void checkAndStartExecutableStreams(DataStreamDeclarer declarer) {
if (declarer.isExecutable()) {
declarer.executeStream();
Loading

0 comments on commit 3c851b3

Please sign in to comment.