Reactor模式的.net版本简单实现

Stella981
• 阅读 597

     近期在学习DotNetty,遇到不少的问题。由于dotnetty是次netty的.net版本的实现。导致在网上叙述dotnetty的原理,以及实现技巧方面的东西较少,这还是十分恼人的。在此建议学习和使用Dotnetty的和位小伙伴,真心阅读下netty的相关书籍,如《netty权威指南》。

     闲话少说,进入正题。netty的性能之所以能够达到如此的高度。主要由于他使用Reactor模式处理socket的请求,让服务器的使用率最大化,且尽量减少线程的开销。本文章主要简单介绍下Reactor模式。

一、reactor概论

reactor模式主要解决处理多个客户端请求的设计模式。

Reactor模式的.net版本简单实现

首先从类图我们可以得知:

Dispatcher:Handler管理器,以及调用度。他依赖于Demultiplexer类

Demultiplexer:事件管理器,接受外部的事件,并提供给Dispatch使用。

Handle:事件源,表示触发了那些事件

EventHandler:各种类型的处理器,用于处理具体的业务,以及I/O的读写

当然,也可以通过序列图看出首先需要初始化Dispatcher, Demultiplexer等相关类,以及注册具体的事件处理器。

二、代码的具体实现

类图如下[源码下载]:

Reactor模式的.net版本简单实现

2.1 多路复用事件处理器的代码

public class Demultiplexer
    {
        private ConcurrentQueue<Event> eventQuene = new ConcurrentQueue<Event>();
        private Object lockObj = new Object();

        public List<Event> Select()
        {
            return this.Select(0);
        }
        public List<Event> Select(int time)
        {
            if(time > 0)
            {
                if (this.eventQuene.IsEmpty)
                {
                    lock (lockObj)
                    {
                        if (this.eventQuene.IsEmpty)
                        {
                            System.Threading.Thread.Sleep(time);
                        }
                    }
                }
            }
            List<Event> events = new List<Event>();
            while(this.eventQuene.Count > 0)
            {
                Event tmp;
                if(this.eventQuene.TryDequeue(out tmp))
                {
                    events.Add(tmp);
                }
            }
            return events;
        }
        public void AddEvent(Event argEvent)
        {
            this.eventQuene.Enqueue(argEvent);
        }
    }

此类主要防止多线程的共同竞争,因为多路径复用选择器会被多个线程同时使用。所以使用的线程安全的Queue。

2.2 Handler触发器和管理器

/// <summary>
    /// Reactor的事件Handler触发器,提供事件Handler的注册,移除
    /// </summary>
    public class EventDispatch
    {
        private Demultiplexer demultiplexer;
        Dictionary<EventType, EventHandler> eventHandlerMap = new Dictionary<EventType, EventHandler>();

        public EventDispatch(Demultiplexer demultiplexer)
        {
            this.demultiplexer = demultiplexer;
        }

        public void RegisterHandler(EventType eventType, EventHandler eventHandler)
        {
            this.eventHandlerMap.Add(eventType, eventHandler);
        }
        public void RemoveHandler(EventType eventType)
        {
            this.eventHandlerMap.Remove(eventType);
        }

        public void HandleEvents()
        {
            this.Dispatch();
        }
        public void Dispatch()
        {
            string log = string.Format("thread id: {0} Dispatch", System.Threading.Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(log);
            while (true)
            {
                List<Event> events = this.demultiplexer.Select();
                foreach(var itemEvent in events)
                {
                    EventHandler eventHandler = this.eventHandlerMap[itemEvent.EventType];
                    eventHandler.Handle(itemEvent);
                }
                System.Threading.Thread.Sleep(1000);
            }
        }
    }

主要职责,对Handler的注册、移除的管理,以及通过 多路复用选择器 选择相应的Handler进行处理。

2.3 服务端的实现

/// <summary>
    /// 开启接受请求的服务端
    /// </summary>
    public class AcceptRuner
    {
        private System.Collections.Concurrent.ConcurrentQueue<object> sourceQueue = new System.Collections.Concurrent.ConcurrentQueue<object>();

        private Demultiplexer demultiplexer;

        public AcceptRuner(Demultiplexer demultiplexer)
        {
            this.demultiplexer = demultiplexer;
        }

        public void adConnection(object source)
        {
            this.sourceQueue.Enqueue(source);
        }

        public void Run()
        {
            string log = string.Format("thread id: {0} AcceptRunner", System.Threading.Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine(log);
            while (true)
            {
                object source;
                if(this.sourceQueue.TryDequeue(out source))
                {
                    Event acceptEvent = new Event()
                    {
                        EventType = EventType.Accept,
                        Source = source
                    };
                    this.demultiplexer.AddEvent(acceptEvent);
                }
            }
        }
    }

此类效仿netty的serverBoostrap的实现,将外部新的连接以事件对象的形式添加到 多路复用选择器上。

2.4 其他类

Event:事件基类

EventHandler:事件处理器抽象基类。他派生了:AcceptEventHandler,ReadEventHandler。

EventType:事件类型

三、备注说明

1. 代码没有贴完整。但下载包就是完整的。

2. 这只我对Reactor模式的理解,如有偏颇之处,还望各拉指点一二。

点赞
收藏
评论区
推荐文章
blmius blmius
3年前
MySQL:[Err] 1292 - Incorrect datetime value: ‘0000-00-00 00:00:00‘ for column ‘CREATE_TIME‘ at row 1
文章目录问题用navicat导入数据时,报错:原因这是因为当前的MySQL不支持datetime为0的情况。解决修改sql\mode:sql\mode:SQLMode定义了MySQL应支持的SQL语法、数据校验等,这样可以更容易地在不同的环境中使用MySQL。全局s
待兔 待兔
3个月前
手写Java HashMap源码
HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程HashMap的使用教程22
Jacquelyn38 Jacquelyn38
3年前
2020年前端实用代码段,为你的工作保驾护航
有空的时候,自己总结了几个代码段,在开发中也经常使用,谢谢。1、使用解构获取json数据let jsonData  id: 1,status: "OK",data: 'a', 'b';let  id, status, data: number   jsonData;console.log(id, status, number )
Stella981 Stella981
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
Stella981 Stella981
3年前
Android So动态加载 优雅实现与原理分析
背景:漫品Android客户端集成适配转换功能(基于目标识别(So库35M)和人脸识别库(5M)),导致apk体积50M左右,为优化客户端体验,决定实现So文件动态加载.!(https://oscimg.oschina.net/oscnet/00d1ff90e4b34869664fef59e3ec3fdd20b.png)点击上方“蓝字”关注我
Easter79 Easter79
3年前
Twitter的分布式自增ID算法snowflake (Java版)
概述分布式系统中,有一些需要使用全局唯一ID的场景,这种时候为了防止ID冲突可以使用36位的UUID,但是UUID有一些缺点,首先他相对比较长,另外UUID一般是无序的。有些时候我们希望能使用一种简单一些的ID,并且希望ID能够按照时间有序生成。而twitter的snowflake解决了这种需求,最初Twitter把存储系统从MySQL迁移
Wesley13 Wesley13
3年前
mysql设置时区
mysql设置时区mysql\_query("SETtime\_zone'8:00'")ordie('时区设置失败,请联系管理员!');中国在东8区所以加8方法二:selectcount(user\_id)asdevice,CONVERT\_TZ(FROM\_UNIXTIME(reg\_time),'08:00','0
Easter79 Easter79
3年前
SpringBoot整合Redis乱码原因及解决方案
问题描述:springboot使用springdataredis存储数据时乱码rediskey/value出现\\xAC\\xED\\x00\\x05t\\x00\\x05问题分析:查看RedisTemplate类!(https://oscimg.oschina.net/oscnet/0a85565fa
为什么mysql不推荐使用雪花ID作为主键
作者:毛辰飞背景在mysql中设计表的时候,mysql官方推荐不要使用uuid或者不连续不重复的雪花id(long形且唯一),而是推荐连续自增的主键id,官方的推荐是auto_increment,那么为什么不建议采用uuid,使用uuid究
Python进阶者 Python进阶者
9个月前
Excel中这日期老是出来00:00:00,怎么用Pandas把这个去除
大家好,我是皮皮。一、前言前几天在Python白银交流群【上海新年人】问了一个Pandas数据筛选的问题。问题如下:这日期老是出来00:00:00,怎么把这个去除。二、实现过程后来【论草莓如何成为冻干莓】给了一个思路和代码如下:pd.toexcel之前把这