From 0fcf95ac615f7a6fd1038857f069d5c7614e1136 Mon Sep 17 00:00:00 2001 From: jianfengmao Date: Thu, 13 Jan 2022 14:59:53 -0700 Subject: [PATCH] pandas module for Pandas integration 1. to_pandas() and to_table() to convert between DH tables and DF 2. test cases showed some lingering issues with DH NULL handling 3. performance is potentially an issue that has yet to be addressed. --- pyintegration/deephaven2/column.py | 2 + pyintegration/deephaven2/csv.py | 1 + pyintegration/deephaven2/dtypes.py | 50 ++++--- pyintegration/deephaven2/pandas.py | 208 +++++++++++++++++++++++++++++ pyintegration/tests/test_column.py | 3 +- pyintegration/tests/test_pandas.py | 143 ++++++++++++++++++++ 6 files changed, 385 insertions(+), 22 deletions(-) create mode 100644 pyintegration/deephaven2/pandas.py create mode 100644 pyintegration/tests/test_pandas.py diff --git a/pyintegration/deephaven2/column.py b/pyintegration/deephaven2/column.py index 52211b495fb..d07915a3430 100644 --- a/pyintegration/deephaven2/column.py +++ b/pyintegration/deephaven2/column.py @@ -1,6 +1,8 @@ # # Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending # +""" This module defines the Column class that represents column definitions on Deephaven tables. """ + from dataclasses import dataclass, field from enum import Enum from typing import Sequence, Any diff --git a/pyintegration/deephaven2/csv.py b/pyintegration/deephaven2/csv.py index 44854e153ba..bca61eda5bf 100644 --- a/pyintegration/deephaven2/csv.py +++ b/pyintegration/deephaven2/csv.py @@ -41,6 +41,7 @@ class Inference(Enum): """ Configured parsers: BOOL, DATETIME, CHAR, STRING, SECONDS. 
""" + def _build_header(header: Dict[str, DType] = None): if not header: return None diff --git a/pyintegration/deephaven2/dtypes.py b/pyintegration/deephaven2/dtypes.py index e61e5d00819..eeefab1f0e9 100644 --- a/pyintegration/deephaven2/dtypes.py +++ b/pyintegration/deephaven2/dtypes.py @@ -7,9 +7,10 @@ """ from __future__ import annotations -from typing import Any, Sequence, Callable, Iterable +from typing import Iterable, Any, Sequence, Callable import jpy +import numpy as np from deephaven2 import DHError @@ -34,15 +35,24 @@ def from_jtype(cls, j_class: Any) -> DType: j_name = j_class.getName() dtype = DType._j_name_type_map.get(j_name) if not dtype: - return cls(j_name=j_name, j_type=j_class) + return cls(j_name=j_name, j_type=j_class, np_type=np.object_) else: return dtype - def __init__(self, j_name: str, j_type: Any = None, qst_type: Any = None, is_primitive: bool = False): + @classmethod + def from_np_dtype(cls, np_dtype: np.dtype): + for _, dtype in DType._j_name_type_map.items(): + if np.dtype(dtype.np_type) == np_dtype and dtype.np_type != np.object_: + return dtype + + return DType._j_name_type_map['org.jpy.PyObject'] + + def __init__(self, j_name: str, j_type: Any = None, qst_type: Any = None, is_primitive: bool = False, np_type=None): self.j_name = j_name self.j_type = j_type if j_type else jpy.get_type(j_name) self.qst_type = qst_type if qst_type else _qst_custom_type(j_name) self.is_primitive = is_primitive + self.np_type = np_type DType._j_name_type_map[j_name] = self @@ -104,7 +114,7 @@ class CharDType(DType): """ The class for char type. 
""" def __init__(self): - super().__init__(j_name="char", qst_type=_JQstType.charType(), is_primitive=True) + super().__init__(j_name="char", qst_type=_JQstType.charType(), is_primitive=True, np_type=np.dtype('uint16')) def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None): if isinstance(seq, str) and not remap: @@ -112,32 +122,32 @@ def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None): return super().array_from(seq, remap) -bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType()) -byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True) +bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType(), np_type=np.bool_) +byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True, np_type=np.int8) int8 = byte -short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True) +short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True, np_type=np.int16) int16 = short char = CharDType() -int_ = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True) -int32 = int_ -long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True) +int32 = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True, np_type=np.int32) +long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True, np_type=np.int64) int64 = long -float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True) +int_ = long +float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True, np_type=np.float32) single = float_ float32 = float_ -double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True) +double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True, np_type=np.float64) float64 = double -string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType()) -BigDecimal = DType(j_name="java.math.BigDecimal") -StringSet = 
DType(j_name="io.deephaven.stringset.StringSet") -PyObject = DType(j_name="org.jpy.PyObject") -JObject = DType(j_name="java.lang.Object") -DateTime = DType(j_name="io.deephaven.time.DateTime") -Period = DType(j_name="io.deephaven.time.Period") +string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType(), np_type=np.object_) +BigDecimal = DType(j_name="java.math.BigDecimal", np_type=np.object_) +StringSet = DType(j_name="io.deephaven.stringset.StringSet", np_type=np.object_) +DateTime = DType(j_name="io.deephaven.time.DateTime", np_type=np.dtype("datetime64[ns]")) +Period = DType(j_name="io.deephaven.time.Period", np_type=np.object_) +PyObject = DType(j_name="org.jpy.PyObject", np_type=np.object_) +JObject = DType(j_name="java.lang.Object", np_type=np.object_) def j_array_list(values: Iterable): - j_list = jpy.get_type("java.util.ArrayList")(len(values)) + j_list = jpy.get_type("java.util.ArrayList")(len(list(values))) try: for v in values: j_list.add(v) diff --git a/pyintegration/deephaven2/pandas.py b/pyintegration/deephaven2/pandas.py new file mode 100644 index 00000000000..09c021d5fff --- /dev/null +++ b/pyintegration/deephaven2/pandas.py @@ -0,0 +1,208 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +""" This module supports the conversion between Deephaven tables and Pandas DataFrames. 
""" +from typing import List, Callable, TypeVar, Dict + +import jpy +import numpy as np +import pandas + +from deephaven2 import DHError, dtypes, empty_table, new_table +from deephaven2.column import Column, InputColumn +from deephaven2.dtypes import DType +from deephaven2.table import Table + +_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility") +T = TypeVar("T") +R = TypeVar("R") + + +# _JImmutableObjectArraySource = jpy.get_type( +# "io.deephaven.engine.table.impl.sources.immutable.ImmutableObjectArraySource") +# _JImmutableLongAsDateTimeColumnSource = jpy.get_type( +# "io.deephaven.engine.table.impl.sources.immutable.ImmutableLongAsDateTimeColumnSource") +# _JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools") +# +# +# +# def _to_column_name(pd_name: str): +# tmp_name = re.sub("\W+", " ", str(pd_name)).strip() +# return re.sub("\s+", "_", tmp_name) +# +# + +def _freeze_table(table: Table): + return empty_table(0).snapshot(table, True) + + +def _column_to_numpy_array(table: Table, col_def: Column, j_array, remap: Callable) -> np.ndarray: + try: + # TODO not happy with the current approach on Boolean + # + # TO be discussed with Chip + # 1. allow users to specify np.dtypes for specific columns to override the default mapping, might + # be a overkill just for Boolean + # 2. the remap works on individual values, could be slow, support vectorized versions? + # 3. md-array ? 
+ + if remap: + mapped_list = [remap(x) for x in j_array] + return np.array(mapped_list) + + if col_def.data_type in {dtypes.char, dtypes.string}: + return np.array(j_array) + + np_array = np.empty(table.size, dtype=np.dtype(col_def.data_type.np_type)) + if col_def.data_type == dtypes.DateTime: + longs = _JPrimitiveArrayConversionUtility.translateArrayDateTimeToLong(j_array) + np_array[:] = longs + elif col_def.data_type == dtypes.bool_: + bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array) + np_array[:] = bytes_ + else: + np_array[:] = j_array + return np_array + except DHError: + raise + except Exception as e: + raise DHError(e, f"failed to create a numpy array for the column {col_def.name}") from e + + +def _column_to_series(table: Table, col_def: Column, remap: Callable = None) -> pandas.Series: + """ Produce a copy of the specified column as a pandas.Series object. + + Args: + table (Table): the table + col_def (Column): the column definition + remap (callable): a user supplied function to map DH special NULL values, default is None + + Returns: + pandas.Series + + Raises: + DHError + """ + try: + data_col = table.j_table.getColumn(col_def.name) + np_array = _column_to_numpy_array(table, col_def, data_col.getDirect(), remap=remap) + return pandas.Series(data=np_array, copy=False) + + # if col_def.data_type == dtypes.DateTime: + # # # NOTE: I think that we should localize to UTC, and then let the user convert that if they want to... 
+ # # # Note that localizing does not actually affect the underlying numpy array, + # # # but only a pandas construct on top + # # return pandas.Series(np_array).dt.tz_localize('UTC') # .dt.tz_convert(time.tzname[0]) + # else: + # return pandas.Series(data=np_array, copy=False) + except DHError: + raise + except Exception as e: + raise DHError(e, message="failed to pandas Series for {col}") from e + + +def to_pandas(table: Table, cols: List[str] = None, remaps: Dict[str, Callable[[T], R]] = None) -> pandas.DataFrame: + """ Produces a copy of a table object as a pandas.DataFrame. + + Note that the **entire table** is going to be cloned into memory, so the total number of entries in the table + should be considered before blindly doing this. For large tables (millions of entries or more?), consider + measures such as dropping unnecessary columns and/or down-selecting rows using the Deephaven query language + **before** converting. + + The table will be frozen prior to conversion. A table which updates mid-conversion would lead to errors + or other undesirable behavior. 
+ + + Args: + table (Table): the source table + cols (List[str]): the source column names, default is None which means including all columns + remaps (dict): a dict of column names and user supplied functions to map DeepHaven special NULL values, + default is None + + Returns: + pandas.DataFrame + + Raise: + DHError + """ + try: + if table.is_refreshing: + table = _freeze_table(table) + + col_def_dict = {col.name: col for col in table.columns} + if not cols: + cols = list(col_def_dict.keys()) + else: + diff_set = set(cols) - set(col_def_dict.keys()) + if diff_set: + raise DHError(message=f"columns - {list(diff_set)} not found") + + data = {} + for col in cols: + remap = remaps.get(col, None) if remaps else None + + series = _column_to_series(table, col_def_dict[col], remap=remap) + data[col] = series + + dtype_set = set([v.dtype for k, v in data.items()]) + if len(dtype_set) == 1: + return pandas.DataFrame(data=np.stack([v.array for k, v in data.items()], axis=1), + columns=cols, + copy=False) + else: + return pandas.DataFrame(data=data, columns=cols, copy=False) + except DHError: + raise + except Exception as e: + raise DHError(e, "failed to create a Pandas DataFrame from table.") from e + + +def _make_input_column(col: str, np_array: np.ndarray, remap: Callable) -> InputColumn: + dtype = DType.from_np_dtype(np_array.dtype) + if dtype == dtypes.bool_: + bytes_ = np_array.astype(dtype=np.int8) + j_bytes = dtypes.byte.array_from(bytes_) + np_array = _JPrimitiveArrayConversionUtility.translateArrayByteToBoolean(j_bytes) + + if dtype == dtypes.DateTime: + longs = jpy.array('long', np_array.astype('datetime64[ns]').astype('int64')) + np_array = _JPrimitiveArrayConversionUtility.translateArrayLongToDateTime(longs) + + return InputColumn(name=col, data_type=dtype, input_data=np_array) + + +def to_table(df: pandas.DataFrame, cols: List[str] = None, remaps: Dict[str, Callable[[T], R]] = None) -> Table: + """ Creates a new Deephaven table from a pandas.DataFrame. 
+ + Args: + df (DataFrame): the Pandas DataFrame instance + cols (List[str]): the dataframe column names, default is None which means including all columns in the dataframe + remaps (dict): a dict of column names and user supplied functions to convert values before storing them in the + Deephaven table, default is None + + Returns: + a Deephaven table + + Raise: + DHError + """ + + try: + if not cols: + cols = list(df) + else: + diff_set = set(cols) - set(list(df)) + if diff_set: + raise DHError(message=f"columns - {list(diff_set)} not found") + + input_cols = [] + for col in cols: + remap = remaps.get(col, None) if remaps else None + + input_cols.append(_make_input_column(col, df.get(col).values, remap=remap)) + + return new_table(cols=input_cols) + except DHError: + raise + except Exception as e: + raise DHError(e, "failed to create a Deephaven Table from a Pandas DataFrame.") from e diff --git a/pyintegration/tests/test_column.py b/pyintegration/tests/test_column.py index adc73f01626..9e1567cbf7b 100644 --- a/pyintegration/tests/test_column.py +++ b/pyintegration/tests/test_column.py @@ -69,8 +69,7 @@ def test_array_column(self): ] ) - test_table = test_table \ - .group_by(["StringColumn"]) + test_table = test_table.group_by(["StringColumn"]) self.assertIsNone(test_table.columns[0].component_type) self.assertEqual(test_table.columns[1].component_type, dtypes.double) diff --git a/pyintegration/tests/test_pandas.py b/pyintegration/tests/test_pandas.py new file mode 100644 index 00000000000..c67f7760ca6 --- /dev/null +++ b/pyintegration/tests/test_pandas.py @@ -0,0 +1,143 @@ +# +# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending +# +import unittest +from dataclasses import dataclass + +import jpy +import numpy +import numpy as np + +from deephaven2 import dtypes, new_table, DHError +from deephaven2.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \ + string_col, datetime_col, pyobj_col, jobj_col +from 
deephaven2.constants import NULL_LONG, NULL_BYTE, NULL_BOOLEAN +from deephaven2.pandas import to_pandas, to_table +from tests.testbase import BaseTestCase + +JArrayList = jpy.get_type("java.util.ArrayList") + + +@dataclass +class CustomClass: + f1: int + f2: str + + +def remap_long(v): + if v == NULL_LONG: + return numpy.NAN + else: + return v + + +class PandasTestCase(BaseTestCase): + def setUp(self): + jobj1 = JArrayList() + jobj1.add(1) + jobj1.add(-1) + jobj2 = JArrayList() + jobj2.add(2) + jobj2.add(-2) + input_cols = [ + bool_col(name="Boolean", data=[True, False]), + byte_col(name="Byte", data=(1, -1)), + char_col(name="Char", data='-1'), + short_col(name="Short", data=[1, -1]), + int_col(name="Int", data=[1, -1]), + long_col(name="Long", data=[1, NULL_LONG]), + long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)), + float_col(name="Float", data=[1.01, -1.01]), + double_col(name="Double", data=[1.01, -1.01]), + string_col(name="String", data=["foo", "bar"]), + datetime_col(name="Datetime", data=[dtypes.DateTime(1), dtypes.DateTime(-1)]), + pyobj_col(name="PyObj", data=[CustomClass(1, "1"), CustomClass(-1, "-1")]), + pyobj_col(name="PyObj1", data=[[1, 2, 3], CustomClass(-1, "-1")]), + pyobj_col(name="PyObj2", data=[False, 'False']), + jobj_col(name="JObj", data=[jobj1, jobj2]), + ] + self.test_table = new_table(cols=input_cols) + + def tearDown(self) -> None: + self.test_table = None + + def test_to_pandas(self): + df = to_pandas(self.test_table) + self.assertEqual(len(df.columns), len(self.test_table.columns)) + self.assertEqual(df.size, 2 * len(self.test_table.columns)) + df_series = [df[col] for col in list(df.columns)] + for i, col in enumerate(self.test_table.columns): + self.assertEqual(col.data_type.np_type, df_series[i].dtype) + + def test_to_pandas_remaps(self): + df = to_pandas(self.test_table, cols=["Boolean", "Long"], + remaps={"Long": lambda x: float('nan') if x == NULL_LONG else x}) + self.assertEqual(df['Boolean'].dtype, numpy.int8) + 
self.assertEqual(df['Long'].dtype, numpy.float64) + + df2 = to_pandas(self.test_table, cols=["Boolean", "Long"], remaps={"Long": remap_long}) + self.assertTrue(df.equals(df2)) + + def test_vector_column(self): + strings = ["Str1", "Str1", "Str2", "Str2", "Str2"] + doubles = [1.0, 2.0, 4.0, 8.0, 16.0] + test_table = new_table([ + string_col("String", strings), + double_col("Doubles", doubles) + ] + ) + + test_table = test_table.group_by(["String"]) + df = to_pandas(test_table, cols=["String", "Doubles"]) + self.assertEqual(df['String'].dtype, numpy.object_) + self.assertEqual(df['Doubles'].dtype, numpy.object_) + + double_series = df['Doubles'] + self.assertEqual([1.0, 2.0], list(double_series[0].toArray())) + self.assertEqual([4.0, 8.0, 16.0], list(double_series[1].toArray())) + + def test_invalid_col_name(self): + with self.assertRaises(DHError) as cm: + to_pandas(self.test_table, cols=["boolean", "Long"]) + + self.assertIn("boolean", str(cm.exception)) + + def test_to_table(self): + input_cols = [ + bool_col(name="Boolean", data=[True, False]), + byte_col(name="Byte", data=(1, -1)), + char_col(name="Char", data='-1'), + short_col(name="Short", data=[1, -1]), + int_col(name="Int", data=[1, -1]), + long_col(name="Long", data=[1, NULL_LONG]), + long_col(name="NPLong", data=np.array([1, -1], dtype=np.int8)), + float_col(name="Float", data=[1.01, -1.01]), + double_col(name="Double", data=[1.01, -1.01]), + ] + test_table = new_table(cols=input_cols) + df = to_pandas(test_table) + table_from_df = to_table(df) + self.assertEqual(table_from_df, test_table) + + def test_to_table_boolean_with_none(self): + input_cols = [bool_col(name="Boolean", data=[True, None])] + table_with_null_bool = new_table(cols=input_cols) + + df = to_pandas(table_with_null_bool, remaps={"Boolean": lambda x: NULL_BYTE if not x else x * 1}) + print(df.info()) + table_from_df = to_table(df, remaps={"Boolean": lambda x: None if x == NULL_BYTE else bool(x)}) + self.assertEqual(table_from_df, 
table_with_null_bool) + + def test_to_table_datetime_with_none(self): + input_cols = [datetime_col(name="Datetime", data=[dtypes.DateTime(1), None])] + table_with_null_dt = new_table(cols=input_cols) + + df = to_pandas(table_with_null_dt) + print(df) + print(df.info()) + table_from_df = to_table(df) + self.assertEqual(table_from_df, table_with_null_dt) + + +if __name__ == '__main__': + unittest.main()