2013-07-13 47 views
7

Tôi mới sử dụng Trident trong Storm. Tôi đang phá vỡ đầu của tôi trên TridentState. Theo như sự hiểu biết của tôi trident duy trì trạng thái (tức là siêu dữ liệu) cho mỗi lô (cho dù tất cả các bộ dữ liệu trong một lô được xử lý hoàn toàn bằng cách duy trì một id giao dịch trong cơ sở dữ liệu) và tôi không hoàn toàn chắc chắn câu lệnh sau đâyTrạng thái Trident trong Storm là gì?

TridentState urlToTweeters = 
    topology.newStaticState(getUrlToTweetersState()); 

Có ai có thể giải thích điều gì thực sự xảy ra khi chúng tôi xác định mã trên không?

+0

Bạn có thể xác định "Trident" trong ngữ cảnh này không? Có nhiều thứ gọi là Trident. – Charles

+1

Ngữ cảnh là "Bão": https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan

Trả lời

0

Có tài liệu tốt về trạng thái Trident on the storm wiki. Câu trả lời đơn giản cho câu hỏi của bạn là urlToTweeters là một đối tượng trạng thái có thể được truy vấn từ đó. Tôi giả định tuyên bố trên là từ trident tutorial, sao chép dưới đây:

TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState()); 
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState()); 
topology.newDRPCStream("reach") 
    .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter")) 
    /* At this point we have the tweeters for each url passed in args */ 
    .shuffle()   
    .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers")) 
    .parallelismHint(200) 
    .each(new Fields("followers"), new ExpandList(), new Fields("follower")) 
    .groupBy(new Fields("follower")) 
    .aggregate(new One(), new Fields("one")) 
    .parallelismHint(20) 
    .aggregate(new Count(), new Fields("reach")); 

Trong ví dụ này, urlToTweeters sẽ lưu trữ một ánh xạ các URL để loa, và truy vấn DRPC reach xác định trên dòng tiếp theo (mà mất trong URL là đối số của nó) cuối cùng sẽ mang lại khả năng hiển thị. Nhưng trên đường đi (được đánh dấu bằng nội dòng nhận xét), bạn sẽ thấy một luồng các loa của mỗi url, tức là kết quả của một truy vấn trên urlToTweeters.

+0

bạn có thể trợ giúp về http://stackoverflow.com/questions/35445165/total-number-of-non này không -repeated-words-in-each-tweet – user1

9

Tôi hy vọng không bao giờ là quá muộn để trả lời, ít nhất là người khác có thể tìm thấy câu trả lời của tôi có ích :)

Vì vậy, topology.newStaticState() là trừu tượng của một lưu trữ dữ liệu queryable của Trident. Thông số cho newStaticState() phải là một triển khai - dựa trên hợp đồng của phương thức - trong số storm.trident.state.StateFactory. Nhà máy, lần lượt, nên thực hiện phương pháp makeState() trả về một thể hiện của storm.trident.state.State. Tuy nhiên, nếu bạn có kế hoạch truy vấn trạng thái của mình, bạn nên trả lại giá trị là storm.trident.state.map.ReadOnlyMapState, vì đồng bằng storm.trident.state.State không có phương pháp truy vấn nguồn dữ liệu thực tế (bạn sẽ thực sự có ngoại lệ cho lớp học nếu bạn cố gắng sử dụng bất kỳ thứ gì trừ ReadOnlyMapState).

Vì vậy, hãy thử xem!

Một thực hiện giả nhà nước:

public static class ExampleStaticState implements ReadOnlyMapState<String> { 

    private final Map<String, String> dataSourceStub; 

    public ExampleStaticState() { 
     dataSourceStub = new HashMap<>(); 
     dataSourceStub.put("tuple-00", "Trident"); 
     dataSourceStub.put("tuple-01", "definitely"); 
     dataSourceStub.put("tuple-02", "lacks"); 
     dataSourceStub.put("tuple-03", "documentation"); 
    } 

    @Override 
    public List<String> multiGet(List<List<Object>> keys) { 

     System.out.println("DEBUG: MultiGet, keys is " + keys); 

     List<String> result = new ArrayList<>(); 

     for (List<Object> inputTuple : keys) { 
      result.add(dataSourceStub.get(inputTuple.get(0))); 
     } 

     return result; 
    } 

    @Override 
    public void beginCommit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Begin commit, txid=" + txid); 
    } 

    @Override 
    public void commit(Long txid) { 
     // never gets executed... 
     System.out.println("DEBUG: Commit, txid=" + txid); 
    } 
} 

Một nhà máy:

public static class ExampleStaticStateFactory implements StateFactory { 
    @Override 
    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) { 
     return new ExampleStaticState(); 
    } 
} 

Một đơn giản psvm (aka public static void main):

public static void main(String... args) { 
    TridentTopology tridentTopology = new TridentTopology(); 
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{ 
      "foo" 
    })); 
    TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory()); 
    tridentTopology 
      .newStream("spout", spout) 
      .stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar")) 
      .each(new Fields("foo", "bar"), new Debug()) 
      ; 

    Config conf = new Config(); 
    conf.setNumWorkers(6); 

    LocalCluster localCluster = new LocalCluster(); 
    localCluster.submitTopology("tridentTopology", conf, tridentTopology.build()); 

    spout.feed(Arrays.asList(new Values[]{ 
      new Values("tuple-00"), 
      new Values("tuple-01"), 
      new Values("tuple-02"), 
      new Values("tuple-03") 
    })); 

    localCluster.shutdown(); 
} 

Và, cuối cùng, kết quả:

DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]] 
DEBUG: [tuple-00, Trident] 
DEBUG: [tuple-01, definitely] 
DEBUG: [tuple-02, lacks] 
DEBUG: [tuple-03, documentation] 

Bạn thấy đấy, stateQuery() nhận các giá trị từ một lô đầu vào và ánh xạ chúng tới các giá trị được tìm thấy trong 'lưu trữ dữ liệu'.

Lặn một chút sâu hơn, bạn có thể có một cái nhìn tại nguồn của MapGet lớp (anh chàng mà dụ được sử dụng để truy vấn bên trong cấu trúc liên kết) và tìm ra sau đó:

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> { 
    @Override 
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) { 
     return map.multiGet((List) keys); 
    }  

    @Override 
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) { 
     collector.emit(new Values(result)); 
    }  
} 

Vì vậy, dưới mui xe nó chỉ cần gọi phương thức multiGet() của việc thực hiện ReadOnlyMapState và sau đó phát ra các giá trị được tìm thấy trong lưu trữ dữ liệu, thêm chúng vào bộ dữ liệu đã tồn tại.Bạn có thể (mặc dù nó có thể không phải là điều tốt nhất để làm) tạo ra thực hiện của riêng bạn BaseQueryFunction<ReadOnlyMapState, Object> làm một cái gì đó phức tạp hơn.

+1

Cảm ơn ... nó không bao giờ quá muộn khi nói đến việc học ... – Ezhil

+0

bạn có thể trợ giúp về http://stackoverflow.com/questions/35445165/total-number-of này -không lặp lại-từ-trong-mỗi-tweet – user1

Các vấn đề liên quan