Skip to content

Commit

Permalink
Capture and server fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Nov 28, 2023
1 parent 42a6985 commit b7a043e
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 53 deletions.
15 changes: 11 additions & 4 deletions client/src/app/packets_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,20 @@ impl PacketsTable {
let streams = self.streams.borrow();
let packets = &streams.packets;

if packets.is_empty() {
return;
}

let first_timestamp = packets.first().unwrap().timestamp;
let keys: Vec<_> = packets.keys().collect();

body.rows(25.0, packets.len(), |id, mut row| {
let first_ts = packets.get(0).unwrap().timestamp;
let packet = packets.get(id).unwrap();
let key = **keys.get(id).unwrap();
let packet = packets.get(key).unwrap();
row.col(|ui| {
ui.label(id.to_string());
ui.label(packet.id.to_string());
});
let timestamp = packet.timestamp - first_ts;
let timestamp = packet.timestamp - first_timestamp;
row.col(|ui| {
ui.label(timestamp.as_secs_f64().to_string());
});
Expand Down
2 changes: 1 addition & 1 deletion client/src/app/rtcp_packets_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl RtcpPacketsTable {
let mut last_id = 0;
let mut next_ix = 1;

let first_ts = streams.packets.get(0).unwrap().timestamp;
let first_ts = streams.packets.first().unwrap().timestamp;
body.heterogeneous_rows(heights, |ix, mut row| {
let (id, rtcp) = rtcp_packets.get(ix).unwrap();
let packet = streams.packets.get(*id).unwrap();
Expand Down
20 changes: 19 additions & 1 deletion client/src/streams/packets.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use rtpeeker_common::packet::Packet;
use std::collections::{btree_map::Values, BTreeMap};
use std::collections::{
btree_map::{Keys, Values},
BTreeMap,
};

#[derive(Debug, Default)]
pub struct Packets {
Expand All @@ -11,14 +14,29 @@ impl Packets {
self.packets.get(&id)
}

pub fn first(&self) -> Option<&Packet> {
match self.packets.first_key_value() {
Some((_, v)) => Some(v),
_ => None,
}
}

pub fn values(&self) -> Values<'_, usize, Packet> {
self.packets.values()
}

pub fn keys(&self) -> Keys<'_, usize, Packet> {
self.packets.keys()
}

pub fn is_new(&self, packet: &Packet) -> bool {
!self.packets.contains_key(&packet.id)
}

pub fn is_empty(&self) -> bool {
self.packets.is_empty()
}

pub fn len(&self) -> usize {
self.packets.len()
}
Expand Down
11 changes: 5 additions & 6 deletions client/src/streams/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,16 +142,15 @@ impl Stream {
RtcpPacket::ReceiverReport(_rr) => {}
RtcpPacket::SenderReport(sr) => {
// let mut revisit_packets = false;
if let Some((ntp_time, rtp_time)) = self.ntp_rtp {
if let Some((ntp_time, _rtp_time)) = self.ntp_rtp {
// revisit_packets = self.estimated_clock_rate.is_none();
let rtp_diff = sr.rtp_time - rtp_time;
let secs_diff = ntp_to_f64(sr.ntp_time) - ntp_to_f64(ntp_time);
self.estimated_clock_rate = Some(rtp_diff as f64 / secs_diff);
// let rtp_diff = sr.rtp_time - rtp_time;
let _secs_diff = ntp_to_f64(sr.ntp_time) - ntp_to_f64(ntp_time);
// self.estimated_clock_rate = Some(rtp_diff as f64 / secs_diff);
} else {
// revisit_packets = true;
}
self.ntp_rtp = Some((sr.ntp_time, sr.rtp_time));
// log::info!("EST:{:x?} {:?}", sr.ssrc, self.estimated_clock_rate);
// self.ntp_rtp = Some((sr.ntp_time, sr.rtp_time));
// TODO: use the estiamted clock rate to set ntp time in rtp_info
// TODO: sometimes ntp timestamps are bs
}
Expand Down
75 changes: 44 additions & 31 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::sniffer::Sniffer;
use futures_util::{stream::SplitStream, SinkExt, StreamExt, TryFutureExt};
use futures_util::{
stream::{SplitSink, SplitStream},
SinkExt, StreamExt, TryFutureExt,
};
use log::{error, info, warn};
use rtpeeker_common::packet::SessionProtocol;
use rtpeeker_common::Source;
Expand All @@ -10,8 +13,7 @@ use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio::sync::{mpsc, mpsc::Sender, RwLock};
use warp::ws::{Message, WebSocket};
use warp::Filter;

Expand All @@ -20,12 +22,12 @@ const WS_PATH: &str = "ws";
static NEXT_CLIENT_ID: AtomicUsize = AtomicUsize::new(1);

struct Client {
pub sender: mpsc::UnboundedSender<Message>,
pub sender: mpsc::Sender<Message>,
pub source: Option<Source>,
}

impl Client {
pub fn new(sender: mpsc::UnboundedSender<Message>) -> Self {
pub fn new(sender: mpsc::Sender<Message>) -> Self {
Self {
sender,
source: None,
Expand Down Expand Up @@ -93,15 +95,16 @@ async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: Pa

let (mut ws_tx, ws_rx) = ws.split();

// create channel to send incoming packets to client
// and pass it to sniffer via shared state
let (mut tx, rx) = mpsc::unbounded_channel();
let mut rx = UnboundedReceiverStream::new(rx);
send_pcap_filenames(&client_id, &mut ws_tx, &source_to_packets).await;

send_pcap_filenames(&client_id, &mut tx, &source_to_packets).await;
// FIXME: something is very wrong here
// if buffer size is > 1, rx.recv always waits
// until channel's buffer is full before receiving
// might be because of blocking sniffers
let (tx, mut rx) = mpsc::channel(1);

tokio::task::spawn(async move {
while let Some(message) = rx.next().await {
while let Some(message) = rx.recv().await {
ws_tx
.send(message)
.unwrap_or_else(|e| {
Expand All @@ -121,7 +124,7 @@ async fn client_connected(ws: WebSocket, clients: Clients, source_to_packets: Pa

async fn send_pcap_filenames(
client_id: &usize,
ws_tx: &mut UnboundedSender<Message>,
ws_tx: &mut SplitSink<WebSocket, Message>,
source_to_packets: &Arc<HashMap<Source, Packets>>,
) {
let sources = source_to_packets.keys().cloned().collect();
Expand All @@ -133,9 +136,12 @@ async fn send_pcap_filenames(
};

let msg = Message::binary(encoded);
ws_tx.send(msg).unwrap_or_else(|e| {
error!("WebSocket send error: {}, client_id: {}", e, client_id);
});
ws_tx
.send(msg)
.unwrap_or_else(|e| {
error!("WebSocket send error: {}, client_id: {}", e, client_id);
})
.await;
}

async fn sniff<T: pcap::Activated>(mut sniffer: Sniffer<T>, packets: Packets, clients: Clients) {
Expand All @@ -156,34 +162,36 @@ async fn sniff<T: pcap::Activated>(mut sniffer: Sniffer<T>, packets: Packets, cl
source: Some(source),
sender,
} if *source == sniffer.source => {
sender.send(msg.clone()).unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
})
sender
.send(msg.clone())
.unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
})
.await;
}
_ => {}
}
}
packets.write().await.push(response);
}
Err(err) => error!("Error when capturing a packet: {:?}", err),
Err(err) => info!("Error when capturing a packet: {:?}", err),
}
}
}

async fn send_all_packets(
client_id: usize,
packets: &Packets,
ws_tx: &mut UnboundedSender<Message>,
) {
async fn send_all_packets(client_id: usize, packets: &Packets, ws_tx: &mut Sender<Message>) {
for pack in packets.read().await.iter() {
let Ok(encoded) = pack.encode() else {
error!("Failed to encode packet, client_id: {}", client_id);
continue;
};
let msg = Message::binary(encoded);
ws_tx.send(msg).unwrap_or_else(|e| {
error!("WebSocket `feed` error: {}, client_id: {}", e, client_id);
})
ws_tx
.send(msg)
.unwrap_or_else(|e| {
error!("WebSocket `feed` error: {}, client_id: {}", e, client_id);
})
.await;
}

info!(
Expand Down Expand Up @@ -224,11 +232,16 @@ async fn reparse_packet(
Client {
source: Some(source),
sender,
} if *source == *cur_source => sender.send(msg.clone()).unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
}),
} if *source == *cur_source => {
sender
.send(msg.clone())
.unwrap_or_else(|e| {
error!("Sniffer: error while sending packet: {}", e);
})
.await;
}
_ => {}
}
};
}
}

Expand Down
20 changes: 10 additions & 10 deletions src/sniffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl Sniffer<pcap::Offline> {
pub fn from_file(file: &str) -> Result<Self> {
match pcap::Capture::from_file(file) {
Ok(capture) => Ok(Self {
packet_id: 0,
packet_id: 1,
capture,
source: Source::File(file.to_string()),
}),
Expand All @@ -37,9 +37,9 @@ impl Sniffer<pcap::Active> {
return Err(Error::DeviceNotFound);
};

match capture.open() {
match capture.immediate_mode(true).open() {
Ok(capture) => Ok(Self {
packet_id: 0,
packet_id: 1,
capture,
source: Source::Interface(device.to_string()),
}),
Expand All @@ -56,12 +56,12 @@ impl<T: pcap::Activated> Sniffer<T> {
Err(_) => return Some(Err(Error::CouldntReceivePacket)),
};

match Packet::build(&packet, self.packet_id) {
Some(packet) => {
self.packet_id += 1;
Some(Ok(packet))
}
None => Some(Err(Error::UnsupportedPacketType)),
}
let res = match Packet::build(&packet, self.packet_id) {
Some(packet) => Ok(packet),
None => Err(Error::UnsupportedPacketType),
};

self.packet_id += 1;
Some(res)
}
}

0 comments on commit b7a043e

Please sign in to comment.