Skip to content

Commit

Permalink
async execute request
Browse files Browse the repository at this point in the history
  • Loading branch information
DerThorsten committed Mar 20, 2024
1 parent 07ce01c commit bd7eed3
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 50 deletions.
4 changes: 2 additions & 2 deletions include/xeus/xinterpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace xeus

void configure();

nl::json execute_request(xrequest_context context,
nl::json execute_request( xexecute_request_context context,
const std::string& code,
bool silent,
bool store_history,
Expand Down Expand Up @@ -101,7 +101,7 @@ namespace xeus

virtual void configure_impl() = 0;

virtual nl::json execute_request_impl(xrequest_context request_context,
virtual nl::json execute_request_impl(xexecute_request_context request_context,
int execution_counter,
const std::string& code,
bool silent,
Expand Down
13 changes: 13 additions & 0 deletions include/xeus/xrequest_context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include "xeus/xmessage.hpp" // for xmessage::guid_list
#include "xeus/xserver.hpp" // for channel

#include <functional> // for std::function

namespace nl = nlohmann;

namespace xeus
Expand All @@ -38,6 +40,17 @@ namespace xeus
channel m_origin;
guid_list m_id;
};


class XEUS_API xexecute_request_context : public xrequest_context
{
public:
xexecute_request_context(nl::json header, channel origin, guid_list id, std::function<void(const xexecute_request_context&,nl::json)> on_send_reply);

void send_reply(nl::json reply);
private:
std::function<void(const xexecute_request_context&, nl::json)> m_on_send_reply;
};
}

#endif
2 changes: 1 addition & 1 deletion src/xinterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace xeus
configure_impl();
}

nl::json xinterpreter::execute_request(xrequest_context context,
nl::json xinterpreter::execute_request(xexecute_request_context context,
const std::string& code,
bool silent,
bool store_history,
Expand Down
90 changes: 50 additions & 40 deletions src/xkernel_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ namespace xeus
, p_history_manager(history_manager)
, p_debugger(debugger)
{
// Request handlers
m_handler["execute_request"] = &xkernel_core::execute_request;
m_handler["complete_request"] = &xkernel_core::complete_request;
m_handler["inspect_request"] = &xkernel_core::inspect_request;
m_handler["history_request"] = &xkernel_core::history_request;
m_handler["is_complete_request"] = &xkernel_core::is_complete_request;
m_handler["comm_info_request"] = &xkernel_core::comm_info_request;
m_handler["comm_open"] = &xkernel_core::comm_open;
m_handler["comm_close"] = &xkernel_core::comm_close;
m_handler["comm_msg"] = &xkernel_core::comm_msg;
m_handler["kernel_info_request"] = &xkernel_core::kernel_info_request;
m_handler["shutdown_request"] = &xkernel_core::shutdown_request;
m_handler["interrupt_request"] = &xkernel_core::interrupt_request;
m_handler["debug_request"] = &xkernel_core::debug_request;
// Request handlers (all but execute_request are blocking)
m_handler["execute_request"] = handler_type{&xkernel_core::execute_request, /*blocking*/ false};
m_handler["complete_request"] = handler_type{&xkernel_core::complete_request, true};
m_handler["inspect_request"] = handler_type{&xkernel_core::inspect_request, true};
m_handler["history_request"] = handler_type{&xkernel_core::history_request, true};
m_handler["is_complete_request"] = handler_type{&xkernel_core::is_complete_request, true};
m_handler["comm_info_request"] = handler_type{&xkernel_core::comm_info_request, true};
m_handler["comm_open"] = handler_type{&xkernel_core::comm_open, true};
m_handler["comm_close"] = handler_type{&xkernel_core::comm_close, true};
m_handler["comm_msg"] = handler_type{&xkernel_core::comm_msg, true};
m_handler["kernel_info_request"] = handler_type{&xkernel_core::kernel_info_request, true};
m_handler["shutdown_request"] = handler_type{&xkernel_core::shutdown_request, true};
m_handler["interrupt_request"] = handler_type{&xkernel_core::interrupt_request, true};
m_handler["debug_request"] = handler_type{&xkernel_core::debug_request, true};

// Server bindings
p_server->register_shell_listener(std::bind(&xkernel_core::dispatch_shell, this, _1));
Expand Down Expand Up @@ -195,7 +195,7 @@ namespace xeus

std::string msg_type = header.value("msg_type", "");
handler_type handler = get_handler(msg_type);
if (handler == nullptr)
if (handler.fptr == nullptr)
{
std::cerr << "ERROR: received unknown message" << std::endl;
std::cerr << "Message type: " << msg_type << std::endl;
Expand All @@ -204,22 +204,25 @@ namespace xeus
{
try
{
(this->*handler)(std::move(msg), c);
(this->*(handler.fptr))(std::move(msg), c);
}
catch (std::exception& e)
{
std::cerr << "ERROR: received bad message: " << e.what() << std::endl;
std::cerr << "Message type: " << msg_type << std::endl;
}
}

publish_status(header, "idle", c);
// async handlers need to set the idle status themselves
if(handler.blocking)
{
publish_status(header, "idle", c);
}
}

auto xkernel_core::get_handler(const std::string& msg_type) -> handler_type
{
auto iter = m_handler.find(msg_type);
handler_type res = (iter == m_handler.end()) ? nullptr : iter->second;
handler_type res = (iter == m_handler.end()) ? handler_type{nullptr} : iter->second;
return res;
}

Expand All @@ -237,29 +240,36 @@ namespace xeus
bool stop_on_error = content.value("stop_on_error", false);

nl::json metadata = get_metadata();
xrequest_context request_context(request.header(), c, request.identities());
nl::json reply = p_interpreter->execute_request(std::move(request_context),
xexecute_request_context request_context(request.header(), c, request.identities(),
[this, silent, store_history, code, stop_on_error](const xexecute_request_context & ctx , nl::json reply)
{
this->send_reply(ctx.id(), "execute_reply", ctx.header(), nl::json::object(), std::move(reply), ctx.origin());

int execution_count = reply.value("execution_count", 1);
std::string status = reply.value("status", "error");


if (!silent && store_history)
{
this->p_history_manager->store_inputs(0, execution_count, code);
}

if (!silent && status == "error" && stop_on_error)
{
constexpr long polling_interval = 50;
p_server->abort_queue(std::bind(&xkernel_core::abort_request, this, _1), polling_interval);
}


// idle
publish_status("idle", ctx.header(), ctx.origin());

}
);

p_interpreter->execute_request(std::move(request_context),
code, silent, store_history, std::move(user_expression), allow_stdin);
int execution_count = reply.value("execution_count", 1);
std::string status = reply.value("status", "error");
send_reply(
request.identities(),
"execute_reply",
request.header(),
std::move(metadata),
std::move(reply),
c);

if (!silent && store_history)
{
p_history_manager->store_inputs(0, execution_count, code);
}

if (!silent && status == "error" && stop_on_error)
{
constexpr long polling_interval = 50;
p_server->abort_queue(std::bind(&xkernel_core::abort_request, this, _1), polling_interval);
}
}
catch (std::exception& e)
{
Expand Down
7 changes: 6 additions & 1 deletion src/xkernel_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ namespace xeus

private:

using handler_type = void (xkernel_core::*)(xmessage, channel);
using handler_fptr_type = void (xkernel_core::*)(xmessage, channel);

struct handler_type{
handler_fptr_type fptr = nullptr;
bool blocking = true;
};


void dispatch(xmessage msg, channel c);
Expand Down
4 changes: 2 additions & 2 deletions src/xmock_interpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ namespace xeus
{
}

nl::json execute_request_impl(xrequest_context /*request_context*/,
nl::json execute_request_impl(xexecute_request_context request_context,
int /*execution_counter*/,
const std::string& /*code*/,
bool /*silent*/,
bool /*store_history*/,
nl::json /*user_expressions*/,
bool /*allow_stdin*/) override
{
return nl::json();
request_context.send_reply(nl::json());
}

nl::json complete_request_impl(const std::string& /*code*/, int /*cursor_pos*/) override
Expand Down
18 changes: 18 additions & 0 deletions src/xrequest_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,22 @@ namespace xeus
{
return m_id;
}


void xexecute_request_context::send_reply(nl::json reply)
{
m_on_send_reply(*this, std::move(reply));
}


xexecute_request_context::xexecute_request_context(nl::json header,
channel origin,
guid_list id,
std::function<void(const xexecute_request_context&,nl::json)> on_send_reply)
: xrequest_context(std::move(header),
origin,
std::move(id)),
m_on_send_reply(std::move(on_send_reply))
{
}
}
6 changes: 3 additions & 3 deletions test/xmock_interpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace xeus
using function_type = std::function<void(xeus::xcomm&&, const xeus::xmessage&)>;
}

nl::json xmock_interpreter::execute_request_impl(xrequest_context request_context,
nl::json xmock_interpreter::execute_request_impl(xexecute_request_context ctx,
int execution_counter,
const std::string& code,
bool /* silent */,
Expand Down Expand Up @@ -62,14 +62,14 @@ namespace xeus
{"start", 0}
});

return xeus::create_successful_reply(payload);
ctx.send_reply(xeus::create_successful_reply(payload));
}

nl::json pub_data;
pub_data["text/plain"] = code;
publish_execution_result(request_context, execution_counter, std::move(pub_data), nl::json::object());

return xeus::create_successful_reply();
ctx.send_reply( xeus::create_successful_reply());
}

nl::json xmock_interpreter::complete_request_impl(const std::string& /* code */,
Expand Down
2 changes: 1 addition & 1 deletion test/xmock_interpreter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ namespace xeus

void configure_impl() override;

nl::json execute_request_impl(xrequest_context request_context,
nl::json execute_request_impl(xexecute_request_context request_context,
int execution_counter,
const std::string& code,
bool silent,
Expand Down

0 comments on commit bd7eed3

Please sign in to comment.