2016-03-17 17 views
10

Tôi đang sử dụng CDH5.5Truy vấn bảng HIVE trong pyspark

Tôi có bảng được tạo trong cơ sở dữ liệu mặc định HIVE và có thể truy vấn nó từ lệnh HIVE.

Output

hive> use default; 

OK 

Time taken: 0.582 seconds 


hive> show tables; 

OK 

bank 
Time taken: 0.341 seconds, Fetched: 1 row(s) 

hive> select count(*) from bank; 

OK 

542 

Time taken: 64.961 seconds, Fetched: 1 row(s) 

Tuy nhiên, tôi không thể truy vấn bảng từ pyspark vì nó không thể nhận ra bàn.

from pyspark.context import SparkContext 

from pyspark.sql import HiveContext 

sqlContext = HiveContext(sc) 


sqlContext.sql("use default") 

DataFrame[result: string] 

sqlContext.sql("show tables").show() 

+---------+-----------+ 

|tableName|isTemporary| 

+---------+-----------+ 

+---------+-----------+ 


sqlContext.sql("FROM bank SELECT count(*)") 

16/03/16 20:12:13 INFO parse.ParseDriver: Parsing command: FROM bank SELECT count(*) 
16/03/16 20:12:13 INFO parse.ParseDriver: Parse Completed 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 552, in sql 
     return DataFrame(self._ssql_ctx.sql(sqlQuery), self) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 40, in deco 
     raise AnalysisException(s.split(': ', 1)[1]) 
    **pyspark.sql.utils.AnalysisException: no such table bank; line 1 pos 5** 

Lỗi New

>>> from pyspark.sql import HiveContext 
>>> hive_context = HiveContext(sc) 
>>> bank = hive_context.table("default.bank") 
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property datanucleus.cache.level2 unknown - will be ignored 
16/03/22 18:33:30 INFO DataNucleus.Persistence: Property hive.metastore.integral.jdo.pushdown unknown - will be ignored 
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:44 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MFieldSchema" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:48 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MOrder" is tagged as "embedded-only" so does not have its own datastore table. 
16/03/22 18:33:50 INFO DataNucleus.Datastore: The class "org.apache.hadoop.hive.metastore.model.MResourceUri" is tagged as "embedded-only" so does not have its own datastore table. 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/lib/spark/python/pyspark/sql/context.py", line 565, in table 
    return DataFrame(self._ssql_ctx.table(tableName), self) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/usr/lib/spark/python/pyspark/sql/utils.py", line 36, in deco 
    return f(*a, **kw) 
    File "/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling o22.table. 
: org.apache.spark.sql.catalyst.analysis.NoSuchTableException 
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123) 
    at org.apache.spark.sql.hive.client.ClientInterface$$anonfun$getTable$1.apply(ClientInterface.scala:123) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.hive.client.ClientInterface$class.getTable(ClientInterface.scala:123) 
    at org.apache.spark.sql.hive.client.ClientWrapper.getTable(ClientWrapper.scala:60) 
    at org.apache.spark.sql.hive.HiveMetastoreCatalog.lookupRelation(HiveMetastoreCatalog.scala:406) 
    at org.apache.spark.sql.hive.HiveContext$$anon$1.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(HiveContext.scala:422) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:203) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:203) 
    at org.apache.spark.sql.hive.HiveContext$$anon$1.lookupRelation(HiveContext.scala:422) 
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:739) 
    at org.apache.spark.sql.SQLContext.table(SQLContext.scala:735) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:207) 
    at java.lang.Thread.run(Thread.java:745) 

nhờ

Trả lời

20

Chúng ta không thể vượt qua các tên bảng Hive trực tiếp đến Hive phương pháp bối cảnh sql vì nó không hiểu được tên bảng Hive. Một cách để đọc bảng Hive trong vỏ pyspark là:

from pyspark.sql import HiveContext 
hive_context = HiveContext(sc) 
bank = hive_context.table("default.bank") 
bank.show() 

Để chạy SQL trên bảng hive: tiên, chúng ta cần phải đăng ký khung dữ liệu chúng tôi có được từ việc đọc bảng tổ ong. Sau đó, chúng ta có thể chạy truy vấn SQL.

bank.registerTempTable("bank_temp") 
hive_context.sql("select * from bank_temp").show() 
+0

cảm ơn.Tuy nhiên, tôi nhận được lỗi này. – Chn

+0

bank = hive_context.table ("bank") Traceback (cuộc gọi gần đây nhất): Tệp "", dòng 1, trong Tệp "/usr/lib/spark/python/pyspark/sql/context.py ", dòng 565, trong bảng trả về DataFrame (self._ssql_ctx.table (tableName), self) Tệp" /usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py ", dòng 538, trong __call__ Tệp" /usr/lib/spark/python/pyspark/sql/utils.py ", dòng 36, trong deco return f (* a, ** kw) Tệp"/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py ", dòng 300, trong get_return_value py4j.protocol.Py4JJavaError: Đã xảy ra lỗi khi gọi o30.table. – Chn

+0

Tôi đã chỉnh sửa câu trả lời để bao gồm tên cơ sở dữ liệu. Nó sẽ hoạt động ngay bây giờ. – bijay697

0

Tôi cũng đang cố gắng thực hiện việc này. Khi tôi chạy tập đầu tiên của lệnh, tôi nhận được báo lỗi dưới đây:

line 300, in get_return_value 

py4j.protocol.Py4JJavaError: Một lỗi đã xảy ra trong khi gọi o28.table. : org.apache.spark.sql.types.DataTypeException: Dữ liệu không được hỗ trợType: char (1). Nếu bạn có cấu trúc và tên trường của nó có bất kỳ ký tự đặc biệt nào, vui lòng sử dụng dấu sau () to quote that field name, e.g. x + y`. Xin lưu ý rằng chính bản thân nó không được hỗ trợ trong tên trường.

-2

bạn có thể sử dụng sqlCtx.sql. hive-site.xml nên được sao chép vào châm ngòi đường conf.

my_dataframe = sqlCtx.sql ("Select * from loại") my_dataframe.show()

7

SparkSQL được vận chuyển với metastore riêng của mình (derby), để nó có thể hoạt động ngay cả khi hive chưa được cài đặt trên hệ thống.Đây là chế độ mặc định.

Trong câu hỏi trên, bạn đã tạo một bảng trong hive. Bạn gặp lỗi table not found vì SparkSQL đang sử dụng kho dữ liệu mặc định của nó mà không có siêu dữ liệu trong bảng hive của bạn.

Nếu bạn muốn SparkSQL sử dụng kho lưu trữ hive thay thế và truy cập bảng hive, thì bạn phải thêm hive-site.xml vào thư mục conf spark.

0

Không chắc, nếu điều này không được giải quyết chưa, tôi đã kiểm tra ra các hạt nhân pyspark với Livy hội nhập và đây là làm thế nào tôi đã thử nghiệm cấu hình tổ ong

from pyspark.sql import Row 
from pyspark.sql import HiveContext 
sqlContext = HiveContext(sc) 
test_list = [('A', 25),('B', 20),('C', 25),('D', 18)] 
rdd = sc.parallelize(test_list) 
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) 
schemaPeople = sqlContext.createDataFrame(people) 
# Register it as a temp table 
sqlContext.registerDataFrameAsTable(schemaPeople, "test_table") 
sqlContext.sql("show tables").show() 


Output: 
-------- 
+--------+----------+-----------+ 
|database| tableName|isTemporary| 
+--------+----------+-----------+ 
|  |test_table|  true| 
+--------+----------+-----------+ 

Now one can query it in many different ways, 
1. jupyter kernel(sparkmagic syntax): 
    %%sql 
    SELECT * FROM test_table limit 4 
2. Using default HiveContext: 
    sqlContext.sql("Select * from test_table").show() 
0

Tại vấn đề của tôi, cp hive tại chỗ. xml để $ SPARK_HOME/conf của bạn, và cp mysql-connect-java - *. jar vào $ SPARK_HOME/jars của bạn, giải pháp này đã giải quyết được vấn đề của tôi.

Các vấn đề liên quan