
limit concurrency for input downloads #45

Closed
rabernat opened this issue Jan 23, 2021 · 2 comments · Fixed by #557
Labels
design question (A question of the design of Pangeo Forge); executors (Related to executors and pipelines)

Comments

@rabernat (Contributor) opened this issue on Jan 23, 2021:

NOAA NCEI might not like it if we fire off hundreds of simultaneous requests to their servers. We would like to limit the concurrency of this step if possible.

From an API perspective, the question is:

  • Should a user have to specify concurrency limits as part of the recipe?
  • Alternatively, should we try to auto-detect if flows access certain resources (e.g. specific FTP servers) and then automatically enforce concurrency limits?

In terms of implementation, Prefect Cloud has a built-in solution, task concurrency limiting: https://docs.prefect.io/orchestration/concepts/task-concurrency-limiting.html

However, this only works with Prefect Cloud. Some questions about this option are:

  • Are we okay with getting locked into a Prefect Cloud feature?
  • What convention do we use for the task tags to indicate concurrency? For example, I could imagine a tag like www.ncei.noaa.gov, allowing us to limit concurrency for all requests to that server from all flows simultaneously! That would be pretty useful.
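One way to make the tag convention above concrete is to derive the tag from the URL's hostname, so a single limit covers every flow hitting the same server. The helper below is a hypothetical sketch of that convention, not existing Pangeo Forge code:

```python
# Hypothetical sketch: derive a concurrency-limit tag from a URL's hostname,
# so e.g. every request to www.ncei.noaa.gov shares one limit across flows.
from urllib.parse import urlparse


def concurrency_tag(url: str) -> str:
    """Return the hostname of `url` for use as a shared concurrency tag."""
    return urlparse(url).hostname
```

With this convention, `concurrency_tag("https://www.ncei.noaa.gov/data/file.nc")` yields `"www.ncei.noaa.gov"`, and the same tag would be produced for FTP URLs on that host.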

If we don't want to get locked into Cloud features, @jcrist made the following suggestion on the Prefect Slack:

I'd handle this with a distributed.Semaphore within your tasks for now. Alternatively, you could make use of dask's worker resources. Tasks tagged with tags of the form dask-resource:KEY=N will each take N amount of KEY resource. So you could limit active download tasks by creating a resource for downloading then tagging download tasks to mark that they require that resource.
That would mean that the total concurrency limit scales with the number of workers (so it isn't absolute across the whole run), but would also work and wouldn't block other tasks from running like the Semaphore would.
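The semaphore pattern in that suggestion can be sketched as follows. In a real flow you would use `distributed.Semaphore` (which exposes the same context-manager interface); `threading.Semaphore` is used here only so the sketch is self-contained, and the limit, URLs, and `limited_download` helper are illustrative assumptions:

```python
# Sketch of the semaphore approach: only MAX_CONCURRENT_DOWNLOADS tasks may
# be inside the `with` block at once; the rest wait for a free slot.
import threading
from concurrent.futures import ThreadPoolExecutor

MAX_CONCURRENT_DOWNLOADS = 3  # illustrative limit, not a recommended value
download_slots = threading.Semaphore(MAX_CONCURRENT_DOWNLOADS)


def limited_download(url: str) -> str:
    # Block until one of the MAX_CONCURRENT_DOWNLOADS slots is free.
    with download_slots:
        # Placeholder for the real HTTP/FTP fetch-and-cache logic.
        return f"downloaded {url}"


if __name__ == "__main__":
    urls = [f"https://www.ncei.noaa.gov/file{i}" for i in range(10)]
    # Even with 10 workers, at most 3 downloads run concurrently.
    with ThreadPoolExecutor(max_workers=10) as pool:
        results = list(pool.map(limited_download, urls))
```

As the comment notes, swapping in `distributed.Semaphore` makes the limit cluster-wide rather than per-process, at the cost of workers idling while they hold no lease.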

@rabernat added the design question and executors labels on Jan 23, 2021
@rabernat (Contributor, Author) commented on Mar 4, 2022:

The situation described in pangeo-forge/staged-recipes#108 (comment) adds another dimension to the concurrency story. That recipe pulls data over OPeNDAP, and with OPeNDAP the data loading happens during the store_chunk stage, not the cache_input stage.

If we follow the path outlined in #245, we may end up making significant changes to how Pangeo Forge works internally. That should give us the ability to attach a concurrency restriction on any stage of the pipeline. In pseudocode it may look something like

recipe = source | subset({'time': 100}) | load_with_xarray(concurrency=5) | zarr_destination
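The pipe-composed recipe above is pseudocode; a minimal sketch of how `|`-composable stages carrying a per-stage `concurrency` setting might be wired up is shown below. All of these class and stage names are hypothetical and do not exist in Pangeo Forge:

```python
# Hypothetical sketch: stages compose with `|` into a pipeline, and each
# stage can optionally carry its own concurrency limit for the executor.
class Stage:
    def __init__(self, name, concurrency=None):
        self.name = name
        self.concurrency = concurrency  # None means no limit for this stage

    def __or__(self, other):
        return Pipeline([self]) | other


class Pipeline:
    def __init__(self, stages):
        self.stages = stages

    def __or__(self, other):
        return Pipeline(self.stages + [other])


recipe = (
    Stage("source")
    | Stage("subset")
    | Stage("load_with_xarray", concurrency=5)
    | Stage("zarr_destination")
)
```

An executor walking `recipe.stages` could then throttle only the stages that declare a limit, leaving the rest unconstrained.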

@cisaacstern (Member) commented:

xref #389, which is a duplicate I opened not remembering this was already here.

Aiming to fix this in #557
