Processing Java tasks in parallel and sequentially
In my program, the user can trigger different tasks via an interface, and these tasks take some time to process, so they are executed by threads. So far I have implemented this with an executor that has a single thread and runs all tasks one after the other. Now I would like to parallelize things a bit.
That is, I would like to run tasks in parallel, except when they have the same path, in which case I want to run them sequentially. For example, I have 10 threads in my pool, and when a task comes in, it should be assigned to the worker that is currently processing a task with the same path. If no worker is currently processing a task with that path, the task should be processed by a currently free worker.
Additional info: A task is any kind of operation executed on a file in the local file system, for example renaming a file. Therefore each task has the attribute path. I don't want to execute two tasks on the same file at the same time, so tasks with the same path should be performed sequentially.
One of my problems is that I need a safe way to check whether a worker is currently running and to get the path of the task it is running. By safe I mean that no simultaneous-access or other threading problems can occur.
Here is my sample code, which still needs work:
public class TasksOrderingExecutor {
public interface Task extends Runnable {
//Task code here
String getPath();
}
private static class Worker implements Runnable {
private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
//some variable or mechanic to give the actual path of the running tasks??
private volatile boolean stopped;
void schedule(Task task) {
tasks.add(task);
}
void stop() {
stopped = true;
}
@Override
public void run() {
while (!stopped) {
try {
Task task = tasks.take();
task.run();
} catch (InterruptedException ie) {
// perhaps, handle somehow
}
}
}
}
private final Worker[] workers;
private final ExecutorService executorService;
/**
* @param queuesNr nr of concurrent task queues
*/
public TasksOrderingExecutor(int queuesNr) {
Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
workers = new Worker[queuesNr];
for (int i = 0; i < queuesNr; i++) {
Worker worker = new Worker();
executorService.submit(worker);
workers[i] = worker;
}
}
public void submit(Task task) {
Worker worker = getWorker(task);
worker.schedule(task);
}
public void stop() {
for (Worker w : workers) w.stop();
executorService.shutdown();
}
private Worker getWorker(Task task) {
//check here if a running worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
return workers[task.getPath() //HERE I NEED HELP//];
}
}
Solution
All you need is a hash map of actors, with the file path as the key. Different actors run in parallel, while each individual actor handles its tasks sequentially.
Your solution is wrong because the Worker class uses the blocking operation take but is executed in a limited thread pool, which may lead to thread starvation (a kind of deadlock). Actors do not block while waiting for the next message.
import org.df4j.core.dataflow.ClassicActor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.*;
public class TasksOrderingExecutor {
public static class Task implements Runnable {
private final String path;
private final String task;
public Task(String path, String task) {
this.path = path;
this.task = task;
}
//Task code here
String getPath() {
return path;
}
@Override
public void run() {
System.out.println(path+"/"+task+" started");
try {
Thread.sleep(500);
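} catch (InterruptedException e) {
// restore the interrupt flag instead of silently swallowing it
Thread.currentThread().interrupt();
}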
System.out.println(path+"/"+task+" stopped");
}
}
static class Worker extends ClassicActor<Task> {
@Override
protected void runAction(Task task) throws Throwable {
task.run();
}
}
private final ExecutorService executorService;
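// ConcurrentHashMap (covered by the java.util.concurrent.* import) keeps the lookup safe if submit() is called from several threads
private final Map<String,Worker> workers = new ConcurrentHashMap<String,Worker>(){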
@Override
public Worker get(Object key) {
return super.computeIfAbsent((String) key, (k) -> {
Worker res = new Worker();
res.setExecutor(executorService);
res.start();
return res;
});
}
};
/**
* @param queuesNr not used in this version; one actor is created per path and all actors share the common ForkJoinPool
*/
public TasksOrderingExecutor(int queuesNr) {
executorService = ForkJoinPool.commonPool();
}
public void submit(Task task) {
Worker worker = getWorker(task);
worker.onNext(task);
}
public void stop() throws InterruptedException {
for (Worker w : workers.values()) {
w.onComplete();
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.SECONDS);
}
private Worker getWorker(Task task) {
//returns the actor for this path; the overridden get() of the workers map creates and starts it on first use
return workers.get(task.getPath());
}
public static void main(String[] args) throws InterruptedException {
TasksOrderingExecutor orderingExecutor = new TasksOrderingExecutor(20);
orderingExecutor.submit(new Task("path1", "task1"));
orderingExecutor.submit(new Task("path1", "task2"));
orderingExecutor.submit(new Task("path2", "task1"));
orderingExecutor.submit(new Task("path3", "task1"));
orderingExecutor.submit(new Task("path2", "task2"));
orderingExecutor.stop();
}
}
The execution log shows that tasks with the same key are executed sequentially and tasks with different keys are executed in parallel:
path3/task1 started
path2/task1 started
path1/task1 started
path3/task1 stopped
path2/task1 stopped
path1/task1 stopped
path2/task2 started
path1/task2 started
path2/task2 stopped
path1/task2 stopped
I used my own actor library DF4J, but any other actor library can be used.
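The same idea also works without an actor library. The following is only a minimal sketch using JDK classes, not the DF4J approach above: it keeps, per path, the CompletableFuture of the last submitted task and chains each new task behind it, so no pool thread ever blocks while waiting for work. The names KeyedSerialExecutor, submit(path, task) and demoTask are hypothetical and chosen for illustration, and the pool size of 10 is taken from the question.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: runs tasks with the same path one after another,
// and tasks with different paths in parallel on the given pool.
class KeyedSerialExecutor {
    // For each path, the future of the most recently submitted task (the "tail").
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Executor pool;

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(String path, Runnable task) {
        // compute() is atomic per key, so two threads cannot chain onto the same tail.
        CompletableFuture<Void> next = tails.compute(path, (key, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.completedFuture(null) : tail;
            return previous
                    // ignore a failure of the previous task so the chain keeps going
                    .<Void>handle((result, error) -> null)
                    // dispatch to the pool only after the previous task has finished
                    .thenRunAsync(task, pool);
        });
        // Drop the map entry once this task is done, unless a newer task replaced it.
        next.whenComplete((result, error) -> tails.remove(path, next));
        return next;
    }

    private static Runnable demoTask(String name) {
        return () -> {
            System.out.println(name + " started");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println(name + " stopped");
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        KeyedSerialExecutor executor = new KeyedSerialExecutor(pool);
        CompletableFuture<?>[] all = {
            executor.submit("path1", demoTask("path1/task1")),
            executor.submit("path1", demoTask("path1/task2")),
            executor.submit("path2", demoTask("path2/task1")),
            executor.submit("path3", demoTask("path3/task1")),
            executor.submit("path2", demoTask("path2/task2"))
        };
        CompletableFuture.allOf(all).join(); // wait for every task
        pool.shutdown();
    }
}
```

Within one path, tasks run in submission order. Across paths, the interleaving depends on thread scheduling. The returned future also reports an exception thrown by the task, which the plain Runnable version cannot do.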