Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Beam Opener PTransforms #375

Merged
merged 18 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions docs/pangeo_forge_recipes/api_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,26 +39,40 @@
:members:
```


## Recipes
## Storage

```{eval-rst}
.. automodule:: pangeo_forge_recipes.recipes
:members:
.. automodule:: pangeo_forge_recipes.storage
:members:
```

## Storage

## Processing Functions

The [Beam PTransform Style Guide](https://beam.apache.org/contribute/ptransform-style-guide/) recommends:

> Expose large, non-trivial, reusable sequential bits of the
> transform’s code, which others might want to reuse in ways you
> haven’t anticipated, as a regular function or class library.
> The transform should simply wire this logic together.

These are those functions.

```{eval-rst}
.. automodule:: pangeo_forge_recipes.storage
.. automodule:: pangeo_forge_recipes.openers
:members:
```


## Executors
## PTransforms

The [Beam PTransform Style Guide](https://beam.apache.org/contribute/ptransform-style-guide/) recommends:

> Expose every major data-parallel task accomplished by your
> library as a composite PTransform. This allows the structure of
> the transform to evolve transparently to the code that uses it.

```{eval-rst}
.. automodule:: pangeo_forge_recipes.executors
:members:
.. automodule:: pangeo_forge_recipes.transforms
:members:
```
134 changes: 134 additions & 0 deletions pangeo_forge_recipes/openers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Standalone functions for opening sources as Dataset objects."""

import io
import tempfile
import warnings
from typing import Dict, Optional, Union

import xarray as xr

from .patterns import FileType
from .storage import CacheFSSpecTarget, OpenFileType, _copy_btw_filesystems, _get_opener


def open_url(
    url: str,
    cache: Optional[CacheFSSpecTarget] = None,
    secrets: Optional[Dict] = None,
    open_kwargs: Optional[Dict] = None,
) -> OpenFileType:
    """Open a string URL, optionally going through a cache first.

    :param url: The URL to open.
    :param cache: Optional cache target. When given, the URL is first passed to
        ``cache.cache_file`` and the returned object is opened from the cache
        in ``"rb"`` mode.
    :param secrets: Optional secrets, forwarded together with the URL to
        ``cache.cache_file`` or, without a cache, to ``_get_opener``.
    :param open_kwargs: Extra keyword arguments forwarded alongside ``secrets``.
    :returns: The object returned by ``cache.open_file`` when a cache is given,
        otherwise the object returned by ``_get_opener``.
    """
    extra_kwargs = open_kwargs if open_kwargs else {}

    # Without a cache there is nothing to populate: open the URL directly.
    if cache is None:
        return _get_opener(url, secrets, **extra_kwargs)

    # Called for its side effect before the cached copy is opened.
    cache.cache_file(url, secrets, **extra_kwargs)
    return cache.open_file(url, mode="rb")


# Default keyword arguments for opening a file of a given declared type with
# Xarray. File types without an entry here have no default engine.
OPENER_MAP = {
    FileType.netcdf3: {"engine": "scipy"},
    FileType.netcdf4: {"engine": "h5netcdf"},
    FileType.zarr: {"engine": "zarr"},
}


def _set_engine(file_type, xr_open_kwargs):
    """Return a copy of ``xr_open_kwargs`` with the Xarray ``engine`` resolved.

    The input dictionary is never modified.

    * If ``OPENER_MAP`` has no entry for ``file_type``, the kwargs are returned
      unchanged, so a user-supplied ``engine`` (if any) is kept as-is.
    * If the user did not pass ``engine``, the defaults registered for
      ``file_type`` in ``OPENER_MAP`` are merged in.
    * If the user passed the same ``engine`` as the registered default, a
      warning is emitted because the argument is redundant.
    * If the user passed a different ``engine``, ``ValueError`` is raised.

    :param file_type: The declared file type, used as the key into ``OPENER_MAP``.
    :param xr_open_kwargs: Keyword arguments intended for Xarray's open function.
    :returns: A new dictionary of keyword arguments.
    :raises ValueError: If ``engine`` conflicts with the default for ``file_type``.
    """
    kw = xr_open_kwargs.copy()

    defaults = OPENER_MAP.get(file_type)
    if defaults is None:
        # No default backend is registered for this file type, so there is
        # nothing to inject and nothing for a user-supplied engine to conflict
        # with. Indexing OPENER_MAP directly here would raise KeyError.
        return kw

    if "engine" not in kw:
        kw.update(defaults)
        return kw

    default_engine = defaults["engine"]
    user_engine = kw["engine"]

    engine_message_base = (
        "pangeo-forge-recipes will automatically set the xarray backend for "
        f"files of type '{file_type.value}' to '{default_engine}', "
    )
    engine_message_tail = (
        f"If this input file is not of type '{file_type.value}', please update"
        " this recipe by passing a different value to `FilePattern.file_type`."
    )

    if user_engine == default_engine:
        warnings.warn(
            engine_message_base
            + "which is the same value you have passed via `xarray_open_kwargs`. "
            + f"If this input file is actually of type '{file_type.value}', you can "
            + f"remove `{{'engine': '{user_engine}'}}` from `xarray_open_kwargs`. "
            + engine_message_tail
        )
    else:
        raise ValueError(
            engine_message_base
            + "which is different from the value you have passed via "
            + "`xarray_open_kwargs`. If this input file is actually of type "
            + f"'{file_type.value}', please remove `{{'engine': '{user_engine}'}}` "
            + "from `xarray_open_kwargs`. "
            + engine_message_tail
        )
    return kw


def open_with_xarray(
    thing: Union[OpenFileType, str],
    file_type: FileType = FileType.unknown,
    load: bool = False,
    copy_to_local: bool = False,
    xarray_open_kwargs: Optional[Dict] = None,
) -> xr.Dataset:
    """Open item with Xarray. Accepts either fsspec open-file-like objects
    or string URLs that can be passed directly to Xarray.

    :param thing: The thing to be opened.
    :param file_type: Provide this if you know what type of file it is.
    :param load: Whether to eagerly load the data into memory after opening.
    :param copy_to_local: Whether to copy the file-like-object to a local path
       and pass the path to Xarray. Required for some file types (e.g. Grib).
       Can only be used with file-like-objects, not URLs.
    :param xarray_open_kwargs: Extra arguments to pass to Xarray's open function.
    :returns: The opened dataset.
    :raises ValueError: If ``copy_to_local`` is requested for a zarr or opendap
       input, or for a string URL.
    """
    # NOTE(review): a reviewer found the parameter name ``thing`` somewhat opaque
    # and suggested something more descriptive (e.g. ``obj_to_open``). The name
    # is kept here so that existing keyword callers keep working.
    # TODO: check file type matrix

    kw = xarray_open_kwargs or {}
    kw = _set_engine(file_type, kw)

    if copy_to_local:
        # Both file types must be listed explicitly: ``FileType.zarr or
        # FileType.opendap`` evaluates to just ``FileType.zarr``.
        if file_type in (FileType.zarr, FileType.opendap):
            raise ValueError(f"File type {file_type} can't be copied to a local file.")
        if isinstance(thing, str):
            raise ValueError(
                "Won't copy string URLs to local files. Please call ``open_url`` first."
            )
        # NOTE(review): ``ntf`` is only referenced inside this function and a
        # NamedTemporaryFile is removed once it is closed -- confirm the
        # dataset stays readable after returning when ``load=False``.
        ntf = tempfile.NamedTemporaryFile()
        tmp_name = ntf.name
        target_opener = open(tmp_name, mode="wb")
        _copy_btw_filesystems(thing, target_opener)
        # From here on Xarray is given the local path instead of the file object.
        thing = tmp_name

    if isinstance(thing, str):
        pass
    elif isinstance(thing, io.IOBase):
        # required to make mypy happy
        # LocalFileOpener is a subclass of io.IOBase
        pass
    elif hasattr(thing, "open"):
        # work around fsspec inconsistencies
        thing = thing.open()
    ds = xr.open_dataset(thing, **kw)
    if load:
        ds.load()

    if copy_to_local and not load:
        warnings.warn(
            "Input has been copied to a local file, but the Xarray dataset has not been loaded. "
            "The data may not be accessible from other hosts. Consider adding ``load=True``."
        )

    return ds
1 change: 1 addition & 0 deletions pangeo_forge_recipes/patterns.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class FileType(AutoName):
netcdf4 = auto()
grib = auto()
opendap = auto()
zarr = auto()


class FilePattern:
Expand Down
75 changes: 11 additions & 64 deletions pangeo_forge_recipes/storage.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import hashlib
import io
import json
import logging
import os
Expand All @@ -11,24 +12,19 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any, Iterator, Optional, Sequence, Union
from typing import Iterator, Optional, Sequence, Union
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

import fsspec
from fsspec.implementations.local import LocalFileSystem

logger = logging.getLogger(__name__)

# fsspec doesn't provide type hints, so I'm not sure what the right type is for open files
OpenFileType = Any
# https://github.com/pangeo-forge/pangeo-forge-recipes/pull/213#discussion_r717801623
# There is no fool-proof method to tell whether the output of the context was created by fsspec.
# You could check for the few concrete classes that we expect
# like AbstractBufferedFile, LocalFileOpener.
OpenFileType = Union[fsspec.core.OpenFile, fsspec.spec.AbstractBufferedFile, io.IOBase]


def _get_url_size(fname, secrets, **open_kwargs):
with get_opener(fname, secrets, **open_kwargs) as of:
with _get_opener(fname, secrets, **open_kwargs) as of:
size = of.size
return size

Expand Down Expand Up @@ -173,7 +169,7 @@ def cache_file(self, fname: str, secrets: Optional[dict], **open_kwargs) -> None
logger.info(f"File '{fname}' is already cached")
return

input_opener = get_opener(fname, secrets, **open_kwargs)
input_opener = _get_opener(fname, secrets, **open_kwargs)
target_opener = self.open(fname, mode="wb")
logger.info(f"Copying remote file '{fname}' to cache")
_copy_btw_filesystems(input_opener, target_opener)
Expand Down Expand Up @@ -239,60 +235,6 @@ def temporary_storage_config():
)


@contextmanager
def file_opener(
    fname: str,
    cache: Optional[CacheFSSpecTarget] = None,
    copy_to_local: bool = False,
    bypass_open: bool = False,
    secrets: Optional[dict] = None,
    **open_kwargs,
) -> Iterator[Union[fsspec.core.OpenFile, str]]:
    """
    Context manager for opening files.

    :param fname: The filename / url to open. Fsspec will inspect the protocol
      (e.g. http, ftp) and determine the appropriate filesystem type to use.
    :param cache: A target where the file may have been cached. If none, the file
      will be opened directly.
    :param copy_to_local: If True, always copy the file to a local temporary file
      before opening. In this case, function yields a path name rather than an open file.
    :param bypass_open: If True, skip trying to open the file at all and just
      return the filename back directly. (A fancy way of doing nothing!)
    :param secrets: Dictionary of secrets to encode into the query string.
    :param open_kwargs: Extra keyword arguments forwarded to ``get_opener`` when
      the file is opened directly (not used when opening from the cache).
    :raises ValueError: If ``bypass_open`` is combined with ``cache`` or
      ``copy_to_local``.
    """

    if bypass_open:
        if cache or copy_to_local:
            raise ValueError("Can't bypass open with cache or copy_to_local.")
        logger.debug(f"Bypassing open for '{fname}'")
        yield fname
        return

    if cache is not None:
        logger.info(f"Opening '{fname}' from cache")
        opener = cache.open(fname, mode="rb")
    else:
        logger.info(f"Opening '{fname}' directly.")
        opener = get_opener(fname, secrets, **open_kwargs)
    if copy_to_local:
        # Keep the original extension on the temporary file name.
        _, suffix = os.path.splitext(fname)
        ntf = tempfile.NamedTemporaryFile(suffix=suffix)
        try:
            tmp_name = ntf.name
            logger.info(f"Copying '{fname}' to local file '{tmp_name}'")
            target_opener = open(tmp_name, mode="wb")
            _copy_btw_filesystems(opener, target_opener)
            yield tmp_name
        finally:
            # Runs even when the copy fails or the caller's ``with`` body
            # raises, so the temporary file is always cleaned up.
            ntf.close()
    else:
        logger.debug(f"file_opener entering first context for {opener}")
        with opener as fp:
            logger.debug(f"file_opener entering second context for {fp}")
            yield fp
            logger.debug("file_opener yielded")
        logger.debug("opener done")


def _slugify(value: str) -> str:
# Adopted from
# https://github.com/django/django/blob/master/django/utils/text.py
Expand All @@ -312,6 +254,11 @@ def _add_query_string_secrets(fname: str, secrets: dict) -> str:
return urlunparse(parsed)


def get_opener(fname: str, secrets: Optional[dict], **open_kwargs):
def _get_opener(fname, secrets, **open_kwargs):
fname = fname if not secrets else _add_query_string_secrets(fname, secrets)
return fsspec.open(fname, mode="rb", **open_kwargs)


def file_opener(*args, **kwargs):
    """Placeholder that accepts any arguments and does nothing.

    Kept only so that the test suite keeps running.
    """
    return None
Loading