Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing database context through database http header #12417

Merged
merged 42 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a169915
Request filters to translate table name
shounakmk219 Feb 9, 2024
b7b93d8
Make endpoints database aware
shounakmk219 Feb 13, 2024
3a1ba75
fix column name extraction logic when prefixed with database
shounakmk219 Feb 14, 2024
a29f5f5
test and checkstyle fixes
shounakmk219 Feb 14, 2024
7c8ba97
test fixes
shounakmk219 Feb 14, 2024
96fd366
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Feb 14, 2024
09e129b
Handle deprecated config segmentsConfig.schemaName to avoid false errors
shounakmk219 Feb 21, 2024
b5e14c2
Move tableCache registration to start()
shounakmk219 Feb 21, 2024
5f28a09
fix false errors upon mixed table name formats being passed
shounakmk219 Feb 22, 2024
bb4ef42
Expose list database controller API
shounakmk219 Feb 22, 2024
b7c94a1
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Feb 23, 2024
37c75e8
refactor getActualColumnName to cover all edge cases
shounakmk219 Feb 27, 2024
872d0ac
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Feb 27, 2024
dd7a62c
handle null names for equivalence
shounakmk219 Feb 27, 2024
b8d1f43
add java doc
shounakmk219 Feb 28, 2024
6144ab0
set uri outside the loop
shounakmk219 Feb 28, 2024
c0c5219
Use StringUtils.split to avoid regex match
shounakmk219 Feb 28, 2024
b77eb28
Handle default db during translation without table cache
shounakmk219 Feb 28, 2024
1abae88
fixes
shounakmk219 Feb 28, 2024
3333f8f
Revert the request filter implementation and push down translation lo…
shounakmk219 Mar 1, 2024
a5eccbf
refactor PinotSchemaRestletResource
shounakmk219 Mar 1, 2024
46e8ab8
translate the table name before authz validations
shounakmk219 Mar 1, 2024
c6b017f
refactor translateTableName method
shounakmk219 Mar 1, 2024
d1a2728
missing license header
shounakmk219 Mar 1, 2024
a0b4618
refactor
shounakmk219 Mar 4, 2024
8e4e089
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 4, 2024
5b9f014
checkstyle fix
shounakmk219 Mar 4, 2024
f7971f3
cleanup
shounakmk219 Mar 5, 2024
9540b4b
Bugfix. Handle default db header for get table flows
shounakmk219 Mar 5, 2024
1a75690
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 5, 2024
37362e3
API payloads should either have all translated values or all logical …
shounakmk219 Mar 6, 2024
fd1e750
refactors and minor fixes
shounakmk219 Mar 6, 2024
e9f8f7a
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 6, 2024
12bca07
add missing table name translations on query params
shounakmk219 Mar 7, 2024
f54dee4
revert adding entries for default.tableName in tableCache name maps
shounakmk219 Mar 8, 2024
f5f7b30
fixes and reformats
shounakmk219 Mar 8, 2024
6a417b9
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 8, 2024
73621f9
remove redundant default database check
shounakmk219 Mar 12, 2024
1b21160
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 12, 2024
6f6401b
Misc fixes: ignore case, table config and schema handling
Jackie-Jiang Mar 12, 2024
68d17ee
ensure translated name is sent to auth validations
shounakmk219 Mar 12, 2024
a7d6f93
Fix test and fine grained access control
Jackie-Jiang Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
Expand All @@ -62,8 +64,9 @@ public class PinotBrokerRouting {
@ApiResponse(code = 500, message = "Internal server error")
})
public String buildRouting(
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType) {
_routingManager.buildRouting(tableNameWithType);
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType,
@Context HttpHeaders headers) {
_routingManager.buildRouting(DatabaseUtils.translateTableName(tableNameWithType, headers));
return "Success";
}

Expand All @@ -78,8 +81,9 @@ public String buildRouting(
})
public String refreshRouting(
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType,
@ApiParam(value = "Segment name") @PathParam("segmentName") String segmentName) {
_routingManager.refreshSegment(tableNameWithType, segmentName);
@ApiParam(value = "Segment name") @PathParam("segmentName") String segmentName,
@Context HttpHeaders headers) {
_routingManager.refreshSegment(DatabaseUtils.translateTableName(tableNameWithType, headers), segmentName);
return "Success";
}

Expand All @@ -93,8 +97,9 @@ public String refreshRouting(
@ApiResponse(code = 500, message = "Internal server error")
})
public String removeRouting(
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType) {
_routingManager.removeRouting(tableNameWithType);
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType,
@Context HttpHeaders headers) {
_routingManager.removeRouting(DatabaseUtils.translateTableName(tableNameWithType, headers));
return "Success";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
Expand Down Expand Up @@ -262,9 +263,9 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable SqlNodeAndOption
long requestId = _brokerIdGenerator.get();
requestContext.setRequestId(requestId);
if (httpHeaders != null) {
requestContext.setRequestHttpHeaders(httpHeaders.getRequestHeaders().entrySet().stream()
.filter(entry -> PinotBrokerQueryEventListenerFactory.getAllowlistQueryRequestHeaders()
.contains(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
requestContext.setRequestHttpHeaders(httpHeaders.getRequestHeaders().entrySet().stream().filter(
entry -> PinotBrokerQueryEventListenerFactory.getAllowlistQueryRequestHeaders().contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
}

// First-stage access control to prevent unauthenticated requests from using up resources. Secondary table-level
Expand Down Expand Up @@ -373,16 +374,18 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
}

String tableName = getActualTableName(dataSource.getTableName(), _tableCache);
boolean ignoreCase = _tableCache.isIgnoreCase();
String tableName =
getActualTableName(DatabaseUtils.translateTableName(dataSource.getTableName(), httpHeaders, ignoreCase),
_tableCache);
dataSource.setTableName(tableName);
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
requestContext.setTableName(rawTableName);

try {
boolean isCaseInsensitive = _tableCache.isIgnoreCase();
Map<String, String> columnNameMap = _tableCache.getColumnNameMap(rawTableName);
if (columnNameMap != null) {
updateColumnNames(rawTableName, serverPinotQuery, isCaseInsensitive, columnNameMap);
updateColumnNames(rawTableName, serverPinotQuery, ignoreCase, columnNameMap);
}
} catch (Exception e) {
// Throw exceptions with column in-existence error.
Expand Down Expand Up @@ -947,28 +950,17 @@ private void handleSubquery(Expression expression, long requestId, JsonNode json
/**
* Resolves the actual table name for:
* - Case-insensitive cluster
* - Table name in the format of [database_name].[table_name]
*
* @param tableName the table name in the query
* @param tableCache the table case-sensitive cache
* @return table name if the table name is found in Pinot registry, drop the database_name in the format
* of [database_name].[table_name] if only [table_name] is found in Pinot registry.
* @return table name if the table name is found in Pinot registry.
*/
@VisibleForTesting
static String getActualTableName(String tableName, TableCache tableCache) {
String actualTableName = tableCache.getActualTableName(tableName);
if (actualTableName != null) {
return actualTableName;
}

// Check if table is in the format of [database_name].[table_name]
String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
if (tableNameSplits.length == 2) {
actualTableName = tableCache.getActualTableName(tableNameSplits[1]);
if (actualTableName != null) {
return actualTableName;
}
}
return tableName;
}

Expand Down Expand Up @@ -1652,21 +1644,18 @@ private static void fixColumnName(String rawTableName, Expression expression, Ma
/**
* Returns the actual column name for the given column name for:
* - Case-insensitive cluster
* - Column name in the format of [table_name].[column_name]
* - Column name in the format of [{@code rawTableName}].[column_name]
* - Column name in the format of [logical_table_name].[column_name] while {@code rawTableName} is a translated name
*/
@VisibleForTesting
static String getActualColumnName(String rawTableName, String columnName, @Nullable Map<String, String> columnNameMap,
boolean ignoreCase) {
if ("*".equals(columnName)) {
return columnName;
}
String columnNameToCheck;
if (columnName.regionMatches(ignoreCase, 0, rawTableName, 0, rawTableName.length())
&& columnName.length() > rawTableName.length() && columnName.charAt(rawTableName.length()) == '.') {
columnNameToCheck = ignoreCase ? columnName.substring(rawTableName.length() + 1).toLowerCase()
: columnName.substring(rawTableName.length() + 1);
} else {
columnNameToCheck = ignoreCase ? columnName.toLowerCase() : columnName;
String columnNameToCheck = trimTableName(rawTableName, columnName, ignoreCase);
if (ignoreCase) {
columnNameToCheck = columnNameToCheck.toLowerCase();
}
if (columnNameMap != null) {
String actualColumnName = columnNameMap.get(columnNameToCheck);
Expand All @@ -1680,6 +1669,26 @@ static String getActualColumnName(String rawTableName, String columnName, @Nulla
throw new BadQueryRequestException("Unknown columnName '" + columnName + "' found in the query");
}

private static String trimTableName(String rawTableName, String columnName, boolean ignoreCase) {
int columnNameLength = columnName.length();
int rawTableNameLength = rawTableName.length();
if (columnNameLength > rawTableNameLength && columnName.charAt(rawTableNameLength) == '.'
&& columnName.regionMatches(ignoreCase, 0, rawTableName, 0, rawTableNameLength)) {
return columnName.substring(rawTableNameLength + 1);
}
// Check if raw table name is translated name ([database_name].[logical_table_name]])
String[] split = StringUtils.split(rawTableName, '.');
if (split.length == 2) {
String logicalTableName = split[1];
int logicalTableNameLength = logicalTableName.length();
if (columnNameLength > logicalTableNameLength && columnName.charAt(logicalTableNameLength) == '.'
&& columnName.regionMatches(ignoreCase, 0, logicalTableName, 0, logicalTableNameLength)) {
return columnName.substring(logicalTableNameLength + 1);
}
}
return columnName;
}

/**
* Helper function to decide whether to force the log
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
Expand Down Expand Up @@ -90,6 +92,12 @@ public void testGetActualColumnNameCaseSensitive() {
String actualColumnName =
BaseBrokerRequestHandler.getActualColumnName("mytable", "mytable.student_name", columnNameMap, false);
Assert.assertEquals(actualColumnName, "student_name");
Assert.assertEquals(
BaseBrokerRequestHandler.getActualColumnName("db1.mytable", "db1.mytable.student_name", columnNameMap, false),
"student_name");
Assert.assertEquals(
BaseBrokerRequestHandler.getActualColumnName("db1.mytable", "mytable.student_name", columnNameMap, false),
"student_name");
boolean exceptionThrown = false;
try {
BaseBrokerRequestHandler.getActualColumnName("mytable", "mytable2.student_name", columnNameMap, false);
Expand Down Expand Up @@ -124,6 +132,12 @@ public void testGetActualColumnNameCaseInSensitive() {
String actualColumnName =
BaseBrokerRequestHandler.getActualColumnName("mytable", "MYTABLE.student_name", columnNameMap, true);
Assert.assertEquals(actualColumnName, "student_name");
Assert.assertEquals(
BaseBrokerRequestHandler.getActualColumnName("db1.MYTABLE", "DB1.mytable.student_name", columnNameMap, true),
"student_name");
Assert.assertEquals(
BaseBrokerRequestHandler.getActualColumnName("db1.mytable", "MYTABLE.student_name", columnNameMap, true),
"student_name");
boolean exceptionThrown = false;
try {
BaseBrokerRequestHandler.getActualColumnName("student", "MYTABLE2.student_name", columnNameMap, true);
Expand Down Expand Up @@ -151,6 +165,9 @@ public void testCancelQuery()
TableCache tableCache = mock(TableCache.class);
TableConfig tableCfg = mock(TableConfig.class);
when(tableCache.getActualTableName(anyString())).thenReturn(tableName);
HttpHeaders headers = mock(HttpHeaders.class);
when(headers.getRequestHeaders()).thenReturn(new MultivaluedHashMap<>());
when(headers.getHeaderString(anyString())).thenReturn(null);
TenantConfig tenant = new TenantConfig("tier_BROKER", "tier_SERVER", null);
when(tableCfg.getTenantConfig()).thenReturn(tenant);
when(tableCache.getTableConfig(anyString())).thenReturn(tableCfg);
Expand Down Expand Up @@ -198,7 +215,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
JsonNode request = JsonUtils.stringToJsonNode(
String.format("{\"sql\":\"select * from %s limit 10\",\"queryOptions\":\"timeoutMs=10000\"}", tableName));
RequestContext requestStats = Tracing.getTracer().createRequestScope();
requestHandler.handleRequest(request, null, requestStats, null);
requestHandler.handleRequest(request, null, requestStats, headers);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import com.google.common.base.Preconditions;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.spi.utils.CommonConstants;


public class DatabaseUtils {
private DatabaseUtils() {
}

/**
* Construct the fully qualified table name i.e. {databaseName}.{tableName} from given table name and database name
* @param tableName table/schema name
* @param databaseName database name
* @param ignoreCase whether to ignore case when comparing passed in database name against table name prefix if both
* exist. For 'default' database, always compare it ignoring case.
* @return translated table name. Throws {@link IllegalArgumentException} if {@code tableName} contains
* more than 1 dot or if {@code tableName} has database prefix, and it does not match with {@code databaseName}
*/
public static String translateTableName(String tableName, @Nullable String databaseName, boolean ignoreCase) {
Preconditions.checkArgument(StringUtils.isNotEmpty(tableName), "'tableName' cannot be null or empty");
String[] tableSplit = StringUtils.split(tableName, '.');
switch (tableSplit.length) {
case 1:
// do not concat the database name prefix if it's a 'default' database
if (StringUtils.isNotEmpty(databaseName) && !databaseName.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE)) {
Jackie-Jiang marked this conversation as resolved.
Show resolved Hide resolved
return databaseName + "." + tableName;
}
return tableName;
case 2:
Preconditions.checkArgument(!tableSplit[1].isEmpty(), "Invalid table name '%s'", tableName);
String databasePrefix = tableSplit[0];
Preconditions.checkArgument(
StringUtils.isEmpty(databaseName) || (!ignoreCase && databaseName.equals(databasePrefix)) || (ignoreCase
&& databaseName.equalsIgnoreCase(databasePrefix)),
"Database name '%s' from table prefix does not match database name '%s' from header", databasePrefix,
databaseName);
// skip database name prefix if it's a 'default' database
return databasePrefix.equalsIgnoreCase(CommonConstants.DEFAULT_DATABASE) ? tableSplit[1] : tableName;
default:
throw new IllegalArgumentException(
"Table name: '" + tableName + "' containing more than one '.' is not allowed");
}
}

public static String translateTableName(String tableName, @Nullable String databaseName) {
return translateTableName(tableName, databaseName, false);
}

/**
* Utility to get fully qualified table name i.e. {databaseName}.{tableName} from given table name and http headers
* @param tableName table/schema name
* @param headers http headers
* @param ignoreCase whether to ignore case when comparing database name in headers against table name prefix if both
* exist. For 'default' database, always compare it ignoring case.
* @return translated table name. Throws {@link IllegalStateException} if {@code tableName} contains more than 1 dot
* or if {@code tableName} has database prefix, and it does not match with the 'database' header
*/
public static String translateTableName(String tableName, HttpHeaders headers, boolean ignoreCase) {
return translateTableName(tableName, headers.getHeaderString(CommonConstants.DATABASE), ignoreCase);
}

public static String translateTableName(String tableName, HttpHeaders headers) {
return translateTableName(tableName, headers, false);
}
shounakmk219 marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.utils;

import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;


public class DatabaseUtilsTest {
private static final String LOGICAL_TABLE_NAME = "tb1";
private static final String DATABASE_NAME = "db1";
private static final String DEFAULT_DATABASE_NAME = CommonConstants.DEFAULT_DATABASE;
private static final String FULLY_QUALIFIED_TABLE_NAME = "db1.tb1";

@Test
public void translateTableNameTest() {
// valid cases with non-default database
check(LOGICAL_TABLE_NAME, DATABASE_NAME, FULLY_QUALIFIED_TABLE_NAME);
check(FULLY_QUALIFIED_TABLE_NAME, DATABASE_NAME, FULLY_QUALIFIED_TABLE_NAME);
check(FULLY_QUALIFIED_TABLE_NAME, null, FULLY_QUALIFIED_TABLE_NAME);

// error cases with non-default database
error(null, DATABASE_NAME);
error(FULLY_QUALIFIED_TABLE_NAME + "." + "foo", null);
error(FULLY_QUALIFIED_TABLE_NAME + "." + "foo", DATABASE_NAME);
error(FULLY_QUALIFIED_TABLE_NAME, DATABASE_NAME + "foo");

// valid cases with default database
check(LOGICAL_TABLE_NAME, null, LOGICAL_TABLE_NAME);
check(LOGICAL_TABLE_NAME, DEFAULT_DATABASE_NAME, LOGICAL_TABLE_NAME);
check(DEFAULT_DATABASE_NAME + "." + LOGICAL_TABLE_NAME, null, LOGICAL_TABLE_NAME);
check(DEFAULT_DATABASE_NAME + "." + LOGICAL_TABLE_NAME, DEFAULT_DATABASE_NAME, LOGICAL_TABLE_NAME);

// error cases with default database
error(null, DEFAULT_DATABASE_NAME);
error(FULLY_QUALIFIED_TABLE_NAME, DEFAULT_DATABASE_NAME);
error(DEFAULT_DATABASE_NAME + "." + LOGICAL_TABLE_NAME, DATABASE_NAME);
error(DEFAULT_DATABASE_NAME + "." + FULLY_QUALIFIED_TABLE_NAME, null);
error(DEFAULT_DATABASE_NAME + "." + FULLY_QUALIFIED_TABLE_NAME, DATABASE_NAME);
error(DEFAULT_DATABASE_NAME + "." + FULLY_QUALIFIED_TABLE_NAME, DEFAULT_DATABASE_NAME);
}

private void check(String tableName, String databaseName, String fqn) {
check(tableName, databaseName, fqn, false);
}

private void error(String tableName, String databaseName) {
check(tableName, databaseName, null, true);
}

private void check(String tableName, String databaseName, String fqn, boolean isError) {
if (isError) {
try {
DatabaseUtils.translateTableName(tableName, databaseName);
fail();
} catch (IllegalArgumentException ignored) {
return;
}
}
assertEquals(DatabaseUtils.translateTableName(tableName, databaseName), fqn);
}
}
Loading
Loading