2016-03-15 13 views
8

Tôi muốn đăng nhập vào bộ ghi chuẩn bên trong một người thực hiện trong quá trình chuyển đổi với mức nhật ký và định dạng được tôn trọng. Thật không may tôi không thể truy cập vào đối tượng logger log4j bên trong phương thức vì nó không thể tuần tự hóa được và ngữ cảnh tia lửa không có sẵn bên trong phép biến đổi. Tôi chỉ có thể đăng nhập bên ngoài của việc chuyển đổi tất cả các đối tượng mà tôi sẽ chạm vào nhưng điều đó không thực sự giúp gỡ lỗi hoặc giám sát việc thực thi mã.Trong PySpark, làm thế nào tôi có thể đăng nhập vào log4j từ bên trong một phép biến đổi

def slow_row_contents_fetch(row): 
    rows = fetch_id_row_contents(row) # API fetch, DB fetch, etc 
    # This shows up, but not controllable by log level 
    print "Processed slow row with {} results".format(len(rows)) 
    return rows 

sc.parallelize(fetchable_ids).flatMap(slow_row_contents_fetch, True) 

Bên ngoài việc chuyển đổi tôi có thể lấy logger qua:

logger = sc._jvm.org.apache.log4j.LogManager.getRootLogger() 
logger.warn('This will show up as expected') 

Nhưng sc không có sẵn bên trong việc chuyển đổi, vì lý do tốt. Bạn thấy thông báo sau nếu bạn cố gắng gọi sc trực tiếp bên trong việc chuyển đổi:

Ngoại lệ: Có vẻ như bạn đang cố gắng để tham khảo SparkContext từ một biến phát sóng, hành động, hoặc transforamtion. SparkContext chỉ có thể được sử dụng trên trình điều khiển, không phải trong mã mà nó chạy trên công nhân. Để biết thêm thông tin, xem SPARK-5063.

Tôi chỉ có thể in nhưng không dễ lọc và chỉ được theo dõi dưới dạng thông báo lỗi chưa được định dạng cho nhật ký log4j.

Serializing logger chính nó, như trừ, thất bại khi gọi các logger trong chuyển đổi chức năng:

... 
File "/usr/lib/python2.7/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 36, in deco 
    File "/usr/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 304, in get_return_value 
py4j.protocol.Py4JError: An error occurred while calling o36.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333) 
     at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342) 
     at py4j.Gateway.invoke(Gateway.java:252) 
     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
     at py4j.commands.CallCommand.execute(CallCommand.java:79) 
     at py4j.GatewayConnection.run(GatewayConnection.java:207) 
     at java.lang.Thread.run(Thread.java:745) 

Có cách nào để có được quyền truy cập vào các logger chấp hành viên trong biến đổi trong pyspark?

Trả lời

7

Sau một vài giờ đào vào kho chứa tia lửa, có vẻ như điều này là không thể đạt được hiện tại. Người thực thi không thực sự có một cá thể jvm mà nó được gắn vào, dữ liệu chỉ được truyền qua socket mà không có ràng buộc bản địa jvm để sử dụng.

Dưới đây là đoạn code tạo người lao động mà dòng thông báo lỗi để stderr:

private def createSimpleWorker(): Socket = { 
    ... 
    val worker = pb.start() 

    // Redirect worker stdout and stderr 
    redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream) 

    ... 
} 

/** 
* Redirect the given streams to our stderr in separate threads. 
*/ 
private def redirectStreamsToStderr(stdout: InputStream, stderr: InputStream) { 
    try { 
    new RedirectThread(stdout, System.err, "stdout reader for " + pythonExec).start() 
    new RedirectThread(stderr, System.err, "stderr reader for " + pythonExec).start() 
    } catch { 
    case e: Exception => 
     logError("Exception in redirecting streams", e) 
    } 
} 

Và đây là mã worker.py để giao tiếp quá trình xử lý công việc. Không có nơi nào để phát ra các thông điệp tường trình hoặc loại thông báo cho biết sự kiện nhật ký.

try: 
    ... 
    command = pickleSer._read_with_length(infile) 
    if isinstance(command, Broadcast): 
     command = pickleSer.loads(command.value) 
    func, profiler, deserializer, serializer = command 
    init_time = time.time() 

    def process(): 
     iterator = deserializer.load_stream(infile) 
     serializer.dump_stream(func(split_index, iterator), outfile) 

    if profiler: 
     profiler.profile(process) 
    else: 
     process() 
except Exception: 
    try: 
     write_int(SpecialLengths.PYTHON_EXCEPTION_THROWN, outfile) 
     write_with_length(traceback.format_exc().encode("utf-8"), outfile) 
    except IOError: 
     # JVM close the socket 
     pass 
    except Exception: 
     # Write the error to stderr if it happened while serializing 
     print("PySpark worker failed with exception:", file=sys.stderr) 
     print(traceback.format_exc(), file=sys.stderr) 
    exit(-1) 
finish_time = time.time() 
report_times(outfile, boot_time, init_time, finish_time) 
write_long(shuffle.MemoryBytesSpilled, outfile) 
write_long(shuffle.DiskBytesSpilled, outfile) 

# Mark the beginning of the accumulators section of the output 
write_int(SpecialLengths.END_OF_DATA_SECTION, outfile) 
write_int(len(_accumulatorRegistry), outfile) 
for (aid, accum) in _accumulatorRegistry.items(): 
    pickleSer._write_with_length((aid, accum._value), outfile) 
... 

Và cuối cùng là các loại tin nhắn có sẵn:

class SpecialLengths(object): 
    END_OF_DATA_SECTION = -1 
    PYTHON_EXCEPTION_THROWN = -2 
    TIMING_DATA = -3 
    END_OF_STREAM = -4 
    NULL = -5 
1

Có xem xét câu hỏi này

Similar situation

Bạn có thể nhận chức năng bản đồ của bạn để trả lại cho bạn một đối tượng có thể chứa một chuỗi dấu vết ngăn xếp hoặc một đối tượng thực, và một cờ bool cho biết nếu có lỗi. Điều này có thể hữu ích để gỡ lỗi một tác vụ có tác dụng phụ hoặc nếu bạn có các điều kiện dữ liệu cụ thể gây ra lỗi.

+0

Rất tiếc, điều này không đáp ứng được những hạn chế của những gì tôi đã cố gắng đạt được, nhưng đó là một ý tưởng thú vị. Trong tình huống tôi đã ở trong tôi muốn in ra một số thông tin gỡ lỗi nhưng có các hàng bị ảnh hưởng vì có một số giai đoạn sau đó phụ thuộc vào những đầu ra đó. – Pyrce

+0

Có thể bạn có thể tạo một hàm bản đồ trả về một đối tượng có chứa các thư bạn muốn và dữ liệu? Sau đó, bạn có thể tách dữ liệu và các tin nhắn với hai chức năng bản đồ đơn giản, và sau đó sử dụng dữ liệu trong phần còn lại của công việc sau này của bạn. – ThatDataGuy

+0

Khó thực hiện hơn trong các hàm UDI của Dataframe vì tôi cần phải thêm một cột dữ liệu khác cho các thư, nhưng đó là một công việc hợp lý xung quanh trong một số điều kiện cho điều tôi muốn thực sự đạt được – Pyrce

Các vấn đề liên quan