From 7a5581ddc2ec09e8cf1987d8aa3a1f0ec5155168 Mon Sep 17 00:00:00 2001 From: bench-mcw Date: Wed, 27 Nov 2024 10:36:46 -0600 Subject: [PATCH] fix mem leak --- src/lavlab/omero/tiles.py | 44 ++++++++++++++------------------------- 1 file changed, 16 insertions(+), 28 deletions(-) diff --git a/src/lavlab/omero/tiles.py b/src/lavlab/omero/tiles.py index 5477907..62dc7ca 100644 --- a/src/lavlab/omero/tiles.py +++ b/src/lavlab/omero/tiles.py @@ -21,7 +21,7 @@ def get_tiles( # pylint: disable=R0914 ) -> Generator[ tuple[np.ndarray, tuple[int, int, int, tuple[int, int, int, int]]], None, None ]: - """Pull tiles from omero faster using a ThreadPoolExecutor! + """Pull tiles from omero faster using a ThreadPoolExecutor and executor.map! Parameters ---------- @@ -46,49 +46,37 @@ def get_tiles( # pylint: disable=R0914 conn = img._conn # pylint: disable=W0212 local = threading.local() - def work(pix_id, zct, coord, res_lvl, rps_bypass): - """runs inside a threadpool to get multiple tiles at a time""" + def work(args): + """Runs inside a thread pool to get multiple tiles at a time.""" + pix_id, zct, coord, res_lvl, rps_bypass = args if getattr(local, "rps", None) is None: - # need to prepare a thread-specific rps + # Need to prepare a thread-specific rps local.rps = conn.c.sf.createRawPixelsStore() local.rps.setPixelsId(pix_id, rps_bypass) if res_lvl is None: res_lvl = local.rps.getResolutionLevels() res_lvl -= 1 local.rps.setResolutionLevel(res_lvl) - return local.rps.getTile(*zct, *coord), (*zct, coord) + raw_data = local.rps.getTile(*zct, *coord) + return raw_data, (*zct, coord) def cleanup(): - """cleans out the raw pixels stores after work is done""" + """Cleans out the raw pixels stores after work is done.""" if hasattr(local, "rps"): local.rps.close() delattr(local, "rps") - futures = [ - tpe.submit( - work, - img.getPrimaryPixels().getId(), - (z, c, t), - coord, - res_lvl, - rps_bypass, - ) - for z, c, t, coord in tiles - ] try: - for future in as_completed(futures): - raw_data, (z, c, t, coord) = future.result() - processed_data = np.frombuffer(raw_data, dtype=np.uint8).reshape( - coord[3], coord[2] - ) + # Use executor.map for streamlined processing + pix_id = img.getPrimaryPixels().getId() + args_iter = ((pix_id, (z, c, t), coord, res_lvl, rps_bypass) for z, c, t, coord in tiles) + for raw_data, (z, c, t, coord) in tpe.map(work, args_iter): + processed_data = np.frombuffer(raw_data, dtype=np.uint8).reshape(coord[3], coord[2]) yield processed_data, (z, c, t, coord) finally: - futures = [ - tpe.submit(cleanup) - for x in range(tpe._max_workers) # pylint: disable=W0212 - ] # pylint: disable=W0212 - for future in as_completed(futures): - future.result() + # Cleanup resources + for _ in range(tpe._max_workers): # pylint: disable=W0212 + cleanup() def create_tile_list_2d( # pylint: disable=R0913