diff --git a/client/src/app/packets_table.rs b/client/src/app/packets_table.rs index f44a03a..917b4c2 100644 --- a/client/src/app/packets_table.rs +++ b/client/src/app/packets_table.rs @@ -80,13 +80,20 @@ impl PacketsTable { let streams = self.streams.borrow(); let packets = &streams.packets; + if packets.is_empty() { + return; + } + + let first_timestamp = packets.first().unwrap().timestamp; + let keys: Vec<_> = packets.keys().collect(); + body.rows(25.0, packets.len(), |id, mut row| { - let first_ts = packets.get(0).unwrap().timestamp; - let packet = packets.get(id).unwrap(); + let key = **keys.get(id).unwrap(); + let packet = packets.get(key).unwrap(); row.col(|ui| { - ui.label(id.to_string()); + ui.label(packet.id.to_string()); }); - let timestamp = packet.timestamp - first_ts; + let timestamp = packet.timestamp - first_timestamp; row.col(|ui| { ui.label(timestamp.as_secs_f64().to_string()); }); diff --git a/client/src/app/rtcp_packets_table.rs b/client/src/app/rtcp_packets_table.rs index 3f4a380..a62ffb6 100644 --- a/client/src/app/rtcp_packets_table.rs +++ b/client/src/app/rtcp_packets_table.rs @@ -77,7 +77,7 @@ impl RtcpPacketsTable { let mut last_id = 0; let mut next_ix = 1; - let first_ts = streams.packets.get(0).unwrap().timestamp; + let first_ts = streams.packets.first().unwrap().timestamp; body.heterogeneous_rows(heights, |ix, mut row| { let (id, rtcp) = rtcp_packets.get(ix).unwrap(); let packet = streams.packets.get(*id).unwrap(); diff --git a/client/src/streams/packets.rs b/client/src/streams/packets.rs index ccfb9f8..2c4543e 100644 --- a/client/src/streams/packets.rs +++ b/client/src/streams/packets.rs @@ -1,5 +1,8 @@ use rtpeeker_common::packet::Packet; -use std::collections::{btree_map::Values, BTreeMap}; +use std::collections::{ + btree_map::{Keys, Values}, + BTreeMap, +}; #[derive(Debug, Default)] pub struct Packets { @@ -11,14 +14,29 @@ impl Packets { self.packets.get(&id) } + pub fn first(&self) -> Option<&Packet> { + match self.packets.first_key_value() { + Some((_, v)) => Some(v), + _ => None, + } + } + pub fn values(&self) -> Values<'_, usize, Packet> { self.packets.values() } + pub fn keys(&self) -> Keys<'_, usize, Packet> { + self.packets.keys() + } + pub fn is_new(&self, packet: &Packet) -> bool { !self.packets.contains_key(&packet.id) } + pub fn is_empty(&self) -> bool { + self.packets.is_empty() + } + pub fn len(&self) -> usize { self.packets.len() } diff --git a/client/src/streams/stream.rs b/client/src/streams/stream.rs index 19a2dc9..4a41092 100644 --- a/client/src/streams/stream.rs +++ b/client/src/streams/stream.rs @@ -142,16 +142,15 @@ impl Stream { RtcpPacket::ReceiverReport(_rr) => {} RtcpPacket::SenderReport(sr) => { // let mut revisit_packets = false; - if let Some((ntp_time, rtp_time)) = self.ntp_rtp { + if let Some((ntp_time, _rtp_time)) = self.ntp_rtp { // revisit_packets = self.estimated_clock_rate.is_none(); - let rtp_diff = sr.rtp_time - rtp_time; - let secs_diff = ntp_to_f64(sr.ntp_time) - ntp_to_f64(ntp_time); - self.estimated_clock_rate = Some(rtp_diff as f64 / secs_diff); + // let rtp_diff = sr.rtp_time - rtp_time; + let _secs_diff = ntp_to_f64(sr.ntp_time) - ntp_to_f64(ntp_time); + // self.estimated_clock_rate = Some(rtp_diff as f64 / secs_diff); } else { // revisit_packets = true; } - self.ntp_rtp = Some((sr.ntp_time, sr.rtp_time)); - // log::info!("EST:{:x?} {:?}", sr.ssrc, self.estimated_clock_rate); + // self.ntp_rtp = Some((sr.ntp_time, sr.rtp_time)); // TODO: use the estiamted clock rate to set ntp time in rtp_info // TODO: sometimes ntp timestamps are bs } diff --git a/src/server.rs b/src/server.rs index ab24c09..918d7c6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,8 @@ use crate::sniffer::Sniffer; -use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryFutureExt}; +use futures_util::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, TryFutureExt, +}; use log::{error, info, warn}; use rtpeeker_common::packet::SessionProtocol; use rtpeeker_common::Source; @@ -10,8 +13,7 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use tokio::sync::{mpsc, mpsc::UnboundedSender, RwLock}; -use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio::sync::{mpsc, mpsc::Sender, RwLock}; use warp::ws::{Message, WebSocket}; use warp::Filter; @@ -20,12 +22,12 @@ const WS_PATH: &str = "ws"; static NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::new(1); struct Client { - pub sender: mpsc::UnboundedSender, + pub sender: mpsc::Sender, pub source: Option, } impl Client { - pub fn new(sender: mpsc::UnboundedSender) -> Self { + pub fn new(sender: mpsc::Sender) -> Self { Self { sender, source: None, @@ -93,15 +95,16 @@ async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: Pa let (mut ws_tx, ws_rx) = ws.split(); - // create channel to send incoming packets to client - // and pass it to sniffer via shared state - let (mut tx, rx) = mpsc::unbounded_channel(); - let mut rx = UnboundedReceiverStream::new(rx); + send_pcap_filenames(&client_id, &mut ws_tx, &source_to_packets).await; - send_pcap_filenames(&client_id, &mut tx, &source_to_packets).await; + // FIXME: something is very wrong here + // if buffer size is > 1, rx.recv always waits + // until channel's buffer is full before receiving + // might be because of blocking sniffers + let (tx, mut rx) = mpsc::channel(1); tokio::task::spawn(async move { - while let Some(message) = rx.next().await { + while let Some(message) = rx.recv().await { ws_tx .send(message) .unwrap_or_else(|e| { @@ -121,7 +124,7 @@ async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: Pa async fn send_pcap_filenames( client_id: &usize, - ws_tx: &mut UnboundedSender, + ws_tx: &mut SplitSink, source_to_packets: &Arc>, ) { let sources = source_to_packets.keys().cloned().collect(); @@ -133,9 +136,12 @@ async fn send_pcap_filenames( }; let msg = Message::binary(encoded); - ws_tx.send(msg).unwrap_or_else(|e| { - error!("WebSocket send error: {}, client_id: {}", e, client_id); - }); + ws_tx + .send(msg) + .unwrap_or_else(|e| { + error!("WebSocket send error: {}, client_id: {}", e, client_id); + }) + .await; } async fn sniff(mut sniffer: Sniffer, packets: Packets, clients: Clients) { @@ -156,34 +162,36 @@ async fn sniff(mut sniffer: Sniffer, packets: Packets, cl source: Some(source), sender, } if *source == sniffer.source => { - sender.send(msg.clone()).unwrap_or_else(|e| { - error!("Sniffer: error while sending packet: {}", e); - }) + sender + .send(msg.clone()) + .unwrap_or_else(|e| { + error!("Sniffer: error while sending packet: {}", e); + }) + .await; } _ => {} } } packets.write().await.push(response); } - Err(err) => error!("Error when capturing a packet: {:?}", err), + Err(err) => info!("Error when capturing a packet: {:?}", err), } } } -async fn send_all_packets( - client_id: usize, - packets: &Packets, - ws_tx: &mut UnboundedSender, -) { +async fn send_all_packets(client_id: usize, packets: &Packets, ws_tx: &mut Sender) { for pack in packets.read().await.iter() { let Ok(encoded) = pack.encode() else { error!("Failed to encode packet, client_id: {}", client_id); continue; }; let msg = Message::binary(encoded); - ws_tx.send(msg).unwrap_or_else(|e| { - error!("WebSocket `feed` error: {}, client_id: {}", e, client_id); - }) + ws_tx + .send(msg) + .unwrap_or_else(|e| { + error!("WebSocket `feed` error: {}, client_id: {}", e, client_id); + }) + .await; } info!( @@ -224,11 +232,16 @@ async fn reparse_packet( Client { source: Some(source), sender, - } if *source == *cur_source => sender.send(msg.clone()).unwrap_or_else(|e| { - error!("Sniffer: error while sending packet: {}", e); - }), + } if *source == *cur_source => { + sender + .send(msg.clone()) + .unwrap_or_else(|e| { + error!("Sniffer: error while sending packet: {}", e); + }) + .await; + } _ => {} - } + }; } } diff --git a/src/sniffer.rs b/src/sniffer.rs index 8818d1a..bcde763 100644 --- a/src/sniffer.rs +++ b/src/sniffer.rs @@ -22,7 +22,7 @@ impl Sniffer { pub fn from_file(file: &str) -> Result { match pcap::Capture::from_file(file) { Ok(capture) => Ok(Self { - packet_id: 0, + packet_id: 1, capture, source: Source::File(file.to_string()), }), @@ -37,9 +37,9 @@ impl Sniffer { return Err(Error::DeviceNotFound); }; - match capture.open() { + match capture.immediate_mode(true).open() { Ok(capture) => Ok(Self { - packet_id: 0, + packet_id: 1, capture, source: Source::Interface(device.to_string()), }), @@ -56,12 +56,12 @@ impl Sniffer { Err(_) => return Some(Err(Error::CouldntReceivePacket)), }; - match Packet::build(&packet, self.packet_id) { - Some(packet) => { - self.packet_id += 1; - Some(Ok(packet)) - } - None => Some(Err(Error::UnsupportedPacketType)), - } + let res = match Packet::build(&packet, self.packet_id) { + Some(packet) => Ok(packet), + None => Err(Error::UnsupportedPacketType), + }; + + self.packet_id += 1; + Some(res) } }