Skip to content

Commit

Permalink
PROTON-2818: clearer code and comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
cliffjansen committed May 6, 2024
1 parent 079bf64 commit 7a035ae
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 20 deletions.
14 changes: 7 additions & 7 deletions c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ static void write_flush(pconnection_t *pc) {

static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);
static void pconnection_first_connect_lh(pconnection_t *pc);
static bool pconnection_first_connect_lh(pconnection_t *pc);

static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool sched_ready, bool topup) {
bool waking = false;
Expand All @@ -1142,14 +1142,10 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
if (sched_ready) schedule_done(&pc->task);

if (pc->first_schedule) {
// Normal case: resumed logic from pn_proactor_connect2.
// But possible tie: pn_connection_wake() or pn_proactor_disconnect().
// Respect the latter. Check former after connect attempt.
pc->first_schedule = false;
assert(!topup && !events);
if (!pc->queued_disconnect) {
pconnection_first_connect_lh(pc); // Drops and reaquires lock. Wake or disconnect state may change.
if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task)) {
if (pconnection_first_connect_lh(pc)) {
unlock(&pc->task.mutex);
return NULL;
}
Expand Down Expand Up @@ -1435,7 +1431,8 @@ bool schedule_if_inactive(pn_proactor_t *p) {
}

// Call from pconnection_process with task lock held.
static void pconnection_first_connect_lh(pconnection_t *pc) {
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool pconnection_first_connect_lh(pconnection_t *pc) {
unlock(&pc->task.mutex);
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(pc->host, pc->port, 0, &pc->addrinfo);
Expand All @@ -1444,9 +1441,12 @@ static void pconnection_first_connect_lh(pconnection_t *pc) {
if (!gai_error) {
pc->ai = pc->addrinfo;
pconnection_maybe_connect_lh(pc); /* Start connection attempts */
if (pc->psocket.epoll_io.fd != -1 && !pc->queued_disconnect && !pni_task_wake_pending(&pc->task))
return true;
} else {
psocket_gai_error(&pc->psocket, gai_error, "connect to ");
}
return false;
}

void pn_proactor_connect2(pn_proactor_t *p, pn_connection_t *c, pn_transport_t *t, const char *addr) {
Expand Down
29 changes: 16 additions & 13 deletions c/src/proactor/epoll_raw_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,28 @@ pn_raw_connection_t *pn_raw_connection(void) {
}

// Call from pconnection_process with task lock held.
static void praw_connection_first_connect_lh(praw_connection_t *prc) {
// Return true if the socket is connecting and there are no Proton events to deliver.
static bool praw_connection_first_connect_lh(praw_connection_t *prc) {
const char *host;
const char *port;

unlock(&prc->task.mutex);
size_t addrlen = strlen(prc->taddr);
char *addr_buf = (char*) alloca(addrlen+1);
pni_parse_addr(prc->taddr, addr_buf, addrlen+1, &host, &port);
// TODO: move this step to a separate worker thread that scales in response to multiple blocking DNS lookups.
int gai_error = pgetaddrinfo(host, port, 0, &prc->addrinfo);
lock(&prc->task.mutex);

if (!gai_error) {
prc->ai = prc->addrinfo;
praw_connection_maybe_connect_lh(prc); /* Start connection attempts */
if (prc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&prc->task))
return true;
} else {
psocket_gai_error(prc, gai_error, "connect to ", prc->taddr);
}
return false;
}

void pn_proactor_raw_connect(pn_proactor_t *p, pn_raw_connection_t *rc, const char *addr) {
Expand Down Expand Up @@ -403,6 +408,16 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
int events = io_events;
int fd = rc->psocket.epoll_io.fd;

if (rc->first_schedule) {
rc->first_schedule = false;
assert(!events); // No socket yet.
assert(!rc->connected);
if (praw_connection_first_connect_lh(rc)) {
unlock(&rc->task.mutex);
return NULL;
}
}
if (!rc->connected) {
if (events & (EPOLLHUP | EPOLLERR)) {
praw_connection_maybe_connect_lh(rc);
Expand All @@ -422,18 +437,6 @@ pn_event_batch_t *pni_raw_connection_process(task_t *t, uint32_t io_events, bool
}
if (events & EPOLLOUT)
praw_connection_connected_lh(rc);
if (rc->first_schedule) {
// Normal case: resumed logic from pn_proactor_raw_connect.
// But possible tie: pn_raw_connection_wake()
// Defer wake check until getaddrinfo is done.
rc->first_schedule = false;
assert(!events); // No socket yet.
praw_connection_first_connect_lh(rc); // Drops and reacquires lock.
if (rc->psocket.epoll_io.fd != -1 && !pni_task_wake_pending(&rc->task)) {
unlock(&rc->task.mutex);
return NULL;
}
}

unlock(&rc->task.mutex);
return &rc->batch;
Expand Down

0 comments on commit 7a035ae

Please sign in to comment.