Using Spawn and Timers to Manage Concurrent Asynchronous Jobs
Anonym
A common complaint about IDL is its single threaded nature, and how you can’t peg all your CPUs by running a single PRO routine. I’m not here to announce any new multi-threaded version of IDL, but instead discuss a very cool implementation that a colleague showed me that allows you to fork off multiple processes and manage their lifecycles from IDL. You might argue that I could do this in Python, or even a simple shell script to a degree, but without IDL and ENVI’s rich support for data formats, it would difficult to implement a generic solution that can inspect your input data and determine how to partition the problem, nevermind reassemble to results into the final output product.
The main component is IDL’s Spawn procedure, though the use is a little nonstandard. The most common use of Spawn() is to build a command string, and then pass it into Spawn() along with positional parameters for stdout and maybe stderr. The big problems here are that this is a blocking call and there is no parameter for stdin, so you end up writing a file to disk and adding redirection or a pipe to the command string. The former may be overcome with the NOWAIT keyword, but that only works on Windows and I’m not entirely sure how you would capture the stdout or stderr output. Instead we will use the UNIT keyword, which makes the call non-blocking and wraps a bidirectional pipe in a LUN, so you can easily send data to stdin and read the results of stdout. Of course if we follow up the Spawn() call with a “while not EOF(lun)” loop then we gain nothing, as our code still effectively blocks until the external process has terminated and we’re done reading from its stdout. Enter timers, which allow us to spawn multiple processes and revisit them until they are complete. So my colleague created this class AsyncSpawner which will do just that, allowing you to enqueuer as many Spawn requests as you want, with a callback that will be invoked as the process spits stuff out on stdout. We use a class to maintain a catalog of all the job requests that have been made and the state of the running processes.
The AsyncSpawn class has been trimmed down for this post, you would normally want a lot more error checking and configurability. The “started” Boolean member indicates whether it should be running any timers to process events, which is controlled by the appropriately named method Start and Stop. There is a configurable CONCURRENCY parameter passed into Init() that controls how many processes can run simultaneously. The “queue” member is a List which stores the job requests in the order they are received. There are two Hash members, one for the running jobs keyed on their shell’s process id and one for the timers used to query the processes’ stdout pipe, keyed on the timer id.
A client uses Enqueue to request a process to be spawned. The first argument is the exact string you want passed to Spawn(), and the STDIN keyword is set to whatever string or string array you want written to the process’s stdin pipe. The CALLBACK keyword specifies either a procedure name or object reference to a class with the ‘JobCallback’ prodedure. In either case the callback procedure will be passed two arguments, a string message and a Dictionary containing the current job state. The values of the input parameters are all stored in a Dictionary object (for case insensitivity and dot notation access to members), which is then added to the List member queue.
The main method is called ExecutiveLoop, which is initially invoked by Start and then subsequently by a timer using the special userdate value of -1. If the “started” state is changed to False by the Stop method, then it won’t do anything. When “started” is True it will dequeue as many job requests as it can and spawn them, until it hits the concurrency limit. It will then set a new timer to call itself in 0.1 seconds. When it dequeues a job request, it calls Spawn with the UNIT and PID keywords so that it can write and read from stdin and stdout, respectively, as well as to have the shell process id as a unique identifier. Once any stdin data has been sent and flushed to stdin, a Dictionary is created to store the state of the job, which in turn is added to the “running” member Hash keying on the process id. The job state includes to full command string requested, the stdin contents, the callback procedure name/object reference, the LUN of the process, the process id of the process, a List to contain all the stdout contents, and the start time of the process. It is up to clients to not change any of this information when they implement the callback procedure, as that could break things. After storing the job state, the callback procedure is invoked with the “spawned” message, and then a timer is started, using the job’s process id as the userdata.
The AsyncSpawn class implements a HandleTimerEvent method, which is what the timers invoke when they go off. This method takes two arguments, the timer id and userdata, which is either the process id returned from Spawn or -1 to indicate ExecutiveLoop should be invoked again. While it would be cleaner to have one timer and select on all the job LUNs, the FILE_POLL_INPUT() method doesn’t seem to work the way I’d like, at least on Windows. So we resorted to separate timers for each job LUN, as well as one for ExecuteLoop. When a positive value is passed in for userdata, then we invoke PollJobStatus with that process id.
The PollJobStatus method does just what it sounds like it should. It gets the job state Dictionary from the “running” member, and checks if that job’s stdout LUN has reached its EOF marker. If so then we free the LUN, remove the job from “running”, and invoke the callback with the “done” message and the final job state. If we haven’t reached EOF, then we verify if there is activity on the pipe using FILE_POLL_INPUT(), and read a string from stdout. That string is added to the job’s stdout state, and then the callback is invoked with an “updated” message and the current job state.
That pretty much describes the minimalist implementation of AsyncSpawn, here is the code:
function AsyncSpawn::Init, CONCURRENCY=concurrency
compile_opt idl2
self.started = !False
self.concurrency = (N_Elements(concurrency) eq 1) ? (1>concurrency<20) : 1
self.timers = Hash()
self.running = Hash()
self.queue = List()
return, 1
end
pro AsyncSpawn::Cleanup
compile_opt idl2
self.Stop
end
pro AsyncSpawn::Start
compile_opt idl2
Timer.Block
if (~self.started) then begin
self.started = !True
self.ExecutiveLoop
endif
Timer.Unblock
end
pro AsyncSpawn::Stop
compile_opt idl2
Timer.Block
self.started = !False
; kill existing timers
foreach t, self.timers.Keys() do !null = Timer.Cancel(t)
self.timers = Hash()
; free luns for pipes to active jobs
foreach job, self.running, pid do Free_Lun, job.unit
Timer.Unblock
end
pro AsyncSpawn::Enqueue, cmd, STDIN=stdin, CALLBACK=callback
compile_opt idl2
if (~ISA(callback, /SCALAR, /STRING) && ~Obj_Valid(callback)) $
then Message, 'CALLBACK must be scalar string or object'
if (~ISA(stdin, /STRING)) $
then Message, 'STDIN must be string or string array'
self.queue.Add, Dictionary('cmd', cmd, 'stdin', stdin, $
'callback', callback)
end
pro AsyncSpawn::ExecutiveLoop
compile_opt idl2
; this is the main loop, it runs queued jobs and manages the
; polling timers that watch for results on the jobs
while (self.started && ~self.queue.IsEmpty() && $
(self.running.Count() lt self.concurrency)) do begin
req = self.queue.Remove(0)
Spawn, req.cmd, UNIT=u, PID=pid, /NOSHELL, /HIDE
; print to the pipe, it will be read by stdin in the spawned process
printf, u, req.stdin
flush, u
; add job to running catalog
job = Dictionary('cmd', req.cmd, 'stdin', req.stdin, 'unit', u, $
'callback', req.callback, 'pid', pid, $
'stdout', List(), 'start', SysTime(/Seconds))
self.running[pid] = job
if (Obj_Valid(req.callback)) then begin
Call_Method, 'JobCallback', req.callback, 'spawned', job
endif else begin
Call_Procedure, req.callback, 'spawned', job
endelse
; if not stopped, set a timer to inspect this new job
if (self.started) then self.timers[Timer.Set(0.1, self, pid)] = !True
endwhile
; if not stopped, set a timer to call myself
if (self.started) then self.timers[Timer.Set(0.1, self, -1)] = !True
end
pro AsyncSpawn::PollJobStatus, pid
compile_opt idl2
job = self.running[pid]
if (EOF(job.unit)) then begin
Free_Lun, job.unit
self.running.Remove, pid
if (Obj_Valid(job.callback)) then begin
Call_Method, 'JobCallback', job.callback, 'done', job
endif else begin
Call_Procedure, job.callback, 'done', job
endelse
return
endif
; check if there is content on this job's stdout
if (File_Poll_Input(job.unit, TIMEOUT=0) gt 0) then begin
s = ''
readf, job.unit, s
job.stdout.Add, s
if (Obj_Valid(job.callback)) then begin
Call_Method, 'JobCallback', job.callback, 'updated', job
endif else begin
Call_Procedure, job.callback, 'updated', job
endelse
endif
; if not stopped, set a timer to reinspect this job
if (self.started) then self.timers[Timer.Set(0.1, self, pid)] = !True
end
pro AsyncSpawn::HandleTimerEvent, id, pid
compile_opt idl2
self.timers.Remove, id
if (pid eq -1) then begin
self.ExecutiveLoop
endif else begin
self.PollJobStatus, pid
endelse
end
pro AsyncSpawn__define
compile_opt idl2
!null = {AsyncSpawn, $
started: !False, $
concurrency: 0L, $
timers: Hash(), $
running: Hash(), $
queue: List() $
}
end
The next step is to show this class in action. I created a simple IDL procedure which can be run from the command line using “idl –e” syntax (https://www.nv5geospatialsoftware.com/docs/command_line_options_for.html#-e). It’s called ProcessRasterTile and it has three input keywords: URI is a raster filename to load, SUB_RECT is a spatial subset to process, and INDEX is the spectral index name to calculate. It will launch ENVI, load the raster, spatially subset it, and calculate the requested spectral index. It then gets a temporary filename and exports the spectral index raster to it, and prints the filename and a special “OUTPUT=” prefix that can be identified by the spawn callbacks. This isn’t a process you really need parallelize, but this is just a short example of what is possible with this approach. I did add some error handling, where any error message from any API call is printed to stdout before returning.
pro ProcessRasterTile, URI=uri, SUB_RECT=subRect, INDEX=index
compile_opt idl2
nv = ENVI(/HEADLESS)
oRaster = nv.OpenRaster(uri, ERROR=error)
if (StrLen(error) gt 0) then begin
print, 'Error opening raster: ' + error
return
endif
oSubRaster = ENVISubsetRaster(oRaster, SUB_RECT=subRect, ERROR=error)
if (StrLen(error) gt 0) then begin
print, 'Error subsetting raster: ' + error
return
endif
oIndexRaster = ENVISpectralIndexRaster(oSubRaster, INDEX=index, ERROR=error)
if (StrLen(error) gt 0) then begin
print, 'Error calculating spectral index: ' + error
return
endif
outFilename = nv.GetTemporaryFilename('dat')
oIndexRaster.Export, outFilename, 'ENVI'
print, 'OUTPUT=' + outFilename
nv.close
end
The next part is callback, which I chose to implement as a class since I need to maintain state of multiple processes until they are all complete, and I didn’t want to use common block variables to do that. I called this class SpawnCallbackHandler, since that is its job. This class has a Boolean “done” state, which was needed to avoid garbage collection in my simple test driver. It also has two Hash members, “jobs” is for active jobs and “finishedJobs” is for jobs that have completed. The main purposed of this class is its JobCallback method, which is invoked by the SpawnAsync class. When it gets a “spawned” message it adds new job to its “jobs” hash, using the PID as the key. When it gets an “updated” message it updates the job Dictionary in the “jobs” hash, and when it gets a “done” message it moves the job Dictionary from the “jobs” hash to the “finishedJobs” hash. There is some error checking to make sure the PIDs are unique and only in the correct hash. It then checks if there are no running jobs, only finished jobs, and in that case it calls its AssembleFinalProduct method, a more robust version would make this a callback procedure that takes the “finishedJobs” hash as input.
The AssembleFinalProduct method iterates over all the finished jobs, looking for the line in their stdout list that has that special “OUTPUT=” prefix. Each of these are catalogued, so they can then be passed into ENVI::OpenRaster() to load all the NDVI tiles. I then create an ENVIMosaicRaster, which is exported to another temporary filename that is printed out so you know what it is.
function SpawnCallbackHandler::Init
compile_opt idl2
self.done = !False
self.jobs = Hash()
self.finishedJobs = Hash()
return, 1
end
pro SpawnCallbackHandler::Cleanup
compile_opt idl2
foo = 1
end
function SpawnCallbackHandler::Done
compile_opt idl2
return, self.done
end
pro SpawnCallbackHandler::JobCallback, msg, job
compile_opt idl2
case msg.ToLower() of
'spawned': begin
if (self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin
Message, 'Already know about job with PID ' + StrTrim(job.pid, 2)
endif
self.jobs[job.pid] = job
end
'updated': begin
if (~self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin
Message, 'Update to unknown job with PID ' + StrTrim(job.pid, 2)
endif
self.jobs[job.pid] = job
end
'done': begin
if (~self.jobs.HasKey(job.pid) || self.finishedJobs.HasKey(job.pid)) then begin
Message, 'Completion of unknown job with PID ' + StrTrim(job.pid, 2)
endif
self.jobs.Remove, job.pid
self.finishedJobs[job.pid] = job
end
endcase
; are they all done yet?
if (self.jobs.IsEmpty() && ~self.finishedJobs.IsEmpty()) then begin
self.AssembleFinalProduct
endif
end
pro SpawnCallbackHandler::AssembleFinalProduct
compile_opt idl2
jobUrls = List()
foreach job, self.finishedJobs do begin
foreach line, job.stdout do begin
if (line.StartsWith('OUTPUT=')) then begin
jobUrls.Add, (line.Substring(7)).Trim()
endif
endforeach
endforeach
; launch ENVI and load all the output rasters
nv = ENVI(/HEADLESS)
tiles = List()
foreach url, jobUrls do begin
tiles.Add, nv.OpenRaster(url)
endforeach
; mosaic the rasters and export the result
mosaic = ENVIMosaicRaster(tiles.ToArray())
outFilename = nv.GetTemporaryFilename('dat')
mosaic.Export, outFilename, 'ENVI'
print, outFilename
nv.close
self.done = !True
end
pro SpawnCallbackHandler__define
compile_opt idl2
!null = {SpawnCallbackHandler, $
done: !False, $
jobs: Hash(), $
finishedJobs: Hash() $
}
end
My test driver code for this is as follows. Since this example doesn’t have any persistence, I had to use the SpawnCallbackHandler::Done() method to keep it and the AsyncSpawn object alive until everything was complete. If you used an approach like this in a widget based application you wouldn’t need that method.
pro test_parallelRasterTile
compile_opt idl2
filename = 'C:\Program Files\Exelis\ENVI53\data\qb_boulder_msi'
subRect = [[0, 0, 511, 511], [512, 0, 1023, 511], $
[0, 512, 511, 1023], [512, 512, 1023, 1023]]
index = 'NDVI'
spawner = AsyncSpawn(CONCURRENCY=4)
handler = SpawnCallbackHandler()
for i = 0, 3 do begin
cmd = 'C:\Program Files\Exelis\IDL85\bin\bin.x86_64\idl.exe -e ' + $
'"ProcessRasterTile, URI=''' + filename + ''', SUB_RECT=[' + $
StrJoin(StrTrim(Reform(subRect[*,i]), 2), ',') + '], INDEX=''' + index + '''"'
spawner.Enqueue, cmd, STDIN='', CALLBACK=handler
endfor
spawner.Start
while (~handler.Done()) do begin
Wait, 0.5
endwhile
end