创建数据库连接并维护多个进程(多处理)
问题描述
与我发布的另一篇文章类似,这篇文章回答了该帖子并创建了一个新问题.
Similar to another post I made, this answers that post and creates a new question.
回顾:我需要更新空间数据库中的每条记录,其中我有一个点数据集覆盖多边形数据集.对于每个点要素,我想分配一个键以将其与它所在的多边形要素相关联.因此,如果我的点纽约市"位于多边形 USA 内,并且对于美国多边形GID = 1",我将为我的点纽约市分配gid_fkey = 1".
Recap: I need to update every record in a spatial database in which I have a data set of points that overlay data set of polygons. For each point feature I want to assign a key to relate it to the polygon feature that it lies within. So if my point 'New York City' lies within polygon USA and for the USA polygon 'GID = 1' I will assign 'gid_fkey = 1' for my point New York City.
好的,这是使用多处理实现的.我注意到使用它的速度提高了 150%,所以它确实有效.但我认为有一堆不必要的开销,因为每条记录都需要一个数据库连接.
Okay so this has been achieved using multiprocessing. I have noticed a 150% increase in speed using this so it does work. But I think there is a bunch of unecessary overhead as one DB connection is required for each record.
代码如下:
import multiprocessing, time, psycopg2
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
self.task_queue.task_done()
break
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a):
self.a = a
def __call__(self):
pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConn.set_isolation_level(0)
pyCursor1 = pyConn.cursor()
procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
pyCursor1.execute(procQuery)
print 'What is self?'
print self.a
return self.a
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
num_consumers = multiprocessing.cpu_count() * 2
consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
for w in consumers:
w.start()
pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
pyConnX.set_isolation_level(0)
pyCursorX = pyConnX.cursor()
pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')
temp = pyCursorX.fetchall()
num_job = temp[0]
num_jobs = num_job[0]
pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
cityIdListTuple = pyCursorX.fetchall()
cityIdListList = []
for x in cityIdListTuple:
cityIdList.append(x[0])
for i in xrange(num_jobs):
tasks.put(Task(cityIdList[i - 1]))
for i in xrange(num_consumers):
tasks.put(None)
while num_jobs:
result = results.get()
print result
num_jobs -= 1
每个连接看起来在 0.3 到 1.5 秒之间,因为我使用时间"模块进行了测量.
It looks to be between 0.3 and 1.5 seconds per connection as I have measure it with 'time' module.
有没有办法为每个进程建立一个数据库连接,然后只使用 city_id 信息作为变量,我可以在这个打开的游标查询中输入它?这样我就可以说四个进程,每个进程都有一个数据库连接,然后以某种方式将我的 city_id 放入其中进行处理.
Is there a way to make a DB connection per process and then just use the city_id info as a variable that I can feed into a query for the cursor in this open? This way I make say four processes each with a DB connection and then drop me city_id in somehow to process.
解决方案
尝试在 Consumer 构造函数中隔离你的连接创建,然后把它交给执行的 Task:
Try to isolate the creation of your connection in the Consumer constructor, then give it to the executed Task :
import multiprocessing, time, psycopg2
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
self.pyConn.set_isolation_level(0)
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
print 'Tasks Complete'
self.task_queue.task_done()
break
answer = next_task(connection=self.pyConn)
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a):
self.a = a
def __call__(self, connection=None):
pyConn = connection
pyCursor1 = pyConn.cursor()
procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)
pyCursor1.execute(procQuery)
print 'What is self?'
print self.a
return self.a
def __str__(self):
return 'ARC'
def run(self):
print 'IN'
相关文章