2016-04-24 20 views
14

Chương trình dựa trên Hazelcast của tôi có thể hoạt động ở hai chế độ: người gửi và người lao động.Weird Hazelcat IMap # put() behavior

Submitter đặt một số POJO vào bản đồ phân phối bởi một số chìa khóa, ví dụ .: hazelcastInstance.getMap(MAP_NAME).put(key, value);

Worker có một vòng lặp vô hạn (với Thread.sleep(1000L); bên trong cho thời gian chờ) mà phải xử lý các đối tượng từ bản đồ. Bây giờ tôi chỉ in kích thước bản đồ trong vòng lặp này.

Bây giờ, đây là vấn đề. Tôi bắt đầu ứng dụng công nhân. Sau đó, tôi bắt đầu bốn người gửi đồng thời (mỗi lần thêm một mục vào bản đồ và chấm dứt công việc của nó). Nhưng sau khi tất cả các ứng dụng người gửi được thực hiện, ứng dụng công nhân in tùy ý kích thước: đôi khi nó phát hiện rằng chỉ có một mục được thêm vào, đôi khi hai, đôi khi ba (thực sự nó không bao giờ đã nhìn thấy tất cả bốn mục).

Sự cố với luồng đơn giản này là gì? Tôi đã đọc trong tài liệu Hazelcast rằng phương pháp put() là đồng bộ, vì vậy nó đảm bảo rằng sau khi nó trả về, mục nhập được đặt vào bản đồ phân phối và được nhân rộng. Nhưng nó không có vẻ như vậy trong thí nghiệm của tôi.

UPD (code)

Submitter:

public void submit(String key) { 
    Object mySerializableObject = ... 
    IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME); 
    map.putIfAbsent(key, mySerializableObject, TASK_TTL_IN_HOURS, TimeUnit.HOURS); 
} 

Worker:

public void process() { 
    while (true) { 
     IMap<String, Object> map = hazelcastInstance.getMap(MAP_NAME); 
     System.out.println(map.size()); 

     // Optional<Map.Entry<String, Object>> objectToProcess = getObjectToProcess(); 
     // objectToProcess.ifPresent(objectToProcess-> processObject(id, objectToProcess)); 
     try { 
      Thread.sleep(PAUSE); 
     } catch (InterruptedException e) { 
      LOGGER.error(e.getMessage(), e); 
     } 
    } 
} 

Tôi nhận xét ra "xử lý" một phần bản thân, bởi vì bây giờ tôi chỉ cố gắng để có được trạng thái nhất quán của bản đồ. Mã ở trên in các kết quả khác nhau mỗi lần, ví dụ: "4, 3, 1, 1, 1, 1, 1 ..." (vì vậy nó thậm chí có thể thấy 4 tác vụ đã gửi trong một khoảnh khắc, nhưng sau đó chúng ... biến mất) .

UPD (log)

Worker:

... 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 0 
tasksMap.size() = 1 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
tasksMap.size() = 2 
... 

Submitter 1:

Before: tasksMap.size() = 0 
After: tasksMap.size() = 1 

Submitter 2:

Before: tasksMap.size() = 1 
After: tasksMap.size() = 4 

nộp cho tter 3:

Before: tasksMap.size() = 1 
After: tasksMap.size() = 2 

Submitter 4:

Before: tasksMap.size() = 3 
After: tasksMap.size() = 4 
+0

IMap :: phương pháp kích thước là ước tính, dù sao nó cũng sẽ ổn định cuối cùng. Bạn có thể chia sẻ thêm một số mã không? – noctarius

+0

@noctarius, tôi đã cập nhật câu hỏi. –

+0

Mã của bạn có sử dụng các thành viên được nhúng và chúng có thực sự dừng sau khi gửi giá trị không? Tôi có thể tưởng tượng các thành viên nhanh chóng rời khỏi cụm để thực hiện các yêu cầu sao lưu khi rời khỏi. – noctarius

Trả lời

7

Vâng, tôi đoán, tôi đã tìm ra vấn đề. Theo như tôi hiểu, phân phối IMap được trả về bởi hazelcastInstance.getMap không đảm bảo rằng dữ liệu được sao chép trên tất cả các nút hiện có trong cụm: một số phần dữ liệu có thể được sao chép sang một số nút, một phần khác - đến các nút khác. Đó là lý do tại sao trong ví dụ của tôi một số nhiệm vụ được gửi đã được nhân rộng không đến nút công nhân (hoạt động vĩnh viễn), nhưng đối với một số người gửi khác, chấm dứt việc thực thi của họ sau khi gửi. Vì vậy, các mục như vậy đã bị mất khi thoát khỏi người gửi.

Tôi đã giải quyết vấn đề này bằng cách thay thế hazelcastInstance.getMap thành hazelcastInstance.getReplicatedMap.Phương thức này trả về ReplicatedMap, trong đó, AFAIK, đảm bảo rằng các mục nhập vào nó sẽ được nhân rộng thành tất cả các nút của cụm. Vì vậy, bây giờ mọi thứ hoạt động tốt trong hệ thống của tôi.