SftpSensor w/ possibility to use RegEx or fnmatch #15332

Closed
saveriogzz opened this issue Apr 12, 2021 · 3 comments · Fixed by #24084

Comments

@saveriogzz
Contributor

Description

SmartSftpSensor with possibility to search for patterns (RegEx or UNIX fnmatch) in filenames or folders

Use case / motivation

I would like to be able to use wildcards and/or regular expressions to look for certain files when using an SftpSensor.
So far I have tried something like this:

from airflow.providers.sftp.sensors.sftp import SFTPSensor
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
from typing import Any

import os
import fnmatch

class SmartSftpSensor(SFTPSensor):
    poke_context_fields = ('path', 'filepattern', 'sftp_conn_id', ) # <- Required fields
    template_fields = ['filepattern', 'path']

    @apply_defaults
    def __init__(
            self, 
            filepattern="",
            **kwargs: Any):

        super().__init__(**kwargs)
        self.filepath = self.path
        self.filepattern = filepattern

    def poke(self, context):
        full_path = self.filepath

        directory = os.listdir(full_path)

        for file in directory:
            if not fnmatch.fnmatch(file, self.filepattern):
                pass
            else:
                context['task_instance'].xcom_push(key='file_name', value=file)
                return True
        return False

    def is_smart_sensor_compatible(self): # <- Required
        result = (
            not self.soft_fail
            and super().is_smart_sensor_compatible()
        )
        return result

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    operators = [SmartSftpSensor]

And I call it like this:

sense_file = SmartSftpSensor(
    task_id='sense_file',
    sftp_conn_id='my_sftp_connection',
    path=templ_remote_filepath,
    filepattern=filename,
    timeout=3
)

where path is the folder containing the files and filepattern is a rendered filename with wildcards: filename = """{{ execution_date.strftime("%y%m%d_%H00??_P??_???") }}.LV1""", which is rendered to e.g. 210412_1600??_P??_???.LV1

but I am still not getting the expected result, as it's not capturing anything.
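
As a quick sanity check, the rendered pattern can be tried against `fnmatch` on its own. The sketch below uses made-up filenames (they are assumptions for illustration, not files from the real server). If the pattern matches here, the wildcard is probably not the problem. A more likely cause is the listing step: `os.listdir` reads the local filesystem of the machine running the task, not the remote SFTP directory.

```python
import fnmatch

# Pattern exactly as rendered in the example above.
pattern = "210412_1600??_P??_???.LV1"

# Hypothetical filenames, invented for this illustration only.
candidates = [
    "210412_160012_P01_abc.LV1",  # fits every wildcard position
    "210412_170012_P01_abc.LV1",  # hour differs from the pattern
    "210412_160012_P01_abc.LV2",  # extension differs from the pattern
]

# fnmatch.filter returns the names that match the shell-style pattern.
matches = fnmatch.filter(candidates, pattern)
print(matches)
```

Only the first candidate is returned, so `?` behaves as a single-character wildcard as intended.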

Are you willing to submit a PR?
Yes!

Related Issues

I didn't find any

@saveriogzz saveriogzz added the kind:feature Feature Requests label Apr 12, 2021
@saveriogzz saveriogzz changed the title SmartSftpSensor leveraging RegEx or fnmatch to look for patterns SftpSensor w/ possibility to use RegEx or fnmatch Apr 13, 2021
@blcksrx
Contributor

blcksrx commented Apr 13, 2021

You can use the wildcard in SFTPSensor

@saveriogzz
Contributor Author

Hey @blcksrx, would you mind giving some more details on how to use them? If I simply use the wildcard written above with Airflow's built-in SFTPSensor, it doesn't capture anything.
Thanks in advance!

@blcksrx
Contributor

blcksrx commented Apr 16, 2021

It sounds like this is for a *nix OS that provides a shell, where it's convenient to use wildcards like this:

hook.get_conn().execute("ls PATH/*.csv")

But it is too raw and not usable in every case. I'm going to prepare a PR for this that uses regex.
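
For readers wondering what a regex-based approach could look like, here is a minimal sketch. It is not the implementation from the linked PR. The function name `find_matching_files` and the injected `list_dir` callable are hypothetical. In Airflow, the callable might be something like `SFTPHook.list_directory`, but that is an assumption to check against the provider version in use. A fake lister stands in for the server so the example runs without any connection.

```python
import re
from typing import Callable, Iterable, List


def find_matching_files(
    list_dir: Callable[[str], Iterable[str]],
    path: str,
    regex: str,
) -> List[str]:
    """Return the sorted names under `path` whose full name matches `regex`.

    `list_dir` is any callable that lists a directory; passing it in keeps
    the matching logic independent of how the remote listing is obtained.
    """
    compiled = re.compile(regex)
    return sorted(name for name in list_dir(path) if compiled.fullmatch(name))


def fake_list_dir(path: str) -> List[str]:
    # Hypothetical stand-in for a remote directory listing.
    return ["report.csv", "notes.txt", "data_2021.csv"]


csv_files = find_matching_files(fake_list_dir, "PATH", r".*\.csv")
print(csv_files)
```

Matching on the listed names rather than running a shell `ls` avoids depending on the server providing a shell.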
