Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloudpub/error.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ class NotFoundError(ValueError):
"""Represent a missing resource."""


class ConflictError(RuntimeError):
    """Raised when a submission conflict is detected."""


class Timeout(Exception):
    """Represent an operation that did not complete within the allowed time."""
113 changes: 79 additions & 34 deletions cloudpub/ms_azure/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

from deepdiff import DeepDiff
from requests import HTTPError
from tenacity import retry
from tenacity.retry import retry_if_result
from tenacity import RetryError, Retrying, retry
from tenacity.retry import retry_if_exception_type, retry_if_result
from tenacity.stop import stop_after_attempt, stop_after_delay
from tenacity.wait import wait_chain, wait_fixed
from tenacity.wait import wait_fixed

from cloudpub.common import BaseService
from cloudpub.error import InvalidStateError, NotFoundError
from cloudpub.error import ConflictError, InvalidStateError, NotFoundError, Timeout
from cloudpub.models.ms_azure import (
RESOURCE_MAPING,
AzureResource,
Expand Down Expand Up @@ -82,18 +82,31 @@ class AzureService(BaseService[AzurePublishingMetadata]):
CONFIGURE_SCHEMA = "https://schema.mp.microsoft.com/schema/configure/{AZURE_API_VERSION}"
DIFF_EXCLUDES = [r"root\['resources'\]\[[0-9]+\]\['url'\]"]

def __init__(self, credentials: Dict[str, str]):
def __init__(
    self,
    credentials: Dict[str, str],
    retry_interval: Union[int, float] = 300,
    retry_timeout: Union[int, float] = 3600 * 24 * 7,
):
    """
    Create a new AzureService object.

    Args:
        credentials (dict)
            Dictionary with Azure credentials to authenticate on Product Ingestion API.
        retry_interval (int, float)
            Seconds to wait between two retry attempts.
            Defaults to 300.
        retry_timeout (int, float)
            Maximum number of seconds to keep retrying before giving up.
            Defaults to 7 days (3600 * 24 * 7).
    """
    # Retry settings are kept on the instance so the waiting loops can read them.
    self.retry_interval = retry_interval
    self.retry_timeout = retry_timeout
    self._products: List[ProductSummary] = []
    schema_version = self.AZURE_SCHEMA_VERSION
    self.session = PartnerPortalSession.make_graph_api_session(
        auth_keys=credentials, schema_version=schema_version
    )

def _configure(self, data: Dict[str, Any]) -> ConfigureStatus:
"""
Expand Down Expand Up @@ -152,15 +165,27 @@ def _query_job_details(self, job_id: str) -> ConfigureStatus:
log.debug("Query Job details response: %s", parsed_resp)
return parsed_resp

@retry(
retry=retry_if_result(predicate=is_azure_job_not_complete),
wait=wait_chain(
*[wait_fixed(wait=60)] # First wait for 1 minute # noqa: W503
+ [wait_fixed(wait=10 * 60)] # Then wait for 10 minutes # noqa: W503
+ [wait_fixed(wait=30 * 60)] # Finally wait each 30 minutes # noqa: W503
),
stop=stop_after_delay(max_delay=60 * 60 * 24 * 7), # Give up after retrying for 7 days
)
def query_job_status(self, job_id: str) -> ConfigureStatus:
    """Fetch the details of a job and report a failed job as an error.

    Args:
        job_id (str): The job ID to query details from.

    Returns:
        ConfigureStatus: The details returned by ``_query_job_details`` for the job.

    Raises:
        InvalidStateError: Handed to ``_raise_error`` when the job result is "failed".
    """
    details = self._query_job_details(job_id=job_id)
    outcome = details.job_result

    if outcome == "succeeded":
        log.debug("Job %s succeeded", job_id)
    elif outcome == "failed":
        self._raise_error(InvalidStateError, f"Job {job_id} failed: \n{details.errors}")

    # Any other result is returned untouched so the caller can decide what to do.
    return details

def _wait_for_job_completion(self, job_id: str) -> ConfigureStatus:
"""
Wait until the specified job ID is complete.
Expand All @@ -179,13 +204,15 @@ def _wait_for_job_completion(self, job_id: str) -> ConfigureStatus:
Raises:
InvalidStateError if the job failed
"""
job_details = self._query_job_details(job_id=job_id)
if job_details.job_result == "failed":
error_message = f"Job {job_id} failed: \n{job_details.errors}"
self._raise_error(InvalidStateError, error_message)
elif job_details.job_result == "succeeded":
log.debug("Job %s succeeded", job_id)
return job_details
r = Retrying(
retry=retry_if_result(predicate=is_azure_job_not_complete),
wait=wait_fixed(self.retry_interval),
stop=stop_after_delay(max_delay=self.retry_timeout),
)
try:
return r(self.query_job_status, job_id)
except RetryError:
self._raise_error(Timeout, f"Time out waiting for job {job_id}")

def configure(self, resources: List[AzureResource]) -> ConfigureStatus:
"""
Expand Down Expand Up @@ -467,31 +494,48 @@ def submit_to_status(
log.debug("Set the status \"%s\" to submission.", status)
return self.configure(resources=cfg_res)

@retry(
wait=wait_fixed(300),
stop=stop_after_delay(max_delay=60 * 60 * 24 * 7), # Give up after retrying for 7 days,
reraise=True,
)
def ensure_can_publish(self, product_id: str) -> None:
"""
Ensure the offer is not already being published.

It will wait for up to 7 days retrying to make sure it's possible to publish before
giving up and raising.
It will raise ConflictError if a publish is already in progress in any submission target.

Args:
product_id (str)
The product ID to check the offer's publishing status
Raises:
RuntimeError: whenever a publishing is already in progress.
ConflictError: whenever a publishing is already in progress for any submission target.
"""
log.info("Ensuring no other publishing jobs are in progress for \"%s\"", product_id)
submission_targets = ["preview", "live"]

for target in submission_targets:
sub = self.get_submission_state(product_id, state=target)
if sub and sub.status and sub.status == "running":
raise RuntimeError(f"The offer {product_id} is already being published to {target}")
for sub in self.get_submissions(product_id):
if sub and sub.status and sub.status != "completed":
msg = (
f"The offer {product_id} is already being published to "
f"{sub.target.targetType}: {sub.status}/{sub.result}"
)
log.error(msg)
raise ConflictError(msg)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we create a "wait method" similar to the one we have for AWS (AWSProductService.wait_active_changesets)?
*

def wait_active_changesets(self, entity_id: str) -> None:
    """
    Get the first active changeset, if there is one, and wait for it to finish.

    The active changesets are fetched again after each wait, up to
    ``self.wait_for_changeset_attempts`` attempts.

    Args:
        entity_id (str)
            The Id of the entity to wait for active changesets
    Raises:
        Timeout: handed to ``_raise_error`` when the attempts run out while
            changesets are still being returned.
    """
    # Retry predicate: waits on the first listed changeset and returns True so the
    # list is fetched again; an empty list returns False and ends the retry loop.
    def changeset_not_complete(change_set_list: List[ListChangeSet]) -> bool:
        if change_set_list:
            self.wait_for_changeset(change_set_list[0].id)
            return True
        else:
            return False
    r = Retrying(
        stop=stop_after_attempt(self.wait_for_changeset_attempts),
        retry=retry_if_result(changeset_not_complete),
    )
    try:
        r(self.get_product_active_changesets, entity_id)
    except RetryError:
        # The attempts were exhausted while the predicate kept asking for a retry.
        self._raise_error(Timeout, f"Timed out waiting for {entity_id} to be unlocked")

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, yes, but I fear we would end up with errors like "The submission cannot be pushed to as its not the latest", because the changes we're attempting to publish would no longer be the latest, or other weird errors like "attempting to remove an image which is already published", because we would be missing the version that was added in parallel. That is why I thought of redoing the whole operation by retrying in pubtools.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking: what I can do instead is wait for it before the whole "main" operation starts, i.e. the first step would be to wait, and only then would we do everything else. Do you think that would be better? I can open a separate PR for that as well 🙂

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking of opening a different PR, but I believe I can reuse this one with a commit on top of it. I had a different idea on how to take advantage of the current implementation to implement the "wait" feature.

I'll patch it soon

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Patched.


def wait_active_publishing(self, product_id: str) -> None:
    """
    Wait while there is an existing submission in progress.

    ``ensure_can_publish`` is retried every ``self.retry_interval`` seconds for as
    long as it raises ``ConflictError``, until ``self.retry_timeout`` seconds elapse.

    Args:
        product_id (str)
            The product ID whose submissions state is verified.
    Raises:
        Timeout: handed to ``_raise_error`` when the retries are exhausted.
    """
    retryer = Retrying(
        stop=stop_after_delay(max_delay=self.retry_timeout),
        wait=wait_fixed(self.retry_interval),
        retry=retry_if_exception_type(ConflictError),
    )
    log.info("Checking for active changes on %s.", product_id)

    try:
        retryer(self.ensure_can_publish, product_id)
    except RetryError:
        # The retry budget ran out while the product was still locked.
        self._raise_error(Timeout, f"Timed out waiting for {product_id} to be unlocked")

def get_plan_tech_config(self, product: Product, plan: PlanSummary) -> VMIPlanTechConfig:
"""
Expand Down Expand Up @@ -818,6 +862,7 @@ def publish(self, metadata: AzurePublishingMetadata) -> None:
plan_name = metadata.destination.split("/")[-1]
product_id = self.get_productid(product_name)
disk_version = None
self.wait_active_publishing(product_id=product_id)
log.info(
"Preparing to associate the image \"%s\" with the plan \"%s\" from product \"%s\"",
metadata.image_path,
Expand Down
2 changes: 1 addition & 1 deletion tests/ms_azure/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def auth_dict() -> Dict[str, str]:
@mock.patch("cloudpub.ms_azure.service.PartnerPortalSession")
def azure_service(auth_dict: Dict[str, str]) -> AzureService:
"""Return an instance of AzureService with mocked PartnerPortalSession."""
return AzureService(auth_dict)
return AzureService(auth_dict, retry_interval=0, retry_timeout=10)


def job_details(status: str, result: str, errors: List[Dict[str, Any]]) -> Dict[str, Any]:
Expand Down
Loading