2012-01-18 37 views
7

Tôi muốn thực hiện một số tác vụ khác nhau song song, nhưng có khái niệm rằng nếu tác vụ đã được xếp hàng hoặc đang xử lý, nó sẽ không được xếp hàng đợi. Tôi đã đọc một chút về Java API và đã đưa ra mã dưới đây, mà dường như làm việc. Ai cũng có thể làm sáng tỏ liệu phương pháp tôi đang sử dụng có phải là cách tiếp cận tốt nhất hay không. Bất kỳ mối nguy hiểm (an toàn thread?) Hoặc cách tốt hơn để làm điều này? Mã là như sau:Chủ đề Xử lý hồ sơ 'trùng lặp' nhiệm vụ

import java.util.HashMap; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2; 
    static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>(); 
    static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>(); 
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q); 

    public static void main(String[] args) { 
     try { 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("A", "A")); 
     execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    static boolean execute(TestExecution e) { 
     System.out.println("Handling "+e.key1+":"+e.key2); 
     if (executions.containsKey(e)) { 
     Future<?> f = (Future<?>) executions.get(e); 
     if (f.isDone()) { 
      System.out.println("Previous execution has completed"); 
      executions.remove(e); 
     } else { 
      System.out.println("Previous execution still running"); 
      return false; 
     }   
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     Future<?> f = tpe.submit(e); 
     executions.put(e, f);    
     return true; 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    }    
} 

Theo dõi bình luận dưới đây:
Kế hoạch này là kích hoạt các nhiệm vụ để thực hiện sẽ được xử lý bởi cron gọi dịch vụ web RESTful. Ví dụ dưới đây là thiết lập cho một nhiệm vụ được kích hoạt lúc 9:30 mỗi ngày, cộng với một nhiệm vụ được lên lịch sau mỗi hai phút.

0/2 * * * * restclient.pl key11 key12 
30 09 * * * restclient.pl key21 key22 

Trong trường hợp này, nếu khóa công việc11: key12 đang chạy hoặc đã xếp hàng để chạy, tôi không muốn xếp hàng một phiên bản khác. Tôi hiểu rằng chúng tôi có các tùy chọn khác để lên lịch, tuy nhiên chúng tôi có xu hướng sử dụng cron cho các tác vụ khác, vì vậy tôi muốn cố gắng giữ lại điều này.

Cập nhật thứ hai. Đáp lại các bình luận cho đến nay tôi đã viết lại mã, bạn có thể bình luận về bất kỳ vấn đề nào với giải pháp được cập nhật sau đây không?

import java.util.concurrent.LinkedBlockingQueue; 

public class TestExecution implements Runnable { 
    String key1; 
    String key2;  
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>()); 

    public static void main(String[] args) { 
     try { 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("A", "A")); 
     tpe.execute(new TestExecution("B", "B")); 
     Thread.sleep(8000); 
     tpe.execute(new TestExecution("B", "B")); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     } 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2;  
    } 

    public boolean equals(Object obj) 
    { 
     if (obj instanceof TestExecution) 
     { 
      TestExecution t = (TestExecution) obj; 
      return (key1.equals(t.key1) && key2.equals(t.key2));   
     }  
     return false; 
    } 

    public int hashCode() 
    { 
     return key1.hashCode()+key2.hashCode(); 
    } 

    public void run() {  
     try { 
     System.out.println("Start processing "+key1+":"+key2); 
     Thread.sleep(4000); 
     System.out.println("Finish processing "+key1+":"+key2); 
     } catch (InterruptedException e) { 
     e.printStackTrace(); 
     }  
    } 
} 


import java.util.Collections; 
import java.util.HashSet; 
import java.util.Set; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 


public class TestThreadPoolExecutor extends ThreadPoolExecutor { 
    Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>()); 

    public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {  
     super(2, 5, 1, TimeUnit.MINUTES, q);  
    } 

    public void execute(Runnable command) { 
     if (executions.contains(command)) { 
     System.out.println("Previous execution still running"); 
     return; 
     } 
     else { 
     System.out.println("No previous execution"); 
     } 
     super.execute(command);  
     executions.add(command);  
    } 

    protected void afterExecute(Runnable r, Throwable t) { 
     super.afterExecute(r, t);   
     executions.remove(r); 
    }  
} 
+0

Tại sao không sử dụng một HashSet cho TestExecution thay vì HashMap ?? –

Trả lời

2

Một vài ý kiến:

  • trong thực-phương pháp, bạn sẽ nhận được một cuộc đua điều kiện giữa đọc "hành quyết" (containsKey) và văn bản (loại bỏ hoặc đặt) nếu một số chủ đề gọi phương thức này cùng một lúc. Bạn cần phải bọc tất cả các cuộc gọi đến "executions" được cho là nguyên tử trong một khối đồng bộ. (Trong trường hợp của bạn, làm cho phương pháp đồng bộ sẽ làm việc) http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html
  • Bạn nên xử lý tình trạng sử dụng một singleton chứ không phải tĩnh (tức là toàn cầu) biến

Nhưng tôi thực sự muốn biết thêm một chút về thiết kế của bạn để hiểu những gì bạn đang cố gắng đạt được. Tại sao một nhiệm vụ được xếp hàng đợi để thực hiện nhiều lần?

+0

Cảm ơn, tôi đã cập nhật câu hỏi với một số thông tin khác. – Patrick

+1

Và đối với một thiết kế hướng đối tượng hơn, tôi sẽ xem xét phân lớp ThreadPoolExecutor và đặt mã để quản lý bản đồ thực thi trong các hàm execute() và afterExecute() -. (Nó cũng có vẻ như tôi chính xác hơn để gọi execute() thay vì submit(), nhưng thông số không rõ ràng về điểm này) –

+0

Chúc mừng, viết lại mã dựa trên các đề xuất của bạn. Điều đó có vẻ tốt hơn không? – Patrick

3

Dưới đây là làm thế nào tôi sẽ xử lý và tránh trùng lặp

import java.util.Collections; 
import java.util.Set; 
import java.util.concurrent.*; 

public class TestExecution implements Callable<Void> { 
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>()); 
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>()); 

    private final String key1; 
    private final String key2; 

    public static void main(String... args) throws InterruptedException { 
     new TestExecution("A", "A").execute(); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     Thread.sleep(8000); 
     new TestExecution("A", "A").execute(); 
     new TestExecution("B", "B").execute(); 
     new TestExecution("B", "B").execute(); 
     TPE.shutdown(); 
    } 

    public TestExecution(String key1, String key2) { 
     this.key1 = key1; 
     this.key2 = key2; 
    } 

    void execute() { 
     if (TE_SET.add(this)) { 
      System.out.println("Handling " + this); 
      TPE.submit(this); 
     } else { 
      System.out.println("... ignoring duplicate " + this); 
     } 
    } 

    public boolean equals(Object obj) { 
     return obj instanceof TestExecution && 
       key1.equals(((TestExecution) obj).key1) && 
       key2.equals(((TestExecution) obj).key2); 
    } 

    public int hashCode() { 
     return key1.hashCode() * 31 + key2.hashCode(); 
    } 

    @Override 
    public Void call() throws InterruptedException { 
     if (!TE_SET.remove(this)) { 
      System.out.println("... dropping duplicate " + this); 
      return null; 
     } 
     System.out.println("Start processing " + this); 
     Thread.sleep(4000); 
     System.out.println("Finish processing " + this); 
     return null; 
    } 

    public String toString() { 
     return key1 + ':' + key2; 
    } 
} 

in

Handling A:A 
... ignoring duplicate A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
Finish processing A:A 
Finish processing B:B 
Handling A:A 
Handling B:B 
Start processing A:A 
Start processing B:B 
... ignoring duplicate B:B 
Finish processing B:B 
Finish processing A:A 
+0

OK, cảm ơn, một số con trỏ tốt ở đó, đặc biệt là tránh các vấn đề đa luồng bằng cách sử dụng ConcurrentHashMap và ghi đè phương thức toString. Một vài câu hỏi.Tại sao không sử dụng HashSet (là bởi vì không có đối tượng an toàn chủ đề tương đương để sử dụng?) Ngoài ra tôi không hiểu mã để loại bỏ khỏi HashMap. Bạn dường như làm điều này khi bắt đầu quá trình xử lý, không nên ở cuối quá trình xử lý? – Patrick

+1

Bạn có thể sử dụng 'Collections.synchronizedSet (new HashSet())' Đây là luồng an toàn, nhưng không đồng thời. Cho dù nó ở đầu hoặc cuối phụ thuộc vào yêu cầu của bạn. Tốt hơn là thỉnh thoảng nên làm gì đó hai lần hoặc thỉnh thoảng không làm gì đó (vì nhiệm vụ mới được thêm vào giữa nhiệm vụ kết thúc và bị xóa) –

+0

OK, tôi thực sự không biết sự khác biệt giữa 'thread safe' và 'concurrent', có thể Tôi có một số điều tra để làm ở đó. Nhưng để loại bỏ phần tử, nếu tôi không muốn thực hiện lại (hoặc xếp hàng) một công việc đã bắt đầu, thì TE_SET.remove sẽ được chuyển đến cuối hàm Gọi, phải không? Tôi có đúng trong giả định trường hợp "bỏ trùng lặp" là một trường hợp lỗi không? Nếu chúng ta đang ở trong hàm gọi, thì một phần tử luôn luôn phải được ghi vào quyền HashSet không? – Patrick

Các vấn đề liên quan