Skip to content

Commit

Permalink
pandas module for Pandas integration, fixes 1826 (#1919)
Browse files Browse the repository at this point in the history
* pandas/numpy modules for Pandas/numpy integration
  • Loading branch information
jmao-denver authored Feb 24, 2022
1 parent c10ef21 commit 17d060a
Show file tree
Hide file tree
Showing 12 changed files with 657 additions and 29 deletions.
2 changes: 2 additions & 0 deletions pyintegration/deephaven2/column.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
#
""" This module implements the Column class and functions that work with Columns. """

from dataclasses import dataclass, field
from enum import Enum
from typing import Sequence
Expand Down
4 changes: 3 additions & 1 deletion pyintegration/deephaven2/constants.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#
# Copyright (c) 2016-2021 Deephaven Data Labs and Patent Pending
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" The module defines the global constants including Deephaven's special numerical values. Other constants are defined
at the individual module level because they are only locally applicable. """

from enum import Enum, auto
import jpy
Expand Down
5 changes: 4 additions & 1 deletion pyintegration/deephaven2/dherror.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class DHError(Exception):
"""

def __init__(self, cause=None, message=""):
super().__init__()
super().__init__(cause, message)
self._message = message
self._traceback = traceback.format_exc()

Expand All @@ -49,6 +49,9 @@ def __init__(self, cause=None, message=""):
if for_compact_tb:
self._compact_tb.append(tb_ln)

if not self._root_cause:
self._root_cause = self._message

@property
def root_cause(self):
""" The root cause of the exception. """
Expand Down
12 changes: 6 additions & 6 deletions pyintegration/deephaven2/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ def __call__(self, *args, **kwargs):
int32 = DType(j_name="int", qst_type=_JQstType.intType(), is_primitive=True, np_type=np.int32)
long = DType(j_name="long", qst_type=_JQstType.longType(), is_primitive=True, np_type=np.int64)
int64 = long
int_ = int32
float_ = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True, np_type=np.float32)
single = float_
float32 = float_
double = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True, np_type=np.float64)
float64 = double
int_ = long
float32 = DType(j_name="float", qst_type=_JQstType.floatType(), is_primitive=True, np_type=np.float32)
single = float32
float64 = DType(j_name="double", qst_type=_JQstType.doubleType(), is_primitive=True, np_type=np.float64)
double = float64
float_ = float64
string = DType(j_name="java.lang.String", qst_type=_JQstType.stringType())
BigDecimal = DType(j_name="java.math.BigDecimal")
StringSet = DType(j_name="io.deephaven.stringset.StringSet")
Expand Down
179 changes: 179 additions & 0 deletions pyintegration/deephaven2/numpy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module supports the conversion between Deephaven tables and numpy arrays. """
from typing import List

import jpy
import numpy as np
import re

from deephaven2 import DHError, dtypes, empty_table, new_table
from deephaven2.column import Column, InputColumn
from deephaven2.dtypes import DType
from deephaven2.table import Table

_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")


def freeze_table(table: Table) -> Table:
""" Returns a static snapshot of the source ticking table.
Args:
table (Table): the source table
Returns:
a new table
"""
return empty_table(0).snapshot(table, True)


def _to_column_name(name: str) -> str:
""" Transforms the given name string into a valid table column name. """
tmp_name = re.sub("\W+", " ", str(name)).strip()
return re.sub("\s+", "_", tmp_name)


def _column_to_numpy_array(col_def: Column, j_array: jpy.JType) -> np.ndarray:
""" Produces a numpy array from the given Java array and the Table column definition. """
try:
if col_def.data_type in {dtypes.char, dtypes.string}:
return np.array(j_array)

if col_def.data_type == dtypes.DateTime:
longs = _JPrimitiveArrayConversionUtility.translateArrayDateTimeToLong(j_array)
np_long_array = np.frombuffer(longs, np.int64)
np_array = np_long_array.view(col_def.data_type.np_type)
np_array[:] = np_long_array
elif col_def.data_type == dtypes.bool_:
bytes_ = _JPrimitiveArrayConversionUtility.translateArrayBooleanToByte(j_array)
np_array = np.frombuffer(bytes_, col_def.data_type.np_type)
elif col_def.data_type.np_type is not np.object_:
try:
np_array = np.frombuffer(j_array, col_def.data_type.np_type)
except:
np_array = np.array(j_array, np.object_)
else:
np_array = np.array(j_array, np.object_)

return np_array
except DHError:
raise
except Exception as e:
raise DHError(e, f"failed to create a numpy array for the column {col_def.name}") from e


def _columns_to_2d_numpy_array(col_def: Column, j_arrays: List[jpy.JType]) -> np.ndarray:
""" Produces a 2d numpy array from the given Java arrays of the same component type and the Table column
definition """
try:
if col_def.data_type.is_primitive:
np_array = np.empty(shape=(len(j_arrays[0]), len(j_arrays)), dtype=col_def.data_type.np_type)
for i, j_array in enumerate(j_arrays):
np_array[:, i] = np.frombuffer(j_array, col_def.data_type.np_type)
return np_array
else:
np_arrays = []
for j_array in j_arrays:
np_arrays.append(_column_to_numpy_array(col_def=col_def, j_array=j_array))
return np.stack(np_arrays, axis=1)
except DHError:
raise
except Exception as e:
raise DHError(e, f"failed to create a numpy array for the column {col_def.name}") from e


def _make_input_column(col: str, np_array: np.ndarray) -> InputColumn:
""" Creates a InputColumn with the given column name and the numpy array. """
dtype = dtypes.from_np_dtype(np_array.dtype)
if dtype == dtypes.bool_:
bytes_ = np_array.astype(dtype=np.int8)
j_bytes = dtypes.array(dtypes.byte, bytes_)
np_array = _JPrimitiveArrayConversionUtility.translateArrayByteToBoolean(j_bytes)

if dtype == dtypes.DateTime:
longs = jpy.array('long', np_array.astype('datetime64[ns]').astype('int64'))
np_array = _JPrimitiveArrayConversionUtility.translateArrayLongToDateTime(longs)

return InputColumn(name=_to_column_name(col), data_type=dtype, input_data=np_array)


def to_numpy(table: Table, cols: List[str] = None) -> np.ndarray:
""" Produces a numpy array from a table.
Note that the **entire table** is going to be cloned into memory, so the total number of entries in the table
should be considered before blindly doing this. For large tables, consider using the Deephaven query language to
select a subset of the table **before** using this method.
Args:
table (Table): the source table
cols (List[str]): the source column names, default is None which means include all columns
Returns:
a numpy ndarray
Raise:
DHError
"""

try:
if table.is_refreshing:
table = freeze_table(table)

col_def_dict = {col.name: col for col in table.columns}
if not cols:
cols = list(col_def_dict.keys())
else:
diff_set = set(cols) - set(col_def_dict.keys())
if diff_set:
raise DHError(message=f"columns - {list(diff_set)} not found")

col_defs = [col_def_dict[col] for col in cols]
if len(set([col_def.data_type for col_def in col_defs])) != 1:
raise DHError(message="columns must be of the same data type.")

j_arrays = []
for col_def in col_defs:
data_col = table.j_table.getColumn(col_def.name)
j_arrays.append(data_col.getDirect())
return _columns_to_2d_numpy_array(col_defs[0], j_arrays)
except DHError:
raise
except Exception as e:
raise DHError(e, "failed to create a Numpy array from the table column.") from e


def to_table(np_array: np.ndarray, cols: List[str]) -> Table:
""" Creates a new table from a numpy array.
Args:
np_array (np.ndarray): the numpy array
cols (List[str]): the table column names that will be assigned to each column in the numpy array
Returns:
a Deephaven table
Raise:
DHError
"""

try:
_, *dims = np_array.shape
if dims:
if not cols or len(cols) != dims[0]:
raise DHError(
message=f"the number of array columns {dims[0]} doesn't match "
f"the number of column names {len(cols)}")

input_cols = []
if len(cols) == 1:
input_cols.append(_make_input_column(cols[0], np_array))
else:
for i, col in enumerate(cols):
input_cols.append(_make_input_column(col, np_array[:, [i]]))

return new_table(cols=input_cols)
except DHError:
raise
except Exception as e:
raise DHError(e, "failed to create a Deephaven Table from a Pandas DataFrame.") from e
121 changes: 121 additions & 0 deletions pyintegration/deephaven2/pandas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" This module supports the conversion between Deephaven tables and Pandas DataFrames. """
import re
from typing import List

import jpy
import numpy as np
import pandas

from deephaven2 import DHError, new_table, dtypes
from deephaven2.column import Column
from deephaven2.numpy import _column_to_numpy_array, freeze_table, _make_input_column
from deephaven2.table import Table

_JPrimitiveArrayConversionUtility = jpy.get_type("io.deephaven.integrations.common.PrimitiveArrayConversionUtility")


def _column_to_series(table: Table, col_def: Column) -> pandas.Series:
""" Produce a copy of the specified column as a pandas.Series object.
Args:
table (Table): the table
col_def (Column): the column definition
Returns:
pandas.Series
Raises:
DHError
"""
try:
data_col = table.j_table.getColumn(col_def.name)
np_array = _column_to_numpy_array(col_def, data_col.getDirect())

return pandas.Series(data=np_array, copy=False)
except DHError:
raise
except Exception as e:
raise DHError(e, message="failed to create apandas Series for {col}") from e


def to_pandas(table: Table, cols: List[str] = None) -> pandas.DataFrame:
""" Produces a pandas.DataFrame from a table.
Note that the **entire table** is going to be cloned into memory, so the total number of entries in the table
should be considered before blindly doing this. For large tables, consider using the Deephaven query language to
select a subset of the table **before** using this method.
Args:
table (Table): the source table
cols (List[str]): the source column names, default is None which means include all columns
Returns:
pandas.DataFrame
Raise:
DHError
"""
try:
if table.is_refreshing:
table = freeze_table(table)

col_def_dict = {col.name: col for col in table.columns}
if not cols:
cols = list(col_def_dict.keys())
else:
diff_set = set(cols) - set(col_def_dict.keys())
if diff_set:
raise DHError(message=f"columns - {list(diff_set)} not found")

data = {}
for col in cols:
series = _column_to_series(table, col_def_dict[col])
data[col] = series

dtype_set = set([v.dtype for k, v in data.items()])
if len(dtype_set) == 1:
return pandas.DataFrame(data=np.stack([v.array for k, v in data.items()], axis=1),
columns=cols,
copy=False)
else:
return pandas.DataFrame(data=data, columns=cols, copy=False)
except DHError:
raise
except Exception as e:
raise DHError(e, "failed to create a Pandas DataFrame from table.") from e


def to_table(df: pandas.DataFrame, cols: List[str] = None) -> Table:
""" Creates a new table from a pandas.DataFrame.
Args:
df (DataFrame): the Pandas DataFrame instance
cols (List[str]): the dataframe column names, default is None which means including all columns in the dataframe
Returns:
a Deephaven table
Raise:
DHError
"""

try:
if not cols:
cols = list(df)
else:
diff_set = set(cols) - set(list(df))
if diff_set:
raise DHError(message=f"columns - {list(diff_set)} not found")

input_cols = []
for col in cols:
input_cols.append(_make_input_column(col, df.get(col).values))

return new_table(cols=input_cols)
except DHError:
raise
except Exception as e:
raise DHError(e, "failed to create a Deephaven Table from a Pandas DataFrame.") from e
12 changes: 3 additions & 9 deletions pyintegration/tests/test_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
import unittest
from dataclasses import dataclass

import jpy

from deephaven2 import DHError, dtypes, new_table
from deephaven2._jcompat import j_array_list
from deephaven2.column import byte_col, char_col, short_col, bool_col, int_col, long_col, float_col, double_col, \
string_col, datetime_col, jobj_col, ColumnType
from tests.testbase import BaseTestCase

JArrayList = jpy.get_type("java.util.ArrayList")


class ColumnTestCase(BaseTestCase):

Expand All @@ -22,9 +19,7 @@ def test_column_type(self):
self.assertEqual(ColumnType.NORMAL, ColumnType(normal_type))

def test_column_error(self):
jobj = JArrayList()
jobj.add(1)
jobj.add(-1)
jobj = j_array_list([1, -1])
with self.assertRaises(DHError) as cm:
bool_input_col = bool_col(name="Boolean", data=[True, 'abc'])

Expand Down Expand Up @@ -69,8 +64,7 @@ def test_array_column(self):
]
)

test_table = test_table \
.group_by(["StringColumn"])
test_table = test_table.group_by(["StringColumn"])

self.assertIsNone(test_table.columns[0].component_type)
self.assertEqual(test_table.columns[1].component_type, dtypes.double)
Expand Down
Loading

0 comments on commit 17d060a

Please sign in to comment.