2015-01-27 15 views
5

Tôi có một chương trình Spark rất đơn giản (sử dụng Flambo trong Clojure, nhưng phải dễ theo dõi). Đây là tất cả các đối tượng trên JVM. Tôi đang thử nghiệm trên một ví dụ local (mặc dù tôi sẽ đoán rằng Spark vẫn nối tiếp và deserialises).Sử dụng JodaTime trong nhóm của SparkByKey và countByKey

(let [dt (t/date-time 2014) 
     input (f/parallelize sc [{:the-date dt :x "A"} 
           {:the-date dt :x "B"} 
           {:the-date dt :x "C"} 
           {:the-date dt :x "D"}]) 
     by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x]))) 

Nhập là RDD của bốn bộ, mỗi bộ có cùng một đối tượng ngày. Bản đồ đầu tiên tạo ra một giá trị khóa-giá trị RDD của ngày => x.

Nội dung của input được, như mong đợi:

=> (f/foreach input prn) 
[#<DateTime 2014-01-01T00:00:00.000Z> "A"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "B"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "C"] 
[#<DateTime 2014-01-01T00:00:00.000Z> "D"] 

Chỉ cần được rõ ràng, bình đẳng và .hashCode làm việc trên các đối tượng date:

=> (= dt dt) 
true 
=> (.hashCode dt) 
1260848926 
=> (.hashCode dt) 
1260848926 

Họ là trường hợp của JodaTime của DateTime, mà implement equals as expected .

Khi tôi cố gắng countByKey, tôi nhận được mong đợi:

=> (f/count-by-key by-date) 
{#<DateTime 2014-01-01T00:00:00.000Z> 4} 

Nhưng khi tôi groupByKey, nó dường như không làm việc.

=> (f/foreach (f/group-by-key by-date) prn) 
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]] 
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]] 

Các phím đều giống hệt nhau vì vậy tôi mong chờ kết quả là một mục duy nhất với ngày là chìa khóa và ["A", "B", "C", "D"] như giá trị. Một cái gì đó đang xảy ra bởi vì các giá trị là tất cả các danh sách.

Bằng cách nào đó, groupByKey không được viết đúng các phím. Nhưng countByKey là. Sự khác nhau giữa hai cái là gì? Làm thế nào tôi có thể làm cho họ cư xử như nhau?

Bất kỳ ý tưởng nào?

Trả lời

3

Tôi đang tiến gần hơn về phía câu trả lời. Tôi nghĩ rằng điều này thuộc về phần trả lời thay vì phần câu hỏi.

Nhóm này theo khóa, chuyển thành thu thập địa phương, trích xuất mục đầu tiên (ngày).

=> (def result-dates (map first (f/collect (f/group-by-key by-date)))) 
=> result-dates 
(#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z> 
#<DateTime 2014-01-01T00:00:00.000Z>) 

Các hashcodes đều như nhau

=> (map #(.hashCode %) result-dates) 
(1260848926 
1260848926 
1260848926 
1260848926) 

Các mili giây đều như nhau:

=> (map #(.getMillis %) result-dates) 
(1388534400000 
1388534400000 
1388534400000 
1388534400000) 

equals thất bại, nhưng isEquals thành công

=> (.isEqual (first result-dates) (second result-dates)) 
true 

=> (.equals (first result-dates) (second result-dates)) 
false 

documentation for .equals says:

So sánh đối tượng này với đối tượng quy định về bình đẳng dựa trên phần nghìn giây ngay lập tức và Niên

mili giây của họ đều bình đẳng và biên niên của họ xuất hiện là:

=> (map #(.getChronology %) result-dates) 
(#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]> 
#<ISOChronology ISOChronology[UTC]>) 

Tuy nhiên, trình tự thời gian không tương đương.

=> (def a (first result-dates)) 
=> (def b (second result-dates)) 

=> (= (.getChronology a) (.getChronology b)) 
false 

Mặc dù hashcodes làm

=> (= (.hashCode (.getChronology a)) (.hashCode (.getChronology b))) 
true 

Nhưng joda.time.Chronology không cung cấp its own equals method và được thừa hưởng nó từ Object, mà chỉ sử dụng bình đẳng tham khảo.

Lý thuyết của tôi là tất cả các ngày này đều được deserialized với riêng của họ, khác nhau, xây dựng các đối tượng thời gian, nhưng JodaTime có its own serializer mà có thể đề với điều này. Có lẽ một serializer tùy chỉnh Kryo sẽ giúp đỡ trong vấn đề này.

Còn bây giờ, giải pháp của tôi để sử dụng JodaTime trong Spark là sử dụng org.joda.time .Instant bằng cách gọi toInstant, hoặc một java.util.Date chứ không phải là một org.joda.time.DateTime.

Cả hai đều liên quan đến việc loại bỏ thông tin múi giờ, điều này không lý tưởng, vì vậy nếu có ai có thêm thông tin, nó sẽ rất được hoan nghênh!

+0

có thể bạn có thể sử dụng thời gian epoch trong biểu diễn millis thay vì đối tượng ngày/giờ. Có vẻ như là giải pháp thay thế an toàn hơn. Chúng tôi đã gặp phải vấn đề về khóa dữ liệu với các băm khác dựa trên vị trí bộ nhớ, như Java enums. Chúng không hoạt động trên môi trường phân tán. – maasg

+0

Cảm ơn, đó là những gì tôi (nghĩ rằng tôi) đề xuất với Instant. Điều tốt để biết tôi không phải là người duy nhất có vấn đề này! – Joe

+0

Bạn có đang sử dụng sự kết hợp không đồng nhất của các thứ tự thời gian và các múi giờ trong một RDD không? Nếu không, tôi sẽ duy trì thông tin đó ở cấp độ RDD và lưu vào bộ nhớ của mỗi bản ghi (như bạn đang thực hiện với 'Instant') – climbage

Các vấn đề liên quan