From 5ab9de3acb52980a0a96c44b8235690381ee866c Mon Sep 17 00:00:00 2001
From: Mathias Lohne
Date: Tue, 24 Sep 2024 09:19:30 +0200
Subject: [PATCH] Scheduler implementation (#363)

Proposal for a new scheduler. Inspired by the apscheduler, but with two
important changes: Respecting a cancellation token both in the main loop
and in sleeps, and allowing tasks to be triggered manually while still
respecting the no overlap rule.
---
Reviewer revision of the original patch:
- _run_job marks a job as running in the same critical section as the
  overlap check, instead of later in the worker thread.
- IntervalSchedule rejects non-positive intervals and catches up
  arithmetically instead of looping.
- schedule_task raises ValueError for unknown schedule configs.
- test_overlapping_schedules compares the third call against the second.
- Not changed: approx() is used with its default relative tolerance, which
  on Unix timestamps is far wider than the intervals under test.
The blob ids on the "index" lines are carried over unchanged.

 .../unstable/configuration/models.py          |  13 ++
 .../unstable/scheduling/__init__.py           |   3 +
 .../unstable/scheduling/_scheduler.py         | 168 ++++++++++++++++++
 .../unstable/scheduling/_schedules.py         |  44 +++++
 pyproject.toml                                |   1 +
 tests/test_unstable/test_scheduler.py         | 146 +++++++++++++++
 6 files changed, 375 insertions(+)
 create mode 100644 cognite/extractorutils/unstable/scheduling/__init__.py
 create mode 100644 cognite/extractorutils/unstable/scheduling/_scheduler.py
 create mode 100644 cognite/extractorutils/unstable/scheduling/_schedules.py
 create mode 100644 tests/test_unstable/test_scheduler.py

diff --git a/cognite/extractorutils/unstable/configuration/models.py b/cognite/extractorutils/unstable/configuration/models.py
index f45120f9..dc1ffe2d 100644
--- a/cognite/extractorutils/unstable/configuration/models.py
+++ b/cognite/extractorutils/unstable/configuration/models.py
@@ -201,6 +201,19 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
         return CogniteClient(client_config)
 
 
+class CronConfig(ConfigModel):
+    type: Literal["cron"]
+    expression: str
+
+
+class IntervalConfig(ConfigModel):
+    type: Literal["interval"]
+    expression: TimeIntervalConfig
+
+
+ScheduleConfig = Annotated[CronConfig | IntervalConfig, Field(discriminator="type")]
+
+
 class LogLevel(Enum):
     CRITICAL = "CRITICAL"
     ERROR = "ERROR"
diff --git a/cognite/extractorutils/unstable/scheduling/__init__.py b/cognite/extractorutils/unstable/scheduling/__init__.py
new file mode 100644
index 00000000..5f95312d
--- /dev/null
+++ b/cognite/extractorutils/unstable/scheduling/__init__.py
@@ -0,0 +1,3 @@
+from ._scheduler import TaskScheduler
+
+__all__ = ["TaskScheduler"]
diff --git a/cognite/extractorutils/unstable/scheduling/_scheduler.py b/cognite/extractorutils/unstable/scheduling/_scheduler.py
new file mode 100644
index 00000000..bb578bcb
--- /dev/null
+++ b/cognite/extractorutils/unstable/scheduling/_scheduler.py
@@ -0,0 +1,168 @@
+from dataclasses import dataclass
+from logging import getLogger
+from threading import RLock, Thread
+from time import time
+from typing import Callable
+
+import arrow
+from humps import pascalize
+
+from cognite.extractorutils.threading import CancellationToken
+from cognite.extractorutils.unstable.configuration.models import CronConfig, IntervalConfig, ScheduleConfig
+from cognite.extractorutils.unstable.scheduling._schedules import CronSchedule, IntervalSchedule, Schedule
+
+
+@dataclass
+class Job:
+    """A named task together with the schedule that decides when it runs."""
+
+    name: str
+    call: Callable[[], None]
+    schedule: Schedule
+
+    def __hash__(self) -> int:
+        # Hash on the name only, so that jobs can be kept in the set of running jobs
+        return hash(self.name)
+
+
+class TaskScheduler:
+    """
+    Runs named tasks on cron or interval schedules, each run in its own thread.
+
+    A task is never started while an earlier run of the same task is still in progress, whether the new run
+    comes from the schedule or from a manual call to ``trigger``.
+    """
+
+    def __init__(self, cancellation_token: CancellationToken) -> None:
+        self._cancellation_token = cancellation_token
+        self._jobs: dict[str, Job] = {}
+        self._jobs_lock = RLock()
+        self._running: set[Job] = set()
+        self._running_lock = RLock()
+
+        self._logger = getLogger(__name__)
+
+    def schedule_task(self, name: str, schedule: ScheduleConfig, task: Callable[[], None]) -> None:
+        """
+        Register ``task`` under ``name``, to be run according to ``schedule``.
+
+        Scheduling under a name that is already in use replaces the earlier task.
+
+        Raises:
+            ValueError: If ``schedule`` is neither a cron nor an interval config.
+        """
+        parsed_schedule: Schedule
+        match schedule:
+            case CronConfig() as cron_config:
+                parsed_schedule = CronSchedule(expression=cron_config.expression)
+
+            case IntervalConfig() as interval_config:
+                parsed_schedule = IntervalSchedule(interval=interval_config.expression.seconds)
+
+            case _:
+                # Without a fallback, an unknown config would surface as an UnboundLocalError below
+                raise ValueError(f"Unsupported schedule config: {schedule!r}")
+
+        with self._jobs_lock:
+            self._jobs[name] = Job(name=name, call=task, schedule=parsed_schedule)
+
+    def _get_next(self) -> list[Job]:
+        """Return the job(s) with the earliest upcoming run time, or an empty list if no tasks are scheduled."""
+        # Read the jobs only while holding the lock, including the check for whether there are any
+        with self._jobs_lock:
+            next_runs = [(job.schedule.next(), job) for job in self._jobs.values()]
+
+        if not next_runs:
+            return []
+
+        earliest = min(next_time for next_time, _ in next_runs)
+        return [job for next_time, job in next_runs if next_time == earliest]
+
+    def _run_job(self, job: Job) -> bool:
+        """
+        Start ``job`` in a new thread, unless an earlier run of it is still in progress.
+
+        Returns:
+            True if the job was started, False if it was skipped because it is already running.
+        """
+        with self._running_lock:
+            if job in self._running:
+                self._logger.warning(f"Job {job.name} already running")
+                return False
+
+            # Mark the job as running in the same critical section as the check. If the worker thread did this
+            # itself, two triggers close together could both pass the check and run the job concurrently.
+            self._running.add(job)
+
+        def wrap() -> None:
+            try:
+                job.call()
+
+                self._logger.info(f"Job {job.name} done. Next run at {arrow.get(job.schedule.next()).isoformat()}")
+
+            finally:
+                with self._running_lock:
+                    self._running.discard(job)
+
+        try:
+            Thread(target=wrap, name=f"Run{pascalize(job.name)}").start()
+        except Exception:
+            # The worker never started, so it will never clear the running mark itself
+            with self._running_lock:
+                self._running.discard(job)
+            raise
+
+        return True
+
+    def trigger(self, name: str) -> bool:
+        """
+        Manually start the task registered under ``name``, unless it is already running.
+
+        Returns:
+            True if the task was started, False if it was skipped because it is already running.
+
+        Raises:
+            KeyError: If no task is registered under ``name``.
+        """
+        with self._jobs_lock:
+            job = self._jobs[name]
+
+        return self._run_job(job)
+
+    def run(self) -> None:
+        """
+        Run the scheduling loop in the calling thread until the cancellation token is cancelled.
+
+        Raises:
+            ValueError: If no tasks have been scheduled.
+        """
+        # Iterate over a snapshot, so that a concurrent schedule_task can't resize the dict mid-iteration
+        with self._jobs_lock:
+            jobs = list(self._jobs.values())
+
+        if not jobs:
+            raise ValueError("Can't run scheduler without any scheduled tasks")
+
+        # Run all interval jobs on startup since the first next() is one interval from now
+        for job in jobs:
+            if isinstance(job.schedule, IntervalSchedule):
+                self.trigger(job.name)
+
+        while not self._cancellation_token.is_cancelled:
+            next_runs = self._get_next()
+
+            next_time = next_runs[0].schedule.next()
+            wait_time = max(next_time - time(), 0)
+
+            if wait_time:
+                self._logger.info(f"Waiting until {arrow.get(next_time).isoformat()}")
+                if self._cancellation_token.wait(wait_time):
+                    break
+
+            for job in next_runs:
+                self._logger.info(f"Starting job {job.name}")
+                self._run_job(job)
+
+    def stop(self) -> None:
+        """Cancel the cancellation token, which ends the loop in ``run``."""
+        self._cancellation_token.cancel()
diff --git a/cognite/extractorutils/unstable/scheduling/_schedules.py b/cognite/extractorutils/unstable/scheduling/_schedules.py
new file mode 100644
index 00000000..b7c5ef63
--- /dev/null
+++ b/cognite/extractorutils/unstable/scheduling/_schedules.py
@@ -0,0 +1,44 @@
+from abc import ABC, abstractmethod
+from math import ceil
+from time import time
+
+from croniter import croniter
+
+
+class Schedule(ABC):
+    """Decides when a job should run next."""
+
+    @abstractmethod
+    def next(self) -> int:
+        """Return the next run time, as a Unix timestamp in seconds."""
+        pass
+
+
+class CronSchedule(Schedule):
+    """Schedule that follows a cron expression."""
+
+    def __init__(self, expression: str) -> None:
+        self._cron = croniter(expression)
+
+    def next(self) -> int:
+        return int(self._cron.get_next(start_time=time()))
+
+
+class IntervalSchedule(Schedule):
+    """Schedule that fires at a fixed interval in seconds, counted from when the schedule was created."""
+
+    def __init__(self, interval: int) -> None:
+        if interval <= 0:
+            # next() moves forward in steps of the interval, so it could never catch up with the clock
+            raise ValueError(f"Interval must be positive, got {interval}")
+
+        self._interval = interval
+        self._next = int(time())
+
+    def next(self) -> int:
+        now = time()
+        if now > self._next:
+            # Jump straight to the first tick at or after now, however many intervals have passed
+            self._next += ceil((now - self._next) / self._interval) * self._interval
+
+        return self._next
diff --git a/pyproject.toml b/pyproject.toml
index 46433ccb..5833f8d2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -74,6 +74,7 @@ orjson = "^3.10.3"
 httpx = "^0.27.0"
 pydantic = "^2.8.2"
 pyhumps = "^3.8.0"
+croniter = "^3.0.3"
 
 [tool.poetry.extras]
 experimental = ["cognite-sdk-experimental"]
diff --git a/tests/test_unstable/test_scheduler.py b/tests/test_unstable/test_scheduler.py
new file mode 100644
index 00000000..28b70b38
--- /dev/null
+++ b/tests/test_unstable/test_scheduler.py
@@ -0,0 +1,146 @@
+from threading import Thread
+from time import sleep, time
+
+from pytest import approx
+
+from cognite.extractorutils.threading import CancellationToken
+from cognite.extractorutils.unstable.configuration.models import IntervalConfig, TimeIntervalConfig
+from cognite.extractorutils.unstable.scheduling._scheduler import TaskScheduler
+
+
+class MockFunction:
+    """Task that records when it was called, then sleeps for ``sleep_time`` seconds."""
+
+    def __init__(self, sleep_time: int) -> None:
+        self.called_times: list[float] = []
+
+        self.sleep_time = sleep_time
+
+    def __call__(self) -> None:
+        self.called_times.append(time())
+        sleep(self.sleep_time)
+
+
+def test_interval_schedules() -> None:
+    ct = CancellationToken()
+
+    mock = MockFunction(sleep_time=1)
+
+    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
+    scheduler.schedule_task(
+        name="test",
+        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("3s")),
+        task=mock,
+    )
+    start = time()
+    Thread(target=scheduler.run).start()
+    sleep(7)
+    scheduler.stop()
+
+    assert len(mock.called_times) == 3
+    assert mock.called_times[0] == approx(start)
+    assert mock.called_times[1] == approx(mock.called_times[0] + 3)
+    assert mock.called_times[2] == approx(mock.called_times[1] + 3)
+
+
+def test_overlapping_schedules() -> None:
+    """
+    Test with a trigger that fires when the job is still running
+
+    Timeline:
+
+    time    | 0 1 2 3 4 5 6 7 8 9 STOP
+    --------|-------------------------
+    trigger | x   x   x   x   x
+    job     | |-----| |-----| |-----|
+    """
+
+    ct = CancellationToken()
+
+    mock = MockFunction(sleep_time=3)
+
+    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
+    scheduler.schedule_task(
+        name="test",
+        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("2s")),
+        task=mock,
+    )
+    start = time()
+    Thread(target=scheduler.run).start()
+    sleep(9)
+    scheduler.stop()
+
+    assert len(mock.called_times) == 3
+    assert mock.called_times[0] == approx(start)
+    assert mock.called_times[1] == approx(mock.called_times[0] + 4)
+    assert mock.called_times[2] == approx(mock.called_times[1] + 4)
+
+
+def test_manual() -> None:
+    ct = CancellationToken()
+    mock = MockFunction(sleep_time=0)
+
+    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
+    scheduler.schedule_task(
+        name="test",
+        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("1h")),
+        task=mock,
+    )
+
+    Thread(target=scheduler.run).start()
+    # Let the run on startup finish first: a manual trigger that overlaps with it is skipped
+    sleep(0.1)
+
+    scheduler.trigger("test")
+    sleep(0.1)
+    scheduler.trigger("test")
+    sleep(0.1)
+    scheduler.trigger("test")
+
+    sleep(1)
+
+    scheduler.stop()
+
+    assert len(mock.called_times) == 4
+
+
+def test_manual_interval_mix() -> None:
+    """
+    Test with a scheduled trigger mixed with manual trigger, make sure there's no overlap
+
+    Timeline:
+
+    time     | 0 1 2 3 4 5 6 7 8 9 STOP
+    ---------|-------------------------
+    schedule | x       x       x
+    manual   |   x   x
+    job      | |---| |---|     |---|
+    """
+
+    ct = CancellationToken()
+    mock = MockFunction(sleep_time=2)
+
+    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
+    scheduler.schedule_task(
+        name="test",
+        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("4s")),
+        task=mock,
+    )
+
+    start = time()
+    Thread(target=scheduler.run).start()
+    sleep(1)
+    first_trigger = scheduler.trigger("test")
+    sleep(2)
+    second_trigger = scheduler.trigger("test")
+
+    sleep(6)
+    scheduler.stop()
+
+    assert not first_trigger
+    assert second_trigger
+
+    assert len(mock.called_times) == 3
+    assert mock.called_times[0] == approx(start)
+    assert mock.called_times[1] == approx(start + 3)
+    assert mock.called_times[2] == approx(start + 8)