2012-04-16 43 views
9

Tôi đang làm việc trên một ứng dụng có quy trình làm việc được quản lý bằng cách truyền các thông điệp trong SQS, sử dụng boto.Cách nhận tất cả thư trong hàng đợi SQS của Amazon bằng thư viện boto bằng Python?

Hàng đợi SQS của tôi đang tăng dần và tôi không có cách nào để kiểm tra xem có bao nhiêu phần tử được cho là chứa.

Bây giờ tôi có một daemon định kỳ thăm dò hàng đợi và kiểm tra xem tôi có tập hợp các phần tử cố định hay không. Ví dụ, hãy xem xét sau "hàng đợi":

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"] 

Bây giờ tôi muốn kiểm tra xem tôi có "msg1_comp1", "msg2_comp1" và "msg3_comp1" trong hàng đợi cùng nhau tại một thời điểm nào trong thời gian, nhưng tôi don' t biết kích thước của hàng đợi.

Sau khi xem qua API, có vẻ như bạn có thể chỉ nhận được 1 phần tử, hoặc một số cố định của các yếu tố trong hàng đợi, nhưng không phải tất cả:

>>> rs = q.get_messages() 
>>> len(rs) 
1 
>>> rs = q.get_messages(10) 
>>> len(rs) 
10 

Một gợi ý đề xuất trong các câu trả lời sẽ được ví dụ 10 thông điệp trong vòng lặp cho đến khi tôi không nhận được gì, nhưng các thông báo trong SQS có thời gian chờ hiển thị, có nghĩa là nếu tôi thăm dò các phần tử từ hàng đợi, chúng sẽ không bị xóa, chúng sẽ chỉ ẩn trong một khoảng thời gian ngắn thời gian.

Có cách nào đơn giản để nhận tất cả thư trong hàng đợi mà không biết có bao nhiêu thư?

Trả lời

13

Đặt cuộc gọi của bạn để q.get_messages(n) bên trong vòng lặp while:

all_messages=[] 
rs=q.get_messages(10) 
while len(rs)>0: 
    all_messages.extend(rs) 
    rs=q.get_messages(10) 

Bên cạnh đó, dump won't support more than 10 messages một trong hai:

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'): 
    """Utility function to dump the messages in a queue to a file 
    NOTE: Page size must be < 10 else SQS errors""" 
+0

tôi có thể không thực sự làm điều đó, vì các tin nhắn trong SQS có một thời gian chờ tầm nhìn, vì vậy nếu tôi lần đầu tiên được 10 tin nhắn, sau đó lặp lại một vài lần, lần sau tôi có thể nhận được 10 tin nhắn giống nhau kể từ khi hết thời gian chờ. Tôi đang suy nghĩ về việc sử dụng 'dump()' nhưng tôi sẽ phải đọc các tập tin sau, có vẻ ngớ ngẩn, tôi thiếu cái gì? (Tôi có thể đặt chế độ hiển thị_timeout trong một thời gian rất dài, nhưng điều đó có vẻ xấu). –

+0

@linker - bạn nói bạn cần kiểm tra các thông điệp cụ thể 'n'. điều này có nghĩa là có một số tiêu chí đối sánh mà bạn đang so sánh từng thư không? –

+0

Xin lỗi nếu điều đó gây nhầm lẫn, tôi đã cập nhật bài đăng của mình. –

5

sự hiểu biết của tôi là bản chất phân tán của dịch vụ SQS khá nhiều làm cho thiết kế của bạn không khả thi. Mỗi khi bạn gọi get_messages, bạn đang nói chuyện với một bộ máy chủ khác, sẽ có một số chứ không phải tất cả thư của bạn. Do đó, không thể 'kiểm tra theo thời gian' để đặt xem một nhóm thông điệp cụ thể đã sẵn sàng chưa, và sau đó chỉ chấp nhận những thông điệp đó.

Điều bạn cần làm là thăm dò ý kiến ​​liên tục, nhận tất cả thư khi họ đến và lưu trữ chúng cục bộ trong cấu trúc dữ liệu của riêng bạn. Sau mỗi lần tìm nạp thành công, bạn có thể kiểm tra cấu trúc dữ liệu của mình để xem liệu một tập hợp thư hoàn chỉnh đã được thu thập chưa.

Hãy ghi nhớ rằng thông điệp sẽ đến trong trật tự, và một số thông điệp sẽ được giao hai lần, như xóa phải tuyên truyền đến tất cả các máy chủ SQS, nhưng được yêu cầu tiếp theo đôi khi đánh bại các thông điệp xóa.

0

Nội dung nào đó giống như mã bên dưới sẽ thực hiện thủ thuật. Xin lỗi nó trong C#, nhưng nó không khó để chuyển đổi sang python. Từ điển được sử dụng để loại bỏ các bản sao.

public Dictionary<string, Message> GetAllMessages(int pollSeconds) 
    { 
     var msgs = new Dictionary<string, Message>(); 
     var end = DateTime.Now.AddSeconds(pollSeconds); 

     while (DateTime.Now <= end) 
     { 
      var request = new ReceiveMessageRequest(Url); 
      request.MaxNumberOfMessages = 10; 

      var response = GetClient().ReceiveMessage(request); 

      foreach (var msg in response.Messages) 
      { 
       if (!msgs.ContainsKey(msg.MessageId)) 
       { 
        msgs.Add(msg.MessageId, msg); 
       } 
      } 
     } 

     return msgs; 
    } 
9

Tôi đã làm việc với hàng đợi AWS SQS để cung cấp thông báo tức thì, vì vậy tôi cần xử lý tất cả thư trong thời gian thực. Các mã sau đây sẽ giúp bạn có hiệu quả dequeue (tất cả) tin nhắn và xử lý bất kỳ lỗi nào khi loại bỏ.

Lưu ý: để xóa thư khỏi hàng đợi, bạn cần xóa chúng.Tôi đang sử dụng cập nhật boto3 AWS python SDK, thư viện json, và các giá trị mặc định sau đây:

import boto3 
import json 

region_name = 'us-east-1' 
queue_name = 'example-queue-12345' 
max_queue_messages = 10 
message_bodies = [] 
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>' 
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>' 
sqs = boto3.resource('sqs', region_name=region_name, 
     aws_access_key_id=aws_access_key_id, 
     aws_secret_access_key=aws_secret_access_key) 
queue = sqs.get_queue_by_name(QueueName=queue_name) 
while True: 
    messages_to_delete = [] 
    for message in queue.receive_messages(
      MaxNumberOfMessages=max_queue_messages) 
     # process message body 
     body = json.loads(message.body) 
     message_bodies.append(body) 
     # add message to delete 
     messages_to_delete.append({ 
      'Id': message.message_id, 
      'ReceiptHandle': message.receipt_handle 
     }) 

    # if you don't receive any notifications the 
    # messages_to_delete list will be empty 
    if len(messages_to_delete) == 0: 
     break 
    # delete messages to remove them from SQS queue 
    # handle any errors 
    else: 
     delete_response = queue.delete_messages(
       Entries=messages_to_delete) 
+0

Một sự thích nghi cho gói v2' Boto' để "backport" hàm 'delete_messages' từ' Boto3' là [ở đây] (http://stackoverflow.com/a/40638174/4228193). Việc xây dựng trong 'Boto' (2)' delete_message_batch' có một giới hạn của 10 tin nhắn và yêu cầu đầy đủ 'Message'-lớp đối tượng, chứ không phải chỉ là' ID' và 'ReceiptHandles' trong một đối tượng. – mpag

0

Chú ý: Đây không phải là dự định như là một câu trả lời trực tiếp cho câu hỏi. Thay vào đó là tăng thêm thành @TimothyLiu's answer, giả sử người dùng cuối đang sử dụng gói Boto (còn gọi là Boto2) chứ không phải Boto3. Mã này là một "Boto-2-ization" của delete_messages gọi nêu tại his answer


Một Boto (2) kêu gọi delete_message_batch(messages_to_delete) nơi messages_to_delete là một đối tượng dict với chính: giá trị tương ứng với id: receipt_handle cặp trả

AttributeError: 'dict' object has no attribute 'id'.

Có vẻ như delete_message_batch mong đợi đối tượng lớp Message; sao chép các Boto source for delete_message_batch và cho phép nó sử dụng một đối tượng không Message (ala boto3) cũng không thành công nếu bạn đang xóa nhiều hơn 10 "thư" tại một thời điểm. Vì vậy, tôi phải sử dụng công việc sau đây.

đang ePrint của here

from __future__ import print_function 
import sys 
from itertools import islice 

def eprint(*args, **kwargs): 
    print(*args, file=sys.stderr, **kwargs) 

@static_vars(counter=0) 
def take(n, iterable, reset=False): 
    "Return next n items of the iterable as same type" 
    if reset: take.counter = 0 
    take.counter += n 
    bob = islice(iterable, take.counter-n, take.counter) 
    if isinstance(iterable, dict): return dict(bob) 
    elif isinstance(iterable, list): return list(bob) 
    elif isinstance(iterable, tuple): return tuple(bob) 
    elif isinstance(iterable, set): return set(bob) 
    elif isinstance(iterable, file): return file(bob) 
    else: return bob 

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False 
    """ 
    Deletes a list of messages from a queue in a single request. 
    :param cx: A boto connection object. 
    :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted 
    :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects. 
    """ 
    listof10s = [] 
    asSuc, asErr, acS, acE = "","",0,0 
    res = [] 
    it = tuple(enumerate(messages)) 
    params = {} 
    tenmsg = take(10,it,True) 
    while len(tenmsg)>0: 
    listof10s.append(tenmsg) 
    tenmsg = take(10,it) 
    while len(listof10s)>0: 
    tenmsg = listof10s.pop() 
    params.clear() 
    for i, msg in tenmsg: #enumerate(tenmsg): 
     prefix = 'DeleteMessageBatchRequestEntry' 
     numb = (i%10)+1 
     p_name = '%s.%i.Id' % (prefix, numb) 
     params[p_name] = msg.get('id') 
     p_name = '%s.%i.ReceiptHandle' % (prefix, numb) 
     params[p_name] = msg.get('receipt_handle') 
    try: 
     go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST') 
     (sSuc,cS),(sErr,cE) = tup_result_messages(go) 
     if cS: 
     asSuc += ","+sSuc 
     acS += cS 
     if cE: 
     asErr += ","+sErr 
     acE += cE 
    except cx.ResponseError: 
     eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    except: 
     eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params)) 
    return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res 

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0): 
    if sSuc == "": sSuc="None" 
    if sErr == "": sErr="None" 
    if cS == expect: sSuc="All" 
    if cE == expect: sErr="All" 
    return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr) 
1

tôi thực hiện điều này trong một cronjob

from django.core.mail import EmailMessage 
from django.conf import settings 
import boto3 
import json 

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
     aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY, 
     region_name=settings.AWS_REGION) 

queue = sqs.get_queue_by_name(QueueName='email') 
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 

while len(messages) > 0: 
    for message in messages: 
     mail_body = json.loads(message.body) 
     print("E-mail sent to: %s" % mail_body['to']) 
     email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']]) 
     email.send() 
     message.delete() 

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1) 
Các vấn đề liên quan