Skip to content

Commit

Permalink
Use a different, lower HWM for the worker threads
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesOHeaDLS committed Sep 9, 2024
1 parent 1f1b749 commit 0932403
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
3 changes: 2 additions & 1 deletion cpp/data/common/include/EigerDefinitions.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ namespace Eiger {

// EigerFan related constants
const int MORE_MESSAGES = 1;
const int RECEIVE_HWM = 100000;
const int RECEIVE_HWM = 100000; // High water marks for the main receiver thread
const int SEND_HWM = 100000;
const int WORKER_SEND_HWM = 10000; // A lower high water mark for the worker threads
const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds

const std::string CONTROL_CMD_KEY = "msg_val";
Expand Down
4 changes: 2 additions & 2 deletions cpp/data/eigerfan/src/MultiPullBroker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ void MultiPullBroker::worker_loop(std::string& endpoint) {
// threads on the context is not sufficient.
zmq::context_t source_context(1);
zmq::socket_t source_socket(source_context, ZMQ_PULL);
source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM));
source_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM));
source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
source_socket.connect(endpoint.c_str());

// Create sink socket in context of main thread
// The sink sockets must use the context from the main thread to use the inproc://
// protocol. If it uses a different context the client will not see the messages.
zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH);
sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM));
sink_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM));
sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT));
sink_socket.connect(this->sink_endpoint_.c_str());

Expand Down

0 comments on commit 0932403

Please sign in to comment.