Skip to content

Commit

Permalink
pandas module for Pandas integration
Browse files Browse the repository at this point in the history
1. to_pandas() and to_table() to convert between DH tables and DF
2. test cases showed some lingering issues with DH NULL handling
3. performance is potentially an issue that has yet to be addressed.
  • Loading branch information
jmao-denver committed Feb 1, 2022
1 parent 69d3560 commit aae3edd
Show file tree
Hide file tree
Showing 6 changed files with 385 additions and 22 deletions.
2 changes: 2 additions & 0 deletions pyintegration/deephaven2/column.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module defines the Column class that represents column definitions on Deephaven tables. """

from dataclasses import dataclass, field
from enum import Enum
from typing import Sequence, Any
Expand Down
1 change: 1 addition & 0 deletions pyintegration/deephaven2/csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class Inference(Enum):
""" Configured parsers: BOOL, DATETIME, CHAR, STRING, SECONDS.
"""


def _build_header(header: Dict[str, DType] = None):
if not header:
return None
Expand Down
50 changes: 30 additions & 20 deletions pyintegration/deephaven2/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
"""
from __future__ import annotations

from typing import Any, Sequence, Callable, Iterable
from typing import Iterable, Any, Sequence, Callable

import jpy
import numpy as np

from deephaven2 import DHError

Expand All @@ -34,15 +35,24 @@ def from_jtype(cls, j_class: Any) -> DType:
j_name = j_class.getName()
dtype = DType._j_name_type_map.get(j_name)
if not dtype:
return cls(j_name=j_name, j_type=j_class)
return cls(j_name=j_name, j_type=j_class, np_type=np.object_)
else:
return dtype

def __init__(self, j_name: str, j_type: Any = None, qst_type: Any = None, is_primitive: bool = False):
@classmethod
def from_np_dtype(cls, np_dtype: np.dtype) -> DType:
    """ Looks up the registered DType whose numpy type equals the given numpy dtype.

    DTypes whose numpy type is unset or is the generic np.object_ are never matched; when no other registered
    DType matches, the DType registered under 'org.jpy.PyObject' is returned.

    Args:
        np_dtype (np.dtype): the numpy dtype to look up

    Returns:
        the matching DType, or the PyObject DType as the fallback
    """
    for dtype in DType._j_name_type_map.values():
        np_type = dtype.np_type
        # np.dtype(None) evaluates to float64, so a DType registered without a numpy type must be skipped
        # explicitly; otherwise it could be returned for float64 input.
        if np_type is None or np_type == np.object_:
            continue
        if np.dtype(np_type) == np_dtype:
            return dtype

    return DType._j_name_type_map['org.jpy.PyObject']

def __init__(self, j_name: str, j_type: Any = None, qst_type: Any = None, is_primitive: bool = False, np_type=None):
self.j_name = j_name
self.j_type = j_type if j_type else jpy.get_type(j_name)
self.qst_type = qst_type if qst_type else _qst_custom_type(j_name)
self.is_primitive = is_primitive
self.np_type = np_type

DType._j_name_type_map[j_name] = self

Expand Down Expand Up @@ -104,40 +114,40 @@ class CharDType(DType):
""" The class for char type. """

def __init__(self):
super().__init__(j_name="char", qst_type=_JQstType.charType(), is_primitive=True)
super().__init__(j_name="char", qst_type=_JQstType.charType(), is_primitive=True, np_type=np.dtype('uint16'))

def array_from(self, seq: Sequence, remap: Callable[[Any], Any] = None):
if isinstance(seq, str) and not remap:
return super().array_from(seq, remap=ord)
return super().array_from(seq, remap)


bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType())
byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True)
bool_ = DType(j_name="java.lang.Boolean", qst_type=_JQstType.booleanType(), np_type=np.bool_)
byte = DType(j_name="byte", qst_type=_JQstType.byteType(), is_primitive=True, np_type=np.int8)
int8 = byte
short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True)
short = DType(j_name="short", qst_type=_JQstType.shortType(), is_primitive=True, np_type=np.int16)
int16 = short
char = CharDType()
int_ = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True)
int32 = int_
long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True)
int32 = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True, np_type=np.int32)
long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True, np_type=np.int64)
int64 = long
float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True)
int_ = long
float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True, np_type=np.float32)
single = float_
float32 = float_
double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True)
double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True, np_type=np.float64)
float64 = double
string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType())
BigDecimal = DType(j_name="java.math.BigDecimal")
StringSet = DType(j_name="io.deephaven.stringset.StringSet")
PyObject = DType(j_name="org.jpy.PyObject")
JObject = DType(j_name="java.lang.Object")
DateTime = DType(j_name="io.deephaven.time.DateTime")
Period = DType(j_name="io.deephaven.time.Period")
string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType(), np_type=np.object_)
BigDecimal = DType(j_name="java.math.BigDecimal", np_type=np.object_)
StringSet = DType(j_name="io.deephaven.stringset.StringSet", np_type=np.object_)
DateTime = DType(j_name="io.deephaven.time.DateTime", np_type=np.dtype("datetime64[ns]"))
Period = DType(j_name="io.deephaven.time.Period", np_type=np.object_)
PyObject = DType(j_name="org.jpy.PyObject", np_type=np.object_)
JObject = DType(j_name="java.lang.Object", np_type=np.object_)


def j_array_list(values: Iterable):
j_list = jpy.get_type("java.util.ArrayList")(len(values))
j_list = jpy.get_type("java.util.ArrayList")(len(list(values)))
try:
for v in values:
j_list.add(v)
Expand Down
208 changes: 208 additions & 0 deletions pyintegration/deephaven2/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" THis module supports the conversion between Deephaven tables and Pandas DataFrames. """
from typing import List, Callable, TypeVar, Dict

import jpy
import numpy as np
import pandas

from deephaven2 import DHError, dtypes, empty_table, new_table
from deephaven2.column import Column, InputColumn
from deephaven2.dtypes import DType
from deephaven2.table import Table

# Java-side helper class; its translateArray* methods are used in this module to convert DateTime and Boolean
# arrays to and from primitive long/byte arrays.
_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")
# Generic input/output types used to annotate the user-supplied remap callables (Callable[[T], R]).
T = TypeVar("T")
R = TypeVar("R")


# _JImmutableObjectArraySource = jpy.get_type(
# "io.deephaven.engine.table.impl.sources.immutable.ImmutableObjectArraySource")
# _JImmutableLongAsDateTimeColumnSource = jpy.get_type(
# "io.deephaven.engine.table.impl.sources.immutable.ImmutableLongAsDateTimeColumnSource")
# _JTableTools = jpy.get_type("io.deephaven.engine.util.TableTools")
#
#
#
# def _to_column_name(pd_name: str):
# tmp_name = re.sub("\W+", " ", str(pd_name)).strip()
# return re.sub("\s+", "_", tmp_name)
#
#

def _freeze_table(table: Table):
    """ Snapshots the given table against a zero-size empty table and returns the result.

    Used by to_pandas() so that a refreshing table is converted from a frozen copy.
    """
    # NOTE(review): the second argument is presumably the "do initial snapshot" flag - confirm against
    # Table.snapshot.
    trigger = empty_table(0)
    return trigger.snapshot(table, True)


def _column_to_numpy_array(table: Table, col_def: Column, j_array, remap: Callable) -> np.ndarray:
    """ Copies the data of one column, supplied as a Java array, into a numpy array.

    Args:
        table (Table): the table the column belongs to; its size determines the length of the result
        col_def (Column): the column definition; its data type selects the conversion path
        j_array: the column data as returned by the Java column's getDirect()
        remap (callable): if given, it is applied to every element and the mapped values are returned directly

    Returns:
        np.ndarray

    Raises:
        DHError
    """
    try:
        # TODO not happy with the current approach on Boolean
        #
        # To be discussed with Chip
        #   1. allow users to specify np.dtypes for specific columns to override the default mapping, might
        #      be overkill just for Boolean
        #   2. the remap works on individual values, could be slow, support vectorized versions?
        #   3. md-array ?
        if remap:
            return np.array([remap(value) for value in j_array])

        data_type = col_def.data_type
        if data_type in {dtypes.char, dtypes.string}:
            # char and string columns are handed to numpy as-is
            return np.array(j_array)

        result = np.empty(table.size, dtype=np.dtype(data_type.np_type))
        if data_type == dtypes.DateTime:
            # DateTime objects are translated to primitive longs before being copied
            source = _JPrimitiveArrayConversionUtility.translateArrayDateTimeToLong(j_array)
        elif data_type == dtypes.bool_:
            # boxed Booleans are translated to bytes before being copied
            source = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
        else:
            source = j_array
        result[:] = source
        return result
    except DHError:
        raise
    except Exception as e:
        raise DHError(e, f"failed to create a numpy array for the column {col_def.name}") from e


def _column_to_series(table: Table, col_def: Column, remap: Callable = None) -> pandas.Series:
    """ Produce a copy of the specified column as a pandas.Series object.

    Args:
        table (Table): the table
        col_def (Column): the column definition
        remap (callable): a user supplied function to map DH special NULL values, default is None

    Returns:
        pandas.Series

    Raises:
        DHError
    """
    try:
        data_col = table.j_table.getColumn(col_def.name)
        np_array = _column_to_numpy_array(table, col_def, data_col.getDirect(), remap=remap)
        # NOTE: DateTime columns are returned as-is; localizing them to UTC here has been considered but is
        # not done.
        return pandas.Series(data=np_array, copy=False)
    except DHError:
        raise
    except Exception as e:
        # Report the actual column name; the message was previously a non-f-string with a literal "{col}".
        raise DHError(e, message=f"failed to create a pandas Series for {col_def.name}") from e


def to_pandas(table: Table, cols: List[str] = None, remaps: Dict[str, Callable[[T], R]] = None) -> pandas.DataFrame:
    """ Produces a copy of a table object as a pandas.DataFrame.

    Note that the **entire table** is going to be cloned into memory, so the total number of entries in the table
    should be considered before blindly doing this. For large tables (millions of entries or more?), consider
    measures such as dropping unnecessary columns and/or down-selecting rows using the Deephaven query language
    **before** converting.

    A refreshing table is frozen prior to conversion. A table which updates mid-conversion would lead to errors
    or other undesirable behavior.

    Args:
        table (Table): the source table
        cols (List[str]): the source column names, default is None which means including all columns
        remaps (dict): a dict of column names and user supplied functions to map Deephaven special NULL values,
            default is None

    Returns:
        pandas.DataFrame

    Raises:
        DHError
    """
    try:
        if table.is_refreshing:
            table = _freeze_table(table)

        col_defs = {col_def.name: col_def for col_def in table.columns}
        if cols:
            missing = set(cols) - set(col_defs.keys())
            if missing:
                raise DHError(message=f"columns - {list(missing)} not found")
        else:
            cols = list(col_defs.keys())

        col_remaps = remaps if remaps else {}
        series_by_col = {
            name: _column_to_series(table, col_defs[name], remap=col_remaps.get(name, None))
            for name in cols
        }

        distinct_dtypes = {series.dtype for series in series_by_col.values()}
        if len(distinct_dtypes) != 1:
            return pandas.DataFrame(data=series_by_col, columns=cols, copy=False)

        # every column shares one dtype: build the frame from a single stacked 2-D array
        stacked = np.stack([series.array for series in series_by_col.values()], axis=1)
        return pandas.DataFrame(data=stacked, columns=cols, copy=False)
    except DHError:
        raise
    except Exception as e:
        raise DHError(e, "failed to create a Pandas DataFrame from table.") from e


def _make_input_column(col: str, np_array: np.ndarray, remap: Callable) -> InputColumn:
    """ Builds an InputColumn for a new Deephaven table from the values of one DataFrame column.

    Args:
        col (str): the column name
        np_array (np.ndarray): the column values
        remap (callable): a user supplied function applied to every value before conversion, None means the
            values are used unchanged

    Returns:
        InputColumn
    """
    # Apply the user supplied conversion first so that the Deephaven type is derived from the converted values.
    # Empty input is left untouched because rebuilding it from an empty list would lose its dtype.
    if remap and len(np_array):
        np_array = np.array([remap(value) for value in np_array])

    dtype = DType.from_np_dtype(np_array.dtype)
    if dtype == dtypes.bool_:
        # booleans go through a Java byte array before being translated to boxed Booleans
        j_bytes = dtypes.byte.array_from(np_array.astype(dtype=np.int8))
        input_data = _JPrimitiveArrayConversionUtility.translateArrayByteToBoolean(j_bytes)
    elif dtype == dtypes.DateTime:
        # datetimes are passed to Java as int64 values taken from the datetime64[ns] representation
        j_longs = jpy.array('long', np_array.astype('datetime64[ns]').astype('int64'))
        input_data = _JPrimitiveArrayConversionUtility.translateArrayLongToDateTime(j_longs)
    else:
        input_data = np_array

    return InputColumn(name=col, data_type=dtype, input_data=input_data)


def to_table(df: pandas.DataFrame, cols: List[str] = None, remaps: Dict[str, Callable[[T], R]] = None) -> Table:
    """ Creates a new Deephaven table from a pandas.DataFrame.

    Args:
        df (DataFrame): the pandas DataFrame instance
        cols (List[str]): the dataframe column names, default is None which means including all columns in the
            dataframe
        remaps (dict): a dict of column names and user supplied functions to convert values before storing them
            in the Deephaven table, default is None

    Returns:
        a Deephaven table

    Raises:
        DHError
    """
    try:
        if cols:
            missing = set(cols) - set(df)
            if missing:
                raise DHError(message=f"columns - {list(missing)} not found")
        else:
            cols = list(df)

        col_remaps = remaps if remaps else {}
        input_cols = [
            _make_input_column(name, df.get(name).values, remap=col_remaps.get(name, None))
            for name in cols
        ]
        return new_table(cols=input_cols)
    except DHError:
        raise
    except Exception as e:
        raise DHError(e, "failed to create a Deephaven Table from a Pandas DataFrame.") from e
3 changes: 1 addition & 2 deletions pyintegration/tests/test_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ def test_array_column(self):
]
)

test_table = test_table \
.group_by(["StringColumn"])
test_table = test_table.group_by(["StringColumn"])

self.assertIsNone(test_table.columns[0].component_type)
self.assertEqual(test_table.columns[1].component_type, dtypes.double)
Expand Down
Loading

0 comments on commit aae3edd

Please sign in to comment.