use threading to run jobmanager loop #614

Closed · wants to merge 15 commits
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- `load_stac`/`metadata_from_stac`: add support for extracting actual temporal dimension metadata ([#567](https://github.com/Open-EO/openeo-python-client/issues/567))
- `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590))
- `MultiBackendJobManager`: add API to run the update loop in a separate thread, allowing controlled interruption.

### Changed

77 changes: 57 additions & 20 deletions openeo/extra/job_management.py
@@ -1,11 +1,13 @@
import abc
import asyncio
import contextlib
import datetime
import json
import logging
import time
import warnings
from pathlib import Path
from threading import Thread
from typing import Callable, Dict, NamedTuple, Optional, Union, List

import pandas as pd
@@ -161,6 +163,7 @@ def __init__(
.. versionchanged:: 0.32.0
Added `cancel_running_job_after` parameter.
"""
self._stop = True
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
self._connections: Dict[str, _Backend] = {}
@@ -171,6 +174,7 @@ def __init__(
self._cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
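# Holds the Thread object that runs the background update loop (see start_job_thread / stop_job_thread).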
self._timer = None

def add_backend(
self,
@@ -273,6 +277,36 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:

return df

def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface):

Review thread on this line:
Member: Using "thread" in naming and docs might be confusing and setting wrong expectations, as asyncio is not about threading but coroutines.
Collaborator (author): It's now converted to use an actual `Thread` object, so the confusion is gone?
Collaborator (author): @soxofaan if this is fine now, we can merge and continue with the other PRs.

"""
Start running the jobs in a separate thread; returns immediately.
"""

# Resume from existing db
_log.info(f"Resuming `run_jobs` from existing {job_db}")
df = job_db.read()

self._stop = False

def run_loop():
while (
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
and not self._stop
):
self._job_update_loop(df, job_db, start_job)

self._timer = Thread(target=run_loop)
self._timer.start()

def stop_job_thread(self, force_timeout_seconds=30):
self._stop = True
if self._timer is not None:
self._timer.join(force_timeout_seconds)

def run_jobs(
self,
df: Optional[pd.DataFrame],
@@ -364,29 +398,32 @@ def run_jobs(

while (
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
):
self._job_update_loop(df, job_db, start_job)
time.sleep(self.poll_sleep)

def _job_update_loop(self, df, job_db, start_job):
with ignore_connection_errors(context="get statuses"):
self._track_statuses(job_db)

not_started = job_db.get_by_status(statuses=["not_started"], max=200)
if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created", "queued", "running"])
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = not_started.iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, not_started, i, backend_name)
job_db.persist(to_launch)

def _launch_job(self, start_job, df, i, backend_name):
"""Helper method for launching jobs
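For context, this is roughly how the new thread-based API is meant to be used. A minimal sketch only: the backend URL, collection id, CSV path and `poll_sleep` value are illustrative placeholders, and the explicit `_normalize_df` call mirrors the test below, pending a proper job-db initialization helper.

import pandas as pd
import openeo
from openeo.extra.job_management import MultiBackendJobManager, CsvJobDatabase

manager = MultiBackendJobManager(poll_sleep=5)
manager.add_backend("foo", connection=openeo.connect("https://openeo.example"))

# Build a job database from a dataframe of job descriptions.
df = manager._normalize_df(pd.DataFrame({"year": [2021, 2022]}))
job_db = CsvJobDatabase("jobs.csv")
job_db.persist(df)

def start_job(row, connection, **kwargs):
    # Placeholder workflow: create a batch job for this row.
    return connection.load_collection("SENTINEL2_L2A").create_job(title=f"job-{row['year']}")

# Run the update loop in a background thread; the main thread stays responsive.
manager.start_job_thread(start_job=start_job, job_db=job_db)
# ... do other work, then interrupt in a controlled way:
manager.stop_job_thread(force_timeout_seconds=10)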
93 changes: 65 additions & 28 deletions tests/extra/test_job_management.py
@@ -5,6 +5,7 @@
import textwrap
import threading
import time
from time import sleep
from typing import Callable, Union
from unittest import mock

@@ -85,6 +86,69 @@ def sleep_mock(self):
yield sleep

def test_basic(self, tmp_path, requests_mock, sleep_mock):
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)

df = pd.DataFrame(
{
"year": [2018, 2019, 2020, 2021, 2022],
# Use simple points in WKT format to test conversion to the geometry dtype
"geometry": ["POINT (1 2)"] * 5,
}
)
output_file = tmp_path / "jobs.csv"

def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

manager.run_jobs(df=df, start_job=start_job, output_file=output_file)
assert sleep_mock.call_count > 10

result = pd.read_csv(output_file)
assert len(result) == 5
assert set(result.status) == {"finished"}
assert set(result.backend_name) == {"foo", "bar"}

# We expect that the job metadata was saved, so verify that it exists.
# Checking for one of the jobs is enough.
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
assert metadata_path.exists()

def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
manager = self.create_basic_mocked_manager(requests_mock, tmp_path)

df = pd.DataFrame(
{
"year": [2018, 2019, 2020, 2021, 2022],
# Use simple points in WKT format to test conversion to the geometry dtype
"geometry": ["POINT (1 2)"] * 5,
}
)
output_file = tmp_path / "jobs.csv"

def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

df = manager._normalize_df(df)
Review thread on this line:
Member: Shouldn't this normalize_df be handled automatically in the manager?
Collaborator (author): We lack a good mechanism to initialize a job db correctly; we'll have to come up with something.
Collaborator (author): @soxofaan I introduced initialize_job_db to address this issue.

job_db = CsvJobDatabase(output_file)
job_db.persist(df)
manager.start_job_thread(start_job=start_job, job_db=job_db)
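# Give the worker thread some time to run the jobs to completion, then request a stop (join timeout of 10 seconds).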
sleep(20)
manager.stop_job_thread(10)
#assert sleep_mock.call_count > 10

result = pd.read_csv(output_file)
assert len(result) == 5
assert set(result.status) == {"finished"}
assert set(result.backend_name) == {"foo", "bar"}

# We expect that the job metadata was saved, so verify that it exists.
# Checking for one of the jobs is enough.
metadata_path = manager.get_job_metadata_path(job_id="job-2022")
assert metadata_path.exists()

def create_basic_mocked_manager(self, requests_mock, tmp_path):
requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"})
requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"})

@@ -136,38 +200,11 @@ def mock_job_status(job_id, queued=1, running=2):
mock_job_status("job-2020", queued=3, running=4)
mock_job_status("job-2021", queued=3, running=5)
mock_job_status("job-2022", queued=5, running=6)

root_dir = tmp_path / "job_mgr_root"
manager = MultiBackendJobManager(root_dir=root_dir)

manager.add_backend("foo", connection=openeo.connect("http://foo.test"))
manager.add_backend("bar", connection=openeo.connect("http://bar.test"))

return manager

def test_normalize_df(self):
df = pd.DataFrame(