diff --git a/mapillary_tools/api_v4.py b/mapillary_tools/api_v4.py index 40363daf..22204e1d 100644 --- a/mapillary_tools/api_v4.py +++ b/mapillary_tools/api_v4.py @@ -3,12 +3,11 @@ import enum import logging import os -import ssl import typing as T -from json import dumps import requests -from requests.adapters import HTTPAdapter + +from . import http LOG = logging.getLogger(__name__) MAPILLARY_CLIENT_TOKEN = os.getenv( @@ -17,8 +16,7 @@ MAPILLARY_GRAPH_API_ENDPOINT = os.getenv( "MAPILLARY_GRAPH_API_ENDPOINT", "https://graph.mapillary.com" ) -REQUESTS_TIMEOUT = 60 # 1 minutes -USE_SYSTEM_CERTS: bool = False +REQUESTS_TIMEOUT: float = 60 # 1 minute class HTTPContentError(Exception): @@ -39,236 +37,19 @@ class ClusterFileType(enum.Enum): MLY_BUNDLE_MANIFEST = "mly_bundle_manifest" -class HTTPSystemCertsAdapter(HTTPAdapter): - """ - This adapter uses the system's certificate store instead of the certifi module. - - The implementation is based on the project https://pypi.org/project/pip-system-certs/, - which has a system-wide effect. - """ - - def init_poolmanager(self, *args, **kwargs): - ssl_context = ssl.create_default_context() - ssl_context.load_default_certs() - kwargs["ssl_context"] = ssl_context - - super().init_poolmanager(*args, **kwargs) - - def cert_verify(self, *args, **kwargs): - super().cert_verify(*args, **kwargs) - - # By default Python requests uses the ca_certs from the certifi module - # But we want to use the certificate store instead. - # By clearing the ca_certs variable we force it to fall back on that behaviour (handled in urllib3) - if "conn" in kwargs: - conn = kwargs["conn"] - else: - conn = args[0] - - conn.ca_certs = None - - -@T.overload -def _truncate(s: bytes, limit: int = 256) -> bytes | str: ... - - -@T.overload -def _truncate(s: str, limit: int = 256) -> str: ...
- - -def _truncate(s, limit=256): - if limit < len(s): - if isinstance(s, bytes): - try: - s = s.decode("utf-8") - except UnicodeDecodeError: - pass - remaining = len(s) - limit - if isinstance(s, bytes): - return s[:limit] + f"...({remaining} bytes truncated)".encode("utf-8") - else: - return str(s[:limit]) + f"...({remaining} chars truncated)" - else: - return s - - -def _sanitize(headers: T.Mapping[T.Any, T.Any]) -> T.Mapping[T.Any, T.Any]: - new_headers = {} +def create_user_session(user_access_token: str) -> requests.Session: + session = http.Session() + session.headers["Authorization"] = f"OAuth {user_access_token}" + return session - for k, v in headers.items(): - if k.lower() in [ - "authorization", - "cookie", - "x-fb-access-token", - "access-token", - "access_token", - "password", - "user_upload_token", - ]: - new_headers[k] = "[REDACTED]" - else: - if isinstance(v, (str, bytes)): - new_headers[k] = T.cast(T.Any, _truncate(v)) - else: - new_headers[k] = v - return new_headers - - -def _log_debug_request( - method: str, - url: str, - json: dict | None = None, - params: dict | None = None, - headers: dict | None = None, - timeout: T.Any = None, -): - if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: - return - - msg = f"HTTP {method} {url}" - - if USE_SYSTEM_CERTS: - msg += " (w/sys_certs)" - - if json: - t = _truncate(dumps(_sanitize(json))) - msg += f" JSON={t}" - - if params: - msg += f" PARAMS={_sanitize(params)}" - - if headers: - msg += f" HEADERS={_sanitize(headers)}" - - if timeout is not None: - msg += f" TIMEOUT={timeout}" - - msg = msg.replace("\n", "\\n") - - LOG.debug(msg) - - -def _log_debug_response(resp: requests.Response): - if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: - return - - elapsed = resp.elapsed.total_seconds() * 1000 # Convert to milliseconds - msg = f"HTTP {resp.status_code} {resp.reason} ({elapsed:.0f} ms): {str(_truncate_response_content(resp))}" - - LOG.debug(msg) - - -def _truncate_response_content(resp: requests.Response) -> str | bytes: - try: - json_data = resp.json() - except requests.JSONDecodeError: - if resp.content is not None: - data = _truncate(resp.content) - else: - data = "" - else: - if isinstance(json_data, dict): - data = _truncate(dumps(_sanitize(json_data))) - else: - data = _truncate(str(json_data)) - - if isinstance(data, bytes): - return data.replace(b"\n", b"\\n") - - elif isinstance(data, str): - return data.replace("\n", "\\n") - - return data - - -def readable_http_error(ex: requests.HTTPError) -> str: - return readable_http_response(ex.response) - - -def readable_http_response(resp: requests.Response) -> str: - return f"{resp.request.method} {resp.url} => {resp.status_code} {resp.reason}: {str(_truncate_response_content(resp))}" - - -def request_post( - url: str, - data: T.Any | None = None, - json: dict | None = None, - disable_debug=False, - **kwargs, -) -> requests.Response: - global USE_SYSTEM_CERTS - - if not disable_debug: - _log_debug_request( - "POST", - url, - json=json, - params=kwargs.get("params"), - headers=kwargs.get("headers"), - timeout=kwargs.get("timeout"), - ) - - if USE_SYSTEM_CERTS: - with requests.Session() as session: - session.mount("https://", HTTPSystemCertsAdapter()) - resp = session.post(url, data=data, json=json, **kwargs) - - else: - try: - resp = requests.post(url, data=data, json=json, **kwargs) - except requests.exceptions.SSLError as ex: - if "SSLCertVerificationError" not in str(ex): - raise ex - USE_SYSTEM_CERTS = True - # 
HTTPSConnectionPool(host='graph.mapillary.com', port=443): Max retries exceeded with url: /login (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1018)'))) - LOG.warning( - "SSL error occurred, falling back to system SSL certificates: %s", ex - ) - return request_post(url, data=data, json=json, **kwargs) - - if not disable_debug: - _log_debug_response(resp) - - return resp - - -def request_get( - url: str, params: dict | None = None, disable_debug=False, **kwargs -) -> requests.Response: - global USE_SYSTEM_CERTS - - if not disable_debug: - _log_debug_request( - "GET", - url, - params=kwargs.get("params"), - headers=kwargs.get("headers"), - # Do not log timeout here as it's always set to REQUESTS_TIMEOUT - timeout=None, - ) - - if USE_SYSTEM_CERTS: - with requests.Session() as session: - session.mount("https://", HTTPSystemCertsAdapter()) - resp = session.get(url, params=params, **kwargs) - else: - try: - resp = requests.get(url, params=params, **kwargs) - except requests.exceptions.SSLError as ex: - if "SSLCertVerificationError" not in str(ex): - raise ex - USE_SYSTEM_CERTS = True - # HTTPSConnectionPool(host='graph.mapillary.com', port=443): Max retries exceeded with url: /login (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1018)'))) - LOG.warning( - "SSL error occurred, falling back to system SSL certificates: %s", ex - ) - resp = request_get(url, params=params, **kwargs) - - if not disable_debug: - _log_debug_response(resp) - - return resp +def create_client_session(disable_logging: bool = False) -> requests.Session: + session = http.Session() + session.headers["Authorization"] = f"OAuth {MAPILLARY_CLIENT_TOKEN}" + if disable_logging: + session.disable_logging_request = True + session.disable_logging_response = True + return session def is_auth_error(resp: requests.Response) -> bool: @@ -309,54 +90,42 @@ def extract_auth_error_message(resp: requests.Response) -> str: return resp.text -def get_upload_token(email: str, password: str) -> requests.Response: - resp = request_post( - f"{MAPILLARY_GRAPH_API_ENDPOINT}/login", - headers={"Authorization": f"OAuth {MAPILLARY_CLIENT_TOKEN}"}, - json={"email": email, "password": password, "locale": "en_US"}, - timeout=REQUESTS_TIMEOUT, - ) +def get_upload_token( + client_session: requests.Session, email: str, password: str +) -> requests.Response: + url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/login" + json_data = {"email": email, "password": password, "locale": "en_US"} + + resp = client_session.post(url, json=json_data, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() + return resp def fetch_organization( - user_access_token: str, organization_id: int | str + user_session: requests.Session, organization_id: int | str ) -> requests.Response: - resp = request_get( - f"{MAPILLARY_GRAPH_API_ENDPOINT}/{organization_id}", - params={ - "fields": ",".join(["slug", "description", "name"]), - }, - headers={ - "Authorization": f"OAuth {user_access_token}", - }, - timeout=REQUESTS_TIMEOUT, - ) + url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/{organization_id}" + params = {"fields": ",".join(["slug", "description", "name"])} + + resp = user_session.get(url, params=params, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() + return resp def fetch_user_or_me( - user_access_token: str, user_id: int | str | None = None + user_session: requests.Session, user_id: 
int | str | None = None ) -> requests.Response: if user_id is None: url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/me" else: url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/{user_id}" + params = {"fields": ",".join(["id", "username"])} - resp = request_get( - url, - params={ - "fields": ",".join(["id", "username"]), - }, - headers={ - "Authorization": f"OAuth {user_access_token}", - }, - timeout=REQUESTS_TIMEOUT, - ) - + resp = user_session.get(url, params=params, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() + return resp @@ -365,42 +134,33 @@ def fetch_user_or_me( ] -def log_event(action_type: ActionType, properties: dict) -> requests.Response: - resp = request_post( - f"{MAPILLARY_GRAPH_API_ENDPOINT}/logging", - json={"action_type": action_type, "properties": properties}, - headers={ - "Authorization": f"OAuth {MAPILLARY_CLIENT_TOKEN}", - }, - timeout=REQUESTS_TIMEOUT, - disable_debug=True, - ) +def log_event( + client_session: requests.Session, action_type: ActionType, properties: dict +) -> requests.Response: + url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/logging" + json_data = {"action_type": action_type, "properties": properties} + + resp = client_session.post(url, json=json_data, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() + return resp def finish_upload( - user_access_token: str, + user_session: requests.Session, file_handle: str, cluster_filetype: ClusterFileType, organization_id: int | str | None = None, ) -> requests.Response: - data: dict[str, str | int] = { + url = f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload" + json_data: dict[str, str | int] = { "file_handle": file_handle, "file_type": cluster_filetype.value, } if organization_id is not None: - data["organization_id"] = organization_id - - resp = request_post( - f"{MAPILLARY_GRAPH_API_ENDPOINT}/finish_upload", - headers={ - "Authorization": f"OAuth {user_access_token}", - }, - json=data, - timeout=REQUESTS_TIMEOUT, - ) + json_data["organization_id"] = organization_id + resp = user_session.post(url, json=json_data, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() return resp diff --git a/mapillary_tools/authenticate.py b/mapillary_tools/authenticate.py index c2f9b8f6..9e5217d7 100644 --- a/mapillary_tools/authenticate.py +++ b/mapillary_tools/authenticate.py @@ -10,7 +10,7 @@ import requests -from . import api_v4, config, constants, exceptions +from . 
import api_v4, config, constants, exceptions, http LOG = logging.getLogger(__name__) @@ -77,11 +77,11 @@ def authenticate( # TODO: print more user information if profile_name in all_user_items: LOG.info( - 'Profile "%s" updated: %s', profile_name, api_v4._sanitize(user_items) + 'Profile "%s" updated: %s', profile_name, http._sanitize(user_items) ) else: LOG.info( - 'Profile "%s" created: %s', profile_name, api_v4._sanitize(user_items) + 'Profile "%s" created: %s', profile_name, http._sanitize(user_items) ) @@ -134,9 +134,8 @@ def fetch_user_items( ) if organization_key is not None: - resp = api_v4.fetch_organization( - user_items["user_upload_token"], organization_key - ) + with api_v4.create_user_session(user_items["user_upload_token"]) as session: + resp = api_v4.fetch_organization(session, organization_key) data = api_v4.jsonify_response(resp) LOG.info( f"Uploading to organization: {data.get('name')} (ID: {data.get('id')})" @@ -173,16 +172,15 @@ def _verify_user_auth(user_items: config.UserItem) -> config.UserItem: if constants._AUTH_VERIFICATION_DISABLED: return user_items - try: - resp = api_v4.fetch_user_or_me( - user_access_token=user_items["user_upload_token"] - ) - except requests.HTTPError as ex: - if api_v4.is_auth_error(ex.response): - message = api_v4.extract_auth_error_message(ex.response) - raise exceptions.MapillaryUploadUnauthorizedError(message) - else: - raise ex + with api_v4.create_user_session(user_items["user_upload_token"]) as session: + try: + resp = api_v4.fetch_user_or_me(session) + except requests.HTTPError as ex: + if api_v4.is_auth_error(ex.response): + message = api_v4.extract_auth_error_message(ex.response) + raise exceptions.MapillaryUploadUnauthorizedError(message) + else: + raise ex data = api_v4.jsonify_response(resp) @@ -276,16 +274,17 @@ def _prompt_login( if user_password: break - try: - resp = api_v4.get_upload_token(user_email, user_password) - except requests.HTTPError as ex: - if not _enabled: - raise ex + with api_v4.create_client_session() as session: + try: + resp = api_v4.get_upload_token(session, user_email, user_password) + except requests.HTTPError as ex: + if not _enabled: + raise ex - if _is_login_retryable(ex): - return _prompt_login() + if _is_login_retryable(ex): + return _prompt_login() - raise ex + raise ex data = api_v4.jsonify_response(resp) diff --git a/mapillary_tools/commands/upload.py b/mapillary_tools/commands/upload.py index 63675345..b4cbe1b2 100644 --- a/mapillary_tools/commands/upload.py +++ b/mapillary_tools/commands/upload.py @@ -23,6 +23,13 @@ def add_common_upload_options(group): default=None, required=False, ) + group.add_argument( + "--num_upload_workers", + help="Number of concurrent upload workers for uploading images. 
[default: %(default)s]", + default=constants.MAX_IMAGE_UPLOAD_WORKERS, + type=int, + required=False, + ) group.add_argument( "--reupload", help="Re-upload data that has already been uploaded.", diff --git a/mapillary_tools/constants.py b/mapillary_tools/constants.py index 1deee80c..15372d75 100644 --- a/mapillary_tools/constants.py +++ b/mapillary_tools/constants.py @@ -154,16 +154,18 @@ def _parse_scaled_integers( # The minimal upload speed is used to calculate the read timeout to avoid upload hanging: # timeout = upload_size / MIN_UPLOAD_SPEED MIN_UPLOAD_SPEED: int | None = _parse_filesize( - os.getenv(_ENV_PREFIX + "MIN_UPLOAD_SPEED", "50K") # 50 KiB/s + os.getenv(_ENV_PREFIX + "MIN_UPLOAD_SPEED", "50K") # 50 Kb/s ) +# Maximum number of parallel workers for uploading images within a single sequence. +# NOTE: Sequences themselves are uploaded sequentially, not in parallel. MAX_IMAGE_UPLOAD_WORKERS: int = int( - os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 64) + os.getenv(_ENV_PREFIX + "MAX_IMAGE_UPLOAD_WORKERS", 4) ) # The chunk size in MB (see chunked transfer encoding https://en.wikipedia.org/wiki/Chunked_transfer_encoding) # for uploading data to MLY upload service. # Changing this size does not change the number of requests nor affect upload performance, # but it affects the responsiveness of the upload progress bar -UPLOAD_CHUNK_SIZE_MB: float = float(os.getenv(_ENV_PREFIX + "UPLOAD_CHUNK_SIZE_MB", 1)) +UPLOAD_CHUNK_SIZE_MB: float = float(os.getenv(_ENV_PREFIX + "UPLOAD_CHUNK_SIZE_MB", 2)) MAX_UPLOAD_RETRIES: int = int(os.getenv(_ENV_PREFIX + "MAX_UPLOAD_RETRIES", 200)) MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN: bool = _yes_or_no( os.getenv("MAPILLARY__ENABLE_UPLOAD_HISTORY_FOR_DRY_RUN", "NO") diff --git a/mapillary_tools/http.py b/mapillary_tools/http.py new file mode 100644 index 00000000..3f072120 --- /dev/null +++ b/mapillary_tools/http.py @@ -0,0 +1,211 @@ +from __future__ import annotations + +import logging + +import ssl +import sys +import typing as T +from json import dumps + +if sys.version_info >= (3, 12): + from typing import override +else: + from typing_extensions import override + +import requests +from requests.adapters import HTTPAdapter + + +LOG = logging.getLogger(__name__) + + +class HTTPSystemCertsAdapter(HTTPAdapter): + """ + This adapter uses the system's certificate store instead of the certifi module. + + The implementation is based on the project https://pypi.org/project/pip-system-certs/, + which has a system-wide effect. + """ + + def init_poolmanager(self, *args, **kwargs): + ssl_context = ssl.create_default_context() + ssl_context.load_default_certs() + kwargs["ssl_context"] = ssl_context + + super().init_poolmanager(*args, **kwargs) + + def cert_verify(self, *args, **kwargs): + super().cert_verify(*args, **kwargs) + + # By default Python requests uses the ca_certs from the certifi module + # But we want to use the certificate store instead. 
+ # By clearing the ca_certs variable we force it to fall back on that behaviour (handled in urllib3) + if "conn" in kwargs: + conn = kwargs["conn"] + else: + conn = args[0] + + conn.ca_certs = None + + +class Session(requests.Session): + # NOTE: This is a global flag that affects all Session instances + USE_SYSTEM_CERTS: T.ClassVar[bool] = False + # Instance variables + disable_logging_request: bool = False + disable_logging_response: bool = False + # Avoid mounting twice + _mounted: bool = False + + @override + def request(self, method: str | bytes, url: str | bytes, *args, **kwargs): + self._log_debug_request(method, url, *args, **kwargs) + + if Session.USE_SYSTEM_CERTS: + if not self._mounted: + self.mount("https://", HTTPSystemCertsAdapter()) + self._mounted = True + resp = super().request(method, url, *args, **kwargs) + else: + try: + resp = super().request(method, url, *args, **kwargs) + except requests.exceptions.SSLError as ex: + if "SSLCertVerificationError" not in str(ex): + raise ex + Session.USE_SYSTEM_CERTS = True + # HTTPSConnectionPool(host='graph.mapillary.com', port=443): Max retries exceeded with url: /login (Caused by SSLError(SSLCertVerificationError(1, '[SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1018)'))) + LOG.warning( + "SSL error occurred, falling back to system SSL certificates: %s", + ex, + ) + return self.request(method, url, *args, **kwargs) + + self._log_debug_response(resp) + + return resp + + def _log_debug_request(self, method: str | bytes, url: str | bytes, **kwargs): + if self.disable_logging_request: + return + + if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: + return + + if isinstance(method, str) and isinstance(url, str): + msg = f"HTTP {method} {url}" + else: + msg = f"HTTP {method!r} {url!r}" + + if Session.USE_SYSTEM_CERTS: + msg += " (w/sys_certs)" + + json = kwargs.get("json") + if json is not None: + t = _truncate(dumps(_sanitize(json))) + msg += f" JSON={t}" + + params = kwargs.get("params") + if params is not None: + msg += f" PARAMS={_sanitize(params)}" + + headers = kwargs.get("headers") + if headers is not None: + msg += f" HEADERS={_sanitize(headers)}" + + timeout = kwargs.get("timeout") + if timeout is not None: + msg += f" TIMEOUT={timeout}" + + msg = msg.replace("\n", "\\n") + + LOG.debug(msg) + + def _log_debug_response(self, resp: requests.Response): + if self.disable_logging_response: + return + + if logging.getLogger().getEffectiveLevel() <= logging.DEBUG: + return + + elapsed = resp.elapsed.total_seconds() * 1000 # Convert to milliseconds + msg = f"HTTP {resp.status_code} {resp.reason} ({elapsed:.0f} ms): {str(_truncate_response_content(resp))}" + + LOG.debug(msg) + + +def readable_http_error(ex: requests.HTTPError) -> str: + return readable_http_response(ex.response) + + +def readable_http_response(resp: requests.Response) -> str: + return f"{resp.request.method} {resp.url} => {resp.status_code} {resp.reason}: {str(_truncate_response_content(resp))}" + + +@T.overload +def _truncate(s: bytes, limit: int = 256) -> bytes | str: ... + + +@T.overload +def _truncate(s: str, limit: int = 256) -> str: ... 
+ + +def _truncate(s, limit=256): + if limit < len(s): + if isinstance(s, bytes): + try: + s = s.decode("utf-8") + except UnicodeDecodeError: + pass + remaining = len(s) - limit + if isinstance(s, bytes): + return s[:limit] + f"...({remaining} bytes truncated)".encode("utf-8") + else: + return str(s[:limit]) + f"...({remaining} chars truncated)" + else: + return s + + +def _sanitize(headers: T.Mapping[T.Any, T.Any]) -> T.Mapping[T.Any, T.Any]: + new_headers = {} + + for k, v in headers.items(): + if k.lower() in [ + "authorization", + "cookie", + "x-fb-access-token", + "access-token", + "access_token", + "password", + "user_upload_token", + ]: + new_headers[k] = "[REDACTED]" + else: + if isinstance(v, (str, bytes)): + new_headers[k] = T.cast(T.Any, _truncate(v)) + else: + new_headers[k] = v + + return new_headers + + +def _truncate_response_content(resp: requests.Response) -> str | bytes: + try: + json_data = resp.json() + except requests.JSONDecodeError: + if resp.content is not None: + data = _truncate(resp.content) + else: + data = "" + else: + if isinstance(json_data, dict): + data = _truncate(dumps(_sanitize(json_data))) + else: + data = _truncate(str(json_data)) + + if isinstance(data, bytes): + return data.replace(b"\n", b"\\n") + + elif isinstance(data, str): + return data.replace("\n", "\\n") + + return data diff --git a/mapillary_tools/upload.py b/mapillary_tools/upload.py index 14aeb9ce..41f43101 100644 --- a/mapillary_tools/upload.py +++ b/mapillary_tools/upload.py @@ -20,6 +20,7 @@ constants, exceptions, history, + http, ipc, types, uploader, @@ -41,6 +42,7 @@ class UploadedAlready(uploader.SequenceError): def upload( import_path: Path | T.Sequence[Path], user_items: config.UserItem, + num_upload_workers: int, desc_path: str | None = None, _metadatas_from_process: T.Sequence[types.MetadataOrError] | None = None, reupload: bool = False, @@ -84,15 +86,18 @@ def upload( # Send the progress via IPC, and log the progress in debug mode _setup_ipc(emitter) - mly_uploader = uploader.Uploader( - uploader.UploadOptions( + try: + upload_options = uploader.UploadOptions( user_items, dry_run=dry_run, nofinish=nofinish, noresume=noresume, - ), - emitter=emitter, - ) + num_upload_workers=num_upload_workers, + ) + except ValueError as ex: + raise exceptions.MapillaryBadParameterError(str(ex)) from ex + + mly_uploader = uploader.Uploader(upload_options, emitter=emitter) results = _gen_upload_everything( mly_uploader, metadatas, import_paths, skip_subfolders @@ -160,10 +165,10 @@ def log_exception(ex: Exception) -> None: if isinstance(ex, UploadedAlready): LOG.info(f"{exc_name}: {ex}") elif isinstance(ex, requests.HTTPError): - LOG.error(f"{exc_name}: {api_v4.readable_http_error(ex)}", exc_info=exc_info) + LOG.error(f"{exc_name}: {http.readable_http_error(ex)}", exc_info=exc_info) elif isinstance(ex, api_v4.HTTPContentError): LOG.error( - f"{exc_name}: {ex}: {api_v4.readable_http_response(ex.response)}", + f"{exc_name}: {ex}: {http.readable_http_response(ex.response)}", exc_info=exc_info, ) else: @@ -202,31 +207,25 @@ def check_duplication(payload: uploader.Progress): record = history.read_history_record(md5sum) if record is not None: - sequence_uuid = payload.get("sequence_uuid") history_desc_path = history.history_desc_path(md5sum) uploaded_at = record.get("summary", {}).get("upload_end_time", None) - if sequence_uuid is None: - basename = os.path.basename(payload.get("import_path", "")) - name = f"file {basename}" - - else: - name = f"sequence {sequence_uuid}" + upload_name = 
uploader.Uploader._upload_name(payload) if reupload: if uploaded_at is not None: LOG.info( - f"Reuploading {name}: previously uploaded {humanize.naturaldelta(time.time() - uploaded_at)} ago ({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(uploaded_at))})" + f"Reuploading {upload_name}, despite being uploaded {humanize.naturaldelta(time.time() - uploaded_at)} ago ({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(uploaded_at))})" ) else: LOG.info( - f"Reuploading {name}: already uploaded, see {history_desc_path}" + f"Reuploading {upload_name}, despite already being uploaded (see {history_desc_path})" ) else: if uploaded_at is not None: - msg = f"Skipping {name}: previously uploaded {humanize.naturaldelta(time.time() - uploaded_at)} ago ({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(uploaded_at))})" + msg = f"Skipping {upload_name}, already uploaded {humanize.naturaldelta(time.time() - uploaded_at)} ago ({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(uploaded_at))})" else: - msg = f"Skipping {name}: already uploaded, see {history_desc_path}" + msg = f"Skipping {upload_name}, already uploaded (see {history_desc_path})" raise UploadedAlready(msg) @emitter.on("upload_finished") @@ -409,8 +408,8 @@ def collect_restart_time(payload: _APIStats) -> None: payload["offset"], payload.get("upload_first_offset", payload["offset"]) ) - @emitter.on("upload_interrupted") - def collect_interrupted(payload: _APIStats): + @emitter.on("upload_retrying") + def collect_retrying(payload: _APIStats): # could be None if it failed to fetch offset restart_time = payload.get("upload_last_restart_time") if restart_time is not None: @@ -494,7 +493,7 @@ def _show_upload_summary(stats: T.Sequence[_APIStats], errors: T.Sequence[Except LOG.info( f"{humanize.naturalsize(summary['uploaded_size'] * 1024 * 1024)} uploaded" ) - LOG.info(f"{summary['time']} upload time") + LOG.info(f"{summary['time']:.3f} seconds upload time") else: LOG.info("Nothing uploaded. 
Bye.") @@ -507,14 +506,16 @@ def _api_logging_finished(summary: dict, dry_run: bool = False): return action: api_v4.ActionType = "upload_finished_upload" - try: - api_v4.log_event(action, summary) - except requests.HTTPError as exc: - LOG.warning( - f"HTTPError from logging action {action}: {api_v4.readable_http_error(exc)}" - ) - except Exception: - LOG.warning(f"Error from logging action {action}", exc_info=True) + + with api_v4.create_client_session(disable_logging=True) as client_session: + try: + api_v4.log_event(client_session, action, summary) + except requests.HTTPError as exc: + LOG.warning( + f"HTTPError from logging action {action}: {http.readable_http_error(exc)}" + ) + except Exception: + LOG.warning(f"Error from logging action {action}", exc_info=True) def _api_logging_failed(payload: dict, exc: Exception, dry_run: bool = False): @@ -526,14 +527,16 @@ def _api_logging_failed(payload: dict, exc: Exception, dry_run: bool = False): payload_with_reason = {**payload, "reason": exc.__class__.__name__} action: api_v4.ActionType = "upload_failed_upload" - try: - api_v4.log_event(action, payload_with_reason) - except requests.HTTPError as exc: - LOG.warning( - f"HTTPError from logging action {action}: {api_v4.readable_http_error(exc)}" - ) - except Exception: - LOG.warning(f"Error from logging action {action}", exc_info=True) + + with api_v4.create_client_session(disable_logging=True) as client_session: + try: + api_v4.log_event(client_session, action, payload_with_reason) + except requests.HTTPError as exc: + LOG.warning( + f"HTTPError from logging action {action}: {http.readable_http_error(exc)}" + ) + except Exception: + LOG.warning(f"Error from logging action {action}", exc_info=True) _M = T.TypeVar("_M", bound=types.Metadata) @@ -557,9 +560,10 @@ def _gen_upload_everything( (m for m in metadatas if isinstance(m, types.ImageMetadata)), utils.find_images(import_paths, skip_subfolders=skip_subfolders), ) - yield from uploader.ImageSequenceUploader.upload_images( - mly_uploader, image_metadatas + image_uploader = uploader.ImageSequenceUploader( + mly_uploader.upload_options, emitter=mly_uploader.emitter ) + yield from image_uploader.upload_images(image_metadatas) # Upload videos video_metadatas = _find_metadata_with_filename_existed_in( diff --git a/mapillary_tools/upload_api_v4.py b/mapillary_tools/upload_api_v4.py index 7e6819db..c94a9b01 100644 --- a/mapillary_tools/upload_api_v4.py +++ b/mapillary_tools/upload_api_v4.py @@ -17,13 +17,7 @@ import requests -from .api_v4 import ( - HTTPContentError, - jsonify_response, - request_get, - request_post, - REQUESTS_TIMEOUT, -) +from .api_v4 import HTTPContentError, jsonify_response, REQUESTS_TIMEOUT MAPILLARY_UPLOAD_ENDPOINT = os.getenv( "MAPILLARY_UPLOAD_ENDPOINT", "https://rupload.facebook.com/mapillary_public_uploads" @@ -38,21 +32,17 @@ class UploadService: user_access_token: str session_key: str - def __init__(self, user_access_token: str, session_key: str): - self.user_access_token = user_access_token + def __init__(self, user_session: requests.Session, session_key: str): + self.user_session = user_session self.session_key = session_key def fetch_offset(self) -> int: - headers = { - "Authorization": f"OAuth {self.user_access_token}", - } url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}" - resp = request_get(url, headers=headers, timeout=REQUESTS_TIMEOUT) + resp = self.user_session.get(url, timeout=REQUESTS_TIMEOUT) resp.raise_for_status() data = jsonify_response(resp) - try: return data["offset"] except KeyError: @@ -158,23 
+148,21 @@ def upload_shifted_chunks( Upload the chunks that must already be shifted by the offset (e.g. fp.seek(offset, io.SEEK_SET)) """ + url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}" headers = { - "Authorization": f"OAuth {self.user_access_token}", "Offset": f"{offset}", "X-Entity-Name": self.session_key, } - url = f"{MAPILLARY_UPLOAD_ENDPOINT}/{self.session_key}" - resp = request_post( + + resp = self.user_session.post( url, headers=headers, data=shifted_chunks, - timeout=(REQUESTS_TIMEOUT, read_timeout), + timeout=(REQUESTS_TIMEOUT, read_timeout), # type: ignore ) - resp.raise_for_status() data = jsonify_response(resp) - try: return data["h"] except KeyError: diff --git a/mapillary_tools/uploader.py b/mapillary_tools/uploader.py index 953ad457..d23c2549 100644 --- a/mapillary_tools/uploader.py +++ b/mapillary_tools/uploader.py @@ -7,6 +7,7 @@ import json import logging import os +import queue import struct import sys import tempfile @@ -54,10 +55,20 @@ class UploadOptions: user_items: config.UserItem chunk_size: int = int(constants.UPLOAD_CHUNK_SIZE_MB * 1024 * 1024) + num_upload_workers: int = constants.MAX_IMAGE_UPLOAD_WORKERS dry_run: bool = False nofinish: bool = False noresume: bool = False + def __post_init__(self): + if self.num_upload_workers <= 0: + raise ValueError( + f"Expect positive num_upload_workers but got {self.num_upload_workers}" + ) + + if self.chunk_size <= 0: + raise ValueError(f"Expect positive chunk_size but got {self.chunk_size}") + class UploaderProgress(T.TypedDict, total=True): """ @@ -83,7 +94,7 @@ class UploaderProgress(T.TypedDict, total=True): # - offset == entity_size when "upload_end" or "upload_finished" entity_size: int - # An "upload_interrupted" will increase it. Reset to 0 if a chunk is uploaded + # An "upload_retrying" will increase it. Reset to 0 if a chunk is uploaded retries: int # Cluster ID after finishing the upload @@ -115,7 +126,7 @@ class SequenceProgress(T.TypedDict, total=False): # MAPSequenceUUID. 
It is only available for directory uploading sequence_uuid: str - # Path to the Zipfile/BlackVue/CAMM + # Path to the image/video/zip import_path: str @@ -143,11 +154,38 @@ class InvalidMapillaryZipFileError(SequenceError): pass +# BELOW demonstrates the pseudocode for a typical upload workflow +# and when upload events are emitted +################################################################# +# def pseudo_upload(data): +# emit("upload_start") +# while True: +# try: +# if is_sequence(data): +# for image in data: +# upload_image(image) +# emit("upload_progress") +# elif is_video(data): +# for chunk in data: +# upload_chunk(chunk) +# emit("upload_progress") +# except BaseException as ex: # Include KeyboardInterrupt +# if retryable(ex): +# emit("upload_retrying") +# continue +# else: +# emit("upload_failed") +# raise ex +# else: +# break +# emit("upload_end") +# finish_upload(data) +# emit("upload_finished") EventName = T.Literal[ "upload_start", "upload_fetch_offset", "upload_progress", - "upload_interrupted", + "upload_retrying", "upload_end", "upload_failed", "upload_finished", @@ -478,7 +516,7 @@ def _wip_file_context(cls, wip_path: Path): upload_md5sum = utils.md5sum_fp(fp).hexdigest() done_path = wip_path.parent.joinpath( - _session_key(upload_md5sum, api_v4.ClusterFileType.ZIP) + _suffix_session_key(upload_md5sum, api_v4.ClusterFileType.ZIP) ) try: @@ -494,9 +532,12 @@ def _wip_file_context(cls, wip_path: Path): class ImageSequenceUploader: - @classmethod + def __init__(self, upload_options: UploadOptions, emitter: EventEmitter): + self.upload_options = upload_options + self.emitter = emitter + def upload_images( - cls, uploader: Uploader, image_metadatas: T.Sequence[types.ImageMetadata] + self, image_metadatas: T.Sequence[types.ImageMetadata] ) -> T.Generator[tuple[str, UploadResult], None, None]: sequences = types.group_and_sort_images(image_metadatas) @@ -514,53 +555,49 @@ def upload_images( } try: - cluster_id = cls._upload_sequence( - uploader, + cluster_id = self._upload_sequence_and_finish( sequence, - progress=T.cast(dict[str, T.Any], sequence_progress), + sequence_progress=T.cast(dict[str, T.Any], sequence_progress), ) except Exception as ex: yield sequence_uuid, UploadResult(error=ex) else: yield sequence_uuid, UploadResult(result=cluster_id) - @classmethod - def _upload_sequence( - cls, - uploader: Uploader, + def _upload_sequence_and_finish( + self, sequence: T.Sequence[types.ImageMetadata], - progress: dict[str, T.Any], + sequence_progress: dict[str, T.Any], ) -> str: _validate_metadatas(sequence) - progress["entity_size"] = sum(m.filesize or 0 for m in sequence) - uploader.emitter.emit("upload_start", progress) + sequence_progress["entity_size"] = sum(m.filesize or 0 for m in sequence) + self.emitter.emit("upload_start", sequence_progress) - single_image_uploader = SingleImageUploader(uploader, progress=progress) - with concurrent.futures.ThreadPoolExecutor( - max_workers=constants.MAX_IMAGE_UPLOAD_WORKERS - ) as executor: - image_file_handles = list( - executor.map(single_image_uploader.upload, sequence) + try: + # Retries will be handled in the call (but no upload event emissions) + image_file_handles = self._upload_images_parallel( + sequence, sequence_progress ) + except BaseException as ex: # Include KeyboardInterrupt + self.emitter.emit("upload_failed", sequence_progress) + raise ex - manifest_file_handle = cls._upload_manifest(uploader, image_file_handles) + manifest_file_handle = self._upload_manifest(image_file_handles) - uploader.emitter.emit("upload_end", 
progress) + self.emitter.emit("upload_end", sequence_progress) + uploader = Uploader(self.upload_options, emitter=self.emitter) cluster_id = uploader.finish_upload( manifest_file_handle, api_v4.ClusterFileType.MLY_BUNDLE_MANIFEST, - progress=progress, + progress=sequence_progress, ) return cluster_id - @classmethod - def _upload_manifest( - cls, uploader: Uploader, image_file_handles: T.Sequence[str] - ) -> str: - uploader_without_emitter = Uploader(uploader.upload_options) + def _upload_manifest(self, image_file_handles: T.Sequence[str]) -> str: + uploader = Uploader(self.upload_options) manifest = { "version": "1", @@ -575,53 +612,154 @@ def _upload_manifest( ) ) manifest_fp.seek(0, io.SEEK_SET) - return uploader_without_emitter.upload_stream( + return uploader.upload_stream( manifest_fp, session_key=f"{_prefixed_uuid4()}.json" ) + def _upload_images_parallel( + self, + sequence: T.Sequence[types.ImageMetadata], + sequence_progress: dict[str, T.Any], + ) -> list[str]: + if not sequence: + return [] + + max_workers = min(self.upload_options.num_upload_workers, len(sequence)) + + # Lock is used to synchronize event emission + lock = threading.Lock() + + # Push all images into the queue + image_queue: queue.Queue[tuple[int, types.ImageMetadata]] = queue.Queue() + for idx, image_metadata in enumerate(sequence): + image_queue.put((idx, image_metadata)) + + upload_interrupted = threading.Event() + + with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = [ + executor.submit( + self._upload_images_from_queue, + image_queue, + lock, + upload_interrupted, + sequence_progress, + ) + for _ in range(max_workers) + ] + + indexed_image_file_handles = [] + + try: + for future in futures: + indexed_image_file_handles.extend(future.result()) + except KeyboardInterrupt as ex: + upload_interrupted.set() + raise ex + + # All tasks should be done here, so below is more like assertion + image_queue.join() + if sys.version_info >= (3, 13): + image_queue.shutdown() + + file_handles: list[str] = [] + + indexed_image_file_handles.sort() + + # Important to guarantee the order + assert len(indexed_image_file_handles) == len(sequence) + for expected_idx, (idx, file_handle) in enumerate(indexed_image_file_handles): + assert expected_idx == idx + file_handles.append(file_handle) + + return file_handles + + def _upload_images_from_queue( + self, + image_queue: queue.Queue[tuple[int, types.ImageMetadata]], + lock: threading.Lock, + upload_interrupted: threading.Event, + sequence_progress: dict[str, T.Any], + ) -> list[tuple[int, str]]: + indexed_file_handles = [] + + with api_v4.create_user_session( + self.upload_options.user_items["user_upload_token"] + ) as user_session: + single_image_uploader = SingleImageUploader( + self.upload_options, user_session=user_session + ) + + while True: + # Assert that all images are already pushed into the queue + try: + idx, image_metadata = image_queue.get_nowait() + except queue.Empty: + break + + # Main thread will handle the interruption + if upload_interrupted.is_set(): + break + + # Create a new mutatble progress to keep the sequence_progress immutable + image_progress = { + **sequence_progress, + "import_path": str(image_metadata.filename), + } + + # image_progress will be updated during uploading + file_handle = single_image_uploader.upload( + image_metadata, image_progress + ) + + # Update chunk_size (it was constant if set) + image_progress["chunk_size"] = image_metadata.filesize + + # Main thread will handle the interruption + if 
upload_interrupted.is_set(): + break + + with lock: + self.emitter.emit("upload_progress", image_progress) + + indexed_file_handles.append((idx, file_handle)) + + image_queue.task_done() + + return indexed_file_handles + class SingleImageUploader: def __init__( self, - uploader: Uploader, - progress: dict[str, T.Any] | None = None, + upload_options: UploadOptions, + user_session: requests.Session | None = None, ): - self.uploader = uploader - self.progress = progress or {} - self.lock = threading.Lock() + self.upload_options = upload_options + self.user_session = user_session self.cache = self._maybe_create_persistent_cache_instance( - uploader.upload_options.user_items + self.upload_options.user_items, upload_options ) - def upload(self, image_metadata: types.ImageMetadata) -> str: - mutable_progress = { - **(self.progress or {}), - "filename": str(image_metadata.filename), - } - + def upload( + self, image_metadata: types.ImageMetadata, image_progress: dict[str, T.Any] + ) -> str: image_bytes = self.dump_image_bytes(image_metadata) - uploader_without_emitter = Uploader(self.uploader.upload_options) + uploader = Uploader(self.upload_options, user_session=self.user_session) - session_key = uploader_without_emitter._gen_session_key( - io.BytesIO(image_bytes), mutable_progress - ) + session_key = uploader._gen_session_key(io.BytesIO(image_bytes), image_progress) - file_handle = self._file_handle_cache_get(session_key) + file_handle = self._get_cached_file_handle(session_key) if file_handle is None: - file_handle = uploader_without_emitter.upload_stream( + # image_progress will be updated during uploading + file_handle = uploader.upload_stream( io.BytesIO(image_bytes), session_key=session_key, - progress=mutable_progress, + progress=image_progress, ) - self._file_handle_cache_set(session_key, file_handle) - - # Override chunk_size with the actual filesize - mutable_progress["chunk_size"] = image_metadata.filesize - - with self.lock: - self.uploader.emitter.emit("upload_progress", mutable_progress) + self._set_file_handle_cache(session_key, file_handle) return file_handle @@ -648,7 +786,7 @@ def dump_image_bytes(cls, metadata: types.ImageMetadata) -> bytes: @classmethod def _maybe_create_persistent_cache_instance( - cls, user_items: config.UserItem + cls, user_items: config.UserItem, upload_options: UploadOptions ) -> history.PersistentCache | None: if not constants.UPLOAD_CACHE_DIR: LOG.debug( @@ -656,6 +794,10 @@ def _maybe_create_persistent_cache_instance( ) return None + if upload_options.dry_run: + LOG.debug("Dry-run mode enabled, skipping caching upload file handles") + return None + cache_path_dir = ( Path(constants.UPLOAD_CACHE_DIR) .joinpath(api_v4.MAPILLARY_CLIENT_TOKEN.replace("|", "_")) @@ -665,14 +807,22 @@ def _maybe_create_persistent_cache_instance( ) cache_path_dir.mkdir(parents=True, exist_ok=True) cache_path = cache_path_dir.joinpath("cached_file_handles") - LOG.debug(f"File handle cache path: {cache_path}") + + # Sanitize sensitive segments for logging + sanitized_cache_path = ( + Path(constants.UPLOAD_CACHE_DIR) + .joinpath("***") + .joinpath("***") + .joinpath("cached_file_handles") + ) + LOG.debug(f"File handle cache path: {sanitized_cache_path}") cache = history.PersistentCache(str(cache_path.resolve())) cache.clear_expired() return cache - def _file_handle_cache_get(self, key: str) -> str | None: + def _get_cached_file_handle(self, key: str) -> str | None: if self.cache is None: return None @@ -681,7 +831,7 @@ def _file_handle_cache_get(self, key: str) -> str | None: 
return self.cache.get(key) - def _file_handle_cache_set(self, key: str, value: str) -> None: + def _set_file_handle_cache(self, key: str, value: str) -> None: if self.cache is None: return @@ -693,9 +843,13 @@ def _file_handle_cache_set(self, key: str, value: str) -> None: class Uploader: def __init__( - self, upload_options: UploadOptions, emitter: EventEmitter | None = None + self, + upload_options: UploadOptions, + user_session: requests.Session | None = None, + emitter: EventEmitter | None = None, ): self.upload_options = upload_options + self.user_session = user_session if emitter is None: # An empty event emitter that does nothing self.emitter = EventEmitter() @@ -724,18 +878,27 @@ def upload_stream( self.emitter.emit("upload_start", progress) - upload_service = self._create_upload_service(session_key) - while True: try: - file_handle = self._upload_stream_retryable( - upload_service, fp, T.cast(UploaderProgress, progress) - ) - except Exception as ex: + if self.user_session is not None: + file_handle = self._upload_stream_retryable( + self.user_session, + fp, + session_key, + T.cast(UploaderProgress, progress), + ) + else: + with api_v4.create_user_session( + self.upload_options.user_items["user_upload_token"] + ) as user_session: + file_handle = self._upload_stream_retryable( + user_session, + fp, + session_key, + T.cast(UploaderProgress, progress), + ) + except BaseException as ex: # Include KeyboardInterrupt self._handle_upload_exception(ex, T.cast(UploaderProgress, progress)) - except BaseException as ex: - self.emitter.emit("upload_failed", progress) - raise ex else: break @@ -758,14 +921,17 @@ def finish_upload( if self.upload_options.dry_run or self.upload_options.nofinish: cluster_id = "0" else: - resp = api_v4.finish_upload( - self.upload_options.user_items["user_upload_token"], - file_handle, - cluster_filetype, - organization_id=self.upload_options.user_items.get( - "MAPOrganizationKey" - ), - ) + organization_id = self.upload_options.user_items.get("MAPOrganizationKey") + + with api_v4.create_user_session( + self.upload_options.user_items["user_upload_token"] + ) as user_session: + resp = api_v4.finish_upload( + user_session, + file_handle, + cluster_filetype, + organization_id=organization_id, + ) body = api_v4.jsonify_response(resp) # TODO: Validate cluster_id @@ -776,40 +942,41 @@ def finish_upload( return cluster_id - def _create_upload_service(self, session_key: str) -> upload_api_v4.UploadService: + def _create_upload_service( + self, user_session: requests.Session, session_key: str + ) -> upload_api_v4.UploadService: upload_service: upload_api_v4.UploadService if self.upload_options.dry_run: upload_path = os.getenv("MAPILLARY_UPLOAD_ENDPOINT") upload_service = upload_api_v4.FakeUploadService( - user_access_token=self.upload_options.user_items["user_upload_token"], - session_key=session_key, + user_session, + session_key, upload_path=Path(upload_path) if upload_path is not None else None, ) LOG.info( - "Dry run mode enabled. 
Data will be uploaded to %s", + "Dry-run mode enabled, uploading to %s", upload_service.upload_path.joinpath(session_key), ) else: - upload_service = upload_api_v4.UploadService( - user_access_token=self.upload_options.user_items["user_upload_token"], - session_key=session_key, - ) + upload_service = upload_api_v4.UploadService(user_session, session_key) return upload_service def _handle_upload_exception( - self, ex: Exception, progress: UploaderProgress + self, ex: BaseException, progress: UploaderProgress ) -> None: retries = progress.get("retries", 0) begin_offset = progress.get("begin_offset") offset = progress.get("offset") if retries <= constants.MAX_UPLOAD_RETRIES and _is_retriable_exception(ex): - self.emitter.emit("upload_interrupted", progress) + self.emitter.emit("upload_retrying", progress) + LOG.warning( - f"Error uploading at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}" + f"Error uploading {self._upload_name(progress)} at {offset=} since {begin_offset=}: {ex.__class__.__name__}: {ex}" ) + # Keep things immutable here. Will increment retries in the caller retries += 1 if _is_immediate_retriable_exception(ex): @@ -825,10 +992,25 @@ def _handle_upload_exception( self.emitter.emit("upload_failed", progress) raise ex + @classmethod + def _upload_name(cls, progress: UploaderProgress): + # Strictly speaking these sequence properties should not be exposed in this context + # TODO: Maybe move these logging statements to event handlers + sequence_uuid: str | None = T.cast( + T.Union[str, None], progress.get("sequence_uuid") + ) + import_path = T.cast(T.Union[str, None], progress.get("import_path")) + if sequence_uuid is not None: + if import_path is None: + name: str = f"sequence_{sequence_uuid}" + else: + name = f"sequence_{sequence_uuid}/{Path(import_path).name}" + else: + name = Path(import_path or "unknown").name + return name + def _chunk_with_progress_emitted( - self, - stream: T.IO[bytes], - progress: UploaderProgress, + self, stream: T.IO[bytes], progress: UploaderProgress ) -> T.Generator[bytes, None, None]: for chunk in upload_api_v4.UploadService.chunkize_byte_stream( stream, self.upload_options.chunk_size @@ -844,31 +1026,44 @@ def _chunk_with_progress_emitted( def _upload_stream_retryable( self, - upload_service: upload_api_v4.UploadService, + user_session: requests.Session, fp: T.IO[bytes], - progress: UploaderProgress, + session_key: str, + progress: UploaderProgress | None = None, ) -> str: """Upload the stream with safe retries guraranteed""" + if progress is None: + progress = T.cast(UploaderProgress, {}) + + upload_service = self._create_upload_service(user_session, session_key) + + if "entity_size" not in progress: + fp.seek(0, io.SEEK_END) + entity_size = fp.tell() + progress["entity_size"] = entity_size begin_offset = upload_service.fetch_offset() progress["begin_offset"] = begin_offset progress["offset"] = begin_offset + self.emitter.emit("upload_fetch_offset", progress) + + # Estimate the read timeout if not constants.MIN_UPLOAD_SPEED: read_timeout = None else: remaining_bytes = abs(progress["entity_size"] - begin_offset) read_timeout = max( - api_v4.REQUESTS_TIMEOUT, remaining_bytes / constants.MIN_UPLOAD_SPEED + api_v4.REQUESTS_TIMEOUT, + remaining_bytes / constants.MIN_UPLOAD_SPEED, ) - self.emitter.emit("upload_fetch_offset", progress) - + # Upload from begin_offset fp.seek(begin_offset, io.SEEK_SET) - shifted_chunks = self._chunk_with_progress_emitted(fp, progress) + # Start uploading return upload_service.upload_shifted_chunks( 
shifted_chunks, begin_offset, read_timeout=read_timeout ) @@ -884,7 +1079,7 @@ def _gen_session_key(self, fp: T.IO[bytes], progress: dict[str, T.Any]) -> str: filetype = progress.get("file_type") if filetype is not None: - session_key = _session_key(session_key, types.FileType(filetype)) + session_key = _suffix_session_key(session_key, types.FileType(filetype)) return session_key @@ -896,7 +1091,7 @@ def _validate_metadatas(metadatas: T.Sequence[types.ImageMetadata]): raise FileNotFoundError(f"No such file {metadata.filename}") -def _is_immediate_retriable_exception(ex: Exception) -> bool: +def _is_immediate_retriable_exception(ex: BaseException) -> bool: if ( isinstance(ex, requests.HTTPError) and isinstance(ex.response, requests.Response) @@ -912,7 +1107,7 @@ def _is_immediate_retriable_exception(ex: Exception) -> bool: return False -def _is_retriable_exception(ex: Exception) -> bool: +def _is_retriable_exception(ex: BaseException) -> bool: if isinstance(ex, (requests.ConnectionError, requests.Timeout)): return True @@ -931,22 +1126,29 @@ def _is_retriable_exception(ex: Exception) -> bool: return False -def _session_key( - upload_md5sum: str, filetype: api_v4.ClusterFileType | types.FileType +_SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = { + api_v4.ClusterFileType.ZIP: ".zip", + api_v4.ClusterFileType.CAMM: ".mp4", + api_v4.ClusterFileType.BLACKVUE: ".mp4", + types.FileType.IMAGE: ".jpg", + types.FileType.ZIP: ".zip", + types.FileType.BLACKVUE: ".mp4", + types.FileType.CAMM: ".mp4", + types.FileType.GOPRO: ".mp4", + types.FileType.VIDEO: ".mp4", +} + + +def _suffix_session_key( + key: str, filetype: api_v4.ClusterFileType | types.FileType ) -> str: - _SUFFIX_MAP: dict[api_v4.ClusterFileType | types.FileType, str] = { - api_v4.ClusterFileType.ZIP: ".zip", - api_v4.ClusterFileType.CAMM: ".mp4", - api_v4.ClusterFileType.BLACKVUE: ".mp4", - types.FileType.IMAGE: ".jpg", - types.FileType.ZIP: ".zip", - types.FileType.BLACKVUE: ".mp4", - types.FileType.CAMM: ".mp4", - types.FileType.GOPRO: ".mp4", - types.FileType.VIDEO: ".mp4", - } + is_uuid_before = _is_uuid(key) + + key = f"mly_tools_{key}{_SUFFIX_MAP[filetype]}" + + assert _is_uuid(key) is is_uuid_before - return f"mly_tools_{upload_md5sum}{_SUFFIX_MAP[filetype]}" + return key def _prefixed_uuid4(): @@ -955,5 +1157,5 @@ def _prefixed_uuid4(): return prefixed -def _is_uuid(session_key: str) -> bool: - return session_key.startswith("uuid_") +def _is_uuid(key: str) -> bool: + return key.startswith("uuid_") or key.startswith("mly_tools_uuid_") diff --git a/tests/cli/upload_api_v4.py b/tests/cli/upload_api_v4.py index 3835bb06..e10b5a68 100644 --- a/tests/cli/upload_api_v4.py +++ b/tests/cli/upload_api_v4.py @@ -6,7 +6,7 @@ import requests import tqdm -from mapillary_tools import api_v4, authenticate +from mapillary_tools import api_v4, authenticate, http from mapillary_tools.upload_api_v4 import FakeUploadService, UploadService @@ -64,15 +64,16 @@ def main(): chunk_size = int(parsed.chunk_size * 1024 * 1024) user_access_token = user_items.get("user_upload_token", "") + session = api_v4.create_user_session(user_access_token) if parsed.dry_run: - service = FakeUploadService(user_access_token="", session_key=session_key) + service = FakeUploadService(session, session_key) else: - service = UploadService(user_access_token, session_key) + service = UploadService(session, session_key) try: initial_offset = service.fetch_offset() except requests.HTTPError as ex: - raise RuntimeError(api_v4.readable_http_error(ex)) + raise 
RuntimeError(http.readable_http_error(ex)) LOG.info("Session key: %s", session_key) LOG.info("Initial offset: %s", initial_offset) @@ -105,7 +106,7 @@ def _update_pbar(chunks, pbar): _update_pbar(shifted_chunks, pbar), initial_offset ) except requests.HTTPError as ex: - raise RuntimeError(api_v4.readable_http_error(ex)) + raise RuntimeError(http.readable_http_error(ex)) except KeyboardInterrupt: file_handle = None LOG.warning("Upload interrupted") @@ -113,7 +114,7 @@ def _update_pbar(chunks, pbar): try: final_offset = service.fetch_offset() except requests.HTTPError as ex: - raise RuntimeError(api_v4.readable_http_error(ex)) + raise RuntimeError(http.readable_http_error(ex)) LOG.info("Final offset: %s", final_offset) LOG.info("Entity size: %d", entity_size) diff --git a/tests/unit/test_upload_api_v4.py b/tests/unit/test_upload_api_v4.py index 88c2b415..a1a71448 100644 --- a/tests/unit/test_upload_api_v4.py +++ b/tests/unit/test_upload_api_v4.py @@ -8,7 +8,7 @@ def test_upload(tmpdir: py.path.local): upload_service = upload_api_v4.FakeUploadService( - user_access_token="TEST", + user_session=None, session_key="FOOBAR.txt", upload_path=Path(tmpdir), transient_error_ratio=0.02, @@ -26,7 +26,7 @@ def test_upload(tmpdir: py.path.local): def test_upload_big_chunksize(tmpdir: py.path.local): upload_service = upload_api_v4.FakeUploadService( - user_access_token="TEST", + user_session=None, session_key="FOOBAR.txt", upload_path=Path(tmpdir), transient_error_ratio=0.02, @@ -44,7 +44,7 @@ def test_upload_big_chunksize(tmpdir: py.path.local): def test_upload_chunks(tmpdir: py.path.local): upload_service = upload_api_v4.FakeUploadService( - user_access_token="TEST", + user_session=None, session_key="FOOBAR2.txt", upload_path=Path(tmpdir), transient_error_ratio=0.02,
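Note (not part of the patch): a minimal usage sketch of the session helpers introduced above. The token value and the logged properties are placeholders for illustration; the real call sites are authenticate.py, upload.py, and uploader.py as shown in the diff.

from mapillary_tools import api_v4

USER_TOKEN = "MLY|placeholder"  # hypothetical token, not a real credential

# create_user_session() returns an http.Session with the OAuth header preset.
# The session logs sanitized request/response details for debugging and falls
# back to the system certificate store if certifi-based SSL verification fails.
with api_v4.create_user_session(USER_TOKEN) as user_session:
    resp = api_v4.fetch_user_or_me(user_session)  # raises requests.HTTPError on failure
    user = api_v4.jsonify_response(resp)
    print(user.get("id"), user.get("username"))

# Client-token requests (e.g. telemetry) go through create_client_session();
# request/response logging can be suppressed as done in _api_logging_finished().
with api_v4.create_client_session(disable_logging=True) as client_session:
    api_v4.log_event(client_session, "upload_finished_upload", {"note": "example"})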