2015-03-20 15 views
9

Tôi đang mô phỏng một hệ thống ngân hàng mà tôi có 100.000 giao dịch để chạy. Mỗi loại giao dịch thực hiện runnable và tôi có nhiều loại giao dịch khác nhau có thể xảy ra.Chạy 100.000 quy trình đồng thời

transactions là một loạt các Runnables.

Lý tưởng nhất, các mã sau đây sẽ giải quyết vấn đề của tôi:

for (Transaction transaction : transactions) { 
    new Thread(transaction).start(); 
} 

Tuy nhiên, rõ ràng là một java.lang.OutOfMemoryError: unable to create new native thread là ràng buộc để xảy ra khi cố gắng để bắt đầu 100,000 chủ đề.

Vì vậy, tiếp theo, tôi đã thử triển khai ExecutorService để tạo một nhóm luồng để quản lý 100.000 lần chạy của tôi.

ExecutorService service; 
int cpus = Runtime.getRuntime().availableProcessors(); 
// cpus == 8 in my case 
service = Executors.newFixedThreadPool(cpus); 

for (Transaction transaction : transactions) { 
    service.execute(transaction); 
} 

Khi thử phương pháp này, các quy trình dài "hog" JVM. Ví dụ: một loại giao dịch mất 30 - 60 giây để thực thi. Khi lược tả ứng dụng, không có chủ đề nào khác được phép chạy trong khi giao dịch dài diễn ra.

In this case, thread 6 did not allow any other threads to run until its single transaction was complete

Trong trường hợp này, chủ đề 6 không cho phép bất kỳ chủ đề khác để chạy cho đến khi chế biến của nó đã được hoàn tất.

Vì vậy, câu hỏi của tôi là: Làm thế nào tôi có thể chạy 100.000 giao dịch nhanh nhất có thể mà không gặp phải vấn đề về bộ nhớ? Nếu ExecutorService là câu trả lời, thì làm thế nào tôi có thể dừng các giao dịch rất dài từ việc hogging JVM và cho phép các giao dịch khác chạy đồng thời?

EDIT:

Tôi buộc một số loại giao dịch xảy ra trong vòng 30 - 60 giây trên mục đích để đảm bảo rằng chương trình ren của tôi đang làm việc một cách chính xác. Mỗi giao dịch sẽ khóa trên một tài khoản và có 10 tài khoản. Đây là phương pháp của tôi mà heo JVM: (gọi bằng run())

public void makeTransaction() { 
    synchronized(account) { 
     long timeStarted = System.nanoTime(); 
     long timeToEnd = timeStarted + nanos; 

     this.view = new BatchView(transactionNumber, account.getId()); 

     this.displayView(); 

     while(true) { 
      if(System.nanoTime() % 1000000000 == 0) { 
       System.out.println("batch | " + account.getId()); 
      } 

      if(System.nanoTime() >= timeToEnd) { 
       break; 
      } 
     } 
    } 
} 

Mỗi lần giao dịch này được chạy, chỉ có một tài khoản sẽ bị khóa, khiến 9 người khác mà nên có sẵn để xử lý. Làm thế nào để JVM không xử lý thêm bất kỳ chủ đề nào, và thay vào đó bị treo cho đến khi giao dịch dài này kết thúc?

Đây là một liên kết đến một phiên bản rút gọn của dự án để chứng minh vấn đề: project

+3

Bạn không nên tạo cùng một số lượng luồng làm giao dịch. Thay vào đó tạo ra số lượng cố định của các chủ đề như 100 và cho phép họ làm việc trên 100000 giao dịch bằng cách sử dụng ExecutorsService – virendrao

+0

Bạn có thể hiển thị [một ví dụ hoàn chỉnh] (http://stackoverflow.com/help/mcve) sao chép hành vi đó không? – assylias

+0

@assylias đã thêm – Feek

Trả lời

2

Vấn đề với ứng dụng của bạn là rất sớm tất cả các luồng sẽ chỉ định giao dịch cho cùng một tài khoản, và sau đó tất cả chỉ một luồng phải chờ. Bạn có thể thấy điều này trong ảnh chụp màn hình sau, tôi đã tạm dừng ứng dụng.Thread pool-1-thread-3 hiện đang xử lý một giao dịch cho đối tượng Account với id 19 (id này không phải là id tài khoản của bạn, nhưng một đối tượng duy nhất mà Eclipse gán), và tất cả các luồng khác đang chờ khóa trên cùng Đối tượng tài khoản. Các đối tượng tài khoản là một trong những nơi id của bạn là 9.

Screenshot of debugger

Tại sao điều này xảy ra? Trong giao dịch 853, một luồng bắt đầu giao dịch dài đầu tiên (đối với tài khoản 9). Các chủ đề khác tiếp tục hoạt động trên các giao dịch khác. Tuy nhiên, khi bất kỳ chủ đề nào đạt đến một giao dịch khác cho tài khoản 9, nó sẽ phải dừng và đợi. Các giao dịch 857, 861 và 862 cũng dành cho tài khoản 9, và mỗi khối một luồng, vì vậy tại thời điểm này tất cả các luồng của tôi bị chặn (trên lõi tứ của tôi).

Cách giải quyết vấn đề này? Điều này tùy thuộc vào trường hợp sử dụng của bạn.

Nếu trong chương trình thực tế của bạn được đảm bảo rằng không có giao dịch đến cho một tài khoản đã cho X miễn là có một giao dịch khác đang chạy cho tài khoản X, bạn không cần phải thay đổi bất kỳ thứ gì.

Nếu số lượng tài khoản của bạn là rất lớn so với số lượng chủ đề, vấn đề trở nên khó xảy ra hơn, vì vậy bạn có thể quyết định sống với nó.

Nếu số tài khoản của bạn tương đối thấp (giả sử có thể nhỏ hơn một trăm hoặc nhiều hơn), bạn nên (như Peter nói) có một chuỗi (không ngừng chạy) cho mỗi tài khoản, mỗi luồng có hàng đợi giao dịch riêng. Điều này có lẽ sẽ hiệu quả hơn, bởi vì các chủ đề không cần phải "chiến đấu" trên hàng đợi được chia sẻ.

Một giải pháp khác là triển khai một số hình thức "đánh cắp công việc". Điều này có nghĩa rằng bất cứ khi nào một sợi sẽ bị chặn, thay vào đó nó sẽ tìm kiếm một số công việc khác để thực hiện. Để thực hiện điều này, trước tiên bạn cần phải kiểm tra xem một chuỗi có thể lấy khóa cho một tài khoản cụ thể hay không. Với synchronized trong Java, điều này là không thể, vì vậy bạn cần một cái gì đó như ReentrantLock.tryLock(). Bạn cũng cần có khả năng truy cập trực tiếp vào hàng đợi giao dịch từ mỗi luồng, vì vậy tôi đoán bạn không thể sử dụng ExecutorService tại đây nhưng cần tự thực hiện xử lý giao dịch (sử dụng LinkedBlockingQueue).

Bây giờ mỗi luồng sẽ thăm dò ý kiến ​​các giao dịch từ hàng đợi trong một vòng lặp. Đầu tiên, nó cố gắng lấy khóa cho tài khoản tương ứng với tryLock(). Nếu điều này không thành công, hãy thêm giao dịch vào danh sách (chủ đề cụ thể), tìm nạp giao dịch tiếp theo từ hàng đợi và thử giao dịch này, cho đến khi bạn tìm thấy một giao dịch bạn có thể xử lý. Sau khi giao dịch kết thúc, đầu tiên hãy xem trong danh sách để có thể xử lý các giao dịch trước khi có thể thực hiện giao dịch khác từ hàng đợi toàn cầu. Các mã có thể giúp bạn xấp xỉ như sau:

public BlockingQueue<Transaction> queue = ...; // the global queue for all threads 

public void run() { 
    LinkedList<Transaction> myTransactions = new LinkedList<>(); 
    while (true) { 
    Transaction t = queue.take(); 
    while (!t.getLock().tryLock()) { 
     myTransactions.add(t); 
    } 
    try { 
     // here we hold the lock for t 
     t.makeTransaction(); 
    } finally { 
     t.getLock().unlock(); 
    } 

    Iterator<Transaction> iter = myTransactions.iterator(); 
    while (iter.hasNext()) { 
     t = iter.next(); 
     if (t.getLock().tryLock()) { 
     try { 
      t.makeTransaction(); 
     } finally { 
      t.getLock().unlock(); 
     } 
     iter.remove(); 
     } 
    } 
    } 
} 

Lưu ý rằng điều này vẫn có ít nhất các vấn đề sau đây bạn có thể muốn giải quyết:

  • Trong khi một sợi treo trong queue.take(), nó không kiểm tra xem các giao dịch trong danh sách của nó đã có sẵn. Vì vậy, nếu có khoảng thời gian trong đó queue trống (ở cuối quá trình xử lý), có thể có các giao dịch bị kẹt trong danh sách không được xử lý.
  • Nếu một số lượng lớn khóa đang được giữ bởi một số chủ đề, các chủ đề còn lại có thể mất rất nhiều giao dịch mà họ không thể xử lý ngay bây giờ, vì vậy họ sẽ điền vào danh sách địa phương của họ, thoát khỏi hàng đợi toàn cầu. Khi các khóa được giải phóng, nhiều giao dịch có thể đã bị xóa khỏi hàng đợi toàn cầu, tạo ra sự mất cân đối giữa công việc mà các luồng có thể làm (một số luồng có thể không hoạt động trong khi các luồng khác vẫn đang hoạt động trên các giao dịch dài).

Một lựa chọn đơn giản hơn có thể là để put() giao dịch vào hàng đợi (ở cuối) nếu bạn không thể có được các khóa cho họ, nhưng điều này sẽ làm cho họ thực hiện theo một thứ tự rất tùy ý (có thể xảy ra với các giải pháp trên cũng vậy, nhưng có lẽ không quá cực kỳ).

Chỉnh sửa: Một giải pháp tốt hơn có thể là đính kèm hàng đợi vào từng tài khoản thay vì danh sách chỉ theo chuỗi. Sau đó, một chủ đề sẽ thêm một giao dịch vào hàng đợi của tài khoản tương ứng bất cứ khi nào nó tìm thấy tài khoản này bị chặn. Khi một luồng kết thúc giao dịch cho tài khoản X, đầu tiên nó sẽ nhìn vào hàng đợi của tài khoản X, nếu bất kỳ giao dịch nào đã được thêm vào đó, trước khi xem danh sách toàn cầu.

9

Khi profiling ứng dụng, không có chủ đề khác đang được phép chạy trong khi giao dịch dài diễn ra.

Rất có thể, tác vụ này đang sử dụng tài nguyên đơn luồng. tức là cách ti được viết ngăn việc sử dụng đồng thời.

Làm cách nào tôi có thể chạy 100.000 giao dịch nhanh nhất có thể mà không gặp sự cố về bộ nhớ?

Nếu giao dịch là CPU bị ràng buộc, bạn nên có một nhóm có cùng kích thước với số lượng CPU bạn có.

Nếu các giao dịch phụ thuộc vào cơ sở dữ liệu, bạn nên xem xét việc nhóm chúng để sử dụng cơ sở dữ liệu hiệu quả hơn.

Nếu ExecutorService là câu trả lời, thì làm cách nào tôi có thể dừng giao dịch quá lâu để hoán đổi JVM và cho phép các giao dịch khác chạy đồng thời?

Làm cho giao dịch ngắn hơn nhiều. Nếu bạn có một nhiệm vụ chạy trong hơn một vài phần nghìn giây, bạn nên tìm ra lý do tại sao nó mất quá nhiều thời gian. Tôi sẽ bắt đầu bằng cách nhìn vào cách mạng/IO là nó sử dụng và lược tả nhiệm vụ. Hầu hết các giao dịch (nếu bạn có số lượng lớn) phải vào khoảng 0,01 giây hoặc ít lý tưởng hơn nhiều.

Bạn nên cẩn thận khi cân nhắc cách sử dụng tài nguyên được chia sẻ. Nếu nhiệm vụ của bạn sử dụng cùng một tài nguyên quá nhiều, bạn có thể thấy rằng đa luồng không nhanh hơn, hoặc thậm chí còn chậm hơn.

+0

Mỗi giao dịch khóa trên một tài khoản (có 10 tài khoản). Giao dịch lâu dài là có mục đích: Tôi khóa trên tài khoản được chọn duy nhất trong khoảng thời gian ngẫu nhiên từ 30 đến 60 giây để xem hệ thống sẽ phản ứng như thế nào. Khi một tài khoản bị khóa trong 30 giây, các tài khoản khác vẫn tự do bị đột biến bởi các chủ đề khác trong nhóm. Không có cơ sở dữ liệu để lo lắng về và tôi có một hồ bơi thread có cùng kích thước với số lượng CPU mà tôi có. Tôi còn thiếu gì nữa? – Feek

+2

@ Rất có thể tất cả các tác vụ đang chờ trên cùng một tài khoản. Bạn chỉ cần một khối lượng công việc rất không cân bằng để vào trạng thái này. Nếu bạn chỉ có 10 tài khoản, một cách tiếp cận lành mạnh hơn nhiều là có 10 chủ đề trong đó bạn gán tất cả công việc cho từng tài khoản cho chuỗi của riêng nó. Bằng cách này, họ cũng sẽ chạy song song vì các chủ đề sẽ không bao giờ cố gắng truy cập vào cùng một tài khoản. –

1

Điều quan trọng là phải tính số lượng chuỗi công nhân có thể xử lý giao dịch cho bạn dựa trên phần cứng của bạn. Có vài công thức có sẵn để kích thước hồ bơi thread

Đối với CPU ràng buộc các ứng dụng

N * U hoặc (N + 1) * U

Đối với các ứng dụng IO bị ràng buộc

N * U * (1 + W/C)

nơi N - Số lượng bộ xử lý U - mục tiêu CPU Sử dụng W - Chờ thời gian C - Tính toán Thời gian

Ví dụ, nếu ứng dụng của bạn đang sử dụng 50% CPU và bạn có một 8 lõi . Sau đó, cho CPU bị ràng buộc các ứng dụng đa luồng để đạt hiệu quả bạn có

8 * (0,5) = 4

Nếu bạn có 4 chủ đề sau đó tất cả các lõi của bạn sẽ được xử lý một cách hiệu quả. Điều này thay đổi trong một số lợn có hỗ trợ hyperthreading

-1

Thực hiện 100.000 cuộc gọi trong các chuỗi riêng biệt khó thực hiện nếu bạn thực hiện từ máy tính xách tay hoặc thậm chí là máy tính để bàn 16 lõi. Bạn sẽ cần một mạng lưới hoặc một loạt các máy chủ để tối ưu thực hiện điều này.

Tuy nhiên, bạn vẫn có thể kéo dài điều này bằng cách thực hiện bất kỳ thao tác giao dịch nào trong callback. Thông lượng của bạn có thể tăng lên.

Các vấn đề liên quan