Skip to content

Commit

Permalink
[admin-tool] Add gRPC endpoint for updateAdminOperationProtocolVersion (
Browse files Browse the repository at this point in the history
#1535)

Add grpc endpoint for updateAdminOperationProtocolVersion
Input: clusterName, target admin operation version we want to set ( -1 for the latest version)
Output: clusterName and updated admin operation version.
This PR only contains the gRPC setup for the endpoint, the main logic can be found in PR #1418
  • Loading branch information
minhmo1620 authored Feb 18, 2025
1 parent e05e625 commit a8fdd18
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3279,7 +3279,8 @@ private static void updateAdminOperationProtocolVersion(CommandLine cmd) throws
getRequiredArgument(cmd, Arg.ADMIN_OPERATION_PROTOCOL_VERSION, Command.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION);
long protocolVersion =
Utils.parseLongFromString(protocolVersionInString, Arg.ADMIN_OPERATION_PROTOCOL_VERSION.name());
ControllerResponse response = controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion);
AdminTopicMetadataResponse response =
controllerClient.updateAdminOperationProtocolVersion(clusterName, protocolVersion);
printObject(response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1351,7 +1351,7 @@ public AdminTopicMetadataResponse getAdminTopicMetadata(Optional<String> storeNa
return request(ControllerRoute.GET_ADMIN_TOPIC_METADATA, params, AdminTopicMetadataResponse.class);
}

public ControllerResponse updateAdminTopicMetadata(
public AdminTopicMetadataResponse updateAdminTopicMetadata(
long executionId,
Optional<String> storeName,
Optional<Long> offset,
Expand All @@ -1360,15 +1360,15 @@ public ControllerResponse updateAdminTopicMetadata(
.add(NAME, storeName)
.add(OFFSET, offset)
.add(UPSTREAM_OFFSET, upstreamOffset);
return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, ControllerResponse.class);
return request(ControllerRoute.UPDATE_ADMIN_TOPIC_METADATA, params, AdminTopicMetadataResponse.class);
}

public ControllerResponse updateAdminOperationProtocolVersion(
public AdminTopicMetadataResponse updateAdminOperationProtocolVersion(
String clusterName,
Long adminOperationProtocolVersion) {
QueryParams params =
newParams().add(CLUSTER, clusterName).add(ADMIN_OPERATION_PROTOCOL_VERSION, adminOperationProtocolVersion);
return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, ControllerResponse.class);
return request(ControllerRoute.UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION, params, AdminTopicMetadataResponse.class);
}

public ControllerResponse deleteKafkaTopic(String topicName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import io.grpc.Context;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -88,4 +89,17 @@ public void updateAdminTopicMetadata(
return requestHandler.updateAdminTopicMetadata(request);
}, responseObserver, metadata.getClusterName(), metadata.hasStoreName() ? metadata.getStoreName() : null);
}

@Override
public void updateAdminOperationProtocolVersion(
UpdateAdminOperationProtocolVersionGrpcRequest request,
StreamObserver<AdminTopicMetadataGrpcResponse> responseObserver) {
LOGGER.debug("Received updateAdminOperationProtocolVersion request: {}", request);
ControllerGrpcServerUtils.handleRequest(
ClusterAdminOpsGrpcServiceGrpc.getUpdateAdminOperationProtocolVersionMethod(),
() -> requestHandler.updateAdminOperationProtocolVersion(request),
responseObserver,
request.getClusterName(),
null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,8 @@ public boolean startInner() throws Exception {
UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getPath(),
new VeniceParentControllerRegionStateHandler(
admin,
adminTopicMetadataRoutes.updateAdminOperationProtocolVersion(admin)));
adminTopicMetadataRoutes
.updateAdminOperationProtocolVersion(admin, requestHandler.getClusterAdminOpsRequestHandler())));
httpService.post(
DELETE_KAFKA_TOPIC.getPath(),
new VeniceParentControllerRegionStateHandler(admin, storesRoutes.deleteKafkaTopic(admin)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import java.util.Optional;
import org.apache.http.HttpStatus;
Expand Down Expand Up @@ -105,7 +106,7 @@ public Route updateAdminTopicMetadata(Admin admin, ClusterAdminOpsRequestHandler
};
}

public Route updateAdminOperationProtocolVersion(Admin admin) {
public Route updateAdminOperationProtocolVersion(Admin admin, ClusterAdminOpsRequestHandler requestHandler) {
return (request, response) -> {
AdminTopicMetadataResponse responseObject = new AdminTopicMetadataResponse();
response.type(HttpConstants.JSON);
Expand All @@ -120,11 +121,15 @@ public Route updateAdminOperationProtocolVersion(Admin admin) {
AdminSparkServer.validateParams(request, UPDATE_ADMIN_OPERATION_PROTOCOL_VERSION.getParams(), admin);
String clusterName = request.queryParams(CLUSTER);
Long adminOperationProtocolVersion = Long.parseLong(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION));
AdminTopicMetadataGrpcResponse internalResponse = requestHandler.updateAdminOperationProtocolVersion(
UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder()
.setClusterName(clusterName)
.setAdminOperationProtocolVersion(adminOperationProtocolVersion)
.build());

responseObject.setCluster(clusterName);
responseObject.setAdminOperationProtocolVersion(adminOperationProtocolVersion);

admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion);
responseObject.setCluster(internalResponse.getMetadata().getClusterName());
responseObject
.setAdminOperationProtocolVersion(internalResponse.getMetadata().getAdminOperationProtocolVersion());
} catch (Throwable e) {
responseObject.setError(e);
AdminSparkServer.handleError(new VeniceException(e), request, response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.utils.Pair;
import java.util.Map;
Expand Down Expand Up @@ -148,4 +149,24 @@ public AdminTopicMetadataGrpcResponse updateAdminTopicMetadata(UpdateAdminTopicM
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build());
return responseBuilder.build();
}

public AdminTopicMetadataGrpcResponse updateAdminOperationProtocolVersion(
UpdateAdminOperationProtocolVersionGrpcRequest request) {
String clusterName = request.getClusterName();
long adminOperationProtocolVersion = request.getAdminOperationProtocolVersion();
ControllerRequestParamValidator
.validateAdminOperationProtocolVersionRequest(clusterName, adminOperationProtocolVersion);

LOGGER.info(
"Updating admin operation protocol version for cluster: {} to version: {}",
clusterName,
adminOperationProtocolVersion);

admin.updateAdminOperationProtocolVersion(clusterName, adminOperationProtocolVersion);

AdminTopicGrpcMetadata.Builder adminMetadataBuilder = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(clusterName)
.setAdminOperationProtocolVersion(adminOperationProtocolVersion);
return AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminMetadataBuilder.build()).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,14 @@ public static void validateAdminCommandExecutionRequest(String clusterName, long
throw new IllegalArgumentException("Admin command execution id with positive value is required");
}
}

public static void validateAdminOperationProtocolVersionRequest(String clusterName, long protocolVersion) {
if (StringUtils.isBlank(clusterName)) {
throw new IllegalArgumentException("Cluster name is required for updating admin operation protocol version");
}
if (protocolVersion == 0 || protocolVersion < -1) {
throw new IllegalArgumentException(
"Admin operation protocol version is required and must be -1 or greater than 0");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.venice.protocols.controller.ClusterAdminOpsGrpcServiceGrpc.ClusterAdminOpsGrpcServiceBlockingStub;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcRequest;
import com.linkedin.venice.protocols.controller.LastSuccessfulAdminCommandExecutionGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.VeniceControllerGrpcErrorInfo;
import io.grpc.ManagedChannel;
Expand Down Expand Up @@ -191,4 +192,25 @@ public void testUpdateAdminTopicMetadataUnauthorized() {
errorInfo.getErrorMessage().contains(ACL_CHECK_FAILURE_WARN_MESSAGE_PREFIX),
"Actual error message: " + errorInfo.getErrorMessage());
}

@Test
public void testUpdateAdminOperationProtocolVersionSuccess() {
AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder =
AdminTopicGrpcMetadata.newBuilder().setClusterName(TEST_CLUSTER).setAdminOperationProtocolVersion(1L);
AdminTopicMetadataGrpcResponse response =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build();
doReturn(response).when(requestHandler)
.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class));
doReturn(true).when(accessManager).isAllowListUser(anyString(), any());

UpdateAdminOperationProtocolVersionGrpcRequest request = UpdateAdminOperationProtocolVersionGrpcRequest.newBuilder()
.setClusterName(TEST_CLUSTER)
.setAdminOperationProtocolVersion(1L)
.build();

AdminTopicMetadataGrpcResponse actualResponse = blockingStub.updateAdminOperationProtocolVersion(request);
assertNotNull(actualResponse);
assertEquals(actualResponse.getMetadata().getClusterName(), TEST_CLUSTER);
assertEquals(actualResponse.getMetadata().getAdminOperationProtocolVersion(), 1L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import com.linkedin.venice.acl.DynamicAccessController;
import com.linkedin.venice.controller.Admin;
import com.linkedin.venice.controllerapi.AdminTopicMetadataResponse;
import com.linkedin.venice.controllerapi.ControllerResponse;
import com.linkedin.venice.protocols.controller.AdminTopicGrpcMetadata;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcRequest;
import com.linkedin.venice.protocols.controller.AdminTopicMetadataGrpcResponse;
import com.linkedin.venice.protocols.controller.UpdateAdminOperationProtocolVersionGrpcRequest;
import com.linkedin.venice.protocols.controller.UpdateAdminTopicMetadataGrpcRequest;
import com.linkedin.venice.utils.ObjectMapperFactory;
import java.security.cert.X509Certificate;
Expand Down Expand Up @@ -158,8 +158,8 @@ public void testUpdateAdminTopicMetadataSuccess() throws Exception {

Route route =
new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminTopicMetadata(mockAdmin, requestHandler);
ControllerResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, times(1)).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
assertEquals(responseObject.getCluster(), TEST_CLUSTER);
Expand Down Expand Up @@ -189,8 +189,8 @@ public void testUpdateAdminTopicMetadataHandlesUnauthorizedAccess() throws Excep
when(request.queryParams(STORE_NAME)).thenReturn(TEST_STORE);
Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController))
.updateAdminTopicMetadata(mockAdmin, requestHandler);
ControllerResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, never()).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
assertNotNull(responseObject.getError());
Expand All @@ -212,8 +212,8 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception {

Route route =
new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminTopicMetadata(mockAdmin, requestHandler);
ControllerResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), ControllerResponse.class);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, times(1)).updateAdminTopicMetadata(any(UpdateAdminTopicMetadataGrpcRequest.class));
assertNotNull(responseObject.getError());
Expand All @@ -223,20 +223,33 @@ public void testUpdateAdminTopicMetadataHandlesException() throws Exception {
@Test
public void testUpdateAdminOperationProtocolVersion() throws Exception {
QueryParamsMap paramsMap = mock(QueryParamsMap.class);
String adminOperationProtocolVersion = "1";
long adminOperationProtocolVersion = 1L;
doReturn(new HashMap<>()).when(paramsMap).toMap();
doReturn(paramsMap).when(request).queryMap();

when(request.queryParams(CLUSTER)).thenReturn(TEST_CLUSTER);
when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion);
when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION))
.thenReturn(String.valueOf(adminOperationProtocolVersion));

Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin);
AdminTopicGrpcMetadata.Builder adminTopicGrpcMetadataBuilder = AdminTopicGrpcMetadata.newBuilder()
.setClusterName(TEST_CLUSTER)
.setAdminOperationProtocolVersion(adminOperationProtocolVersion);
AdminTopicMetadataGrpcResponse grpcResponse =
AdminTopicMetadataGrpcResponse.newBuilder().setMetadata(adminTopicGrpcMetadataBuilder.build()).build();

when(requestHandler.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class)))
.thenReturn(grpcResponse);

Route route = new AdminTopicMetadataRoutes(false, Optional.empty())
.updateAdminOperationProtocolVersion(mockAdmin, requestHandler);

AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, times(1))
.updateAdminOperationProtocolVersion(any(UpdateAdminOperationProtocolVersionGrpcRequest.class));
assertEquals(responseObject.getCluster(), TEST_CLUSTER);
assertEquals(responseObject.getAdminOperationProtocolVersion(), 1L);
assertEquals(responseObject.getAdminOperationProtocolVersion(), adminOperationProtocolVersion);
assertNull(responseObject.getError());
}

Expand All @@ -262,11 +275,12 @@ public void testUpdateAdminOperationProtocolVersionHandlesUnauthorizedAccess() t
when(request.queryParams(ADMIN_OPERATION_PROTOCOL_VERSION)).thenReturn(adminOperationProtocolVersion);

Route route = new AdminTopicMetadataRoutes(false, Optional.of(accessController))
.updateAdminOperationProtocolVersion(mockAdmin);
.updateAdminOperationProtocolVersion(mockAdmin, requestHandler);

AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, never()).updateAdminOperationProtocolVersion(any());
assertNotNull(responseObject.getError());
assertTrue(responseObject.getError().contains("Only admin users are allowed"));
}
Expand All @@ -279,11 +293,12 @@ public void testUpdateAdminOperationProtocolVersionHandlesMissingParams() throws

when(request.queryParams(CLUSTER)).thenReturn(null); // Missing cluster parameter

Route route = new AdminTopicMetadataRoutes(false, Optional.empty()).updateAdminOperationProtocolVersion(mockAdmin);
Route route = new AdminTopicMetadataRoutes(false, Optional.empty())
.updateAdminOperationProtocolVersion(mockAdmin, requestHandler);
AdminTopicMetadataResponse responseObject =
OBJECT_MAPPER.readValue(route.handle(request, response).toString(), AdminTopicMetadataResponse.class);

verify(requestHandler, never()).getAdminTopicMetadata(any());
verify(requestHandler, never()).updateAdminOperationProtocolVersion(any());
assertNotNull(responseObject.getError());
assertTrue(responseObject.getError().contains("cluster_name is a required parameter"));
}
Expand Down
Loading

0 comments on commit a8fdd18

Please sign in to comment.