Create salesforce to gcs transfer #10760
Conversation
Force-pushed 6028042 to 1972774.
""" | ||
|
||
def __init__( | ||
self, |
Suggested change:

-        self,
+        self, *,

Nitpick here: operator arguments are now keyword-only.
Thank you. Added this arg.
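The keyword-only convention suggested above can be sketched with a plain class (the class and argument names here are illustrative, not the actual PR code):

```python
# A bare "*" in the signature forces callers to pass every argument by
# keyword, which is the convention for Airflow operator constructors.
class SalesforceToGcsSketch:
    def __init__(self, *, query: str, bucket_name: str):
        self.query = query
        self.bucket_name = bucket_name


op = SalesforceToGcsSketch(query="SELECT Id FROM Lead", bucket_name="my-bucket")

positional_rejected = False
try:
    # Positional arguments are now rejected with a TypeError.
    SalesforceToGcsSketch("SELECT Id FROM Lead", "my-bucket")
except TypeError:
    positional_rejected = True
```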
    :param query: The query to make to Salesforce.
    :type query: str
Suggested change:

    :param query: The query to make to Salesforce.
    :type query: str
I'm thinking this would give an error if it's not aligned on the same line as the param?
Thanks, fixed formatting for all parameters in this doc string.
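The fixed layout looks roughly like the sketch below: each `:type` field sits directly under its `:param` field, as Sphinx expects (the function and second parameter are illustrative, not the PR's actual code):

```python
# Illustrative Sphinx-style docstring: ":type x:" immediately follows
# ":param x:" for every parameter.
def make_query(query: str, bucket_name: str):
    """
    Run a query against Salesforce and upload the result.

    :param query: The query to make to Salesforce.
    :type query: str
    :param bucket_name: The GCS bucket to upload to.
    :type bucket_name: str
    """
    return query, bucket_name
```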
from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator
from airflow.utils.dates import days_ago

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/root/keyfile.json"
Nice work! 👍
A system test would make this easy with `@pytest.mark.credential_file`, and I think a system test is required for Google provider packages. Take a look:

@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
Thanks for the example! I am looking into implementing system test for this operator.
General comments:

- The Salesforce hook already supports multiple formats (json, csv, ndjson), so why limit this operator to csv only? It also has a `write_object_to_file` method, which you seem to have reimplemented in another way here.
- I wonder how close we can get to the same behavior and parameters as `BaseSQLToGCSOperator`? (It has relevant params like `approx_max_file_size_bytes`.)
    :type object_name: str
    :param salesforce_conn_id: the name of the connection that has the parameters
        we need to connect to Salesforce. The connection should be type `http` and
        include a user's security token in the `Extras` field.
No need to specify this; it's part of the docs for the Salesforce connection (#10482).
This can change when Airflow supports other connection options for Salesforce.
@eladkal Thanks for the feedback. I think using the `write_object_to_file` method is a good call-out; it will make this simpler and more flexible, so I'm going to try to implement it that way. In terms of adding other parameters, do you think it would make sense for file-creation parameters like `approx_max_file_size_bytes` and `field_delimiter` to be handled within `write_object_to_file`? Alternatively, the execute method could chunk the results and call `write_object_to_file` multiple times.
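The chunking alternative mentioned above could look something like this pure-Python sketch: batch the query results so each batch's serialized size stays under an approximate byte limit, then write and upload each batch separately. The function name and the size heuristic are illustrative assumptions, not the PR's actual code:

```python
import json


def chunk_records(records, approx_max_file_size_bytes):
    """Split records into batches whose serialized size stays under the limit."""
    batches, current, current_size = [], [], 0
    for record in records:
        record_size = len(json.dumps(record))
        # Start a new batch once adding this record would exceed the limit.
        if current and current_size + record_size > approx_max_file_size_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(record)
        current_size += record_size
    if current:
        batches.append(current)
    return batches
```

Each batch could then be passed to `write_object_to_file` and uploaded as its own GCS object.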
Force-pushed 923661b to d90fed6.
Ignore my previous comment; I'll review the Salesforce part later this week.
@contextmanager
def provide_facebook_connection(key_file_path: str):
Would you mind moving this function to `tests.test_utils.facebook_system_helpers`?
Sure, moved to test_utils. Also, this should be named salesforce, so I renamed it as well.
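The helper being moved follows a common pattern: a context manager that points an environment variable at a key file for the duration of a test and restores it afterwards. This is a minimal sketch; the variable name `SALESFORCE_KEY_PATH` and the validation are illustrative assumptions, not the actual helper's code:

```python
import os
from contextlib import contextmanager


@contextmanager
def provide_salesforce_connection(key_file_path: str):
    """Temporarily point SALESFORCE_KEY_PATH at a JSON key file (illustrative)."""
    if not key_file_path.endswith(".json"):
        raise ValueError("Use a JSON key file.")
    previous = os.environ.get("SALESFORCE_KEY_PATH")
    os.environ["SALESFORCE_KEY_PATH"] = key_file_path
    try:
        yield
    finally:
        # Restore whatever was there before the test.
        if previous is None:
            os.environ.pop("SALESFORCE_KEY_PATH", None)
        else:
            os.environ["SALESFORCE_KEY_PATH"] = previous
```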
@pytest.mark.backend("mysql", "postgres")
@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
@pytest.mark.credential_file(SALESFORCE_KEY)
@pytest.mark.system("google.cloud")
Should we also mark it as salesforce?
Sure. Marked as salesforce system test.
Force-pushed 0f32335 to bf750e3.
@chipmyersjr can you please take a look at the CI issues? Pylint seems to be sad :<
Force-pushed bf750e3 to 03190e9.
@turbaszek it seems pylint was complaining about some unrelated files. I just rebased and the checks seem to be in good shape now.
Salesforce logic looks good.
Can you please add support for `template_fields` and `template_ext` (reading the query from a file)?
Force-pushed 352b4d5 to ee6d552.
@eladkal thanks for checking. I added templating support for the parameters where I felt it was appropriate.
Force-pushed ee6d552 to 17799ca.
    filename=path,
    gzip=self.gzip,
)
self.log.info("%s uploaded to GCS", path)
It would be nice to return the path to file so it can be easily referenced in downstream operators
Hi @turbaszek, the `path` variable here represents the temporary file that gets created just to be uploaded to GCS, and it wouldn't be available outside of this context. I'm wondering whether it would be more appropriate to pass this or, alternatively, the path to the actual GCS object? I'm also wondering which would be better to log.
Sorry for not being precise; by returning `path` I meant returning the GCS URI.
OK, I implemented a URI return value for the execute method.
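The change discussed above can be sketched as follows: after uploading the temporary file, `execute` returns the `gs://` URI of the uploaded object so downstream tasks can reference it (e.g. via XCom). The function names are illustrative, not the PR's actual code:

```python
def build_gcs_uri(bucket_name: str, object_name: str) -> str:
    """Compose the gs:// URI for an uploaded object."""
    return f"gs://{bucket_name}/{object_name}"


def execute_sketch(bucket_name: str, object_name: str) -> str:
    # ... query Salesforce, write results to a temp file, upload to GCS ...
    # Returning the URI makes it available to downstream operators.
    return build_gcs_uri(bucket_name, object_name)
```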
Force-pushed 10dabc5 to fd90bca.
@turbaszek is something missing to get it merged?
@chipmyersjr can you please rebase and solve the conflict?
Co-authored-by: Tomek Urbaszek <[email protected]>
Force-pushed fd90bca to 224f684.
@turbaszek Should be good to go now.
Adds SalesforceToGcsOperator, which allows users to transfer data from Salesforce to a GCS bucket. Co-authored-by: Tomek Urbaszek <[email protected]>
Add Salesforce to GCS transfer operator. Closes issue #8896