2016-01-21 28 views
12

Tôi đang viết một quy trình ETL nơi tôi sẽ cần đọc các tệp nhật ký theo giờ, phân vùng dữ liệu và lưu nó. Tôi đang sử dụng Spark (trong Databricks). Tệp nhật ký là CSV nên tôi đọc chúng và áp dụng lược đồ, sau đó thực hiện các phép biến đổi của tôi.Nối dữ liệu mới vào các tệp gỗ được phân đoạn

Vấn đề của tôi là, làm thế nào tôi có thể lưu dữ liệu của mỗi giờ dưới dạng định dạng sàn nhưng thêm vào tập dữ liệu hiện có? Khi lưu, tôi cần phân vùng bằng 4 cột có trong khung dữ liệu.

Đây là dòng tiết kiệm của tôi:

data 
    .filter(validPartnerIds($"partnerID")) 
    .write 
    .partitionBy("partnerID","year","month","day") 
    .parquet(saveDestination) 

Vấn đề là nếu thư mục đích tồn tại của tiết kiệm ném một lỗi. Nếu đích đến không tồn tại thì tôi sẽ không thêm các tệp của mình.

Tôi đã thử sử dụng .mode("append") nhưng tôi thấy rằng đôi khi Spark không hoạt động giữa chừng vì vậy tôi sẽ mất bao nhiêu dữ liệu của mình được ghi và tôi vẫn cần viết bao nhiêu.

Tôi đang sử dụng sàn gỗ vì phân vùng tăng đáng kể truy vấn của tôi trong tương lai. Đồng thời, tôi phải ghi dữ liệu dưới dạng một số định dạng tệp trên đĩa và không thể sử dụng cơ sở dữ liệu như Druid hoặc Cassandra.

Bất kỳ đề xuất nào về cách phân vùng khung dữ liệu của tôi và lưu tệp (gắn bó với sàn gỗ hoặc định dạng khác) được đánh giá cao.

+1

Bạn có thể chia sẻ những lỗi mà bạn nhận được khi bạn sử dụng '.mode (append)'. –

+0

Lỗi tôi nhận được là: Gây ra bởi: java.io.IOException: Tệp đã tồn tại:/tracking/v4/010316/gif = a/partnerID = 111/year = 2016/month = 1/day = 3/part -r-00147-8f30e760-3706-4e4c-bf56-e3b5515942d1.gz.parquet Tôi nghĩ rằng lỗi này được ném do tính năng lập lịch biểu tác vụ không khớp khi một số thao tác ghi mất nhiều thời gian. – Saman

Trả lời

9

Nếu bạn cần nối thêm các tệp, bạn chắc chắn phải sử dụng chế độ nối thêm. Tôi không biết có bao nhiêu phân vùng bạn mong đợi nó tạo ra, nhưng tôi thấy rằng nếu bạn có nhiều phân vùng, partitionBy sẽ gây ra một số vấn đề (vấn đề về bộ nhớ và IO).

Nếu bạn nghĩ rằng vấn đề của bạn là do các hoạt động ghi dùng quá lâu, tôi khuyên bạn nên thử hai điều này:

1) Sử dụng linh hoạt bằng cách thêm vào cấu hình:

conf.set("spark.sql.parquet.compression.codec", "snappy") 

2) Vô hiệu hoá hệ của các tập tin siêu dữ liệu trong hadoopConfiguration trên SparkContext như thế này:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

các siêu dữ liệu tập tin sẽ được phần nào thời gian consumi Để tạo ra (xem this blog post), nhưng theo số this chúng không thực sự quan trọng. Cá nhân, tôi luôn vô hiệu hóa chúng và không có vấn đề gì.

Nếu bạn tạo nhiều phân vùng (> 500), tôi e rằng điều tốt nhất tôi có thể làm là đề xuất bạn xem giải pháp không sử dụng chế độ gắn thêm - Tôi chưa bao giờ quản lý được partitionBy để làm việc với nhiều phân vùng đó.

+0

Cảm ơn Glennie. Tôi luôn luôn vô hiệu hóa các tập tin siêu dữ liệu được tạo ra chính xác vì bài đăng trên blog đó: D. Tôi chắc chắn đang tạo hơn 500 phân vùng. Tôi tin rằng hầu hết các vấn đề của tôi là phát sinh từ thực tế là định dạng sàn gỗ không có ý định được sử dụng như một định dạng cập nhật được và tôi đang xử lý nó như một bảng cơ sở dữ liệu. Bạn có bất cứ gợi ý nào cho một cách khác để lưu dữ liệu hàng ngày của tôi không? – Saman

+0

tôi có vấn đề tương tự, tôi đang phân vùng trên cơ sở dấu thời gian hiện tại, với mỗi phân vùng mới nối nó tạo ra tổng số nhiệm vụ bằng với phân vùng cho đến nay. tức là nếu có 1000 phân vùng và 1 phân vùng mới để thêm, nó sẽ chạy 1001 tác vụ và nó làm tăng tổng thời gian công việc. Tôi đang làm gì sai ở đây? –

0

Nếu bạn đang sử dụng phân vùng chưa được phân loại, dữ liệu của bạn sẽ được phân chia trên tất cả các phân vùng của bạn. Điều đó có nghĩa là mọi tác vụ sẽ tạo và ghi dữ liệu vào từng tệp đầu ra của bạn.

xem xét phân vùng lại dữ liệu của bạn theo cột phân vùng của bạn trước khi viết để có tất cả các dữ liệu cho mỗi tập tin đầu ra trên cùng một phân vùng:

data 
.filter(validPartnerIds($"partnerID")) 
.repartition([optional integer,] "partnerID","year","month","day") 
.write 
.partitionBy("partnerID","year","month","day") 
.parquet(saveDestination) 

Xem: DataFrame.repartition

Các vấn đề liên quan