From 7ab4b236d5ad3fe9776a6a82a56d62a8e3578ef2 Mon Sep 17 00:00:00 2001 From: Jonathan Gangi Date: Wed, 25 Jun 2025 18:14:14 -0300 Subject: [PATCH 1/3] Azure: Add new models and exceptions This commit introduces a new model named `ConfigureError` and exceptions named `ConflictError` and `RunningSubmissionError` which aims to provide a more detailed status of an Azure Publishing Error. The goal is to be able to differentiate certain errors which are caused by submission in progress/conflict. Assisted-By: Cursor Signed-off-by: Jonathan Gangi --- cloudpub/error.py | 8 ++++++++ cloudpub/models/ms_azure.py | 19 ++++++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/cloudpub/error.py b/cloudpub/error.py index b3dd908..184f2ea 100644 --- a/cloudpub/error.py +++ b/cloudpub/error.py @@ -17,6 +17,14 @@ class InvalidStateError(RuntimeError): """Report invalid state which should not happen in code.""" +class ConflictError(Exception): + """Report a submission conflict error.""" + + +class RunningSubmissionError(Exception): + """Report a running submission error.""" + + class NotFoundError(ValueError): """Represent a missing resource.""" diff --git a/cloudpub/models/ms_azure.py b/cloudpub/models/ms_azure.py index c3472e7..63d00e9 100644 --- a/cloudpub/models/ms_azure.py +++ b/cloudpub/models/ms_azure.py @@ -28,6 +28,20 @@ def _mask_secret(value: str) -> str: return value +@define +class ConfigureError(AttrsJSONDecodeMixin): + """Represent an error from a :meth:`~AzureService.configure` request.""" + + code: str + """The error code.""" + + message: str + """The error message.""" + + resource_id: str = field(metadata={"alias": "resourceId"}) + """The resource ID.""" + + @define class ConfigureStatus(AttrsJSONDecodeMixin): """Represent a response from a :meth:`~AzureService.configure` request.""" @@ -67,7 +81,10 @@ class ConfigureStatus(AttrsJSONDecodeMixin): resource_uri: Optional[str] = field(metadata={"alias": "resourceUri", "hide_unset": True}) """The 
resource URI related to the configure job.""" - errors: List[str] + errors: List[ConfigureError] = field( + converter=lambda x: [ConfigureError.from_json(r) for r in x] if x else [], + on_setattr=NO_OP, + ) """List of errors when the ``job_result`` is ``failed``.""" From 907cb65ef3604ce9b4461514375f9a27660515a7 Mon Sep 17 00:00:00 2001 From: Jonathan Gangi Date: Wed, 25 Jun 2025 18:16:18 -0300 Subject: [PATCH 2/3] Azure: Retry on publishing when a conflict occurs This commit changes the Azure module to retry publishing the VM image whenever a submission in progress/conflict happens. It will first attempt to change the target to `preview` or `live` for 3 times and then, if the exception comes as `ConflictError` or `RunningSubmissionError` it will restart the publishing task. Assisted-By: Cursor Signed-off-by: Jonathan Gangi --- cloudpub/ms_azure/service.py | 22 +++++++++++++++++----- cloudpub/ms_azure/utils.py | 20 ++++++++++++++++++++ 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/cloudpub/ms_azure/service.py b/cloudpub/ms_azure/service.py index 7641040..743ff5e 100644 --- a/cloudpub/ms_azure/service.py +++ b/cloudpub/ms_azure/service.py @@ -6,13 +6,13 @@ from deepdiff import DeepDiff from requests import HTTPError -from tenacity import retry -from tenacity.retry import retry_if_result +from tenacity import retry, wait_exponential +from tenacity.retry import retry_if_exception_type, retry_if_result from tenacity.stop import stop_after_attempt, stop_after_delay from tenacity.wait import wait_chain, wait_fixed from cloudpub.common import BaseService -from cloudpub.error import InvalidStateError, NotFoundError +from cloudpub.error import ConflictError, InvalidStateError, NotFoundError, RunningSubmissionError from cloudpub.models.ms_azure import ( RESOURCE_MAPING, AzureResource, @@ -38,6 +38,8 @@ from cloudpub.ms_azure.session import PartnerPortalSession from cloudpub.ms_azure.utils import ( AzurePublishingMetadata, + check_for_conflict, + 
check_for_running_submission, create_disk_version_from_scratch, is_azure_job_not_complete, is_sas_present, @@ -180,6 +182,10 @@ def _wait_for_job_completion(self, job_id: str) -> ConfigureStatus: job_details = self._query_job_details(job_id=job_id) if job_details.job_result == "failed": error_message = f"Job {job_id} failed: \n{job_details.errors}" + if check_for_conflict(job_details): + self._raise_error(ConflictError, error_message) + elif check_for_running_submission(job_details): + self._raise_error(RunningSubmissionError, error_message) self._raise_error(InvalidStateError, error_message) elif job_details.job_result == "succeeded": log.debug("Job %s succeeded", job_id) @@ -558,7 +564,7 @@ def _publish_preview(self, product: Product, product_name: str) -> None: if res.job_result != 'succeeded' or not self.get_submission_state( product.id, state="preview" ): - errors = "\n".join(res.errors) + errors = "\n".join(["%s: %s" % (error.code, error.message) for error in res.errors]) failure_msg = ( f"Failed to submit the product {product.id} to preview. " f"Status: {res.job_result} Errors: {errors}" @@ -585,13 +591,19 @@ def _publish_live(self, product: Product, product_name: str) -> None: res = self.submit_to_status(product_id=product.id, status='live') if res.job_result != 'succeeded' or not self.get_submission_state(product.id, state="live"): - errors = "\n".join(res.errors) + errors = "\n".join(["%s: %s" % (error.code, error.message) for error in res.errors]) failure_msg = ( f"Failed to submit the product {product.id} to live. " f"Status: {res.job_result} Errors: {errors}" ) raise RuntimeError(failure_msg) + @retry( + retry=retry_if_exception_type((ConflictError, RunningSubmissionError)), + wait=wait_exponential(multiplier=1, min=60, max=60 * 60 * 24 * 7), + stop=stop_after_attempt(3), + reraise=True, + ) def publish(self, metadata: AzurePublishingMetadata) -> None: """ Associate a VM image with a given product listing (destination) and publish it if required. 
diff --git a/cloudpub/ms_azure/utils.py b/cloudpub/ms_azure/utils.py index 9fc144a..39fc9b0 100644 --- a/cloudpub/ms_azure/utils.py +++ b/cloudpub/ms_azure/utils.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: GPL-3.0-or-later import logging +import re from operator import attrgetter from typing import Any, Dict, List, Optional, Tuple @@ -596,3 +597,22 @@ def logdiff(diff: DeepDiff) -> None: """Log the offer diff if it exists.""" if diff: log.warning("Found the following offer diff before publishing:\n%s", diff.pretty()) + + +def check_for_conflict(job_details: ConfigureStatus) -> bool: + """Check if the job details contain a conflict error.""" + err_lookup = r"The submission cannot be pushed to \w+ as its not the latest .*" + for error in job_details.errors: + if error.code == "conflict" and re.match(err_lookup, error.message): + return True + return False + + +def check_for_running_submission(job_details: ConfigureStatus) -> bool: + """Check if the job details contain a running submission error.""" + err_lookup = r"An In Progress submission [0-9]+ already exists." + for error in job_details.errors: + err_msg = error.message + if error.code == "internalServerError" and re.match(err_lookup, err_msg): + return True + return False From 9ffe3ea1e2f6ff7c2deb0c219968f5ba98828d41 Mon Sep 17 00:00:00 2001 From: Jonathan Gangi Date: Wed, 25 Jun 2025 18:17:52 -0300 Subject: [PATCH 3/3] Add/Update unit tests for Azure This commit updates and creates new tests to make sure the new implemented exception handlers for retrying on in progress submission/conflict are properly working. 
Assisted-by: Cursor Signed-off-by: Jonathan Gangi --- tests/ms_azure/conftest.py | 54 +++++++++++++++++++++++- tests/ms_azure/test_service.py | 56 ++++++++++++++++++++----- tests/ms_azure/test_utils.py | 75 ++++++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 11 deletions(-) diff --git a/tests/ms_azure/conftest.py b/tests/ms_azure/conftest.py index ec58c89..eac1363 100644 --- a/tests/ms_azure/conftest.py +++ b/tests/ms_azure/conftest.py @@ -91,12 +91,49 @@ def job_details_completed_failure(errors: List[Dict[str, Any]]) -> Dict[str, Any return job_details(status="completed", result="failed", errors=errors) +@pytest.fixture +def job_details_completed_conflict(errors_conflict: List[Dict[str, Any]]) -> Dict[str, Any]: + return job_details(status="completed", result="failed", errors=errors_conflict) + + +@pytest.fixture +def job_details_completed_running_submission( + errors_running_submission: List[Dict[str, Any]], +) -> Dict[str, Any]: + return job_details(status="completed", result="failed", errors=errors_running_submission) + + @pytest.fixture def errors() -> List[Dict[str, Any]]: return [ { + "resourceId": "resource-id", + "code": "internalServerError", + "message": "A catastrophic error occurred.", + "details": [{"code": "invalidResource", "message": "Failure for resource"}], + } + ] + + +@pytest.fixture +def errors_running_submission() -> List[Dict[str, Any]]: + return [ + { + "resourceId": "resource-id", + "code": "internalServerError", + "message": "An In Progress submission 1234567890 already exists.", + "details": [{"code": "invalidResource", "message": "Failure for resource"}], + } + ] + + +@pytest.fixture +def errors_conflict() -> List[Dict[str, Any]]: + return [ + { + "resourceId": "resource-id", "code": "conflict", - "message": "Error message", + "message": "The submission cannot be pushed to live as its not the latest submission.", "details": [{"code": "invalidResource", "message": "Failure for resource"}], } ] @@ -132,6 +169,7 @@ def 
configure_success_response() -> Dict[str, Any]: def configure_failure_response() -> Dict[str, Any]: return { "error": { + "resourceId": "resource-id", "code": "badRequest", "message": "Invalid configuration: schema validation failed", "details": [ @@ -617,3 +655,17 @@ def job_details_completed_failure_obj( job_details_completed_failure: Dict[str, Any], ) -> ConfigureStatus: return ConfigureStatus.from_json(job_details_completed_failure) + + +@pytest.fixture +def job_details_completed_conflict_obj( + job_details_completed_conflict: Dict[str, Any], +) -> ConfigureStatus: + return ConfigureStatus.from_json(job_details_completed_conflict) + + +@pytest.fixture +def job_details_completed_running_submission_obj( + job_details_completed_running_submission: Dict[str, Any], +) -> ConfigureStatus: + return ConfigureStatus.from_json(job_details_completed_running_submission) diff --git a/tests/ms_azure/test_service.py b/tests/ms_azure/test_service.py index ef8f4fa..6b1325b 100644 --- a/tests/ms_azure/test_service.py +++ b/tests/ms_azure/test_service.py @@ -1,7 +1,7 @@ import json import logging from copy import deepcopy -from typing import Any, Dict, List +from typing import Any, Dict from unittest import mock import pytest @@ -13,7 +13,7 @@ from tenacity.stop import stop_after_attempt from cloudpub.common import BaseService -from cloudpub.error import InvalidStateError, NotFoundError +from cloudpub.error import ConflictError, InvalidStateError, NotFoundError, RunningSubmissionError from cloudpub.models.ms_azure import ( ConfigureStatus, CustomerLeads, @@ -214,6 +214,14 @@ def test_wait_for_job_completion_successful_completion( assert f"Job {job_id} failed" not in caplog.text assert f"Job {job_id} succeeded" in caplog.text + @pytest.mark.parametrize( + "error_name", + [ + "job_details_completed_failure_obj", + "job_details_completed_conflict_obj", + "job_details_completed_running_submission_obj", + ], + ) @mock.patch("cloudpub.ms_azure.utils.is_azure_job_not_complete") 
@mock.patch("cloudpub.ms_azure.AzureService._query_job_details") def test_get_job_details_after_failed_completion( @@ -224,20 +232,26 @@ def test_get_job_details_after_failed_completion( caplog: LogCaptureFixture, job_details_running_obj: ConfigureStatus, job_details_completed_failure_obj: ConfigureStatus, - errors: List[Dict[str, Any]], + job_details_completed_conflict_obj: ConfigureStatus, + job_details_completed_running_submission_obj: ConfigureStatus, + error_name: str, + request: pytest.FixtureRequest, ) -> None: + mock_job_details.side_effect = [ job_details_running_obj, job_details_running_obj, job_details_running_obj, - job_details_completed_failure_obj, + request.getfixturevalue(error_name), job_details_running_obj, ] azure_service._wait_for_job_completion.retry.sleep = mock.Mock() # type: ignore job_id = "job_id_111" with caplog.at_level(logging.ERROR): - with pytest.raises(InvalidStateError) as e_info: + with pytest.raises( + (InvalidStateError, RunningSubmissionError, ConflictError) + ) as e_info: azure_service._wait_for_job_completion(job_id=job_id) assert f"Job {job_id} failed: \n" in str(e_info.value) assert mock_job_details.call_count == 4 @@ -861,7 +875,7 @@ def test_publish_preview_success_on_retry( None, mock.MagicMock(), # Success on 3rd call ] - # Remove the retry sleep + # Remove the retry sleep azure_service._publish_preview.retry.sleep = mock.Mock() # type: ignore # Test @@ -891,7 +905,18 @@ def test_publish_preview_fail_on_retry( "jobId": "1", "jobStatus": "completed", "jobResult": "failed", - "errors": ["failure1", "failure2"], + "errors": [ + { + "resourceId": "resource-id", + "code": "internalServerError", + "message": "failure1", + }, + { + "resourceId": "resource-id", + "code": "internalServerError", + "message": "failure2", + }, + ], } ) mock_is_sbpreview.return_value = False @@ -901,7 +926,7 @@ azure_service._publish_preview.retry.sleep = mock.Mock() # type: 
ignore expected_err = ( f"Failed to submit the product {product_obj.id} to preview. " - "Status: failed Errors: failure1\nfailure2" + "Status: failed Errors: internalServerError: failure1\ninternalServerError: failure2" ) # Test @@ -952,7 +977,18 @@ def test_publish_live_fail_on_retry( "jobId": "1", "jobStatus": "completed", "jobResult": "failed", - "errors": ["failure1", "failure2"], + "errors": [ + { + "resourceId": "resource-id", + "code": "internalServerError", + "message": "failure1", + }, + { + "resourceId": "resource-id", + "code": "internalServerError", + "message": "failure2", + }, + ], } ) mock_subst.side_effect = [err_resp for _ in range(3)] @@ -961,7 +997,7 @@ def test_publish_live_fail_on_retry( azure_service._publish_live.retry.sleep = mock.Mock() # type: ignore expected_err = ( f"Failed to submit the product {product_obj.id} to live. " - "Status: failed Errors: failure1\nfailure2" + "Status: failed Errors: internalServerError: failure1\ninternalServerError: failure2" ) # Test diff --git a/tests/ms_azure/test_utils.py b/tests/ms_azure/test_utils.py index 2ece8e3..90f5732 100644 --- a/tests/ms_azure/test_utils.py +++ b/tests/ms_azure/test_utils.py @@ -15,6 +15,8 @@ ) from cloudpub.ms_azure.utils import ( AzurePublishingMetadata, + check_for_conflict, + check_for_running_submission, create_disk_version_from_scratch, get_image_type_mapping, is_azure_job_not_complete, @@ -580,3 +582,76 @@ def test_create_disk_version_from_scratch_arm64( res.vm_images = sorted(res.vm_images, key=attrgetter("image_type")) assert res == disk_version_arm64_obj + + @pytest.mark.parametrize( + "error,expected_result", + [ + ( + { + "code": "conflict", + "message": "The submission cannot be pushed to live as its not the latest submission.", # noqa: E501 + }, + True, + ), + ( + { + "code": "internalServerError", + "message": "An In Progress submission 1234567890 already exists.", + }, + False, + ), + ( + { + "code": "unknownError", + "message": "An In Progress submission 
1234567890 already exists.", + }, + False, + ), + ({"code": "conflict", "message": "Something else happened"}, False), + ], + ) + def test_check_for_conflict(self, error: Dict[str, Any], expected_result: bool) -> None: + err_obj = ConfigureStatus.from_json( + { + "jobId": "1", + "jobStatus": "completed", + "jobResult": "failed", + "errors": [error], + } + ) + assert err_obj + assert check_for_conflict(err_obj) == expected_result + + @pytest.mark.parametrize( + "error,expected_result", + [ + ( + { + "code": "internalServerError", + "message": "An In Progress submission 1234567890 already exists.", + }, + True, + ), + ( + { + "code": "unknownError", + "message": "An In Progress submission 1234567890 already exists.", + }, + False, + ), + ({"code": "conflict", "message": "Something else happened"}, False), + ], + ) + def test_check_for_running_submission( + self, error: Dict[str, Any], expected_result: bool + ) -> None: + err_obj = ConfigureStatus.from_json( + { + "jobId": "1", + "jobStatus": "completed", + "jobResult": "failed", + "errors": [error], + } + ) + assert err_obj + assert check_for_running_submission(err_obj) == expected_result