Skip to content

Commit

Permalink
Properly apply configs to influx adapter (#1289)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer committed May 26, 2023
1 parent 8d3e1cc commit 5970029
Showing 1 changed file with 4 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.api.connect.IAdapterConfiguration;
import org.apache.streampipes.extensions.api.connect.IEventCollector;
import org.apache.streampipes.extensions.api.connect.StreamPipesAdapter;
import org.apache.streampipes.extensions.api.connect.context.IAdapterGuessSchemaContext;
import org.apache.streampipes.extensions.api.connect.context.IAdapterRuntimeContext;
import org.apache.streampipes.extensions.api.extractor.IAdapterParameterExtractor;
Expand Down Expand Up @@ -55,9 +55,6 @@ public class InfluxDbStreamAdapter implements StreamPipesAdapter {
private Thread pollingThread;
private int pollingInterval;

public InfluxDbStreamAdapter() {
}

@Override
public IAdapterConfiguration declareConfig() {
var builder = AdapterConfigurationBuilder.create(ID, InfluxDbStreamAdapter::new)
Expand All @@ -79,6 +76,7 @@ public IAdapterConfiguration declareConfig() {
public void onAdapterStarted(IAdapterParameterExtractor extractor,
IEventCollector collector,
IAdapterRuntimeContext adapterRuntimeContext) throws AdapterException {
applyConfigurations(extractor.getStaticPropertyExtractor());
pollingThread = new Thread(new PollingThread(this, pollingInterval, collector));
pollingThread.start();
}
Expand All @@ -98,7 +96,7 @@ public void onAdapterStopped(IAdapterParameterExtractor extractor, IAdapterRunti
@Override
public GuessSchema onSchemaRequested(IAdapterParameterExtractor extractor,
IAdapterGuessSchemaContext adapterGuessSchemaContext) throws AdapterException {
getConfigurations(extractor.getStaticPropertyExtractor());
applyConfigurations(extractor.getStaticPropertyExtractor());
return influxDbClient.getSchema();
}

Expand Down Expand Up @@ -182,7 +180,7 @@ private InfluxDbClient getInfluxDbClient() {
return influxDbClient;
}

private void getConfigurations(IStaticPropertyExtractor extractor) {
private void applyConfigurations(IStaticPropertyExtractor extractor) {

pollingInterval = extractor.singleValueParameter(POLLING_INTERVAL, Integer.class);
String replace = extractor.selectedSingleValueInternalName(InfluxDbClient.REPLACE_NULL_VALUES, String.class);
Expand Down

0 comments on commit 5970029

Please sign in to comment.