Tôi gặp sự cố tương tự khi đọc biểu đồ từ tệp. Việc xử lý bao gồm việc tính toán ma trận phao 200 000x200 000 (một dòng tại một thời điểm) không phù hợp với bộ nhớ. Cố gắng giải phóng bộ nhớ giữa các tính toán bằng cách sử dụng gc.collect()
cố định khía cạnh liên quan đến bộ nhớ nhưng vấn đề hiệu suất: Tôi không biết tại sao nhưng mặc dù số lượng bộ nhớ đã sử dụng không đổi, mỗi cuộc gọi mới đến gc.collect()
thời gian hơn so với trước đó. Vì vậy, khá nhanh chóng việc thu gom rác chiếm phần lớn thời gian tính toán.
Để khắc phục cả vấn đề về bộ nhớ và hiệu năng tôi đã chuyển sang sử dụng mẹo đa luồng tôi đã đọc ở đâu đó (xin lỗi, tôi không thể tìm thấy bài đăng liên quan nữa). Trước khi tôi đọc từng dòng của tập tin trong một vòng lặp lớn for
, xử lý nó và chạy gc.collect()
mỗi một lần và một thời gian để giải phóng dung lượng bộ nhớ. Bây giờ tôi gọi một hàm đọc và xử lý một đoạn của tập tin trong một chủ đề mới. Khi thread kết thúc, bộ nhớ sẽ tự động được giải phóng mà không có vấn đề về hiệu suất lạ.
Thực tế nó hoạt động như thế này:
from dask import delayed // this module wraps the multithreading
def f(storage, index, chunk_size): // the processing function
// read the chunk of size chunk_size starting at index in the file
// process it using data in storage if needed
// append data needed for further computations to storage
return storage
partial_result = delayed([]) // put into the delayed() the constructor for your data structure
// i personally use "delayed(nx.Graph())" since I am creating a networkx Graph
chunk_size = 100 // ideally you want this as big as possible while still enabling the computations to fit in memory
for index in range(0, len(file), chunk_size):
// we indicates to dask that we will want to apply f to the parameters partial_result, index, chunk_size
partial_result = delayed(f)(partial_result, index, chunk_size)
// no computations are done yet !
// dask will spawn a thread to run f(partial_result, index, chunk_size) once we call partial_result.compute()
// passing the previous "partial_result" variable in the parameters assures a chunk will only be processed after the previous one is done
// it also allows you to use the results of the processing of the previous chunks in the file if needed
// this launches all the computations
result = partial_result.compute()
// one thread is spawned for each "delayed" one at a time to compute its result
// dask then closes the tread, which solves the memory freeing issue
// the strange performance issue with gc.collect() is also avoided
Tại sao không in ra tam giác để một file trung gian, và đọc chúng trở lại trong một lần nữa khi bạn cần đến chúng? –