2016-01-28 20 views
6

Im cố gắng để đọc dữ liệu từ mysql và viết nó trở lại tập tin sàn gỗ trong s3 với phân vùng cụ thể như sau:spark đọc dữ liệu từ mysql song song

df=sqlContext.read.format('jdbc')\ 
    .options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""", 
     dbtable='tbl', 
     numPartitions=4)\ 
    .load() 


df2=df.withColumn('updated_date',to_date(df.updated_at)) 
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date']) 

Vấn đề của tôi là nó mở chỉ có một kết nối đến mysql (thay vì 4) và nó không viết để parquert cho đến khi nó lấy tất cả các dữ liệu từ mysql, bởi vì bảng của tôi trong mysql là rất lớn (100M hàng) quá trình thất bại trên OutOfMemory.

Có cách nào để định cấu hình Spark để mở nhiều kết nối tới mysql và ghi một phần dữ liệu vào sàn gỗ không?

+0

Bạn đã khắc phục sự cố này chưa? nếu có xin vui lòng hướng dẫn tôi để thực hiện.! Cảm ơn bạn – Vignesh

Trả lời

3

Bạn nên thiết lập các thuộc tính:

partitionColumn, 
lowerBound, 
upperBound, 
numPartitions 

vì nó được ghi chép lại ở đây: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases

+1

không may sau khi thêm cấu hình ở trên tôi vẫn thấy chỉ có một quá trình trong mysql 'hiển thị đầy đủ processlist' numPartitions = 4, cột = 'id', lowerBound = 0, upperBound = 208.765.263 –

+2

Bạn có kiểm tra có bao nhiêu Spark nút công nhân đang chạy với các tùy chọn này? Nếu bạn thấy chỉ có một, có một vấn đề Spark, nhưng nếu có bốn, chúng ta nên tìm một cái gì đó khác, chẳng hạn như cấu hình MySQL ... – mgaido

+0

Tôi bắt đầu kịch bản với tập hợp chủ trên máy khách sợi và thiết lập num-executors được 4. trên danh sách xử lý toàn bộ mysql Tôi chỉ thấy 1 quá trình đọc dữ liệu và dọc theo cách tia lửa mất đi những người thực thi và cố gắng phục hồi nhưng không may mắn ... bạn nghĩ tôi nên làm gì? –

2

Đối Spar> = 2.0 tôi đã tạo ra một lớp học với phương pháp tiếp theo:

... 
private val dbUrl = 
s"""jdbc:mysql://${host}:${port}/${db_name} 
    |?zeroDateTimeBehavior=convertToNull 
    |&read_buffer_size=100M""".stripMargin.replace("\n", "") 

def run(sqlQuery: String): DataFrame = { 
println(sqlQuery) 
Datapipeline.spark.read 
    .format("jdbc") 
    .option("driver", "com.mysql.jdbc.Driver") 
    .option("url", dbUrl) 
    .option("user", user) 
    .option("password", pass) 
    .option("dbtable", s"($sqlQuery) as tmp") 
    .load() 
} 
... 
def getBounds(table: String, whereClause: String, partitionColumn: String): Array[Int] = { 
val sql = s"select min($partitionColumn) as min, max($partitionColumn) as max from $table${ 
    if (whereClause.length > 0) s" where $whereClause" 
}" 
val df = run(sql).collect()(0) 

Array(df.get(0).asInstanceOf[Int], df.get(1).asInstanceOf[Int]) 
} 

def getTableFields(table: String): String = { 
val sql = 
    s""" 
    |SELECT * 
    |FROM information_schema.COLUMNS 
    |WHERE table_name LIKE '$table' 
    | AND TABLE_SCHEMA LIKE '${db_name}' 
    |ORDER BY ORDINAL_POSITION 
    """.stripMargin 
run(sql).collect().map(r => r.getAs[String]("COLUMN_NAME")).mkString(", ") 
} 

/** 
* Returns DataFrame partitioned by <partritionColumn> to number of partitions provided in 
* <numPartitions> for a <table> with WHERE clause 
* @param table - a table name 
* @param whereClause - WHERE clause without "WHERE" key word 
* @param partitionColumn - column name used for partitioning, should be numeric 
* @param numPartitions - number of partitions 
* @return - a DataFrame 
*/ 
def run(table: String, whereClause: String, partitionColumn: String, numPartitions: Int): DataFrame = { 
val bounds = getBounds(table, whereClause, partitionColumn) 

val fields = getTableFields(table) 
val dfs: Array[DataFrame] = new Array[DataFrame](numPartitions) 

val lowerBound = bounds(0) 
val partitionRange: Int = ((bounds(1) - bounds(0))/numPartitions) 

for (i <- 0 to numPartitions - 2) { 
    dfs(i) = run(
    s"""select $fields from $table 
     | where $partitionColumn >= ${lowerBound + (partitionRange * i)} and $partitionColumn < ${lowerBound + (partitionRange * (i + 1))}${ 
     if (whereClause.length > 0) 
     s" and $whereClause" 
    } 
    """.stripMargin.replace("\n", "")) 
} 

dfs(numPartitions - 1) = run(s"select $fields from $table where $partitionColumn >= ${lowerBound + (partitionRange * (numPartitions - 1))}${ 
    if (whereClause.length > 0) 
    s" and $whereClause" 
}".replace("\n", "")) 

dfs.reduceLeft((res, df) => res.union(df)) 

} 

Phương pháp run mới nhất sẽ tạo ra một số phân vùng cần thiết. Khi bạn gọi một phương thức hành động, Spark sẽ tạo ra nhiều nhiệm vụ song song vì nhiều phân vùng đã được định nghĩa cho DataFrame được trả về theo phương thức run.

Tận hưởng.

Các vấn đề liên quan