Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): add --define flag to apply command #7401

Merged
merged 13 commits into from
Apr 21, 2021
15 changes: 15 additions & 0 deletions docs/operate-and-deploy/migrations-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ ksql-migrations {-c | --config-file} <config-file> apply
[ {-n | --next} ]
[ {-u | --until} <untilVersion> ]
[ {-v | --version} <version> ]
[ {-d | --define} <variableName>=<variableValue> ]
[ --dry-run ]
```

Expand All @@ -239,6 +240,20 @@ to apply:
In addition to selecting a mode for `ksql-migrations apply`, you must also provide
the path to the config file of your migrations project as part of the command.

You can define variables by passing the `--define` flag followed by a string of the form
`name=value` any number of times. For example, the following command

```bash
$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next -d foo=bar -d car=3
```

is equivalent to having the following lines at the begining of each migration file:

```
DEFINE foo='bar';
DEFINE car='3';
```

You can optionally use the `--dry-run` flag to see which migration file(s) the
command will apply before running the actual `ksql-migrations apply` command
to update your ksqlDB cluster. The dry run does not validate whether the ksqlDB
Expand Down
19 changes: 2 additions & 17 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,10 @@
import com.github.rvesse.airline.annotations.restrictions.ranges.LongRange;
import com.github.rvesse.airline.help.Help;
import com.github.rvesse.airline.parser.errors.ParseException;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.parser.VariableParser;
import io.confluent.ksql.rest.client.BasicCredentials;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -233,20 +232,6 @@ public Optional<String> getScriptFile() {
}

public Map<String, String> getVariables() {
if (definedVars == null) {
return Collections.emptyMap();
}

final ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
for (String pair : definedVars) {
final String[] parts = pair.split("=");
if (parts.length != 2) {
throw new IllegalArgumentException("Variables must be defined using '=' (i.e. var=val).");
}

variables.put(parts[0], parts[1]);
}

return variables.build();
return VariableParser.getVariables(definedVars);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"; you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.parser;
jzaralim marked this conversation as resolved.
Show resolved Hide resolved

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public final class VariableParser {
private VariableParser() {
}

/**
* Parses a list of Strings of the form, var=val into a map of variables and values.
*/
public static Map<String, String> getVariables(final List<String> definedVars) {
if (definedVars == null) {
return Collections.emptyMap();
}

final ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
for (String pair : definedVars) {
final String[] parts = pair.split("=");
if (parts.length != 2) {
throw new IllegalArgumentException("Failed to parse argument " + pair
+ ": variables must be defined using '=' (i.e. var=val).");
}

variables.put(parts[0], parts[1]);
}

return variables.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.confluent.ksql.api.client.FieldInfo;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.parser.VariableParser;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.CommandParser;
Expand Down Expand Up @@ -122,6 +123,14 @@ public class ApplyMigrationCommand extends BaseCommand {
@Once
private boolean dryRun = false;

@Option(
name = {"--define", "-d"},
description = "Define variables for the session. This is equivalent to including DEFINE "
+ "statements before each migration. The `--define` option should be followed by a "
+ "string of the form `name=value` and may be passed any number of times."
)
private List<String> definedVars = null;

@Override
protected int command() {
if (!validateConfigFilePresent()) {
Expand Down Expand Up @@ -337,7 +346,7 @@ private void executeCommands(
final String previous,
final boolean validateOnly
) {
cleanUpJavaClientVariables(ksqlClient);
setUpJavaClientVariables(ksqlClient);
final Map<String, Object> properties = new HashMap<>();
for (final String command : commands) {
try {
Expand All @@ -360,8 +369,13 @@ private void executeCommands(
}
}

private void cleanUpJavaClientVariables(final Client ksqlClient) {
private void setUpJavaClientVariables(final Client ksqlClient) {
ksqlClient.getVariables().forEach((k, v) -> ksqlClient.undefine(k));
try {
VariableParser.getVariables(definedVars).forEach((k, v) -> ksqlClient.define(k, v));
} catch (IllegalArgumentException e) {
throw new MigrationException(e.getMessage());
}
}

private void executeCommand(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private void shouldApplyMigrations() throws Exception {
1,
"foo FOO fO0",
configFilePath,
"CREATE STREAM FOO (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" +
"CREATE STREAM ${streamName} (A STRING) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');\n" +
"-- let's create some connectors!!!\n" +
"CREATE SOURCE CONNECTOR C WITH ('connector.class'='org.apache.kafka.connect.tools.MockSourceConnector');\n" +
"CREATE SINK CONNECTOR D WITH ('connector.class'='org.apache.kafka.connect.tools.MockSinkConnector', 'topics'='d');\n" +
Expand All @@ -235,13 +235,13 @@ private void shouldApplyMigrations() throws Exception {
2,
"bar_bar_BAR",
configFilePath,
"CREATE OR REPLACE STREAM FOO (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');"
+ "ALTER STREAM FOO ADD COLUMN C BIGINT;" +
"CREATE OR REPLACE STREAM ${streamName} (A STRING, B INT) WITH (KAFKA_TOPIC='FOO', PARTITIONS=1, VALUE_FORMAT='JSON');"
+ "ALTER STREAM ${streamName} ADD COLUMN C BIGINT;" +
"/* add some '''data''' to FOO */" +
"DEFINE variable = '50';" +
"INSERT INTO FOO VALUES ('HELLO', ${variable}, -4325);" +
"INSERT INTO FOO (A) VALUES ('GOOD''BYE');" +
"INSERT INTO FOO (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" +
"INSERT INTO ${streamName} (A) VALUES ('${onlyDefinedInFile1}--ha\nha');" +
"INSERT INTO FOO (A) VALUES ('');" +
"DEFINE variable = 'cool';" +
"SET 'ksql.output.topic.name.prefix' = '${variable}';" +
Expand All @@ -260,7 +260,7 @@ private void shouldApplyMigrations() throws Exception {
);

// When:
final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a").runCommand();
final int applyStatus = MIGRATIONS_CLI.parse("--config-file", configFilePath, "apply", "-a", "-d", "streamName=FOO").runCommand();

// Then:
assertThat(applyStatus, is(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,70 @@ public void shouldResetVariablesBetweenMigrations() throws Exception {
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldApplyArgumentVariablesEveryMigration() throws Exception {
// Given:
command = PARSER.parse("-a", "-d", "name=tame", "-d", "dame=blame");
createMigrationFile(1, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${name}');");
createMigrationFile(2, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${dame}');");
when(versionQueryResult.get()).thenReturn(ImmutableList.of());
when(ksqlClient.getVariables()).thenReturn(
ImmutableMap.of("name", "tame", "dame", "blame")
);
givenAppliedMigration(1, NAME, MigrationState.MIGRATED);

// When:
final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "tame")));
inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "blame")));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}

@Test
public void defineStatementsShouldTakePrecedenceOverArgumentVariables() throws Exception {
// Given:
command = PARSER.parse("-a", "-d", "name=tame");
createMigrationFile(1, NAME, migrationsDir, "DEFINE name='flame'; INSERT INTO FOO VALUES ('${name}');");
when(versionQueryResult.get()).thenReturn(ImmutableList.of());
when(ksqlClient.getVariables()).thenReturn(
ImmutableMap.of("name", "flame")
);
givenAppliedMigration(1, NAME, MigrationState.MIGRATED);

// When:
final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(0));
final InOrder inOrder = inOrder(ksqlClient);
inOrder.verify(ksqlClient).define("name", "flame");
inOrder.verify(ksqlClient).insertInto("`FOO`", new KsqlObject(ImmutableMap.of("`A`", "flame")));
inOrder.verify(ksqlClient).close();
inOrder.verifyNoMoreInteractions();
}

@Test
public void shouldFailOnInvalidArgumentVariable() throws Exception {
// Given:
command = PARSER.parse("-a", "-d", "woooo");
createMigrationFile(1, NAME, migrationsDir, "INSERT INTO FOO VALUES ('${name}');");
when(versionQueryResult.get()).thenReturn(ImmutableList.of());

// When:
final int result = command.command(config, cfg -> ksqlClient, migrationsDir, Clock.fixed(
Instant.ofEpochMilli(1000), ZoneId.systemDefault()));

// Then:
assertThat(result, is(1));
}

jzaralim marked this conversation as resolved.
Show resolved Hide resolved
@Test
public void shouldResetPropertiesBetweenMigrations() throws Exception {
// Given:
Expand Down