
Add ability for all operators to interact with storages of AWS/GCP/AZURE #8804

Closed

JeffryMAC opened this issue May 10, 2020 · 6 comments

Description

Currently every transfer operator is written as XToY, so if someone wrote MySQLToGCS it doesn't help someone who needs MySQLToS3.

Use case / motivation

It would be great if, when a PR is raised, people only needed to handle the X part and provide a list of the Y targets. For example, people would write XToDataframe or XToFile, and Airflow would have built-in integration that handles FileToS3, FileToGCS, etc.

So when a user submits a PR for MySQLToFile, Airflow would use this to automatically create MySQLToGCS and MySQLToS3.

The idea is to build the infrastructure layer once so that it is automated for all operators.
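
To make the idea concrete, here is a rough DAG-level sketch of the building blocks. All class names (MySQLToFileOperator, FileToS3Operator, FileToGCSOperator) are hypothetical and exist only to illustrate the proposal:

# Hypothetical building blocks: the provider author writes only the "X" half
# (MySQLToFileOperator); the "Y" halves (FileToS3Operator, FileToGCSOperator, ...)
# would ship with Airflow and be reused for every source.
dump = MySQLToFileOperator(
    task_id='mysql_to_file',
    sql='SELECT * FROM orders',
    output_path='/tmp/orders_{{ ds }}.csv',
)

upload = FileToS3Operator(
    task_id='file_to_s3',
    input_path='/tmp/orders_{{ ds }}.csv',
    dest_bucket='my-bucket',
    dest_key='orders/{{ ds }}.csv',
)

dump >> upload  # the same MySQLToFile output could just as easily feed FileToGCSOperator

The proposal goes one step further: rather than chaining two tasks, Airflow would generate the combined MySQLToS3 / MySQLToGCS operators automatically from the MySQLToFile piece.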

JeffryMAC added the kind:feature (Feature Requests) label on May 10, 2020
dinigo (Contributor) commented May 11, 2020

You are suggesting to implement something like what we already have with GenericTransfer. I've thought of this too. We would need a common API for getting and sending files, preferably a file-like object so transfers are done with a stream and don't take up local storage.

I would rather have a GenericFileTransfer.

There would be an abstract FsApiHook implementing functions similar to DbApiHook's get_records and insert_rows, but for files: for example get_file_stream and write_file_stream.

So, for example, GCSHook and S3Hook would extend FsApiHook:

class GCSHook(GcpBaseHook, FsApiHook):
    # ...
    def get_file_stream(self, bucket: str, file_path: str):
        # return a readable stream for the remote object
        return gcsfs.GCSFileSystem().open(f"{bucket}/{file_path}", "rb")
    # ...


class S3Hook(AwsBaseHook, FsApiHook):
    # ...
    def write_file_stream(self, bucket: str, file_path: str, file_obj):
        """Already implemented in https://github.com/apache/airflow/blob/master/airflow/providers/amazon/aws/hooks/s3.py#L582"""
        # boto3 streams the file-like object straight into the bucket
        self.get_conn().upload_fileobj(file_obj, bucket, file_path)
    # ...
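
For reference, a minimal sketch of what the abstract FsApiHook itself could look like; the class and method names follow the proposal above and do not exist in Airflow (the import path assumes Airflow 2's airflow.hooks.base):

from typing import IO

from airflow.hooks.base import BaseHook


class FsApiHook(BaseHook):
    """Proposed abstract file-system counterpart of DbApiHook (not part of Airflow)."""

    def get_file_stream(self, bucket: str, file_path: str) -> IO[bytes]:
        """Return a readable file-like object for the remote file."""
        raise NotImplementedError()

    def write_file_stream(self, bucket: str, file_path: str, file_obj: IO[bytes]) -> None:
        """Upload the given file-like object to the remote location."""
        raise NotImplementedError()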

Then a GenericFileTransfer would do something similar to what GenericTransfer does in its execute method:

class GenericFileTransfer(BaseOperator):
    # ...
    def execute(self, context):
        source_hook = BaseHook.get_hook(self.source_conn_id)
        exchange_stream = source_hook.get_file_stream(self.source_bucket, self.source_file)
        dest_hook = BaseHook.get_hook(self.dest_conn_id)
        # pass the source stream through to the destination hook
        dest_hook.write_file_stream(self.dest_bucket, self.dest_file, exchange_stream)

Now I can configure the hooks for the source and the destination, for example:

gcs_to_s3 = GenericFileTransfer(
    task_id='gcs_to_s3',
    source_conn_id='gcs-conn-id',
    source_hook=GCSHook,
    source_bucket='my-gcs-bucket',
    source_file='my-file{{ ds }}.csv',
    dest_conn_id='s3-conn-id',
    dest_hook=S3Hook,
    dest_bucket='my-s3-bucket',
    dest_file='my-file{{ ds }}.csv',
)

And now you can remove all the specific copy operators like S3ToSFTP, AzureBlobToGCS... Or, if they don't exist yet, you don't need to implement them anymore!

What do you think @turbaszek? I don't know how core development is organized (I know there's a Jira and a mailing list, but I know nothing about the organizational processes). This would be a core change, but if Airflow aims to thrive, it's necessary (IMHO).

turbaszek (Member) commented

This issue may provide an alternative: #8059

ashb (Member) commented May 11, 2020

See https://issues.apache.org/jira/browse/AIRFLOW-2651 for the previous discussion of this.

There was a PR (#3526), but the interface/code wasn't quite right.

dinigo (Contributor) commented May 11, 2020

@turbaszek I see how this is related, and I face this question frequently. But I think #8059 depends on having a common FS interface like the one #3526 suggests.

Right now we have (I might be missing some) the following file providers: S3, GCS, Azure, SFTP/SSH, FTP, Samba. If we want to allow transfers between all of them, that means N^2 operators: 36 operators for these 6 providers, and more as other cloud providers (Alibaba, DO, ...) are added. I'll take a look at the PR, @ashb, thanks.

jrderuiter (Contributor) commented Dec 12, 2020

Would be interesting to implement this using fsspec (https://filesystem-spec.readthedocs.io/en/latest/). Such an implementation would provide an interface to all filesystems supported by fsspec, which includes Azure Blob/ADLS Gen2, S3 and GCS. You also get others like FTP and SFTP for free (https://filesystem-spec.readthedocs.io/en/latest/api.html#implementations).
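
For illustration, a minimal sketch of what an fsspec-based generic transfer could look like. The operator class and its parameters are made up for this example; only the fsspec and stdlib calls are real, and credential handling (e.g. mapping Airflow connections to fsspec storage_options) is left out:

import shutil

import fsspec

from airflow.models import BaseOperator


class FsspecTransferOperator(BaseOperator):
    """Hypothetical operator that copies a file between any two fsspec-supported filesystems."""

    def __init__(self, *, source_url: str, dest_url: str, chunk_size: int = 16 * 1024 * 1024, **kwargs):
        super().__init__(**kwargs)
        self.source_url = source_url  # e.g. "gs://my-gcs-bucket/my-file.csv"
        self.dest_url = dest_url      # e.g. "s3://my-s3-bucket/my-file.csv"
        self.chunk_size = chunk_size

    def execute(self, context):
        # fsspec picks the filesystem implementation from the URL scheme
        # (s3fs, gcsfs, adlfs, sftp, ...), so one operator covers every combination.
        with fsspec.open(self.source_url, "rb") as src, fsspec.open(self.dest_url, "wb") as dst:
            # stream in chunks so the whole file is never held in memory
            shutil.copyfileobj(src, dst, self.chunk_size)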

RNHTTR (Contributor) commented Sep 3, 2023

Given there are already so many transfer operators (e.g. AWS transfers), I think implementing something like this would be more trouble than it's worth.

RNHTTR closed this as not planned on Sep 3, 2023