模块(类)之间解耦利器:EventPublishSubscribeUtils 事件发布订阅工具类

2020-05-08 00:00:00 订阅 模块 利器

如果熟悉C#语言的小伙伴们一般都会知道委托、事件的好处,只需在某个类中提前定义好公开的委托或事件(委托的特殊表现形式)变量,然后在其它类中就可以很随意的订阅该委托或事件,当委托或事件被触发执行时,会自动通知所有的订阅者进行消费处理。(观察者模式用委托来实现是最好不过了,DDD所提倡的事件驱动其根本理念也是如此),当然我这里想到的是不需要在每个类中进行定义委托或事件,而是由一个统一的中介者(即EventPublishSubscribeUtils)来提供事件的订阅及发布操作,这样各模块之间无需直接依赖,只需通过中介者完成发布通知与订阅回调即可,何乐而不为呢?

这里我先借助C#语言独有的委托类型快速实现了一个简易的EventPublishSubscribeUtils,代码如下:

    /// <summary>
    /// 自定义事件发布订阅回调工具类(业务解藕、关注点分离,避免互相依赖)--演示版
    /// EventBus简化版,观察者模式
    /// author:zuowenjun
    /// </summary>
    public static class EventPublishSubscribeUtils
    {
        private static ConcurrentDictionary<Type, EventHandler<object>> EventHandlers { get; } = new ConcurrentDictionary<Type, EventHandler<object>>();

        private static void removeRegisters(ref EventHandler<object> srcEvents, EventHandler<object> removeTargetEvents)
        {
            var evtTypes = removeTargetEvents.GetInvocationList().Select(d => d.GetType());
            var registeredEventHandlers = Delegate.Combine(srcEvents.GetInvocationList().Where(ei => evtTypes.Contains(ei.GetType())).ToArray());
            srcEvents -= (EventHandler<object>)registeredEventHandlers;
        }

        public static void Register<T>(EventHandler<object> eventHandlers)
        {
            EventHandlers.AddOrUpdate(typeof(T), eventHandlers,
                (t, e) =>
                {
                    //先根据订阅委托类型匹匹配过滤掉之前已有的相同订阅,然后再重新订阅,防止重复订阅,多次执行的情况。
                    removeRegisters(ref e, eventHandlers);
                    e += eventHandlers;
                    return e;
                });
        }


        public static void UnRegister<T>(EventHandler<object> eventHandlers = null)
        {
            Type eventMsgType = typeof(T);
            if (eventHandlers == null)
            {
                EventHandlers.TryRemove(eventMsgType, out eventHandlers);
                return;
            }

            var e = EventHandlers[eventMsgType];
            removeRegisters(ref e, eventHandlers);
        }

        public static void PublishEvent<T>(T eventMsg, object sender)
        {
            Type eventMsgType = eventMsg.GetType();
            if (EventHandlers.ContainsKey(eventMsgType))
            {
                EventHandlers[eventMsgType].Invoke(sender, eventMsg);
            }
        }
    }

然后使用就比较简单了,我们只需通过EventPublishSubscribeUtils.Register注册订阅事件消息,通过EventPublishSubscribeUtils.PublishEvent发布事件通知,这样就可以让两个甚至多个不相关的模块(类)能够通过消息类型实现1对多的通讯与协同处理。使用示例代码如下:


    class EventMessage
    {
        public string Name { get; set; }

        public string Msg { get; set; }

        public DateTime CreatedDate { get; set; }
    }

    class DemoA
    {
        public DemoA()
        {
            EventHandler<object> eventHandlers = EventCallback1;
            eventHandlers += EventCallback2;

            EventPublishSubscribeUtils.Register<EventMessage>(eventHandlers);
        }

        private void EventCallback1(object sender, object e)
        {
            string json = JsonConvert.SerializeObject(e);
            System.Diagnostics.Debug.WriteLine($"EventCallback1=> sender:{sender},e:{json}");
        }

        private void EventCallback2(object sender, object e)
        {
            string json = JsonConvert.SerializeObject(e);
            System.Diagnostics.Debug.WriteLine($"EventCallback2=> sender:{sender},e:{json}");
        }

    }

    class DemoB
    {
        public void ShowMsg(string name, string msg)
        {
            System.Diagnostics.Debug.WriteLine($"ShowMsg=> name:{name},msg:{msg}");
            var eventMsg = new EventMessage
            {
                Name = name,
                Msg = msg,
                CreatedDate = DateTime.Now
            };
            EventPublishSubscribeUtils.PublishEvent(eventMsg, nameof(DemoB.ShowMsg));
        }
    }

//main方法中使用:
var demoA = new DemoA();
var demoB = new DemoB();

demoB.ShowMsg("梦在旅途", "i love csharp and java!");

从上述示例代码中可以看出,DemoA与DemoB各为独立,互不依赖,它们都不知道有对方的存在,它们只关心业务的处理,通过执行demoB.ShowMsg方法进而触发回调demoA.EventCallback1,demoA.EventCallback2方法,是不是比起直接从DemoA中调DemoB更好呢?

c#有委托类型(方法的引用),那如果是在java中该如何实现呢?

其实同理,我们可以借助匿名内部类+匿名实现类的方式(如:函数式接口)实现与C#异曲同工的效果,同样可以实现类似的事件发布与订阅功能,如下便是采用java语言的实现EventPublishSubscribeUtils类的代码:

这个因项目需要,我特意实现了两种模式,一种支持1对多的普通方式,另一种支持1对1的订阅回调方式,有返回值。


/**
 * 自定义事件发布订阅回调工具类(业务解藕、关注点分离,避免互相依赖)
 * EventBus简化版,观察者模式
 * <pre>
 * 支持两种模式
 * 1.无返回值:订阅事件消费(register)+ 发布事件消息(publishEvent/publishEventAsync)
 * 2.有返回值:监听回调通知处理(listenCallback)+通知回调(notifyCallback),通过notifyMessageType+MessageChannel 即可标识唯一的一组通知回调与监听回调处理
 * <pre>
 * @author zuowenjun
 * @date 20200310
 */
public final class EventPublishSubscribeUtils {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventPublishSubscribeUtils.class);

    private static final Map<Class<?>, LinkedList<Consumer<Object>>> eventConsumers = new ConcurrentHashMap<>();

    private static final Map<Class<?>, ConcurrentHashMap<MessageChannel, Function<Object, Object>>> callbackFuncs = new ConcurrentHashMap<>();

    private EventPublishSubscribeUtils() {
    }


    /**
     * 注册事件回调消费者
     * 用法:EventSubscribeConsumeUtils.register(this::xxxx方法) 或lambda表达式
     * 注意:若回调方法添加了事务注解,则应指派其代理对象的方法来完成回调,如:
     * EventSubscribeConsumeUtils.register((xxxService)SpringUtils.getBean(this.class)::xxxx方法)
     *
     * @param eventConsumer
     */
    public static void register(Class<?> eventMessageType, Consumer<Object> eventConsumer) {

        if (eventConsumer == null) {
            return;
        }

        LinkedList<Consumer<Object>> eventConsumerItems = null;
        if (!eventConsumers.containsKey(eventMessageType)) {
            eventConsumers.putIfAbsent(eventMessageType, new LinkedList<>());
        }
        eventConsumerItems = eventConsumers.get(eventMessageType);

        eventConsumerItems.add(eventConsumer);
    }

    /**
     * 取消订阅回调
     *
     * @param eventMessageType
     * @param eventConsumer
     */
    public static void unRegister(Class<?> eventMessageType, Consumer<Object> eventConsumer) {
        if (!eventConsumers.containsKey(eventMessageType)) {
            return;
        }

        LinkedList<Consumer<Object>> eventConsumerItems = eventConsumers.get(eventMessageType);
        int eventConsumerIndex = eventConsumerItems.indexOf(eventConsumer);
        if (eventConsumerIndex == -1) {
            return;
        }
        eventConsumerItems.remove(eventConsumerIndex);
    }


    /**
     * 发布事件,同步触发执行回调事件消费者方法(存在阻塞等待),即事件消息生产者
     * 用法:在需要触发事件消息回调时调用,如:publishEvent(eventMessage);
     *
     * @param eventMessage
     */
    public static <T> void publishEvent(T eventMessage) {
        Class<?> eventMessageType = eventMessage.getClass();

        if (!eventConsumers.containsKey(eventMessageType)) {
            return;
        }

        LOGGER.info("事件已发布,正在执行通知消费:{}", JSONObject.toJSONString(eventMessage));

        for (Consumer<Object> eventConsumer : eventConsumers.get(eventMessageType)) {
            try {
                eventConsumer.accept(eventMessage);
            } catch (Exception ex) {
                LOGGER.error("eventConsumer.accept error:{},eventMessageType:{},eventMessage:{}",
                        ex, eventMessageType, JSONObject.toJSONString(eventMessage));
            }
        }
    }


    /**
     * 发布事件,异步触发执行回调事件消费者方法(异步非阻塞),即事件消息生产者
     * 用法:在需要触发事件消息回调时调用,如:publishEventAsync(eventMessage);
     *
     * @param eventMessage
     */
    public static <T> void publishEventAsync(final T eventMessage) {
        Executor asyncTaskExecutor = (Executor) SpringUtils.getBean("asyncTaskExecutor");
        asyncTaskExecutor.execute(() -> {
            publishEvent(eventMessage);
        });
    }


    /**
     * 监听回调处理(需要有返回值),即有返回值的回调消费者
     *
     * @param notifyMessageType
     * @param messageChannel
     * @param callbackFunc
     */
    public static void listenCallback(Class<?> notifyMessageType, MessageChannel messageChannel, Function<Object, Object> callbackFunc) {
        if (!callbackFuncs.containsKey(notifyMessageType)) {
            callbackFuncs.putIfAbsent(notifyMessageType, new ConcurrentHashMap<>());
        }

        Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.get(notifyMessageType);
        if (!functionMap.containsKey(messageChannel)) {
            functionMap.putIfAbsent(messageChannel, callbackFunc);
        } else {
            LOGGER.error("该通知消息类型:{}+消息通道:{},已被订阅监听,重复订阅监听无效!", notifyMessageType.getSimpleName(), messageChannel.getDescription());
        }

    }

    /**
     * 通知回调(同步等待获取监听回调的处理结果),即生产者
     *
     * @param notifyMessage
     * @param messageChannel
     * @param <R>
     * @return
     */
    @SuppressWarnings("unchecked")
    public static <R> R notifyCallback(Object notifyMessage, MessageChannel messageChannel) {
        Class<?> notifyMessageType = notifyMessage.getClass();

        Map<MessageChannel, Function<Object, Object>> functionMap = callbackFuncs.getOrDefault(notifyMessageType, null);
        if (functionMap != null) {
            Function<Object, Object> callbackFunction = functionMap.getOrDefault(messageChannel, null);
            if (callbackFunction != null) {
                LOGGER.info("通知回调消息已发布,正在执行回调处理:{},messageChannel:[{}]", JSONObject.toJSONString(notifyMessage), messageChannel.getDescription());
                Object result = callbackFunction.apply(notifyMessage);
                try {
                    return (R) result;
                } catch (ClassCastException castEx) {
                    throw new ClassCastException(String.format("监听回调处理后返回值实际类型与发布通知回调待接收的值预期类型不一致,导致类型转换失败:%s," +
                                    "请确保notifyCallback与listenCallback针对通知消息类型:%s+消息通道:%s返回值类型必需一致。",
                            castEx.getMessage(), notifyMessageType.getSimpleName(), messageChannel.getDescription()));
                }

            }
        }
        return null;
    }


}

当然如果需要实现1对1的通讯,除了指定消息类型外,还需要指定消息通讯通道(即:唯一标识),目的是可以实现同一种消息类型,支持不同的点对点的处理。

/**
 * 自定义消息通道
 * 作用:用于识别同一个消息类型下不同的监听回调者(notifyMessage+messageChannel 即可标识唯一的一组通知回调[生产者]与监听回调[消费者])
 * @author zuowenjun
 * @date 2020-03-31
 */
public enum MessageChannel {
    None("无效"),
    MSG_A("测试消息A"),
    ;

    private String description;

    MessageChannel(String description) {
        this.description=description;
    }

    public String getDescription() {
        return description;
    }
}

使用方法示例代码如下:


@Service
public class DemoAService {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoAService.class);
    

    public void showMsg(String name, String msg) {
        System.out.printf("【%1$tF %1$tT.%1$tL】hello!%s,DemoAService showMsg:%s %n", new Date(), name, msg);

        EventMessage eventMessage = new EventMessage();
        eventMessage.setName("aaa");
        eventMessage.setMsg("test");
        eventMessage.setCreatedDate(new Date());
        EventPublishSubscribeUtils.publishEvent(eventMessage);

        String msgJsonStr = EventPublishSubscribeUtils.notifyCallback(eventMessage, MessageChannel.MSG_A);

        System.out.printf("【%1$tF %1$tT.%1$tL】DemoAService showMsg notifyCallback json result:%2$s %n", new Date(), msgJsonStr);
    }

}


@Service
public class DemoBService {

    @Autowired
    private DemoAService demoAService;

    @PostConstruct
    public void init(){

        //订阅消费,无返回值,支持1对多,即:同一个消息类型可同时被多个消费者订阅
        EventPublishSubscribeUtils.register(EventMessage.class,this::showFinishedMsg);

        //订阅监听回调,有返回值,只能1对1
        EventPublishSubscribeUtils.listenCallback(EventMessage.class, MessageChannel.MSG_A,this::getMsgCallbak);
    }

    private void showFinishedMsg(Object eventMsg){
        EventMessage eventMessage=(EventMessage)eventMsg;
        System.out.printf("【%1$tF %1$tT.%1$tL】%s,receive msg:%s doing...%n",
                eventMessage.getCreatedDate(),eventMessage.getName(),eventMessage.getMsg());

        //模拟逻辑处理
        try {
            Thread.sleep(500L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("【%1$tF %1$tT.%1$tL】%s,do finished!!!%n",new Date(),eventMessage.getName());
    }

    private String getMsgCallbak(Object eventMsg){
        EventMessage eventMessage=(EventMessage)eventMsg;
        eventMessage.setMsg(eventMessage.getMsg()+"--callback added!");
        eventMessage.setCreatedDate(new Date());

        System.out.printf("【%1$tF %1$tT.%1$tL】%s,do msg callback!!!%n",new Date(),eventMessage.getName());

        return JSONObject.toJSONString(eventMessage);
    }

}


如上代码所示,我们借助于EventPublishSubscribeUtils,解耦了两个Service Bean之间的依赖,避免了循环依赖的问题,去掉了之前为了解决循环依赖而使用@Lazy注解的方式,更易于扩展与更改。其实Spring底层也使用了类似的Event机制,说明这种方式还是有合适的用武之地的。

这里我通过简单的关系图来对比未引用EventPublishSubscribeUtils前与引用后的区别,大家可以感受一下哪种更方便:
之前:
《模块(类)之间解耦利器:EventPublishSubscribeUtils 事件发布订阅工具类》

之后:
《模块(类)之间解耦利器:EventPublishSubscribeUtils 事件发布订阅工具类》

最后,关于业务解耦,分清业务边界,我个人认为跨进程通讯使用MQ,同进程跨多模块(类,或者说跨多业务边界)可使用Event事件驱动思路来解决。大家觉得如何呢?如果有更好的方案欢迎评论交流,谢谢。

相关文章