2010-01-18 26 views
6

Tôi có một tệp dữ liệu XML lớn (> 160M) để xử lý và có vẻ như phân tích cú pháp SAX/expat/pulldom là cách để đi. Tôi muốn có một luồng mà chuyển qua các nút và đẩy các nút được xử lý lên một hàng đợi, và sau đó các luồng công nhân khác kéo nút sẵn có tiếp theo ra khỏi hàng đợi và xử lý nó.Làm cách nào để xử lý xml không đồng bộ trong python?

tôi có như sau (cần có ổ khóa, tôi biết - nó sẽ, sau)

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 

def start_handler(name, attrs): 
    q.append(name) 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    print(q) 
    time.sleep(1) 

Vấn đề là cơ thể của khối while được gọi chỉ một lần, và sau đó tôi không thể thậm chí ctrl-C cũng làm gián đoạn nó. Trên các tệp nhỏ hơn, đầu ra là như mong đợi, nhưng điều đó dường như chỉ ra rằng trình xử lý chỉ được gọi khi tài liệu được phân tích cú pháp đầy đủ, dường như đánh bại mục đích của trình phân tích cú pháp SAX.

Tôi chắc chắn đó là sự thiếu hiểu biết của riêng tôi, nhưng tôi không thấy nơi tôi đang phạm sai lầm.

PS: Tôi cũng đã cố gắng thay đổi start_handler như sau:

def start_handler(name, attrs): 
    def app(): 
     q.append(name) 
    u = threading.Thread(group=None, target=app) 
    u.start() 

Không có tình yêu, mặc dù.

Trả lời

7

ParseFile, như bạn đã nhận thấy, chỉ cần "gulps down" mọi thứ - không tốt cho việc phân tích số gia tăng bạn muốn thực hiện! Vì vậy, chỉ cần ăn các tập tin để phân tích cú pháp một chút tại một thời điểm, đảm bảo có điều kiện mang lại quyền kiểm soát để đề khác khi bạn đi - ví dụ:

while True: 
    data = f.read(BUFSIZE) 
    if not data: 
    p.Parse('', True) 
    break 
    p.Parse(data, False) 
    time.sleep(0.0) 

cuộc gọi time.sleep(0.0) là cách Python để nói "năng suất để khác chủ đề nếu có sẵn sàng và chờ đợi "; phương pháp Parse được ghi thành tài liệu here.

Điểm thứ hai là, quên khóa cho việc sử dụng này! - sử dụng Queue.Queue thay vào đó, nó chủ yếu là an toàn và hầu như luôn là cách tốt nhất và đơn giản nhất để phối hợp nhiều luồng trong Python. Chỉ cần tạo một ví dụ Queueq, q.put(name) trên đó và đã chặn chuỗi đang hoạt động trên q.get() đang chờ để có thêm một số việc phải làm - đó là SO đơn giản!

(Có một số chiến lược phụ mà bạn có thể sử dụng để điều phối việc chấm dứt chuỗi công việc khi không có thêm công việc để làm, nhưng yêu cầu đặc biệt đơn giản, vắng mặt là chỉ tạo chuỗi daemon, vì vậy chúng sẽ chấm dứt khi sợi chỉ hoạt động - xem the docs).

+0

Đã bỏ phiếu cho các đề xuất Hàng đợi, nhưng bạn có chắc chắn về việc ParseFile nuốt chửng mọi thứ xuống một lần không? Nó gọi lại vào các trình xử lý Python để xử lý các thẻ khi nó đi, đó là toàn bộ mục đích của phân tích cú pháp SAX ... hay bạn đang nói điều đó là không đủ để kích hoạt một trình chuyển đổi luồng trong Python? –

+1

Nếu bạn muốn SAX, bạn có thể sử dụng xml.sax, xem http://docs.python.org/library/xml.sax.html?highlight=sax#module-xml.sax; OP không sử dụng SAX, nhưng thay vì xml.parsers.expat, một giao diện trừu tượng thấp hơn mà không ** áp đặt một chiến lược gia tăng (nó _supports_ nó, nhưng không _impose_ nó, để nó lên đến mức mã Python để chọn và chọn). –

+0

Sự lựa chọn của người nước ngoài là phần nào tùy ý, tôi không thể tìm thấy một lời giải thích tốt về sự khác biệt giữa người nước ngoài và sax. Các mô-đun sax hoạt động chỉ là tốt - thậm chí có lẽ tốt hơn, vì nó có vẻ là không đồng bộ như tôi cần thiết. Tôi vết thương lên việc áp dụng các "thức ăn nó một đoạn tại một thời gian" phương pháp anyway, vì nó mang lại cho tôi một cơ hội để khử trùng các chuỗi tôi ăn trước khi phân tích cú pháp được cho họ. Câu trả lời rất hữu ích, cảm ơn. – decitrig

1

Điều duy nhất tôi thấy là sai là bạn đang truy cập q đồng thời từ các luồng khác nhau - không khóa khi bạn viết thực sự. Đó là yêu cầu cho sự cố - và bạn có thể gặp rắc rối trong các hình thức của thông dịch viên Python khóa lên trên bạn. :)

Hãy thử khóa, nó thực sự không phải là rất khó khăn:

import sys, time 
import xml.parsers.expat 
import threading 

q = [] 
q_lock = threading.Lock() <--- 

def start_handler(name, attrs): 
    q_lock.acquire() <--- 
    q.append(name) 
    q_lock.release() <--- 

def do_expat(): 
    p = xml.parsers.expat.ParserCreate() 
    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 


t = threading.Thread(group=None, target=do_expat) 
t.start() 

while True: 
    q_lock.acquire() <--- 
    print(q) 
    q_lock.release() <--- 
    time.sleep(1) 

Bạn thấy đấy, nó đã thực sự đơn giản, chúng tôi vừa tạo ra một biến lock để bảo vệ đối tượng của chúng tôi, và có được mà khóa mỗi lần trước khi chúng tôi sử dụng các đối tượng và phát hành mỗi lần sau khi chúng tôi hoàn thành nhiệm vụ của chúng tôi trên đối tượng. Bằng cách này, chúng tôi đảm bảo rằng q.append(name) sẽ không bao giờ trùng lặp với print(q).


(Với phiên bản mới của Python đó cũng là một "với ...." cú pháp giúp bạn không để phát hành ổ khóa hoặc các tập tin gần hoặc dọn dẹp khác một thường xuyên quên.)

7

Tôi không quá chắc chắn về vấn đề này. Tôi đoán cuộc gọi đến ParseFile đang chặn và chỉ chuỗi phân tích đang được chạy vì GIL. Thay vào đó, cách này sẽ là sử dụng multiprocessing. Nó được thiết kế để làm việc với hàng đợi, anyway.

Bạn làm cho một Process và bạn có thể vượt qua nó một Queue:

import sys, time 
import xml.parsers.expat 
import multiprocessing 
import Queue 

def do_expat(q): 
    p = xml.parsers.expat.ParserCreate() 

    def start_handler(name, attrs): 
     q.put(name) 

    p.StartElementHandler = start_handler 
    p.buffer_text = True 
    print("opening {0}".format(sys.argv[1])) 
    with open(sys.argv[1]) as f: 
     print("file is open") 
     p.ParseFile(f) 
     print("parsing complete") 

if __name__ == '__main__': 
    q = multiprocessing.Queue() 
    process = multiprocessing.Process(target=do_expat, args=(q,)) 
    process.start() 

    elements = [] 
    while True: 
     while True: 
      try: 
       elements.append(q.get_nowait()) 
      except Queue.Empty: 
       break 

     print elements 
     time.sleep(1) 

tôi đã bao gồm một danh sách các yếu tố, chỉ cần sao chép kịch bản gốc của bạn. Giải pháp cuối cùng của bạn có thể sẽ sử dụng get_nowait và một số Pool hoặc một cái gì đó tương tự.

+1

Vâng, đây là một con đường tốt để đi xuống - như bạn đã nói bạn vẫn muốn sử dụng hàng đợi. –

+0

Tôi đã thử mã đó; nó tránh được việc bị khóa, nhưng ParseFile vẫn không có vẻ như xuất ra bất cứ thứ gì cho đến khi nó đọc toàn bộ đầu vào. – decitrig

0

Tôi không biết nhiều về việc triển khai, nhưng nếu phân tích cú pháp là mã C thực thi cho đến khi hoàn thành, các chuỗi Python khác sẽ không chạy. Nếu trình phân tích cú pháp gọi lại mã Python, thì GIL có thể được giải phóng cho các luồng khác để chạy, nhưng tôi không chắc chắn về điều đó. Bạn có thể muốn kiểm tra các chi tiết đó.

Các vấn đề liên quan