2017-11-11 17 views

Trả lời

12

Đây là giải pháp sử dụng (loại thử nghiệm) Base.Threads mô-đun luồng.

Giải pháp sử dụng pmap (v.v) đối với phân phối song song sẽ giống nhau. Mặc dù tôi nghĩ rằng chi phí liên lạc liên ngành sẽ làm tổn thương bạn.

Ý tưởng là sắp xếp nó theo khối (một trên mỗi sợi), vì vậy mỗi luồng có thể hoàn toàn độc lập, chỉ cần chăm sóc các khối của nó.

Sau đó, hợp nhất các khối được sắp xếp trước này.

Đây là vấn đề khá nổi tiếng khi hợp nhất danh sách được sắp xếp. Xem thêm questions về điều đó.

Và đừng quên tự thiết lập đa luồng, bằng cách đặt biến môi trường JULIA_NUM_THREADS trước khi bạn bắt đầu.

Đây là mã của tôi:

using Base.Threads 

function blockranges(nblocks, total_len) 
    rem = total_len % nblocks 
    main_len = div(total_len, nblocks) 

    starts=Int[1] 
    ends=Int[] 
    for ii in 1:nblocks 
     len = main_len 
     if rem>0 
      len+=1 
      rem-=1 
     end 
     push!(ends, starts[end]+len-1) 
     push!(starts, ends[end] + 1) 
    end 
    @assert ends[end] == total_len 
    starts[1:end-1], ends 
end 

function threadedsort!(data::Vector) 
    starts, ends = blockranges(nthreads(), length(data)) 

    # Sort each block 
    @threads for (ss, ee) in collect(zip(starts, ends)) 
     @inbounds sort!(@view data[ss:ee]) 
    end 


    # Go through each sorted block taking out the smallest item and putting it in the new array 
    # This code could maybe be optimised. see https://stackoverflow.com/a/22057372/179081 
    ret = similar(data) # main bit of allocation right here. avoiding it seems expensive. 
    # Need to not overwrite data we haven't read yet 
    @inbounds for ii in eachindex(ret) 
     minblock_id = 1 
     ret[ii]=data[starts[1]] 
     @inbounds for blockid in 2:endof(starts) # findmin allocates a lot for some reason, so do the find by hand. (maybe use findmin! ?) 
      ele = data[starts[blockid]] 
      if ret[ii] > ele 
       ret[ii] = ele 
       minblock_id = blockid 
      end 
     end 
     starts[minblock_id]+=1 # move the start point forward 
     if starts[minblock_id] > ends[minblock_id] 
      deleteat!(starts, minblock_id) 
      deleteat!(ends, minblock_id) 
     end 
    end 
    data.=ret # copy back into orignal as we said we would do it inplace 
    return data 
end 

Tôi đã làm một số điểm chuẩn:

using Plots 
function evaluate_timing(range) 
    sizes = Int[] 
    threadsort_times = Float64[] 
    sort_times = Float64[] 
     for sz in 2.^collect(range) 
      data_orig = rand(Int, sz) 
      push!(sizes, sz) 

      data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing(0:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 

kết quả của tôi: sử dụng 8 chủ đề.

plot normal scale plot loglog scale

tôi thấy điểm chéo là thấp đáng ngạc nhiên, một chút so với 1024. Ghi chú rằng thời gian dài ban đầu được thực hiện có thể được bỏ qua - đó là mã là JIT biên soạn cho người đầu tiên chạy.

Thật kỳ lạ, những kết quả này không tái tạo khi sử dụng BenchmarkTools. Các công cụ điểm chuẩn sẽ dừng tính thời gian ban đầu được tính. Nhưng chúng thường xuyên tái tạo khi sử dụng mã thời gian bình thường như tôi có trong mã điểm chuẩn ở trên. Tôi đoán nó đang làm cái gì mà giết đa luồng một số cách

Big nhờ @xiaodai người chỉ ra một sai lầm trong mã phân tích của tôi

1

tôi đã tiếp tục thử nghiệm nếu chỉ có 1% các mặt hàng là duy nhất và enter image description here cũng lấy mẫu từ 1:1_000_000. Kết quả là dưới

enter image description here evaluate_timing_w_repeats function (range) kích thước = Int [] threadsort_times = Float64 [] sort_times = Float64 [] cho sz trong 2.^thu thập (range) data_orig = rand (rand (Int, sz ÷ 100), sz) đẩy! (Kích cỡ, sz)

  data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing_w_repeats(7:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 
savefig("sort_with_repeats.png") 

function evaluate_timing1m(range) 
    sizes = Int[] 
    threadsort_times = Float64[] 
    sort_times = Float64[] 
     for sz in 2.^collect(range) 
      data_orig = rand(1:1_000_000, sz) 
      push!(sizes, sz) 

      data = copy(data_orig) 
      push!(sort_times,  @elapsed sort!(data)) 

      data = copy(data_orig) 
      push!(threadsort_times, @elapsed threadedsort!(data)) 

      @show (sz, sort_times[end], threadsort_times[end]) 
    end 
    return sizes, threadsort_times, sort_times 
end 

sizes, threadsort_times, sort_times = evaluate_timing1m(7:28) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"]) 
plot(sizes, [threadsort_times sort_times]; title="Sorting Time sampel from 1:1_000_000", ylabel="time(s)", xlabel="number of elements", label=["threadsort!" "sort!"], xscale=:log10, yscale=:log10) 
savefig("sort1m.png") 
Các vấn đề liên quan