Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor server config #173

Merged
merged 3 commits into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@
impl<V3, V5, Err, InitErr> MqttServer<V3, V5, Err, InitErr> {
/// Set client timeout reading protocol version.
///
/// Defines a timeout for reading `Connect` frame. If a client does not transmit
/// Defines a timeout for reading protocol version. If a client does not transmit
/// version of the protocol within this time, the connection is terminated with
/// Mqtt::Handshake(HandshakeError::Timeout) error.
///
/// By default, connect timeuot is 5 seconds.
pub fn connect_timeout(mut self, timeout: Seconds) -> Self {
/// By default, timeuot is 5 seconds.
pub fn protocol_version_timeout(mut self, timeout: Seconds) -> Self {

Check warning on line 59 in src/server.rs

View check run for this annotation

Codecov / codecov/patch

src/server.rs#L59

Added line #L59 was not covered by tests
self.connect_timeout = timeout.into();
self
}
Expand Down
21 changes: 10 additions & 11 deletions src/v3/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use super::sink::MqttSink;

const DEFAULT_KEEPALIVE: Seconds = Seconds(30);
const DEFAULT_OUTGOING_INFLIGHT: u16 = 16;

/// Connect message
pub struct Handshake {
Expand Down Expand Up @@ -67,7 +66,7 @@
keepalive,
session_present,
session: Some(st),
inflight: DEFAULT_OUTGOING_INFLIGHT,
outgoing: None,
return_code: mqtt::ConnectAckReason::ConnectionAccepted,
}
}
Expand All @@ -80,7 +79,7 @@
session: None,
session_present: false,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
outgoing: None,
return_code: mqtt::ConnectAckReason::IdentifierRejected,
}
}
Expand All @@ -92,8 +91,8 @@
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
return_code: mqtt::ConnectAckReason::BadUserNameOrPassword,
}
}
Expand All @@ -105,8 +104,8 @@
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
return_code: mqtt::ConnectAckReason::NotAuthorized,
}
}
Expand All @@ -118,8 +117,8 @@
shared: self.shared,
session: None,
session_present: false,
outgoing: None,
keepalive: DEFAULT_KEEPALIVE,
inflight: DEFAULT_OUTGOING_INFLIGHT,
return_code: mqtt::ConnectAckReason::ServiceUnavailable,
}
}
Expand All @@ -139,7 +138,7 @@
pub(crate) return_code: mqtt::ConnectAckReason,
pub(crate) shared: Rc<MqttShared>,
pub(crate) keepalive: Seconds,
pub(crate) inflight: u16,
pub(crate) outgoing: Option<u16>,
}

impl<St> HandshakeAck<St> {
Expand All @@ -151,11 +150,11 @@
self
}

/// Number of outgoing in-flight concurrent messages.
/// Number of outgoing concurrent messages.
///
/// By default in-flight is set to 16 messages
pub fn inflight(mut self, val: u16) -> Self {
self.inflight = val;
/// By default outgoing is set to 16 messages
pub fn max_outgoing(mut self, val: u16) -> Self {
self.outgoing = Some(val);

Check warning on line 157 in src/v3/handshake.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/handshake.rs#L156-L157

Added lines #L156 - L157 were not covered by tests
self
}
}
43 changes: 39 additions & 4 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
max_size: u32,
max_inflight: u16,
max_inflight_size: usize,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
handle_qos_after_disconnect: Option<QoS>,
connect_timeout: Seconds,
config: DispatcherConfig,
Expand Down Expand Up @@ -79,6 +81,8 @@
max_size: 0,
max_inflight: 16,
max_inflight_size: 65535,
max_outgoing: 16,
max_outgoing_size: (65535, 512),
handle_qos_after_disconnect: None,
connect_timeout: Seconds::ZERO,
pool: Default::default(),
Expand Down Expand Up @@ -154,19 +158,35 @@
/// Number of in-flight concurrent messages.
///
/// By default in-flight is set to 16 messages
pub fn inflight(mut self, val: u16) -> Self {
pub fn max_inflight(mut self, val: u16) -> Self {

Check warning on line 161 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L161

Added line #L161 was not covered by tests
self.max_inflight = val;
self
}

/// Total size of in-flight messages.
///
/// By default total in-flight size is set to 64Kb
pub fn inflight_size(mut self, val: usize) -> Self {
pub fn max_inflight_size(mut self, val: usize) -> Self {

Check warning on line 169 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L169

Added line #L169 was not covered by tests
self.max_inflight_size = val;
self
}

/// Number of outgoing concurrent messages.
///
/// By default outgoing is set to 16 messages
pub fn max_outgoing(mut self, val: u16) -> Self {
self.max_outgoing = val;
self
}

Check warning on line 180 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L177-L180

Added lines #L177 - L180 were not covered by tests

/// Total size of outgoing messages.
///
/// By default total outgoing size is set to 64Kb
pub fn max_outgoing_size(mut self, val: u32) -> Self {
self.max_outgoing_size = (val, val / 10);
self
}

Check warning on line 188 in src/v3/server.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/server.rs#L185-L188

Added lines #L185 - L188 were not covered by tests

/// Handle max received QoS messages after client disconnect.
///
/// By default, messages received before dispatched to the publish service will be dropped if
Expand Down Expand Up @@ -212,6 +232,8 @@
max_size: self.max_size,
max_inflight: self.max_inflight,
max_inflight_size: self.max_inflight_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
handle_qos_after_disconnect: self.handle_qos_after_disconnect,
connect_timeout: self.connect_timeout,
pool: self.pool,
Expand All @@ -235,8 +257,10 @@
max_size: self.max_size,
max_inflight: self.max_inflight,
max_inflight_size: self.max_inflight_size,
connect_timeout: self.connect_timeout,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
handle_qos_after_disconnect: self.handle_qos_after_disconnect,
connect_timeout: self.connect_timeout,
pool: self.pool,
_t: PhantomData,
}
Expand Down Expand Up @@ -266,6 +290,8 @@
HandshakeFactory {
factory: self.handshake,
max_size: self.max_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
connect_timeout: self.connect_timeout,
pool: self.pool.clone(),
_t: PhantomData,
Expand All @@ -286,6 +312,8 @@
struct HandshakeFactory<St, H> {
factory: H,
max_size: u32,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
connect_timeout: Seconds,
pool: Rc<MqttSinkPool>,
_t: PhantomData<St>,
Expand All @@ -305,6 +333,8 @@
async fn create(&self, _: ()) -> Result<Self::Service, Self::InitError> {
Ok(HandshakeService {
max_size: self.max_size,
max_outgoing: self.max_outgoing,
max_outgoing_size: self.max_outgoing_size,
pool: self.pool.clone(),
service: self.factory.create(()).await?,
connect_timeout: self.connect_timeout.into(),
Expand All @@ -316,6 +346,8 @@
struct HandshakeService<St, H> {
service: H,
max_size: u32,
max_outgoing: u16,
max_outgoing_size: (u32, u32),
pool: Rc<MqttSinkPool>,
connect_timeout: Millis,
_t: PhantomData<St>,
Expand All @@ -339,6 +371,9 @@
) -> Result<Self::Response, Self::Error> {
log::trace!("Starting mqtt v3 handshake");

let (h, l) = self.max_outgoing_size;
io.memory_pool().set_write_params(h, l);

let codec = mqtt::Codec::default();
codec.set_max_size(self.max_size);
let shared = Rc::new(MqttShared::new(io.get_ref(), codec, false, self.pool.clone()));
Expand Down Expand Up @@ -373,7 +408,7 @@

log::trace!("Sending success handshake ack: {:#?}", pkt);

ack.shared.set_cap(ack.inflight as usize);
ack.shared.set_cap(ack.outgoing.unwrap_or(self.max_outgoing) as usize);
ack.io.encode(pkt, &ack.shared.codec)?;
Ok((
ack.io,
Expand Down
4 changes: 2 additions & 2 deletions tests/test_server_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,10 +1070,10 @@ async fn handle_or_drop_publish_after_disconnect(
)
.unwrap();
io.flush(true).await.unwrap();
sleep(Millis(1750)).await;
sleep(Millis(2750)).await;
io.close();
drop(io);
sleep(Millis(1000)).await;
sleep(Millis(1500)).await;

assert!(disconnect.load(Relaxed));

Expand Down
Loading