Skip to content

Commit

Permalink
Merge pull request #55 from flexiblepower/implement-reconnect
Browse files Browse the repository at this point in the history
Implemented optional reconnect for S2Connection & new threading model where S2Connection.start_as_rm is blocking.
  • Loading branch information
lfse-slafleur authored Sep 30, 2024
2 parents 252be44 + 185c8f8 commit 9a88744
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 45 deletions.
9 changes: 2 additions & 7 deletions examples/example_frbc_rm.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging
import sys
import threading
import uuid
import signal
import datetime
Expand Down Expand Up @@ -159,20 +158,16 @@ def deactivate(self, conn: S2Connection) -> None:
provides_forecast=False,
provides_power_measurements=[CommodityQuantity.ELECTRIC_POWER_L1],
),
reconnect=True,
)


stop_event = threading.Event()


def stop(signal_num, _current_stack_frame):
print(f"Received signal {signal_num}. Will stop S2 connection.")
stop_event.set()
s2_conn.stop()


signal.signal(signal.SIGINT, stop)
signal.signal(signal.SIGTERM, stop)

s2_conn.start_as_rm()
stop_event.wait()
s2_conn.stop()
119 changes: 81 additions & 38 deletions src/s2python/s2_connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import json
import logging
import time
import threading
import uuid
from dataclasses import dataclass
Expand Down Expand Up @@ -180,6 +181,7 @@ def register_handler(self, msg_type: Type[S2Message], handler: S2MessageHandler)

class S2Connection: # pylint: disable=too-many-instance-attributes
url: str
reconnect: bool
reception_status_awaiter: ReceptionStatusAwaiter
ws: Optional[WSConnection]
s2_parser: S2Parser
Expand All @@ -195,16 +197,20 @@ class S2Connection: # pylint: disable=too-many-instance-attributes

_eventloop: asyncio.AbstractEventLoop
_stop_event: asyncio.Event
_restart_connection_event: asyncio.Event

def __init__(
def __init__( # pylint: disable=too-many-arguments
self,
url: str,
role: EnergyManagementRole,
control_types: List[S2ControlType],
asset_details: AssetDetails,
reconnect: bool = False,
) -> None:
self.url = url
self.reconnect = reconnect
self.reception_status_awaiter = ReceptionStatusAwaiter()
self.ws = None
self.s2_parser = S2Parser()

self._handlers = MessageHandlers()
Expand All @@ -221,14 +227,13 @@ def __init__(
self._handlers.register_handler(HandshakeResponse, self.handle_handshake_response_as_rm)

def start_as_rm(self) -> None:
self._thread = threading.Thread(target=self._run_eventloop, daemon=False)
self._thread.start()
logger.debug("Started eventloop thread!")
self._run_eventloop(self._run_as_rm())

def _run_eventloop(self) -> None:
def _run_eventloop(self, main_task: Awaitable[None]) -> None:
self._thread = threading.current_thread()
logger.debug("Starting eventloop")
try:
self._eventloop.run_until_complete(self._run_as_rm())
self._eventloop.run_until_complete(main_task)
except asyncio.CancelledError:
pass
logger.debug("S2 connection thread has stopped.")
Expand Down Expand Up @@ -256,52 +261,78 @@ async def _do_stop(self) -> None:

async def _run_as_rm(self) -> None:
logger.debug("Connecting as S2 resource manager.")
self._received_messages = asyncio.Queue()

self._stop_event = asyncio.Event()
await self.connect_ws()

background_tasks = []
background_tasks.append(self._eventloop.create_task(self._receive_messages()))
background_tasks.append(self._eventloop.create_task(self._handle_received_messages()))
first_run = True

async def wait_till_stop() -> None:
await self._stop_event.wait()
while (first_run or self.reconnect) and not self._stop_event.is_set():
first_run = False
self._restart_connection_event = asyncio.Event()
await self._connect_and_run()
time.sleep(1)

background_tasks.append(self._eventloop.create_task(wait_till_stop()))
logger.debug("Finished S2 connection eventloop.")

await self.connect_as_rm()
(done, pending) = await asyncio.wait(background_tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
try:
await task
except asyncio.CancelledError:
pass
except websockets.ConnectionClosedError:
logger.info("The other party closed the websocket connection.c")
async def _connect_and_run(self) -> None:
self._received_messages = asyncio.Queue()
await self._connect_ws()
if self.ws:

for task in pending:
try:
task.cancel()
await task
except asyncio.CancelledError:
pass
async def wait_till_stop() -> None:
await self._stop_event.wait()

async def wait_till_connection_restart() -> None:
await self._restart_connection_event.wait()

background_tasks = [
self._eventloop.create_task(self._receive_messages()),
self._eventloop.create_task(wait_till_stop()),
self._eventloop.create_task(self._connect_as_rm()),
self._eventloop.create_task(wait_till_connection_restart()),
]

(done, pending) = await asyncio.wait(
background_tasks, return_when=asyncio.FIRST_COMPLETED
)
if self._current_control_type:
self._current_control_type.deactivate(self)
self._current_control_type = None

for task in done:
try:
await task
except asyncio.CancelledError:
pass
except (websockets.ConnectionClosedError, websockets.ConnectionClosedOK):
logger.info("The other party closed the websocket connection.")

for task in pending:
try:
task.cancel()
await task
except asyncio.CancelledError:
pass

if self.ws:
await self.ws.close()
await self.ws.wait_closed()
logger.debug("Finished S2 connection eventloop.")

async def connect_ws(self) -> None:
self.ws = await ws_connect(uri=self.url)
async def _connect_ws(self) -> None:
try:
self.ws = await ws_connect(uri=self.url)
except (EOFError, OSError) as e:
logger.info("Could not connect due to: %s", str(e))

async def connect_as_rm(self) -> None:
async def _connect_as_rm(self) -> None:
await self.send_msg_and_await_reception_status_async(
Handshake(
message_id=uuid.uuid4(), role=self.role, supported_protocol_versions=[S2_VERSION]
)
)
logger.debug("Send handshake to CEM. Expecting Handshake and HandshakeResponse from CEM.")

await self._handle_received_messages()

async def handle_handshake(
self, _: "S2Connection", message: S2Message, send_okay: Awaitable[None]
) -> None:
Expand Down Expand Up @@ -427,7 +458,11 @@ async def _send_and_forget(self, s2_msg: S2Message) -> None:

json_msg = s2_msg.to_json()
logger.debug("Sending message %s", json_msg)
await self.ws.send(json_msg)
try:
await self.ws.send(json_msg)
except websockets.ConnectionClosedError as e:
logger.error("Unable to send message %s due to %s", s2_msg, str(e))
self._restart_connection_event.set()

async def respond_with_reception_status(
self, subject_message_id: str, status: ReceptionStatusValues, diagnostic_label: str
Expand Down Expand Up @@ -458,9 +493,17 @@ async def send_msg_and_await_reception_status_async(
s2_msg.message_id, # type: ignore[attr-defined]
timeout_reception_status,
)
reception_status = await self.reception_status_awaiter.wait_for_reception_status(
s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined]
)
try:
reception_status = await self.reception_status_awaiter.wait_for_reception_status(
s2_msg.message_id, timeout_reception_status # type: ignore[attr-defined]
)
except TimeoutError:
logger.error(
"Did not receive a reception status on time for %s",
s2_msg.message_id, # type: ignore[attr-defined]
)
self._stop_event.set()
raise

if reception_status.status != ReceptionStatusValues.OK and raise_on_error:
raise RuntimeError(f"ReceptionStatus was not OK but rather {reception_status.status}")
Expand Down

0 comments on commit 9a88744

Please sign in to comment.