Tôi khá mới với python. Tôi đang sử dụng mô-đun đa xử lý để đọc các dòng văn bản trên stdin, chuyển đổi chúng theo cách nào đó và viết chúng vào cơ sở dữ liệu. Dưới đây là một đoạn mã của tôi:python pool apply_async và map_async không chặn trên hàng đợi đầy đủ
batch = []
pool = multiprocessing.Pool(20)
i = 0
for i, content in enumerate(sys.stdin):
batch.append(content)
if len(batch) >= 10000:
pool.apply_async(insert, args=(batch,i+1))
batch = []
pool.apply_async(insert, args=(batch,i))
pool.close()
pool.join()
Bây giờ tất cả hoạt động tốt, cho đến khi tôi nhận được để xử lý tập tin đầu vào rất lớn (hàng trăm triệu dòng) mà tôi ống vào chương trình python của tôi. Tại một số thời điểm, khi cơ sở dữ liệu của tôi chậm hơn, tôi thấy bộ nhớ đầy.
Sau khi phát một số, hóa ra là pool.apply_async cũng như pool.map_async không bao giờ chặn, để hàng đợi các cuộc gọi được xử lý ngày càng lớn.
Cách tiếp cận chính xác cho vấn đề của tôi là gì? Tôi sẽ mong đợi một tham số mà tôi có thể thiết lập, điều đó sẽ chặn cuộc gọi pool.apply_async, ngay khi đạt đến độ dài hàng đợi nhất định. AFAIR trong Java có thể cung cấp cho ThreadPoolExecutor một BlockingQueue với độ dài cố định cho mục đích đó.
Cảm ơn!
_ "nó bật ra pool.apply_async đó cũng như pool.map_async không bao giờ chặn" _ - tất cả mọi thứ tôi đang tìm kiếm – leon