Merged
src/lavlab/omero/tiles.py: 44 changes (16 additions, 28 deletions)
@@ -21,7 +21,7 @@ def get_tiles( # pylint: disable=R0914
 ) -> Generator[
     tuple[np.ndarray, tuple[int, int, int, tuple[int, int, int, int]]], None, None
 ]:
-    """Pull tiles from omero faster using a ThreadPoolExecutor!
+    """Pull tiles from omero faster using a ThreadPoolExecutor and executor.map!
 
     Parameters
     ----------
@@ -46,49 +46,37 @@ def get_tiles( # pylint: disable=R0914
     conn = img._conn # pylint: disable=W0212
     local = threading.local()
 
-    def work(pix_id, zct, coord, res_lvl, rps_bypass):
-        """runs inside a threadpool to get multiple tiles at a time"""
+    def work(args):
+        """Runs inside a thread pool to get multiple tiles at a time."""
+        pix_id, zct, coord, res_lvl, rps_bypass = args
         if getattr(local, "rps", None) is None:
-            # need to prepare a thread-specific rps
+            # Need to prepare a thread-specific rps
             local.rps = conn.c.sf.createRawPixelsStore()
             local.rps.setPixelsId(pix_id, rps_bypass)
             if res_lvl is None:
                 res_lvl = local.rps.getResolutionLevels()
                 res_lvl -= 1
             local.rps.setResolutionLevel(res_lvl)
-        return local.rps.getTile(*zct, *coord), (*zct, coord)
+        raw_data = local.rps.getTile(*zct, *coord)
+        return raw_data, (*zct, coord)
 
     def cleanup():
-        """cleans out the raw pixels stores after work is done"""
+        """Cleans out the raw pixels stores after work is done."""
         if hasattr(local, "rps"):
             local.rps.close()
             delattr(local, "rps")
 
-    futures = [
-        tpe.submit(
-            work,
-            img.getPrimaryPixels().getId(),
-            (z, c, t),
-            coord,
-            res_lvl,
-            rps_bypass,
-        )
-        for z, c, t, coord in tiles
-    ]
     try:
-        for future in as_completed(futures):
-            raw_data, (z, c, t, coord) = future.result()
-            processed_data = np.frombuffer(raw_data, dtype=np.uint8).reshape(
-                coord[3], coord[2]
-            )
+        # Use executor.map for streamlined processing
+        pix_id = img.getPrimaryPixels().getId()
+        args_iter = ((pix_id, (z, c, t), coord, res_lvl, rps_bypass) for z, c, t, coord in tiles)
+        for raw_data, (z, c, t, coord) in tpe.map(work, args_iter):
+            processed_data = np.frombuffer(raw_data, dtype=np.uint8).reshape(coord[3], coord[2])
             yield processed_data, (z, c, t, coord)
     finally:
-        futures = [
-            tpe.submit(cleanup)
-            for x in range(tpe._max_workers) # pylint: disable=W0212
-        ] # pylint: disable=W0212
-        for future in as_completed(futures):
-            future.result()
+        # Cleanup resources
+        for _ in range(tpe._max_workers): # pylint: disable=W0212
+            cleanup()
 
 
 def create_tile_list_2d( # pylint: disable=R0913
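For reference, here is a minimal standalone sketch of the pattern this diff moves to: one argument tuple per tile fed into `ThreadPoolExecutor.map`, with each worker lazily creating its own `RawPixelsStore` through `threading.local`. The function name `fetch_tiles`, the `max_workers` parameter, and creating the pool inside the generator are illustrative assumptions added for self-containment; in the diff the executor (`tpe`) is supplied by the surrounding code. The OMERO calls mirror the ones shown above. Cleanup here is submitted back to the pool, as the pre-refactor code did, because `threading.local` state is only reachable from the thread that created it.

```python
# Sketch only: fetch_tiles, max_workers, and the pool creation below are
# assumptions; the OMERO calls follow the diff above.
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import numpy as np


def fetch_tiles(img, tiles, res_lvl=None, rps_bypass=False, max_workers=4):
    """Yield (tile, (z, c, t, coord)) pairs, where coord is (x, y, width, height)."""
    conn = img._conn  # pylint: disable=W0212
    local = threading.local()
    pix_id = img.getPrimaryPixels().getId()

    def work(args):
        """Fetch one tile; build a thread-local RawPixelsStore on first use."""
        zct, coord, lvl = args
        if getattr(local, "rps", None) is None:
            local.rps = conn.c.sf.createRawPixelsStore()
            local.rps.setPixelsId(pix_id, rps_bypass)
            if lvl is None:
                lvl = local.rps.getResolutionLevels() - 1  # full resolution
            local.rps.setResolutionLevel(lvl)
        return local.rps.getTile(*zct, *coord), (*zct, coord)

    def cleanup():
        """Close this thread's store, if it ever created one."""
        if hasattr(local, "rps"):
            local.rps.close()
            delattr(local, "rps")

    with ThreadPoolExecutor(max_workers=max_workers) as tpe:
        args_iter = (((z, c, t), coord, res_lvl) for z, c, t, coord in tiles)
        try:
            # map() submits every tile up front and yields results in input order.
            for raw_data, (z, c, t, coord) in tpe.map(work, args_iter):
                tile = np.frombuffer(raw_data, dtype=np.uint8).reshape(coord[3], coord[2])
                yield tile, (z, c, t, coord)
        finally:
            # threading.local state can only be cleaned up by the thread that set
            # it, so close the stores on the workers themselves. Submitting one
            # task per worker is best-effort (a worker may pick up more than one),
            # mirroring the pre-refactor code.
            for future in as_completed([tpe.submit(cleanup) for _ in range(max_workers)]):
                future.result()
```

One trade-off to note between the two approaches in the diff: `executor.map` yields tiles in submission order, whereas `as_completed` yields each tile as soon as its worker finishes, so `map` simplifies the loop at the cost of possibly waiting on a slow early tile while later ones are already done.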