ArcPy+多处理:错误:无法保存栅格数据集

2022-04-10 00:00:00 python python-multiprocessing arcpy

问题描述

我正在使用arcpy对栅格进行一些数字运算,希望使用multiprocessing包来加快速度。基本上,我需要遍历元组列表,使用每个元组进行一些栅格计算,并将一些输出写入文件。我的输入由一个数据栅格(水深测量)、一个定义区域的栅格和一个由两个浮点(水面高程、深度)组成的元组组成。我的过程由一个函数computeplane组成,该函数获取一个元组并运行一系列栅格计算以生成五个栅格(总栅格、滨海栅格、表层栅格、次表层栅格和深栅格栅格),然后为每个栅格调用processtable函数以使用arcpy.sa.ZonalStatisticsAsTable将值写入DBF,使用arcpy.AddField_management添加一些字段,使用arcpy.TableToTable_conversion将DBF转换为CSV,最后使用arcpy.Delete_management删除DBF文件。

基于一些other SO posts,我将我的代码包装在main()中,以便multiprocessing.Pool可以很好地发挥作用。我使用main()创建元组集,使用pool.map进行多处理。我使用tempfile包为DBF文件选择名称以避免名称冲突;CSV文件名保证不会与其他线程冲突。

我已经使用for循环测试了我的代码,它工作得很好,但当我尝试使用pool.map时,我收到

运行时错误:错误010240:无法将栅格数据集保存到 C:UsersMichaelAppDataLocalESRIDesktop10.4SpatialAnalystlesst_ras 使用输出格式网格。

这里发生了什么?这个错误没有出现在代码的非多进程版本中,我也不会写出任何地方的栅格-话又说回来,我不知道arcpy如何处理中间栅格(我肯定不认为它会让它们留在内存中-它们太大了)。我需要告诉arcpy它处理栅格计算的方式才能使多处理工作吗?我已在下面包含了我的python文件。

import arcpy
arcpy.CheckOutExtension("Spatial")
import arcpy.sa
import numpy
import multiprocessing
import tempfile
bathymetry_path = r'C:/GIS workspace/RRE/habitat.gdb/bathymetry_NGVD_meters'
zones_path = r'C:/GIS workspace/RRE/habitat.gdb/markerzones_meters'
table_folder = r'C:/GIS workspace/RRE/zonetables'
bathymetry = arcpy.sa.Raster(bathymetry_path)
zones = arcpy.sa.Raster(zones_path)

def processtable(raster_obj, zone_file, w, z, out_folder, out_file):
  temp_name = "/" + next(tempfile._get_candidate_names()) + ".dbf"
  arcpy.sa.ZonalStatisticsAsTable(zone_file, 'VALUE', raster_obj, out_folder + temp_name, "DATA", "SUM")
  arcpy.AddField_management(out_folder + temp_name, "wse", 'TEXT')
  arcpy.AddField_management(out_folder + temp_name, "z", 'TEXT')
  arcpy.CalculateField_management(out_folder + temp_name, "wse", "'" + str(w) + "'", "PYTHON")
  arcpy.CalculateField_management(out_folder + temp_name, "z", "'" + str(z) + "'", "PYTHON")
  arcpy.TableToTable_conversion(out_folder + temp_name, out_folder, out_file)
  arcpy.Delete_management(out_folder + temp_name)

def computeplane(wsedepth):
  wse = wsedepth[0]
  depth = wsedepth[1]
  total = bathymetry < depth
  littoral = total & ((wse - bathymetry) < 2)
  surface = total & ~(littoral) & ((total + wse - depth) < (total + 2))
  profundal = total & ((total + wse - depth) > (total + 5))
  subsurface = total & ~(profundal | surface | littoral)
  # zonal statistics table names
  total_name = 'total_w' + str(wse) + '_z' + str(depth) + '.csv'
  littoral_name = 'littoral_w' + str(wse) + '_z' + str(depth) + '.csv'
  surface_name = 'surface_w' + str(wse) + '_z' + str(depth) + '.csv'
  subsurface_name = 'subsurface_w' + str(wse) + '_z' + str(depth) + '.csv'
  profundal_name = 'profundal_w' + str(wse) + '_z' + str(depth) + '.csv'
  # compute zonal statistics
  processtable(total, zones, wse, depth, table_folder, total_name)
  processtable(littoral, zones, wse, depth, table_folder, littoral_name)
  processtable(surface, zones, wse, depth, table_folder, surface_name)
  processtable(profundal, zones, wse, depth, table_folder, profundal_name)
  processtable(subsurface, zones, wse, depth, table_folder, subsurface_name)

def main():
  watersurface = numpy.arange(-15.8, 2.7, 0.1)
  # take small subset of the tuples: watersurface[33:34]  
  wsedepths = [(watersurface[x], watersurface[y]) for x in range(watersurface.size)[33:34] for y in range(watersurface[0:x+1].size)]
  pool = multiprocessing.Pool()
  pool.map(computeplane, wsedepths)
  pool.close()
  pool.join()

if __name__ == '__main__':
  main()

更新

更多的调查表明,这与其说是multiprocessing的问题,不如说是ArcGIS处理栅格的方式的问题。栅格代数结果被写入到默认工作空间中的文件中;在我的例子中,我没有指定文件夹,因此arcpy将栅格写入到某个AppData文件夹。ArcGIS根据您的代数表达式使用诸如Lessth、Lessth_1等基本名称。因为我没有指定工作区,所以所有multiprocessing线程都写入此文件夹。虽然单个arcpy进程可以跟踪名称,但多个进程都试图写入相同的栅格名称并撞上其他进程的锁。

我尝试在computeplane开头创建一个随机工作区(文件gdb),然后在结尾将其删除,但arcpy通常不会及时释放其锁定,并且进程在DELETE语句上崩溃。所以我不确定如何继续。


解决方案

我也花了很大力气来制作一个可以工作的脚本。

由于您的数据与我习惯的数据略有不同,我无法测试答案,也不知道它是否有效,但可以说一些东西来打开您的思维,使您能够进行多进程处理,并提出一个新的脚本供您测试。

我看到有些东西可以并行化,有些东西不能并行化,或者从多进程中几乎得不到任何好处。

使用多进程和arcpy的好做法是不使用地理数据库,尝试写入实际的‘.tif’文件或‘in_Memory/Tempfile’,然后在计算后删除,在要并行化的函数内部。不要一开始就为空间而烦恼,减少要测试的数据,当它奏效时,通过删除临时文件来改善它,在中间放一些有用的打印。

回到问题上,脚本中有一些东西是不能使用的,比如传递一个arcpy.Raster对象,而不是传递给栅格的路径,并在函数中读取它。栅格的路径是一个字符串,因此它是可拾取的,这意味着它们可以轻松地传递到一个新进程,必须特别注意这一点。

另一件事是声明一个类似于‘TABLE_FOLDER’的变量,并期望您的所有7核内核都返回到第一个内核中的原始进程,以询问它的路径。 多进程之间不容易共享变量,对于原始代码,您必须创建一个独立运行的函数。主模块向空闲的CPU发送的内容是创建一个新的Python进程,其中几乎只有函数声明、传递给它的参数和必要的导入模块,所有这些都准备好执行,它不能回头询问。

对我起作用的还是在一个非常切分的函数(不是我创建的)中使用apply_async,该函数使用进程作为键,apply_async的结果作为值来创建字典,然后将其放入try/except中,以便如果一个进程发生错误,它不会停止主进程的执行。 大概是这样的:

def computeplane_multi(processList):
    pool = Pool(processes=4, maxtasksperchild=10)
    jobs= {}
    for item in processList:
        jobs[item[0]] = pool.apply_async(computeplane, [x for x in item])
    for item,result in jobs.items():
        try:
            result = result.get()
        except Exception as e:
            print(e)
    pool.close()
    pool.join()

回到您的代码,我对您的尝试(并告诉我)做了一些修改:

import arcpy
import numpy
import multiprocessing
import tempfile

bathymetry_path = r'C:/GIS workspace/RRE/habitat.gdb/bathymetry_NGVD_meters'
zones_path = r'C:/GIS workspace/RRE/habitat.gdb/markerzones_meters'
table_folder = r'C:/GIS workspace/RRE/zonetables'

def computeplane(bathymetry_path,zones_path,wsedepth):
    def processtable(raster_obj, zones_path, w, z, out_folder, out_file):
        zone_file = arcpy.sa.Raster(zones_path)
        temp_name = "/" + next(tempfile._get_candidate_names()) + ".dbf"
        arcpy.sa.ZonalStatisticsAsTable(zone_file, 'VALUE', raster_obj, out_folder + temp_name, "DATA", "SUM")
        arcpy.AddField_management(out_folder + temp_name, "wse", 'TEXT')
        arcpy.AddField_management(out_folder + temp_name, "z", 'TEXT')
        arcpy.CalculateField_management(out_folder + temp_name, "wse", "'" + str(w) + "'", "PYTHON")
        arcpy.CalculateField_management(out_folder + temp_name, "z", "'" + str(z) + "'", "PYTHON")
        arcpy.TableToTable_conversion(out_folder + temp_name, out_folder, out_file)
        arcpy.Delete_management(out_folder + temp_name)
    bathymetry = arcpy.sa.Raster(bathymetry_path)
    wse = wsedepth[0]
    depth = wsedepth[1]
    total = bathymetry < depth
    littoral = total & ((wse - bathymetry) < 2)
    surface = total & ~(littoral) & ((total + wse - depth) < (total + 2))
    profundal = total & ((total + wse - depth) > (total + 5))
    subsurface = total & ~(profundal | surface | littoral)
    # zonal statistics table names
    total_name = 'total_w' + str(wse) + '_z' + str(depth) + '.csv'
    littoral_name = 'littoral_w' + str(wse) + '_z' + str(depth) + '.csv'
    surface_name = 'surface_w' + str(wse) + '_z' + str(depth) + '.csv'
    subsurface_name = 'subsurface_w' + str(wse) + '_z' + str(depth) + '.csv'
    profundal_name = 'profundal_w' + str(wse) + '_z' + str(depth) + '.csv'
    # compute zonal statistics
    processtable(total, zones_path, wse, depth, table_folder, total_name)
    processtable(littoral, zones_path, wse, depth, table_folder, littoral_name)
    processtable(surface, zones_path, wse, depth, table_folder, surface_name)
    processtable(profundal, zones_path, wse, depth, table_folder, profundal_name)
    processtable(subsurface, zones_path, wse, depth, table_folder, subsurface_name)
    print('point processed : {},{}'.format(wsedepth[0],wsedepth[1]))


def computeplane_multi(processList):
    pool = Pool(processes=4, maxtasksperchild=10)
    jobs= {}
    for item in processList:
        jobs[item[0]] = pool.apply_async(computeplane, [x for x in item])
    for item,result in jobs.items():
        try:
            result = result.get()
        except Exception as e:
            print(e)
    pool.close()
    pool.join()


def main():
    watersurface = numpy.arange(-15.8, 2.7, 0.1)
    # take small subset of the tuples: watersurface[33:34]  
    wsedepths = [(watersurface[x], watersurface[y]) for x in range(watersurface.size)[33:34] for y in range(watersurface[0:x+1].size)]
    processList = []
    for i in wsedepths:
        processList.append((bathymetry_path,zones_path,i))
    computeplane_multi(processList)

if __name__ == '__main__':
    main()

相关文章