From 803c3ca4017b571f1fbf752b21d7c8d5a1453eba Mon Sep 17 00:00:00 2001 From: Martin Valgur Date: Tue, 13 Apr 2021 14:45:39 +0300 Subject: [PATCH] move the delete-on-success feature from CLI to the library --- hatanaka/cli.py | 21 +++--------- hatanaka/general_compression.py | 40 +++++++++++++++++++---- hatanaka/test/test_general_compression.py | 38 +++++++++++++++++++++ 3 files changed, 76 insertions(+), 23 deletions(-) diff --git a/hatanaka/cli.py b/hatanaka/cli.py index 1746ea7..34385d1 100644 --- a/hatanaka/cli.py +++ b/hatanaka/cli.py @@ -1,12 +1,11 @@ import argparse import sys -import warnings -from contextlib import contextmanager from pathlib import Path from typing import List, Optional from hatanaka import __version__, compress, compress_on_disk, decompress, decompress_on_disk, \ rnxcmp_version +from hatanaka.general_compression import _record_warnings __all__ = ['decompress_cli', 'compress_cli'] @@ -83,16 +82,13 @@ def _run(func, func_on_disk, args, **kwargs): for in_file in args.files: with _record_warnings() as warning_list: - out_file = func_on_disk(in_file, **kwargs) + out_file = func_on_disk(in_file, delete=args.delete, **kwargs) if out_file == in_file: print(f'{str(in_file)} is already {func.__name__}ed') else: print(f'Created {str(out_file)}') - assert out_file.exists() - if args.delete: - if len(warning_list) == 0 and in_file != out_file: - in_file.unlink() - print(f'Deleted {str(in_file)}') + if args.delete and not in_file.exists(): + print(f'Deleted {str(in_file)}') if len(args.files) == 0: with _record_warnings() as warning_list: @@ -110,12 +106,3 @@ def _add_common_args(parser): 'finishes without any errors and warnings') parser.add_argument('--version', action='version', version=__version__) parser.add_argument('--rnxcmp-version', action='version', version=rnxcmp_version) - - -@contextmanager -def _record_warnings(): - with warnings.catch_warnings(record=True) as warning_list: - yield warning_list - for w in warning_list: - 
warnings.showwarning(message=w.message, category=w.category, filename=w.filename, - lineno=w.lineno, file=w.file, line=w.line) diff --git a/hatanaka/general_compression.py b/hatanaka/general_compression.py index 0c6407b..ba1f177 100644 --- a/hatanaka/general_compression.py +++ b/hatanaka/general_compression.py @@ -1,7 +1,9 @@ import bz2 import gzip import re +import warnings import zipfile +from contextlib import contextmanager from io import BytesIO from pathlib import Path from typing import Union @@ -64,7 +66,8 @@ def decompress(content: Union[Path, str, bytes], *, return _decompress(content, skip_strange_epochs)[1] -def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = False) -> Path: +def decompress_on_disk(path: Union[Path, str], *, delete: bool = False, + skip_strange_epochs: bool = False) -> Path: """Decompress compressed RINEX files and write the resulting file to disk. Any RINEX files compressed with Hatanaka compression (.crx|.##d) and/or with a conventional @@ -77,6 +80,8 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa ---------- path : Path or str Path to a compressed RINEX file. + delete : bool, default False + Delete the source file after successful decompression if no errors or warnings were raised. skip_strange_epochs : bool, default False For Hatanaka decompression. Warn and skip strange epochs instead of raising an exception. @@ -100,13 +105,18 @@ def decompress_on_disk(path: Union[Path, str], *, skip_strange_epochs: bool = Fa For invalid file contents. 
""" path = Path(path) - is_obs, txt = _decompress(path.read_bytes(), skip_strange_epochs=skip_strange_epochs) + with _record_warnings() as warning_list: + is_obs, txt = _decompress(path.read_bytes(), skip_strange_epochs=skip_strange_epochs) out_path = get_decompressed_path(path) if out_path == path: # file does not need decompressing return out_path with out_path.open('wb') as f_out: f_out.write(txt) + assert out_path.exists() + if delete: + if len(warning_list) == 0 and out_path != path: + path.unlink() return out_path @@ -186,7 +196,7 @@ def compress(content: Union[Path, str, bytes], *, compression: str = 'gz', return _compress(content, compression, skip_strange_epochs, reinit_every_nth)[1] -def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz', +def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz', delete: bool = False, skip_strange_epochs: bool = False, reinit_every_nth: int = None) -> Path: """Compress RINEX files. @@ -200,6 +210,8 @@ def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz', Path to a RINEX file. compression : 'gz' (default), 'bz2', or 'none' Which compression (if any) to apply in addition to the Hatanaka compression. + delete : bool, default False + Delete the source file after successful compression if no errors or warnings were raised. skip_strange_epochs : bool, default False For Hatanaka compression. Warn and skip strange epochs instead of raising an exception. 
reinit_every_nth : int, optional @@ -225,11 +237,18 @@ def compress_on_disk(path: Union[Path, str], *, compression: str = 'gz', if path.name.lower().endswith(('.gz', '.bz2', '.z', '.zip')): # already compressed return path - is_obs, txt = _compress(path.read_bytes(), compression=compression, - skip_strange_epochs=skip_strange_epochs, - reinit_every_nth=reinit_every_nth) + with _record_warnings() as warning_list: + is_obs, txt = _compress(path.read_bytes(), compression=compression, + skip_strange_epochs=skip_strange_epochs, + reinit_every_nth=reinit_every_nth) out_path = get_compressed_path(path, is_obs, compression) + if out_path == path: + return out_path out_path.write_bytes(txt) + assert out_path.exists() + if delete: + if len(warning_list) == 0: + path.unlink() return out_path @@ -353,3 +372,12 @@ def _compress_hatanaka(txt: bytes, skip_strange_epochs, reinit_every_nth) -> (bo else: is_obs = b'COMPACT RINEX' in txt[:80] return is_obs, txt + + +@contextmanager +def _record_warnings(): + with warnings.catch_warnings(record=True) as warning_list: + yield warning_list + for w in warning_list: + warnings.showwarning(message=w.message, category=w.category, filename=w.filename, + lineno=w.lineno, file=w.file, line=w.line) diff --git a/hatanaka/test/test_general_compression.py b/hatanaka/test/test_general_compression.py index fada9bb..783ef34 100644 --- a/hatanaka/test/test_general_compression.py +++ b/hatanaka/test/test_general_compression.py @@ -173,3 +173,41 @@ def test_invalid_name(tmp_path, rnx_sample): compress_on_disk(sample_path) msg = excinfo.value.args[0] assert msg.endswith('is not a valid RINEX file name') + + +def test_decompress_on_disk_delete(tmp_path, rnx_bytes): + # prepare + in_file = 'sample.crx.gz' + sample_path = tmp_path / in_file + shutil.copy(get_data_path(in_file), sample_path) + # decompress and delete + out_path = decompress_on_disk(sample_path, delete=True) + # check + expected_path = tmp_path / 'sample.rnx' + assert not sample_path.exists() 
+ assert out_path == expected_path + assert expected_path.exists() + assert clean(decompress(expected_path)) == clean(rnx_bytes) + # check that already decompressed is not deleted + out_path = decompress_on_disk(expected_path, delete=True) + assert out_path == expected_path + assert out_path.exists() + + +def test_compress_on_disk_delete(tmp_path, rnx_bytes): + # prepare + in_file = 'sample.rnx' + sample_path = tmp_path / in_file + shutil.copy(get_data_path(in_file), sample_path) + # compress and delete + out_path = compress_on_disk(sample_path, delete=True) + # check + expected_path = tmp_path / 'sample.crx.gz' + assert not sample_path.exists() + assert out_path == expected_path + assert expected_path.exists() + assert clean(decompress(expected_path)) == clean(rnx_bytes) + # check that already compressed is not deleted + out_path = compress_on_disk(expected_path, delete=True) + assert out_path == expected_path + assert out_path.exists()