2016-05-26 20 views
5

Tôi đang sử dụng một biến phát sóng khoảng 100 MB ngâm trong kích thước, mà tôi đang xấp xỉ với:Mẹo để sử dụng đúng các biến phát rộng lớn?

>>> data = list(range(int(10*1e6))) 
>>> import cPickle as pickle 
>>> len(pickle.dumps(data)) 
98888896 

Chạy trên một cluster với 3 Chấp hành viên c3.2xlarge, và một tài xế m3.large, với sau lệnh tung ra phiên tương tác:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g 

Trong một RDD, nếu tôi kiên trì một tham chiếu đến biến phát sóng này, các phát nổ sử dụng bộ nhớ. Đối với 100 tài liệu tham khảo đến một biến 100 MB, ngay cả khi nó đã được sao chép 100 lần, tôi mong đợi việc sử dụng dữ liệu không quá 10 GB tổng số (hãy để một mình 30 GB trên 3 nút). Tuy nhiên, tôi thấy ra lỗi bộ nhớ khi tôi chạy thử nghiệm sau đây:

data = list(range(int(10*1e6))) 
metadata = sc.broadcast(data) 
ids = sc.parallelize(zip(range(100), range(100))) 
joined_rdd = ids.mapValues(lambda _: metadata.value) 
joined_rdd.persist() 
print('count: {}'.format(joined_rdd.count())) 

Các stack trace:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main 
    process() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func 
    return func(split, prev_func(split, iterator)) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func 
    return f(iterator) 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr> 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream 
    yield self._read_with_length(stream) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length 
    return self.loads(obj) 
    File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads 
    return pickle.loads(obj) 
MemoryError 


    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-1-7a262fdfa561> in <module>() 
     7 joined_rdd.persist() 
     8 print('persist called') 
----> 9 print('count: {}'.format(joined_rdd.count())) 

/usr/lib/spark/python/pyspark/rdd.py in count(self) 
    1004   3 
    1005   """ 
-> 1006   return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    1007 
    1008  def stats(self): 

/usr/lib/spark/python/pyspark/rdd.py in sum(self) 
    995   6.0 
    996   """ 
--> 997   return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    998 
    999  def count(self): 

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op) 
    869   # zeroValue provided to each partition is unique from the one provided 
    870   # to the final reduce call 
--> 871   vals = self.mapPartitions(func).collect() 
    872   return reduce(op, vals, zeroValue) 
    873 

/usr/lib/spark/python/pyspark/rdd.py in collect(self) 
    771   """ 
    772   with SCCallSiteSync(self.context) as css: 
--> 773    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    774   return list(_load_from_socket(port, self._jrdd_deserializer)) 
    775 

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 

    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

Tôi đã nhìn thấy bài trước về cách sử dụng bộ nhớ của dưa deserialization là một vấn đề. Tuy nhiên, tôi sẽ mong đợi một biến phát sóng chỉ được deserialized (và được nạp vào bộ nhớ trên một thực thi) một lần, và tham khảo tiếp theo để .value để tham chiếu rằng địa chỉ trong bộ nhớ. Đó không phải là trường hợp, tuy nhiên. Tui bỏ lỡ điều gì vậy?

Ví dụ tôi đã thấy với các biến phát sóng có các từ điển, được sử dụng một lần để chuyển đổi tập hợp dữ liệu (nghĩa là thay thế từ viết tắt tên sân bay bằng tên sân bay). Động lực thúc đẩy họ ở đây là tạo ra các đối tượng với kiến ​​thức về một biến phát sóng và cách tương tác với nó, duy trì các đối tượng đó và thực hiện nhiều tính toán bằng cách sử dụng chúng (với tia lửa chăm sóc giữ chúng trong bộ nhớ).

Một số mẹo để sử dụng các biến phát rộng (100 MB +) là gì? Có tồn tại một biến phát sóng sai lầm không? Đây có phải là vấn đề có thể xảy ra với PySpark không?

Cảm ơn bạn! Trợ giúp của bạn được đánh giá cao.

Lưu ý, tôi cũng đã đăng câu hỏi này trên databricks forums

Edit - Câu hỏi của followup:

Có ý kiến ​​cho rằng serializer Spark mặc định có kích thước lô 65337. Đối tượng đăng trên khác nhau các lô không được xác định là giống nhau và được chỉ định các địa chỉ bộ nhớ khác nhau, được kiểm tra tại đây thông qua hàm dựng sẵn id. Tuy nhiên, ngay cả với một biến phát sóng lớn hơn mà theo lý thuyết sẽ có 256 lô để tuần tự hóa, tôi vẫn chỉ thấy 2 bản sao riêng biệt. Tôi không nên thấy nhiều hơn? Là sự hiểu biết của tôi về cách serialization hàng loạt hoạt động không chính xác?

>>> sc.serializer.bestSize 
65536 
>>> import cPickle as pickle 
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))} 
>>> len(pickle.dumps(broadcast_data)) 
16777786 
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))}))/sc.serializer.bestSize 
256 
>>> bd = sc.broadcast(broadcast_data) 
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value) 
>>> rdd.map(id).distinct().count() 
1 
>>> rdd.cache().count() 
100 
>>> rdd.map(id).distinct().count() 
2 

Trả lời

5

Vâng, ma quỷ là chi tiết. Để hiểu lý do tại sao điều này có thể xảy ra, chúng tôi sẽ phải xem xét kỹ hơn về bộ lặp tuần tự PySpark.Đầu tiên cho phép tạo SparkContext với các thiết lập mặc định:

from pyspark import SparkContext 

sc = SparkContext("local", "foo") 

và kiểm tra một serializer mặc định là gì:

sc.serializer 
## AutoBatchedSerializer(PickleSerializer()) 

sc.serializer.bestSize 
## 65536 

Nó cho chúng ta biết ba điều khác nhau:

  • đây là AutoBatchedSerializer serializer
  • nó đang sử dụng PickleSerializer để thực hiện công việc thực tế
  • bestSize của serialized batched là 65536 bytes

Nhìn lướt qua at the source code sẽ cho bạn thấy rằng serialize này điều chỉnh số lượng hồ sơ đăng vào thời điểm trên thời gian chạy và cố gắng giữ kích thước hàng loạt dưới 10 * bestSize. Điểm quan trọng là không phải tất cả các bản ghi trong phân vùng đơn được tuần tự hóa cùng một lúc.

Chúng tôi có thể kiểm tra bằng thực nghiệm như sau:

from operator import add 

bd = sc.broadcast({}) 

rdd = sc.parallelize(range(10), 1).map(lambda _: bd.value) 
rdd.map(id).distinct().count() 
## 1 

rdd.cache().count() 
## 10 

rdd.map(id).distinct().count() 
## 2 

Như bạn có thể thấy ngay cả trong ví dụ này đơn giản sau khi serialization-deserialization chúng tôi nhận được hai đối tượng riêng biệt. Bạn có thể quan sát hành vi tương tự làm việc trực tiếp với pickle:

v = {} 
vs = [v, v, v, v] 

v1, *_, v4 = pickle.loads(pickle.dumps(vs)) 
v1 is v4 
## True 

(v1_, v2_), (v3_, v4_) = (
    pickle.loads(pickle.dumps(vs[:2])), 
    pickle.loads(pickle.dumps(vs[2:])) 
) 

v1_ is v4_ 
## False 

v3_ is v4_ 
## True 

giá trị đăng trên các tài liệu tham khảo hàng loạt cùng, sau khi unpickling, cùng một đối tượng. Giá trị từ các lô khác nhau trỏ đến các đối tượng khác nhau.

Trong thực tế, Spark nhiều serializes và các chiến lược tuần tự hóa khác nhau. Bạn có thể ví dụ như sử dụng lô kích thước vô hạn:

from pyspark.serializers import BatchedSerializer, PickleSerializer 

rdd_ = (sc.parallelize(range(10), 1).map(lambda _: bd.value) 
    ._reserialize(BatchedSerializer(PickleSerializer()))) 
rdd_.cache().count() 

rdd_.map(id).distinct().count() 
## 1 

Bạn có thể thay đổi serializer bằng cách thông qua serializer và/hoặc batchSize tham số để SparkContext constructor:

sc = SparkContext(
    "local", "bar", 
    serializer=PickleSerializer(), # Default serializer 
    # Unlimited batch size -> BatchedSerializer instead of AutoBatchedSerializer 
    batchSize=-1 
) 

sc.serializer 
## BatchedSerializer(PickleSerializer(), -1) 

Chọn serializers khác nhau và chia phần theo chiến lược kết quả trong thương mại khác nhau -offs (tốc độ, khả năng serialize các đối tượng tùy ý, yêu cầu bộ nhớ, vv). Bạn cũng nên nhớ rằng các biến phát sóng trong Spark không được chia sẻ giữa các chuỗi thực thi vì vậy cùng một nhân viên có thể tồn tại nhiều bản sao được tuần tự hóa cùng một lúc.

Hơn nữa, bạn sẽ thấy một hành vi tương tự với điều này nếu bạn thực hiện chuyển đổi yêu cầu xáo trộn.

+0

Bạn có nhớ nhận xét thêm về sự cân bằng giữa các chiến lược tuần tự hóa không? Với kích thước hàng loạt tăng lên, chúng ta có nên mong đợi nhiều bộ nhớ hơn cần thiết để tuần tự hóa không? Làm thế nào nó sẽ ảnh hưởng đến tốc độ serialization? Tại sao một trong những sẽ chọn một serializer mà không thể serialize các đối tượng tùy ý? – captaincapsaicin

+0

Vâng, phân vùng đầy đủ có thể không phù hợp trong bộ nhớ vì vậy nếu lô là vô hạn không có đảm bảo nó sẽ thành công. Thats cho người mới bắt đầu. Sử dụng bộ nhớ cao hơn có thể dẫn đến các vấn đề GC khác nhau. Về câu hỏi cuối cùng của bạn khá nhiều không serializer có thể xử lý một đối tượng tùy ý, đặc biệt là nếu điều này là interfacing với một mã nguồn gốc. Có một số cấu trúc ngôn ngữ không thể được tuần tự hóa theo mặc định bởi thiết kế (như biểu thức lambda) và yêu cầu các công cụ chuyên biệt. Mặt khác, việc đóng gói serialization phức tạp có thể chậm. – zero323

+0

Tôi cũng đã chỉnh sửa câu hỏi tiếp theo cho câu hỏi ban đầu của mình. Bạn có thể xem xét và làm rõ cách kích thước lô tương quan với số lượng các đối tượng riêng biệt mà chúng ta thấy trong bộ nhớ Spark? – captaincapsaicin

Các vấn đề liên quan