feat: ✨ add get file structure endpoint #11

Merged · 5 commits · May 30, 2024
17 changes: 16 additions & 1 deletion function_app.py
@@ -1,7 +1,7 @@
"""Azure Function App for ETL pipeline."""
import json

import logging

import azure.functions as func

from publish_pipeline.generate_high_level_metadata.generate_changelog import (
@@ -191,3 +191,18 @@ def moving_folders(req: func.HttpRequest) -> func.HttpResponse:
def copying_folders(req: func.HttpRequest) -> func.HttpResponse:
"""Copies the directories along with the files in the Azure Database."""
return file_operations.file_operation(file_operations.copy_directory, req)


@app.route(route="listing-structure", auth_level=func.AuthLevel.FUNCTION)
def listing_folder_structure(req: func.HttpRequest) -> func.HttpResponse:
"""List the directories along with the files in the Azure Database."""
try:
file_operations.pipeline()
file_operations.get_file_tree()
# return func.HttpResponse("Success", status_code=200)
return func.HttpResponse(
json.dumps(file_operations.get_file_tree().to_dict()), status_code=200
)
except Exception as e:
print(f"Exception: {e}")
return func.HttpResponse("Internal Server Error", status_code=500)
135 changes: 119 additions & 16 deletions utils/file_operations.py
@@ -1,20 +1,46 @@
import os
from typing import Callable
import config
from typing import Callable, List
from azure.storage.filedatalake import FileSystemClient
import azure.functions as func
import datetime

import azure.storage.blob as azureblob
import psycopg2

import config


class FileException(Exception):
pass


def move_directory(container: str, source: str, destination: str, overwrite_permitted: bool):
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)
class FileStructure:
    """A single file node in the serialized file tree."""

    label: str

    def __init__(self, label: str):
        self.label = label

    def to_dict(self):
        return {
            "label": self.label
        }


class FolderStructure(FileStructure):
    """A folder node that also carries its child files and folders."""

    children: List['FileStructure']

    def __init__(self, label: str, children: List['FileStructure']):
        super().__init__(label)
        self.children = children

    def to_dict(self):
        return {
            "children": [child.to_dict() for child in self.children],
            "label": self.label
        }
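
# Illustrative example (not part of this diff): serializing a small tree,
#   FolderStructure("t1", [FileStructure("a.json")]).to_dict()
# yields
#   {"children": [{"label": "a.json"}], "label": "t1"}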


def move_directory(file_system: FileSystemClient, source: str, destination: str, overwrite_permitted: bool):
source_client = file_system.get_directory_client(source)
destination_client = file_system.get_directory_client(destination)

@@ -31,12 +57,9 @@ def move_directory(container: str, source: str, destination: str, overwrite_perm
)


def copy_directory(container: str, source: str, destination: str, overwrite_permitted: bool) -> None:
def copy_directory(file_system: FileSystemClient, source: str, destination: str, overwrite_permitted: bool) -> None:
"""Moving directories while implementing subsequent copies (recursion)"""
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)

source_client = file_system.get_directory_client(source)

if not source_client.exists():
@@ -68,7 +91,7 @@ def copy_directory(container: str, source: str, destination: str, overwrite_perm
destination_file.create_file()
destination_file.upload_data(source_file_bytes, overwrite=True)
else:
copy_directory(container, child_path, target_path, overwrite_permitted)
copy_directory(file_system, child_path, target_path, overwrite_permitted)


def file_operation(operation: Callable, req: func.HttpRequest) -> func.HttpResponse:
@@ -83,15 +106,95 @@ def file_operation(operation: Callable, req: func.HttpRequest) -> func.HttpRespo

overwrite: bool = overwrite_permitted.lower().strip() == "true"

source: str = "AI-READI/metadata/test2/t1"
destination: str = "AI-READI/metadata/test2/t2"
source: str = "AI-READI/metadata/test2/t1/test"
destination: str = "AI-READI/metadata/test2/t3/t4"
container = "stage-1-container"

try:
operation(container, source, destination, overwrite)
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)
operation(file_system, source, destination, overwrite)
return func.HttpResponse("Success", status_code=200)
except FileException as e:
return func.HttpResponse(e.args[0], status_code=500)
except Exception as e:
print(f"Exception: {e}")
return func.HttpResponse("Internal Server Error", status_code=500)


def get_file_tree():
container = "stage-1-container"
file_system = FileSystemClient.from_connection_string(
config.AZURE_STORAGE_CONNECTION_STRING,
file_system_name=container,
)
source: str = "AI-READI/metadata/test2/t1"

return recurse_file_tree(file_system, source)


def recurse_file_tree(file_system: FileSystemClient, source: str) -> FileStructure:
source_client = file_system.get_directory_client(source)

if not source_client.exists():
raise FileException("source directory does not exist!")

source_path: str = source_client.get_directory_properties().name
    return FolderStructure(
        os.path.basename(source_path),
        [
            # Pass the child's path name (a str) when recursing; get_paths()
            # yields PathProperties objects, not plain strings.
            recurse_file_tree(file_system, child_path.name)
            if child_path.is_directory
            else FileStructure(os.path.basename(child_path.name))
            for child_path in file_system.get_paths(source_path, recursive=False)
        ],
    )
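
# Illustrative example (hypothetical file names): given a layout
#   AI-READI/metadata/test2/t1/
#       report.json
#       sub/notes.txt
# recurse_file_tree(file_system, "AI-READI/metadata/test2/t1") returns
#   FolderStructure("t1", [
#       FileStructure("report.json"),
#       FolderStructure("sub", [FileStructure("notes.txt")]),
#   ])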


def pipeline():
"""
Reads the file structure from azure
"""

conn = psycopg2.connect(
host=config.FAIRHUB_DATABASE_HOST,
database=config.FAIRHUB_DATABASE_NAME,
user=config.FAIRHUB_DATABASE_USER,
password=config.FAIRHUB_DATABASE_PASSWORD,
port=config.FAIRHUB_DATABASE_PORT,
)

cur = conn.cursor()

study_id = "c588f59c-cacb-4e52-99dd-95b37dcbfd5c"
dataset_id = "af4be921-e507-41a9-9328-4cbb4b7dca1c"

cur.execute(
"SELECT * FROM dataset WHERE id = %s AND study_id = %s",
(dataset_id, study_id),
)
dataset = cur.fetchone()

    if dataset is None:
        cur.close()
        conn.close()
        raise FileException("Dataset not found")

    cur.close()
    conn.close()
# upload the file to the metadata folder

# metadata_folder = "AI-READI/metadata"
#
# sas_token = azureblob.generate_account_sas(
# account_name="b2aistaging",
# account_key=config.AZURE_STORAGE_ACCESS_KEY,
# resource_types=azureblob.ResourceTypes(container=True, object=True),
# permission=azureblob.AccountSasPermissions(read=True, write=True, list=True),
# expiry=datetime.datetime.now(datetime.timezone.utc)
# + datetime.timedelta(hours=1),
# )
#
# # Get the blob service client
# blob_service_client = azureblob.BlobServiceClient(
# account_url="https://b2aistaging.blob.core.windows.net/",
# credential=sas_token,
# )
return