2013-03-05 30 views
6

tôi cần tải xuống tệp qua ftp, thay đổi tệp và tải tệp lên. Tôi đang sử dụng cần tây để làm điều này nhưng tôi đang chạy vào vấn đề khi cố gắng sử dụng chaining, nơi tôi đang nhận được:Nhiệm vụ chuỗi cần thiết liên tiếp

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

Ngoài ra, tôi có thể sử dụng dây chuyền và thể yên tâm rằng các bước sẽ tuần tự? nếu không thay thế là gì?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async() 
print res.get() 

Nhiệm vụ:

@task() 
def download_ftp_image(ftp_server, username , password , filename, directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     if not os.path.exists(directory): 
      os.makedirs(directory) 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     else: 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     ftp.quit() 
    except error_perm, resp: 
     raise download_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

@task() 
def upload_ftp_image(ftp_server, username , password , file , directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     new_file= file.replace(directory, "") 
     directory = directory.replace("tmp","") 
     try: 
      ftp.storbinary("STOR " + directory + new_file , open(file, "rb")) 
     except: 
      ftp.mkd(directory) 
      ftp.storbinary("STOR " + directory + new_file, open(file, "rb")) 
     ftp.quit() 
    except error_perm, resp: 
     raise upload_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

và là này một tốt hay một thực tế xấu đối với trường hợp cụ thể của tôi? :

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
result.get() 
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
#result.get() 

Trả lời

13

Một chuỗi luôn luôn thông qua trước kết quả như một cuộc tranh luận đầu tiên. Từ chains documentation:

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

nhiệm vụ upload_ftp_image bạn không chấp nhận lập luận thêm này, và do đó nó không thành công.

Bạn có trường hợp sử dụng tốt cho chuỗi; nhiệm vụ thứ hai là đảm bảo được gọi là sau khi nhiệm vụ đầu tiên được hoàn thành (nếu không kết quả không thể được chuyển đi).

Chỉ cần thêm một cuộc tranh cãi về kết quả từ bài tập trước:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory): 

Bạn có thể làm cho một số sử dụng các giá trị kết quả; có thể làm cho nó phương thức tải xuống trả về đường dẫn của tệp đã tải xuống để phương thức tải lên biết nội dung tải lên?

+0

Tôi nên làm như thế nào sau đó? – psychok7

+0

@ psychok7: Mở rộng một chút. –

+0

Nó có vẻ như tôi đã nhận nó làm việc :) .. cảm ơn một bó cho bạn giúp – psychok7

17

Một tùy chọn khác nếu bạn không muốn giá trị trả lại của tác vụ trước được sử dụng làm đối số, là sử dụng 'bất biến'.

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

Thay vì xác định nhiệm vụ nhỏ của bạn như:

download_ftp_image.s(...) and upload_ftp_image.s(...) 

xác định chúng như:

download_ftp_image.si(...) and upload_ftp_image.si(...) 

Và bây giờ bạn có thể sử dụng các nhiệm vụ với số lượng thông thường của đối số trong một chuỗi .

Các vấn đề liên quan