diff --git a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java index 641bc3eb532..18043964ddd 100644 --- a/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java +++ b/core/src/main/java/org/apache/gravitino/listener/AsyncQueueListener.java @@ -29,7 +29,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.BaseEvent; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +46,7 @@ public class AsyncQueueListener implements EventListenerPlugin { private static final String NAME_PREFIX = "async-queue-listener-"; private final List eventListeners; - private final BlockingQueue queue; + private final BlockingQueue queue; private final Thread asyncProcessor; private final int dispatcherJoinSeconds; private final AtomicBoolean stopped = new AtomicBoolean(false); @@ -68,20 +70,13 @@ public AsyncQueueListener( } @Override - public void onPostEvent(Event event) { - if (stopped.get()) { - LOG.warn( - "{} drop event: {}, since AsyncQueueListener is stopped", - asyncQueueListenerName, - event.getClass().getSimpleName()); - return; - } - - if (queue.offer(event)) { - return; - } + public void onPreEvent(PreEvent event) { + enqueueEvent(event); + } - logDropEventsIfNecessary(); + @Override + public void onPostEvent(Event event) { + enqueueEvent(event); } @Override @@ -117,8 +112,14 @@ List getEventListeners() { private void processEvents() { while (!Thread.currentThread().isInterrupted()) { try { - Event event = queue.take(); - this.eventListeners.forEach(listener -> listener.onPostEvent(event)); + BaseEvent baseEvent = queue.take(); + if (baseEvent instanceof PreEvent) { + this.eventListeners.forEach(listener -> listener.onPreEvent((PreEvent) baseEvent)); + } else if (baseEvent instanceof Event) { + this.eventListeners.forEach(listener -> listener.onPostEvent((Event) baseEvent)); + } else { + LOG.warn("Unknown event type: {}", baseEvent.getClass().getSimpleName()); + } } catch (InterruptedException e) { LOG.warn("{} event dispatcher thread is interrupted.", asyncQueueListenerName); break; @@ -154,4 +155,20 @@ private void logDropEventsIfNecessary() { } } } + + private void enqueueEvent(BaseEvent baseEvent) { + if (stopped.get()) { + LOG.warn( + "{} drop event: {}, since AsyncQueueListener is stopped", + asyncQueueListenerName, + baseEvent.getClass().getSimpleName()); + return; + } + + if (queue.offer(baseEvent)) { + return; + } + + logDropEventsIfNecessary(); + } } diff --git a/core/src/main/java/org/apache/gravitino/listener/EventBus.java b/core/src/main/java/org/apache/gravitino/listener/EventBus.java index 6b18f9a5aca..d851dc29271 100644 --- a/core/src/main/java/org/apache/gravitino/listener/EventBus.java +++ b/core/src/main/java/org/apache/gravitino/listener/EventBus.java @@ -21,8 +21,11 @@ import com.google.common.annotations.VisibleForTesting; import java.util.List; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.BaseEvent; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; /** * The {@code EventBus} class serves as a mechanism to dispatch events to registered listeners. It @@ -34,26 +37,32 @@ public class EventBus { // EventListenerPluginWrapper, // which are meant for synchronous event listening, or AsyncQueueListener, designed for // asynchronous event processing. - private final List postEventListeners; + private final List eventListeners; /** * Constructs an EventBus with a predefined list of event listeners. * - * @param postEventListeners A list of {@link EventListenerPlugin} instances that are to be - * registered with this EventBus for event dispatch. + * @param eventListeners A list of {@link EventListenerPlugin} instances that are to be registered + * with this EventBus for event dispatch. */ - public EventBus(List postEventListeners) { - this.postEventListeners = postEventListeners; + public EventBus(List eventListeners) { + this.eventListeners = eventListeners; } /** * Dispatches an event to all registered listeners. Each listener processes the event based on its * implementation, which could be either synchronous or asynchronous. * - * @param event The event to be dispatched to all registered listeners. + * @param baseEvent The event to be dispatched to all registered listeners. */ - public void dispatchEvent(Event event) { - postEventListeners.forEach(postEventListener -> postEventListener.onPostEvent(event)); + public void dispatchEvent(BaseEvent baseEvent) { + if (baseEvent instanceof PreEvent) { + dispatchPreEvent((PreEvent) baseEvent); + } else if (baseEvent instanceof Event) { + dispatchPostEvent((Event) baseEvent); + } else { + throw new RuntimeException("Unknown event type:" + baseEvent.getClass().getSimpleName()); + } } /** @@ -64,7 +73,15 @@ public void dispatchEvent(Event event) { * EventBus. */ @VisibleForTesting - List getPostEventListeners() { - return postEventListeners; + List getEventListeners() { + return eventListeners; + } + + private void dispatchPostEvent(Event postEvent) { + eventListeners.forEach(eventListener -> eventListener.onPostEvent(postEvent)); + } + + private void dispatchPreEvent(PreEvent preEvent) throws ForbiddenException { + eventListeners.forEach(eventListener -> eventListener.onPreEvent(preEvent)); } } diff --git a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java index a1483396384..8e0a2ffbc45 100644 --- a/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java +++ b/core/src/main/java/org/apache/gravitino/listener/EventListenerPluginWrapper.java @@ -21,8 +21,11 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Map; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.EventListenerPlugin; +import org.apache.gravitino.listener.api.event.BaseEvent; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,11 +70,27 @@ public void onPostEvent(Event event) { try { userEventListener.onPostEvent(event); } catch (Exception e) { - LOG.warn( - "Event listener {} process event {} failed,", - listenerName, - event.getClass().getSimpleName(), - e); + printExceptionInEventProcess(listenerName, event, e); + } + } + + @Override + public void onPreEvent(PreEvent preEvent) { + try { + userEventListener.onPreEvent(preEvent); + } catch (ForbiddenException e) { + if (Mode.SYNC.equals(mode())) { + LOG.warn( + "Event listener {} process pre event {} throws ForbiddenException, will skip the " + + "operation.", + listenerName, + preEvent.getClass().getSimpleName(), + e); + throw e; + } + printExceptionInEventProcess(listenerName, preEvent, e); + } catch (Exception e) { + printExceptionInEventProcess(listenerName, preEvent, e); } } @@ -79,4 +98,12 @@ public void onPostEvent(Event event) { EventListenerPlugin getUserEventListener() { return userEventListener; } + + private void printExceptionInEventProcess(String listenerName, BaseEvent baseEvent, Exception e) { + LOG.warn( + "Event listener {} process event {} failed,", + listenerName, + baseEvent.getClass().getSimpleName(), + e); + } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java b/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java index 8a0b8d98286..06d5b444019 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/EventListenerPlugin.java @@ -21,7 +21,9 @@ import java.util.Map; import org.apache.gravitino.annotation.DeveloperApi; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; /** * Defines an interface for event listeners that manage the lifecycle and state of a plugin, @@ -95,17 +97,29 @@ enum Mode { void stop() throws RuntimeException; /** - * Handles events generated after the completion of an operation. Implementers are responsible for - * processing these events, which may involve additional logic to respond to the operation - * outcomes. + * Handle post-events generated after the completion of an operation. * - *

This method provides a hook for post-operation event processing, allowing plugins to react - * or adapt based on the event details. + *

This method provides a hook for post-operation event processing, you couldn't change the + * resource in the event. * - * @param event The event to be processed. - * @throws RuntimeException Indicates issues encountered during event processing. + * @param postEvent The post event to be processed. + * @throws RuntimeException Indicates issues encountered during event processing, this has no + * affect to the operation. */ - void onPostEvent(Event event) throws RuntimeException; + default void onPostEvent(Event postEvent) throws RuntimeException {} + + /** + * Handle pre-events generated before the operation. + * + *

This method handles pre-operation events in SYNC or ASYNC mode, any changes to resources in + * the event will affect the subsequent operations. + * + * @param preEvent The pre event to be processed. + * @throws ForbiddenException The subsequent operation will be skipped if and only if the event + * listener throwing {@code org.apache.gravitino.exceptions.ForbiddenException} and the event + * listener is SYNC mode, the exception will be ignored and logged only in other conditions. + */ + default void onPreEvent(PreEvent preEvent) throws ForbiddenException {} /** * Specifies the default operational mode for event processing by the plugin. The default diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/BaseEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/BaseEvent.java new file mode 100644 index 00000000000..973323a0591 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/BaseEvent.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import javax.annotation.Nullable; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** + * The abstract base class for all events. It encapsulates common information such as the user who + * generated the event and the identifier for the resource associated with the event. Subclasses + * should provide specific details related to their individual event types. + */ +@DeveloperApi +public abstract class BaseEvent { + private final String user; + @Nullable private final NameIdentifier identifier; + private final long eventTime; + + /** + * Constructs an Event instance with the specified user and resource identifier details. + * + * @param user The user associated with this event. It provides context about who triggered the + * event. + * @param identifier The resource identifier associated with this event. This may refer to various + * types of resources such as a metalake, catalog, schema, or table, etc. + */ + protected BaseEvent(String user, NameIdentifier identifier) { + this.user = user; + this.identifier = identifier; + this.eventTime = System.currentTimeMillis(); + } + + /** + * Retrieves the user associated with this event. + * + * @return A string representing the user associated with this event. + */ + public String user() { + return user; + } + + /** + * Retrieves the resource identifier associated with this event. + * + *

For list operations within a namespace, the identifier is the identifier corresponds to that + * namespace. For metalake list operation, identifier is null. + * + * @return A NameIdentifier object that represents the resource, like a metalake, catalog, schema, + * table, etc., associated with the event. + */ + @Nullable + public NameIdentifier identifier() { + return identifier; + } + + /** + * Returns the timestamp when the event was created. + * + * @return The event creation time in milliseconds since epoch. + */ + public long eventTime() { + return eventTime; + } +} diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java b/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java index 89e233b430e..7dba616d42b 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/Event.java @@ -19,64 +19,13 @@ package org.apache.gravitino.listener.api.event; -import javax.annotation.Nullable; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.annotation.DeveloperApi; -/** - * The abstract base class for all events. It encapsulates common information such as the user who - * generated the event and the identifier for the resource associated with the event. Subclasses - * should provide specific details related to their individual event types. - */ +/** Represents a post event. */ @DeveloperApi -public abstract class Event { - private final String user; - @Nullable private final NameIdentifier identifier; - private final long eventTime; - - /** - * Constructs an Event instance with the specified user and resource identifier details. - * - * @param user The user associated with this event. It provides context about who triggered the - * event. - * @param identifier The resource identifier associated with this event. This may refer to various - * types of resources such as a metalake, catalog, schema, or table, etc. - */ +public abstract class Event extends BaseEvent { protected Event(String user, NameIdentifier identifier) { - this.user = user; - this.identifier = identifier; - this.eventTime = System.currentTimeMillis(); - } - - /** - * Retrieves the user associated with this event. - * - * @return A string representing the user associated with this event. - */ - public String user() { - return user; - } - - /** - * Retrieves the resource identifier associated with this event. - * - *

For list operations within a namespace, the identifier is the identifier corresponds to that - * namespace. For metalake list operation, identifier is null. - * - * @return A NameIdentifier object that represents the resource, like a metalake, catalog, schema, - * table, etc., associated with the event. - */ - @Nullable - public NameIdentifier identifier() { - return identifier; - } - - /** - * Returns the timestamp when the event was created. - * - * @return The event creation time in milliseconds since epoch. - */ - public long eventTime() { - return eventTime; + super(user, identifier); } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/PreEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/PreEvent.java new file mode 100644 index 00000000000..52e26aec346 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/PreEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.event; + +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.annotation.DeveloperApi; + +/** Represents a pre event. */ +@DeveloperApi +public abstract class PreEvent extends BaseEvent { + protected PreEvent(String user, NameIdentifier identifier) { + super(user, identifier); + } +} diff --git a/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java b/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java index 17e3e424963..4ec7ab71523 100644 --- a/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java +++ b/core/src/test/java/org/apache/gravitino/listener/DummyEventListener.java @@ -24,14 +24,17 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.Getter; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.api.EventListenerPlugin; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.awaitility.Awaitility; import org.junit.jupiter.api.Assertions; public class DummyEventListener implements EventListenerPlugin { Map properties; - @Getter LinkedList events = new LinkedList<>(); + @Getter LinkedList postEvents = new LinkedList<>(); + @Getter LinkedList preEvents = new LinkedList<>(); @Override public void init(Map properties) { @@ -46,7 +49,17 @@ public void stop() {} @Override public void onPostEvent(Event event) { - this.events.add(event); + postEvents.add(event); + } + + @Override + public void onPreEvent(PreEvent preEvent) { + if (preEvent.equals(TestEventListenerManager.DUMMY_FORBIDDEN_PRE_EVENT_INSTANCE)) { + throw new ForbiddenException(""); + } else if (preEvent.equals(TestEventListenerManager.DUMMY_EXCEPTION_PRE_EVENT_INSTANCE)) { + throw new RuntimeException(""); + } + preEvents.add(preEvent); } @Override @@ -54,18 +67,26 @@ public Mode mode() { return Mode.SYNC; } - public Event popEvent() { - Assertions.assertTrue(events.size() > 0, "No events to pop"); - return events.removeLast(); + public Event popPostEvent() { + Assertions.assertTrue(postEvents.size() > 0, "No events to pop"); + return postEvents.removeLast(); } public static class DummyAsyncEventListener extends DummyEventListener { - public List tryGetEvents() { + public List tryGetPostEvents() { + Awaitility.await() + .atMost(20, TimeUnit.SECONDS) + .pollInterval(10, TimeUnit.MILLISECONDS) + .until(() -> getPostEvents().size() > 0); + return getPostEvents(); + } + + public List tryGetPreEvents() { Awaitility.await() .atMost(20, TimeUnit.SECONDS) .pollInterval(10, TimeUnit.MILLISECONDS) - .until(() -> getEvents().size() > 0); - return getEvents(); + .until(() -> getPreEvents().size() > 0); + return getPreEvents(); } @Override diff --git a/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java b/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java index d0dda8878f9..fd7a612726f 100644 --- a/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java +++ b/core/src/test/java/org/apache/gravitino/listener/TestEventListenerManager.java @@ -26,22 +26,42 @@ import java.util.Set; import java.util.stream.Collectors; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.exceptions.ForbiddenException; import org.apache.gravitino.listener.DummyEventListener.DummyAsyncEventListener; import org.apache.gravitino.listener.DummyEventListener.DummyAsyncIsolatedEventListener; import org.apache.gravitino.listener.api.EventListenerPlugin; import org.apache.gravitino.listener.api.event.Event; +import org.apache.gravitino.listener.api.event.PreEvent; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TestEventListenerManager { - static class DummyEvent extends Event { - protected DummyEvent(String user, NameIdentifier identifier) { + + static class DummyPostEvent extends Event { + + protected DummyPostEvent(String user, NameIdentifier identifier) { super(user, identifier); } } - private static final DummyEvent DUMMY_EVENT_INSTANCE = - new DummyEvent("user", NameIdentifier.of("a", "b")); + static class DummyPreEvent extends PreEvent { + + protected DummyPreEvent(String user, NameIdentifier identifier) { + super(user, identifier); + } + } + + private static final DummyPostEvent DUMMY_POST_EVENT_INSTANCE = + new DummyPostEvent("user", NameIdentifier.of("a", "b")); + + private static final DummyPreEvent DUMMY_PRE_EVENT_INSTANCE = + new DummyPreEvent("user2", NameIdentifier.of("a2", "b2")); + + public static final DummyPreEvent DUMMY_FORBIDDEN_PRE_EVENT_INSTANCE = + new DummyPreEvent("user3", NameIdentifier.of("a3", "b3")); + + public static final DummyPreEvent DUMMY_EXCEPTION_PRE_EVENT_INSTANCE = + new DummyPreEvent("user4", NameIdentifier.of("a4", "b4")); @Test void testSyncListener() { @@ -54,9 +74,10 @@ void testSyncListener() { eventListenerManager.start(); EventBus eventBus = eventListenerManager.createEventBus(); - eventBus.dispatchEvent(DUMMY_EVENT_INSTANCE); - List listeners = eventBus.getPostEventListeners(); + // test post event + eventBus.dispatchEvent(DUMMY_POST_EVENT_INSTANCE); + List listeners = eventBus.getEventListeners(); Assertions.assertEquals(2, listeners.size()); Set names = listeners.stream() @@ -66,7 +87,27 @@ void testSyncListener() { EventListenerPluginWrapper wrapper = (EventListenerPluginWrapper) listener; EventListenerPlugin userListener = wrapper.getUserEventListener(); Assertions.assertTrue(userListener instanceof DummyEventListener); - checkEvents(((DummyEventListener) userListener).getEvents()); + checkPostEvents(((DummyEventListener) userListener).getPostEvents()); + Assertions.assertEquals( + 0, ((DummyEventListener) userListener).getPreEvents().size()); + return ((DummyEventListener) userListener).properties.get("name"); + }) + .collect(Collectors.toSet()); + Assertions.assertEquals(ImmutableSet.of(sync1, sync2), names); + + // test pre event + eventBus.dispatchEvent(DUMMY_PRE_EVENT_INSTANCE); + names = + listeners.stream() + .map( + listener -> { + Assertions.assertTrue(listener instanceof EventListenerPluginWrapper); + EventListenerPluginWrapper wrapper = (EventListenerPluginWrapper) listener; + EventListenerPlugin userListener = wrapper.getUserEventListener(); + Assertions.assertTrue(userListener instanceof DummyEventListener); + checkPreEvents(((DummyEventListener) userListener).getPreEvents()); + Assertions.assertEquals( + 0, ((DummyEventListener) userListener).getPostEvents().size()); return ((DummyEventListener) userListener).properties.get("name"); }) .collect(Collectors.toSet()); @@ -84,10 +125,11 @@ void testSharedAsyncListeners() { EventListenerManager eventListenerManager = new EventListenerManager(); eventListenerManager.init(properties); eventListenerManager.start(); - EventBus eventBus = eventListenerManager.createEventBus(); - eventBus.dispatchEvent(DUMMY_EVENT_INSTANCE); - List listeners = eventBus.getPostEventListeners(); + + // Test post event + eventBus.dispatchEvent(DUMMY_POST_EVENT_INSTANCE); + List listeners = eventBus.getEventListeners(); Assertions.assertEquals(1, listeners.size()); Assertions.assertTrue(listeners.get(0) instanceof AsyncQueueListener); @@ -102,12 +144,27 @@ void testSharedAsyncListeners() { EventListenerPlugin userListener = ((EventListenerPluginWrapper) shareQueueListener).getUserEventListener(); Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); - checkEvents(((DummyAsyncEventListener) userListener).tryGetEvents()); + checkPostEvents(((DummyAsyncEventListener) userListener).tryGetPostEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPreEvents().size()); return ((DummyAsyncEventListener) userListener).properties.get("name"); }) .collect(Collectors.toSet()); Assertions.assertEquals(ImmutableSet.of(async1, async2), sharedQueueListenerNames); + // Test pre event + eventBus.dispatchEvent(DUMMY_PRE_EVENT_INSTANCE); + shareQueueListeners.forEach( + shareQueueListener -> { + Assertions.assertTrue(shareQueueListener instanceof EventListenerPluginWrapper); + EventListenerPlugin userListener = + ((EventListenerPluginWrapper) shareQueueListener).getUserEventListener(); + Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); + checkPreEvents(((DummyAsyncEventListener) userListener).tryGetPreEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPostEvents().size()); + }); + eventListenerManager.stop(); } @@ -122,8 +179,8 @@ void testIsolatedAsyncListeners() { eventListenerManager.start(); EventBus eventBus = eventListenerManager.createEventBus(); - eventBus.dispatchEvent(DUMMY_EVENT_INSTANCE); - List listeners = eventBus.getPostEventListeners(); + eventBus.dispatchEvent(DUMMY_POST_EVENT_INSTANCE); + List listeners = eventBus.getEventListeners(); Assertions.assertEquals(2, listeners.size()); Set isolatedListenerNames = @@ -141,12 +198,49 @@ void testIsolatedAsyncListeners() { ((EventListenerPluginWrapper) internalListeners.get(0)) .getUserEventListener(); Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); - checkEvents(((DummyAsyncEventListener) userListener).tryGetEvents()); + checkPostEvents(((DummyAsyncEventListener) userListener).tryGetPostEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPreEvents().size()); return ((DummyAsyncEventListener) userListener).properties.get("name"); }) .collect(Collectors.toSet()); Assertions.assertEquals(ImmutableSet.of(async1, async2), isolatedListenerNames); + eventBus.dispatchEvent(DUMMY_PRE_EVENT_INSTANCE); + listeners.forEach( + listener -> { + Assertions.assertTrue(listener instanceof AsyncQueueListener); + AsyncQueueListener asyncQueueListener = (AsyncQueueListener) listener; + List internalListeners = asyncQueueListener.getEventListeners(); + Assertions.assertEquals(1, internalListeners.size()); + Assertions.assertTrue(internalListeners.get(0) instanceof EventListenerPluginWrapper); + EventListenerPlugin userListener = + ((EventListenerPluginWrapper) internalListeners.get(0)).getUserEventListener(); + Assertions.assertTrue(userListener instanceof DummyAsyncEventListener); + checkPreEvents(((DummyAsyncEventListener) userListener).tryGetPreEvents()); + Assertions.assertEquals( + 0, ((DummyAsyncEventListener) userListener).getPostEvents().size()); + }); + + eventListenerManager.stop(); + } + + @Test + void testForbiddenPreEvent() { + String sync1 = "sync1"; + String sync2 = "sync2"; + Map properties = createSyncEventListenerConfig(sync1, sync2); + + EventListenerManager eventListenerManager = new EventListenerManager(); + eventListenerManager.init(properties); + eventListenerManager.start(); + + EventBus eventBus = eventListenerManager.createEventBus(); + + Assertions.assertThrowsExactly( + ForbiddenException.class, () -> eventBus.dispatchEvent(DUMMY_FORBIDDEN_PRE_EVENT_INSTANCE)); + + Assertions.assertDoesNotThrow(() -> eventBus.dispatchEvent(DUMMY_EXCEPTION_PRE_EVENT_INSTANCE)); eventListenerManager.stop(); } @@ -206,8 +300,15 @@ private Map createSyncEventListenerConfig(String sync1, String s return config; } - private void checkEvents(List events) { + private void checkPostEvents(List events) { + Assertions.assertEquals(1, events.size()); + Assertions.assertEquals(DUMMY_POST_EVENT_INSTANCE, events.get(0)); + events.clear(); + } + + private void checkPreEvents(List events) { Assertions.assertEquals(1, events.size()); - Assertions.assertEquals(DUMMY_EVENT_INSTANCE, events.get(0)); + Assertions.assertEquals(DUMMY_PRE_EVENT_INSTANCE, events.get(0)); + events.clear(); } } diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java index ae5407329b0..d2050894368 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestCatalogEvent.java @@ -65,7 +65,7 @@ void testCreateCatalogEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); dispatcher.createCatalog( identifier, catalog.type(), catalog.provider(), catalog.comment(), catalog.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateCatalogEvent.class, event.getClass()); CatalogInfo catalogInfo = ((CreateCatalogEvent) event).createdCatalogInfo(); @@ -76,7 +76,7 @@ void testCreateCatalogEvent() { void testLoadCatalogEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); dispatcher.loadCatalog(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadCatalogEvent.class, event.getClass()); CatalogInfo catalogInfo = ((LoadCatalogEvent) event).loadedCatalogInfo(); @@ -88,7 +88,7 @@ void testAlterCatalogEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); CatalogChange catalogChange = CatalogChange.setProperty("a", "b"); dispatcher.alterCatalog(identifier, catalogChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterCatalogEvent.class, event.getClass()); CatalogInfo catalogInfo = ((AlterCatalogEvent) event).updatedCatalogInfo(); @@ -102,7 +102,7 @@ void testAlterCatalogEvent() { void testDropCatalogEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", catalog.name()); dispatcher.dropCatalog(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropCatalogEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropCatalogEvent) event).isExists()); @@ -112,7 +112,7 @@ void testDropCatalogEvent() { void testListCatalogEvent() { Namespace namespace = Namespace.of("metalake"); dispatcher.listCatalogs(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListCatalogEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListCatalogEvent) event).namespace()); @@ -122,7 +122,7 @@ void testListCatalogEvent() { void testListCatalogInfoEvent() { Namespace namespace = Namespace.of("metalake"); dispatcher.listCatalogsInfo(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListCatalogEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListCatalogEvent) event).namespace()); @@ -140,7 +140,7 @@ void testCreateCatalogFailureEvent() { catalog.provider(), catalog.comment(), catalog.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -154,7 +154,7 @@ void testLoadCatalogFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadCatalog(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -168,7 +168,7 @@ void testAlterCatalogFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterCatalog(identifier, catalogChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -183,7 +183,7 @@ void testDropCatalogFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropCatalog(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -195,7 +195,7 @@ void testListCatalogFailureEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listCatalogs(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, ((ListCatalogFailureEvent) event).exception().getClass()); @@ -207,7 +207,7 @@ void testListCatalogInfoFailureEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listCatalogsInfo(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListCatalogFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, ((ListCatalogFailureEvent) event).exception().getClass()); diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java index efc073b1979..3210887117b 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestFilesetEvent.java @@ -75,7 +75,7 @@ void testCreateFilesetEvent() { fileset.type(), fileset.storageLocation(), fileset.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((CreateFilesetEvent) event).createdFilesetInfo(); @@ -86,7 +86,7 @@ void testCreateFilesetEvent() { void testLoadFilesetEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name()); dispatcher.loadFileset(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((LoadFilesetEvent) event).loadedFilesetInfo(); @@ -98,7 +98,7 @@ void testAlterFilesetEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name()); FilesetChange change = FilesetChange.setProperty("a", "b"); dispatcher.alterFileset(identifier, change); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((AlterFilesetEvent) event).updatedFilesetInfo(); @@ -111,7 +111,7 @@ void testAlterFilesetEvent() { void testDropFilesetEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", fileset.name()); dispatcher.dropFileset(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropFilesetEvent.class, event.getClass()); Assertions.assertTrue(((DropFilesetEvent) event).isExists()); @@ -121,7 +121,7 @@ void testDropFilesetEvent() { void testListFilesetEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listFilesets(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListFilesetEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListFilesetEvent) event).namespace()); @@ -136,7 +136,7 @@ void testGetFileLocationEvent() { fileset.type(), fileset.storageLocation(), fileset.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateFilesetEvent.class, event.getClass()); FilesetInfo filesetInfo = ((CreateFilesetEvent) event).createdFilesetInfo(); @@ -152,7 +152,7 @@ void testGetFileLocationEvent() { CallerContext callerContext = CallerContext.builder().withContext(contextMap).build(); CallerContext.CallerContextHolder.set(callerContext); String fileLocation = dispatcher.getFileLocation(identifier, "test"); - Event event1 = dummyEventListener.popEvent(); + Event event1 = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event1.identifier()); Assertions.assertEquals(GetFileLocationEvent.class, event1.getClass()); String actualFileLocation = ((GetFileLocationEvent) event1).actualFileLocation(); @@ -180,7 +180,7 @@ void testCreateSchemaFailureEvent() { fileset.type(), fileset.storageLocation(), fileset.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -194,7 +194,7 @@ void testLoadFilesetFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "fileset"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadFileset(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -207,7 +207,7 @@ void testAlterFilesetFailureEvent() { FilesetChange change = FilesetChange.setProperty("a", "b"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterFileset(identifier, change)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -221,7 +221,7 @@ void testDropFilesetFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "fileset"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropFileset(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -233,7 +233,7 @@ void testListFilesetFailureEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listFilesets(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListFilesetFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -247,7 +247,7 @@ void testGetFileLocationFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.getFileLocation(identifier, "/test")); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(GetFileLocationFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java index a31ce933890..319ac641f61 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestMetalakeEvent.java @@ -63,7 +63,7 @@ void init() { void testCreateMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); dispatcher.createMetalake(identifier, metalake.comment(), metalake.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateMetalakeEvent.class, event.getClass()); MetalakeInfo metalakeInfo = ((CreateMetalakeEvent) event).createdMetalakeInfo(); @@ -74,7 +74,7 @@ void testCreateMetalakeEvent() { void testLoadMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); dispatcher.loadMetalake(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadMetalakeEvent.class, event.getClass()); MetalakeInfo metalakeInfo = ((LoadMetalakeEvent) event).loadedMetalakeInfo(); @@ -86,7 +86,7 @@ void testAlterMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); MetalakeChange metalakeChange = MetalakeChange.setProperty("a", "b"); dispatcher.alterMetalake(identifier, metalakeChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterMetalakeEvent.class, event.getClass()); MetalakeInfo metalakeInfo = ((AlterMetalakeEvent) event).updatedMetalakeInfo(); @@ -100,7 +100,7 @@ void testAlterMetalakeEvent() { void testDropMetalakeEvent() { NameIdentifier identifier = NameIdentifier.of("metalake"); dispatcher.dropMetalake(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropMetalakeEvent.class, event.getClass()); Assertions.assertTrue(((DropMetalakeEvent) event).isExists()); @@ -109,7 +109,7 @@ void testDropMetalakeEvent() { @Test void testListMetalakeEvent() { dispatcher.listMetalakes(); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertNull(event.identifier()); Assertions.assertEquals(ListMetalakeEvent.class, event.getClass()); } @@ -122,7 +122,7 @@ void testCreateMetalakeFailureEvent() { () -> failureDispatcher.createMetalake( identifier, metalake.comment(), metalake.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -136,7 +136,7 @@ void testLoadMetalakeFailureEvent() { NameIdentifier identifier = NameIdentifier.of(metalake.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadMetalake(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -150,7 +150,7 @@ void testAlterMetalakeFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterMetalake(identifier, metalakeChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -166,7 +166,7 @@ void testDropMetalakeFailureEvent() { NameIdentifier identifier = NameIdentifier.of(metalake.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropMetalake(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -177,7 +177,7 @@ void testDropMetalakeFailureEvent() { void testListMetalakeFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listMetalakes()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertNull(event.identifier()); Assertions.assertEquals(ListMetalakeFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java index 408330a4081..a1aa8aab2d0 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestPartitionEvent.java @@ -110,7 +110,7 @@ void testCreatePartitionInfo() { void testAddPartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.addPartition(identifier, partition); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AddPartitionEvent.class, event.getClass()); PartitionInfo partitionInfo = ((AddPartitionEvent) event).createdPartitionInfo(); @@ -121,7 +121,7 @@ void testAddPartitionEvent() { void testDropPartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.dropPartition(identifier, partition.name()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropPartitionEvent.class, event.getClass()); Assertions.assertEquals(false, ((DropPartitionEvent) event).isExists()); @@ -131,7 +131,7 @@ void testDropPartitionEvent() { void testPartitionExistsEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.partitionExists(identifier, partition.name()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PartitionExistsEvent.class, event.getClass()); Assertions.assertEquals(false, ((PartitionExistsEvent) event).isExists()); @@ -141,7 +141,7 @@ void testPartitionExistsEvent() { void testListPartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.listPartitions(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(ListPartitionEvent.class, event.getClass()); Assertions.assertEquals(identifier, ((ListPartitionEvent) event).identifier()); @@ -151,7 +151,7 @@ void testListPartitionEvent() { void testListPartitionNamesEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.listPartitionNames(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(ListPartitionNamesEvent.class, event.getClass()); Assertions.assertEquals(identifier, ((ListPartitionNamesEvent) event).identifier()); @@ -161,7 +161,7 @@ void testListPartitionNamesEvent() { void testPurgePartitionEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); dispatcher.purgePartition(identifier, partition.name()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PurgePartitionEvent.class, event.getClass()); Assertions.assertEquals(identifier, ((PurgePartitionEvent) event).identifier()); @@ -173,7 +173,7 @@ void testAddPartitionFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.addPartition(identifier, partition)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(AddPartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, ((AddPartitionFailureEvent) event).exception().getClass()); @@ -187,7 +187,7 @@ void testDropPartitionFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropPartition(identifier, partition.name())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(DropPartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -201,7 +201,7 @@ void testPartitionExistsFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.partitionExists(identifier, partition.name())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(PartitionExistsFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -214,7 +214,7 @@ void testListPartitionFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listPartitions(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListPartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -227,7 +227,7 @@ void testListPartitionNamesFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema", "table"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listPartitionNames(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListPartitionNamesFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, @@ -241,7 +241,7 @@ void testPurgePartitionFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.purgePartition(identifier, partition.name())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(PurgePartitionFailureEvent.class, event.getClass()); Assertions.assertEquals( GravitinoRuntimeException.class, diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java index d9af6a155b2..c2c0d7e4468 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestSchemaEvent.java @@ -66,7 +66,7 @@ void init() { void testCreateSchemaEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.createSchema(identifier, "", ImmutableMap.of()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateSchemaEvent.class, event.getClass()); SchemaInfo schemaInfo = ((CreateSchemaEvent) event).createdSchemaInfo(); @@ -77,7 +77,7 @@ void testCreateSchemaEvent() { void testLoadSchemaEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.loadSchema(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadSchemaEvent.class, event.getClass()); SchemaInfo schemaInfo = ((LoadSchemaEvent) event).loadedSchemaInfo(); @@ -88,7 +88,7 @@ void testLoadSchemaEvent() { void testListSchemaEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listSchemas(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(ListSchemaEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListSchemaEvent) event).namespace()); } @@ -98,7 +98,7 @@ void testAlterSchemaEvent() { SchemaChange schemaChange = SchemaChange.setProperty("a", "b"); NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.alterSchema(identifier, schemaChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterSchemaEvent.class, event.getClass()); @@ -113,7 +113,7 @@ void testAlterSchemaEvent() { void testDropSchemaEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); dispatcher.dropSchema(identifier, true); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropSchemaEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropSchemaEvent) event).cascade()); @@ -126,7 +126,7 @@ void testCreateSchemaFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.createSchema(identifier, schema.comment(), schema.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -139,7 +139,7 @@ void testLoadSchemaFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadSchema(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -154,7 +154,7 @@ void testAlterSchemaFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterSchema(identifier, schemaChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -168,7 +168,7 @@ void testDropSchemaFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "schema"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropSchema(identifier, true)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -181,7 +181,7 @@ void testListSchemaFailureEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listSchemas(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListSchemaFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java index bf427f01f58..11507c34376 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTableEvent.java @@ -84,7 +84,7 @@ void testCreateTableEvent() { table.distribution(), table.sortOrder(), table.index()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTableEvent.class, event.getClass()); TableInfo tableInfo = ((CreateTableEvent) event).createdTableInfo(); @@ -95,7 +95,7 @@ void testCreateTableEvent() { void testLoadTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); dispatcher.loadTable(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTableEvent.class, event.getClass()); TableInfo tableInfo = ((LoadTableEvent) event).loadedTableInfo(); @@ -107,7 +107,7 @@ void testAlterTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); TableChange change = TableChange.setProperty("a", "b"); dispatcher.alterTable(identifier, change); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTableEvent.class, event.getClass()); TableInfo tableInfo = ((AlterTableEvent) event).updatedTableInfo(); @@ -120,7 +120,7 @@ void testAlterTableEvent() { void testDropTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); dispatcher.dropTable(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTableEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropTableEvent) event).isExists()); @@ -130,7 +130,7 @@ void testDropTableEvent() { void testPurgeTableEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", table.name()); dispatcher.purgeTable(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PurgeTableEvent.class, event.getClass()); Assertions.assertEquals(true, ((PurgeTableEvent) event).isExists()); @@ -140,7 +140,7 @@ void testPurgeTableEvent() { void testListTableEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listTables(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTableEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListTableEvent) event).namespace()); @@ -161,7 +161,7 @@ void testCreateTableFailureEvent() { table.distribution(), table.sortOrder(), table.index())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -174,7 +174,7 @@ void testLoadTableFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "table", table.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadTable(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -187,7 +187,7 @@ void testAlterTableFailureEvent() { TableChange change = TableChange.setProperty("a", "b"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterTable(identifier, change)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -201,7 +201,7 @@ void testDropTableFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "table", table.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropTable(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -213,7 +213,7 @@ void testPurgeTableFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "table", table.name()); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.purgeTable(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(PurgeTableFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -225,7 +225,7 @@ void testListTableFailureEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listTables(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTableFailureEvent.class, event.getClass()); Assertions.assertEquals( diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java index cf61006481b..268c628c51a 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestTopicEvent.java @@ -65,7 +65,7 @@ void init() { void testCreateTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); dispatcher.createTopic(identifier, topic.comment(), null, topic.properties()); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTopicEvent.class, event.getClass()); TopicInfo topicInfo = ((CreateTopicEvent) event).createdTopicInfo(); @@ -76,7 +76,7 @@ void testCreateTopicEvent() { void testLoadTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); dispatcher.loadTopic(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTopicEvent.class, event.getClass()); TopicInfo topicInfo = ((LoadTopicEvent) event).loadedTopicInfo(); @@ -88,7 +88,7 @@ void testAlterTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); TopicChange topicChange = TopicChange.setProperty("a", "b"); dispatcher.alterTopic(identifier, topicChange); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTopicEvent.class, event.getClass()); TopicInfo topicInfo = ((AlterTopicEvent) event).updatedTopicInfo(); @@ -101,7 +101,7 @@ void testAlterTopicEvent() { void testDropTopicEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); dispatcher.dropTopic(identifier); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTopicEvent.class, event.getClass()); Assertions.assertEquals(true, ((DropTopicEvent) event).isExists()); @@ -111,7 +111,7 @@ void testDropTopicEvent() { void testListTopicEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); dispatcher.listTopics(namespace); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTopicEvent.class, event.getClass()); Assertions.assertEquals(namespace, ((ListTopicEvent) event).namespace()); @@ -123,7 +123,7 @@ void testCreateTopicFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.createTopic(identifier, topic.comment(), null, topic.properties())); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(CreateTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -136,7 +136,7 @@ void testLoadTopicFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.loadTopic(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(LoadTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -150,7 +150,7 @@ void testAlterTopicFailureEvent() { Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.alterTopic(identifier, topicChange)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(AlterTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -164,7 +164,7 @@ void testDropTopicFailureEvent() { NameIdentifier identifier = NameIdentifier.of("metalake", "catalog", "topic"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.dropTopic(identifier)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(identifier, event.identifier()); Assertions.assertEquals(DropTopicFailureEvent.class, event.getClass()); Assertions.assertEquals( @@ -176,7 +176,7 @@ void testListTopicFailureEvent() { Namespace namespace = Namespace.of("metalake", "catalog"); Assertions.assertThrowsExactly( GravitinoRuntimeException.class, () -> failureDispatcher.listTopics(namespace)); - Event event = dummyEventListener.popEvent(); + Event event = dummyEventListener.popPostEvent(); Assertions.assertEquals(namespace.toString(), event.identifier().toString()); Assertions.assertEquals(ListTopicFailureEvent.class, event.getClass()); Assertions.assertEquals(