Added a method to filter out "ghost" objects GCS started to create #1

Open · wants to merge 2 commits into master · Changes from 1 commit
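
For context: the "ghost" entries this change targets are zero-byte placeholder records that show up in objects.list responses alongside real objects; they report a size of "0" and the CRC32C of empty content ("AAAAAA=="). A rough sketch of what such items look like in a listing page (names and checksums below are made up for illustration):

# Illustrative items from a GCS objects.list page; the second entry is the
# zero-byte "ghost" placeholder this change filters out.
items = [
    {
        "kind": "storage#object",
        "name": "data/part-0000.parquet",
        "size": "52428800",
        "crc32c": "yZRlqg==",  # made-up checksum for a real object
    },
    {
        "kind": "storage#object",
        "name": "data/",
        "size": "0",
        "crc32c": "AAAAAA==",  # CRC32C of empty content
    },
]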
gcsfs/core.py: 58 changes (35 additions, 23 deletions)
@@ -27,11 +27,9 @@

logger = logging.getLogger("gcsfs")


if "GCSFS_DEBUG" in os.environ:
setup_logging(logger=logger, level=os.getenv("GCSFS_DEBUG"))


# client created 2018-01-16
ACLs = {
"authenticatedread",
@@ -50,9 +48,9 @@
}
DEFAULT_PROJECT = os.getenv("GCSFS_DEFAULT_PROJECT", "")

GCS_MIN_BLOCK_SIZE = 2**18
GCS_MAX_BLOCK_SIZE = 2**28
DEFAULT_BLOCK_SIZE = 5 * 2**20
GCS_MIN_BLOCK_SIZE = 2 ** 18
GCS_MAX_BLOCK_SIZE = 2 ** 28
DEFAULT_BLOCK_SIZE = 5 * 2 ** 20

SUPPORTED_FIXED_KEY_METADATA = {
"content_encoding": "contentEncoding",
@@ -118,7 +116,7 @@ def _chunks(lst, n):
Implementation based on https://stackoverflow.com/a/312464.
"""
for i in range(0, len(lst), n):
yield lst[i : i + n]
yield lst[i: i + n]


def _coalesce_generation(*args):
@@ -351,9 +349,9 @@ def _strip_protocol(cls, path):
protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
for protocol in protos:
if path.startswith(protocol + "://"):
path = path[len(protocol) + 3 :]
path = path[len(protocol) + 3:]
elif path.startswith(protocol + "::"):
path = path[len(protocol) + 2 :]
path = path[len(protocol) + 2:]
# use of root_marker to make minimum required path, e.g., "/"
return path or cls.root_marker

@@ -406,7 +404,6 @@ async def _request(
data=data,
timeout=self.requests_timeout,
) as r:

status = r.status
headers = r.headers
info = r.request_info # for debug only
@@ -507,7 +504,7 @@ async def _get_object(self, path):
maxResults=1 if not generation else None,
versions="true" if generation else None,
)
for item in resp.get("items", []):
for item in self._filter_ghost_items(resp.get("items", [])):
if item["name"] == key and (
not generation or item.get("generation") == generation
):
@@ -559,6 +556,21 @@ async def _list_objects(self, path, prefix="", versions=False):
self.dircache[path] = out
return out

@staticmethod
def _filter_ghost_items(items):
    """Drop zero-byte "ghost" placeholder entries from a listing page."""
    filtered_items = []
    for item in items or []:
        # A ghost entry is an object record with zero size and the CRC32C
        # of empty content; real objects (and bucket records) are kept.
        is_ghost = (
            item.get("kind", "") == "storage#object"
            and item.get("size", "0") == "0"
            and item.get("crc32c", "") == "AAAAAA=="
        )
        if not is_ghost:
            filtered_items.append(item)
    return filtered_items

async def _do_list_objects(
self, path, max_results=None, delimiter="/", prefix="", versions=False
):
@@ -581,7 +593,7 @@ async def _do_list_objects(
)

prefixes.extend(page.get("prefixes", []))
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

while next_page_token is not None:
@@ -599,7 +611,7 @@

assert page["kind"] == "storage#objects"
prefixes.extend(page.get("prefixes", []))
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

items = [self._process_object(bucket, i) for i in items]
@@ -612,7 +624,7 @@ async def _list_buckets(self):
page = await self._call("GET", "b", project=self.project, json_out=True)

assert page["kind"] == "storage#buckets"
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

while next_page_token is not None:
@@ -625,7 +637,7 @@
)

assert page["kind"] == "storage#buckets"
items.extend(page.get("items", []))
items.extend(self._filter_ghost_items(page.get("items", [])))
next_page_token = page.get("nextPageToken", None)

buckets = [
@@ -1025,7 +1037,7 @@ async def _rm_files(self, paths):
f"{self._location}/batch/storage/v1",
headers={
"Content-Type": 'multipart/mixed; boundary="=========='
'=====7330845974216740156=="'
'=====7330845974216740156=="'
},
data=body + "\n--===============7330845974216740156==--",
)
@@ -1058,7 +1070,7 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
exs = await asyncio.gather(
*(
[
self._rm_files(files[i : i + batchsize])
self._rm_files(files[i: i + batchsize])
for i in range(0, len(files), batchsize)
]
),
@@ -1074,8 +1086,8 @@ async def _rm(self, path, recursive=False, maxdepth=None, batchsize=20):
ex
for ex in exs
if ex is not None
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
and "No such object" not in str(ex)
and not isinstance(ex, FileNotFoundError)
]
if exs:
raise exs[0]
@@ -1090,21 +1102,21 @@ async def _pipe_file(
metadata=None,
consistency=None,
content_type="application/octet-stream",
chunksize=50 * 2**20,
chunksize=50 * 2 ** 20,
):
# enforce blocksize should be a multiple of 2**18
consistency = consistency or self.consistency
bucket, key, generation = self.split_path(path)
size = len(data)
out = None
if size < 5 * 2**20:
if size < 5 * 2 ** 20:
location = await simple_upload(
self, bucket, key, data, metadata, consistency, content_type
)
else:
location = await initiate_upload(self, bucket, key, content_type, metadata)
for offset in range(0, len(data), chunksize):
bit = data[offset : offset + chunksize]
bit = data[offset: offset + chunksize]
out = await upload_chunk(
self, location, bit, offset, size, content_type
)
@@ -1123,7 +1135,7 @@ async def _put_file(
metadata=None,
consistency=None,
content_type="application/octet-stream",
chunksize=50 * 2**20,
chunksize=50 * 2 ** 20,
callback=None,
**kwargs,
):
@@ -1141,7 +1153,7 @@ async def _put_file(
f0.seek(0)
callback.set_size(size)

if size < 5 * 2**20:
if size < 5 * 2 ** 20:
await simple_upload(
self,
bucket,
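
As a quick sanity check, the new helper can be exercised directly on a hand-built page; this is only a sketch that assumes this branch is installed, with made-up object names and checksums:

from gcsfs import GCSFileSystem

# One real object and one zero-byte "ghost" placeholder.
items = [
    {"kind": "storage#object", "name": "data/part-0000.parquet",
     "size": "52428800", "crc32c": "yZRlqg=="},
    {"kind": "storage#object", "name": "data/",
     "size": "0", "crc32c": "AAAAAA=="},
]

# The static helper runs without touching a real bucket.
kept = GCSFileSystem._filter_ghost_items(items)
assert [item["name"] for item in kept] == ["data/part-0000.parquet"]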