偷偷摘套内射激情视频,久久精品99国产国产精,中文字幕无线乱码人妻,中文在线中文a,性爽19p

使用Rust和WebSocket構(gòu)建點(diǎn)對(duì)點(diǎn)網(wǎng)絡(luò)

開(kāi)發(fā) 前端
這篇文章使用Rust構(gòu)建基于WebSocket的P2P網(wǎng)絡(luò)示例,它提供了一種強(qiáng)大而有效的方式來(lái)實(shí)現(xiàn)節(jié)點(diǎn)之間的實(shí)時(shí)通信。通過(guò)理解代碼的每個(gè)部分,可以擴(kuò)展和定制這個(gè)示例以滿足特定需求,無(wú)論是分散的應(yīng)用程序,實(shí)時(shí)數(shù)據(jù)共享還是分布式計(jì)算任務(wù)。?

在WebSocket基礎(chǔ)設(shè)施上創(chuàng)建點(diǎn)對(duì)點(diǎn)(P2P)網(wǎng)絡(luò)似乎是一項(xiàng)艱巨的任務(wù)。在這篇文章中,我們將介紹基于Rust的P2P網(wǎng)絡(luò)的關(guān)鍵組件,探索每個(gè)部分如何構(gòu)建無(wú)縫的WebSocket基礎(chǔ)設(shè)施。使用它們構(gòu)建一個(gè)健壯且高效的P2P網(wǎng)絡(luò),允許節(jié)點(diǎn)之間的實(shí)時(shí)通信。

P2P網(wǎng)絡(luò)簡(jiǎn)介

點(diǎn)對(duì)點(diǎn)網(wǎng)絡(luò)支持節(jié)點(diǎn)之間的分散通信,允許數(shù)據(jù)交換而不依賴(lài)于中心服務(wù)器。網(wǎng)絡(luò)中的每個(gè)參與者,或“對(duì)等端”,可以同時(shí)充當(dāng)客戶端和服務(wù)器端,促進(jìn)直接連接和通信。在我們的示例中,我們使用WebSocket,它在單個(gè)TCP連接上提供全雙工通信通道,以促進(jìn)這種實(shí)時(shí)交互。

項(xiàng)目概述

我們的項(xiàng)目旨在演示如何在Rust中使用WebSocket建立P2P網(wǎng)絡(luò),利用Tokio異步運(yùn)行時(shí)的強(qiáng)大功能。我們將探討以下關(guān)鍵組件:

  • 命令行參數(shù)解析:使用clap解析對(duì)等url和綁定地址。
  • WebSocket Actor:管理到對(duì)等端的WebSocket連接。
  • 網(wǎng)絡(luò)狀態(tài)管理:維護(hù)網(wǎng)絡(luò)的狀態(tài),包括連接的對(duì)等點(diǎn)。
  • 連接處理:管理對(duì)等端WebSocket連接的生命周期。
  • 廣播消息:定期向所有連接的對(duì)等端發(fā)送消息。
  • 優(yōu)雅關(guān)閉:優(yōu)雅地處理中斷以關(guān)閉網(wǎng)絡(luò)。

項(xiàng)目開(kāi)發(fā)

使用以下命令創(chuàng)建一個(gè)Rust新項(xiàng)目:

cargo new p2p-ws-example

在Cargo.toml文件中加入以下依賴(lài)項(xiàng):

[dependencies]
clap = { version = "4.5.13", features = ["derive"] }
env_logger = "0.11.5"
futures = "0.3.30"
log = "0.4.22"
serde = "1.0.204"
tokio = { version = "1.39.2", features = ["full"] }
tokio-tungstenite = "0.23.1"
tokio-util = { version = "0.7.11", features = ["full"] }

命令行參數(shù)解析

我們將從使用clap定義命令行參數(shù)開(kāi)始。這允許我們?cè)趩?dòng)P2P節(jié)點(diǎn)時(shí)指定對(duì)等url和綁定地址。

use std::net::ToSocketAddrs;

use clap::Parser;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    /// 要連接的客戶端地址列表
    #[arg(short, long, value_delimiter = ',', value_parser = parse_peer)]
    peers: Vec<String>,

    /// 綁定服務(wù)器的地址
    #[arg(short, long, value_parser = parse_bind)]
    bind: String,
}

/// 解析并驗(yàn)證客戶端url
fn parse_peer(s: &str) -> Result<String, String> {
    // 驗(yàn)證以ws://或wss://開(kāi)頭的URL
    if s.starts_with("ws://") {
        let ip_port = &s[5..];
        if let Ok(_socket_addr) = ip_port.to_socket_addrs() {
            return Ok(s.to_string());
        }
    }
    Err(format!("Invalid client URL: {}", s))
}

/// 解析并驗(yàn)證綁定地址
fn parse_bind(s: &str) -> Result<String, String> {
    if let Ok(_socket_addr) = s.to_socket_addrs() {
        return Ok(s.to_string());
    }
    Err(format!("Invalid bind address: {}", s))
}

  • peers:以逗號(hào)分隔的WebSocket url列表,以對(duì)等端的形式連接。
  • bind:連接綁定服務(wù)器的地址。

WebSocket Actor

WebSocketActor結(jié)構(gòu)體管理WebSocket連接。它建立到給定URL的連接,并處理消息的發(fā)送和接收。

use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};

// 定義 WebSocket actor
struct WebSocketActor {
    ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>,
}
impl WebSocketActor {
    async fn connect(url: &str) -> Option<Self> {
        match connect_async(url).await {
            Ok((conn, _)) => {
                log::info!("Connected successfully to {}", url);
                Some(WebSocketActor { ws_stream: conn })
            }
            Err(e) => {
                log::error!("Connection to {} failed: {:?}", url, e);
                None
            }
        }
    }
}

網(wǎng)絡(luò)狀態(tài)管理

P2PWebsocketNetwork結(jié)構(gòu)體維護(hù)網(wǎng)絡(luò)的狀態(tài),包括連接的對(duì)等點(diǎn)和用于消息廣播的主發(fā)送方。

use std::{
    collections::HashMap,
    net::{SocketAddr, ToSocketAddrs},
    sync::{Arc, Mutex},
};

use clap::Parser;
use tokio::{net::TcpStream, sync::mpsc::UnboundedSender};
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};

struct P2PWebsocketNetwork {
    addresses: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<P2PInnerMessage>>>>,
    master: Arc<Mutex<UnboundedSender<P2PInnerMessage>>>,
}

#[derive(Debug)]
struct P2PInnerMessage {
    message: Message,
    tx_handler: UnboundedSender<P2PInnerMessage>,
}

連接管理

我們將使用handle_connection和handle_server_connection函數(shù)處理傳入和傳出的連接。這些函數(shù)管理WebSocket連接的生命周期,發(fā)送和接收消息。

async fn handle_connection(
    state: Arc<P2PWebsocketNetwork>,
    conn: WebSocketActor,
    token: CancellationToken,
) {
    // 處理連接和消息交換的邏輯
}

  • state:包含網(wǎng)絡(luò)信息的共享狀態(tài)(Arc<P2PWebsocketNetwork>),包括已連接的對(duì)等端和消息處理程序。
  • conn:表示與對(duì)等端連接的WebSocketActor實(shí)例。
  • token:用于處理任務(wù)取消,以實(shí)現(xiàn)安全關(guān)機(jī)。

handle_connection函數(shù)體邏輯如下:

1,提取套接字地址

// 提取套接字地址作為客戶端列表的鍵
let addr = match conn.ws_stream.get_ref() {
    MaybeTlsStream::Plain(f) => f.peer_addr().unwrap(),
    _ => {
        panic!("tls is not supported yet");
    }
};

addr:對(duì)等端的socket地址。代碼從WebSocket流中提取對(duì)等端的地址。目前只支持非tls (plain)流。

2,設(shè)置消息通道

// 這個(gè)tx應(yīng)該在網(wǎng)絡(luò)狀態(tài)下共享
let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
{
    let mut list = state.addresses.lock().unwrap();
    list.insert(addr, tx.clone());
}

  • tx和rx:用于在此函數(shù)和應(yīng)用程序的其他部分之間發(fā)送和接收消息(P2PInnerMessage)的無(wú)界通道。
  • state.addresses:連接的對(duì)等端的地址存儲(chǔ)在一個(gè)共享的HashMap中,新對(duì)等端的地址和它的發(fā)送者(tx)被添加到列表中。

3,拆分WebSocket流

let (mut ws_tx, mut ws_rx) = conn.ws_stream.split();

ws_tx和ws_rx:WebSocket流分為發(fā)送方(ws_tx)和接收方(ws_rx),以異步方式處理傳入和傳出的消息。

4,循環(huán)消息處理

loop {
    tokio::select! {
        Some(msg) = ws_rx.next() => {
            log::debug!("Received: {:?}", msg);
            match msg {
                Ok(msg) => {
                    if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
                        message: msg,
                        tx_handler: tx.clone(),
                    }) {
                        log::error!("Failed to send message to master: {:?}", e);
                    }
                },
                Err(e) => {
                    log::error!("Error receiving message or connection closed: {:?}", e);
                    break
                }
            }
        }
        Some(msg) = rx.recv() => {
            log::debug!("Sending: {:?}", msg);
            if let Err(e) = ws_tx.send(msg.message).await {
                log::error!("Failed to send message on socket: {:?}", e);
            }
        }
        _ = token.cancelled() => {
            log::warn!("task cancelled");
            break
        }
    }
}

  • 傳入消息(ws_rx):循環(huán)使用tokio::select!等待多個(gè)異步事件。它首先檢查來(lái)自對(duì)等端的傳入消息(ws_rx.next())。如果成功接收到消息,則通過(guò)state.master將其轉(zhuǎn)發(fā)給主處理程序。這可以對(duì)消息進(jìn)行集中處理或路由。如果發(fā)生錯(cuò)誤(例如,連接關(guān)閉),循環(huán)中斷,有效地終止連接。
  • 傳出消息(rx):循環(huán)還檢查從應(yīng)用程序的內(nèi)部通道(rx.recv())接收到的打算發(fā)送給對(duì)等端的消息。使用ws_tx.send(msg.message).await將消息發(fā)送到對(duì)等端。發(fā)送中的錯(cuò)誤將被記錄。
  • 取消令牌:如果取消令牌被觸發(fā)(Token .cancelled()),循環(huán)中斷,允許任務(wù)干凈地退出。

5,從列表中刪除對(duì)等端

{
    // 從列表中刪除客戶端
    let mut list = state.addresses.lock().unwrap();
    list.remove(&addr);
}

一旦循環(huán)退出(由于錯(cuò)誤或取消),對(duì)等體的地址將從連接地址列表中刪除,以確保狀態(tài)準(zhǔn)確地反映當(dāng)前網(wǎng)絡(luò)連接。

handle_server_connection函數(shù)的操作邏輯與上述一樣,代碼如下:

async fn handle_server_connection(
    state: Arc<P2PWebsocketNetwork>,
    raw_stream: TcpStream,
    addr: SocketAddr,
    token: CancellationToken,
) {
    let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
    {
        let mut list = state.addresses.lock().unwrap();
        list.insert(addr, tx.clone());
    }

    log::info!("Incoming TCP connection from: {}", addr);

    let ws_stream = match tokio_tungstenite::accept_async(raw_stream).await {
        Ok(ws) => ws,
        Err(e) => {
            log::error!("WebSocket handshake error: {:?}", e);
            return;
        }
    };

    log::info!("WebSocket connection established: {}", addr);

    let (mut ws_tx, mut ws_rx) = ws_stream.split();
    loop {
        tokio::select! {
            Some(msg) = ws_rx.next() => {
                log::debug!("Received: {:?}", msg);
                match msg {
                    Ok(msg) => {
                        if let Err(e) = state.master.lock().unwrap().send(P2PInnerMessage {
                            message: msg,
                            tx_handler: tx.clone(),
                        }) {
                            log::error!("Failed to send message to master: {:?}", e);
                        }
                    },
                    Err(e) => {
                        log::error!("Error receiving message or connection closed: {:?}", e);
                        break
                    }
                }
            }
            Some(msg) = rx.recv() => {
                log::debug!("Sending: {:?}", msg);
                if let Err(e) = ws_tx.send(msg.message).await {
                    log::error!("Failed to send message on socket: {:?}", e);
                }
            }
            _ = token.cancelled() => {
                log::warn!("task cancelled");
                break
            }
        }
    }
    {
        // 從列表中刪除客戶端
        let mut list = state.addresses.lock().unwrap();
        list.remove(&addr);
    }
}

廣播消息

我們的廣播功能定期向所有連接的對(duì)等點(diǎn)發(fā)送消息,展示了P2P網(wǎng)絡(luò)傳播信息的能力。

async fn broadcast(
    state: Arc<P2PWebsocketNetwork>,
    tx: UnboundedSender<P2PInnerMessage>,
    bind: String,
) {
    log::debug!("Broadcast start");

    // 廣播到已連接的客戶端
    let list = state.addresses.lock().unwrap();

    for (i, cl) in list.iter().enumerate() {
        log::debug!("Broadcasting to {} ", cl.0);
        if let Err(e) = cl.1.send(P2PInnerMessage {
            message: tungstenite::protocol::Message::text(format!(
                "Message to client {} from {}",
                i, bind
            )),
            tx_handler: tx.clone(),
        }) {
            log::error!("Failed to send broadcast message: {:?}", e);
        }
    }
    log::debug!("Broadcast end");
}

main函數(shù)

#[tokio::main]
async fn main() {
    let args = Args::parse();
    env_logger::init();

    let cancelation_token = CancellationToken::new();
    let tracker = TaskTracker::new();
    let (tx, mut rx) = unbounded_channel::<P2PInnerMessage>();
    let network_state: Arc<P2PWebsocketNetwork> = Arc::new(P2PWebsocketNetwork {
        addresses: Arc::new(Mutex::new(HashMap::new())),
        master: Arc::new(Mutex::new(tx.clone())),
    });

    for url in &args.peers {
        log::info!("connecting to {} ...", url);
        if let Some(conn) = WebSocketActor::connect(url).await {
            tracker.spawn(handle_connection(
                network_state.clone(),
                conn,
                cancelation_token.clone(),
            ));
        } else {
            log::warn!("could not connect to server: {url}");
        }
    }

    let listener = TcpListener::bind(&args.bind).await.expect("Failed to bind");

    loop {
        tokio::select! {
            Ok((stream, addr)) = listener.accept() => {
                tracker.spawn(handle_server_connection(
                    network_state.clone(),
                    stream, addr, cancelation_token.clone()));
            }
            Some(msg) = rx.recv() => {
                log::debug!("consuming ->{msg:?}");
            }
            _ = tokio::signal::ctrl_c() => {
                log::warn!("Received Ctrl+C, shutting down...");
                tracker.close();
                cancelation_token.cancel();
                break
            }
            _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => {
                tracker.spawn(broadcast(network_state.clone(), tx.clone(), args.bind.clone()));
            }
        }
    }
    log::info!("waiting for all tasks");
    tracker.wait().await;
    log::debug!("tasks all are stoped");
}

在這里 我們將通過(guò)處理Ctrl+C信號(hào)并相應(yīng)地取消任務(wù)來(lái)確保我們的網(wǎng)絡(luò)可以優(yōu)雅地關(guān)閉。

運(yùn)行項(xiàng)目

在不同的終端中,使用這些參數(shù)按以下順序運(yùn)行程序:

$ RUST_LOG=debug cargo run -- --bind localhost:8080 # 啟動(dòng)第一個(gè)節(jié)點(diǎn)
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080 --bind localhost:8085 # 啟動(dòng)第二個(gè)節(jié)點(diǎn)
$ RUST_LOG=debug cargo run -- --peers ws://localhost:8080,ws://localhost:8085 --bind localhost:8086 # 啟動(dòng)第三個(gè)節(jié)點(diǎn)

然后,將看到從每個(gè)對(duì)等點(diǎn)向所有其他對(duì)等點(diǎn)廣播消息。這是對(duì)所有人的廣播!

總結(jié)

這篇文章使用Rust構(gòu)建基于WebSocket的P2P網(wǎng)絡(luò)示例,它提供了一種強(qiáng)大而有效的方式來(lái)實(shí)現(xiàn)節(jié)點(diǎn)之間的實(shí)時(shí)通信。通過(guò)理解代碼的每個(gè)部分,可以擴(kuò)展和定制這個(gè)示例以滿足特定需求,無(wú)論是分散的應(yīng)用程序,實(shí)時(shí)數(shù)據(jù)共享還是分布式計(jì)算任務(wù)。

責(zé)任編輯:武曉燕 來(lái)源: coding到燈火闌珊
相關(guān)推薦

2020-02-17 16:28:49

開(kāi)發(fā)技能代碼

2022-04-14 09:19:34

Notion開(kāi)源AppFlowy

2025-03-04 08:00:00

機(jī)器學(xué)習(xí)Rust開(kāi)發(fā)

2023-05-26 17:21:15

PythonRust

2021-05-19 14:46:41

Space XRust語(yǔ)言

2020-10-21 14:54:02

RustGolang開(kāi)發(fā)

2023-04-19 07:39:55

RustHTTP服務(wù)器

2024-02-01 12:54:00

RustWebSocket消息代理

2024-11-11 11:59:09

Rust網(wǎng)絡(luò)工具

2024-11-27 13:25:24

Rust線程池線程

2023-05-04 07:33:39

Rust變量常量

2014-03-25 14:21:18

WebSocket實(shí)時(shí)

2021-03-19 08:58:19

Rust共享愿景文檔開(kāi)發(fā)者

2017-08-29 13:50:03

TensorFlow深度學(xué)習(xí)神經(jīng)網(wǎng)絡(luò)

2010-08-30 20:13:25

DHCP服務(wù)器

2017-03-27 16:18:30

神經(jīng)網(wǎng)絡(luò)TensorFlow人工智能

2023-12-11 11:56:24

圖片服務(wù)器Rust

2009-09-03 14:49:49

C#實(shí)現(xiàn)網(wǎng)絡(luò)點(diǎn)對(duì)點(diǎn)

2014-06-12 10:17:25

SDN網(wǎng)絡(luò)虛擬化

2024-03-08 12:17:39

網(wǎng)絡(luò)爬蟲(chóng)Python開(kāi)發(fā)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)