From ddca15d06fc5b74493e6f69368d404bf48f6c6d6 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Thu, 6 Nov 2025 10:11:56 -0800 Subject: [PATCH 1/3] bulk delete --- README.md | 14 ++- examples/quickstart.py | 104 ++++++++++++-------- src/dataverse_sdk/client.py | 31 +++++- src/dataverse_sdk/odata.py | 186 ++++++++++++++++++++++++++++++++++-- 4 files changed, 280 insertions(+), 55 deletions(-) diff --git a/README.md b/README.md index 58222b9..8fdd863 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Auth: | `update` | `update(logical_name, list[id], patch)` | `None` | Broadcast; same patch applied to all IDs (UpdateMultiple). | | `update` | `update(logical_name, list[id], list[patch])` | `None` | 1:1 patches; lengths must match (UpdateMultiple). | | `delete` | `delete(logical_name, id)` | `None` | Delete one record. | -| `delete` | `delete(logical_name, list[id])` | `None` | Delete many (sequential). | +| `delete` | `delete(logical_name, list[id], ..., wait_poll_interval_seconds=2.0)` | `Optional[str]` | Delete many with async BulkDelete. | | `query_sql` | `query_sql(sql)` | `list[dict]` | Constrained read-only SELECT via `?sql=`. | | `create_table` | `create_table(tablename, schema, solution_unique_name=None)` | `dict` | Creates custom table + columns. Friendly name (e.g. `SampleItem`) becomes schema `new_SampleItem`; explicit schema name (contains `_`) used as-is. Pass `solution_unique_name` to attach the table to a specific solution instead of the default solution. | | `create_column` | `create_column(tablename, columns)` | `list[str]` | Adds columns using a `{name: type}` mapping (same shape as `create_table` schema). Returns schema names for the created columns. | @@ -54,8 +54,10 @@ Auth: Guidelines: - `create` always returns a list of GUIDs (1 for single, N for bulk). -- `update`/`delete` always return `None` (single and multi forms). +- `update` always returns `None`. - Bulk update chooses broadcast vs per-record by the type of `changes` (dict vs list). +- `delete` returns `None` for single-record delete and the BulkDelete async job ID for multi-record delete. +- By default multi-record delete doesn't wait for the async job to complete. User can optionally wait for the job to complete.,klmmm - Paging and SQL operations never mutate inputs. - Metadata lookups for logical name stamping cached per entity set (in-memory). @@ -143,9 +145,12 @@ client.update("account", ids, [ ]) print({"multi_update": "ok"}) -# Delete +# Delete (single) client.delete("account", account_id) +# Bulk delete (schedules BulkDelete and returns job id) +job_id = client.delete("account", ids) + # SQL (read-only) via Web API `?sql=` rows = client.query_sql("SELECT TOP 3 accountid, name FROM account ORDER BY createdon DESC") for r in rows: @@ -334,7 +339,7 @@ client.delete_table("SampleItem") # delete table (friendly name or explici Notes: - `create` always returns a list of GUIDs (length 1 for single input). -- `update` and `delete` return `None` for both single and multi. +- `update` returns `None`. `delete` returns `None` for single-record delete and the BulkDelete async job ID for multi-record delete. - Passing a list of payloads to `create` triggers bulk create and returns `list[str]` of IDs. - `get` supports single record retrieval with record id or paging through result sets (prefer `select` to limit columns). - For CRUD methods that take a record id, pass the GUID string (36-char hyphenated). Parentheses around the GUID are accepted but not required. @@ -350,7 +355,6 @@ VS Code Tasks ## Limitations / Future Work - No general-purpose OData batching, upsert, or association operations yet. -- `DeleteMultiple` not yet exposed. - Minimal retry policy in library (network-error only); examples include additional backoff for transient Dataverse consistency. ## Contributing diff --git a/examples/quickstart.py b/examples/quickstart.py index e1c3a63..4f72568 100644 --- a/examples/quickstart.py +++ b/examples/quickstart.py @@ -14,7 +14,6 @@ import requests import time from datetime import date, timedelta -from concurrent.futures import ThreadPoolExecutor, as_completed entered = input("Enter Dataverse org URL (e.g. https://yourorg.crm.dynamics.com): ").strip() @@ -57,10 +56,12 @@ def backoff_retry(op, *, delays=(0, 2, 5, 10, 20), retry_http_statuses=(400, 403 print(f'Request failed: {ex}') last_exc = ex if retry_if and retry_if(ex): + print("Retrying operation...") continue if isinstance(ex, requests.exceptions.HTTPError): code = getattr(getattr(ex, 'response', None), 'status_code', None) if code in retry_http_statuses: + print("Retrying operation...") continue break if last_exc: @@ -176,20 +177,6 @@ def print_line_summaries(label: str, summaries: list[dict]) -> None: f"count={s.get('count')} amount={s.get('amount')} when={s.get('when')}" ) -def _resolve_status_value(kind: str, raw_value, use_french: bool): - """kind values: - - 'label': English label - - 'fr_label': French label if allowed, else fallback to English equivalent - - 'int': the enum integer value - """ - if kind == "label": - return raw_value - if kind == "fr_label": - if use_french: - return raw_value - return "Active" if raw_value == "Actif" else "Inactive" - return raw_value - def _has_installed_language(base_url: str, credential, lcid: int) -> bool: try: token = credential.get_token(f"{base_url}/.default").token @@ -496,39 +483,64 @@ def run_paging_demo(label: str, *, top: Optional[int], page_size: Optional[int]) print(f"Retrieve multiple demos failed: {e}") # 5) Delete record print("Delete (OData):") -# Show deletes to be executed (concurrently via SDK delete) +# Show deletes to be executed (single + bulk) if 'record_ids' in locals() and record_ids: print({"delete_count": len(record_ids)}) -pause("Execute Delete (concurrent SDK calls)") +pause("Execute Delete (single then bulk)") try: if record_ids: - max_workers = min(8, len(record_ids)) - log_call(f"concurrent delete {len(record_ids)} items from '{logical}' (workers={max_workers})") + single_target = record_ids[0] + rest_targets = record_ids[1:] + single_error: Optional[str] = None + bulk_job_id: Optional[str] = None + bulk_error: Optional[str] = None + bulk_wait_job_id: Optional[str] = None + bulk_wait_error: Optional[str] = None + async_targets: list[str] = [] + wait_targets: list[str] = [] + + try: + log_call(f"client.delete('{logical}', '{single_target}')") + backoff_retry(lambda: client.delete(logical, single_target)) + except Exception as ex: + single_error = str(ex) - successes: list[str] = [] - failures: list[dict] = [] + if rest_targets: + half = len(rest_targets) // 2 + async_targets = rest_targets[:half] + wait_targets = rest_targets[half:] - def _del_one(rid: str) -> tuple[str, bool, str | None]: try: - log_call(f"client.delete('{logical}', '{rid}')") - backoff_retry(lambda: client.delete(logical, rid)) - return (rid, True, None) + log_call(f"client.delete('{logical}', <{len(async_targets)} ids>) [fire-and-forget]") + bulk_job_id = client.delete(logical, async_targets) except Exception as ex: - return (rid, False, str(ex)) - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - future_map = {executor.submit(_del_one, rid): rid for rid in record_ids} - for fut in as_completed(future_map): - rid, ok, err = fut.result() - if ok: - successes.append(rid) - else: - failures.append({"id": rid, "error": err}) + bulk_error = str(ex) + try: + log_call(f"client.delete('{logical}', <{len(wait_targets)} ids>, wait=True)") + bulk_wait_job_id = client.delete( + logical, + wait_targets, + wait=True, + ) + except Exception as ex: + bulk_wait_error = str(ex) print({ "entity": logical, - "delete_summary": {"requested": len(record_ids), "success": len(successes), "failures": len(failures)}, - "failed": failures[:5], # preview up to 5 failures + "delete_single": { + "id": single_target, + "error": single_error, + }, + "delete_bulk_fire_and_forget": { + "count": len(async_targets) if rest_targets else 0, + "job_id": bulk_job_id, + "error": bulk_error, + }, + "delete_bulk_wait": { + "count": len(wait_targets) if rest_targets else 0, + "job_id": bulk_wait_job_id, + "error": bulk_wait_error, + }, }) else: raise RuntimeError("No record created; skipping delete.") @@ -577,8 +589,22 @@ def _metadata_after_create(): if isinstance(raw_type, str): attr_type_before = raw_type lowered = raw_type.lower() - log_call(f"client.delete_column('{entity_schema}', '{scratch_column}')") - column_delete = client.delete_columns(entity_schema, scratch_column) + delete_target = attribute_schema or scratch_column + log_call(f"client.delete_column('{entity_schema}', '{delete_target}')") + + def _delete_column(): + return client.delete_columns(entity_schema, delete_target) + + column_delete = backoff_retry( + _delete_column, + delays=(0, 1, 2, 4, 8), + retry_http_statuses=(), + retry_if=lambda exc: ( + isinstance(exc, MetadataError) + or "not found" in str(exc).lower() + or "not yet available" in str(exc).lower() + ), + ) if not isinstance(column_delete, list) or not column_delete: raise RuntimeError("delete_column did not return schema list") deleted_details = column_delete diff --git a/src/dataverse_sdk/client.py b/src/dataverse_sdk/client.py index 4e5be5a..68b1289 100644 --- a/src/dataverse_sdk/client.py +++ b/src/dataverse_sdk/client.py @@ -201,7 +201,14 @@ def update(self, logical_name: str, ids: Union[str, List[str]], changes: Union[D od._update_by_ids(logical_name, ids, changes) return None - def delete(self, logical_name: str, ids: Union[str, List[str]]) -> None: + def delete( + self, + logical_name: str, + ids: Union[str, List[str]], + wait: bool = False, + wait_timeout_seconds: Optional[int] = 300, + wait_poll_interval_seconds: float = 2.0, + ) -> Optional[str]: """ Delete one or more records by GUID. @@ -209,8 +216,17 @@ def delete(self, logical_name: str, ids: Union[str, List[str]]) -> None: :type logical_name: str :param ids: Single GUID string or list of GUID strings to delete. :type ids: str or list[str] - + :param wait: When deleting multiple records, wait for the background job to complete. Ignored for single deletes. + :type wait: bool + :param wait_timeout_seconds: Optional timeout applied when ``wait`` is True. ``None`` or + values ``<= 0`` wait indefinitely. Defaults to 300 seconds. + :type wait_timeout_seconds: int or None + :param wait_poll_interval_seconds: Poll interval used while waiting for job completion. + :type wait_poll_interval_seconds: float :raises TypeError: If ``ids`` is not str or list[str]. + + :return: BulkDelete job ID when deleting multiple records; otherwise ``None``. + :rtype: str or None Example: Delete a single record:: @@ -219,7 +235,7 @@ def delete(self, logical_name: str, ids: Union[str, List[str]]) -> None: Delete multiple records:: - client.delete("account", [id1, id2, id3]) + job_id = client.delete("account", [id1, id2, id3]) """ od = self._get_odata() if isinstance(ids, str): @@ -227,8 +243,13 @@ def delete(self, logical_name: str, ids: Union[str, List[str]]) -> None: return None if not isinstance(ids, list): raise TypeError("ids must be str or list[str]") - od._delete_multiple(logical_name, ids) - return None + return od._delete_multiple( + logical_name, + ids, + wait=wait, + timeout_seconds=wait_timeout_seconds, + poll_interval_seconds=wait_poll_interval_seconds, + ) def get( self, diff --git a/src/dataverse_sdk/odata.py b/src/dataverse_sdk/odata.py index f01bcf7..b92fa96 100644 --- a/src/dataverse_sdk/odata.py +++ b/src/dataverse_sdk/odata.py @@ -1,11 +1,12 @@ from __future__ import annotations -from typing import Any, Dict, Optional, List, Union, Iterable +from typing import Any, Dict, Optional, List, Union, Iterable, Tuple from enum import Enum import unicodedata import time import re import json +from datetime import datetime, timezone import importlib.resources as ir from .http import HttpClient @@ -281,13 +282,186 @@ def _update_by_ids(self, logical_name: str, ids: List[str], changes: Union[Dict[ self._update_multiple(entity_set, logical_name, batch) return None - def _delete_multiple(self, logical_name: str, ids: List[str]) -> None: - """Delete many records by GUID list (simple loop; potential future optimization point).""" + def _delete_multiple( + self, + logical_name: str, + ids: List[str], + wait: bool = False, + timeout_seconds: Optional[int] = 300, + poll_interval_seconds: float = 2.0, + ) -> Optional[str]: + """Delete many records by GUID list. + + Returns the asynchronous job identifier. When ``wait`` is True the call blocks until the + async operation completes or the timeout elapses. + """ if not isinstance(ids, list): raise TypeError("ids must be list[str]") - for rid in ids: - self._delete(logical_name, rid) - return None + if not all(isinstance(rid, str) for rid in ids): + raise TypeError("each id must be a string GUID") + targets = [rid for rid in ids if rid] + if not targets: + return None + value_objects = [{"Value": rid, "Type": "System.Guid"} for rid in targets] + + pk_attr = self._primary_id_attr(logical_name) + timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") + job_label = f"Bulk delete {logical_name} records @ {timestamp}" + + when_utc = timestamp + + query = { + "@odata.type": "Microsoft.Dynamics.CRM.QueryExpression", + "EntityName": logical_name, + "ColumnSet": { + "@odata.type": "Microsoft.Dynamics.CRM.ColumnSet", + "AllColumns": False, + "Columns": [], + }, + "Criteria": { + "@odata.type": "Microsoft.Dynamics.CRM.FilterExpression", + "FilterOperator": "And", + "Conditions": [ + { + "@odata.type": "Microsoft.Dynamics.CRM.ConditionExpression", + "AttributeName": pk_attr, + "Operator": "In", + "Values": value_objects, + } + ], + }, + } + + payload = { + "JobName": job_label, + "SendEmailNotification": False, + "ToRecipients": [], + "CCRecipients": [], + "RecurrencePattern": "", + "StartDateTime": when_utc, + "QuerySet": [query], + } + + url = f"{self.api}/BulkDelete" + response = self._request("post", url, json=payload, expected=(200, 202, 204)) + + job_id = None + try: + body = response.json() if response.text else {} + except ValueError: + body = {} + if isinstance(body, dict): + job_id = body.get("JobId") + + if wait and job_id: + payload, succeeded = self._wait_for_async_job( + job_id, + timeout_seconds=timeout_seconds, + poll_interval=poll_interval_seconds, + ) + if not succeeded: + state = payload.get("statecode") + status = payload.get("statuscode") + message = payload.get("message") + raise RuntimeError( + f"Bulk delete async job '{job_id}' did not succeed (state={state}, status={status})." + + (f" Message: {message}" if message else "") + ) + + return job_id + + def _wait_for_async_job( + self, + job_id: str, + timeout_seconds: Optional[int] = 300, + poll_interval: float = 2.0, + ) -> Tuple[Dict[str, Any], bool]: + """Poll the asyncoperation record until completion or timeout. + + Returns the last payload and a boolean indicating success. + """ + if not job_id: + return {}, False + + interval = poll_interval if poll_interval and poll_interval > 0 else 2.0 + deadline = None + if timeout_seconds is not None and timeout_seconds > 0: + deadline = time.time() + timeout_seconds + + url = f"{self.api}/asyncoperations({job_id})" + params = {"$select": "statecode,statuscode,name,message"} + last_payload: Dict[str, Any] = {} + + while True: + now = time.time() + if deadline and now >= deadline: + message = last_payload.get("message") if last_payload else None + raise TimeoutError( + f"Timed out waiting for async job '{job_id}' to complete." + + (f" Last message: {message}" if message else "") + ) + + try: + response = self._request("get", url, params=params) + try: + payload = response.json() if response.text else {} + except ValueError: + payload = {} + if isinstance(payload, dict): + last_payload = payload + else: + last_payload = {} + except HttpError as err: + if getattr(err, "status_code", None) == 404: + # The job record might not be immediately available yet; retry until timeout. + time.sleep(interval) + continue + raise + + state = last_payload.get("statecode") + status = last_payload.get("statuscode") + finished, succeeded = self._interpret_async_job_state(state, status) + if finished: + return last_payload, succeeded + + time.sleep(interval) + + @staticmethod + def _interpret_async_job_state(state_raw: Any, status_raw: Any) -> Tuple[bool, bool]: + """Return (finished, succeeded) flags for an asyncoperation state/status.""" + + def _norm(val: Any) -> str: + if val is None: + return "" + if isinstance(val, str): + return val.strip().lower() + return str(val).strip().lower() + + state_norm = _norm(state_raw) + status_norm = _norm(status_raw) + + finished = False + succeeded = False + + if state_norm in {"3", "completed", "complete", "succeeded"}: + finished = True + + if status_norm in {"30", "succeeded", "success", "completed"}: + finished = True + succeeded = True + elif status_norm in {"31", "failed", "failure"}: + finished = True + succeeded = False + elif status_norm in {"32", "33", "canceled", "cancelled"}: + finished = True + succeeded = False + elif status_norm.isdigit(): + code = int(status_norm) + if code >= 30: + finished = True + succeeded = (code == 30) + + return finished, succeeded def _format_key(self, key: str) -> str: k = key.strip() From abd25ef2fa37c83fda8274228041213339bd0154 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Thu, 6 Nov 2025 11:01:24 -0800 Subject: [PATCH 2/3] remove wait and add 2 modes for multi record delete --- README.md | 9 +-- examples/quickstart.py | 52 ++++++++-------- src/dataverse_sdk/client.py | 34 +++++----- src/dataverse_sdk/odata.py | 120 +----------------------------------- 4 files changed, 48 insertions(+), 167 deletions(-) diff --git a/README.md b/README.md index 8fdd863..a13fd37 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Auth: | `update` | `update(logical_name, list[id], patch)` | `None` | Broadcast; same patch applied to all IDs (UpdateMultiple). | | `update` | `update(logical_name, list[id], list[patch])` | `None` | 1:1 patches; lengths must match (UpdateMultiple). | | `delete` | `delete(logical_name, id)` | `None` | Delete one record. | -| `delete` | `delete(logical_name, list[id], ..., wait_poll_interval_seconds=2.0)` | `Optional[str]` | Delete many with async BulkDelete. | +| `delete` | `delete(logical_name, list[id], use_bulk_delete=True)` | `Optional[str]` | Delete many with async BulkDelete or sequential single-record delete. | | `query_sql` | `query_sql(sql)` | `list[dict]` | Constrained read-only SELECT via `?sql=`. | | `create_table` | `create_table(tablename, schema, solution_unique_name=None)` | `dict` | Creates custom table + columns. Friendly name (e.g. `SampleItem`) becomes schema `new_SampleItem`; explicit schema name (contains `_`) used as-is. Pass `solution_unique_name` to attach the table to a specific solution instead of the default solution. | | `create_column` | `create_column(tablename, columns)` | `list[str]` | Adds columns using a `{name: type}` mapping (same shape as `create_table` schema). Returns schema names for the created columns. | @@ -56,8 +56,8 @@ Guidelines: - `create` always returns a list of GUIDs (1 for single, N for bulk). - `update` always returns `None`. - Bulk update chooses broadcast vs per-record by the type of `changes` (dict vs list). -- `delete` returns `None` for single-record delete and the BulkDelete async job ID for multi-record delete. -- By default multi-record delete doesn't wait for the async job to complete. User can optionally wait for the job to complete.,klmmm +- `delete` returns `None` for single-record delete and sequential multi-record delete, and the BulkDelete async job ID for multi-record BulkDelete. +- BulkDelete doesn't wait for the delete job to complete. It returns once the async delete job is scheduled. - Paging and SQL operations never mutate inputs. - Metadata lookups for logical name stamping cached per entity set (in-memory). @@ -339,7 +339,8 @@ client.delete_table("SampleItem") # delete table (friendly name or explici Notes: - `create` always returns a list of GUIDs (length 1 for single input). -- `update` returns `None`. `delete` returns `None` for single-record delete and the BulkDelete async job ID for multi-record delete. +- `update` returns `None`. +- `delete` returns `None` for single-record delete/sequential multi-record delete, and the BulkDelete async job ID for BulkDelete. - Passing a list of payloads to `create` triggers bulk create and returns `list[str]` of IDs. - `get` supports single record retrieval with record id or paging through result sets (prefer `select` to limit columns). - For CRUD methods that take a record id, pass the GUID string (36-char hyphenated). Parentheses around the GUID are accepted but not required. diff --git a/examples/quickstart.py b/examples/quickstart.py index 4f72568..a06a533 100644 --- a/examples/quickstart.py +++ b/examples/quickstart.py @@ -494,10 +494,6 @@ def run_paging_demo(label: str, *, top: Optional[int], page_size: Optional[int]) single_error: Optional[str] = None bulk_job_id: Optional[str] = None bulk_error: Optional[str] = None - bulk_wait_job_id: Optional[str] = None - bulk_wait_error: Optional[str] = None - async_targets: list[str] = [] - wait_targets: list[str] = [] try: log_call(f"client.delete('{logical}', '{single_target}')") @@ -505,25 +501,26 @@ def run_paging_demo(label: str, *, top: Optional[int], page_size: Optional[int]) except Exception as ex: single_error = str(ex) - if rest_targets: - half = len(rest_targets) // 2 - async_targets = rest_targets[:half] - wait_targets = rest_targets[half:] + half = max(1, len(rest_targets) // 2) + bulk_targets = rest_targets[:half] + sequential_targets = rest_targets[half:] + bulk_error = None + sequential_error = None - try: - log_call(f"client.delete('{logical}', <{len(async_targets)} ids>) [fire-and-forget]") - bulk_job_id = client.delete(logical, async_targets) - except Exception as ex: - bulk_error = str(ex) - try: - log_call(f"client.delete('{logical}', <{len(wait_targets)} ids>, wait=True)") - bulk_wait_job_id = client.delete( - logical, - wait_targets, - wait=True, - ) - except Exception as ex: - bulk_wait_error = str(ex) + # Fire-and-forget bulk delete for the first portion + try: + log_call(f"client.delete('{logical}', <{len(bulk_targets)} ids>, use_bulk_delete=True)") + bulk_job_id = client.delete(logical, bulk_targets) + except Exception as ex: + bulk_error = str(ex) + + # Sequential deletes for the remainder + try: + log_call(f"client.delete('{logical}', <{len(sequential_targets)} ids>, use_bulk_delete=False)") + for rid in sequential_targets: + backoff_retry(lambda rid=rid: client.delete(logical, rid, use_bulk_delete=False)) + except Exception as ex: + sequential_error = str(ex) print({ "entity": logical, @@ -531,15 +528,14 @@ def run_paging_demo(label: str, *, top: Optional[int], page_size: Optional[int]) "id": single_target, "error": single_error, }, - "delete_bulk_fire_and_forget": { - "count": len(async_targets) if rest_targets else 0, + "delete_bulk": { + "count": len(bulk_targets), "job_id": bulk_job_id, "error": bulk_error, }, - "delete_bulk_wait": { - "count": len(wait_targets) if rest_targets else 0, - "job_id": bulk_wait_job_id, - "error": bulk_wait_error, + "delete_sequential": { + "count": len(sequential_targets), + "error": sequential_error, }, }) else: diff --git a/src/dataverse_sdk/client.py b/src/dataverse_sdk/client.py index 68b1289..d6c61ce 100644 --- a/src/dataverse_sdk/client.py +++ b/src/dataverse_sdk/client.py @@ -205,9 +205,7 @@ def delete( self, logical_name: str, ids: Union[str, List[str]], - wait: bool = False, - wait_timeout_seconds: Optional[int] = 300, - wait_poll_interval_seconds: float = 2.0, + use_bulk_delete: bool = True, ) -> Optional[str]: """ Delete one or more records by GUID. @@ -216,16 +214,14 @@ def delete( :type logical_name: str :param ids: Single GUID string or list of GUID strings to delete. :type ids: str or list[str] - :param wait: When deleting multiple records, wait for the background job to complete. Ignored for single deletes. - :type wait: bool - :param wait_timeout_seconds: Optional timeout applied when ``wait`` is True. ``None`` or - values ``<= 0`` wait indefinitely. Defaults to 300 seconds. - :type wait_timeout_seconds: int or None - :param wait_poll_interval_seconds: Poll interval used while waiting for job completion. - :type wait_poll_interval_seconds: float + :param use_bulk_delete: When ``True`` (default) and ``ids`` is a list, execute the BulkDelete action and + return its async job identifier. When ``False`` each record is deleted sequentially. + :type use_bulk_delete: bool + :raises TypeError: If ``ids`` is not str or list[str]. + :raises HttpError: If the underlying Web API delete request fails. - :return: BulkDelete job ID when deleting multiple records; otherwise ``None``. + :return: BulkDelete job ID when deleting multiple records via BulkDelete; otherwise ``None``. :rtype: str or None Example: @@ -243,13 +239,15 @@ def delete( return None if not isinstance(ids, list): raise TypeError("ids must be str or list[str]") - return od._delete_multiple( - logical_name, - ids, - wait=wait, - timeout_seconds=wait_timeout_seconds, - poll_interval_seconds=wait_poll_interval_seconds, - ) + if not ids: + return None + if not all(isinstance(rid, str) for rid in ids): + raise TypeError("ids must contain string GUIDs") + if use_bulk_delete: + return od._delete_multiple(logical_name, ids) + for rid in ids: + od._delete(logical_name, rid) + return None def get( self, diff --git a/src/dataverse_sdk/odata.py b/src/dataverse_sdk/odata.py index b92fa96..688cd9f 100644 --- a/src/dataverse_sdk/odata.py +++ b/src/dataverse_sdk/odata.py @@ -1,6 +1,6 @@ from __future__ import annotations -from typing import Any, Dict, Optional, List, Union, Iterable, Tuple +from typing import Any, Dict, Optional, List, Union, Iterable from enum import Enum import unicodedata import time @@ -286,14 +286,10 @@ def _delete_multiple( self, logical_name: str, ids: List[str], - wait: bool = False, - timeout_seconds: Optional[int] = 300, - poll_interval_seconds: float = 2.0, ) -> Optional[str]: """Delete many records by GUID list. - Returns the asynchronous job identifier. When ``wait`` is True the call blocks until the - async operation completes or the timeout elapses. + Returns the asynchronous job identifier reported by the BulkDelete action. """ if not isinstance(ids, list): raise TypeError("ids must be list[str]") @@ -308,8 +304,6 @@ def _delete_multiple( timestamp = datetime.now(timezone.utc).isoformat(timespec="seconds").replace("+00:00", "Z") job_label = f"Bulk delete {logical_name} records @ {timestamp}" - when_utc = timestamp - query = { "@odata.type": "Microsoft.Dynamics.CRM.QueryExpression", "EntityName": logical_name, @@ -338,7 +332,7 @@ def _delete_multiple( "ToRecipients": [], "CCRecipients": [], "RecurrencePattern": "", - "StartDateTime": when_utc, + "StartDateTime": timestamp, "QuerySet": [query], } @@ -353,116 +347,8 @@ def _delete_multiple( if isinstance(body, dict): job_id = body.get("JobId") - if wait and job_id: - payload, succeeded = self._wait_for_async_job( - job_id, - timeout_seconds=timeout_seconds, - poll_interval=poll_interval_seconds, - ) - if not succeeded: - state = payload.get("statecode") - status = payload.get("statuscode") - message = payload.get("message") - raise RuntimeError( - f"Bulk delete async job '{job_id}' did not succeed (state={state}, status={status})." - + (f" Message: {message}" if message else "") - ) - return job_id - def _wait_for_async_job( - self, - job_id: str, - timeout_seconds: Optional[int] = 300, - poll_interval: float = 2.0, - ) -> Tuple[Dict[str, Any], bool]: - """Poll the asyncoperation record until completion or timeout. - - Returns the last payload and a boolean indicating success. - """ - if not job_id: - return {}, False - - interval = poll_interval if poll_interval and poll_interval > 0 else 2.0 - deadline = None - if timeout_seconds is not None and timeout_seconds > 0: - deadline = time.time() + timeout_seconds - - url = f"{self.api}/asyncoperations({job_id})" - params = {"$select": "statecode,statuscode,name,message"} - last_payload: Dict[str, Any] = {} - - while True: - now = time.time() - if deadline and now >= deadline: - message = last_payload.get("message") if last_payload else None - raise TimeoutError( - f"Timed out waiting for async job '{job_id}' to complete." - + (f" Last message: {message}" if message else "") - ) - - try: - response = self._request("get", url, params=params) - try: - payload = response.json() if response.text else {} - except ValueError: - payload = {} - if isinstance(payload, dict): - last_payload = payload - else: - last_payload = {} - except HttpError as err: - if getattr(err, "status_code", None) == 404: - # The job record might not be immediately available yet; retry until timeout. - time.sleep(interval) - continue - raise - - state = last_payload.get("statecode") - status = last_payload.get("statuscode") - finished, succeeded = self._interpret_async_job_state(state, status) - if finished: - return last_payload, succeeded - - time.sleep(interval) - - @staticmethod - def _interpret_async_job_state(state_raw: Any, status_raw: Any) -> Tuple[bool, bool]: - """Return (finished, succeeded) flags for an asyncoperation state/status.""" - - def _norm(val: Any) -> str: - if val is None: - return "" - if isinstance(val, str): - return val.strip().lower() - return str(val).strip().lower() - - state_norm = _norm(state_raw) - status_norm = _norm(status_raw) - - finished = False - succeeded = False - - if state_norm in {"3", "completed", "complete", "succeeded"}: - finished = True - - if status_norm in {"30", "succeeded", "success", "completed"}: - finished = True - succeeded = True - elif status_norm in {"31", "failed", "failure"}: - finished = True - succeeded = False - elif status_norm in {"32", "33", "canceled", "cancelled"}: - finished = True - succeeded = False - elif status_norm.isdigit(): - code = int(status_norm) - if code >= 30: - finished = True - succeeded = (code == 30) - - return finished, succeeded - def _format_key(self, key: str) -> str: k = key.strip() if k.startswith("(") and k.endswith(")"): From ebd5c000515a501c802611c2308fa420fa5b1d74 Mon Sep 17 00:00:00 2001 From: Max Wang Date: Thu, 6 Nov 2025 11:14:51 -0800 Subject: [PATCH 3/3] remove dup input validation --- src/dataverse_sdk/odata.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/dataverse_sdk/odata.py b/src/dataverse_sdk/odata.py index 688cd9f..06c4e56 100644 --- a/src/dataverse_sdk/odata.py +++ b/src/dataverse_sdk/odata.py @@ -291,10 +291,6 @@ def _delete_multiple( Returns the asynchronous job identifier reported by the BulkDelete action. """ - if not isinstance(ids, list): - raise TypeError("ids must be list[str]") - if not all(isinstance(rid, str) for rid in ids): - raise TypeError("each id must be a string GUID") targets = [rid for rid in ids if rid] if not targets: return None