Skip to content

Commit

Permalink
Merge branch 'main' into #5550-add-partition-pre-event-for-Gravitino-…
Browse files Browse the repository at this point in the history
…server

# Conflicts:
#	docs/gravitino-server-config.md
  • Loading branch information
LiuQhahah committed Nov 21, 2024
2 parents 8394eab + af259c3 commit a15df66
Show file tree
Hide file tree
Showing 31 changed files with 963 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ public void testDownloadFromHTTP() throws Exception {
boolean success = false;
int attempts = 0;

while (!success && attempts < MAX_RETRIES) {
while (!success) {
try {
LOG.info("Attempting to download file from URL: {} (Attempt {})", fileUrl, attempts + 1);
FetchFileUtils.fetchFileFromUri(fileUrl, destFile, 10, conf);
FetchFileUtils.fetchFileFromUri(fileUrl, destFile, 45, conf);
success = true;
LOG.info("File downloaded successfully on attempt {}", attempts + 1);
} catch (IOException e) {
attempts++;
LOG.error("Download attempt {} failed due to: {}", attempts, e.getMessage(), e);
if (attempts < MAX_RETRIES) {
long retryDelay = INITIAL_RETRY_DELAY_MS * (1L << (attempts - 1));
LOG.warn("Retrying in {}ms", retryDelay);
LOG.warn("Retrying in {} ms", retryDelay);
Thread.sleep(retryDelay);
} else {
throw new AssertionError("Failed to download file after " + MAX_RETRIES + " attempts", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@
import org.apache.gravitino.exceptions.NonEmptyEntityException;
import org.apache.gravitino.listener.api.event.AlterMetalakeEvent;
import org.apache.gravitino.listener.api.event.AlterMetalakeFailureEvent;
import org.apache.gravitino.listener.api.event.AlterMetalakePreEvent;
import org.apache.gravitino.listener.api.event.CreateMetalakeEvent;
import org.apache.gravitino.listener.api.event.CreateMetalakeFailureEvent;
import org.apache.gravitino.listener.api.event.CreateMetalakePreEvent;
import org.apache.gravitino.listener.api.event.DropMetalakeEvent;
import org.apache.gravitino.listener.api.event.DropMetalakeFailureEvent;
import org.apache.gravitino.listener.api.event.DropMetalakePreEvent;
import org.apache.gravitino.listener.api.event.ListMetalakeEvent;
import org.apache.gravitino.listener.api.event.ListMetalakeFailureEvent;
import org.apache.gravitino.listener.api.event.ListMetalakePreEvent;
import org.apache.gravitino.listener.api.event.LoadMetalakeEvent;
import org.apache.gravitino.listener.api.event.LoadMetalakeFailureEvent;
import org.apache.gravitino.listener.api.event.LoadMetalakePreEvent;
import org.apache.gravitino.listener.api.info.MetalakeInfo;
import org.apache.gravitino.metalake.MetalakeDispatcher;
import org.apache.gravitino.utils.PrincipalUtils;
Expand Down Expand Up @@ -65,6 +70,7 @@ public MetalakeEventDispatcher(EventBus eventBus, MetalakeDispatcher dispatcher)

@Override
public Metalake[] listMetalakes() {
eventBus.dispatchEvent(new ListMetalakePreEvent(PrincipalUtils.getCurrentUserName()));
try {
Metalake[] metalakes = dispatcher.listMetalakes();
eventBus.dispatchEvent(new ListMetalakeEvent(PrincipalUtils.getCurrentUserName()));
Expand All @@ -77,6 +83,7 @@ public Metalake[] listMetalakes() {

@Override
public Metalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeException {
eventBus.dispatchEvent(new LoadMetalakePreEvent(PrincipalUtils.getCurrentUserName(), ident));
try {
Metalake metalake = dispatcher.loadMetalake(ident);
eventBus.dispatchEvent(
Expand All @@ -99,6 +106,10 @@ public boolean metalakeExists(NameIdentifier ident) {
public Metalake createMetalake(
NameIdentifier ident, String comment, Map<String, String> properties)
throws MetalakeAlreadyExistsException {
MetalakeInfo createMetalakeRequest = new MetalakeInfo(ident.name(), comment, properties, null);
eventBus.dispatchEvent(
new CreateMetalakePreEvent(
PrincipalUtils.getCurrentUserName(), ident, createMetalakeRequest));
try {
Metalake metalake = dispatcher.createMetalake(ident, comment, properties);
eventBus.dispatchEvent(
Expand All @@ -117,6 +128,8 @@ public Metalake createMetalake(
@Override
public Metalake alterMetalake(NameIdentifier ident, MetalakeChange... changes)
throws NoSuchMetalakeException, IllegalArgumentException {
eventBus.dispatchEvent(
new AlterMetalakePreEvent(PrincipalUtils.getCurrentUserName(), ident, changes));
try {
Metalake metalake = dispatcher.alterMetalake(ident, changes);
eventBus.dispatchEvent(
Expand All @@ -133,6 +146,7 @@ public Metalake alterMetalake(NameIdentifier ident, MetalakeChange... changes)
@Override
public boolean dropMetalake(NameIdentifier ident, boolean force)
throws NonEmptyEntityException, MetalakeInUseException {
eventBus.dispatchEvent(new DropMetalakePreEvent(PrincipalUtils.getCurrentUserName(), ident));
try {
boolean isExists = dispatcher.dropMetalake(ident, force);
eventBus.dispatchEvent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@
import org.apache.gravitino.exceptions.TopicAlreadyExistsException;
import org.apache.gravitino.listener.api.event.AlterTopicEvent;
import org.apache.gravitino.listener.api.event.AlterTopicFailureEvent;
import org.apache.gravitino.listener.api.event.AlterTopicPreEvent;
import org.apache.gravitino.listener.api.event.CreateTopicEvent;
import org.apache.gravitino.listener.api.event.CreateTopicFailureEvent;
import org.apache.gravitino.listener.api.event.CreateTopicPreEvent;
import org.apache.gravitino.listener.api.event.DropTopicEvent;
import org.apache.gravitino.listener.api.event.DropTopicFailureEvent;
import org.apache.gravitino.listener.api.event.DropTopicPreEvent;
import org.apache.gravitino.listener.api.event.ListTopicEvent;
import org.apache.gravitino.listener.api.event.ListTopicFailureEvent;
import org.apache.gravitino.listener.api.event.ListTopicPreEvent;
import org.apache.gravitino.listener.api.event.LoadTopicEvent;
import org.apache.gravitino.listener.api.event.LoadTopicFailureEvent;
import org.apache.gravitino.listener.api.event.LoadTopicPreEvent;
import org.apache.gravitino.listener.api.info.TopicInfo;
import org.apache.gravitino.messaging.DataLayout;
import org.apache.gravitino.messaging.Topic;
Expand Down Expand Up @@ -66,6 +71,8 @@ public TopicEventDispatcher(EventBus eventBus, TopicDispatcher dispatcher) {
@Override
public Topic alterTopic(NameIdentifier ident, TopicChange... changes)
throws NoSuchTopicException, IllegalArgumentException {
eventBus.dispatchEvent(
new AlterTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident, changes));
try {
Topic topic = dispatcher.alterTopic(ident, changes);
eventBus.dispatchEvent(
Expand All @@ -81,6 +88,7 @@ public Topic alterTopic(NameIdentifier ident, TopicChange... changes)

@Override
public boolean dropTopic(NameIdentifier ident) {
eventBus.dispatchEvent(new DropTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident));
try {
boolean isExists = dispatcher.dropTopic(ident);
eventBus.dispatchEvent(
Expand All @@ -95,6 +103,7 @@ public boolean dropTopic(NameIdentifier ident) {

@Override
public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchTopicException {
eventBus.dispatchEvent(new ListTopicPreEvent(PrincipalUtils.getCurrentUserName(), namespace));
try {
NameIdentifier[] nameIdentifiers = dispatcher.listTopics(namespace);
eventBus.dispatchEvent(new ListTopicEvent(PrincipalUtils.getCurrentUserName(), namespace));
Expand All @@ -108,6 +117,7 @@ public NameIdentifier[] listTopics(Namespace namespace) throws NoSuchTopicExcept

@Override
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
eventBus.dispatchEvent(new LoadTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident));
try {
Topic topic = dispatcher.loadTopic(ident);
eventBus.dispatchEvent(
Expand All @@ -129,13 +139,15 @@ public boolean topicExists(NameIdentifier ident) {
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, String> properties)
throws NoSuchTopicException, TopicAlreadyExistsException {
TopicInfo createTopicRequest = new TopicInfo(ident.name(), comment, properties, null);
eventBus.dispatchEvent(
new CreateTopicPreEvent(PrincipalUtils.getCurrentUserName(), ident, createTopicRequest));
try {
Topic topic = dispatcher.createTopic(ident, comment, dataLayout, properties);
eventBus.dispatchEvent(
new CreateTopicEvent(PrincipalUtils.getCurrentUserName(), ident, new TopicInfo(topic)));
return topic;
} catch (Exception e) {
TopicInfo createTopicRequest = new TopicInfo(ident.name(), comment, properties, null);
eventBus.dispatchEvent(
new CreateTopicFailureEvent(
PrincipalUtils.getCurrentUserName(), ident, e, createTopicRequest));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.MetalakeChange;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/** Represents an event triggered before altering a metalake. */
@DeveloperApi
public class AlterMetalakePreEvent extends PreEvent {
private final MetalakeChange[] metalakeChanges;

public AlterMetalakePreEvent(
String user, NameIdentifier identifier, MetalakeChange[] metalakeChanges) {
super(user, identifier);
this.metalakeChanges = metalakeChanges;
}

/**
* Retrieves the specific changes that were made to the metalake during the alteration process.
*
* @return An array of {@link MetalakeChange} objects detailing each modification applied to the
* metalake.
*/
public MetalakeChange[] metalakeChanges() {
return metalakeChanges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;
import org.apache.gravitino.messaging.TopicChange;

/** Represents an event triggered before altering a topic. */
@DeveloperApi
public class AlterTopicPreEvent extends TopicPreEvent {
private final TopicChange[] topicChanges;

public AlterTopicPreEvent(String user, NameIdentifier identifier, TopicChange[] topicChanges) {
super(user, identifier);
this.topicChanges = topicChanges;
}

/**
* Retrieves the specific changes made to the topic during the alteration process.
*
* @return An array of {@link TopicChange} objects detailing each modification applied to the
* topic.
*/
public TopicChange[] topicChanges() {
return topicChanges;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;
import org.apache.gravitino.listener.api.info.MetalakeInfo;

/** Represents an event triggered before creating a metalake. */
@DeveloperApi
public class CreateMetalakePreEvent extends MetalakePreEvent {

private final MetalakeInfo createMetalakeRequest;

public CreateMetalakePreEvent(
String user, NameIdentifier identifier, MetalakeInfo createMetalakeRequest) {
super(user, identifier);
this.createMetalakeRequest = createMetalakeRequest;
}

/**
* @return A {@link MetalakeInfo} instance encapsulating the comprehensive details of create
* metalake
*/
public MetalakeInfo createMetalakeRequest() {
return createMetalakeRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;
import org.apache.gravitino.listener.api.info.TopicInfo;

/** Represents an event triggered before creating a topic. */
@DeveloperApi
public class CreateTopicPreEvent extends TopicPreEvent {
private final TopicInfo createTopicRequest;

public CreateTopicPreEvent(String user, NameIdentifier identifier, TopicInfo createTopicRequest) {
super(user, identifier);
this.createTopicRequest = createTopicRequest;
}

/**
* Retrieves the creation topic request.
*
* @return A {@link TopicInfo} instance encapsulating the comprehensive details of create topic
* request.
*/
public TopicInfo createTopicRequest() {
return createTopicRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.listener.api.event;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.annotation.DeveloperApi;

/** Represents an event triggered before dropping a metalake. */
@DeveloperApi
public class DropMetalakePreEvent extends MetalakePreEvent {

public DropMetalakePreEvent(String user, NameIdentifier identifier) {
super(user, identifier);
}
}
Loading

0 comments on commit a15df66

Please sign in to comment.