PERF: put BlockManager constructor in cython (#40842)
jbrockmendel authored Apr 19, 2021
1 parent 5525561 commit 69d7663
Showing 11 changed files with 276 additions and 146 deletions.
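
Editor's note: the diff below moves the BlockManager constructor into a Cython cdef class (with a freelist), so internal operations that rebuild a manager on every call pay less Python-level overhead. A minimal, hedged sketch of the kind of workload this targets (the slicing loop is an illustration, not a benchmark from the commit):

import numpy as np
import pandas as pd

# Hedged illustration: each .iloc slice constructs a fresh BlockManager for the
# result, so constructor overhead is paid on every iteration.
df = pd.DataFrame(np.random.randn(1_000, 10))
for _ in range(1_000):
    _ = df.iloc[:500]
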
13 changes: 13 additions & 0 deletions pandas/_libs/internals.pyi
@@ -11,6 +11,9 @@ from pandas._typing import (
T,
)

from pandas import Index
from pandas.core.internals.blocks import Block as B

def slice_len(slc: slice, objlen: int = ...) -> int: ...


@@ -66,3 +69,13 @@ class NumpyBlock(SharedBlock):

class Block(SharedBlock):
...

class BlockManager:
blocks: tuple[B, ...]
axes: list[Index]
_known_consolidated: bool
_is_consolidated: bool
_blknos: np.ndarray
_blklocs: np.ndarray

def __init__(self, blocks: tuple[B, ...], axes: list[Index], verify_integrity=True): ...
77 changes: 77 additions & 0 deletions pandas/_libs/internals.pyx
@@ -533,3 +533,80 @@ cdef class Block(SharedBlock):
# set values here the (implicit) call to SharedBlock.__cinit__ will
# set placement and ndim
self.values = values


@cython.freelist(64)
cdef class BlockManager:
cdef:
public tuple blocks
public list axes
public bint _known_consolidated, _is_consolidated
public ndarray _blknos, _blklocs

def __cinit__(self, blocks, axes, verify_integrity=True):
if isinstance(blocks, list):
# Backward compat for e.g. pyarrow
blocks = tuple(blocks)

self.blocks = blocks
self.axes = axes.copy() # copy to make sure we are not remotely-mutable

# Populate known_consolidate, blknos, and blklocs lazily
self._known_consolidated = False
self._is_consolidated = False
# error: Incompatible types in assignment (expression has type "None",
# variable has type "ndarray")
self._blknos = None # type: ignore[assignment]
# error: Incompatible types in assignment (expression has type "None",
# variable has type "ndarray")
self._blklocs = None # type: ignore[assignment]

# -------------------------------------------------------------------
# Pickle

cpdef __reduce__(self):
if len(self.axes) == 1:
# SingleBlockManager, __init__ expects Block, axis
args = (self.blocks[0], self.axes[0])
else:
args = (self.blocks, self.axes)
return type(self), args

cpdef __setstate__(self, state):
from pandas.core.construction import extract_array
from pandas.core.internals.blocks import (
ensure_block_shape,
new_block,
)
from pandas.core.internals.managers import ensure_index

if isinstance(state, tuple) and len(state) >= 4 and "0.14.1" in state[3]:
state = state[3]["0.14.1"]
axes = [ensure_index(ax) for ax in state["axes"]]
ndim = len(axes)

for blk in state["blocks"]:
vals = blk["values"]
# older versions may hold e.g. DatetimeIndex instead of DTA
vals = extract_array(vals, extract_numpy=True)
blk["values"] = ensure_block_shape(vals, ndim=ndim)

nbs = [
new_block(blk["values"], blk["mgr_locs"], ndim=ndim)
for blk in state["blocks"]
]
blocks = tuple(nbs)
self.blocks = blocks
self.axes = axes

else:
raise NotImplementedError("pre-0.14.1 pickles are no longer supported")

self._post_setstate()

def _post_setstate(self) -> None:
self._is_consolidated = False
self._known_consolidated = False
self._rebuild_blknos_and_blklocs()

# -------------------------------------------------------------------
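
Editor's note: since BlockManager is now an extension type, pickle support is spelled out explicitly through __reduce__/__setstate__ above rather than relying on instance __dict__ state. A hedged sanity check (plain pandas usage, not part of the diff) that exercises this path:

import pickle
import pandas as pd

# Hedged example: a DataFrame round-trip goes through the Cython
# BlockManager.__reduce__/__setstate__ defined above.
df = pd.DataFrame({"a": [1, 2, 3], "b": pd.date_range("2021-01-01", periods=3)})
restored = pickle.loads(pickle.dumps(df))
assert restored.equals(df)
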
2 changes: 1 addition & 1 deletion pandas/_libs/reduction.pyx
@@ -489,6 +489,6 @@ cdef class BlockSlider:
Ensure that we have the original blocks, blknos, and blklocs.
"""
mgr = self.dummy._mgr
mgr.blocks = self.blocks
mgr.blocks = tuple(self.blocks)
mgr._blklocs = self.orig_blklocs
mgr._blknos = self.orig_blknos
4 changes: 3 additions & 1 deletion pandas/compat/pickle_compat.py
@@ -21,6 +21,7 @@
PeriodArray,
TimedeltaArray,
)
from pandas.core.internals import BlockManager

if TYPE_CHECKING:
from pandas import (
@@ -222,7 +223,8 @@ def load_newobj(self):
elif issubclass(cls, TimedeltaArray) and not args:
arr = np.array([], dtype="m8[ns]")
obj = cls.__new__(cls, arr, arr.dtype)

elif cls is BlockManager and not args:
obj = cls.__new__(cls, (), [], False)
else:
obj = cls.__new__(cls, *args)

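
Editor's note: because a Cython __cinit__ runs as part of __new__, BlockManager.__new__ can no longer be called without constructor arguments, which is why load_newobj above supplies empty placeholders. A hedged sketch of what that fallback effectively does (import path as of this commit; it may differ in later pandas versions):

# Hedged sketch mirroring the load_newobj fallback above: the placeholders satisfy
# the Cython __cinit__ signature, and the real state is filled in afterwards by
# __setstate__ during unpickling.
from pandas.core.internals import BlockManager

obj = BlockManager.__new__(BlockManager, (), [], False)
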
2 changes: 1 addition & 1 deletion pandas/core/internals/concat.py
@@ -225,7 +225,7 @@ def concatenate_managers(
b = new_block(new_values, placement=placement, ndim=len(axes))
blocks.append(b)

return BlockManager(blocks, axes)
return BlockManager(tuple(blocks), axes)


def _get_mgr_concatenation_plan(mgr: BlockManager, indexers: dict[int, np.ndarray]):
152 changes: 55 additions & 97 deletions pandas/core/internals/managers.py
@@ -135,22 +135,16 @@ class BaseBlockManager(DataManager):
This is *not* a public API class
"""

__slots__ = [
"axes",
"blocks",
"_known_consolidated",
"_is_consolidated",
"_blknos",
"_blklocs",
]
__slots__ = ()

_blknos: np.ndarray
_blklocs: np.ndarray
blocks: tuple[Block, ...]
axes: list[Index]

# Non-trivially faster than a property
ndim: int
_known_consolidated: bool
_is_consolidated: bool

def __init__(self, blocks, axes, verify_integrity=True):
raise NotImplementedError
@@ -276,57 +270,6 @@ def arrays(self) -> list[ArrayLike]:
"""
return [blk.values for blk in self.blocks]

def __getstate__(self):
block_values = [b.values for b in self.blocks]
block_items = [self.items[b.mgr_locs.indexer] for b in self.blocks]
axes_array = list(self.axes)

extra_state = {
"0.14.1": {
"axes": axes_array,
"blocks": [
{"values": b.values, "mgr_locs": b.mgr_locs.indexer}
for b in self.blocks
],
}
}

# First three elements of the state are to maintain forward
# compatibility with 0.13.1.
return axes_array, block_values, block_items, extra_state

def __setstate__(self, state):
def unpickle_block(values, mgr_locs, ndim: int) -> Block:
# TODO(EA2D): ndim would be unnecessary with 2D EAs
# older pickles may store e.g. DatetimeIndex instead of DatetimeArray
values = extract_array(values, extract_numpy=True)
return new_block(values, placement=mgr_locs, ndim=ndim)

if isinstance(state, tuple) and len(state) >= 4 and "0.14.1" in state[3]:
state = state[3]["0.14.1"]
self.axes = [ensure_index(ax) for ax in state["axes"]]
ndim = len(self.axes)

for blk in state["blocks"]:
vals = blk["values"]
# older versions may hold e.g. DatetimeIndex instead of DTA
vals = extract_array(vals, extract_numpy=True)
blk["values"] = ensure_block_shape(vals, ndim=ndim)

self.blocks = tuple(
unpickle_block(b["values"], b["mgr_locs"], ndim=ndim)
for b in state["blocks"]
)
else:
raise NotImplementedError("pre-0.14.1 pickles are no longer supported")

self._post_setstate()

def _post_setstate(self) -> None:
self._is_consolidated = False
self._known_consolidated = False
self._rebuild_blknos_and_blklocs()

def __repr__(self) -> str:
output = type(self).__name__
for i, ax in enumerate(self.axes):
@@ -823,7 +766,7 @@ def consolidate(self: T) -> T:
if self.is_consolidated():
return self

bm = type(self)(self.blocks, self.axes)
bm = type(self)(self.blocks, self.axes, verify_integrity=False)
bm._is_consolidated = False
bm._consolidate_inplace()
return bm
@@ -1079,7 +1022,7 @@ def take(self: T, indexer, axis: int = 1, verify: bool = True) -> T:
)


class BlockManager(BaseBlockManager):
class BlockManager(libinternals.BlockManager, BaseBlockManager):
"""
BaseBlockManager that holds 2D blocks.
"""
@@ -1095,27 +1038,18 @@ def __init__(
axes: Sequence[Index],
verify_integrity: bool = True,
):
self.axes = [ensure_index(ax) for ax in axes]
self.blocks: tuple[Block, ...] = tuple(blocks)

for block in blocks:
if self.ndim != block.ndim:
raise AssertionError(
f"Number of Block dimensions ({block.ndim}) must equal "
f"number of axes ({self.ndim})"
)

if verify_integrity:
self._verify_integrity()
assert all(isinstance(x, Index) for x in axes)

# Populate known_consolidate, blknos, and blklocs lazily
self._known_consolidated = False
# error: Incompatible types in assignment (expression has type "None",
# variable has type "ndarray")
self._blknos = None # type: ignore[assignment]
# error: Incompatible types in assignment (expression has type "None",
# variable has type "ndarray")
self._blklocs = None # type: ignore[assignment]
for block in blocks:
if self.ndim != block.ndim:
raise AssertionError(
f"Number of Block dimensions ({block.ndim}) must equal "
f"number of axes ({self.ndim})"
)

self._verify_integrity()

def _verify_integrity(self) -> None:
mgr_shape = self.shape
Expand All @@ -1130,21 +1064,6 @@ def _verify_integrity(self) -> None:
f"tot_items: {tot_items}"
)

@classmethod
def _simple_new(cls, blocks: tuple[Block, ...], axes: list[Index]):
"""
Fastpath constructor; does NO validation.
"""
obj = cls.__new__(cls)
obj.axes = axes
obj.blocks = blocks

# Populate known_consolidate, blknos, and blklocs lazily
obj._known_consolidated = False
obj._blknos = None
obj._blklocs = None
return obj

@classmethod
def from_blocks(cls, blocks: list[Block], axes: list[Index]) -> BlockManager:
"""
@@ -1210,7 +1129,7 @@ def get_slice(self, slobj: slice, axis: int = 0) -> BlockManager:
new_axes = list(self.axes)
new_axes[axis] = new_axes[axis]._getitem_slice(slobj)

return type(self)._simple_new(tuple(new_blocks), new_axes)
return type(self)(tuple(new_blocks), new_axes, verify_integrity=False)

def iget(self, i: int) -> SingleBlockManager:
"""
@@ -1418,7 +1337,7 @@ def idelete(self, indexer) -> BlockManager:
nbs = self._slice_take_blocks_ax0(taker, only_slice=True)
new_columns = self.items[~is_deleted]
axes = [new_columns, self.axes[1]]
return type(self)._simple_new(tuple(nbs), axes)
return type(self)(tuple(nbs), axes)

# ----------------------------------------------------------------
# Block-wise Operation
@@ -1602,6 +1521,45 @@ def from_array(cls, array: ArrayLike, index: Index) -> SingleBlockManager:
block = new_block(array, placement=slice(0, len(index)), ndim=1)
return cls(block, index)

def __getstate__(self):
block_values = [b.values for b in self.blocks]
block_items = [self.items[b.mgr_locs.indexer] for b in self.blocks]
axes_array = list(self.axes)

extra_state = {
"0.14.1": {
"axes": axes_array,
"blocks": [
{"values": b.values, "mgr_locs": b.mgr_locs.indexer}
for b in self.blocks
],
}
}

# First three elements of the state are to maintain forward
# compatibility with 0.13.1.
return axes_array, block_values, block_items, extra_state

def __setstate__(self, state):
def unpickle_block(values, mgr_locs, ndim: int) -> Block:
# TODO(EA2D): ndim would be unnecessary with 2D EAs
# older pickles may store e.g. DatetimeIndex instead of DatetimeArray
values = extract_array(values, extract_numpy=True)
return new_block(values, placement=mgr_locs, ndim=ndim)

if isinstance(state, tuple) and len(state) >= 4 and "0.14.1" in state[3]:
state = state[3]["0.14.1"]
self.axes = [ensure_index(ax) for ax in state["axes"]]
ndim = len(self.axes)
self.blocks = tuple(
unpickle_block(b["values"], b["mgr_locs"], ndim=ndim)
for b in state["blocks"]
)
else:
raise NotImplementedError("pre-0.14.1 pickles are no longer supported")

self._post_setstate()

def _post_setstate(self):
pass

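
Editor's note: SingleBlockManager keeps Python-level __getstate__/__setstate__ (moved down from the base class above), so Series pickling does not go through the new Cython path. A hedged check:

import pickle
import pandas as pd

# Hedged example: a Series round-trip exercises the Python-level
# SingleBlockManager.__getstate__/__setstate__ shown above.
ser = pd.Series([1.0, 2.0, 3.0], index=["x", "y", "z"])
assert pickle.loads(pickle.dumps(ser)).equals(ser)
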
2 changes: 1 addition & 1 deletion pandas/core/internals/ops.py
@@ -76,7 +76,7 @@ def operate_blockwise(
# assert len(slocs) == nlocs, (len(slocs), nlocs)
# assert slocs == set(range(nlocs)), slocs

new_mgr = type(right)(res_blks, axes=right.axes, verify_integrity=False)
new_mgr = type(right)(tuple(res_blks), axes=right.axes, verify_integrity=False)
return new_mgr


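
Editor's note: operate_blockwise builds the result manager directly from the computed blocks, and the Cython constructor stores blocks as a tuple, so the call site now converts before constructing with verify_integrity=False. A hedged usage example that can take this path:

import pandas as pd

# Hedged example: arithmetic between identically indexed DataFrames can go through
# the blockwise path above, building the result manager without re-validation.
df = pd.DataFrame({"a": [1.0, 2.0], "b": [3.0, 4.0]})
out = df + df
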