日志

EventBus/EventQueue 再思考

 来源    2020-05-23    0  

EventBus/EventQueue 再思考

Intro

之前写过两篇文章,造轮子系列的 EventBus/EventQueue,回想起来觉得当前的想法有点问题,当时对 EvenStore 可能有点误解,有兴趣可以参考 https://www.cnblogs.com/weihanli/p/implement-a-simple-event-bus.html/https://www.cnblogs.com/weihanli/p/implement-event-queue.html,

最近把 Event 相关的逻辑做了一个重构,修改 EventStore,重新设计了 Event 相关的组件

重构后的 Event

  • Event: 事件的抽象定义
  • EventHandler:事件处理器抽象定义
  • EventHandlerFactory:事件处理器工厂,用来根据事件类型获取事件处理器(新增)
  • EventPublisher:事件发布器,用于事件发布
  • EventSubscriber:事件订阅器,用于管理事件的订阅
  • EventSubscriptionManager:事件订阅管理器,在 EventSubscriber 的基础上增加了一个根据事件类型获取事件订阅器类型的方法
  • EventBus:事件总线,由 EventPubliser 和 EventSubscriber 组合而成,用来比较方便的做事件发布和订阅
  • EventQueue:事件队列,希望某些消息顺序处理的时候可以考虑用 EventQueue 的模式
  • EventStore:事件存储,事件的持久化存储(在之前的版本里,EventStore 实际作用是一个 EventSubscriptionManager,在最近的版本更新中已修改)

以上 EventSubscriberEventSubscriptionManager 一般不直接用,一般用 EventBus 来处理即可

EventHandlerFactory

这次引入了 EventHandlerFactory 用来抽象获取 EventHandler 的逻辑,原来的设计里是在处理 Event 的时候获取 EventHandler 的类型,然后从依赖注入框架中获取或创建新的 event handler 实例之后再调用 EventHandler 的 Handle 方法处理事件,有一些冗余

使用 EventHandlerFactory 之后就可以直接获取一个 EventHandler 实例集合,具体是实例化还是从依赖注入中获取就由 EventHandlerFactory 来决定了,这样就可以对依赖注入很友好,对于基于内存的简单 EventBus 来说,在服务注册之后就不需要再调用 Subscribe 去显式订阅了,因为再注册服务的时候就已经隐式实现了订阅的逻辑,这样实际就不需要 EventSubscriptionManager 来管理订阅了,订阅信息都在依赖注入框架内部,比如说 CounterEvent,要获取它的订阅信息,我只需要从依赖注入框架中获取 IEventHandler<CounterEvent> 的实例即可,实际就代替了原先 “EventStoreInMemory”,现在的 EventSubscriptionManagerInMemory

基于依赖注入的 EventHandlerFactory 定义:

public sealed class DependencyInjectionEventHandlerFactory : IEventHandlerFactory
{
    private readonly IServiceProvider _serviceProvider;

    public DependencyInjectionEventHandlerFactory(IServiceProvider serviceProvider = null)
    {
        _serviceProvider = serviceProvider ?? DependencyResolver.Current;
    }

    public ICollection<IEventHandler> GetHandlers(Type eventType)
    {
        var eventHandlerType = typeof(IEventHandler<>).MakeGenericType(eventType);
        return _serviceProvider.GetServices(eventHandlerType).Cast<IEventHandler>().ToArray();
    }
}

如果不使用依赖注入,也可以根据 IEventSubscriptionManager 订阅信息来实现:

public sealed class DefaultEventHandlerFactory : IEventHandlerFactory
{
    private readonly IEventSubscriptionManager _subscriptionManager;
    private readonly ConcurrentDictionary<Type, ICollection<IEventHandler>> _eventHandlers = new ConcurrentDictionary<Type, ICollection<IEventHandler>>();
    private readonly IServiceProvider _serviceProvider;

    public DefaultEventHandlerFactory(IEventSubscriptionManager subscriptionManager, IServiceProvider serviceProvider = null)
    {
        _subscriptionManager = subscriptionManager;
        _serviceProvider = serviceProvider ?? DependencyResolver.Current;
    }

    public ICollection<IEventHandler> GetHandlers(Type eventType)
    {
        var eventHandlers = _eventHandlers.GetOrAdd(eventType, type =>
        {
            var handlerTypes = _subscriptionManager.GetEventHandlerTypes(type);
            var handlers = handlerTypes
                .Select(t => (IEventHandler)_serviceProvider.GetServiceOrCreateInstance(t))
                .ToArray();
            return handlers;
        });
        return eventHandlers;
    }
}

EventQueue Demo

来看一下 EventQueue 的示例,示例基于 asp.net core 的,定义了一个 HostedService 来实现一个 EventConsumer 来消费 EventQueue 中的事件信息

EventConsumer 定义如下:

public class EventConsumer : BackgroundService
{
    private readonly IEventQueue _eventQueue;
    private readonly IEventHandlerFactory _eventHandlerFactory;

    public EventConsumer(IEventQueue eventQueue, IEventHandlerFactory eventHandlerFactory)
    {
        _eventQueue = eventQueue;
        _eventHandlerFactory = eventHandlerFactory;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var queues = await _eventQueue.GetQueuesAsync();
            if (queues.Count > 0)
            {
                await queues.Select(async q =>
                        {
                            var @event = await _eventQueue.DequeueAsync(q);
                            if (null != @event)
                            {
                                var handlers = _eventHandlerFactory.GetHandlers(@event.GetType());
                                if (handlers.Count > 0)
                                {
                                    await handlers
                                            .Select(h => h.Handle(@event))
                                            .WhenAll()
                                        ;
                                }
                            }
                        })
                        .WhenAll()
                    ;
            }

            await Task.Delay(1000, stoppingToken);
        }
    }
}

定义 PageViewEventPageViewEventHandler,用来记录和处理请求访问记录

public class PageViewEvent : EventBase
{
}

public class PageViewEventHandler : EventHandlerBase<PageViewEvent>
{
    public static int Count;

    public override Task Handle(PageViewEvent @event)
    {
        Interlocked.Increment(ref Count);
        return Task.CompletedTask;
    }
}

事件很简单,事件处理也只是增加了 PageViewEventHandler 内定义的 Count。

服务注册:

// 注册事件核心组件
// 会注册 EventBus、EventHandlerFactory、EventQueue 等
services.AddEvents()
    // 注册 EventHanlder 
    .AddEventHandler<PageViewEvent, PageViewEventHandler>()
    ;
// 注册 EventQueuePubliser,默认注册的 IEventPublisher 是 EventBus
services.AddSingleton<IEventPublisher, EventQueuePublisher>();
// 注册 EventConsumer
services.AddHostedService<EventConsumer>();

事件发布,定义了一个中间件来发布 PageViewEvent,定义如下:

// pageView middleware
app.Use((context, next) =>
        {
            var eventPublisher = context.RequestServices.GetRequiredService<IEventPublisher>();
            eventPublisher.Publish(new PageViewEvent());

            return next();
        });

然后定义一个接口来获取上面定义的 PageViewEventHandler 中的 Count

[Route("api/[controller]")]
public class EventsController : ControllerBase
{
    [HttpGet("pageViewCount")]
    public IActionResult Count()
    {
        return Ok(new { Count = PageViewEventHandler.Count });
    }
}

运行起来之后,访问几次接口,看上面的接口返回 Count 是否会增加,正常的话每访问一次接口就会增加 1,并发访问问题也不大,因为每个事件都是顺序处理的,即使并发访问也没有关系,事件发布之后,在队列里都是顺序处理的,这也就是引入事件队列的目的(好像上面的原子递增没什么用了...) 如果没看到了增加,稍等一会儿再访问试试,事件处理会迟到,但总会处理,毕竟是异步处理的,有些延迟很正常,而且上面我们还有一个 1s 的延迟

More

作者水平有限,如果上述有哪些不对的地方还望指出,万分感谢

Reference

相关文章
typedef的用法再思考
日志最近重读c语法,有所感悟,记录. 有时候感悟,其实就是猜,假想,作者创建语言的想法,通俗的讲就是丹尼斯灵魂附体了,这个时候任何c语言难点对于你来说,就像吃饭喝水一样简单了,同时还能发现它优美动人之处. ...
1
vijos p1523 贪吃的九头龙 思考思考再思考,就荒废了4小时
日志树形DP要有自己的风格,转二叉树是基础,考虑边界最头疼. #include<cstdio> #include<cstring> #include<algorithm> ...
二分查找之再思考
日志一.一般的二分查找 一般的二分查找即,输出要查找元素在数组中的位置,这里的位置没有特殊限定,没有要求是数字第一次出现的位置,也没有要求是最后一次出现的位置. int getPos(vector< ...
动态规划再思考
日志动态规划一般用来求解最优化问题.需要注意的是它与贪心算法的区别,贪心算法也是用来解决最优化问题的.动态规划和贪心算法都需要满足最优子结构特征,不同的是动态规划还需要满足重叠子问题特征,而贪心算法还需要 ...
1
CSS 思考和再学习——关于CSS中浮动和定位对元素宽度/外边距/其他元素所占空间的影响
日志一.width:auto和width:100%的区别 1.width:100%的作用是占满它的参考元素的宽度.(一般情况下参考元素 == 父级元素,这里写成参考元素而不是父级元素,在下面我会再细说) ...
从$emit 到 父子组件通信 再到 eventBus
日志故事还是得从$emit说起,某一天翻文档的时候看到$emit的说明 触发当前实例上的事件?就是自身组件上的事件呗,在父子组件通信中,父组件通过props传递给子组件数据(高阶组件可以用provide和 ...
算法思考(1)别再用递归计算斐波那契数列了!
日志曾经学习到递归时,相信绝大部分人都使用过斐波那契数列来学习递归吧. 当初我学习递归是老师还刻意让我们思考如何优化其性能,于是我们加了一些变量.参数 用于传递数据减少内存消耗,或者讲递归分割,分割成多个 ...
1
沉淀,再出发:Git的再次思考
日志沉淀,再出发:Git的再次思考 一.前言    使用git也有很久了,后来有一段时间一直没有机会去使用,现在想来总结一下自己学习了这么长时间的一些心得感悟,我写的博客一般都是开了一个轮廓和框架,等到以 ...
3
java – Gwt 2.4中的EventBus和Gin问题
问答我正试图在MVP GWT 2.4中使用Gin.在我的模块中,我有: import com.google.web.bindery.event.shared.EventBus; import com.go ...
2
java – 为什么我不能再从Spark应用程序中读取AWS S3了?
问答我已升级到Apache Spark 1.5.1,但我不确定是否会导致它.我在spark-submit中有我的访问键,它始终有效. Exception in thread "main" ...
1
bash:如何重定向stdin/stderr然后再恢复fd?
问答我想要一个脚本将stdin和stderr重定向到一个文件,做一堆东西,然后撤消那些重定向并对文件内容采取行动.我尝试着: function redirect(){ exec 3>&1 e ...
2
递归表达式的R警告消息:如果失败,请尝试再试一次
问答我想创建一个函数,如果失败,将重试表达式.这是我的工作版本: retry <- function(.FUN, max.attempts=3, sleep.seconds=1) { x <- ...
1
java – 为什么在将字节数组转换为String然后再转换为字节数组时长度会有所不同?
问答我有以下Java代码: byte[] signatureBytes = getSignature(); String signatureString = new String(signatureByt ...
1
iphone – 无法使用蓝牙 – 稍后再试
问答昨天,我为我的iPhone应用程序编写了一个简单的蓝牙网络.今天早上,当我试图继续开发时,它根本没有在我的iPod Touch 2G上工作.然而,它在iPAD上运行得很好.当我尝试在创建服务器会话时在 ...
android – 如何隐藏动作栏之前活动创建,然后再显示它?
问答我需要在我的蜂窝应用程序实现闪屏. 我在活动的onCreate中使用这个代码来显示splash: setContentView(R.layout.splash); getActionBar().hid ...
1
Java EventQueue.我什么时候应该考虑使用它?
问答我目前正在查看Oracle网站上的EventQueue类: http://download.oracle.com/javase/1.4.2/docs/api/java/awt/EventQueue.h ...
1
如何将符号链接文件从Linux复制到Windows,然后再复制到Linux,但仍将其保留为符号链接
问答我的Linux机器上有一个符号链接. 我想只将符号链接(不是目标)复制到Windows机器,然后将此符号链接从Windows机器复制回其他Linux机器,符号链接应继续工作. 我尝试了什么: > ...
2
javascript – 如何使循环等待,直到异步调用成功再继续
问答您好我正在使用for循环和异步ajax调用为我的本地商店创建批量更新. 我的问题是,即使我的ajax调用仍未成功完成,我的循环仍在继续. 在继续循环之前,我们如何设法让循环等待单元获得ajax响应的响 ...
1
javascript – 等待多个异步调用完成,然后再继续
问答所以,我有一个页面加载和通过jquery.get使几个请求填充下拉与他们的值. $(function() { LoadCategories($('#Category')); LoadPositions ...
1
JavaScript等待,直到图像完全加载,然后再继续执行脚本
问答我一直在寻找很多JavaScript的答案,但我还没有找到一个真正回答我的问题的答案.我想要做的是加载图像,获取像素数据,执行分析,然后加载另一个图像以重复该过程. 我的问题是,我无法预加载所有的图像 ...
1