Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Multi queue consumption #1475

Merged
merged 1 commit into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions bench/config/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@
import logging
import os

# imports - third party imports
import click

# imports - module imports
import bench
from bench.app import use_rq
from bench.utils import get_bench_name, which
from bench.bench import Bench
from bench.config.common_site_config import (
update_config,
get_gunicorn_workers,
get_default_max_requests,
compute_max_requests_jitter,
get_default_max_requests,
get_gunicorn_workers,
update_config,
)

# imports - third party imports
import click

from bench.utils import get_bench_name, which

logger = logging.getLogger(bench.PROJECT_NAME)

Expand Down Expand Up @@ -58,6 +57,7 @@ def generate_supervisor_config(bench_path, user=None, yes=False, skip_redis=Fals
"bench_cmd": which("bench"),
"skip_redis": skip_redis,
"workers": config.get("workers", {}),
"multi_queue_consumption": can_enable_multi_queue_consumption(bench_path),
}
)

Expand Down Expand Up @@ -90,6 +90,21 @@ def get_supervisord_conf():
return possibility


def can_enable_multi_queue_consumption(bench_path: str) -> bool:
try:
from semantic_version import Version

from bench.utils.app import get_current_version

supported_version = Version(major=14, minor=18, patch=0)

frappe_version = Version(get_current_version("frappe", bench_path=bench_path))

return frappe_version > supported_version
except Exception:
return False


def check_supervisord_config(user=None):
"""From bench v5.x, we're moving to supervisor running as user"""
# i don't think bench should be responsible for this but we're way past this now...
Expand Down
62 changes: 8 additions & 54 deletions bench/config/templates/supervisor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ stderr_logfile={{ bench_dir }}/logs/web.error.log
user={{ user }}
directory={{ sites_dir }}

{% if use_rq %}
[program:{{ bench_name }}-frappe-schedule]
command={{ bench_cmd }} schedule
priority=3
Expand All @@ -23,6 +22,7 @@ stderr_logfile={{ bench_dir }}/logs/schedule.error.log
user={{ user }}
directory={{ bench_dir }}

{% if not multi_queue_consumption %}
[program:{{ bench_name }}-frappe-default-worker]
command={{ bench_cmd }} worker --queue default
priority=4
Expand All @@ -36,9 +36,10 @@ directory={{ bench_dir }}
killasgroup=true
numprocs={{ background_workers }}
process_name=%(program_name)s-%(process_num)d
{% endif %}

[program:{{ bench_name }}-frappe-short-worker]
command={{ bench_cmd }} worker --queue short
command={{ bench_cmd }} worker --queue short{{',default' if multi_queue_consumption else ''}}
priority=4
autostart=true
autorestart=true
Expand All @@ -52,7 +53,7 @@ numprocs={{ background_workers }}
process_name=%(program_name)s-%(process_num)d

[program:{{ bench_name }}-frappe-long-worker]
command={{ bench_cmd }} worker --queue long
command={{ bench_cmd }} worker --queue long{{',default,short' if multi_queue_consumption else ''}}
priority=4
autostart=true
autorestart=true
Expand Down Expand Up @@ -81,54 +82,6 @@ numprocs={{ worker_details["background_workers"] or background_workers }}
process_name=%(program_name)s-%(process_num)d
{% endfor %}

{% else %}
[program:{{ bench_name }}-frappe-workerbeat]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app beat -s beat.schedule
priority=3
autostart=true
autorestart=true
stdout_logfile={{ bench_dir }}/logs/workerbeat.log
stderr_logfile={{ bench_dir }}/logs/workerbeat.error.log
user={{ user }}
directory={{ sites_dir }}

[program:{{ bench_name }}-frappe-worker]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n jobs@%%h -Ofair --soft-time-limit 360 --time-limit 390 --loglevel INFO
priority=4
autostart=true
autorestart=true
stdout_logfile={{ bench_dir }}/logs/worker.log
stderr_logfile={{ bench_dir }}/logs/worker.error.log
user={{ user }}
stopwaitsecs=400
directory={{ sites_dir }}
killasgroup=true

[program:{{ bench_name }}-frappe-longjob-worker]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n longjobs@%%h -Ofair --soft-time-limit 1500 --time-limit 1530 --loglevel INFO
priority=2
autostart=true
autorestart=true
stdout_logfile={{ bench_dir }}/logs/worker.log
stderr_logfile={{ bench_dir }}/logs/worker.error.log
user={{ user }}
stopwaitsecs=1540
directory={{ sites_dir }}
killasgroup=true

[program:{{ bench_name }}-frappe-async-worker]
command={{ bench_dir }}/env/bin/python -m frappe.celery_app worker -n async@%%h -Ofair --soft-time-limit 1500 --time-limit 1530 --loglevel INFO
priority=2
autostart=true
autorestart=true
stdout_logfile={{ bench_dir }}/logs/worker.log
stderr_logfile={{ bench_dir }}/logs/worker.error.log
user={{ user }}
stopwaitsecs=1540
directory={{ sites_dir }}
killasgroup=true

{% endif %}

{% if not skip_redis %}
[program:{{ bench_name }}-redis-cache]
Expand Down Expand Up @@ -167,15 +120,16 @@ directory={{ bench_dir }}
[group:{{ bench_name }}-web]
programs={{ bench_name }}-frappe-web {%- if node -%} ,{{ bench_name }}-node-socketio {%- endif%}

{% if use_rq %}

{% if multi_queue_consumption %}

[group:{{ bench_name }}-workers]
programs={{ bench_name }}-frappe-schedule,{{ bench_name }}-frappe-default-worker,{{ bench_name }}-frappe-short-worker,{{ bench_name }}-frappe-long-worker{%- for worker_name in workers -%},{{ bench_name }}-frappe-{{ worker_name }}-worker{%- endfor %}
programs={{ bench_name }}-frappe-schedule,{{ bench_name }}-frappe-short-worker,{{ bench_name }}-frappe-long-worker{%- for worker_name in workers -%},{{ bench_name }}-frappe-{{ worker_name }}-worker{%- endfor %}

{% else %}

[group:{{ bench_name }}-workers]
programs={{ bench_name }}-frappe-workerbeat,{{ bench_name }}-frappe-worker,{{ bench_name }}-frappe-longjob-worker,{{ bench_name }}-frappe-async-worker{%- for worker_name in workers -%},{{ bench_name }}-frappe-{{ worker_name }}-worker{%- endfor %}
programs={{ bench_name }}-frappe-schedule,{{ bench_name }}-frappe-default-worker,{{ bench_name }}-frappe-short-worker,{{ bench_name }}-frappe-long-worker{%- for worker_name in workers -%},{{ bench_name }}-frappe-{{ worker_name }}-worker{%- endfor %}

{% endif %}

Expand Down