Added an extra CRC check for "ghost" files
VOvchinnikov committed Sep 14, 2023
1 parent dd2574d commit 8fc6b5b
Showing 1 changed file with 18 additions and 19 deletions.
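The substantive change is in _filter_ghost_items (see the hunk below): a listing entry is now kept only if its crc32c field also differs from "AAAAAA==". As a minimal, illustrative sketch (not part of the commit): the CRC32C of empty content is zero, and four zero bytes base64-encode to "AAAAAA==", which is the checksum GCS reports for a zero-byte "ghost" entry.

# Illustrative only: shows where the "AAAAAA==" sentinel comes from.
import base64
import struct

ZERO_CRC32C = base64.b64encode(struct.pack(">I", 0)).decode()  # all-zero 4-byte CRC32C
print(ZERO_CRC32C)  # prints: AAAAAA==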
37 changes: 18 additions & 19 deletions gcsfs/core.py
@@ -27,11 +27,9 @@

logger = logging.getLogger("gcsfs")

-
if "GCSFS_DEBUG" in os.environ:
setup_logging(logger=logger, level=os.getenv("GCSFS_DEBUG"))

-
# client created 2018-01-16
ACLs = {
"authenticatedread",
@@ -50,9 +48,9 @@
}
DEFAULT_PROJECT = os.getenv("GCSFS_DEFAULT_PROJECT", "")

- GCS_MIN_BLOCK_SIZE = 2**18
- GCS_MAX_BLOCK_SIZE = 2**28
- DEFAULT_BLOCK_SIZE = 5 * 2**20
+ GCS_MIN_BLOCK_SIZE = 2 ** 18
+ GCS_MAX_BLOCK_SIZE = 2 ** 28
+ DEFAULT_BLOCK_SIZE = 5 * 2 ** 20

SUPPORTED_FIXED_KEY_METADATA = {
"content_encoding": "contentEncoding",
@@ -118,7 +116,7 @@ def _chunks(lst, n):
Implementation based on https://stackoverflow.com/a/312464.
"""
for i in range(0, len(lst), n):
- yield lst[i : i + n]
+ yield lst[i: i + n]


def _coalesce_generation(*args):
@@ -351,9 +349,9 @@ def _strip_protocol(cls, path):
protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
for protocol in protos:
if path.startswith(protocol + "://"):
- path = path[len(protocol) + 3 :]
+ path = path[len(protocol) + 3:]
elif path.startswith(protocol + "::"):
- path = path[len(protocol) + 2 :]
+ path = path[len(protocol) + 2:]
# use of root_marker to make minimum required path, e.g., "/"
return path or cls.root_marker

@@ -406,7 +404,6 @@ async def _request(
data=data,
timeout=self.requests_timeout,
) as r:
-
status = r.status
headers = r.headers
info = r.request_info # for debug only
@@ -567,7 +564,9 @@ def _filter_ghost_items(items):
filtered_items = []

for item in items:
- if item.get("kind", "") != "storage#object" and item.get("size", "0") != "0":
+ if item.get("kind", "") != "storage#object" \
+ and item.get("size", "0") != "0" \
+ and item.get("crc32c", "") != "AAAAAA==":
filtered_items.append(item)

return filtered_items
@@ -1038,7 +1037,7 @@ async def _rm_files(self, paths):
f"{self._location}/batch/storage/v1",
headers={
"Content-Type": 'multipart/mixed; boundary="=========='
- '=====7330845974216740156=="'
+ '=====7330845974216740156=="'
},
data=body + "\n--===============7330845974216740156==--",
)
@@ -1071,7 +1070,7 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
exs = await asyncio.gather(
*(
[
- self._rm_files(files[i : i + batchsize])
+ self._rm_files(files[i: i + batchsize])
for i in range(0, len(files), batchsize)
]
),
@@ -1087,8 +1086,8 @@
ex
for ex in exs
if ex is not None
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
]
if exs:
raise exs[0]
@@ -1103,21 +1102,21 @@ async def _pipe_file(
metadata=None,
consistency=None,
content_type="application/octet-stream",
- chunksize=50 * 2**20,
+ chunksize=50 * 2 ** 20,
):
# enforce blocksize should be a multiple of 2**18
consistency = consistency or self.consistency
bucket, key, generation = self.split_path(path)
size = len(data)
out = None
- if size < 5 * 2**20:
+ if size < 5 * 2 ** 20:
location = await simple_upload(
self, bucket, key, data, metadata, consistency, content_type
)
else:
location = await initiate_upload(self, bucket, key, content_type, metadata)
for offset in range(0, len(data), chunksize):
- bit = data[offset : offset + chunksize]
+ bit = data[offset: offset + chunksize]
out = await upload_chunk(
self, location, bit, offset, size, content_type
)
@@ -1136,7 +1135,7 @@ async def _put_file(
metadata=None,
consistency=None,
content_type="application/octet-stream",
- chunksize=50 * 2**20,
+ chunksize=50 * 2 ** 20,
callback=None,
**kwargs,
):
@@ -1154,7 +1153,7 @@
f0.seek(0)
callback.set_size(size)

- if size < 5 * 2**20:
+ if size < 5 * 2 ** 20:
await simple_upload(
self,
bucket,