Tôi đang sử dụng một biến phát sóng khoảng 100 MB ngâm trong kích thước, mà tôi đang xấp xỉ với:Mẹo để sử dụng đúng các biến phát rộng lớn?
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
Chạy trên một cluster với 3 Chấp hành viên c3.2xlarge, và một tài xế m3.large, với sau lệnh tung ra phiên tương tác:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
Trong một RDD, nếu tôi kiên trì một tham chiếu đến biến phát sóng này, các phát nổ sử dụng bộ nhớ. Đối với 100 tài liệu tham khảo đến một biến 100 MB, ngay cả khi nó đã được sao chép 100 lần, tôi mong đợi việc sử dụng dữ liệu không quá 10 GB tổng số (hãy để một mình 30 GB trên 3 nút). Tuy nhiên, tôi thấy ra lỗi bộ nhớ khi tôi chạy thử nghiệm sau đây:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Các stack trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Tôi đã nhìn thấy bài trước về cách sử dụng bộ nhớ của dưa deserialization là một vấn đề. Tuy nhiên, tôi sẽ mong đợi một biến phát sóng chỉ được deserialized (và được nạp vào bộ nhớ trên một thực thi) một lần, và tham khảo tiếp theo để .value
để tham chiếu rằng địa chỉ trong bộ nhớ. Đó không phải là trường hợp, tuy nhiên. Tui bỏ lỡ điều gì vậy?
Ví dụ tôi đã thấy với các biến phát sóng có các từ điển, được sử dụng một lần để chuyển đổi tập hợp dữ liệu (nghĩa là thay thế từ viết tắt tên sân bay bằng tên sân bay). Động lực thúc đẩy họ ở đây là tạo ra các đối tượng với kiến thức về một biến phát sóng và cách tương tác với nó, duy trì các đối tượng đó và thực hiện nhiều tính toán bằng cách sử dụng chúng (với tia lửa chăm sóc giữ chúng trong bộ nhớ).
Một số mẹo để sử dụng các biến phát rộng (100 MB +) là gì? Có tồn tại một biến phát sóng sai lầm không? Đây có phải là vấn đề có thể xảy ra với PySpark không?
Cảm ơn bạn! Trợ giúp của bạn được đánh giá cao.
Lưu ý, tôi cũng đã đăng câu hỏi này trên databricks forums
Edit - Câu hỏi của followup:
Có ý kiến cho rằng serializer Spark mặc định có kích thước lô 65337. Đối tượng đăng trên khác nhau các lô không được xác định là giống nhau và được chỉ định các địa chỉ bộ nhớ khác nhau, được kiểm tra tại đây thông qua hàm dựng sẵn id
. Tuy nhiên, ngay cả với một biến phát sóng lớn hơn mà theo lý thuyết sẽ có 256 lô để tuần tự hóa, tôi vẫn chỉ thấy 2 bản sao riêng biệt. Tôi không nên thấy nhiều hơn? Là sự hiểu biết của tôi về cách serialization hàng loạt hoạt động không chính xác?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Bạn có nhớ nhận xét thêm về sự cân bằng giữa các chiến lược tuần tự hóa không? Với kích thước hàng loạt tăng lên, chúng ta có nên mong đợi nhiều bộ nhớ hơn cần thiết để tuần tự hóa không? Làm thế nào nó sẽ ảnh hưởng đến tốc độ serialization? Tại sao một trong những sẽ chọn một serializer mà không thể serialize các đối tượng tùy ý? – captaincapsaicin
Vâng, phân vùng đầy đủ có thể không phù hợp trong bộ nhớ vì vậy nếu lô là vô hạn không có đảm bảo nó sẽ thành công. Thats cho người mới bắt đầu. Sử dụng bộ nhớ cao hơn có thể dẫn đến các vấn đề GC khác nhau. Về câu hỏi cuối cùng của bạn khá nhiều không serializer có thể xử lý một đối tượng tùy ý, đặc biệt là nếu điều này là interfacing với một mã nguồn gốc. Có một số cấu trúc ngôn ngữ không thể được tuần tự hóa theo mặc định bởi thiết kế (như biểu thức lambda) và yêu cầu các công cụ chuyên biệt. Mặt khác, việc đóng gói serialization phức tạp có thể chậm. – zero323
Tôi cũng đã chỉnh sửa câu hỏi tiếp theo cho câu hỏi ban đầu của mình. Bạn có thể xem xét và làm rõ cách kích thước lô tương quan với số lượng các đối tượng riêng biệt mà chúng ta thấy trong bộ nhớ Spark? – captaincapsaicin