Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add transformation ability to write. #148

Merged
merged 1 commit into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion afm/pep/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Copyright 2020 IBM Corp.
# SPDX-License-Identifier: Apache-2.0
#
from .base import Action, consolidate_actions, transform, transform_schema
from .base import Action, consolidate_actions, transform, transform_schema, transform_batches
from .actions import Redact, RemoveColumns

# registry is a map from action name to Action class
Expand Down
18 changes: 18 additions & 0 deletions afm/pep/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,21 @@ def transform_schema(actions, schema):
for action in actions:
schema = action.schema(schema)
return schema

def transform_batches(actions, record_batches):
"""Transform record batches according to actions.

Args:
actions (list): actions to apply
record_batches (list): list of recrod batches to act on

Returns:
[pyarrow.RecordBatch]: list of the transformed record batches
"""
transformed_batches = []
for record_batch in record_batches:
item = record_batch
for action in actions:
item = action(item)
transformed_batches.append(item)
return transformed_batches
7 changes: 4 additions & 3 deletions afm/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .asset import asset_from_config
from .command import AFMCommand
from .config import Config
from .pep import transform, transform_schema, actions
from .pep import transform, transform_schema, transform_batches, actions
from .ticket import AFMTicket
from .worker import workers_from_config
from .auth import AFMAuthHandler
Expand Down Expand Up @@ -58,9 +58,10 @@ def _filter_columns(self, schema, columns):
# write arrow dataset to filesystem
def _write_asset(self, asset, reader):
# in this implementation we currently begin by reading the entire dataset
t = reader.read_all().combine_chunks()
record_batches = reader.read_all().combine_chunks().to_batches()
transformed_batches = transform_batches(asset.actions, record_batches)
# currently, write_dataset supports the parquet format, but not csv
ds.write_dataset(t, base_dir=asset.path, format=asset.format,
ds.write_dataset(transformed_batches, base_dir=asset.path, format=asset.format,
filesystem=asset.filesystem)

def _read_asset(self, asset, columns=None):
Expand Down
7 changes: 7 additions & 0 deletions sample/sample.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ data:
path: "/tmp/new-dataset"
connection:
type: localfs
transformations:
- action: "Redact"
description: "redact stuff"
columns:
- Date of Birth
options:
redactValue: "XXXXX"
- name: "nyc-taxi.parquet"
capability: "read"
format: parquet
Expand Down