
I am relatively new to Python and thought I would give multiprocessing a stab. I have a script that runs well in IDLE or as an ArcMap toolbox script. After perusing these forums and docs.python, I made an attempt at incorporating my working script into a multiprocessing one. However similar the working examples on this forum are, none address data processing the way I would like to. I hope it is feasible.

Basically, the script moves through a list of elevation rasters (ERDAS IMG format), extracting cells below a threshold, and finally merging them together. I am currently running the script in the command prompt, as everything else opens new windows, or crashes in an attempt to. The script gives the illusion that it works fine, except that it seems to move on to the final merge without waiting for the workers to finish completely.

I have looked at several examples, and few seem to have more than a couple of processes in the worker function, none of which are arcpy geoprocesses. So I guess my questions are essentially:

  1. Should I be using something other than pool.apply_async, such as pool.map or pool.apply?

  2. Am I properly returning the path of the final polygon to the resultList?
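To illustrate what I am weighing in question 1, here is a minimal arcpy-free sketch of the two patterns (the `worker` function is a hypothetical stand-in for my geoprocessing chain):

```python
import multiprocessing

def worker(tile):
    # Stand-in for the real geoprocessing chain; just fabricates an output path
    return "ABL_" + tile

if __name__ == "__main__":
    tiles = ["01.img", "02.img", "03.img"]
    pool = multiprocessing.Pool(2)

    # Option 1: pool.map blocks until every job has finished and
    # returns the results in input order
    mapped = pool.map(worker, tiles)

    # Option 2: pool.apply_async returns immediately; results arrive
    # later via AsyncResult objects (or via a callback)
    jobs = [pool.apply_async(worker, (t,)) for t in tiles]
    pool.close()
    pool.join()
    asynced = [job.get() for job in jobs]

    # Both approaches produce the same list here; they differ in
    # when the main process blocks, not in what comes back
    print(mapped)
    print(asynced)
```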

    # Import modules
    import arcpy, os, math
    from arcpy import env
    from arcpy.sa import *
    import multiprocessing
    import time

    # Check out licenses
    arcpy.CheckOutExtension("spatial")

    # Define functions
    def worker_bee(inputRaster, scratch, addNum):
        (path, lName) = os.path.split(inputRaster)
        (sName, ext) = os.path.splitext(lName)
        nameParts = sName.split("_")
        nameNumber = nameParts[-1]

        # Create scratch subfolder if it does not exist
        subFolder = scratch + "\\" + nameNumber + "_output"
        if not os.path.exists(subFolder):
            os.makedirs(subFolder)

        # Set workspace to subfolder
        arcpy.env.workspace = subFolder
        arcpy.env.overwriteOutput = True
        arcpy.env.extent = "MAXOF"

        # Local variables
        Expression = "Shape_Area >= 100"
        poly1 = subFolder + "\\poly1.shp"
        poly2 = subFolder + "\\poly2.shp"
        poly3 = subFolder + "\\poly3.shp"
        poly4 = subFolder + "\\poly4.shp"
        poly5 = subFolder + "\\poly5.shp"
        poly6 = subFolder + "\\poly6.shp"
        poly7 = subFolder + "\\poly7.shp"
        outName = scratch + "\\ABL_" + nameNumber + ".shp"

        # Perform calculations

        # Map Algebra (replace -9999 with 9999)
        inRasterCon = Con(inputRaster, 9999, inputRaster, "Value = -9999")

        # Filter DEM to smooth out low outliers
        filterOut = Filter(inRasterCon, "LOW", "DATA")

        # Determine raster MINIMUM value and calculate threshold
        filterMinResult = arcpy.GetRasterProperties_management(filterOut, "MINIMUM")
        filterMin = filterMinResult.getOutput(0)
        threshold = (float(filterMin) + float(addNum))

        # Map Algebra (values under threshold)
        outCon = Con(filterOut <= threshold, 1, "")
        arcpy.RasterToPolygon_conversion(outCon, poly1, "SIMPLIFY", "Value")

        # Dissolve parts
        arcpy.Dissolve_management(poly1, poly2, "", "", "SINGLE_PART", "DISSOLVE_LINES")

        # Select parts larger than 100 sq m
        arcpy.Select_analysis(poly2, poly3, Expression)

        # Eliminate polygon part (input corrected to poly3; the original read
        # from poly4, which is never written, so poly4 goes unused)
        arcpy.EliminatePolygonPart_management(poly3, poly5, "PERCENT", "0 SquareMeters", "10", "CONTAINED_ONLY")

        # Select parts larger than 100 sq m
        arcpy.Select_analysis(poly5, poly6, Expression)

        # Simplify polygon
        arcpy.SimplifyPolygon_cartography(poly6, poly7, "BEND_SIMPLIFY", "3 Meters", "3000 SquareMeters", "RESOLVE_ERRORS", "KEEP_COLLAPSED_POINTS")

        # Smooth polygon
        outShape = arcpy.SmoothPolygon_cartography(poly7, outName, "PAEK", "3 Meters", "FIXED_ENDPOINT", "FLAG_ERRORS").getOutput(0)

        # Calculations complete; delete scratch subfolder
        arcpy.Delete_management(subFolder)

        print("Completed " + outShape + "...")
        return outShape

    resultList = []
    def log_result(result):
        resultList.append(result)

    if __name__ == "__main__":
        arcpy.env.overwriteOutput = True

        # Read in parameters
        inFolder = raw_input("Input Folder: ")  # arcpy.GetParameterAsText(0)
        addElev = raw_input("Number of elevation units to add to minimum: ")

        # Create scratch folder workspace
        scratchFolder = inFolder + "\\scratch"
        if not os.path.exists(scratchFolder):
            os.makedirs(scratchFolder)

        # Local variables
        dec_num = str(float(addElev) - int(float(addElev)))[1:]
        outNameNum = dec_num.replace(".", "")
        outMerge = inFolder + "\\ABL_" + outNameNum + ".shp"

        # Print core usage
        cores = multiprocessing.cpu_count()
        print("Using " + str(cores) + " cores...")

        # Start timing
        start = time.clock()

        # List input tiles
        arcpy.env.workspace = inFolder
        inTiles = arcpy.ListRasters("*", "IMG")
        tileList = []
        for tile in inTiles:
            tileList.append(inFolder + "\\" + tile)

        # Create a pool of subprocesses
        pool = multiprocessing.Pool(cores)

        print("Adding jobs to multiprocessing pool...")
        for tile in tileList:
            # Add the job to the multiprocessing pool asynchronously
            pool.apply_async(worker_bee, (tile, scratchFolder, addElev), callback=log_result)

        # Clean up worker pool; waits for all jobs to finish
        pool.close()
        pool.join()

        # Get the resulting outputs (paths to successfully computed breakline polygons)
        #print("Getting resulting outputs...")
        #results = [job.get() for job in jobs]

        # Merge the temporary outputs
        print("Merging temporary outputs into shapefile " + outMerge + "...")
        arcpy.Merge_management(resultList, outMerge)

        # Clean up temporary data
        print("Deleting temporary data ...")
        for result in resultList:  # corrected from the undefined name "results"
            try:
                arcpy.Delete_management(result)
            except:
                pass

        # Stop timing and report duration
        end = time.clock()
        duration = end - start
        hours, remainder = divmod(duration, 3600)
        minutes, seconds = divmod(remainder, 60)
        print("Completed in %d:%d:%f" % (hours, minutes, seconds))
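One behavior of `pool.apply_async` that may matter here (standard `multiprocessing` semantics, sketched below without arcpy): if a worker raises an exception, the callback is never invoked and the error is silently discarded unless you keep the `AsyncResult` objects and call `.get()` on them, which re-raises the worker's exception in the main process. The `worker` function here is a hypothetical stand-in that fails partway, the way a geoprocessing step can:

```python
import multiprocessing

def worker(tile):
    # Stand-in for worker_bee; one input fails mid-chain
    if tile == "bad.img":
        raise ValueError("step failed on " + tile)
    return "ABL_" + tile

if __name__ == "__main__":
    pool = multiprocessing.Pool(2)
    jobs = [pool.apply_async(worker, (t,)) for t in ["ok.img", "bad.img"]]
    pool.close()
    pool.join()
    for job in jobs:
        try:
            # .get() re-raises the worker's exception instead of
            # silently dropping the job the way a callback-only
            # pattern does
            print(job.get())
        except ValueError as err:
            print("worker failed: %s" % err)
```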

PolyGeo
Barbarossa
  • Just to clarify, if you took out the final merge step the code actually executes without error using multiple cores? – Hornbydd Jun 10 '13 at 20:10
  • It executes without errors with multiple cores, but each worker does not complete all tasks; each stops after Dissolve, without finishing the remaining steps of the function. – Barbarossa Jun 10 '13 at 21:38

1 Answer


I think this may be an example of what you are looking for... check out this Esri article on multiprocess geocoding. It says, "Download the tool even if you're not a geocoder, you may be able to leverage the logic to parallelize other geoprocessing jobs – and if you do, don't forget to share your own tools!" and "... lets you leverage all available CPU cores on your local machine, or all available Server instances on a remote server."

http://blogs.esri.com/esri/arcgis/2011/05/04/multiprocess-geocoding/
Also here, http://blogs.esri.com/esri/arcgis/2011/08/29/multiprocessing/
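If it helps, the core pattern those posts build on is dividing the work into jobs, mapping them across a `Pool`, and gathering the results back in the parent process. A minimal arcpy-free sketch (`process_job` is a hypothetical stand-in for one unit of geocoding or geoprocessing work):

```python
import multiprocessing

def process_job(job):
    # Stand-in for one unit of geoprocessing work
    return job * job

def run_all(jobs_list):
    # Divide the jobs across all cores, run them, and gather the
    # results; pool.map blocks until every job has finished
    pool = multiprocessing.Pool(multiprocessing.cpu_count())
    results = pool.map(process_job, jobs_list)
    pool.close()
    pool.join()
    return results

if __name__ == "__main__":
    print(run_all([1, 2, 3, 4]))
```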

awesomo
  • thank you for the answer. I actually used the second link for reference when writing my code. I'll look at the subprocess example and let you know how it goes. Thanks again. – Barbarossa Jun 13 '13 at 18:10