2012-03-06 55 views
10

Tôi đang sử dụng cần tây với django và rabbitmq để tạo hàng đợi tin nhắn. Tôi cũng có một nhân viên, có nguồn gốc từ một cỗ máy khác. Trong một cái nhìn django Tôi bắt đầu một quá trình như thế này:cần tây - chức năng gọi khi thực hiện công việc

def processtask(request, name): 
    args = ["ls", "-l"] 
    MyTask.delay(args) 
    return HttpResponse("Task set to execute.") 

Nhiệm vụ của tôi được cấu hình như thế này:

class MyTask(Task): 
    def run(self, args): 
    p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 
    (out, err) = p.communicate() 
    return out 

Câu hỏi của tôi là làm thế nào có thể một nhà môi giới (dự án django của tôi) bây giờ nhận được đầu ra từ lệnh "ls -l" mà nhân viên đã thực thi trên máy tính của anh ta. Tôi đoán điều tốt nhất sẽ là cho nhân viên gọi một hàm trong môi giới bất cứ khi nào nó sẵn sàng gửi đầu ra từ lệnh được thực hiện.

Tôi muốn nhận kết quả từ công nhân không đồng bộ, sau đó cập nhật trang web với đầu ra, nhưng đó là thời gian khác. Bây giờ tôi chỉ muốn nhận đầu ra từ công nhân.

Cập nhật

Ngay bây giờ tôi đã thêm một yêu cầu HTTP GET được kích hoạt vào cuối nhiệm vụ thông báo cho ứng dụng web mà nhiệm vụ được thực hiện - Tôi cũng đang gửi task_id trong http GET . Phương pháp http GET gọi xem django, mà tạo ra asyncResult và được kết quả, nhưng vấn đề là khi gọi result.get() tôi nhận được lỗi sau:

/usr/lib64/python2.6/site-packages/django_celery-2.5.1-py2.6.egg/djcelery/managers.py:178: TxIsolationWarning: Polling results with transaction isolation level repeatable-read within the same transaction may give outdated results. Be sure to commit the transaction for each poll iteration. 
    "Polling results with transaction isolation level" 

Bất cứ ý tưởng tại sao? Tôi không sử dụng cơ sở dữ liệu, bởi vì tôi đang sử dụng rabbitmq với AMQP.

Cập nhật.

Tôi rất muốn sử dụng tùy chọn thứ ba, có vẻ như là lựa chọn tốt nhất - cho các giá trị trả lại nhỏ và lớn. Toàn bộ tác vụ của tôi trông giống như sau:

class MyTask(Task): 
    def __call__(self, *args, **kwargs): 
    return self.run(*args, **kwargs) 

    def after_return(self, status, retval, task_id, args, kwargs, einfo): 
    if self.webhost is not None: 
     conn = httplib.HTTPConnection(self.webhost, self.webport) 
     conn.request("HEAD", "/vuln/task/output/"+task_id) 

    def run(self, args, webhost=None, webport=None): 
    self.webhost = webhost 
    self.webport = webport 
    r = "This is a basic result string used for code clarity" 
    return r 

Vì vậy, tôi đã ghi đè hàm after_return, cũng sẽ giải phóng khóa cho nhiệm vụ của tôi, vì hàm run() của nhiệm vụ đã trả về một giá trị. Trong yêu cầu HEAD tôi về cơ bản gọi một hàm django, mà gọi AsyncResult trên task_id, mà nên cung cấp với kết quả của nhiệm vụ. Tôi đã sử dụng kết quả tùy ý cho mục đích thử nghiệm trong trường hợp của tôi, vì nó chỉ để thử nghiệm.

Tôi muốn biết lý do mã ở trên không hoạt động. Tôi có thể sử dụng on_success, nhưng tôi không nghĩ rằng nó sẽ tạo ra sự khác biệt - hay đúng không?

+0

Bạn có thể lưu đầu ra của lệnh trong cơ sở dữ liệu không? – jpic

+0

Xin chào, không, bởi vì công nhân không có quyền truy cập vào cơ sở dữ liệu của nhà môi giới và cũng không muốn tôi có quyền truy cập. Tôi chắc chắn cần phải gửi lại một kết quả và sau đó xử lý nó trong môi giới. – eleanor

+1

Có thể bạn có thể tạo một API HTTP để gửi lại kết quả? Có một số cách khá dễ dàng để làm điều đó trong Django. – jpic

Trả lời

14

Nếu bạn nhìn here bạn sẽ tìm thấy những điều sau đây:

Django-cần tây sử dụng MySQL để theo dõi tất cả các nhiệm vụ/kết quả, thỏ mq được sử dụng như một chiếc xe buýt giao tiếp cơ bản. Điều này thực sự đang xảy ra là bạn đang cố gắng tìm nạp ASyncResult của nhân viên trong khi nhiệm vụ vẫn đang chạy (tác vụ đã gọi một yêu cầu HTTP đến máy chủ của bạn và vì nó không trả lại, phiên khóa db từ công nhân vẫn hoạt động và hàng kết quả vẫn bị khóa). Khi Django cố gắng đọc kết quả nhiệm vụ (trạng thái của nó và giá trị trả về thực tế của hàm chạy), nó tìm thấy hàng bị khóa và đưa ra cảnh báo cho bạn.

Có một vài cách để đi về việc giải quyết này:

  1. Thiết lập một nhiệm vụ cần tây để gặt hái kết quả và chuỗi nó với nhiệm vụ xử lý của bạn. Bằng cách đó nhiệm vụ ban đầu sẽ kết thúc, phát hành khóa trên db và cái mới sẽ có được nó, đọc kết quả trong django và làm bất cứ điều gì bạn cần nó để làm. Tra cứu tài liệu cần tây về điều này.

  2. Đừng bận tâm chút nào và chỉ cần gửi POST tới Django với kết quả xử lý đầy đủ được đính kèm dưới dạng tải trọng, thay vì cố gắng tìm nạp nó qua db.

  3. Ghi đè lên_success trong lớp nhiệm vụ của bạn và BẬT yêu cầu thông báo của bạn tới Django sau đó tại thời điểm khóa cần được phát hành trên bảng db.

Lưu ý rằng bạn cần lưu trữ toàn bộ kết quả xử lý (dù nó lớn đến cỡ nào) khi trả lại phương thức chạy (có thể được ngâm). Bạn đã không đề cập đến kết quả lớn như thế nào có thể được vì vậy nó có thể có ý nghĩa để thực sự chỉ làm kịch bản # 2 ở trên (đó là những gì tôi sẽ làm). Ngoài ra tôi sẽ đi với # 3. Cũng đừng quên xử lý phương pháp on_failure cũng như trong nhiệm vụ của bạn.

+0

Cảm ơn bạn đã bình luận. Tôi đã cập nhật câu trả lời của tôi để hỏi thêm câu hỏi, mà tôi cần phải được trả lời trước khi chấp nhận câu trả lời của bạn, đó là thực sự tốt btw. – eleanor

Các vấn đề liên quan