Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relayed connection client becomes numb after disconnect and reconnect #310

Merged
merged 31 commits into from
Sep 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9aef611
init
staszek-krotki Aug 31, 2023
9ffe6a2
logging
staszek-krotki Aug 31, 2023
63d66ab
handle Unauthorized for all sessions - just close it
staszek-krotki Sep 1, 2023
5cf5b58
some logs
staszek-krotki Sep 1, 2023
2dde931
merge main
staszek-krotki Sep 1, 2023
f089e47
more logs
staszek-krotki Sep 4, 2023
fc46b3e
Fmt
pwalski Sep 5, 2023
cc5732e
Merge branch 'main' into staszekk/nat-disconnected-client
pwalski Sep 5, 2023
2664eae
more logs and remove forwards when server session is lost
staszek-krotki Sep 13, 2023
a1463af
merge conflicts
staszek-krotki Sep 13, 2023
2f8bc86
no packet trace enable
staszek-krotki Sep 13, 2023
79aa652
revert some logging
staszek-krotki Sep 13, 2023
9717e2c
cleanup logs in session
staszek-krotki Sep 13, 2023
a1598ef
some more logs removed
staszek-krotki Sep 13, 2023
bd94990
clean up the logs
staszek-krotki Sep 13, 2023
c8a9b19
cleanup logs
staszek-krotki Sep 13, 2023
1314b17
merge main
staszek-krotki Sep 13, 2023
e9c4502
fmt and less logs
staszek-krotki Sep 14, 2023
c613980
Merge branch 'main' into staszekk/nat-disconnected-client
staszek-krotki Sep 14, 2023
d120fd9
fix test
staszek-krotki Sep 14, 2023
8f308fa
clean up the code
staszek-krotki Sep 14, 2023
c670cc1
Update client/src/session.rs
staszek-krotki Sep 18, 2023
e8998d0
Update server/src/server.rs
staszek-krotki Sep 18, 2023
8a4c916
Update server/src/server.rs
staszek-krotki Sep 18, 2023
72db353
Update server/src/server.rs
staszek-krotki Sep 18, 2023
a343260
Update server/src/server.rs
staszek-krotki Sep 18, 2023
3664db4
Update server/src/server.rs
staszek-krotki Sep 18, 2023
29e92fb
Update client/src/session.rs
staszek-krotki Sep 18, 2023
29c4611
review upadtes
staszek-krotki Sep 18, 2023
ac05073
Merge branch 'staszekk/nat-disconnected-client' of github.com:golemfa…
staszek-krotki Sep 18, 2023
7792d54
Merge branch 'main' into staszekk/nat-disconnected-client
staszek-krotki Sep 19, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions client/examples/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use actix_web::{
App, HttpResponse, HttpServer, Responder,
};
use anyhow::{anyhow, Result};
use chrono::Local;
use futures::{future, try_join, FutureExt};
use rand::Rng;
use std::collections::VecDeque;
use std::{
collections::{HashMap, VecDeque},
collections::HashMap,
io::Write,
sync::{Arc, Mutex},
time::Instant,
};
Expand Down Expand Up @@ -449,7 +452,19 @@ async fn forward(
}

async fn run() -> Result<()> {
env_logger::init();
env_logger::Builder::new()
.parse_default_env()
.format(|buf, record| {
writeln!(
buf,
"[{} {:5} {}] {}",
Local::now().format("%Y-%m-%d %H:%M:%S%.3f"),
record.level(),
record.module_path().unwrap_or("<unnamed>"),
record.args()
)
})
.init();

let cli = Cli::from_args();
let client = build_client(
Expand Down Expand Up @@ -515,7 +530,9 @@ async fn build_client(
FallbackCryptoProvider::default()
};

let mut builder = ClientBuilder::from_url(relay_addr).crypto(provider);
let mut builder = ClientBuilder::from_url(relay_addr)
.crypto(provider)
.expire_session_after(std::time::Duration::from_secs(20));
pwalski marked this conversation as resolved.
Show resolved Hide resolved

if let Some(bind) = p2p_bind_addr {
builder = builder.listen(bind);
Expand Down
2 changes: 2 additions & 0 deletions client/src/direct_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct AllowedForwards {

impl AllowedForwards {
pub fn add(&mut self, node: NodeEntry<NodeId>, slot: SlotId) {
log::trace!("[add]: node {} to slot {}", node.default_id, slot);
// Remove previous information about node.
// We are removing all identities and slot. This is redundant, because in most cases
// using default id should be enough. This protects from situations, when `NodeEntries`
Expand All @@ -53,6 +54,7 @@ impl AllowedForwards {
}

pub fn remove(&mut self, node_id: &NodeId) -> Option<NodeEntry<NodeId>> {
log::trace!("[remove]: trying to remove node {}", node_id);
if let Some(slot) = self.nodes.remove(node_id) {
if let Some(entry) = self.slots.remove(&slot) {
for id in &entry.identities {
Expand Down
101 changes: 79 additions & 22 deletions client/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use metrics::{gauge, increment_counter};
use std::cmp::{max, min};
use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::{TryFrom, TryInto};
use std::iter::FromIterator;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, Weak};
use std::thread::sleep;
Expand All @@ -28,15 +29,19 @@ use crate::client::{ClientConfig, Forwarded};
use crate::direct_session::{DirectSession, NodeEntry};
use crate::dispatch::{dispatch, Handler};
use crate::encryption::Encryption;
use crate::error::{ProtocolError, ResultExt, SessionError, SessionInitError, SessionResult};
use crate::error::{
ProtocolError, ResultExt, SessionError, SessionInitError, SessionResult, TransitionError,
};
use crate::metrics::{metric_session_established, TARGET_ID};
use crate::raw_session::{RawSession, SessionType};
use crate::routing_session::{NodeRouting, RoutingSender};
use crate::session::session_initializer::SessionInitializer;
use crate::transport::ForwardReceiver;

use crate::error::SenderError::Session;
use crate::session::session_state::SessionState::{Closed, FailedEstablish};
use crate::session::session_traits::{SessionDeregistration, SessionRegistration};
use crate::SessionError::Network;
use ya_relay_core::identity::Identity;
use ya_relay_core::server_session::{Endpoint, NodeInfo, SessionId, TransportType};
use ya_relay_core::udp_stream::{udp_bind, OutStream};
Expand Down Expand Up @@ -215,12 +220,13 @@ impl SessionRegistration for SessionLayer {
#[async_trait(?Send)]
impl SessionDeregistration for SessionLayer {
async fn unregister(&self, node_id: NodeId) {
log::debug!("Unregistering Node [{node_id}]");
log::debug!("[unregister]: Unregistering Node [{node_id}]");

let direct = {
let mut state = self.state.write().await;

let mut ids = HashSet::<NodeId>::new();
let mut ids: HashSet<NodeId> = HashSet::from_iter(vec![node_id]);

let routing = state.nodes.get(&node_id).cloned();
let direct = state.p2p_nodes.get(&node_id).cloned();

Expand Down Expand Up @@ -258,6 +264,11 @@ impl SessionDeregistration for SessionLayer {
}
}

if let Some(entry) = self.registry.get_entry(node_id).await {
log::trace!("[unregister]: found entry for {node_id}, removing...",);
self.registry.remove_node(node_id).await;
}

direct
};

Expand Down Expand Up @@ -286,6 +297,21 @@ impl SessionDeregistration for SessionLayer {
// Node should handle disconnected Nodes properly even if he won't be notified.
session.raw.disconnect().await.ok();

if session.owner.default_id == NodeId::default() {
let f = session.list();
log::trace!(
"[close_session]: lost session with server - remove {} forwards",
f.len()
);
for e in f {
log::trace!(
"[close_session]: removing forward node_id {}.",
e.default_id
);
self.registry.remove_node(e.default_id).await;
}
}

let forwards = session.list();
{
let mut state = self.state.write().await;
Expand Down Expand Up @@ -535,7 +561,7 @@ impl SessionLayer {
/// TODO: `disconnect` shouldn't fail, because there is no reasonable reaction to this case.
/// This function must leave everything in clean state.
pub async fn disconnect(&self, node_id: NodeId) -> Result<(), SessionError> {
log::info!("Disconnecting Node [{node_id}]");
log::info!("[disconnect]: Disconnecting Node [{node_id}]");

// Note: This function shouldn't return before changing state to `Closed` (abort-safety).
let entry = self.registry.guard(node_id, &[]).await;
Expand Down Expand Up @@ -583,7 +609,7 @@ impl SessionLayer {
node_id: NodeId,
dont_use: Vec<ConnectionMethod>,
) -> Result<RoutingSender, SessionError> {
log::trace!("Requested session with [{node_id}]");
log::trace!("[session]: Requested session with [{node_id}]");

if let Some(routing) = self.get_node_routing(node_id).await {
// Why we need this ugly solution? Can't we just return `RoutingSender`?
Expand All @@ -603,7 +629,7 @@ impl SessionLayer {
return Ok(routing);
}

log::trace!("Node [{node_id}] not found in routing tables. Trying to establish session...");
log::trace!("[session]: Node [{node_id}] not found in routing tables. Trying to establish session...");

// Query relay server for Node information, we need to find out default id and aliases.
let info = self
Expand Down Expand Up @@ -634,7 +660,7 @@ impl SessionLayer {
let addrs: Vec<SocketAddr> = self.filter_own_addresses(&info.endpoints).await;
let this = self.clone();

match self.registry.lock_outgoing(remote_id, &addrs, this).await {
let session = match self.registry.lock_outgoing(remote_id, &addrs, this).await {
SessionLock::Permit(mut permit) => {
log::trace!("Acquired `SessionPermit` to init session with [{remote_id}]");
let myself = self.clone();
Expand All @@ -658,13 +684,34 @@ impl SessionLayer {
}
.map_err(|e| SessionError::Generic(e.to_string()))?;

session.raw.dispatcher.handle_error(
proto::StatusCode::Unauthorized as i32,
true,
self.clone(),
Arc::downgrade(&session),
Self::error_handler(),
);

self.get_node_routing(node_id)
.await
.ok_or(SessionError::Internal(format!(
"Session with [{remote_id}] closed immediately after establishing."
)))
}

fn error_handler() -> fn(i32, SessionLayer, Weak<DirectSession>) -> LocalBoxFuture<'static, ()>
{
move |code: i32, layer: SessionLayer, session: Weak<DirectSession>| {
async move {
if let Some(session) = session.upgrade() {
layer.close_session(session).await;
}
log::trace!("[session-layer]: handle_error {code}");
}
.boxed_local()
}
}

pub async fn server_session(&self) -> Result<Arc<DirectSession>, SessionError> {
// A little bit dirty hack, that we give default NodeId (0x00) for relay server.
// TODO: In the future relays should have regular NodeId
Expand Down Expand Up @@ -713,15 +760,7 @@ impl SessionLayer {
true,
self.clone(),
Arc::downgrade(&session),
move |code, layer, session| {
async move {
if let Some(session) = session.upgrade() {
layer.close_session(session).await;
}
log::debug!("handle_error {code}");
}
.boxed_local()
},
Self::error_handler(),
);

// TODO: Make sure this functionality is replaced in new code.
Expand Down Expand Up @@ -1004,9 +1043,7 @@ impl SessionLayer {
}

pub(crate) async fn await_connected(&self, node_id: NodeId) -> Result<(), SessionError> {
log::trace!(
"Session with Node [{node_id}] is registered. Waiting until it will be ready.."
);
log::trace!("[await_connected]: Session with Node [{node_id}] is registered.");

let entry = self
.registry
Expand All @@ -1015,6 +1052,9 @@ impl SessionLayer {
.ok_or(SessionError::Internal(format!(
"Entry for Node [{node_id}] not found, despite it should exits."
)))?;

log::trace!("[await_connected]: Waiting until it will be ready..");

entry.awaiting_notifier().await_for_finish().await?;
Ok(())
}
Expand All @@ -1026,13 +1066,19 @@ impl SessionLayer {
from: SocketAddr,
request: proto::request::Session,
) -> Result<(), SessionError> {
log::trace!("Called `dispatch_session` by {from}.");
log::trace!(
"[dispatch_session]: from {}, sessionId: {}.",
from,
hex::encode(&session_id)
);

let protocol = self.get_protocol().await?;
let this = self.clone();

if session_id.is_empty() {
log::trace!("Received `Session` packet with empty session id from {from}. Handling init attempt..");
log::trace!(
"[dispatch_session]: empty session id from {from}. Handling init attempt.."
);

// Empty `session_id` indicates attempt to initialize session.
let remote_id = challenge::recover_default_node_id(&request)
Expand Down Expand Up @@ -1082,6 +1128,11 @@ impl SessionLayer {
from: SocketAddr,
_request: proto::request::Ping,
) {
log::trace!(
"[on_ping]: from {}, sessionId: {}.",
from,
hex::encode(&session_id)
);
let packet = proto::Packet::response(
request_id,
session_id,
Expand All @@ -1100,7 +1151,12 @@ impl SessionLayer {
from: SocketAddr,
by: By,
) -> anyhow::Result<()> {
log::trace!("Handling `Disconnected` from {from}");
log::trace!(
"[on_disconnected]: from {}, sessionId: {} by {:?}.",
from,
hex::encode(&session_id),
by
);

// SessionId should be valid, otherwise this is some unknown session
// so we should be cautious, when processing it.
Expand Down Expand Up @@ -1451,6 +1507,7 @@ impl Handler for SessionLayer {
// TODO: Consider just adding node to `DirectSession` forwards list. If the other Node couldn't
// establish p2p session with us, we won't be able to do this anyway.
log::debug!("Attempting to establish connection to Node {} (slot {})", ident.node_id, node.slot);

let session = myself
.session_filtered_connection_methods(ident.node_id, vec![ConnectionMethod::Reverse, ConnectionMethod::Direct])
.await.map_err(|e| anyhow!("Failed to resolve node with slot {slot}. {e}"))?;
Expand Down
2 changes: 1 addition & 1 deletion client/src/session/expire.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub async fn track_sessions_expiration(layer: SessionLayer) {
let expiration = layer.config.session_expiration;

loop {
log::trace!("Checking, if all sessions are alive. Removing not active sessions.");
log::trace!("[expire]: Checking, if all sessions are alive. Removing not active sessions.");

let sessions = layer
.sessions()
Expand Down
20 changes: 17 additions & 3 deletions client/src/session/network_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,17 @@ impl NetworkView {
target
}

pub async fn remove_node(&self, node_id: NodeId) {
log::trace!("[remove_node]: node_id {}", node_id);
let mut state = self.state.write().await;
if let Some(target) = state.find(node_id, &[]) {
if node_id != NodeId::default() {
state.by_node_id.remove(&node_id).is_some();
state.by_addr.retain(|_, node_view| node_view.id != node_id);
}
}
}

/// Updates information about given Node.
/// This function must keep data in `NetworkView` consistent. It should check all NodeIds,
/// because some of them may already been in separate `NodeViews`. This could happen
Expand Down Expand Up @@ -467,7 +478,6 @@ impl NodeView {
)
}
};

SessionLock::Wait(notifier)
}
}
Expand Down Expand Up @@ -588,6 +598,7 @@ impl SessionPermit {
layer: Arc<Box<dyn SessionDeregistration>>,
new_state: SessionState,
) {
log::trace!("[async_drop]: for node id: {}", node.id);
let node_id = node.id;
let reverse = matches!(&new_state, SessionState::ReverseConnection(_));

Expand Down Expand Up @@ -621,6 +632,7 @@ impl SessionPermit {
/// Makes sure we deregister all data about this Node and abort all futures
/// that might be during initialization.
pub(crate) async fn clean_state(node: &NodeView, layer: Arc<Box<dyn SessionDeregistration>>) {
log::trace!("[clean_state]: {}", node.id);
for addr in node.public_addresses().await {
layer.abort_initializations(addr).await.ok();
}
Expand Down Expand Up @@ -702,7 +714,9 @@ impl NodeAwaiting {
}
SessionState::FailedEstablish(e) => return Err(e),
_ => {
log::trace!("Waiting for established session with [{node_id}]: state: {state}")
log::trace!(
"Waiting for established session with [{node_id}]: skipping state: {state}"
)
}
};

Expand All @@ -723,7 +737,7 @@ impl NodeAwaiting {
}
SessionState::FailedEstablish(e) => return Err(e),
_ => {
log::trace!("Waiting for Closed or FailedEstablished session with [{node_id}]. Current state: {state}")
log::trace!("Waiting for Closed or FailedEstablished session with [{node_id}]. skipping state: {state}")
}
};

Expand Down
1 change: 1 addition & 0 deletions client/src/session/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl SessionState {
(SessionState::Outgoing(_), SessionState::FailedEstablish(_)) => true,
(SessionState::ReverseConnection(_), SessionState::FailedEstablish(_)) => true,
(SessionState::RestartConnect, SessionState::FailedEstablish(_)) => true,
(SessionState::FailedEstablish(_), SessionState::FailedEstablish(_)) => true,
// Session can be moved to `Established` only if it was set to `Ready` by `SessionProtocol`
// or in case of `ReverseConnection`, when reverse `SessionPermit` sets `Reverse-Finished` state.
(SessionState::Incoming(InitState::Ready), SessionState::Established(_)) => true,
Expand Down
4 changes: 3 additions & 1 deletion client/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ impl TransportLayer {
channel.disconnect().await.ok();
}

self.virtual_tcp.shutdown().await;
self.virtual_tcp
.shutdown(self.session_layer.config.node_id)
.await;

// After Tcp shutdown will return, we are sending last Tcp packet to notify other Node,
// that connection is closed. We shouldn't close sessions before we give them chance to be sent.
Expand Down
Loading
Loading