2012-02-21 32 views
7

Nói rằng tôi có một AtomicReference vào một danh sách các đối tượng:AtomicReference đến một đối tượng có thể thay đổi và tầm nhìn

AtomicReference<List<?>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>()); 

Chủ đề Một thêm yếu tố vào danh sách này: batch.get().add(o);

Sau đó, thread B mất danh sách và, ví dụ: lưu trữ nó trong một DB: insertBatch(batch.get());

Tôi có phải thực hiện đồng bộ hóa bổ sung khi viết (Thr hay không) ead A) và đọc (Thread B) để đảm bảo thread B thấy danh sách theo cách A còn lại, hoặc là điều này được chăm sóc bởi AtomicReference?

Nói cách khác: nếu tôi có AtomicReference đối với một đối tượng có thể thay đổi và một chuỗi thay đổi đối tượng đó, các chuỗi khác có thấy thay đổi này ngay lập tức không?

Edit:

Có lẽ một số mã ví dụ là theo thứ tự:

public void process(Reader in) throws IOException { 
    List<Future<AtomicReference<List<Object>>>> tasks = new ArrayList<Future<AtomicReference<List<Object>>>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(4); 

    for (int i = 0; i < 4; ++i) { 
     tasks.add(exec.submit(new Callable<AtomicReference<List<Object>>>() { 
      @Override public AtomicReference<List<Object>> call() throws IOException { 

       final AtomicReference<List<Object>> batch = new AtomicReference<List<Object>>(new ArrayList<Object>(batchSize)); 

       Processor.this.parser.parse(in, new Parser.Handler() { 
        @Override public void onNewObject(Object event) { 
          batch.get().add(event); 

          if (batch.get().size() >= batchSize) { 
           dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize))); 
          } 
        } 
       }); 

       return batch; 
      } 
     })); 
    } 

    List<Object> remainingBatches = new ArrayList<Object>(); 

    for (Future<AtomicReference<List<Object>>> task : tasks) { 
     try { 
      AtomicReference<List<Object>> remainingBatch = task.get(); 
      remainingBatches.addAll(remainingBatch.get()); 
     } catch (ExecutionException e) { 
      Throwable cause = e.getCause(); 

      if (cause instanceof IOException) { 
       throw (IOException)cause; 
      } 

      throw (RuntimeException)cause; 
     } 
    } 

    // these haven't been flushed yet by the worker threads 
    if (!remainingBatches.isEmpty()) { 
     dao.insertBatch(remainingBatches); 
    } 
} 

gì xảy ra ở đây là tôi tạo ra bốn đề người lao động để phân tích một số văn bản (đây là Reader in tham số cho phương thức process()) . Mỗi công nhân lưu các dòng nó đã phân tích cú pháp trong một lô, và xóa sạch lô khi nó đầy (dao.insertBatch(batch.getAndSet(new ArrayList<Object>(batchSize)));).

Vì số lượng dòng trong văn bản không phải là bội số của kích thước lô, các đối tượng cuối cùng sẽ kết thúc trong một đợt không bị xóa, vì nó không đầy. Các lô còn lại này do đó được chèn vào bởi luồng chính.

Tôi sử dụng AtomicReference.getAndSet() để thay thế gói đầy đủ bằng một gói trống. Nó chương trình này chính xác liên quan đến luồng?

Trả lời

9

Um ... nó không thực sự hoạt động như thế này. AtomicReference đảm bảo rằng tham chiếu chính nó được hiển thị trên các chuỗi, nghĩa là nếu bạn chỉ định tham chiếu khác với tham chiếu ban đầu thì bản cập nhật sẽ hiển thị. Nó không đảm bảo về nội dung thực tế của đối tượng mà tham chiếu trỏ đến.

Do đó, hoạt động đọc/ghi trên nội dung danh sách yêu cầu đồng bộ hóa riêng biệt.

Chỉnh sửa: Vì vậy, đánh giá từ mã cập nhật của bạn và nhận xét bạn đã đăng, đặt tham chiếu cục bộ thành volatile là đủ để đảm bảo khả năng hiển thị.

+0

OK, tôi đã thêm một số mã ví dụ vào câu hỏi của mình ở trên. Tôi sử dụng 'AtomicReference.getAndSet()' để thay thế một lô đầy đủ bằng một gói trống mới. Tôi vẫn cần đồng bộ hóa bổ sung? –

+1

Có, mã của bạn là chính xác, mặc dù việc sử dụng «AtomicReference' dường như không cần thiết ở đây. – Tudor

+0

@Tudor đang nghĩ chính xác điều tương tự.Trong thực tế, getAndSet() có thể không làm những gì anh ta muốn vì nó sẽ nhận được giá trị hiện tại và sau đó thay đổi giá trị của AtomicReference. –

0

Thêm vào câu trả lời của Tudor: Bạn sẽ phải tự làm chủ đề an toàn hoặc - tùy theo yêu cầu của bạn - thậm chí khối mã lớn hơn.

Nếu bạn có thể nhận được ngay với một thread ArrayList bạn có thể "trang trí" nó như thế này:

batch = java.util.Collections.synchronizedList(new ArrayList<Object>()); 

Nhưng hãy nhớ: Ngay cả "đơn giản" xây dựng như thế này là không threadsafe với điều này:

Object o = batch.get(batch.size()-1); 
0

AtomicReference sẽ chỉ giúp bạn tham chiếu đến danh sách, nó sẽ không tự làm gì với danh sách. Đặc biệt hơn, trong kịch bản của bạn, bạn gần như chắc chắn sẽ gặp vấn đề khi hệ thống đang được tải, nơi người tiêu dùng đã lấy danh sách trong khi nhà sản xuất đang thêm một mục vào nó.

Âm thanh này với tôi như bạn nên sử dụng BlockingQueue. Sau đó bạn có thể giới hạn dung lượng bộ nhớ nếu bạn sản xuất nhanh hơn người tiêu dùng của bạn và để hàng đợi xử lý tất cả tranh chấp.

Cái gì như:

ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (50); 

// ... Producer 
queue.put(o); 

// ... Consumer 
List<Object> queueContents = new ArrayList<Object>(); 
// Grab everything waiting in the queue in one chunk. Should never be more than 50 items. 
queue.drainTo(queueContents); 

Added

Nhờ @Tudor để chỉ ra cấu trúc bạn đang sử dụng. ... Tôi phải thừa nhận nó khá kỳ lạ. Bạn thực sự không cần AtomicReference ở mức nào tôi có thể thấy. Mỗi thread sở hữu ArrayList riêng của nó cho đến khi nó được chuyển đến dao tại thời điểm đó nó được thay thế để không có tranh chấp ở tất cả mọi nơi.

Tôi hơi lo lắng về việc bạn tạo bốn trình phân tích cú pháp trên một đơn Reader. Tôi hy vọng bạn có một số cách để đảm bảo mỗi trình phân tích cú pháp không ảnh hưởng đến những người khác.

Cá nhân tôi sẽ sử dụng một số hình thức mẫu người tiêu dùng sản xuất như tôi đã mô tả trong mã ở trên. Một cái gì đó như thế này có lẽ.

static final int PROCESSES = 4; 
static final int batchSize = 10; 

public void process(Reader in) throws IOException, InterruptedException { 

    final List<Future<Void>> tasks = new ArrayList<Future<Void>>(); 
    ExecutorService exec = Executors.newFixedThreadPool(PROCESSES); 
    // Queue of objects. 
    final ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object> (batchSize * 2); 
    // The final object to post. 
    final Object FINISHED = new Object(); 

    // Start the producers. 
    for (int i = 0; i < PROCESSES; i++) { 
    tasks.add(exec.submit(new Callable<Void>() { 
     @Override 
     public Void call() throws IOException { 

     Processor.this.parser.parse(in, new Parser.Handler() { 
      @Override 
      public void onNewObject(Object event) { 
      queue.add(event); 
      } 
     }); 
     // Post a finished down the queue. 
     queue.add(FINISHED); 
     return null; 
     } 
    })); 
    } 

    // Start the consumer. 
    tasks.add(exec.submit(new Callable<Void>() { 
    @Override 
    public Void call() throws IOException { 
     List<Object> batch = new ArrayList<Object>(batchSize); 
     int finishedCount = 0; 
     // Until all threads finished. 
     while (finishedCount < PROCESSES) { 
     Object o = queue.take(); 
     if (o != FINISHED) { 
      // Batch them up. 
      batch.add(o); 
      if (batch.size() >= batchSize) { 
      dao.insertBatch(batch); 
      // If insertBatch takes a copy we could merely clear it. 
      batch = new ArrayList<Object>(batchSize); 
      } 
     } else { 
      // Count the finishes. 
      finishedCount += 1; 
     } 
     } 
     // Finished! Post any incopmplete batch. 
     if (batch.size() > 0) { 
     dao.insertBatch(batch); 
     } 
     return null; 
    } 
    })); 

    // Wait for everything to finish. 
    exec.shutdown(); 
    // Wait until all is done. 
    boolean finished = false; 
    do { 
    try { 
     // Wait up to 1 second for termination. 
     finished = exec.awaitTermination(1, TimeUnit.SECONDS); 
    } catch (InterruptedException ex) { 
    } 
    } while (!finished); 
} 
+0

Umm ... từ mã của anh ta không giống như người tiêu dùng sản xuất. Anh ấy thực sự sinh ra một nhóm các chủ đề, mỗi người làm một số công việc, sau đó tham gia và hoàn thành công việc trong chủ đề chính. Không có dữ liệu thực tế nào được truyền giữa các luồng. – Tudor

1

Tôi nghĩ rằng, quên tất cả các mã ở đây, bạn có câu hỏi chính xác là thế này:

Tôi phải làm đồng bộ hóa bổ sung khi viết (Chủ đề A) và đọc (Chủ đề B) để đảm bảo chủ đề B thấy danh sách theo cách còn lại, hoặc điều này được thực hiện bởi AtomicReference?

Vì vậy, phản hồi chính xác cho điều đó là: , quản lý nguyên tử khả năng hiển thị. Và đó không phải là ý kiến ​​của tôi, nhưng JDK documentation one:

Hiệu ứng bộ nhớ để truy cập và cập nhật nguyên tử thường tuân thủ các quy tắc cho các chất bay hơi, như đã nêu trong Mô hình bộ nhớ 17.4.

Tôi hy vọng điều này sẽ hữu ích.

Các vấn đề liên quan