Skip to content

Commit

Permalink
No flushing (#153)
Browse files Browse the repository at this point in the history
* No flush on connect ack
  • Loading branch information
fafhrd91 committed Oct 31, 2023
1 parent 39078cb commit 37aa52c
Show file tree
Hide file tree
Showing 8 changed files with 40 additions and 49 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.12.6] - 2023-10-31

* Send server ConnectAck without io flushing

## [0.12.5] - 2023-10-23

* Fix typo
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.12.5"
version = "0.12.6"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand Down
26 changes: 9 additions & 17 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ impl<E> From<Either<io::Error, io::Error>> for MqttError<E> {
}
}

impl<E> From<EncodeError> for MqttError<E> {
fn from(err: EncodeError) -> Self {
MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
}
}

impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
fn from(err: Either<DecodeError, io::Error>) -> Self {
match err {
Expand All @@ -115,17 +121,6 @@ impl<E> From<Either<DecodeError, io::Error>> for HandshakeError<E> {
}
}

impl<E> From<Either<EncodeError, io::Error>> for MqttError<E> {
fn from(err: Either<EncodeError, io::Error>) -> Self {
match err {
Either::Left(err) => {
MqttError::Handshake(HandshakeError::Protocol(ProtocolError::Encode(err)))
}
Either::Right(err) => MqttError::Handshake(HandshakeError::Disconnected(Some(err))),
}
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, thiserror::Error)]
pub enum DecodeError {
#[error("Invalid protocol")]
Expand Down Expand Up @@ -200,12 +195,9 @@ pub enum ClientError<T: fmt::Debug> {
Connect(#[from] ntex::connect::ConnectError),
}

impl<T: fmt::Debug> From<Either<EncodeError, std::io::Error>> for ClientError<T> {
fn from(err: Either<EncodeError, std::io::Error>) -> Self {
match err {
Either::Left(err) => ClientError::Protocol(ProtocolError::Encode(err)),
Either::Right(err) => ClientError::Disconnected(Some(err)),
}
impl<T: fmt::Debug> From<EncodeError> for ClientError<T> {
fn from(err: EncodeError) -> Self {
ClientError::Protocol(ProtocolError::Encode(err))
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ where

match inner.st {
IoDispatcherState::Processing => {
let item = match ready!(inner.poll_service(this.service, cx,)) {
let item = match ready!(inner.poll_service(this.service, cx)) {
PollService::Ready => {
// decode incoming bytes stream
match ready!(inner.io.poll_recv(this.codec, cx)) {
Expand Down Expand Up @@ -326,7 +326,6 @@ where
let res = this.response.as_mut().as_pin_mut().unwrap().poll(cx);

let mut state = inner.state.borrow_mut();
let response_idx = state.base.wrapping_add(state.queue.len());

if let Poll::Ready(res) = res {
// check if current result is only response
Expand All @@ -345,12 +344,13 @@ where
Ok(None) => (),
}
} else {
*this.response_idx = response_idx;
*this.response_idx =
state.base.wrapping_add(state.queue.len());
state.queue.push_back(ServiceResult::Ready(res));
}
this.response.set(None);
} else {
*this.response_idx = response_idx;
*this.response_idx = state.base.wrapping_add(state.queue.len());
state.queue.push_back(ServiceResult::Pending);
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/v3/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ where
let codec = codec::Codec::new();
codec.set_max_size(self.max_packet_size);

io.send(pkt.into(), &codec).await?;
io.encode(pkt.into(), &codec)?;

let packet = io.recv(&codec).await.map_err(ClientError::from)?.ok_or_else(|| {
log::trace!("Mqtt server is disconnected during handshake");
Expand Down
8 changes: 4 additions & 4 deletions src/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ where
log::trace!("Sending success handshake ack: {:#?}", pkt);

ack.shared.set_cap(ack.inflight as usize);
ack.io.send(pkt, &ack.shared.codec).await?;
ack.io.encode(pkt, &ack.shared.codec)?;
Ok((
ack.io,
ack.shared.clone(),
Expand All @@ -393,7 +393,7 @@ where
});

log::trace!("Sending failed handshake ack: {:#?}", pkt);
ack.io.send(pkt, &ack.shared.codec).await?;
ack.io.encode(pkt, &ack.shared.codec)?;
let _ = ack.io.shutdown().await;

Err(MqttError::Handshake(HandshakeError::Disconnected(None)))
Expand Down Expand Up @@ -518,7 +518,7 @@ where

ack.shared.set_cap(ack.inflight as usize);
ack.shared.codec.set_max_size(self.max_size);
ack.io.send(pkt, &ack.shared.codec).await.map_err(MqttError::from)?;
ack.io.encode(pkt, &ack.shared.codec)?;

let session = Session::new(session, MqttSink::new(ack.shared.clone()));
let handler = self.handler.create(session).await?;
Expand All @@ -537,7 +537,7 @@ where
});

log::trace!("Sending failed handshake ack: {:#?}", pkt);
ack.io.send(pkt, &ack.shared.codec).await?;
ack.io.encode(pkt, &ack.shared.codec)?;
let _ = ack.io.shutdown().await;

Err(MqttError::Handshake(HandshakeError::Disconnected(None)))
Expand Down
2 changes: 1 addition & 1 deletion src/v5/client/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ where
codec.set_max_inbound_size(max_packet_size);
let pool = self.pool.clone();

io.send(codec::Packet::Connect(Box::new(pkt)), &codec).await?;
io.encode(codec::Packet::Connect(Box::new(pkt)), &codec)?;

let packet = io.recv(&codec).await.map_err(ClientError::from)?.ok_or_else(|| {
log::trace!("Mqtt server is disconnected during handshake");
Expand Down
37 changes: 16 additions & 21 deletions src/v5/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,10 @@ where
}
shared.set_cap(peer_receive_max);

ack.io
.send(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&shared.codec,
)
.await?;
ack.io.encode(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&shared.codec,
)?;

Ok((
ack.io,
Expand All @@ -436,12 +434,10 @@ where
None => {
log::trace!("Failed to complete handshake: {:#?}", ack.packet);

ack.io
.send(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&ack.shared.codec,
)
.await?;
ack.io.encode(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&ack.shared.codec,
)?;
let _ = ack.io.shutdown().await;
Err(MqttError::Handshake(HandshakeError::Disconnected(None)))
}
Expand Down Expand Up @@ -604,9 +600,10 @@ where
ack.packet.server_keepalive_sec = Some(ack.keepalive);
}
shared.set_cap(peer_receive_max);
ack.io
.send(mqtt::Packet::ConnectAck(Box::new(ack.packet)), &shared.codec)
.await?;
ack.io.encode(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&shared.codec,
)?;

let session = Session::new(session, MqttSink::new(shared.clone()));
let handler = self.handler.create(session).await?;
Expand All @@ -621,12 +618,10 @@ where
None => {
log::trace!("Failed to complete handshake: {:#?}", ack.packet);

ack.io
.send(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&ack.shared.codec,
)
.await?;
ack.io.encode(
mqtt::Packet::ConnectAck(Box::new(ack.packet)),
&ack.shared.codec,
)?;
let _ = ack.io.shutdown().await;
Err(MqttError::Handshake(HandshakeError::Disconnected(None)))
}
Expand Down

0 comments on commit 37aa52c

Please sign in to comment.