From c06e7d61e5d050ce48a3f3a98b7c262609034cb5 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Fri, 23 Jun 2023 16:55:27 +0600 Subject: [PATCH] PipelineCall is static --- CHANGES.md | 4 ++++ Cargo.toml | 6 +++--- examples/mqtt-ws-server.rs | 2 +- src/io.rs | 6 +++--- src/v3/client/dispatcher.rs | 3 ++- src/v3/dispatcher.rs | 3 ++- src/v5/client/dispatcher.rs | 3 ++- src/v5/dispatcher.rs | 3 ++- 8 files changed, 19 insertions(+), 11 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 590770e..dddea72 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.11.1] - 2023-06-23 + +* `PipelineCall` is static + ## [0.11.0] - 2023-06-22 * Release v0.11.0 diff --git a/Cargo.toml b/Cargo.toml index 534c8b5..b59e984 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "0.11.0" +version = "0.11.1" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" @@ -12,7 +12,7 @@ exclude = [".gitignore", ".travis.yml", ".cargo/config"] edition = "2018" [dependencies] -ntex = "0.7.0" +ntex = "0.7.1" bitflags = "1.3" log = "0.4" pin-project-lite = "0.2" @@ -26,7 +26,7 @@ ntex-tls = "0.3.0" rustls = "0.21" rustls-pemfile = "1.0" openssl = "0.10" -ntex = { version = "0.7.0", features = ["tokio", "rustls", "openssl"] } +ntex = { version = "0.7.1", features = ["tokio", "rustls", "openssl"] } test-case = "3" [profile.dev] diff --git a/examples/mqtt-ws-server.rs b/examples/mqtt-ws-server.rs index dfff7f8..d61479a 100644 --- a/examples/mqtt-ws-server.rs +++ b/examples/mqtt-ws-server.rs @@ -124,7 +124,7 @@ async fn main() -> std::io::Result<()> { // and then switch to websokets streaming .upgrade( // validate ws request and init ws transport - pipeline_factory( + chain_factory( |(req, io, codec): (Request, Io<_>, h1::Codec)| { async move { match ws::handshake(req.head()) { diff --git a/src/io.rs b/src/io.rs index 81af32c..166df55 100644 --- a/src/io.rs +++ b/src/io.rs @@ -25,7 +25,7 @@ pin_project_lite::pin_project! { inner: DispatcherInner, pool: Pool, #[pin] - response: Option>>, + response: Option>>, response_idx: usize, } } @@ -321,7 +321,7 @@ where if let Some(item) = item { // optimize first call if this.response.is_none() { - this.response.set(Some(this.service.call(item).into_static())); + this.response.set(Some(this.service.call(item))); let res = this.response.as_mut().as_pin_mut().unwrap().poll(cx); let mut state = inner.state.borrow_mut(); @@ -360,7 +360,7 @@ where let st = inner.io.get_ref(); let codec = this.codec.clone(); let state = inner.state.clone(); - let fut = this.service.call(item).into_static(); + let fut = this.service.call(item); ntex::rt::spawn(async move { let item = fut.await; state.borrow_mut().handle_result( diff --git a/src/v3/client/dispatcher.rs b/src/v3/client/dispatcher.rs index 46321f2..c7dfccb 100644 --- a/src/v3/client/dispatcher.rs +++ b/src/v3/client/dispatcher.rs @@ -90,7 +90,8 @@ where self.inner.sink.close(); let inner = self.inner.clone(); *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await; + let _ = + Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await; })); } diff --git a/src/v3/dispatcher.rs b/src/v3/dispatcher.rs index 529ac6d..5548c12 100644 --- a/src/v3/dispatcher.rs +++ b/src/v3/dispatcher.rs @@ -144,7 +144,8 @@ where self.inner.sink.close(); let inner = self.inner.clone(); *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await; + let _ = + Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await; })); } diff --git a/src/v5/client/dispatcher.rs b/src/v5/client/dispatcher.rs index 725e7a7..73cf852 100644 --- a/src/v5/client/dispatcher.rs +++ b/src/v5/client/dispatcher.rs @@ -116,7 +116,8 @@ where self.inner.sink.drop_sink(); let inner = self.inner.clone(); *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await; + let _ = + Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await; })); } diff --git a/src/v5/dispatcher.rs b/src/v5/dispatcher.rs index 32f41d3..6581952 100644 --- a/src/v5/dispatcher.rs +++ b/src/v5/dispatcher.rs @@ -149,7 +149,8 @@ where self.inner.sink.drop_sink(); let inner = self.inner.clone(); *shutdown = Some(Box::pin(async move { - let _ = Pipeline::new(&inner.control).call(ControlMessage::closed()).await; + let _ = + Pipeline::new(&inner.control).service_call(ControlMessage::closed()).await; })); }