From 668a9c628767bfc5ea4ac81125fa762be8a9e59d Mon Sep 17 00:00:00 2001 From: mohammad-nassar10 Date: Thu, 24 Mar 2022 17:40:26 +0200 Subject: [PATCH] Add transformation ability to write. Signed-off-by: mohammad-nassar10 --- afm/pep/__init__.py | 2 +- afm/pep/base.py | 18 ++++++++++++++++++ afm/server.py | 7 ++++--- sample/sample.yaml | 7 +++++++ 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/afm/pep/__init__.py b/afm/pep/__init__.py index e44056d..88c718b 100644 --- a/afm/pep/__init__.py +++ b/afm/pep/__init__.py @@ -2,7 +2,7 @@ # Copyright 2020 IBM Corp. # SPDX-License-Identifier: Apache-2.0 # -from .base import Action, consolidate_actions, transform, transform_schema +from .base import Action, consolidate_actions, transform, transform_schema, transform_batches from .actions import Redact, RemoveColumns # registry is a map from action name to Action class diff --git a/afm/pep/base.py b/afm/pep/base.py index cfacec2..6a96af2 100644 --- a/afm/pep/base.py +++ b/afm/pep/base.py @@ -171,3 +171,21 @@ def transform_schema(actions, schema): for action in actions: schema = action.schema(schema) return schema + +def transform_batches(actions, record_batches): + """Transform record batches according to actions. 
+ + Args: + actions (list): actions to apply + record_batches (list): list of record batches to act on + + Returns: + [pyarrow.RecordBatch]: list of the transformed record batches + """ + transformed_batches = [] + for record_batch in record_batches: + item = record_batch + for action in actions: + item = action(item) + transformed_batches.append(item) + return transformed_batches \ No newline at end of file diff --git a/afm/server.py b/afm/server.py index 039687d..cbcaf2d 100644 --- a/afm/server.py +++ b/afm/server.py @@ -17,7 +17,7 @@ from .asset import asset_from_config from .command import AFMCommand from .config import Config -from .pep import transform, transform_schema, actions +from .pep import transform, transform_schema, transform_batches, actions from .ticket import AFMTicket from .worker import workers_from_config from .auth import AFMAuthHandler @@ -58,9 +58,10 @@ def _filter_columns(self, schema, columns): # write arrow dataset to filesystem def _write_asset(self, asset, reader): # in this implementation we currently begin by reading the entire dataset - t = reader.read_all().combine_chunks() + record_batches = reader.read_all().combine_chunks().to_batches() + transformed_batches = transform_batches(asset.actions, record_batches) # currently, write_dataset supports the parquet format, but not csv - ds.write_dataset(t, base_dir=asset.path, format=asset.format, + ds.write_dataset(transformed_batches, base_dir=asset.path, format=asset.format, filesystem=asset.filesystem) def _read_asset(self, asset, columns=None): diff --git a/sample/sample.yaml b/sample/sample.yaml index 2b81627..c457df9 100644 --- a/sample/sample.yaml +++ b/sample/sample.yaml @@ -25,6 +25,13 @@ data: path: "/tmp/new-dataset" connection: type: localfs + transformations: + - action: "Redact" + description: "redact stuff" + columns: + - Date of Birth + options: + redactValue: "XXXXX" - name: "nyc-taxi.parquet" capability: "read" format: parquet