Skip to content

Commit

Permalink
Added a method to filter out "ghost" objects that GCS has started to create
Browse files Browse the repository at this point in the history
  • Loading branch information
VOvchinnikov committed Sep 14, 2023
1 parent 863c93d commit 053b956
Showing 1 changed file with 35 additions and 23 deletions.
58 changes: 35 additions & 23 deletions gcsfs/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,9 @@

logger = logging.getLogger("gcsfs")


if "GCSFS_DEBUG" in os.environ:
setup_logging(logger=logger, level=os.getenv("GCSFS_DEBUG"))


# client created 2018-01-16
ACLs = {
"authenticatedread",
Expand All @@ -50,9 +48,9 @@
}
DEFAULT_PROJECT = os.getenv("GCSFS_DEFAULT_PROJECT", "")

GCS_MIN_BLOCK_SIZE = 2**18
GCS_MAX_BLOCK_SIZE = 2**28
DEFAULT_BLOCK_SIZE = 5 * 2**20
GCS_MIN_BLOCK_SIZE = 2 ** 18
GCS_MAX_BLOCK_SIZE = 2 ** 28
DEFAULT_BLOCK_SIZE = 5 * 2 ** 20

SUPPORTED_FIXED_KEY_METADATA = {
"content_encoding": "contentEncoding",
Expand Down Expand Up @@ -118,7 +116,7 @@ def _chunks(lst, n):
Implementation based on https://stackoverflow.com/a/312464.
"""
for i in range(0, len(lst), n):
yield lst[i : i + n]
yield lst[i: i + n]


def _coalesce_generation(*args):
Expand Down Expand Up @@ -351,9 +349,9 @@ def _strip_protocol(cls, path):
protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
for protocol in protos:
if path.startswith(protocol + "://"):
path = path[len(protocol) + 3 :]
path = path[len(protocol) + 3:]
elif path.startswith(protocol + "::"):
path = path[len(protocol) + 2 :]
path = path[len(protocol) + 2:]
# use of root_marker to make minimum required path, e.g., "/"
return path or cls.root_marker

Expand Down Expand Up @@ -406,7 +404,6 @@ async def _request(
data=data,
timeout=self.requests_timeout,
) as r:

status = r.status
headers = r.headers
info = r.request_info # for debug only
Expand Down Expand Up @@ -507,7 +504,7 @@ async def _get_object(self, path):
maxResults=1 if not generation else None,
versions="true" if generation else None,
)
for item in resp.get("items", []):
for item in self._filter_ghost_items(resp.get("items", [])):
if item["name"] == key and (
not generation or item.get("generation") == generation
):
Expand Down Expand Up @@ -559,6 +556,21 @@ async def _list_objects(self, path, prefix="", versions=False):
self.dircache[path] = out
return out

@staticmethod
def _filter_ghost_items(items):
if not items:
items = []

filtered_items = []

for item in items:
if item.get("kind", "") != "storage#object" \
and item.get("size", "0") != "0" \
and item.get("crc32c", "") != "AAAAAA==":
filtered_items.append(item)

return filtered_items

async def _do_list_objects(
self, path, max_results=None, delimiter="/", prefix="", versions=False
):
Expand All @@ -581,7 +593,7 @@ async def _do_list_objects(
)

prefixes.extend(page.get("prefixes", []))
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

while next_page_token is not None:
Expand All @@ -599,7 +611,7 @@ async def _do_list_objects(

assert page["kind"] == "storage#objects"
prefixes.extend(page.get("prefixes", []))
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

items = [self._process_object(bucket, i) for i in items]
Expand All @@ -612,7 +624,7 @@ async def _list_buckets(self):
page = await self._call("GET", "b", project=self.project, json_out=True)

assert page["kind"] == "storage#buckets"
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

while next_page_token is not None:
Expand All @@ -625,7 +637,7 @@ async def _list_buckets(self):
)

assert page["kind"] == "storage#buckets"
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

buckets = [
Expand Down Expand Up @@ -1025,7 +1037,7 @@ async def _rm_files(self, paths):
f"{self._location}/batch/storage/v1",
headers={
"Content-Type": 'multipart/mixed; boundary="=========='
'=====7330845974216740156=="'
'=====7330845974216740156=="'
},
data=body + "\n--===============7330845974216740156==--",
)
Expand Down Expand Up @@ -1058,7 +1070,7 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
exs = await asyncio.gather(
*(
[
self._rm_files(files[i : i + batchsize])
self._rm_files(files[i: i + batchsize])
for i in range(0, len(files), batchsize)
]
),
Expand All @@ -1074,8 +1086,8 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
ex
for ex in exs
if ex is not None
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
]
if exs:
raise exs[0]
Expand All @@ -1090,21 +1102,21 @@ async def _pipe_file(
metadata=None,
consistency=None,
content_type="application/octet-stream",
chunksize=50 * 2**20,
chunksize=50 * 2 ** 20,
):
# enforce blocksize should be a multiple of 2**18
consistency = consistency or self.consistency
bucket, key, generation = self.split_path(path)
size = len(data)
out = None
if size < 5 * 2**20:
if size < 5 * 2 ** 20:
location = await simple_upload(
self, bucket, key, data, metadata, consistency, content_type
)
else:
location = await initiate_upload(self, bucket, key, content_type, metadata)
for offset in range(0, len(data), chunksize):
bit = data[offset : offset + chunksize]
bit = data[offset: offset + chunksize]
out = await upload_chunk(
self, location, bit, offset, size, content_type
)
Expand All @@ -1123,7 +1135,7 @@ async def _put_file(
metadata=None,
consistency=None,
content_type="application/octet-stream",
chunksize=50 * 2**20,
chunksize=50 * 2 ** 20,
callback=None,
**kwargs,
):
Expand All @@ -1141,7 +1153,7 @@ async def _put_file(
f0.seek(0)
callback.set_size(size)

if size < 5 * 2**20:
if size < 5 * 2 ** 20:
await simple_upload(
self,
bucket,
Expand Down

0 comments on commit 053b956

Please sign in to comment.