2012-05-16 39 views
6

Tôi vẫn đang trong quá trình gói bộ não của mình xung quanh cách hoạt động của đồng thời trong Java. Tôi hiểu rằng (nếu bạn đang đăng ký mô hình đồng thời Java 5 OO), bạn triển khai Task hoặc Callable với phương thức run() hoặc call() (tương ứng) và điều đó cho phép bạn thực hiện song song nhiều phương pháp được triển khai nhất có thể.Đa luồng một tệp lớn đọc

Nhưng tôi vẫn không hiểu cái gì đó cố hữu về lập trình đồng thời trong Java:

  • thế nào là một Task 's phương pháp run() giao đúng số lượng công việc đồng thời được thực hiện?

Là một ví dụ cụ thể, những gì nếu tôi có một I/O-bound readMobyDick() phương pháp mà đọc toàn bộ nội dung của Herman Melville Moby Dick vào bộ nhớ từ một tập tin trên hệ thống địa phương. Và chúng ta hãy chỉ nói rằng tôi muốn phương pháp readMobyDick() này là đồng thời và xử lý bởi 3 chủ đề, trong đó:

  • Chủ đề # 1 đọc 1/3 đầu tiên của cuốn sách vào bộ nhớ
  • Chủ đề # 2 lần đọc thứ hai 1/3 của cuốn sách vào bộ nhớ
  • chủ đề # 3 lần đọc cuối cùng 1/3 của cuốn sách vào bộ nhớ

tôi có cần phải đoạn Moby Dick lên thành ba tập tin và vượt qua chúng từng đến nhiệm vụ của mình hoặc làm II chỉ cần gọi readMobyDick() từ bên trong imp lemented run() phương pháp và (bằng cách nào đó) các Executor biết làm thế nào để phá vỡ công việc trong số các chủ đề.

Tôi là người học rất trực quan, vì vậy bất kỳ ví dụ mã nào về cách tiếp cận phù hợp đều được đánh giá cao! Cảm ơn!

+0

tên phương pháp Tuyệt vời! Khi tôi cố gắng đọc Moby Dick tôi thấy tôi phải làm điều đó đồng thời (xen kẽ với những cuốn sách khác) ;-) –

+0

Vì vậy ... cá voi trắng lớn sẽ là .NET, phải không? –

Trả lời

14

Có thể bạn đã chọn ngẫu nhiên ví dụ tồi tệ nhất tuyệt đối về các hoạt động song song!

Đọc song song với một đĩa cơ học thực sự chậm hơn so với đọc bằng một chuỗi đơn, bởi vì bạn đang thực sự nảy đầu cơ khí vào các phần khác nhau của đĩa khi mỗi luồng được lượt chạy. Điều này là tốt nhất còn lại như là một hoạt động đơn luồng. Chúng ta hãy lấy một ví dụ khác, tương tự như của bạn nhưng thực sự có thể mang lại một số lợi ích: giả sử tôi muốn tìm kiếm sự xuất hiện của một từ nhất định trong một danh sách khổng lồ các từ (danh sách này thậm chí có thể đến từ một tệp đĩa). , nhưng như tôi đã nói, đọc bởi một sợi đơn). Giả sử tôi có thể sử dụng 3 chủ đề như trong ví dụ của bạn, mỗi chủ đề tìm kiếm trên 1/3 của danh sách từ khổng lồ và giữ một bộ đếm cục bộ bao nhiêu lần từ tìm kiếm xuất hiện.

Trong trường hợp này, bạn muốn chia danh sách thành 3 phần, chuyển từng phần cho một đối tượng khác có loại thực hiện Runnable và thực hiện tìm kiếm theo phương thức run.

Bản thân thời gian chạy không có ý tưởng cách thực hiện phân vùng hoặc bất kỳ thứ gì như thế, bạn phải tự xác định nó. Có rất nhiều chiến lược phân vùng khác, mỗi chiến lược có điểm mạnh và điểm yếu riêng, nhưng chúng ta có thể gắn bó với phân vùng tĩnh ngay bây giờ.

Hãy xem một số mã:

class SearchTask implements Runnable { 
    private int localCounter = 0; 
    private int start; // start index of search 
    private int end; 
    private List<String> words; 
    private String token; 

    public SearchTask(int start, int end, List<String> words, String token) { 
     this.start = start; 
     this.end = end; 
     this.words = words; 
     this.token = token; 
    } 

    public void run() { 
     for(int i = start; i < end; i++) { 
       if(words.get(i).equals(token)) localCounter++; 
     } 
    } 

    public int getCounter() { return localCounter; } 
} 

// meanwhile in main :) 

List<String> words = new ArrayList<String>(); 
// populate words 
// let's assume you have 30000 words 

// create tasks 
SearchTask task1 = new SearchTask(0, 10000, words, "John"); 
SearchTask task2 = new SearchTask(10000, 20000, words, "John"); 
SearchTask task3 = new SearchTask(20000, 30000, words, "John"); 

// create threads 
Thread t1 = new Thread(task1); 
Thread t2 = new Thread(task1); 
Thread t3 = new Thread(task1); 

// start threads 
t1.start(); 
t2.start(); 
t3.start(); 

// wait for threads to finish 
t1.join(); 
t2.join(); 
t3.join(); 

// collect results 
int counter = 0; 
counter += task1.getCounter(); 
counter += task2.getCounter(); 
counter += task3.getCounter(); 

này nên hoạt động độc đáo. Lưu ý rằng trong các trường hợp thực tế, bạn sẽ xây dựng một lược đồ phân vùng chung hơn. Bạn có thể sử dụng cách khác ExecutorService và triển khai Callable thay vì Runnable nếu bạn muốn trả lại kết quả.

Vì vậy, một ví dụ khác sử dụng các cấu trúc tiên tiến hơn:

class SearchTask implements Callable<Integer> { 
    private int localCounter = 0; 
    private int start; // start index of search 
    private int end; 
    private List<String> words; 
    private String token; 

    public SearchTask(int start, int end, List<String> words, String token) { 
     this.start = start; 
     this.end = end; 
     this.words = words; 
     this.token = token; 
    } 

    public Integer call() { 
     for(int i = start; i < end; i++) { 
       if(words.get(i).equals(token)) localCounter++; 
     } 
     return localCounter; 
    }   
} 

// meanwhile in main :) 

List<String> words = new ArrayList<String>(); 
// populate words 
// let's assume you have 30000 words 

// create tasks 
List<Callable> tasks = new ArrayList<Callable>(); 
tasks.add(new SearchTask(0, 10000, words, "John")); 
tasks.add(new SearchTask(10000, 20000, words, "John")); 
tasks.add(new SearchTask(20000, 30000, words, "John")); 

// create thread pool and start tasks 
ExecutorService exec = Executors.newFixedThreadPool(3); 
List<Future> results = exec.invokeAll(tasks); 

// wait for tasks to finish and collect results 
int counter = 0; 
for(Future f: results) { 
    counter += f.get(); 
} 
+0

Vậy điều gì sẽ là một ví dụ hay về một tác vụ có lợi từ việc đa luồng? Tôi thực sự không quan tâm đến việc đọc tệp từ đĩa - tôi quan tâm đến việc xem ví dụ về cách thức hoạt động, hơi thở (** code **) về cách thức hoạt động của công việc được sắp xếp và cấp cho các tác vụ. – IAmYourFaja

+0

@herpylderp: Tôi đã đăng chỉnh sửa. Mã sẽ sớm ra mắt. :) – Tudor

+0

Một ví dụ tốt sẽ là một hàng đợi được phục vụ bởi nhiều chủ đề –

1

Bạn chọn một tấm gương xấu, như Tudor rất tử tế với chỉ ra. Phần cứng đĩa quay chịu sự hạn chế về mặt vật lý của các đĩa và đầu di chuyển, và việc thực hiện đọc hiệu quả nhất là đọc từng khối theo thứ tự, làm giảm nhu cầu di chuyển đầu hoặc đợi đĩa sắp xếp.

Điều đó nói rằng, một số hệ điều hành không phải lúc nào cũng lưu trữ liên tục trên đĩa, và đối với những người nhớ, phân mảnh có thể cung cấp tăng hiệu suất đĩa nếu hệ điều hành/tệp không thực hiện công việc cho bạn.

Như bạn đã đề cập muốn có một chương trình có lợi, hãy để tôi đề xuất một cách đơn giản, bổ sung ma trận.

Giả sử bạn tạo một luồng cho mỗi lõi, bạn có thể chia nhỏ hai ma trận bất kỳ để được thêm vào N (một cho mỗi luồng) hàng. Matrix Ngoài (nếu bạn gọi lại) hoạt động như vậy:

A + B = C 

hoặc

[ a11, a12, a13 ] [ b11, b12, b13] = [ (a11+b11), (a12+b12), (a13+c13) ] 
[ a21, a22, a23 ] + [ b21, b22, b23] = [ (a21+b21), (a22+b22), (a23+c23) ] 
[ a31, a32, a33 ] [ b31, b32, b33] = [ (a31+b31), (a32+b32), (a33+c33) ] 

Vì vậy, để phân phối này qua N đề, chúng tôi chỉ đơn giản là cần phải thực hiện đếm hàng và phân chia mô đun bằng của số đề để có được "id thread" nó sẽ được thêm vào.

matrix with 20 rows across 3 threads 
row % 3 == 0 (for rows 0, 3, 6, 9, 12, 15, and 18) 
row % 3 == 1 (for rows 1, 4, 7, 10, 13, 16, and 19) 
row % 3 == 2 (for rows 2, 5, 8, 11, 14, and 17) 
// row 20 doesn't exist, because we number rows from 0 

Bây giờ mỗi chủ đề "biết" mà hàng nó nên xử lý, và kết quả "mỗi hàng" có thể được tính trivially vì kết quả không vượt qua vào miền thread khác của tính toán.

Tất cả những gì cần thiết bây giờ là cấu trúc dữ liệu "kết quả" theo dõi khi các giá trị được tính toán và khi giá trị cuối cùng được đặt, khi đó tính toán hoàn tất. Trong ví dụ "giả" này của một kết quả ma trận bổ sung với hai luồng, tính toán câu trả lời với hai luồng mất khoảng một nửa thời gian.

// the following assumes that threads don't get rescheduled to different cores for 
// illustrative purposes only. Real Threads are scheduled across cores due to 
// availability and attempts to prevent unnecessary core migration of a running thread. 
[ done, done, done ] // filled in at about the same time as row 2 (runs on core 3) 
[ done, done, done ] // filled in at about the same time as row 1 (runs on core 1) 
[ done, done, .... ] // filled in at about the same time as row 4 (runs on core 3) 
[ done, ...., .... ] // filled in at about the same time as row 3 (runs on core 1) 

Các vấn đề phức tạp khác có thể được giải quyết bằng đa luồng và các vấn đề khác nhau được giải quyết bằng các kỹ thuật khác nhau. Tôi đã chọn một trong những ví dụ đơn giản nhất.

1

bạn thực hiện một nhiệm vụ hoặc Callable với một phương pháp run() hoặc gọi số() (tương ứng), và nó behooves bạn parallelize càng nhiều mà phương pháp thực hiện càng tốt.

Một Task đại diện cho một đơn vị rời rạc công việc
tải một tập tin vào bộ nhớ là một đơn vị riêng biệt của công việc và có thể do hoạt động này có thể được giao cho một sợi nền. I E. một chủ đề nền chạy nhiệm vụ tải tệp này.
Nó là một đơn vị rời rạc của công việc vì nó không có phụ thuộc khác cần thiết để làm công việc của mình (tải các tập tin) và có ranh giới rời rạc.
Điều bạn đang yêu cầu là chia thêm điều này thành công việc. I E. một chủ đề tải 1/3 của tập tin trong khi một chủ đề 2/3 vv
Nếu bạn có thể chia nhiệm vụ thành các phần phụ tiếp theo thì nó sẽ không phải là một nhiệm vụ ở vị trí đầu tiên theo định nghĩa. Vì vậy, tải một tập tin là một nhiệm vụ duy nhất của chính nó.

Để cung cấp cho bạn ví dụ:
Giả sử bạn có GUI và bạn cần hiển thị cho dữ liệu người dùng từ 5 tệp khác nhau. Để trình bày chúng, bạn cũng cần chuẩn bị một số cấu trúc dữ liệu để xử lý dữ liệu thực tế.
Tất cả những điều này là các nhiệm vụ riêng biệt.
Ví dụ: tải các tập tin là 5 nhiệm vụ khác nhau vì vậy có thể được thực hiện bởi 5 chủ đề khác nhau.
Việc chuẩn bị các cấu trúc dữ liệu có thể được thực hiện một chủ đề khác.
GUI chạy tất nhiên trong một chuỗi khác.
Tất cả những có thể xảy ra đồng thời

-1

Nếu bạn hệ thống hỗ trợ cao thông lượng I/O, đây là cách bạn có thể làm điều đó:

How to read a file using multiple threads in Java when a high throughput(3GB/s) file system is available

Dưới đây là giải pháp để đọc một tập tin duy nhất với nhiều chủ đề.

Chia tệp thành các đoạn N, đọc từng đoạn trong một chuỗi, sau đó hợp nhất chúng theo thứ tự. Cẩn thận với các đường mà vượt qua ranh giới chunk. Đó là ý tưởng cơ bản theo đề nghị của người sử dụng slaks

Bench đánh dấu dưới đây thực hiện nhiều chủ đề cho một tập tin 20 GB duy nhất:

1 Ðề tài: 50 giây: 400 MB/s

2 Chủ đề: 30 giây: 666 MB/s

4 Chủ đề: 20 giây: 1GB/s

8 Chủ đề: 60 Secon ds: 333 MB/s

readAllLines Java7 tương đương(): 400 giây: 50 MB/s

Lưu ý: Đây chỉ có thể làm việc trên các hệ thống được thiết kế để hỗ trợ cao thông lượng I/O, và không phải trên thông thường máy tính cá nhân

đây là nits thiết yếu của mã này, để biết chi tiết hoàn chỉnh, hãy làm theo các liên kết

public class FileRead implements Runnable 
{ 

private FileChannel _channel; 
private long _startLocation; 
private int _size; 
int _sequence_number; 

public FileRead(long loc, int size, FileChannel chnl, int sequence) 
{ 
    _startLocation = loc; 
    _size = size; 
    _channel = chnl; 
    _sequence_number = sequence; 
} 

@Override 
public void run() 
{ 
     System.out.println("Reading the channel: " + _startLocation + ":" + _size); 

     //allocate memory 
     ByteBuffer buff = ByteBuffer.allocate(_size); 

     //Read file chunk to RAM 
     _channel.read(buff, _startLocation); 

     //chunk to String 
     String string_chunk = new String(buff.array(), Charset.forName("UTF-8")); 

     System.out.println("Done Reading the channel: " + _startLocation + ":" + _size); 

} 

//args[0] is path to read file 
//args[1] is the size of thread pool; Need to try different values to fing sweet spot 
public static void main(String[] args) throws Exception 
{ 
    FileInputStream fileInputStream = new FileInputStream(args[0]); 
    FileChannel channel = fileInputStream.getChannel(); 
    long remaining_size = channel.size(); //get the total number of bytes in the file 
    long chunk_size = remaining_size/Integer.parseInt(args[1]); //file_size/threads 


    //thread pool 
    ExecutorService executor = Executors.newFixedThreadPool(Integer.parseInt(args[1])); 

    long start_loc = 0;//file pointer 
    int i = 0; //loop counter 
    while (remaining_size >= chunk_size) 
    { 
     //launches a new thread 
     executor.execute(new FileRead(start_loc, toIntExact(chunk_size), channel, i)); 
     remaining_size = remaining_size - chunk_size; 
     start_loc = start_loc + chunk_size; 
     i++; 
    } 

    //load the last remaining piece 
    executor.execute(new FileRead(start_loc, toIntExact(remaining_size), channel, i)); 

    //Tear Down 

} 

}