Python threading with a thread-safe queue producer-consumer
Problem description
I am using threading and Queue to fetch urls and store them to a database.
I want only one thread to do the storing job.
So I wrote the code below:
import threading
import time
import Queue

site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array = []


class FetchThread(threading.Thread):
    def __init__(self, site_queue, proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array

    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()

    def get_proxy_one_website(self, index):
        print '{0} fetched site :{1}\n'.format(self.name, index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0} to database\n'.format(proxy_array.pop())
            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())
        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue


def start_crawl():
    global site_count, fetch_thread_count, site_queue, proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue, proxy_array)
        ft.setDaemon(True)
        ft.start()
    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)
    save()
    print 'start site_queue join'
    site_queue.join()
    print 'finish'

start_crawl()
Execution output:
init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
Why does the save() function only run after site_queue.join(), which is written after save()?
I also substituted save() with a thread function, but it doesn't work either.
Does this mean I must change proxy_array = [] to proxy_queue = Queue.Queue() before I can use threading to store the data?
I want only one thread to do this, and no other thread would get data from proxy_array, so why should I need to join it? Using a Queue seems very weird.
Is there any better solution?
UPDATE:
I don't want to wait until all the FetchThreads complete their work. I want to save data while fetching; that would be much faster.
I want the result to be something like below (because I use array.pop(), 'save 0' may appear much later; this is just an example for easy understanding):
Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
UPDATE2 for anyone who has the same questions as below:
question:
As I said in the context above, no other thread would get data from proxy_array.
I just cannot see why it would break thread-safety?
answer:
The producer-consumer problem is covered in misha's answer; I understood it after reading it carefully.
question:
One more question: can the program's main thread act as the consumer alongside the FetchThreads (in other words, without creating a StoreThread)?
This is what I cannot figure out; I will update after finding the answer.
Solution
I recommend you read about the producer-consumer problem. Your producers are the fetch threads. Your consumer is the save
function. If I understand correctly, you want the consumer to save each fetched result as soon as it's available. For this to work, the producers and consumer must be able to communicate in some thread-safe way (e.g. a queue).
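A minimal sketch of such thread-safe communication, written in Python 3 syntax (where the Queue module was renamed queue); using a sentinel value is one common way to tell the consumer that production is finished:

```python
import queue
import threading

q = queue.Queue()
results = []

def producer():
    for i in range(5):
        q.put(i)      # put/get are thread-safe: no manual locking needed
    q.put(None)       # sentinel value signalling "no more data"

def consumer():
    while True:
        item = q.get()
        if item is None:   # sentinel received: producer is finished
            break
        results.append(item)

p = threading.Thread(target=producer)
c = threading.Thread(target=consumer)
p.start()
c.start()
p.join()
c.join()
print(results)  # [0, 1, 2, 3, 4]
```

Because a single producer feeds a single consumer through a FIFO queue, the consumer sees the items in the exact order they were produced.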
Basically, you need another queue. It would replace proxy_array
. Your save
function will look something like this:
while True:
    try:
        data = fetch_data_from_output_queue()
        save_to_database(data)
    except EmptyQueue:
        if stop_flag.is_set():
            # All done
            break
        time.sleep(1)
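In concrete Python 3 terms, that loop might look like the sketch below; queue.Empty plays the role of EmptyQueue, and save_to_database is a stand-in for your real database call, so those names are placeholders rather than a fixed API:

```python
import queue
import threading
import time

output_queue = queue.Queue()
stop_flag = threading.Event()
saved = []  # stands in for the real database

def save_to_database(data):
    saved.append(data)

def save():
    while True:
        try:
            data = output_queue.get_nowait()
            save_to_database(data)
        except queue.Empty:
            if stop_flag.is_set():
                break          # producers finished and queue drained
            time.sleep(1)      # wait for producers to catch up

saver = threading.Thread(target=save)
saver.start()
for i in range(5):
    output_queue.put(i)
stop_flag.set()   # normally set only after joining the fetch threads
saver.join()
print(saved)  # [0, 1, 2, 3, 4]
```

Note that the loop only exits when the queue is empty AND the flag is set, so everything still in the queue is drained before the thread stops.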
This save
function will need to run in its own thread. stop_flag
is an Event that gets set after you join your fetch threads.
From a high level, your application will look like this:
input_queue = initialize_input_queue()
output_queue = initialize_output_queue()
stop_flag = Event()
create_and_start_save_thread(output_queue) # read from output queue, save to DB
create_and_start_fetch_threads(input_queue, output_queue) # get sites to crawl from input queue, push crawled results to output_queue
join_fetch_threads() # this will block until the fetch threads have gone through everything in the input_queue
stop_flag.set() # this will inform the save thread that we are done
join_save_thread() # wait for all the saving to complete
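Putting the high-level outline together, here is a runnable sketch of the whole application in Python 3, with a trivial crawl stub in place of real fetching (crawl and the module-level constants are illustrative names, not part of the original code):

```python
import queue
import threading
import time

SITE_COUNT = 10
FETCH_THREAD_COUNT = 2

input_queue = queue.Queue()
output_queue = queue.Queue()
stop_flag = threading.Event()
database = []  # stands in for the real database

def crawl(site):
    return 'proxy for site {0}'.format(site)  # placeholder for real fetching

def fetch():
    # producer: take sites from input_queue, push results to output_queue
    while True:
        try:
            site = input_queue.get_nowait()
        except queue.Empty:
            break  # input exhausted, this producer is done
        output_queue.put(crawl(site))

def save():
    # consumer: drain output_queue into the database while fetching runs
    while True:
        try:
            database.append(output_queue.get_nowait())
        except queue.Empty:
            if stop_flag.is_set():
                break
            time.sleep(0.1)

for i in range(SITE_COUNT):
    input_queue.put(i)

save_thread = threading.Thread(target=save)
save_thread.start()
fetchers = [threading.Thread(target=fetch) for _ in range(FETCH_THREAD_COUNT)]
for t in fetchers:
    t.start()
for t in fetchers:
    t.join()          # block until the fetch threads have processed everything
stop_flag.set()       # inform the save thread that we are done producing
save_thread.join()    # wait for all the saving to complete
print(len(database))  # 10
```

Because the save thread runs concurrently with the fetchers, results are stored as soon as they appear in output_queue, which is exactly the interleaved behavior the UPDATE asks for.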