Java ReentrantLock 和条件 |生产者完成工作,消费者陷入困境
一般信息:三个读取器线程以块的形式从文件中随机读取,其中每个块都有一个 ID,并且它们写入普通的 ArrayList.只要将具有所需 ID 的块添加到列表中,写入器线程就会写入输出文件.
General Information: Three reader-threads read randomly from a file in chunks where each chunk has an ID and they write to a normal ArrayList. A writer-thread writes to an outputfile as soon as a chunk with the needed ID is added to the list.
出于这个原因,我编写了一个 BlockingChunkList,它应该同步 add() 和 getNextChunk() 方法.
For that reason I have written a BlockingChunkList which should synchronize the add() and getNextChunk() methods.
在一种情况下它适用于同步 + 通知 + notifyAll 和另一种情况下的同步列表.
It works for me with synchronized + notify + notifyAll in one case and with a synchronized list in another.
当我使用 ReentrantLock 和 Condition 时,我无法做到这一点.writer-thread 只写了四个块,然后就卡住了.
I don't manage to do it when I use ReentrantLock and Condition. The writer-thread only writes four chunks and then he gets stuck.
为什么它可能不起作用:我怀疑一旦读者完成,作者就无法取回锁定.但是我希望每次有东西要写(available = true)时,都应该调用写线程.当 available 为 true 时,他们似乎会忽略 hasAccess.await().
Why it might not work: I have the suspicion that once the readers are done the writer doesn't get the lock back. However i would expect that everytime when there is something to write (available=true) then the writer thread should be called. They seem to ignore hasAccess.await() when available is true.
应该如何工作:读取线程仅调用 add 方法,并且仅在有要写入的内容(可用)时才释放写入线程.当 available=true 时,它们也会阻止自己.当作者通过调用 hasAccess.signalAll() 写了一些东西时,这个锁就会被释放写入线程只调用 getNextChunk() 方法,并在写入块时释放其他线程.当 available=false 时他会屏蔽自己,并被读者释放.
How it should work: The reading threads only call the add method and they release the writing thread only when there is something to write (available). They also block themselves when available=true. This lock is released when the writer has written something by calling hasAccess.signalAll() The writing thread only calls the getNextChunk() method and he releases the other threads when he wrote the chunk. He blocks himself when available=false and he is released by the readers.
问题:读取线程完成它们的工作,写入线程只写入前 4 个块.我希望编写器总是在 available=true 时被调用.
Question: The reading threads finish their work and the writing thread only writes the first 4 chunks. I expect that the writer is always called when available=true.
我不需要一个确切的解决方案,因为我认为我遗漏了一些东西,所以也很感谢你的提示.那么:我错过了什么?
I don't need an exact solution, a hint is appreciated as well since I think I am missing something. So: What am I missing ?
谢谢
并发仅在发布的类中处理.主要方法仅启动踏板.编辑 2:这是我第一次尝试并发.我知道 ArrayList 不是线程安全的.我想通过使用 ReentrantLock 和 Condition 来理解这些概念.BASIC 的想法是阻止读取器或写入器,无论可用是真还是假.
Concurrency is handeled only in the posted class. The main-method only starts the treads. EDIT 2: This is one of my first shots at concurrency. I know that ArrayList is not thread safe. I would like to make it so by using ReentrantLock and Condition in order to understand the concepts. The BASIC idea is to block either reader or writer whether available is true or false.
import java.util.ArrayList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BlockingChunkQueue {
private final ArrayList<Chunk> chunks;
private volatile int currentID = 0; // first ID for which the writer needs to wait
private volatile boolean available; // true if the ID is in the list
private final Lock listLock;
private final Condition hasAccess;
public BlockingChunkQueue(){
chunks = new ArrayList<>();
listLock = new ReentrantLock();
hasAccess = listLock.newCondition();
available = false;
}
/*
* While there is a chunk which might be written to the output (=correct ID) then wait for access.
* Then add the chunk and if the added chunk has the ID for which the other thread is waiting, then,
* then available = true and signal the waiting thread.
*
*
*/
public void add(Chunk chunk) throws InterruptedException{
listLock.lock();
try{
while(available){
hasAccess.await(); // reader block yourself until you get the lock back
}
chunks.add(chunk);
if(chunk.getId() == currentID){
available = true;
hasAccess.signalAll();
}
}finally{
listLock.unlock();
}
}
/*
* If the chunk is not available then wait.
* If it becomes available then increment the ID, remove it from the list, signal the reader-threads and set available false.
* return the chunk which will be written to the output.
*/
public Chunk getNextChunk() throws InterruptedException {
listLock.lock();
try{
while(!available){
hasAccess.await(); // block yourself until u can write something
}
for(Chunk c : chunks){
if(c.getId() == currentID){
currentID++;
chunks.remove(c);
hasAccess.signalAll(); //signal the readers
available = false;
return c;
}
}
}finally{
listLock.unlock();
}
return null;
}
public int getcurrentID(){ return currentID;}
public boolean isEmpty(){ return chunks.isEmpty(); }
public int size(){return chunks.size();}
}
解决方案:处理线程没有任何问题.事实证明这是我这边的一个逻辑错误.写入线程卡住了,因为他没有机会检查必要的 ID,因为写入者随机读取块.感谢您提供有用的答案.
SOLUTION: There was nothing wrong with handling the threads. It turned out to be a logical error from my side. The writing thread gets stuck because he doesn't get the chance to check for the necessary IDs because the writers read the chunks randomly. Thanks for the helpfull answer.
推荐答案
这里有几个问题.
volatile 变量 available
仅在 lock
被持有时被读取或改变的目的是什么?
What is the purpose of the volatile variable available
which is only read or mutated while the lock
is held?
isEmtpy
和 size
方法调用 chunks
上的方法,而不持有 lock
.ArrayList
不是线程安全的.这些调用的行为无法预测.
The isEmtpy
and size
methods call methods on chunks
without holding the lock
. ArrayList
is not thread-safe. The behavior of these calls cannot be predicted.
您可能会卡住的一个原因是如果在调用 getNextChunk 之前添加了多个块.
A reason you might get stuck is if multiple chunks get added before getNextChunk is called.
在您的循环中查找您设置为 false 的当前",但它实际上可能已经在列表中:
In your loop to find the "current" you set available to false, but it may actually already be in the list:
for(Chunk c : chunks){
if(c.getId() == currentID){
currentID++;
chunks.remove(c);
hasAccess.signalAll(); //signal the readers
available = false;
return c;
}
}
考虑将块存储在 Map
中,以便通过标识符轻松查看块是否存在.
Consider storing your chunks in a Map<Integer,Chunk>
so that can easily see if a chunk is present by the identifier.
相关文章