diff --git a/.github/workflows/python.yaml b/.github/workflows/python.yaml index 994fb4f09..205fd96f4 100644 --- a/.github/workflows/python.yaml +++ b/.github/workflows/python.yaml @@ -119,10 +119,7 @@ jobs: if: ${{ matrix.module != 'gpu' }} run: | pip install numpy scipy cython coverage flaky - - if [[ "$PYTHON" == '3.9' ]]; then - conda install numba - fi + if [[ "$MODULE" == "xorbits" ]]; then pip install openpyxl fi diff --git a/python/xorbits/_mars/dataframe/base/__init__.py b/python/xorbits/_mars/dataframe/base/__init__.py index 50d9702e9..7ecd5fa5a 100644 --- a/python/xorbits/_mars/dataframe/base/__init__.py +++ b/python/xorbits/_mars/dataframe/base/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. from .apply import df_apply, series_apply +from .applymap import df_applymap from .astype import astype, index_astype from .cartesian_chunk import cartesian_chunk from .check_monotonic import ( @@ -66,6 +67,7 @@ def _install(): setattr(t, "rechunk", rechunk) setattr(t, "describe", describe) setattr(t, "apply", df_apply) + setattr(t, "applymap", df_applymap) setattr(t, "transform", df_transform) setattr(t, "isin", df_isin) setattr(t, "shift", shift) diff --git a/python/xorbits/_mars/dataframe/base/applymap.py b/python/xorbits/_mars/dataframe/base/applymap.py new file mode 100644 index 000000000..f13de3db8 --- /dev/null +++ b/python/xorbits/_mars/dataframe/base/applymap.py @@ -0,0 +1,242 @@ +# Copyright 2022-2023 XProbe Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Callable, Union + +import cloudpickle +import numpy as np +import pandas as pd + +from ... import opcodes +from ...core import OutputType +from ...core.context import Context +from ...core.custom_log import redirect_custom_log +from ...serialization.serializables import AnyField, DictField, StringField +from ...utils import enter_current_session, get_func_token, quiet_stdio +from ..operands import DataFrameOperand, DataFrameOperandMixin +from ..utils import build_df, build_empty_df, parse_index + + +class DataFrameApplymap(DataFrameOperand, DataFrameOperandMixin): + _op_type_ = opcodes.APPLYMAP + + func = AnyField("func") + na_action = StringField("na_action", default=None) + func_token = StringField("func_token", default=None) + kwds = DictField("kwds", default=None) + + def __init__(self, **kw): + super().__init__(**kw) + + def _load_func(self): + if isinstance(self.func, bytes): + return cloudpickle.loads(self.func) + else: + return self.func + + @classmethod + @redirect_custom_log + @enter_current_session + def execute(cls, ctx: Union[dict, Context], op: "DataFrameApplymap"): + func = op._load_func() + input_data = ctx[op.inputs[0].key] + out = op.outputs[0] + if len(input_data) == 0: + ctx[out.key] = build_empty_df(out.dtypes) + + result = input_data.applymap( + func, + na_action=op.na_action, + **op.kwds, + ) + ctx[out.key] = result + + @classmethod + def tile(cls, op: "DataFrameApplymap"): + in_df = op.inputs[0] + out_df = op.outputs[0] + chunks = [] + for c in in_df.chunks: + new_shape = c.shape + + new_index_value, new_columns_value = c.index_value, c.columns_value + + new_dtypes = out_df.dtypes + + new_op = op.copy().reset_key() + new_op.tileable_op_key = op.key + chunks.append( + new_op.new_chunk( + [c], + shape=tuple(new_shape), + index=c.index, + dtypes=new_dtypes, + index_value=new_index_value, + columns_value=new_columns_value, + ) + ) + new_nsplits = list(in_df.nsplits) + new_op = op.copy() + kw = out_df.params.copy() + new_nsplits = tuple(new_nsplits) + kw.update(dict(chunks=chunks, nsplits=new_nsplits)) + return new_op.new_tileables(op.inputs, **kw) + + def _infer_df_func_returns(self, df: pd.DataFrame): + func = self._load_func() + if isinstance(func, np.ufunc): + output_type = OutputType.dataframe + new_dtypes = None + index_value = "inherit" + else: + output_type = new_dtypes = index_value = None + + try: + empty_df = build_df(df, size=2) + with np.errstate(all="ignore"), quiet_stdio(): + infer_df = empty_df.applymap( + func, + na_action=self.na_action, + **self.kwds, + ) + if index_value is None: + if infer_df.index is empty_df.index: + index_value = "inherit" + else: # pragma: no cover + index_value = parse_index(pd.RangeIndex(-1)) + + output_type = output_type or OutputType.dataframe + new_dtypes = new_dtypes or infer_df.dtypes + except: # noqa: E722 # nosec + pass + + self.output_types = ( + [output_type] if not self.output_types else self.output_types + ) + dtypes = new_dtypes + return dtypes, index_value + + def __call__(self, df: pd.DataFrame, skip_infer: bool = False): + self.func_token = get_func_token(self.func) + if skip_infer: + self.func = cloudpickle.dumps(self.func) + return self.new_dataframe([df]) + + # for backward compatibility + dtypes, index_value = self._infer_df_func_returns(df) + + if index_value is None: + index_value = parse_index(None, (df.key, df.index_value.key)) + for arg, desc in zip((self.output_types, dtypes), ("output_types", "dtypes")): + if arg is None: + raise TypeError( + f"Cannot determine {desc} by calculating with enumerate data, " + "please specify it as arguments" + ) + + if index_value == "inherit": + index_value = df.index_value + + shape = df.shape + + self.func = cloudpickle.dumps(self.func) + + return self.new_dataframe( + [df], + shape=shape, + dtypes=dtypes, + index_value=index_value, + columns_value=parse_index(dtypes.index, store_data=True), + ) + + +def df_applymap( + df: pd.DataFrame, + func: Callable, + na_action: str = None, + skip_infer: bool = False, + **kwds, +): + """ + Apply a function to a Dataframe elementwise. + + This method applies a function that accepts and returns a scalar + to every element of a DataFrame. + + Parameters + ---------- + func : callable + Python function, returns a single value from a single value. + + na_action : {None, 'ignore'}, default None + If 'ignore', propagate NaN values, without passing them to func. + + skip_infer: bool, default False + Whether infer dtypes when dtypes or output_type is not specified. + + **kwds + Additional keyword arguments to pass as keywords arguments to + `func`. + + Returns + ------- + DataFrame + Transformed DataFrame. + + See Also + -------- + DataFrame.apply : Apply a function along input axis of DataFrame. + + Examples + -------- + >>> df = pd.DataFrame([[1, 2.12], [3.356, 4.567]]) + >>> df + 0 1 + 0 1.000 2.120 + 1 3.356 4.567 + + >>> df.applymap(lambda x: len(str(x))) + 0 1 + 0 3 4 + 1 5 5 + + Like Series.map, NA values can be ignored: + + >>> df_copy = df.copy() + >>> df_copy.iloc[0, 0] = pd.NA + >>> df_copy.applymap(lambda x: len(str(x)), na_action='ignore') + 0 1 + 0 NaN 4 + 1 5.0 5 + + Note that a vectorized version of `func` often exists, which will + be much faster. You could square each number elementwise. + + >>> df.applymap(lambda x: x**2) + 0 1 + 0 1.000000 4.494400 + 1 11.262736 20.857489 + + But it's better to avoid applymap in that case. + + >>> df ** 2 + 0 1 + 0 1.000000 4.494400 + 1 11.262736 20.857489 + """ + if na_action not in {"ignore", None}: + raise ValueError(f"na_action must be 'ignore' or None. Got {repr(na_action)}") + + op = DataFrameApplymap(func=func, na_action=na_action, kwds=kwds) + return op(df, skip_infer=skip_infer) diff --git a/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py b/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py index 3c3e1e82f..90352bab7 100644 --- a/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py +++ b/python/xorbits/_mars/dataframe/base/tests/test_base_execution.py @@ -542,6 +542,150 @@ def test_apply_with_arrow_dtype_execution(setup): pd.testing.assert_series_equal(result, expected) +def test_data_frame_applymap_execute(setup): + # TODO: support GPU for appltmap operation + # test one chunk + df_raw = pd.DataFrame([[1, 2.12], [3.356, 4.567]]) + df = from_pandas_df(df_raw, chunk_size=4) + + r = df.applymap(lambda x: len(str(x))) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: len(str(x))) + pd.testing.assert_frame_equal(result, expected) + + r = df.applymap(np.sqrt) + result = r.execute().fetch() + expected = df_raw.applymap(np.sqrt) + pd.testing.assert_frame_equal(result, expected) + + # test multiple chunks + df_raw = pd.DataFrame(np.random.rand(10, 2)) + df = from_pandas_df(df_raw, chunk_size=3) + + r = df.applymap(lambda x: len(str(x))) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: len(str(x))) + pd.testing.assert_frame_equal(result, expected) + + r = df.applymap(np.square) + result = r.execute().fetch() + expected = df_raw.applymap(np.square) + pd.testing.assert_frame_equal(result, expected) + + r = df.applymap(np.sqrt, na_action="ignore", skip_infer=True) + result = r.execute().fetch() + excepted = df_raw.applymap(np.sqrt, na_action="ignore") + pd.testing.assert_frame_equal(result, excepted) + + df_raw = pd.DataFrame([[1.0, 2.0], [3.0, 4.0]]) + df = from_pandas_df(df_raw, chunk_size=2) + + r = df.applymap(lambda x: int(x)) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: int(x)) + pd.testing.assert_frame_equal(result, expected) + + r = df.applymap(lambda x: int(x) ** 2 if x > 2 else x * 2) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: int(x) ** 2 if x > 2 else x * 2) + pd.testing.assert_frame_equal(result, expected) + + # test with strings and numbers + df_raw = pd.DataFrame([["a", 1], ["b", 2]]) + df = from_pandas_df(df_raw, chunk_size=2) + + r = df.applymap(lambda x: str(x)) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: str(x)) + pd.testing.assert_frame_equal(result, expected) + + # test with timestamp + df_raw = pd.DataFrame([[pd.Timestamp.now(), 2], [pd.Timestamp.now(), 3]]) + df = from_pandas_df(df_raw, chunk_size=2) + + r = df.applymap(lambda x: str(x)) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: str(x)) + pd.testing.assert_frame_equal(result, expected) + + # test na_action with different data types + df_raw = pd.DataFrame( + [[pd.NA, 2.12], [3.356, "hello"], [pd.Timestamp.now(), 4.567]] + ) + df = from_pandas_df(df_raw, chunk_size=3) + + r = df.applymap(lambda x: len(str(x)), na_action="ignore", skip_infer=True) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: len(str(x)), na_action="ignore") + pd.testing.assert_frame_equal(result, expected) + + r = df.applymap(lambda x: str(x), na_action="ignore", skip_infer=True) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: str(x), na_action="ignore") + pd.testing.assert_frame_equal(result, expected) + + # test na_action with sqrt on int + df_raw = pd.DataFrame([[pd.NA, 2], [3, 4]]) + df = from_pandas_df(df_raw, chunk_size=2) + + r = df.applymap(np.sqrt, na_action="ignore", skip_infer=True) + result = r.execute().fetch() + expected = df_raw.applymap(np.sqrt, na_action="ignore") + pd.testing.assert_frame_equal(result, expected) + + # test skip_infer error + with pytest.raises(TypeError): + df.applymap(np.sqrt, na_action="ignore").execute() + + # test custom function + def custom_func(x): + if isinstance(x, float): + return int(x) + elif isinstance(x, int): + return float(x) ** 2 + elif isinstance(x, str): + return len(x) + elif isinstance(x, pd.Timestamp): + return x.year + else: + return x + + df_raw = pd.DataFrame([[1.0, "hello"], [3, pd.Timestamp.now()]]) + df = from_pandas_df(df_raw, chunk_size=2) + + r = df.applymap(custom_func) + result = r.execute().fetch() + expected = df_raw.applymap(custom_func) + pd.testing.assert_frame_equal(result, expected) + + # test na_action error + with pytest.raises(ValueError): + df.applymap(lambda x: len(str(x)), na_action="unknown") + + # test empty dataframe + df_raw = pd.DataFrame() + df = from_pandas_df(df_raw, chunk_size=2) + + r = df.applymap(lambda x: len(str(x))) + result = r.execute().fetch() + expected = df_raw.applymap(lambda x: len(str(x))) + pd.testing.assert_frame_equal(result, expected) + + # test func kwargs input + def kw_func(x, dete=False): + if dete: + return x**2 + else: + return x + 1 + + df_raw = pd.DataFrame([[1.0, 2.0], [3.0, 4.0]]) + df = from_pandas_df(df_raw, chunk_size=2) + r = df.applymap(kw_func, dete=True) + result = r.execute().fetch() + expected = df_raw.applymap(kw_func, dete=True) + pd.testing.assert_frame_equal(result, expected) + + def test_transform_execute(setup): cols = [chr(ord("A") + i) for i in range(10)] df_raw = pd.DataFrame(dict((c, [i**2 for i in range(20)]) for c in cols)) diff --git a/python/xorbits/_mars/opcodes.py b/python/xorbits/_mars/opcodes.py index e113a5a8c..80f0f4e95 100644 --- a/python/xorbits/_mars/opcodes.py +++ b/python/xorbits/_mars/opcodes.py @@ -387,6 +387,7 @@ DUPLICATED = 739 DELETE = 740 ALIGN = 741 +APPLYMAP = 742 FUSE = 801