2016-05-11 14 views
8

Tôi gửi mã của mình đến cụm sao độc lập. Gửi lệnh giống như dưới đây:Đây có phải là lỗi của luồng tia lửa hoặc rò rỉ bộ nhớ không?

nohup ./bin/spark-submit \ 
--master spark://ES01:7077 \ 
--executor-memory 4G \ 
--num-executors 1 \ 
--total-executor-cores 1 \ 
--conf "spark.storage.memoryFraction=0.2" \ 
./myCode.py 1>a.log 2>b.log & 

Tôi chỉ định người thực thi sử dụng bộ nhớ 4G ở trên lệnh. Nhưng sử dụng lệnh trên cùng để theo dõi quá trình thực thi, tôi nhận thấy việc sử dụng bộ nhớ ngày càng tăng. Bây giờ đầu ra Lệnh trên cùng bên dưới:

PID USER  PR NI VIRT RES SHR S %CPU %MEM  TIME+ COMMAND                                      
12578 root  20 0 20.223g 5.790g 23856 S 61.5 37.3 20:49.36 java  

Tổng bộ nhớ của tôi là 16G nên 37,3% đã lớn hơn 4GB tôi chỉ định. Và nó vẫn đang phát triển.

Sử dụng lệnh ps, bạn có thể biết đó là quá trình thực thi.

[[email protected]ES01 ~]# ps -awx | grep spark | grep java 
10409 ?  Sl  1:43 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --ip ES01 --port 7077 --webui-port 8080 
10603 ?  Sl  6:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4G -Xmx4G -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://ES01:7077 
12420 ?  Sl 10:16 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ES01:7077 --conf spark.storage.memoryFraction=0.2 --executor-memory 4G --num-executors 1 --total-executor-cores 1 /opt/flowSpark/sparkStream/ForAsk01.py 
12578 ?  Sl 21:03 java -cp /opt/spark-1.6.0-bin-hadoop2.6/conf/:/opt/spark-1.6.0-bin-hadoop2.6/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar:/opt/spark-1.6.0-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/opt/hadoop-2.6.2/etc/hadoop/ -Xms4096M -Xmx4096M -Dspark.driver.port=52931 -XX:MaxPermSize=256m org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://[email protected]:52931 --executor-id 0 --hostname 10.79.148.184 --cores 1 --app-id app-20160511080701-0013 --worker-url spark://[email protected]:52660 

Dưới đây là mã. Nó rất đơn giản vì vậy tôi không nghĩ rằng có rò rỉ bộ nhớ

if __name__ == "__main__": 

    dataDirectory = '/stream/raw' 

    sc = SparkContext(appName="Netflow") 
    ssc = StreamingContext(sc, 20) 

    # Read CSV File 
    lines = ssc.textFileStream(dataDirectory) 

    lines.foreachRDD(process) 

    ssc.start() 
    ssc.awaitTermination() 

Mã cho chức năng xử lý bên dưới. Xin lưu ý rằng tôi đang sử dụng HiveContext không phải SqlContext tại đây. Bởi vì SqlContext không hỗ trợ chức năng cửa sổ

def getSqlContextInstance(sparkContext): 
    if ('sqlContextSingletonInstance' not in globals()): 
     globals()['sqlContextSingletonInstance'] = HiveContext(sparkContext) 
    return globals()['sqlContextSingletonInstance'] 

def process(time, rdd): 

    if rdd.isEmpty(): 
     return sc.emptyRDD() 

    sqlContext = getSqlContextInstance(rdd.context) 

    # Convert CSV File to Dataframe 
    parts = rdd.map(lambda l: l.split(",")) 
    rowRdd = parts.map(lambda p: Row(router=p[0], interface=int(p[1]), flow_direction=p[9], bits=int(p[11]))) 
    dataframe = sqlContext.createDataFrame(rowRdd) 

    # Get the top 2 interface of each router 
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits')) 
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc()) 
    rank = func.dense_rank().over(windowSpec) 
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2") 

    ret.show() 
    dataframe.show() 

Trên thực tế tôi thấy bên dưới mã sẽ gây ra vấn đề:

# Get the top 2 interface of each router 
    dataframe = dataframe.groupBy(['router','interface']).agg(func.sum('bits').alias('bits')) 
    windowSpec = Window.partitionBy(dataframe['router']).orderBy(dataframe['bits'].desc()) 
    rank = func.dense_rank().over(windowSpec) 
    ret = dataframe.select(dataframe['router'],dataframe['interface'],dataframe['bits'], rank.alias('rank')).filter("rank<=2") 
    ret.show() 

vì Nếu tôi loại bỏ những 5 dòng. Mã có thể chạy cả đêm mà không hiển thị tăng bộ nhớ. Nhưng việc thêm chúng sẽ khiến cho việc sử dụng bộ nhớ của người thực thi phát triển lên một con số rất cao.

Về cơ bản, mã trên chỉ là một số cửa sổ + grouby trong SparkSQL. Vậy đây có phải là lỗi không?

+0

Related http://stackoverflow.com/q/37283624/1560062? – zero323

+0

@ zero323 cảm ơn. Nhưng tôi đang sử dụng spark1.6.1 –

Trả lời

0

Như tôi có thể thấy trong 5 dòng của bạn, có thể groupBy là vấn đề, bạn sẽ thử với reduceBy và xem cách nó hoạt động.

Xem herehere.

+0

cảm ơn thông tin. Nhưng tôi hy vọng sẽ biết nếu nó là một lỗi hoặc tôi không sử dụng nó đúng cách. –

+0

@Tristan Nó không phải là cùng một nhómBởi như trên RDD, Xem http://stackoverflow.com/q/32902982/1560062 – zero323

+0

Tôi giả định tệp csv này được lưu trữ trên HDFS. kích thước của nó là gì? bao nhiêu nó phát triển/thay đổi và trong những gì tần số. những gì tôi đang cố gắng để hiểu là bao nhiêu dữ liệu để bạn cần phải xử lý tại mỗi khoảng thời gian hàng loạt và khoảng thời gian đó (1 giây theo mặc định) là gì? –

3

Disclaimer: câu trả lời này không dựa trên gỡ lỗi, nhưng thêm về những quan sát và tài liệu Apache Spark cung cấp

Tôi không tin rằng đây là một lỗi để bắt đầu với!

Nhìn vào cấu hình của bạn, chúng ta có thể thấy rằng bạn đang tập trung chủ yếu vào điều chỉnh của người điều hành, điều này không sai, nhưng bạn quên phần trình điều khiển của phương trình.

Nhìn vào tia lửa tổng quan về cụm từ Apache Spark documentaion

enter image description here

Như bạn có thể thấy, mỗi công nhân có một chấp hành viên, tuy nhiên, trong trường hợp của bạn, nút công nhân cũng giống như các nút điều khiển! Trường hợp đó là trường hợp khi bạn chạy cục bộ hoặc trên một cụm độc lập trong một nút duy nhất.

Hơn nữa, trình điều khiển mất 1G bộ nhớ theo mặc định trừ khi được điều chỉnh bằng cờ spark.driver.memory.Hơn nữa, bạn không nên quên việc sử dụng heap từ bản thân JVM, và giao diện người dùng web cũng được người lái xe chăm sóc quá AFAIK!

Khi bạn xóa các dòng mã bạn đã đề cập, mã của bạn còn lại mà không có hành động như chức năng map chỉ là một phép chuyển đổi, do đó, bạn sẽ không thấy tăng bộ nhớ !

Tương tự áp dụng trên groupBy vì nó chỉ là một phép biến đổi sẽ không được thực hiện trừ khi một hành động được gọi trong trường hợp của bạn là aggshow tiếp tục xuống luồng!

Điều đó nói rằng, hãy cố gắng giảm thiểu bộ nhớ trình điều khiển của bạn và tổng số lõi trong tia lửa được xác định bởi spark.cores.max nếu bạn muốn kiểm soát số lõi trên quy trình này, sau đó xếp xuống các trình thực thi. Hơn nữa, tôi sẽ thêm spark.python.profile.dump vào danh sách cấu hình của bạn để bạn có thể xem tiểu sử để thực hiện công việc spark, có thể giúp bạn hiểu rõ hơn về trường hợp và điều chỉnh cụm của bạn theo nhu cầu của bạn.

+0

Xin chào, Cảm ơn bạn đã trả lời. Nhưng 1. Tôi vẫn có một dataframe.show() sau khi loại bỏ những dòng. Vì vậy, vẫn còn có một hành động. 2. Trong trường hợp của tôi, tính toán luồng có thể chạy trong vài giờ. Có nghĩa là hàng ngàn vòng (khoảng thời gian là 20 giây). Trong thời gian này việc sử dụng bộ nhớ của người thực thi tiếp tục phát triển. Vì vậy, tôi không biết giải pháp được đề xuất của bạn là gì. Giảm thiểu bộ nhớ trình điều khiển của tôi? Tại sao? –

+0

Tôi không nhận ra rằng chương trình không phải là một phần của mã đã bị xóa. Đối với bộ nhớ, tôi đang xây dựng dựa trên cố gắng để giảm thiểu nó và xem nếu nó sẽ tràn! –

Các vấn đề liên quan