Skip to content

Commit

Permalink
Support migration of adapters in data import (#1683)
Browse files Browse the repository at this point in the history
* Support migration of adapters in data import

* Fix checkstyle

* Fix checkstyle
  • Loading branch information
dominikriemer authored Jun 26, 2023
1 parent 27839af commit 117af96
Show file tree
Hide file tree
Showing 20 changed files with 345 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.streampipes.export.resolver.FileResolver;
import org.apache.streampipes.export.resolver.MeasurementResolver;
import org.apache.streampipes.export.resolver.PipelineResolver;
import org.apache.streampipes.export.utils.ImportAdapterMigrationUtils;
import org.apache.streampipes.manager.file.FileHandler;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
Expand Down Expand Up @@ -70,7 +71,8 @@ protected void handleAsset(Map<String, byte[]> previewFiles, String assetId) thr
@Override
protected void handleAdapter(String document, String adapterId) throws JsonProcessingException {
if (shouldStore(adapterId, config.getAdapters())) {
new AdapterResolver().writeDocument(document, config.isOverrideBrokerSettings());
var convertedDoc = ImportAdapterMigrationUtils.checkAndPerformMigration(document);
new AdapterResolver().writeDocument(convertedDoc, config.isOverrideBrokerSettings());
permissionsToStore.add(new PermissionInfo(adapterId, AdapterDescription.class));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,22 @@
import org.apache.streampipes.export.resolver.FileResolver;
import org.apache.streampipes.export.resolver.MeasurementResolver;
import org.apache.streampipes.export.resolver.PipelineResolver;
import org.apache.streampipes.export.utils.ImportAdapterMigrationUtils;
import org.apache.streampipes.model.export.AssetExportConfiguration;
import org.apache.streampipes.model.export.ExportItem;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.function.Consumer;

public class PreviewImportGenerator extends ImportGenerator<AssetExportConfiguration> {

private AssetExportConfiguration importConfig;
private static final Logger LOG = LoggerFactory.getLogger(PreviewImportGenerator.class);
private final AssetExportConfiguration importConfig;

public PreviewImportGenerator() {
super();
Expand All @@ -56,14 +60,20 @@ private void addExportItem(String id,
@Override
protected void handleAsset(Map<String, byte[]> previewFiles, String assetId) throws JsonProcessingException {
Map<String, Object> assetDescription = this.defaultMapper.readValue(asString(previewFiles.get(assetId)),
new TypeReference<Map<String, Object>>() {
new TypeReference<>() {
});
importConfig.addAsset(new ExportItem(assetId, String.valueOf(assetDescription.get("assetName")), true));
}

@Override
protected void handleAdapter(String document, String adapterId) throws JsonProcessingException {
addExportItem(adapterId, new AdapterResolver().readDocument(document).getName(), importConfig::addAdapter);
protected void handleAdapter(String document,
String adapterId) throws JsonProcessingException {
try {
var convertedDoc = ImportAdapterMigrationUtils.checkAndPerformMigration(document);
addExportItem(adapterId, new AdapterResolver().readDocument(convertedDoc).getName(), importConfig::addAdapter);
} catch (IllegalArgumentException e) {
LOG.warn("Skipping import of data set adapter {}", adapterId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.export.utils;

import org.apache.streampipes.model.connect.adapter.migration.GenericAdapterConverter;
import org.apache.streampipes.model.connect.adapter.migration.IAdapterConverter;
import org.apache.streampipes.model.connect.adapter.migration.SpecificAdapterConverter;
import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels;

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.GENERIC_STREAM;

public class ImportAdapterMigrationUtils {

public static String checkAndPerformMigration(String document) {
JsonObject doc = JsonParser.parseString(document).getAsJsonObject();
var docType = doc.get("@class").getAsString();
if (AdapterModels.shouldMigrate(docType)) {
if (AdapterModels.isSetAdapter(docType)) {
throw new IllegalArgumentException("Sets are no longer supported");
} else {
var converter = getAdapterConverter(docType);
return converter.convert(doc).toString();
}
} else {
return doc.toString();
}
}

private static IAdapterConverter getAdapterConverter(String adapterType) {
if (adapterType.equals(GENERIC_STREAM)) {
return new GenericAdapterConverter(true);
} else {
return new SpecificAdapterConverter(true);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.connect.adapter.migration;

import org.apache.streampipes.model.connect.adapter.migration.format.CsvFormatMigrator;
import org.apache.streampipes.model.connect.adapter.migration.format.EmptyFormatMigrator;
import org.apache.streampipes.model.connect.adapter.migration.format.JsonFormatMigrator;
import org.apache.streampipes.model.connect.adapter.migration.format.XmlFormatMigrator;

import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers.PROPERTIES;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.CSV;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.CSV_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.GEOJSON_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.GEOJSON_NEW_KEY;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.IMAGE;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.IMAGE_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_NO_KEY_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_NO_KEY_NEW_KEY;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_OBJECT_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_OBJECT_NEW_KEY;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.XML;
import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.XML_FORMAT_ID;
import static org.apache.streampipes.model.connect.adapter.migration.utils.GenericAdapterUtils.applyFormat;
import static org.apache.streampipes.model.connect.adapter.migration.utils.GenericAdapterUtils.getFormatTemplate;

public class GenericAdapterConverter implements IAdapterConverter {

private static final Logger LOG = LoggerFactory.getLogger(GenericAdapterConverter.class);
private static final String PROTOCOL_DESC_KEY = "protocolDescription";
private static final String FORMAT_DESC_KEY = "formatDescription";
private static final String CONFIG_KEY = "config";

private final MigrationHelpers helpers;

private final String typeFieldName;
private final boolean importMode;

public GenericAdapterConverter(boolean importMode) {
this.helpers = new MigrationHelpers();
this.importMode = importMode;
this.typeFieldName = importMode ? "@class" : "type";
}

@Override
public JsonObject convert(JsonObject adapter) {
helpers.updateType(adapter, typeFieldName);
if (!importMode) {
helpers.updateFieldType(adapter);
}

JsonObject formatDescription = getProperties(adapter).get(FORMAT_DESC_KEY).getAsJsonObject();
JsonObject protocolDescription = getProperties(adapter).get(PROTOCOL_DESC_KEY).getAsJsonObject();

migrateProtocolDescription(adapter, protocolDescription);
migrateFormatDescription(adapter, formatDescription);

getProperties(adapter).remove(FORMAT_DESC_KEY);
getProperties(adapter).remove(PROTOCOL_DESC_KEY);

return adapter;
}

private JsonObject getProperties(JsonObject object) {
return importMode ? object : object.get(PROPERTIES).getAsJsonObject();
}

private void migrateProtocolDescription(JsonObject adapter,
JsonObject protocolDescription) {
JsonArray config = getProperties(adapter).get(CONFIG_KEY).getAsJsonArray();
JsonArray protocolDescriptionConfig = protocolDescription.get(CONFIG_KEY).getAsJsonArray();
protocolDescriptionConfig.forEach(config::add);
}

private void migrateFormatDescription(JsonObject adapter,
JsonObject formatDescription) {
var adapterConfig = getProperties(adapter)
.get(CONFIG_KEY)
.getAsJsonArray();

var formatTemplate = getFormatTemplate();

if (isFormat(formatDescription, JSON_OBJECT_FORMAT_ID)) {
var migrator = new JsonFormatMigrator(JSON_OBJECT_NEW_KEY, formatDescription);
applyFormat(JSON, formatTemplate, migrator);
} else if (isFormat(formatDescription, JSON_ARRAY_KEY_FORMAT_ID)) {
var migrator = new JsonFormatMigrator(JSON_ARRAY_KEY_NEW_KEY, formatDescription);
applyFormat(JSON, formatTemplate, migrator);
} else if (isFormat(formatDescription, JSON_ARRAY_NO_KEY_FORMAT_ID)) {
var migrator = new JsonFormatMigrator(JSON_ARRAY_NO_KEY_NEW_KEY, formatDescription);
applyFormat(JSON, formatTemplate, migrator);
} else if (isFormat(formatDescription, CSV_FORMAT_ID)) {
var migrator = new CsvFormatMigrator(formatDescription);
applyFormat(CSV, formatTemplate, migrator);
} else if (isFormat(formatDescription, GEOJSON_FORMAT_ID)) {
var migrator = new JsonFormatMigrator(GEOJSON_NEW_KEY, formatDescription);
applyFormat(JSON, formatTemplate, migrator);
} else if (isFormat(formatDescription, XML_FORMAT_ID)) {
var migrator = new XmlFormatMigrator(formatDescription);
applyFormat(XML, formatTemplate, migrator);
} else if (isFormat(formatDescription, IMAGE_FORMAT_ID)) {
applyFormat(IMAGE, formatTemplate, new EmptyFormatMigrator());
} else {
LOG.warn("Found unknown format {}", getAppId(formatDescription));
}

adapterConfig.add(formatTemplate);
}

private boolean isFormat(JsonObject formatDescription,
String format) {
return getAppId(formatDescription).equals(format);
}

private String getAppId(JsonObject formatDescription) {
return formatDescription
.getAsJsonObject()
.get(MigrationHelpers.APP_ID).getAsString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.connect.adapter.migration;

import com.google.gson.JsonObject;

public interface IAdapterConverter {

JsonObject convert(JsonObject adapterDescription);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*
*/

package org.apache.streampipes.service.core.migrations.v093.migrator;
package org.apache.streampipes.model.connect.adapter.migration;

import org.apache.streampipes.service.core.migrations.v093.utils.AdapterMigrationUtils;
import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels;

import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
Expand All @@ -40,12 +40,13 @@ public String getRev(JsonObject adapter) {
return adapter.get(REV).getAsString();
}

public void updateType(JsonObject adapter) {
adapter.add("type", new JsonPrimitive(AdapterMigrationUtils.NEW_MODEL));
public void updateType(JsonObject adapter,
String typeFieldName) {
adapter.add(typeFieldName, new JsonPrimitive(AdapterModels.NEW_MODEL));
}

public void updateFieldType(JsonObject adapter) {
adapter.add("field_type", new JsonPrimitive(AdapterMigrationUtils.NEW_MODEL));
adapter.add("field_type", new JsonPrimitive(AdapterModels.NEW_MODEL));
}

public String getAdapterName(JsonObject adapter) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.connect.adapter.migration;

import com.google.gson.JsonObject;

public class SpecificAdapterConverter implements IAdapterConverter {

private final MigrationHelpers helpers;
private final String typeFieldName;

public SpecificAdapterConverter(boolean importMode) {
this.helpers = new MigrationHelpers();
this.typeFieldName = importMode ? "@class" : "type";
}

@Override
public JsonObject convert(JsonObject adapter) {
helpers.updateType(adapter, typeFieldName);
helpers.updateFieldType(adapter);

return adapter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.service.core.migrations.v093.format;
package org.apache.streampipes.model.connect.adapter.migration.format;

import com.google.gson.JsonObject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.service.core.migrations.v093.format;
package org.apache.streampipes.model.connect.adapter.migration.format;

import com.google.gson.JsonObject;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package org.apache.streampipes.service.core.migrations.v093.format;
package org.apache.streampipes.model.connect.adapter.migration.format;

import com.google.gson.JsonObject;

Expand Down
Loading

0 comments on commit 117af96

Please sign in to comment.