diff --git a/cpp/data/common/include/EigerDefinitions.h b/cpp/data/common/include/EigerDefinitions.h index 8a834be..f054bfe 100644 --- a/cpp/data/common/include/EigerDefinitions.h +++ b/cpp/data/common/include/EigerDefinitions.h @@ -43,8 +43,9 @@ namespace Eiger { // EigerFan related constants const int MORE_MESSAGES = 1; - const int RECEIVE_HWM = 100000; + const int RECEIVE_HWM = 100000; // High water marks for the main receiver thread const int SEND_HWM = 100000; + const int WORKER_SEND_HWM = 10000; // A lower high water mark for the worker threads const int LINGER_TIMEOUT = 100; // Socket linger timeout in milliseconds const std::string CONTROL_CMD_KEY = "msg_val"; diff --git a/cpp/data/eigerfan/src/MultiPullBroker.cpp b/cpp/data/eigerfan/src/MultiPullBroker.cpp index 8fc1cd1..89f2cb8 100644 --- a/cpp/data/eigerfan/src/MultiPullBroker.cpp +++ b/cpp/data/eigerfan/src/MultiPullBroker.cpp @@ -54,7 +54,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // threads on the context is not sufficient. zmq::context_t source_context(1); zmq::socket_t source_socket(source_context, ZMQ_PULL); - source_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); + source_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM)); source_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); source_socket.connect(endpoint.c_str()); @@ -62,7 +62,7 @@ void MultiPullBroker::worker_loop(std::string& endpoint) { // The sink sockets must use the context from the main thread to use the inproc:// // protocol. If it uses a different context the client will not see the messages. zmq::socket_t sink_socket(*this->inproc_context_, ZMQ_PUSH); - sink_socket.setsockopt(ZMQ_SNDHWM, &SEND_HWM, sizeof(SEND_HWM)); + sink_socket.setsockopt(ZMQ_SNDHWM, &WORKER_SEND_HWM, sizeof(WORKER_SEND_HWM)); sink_socket.setsockopt(ZMQ_LINGER, &LINGER_TIMEOUT, sizeof(LINGER_TIMEOUT)); sink_socket.connect(this->sink_endpoint_.c_str());