Skip to content

Commit

Permalink
PipelineCall is static
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Jun 23, 2023
1 parent efb2a81 commit c06e7d6
Show file tree
Hide file tree
Showing 8 changed files with 19 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.11.1] - 2023-06-23

* `PipelineCall` is static

## [0.11.0] - 2023-06-22

* Release v0.11.0
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "0.11.0"
version = "0.11.1"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"]
edition = "2018"

[dependencies]
ntex = "0.7.0"
ntex = "0.7.1"
bitflags = "1.3"
log = "0.4"
pin-project-lite = "0.2"
Expand All @@ -26,7 +26,7 @@ ntex-tls = "0.3.0"
rustls = "0.21"
rustls-pemfile = "1.0"
openssl = "0.10"
ntex = { version = "0.7.0", features = ["tokio", "rustls", "openssl"] }
ntex = { version = "0.7.1", features = ["tokio", "rustls", "openssl"] }
test-case = "3"

[profile.dev]
Expand Down
2 changes: 1 addition & 1 deletion examples/mqtt-ws-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ async fn main() -> std::io::Result<()> {
// and then switch to websokets streaming
.upgrade(
// validate ws request and init ws transport
pipeline_factory(
chain_factory(
|(req, io, codec): (Request, Io<_>, h1::Codec)| {
async move {
match ws::handshake(req.head()) {
Expand Down
6 changes: 3 additions & 3 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pin_project_lite::pin_project! {
inner: DispatcherInner<S, U>,
pool: Pool,
#[pin]
response: Option<PipelineCall<'static, S, DispatchItem<U>>>,
response: Option<PipelineCall<S, DispatchItem<U>>>,
response_idx: usize,
}
}
Expand Down Expand Up @@ -321,7 +321,7 @@ where
if let Some(item) = item {
// optimize first call
if this.response.is_none() {
this.response.set(Some(this.service.call(item).into_static()));
this.response.set(Some(this.service.call(item)));
let res = this.response.as_mut().as_pin_mut().unwrap().poll(cx);

let mut state = inner.state.borrow_mut();
Expand Down Expand Up @@ -360,7 +360,7 @@ where
let st = inner.io.get_ref();
let codec = this.codec.clone();
let state = inner.state.clone();
let fut = this.service.call(item).into_static();
let fut = this.service.call(item);
ntex::rt::spawn(async move {
let item = fut.await;
state.borrow_mut().handle_result(
Expand Down
3 changes: 2 additions & 1 deletion src/v3/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ where
self.inner.sink.close();
let inner = self.inner.clone();
*shutdown = Some(Box::pin(async move {
let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await;
let _ =
Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await;
}));
}

Expand Down
3 changes: 2 additions & 1 deletion src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ where
self.inner.sink.close();
let inner = self.inner.clone();
*shutdown = Some(Box::pin(async move {
let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await;
let _ =
Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await;
}));
}

Expand Down
3 changes: 2 additions & 1 deletion src/v5/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ where
self.inner.sink.drop_sink();
let inner = self.inner.clone();
*shutdown = Some(Box::pin(async move {
let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await;
let _ =
Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await;
}));
}

Expand Down
3 changes: 2 additions & 1 deletion src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ where
self.inner.sink.drop_sink();
let inner = self.inner.clone();
*shutdown = Some(Box::pin(async move {
let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await;
let _ =
Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await;
}));
}

Expand Down

0 comments on commit c06e7d6

Please sign in to comment.