Skip to content

Commit

Permalink
Merge pull request #49 from DiamondLightSource/event-loop
Browse files Browse the repository at this point in the history
Enable controller initialisation on main event loop
  • Loading branch information
GDYendell committed Aug 8, 2024
2 parents 178eb0a + 29b552f commit 8e04689
Show file tree
Hide file tree
Showing 13 changed files with 358 additions and 217 deletions.
194 changes: 111 additions & 83 deletions src/fastcs/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,79 +2,68 @@
from collections import defaultdict
from collections.abc import Callable
from types import MethodType
from typing import Any

from softioc.asyncio_dispatcher import AsyncioDispatcher

from .attributes import AttrR, AttrW, Sender, Updater
from .controller import Controller
from .exceptions import FastCSException
from .mapping import Mapping, SingleMapping


def _get_initial_tasks(mapping: Mapping) -> list[Callable]:
initial_tasks: list[Callable] = []
initial_tasks.append(mapping.controller.connect)
return initial_tasks


def _create_periodic_scan_task(period, methods: list[Callable]) -> Callable:
async def scan_task() -> None:
while True:
await asyncio.gather(*[method() for method in methods])
await asyncio.sleep(period)

return scan_task


def _get_periodic_scan_tasks(scan_dict: dict[float, list[Callable]]) -> list[Callable]:
periodic_scan_tasks: list[Callable] = []
for period, methods in scan_dict.items():
periodic_scan_tasks.append(_create_periodic_scan_task(period, methods))

return periodic_scan_tasks


def _add_scan_method_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for method in single_mapping.scan_methods.values():
scan_dict[method.period].append(
MethodType(method.fn, single_mapping.controller)
class Backend:
_initial_tasks: list[Callable] = []
_context: dict[str, Any] = {}

def __init__(
self, controller: Controller, loop: asyncio.AbstractEventLoop | None = None
):
self._dispatcher = AsyncioDispatcher(loop)
self._loop = self._dispatcher.loop
self._controller = controller

self._initial_tasks.append(controller.connect)

asyncio.run_coroutine_threadsafe(
self._controller.initialise(), self._loop
).result()

self._mapping = Mapping(self._controller)
self._link_process_tasks()

self._context.update(
{
"dispatcher": self._dispatcher,
"controller": self._controller,
"mapping": self._mapping,
}
)

def _link_process_tasks(self):
for single_mapping in self._mapping.get_controller_mappings():
_link_single_controller_put_tasks(single_mapping)
_link_attribute_sender_class(single_mapping)

def _create_updater_callback(attribute, controller):
async def callback():
try:
await attribute.updater.update(controller, attribute)
except Exception as e:
print(
f"Update loop in {attribute.updater} stopped:\n"
f"{e.__class__.__name__}: {e}"
)
raise

return callback

def run(self):
self._run_initial_tasks()
self._start_scan_tasks()

def _add_attribute_updater_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for attribute in single_mapping.attributes.values():
match attribute:
case AttrR(updater=Updater(update_period=update_period)) as attribute:
callback = _create_updater_callback(
attribute, single_mapping.controller
)
scan_dict[update_period].append(callback)
self._run()

def _run_initial_tasks(self):
for task in self._initial_tasks:
future = asyncio.run_coroutine_threadsafe(task(), self._loop)
future.result()

def _get_scan_tasks(mapping: Mapping) -> list[Callable]:
scan_dict: dict[float, list[Callable]] = defaultdict(list)
def _start_scan_tasks(self):
scan_tasks = _get_scan_tasks(self._mapping)

for single_mapping in mapping.get_controller_mappings():
_add_scan_method_tasks(scan_dict, single_mapping)
_add_attribute_updater_tasks(scan_dict, single_mapping)
for task in scan_tasks:
asyncio.run_coroutine_threadsafe(task(), self._loop)

scan_tasks = _get_periodic_scan_tasks(scan_dict)
return scan_tasks
def _run(self):
raise NotImplementedError("Specific Backend must implement _run")


def _link_single_controller_put_tasks(single_mapping: SingleMapping) -> None:
Expand All @@ -94,13 +83,6 @@ def _link_single_controller_put_tasks(single_mapping: SingleMapping) -> None:
)


def _create_sender_callback(attribute, controller):
async def callback(value):
await attribute.sender.put(controller, attribute, value)

return callback


def _link_attribute_sender_class(single_mapping: SingleMapping) -> None:
for attr_name, attribute in single_mapping.attributes.items():
match attribute:
Expand All @@ -113,25 +95,71 @@ def _link_attribute_sender_class(single_mapping: SingleMapping) -> None:
attribute.set_process_callback(callback)


class Backend:
def __init__(self, mapping: Mapping, loop: asyncio.AbstractEventLoop):
self._mapping = mapping
self._loop = loop
def _create_sender_callback(attribute, controller):
async def callback(value):
await attribute.sender.put(controller, attribute, value)

def link_process_tasks(self):
for single_mapping in self._mapping.get_controller_mappings():
_link_single_controller_put_tasks(single_mapping)
_link_attribute_sender_class(single_mapping)
return callback

def run_initial_tasks(self):
initial_tasks = _get_initial_tasks(self._mapping)

for task in initial_tasks:
future = asyncio.run_coroutine_threadsafe(task(), self._loop)
future.result()
def _get_scan_tasks(mapping: Mapping) -> list[Callable]:
scan_dict: dict[float, list[Callable]] = defaultdict(list)

def start_scan_tasks(self):
scan_tasks = _get_scan_tasks(self._mapping)
for single_mapping in mapping.get_controller_mappings():
_add_scan_method_tasks(scan_dict, single_mapping)
_add_attribute_updater_tasks(scan_dict, single_mapping)

for task in scan_tasks:
asyncio.run_coroutine_threadsafe(task(), self._loop)
scan_tasks = _get_periodic_scan_tasks(scan_dict)
return scan_tasks


def _add_scan_method_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for method in single_mapping.scan_methods.values():
scan_dict[method.period].append(
MethodType(method.fn, single_mapping.controller)
)


def _add_attribute_updater_tasks(
scan_dict: dict[float, list[Callable]], single_mapping: SingleMapping
):
for attribute in single_mapping.attributes.values():
match attribute:
case AttrR(updater=Updater(update_period=update_period)) as attribute:
callback = _create_updater_callback(
attribute, single_mapping.controller
)
scan_dict[update_period].append(callback)


def _create_updater_callback(attribute, controller):
async def callback():
try:
await attribute.updater.update(controller, attribute)
except Exception as e:
print(
f"Update loop in {attribute.updater} stopped:\n"
f"{e.__class__.__name__}: {e}"
)
raise

return callback


def _get_periodic_scan_tasks(scan_dict: dict[float, list[Callable]]) -> list[Callable]:
periodic_scan_tasks: list[Callable] = []
for period, methods in scan_dict.items():
periodic_scan_tasks.append(_create_periodic_scan_task(period, methods))

return periodic_scan_tasks


def _create_periodic_scan_task(period, methods: list[Callable]) -> Callable:
async def scan_task() -> None:
while True:
await asyncio.gather(*[method() for method in methods])
await asyncio.sleep(period)

return scan_task
31 changes: 7 additions & 24 deletions src/fastcs/backends/asyncio_backend.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,13 @@
from softioc import asyncio_dispatcher, softioc
from softioc import softioc

from fastcs.backend import Backend
from fastcs.mapping import Mapping
from fastcs.controller import Controller


class AsyncioBackend:
def __init__(self, mapping: Mapping):
self._mapping = mapping

def run_interactive_session(self):
# Create an asyncio dispatcher; the event loop is now running
dispatcher = asyncio_dispatcher.AsyncioDispatcher()

backend = Backend(self._mapping, dispatcher.loop)

backend.link_process_tasks()
backend.run_initial_tasks()
backend.start_scan_tasks()
class AsyncioBackend(Backend):
def __init__(self, controller: Controller): # noqa: F821
super().__init__(controller)

def _run(self):
# Run the interactive shell
global_variables = globals()
global_variables.update(
{
"dispatcher": dispatcher,
"mapping": self._mapping,
"controller": self._mapping.controller,
}
)
softioc.interactive_ioc(globals())
softioc.interactive_ioc(self._context)
23 changes: 12 additions & 11 deletions src/fastcs/backends/epics/backend.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
from fastcs.mapping import Mapping
from fastcs.backend import Backend
from fastcs.controller import Controller

from .docs import EpicsDocs, EpicsDocsOptions
from .gui import EpicsGUI, EpicsGUIOptions
from .ioc import EpicsIOC
from .ioc import EpicsIOC, EpicsIOCOptions


class EpicsBackend:
def __init__(self, mapping: Mapping, pv_prefix: str = "MY-DEVICE-PREFIX"):
self._mapping = mapping
class EpicsBackend(Backend):
def __init__(self, controller: Controller, pv_prefix: str = "MY-DEVICE-PREFIX"):
super().__init__(controller)

self._pv_prefix = pv_prefix
self._ioc = EpicsIOC(pv_prefix, self._mapping)

def create_docs(self, options: EpicsDocsOptions | None = None) -> None:
docs = EpicsDocs(self._mapping)
docs.create_docs(options)
EpicsDocs(self._mapping).create_docs(options)

def create_gui(self, options: EpicsGUIOptions | None = None) -> None:
gui = EpicsGUI(self._mapping, self._pv_prefix)
gui.create_gui(options)
EpicsGUI(self._mapping, self._pv_prefix).create_gui(options)

def get_ioc(self) -> EpicsIOC:
return EpicsIOC(self._mapping, self._pv_prefix)
def _run(self, options: EpicsIOCOptions | None = None):
self._ioc.run(self._dispatcher, self._context, options)
Loading

0 comments on commit 8e04689

Please sign in to comment.