6 changes: 3 additions & 3 deletions pyproject.toml
@@ -55,11 +55,11 @@ dev = [
 
 
 [tool.ruff]
-target-version = "py39"
+target-version = "py310"
 
-# Default + isort
+# Default + isort + pyupgrade
 lint.select = [
-    "E4", "E7", "E9", "F", "I",
+    "E4", "E7", "E9", "F", "I", "UP"
 ]
 
 line-length = 120
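The configuration change above raises the baseline to Python 3.10 and enables the pyupgrade ("UP") rules, which is what drives the mechanical annotation rewrites in the rest of this diff: `ruff check --fix` now rewrites typing-module generics to the PEP 585/604 spellings. A minimal before/after sketch (the `lookup` function below is hypothetical, purely to illustrate the rewrite):

# Before, as accepted under target-version = "py39":
#   from typing import Dict, List, Optional
#   def lookup(index: Dict[str, List[int]], key: Optional[str]) -> Optional[List[int]]: ...
# After `ruff check --fix` with the UP rules under target-version = "py310":
def lookup(index: dict[str, list[int]], key: str | None) -> list[int] | None:
    # No typing imports needed; builtin generics and X | None are native in 3.10+
    return index.get(key) if key is not None else None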
46 changes: 22 additions & 24 deletions src/implicitdict/__init__.py
@@ -4,7 +4,7 @@
 from dataclasses import dataclass
 from datetime import datetime as datetime_type
 from types import UnionType
-from typing import Dict, Literal, Optional, Set, Tuple, Type, Union, get_args, get_origin, get_type_hints
+from typing import Literal, Optional, Union, get_args, get_origin, get_type_hints
 
 import arrow
 import pytimeparse
@@ -14,7 +14,7 @@
 _PARSING_ERRORS = (ValueError, TypeError)
 
 
-def _bubble_up_parse_error(child: Union[ValueError, TypeError], field: str) -> Union[ValueError, TypeError]:
+def _bubble_up_parse_error(child: ValueError | TypeError, field: str) -> ValueError | TypeError:
     location_regex = r"^At ([A-Za-z0-9_.[\]]*):((?:.|[\n\r])*)$"
     m = re.search(location_regex, str(child))
     if m:
@@ -90,7 +90,7 @@ def __init__(self, **kwargs):
     """
 
     @classmethod
-    def parse(cls, source: Dict, parse_type: Type):
+    def parse(cls, source: dict, parse_type: type):
         if not isinstance(source, dict):
             raise ValueError(
                 f"Expected to find dictionary data to populate {parse_type.__name__} object but instead found {type(source).__name__} type"
@@ -109,7 +109,7 @@ def parse(cls, source: Dict, parse_type: Type):
                 kwargs[key] = value
         return parse_type(**kwargs)
 
-    def __init__(self, previous_instance: Optional[dict] = None, **kwargs):
+    def __init__(self, previous_instance: dict | None = None, **kwargs):
         ancestor_kwargs = {}
         subtype = type(self)
 
@@ -136,48 +136,46 @@ def __init__(self, previous_instance: Optional[dict] = None, **kwargs):
         for key in all_fields:
             if key not in provided_values:
                 if hasattr(subtype, key):
-                    ancestor_kwargs[key] = super(ImplicitDict, self).__getattribute__(key)
+                    ancestor_kwargs[key] = super().__getattribute__(key)
 
         # Make sure all fields without a default and not labeled Optional were provided
         for key in all_fields:
             if key not in ancestor_kwargs and key not in optional_fields:
-                raise ValueError('Required field "{}" not specified in {}'.format(key, subtype.__name__))
+                raise ValueError(f'Required field "{key}" not specified in {subtype.__name__}')
 
-        super(ImplicitDict, self).__init__(**ancestor_kwargs)
+        super().__init__(**ancestor_kwargs)
 
     def __getattribute__(self, item):
         self_type = type(self)
         if hasattr(self_type, _KEY_FIELDS_INFO):
-            fields_info_by_type: Dict[str, FieldsInfo] = getattr(self_type, _KEY_FIELDS_INFO)
+            fields_info_by_type: dict[str, FieldsInfo] = getattr(self_type, _KEY_FIELDS_INFO)
             self_type_name = _fullname(self_type)
             if self_type_name in fields_info_by_type:
                 if item in fields_info_by_type[self_type_name].all_fields:
                     try:
                         return self[item]
                     except KeyError:
                         raise AttributeError
-        return super(ImplicitDict, self).__getattribute__(item)
+        return super().__getattribute__(item)
 
     def __setattr__(self, key, value):
         self_type = type(self)
         if hasattr(self_type, _KEY_FIELDS_INFO):
-            fields_info_by_type: Dict[str, FieldsInfo] = getattr(self_type, _KEY_FIELDS_INFO)
+            fields_info_by_type: dict[str, FieldsInfo] = getattr(self_type, _KEY_FIELDS_INFO)
             self_type_name = _fullname(self_type)
             if self_type_name in fields_info_by_type:
                 if key in fields_info_by_type[self_type_name].all_fields:
                     self[key] = value
                     return
                 else:
-                    raise AttributeError(
-                        'Attribute "{}" is not defined for "{}" object'.format(key, type(self).__name__)
-                    )
-        super(ImplicitDict, self).__setattr__(key, value)
+                    raise AttributeError(f'Attribute "{key}" is not defined for "{type(self).__name__}" object')
+        super().__setattr__(key, value)
 
     def has_field_with_value(self, field_name: str) -> bool:
         return field_name in self and self[field_name] is not None
 
 
-def _parse_value(value, value_type: Type):
+def _parse_value(value, value_type: type):
     generic_type = get_origin(value_type)
     if generic_type:
         # Type is generic
@@ -225,7 +223,7 @@ def _parse_value(value, value_type: Type):
         elif generic_type is Literal and len(arg_types) == 1:
             # Type is a Literal (parsed value must match specified value)
             if value != arg_types[0]:
-                raise ValueError("Value {} does not match required Literal {}".format(value, arg_types[0]))
+                raise ValueError(f"Value {value} does not match required Literal {arg_types[0]}")
             return value
 
         else:
@@ -244,12 +242,12 @@ def _parse_value(value, value_type: Type):
 
 
 @dataclass
-class FieldsInfo(object):
-    all_fields: Set[str]
-    optional_fields: Set[str]
+class FieldsInfo:
+    all_fields: set[str]
+    optional_fields: set[str]
 
 
-def _get_fields(subtype: Type) -> Tuple[Set[str], Set[str]]:
+def _get_fields(subtype: type) -> tuple[set[str], set[str]]:
     """Determine all fields and optional fields for the specified type.
 
     When all & optional fields are determined for a type, the result is cached
@@ -262,7 +260,7 @@ def _get_fields(subtype: Type) -> Tuple[Set[str], Set[str]]:
     """
     if not hasattr(subtype, _KEY_FIELDS_INFO):
         setattr(subtype, _KEY_FIELDS_INFO, {})
-    fields_info_by_type: Dict[str, FieldsInfo] = getattr(subtype, _KEY_FIELDS_INFO)
+    fields_info_by_type: dict[str, FieldsInfo] = getattr(subtype, _KEY_FIELDS_INFO)
     subtype_name = _fullname(subtype)
     if subtype_name not in fields_info_by_type:
         # Enumerate fields defined for superclasses
@@ -310,7 +308,7 @@ def _get_fields(subtype: Type) -> Tuple[Set[str], Set[str]]:
     return result.all_fields, result.optional_fields
 
 
-def _fullname(class_type: Type) -> str:
+def _fullname(class_type: type) -> str:
     module = class_type.__module__
     if module == "builtins":
         return class_type.__qualname__  # avoid outputs like 'builtins.str'
@@ -323,7 +321,7 @@ class StringBasedTimeDelta(str):
     timedelta: datetime.timedelta
     """Timedelta matching the string value of this instance."""
 
-    def __new__(cls, value: Union[str, datetime.timedelta, int, float], reformat: bool = False):
+    def __new__(cls, value: str | datetime.timedelta | int | float, reformat: bool = False):
         """Create a new StringBasedTimeDelta.
 
         Args:
@@ -353,7 +351,7 @@ class StringBasedDateTime(str):
     datetime: datetime.datetime
     """Timezone-aware datetime matching the string value of this instance."""
 
-    def __new__(cls, value: Union[str, datetime_type, arrow.Arrow], reformat: bool = False):
+    def __new__(cls, value: str | datetime_type | arrow.Arrow, reformat: bool = False):
         """Create a new StringBasedDateTime instance.
 
         Args:
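For context on what these annotation changes preserve: ImplicitDict is expected to treat a field typed `X | None` as optional, just like the `Optional[X]` spelling it replaces (the test changes below rely on exactly this). A minimal usage sketch (the `Waypoint` type and its fields are invented for illustration, assuming Python 3.10+):

from implicitdict import ImplicitDict

class Waypoint(ImplicitDict):
    name: str  # required: no default and not unioned with None
    altitude_m: float | None  # optional, equivalent to Optional[float]

# parse() populates a typed instance from plain dict data
wp: Waypoint = ImplicitDict.parse({"name": "alpha"}, Waypoint)
assert wp.name == "alpha"
assert not wp.has_field_with_value("altitude_m")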
23 changes: 12 additions & 11 deletions src/implicitdict/jsonschema.py
@@ -2,39 +2,40 @@
 import inspect
 import json
 import re
+from collections.abc import Callable
 from dataclasses import dataclass
 from datetime import datetime
 from types import UnionType
-from typing import Callable, Dict, Literal, Optional, Tuple, Type, Union, get_args, get_origin, get_type_hints
+from typing import Literal, Union, get_args, get_origin, get_type_hints
 
 from . import ImplicitDict, StringBasedDateTime, StringBasedTimeDelta, _fullname, _get_fields
 
 
 @dataclass
-class SchemaVars(object):
+class SchemaVars:
     name: str
     """Unique name that can be used to reference this type/schema."""
 
-    path_to: Optional[Callable[[Type, Type], str]] = None
+    path_to: Callable[[type, type], str] | None = None
     """Function to compute $ref path to schema describing the first type from the schema describing the second type"""
 
-    schema_id: Optional[str] = None
+    schema_id: str | None = None
     """ID of the schema describing this type. Will be used to populate $schema."""
 
-    description: Optional[str] = None
+    description: str | None = None
     """Description of this type/schema."""
 
 
-SchemaVarsResolver = Callable[[Type], SchemaVars]
+SchemaVarsResolver = Callable[[type], SchemaVars]
 """Function producing the characteristics of a schema (SchemaVars) for a given Type."""
 
 _implicitdict_doc = inspect.getdoc(ImplicitDict)
 
 
 def make_json_schema(
-    schema_type: Type[ImplicitDict],
+    schema_type: type[ImplicitDict],
     schema_vars_resolver: SchemaVarsResolver,
-    schema_repository: Dict[str, dict],
+    schema_repository: dict[str, dict],
 ) -> None:
     """Create JSON Schema for the specified schema type and all dependencies.
 
@@ -103,8 +104,8 @@
 
 
 def _schema_for(
-    value_type: Type, schema_vars_resolver: SchemaVarsResolver, schema_repository: Dict[str, dict], context: Type
-) -> Tuple[dict, bool]:
+    value_type: type, schema_vars_resolver: SchemaVarsResolver, schema_repository: dict[str, dict], context: type
+) -> tuple[dict, bool]:
     """Get the JSON Schema representation of the value_type.
 
     Args:
@@ -198,7 +199,7 @@ def _schema_for(
     raise NotImplementedError(f"Automatic JSON schema generation for {value_type} type is not yet implemented")
 
 
-def _field_docs_for(t: Type[ImplicitDict]) -> Dict[str, str]:
+def _field_docs_for(t: type[ImplicitDict]) -> dict[str, str]:
     # Curse Guido for rejecting PEP224! Fine, we'll do it ourselves.
     result = {}
     src = inspect.getsource(t)
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -13,7 +13,7 @@ def _duplicate_tests_with_future_annotations():
         pass
     for filename in os.listdir(this_folder):
         if (filename.startswith("test_") and filename.endswith(".py")) or filename.endswith("_test.py"):
-            with open(os.path.join(this_folder, filename), "r") as f:
+            with open(os.path.join(this_folder, filename)) as f:
                 code = f.read()
             with open(os.path.join(future_annotations_folder, filename), "w") as f:
                 f.write("from __future__ import annotations\n")
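The conftest helper above duplicates every test module with `from __future__ import annotations` prepended, so the suite exercises both evaluated annotations and string-form (PEP 563) annotations. A standalone sketch of the difference this covers (the `Node` class is hypothetical, assuming Python 3.10+):

from __future__ import annotations  # PEP 563: annotations are stored as strings

from typing import get_type_hints

class Node:
    children: list[Node] | None  # self-reference works because nothing is evaluated here
    value: int = 0

# get_type_hints() evaluates the stored strings back into real types on demand
print(get_type_hints(Node))  # {'children': list[Node] | None, 'value': <class 'int'>}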
7 changes: 3 additions & 4 deletions tests/test_docpattern.py
@@ -1,5 +1,4 @@
 import multiprocessing
-from typing import List, Optional, Type
 
 from implicitdict import ImplicitDict
 from implicitdict.jsonschema import SchemaVars, make_json_schema
@@ -10,10 +9,10 @@ class ResponseType(ImplicitDict):
 
 
 class Query(ImplicitDict):
-    participant_id: Optional[str]
+    participant_id: str | None
     """If specified, identifier of the USS/participant hosting the server involved in this query."""
 
-    def parse_json_result(self, parse_type: Type[ResponseType]) -> ResponseType:
+    def parse_json_result(self, parse_type: type[ResponseType]) -> ResponseType:
         """Parses the JSON result into the specified type.
 
         Args:
@@ -35,7 +34,7 @@ def parse_json_result(self, parse_type: Type[ResponseType]) -> ResponseType:
 class QueryError(RuntimeError):
     """Error encountered when interacting with a server in the UTM ecosystem."""
 
-    queries: List[Query]
+    queries: list[Query]
 
 
 def _perform_docstring_parsing_test():
7 changes: 3 additions & 4 deletions tests/test_jsonschema.py
@@ -1,5 +1,4 @@
 import json
-from typing import Type
 
 import jsonschema
 
@@ -19,16 +18,16 @@
 )
 
 
-def _resolver(t: Type) -> SchemaVars:
-    def path_to(t_dest: Type, t_src: Type) -> str:
+def _resolver(t: type) -> SchemaVars:
+    def path_to(t_dest: type, t_src: type) -> str:
         return "#/definitions/" + t_dest.__module__ + t_dest.__qualname__
 
     full_name = t.__module__ + t.__qualname__
 
     return SchemaVars(name=full_name, path_to=path_to)
 
 
-def _verify_schema_validation(obj, obj_type: Type[ImplicitDict]) -> None:
+def _verify_schema_validation(obj, obj_type: type[ImplicitDict]) -> None:
     repo = {}
     implicitdict.jsonschema.make_json_schema(obj_type, _resolver, repo)
 
12 changes: 6 additions & 6 deletions tests/test_normal_usage.py
@@ -1,6 +1,6 @@
 import json
 from enum import Enum
-from typing import Dict, List, Literal, Optional
+from typing import Literal
 
 import pytest
 
@@ -60,7 +60,7 @@ class Features(ImplicitDict):
     t_start: StringBasedDateTime
     my_duration: StringBasedTimeDelta
     my_literal: Literal["Must be this string"]
-    nested: Optional[NormalUsageData]
+    nested: NormalUsageData | None
 
 
 def test_features():
@@ -101,10 +101,10 @@ def test_features():
 
 
 class NestedStructures(ImplicitDict):
-    my_list: List[NormalUsageData]
-    my_list_2: List[List[int]]
-    my_list_3: List[List[List[int]]]
-    my_dict: Dict[str, List[float]]
+    my_list: list[NormalUsageData]
+    my_list_2: list[list[int]]
+    my_list_3: list[list[list[int]]]
+    my_dict: dict[str, list[float]]
 
 
 def test_nested_structures():
3 changes: 1 addition & 2 deletions tests/test_stacktrace.py
@@ -1,7 +1,6 @@
 from __future__ import annotations
 
 import json
-from typing import List, Optional
 
 import pytest
 
@@ -10,7 +9,7 @@
 
 # This object must be defined with future annotations as Python 3.8 will not resolve string-based forward references correctly
 class MassiveNestingData(ImplicitDict):
-    children: Optional[List[MassiveNestingData]]
+    children: list[MassiveNestingData] | None
     foo: str
     bar: int = 0
 