Skip to content

Commit

Permalink
Use payload-type from SDP where applicable
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Dec 17, 2023
1 parent d78a53c commit b3fe562
Show file tree
Hide file tree
Showing 15 changed files with 308 additions and 49 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions client/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions client/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ use eframe::egui;
use egui::{ComboBox, Ui};
use ewebsock::{WsEvent, WsMessage, WsReceiver, WsSender};
use log::{error, warn};
use rtpeeker_common::{Request, Response, Source};
use rtpeeker_common::{Request, Response, Source, StreamKey};

use packets_table::PacketsTable;
use rtcp_packets_table::RtcpPacketsTable;
use rtp_packets_table::RtpPacketsTable;
use rtp_streams_table::RtpStreamsTable;
use tab::Tab;

use crate::streams::{RefStreams, StreamKey};
use crate::streams::RefStreams;
use rtp_streams_plot::RtpStreamsPlot;

mod packets_table;
Expand Down Expand Up @@ -79,7 +79,7 @@ impl App {
let packets_table = PacketsTable::new(streams.clone(), ws_sender.clone());
let rtp_packets_table = RtpPacketsTable::new(streams.clone());
let rtcp_packets_table = RtcpPacketsTable::new(streams.clone());
let rtp_streams_table = RtpStreamsTable::new(streams.clone());
let rtp_streams_table = RtpStreamsTable::new(streams.clone(), ws_sender.clone());
let rtp_streams_plot = RtpStreamsPlot::new(streams.clone());

let (tab, selected_source) = get_initial_state(cc);
Expand Down Expand Up @@ -244,7 +244,7 @@ impl App {
};

let Ok(response) = Response::decode(&msg) else {
error!("Failed to decode request message");
error!("Failed to decode response message");
continue;
};

Expand All @@ -263,6 +263,12 @@ impl App {
}
self.sources = sources;
}
Response::Sdp(stream_key, sdp) => {
let mut streams = self.streams.borrow_mut();
if let Some(stream) = streams.streams.get_mut(&stream_key) {
stream.add_sdp(sdp);
}
}
}
}
}
Expand Down
13 changes: 2 additions & 11 deletions client/src/app/rtp_packets_table.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::is_stream_visible;
use crate::streams::{RefStreams, StreamKey};
use crate::streams::RefStreams;
use eframe::epaint::Color32;
use egui::RichText;
use egui_extras::{Column, TableBody, TableBuilder};
use rtpeeker_common::packet::SessionPacket;
use rtpeeker_common::StreamKey;
use std::collections::HashMap;

pub struct RtpPacketsTable {
Expand Down Expand Up @@ -162,17 +163,7 @@ impl RtpPacketsTable {
resp.on_hover_text(rtp_packet.payload_type.to_string());

row.col(|ui| {
// if rtp_packet.previous_packet_is_lost {
// let resp = ui.label(
// RichText::from(format!("{} ⚠", rtp_packet.sequence_number))
// .color(Color32::GOLD),
// );
// resp.on_hover_text(
// RichText::from("Previous packet is lost!").color(Color32::GOLD),
// );
// } else {
ui.label(rtp_packet.sequence_number.to_string());
// }
});

row.col(|ui| {
Expand Down
3 changes: 2 additions & 1 deletion client/src/app/rtp_streams_plot.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use self::SettingsXAxis::*;
use super::is_stream_visible;
use crate::streams::stream::{RtpInfo, Stream};
use crate::streams::{RefStreams, StreamKey, Streams};
use crate::streams::{RefStreams, Streams};
use eframe::egui;
use eframe::egui::TextBuffer;
use eframe::epaint::Color32;
Expand All @@ -13,6 +13,7 @@ use egui::{Align2, RichText};
use rtpeeker_common::packet::SessionPacket;
use rtpeeker_common::rtcp::ReceptionReport;
use rtpeeker_common::rtp::payload_type::MediaType;
use rtpeeker_common::StreamKey;
use rtpeeker_common::{Packet, RtcpPacket, RtpPacket};
use std::cell::Ref;
use std::collections::HashMap;
Expand Down
38 changes: 34 additions & 4 deletions client/src/app/rtp_streams_table.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,30 @@
use crate::streams::{stream::Stream, RefStreams, StreamKey};
use crate::streams::{stream::Stream, RefStreams};
use egui::plot::{Line, Plot, PlotPoints};
use egui::{TextEdit, Vec2};
use egui_extras::{Column, TableBody, TableBuilder};
use ewebsock::{WsMessage, WsSender};
use rtpeeker_common::{Request, StreamKey};

const SDP_PROMPT: &str = "Paste your SDP media section here, e.g.
m=audio 5004 RTP/AVP 96
c=IN IP4 239.30.22.1
a=rtpmap:96 L24/48000/2
a=recvonly
";

pub struct RtpStreamsTable {
streams: RefStreams,
ws_sender: WsSender,
sdp_window_open: bool,
chosen_key: Option<StreamKey>,
sdp: String,
}

impl RtpStreamsTable {
pub fn new(streams: RefStreams) -> Self {
pub fn new(streams: RefStreams, ws_sender: WsSender) -> Self {
Self {
streams,
ws_sender,
sdp_window_open: false,
chosen_key: None,
sdp: String::new(),
Expand All @@ -32,22 +43,29 @@ impl RtpStreamsTable {
return;
};

let mut send_sdp = false;

egui::Window::new(format!("SDP - {:x}", ssrc))
.open(&mut self.sdp_window_open)
.default_width(800.0)
.default_height(800.0)
.vscroll(true)
.show(ctx, |ui| {
TextEdit::multiline(&mut self.sdp)
.hint_text("Paste your SDP m-line here...")
.hint_text(SDP_PROMPT)
.desired_rows(30)
.desired_width(f32::INFINITY)
.show(ui);
ui.add_space(10.0);
if ui.button(format!("Set SDP for {:x}", ssrc)).clicked() {
// TODO: SDP handling
send_sdp = true;
}
});

if send_sdp {
self.send_sdp_request();
self.sdp_window_open = false;
}
}

fn build_table(&mut self, ui: &mut egui::Ui) {
Expand Down Expand Up @@ -153,6 +171,18 @@ impl RtpStreamsTable {
});
});
}

fn send_sdp_request(&mut self) {
let request = Request::ParseSdp(self.chosen_key.unwrap(), self.sdp.clone());

let Ok(msg) = request.encode() else {
log::error!("Failed to encode a request message");
return;
};
let msg = WsMessage::Binary(msg);

self.ws_sender.send(msg);
}
}

fn build_jitter_plot(ui: &mut egui::Ui, stream: &Stream) {
Expand Down
2 changes: 1 addition & 1 deletion client/src/streams.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use packets::Packets;
use rtpeeker_common::packet::SessionPacket;
use rtpeeker_common::StreamKey;
use rtpeeker_common::{packet::TransportProtocol, Packet, RtcpPacket};
use std::cell::RefCell;
use std::collections::HashMap;
Expand All @@ -11,7 +12,6 @@ mod packets;
pub mod stream;

pub type RefStreams = Rc<RefCell<Streams>>;
pub type StreamKey = (SocketAddr, SocketAddr, TransportProtocol, u32);

#[derive(Debug, Default)]
pub struct Streams {
Expand Down
42 changes: 40 additions & 2 deletions client/src/streams/stream.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::utils::ntp_to_f64;
use rtpeeker_common::packet::TransportProtocol;
use rtpeeker_common::rtcp::{source_description::SdesType, SourceDescription};
use rtpeeker_common::{Packet, RtcpPacket, RtpPacket};
use rtpeeker_common::rtp::payload_type::PayloadType;
use rtpeeker_common::{Packet, RtcpPacket, RtpPacket, Sdp};
use std::cmp::{max, min};
use std::net::SocketAddr;
use std::time::Duration;
Expand Down Expand Up @@ -55,6 +56,7 @@ pub struct Stream {
last_sequence_number: u16,
first_time: Duration,
last_time: Duration,
sdp: Option<Sdp>,
// ntp synchronization
pub ntp_rtp: Option<(u64, u32)>,
pub estimated_clock_rate: Option<f64>,
Expand Down Expand Up @@ -92,11 +94,17 @@ impl Stream {
last_sequence_number: rtp.sequence_number,
first_time: packet.timestamp,
last_time: packet.timestamp,
sdp: None,
ntp_rtp: None,
estimated_clock_rate: None,
}
}

pub fn add_sdp(&mut self, sdp: Sdp) {
self.sdp = Some(sdp);
self.recalculate();
}

pub fn get_duration(&self) -> Duration {
self.last_time.checked_sub(self.first_time).unwrap()
}
Expand Down Expand Up @@ -161,6 +169,22 @@ impl Stream {
self.rtcp_packets.push(rtcp_info);
}

fn recalculate(&mut self) {
let mut rtp_packets = std::mem::take(&mut self.rtp_packets).into_iter();
let rtp_info = rtp_packets.next().unwrap();
self.bytes = rtp_info.bytes;
self.max_jitter = 0.0;
self.sum_jitter = 0.0;
self.jitter_count = 0;
self.first_sequence_number = rtp_info.packet.sequence_number;
self.last_sequence_number = rtp_info.packet.sequence_number;
self.first_time = rtp_info.time;
self.last_time = rtp_info.time;
self.rtp_packets = vec![rtp_info];

rtp_packets.for_each(|rtp| self.update_rtp_parameters(rtp));
}

fn update_rtp_parameters(&mut self, mut rtp_info: RtpInfo) {
rtp_info.time_delta = rtp_info.time - self.rtp_packets.last().unwrap().time;

Expand All @@ -184,8 +208,22 @@ impl Stream {
// TODO
}

fn get_payload_type(&self, rtp_info: &RtpInfo) -> PayloadType {
let id = &rtp_info.packet.payload_type.id;

if let Some(sdp) = &self.sdp {
if let Some(pt) = sdp.payload_types.get(id) {
return pt.clone();
}
};

rtp_info.packet.payload_type.clone()
}

fn update_jitter(&mut self, rtp_info: &mut RtpInfo) {
let Some(clock_rate) = rtp_info.packet.payload_type.clock_rate else {
let payload_type = self.get_payload_type(rtp_info);

let Some(clock_rate) = payload_type.clock_rate else {
return;
};

Expand Down
Loading

0 comments on commit b3fe562

Please sign in to comment.