2013-04-15 48 views
11

Đây phải là câu hỏi thứ ba và cuối cùng của tôi về những nỗ lực tăng hiệu suất trên một số phân tích thống kê mà tôi đang thực hiện với python. Tôi có 2 phiên bản mã của tôi (lõi đơn so với đa xử lý), tôi đã mong đợi đạt được hiệu suất bằng cách sử dụng nhiều lõi như tôi mong đợi mã của tôi giải nén/giải nén một vài chuỗi nhị phân, đáng buồn là tôi nhận thấy hiệu suất thực sự giảm bằng cách sử dụng nhiều lõi.Khả năng xử lý đa xử lý Python

Tôi tự hỏi nếu có ai có lời giải thích có thể cho những gì tôi quan sát (cuộn xuống bản cập nhật ngày 16 tháng 4 để biết thêm thông tin)?

Phần chính của chương trình là hàm numpy_array (+ decode trong đa), đoạn mã dưới đây (mã đầy đủ truy cập qua pastebin, thêm dưới đây):

def numpy_array(data, peaks): 
    rt_counter=0 
    for x in peaks: 
     if rt_counter %(len(peaks)/20) == 0: 
      update_progress() 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      if (index % 2 == 0): 
       data[rt_counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[rt_counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     rt_counter+=1 

Phiên bản đa thực hiện điều này với một tập hợp các chức năng, tôi sẽ hiển thị phím 2 dưới đây:

def tonumpyarray(mp_arr): 
    return np.frombuffer(mp_arr.get_obj()) 

def numpy_array(shared_arr,peaks): 
    processors=mp.cpu_count() 
    with contextlib.closing(mp.Pool(processes=processors, 
            initializer=pool_init, 
            initargs=(shared_arr,))) as pool: 
     chunk_size=int(len(peaks)/processors) 
     map_parameters=[] 
     for i in range(processors): 
      counter = i*chunk_size 
      chunk=peaks[i*chunk_size:(i+1)*chunk_size] 
      map_parameters.append((chunk, counter)) 
     pool.map(decode,map_parameters) 

def decode ((chunk, counter)): 
    data=tonumpyarray(shared_arr).view(
     [('f0','<f4'), ('f1','<f4',(250000,2))]) 
    for x in chunk: 
     peak_counter=0 
     data_buff=base64.b64decode(x) 
     buff_size=len(data_buff)/4 
     unpack_format=">%dL" % buff_size 
     index=0 
     for y in struct.unpack(unpack_format,data_buff): 
      buff1=struct.pack("I",y) 
      buff2=struct.unpack("f",buff1)[0] 
      #with shared_arr.get_lock(): 
      if (index % 2 == 0): 
       data[counter][1][peak_counter][0]=float(buff2) 
      else: 
       data[counter][1][peak_counter][1]=float(buff2) 
       peak_counter+=1 
      index+=1 
     counter+=1 

mã đầy đủ chương trình có thể được truy cập thông qua những liên kết này pastebin

Pastebin for single core version

Pastebin for multiprocessing version

Việc thực hiện mà tôi đang quan sát với một tập tin chứa 239 timepoints và ~ cặp đo 180k mỗi timepoint là ~ 2.5m cho lõi đơn và ~ 3.5 cho đa xử lý.

PS: Hai câu hỏi trước đó (của đầu tiên của tôi bao giờ cố gắng ở paralellization):

  1. Python multi-processing
  2. Making my NumPy array shared across processes

- 16 tháng tư -

Tôi đã lược tả chương trình của mình bằng t ông thư viện cProfile (có cProfile.run('main()') trong __main__, trong đó cho thấy rằng có 1 bước mà đang chậm lại tất cả mọi thứ xuống:

ncalls tottime percall cumtime percall filename:lineno(function) 
23 85.859 3.733 85.859 3.733 {method 'acquire' of 'thread.lock' objects} 

Điều mà tôi không hiểu ở đây là thread.lock đối tượng được sử dụng trong threading (để hiểu biết của tôi) nhưng không nên được sử dụng trong đa xử lý như mỗi lõi nên chạy một thread duy nhất (bên cạnh việc có cơ chế khóa riêng của nó), vậy làm thế nào nó xảy ra và tại sao một cuộc gọi duy nhất mất 3,7 giây?

+1

bạn có thể chia sẻ liên kết tới câu hỏi trước của mình trong câu hỏi này không? và dán các hàm bạn cho là quan trọng đối với câu hỏi – 0x90

+0

Offcourse, hãy để tôi chỉnh sửa câu hỏi. –

+0

Nó rất tốt có thể có một cái gì đó để làm với GIL. Xem bản trình bày này: http://www.youtube.com/watch?v=Obt-vMVdM8s – alexhb

Trả lời

2

Dữ liệu được chia sẻ là trường hợp chậm lại do đồng bộ hóa đã biết.

Bạn có thể chia dữ liệu giữa các quy trình hoặc cung cấp cho từng quy trình một bản sao độc lập không? Sau đó, các quá trình của bạn sẽ không cần phải đồng bộ hóa bất cứ thứ gì cho đến thời điểm khi tất cả các phép tính được thực hiện.

Sau đó, tôi muốn cho quy trình tổng thể kết nối đầu ra của tất cả bộ xử lý công nhân vào một tập hợp nhất quán.

Cách tiếp cận này có thể mất thêm RAM, nhưng RAM hiện nay rẻ.

Nếu bạn hỏi, tôi cũng bối rối bởi 3700 mili giây cho mỗi lần lấy khóa luồng. Hồ sơ OTOH có thể bị nhầm lẫn về các cuộc gọi đặc biệt như thế này.

+0

Tôi có thể tách mảng dữ liệu thành n bản sao nhưng tôi vẫn muốn hiểu tại sao và cách toàn bộ quá trình mất quá nhiều thời gian. Bạn có khuyên bạn nên sử dụng một cách khác để lược tả? –

0

Ghi đè của bạn trống.

Vấn đề là đa xử lý sử dụng ngã ba nếu nó có sẵn (thay vì sinh sản một con trăn mới). Chia quá trình chia sẻ cùng một env (tập tin mô tả ví dụ). Có thể nó có một số ổ khóa trong số đó.

Dưới đây là một số thất vọng về điều đó: Multiprocessing or os.fork, os.exec?

0

Theo như phần cuối của câu hỏi của bạn, các tài liệu Python về cơ bản nói multiprocessing.lock đó là một bản sao của threading.lock. Có được cuộc gọi trên ổ khóa có thể mất một thời gian dài bởi vì nếu khóa đã được mua lại, nó sẽ chặn cho đến khi khóa được phát hành. Điều này có thể trở thành một vấn đề khi nhiều tiến trình đang cạnh tranh để truy cập vào cùng một dữ liệu, như trong mã của bạn. Bởi vì tôi không thể xem được pastebin của bạn, tôi chỉ có thể đoán chính xác những gì đang diễn ra, nhưng rất có thể, bạn đang xử lý khóa trong một thời gian dài khiến các quá trình khác không chạy, ngay cả khi có nhiều thời gian CPU miễn phí. Điều này không nên bị ảnh hưởng bởi GIL vì điều đó chỉ nên hạn chế các ứng dụng đa luồng, chứ không phải các ứng dụng đa xử lý. Vì vậy, làm thế nào để sửa lỗi này? Tôi đoán là bạn có một số khóa bảo vệ mảng được chia sẻ của bạn đang bị khóa trong khi quá trình đang thực hiện các phép tính chuyên sâu mất một thời gian tương đối dài, do đó chặn quyền truy cập cho các quy trình khác, sau đó chặn khóa lock.acquire() cuộc gọi. Giả sử bạn có đủ RAM, tôi xác nhận mạnh mẽ câu trả lời cho thấy lưu trữ nhiều bản sao của mảng trong không gian địa chỉ của mỗi quá trình. Tuy nhiên, chỉ cần lưu ý rằng việc chuyển các cấu trúc dữ liệu lớn thông qua bản đồ có thể gây tắc nghẽn bất ngờ, vì nó đòi hỏi phải chọn và bỏ đi.

Các vấn đề liên quan