Skip to content

Commit

Permalink
Merge session unprocessed catalog properties into normal properties
Browse files Browse the repository at this point in the history
  • Loading branch information
dain committed Feb 6, 2022
1 parent a953b7b commit 8b015fd
Show file tree
Hide file tree
Showing 12 changed files with 30 additions and 107 deletions.
59 changes: 18 additions & 41 deletions core/trino-main/src/main/java/io/trino/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public final class Session
private final Instant start;
private final Map<String, String> systemProperties;
// TODO use Table
private final Map<String, Map<String, String>> connectorProperties;
// TODO use Table
private final Map<String, Map<String, String>> unprocessedCatalogProperties;
private final Map<String, Map<String, String>> catalogProperties;
private final SessionPropertyManager sessionPropertyManager;
private final Map<String, String> preparedStatements;
private final ProtocolHeaders protocolHeaders;
Expand All @@ -104,8 +102,7 @@ public Session(
ResourceEstimates resourceEstimates,
Instant start,
Map<String, String> systemProperties,
Map<String, Map<String, String>> connectorProperties,
Map<String, Map<String, String>> unprocessedCatalogProperties,
Map<String, Map<String, String>> catalogProperties,
SessionPropertyManager sessionPropertyManager,
Map<String, String> preparedStatements,
ProtocolHeaders protocolHeaders)
Expand Down Expand Up @@ -133,19 +130,12 @@ public Session(
this.preparedStatements = requireNonNull(preparedStatements, "preparedStatements is null");
this.protocolHeaders = requireNonNull(protocolHeaders, "protocolHeaders is null");

requireNonNull(catalogProperties, "catalogProperties is null");
ImmutableMap.Builder<String, Map<String, String>> catalogPropertiesBuilder = ImmutableMap.builder();
connectorProperties.entrySet().stream()
catalogProperties.entrySet().stream()
.map(entry -> Maps.immutableEntry(entry.getKey(), ImmutableMap.copyOf(entry.getValue())))
.forEach(catalogPropertiesBuilder::put);
this.connectorProperties = catalogPropertiesBuilder.buildOrThrow();

ImmutableMap.Builder<String, Map<String, String>> unprocessedCatalogPropertiesBuilder = ImmutableMap.builder();
unprocessedCatalogProperties.entrySet().stream()
.map(entry -> Maps.immutableEntry(entry.getKey(), ImmutableMap.copyOf(entry.getValue())))
.forEach(unprocessedCatalogPropertiesBuilder::put);
this.unprocessedCatalogProperties = unprocessedCatalogPropertiesBuilder.buildOrThrow();

checkArgument(transactionId.isEmpty() || unprocessedCatalogProperties.isEmpty(), "Catalog session properties cannot be set if there is an open transaction");
this.catalogProperties = catalogPropertiesBuilder.buildOrThrow();

checkArgument(catalog.isPresent() || schema.isEmpty(), "schema is set but catalog is not");
}
Expand Down Expand Up @@ -256,19 +246,14 @@ public <T> T getSystemProperty(String name, Class<T> type)
return sessionPropertyManager.decodeSystemPropertyValue(name, systemProperties.get(name), type);
}

public Map<String, Map<String, String>> getConnectorProperties()
public Map<String, Map<String, String>> getCatalogProperties()
{
return connectorProperties;
return catalogProperties;
}

public Map<String, String> getConnectorProperties(String catalogName)
public Map<String, String> getCatalogProperties(String catalogName)
{
return connectorProperties.getOrDefault(catalogName, ImmutableMap.of());
}

public Map<String, Map<String, String>> getUnprocessedCatalogProperties()
{
return unprocessedCatalogProperties;
return catalogProperties.getOrDefault(catalogName, ImmutableMap.of());
}

public Map<String, String> getSystemProperties()
Expand Down Expand Up @@ -309,7 +294,7 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage

// Now that there is a transaction, the catalog name can be resolved to a connector, and the catalog properties can be validated
ImmutableMap.Builder<String, Map<String, String>> connectorProperties = ImmutableMap.builder();
for (Entry<String, Map<String, String>> catalogEntry : unprocessedCatalogProperties.entrySet()) {
for (Entry<String, Map<String, String>> catalogEntry : this.catalogProperties.entrySet()) {
String catalogName = catalogEntry.getKey();
Map<String, String> catalogProperties = catalogEntry.getValue();
if (catalogProperties.isEmpty()) {
Expand Down Expand Up @@ -359,7 +344,6 @@ public Session beginTransactionId(TransactionId transactionId, TransactionManage
start,
systemProperties,
connectorProperties.buildOrThrow(),
ImmutableMap.of(),
sessionPropertyManager,
preparedStatements,
protocolHeaders);
Expand All @@ -371,22 +355,18 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,
requireNonNull(catalogPropertyDefaults, "catalogPropertyDefaults is null");

checkState(transactionId.isEmpty(), "property defaults can not be added to a transaction already in progress");
checkState(connectorProperties.isEmpty(), "catalog properties have already been processed");

// NOTE: properties should not be validated here and instead will be validated in beginTransactionId
Map<String, String> systemProperties = new HashMap<>();
systemProperties.putAll(systemPropertyDefaults);
systemProperties.putAll(this.systemProperties);

Map<String, Map<String, String>> unprocessedCatalogProperties = catalogPropertyDefaults.entrySet().stream()
Map<String, Map<String, String>> catalogProperties = catalogPropertyDefaults.entrySet().stream()
.map(entry -> Maps.immutableEntry(entry.getKey(), new HashMap<>(entry.getValue())))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
for (Entry<String, Map<String, String>> catalogProperties : this.unprocessedCatalogProperties.entrySet()) {
String catalog = catalogProperties.getKey();
for (Entry<String, String> entry : catalogProperties.getValue().entrySet()) {
unprocessedCatalogProperties.computeIfAbsent(catalog, id -> new HashMap<>())
.put(entry.getKey(), entry.getValue());
}
for (Entry<String, Map<String, String>> catalogEntry : this.catalogProperties.entrySet()) {
catalogProperties.computeIfAbsent(catalogEntry.getKey(), id -> new HashMap<>())
.putAll(catalogEntry.getValue());
}

return new Session(
Expand All @@ -409,8 +389,7 @@ public Session withDefaultProperties(Map<String, String> systemPropertyDefaults,
resourceEstimates,
start,
systemProperties,
ImmutableMap.of(),
unprocessedCatalogProperties,
catalogProperties,
sessionPropertyManager,
preparedStatements,
protocolHeaders);
Expand All @@ -433,7 +412,7 @@ public ConnectorSession toConnectorSession(CatalogName catalogName)
return new FullConnectorSession(
this,
identity.toConnectorIdentity(catalogName.getCatalogName()),
connectorProperties.getOrDefault(catalogName.getCatalogName(), ImmutableMap.of()),
catalogProperties.getOrDefault(catalogName.getCatalogName(), ImmutableMap.of()),
catalogName,
catalogName.getCatalogName(),
sessionPropertyManager);
Expand Down Expand Up @@ -464,8 +443,7 @@ public SessionRepresentation toSessionRepresentation()
resourceEstimates,
start,
systemProperties,
connectorProperties,
unprocessedCatalogProperties,
catalogProperties,
identity.getCatalogRoles(),
preparedStatements,
protocolHeaders.getProtocolName());
Expand Down Expand Up @@ -590,7 +568,7 @@ private SessionBuilder(Session session)
this.clientTags = ImmutableSet.copyOf(session.clientTags);
this.start = session.start;
this.systemProperties.putAll(session.systemProperties);
session.unprocessedCatalogProperties
session.catalogProperties
.forEach((catalog, properties) -> catalogSessionProperties.put(catalog, new HashMap<>(properties)));
this.preparedStatements.putAll(session.preparedStatements);
this.protocolHeaders = session.protocolHeaders;
Expand Down Expand Up @@ -819,7 +797,6 @@ public Session build()
Optional.ofNullable(resourceEstimates).orElse(new ResourceEstimateBuilder().build()),
start,
systemProperties,
ImmutableMap.of(),
catalogSessionProperties,
sessionPropertyManager,
preparedStatements,
Expand Down
15 changes: 0 additions & 15 deletions core/trino-main/src/main/java/io/trino/SessionRepresentation.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ public final class SessionRepresentation
private final ResourceEstimates resourceEstimates;
private final Map<String, String> systemProperties;
private final Map<String, Map<String, String>> catalogProperties;
private final Map<String, Map<String, String>> unprocessedCatalogProperties;
private final Map<String, SelectedRole> catalogRoles;
private final Map<String, String> preparedStatements;
private final String protocolName;
Expand Down Expand Up @@ -93,7 +92,6 @@ public SessionRepresentation(
@JsonProperty("start") Instant start,
@JsonProperty("systemProperties") Map<String, String> systemProperties,
@JsonProperty("catalogProperties") Map<String, Map<String, String>> catalogProperties,
@JsonProperty("unprocessedCatalogProperties") Map<String, Map<String, String>> unprocessedCatalogProperties,
@JsonProperty("catalogRoles") Map<String, SelectedRole> catalogRoles,
@JsonProperty("preparedStatements") Map<String, String> preparedStatements,
@JsonProperty("protocolName") String protocolName)
Expand Down Expand Up @@ -129,12 +127,6 @@ public SessionRepresentation(
catalogPropertiesBuilder.put(entry.getKey(), ImmutableMap.copyOf(entry.getValue()));
}
this.catalogProperties = catalogPropertiesBuilder.buildOrThrow();

ImmutableMap.Builder<String, Map<String, String>> unprocessedCatalogPropertiesBuilder = ImmutableMap.builder();
for (Entry<String, Map<String, String>> entry : unprocessedCatalogProperties.entrySet()) {
unprocessedCatalogPropertiesBuilder.put(entry.getKey(), ImmutableMap.copyOf(entry.getValue()));
}
this.unprocessedCatalogProperties = unprocessedCatalogPropertiesBuilder.buildOrThrow();
}

@JsonProperty
Expand Down Expand Up @@ -275,12 +267,6 @@ public Map<String, Map<String, String>> getCatalogProperties()
return catalogProperties;
}

@JsonProperty
public Map<String, Map<String, String>> getUnprocessedCatalogProperties()
{
return unprocessedCatalogProperties;
}

@JsonProperty
public Map<String, SelectedRole> getCatalogRoles()
{
Expand Down Expand Up @@ -349,7 +335,6 @@ public Session toSession(SessionPropertyManager sessionPropertyManager, Map<Stri
start,
systemProperties,
catalogProperties,
unprocessedCatalogProperties,
sessionPropertyManager,
preparedStatements,
createProtocolHeaders(protocolName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,6 @@ private static Map<String, String> mergeSessionAndCatalogProperties(SessionRepre
{
Map<String, String> mergedProperties = new LinkedHashMap<>(session.getSystemProperties());

// Either processed or unprocessed catalog properties, but not both. Instead of trying to enforces this while
// firing events, allow both to be set and if there is a duplicate favor the processed properties.
for (Map.Entry<String, Map<String, String>> catalogEntry : session.getUnprocessedCatalogProperties().entrySet()) {
for (Map.Entry<String, String> entry : catalogEntry.getValue().entrySet()) {
mergedProperties.put(catalogEntry.getKey() + "." + entry.getKey(), entry.getValue());
}
}
for (Map.Entry<String, Map<String, String>> catalogEntry : session.getCatalogProperties().entrySet()) {
for (Map.Entry<String, String> entry : catalogEntry.getValue().entrySet()) {
mergedProperties.put(catalogEntry.getKey() + "." + entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public List<SessionPropertyValue> getAllSessionProperties(Session session, Map<S
for (Entry<String, CatalogName> entry : new TreeMap<>(catalogs).entrySet()) {
String catalog = entry.getKey();
CatalogName catalogName = entry.getValue();
Map<String, String> connectorProperties = session.getConnectorProperties(catalog);
Map<String, String> connectorProperties = session.getCatalogProperties(catalog);

for (PropertyMetadata<?> property : new TreeMap<>(connectorSessionProperties.get(catalogName)).values()) {
String defaultValue = firstNonNull(property.getDefaultValue(), "").toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,7 @@ private LocalQueryRunner(
defaultSession.getResourceEstimates(),
defaultSession.getStart(),
defaultSession.getSystemProperties(),
defaultSession.getConnectorProperties(),
defaultSession.getUnprocessedCatalogProperties(),
defaultSession.getCatalogProperties(),
sessionPropertyManager,
defaultSession.getPreparedStatements(),
defaultSession.getProtocolHeaders());
Expand Down
30 changes: 3 additions & 27 deletions core/trino-main/src/test/java/io/trino/TestSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,8 @@ public void testSetCatalogProperty()
.setCatalogSessionProperty("some_catalog", "first_property", "some_value")
.build();

assertThat(session.getUnprocessedCatalogProperties())
assertThat(session.getCatalogProperties())
.isEqualTo(Map.of("some_catalog", Map.of("first_property", "some_value")));

// empty, will be populated at transaction start
assertThat(session.getConnectorProperties())
.isEqualTo(Map.of());

// empty, will be populated at transaction start
assertThat(session.getConnectorProperties("some_catalog"))
.isEqualTo(Map.of());
}

@Test
Expand All @@ -50,16 +42,8 @@ public void testBuildWithCatalogProperty()
session = Session.builder(session)
.build();

assertThat(session.getUnprocessedCatalogProperties())
assertThat(session.getCatalogProperties())
.isEqualTo(Map.of("some_catalog", Map.of("first_property", "some_value")));

// empty, will be populated at transaction start
assertThat(session.getConnectorProperties())
.isEqualTo(Map.of());

// empty, will be populated at transaction start
assertThat(session.getConnectorProperties("some_catalog"))
.isEqualTo(Map.of());
}

@Test
Expand All @@ -72,17 +56,9 @@ public void testAddSecondCatalogProperty()
.setCatalogSessionProperty("some_catalog", "second_property", "another_value")
.build();

assertThat(session.getUnprocessedCatalogProperties())
assertThat(session.getCatalogProperties())
.isEqualTo(Map.of("some_catalog", Map.of(
"first_property", "some_value",
"second_property", "another_value")));

// empty, will be populated at transaction start
assertThat(session.getConnectorProperties())
.isEqualTo(Map.of());

// empty, will be populated at transaction start
assertThat(session.getConnectorProperties("some_catalog"))
.isEqualTo(Map.of());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,6 +555,6 @@ private static void assertEqualSessionsWithoutTransactionId(Session actual, Sess
assertEquals(actual.getUserAgent(), expected.getUserAgent());
assertEquals(actual.getStart(), expected.getStart());
assertEquals(actual.getSystemProperties(), expected.getSystemProperties());
assertEquals(actual.getConnectorProperties(), expected.getConnectorProperties());
assertEquals(actual.getCatalogProperties(), expected.getCatalogProperties());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testApplyDefaultProperties()
.put(HASH_PARTITION_COUNT, "43")
.buildOrThrow());
assertEquals(
session.getUnprocessedCatalogProperties(),
session.getCatalogProperties(),
ImmutableMap.of(
"testCatalog",
ImmutableMap.<String, String>builder()
Expand All @@ -96,7 +96,7 @@ public void testApplyDefaultProperties()
.put(QUERY_MAX_TOTAL_MEMORY, "2GB") // Default value is used
.buildOrThrow());
assertEquals(
session.getUnprocessedCatalogProperties(),
session.getCatalogProperties(),
ImmutableMap.of(
"testCatalog",
ImmutableMap.<String, String>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8429,15 +8429,14 @@ private static void testWithStorageFormat(TestingHiveStorageFormat storageFormat
test.accept(session, storageFormat.getFormat());
}
catch (Exception | AssertionError e) {
fail(format("Failure for format %s with properties %s / %s", storageFormat.getFormat(), session.getConnectorProperties(), session.getUnprocessedCatalogProperties()), e);
fail(format("Failure for format %s with properties %s", storageFormat.getFormat(), session.getCatalogProperties()), e);
}
}

private boolean isNativeParquetWriter(Session session, HiveStorageFormat storageFormat)
{
return storageFormat == HiveStorageFormat.PARQUET &&
("true".equals(session.getConnectorProperties("hive").get("experimental_parquet_optimized_writer_enabled")) ||
"true".equals(session.getUnprocessedCatalogProperties().getOrDefault("hive", Map.of()).get("experimental_parquet_optimized_writer_enabled")));
"true".equals(session.getCatalogProperties("hive").get("experimental_parquet_optimized_writer_enabled"));
}

private List<TestingHiveStorageFormat> getAllTestingHiveStorageFormat()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,12 @@ private static ClientSession toClientSession(Session session, URI server, Durati
{
ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
properties.putAll(session.getSystemProperties());
for (Entry<String, Map<String, String>> catalogAndConnectorProperties : session.getConnectorProperties().entrySet()) {
for (Entry<String, Map<String, String>> catalogAndConnectorProperties : session.getCatalogProperties().entrySet()) {
for (Entry<String, String> connectorProperties : catalogAndConnectorProperties.getValue().entrySet()) {
String catalogName = catalogAndConnectorProperties.getKey();
properties.put(catalogName + "." + connectorProperties.getKey(), connectorProperties.getValue());
}
}
for (Entry<String, Map<String, String>> connectorProperties : session.getUnprocessedCatalogProperties().entrySet()) {
for (Entry<String, String> entry : connectorProperties.getValue().entrySet()) {
properties.put(connectorProperties.getKey() + "." + entry.getKey(), entry.getValue());
}
}

ImmutableMap.Builder<String, String> resourceEstimates = ImmutableMap.builder();
ResourceEstimates estimates = session.getResourceEstimates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ else if (enabledRoles.size() == 1) {
session.getClientCapabilities(),
session.getResourceEstimates(),
session.getSystemProperties(),
session.getConnectorProperties(),
session.getCatalogProperties(),
session.getPreparedStatements(),
session.getTransactionId(),
session.isClientTransactionSupport(),
Expand Down
Loading

0 comments on commit 8b015fd

Please sign in to comment.