Concurrent processing
Rasterio affords concurrent processing of raster data. Python’s global
interpreter lock (GIL) is released when calling GDAL’s GDALRasterIO()
function, which means that Python threads can read and write concurrently.
The Numpy library also often releases the GIL, e.g., in applying universal functions to arrays, and this makes it possible to distribute processing of an array across cores of a processor.
This means that it is possible to parallelize tasks that need to be performed for a set of windows/pixels in the raster. Reading, writing and processing can always be done concurrently. But it depends on the hardware and where the bottlenecks are, how much of a speedup can be obtained. In the case that the processing function releases the GIL, multiple threads processing simultaneously can lead to further speedups.
Note
If you wish to do multiprocessing that is not trivially parallelizable accross very large images that do not fit in memory, or if you wish to do multiprocessing across multiple machines. You might want to have a look at dask and in particular this example.
The Cython function below, included in Rasterio’s _example
module,
simulates a GIL-releasing CPU-intensive raster processing function. You can
also easily create GIL-releasing functions by using numba
# cython: boundscheck=False
import numpy as np
def compute(unsigned char[:, :, :] input):
"""reverses bands inefficiently
Given input and output uint8 arrays, fakes an CPU-intensive
computation.
"""
cdef int I, J, K
cdef int i, j, k, l
cdef double val
I = input.shape[0]
J = input.shape[1]
K = input.shape[2]
output = np.empty((I, J, K), dtype='uint8')
cdef unsigned char[:, :, :] output_view = output
with nogil:
for i in range(I):
for j in range(J):
for k in range(K):
val = <double>input[i, j, k]
for l in range(2000):
val += 1.0
val -= 2000.0
output_view[~i, j, k] = <unsigned char>val
return output
Here is the program in examples/thread_pool_executor.py. It is set up in such a way that at most 1 thread is reading and at most 1 thread is writing at the same time. Processing is not protected by a lock and can be done by multiple threads simultaneously.
"""thread_pool_executor.py
Operate on a raster dataset window-by-window using a ThreadPoolExecutor.
Simulates a CPU-bound thread situation where multiple threads can improve
performance.
With -j 4, the program returns in about 1/4 the time as with -j 1.
"""
import concurrent.futures
import multiprocessing
import threading
import rasterio
from rasterio._example import compute
def main(infile, outfile, num_workers=4):
"""Process infile block-by-block and write to a new file
The output is the same as the input, but with band order
reversed.
"""
with rasterio.open(infile) as src:
# Create a destination dataset based on source params. The
# destination will be tiled, and we'll process the tiles
# concurrently.
profile = src.profile
profile.update(blockxsize=128, blockysize=128, tiled=True)
with rasterio.open(outfile, "w", **src.profile) as dst:
windows = [window for ij, window in dst.block_windows()]
# We cannot write to the same file from multiple threads
# without causing race conditions. To safely read/write
# from multiple threads, we use a lock to protect the
# DatasetReader/Writer
read_lock = threading.Lock()
write_lock = threading.Lock()
def process(window):
with read_lock:
src_array = src.read(window=window)
# The computation can be performed concurrently
result = compute(src_array)
with write_lock:
dst.write(result, window=window)
# We map the process() function over the list of
# windows.
with concurrent.futures.ThreadPoolExecutor(
max_workers=num_workers
) as executor:
executor.map(process, windows)
The code above simulates a CPU-intensive calculation that runs faster when
spread over multiple cores using concurrent.futures.ThreadPoolExecutor
compared to the case of one concurrent job (-j 1
),
$ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 1
real 0m4.277s
user 0m4.356s
sys 0m0.184s
we get over 3x speed up with four concurrent jobs.
$ time python examples/thread_pool_executor.py tests/data/RGB.byte.tif /tmp/test.tif -j 4
real 0m1.251s
user 0m4.402s
sys 0m0.168s
If the function that you’d like to map over raster windows doesn’t release the
GIL, you unfortunately cannot simply replace ThreadPoolExecutor
with
ProcessPoolExecutor
,
the DatasetReader
/DatasetWriter
cannot be shared by multiple
processes, which means that each process needs to open the file seperately,
or you can do all the reading and writing from the main thread, as shown in
this next example. This is much less efficient memory wise, however.
arrays = [src.read(window=window) for window in windows]
with concurrent.futures.ProcessPoolExecutor(
max_workers=num_workers
) as executor:
futures = executor.map(compute, arrays)
for window, result in zip(windows, futures):
dst.write(result, window=window)