Gần đây, tôi đã bắt đầu thử nghiệm với đa xử lý để tăng tốc công việc. Tôi tạo ra một kịch bản mà không phù hợp chuỗi mờ và tính điểm bằng cách sử dụng các thuật toán khác nhau (tôi muốn so sánh các kỹ thuật phù hợp khác nhau). Bạn có thể tìm nguồn đầy đủ ở đây: https://bitbucket.org/bergonzzi/fuzzy-compare/src. Khi đầu vào, phải mất 2 tệp được kết hợp thành từng cặp (mỗi dòng của tệp1 với mỗi dòng của tệp2). Đối với mỗi cặp, điểm số trận đấu mờ được tính toán.Hàng đợi xử lý đa xử lý Python chậm hơn so với pool.map
Tôi đã tạo 3 phiên bản. Chạy với dữ liệu mẫu được cung cấp trong repo của tôi (trong đó bao gồm 697,340 mục sau khi được kết hợp thành cặp), tôi có timings sau:
- Đơn giản đơn quá trình - 0:00:47
- đa tiến sử dụng hồ bơi. bản đồ() - 0:00:13
- đa tiến sử dụng Queues (sản xuất/mô hình tiêu dùng) - 0:01:04
tôi đang cố gắng để hiểu tại sao Pool.map của tôi() phiên bản là nhanh hơn nhiều so với phiên bản Queue của tôi, nó thực sự chậm hơn phiên bản đơn giản nhất.
Lý do của tôi thậm chí cố gắng sử dụng Hàng đợi là phiên bản Pool.map() giữ nguyên kết quả cho đến khi mọi thứ kết thúc và chỉ ghi vào một tệp ở cuối. Điều này có nghĩa là đối với các tệp lớn, nó kết thúc bằng việc ăn nhiều bộ nhớ. I'm talking about this version (liên kết với nó bởi vì nó có rất nhiều mã để dán ở đây).
To solve this I refactored it into a producer/consumer pattern (hoặc cố gắng ít nhất). Ở đây tôi lần đầu tiên tạo ra công ăn việc làm bằng cách kết hợp cả hai tệp đầu vào và đặt chúng vào một hàng đợi mà quá trình người tiêu dùng (tính toán điểm số phù hợp mờ). Các công việc đã hoàn thành được đưa vào hàng đợi. Sau đó, tôi có một quá trình duy nhất lấy các mục đã thực hiện từ hàng đợi này và ghi chúng vào một tệp. Bằng cách này, về lý thuyết, tôi sẽ không cần nhiều bộ nhớ vì kết quả sẽ bị xóa ra đĩa. Nó có vẻ hoạt động tốt nhưng chậm hơn nhiều. Tôi cũng nhận thấy rằng 4 tiến trình tôi sinh sản dường như không sử dụng hết 100% CPU khi nhìn vào Activity Monitor trên Mac OSX (không phải là trường hợp với phiên bản Pool.map()). Một điều khác tôi nhận thấy là chức năng sản xuất của tôi dường như lấp đầy hàng đợi đúng cách nhưng các quy trình tiêu dùng dường như chờ cho đến khi hàng đợi được lấp đầy thay vì bắt đầu làm việc ngay khi vật phẩm đầu tiên đến. Tôi có thể đang làm sai điều gì đó ...
Để tham khảo, đây là một số mã có liên quan cho phiên bản Hàng đợi (mặc dù tốt hơn là xem mã đầy đủ trong mã được liên kết ở trên).
Dưới đây là chức năng sản xuất của tôi:
def combine(list1, list2):
'''
Combine every item of list1 with every item of list 2,
normalize put the pair in the job queue.
'''
pname = multiprocessing.current_process().name
for x in list1:
for y in list2:
# slugify is a function to normalize the strings
term1 = slugify(x.strip(), separator=' ')
term2 = slugify(y.strip(), separator=' ')
job_queue.put_nowait([term1, term2])
Đây là chức năng nhà văn:
def writer(writer_queue):
out = open(file_out, 'wb')
pname = multiprocessing.current_process().name
out.write(header)
for match in iter(writer_queue.get, "STOP"):
print("%s is writing %s") % (pname, str(match))
line = str(';'.join(match) + '\n')
out.write(line)
out.close()
Đây là chức năng lao động mà không tính toán thực tế (tước ra hầu hết các mã kể từ khi nó doesn' t tạo sự khác biệt ở đây, nguồn đầy đủ trên repo):
def score_it(job_queue, writer_queue):
'''Calculate scores for pair of words.'''
pname = multiprocessing.current_process().name
for pair in iter(job_queue.get_nowait, "STOP"):
# do all the calculations and put the result into the writer queue
writer_queue.put(result)
Đây là cách tôi thiết lập các quy trình:
# Files
to_match = open(args.file_to_match).readlines()
source_list = open(args.file_to_be_matched).readlines()
workers = 4
job_queue = multiprocessing.Manager().Queue()
writer_queue = multiprocessing.Manager().Queue()
processes = []
print('Start matching with "%s", minimum score of %s and %s workers') % (
args.algorithm, minscore, workers)
# Fill up job queue
print("Filling up job queue with term pairs...")
c = multiprocessing.Process(target=combine, name="Feeder", args=(to_match, source_list))
c.start()
c.join()
print("Job queue size: %s") % job_queue.qsize()
# Start writer process
w = multiprocessing.Process(target=writer, name="Writer", args=(writer_queue,))
w.start()
for w in xrange(workers):
p = multiprocessing.Process(target=score_it, args=(job_queue, writer_queue))
p.start()
processes.append(p)
job_queue.put("STOP")
for p in processes:
p.join()
writer_queue.put("STOP")
Tôi đã đọc một chút ở đây về việc xử lý đa chậm hơn đôi khi và tôi biết điều này phải làm với chi phí tạo và quản lý quy trình mới.Ngoài ra khi công việc được thực hiện không đủ lớn, hiệu quả của quá trình đa xử lý có thể không hiển thị. Tuy nhiên trong trường hợp này, tôi nghĩ rằng công việc khá lớn và phiên bản Pool.map() dường như chứng minh điều đó vì nó nhanh hơn nhiều.
Tôi có làm điều gì đó thực sự sai khi quản lý tất cả các quy trình này và chuyển qua đối tượng hàng đợi không? Làm thế nào điều này có thể được tối ưu hóa để kết quả có thể được ghi vào một tập tin khi chúng được xử lý để giảm thiểu số lượng bộ nhớ cần thiết trong khi chạy nó?
Cảm ơn!
Tôi không biết điều gì đang xảy ra với hiệu suất của hệ thống dựa trên hàng đợi của bạn (nhưng tôi chưa quá khó), nhưng để giải quyết vấn đề tiêu thụ bộ nhớ của phiên bản dựa trên 'pool' của bạn, bạn có thể thử sử dụng' pool.imap' để có được một iterator năng suất các giá trị kết quả khi chúng được tính toán bởi các quy trình công nhân. Trong mã 'map' của bạn, chỉ cần hoán đổi' imap' trong 'map' và di chuyển các cuộc gọi' pool.close' và 'pool.join' bên dưới vòng lặp ghi kết quả và bạn có thể được thiết lập! – Blckknght
Cảm ơn rất nhiều vì gợi ý, giải pháp của bạn giải quyết vấn đề bộ nhớ. Với imap mỗi quá trình mất ít hơn 100mb và với bản đồ họ đi lên đến 1GB. Tuy nhiên nó trở nên chậm hơn - trên dữ liệu thử nghiệm của tôi, tôi nhận được 34 giây với imap (so với 13 giây với bản đồ). Bất kỳ ý tưởng tại sao có thể? – bergonzzi
Nó có thể phải làm với tham số 'chunksize'. Nếu tôi hiểu mọi thứ một cách chính xác, 'map' sử dụng các khối lớn hơn theo mặc định hơn' imap', điều này có thể làm giảm đáng kể chi phí nếu chuỗi được ánh xạ quá dài. – Blckknght