Skip to content

Commit

Permalink
feat: ✨ add get file structure endpoint (#11)
Browse files Browse the repository at this point in the history
* feat: add get file tree

* fix: format

* fix: format

* style: format
  • Loading branch information
Aydawka authored May 30, 2024
1 parent 3510fe6 commit da2dd69
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 17 deletions.
17 changes: 16 additions & 1 deletion function_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Azure Function App for ETL pipeline."""
import json

import logging

import azure.functions as func

from publish_pipeline.generate_high_level_metadata.generate_changelog import (
Expand Down Expand Up @@ -191,3 +191,18 @@ def moving_folders(req: func.HttpRequest) -> func.HttpResponse:
def copying_folders(req: func.HttpRequest) -> func.HttpResponse:
"""Copies the directories along with the files in the Azure Database."""
return file_operations.file_operation(file_operations.copy_directory, req)


@app.route(route="listing-structure", auth_level=func.AuthLevel.FUNCTION)
def listing_folder_structure(req: func.HttpRequest) -> func.HttpResponse:
    """List the directories along with the files in the Azure Database.

    Runs the database pipeline check, then builds the file tree and
    returns it as JSON.

    Returns:
        200 with the JSON-serialized tree (``FolderStructure.to_dict()``)
        on success; 500 with a generic message on any failure.
    """
    try:
        # Verifies the dataset exists in the database (side effect only).
        file_operations.pipeline()
        # Build the tree exactly once: the original called get_file_tree()
        # twice, discarding the first result and doubling the Azure traversal.
        tree = file_operations.get_file_tree()
        return func.HttpResponse(json.dumps(tree.to_dict()), status_code=200)
    except Exception as e:
        # Log via the logging module (imported at the top of this file)
        # instead of print, so Azure Functions captures it.
        logging.error("Exception: %s", e)
        return func.HttpResponse("Internal Server Error", status_code=500)
135 changes: 119 additions & 16 deletions utils/file_operations.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,46 @@
import os
from typing import Callable
import config
from typing import Callable, List
from azure.storage.filedatalake import FileSystemClient
import azure.functions as func
import datetime

import azure.storage.blob as azureblob
import psycopg2

import config


class FileException(Exception):
    """Raised when an Azure file-system operation fails (e.g. a missing source directory)."""


def move_directory(container: str, source: str, destination: str, overwrite_permitted: bool):
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)
class FileStructure:
    """A leaf node (a single file) in the serialized file tree."""

    # Display name of the file (its base name, no path).
    label: str

    def __init__(self, label: str):
        self.label = label

    def to_dict(self):
        """Return a JSON-serializable representation of this node."""
        return {"label": self.label}


class FolderStructure(FileStructure):
    """An interior node (a directory) in the serialized file tree."""

    # Child nodes: a mix of FileStructure leaves and nested FolderStructures.
    children: List['FileStructure']

    def __init__(self, label: str, children: List['FileStructure']):
        super().__init__(label)
        self.children = children

    def to_dict(self):
        """Serialize this folder and, recursively, all of its children."""
        child_dicts = [child.to_dict() for child in self.children]
        return {"children": child_dicts, "label": self.label}


def move_directory(file_system: FileSystemClient, source: str, destination: str, overwrite_permitted: bool):
source_client = file_system.get_directory_client(source)
destination_client = file_system.get_directory_client(destination)

Expand All @@ -31,12 +57,9 @@ def move_directory(container: str, source: str, destination: str, overwrite_perm
)


def copy_directory(container: str, source: str, destination: str, overwrite_permitted: bool) -> None:
def copy_directory(file_system: FileSystemClient, source: str, destination: str, overwrite_permitted: bool) -> None:
"""Moving directories while implementing subsequent copies (recursion)"""
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)

source_client = file_system.get_directory_client(source)

if not source_client.exists():
Expand Down Expand Up @@ -68,7 +91,7 @@ def copy_directory(container: str, source: str, destination: str, overwrite_perm
destination_file.create_file()
destination_file.upload_data(source_file_bytes, overwrite=True)
else:
copy_directory(container, child_path, target_path, overwrite_permitted)
copy_directory(file_system, child_path, target_path, overwrite_permitted)


def file_operation(operation: Callable, req: func.HttpRequest) -> func.HttpResponse:
Expand All @@ -83,15 +106,95 @@ def file_operation(operation: Callable, req: func.HttpRequest) -> func.HttpRespo

overwrite: bool = overwrite_permitted.lower().strip() == "true"

source: str = "AI-READI/metadata/test2/t1"
destination: str = "AI-READI/metadata/test2/t2"
source: str = "AI-READI/metadata/test2/t1/test"
destination: str = "AI-READI/metadata/test2/t3/t4"
container = "stage-1-container"

try:
operation(container, source, destination, overwrite)
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)
operation(file_system, source, destination, overwrite)
return func.HttpResponse("Success", status_code=200)
except FileException as e:
return func.HttpResponse(e.args[0], status_code=500)
except Exception as e:
print(f"Exception: {e}")
return func.HttpResponse("Internal Server Error", status_code=500)


def get_file_tree(
    container: str = "stage-1-container",
    source: str = "AI-READI/metadata/test2/t1",
) -> FileStructure:
    """Build the file tree rooted at *source* inside *container*.

    Connects to Azure Data Lake with the configured connection string and
    recursively walks the directory structure. The parameters default to
    the values the original hard-coded, so existing callers are unaffected.

    Args:
        container: Name of the Azure file system (container) to read.
        source: Path of the directory to use as the tree root.

    Returns:
        A FolderStructure describing *source* and everything beneath it.

    Raises:
        FileException: If *source* does not exist in the container
            (raised by recurse_file_tree).
    """
    file_system = FileSystemClient.from_connection_string(
        config.AZURE_STORAGE_CONNECTION_STRING,
        file_system_name=container,
    )
    return recurse_file_tree(file_system, source)


def recurse_file_tree(file_system: FileSystemClient, source: str) -> FileStructure:
    """Recursively describe the directory *source* as a FolderStructure.

    Each immediate child is visited once via a non-recursive listing:
    sub-directories recurse, plain files become FileStructure leaves.

    Raises:
        FileException: If *source* does not exist in *file_system*.
    """
    source_client = file_system.get_directory_client(source)

    if not source_client.exists():
        raise FileException("source directory does not exist!")

    source_path: str = source_client.get_directory_properties().name

    children: List[FileStructure] = []
    for entry in file_system.get_paths(source_path, recursive=False):
        if entry.is_directory:
            # Recurse into the sub-directory; the path item is accepted
            # by get_directory_client just like a plain path string.
            children.append(recurse_file_tree(file_system, entry))
        else:
            children.append(FileStructure(os.path.basename(entry.name)))

    return FolderStructure(os.path.basename(source_path), children)


def pipeline():
    """Verify that the hard-coded dataset exists in the fairhub database.

    Opens a PostgreSQL connection, looks up the dataset row, and always
    closes the cursor and connection — the original leaked the connection
    on the "Dataset not found" path by returning before conn.close().

    Returns:
        None when the dataset exists; the string "Dataset not found"
        otherwise (kept for backward compatibility with existing callers).
    """
    conn = psycopg2.connect(
        host=config.FAIRHUB_DATABASE_HOST,
        database=config.FAIRHUB_DATABASE_NAME,
        user=config.FAIRHUB_DATABASE_USER,
        password=config.FAIRHUB_DATABASE_PASSWORD,
        port=config.FAIRHUB_DATABASE_PORT,
    )

    try:
        cur = conn.cursor()
        try:
            # NOTE(review): study/dataset ids are hard-coded — presumably
            # placeholders until they arrive via the request; confirm.
            study_id = "c588f59c-cacb-4e52-99dd-95b37dcbfd5c"
            dataset_id = "af4be921-e507-41a9-9328-4cbb4b7dca1c"

            cur.execute(
                "SELECT * FROM dataset WHERE id = %s AND study_id = %s",
                (dataset_id, study_id),
            )
            dataset = cur.fetchone()
        finally:
            cur.close()
    finally:
        # Always release the connection, including the not-found path.
        conn.close()

    if dataset is None:
        return "Dataset not found"

    # TODO: upload the file to the metadata folder (SAS-token / blob
    # client scaffolding removed as dead commented-out code).
    return

0 comments on commit da2dd69

Please sign in to comment.