2010-07-29 20 views
6

Tôi đã hỏi câu hỏi này lần đầu tiên here, nhưng tôi đã nhận ra rằng câu hỏi của tôi không phải là một vòng lặp thời gian thực. Những gì tôi muốn biết là, cách thích hợp để thực hiện truyền thông điệp không đồng bộ hiệu suất cao trong Java là gì?Java: Thông báo hiệu năng cao (đơn sản xuất/người tiêu dùng đơn)

Những gì tôi đang cố gắng để làm ...

Tôi có ~ 10.000 người tiêu dùng, từng tốn nhiều tin nhắn từ hàng đợi tin của họ. Tôi có một chủ đề sản xuất tin nhắn từng người một và đặt chúng vào hàng đợi chính xác của người tiêu dùng. Mỗi vòng lặp của người tiêu dùng vô thời hạn, kiểm tra thông báo xuất hiện trong hàng đợi của nó và xử lý nó.

Tôi tin rằng thuật ngữ này là "một nhà sản xuất đơn/người tiêu dùng", vì có một nhà sản xuất và mỗi người tiêu dùng chỉ hoạt động trên hàng đợi riêng của họ (nhiều người tiêu dùng không bao giờ đọc từ cùng một hàng đợi).

Bên Consumer.java:

@Override 
public void run() { 
    while (true) { 
     Message msg = messageQueue.poll(); 
     if (msg != null) { 
      ... // do something with the message 
     } 
    } 
} 

Các nhà sản xuất đang đặt thông điệp bên trong hàng đợi thông điệp tiêu dùng với tốc độ nhanh (vài triệu tin nhắn mỗi giây). Người tiêu dùng nên xử lý các tin nhắn này càng nhanh càng tốt!

Lưu ý: while (true) { ... } bị chấm dứt bởi thông báo KILL được gửi bởi Nhà sản xuất làm thông báo cuối cùng của nhà sản xuất.

Tuy nhiên, câu hỏi của tôi là về cách thích hợp để thiết kế thông điệp này. Tôi nên sử dụng loại hàng đợi nào cho messageQueue? Nó có nên đồng bộ hoặc không đồng bộ không? Thông báo nên được thiết kế như thế nào? Tôi có nên sử dụng vòng lặp while-true không? Người tiêu dùng có nên là một chủ đề hay cái gì đó khác không? 10.000 chủ đề có bị chậm lại không? Thay thế cho chủ đề là gì?

Vì vậy, cách thích hợp để thực hiện truyền thông điệp hiệu suất cao trong Java là gì?

+1

Tại sao bạn có chủ đề 10k? Sẽ có rất nhiều chi phí trên chuyển đổi thread trừ khi bạn có rất nhiều lõi hoặc nhiệm vụ liên quan đến một số tiền hợp lý chờ đợi cho mỗi thread. – Mike

+2

10k chủ đề cách xa hiệu suất cao – whiskeysierra

+0

'@Mike:' Có 10.000 biểu tượng khác nhau và mỗi người tiêu dùng xử lý thông điệp cho một biểu tượng. Tôi không biết nếu nó nên được thực hiện như là chủ đề, nhưng người tiêu dùng chia sẻ không có gì với nhau, và làm cho một ứng cử viên tốt cho các mô hình diễn viên. –

Trả lời

5

Tôi có thể nói rằng ngữ cảnh chuyển đổi trên 10.000 chủ đề sẽ rất cao, chưa kể đến phí trên bộ nhớ. Theo mặc định, trên nền tảng 32 bit, mỗi luồng sử dụng kích thước ngăn xếp mặc định là 256kb, vì vậy chỉ có 2,5 GB cho ngăn xếp của bạn. Rõ ràng là bạn đang nói chuyện 64-bit nhưng thậm chí như vậy, đó là một số lượng khá lớn của bộ nhớ. Do số lượng bộ nhớ được sử dụng, bộ nhớ cache sẽ bị ném rất nhiều và CPU sẽ bị điều chỉnh bởi băng thông bộ nhớ.

Tôi sẽ tìm một thiết kế tránh sử dụng quá nhiều luồng để tránh phân bổ lượng lớn ngăn xếp và chuyển đổi ngữ cảnh trên cao. Bạn không thể xử lý đồng thời 10.000 luồng. Phần cứng hiện tại thường có ít hơn 100 lõi.

Tôi sẽ tạo một hàng đợi cho mỗi chuỗi phần cứng và gửi thư theo kiểu vòng tròn. Nếu thời gian xử lý thay đổi đáng kể, có nguy cơ rằng một số luồng kết thúc xử lý hàng đợi của chúng trước khi chúng được cung cấp nhiều công việc hơn, trong khi các luồng khác không bao giờ nhận được thông qua công việc được phân bổ của chúng. Điều này có thể tránh được bằng cách sử dụng trộm cắp công việc, như được thực hiện trong khung công tác JSJ-166 ForkJoin.

Vì giao tiếp là một cách từ nhà xuất bản đến người đăng ký, khi đó Tin nhắn không cần bất kỳ thiết kế đặc biệt nào, giả sử người đăng ký không thay đổi tin nhắn sau khi được xuất bản.

CHỈNH SỬA: Đọc nhận xét, nếu bạn có 10.000 biểu tượng, sau đó tạo một số chủ đề người đăng ký chung (một chuỗi người đăng ký trên mỗi lõi), nhận tin nhắn ngẫu nhiên từ nhà xuất bản (ví dụ: thông qua hàng đợi tin nhắn).Người đăng ký kéo thông điệp từ hàng đợi, lấy biểu tượng từ thông báo và tìm kiếm biểu tượng này trong một bản đồ xử lý tin nhắn, truy xuất trình xử lý và yêu cầu trình xử lý xử lý đồng bộ thông báo. Sau khi thực hiện, nó lặp lại, tìm nạp thông điệp tiếp theo từ hàng đợi. Nếu thông điệp cho cùng một biểu tượng phải được xử lý theo thứ tự (đó là lý do tại sao tôi đoán bạn muốn có 10.000 hàng đợi.), Bạn cần ánh xạ biểu tượng cho người đăng ký. Ví dụ. nếu có 10 người đăng ký thì ký hiệu 0-999 sẽ chuyển tới thuê bao 0, 1000-1999 cho thuê bao 1 vv .. Một lược đồ tinh tế hơn là ánh xạ biểu tượng theo tần suất phân phối của chúng, sao cho mỗi thuê bao nhận được cùng tải. Ví dụ: nếu 10% lưu lượng truy cập là ký hiệu 0 thì người đăng ký 0 sẽ chỉ xử lý một biểu tượng đó và các biểu tượng khác sẽ được phân phối trong số các người đăng ký khác.

+0

Có cách nào để viết chương trình của tôi để khái niệm đó là 10.000 người tiêu dùng riêng biệt, mỗi người làm việc trên hàng đợi của riêng họ? Nhưng chạy như một vài chủ đề đối phó với một vài hàng đợi? –

+0

Vui lòng xem chỉnh sửa của tôi. – mdma

+0

@ Mr.Burgundy Chắc chắn, có rất nhiều cách tiếp cận. ví dụ. như là một cách tiếp cận đơn giản, bạn có thể gói gọn logic của người tiêu dùng trong một lớp (không liên quan đến chủ đề người tiêu dùng) 10k trong danh sách, có chuỗi _one_ tiêu dùng tìm kiếm đúng và gọi logic cho người tiêu dùng cụ thể đó. – nos

0

Có một nhóm chủ đề người tiêu dùng liên quan đến khả năng phần cứng và công suất hệ điều hành. Những chủ đề tiêu dùng này có thể thăm dò hàng đợi tin nhắn của bạn.

Tôi hoặc sẽ có các Thông báo biết cách tự xử lý hoặc đăng ký bộ xử lý với các lớp chuỗi tiêu thụ khi chúng được khởi tạo.

1

Trước hết, không có câu trả lời đúng nào trừ khi bạn đặt tài liệu thiết kế hoàn chỉnh hoặc bạn thử các cách tiếp cận khác nhau cho chính mình.

Tôi giả định quá trình xử lý của bạn sẽ không được tính toán chuyên sâu nếu không bạn sẽ không nghĩ đến việc xử lý 10000 hàng đợi cùng một lúc. Một giải pháp có thể là giảm thiểu chuyển đổi ngữ cảnh bằng cách có một luồng hai luồng cho mỗi CPU. Trừ khi hệ thống của bạn sẽ xử lý dữ liệu theo thời gian thực nghiêm ngặt có thể khiến bạn chậm trễ hơn trên mỗi hàng đợi nhưng thông lượng tốt hơn. Ví dụ:

Ví dụ - để chuỗi nhà sản xuất chạy trên CPU của chính nó và đặt các chuỗi thông điệp vào chuỗi tiêu thụ. Mỗi luồng người tiêu dùng sau đó sẽ phân phối thông điệp tới hàng đợi riêng của N, thực hiện bước xử lý, nhận lô dữ liệu mới và cứ tiếp tục như vậy. Một lần nữa, phụ thuộc vào dung sai trễ của bạn để bước xử lý có thể có nghĩa là xử lý tất cả các hàng đợi, một số hàng đợi cố định, như nhiều hàng đợi có thể trừ khi đạt đến ngưỡng thời gian. Có thể dễ dàng biết hàng đợi thuộc về chuỗi tiêu dùng nào (ví dụ: nếu hàng đợi được đánh số liên tiếp: int consumerThreadNum = queueNum & 0x03) sẽ có lợi khi xem chúng trong bảng băm mỗi lần có thể chậm.

Để giảm thiểu sự cố bộ nhớ, có thể không phải là một ý tưởng tốt để tạo/hủy hàng đợi để bạn có thể phân bổ trước một đối tượng hàng đợi (tối đa số lượng/số lượng lõi) cho mỗi chuỗi. Khi một hàng đợi được hoàn thành thay vì bị phá hủy, nó có thể được xóa và sử dụng lại. Bạn không muốn gc theo cách của bạn quá thường xuyên và quá lâu.

Không rõ là nếu nhà sản xuất của bạn tạo bộ dữ liệu hoàn chỉnh cho mỗi hàng đợi hoặc sẽ gửi dữ liệu theo khối cho đến khi lệnh KILL được nhận. Nếu nhà sản xuất của bạn gửi các bộ dữ liệu hoàn chỉnh, bạn có thể loại bỏ hoàn toàn khái niệm xếp hàng và chỉ xử lý dữ liệu khi nó đến một chuỗi người tiêu dùng.

0

Trong trường hợp không có thêm chi tiết về các khó khăn trong việc xử lý các biểu tượng, thật khó để đưa ra lời khuyên cụ thể.

Bạn nên hãy xem bài viết slashdot này:

http://developers.slashdot.org/story/10/07/27/1925209/Java-IO-Faster-Than-NIO

Nó có khá nhiều cuộc thảo luận và các dữ liệu đo thực tế về các chủ đề nhiều so với đơn chọn so với lập luận chủ đề bi-a.

2

Bạn có thể sử dụng này (tín dụng đi vào Which ThreadPool in Java should I use?):

class Main { 
    ExecutorService threadPool = Executors.newFixedThreadPool(
            Runtime.availableProcessors()*2); 

    public static void main(String[] args){ 
     Set<Consumer> consumers = getConsumers(threadPool); 
     for(Consumer consumer : consumers){ 
      threadPool.execute(consumer); 
     } 
    } 
} 

class Consumer { 
    private final ExecutorService tp; 
    private final MessageQueue messageQueue; 
    Consumer(ExecutorService tp,MessageQueue queue){ 
     this.tp = tp; 
     this.messageQueue = queue; 
    } 
    @Override 
    public void run(){ 
       Message msg = messageQueue.poll(); 

       if (msg != null) { 
        try{ 
         ... // do something with the message 
        finally{ 
         this.tp.execute(this); 
        } 
       } 
      } 
    } 
}  

Bằng cách này, bạn có thể có kế hoạch ổn với rất ít rắc rối.

Các vấn đề liên quan