diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java index e8ba6a0521..d35b8dbf1c 100644 --- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java +++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PerformImportGenerator.java @@ -28,6 +28,7 @@ import org.apache.streampipes.export.resolver.FileResolver; import org.apache.streampipes.export.resolver.MeasurementResolver; import org.apache.streampipes.export.resolver.PipelineResolver; +import org.apache.streampipes.export.utils.ImportAdapterMigrationUtils; import org.apache.streampipes.manager.file.FileHandler; import org.apache.streampipes.model.SpDataStream; import org.apache.streampipes.model.connect.adapter.AdapterDescription; @@ -70,7 +71,8 @@ protected void handleAsset(Map previewFiles, String assetId) thr @Override protected void handleAdapter(String document, String adapterId) throws JsonProcessingException { if (shouldStore(adapterId, config.getAdapters())) { - new AdapterResolver().writeDocument(document, config.isOverrideBrokerSettings()); + var convertedDoc = ImportAdapterMigrationUtils.checkAndPerformMigration(document); + new AdapterResolver().writeDocument(convertedDoc, config.isOverrideBrokerSettings()); permissionsToStore.add(new PermissionInfo(adapterId, AdapterDescription.class)); } } diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java index c85a2c8764..57ff84f809 100644 --- a/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java +++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/dataimport/PreviewImportGenerator.java @@ -26,18 +26,22 @@ import org.apache.streampipes.export.resolver.FileResolver; import org.apache.streampipes.export.resolver.MeasurementResolver; import org.apache.streampipes.export.resolver.PipelineResolver; +import org.apache.streampipes.export.utils.ImportAdapterMigrationUtils; import org.apache.streampipes.model.export.AssetExportConfiguration; import org.apache.streampipes.model.export.ExportItem; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.function.Consumer; public class PreviewImportGenerator extends ImportGenerator { - private AssetExportConfiguration importConfig; + private static final Logger LOG = LoggerFactory.getLogger(PreviewImportGenerator.class); + private final AssetExportConfiguration importConfig; public PreviewImportGenerator() { super(); @@ -56,14 +60,20 @@ private void addExportItem(String id, @Override protected void handleAsset(Map previewFiles, String assetId) throws JsonProcessingException { Map assetDescription = this.defaultMapper.readValue(asString(previewFiles.get(assetId)), - new TypeReference>() { + new TypeReference<>() { }); importConfig.addAsset(new ExportItem(assetId, String.valueOf(assetDescription.get("assetName")), true)); } @Override - protected void handleAdapter(String document, String adapterId) throws JsonProcessingException { - addExportItem(adapterId, new AdapterResolver().readDocument(document).getName(), importConfig::addAdapter); + protected void handleAdapter(String document, + String adapterId) throws JsonProcessingException { + try { + var convertedDoc = ImportAdapterMigrationUtils.checkAndPerformMigration(document); + addExportItem(adapterId, new AdapterResolver().readDocument(convertedDoc).getName(), importConfig::addAdapter); + } catch (IllegalArgumentException e) { + LOG.warn("Skipping import of data set adapter {}", adapterId); + } } @Override diff --git a/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/ImportAdapterMigrationUtils.java b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/ImportAdapterMigrationUtils.java new file mode 100644 index 0000000000..4100453e43 --- /dev/null +++ b/streampipes-data-export/src/main/java/org/apache/streampipes/export/utils/ImportAdapterMigrationUtils.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.export.utils; + +import org.apache.streampipes.model.connect.adapter.migration.GenericAdapterConverter; +import org.apache.streampipes.model.connect.adapter.migration.IAdapterConverter; +import org.apache.streampipes.model.connect.adapter.migration.SpecificAdapterConverter; +import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels; + +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; + +import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.GENERIC_STREAM; + +public class ImportAdapterMigrationUtils { + + public static String checkAndPerformMigration(String document) { + JsonObject doc = JsonParser.parseString(document).getAsJsonObject(); + var docType = doc.get("@class").getAsString(); + if (AdapterModels.shouldMigrate(docType)) { + if (AdapterModels.isSetAdapter(docType)) { + throw new IllegalArgumentException("Sets are no longer supported"); + } else { + var converter = getAdapterConverter(docType); + return converter.convert(doc).toString(); + } + } else { + return doc.toString(); + } + } + + private static IAdapterConverter getAdapterConverter(String adapterType) { + if (adapterType.equals(GENERIC_STREAM)) { + return new GenericAdapterConverter(true); + } else { + return new SpecificAdapterConverter(true); + } + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java new file mode 100644 index 0000000000..8b1f17e72e --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/GenericAdapterConverter.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.connect.adapter.migration; + +import org.apache.streampipes.model.connect.adapter.migration.format.CsvFormatMigrator; +import org.apache.streampipes.model.connect.adapter.migration.format.EmptyFormatMigrator; +import org.apache.streampipes.model.connect.adapter.migration.format.JsonFormatMigrator; +import org.apache.streampipes.model.connect.adapter.migration.format.XmlFormatMigrator; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers.PROPERTIES; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.CSV; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.CSV_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.GEOJSON_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.GEOJSON_NEW_KEY; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.IMAGE; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.IMAGE_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_NO_KEY_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_NO_KEY_NEW_KEY; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_OBJECT_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_OBJECT_NEW_KEY; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.XML; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.XML_FORMAT_ID; +import static org.apache.streampipes.model.connect.adapter.migration.utils.GenericAdapterUtils.applyFormat; +import static org.apache.streampipes.model.connect.adapter.migration.utils.GenericAdapterUtils.getFormatTemplate; + +public class GenericAdapterConverter implements IAdapterConverter { + + private static final Logger LOG = LoggerFactory.getLogger(GenericAdapterConverter.class); + private static final String PROTOCOL_DESC_KEY = "protocolDescription"; + private static final String FORMAT_DESC_KEY = "formatDescription"; + private static final String CONFIG_KEY = "config"; + + private final MigrationHelpers helpers; + + private final String typeFieldName; + private final boolean importMode; + + public GenericAdapterConverter(boolean importMode) { + this.helpers = new MigrationHelpers(); + this.importMode = importMode; + this.typeFieldName = importMode ? "@class" : "type"; + } + + @Override + public JsonObject convert(JsonObject adapter) { + helpers.updateType(adapter, typeFieldName); + if (!importMode) { + helpers.updateFieldType(adapter); + } + + JsonObject formatDescription = getProperties(adapter).get(FORMAT_DESC_KEY).getAsJsonObject(); + JsonObject protocolDescription = getProperties(adapter).get(PROTOCOL_DESC_KEY).getAsJsonObject(); + + migrateProtocolDescription(adapter, protocolDescription); + migrateFormatDescription(adapter, formatDescription); + + getProperties(adapter).remove(FORMAT_DESC_KEY); + getProperties(adapter).remove(PROTOCOL_DESC_KEY); + + return adapter; + } + + private JsonObject getProperties(JsonObject object) { + return importMode ? object : object.get(PROPERTIES).getAsJsonObject(); + } + + private void migrateProtocolDescription(JsonObject adapter, + JsonObject protocolDescription) { + JsonArray config = getProperties(adapter).get(CONFIG_KEY).getAsJsonArray(); + JsonArray protocolDescriptionConfig = protocolDescription.get(CONFIG_KEY).getAsJsonArray(); + protocolDescriptionConfig.forEach(config::add); + } + + private void migrateFormatDescription(JsonObject adapter, + JsonObject formatDescription) { + var adapterConfig = getProperties(adapter) + .get(CONFIG_KEY) + .getAsJsonArray(); + + var formatTemplate = getFormatTemplate(); + + if (isFormat(formatDescription, JSON_OBJECT_FORMAT_ID)) { + var migrator = new JsonFormatMigrator(JSON_OBJECT_NEW_KEY, formatDescription); + applyFormat(JSON, formatTemplate, migrator); + } else if (isFormat(formatDescription, JSON_ARRAY_KEY_FORMAT_ID)) { + var migrator = new JsonFormatMigrator(JSON_ARRAY_KEY_NEW_KEY, formatDescription); + applyFormat(JSON, formatTemplate, migrator); + } else if (isFormat(formatDescription, JSON_ARRAY_NO_KEY_FORMAT_ID)) { + var migrator = new JsonFormatMigrator(JSON_ARRAY_NO_KEY_NEW_KEY, formatDescription); + applyFormat(JSON, formatTemplate, migrator); + } else if (isFormat(formatDescription, CSV_FORMAT_ID)) { + var migrator = new CsvFormatMigrator(formatDescription); + applyFormat(CSV, formatTemplate, migrator); + } else if (isFormat(formatDescription, GEOJSON_FORMAT_ID)) { + var migrator = new JsonFormatMigrator(GEOJSON_NEW_KEY, formatDescription); + applyFormat(JSON, formatTemplate, migrator); + } else if (isFormat(formatDescription, XML_FORMAT_ID)) { + var migrator = new XmlFormatMigrator(formatDescription); + applyFormat(XML, formatTemplate, migrator); + } else if (isFormat(formatDescription, IMAGE_FORMAT_ID)) { + applyFormat(IMAGE, formatTemplate, new EmptyFormatMigrator()); + } else { + LOG.warn("Found unknown format {}", getAppId(formatDescription)); + } + + adapterConfig.add(formatTemplate); + } + + private boolean isFormat(JsonObject formatDescription, + String format) { + return getAppId(formatDescription).equals(format); + } + + private String getAppId(JsonObject formatDescription) { + return formatDescription + .getAsJsonObject() + .get(MigrationHelpers.APP_ID).getAsString(); + } +} diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/IAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/IAdapterConverter.java new file mode 100644 index 0000000000..db799c2f13 --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/IAdapterConverter.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.connect.adapter.migration; + +import com.google.gson.JsonObject; + +public interface IAdapterConverter { + + JsonObject convert(JsonObject adapterDescription); +} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/MigrationHelpers.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java similarity index 79% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/MigrationHelpers.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java index 8e73985204..d518086e9e 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/MigrationHelpers.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/MigrationHelpers.java @@ -16,9 +16,9 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.migrator; +package org.apache.streampipes.model.connect.adapter.migration; -import org.apache.streampipes.service.core.migrations.v093.utils.AdapterMigrationUtils; +import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; @@ -40,12 +40,13 @@ public String getRev(JsonObject adapter) { return adapter.get(REV).getAsString(); } - public void updateType(JsonObject adapter) { - adapter.add("type", new JsonPrimitive(AdapterMigrationUtils.NEW_MODEL)); + public void updateType(JsonObject adapter, + String typeFieldName) { + adapter.add(typeFieldName, new JsonPrimitive(AdapterModels.NEW_MODEL)); } public void updateFieldType(JsonObject adapter) { - adapter.add("field_type", new JsonPrimitive(AdapterMigrationUtils.NEW_MODEL)); + adapter.add("field_type", new JsonPrimitive(AdapterModels.NEW_MODEL)); } public String getAdapterName(JsonObject adapter) { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/SpecificAdapterConverter.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/SpecificAdapterConverter.java new file mode 100644 index 0000000000..b584081a5a --- /dev/null +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/SpecificAdapterConverter.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.model.connect.adapter.migration; + +import com.google.gson.JsonObject; + +public class SpecificAdapterConverter implements IAdapterConverter { + + private final MigrationHelpers helpers; + private final String typeFieldName; + + public SpecificAdapterConverter(boolean importMode) { + this.helpers = new MigrationHelpers(); + this.typeFieldName = importMode ? "@class" : "type"; + } + + @Override + public JsonObject convert(JsonObject adapter) { + helpers.updateType(adapter, typeFieldName); + helpers.updateFieldType(adapter); + + return adapter; + } +} diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/CsvFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/CsvFormatMigrator.java similarity index 96% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/CsvFormatMigrator.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/CsvFormatMigrator.java index 80201eaa0b..e7365d98ef 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/CsvFormatMigrator.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/CsvFormatMigrator.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.format; +package org.apache.streampipes.model.connect.adapter.migration.format; import com.google.gson.JsonObject; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/EmptyFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/EmptyFormatMigrator.java similarity index 93% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/EmptyFormatMigrator.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/EmptyFormatMigrator.java index ad2c09f860..477e07312b 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/EmptyFormatMigrator.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/EmptyFormatMigrator.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.format; +package org.apache.streampipes.model.connect.adapter.migration.format; import com.google.gson.JsonObject; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/FormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/FormatMigrator.java similarity index 92% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/FormatMigrator.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/FormatMigrator.java index be23ba0653..e6bf46cdc3 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/FormatMigrator.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/FormatMigrator.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.format; +package org.apache.streampipes.model.connect.adapter.migration.format; import com.google.gson.JsonObject; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/JsonFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/JsonFormatMigrator.java similarity index 84% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/JsonFormatMigrator.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/JsonFormatMigrator.java index d1741ef9b3..aefa2f471a 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/JsonFormatMigrator.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/JsonFormatMigrator.java @@ -16,16 +16,16 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.format; +package org.apache.streampipes.model.connect.adapter.migration.format; -import org.apache.streampipes.service.core.migrations.v093.migrator.MigrationHelpers; +import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; import com.google.gson.JsonObject; import com.google.gson.JsonPrimitive; -import static org.apache.streampipes.service.core.migrations.v093.utils.DocumentKeys.ALTERNATIVES; -import static org.apache.streampipes.service.core.migrations.v093.utils.DocumentKeys.INTERNAL_NAME; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY; +import static org.apache.streampipes.model.connect.adapter.migration.utils.DocumentKeys.ALTERNATIVES; +import static org.apache.streampipes.model.connect.adapter.migration.utils.DocumentKeys.INTERNAL_NAME; +import static org.apache.streampipes.model.connect.adapter.migration.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY; public class JsonFormatMigrator implements FormatMigrator { @@ -67,7 +67,5 @@ public void migrate(JsonObject newFormatProperties) { } } }); - - } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/XmlFormatMigrator.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/XmlFormatMigrator.java similarity index 95% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/XmlFormatMigrator.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/XmlFormatMigrator.java index d5c7d7bee9..c6a44a8d80 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/format/XmlFormatMigrator.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/format/XmlFormatMigrator.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.format; +package org.apache.streampipes.model.connect.adapter.migration.format; import com.google.gson.JsonObject; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/AdapterMigrationUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/AdapterModels.java similarity index 66% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/AdapterMigrationUtils.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/AdapterModels.java index 891c033d56..07833dc0bd 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/AdapterMigrationUtils.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/AdapterModels.java @@ -16,26 +16,21 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.utils; - -import org.apache.streampipes.service.core.migrations.v093.migrator.AdapterMigrator; -import org.apache.streampipes.service.core.migrations.v093.migrator.GenericAdapterMigrator; -import org.apache.streampipes.service.core.migrations.v093.migrator.MigrationHelpers; -import org.apache.streampipes.service.core.migrations.v093.migrator.SpecificAdapterMigrator; +package org.apache.streampipes.model.connect.adapter.migration.utils; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -public class AdapterMigrationUtils { +public class AdapterModels { - private static final String GENERIC_SET = + public static final String GENERIC_SET = "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription"; - private static final String SPECIFIC_SET = + public static final String SPECIFIC_SET = "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription"; - private static final String GENERIC_STREAM = + public static final String GENERIC_STREAM = "org.apache.streampipes.model.connect.adapter.GenericAdapterStreamDescription"; - private static final String SPECIFIC_STREAM = + public static final String SPECIFIC_STREAM = "org.apache.streampipes.model.connect.adapter.SpecificAdapterStreamDescription"; public static final String NEW_MODEL = @@ -56,12 +51,11 @@ public class AdapterMigrationUtils { deprecatedAdapterStreamClasses.stream()) .collect(Collectors.toList()); - public static AdapterMigrator getAdapterMigrator(String adapterType) { - if (adapterType.equals(GENERIC_STREAM)) { - return new GenericAdapterMigrator(new MigrationHelpers()); - } else { - return new SpecificAdapterMigrator(new MigrationHelpers()); - } + public static boolean shouldMigrate(String adapterClassName) { + return deprecatedAdapterClasses.contains(adapterClassName); } + public static boolean isSetAdapter(String adapterClassName) { + return AdapterModels.deprecatedAdapterSetClasses.contains(adapterClassName); + } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/DocumentKeys.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/DocumentKeys.java similarity index 93% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/DocumentKeys.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/DocumentKeys.java index 6339c97e42..757db62991 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/DocumentKeys.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/DocumentKeys.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.utils; +package org.apache.streampipes.model.connect.adapter.migration.utils; public class DocumentKeys { diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/FormatIds.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/FormatIds.java similarity index 96% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/FormatIds.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/FormatIds.java index 9d8fc539ff..29bce717f4 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/FormatIds.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/FormatIds.java @@ -16,7 +16,7 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.utils; +package org.apache.streampipes.model.connect.adapter.migration.utils; public class FormatIds { public static final String JSON = "Json"; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/GenericAdapterUtils.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/GenericAdapterUtils.java similarity index 97% rename from streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/GenericAdapterUtils.java rename to streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/GenericAdapterUtils.java index c195238625..b4aed57cd4 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/utils/GenericAdapterUtils.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/adapter/migration/utils/GenericAdapterUtils.java @@ -16,16 +16,16 @@ * */ -package org.apache.streampipes.service.core.migrations.v093.utils; +package org.apache.streampipes.model.connect.adapter.migration.utils; -import org.apache.streampipes.service.core.migrations.v093.format.FormatMigrator; -import org.apache.streampipes.service.core.migrations.v093.migrator.MigrationHelpers; +import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; +import org.apache.streampipes.model.connect.adapter.migration.format.FormatMigrator; import com.google.gson.JsonObject; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; -import static org.apache.streampipes.service.core.migrations.v093.utils.DocumentKeys.INTERNAL_NAME; +import static org.apache.streampipes.model.connect.adapter.migration.utils.DocumentKeys.INTERNAL_NAME; public class GenericAdapterUtils { diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java index c97e8ededa..a03d479a8a 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterBackupWriter.java @@ -18,7 +18,7 @@ package org.apache.streampipes.service.core.migrations.v093; -import org.apache.streampipes.service.core.migrations.v093.migrator.MigrationHelpers; +import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; import com.google.gson.JsonObject; import org.lightcouch.CouchDbClient; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java index b02403c0ef..7e8d29824f 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/AdapterMigration.java @@ -18,9 +18,12 @@ package org.apache.streampipes.service.core.migrations.v093; +import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; +import org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels; import org.apache.streampipes.service.core.migrations.Migration; -import org.apache.streampipes.service.core.migrations.v093.migrator.MigrationHelpers; -import org.apache.streampipes.service.core.migrations.v093.utils.AdapterMigrationUtils; +import org.apache.streampipes.service.core.migrations.v093.migrator.AdapterMigrator; +import org.apache.streampipes.service.core.migrations.v093.migrator.GenericAdapterMigrator; +import org.apache.streampipes.service.core.migrations.v093.migrator.SpecificAdapterMigrator; import org.apache.streampipes.storage.couchdb.utils.Utils; import com.google.gson.JsonObject; @@ -32,6 +35,8 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.streampipes.model.connect.adapter.migration.utils.AdapterModels.GENERIC_STREAM; + public class AdapterMigration implements Migration { private static final Logger LOG = LoggerFactory.getLogger(AdapterMigration.class); @@ -61,7 +66,7 @@ public boolean shouldExecute() { findDocsToMigrate(adapterInstanceClient, adapterInstanceUri, adaptersToMigrate); findDocsToMigrate(adapterDescriptionClient, adapterDescriptionUri, adapterDescriptionsToDelete); - return adaptersToMigrate.size() > 0 || adapterDescriptionsToDelete.size() > 0; + return adaptersToMigrate.size() > 0 || adapterDescriptionsToDelete.size() > 0; } private void findDocsToMigrate(CouchDbClient adapterClient, @@ -73,7 +78,7 @@ private void findDocsToMigrate(CouchDbClient adapterClient, rows.getAsJsonArray().forEach(row -> { var doc = row.getAsJsonObject().get("doc").getAsJsonObject(); var docType = doc.get("type").getAsString(); - if (shouldMigrate(docType)) { + if (AdapterModels.shouldMigrate(docType)) { collector.add(doc); } }); @@ -105,12 +110,10 @@ public void executeMigration() throws IOException { adaptersToMigrate.forEach(adapter -> { var adapterType = adapter.get("type").getAsString(); - if (isSetAdapter(adapterType)) { + if (AdapterModels.isSetAdapter(adapterType)) { LOG.warn("Data Set adapters are no longer supported and can't be migrated - consult docs for an alternative"); } else { - AdapterMigrationUtils - .getAdapterMigrator(adapterType) - .migrate(adapterInstanceClient, adapter); + getAdapterMigrator(adapterType).migrate(adapterInstanceClient, adapter); } }); @@ -126,18 +129,11 @@ private String getAllDocsUri(CouchDbClient client) { return client.getDBUri().toString() + "_all_docs" + "?include_docs=true"; } - private boolean isSetAdapter(String adapterClassName) { - return AdapterMigrationUtils.deprecatedAdapterSetClasses.contains(adapterClassName); - } - - private boolean shouldMigrate(String adapterClassName) { - return AdapterMigrationUtils.deprecatedAdapterClasses.contains(adapterClassName); - } - - public static void main(String[] args) throws IOException { - var migrator = new AdapterMigration(); - if (migrator.shouldExecute()) { - migrator.executeMigration(); + private AdapterMigrator getAdapterMigrator(String adapterType) { + if (adapterType.equals(GENERIC_STREAM)) { + return new GenericAdapterMigrator(new MigrationHelpers()); + } else { + return new SpecificAdapterMigrator(new MigrationHelpers()); } } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java index 18e1ea53fe..a5be002fa3 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/GenericAdapterMigrator.java @@ -18,122 +18,34 @@ package org.apache.streampipes.service.core.migrations.v093.migrator; -import org.apache.streampipes.service.core.migrations.v093.format.CsvFormatMigrator; -import org.apache.streampipes.service.core.migrations.v093.format.EmptyFormatMigrator; -import org.apache.streampipes.service.core.migrations.v093.format.JsonFormatMigrator; -import org.apache.streampipes.service.core.migrations.v093.format.XmlFormatMigrator; +import org.apache.streampipes.model.connect.adapter.migration.GenericAdapterConverter; +import org.apache.streampipes.model.connect.adapter.migration.IAdapterConverter; +import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; -import com.google.gson.JsonArray; import com.google.gson.JsonObject; import org.lightcouch.CouchDbClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.streampipes.service.core.migrations.v093.migrator.MigrationHelpers.PROPERTIES; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.CSV; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.CSV_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.GEOJSON_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.GEOJSON_NEW_KEY; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.IMAGE; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.IMAGE_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_ARRAY_KEY_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_ARRAY_KEY_NEW_KEY; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_ARRAY_NO_KEY_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_ARRAY_NO_KEY_NEW_KEY; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_OBJECT_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.JSON_OBJECT_NEW_KEY; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.XML; -import static org.apache.streampipes.service.core.migrations.v093.utils.FormatIds.XML_FORMAT_ID; -import static org.apache.streampipes.service.core.migrations.v093.utils.GenericAdapterUtils.applyFormat; -import static org.apache.streampipes.service.core.migrations.v093.utils.GenericAdapterUtils.getFormatTemplate; - public class GenericAdapterMigrator implements AdapterMigrator { private static final Logger LOG = LoggerFactory.getLogger(GenericAdapterMigrator.class); - private static final String PROTOCOL_DESC_KEY = "protocolDescription"; - private static final String FORMAT_DESC_KEY = "formatDescription"; - private static final String CONFIG_KEY = "config"; - private final MigrationHelpers helpers; + private final IAdapterConverter converter; public GenericAdapterMigrator(MigrationHelpers helpers) { this.helpers = helpers; + this.converter = new GenericAdapterConverter(false); } @Override public void migrate(CouchDbClient couchDbClient, JsonObject adapter) { var adapterName = helpers.getAdapterName(adapter); - helpers.updateType(adapter); - helpers.updateFieldType(adapter); - - JsonObject formatDescription = adapter.get(PROPERTIES).getAsJsonObject().get(FORMAT_DESC_KEY).getAsJsonObject(); - JsonObject protocolDescription = adapter.get(PROPERTIES).getAsJsonObject().get(PROTOCOL_DESC_KEY).getAsJsonObject(); - - migrateProtocolDescription(adapter, protocolDescription); - migrateFormatDescription(adapter, formatDescription); + var convertedAdapter = converter.convert(adapter); - adapter.get(PROPERTIES).getAsJsonObject().remove(FORMAT_DESC_KEY); - adapter.get(PROPERTIES).getAsJsonObject().remove(PROTOCOL_DESC_KEY); - - couchDbClient.update(adapter); + couchDbClient.update(convertedAdapter); LOG.info("Successfully migrated adapter {}", adapterName); } - - private void migrateProtocolDescription(JsonObject adapter, - JsonObject protocolDescription) { - JsonArray config = adapter.get(PROPERTIES).getAsJsonObject().get(CONFIG_KEY).getAsJsonArray(); - JsonArray protocolDescriptionConfig = protocolDescription.get(CONFIG_KEY).getAsJsonArray(); - protocolDescriptionConfig.forEach(config::add); - } - - private void migrateFormatDescription(JsonObject adapter, - JsonObject formatDescription) { - var adapterConfig = adapter - .get(PROPERTIES) - .getAsJsonObject() - .get(CONFIG_KEY) - .getAsJsonArray(); - - var formatTemplate = getFormatTemplate(); - - if (isFormat(formatDescription, JSON_OBJECT_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(JSON_OBJECT_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, JSON_ARRAY_KEY_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(JSON_ARRAY_KEY_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, JSON_ARRAY_NO_KEY_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(JSON_ARRAY_NO_KEY_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, CSV_FORMAT_ID)) { - var migrator = new CsvFormatMigrator(formatDescription); - applyFormat(CSV, formatTemplate, migrator); - } else if (isFormat(formatDescription, GEOJSON_FORMAT_ID)) { - var migrator = new JsonFormatMigrator(GEOJSON_NEW_KEY, formatDescription); - applyFormat(JSON, formatTemplate, migrator); - } else if (isFormat(formatDescription, XML_FORMAT_ID)) { - var migrator = new XmlFormatMigrator(formatDescription); - applyFormat(XML, formatTemplate, migrator); - } else if (isFormat(formatDescription, IMAGE_FORMAT_ID)) { - applyFormat(IMAGE, formatTemplate, new EmptyFormatMigrator()); - } else { - LOG.warn("Found unknown format {}", getAppId(formatDescription)); - } - - adapterConfig.add(formatTemplate); - } - - private boolean isFormat(JsonObject formatDescription, - String format) { - return getAppId(formatDescription).equals(format); - } - - private String getAppId(JsonObject formatDescription) { - return formatDescription - .getAsJsonObject() - .get(MigrationHelpers.APP_ID).getAsString(); - } } diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java index 19a4584875..0c5a7765a5 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/migrations/v093/migrator/SpecificAdapterMigrator.java @@ -18,6 +18,10 @@ package org.apache.streampipes.service.core.migrations.v093.migrator; +import org.apache.streampipes.model.connect.adapter.migration.IAdapterConverter; +import org.apache.streampipes.model.connect.adapter.migration.MigrationHelpers; +import org.apache.streampipes.model.connect.adapter.migration.SpecificAdapterConverter; + import com.google.gson.JsonObject; import org.lightcouch.CouchDbClient; import org.slf4j.Logger; @@ -27,19 +31,20 @@ public class SpecificAdapterMigrator implements AdapterMigrator { private static final Logger LOG = LoggerFactory.getLogger(SpecificAdapterMigrator.class); private final MigrationHelpers helpers; + private final IAdapterConverter converter; public SpecificAdapterMigrator(MigrationHelpers helpers) { this.helpers = helpers; + this.converter = new SpecificAdapterConverter(false); } @Override public void migrate(CouchDbClient couchDbClient, JsonObject adapter) { var adapterName = helpers.getAdapterName(adapter); - helpers.updateType(adapter); - helpers.updateFieldType(adapter); + var convertedAdapter = converter.convert(adapter); - couchDbClient.update(adapter); + couchDbClient.update(convertedAdapter); LOG.info("Successfully migrated adapter {}", adapterName); }