Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve SFTP hook's directory transfer to use a single connection for multiple files #46582

Conversation

Dawnpool
Copy link
Contributor

@Dawnpool Dawnpool commented Feb 8, 2025

This PR improves SFTP hook's store_directory and retrieve_directory functions to use a single connection when transferring a directory with multiple files.

Previously, these functions relied on store_file and retrieve_file functions. And with this PR, the store_file and retrieve_file functions were modified to open and close sftp connection each time.
This leads to the store_directory and retrieve_directory functions to open and close too many connections repeatedly when there are many files in a directory, which causes significant overhead.

To address this, I modified them to open a connection, transfer all files in a directory, and then close the connection afterward.

I also did a performance test in my local environment by transferring a directory containing 1,000 small files. This reduced the transfer time by approximately 8-9 seconds. The results are shown below.

AS-IS TO-BE
store 47.18 sec 39.35 sec
retrieve 63.50 sec 54.73 sec

@Dawnpool Dawnpool changed the title Improve SFTP hook's directory transfer to use a single connection in multiple files Improve SFTP hook's directory transfer to use a single connection for multiple files Feb 8, 2025
@Dawnpool
Copy link
Contributor Author

I also added the with_conn wrapper to avoid repeating with self.conn() as conn.

Hi @dabla , I think this might be related to your recent PRs. Please feel free to take a look.

@dabla
Copy link
Contributor

dabla commented Feb 18, 2025

@Dawnpool really good catch, as indeed this will speed up the store_directory and retrieve_directory functions as only one connection will be created, but personally, and that's a personal opinion I don't like the wrapper that much even though it solves the performance issue. Wouldn't it be possible to refactor the get_conn method so that it caches the connection so when invoked a second time it just returns the cached instance and once it's finished removed the cached connection, then the code using the context manager stays as it was.

This is how it could be implemented, I've tested it locally and it works:

@contextmanager
def get_conn(self) -> Generator[SFTPClient, None, None]:
    """Context manager that closes the connection after use."""
    if self._sftp_conn is None:
        self._ssh_conn = super(SFTPHook, self).get_conn()  # Get the base SSH connection
        self._sftp_conn = self._ssh_conn.open_sftp()

    self._conn_count += 1
    try:
        yield self._sftp_conn
    finally:
        self._conn_count -= 1
        if self._conn_count == 0:  # Only close when last reference is done
            self._sftp_conn.close()
            self._sftp_conn = None
            self._ssh_conn.close()
            self._ssh_conn = None

The hook would have 3 new fields, namely self._ssh_conn, self._sftp_conn and self._conn_count.

The for example in retrieve_directory:

    def retrieve_directory(self, remote_full_path: str, local_full_path: str, prefetch: bool = True) -> None:
        """
        Transfer the remote directory to a local location.

        If local_full_path is a string path, the directory will be put
        at that location.

        :param remote_full_path: full path to the remote directory
        :param local_full_path: full path to the local directory
        :param prefetch: controls whether prefetch is performed (default: True)
        """
        if Path(local_full_path).exists():
            raise AirflowException(f"{local_full_path} already exists")
        Path(local_full_path).mkdir(parents=True)
        with self.get_conn():
            files, dirs, _ = self.get_tree_map(remote_full_path)
            for dir_path in dirs:
                new_local_path = os.path.join(local_full_path, os.path.relpath(dir_path, remote_full_path))
                Path(new_local_path).mkdir(parents=True, exist_ok=True)
            for file_path in files:
                new_local_path = os.path.join(local_full_path, os.path.relpath(file_path, remote_full_path))
                self.retrieve_file(file_path, new_local_path, prefetch)

Of course if we could replace the "dummy" usage of the self.get_conn() context manager as a decorator, then we would have best of both worlds and only annotate methods where we want that behaviour without changing the methods signature.

To be clear if it's not possible, then I would stick with your proposed solution.

WDYT?

@Dawnpool
Copy link
Contributor Author

Dawnpool commented Feb 20, 2025

Hi @dabla ,
Thanks for your comment! I actually love your suggestion. Your idea looks great and very reasonable to me.
But, as you said, I still want to remove the repeated usage of self.get_connsince it would indent code in every function. I'm not sure if it's possible, but I'll give it a try and let you know how it goes.
Thanks again for your valuable suggestion.

@Dawnpool
Copy link
Contributor Author

Hi, @dabla
I've applied your suggestions with some changes due to pre-commit type checking.
I couldn't really find a simple way to remove the repeated usage of self.get_conn, but I think the current state is still good enough.
Please take a look and thanks again for your feedback.

Copy link
Contributor

@dabla dabla left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work, looks good to me!

@potiuk
Copy link
Member

potiuk commented Feb 24, 2025

Nice, but can you please add some unit tests testing the behaviour (identity of the client).

@dabla
Copy link
Contributor

dabla commented Feb 28, 2025

Nice, but can you please add some unit tests testing the behaviour (identity of the client).

Good point @potiuk. You could indeed add some assertions within the existing tests to make sure connection gets opened/closed only once during the whole operation.

@Dawnpool
Copy link
Contributor Author

Dawnpool commented Mar 1, 2025

@potiuk @dabla
Thanks for your opinions. I added test code for it. Please check.

@potiuk potiuk merged commit 998fcd6 into apache:main Mar 1, 2025
60 checks passed
file_path = os.path.join(root, file_name)
new_remote_path = os.path.join(remote_full_path, os.path.relpath(file_path, local_full_path))
self.store_file(new_remote_path, file_path, confirm)
with self.get_conn():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@potiuk @Dawnpool I think the merged code is broken, it should use get_managed_conn instead of get_conn

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh no, I didn't realize it. I'll create a fix PR right away.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No worries I forgot to warn you as we had to fix backward compatibility yesterday, my bad

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants