2013-06-04 35 views
5

Tôi muốn sử dụng api số lượng lớn elasticsearch bằng cách sử dụng java và tự hỏi làm thế nào tôi có thể đặt kích thước lô.Kích thước lô hàng loạt java elasticsearch

Hiện nay tôi đang sử dụng nó như:

BulkRequestBuilder bulkRequest = getClient().prepareBulk(); 
while(hasMore) { 
    bulkRequest.add(getClient().prepareIndex(indexName, indexType, artist.getDocId()).setSource(json)); 
    hasMore = checkHasMore(); 
} 
BulkResponse bResp = bulkRequest.execute().actionGet(); 
//To check failures 
log.info("Has failures? {}", bResp.hasFailures()); 

Bất kỳ ý tưởng làm thế nào tôi có thể thiết lập kích thước lớn/mẻ?

+1

Xin vui lòng, đánh dấu câu trả lời là đúng ..... –

Trả lời

21

Nó chủ yếu phụ thuộc vào kích thước tài liệu của bạn, tài nguyên sẵn có trên máy khách và loại khách hàng (khách hàng vận chuyển hoặc trình khách nút).

Trình khách nút nhận thức được phân đoạn trên cụm và gửi tài liệu trực tiếp đến các nút giữ các phân đoạn mà chúng được cho là được lập chỉ mục. Mặt khác, máy khách vận tải là một máy khách bình thường gửi các yêu cầu của nó tới một danh sách các nút theo kiểu vòng tròn. Sau đó, yêu cầu hàng loạt sẽ được gửi đến một nút, đó sẽ trở thành cổng của bạn khi lập chỉ mục.

Vì bạn đang sử dụng API Java, tôi khuyên bạn nên xem BulkProcessor, giúp dễ dàng hơn và linh hoạt hơn khi lập chỉ mục hàng loạt. Bạn có thể xác định số lượng hành động tối đa, kích thước tối đa và khoảng thời gian tối đa kể từ lần thực hiện hàng loạt cuối cùng. Nó sẽ thực hiện số lượng lớn tự động cho bạn khi cần thiết. Bạn cũng có thể đặt số lượng yêu cầu hàng loạt đồng thời tối đa.

Sau khi bạn đã tạo ra BulkProcessor như thế này:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { 
    @Override 
    public void beforeBulk(long executionId, BulkRequest request) { 
     logger.info("Going to execute new bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { 
     logger.info("Executed bulk composed of {} actions", request.numberOfActions()); 
    } 

    @Override 
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) { 
     logger.warn("Error executing bulk", failure); 
    } 
    }).setBulkActions(bulkSize).setConcurrentRequests(maxConcurrentBulk).build(); 

Bạn chỉ cần thêm các yêu cầu của bạn với nó:

bulkProcessor.add(indexRequest); 

và đóng nó lại ở cuối để tuôn bất kỳ yêu cầu cuối cùng có thể có chưa được thực hiện:

bulkProcessor.close(); 

Để cuối cùng trả lời câu hỏi của bạn: e điều tốt đẹp về các BulkProcessor cũng là nó có mặc định hợp lý: 5 MB kích thước, 1000 hành động, 1 yêu cầu đồng thời, không có khoảng thời gian tuôn ra (mà có thể hữu ích để thiết lập).

0

bạn cần tính trình tạo yêu cầu hàng loạt khi nó đạt đến giới hạn kích thước lô của bạn, sau đó lập chỉ mục và xóa các bản dựng hàng loạt cũ hơn. đây là ví dụ về mã

Settings settings = ImmutableSettings.settingsBuilder() 
    .put("cluster.name", "MyClusterName").build(); 

TransportClient client = new TransportClient(settings); 
String hostname = "myhost ip"; 
int port = 9300; 
client.addTransportAddress(new InetSocketTransportAddress(hostname, port)); 

BulkRequestBuilder bulkBuilder = client.prepareBulk(); 
BufferedReader br = new BufferedReader(new InputStreamReader(new DataInputStream(new FileInputStream("my_file_path")))); 
long bulkBuilderLength = 0; 
String readLine = ""; 
String index = "my_index_name"; 
String type = "my_type_name"; 
String id = ""; 

while((readLine = br.readLine()) != null){ 
    id = somefunction(readLine); 
    String json = new ObjectMapper().writeValueAsString(readLine); 
    bulkBuilder.add(client.prepareIndex(index, type, id).setSource(json)); 
    bulkBuilderLength++; 
    if(bulkBuilderLength % 1000== 0){ 
     logger.info("##### " + bulkBuilderLength + " data indexed."); 
     BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
     if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
     } 
     bulkBuilder = client.prepareBulk(); 
    } 
} 

br.close(); 

if(bulkBuilder.numberOfActions() > 0){ 
    logger.info("##### " + bulkBuilderLength + " data indexed."); 
    BulkResponse bulkRes = bulkBuilder.execute().actionGet(); 
    if(bulkRes.hasFailures()){ 
     logger.error("##### Bulk Request failure with error: " + bulkRes.buildFailureMessage()); 
    } 
    bulkBuilder = client.prepareBulk(); 
} 

hy vọng điều này giúp bạn nhờ

Các vấn đề liên quan