diff --git a/README.md b/README.md
index e3bbfe4..bbdc2f2 100644
--- a/README.md
+++ b/README.md
@@ -41,12 +41,12 @@ Auth:
 | `delete` | `delete(logical_name, id)` | `None` | Delete one record. |
 | `delete` | `delete(logical_name, list[id], use_bulk_delete=True)` | `Optional[str]` | Delete many with async BulkDelete or sequential single-record delete. |
 | `query_sql` | `query_sql(sql)` | `list[dict]` | Constrained read-only SELECT via `?sql=`. |
-| `create_table` | `create_table(tablename, schema, solution_unique_name=None)` | `dict` | Creates custom table + columns. Friendly name (e.g. `SampleItem`) becomes schema `new_SampleItem`; explicit schema name (contains `_`) used as-is. Pass `solution_unique_name` to attach the table to a specific solution instead of the default solution. |
-| `create_column` | `create_column(tablename, columns)` | `list[str]` | Adds columns using a `{name: type}` mapping (same shape as `create_table` schema). Returns schema names for the created columns. |
-| `get_table_info` | `get_table_info(schema_name)` | `dict | None` | Basic table metadata by schema name (e.g. `new_SampleItem`). Friendly names not auto-converted. |
-| `list_tables` | `list_tables()` | `list[dict]` | Lists non-private tables. |
-| `delete_table` | `delete_table(tablename)` | `None` | Drops custom table. Accepts friendly or schema name; friendly converted to `new_`. |
-| `delete_column` | `delete_column(tablename, columns)` | `list[str]` | Deletes one or more columns; returns schema names (accepts string or list[str]). |
+| `create_table` | `create_table(logical_name, schema, solution_unique_name=None)` | `dict` | Creates custom table + columns. Requires a logical name with publisher prefix (e.g. `new_sampleitem`) and column names with the same prefix (e.g. `{"new_code": "string"}`). Pass `solution_unique_name` to attach the table to a specific solution instead of the default solution. |
+| `create_columns` | `create_columns(logical_name, columns)` | `list[str]` | Adds columns using a `{name: type}` mapping with publisher prefix (e.g. `{"new_category": "string"}`). Returns logical names for the created columns. |
+| `get_table_info` | `get_table_info(logical_name)` | `dict \| None` | Basic table metadata by logical name (e.g. `new_sampleitem`). Lookup is case-insensitive. |
+| `list_tables` | `list_tables()` | `list[str]` | Lists custom table logical names. |
+| `delete_table` | `delete_table(logical_name)` | `None` | Drops custom table by logical name with publisher prefix (e.g. `new_sampleitem`). |
+| `delete_columns` | `delete_columns(logical_name, columns)` | `list[str]` | Deletes one or more columns; returns logical names (accepts string or list[str]). |
 | `PandasODataClient.create_df` | `create_df(logical_name, series)` | `str` | Create one record (returns GUID). |
 | `PandasODataClient.update` | `update(logical_name, id, series)` | `None` | Returns None; ignored if Series empty. |
 | `PandasODataClient.get_ids` | `get_ids(logical_name, ids, select=None)` | `DataFrame` | One row per ID (errors inline).
| @@ -301,21 +301,21 @@ class Status(IntEnum): # Create a simple custom table and a few columns info = client.create_table( - "SampleItem", # friendly name; defaults to SchemaName new_SampleItem + "new_sampleitem", # logical name with publisher prefix (e.g., "new_") { - "code": "string", - "count": "int", - "amount": "decimal", - "when": "datetime", - "active": "bool", - "status": Status, + "new_code": "string", + "new_count": "int", + "new_amount": "decimal", + "new_when": "datetime", + "new_active": "bool", + "new_status": Status, }, solution_unique_name="my_solution_unique_name", # optional: associate table with this solution ) # Create or delete columns -client.create_column("SampleItem", {"category": "string"}) # returns ["new_Category"] -client.delete_column("SampleItem", "category") # returns ["new_Category"] +client.create_columns("new_sampleitem", {"new_category": "string"}) # returns ["new_category"] +client.delete_columns("new_sampleitem", "new_category") # returns ["new_category"] logical = info["entity_logical_name"] # e.g., "new_sampleitem" @@ -329,7 +329,7 @@ rec_id = client.create(logical, {name_attr: "Sample A"})[0] # Clean up client.delete(logical, rec_id) # delete record -client.delete_table("SampleItem") # delete table (friendly name or explicit schema new_SampleItem) +client.delete_table("new_sampleitem") # delete table by logical name ``` Notes: diff --git a/examples/advanced/file_upload.py b/examples/advanced/file_upload.py index 67f3a39..6623707 100644 --- a/examples/advanced/file_upload.py +++ b/examples/advanced/file_upload.py @@ -152,18 +152,18 @@ def backoff(op, *, delays=(0,2,5,10), retry_status=(400,403,404,409,412,429,500, raise last # --------------------------- Table ensure --------------------------- -TABLE_SCHEMA_NAME = "new_FileSample" +TABLE_LOGICAL_NAME = "new_filesample" # If user wants new publisher prefix / naming, adjust above. 
def ensure_table(): - # Check by schema - existing = client.get_table_info(TABLE_SCHEMA_NAME) + # Check by logical name + existing = client.get_table_info(TABLE_LOGICAL_NAME) if existing: - print({"table": TABLE_SCHEMA_NAME, "existed": True}) + print({"table": TABLE_LOGICAL_NAME, "existed": True}) return existing - log("client.create_table('new_FileSample', schema={title})") - info = client.create_table(TABLE_SCHEMA_NAME, {"title": "string"}) - print({"table": TABLE_SCHEMA_NAME, "existed": False, "metadata_id": info.get('metadata_id')}) + log(f"client.create_table('{TABLE_LOGICAL_NAME}', schema={{'new_title': 'string'}})") + info = client.create_table(TABLE_LOGICAL_NAME, {"new_title": "string"}) + print({"table": TABLE_LOGICAL_NAME, "existed": False, "metadata_id": info.get('metadata_id')}) return info try: @@ -388,8 +388,8 @@ def get_dataset_info(file_path: Path): # --------------------------- Cleanup --------------------------- if cleanup_record and record_id: try: - log(f"client.delete('{entity_set}', '{record_id}')") - backoff(lambda: client.delete(entity_set, record_id)) + log(f"client.delete('{logical}', '{record_id}')") + backoff(lambda: client.delete(logical, record_id)) print({"record_deleted": True}) except Exception as e: # noqa: BLE001 print({"record_deleted": False, "error": str(e)}) @@ -398,8 +398,8 @@ def get_dataset_info(file_path: Path): if cleanup_table: try: - log(f"client.delete_table('{TABLE_SCHEMA_NAME}')") - client.delete_table(TABLE_SCHEMA_NAME) + log(f"client.delete_table('{TABLE_LOGICAL_NAME}')") + client.delete_table(TABLE_LOGICAL_NAME) print({"table_deleted": True}) except Exception as e: # noqa: BLE001 print({"table_deleted": False, "error": str(e)}) diff --git a/examples/advanced/pandas_integration.py b/examples/advanced/pandas_integration.py index a117ed9..d8a5884 100644 --- a/examples/advanced/pandas_integration.py +++ b/examples/advanced/pandas_integration.py @@ -54,8 +54,8 @@ def backoff_retry(op, *, delays=(0, 2, 5, 10, 20), retry_http_statuses=(400, 403 table_info = None created_this_run = False -# First check for existing table -existing = client.get_table_info("SampleItem") +# First check for existing table (use logical name - lowercase with prefix) +existing = client.get_table_info("new_sampleitem") if existing: table_info = existing created_this_run = False @@ -68,16 +68,16 @@ def backoff_retry(op, *, delays=(0, 2, 5, 10, 20), retry_http_statuses=(400, 403 }) else: - # Create it since it doesn't exist + # Create it since it doesn't exist (use logical name) try: table_info = client.create_table( - "SampleItem", + "new_sampleitem", { - "code": "string", - "count": "int", - "amount": "decimal", - "when": "datetime", - "active": "bool", + "new_code": "string", + "new_count": "int", + "new_amount": "decimal", + "new_when": "datetime", + "new_active": "bool", }, ) created_this_run = True if table_info and table_info.get("columns_created") else False @@ -217,10 +217,10 @@ def _retry_if(ex: Exception) -> bool: # 6) Cleanup: delete the custom table if it exists print("Cleanup (Metadata):") try: - # Delete if present, regardless of whether it was created in this run - info = client.get_table_info("SampleItem") + # Delete if present, regardless of whether it was created in this run (use logical name) + info = client.get_table_info("new_sampleitem") if info: - client.delete_table("SampleItem") + client.delete_table("new_sampleitem") print({"table_deleted": True}) else: print({"table_deleted": False, "reason": "not found"}) diff --git 
a/examples/basic/quickstart.py b/examples/basic/quickstart.py
index f4da0f9..885b478 100644
--- a/examples/basic/quickstart.py
+++ b/examples/basic/quickstart.py
@@ -95,9 +95,11 @@ class Status(IntEnum):
 # Check for existing table using list_tables
 log_call("client.list_tables()")
 tables = client.list_tables()
-existing_table = next((t for t in tables if t.get("SchemaName") == "new_SampleItem"), None)
+
+# list_tables() returns logical names, which are always lowercase (the SDK normalizes)
+existing_table = next((t for t in tables if t == "new_sampleitem"), None)
 if existing_table:
-    table_info = client.get_table_info("new_SampleItem")
+    table_info = client.get_table_info("new_sampleitem")
     created_this_run = False
     print({
         "table": table_info.get("entity_schema"),
@@ -106,20 +108,19 @@ class Status(IntEnum):
         "logical": table_info.get("entity_logical_name"),
         "metadata_id": table_info.get("metadata_id"),
     })
-
 else:
     # Create it since it doesn't exist
     try:
-        log_call("client.create_table('new_SampleItem', schema={code,count,amount,when,active,status})")
+        log_call("client.create_table('new_sampleitem', schema={new_code,new_count,new_amount,new_when,new_active,new_status})")
         table_info = client.create_table(
-            "new_SampleItem",
+            "new_sampleitem",
             {
-                "code": "string",
-                "count": "int",
-                "amount": "decimal",
-                "when": "datetime",
-                "active": "bool",
-                "status": Status,
+                "new_code": "string",
+                "new_count": "int",
+                "new_amount": "decimal",
+                "new_when": "datetime",
+                "new_active": "bool",
+                "new_status": Status,
             },
         )
         created_this_run = True if table_info and table_info.get("columns_created") else False
@@ -146,11 +147,11 @@ class Status(IntEnum):
         pass
     # Fail fast: all operations must use the custom table
     sys.exit(1)
-entity_schema = table_info.get("entity_schema") or "new_SampleItem"
-logical = table_info.get("entity_logical_name")
+entity_schema = table_info.get("entity_schema") or "new_Sampleitem"
+logical = table_info.get("entity_logical_name") or "new_sampleitem"
 metadata_id = table_info.get("metadata_id")
 if not metadata_id:
-    refreshed_info = client.get_table_info(entity_schema) or {}
+    refreshed_info = client.get_table_info(logical) or {}
     metadata_id = refreshed_info.get("metadata_id")
     if metadata_id:
         table_info["metadata_id"] = metadata_id
@@ -550,27 +551,28 @@ def run_paging_demo(label: str, *, top: Optional[int], page_size: Optional[int])
 
 # 6) Column metadata helpers: column create/delete
 print("Column metadata helpers (create/delete column):")
-scratch_column = f"scratch_{int(time.time())}"
+scratch_column = f"new_scratch_{int(time.time())}"
 column_payload = {scratch_column: "string"}
 try:
-    log_call(f"client.create_column('{entity_schema}', {repr(column_payload)})")
-    column_create = client.create_columns(entity_schema, column_payload)
+    log_call(f"client.create_columns('{logical}', {repr(column_payload)})")
+    column_create = client.create_columns(logical, column_payload)
     if not isinstance(column_create, list) or not column_create:
-        raise RuntimeError("create_column did not return schema list")
+        raise RuntimeError("create_columns did not return a list of logical names")
     created_details = column_create
     if not all(isinstance(item, str) for item in created_details):
-        raise RuntimeError("create_column entries were not schema strings")
-    attribute_schema = created_details[0]
+        raise RuntimeError("create_columns entries were not logical name strings")
+    # create_columns returns logical names
     odata_client = client._get_odata()
+    column_logical = created_details[0]
     exists_after_create = None
     exists_after_delete = None
     attr_type_before = None
-    if metadata_id and attribute_schema:
+    if metadata_id and column_logical:
         _ready_message = "Column metadata not yet available"
        def _metadata_after_create():
             meta = odata_client._get_attribute_metadata(
                 metadata_id,
-                attribute_schema,
+                column_logical,
                 extra_select="@odata.type,AttributeType",
             )
             if not meta or not meta.get("MetadataId"):
@@ -588,11 +590,12 @@ def _metadata_after_create():
         if isinstance(raw_type, str):
             attr_type_before = raw_type
             lowered = raw_type.lower()
-    delete_target = attribute_schema or scratch_column
-    log_call(f"client.delete_column('{entity_schema}', '{delete_target}')")
+    # For delete, we pass the logical name
+    delete_target = column_logical
+    log_call(f"client.delete_columns('{logical}', '{delete_target}')")
 
     def _delete_column():
-        return client.delete_columns(entity_schema, delete_target)
+        return client.delete_columns(logical, delete_target)
 
     column_delete = backoff_retry(
         _delete_column,
@@ -609,12 +612,14 @@ def _delete_column():
     deleted_details = column_delete
     if not all(isinstance(item, str) for item in deleted_details):
-        raise RuntimeError("delete_column entries were not schema strings")
-    if attribute_schema not in deleted_details:
-        raise RuntimeError("delete_column response missing expected schema name")
+        raise RuntimeError("delete_columns entries were not logical name strings")
+    # deleted_details contains logical names (lowercase), so check for column_logical
+    if column_logical not in deleted_details:
+        raise RuntimeError("delete_columns response missing expected logical name")
-    if metadata_id and attribute_schema:
+    if metadata_id and column_logical:
         _delete_message = "Column metadata still present after delete"
         def _ensure_removed():
-            meta = odata_client._get_attribute_metadata(metadata_id, attribute_schema)
+            # _get_attribute_metadata now accepts logical names
+            meta = odata_client._get_attribute_metadata(metadata_id, column_logical)
             if meta:
                 raise RuntimeError(_delete_message)
             return True
@@ -645,11 +650,11 @@ def _ensure_removed():
 print("Cleanup (Metadata):")
 if delete_table_at_end:
     try:
-        log_call("client.get_table_info('new_SampleItem')")
-        info = client.get_table_info("new_SampleItem")
+        log_call("client.get_table_info('new_sampleitem')")
+        info = client.get_table_info("new_sampleitem")
         if info:
-            log_call("client.delete_table('new_SampleItem')")
-            client.delete_table("new_SampleItem")
+            log_call("client.delete_table('new_sampleitem')")
+            client.delete_table("new_sampleitem")
             print({"table_deleted": True})
         else:
             print({"table_deleted": False, "reason": "not found"})
diff --git a/src/dataverse_sdk/client.py b/src/dataverse_sdk/client.py
index 99dcbde..52eb48a 100644
--- a/src/dataverse_sdk/client.py
+++ b/src/dataverse_sdk/client.py
@@ -372,13 +372,13 @@ def query_sql(self, sql: str):
         return self._get_odata()._query_sql(sql)
 
     # Table metadata helpers
-    def get_table_info(self, tablename: str) -> Optional[Dict[str, Any]]:
+    def get_table_info(self, logical_name: str) -> Optional[Dict[str, Any]]:
         """
         Get basic metadata for a custom table if it exists.
 
-        :param tablename: Table friendly name (e.g. ``"SampleItem"``) or full schema name
-            (e.g. ``"new_SampleItem"``).
-        :type tablename: str
+        :param logical_name: Table logical name with publisher prefix (e.g. ``"new_sampleitem"``).
+            Lookup is case-insensitive (e.g., ``"new_SampleItem"`` will also work).
+        :type logical_name: str
 
         :return: Dictionary containing table metadata with keys ``entity_schema``,
             ``entity_logical_name``, ``entity_set_name``, and ``metadata_id``.
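
A minimal usage sketch of the contract this hunk documents, assuming a configured `DataverseClient` instance named `client`; the table name is illustrative:

```python
# get_table_info takes the table's logical name; lookup is case-insensitive,
# and None is returned when the table does not exist.
info = client.get_table_info("new_sampleitem")   # canonical lowercase form
same = client.get_table_info("New_SampleItem")   # normalized to the same lookup
if info:
    print(info["entity_logical_name"])  # e.g. "new_sampleitem"
    print(info["entity_set_name"])      # e.g. "new_sampleitems"
    print(info["metadata_id"])          # GUID of the EntityMetadata record
```
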
@@ -388,27 +387,27 @@ def get_table_info(self, tablename: str) -> Optional[Dict[str, Any]]:
 
         Example:
             Retrieve table metadata::
 
-                info = client.get_table_info("SampleItem")
+                info = client.get_table_info("new_sampleitem")
                 if info:
                     print(f"Logical name: {info['entity_logical_name']}")
                     print(f"Entity set: {info['entity_set_name']}")
         """
-        return self._get_odata()._get_table_info(tablename)
+        return self._get_odata()._get_table_info(logical_name)
 
     def create_table(
         self,
-        tablename: str,
+        logical_name: str,
         schema: Dict[str, Any],
         solution_unique_name: Optional[str] = None,
     ) -> Dict[str, Any]:
         """
         Create a simple custom table with specified columns.
 
-        :param tablename: Table friendly name (e.g. ``"SampleItem"``) or full schema name
-            (e.g. ``"new_SampleItem"``). If a publisher prefix is not included, the default
-            publisher prefix will be applied.
-        :type tablename: str
-        :param schema: Dictionary mapping column logical names (without prefix) to their types.
+        :param logical_name: Table logical name with publisher prefix (e.g. ``"new_sampleitem"``).
+            Both table and column names must include the publisher prefix (default is ``"new_"``).
+        :type logical_name: str
+        :param schema: Dictionary mapping column logical names (with prefix) to their types.
+
             Supported types:
 
             - Primitive types: ``"string"``, ``"int"``, ``"decimal"``, ``"float"``,
               ``"datetime"``, ``"bool"``
@@ -432,10 +431,11 @@ class ItemStatus(IntEnum):
             ``entity_set_name``, ``entity_logical_name``, ``metadata_id``, and ``columns_created``.
         :rtype: dict
 
-        :raises ~dataverse_sdk.errors.MetadataError: If table creation fails or the schema is invalid.
+        :raises ~dataverse_sdk.errors.HttpError: If the server rejects the metadata (invalid names, missing prefix, etc.).
+        :raises ~dataverse_sdk.errors.MetadataError: If table creation fails.
 
         Example:
-            Create a table with simple columns::
+            Create a table with simple columns (using the default publisher prefix ``new_``)::
 
                 from enum import IntEnum
 
@@ -444,30 +444,29 @@ class ItemStatus(IntEnum):
                     INACTIVE = 2
 
                 schema = {
-                    "title": "string",
-                    "quantity": "int",
-                    "price": "decimal",
-                    "available": "bool",
-                    "status": ItemStatus
+                    "new_title": "string",
+                    "new_quantity": "int",
+                    "new_price": "decimal",
+                    "new_available": "bool",
+                    "new_status": ItemStatus
                 }
 
-                result = client.create_table("SampleItem", schema)
+                result = client.create_table("new_sampleitem", schema)
                 print(f"Created table: {result['entity_logical_name']}")
                 print(f"Columns: {result['columns_created']}")
         """
         return self._get_odata()._create_table(
-            tablename,
+            logical_name,
             schema,
             solution_unique_name,
         )
 
-    def delete_table(self, tablename: str) -> None:
+    def delete_table(self, logical_name: str) -> None:
         """
-        Delete a custom table by name.
-
-        :param tablename: Table friendly name (e.g. ``"SampleItem"``) or full schema name
-            (e.g. ``"new_SampleItem"``).
-        :type tablename: str
+        Delete a custom table by logical name.
+
+        :param logical_name: Table logical name with publisher prefix (e.g. ``"new_sampleitem"``).
+        :type logical_name: str
 
         :raises ~dataverse_sdk.errors.MetadataError: If the table does not exist or deletion fails.
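
A minimal ensure/cleanup sketch combining these two helpers, under the same assumptions as the examples above (a configured `client`, default `new_` publisher prefix; names are illustrative):

```python
TABLE = "new_filesample"

# Idempotent ensure: get_table_info returns None when the table is absent.
info = client.get_table_info(TABLE)
if not info:
    # Column names must carry the same publisher prefix as the table.
    info = client.create_table(TABLE, {"new_title": "string"})
print(info["metadata_id"])

# Guarded cleanup: delete_table raises MetadataError when the table is
# missing, so re-check before deleting.
if client.get_table_info(TABLE):
    client.delete_table(TABLE)
```
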
@@ -478,9 +475,9 @@ def delete_table(self, tablename: str) -> None:
 
         Example:
             Delete a custom table::
 
-                client.delete_table("SampleItem")
+                client.delete_table("new_sampleitem")
         """
-        self._get_odata()._delete_table(tablename)
+        self._get_odata()._delete_table(logical_name)
 
     def list_tables(self) -> list[str]:
         """
@@ -500,63 +497,64 @@ def list_tables(self) -> list[str]:
 
     def create_columns(
         self,
-        tablename: str,
+        logical_name: str,
         columns: Dict[str, Any],
     ) -> List[str]:
         """
         Create one or more columns on an existing table using a schema-style mapping.
 
-        :param tablename: Friendly name ("SampleItem") or full schema name ("new_SampleItem").
-        :type tablename: str
-        :param columns: Mapping of logical names (without prefix) to supported types. Primitive types include
+        :param logical_name: Table logical name with publisher prefix (e.g. ``"new_sampleitem"``).
+        :type logical_name: str
+        :param columns: Mapping of column names with publisher prefix to supported types. Primitive types include
             ``string``, ``int``, ``decimal``, ``float``, ``datetime``, and ``bool``. Enum subclasses
             (IntEnum preferred) generate a local option set and can specify localized labels via ``__labels__``.
         :type columns: Dict[str, Any]
-        :returns: Schema names for the columns that were created.
+        :returns: Logical names for the columns that were created.
         :rtype: list[str]
 
         Example:
             Create two columns on the custom table::
 
                 created = client.create_columns(
-                    "new_SampleItem",
+                    "new_sampleitem",
                     {
-                        "scratch": "string",
-                        "flags": "bool",
+                        "new_scratch": "string",
+                        "new_flags": "bool",
                     },
                 )
                 print(created)
         """
         return self._get_odata()._create_columns(
-            tablename,
+            logical_name,
             columns,
         )
 
     def delete_columns(
         self,
-        tablename: str,
+        logical_name: str,
         columns: Union[str, List[str]],
     ) -> List[str]:
         """
         Delete one or more columns from a table.
 
-        :param tablename: Friendly or schema name of the table.
-        :type tablename: str
-        :param columns: Column name or list of column names to remove. Friendly names are normalized to schema
-            names using the same prefix logic as ``create_columns``.
+        :param logical_name: Table logical name with publisher prefix (e.g. ``"new_sampleitem"``).
+        :type logical_name: str
+        :param columns: Column name or list of column names to remove; names must include the
+            publisher prefix (e.g. ``"new_scratch"``).
         :type columns: str | list[str]
-        :returns: Schema names for the columns that were removed.
+        :returns: Logical names for the columns that were removed.
:rtype: list[str]
 
         Example:
-            Remove two custom columns by schema name:
-
+            Remove two custom columns::
+
                 removed = client.delete_columns(
-                    "new_SampleItem",
-                    ["new_Scratch", "new_Flags"],
+                    "new_sampleitem",
+                    ["new_scratch", "new_flags"],
                 )
                 print(removed)
         """
         return self._get_odata()._delete_columns(
-            tablename,
+            logical_name,
             columns,
         )
diff --git a/src/dataverse_sdk/data/odata.py b/src/dataverse_sdk/data/odata.py
index 87551c3..b518301 100644
--- a/src/dataverse_sdk/data/odata.py
+++ b/src/dataverse_sdk/data/odata.py
@@ -3,7 +3,7 @@
 
 from __future__ import annotations
 
-from typing import Any, Dict, Optional, List, Union, Iterable
+from typing import Any, Dict, Optional, List, Union, Iterable, TypedDict
 from enum import Enum
 import unicodedata
 import time
@@ -20,6 +20,11 @@
 
 _GUID_RE = re.compile(r"[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}")
 
+class TableMetadata(TypedDict):
+    """Entity metadata cached for efficient lookups."""
+    entity_set_name: str
+    schema_name: str
+    primary_id_attribute: str
 
 class ODataClient(ODataFileUpload):
     """Dataverse Web API client: CRUD, SQL-over-API, and table metadata helpers."""
@@ -28,6 +33,17 @@ class ODataClient(ODataFileUpload):
     def _escape_odata_quotes(value: str) -> str:
         """Escape single quotes for OData queries (by doubling them)."""
         return value.replace("'", "''")
+
+    @staticmethod
+    def _normalize_logical_name(logical_name: str) -> str:
+        """Normalize logical name to lowercase for case-insensitive lookups.
+
+        Dataverse logical names are always lowercase by convention. This helper ensures
+        case-insensitive behavior for user inputs (e.g., 'new_SampleItem' -> 'new_sampleitem').
+        """
+        if not isinstance(logical_name, str):
+            raise TypeError(f"logical_name must be str, got {type(logical_name).__name__}")
+        return logical_name.lower().strip()
 
     def __init__(
         self,
@@ -46,10 +62,10 @@ def __init__(
             backoff=self.config.http_backoff,
             timeout=self.config.http_timeout,
         )
-        # Cache: logical name -> entity set name (plural) resolved from metadata
-        self._logical_to_entityset_cache: dict[str, str] = {}
-        # Cache: logical name -> primary id attribute (e.g.
accountid) - self._logical_primaryid_cache: dict[str, str] = {} + # Unified entity metadata cache: logical name -> {entity_set_name, schema_name, primary_id_attribute} + self._entity_metadata_cache: dict[str, TableMetadata] = {} + # Cache: (entity_logical, attribute_logical) -> attribute SchemaName + self._attribute_schema_cache: dict[tuple[str, str], str] = {} # Picklist label cache: (logical_name, attribute_logical) -> {'map': {...}, 'ts': epoch_seconds} self._picklist_label_cache = {} self._picklist_cache_ttl_seconds = 3600 # 1 hour TTL @@ -239,14 +255,21 @@ def _create_multiple(self, entity_set: str, logical_name: str, records: List[Dic # --- Derived helpers for high-level client ergonomics --- def _primary_id_attr(self, logical_name: str) -> str: """Return primary key attribute using metadata; error if unavailable.""" - pid = self._logical_primaryid_cache.get(logical_name) + # Normalize logical name for case-insensitive lookup + logical_name = self._normalize_logical_name(logical_name) + + # Get from unified metadata cache + if logical_name in self._entity_metadata_cache: + pid = self._entity_metadata_cache[logical_name].get('primary_id_attribute') + if pid: + return pid + + # Populate cache by resolving entity metadata + metadata = self._get_entity_metadata(logical_name) + pid = metadata.get('primary_id_attribute') if pid: return pid - # Resolve metadata (populates _logical_primaryid_cache or raises if logical unknown) - self._entity_set_from_logical(logical_name) - pid2 = self._logical_primaryid_cache.get(logical_name) - if pid2: - return pid2 + raise RuntimeError( f"PrimaryIdAttribute not resolved for logical name '{logical_name}'. Metadata did not include PrimaryIdAttribute." ) @@ -617,18 +640,34 @@ def _extract_logical_table(sql: str) -> str: # ---------------------- Entity set resolution ----------------------- def _entity_set_from_logical(self, logical: str) -> str: """Resolve entity set name (plural) from a logical (singular) name using metadata. - Caches results for subsequent SQL queries. """ if not logical: raise ValueError("logical name required") - cached = self._logical_to_entityset_cache.get(logical) - if cached: - return cached + + metadata = self._get_entity_metadata(logical) + return metadata['entity_set_name'] + + # ---------------------- Table metadata helpers ---------------------- + + def _get_entity_metadata(self, logical_name: str) -> TableMetadata: + """Get entity metadata (EntitySetName, SchemaName, PrimaryIdAttribute) with caching. + + Returns a TableMetadata dict with keys: 'entity_set_name', 'schema_name', 'primary_id_attribute' + Raises MetadataError if entity not found. 
+ """ + # Normalize to lowercase for case-insensitive lookup + logical_name = self._normalize_logical_name(logical_name) + + # Check cache first + if logical_name in self._entity_metadata_cache: + return self._entity_metadata_cache[logical_name] + + # Look up from server url = f"{self.api}/EntityDefinitions" - logical_escaped = self._escape_odata_quotes(logical) + logical_escaped = self._escape_odata_quotes(logical_name) params = { - "$select": "LogicalName,EntitySetName,PrimaryIdAttribute", + "$select": "LogicalName,EntitySetName,SchemaName,PrimaryIdAttribute", "$filter": f"LogicalName eq '{logical_escaped}'", } r = self._request("get", url, params=params) @@ -637,26 +676,130 @@ def _entity_set_from_logical(self, logical: str) -> str: items = body.get("value", []) if isinstance(body, dict) else [] except ValueError: items = [] + if not items: - plural_hint = " (did you pass a plural entity set name instead of the singular logical name?)" if logical.endswith("s") and not logical.endswith("ss") else "" + plural_hint = " (did you pass a plural entity set name instead of the singular logical name?)" if logical_name.endswith("s") and not logical_name.endswith("ss") else "" raise MetadataError( - f"Unable to resolve entity set for logical name '{logical}'. Provide the singular logical name.{plural_hint}", + f"Unable to resolve entity metadata for logical name '{logical_name}'. Provide the singular logical name.{plural_hint}", subcode=ec.METADATA_ENTITYSET_NOT_FOUND, ) + md = items[0] - es = md.get("EntitySetName") - if not es: + entity_set_name = md.get("EntitySetName") + schema_name = md.get("SchemaName") + primary_id_attr = md.get("PrimaryIdAttribute") + + if not entity_set_name: raise MetadataError( - f"Metadata response missing EntitySetName for logical '{logical}'.", + f"Metadata response missing EntitySetName for logical '{logical_name}'.", subcode=ec.METADATA_ENTITYSET_NAME_MISSING, ) - self._logical_to_entityset_cache[logical] = es - primary_id_attr = md.get("PrimaryIdAttribute") - if isinstance(primary_id_attr, str) and primary_id_attr: - self._logical_primaryid_cache[logical] = primary_id_attr - return es - # ---------------------- Table metadata helpers ---------------------- + if not schema_name: + raise MetadataError( + f"Metadata response missing SchemaName for logical '{logical_name}'.", + subcode=ec.METADATA_TABLE_NOT_FOUND, + ) + + # Cache all metadata together using TypedDict + metadata: TableMetadata = { + 'entity_set_name': entity_set_name, + 'schema_name': schema_name, + 'primary_id_attribute': primary_id_attr or '', + } + self._entity_metadata_cache[logical_name] = metadata + return metadata + + def _logical_to_schema_name(self, logical_name: str) -> str: + """Convert logical name (new_sampleitem) to SchemaName (new_Sampleitem) for CREATE operations. + + Use this ONLY when creating new entities where we control the SchemaName. + For existing entities, use _get_entity_schema_name() to get the actual SchemaName from server. + + Note: All callers (_create_table, _create_columns) validate that logical_name contains + an underscore before calling this method, enforcing Dataverse's publisher prefix requirement. 
+ """ + # Normalize logical name first + logical_name = self._normalize_logical_name(logical_name) + + # Validate not empty after normalization + if not logical_name: + raise ValueError("logical_name cannot be empty or whitespace-only") + + # Split on first underscore to get prefix and name parts + prefix, rest = logical_name.split("_", 1) + + # Validate that rest is not empty (e.g., "new_" is invalid) + if not rest: + raise ValueError(f"logical_name '{logical_name}' has empty part after underscore (expected format: 'prefix_name')") + + # Capitalize first letter of the part after prefix + return f"{prefix}_{rest[:1].upper()}{rest[1:]}" + + def _get_entity_schema_name(self, logical_name: str) -> str: + """Get actual SchemaName for an existing entity. + + Returns the SchemaName as stored in Dataverse (which may have different casing than + our _logical_to_schema_name() conversion would produce). + + Raises MetadataError if entity not found. + """ + metadata = self._get_entity_metadata(logical_name) + return metadata['schema_name'] + + def _get_attribute_schema_name(self, entity_logical: str, attribute_logical: str) -> str: + """Get actual SchemaName for an existing attribute by looking it up from server. + + Raises MetadataError if attribute not found. + """ + # Normalize entity logical name for case-insensitive lookup + entity_logical = self._normalize_logical_name(entity_logical) + # Normalize attribute logical name for case-insensitive lookup + attribute_logical = self._normalize_logical_name(attribute_logical) + + cache_key = (entity_logical, attribute_logical) + + # Check cache first + if cache_key in self._attribute_schema_cache: + return self._attribute_schema_cache[cache_key] + + # Get entity metadata ID first + ent = self._get_entity_by_logical_name(entity_logical) + if not ent or not ent.get("MetadataId"): + raise MetadataError( + f"Entity '{entity_logical}' not found.", + subcode=ec.METADATA_TABLE_NOT_FOUND, + ) + + metadata_id = ent["MetadataId"] + + # Look up attribute by LogicalName + attr_escaped = self._escape_odata_quotes(attribute_logical) + url = f"{self.api}/EntityDefinitions({metadata_id})/Attributes" + params = { + "$select": "LogicalName,SchemaName", + "$filter": f"LogicalName eq '{attr_escaped}'", + } + r = self._request("get", url, params=params) + items = r.json().get("value", []) + + if not items: + raise MetadataError( + f"Attribute '{attribute_logical}' not found on entity '{entity_logical}'.", + subcode=ec.METADATA_COLUMN_NOT_FOUND, + ) + + schema_name = items[0].get("SchemaName") + if not schema_name: + raise MetadataError( + f"SchemaName missing for attribute '{attribute_logical}' on entity '{entity_logical}'.", + subcode=ec.METADATA_COLUMN_NOT_FOUND, + ) + + # Cache it + self._attribute_schema_cache[cache_key] = schema_name + return schema_name + def _label(self, text: str) -> Dict[str, Any]: lang = int(self.config.language_code) return { @@ -670,15 +813,6 @@ def _label(self, text: str) -> Dict[str, Any]: ], } - def _to_pascal(self, name: str) -> str: - parts = re.split(r"[^A-Za-z0-9]+", name) - return "".join(p[:1].upper() + p[1:] for p in parts if p) - - def _normalize_entity_schema(self, tablename: str) -> str: - if "_" in tablename: - return tablename - return f"new_{self._to_pascal(tablename)}" - def _get_entity_by_schema( self, schema_name: str, @@ -695,10 +829,30 @@ def _get_entity_by_schema( items = r.json().get("value", []) return items[0] if items else None + def _get_entity_by_logical_name( + self, + logical_name: str, + headers: Optional[Dict[str, 
str]] = None, + ) -> Optional[Dict[str, Any]]: + """Lookup entity by LogicalName (case-insensitive).""" + # Normalize to lowercase for case-insensitive lookup + logical_name = self._normalize_logical_name(logical_name) + + url = f"{self.api}/EntityDefinitions" + # Escape single quotes in logical name + logical_escaped = self._escape_odata_quotes(logical_name) + params = { + "$select": "MetadataId,LogicalName,SchemaName,EntitySetName", + "$filter": f"LogicalName eq '{logical_escaped}'", + } + r = self._request("get", url, params=params, headers=headers) + items = r.json().get("value", []) + return items[0] if items else None + def _create_entity( self, schema_name: str, - display_name: str, + logical_name: str, attributes: List[Dict[str, Any]], solution_unique_name: Optional[str] = None, ) -> Dict[str, Any]: @@ -706,9 +860,10 @@ def _create_entity( payload = { "@odata.type": "Microsoft.Dynamics.CRM.EntityMetadata", "SchemaName": schema_name, - "DisplayName": self._label(display_name), - "DisplayCollectionName": self._label(display_name + "s"), - "Description": self._label(f"Custom entity for {display_name}"), + "LogicalName": logical_name, + "DisplayName": self._label(logical_name), + "DisplayCollectionName": self._label(logical_name + "s"), + "Description": self._label(f"Custom entity for {logical_name}"), "OwnershipType": "UserOwned", "HasActivities": False, "HasNotes": True, @@ -719,13 +874,13 @@ def _create_entity( if solution_unique_name: params = {"SolutionUniqueName": solution_unique_name} self._request("post", url, json=payload, params=params) - ent = self._get_entity_by_schema( - schema_name, + ent = self._get_entity_by_logical_name( + logical_name, headers={"Consistency": "Strong"}, ) if not ent or not ent.get("EntitySetName"): raise RuntimeError( - f"Failed to create or retrieve entity '{schema_name}' (EntitySetName not available)." + f"Failed to create or retrieve entity '{logical_name}' (EntitySetName not available)." ) if not ent.get("MetadataId"): raise RuntimeError( @@ -733,23 +888,26 @@ def _create_entity( ) return ent - def _normalize_attribute_schema(self, entity_schema: str, column_name: str) -> str: - # Use same publisher prefix segment as entity_schema if present; else default to 'new_'. - if not isinstance(column_name, str) or not column_name.strip(): - raise ValueError("column_name must be a non-empty string") - publisher = entity_schema.split("_", 1)[0] if "_" in entity_schema else "new" - expected_prefix = f"{publisher}_" - if column_name.lower().startswith(expected_prefix.lower()): - return column_name - return f"{publisher}_{self._to_pascal(column_name)}" - def _get_attribute_metadata( self, entity_metadata_id: str, - schema_name: str, + logical_name: str, extra_select: Optional[str] = None, ) -> Optional[Dict[str, Any]]: - attr_escaped = self._escape_odata_quotes(schema_name) + """Get attribute metadata by logical name. 
+
+        Args:
+            entity_metadata_id: The MetadataId of the entity
+            logical_name: The logical name of the attribute (e.g., "new_category")
+            extra_select: Optional comma-separated list of additional fields to select
+
+        Returns:
+            Dict with attribute metadata, or None if not found
+        """
+        # Normalize logical name for case-insensitive lookup
+        logical_name = self._normalize_logical_name(logical_name)
+
+        attr_escaped = self._escape_odata_quotes(logical_name)
         url = f"{self.api}/EntityDefinitions({entity_metadata_id})/Attributes"
         select_fields = ["MetadataId", "LogicalName", "SchemaName"]
         if extra_select:
@@ -763,7 +921,7 @@ def _get_attribute_metadata(
                 select_fields.append(piece)
         params = {
             "$select": ",".join(select_fields),
-            "$filter": f"SchemaName eq '{attr_escaped}'",
+            "$filter": f"LogicalName eq '{attr_escaped}'",
         }
         r = self._request("get", url, params=params)
         try:
@@ -1108,24 +1266,27 @@ def _attribute_payload(self, schema_name: str, dtype: Any, *, is_primary_name: b
         }
         return None
 
-    def _get_table_info(self, tablename: str) -> Optional[Dict[str, Any]]:
+    def _get_table_info(self, logical_name: str) -> Optional[Dict[str, Any]]:
         """Return basic metadata for a custom table if it exists.
 
         Parameters
         ----------
-        tablename : str
-            Friendly name or full schema name (with publisher prefix and underscore).
+        logical_name : str
+            Table logical name with publisher prefix (e.g. ``"new_sampleitem"``).
 
         Returns
         -------
         dict | None
             Metadata summary or ``None`` if not found.
         """
-        ent = self._get_entity_by_schema(tablename)
+        # Normalize logical name for case-insensitive handling
+        logical_name = self._normalize_logical_name(logical_name)
+
+        ent = self._get_entity_by_logical_name(logical_name)
         if not ent:
             return None
         return {
-            "entity_schema": ent.get("SchemaName") or tablename,
+            "entity_schema": ent.get("SchemaName") or logical_name,
             "entity_logical_name": ent.get("LogicalName"),
             "entity_set_name": ent.get("EntitySetName"),
             "metadata_id": ent.get("MetadataId"),
@@ -1141,12 +1300,15 @@ def _list_tables(self) -> List[Dict[str, Any]]:
         r = self._request("get", url, params=params)
         return r.json().get("value", [])
 
-    def _delete_table(self, tablename: str) -> None:
-        entity_schema = self._normalize_entity_schema(tablename)
-        ent = self._get_entity_by_schema(entity_schema)
+    def _delete_table(self, logical_name: str) -> None:
+        # Normalize logical name for case-insensitive handling
+        logical_name = self._normalize_logical_name(logical_name)
+
+        # Use strong consistency to ensure we get latest metadata after recent create
+        ent = self._get_entity_by_logical_name(logical_name, headers={"Consistency": "Strong"})
         if not ent or not ent.get("MetadataId"):
             raise MetadataError(
-                f"Table '{entity_schema}' not found.",
+                f"Table '{logical_name}' not found.",
                 subcode=ec.METADATA_TABLE_NOT_FOUND,
             )
         metadata_id = ent["MetadataId"]
@@ -1155,66 +1317,72 @@ def _delete_table(self, tablename: str) -> None:
 
     def _create_table(
         self,
-        tablename: str,
+        logical_name: str,
         schema: Dict[str, Any],
         solution_unique_name: Optional[str] = None,
     ) -> Dict[str, Any]:
-        # Accept a friendly name and construct a default schema under 'new_'.
-        # If a full SchemaName is passed (contains '_'), use as-is.
-        entity_schema = self._normalize_entity_schema(tablename)
-
-        ent = self._get_entity_by_schema(entity_schema)
-        if ent:
-            raise MetadataError(
-                f"Table '{entity_schema}' already exists.",
-                subcode=ec.METADATA_TABLE_ALREADY_EXISTS,
-            )
+        """Create a custom table.
Server validates naming requirements (prefixes, format, etc.).""" + # Normalize logical name for case-insensitive handling + logical_name = self._normalize_logical_name(logical_name) created_cols: List[str] = [] - primary_attr_schema = "new_Name" if "_" not in entity_schema else f"{entity_schema.split('_',1)[0]}_Name" attributes: List[Dict[str, Any]] = [] - attributes.append(self._attribute_payload(primary_attr_schema, "string", is_primary_name=True)) + + # Extract publisher prefix from table name (e.g., "new_sampleitem" -> "new") + if "_" not in logical_name: + raise ValueError(f"Table logical name must include publisher prefix (e.g., 'new_tablename'), got: '{logical_name}'") + prefix = logical_name.split("_", 1)[0] + primary_name_schema = f"{prefix}_Name" + + attributes.append(self._attribute_payload(primary_name_schema, "string", is_primary_name=True)) for col_name, dtype in schema.items(): - attr_schema = self._normalize_attribute_schema(entity_schema, col_name) - payload = self._attribute_payload(attr_schema, dtype) + # Normalize column logical name for case-insensitive handling + col_name_normalized = self._normalize_logical_name(col_name) + # Convert column logical name to SchemaName (e.g., "new_code" -> "new_Code") + col_schema = self._logical_to_schema_name(col_name_normalized) + payload = self._attribute_payload(col_schema, dtype) if not payload: raise ValueError(f"Unsupported column type '{dtype}' for '{col_name}'.") attributes.append(payload) - created_cols.append(attr_schema) + created_cols.append(col_name_normalized) if solution_unique_name is not None: - if not isinstance(solution_unique_name, str): - raise TypeError("solution_unique_name must be a string when provided") - if not solution_unique_name: - raise ValueError("solution_unique_name cannot be empty") - - metadata = self._create_entity( - entity_schema, - tablename, + if not isinstance(solution_unique_name, str) or not solution_unique_name: + raise ValueError("solution_unique_name must be a non-empty string") + + # Convert logical name to SchemaName (e.g., "new_sampleitem" -> "new_Sampleitem") + schema_name = self._logical_to_schema_name(logical_name) + + ent = self._create_entity( + schema_name, + logical_name, attributes, solution_unique_name, ) return { - "entity_schema": entity_schema, - "entity_logical_name": metadata.get("LogicalName"), - "entity_set_name": metadata.get("EntitySetName"), - "metadata_id": metadata.get("MetadataId"), + "entity_schema": ent.get("SchemaName"), + "entity_logical_name": ent.get("LogicalName"), + "entity_set_name": ent.get("EntitySetName"), + "metadata_id": ent.get("MetadataId"), "columns_created": created_cols, } def _create_columns( self, - tablename: str, + logical_name: str, columns: Dict[str, Any], ) -> List[str]: if not isinstance(columns, dict) or not columns: raise TypeError("columns must be a non-empty dict[name -> type]") - entity_schema = self._normalize_entity_schema(tablename) - ent = self._get_entity_by_schema(entity_schema) + + # Normalize logical name for case-insensitive handling + logical_name = self._normalize_logical_name(logical_name) + + ent = self._get_entity_by_logical_name(logical_name) if not ent or not ent.get("MetadataId"): raise MetadataError( - f"Table '{entity_schema}' not found.", + f"Table '{logical_name}' not found.", subcode=ec.METADATA_TABLE_NOT_FOUND, ) @@ -1223,27 +1391,33 @@ def _create_columns( needs_picklist_flush = False for column_name, column_type in columns.items(): - schema_name = self._normalize_attribute_schema(entity_schema, 
column_name) - payload = self._attribute_payload(schema_name, column_type) + # Normalize column logical name for case-insensitive handling + column_name_normalized = self._normalize_logical_name(column_name) + + # Validate that column name includes publisher prefix + if "_" not in column_name_normalized: + raise ValueError(f"Column logical name must include publisher prefix (e.g., 'new_columnname'), got: '{column_name}'") + + # Convert column logical name to SchemaName + col_schema = self._logical_to_schema_name(column_name_normalized) + payload = self._attribute_payload(col_schema, column_type) if not payload: - raise ValueError(f"Unsupported column type '{column_type}' for '{schema_name}'.") + raise ValueError(f"Unsupported column type '{column_type}' for '{column_name}'.") url = f"{self.api}/EntityDefinitions({metadata_id})/Attributes" self._request("post", url, json=payload) - created.append(schema_name) + created.append(column_name_normalized) if "OptionSet" in payload: needs_picklist_flush = True - if needs_picklist_flush: self._flush_cache("picklist") - return created def _delete_columns( self, - tablename: str, + logical_name: str, columns: Union[str, List[str]], ) -> List[str]: if isinstance(columns, str): @@ -1252,16 +1426,17 @@ def _delete_columns( names = columns else: raise TypeError("columns must be str or list[str]") - for name in names: if not isinstance(name, str) or not name.strip(): raise ValueError("column names must be non-empty strings") - entity_schema = self._normalize_entity_schema(tablename) - ent = self._get_entity_by_schema(entity_schema) + # Normalize logical name for case-insensitive handling + logical_name = self._normalize_logical_name(logical_name) + + ent = self._get_entity_by_logical_name(logical_name) if not ent or not ent.get("MetadataId"): raise MetadataError( - f"Table '{entity_schema}' not found.", + f"Table '{logical_name}' not found.", subcode=ec.METADATA_TABLE_NOT_FOUND, ) @@ -1270,34 +1445,34 @@ def _delete_columns( needs_picklist_flush = False for column_name in names: - schema_name = self._normalize_attribute_schema(entity_schema, column_name) - attr_meta = self._get_attribute_metadata(metadata_id, schema_name, extra_select="@odata.type,AttributeType") + # Normalize column name for case-insensitive handling + column_name_normalized = self._normalize_logical_name(column_name) + # Get attribute metadata by logical name + attr_meta = self._get_attribute_metadata(metadata_id, column_name_normalized, extra_select="@odata.type,AttributeType") if not attr_meta: raise MetadataError( - f"Column '{schema_name}' not found on table '{entity_schema}'.", + f"Column '{column_name}' not found on table '{logical_name}'.", subcode=ec.METADATA_COLUMN_NOT_FOUND, ) attr_metadata_id = attr_meta.get("MetadataId") if not attr_metadata_id: raise RuntimeError( - f"Metadata incomplete for column '{schema_name}' (missing MetadataId)." + f"Metadata incomplete for column '{column_name}' (missing MetadataId)." 
) attr_url = f"{self.api}/EntityDefinitions({metadata_id})/Attributes({attr_metadata_id})" self._request("delete", attr_url, headers={"If-Match": "*"}) - attr_type = attr_meta.get("@odata.type") or attr_meta.get("AttributeType") if isinstance(attr_type, str): attr_type_l = attr_type.lower() if "picklist" in attr_type_l or "optionset" in attr_type_l: needs_picklist_flush = True - deleted.append(schema_name) + deleted.append(column_name_normalized) if needs_picklist_flush: self._flush_cache("picklist") - return deleted # ---------------------- Cache maintenance ------------------------- diff --git a/tests/unit/core/test_http_errors.py b/tests/unit/core/test_http_errors.py index 4c3a0e0..d0227de 100644 --- a/tests/unit/core/test_http_errors.py +++ b/tests/unit/core/test_http_errors.py @@ -5,39 +5,13 @@ from dataverse_sdk.core.errors import HttpError from dataverse_sdk.core import error_codes as ec from dataverse_sdk.data.odata import ODataClient - -class DummyAuth: - def acquire_token(self, scope): - class T: access_token = "x" - return T() - -class DummyHTTP: - def __init__(self, responses): - self._responses = responses - def request(self, method, url, **kwargs): - if not self._responses: - raise AssertionError("No more responses") - status, headers, body = self._responses.pop(0) - class R: - pass - r = R() - r.status_code = status - r.headers = headers - if isinstance(body, dict): - import json - r.text = json.dumps(body) - def json_func(): return body - r.json = json_func - else: - r.text = body or "" - def json_fail(): raise ValueError("non-json") - r.json = json_fail - return r +from tests.unit.test_helpers import DummyAuth, DummyHTTPClient class TestClient(ODataClient): + """Test client for HTTP error testing.""" def __init__(self, responses): super().__init__(DummyAuth(), "https://org.example", None) - self._http = DummyHTTP(responses) + self._http = DummyHTTPClient(responses) # --- Tests --- diff --git a/tests/unit/data/test_enum_optionset_payload.py b/tests/unit/data/test_enum_optionset_payload.py index ca58a38..4b35335 100644 --- a/tests/unit/data/test_enum_optionset_payload.py +++ b/tests/unit/data/test_enum_optionset_payload.py @@ -5,22 +5,7 @@ from enum import Enum, IntEnum from dataverse_sdk.data.odata import ODataClient - -class DummyAuth: - def acquire_token(self, scope): # pragma: no cover - simple stub - class T: - access_token = "token" - return T() - -class DummyConfig: - """Minimal config stub providing attributes ODataClient.__init__ expects.""" - - def __init__(self, language_code=1033): - self.language_code = language_code - # HTTP settings referenced during ODataClient construction - self.http_retries = 0 - self.http_backoff = 0 - self.http_timeout = 5 +from tests.unit.test_helpers import DummyAuth, DummyConfig def _make_client(lang=1033): return ODataClient(DummyAuth(), "https://org.example", DummyConfig(language_code=lang)) diff --git a/tests/unit/data/test_logical_crud.py b/tests/unit/data/test_logical_crud.py index c3233ea..ab7a369 100644 --- a/tests/unit/data/test_logical_crud.py +++ b/tests/unit/data/test_logical_crud.py @@ -1,69 +1,17 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT license. 
-import types import pytest -from dataverse_sdk.data.odata import ODataClient from dataverse_sdk.core.errors import MetadataError +from tests.unit.test_helpers import ( + TestableClient, + MD_ACCOUNT, + make_entity_create_headers, + make_entity_metadata +) -class DummyAuth: - def acquire_token(self, scope): - class T: access_token = "x" - return T() - -class DummyHTTPClient: - def __init__(self, responses): - self._responses = responses - self.calls = [] - def request(self, method, url, **kwargs): - self.calls.append((method, url, kwargs)) - if not self._responses: - raise AssertionError("No more dummy responses configured") - status, headers, body = self._responses.pop(0) - resp = types.SimpleNamespace() - resp.status_code = status - resp.headers = headers - resp.text = "" if body is None else ("{}" if isinstance(body, dict) else str(body)) - def raise_for_status(): - if status >= 400: - raise RuntimeError(f"HTTP {status}") - return None - def json_func(): - return body if isinstance(body, dict) else {} - resp.raise_for_status = raise_for_status - resp.json = json_func - return resp - -class TestableClient(ODataClient): - def __init__(self, responses): - super().__init__(DummyAuth(), "https://org.example", None) - self._http = DummyHTTPClient(responses) - def _convert_labels_to_ints(self, logical_name, record): # pragma: no cover - test shim - return record - -# Helper metadata response for logical name resolution -MD_ACCOUNT = { - "value": [ - { - "LogicalName": "account", - "EntitySetName": "accounts", - "PrimaryIdAttribute": "accountid" - } - ] -} - -MD_SAMPLE = { - "value": [ - { - "LogicalName": "new_sampleitem", - "EntitySetName": "new_sampleitems", - "PrimaryIdAttribute": "new_sampleitemid" - } - ] -} - -def make_entity_create_headers(entity_set, guid): - return {"OData-EntityId": f"https://org.example/api/data/v9.2/{entity_set}({guid})"} +# Additional metadata for this test file +MD_SAMPLE = make_entity_metadata("new_sampleitem", "new_sampleitems", "new_Sampleitem", "new_sampleitemid") def test_single_create_update_delete_get(): diff --git a/tests/unit/data/test_naming_normalization.py b/tests/unit/data/test_naming_normalization.py new file mode 100644 index 0000000..0c23d39 --- /dev/null +++ b/tests/unit/data/test_naming_normalization.py @@ -0,0 +1,689 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +""" +Unit tests for logical name normalization and SchemaName resolution. + +Tests the case-insensitive logical name handling, SchemaName lookup caching, +and unified metadata cache introduced for explicit naming enforcement. 
+""" + +import pytest +from dataverse_sdk.core.errors import MetadataError +from tests.unit.test_helpers import ( + TestableClient, + MD_ACCOUNT, + MD_SAMPLE_ITEM +) + +# ============================================================================ +# Test Data - Additional Metadata Responses for this test file +# ============================================================================ + +MD_ENTITY_BY_LOGICAL = { + "LogicalName": "new_sampleitem", + "EntitySetName": "new_sampleitems", + "SchemaName": "new_Sampleitem", + "MetadataId": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee" +} + +MD_ATTRIBUTE_TITLE = { + "value": [ + { + "LogicalName": "new_title", + "SchemaName": "new_Title" + } + ] +} + + +# ============================================================================ +# Tests for _normalize_logical_name +# ============================================================================ + +def test_normalize_logical_name_lowercase(): + """Test that _normalize_logical_name converts to lowercase.""" + c = TestableClient([]) + assert c._normalize_logical_name("NEW_SAMPLEITEM") == "new_sampleitem" + assert c._normalize_logical_name("New_SampleItem") == "new_sampleitem" + assert c._normalize_logical_name("new_sampleitem") == "new_sampleitem" + + +def test_normalize_logical_name_strips_whitespace(): + """Test that _normalize_logical_name strips whitespace.""" + c = TestableClient([]) + assert c._normalize_logical_name(" new_sampleitem ") == "new_sampleitem" + assert c._normalize_logical_name("\tnew_sampleitem\n") == "new_sampleitem" + + +def test_normalize_logical_name_empty(): + """Test that _normalize_logical_name handles empty strings.""" + c = TestableClient([]) + assert c._normalize_logical_name("") == "" + assert c._normalize_logical_name(" ") == "" + + +# ============================================================================ +# Tests for _logical_to_schema_name +# ============================================================================ + +def test_logical_to_schema_name_basic(): + """Test PascalCase conversion for new entities.""" + c = TestableClient([]) + # new_sampleitem -> new_Sampleitem + assert c._logical_to_schema_name("new_sampleitem") == "new_Sampleitem" + # abc_myentity -> abc_Myentity + assert c._logical_to_schema_name("abc_myentity") == "abc_Myentity" + + +# ============================================================================ +# Tests for _get_entity_schema_name +# ============================================================================ + +def test_get_entity_schema_name_lookup(): + """Test that _get_entity_schema_name retrieves SchemaName from server.""" + responses = [ + (200, {}, MD_SAMPLE_ITEM) # _get_entity_metadata uses EntityDefinitions endpoint + ] + c = TestableClient(responses) + schema = c._get_entity_schema_name("new_sampleitem") + assert schema == "new_Sampleitem" + + +def test_get_entity_schema_name_not_found(): + """Test that _get_entity_schema_name raises error when entity not found.""" + responses = [ + (200, {}, {"value": []}) # Empty response = not found + ] + c = TestableClient(responses) + with pytest.raises(MetadataError, match="Unable to resolve entity metadata"): + c._get_entity_schema_name("new_nonexistent") + + +# ============================================================================ +# Tests for _get_attribute_schema_name +# ============================================================================ + +def test_get_attribute_schema_name_lookup(): + """Test that _get_attribute_schema_name retrieves attribute SchemaName.""" + 
responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # _get_entity_by_logical_name uses EntityDefinitions endpoint
+        (200, {}, MD_ATTRIBUTE_TITLE)  # Attribute lookup
+    ]
+    c = TestableClient(responses)
+    schema = c._get_attribute_schema_name("new_sampleitem", "new_title")
+    assert schema == "new_Title"
+
+
+def test_get_attribute_schema_name_caching():
+    """Test that _get_attribute_schema_name caches results."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # Entity lookup (first call)
+        (200, {}, MD_ATTRIBUTE_TITLE)  # Attribute lookup (first call)
+        # No more responses needed - second call should use cache
+    ]
+    c = TestableClient(responses)
+
+    # First call - hits server
+    schema1 = c._get_attribute_schema_name("new_sampleitem", "new_title")
+    assert schema1 == "new_Title"
+
+    # Second call - uses cache
+    schema2 = c._get_attribute_schema_name("new_sampleitem", "new_title")
+    assert schema2 == "new_Title"
+
+    # Verify only 2 HTTP calls were made (not 4)
+    assert len(c._http.calls) == 2
+
+
+def test_get_attribute_schema_name_case_insensitive():
+    """Test that attribute lookups are case-insensitive."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # Entity lookup
+        (200, {}, MD_ATTRIBUTE_TITLE)  # Attribute lookup
+    ]
+    c = TestableClient(responses)
+
+    # Lookup with different casing
+    schema = c._get_attribute_schema_name("NEW_SAMPLEITEM", "NEW_TITLE")
+    assert schema == "new_Title"
+
+    # Verify the OData filter used the normalized (lowercase) logical name
+    calls = c._http.calls
+    method, url, kwargs = calls[1]  # Second call is the attribute lookup
+    assert "LogicalName eq 'new_title'" in kwargs.get('params', {}).get('$filter', '')
+
+
+def test_get_attribute_schema_name_not_found():
+    """Test error when attribute not found."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),
+        (200, {}, {"value": []})  # Empty result = not found
+    ]
+    c = TestableClient(responses)
+
+    with pytest.raises(MetadataError, match="Attribute 'new_missing' not found"):
+        c._get_attribute_schema_name("new_sampleitem", "new_missing")
+
+
+def test_get_attribute_schema_name_entity_not_found():
+    """Test error when entity doesn't exist for attribute lookup."""
+    responses = [
+        (200, {}, {"value": []})  # Empty entity response from EntityDefinitions
+    ]
+    c = TestableClient(responses)
+
+    with pytest.raises(MetadataError, match="Entity 'new_missing' not found"):
+        c._get_attribute_schema_name("new_missing", "new_field")
+
+
+# ============================================================================
+# Tests for _get_entity_metadata (Unified Cache)
+# ============================================================================
+
+def test_get_entity_metadata_returns_all_fields():
+    """Test that _get_entity_metadata returns complete TableMetadata."""
+    responses = [
+        (200, {}, MD_SAMPLE_ITEM)
+    ]
+    c = TestableClient(responses)
+
+    metadata = c._get_entity_metadata("new_sampleitem")
+
+    assert metadata["entity_set_name"] == "new_sampleitems"
+    assert metadata["schema_name"] == "new_Sampleitem"
+    assert metadata["primary_id_attribute"] == "new_sampleitemid"
+
+
+def test_get_entity_metadata_caching():
+    """Test that _get_entity_metadata caches results."""
+    responses = [
+        (200, {}, MD_SAMPLE_ITEM)  # Only one response needed
+    ]
+    c = TestableClient(responses)
+
+    # First call - hits server
+    metadata1 = c._get_entity_metadata("new_sampleitem")
+    assert metadata1["entity_set_name"] == "new_sampleitems"
+
+    # Second call - uses cache
+    metadata2 = c._get_entity_metadata("new_sampleitem")
+    assert metadata2["entity_set_name"] == "new_sampleitems"
metadata2["entity_set_name"] == "new_sampleitems" + + # Third call - still uses cache + metadata3 = c._get_entity_metadata("new_sampleitem") + assert metadata3["entity_set_name"] == "new_sampleitems" + + # Verify only 1 HTTP call was made + assert len(c._http.calls) == 1 + + +def test_get_entity_metadata_case_insensitive(): + """Test that metadata lookups are case-insensitive.""" + responses = [ + (200, {}, MD_SAMPLE_ITEM) + ] + c = TestableClient(responses) + + # Different casing should all normalize to same cache key + metadata1 = c._get_entity_metadata("new_sampleitem") + metadata2 = c._get_entity_metadata("NEW_SAMPLEITEM") + metadata3 = c._get_entity_metadata("New_SampleItem") + + assert metadata1 == metadata2 == metadata3 + # Only 1 HTTP call should have been made + assert len(c._http.calls) == 1 + + +def test_get_entity_metadata_not_found(): + """Test error when entity metadata not found.""" + responses = [ + (200, {}, {"value": []}) # Empty response + ] + c = TestableClient(responses) + + with pytest.raises(MetadataError, match="Unable to resolve entity metadata"): + c._get_entity_metadata("new_nonexistent") + + +def test_get_entity_metadata_plural_hint(): + """Test helpful error message for plural names.""" + responses = [ + (200, {}, {"value": []}) + ] + c = TestableClient(responses) + + with pytest.raises(MetadataError, match="did you pass a plural entity set name"): + c._get_entity_metadata("accounts") # Ends with 's' + + +def test_get_entity_metadata_missing_entity_set(): + """Test error when EntitySetName missing in response.""" + responses = [ + (200, {}, {"value": [{"LogicalName": "test", "SchemaName": "Test"}]}) + ] + c = TestableClient(responses) + + with pytest.raises(MetadataError, match="missing EntitySetName"): + c._get_entity_metadata("test") + + +def test_get_entity_metadata_missing_schema_name(): + """Test error when SchemaName missing in response.""" + responses = [ + (200, {}, {"value": [{"LogicalName": "test", "EntitySetName": "tests"}]}) + ] + c = TestableClient(responses) + + with pytest.raises(MetadataError, match="missing SchemaName"): + c._get_entity_metadata("test") + + +# ============================================================================ +# Tests for Case-Insensitive Entity Operations +# ============================================================================ + +def test_entity_set_from_logical_case_insensitive(): + """Test that _entity_set_from_logical is case-insensitive.""" + responses = [ + (200, {}, MD_SAMPLE_ITEM) + ] + c = TestableClient(responses) + + # All variations should return same entity set name + assert c._entity_set_from_logical("new_sampleitem") == "new_sampleitems" + assert c._entity_set_from_logical("NEW_SAMPLEITEM") == "new_sampleitems" + assert c._entity_set_from_logical("New_SampleItem") == "new_sampleitems" + + # Only 1 HTTP call (rest from cache) + assert len(c._http.calls) == 1 + + +def test_get_entity_by_logical_name_normalization(): + """Test that _get_entity_by_logical_name normalizes input.""" + responses = [ + (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}) + ] + c = TestableClient(responses) + + entity = c._get_entity_by_logical_name("NEW_SAMPLEITEM") + assert entity["LogicalName"] == "new_sampleitem" + + # Check that the query parameters used normalized name + call = c._http.calls[0] + method, url, kwargs = call + params = kwargs.get('params', {}) + filter_clause = params.get('$filter', '') + assert "new_sampleitem" in filter_clause.lower() + + +# 
+# ============================================================================
+# Tests for Table/Column Operations with Normalization
+# ============================================================================
+
+def test_create_table_normalizes_logical_name():
+    """Test that _create_table normalizes the logical name."""
+    responses = [
+        (200, {}, {}),  # POST to create entity
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # GET entity by logical with Consistency: Strong
+    ]
+    c = TestableClient(responses)
+
+    # Create with mixed case - _create_table takes schema dict, not individual params
+    c._create_table(
+        logical_name="NEW_SAMPLEITEM",
+        schema={"new_field1": "string"},  # Dict of column_name -> type
+        solution_unique_name=None
+    )
+
+    # Verify the POST was made to EntityDefinitions
+    call = c._http.calls[0]
+    method, url, kwargs = call
+    assert method == "post"
+    assert "EntityDefinitions" in url
+
+
+def test_create_columns_normalizes_names():
+    """Test that _create_columns normalizes table and column names."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # Get entity by logical
+        (204, {}, {})  # Column creation response (POST to Attributes)
+    ]
+    c = TestableClient(responses)
+
+    # Create columns with mixed case - _create_columns takes dict of name -> type
+    c._create_columns(
+        logical_name="NEW_SAMPLEITEM",
+        columns={"NEW_FIELD1": "string"}  # Dict format
+    )
+
+    # Verify entity lookup used normalized name
+    calls = c._http.calls
+    assert len(calls) >= 1
+    # The first call should be to get entity metadata with normalized name
+
+
+def test_delete_columns_normalizes_names():
+    """Test that _delete_columns normalizes table and column names."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # Get entity by logical (from _delete_columns)
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # Get entity by logical (from _get_attribute_schema_name)
+        (200, {}, MD_ATTRIBUTE_TITLE),  # Get attribute schema (from _get_attribute_schema_name)
+        (200, {}, {"value": [{"MetadataId": "attr-guid-123", "LogicalName": "new_title", "SchemaName": "new_Title", "@odata.type": "Microsoft.Dynamics.CRM.StringAttributeMetadata"}]}),  # Get attribute metadata by SchemaName
+        (204, {}, {})  # Delete response
+    ]
+    c = TestableClient(responses)
+
+    # Delete with mixed case - _delete_columns takes str or list of str
+    c._delete_columns(
+        logical_name="NEW_SAMPLEITEM",
+        columns=["NEW_TITLE"]  # Parameter is 'columns', not 'column_names'
+    )
+
+    # Verify calls were made with normalized names
+    assert len(c._http.calls) >= 2
+
+
+# ============================================================================
+# Tests for Integration Scenarios
+# ============================================================================
+
+def test_end_to_end_case_insensitive_workflow():
+    """Test complete workflow with mixed case names."""
+    guid = "11111111-2222-3333-4444-555555555555"
+
+    responses = [
+        # Lookup metadata with UPPERCASE
+        (200, {}, MD_SAMPLE_ITEM),
+        # Create record
+        (204, {"OData-EntityId": f"https://org.example/api/data/v9.2/new_sampleitems({guid})"}, {}),
+        # Get record back with MixedCase
+        (200, {}, {"new_sampleitemid": guid, "new_title": "Test"}),
+    ]
+    c = TestableClient(responses)
+
+    # Use UPPERCASE for entity set lookup
+    entity_set = c._entity_set_from_logical("NEW_SAMPLEITEM")
+    assert entity_set == "new_sampleitems"
+
+    # Create with lowercase
+    record_id = c._create(entity_set, "new_sampleitem", {"new_title": "Test"})
+    assert record_id == guid
+
+    # Get with MixedCase
+    record = c._get("New_SampleItem", guid)
+    assert record["new_title"] == "Test"
+
+
+def test_cache_isolation_between_entities():
+    """Test that cache correctly isolates different entities."""
+    responses = [
+        (200, {}, MD_SAMPLE_ITEM),
+        (200, {}, MD_ACCOUNT)
+    ]
+    c = TestableClient(responses)
+
+    # Get metadata for two different entities
+    md1 = c._get_entity_metadata("new_sampleitem")
+    md2 = c._get_entity_metadata("account")
+
+    # They should have different values
+    assert md1["entity_set_name"] != md2["entity_set_name"]
+    assert md1["schema_name"] != md2["schema_name"]
+
+    # Both should be cached (2 HTTP calls total)
+    assert len(c._http.calls) == 2
+
+    # Re-accessing should not make more calls
+    md1_again = c._get_entity_metadata("new_sampleitem")
+    assert md1 == md1_again
+    assert len(c._http.calls) == 2  # Still only 2
+
+
+def test_create_with_one_casing_crud_with_another():
+    """Test creating table with one casing, then performing CRUD with different casings.
+
+    This simulates real-world scenarios where:
+    1. Table is created with lowercase: "new_product"
+    2. Developer uses different casing in subsequent operations
+    3. All operations should work and share the same cache
+    """
+    table_guid = "table-1111-2222-3333-4444"
+
+    # Simplified test focusing on metadata operations and cache sharing
+    responses = [
+        # CREATE TABLE with lowercase "new_product"
+        (200, {}, {}),  # POST EntityDefinitions
+        (200, {}, {"value": [{
+            "LogicalName": "new_product",
+            "EntitySetName": "new_products",
+            "SchemaName": "new_Product",
+            "MetadataId": table_guid,
+            "PrimaryIdAttribute": "new_productid"
+        }]}),  # GET with Consistency: Strong
+
+        # ADD COLUMN with UPPERCASE "NEW_PRODUCT" - should use cached entity metadata
+        (200, {}, {"value": [{  # Get entity by logical (for _create_columns)
+            "LogicalName": "new_product",
+            "EntitySetName": "new_products",
+            "SchemaName": "new_Product",
+            "MetadataId": table_guid
+        }]}),
+        (204, {}, {}),  # POST attribute
+
+        # GET entity set with MixedCase "New_Product"
+        # First call may need metadata if not already cached
+        (200, {}, {"value": [{
+            "LogicalName": "new_product",
+            "EntitySetName": "new_products",
+            "SchemaName": "new_Product",
+            "MetadataId": table_guid,
+            "PrimaryIdAttribute": "new_productid"
+        }]}),  # Metadata for _entity_set_from_logical
+
+        # Subsequent calls with different casing use cache (no HTTP calls)
+
+        # DELETE TABLE with lowercase "new_product"
+        (200, {}, {"value": [{  # Get entity by logical
+            "LogicalName": "new_product",
+            "EntitySetName": "new_products",
+            "SchemaName": "new_Product",
+            "MetadataId": table_guid
+        }]}),
+        (200, {}, {}),  # DELETE entity
+    ]
+    c = TestableClient(responses)
+
+    # 1. CREATE TABLE with lowercase
+    result = c._create_table(
+        logical_name="new_product",
+        schema={"new_price": "decimal"},
+        solution_unique_name=None
+    )
+    assert result["entity_set_name"] == "new_products"
+
+    # 2. ADD COLUMN with UPPERCASE - operations normalize the name
+    c._create_columns(
+        logical_name="NEW_PRODUCT",
+        columns={"NEW_DESCRIPTION": "string"}
+    )
+
+    # 3. GET entity set with different casings - first call populates cache
+    entity_set1 = c._entity_set_from_logical("New_Product")
+    calls_after_first = len(c._http.calls)
+
+    # First call may hit server to get metadata (if not already cached from _create_columns)
+    assert entity_set1 == "new_products"
+
+    # Now verify cache was populated with normalized key
+    assert "new_product" in c._entity_metadata_cache
+    cached = c._entity_metadata_cache["new_product"]
+    assert cached["entity_set_name"] == "new_products"
+    assert cached["schema_name"] == "new_Product"
+
+    # 4. Subsequent calls with different casing should use cache
+    entity_set2 = c._entity_set_from_logical("NEW_PRODUCT")
+    entity_set3 = c._entity_set_from_logical("new_product")
+    entity_set4 = c._entity_set_from_logical("NeW_PrOdUcT")
+    calls_after_cache_hits = len(c._http.calls)
+
+    # All should return the same entity set
+    assert entity_set1 == entity_set2 == entity_set3 == entity_set4 == "new_products"
+
+    # Verify no additional HTTP calls were made (all used cache)
+    assert calls_after_cache_hits == calls_after_first, \
+        f"Expected cache hits, but made {calls_after_cache_hits - calls_after_first} additional calls"
+
+    # 5. DELETE TABLE with lowercase - normalized name works
+    c._delete_table("new_product")
+
+    # Verify all operations completed
+    assert len(c._http.calls) > 0
+
+
+def test_mixed_case_cache_reuse():
+    """Test that cache is properly shared across different casing variants.
+
+    Ensures that NEW_PRODUCT, new_product, New_Product all hit the same cache entry.
+    """
+    responses = [
+        (200, {}, MD_SAMPLE_ITEM)  # Only ONE server call should be made
+    ]
+    c = TestableClient(responses)
+
+    # Multiple lookups with different casings
+    md1 = c._get_entity_metadata("new_sampleitem")
+    md2 = c._get_entity_metadata("NEW_SAMPLEITEM")
+    md3 = c._get_entity_metadata("New_SampleItem")
+    md4 = c._get_entity_metadata("NeW_sAmPlEiTeM")
+    md5 = c._get_entity_metadata(" NEW_SAMPLEITEM ")  # With whitespace
+
+    # All should be identical
+    assert md1 == md2 == md3 == md4 == md5
+
+    # Verify only 1 HTTP call was made (all others used cache)
+    assert len(c._http.calls) == 1
+
+    # Verify the cache key is normalized (lowercase)
+    assert "new_sampleitem" in c._entity_metadata_cache
+    assert "NEW_SAMPLEITEM" not in c._entity_metadata_cache
+    assert "New_SampleItem" not in c._entity_metadata_cache
+
+
+def test_attribute_cache_case_insensitive():
+    """Test that attribute schema cache is case-insensitive."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),
+        (200, {}, MD_ATTRIBUTE_TITLE)
+    ]
+    c = TestableClient(responses)
+
+    # First lookup with lowercase
+    schema1 = c._get_attribute_schema_name("new_sampleitem", "new_title")
+    assert schema1 == "new_Title"
+
+    # Second lookup with UPPERCASE - should hit cache
+    schema2 = c._get_attribute_schema_name("NEW_SAMPLEITEM", "NEW_TITLE")
+    assert schema2 == "new_Title"
+
+    # Third lookup with MixedCase - should hit cache
+    schema3 = c._get_attribute_schema_name("New_SampleItem", "New_Title")
+    assert schema3 == "new_Title"
+
+    # Only 2 HTTP calls (entity + attribute lookup), not 6
+    assert len(c._http.calls) == 2
+
+    # Verify cache uses normalized keys
+    assert ("new_sampleitem", "new_title") in c._attribute_schema_cache
+    assert ("NEW_SAMPLEITEM", "NEW_TITLE") not in c._attribute_schema_cache
+
+
+def test_primary_id_attr_case_insensitive():
+    """Test that _primary_id_attr normalizes logical names for cache lookup."""
+    responses = [
+        (200, {}, MD_SAMPLE_ITEM)  # Only ONE server call should be made
+    ]
+    c = TestableClient(responses)
+
+    # Multiple lookups with different casings
+    pid1 = c._primary_id_attr("new_sampleitem")
+    pid2 = c._primary_id_attr("NEW_SAMPLEITEM")
+    pid3 = c._primary_id_attr("New_SampleItem")
+    pid4 = c._primary_id_attr(" new_sampleitem ")  # With whitespace
+
+    # All should return the same primary ID attribute
+    assert pid1 == pid2 == pid3 == pid4 == "new_sampleitemid"
+
+    # Verify only 1 HTTP call was made (all others used cache)
+    assert len(c._http.calls) == 1
+
+    # Verify the cache key is normalized (lowercase)
+    assert "new_sampleitem" in c._entity_metadata_cache
+    assert "NEW_SAMPLEITEM" not in c._entity_metadata_cache
+
+
+def test_logical_to_schema_name_case_insensitive():
+    """Test that _logical_to_schema_name normalizes input for consistent output."""
+    c = TestableClient([])
+
+    # All variations should produce the same SchemaName
+    assert c._logical_to_schema_name("new_sampleitem") == "new_Sampleitem"
+    assert c._logical_to_schema_name("NEW_SAMPLEITEM") == "new_Sampleitem"
+    assert c._logical_to_schema_name("New_SampleItem") == "new_Sampleitem"
+    assert c._logical_to_schema_name(" new_sampleitem ") == "new_Sampleitem"
+
+
+def test_logical_to_schema_name_edge_cases():
+    """Test that _logical_to_schema_name handles edge cases correctly."""
+    c = TestableClient([])
+
+    # Empty string should raise ValueError
+    with pytest.raises(ValueError, match="cannot be empty"):
+        c._logical_to_schema_name("")
+
+    # Whitespace-only should raise ValueError
+    with pytest.raises(ValueError, match="cannot be empty"):
+        c._logical_to_schema_name(" ")
+
+    # Trailing underscore with no suffix should raise ValueError
+    with pytest.raises(ValueError, match="empty part after underscore"):
+        c._logical_to_schema_name("new_")
+
+    # A name without an underscore should raise ValueError: there is no
+    # publisher prefix to split on. This should never happen in practice
+    # because callers validate the presence of an underscore.
+    with pytest.raises(ValueError):
+        c._logical_to_schema_name("account")
+
+    # Multiple underscores - only the first split matters
+    assert c._logical_to_schema_name("new_sample_item") == "new_Sample_item"
+
+
+def test_create_columns_requires_prefix():
+    """Test that _create_columns rejects column names without a publisher prefix."""
+    responses = [
+        (200, {}, {"value": [MD_ENTITY_BY_LOGICAL]}),  # Get entity by logical
+    ]
+    c = TestableClient(responses)
+
+    # Column name without underscore should raise ValueError
+    with pytest.raises(ValueError, match="Column logical name must include publisher prefix"):
+        c._create_columns(
+            logical_name="new_sampleitem",
+            columns={"mycolumn": "string"}  # Missing prefix
+        )
+
+
+def test_create_table_requires_prefix():
+    """Test that _create_table rejects table names without a publisher prefix."""
+    c = TestableClient([])
+
+    # Table name without underscore should raise ValueError
+    with pytest.raises(ValueError, match="Table logical name must include publisher prefix"):
+        c._create_table(
+            logical_name="mytable",  # Missing prefix
+            schema={"new_field": "string"}
+        )
+
+
+if __name__ == "__main__":
+    pytest.main([__file__, "-v"])
diff --git a/tests/unit/test_helpers.py b/tests/unit/test_helpers.py
new file mode 100644
index 0000000..c1b5417
--- /dev/null
+++ b/tests/unit/test_helpers.py
@@ -0,0 +1,166 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+"""
+Shared test utilities for unit tests.
+
+Provides mock objects for authentication, HTTP clients, and common test data
+to reduce duplication across test files.
+""" + +import types +from dataverse_sdk.data.odata import ODataClient + + +class DummyAuth: + """Mock authentication provider for testing. + + Returns a simple token object with an access_token attribute. + """ + def acquire_token(self, scope): + class Token: + access_token = "test_token" + return Token() + + +class DummyHTTPClient: + """Mock HTTP client that returns pre-configured responses. + + Args: + responses: List of (status_code, headers, body) tuples to return in sequence. + + Attributes: + calls: List of (method, url, kwargs) tuples recording all requests made. + """ + def __init__(self, responses): + self._responses = list(responses) # Make a copy + self.calls = [] + + def request(self, method, url, **kwargs): + """Mock HTTP request that returns the next pre-configured response.""" + self.calls.append((method, url, kwargs)) + if not self._responses: + raise AssertionError("No more dummy responses configured") + + status, headers, body = self._responses.pop(0) + resp = types.SimpleNamespace() + resp.status_code = status + resp.headers = headers + resp.text = "" if body is None else ("{}" if isinstance(body, dict) else str(body)) + + def raise_for_status(): + if status >= 400: + raise RuntimeError(f"HTTP {status}") + return None + + def json_func(): + return body if isinstance(body, dict) else {} + + resp.raise_for_status = raise_for_status + resp.json = json_func + return resp + + +class TestableClient(ODataClient): + """ODataClient with mocked HTTP for testing. + + Args: + responses: List of (status_code, headers, body) tuples for the mock HTTP client. + org_url: Organization URL (default: "https://org.example"). + config: Optional config object (default: None). + """ + def __init__(self, responses, org_url="https://org.example", config=None): + super().__init__(DummyAuth(), org_url, config) + self._http = DummyHTTPClient(responses) + + def _convert_labels_to_ints(self, logical_name, record): # pragma: no cover - test shim + """Test shim - no-op conversion for simplicity.""" + return record + + +class DummyConfig: + """Minimal config stub for tests that need config attributes. + + Args: + language_code: Language code for localized labels (default: 1033). + """ + def __init__(self, language_code=1033): + self.language_code = language_code + # HTTP settings referenced during ODataClient construction + self.http_retries = 0 + self.http_backoff = 0 + self.http_timeout = 5 + + +# ============================================================================ +# Common Test Data - Metadata Responses +# ============================================================================ + +def make_entity_metadata(logical_name, entity_set_name, schema_name, primary_id_attr): + """Create a standard EntityDefinitions metadata response. + + Args: + logical_name: Logical name of the entity (e.g., "account"). + entity_set_name: Entity set name for the collection (e.g., "accounts"). + schema_name: Schema name (e.g., "Account"). + primary_id_attr: Primary ID attribute name (e.g., "accountid"). + + Returns: + Dict representing an EntityDefinitions query response with one entity. + """ + return { + "value": [ + { + "LogicalName": logical_name, + "EntitySetName": entity_set_name, + "SchemaName": schema_name, + "PrimaryIdAttribute": primary_id_attr + } + ] + } + + +def make_entity_create_headers(entity_set_name, guid): + """Create standard headers returned by entity create operations. + + Args: + entity_set_name: Entity set name (e.g., "accounts"). + guid: GUID string for the created entity. 
+
+    Returns:
+        Dict with OData-EntityId header.
+    """
+    return {
+        "OData-EntityId": f"https://org.example/api/data/v9.2/{entity_set_name}({guid})"
+    }
+
+
+def make_attribute_metadata(logical_name, schema_name, metadata_id, odata_type=None, attribute_type=None):
+    """Create a standard Attribute metadata response.
+
+    Args:
+        logical_name: Logical name of the attribute (e.g., "new_category").
+        schema_name: Schema name of the attribute (e.g., "new_Category").
+        metadata_id: GUID string for the attribute metadata.
+        odata_type: Optional @odata.type value (e.g., "Microsoft.Dynamics.CRM.PicklistAttributeMetadata").
+        attribute_type: Optional AttributeType value (e.g., "Picklist").
+
+    Returns:
+        Dict representing an Attribute metadata response.
+    """
+    result = {
+        "LogicalName": logical_name,
+        "SchemaName": schema_name,
+        "MetadataId": metadata_id
+    }
+    if odata_type:
+        result["@odata.type"] = odata_type
+    if attribute_type:
+        result["AttributeType"] = attribute_type
+    return result
+
+
+# Common metadata responses for frequently used entities
+MD_ACCOUNT = make_entity_metadata("account", "accounts", "Account", "accountid")
+MD_CONTACT = make_entity_metadata("contact", "contacts", "Contact", "contactid")
+MD_SAMPLE_ITEM = make_entity_metadata("new_sampleitem", "new_sampleitems", "new_Sampleitem", "new_sampleitemid")
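+
+
+# Minimal usage sketch (illustrative only, kept as a comment so it is never
+# collected or run): it shows how make_entity_metadata and TestableClient
+# compose in a new test. The "new_widget" entity and all of its names below
+# are hypothetical; substitute the metadata your test target expects.
+#
+#   def test_widget_metadata_lookup():
+#       responses = [(200, {}, make_entity_metadata(
+#           "new_widget", "new_widgets", "new_Widget", "new_widgetid"))]
+#       c = TestableClient(responses)
+#       md = c._get_entity_metadata("new_widget")
+#       assert md["entity_set_name"] == "new_widgets"
+#       assert len(c._http.calls) == 1  # served once, then answered from cache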