2011-02-07 36 views
17

Trong một thời gian rất dài, bây giờ tôi đã sử dụng hàng đợi nhiệm vụ trên AppEngine để lên lịch các tác vụ, đúng như cách tôi dự kiến.Hàng đợi nhiệm vụ thử nghiệm đơn vị trong AppEngine

Nhưng những gì tôi đã luôn luôn tự hỏi là làm thế nào để viết một bài kiểm tra cho điều đó? Cho đến bây giờ tôi đã thực hiện các kiểm tra đơn giản để chắc chắn rằng một lỗi không xảy ra trên API xếp hàng một nhiệm vụ và sau đó viết các bài kiểm tra thích hợp hơn cho API thực hiện nhiệm vụ.

Tuy nhiên gần đây tôi đã bắt đầu cảm thấy một chút không hài lòng bởi điều này và tôi đang tìm kiếm một cách để thực sự kiểm tra rằng nhiệm vụ chính xác đã được thêm vào hàng đợi chính xác. Hy vọng rằng điều này có thể được thực hiện tốt hơn đơn giản bằng cách triển khai mã và hy vọng điều tốt nhất.

Tôi đang sử dụng django-nonrel, nếu có bất kỳ câu trả lời nào.

Tóm tắt lại: Làm thế nào để kiểm tra đơn vị được viết để xác nhận các tác vụ đã được xếp hàng đợi?

Trả lời

13

Sử dụng GAE Test Bed sẽ cho phép bạn phân phát hàng đợi công việc.

Nếu bạn kế thừa từ FunctionalTestCase hoặc TaskQueueTestCase, bạn sẽ nhận được các phương pháp như get_tasksassertTasksInQueue.

Bạn cũng có thể thực hiện các tác vụ. Làm thế nào để làm nó khác nhau tùy thuộc vào việc bạn sử dụng nhiệm vụ hoặc trì hoãn.

Đối deferreds, tôi có một số mã như thế này:

from google.appengine.ext import deferred 
import base64 

# gets the most recent task -- since the task queue is reset between tests, 
# this is usually what you want 
def get_task(self): 
    for task in self.get_task_queue_stub().GetTasks('default'): 
     return task 

# decode and execute the deferred method 
def run_deferred(task): 
    deferred.run(base64.b64decode(task['body'])) 

Chạy nhiệm vụ tương tự, nhưng sau khi bạn lấy nhiệm vụ, bạn sử dụng WebTest (mà GAE Kiểm tra giường ngủ được xây dựng trên đỉnh của) trình như POST yêu cầu đến URL của nhiệm vụ với các tham số của nó.

+0

Điều này có vẻ như nó sẽ thực hiện thủ thuật. Cảm ơn! –

+0

Bạn có thể sử dụng 'self.get_tasks' và' task ['decoded_body'] 'làm lối tắt nếu bạn đang sử dụng tất cả các trường hợp kiểm tra cơ sở (https://github.com/jgeewax/gaetestbed/blob/master/gaetestbed/ taskqueue.py). Ngoài ra, điều này hy vọng sớm là một phần của google.appengine.ext.testbed. Có phương thức 'get_filtered_tasks' hoạt động rất giống' get_tasks' (https://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/api/taskqueue/taskqueue_stub.py # 2453) –

+0

Xem câu trả lời của tôi: thư viện này hiện không được dùng nữa vì có lợi thế ext.testbed (https://developers.google.com/appengine/docs/python/tools/localunittesting) –

1

Có một khung kiểm tra nhỏ được gọi là gaetestbed có thể phù hợp với nhu cầu của bạn. Để biết chi tiết, vui lòng tham khảo: https://github.com/jgeewax/gaetestbed.

Môi trường thử nghiệm này hoạt động liên quan đến mũi, mũi-gae plugin và gói WebTest. Với sự pha trộn của các gói python là cách tốt nhất để kiểm tra các ứng dụng GAE theo như tôi quan tâm.

+4

Gói này là thực sự phản đối như tôi đã sáp nhập các chức năng vào google.appengine.ext.testbed (https://developers.google.com/appengine/docs/python/tools/localunittesting) –

0

SDK 1.4.3Testbed API cung cấp cấu hình thư viện sơ khai dễ dàng cho các bài kiểm tra tích hợp cục bộ.

Một lệnh dịch vụ cho Task Queue khả dụng.

18

Nếu bạn đang sử dụng google.appengine.ext.testbed thay vì GAE Testbed (GAE Testbed hiện đang bị phản đối và được di chuyển sang ext.testbed), bạn có thể làm như sau:

import base64 
import unittest2 

from google.appengine.ext import deferred 
from google.appengine.ext import testbed 


class TestTasks(unittest2.TestCase): 
    def setUp(self): 
    self.testbed = testbed.Testbed() 
    self.testbed.activate() 
    self.testbed.init_taskqueue_stub() 
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME) 

    def tearDown(self): 
    self.testbed.deactivate() 

    def test_send_contact_request(self): 
    # Make the request to your app that "defers" something: 
    response = ... 
    self.assertEqual(response.status_int, 200) 

    # Get the task out of the queue 
    tasks = self.taskqueue_stub.get_filtered_tasks() 
    self.assertEqual(1, len(tasks)) 

    # Run the task 
    task = tasks[0] 
    deferred.run(task.payload) 

    # Assert that other things happened (ie, if the deferred was sending mail...) 
    self.assertEqual(...) 
+1

Đây phải là câu trả lời được chấp nhận. Cảm ơn – Houman

+0

Bạn có thể thực hiện việc này với hàng đợi của riêng mình không? Tôi sử dụng hoãn lại như: deferred.defer (get_and_store_all_new_messages_async, user_id, truy vấn, next_page_token, _queue = "emailFetch") nhưng không nhận ra hàng đợi, có thể do nó không đọc tệp queue.yaml cho bài kiểm tra đơn vị –

Các vấn đề liên quan