ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

Stella981
• 阅读 638

上文中,我介绍了事件驱动型架构的一种简单的实现,并演示了一个完整的事件派发、订阅和处理的流程。这种实现太简单了,百十行代码就展示了一个基本工作原理。然而,要将这样的解决方案运用到实际生产环境,还有很长的路要走。今天,我们就研究一下在事件处理器中,对象生命周期的管理问题。

事实上,不仅仅是在事件处理器中,我们需要关心对象的生命周期,在整个ASP.NET Core Web API的应用程序里,我们需要理解并仔细推敲被注册到IoC容器中的服务,它们的生命周期应该是个怎样的情形,这也是服务端应用程序设计必须认真考虑的内容。因为如果生命周期管理不合理,程序申请的资源无法合理释放,最后便会带来内存泄漏、程序崩溃等各种问题,然而这样的问题对于服务端应用程序来说,是非常严重的。

记得在上一篇文章的结束部分,我给大家留下一个练习,就是让大家在CustomerCreatedEventHandler事件处理器的HandleAsync方法中,填入自己的代码,以便对获得的事件消息做进一步的处理。作为本文的引子,我们首先将这部分工作做完,然后再进一步分析生命周期的问题。

Event Store

Event Store是CQRS体系结构模式中最为重要的一个组成部分,它的主要职责就是保存发生于领域模型中的领域事件,并对事件数据进行归档。当仓储需要获取领域模型对象时,Event Store也会配合快照数据库一起,根据领域事件的发生顺序,逐步回放并重塑领域模型对象。事实上,Event Store的实现是非常复杂的,虽然从它的职责上来看并不算太复杂,然而它所需要解决的事件同步、快照、性能、消息派发等问题,使得CQRS体系结构的实现变得非常复杂。在实际应用中,已经有一些比较成熟的框架和工具集,能够帮助我们在CQRS中很方便地实现Event Store,比如GetEventStore就是一个很好的开源Event Store框架,它是基于.NET开发的,在微软官方的eShopOnContainers说明文档中,也提到了这个框架,推荐大家上他们的官网(https://eventstore.org/)了解一下。在这里我们就先不深入研究Event Store应该如何实现,我们先做一个简单的Event Store,以便展示我们需要讨论的问题。

延续着上一版的代码库(https://github.com/daxnet/edasample/tree/chapter_1),我们首先在EdaSample.Common.Events命名空间下,定义一个IEventStore的接口,这个接口非常简单,仅仅包含一个保存事件的方法,代码如下:

public interface IEventStore : IDisposable
{
    Task SaveEventAsync<TEvent>(TEvent @event)
        where TEvent : IEvent;
}

SaveEventAsync方法仅有一个参数:由泛型类型TEvent绑定的@event对象。泛型约束表示SaveEventAsync方法仅能接受IEvent接口及其实现类型的对象作为参数传入。接口定义好了,下一步就是实现这个接口,对传入的事件对象进行保存。为了实现过程的简单,我们使用Dapper,将事件数据保存到SQL Server数据库中,来模拟Event Store对事件的保存操作。

Note:为什么IEventStore接口的SaveEventAsync方法签名中,没有CancellationToken参数?严格来说,支持async/await异步编程模型的方法定义上,是需要带上CancellationToken参数的,以便调用方请求取消操作的时候,方法内部可以根据情况对操作进行取消。然而有些情况下取消操作并不是那么合理,或者方法内部所使用的API并没有提供更深层的取消支持,因此也就没有必要在方法定义上增加CancellationToken参数。在此处,为了保证接口的简单,没有引入CancellationToken的参数。

接下来,我们实现这个接口,并用Dapper将事件数据保存到SQL Server中。出于框架设计的考虑,我们新建一个Net Standard Class Library项目,在这个新的项目中实现IEventStore接口,这么做的原因已经在上文中介绍过了。代码如下:

public class DapperEventStore : IEventStore
{
    private readonly string connectionString;

    public DapperEventStore(string connectionString)
    {
        this.connectionString = connectionString;
    }

    public async Task SaveEventAsync<TEvent>(TEvent @event) where TEvent : IEvent
    {
        const string sql = @"INSERT INTO [dbo].[Events] 
([EventId], [EventPayload], [EventTimestamp]) 
VALUES 
(@eventId, @eventPayload, @eventTimestamp)";
        using (var connection = new SqlConnection(this.connectionString))
        {
            await connection.ExecuteAsync(sql, new
            {
                eventId = @event.Id,
                eventPayload = JsonConvert.SerializeObject(@event),
                eventTimestamp = @event.Timestamp
            });
        }
    }

    #region IDisposable Support
    // 此处省略
    #endregion
}

IDisposable接口的实现部分暂且省略,可以看到,实现还是非常简单的:通过构造函数传入数据库的连接字符串,在SaveEventAsyc方法中,基于SqlConnection对象执行Dapper的扩展方法来完成事件数据的保存。

Note: 此处使用了JsonConvert.SerializeObject方法来序列化事件对象,也就意味着DapperEventStore程序集需要依赖Newtonsoft.Json程序集。虽然在我们此处的案例中不会有什么影响,但这样做会造成DapperEventStore对Newtonsoft.Json的强依赖,这样的依赖关系不仅让DapperEventStore变得不可测试,而且Newtonsoft.Json将来未知的变化,也会影响到DapperEventStore,带来一些不确定性和维护性问题。更好的做法是,引入一个IMessageSerializer接口,在另一个新的程序集中使用Newtonsoft.Json来实现这个接口,同时仅让DapperEventStore依赖IMessageSerializer,并在应用程序启动时,将Newtonsoft.Json的实现注册到IoC容器中。此时,IMessageSerializer可以被Mock,DapperEventStore就变得可测试了;另一方面,由于只有那个新的程序集会依赖Newtonsoft.Json,因此,Newtonsoft.Json的变化也仅仅会影响那个新的程序集,不会对框架主体的其它部分造成任何影响。

EventStore实现好了,接下来,我们将其用在CustomerCreatedEventHandler中,以便将订阅的CustomerCreatedEvent保存下来。

事件数据的保存

保存事件数据的第一步,就是在ASP.NET Core Web API的IoC容器中,将DapperEventStore注册进去。这一步是非常简单的,只需要在Startup.cs的ConfigureServices方法中完成即可。代码如下:

public void ConfigureServices(IServiceCollection services)
{
    services.AddMvc();

    services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
    services.AddTransient<IEventStore>(serviceProvider => new DapperEventStore(Configuration["mssql:connectionString"]));
    services.AddSingleton<IEventBus, PassThroughEventBus>();
}

注意我们使用的是services.AddTransient方法来注册DapperEventStore,我们希望应用程序在每次请求IEventStore实例时,都能获得一个新的DapperEventStore的实例。

接下来,打开CustomerCreatedEventHandler.cs文件,在构造函数中加入对IEventStore的依赖,然后修改HandleAsync方法,在该方法中使用IEventStore的实例来完成事件数据的保存。代码如下:

public class CustomerCreatedEventHandler : IEventHandler<CustomerCreatedEvent>
{
    private readonly IEventStore eventStore;

    public CustomerCreatedEventHandler(IEventStore eventStore)
    {
        this.eventStore = eventStore;
    }

    public bool CanHandle(IEvent @event)
        => @event.GetType().Equals(typeof(CustomerCreatedEvent));

    public async Task<bool> HandleAsync(CustomerCreatedEvent @event, CancellationToken cancellationToken = default)
    {
        await this.eventStore.SaveEventAsync(@event);
        return true;
    }

    public Task<bool> HandleAsync(IEvent @event, CancellationToken cancellationToken = default)
        => CanHandle(@event) ? HandleAsync((CustomerCreatedEvent)@event, cancellationToken) : Task.FromResult(false);
}

OK,代码修改完毕,测试一下。

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

看看数据库中客户信息是否已经创建:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

看看数据库中事件数据是否已经保存成功:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

OK,数据全部保存成功。

然而,事情真的就这么简单么?No。在追踪了IEventStore实例(也就是DapperEventStore)的生命周期后,你会发现,问题没有想象的那么简单。

追踪对象的生命周期

在使用services.AddTransient/AddScoped/AddSingleton/AddScoped这些方法对服务进行注册时,使用不同的方法也就意味着选择了不同的对象生命周期。在此我们也不再深入讨论每种方法之间的差异,微软官方有详细的文档和demo(抱歉我没有贴出中文链接,因为机器翻译的缘故,实在有点不堪入目),如果对ASP.NET Core的IoC容器不熟悉的话,建议先了解一下官网文章的内容。在上面我稍微提了一下,我们是用AddTransient方法来注册DapperEventStore的,因为我们希望在每次使用IEventStore的时候,都会有一个新的DapperEventStore被创建。现在,让我们来验证一下,看情况是否果真如此。

日志的使用

追踪程序执行的最有效的方式就是使用日志。在我们的场景中,使用基于文件的日志会更合适,因为这样我们可以更清楚地看到程序的执行过程以及对象的变化过程。同样,我不打算详细介绍如何在ASP.NET Core Web API中使用日志,微软官网同样有着非常详尽的文档来介绍这些内容。在这里,我简要地将相关代码列出来,以介绍如何启用基于文件的日志系统。

首先,在Web API服务的项目上,添加对Serilog.Extensions.Logging.File的nuget包,使用它能够非常方便地启用基于文件的日志。然后,打开Program.cs文件,添加ConfigureLogging的调用:

public static IWebHost BuildWebHost(string[] args) =>
    WebHost.CreateDefaultBuilder(args)
        .ConfigureLogging((context, lb) =>
        {
            lb.AddFile(LogFileName);
        })
        .UseStartup<Startup>()
        .Build();

此处LogFileName为本地文件系统中的日志文件文件名,为了避免权限问题,我将日志写入C:\Users\\appdata\local目录下,因为我的Web API进程是由当前登录用户启动的,所以写在这个目录下不会有权限问题。如果今后我们把Web API host在IIS中,那么启动IIS服务的用户需要对日志所在的目录具有写入的权限,日志文件才能被正确写入,这一点是需要注意的。

好了,现在可以使用日志了,先试试看。在Startup类的构造函数中,加入ILoggerFactory参数,并在构造函数执行时获取ILogger实例,然后在ConfigureServices调用中输出一些内容:

public class Startup
{
    private readonly ILogger logger;

    public Startup(IConfiguration configuration, ILoggerFactory loggerFactory)
    {
        Configuration = configuration;
        this.logger = loggerFactory.CreateLogger<Startup>();
    }

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        this.logger.LogInformation("正在对服务进行配置...");

        services.AddMvc();

        services.AddTransient<IEventHandler, CustomerCreatedEventHandler>();
        services.AddTransient<IEventStore>(serviceProvider => 
            new DapperEventStore(Configuration["mssql:connectionString"]));
        services.AddSingleton<IEventBus, PassThroughEventBus>();
        this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
    }

    // 其它方法暂时省略
}

现在重新启动服务,然后查看日志文件,发现日志可以被正确输出:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

接下来,使用类似的方式,向PassThroughEventBus的构造函数和Dispose方法中加入一些日志输出,在CustomersController的Create方法中、CustomerCreatedEventHandler的构造函数和HandleAsync方法中、DapperEventStore的构造函数和Dispose方法中也加入一些日志输出,以便能够观察当新的客户信息被创建时,Web API的执行过程。限于文章篇幅,就不在此一一贴出各方法中加入日志输出的代码了,大家可以根据本文最后所提供的源代码链接来获取源代码。简单地举个例子吧,比如对于DapperEventStore,我们通过构造函数注入ILogger的实例:

public class DapperEventStore : IEventStore
{
    private readonly string connectionString;
    private readonly ILogger logger;

    public DapperEventStore(string connectionString,
        ILogger<DapperEventStore> logger)
    {
        this.connectionString = connectionString;
        this.logger = logger;
        logger.LogInformation($"DapperEventStore构造函数调用完成。Hash Code:{this.GetHashCode()}.");
    }
    // 其它函数省略
}

这样一来,在DapperEventStore的其它方法中,就可以通过logger来输出日志了。

发现问题

同样,再次运行Web API,并通过Powershell发起一次创建客户信息的请求,然后打开日志文件,整个程序的执行过程基本上就一目了然了:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

从上面的日志内容可以得知,当应用程序正常退出时,由IoC容器托管的PassThroughEventBus和DapperEventStore都能够被正常Dispose,目前看来没什么问题,因为资源可以正常释放。现在让我们重新启动Web API,连续发送两次创建客户信息的请求,再次查看日志,我们得到了下面的内容:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

从上面的日志内容可以看到,在Web API的整个运行期间,CustomerCreatedEventHandler仅被构造了一次,而且在每次处理CustomerCreatedEvent事件的时候,都是使用同一个DapperEventStore实例来保存事件数据。也就是说,CustomerCreatedEventHandler和DapperEventStore在整个Web API服务的生命周期中,有且仅有一个实例,它们是Singleton的!然而,在进行系统架构的时候,我们应该尽量保证较短的对象生命周期,以免因为状态的不一致性导致不可回滚的错误出现,这也是架构设计中的一种最佳实践。虽然目前我们的DapperEventStore在程序正常退出的时候能够被Dispose掉,但如果DapperEventStore使用了非托管资源,并且非托管资源并没有很好地管理自己的内存呢?久而久之,DapperEventStore就产生了内存泄漏点,慢慢地,Web API就会出现内存泄漏,系统资源将被耗尽。假如Web API被部署在云中,应用程序监控装置(比如AWS的Cloud Watch)就会持续报警,并强制服务断线,整个系统的可用性就无法得到保障。所以,我们更期望DapperEventStore能够正确地实现C#的Dispose模式,在Dispose方法中合理地释放资源,并且仅在需要使用DapperEventStore时候才去构建它,用完就及时Dispose,以保证资源的合理使用。这也就是为什么我们使用services.AddTransient方法来注册CustomerCreatedEventHandler以及DapperEventStore的原因。

然而,事实却并非如此。究其原因,就是因为PassThroughEventBus是单例实例,它的生命周期是整个Web API服务。而在PassThroughEventBus的构造函数中,CustomerCreatedEventHandler被作为参数传入,于是,PassThroughEventBus产生了对CustomerCreatedEventHandler的依赖,而连带地也产生了对DapperEventStore的依赖。换句话说,在整个应用程序运行的过程中,IoC框架完全没有理由再去创建新的CustomerCreatedEventHandler以及DapperEventStore的实例,因为事件处理器作为强引用被注册到PassThroughEventBus中,而PassThroughEventBus至始至终没有变过!

Note:为什么PassThroughEventBus可以作为单例注册到IoC容器中?因为它提供了无状态的全局性的基础结构层服务:事件总线。在PassThroughEventBus的实现中,这种全局性体现得不明显,我们当然可以每一次HTTP请求都创建一个新的PassThroughEventBus来转发事件消息并作处理。然而,在今后我们要实现的基于RabbitMQ的事件总线中,如果我们还是每次HTTP请求都创建一个新的消息队列,不仅性能得不到保证,而且消息并不能路由到新创建的channel上。注意:我们将其注册成单例,一个很重要的依据是由于它是无状态的,但即使如此,我们也要注意在应用程序退出的时候,合理Dispose掉它所占用的资源。当然,在这里,ASP.NET Core的IoC机制会帮我们解决这个问题(因为我注册了PassThroughEventBus,但我没有显式调用Dispose方法,我仍然能从日志中看到“PassThroughEventBus已经被Dispose”的字样),然而有些情况下,ASP.NET Core不会帮我们做这些,就需要我们自己手工完成。

OMG!由于构造函数注入,使得对象之间产生了依赖关系,从而影响到了它们的生命周期,这可怎么办?既然问题是由依赖引起的,那么就需要想办法解耦。

解耦!解决事件处理器对象生命周期问题

经过分析,我们需要解除PassThroughEventBus对各种EventHandler的直接依赖。因为PassThroughEventBus是单例的,那么由它引用的所有组件也只可能具有相同的生命周期。然而,这样的解耦又该如何做呢?将EventHandler封装到另一个类中?结果还是一样,PassThroughEventBus总会通过某种对象关系,来间接引用到EventHandler上,造成EventHandler全局唯一。

或许,应该要有另一套生命周期管理体系来管理EventHandler的生命周期,使得每当PassThroughEventBus需要使用EventHandler对所订阅的事件进行处理的时候,都会通过这套体系来请求新的EventHandler实例,这样一来,PassThroughEventBus也就不再依赖于某个特定的实例了,而仅仅是引用了各种EventHandler在新的生命周期管理体系中的注册信息。每当需要的时候,PassThroughEventBus都会将事件处理器的注册信息传给新的管理体系,然后由这套新的体系来维护事件处理器的生命周期。

通过阅读微软官方的eShopOnContainers案例代码后,证实了这一想法。在案例中,有如下代码:

// namespace Microsoft.eShopOnContainers.BuildingBlocks.EventBusRabbitMQ
private async Task ProcessEvent(string eventName, string message)
{
    if (_subsManager.HasSubscriptionsForEvent(eventName))
    {
        using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
        {
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                { 
                    var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                    dynamic eventData = JObject.Parse(message);
                    await handler.Handle(eventData);
                }
                else
                {
                    var eventType = _subsManager.GetEventTypeByName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    var handler = scope.ResolveOptional(subscription.HandlerType);
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                    await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}

可以看到,高亮的这一行,通过Autofac创建了一个新的LifetimeScope,在这个Scope中,通过eventName来获得一个subscription对象(也就是EventHandler的注册信息),进而通过scope的ResolveOptional调用来获得新的EventHandler实例。基本过程就是这样,目前也不需要纠结IDynamicIntegrationEventHandler是干什么用的,也不需要纠结为什么要使用dynamic来保存事件数据。重点是,autofac的BeginLifetimeScope方法调用创建了一个新的IoC Scope,在这个Scope中解析(resolve)了新的EventHandler实例。在eShopOnContainer案例中,EventBusRabbitMQ的设计是特定的,必须依赖于Autofac作为依赖注入框架。或许这部分设计可以进一步改善,使得EventBusRabbitMQ不会强依赖于Autofac。

接下来,我们会引入一个新的概念:事件处理器执行上下文,使用类似的方式来解决对象生命周期问题。

事件处理器执行上下文

事件处理器执行上下文(Event Handler Execution Context, EHEC)为事件处理器提供了一个完整的生命周期管理机制,在这套机制中,事件处理器及其引用的对象资源可以被正常创建和正常销毁。现在让我们一起看看,如何在EdaSample的案例代码中使用事件处理器执行上下文。

事件处理器执行上下文的接口定义如下,当然,这部分接口是放在EdaSample.Common.Events目录下,作为消息系统的框架代码提供给调用方:

public interface IEventHandlerExecutionContext
{
    void RegisterHandler<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>;

    void RegisterHandler(Type eventType, Type handlerType);

    bool HandlerRegistered<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>;

    bool HandlerRegistered(Type eventType, Type handlerType);

    Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default);
}

这个接口主要包含三种方法:注册事件处理器、判断事件处理器是否已经注册,以及对接收到的事件消息进行处理。整个结构还是非常清晰简单的。现在需要实现这个接口。根据上面的分析,这个接口的实现是需要依赖于IoC容器的,目前简单起见,我们仅使用微软ASP.NET Core标准的Dependency Injection框架来实现,当然,也可以使用Autofac,取决于你怎样去实现上面这个接口。需要注意的是,由于该接口的实现是需要依赖于第三方组件的(在这里是微软的Dependency Injection框架),因此,最佳做法是新建一个类库,并引用EdaSample.Common程序集,并在这个新的类库中,依赖Dependency Injection框架来实现这个接口。

以下是基于Microsoft.Extensions.DependencyInjection框架来实现的事件处理器执行上下文完整代码,这里有个兼容性问题,就是构造函数的第二个参数:serviceProviderFactory。在Microsoft.Extensions.DependencyInjection框架2.0版本之前,IServiceCollection.BuildServiceProvider方法的返回类型是IServiceProvider,但从2.0开始,它的返回类型已经从IServiceProvider接口,变成了ServiceProvider类。这里引出了框架设计的另一个原则,就是依赖较低版本的.NET Core,以便获得更好的兼容性。如果我们的EdaSample是使用.NET Core 1.1开发的,那么当下面这个类被直接用在ASP.NET Core 2.0的项目中时,如果不通过构造函数参数传入ServiceProvider创建委托,而是直接在代码中使用registry.BuildServiceProvider调用,就会出现异常。

public class EventHandlerExecutionContext : IEventHandlerExecutionContext
{
    private readonly IServiceCollection registry;
    private readonly Func<IServiceCollection, IServiceProvider> serviceProviderFactory;
    private readonly ConcurrentDictionary<Type, List<Type>> registrations = new ConcurrentDictionary<Type, List<Type>>();

    public EventHandlerExecutionContext(IServiceCollection registry, 
        Func<IServiceCollection, IServiceProvider> serviceProviderFactory = null)
    {
        this.registry = registry;
        this.serviceProviderFactory = serviceProviderFactory ?? (sc => registry.BuildServiceProvider());
    }

    public async Task HandleEventAsync(IEvent @event, CancellationToken cancellationToken = default(CancellationToken))
    {
        var eventType = @event.GetType();
        if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypes) &&
            handlerTypes?.Count > 0)
        {
            var serviceProvider = this.serviceProviderFactory(this.registry);
            using (var childScope = serviceProvider.CreateScope())
            {
                foreach(var handlerType in handlerTypes)
                {
                    var handler = (IEventHandler)childScope.ServiceProvider.GetService(handlerType);
                    if (handler.CanHandle(@event))
                    {
                        await handler.HandleAsync(@event, cancellationToken);
                    }
                }
            }
        }
    }

    public bool HandlerRegistered<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>
        => this.HandlerRegistered(typeof(TEvent), typeof(THandler));

    public bool HandlerRegistered(Type eventType, Type handlerType)
    {
        if (this.registrations.TryGetValue(eventType, out List<Type> handlerTypeList))
        {
            return handlerTypeList != null && handlerTypeList.Contains(handlerType);
        }

        return false;
    }

    public void RegisterHandler<TEvent, THandler>()
        where TEvent : IEvent
        where THandler : IEventHandler<TEvent>
        => this.RegisterHandler(typeof(TEvent), typeof(THandler));

    public void RegisterHandler(Type eventType, Type handlerType)
    {
        Utils.ConcurrentDictionarySafeRegister(eventType, handlerType, this.registrations);
        this.registry.AddTransient(handlerType);
    }
}

好了,事件处理器执行上下文就定义好了,接下来就是在我们的ASP.NET Core Web API中使用。为了使用IEventHandlerExecutionContext,我们需要修改事件订阅器的接口定义,并相应地修改PassThroughEventBus以及Startup.cs。代码如下:

// IEventSubscriber
public interface IEventSubscriber : IDisposable
{
    void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>;
}

// PassThroughEventBus
public sealed class PassThroughEventBus : IEventBus
{
    private readonly EventQueue eventQueue = new EventQueue();
    private readonly ILogger logger;
    private readonly IEventHandlerExecutionContext context;

    public PassThroughEventBus(IEventHandlerExecutionContext context,
        ILogger<PassThroughEventBus> logger)
    {
        this.context = context;
        this.logger = logger;
        logger.LogInformation($"PassThroughEventBus构造函数调用完成。Hash Code:{this.GetHashCode()}.");

        eventQueue.EventPushed += EventQueue_EventPushed;
    }

    private async void EventQueue_EventPushed(object sender, EventProcessedEventArgs e)
        => await this.context.HandleEventAsync(e.Event);

    public Task PublishAsync<TEvent>(TEvent @event, CancellationToken cancellationToken = default)
        where TEvent : IEvent
            => Task.Factory.StartNew(() => eventQueue.Push(@event));

    public void Subscribe<TEvent, TEventHandler>()
        where TEvent : IEvent
        where TEventHandler : IEventHandler<TEvent>
    {
        if (!this.context.HandlerRegistered<TEvent, TEventHandler>())
        {
            this.context.RegisterHandler<TEvent, TEventHandler>();
        }
    }

    #region IDisposable Support
    private bool disposedValue = false; // To detect redundant calls
    void Dispose(bool disposing)
    {
        if (!disposedValue)
        {
            if (disposing)
            {
                this.eventQueue.EventPushed -= EventQueue_EventPushed;
                logger.LogInformation($"PassThroughEventBus已经被Dispose。Hash Code:{this.GetHashCode()}.");
            }

            disposedValue = true;
        }
    }
    public void Dispose() => Dispose(true);

    #endregion
}

// Startup.cs
public void ConfigureServices(IServiceCollection services)
{
    this.logger.LogInformation("正在对服务进行配置...");

    services.AddMvc();

    services.AddTransient<IEventStore>(serviceProvider => 
        new DapperEventStore(Configuration["mssql:connectionString"], 
            serviceProvider.GetRequiredService<ILogger<DapperEventStore>>()));

    var eventHandlerExecutionContext = new EventHandlerExecutionContext(services, 
        sc => sc.BuildServiceProvider());
    services.AddSingleton<IEventHandlerExecutionContext>(eventHandlerExecutionContext);
    services.AddSingleton<IEventBus, PassThroughEventBus>();

    this.logger.LogInformation("服务配置完成,已注册到IoC容器!");
}

// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
    eventBus.Subscribe<CustomerCreatedEvent, CustomerCreatedEventHandler>();

    if (env.IsDevelopment())
    {
        app.UseDeveloperExceptionPage();
    }

    app.UseMvc();
}

代码修改完成后,再次执行Web API,并发送两次(或多次)创建客户的请求,然后查看日志,我们发现,每次请求都会使用新的事件处理器去处理接收到的消息,在保存消息数据时,会使用新的DapperEventStore来保存数据,而保存完成后,会及时将DapperEventStore dispose掉:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

小结

本文篇幅比较长,或许你没有太多耐心将文章读完。但我尽量将问题分析清楚,希望提供给读者的内容是详细的、有理有据的。文章中黑体部分是在设计过程中的一些思考和需要注意的地方,希望能够给读者在工作和学习之中带来启发和收获。总而言之,对象生命周期的管理,在服务端应用程序中是非常重要的,需要引起足够的重视。在下文中,我们打算逐步摆脱PassThroughEventBus,基于RabbitMQ来实现消息总线的基础结构。

源代码的使用

本系列文章的源代码在https://github.com/daxnet/edasample这个Github Repo里,通过不同的release tag来区分针对不同章节的源代码。本文的源代码请参考chapter_2这个tag,如下:

ASP.NET Core Web API下事件驱动型架构的实现(二):事件处理器中对象生命周期的管理

点赞
收藏
评论区
推荐文章
Easter79 Easter79
3年前
spring中策略模式使用
策略模式工作中经常使用到策略模式工厂模式,实现一个接口多种实现的灵活调用与后续代码的扩展性。在spring中使用策略模式更为简单,所有的bean均为spring容器管理,只需获取该接口的所有实现类即可。下面以事件处理功能为例,接收到事件之后,根据事件类型调用不同的实现接口去处理。如需新增事件,只需扩展实现类即可,无需改动之前的代码。这样即
Stella981 Stella981
3年前
Guava — EventBus
Guava提供了事件总线的一个实现方案EventBus。它是事件发布订阅模式的实现,观察者模式。Guava为我们提供了同步实现EventBus和异步实现AsyncEventBus两个事件总线,他们都不是单例的eventBus.post(1);eventBus.post(1L);post方法,直接发布事件订阅者需要注册进来,ev
Stella981 Stella981
3年前
ABP EventBus(事件总线)
事件总线就是订阅/发布模式的一种实现  事件总线就是为了降低耦合1.比如在winform中 到处都是事件 !(https://oscimg.oschina.net/oscnet/ed3426bf15550c4b0623956eb95e826780d.png)触发事件的对象 sender事件的数据  e事件的处理逻辑 方法
Stella981 Stella981
3年前
NET Core Web API下事件驱动型架构CQRS架构中聚合与聚合根的实现
NETCoreWebAPI下事件驱动型架构在前面两篇文章中,我详细介绍了基本事件系统的实现,包括事件派发和订阅、通过事件处理器执行上下文来解决对象生命周期问题,以及一个基于RabbitMQ的事件总线的实现。接下来对于事件驱动型架构的讨论,就需要结合一个实际的架构案例来进行分析。在领域驱动设计的讨论范畴,CQRS架构本身就是事件驱动的,因此,我打算首先介
Stella981 Stella981
3年前
Redis 事件机制详解
点击上方"程序员历小冰",选择“置顶或者星标”你的关注意义重大!Redis采用事件驱动机制来处理大量的网络IO。它并没有使用libevent或者libev这样的成熟开源方案,而是自己实现一个非常简洁的事件驱动库ae\_event。Redis中的事件驱动库只关注网络IO,以及定时器。该事件库处理下面两类事件:文件事
Stella981 Stella981
3年前
Android的消息循环与Handler机制理解
一、概念1、事件驱动型什么是事件驱动?就是有事了才去处理,没事就躺着不动。假如把用户点击按钮,滑动页面等这些都看作事件,事件产生后程序就执行相应的处理方法,就是属于事件驱动型。2、消息循环把需要处理的事件表示成一个消息,并且把这个消息放入一个队列。消息循环就是一循环,for或者while都一样。从消息队列里面取出未处理的消息,然后调用该消息的
Wesley13 Wesley13
3年前
.Net Core微服务入门全纪录(七)——IdentityServer4
前言上一篇【.NetCore微服务入门全纪录(六)——EventBus事件总线(https://www.oschina.net/action/GoToLink?urlhttps%3A%2F%2Fwww.cnblogs.com%2Fxhznl%2Fp%2F13154851.html)】中使用CAP完成了一个简单的Eventbus,实现了服务
Stella981 Stella981
3年前
Noark入门之异步事件
引入异步事件主要是为了各模块的解耦,每当完成一个动作时,向系统发布一个事件,由关心的模块自己监听处理,可选择同步处理,异步处理,延迟处理。何时发布事件,当其他模块关心此动作时<br比如获得道具时,任务系统模块要判定完成进度,BI模块需要上报等等都可以监听此事件,已达模块解耦0x00事件源一个实现xyz.noark.core.event
Wesley13 Wesley13
3年前
#分布式系统架构之# 事件驱动模式以及与之匹配的长时间处理过程讨论
     在分布式系统下,可以很多种架构从事设计,或者分布式系统对技术架构本身没有做严格的限制。但是结合自己的实践以及基于《领域驱动设计》的推荐,采用【事件驱动模式】是比较好的一种分布式系统架构方式。该模式充分实现了不同系统之间的代码解耦,所有的业务流转是通过事件广播进行驱动的。所有业务都是在针对名为【事件总线】的组件在编程,也无需知道事件的生产者