diff --git a/utils/api_client.py b/clients/api_client.py similarity index 57% rename from utils/api_client.py rename to clients/api_client.py index 9f86e6a..a2380e0 100644 --- a/utils/api_client.py +++ b/clients/api_client.py @@ -6,11 +6,13 @@ class APIClient: - def __init__(self, base_url: str, token: Optional[str] = None, api_key: Optional[str] = None): + def __init__(self, base_url: str, token: Optional[str] = None, refresh_token: Optional[str] = None, api_key: Optional[str] = None): self.base_url = base_url.rstrip('/') self.token = token + self.refresh_token = refresh_token self.api_key = api_key self.client = httpx.Client(timeout=30.0) + self.headers = self._get_headers() def _get_headers(self, additional_headers: Optional[Dict[str, str]] = None) -> Dict[str, str]: headers = {"Content-Type": "application/json"} @@ -80,6 +82,48 @@ def delete(self, endpoint: str, headers: Optional[Dict[str, str]] = None) -> htt return response + @allure.step("PATCH {endpoint}") + def patch(self, endpoint: str, json: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None) -> httpx.Response: + url = f"{self.base_url}{endpoint}" + request_headers = self._get_headers(headers) + + with allure.step(f"Request: PATCH {url}"): + allure.attach(str(json), "Request Body", allure.attachment_type.JSON) + response = self.client.patch(url, json=json, headers=request_headers) + allure.attach(response.text, "Response Body", allure.attachment_type.JSON) + allure.attach(str(response.status_code), "Status Code", allure.attachment_type.TEXT) + + return response + + @allure.step("Custom Request {method} {endpoint}") + def send_custom_request(self, method: str, endpoint: str, + json: Optional[Dict[str, Any]] = None, + params: Optional[Dict[str, Any]] = None, + headers: Optional[Dict[str, str]] = None) -> httpx.Response: + if method.upper() == "GET": + return self.get(endpoint, params=params, headers=headers) + elif method.upper() == "POST": + return self.post(endpoint, json=json, headers=headers) + elif method.upper() == "PUT": + return self.put(endpoint, json=json, headers=headers) + elif method.upper() == "PATCH": + return self.patch(endpoint, json=json, headers=headers) + elif method.upper() == "DELETE": + return self.delete(endpoint, headers=headers) + else: + raise ValueError(f"Unsupported HTTP method: {method}") + + def set_cookies(self, cookies: Dict[str, str]): + for name, value in cookies.items(): + self.client.cookies.set(name, value) + + def get_cookies(self) -> Dict[str, str]: + return dict(self.client.cookies) + + def clear_cookies(self): + self.client.cookies.clear() + def close(self): self.client.close() @@ -126,3 +170,36 @@ def register_worker(self, worker_id: str, worker_data: Dict[str, Any]) -> httpx. def confirm_deletion(self, node_id: str) -> httpx.Response: return self.post(f"/internal/nodes/{node_id}/confirm-delete") + + +class AuthAPIClient(APIClient): + + def __init__(self, settings: Settings, token: Optional[str] = None, refresh_token: Optional[str] = None): + super().__init__( + base_url=settings.cp_nodes_api_url, + token=token, + refresh_token=refresh_token + ) + + def login(self, username: Optional[str] = None, password: Optional[str] = None) -> httpx.Response: + return self.post("/v1/auth/login", json={"username": username, "password": password}) + + def post_refresh(self, refresh_token: str) -> httpx.Response: + return self.post("/v1/auth/refresh", json={"refresh_token": refresh_token}) + + def logout(self, refresh_token: Optional[str] = None) -> httpx.Response: + payload = {"refresh_token": refresh_token} if refresh_token else {} + return self.post("/v1/auth/logout", json=payload) + + def get_profile(self) -> httpx.Response: + return self.get("/v1/auth/profile") + + def change_password(self, old_password: Optional[str] = None, new_password: Optional[str] = None) -> httpx.Response: + return self.put("/v1/auth/password", json={"old_password": old_password, "new_password": new_password}) + + def change_username(self, new_username: Optional[str] = None) -> httpx.Response: + return self.put("/v1/auth/username", json={"new_username": new_username}) + + def get_audit_log(self, page: Optional[int] = None, page_size: Optional[int] = None) -> httpx.Response: + params = {"page": page, "page_size": page_size} + return self.get("/v1/auth/audit-log", params=params) diff --git a/utils/eth_client.py b/clients/eth_client.py similarity index 100% rename from utils/eth_client.py rename to clients/eth_client.py diff --git a/fixtures/api_fixtures.py b/fixtures/api_fixtures.py index 5b26554..7ef2138 100644 --- a/fixtures/api_fixtures.py +++ b/fixtures/api_fixtures.py @@ -1,9 +1,11 @@ import pytest -from utils.api_client import NodesAPIClient, InternalAPIClient +from clients.api_client import NodesAPIClient, InternalAPIClient, AuthAPIClient from config.settings import Settings from faker import Faker -fake = Faker() +@pytest.fixture(scope="session") +def faker(): + return Faker() @pytest.fixture(scope="session") def nodes_api_client(config: Settings): @@ -19,16 +21,65 @@ def internal_api_client(config: Settings): client.close() -@pytest.fixture -def authenticated_client(config: Settings): +@pytest.fixture(scope="session") +def authenticated_nodes_client(config: Settings): client = NodesAPIClient(config) yield client client.close() -@pytest.fixture -def invalid_username(): - return fake.email() + +@pytest.fixture(scope="function") +def auth_client(config: Settings): + client = AuthAPIClient(config) + yield client + client.close() + + +@pytest.fixture(scope="function") +def authenticated_auth_client(config: Settings): + client = AuthAPIClient(config) + if config.user_log and config.user_pass: + response = client.login(config.user_log, config.user_pass) + if response.status_code == 200: + token = response.json().get("access_token") + refresh_token = response.json().get("refresh_token") + client.token = token + client.refresh_token = refresh_token + else: + raise Exception("Login failed") + yield client + client.close() + @pytest.fixture -def invalid_password(): - return fake.password() \ No newline at end of file +def valid_credentials(config: Settings): + return { + "username": config.user_log, + "password": config.user_pass + } + + +@pytest.fixture(scope="function") +def invalid_username(faker): + return faker.email() + +@pytest.fixture(scope="function") +def valid_username(faker): + return faker.name() + +@pytest.fixture(scope="function") +def valid_password(faker): + return faker.password() + + +@pytest.fixture(scope="function") +def invalid_password(faker): + return faker.password() + + +@pytest.fixture(scope="function") +def invalid_credentials(invalid_username, invalid_password): + return { + "username": invalid_username, + "password": invalid_password + } diff --git a/fixtures/eth_fixtures.py b/fixtures/eth_fixtures.py index bb59b5a..77814bd 100644 --- a/fixtures/eth_fixtures.py +++ b/fixtures/eth_fixtures.py @@ -1,5 +1,5 @@ import pytest -from utils.eth_client import EthereumClient +from clients.eth_client import EthereumClient from config.settings import Settings diff --git a/openapi.yaml b/openapi.yaml new file mode 100644 index 0000000..934b01b --- /dev/null +++ b/openapi.yaml @@ -0,0 +1,1196 @@ +openapi: 3.0.3 +info: + title: CP Deployments API + description: API for managing deployments and presets in the Chainstack Control Panel + version: 1.0.0 + contact: + name: Chainstack Team + +servers: + - url: http://localhost:8080 + description: Local development server + +paths: + /healthz: + get: + tags: + - Health + summary: Health check endpoint + operationId: healthCheck + responses: + '200': + description: Service is healthy + content: + application/json: + schema: + type: object + properties: + status: + type: string + example: ok + + # Preset endpoints + /v1/presets/types: + get: + tags: + - Presets + summary: List all preset types + operationId: listPresetTypes + responses: + '200': + description: Successfully retrieved preset types + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/PresetType' + '503': + description: Preset registry unavailable + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/presets/types/{id}: + get: + tags: + - Presets + summary: Get a specific preset type by ID + operationId: getPresetType + parameters: + - name: id + in: path + required: true + description: Preset type ID + schema: + type: string + responses: + '200': + description: Successfully retrieved preset type + content: + application/json: + schema: + $ref: '#/components/schemas/PresetType' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: Preset not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/presets/instances: + get: + tags: + - Presets + summary: List all preset instances + operationId: listPresetInstances + responses: + '200': + description: Successfully retrieved preset instances + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/PresetInstance' + '503': + description: Preset registry unavailable + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/presets/instances/{id}: + get: + tags: + - Presets + summary: Get a specific preset instance by ID + operationId: getPresetInstance + parameters: + - name: id + in: path + required: true + description: Preset instance ID + schema: + type: string + responses: + '200': + description: Successfully retrieved preset instance + content: + application/json: + schema: + $ref: '#/components/schemas/PresetInstance' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: Preset not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + # UI Deployment endpoints + /v1/ui/deployments: + post: + tags: + - UI Deployments + summary: Create a new deployment + operationId: createDeployment + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateDeploymentRequest' + responses: + '201': + description: Deployment created successfully + content: + application/json: + schema: + $ref: '#/components/schemas/CreateDeploymentResponse' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + get: + tags: + - UI Deployments + summary: List all deployments + operationId: listDeployments + responses: + '200': + description: Successfully retrieved deployments + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/Deployment' + total: + type: integer + description: Total number of deployments + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/ui/deployments/{id}: + get: + tags: + - UI Deployments + summary: Get a specific deployment + operationId: getDeployment + parameters: + - name: id + in: path + required: true + description: Deployment ID + schema: + type: string + format: uuid + responses: + '200': + description: Successfully retrieved deployment + content: + application/json: + schema: + $ref: '#/components/schemas/Deployment' + '400': + description: Invalid deployment ID + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: Deployment not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/ui/deployments/{id}/revisions: + get: + tags: + - UI Deployments + summary: List all revisions for a deployment + operationId: listRevisions + parameters: + - name: id + in: path + required: true + description: Deployment ID + schema: + type: string + format: uuid + responses: + '200': + description: Successfully retrieved revisions + content: + application/json: + schema: + type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/DeploymentRevision' + total: + type: integer + description: Total number of revisions + '400': + description: Invalid deployment ID + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: Deployment not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + post: + tags: + - UI Deployments + summary: Create a new revision for a deployment + operationId: createRevision + parameters: + - name: id + in: path + required: true + description: Deployment ID + schema: + type: string + format: uuid + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateRevisionRequest' + responses: + '201': + description: Revision created successfully + content: + application/json: + schema: + $ref: '#/components/schemas/CreateRevisionResponse' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/ui/deployments/{id}/revisions/{revision_id}: + get: + tags: + - UI Deployments + summary: Get a specific revision + operationId: getRevision + parameters: + - name: id + in: path + required: true + description: Deployment ID + schema: + type: string + format: uuid + - name: revision_id + in: path + required: true + description: Revision ID + schema: + type: string + format: uuid + responses: + '200': + description: Successfully retrieved revision + content: + application/json: + schema: + $ref: '#/components/schemas/DeploymentRevision' + '400': + description: Invalid deployment ID or revision ID + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: Deployment or revision not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/ui/deployments/{id}/schedule-delete: + post: + tags: + - UI Deployments + summary: Schedule a deployment for deletion + operationId: scheduleDeleteDeployment + parameters: + - name: id + in: path + required: true + description: Deployment ID + schema: + type: string + format: uuid + responses: + '200': + description: Deletion scheduled successfully + content: + application/json: + schema: + $ref: '#/components/schemas/ScheduleDeleteResponse' + '400': + description: Invalid deployment ID + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + # Auth endpoints + /v1/auth/login: + post: + tags: + - Authentication + summary: Authenticate user and obtain tokens + operationId: login + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/LoginRequest' + responses: + '200': + description: Successfully authenticated + content: + application/json: + schema: + $ref: '#/components/schemas/LoginResponse' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '401': + description: Invalid credentials + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/auth/refresh: + post: + tags: + - Authentication + summary: Refresh access token using refresh token + operationId: refreshToken + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RefreshTokenRequest' + responses: + '200': + description: Token refreshed successfully + content: + application/json: + schema: + $ref: '#/components/schemas/RefreshTokenResponse' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '401': + description: Invalid or expired refresh token + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/auth/logout: + post: + tags: + - Authentication + summary: Logout user and revoke tokens + operationId: logout + security: + - BearerAuth: [] + requestBody: + required: false + content: + application/json: + schema: + $ref: '#/components/schemas/LogoutRequest' + responses: + '200': + description: Successfully logged out + content: + application/json: + schema: + $ref: '#/components/schemas/LogoutResponse' + '401': + description: Unauthorized - invalid or missing token + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/auth/profile: + get: + tags: + - Authentication + summary: Get current user profile + operationId: getProfile + security: + - BearerAuth: [] + responses: + '200': + description: Successfully retrieved profile + content: + application/json: + schema: + $ref: '#/components/schemas/UserProfile' + '401': + description: Unauthorized - invalid or missing token + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/auth/password: + put: + tags: + - Authentication + summary: Change user password + description: > + Change password for the current user. + NOTE: This endpoint is not properly implemented in cp-auth service. + The current ChangePassword method is admin-only and doesn't verify old password. + Currently returns 501 Not Implemented. + operationId: changePassword + security: + - BearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ChangePasswordRequest' + responses: + '200': + description: Password changed successfully + content: + application/json: + schema: + $ref: '#/components/schemas/ChangePasswordResponse' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '401': + description: Unauthorized - invalid or missing token + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '501': + description: Not implemented - requires cp-auth service update with user-facing password change + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/auth/username: + put: + tags: + - Authentication + summary: Change username + description: > + Change the username for the current user. + NOTE: This endpoint requires implementation in cp-auth service. + Currently returns 501 Not Implemented. + operationId: changeUsername + security: + - BearerAuth: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ChangeUsernameRequest' + responses: + '200': + description: Username changed successfully + content: + application/json: + schema: + $ref: '#/components/schemas/UserProfile' + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '401': + description: Unauthorized - invalid or missing token + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '501': + description: Not implemented - requires cp-auth service update + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /v1/auth/audit-log: + get: + tags: + - Authentication + summary: Get user audit log + description: > + Retrieve audit log entries for the current user. + NOTE: This endpoint requires implementation in cp-auth service. + Currently returns 501 Not Implemented. + operationId: getAuditLog + security: + - BearerAuth: [] + parameters: + - name: page + in: query + description: Page number (1-indexed) + schema: + type: integer + minimum: 1 + default: 1 + - name: page_size + in: query + description: Number of items per page + schema: + type: integer + minimum: 1 + maximum: 100 + default: 20 + responses: + '200': + description: Successfully retrieved audit log + content: + application/json: + schema: + $ref: '#/components/schemas/AuditLogResponse' + '400': + description: Invalid request parameters + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '401': + description: Unauthorized - invalid or missing token + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '501': + description: Not implemented - requires cp-auth service update + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' +components: + schemas: + # Error response + Error: + type: object + properties: + error: + type: string + description: Error message + required: + - error + + # Preset schemas + PresetType: + type: object + description: CTI Preset Type from the registry + additionalProperties: true + # Note: These are dynamic objects from CTI registry (metadata.EntityType) + + PresetInstance: + type: object + description: CTI Preset Instance from the registry + additionalProperties: true + # Note: These are dynamic objects from CTI registry (metadata.EntityInstance) + + # Deployment schemas + DeploymentState: + type: string + enum: + - pending + - running + - failed + - deleting + - deleted + + Deployment: + type: object + properties: + id: + type: string + format: uuid + state: + $ref: '#/components/schemas/DeploymentState' + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + cti: + type: object + description: > + CTI object - may contain various fields depending on the preset type. + This is a dynamic object from CTI registry. + additionalProperties: true + + + required: + - id + - state + - created_at + - updated_at + + RevisionStatus: + type: string + enum: + - pending + - applied + - failed + + Metadata: + type: object + description: Client's metadata information + properties: + id: + type: string + format: uuid + description: Unique identifier for the Metadata entry + name: + type: string + description: CTI identifier (e.g., cti.c.cp.ethereum_mainnet_reth_prysm.v1.0) + example: "cti.c.cp.ethereum_mainnet_reth_prysm.v1.0" + data: + type: object + description: CTI metadata.EntityType as JSON object containing schema, display_name, description, etc. + additionalProperties: true + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + required: + - id + - name + - data + - created_at + - updated_at + + DeploymentRevision: + type: object + properties: + id: + type: string + format: uuid + deployment_id: + type: string + format: uuid + preset_type: + type: string + description: CTI ID for the preset type + preset_instance_values: + type: object + additionalProperties: true + version: + type: integer + status: + $ref: '#/components/schemas/RevisionStatus' + error_message: + type: string + nullable: true + created_at: + type: string + format: date-time + updated_at: + type: string + format: date-time + applied_at: + type: string + format: date-time + nullable: true + metadata: + type: array + items: + $ref: '#/components/schemas/Metadata' + description: List of provisioned clients (or another artifacts) for this revision + required: + - id + - deployment_id + - preset_type + - preset_instance_values + - version + - status + - created_at + - updated_at + + # Request/Response schemas + CreateDeploymentRequest: + type: object + properties: + preset_instance_id: + type: string + description: CTI preset instance ID (e.g., cti.c.cp.ethereum_mainnet_reth_prysm.v1.0) + preset_override_values: + type: object + additionalProperties: true + description: Values to override from the preset instance (deep merge strategy - overrides apply to leaf values only) + required: + - preset_instance_id + + CreateDeploymentResponse: + type: object + properties: + deployment_id: + type: string + format: uuid + initial_revision_id: + type: string + format: uuid + state: + type: string + required: + - deployment_id + - initial_revision_id + - state + + CreateRevisionRequest: + type: object + properties: + preset_instance_id: + type: string + description: CTI preset instance ID (e.g., cti.c.cp.ethereum_mainnet_reth_prysm.v1.0) + preset_override_values: + type: object + additionalProperties: true + description: Values to override from the preset instance (deep merge strategy - overrides apply to leaf values only) + required: + - preset_instance_id + + CreateRevisionResponse: + type: object + properties: + revision_id: + type: string + format: uuid + deployment_id: + type: string + format: uuid + version: + type: integer + state: + type: string + required: + - revision_id + - deployment_id + - version + - state + + ScheduleDeleteResponse: + type: object + properties: + deployment_id: + type: string + format: uuid + state: + type: string + required: + - deployment_id + - state + + # Auth schemas + LoginRequest: + type: object + properties: + username: + type: string + description: Username or email + example: "user@example.com" + password: + type: string + format: password + description: User password + required: + - username + - password + + LoginResponse: + type: object + properties: + access_token: + type: string + description: JWT access token + refresh_token: + type: string + description: Refresh token for obtaining new access tokens + expires_in: + type: integer + format: int64 + description: Access token expiration time in seconds + user: + $ref: '#/components/schemas/UserProfile' + required: + - access_token + - refresh_token + - expires_in + - user + + RefreshTokenRequest: + type: object + properties: + refresh_token: + type: string + description: Refresh token obtained from login + required: + - refresh_token + + RefreshTokenResponse: + type: object + properties: + access_token: + type: string + description: New JWT access token + expires_in: + type: integer + format: int64 + description: Access token expiration time in seconds + required: + - access_token + - expires_in + + LogoutRequest: + type: object + properties: + refresh_token: + type: string + description: Optional refresh token to revoke. If not provided, extracted from Authorization header. + nullable: true + + LogoutResponse: + type: object + properties: + message: + type: string + example: "Successfully logged out" + required: + - message + + UserProfile: + type: object + properties: + id: + type: string + format: uuid + description: User ID + username: + type: string + description: Username + email: + type: string + format: email + description: User email + tenant_id: + type: string + format: uuid + description: Tenant ID + tenant_name: + type: string + description: Tenant name + tenant_role: + type: string + description: User role in the tenant + example: "admin" + last_login: + type: string + format: date-time + description: Last login timestamp + created_at: + type: string + format: date-time + description: Account creation timestamp + required: + - id + - username + - email + - tenant_id + - tenant_name + - tenant_role + - created_at + + ChangePasswordRequest: + type: object + properties: + old_password: + type: string + format: password + description: Current password + new_password: + type: string + format: password + description: New password + required: + - old_password + - new_password + + ChangePasswordResponse: + type: object + properties: + message: + type: string + example: "Password changed successfully" + required: + - message + + ChangeUsernameRequest: + type: object + properties: + new_username: + type: string + description: New username + minLength: 3 + maxLength: 50 + required: + - new_username + + AuditLogEntry: + type: object + properties: + id: + type: string + format: uuid + description: Audit log entry ID + user_id: + type: string + format: uuid + description: User ID who performed the action + action: + type: string + description: Action performed + example: "login" + enum: + - login + - logout + - password_change + - username_change + - profile_update + - failed_login + ip_address: + type: string + description: IP address of the request + example: "192.168.1.1" + user_agent: + type: string + description: User agent string + timestamp: + type: string + format: date-time + description: When the action occurred + details: + type: object + additionalProperties: true + description: Additional action-specific details + required: + - id + - user_id + - action + - timestamp + + AuditLogResponse: + type: object + properties: + results: + type: array + items: + $ref: '#/components/schemas/AuditLogEntry' + total: + type: integer + description: Total number of audit log entries + page: + type: integer + description: Current page number + page_size: + type: integer + description: Number of items per page + required: + - results + - total + - page + - page_size + + securitySchemes: + BearerAuth: + type: http + scheme: bearer + bearerFormat: JWT + description: JWT token obtained from /v1/auth/login endpoint + +tags: + - name: Health + description: Health check endpoints + - name: Presets + description: CTI preset management endpoints + - name: UI Deployments + description: Deployment management endpoints for UI + - name: Internal + description: Internal endpoints for worker communication + - name: Authentication + description: User authentication and profile management endpoints diff --git a/tests/core/__init__.py b/tests/api/__init__.py similarity index 100% rename from tests/core/__init__.py rename to tests/api/__init__.py diff --git a/tests/api/cases/const.py b/tests/api/cases/const.py new file mode 100644 index 0000000..cea5938 --- /dev/null +++ b/tests/api/cases/const.py @@ -0,0 +1 @@ +MAX_64_BIT_INT = 2**63 \ No newline at end of file diff --git a/tests/api/cases/test_cases.py b/tests/api/cases/test_cases.py new file mode 100644 index 0000000..b4afa3e --- /dev/null +++ b/tests/api/cases/test_cases.py @@ -0,0 +1,4 @@ +EMPTY_STRING_CASES = ["", " ", " ", "\t", "\n", "\r"] +NONSTRING_CASES = [123, True, None, [123], {"key": "value"}, (1,2,3), {1,2,3}, 123.45] +NONINTEGER_CASES = [True, None, [123], {"key": "value"}, (1,2,3), {1,2,3}, 123.45, "123", "null", "true", "false"] +WEAK_PASSWORD_CASES = ["123", "abcd", "password", "12345678", "qwerty123", "PASSWORD123", "p@ssword123"] \ No newline at end of file diff --git a/tests/api/schemas/__init__.py b/tests/api/schemas/__init__.py new file mode 100644 index 0000000..ca7b18e --- /dev/null +++ b/tests/api/schemas/__init__.py @@ -0,0 +1,22 @@ +from pydantic import ValidationError +import pytest + + +def validate_schema(response_data: dict, schema_class): + """ + Validate response data against a Pydantic schema. + + Args: + response_data: The JSON response data + schema_class: The Pydantic model class to validate against + + Returns: + The validated Pydantic model instance + + Raises: + pytest.fail if validation fails + """ + try: + return schema_class(**response_data) + except ValidationError as e: + pytest.fail(f"Schema validation failed for {schema_class.__name__}: {e}") diff --git a/tests/api/schemas/auth_schemas.py b/tests/api/schemas/auth_schemas.py new file mode 100644 index 0000000..27baab2 --- /dev/null +++ b/tests/api/schemas/auth_schemas.py @@ -0,0 +1,55 @@ +from pydantic import BaseModel, Field, EmailStr +from typing import Optional +from datetime import datetime + + +class UserProfile(BaseModel): + id: str + username: str + email: EmailStr + tenant_id: str + tenant_name: str + tenant_role: str + created_at: datetime + last_login: Optional[datetime] = None + + +class LoginResponse(BaseModel): + access_token: str + refresh_token: str + expires_in: int = Field(gt=0) + user: UserProfile + + +class RefreshTokenResponse(BaseModel): + access_token: str + expires_in: int = Field(gt=0) + + +class LogoutResponse(BaseModel): + message: str + + +class ErrorResponse(BaseModel): + error: str + + +class ChangePasswordResponse(BaseModel): + message: str + + +class AuditLogEntry(BaseModel): + id: str + user_id: str + action: str + timestamp: datetime + ip_address: Optional[str] = None + user_agent: Optional[str] = None + details: Optional[dict] = None + + +class AuditLogResponse(BaseModel): + results: list[AuditLogEntry] + total: int + page: int + page_size: int diff --git a/tests/api/test_auth_audit_log.py b/tests/api/test_auth_audit_log.py new file mode 100644 index 0000000..f1cb43d --- /dev/null +++ b/tests/api/test_auth_audit_log.py @@ -0,0 +1,242 @@ +import pytest +import allure +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import AuditLogResponse, ErrorResponse +from tests.api.cases.const import MAX_64_BIT_INT +from tests.api.cases.test_cases import NONINTEGER_CASES +from utils.token_generator import generate_invalid_bearer_tokens +import base64 + + +@allure.feature("Authentication") +@allure.story("Audit Log") +@pytest.mark.api +class TestAuditLogGeneral: + + @allure.title("Get audit log successfully with default pagination") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_get_audit_log_success(self, authenticated_auth_client): + response = authenticated_auth_client.get_audit_log() + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + try: + audit_log = AuditLogResponse(**response.json()) + assert isinstance(audit_log.results, list), "Results should be a list" + assert audit_log.total >= 0, "Total should be non-negative" + assert audit_log.page >= 1, "Page should be at least 1" + assert audit_log.page_size > 0, "Page size should be positive" + except ValidationError as e: + pytest.fail(f"Audit log response schema validation failed: {e}") + +@allure.feature("Authentication") +@allure.story("Audit Log") +@pytest.mark.api +class TestAuditLogPaginationQueryParams: + + @allure.title("Get audit log with valid pagination parameters") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("page", [1, 2, 10, 50, 100, MAX_64_BIT_INT-1]) + @pytest.mark.parametrize("page_size", [10, 20, 50, MAX_64_BIT_INT-1]) + def test_get_audit_log_with_pagination(self, authenticated_auth_client, page, page_size): + response = authenticated_auth_client.get_audit_log(page=page, page_size=page_size) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + audit_log = AuditLogResponse(**response.json()) + assert audit_log.page == page, "Page should match requested page" + assert audit_log.page_size == page_size, "Page size should match requested size" + + @allure.title("Get audit log handler handles 64 bit integer") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("page, page_size", [(MAX_64_BIT_INT, MAX_64_BIT_INT), (MAX_64_BIT_INT+1, MAX_64_BIT_INT+1)]) + def test_get_audit_log_with_64_bit_int(self, authenticated_auth_client, page, page_size): + response = authenticated_auth_client.get_audit_log(page=page, page_size=page_size) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + audit_log = AuditLogResponse(**response.json()) + assert audit_log.page == page, "Page should match requested page" + assert audit_log.page_size == page_size, "Page size should match requested size" + + @allure.title("Get audit log with invalid page number") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_page", [0, -1, -10]) + def test_get_audit_log_invalid_page(self, authenticated_auth_client, invalid_page): + response = authenticated_auth_client.get_audit_log(page=invalid_page, page_size=20) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Get audit log with invalid page size") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_size", [0, -1, -10]) + def test_get_audit_log_invalid_page_size(self, authenticated_auth_client, invalid_size): + response = authenticated_auth_client.get_audit_log(page=1, page_size=invalid_size) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Get audit log with non-integer page parameter") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_page", NONINTEGER_CASES) + def test_get_audit_log_noninteger_page(self, authenticated_auth_client, invalid_page): + response = authenticated_auth_client.get_audit_log(page=invalid_page, page_size=20) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Get audit log with non-integer page_size parameter") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_size", NONINTEGER_CASES) + def test_get_audit_log_noninteger_page_size(self, authenticated_auth_client, invalid_size): + response = authenticated_auth_client.get_audit_log(page=1, page_size=invalid_size) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Get audit log handles request with page param only") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_page_only(self, authenticated_auth_client): + response = authenticated_auth_client.get_audit_log(page=1) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + AuditLogResponse(**response.json()) + + @allure.title("Get audit log handles request with page_size param only") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_page_size_only(self, authenticated_auth_client): + response = authenticated_auth_client.get_audit_log(page_size=20) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + AuditLogResponse(**response.json()) + + @allure.title("Get audit log with extra query parameters") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("extra_field, extra_param", [("extra_field", "extra_param"), ("test", "test"), ("page_length", "1"), ("page_volume", "20")]) + def test_get_audit_log_extra_params(self, authenticated_auth_client, extra_field, extra_param): + response = authenticated_auth_client.send_custom_request( + "GET", + "/v1/auth/audit-log", + params={ + "page": 1, + "page_size": 20, + extra_field: extra_param + } + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert extra_field not in response.json(), f"Extra field {extra_field} should not be in response" + AuditLogResponse(**response.json()) + +@allure.feature("Authentication") +@allure.story("Audit Log") +@pytest.mark.api +class TestAuditLogStructure: + @allure.title("Audit log entry structure validation") + @allure.severity(allure.severity_level.NORMAL) + def test_audit_log_entry_structure(self, authenticated_auth_client): + response = authenticated_auth_client.get_audit_log(page=1, page_size=20) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + audit_log = AuditLogResponse(**response.json()) + if len(audit_log.results) > 0: + entry = audit_log.results[0] + assert entry.id, "Audit entry should have id" + assert entry.user_id, "Audit entry should have user_id" + assert entry.action, "Audit entry should have action" + assert entry.timestamp, "Audit entry should have timestamp" + + @allure.title("Audit log updates as expected") + @allure.severity(allure.severity_level.NORMAL) + def test_audit_log_entry_structure_with_user_actions(self, authenticated_auth_client, valid_credentials, valid_username): + authenticated_auth_client.logout() + authenticated_auth_client.login(valid_credentials["username"], valid_credentials["password"]) + authenticated_auth_client.change_username(valid_username) + valid_credentials["username"] = valid_username + response = authenticated_auth_client.get_audit_log() + profile_response = authenticated_auth_client.get_profile() + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.json()["results"][0]["action"] == "change_username", "Audit log should have change_username action" + assert response.json()["results"][0]["user_id"] == profile_response.json()["user_id"], "Audit log should have user_id" + assert response.json()["results"][1]["action"] == "login", "Audit log should have login action" + assert response.json()["results"][1]["user_id"] == profile_response.json()["user_id"], "Audit log should have user_id" + assert response.json()["results"][2]["action"] == "logout", "Audit log should have logout action" + assert response.json()["results"][2]["user_id"] == profile_response.json()["user_id"], "Audit log should have user_id" + +@allure.feature("Authentication") +@allure.story("Audit Log") +@pytest.mark.api +class TestAuditLogAccess: + + @allure.title("Get audit log without access token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_get_audit_log_without_access_token(self, auth_client): + response = auth_client.get_audit_log() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + + @allure.title("Get audit log with invalid access token") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_token", generate_invalid_bearer_tokens()) + def test_get_audit_log_with_invalid_access_token(self, authenticated_auth_client, invalid_token): + token = authenticated_auth_client.token + authenticated_auth_client.token = invalid_token + response = authenticated_auth_client.get_audit_log() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + #TODO Add cases for IDOR / Broken access control when user creation flow will be clarified + + + @allure.title("Get audit log with wrong auth type") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_with_wrong_auth_type(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Basic " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.get_audit_log() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get audit log with wrong auth format") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_with_wrong_auth_format(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.get_audit_log() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get audit log with too long access token") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_with_too_long_access_token(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + "a" * 20480 + response = authenticated_auth_client.get_audit_log() + assert response.status_code == 431, f"Expected 431, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get audit log with revoked access token") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_with_revoked_access_token(self, authenticated_auth_client, valid_credentials): + authenticated_auth_client.logout() + response = authenticated_auth_client.get_audit_log() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.login(valid_credentials["username"], valid_credentials["password"]) + + @allure.title("Get audit log check response headers") + @allure.severity(allure.severity_level.NORMAL) + def test_get_audit_log_check_response_headers(self, authenticated_auth_client): + response = authenticated_auth_client.get_audit_log() + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-store", "Cache-Control should be no-store" + assert response.headers["Expires"] == "0", "Expires should be 0" + assert response.headers["Pragma"] == "no-cache", "Pragma should be no-cache" + + + + + + diff --git a/tests/api/test_auth_authorization.py b/tests/api/test_auth_authorization.py new file mode 100644 index 0000000..8d376df --- /dev/null +++ b/tests/api/test_auth_authorization.py @@ -0,0 +1,230 @@ +import pytest +import allure +from utils.token_generator import generate_invalid_bearer_tokens + + +@allure.feature("Authentication") +@allure.story("Authorization & Access Control") +@pytest.mark.api +class TestAuthorization: + + @allure.title("Protected endpoints require Bearer token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + @pytest.mark.parametrize("endpoint, method", [ + ("/v1/auth/profile", "GET"), + ("/v1/auth/logout", "POST"), + ("/v1/auth/password", "PUT"), + ("/v1/auth/username", "PUT"), + ("/v1/auth/audit-log", "GET") + ]) + def test_protected_endpoints_require_token(self, auth_client, endpoint, method): + with allure.step(f"Testing {method} {endpoint} without token"): + response = auth_client.send_custom_request(method, endpoint) + assert response.status_code == 401, \ + f"{method} {endpoint} should return 401 without token, got {response.status_code}" + + data = response.json() + assert "error" in data, f"{method} {endpoint} should return error message" + + @allure.title("Invalid Bearer token format returns 401") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + @pytest.mark.parametrize("token", generate_invalid_bearer_tokens()) + def test_invalid_token_format(self, auth_client, token): + with allure.step(f"Testing with invalid token: {str(token)[:50]}..."): + auth_client.token = token + response = auth_client.get_profile() + + assert response.status_code == 401, \ + f"Invalid token should return 401, got {response.status_code}" + + # TODO: Generate expired token + @allure.title("Expired token returns 401") + @allure.severity(allure.severity_level.CRITICAL) + def test_expired_token_rejected(self, auth_client): + expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyMzkwMjJ9.4Adcj0vVzr7B8Y8P9nGJ5pZXkJZ5JZ5JZ5JZ5JZ5JZ5" + + auth_client.token = expired_token + response = auth_client.get_profile() + + assert response.status_code == 401, f"Expired token should return 401, got {response.status_code}" + + data = response.json() + assert "error" in data, "Response should contain error message" + + @allure.title("Token from different user cannot access another user's resources") + @allure.severity(allure.severity_level.CRITICAL) + def test_token_isolation(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + user1_token = login_response.json()["access_token"] + user1_id = login_response.json()["user"]["id"] + + auth_client.token = user1_token + profile_response = auth_client.get_profile() + assert profile_response.status_code == 200 + + profile_user_id = profile_response.json()["id"] + assert profile_user_id == user1_id, "Profile should match the authenticated user" + + @allure.title("Authorization header must use Bearer scheme") + @allure.severity(allure.severity_level.NORMAL) + def test_bearer_scheme_required(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + token = login_response.json()["access_token"] + + response = auth_client.get( + "/v1/auth/profile", + headers={"Authorization": token} + ) + + assert response.status_code == 401, \ + "Authorization header without Bearer scheme should return 401" + + @allure.title("Case-sensitive Bearer token validation") + @allure.severity(allure.severity_level.MINOR) + def test_bearer_case_sensitivity(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + token = login_response.json()["access_token"] + + response = auth_client.get( + "/v1/auth/profile", + headers={"Authorization": f"bearer {token}"} + ) + #TODO clarify expected behavior + assert response.status_code in [200, 401], \ + "Server should handle Bearer scheme case sensitivity consistently" + + @allure.title("Multiple Authorization headers handled correctly") + @allure.severity(allure.severity_level.MINOR) + def test_multiple_auth_headers(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + token = login_response.json()["access_token"] + + response = auth_client.client.get( + f"{auth_client.base_url}/v1/auth/profile", + headers=[ + ("Authorization", f"Bearer {token}"), + ("Authorization", "Bearer invalid_duplicate_token"), + ] + ) + #TODO clarify expected behavior + assert response.status_code in [200, 400, 401], \ + f"Expected 200/400/401 for duplicate headers, got {response.status_code}" + + @allure.title("Token cannot be reused after logout") + @allure.severity(allure.severity_level.CRITICAL) + def test_token_revoked_after_logout(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + access_token = login_response.json()["access_token"] + refresh_token = login_response.json()["refresh_token"] + + auth_client.token = access_token + + profile_before = auth_client.get_profile() + assert profile_before.status_code == 200, "Token should work before logout" + + logout_response = auth_client.logout(refresh_token) + assert logout_response.status_code == 200 + + profile_after = auth_client.get_profile() + assert profile_after.status_code == 401, "Token should be revoked after logout" + + @allure.title("Public endpoints accessible without authentication") + @allure.severity(allure.severity_level.NORMAL) + def test_public_endpoints_no_auth(self, auth_client): + public_endpoints = [ + "/healthz", + "/v1/auth/login", + "/v1/auth/refresh", + ] + + for endpoint in public_endpoints: + with allure.step(f"Testing public endpoint: {endpoint}"): + if endpoint == "/healthz": + response = auth_client.get(endpoint) + assert response.status_code in [200, 404], \ + f"{endpoint} should be accessible without auth" + + @allure.title("CORS headers present in auth responses") + @allure.severity(allure.severity_level.MINOR) + def test_cors_headers_present(self, auth_client, valid_credentials): + response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + + assert response.status_code == 200 + #TODO:Clarify expected cors headers + assert response.headers["Access-Control-Allow-Origin"] == "*" + assert response.headers["Access-Control-Allow-Methods"] == "GET, POST, OPTIONS" + assert response.headers["Access-Control-Allow-Headers"] == "Authorization, Content-Type" + assert response.headers["Access-Control-Allow-Credentials"] == "true" + + @allure.title("Rate limiting on authentication endpoints") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.slow + def test_rate_limiting(self, auth_client, invalid_credentials): + """ + TODO: Clarify rate limit thresholds: + - How many requests allowed? + - Time window (per minute/hour)? + - Per IP or per username? + - Retry-After header format? + """ + max_attempts = 50 + rate_limited = False + status_codes = [] + + for i in range(max_attempts): + response = auth_client.login( + invalid_credentials["username"], + invalid_credentials["password"] + ) + status_codes.append(response.status_code) + + if response.status_code == 429: + rate_limited = True + with allure.step(f"Rate limit triggered after {i + 1} attempts"): + # Check for Retry-After header + retry_after = response.headers.get("Retry-After") + if retry_after: + allure.attach( + f"Retry-After: {retry_after}", + name="Rate Limit Headers", + attachment_type=allure.attachment_type.TEXT + ) + break + + if rate_limited: + assert response.status_code == 429, "Rate limit should return 429" + else: + with allure.step(f"No rate limiting detected after {max_attempts} attempts"): + pytest.skip(f"Rate limiting not implemented or threshold > {max_attempts}") + + diff --git a/tests/api/test_auth_login.py b/tests/api/test_auth_login.py new file mode 100644 index 0000000..cc5c57d --- /dev/null +++ b/tests/api/test_auth_login.py @@ -0,0 +1,295 @@ +import pytest +import allure +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import LoginResponse, ErrorResponse, UserProfile +from tests.api.cases.test_cases import EMPTY_STRING_CASES, NONSTRING_CASES + +def randomize_valid_credentials(valid_credentials): + return [ + (valid_credentials["username"].upper(), valid_credentials["password"]), + (valid_credentials["username"].lower(), valid_credentials["password"]), + (valid_credentials["username"], valid_credentials["password"].upper()), + (valid_credentials["username"], valid_credentials["password"].lower()), + (valid_credentials["username"].upper(), valid_credentials["password"].upper()), + (valid_credentials["username"].lower(), valid_credentials["password"].lower()), + (valid_credentials["username"].upper(), valid_credentials["password"].lower()), + (valid_credentials["username"].lower(), valid_credentials["password"].upper()), + (valid_credentials["username"].swapcase(), valid_credentials["password"]), + (valid_credentials["username"], valid_credentials["password"].swapcase()), + (valid_credentials["username"].swapcase(), valid_credentials["password"].swapcase()) + ] + +def stripcase_valid_credentials(valid_credentials): + return [ + (f" {valid_credentials["username"]}", valid_credentials["password"]), + (f"{valid_credentials["username"]} ", valid_credentials["password"]), + (f" {valid_credentials["username"]} ", valid_credentials["password"]), + (valid_credentials["username"], f" {valid_credentials["password"]}"), + (valid_credentials["username"], f"{valid_credentials["password"]} "), + (valid_credentials["username"], f" {valid_credentials["password"]} "), + (f" {valid_credentials["username"]}", f" {valid_credentials["password"]}"), + (f"{valid_credentials["username"]} ", f"{valid_credentials["password"]} "), + (f" {valid_credentials["username"]} ", f" {valid_credentials["password"]} "), + ] + + +@allure.feature("Authentication") +@allure.story("Login") +@pytest.mark.api +class TestLoginGeneral: + + @allure.title("Successful login with valid credentials") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_login_success(self, auth_client, valid_credentials): + response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + + try: + login_data = LoginResponse(**response.json()) + assert login_data.access_token, "access_token should not be empty" + assert login_data.refresh_token, "refresh_token should not be empty" + #TODO add token TTL check + except ValidationError as e: + pytest.fail(f"Response schema validation failed: {e}") + +@allure.feature("Authentication") +@allure.story("Login") +@pytest.mark.api +class TestLoginFieldsValidation: + + @allure.title("Login fails with invalid username") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_login_invalid_username(self, auth_client, invalid_username, valid_credentials): + response = auth_client.login( + invalid_username, + valid_credentials["password"] + ) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + + @allure.title("Login fails with invalid password") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_login_invalid_password(self, auth_client, valid_credentials, invalid_password): + response = auth_client.login( + valid_credentials["username"], + invalid_password + ) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with both invalid credentials") + @allure.severity(allure.severity_level.NORMAL) + def test_login_invalid_credentials(self, auth_client, invalid_credentials): + response = auth_client.login( + invalid_credentials["username"], + invalid_credentials["password"] + ) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with stripcases for username and password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("username, password", stripcase_valid_credentials(valid_credentials)) + def test_login_stripcases(self, auth_client, username, password): + response = auth_client.login(username, password) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with wrong case for username and password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("username, password", randomize_valid_credentials(valid_credentials)) + def test_login_wrong_case(self, auth_client, username, password): + response = auth_client.login(username, password) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + + @allure.title("Login fails with bad type username") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("username", NONSTRING_CASES) + def test_login_bad_type_username(self, auth_client, username, valid_credentials): + response = auth_client.login(username, valid_credentials["password"]) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with bad type password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("password", NONSTRING_CASES) + def test_login_bad_type_password(self, auth_client, valid_credentials, password): + response = auth_client.login(valid_credentials["username"], password) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with missing username") + @allure.severity(allure.severity_level.NORMAL) + def test_login_missing_username(self, auth_client, valid_credentials): + response = auth_client.login(password=valid_credentials["password"]) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with missing password") + @allure.severity(allure.severity_level.NORMAL) + def test_login_missing_password(self, auth_client, valid_credentials): + response = auth_client.login(password=valid_credentials["username"]) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with empty username") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("username", EMPTY_STRING_CASES) + def test_login_empty_username(self, auth_client, valid_credentials, username): + response = auth_client.login(username, valid_credentials["password"]) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login fails with empty password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("password", EMPTY_STRING_CASES) + def test_login_empty_password(self, auth_client, valid_credentials, password): + response = auth_client.login(valid_credentials["username"], password) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + +@allure.feature("Authentication") +@allure.story("Login") +@pytest.mark.api +class TestLoginQueryManipulation: + + @allure.title("Login fails with malformed JSON") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("json", [ + "", + "{}", + "{", + "}", + '{"username": "user"', + '{"username": "user",}', + '{"username": user}', + "{username: \"user\"}", + '{"username": "user", "password": }', + '["username", "password"', + '{"username": "user" "password": "1"}', + '{"username": "юзер", "password": "пароль"', + "null", + "true", + "123", + ]) + def test_login_malformed_json(self, auth_client, json): + response = auth_client.client.post( + f"{auth_client.base_url}/v1/auth/login", + content=json, + headers={"Content-Type": "application/json"} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + + @allure.title("Login without content type") + @allure.severity(allure.severity_level.NORMAL) + def test_login_without_content_type(self, auth_client, valid_credentials): + headers = auth_client.headers.copy() + headers.pop("Content-Type") + response = auth_client.login(valid_credentials["username"], valid_credentials["password"]) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + auth_client.headers = headers + + @allure.title("Login with wrong content type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("content_type", ["text/plain", "application/xml", "application/json; charset=utf-8"]) + def test_login_with_wrong_content_type(self, auth_client, valid_credentials, content_type): + headers = auth_client.headers.copy() + headers["Content-Type"] = content_type + response = auth_client.login(valid_credentials["username"], valid_credentials["password"]) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + auth_client.headers = headers + + @allure.title("Login with wrong method") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("method", ["PUT", "PATCH"]) + def test_login_with_wrong_method(self, auth_client, valid_credentials, method): + response = auth_client.send_custom_request(method, "/v1/auth/login", json={ + "username": valid_credentials["username"], + "password": valid_credentials["password"] + }) + assert response.status_code == 405, f"Expected 405, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Login check cache") + @allure.severity(allure.severity_level.NORMAL) + def test_login_check_cache(self, auth_client, valid_credentials): + response = auth_client.login(valid_credentials["username"], valid_credentials["password"]) + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-cache, no-store, must-revalidate", "Cache-Control header should be set to no-cache, no-store, must-revalidate" + assert response.headers["Pragma"] == "no-cache", "Pragma header should be set to no-cache" + + @allure.title("Login response contains correct CORS headers") + @allure.severity(allure.severity_level.MINOR) + def test_login_cors_headers(self, auth_client, valid_credentials): + response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + + cors_headers = response.headers + #TODO clarify partial CORS headers + assert cors_headers["Access-Control-Allow-Origin"] == "*" + assert cors_headers["Access-Control-Allow-Methods"] == "GET, POST, PUT, DELETE, OPTIONS" + assert cors_headers["Access-Control-Allow-Headers"] == "Content-Type, Authorization" + assert cors_headers["Access-Control-Allow-Credentials"] == "true" + + @allure.title("Multiple successful logins generate different tokens") + @allure.severity(allure.severity_level.NORMAL) + def test_login_multiple_tokens(self, auth_client, valid_credentials, authenticated_auth_client): + response1 = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + + response2 = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + + assert response1.status_code == 200 + assert response2.status_code == 200 + + token1 = response1.json()["access_token"] + token2 = response2.json()["access_token"] + + assert token1 != token2, "Multiple logins should generate different access tokens" + + authenticated_auth_client.token = token1 + get_profile_response = authenticated_auth_client.get_profile() + + assert get_profile_response.status_code == 200 + UserProfile(**get_profile_response.json()) + + authenticated_auth_client.token = token2 + get_profile_response = authenticated_auth_client.get_profile() + + assert get_profile_response.status_code == 200 + UserProfile(**get_profile_response.json()) + + diff --git a/tests/api/test_auth_logout.py b/tests/api/test_auth_logout.py new file mode 100644 index 0000000..6a5e8ef --- /dev/null +++ b/tests/api/test_auth_logout.py @@ -0,0 +1,236 @@ +import pytest +import allure +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import LogoutResponse, ErrorResponse, LoginResponse, UserProfile +from tests.api.cases.test_cases import EMPTY_STRING_CASES +from utils.token_generator import generate_invalid_refresh_tokens, generate_expired_token +import base64 + + +@allure.feature("Authentication") +@allure.story("Logout") +@pytest.mark.api +class TestLogoutGeneral: + + @allure.title("Successfully logout with valid token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_logout_success(self, authenticated_auth_client): + logout_response = authenticated_auth_client.logout(authenticated_auth_client.refresh_token) + assert logout_response.status_code == 200, f"Expected 200, got {logout_response.status_code}" + + try: + logout_data = LogoutResponse(**logout_response.json()) + assert logout_data.message, "Message should not be empty" + except ValidationError as e: + pytest.fail(f"Logout response schema validation failed: {e}") + + +class TestLogoutRefreshToken: + @allure.title("Successfully logout without providing refresh token in body") + @allure.severity(allure.severity_level.NORMAL) + def test_logout_without_refresh_token_in_body(self, authenticated_auth_client): + logout_response = authenticated_auth_client.logout() + assert logout_response.status_code == 200, f"Expected 200, got {logout_response.status_code}" + + try: + logout_data = LogoutResponse(**logout_response.json()) + assert logout_data.message, "Message should not be empty" + except ValidationError as e: + pytest.fail(f"Logout response schema validation failed: {e}") + + @allure.title("Logout with empty string refresh token") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("empty_string", EMPTY_STRING_CASES) + def test_logout_with_empty_string_refresh_token_in_body(self, authenticated_auth_client, empty_string): + logout_response = authenticated_auth_client.logout(empty_string) + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + + @allure.title("Token is invalidated after logout") + @allure.severity(allure.severity_level.CRITICAL) + def test_token_invalidated_after_logout(self, authenticated_auth_client): + profile_response_before = authenticated_auth_client.get_profile() + assert profile_response_before.status_code == 200, "Token should work before logout" + + logout_response = authenticated_auth_client.logout(authenticated_auth_client.refresh_token) + + assert logout_response.status_code == 200, f"Expected 200, got {logout_response.status_code}" + LogoutResponse(**logout_response.json()) + + profile_response_after = authenticated_auth_client.get_profile() + assert profile_response_after.status_code == 401, f"Expected 401, got {profile_response_after.status_code}" + ErrorResponse(**profile_response_after.json()) + + + @allure.title("Refresh token is invalidated after logout") + @allure.severity(allure.severity_level.CRITICAL) + def test_refresh_token_invalidated_after_logout(self,authenticated_auth_client): + logout_response = authenticated_auth_client.logout(authenticated_auth_client.refresh_token) + assert logout_response.status_code == 200, f"Expected 200, got {logout_response.status_code}" + LogoutResponse(**logout_response.json()) + + refresh_response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert refresh_response.status_code == 401, f"Expected 401, got {refresh_response.status_code}" + ErrorResponse(**refresh_response.json()) + + + @allure.title("Logout with wrong method") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("method", ["PUT", "PATCH"]) + def test_logout_with_wrong_method(self, authenticated_auth_client, method): + logout_response = authenticated_auth_client.send_custom_request(method, "/v1/auth/logout", json={"refresh_token": authenticated_auth_client.refresh_token}) + assert logout_response.status_code == 405, f"Expected 405, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + +@allure.feature("Authentication") +@allure.story("Logout") +@pytest.mark.api +class TestLogoutAcess: + + @allure.title("Logout without authentication token") + @allure.severity(allure.severity_level.CRITICAL) + def test_logout_without_auth_token(self, authenticated_auth_client): + token = authenticated_auth_client.token + authenticated_auth_client.token = None + logout_response = authenticated_auth_client.logout() + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Logout with invalid authentication token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.parametrize("invalid_token", generate_invalid_refresh_tokens()) + def test_logout_with_invalid_token(self, authenticated_auth_client, invalid_token): + token = authenticated_auth_client.token + authenticated_auth_client.token = invalid_token + logout_response = authenticated_auth_client.logout() + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Logout with with wrong auth type") + @allure.severity(allure.severity_level.CRITICAL) + def test_logout_with_invalid_schema(self, authenticated_auth_client): + token = authenticated_auth_client.token + authenticated_auth_client.token = "Basic " + base64.b64encode(token.encode()).decode() + logout_response = authenticated_auth_client.logout() + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Logout with wrong auth format") + @allure.severity(allure.severity_level.CRITICAL) + def test_logout_with_invalid_auth_format(self, authenticated_auth_client): + token = authenticated_auth_client.token + authenticated_auth_client.token = "Bearer " + base64.b64encode(token.encode()).decode() + logout_response = authenticated_auth_client.logout() + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Logout with too long access token") + @allure.severity(allure.severity_level.CRITICAL) + def test_logout_with_too_long_access_token(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + "a" * 20480 + response = authenticated_auth_client.logout() + assert response.status_code == 431, f"Expected 431, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Logout check response headers") + @allure.severity(allure.severity_level.CRITICAL) + def test_logout_check_response_headers(self, authenticated_auth_client): + response = authenticated_auth_client.logout() + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-store", "Cache-Control should be no-store" + assert response.headers["Expires"] == "0", "Expires should be 0" + assert response.headers["Pragma"] == "no-cache", "Pragma should be no-cache" + +class TestLogoutRaceConditions: + + @allure.title("Logout with multiple requests") + @allure.severity(allure.severity_level.NORMAL) + def test_logout_multiple_times(self, authenticated_auth_client): + logout_response1 = authenticated_auth_client.logout(refresh_token=authenticated_auth_client.refresh_token) + logout_response2 = authenticated_auth_client.logout(refresh_token=authenticated_auth_client.refresh_token) + + assert logout_response1.status_code == 200, f"Expected 200, got {logout_response1.status_code}" + LogoutResponse(**logout_response1.json()) + assert logout_response2.status_code == 401, f"Expected 401, got {logout_response2.status_code}" + ErrorResponse(**logout_response2.json()) + + @allure.title("Logout and refresh token at the same time") + @allure.severity(allure.severity_level.NORMAL) + def test_logout_and_refresh_token_at_the_same_time(self, authenticated_auth_client): + logout_response = authenticated_auth_client.logout(refresh_token=authenticated_auth_client.refresh_token) + refresh_response = authenticated_auth_client.refresh_token(refresh_token=authenticated_auth_client.refresh_token) + assert logout_response.status_code == 200, f"Expected 200, got {logout_response.status_code}" + LogoutResponse(**logout_response.json()) + assert refresh_response.status_code == 401, f"Expected 401, got {refresh_response.status_code}" + ErrorResponse(**refresh_response.json()) + + +@allure.feature("Authentication") +@allure.story("Logout") +@pytest.mark.api +class TestLogoutQueryManipulation: + + @allure.title("Logout fails with malformed JSON") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("json", [ + "", + "{}", + "{", + "}", + '{"refresh_token": "qwerty"', + '{"refresh_token": "qwerty",}', + '{"refresh_token": qwerty}', + "{refresh_token: \"qwerty\"}", + '{"refresh_token": "qwerty", "refresh_token": }', + '["refresh_token",', + '{"refresh_token": }', + '{"refresh_token": "юникод"}', + "null", + "true", + "123", + ]) + def test_logout_malformed_json(self, auth_client, json): + response = auth_client.client.post( + f"{auth_client.base_url}/v1/auth/logout", + content=json, + headers={"Content-Type": "application/json"} + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + + @allure.title("Logout without content type") + @allure.severity(allure.severity_level.NORMAL) + def test_logout_without_content_type(self, auth_client): + headers = auth_client.headers.copy() + auth_client.headers.pop("Content-Type") + response = auth_client.logout() + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + auth_client.headers = headers + + @allure.title("Logout with wrong content type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("content_type", ["text/plain", "application/xml", "application/json; charset=utf-8"]) + def test_logout_with_wrong_content_type(self, auth_client, content_type): + headers = auth_client.headers.copy() + auth_client.headers["Content-Type"] = content_type + response = auth_client.logout() + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + auth_client.headers = headers + + @allure.title("Logout check cache") + @allure.severity(allure.severity_level.NORMAL) + def test_logout_check_cache(self, auth_client): + response = auth_client.logout() + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-cache, no-store, must-revalidate", "Cache-Control header should be set to no-cache, no-store, must-revalidate" + assert response.headers["Pragma"] == "no-cache", "Pragma header should be set to no-cache" + diff --git a/tests/api/test_auth_password.py b/tests/api/test_auth_password.py new file mode 100644 index 0000000..48b9e34 --- /dev/null +++ b/tests/api/test_auth_password.py @@ -0,0 +1,280 @@ +import pytest +import allure +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import ChangePasswordResponse, ErrorResponse +from tests.api.cases.test_cases import EMPTY_STRING_CASES, NONSTRING_CASES +import base64 + +@allure.feature("Authentication") +@allure.story("Password Change") +@pytest.mark.api +class TestPasswordChangeGeneral: + + @allure.title("Change password successfully with valid credentials") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + # TODO: clarify credentials requirements + def test_change_password_success(self, authenticated_auth_client, valid_credentials, valid_password): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=valid_password + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + try: + change_response = ChangePasswordResponse(**response.json()) + assert change_response.message, "Response should contain success message" + valid_credentials["password"] = valid_password + except ValidationError as e: + pytest.fail(f"Password change response schema validation failed: {e}") + +@allure.feature("Authentication") +@allure.story("Password Change") +@pytest.mark.api +class TestPasswordChangeFieldsValidation: + + @allure.title("Change password with missing old password") + @allure.severity(allure.severity_level.NORMAL) + def test_change_password_missing_old_password(self, authenticated_auth_client, valid_password): + response = authenticated_auth_client.change_password( + new_password=valid_password + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with missing new password") + @allure.severity(allure.severity_level.NORMAL) + def test_change_password_missing_new_password(self, authenticated_auth_client, valid_credentials): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"] + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + if response.status_code == 400: + ErrorResponse(**response.json()) + + @allure.title("Change password with empty old password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("old_password", EMPTY_STRING_CASES) + def test_change_password_empty_old_password(self, authenticated_auth_client, old_password, valid_password): + response = authenticated_auth_client.change_password( + old_password=old_password, + new_password=valid_password + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with empty new password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("new_password", EMPTY_STRING_CASES) + def test_change_password_empty_new_password(self, authenticated_auth_client, valid_credentials, new_password): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=new_password + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with non-string old password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("old_password", NONSTRING_CASES) + def test_change_password_nonstring_old_password(self, authenticated_auth_client, old_password, valid_password): + response = authenticated_auth_client.change_password( + old_password=old_password, + new_password=valid_password + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with non-string new password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("new_password", NONSTRING_CASES) + def test_change_password_nonstring_new_password(self, authenticated_auth_client, valid_credentials, new_password): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=new_password + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with weak new password") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("weak_password", WEAK_PASSWORD_CASES) + def test_change_password_weak_password(self, authenticated_auth_client, valid_credentials, weak_password): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=weak_password + ) + + assert response.status_code == 400, f"Expected 400 or 501, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with same old and new password") + @allure.severity(allure.severity_level.NORMAL) + def test_change_password_same_passwords(self, authenticated_auth_client, valid_credentials): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=valid_credentials["password"] + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change password with incorrect old password") + @allure.severity(allure.severity_level.NORMAL) + def test_change_password_wrong_old_password(self, authenticated_auth_client, valid_password, invalid_password): + response = authenticated_auth_client.change_password( + old_password=invalid_password, + new_password=valid_password + ) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + +@allure.feature("Authentication") +@allure.story("Password Change") +@pytest.mark.api +class TestPasswordChangeAccess: + + @allure.title("Change password without authentication token") + @allure.severity(allure.severity_level.CRITICAL) + def test_change_password_without_auth_token(self, authenticated_auth_client, valid_credentials, faker): + token = authenticated_auth_client.token + authenticated_auth_client.token = None + logout_response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=faker.password() + ) + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Change password with invalid authentication token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.parametrize("invalid_token", generate_invalid_refresh_tokens()) + def test_change_password_with_invalid_token(self, authenticated_auth_client, invalid_token, valid_credentials, faker): + token = authenticated_auth_client.token + authenticated_auth_client.token = invalid_token + logout_response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=faker.password() + ) + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Change password with with wrong auth type") + @allure.severity(allure.severity_level.CRITICAL) + def test_change_password_with_invalid_schema(self, authenticated_auth_client, valid_credentials, faker): + token = authenticated_auth_client.token + authenticated_auth_client.token = "Basic " + base64.b64encode(token.encode()).decode() + logout_response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=faker.password() + ) + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Change password with wrong auth format") + @allure.severity(allure.severity_level.CRITICAL) + def test_change_password_with_invalid_auth_format(self, authenticated_auth_client, valid_credentials, faker): + token = authenticated_auth_client.token + authenticated_auth_client.token = "Bearer " + base64.b64encode(token.encode()).decode() + logout_response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=faker.password() + ) + assert logout_response.status_code == 401, f"Expected 401, got {logout_response.status_code}" + ErrorResponse(**logout_response.json()) + authenticated_auth_client.token = token + + @allure.title("Change password with too long access token") + @allure.severity(allure.severity_level.CRITICAL) + def test_change_password_with_too_long_access_token(self, authenticated_auth_client, valid_credentials, faker): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + "a" * 20480 + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=faker.password() + ) + assert response.status_code == 431, f"Expected 431, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change password check response headers") + @allure.severity(allure.severity_level.CRITICAL) + def test_change_password_check_response_headers(self, authenticated_auth_client, valid_credentials, faker): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=faker.password() + ) + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-store", "Cache-Control should be no-store" + assert response.headers["Expires"] == "0", "Expires should be 0" + assert response.headers["Pragma"] == "no-cache", "Pragma should be no-cache" + +@allure.feature("Authentication") +@allure.story("Password Change") +@pytest.mark.api +class TestPasswordChangeQueryManipulation: + + @allure.title("Change password with extra fields in request") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("extra_field, extra_value", [("extra_field", "extra_value"), ("extra_field2", "extra_value2"), ("extra_field3", "extra_value3")]) + def test_change_password_extra_fields(self, authenticated_auth_client, valid_credentials, valid_password, extra_field, extra_value): + response = authenticated_auth_client.send_custom_request( + method="PUT", + endpoint="/v1/auth/password", + json={ + "old_password": valid_credentials["password"], + "new_password": valid_password, + extra_field: extra_value + } + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert extra_field not in response.json(), f"Extra field {extra_field} should not be in response" + + @allure.title("Change password without content type") + @allure.severity(allure.severity_level.NORMAL) + def test_change_password_without_content_type(self, authenticated_auth_client, valid_credentials, valid_password): + headers = authenticated_auth_client.headers.copy() + headers.pop("Content-Type") + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=valid_password + ) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change password with wrong content type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("content_type", ["text/plain", "application/xml", "application/json; charset=utf-8"]) + def test_change_password_with_wrong_content_type(self, authenticated_auth_client, valid_credentials, valid_password, content_type): + headers = authenticated_auth_client.headers.copy() + headers["Content-Type"] = content_type + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=valid_password + ) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change password check cache") + @allure.severity(allure.severity_level.NORMAL) + def test_change_password_check_cache(self, authenticated_auth_client, valid_credentials, valid_password): + response = authenticated_auth_client.change_password( + old_password=valid_credentials["password"], + new_password=valid_password + ) + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-cache, no-store, must-revalidate", "Cache-Control header should be set to no-cache, no-store, must-revalidate" + assert response.headers["Pragma"] == "no-cache", "Pragma header should be set to no-cache" \ No newline at end of file diff --git a/tests/api/test_auth_profile.py b/tests/api/test_auth_profile.py new file mode 100644 index 0000000..21d5feb --- /dev/null +++ b/tests/api/test_auth_profile.py @@ -0,0 +1,226 @@ +import pytest +import allure +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import UserProfile, ErrorResponse +from utils.token_generator import generate_invalid_tokens +import base64 + + +@allure.feature("Authentication") +@allure.story("User Profile") +@pytest.mark.api +class TestProfileGeneral: + + @allure.title("Successfully retrieve user profile with valid token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_get_profile_success(self, authenticated_auth_client): + response = authenticated_auth_client.get_profile() + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + + try: + profile = UserProfile(**response.json()) + assert profile.id, "User id should not be empty" + assert profile.username, "Username should not be empty" + assert profile.email, "Email should not be empty" + # assert profile.tenant_id, "Tenant id should not be empty" + # assert profile.tenant_name, "Tenant name should not be empty" + # assert profile.tenant_role, "Tenant role should not be empty" + except ValidationError as e: + pytest.fail(f"Profile schema validation failed: {e}") + + @allure.title("Profile data matches login response user data") + @allure.severity(allure.severity_level.NORMAL) + def test_profile_matches_login_data(self, authenticated_auth_client, valid_credentials): + from tests.api.schemas.auth_schemas import LoginResponse + + login_response = authenticated_auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + login_data = LoginResponse(**login_response.json()) + + authenticated_auth_client.token = login_data.access_token + profile_response = authenticated_auth_client.get_profile() + assert profile_response.status_code == 200 + + profile = UserProfile(**profile_response.json()) + + assert profile.id == login_data.user.id, "User ID should match" + assert profile.username == login_data.user.username, "Username should match" + assert profile.email == login_data.user.email, "Email should match" + assert profile.tenant_id == login_data.user.tenant_id, "Tenant ID should match" + assert profile.tenant_name == login_data.user.tenant_name, "Tenant name should match" + assert profile.tenant_role == login_data.user.tenant_role, "Tenant role should match" + + @allure.title("Profile endpoint returns consistent data on multiple calls") + @allure.severity(allure.severity_level.NORMAL) + def test_profile_consistency(self, authenticated_auth_client): + response1 = authenticated_auth_client.get_profile() + response2 = authenticated_auth_client.get_profile() + + assert response1.status_code == 200 + assert response2.status_code == 200 + + profile1 = UserProfile(**response1.json()) + profile2 = UserProfile(**response2.json()) + + assert profile1.model_dump() == profile2.model_dump(), "Profile data should be consistent across multiple calls" + + @allure.title("Profile contains valid email format") + @allure.severity(allure.severity_level.MINOR) + def test_profile_email_format(self, authenticated_auth_client): + response = authenticated_auth_client.get_profile() + + assert response.status_code == 200 + + profile = UserProfile(**response.json()) + assert "@" in profile.email, "Email should contain @ symbol" + assert "." in profile.email.split("@")[1], "Email domain should contain a dot" + + @allure.title("Profile contains valid UUID format for IDs") + @allure.severity(allure.severity_level.MINOR) + def test_profile_uuid_format(self, authenticated_auth_client): + import uuid + + response = authenticated_auth_client.get_profile() + + assert response.status_code == 200 + + profile = UserProfile(**response.json()) + + try: + uuid.UUID(profile.id) + uuid.UUID(profile.tenant_id) + except ValueError: + pytest.fail("User ID and Tenant ID should be valid UUIDs") + + @allure.title("Profile contains valid tenant role") + @allure.severity(allure.severity_level.NORMAL) + def test_profile_tenant_role(self, authenticated_auth_client): + response = authenticated_auth_client.get_profile() + + assert response.status_code == 200 + + profile = UserProfile(**response.json()) + valid_roles = ["owner", "admin", "editor", "viewer"] + + assert profile.tenant_role in valid_roles, "Tenant role should be valid" + +@allure.feature("Authentication") +@allure.story("User Profile") +@pytest.mark.api +class TestProfileAccess: + + @allure.title("Get profile fails without authentication token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_get_profile_without_token(self, authenticated_auth_client): + token = authenticated_auth_client.token + authenticated_auth_client.token = None + response = authenticated_auth_client.get_profile() + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + @allure.title("Get profile fails with invalid token") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_token", generate_invalid_tokens()) + def test_get_profile_invalid_token(self, authenticated_auth_client, invalid_token): + authenticated_auth_client.token = invalid_token + + response = authenticated_auth_client.get_profile() + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + #TODO add expired token generation + @allure.title("Get profile fails with expired token") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_expired_token(self, authenticated_auth_client): + token = authenticated_auth_client.token + expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyMzkwMjJ9.4Adcj0vVzr7B8Y8P9nGJ5pZXkJZ5JZ5JZ5JZ5JZ5JZ5" + authenticated_auth_client.token = expired_token + + response = authenticated_auth_client.get_profile() + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + @allure.title("Get profile with wrong auth type") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_with_wrong_auth_type(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Basic " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.get_profile() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get profile with wrong auth format") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_with_wrong_auth_format(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.get_profile() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get profile with too long access token") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_with_too_long_access_token(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + "a" * 20480 + response = authenticated_auth_client.get_profile() + assert response.status_code == 431, f"Expected 431, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get profile with revoked access token") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_with_revoked_access_token(self, authenticated_auth_client, valid_credentials): + authenticated_auth_client.logout() + response = authenticated_auth_client.get_profile() + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.login(valid_credentials["username"], valid_credentials["password"]) + +class TestProfileHeaders: + + @allure.title("Get profile without content type") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_without_content_type(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers.pop("Content-Type") + response = authenticated_auth_client.get_profile() + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get profile with wrong content type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("content_type", ["text/plain", "application/xml", "application/json; charset=utf-8"]) + def test_get_profile_with_wrong_content_type(self, authenticated_auth_client, content_type): + headers = authenticated_auth_client.headers.copy() + headers["Content-Type"] = content_type + response = authenticated_auth_client.get_profile() + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Get profile check cache") + @allure.severity(allure.severity_level.NORMAL) + def test_get_profile_check_cache(self, authenticated_auth_client): + response = authenticated_auth_client.get_profile() + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-cache, no-store, must-revalidate", "Cache-Control header should be set to no-cache, no-store, must-revalidate" + assert response.headers["Pragma"] == "no-cache", "Pragma header should be set to no-cache" + + + diff --git a/tests/api/test_auth_refresh.py b/tests/api/test_auth_refresh.py new file mode 100644 index 0000000..a587f7e --- /dev/null +++ b/tests/api/test_auth_refresh.py @@ -0,0 +1,263 @@ +import pytest +import allure +import time +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import LoginResponse, RefreshTokenResponse, ErrorResponse, UserProfile +from utils.token_generator import generate_invalid_refresh_tokens, generate_invalid_bearer_tokens +import base64 + +@allure.feature("Authentication") +@allure.story("Token Refresh") +@pytest.mark.api +class TestTokenRefreshGeneral: + + @allure.title("Successfully refresh access token with valid refresh token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_refresh_token_success(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + login_data = LoginResponse(**login_response.json()) + + refresh_response = auth_client.post_refresh(login_data.refresh_token) + + assert refresh_response.status_code == 200, f"Expected 200, got {refresh_response.status_code}" + + try: + refresh_data = RefreshTokenResponse(**refresh_response.json()) + assert refresh_data.access_token, "access_token is empty" + assert refresh_data.access_token != login_data.access_token, "New access token should be different from old one" + except ValidationError as e: + pytest.fail(f"Refresh token response schema validation failed: {e}") + +@allure.feature("Authentication") +@allure.story("Token Refresh") +@pytest.mark.api +class TestRefreshTokenManipulation: + @allure.title("Refresh token fails with invalid refresh token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + @pytest.mark.parametrize("invalid_token", generate_invalid_refresh_tokens()) + def test_refresh_token_invalid(self, auth_client, invalid_token): + response = auth_client.post_refresh(invalid_token) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Refresh token fails with missing refresh token") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_token_missing(self, auth_client): + response = auth_client.post("/v1/auth/refresh", json={}) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Refresh token fails with empty refresh token") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_token_empty(self, auth_client): + response = auth_client.post_refresh("") + + assert response.status_code in [400, 401], f"Expected 400 or 401, got {response.status_code}" + ErrorResponse(**response.json()) + + #TODO generate expired refresh tokens + @allure.title("Refresh token fails with expired refresh token") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.slow + def test_refresh_token_expired(self, auth_client): + expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyMzkwMjJ9.4Adcj0vVzr7B8Y8P9nGJ5pZXkJZ5JZ5JZ5JZ5JZ5JZ5" + + response = auth_client.post_refresh(expired_token) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Multiple refresh requests with same token generate different access tokens") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_token_multiple_times(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + login_data = LoginResponse(**login_response.json()) + + refresh_response1 = auth_client.post_refresh(login_data.refresh_token) + time.sleep(1) + refresh_response2 = auth_client.post_refresh(login_data.refresh_token) + + assert refresh_response1.status_code == 200 + assert refresh_response2.status_code == 200 + + refresh_data1 = RefreshTokenResponse(**refresh_response1.json()) + refresh_data2 = RefreshTokenResponse(**refresh_response2.json()) + + assert refresh_data1.access_token != refresh_data2.access_token, "Multiple refresh requests should generate different access tokens" + + @allure.title("Refreshed access token is valid for API calls") + @allure.severity(allure.severity_level.CRITICAL) + def test_refreshed_token_is_valid(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + login_data = LoginResponse(**login_response.json()) + + refresh_response = auth_client.post_refresh(login_data.refresh_token) + assert refresh_response.status_code == 200 + + refresh_data = RefreshTokenResponse(**refresh_response.json()) + + auth_client.token = refresh_data.access_token + profile_response = auth_client.get_profile() + + assert profile_response.status_code == 200, "Refreshed token should be valid for API calls" + UserProfile(**profile_response.json()) + + @allure.title("Old access token is invalid for API calls after refresh") + @allure.severity(allure.severity_level.CRITICAL) + def test_old_token_is_invalid(self, auth_client, valid_credentials): + login_response = auth_client.login( + valid_credentials["username"], + valid_credentials["password"] + ) + assert login_response.status_code == 200 + + login_data = LoginResponse(**login_response.json()) + + refresh_response = auth_client.post_refresh(login_data.refresh_token) + assert refresh_response.status_code == 200 + + auth_client.token = login_data.access_token + profile_response = auth_client.get_profile() + + assert profile_response.status_code == 401, "Old token should be invalid for API calls after refresh" + ErrorResponse(**profile_response.json()) + +@allure.feature("Authentication") +@allure.story("Token Refresh") +@pytest.mark.api +class TestRefreshTokenAccess: + + @allure.title("Refresh fails without authentication token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_refresh_without_token(self, authenticated_auth_client): + token = authenticated_auth_client.token + authenticated_auth_client.token = None + + refresh_response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + + assert refresh_response.status_code == 401, f"Expected 401, got {refresh_response.status_code}" + ErrorResponse(**refresh_response.json()) + authenticated_auth_client.token = token + + @allure.title("Refresh fails with invalid access token") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_token", generate_invalid_bearer_tokens()) + def test_refresh_invalid_token(self, authenticated_auth_client, invalid_token): + token = authenticated_auth_client.token + authenticated_auth_client.token = invalid_token + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + #TODO add expired token generation + @allure.title("Refresh fails with expired access token") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_expired_token(self, authenticated_auth_client): + token = authenticated_auth_client.token + expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyMzkwMjJ9.4Adcj0vVzr7B8Y8P9nGJ5pZXkJZ5JZ5JZ5JZ5JZ5JZ5" + authenticated_auth_client.token = expired_token + + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + @allure.title("Refresh with wrong auth type") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_with_wrong_auth_type(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Basic " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Refresh with wrong auth format") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_with_wrong_auth_format(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Refresh with too long access token") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_with_too_long_access_token(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + "a" * 20480 + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 431, f"Expected 431, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Refresh with revoked refresh token") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_with_revoked_refresh_token(self, authenticated_auth_client): + authenticated_auth_client.logout() + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.login() + +@allure.feature("Authentication") +@allure.story("Token Refresh") +@pytest.mark.api +class TestRefreshTokenHeaders: + + @allure.title("Refresh without content type") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_without_content_type(self, authenticated_auth_client): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers.pop("Content-Type") + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Refresh with wrong content type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("content_type", ["text/plain", "application/xml", "application/json; charset=utf-8"]) + def test_refresh_with_wrong_content_type(self, authenticated_auth_client, content_type): + headers = authenticated_auth_client.headers.copy() + headers["Content-Type"] = content_type + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Refresh check cache") + @allure.severity(allure.severity_level.NORMAL) + def test_refresh_check_cache(self, authenticated_auth_client): + response = authenticated_auth_client.post_refresh(authenticated_auth_client.refresh_token) + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-cache, no-store, must-revalidate", "Cache-Control header should be set to no-cache, no-store, must-revalidate" + assert response.headers["Pragma"] == "no-cache", "Pragma header should be set to no-cache" + + + + diff --git a/tests/api/test_auth_username.py b/tests/api/test_auth_username.py new file mode 100644 index 0000000..9264a65 --- /dev/null +++ b/tests/api/test_auth_username.py @@ -0,0 +1,266 @@ +import pytest +import allure +from pydantic import ValidationError +from tests.api.schemas.auth_schemas import UserProfile, ErrorResponse +from tests.api.cases.test_cases import EMPTY_STRING_CASES, NONSTRING_CASES +import base64 +from utils.token_generator import generate_invalid_bearer_tokens + + +@allure.feature("Authentication") +@allure.story("Username Change") +@pytest.mark.api +class TestUsernameChangeGeneral: + #TODO clarify username requirements + @allure.title("Change username successfully with valid username") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_change_username_success(self, authenticated_auth_client, valid_username, valid_credentials): + response = authenticated_auth_client.change_username(valid_username) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + try: + profile = UserProfile(**response.json()) + assert profile.username == valid_username, "Username should be updated" + valid_credentials["username"] = valid_username + except ValidationError as e: + pytest.fail(f"Username change response schema validation failed: {e}") + +@allure.feature("Authentication") +@allure.story("Username Change") +@pytest.mark.api +class TestUsernameChangeUsernameValidation: + + @allure.title("Change username with missing new_username") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_missing_field(self, authenticated_auth_client): + response = authenticated_auth_client.change_username(None) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with empty new_username") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("new_username", EMPTY_STRING_CASES) + def test_change_username_empty(self, authenticated_auth_client, new_username): + response = authenticated_auth_client.change_username(new_username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with non-string type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("new_username", NONSTRING_CASES) + def test_change_username_nonstring(self, authenticated_auth_client, new_username): + response = authenticated_auth_client.change_username(new_username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with too short username") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("short_username", ["a", "ab", "abc"]) + def test_change_username_too_short(self, authenticated_auth_client, short_username): + response = authenticated_auth_client.change_username(short_username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with too long username") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("username", ["a" * 51, "a" * 100]) + def test_change_username_too_long(self, authenticated_auth_client, username): + response = authenticated_auth_client.change_username(username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with invalid characters") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_username", [ + "user@name", + "user name", + "user#name", + "user$name", + "user%name", + "user!name", + "user*name", + "user(name)", + "user[name]", + "user{name}", + ]) + def test_change_username_invalid_characters(self, authenticated_auth_client, invalid_username): + response = authenticated_auth_client.change_username(invalid_username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with special characters") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("special_username", [ + "user.name", + "user_name", + "user-name", + "user123", + "123user", + ]) + def test_change_username_special_characters(self, authenticated_auth_client, special_username): + response = authenticated_auth_client.change_username(special_username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with existing username") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_already_exists(self, authenticated_auth_client, valid_credentials): + response = authenticated_auth_client.change_username(valid_credentials["username"]) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + + @allure.title("Change username with extra fields in request") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("extra_field, extra_value", [("extra_field", "extra_value"), ("is_admin", True), ("test_field", "test_value")]) + def test_change_username_extra_fields(self, authenticated_auth_client, valid_username, extra_field, extra_value): + response = authenticated_auth_client.send_custom_request( + "PUT", + "/v1/auth/username", + json={ + "new_username": valid_username, + extra_field: extra_value + } + ) + + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert extra_field not in response.json(), f"Extra field {extra_field} should not be in response" + + @allure.title("Change username with SQL injection attempt") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("malicious_username", [ + "admin'; DROP TABLE users; --", + "' OR '1'='1", + "admin'--", + "1' UNION SELECT * FROM users--", + ]) + def test_change_username_sql_injection(self, authenticated_auth_client, malicious_username): + response = authenticated_auth_client.change_username(malicious_username) + + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + +@allure.feature("Authentication") +@allure.story("Username Change") +@pytest.mark.api +class TestUsernameChangeAccess: + + @allure.title("Change username fails without authentication token") + @allure.severity(allure.severity_level.CRITICAL) + @pytest.mark.smoke + def test_change_username_without_token(self, authenticated_auth_client, valid_username): + token = authenticated_auth_client.token + authenticated_auth_client.token = None + + response = authenticated_auth_client.change_username(valid_username) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + @allure.title("Change username fails with invalid access token") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("invalid_token", generate_invalid_bearer_tokens()) + def test_change_username_invalid_token(self, authenticated_auth_client, invalid_token, valid_username): + token = authenticated_auth_client.token + authenticated_auth_client.token = invalid_token + response = authenticated_auth_client.change_username(valid_username) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + #TODO add expired token generation + @allure.title("Change username fails with expired access token") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_expired_token(self, authenticated_auth_client, valid_username): + token = authenticated_auth_client.token + expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyMzkwMjJ9.4Adcj0vVzr7B8Y8P9nGJ5pZXkJZ5JZ5JZ5JZ5JZ5JZ5" + authenticated_auth_client.token = expired_token + + response = authenticated_auth_client.change_username(valid_username) + + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.token = token + + @allure.title("Change username with wrong auth type") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_with_wrong_auth_type(self, authenticated_auth_client, valid_username): + headers = authenticated_auth_client.headers.copy() + headers["Authorization"] = "Basic " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change username with wrong auth format") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_with_wrong_auth_format(self, authenticated_auth_client, valid_username): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + base64.b64encode(authenticated_auth_client.token.encode()).decode() + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change username with too long access token") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_with_too_long_access_token(self, authenticated_auth_client, valid_username): + headers = authenticated_auth_client.headers.copy() + authenticated_auth_client.headers["Authorization"] = "Bearer " + "a" * 20480 + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 431, f"Expected 431, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change username with revoked refresh token") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_with_revoked_refresh_token(self, authenticated_auth_client, valid_username): + authenticated_auth_client.logout() + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 401, f"Expected 401, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.login() + +@allure.feature("Authentication") +@allure.story("Username Change") +@pytest.mark.api +class TestChangeUsernameHeaders: + + @allure.title("Change username without content type") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_without_content_type(self, authenticated_auth_client, valid_username): + headers = authenticated_auth_client.headers.copy() + headers.pop("Content-Type") + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change username with wrong content type") + @allure.severity(allure.severity_level.NORMAL) + @pytest.mark.parametrize("content_type", ["text/plain", "application/xml", "application/json; charset=utf-8"]) + def test_change_username_with_wrong_content_type(self, authenticated_auth_client, content_type, valid_username): + headers = authenticated_auth_client.headers.copy() + headers["Content-Type"] = content_type + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 400, f"Expected 400, got {response.status_code}" + ErrorResponse(**response.json()) + authenticated_auth_client.headers = headers + + @allure.title("Change username check cache") + @allure.severity(allure.severity_level.NORMAL) + def test_change_username_check_cache(self, authenticated_auth_client, valid_username): + response = authenticated_auth_client.change_username(valid_username) + assert response.status_code == 200, f"Expected 200, got {response.status_code}" + assert response.headers["Cache-Control"] == "no-cache, no-store, must-revalidate", "Cache-Control header should be set to no-cache, no-store, must-revalidate" + assert response.headers["Pragma"] == "no-cache", "Pragma header should be set to no-cache" diff --git a/utils/token_generator.py b/utils/token_generator.py new file mode 100644 index 0000000..72f9942 --- /dev/null +++ b/utils/token_generator.py @@ -0,0 +1,167 @@ +import base64 +import json +from faker import Faker +import random +import string +import time + + +def generate_invalid_bearer_tokens(): + """ + Generate various types of invalid bearer tokens for testing. + + Returns: + list: List of invalid token strings for testing + """ + + faker = Faker() + valid_header = base64.urlsafe_b64encode( + json.dumps({"alg": "HS256", "typ": "JWT"}).encode() + ).decode().rstrip('=') + + valid_payload = base64.urlsafe_b64encode( + json.dumps({"sub": "1234567890", "name": str(faker.name()), "iat": int(time.time())}).encode() + ).decode().rstrip('=') + + valid_signature = ''.join(random.choices(string.ascii_letters + string.digits + '-_', k=43)) + + return [ + # Completely invalid strings + "invalid_token", + "not.a.jwt", + "12345", + "", + + # Missing parts + f"{valid_header}", + f"{valid_header}.{valid_payload}", + f".{valid_payload}.{valid_signature}", + f"{valid_header}..{valid_signature}", + + # Too many parts + f"{valid_header}.{valid_payload}.{valid_signature}.extra", + f"{valid_header}.{valid_payload}.{valid_signature}.extra.parts", + + # Invalid base64 encoding + f"{valid_header}.invalid_base64.{valid_signature}", + f"invalid_base64.{valid_payload}.{valid_signature}", + f"{valid_header}.{valid_payload}.invalid_base64", + + # Malformed structure + f"...", + f"....", + f"{valid_header}...{valid_signature}", + + # Invalid JSON in parts + "bm90X2pzb24.bm90X2pzb24.signature", # "not_json" base64 encoded + + # Tampered signature + f"{valid_header}.{valid_payload}.tampered_signature", + f"{valid_header}.{valid_payload}.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + + # Special characters + f"{valid_header}.{valid_payload}.sig@#$%", + f"header!@#.payload$%^.signature&*()", + + # Expired/invalid timestamps (will be caught by signature validation) + "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyMzkwMjJ9.invalid", + + # Wrong algorithm in header + "eyJhbGciOiJub25lIiwidHlwIjoiSldUIn0.eyJzdWIiOiIxMjM0NTY3ODkwIn0.signature", + + # Unicode/special encoding + "тест.токен.подпись", + "🔑.🔐.🔒", + + # Very long token + "a" * 10000, + + # SQL injection attempts in token + "'; DROP TABLE users; --", + + # Null bytes + "header\x00.payload\x00.signature\x00", + ] + +def generate_invalid_refresh_tokens(): + """ + Generate various types of invalid refresh tokens for testing. + Refresh tokens are JWTs similar to access tokens but typically have longer expiration. + + Returns: + list: List of invalid refresh token strings for testing + """ + faker = Faker() + valid_header = base64.urlsafe_b64encode( + json.dumps({"alg": "HS256", "typ": "JWT"}).encode() + ).decode().rstrip('=') + + valid_payload = base64.urlsafe_b64encode( + json.dumps({ + "sub": faker.uuid4(), + "type": "refresh", + "iat": int(time.time()), + "exp": int(time.time()) + 2592000 # 30 days + }).encode() + ).decode().rstrip('=') + + valid_signature = ''.join(random.choices(string.ascii_letters + string.digits + '-_', k=43)) + + wrong_type_token = ( + base64.urlsafe_b64encode(json.dumps({"alg": "HS256", "typ": "JWT"}).encode()).decode().rstrip('=') + "." + + base64.urlsafe_b64encode(json.dumps({ + "sub": faker.uuid4(), + "type": "access", + "iat": int(time.time()), + "exp": int(time.time()) + 3600 + }).encode()).decode().rstrip('=') + "." + valid_signature + ) + + return [ + # Completely invalid strings + "invalid_refresh_token", + "not.a.refresh.token", + "refresh123", + "", + + # Missing parts + f"{valid_header}", + f"{valid_header}.{valid_payload}", + f".{valid_payload}.{valid_signature}", + f"{valid_header}..{valid_signature}", + + # Too many parts + f"{valid_header}.{valid_payload}.{valid_signature}.extra", + + # Invalid base64 encoding + f"{valid_header}.invalid_base64.{valid_signature}", + f"invalid_base64.{valid_payload}.{valid_signature}", + f"{valid_header}.{valid_payload}.invalid_base64", + + # Malformed structure + "...", + "....", + + # Tampered signature + f"{valid_header}.{valid_payload}.tampered_signature", + f"{valid_header}.{valid_payload}.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + + # Special characters + f"{valid_header}.{valid_payload}.sig@#$%", + + # Wrong token type (access token instead of refresh) + wrong_type_token, + + # Unicode/special encoding + "тест.refresh.токен", + "🔄.🔐.🔒", + + # Very long token + "r" * 10000, + + # SQL injection attempts + "'; DROP TABLE refresh_tokens; --", + + # Null bytes + "refresh\x00.token\x00.signature\x00", + ] \ No newline at end of file