Skip to content

Commit

Permalink
Mark Control as non exhaustive
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 15, 2024
1 parent 2f7bbc5 commit cc69135
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 17 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/cov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ jobs:
run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: lcov.info
fail_ci_if_error: true
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## [2.0.0] - 2024-04-1x

* Mark `Control` type as `non exhaustive`

* Rename `ControlMessage` to `Control`

* Remove protocol variant services
Expand Down
5 changes: 2 additions & 3 deletions examples/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::cell::RefCell;

use ntex::service::{fn_factory_with_config, fn_service, ServiceFactory};
use ntex::util::{ByteString, Ready};
use ntex_mqtt::v5::{
self, Control, ControlAck, MqttServer, Publish, PublishAck, Session,
};
use ntex_mqtt::v5::{self, Control, ControlAck, MqttServer, Publish, PublishAck, Session};

#[derive(Clone, Debug)]
struct MySession {
Expand Down Expand Up @@ -97,6 +95,7 @@ fn control_service_factory() -> impl ServiceFactory<
v5::Control::Unsubscribe(s) => Ready::Ok(s.ack()),
v5::Control::Closed(c) => Ready::Ok(c.ack()),
v5::Control::PeerGone(c) => Ready::Ok(c.ack()),
_ => Ready::Ok(control.ack()),
}))
})
}
Expand Down
9 changes: 5 additions & 4 deletions examples/subs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ async fn main() -> std::io::Result<()> {
let sink = client.sink();

// handle incoming publishes
ntex::rt::spawn(client.start(fn_service(|control: v5::client::Control<Error>| {
match control {
ntex::rt::spawn(client.start(fn_service(
|control: v5::client::Control<Error>| match control {
v5::client::Control::Publish(publish) => {
log::info!(
"incoming publish: {:?} -> {:?} payload {:?}",
Expand Down Expand Up @@ -60,8 +60,9 @@ async fn main() -> std::io::Result<()> {
log::warn!("Server closed connection: {:?}", msg);
Ready::Ok(msg.ack())
}
}
})));
_ => Ready::Ok(control.ack()),
},
)));

// subscribe to topic
sink.subscribe(None)
Expand Down
13 changes: 13 additions & 0 deletions src/v3/client/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::io;
pub use crate::v3::control::{Closed, ControlAck, Disconnect, Error, PeerGone, ProtocolError};
use crate::v3::{codec, control::ControlAckKind, error};

/// Client control messages
#[non_exhaustive]
#[derive(Debug)]
pub enum Control<E> {
/// Unhandled publish packet
Expand Down Expand Up @@ -42,6 +44,17 @@ impl<E> Control<E> {
pub fn disconnect(&self) -> ControlAck {
ControlAck { result: ControlAckKind::Disconnect }
}

/// Ack control message
pub fn ack(self) -> ControlAck {
match self {
Control::Publish(msg) => msg.ack(),
Control::Closed(msg) => msg.ack(),
Control::Error(msg) => msg.ack(),
Control::ProtocolError(msg) => msg.ack(),
Control::PeerGone(msg) => msg.ack(),

Check warning on line 55 in src/v3/client/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/client/control.rs#L49-L55

Added lines #L49 - L55 were not covered by tests
}
}
}

#[derive(Debug)]
Expand Down
22 changes: 22 additions & 0 deletions src/v3/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{io, marker::PhantomData, num::NonZeroU16};
use super::codec;
use crate::{error, types::QoS};

/// Server control messages
#[non_exhaustive]
#[derive(Debug)]
pub enum Control<E> {
/// Ping packet
Expand Down Expand Up @@ -86,6 +88,26 @@ impl<E> Control<E> {
pub fn disconnect(&self) -> ControlAck {
ControlAck { result: ControlAckKind::Disconnect }
}

/// Ack control message
pub fn ack(self) -> ControlAck {
match self {
Control::Ping(msg) => msg.ack(),
Control::Disconnect(msg) => msg.ack(),

Check warning on line 96 in src/v3/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/control.rs#L93-L96

Added lines #L93 - L96 were not covered by tests
Control::Subscribe(_) => {
log::warn!("Subscribe is not supported");
ControlAck { result: ControlAckKind::Disconnect }

Check warning on line 99 in src/v3/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/control.rs#L98-L99

Added lines #L98 - L99 were not covered by tests
}
Control::Unsubscribe(_) => {
log::warn!("Unsubscribe is not supported");
ControlAck { result: ControlAckKind::Disconnect }

Check warning on line 103 in src/v3/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/control.rs#L102-L103

Added lines #L102 - L103 were not covered by tests
}
Control::Closed(msg) => msg.ack(),
Control::Error(msg) => msg.ack(),
Control::ProtocolError(msg) => msg.ack(),
Control::PeerGone(msg) => msg.ack(),

Check warning on line 108 in src/v3/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v3/control.rs#L105-L108

Added lines #L105 - L108 were not covered by tests
}
}
}

#[derive(Copy, Clone, Debug)]
Expand Down
18 changes: 18 additions & 0 deletions src/v5/client/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::{error, v5::codec};

pub use crate::v5::control::{Closed, ControlAck, Disconnect, Error, ProtocolError};

/// Client control messages
#[non_exhaustive]
#[derive(Debug)]
pub enum Control<E> {
/// Unhandled publish packet
Expand Down Expand Up @@ -50,6 +52,22 @@ impl<E> Control<E> {
pub fn disconnect(&self, pkt: codec::Disconnect) -> ControlAck {
ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
}

/// Ack control message
pub fn ack(self) -> ControlAck {
match self {

Check warning on line 58 in src/v5/client/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/client/control.rs#L57-L58

Added lines #L57 - L58 were not covered by tests
Control::Publish(_) => {
crate::v5::disconnect("Publish control message is not supported")

Check warning on line 60 in src/v5/client/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/client/control.rs#L60

Added line #L60 was not covered by tests
}
Control::Disconnect(msg) => msg.ack(),
Control::Closed(msg) => msg.ack(),

Check warning on line 63 in src/v5/client/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/client/control.rs#L62-L63

Added lines #L62 - L63 were not covered by tests
Control::Error(_) => {
crate::v5::disconnect("Error control message is not supported")

Check warning on line 65 in src/v5/client/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/client/control.rs#L65

Added line #L65 was not covered by tests
}
Control::ProtocolError(msg) => msg.ack(),
Control::PeerGone(msg) => msg.ack(),

Check warning on line 68 in src/v5/client/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/client/control.rs#L67-L68

Added lines #L67 - L68 were not covered by tests
}
}
}

#[derive(Debug)]
Expand Down
18 changes: 17 additions & 1 deletion src/v5/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use ntex::util::ByteString;
use super::codec::{self, DisconnectReasonCode, QoS, UserProperties};
use crate::error;

/// Control messages
/// Server control messages
#[non_exhaustive]
#[derive(Debug)]
pub enum Control<E> {
/// Auth packet from a client
Expand Down Expand Up @@ -100,6 +101,21 @@ impl<E> Control<E> {
pub fn disconnect_with(&self, pkt: codec::Disconnect) -> ControlAck {
ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
}

/// Ack control message
pub fn ack(self) -> ControlAck {
match self {
Control::Auth(_) => super::disconnect("Auth control message is not supported"),
Control::Ping(msg) => msg.ack(),
Control::Disconnect(msg) => msg.ack(),
Control::Subscribe(msg) => msg.ack(),
Control::Unsubscribe(msg) => msg.ack(),
Control::Closed(msg) => msg.ack(),
Control::Error(_) => super::disconnect("Error control message is not supported"),
Control::ProtocolError(msg) => msg.ack(),
Control::PeerGone(msg) => msg.ack(),

Check warning on line 116 in src/v5/control.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/control.rs#L106-L116

Added lines #L106 - L116 were not covered by tests
}
}
}

#[derive(Debug)]
Expand Down
12 changes: 12 additions & 0 deletions src/v5/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,15 @@ pub use crate::topic::{TopicFilter, TopicFilterError};
pub use crate::types::QoS;

const RECEIVE_MAX_DEFAULT: NonZeroU16 = unsafe { NonZeroU16::new_unchecked(65_535) };

fn disconnect(msg: &'static str) -> ControlAck {
log::error!("{}", msg);

Check warning on line 33 in src/v5/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/mod.rs#L32-L33

Added lines #L32 - L33 were not covered by tests

ControlAck {
packet: Some(
codec::Disconnect::new(codec::DisconnectReasonCode::ImplementationSpecificError)
.into(),
),
disconnect: true,
}
}

Check warning on line 42 in src/v5/mod.rs

View check run for this annotation

Codecov / codecov/patch

src/v5/mod.rs#L35-L42

Added lines #L35 - L42 were not covered by tests
7 changes: 4 additions & 3 deletions tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,10 @@ async fn handle_or_drop_publish_after_disconnect(
.unwrap();
io.encode(codec::Packet::Disconnect, &codec).unwrap();
io.flush(true).await.unwrap();
sleep(Millis(350)).await;
io.close();
drop(io);

sleep(Millis(50)).await;
sleep(Millis(250)).await;

assert!(disconnect.load(Relaxed));

Expand Down Expand Up @@ -770,7 +771,7 @@ async fn test_frame_read_rate() -> std::io::Result<()> {
sleep(Millis(1000)).await;
assert!(!check.load(Relaxed));

sleep(Millis(2100)).await;
sleep(Millis(2300)).await;
assert!(check.load(Relaxed));

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions tests/test_server_v5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use ntex::util::{lazy, ByteString, Bytes, BytesMut, Ready};
use ntex::{codec::Encoder, server, service::fn_service};

use ntex_mqtt::v5::{
client, codec, error, Control, Handshake, HandshakeAck, MqttServer, Publish,
PublishAck, QoS, Session,
client, codec, error, Control, Handshake, HandshakeAck, MqttServer, Publish, PublishAck,
QoS, Session,
};

struct St;
Expand Down Expand Up @@ -1070,9 +1070,9 @@ async fn handle_or_drop_publish_after_disconnect(
)
.unwrap();
io.flush(true).await.unwrap();
io.close();
drop(io);

sleep(Millis(50)).await;
sleep(Millis(250)).await;

assert!(disconnect.load(Relaxed));

Expand Down Expand Up @@ -1297,7 +1297,7 @@ async fn test_frame_read_rate() -> std::io::Result<()> {
sleep(Millis(1000)).await;
assert!(!check.load(Relaxed));

sleep(Millis(2100)).await;
sleep(Millis(2300)).await;
assert!(check.load(Relaxed));

Ok(())
Expand Down

0 comments on commit cc69135

Please sign in to comment.