Skip to content

Commit

Permalink
Migrate the learn module to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
jmao-denver committed Feb 22, 2022
1 parent 421b225 commit 2256ab7
Show file tree
Hide file tree
Showing 3 changed files with 438 additions and 0 deletions.
175 changes: 175 additions & 0 deletions pyintegration/deephaven2/learn/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#
# Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
#
""" Deephaven's learn module provides utilities for efficient data transfer between Deephaven tables and Python objects,
as well as a framework for using popular machine-learning / deep-learning libraries with Deephaven tables.
"""
from typing import List, Union, Callable, Type

import jpy

from deephaven2 import DHError
from deephaven2.table import Table

_JLearnInput = jpy.get_type("io.deephaven.integrations.learn.Input")
_JLearnOutput = jpy.get_type("io.deephaven.integrations.learn.Output")
_JLearnComputer = jpy.get_type("io.deephaven.integrations.learn.Computer")
_JLearnScatterer = jpy.get_type("io.deephaven.integrations.learn.Scatterer")


class Input:
    """ Input specifies how to gather data from a Deephaven table into an object. """

    def __init__(self, col_names: Union[str, List[str]], gather_func: Callable):
        """ Creates an Input backed by the Java learn Input helper.

        Args:
            col_names (str|list) : column name or list of column names from which to gather input.
            gather_func (Callable): function that determines how input gets transformed into an object.
        """
        # The Java helper owns both the column selection and the gather function.
        self.input = _JLearnInput(col_names, gather_func)

    def __str__(self):
        """ Returns the string representation of the underlying Java Input object. """
        return self.input.toString()


class Output:
    """ Output specifies how to scatter data from an object into a table column. """

    def __init__(self, col_name: str, scatter_func: Callable, col_type: Type):
        """ Creates an Output backed by the Java learn Output helper.

        Args:
            col_name (str) : name of the new column that will store results.
            scatter_func (Callable): function that determines how data is taken from an object and placed into a
                Deephaven table column.
            col_type (Type) : desired data type of the new output column.
        """
        # The Java helper owns the target column name, the scatter function, and the column type.
        self.output = _JLearnOutput(col_name, scatter_func, col_type)

    def __str__(self):
        """ Returns the string representation of the underlying Java Output object. """
        return self.output.toString()


def _validate(inputs: Input, outputs: Output, table: Table):
""" Ensures that all input columns exist in the table, and that no output column names already exist in the table.
Args:
inputs (Input) : list of Inputs to validate.
outputs (Output) : list of Outputs to validate.
table (Table) : table to check Input and Output columns against.
Raises:
ValueError : if at least one of the Input columns does not exist in the table.
ValueError : if at least one of the Output columns already exists in the table.
ValueError : if there are duplicates in the Output column names.
"""
input_columns_list = [input_.input.getColNames()[i] for input_ in inputs for i in
range(len(input_.input.getColNames()))]
input_columns = set(input_columns_list)
table_columns = {col.name for col in table.columns}
if table_columns >= input_columns:
if outputs is not None:
output_columns_list = [output.output.getColName() for output in outputs]
output_columns = set(output_columns_list)
if len(output_columns_list) != len(output_columns):
repeats = set([column for column in output_columns_list if output_columns_list.count(column) > 1])
raise ValueError(f"Cannot assign the same column name {repeats} to multiple columns.")
elif table_columns & output_columns:
overlap = output_columns & table_columns
raise ValueError(
f"The columns {overlap} already exist in the table. Please choose Output column names that are "
f"not already in the table.")
else:
difference = input_columns - table_columns
raise ValueError(f"Cannot find columns {difference} in the table.")


def _create_non_conflicting_col_name(table: Table, base_col_name: str) -> str:
""" Creates a column name that is not present in the table.
Args:
table (Table): table to check column name against.
base_col_name (str): base name to create a column from.
Returns:
column name that is not present in the table.
"""
table_col_names = set([col.name for col in table.columns])
if base_col_name not in table_col_names:
return base_col_name
else:
i = 0
while base_col_name in table_col_names:
base_col_name = base_col_name + str(i)

return base_col_name


def learn(table: Table = None, model_func: Callable = None, inputs: List[Input] = None,
          outputs: List[Output] = (), batch_size: int = None):
    """ Learn gathers data from multiple rows of the input table, performs a calculation, and scatters values from the
    calculation into an output table. This is a common computing paradigm for artificial intelligence, machine learning,
    and deep learning.

    Args:
        table (Table): the Deephaven table to perform computations on.
        model_func (Callable): function that performs computations on the table.
        inputs (List[Input]): list of Input objects that determine how data gets extracted from the table.
        outputs (List[Output]): list of Output objects that determine how data gets scattered back into the results
            table. Pass None to instead surface the raw futures produced by model_func (they are evaluated and
            dropped, so model_func runs for its side effects).
        batch_size (int): maximum number of rows for which model_func is evaluated at once; must be provided.

    Returns:
        a Table with added columns containing the results of evaluating model_func.

    Raises:
        DHError
    """

    try:
        # None is the public "no inputs" default; a mutable [] default would be
        # shared across calls. The () default for outputs is immutable for the
        # same reason, while staying distinct from an explicit None.
        if inputs is None:
            inputs = []

        _validate(inputs, outputs, table)

        if batch_size is None:
            raise ValueError("Batch size cannot be inferred. Please specify a batch size.")

        # TODO: When ticket #1072 is resolved, the following code should be replaced with
        # Globals["__computer"] = _Computer_(table, model_func, [input.input for input in inputs], batch_size)
        # and remove from globals at the end of function
        (jpy.get_type("io.deephaven.engine.table.lang.QueryScope")
         .addParam("__computer", _JLearnComputer(table, model_func,
                                                 [input_.input for input_ in inputs],
                                                 batch_size)))

        # Helper-column names are chosen so they cannot collide with existing table columns.
        future_offset = _create_non_conflicting_col_name(table, "__FutureOffset")
        clean = _create_non_conflicting_col_name(table, "__CleanComputer")

        if outputs is not None:
            __scatterer = _JLearnScatterer([output.output for output in outputs])
            # TODO: Similarly at resolution of #1072, replace the following code with
            # Globals["__scatterer"] = __scatterer
            # and remove from Globals at end of function
            jpy.get_type("io.deephaven.engine.table.lang.QueryScope").addParam("__scatterer", __scatterer)

            # Compute futures, scatter their results into the output columns, then
            # drop the bookkeeping columns.
            return (table
                    .update(formulas=[f"{future_offset} = __computer.compute(k)", ])
                    .update(formulas=[__scatterer.generateQueryStrings(f"{future_offset}"), ])
                    .update(formulas=[f"{clean} = __computer.clear()", ])
                    .drop_columns(cols=[f"{future_offset}", f"{clean}", ]))

        # outputs is None: force evaluation of each future, then drop every helper column.
        result = _create_non_conflicting_col_name(table, "__Result")

        return (table
                .update(formulas=[
                    f"{future_offset} = __computer.compute(k)",
                    f"{result} = {future_offset}.getFuture().get()",
                    f"{clean} = __computer.clear()",
                ])
                .drop_columns(cols=[
                    f"{future_offset}",
                    f"{clean}",
                    f"{result}",
                ]))
    except Exception as e:
        raise DHError(e, "failed to complete the learn function.") from e
90 changes: 90 additions & 0 deletions pyintegration/deephaven2/learn/gather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#
# Copyright (c) 2016 - 2022 Deephaven Data Labs and Patent Pending
#
""" Utilities for gathering Deephaven table data into Python objects """
import enum
from typing import Any, Type

import jpy
import numpy as np

from deephaven2 import DHError

_JGatherer = jpy.get_type("io.deephaven.integrations.learn.gather.NumPy")


class MemoryLayout(enum.Enum):
    """ Memory layouts for an array. """

    ROW_MAJOR = True
    """ Row-major memory layout."""
    COLUMN_MAJOR = False
    """ Column-major memory layout."""
    C = True
    """ Memory layout consistent with C arrays (row-major); alias of ROW_MAJOR."""
    FORTRAN = False
    """ Memory layout consistent with Fortran arrays (column-major); alias of COLUMN_MAJOR."""

    @property
    def is_row_major(self) -> bool:
        """ Whether this layout stores each row contiguously. """
        return self.value


def _convert_to_numpy_dtype(np_type: Type) -> Type:
""" Converts an input type to the corresponding NumPy data type. """
if np_type.__module__ == np.__name__:
return np_type
elif np_type == bool:
np_type = np.bool_
elif np_type == float:
np_type = np.double
elif np_type == int:
np_type = np.intc
else:
raise ValueError(f"{np_type} is not a data type that can be converted to a NumPy dtype.")
return np_type


def table_to_numpy_2d(row_set, col_set, order: MemoryLayout = MemoryLayout.ROW_MAJOR, np_type=np.intc):
    """ Converts Deephaven table data to a 2d NumPy array of the appropriate size.

    Args:
        row_set: a RowSequence describing the number of rows in the table
        col_set: ColumnSources describing which columns to copy
        order (MemoryLayout): the desired memory layout of the output array
        np_type: the desired NumPy data type of the output NumPy array

    Returns:
        a np.ndarray

    Raises:
        DHError
    """

    try:
        # Normalize builtin Python types (bool/int/float) to their NumPy equivalents
        # before dispatching on the element type below.
        np_type = _convert_to_numpy_dtype(np_type)

        # Dispatch to the Java gatherer overload for the requested element type;
        # the Java side fills a flat buffer using the requested memory layout.
        if np_type == np.byte:
            buffer = _JGatherer.tensorBuffer2DByte(row_set, col_set, order.is_row_major)
        elif np_type == np.short:
            buffer = _JGatherer.tensorBuffer2DShort(row_set, col_set, order.is_row_major)
        elif np_type == np.intc:
            buffer = _JGatherer.tensorBuffer2DInt(row_set, col_set, order.is_row_major)
        elif np_type == np.int_:
            buffer = _JGatherer.tensorBuffer2DLong(row_set, col_set, order.is_row_major)
        elif np_type == np.single:
            buffer = _JGatherer.tensorBuffer2DFloat(row_set, col_set, order.is_row_major)
        elif np_type == np.double:
            buffer = _JGatherer.tensorBuffer2DDouble(row_set, col_set, order.is_row_major)
        else:
            raise ValueError(f"Data type {np_type} is not supported.")

        # Wrap the Java buffer without copying, then shape it into 2d.
        tensor = np.frombuffer(buffer, dtype=np_type)

        # NOTE(review): in the row-major case the flat buffer is viewed as
        # (cols, rows) and then transposed to (rows, cols) — this presumes a
        # particular fill order on the Java side; confirm against the
        # io.deephaven.integrations.learn.gather.NumPy implementation.
        if order.is_row_major:
            tensor.shape = (len(col_set), row_set.intSize())
            return tensor.T
        else:
            tensor.shape = (row_set.intSize(), len(col_set))
            return tensor
    except Exception as e:
        raise DHError(e, "failed to convert a table to 2d numpy array.") from e
Loading

0 comments on commit 2256ab7

Please sign in to comment.