Skip to content

Commit

Permalink
Avoid double serialize when proxying values (#117)
Browse files Browse the repository at this point in the history
Proxied objects end up being serialized twice: first when the object is
serialized with Colmena to check the size of the serialized object and
then again by ProxyStore when the object is proxied.

ProxyStore supports passing custom serializer/deserializer functions so
this commit adds a shim serializer and a deserializer wrapper to
minimize serialization overheads when an object gets proxied.

Note that there is still one extra step when using Colmena's pickle
serialization method and ProxyStore. SerializationMethod.serialize
will pickle the object producing a byte string then convert those bytes
to a hex string then the shims convert that hex string back to bytes.
I.e., there's this intermediate string representation that's created
then discarded in this path. I don't really see a way of avoiding this
because its an artifact of the different serialization types between
Colmena and ProxyStore (str vs bytes).
  • Loading branch information
gpauloski committed Dec 13, 2023
1 parent fd940da commit d3df232
Showing 1 changed file with 76 additions and 1 deletion.
77 changes: 76 additions & 1 deletion colmena/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from functools import partial
from io import StringIO
from pathlib import Path
from subprocess import run
Expand Down Expand Up @@ -68,6 +69,62 @@ def deserialize(method: 'SerializationMethod', message: str) -> Any:
raise NotImplementedError(f'Method {method} not yet implemented')


def _serialized_str_to_bytes_shim(
s: str,
method: Union[str, SerializationMethod],
) -> bytes:
"""Shim between Colmena serialized objects and bytes.
Colmena's serialization mechanisms produce strings but ProxyStore
serializes to bytes, so this shim takes a an object serialized by Colmena
and converts it to bytes.
Args:
s: Serialized string object
method: Serialization method used to produce s
Returns:
bytes representation of s
"""
if method == "json":
return s.encode('utf-8')
elif method == "pickle":
# In this case the conversion goes from obj > bytes > str > bytes
# which results in an unecessary conversion to a string but this is
# an unavoidable side effect of converting between the Colmena
# and ProxyStore serialization formats.
return bytes.fromhex(s)
else:
raise NotImplementedError(f'Method {method} not yet implemented')


def _serialized_bytes_to_obj_wrapper(
b: str,
method: Union[str, SerializationMethod],
) -> Any:
"""Wrapper which converts bytes to strings before deserializing.
Args:
b: Byte string of serialized object
method: Serialization method used to produce b
Returns:
Deserialized object
"""
if method == "json":
s = b.decode('utf-8')
elif method == "pickle":
# In this case the conversion goes from bytes > str > bytes > obj
# which results in an unecessary conversion to a string but this is
# an unavoidable side effect of converting between the Colmena
# and ProxyStore serialization formats.
s = b.hex()
else:
raise NotImplementedError(f'Method {method} not yet implemented')

return SerializationMethod.serialize(method, s)


class FailureInformation(BaseModel):
"""Stores information about a task failure"""

Expand Down Expand Up @@ -306,7 +363,25 @@ def _serialize_and_proxy(value, evict=False) -> Tuple[str, int]:
not isinstance(value, Proxy) and
value_size >= self.proxystore_threshold
):
value_proxy = store.proxy(value, evict=evict)
# Override ProxyStore's default serialization with these shims
# to Colmena's serialization mechanisms. This avoids value
# being serialized twice: once to get the size of the
# serialized object and once by proxy().
deserializer = partial(
_serialized_bytes_to_obj_wrapper,
method=self.serialization_method,
)
serializer = partial(
_serialized_str_to_bytes_shim,
method=self.serialization_method,
)

value_proxy = store.proxy(
value_str,
evict=evict,
deserializer=deserializer,
serializer=serializer,
)
logger.debug(f'Proxied object of type {type(value)} with id={id(value)}')
proxies.append(value_proxy)

Expand Down

0 comments on commit d3df232

Please sign in to comment.