Tôi muốn thực hiện một số tác vụ khác nhau song song, nhưng có khái niệm rằng nếu tác vụ đã được xếp hàng hoặc đang xử lý, nó sẽ không được xếp hàng đợi. Tôi đã đọc một chút về Java API và đã đưa ra mã dưới đây, mà dường như làm việc. Ai cũng có thể làm sáng tỏ liệu phương pháp tôi đang sử dụng có phải là cách tiếp cận tốt nhất hay không. Bất kỳ mối nguy hiểm (an toàn thread?) Hoặc cách tốt hơn để làm điều này? Mã là như sau:Chủ đề Xử lý hồ sơ 'trùng lặp' nhiệm vụ
import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestExecution implements Runnable {
String key1;
String key2;
static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);
public static void main(String[] args) {
try {
execute(new TestExecution("A", "A"));
execute(new TestExecution("A", "A"));
execute(new TestExecution("B", "B"));
Thread.sleep(8000);
execute(new TestExecution("B", "B"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static boolean execute(TestExecution e) {
System.out.println("Handling "+e.key1+":"+e.key2);
if (executions.containsKey(e)) {
Future<?> f = (Future<?>) executions.get(e);
if (f.isDone()) {
System.out.println("Previous execution has completed");
executions.remove(e);
} else {
System.out.println("Previous execution still running");
return false;
}
}
else {
System.out.println("No previous execution");
}
Future<?> f = tpe.submit(e);
executions.put(e, f);
return true;
}
public TestExecution(String key1, String key2) {
this.key1 = key1;
this.key2 = key2;
}
public boolean equals(Object obj)
{
if (obj instanceof TestExecution)
{
TestExecution t = (TestExecution) obj;
return (key1.equals(t.key1) && key2.equals(t.key2));
}
return false;
}
public int hashCode()
{
return key1.hashCode()+key2.hashCode();
}
public void run() {
try {
System.out.println("Start processing "+key1+":"+key2);
Thread.sleep(4000);
System.out.println("Finish processing "+key1+":"+key2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Theo dõi bình luận dưới đây:
Kế hoạch này là kích hoạt các nhiệm vụ để thực hiện sẽ được xử lý bởi cron gọi dịch vụ web RESTful. Ví dụ dưới đây là thiết lập cho một nhiệm vụ được kích hoạt lúc 9:30 mỗi ngày, cộng với một nhiệm vụ được lên lịch sau mỗi hai phút.
0/2 * * * * restclient.pl key11 key12
30 09 * * * restclient.pl key21 key22
Trong trường hợp này, nếu khóa công việc11: key12 đang chạy hoặc đã xếp hàng để chạy, tôi không muốn xếp hàng một phiên bản khác. Tôi hiểu rằng chúng tôi có các tùy chọn khác để lên lịch, tuy nhiên chúng tôi có xu hướng sử dụng cron cho các tác vụ khác, vì vậy tôi muốn cố gắng giữ lại điều này.
Cập nhật thứ hai. Đáp lại các bình luận cho đến nay tôi đã viết lại mã, bạn có thể bình luận về bất kỳ vấn đề nào với giải pháp được cập nhật sau đây không?
import java.util.concurrent.LinkedBlockingQueue;
public class TestExecution implements Runnable {
String key1;
String key2;
static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());
public static void main(String[] args) {
try {
tpe.execute(new TestExecution("A", "A"));
tpe.execute(new TestExecution("A", "A"));
tpe.execute(new TestExecution("B", "B"));
Thread.sleep(8000);
tpe.execute(new TestExecution("B", "B"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public TestExecution(String key1, String key2) {
this.key1 = key1;
this.key2 = key2;
}
public boolean equals(Object obj)
{
if (obj instanceof TestExecution)
{
TestExecution t = (TestExecution) obj;
return (key1.equals(t.key1) && key2.equals(t.key2));
}
return false;
}
public int hashCode()
{
return key1.hashCode()+key2.hashCode();
}
public void run() {
try {
System.out.println("Start processing "+key1+":"+key2);
Thread.sleep(4000);
System.out.println("Finish processing "+key1+":"+key2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestThreadPoolExecutor extends ThreadPoolExecutor {
Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());
public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {
super(2, 5, 1, TimeUnit.MINUTES, q);
}
public void execute(Runnable command) {
if (executions.contains(command)) {
System.out.println("Previous execution still running");
return;
}
else {
System.out.println("No previous execution");
}
super.execute(command);
executions.add(command);
}
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
executions.remove(r);
}
}
Tại sao không sử dụng một HashSet cho TestExecution thay vì HashMap ?? –