Skip to content

parallel gets for list slices #111

@MRossol

Description

@MRossol

Why this feature is necessary:
Even with the slicing tricks implemented here:

rex/rex/resource.py

Lines 133 to 445 in ae280db

@staticmethod
def _check_slice(ds_slice):
"""
Check ds_slice for lists, ensure lists are of the same len
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
list_len : int | None
List lenght, None if none of the args are a list | ndarray
multi_list : bool
Flag if multiple list are provided in ds_slice
"""
multi_list = False
list_len = []
for s in ds_slice:
if isinstance(s, (list, np.ndarray)):
list_len.append(len(s))
if list_len:
if len(list_len) > 1:
multi_list = True
list_len = list(set(list_len))
if len(list_len) > 1:
msg = ('shape mismatch: indexing arrays could not be '
'broadcast together with shapes {}'
.format(['({},)'.format(ln) for ln in list_len]))
raise IndexError(msg)
else:
list_len = list_len[0]
else:
list_len = None
return list_len, multi_list
@staticmethod
def _make_list_slices(ds_slice, list_len):
"""
Duplicate slice arguements to enable zipping of list slices with
non-list slices
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
list_len : int
List lenght
Returns
-------
zip_slices : list
List of slices to extract for each entry in list slice
"""
zip_slices = []
for s in ds_slice:
if not isinstance(s, (list, np.ndarray)):
zip_slices.append([s] * list_len)
else:
zip_slices.append(s)
return zip_slices
@staticmethod
def _list_to_slice(ds_slice):
"""
Check ds_slice to see if it is an int, slice, or list. Return
pieces required for fancy indexing based on input type.
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
ds_slice : slice
Slice that encompasses the entire range
ds_idx : ndarray
Adjusted list to extract points of interest from sliced array
"""
ds_idx = None
if isinstance(ds_slice, (list, np.ndarray)):
in_slice = np.array(ds_slice)
if np.issubdtype(in_slice.dtype, np.dtype(bool)):
in_slice = np.where(in_slice)[0]
s = in_slice.min()
e = in_slice.max() + 1
ds_slice = slice(s, e, None)
ds_idx = in_slice - s
elif isinstance(ds_slice, slice):
ds_idx = slice(None)
return ds_slice, ds_idx
@staticmethod
def _get_out_arr_slice(arr_slice, start):
"""
Determine slice of pre-build output array that is being filled
Parameters
----------
arr_slice : tuple
Tuple of (int, slice, list, ndarray) for section of output array
being extracted
start : int
Start of slice, used for list gets
Returns
-------
out_slice : tuple
Slice arguments of portion of output array to insert arr_slice
into
stop : int
Stop of slice, used for list gets, will be new start upon
iteration
"""
out_slice = ()
int_slice = ()
int_start = start
int_stop = start
stop = start
for s in arr_slice:
if isinstance(s, slice):
out_slice += (slice(None), )
int_slice += (slice(None), )
elif isinstance(s, (int, np.integer)):
if int_start == int_stop:
int_slice += (int_start, )
int_stop += 1
elif isinstance(s, (list, tuple, np.ndarray)):
list_len = len(s)
if list_len == 1:
stop += 1
out_slice += ([start], )
else:
stop += len(s)
out_slice += (slice(start, stop), )
if not out_slice:
out_slice += (start, )
stop += 1
elif all(s == slice(None) for s in out_slice):
out_slice = int_slice
stop = int_stop
return out_slice, stop
def _get_out_arr_shape(self, ds_slice):
"""
Determine shape of output array
Parameters
----------
ds_slice : tuple
Tuple of (int, slice, list, ndarray) of what to extract from ds,
each arg is for a sequential axis
Returns
-------
out_shape : tuple
Shape of output array
"""
ds_shape = self.shape
out_shape = ()
contains_list = False
ds_slice += (slice(None), ) * (len(ds_shape) - len(ds_slice))
for i, ax_slice in enumerate(ds_slice):
if isinstance(ax_slice, slice):
stop = ax_slice.stop
if stop is None:
stop = ds_shape[i]
out_shape += (len(range(*ax_slice.indices(stop))), )
if isinstance(ax_slice, (list, tuple, np.ndarray)):
if not contains_list:
out_shape += (len(ax_slice), )
contains_list = True
return out_shape
def _extract_list_slice(self, ds_slice):
    """
    Optimize and extract a list selection along a single axis.

    If ``self.chunks`` is set, each list/ndarray selector is sorted. It
    is then split wherever two consecutive sorted indices differ by more
    than the chunk size for that axis (``self.chunks[axis]``). Each
    resulting group is read through ``self._extract_ds_slice`` and
    written into a zero-initialised output array. The output is finally
    permuted back to the order in which the indices were requested.

    When ``self.chunks`` is falsy, or no split is needed, the whole
    request is handled by a single ``self._extract_ds_slice`` call.

    Parameters
    ----------
    ds_slice : tuple
        Tuple of (int, slice, list, ndarray) of what to extract from ds,
        each arg is for a sequential axis

    Returns
    -------
    out : ndarray
        Extracted array of data from ds
    """
    chunk_shape = self.chunks
    if not chunk_shape:
        return self._extract_ds_slice(ds_slice)

    n_groups = None
    selectors = []
    unsort = []
    for axis, ax_slice in enumerate(ds_slice):
        chunk_len = chunk_shape[axis]
        if isinstance(ax_slice, slice):
            unsort.append(slice(None))
        elif isinstance(ax_slice, (list, np.ndarray)):
            if not isinstance(ax_slice, np.ndarray):
                ax_slice = np.array(ax_slice)
            order = np.argsort(ax_slice)
            # argsort of a permutation is its inverse: it maps the sorted
            # result back to the order the caller asked for
            unsort.append(np.argsort(order))
            ax_slice = ax_slice[order]
            gaps = np.diff(ax_slice) > chunk_len
            if np.any(gaps):
                ax_slice = np.split(ax_slice, np.where(gaps)[0] + 1)
                n_groups = len(ax_slice)
        selectors.append(ax_slice)

    if n_groups is None:
        return self._extract_ds_slice(ds_slice)

    out_shape = self._get_out_arr_shape(ds_slice)
    grouped = self._make_list_slices(selectors, n_groups)
    out = np.zeros(out_shape, dtype=self.dtype)
    cursor = 0
    for group_slice in zip(*grouped):
        target, cursor = self._get_out_arr_slice(group_slice, cursor)
        out[target] = self._extract_ds_slice(group_slice)

    return out[tuple(unsort)]
def _extract_multi_list_slice(self, ds_slice, list_len):
    """
    Extract a ds_slice that contains several (equal-length) lists.

    The lists are zipped point by point, with non-list selectors
    repeated alongside them. Each point is read separately through
    ``self._extract_ds_slice`` and written into a zero-initialised
    output array of dtype ``self.dtype``.

    Parameters
    ----------
    ds_slice : tuple
        Tuple of (int, slice, list, ndarray) of what to extract from ds,
        each arg is for a sequential axis
    list_len : int
        Length of the list selectors

    Returns
    -------
    out : ndarray
        Extracted array of data from ds
    """
    per_point = zip(*self._make_list_slices(ds_slice, list_len))
    out = np.zeros(self._get_out_arr_shape(ds_slice), dtype=self.dtype)

    cursor = 0
    for point_slice in per_point:
        target, cursor = self._get_out_arr_slice(point_slice, cursor)
        out[target] = self._extract_ds_slice(point_slice)

    return out
def _extract_ds_slice(self, ds_slice):
    """
    Extract ds_slice from ds using slices where possible.

    Each list/ndarray selector is widened to its bounding slice by
    ``self._list_to_slice``, so ``self.ds`` is indexed only with ints and
    slices. The requested points are then selected from the in-memory
    result. That second selection is skipped when every recorded index
    is ``slice(None)``.

    Parameters
    ----------
    ds_slice : tuple
        Tuple of (int, slice, list, ndarray) of what to extract from ds,
        each arg is for a sequential axis

    Returns
    -------
    out : ndarray
        Extracted array of data from ds
    """
    bounds = []
    picks = []
    for selector in ds_slice:
        bounded, local_idx = self._list_to_slice(selector)
        bounds.append(bounded)
        # int selectors drop their axis, so they get no entry in picks
        if local_idx is not None:
            picks.append(local_idx)

    block = self.ds[tuple(bounds)]

    full = slice(None)
    if any(not isinstance(pick, slice) or pick != full for pick in picks):
        block = block[tuple(picks)]

    return block

Extracting data from h5py and h5pyd datasets with a list index is still slow.

A possible solution is:
The most performant solution would be to use `concurrent.futures` to extract each item in the list in parallel and then recombine the results. This would also greatly simplify the logic above. The only issue is that it would not be compatible with reV Generation, which already runs in parallel: parallel workers cannot themselves spawn parallel workers.

Additional Context
Relevant for: NREL/reV#335

Charge code
reV

Urgency / Timeframe
TBD

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementUpdate to logic or general code improvementstopic-resource-handlerIssues/pull requests related to the main rex Resource handlerwishlistFeatures we would ideally want to support, but not right now

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions