6

Gần đây, tôi đã bắt đầu thử nghiệm với đa xử lý để tăng tốc công việc. Tôi tạo ra một kịch bản mà không phù hợp chuỗi mờ và tính điểm bằng cách sử dụng các thuật toán khác nhau (tôi muốn so sánh các kỹ thuật phù hợp khác nhau). Bạn có thể tìm nguồn đầy đủ ở đây: https://bitbucket.org/bergonzzi/fuzzy-compare/src. Khi đầu vào, phải mất 2 tệp được kết hợp thành từng cặp (mỗi dòng của tệp1 với mỗi dòng của tệp2). Đối với mỗi cặp, điểm số trận đấu mờ được tính toán.Hàng đợi xử lý đa xử lý Python chậm hơn so với pool.map

Tôi đã tạo 3 phiên bản. Chạy với dữ liệu mẫu được cung cấp trong repo của tôi (trong đó bao gồm 697,340 mục sau khi được kết hợp thành cặp), tôi có timings sau:

  • Đơn giản đơn quá trình - 0:00:47
  • đa tiến sử dụng hồ bơi. bản đồ() - 0:00:13
  • đa tiến sử dụng Queues (sản xuất/mô hình tiêu dùng) - 0:01:04

tôi đang cố gắng để hiểu tại sao Pool.map của tôi() phiên bản là nhanh hơn nhiều so với phiên bản Queue của tôi, nó thực sự chậm hơn phiên bản đơn giản nhất.

Lý do của tôi thậm chí cố gắng sử dụng Hàng đợi là phiên bản Pool.map() giữ nguyên kết quả cho đến khi mọi thứ kết thúc và chỉ ghi vào một tệp ở cuối. Điều này có nghĩa là đối với các tệp lớn, nó kết thúc bằng việc ăn nhiều bộ nhớ. I'm talking about this version (liên kết với nó bởi vì nó có rất nhiều mã để dán ở đây).

To solve this I refactored it into a producer/consumer pattern (hoặc cố gắng ít nhất). Ở đây tôi lần đầu tiên tạo ra công ăn việc làm bằng cách kết hợp cả hai tệp đầu vào và đặt chúng vào một hàng đợi mà quá trình người tiêu dùng (tính toán điểm số phù hợp mờ). Các công việc đã hoàn thành được đưa vào hàng đợi. Sau đó, tôi có một quá trình duy nhất lấy các mục đã thực hiện từ hàng đợi này và ghi chúng vào một tệp. Bằng cách này, về lý thuyết, tôi sẽ không cần nhiều bộ nhớ vì kết quả sẽ bị xóa ra đĩa. Nó có vẻ hoạt động tốt nhưng chậm hơn nhiều. Tôi cũng nhận thấy rằng 4 tiến trình tôi sinh sản dường như không sử dụng hết 100% CPU khi nhìn vào Activity Monitor trên Mac OSX (không phải là trường hợp với phiên bản Pool.map()). Một điều khác tôi nhận thấy là chức năng sản xuất của tôi dường như lấp đầy hàng đợi đúng cách nhưng các quy trình tiêu dùng dường như chờ cho đến khi hàng đợi được lấp đầy thay vì bắt đầu làm việc ngay khi vật phẩm đầu tiên đến. Tôi có thể đang làm sai điều gì đó ...

Để tham khảo, đây là một số mã có liên quan cho phiên bản Hàng đợi (mặc dù tốt hơn là xem mã đầy đủ trong mã được liên kết ở trên).

Dưới đây là chức năng sản xuất của tôi:

def combine(list1, list2): 
    ''' 
    Combine every item of list1 with every item of list 2, 
    normalize put the pair in the job queue. 
    ''' 
    pname = multiprocessing.current_process().name 
    for x in list1: 
     for y in list2: 
      # slugify is a function to normalize the strings 
      term1 = slugify(x.strip(), separator=' ') 
      term2 = slugify(y.strip(), separator=' ') 
      job_queue.put_nowait([term1, term2]) 

Đây là chức năng nhà văn:

def writer(writer_queue): 
    out = open(file_out, 'wb') 
    pname = multiprocessing.current_process().name 
    out.write(header) 
    for match in iter(writer_queue.get, "STOP"): 
     print("%s is writing %s") % (pname, str(match)) 
     line = str(';'.join(match) + '\n') 
     out.write(line) 
    out.close() 

Đây là chức năng lao động mà không tính toán thực tế (tước ra hầu hết các mã kể từ khi nó doesn' t tạo sự khác biệt ở đây, nguồn đầy đủ trên repo):

def score_it(job_queue, writer_queue): 
    '''Calculate scores for pair of words.''' 
    pname = multiprocessing.current_process().name 

    for pair in iter(job_queue.get_nowait, "STOP"): 
     # do all the calculations and put the result into the writer queue 
     writer_queue.put(result) 

Đây là cách tôi thiết lập các quy trình:

# Files 
to_match = open(args.file_to_match).readlines() 
source_list = open(args.file_to_be_matched).readlines() 

workers = 4 
job_queue = multiprocessing.Manager().Queue() 
writer_queue = multiprocessing.Manager().Queue() 
processes = [] 

print('Start matching with "%s", minimum score of %s and %s workers') % (
    args.algorithm, minscore, workers) 

# Fill up job queue 
print("Filling up job queue with term pairs...") 
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list)) 
c.start() 
c.join() 

print("Job queue size: %s") % job_queue.qsize() 

# Start writer process 
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,)) 
w.start() 

for w in xrange(workers): 
    p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue)) 
    p.start() 
    processes.append(p) 
    job_queue.put("STOP") 

for p in processes: 
    p.join() 

writer_queue.put("STOP") 

Tôi đã đọc một chút ở đây về việc xử lý đa chậm hơn đôi khi và tôi biết điều này phải làm với chi phí tạo và quản lý quy trình mới.Ngoài ra khi công việc được thực hiện không đủ lớn, hiệu quả của quá trình đa xử lý có thể không hiển thị. Tuy nhiên trong trường hợp này, tôi nghĩ rằng công việc khá lớn và phiên bản Pool.map() dường như chứng minh điều đó vì nó nhanh hơn nhiều.

Tôi có làm điều gì đó thực sự sai khi quản lý tất cả các quy trình này và chuyển qua đối tượng hàng đợi không? Làm thế nào điều này có thể được tối ưu hóa để kết quả có thể được ghi vào một tập tin khi chúng được xử lý để giảm thiểu số lượng bộ nhớ cần thiết trong khi chạy nó?

Cảm ơn!

+1

Tôi không biết điều gì đang xảy ra với hiệu suất của hệ thống dựa trên hàng đợi của bạn (nhưng tôi chưa quá khó), nhưng để giải quyết vấn đề tiêu thụ bộ nhớ của phiên bản dựa trên 'pool' của bạn, bạn có thể thử sử dụng' pool.imap' để có được một iterator năng suất các giá trị kết quả khi chúng được tính toán bởi các quy trình công nhân. Trong mã 'map' của bạn, chỉ cần hoán đổi' imap' trong 'map' và di chuyển các cuộc gọi' pool.close' và 'pool.join' bên dưới vòng lặp ghi kết quả và bạn có thể được thiết lập! – Blckknght

+0

Cảm ơn rất nhiều vì gợi ý, giải pháp của bạn giải quyết vấn đề bộ nhớ. Với imap mỗi quá trình mất ít hơn 100mb và với bản đồ họ đi lên đến 1GB. Tuy nhiên nó trở nên chậm hơn - trên dữ liệu thử nghiệm của tôi, tôi nhận được 34 giây với imap (so với 13 giây với bản đồ). Bất kỳ ý tưởng tại sao có thể? – bergonzzi

+0

Nó có thể phải làm với tham số 'chunksize'. Nếu tôi hiểu mọi thứ một cách chính xác, 'map' sử dụng các khối lớn hơn theo mặc định hơn' imap', điều này có thể làm giảm đáng kể chi phí nếu chuỗi được ánh xạ quá dài. – Blckknght

Trả lời

2

Tôi nghĩ vấn đề với thời gian của bạn là phiên bản hàng loạt đa luồng của bạn thiếu một tối ưu hóa. Bạn đã thực hiện một nhận xét về cơ bản nói rằng job_queue của bạn đầy lên trước khi các chủ đề công nhân bắt đầu nhận công việc từ nó. Tôi tin rằng lý do cho điều này là c.join() bạn có trong # Điền hàng đợi công việc. Điều này ngăn cản các chủ đề chính tiếp tục cho đến khi hàng đợi công việc đã đầy. Tôi sẽ di chuyển c.join() đến cuối sau p.join(). Bạn cũng sẽ cần phải tìm ra cách để đưa cờ dừng của bạn vào cuối hàng đợi. Các chức năng kết hợp có thể là một nơi tốt để đặt này. Một cái gì đó dọc theo dòng thêm x số cờ dừng sau khi nó hết dữ liệu để kết hợp.

Một điều khác cần lưu ý: Bạn đang viết qua bạn w biến trong phạm vi vòng lặp for của bạn để khởi động quy trình p. Như một vấn đề về phong cách/khả năng đọc/etc, tôi sẽ đổi w thành một tên biến khác. Nếu bạn không sử dụng nó, dấu gạch dưới hoạt động như một tên biến tốt. I.e.

for w in xrange(workers): 

nên trở thành

for _ in xrange(workers): 

câu chuyện dài ngắn, nếu bạn di chuyển c.join() để kết thúc, bạn sẽ nhận được timings chính xác hơn. Hiện tại, điều duy nhất đa luồng là kết hợp mờ của chuỗi. Một trong những ưu điểm của việc tạo ra một chuỗi nhà sản xuất/người tiêu dùng là chủ đề của người tiêu dùng không phải đợi cho đến khi chuỗi nhà sản xuất kết thúc, và do đó, bạn sẽ sử dụng ít bộ nhớ hơn.

+0

Cảm ơn bạn đã tip! Nó có ý nghĩa để tham gia vào quá trình "nạp công việc" ở cuối ... Tôi nghĩ rằng tôi vẫn còn bối rối về những gì join() thực sự đã làm, nó rõ ràng bây giờ với tôi. Những gì tôi vẫn không hiểu rất rõ là gợi ý của bạn "Bạn cũng sẽ cần phải tìm ra cách để đưa cờ dừng của bạn vào cuối hàng đợi. Chức năng kết hợp có thể là một nơi tốt để đặt điều này. dòng thêm x số cờ dừng sau khi hết dữ liệu để kết hợp. " - bạn có ý nghĩa gì chính xác? Tại sao x số lá cờ dừng? Và làm thế nào tôi có thể phát hiện ra rằng tôi không có thêm dữ liệu để kết hợp? – bergonzzi

+0

Khi dữ liệu của bạn đến từ các tệp, bất cứ khi nào các tệp hết, bạn biết rằng bạn không có thêm dữ liệu để kết hợp. Chủ đề tiêu dùng của bạn sẽ cần một số loại tín hiệu để biết thời điểm tắt máy, do đó việc thực hiện cờ "dừng". Bất cứ khi nào một chuỗi tiêu thụ kéo một công việc ra khỏi hàng đợi và đó là một "STOP" hoặc một Không hoặc một cái gì đó tương tự (bạn sẽ phải chọn những gì tốt nhất cho ứng dụng của bạn) thread biết nó có thể tắt. Làm thế nào tôi đã làm điều này trên một ứng dụng tương tự là chủ đề sản xuất của tôi, một khi nó đã được thực hiện tạo việc làm, sản xuất dừng cờ bằng số lượng chủ đề tiêu dùng. –

Các vấn đề liên quan