2012-07-02 30 views
5

Tôi muốn tạo nhiều bộ dữ liệu từ một bộ dữ liệu. Ý tôi là: Tôi có tệp với dữ liệu sau trong đó.Tách một tuple thành nhiều bộ dữ liệu trong Pig

>> cat data 
ID | ColumnName1:Value1 | ColumnName2:Value2 

vì vậy tôi tải nó bằng lệnh sau

grunt >> A = load '$data' using PigStorage('|');  
grunt >> dump A;  
(ID,ColumnName1:Value1,ColumnName2:Value2) 

Bây giờ tôi muốn chia tuple này thành hai bộ dữ liệu.

(ID, ColumnName1, Value1) 
(ID, ColumnName2, Value2) 

Tôi có thể sử dụng UDF cùng với foreach và tạo ra không. Một số điều như sau?

grunt >> foreach A generate SOMEUDF(A) 

EDIT:

đầu vào tuple: (id1, column1, column2) đầu ra: hai tuples (id1, column1) và (id2, column2) nên nó là Danh sách hay tôi nên trả lại một túi?

public class SPLITTUPPLE extends EvalFunc <List<Tuple>> 
{ 
    public List<Tuple> exec(Tuple input) throws IOException { 
     if (input == null || input.size() == 0) 
      return null; 
     try{ 
      // not sure how whether I can create tuples on my own. Looks like I should use TupleFactory. 
      // return list of tuples. 
     }catch(Exception e){ 
      throw WrappedIOException.wrap("Caught exception processing input row ", e); 
     } 
    } 
} 

Phương pháp này có đúng không?

Trả lời

10

Bạn có thể viết UDF hoặc sử dụng tập lệnh PIG có chức năng tích hợp sẵn.

Ví dụ:

-- data should be chararray, PigStorage('|') return bytearray which will not work for this example 
inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 

-- split by | and create a row so we can dereference it later 
splt = foreach inpt generate FLATTEN(STRSPLIT($0, '\\|')) ; 

-- first column is id, rest is converted into a bag and flatten it to make rows 
id_vals = foreach splt generate $0 as id, FLATTEN(TOBAG(*)) as value; 
-- there will be records with (id, id), but id should not have ':' 
id_vals = foreach id_vals generate id, INDEXOF(value, ':') as p, STRSPLIT(value, ':', 2) as vals; 
final = foreach (filter id_vals by p != -1) generate id, FLATTEN(vals) as (col, val); 
dump final; 

kiểm tra đầu vào:

1|c1:11:33|c2:12 
234|c1:21|c2:22 
33|c1:31|c2:32 
345|c1:41|c2:42 

OUTPUT

(1,c1,11:33) 
(1,c2,12) 
(234,c1,21) 
(234,c2,22) 
(33,c1,31) 
(33,c2,32) 
(345,c1,41) 
(345,c2,42) 

Tôi hy vọng nó giúp.

Chúc mừng.

+0

Cảm ơn rất nhiều. Tôi có thể làm điều tương tự với việc viết một UDF. Tôi cập nhật câu hỏi. – FourOfAKind

+0

Có thể. Xem câu trả lời tiếp theo. – alexeipab

+0

Trợ giúp tuyệt vời của nó. Cảm ơn vì đã dành thời gian cho tôi. – FourOfAKind

6

Đây là phiên bản UDF. Tôi thích trả về một số BAG:

import java.io.IOException; 

import org.apache.pig.EvalFunc; 
import org.apache.pig.backend.executionengine.ExecException; 
import org.apache.pig.data.BagFactory; 
import org.apache.pig.data.DataBag; 
import org.apache.pig.data.DataType; 
import org.apache.pig.data.Tuple; 
import org.apache.pig.data.TupleFactory; 
import org.apache.pig.impl.logicalLayer.FrontendException; 
import org.apache.pig.impl.logicalLayer.schema.Schema; 

/** 
* Converts input chararray "ID|ColumnName1:Value1|ColumnName2:Value2|.." into a bag 
* {(ID, ColumnName1, Value1), (ID, ColumnName2, Value2), ...} 
* 
* Default rows separator is '|' and key value separator is ':'. 
* In this implementation white spaces around separator characters are not removed. 
* ID can be made of any character (including sequence of white spaces). 
* @author 
* 
*/ 
public class TupleToBagColumnValuePairs extends EvalFunc<DataBag> { 

    private static final TupleFactory tupleFactory = TupleFactory.getInstance(); 
    private static final BagFactory bagFactory = BagFactory.getInstance(); 

    //Row separator character. Default is '|'. 
    private String rowsSeparator; 
    //Column value separator character. Default i 
    private String columnValueSeparator; 

    public TupleToBagColumnValuePairs() { 
     this.rowsSeparator = "\\|"; 
     this.columnValueSeparator = ":"; 
    } 

    public TupleToBagColumnValuePairs(String rowsSeparator, String keyValueSeparator) { 
     this.rowsSeparator = rowsSeparator; 
     this.columnValueSeparator = keyValueSeparator; 
    } 

    /** 
    * Creates a tuple with 3 fields (id:chararray, column:chararray, value:chararray) 
    * @param outputBag Output tuples (id, column, value) are added to this bag 
    * @param id 
    * @param column 
    * @param value 
    * @throws ExecException 
    */ 
    protected void addTuple(DataBag outputBag, String id, String column, String value) throws ExecException { 
     Tuple outputTuple = tupleFactory.newTuple(); 
     outputTuple.append(id); 
     outputTuple.append(column); 
     outputTuple.append(value); 
     outputBag.add(outputTuple); 
    } 

    /** 
    * Takes column{separator}value from splitInputLine, splits id into column value and adds them to the outputBag as (id, column, value) 
    * @param outputBag Output tuples (id, column, value) should be added to this bag 
    * @param id 
    * @param splitInputLine format column{separator}value, which start from index 1 
    * @throws ExecException 
    */ 
    protected void parseColumnValues(DataBag outputBag, String id, 
      String[] splitInputLine) throws ExecException { 
     for (int i = 1; i < splitInputLine.length; i++) { 
      if (splitInputLine[i] != null) { 
       int columnValueSplitIndex = splitInputLine[i].indexOf(this.columnValueSeparator); 
       if (columnValueSplitIndex != -1) { 
        String column = splitInputLine[i].substring(0, columnValueSplitIndex); 
        String value = null; 
        if (columnValueSplitIndex + 1 < splitInputLine[i].length()) { 
         value = splitInputLine[i].substring(columnValueSplitIndex + 1); 
        } 
        this.addTuple(outputBag, id, column, value); 
       } else { 
        String column = splitInputLine[i]; 
        this.addTuple(outputBag, id, column, null); 
       } 
      } 
     } 
    } 

    /** 
    * input - contains only one field of type chararray, which will be split by '|' 
    * All inputs that are: null or of length 0 are ignored. 
    */ 
    @Override 
    public DataBag exec(Tuple input) throws IOException { 
     if (input == null || input.size() != 1 || input.isNull(0)) { 
      return null; 
     } 

     String inputLine = (String)input.get(0); 
     String[] splitInputLine = inputLine.split(this.rowsSeparator, -1); 

     if (splitInputLine.length > 1 && splitInputLine[0].length() > 0) { 
      String id = splitInputLine[0]; 
      DataBag outputBag = bagFactory.newDefaultBag();    
      if (splitInputLine.length == 1) { // there is just an id in the line 
       this.addTuple(outputBag, id, null, null); 
      } else { 
       this.parseColumnValues(outputBag, id, splitInputLine); 
      } 


      return outputBag; 
     } 
     return null; 
    } 

    @Override 
    public Schema outputSchema(Schema input) { 
     try { 
      if (input.size() != 1) { 
       throw new RuntimeException("Expected input to have only one field"); 
      } 

      Schema.FieldSchema inputFieldSchema = input.getField(0); 
      if (inputFieldSchema.type != DataType.CHARARRAY) { 
       throw new RuntimeException("Expected a CHARARRAY as input"); 
      } 

      Schema tupleSchema = new Schema(); 
      tupleSchema.add(new Schema.FieldSchema("id", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("column", DataType.CHARARRAY)); 
      tupleSchema.add(new Schema.FieldSchema("value", DataType.CHARARRAY)); 

      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG)); 
     } catch (FrontendException exx) { 
      throw new RuntimeException(exx); 
     } 
    } 

} 

Dưới đây là làm thế nào nó được sử dụng trong PIG:

register 'path to the jar'; 
define IdColumnValue myPackage.TupleToBagColumnValuePairs(); 

inpt = load '/pig_fun/input/single_tuple_to_multiple.txt' as (line:chararray); 
result = foreach inpt generate FLATTEN(IdColumnValue($0)) as (id1, c2, v2); 
dump result; 

Một nguồn cảm hứng tốt để viết UDFs với túi thấy DataFu source code by LinkedIn

0

Bạn có thể sử dụng TransposeTupleToBag (UDF từ DataFu lib) trên đầu ra của STRSPLIT để lấy túi, và sau đó FLATTEN túi để tạo hàng riêng biệt cho mỗi cột ban đầu.

Các vấn đề liên quan