Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing database context through database http header #12417

Merged
merged 42 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a169915
Request filters to translate table name
shounakmk219 Feb 9, 2024
b7b93d8
Make endpoints database aware
shounakmk219 Feb 13, 2024
3a1ba75
fix column name extraction logic when prefixed with database
shounakmk219 Feb 14, 2024
a29f5f5
test and checkstyle fixes
shounakmk219 Feb 14, 2024
7c8ba97
test fixes
shounakmk219 Feb 14, 2024
96fd366
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Feb 14, 2024
09e129b
Handle deprecated config segmentsConfig.schemaName to avoid false errors
shounakmk219 Feb 21, 2024
b5e14c2
Move tableCache registration to start()
shounakmk219 Feb 21, 2024
5f28a09
fix false errors upon mixed table name formats being passed
shounakmk219 Feb 22, 2024
bb4ef42
Expose list database controller API
shounakmk219 Feb 22, 2024
b7c94a1
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Feb 23, 2024
37c75e8
refactor getActualColumnName to cover all edge cases
shounakmk219 Feb 27, 2024
872d0ac
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Feb 27, 2024
dd7a62c
handle null names for equivalence
shounakmk219 Feb 27, 2024
b8d1f43
add java doc
shounakmk219 Feb 28, 2024
6144ab0
set uri outside the loop
shounakmk219 Feb 28, 2024
c0c5219
Use StringUtils.split to avoid regex match
shounakmk219 Feb 28, 2024
b77eb28
Handle default db during translation without table cache
shounakmk219 Feb 28, 2024
1abae88
fixes
shounakmk219 Feb 28, 2024
3333f8f
Revert the request filter implementation and push down translation lo…
shounakmk219 Mar 1, 2024
a5eccbf
refactor PinotSchemaRestletResource
shounakmk219 Mar 1, 2024
46e8ab8
translate the table name before authz validations
shounakmk219 Mar 1, 2024
c6b017f
refactor translateTableName method
shounakmk219 Mar 1, 2024
d1a2728
missing license header
shounakmk219 Mar 1, 2024
a0b4618
refactor
shounakmk219 Mar 4, 2024
8e4e089
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 4, 2024
5b9f014
checkstyle fix
shounakmk219 Mar 4, 2024
f7971f3
cleanup
shounakmk219 Mar 5, 2024
9540b4b
Bugfix. Handle default db header for get table flows
shounakmk219 Mar 5, 2024
1a75690
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 5, 2024
37362e3
API payloads should either have all translated values or all logical …
shounakmk219 Mar 6, 2024
fd1e750
refactors and minor fixes
shounakmk219 Mar 6, 2024
e9f8f7a
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 6, 2024
12bca07
add missing table name translations on query params
shounakmk219 Mar 7, 2024
f54dee4
revert adding entries for default.tableName in tableCache name maps
shounakmk219 Mar 8, 2024
f5f7b30
fixes and reformats
shounakmk219 Mar 8, 2024
6a417b9
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 8, 2024
73621f9
remove redundant default database check
shounakmk219 Mar 12, 2024
1b21160
Merge remote-tracking branch 'upstream/master' into db-request-filter
shounakmk219 Mar 12, 2024
6f6401b
Misc fixes: ignore case, table config and schema handling
Jackie-Jiang Mar 12, 2024
68d17ee
ensure translated name is sent to auth validations
shounakmk219 Mar 12, 2024
a7d6f93
Fix test and fine grained access control
Jackie-Jiang Mar 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,17 @@
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.spi.utils.CommonConstants;

import static org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;

Expand All @@ -52,6 +57,9 @@ public class PinotBrokerRouting {
@Inject
BrokerRoutingManager _routingManager;

@Inject
TableCache _tableCache;

@PUT
@Produces(MediaType.TEXT_PLAIN)
@Path("/routing/{tableName}")
Expand All @@ -61,8 +69,23 @@ public class PinotBrokerRouting {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error")
})
public String buildRouting(
public String buildRouting(@Context HttpHeaders headers,
shounakmk219 marked this conversation as resolved.
Show resolved Hide resolved
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType) {
return buildRoutingV2(DatabaseUtils.translateTableName(tableNameWithType,
headers.getHeaderString(CommonConstants.DATABASE), _tableCache));
}

@PUT
@Produces(MediaType.TEXT_PLAIN)
@Path("/v2/routing")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.BUILD_ROUTING)
@ApiOperation(value = "Build/rebuild the routing for a table", notes = "Build/rebuild the routing for a table")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error")
})
public String buildRoutingV2(
@ApiParam(value = "Table name (with type)") @QueryParam("tableName") String tableNameWithType) {
_routingManager.buildRouting(tableNameWithType);
return "Success";
}
Expand All @@ -76,9 +99,25 @@ public String buildRouting(
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error")
})
public String refreshRouting(
public String refreshRouting(@Context HttpHeaders headers,
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType,
@ApiParam(value = "Segment name") @PathParam("segmentName") String segmentName) {
return refreshRoutingV2(DatabaseUtils.translateTableName(tableNameWithType,
headers.getHeaderString(CommonConstants.DATABASE), _tableCache), segmentName);
}

@PUT
@Produces(MediaType.TEXT_PLAIN)
@Path("/v2/routing/refresh/{segmentName}")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.REFRESH_ROUTING)
@ApiOperation(value = "Refresh the routing for a segment", notes = "Refresh the routing for a segment")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error")
})
public String refreshRoutingV2(
@ApiParam(value = "Table name (with type)") @QueryParam("tableName") String tableNameWithType,
@ApiParam(value = "Segment name") @PathParam("segmentName") String segmentName) {
_routingManager.refreshSegment(tableNameWithType, segmentName);
return "Success";
}
Expand All @@ -92,8 +131,23 @@ public String refreshRouting(
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error")
})
public String removeRouting(
public String removeRouting(@Context HttpHeaders headers,
@ApiParam(value = "Table name (with type)") @PathParam("tableName") String tableNameWithType) {
return removeRoutingV2(DatabaseUtils.translateTableName(tableNameWithType,
headers.getHeaderString(CommonConstants.DATABASE), _tableCache));
}

@DELETE
@Produces(MediaType.TEXT_PLAIN)
@Path("/v2/routing")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_ROUTING)
@ApiOperation(value = "Remove the routing for a table", notes = "Remove the routing for a table")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Success"),
@ApiResponse(code = 500, message = "Internal server error")
})
public String removeRoutingV2(
@ApiParam(value = "Table name (with type)") @QueryParam("tableName") String tableNameWithType) {
_routingManager.removeRouting(tableNameWithType);
return "Success";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.broker;

import java.io.IOException;
import javax.inject.Inject;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.PreMatching;
import javax.ws.rs.ext.Provider;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


@Provider
@PreMatching
public class BrokerRequestFilter implements ContainerRequestFilter {

public static final Logger LOGGER = LoggerFactory.getLogger(BrokerRequestFilter.class);

@Inject
private TableCache _tableCache;

@Override
public void filter(ContainerRequestContext requestContext)
throws IOException {
// uses the database name from header to build the actual table name
DatabaseUtils.translateTableNameQueryParam(requestContext, _tableCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.NetUtils;
import org.apache.pinot.sql.parsers.rewriter.QueryRewriterFactory;
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -128,6 +129,7 @@ public abstract class BaseBrokerStarter implements ServiceStartable {
// Handles the server routing stats.
protected ServerRoutingStatsManager _serverRoutingStatsManager;
protected BrokerQueryEventListener _brokerQueryEventListener;
protected TableCache _tableCache;

@Override
public void init(PinotConfiguration brokerConf)
Expand Down Expand Up @@ -283,7 +285,7 @@ public void start()
FunctionRegistry.init();
boolean caseInsensitive =
_brokerConf.getProperty(Helix.ENABLE_CASE_INSENSITIVE_KEY, Helix.DEFAULT_ENABLE_CASE_INSENSITIVE);
TableCache tableCache = new TableCache(_propertyStore, caseInsensitive);
_tableCache = new TableCache(_propertyStore, caseInsensitive);
// Configure TLS for netty connection to server
TlsConfig tlsDefaults = TlsUtils.extractTlsConfig(_brokerConf, Broker.BROKER_TLS_PREFIX);
NettyConfig nettyDefaults = NettyConfig.extractNettyConfig(_brokerConf, Broker.BROKER_NETTY_PREFIX);
Expand All @@ -300,17 +302,17 @@ public void start()
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
singleStageBrokerRequestHandler =
new GrpcBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory, queryQuotaManager,
tableCache, _brokerMetrics, null, _brokerQueryEventListener);
_tableCache, _brokerMetrics, null, _brokerQueryEventListener);
} else { // default request handler type, e.g. netty
if (_brokerConf.getProperty(Broker.BROKER_NETTYTLS_ENABLED, false)) {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
queryQuotaManager, _tableCache, _brokerMetrics, nettyDefaults, tlsDefaults, _serverRoutingStatsManager,
_brokerQueryEventListener);
} else {
singleStageBrokerRequestHandler =
new SingleConnectionBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager,
queryQuotaManager, _tableCache, _brokerMetrics, nettyDefaults, null, _serverRoutingStatsManager,
_brokerQueryEventListener);
}
}
Expand All @@ -322,7 +324,7 @@ public void start()
// TODO: decouple protocol and engine selection.
multiStageBrokerRequestHandler =
new MultiStageBrokerRequestHandler(_brokerConf, brokerId, _routingManager, _accessControlFactory,
queryQuotaManager, tableCache, _brokerMetrics, _brokerQueryEventListener);
queryQuotaManager, _tableCache, _brokerMetrics, _brokerQueryEventListener);
}

_brokerRequestHandler = new BrokerRequestHandlerDelegate(brokerId, singleStageBrokerRequestHandler,
Expand Down Expand Up @@ -350,6 +352,12 @@ public void start()
_brokerAdminApplication =
new BrokerAdminApiApplication(_routingManager, _brokerRequestHandler, _brokerMetrics, _brokerConf,
_sqlQueryExecutor, _serverRoutingStatsManager, _accessControlFactory, _spectatorHelixManager);
_brokerAdminApplication.register(new AbstractBinder() {
@Override
protected void configure() {
bind(_tableCache).to(TableCache.class);
}
});
registerExtraComponents(_brokerAdminApplication);
_brokerAdminApplication.start(_listenerConfigs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.auth.Actions;
Expand Down Expand Up @@ -320,6 +321,11 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
// Compile the request into PinotQuery
compilationStartTimeNs = System.nanoTime();
pinotQuery = CalciteSqlParser.compileToPinotQuery(sqlNodeAndOptions);
if (pinotQuery.getDataSource() != null) {
pinotQuery.getDataSource().setTableName(DatabaseUtils.translateTableName(
pinotQuery.getDataSource().getTableName(), httpHeaders.getHeaderString(CommonConstants.DATABASE),
_tableCache));
}
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL request {}: {}, {}", requestId, query, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS, 1);
Expand Down Expand Up @@ -372,7 +378,8 @@ protected BrokerResponse handleRequest(long requestId, String query, @Nullable S
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
}

String tableName = getActualTableName(dataSource.getTableName(), _tableCache);
String tableName = DatabaseUtils.translateTableName(dataSource.getTableName(),
httpHeaders.getHeaderString(CommonConstants.DATABASE), _tableCache);
dataSource.setTableName(tableName);
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
requestContext.setTableName(rawTableName);
Expand Down Expand Up @@ -943,34 +950,6 @@ private void handleSubquery(Expression expression, long requestId, JsonNode json
}
}

/**
* Resolves the actual table name for:
* - Case-insensitive cluster
* - Table name in the format of [database_name].[table_name]
*
* @param tableName the table name in the query
* @param tableCache the table case-sensitive cache
* @return table name if the table name is found in Pinot registry, drop the database_name in the format
* of [database_name].[table_name] if only [table_name] is found in Pinot registry.
*/
@VisibleForTesting
static String getActualTableName(String tableName, TableCache tableCache) {
String actualTableName = tableCache.getActualTableName(tableName);
if (actualTableName != null) {
return actualTableName;
}

// Check if table is in the format of [database_name].[table_name]
String[] tableNameSplits = StringUtils.split(tableName, ".", 2);
if (tableNameSplits.length == 2) {
actualTableName = tableCache.getActualTableName(tableNameSplits[1]);
if (actualTableName != null) {
return actualTableName;
}
}
return tableName;
}

/**
* Retrieve segment partitioned columns for a table.
* For a hybrid table, a segment partitioned column has to be the intersection of both offline and realtime tables.
Expand Down Expand Up @@ -1652,6 +1631,7 @@ private static void fixColumnName(String rawTableName, Expression expression, Ma
* Returns the actual column name for the given column name for:
* - Case-insensitive cluster
* - Column name in the format of [table_name].[column_name]
* - Column name in the format of [database_name].[table_name].[column_name]
shounakmk219 marked this conversation as resolved.
Show resolved Hide resolved
*/
@VisibleForTesting
static String getActualColumnName(String rawTableName, String columnName, @Nullable Map<String, String> columnNameMap,
Expand All @@ -1660,10 +1640,25 @@ static String getActualColumnName(String rawTableName, String columnName, @Nulla
return columnName;
}
String columnNameToCheck;
if (columnName.regionMatches(ignoreCase, 0, rawTableName, 0, rawTableName.length())
&& columnName.length() > rawTableName.length() && columnName.charAt(rawTableName.length()) == '.') {
columnNameToCheck = ignoreCase ? columnName.substring(rawTableName.length() + 1).toLowerCase()
: columnName.substring(rawTableName.length() + 1);
String resolvedColumnName = columnName;
if (rawTableName.contains(".")) { // table name has database prefix
String databaseName = rawTableName.split("\\.")[0];
shounakmk219 marked this conversation as resolved.
Show resolved Hide resolved
// if column name only has table prefix, we need to append the database prefix as well
// to ensure following logic does not break
if (columnName.split("\\.").length == 2) {
resolvedColumnName = String.format("%s.%s", databaseName, columnName);
}
} else { // table name does not have database prefix -> table is under "default" database
// remove the "default" database prefix from column name if present
if (columnName.split("\\.").length == 3 && columnName.startsWith(CommonConstants.DEFAULT_DATABASE + ".")) {
resolvedColumnName = columnName.substring(CommonConstants.DEFAULT_DATABASE.length() + 1);
}
}
if (resolvedColumnName.regionMatches(ignoreCase, 0, rawTableName, 0, rawTableName.length())
&& resolvedColumnName.length() > rawTableName.length()
&& resolvedColumnName.charAt(rawTableName.length()) == '.') {
columnNameToCheck = ignoreCase ? resolvedColumnName.substring(rawTableName.length() + 1).toLowerCase()
: resolvedColumnName.substring(rawTableName.length() + 1);
} else {
columnNameToCheck = ignoreCase ? columnName.toLowerCase() : columnName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedHashMap;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.InstanceConfig;
import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
Expand Down Expand Up @@ -151,6 +153,9 @@ public void testCancelQuery()
TableCache tableCache = mock(TableCache.class);
TableConfig tableCfg = mock(TableConfig.class);
when(tableCache.getActualTableName(anyString())).thenReturn(tableName);
HttpHeaders headers = mock(HttpHeaders.class);
when(headers.getRequestHeaders()).thenReturn(new MultivaluedHashMap<>());
when(headers.getHeaderString(anyString())).thenReturn(null);
TenantConfig tenant = new TenantConfig("tier_BROKER", "tier_SERVER", null);
when(tableCfg.getTenantConfig()).thenReturn(tenant);
when(tableCache.getTableConfig(anyString())).thenReturn(tableCfg);
Expand Down Expand Up @@ -198,7 +203,7 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
JsonNode request = JsonUtils.stringToJsonNode(
String.format("{\"sql\":\"select * from %s limit 10\",\"queryOptions\":\"timeoutMs=10000\"}", tableName));
RequestContext requestStats = Tracing.getTracer().createRequestScope();
requestHandler.handleRequest(request, null, requestStats, null);
requestHandler.handleRequest(request, null, requestStats, headers);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Loading
Loading