asp.net core supersocket介绍以及源码分析

跟物联网设备常用的通信协议有TCP,MQTT.今天我们介绍的是TCP连接,TCP连接程序的组件有Supersocket,do.NETty.Supersocket相信搞过.net的朋友应该都知道,dotnetty是有微软Azure从JAVA平台下移植过来的一个高性能、异步事件驱动的 NIO 框架,Kafka和RocketMQ等消息中间件、ElasticSearch开源搜索引擎、大数据处理Hadoop的RPC框架Avro、分布式通信框架Dubbo,都使用了Netty,Netty的资料很多,有兴趣的可以搜一下,dotnetty和Netty语法基本一致,所以资料可以互相参考,今天我们介绍的是.net下supersocket的使用以及源码分析
SuperSocket 可以和 ASP.NET Core 网站一起同时运行 。你需要做的是将 SuperSocket 注册到 ASP.NET Core 网站的host builder中去, 同时将服务器的选项放到配置文件中或者通过代码定义 。
//don't forget the usingsusing SuperSocket;using SuperSocket.ProtoBase;public static IHostBuilder CreateHostBuilder(string[] args) =>Host.CreateDefaultBuilder(args).ConfigureWebHostDefaults(webBuilder =>{webBuilder.UseStartup<Startup>();}).AsSuperSocketHostBuilder<TextPackageInfo, LinePipelineFilter>().UsePackageHandler(async (s, p) =>{// echo message back to clientawait s.SendAsync(Encoding.UTF8.GetBytes(p.Text + "rn"));});同时将服务器的配置选项放到配置文件 "Appsettings.json" 中去:
{"Logging": {"LogLevel": {"Default": "Information","Microsoft": "Warning","Microsoft.Hosting.Lifetime": "Information"}},"serverOptions": {"name": "TestServer","listeners": [{"ip": "Any","port": 4040}]},"AllowedHosts": "*"}上面代码就可以实现supersocket和asp.net core (.net core3,.net5,.net6)的集成,可以支持最新的.NET6,当然DotNetty也支持.Net 6
今天我们分析源码的版本是supersocket V2.0的版本,跟之前.net freamwork下有很大的区别
SuperSocket 请求处理模型示意图
 

asp.net core supersocket介绍以及源码分析

文章插图
SuperSocket 请求处理模型示意图
 
源码主线流程分析
 
asp.net core supersocket介绍以及源码分析

文章插图
主流程图
1分析的入口就从SuperSocketService这个类开始,他集成自IHostedService, IServer,因此这个类的StartAsync就是执行的入口
async Task<bool> IServer.StartAsync(){await StartAsync(CancellationToken.None);return true;} 
【asp.net core supersocket介绍以及源码分析】2.通过channelCreatorFactory创建监听器,监听代码如下
 
public bool Start(){var options = Options;try{if (options.Security != SslProtocols.None && options.CertificateOptions != null){options.CertificateOptions.EnsureCertificate();}var listenEndpoint = options.GetListenEndPoint();var listenSocket = _listenSocket = new Socket(listenEndpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);listenSocket.LingerState = new LingerOption(false, 0);if (options.NoDelay)listenSocket.NoDelay = true;listenSocket.Bind(listenEndpoint);listenSocket.Listen(options.BackLog);IsRunning = true;_cancellationTokenSource = new CancellationTokenSource();KeepAccept(listenSocket).DoNotAwait();return true;}catch (Exception e){_logger.LogError(e, $"The listener[{this.ToString()}] failed to start.");return false;}}3.开启异步接收accept线程
 
private async Task KeepAccept(Socket listenSocket){while (!_cancellationTokenSource.IsCancellationRequested){try{var client = await listenSocket.AcceptAsync().ConfigureAwait(false);OnNewClientAccept(client);}catch (Exception e){if (e is ObjectDisposedException || e is NullReferenceException)break;if (e is SocketException se){var errorCode = se.ErrorCode;//The listen socket was closedif (errorCode == 125 || errorCode == 89 || errorCode == 995 || errorCode == 10004 || errorCode == 10038){break;}}_logger.LogError(e, $"Listener[{this.ToString()}] failed to do AcceptAsync");continue;}}_stopTaskCompletionSource.TrySetResult(true);}4 当有连接进来后,执行执行注册的事件
listener.NewClientAccepted += OnNewClientAccept; 
5.accepted后就创建新的Channel,session
 
private void AcceptNewChannel(IChannel channel){var session = _sessionFactory.Create() as AppSession;HandleSession(session, channel).DoNotAwait();}6 channel.Start(),
public override void Start(){_readsTask = ProcessReads();_sendsTask = ProcessSends();WaitHandleClosing();} 
7.将从内核里读的socket数据异步写到 Pipe中


推荐阅读