From ceb70ad0c0147bf44ffacf52eaa4db7f66351afa Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 10 Nov 2023 22:55:17 +0600 Subject: [PATCH] Use new ntex-io apis --- CHANGES.md | 4 ++ Cargo.toml | 11 +-- src/error.rs | 3 + src/io.rs | 137 ++++++++++++++++++++++-------------- src/service.rs | 25 +++---- src/v3/client/connection.rs | 36 ++++------ src/v3/client/connector.rs | 19 ++--- src/v3/client/dispatcher.rs | 5 ++ src/v3/dispatcher.rs | 5 ++ src/v3/server.rs | 30 ++++---- src/v5/client/connection.rs | 43 ++++------- src/v5/client/connector.rs | 25 +++---- src/v5/client/dispatcher.rs | 5 ++ src/v5/dispatcher.rs | 5 ++ src/v5/server.rs | 36 +++++----- 15 files changed, 208 insertions(+), 181 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 9011995..b924abf 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.12.8] - 2023-11-11 + +* Use new ntex-io apis + ## [0.12.7] - 2023-11-04 * Fix v5::Subscribe/Unsubscribe packet properties encoding diff --git a/Cargo.toml b/Cargo.toml index 401e1cb..0ad880e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.12.7" +version = "0.12.8" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" @@ -11,9 +11,12 @@ license = "MIT" exclude = [".gitignore", ".travis.yml", ".cargo/config"] edition = "2021" +[package.metadata.docs.rs] +features = ["ntex/tokio"] + [dependencies] -ntex = "0.7.4" -ntex-util = "0.3.2" +ntex = "0.7.9" +ntex-bytes = "0.1.21" bitflags = "2.4" log = "0.4" pin-project-lite = "0.2" @@ -28,4 +31,4 @@ rustls = "0.21" rustls-pemfile = "1.0" openssl = "0.10" test-case = "3.2" -ntex = { version = "0.7.4", features = ["tokio", "rustls", "openssl"] } +ntex = { version = "0.7", features = ["tokio", "rustls", "openssl"] } diff --git a/src/error.rs b/src/error.rs index cddc0d2..44fc4a9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -47,6 +47,9 @@ pub enum ProtocolError { /// Keep alive timeout #[error("Keep Alive timeout")] KeepAliveTimeout, + /// Read frame timeout + #[error("Read frame timeout")] + ReadTimeout, } #[derive(Debug, thiserror::Error)] diff --git a/src/io.rs b/src/io.rs index 575bf2f..1d0be17 100644 --- a/src/io.rs +++ b/src/io.rs @@ -3,13 +3,17 @@ use std::task::{Context, Poll}; use std::{cell::RefCell, collections::VecDeque, future::Future, pin::Pin, rc::Rc, time}; use ntex::codec::{Decoder, Encoder}; -use ntex::io::{DispatchItem, IoBoxed, IoRef, IoStatusUpdate, RecvError}; +use ntex::io::{ + Decoded, DispatchItem, DispatcherConfig, IoBoxed, IoRef, IoStatusUpdate, RecvError, +}; use ntex::service::{IntoService, Pipeline, PipelineCall, Service}; -use ntex::time::Seconds; +use ntex::time::{now, Seconds}; use ntex::util::{ready, Pool}; type Response = ::Item; +const ONE_SEC: time::Duration = time::Duration::from_secs(1); + pin_project_lite::pin_project! { /// Dispatcher for mqtt protocol pub(crate) struct Dispatcher @@ -17,8 +21,8 @@ pin_project_lite::pin_project! { S: Service, Response = Option>>, S: 'static, U: Encoder, - U: Decoder, - U: 'static, + U: Decoder, + U: 'static, { codec: U, service: Pipeline, @@ -31,9 +35,11 @@ pin_project_lite::pin_project! { } bitflags::bitflags! { + #[derive(Copy, Clone, Eq, PartialEq)] struct Flags: u8 { - const READY_ERR = 0b0001; - const IO_ERR = 0b0010; + const READY_ERR = 0b0001; + const IO_ERR = 0b0010; + const READ_TIMEOUT = 0b0100; } } @@ -42,6 +48,9 @@ struct DispatcherInner>, U: Encoder + Decoder> { flags: Flags, st: IoDispatcherState, state: Rc>>, + config: DispatcherConfig, + read_bytes: u32, + read_max_timeout: time::Instant, keepalive_timeout: time::Duration, } @@ -102,11 +111,11 @@ where io: IoBoxed, codec: U, service: F, + config: &DispatcherConfig, ) -> Self { - let keepalive_timeout = Seconds(30).into(); - // register keepalive timer - io.start_keepalive_timer(keepalive_timeout); + io.start_timer(Seconds(30).into()); + io.set_disconnect_timeout(config.disconnect_timeout()); let state = Rc::new(RefCell::new(DispatcherState { error: None, @@ -124,9 +133,12 @@ where inner: DispatcherInner { io, state, - keepalive_timeout, + config: config.clone(), flags: Flags::empty(), st: IoDispatcherState::Processing, + read_bytes: 0, + read_max_timeout: now(), + keepalive_timeout: time::Duration::from_secs(30), }, } } @@ -137,44 +149,12 @@ where /// /// By default keep-alive timeout is set to 30 seconds. pub(crate) fn keepalive_timeout(mut self, timeout: Seconds) -> Self { - let timeout = timeout.into(); - self.inner.io.start_keepalive_timer(timeout); - self.inner.keepalive_timeout = timeout; - self - } - - /// Set connection disconnect timeout. - /// - /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete - /// within this time, the connection get dropped. - /// - /// To disable timeout set value to 0. - /// - /// By default disconnect timeout is set to 1 seconds. - pub(crate) fn disconnect_timeout(self, val: Seconds) -> Self { - self.inner.io.set_disconnect_timeout(val.into()); + self.inner.io.start_timer(timeout.into()); + self.inner.keepalive_timeout = timeout.into(); self } } -impl DispatcherInner -where - S: Service, Response = Option>> + 'static, - U: Decoder + Encoder + Clone + 'static, - ::Item: 'static, -{ - fn update_keepalive(&self) { - // update keep-alive timer - self.io.start_keepalive_timer(self.keepalive_timeout); - } - - fn unregister_keepalive(&mut self) { - // unregister keep-alive timer - self.io.stop_keepalive_timer(); - self.keepalive_timeout = time::Duration::ZERO; - } -} - impl DispatcherState where S: Service, Response = Option>> + 'static, @@ -275,11 +255,15 @@ where let item = match ready!(inner.poll_service(this.service, cx)) { PollService::Ready => { // decode incoming bytes stream - match ready!(inner.io.poll_recv(this.codec, cx)) { - Ok(el) => { + match inner.io.poll_recv_decode(this.codec, cx) { + Ok(decoded) => { // update keep-alive timer - inner.update_keepalive(); - Some(DispatchItem::Item(el)) + inner.update_timer(&decoded); + if let Some(el) = decoded.item { + Some(DispatchItem::Item(el)) + } else { + return Poll::Pending; + } } Err(RecvError::Stop) => { log::trace!("dispatcher is instructed to stop"); @@ -377,7 +361,7 @@ where } // drain service responses and shutdown io IoDispatcherState::Stop => { - inner.unregister_keepalive(); + inner.io.stop_timer(); // service may relay on poll_ready for response results if !inner.flags.contains(Flags::READY_ERR) { @@ -502,6 +486,52 @@ where } } } + + fn update_timer(&mut self, decoded: &Decoded<::Item>) { + // we got parsed frame + if decoded.item.is_some() { + // remove all timers + if self.flags.contains(Flags::READ_TIMEOUT) { + self.flags.remove(Flags::READ_TIMEOUT); + } + self.io.stop_timer(); + } else if self.flags.contains(Flags::READ_TIMEOUT) { + // update read timer + if let Some((_, max, rate)) = self.config.frame_read_rate() { + let bytes = decoded.remains as u32; + + let delta = (bytes - self.read_bytes).try_into().unwrap_or(u16::MAX); + + if delta >= rate { + let n = now(); + let next = self.io.timer_deadline() + ONE_SEC; + let new_timeout = if n >= next { ONE_SEC } else { next - n }; + + // max timeout + if max.is_zero() || (n + new_timeout) <= self.read_max_timeout { + self.read_bytes = bytes; + self.io.stop_timer(); + self.io.start_timer(new_timeout); + } + } + } + } else { + // no new data then start keep-alive timer + if decoded.remains == 0 { + self.io.start_timer(self.keepalive_timeout); + } else if let Some((period, max, _)) = self.config.frame_read_rate() { + // we got new data but not enough to parse single frame + // start read timer + self.flags.insert(Flags::READ_TIMEOUT); + + self.io.start_timer(period); + self.read_bytes = decoded.remains as u32; + if !max.is_zero() { + self.read_max_timeout = now() + max; + } + } + } + } } #[cfg(test)] @@ -529,9 +559,11 @@ mod tests { service: F, ) -> (Self, nio::IoRef) { let keepalive_timeout = Seconds(30).into(); - io.start_keepalive_timer(keepalive_timeout); + io.start_timer(keepalive_timeout); let rio = io.get_ref(); + let config = DispatcherConfig::default(); + let state = Rc::new(RefCell::new(DispatcherState { error: None, base: 0, @@ -547,10 +579,13 @@ mod tests { pool: io.memory_pool().pool(), inner: DispatcherInner { state, + config, keepalive_timeout, io: IoBoxed::from(io), st: IoDispatcherState::Processing, flags: Flags::empty(), + read_bytes: 0, + read_max_timeout: now(), }, }, rio, @@ -652,7 +687,7 @@ mod tests { }), ); ntex::rt::spawn(async move { - let _ = disp.disconnect_timeout(Seconds(1)).await; + let _ = disp.await; }); let buf = client.read().await.unwrap(); diff --git a/src/service.rs b/src/service.rs index 21a2aba..71791d6 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,7 +1,7 @@ use std::{fmt, marker::PhantomData, rc::Rc}; use ntex::codec::{Decoder, Encoder}; -use ntex::io::{DispatchItem, Filter, Io, IoBoxed}; +use ntex::io::{DispatchItem, DispatcherConfig, Filter, Io, IoBoxed}; use ntex::service::{Service, ServiceCtx, ServiceFactory}; use ntex::time::{Deadline, Seconds}; use ntex::util::{select, BoxFuture, Either}; @@ -13,13 +13,13 @@ type ResponseItem = Option<::Item>; pub struct MqttServer { connect: C, handler: Rc, - disconnect_timeout: Seconds, + config: DispatcherConfig, _t: PhantomData<(St, Codec)>, } impl MqttServer { - pub(crate) fn new(connect: C, service: T, disconnect_timeout: Seconds) -> Self { - MqttServer { connect, disconnect_timeout, handler: Rc::new(service), _t: PhantomData } + pub(crate) fn new(connect: C, service: T, config: DispatcherConfig) -> Self { + MqttServer { connect, config, handler: Rc::new(service), _t: PhantomData } } } @@ -32,8 +32,8 @@ where ) -> Result, C::InitError> { // create connect service and then create service impl Ok(MqttHandler { + config: self.config.clone(), handler: self.handler.clone(), - disconnect_timeout: self.disconnect_timeout, connect: self.connect.create(()).await?, _t: PhantomData, }) @@ -119,7 +119,7 @@ where pub struct MqttHandler { connect: C, handler: Rc, - disconnect_timeout: Seconds, + config: DispatcherConfig, _t: PhantomData<(St, Codec)>, } @@ -147,7 +147,6 @@ where #[inline] fn call<'a>(&'a self, req: IoBoxed, ctx: ServiceCtx<'a, Self>) -> Self::Future<'a> { Box::pin(async move { - let timeout = self.disconnect_timeout; let handshake = ctx.call(&self.connect, req).await; let (io, codec, session, keepalive) = handshake.map_err(|e| { @@ -159,10 +158,7 @@ where let handler = self.handler.create(session).await?; log::trace!("Connection handler is created, starting dispatcher"); - Dispatcher::new(io, codec, handler) - .keepalive_timeout(keepalive) - .disconnect_timeout(timeout) - .await + Dispatcher::new(io, codec, handler, &self.config).keepalive_timeout(keepalive).await }) } } @@ -223,8 +219,6 @@ where ctx: ServiceCtx<'a, Self>, ) -> Self::Future<'a> { Box::pin(async move { - let timeout = self.disconnect_timeout; - let (io, codec, ka, handler) = { let res = select( delay, @@ -253,10 +247,7 @@ where } }; - Dispatcher::new(io, codec, handler) - .keepalive_timeout(ka) - .disconnect_timeout(timeout) - .await + Dispatcher::new(io, codec, handler, &self.config).keepalive_timeout(ka).await }) } } diff --git a/src/v3/client/connection.rs b/src/v3/client/connection.rs index 10b18da..ed8c568 100644 --- a/src/v3/client/connection.rs +++ b/src/v3/client/connection.rs @@ -1,6 +1,6 @@ use std::{fmt, marker::PhantomData, rc::Rc}; -use ntex::io::IoBoxed; +use ntex::io::{DispatcherConfig, IoBoxed}; use ntex::router::{IntoPattern, Router, RouterBuilder}; use ntex::service::{boxed, into_service, IntoService, Pipeline, Service}; use ntex::time::{sleep, Millis, Seconds}; @@ -17,18 +17,18 @@ pub struct Client { io: IoBoxed, shared: Rc, keepalive: Seconds, - disconnect_timeout: Seconds, session_present: bool, max_receive: usize, + config: DispatcherConfig, } impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("v3::Client") .field("keepalive", &self.keepalive) - .field("disconnect_timeout", &self.disconnect_timeout) .field("session_present", &self.session_present) .field("max_receive", &self.max_receive) + .field("config", &self.config) .finish() } } @@ -40,15 +40,15 @@ impl Client { shared: Rc, session_present: bool, keepalive_timeout: Seconds, - disconnect_timeout: Seconds, max_receive: usize, + config: DispatcherConfig, ) -> Self { Client { io, shared, session_present, - disconnect_timeout, max_receive, + config, keepalive: keepalive_timeout, } } @@ -84,7 +84,7 @@ impl Client { io: self.io, shared: self.shared, keepalive: self.keepalive, - disconnect_timeout: self.disconnect_timeout, + config: self.config, max_receive: self.max_receive, _t: PhantomData, } @@ -105,10 +105,7 @@ impl Client { into_service(|msg: ControlMessage<()>| Ready::<_, ()>::Ok(msg.disconnect())), ); - let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await; + let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await; } /// Run client with provided control messages handler @@ -129,10 +126,7 @@ impl Client { service.into_service(), ); - Dispatcher::new(self.io, self.shared.clone(), dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await + Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await } /// Get negotiated io stream and codec @@ -150,8 +144,8 @@ pub struct ClientRouter { io: IoBoxed, shared: Rc, keepalive: Seconds, - disconnect_timeout: Seconds, max_receive: usize, + config: DispatcherConfig, _t: PhantomData, } @@ -159,8 +153,8 @@ impl fmt::Debug for ClientRouter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("v3::ClientRouter") .field("keepalive", &self.keepalive) - .field("disconnect_timeout", &self.disconnect_timeout) .field("max_receive", &self.max_receive) + .field("config", &self.config) .finish() } } @@ -195,10 +189,7 @@ where into_service(|msg: ControlMessage| Ready::<_, Err>::Ok(msg.disconnect())), ); - let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await; + let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await; } /// Run client and handle control messages @@ -218,10 +209,7 @@ where service.into_service(), ); - Dispatcher::new(self.io, self.shared.clone(), dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await + Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await } } diff --git a/src/v3/client/connector.rs b/src/v3/client/connector.rs index 0ea62c1..8fa63fd 100644 --- a/src/v3/client/connector.rs +++ b/src/v3/client/connector.rs @@ -1,7 +1,7 @@ use std::rc::Rc; use ntex::connect::{self, Address, Connect, Connector}; -use ntex::io::IoBoxed; +use ntex::io::{DispatcherConfig, IoBoxed}; use ntex::service::{IntoService, Pipeline, Service}; use ntex::time::{timeout_checked, Seconds}; use ntex::util::{ByteString, Bytes, PoolId}; @@ -18,7 +18,7 @@ pub struct MqttConnector { max_receive: usize, max_packet_size: u32, handshake_timeout: Seconds, - disconnect_timeout: Seconds, + config: DispatcherConfig, pool: Rc, } @@ -29,15 +29,18 @@ where #[allow(clippy::new_ret_no_self)] /// Create new mqtt connector pub fn new(address: A) -> MqttConnector> { + let config = DispatcherConfig::default(); + config.set_disconnect_timeout(Seconds(3)).set_keepalive_timeout(Seconds(0)); + MqttConnector { address, + config, pkt: codec::Connect::default(), connector: Pipeline::new(Connector::default()), max_send: 16, max_receive: 16, max_packet_size: 64 * 1024, handshake_timeout: Seconds::ZERO, - disconnect_timeout: Seconds(3), pool: Rc::new(MqttSinkPool::default()), } } @@ -155,8 +158,8 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self { - self.disconnect_timeout = timeout; + pub fn disconnect_timeout(self, timeout: Seconds) -> Self { + self.config.set_disconnect_timeout(timeout); self } @@ -180,11 +183,11 @@ where connector: Pipeline::new(connector.into_service()), pkt: self.pkt, address: self.address, + config: self.config, max_send: self.max_send, max_receive: self.max_receive, max_packet_size: self.max_packet_size, handshake_timeout: self.handshake_timeout, - disconnect_timeout: self.disconnect_timeout, pool: self.pool, } } @@ -210,7 +213,7 @@ where let max_send = self.max_send; let max_receive = self.max_receive; let keepalive_timeout = pkt.keep_alive; - let disconnect_timeout = self.disconnect_timeout; + let config = self.config.clone(); let pool = self.pool.clone(); let codec = codec::Codec::new(); codec.set_max_size(self.max_packet_size); @@ -234,8 +237,8 @@ where shared, pkt.session_present, Seconds(keepalive_timeout), - disconnect_timeout, max_receive, + config, )) } else { Err(ClientError::Ack(pkt)) diff --git a/src/v3/client/dispatcher.rs b/src/v3/client/dispatcher.rs index e3476c5..22cf868 100644 --- a/src/v3/client/dispatcher.rs +++ b/src/v3/client/dispatcher.rs @@ -201,6 +201,11 @@ where ctx, ))) } + DispatchItem::ReadTimeout => Either::Right(Either::Right(ControlResponse::new( + ControlMessage::proto_error(ProtocolError::ReadTimeout), + &self.inner, + ctx, + ))), DispatchItem::WBackPressureEnabled => { self.inner.sink.enable_wr_backpressure(); Either::Right(Either::Left(Ready::Ok(None))) diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index 7633369..57122b1 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -324,6 +324,11 @@ where ctx, ))) } + DispatchItem::ReadTimeout => Either::Right(Either::Right(ControlResponse::new( + ControlMessage::proto_error(ProtocolError::ReadTimeout), + &self.inner, + ctx, + ))), DispatchItem::DecoderError(err) => { Either::Right(Either::Right(ControlResponse::new( ControlMessage::proto_error(ProtocolError::Decode(err)), diff --git a/src/v3/server.rs b/src/v3/server.rs index 2de052b..1b686b2 100644 --- a/src/v3/server.rs +++ b/src/v3/server.rs @@ -1,6 +1,6 @@ use std::{fmt, future::Future, marker::PhantomData, rc::Rc}; -use ntex::io::{DispatchItem, IoBoxed}; +use ntex::io::{DispatchItem, DispatcherConfig, IoBoxed}; use ntex::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; use ntex::time::{timeout_checked, Millis, Seconds}; use ntex::util::{BoxFuture, Either}; @@ -50,7 +50,7 @@ pub struct MqttServer { max_inflight: u16, max_inflight_size: usize, connect_timeout: Seconds, - disconnect_timeout: Seconds, + config: DispatcherConfig, pub(super) pool: Rc, _t: PhantomData, } @@ -67,7 +67,11 @@ where where F: IntoServiceFactory, { + let config = DispatcherConfig::default(); + config.set_disconnect_timeout(Seconds(3)); + MqttServer { + config, handshake: handshake.into_factory(), control: DefaultControlService::default(), publish: DefaultPublishService::default(), @@ -76,7 +80,6 @@ where max_inflight: 16, max_inflight_size: 65535, connect_timeout: Seconds::ZERO, - disconnect_timeout: Seconds(3), pool: Default::default(), _t: PhantomData, } @@ -128,8 +131,8 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, val: Seconds) -> Self { - self.disconnect_timeout = val; + pub fn disconnect_timeout(self, val: Seconds) -> Self { + self.config.set_disconnect_timeout(val); self } @@ -182,12 +185,12 @@ where handshake: self.handshake, publish: self.publish, control: service.into_factory(), + config: self.config, max_qos: self.max_qos, max_size: self.max_size, max_inflight: self.max_inflight, max_inflight_size: self.max_inflight_size, connect_timeout: self.connect_timeout, - disconnect_timeout: self.disconnect_timeout, pool: self.pool, _t: PhantomData, } @@ -204,12 +207,12 @@ where handshake: self.handshake, publish: publish.into_factory(), control: self.control, + config: self.config, max_qos: self.max_qos, max_size: self.max_size, max_inflight: self.max_inflight, max_inflight_size: self.max_inflight_size, connect_timeout: self.connect_timeout, - disconnect_timeout: self.disconnect_timeout, pool: self.pool, _t: PhantomData, } @@ -250,7 +253,7 @@ where self.max_inflight_size, self.max_qos, ), - self.disconnect_timeout, + self.config, ) } @@ -279,7 +282,7 @@ where self.max_qos, )), max_size: self.max_size, - disconnect_timeout: self.disconnect_timeout, + config: self.config, _t: PhantomData, } } @@ -417,8 +420,8 @@ where pub(crate) struct ServerSelector { handshake: H, handler: Rc, - disconnect_timeout: Seconds, check: Rc, + config: DispatcherConfig, max_size: u32, _t: PhantomData<(St, R)>, } @@ -449,8 +452,8 @@ where Box::pin(async move { Ok(ServerSelectorImpl { handler: self.handler.clone(), - disconnect_timeout: self.disconnect_timeout, check: self.check.clone(), + config: self.config.clone(), max_size: self.max_size, handshake: self.handshake.create(()).await?, _t: PhantomData, @@ -463,8 +466,8 @@ pub(crate) struct ServerSelectorImpl { check: Rc, handshake: H, handler: Rc, - disconnect_timeout: Seconds, max_size: u32, + config: DispatcherConfig, _t: PhantomData<(St, R)>, } @@ -524,9 +527,8 @@ where let handler = self.handler.create(session).await?; log::trace!("Connection handler is created, starting dispatcher"); - Dispatcher::new(ack.io, ack.shared, handler) + Dispatcher::new(ack.io, ack.shared, handler, &self.config) .keepalive_timeout(ack.keepalive) - .disconnect_timeout(self.disconnect_timeout) .await?; Ok(Either::Right(())) } diff --git a/src/v5/client/connection.rs b/src/v5/client/connection.rs index 63bb28d..d0515b9 100644 --- a/src/v5/client/connection.rs +++ b/src/v5/client/connection.rs @@ -1,6 +1,6 @@ use std::{cell::RefCell, convert::TryFrom, fmt, marker, num::NonZeroU16, rc::Rc}; -use ntex::io::IoBoxed; +use ntex::io::{DispatcherConfig, IoBoxed}; use ntex::router::{IntoPattern, Path, Router, RouterBuilder}; use ntex::service::{boxed, into_service, IntoService, Pipeline, Service}; use ntex::time::{sleep, Millis, Seconds}; @@ -19,8 +19,8 @@ pub struct Client { io: IoBoxed, shared: Rc, keepalive: Seconds, - disconnect_timeout: Seconds, max_receive: usize, + config: DispatcherConfig, pkt: Box, } @@ -28,9 +28,9 @@ impl fmt::Debug for Client { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("v5::Client") .field("keepalive", &self.keepalive) - .field("disconnect_timeout", &self.disconnect_timeout) .field("max_receive", &self.max_receive) .field("connect", &self.pkt) + .field("config", &self.config) .finish() } } @@ -43,16 +43,9 @@ impl Client { pkt: Box, max_receive: u16, keepalive: Seconds, - disconnect_timeout: Seconds, + config: DispatcherConfig, ) -> Self { - Client { - io, - pkt, - shared, - keepalive, - disconnect_timeout, - max_receive: max_receive as usize, - } + Client { io, pkt, shared, keepalive, config, max_receive: max_receive as usize } } } @@ -100,7 +93,7 @@ impl Client { io: self.io, shared: self.shared, keepalive: self.keepalive, - disconnect_timeout: self.disconnect_timeout, + config: self.config, max_receive: self.max_receive, _t: marker::PhantomData, } @@ -124,10 +117,7 @@ impl Client { }), ); - let _ = Dispatcher::new(self.io, self.shared, dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await; + let _ = Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await; } /// Run client with provided control messages handler @@ -149,10 +139,7 @@ impl Client { service.into_service(), ); - Dispatcher::new(self.io, self.shared, dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await + Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await } /// Get negotiated io stream and codec @@ -170,7 +157,7 @@ pub struct ClientRouter { handlers: Vec>>, shared: Rc, keepalive: Seconds, - disconnect_timeout: Seconds, + config: DispatcherConfig, max_receive: usize, _t: marker::PhantomData, } @@ -179,7 +166,7 @@ impl fmt::Debug for ClientRouter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("v5::ClientRouter") .field("keepalive", &self.keepalive) - .field("disconnect_timeout", &self.disconnect_timeout) + .field("config", &self.config) .field("max_receive", &self.max_receive) .finish() } @@ -219,10 +206,7 @@ where }), ); - let _ = Dispatcher::new(self.io, self.shared, dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await; + let _ = Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await; } /// Run client and handle control messages @@ -243,10 +227,7 @@ where service.into_service(), ); - Dispatcher::new(self.io, self.shared, dispatcher) - .keepalive_timeout(Seconds::ZERO) - .disconnect_timeout(self.disconnect_timeout) - .await + Dispatcher::new(self.io, self.shared, dispatcher, &self.config).await } /// Get negotiated io stream and codec diff --git a/src/v5/client/connector.rs b/src/v5/client/connector.rs index 3bf92f2..7b8a400 100644 --- a/src/v5/client/connector.rs +++ b/src/v5/client/connector.rs @@ -1,7 +1,7 @@ use std::{num::NonZeroU16, num::NonZeroU32, rc::Rc}; use ntex::connect::{self, Address, Connect, Connector}; -use ntex::io::IoBoxed; +use ntex::io::{DispatcherConfig, IoBoxed}; use ntex::service::{IntoService, Pipeline, Service}; use ntex::time::{timeout_checked, Seconds}; use ntex::util::{ByteString, Bytes, PoolId}; @@ -15,7 +15,7 @@ pub struct MqttConnector { connector: Pipeline, pkt: codec::Connect, handshake_timeout: Seconds, - disconnect_timeout: Seconds, + config: DispatcherConfig, pool: Rc, } @@ -26,12 +26,14 @@ where #[allow(clippy::new_ret_no_self)] /// Create new mqtt connector pub fn new(address: A) -> MqttConnector> { + let config = DispatcherConfig::default(); + config.set_disconnect_timeout(Seconds(3)).set_keepalive_timeout(Seconds(0)); MqttConnector { address, + config, pkt: codec::Connect::default(), connector: Pipeline::new(Connector::default()), handshake_timeout: Seconds::ZERO, - disconnect_timeout: Seconds(3), pool: Rc::new(MqttSinkPool::default()), } } @@ -162,8 +164,8 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self { - self.disconnect_timeout = timeout; + pub fn disconnect_timeout(self, timeout: Seconds) -> Self { + self.config.set_disconnect_timeout(timeout); self } @@ -187,8 +189,8 @@ where connector: Pipeline::new(connector.into_service()), pkt: self.pkt, address: self.address, + config: self.config, handshake_timeout: self.handshake_timeout, - disconnect_timeout: self.disconnect_timeout, pool: self.pool, } } @@ -214,10 +216,10 @@ where let keep_alive = pkt.keep_alive; let max_packet_size = pkt.max_packet_size.map(|v| v.get()).unwrap_or(0); let max_receive = pkt.receive_max.map(|v| v.get()).unwrap_or(65535); - let disconnect_timeout = self.disconnect_timeout; let codec = codec::Codec::new(); codec.set_max_inbound_size(max_packet_size); let pool = self.pool.clone(); + let config = self.config.clone(); io.encode(codec::Packet::Connect(Box::new(pkt)), &codec)?; @@ -240,14 +242,7 @@ where shared.set_cap(pkt.receive_max.get() as usize); - Ok(Client::new( - io, - shared, - pkt, - max_receive, - Seconds(keep_alive), - disconnect_timeout, - )) + Ok(Client::new(io, shared, pkt, max_receive, Seconds(keep_alive), config)) } else { Err(ClientError::Ack(pkt)) } diff --git a/src/v5/client/dispatcher.rs b/src/v5/client/dispatcher.rs index 5adccac..8a1f809 100644 --- a/src/v5/client/dispatcher.rs +++ b/src/v5/client/dispatcher.rs @@ -332,6 +332,11 @@ where ctx, ))) } + DispatchItem::ReadTimeout => Either::Right(Either::Right(ControlResponse::new( + ControlMessage::proto_error(ProtocolError::ReadTimeout), + &self.inner, + ctx, + ))), DispatchItem::WBackPressureEnabled => { self.inner.sink.enable_wr_backpressure(); Either::Right(Either::Left(Ready::Ok(None))) diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index 85c3f22..7a74e68 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -446,6 +446,11 @@ where ctx, ))) } + DispatchItem::ReadTimeout => Either::Right(Either::Right(ControlResponse::new( + ControlMessage::proto_error(ProtocolError::ReadTimeout), + &self.inner, + ctx, + ))), DispatchItem::DecoderError(err) => { Either::Right(Either::Right(ControlResponse::new( ControlMessage::proto_error(ProtocolError::Decode(err)), diff --git a/src/v5/server.rs b/src/v5/server.rs index a67a6e2..832bdc1 100644 --- a/src/v5/server.rs +++ b/src/v5/server.rs @@ -1,6 +1,6 @@ use std::{convert::TryFrom, fmt, future::Future, marker::PhantomData, rc::Rc}; -use ntex::io::{DispatchItem, IoBoxed}; +use ntex::io::{DispatchItem, DispatcherConfig, IoBoxed}; use ntex::service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory}; use ntex::time::{timeout_checked, Millis, Seconds}; use ntex::util::{BoxFuture, Either}; @@ -24,9 +24,9 @@ pub struct MqttServer { max_receive: u16, max_qos: QoS, max_inflight_size: usize, - connect_timeout: Seconds, - disconnect_timeout: Seconds, max_topic_alias: u16, + connect_timeout: Seconds, + config: DispatcherConfig, pub(super) pool: Rc, _t: PhantomData, } @@ -42,7 +42,11 @@ where where F: IntoServiceFactory, { + let config = DispatcherConfig::default(); + config.set_disconnect_timeout(Seconds(3)); + MqttServer { + config, handshake: handshake.into_factory(), srv_control: DefaultControlService::default(), srv_publish: DefaultPublishService::default(), @@ -50,9 +54,8 @@ where max_receive: 15, max_qos: QoS::AtLeastOnce, max_inflight_size: 65535, - connect_timeout: Seconds::ZERO, - disconnect_timeout: Seconds(3), max_topic_alias: 32, + connect_timeout: Seconds::ZERO, pool: Rc::new(MqttSinkPool::default()), _t: PhantomData, } @@ -103,8 +106,8 @@ where /// To disable timeout set value to 0. /// /// By default disconnect timeout is set to 3 seconds. - pub fn disconnect_timeout(mut self, val: Seconds) -> Self { - self.disconnect_timeout = val; + pub fn disconnect_timeout(self, val: Seconds) -> Self { + self.config.set_disconnect_timeout(val); self } @@ -162,6 +165,7 @@ where C::Error: From + From, { MqttServer { + config: self.config, handshake: self.handshake, srv_publish: self.srv_publish, srv_control: service.into_factory(), @@ -171,7 +175,6 @@ where max_qos: self.max_qos, max_inflight_size: self.max_inflight_size, connect_timeout: self.connect_timeout, - disconnect_timeout: self.disconnect_timeout, pool: self.pool, _t: PhantomData, } @@ -187,6 +190,7 @@ where PublishAck: TryFrom, { MqttServer { + config: self.config, handshake: self.handshake, srv_publish: publish.into_factory(), srv_control: self.srv_control, @@ -196,7 +200,6 @@ where max_qos: self.max_qos, max_inflight_size: self.max_inflight_size, connect_timeout: self.connect_timeout, - disconnect_timeout: self.disconnect_timeout, pool: self.pool, _t: PhantomData, } @@ -250,7 +253,7 @@ where _t: PhantomData, }, factory(self.srv_publish, self.srv_control, self.max_inflight_size), - self.disconnect_timeout, + self.config, ) } @@ -280,7 +283,7 @@ where max_receive: self.max_receive, max_topic_alias: self.max_topic_alias, max_qos: self.max_qos, - disconnect_timeout: self.disconnect_timeout, + config: self.config, _t: PhantomData, } } @@ -464,8 +467,8 @@ pub(crate) struct ServerSelector { max_size: u32, max_receive: u16, max_qos: QoS, - disconnect_timeout: Seconds, max_topic_alias: u16, + config: DispatcherConfig, _t: PhantomData<(St, R)>, } @@ -494,22 +497,22 @@ where let fut = self.connect.create(()); let handler = self.handler.clone(); let check = self.check.clone(); + let config = self.config.clone(); let max_size = self.max_size; let max_receive = self.max_receive; let max_qos = self.max_qos; let max_topic_alias = self.max_topic_alias; - let disconnect_timeout = self.disconnect_timeout; // create connect service and then create service impl Box::pin(async move { Ok(ServerSelectorImpl { handler, check, + config, max_size, max_receive, max_qos, max_topic_alias, - disconnect_timeout, connect: fut.await?, _t: PhantomData, }) @@ -524,8 +527,8 @@ pub(crate) struct ServerSelectorImpl { max_size: u32, max_receive: u16, max_qos: QoS, - disconnect_timeout: Seconds, max_topic_alias: u16, + config: DispatcherConfig, _t: PhantomData<(St, R)>, } @@ -609,9 +612,8 @@ where let handler = self.handler.create(session).await?; log::trace!("Connection handler is created, starting dispatcher"); - Dispatcher::new(ack.io, shared, handler) + Dispatcher::new(ack.io, shared, handler, &self.config) .keepalive_timeout(Seconds(ack.keepalive)) - .disconnect_timeout(self.disconnect_timeout) .await?; Ok(Either::Right(())) }