Tôi có một chương trình Spark rất đơn giản (sử dụng Flambo trong Clojure, nhưng phải dễ theo dõi). Đây là tất cả các đối tượng trên JVM. Tôi đang thử nghiệm trên một ví dụ local
(mặc dù tôi sẽ đoán rằng Spark vẫn nối tiếp và deserialises).Sử dụng JodaTime trong nhóm của SparkByKey và countByKey
(let [dt (t/date-time 2014)
input (f/parallelize sc [{:the-date dt :x "A"}
{:the-date dt :x "B"}
{:the-date dt :x "C"}
{:the-date dt :x "D"}])
by-date (f/map input (f/fn [{the-date :the-date x :x}] [the-date x])))
Nhập là RDD của bốn bộ, mỗi bộ có cùng một đối tượng ngày. Bản đồ đầu tiên tạo ra một giá trị khóa-giá trị RDD của ngày => x.
Nội dung của input
được, như mong đợi:
=> (f/foreach input prn)
[#<DateTime 2014-01-01T00:00:00.000Z> "A"]
[#<DateTime 2014-01-01T00:00:00.000Z> "B"]
[#<DateTime 2014-01-01T00:00:00.000Z> "C"]
[#<DateTime 2014-01-01T00:00:00.000Z> "D"]
Chỉ cần được rõ ràng, bình đẳng và .hashCode
làm việc trên các đối tượng date:
=> (= dt dt)
true
=> (.hashCode dt)
1260848926
=> (.hashCode dt)
1260848926
Họ là trường hợp của JodaTime của DateTime, mà implement equals as expected .
Khi tôi cố gắng countByKey
, tôi nhận được mong đợi:
=> (f/count-by-key by-date)
{#<DateTime 2014-01-01T00:00:00.000Z> 4}
Nhưng khi tôi groupByKey
, nó dường như không làm việc.
=> (f/foreach (f/group-by-key by-date) prn)
[#<DateTime 2014-01-01T00:00:00.000Z> ["A"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["B"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["C"]]
[#<DateTime 2014-01-01T00:00:00.000Z> ["D"]]
Các phím đều giống hệt nhau vì vậy tôi mong chờ kết quả là một mục duy nhất với ngày là chìa khóa và ["A", "B", "C", "D"]
như giá trị. Một cái gì đó đang xảy ra bởi vì các giá trị là tất cả các danh sách.
Bằng cách nào đó, groupByKey
không được viết đúng các phím. Nhưng countByKey
là. Sự khác nhau giữa hai cái là gì? Làm thế nào tôi có thể làm cho họ cư xử như nhau?
Bất kỳ ý tưởng nào?
có thể bạn có thể sử dụng thời gian epoch trong biểu diễn millis thay vì đối tượng ngày/giờ. Có vẻ như là giải pháp thay thế an toàn hơn. Chúng tôi đã gặp phải vấn đề về khóa dữ liệu với các băm khác dựa trên vị trí bộ nhớ, như Java enums. Chúng không hoạt động trên môi trường phân tán. – maasg
Cảm ơn, đó là những gì tôi (nghĩ rằng tôi) đề xuất với Instant. Điều tốt để biết tôi không phải là người duy nhất có vấn đề này! – Joe
Bạn có đang sử dụng sự kết hợp không đồng nhất của các thứ tự thời gian và các múi giờ trong một RDD không? Nếu không, tôi sẽ duy trì thông tin đó ở cấp độ RDD và lưu vào bộ nhớ của mỗi bản ghi (như bạn đang thực hiện với 'Instant') – climbage