Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SFTPOperator with dynamic user, fails. #46354

Open
1 of 2 tasks
remchuk opened this issue Feb 2, 2025 · 1 comment
Open
1 of 2 tasks

SFTPOperator with dynamic user, fails. #46354

remchuk opened this issue Feb 2, 2025 · 1 comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow kind:bug This is a clearly a bug kind:documentation needs-triage label for new issues that we didn't triage yet

Comments

@remchuk
Copy link

remchuk commented Feb 2, 2025

What do you see as an issue?

Hey there, This is my first post here so forgive me if this is not the correct place.

I have a unique need of Airflow, I need to integrate SFTP&SSH Operator with a dynamic client, meaning I need that every trigger will also prompt for a user and a password, This means that I can't actually create a permanent Connection in the UI.

I find that using SSHHook is the correct approch.
I created a task that receives the params, and does:

def create_ssh_hook(**kwargs):
    ti = kwargs["ti"]
    ti.xcom_push(key="ssh_hook_host", value=kwargs["params"]["host"])
    ti.xcom_push(key="ssh_hook_username", value=kwargs["params"]["username"])
    ti.xcom_push(key="ssh_hook_password", value=kwargs["params"]["password"])


create_ssh_hook_task = PythonOperator(
     task_id="create_ssh_hook",
     provide_context=True,
     python_callable=create_ssh_hook,
    op_kwargs=params
)


add_file_to_vm = SFTPOperator(
      task_id='add_script_to_vm',
      sftp_hook=SFTPHook(ssh_hook=SSHHook(remote_host="{{ ti.xcom_pull(key='ssh_hook_host', task_ids='create_ssh_hook') }}",  # I know this is not supported after 4.9.0, but I can't find a different way to do it and this is the version we have.
                                          username="{{ ti.xcom_pull(key='ssh_hook_username', task_ids='create_ssh_hook') }}"
                                          password="{{ ti.xcom_pull(key='ssh_hook_password', task_ids='create_ssh_hook') }}")),
     local_filepath='/a',
     remote_filepath='/tmp/a',
     operation='put',
     create_intermediate_dirs=True,
)

create_ssh_hoot_task >> add_file_to_vm

Now I recieve the following error:
socket.gaierror: [Errno -2] Name or service not known

Now when trying to just ssh from the same place that runs this dag, works just fine, also setting up a proper connection with a static user and a ssh_conn_id also works.

Solving the problem

I think the correct way of working with SSHHook should still be implemented.
I tried every docs file but none specify about dynamic connection.

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@remchuk remchuk added kind:bug This is a clearly a bug kind:documentation needs-triage label for new issues that we didn't triage yet labels Feb 2, 2025
Copy link

boring-cyborg bot commented Feb 2, 2025

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@dosubot dosubot bot added the area:core-operators Operators, Sensors and hooks within Core Airflow label Feb 2, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:core-operators Operators, Sensors and hooks within Core Airflow kind:bug This is a clearly a bug kind:documentation needs-triage label for new issues that we didn't triage yet
Projects
None yet
Development

No branches or pull requests

1 participant