Skip to content

Commit

Permalink
Refactor control messages (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Apr 15, 2024
1 parent 5912642 commit 184fc53
Show file tree
Hide file tree
Showing 31 changed files with 491 additions and 1,536 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/cov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ jobs:
run: cargo llvm-cov --all-features --workspace --lcov --output-path lcov.info

- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: lcov.info
fail_ci_if_error: true
8 changes: 8 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changes

## [2.0.0] - 2024-04-1x

* Mark `Control` type as `non exhaustive`

* Rename `ControlMessage` to `Control`

* Remove protocol variant services

## [1.1.0] - 2024-03-07

* Use MqttService::connect_timeout() only for reading protocol version
Expand Down
12 changes: 6 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "1.1.0"
version = "2.0.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -16,16 +16,16 @@ features = ["ntex/tokio"]

[dependencies]
ntex = "1.2"
bitflags = "2.4"
bitflags = "2"
log = "0.4"
pin-project-lite = "0.2"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"

[dev-dependencies]
env_logger = "0.11"
ntex-tls = "1.1"
openssl = "0.10"
test-case = "3.2"
ntex = { version = "1.2", features = ["tokio", "openssl"] }
ntex = { version = "1", features = ["tokio", "openssl"] }
27 changes: 13 additions & 14 deletions examples/subs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::cell::RefCell;

use ntex::service::{fn_factory_with_config, fn_service, ServiceFactory};
use ntex::util::{ByteString, Ready};
use ntex_mqtt::v5::{
self, ControlMessage, ControlResult, MqttServer, Publish, PublishAck, Session,
};
use ntex_mqtt::v5::{self, Control, ControlAck, MqttServer, Publish, PublishAck, Session};

#[derive(Clone, Debug)]
struct MySession {
Expand Down Expand Up @@ -70,22 +68,22 @@ async fn publish(
}

fn control_service_factory() -> impl ServiceFactory<
ControlMessage<MyServerError>,
Control<MyServerError>,
Session<MySession>,
Response = ControlResult,
Response = ControlAck,
Error = MyServerError,
InitError = MyServerError,
> {
fn_factory_with_config(|session: Session<MySession>| {
Ready::Ok(fn_service(move |control| match control {
v5::ControlMessage::Auth(a) => Ready::Ok(a.ack(v5::codec::Auth::default())),
v5::ControlMessage::Error(e) => {
v5::Control::Auth(a) => Ready::Ok(a.ack(v5::codec::Auth::default())),
v5::Control::Error(e) => {
Ready::Ok(e.ack(v5::codec::DisconnectReasonCode::UnspecifiedError))
}
v5::ControlMessage::ProtocolError(e) => Ready::Ok(e.ack()),
v5::ControlMessage::Ping(p) => Ready::Ok(p.ack()),
v5::ControlMessage::Disconnect(d) => Ready::Ok(d.ack()),
v5::ControlMessage::Subscribe(mut s) => {
v5::Control::ProtocolError(e) => Ready::Ok(e.ack()),
v5::Control::Ping(p) => Ready::Ok(p.ack()),
v5::Control::Disconnect(d) => Ready::Ok(d.ack()),
v5::Control::Subscribe(mut s) => {
// store subscribed topics in session, publish service uses this list for echos
s.iter_mut().for_each(|mut s| {
session.subscriptions.borrow_mut().push(s.topic().clone());
Expand All @@ -94,9 +92,10 @@ fn control_service_factory() -> impl ServiceFactory<

Ready::Ok(s.ack())
}
v5::ControlMessage::Unsubscribe(s) => Ready::Ok(s.ack()),
v5::ControlMessage::Closed(c) => Ready::Ok(c.ack()),
v5::ControlMessage::PeerGone(c) => Ready::Ok(c.ack()),
v5::Control::Unsubscribe(s) => Ready::Ok(s.ack()),
v5::Control::Closed(c) => Ready::Ok(c.ack()),
v5::Control::PeerGone(c) => Ready::Ok(c.ack()),
_ => Ready::Ok(control.ack()),
}))
})
}
Expand Down
21 changes: 11 additions & 10 deletions examples/subs_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ async fn main() -> std::io::Result<()> {
let sink = client.sink();

// handle incoming publishes
ntex::rt::spawn(client.start(fn_service(|control: v5::client::ControlMessage<Error>| {
match control {
v5::client::ControlMessage::Publish(publish) => {
ntex::rt::spawn(client.start(fn_service(
|control: v5::client::Control<Error>| match control {
v5::client::Control::Publish(publish) => {
log::info!(
"incoming publish: {:?} -> {:?} payload {:?}",
publish.packet().packet_id,
Expand All @@ -40,28 +40,29 @@ async fn main() -> std::io::Result<()> {
);
Ready::Ok(publish.ack(v5::codec::PublishAckReason::Success))
}
v5::client::ControlMessage::Disconnect(msg) => {
v5::client::Control::Disconnect(msg) => {
log::warn!("Server disconnecting: {:?}", msg);
Ready::Ok(msg.ack())
}
v5::client::ControlMessage::Error(msg) => {
v5::client::Control::Error(msg) => {
log::error!("Codec error: {:?}", msg);
Ready::Ok(msg.ack(v5::codec::DisconnectReasonCode::UnspecifiedError))
}
v5::client::ControlMessage::ProtocolError(msg) => {
v5::client::Control::ProtocolError(msg) => {
log::error!("Protocol error: {:?}", msg);
Ready::Ok(msg.ack())
}
v5::client::ControlMessage::PeerGone(msg) => {
v5::client::Control::PeerGone(msg) => {
log::warn!("Peer closed connection: {:?}", msg.error());
Ready::Ok(msg.ack())
}
v5::client::ControlMessage::Closed(msg) => {
v5::client::Control::Closed(msg) => {
log::warn!("Server closed connection: {:?}", msg);
Ready::Ok(msg.ack())
}
}
})));
_ => Ready::Ok(control.ack()),
},
)));

// subscribe to topic
sink.subscribe(None)
Expand Down
60 changes: 5 additions & 55 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl<Err, InitErr>
InitErr,
>
{
/// Create mqtt protocol selector server
/// Create mqtt server
pub fn new() -> Self {
MqttServer {
v3: DefaultProtocolServer::new(ProtocolVersion::MQTT3),
Expand Down Expand Up @@ -85,11 +85,8 @@ where
Error = Err,
InitError = InitErr,
> + 'static,
Cn: ServiceFactory<
v3::ControlMessage<Err>,
v3::Session<St>,
Response = v3::ControlResult,
> + 'static,
Cn: ServiceFactory<v3::Control<Err>, v3::Session<St>, Response = v3::ControlAck>
+ 'static,
P: ServiceFactory<v3::Publish, v3::Session<St>, Response = ()> + 'static,
C::Error: From<Cn::Error>
+ From<Cn::InitError>
Expand All @@ -105,28 +102,6 @@ where
}
}

/// Service to handle v3 protocol
pub fn v3_variants(
self,
service: v3::Selector<Err, InitErr>,
) -> MqttServer<
impl ServiceFactory<IoBoxed, Response = (), Error = MqttError<Err>, InitError = InitErr>,
V5,
Err,
InitErr,
>
where
Err: 'static,
InitErr: 'static,
{
MqttServer {
v3: service,
v5: self.v5,
connect_timeout: self.connect_timeout,
_t: marker::PhantomData,
}
}

/// Service to handle v5 protocol
pub fn v5<St, C, Cn, P>(
self,
Expand All @@ -145,11 +120,8 @@ where
Error = Err,
InitError = InitErr,
> + 'static,
Cn: ServiceFactory<
v5::ControlMessage<Err>,
v5::Session<St>,
Response = v5::ControlResult,
> + 'static,
Cn: ServiceFactory<v5::Control<Err>, v5::Session<St>, Response = v5::ControlAck>
+ 'static,
P: ServiceFactory<v5::Publish, v5::Session<St>, Response = v5::PublishAck> + 'static,
P::Error: fmt::Debug,
C::Error: From<Cn::Error>
Expand All @@ -166,28 +138,6 @@ where
_t: marker::PhantomData,
}
}

/// Service to handle v5 protocol
pub fn v5_variants<St, C, Cn, P>(
self,
service: v5::Selector<Err, InitErr>,
) -> MqttServer<
V3,
impl ServiceFactory<IoBoxed, Response = (), Error = MqttError<Err>, InitError = InitErr>,
Err,
InitErr,
>
where
Err: 'static,
InitErr: 'static,
{
MqttServer {
v3: self.v3,
v5: service,
connect_timeout: self.connect_timeout,
_t: marker::PhantomData,
}
}
}

impl<V3, V5, Err, InitErr> MqttServer<V3, V5, Err, InitErr>
Expand Down
19 changes: 9 additions & 10 deletions src/v3/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ use ntex::service::{boxed, into_service, IntoService, Pipeline, Service};
use ntex::time::{sleep, Millis, Seconds};
use ntex::util::{Either, Ready};

use crate::error::MqttError;
use crate::io::Dispatcher;
use crate::v3::{codec, shared::MqttShared, sink::MqttSink, ControlResult, Publish};
use crate::v3::{codec, shared::MqttShared, sink::MqttSink, ControlAck, Publish};
use crate::{error::MqttError, io::Dispatcher};

use super::{control::ControlMessage, dispatcher::create_dispatcher};
use super::{control::Control, dispatcher::create_dispatcher};

/// Mqtt client
pub struct Client {
Expand Down Expand Up @@ -104,7 +103,7 @@ impl Client {
self.shared.clone(),
self.max_receive,
into_service(|pkt| Ready::Ok(Either::Right(pkt))),
into_service(|msg: ControlMessage<()>| Ready::<_, ()>::Ok(msg.disconnect())),
into_service(|msg: Control<()>| Ready::<_, ()>::Ok(msg.disconnect())),
);

let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await;
Expand All @@ -114,8 +113,8 @@ impl Client {
pub async fn start<F, S, E>(self, service: F) -> Result<(), MqttError<E>>
where
E: 'static,
F: IntoService<S, ControlMessage<E>> + 'static,
S: Service<ControlMessage<E>, Response = ControlResult, Error = E> + 'static,
F: IntoService<S, Control<E>> + 'static,
S: Service<Control<E>, Response = ControlAck, Error = E> + 'static,
{
if self.keepalive.non_zero() {
let _ =
Expand Down Expand Up @@ -190,7 +189,7 @@ where
self.shared.clone(),
self.max_receive,
dispatch(self.builder.finish(), self.handlers),
into_service(|msg: ControlMessage<Err>| Ready::<_, Err>::Ok(msg.disconnect())),
into_service(|msg: Control<Err>| Ready::<_, Err>::Ok(msg.disconnect())),
);

let _ = Dispatcher::new(self.io, self.shared.clone(), dispatcher, &self.config).await;
Expand All @@ -199,8 +198,8 @@ where
/// Run client and handle control messages
pub async fn start<F, S>(self, service: F) -> Result<(), MqttError<Err>>
where
F: IntoService<S, ControlMessage<Err>>,
S: Service<ControlMessage<Err>, Response = ControlResult, Error = Err> + 'static,
F: IntoService<S, Control<Err>>,
S: Service<Control<Err>, Response = ControlAck, Error = Err> + 'static,
{
if self.keepalive.non_zero() {
let _ =
Expand Down
Loading

0 comments on commit 184fc53

Please sign in to comment.