From 5ff12ebdb3b32ed876aa809a3fcddd1c49394af4 Mon Sep 17 00:00:00 2001 From: Philipp Schlegel Date: Thu, 12 Sep 2024 17:16:56 +0100 Subject: [PATCH 1/3] allow reading e.g. SWC files from FTP server --- navis/io/base.py | 559 +++++++++++++++++++++++++++++++++------------ navis/io/swc_io.py | 8 +- 2 files changed, 419 insertions(+), 148 deletions(-) diff --git a/navis/io/base.py b/navis/io/base.py index 4dfd4edb..a8242c29 100644 --- a/navis/io/base.py +++ b/navis/io/base.py @@ -29,12 +29,14 @@ from typing import List, Union, Iterable, Dict, Optional, Any, IO from typing_extensions import Literal from zipfile import ZipFile, ZipInfo +from ftplib import FTP from .. import config, utils, core try: import zlib import zipfile + compression = zipfile.ZIP_DEFLATED except ImportError: compression = zipfile.ZIP_STORED @@ -77,7 +79,7 @@ class Writer: def __init__(self, write_func, ext): assert callable(write_func) if ext: - assert isinstance(ext, str) and ext.startswith('.') + assert isinstance(ext, str) and ext.startswith(".") self.write_func = write_func self.ext = ext @@ -87,7 +89,9 @@ def write_single(self, x, filepath, **kwargs): try: as_str = os.fspath(filepath) except TypeError: - raise ValueError(f'`filepath` must be str or pathlib.Path, got "{type(filepath)}"') + raise ValueError( + f'`filepath` must be str or pathlib.Path, got "{type(filepath)}"' + ) # Format filename (e.g. "{neuron.name}.swc") formatted_str = as_str.format(neuron=x) @@ -103,11 +107,11 @@ def write_single(self, x, filepath, **kwargs): # If not specified, generate filename if self.ext and not str(filepath).endswith(self.ext): - filepath = filepath / f'{x.id}{self.ext}' + filepath = filepath / f"{x.id}{self.ext}" # Make sure the parent directory exists if not filepath.parent.exists(): - raise ValueError(f'Parent folder {filepath.parent} must exist.') + raise ValueError(f"Parent folder {filepath.parent} must exist.") # Track the path we put this (and presumably all other files in) self.path = Path(filepath) @@ -126,39 +130,50 @@ def write_many(self, x, filepath, **kwargs): if not is_filename or is_single or is_formattable: filepath = [filepath] * len(x) else: - raise ValueError('`filepath` must either be a folder, a ' - 'formattable filepath or a list of filepaths' - 'when saving multiple neurons.') + raise ValueError( + "`filepath` must either be a folder, a " + "formattable filepath or a list of filepaths" + "when saving multiple neurons." + ) if len(filepath) != len(x): - raise ValueError(f'Got {len(filepath)} file names for ' - f'{len(x)} neurons.') + raise ValueError( + f"Got {len(filepath)} file names for " f"{len(x)} neurons." 
+ ) # At this point filepath is iterable filepath: Iterable[str] - for n, f in config.tqdm(zip(x, filepath), disable=config.pbar_hide, - leave=config.pbar_leave, total=len(x), - desc='Writing'): + for n, f in config.tqdm( + zip(x, filepath), + disable=config.pbar_hide, + leave=config.pbar_leave, + total=len(x), + desc="Writing", + ): self.write_single(n, filepath=f, **kwargs) def write_zip(self, x, filepath, **kwargs): """Write files to zip.""" filepath = Path(filepath).expanduser() # Parse pattern, if given - pattern = '{neuron.id}' + (self.ext if self.ext else '') - if '@' in str(filepath): - pattern, filename = filepath.name.split('@') + pattern = "{neuron.id}" + (self.ext if self.ext else "") + if "@" in str(filepath): + pattern, filename = filepath.name.split("@") filepath = filepath.parent / filename # Make sure we have an iterable x = core.NeuronList(x) - with ZipFile(filepath, mode='w') as zf: + with ZipFile(filepath, mode="w") as zf: # Context-manager will remove temporary directory and its contents with tempfile.TemporaryDirectory() as tempdir: - for n in config.tqdm(x, disable=config.pbar_hide, - leave=config.pbar_leave, total=len(x), - desc='Writing'): + for n in config.tqdm( + x, + disable=config.pbar_hide, + leave=config.pbar_leave, + total=len(x), + desc="Writing", + ): # Save to temporary file f = None try: @@ -167,8 +182,11 @@ def write_zip(self, x, filepath, **kwargs): # Write to temporary file self.write_single(n, filepath=f, **kwargs) # Add file to zip - zf.write(f, arcname=pattern.format(neuron=n), - compress_type=compression) + zf.write( + f, + arcname=pattern.format(neuron=n), + compress_type=compression, + ) except BaseException: raise finally: @@ -184,7 +202,7 @@ def write_zip(self, x, filepath, **kwargs): def write_any(self, x, filepath, **kwargs): """Write any to file. Default entry point.""" # If target is a zipfile - if isinstance(filepath, (str, Path)) and str(filepath).endswith('.zip'): + if isinstance(filepath, (str, Path)) and str(filepath).endswith(".zip"): return self.write_zip(x, filepath=filepath, **kwargs) elif isinstance(x, core.NeuronList): return self.write_many(x, filepath=filepath, **kwargs) @@ -237,15 +255,14 @@ def __init__( if self.file_ext.startswith("*"): raise ValueError('File extension must be ".ext", not "*.ext"') - def files_in_dir(self, - dpath: Path, - include_subdirs: bool = DEFAULT_INCLUDE_SUBDIRS - ) -> Iterable[Path]: + def files_in_dir( + self, dpath: Path, include_subdirs: bool = DEFAULT_INCLUDE_SUBDIRS + ) -> Iterable[Path]: """List files to read in directory.""" if not isinstance(dpath, Path): dpath = Path(dpath) dpath = dpath.expanduser() - pattern = '*' + pattern = "*" if include_subdirs: pattern = os.path.join("**", pattern) @@ -285,9 +302,7 @@ def _make_attributes( Arbitrary string-keyed attributes. """ return merge_dicts( - dict( - created_at=str(datetime.datetime.now()) - ), + dict(created_at=str(datetime.datetime.now())), self.attrs, *dicts, **kwargs, @@ -295,7 +310,7 @@ def _make_attributes( def read_buffer( self, f: IO, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read buffer into a single neuron. 
Parameters @@ -309,12 +324,13 @@ def read_buffer( ------- core.NeuronObject """ - raise NotImplementedError('Reading from buffer not implemented for ' - f'{type(self)}') + raise NotImplementedError( + "Reading from buffer not implemented for " f"{type(self)}" + ) def read_file_path( self, fpath: os.PathLike, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read single file from path into a neuron. Parameters @@ -338,11 +354,12 @@ def read_file_path( raise ValueError(f"Error reading file {p}") from e def read_from_zip( - self, files: Union[str, List[str]], + self, + files: Union[str, List[str]], zippath: os.PathLike, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read given files from a zip into a NeuronList. Typically not used directly but via `read_zip()` dispatcher. @@ -367,17 +384,16 @@ def read_from_zip( files = utils.make_iterable(files) neurons = [] - with ZipFile(p, 'r') as zip: + with ZipFile(p, "r") as zip: for file in files: # Note the `file` is of type zipfile.ZipInfo here props = self.parse_filename(file.orig_filename) - props['origin'] = str(p) + props["origin"] = str(p) try: - n = self.read_bytes(zip.read(file), - merge_dicts(props, attrs)) + n = self.read_bytes(zip.read(file), merge_dicts(props, attrs)) neurons.append(n) except BaseException: - if on_error == 'ignore': + if on_error == "ignore": logger.warning(f'Failed to read "{file.filename}" from zip.') else: raise @@ -385,12 +401,13 @@ def read_from_zip( return core.NeuronList(neurons) def read_zip( - self, fpath: os.PathLike, + self, + fpath: os.PathLike, parallel="auto", limit: Optional[int] = None, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read files from a zip into a NeuronList. This is a dispatcher for `.read_from_zip`. @@ -412,22 +429,25 @@ def read_zip( """ fpath = Path(fpath).expanduser() - read_fn = partial(self.read_from_zip, - zippath=fpath, attrs=attrs, - on_error=on_error) - neurons = parallel_read_archive(read_fn=read_fn, - fpath=fpath, - file_ext=self.is_valid_file, - limit=limit, - parallel=parallel) + read_fn = partial( + self.read_from_zip, zippath=fpath, attrs=attrs, on_error=on_error + ) + neurons = parallel_read_archive( + read_fn=read_fn, + fpath=fpath, + file_ext=self.is_valid_file, + limit=limit, + parallel=parallel, + ) return core.NeuronList(neurons) def read_from_tar( - self, files: Union[str, List[str]], + self, + files: Union[str, List[str]], tarpath: os.PathLike, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read given files from a tar into a NeuronList. Typically not used directly but via `read_tar()` dispatcher. 
@@ -452,17 +472,18 @@ def read_from_tar( files = utils.make_iterable(files) neurons = [] - with tarfile.open(p, 'r') as tf: + with tarfile.open(p, "r") as tf: for file in files: # Note the `file` is of type tarfile.TarInfo here - props = self.parse_filename(file.name.split('/')[-1]) - props['origin'] = str(p) + props = self.parse_filename(file.name.split("/")[-1]) + props["origin"] = str(p) try: - n = self.read_bytes(tf.extractfile(file).read(), - merge_dicts(props, attrs)) + n = self.read_bytes( + tf.extractfile(file).read(), merge_dicts(props, attrs) + ) neurons.append(n) except BaseException: - if on_error == 'ignore': + if on_error == "ignore": logger.warning(f'Failed to read "{file.filename}" from tar.') else: raise @@ -470,12 +491,13 @@ def read_from_tar( return core.NeuronList(neurons) def read_tar( - self, fpath: os.PathLike, + self, + fpath: os.PathLike, parallel="auto", limit: Optional[int] = None, attrs: Optional[Dict[str, Any]] = None, - on_error: Union[Literal['ignore', Literal['raise']]] = 'ignore' - ) -> 'core.NeuronList': + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": """Read files from a tar archive into a NeuronList. This is a dispatcher for `.read_from_tar`. @@ -497,23 +519,134 @@ def read_tar( """ fpath = Path(fpath).expanduser() - read_fn = partial(self.read_from_tar, - tarpath=fpath, attrs=attrs, - on_error=on_error) - neurons = parallel_read_archive(read_fn=read_fn, - fpath=fpath, - file_ext=self.is_valid_file, - limit=limit, - parallel=parallel) + read_fn = partial( + self.read_from_tar, tarpath=fpath, attrs=attrs, on_error=on_error + ) + neurons = parallel_read_archive( + read_fn=read_fn, + fpath=fpath, + file_ext=self.is_valid_file, + limit=limit, + parallel=parallel, + ) + return core.NeuronList(neurons) + + def read_ftp( + self, + url, + parallel="auto", + limit: Optional[int] = None, + attrs: Optional[Dict[str, Any]] = None, + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": + """Read files from an FTP server. + + This is a dispatcher for `.read_from_tar`. + + Parameters + ---------- + url : str + Can be the path to a single file or a directory. + limit : int, optional + Limit the number of files read from this directory. + attrs : dict or None + Arbitrary attributes to include in the TreeNeuron. + on_error : 'ignore' | 'raise' + What do do when error is encountered. + + Returns + ------- + core.NeuronList + + """ + # Remove the ftp:// prefix + url = url.replace("ftp://", "") + + # Split into server and path + server, path = url.split("/", 1) + + # Check if server contains a port + if ":" in server: + server, port = server.split(":") + port = int(port) + else: + port = 21 # default port + + read_fn = partial(self.read_from_ftp, attrs=attrs, on_error=on_error) + neurons = parallel_read_ftp( + read_fn=read_fn, + server=server, + port=port, + path=path, + file_ext=self.is_valid_file, + limit=limit, + parallel=parallel, + ) + return core.NeuronList(neurons) + + def read_from_ftp( + self, + files: Union[str, List[str]], + ftp: FTP, + attrs: Optional[Dict[str, Any]] = None, + on_error: Union[Literal["ignore", Literal["raise"]]] = "ignore", + ) -> "core.NeuronList": + """Read given files from an FTP server into a NeuronList. + + Typically not used directly but via `read_ftp()` dispatcher. + + Parameters + ---------- + files : tarfile.TarInfo | list thereof + Files inside the tar file to read. + ftp : ftplib.FTP | "GLOBAL" + The FTP client. 
This should already be connected, logged in + and in the correct directory. If "GLOBAL", we will look for a + `_FTP` global variable. + attrs : dict or None + Arbitrary attributes to include in the TreeNeuron. + on_error : 'ignore' | 'raise' + What do do when error is encountered. + + Returns + ------- + core.NeuronList + + """ + if ftp == "GLOBAL": + if "_FTP" not in globals(): + raise ValueError("No global FTP connection found.") + ftp = _FTP + + files = utils.make_iterable(files) + + neurons = [] + for file in files: + # Read the file into a bytes + with io.BytesIO() as f: + ftp.retrbinary("RETR " + file, f.write) + f.seek(0) + props = self.parse_filename(file) + props["origin"] = f"{ftp.host}:{ftp.port}{ftp.pwd()}/{file}" + try: + n = self.read_buffer(f, merge_dicts(props, attrs)) + neurons.append(n) + except BaseException: + if on_error == "ignore": + logger.warning(f'Failed to read "{file}" from FTP.') + else: + raise + return core.NeuronList(neurons) def read_directory( - self, path: os.PathLike, + self, + path: os.PathLike, include_subdirs=DEFAULT_INCLUDE_SUBDIRS, parallel="auto", limit: Optional[int] = None, - attrs: Optional[Dict[str, Any]] = None - ) -> 'core.NeuronList': + attrs: Optional[Dict[str, Any]] = None, + ) -> "core.NeuronList": """Read directory of files into a NeuronList. Parameters @@ -544,7 +677,7 @@ def read_directory( def read_url( self, url: str, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read file from URL into a neuron. Parameters @@ -569,16 +702,13 @@ def read_url( # the wrong format. with requests.get(url, stream=False) as r: r.raise_for_status() - props = self.parse_filename(url.split('/')[-1]) - props['origin'] = url - return self.read_buffer( - io.BytesIO(r.content), - merge_dicts(props, attrs) - ) + props = self.parse_filename(url.split("/")[-1]) + props["origin"] = url + return self.read_buffer(io.BytesIO(r.content), merge_dicts(props, attrs)) def read_string( self, s: str, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read single string into a Neuron. Parameters @@ -594,13 +724,12 @@ def read_string( """ sio = io.StringIO(s) return self.read_buffer( - sio, - merge_dicts({'name': self.name_fallback, 'origin': 'string'}, attrs) + sio, merge_dicts({"name": self.name_fallback, "origin": "string"}, attrs) ) def read_bytes( self, s: str, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Read bytes into a Neuron. Parameters @@ -616,13 +745,12 @@ def read_bytes( """ sio = io.BytesIO(s) return self.read_buffer( - sio, - merge_dicts({'name': self.name_fallback, 'origin': 'string'}, attrs) + sio, merge_dicts({"name": self.name_fallback, "origin": "string"}, attrs) ) def read_dataframe( self, nodes: pd.DataFrame, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Convert a DataFrame into a neuron. Parameters @@ -635,12 +763,13 @@ def read_dataframe( ------- core.BaseNeuron """ - raise NotImplementedError('Reading DataFrames not implemented for ' - f'{type(self)}') + raise NotImplementedError( + "Reading DataFrames not implemented for " f"{type(self)}" + ) def read_any_single( self, obj, attrs: Optional[Dict[str, Any]] = None - ) -> 'core.BaseNeuron': + ) -> "core.BaseNeuron": """Attempt to convert an arbitrary object into a neuron. 
Parameters @@ -660,7 +789,7 @@ def read_any_single( if isinstance(obj, pd.DataFrame): return self.read_dataframe(obj, attrs) if isinstance(obj, os.PathLike): - if str(obj).endswith('.zip'): + if str(obj).endswith(".zip"): return self.read_zip(obj, attrs=attrs) elif ".tar" in str(obj): return self.read_tar(obj, attrs=attrs) @@ -669,17 +798,17 @@ def read_any_single( # See if this might be a file (make sure to expand user) if os.path.isfile(os.path.expanduser(obj)): p = Path(obj).expanduser() - if p.suffix == '.zip': + if p.suffix == ".zip": return self.read_zip(p, attrs=attrs) return self.read_file_path(p, attrs) if obj.startswith("http://") or obj.startswith("https://"): return self.read_url(obj, attrs) + if obj.startswith("ftp://"): + return self.read_ftp(obj, attrs=attrs) return self.read_string(obj, attrs) if isinstance(obj, bytes): return self.read_bytes(obj, attrs) - raise ValueError( - f"Could not read neuron from object of type '{type(obj)}'" - ) + raise ValueError(f"Could not read neuron from object of type '{type(obj)}'") def read_any_multi( self, @@ -687,7 +816,7 @@ def read_any_multi( include_subdirs=DEFAULT_INCLUDE_SUBDIRS, parallel="auto", attrs: Optional[Dict[str, Any]] = None, - ) -> 'core.NeuronList': + ) -> "core.NeuronList": """Attempt to convert an arbitrary object into a NeuronList, potentially in parallel. @@ -730,7 +859,7 @@ def read_any_multi( if ( isinstance(parallel, str) - and parallel.lower() == 'auto' + and parallel.lower() == "auto" and len(new_objs) < 200 ): parallel = False @@ -746,7 +875,7 @@ def read_any( parallel="auto", limit=None, attrs: Optional[Dict[str, Any]] = None, - ) -> 'core.NeuronObject': + ) -> "core.NeuronObject": """Attempt to read an arbitrary object into a neuron. Parameters @@ -770,17 +899,19 @@ def read_any( except TypeError: pass try: - if os.path.isfile(os.path.expanduser(obj)) and str(obj).endswith('.zip'): + if os.path.isfile(os.path.expanduser(obj)) and str(obj).endswith( + ".zip" + ): return self.read_zip(obj, parallel, limit, attrs) if os.path.isfile(os.path.expanduser(obj)) and ".tar" in str(obj): return self.read_tar(obj, parallel, limit, attrs) + if isinstance(obj, str) and obj.startswith("ftp://"): + return self.read_ftp(obj, parallel, limit, attrs) except TypeError: pass return self.read_any_single(obj, attrs) - def parse_filename( - self, filename: str - ) -> dict: + def parse_filename(self, filename: str) -> dict: """Extract properties from filename according to specified formatter. Parameters @@ -800,7 +931,7 @@ def parse_filename( fmt = re.escape(self.fmt) # Unescape { and } - fmt = fmt.replace('\\{', '{').replace('\\}', '}') + fmt = fmt.replace("\\{", "{").replace("\\}", "}") # Replace all e.g. 
{name} with {.*} prop_names = [] @@ -814,37 +945,36 @@ def parse_filename( if not match: raise ValueError(f'Unable to match "{self.fmt}" to filename "{filename}"') - props = {'file': filename} + props = {"file": filename} for i, prop in enumerate(prop_names): - for p in prop.split(','): + for p in prop.split(","): # Ignore empty ("{}") if not p: continue # If datatype was specified if ":" in p: - p, dt = p.split(':') + p, dt = p.split(":") props[p] = match.group(i + 1) - if dt == 'int': + if dt == "int": props[p] = int(props[p]) - elif dt == 'float': + elif dt == "float": props[p] = float(props[p]) - elif dt == 'bool': + elif dt == "bool": props[p] = bool(props[p]) - elif dt == 'str': + elif dt == "str": props[p] = str(props[p]) else: - raise ValueError(f'Unable to interpret datatype "{dt}" ' - f'for property {p}') + raise ValueError( + f'Unable to interpret datatype "{dt}" ' f"for property {p}" + ) else: props[p] = match.group(i + 1) return props - def _extract_connectors( - self, nodes: pd.DataFrame - ) -> Optional[pd.DataFrame]: + def _extract_connectors(self, nodes: pd.DataFrame) -> Optional[pd.DataFrame]: """Infer outgoing/incoming connectors from data. Parameters @@ -859,7 +989,7 @@ def _extract_connectors( return -def parallel_read(read_fn, objs, parallel="auto") -> List['core.NeuronList']: +def parallel_read(read_fn, objs, parallel="auto") -> List["core.NeuronList"]: """Read neurons from some objects with the given reader function, potentially in parallel. @@ -885,15 +1015,15 @@ def parallel_read(read_fn, objs, parallel="auto") -> List['core.NeuronList']: prog = partial( config.tqdm, - desc='Importing', + desc="Importing", total=length, disable=config.pbar_hide, - leave=config.pbar_leave + leave=config.pbar_leave, ) if ( isinstance(parallel, str) - and parallel.lower() == 'auto' + and parallel.lower() == "auto" and not isinstance(length, type(None)) and length < 200 ): @@ -915,10 +1045,9 @@ def parallel_read(read_fn, objs, parallel="auto") -> List['core.NeuronList']: return neurons -def parallel_read_archive(read_fn, fpath, file_ext, - limit=None, - parallel="auto", - ignore_hidden=True) -> List['core.NeuronList']: +def parallel_read_archive( + read_fn, fpath, file_ext, limit=None, parallel="auto", ignore_hidden=True +) -> List["core.NeuronList"]: """Read neurons from a archive (zip or tar), potentially in parallel. Reader function must be picklable. @@ -955,38 +1084,38 @@ def parallel_read_archive(read_fn, fpath, file_ext, p = Path(fpath) to_read = [] - if p.name.endswith('.zip'): - with ZipFile(p, 'r') as zip: + if p.name.endswith(".zip"): + with ZipFile(p, "r") as zip: for i, file in enumerate(zip.filelist): - fname = file.filename.split('/')[-1] - if ignore_hidden and fname.startswith('._'): + fname = file.filename.split("/")[-1] + if ignore_hidden and fname.startswith("._"): continue if callable(file_ext): if file_ext(file): to_read.append(file) - elif file_ext == '*': + elif file_ext == "*": to_read.append(file) elif file_ext and fname.endswith(file_ext): to_read.append(file) - elif '.' not in file.filename: + elif "." 
not in file.filename: to_read.append(file) if isinstance(limit, int) and i >= limit: break - elif '.tar' in p.name: # can be ".tar", "tar.gz" or "tar.bz" - with tarfile.open(p, 'r') as tf: + elif ".tar" in p.name: # can be ".tar", "tar.gz" or "tar.bz" + with tarfile.open(p, "r") as tf: for i, file in enumerate(tf): - fname = file.name.split('/')[-1] - if ignore_hidden and fname.startswith('._'): + fname = file.name.split("/")[-1] + if ignore_hidden and fname.startswith("._"): continue if callable(file_ext): if file_ext(file): to_read.append(file) - elif file_ext == '*': + elif file_ext == "*": to_read.append(file) elif file_ext and fname.endswith(file_ext): to_read.append(file) - elif '.' not in file.filename: + elif "." not in file.filename: to_read.append(file) if isinstance(limit, int) and i >= limit: @@ -997,17 +1126,13 @@ def parallel_read_archive(read_fn, fpath, file_ext, prog = partial( config.tqdm, - desc='Importing', + desc="Importing", total=len(to_read), disable=config.pbar_hide, - leave=config.pbar_leave + leave=config.pbar_leave, ) - if ( - isinstance(parallel, str) - and parallel.lower() == 'auto' - and len(to_read) < 200 - ): + if isinstance(parallel, str) and parallel.lower() == "auto" and len(to_read) < 200: parallel = False if parallel: @@ -1026,6 +1151,148 @@ def parallel_read_archive(read_fn, fpath, file_ext, return neurons +def parallel_read_ftp( + read_fn, + server, + port, + path, + file_ext, + limit=None, + parallel="auto", + ignore_hidden=True, +) -> List["core.NeuronList"]: + """Read neurons from an FTP server, potentially in parallel. + + Reader function must be picklable. + + Parameters + ---------- + read_fn : Callable + server : str + FTP server address. + port : int + FTP server port. + path : str + Path to directory containing files or single file. + file_ext : str | callable + File extension to search for - e.g. ".swc". `None` or `''` + are interpreted as looking for filenames without extension. + To include all files use `'*'`. Can also be callable that + accepts a filename and returns True or False depending on + if it should be included. + limit : int, optional + Limit the number of files read from this directory. + parallel : str | bool | int + "auto" or True for n_cores // 2, otherwise int for number of + jobs, or false for serial. + ignore_hidden : bool + Archives zipped on OSX can end up containing a + `__MACOSX` folder with files that mirror the name of other + files. For example if there is a `123456.swc` in the archive + you might also find a `__MACOSX/._123456.swc`. Reading the + latter will result in an error. If ignore_hidden=True + we will simply ignore all file that starts with "._". + + Returns + ------- + core.NeuronList + + """ + # Check if this is a single file + is_single_file = False + if "*" not in path: + if isinstance(file_ext, str) and path.endswith(file_ext): + is_single_file = True + elif callable(file_ext) and file_ext(path.rsplit("/", 1)[1]): + is_single_file = True + + if is_single_file: + path, fname = path.rsplit("/", 1) + to_read = [fname] + else: + pattern = "" + # Check if path contains a "*." pattern - e.g. 
something like "*_raw.swc" + if "*" in path: + path, fname = path.rsplit("/", 1) + pattern = fname + + # Remove leading / + if path.startswith("/"): + path = path[1:] + + # First check content + with FTP() as ftp: + ftp.connect(server, port) # connect to server + ftp.login() # anonymous login + ftp.cwd(path) # change to path + + # Read content + content = [] + ftp.retrlines(f"LIST {pattern}", content.append) + + # Parse content into filenames + to_read = [] + for line in content: + if not line: + continue + file = line.split()[-1].strip() + + if callable(file_ext): + if file_ext(file): + to_read.append(file) + elif file_ext == "*": + to_read.append(file) + elif file_ext and fname.endswith(file_ext): + to_read.append(file) + + if isinstance(limit, int) and len(to_read) >= limit: + break + + if isinstance(limit, list): + to_read = [f for f in to_read if f in limit] + + prog = partial( + config.tqdm, + desc="Loading", + total=len(to_read), + disable=config.pbar_hide, + leave=config.pbar_leave, + ) + + if isinstance(parallel, str) and parallel.lower() == "auto" and len(to_read) < 200: + parallel = False + + if parallel: + # Do not swap this as `isinstance(True, int)` returns `True` + if isinstance(parallel, (bool, str)): + n_cores = max(1, os.cpu_count() // 2) + else: + n_cores = int(parallel) + + with mp.Pool( + processes=n_cores, initializer=_ftp_pool_init, initargs=(server, port, path) + ) as pool: + results = pool.imap(partial(read_fn, ftp="GLOBAL"), to_read) + neurons = list(prog(results)) + else: + with FTP() as ftp: + ftp.connect(server, port) + ftp.login() + ftp.cwd(path) + + neurons = [read_fn(file, ftp=ftp) for file in prog(to_read)] + + return neurons + + +def _ftp_pool_init(server, port, path): + global _FTP + _FTP = FTP() + _FTP.connect(server, port) + _FTP.login() + _FTP.cwd(path) + + def parse_precision(precision: Optional[int]): """Convert bit width into int and float dtypes. @@ -1051,5 +1318,5 @@ def parse_precision(precision: Optional[int]): return (INT_DTYPES[precision], FLOAT_DTYPES[precision]) except KeyError: raise ValueError( - f'Unknown precision {precision}. Expected on of the following: 16, 32 (default), 64 or None' + f"Unknown precision {precision}. Expected on of the following: 16, 32 (default), 64 or None" ) diff --git a/navis/io/swc_io.py b/navis/io/swc_io.py index 5a0c65d9..b6524381 100644 --- a/navis/io/swc_io.py +++ b/navis/io/swc_io.py @@ -368,12 +368,16 @@ def read_swc(f: Union[str, pd.DataFrame, Iterable], >>> s = navis.read_swc('skeletons.zip') # doctest: +SKIP - Sample first 100 SWC files a zip archive: + Sample first 100 SWC files in a zip archive: >>> s = navis.read_swc('skeletons.zip', limit=100) # doctest: +SKIP + Read first all SWC files an ftp folder: + + >>> s = navis.read_swc('ftp://server:port/path/to/swc/') # doctest: +SKIP + """ - # SwcReader will try its best to read whatever you throw at it - with limit + # SwcReader will try its best to read whatever you throw at it - with limited # sanity checks. 
For example: if you misspell a filepath, it will assume # that it's a SWC string (because anything that's a string but doesn't # point to an existing file or a folder MUST be a SWC) which will lead to From 14218c1db034fea436af213513efaae0cb18af6b Mon Sep 17 00:00:00 2001 From: Philipp Schlegel Date: Wed, 18 Sep 2024 09:53:26 +0100 Subject: [PATCH 2/3] docs: add FTP example to skeleton I/O tutorial --- docs/examples/0_io/plot_00_io_skeletons.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/docs/examples/0_io/plot_00_io_skeletons.py b/docs/examples/0_io/plot_00_io_skeletons.py index aea7dab5..3bd20181 100644 --- a/docs/examples/0_io/plot_00_io_skeletons.py +++ b/docs/examples/0_io/plot_00_io_skeletons.py @@ -62,11 +62,26 @@ s # %% -# You can even use URLs directly: +# You can even use URLs or FTP servers directly: # %% + +# From URL: s = navis.read_swc('https://v2.virtualflybrain.org/data/VFB/i/jrch/jup2/VFB_00101567/volume.swc') +# %% + +# From an FTP folder: +nl = navis.read_swc('ftp://download.brainlib.org:8811/biccn/zeng/pseq/morph/200526/', limit=3) + + +# !!! tip +# [`read_swc`][navis.read_swc] is super flexible and can handle a variety of inputs (file names, folders, archives, URLs, etc.). +# Importantly, it also let you customize which/how neurons are loaded. For example: +# - the `limit` parameter can also be used to load only files matching a given pattern +# - the `fmt` parameter lets you specify how to parse filenames into neuron names and ids +# Many of the other `navis.read_*` functions share these features! + # %% # ## To SWC files # @@ -125,7 +140,7 @@ # # Among other formats, neuroglancer supports a "precomputed" format for skeletons # (see specs [here](https://github.com/google/neuroglancer/blob/master/src/neuroglancer/datasource/precomputed/skeletons.md). -# This binary format is more compact than uncompressed SWC files but probably is not used outside of neuroglancer afaik. +# This binary format is more compact than uncompressed SWC files but is not used outside of neuroglancer as far as I know. # That said: {{ navis }} lets you read and write skeletons from/to precomputed format using [`navis.read_precomputed`][] and # [`navis.write_precomputed`][]. Note that these functions work on both precomputed skeletons and meshes. # From b187de292206bebf856a90c1301f5c992cb07e09 Mon Sep 17 00:00:00 2001 From: Philipp Schlegel Date: Wed, 18 Sep 2024 09:55:24 +0100 Subject: [PATCH 3/3] read_* functions: make `limit` parameter accept regex pattern or slice --- docs/changelog.md | 13 +++++---- navis/io/base.py | 55 ++++++++++++++++++++++++++++---------- navis/io/mesh_io.py | 17 ++++++++---- navis/io/nmx_io.py | 17 ++++++++---- navis/io/precomputed_io.py | 17 ++++++++---- navis/io/swc_io.py | 41 ++++++++++++++++++---------- navis/utils/misc.py | 2 ++ 7 files changed, 114 insertions(+), 48 deletions(-) diff --git a/docs/changelog.md b/docs/changelog.md index 5b33ed12..48fc7e54 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -36,7 +36,7 @@ more consistent and easier to use. - New function: [`navis.graph.skeleton_adjacency_matrix`][] computes the node adjacency for skeletons - New function: [`navis.graph.simplify_graph`][] simplifies skeleton graphs to only root, branch and leaf nodes while preserving branch length (i.e. 
weights) - New [`NeuronList`][navis.NeuronList] method: [`get_neuron_attributes`][navis.NeuronList.get_neuron_attributes] is analagous to `dict.get` -- [`NeuronLists`][navis.NeuronList] now implemented the `|` (`__or__`) operator which can be used to get the union of two [`NeuronLists`][navis.NeuronList] +- [`NeuronLists`][navis.NeuronList] now implement the `|` (`__or__`) operator which can be used to get the union of two [`NeuronLists`][navis.NeuronList] - [`navis.Volume`][] now have an (optional) `.units` property similar to neurons ##### Improvements @@ -44,9 +44,9 @@ more consistent and easier to use. - [`navis.plot3d`][]: - `legendgroup` parameter (plotly backend) now also sets the legend group's title - new parameters for the plotly backend: - - `legend` (default `True`): determines whether legends is shown - - `legend_orientation` (default `v`): determines whether legend is aranged vertically (`v`) or horizontally (`h`) - - `linestyle` (default `-`): determines line style for skeletons + - `legend` (default `True`): determines whether legends is shown + - `legend_orientation` (default `v`): determines whether legend is aranged vertically (`v`) or horizontally (`h`) + - `linestyle` (default `-`): determines line style for skeletons - default for `radius` is now `"auto"` - [`navis.plot2d`][]: - the `view` parameter now also works with `methods` `3d` and `3d_complex` @@ -55,13 +55,16 @@ more consistent and easier to use. - new parameters for methods `3d` and `3d_complex`: `mesh_shade=False` and `non_view_axes3d` - the `scalebar` parameter can now be a dictionary used to style (color, width, etc) the scalebar - the `connectors` parameter can now be used to show specific connector types (e.g. `connectors="pre"`) +- I/O: + - `read_*` functions are now able to read from FTP servers (`ftp://...`) + - the `limit` parameter used in many `read_*` functions can now also be a regex pattern or a `slice` - General improvements to docs and tutorials ##### Fixes - Memory usage of `Neuron/Lists` is now correctly re-calculated when the neuron is modified - Various fixes and improvements for the MICrONS interface (`navis.interfaces.microns`) - [`navis.graph.node_label_sorting`][] now correctly prioritizes total branch length -- [`navis.TreeNeuron.simple][] now correctly drops soma nodes if they aren't root, branch or leaf points themselves +- [`navis.TreeNeuron.simple`][] now correctly drops soma nodes if they aren't root, branch or leaf points themselves ## Version `1.7.0` { data-toc-label="1.7.0" } _Date: 25/07/24_ diff --git a/navis/io/base.py b/navis/io/base.py index a8242c29..f2d06b6c 100644 --- a/navis/io/base.py +++ b/navis/io/base.py @@ -48,6 +48,9 @@ DEFAULT_INCLUDE_SUBDIRS = False +# Regular expression to figure out if a string is a regex pattern +rgx = re.compile(r'[\\\.\?\[\]\+\^\$\*]') + def merge_dicts(*dicts: Optional[Dict], **kwargs) -> Dict: """Merge dicts and kwargs left to right. @@ -541,7 +544,7 @@ def read_ftp( ) -> "core.NeuronList": """Read files from an FTP server. - This is a dispatcher for `.read_from_tar`. + This is a dispatcher for `.read_from_ftp`. Parameters ---------- @@ -613,6 +616,8 @@ def read_from_ftp( core.NeuronList """ + # When reading in parallel, we expect there to be a global FTP connection + # that was initialized once for each worker process. 
if ftp == "GLOBAL": if "_FTP" not in globals(): raise ValueError("No global FTP connection found.") @@ -668,8 +673,18 @@ def read_directory( """ files = list(self.files_in_dir(Path(path), include_subdirs)) - if limit: + if isinstance(limit, int): files = files[:limit] + elif isinstance(limit, list): + files = [f for f in files if f in limit] + elif isinstance(limit, slice): + files = files[limit] + elif isinstance(limit, str): + # Check if limit is a regex + if rgx.search(limit): + files = [f for f in files if re.search(limit, str(f.name))] + else: + files = [f for f in files if limit in str(f)] read_fn = partial(self.read_file_path, attrs=attrs) neurons = parallel_read(read_fn, files, parallel) @@ -1123,6 +1138,14 @@ def parallel_read_archive( if isinstance(limit, list): to_read = [f for f in to_read if f in limit] + elif isinstance(limit, slice): + to_read = to_read[limit] + elif isinstance(limit, str): + # Check if limit is a regex + if rgx.search(limit): + to_read = [f for f in to_read if re.search(limit, f)] + else: + to_read = [f for f in to_read if limit in f] prog = partial( config.tqdm, @@ -1159,7 +1182,6 @@ def parallel_read_ftp( file_ext, limit=None, parallel="auto", - ignore_hidden=True, ) -> List["core.NeuronList"]: """Read neurons from an FTP server, potentially in parallel. @@ -1185,13 +1207,6 @@ def parallel_read_ftp( parallel : str | bool | int "auto" or True for n_cores // 2, otherwise int for number of jobs, or false for serial. - ignore_hidden : bool - Archives zipped on OSX can end up containing a - `__MACOSX` folder with files that mirror the name of other - files. For example if there is a `123456.swc` in the archive - you might also find a `__MACOSX/._123456.swc`. Reading the - latter will result in an error. If ignore_hidden=True - we will simply ignore all file that starts with "._". Returns ------- @@ -1245,11 +1260,21 @@ def parallel_read_ftp( elif file_ext and fname.endswith(file_ext): to_read.append(file) - if isinstance(limit, int) and len(to_read) >= limit: - break - - if isinstance(limit, list): + if isinstance(limit, int): + to_read = to_read[:limit] + elif isinstance(limit, list): to_read = [f for f in to_read if f in limit] + elif isinstance(limit, slice): + to_read = to_read[limit] + elif isinstance(limit, str): + # Check if limit is a regex + if rgx.search(limit): + to_read = [f for f in to_read if re.search(limit, f)] + else: + to_read = [f for f in to_read if limit in f] + + if not to_read: + return [] prog = partial( config.tqdm, @@ -1269,6 +1294,8 @@ def parallel_read_ftp( else: n_cores = int(parallel) + # We can't send the FTP object to the process (because its socket is not pickleable) + # Instead, we need to initialize a new FTP connection in each process via a global variable with mp.Pool( processes=n_cores, initializer=_ftp_pool_init, initargs=(server, port, path) ) as pool: diff --git a/navis/io/mesh_io.py b/navis/io/mesh_io.py index 5e5677f4..8e2b584d 100644 --- a/navis/io/mesh_io.py +++ b/navis/io/mesh_io.py @@ -62,11 +62,18 @@ def read_mesh(f: Union[str, Iterable], Determines function's output. See Returns. errors : "raise" | "log" | "ignore" If "log" or "ignore", errors will not be raised. - limit : int, optional - If reading from a folder you can use this parameter to - read only the first `limit` files. Useful when - wanting to get a sample from a large library of - meshes. 
+ limit : int | str | slice | list, optional + When reading from a folder or archive you can use this parameter to + restrict the which files read: + - if an integer, will read only the first `limit` SWC files + (useful to get a sample from a large library of meshes) + - if a string, will interpret it as filename (regex) pattern + and only read files that match the pattern; e.g. `limit='.*_R.*'` + will only read files that contain `_R` in their filename + - if a slice (e.g. `slice(10, 20)`) will read only the files in + that range + - a list is expected to be a list of filenames to read from + the folder/archive **kwargs Keyword arguments passed to [`navis.MeshNeuron`][] or [`navis.Volume`][]. You can use this to e.g. diff --git a/navis/io/nmx_io.py b/navis/io/nmx_io.py index f57b85fc..bc1906e2 100644 --- a/navis/io/nmx_io.py +++ b/navis/io/nmx_io.py @@ -204,11 +204,18 @@ def read_nmx(f: Union[str, pd.DataFrame, Iterable], Precision for data. Defaults to 32 bit integers/floats. If `None` will let pandas infer data types - this typically leads to higher than necessary precision. - limit : int, optional - If reading from a folder you can use this parameter to - read only the first `limit` NMX files. Useful if - wanting to get a sample from a large library of - skeletons. + limit : int | str | slice | list, optional + When reading from a folder or archive you can use this parameter to + restrict the which files read: + - if an integer, will read only the first `limit` SWC files + (useful to get a sample from a large library of skeletons) + - if a string, will interpret it as filename (regex) pattern + and only read files that match the pattern; e.g. `limit='.*_R.*'` + will only read files that contain `_R` in their filename + - if a slice (e.g. `slice(10, 20)`) will read only the files in + that range + - a list is expected to be a list of filenames to read from + the folder/archive **kwargs Keyword arguments passed to the construction of `navis.TreeNeuron`. You can use this to e.g. set diff --git a/navis/io/precomputed_io.py b/navis/io/precomputed_io.py index 76f3b0c2..5adfac73 100644 --- a/navis/io/precomputed_io.py +++ b/navis/io/precomputed_io.py @@ -252,11 +252,18 @@ def read_precomputed(f: Union[str, io.BytesIO], - `False` = do not use/look for `info` file - `str` = filepath to `info` file - `dict` = already parsed info file - limit : int, optional - If reading from a folder you can use this parameter to - read only the first `limit` files. Useful if - wanting to get a sample from a large library of - skeletons/meshes. + limit : int | str | slice | list, optional + When reading from a folder or archive you can use this parameter to + restrict the which files read: + - if an integer, will read only the first `limit` SWC files + (useful to get a sample from a large library of neurons) + - if a string, will interpret it as filename (regex) pattern + and only read files that match the pattern; e.g. `limit='.*_R.*'` + will only read files that contain `_R` in their filename + - if a slice (e.g. `slice(10, 20)`) will read only the files in + that range + - a list is expected to be a list of filenames to read from + the folder/archive parallel : "auto" | bool | int Defaults to `auto` which means only use parallel processing if more than 200 files are imported. 
Spawning diff --git a/navis/io/swc_io.py b/navis/io/swc_io.py index b6524381..08ed6a68 100644 --- a/navis/io/swc_io.py +++ b/navis/io/swc_io.py @@ -273,14 +273,20 @@ def read_swc(f: Union[str, pd.DataFrame, Iterable], Parameters ---------- - f : str | pandas.DataFrame | iterable - Filename, folder, SWC string, URL or DataFrame. - If folder, will import all `.swc` files. If a - `.zip`, `.tar` or `.tar.gz` file will read all - SWC files in the file. See also `limit` parameter. + f : str | pandas.DataFrame | list thereof + Filename, folder, SWC string, URL or DataFrame: + - if folder, will import all `.swc` files + - if a `.zip`, `.tar` or `.tar.gz` archive will read all + SWC files from the file + - if a URL (http:// or https://), will download the + file and import it + - FTP address (ftp://) can point to a folder or a single + file + - DataFrames are interpreted as a SWC tables + See also `limit` parameter to read only a subset of files. connector_labels : dict, optional If provided will extract connectors from SWC. - Dictionary must map type to label: + Dictionary must map types to labels: `{'presynapse': 7, 'postsynapse': 8}` include_subdirs : bool, optional If True and `f` is a folder, will also search @@ -293,7 +299,7 @@ def read_swc(f: Union[str, pd.DataFrame, Iterable], and joining processes causes overhead and is considerably slower for imports of small numbers of neurons. Integer will be interpreted as the - number of cores (otherwise defaults to + number of processes to use (defaults to `os.cpu_count() // 2`). precision : int [8, 16, 32, 64] | None Precision for data. Defaults to 32 bit integers/floats. @@ -325,16 +331,23 @@ def read_swc(f: Union[str, pd.DataFrame, Iterable], read_meta : bool If True and SWC header contains a line with JSON-encoded meta data e.g. (`# Meta: {'id': 123}`), these data - will be read as neuron properties. `fmt` takes + will be read as neuron properties. `fmt` still takes precedence. Will try to assign meta data directly as neuron attribute (e.g. `neuron.id`). Failing that (can happen for properties intrinsic to `TreeNeurons`), will add a `.meta` dictionary to the neuron. - limit : int, optional - If reading from a folder you can use this parameter to - read only the first `limit` SWC files. Useful if - wanting to get a sample from a large library of - skeletons. + limit : int | str | slice | list, optional + When reading from a folder or archive you can use this parameter to + restrict the which files read: + - if an integer, will read only the first `limit` SWC files + (useful to get a sample from a large library of skeletons) + - if a string, will interpret it as filename (regex) pattern + and only read files that match the pattern; e.g. `limit='.*_R.*'` + will only read files that contain `_R` in their filename + - if a slice (e.g. `slice(10, 20)`) will read only the files in + that range + - a list is expected to be a list of filenames to read from + the folder/archive **kwargs Keyword arguments passed to the construction of `navis.TreeNeuron`. You can use this to e.g. 
set @@ -368,7 +381,7 @@ def read_swc(f: Union[str, pd.DataFrame, Iterable], >>> s = navis.read_swc('skeletons.zip') # doctest: +SKIP - Sample first 100 SWC files in a zip archive: + Sample the first 100 SWC files in a zip archive: >>> s = navis.read_swc('skeletons.zip', limit=100) # doctest: +SKIP diff --git a/navis/utils/misc.py b/navis/utils/misc.py index 2ce67365..3cfa3bab 100644 --- a/navis/utils/misc.py +++ b/navis/utils/misc.py @@ -96,6 +96,8 @@ def is_url(x: str) -> bool: False >>> is_url('http://www.google.com') True + >>> is_url("ftp://download.ft-server.org:8000") + True """ parsed = urllib.parse.urlparse(x)