diff --git a/c/src/proactor/epoll-internal.h b/c/src/proactor/epoll-internal.h index 8f765121e..550324ccd 100644 --- a/c/src/proactor/epoll-internal.h +++ b/c/src/proactor/epoll-internal.h @@ -238,6 +238,7 @@ typedef struct pconnection_t { bool server; /* accept, not connect */ bool tick_pending; bool queued_disconnect; /* deferred from pn_proactor_disconnect() */ + bool first_schedule; pn_condition_t *disconnect_condition; // Following values only changed by (sole) working task: uint32_t current_arm; // active epoll io events diff --git a/c/src/proactor/epoll.c b/c/src/proactor/epoll.c index 7abd884ef..7714c23fe 100644 --- a/c/src/proactor/epoll.c +++ b/c/src/proactor/epoll.c @@ -810,6 +810,7 @@ static const char *pconnection_setup(pconnection_t *pc, pn_proactor_t *p, pn_con pc->wbuf_current = NULL; pc->hog_count = 0; pc->batch.next_event = pconnection_batch_next; + pc->first_schedule = false; if (server) { pn_transport_set_server(pc->driver.transport); @@ -1122,6 +1123,7 @@ static void write_flush(pconnection_t *pc) { static void pconnection_connected_lh(pconnection_t *pc); static void pconnection_maybe_connect_lh(pconnection_t *pc); +static bool pconnection_first_connect_lh(pconnection_t *pc); static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) { bool waking = false; @@ -1139,6 +1141,17 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, } if (sched_ready) schedule_done(&pc->task); + if (pc->first_schedule) { + pc->first_schedule = false; + assert(!topup && !events); + if (!pc->queued_disconnect) { + if (pconnection_first_connect_lh(pc)) { + unlock(&pc->task.mutex); + return NULL; + } + } + } + if (topup) { // Only called by the batch owner. Does not loop, just "tops up" // once. May be back depending on hog_count. @@ -1396,6 +1409,7 @@ static void pconnection_maybe_connect_lh(pconnection_t *pc) { int pgetaddrinfo(const char *host, const char *port, int flags, struct addrinfo **res) { + // NOTE: getaddrinfo can block on DNS lookup (PROTON-2812). struct addrinfo hints = { 0 }; hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; @@ -1416,7 +1430,27 @@ bool schedule_if_inactive(pn_proactor_t *p) { return false; } +// Call from pconnection_process with task lock held. +// Return true if the socket is connecting and there are no Proton events to deliver. +static bool pconnection_first_connect_lh(pconnection_t *pc) { + unlock(&pc->task.mutex); + // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups. + int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo); + lock(&pc->task.mutex); + + if (!gai_error) { + pc->ai = pc->addrinfo; + pconnection_maybe_connect_lh(pc); /* Start connection attempts */ + if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task)) + return true; + } else { + psocket_gai_error(&pc->psocket, gai_error, "connect to "); + } + return false; +} + void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) { + // Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread. size_t addrlen = strlen(addr); pconnection_t *pc = (pconnection_t*) malloc(sizeof(pconnection_t)+addrlen); assert(pc); // TODO: memory safety @@ -1430,27 +1464,8 @@ void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t * lock(&pc->task.mutex); proactor_add(&pc->task); pn_connection_open(pc->driver.connection); /* Auto-open */ - - bool notify = false; - - if (pc->disconnected) { - notify = schedule(&pc->task); /* Error during initialization */ - } else { - int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo); - if (!gai_error) { - pn_connection_open(pc->driver.connection); /* Auto-open */ - pc->ai = pc->addrinfo; - pconnection_maybe_connect_lh(pc); /* Start connection attempts */ - if (pc->disconnected) notify = schedule(&pc->task); - } else { - psocket_gai_error(&pc->psocket, gai_error, "connect to "); - notify = schedule(&pc->task); - lock(&p->task.mutex); - notify |= schedule_if_inactive(p); - unlock(&p->task.mutex); - } - } - /* We need to issue INACTIVE on immediate failure */ + pc->first_schedule = true; // Resume connection setup when next scheduled. + bool notify = schedule(&pc->task); unlock(&pc->task.mutex); if (notify) notify_poller(p); } diff --git a/c/src/proactor/epoll_raw_connection.c b/c/src/proactor/epoll_raw_connection.c index b7547f9f9..350c16ba8 100644 --- a/c/src/proactor/epoll_raw_connection.c +++ b/c/src/proactor/epoll_raw_connection.c @@ -52,6 +52,8 @@ struct praw_connection_t { bool disconnected; bool hup_detected; bool read_check; + bool first_schedule; + char *taddr; }; static void psocket_error(praw_connection_t *rc, int err, const char* msg) { @@ -145,6 +147,8 @@ static void praw_connection_init(praw_connection_t *prc, pn_proactor_t *p, pn_ra prc->connected = false; prc->disconnected = false; + prc->first_schedule = false; + prc->taddr = NULL; prc->batch.next_event = pni_raw_batch_next; pmutex_init(&prc->rearm_mutex); @@ -163,6 +167,7 @@ static void praw_connection_cleanup(praw_connection_t *prc) { task_finalize(&prc->task); if (prc->addrinfo) freeaddrinfo(prc->addrinfo); + free(prc->taddr); free(prc); } // else proactor_disconnect logic owns prc and its final free @@ -177,39 +182,48 @@ pn_raw_connection_t *pn_raw_connection(void) { return &conn->raw_connection; } -void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) { - assert(rc); - praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection); - praw_connection_init(prc, p, rc); - // TODO: check case of proactor shutting down - - lock(&prc->task.mutex); - proactor_add(&prc->task); - - bool notify = false; - +// Call from pconnection_process with task lock held. +// Return true if the socket is connecting and there are no Proton events to deliver. +static bool praw_connection_first_connect_lh(praw_connection_t *prc) { const char *host; const char *port; - size_t addrlen = strlen(addr); - char *addr_buf = (char*) alloca(addrlen+1); - pni_parse_addr(addr, addr_buf, addrlen+1, &host, &port); + unlock(&prc->task.mutex); + size_t addrlen = strlen(prc->taddr); + char *addr_buf = (char*) alloca(addrlen+1); + pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port); + // TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups. int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo); + lock(&prc->task.mutex); + if (!gai_error) { prc->ai = prc->addrinfo; praw_connection_maybe_connect_lh(prc); /* Start connection attempts */ - if (prc->disconnected) notify = schedule(&prc->task); + if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task)) + return true; } else { - psocket_gai_error(prc, gai_error, "connect to ", addr); - prc->disconnected = true; - notify = schedule(&prc->task); - lock(&p->task.mutex); - notify |= schedule_if_inactive(p); - unlock(&p->task.mutex); + psocket_gai_error(prc, gai_error, "connect to ", prc->taddr); } + return false; +} - /* We need to issue INACTIVE on immediate failure */ +void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) { + // Called from an arbitrary thread. Do setup prior to getaddrinfo, then switch to a worker thread. + assert(rc); + praw_connection_t *prc = containerof(rc, praw_connection_t, raw_connection); + praw_connection_init(prc, p, rc); + // TODO: check case of proactor shutting down + + lock(&prc->task.mutex); + size_t addrlen = strlen(addr); + prc->taddr = (char*) malloc(addrlen+1); + assert(prc->taddr); // TODO: memory safety + memcpy(prc->taddr, addr, addrlen+1); + prc->first_schedule = true; // Resume connection setup when next scheduled. + proactor_add(&prc->task); + bool notify = schedule(&prc->task); unlock(&prc->task.mutex); + if (notify) notify_poller(p); } @@ -394,6 +408,16 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool } int events = io_events; int fd = rc->psocket.epoll_io.fd; + + if (rc->first_schedule) { + rc->first_schedule = false; + assert(!events); // No socket yet. + assert(!rc->connected); + if (praw_connection_first_connect_lh(rc)) { + unlock(&rc->task.mutex); + return NULL; + } + } if (!rc->connected) { if (events & (EPOLLHUP | EPOLLERR)) { praw_connection_maybe_connect_lh(rc); @@ -413,6 +437,7 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool } if (events & EPOLLOUT) praw_connection_connected_lh(rc); + unlock(&rc->task.mutex); return &rc->batch; } diff --git a/c/tests/raw_wake_test.cpp b/c/tests/raw_wake_test.cpp index fee780b15..4f2f88257 100644 --- a/c/tests/raw_wake_test.cpp +++ b/c/tests/raw_wake_test.cpp @@ -275,7 +275,6 @@ TEST_CASE("proactor_raw_connection_wake") { pn_proactor_raw_connect(pn_listener_proactor(l), rc, addr.c_str()); - REQUIRE_RUN(p, PN_LISTENER_ACCEPT); REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); REQUIRE_RUN(p, PN_RAW_CONNECTION_NEED_READ_BUFFERS); CHECK(pn_proactor_get(p) == NULL); /* idle */