diff --git a/docs/conf.py b/docs/conf.py index 3e9793e..699fe6e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,5 +1,4 @@ #!/usr/bin/env python3 -# -*- coding: utf-8 -*- # # This file is execfile()d with the current directory set to its containing dir. # @@ -9,8 +8,8 @@ # All configuration values have a default; values that are commented out # serve to show the default. -import sys import os +import sys # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the diff --git a/minfraud/__init__.py b/minfraud/__init__.py index b8a39c8..33bb785 100644 --- a/minfraud/__init__.py +++ b/minfraud/__init__.py @@ -7,14 +7,13 @@ # flake8: noqa: F401 from .errors import ( - MinFraudError, AuthenticationError, HTTPError, - InvalidRequestError, InsufficientFundsError, + InvalidRequestError, + MinFraudError, ) - -from .webservice import AsyncClient, Client from .version import __version__ +from .webservice import AsyncClient, Client __author__ = "Gregory Oschwald" diff --git a/minfraud/models.py b/minfraud/models.py index e4082a6..3d83cb2 100644 --- a/minfraud/models.py +++ b/minfraud/models.py @@ -7,25 +7,27 @@ """ # pylint:disable=too-many-lines,too-many-instance-attributes,too-many-locals -from typing import Dict, List, Optional, Sequence +from collections.abc import Sequence +from typing import Optional -from geoip2.mixins import SimpleEquality import geoip2.models import geoip2.records -class _Serializable(SimpleEquality): - def to_dict(self): - """Returns a dict of the object suitable for serialization""" +class _Serializable: + def __eq__(self, other: object) -> bool: + return isinstance(other, self.__class__) and self.to_dict() == other.to_dict() + + def __ne__(self, other: object) -> bool: + return not self.__eq__(other) + + def to_dict(self) -> dict: + """Returns a dict of the object suitable for serialization.""" result = {} for key, value in self.__dict__.items(): if hasattr(value, "to_dict") and callable(value.to_dict): if d := value.to_dict(): result[key] = d - elif hasattr(value, "raw"): - # geoip2 uses "raw" for historical reasons - if d := value.raw: - result[key] = d elif isinstance(value, list): ls = [] for e in value: @@ -82,7 +84,9 @@ class IPRiskReason(_Serializable): code: Optional[str] reason: Optional[str] - def __init__(self, code: Optional[str] = None, reason: Optional[str] = None): + def __init__( + self, code: Optional[str] = None, reason: Optional[str] = None + ) -> None: self.code = code self.reason = reason @@ -112,7 +116,7 @@ class GeoIP2Location(geoip2.records.Location): local_time: Optional[str] def __init__(self, *args, **kwargs) -> None: - self.local_time = kwargs.get("local_time", None) + self.local_time = kwargs.get("local_time") super().__init__(*args, **kwargs) @@ -196,16 +200,16 @@ class IPAddress(geoip2.models.Insights): location: GeoIP2Location risk: Optional[float] - risk_reasons: List[IPRiskReason] + risk_reasons: list[IPRiskReason] def __init__( self, locales: Optional[Sequence[str]], *, - country: Optional[Dict] = None, - location: Optional[Dict] = None, + country: Optional[dict] = None, + location: Optional[dict] = None, risk: Optional[float] = None, - risk_reasons: Optional[List[Dict]] = None, + risk_reasons: Optional[list[dict]] = None, **kwargs, ) -> None: # For raw attribute @@ -218,7 +222,7 @@ def __init__( if risk_reasons is not None: kwargs["risk_reasons"] = risk_reasons - super().__init__(kwargs, locales=list(locales or [])) + super().__init__(locales, **kwargs) self.location = GeoIP2Location(**(location or {})) self.risk = risk self.risk_reasons = [IPRiskReason(**x) for x in risk_reasons or []] @@ -237,7 +241,7 @@ class ScoreIPAddress(_Serializable): risk: Optional[float] - def __init__(self, *, risk: Optional[float] = None, **_): + def __init__(self, *, risk: Optional[float] = None, **_) -> None: self.risk = risk @@ -292,7 +296,7 @@ def __init__( phone_number: Optional[str] = None, matches_provided_phone_number: Optional[bool] = None, **_, - ): + ) -> None: self.name = name self.matches_provided_name = matches_provided_name self.phone_number = phone_number @@ -352,7 +356,7 @@ def __init__( last_seen: Optional[str] = None, local_time: Optional[str] = None, **_, - ): + ) -> None: self.confidence = confidence self.id = id self.last_seen = last_seen @@ -402,7 +406,7 @@ def __init__( reason: Optional[str] = None, rule_label: Optional[str] = None, **_, - ): + ) -> None: self.action = action self.reason = reason self.rule_label = rule_label @@ -423,7 +427,7 @@ class EmailDomain(_Serializable): first_seen: Optional[str] - def __init__(self, *, first_seen: Optional[str] = None, **_): + def __init__(self, *, first_seen: Optional[str] = None, **_) -> None: self.first_seen = first_seen @@ -477,12 +481,12 @@ class Email(_Serializable): def __init__( self, - domain: Optional[Dict] = None, + domain: Optional[dict] = None, first_seen: Optional[str] = None, is_disposable: Optional[bool] = None, is_free: Optional[bool] = None, is_high_risk: Optional[bool] = None, - ): + ) -> None: self.domain = EmailDomain(**(domain or {})) self.first_seen = first_seen self.is_disposable = is_disposable @@ -564,7 +568,7 @@ class CreditCard(_Serializable): def __init__( self, - issuer: Optional[Dict] = None, + issuer: Optional[dict] = None, country: Optional[str] = None, brand: Optional[str] = None, is_business: Optional[bool] = None, @@ -573,7 +577,7 @@ def __init__( is_virtual: Optional[bool] = None, # pylint:disable=redefined-builtin type: Optional[str] = None, - ): + ) -> None: self.issuer = Issuer(**(issuer or {})) self.country = country self.brand = brand @@ -642,7 +646,7 @@ def __init__( distance_to_ip_location: Optional[int] = None, is_in_ip_country: Optional[bool] = None, **_, - ): + ) -> None: self.is_postal_in_city = is_postal_in_city self.latitude = latitude self.longitude = longitude @@ -729,7 +733,7 @@ def __init__( is_high_risk: Optional[bool] = None, distance_to_billing_address: Optional[int] = None, **_, - ): + ) -> None: self.is_postal_in_city = is_postal_in_city self.latitude = latitude self.longitude = longitude @@ -789,7 +793,7 @@ def __init__( network_operator: Optional[str] = None, number_type: Optional[str] = None, **_, - ): + ) -> None: self.country = country self.is_voip = is_voip self.network_operator = network_operator @@ -837,7 +841,7 @@ def __init__( warning: Optional[str] = None, input_pointer: Optional[str] = None, **_, - ): + ) -> None: self.code = code self.warning = warning self.input_pointer = input_pointer @@ -1060,7 +1064,7 @@ def __init__( shipping_address_distance_to_ip_location: Optional[float] = None, time_of_day: Optional[float] = None, **_, - ): + ) -> None: self.avs_result = avs_result self.billing_address = billing_address self.billing_address_distance_to_ip_location = ( @@ -1174,8 +1178,12 @@ class Reason(_Serializable): reason: Optional[str] def __init__( - self, *, code: Optional[str] = None, reason: Optional[str] = None, **_ - ): + self, + *, + code: Optional[str] = None, + reason: Optional[str] = None, + **_, + ) -> None: self.code = code self.reason = reason @@ -1202,15 +1210,15 @@ class RiskScoreReason(_Serializable): """ multiplier: float - reasons: List[Reason] + reasons: list[Reason] def __init__( self, *, multiplier: float, - reasons: Optional[List] = None, + reasons: Optional[list] = None, **_, - ): + ) -> None: self.multiplier = multiplier self.reasons = [Reason(**x) for x in reasons or []] @@ -1357,32 +1365,32 @@ class Factors(_Serializable): shipping_address: ShippingAddress shipping_phone: Phone subscores: Subscores - warnings: List[ServiceWarning] - risk_score_reasons: List[RiskScoreReason] + warnings: list[ServiceWarning] + risk_score_reasons: list[RiskScoreReason] def __init__( self, locales: Sequence[str], *, - billing_address: Optional[Dict] = None, - billing_phone: Optional[Dict] = None, - credit_card: Optional[Dict] = None, - disposition: Optional[Dict] = None, + billing_address: Optional[dict] = None, + billing_phone: Optional[dict] = None, + credit_card: Optional[dict] = None, + disposition: Optional[dict] = None, funds_remaining: float, - device: Optional[Dict] = None, - email: Optional[Dict] = None, + device: Optional[dict] = None, + email: Optional[dict] = None, # pylint:disable=redefined-builtin id: str, - ip_address: Optional[Dict] = None, + ip_address: Optional[dict] = None, queries_remaining: int, risk_score: float, - shipping_address: Optional[Dict] = None, - shipping_phone: Optional[Dict] = None, - subscores: Optional[Dict] = None, - warnings: Optional[List[Dict]] = None, - risk_score_reasons: Optional[List[Dict]] = None, + shipping_address: Optional[dict] = None, + shipping_phone: Optional[dict] = None, + subscores: Optional[dict] = None, + warnings: Optional[list[dict]] = None, + risk_score_reasons: Optional[list[dict]] = None, **_, - ): + ) -> None: self.billing_address = BillingAddress(**(billing_address or {})) self.billing_phone = Phone(**(billing_phone or {})) self.credit_card = CreditCard(**(credit_card or {})) @@ -1524,29 +1532,29 @@ class Insights(_Serializable): risk_score: float shipping_address: ShippingAddress shipping_phone: Phone - warnings: List[ServiceWarning] + warnings: list[ServiceWarning] def __init__( self, locales: Sequence[str], *, - billing_address: Optional[Dict] = None, - billing_phone: Optional[Dict] = None, - credit_card: Optional[Dict] = None, - device: Optional[Dict] = None, - disposition: Optional[Dict] = None, - email: Optional[Dict] = None, + billing_address: Optional[dict] = None, + billing_phone: Optional[dict] = None, + credit_card: Optional[dict] = None, + device: Optional[dict] = None, + disposition: Optional[dict] = None, + email: Optional[dict] = None, funds_remaining: float, # pylint:disable=redefined-builtin id: str, - ip_address: Optional[Dict] = None, + ip_address: Optional[dict] = None, queries_remaining: int, risk_score: float, - shipping_address: Optional[Dict] = None, - shipping_phone: Optional[Dict] = None, - warnings: Optional[List[Dict]] = None, + shipping_address: Optional[dict] = None, + shipping_phone: Optional[dict] = None, + warnings: Optional[list[dict]] = None, **_, - ): + ) -> None: self.billing_address = BillingAddress(**(billing_address or {})) self.billing_phone = Phone(**(billing_phone or {})) self.credit_card = CreditCard(**(credit_card or {})) @@ -1627,21 +1635,21 @@ class Score(_Serializable): ip_address: ScoreIPAddress queries_remaining: int risk_score: float - warnings: List[ServiceWarning] + warnings: list[ServiceWarning] def __init__( self, *, - disposition: Optional[Dict] = None, + disposition: Optional[dict] = None, funds_remaining: float, # pylint:disable=redefined-builtin id: str, - ip_address: Optional[Dict] = None, + ip_address: Optional[dict] = None, queries_remaining: int, risk_score: float, - warnings: Optional[List[Dict]] = None, + warnings: Optional[list[dict]] = None, **_, - ): + ) -> None: self.disposition = Disposition(**(disposition or {})) self.funds_remaining = funds_remaining self.id = id diff --git a/minfraud/request.py b/minfraud/request.py index 0d2a468..3ffef1b 100644 --- a/minfraud/request.py +++ b/minfraud/request.py @@ -5,11 +5,12 @@ """ -import re -import warnings import hashlib +import re import unicodedata -from typing import Any, Dict +import warnings +from typing import Any, Optional + from voluptuous import MultipleInvalid from .errors import InvalidRequestError @@ -263,8 +264,8 @@ } -def prepare_report(request: Dict[str, Any], validate: bool): - """Validate and prepare minFraud report""" +def prepare_report(request: dict[str, Any], validate: bool): + """Validate and prepare minFraud report.""" cleaned_request = _copy_and_clean(request) if validate: try: @@ -275,11 +276,11 @@ def prepare_report(request: Dict[str, Any], validate: bool): def prepare_transaction( - request: Dict[str, Any], + request: dict[str, Any], validate: bool, hash_email: bool, ): - """Validate and prepare minFraud transaction""" + """Validate and prepare minFraud transaction.""" cleaned_request = _copy_and_clean(request) if validate: try: @@ -299,25 +300,26 @@ def prepare_transaction( def _copy_and_clean(data: Any) -> Any: """Create a copy of the data structure with Nones removed.""" if isinstance(data, dict): - return dict((k, _copy_and_clean(v)) for (k, v) in data.items() if v is not None) + return {k: _copy_and_clean(v) for (k, v) in data.items() if v is not None} if isinstance(data, (list, set, tuple)): return [_copy_and_clean(x) for x in data if x is not None] return data -def clean_credit_card(credit_card): - """Clean the credit_card input of a transaction request""" +def clean_credit_card(credit_card) -> None: + """Clean the credit_card input of a transaction request.""" last4 = credit_card.pop("last_4_digits", None) if last4: warnings.warn( "last_4_digits has been deprecated in favor of last_digits", DeprecationWarning, + stacklevel=2, ) credit_card["last_digits"] = last4 -def maybe_hash_email(transaction): - """Hash email address in transaction, if present""" +def maybe_hash_email(transaction) -> None: + """Hash email address in transaction, if present.""" try: email = transaction["email"] address = email["address"] @@ -337,7 +339,7 @@ def maybe_hash_email(transaction): email["address"] = hashlib.md5(address.encode("UTF-8")).hexdigest() -def _clean_domain(domain): +def _clean_domain(domain: str) -> str: domain = domain.strip().rstrip(".").encode("idna").decode("ASCII") domain = re.sub(r"(?:\.com){2,}$", ".com", domain) @@ -345,33 +347,30 @@ def _clean_domain(domain): idx = domain.rfind(".") if idx != -1: - tld = domain[idx + 1 :] # noqa - if tld in _TYPO_TLDS: - domain = domain[:idx] + "." + _TYPO_TLDS.get(tld) + # flake8: noqa: E203 + tld = domain[idx + 1 :] + if typo_tld := _TYPO_TLDS.get(tld): + domain = domain[:idx] + "." + typo_tld domain = _TYPO_DOMAINS.get(domain, domain) - domain = _EQUIVALENT_DOMAINS.get(domain, domain) - - return domain + return _EQUIVALENT_DOMAINS.get(domain, domain) -def _clean_email(address): +def _clean_email(address: str) -> tuple[Optional[str], Optional[str]]: address = address.lower().strip() at_idx = address.rfind("@") if at_idx == -1: return None, None - domain = _clean_domain(address[at_idx + 1 :]) # noqa + # flake8: noqa: E203 + domain = _clean_domain(address[at_idx + 1 :]) local_part = address[:at_idx] local_part = unicodedata.normalize("NFC", local_part) # Strip off aliased part of email address. - if domain in _YAHOO_DOMAINS: - divider = "-" - else: - divider = "+" + divider = "-" if domain in _YAHOO_DOMAINS else "+" alias_idx = local_part.find(divider) if alias_idx > 0: diff --git a/minfraud/validation.py b/minfraud/validation.py index dcd704f..b7b2dce 100644 --- a/minfraud/validation.py +++ b/minfraud/validation.py @@ -9,13 +9,23 @@ import ipaddress import re -import uuid import urllib.parse +import uuid from decimal import Decimal from typing import Optional from email_validator import validate_email # type: ignore -from voluptuous import All, Any, In, Match, MultipleInvalid, Range, Required, Schema +from voluptuous import ( + All, + Any, + In, + Match, + MultipleInvalid, + Range, + Required, + RequiredFieldInvalid, + Schema, +) from voluptuous.error import UrlInvalid # Pylint doesn't like the private function type naming for the callable @@ -31,7 +41,8 @@ _custom_input_value = Any( All(str, Match(r"^[^\n]{1,255}\Z")), All( - _any_number, Range(min=-1e13, max=1e13, min_included=False, max_included=False) + _any_number, + Range(min=-1e13, max=1e13, min_included=False, max_included=False), ), bool, ) @@ -41,7 +52,8 @@ _country_code = All(str, Match(r"^[A-Z]{2}$")) _telephone_country_code = Any( - All(str, Match("^[0-9]{1,4}$")), All(int, Range(min=1, max=9999)) + All(str, Match("^[0-9]{1,4}$")), + All(int, Range(min=1, max=9999)), ) _subdivision_iso_code = All(str, Match(r"^[0-9A-Z]{1,4}$")) @@ -251,7 +263,7 @@ def _hostname(hostname: str) -> str: "windcave", "wirecard", "worldpay", - ] + ], ) _single_char = Match("^[A-Za-z0-9]$") @@ -268,7 +280,7 @@ def _credit_card_token(s: str) -> str: _rfc3339_datetime = Match( - r"(?a)\A\d{4}-\d{2}-\d{2}[Tt]\d{2}:\d{2}:\d{2}(\.\d+)?(?:[Zz]|[+-]\d{2}:\d{2})\Z" + r"(?a)\A\d{4}-\d{2}-\d{2}[Tt]\d{2}:\d{2}:\d{2}(\.\d+)?(?:[Zz]|[+-]\d{2}:\d{2})\Z", ) @@ -283,7 +295,7 @@ def _credit_card_token(s: str) -> str: "recurring_purchase", "referral", "survey", - ] + ], ) _currency_code = Match("^[A-Z]{3}$") @@ -409,19 +421,26 @@ def _transaction_id(s: Optional[str]) -> str: ) -def _validate_at_least_one_identifier_field(report): +def _validate_at_least_one_identifier_field(report) -> bool: optional_fields = ["ip_address", "maxmind_id", "minfraud_id", "transaction_id"] if not any(field in report for field in optional_fields): # We return MultipleInvalid instead of ValueError to be consistent with what # voluptuous returns. - raise MultipleInvalid( + msg = ( "The report must contain at least one of the following fields: " "'ip_address', 'maxmind_id', 'minfraud_id', 'transaction_id'." ) + raise MultipleInvalid( + [ + RequiredFieldInvalid( + msg, + ) + ] + ) return True -def validate_report(report): +def validate_report(report) -> bool: """Validate minFraud Transaction Report fields.""" _validate_report_schema(report) _validate_at_least_one_identifier_field(report) diff --git a/minfraud/version.py b/minfraud/version.py index dfcb107..b4c6da7 100644 --- a/minfraud/version.py +++ b/minfraud/version.py @@ -1,3 +1,3 @@ -"""Internal module for version (to prevent cyclic imports)""" +"""Internal module for version (to prevent cyclic imports).""" __version__ = "2.12.0b1" diff --git a/minfraud/webservice.py b/minfraud/webservice.py index 7e6bcb0..84e3767 100644 --- a/minfraud/webservice.py +++ b/minfraud/webservice.py @@ -7,8 +7,9 @@ """ import json +from collections.abc import Sequence from functools import partial -from typing import Any, Callable, cast, Dict, Optional, Sequence, Union +from typing import Any, Callable, Optional, Union, cast import aiohttp import aiohttp.http @@ -16,18 +17,17 @@ import requests.utils from requests.models import Response -from .version import __version__ from .errors import ( - MinFraudError, - HTTPError, AuthenticationError, + HTTPError, InsufficientFundsError, InvalidRequestError, + MinFraudError, PermissionRequiredError, ) from .models import Factors, Insights, Score from .request import prepare_report, prepare_transaction - +from .version import __version__ _AIOHTTP_UA = f"minFraud-API/{__version__} {aiohttp.http.SERVER_SOFTWARE}" @@ -61,10 +61,10 @@ def __init__( self._license_key = license_key self._timeout = timeout base_uri = f"{_SCHEME}://{host}/minfraud/v2.0" - self._score_uri = "/".join([base_uri, "score"]) - self._insights_uri = "/".join([base_uri, "insights"]) - self._factors_uri = "/".join([base_uri, "factors"]) - self._report_uri = "/".join([base_uri, "transactions", "report"]) + self._score_uri = f"{base_uri}/score" + self._insights_uri = f"{base_uri}/insights" + self._factors_uri = f"{base_uri}/factors" + self._report_uri = f"{base_uri}/transactions/report" def _handle_success( self, @@ -85,7 +85,11 @@ def _handle_success( return model_class(**decoded_body) # type: ignore def _exception_for_error( - self, status: int, content_type: Optional[str], raw_body: str, uri: str + self, + status: int, + content_type: Optional[str], + raw_body: str, + uri: str, ) -> Union[ AuthenticationError, InsufficientFundsError, @@ -94,7 +98,6 @@ def _exception_for_error( PermissionRequiredError, ]: """Returns the exception for the error responses.""" - if 400 <= status < 500: return self._exception_for_4xx_status(status, content_type, raw_body, uri) if 500 <= status < 600: @@ -102,7 +105,11 @@ def _exception_for_error( return self._exception_for_unexpected_status(status, raw_body, uri) def _exception_for_4xx_status( - self, status: int, content_type: Optional[str], raw_body: str, uri: str + self, + status: int, + content_type: Optional[str], + raw_body: str, + uri: str, ) -> Union[ AuthenticationError, InsufficientFundsError, @@ -113,7 +120,10 @@ def _exception_for_4xx_status( """Returns exception for error responses with 4xx status codes.""" if not raw_body: return HTTPError( - f"Received a {status} error with no body", status, uri, raw_body + f"Received a {status} error with no body", + status, + uri, + raw_body, ) if content_type is None or content_type.find("json") == -1: return HTTPError( @@ -135,7 +145,10 @@ def _exception_for_4xx_status( if "code" in decoded_body and "error" in decoded_body: return self._exception_for_web_service_error( - decoded_body.get("error"), decoded_body.get("code"), status, uri + decoded_body.get("error"), + decoded_body.get("code"), + status, + uri, ) return HTTPError( "Error response contains JSON but it does not " @@ -147,7 +160,10 @@ def _exception_for_4xx_status( @staticmethod def _exception_for_web_service_error( - message: str, code: str, status: int, uri: str + message: str, + code: str, + status: int, + uri: str, ) -> Union[ InvalidRequestError, AuthenticationError, @@ -238,7 +254,7 @@ def __init__( async def factors( self, - transaction: Dict[str, Any], + transaction: dict[str, Any], validate: bool = True, hash_email: bool = False, ) -> Factors: @@ -277,7 +293,7 @@ async def factors( async def insights( self, - transaction: Dict[str, Any], + transaction: dict[str, Any], validate: bool = True, hash_email: bool = False, ) -> Insights: @@ -316,7 +332,7 @@ async def insights( async def score( self, - transaction: Dict[str, Any], + transaction: dict[str, Any], validate: bool = True, hash_email: bool = False, ) -> Score: @@ -354,7 +370,9 @@ async def score( ) async def report( - self, report: Dict[str, Optional[str]], validate: bool = True + self, + report: dict[str, Optional[str]], + validate: bool = True, ) -> None: """Send a transaction report to the Report Transaction endpoint. @@ -387,7 +405,7 @@ async def _response_for( self, uri: str, model_class: Callable, - request: Dict[str, Any], + request: dict[str, Any], validate: bool, hash_email: bool, ) -> Union[Score, Factors, Insights]: @@ -403,7 +421,9 @@ async def _response_for( return self._handle_success(raw_body, uri, model_class) async def _do_request( - self, uri: str, data: Dict[str, Any] + self, + uri: str, + data: dict[str, Any], ) -> aiohttp.ClientResponse: session = await self._session() return await session.post(uri, json=data, proxy=self._proxy) @@ -418,8 +438,8 @@ async def _session(self) -> aiohttp.ClientSession: return self._existing_session - async def close(self): - """Close underlying session + async def close(self) -> None: + """Close underlying session. This will close the session and any associated connections. """ @@ -436,7 +456,7 @@ async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> N class Client(BaseClient): """Synchronous client for accessing the minFraud web services.""" - _proxies: Optional[Dict[str, str]] + _proxies: Optional[dict[str, str]] _session: requests.Session def __init__( @@ -487,7 +507,7 @@ def __init__( def factors( self, - transaction: Dict[str, Any], + transaction: dict[str, Any], validate: bool = True, hash_email: bool = False, ) -> Factors: @@ -526,7 +546,7 @@ def factors( def insights( self, - transaction: Dict[str, Any], + transaction: dict[str, Any], validate: bool = True, hash_email: bool = False, ) -> Insights: @@ -565,7 +585,7 @@ def insights( def score( self, - transaction: Dict[str, Any], + transaction: dict[str, Any], validate: bool = True, hash_email: bool = False, ) -> Score: @@ -602,7 +622,7 @@ def score( ), ) - def report(self, report: Dict[str, Optional[str]], validate: bool = True) -> None: + def report(self, report: dict[str, Optional[str]], validate: bool = True) -> None: """Send a transaction report to the Report Transaction endpoint. :param report: A dictionary containing the transaction report to be sent @@ -634,7 +654,7 @@ def _response_for( self, uri: str, model_class: Callable, - request: Dict[str, Any], + request: dict[str, Any], validate: bool, hash_email: bool, ) -> Union[Score, Factors, Insights]: @@ -649,13 +669,16 @@ def _response_for( raise self._exception_for_error(status, content_type, raw_body, uri) return self._handle_success(raw_body, uri, model_class) - def _do_request(self, uri: str, data: Dict[str, Any]) -> Response: + def _do_request(self, uri: str, data: dict[str, Any]) -> Response: return self._session.post( - uri, json=data, timeout=self._timeout, proxies=self._proxies + uri, + json=data, + timeout=self._timeout, + proxies=self._proxies, ) - def close(self): - """Close underlying session + def close(self) -> None: + """Close underlying session. This will close the session and any associated connections. """ diff --git a/pyproject.toml b/pyproject.toml index 1a70a36..aec807b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ authors = [ dependencies = [ "aiohttp>=3.6.2,<4.0.0", "email_validator>=2.0.0,<3.0.0", - "geoip2>=4.8.0,<5.0.0", + "geoip2>=5.0.1,<6.0.0", "requests>=2.24.0,<3.0.0", "voluptuous", ] @@ -37,9 +37,6 @@ test = [ "pytest-httpserver>=1.0.10", ] -[tool.setuptools.package-data] -minfraud = ["py.typed"] - [project.urls] Homepage = "https://www.maxmind.com/" Documentation = "https://minfraud.readthedocs.org/" @@ -57,3 +54,33 @@ build-backend = "setuptools.build_meta" # src is showing up in our GitHub linting builds. It seems to # contain deps. extend-exclude = '^/src/' + +[tool.ruff.lint] +select = ["ALL"] +ignore = [ + # Skip type annotation on **_ + "ANN003", + + # documenting magic methods + "D105", + + # Line length. We let black handle this for now. + "E501", + + # Don't bother with future imports for type annotations + "FA100", + + # Magic numbers for HTTP status codes seem ok most of the time. + "PLR2004", + + # pytest rules + "PT009", + "PT027", +] + +[tool.ruff.lint.per-file-ignores] +"minfraud/models,.py" = [ "D107", "PLR0913" ] +"tests/*" = ["ANN201", "D"] + +[tool.setuptools.package-data] +minfraud = ["py.typed"] diff --git a/tests/test_models.py b/tests/test_models.py index 2cb97ed..8115455 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,28 +1,29 @@ import unittest +from typing import Any, Union from minfraud.models import * class TestModels(unittest.TestCase): - def setUp(self): + def setUp(self) -> None: self.maxDiff = 20_000 - def test_billing_address(self): - address = BillingAddress(**self.address_dict) + def test_billing_address(self) -> None: + address = BillingAddress(**self.address_dict) # type: ignore[arg-type] self.check_address(address) - def test_shipping_address(self): + def test_shipping_address(self) -> None: address_dict = self.address_dict address_dict["is_high_risk"] = False address_dict["distance_to_billing_address"] = 200 - address = ShippingAddress(**address_dict) + address = ShippingAddress(**address_dict) # type:ignore[arg-type] self.check_address(address) self.assertEqual(False, address.is_high_risk) self.assertEqual(200, address.distance_to_billing_address) @property - def address_dict(self): + def address_dict(self) -> dict[str, Union[bool, float]]: return { "is_in_ip_country": True, "latitude": 43.1, @@ -31,14 +32,14 @@ def address_dict(self): "is_postal_in_city": True, } - def check_address(self, address): + def check_address(self, address) -> None: self.assertEqual(True, address.is_in_ip_country) self.assertEqual(True, address.is_postal_in_city) self.assertEqual(100, address.distance_to_ip_location) self.assertEqual(32.1, address.longitude) self.assertEqual(43.1, address.latitude) - def test_credit_card(self): + def test_credit_card(self) -> None: cc = CreditCard( issuer={"name": "Bank"}, brand="Visa", @@ -59,7 +60,7 @@ def test_credit_card(self): self.assertEqual(True, cc.is_issued_in_billing_address_country) self.assertEqual("credit", cc.type) - def test_device(self): + def test_device(self) -> None: id = "b643d445-18b2-4b9d-bad4-c9c4366e402a" last_seen = "2016-06-08T14:16:38Z" local_time = "2016-06-10T14:19:10-08:00" @@ -75,7 +76,7 @@ def test_device(self): self.assertEqual(last_seen, device.last_seen) self.assertEqual(local_time, device.local_time) - def test_disposition(self): + def test_disposition(self) -> None: disposition = Disposition( action="accept", reason="default", @@ -86,7 +87,7 @@ def test_disposition(self): self.assertEqual("default", disposition.reason) self.assertEqual("custom rule label", disposition.rule_label) - def test_email(self): + def test_email(self) -> None: first_seen = "2016-01-01" email = Email( first_seen=first_seen, @@ -100,7 +101,7 @@ def test_email(self): self.assertEqual(True, email.is_free) self.assertEqual(False, email.is_high_risk) - def test_email_domain(self): + def test_email_domain(self) -> None: first_seen = "2016-01-01" domain = EmailDomain( first_seen=first_seen, @@ -108,13 +109,13 @@ def test_email_domain(self): self.assertEqual(first_seen, domain.first_seen) - def test_geoip2_location(self): + def test_geoip2_location(self) -> None: time = "2015-04-19T12:59:23-01:00" location = GeoIP2Location(local_time=time, latitude=5) self.assertEqual(time, location.local_time) self.assertEqual(5, location.latitude) - def test_ip_address(self): + def test_ip_address(self) -> None: time = "2015-04-19T12:59:23-01:00" address = IPAddress( ["en"], @@ -175,15 +176,15 @@ def test_ip_address(self): address.risk_reasons[1].reason, ) - def test_empty_address(self): + def test_empty_address(self) -> None: address = IPAddress([]) self.assertEqual([], address.risk_reasons) - def test_score_ip_address(self): + def test_score_ip_address(self) -> None: address = ScoreIPAddress(risk=99) self.assertEqual(99, address.risk) - def test_ip_address_locales(self): + def test_ip_address_locales(self) -> None: loc = IPAddress( ["fr"], country={"names": {"fr": "Country"}}, @@ -193,7 +194,7 @@ def test_ip_address_locales(self): self.assertEqual("City", loc.city.name) self.assertEqual("Country", loc.country.name) - def test_issuer(self): + def test_issuer(self) -> None: phone = "132-342-2131" issuer = Issuer( @@ -208,7 +209,7 @@ def test_issuer(self): self.assertEqual(phone, issuer.phone_number) self.assertEqual(True, issuer.matches_provided_phone_number) - def test_phone(self): + def test_phone(self) -> None: phone = Phone( country="US", is_voip=True, @@ -221,7 +222,7 @@ def test_phone(self): self.assertEqual("Verizon/1", phone.network_operator) self.assertEqual("fixed", phone.number_type) - def test_warning(self): + def test_warning(self) -> None: code = "INVALID_INPUT" msg = "Input invalid" @@ -231,7 +232,7 @@ def test_warning(self): self.assertEqual(msg, warning.warning) self.assertEqual("/first/second", warning.input_pointer) - def test_reason(self): + def test_reason(self) -> None: code = "EMAIL_ADDRESS_NEW" msg = "Riskiness of newly-sighted email address" @@ -240,7 +241,7 @@ def test_reason(self): self.assertEqual(code, reason.code) self.assertEqual(msg, reason.reason) - def test_risk_score_reason(self): + def test_risk_score_reason(self) -> None: multiplier = 0.34 code = "EMAIL_ADDRESS_NEW" msg = "Riskiness of newly-sighted email address" @@ -254,7 +255,7 @@ def test_risk_score_reason(self): self.assertEqual(code, reason.reasons[0].code) self.assertEqual(msg, reason.reasons[0].reason) - def test_score(self): + def test_score(self) -> None: id = "b643d445-18b2-4b9d-bad4-c9c4366e402a" response = { "id": id, @@ -264,7 +265,7 @@ def test_score(self): "ip_address": {"risk": 99}, "warnings": [{"code": "INVALID_INPUT"}], } - score = Score(**response) + score = Score(**response) # type: ignore[arg-type] self.assertEqual(id, score.id) self.assertEqual(10.01, score.funds_remaining) @@ -275,23 +276,24 @@ def test_score(self): self.assertEqual(response, score.to_dict()) - def test_insights(self): + def test_insights(self) -> None: response = self.factors_response() del response["risk_score_reasons"] del response["subscores"] - insights = Insights(None, **response) + insights = Insights(None, **response) # type: ignore[arg-type] self.check_insights_data(insights, response["id"]) self.assertEqual(response, insights.to_dict()) - def test_factors(self): + def test_factors(self) -> None: response = self.factors_response() - factors = Factors(None, **response) + factors = Factors(None, **response) # type: ignore[arg-type] self.check_insights_data(factors, response["id"]) self.check_risk_score_reasons_data(factors.risk_score_reasons) self.assertEqual(0.01, factors.subscores.avs_result) self.assertEqual(0.02, factors.subscores.billing_address) self.assertEqual( - 0.03, factors.subscores.billing_address_distance_to_ip_location + 0.03, + factors.subscores.billing_address_distance_to_ip_location, ) self.assertEqual(0.04, factors.subscores.browser) self.assertEqual(0.05, factors.subscores.chargeback) @@ -309,13 +311,14 @@ def test_factors(self): self.assertEqual(0.15, factors.subscores.phone_number) self.assertEqual(0.2, factors.subscores.shipping_address) self.assertEqual( - 0.16, factors.subscores.shipping_address_distance_to_ip_location + 0.16, + factors.subscores.shipping_address_distance_to_ip_location, ) self.assertEqual(0.17, factors.subscores.time_of_day) self.assertEqual(response, factors.to_dict()) - def factors_response(self): + def factors_response(self) -> dict[str, Any]: return { "id": "b643d445-18b2-4b9d-bad4-c9c4366e402a", "disposition": {"action": "reject"}, @@ -365,13 +368,13 @@ def factors_response(self): { "code": "ANONYMOUS_IP", "reason": "Risk due to IP being an Anonymous IP", - } + }, ], - } + }, ], } - def check_insights_data(self, insights, uuid): + def check_insights_data(self, insights, uuid) -> None: self.assertEqual("US", insights.ip_address.country.iso_code) self.assertEqual(False, insights.ip_address.country.is_in_european_union) self.assertEqual(True, insights.credit_card.is_business) @@ -393,11 +396,12 @@ def check_insights_data(self, insights, uuid): self.assertEqual("INVALID_INPUT", insights.warnings[0].code) self.assertIsInstance(insights.warnings, list, "warnings is a list") - def check_risk_score_reasons_data(self, reasons): + def check_risk_score_reasons_data(self, reasons) -> None: self.assertEqual(1, len(reasons)) self.assertEqual(45, reasons[0].multiplier) self.assertEqual(1, len(reasons[0].reasons)) self.assertEqual("ANONYMOUS_IP", reasons[0].reasons[0].code) self.assertEqual( - "Risk due to IP being an Anonymous IP", reasons[0].reasons[0].reason + "Risk due to IP being an Anonymous IP", + reasons[0].reasons[0].reason, ) diff --git a/tests/test_request.py b/tests/test_request.py index b4b14cd..dbb590f 100644 --- a/tests/test_request.py +++ b/tests/test_request.py @@ -1,14 +1,14 @@ import unittest from minfraud.request import ( - maybe_hash_email, - clean_credit_card, _clean_email, + clean_credit_card, + maybe_hash_email, ) class TestRequest(unittest.TestCase): - def test_maybe_hash_email(self): + def test_maybe_hash_email(self) -> None: tests = [ { "name": "no email", @@ -37,7 +37,7 @@ def test_maybe_hash_email(self): "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "maxmind.com", - } + }, }, }, { @@ -47,19 +47,19 @@ def test_maybe_hash_email(self): "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "maxmind.com", - } + }, }, }, { "name": "domain already set", "input": { - "email": {"address": "test@maxmind.com", "domain": "google.com"} + "email": {"address": "test@maxmind.com", "domain": "google.com"}, }, "expected": { "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "google.com", - } + }, }, }, { @@ -69,7 +69,7 @@ def test_maybe_hash_email(self): "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "maxmind.com", - } + }, }, }, { @@ -79,7 +79,7 @@ def test_maybe_hash_email(self): "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "maxmind.com", - } + }, }, }, { @@ -89,7 +89,7 @@ def test_maybe_hash_email(self): "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "maxmind.com", - } + }, }, }, { @@ -99,7 +99,7 @@ def test_maybe_hash_email(self): "email": { "address": "667a28047b6caade43c7e75f66aab5f5", "domain": "yahoo.com", - } + }, }, }, { @@ -109,7 +109,7 @@ def test_maybe_hash_email(self): "email": { "address": "a5f830c699fd71ad653aa59fa688c6d9", "domain": "yahoo.com", - } + }, }, }, { @@ -119,7 +119,7 @@ def test_maybe_hash_email(self): "email": { "address": "24948acabac551360cd510d5e5e2b464", "domain": "xn--bcher-kva.com", - } + }, }, }, { @@ -129,7 +129,7 @@ def test_maybe_hash_email(self): "email": { "address": "aa57884e48f0dda9fc6f4cb2bffb1dd2", "domain": "maxmind.com", - } + }, }, }, { @@ -138,7 +138,7 @@ def test_maybe_hash_email(self): "expected": { "email": { "address": "246a848af2f8394e3adbc738dbe43720", - } + }, }, }, { @@ -148,7 +148,7 @@ def test_maybe_hash_email(self): "email": { "address": "53550c712b146287a2d0dd30e5ed6f4b", "domain": "example.com", - } + }, }, }, { @@ -158,7 +158,7 @@ def test_maybe_hash_email(self): "email": { "address": "53550c712b146287a2d0dd30e5ed6f4b", "domain": "example.com", - } + }, }, }, ] @@ -171,7 +171,7 @@ def test_maybe_hash_email(self): self.assertEqual(test["expected"], transaction) - def test_clean_credit_card(self): + def test_clean_credit_card(self) -> None: tests = [ { "name": "deprecated last_4_digits is cleaned to last_digits", @@ -217,7 +217,7 @@ def test_clean_credit_card(self): self.assertEqual(test["expected"], transaction) -def test_clean_email(): +def test_clean_email() -> None: tests = [ {"input": "", "output": None}, {"input": "fasfs", "output": None}, @@ -256,5 +256,5 @@ def test_clean_email(): ] for test in tests: - got, _ = _clean_email(test["input"]) - assert test["output"] == got + got, _ = _clean_email(test["input"]) # type: ignore + assert test["output"] == got # type: ignore diff --git a/tests/test_validation.py b/tests/test_validation.py index 7c94cff..7799596 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -1,107 +1,109 @@ +import unittest from decimal import Decimal -from voluptuous import MultipleInvalid -from minfraud.validation import validate_transaction, validate_report +from voluptuous import MultipleInvalid -import unittest +from minfraud.validation import validate_report, validate_transaction -class ValidationBase: - def setup_transaction(self, transaction): +class ValidationBase(unittest.TestCase): + def setup_transaction(self, transaction) -> None: if "device" not in transaction: transaction["device"] = {} if "ip_address" not in transaction["device"]: transaction["device"]["ip_address"] = "1.1.1.1" - def check_invalid_transaction(self, transaction): + def check_invalid_transaction(self, transaction) -> None: self.setup_transaction(transaction) with self.assertRaises(MultipleInvalid, msg=f"{transaction} is invalid"): validate_transaction(transaction) - def check_transaction(self, transaction): + def check_transaction(self, transaction) -> None: self.setup_transaction(transaction) try: validate_transaction(transaction) except MultipleInvalid as e: self.fail(f"MultipleInvalid {e.msg} thrown for {transaction}") - def check_transaction_str_type(self, object, key): + def check_transaction_str_type(self, object, key) -> None: self.check_transaction({object: {key: "string"}}) self.check_invalid_transaction({object: {key: 12}}) - def check_positive_number(self, f): + def check_positive_number(self, f) -> None: for good in (1, 1.1, Decimal("1.1")): self.check_transaction(f(good)) for bad in ("1.2", "1", -1, -1.1, 0): self.check_invalid_transaction(f(bad)) - def check_bool(self, object, key): + def check_bool(self, object, key) -> None: for good in (True, False): self.check_transaction({object: {key: good}}) for bad in ("", 0, "True"): self.check_invalid_transaction({object: {key: bad}}) - def setup_report(self, report): + def setup_report(self, report) -> None: if "ip_address" not in report: report["ip_address"] = "1.2.3.4" if "tag" not in report: report["tag"] = "chargeback" - def check_invalid_report(self, report): + def check_invalid_report(self, report) -> None: self.setup_report(report) self.check_invalid_report_no_setup(report) - def check_invalid_report_no_setup(self, report): + def check_invalid_report_no_setup(self, report) -> None: with self.assertRaises(MultipleInvalid, msg=f"{report} is invalid"): validate_report(report) - def check_report(self, report): + def check_report(self, report) -> None: self.setup_report(report) self.check_report_no_setup(report) - def check_report_no_setup(self, report): + def check_report_no_setup(self, report) -> None: try: validate_report(report) except MultipleInvalid as e: self.fail(f"MultipleInvalid {e.msg} thrown for {report}") - def check_report_str_type(self, key): + def check_report_str_type(self, key) -> None: self.check_report({key: "string"}) self.check_invalid_report({key: 12}) -class TestTransaction(unittest.TestCase, ValidationBase): - def test_transaction_without_device(self): +class TestTransaction(ValidationBase): + def test_transaction_without_device(self) -> None: transaction = { "account": { "user_id": "usr", - } + }, } validate_transaction(transaction) -class TestAccount(unittest.TestCase, ValidationBase): - def test_account_user_id(self): +class TestAccount(ValidationBase): + def test_account_user_id(self) -> None: self.check_transaction({"account": {"user_id": "usr"}}) - def test_account_username_md5(self): + def test_account_username_md5(self) -> None: self.check_transaction( - {"account": {"username_md5": "14c4b06b824ec593239362517f538b29"}} + {"account": {"username_md5": "14c4b06b824ec593239362517f538b29"}}, ) - def test_invalid_account_username_md5s(self): + def test_invalid_account_username_md5s(self) -> None: self.check_invalid_transaction( - {"account": {"username_md5": "14c4b06b824ec593239362517f538b2"}} + {"account": {"username_md5": "14c4b06b824ec593239362517f538b2"}}, ) self.check_invalid_transaction( - {"account": {"username_md5": "14c4b06b824ec593239362517f538b29a"}} + {"account": {"username_md5": "14c4b06b824ec593239362517f538b29a"}}, ) class AddressBase(ValidationBase): - def test_strings(self): + type: str + + def test_strings(self) -> None: for key in ( "first_name", "last_name", @@ -114,33 +116,33 @@ def test_strings(self): ): self.check_transaction_str_type(self.type, key) - def test_region(self): + def test_region(self) -> None: for region in ("A", "AA", "AAA", "ZZZZ"): self.check_transaction({self.type: {"region": region}}) for invalid in ("", "AAAAA", 1, "aaa"): self.check_invalid_transaction({self.type: {"region": invalid}}) - def test_country(self): + def test_country(self) -> None: for country in ("US", "CA", "GB"): self.check_transaction({self.type: {"country": country}}) for invalid in ("", "U1", "USA", 1, "11", "us"): self.check_invalid_transaction({self.type: {"country": invalid}}) - def test_phone_country_code(self): + def test_phone_country_code(self) -> None: for code in (1, "1", "2341"): self.check_transaction({self.type: {"phone_country_code": code}}) for invalid in ("", "12345", "U"): self.check_invalid_transaction({self.type: {"phone_country_code": invalid}}) -class TestBillingAddress(unittest.TestCase, AddressBase): +class TestBillingAddress(AddressBase): type = "billing" -class TestShippingAddress(unittest.TestCase, AddressBase): +class TestShippingAddress(AddressBase): type = "shipping" - def test_delivery_speed(self): + def test_delivery_speed(self) -> None: for speed in ("same_day", "overnight", "expedited", "standard"): self.check_transaction({self.type: {"delivery_speed": speed}}) for invalid in ("fast", "slow", ""): @@ -148,79 +150,79 @@ def test_delivery_speed(self): class TestCreditCard(ValidationBase, unittest.TestCase): - def test_country(self): + def test_country(self) -> None: for code in ("CA", "US"): self.check_transaction({"credit_card": {"country": code}}) for invalid in (1, None, "", "A1", "Canada"): self.check_invalid_transaction({"credit_card": {"country": invalid}}) - def test_issuer_id_number(self): + def test_issuer_id_number(self) -> None: for iin in ("123456", "532313", "88888888"): self.check_transaction({"credit_card": {"issuer_id_number": iin}}) for invalid in ("12345", "1234567", 123456, "12345a"): self.check_invalid_transaction( - {"credit_card": {"issuer_id_number": invalid}} + {"credit_card": {"issuer_id_number": invalid}}, ) - def test_last_digits(self): + def test_last_digits(self) -> None: for last_digits in ("1234", "9323", "34"): self.check_transaction({"credit_card": {"last_digits": last_digits}}) for invalid in ("12345", "123", 1234, "123a"): self.check_invalid_transaction({"credit_card": {"last_digits": invalid}}) self.check_transaction( - {"credit_card": {"issuer_id_number": "88888888", "last_digits": "12"}} + {"credit_card": {"issuer_id_number": "88888888", "last_digits": "12"}}, ) self.check_transaction( - {"credit_card": {"issuer_id_number": "88888888", "last_digits": "1234"}} + {"credit_card": {"issuer_id_number": "88888888", "last_digits": "1234"}}, ) self.check_transaction( - {"credit_card": {"issuer_id_number": "666666", "last_digits": "1234"}} + {"credit_card": {"issuer_id_number": "666666", "last_digits": "1234"}}, ) self.check_transaction( - {"credit_card": {"issuer_id_number": "666666", "last_digits": "34"}} + {"credit_card": {"issuer_id_number": "666666", "last_digits": "34"}}, ) - def test_last_4_digits(self): + def test_last_4_digits(self) -> None: for last_digits in ("1234", "9323", "34"): self.check_transaction({"credit_card": {"last_4_digits": last_digits}}) for invalid in ("12345", "123", 1234, "123a"): self.check_invalid_transaction({"credit_card": {"last_4_digits": invalid}}) - def test_bank_name(self): + def test_bank_name(self) -> None: self.check_transaction_str_type("credit_card", "bank_name") - def test_bank_phone_number(self): + def test_bank_phone_number(self) -> None: self.check_transaction_str_type("credit_card", "bank_phone_number") - def test_phone_country_code(self): + def test_phone_country_code(self) -> None: for code in (1, "1", "2341"): self.check_transaction({"credit_card": {"bank_phone_country_code": code}}) for invalid in ("", "12345", "U"): self.check_invalid_transaction( - {"credit_card": {"bank_phone_country_code": invalid}} + {"credit_card": {"bank_phone_country_code": invalid}}, ) - def test_avs_and_cvv(self): + def test_avs_and_cvv(self) -> None: for key in ("avs_result", "cvv_result"): for code in ("1", "A"): self.check_transaction({"credit_card": {key: code}}) for invalid in ("", "12"): self.check_invalid_transaction( - {"credit_card": {"credit_card": invalid}} + {"credit_card": {"credit_card": invalid}}, ) - def test_token(self): + def test_token(self) -> None: for token in ("123456abc1245", "\x21", "1" * 20): self.check_transaction({"credit_card": {"token": token}}) for invalid in ("\x20", "123456", "x" * 256): self.check_invalid_transaction({"credit_card": {"token": invalid}}) - def test_was_3d_secure_successful(self): + def test_was_3d_secure_successful(self) -> None: self.check_bool("credit_card", "was_3d_secure_successful") class TestCustomInputs(ValidationBase, unittest.TestCase): - def test_valid_inputs(self): + def test_valid_inputs(self) -> None: self.check_transaction( { "custom_inputs": { @@ -228,64 +230,64 @@ def test_valid_inputs(self): "int_input": 19, "float_input": 3.2, "bool_input": True, - } - } + }, + }, ) - def test_invalid(self): + def test_invalid(self) -> None: for invalid in ( {"InvalidKey": 1}, {"too_long": "x" * 256}, {"has_newline": "test\n"}, {"too_big": 1e13}, {"too_small": -1e13}, - {"too_big_float": float(1e13)}, + {"too_big_float": 1e13}, ): self.check_invalid_transaction({"custom_inputs": invalid}) class TestDevice(ValidationBase, unittest.TestCase): - def test_ip_address(self): + def test_ip_address(self) -> None: for ip in ("1.2.3.4", "2001:db8:0:0:1:0:0:1", "::FFFF:1.2.3.4"): self.check_transaction({"device": {"ip_address": ip}}) for invalid in ("1.2.3.", "299.1.1.1", "::AF123"): self.check_invalid_transaction({"device": {"ip_address": invalid}}) - def test_missing_ip(self): + def test_missing_ip(self) -> None: validate_transaction({"device": {}}) validate_transaction( { "device": { "user_agent": "foo", - } - } + }, + }, ) - def test_missing_device(self): + def test_missing_device(self) -> None: validate_transaction({}) - def test_user_agent(self): + def test_user_agent(self) -> None: self.check_transaction_str_type("device", "user_agent") - def test_accept_language(self): + def test_accept_language(self) -> None: self.check_transaction_str_type("device", "accept_language") - def test_session_id(self): + def test_session_id(self) -> None: self.check_transaction_str_type("device", "session_id") - def test_session_age(self): + def test_session_age(self) -> None: for valid in (3600, 0, 25.5): self.check_transaction( - {"device": {"ip_address": "4.4.4.4", "session_age": valid}} + {"device": {"ip_address": "4.4.4.4", "session_age": valid}}, ) for invalid in ("foo", -1): self.check_invalid_transaction( - {"device": {"ip_address": "4.4.4.4", "session_age": invalid}} + {"device": {"ip_address": "4.4.4.4", "session_age": invalid}}, ) class TestEmail(ValidationBase, unittest.TestCase): - def test_address(self): + def test_address(self) -> None: for good in ("test@maxmind.com", "977577b140bfb7c516e4746204fbdb01"): self.check_transaction({"email": {"address": good}}) for bad in ( @@ -295,7 +297,7 @@ def test_address(self): ): self.check_invalid_transaction({"email": {"address": bad}}) - def test_domain(self): + def test_domain(self) -> None: for good in ("maxmind.com", "www.bbc.co.uk"): self.check_transaction({"email": {"domain": good}}) for bad in ("bad ", " bad.com"): @@ -303,19 +305,19 @@ def test_domain(self): class TestEvent(ValidationBase, unittest.TestCase): - def test_transaction(self): + def test_transaction(self) -> None: self.check_transaction_str_type("event", "transaction_id") - def test_shop_id(self): + def test_shop_id(self) -> None: self.check_transaction_str_type("event", "shop_id") - def test_time(self): + def test_time(self) -> None: for good in ("2015-05-08T16:07:56+00:00", "2015-05-08T16:07:56Z"): self.check_transaction({"event": {"time": good}}) for bad in ("2015-05-08T16:07:56", "2015-05-08 16:07:56Z"): self.check_invalid_transaction({"event": {"time": bad}}) - def test_type(self): + def test_type(self) -> None: for good in ( "account_creation", "account_login", @@ -333,31 +335,31 @@ def test_type(self): class TestOrder(ValidationBase, unittest.TestCase): - def test_amount(self): + def test_amount(self) -> None: self.check_positive_number(lambda v: {"order": {"amount": v}}) - def test_currency(self): + def test_currency(self) -> None: for good in ("USD", "GBP"): self.check_transaction({"order": {"currency": good}}) for bad in ("US", "US1", "USDD", "usd"): self.check_invalid_transaction({"order": {"currency": bad}}) - def test_discount_code(self): + def test_discount_code(self) -> None: self.check_transaction_str_type("order", "discount_code") - def test_affiliate_id(self): + def test_affiliate_id(self) -> None: self.check_transaction_str_type("order", "affiliate_id") - def test_subaffiliate_id(self): + def test_subaffiliate_id(self) -> None: self.check_transaction_str_type("order", "subaffiliate_id") - def test_is_gift(self): + def test_is_gift(self) -> None: self.check_bool("order", "is_gift") - def test_has_gift_message(self): + def test_has_gift_message(self) -> None: self.check_bool("order", "has_gift_message") - def test_referrer_uri(self): + def test_referrer_uri(self) -> None: for good in ("http://www.mm.com/fadsf", "https://x.org/"): self.check_transaction({"order": {"referrer_uri": good}}) for bad in ("ftp://a.com/", "www.mm.com"): @@ -365,50 +367,50 @@ def test_referrer_uri(self): class TestPayment(ValidationBase, unittest.TestCase): - def test_processor(self): + def test_processor(self) -> None: for good in ("adyen", "stripe"): self.check_transaction({"payment": {"processor": good}}) for bad in ("notvalid", " stripe"): self.check_invalid_transaction({"payment": {"processor": bad}}) - def test_was_authorized(self): + def test_was_authorized(self) -> None: self.check_bool("payment", "was_authorized") - def test_decline_code(self): + def test_decline_code(self) -> None: self.check_transaction_str_type("payment", "decline_code") class TestShoppingCart(ValidationBase, unittest.TestCase): - def test_category(self): + def test_category(self) -> None: self.check_transaction({"shopping_cart": [{"category": "cat"}]}) - def test_item_id(self): + def test_item_id(self) -> None: self.check_transaction({"shopping_cart": [{"item_id": "cat"}]}) - def test_amount(self): + def test_amount(self) -> None: self.check_positive_number(lambda v: {"shopping_cart": [{"price": v}]}) - def test_quantity(self): + def test_quantity(self) -> None: for good in (1, 1000): self.check_transaction({"shopping_cart": [{"quantity": good}]}) for bad in (1.1, -1, 0): self.check_invalid_transaction({"shopping_cart": [{"quantity": bad}]}) -class TestReport(unittest.TestCase, ValidationBase): - def test_ip_address(self): +class TestReport(ValidationBase): + def test_ip_address(self) -> None: for good in ("182.193.2.1", "a74:777f:3acd:57a0:4e7e:e999:7fe6:1b5b"): self.check_report({"ip_address": good}) for bad in ("1.2.3.", "299.1.1.1", "::AF123", "", None): self.check_invalid_report({"ip_address": bad}) - def test_maxmind_id(self): + def test_maxmind_id(self) -> None: for good in ("12345678", "abcdefgh"): self.check_report({"maxmind_id": good}) for bad in ("1234567", "123456789", "", None): self.check_invalid_report({"maxmind_id": bad}) - def test_minfraud_id(self): + def test_minfraud_id(self) -> None: for good in ( "12345678-1234-1234-1234-123456789012", "1234-5678-1234-1234-1234-1234-5678-9012", @@ -425,7 +427,7 @@ def test_minfraud_id(self): ): self.check_invalid_report({"minfraud_id": bad}) - def test_strings(self): + def test_strings(self) -> None: for key in ( "chargeback_code", "notes", @@ -433,18 +435,25 @@ def test_strings(self): ): self.check_report_str_type(key) - def test_tag(self): + def test_tag(self) -> None: for good in ("chargeback", "not_fraud", "spam_or_abuse", "suspected_fraud"): self.check_report({"tag": good}) for bad in ("risky_business", "", None): self.check_invalid_report({"tag": bad}) - def test_report_valid_identifier(self): + def test_report_valid_identifier(self) -> None: self.check_invalid_report_no_setup({"tag": "chargeback"}) self.check_report_no_setup({"tag": "chargeback", "ip_address": "1.1.1.1"}) self.check_report_no_setup( - {"tag": "chargeback", "minfraud_id": "58fa38d8-4b87-458b-a22b-f00eda1aa20d"} + { + "tag": "chargeback", + "minfraud_id": "58fa38d8-4b87-458b-a22b-f00eda1aa20d", + }, ) self.check_report_no_setup({"tag": "chargeback", "maxmind_id": "12345678"}) self.check_report_no_setup({"tag": "chargeback", "transaction_id": "abc123"}) + + +del AddressBase +del ValidationBase diff --git a/tests/test_webservice.py b/tests/test_webservice.py index 47fab04..ef4e5c0 100644 --- a/tests/test_webservice.py +++ b/tests/test_webservice.py @@ -1,51 +1,61 @@ import asyncio +import builtins import json import os +import unittest from functools import partial -from io import open -from typing import Type, Union -from pytest_httpserver import HTTPServer +from typing import Callable, Union, cast + import pytest +from pytest_httpserver import HTTPServer +import minfraud.webservice from minfraud.errors import ( - HTTPError, - InvalidRequestError, AuthenticationError, + HTTPError, InsufficientFundsError, + InvalidRequestError, MinFraudError, PermissionRequiredError, ) from minfraud.models import Factors, Insights, Score from minfraud.webservice import AsyncClient, Client -import minfraud.webservice -import unittest - minfraud.webservice._SCHEME = "http" class BaseTest(unittest.TestCase): - client_class: Union[Type[AsyncClient], Type[Client]] = Client + client: Union[AsyncClient, Client] + client_class: Union[type[AsyncClient], type[Client]] = Client + type: str + request_file: str + response_file: str @pytest.fixture(autouse=True) - def setup_httpserver(self, httpserver: HTTPServer): + def setup_httpserver(self, httpserver: HTTPServer) -> None: self.httpserver = httpserver - def setUp(self): + def setUp(self) -> None: self.client = self.client_class( 42, "abcdef123456", - host="{0}:{1}".format(self.httpserver.host, self.httpserver.port), + host=f"{self.httpserver.host}:{self.httpserver.port}", ) test_dir = os.path.join(os.path.dirname(__file__), "data") - with open(os.path.join(test_dir, self.request_file), encoding="utf-8") as file: + with builtins.open( + os.path.join(test_dir, self.request_file), + encoding="utf-8", + ) as file: content = file.read() self.full_request = json.loads(content) - with open(os.path.join(test_dir, self.response_file), encoding="utf-8") as file: + with builtins.open( + os.path.join(test_dir, self.response_file), + encoding="utf-8", + ) as file: self.response = file.read() - def test_invalid_auth(self): + def test_invalid_auth(self) -> None: for error in ( "ACCOUNT_ID_REQUIRED", "AUTHORIZATION_INVALID", @@ -58,24 +68,25 @@ def test_invalid_auth(self): status_code=401, ) - def test_invalid_request(self): + def test_invalid_request(self) -> None: with self.assertRaisesRegex(InvalidRequestError, "IP invalid"): self.create_error(text='{"code":"IP_ADDRESS_INVALID","error":"IP invalid"}') - def test_300_error(self): + def test_300_error(self) -> None: with self.assertRaisesRegex( - HTTPError, r"Received an unexpected HTTP status \(300\) for" + HTTPError, + r"Received an unexpected HTTP status \(300\) for", ): self.create_error(status_code=300) - def test_permission_required(self): + def test_permission_required(self) -> None: with self.assertRaisesRegex(PermissionRequiredError, "permission"): self.create_error( text='{"code":"PERMISSION_REQUIRED","error":"permission required"}', status_code=403, ) - def test_400_with_invalid_json(self): + def test_400_with_invalid_json(self) -> None: with self.assertRaisesRegex( HTTPError, "Received a 400 error but it did not include the expected JSON" @@ -83,17 +94,18 @@ def test_400_with_invalid_json(self): ): self.create_error(text="{blah}") - def test_400_with_no_body(self): + def test_400_with_no_body(self) -> None: with self.assertRaisesRegex(HTTPError, "Received a 400 error with no body"): self.create_error() - def test_400_with_unexpected_content_type(self): + def test_400_with_unexpected_content_type(self) -> None: with self.assertRaisesRegex( - HTTPError, "Received a 400 with the following body: b?'?plain'?" + HTTPError, + "Received a 400 with the following body: b?'?plain'?", ): self.create_error(content_type="text/plain", text="plain") - def test_400_without_json_body(self): + def test_400_without_json_body(self) -> None: with self.assertRaisesRegex( HTTPError, "Received a 400 error but it did not include the expected JSON" @@ -101,7 +113,7 @@ def test_400_without_json_body(self): ): self.create_error(text="plain") - def test_400_with_unexpected_json(self): + def test_400_with_unexpected_json(self) -> None: with self.assertRaisesRegex( HTTPError, "Error response contains JSON but it does not specify code or" @@ -109,15 +121,17 @@ def test_400_with_unexpected_json(self): ): self.create_error(text='{"not":"expected"}') - def test_500_error(self): + def test_500_error(self) -> None: with self.assertRaisesRegex(HTTPError, r"Received a server error \(500\) for"): self.create_error(status_code=500) def create_error(self, status_code=400, text="", content_type=None): uri = "/".join( - ["/minfraud/v2.0", "transactions", "report"] - if self.type == "report" - else ["/minfraud/v2.0", self.type] + ( + ["/minfraud/v2.0", "transactions", "report"] + if self.type == "report" + else ["/minfraud/v2.0", self.type] + ), ) if content_type is None: content_type = ( @@ -134,9 +148,11 @@ def create_error(self, status_code=400, text="", content_type=None): def create_success(self, text=None, client=None, request=None): uri = "/".join( - ["/minfraud/v2.0", "transactions", "report"] - if self.type == "report" - else ["/minfraud/v2.0", self.type] + ( + ["/minfraud/v2.0", "transactions", "report"] + if self.type == "report" + else ["/minfraud/v2.0", self.type] + ), ) if request is None: request = self.full_request @@ -156,35 +172,39 @@ def create_success(self, text=None, client=None, request=None): def run_client(self, v): return v - def test_named_constructor_args(self): - id = "47" + def test_named_constructor_args(self) -> None: + id = 47 key = "1234567890ab" for client in ( self.client_class(account_id=id, license_key=key), self.client_class(account_id=id, license_key=key), ): - self.assertEqual(client._account_id, id) + self.assertEqual(client._account_id, str(id)) self.assertEqual(client._license_key, key) - def test_missing_constructor_args(self): + def test_missing_constructor_args(self) -> None: with self.assertRaises(TypeError): - self.client_class(license_key="1234567890ab") + self.client_class(license_key="1234567890ab") # type: ignore[call-arg] with self.assertRaises(TypeError): - self.client_class("47") + self.client_class("47") # type: ignore class BaseTransactionTest(BaseTest): + type: str + cls: Callable + request_file: str + response_file: str - def has_ip_location(self): + def has_ip_location(self) -> bool: return self.type in ["factors", "insights"] - def test_200(self): + def test_200(self) -> None: model = self.create_success() response = json.loads(self.response) cls = self.cls if self.has_ip_location(): - cls = partial(cls, ("en",)) + cls = cast(Callable, partial(cls, ("en",))) self.assertEqual(cls(**response), model) if self.has_ip_location(): self.assertEqual("United Kingdom", model.ip_address.country.name) @@ -193,7 +213,7 @@ def test_200(self): self.assertEqual("004", model.ip_address.traits.mobile_network_code) self.assertEqual("ANONYMOUS_IP", model.ip_address.risk_reasons[0].code) - def test_200_on_request_with_nones(self): + def test_200_on_request_with_nones(self) -> None: model = self.create_success( request={ "device": {"ip_address": "152.216.7.110", "accept_language": None}, @@ -205,13 +225,12 @@ def test_200_on_request_with_nones(self): }, None, ], - } + }, ) - response = self.response self.assertEqual(0.01, model.risk_score) - def test_200_with_email_hashing(self): - uri = "/".join(["/minfraud/v2.0", self.type]) + def test_200_with_email_hashing(self) -> None: + uri = f"/minfraud/v2.0/{self.type}" self.httpserver.expect_request( uri, method="POST", @@ -219,7 +238,7 @@ def test_200_with_email_hashing(self): "email": { "address": "977577b140bfb7c516e4746204fbdb01", "domain": "maxmind.com", - } + }, }, ).respond_with_data( self.response, @@ -232,13 +251,13 @@ def test_200_with_email_hashing(self): # This was fixed in https://github.com/maxmind/minfraud-api-python/pull/78 - def test_200_with_locales(self): + def test_200_with_locales(self) -> None: locales = ("fr",) client = self.client_class( 42, "abcdef123456", locales=locales, - host="{0}:{1}".format(self.httpserver.host, self.httpserver.port), + host=f"{self.httpserver.host}:{self.httpserver.port}", ) model = self.create_success(client=client) response = json.loads(self.response) @@ -250,7 +269,7 @@ def test_200_with_locales(self): self.assertEqual("Royaume-Uni", model.ip_address.country.name) self.assertEqual("Londres", model.ip_address.city.name) - def test_200_with_reserved_ip_warning(self): + def test_200_with_reserved_ip_warning(self) -> None: model = self.create_success( """ { @@ -267,12 +286,12 @@ def test_200_with_reserved_ip_warning(self): } ] } - """ + """, ) self.assertEqual(12, model.risk_score) - def test_200_with_no_risk_score_reasons(self): + def test_200_with_no_risk_score_reasons(self) -> None: if "risk_score_reasons" not in self.response: return @@ -281,7 +300,7 @@ def test_200_with_no_risk_score_reasons(self): model = self.create_success(text=json.dumps(response)) self.assertEqual([], model.risk_score_reasons) - def test_200_with_no_body(self): + def test_200_with_no_body(self) -> None: with self.assertRaisesRegex( MinFraudError, "Received a 200 response but could not decode the response as" @@ -289,7 +308,7 @@ def test_200_with_no_body(self): ): self.create_success(text="") - def test_200_with_invalid_json(self): + def test_200_with_invalid_json(self) -> None: with self.assertRaisesRegex( MinFraudError, "Received a 200 response but could not decode the response as" @@ -297,7 +316,7 @@ def test_200_with_invalid_json(self): ): self.create_success(text="{") - def test_insufficient_funds(self): + def test_insufficient_funds(self) -> None: with self.assertRaisesRegex(InsufficientFundsError, "out of funds"): self.create_error( text='{"code":"INSUFFICIENT_FUNDS","error":"out of funds"}', @@ -331,10 +350,10 @@ class TestReportTransaction(BaseTest): request_file = "full-report-request.json" response_file = "report-response.json" - def test_204(self): + def test_204(self) -> None: self.create_success() - def test_204_on_request_with_nones(self): + def test_204_on_request_with_nones(self) -> None: self.create_success( request={ "ip_address": "81.2.69.60", @@ -343,17 +362,17 @@ def test_204_on_request_with_nones(self): "maxmind_id": None, "minfraud_id": None, "notes": None, - } + }, ) -class AsyncBase: - def setUp(self): +class AsyncBase(unittest.TestCase): + def setUp(self) -> None: self._loop = asyncio.new_event_loop() super().setUp() - def tearDown(self): - self._loop.run_until_complete(self.client.close()) + def tearDown(self) -> None: + self._loop.run_until_complete(self.client.close()) # type: ignore self._loop.close() super().tearDown() @@ -377,4 +396,4 @@ class TestAsyncReportTransaction(AsyncBase, TestReportTransaction): client_class = AsyncClient -del BaseTest, BaseTransactionTest +del AsyncBase, BaseTest, BaseTransactionTest