Tôi hy vọng không bao giờ là quá muộn để trả lời, ít nhất là người khác có thể tìm thấy câu trả lời của tôi có ích :)
Vì vậy, topology.newStaticState()
là trừu tượng của một lưu trữ dữ liệu queryable của Trident. Thông số cho newStaticState()
phải là một triển khai - dựa trên hợp đồng của phương thức - trong số storm.trident.state.StateFactory
. Nhà máy, lần lượt, nên thực hiện phương pháp makeState()
trả về một thể hiện của storm.trident.state.State
. Tuy nhiên, nếu bạn có kế hoạch truy vấn trạng thái của mình, bạn nên trả lại giá trị là storm.trident.state.map.ReadOnlyMapState
, vì đồng bằng storm.trident.state.State
không có phương pháp truy vấn nguồn dữ liệu thực tế (bạn sẽ thực sự có ngoại lệ cho lớp học nếu bạn cố gắng sử dụng bất kỳ thứ gì trừ ReadOnlyMapState
).
Vì vậy, hãy thử xem!
Một thực hiện giả nhà nước:
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
Một nhà máy:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
Một đơn giản psvm
(aka public static void main
):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
Và, cuối cùng, kết quả:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
Bạn thấy đấy, stateQuery() nhận các giá trị từ một lô đầu vào và ánh xạ chúng tới các giá trị được tìm thấy trong 'lưu trữ dữ liệu'.
Lặn một chút sâu hơn, bạn có thể có một cái nhìn tại nguồn của MapGet
lớp (anh chàng mà dụ được sử dụng để truy vấn bên trong cấu trúc liên kết) và tìm ra sau đó:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
Vì vậy, dưới mui xe nó chỉ cần gọi phương thức multiGet()
của việc thực hiện ReadOnlyMapState
và sau đó phát ra các giá trị được tìm thấy trong lưu trữ dữ liệu, thêm chúng vào bộ dữ liệu đã tồn tại.Bạn có thể (mặc dù nó có thể không phải là điều tốt nhất để làm) tạo ra thực hiện của riêng bạn BaseQueryFunction<ReadOnlyMapState, Object>
làm một cái gì đó phức tạp hơn.
Bạn có thể xác định "Trident" trong ngữ cảnh này không? Có nhiều thứ gọi là Trident. – Charles
Ngữ cảnh là "Bão": https://github.com/nathanmarz/storm/wiki/Documentation#trident – Dan