10

Tôi đang sử dụng trình điều khiển java datastax 3.1.0 để kết nối với cụm cassandra và phiên bản cụm cassandra của tôi là 2.0.10. Tôi đang viết không đồng bộ với tính nhất quán của QUORUM.Làm thế nào để tăng tốc viết yêu cầu đến cassandra khi làm việc với "executeAsync"?

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 

    public void save(String process, int clientid, long deviceid) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
    } 

Phương thức lưu trên đây của tôi sẽ được gọi từ nhiều luồng với tốc độ rất nhanh.

Câu hỏi:

Tôi muốn tăng tốc theo yêu cầu để executeAsync phương pháp mà viết không đồng bộ vào Cassandra. Nếu tôi viết ở tốc độ rất cao so với cụm Cassandra của tôi có thể xử lý sau đó nó sẽ bắt đầu ném lỗi và tôi muốn tất cả các bài viết của tôi nên thành công vào cassandra mà không bị mất.

Tôi thấy điều này post nơi giải pháp là sử dụng Semaphore với số lượng giấy phép cố định. Nhưng tôi không chắc làm thế nào và cách tốt nhất để thực hiện điều đó là gì. Tôi chưa bao giờ sử dụng Semaphor trước đây. Đây là logic. Bất cứ ai có thể cung cấp một ví dụ với cơ sở Semaphore trên mã của tôi hoặc nếu có bất kỳ cách nào tốt hơn/tùy chọn, sau đó cho tôi biết là tốt.

Trong bối cảnh viết một chương trình dataloader, bạn có thể làm điều gì đó như sau:

  • Để giữ cho mọi thứ đơn giản sử dụng một Semaphore hoặc một số cấu trúc khác với một số cố định của giấy phép (mà ý chí là số lượng yêu cầu tối đa của bạn). Bất cứ khi nào bạn truy cập để gửi truy vấn bằng cách sử dụng executeAsync, có được giấy phép. Bạn thực sự chỉ cần 1 thread (nhưng có thể muốn để giới thiệu một hồ bơi # cpu lõi kích thước thực hiện điều này) mà mua lại giấy phép từ Semaphore và thực hiện truy vấn. Nó sẽ chỉ chặn khi có được cho đến khi có giấy phép có sẵn.
  • Sử dụng Futures.addCallback cho tương lai được trả lại từ executeAsync. Gọi lại nên gọi Sempahore.release() trong cả hai trường hợp onSuccess và onFailure. Bằng cách phát hành giấy phép, điều này sẽ cho phép chuỗi của bạn ở bước 1 để tiếp tục và gửi yêu cầu tiếp theo.

Ngoài ra tôi đã thấy vài post khác, nơi họ đã nói chuyện về việc sử dụng RingBuffer hoặc Guava RateLimitter nên cái nào là tốt hơn và tôi nên sử dụng? Dưới đây là những tùy chọn tôi có thể nghĩ:

  • Sử dụng Semaphore
  • Sử dụng Vòng đệm
  • Sử dụng Ổi Rate Limiter

bất cứ ai có thể giúp tôi với một ví dụ về cách chúng ta có thể tăng tốc theo yêu cầu hoặc có được backpressure cho cassandra viết và đảm bảo tất cả các ghi đi thành công vào cassandra?

Trả lời

4

Không phải là câu trả lời có thẩm quyền nhưng có thể sẽ hữu ích. Trước tiên, bạn nên xem xét những gì bạn sẽ làm gì khi truy vấn không thể được thực hiện ngay lập tức. Không có vấn đề mà tỷ lệ giới hạn bạn đã chọn nếu bạn nhận được yêu cầu ở mức cao hơn so với bạn có thể viết thư cho Cassandra cuối cùng bạn sẽ nhận được quá trình của bạn bị tắc với các yêu cầu chờ đợi.Và tại thời điểm đó, bạn sẽ cần phải nói với khách hàng của bạn để giữ yêu cầu của họ trong một thời gian ("đẩy lùi"). Ví dụ. nếu chúng đến qua HTTP thì trạng thái phản hồi sẽ là 429 "Quá nhiều yêu cầu". Nếu bạn tạo yêu cầu trong cùng một quy trình thì hãy quyết định thời gian chờ lâu nhất có thể chấp nhận được. Điều đó nói rằng nếu Cassandra không thể theo kịp thì đó là thời gian để mở rộng (hoặc điều chỉnh) nó.

Có thể trước khi thực hiện giới hạn tốc độ, bạn nên thử nghiệm và thêm sự chậm trễ nhân tạo vào chủ đề của mình trước khi gọi đến phương thức save (sử dụng Thread.sleep (...)) và xem nó có giải quyết được vấn đề của bạn hay không.

Lỗi trả về truy vấn áp lực ngược từ Cassandra. Nhưng bạn có thể chọn hoặc triển khai RetryPolicy để xác định thời điểm thử lại các truy vấn không thành công.

Bạn cũng có thể xem connection pool options (và đặc biệt là Monitoring and tuning the pool). Người ta có thể điều chỉnh số lượng không đồng bộ requests per connection. Tuy nhiên tài liệu nói rằng cho Cassandra 2.x tham số này mũ đến 128 và ta không nên thay đổi nó (Tôi muốn thử nghiệm với nó mặc dù :)

thực hiện với Semaphore trông giống như

/* Share it among all threads or associate with a thread for per-thread limits 
    Number of permits is to be tuned depending on acceptable load. 
*/ 
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) { 
    .... 
    queryPermits.acquire(); // Blocks until a permit is available 

    ResultSetFuture future = session.executeAsync(bs); 
    Futures.addCallback(future, new FutureCallback<ResultSet>() { 
    @Override 
    public void onSuccess(ResultSet result) { 
     queryPermits.release(); 
     logger.logInfo("successfully written"); 
    } 
    @Override 
    public void onFailure(Throwable t) { 
     queryPermits.release(); // Permit should be released in all cases. 
     logger.logError("error= ", t); 
    } 
    }, executorService); 
    .... 
} 

(Trong mã thực Tôi muốn tạo ra một cuộc gọi lại bao bọc mà sẽ phát hành giấy phép và sau đó gọi các phương thức được bao bọc)

Tỷ lệ của dung lượng tương tự như semaphore nhưng cho phép các vụ nổ tạm thời sau khi sử dụng đúng thời hạn và giới hạn dựa trên thời gian (không phải tổng số truy vấn hoạt động).

Tuy nhiên, các yêu cầu sẽ không thành công vì nhiều lý do khác nhau vì vậy có thể tốt hơn là nên lên kế hoạch thử lại chúng (trong trường hợp xảy ra lỗi liên tục).

Nó có thể không phù hợp trong trường hợp của bạn nhưng tôi cố gắng sử dụng một số hàng đợi hoặc bộ đệm để yêu cầu enqueue (ví dụ: java.util.concurrent.ArrayBlockingQueue). "Bộ đệm đầy" có nghĩa là khách hàng sẽ đợi hoặc từ bỏ yêu cầu. Bộ đệm cũng sẽ được sử dụng để trả lại các yêu cầu không thành công. Tuy nhiên, các yêu cầu không công bằng hơn có thể sẽ được đặt trước hàng đợi để chúng được thử lại trước. Cũng có một cách nào đó sẽ xử lý tình huống khi hàng đợi đầy và có các yêu cầu thất bại mới cùng một lúc. Một công nhân đơn luồng sau đó sẽ chọn hàng đợi mẫu yêu cầu và gửi cho Cassandra. Vì nó không nên làm nhiều, nó không chắc rằng nó sẽ trở thành một chai cổ. Công nhân này cũng có thể áp dụng giới hạn tốc độ của riêng nó, ví dụ: dựa trên thời gian với com.google.common.util.concurrent.RateLimiter.

Nếu bạn muốn tránh mất tin nhắn càng nhiều càng tốt, anh ta có thể đặt một nhà môi giới thư với sự kiên trì (ví dụ: Kafka) trước Cassandra. Bằng cách này, tin nhắn đến có thể tồn tại ngay cả khi cúp Cassandra dài. Nhưng, tôi đoán, nó quá mức cần thiết trong trường hợp của bạn.

+0

Bạn có nghĩ rằng bạn có thể cung cấp cho tôi một ví dụ cho hàng đợi hoặc đệm Ví dụ, bạn đã cho tôi? Tôi nghĩ rằng điều này sẽ phù hợp với tôi nhất trong kịch bản của tôi. – john

1

Chỉ cần sử dụng hàng đợi chặn sẽ làm tốt. Tương lai được tạo luồng và có gọi lại (thành công và thất bại) sẽ hoạt động như người tiêu dùng và bất cứ nơi nào bạn gọi phương thức lưu sẽ hoạt động như nhà sản xuất.

Thậm chí cách tốt hơn sẽ là, bạn đặt chính yêu cầu hoàn chỉnh trong hàng đợi và khử từng lần một tiết kiệm trên mỗi lần khử.

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
      queue.take(); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
      queue.take(); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
} 

public void invokeSaveInLoop(){ 
    Object dummyObj = new Object(); 
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);; 
    for(int i=0; i< 1000; i++){ 
     save("process", clientid, deviceid, queue); 
     queue.put(dummyObj); 
    } 
} 

Nếu bạn muốn đi xa hơn và kiểm tra tải trọng trên cụm giữa cách

public static String getCurrentState(){  
StringBuilder response = new StringBuilder(); 
      response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n"); 
      final LoadBalancingPolicy loadBalancingPolicy = 
        cluster.getConfiguration().getPolicies().getLoadBalancingPolicy(); 
      final PoolingOptions poolingOptions = 
        cluster.getConfiguration().getPoolingOptions(); 
      Session.State state = session.getState(); 
      for (Host host : state.getConnectedHosts()) { 
       HostDistance distance = loadBalancingPolicy.distance(host); 
       int connections = state.getOpenConnections(host); 
       int inFlightQueries = state.getInFlightQueries(host); 
       response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n", 
           host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries, 
           connections * 
             poolingOptions.getMaxRequestsPerConnection(distance))) 
         .append("<br>\n"); 
      } 
      return response.toString(); 
} 
Các vấn đề liên quan