2013-02-01 25 views
7

Tôi có bảng Hive chứa dữ liệu của các cuộc gọi của khách hàng. Để đơn giản xem xét nó có 2 cột, cột đầu tiên giữ ID khách hàng và cột thứ hai giữ dấu thời gian của cuộc gọi (dấu thời gian unix).Tính toán sự khác biệt giữa các bản ghi liên tiếp trong Hadoop với Truy vấn Hive

Tôi có thể truy vấn bảng này để tìm tất cả các cuộc gọi cho mỗi khách hàng:

SELECT * FROM mytable SORT BY customer_id, call_time; 

Kết quả là:

Customer1 timestamp11 
Customer1 timestamp12 
Customer1 timestamp13 
Customer2 timestamp21 
Customer3 timestamp31 
Customer3 timestamp32 
... 

Có thể tạo một truy vấn Hive trả về, đối với mỗi khách hàng , bắt đầu từ cuộc gọi thứ hai, khoảng thời gian giữa hai cuộc gọi thành công? Đối với ví dụ trên truy vấn mà nên trở lại:

Customer1 timestamp12-timestamp11 
Customer1 timestamp13-timestamp12 
Customer3 timestamp32-timestamp31 
... 

Tôi đã cố gắng để thích ứng với các giải pháp từ sql solution, nhưng tôi bị mắc kẹt với những hạn chế Hive: it accepts subqueries only in FROMjoins must contain only equalities.

Cảm ơn bạn.

EDIT1:

Tôi đã cố gắng sử dụng một hàm Hive UDF:

public class DeltaComputerUDF extends UDF { 
private String previousCustomerId; 
private long previousCallTime; 

public String evaluate(String customerId, LongWritable callTime) { 
    long callTimeValue = callTime.get(); 
    String timeDifference = null; 

    if (customerId.equals(previousCustomerId)) { 
     timeDifference = new Long(callTimeValue - previousCallTime).toString(); 
    } 

    previousCustomerId = customerId; 
    previousCallTime = callTimeValue; 

    return timeDifference; 
}} 

và sử dụng nó với cái tên "đồng bằng".

Nhưng có vẻ như (từ nhật ký và kết quả) đang được sử dụng vào thời gian MAP. 2 vấn đề nảy sinh từ đây:

Đầu tiên: Các bảng dữ liệu phải được sắp xếp theo ID khách hàng và timestamp trước khi sử dụng chức năng này. Truy vấn:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time; 

không hoạt động vì phần sắp xếp được thực hiện lúc REDUCE, sau khi chức năng của tôi đang được sử dụng.

Tôi có thể sắp xếp dữ liệu bảng trước khi sử dụng hàm, nhưng tôi không hài lòng với điều này vì nó là phí mà tôi hy vọng sẽ tránh.

Thứ hai: Trong trường hợp cấu hình Hadoop phân tán, dữ liệu được chia giữa các trình theo dõi công việc có sẵn. Vì vậy, tôi tin rằng sẽ có nhiều trường hợp của hàm này, một cho mỗi người lập bản đồ, do đó, có thể có cùng một dữ liệu khách hàng được chia giữa 2 người lập bản đồ. Trong trường hợp này, tôi sẽ mất các cuộc gọi của khách hàng, điều này không được chấp nhận.

Tôi không biết cách giải quyết vấn đề này. Tôi biết rằng DISTRIBUTE BY đảm bảo rằng tất cả các dữ liệu với một giá trị cụ thể được gửi đến cùng một bộ giảm (do đó đảm bảo rằng SORT hoạt động như mong đợi), không ai biết nếu có cái gì đó tương tự cho người lập bản đồ?

Tiếp theo, tôi định làm theo đề xuất của libjack để sử dụng tập lệnh giảm. Điều này "tính toán" là cần thiết giữa một số truy vấn hive khác, vì vậy tôi muốn thử tất cả mọi thứ Hive cung cấp, trước khi chuyển sang một công cụ khác, theo đề nghị của Balaswamy vaddeman.

EDIT2:

tôi bắt đầu để điều tra giải pháp kịch bản tùy chỉnh. Nhưng, trong trang đầu tiên của chương 14 trong Lập trình Hive cuốn sách (chương này trình bày các kịch bản tùy chỉnh), tôi thấy đoạn sau:

streaming thường là kém hiệu quả hơn so với mã hóa UDFs thể so sánh hoặc đối tượng InputFormat. Serialize và deserializing dữ liệu để vượt qua nó trong và ra khỏi đường ống là tương đối kém hiệu quả. Nó cũng khó khăn hơn để gỡ lỗi toàn bộ chương trình một cách thống nhất. Tuy nhiên, nó rất hữu ích cho việc tạo mẫu nhanh và để tận dụng mã hiện có không được viết bằng Java. Đối với người dùng Hive người không muốn viết mã Java, nó có thể là cách tiếp cận rất hiệu quả.

Vì vậy, rõ ràng rằng tập lệnh tùy chỉnh không phải là giải pháp tốt nhất về mặt hiệu quả.

Nhưng làm cách nào để giữ chức năng UDF của tôi, nhưng đảm bảo chức năng hoạt động như mong đợi trong cấu hình Hadoop phân tán? Tôi đã tìm thấy câu trả lời cho câu hỏi này trong phần UDF Internals của trang Wiki Ngôn ngữ UDF thủ công. Nếu tôi viết truy vấn của tôi:

SELECT customer_id, call_time, delta(customer_id, call_time) FROM (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

nó được thực hiện tại giảm thời gian và PHÂN PHỐI BỞI và Sắp xếp cấu trúc đảm bảo rằng tất cả các hồ sơ từ khách hàng cùng đang được xử lý bởi các giảm tương tự, theo thứ tự của các cuộc gọi.

Vì vậy, UDF ở trên và truy vấn này xây dựng giải quyết vấn đề của tôi.

(Xin lỗi vì đã không thêm các liên kết, nhưng tôi không được phép làm điều đó bởi vì tôi không có đủ điểm danh tiếng)

+0

Tôi nghĩ điều này rất giống với [câu hỏi này] (http://stackoverflow.com/questions/14028796/reduce-a-set-of-rows-in-hive-to-another-set-of-rows) Tôi trả lời bằng cách sử dụng một bản đồ tùy chỉnh/giảm trong hive. Bạn sẽ phải cung cấp kịch bản giảm phù hợp. – libjack

+0

Tôi không biết làm thế nào để làm điều này trong hive nhưng có cascading api để làm this.there là một cái gì đó được gọi là bộ đệm trong cascading.http: //docs.cascading.org/cascading/2.0/userguide/html/ch05s05.html –

Trả lời

11

Đó là một câu hỏi cũ, nhưng đối với tài liệu tham khảo trong tương lai, tôi viết ở đây khác đề xuất:

Hive Windowing functions cho phép sử dụng giá trị trước đó/tiếp theo trong truy vấn của bạn.

Một truy vấn đang simili có thể là:

CHỌN customer_id, LAG (call_time, 1, 0) OVER (PARTITION BY TỰ DO customer_id ROWS call_time 1 TRƯỚC) - call_time FROM mytable;

0

Có lẽ ai đó bắt gặp một yêu cầu tương tự, giải pháp tôi thấy như sau:

1) Tạo một chức năng tùy chỉnh:

package com.example; 
// imports (they depend on the hive version) 
@Description(name = "delta", value = "_FUNC_(customer id column, call time column) " 
    + "- computes the time passed between two succesive records from the same customer. " 
    + "It generates 3 columns: first contains the customer id, second contains call time " 
    + "and third contains the time passed from the previous call. This function returns only " 
    + "the records that have a previous call from the same customer (requirements are not applicable " 
    + "to the first call)", extended = "Example:\n> SELECT _FUNC_(customer_id, call_time) AS" 
    + "(customer_id, call_time, time_passed) FROM (SELECT customer_id, call_time FROM mytable " 
    + "DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t;") 
public class DeltaComputerUDTF extends GenericUDTF { 
private static final int NUM_COLS = 3; 

private Text[] retCols; // array of returned column values 
private ObjectInspector[] inputOIs; // input ObjectInspectors 
private String prevCustomerId; 
private Long prevCallTime; 

@Override 
public StructObjectInspector initialize(ObjectInspector[] ois) throws UDFArgumentException { 
    if (ois.length != 2) { 
     throw new UDFArgumentException(
       "There must be 2 arguments: customer Id column name and call time column name"); 
    } 

    inputOIs = ois; 

    // construct the output column data holders 
    retCols = new Text[NUM_COLS]; 
    for (int i = 0; i < NUM_COLS; ++i) { 
     retCols[i] = new Text(); 
    } 

    // construct output object inspector 
    List<String> fieldNames = new ArrayList<String>(NUM_COLS); 
    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(NUM_COLS); 
    for (int i = 0; i < NUM_COLS; ++i) { 
     // column name can be anything since it will be named by UDTF as clause 
     fieldNames.add("c" + i); 
     // all returned type will be Text 
     fieldOIs.add(PrimitiveObjectInspectorFactory.writableStringObjectInspector); 
    } 

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs); 
} 

@Override 
public void process(Object[] args) throws HiveException { 
    String customerId = ((StringObjectInspector) inputOIs[0]).getPrimitiveJavaObject(args[0]); 
    Long callTime = ((LongObjectInspector) inputOIs[1]).get(args[1]); 

    if (customerId.equals(prevCustomerId)) { 
     retCols[0].set(customerId); 
     retCols[1].set(callTime.toString()); 
     retCols[2].set(new Long(callTime - prevCallTime).toString()); 
     forward(retCols); 
    } 

    // Store the current customer data, for the next line 
    prevCustomerId = customerId; 
    prevCallTime = callTime; 
} 

@Override 
public void close() throws HiveException { 
    // TODO Auto-generated method stub 

} 

} 

2) Tạo một lọ chứa chức năng này. Giả sử jarname là myjar.jar.

3) Sao chép bình vào máy bằng Hive. Giả sử nó được đặt trong/tmp

4) Xác định các chức năng tùy chỉnh bên trong Hive:

ADD JAR /tmp/myjar.jar; 
CREATE TEMPORARY FUNCTION delta AS 'com.example.DeltaComputerUDTF'; 

5) Thực hiện truy vấn:

SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) FROM 
    (SELECT customer_id, call_time FROM mytable DISTRIBUTE BY customer_id SORT BY customer_id, call_time) t; 

Ghi chú:

a. Tôi giả định rằng cột call_time lưu trữ dữ liệu dưới dạng bigint. Trong trường hợp nó là chuỗi, trong hàm xử lý, chúng tôi lấy nó dưới dạng chuỗi (như chúng ta làm với customerId), sau đó phân tích nó thành Long

b. Tôi quyết định sử dụng một UDTF thay vì UDF bởi vì cách này nó tạo ra tất cả các dữ liệu cần thiết. Nếu không (với UDF) dữ liệu được tạo cần phải được lọc để bỏ qua các giá trị NULL.Vì vậy, với chức năng UDF (DeltaComputerUDF) được mô tả trong lần chỉnh sửa đầu tiên của bài đăng gốc, truy vấn sẽ là:

SELECT customer_id, call_time, time_difference 
FROM 
    (
    SELECT delta(customer_id, call_time) AS (customer_id, call_time, time_difference) 
    FROM 
     (
     SELECT customer_id, call_time FROM mytable 
     DISTRIBUTE BY customer_id 
     SORT BY customer_id, call_time 
     ) t 
    ) u 
WHERE time_difference IS NOT NULL; 

c. Cả hai hàm (UDF và UDTF) hoạt động như mong muốn, bất kể thứ tự các hàng bên trong bảng (vì vậy không có yêu cầu dữ liệu bảng phải được sắp xếp theo id khách hàng và thời gian gọi trước khi sử dụng hàm delta)

1

Bạn có thể sử dụng rõ ràng MAP-REDUCE với ngôn ngữ lập trình khác như Java hoặc Python. Nơi phát ra từ bản đồ {cutomer_id,call_time} và trong bộ giảm tốc, bạn sẽ nhận được {customer_id,list{time_stamp}} và trong bộ giảm tốc, bạn có thể sắp xếp các dấu thời gian này và có thể xử lý dữ liệu.

Các vấn đề liên quan