Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 68 additions & 27 deletions src/PowerPlatform/Dataverse/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from .core._auth import _AuthManager
from .core.config import DataverseConfig
from .core.results import FluentResult, RequestMetadata
from .data._odata import _ODataClient


Expand Down Expand Up @@ -108,7 +109,9 @@ def _scoped_odata(self) -> Iterator[_ODataClient]:
yield od

# ---------------- Unified CRUD: create/update/delete ----------------
def create(self, table_schema_name: str, records: Union[Dict[str, Any], List[Dict[str, Any]]]) -> List[str]:
def create(
self, table_schema_name: str, records: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> FluentResult[List[str]]:
"""
Create one or more records by table name.

Expand All @@ -118,8 +121,9 @@ def create(self, table_schema_name: str, records: Union[Dict[str, Any], List[Dic
Each dictionary should contain column schema names as keys.
:type records: :class:`dict` or :class:`list` of :class:`dict`

:return: List of created record GUIDs. Returns a single-element list for a single input.
:rtype: :class:`list` of :class:`str`
:return: FluentResult wrapping list of created record GUIDs. Acts like a list
by default. Call ``.with_detail_response()`` for telemetry.
:rtype: :class:`~PowerPlatform.Dataverse.core.results.FluentResult` of :class:`list` of :class:`str`

:raises TypeError: If ``records`` is not a dict or list[dict], or if the internal
client returns an unexpected type.
Expand All @@ -139,25 +143,31 @@ def create(self, table_schema_name: str, records: Union[Dict[str, Any], List[Dic
]
ids = client.create("account", records)
print(f"Created {len(ids)} accounts")

Access telemetry with ``.with_detail_response()``::

response = client.create("account", {"name": "Test"}).with_detail_response()
print(f"Timing: {response.telemetry['timing_ms']}ms")
"""
with self._scoped_odata() as od:
entity_set = od._entity_set_from_schema_name(table_schema_name)
if isinstance(records, dict):
rid = od._create(entity_set, table_schema_name, records)
# _create returns str on single input
rid, metadata = od._create_with_metadata(entity_set, table_schema_name, records)
if not isinstance(rid, str):
raise TypeError("_create (single) did not return GUID string")
return [rid]
return FluentResult([rid], metadata, batch_info={"total": 1, "success": 1, "failures": 0})
if isinstance(records, list):
ids = od._create_multiple(entity_set, table_schema_name, records)
ids, metadata, batch_info = od._create_multiple_with_metadata(
entity_set, table_schema_name, records
)
if not isinstance(ids, list) or not all(isinstance(x, str) for x in ids):
raise TypeError("_create (multi) did not return list[str]")
return ids
return FluentResult(ids, metadata, batch_info=batch_info)
raise TypeError("records must be dict or list[dict]")

def update(
self, table_schema_name: str, ids: Union[str, List[str]], changes: Union[Dict[str, Any], List[Dict[str, Any]]]
) -> None:
) -> FluentResult[None]:
"""
Update one or more records.

Expand All @@ -177,6 +187,9 @@ def update(
have equal length for one-to-one mapping.
:type changes: :class:`dict` or :class:`list` of :class:`dict`

:return: FluentResult wrapping None. Call ``.with_detail_response()`` for telemetry.
:rtype: :class:`~PowerPlatform.Dataverse.core.results.FluentResult` of ``None``

:raises TypeError: If ``ids`` is not str or list[str], or if ``changes`` type doesn't match usage pattern.

.. note::
Expand All @@ -199,24 +212,34 @@ def update(
{"name": "Updated Name 2"}
]
client.update("account", ids, changes)

Access telemetry with ``.with_detail_response()``::

response = client.update("account", id, {"name": "Test"}).with_detail_response()
print(f"Timing: {response.telemetry['timing_ms']}ms")
"""
with self._scoped_odata() as od:
if isinstance(ids, str):
if not isinstance(changes, dict):
raise TypeError("For single id, changes must be a dict")
od._update(table_schema_name, ids, changes) # discard representation
return None
_, metadata = od._update_with_metadata(table_schema_name, ids, changes)
return FluentResult(None, metadata)
if not isinstance(ids, list):
raise TypeError("ids must be str or list[str]")
# For bulk updates, we still use the original method as _update_by_ids doesn't have a _with_metadata variant yet
# TODO: Add _update_by_ids_with_metadata for bulk update telemetry
od._update_by_ids(table_schema_name, ids, changes)
return None
# Create placeholder metadata for bulk updates
placeholder_metadata = RequestMetadata()
num_updates = len(ids)
return FluentResult(None, placeholder_metadata, batch_info={"total": num_updates, "success": num_updates, "failures": 0})

def delete(
self,
table_schema_name: str,
ids: Union[str, List[str]],
use_bulk_delete: bool = True,
) -> Optional[str]:
) -> FluentResult[Optional[str]]:
"""
Delete one or more records by GUID.

Expand All @@ -228,12 +251,13 @@ def delete(
return its async job identifier. When ``False`` each record is deleted sequentially.
:type use_bulk_delete: :class:`bool`

:return: FluentResult wrapping BulkDelete job ID (for bulk) or None (for single).
Call ``.with_detail_response()`` for telemetry.
:rtype: :class:`~PowerPlatform.Dataverse.core.results.FluentResult` of :class:`str` or ``None``

:raises TypeError: If ``ids`` is not str or list[str].
:raises HttpError: If the underlying Web API delete request fails.

:return: BulkDelete job ID when deleting multiple records via BulkDelete; otherwise ``None``.
:rtype: :class:`str` or None

Example:
Delete a single record::

Expand All @@ -242,22 +266,31 @@ def delete(
Delete multiple records::

job_id = client.delete("account", [id1, id2, id3])

Access telemetry with ``.with_detail_response()``::

response = client.delete("account", account_id).with_detail_response()
print(f"Timing: {response.telemetry['timing_ms']}ms")
"""
with self._scoped_odata() as od:
if isinstance(ids, str):
od._delete(table_schema_name, ids)
return None
_, metadata = od._delete_with_metadata(table_schema_name, ids)
return FluentResult(None, metadata)
if not isinstance(ids, list):
raise TypeError("ids must be str or list[str]")
if not ids:
return None
return FluentResult(None, RequestMetadata())
if not all(isinstance(rid, str) for rid in ids):
raise TypeError("ids must contain string GUIDs")
if use_bulk_delete:
return od._delete_multiple(table_schema_name, ids)
# TODO: Add _delete_multiple_with_metadata for bulk delete telemetry
job_id = od._delete_multiple(table_schema_name, ids)
return FluentResult(job_id, RequestMetadata(), batch_info={"total": len(ids)})
# Sequential deletes
last_metadata = RequestMetadata()
for rid in ids:
od._delete(table_schema_name, rid)
return None
_, last_metadata = od._delete_with_metadata(table_schema_name, rid)
return FluentResult(None, last_metadata, batch_info={"total": len(ids), "success": len(ids), "failures": 0})

def get(
self,
Expand All @@ -269,11 +302,11 @@ def get(
top: Optional[int] = None,
expand: Optional[List[str]] = None,
page_size: Optional[int] = None,
) -> Union[Dict[str, Any], Iterable[List[Dict[str, Any]]]]:
) -> Union[FluentResult[Dict[str, Any]], Iterable[List[Dict[str, Any]]]]:
"""
Fetch a single record by ID or query multiple records.

When ``record_id`` is provided, returns a single record dictionary.
When ``record_id`` is provided, returns a FluentResult wrapping the record dictionary.
When ``record_id`` is None, returns a generator yielding batches of records.

:param table_schema_name: Schema name of the table (e.g. ``"account"`` or ``"new_MyTestTable"``).
Expand All @@ -293,9 +326,11 @@ def get(
:param page_size: Optional number of records per page for pagination.
:type page_size: :class:`int` or None

:return: Single record dict if ``record_id`` is provided, otherwise a generator
:return: FluentResult wrapping single record dict if ``record_id`` is provided
(call ``.with_detail_response()`` for telemetry), otherwise a generator
yielding lists of record dictionaries (one list per page).
:rtype: :class:`dict` or :class:`collections.abc.Iterable` of :class:`list` of :class:`dict`
:rtype: :class:`~PowerPlatform.Dataverse.core.results.FluentResult` of :class:`dict`
or :class:`collections.abc.Iterable` of :class:`list` of :class:`dict`

:raises TypeError: If ``record_id`` is provided but not a string.

Expand All @@ -305,6 +340,11 @@ def get(
record = client.get("account", record_id=account_id, select=["name", "telephone1"])
print(record["name"])

Access telemetry for single record fetch::

response = client.get("account", record_id=account_id).with_detail_response()
print(f"Timing: {response.telemetry['timing_ms']}ms")

Query multiple records with filtering (note: exact logical names in filter)::

for batch in client.get(
Expand Down Expand Up @@ -340,11 +380,12 @@ def get(
if not isinstance(record_id, str):
raise TypeError("record_id must be str")
with self._scoped_odata() as od:
return od._get(
record, metadata = od._get_with_metadata(
table_schema_name,
record_id,
select=select,
)
return FluentResult(record, metadata)

def _paged() -> Iterable[List[Dict[str, Any]]]:
with self._scoped_odata() as od:
Expand Down
28 changes: 27 additions & 1 deletion src/PowerPlatform/Dataverse/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,30 @@
configuration, HTTP client, and error handling.
"""

__all__ = []
from .results import (
# New fluent API types
RequestMetadata,
DataverseResponse,
FluentResult,
# Legacy types (backward compatible)
OperationResult,
CreateResult,
UpdateResult,
DeleteResult,
GetResult,
PagedResult,
)

__all__ = [
# New fluent API types
"RequestMetadata",
"DataverseResponse",
"FluentResult",
# Legacy types (backward compatible)
"OperationResult",
"CreateResult",
"UpdateResult",
"DeleteResult",
"GetResult",
"PagedResult",
]
64 changes: 63 additions & 1 deletion src/PowerPlatform/Dataverse/core/_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,26 @@
from __future__ import annotations

import time
from typing import Any, Optional
from dataclasses import dataclass
from typing import Any, Optional, Tuple

import requests


@dataclass
class _HttpTiming:
"""Timing information for an HTTP request.

:param elapsed_ms: Total request duration in milliseconds.
:type elapsed_ms: :class:`float`
:param attempts: Number of attempts made (1 = no retries).
:type attempts: :class:`int`
"""

elapsed_ms: float
attempts: int = 1


class _HttpClient:
"""
HTTP client with configurable retry logic and timeout handling.
Expand Down Expand Up @@ -77,3 +92,50 @@ def _request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
delay = self.base_delay * (2**attempt)
time.sleep(delay)
continue

def _request_with_timing(
self, method: str, url: str, **kwargs: Any
) -> Tuple[requests.Response, _HttpTiming]:
"""
Execute an HTTP request and return response with timing information.

Same behavior as :meth:`_request` but additionally returns timing data
for telemetry purposes.

:param method: HTTP method (GET, POST, PUT, DELETE, etc.).
:type method: :class:`str`
:param url: Target URL for the request.
:type url: :class:`str`
:param kwargs: Additional arguments passed to ``requests.request()``.
:return: Tuple of (HTTP response, timing information).
:rtype: :class:`tuple` of (:class:`requests.Response`, :class:`_HttpTiming`)
:raises requests.exceptions.RequestException: If all retry attempts fail.
"""
# If no timeout is provided, use the user-specified default timeout if set;
# otherwise, apply per-method defaults (120s for POST/DELETE, 10s for others).
if "timeout" not in kwargs:
if self.default_timeout is not None:
kwargs["timeout"] = self.default_timeout
else:
m = (method or "").lower()
kwargs["timeout"] = 120 if m in ("post", "delete") else 10

start_time = time.time()
attempts = 0

# Small backoff retry on network errors only
for attempt in range(self.max_attempts):
attempts = attempt + 1
try:
response = requests.request(method, url, **kwargs)
elapsed_ms = (time.time() - start_time) * 1000
return response, _HttpTiming(elapsed_ms=elapsed_ms, attempts=attempts)
except requests.exceptions.RequestException:
if attempt == self.max_attempts - 1:
raise
delay = self.base_delay * (2**attempt)
time.sleep(delay)
continue

# This should not be reached, but include for type safety
raise RuntimeError("Unexpected state in _request_with_timing")
Loading
Loading