Skip to content

Commit

Permalink
Fix v5::Subscribe/Unsubscribe packet properties encoding (#154)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 4, 2023
1 parent 37aa52c commit a8c37e6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.12.7] - 2023-11-04

* Fix v5::Subscribe/Unsubscribe packet properties encoding

## [0.12.6] - 2023-10-31

* Send server ConnectAck without io flushing
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.12.6"
version = "0.12.7"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
56 changes: 56 additions & 0 deletions src/v5/codec/packet/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl EncodeLtd for Subscribe {
fn encode(&self, buf: &mut BytesMut, _: u32) -> Result<(), EncodeError> {
self.packet_id.encode(buf)?;

// encode properties
let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize))
+ self.user_properties.encoded_size() as u32; // safe: size was already checked against maximum
utils::write_variable_length(prop_len, buf);
Expand All @@ -208,6 +209,9 @@ impl EncodeLtd for Subscribe {
write_variable_length(id.get(), buf);
}

self.user_properties.encode(buf)?;

// payload
for (filter, opts) in self.topic_filters.iter() {
filter.encode(buf)?;
opts.encode(buf)?;
Expand Down Expand Up @@ -282,8 +286,13 @@ impl EncodeLtd for Unsubscribe {

fn encode(&self, buf: &mut BytesMut, _size: u32) -> Result<(), EncodeError> {
self.packet_id.encode(buf)?;

// properties
let prop_len = self.user_properties.encoded_size();
utils::write_variable_length(prop_len as u32, buf); // safe: max size check is done already
self.user_properties.encode(buf)?;

// payload
for filter in self.topic_filters.iter() {
filter.encode(buf)?;
}
Expand Down Expand Up @@ -317,8 +326,55 @@ impl EncodeLtd for UnsubscribeAck {

#[cfg(test)]
mod tests {
use ntex::codec::{Decoder, Encoder};

use super::super::super::{Codec, Packet};
use super::*;

#[test]
fn test_sub() {
let pkt = Subscribe {
packet_id: 12.try_into().unwrap(),
id: Some(10.try_into().unwrap()),
user_properties: vec![("a".into(), "1".into())],
topic_filters: vec![("test".into(), SubscriptionOptions::default())],
};

let size = pkt.encoded_size(99999);
let mut buf = BytesMut::with_capacity(size);
pkt.encode(&mut buf, size as u32).unwrap();
assert_eq!(buf.len(), size);
assert_eq!(pkt, Subscribe::decode(&mut buf.freeze()).unwrap());

let pkt = Unsubscribe {
packet_id: 12.try_into().unwrap(),
user_properties: vec![("a".into(), "1".into())],
topic_filters: vec!["test".into()],
};

let size = pkt.encoded_size(99999);
let mut buf = BytesMut::with_capacity(size);
pkt.encode(&mut buf, size as u32).unwrap();
assert_eq!(buf.len(), size);
assert_eq!(pkt, Unsubscribe::decode(&mut buf.freeze()).unwrap());
}

#[test]
fn test_sub_pkt() {
let pkt = Packet::Subscribe(Subscribe {
packet_id: 12.try_into().unwrap(),
id: None,
user_properties: vec![("a".into(), "1".into())],
topic_filters: vec![("test".into(), SubscriptionOptions::default())],
});
let codec = Codec::new();

let mut buf = BytesMut::new();
codec.encode(pkt.clone(), &mut buf).unwrap();

assert_eq!(pkt, codec.decode(&mut buf).unwrap().unwrap().0);
}

#[test]
fn test_sub_ack() {
let ack = SubscribeAck {
Expand Down

0 comments on commit a8c37e6

Please sign in to comment.