From a8c37e6ac893f5979076b0900baee13d09894173 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Sat, 4 Nov 2023 23:39:11 +0600 Subject: [PATCH] Fix v5::Subscribe/Unsubscribe packet properties encoding (#154) --- CHANGES.md | 4 +++ Cargo.toml | 2 +- src/v5/codec/packet/subscribe.rs | 56 ++++++++++++++++++++++++++++++++ 3 files changed, 61 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index 17cc91a..9011995 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.12.7] - 2023-11-04 + +* Fix v5::Subscribe/Unsubscribe packet properties encoding + ## [0.12.6] - 2023-10-31 * Send server ConnectAck without io flushing diff --git a/Cargo.toml b/Cargo.toml index 33b4c42..401e1cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.12.6" +version = "0.12.7" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/v5/codec/packet/subscribe.rs b/src/v5/codec/packet/subscribe.rs index ee76817..1e692ec 100644 --- a/src/v5/codec/packet/subscribe.rs +++ b/src/v5/codec/packet/subscribe.rs @@ -199,6 +199,7 @@ impl EncodeLtd for Subscribe { fn encode(&self, buf: &mut BytesMut, _: u32) -> Result<(), EncodeError> { self.packet_id.encode(buf)?; + // encode properties let prop_len = self.id.map_or(0, |v| 1 + var_int_len(v.get() as usize)) + self.user_properties.encoded_size() as u32; // safe: size was already checked against maximum utils::write_variable_length(prop_len, buf); @@ -208,6 +209,9 @@ impl EncodeLtd for Subscribe { write_variable_length(id.get(), buf); } + self.user_properties.encode(buf)?; + + // payload for (filter, opts) in self.topic_filters.iter() { filter.encode(buf)?; opts.encode(buf)?; @@ -282,8 +286,13 @@ impl EncodeLtd for Unsubscribe { fn encode(&self, buf: &mut BytesMut, _size: u32) -> Result<(), EncodeError> { self.packet_id.encode(buf)?; + + // properties let prop_len = self.user_properties.encoded_size(); utils::write_variable_length(prop_len as u32, buf); // safe: max size check is done already + self.user_properties.encode(buf)?; + + // payload for filter in self.topic_filters.iter() { filter.encode(buf)?; } @@ -317,8 +326,55 @@ impl EncodeLtd for UnsubscribeAck { #[cfg(test)] mod tests { + use ntex::codec::{Decoder, Encoder}; + + use super::super::super::{Codec, Packet}; use super::*; + #[test] + fn test_sub() { + let pkt = Subscribe { + packet_id: 12.try_into().unwrap(), + id: Some(10.try_into().unwrap()), + user_properties: vec![("a".into(), "1".into())], + topic_filters: vec![("test".into(), SubscriptionOptions::default())], + }; + + let size = pkt.encoded_size(99999); + let mut buf = BytesMut::with_capacity(size); + pkt.encode(&mut buf, size as u32).unwrap(); + assert_eq!(buf.len(), size); + assert_eq!(pkt, Subscribe::decode(&mut buf.freeze()).unwrap()); + + let pkt = Unsubscribe { + packet_id: 12.try_into().unwrap(), + user_properties: vec![("a".into(), "1".into())], + topic_filters: vec!["test".into()], + }; + + let size = pkt.encoded_size(99999); + let mut buf = BytesMut::with_capacity(size); + pkt.encode(&mut buf, size as u32).unwrap(); + assert_eq!(buf.len(), size); + assert_eq!(pkt, Unsubscribe::decode(&mut buf.freeze()).unwrap()); + } + + #[test] + fn test_sub_pkt() { + let pkt = Packet::Subscribe(Subscribe { + packet_id: 12.try_into().unwrap(), + id: None, + user_properties: vec![("a".into(), "1".into())], + topic_filters: vec![("test".into(), SubscriptionOptions::default())], + }); + let codec = Codec::new(); + + let mut buf = BytesMut::new(); + codec.encode(pkt.clone(), &mut buf).unwrap(); + + assert_eq!(pkt, codec.decode(&mut buf).unwrap().unwrap().0); + } + #[test] fn test_sub_ack() { let ack = SubscribeAck {