Skip to content

Commit

Permalink
Polish apache#3984 : Add Parallel implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
mercyblitz committed May 17, 2019
1 parent 693fc23 commit e9e7292
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,21 @@ public abstract class AbstractEventDispatcher implements EventDispatcher {

private final ConcurrentMap<Class<? extends Event>, List<Listener>> listenersCache = new ConcurrentHashMap<>();

private final Executor executor;

/**
* Constructor with an instance of {@link Executor}
*
* @param executor {@link Executor}
* @throws NullPointerException <code>executor</code> is <code>null</code>
*/
protected AbstractEventDispatcher(Executor executor) {
if (executor == null) {
throw new NullPointerException("executor must not be null");
}
this.executor = executor;
}

@Override
public void addListener(Listener<?> listener) throws NullPointerException, IllegalArgumentException {
assertListener(listener);
Expand Down Expand Up @@ -81,11 +96,8 @@ private <E> void addIfAbsent(Collection<E> collection, E element) {

@Override
public void dispatch(Event event) {
Executor executor = getExecutor();

if (executor == null) { // If absent, uses DIRECT_EXECUTOR
executor = DIRECT_EXECUTOR;
}
Executor executor = getExecutor();

// execute in sequential or parallel execution model
executor.execute(() -> {
Expand All @@ -100,6 +112,14 @@ public void dispatch(Event event) {
});
}

/**
* @return the non-null {@link Executor}
*/
@Override
public final Executor getExecutor() {
return executor;
}

protected void doInListener(Listener<?> listener, Consumer<Collection<Listener>> consumer) {

Class<? extends Event> eventType = findEventType(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@
* @see EventDispatcher
* @since 2.7.2
*/
public class DirectEventDispatcher extends AbstractEventDispatcher {
public final class DirectEventDispatcher extends AbstractEventDispatcher {

public DirectEventDispatcher() {
super(DIRECT_EXECUTOR);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.event;

import java.util.concurrent.ForkJoinPool;

/**
* Parallel {@link EventDispatcher} implementation uses {@link ForkJoinPool#commonPool() JDK common thread pool}
*
* @see ForkJoinPool#commonPool()
* @since 2.7.2
*/
public class ParallelEventDispatcher extends AbstractEventDispatcher {

public ParallelEventDispatcher() {
super(ForkJoinPool.commonPool());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,8 @@ public final void onEvent(E event) {
public int getEventOccurs() {
return eventOccurs.get();
}

protected void println(String message) {
System.out.printf("[%s] %s\n", Thread.currentThread().getName(), message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ class EchoEventListener extends AbstractListener<EchoEvent> implements Serializa

@Override
public void handleEvent(EchoEvent event) {
System.out.println("EchoEventListener : " + event);
println("EchoEventListener : " + event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class EchoEventListener2 extends Vector<Listener<Event>> implements Seria
private AbstractListener<Event> delegate = new AbstractListener<Event>() {
@Override
protected void handleEvent(Event event) {
System.out.println("EchoEventListener2 - " + event);
println("EchoEventListener2 : " + event);
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.common.event;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;

/**
* {@link ParallelEventDispatcher} Test
*
* @since 2.7.2
*/
public class ParallelEventDispatcherTest {

private EventDispatcher eventDispatcher;

private AbstractListener listener;

@BeforeEach
public void init() {
eventDispatcher = new ParallelEventDispatcher();
listener = new EchoEventListener();
eventDispatcher.addListener(listener);
}

@Test
public void testDispatchEvent() throws InterruptedException {
eventDispatcher.dispatch(new EchoEvent("Hello,World"));
ForkJoinPool.commonPool().awaitTermination(1, TimeUnit.SECONDS);
// event has been handled
assertEquals(1, listener.getEventOccurs());
}

@AfterAll
public static void destroy() {
ForkJoinPool.commonPool().shutdown();
}

}

0 comments on commit e9e7292

Please sign in to comment.