异步Rust:构建实时消息代理服务器

在本文中,我们将深入研究使用Rust构建实时消息代理服务器 , 展示其强大的并发特性 。我们将使用Warp作为web服务器,并使用Tokio来管理异步任务 。此外,我们将创建一个WebSocket客户端来测试代理服务器的功能 。
设计图如下:

异步Rust:构建实时消息代理服务器

文章插图
图片
构建消息代理服务器消息代理服务器允许客户端为主题生成事件并订阅它们 。它使用Warp作为HTTP和WebSocket服务器,使用Tokio作为异步运行时 。
使用以下命令创建一个Rust项目:
cargo new real-ime-message在Cargo.toml文件中加入以下依赖项:
[dependencies]futures-util = "0.3.30"tokio = {version = "1.35.1", features = ["full"]}tokio-tungstenite = "0.21.0"url = "2.5.0"warp = "0.3.6"在src/mAIn.rs文件中定义一个Broker结构体:
use std::{collections::{HashMap, VecDeque},sync::Arc,};use futures_util::{SinkExt, StreamExt};use tokio::sync::{mpsc::{self, UnboundedSender},RwLock,};use warp::{filters::ws::Message, Filter};type Topic = String;type Event = String;type WsSender = UnboundedSender<warp::ws::Message>;struct Broker {events: Arc<RwLock<HashMap<Topic, VecDeque<Event>>>>,subscribers: Arc<RwLock<HashMap<Topic, Vec<WsSender>>>>,}
  • events:存储每个主题的事件 。
  • subscribers:跟踪每个主题的订阅者 。
创建一个新的Broker实例:
impl Broker {fn new() -> Self {Broker {events: Arc::new(RwLock::new(HashMap::new())),subscribers: Arc::new(RwLock::new(HashMap::new())),}}}定义发布事件的方法produce:
impl Broker {......async fn produce(&self, topic: Topic, event: Event) {let mut events = self.events.write().await;events.entry(topic.clone()).or_default().push_back(event.clone());// 异步通知所有订阅者let subscribers_list;{let subscribers = self.subscribers.read().await;subscribers_list = subscribers.get(&topic).cloned().unwrap_or_default();}for ws_sender in subscribers_list {// 将事件发送到WebSocket客户端let _ = ws_sender.send(warp::ws::Message::text(event.clone()));}}}这个方法主要是将事件添加到相应的主题,然后将新事件通知所有订阅者 。
定义subscribe方法,来管理新的订阅:
impl Broker {......pub async fn subscribe(&self, topic: Topic, socket: warp::ws::WebSocket) {let (ws_sender, mut ws_receiver) = socket.split();let (tx, mut rx) = mpsc::unbounded_channel::<Message>();{let mut subs = self.subscribers.write().await;subs.entry(topic).or_default().push(tx);}tokio::task::spawn(async move {while let Some(result) = ws_receiver.next().await {match result {Ok(message) => {// 处理有效的消息if message.is_text() {println!("Received message from client: {}",message.to_str().unwrap());}}Err(e) => {// 处理错误eprintln!("WebSocket error: {:?}", e);break;}}}println!("WebSocket connection closed");});tokio::task::spawn(async move {let mut sender = ws_sender;while let Some(msg) = rx.recv().await {let _ = sender.send(msg).await;}});}}这个方法主要是将WebSocket拆分为发送方和接收方,将订阅者添加到订阅者列表中 , 处理传入的WebSocket消息 。
main函数代码如下:
#[tokio::main]async fn main() {let broker = Arc::new(Broker::new());let broker_clone1 = Arc::clone(&broker);let broker_clone2 = Arc::clone(&broker);let produce = warp::path!("produce" / String).and(warp::post()).and(warp::body::json()).and(warp::any().map(move || Arc::clone(&broker_clone1))).and_then(move |topic: String, event: Event, broker_clone2: Arc<Broker>| async move {broker_clone2.produce(topic, event).await;Ok::<_, warp::Rejection>(warp::reply())},);let subscribe = warp::path!("subscribe" / String).and(warp::ws()).map(move |topic: String, ws: warp::ws::Ws| {let broker_clone3 = Arc::clone(&broker_clone2);ws.on_upgrade(move |socket| async move {broker_clone3.subscribe(topic.clone(), socket).await;})},);let routes = produce.or(subscribe);println!("Broker server running at http://127.0.0.1:3030");warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;}实现WebSocket客户端WebSocket客户端将模拟一个订阅主题和接收消息的真实用户 。
在src/bin目录下,创建一个ws_cli.rs文件 。在文件中定义websocket_client函数,建立WebSocket连接并管理消息:
use futures_util::{sink::SinkExt, stream::StreamExt};use std::sync::Arc;use tokio::sync::RwLock;use tokio::time::{sleep, Duration};use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};use url::Url;async fn websocket_client(topic_url: &str) {// 解析要连接WebSocket服务器的URLlet url = Url::parse(topic_url).expect("Invalid URL");// 连接到WebSocket服务器let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");println!("WebSocket client connected");let (mut write, mut read) = ws_stream.split();let message = Arc::new(RwLock::new(String::new()));let message_1 = message.clone();// 生成一个任务来处理传入的消息tokio::spawn(async move {let msg_lock = message_1.clone();while let Some(message) = read.next().await {match message {Ok(msg) => {let mut ms = msg_lock.write().await;*ms = msg.to_text().unwrap().to_string();println!("Received message: {}", msg.to_text().unwrap());}Err(e) => {eprintln!("Error receiving message: {:?}", e);break;}}}});// 发送消息loop {let msg_lock = message.clone();let ms = msg_lock.read().await;if let Err(e) = write.send(Message::Text(ms.to_string())).await {eprintln!("Error sending message: {:?}", e);break;}sleep(Duration::from_secs(5)).await;}}


推荐阅读