Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(migrations): add --define flag to apply command #7401

Merged
merged 13 commits into from
Apr 21, 2021
21 changes: 21 additions & 0 deletions docs/operate-and-deploy/migrations-tool.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ types of ksqlDB statements:
* `DROP TYPE`
* `SET <property>`
* `UNSET <property>`
* `DEFINE <variable>`
* `UNDEFINE <variable>`

Any properties or variables set using the `SET`, `UNSET`, `DEFINE` and `UNDEFINE` are applied in the
current migration file only. They do not carry over to the next migration file, even if multiple
migration files are applied as part of the same `ksql-migrations apply` command

Requirements and Installation
-----------------------------
Expand Down Expand Up @@ -218,6 +224,7 @@ ksql-migrations {-c | --config-file} <config-file> apply
[ {-n | --next} ]
[ {-u | --until} <untilVersion> ]
[ {-v | --version} <version> ]
[ {-d | --define} <variableName>=<variableValue> ]
[ --dry-run ]
```

Expand All @@ -233,6 +240,20 @@ to apply:
In addition to selecting a mode for `ksql-migrations apply`, you must also provide
the path to the config file of your migrations project as part of the command.

You can define variables by passing the `--define` flag followed by a string of the form
`name=value` any number of times. For example, the following command

```
jzaralim marked this conversation as resolved.
Show resolved Hide resolved
$ ksql-migrations --config-file /my/migrations/project/ksql-migrations.properties apply --next -d foo=bar -d car=3
```

is equivalent to having the following lines at the begining of each migration file:

```
DEFINE foo='bar';
DEFINE car='3';
```

You can optionally use the `--dry-run` flag to see which migration file(s) the
command will apply before running the actual `ksql-migrations apply` command
to update your ksqlDB cluster. The dry run does not validate whether the ksqlDB
Expand Down
17 changes: 2 additions & 15 deletions ksqldb-cli/src/main/java/io/confluent/ksql/cli/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.github.rvesse.airline.parser.errors.ParseException;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.cli.console.OutputFormat;
import io.confluent.ksql.parser.VariableParser;
import io.confluent.ksql.rest.client.BasicCredentials;
import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -233,20 +234,6 @@ public Optional<String> getScriptFile() {
}

public Map<String, String> getVariables() {
if (definedVars == null) {
return Collections.emptyMap();
}

final ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
for (String pair : definedVars) {
final String[] parts = pair.split("=");
if (parts.length != 2) {
throw new IllegalArgumentException("Variables must be defined using '=' (i.e. var=val).");
}

variables.put(parts[0], parts[1]);
}

return variables.build();
return VariableParser.getVariables(definedVars);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.confluent.ksql.parser;
jzaralim marked this conversation as resolved.
Show resolved Hide resolved

import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class VariableParser {

/**
* Parses a list of Strings of the form, var=val into a map of variables and values.
*/
public static Map<String, String> getVariables(final List<String> definedVars) {
if (definedVars == null) {
return Collections.emptyMap();
}

final ImmutableMap.Builder<String, String> variables = ImmutableMap.builder();
for (String pair : definedVars) {
final String[] parts = pair.split("=");
if (parts.length != 2) {
throw new IllegalArgumentException("Variables must be defined using '=' (i.e. var=val).");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not your code but let's take this opportunity to improve the error message (the migrations tool displays this error directly so we should make it more informative). Let's include the bad argument in the error message.

}

variables.put(parts[0], parts[1]);
}

return variables.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.text.StringSubstitutor;

public final class VariableSubstitutor {
Expand Down Expand Up @@ -57,6 +58,16 @@ public static Set<String> lookup(final String text) {
}
}

public static String substitute(
final String string,
final Map<String, String> valueMap
) {
return StringSubstitutor.replace(
string, valueMap.entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> sanitize(e.getValue())))
);
}

public static String substitute(
final KsqlParser.ParsedStatement parsedStatement,
final Map<String, String> valueMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ public void shouldThrowOnSQLInjection() {
assertThrowOnInvalidVariables(statements, variablesMap);
}

@Test
public void shouldSubstituteVariablesInString() {
// Given
final Map<String, String> variablesMap = new ImmutableMap.Builder<String, String>() {{
put("event", "birthday");
}}.build();

// When
final String substituted = VariableSubstitutor.substitute("Happy ${event} to you!", variablesMap);

// Then
assertThat(substituted, equalTo("Happy birthday to you!"));
}

private void assertReplacedStatements(
final List<Pair<String, String>> statements,
final Map<String, String> variablesMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
import io.confluent.ksql.api.client.FieldInfo;
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.parser.VariableParser;
import io.confluent.ksql.tools.migrations.MigrationConfig;
import io.confluent.ksql.tools.migrations.MigrationException;
import io.confluent.ksql.tools.migrations.util.CommandParser;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlCreateConnectorStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDefineVariableCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlDropConnectorStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlInsertValues;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlPropertyCommand;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlStatement;
import io.confluent.ksql.tools.migrations.util.CommandParser.SqlUndefineVariableCommand;
import io.confluent.ksql.tools.migrations.util.MetadataUtil;
import io.confluent.ksql.tools.migrations.util.MetadataUtil.MigrationState;
import io.confluent.ksql.tools.migrations.util.MigrationFile;
Expand Down Expand Up @@ -120,6 +123,14 @@ public class ApplyMigrationCommand extends BaseCommand {
@Once
private boolean dryRun = false;

@Option(
name = {"--define", "-d"},
description = "Define variables for the session. This is equivalent to including DEFINE "
+ "statements before each migration. The `--define` option should be followed by a "
+ "string of the form `name=value` and may be passed any number of times."
)
private List<String> definedVars = null;

@Override
protected int command() {
if (!validateConfigFilePresent()) {
Expand Down Expand Up @@ -311,22 +322,84 @@ private void executeCommands(
final Clock clock,
final String previous
) {
final List<String> commands = CommandParser.splitSql(migrationFileContent);

executeCommands(
commands, ksqlClient, config, executionStart, migration, clock, previous, true);
executeCommands(
commands, ksqlClient, config, executionStart, migration, clock, previous, false);
}

/**
* If validateOnly is set to true, then this parses each of the commands but only executes
* DEFINE/UNDEFINE commands (variables are needed for parsing INSERT INTO... VALUES, SET/UNSET
* and DEFINE commands). If validateOnly is set to false, then each command will execute after
* parsing.
*/
private void executeCommands(
final List<String> commands,
final Client ksqlClient,
final MigrationConfig config,
final String executionStart,
final MigrationFile migration,
final Clock clock,
final String previous,
final boolean validateOnly
) {
setUpJavaClientVariables(ksqlClient);
final Map<String, Object> properties = new HashMap<>();
final List<SqlCommand> commands = CommandParser.parse(migrationFileContent);
for (final SqlCommand command : commands) {
for (final String command : commands) {
try {
executeCommand(command, ksqlClient, properties);
} catch (InterruptedException | ExecutionException e) {
final Map<String, String> variables = ksqlClient.getVariables().entrySet()
.stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().toString()));
executeCommand(
CommandParser.transformToSqlCommand(command, variables),
ksqlClient,
properties,
validateOnly
);
} catch (InterruptedException | ExecutionException | MigrationException e) {
final String action = validateOnly ? "parse" : "execute";
final String errorMsg = String.format(
"Failed to execute sql: %s. Error: %s", command.getCommand(), e.getMessage());
"Failed to %s sql: %s. Error: %s", action, command, e.getMessage());
updateState(config, ksqlClient, MigrationState.ERROR,
executionStart, migration, clock, previous, Optional.of(errorMsg));
throw new MigrationException(errorMsg);
}
}
}

private void setUpJavaClientVariables(final Client ksqlClient) {
ksqlClient.getVariables().forEach((k, v) -> ksqlClient.undefine(k));
try {
VariableParser.getVariables(definedVars).forEach((k, v) -> ksqlClient.define(k, v));
} catch(IllegalArgumentException e) {
throw new MigrationException(e.getMessage());
}
}

private void executeCommand(
final SqlCommand command,
final Client ksqlClient,
final Map<String, Object> properties,
final boolean defineUndefineOnly
) throws ExecutionException, InterruptedException {
if (command instanceof SqlDefineVariableCommand) {
ksqlClient.define(
((SqlDefineVariableCommand) command).getVariable(),
((SqlDefineVariableCommand) command).getValue()
);
} else if (command instanceof SqlUndefineVariableCommand) {
ksqlClient.undefine(((SqlUndefineVariableCommand) command).getVariable());
} else if (!defineUndefineOnly) {
executeNonVariableCommands(command, ksqlClient, properties);
}
}

/**
* Executes everything besides define/undefine commands
*/
private void executeNonVariableCommands(
final SqlCommand command,
final Client ksqlClient,
final Map<String, Object> properties
Expand Down
Loading