From 8fc6b5bcba2a8e1113bab80e4c133a284761f8ba Mon Sep 17 00:00:00 2001 From: VOvchinnikov Date: Thu, 14 Sep 2023 18:26:05 +0200 Subject: [PATCH] Added an extra CRC check for "ghost" files --- gcsfs/core.py | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/gcsfs/core.py b/gcsfs/core.py index 52305541..da4d07dd 100644 --- a/gcsfs/core.py +++ b/gcsfs/core.py @@ -27,11 +27,9 @@ logger = logging.getLogger("gcsfs") - if "GCSFS_DEBUG" in os.environ: setup_logging(logger=logger, level=os.getenv("GCSFS_DEBUG")) - # client created 2018-01-16 ACLs = { "authenticatedread", @@ -50,9 +48,9 @@ } DEFAULT_PROJECT = os.getenv("GCSFS_DEFAULT_PROJECT", "") -GCS_MIN_BLOCK_SIZE = 2**18 -GCS_MAX_BLOCK_SIZE = 2**28 -DEFAULT_BLOCK_SIZE = 5 * 2**20 +GCS_MIN_BLOCK_SIZE = 2 ** 18 +GCS_MAX_BLOCK_SIZE = 2 ** 28 +DEFAULT_BLOCK_SIZE = 5 * 2 ** 20 SUPPORTED_FIXED_KEY_METADATA = { "content_encoding": "contentEncoding", @@ -118,7 +116,7 @@ def _chunks(lst, n): Implementation based on https://stackoverflow.com/a/312464. """ for i in range(0, len(lst), n): - yield lst[i : i + n] + yield lst[i: i + n] def _coalesce_generation(*args): @@ -351,9 +349,9 @@ def _strip_protocol(cls, path): protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol for protocol in protos: if path.startswith(protocol + "://"): - path = path[len(protocol) + 3 :] + path = path[len(protocol) + 3:] elif path.startswith(protocol + "::"): - path = path[len(protocol) + 2 :] + path = path[len(protocol) + 2:] # use of root_marker to make minimum required path, e.g., "/" return path or cls.root_marker @@ -406,7 +404,6 @@ async def _request( data=data, timeout=self.requests_timeout, ) as r: - status = r.status headers = r.headers info = r.request_info # for debug only @@ -567,7 +564,9 @@ def _filter_ghost_items(items): filtered_items = [] for item in items: - if item.get("kind", "") != "storage#object" and item.get("size", "0") != "0": + if item.get("kind", "") != "storage#object" \ + and item.get("size", "0") != "0" \ + and item.get("crc32c", "") != "AAAAAA==": filtered_items.append(item) return filtered_items @@ -1038,7 +1037,7 @@ async def _rm_files(self, paths): f"{self._location}/batch/storage/v1", headers={ "Content-Type": 'multipart/mixed; boundary="==========' - '=====7330845974216740156=="' + '=====7330845974216740156=="' }, data=body + "\n--===============7330845974216740156==--", ) @@ -1071,7 +1070,7 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20): exs = await asyncio.gather( *( [ - self._rm_files(files[i : i + batchsize]) + self._rm_files(files[i: i + batchsize]) for i in range(0, len(files), batchsize) ] ), @@ -1087,8 +1086,8 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20): ex for ex in exs if ex is not None - and "No such object" not in str(ex) - and not isinstance(ex, FileNotFoundError) + and "No such object" not in str(ex) + and not isinstance(ex, FileNotFoundError) ] if exs: raise exs[0] @@ -1103,21 +1102,21 @@ async def _pipe_file( metadata=None, consistency=None, content_type="application/octet-stream", - chunksize=50 * 2**20, + chunksize=50 * 2 ** 20, ): # enforce blocksize should be a multiple of 2**18 consistency = consistency or self.consistency bucket, key, generation = self.split_path(path) size = len(data) out = None - if size < 5 * 2**20: + if size < 5 * 2 ** 20: location = await simple_upload( self, bucket, key, data, metadata, consistency, content_type ) else: location = await initiate_upload(self, bucket, key, content_type, metadata) for offset in range(0, len(data), chunksize): - bit = data[offset : offset + chunksize] + bit = data[offset: offset + chunksize] out = await upload_chunk( self, location, bit, offset, size, content_type ) @@ -1136,7 +1135,7 @@ async def _put_file( metadata=None, consistency=None, content_type="application/octet-stream", - chunksize=50 * 2**20, + chunksize=50 * 2 ** 20, callback=None, **kwargs, ): @@ -1154,7 +1153,7 @@ async def _put_file( f0.seek(0) callback.set_size(size) - if size < 5 * 2**20: + if size < 5 * 2 ** 20: await simple_upload( self, bucket,