Tôi đang sử dụng trình điều khiển java datastax 3.1.0 để kết nối với cụm cassandra và phiên bản cụm cassandra của tôi là 2.0.10. Tôi đang viết không đồng bộ với tính nhất quán của QUORUM.Làm thế nào để tăng tốc viết yêu cầu đến cassandra khi làm việc với "executeAsync"?
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
Phương thức lưu trên đây của tôi sẽ được gọi từ nhiều luồng với tốc độ rất nhanh.
Câu hỏi:
Tôi muốn tăng tốc theo yêu cầu để executeAsync
phương pháp mà viết không đồng bộ vào Cassandra. Nếu tôi viết ở tốc độ rất cao so với cụm Cassandra của tôi có thể xử lý sau đó nó sẽ bắt đầu ném lỗi và tôi muốn tất cả các bài viết của tôi nên thành công vào cassandra mà không bị mất.
Tôi thấy điều này post nơi giải pháp là sử dụng Semaphore
với số lượng giấy phép cố định. Nhưng tôi không chắc làm thế nào và cách tốt nhất để thực hiện điều đó là gì. Tôi chưa bao giờ sử dụng Semaphor trước đây. Đây là logic. Bất cứ ai có thể cung cấp một ví dụ với cơ sở Semaphore trên mã của tôi hoặc nếu có bất kỳ cách nào tốt hơn/tùy chọn, sau đó cho tôi biết là tốt.
Trong bối cảnh viết một chương trình dataloader, bạn có thể làm điều gì đó như sau:
- Để giữ cho mọi thứ đơn giản sử dụng một Semaphore hoặc một số cấu trúc khác với một số cố định của giấy phép (mà ý chí là số lượng yêu cầu tối đa của bạn). Bất cứ khi nào bạn truy cập để gửi truy vấn bằng cách sử dụng executeAsync, có được giấy phép. Bạn thực sự chỉ cần 1 thread (nhưng có thể muốn để giới thiệu một hồ bơi # cpu lõi kích thước thực hiện điều này) mà mua lại giấy phép từ Semaphore và thực hiện truy vấn. Nó sẽ chỉ chặn khi có được cho đến khi có giấy phép có sẵn.
- Sử dụng Futures.addCallback cho tương lai được trả lại từ executeAsync. Gọi lại nên gọi Sempahore.release() trong cả hai trường hợp onSuccess và onFailure. Bằng cách phát hành giấy phép, điều này sẽ cho phép chuỗi của bạn ở bước 1 để tiếp tục và gửi yêu cầu tiếp theo.
Ngoài ra tôi đã thấy vài post khác, nơi họ đã nói chuyện về việc sử dụng RingBuffer
hoặc Guava RateLimitter
nên cái nào là tốt hơn và tôi nên sử dụng? Dưới đây là những tùy chọn tôi có thể nghĩ:
- Sử dụng Semaphore
- Sử dụng Vòng đệm
- Sử dụng Ổi Rate Limiter
bất cứ ai có thể giúp tôi với một ví dụ về cách chúng ta có thể tăng tốc theo yêu cầu hoặc có được backpressure cho cassandra viết và đảm bảo tất cả các ghi đi thành công vào cassandra?
Bạn có nghĩ rằng bạn có thể cung cấp cho tôi một ví dụ cho hàng đợi hoặc đệm Ví dụ, bạn đã cho tôi? Tôi nghĩ rằng điều này sẽ phù hợp với tôi nhất trong kịch bản của tôi. – john