2016-03-16 12 views
14

Tôi đang làm việc trên dự án phát trực tuyến Scala (2.11)/Spark (1.6.1) và sử dụng mapWithState() để theo dõi dữ liệu đã xem từ các đợt trước.Spark Bản đồ phát trực tuyếnWithState dường như xây dựng lại trạng thái hoàn chỉnh theo định kỳ

Trạng thái được phân phối trong 20 phân vùng trên nhiều nút, được tạo bằng StateSpec.function(trackStateFunc _).numPartitions(20). Trong trạng thái này, chúng tôi chỉ có một vài phím (~ 100) được ánh xạ tới Sets với hơn ~ 160.000 mục, tăng lên trong toàn bộ ứng dụng. Toàn bộ tiểu bang lên đến 3GB, có thể được xử lý bởi mỗi nút trong cụm. Trong mỗi lô, một số dữ liệu được thêm vào trạng thái nhưng không bị xóa cho đến khi kết thúc quá trình, nghĩa là ~ 15 phút.

Trong khi theo dõi giao diện người dùng ứng dụng, mỗi lần xử lý của lô thứ 10 rất cao so với các lô khác. Xem hình ảnh:

The spikes show the higher processing time.

Các trường vàng đại diện cho thời gian xử lý cao.

enter image description here

Một chi tiết hơn xem công việc cho thấy rằng trong những đợt xảy ra tại một điểm nhất định, chính xác khi nào tất cả 20 phân vùng đã được "bỏ qua". Hoặc đây là những gì giao diện người dùng nói.

enter image description here

sự hiểu biết của tôi về skipped là mỗi phân vùng nhà nước là một trong những nhiệm vụ càng tốt mà không được thực hiện, như nó không cần phải được tính toán lại. Tuy nhiên, tôi không hiểu tại sao số lượng skips thay đổi trong mỗi Công việc và tại sao Công việc cuối cùng đòi hỏi quá trình xử lý nhiều. Thời gian xử lý cao hơn xảy ra bất kể kích thước của nhà nước, nó chỉ ảnh hưởng đến thời gian.

Đây có phải là lỗi trong chức năng mapWithState() hoặc là hành vi dự định này không? Cấu trúc dữ liệu cơ bản có yêu cầu một số loại thay đổi lại không, Set trong tiểu bang có cần sao chép dữ liệu không? Hoặc là nó có nhiều khả năng là một lỗ hổng trong ứng dụng của tôi?

Trả lời

9

Đây có phải là lỗi trong chức năng mapWithState() hoặc hành vi này có ý định không?

Đây là hành vi dự định. Số lần tăng đột biến bạn thấy là bởi vì dữ liệu của bạn đang nhận được điểm kiểm tra ở cuối loạt đã cho đó. Nếu bạn nhận thấy thời gian trên các lô dài hơn, bạn sẽ thấy rằng nó xảy ra liên tục cứ 100 giây một lần. Đó là vì thời gian điểm kiểm tra là không đổi và được tính trên mỗi batchDuration, tần suất bạn nói với nguồn dữ liệu của mình để đọc một đợt nhân với một số hằng số, trừ khi bạn đặt rõ ràng khoảng thời gian DStream.checkpoint.

Đây được mảnh có liên quan của mã từ MapWithStateDStream:

override def initialize(time: Time): Unit = { 
    if (checkpointDuration == null) { 
    checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER 
    } 
    super.initialize(time) 
} 

Trường DEFAULT_CHECKPOINT_DURATION_MULTIPLIER là:

private[streaming] object InternalMapWithStateDStream { 
    private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10 
} 

Những dòng chính xác với hành vi mà bạn đang nhìn thấy, kể từ thời gian đọc mẻ của bạn là cứ sau 10 giây => 10 * 10 = 100 giây.

Điều này là bình thường và đó là chi phí của trạng thái bền bỉ với Spark. Một tối ưu hóa ở phía bạn có thể là suy nghĩ làm thế nào bạn có thể giảm thiểu kích thước của nhà nước bạn phải giữ trong bộ nhớ, để serialization này càng nhanh càng tốt. Bổ sung, hãy đảm bảo rằng dữ liệu được trải ra trong suốt các trình thực thi đủ để trạng thái được phân bố đồng đều giữa tất cả các nút. Ngoài ra, tôi hy vọng bạn đã bật Kryo Serialization thay vì tuần tự hóa Java mặc định, có thể cung cấp cho bạn hiệu suất tăng cường có ý nghĩa.

+0

Trong trường hợp của tôi, tôi có thể thấy rằng mọi công việc đều được kiểm tra trong lô. Tại sao không chỉ là công việc cuối cùng? Giải pháp của bạn để theo dõi quy mô của tiểu bang là gì? Để có thể tối ưu hóa nó. – crak

+0

@crak Khoảng thời gian kiểm tra của bạn là gì? Và làm thế nào bạn thấy rằng mọi công việc đều kiểm tra dữ liệu? –

+0

Cứ 10 lô. Mắt tôi đã bị lạm dụng, tôi có 12 công việc trên 16 mà làm checkpoint. Và đó là logic, tôi có 12 mapWithState, tôi có thể thấy dấu chân trong tia lửa. Nhưng không biết cái nào có kích thước lớn nhất. mapWithState lưu trữ chỉ cần nhà nước không giống như cấy ghép trước đó? – crak

1

Ngoài câu trả lời được chấp nhận, chỉ ra giá của serialization liên quan đến checkpointing, có một vấn đề khác ít được biết đến, có thể góp phần vào hành vi spikey: đuổi nhà nước bị xóa.

Cụ thể, trạng thái 'đã xóa' hoặc 'hết hạn' không bị xóa ngay lập tức khỏi bản đồ nhưng được đánh dấu để xóa và chỉ thực sự bị xóa trong quá trình tuần tự hóa [trong Spark 1.6.1, xem writeObjectInternal()].

này có hai ý nghĩa biểu diễn, trong đó xảy ra chỉ một lần mỗi 10 lô:

  1. Các traversal và xóa quá trình có giá của nó
  2. Nếu bạn xử lý các dòng timed-out/sự kiện xóa, ví dụ lưu giữ nó vào bộ nhớ ngoài, chi phí liên quan cho tất cả 10 lô sẽ chỉ được thanh toán tại thời điểm này (và không phải như dự kiến, trên mỗi RDD)
Các vấn đề liên quan