如何防止我的应用在并行化paramiko.SFTPClient.get请求时挂起?

2022-03-12 00:00:00 python boto3 sftp paramiko file-transfer

问题描述

我正在尝试通过SFTP从服务器并行检索文件并上传到AWS。我使用的是python多线程,上传部分工作得很好,但是我注意到paramiko.SFTPClient的get操作使程序在最后挂起。事实上,所有文件都被撤回并上传,但程序并不退出。我尝试了很多类似帖子的东西,但是都不管用,我的伪代码如下,欢迎任何帮助:

def create_sftp_connection(host, port, username, password):
    transport = paramiko.Transport((host, port))
    transport.connect(username, password)
    sftp_client = paramiko.SFTPClient.from_transport(transport)

def get_and_upload_file(s3, sftp_client, file, local_full_path, destination_bucket, cloud_path):
     sftp_client.get(file, local_full_path)
     upload_file_to_s3(s3, local_full_path, destination_bucket, cloud_path)

def transfer_files(sftp_client, remote_path, local_path, destination_bucket):
     all_files = get_files_to_transfer(sftp_client, remote_path)
     s3 = init_s3()
     threads = list()
    
     for file in all_files:
         ....
         thread = threading.Thread(target=get_and_upload_file, args=(s3, sftp_client, file, local_full_path, destination_bucket, cloud_path))
         thread.daemon = True
         threads.append(thread)
         thread.start()
        
      for thread in threads:
           thread.join()


if __name__ == "__main__":
     sftp_client = create_sftp_connection(host, port, username, password)
     transfer_files(sftp_client, remote_path, local_path, destination_bucket)

注意:我还尝试使用以下命令等待线程停止:

for thread in threads:
    while thread.is_alive():
          thread.join(timeout=0.1)

解决方案

我非常确定Paramiko不是线程安全的。

您很可能需要为每个线程创建单独的连接(Transport)。


创建少量连接,并让它们从共享列表/队列中挑选文件。无论如何,并行上载多个或几个文件是没有意义的。

相关文章