diff --git a/docs/operate-and-deploy/migrations-tool.md b/docs/operate-and-deploy/migrations-tool.md index 0b5738c4147f..7f8a3fc72c89 100644 --- a/docs/operate-and-deploy/migrations-tool.md +++ b/docs/operate-and-deploy/migrations-tool.md @@ -224,6 +224,7 @@ ksql-migrations {-c | --config-file} apply [ {-n | --next} ] [ {-u | --until} ] [ {-v | --version} ] + [ {-d | --define} = ] [ --dry-run ] ``` @@ -239,6 +240,20 @@ to apply: In addition to selecting a mode for `ksql-migrations apply`, you must also provide the path to the config file of your migrations project as part of the command. +You can define variables by passing the `--define` flag followed by a string of the form +`name=value` any number of times. For example, the following command + +```bash +$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next -d foo=bar -d car=3 +``` + +is equivalent to having the following lines at the begining of each migration file: + +``` +DEFINE foo='bar'; +DEFINE car='3'; +``` + You can optionally use the `--dry-run` flag to see which migration file(s) the command will apply before running the actual `ksql-migrations apply` command to update your ksqlDB cluster. The dry run does not validate whether the ksqlDB diff --git a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java index dd27c7aad444..6b91294a2d62 100644 --- a/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java +++ b/ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java @@ -24,11 +24,10 @@ import com.github.rvesse.airline.annotations.restrictions.ranges.LongRange; import com.github.rvesse.airline.help.Help; import com.github.rvesse.airline.parser.errors.ParseException; -import com.google.common.collect.ImmutableMap; import io.confluent.ksql.cli.console.OutputFormat; +import io.confluent.ksql.parser.VariableParser; import io.confluent.ksql.rest.client.BasicCredentials; import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -233,20 +232,6 @@ public Optional getScriptFile() { } public Map getVariables() { - if (definedVars == null) { - return Collections.emptyMap(); - } - - final ImmutableMap.Builder variables = ImmutableMap.builder(); - for (String pair : definedVars) { - final String[] parts = pair.split("="); - if (parts.length != 2) { - throw new IllegalArgumentException("Variables must be defined using '=' (i.e. var=val)."); - } - - variables.put(parts[0], parts[1]); - } - - return variables.build(); + return VariableParser.getVariables(definedVars); } } diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/parser/VariableParser.java b/ksqldb-common/src/main/java/io/confluent/ksql/parser/VariableParser.java new file mode 100644 index 000000000000..47beb7ef2456 --- /dev/null +++ b/ksqldb-common/src/main/java/io/confluent/ksql/parser/VariableParser.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"; you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.parser; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public final class VariableParser { + private VariableParser() { + } + + /** + * Parses a list of Strings of the form, var=val into a map of variables and values. + */ + public static Map getVariables(final List definedVars) { + if (definedVars == null) { + return Collections.emptyMap(); + } + + final ImmutableMap.Builder variables = ImmutableMap.builder(); + for (String pair : definedVars) { + final String[] parts = pair.split("="); + if (parts.length != 2) { + throw new IllegalArgumentException("Failed to parse argument " + pair + + ": variables must be defined using '=' (i.e. var=val)."); + } + + variables.put(parts[0], parts[1]); + } + + return variables.build(); + } +} diff --git a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java index b90d275be40b..47b64072024f 100644 --- a/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java +++ b/ksqldb-tools/src/main/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommand.java @@ -30,6 +30,7 @@ import io.confluent.ksql.api.client.FieldInfo; import io.confluent.ksql.api.client.KsqlObject; import io.confluent.ksql.execution.expression.tree.Expression; +import io.confluent.ksql.parser.VariableParser; import io.confluent.ksql.tools.migrations.MigrationConfig; import io.confluent.ksql.tools.migrations.MigrationException; import io.confluent.ksql.tools.migrations.util.CommandParser; @@ -122,6 +123,14 @@ public class ApplyMigrationCommand extends BaseCommand { @Once private boolean dryRun = false; + @Option( + name = {"--define", "-d"}, + description = "Define variables for the session. This is equivalent to including DEFINE " + + "statements before each migration. The `--define` option should be followed by a " + + "string of the form `name=value` and may be passed any number of times." + ) + private List definedVars = null; + @Override protected int command() { if (!validateConfigFilePresent()) { @@ -337,7 +346,7 @@ private void executeCommands( final String previous, final boolean validateOnly ) { - cleanUpJavaClientVariables(ksqlClient); + setUpJavaClientVariables(ksqlClient); final Map properties = new HashMap<>(); for (final String command : commands) { try { @@ -360,8 +369,13 @@ private void executeCommands( } } - private void cleanUpJavaClientVariables(final Client ksqlClient) { + private void setUpJavaClientVariables(final Client ksqlClient) { ksqlClient.getVariables().forEach((k, v) -> ksqlClient.undefine(k)); + try { + VariableParser.getVariables(definedVars).forEach((k, v) -> ksqlClient.define(k, v)); + } catch (IllegalArgumentException e) { + throw new MigrationException(e.getMessage()); + } } private void executeCommand( diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java index a23308473039..ce1caf7044c6 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/MigrationsTest.java @@ -223,7 +223,7 @@ private void shouldApplyMigrations() throws Exception { 1, "foo FOO fO0", configFilePath, - "CREATE STREAM FOO (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" + + "CREATE STREAM ${streamName} (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" + "-- let's create some connectors!!!\n" + "CREATE SOURCE CONNECTOR C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" + "CREATE SINK CONNECTOR D WITH ('connector.class'='org.apache.kafka.connect.tools.MockSinkConnector', 'topics'='d');\n" + @@ -235,13 +235,13 @@ private void shouldApplyMigrations() throws Exception { 2, "bar_bar_BAR", configFilePath, - "CREATE OR REPLACE STREAM FOO (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');" - + "ALTER STREAM FOO ADD COLUMN C BIGINT;" + + "CREATE OR REPLACE STREAM ${streamName} (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');" + + "ALTER STREAM ${streamName} ADD COLUMN C BIGINT;" + "/* add some '''data''' to FOO */" + "DEFINE variable = '50';" + "INSERT INTO FOO VALUES ('HELLO', ${variable}, -4325);" + "INSERT INTO FOO (A) VALUES ('GOOD''BYE');" + - "INSERT INTO FOO (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" + + "INSERT INTO ${streamName} (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" + "INSERT INTO FOO (A) VALUES ('');" + "DEFINE variable = 'cool';" + "SET 'ksql.output.topic.name.prefix' = '${variable}';" + @@ -260,7 +260,7 @@ private void shouldApplyMigrations() throws Exception { ); // When: - final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").runCommand(); + final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a", "-d", "streamName=FOO").runCommand(); // Then: assertThat(applyStatus, is(0)); diff --git a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java index 3e5760c62aad..b663daf12ea7 100644 --- a/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java +++ b/ksqldb-tools/src/test/java/io/confluent/ksql/tools/migrations/commands/ApplyMigrationCommandTest.java @@ -273,6 +273,70 @@ public void shouldResetVariablesBetweenMigrations() throws Exception { inOrder.verifyNoMoreInteractions(); } + @Test + public void shouldApplyArgumentVariablesEveryMigration() throws Exception { + // Given: + command = PARSER.parse("-a", "-d", "name=tame", "-d", "dame=blame"); + createMigrationFile(1, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${name}');"); + createMigrationFile(2, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${dame}');"); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + when(ksqlClient.getVariables()).thenReturn( + ImmutableMap.of("name", "tame", "dame", "blame") + ); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(0)); + final InOrder inOrder = inOrder(ksqlClient); + inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "tame"))); + inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "blame"))); + inOrder.verify(ksqlClient).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void defineStatementsShouldTakePrecedenceOverArgumentVariables() throws Exception { + // Given: + command = PARSER.parse("-a", "-d", "name=tame"); + createMigrationFile(1, NAME, migrationsDir, "DEFINE name='flame'; INSERT INTO FOO VALUES ('${name}');"); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + when(ksqlClient.getVariables()).thenReturn( + ImmutableMap.of("name", "flame") + ); + givenAppliedMigration(1, NAME, MigrationState.MIGRATED); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(0)); + final InOrder inOrder = inOrder(ksqlClient); + inOrder.verify(ksqlClient).define("name", "flame"); + inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "flame"))); + inOrder.verify(ksqlClient).close(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void shouldFailOnInvalidArgumentVariable() throws Exception { + // Given: + command = PARSER.parse("-a", "-d", "woooo"); + createMigrationFile(1, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${name}');"); + when(versionQueryResult.get()).thenReturn(ImmutableList.of()); + + // When: + final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed( + Instant.ofEpochMilli(1000), ZoneId.systemDefault())); + + // Then: + assertThat(result, is(1)); + } + @Test public void shouldResetPropertiesBetweenMigrations() throws Exception { // Given: