Skip to content

Commit

Permalink
Scheduler implementation (#363)
Browse files Browse the repository at this point in the history
Proposal for a new scheduler. Inspired by apscheduler, but with two
important changes: respecting a cancellation token both in the main loop
and in sleeps, and allowing tasks to be triggered manually while still
respecting the no-overlap rule.
  • Loading branch information
mathialo authored Sep 24, 2024
1 parent dbb02a1 commit 5ab9de3
Show file tree
Hide file tree
Showing 6 changed files with 292 additions and 0 deletions.
13 changes: 13 additions & 0 deletions cognite/extractorutils/unstable/configuration/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,19 @@ def get_cognite_client(self, client_name: str) -> CogniteClient:
return CogniteClient(client_config)


class CronConfig(ConfigModel):
    """
    Schedule configuration selected by ``type: cron``.
    """

    # Discriminator value used by the ``ScheduleConfig`` union
    type: Literal["cron"]
    # Cron expression string; not validated here beyond being a string
    expression: str


class IntervalConfig(ConfigModel):
    """
    Schedule configuration selected by ``type: interval``.
    """

    # Discriminator value used by the ``ScheduleConfig`` union
    type: Literal["interval"]
    # Time between runs, expressed as a TimeIntervalConfig
    expression: TimeIntervalConfig


# Tagged union of the supported schedule configurations; the ``type`` field selects which model is used.
ScheduleConfig = Annotated[CronConfig | IntervalConfig, Field(discriminator="type")]


class LogLevel(Enum):
CRITICAL = "CRITICAL"
ERROR = "ERROR"
Expand Down
3 changes: 3 additions & 0 deletions cognite/extractorutils/unstable/scheduling/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from ._scheduler import TaskScheduler

__all__ = ["TaskScheduler"]
102 changes: 102 additions & 0 deletions cognite/extractorutils/unstable/scheduling/_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
from dataclasses import dataclass
from logging import getLogger
from threading import RLock, Thread
from time import time
from typing import Callable

import arrow
from humps import pascalize

from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import CronConfig, IntervalConfig, ScheduleConfig
from cognite.extractorutils.unstable.scheduling._schedules import CronSchedule, IntervalSchedule, Schedule


@dataclass
class Job:
    """
    A named callable paired with the schedule that decides when it should run.

    Equality is the dataclass default (all fields), while hashing only looks at the name, so jobs can be stored in
    sets and used as dictionary keys.
    """

    # Identifier of the job; the only field that contributes to the hash
    name: str
    # Zero-argument callable that performs the work
    call: Callable[[], None]
    # Schedule object that is asked for the next run time
    schedule: Schedule

    def __hash__(self) -> int:
        """
        Return the hash of the job name.
        """
        job_name = self.name
        return hash(job_name)


class TaskScheduler:
def __init__(self, cancellation_token: CancellationToken) -> None:
self._cancellation_token = cancellation_token
self._jobs: dict[str, Job] = {}
self._jobs_lock = RLock()
self._running: set[Job] = set()
self._running_lock = RLock()

self._logger = getLogger()

def schedule_task(self, name: str, schedule: ScheduleConfig, task: Callable[[], None]) -> None:
parsed_schedule: Schedule
match schedule:
case CronConfig() as cron_config:
parsed_schedule = CronSchedule(expression=cron_config.expression)

case IntervalConfig() as interval_config:
parsed_schedule = IntervalSchedule(interval=interval_config.expression.seconds)

with self._jobs_lock:
self._jobs[name] = Job(name=name, call=task, schedule=parsed_schedule)

def _get_next(self) -> list[Job]:
if not self._jobs:
return []
with self._jobs_lock:
next_runs = sorted([(j.schedule.next(), j) for j in self._jobs.values()], key=lambda tup: tup[0])
return [job for (next, job) in next_runs if next == next_runs[0][0]] if next_runs else []

def _run_job(self, job: Job) -> bool:
with self._running_lock:
if job in self._running:
self._logger.warning(f"Job {job.name} already running")
return False

def wrap() -> None:
with self._running_lock:
self._running.add(job)
try:
job.call()

self._logger.info(f"Job {job.name} done. Next run at {arrow.get(job.schedule.next()).isoformat()}")

finally:
with self._running_lock:
self._running.remove(job)

Thread(target=wrap, name=f"Run{pascalize(job.name)}").start()
return True

def trigger(self, name: str) -> bool:
return self._run_job(self._jobs[name])

def run(self) -> None:
if not self._jobs:
raise ValueError("Can't run scheduler without any scheduled tasks")

# Run all interval jobs on startup since the first next() is one interval from now
for job in [j for j in self._jobs.values() if isinstance(j.schedule, IntervalSchedule)]:
self.trigger(job.name)

while not self._cancellation_token.is_cancelled:
next_runs = self._get_next()

next_time = next_runs[0].schedule.next()
wait_time = max(next_time - time(), 0)

if wait_time:
self._logger.info(f"Waiting until {arrow.get(next_time).isoformat()}")
if self._cancellation_token.wait(wait_time):
break

for job in next_runs:
self._logger.info(f"Starting job {job.name}")
self._run_job(job)

def stop(self) -> None:
self._cancellation_token.cancel()
31 changes: 31 additions & 0 deletions cognite/extractorutils/unstable/scheduling/_schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from abc import ABC, abstractmethod
from time import time

from croniter import croniter


class Schedule(ABC):
    """
    Interface for objects that can tell when a job should run next.
    """

    @abstractmethod
    def next(self) -> int:
        """
        Return the time of the next run as a Unix timestamp in whole seconds.
        """


class CronSchedule(Schedule):
    """
    Schedule driven by a cron expression, evaluated with ``croniter``.
    """

    def __init__(self, expression: str) -> None:
        """
        Args:
            expression: Cron expression handed to ``croniter`` as is.
        """
        self._cron = croniter(expression)

    def next(self) -> int:
        """
        Return the first cron match after the current time, truncated to whole seconds.
        """
        now = time()
        upcoming = self._cron.get_next(start_time=now)
        return int(upcoming)


class IntervalSchedule(Schedule):
    """
    Schedule that fires on a fixed grid: the whole second it was created in, plus multiples of the interval.
    """

    def __init__(self, interval: int) -> None:
        """
        Args:
            interval: Number of seconds between runs. Must be positive.

        Raises:
            ValueError: If ``interval`` is zero or negative.
        """
        # A non-positive interval can never catch up with the clock, so next() would spin forever
        if interval <= 0:
            raise ValueError(f"Interval must be positive, got {interval}")

        self._interval = interval
        self._next = int(time())

    def next(self) -> int:
        """
        Return the next grid point that is not in the past.

        Grid points that have already passed are skipped, so repeated calls before the returned time yield the same
        value.
        """
        t = time()
        while t > self._next:
            self._next += self._interval

        return self._next
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ orjson = "^3.10.3"
httpx = "^0.27.0"
pydantic = "^2.8.2"
pyhumps = "^3.8.0"
croniter = "^3.0.3"

[tool.poetry.extras]
experimental = ["cognite-sdk-experimental"]
Expand Down
142 changes: 142 additions & 0 deletions tests/test_unstable/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from threading import Thread
from time import sleep, time

from pytest import approx

from cognite.extractorutils.threading import CancellationToken
from cognite.extractorutils.unstable.configuration.models import IntervalConfig, TimeIntervalConfig
from cognite.extractorutils.unstable.scheduling._scheduler import TaskScheduler


class MockFunction:
    """
    Callable test double that records when it was invoked and then blocks for a fixed time.
    """

    def __init__(self, sleep_time: int) -> None:
        """
        Args:
            sleep_time: Seconds each invocation sleeps after recording its call time.
        """
        self.sleep_time = sleep_time
        # Timestamps (from time()) of every invocation, in call order
        self.called_times: list[float] = []

    def __call__(self) -> None:
        """
        Record the current time, then sleep for ``sleep_time`` seconds.
        """
        invoked_at = time()
        self.called_times.append(invoked_at)
        sleep(self.sleep_time)


def test_interval_schedules() -> None:
    """
    Run a 1 second job on a 3 second interval for 7 seconds and expect three runs, roughly 3 seconds apart.
    """
    ct = CancellationToken()

    mock = MockFunction(sleep_time=1)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("3s")),
        task=mock,
    )
    start = time()
    Thread(target=scheduler.run).start()
    sleep(7)
    scheduler.stop()

    assert len(mock.called_times) == 3

    # Absolute tolerances are given explicitly: approx() defaults to a relative tolerance of 1e-6, which for epoch
    # timestamps is about 28 minutes and would make these timing checks pass for any spacing.
    assert mock.called_times[0] == approx(start, abs=0.5)
    # The interval grid is anchored to the whole second in which the task was scheduled, so the first scheduled run
    # can come up to one second earlier than a full interval after the startup run.
    assert mock.called_times[1] == approx(mock.called_times[0] + 3, abs=1.1)
    assert mock.called_times[2] == approx(mock.called_times[1] + 3, abs=0.5)


def test_overlapping_schedules() -> None:
    """
    Test with a trigger that fires when the job is still running
    Timeline:
    time    | 0 1 2 3 4 5 6 7 8 9 STOP
    --------|-------------------------
    trigger | x   x   x   x   x
    job     | |-----| |-----| |-----|
    """

    ct = CancellationToken()

    mock = MockFunction(sleep_time=3)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("2s")),
        task=mock,
    )
    start = time()
    Thread(target=scheduler.run).start()
    sleep(9)
    scheduler.stop()

    assert len(mock.called_times) == 3

    # Absolute tolerances are given explicitly: approx() defaults to a relative tolerance of 1e-6, which for epoch
    # timestamps is about 28 minutes and would make these timing checks pass for any spacing.
    assert mock.called_times[0] == approx(start, abs=0.5)
    # The interval grid is anchored to the whole second in which the task was scheduled, so the first scheduled run
    # can come up to one second earlier than expected relative to the startup run.
    assert mock.called_times[1] == approx(mock.called_times[0] + 4, abs=1.1)
    # Compare the third run with the second one (the original compared the second run with itself)
    assert mock.called_times[2] == approx(mock.called_times[1] + 4, abs=0.5)


def test_manual() -> None:
    """
    Trigger an instant job manually three times on top of the startup run and expect four runs in total.
    """
    ct = CancellationToken()
    mock = MockFunction(sleep_time=0)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("1h")),
        task=mock,
    )

    Thread(target=scheduler.run).start()
    # run() starts interval jobs immediately. Give that startup run time to finish so that the first manual trigger
    # cannot collide with it and be rejected as "already running", which would make the count below timing-dependent.
    sleep(0.1)

    scheduler.trigger("test")
    sleep(0.1)
    scheduler.trigger("test")
    sleep(0.1)
    scheduler.trigger("test")

    sleep(1)

    scheduler.stop()

    # One startup run plus three manual runs
    assert len(mock.called_times) == 4


def test_manual_interval_mix() -> None:
    """
    Test with a scheduled trigger mixed with manual trigger, make sure there's no overlap
    Timeline:
    time     | 0 1 2 3 4 5 6 7 8 9 STOP
    ---------|-------------------------
    schedule | x       x       x
    manual   |   x   x
    job      | |---| |---|     |---|
    """

    ct = CancellationToken()
    mock = MockFunction(sleep_time=2)

    scheduler = TaskScheduler(cancellation_token=ct.create_child_token())
    scheduler.schedule_task(
        name="test",
        schedule=IntervalConfig(type="interval", expression=TimeIntervalConfig("4s")),
        task=mock,
    )

    start = time()
    Thread(target=scheduler.run).start()
    sleep(1)
    first_trigger = scheduler.trigger("test")
    sleep(2)
    second_trigger = scheduler.trigger("test")

    sleep(6)
    scheduler.stop()

    # The first manual trigger lands while the startup run is still going, the second one after it has finished
    assert not first_trigger
    assert second_trigger

    assert len(mock.called_times) == 3

    # Absolute tolerances are given explicitly: approx() defaults to a relative tolerance of 1e-6, which for epoch
    # timestamps is about 28 minutes and would make these timing checks pass for any spacing.
    assert mock.called_times[0] == approx(start, abs=0.5)
    assert mock.called_times[1] == approx(start + 3, abs=0.5)
    # Scheduled runs follow a grid anchored to the whole second in which the task was scheduled, so this run can come
    # up to one second earlier than start + 8.
    assert mock.called_times[2] == approx(start + 8, abs=1.1)

0 comments on commit 5ab9de3

Please sign in to comment.