You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Describe the usage question you have. Please include as many useful details as possible.
I have a do_exchange() method that needs to interact with async code that's an external dependency where I don't have any control of it whatsoever. Unfortunately, my understanding around concurrency/multi-threading isn't the best. I do know though that mixing multi-threaded with async code isn't a recommended approach, if there's any alternative please let me know.
I'm creating a new event loop inside the do_exchange method, so when multiple requests arrive the server spawns up multiple threads as expected, and each thread should (?) have its own event loop. Whenever the external async code is called, I'm using an async lock to make sure no multiple threads access it simultaneously since it has stateful variables that I don't want to change at the same time (and it's not thread-safe anyways).
defdo_exchange(
self,
context: flight.ServerCallContext,
descriptor: flight.FlightDescriptor,
reader: flight.FlightStreamReader,
writer: flight.FlightStreamWriter,
) ->None:
"""This method implements the `do_exchange` method of the FlightServerBase class. :param context: A ServerCallContext object. :param descriptor: A FlightDescriptor object. :param reader: A FlightStreamReader object. :param writer: A FlightStreamWriter object. """loop=asyncio.new_event_loop()
asyncio.set_event_loop(loop)
is_first_batch=TruewhileTrue:
logger.info("Processing data...")
(writer, reader, is_first_batch) =loop.run_until_complete(
self._run_inference_and_write_to_stream(writer, reader, is_first_batch),
)
logger.info("Output data ready to be consumed.")
however when two client requests arrive at the same time I'm getting:
pyarrow._flight.FlightServerError: Task <Task pending name='Task-2' coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at /Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157> cb=[_run_until_complete_cb() at /Users/panagiotisvardanis/.pyenv/versions/3.8.19/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop. Detail: Python exception: RuntimeError. gRPC client debug context: UNKNOWN:Error received from peer ipv4:0.0.0.0:8080 {created_time:"2024-09-16T15:44:17.886048+02:00", grpc_status:2, grpc_message:"Task <Task pending name=\'Task-2\' coro=<ArrowFlightAsyncServer._run_inference_and_write_to_stream() running at /Users/panagiotisvardanis/Documents/wallaroo/projects/platform/conductor/model-auto-conversion/mac/mac/service/arrow_flight/async_server.py:157> cb=[_run_until_complete_cb() at /Users/panagiotisvardanis/.pyenv/versions/3.8.19/lib/python3.8/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop. Detail: Python exception: RuntimeError"}. Client context: OK
I do see in the logs that two threads are spawned indeed, but when they both reach the external async code the above error is called.
-------------------------------- live log call ---------------------------------
INFO Processing data...
INFO Processing data...
INFO Is `PythonStep` awaitable? True
INFO Is `PythonStep` awaitable? True
INFO Acquiring lock...
INFO Acquiring lock...
INFO Sleeping for 1 second...
INFO Sleeping for 1 second...
FAILED [100%]
------------------------------ live log teardown -------------------------------
INFO Done sleeping.
INFO Releasing lock...
INFO Converting `InferenceData` to `pa.RecordBatch`...
If I remove the async lock from process_data(), it waits forever.
Component(s)
FlightRPC, Python
The text was updated successfully, but these errors were encountered:
Describe the usage question you have. Please include as many useful details as possible.
I have a
do_exchange()
method that needs to interact withasync
code that's an external dependency where I don't have any control of it whatsoever. Unfortunately, my understanding around concurrency/multi-threading isn't the best. I do know though that mixing multi-threaded withasync
code isn't a recommended approach, if there's any alternative please let me know.I'm creating a new event loop inside the
do_exchange
method, so when multiple requests arrive the server spawns up multiple threads as expected, and each thread should (?) have its own event loop. Whenever the externalasync
code is called, I'm using an async lock to make sure no multiple threads access it simultaneously since it has stateful variables that I don't want to change at the same time (and it's not thread-safe anyways).however when two client requests arrive at the same time I'm getting:
the "rough" async code looks like this:
I do see in the logs that two threads are spawned indeed, but when they both reach the external async code the above error is called.
If I remove the async lock from
process_data()
, it waits forever.Component(s)
FlightRPC, Python
The text was updated successfully, but these errors were encountered: