5 分钟
Java 事件驱动模型实现
参考:
- Hadoop Yarn 部分源码
实验代码:https://github.com/rectcircle/java-event-driver-learn
简介
事件驱动模型是一种编程范式,一般应用于:
- 在网络编程中,一般情况下会与非阻塞IO(NIO)相结合,实现为应用层的 异步IO (AIO)。
- 在UI编程中,事件驱动模型也是几乎所有UI框架的标配
- 在业务层,面对需要阻塞等待的场景,事件驱动模型一般是处理此类问题的性能最好的解决方案
总的来说,在面对需要“等待”、“阻塞”、“轮询”的场景,都需要事件驱动模型来解决。
事件驱动模型也存在一些问题:
- 流程不清晰,逻辑相对复杂,代码不易阅读,调试困难
放在分布式系统中,与之类似的是 消息驱动模型
概念
- Event:事件
- Dispatcher:分发器(Driver)
- EventHandler:事件处理器
使用方式
- 定义 EventHandler,实现事件处理逻辑
- 调用 Dispatcher 注册 EventHandler
- 在合适的地方调用 Dispatcher.dispatchEvent 触发事件
流程
- 在合适的位置调用 Dispatcher.dispatchEvent 发送一个 Event
- Dispatcher 的事件循环取出 Event,分发给每一个 EventHandler
Java 实现
新建 Maven 项目
mvn org.apache.maven.plugins:maven-archetype-plugin:3.1.2:generate -DarchetypeArtifactId="maven-archetype-quickstart" -DarchetypeGroupId="org.apache.maven.archetypes" -DarchetypeVersion="1.4"
添加日志依赖
<!-- Logback "classic" module, version 1.2.3. -->
<!-- NOTE(review): logback-classic presumably brings in logback-core and slf4j-api transitively; confirm whether the two explicit entries below are still needed. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
<!-- Logback "core" module, pinned to the same version as logback-classic. -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<!-- SLF4J API: the Java code in this article logs through org.slf4j.Logger / org.slf4j.LoggerFactory. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
核心接口
Event
src/main/java/cn/rectcircle/learn/event/Event.java
package cn.rectcircle.learn.event;
/**
 * Interface defining the event API.
 *
 * @param <T> the enum type used to describe the type of an event
 */
public interface Event<T extends Enum<T>> {

    /**
     * Returns the type of this event.
     *
     * @return the event type
     */
    T getType();

    /**
     * Returns the timestamp at which this event was triggered.
     *
     * @return the timestamp
     */
    long getTimestamp();

    /**
     * Returns the dispatcher associated with this event.
     *
     * @return the {@link Dispatcher}
     */
    Dispatcher getDispatcher();

    /**
     * Returns a human-readable string describing this event.
     *
     * @return the description of the event
     */
    @Override
    String toString();
}
EventHandler
src/main/java/cn/rectcircle/learn/event/EventHandler.java
package cn.rectcircle.learn.event;
/**
 * Handler for events whose type is described by the enum {@code T}.
 *
 * @param <T> the enum type used to describe the type of an event
 * @param <E> the concrete event class, parameterized by {@code T}
 */
@FunctionalInterface
public interface EventHandler<T extends Enum<T>, E extends Event<T>> {

    /**
     * Handles a single event.
     *
     * @param event the event to handle
     */
    void handle(E event);
}
Dispatcher
src/main/java/cn/rectcircle/learn/event/Dispatcher.java
package cn.rectcircle.learn.event;
/**
 * Event dispatcher interface. It dispatches events to the event handlers that
 * were registered for the event's type.
 */
public interface Dispatcher {

    /**
     * Triggers an event.
     *
     * @param event the event to dispatch
     * @param <T> the enum type used to describe the type of an event
     * @param <E> the concrete event class
     */
    <T extends Enum<T>, E extends Event<T>> void dispatchEvent(E event);

    /**
     * Registers a handler for one specific event type.
     *
     * @param eventType the event type (an enum constant)
     * @param handler the event handler
     * @param <T> the enum type used to describe the type of an event
     * @param <E> the concrete event class
     */
    <T extends Enum<T>, E extends Event<T>> void register(
            T eventType, EventHandler<T, E> handler);

    /**
     * Registers a handler for a whole class of event types, i.e. every event
     * whose type belongs to the given enum class triggers the handler.
     *
     * @param eventTypeClazz the class of the event-type enum
     * @param handler the event handler
     * @param <T> the enum type used to describe the type of an event
     * @param <E> the concrete event class
     */
    <T extends Enum<T>, E extends Event<T>> void register(
            Class<T> eventTypeClazz, EventHandler<T, E> handler);
}
EventHandler(与上文的 EventHandler 相同,此处重复列出)
src/main/java/cn/rectcircle/learn/event/EventHandler.java
package cn.rectcircle.learn.event;
/**
 * Handler for events whose type is described by the enum {@code T}.
 *
 * @param <T> the enum type used to describe the type of an event
 * @param <E> the concrete event class, parameterized by {@code T}
 */
@FunctionalInterface
public interface EventHandler<T extends Enum<T>, E extends Event<T>> {

    /**
     * Handles a single event.
     *
     * @param event the event to handle
     */
    void handle(E event);
}
Dispatcher 的 实现
src/main/java/cn/rectcircle/learn/event/AsyncDispatcher.java
package cn.rectcircle.learn.event;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Dispatches {@link Event}s asynchronously.
 *
 * <ul>
 *   <li>One or more handlers may be registered per event type, either for a single
 *       enum constant or for a whole enum class.</li>
 *   <li>The event loop runs on its own dedicated thread; it only takes events off the
 *       queue and hands them to the worker pool.</li>
 *   <li>Handlers run on the worker pool supplied to the constructor, not on the
 *       event-loop thread. All handlers of one event run sequentially inside a single
 *       pool task.</li>
 * </ul>
 *
 * <p>The handler registry is a plain {@link HashMap} without synchronization, so handlers
 * should be registered before {@link #serviceStart()} is called. The worker pool is owned
 * by the caller; this class never shuts it down.
 */
public class AsyncDispatcher implements Dispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncDispatcher.class);

    /** Queue-size step at which the queue length is logged, and low-capacity warning limit. */
    private static final int EVENT_PRINT_THRESHOLD = 1000;

    private final BlockingQueue<Event<?>> eventQueue;
    private volatile boolean stopped = false;
    private Thread eventHandlingThread;
    private final ExecutorService eventHandlingPool;

    /** Registry keyed by either an enum constant or the enum's {@link Class}. */
    protected final Map<Object, EventHandler<?, ?>> eventDispatchers;

    /**
     * Creates a dispatcher backed by an unbounded {@link LinkedBlockingQueue}.
     *
     * @param eventHandlingPool the pool on which event handlers are executed
     */
    public AsyncDispatcher(ExecutorService eventHandlingPool) {
        this(eventHandlingPool, new LinkedBlockingQueue<>());
    }

    /**
     * Creates a dispatcher that uses the given queue.
     *
     * @param eventHandlingPool the pool on which event handlers are executed
     * @param eventQueue the queue that buffers events until the event loop takes them
     */
    public AsyncDispatcher(ExecutorService eventHandlingPool, BlockingQueue<Event<?>> eventQueue) {
        this.eventQueue = eventQueue;
        this.eventDispatchers = new HashMap<>();
        this.eventHandlingPool = eventHandlingPool;
    }

    /**
     * Builds the event-loop body: take events from the queue and dispatch them until the
     * dispatcher is stopped or the running thread is interrupted.
     *
     * @return the event-loop runnable
     */
    Runnable createThread() {
        return () -> {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
                Event<?> event;
                try {
                    event = eventQueue.take();
                } catch (InterruptedException ie) {
                    // An interrupt is the normal wake-up used by serviceStop(); only
                    // report it when nobody asked the dispatcher to stop.
                    if (!stopped) {
                        LOG.warn("AsyncDispatcher thread interrupted", ie);
                    }
                    return;
                }
                dispatch(event);
            }
        };
    }

    /**
     * Starts the event loop on a dedicated thread.
     */
    public void serviceStart() {
        // A plain thread is used on purpose: it terminates as soon as the loop returns,
        // which is what serviceStop() relies on when it joins. A worker thread of a
        // thread pool would stay alive waiting for further tasks and never be joinable.
        eventHandlingThread = new Thread(createThread());
        eventHandlingThread.setName("AsyncDispatcher event handler");
        eventHandlingThread.start();
    }

    /**
     * Stops the event loop and waits for its thread to finish. Events still waiting in
     * the queue are not dispatched.
     */
    public void serviceStop() {
        stopped = true;
        Thread loopThread = eventHandlingThread;
        if (loopThread == null) {
            return;
        }
        // Wake the loop up in case it is blocked in eventQueue.take().
        loopThread.interrupt();
        try {
            loopThread.join();
        } catch (InterruptedException ie) {
            LOG.warn("Interrupted Exception while stopping", ie);
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Dispatches one event; called by the event loop. The handler registered for the
     * event type's enum class (if any) runs first, then the handler registered for the
     * exact enum constant (if any). Both run in a single task on the worker pool.
     *
     * @param event the event taken from the queue
     * @param <T> the enum type used to describe the type of an event
     * @param <E> the concrete event class
     */
    protected <T extends Enum<T>, E extends Event<T>> void dispatch(E event) {
        LOG.debug("Dispatching the event {}.{}", event.getClass().getName(), event);

        T type = event.getType();
        if (type == null) {
            // Must not throw here: an exception would terminate the event loop.
            LOG.error("Dropping event without a type: {}", event);
            return;
        }

        // getDeclaringClass() rather than getClass(): an enum constant that declares a
        // body is an anonymous subclass, so getClass() would not match the enum class
        // used as the registration key.
        @SuppressWarnings("unchecked")
        EventHandler<T, E> classHandler =
                (EventHandler<T, E>) eventDispatchers.get(type.getDeclaringClass());
        @SuppressWarnings("unchecked")
        EventHandler<T, E> typeHandler = (EventHandler<T, E>) eventDispatchers.get(type);

        if (classHandler == null && typeHandler == null) {
            LOG.error("No handler registered for {}", type);
            return;
        }

        try {
            // Submit the handling work to the worker pool.
            eventHandlingPool.execute(() -> {
                invokeHandler(classHandler, event);
                invokeHandler(typeHandler, event);
            });
        } catch (RuntimeException e) {
            // e.g. the pool rejected the task; keep the event loop alive.
            LOG.error("Failed to submit event {} to the handling pool", event, e);
        }
    }

    /**
     * Runs one handler, logging (instead of propagating) a failure so that the remaining
     * handler of the same event still runs.
     */
    private static <T extends Enum<T>, E extends Event<T>> void invokeHandler(
            EventHandler<T, E> handler, E event) {
        if (handler == null) {
            return;
        }
        try {
            handler.handle(event);
        } catch (RuntimeException e) {
            LOG.error("something happen in handle", e);
        }
    }

    /**
     * Stores a handler under the given key. The first handler for a key is stored
     * directly; from the second one on, the handlers are wrapped in a
     * {@link MultiListenerHandler}.
     *
     * @param eventType the registry key: an enum constant or an enum class
     * @param handler the handler to add
     */
    private <T extends Enum<T>, E extends Event<T>> void internalRegister(
            Object eventType, EventHandler<T, E> handler) {
        // Check whether a handler is already registered for this key.
        @SuppressWarnings("unchecked")
        EventHandler<T, E> registeredHandler = (EventHandler<T, E>) eventDispatchers.get(eventType);
        LOG.info("Registering {} for {}", eventType, handler.getClass());

        if (registeredHandler == null) {
            eventDispatchers.put(eventType, handler);
        } else if (registeredHandler instanceof MultiListenerHandler) {
            // Already a multi-listener, just add to it.
            MultiListenerHandler<T, E> multiHandler = (MultiListenerHandler<T, E>) registeredHandler;
            multiHandler.addHandler(handler);
        } else {
            MultiListenerHandler<T, E> multiHandler = new MultiListenerHandler<>();
            multiHandler.addHandler(registeredHandler);
            multiHandler.addHandler(handler);
            eventDispatchers.put(eventType, multiHandler);
        }
    }

    /**
     * Registers a handler for one enum constant. The first handler of an event type is
     * stored directly; a second registration wraps both in a {@link MultiListenerHandler}.
     *
     * @param eventType the event type (an enum constant)
     * @param handler the event handler
     */
    @Override
    public <T extends Enum<T>, E extends Event<T>> void register(
            T eventType, EventHandler<T, E> handler) {
        this.internalRegister(eventType, handler);
    }

    /**
     * Registers a handler for every event whose type belongs to the given enum class.
     *
     * @param eventTypeClazz the class of the event-type enum
     * @param handler the event handler
     */
    @Override
    public <T extends Enum<T>, E extends Event<T>> void register(
            Class<T> eventTypeClazz, EventHandler<T, E> handler) {
        this.internalRegister(eventTypeClazz, handler);
    }

    /**
     * Enqueues an event for asynchronous dispatch, blocking while the queue is full.
     *
     * @param event the event to enqueue
     * @throws RuntimeException if the calling thread is interrupted while waiting for
     *     queue space; the thread's interrupt flag is restored before throwing
     */
    @Override
    public <T extends Enum<T>, E extends Event<T>> void dispatchEvent(E event) {
        int queueSize = eventQueue.size();
        if (queueSize != 0 && queueSize % EVENT_PRINT_THRESHOLD == 0) {
            LOG.info("Size of event-queue is {}", queueSize);
        }
        int remCapacity = eventQueue.remainingCapacity();
        if (remCapacity < EVENT_PRINT_THRESHOLD) {
            LOG.warn("Very low remaining capacity in the event-queue: {}", remCapacity);
        }
        try {
            eventQueue.put(event);
        } catch (InterruptedException e) {
            LOG.error("interrupted while put in event queue", e);
            // Preserve the interrupt for the caller.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Multiplexes an event: forwards it, in registration order, to every handler that
     * is interested in it.
     */
    static class MultiListenerHandler<T extends Enum<T>, E extends Event<T>>
            implements EventHandler<T, E> {
        final List<EventHandler<T, E>> listofHandlers;

        public MultiListenerHandler() {
            listofHandlers = new ArrayList<>();
        }

        @Override
        public void handle(E event) {
            for (EventHandler<T, E> handler : listofHandlers) {
                handler.handle(event);
            }
        }

        void addHandler(EventHandler<T, E> handler) {
            listofHandlers.add(handler);
        }
    }
}
方便用户使用的事件抽象类
src/main/java/cn/rectcircle/learn/event/AbstractEvent.java
package cn.rectcircle.learn.event;
/**
 * Parent class of all the events. All events extend this class, which holds the
 * event type, the timestamp and the dispatcher.
 *
 * @param <T> the enum type used to describe the type of an event
 */
public abstract class AbstractEvent<T extends Enum<T>> implements Event<T> {
    private final T type;
    private final long timestamp;
    private final Dispatcher dispatcher;

    /**
     * Creates an event without a real timestamp; {@link #getTimestamp()} returns
     * {@code -1}.
     *
     * @param dispatcher the dispatcher associated with this event
     * @param type the event type
     */
    public AbstractEvent(Dispatcher dispatcher, T type) {
        // We're not generating a real timestamp here. It's too expensive.
        // System.currentTimeMillis has some performance cost, see
        // https://www.jianshu.com/p/3fbe607600a5
        this(dispatcher, type, -1L);
    }

    /**
     * Creates an event with an explicit timestamp.
     *
     * @param dispatcher the dispatcher associated with this event
     * @param type the event type
     * @param timestamp the timestamp reported by {@link #getTimestamp()}
     */
    public AbstractEvent(Dispatcher dispatcher, T type, long timestamp) {
        this.dispatcher = dispatcher;
        this.type = type;
        this.timestamp = timestamp;
    }

    @Override
    public Dispatcher getDispatcher() {
        return dispatcher;
    }

    @Override
    public long getTimestamp() {
        return timestamp;
    }

    @Override
    public T getType() {
        return type;
    }

    @Override
    public String toString() {
        return "EventType: " + getType();
    }
}
测试
- 主线程发送一个包含一个单词的事件,处理器打印 Hello $单词
创建相关 Event 和 EventType
src/main/java/cn/rectcircle/learn/base/BaseEventType.java
package cn.rectcircle.learn.base;
/**
 * Event types used by the test example.
 *
 * @author rectcircle
 */
public enum BaseEventType {

    /** Test event type. */
    HELLO,

    /** Test event type. */
    HI
}
src/main/java/cn/rectcircle/learn/base/BaseEvent.java
package cn.rectcircle.learn.base;
import cn.rectcircle.learn.event.AbstractEvent;
import cn.rectcircle.learn.event.Dispatcher;
/**
 * Test event of type {@link BaseEventType} that carries a single word.
 *
 * @author rectcircle
 */
public class BaseEvent extends AbstractEvent<BaseEventType> {

    /** The word carried by this event. */
    private final String word;

    /**
     * Creates an event carrying the given word.
     *
     * @param dispatcher the dispatcher passed on to the parent class
     * @param type the event type
     * @param word the word carried by this event
     */
    public BaseEvent(Dispatcher dispatcher, BaseEventType type, String word) {
        super(dispatcher, type);
        this.word = word;
    }

    /**
     * Returns the word carried by this event.
     *
     * @return the word
     */
    public String getWord() {
        return this.word;
    }
}
整体测试
src/main/java/cn/rectcircle/learn/main/BaseUsage.java
package cn.rectcircle.learn.main;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.rectcircle.learn.base.BaseEvent;
import cn.rectcircle.learn.base.BaseEventType;
import cn.rectcircle.learn.event.AsyncDispatcher;
/**
 * Basic usage example: registers handlers on an {@link AsyncDispatcher}, starts
 * its event loop and sends a batch of events from the main thread.
 *
 * @author rectcircle
 */
@SuppressWarnings({"PMD.ThreadPoolCreationRule", "PMD.UndefineMagicConstantRule"})
public class BaseUsage {
    private static final Logger LOG = LoggerFactory.getLogger(BaseUsage.class);

    /**
     * Runs the example. Note that neither the dispatcher nor the executor is stopped
     * or shut down here.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Handlers run on a single-thread executor.
        AsyncDispatcher eventBus = new AsyncDispatcher(Executors.newSingleThreadExecutor());

        // Register one handler per enum constant ...
        eventBus.register(BaseEventType.HELLO,
                (BaseEvent event) -> LOG.info("Hello " + event.getWord()));
        eventBus.register(BaseEventType.HI,
                (BaseEvent event) -> LOG.info("Hi " + event.getWord()));
        // ... and one for the whole enum class.
        eventBus.register(BaseEventType.class,
                (BaseEvent event) -> LOG.info("通过 class 注册 " + event.getWord()));

        // Start the event loop.
        eventBus.serviceStart();

        // Send the events: ten rounds of one HELLO followed by one HI.
        int round = 0;
        while (round < 10) {
            eventBus.dispatchEvent(new BaseEvent(eventBus, BaseEventType.HELLO, "World"));
            eventBus.dispatchEvent(new BaseEvent(eventBus, BaseEventType.HI, "世界"));
            round++;
        }
    }
}