Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial query download feature changes #1

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions backend/dataall/base/aws/s3_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import logging

import boto3
from botocore.config import Config

from botocore.exceptions import ClientError
import logging

log = logging.getLogger(__name__)

Expand Down
9 changes: 7 additions & 2 deletions backend/dataall/modules/worksheets/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
queries,
resolvers,
types,
enums,
)

__all__ = ['resolvers', 'types', 'input_types', 'queries', 'mutations', 'enums']
__all__ = [
'resolvers',
'types',
'input_types',
'queries',
'mutations',
]
11 changes: 11 additions & 0 deletions backend/dataall/modules/worksheets/api/input_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,14 @@
gql.Argument(name='measures', type=gql.ArrayType(gql.Ref('WorksheetMeasureInput'))),
],
)


# GraphQL input used to request a download URL for an Athena query result.
# Every field is declared as a required (non-nullable) string.
WorksheetQueryResultDownloadUrlInput = gql.InputType(
    name='WorksheetQueryResultDownloadUrlInput',
    arguments=[
        gql.Argument(name=field_name, type=gql.NonNullableType(gql.String))
        for field_name in ('athenaQueryId', 'fileFormat', 'environmentUri', 'worksheetUri')
    ],
)
16 changes: 15 additions & 1 deletion backend/dataall/modules/worksheets/api/mutations.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
from dataall.base.api import gql
from dataall.modules.worksheets.api.resolvers import create_worksheet, delete_worksheet, update_worksheet
from dataall.modules.worksheets.api.resolvers import (
create_worksheet,
delete_worksheet,
update_worksheet,
create_athena_query_result_download_url,
)


createWorksheet = gql.MutationField(
Expand Down Expand Up @@ -27,3 +32,12 @@
],
type=gql.Boolean,
)

# Mutation taking a WorksheetQueryResultDownloadUrlInput and returning a
# WorksheetQueryResult; handled by create_athena_query_result_download_url.
createWorksheetQueryResultDownloadUrl = gql.MutationField(
    name='createWorksheetQueryResultDownloadUrl',
    type=gql.Ref('WorksheetQueryResult'),
    args=[gql.Argument(name='input', type=gql.Ref('WorksheetQueryResultDownloadUrlInput'))],
    resolver=create_athena_query_result_download_url,
)
28 changes: 27 additions & 1 deletion backend/dataall/modules/worksheets/api/resolvers.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from dataall.base.db import exceptions
from dataall.modules.worksheets.api.enums import WorksheetRole
from dataall.modules.worksheets.services.worksheet_enums import WorksheetRole, WorksheetResultsFormat
from dataall.modules.worksheets.db.worksheet_models import Worksheet
from dataall.modules.worksheets.db.worksheet_repositories import WorksheetRepository
from dataall.modules.worksheets.services.worksheet_service import WorksheetService
from dataall.base.api.context import Context
from dataall.modules.worksheets.services.worksheet_query_result_service import WorksheetQueryResultService


def create_worksheet(context: Context, source, input: dict = None):
Expand Down Expand Up @@ -69,3 +70,28 @@ def run_sql_query(context: Context, source, environmentUri: str = None, workshee
def delete_worksheet(context, source, worksheetUri: str = None):
    """Delete the worksheet identified by ``worksheetUri``.

    Opens a scoped database session from ``context.engine`` and hands the
    deletion over to ``WorksheetService.delete_worksheet``, returning its result.
    """
    session_scope = context.engine.scoped_session()
    with session_scope as db_session:
        return WorksheetService.delete_worksheet(uri=worksheetUri, session=db_session)


def create_athena_query_result_download_url(context: Context, source, input: dict = None):
    """Validate the request and resolve a download URL for an Athena query result.

    :param context: request context; ``context.engine`` supplies the database session
    :param source: GraphQL parent object (unused)
    :param input: dict carrying ``environmentUri``, ``athenaQueryId``, ``fileFormat``
        and ``worksheetUri``
    :return: the value returned by ``WorksheetQueryResultService.download_sql_query_result``
    :raises exceptions.RequiredParameter: if ``input`` or any required key is missing or empty
    :raises exceptions.InvalidInput: if ``fileFormat`` does not name a ``WorksheetResultsFormat`` member
    """
    if not input:
        raise exceptions.RequiredParameter('data')

    # worksheetUri is validated as well so that a missing value surfaces as a
    # RequiredParameter error rather than a KeyError when it is read below.
    for required_key in ('environmentUri', 'athenaQueryId', 'fileFormat', 'worksheetUri'):
        if not input.get(required_key):
            raise exceptions.RequiredParameter(required_key)

    file_format = input['fileFormat']
    # Member names are upper-case (CSV, XLSX); the comparison is case-insensitive.
    if not hasattr(WorksheetResultsFormat, file_format.upper()):
        raise exceptions.InvalidInput(
            'fileFormat',
            file_format,
            ', '.join(result_format.value for result_format in WorksheetResultsFormat),
        )

    env_uri = input['environmentUri']
    worksheet_uri = input['worksheetUri']

    with context.engine.scoped_session() as session:
        return WorksheetQueryResultService.download_sql_query_result(
            session=session, uri=worksheet_uri, env_uri=env_uri, data=input
        )
11 changes: 6 additions & 5 deletions backend/dataall/modules/worksheets/api/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,16 @@
name='WorksheetQueryResult',
fields=[
gql.Field(name='worksheetQueryResultUri', type=gql.ID),
gql.Field(name='queryType', type=gql.NonNullableType(gql.String)),
gql.Field(name='sqlBody', type=gql.NonNullableType(gql.String)),
gql.Field(name='sqlBody', type=gql.String),
gql.Field(name='AthenaQueryId', type=gql.NonNullableType(gql.String)),
gql.Field(name='region', type=gql.NonNullableType(gql.String)),
gql.Field(name='AwsAccountId', type=gql.NonNullableType(gql.String)),
gql.Field(name='AthenaOutputBucketName', type=gql.NonNullableType(gql.String)),
gql.Field(name='AthenaOutputKey', type=gql.NonNullableType(gql.String)),
gql.Field(name='timeElapsedInSecond', type=gql.NonNullableType(gql.Integer)),
gql.Field(name='elapsedTimeInMs', type=gql.Integer),
gql.Field(name='created', type=gql.NonNullableType(gql.String)),
gql.Field(name='downloadLink', type=gql.String),
gql.Field(name='outputLocation', type=gql.String),
gql.Field(name='expiresIn', type=gql.AWSDateTime),
gql.Field(name='fileFormat', type=gql.String),
],
)

Expand Down
69 changes: 69 additions & 0 deletions backend/dataall/modules/worksheets/aws/s3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging
from typing import TYPE_CHECKING

from botocore.exceptions import ClientError

from dataall.base.aws.sts import SessionHelper
from dataall.base.db.exceptions import AWSResourceNotFound

if TYPE_CHECKING:
from dataall.core.environment.db.environment_models import Environment

try:
from mypy_boto3_s3 import S3Client as S3ClientType
except ImportError:
S3ClientType = None

log = logging.getLogger(__name__)


class S3Client:
    """Small wrapper around a boto3 S3 client created for an environment's account and region."""

    def __init__(self, env: 'Environment'):
        """Build the S3 client from a remote session for ``env.AwsAccountId`` / ``env.region``."""
        self._client = SessionHelper.remote_session(env.AwsAccountId, env.region).client('s3', region_name=env.region)
        self._env = env

    @property
    def client(self) -> 'S3ClientType':
        """The underlying S3 client."""
        return self._client

    def get_presigned_url(self, bucket, key, expire_minutes: int = 15):
        """Return a presigned ``get_object`` URL for ``bucket``/``key``.

        :param expire_minutes: validity of the URL in minutes (converted to seconds for the client)
        :raises ClientError: re-raised after logging when URL generation fails
        """
        expire_seconds = expire_minutes * 60
        try:
            return self.client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket, 'Key': key},
                ExpiresIn=expire_seconds,
            )
        except ClientError as e:
            log.error(f'Failed to get presigned URL due to: {e}')
            raise

    def object_exists(self, bucket, key) -> bool:
        """Return True if ``head_object`` succeeds and False if it reports a 404.

        :raises AWSResourceNotFound: for any other client error; the original
            ``ClientError`` is attached as the cause
        """
        try:
            self.client.head_object(Bucket=bucket, Key=key)
            return True
        except ClientError as e:
            # Read the code defensively so a malformed error payload cannot mask the real failure.
            error_code = e.response.get('Error', {}).get('Code')
            if error_code == '404':
                log.info(f'Object {key} not found in bucket {bucket}')
                return False
            log.error(f'Failed to check object existence due to: {e}')
            # Not a 404: the object may exist but be unreachable (e.g. access denied),
            # so do not claim "not found"; keep the exception type callers expect.
            raise AWSResourceNotFound(
                's3_object_exists',
                f'Unable to verify that object {key} exists in bucket {bucket} (error code: {error_code})',
            ) from e

    def put_object(self, bucket, key, body):
        """Upload ``body`` to ``bucket``/``key``.

        :raises ClientError: re-raised after logging when the upload fails
        """
        try:
            self.client.put_object(Bucket=bucket, Key=key, Body=body)
        except ClientError as e:
            log.error(f'Failed to put object due to: {e}')
            raise

    def get_object(self, bucket, key) -> str:
        """Return the content of ``bucket``/``key`` decoded as UTF-8 text.

        :raises ClientError: re-raised after logging when the download fails
        """
        try:
            response = self.client.get_object(Bucket=bucket, Key=key)
            return response['Body'].read().decode('utf-8')
        except ClientError as e:
            log.error(f'Failed to get object due to: {e}')
            raise
20 changes: 14 additions & 6 deletions backend/dataall/modules/worksheets/db/worksheet_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import datetime
import enum

from sqlalchemy import Column, DateTime, Integer, Enum, String
from sqlalchemy import Column, DateTime, Integer, Enum, String, BigInteger
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import query_expression

Expand All @@ -27,15 +27,23 @@ class Worksheet(Resource, Base):

class WorksheetQueryResult(Base):
# Row stored in 'worksheet_query_result': one Athena query result linked to a
# worksheet, together with the download link (and its expiry) recorded for it.
__tablename__ = 'worksheet_query_result'
worksheetQueryResultUri = Column(String, primary_key=True, default=utils.uuid('worksheetQueryResultUri'))
worksheetUri = Column(String, nullable=False)
AthenaQueryId = Column(String, primary_key=True)
status = Column(String, nullable=False)
queryType = Column(Enum(QueryType), nullable=False, default=True)
sqlBody = Column(String, nullable=False)
AthenaQueryId = Column(String, nullable=False)
status = Column(String, nullable=True)
sqlBody = Column(String, nullable=True)
AwsAccountId = Column(String, nullable=False)
region = Column(String, nullable=False)
OutputLocation = Column(String, nullable=False)
error = Column(String, nullable=True)
ElapsedTimeInMs = Column(Integer, nullable=True)
DataScannedInBytes = Column(Integer, nullable=True)
# BigInteger: presumably because a scanned-bytes count can exceed the 32-bit Integer range.
DataScannedInBytes = Column(BigInteger, nullable=True)
# NOTE(review): defaults to local-time datetime.now, whereas `updated` and the
# expiry check below use utcnow — confirm the mismatch is intended.
created = Column(DateTime, default=datetime.datetime.now)

downloadLink = Column(String, nullable=True)
# Compared against datetime.utcnow() in is_download_link_expired, so it is
# presumably written as a naive UTC timestamp — confirm against the writer.
expiresIn = Column(DateTime, nullable=True)
# Set on insert and refreshed on every update with the current naive UTC time.
updated = Column(DateTime, nullable=False, onupdate=datetime.datetime.utcnow, default=datetime.datetime.utcnow)
fileFormat = Column(String, nullable=True)

# True when no expiry is recorded or the expiry is at or before the current UTC time.
def is_download_link_expired(self) -> bool:
return self.expiresIn is None or self.expiresIn <= datetime.datetime.utcnow()
14 changes: 14 additions & 0 deletions backend/dataall/modules/worksheets/db/worksheet_repositories.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,17 @@ def paginated_user_worksheets(session, username, groups, uri, data=None, check_p
page=data.get('page', WorksheetRepository._DEFAULT_PAGE),
page_size=data.get('pageSize', WorksheetRepository._DEFAULT_PAGE_SIZE),
).to_dict()

@staticmethod
def find_query_result_by_format(
    session, worksheet_uri: str, athena_query_id: str, file_format: str
) -> WorksheetQueryResult:
    """Look up a stored query result by worksheet, Athena query id and file format.

    All three criteria must match. The first matching row is returned;
    ``Query.first()`` yields ``None`` when there is no match.
    """
    criteria = (
        WorksheetQueryResult.worksheetUri == worksheet_uri,
        WorksheetQueryResult.AthenaQueryId == athena_query_id,
        WorksheetQueryResult.fileFormat == file_format,
    )
    matching_rows = session.query(WorksheetQueryResult).filter(*criteria)
    return matching_rows.first()
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,8 @@ class WorksheetRole(GraphQLEnumMapper):
Creator = '950'
Admin = '900'
NoPermission = '000'


# File formats accepted for worksheet query result downloads; member names are
# upper-case and values are the lower-case format strings.
class WorksheetResultsFormat(GraphQLEnumMapper):
CSV = 'csv'
XLSX = 'xlsx'
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
from dataall.core.permissions.services.resources_permissions import (
RESOURCES_ALL,
RESOURCES_ALL_WITH_DESC,
)
from dataall.core.permissions.services.environment_permissions import (
ENVIRONMENT_INVITED,
ENVIRONMENT_INVITATION_REQUEST,
ENVIRONMENT_ALL,
)

from dataall.core.permissions.services.resources_permissions import (
RESOURCES_ALL,
RESOURCES_ALL_WITH_DESC,
)
from dataall.core.permissions.services.tenant_permissions import TENANT_ALL, TENANT_ALL_WITH_DESC

MANAGE_WORKSHEETS = 'MANAGE_WORKSHEETS'
Expand All @@ -22,12 +21,10 @@
UPDATE_WORKSHEET = 'UPDATE_WORKSHEET'
DELETE_WORKSHEET = 'DELETE_WORKSHEET'
RUN_WORKSHEET_QUERY = 'RUN_WORKSHEET_QUERY'
WORKSHEET_ALL = [
GET_WORKSHEET,
UPDATE_WORKSHEET,
DELETE_WORKSHEET,
RUN_WORKSHEET_QUERY,
]
# Presumably guards downloading Athena query results from a worksheet —
# confirm where this permission is enforced.
DOWNLOAD_ATHENA_QUERY_RESULTS = 'DOWNLOAD_ATHENA_QUERY_RESULTS'


# Worksheet-level permissions, including the download permission.
WORKSHEET_ALL = [GET_WORKSHEET, UPDATE_WORKSHEET, DELETE_WORKSHEET, RUN_WORKSHEET_QUERY, DOWNLOAD_ATHENA_QUERY_RESULTS]

# Appends the worksheet permissions to the imported RESOURCES_ALL list in place.
RESOURCES_ALL.extend(WORKSHEET_ALL)

Expand Down
Loading