2016-07-01 13 views
6

Tôi bắt đầu nhiều quy trình để tạo danh sách các đối tượng mới. htop cho tôi thấy trong khoảng từ 1 đến 4 quy trình (tôi luôn tạo 3 đối tượng mới).Python3: Đa xử lý tiêu thụ nhiều RAM và làm chậm

def foo(self): 
    with multiprocessing.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, self.information) 
     self.new_objs = result.get() 
     pool.terminate() 
    gc.collect() 

tôi gọi foo() nhiều lần, mỗi lần nó được gọi là, toàn bộ quá trình đang chạy chậm hơn, chương trình không thậm chí kết thúc cuối cùng, vì nó chậm đến nhiều. Chương trình bắt đầu ăn hết RAM của tôi, trong khi cách tiếp cận tuần tự không có bất kỳ việc sử dụng RAM đáng kể nào.

Khi tôi giết chương trình, phần lớn thời gian là chức năng mà chương trình đã thực hiện lần cuối.

->File "threading.py", line 293, in wait 
    waiter.acquire() 

Sửa Để đưa ra một số thông tin về hoàn cảnh của tôi. Tôi tạo ra một cây làm bằng các nút. foo() được gọi bởi một nút cha để tạo các nút con của nó. result được trả về bởi các quá trình là các nút con này. Chúng được lưu trong danh sách ở nút cha. Tôi muốn song song việc tạo ra các nút con đó thay vì tạo chúng theo cách tuần tự.

Trả lời

2

Tôi nghĩ rằng vấn đề của bạn chủ yếu liên quan đến thực tế là hàm song song của bạn là một phương thức của đối tượng. Thật khó để chắc chắn không có thêm thông tin, nhưng xem xét việc này ít chương trình đồ chơi:

import multiprocessing as mp 
import numpy as np 
import gc 


class Object(object): 
    def __init__(self, _): 
     self.data = np.empty((100, 100, 100), dtype=np.float64) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(self.new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def new_obj(self, i): 
     return Object(i) 

    def __del__(self): 
     print("Dead") 


if __name__ == '__main__': 
    c = Container() 
    for j in range(5): 
     c.foo() 

Bây giờ Container được gọi là một lần duy nhất, vì vậy mà bạn mong đợi để xem một "Born", tiếp theo là một "Dead" được in ra; nhưng vì mã đang được thực thi bởi các quy trình là phương pháp của vùng chứa, điều này có nghĩa là toàn bộ container toàn bộ vùng chứa phải được thực thi ở nơi khác! Chạy này, bạn sẽ thấy một dòng xen kẽ "Born""Dead" như container của bạn đang được xây dựng lại trên mỗi thực hiện của bản đồ:

Born 
Born 
Born 
Born 
Born 
Dead 
Born 
Dead 
Dead 
Born 
Dead 
Born 
... 
<MANY MORE LINES HERE> 
... 
Born 
Dead 

Để thuyết phục bản thân rằng toàn bộ chứa đang được sao chép và gửi khoảng mỗi lần, hãy cố gắng thiết lập một số giá trị không serialisable:

def foo(self): 
    with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
     result = pool.map_async(self.new_obj, range(50)) 
     self.fn = lambda x: x**2 
     self.objects.extend(result.get()) 
     pool.terminate() 
    gc.collect() 

nào ngay lập tức sẽ nâng cao một AttributeError vì nó không thể serialise container.

Hãy tổng hợp: khi gửi 1000 yêu cầu tới hồ bơi, Container sẽ được đăng, được gửi đến quy trình và được deserialised tại đó 1000 lần. Chắc chắn, cuối cùng họ sẽ bị loại bỏ (giả sử không có quá nhiều tham chiếu chéo kì lạ), nhưng điều đó chắc chắn sẽ gây áp lực lên RAM, vì đối tượng được tuần tự hóa, gọi là, cập nhật, reserialised ... cho mọi trong phần tử được ánh xạ của bạn.

Bạn có thể giải quyết điều đó bằng cách nào? Vâng, lý tưởng, không chia sẻ trạng thái:

def new_obj(_): 
    return Object(_) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, range(50)) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

này hoàn thành trong một khoảng thời gian ngắn, và chỉ tạo ra khí cầu nhỏ nhất trên RAM (như là một đơn Container được từng được xây dựng).Nếu bạn cần một số trạng thái nội bộ được chuyển vào đó, hãy trích xuất và gửi chỉ:

def new_obj(tup): 
    very_important_state, parameters = tup 
    return Object(very_important_state=very_important_state, 
        parameters=parameters) 


class Container(object): 
    def __new__(cls): 
     self = object.__new__(cls) 
     print("Born") 
     return self 

    def __init__(self): 
     self.objects = [] 

    def foo(self): 
     important_state = len(self.objects) 
     with mp.Pool(processes=3, maxtasksperchild=10) as pool: 
      result = pool.map_async(new_obj, 
            ((important_state, i) for i in range(50))) 
      self.objects.extend(result.get()) 
      pool.terminate() 
     gc.collect() 

    def __del__(self): 
     print("Dead") 

Điều này có cùng hành vi như trước. Nếu bạn hoàn toàn không thể tránh chia sẻ một số trạng thái có thể thay đổi giữa các quy trình, hãy thanh toán the multiprocessing tools để làm điều đó mà không phải sao chép mọi thứ ở mọi nơi.

+0

Vui lòng xem chỉnh sửa của tôi. Vì vậy, nếu tôi hiểu bạn đúng, tôi cần phải gọi một phương pháp bên ngoài bên ngoài đối tượng của tôi trong mỗi quá trình? – Jonas

+0

Hàm song song 'self.new_obj', là một phương thức của đối tượng, _requires_ toàn bộ nút cha được nối tiếp và gửi xung quanh trên mỗi cuộc gọi; nếu bạn có thể trích xuất phương thức đó để hàm _function_ 'new_obj (...)' trả về một nút đơn giản ('đơn giản, mồ côi', 'không quốc tịch') và 'foo' phụ trách liên kết nó (thêm phụ huynh <-> con, v.v. .. nhưng trong _calling process_), toàn bộ vấn đề này có khả năng biến mất: các tiến trình con yêu cầu chỉ một trạng thái minmal được gửi đi xung quanh. – val

Các vấn đề liên quan