diff --git a/CHANGELOG.md b/CHANGELOG.md
index 12c5b7e..14b0343 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,46 +1,34 @@
 # Change Log for Visual Studio Code - Offline Gallery and Updater
 
-## [1.0.24] - 2023-06-05
-
+## `1.0.24` - 2023-06-05
 ### Fixed
-
 - Improvements to requests session handling to prevent ConnectionErrors due to repeated connections. Thanks @tomer953 for reporting.
 
 ### Added
-
 - Note about Firefox in Readme.md. Thanks @jmorcate for highlighting this gap.
 
 ### Changed
-
 - Sort gallery listings with a simple Python sort.
 - Removed deprecated logzero dependency, switched to logging. Thanks @bdsoha for the implementation and note.
 
-## [1.0.23] - 2022-11-09
-
+## `1.0.23` - 2022-11-09
 ### Fixed
-
 - @forky2 resolved an issue related to incorrect version ordering (from reverse-alphanumerical to reverse-chronological), which prevented VS Code clients from updating extensions correctly.
 
-## [1.0.22] - 2022-10-31
-
+## `1.0.22` - 2022-10-31
 ### Added
-
 - @maxtruxa added support for specifying docker container environment variable `SSLARGS` to control SSL arguments, or disable SSL by setting `BIND=0.0.0.0:80` and `SSLARGS=` (empty).
 
 ### Changed
-
 - @Precioussheep improved consistency of the codebase, reducing superfluous code and adding typing.
 
-## [1.0.21] - 2022-08-08
-
+## `1.0.21` - 2022-08-08
 ### Added
-
 - @tomer953 added support for fetching a specified number of recommended extensions `--total-recommended`.
 - @Ebsan added support for fetching pre-release extensions `--prerelease-extensions` and fixed fetching other extensions [#31](https://github.com/LOLINTERNETZ/vscodeoffline/issues/31).
 - @Ebsan added support for specifying which Visual Studio Code version to masquerade as when fetching extensions `--vscode-version`.
 
 ### Changed
-
 - Merged dependabot suggestions for CI pipeline updates.
 - Utilised individual requests, rather than a Requests session, for fetching extensions to improve the stability of the fetch process. Should resolve [#33](https://github.com/LOLINTERNETZ/vscodeoffline/issues/33). Thanks @Ebsan for the fix and @annieherram for reporting.
 - Updated the built-in certificate and key to extend their expiry [#37](https://github.com/LOLINTERNETZ/vscodeoffline/issues/37). Included the CA chain as well. Thanks for reporting @Ebsan.
@@ -48,84 +36,57 @@
 - Split out this changelog.
 
 ### Fixed
-
 - @tomer953 removed a duplicate flag from QueryFlags.
 - @Ebsan fixed an issue with downloading cross-platform extensions [#24](https://github.com/LOLINTERNETZ/vscodeoffline/issues/24).
 
-## [1.0.20]
-
+## `1.0.20`
 ### Fixed
-
 - Fixed an issue when downloading multiple versions of extensions. Thanks @forky2!
 
-## [1.0.19]
-
+## `1.0.19`
 ### Fixed
-
 - Lots of really solid bug fixes. Thank you to @fullylegit! Resilience improvements when fetching from marketplace. Thanks @forky2 and @ebsan.
 
-## [1.0.18]
-
+## `1.0.18`
 ### Changed
-
 - Meta release to trigger CI.
 
-## [1.0.17]
-
+## `1.0.17`
 ### Changed
-
 - CORS support for gallery. Thanks @kenyon!
 
-## [1.0.16]
-
+## `1.0.16`
 ### Changed
-
 - Support for saving sync logs to file. Thanks @ap0yuv!
 
-## [1.0.16]
-
+## `1.0.16`
 ### Changed
-
 - Improve extension stats handling.
 
-## [1.0.14]
-
+## `1.0.14`
 ### Fixed
-
 - Fixed insider builds being re-fetched.
 
-## [1.0.13]
-
+## `1.0.13`
 ### Added
-
 - Added initial support for extension version handling. Hopefully this resolves issue #4.
 
-## [1.0.12]
-
+## `1.0.12`
 ### Fixed
-
 - @ttutko fixed a bug preventing multiple build qualities (stable/insider) from being downloaded. Thanks @darkonejr for investigating and reporting.
-## [1.0.11]
-
+## `1.0.11`
 ### Fixed
-
 - Fixed bugs in Gallery sorting, and added timeouts for Sync.
 
-## [1.0.10]
-
+## `1.0.10`
 ### Changed
-
 - Refactored to improve consistency.
 
-## [1.0.9]
-
+## `1.0.9`
 ### Added
-
 - Added support for Remote Development, currently (2019-05-12) available to insiders. Refactored various badness.
 
-## [1.0.8]
-
+## `1.0.8`
 ### Added
-
 - Insiders support and extension packs (remotes).
diff --git a/README.md b/README.md
index f850e36..b7ce771 100644
--- a/README.md
+++ b/README.md
@@ -54,7 +54,7 @@ On the non-Internet connected system:
 1. On the non-Internet connected system, ensure the following DNS addresses are pointed toward the vscgallery service.
     * update.code.visualstudio.com
-    * az764295.vo.msecnd.net
+    * main.vscode-cdn.net
     * marketplace.visualstudio.com
 
 This may be achieved using a corporate DNS server, or by modifying a client's host file.
@@ -90,7 +90,7 @@ This guide will setup the vscsync and vscgallery service on the same Docker host
 2. Point the DNS addresses to the vscgallery service.
     * update.code.visualstudio.com
-    * az764295.vo.msecnd.net
+    * main.vscode-cdn.net
     * marketplace.visualstudio.com
 
 This may be achieved using a corporate DNS server, or by modifying a client's host file.
@@ -116,50 +116,48 @@ These arguments can be passed as command line arguments to sync.py (e.g. --varA
 
 ### Possible Args:
 
 ```
-usage: sync.py [-h] [--sync] [--syncall] [--artifacts ARTIFACTDIR]
-               [--frequency FREQUENCY] [--check-binaries] [--check-insider]
-               [--check-recommended-extensions] [--check-specified-extensions]
-               [--extension-name EXTENSIONNAME]
-               [--extension-search EXTENSIONSEARCH] [--update-binaries]
-               [--update-extensions] [--update-malicious-extensions]
-               [--prerelease-extensions] [--vscode-version VSCODEVERSION]
-               [--skip-binaries] [--debug] [--logfile LOGFILE]
+usage: sync.py [-h] [--sync] [--syncall] [--artifacts ARTIFACTDIR] [--frequency FREQUENCY] [--check-binaries] [--check-insider] [--check-recommended-extensions] [--check-specified-extensions] [--extension-name EXTENSIONNAME] [--extension-search EXTENSIONSEARCH] [--prerelease-extensions]
+               [--update-binaries] [--update-extensions] [--update-malicious-extensions] [--skip-binaries] [--vscode-version VERSION] [--total-recommended TOTALRECOMMENDED] [--debug] [--logfile LOGFILE] [--include-existing] [--skip-existing] [--garbage-collection]
 
 Synchronises VSCode in an Offline Environment
 
-optional arguments:
+options:
   -h, --help            show this help message and exit
-  --sync                The basic-user sync. It includes stable binaries and
-                        typical extensions
-  --syncall             The power-user sync. It includes all binaries and
-                        extensions
-  --artifacts ARTIFACTDIR
+  --sync, -s            The basic-user sync. It includes stable binaries and typical extensions
+  --syncall, -a         The power-user sync. It includes all binaries and extensions
+  --artifacts ARTIFACTDIR, -d ARTIFACTDIR
                         Path to downloaded artifacts
-  --frequency FREQUENCY
-                        The frequency to try and update (e.g. sleep for '12h'
-                        and try again
-  --total-recommended N
-                        The number of recommended extensions to fetch
-                        (default: 200)
+  --frequency FREQUENCY, -f FREQUENCY
+                        The frequency to try and update (e.g. sleep for '12h' and try again)
   --check-binaries      Check for updated binaries
-  --check-insider       Check for updated insider binaries
+  --check-insider, -i   Check for updated insider binaries
   --check-recommended-extensions
                         Check for recommended extensions
-  --check-specified-extensions
+  --check-specified-extensions, -w
                         Check for extensions in /specified.json
-  --extension-name EXTENSIONNAME
+  --extension-name EXTENSIONNAME, -n EXTENSIONNAME
                         Find a specific extension by name
   --extension-search EXTENSIONSEARCH
                         Search for a set of extensions
-  --update-binaries     Download binaries
-  --update-extensions   Download extensions
-  --update-malicious-extensions
-                        Update the malicious extension list
-  --prerelease-extensions
+  --prerelease-extensions, -p
                         Download prerelease extensions. Defaults to false.
-  --vscode-version
+  --update-binaries, -b
+                        Download binaries
+  --update-extensions, -u
+                        Download extensions
+  --update-malicious-extensions, -m
+                        Update the malicious extension list
+  --skip-binaries, -B   Skip downloading binaries
+  --vscode-version VERSION, -v VERSION
                         VSCode version to search extensions as.
-  --skip-binaries       Skip downloading binaries
+  --total-recommended TOTALRECOMMENDED
+                        Total number of recommended extensions to sync from Search API. Defaults to 500
   --debug               Show debug output
-  --logfile LOGFILE     Sets a logfile to store loggging output
+  --logfile LOGFILE, -l LOGFILE
+                        Sets a logfile to store logging output
+  --include-existing, -e
+                        Include existing extensions in the update process
+  --skip-existing, -E   Skip inclusion of existing extensions in the update process
+  --garbage-collection, -g
+                        Remove old versions of artifacts (binaries / extensions)
 ```
diff --git a/docker-compose.yml b/docker-compose.yml
index 08bdea9..8d5eec7 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,3 @@
-version: '3'
 
 services:
   vscsync:
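For orientation, here is one plausible invocation of the sync service using the flags documented in the usage text above (the paths and schedule are illustrative assumptions, not values taken from the project):

```
python sync.py --sync --artifacts /artifacts --frequency 12h --garbage-collection --logfile /artifacts/sync.log
```

`--sync` already implies the binary and extension checks, so the remaining flags only choose where artifacts land, how often the loop re-runs, and whether old artifact versions are pruned.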
diff --git a/vscoffline/sync.py b/vscoffline/sync.py
index 41bbfc9..275908e 100644
--- a/vscoffline/sync.py
+++ b/vscoffline/sync.py
@@ -1,842 +1,1007 @@
-from dataclasses import dataclass
-import os
-import sys
-import argparse
-import requests
-import pathlib
-import hashlib
-import uuid
-import logging
-import json
-import time
-import datetime
-from typing import List
-from platform import release
-import logging as log
-from pytimeparse.timeparse import timeparse
-import vsc
-from distutils.dir_util import create_tree
-from requests.adapters import HTTPAdapter, Retry
-
-
-class VSCUpdateDefinition(object):
-
-    def __init__(self, platform=None, architecture=None, buildtype=None, quality=None,
-                 updateurl=None, name=None, version=None, productVersion=None,
-                 hashs=None, timestamp=None, sha256hash=None, supportsFastUpdate=None):
-
-        if not vsc.Utility.validate_platform(platform):
-            raise ValueError(f"Platform {platform} invalid or not implemented")
-
-        if not vsc.Utility.validate_architecture(architecture):
-            raise ValueError(
-                f"Architecture {architecture} invalid or not implemented")
-
-        if not vsc.Utility.validate_buildtype(buildtype):
-            raise ValueError(
-                f"Buildtype {buildtype} invalid or not implemented")
-
-        if not vsc.Utility.validate_quality(quality):
-            raise ValueError(f"Quality {quality} invalid or not implemented")
-
-        self.identity = platform
-
-        if architecture:
-            self.identity += f'-{architecture}'
-        if buildtype:
-            self.identity += f'-{buildtype}'
-
-        self.platform = platform
-        self.architecture = architecture
-        self.buildtype = buildtype
-        self.quality = quality
-        self.updateurl = updateurl
-        self.name = name
-        self.version = version
-        self.productVersion = productVersion
-        self.hash = hashs
-        self.timestamp = timestamp
-        self.sha256hash = sha256hash
-        self.supportsFastUpdate = supportsFastUpdate
-        self.checkedForUpdate = False
-
-    def check_for_update(self, old_commit_id=None):
-        if not old_commit_id:
-            # To trigger the API to delta
-            old_commit_id = '7c4205b5c6e52a53b81c69d2b2dc8a627abaa0ba'
-
-        url = vsc.URL_BINUPDATES + \
-            f"{self.identity}/{self.quality}/{old_commit_id}"
-
-        log.debug(f'Update url {url}')
-        result = requests.get(url, allow_redirects=True, timeout=vsc.TIMEOUT)
-        self.checkedForUpdate = True
-
-        if result.status_code == 204:
-            # No update available
-            return False
-        elif result.status_code != 200:
-            # Unhandled response from API
-            log.warning(
-                f"Update url failed {url}. Unhandled status code {result.status_code}")
-            return False
-
-        jresult = result.json()
-
-        self.updateurl = jresult['url']
-        self.name = jresult['name']
-        self.version = jresult['version']
-        self.productVersion = jresult['productVersion']
-        self.hash = jresult['hash']
-        self.timestamp = jresult['timestamp']
-        self.sha256hash = jresult['sha256hash']
-
-        if 'supportsFastUpdate' in jresult:
-            self.supportsFastUpdate = jresult['supportsFastUpdate']
-        if self.updateurl:
-            return True
-        else:
-            return False
-
-    def download_update(self, destination):
-        if not self.checkedForUpdate:
-            log.warning(
-                'Cannot download update if the update definition has not been downloaded')
-            return
-        if not self.updateurl:
-            log.warning(
-                'Cannot download update if there is no url to download from')
-            return
-
-        destination = os.path.join(destination, self.identity, self.quality)
-        if not os.path.isdir(destination):
-            os.makedirs(destination)
-        suffix = pathlib.Path(self.updateurl).suffix
-        if '.gz' in suffix:
-            suffix = ''.join(pathlib.Path(self.updateurl).suffixes)
-        destfile = os.path.join(destination, f'vscode-{self.name}{suffix}')
-
-        if os.path.exists(destfile) and vsc.Utility.hash_file_and_check(destfile, self.sha256hash):
-            log.debug(f'Previously downloaded {self}')
-        else:
-            log.info(f'Downloading {self} to {destfile}')
-            result = requests.get(
-                self.updateurl, allow_redirects=True, timeout=vsc.TIMEOUT)
-            open(destfile, 'wb').write(result.content)
-
-        if not vsc.Utility.hash_file_and_check(destfile, self.sha256hash):
-            log.warning(
-                f'HASH MISMATCH for {self} at {destfile} expected {self.sha256hash}. Removing local file.')
-            os.remove(destfile)
-            return False
-        log.debug(f'Hash ok for {self} with {self.sha256hash}')
-        return True
-
-    def save_state(self, destination):
-        destination = os.path.join(destination, self.identity)
-        if not os.path.isdir(destination):
-            os.makedirs(destination)
-        # Write version details blob as latest
-        vsc.Utility.write_json(os.path.join(
-            destination, self.quality, 'latest.json'), self)
-        # Write version details blob as the commit id
-        if self.version:
-            vsc.Utility.write_json(os.path.join(
-                destination, self.quality, f'{self.version}.json'), self)
-
-    def __repr__(self):
-        strs = f"<{self.__class__.__name__}> {self.quality}/{self.identity}"
-        if self.updateurl:
-            strs += f" - Version: {self.name} ({self.version})"
-        elif self.checkedForUpdate:
-            strs += f" - Latest version not available"
-        return strs
-
-
-@dataclass
-class File:
-    assetType: str
-    source: str
-
-    @staticmethod
-    def from_dict(obj) -> 'File':
-        _assetType = str(obj.get("assetType"))
-        _source = str(obj.get("source"))
-        return File(_assetType, _source)
-
-
-@dataclass
-class Property:
-    key: str
-    value: str
-
-    @staticmethod
-    def from_dict(obj) -> 'Property':
-        _key = str(obj.get("key"))
-        _value = str(obj.get("value"))
-        return Property(_key, _value)
-
-
-@dataclass
-class VSCExtensionVersionDefinition:
-
-    version: str
-    flags: str
-    lastUpdated: str
-    files: List[File]
-    properties: List[Property]
-    assetUri: str
-    fallbackAssetUri: str
-
-    @staticmethod
-    def from_dict(obj) -> 'VSCExtensionVersionDefinition':
-        _version = str(obj.get("version"))
-        _flags = str(obj.get("flags"))
-        _lastUpdated = str(obj.get("lastUpdated"))
-        _files = [File.from_dict(y) for y in obj.get("files")]
-        _properties = [Property.from_dict(y) for y in obj.get("properties")] if obj.get(
-            "properties") else []  # older versions do not have properties so we need to set to empty array
-        _assetUri = str(obj.get("assetUri"))
-        _fallbackAssetUri = str(obj.get("fallbackAssetUri"))
-        return VSCExtensionVersionDefinition(_version, _flags, _lastUpdated, _files, _properties, _assetUri, _fallbackAssetUri)
-
-    def isprerelease(self):
-        prerelease = False
-        for property in self.properties:
-            # if property["key"] == "Microsoft.VisualStudio.Code.PreRelease" and property["value"] == "true":
-            if property.key == "Microsoft.VisualStudio.Code.PreRelease" and property.value == "true":
-                prerelease = True
-        return prerelease
-
-    def __repr__(self):
-        strs = f"<{self.__class__.__name__}> {self.version} ({self.lastUpdate}) - Version: {self.version}"
-        return strs
-
-
-class VSCExtensionDefinition(object):
-
-    def __init__(self, identity, raw=None):
-        self.identity = identity
-        self.extensionId = None
-        self.recommended = False
-        self.versions: List[VSCExtensionVersionDefinition] = []
-        if raw:
-            self.__dict__.update(raw)
-            if 'extensionId' in raw:
-                self.extensionId = raw['extensionId']
-
-    def download_assets(self, destination, session):
-        for version in self.versions:
-            targetplatform = ''
-            if "targetPlatform" in version:
-                targetplatform = version["targetPlatform"]
-            ver_destination = os.path.join(
-                destination, self.identity, version["version"], targetplatform)
-            for file in version["files"]:
-                url = file["source"]
-                if not url:
-                    log.warning(
-                        'download_asset() cannot download update as asset url is missing')
-                    return
-                asset = file["assetType"]
-                destfile = os.path.join(ver_destination, f'{asset}')
-                create_tree(os.path.abspath(os.sep), (destfile,))
-                if not os.path.exists(destfile):
-                    for i in range(5):
-                        try:
-                            if i == 0:
-                                log.debug(f'Downloading {self.identity} {asset} to {destfile}')
-                            else:
-                                log.info(f'Retrying {i+1}, download {self.identity} {asset} to {destfile}')
-                            result = session.get(
-                                url, allow_redirects=True, timeout=vsc.TIMEOUT)
-                            with open(destfile, 'wb') as dest:
-                                dest.write(result.content)
-                            break
-                        except requests.exceptions.ProxyError:
-                            log.info("ProxyError: Retrying.")
-                        except requests.exceptions.ReadTimeout:
-                            log.info("ReadTimeout: Retrying.")
-
-    def process_embedded_extensions(self, destination, mp):
-        """
-        Check an extension's Manifest for an extension pack (e.g. more extensions to download)
-        """
-        bonusextensions = []
-        for version in self.versions:
-            targetplatform = ''
-            if "targetPlatform" in version:
-                targetplatform = version["targetPlatform"]
-            manifestpath = os.path.join(
-                destination, self.identity, version["version"], targetplatform, 'Microsoft.VisualStudio.Code.Manifest')
-            manifest = vsc.Utility.load_json(manifestpath)
-            if manifest and 'extensionPack' in manifest:
-                for extname in manifest['extensionPack']:
-                    bonusextension = mp.search_by_extension_name(extname)
-                    if bonusextension:
-                        bonusextensions.append(bonusextension)
-        return bonusextensions
-
-    def save_state(self, destination):
-        destination = os.path.join(destination, self.identity)
-        if not os.path.isdir(destination):
-            os.makedirs(destination)
-        # Save as latest
-        with open(os.path.join(destination, 'latest.json'), 'w') as outfile:
-            json.dump(self, outfile, cls=vsc.MagicJsonEncoder, indent=4)
-        # Save in the version folder
-        for version in self.versions:
-            with open(os.path.join(destination, version["version"], 'extension.json'), 'w') as outfile:
-                json.dump(self, outfile, cls=vsc.MagicJsonEncoder, indent=4)
-
-    def isprerelease(self):
-        prerelease = False
-        if "properties" in self.versions[0].keys():
-            for property in self.versions[0]["properties"]:
-                if property["key"] == "Microsoft.VisualStudio.Code.PreRelease" and property["value"] == "true":
-                    prerelease = True
-        return prerelease
-
-    def get_latest_release_versions(self):
-        if self.versions and len(self.versions) > 1:
-            releaseVersions = list(filter(lambda x: VSCExtensionVersionDefinition.from_dict(
-                x).isprerelease() == False, self.versions))
-            if(len(releaseVersions) > 0):
-                releaseVersions.sort(
-                    reverse=True, key=lambda x: x["lastUpdated"])
-                latestversion = releaseVersions[0]["version"]
-
-                filteredversions = []
-                for version in releaseVersions:
-                    if version["version"] == latestversion:
-                        filteredversions.append(version)
-
-                return filteredversions
-        return self.versions
-
-    def version(self):
-        if self.versions and len(self.versions) > 1:
-            return ";".join(list(map(lambda x: x['version'], self.versions)))
-        return self.versions[0]['version']
-
-    def set_recommended(self):
-        self.recommended = True
-
-    def __repr__(self):
-        strs = f"<{self.__class__.__name__}> {self.identity} ({self.extensionId}) - Version: {self.version()}"
-        return strs
-
-
-class VSCUpdates(object):
-
-    @staticmethod
-    def latest_versions(insider=False):
-        versions = {}
-        for platform in vsc.PLATFORMS:
-            for architecture in vsc.ARCHITECTURES:
-                for buildtype in vsc.BUILDTYPES:
-                    for quality in vsc.QUALITIES:
-                        if quality == 'insider' and not insider:
-                            continue
-                        if platform == 'win32' and architecture == 'ia32':
-                            continue
-                        if platform == 'darwin' and (architecture != '' or buildtype != ''):
-                            continue
-                        if 'linux' in platform and (architecture == '' or buildtype != ''):
-                            continue
-                        ver = VSCUpdateDefinition(
-                            platform, architecture, buildtype, quality)
-                        ver.check_for_update()
-                        log.info(ver)
-                        versions[f'{ver.identity}-{ver.quality}'] = ver
-        return versions
-
-    @staticmethod
-    def signal_updated(artifactdir):
-        signalpath = os.path.join(artifactdir, 'updated.json')
-        result = {
-            'updated': datetime.datetime.utcnow()
-        }
-        with open(signalpath, 'w') as outfile:
-            json.dump(result, outfile, cls=vsc.MagicJsonEncoder, indent=4)
-
-
-class VSCMarketplace(object):
-
-    def __init__(self, insider, prerelease, version, session):
-        self.insider = insider
-        self.prerelease = prerelease
-        self.version = version
-        self.session = session
-        self.backoff = 1
-
-    def get_recommendations(self, destination, totalrecommended):
-        recommendations = self.search_top_n(totalrecommended)
-        recommended_old = self.get_recommendations_old(destination)
-
-        for extension in recommendations:
-            # If the extension has already been found then prevent it from being collected again when processing the old recommendation list
-            if extension.identity in recommended_old.keys():
-                del recommended_old[extension.identity]
-
-        for packagename in recommended_old:
-            extension = self.search_by_extension_name(packagename)
-            if extension:
-                recommendations.append(extension)
-            else:
-                log.debug(
-                    f'get_recommendations failed finding a recommended extension by name for {packagename}. This extension has likely been removed.')
-
-        prereleasecount = 0
-        for recommendation in recommendations:
-            recommendation.set_recommended()
-            # If the found extension is a prerelease version search for the next available release version
-            if not self.prerelease and recommendation.isprerelease():
-                prereleasecount += 1
-                extension = self.search_release_by_extension_id(
-                    recommendation.extensionId)
-                if extension:
-                    recommendation.versions = extension.get_latest_release_versions()
-        return recommendations
-
-    def get_recommendations_old(self, destination):
-        result = self.session.get(vsc.URL_RECOMMENDATIONS,
-                                  allow_redirects=True, timeout=vsc.TIMEOUT)
-        if result.status_code != 200:
-            log.warning(
-                f"get_recommendations failed accessing url {vsc.URL_RECOMMENDATIONS}, unhandled status code {result.status_code}")
-            return False
-
-        jresult = result.json()
-        with open(os.path.join(destination, 'recommendations.json'), 'w') as outfile:
-            json.dump(jresult, outfile, cls=vsc.MagicJsonEncoder, indent=4)
-
-        # To dict to remove duplicates
-        packages = {}
-        for recommendation in jresult['workspaceRecommendations']:
-            for package in recommendation['recommendations']:
-                packages[package] = None
-
-        return packages
-
-    def get_malicious(self, destination, extensions=None):
-        result = self.session.get(
-            vsc.URL_MALICIOUS, allow_redirects=True, timeout=vsc.TIMEOUT)
-        if result.status_code != 200:
-            log.warning(
-                f"get_malicious failed accessing url {vsc.URL_MALICIOUS}, unhandled status code {result.status_code}")
-            return False
-        # Remove random utf-8 nbsp from server response
-        stripped = result.content.decode(
-            'utf-8', 'ignore').replace(u'\xa0', u'')
-        jresult = json.loads(stripped)
-        with open(os.path.join(destination, 'malicious.json'), 'w') as outfile:
-            json.dump(jresult, outfile, cls=vsc.MagicJsonEncoder, indent=4)
-
-        if not extensions:
-            return
-
-        for malicious in jresult['malicious']:
-            log.debug(f'Malicious extension {malicious}')
-            if malicious in extensions.keys():
-                log.warning(
-                    f'Preventing malicious extension {malicious} from being downloaded')
-                del extensions[malicious]
-
-    def get_specified(self, specifiedpath):
-        if not os.path.exists(specifiedpath):
-            result = {
-                'extensions': []
-            }
-            with open(specifiedpath, 'w') as outfile:
-                json.dump(result, outfile, cls=vsc.MagicJsonEncoder, indent=4)
-            log.info(
-                f'Created empty list of custom extensions to mirror at {specifiedpath}')
-            return
-        else:
-            with open(specifiedpath, 'r') as fp:
-                specifiedextensions = json.load(fp)
-            if specifiedextensions and 'extensions' in specifiedextensions:
-                specified = []
-                for packagename in specifiedextensions['extensions']:
-                    extension = self.search_by_extension_name(packagename)
-                    if extension:
-                        log.info(f'Adding extension to mirror {packagename}')
-                        specified.append(extension)
-                    else:
-                        log.debug(
-                            f'get_custom failed finding a recommended extension by name for {packagename}. This extension has likely been removed.')
-                return specified
-
-    def search_by_text(self, searchtext):
-        if searchtext == '*':
-            searchtext = ''
-
-        return self._query_marketplace(vsc.FilterType.SearchText, searchtext)
-
-    def search_top_n(self, n=200):
-        log.info(f'Searching for top {n} recommended extensions')
-        return self._query_marketplace(vsc.FilterType.SearchText, '', limit=n, sortOrder=vsc.SortOrder.Descending, sortBy=vsc.SortBy.InstallCount)
-
-    def search_by_extension_id(self, extensionid):
-        result = self._query_marketplace(
-            vsc.FilterType.ExtensionId, extensionid)
-        if result and len(result) == 1:
-            return result[0]
-        else:
-            log.warning(f"search_by_extension_id failed {extensionid}")
-            return False
-
-    def search_by_extension_name(self, extensionname):
-        if self.prerelease:
-            result = self._query_marketplace(
-                vsc.FilterType.ExtensionName, extensionname)
-        else:
-            releaseQueryFlags = vsc.QueryFlags.IncludeFiles | vsc.QueryFlags.IncludeVersionProperties | vsc.QueryFlags.IncludeAssetUri | \
-                vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeVersions
-            result = self._query_marketplace(
-                vsc.FilterType.ExtensionName, extensionname, queryFlags=releaseQueryFlags)
-            if result and len(result) == 1:
-                result[0].versions = result[0].get_latest_release_versions()
-
-        if result and len(result) == 1:
-            return result[0]
-        else:
-            #log.debug(f"search_by_extension_name failed {extensionname} got {result}")
-            return False
-
-    def search_release_by_extension_id(self, extensionid):
-        log.debug(
-            f'Searching for release candidate by extensionId: {extensionid}')
-        releaseQueryFlags = vsc.QueryFlags.IncludeFiles | vsc.QueryFlags.IncludeVersionProperties | vsc.QueryFlags.IncludeAssetUri | \
-            vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeVersions
-        result = self._query_marketplace(
-            vsc.FilterType.ExtensionId, extensionid, queryFlags=releaseQueryFlags)
-        if result and len(result) == 1:
-            return result[0]
-        else:
-            log.warning(f"search_release_by_extension_id failed {extensionid}")
-            return False
-
-    def backoff_reset(self):
-        self.backoff = 1
-
-    def backoff_sleep(self):
-        time.sleep(self.backoff)
-        self.backoff *= 2
-
-    def _query_marketplace(self, filtertype, filtervalue, pageNumber=0, pageSize=500, limit=0, sortOrder=vsc.SortOrder.Default, sortBy=vsc.SortBy.NoneOrRelevance, queryFlags=0):
-        extensions = {}
-        total = 0
-        count = 0
-
-        if 0 < limit < pageSize:
-            pageSize = limit
-
-        while count <= total:
-            # log.debug(f'Query marketplace count {count} / total {total} - pagenumber {pageNumber}, pagesize {pageSize}')
-            pageNumber = pageNumber + 1
-            query = self._query(filtertype, filtervalue,
-                                pageNumber, pageSize, queryFlags)
-            result = None
-            i = 0
-            while i < 10:
-                try:
-                    result = self.session.post(vsc.URL_MARKETPLACEQUERY, headers=self._headers(
-                    ), json=query, allow_redirects=True, timeout=vsc.TIMEOUT)
-                    if result:
-                        self.backoff_reset()
-                        break
-                    elif result.status_code == 429:
-                        # Server is rate limiting us. Backoff.
-                        self.backoff_sleep()
-                        continue
-                except requests.exceptions.ProxyError:
-                    log.info("ProxyError: Retrying.")
-                except requests.exceptions.ReadTimeout:
-                    log.info("ReadTimeout: Retrying.")
-                i += 1
-                if i < 10:
-                    log.info("Retrying pull page %d attempt %d." %
-                             (pageNumber, i+1))
-            if not result:
-                log.info("Failed 10 attempts to query marketplace. Giving up.")
-                break
-            jresult = result.json()
-            count = count + pageSize
-            if 'results' in jresult:
-                for jres in jresult['results']:
-                    for extension in jres['extensions']:
-                        identity = extension['publisher']['publisherName'] + \
-                            '.' + extension['extensionName']
-                        mpd = VSCExtensionDefinition(
-                            identity=identity, raw=extension)
-                        extensions[identity] = mpd
-
-                    if 'resultMetadata' in jres:
-                        for resmd in jres['resultMetadata']:
-                            if 'ResultCount' in resmd['metadataType']:
-                                total = resmd['metadataItems'][0]['count']
-            if limit > 0 and count >= limit:
-                break
-
-        return list(extensions.values())
-
-    def _query(self, filtertype, filtervalue, pageNumber, pageSize, queryFlags=0):
-        if queryFlags == 0:
-            queryFlags = self._query_flags()
-        payload = {
-            'assetTypes': [],
-            'filters': [self._query_filter(filtertype, filtervalue, pageNumber, pageSize)],
-            'flags': int(queryFlags)
-        }
-        return payload
-
-    def _query_filter(self, filtertype, filtervalue, pageNumber, pageSize):
-        result = {
-            'pageNumber': pageNumber,
-            'pageSize': pageSize,
-            'sortBy': vsc.SortBy.NoneOrRelevance,
-            'sortOrder': vsc.SortOrder.Default,
-            'criteria': [
-                self._query_filter_criteria(
-                    vsc.FilterType.Target, 'Microsoft.VisualStudio.Code'),
-                self._query_filter_criteria(
-                    vsc.FilterType.ExcludeWithFlags, str(int(vsc.QueryFlags.Unpublished)))
-            ]
-        }
-
-        if filtervalue != '':
-            result['criteria'].append(
-                self._query_filter_criteria(filtertype, filtervalue)
-            )
-
-        return result
-
-    def _query_filter_criteria(self, filtertype, queryvalue):
-        return {
-            'filterType': int(filtertype),
-            'value': queryvalue
-        }
-
-    def _query_flags(self):
-        # return QueryFlags(914)
-        return vsc.QueryFlags.IncludeFiles | vsc.QueryFlags.IncludeVersionProperties | vsc.QueryFlags.IncludeAssetUri | \
-            vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeLatestVersionOnly
-
-    def _headers(self):
-        if self.insider:
-            insider = '-insider'
-        else:
-            insider = ''
-        return {
-            'content-type': 'application/json',
-            'accept': 'application/json;api-version=3.0-preview.1',
-            'accept-encoding': 'gzip, deflate, br',
-            'User-Agent': f'VSCode {self.version}{insider}',
-            'x-market-client-Id': f'VSCode {self.version}{insider}',
-            'x-market-user-Id': str(uuid.uuid4())
-        }
-
-    def __repr__(self):
-        strs = f"<{self.__class__.__name__}>"
-        return strs
-
-
-if __name__ == '__main__':
-    parser = argparse.ArgumentParser(
-        description='Synchronises VSCode in an Offline Environment')
-    parser.add_argument('--sync', dest='sync', action='store_true',
-                        help='The basic-user sync. It includes stable binaries and typical extensions')
-    parser.add_argument('--syncall', dest='syncall', action='store_true',
-                        help='The power-user sync. It includes all binaries and extensions ')
-    parser.add_argument('--artifacts', dest='artifactdir',
-                        default='../artifacts/', help='Path to downloaded artifacts')
-    parser.add_argument('--frequency', dest='frequency', default=None,
-                        help='The frequency to try and update (e.g. sleep for \'12h\' and try again')
-
-    # Arguments to tweak behaviour
-    parser.add_argument('--check-binaries', dest='checkbinaries',
-                        action='store_true', help='Check for updated binaries')
-    parser.add_argument('--check-insider', dest='checkinsider',
-                        action='store_true', help='Check for updated insider binaries')
-    parser.add_argument('--check-recommended-extensions', dest='checkextensions',
-                        action='store_true', help='Check for recommended extensions')
-    parser.add_argument('--check-specified-extensions', dest='checkspecified',
-                        action='store_true', help='Check for extensions in /specified.json')
-    parser.add_argument('--extension-name', dest='extensionname',
-                        help='Find a specific extension by name')
-    parser.add_argument('--extension-search', dest='extensionsearch',
-                        help='Search for a set of extensions')
-    parser.add_argument('--prerelease-extensions', dest='prerelease',
-                        action='store_true', help='Download prerelease extensions. Defaults to false.')
-    parser.add_argument('--update-binaries', dest='updatebinaries',
-                        action='store_true', help='Download binaries')
-    parser.add_argument('--update-extensions', dest='updateextensions',
-                        action='store_true', help='Download extensions')
-    parser.add_argument('--update-malicious-extensions', dest='updatemalicious',
-                        action='store_true', help='Update the malicious extension list')
-    parser.add_argument('--skip-binaries', dest='skipbinaries',
-                        action='store_true', help='Skip downloading binaries')
-    parser.add_argument('--vscode-version', dest='version',
-                        default='1.69.2', help='VSCode version to search extensions as.')
-    parser.add_argument('--total-recommended', type=int, dest='totalrecommended', default=500,
-                        help='Total number of recommended extensions to sync from Search API. Defaults to 500')
-    parser.add_argument('--debug', dest='debug',
-                        action='store_true', help='Show debug output')
-    parser.add_argument('--logfile', dest='logfile', default=None,
-                        help='Sets a logfile to store loggging output')
-    config = parser.parse_args()
-
-    if config.debug:
-        loglevel = logging.DEBUG
-    else:
-        loglevel = logging.INFO
-
-    if config.logfile:
-        log_dir = os.path.dirname(os.path.abspath(config.logfile))
-        if not os.path.exists(log_dir):
-            raise FileNotFoundError(
-                f'Log directory does not exist at {log_dir}')
-        logging.basicConfig(filename=config.logfile, encoding='utf-8', level=loglevel)
-    else:
-        log.basicConfig(
-            format='[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
-            datefmt='%y%m%d %H:%M:%S',
-            level=loglevel
-        )
-
-    config.artifactdir_installers = os.path.join(
-        os.path.abspath(config.artifactdir), 'installers')
-    config.artifactdir_extensions = os.path.join(
-        os.path.abspath(config.artifactdir), 'extensions')
-
-    if config.sync or config.syncall:
-        config.checkbinaries = True
-        config.checkextensions = True
-        config.updatebinaries = True
-        config.updateextensions = True
-        config.updatemalicious = True
-        config.checkspecified = True
-        if not config.frequency:
-            config.frequency = '12h'
-
-    if config.syncall:
-        config.extensionsearch = '*'
-        config.checkinsider = True
-
-    if config.artifactdir:
-        if not os.path.isdir(config.artifactdir):
-            raise FileNotFoundError(
-                f'Artifact directory does not exist at {config.artifactdir}')
-
-    if config.updatebinaries and not config.checkbinaries:
-        config.checkbinaries = True
-
-    if config.frequency:
-        config.frequency = timeparse(config.frequency)
-
-    session = requests.Session()
-    retries = Retry(total=5,
-                    backoff_factor=0.1,
-                    status_forcelist=[ 500, 502, 503, 504 ])
-    session.mount('https://', HTTPAdapter(max_retries=retries))
-
-    while True:
-        versions = []
-        extensions = {}
-        mp = VSCMarketplace(config.checkinsider,
-                            config.prerelease, config.version, session)
-
-        if config.checkbinaries and not config.skipbinaries:
-            log.info('Syncing VS Code Update Versions')
-            versions = VSCUpdates.latest_versions(config.checkinsider)
-
-        if config.updatebinaries and not config.skipbinaries:
-            log.info('Syncing VS Code Binaries')
-            for idkey in versions:
-                if versions[idkey].updateurl:
-                    result = versions[idkey].download_update(
-                        config.artifactdir_installers)
-
-                    # Only save the reference json if the download was successful
-                    if result:
-                        versions[idkey].save_state(
-                            config.artifactdir_installers)
-
-        if config.checkspecified:
-            log.info('Syncing VS Code Specified Extensions')
-            specifiedpath = os.path.join(os.path.abspath(
-                config.artifactdir), 'specified.json')
-            specified = mp.get_specified(specifiedpath)
-            if specified:
-                for item in specified:
-                    log.info(item)
-                    extensions[item.identity] = item
-
-        if config.extensionsearch:
-            log.info(
-                f'Searching for VS Code Extension: {config.extensionsearch}')
-            results = mp.search_by_text(config.extensionsearch)
-            log.info(f'Found {len(results)} extensions')
-            for item in results:
-                log.debug(item)
-                extensions[item.identity] = item
-
-        if config.extensionname:
-            log.info(
-                f'Checking Specific VS Code Extension: {config.extensionname}')
-            result = mp.search_by_extension_name(config.extensionname)
-            if result:
-                extensions[result.identity] = result
-
-        if config.checkextensions:
-            log.info('Syncing VS Code Recommended Extensions')
-            recommended = mp.get_recommendations(os.path.abspath(
-                config.artifactdir), config.totalrecommended)
-            for item in recommended:
-                extensions[item.identity] = item
-
-        if config.updatemalicious:
-            log.info('Syncing VS Code Malicious Extension List')
-            malicious = mp.get_malicious(
-                os.path.abspath(config.artifactdir), extensions)
-
-        if config.updateextensions:
-            log.info(
-                f'Checking and Downloading Updates for {len(extensions)} Extensions')
-            count = 0
-            bonus = []
-            for identity in extensions:
-                log.debug(f'Fetching extension: {identity}')
-                if count % 100 == 0:
-                    log.info(
-                        f'Progress {count}/{len(extensions)} ({count/len(extensions)*100:.1f}%)')
-                extensions[identity].download_assets(
-                    config.artifactdir_extensions, session)
-                bonus = extensions[identity].process_embedded_extensions(
-                    config.artifactdir_extensions, mp) + bonus
-                extensions[identity].save_state(config.artifactdir_extensions)
-                count = count + 1
-
-            for bonusextension in bonus:
-                log.debug(f'Processing Embedded Extension: {bonusextension}')
-                bonusextension.download_assets(config.artifactdir_extensions, session)
-                bonusextension.save_state(config.artifactdir_extensions)
-
-        # Check if we did anything
-        if config.checkbinaries or config.checkextensions or config.updatebinaries or config.updateextensions or config.updatemalicious or config.checkspecified or config.checkinsider:
-            log.info('Complete')
-            VSCUpdates.signal_updated(os.path.abspath(config.artifactdir))
-
-            # Check if we need to sleep
-            if config.frequency:
-                log.info(
-                    f'Going to sleep for {vsc.Utility.seconds_to_human_time(config.frequency)}')
-                time.sleep(config.frequency)
-            else:
-                break
-        else:
-            log.info('Nothing to do')
-            break
-
+from dataclasses import dataclass
+import glob
+import os
+import sys
+import re
+import argparse
+import requests
+import pathlib
+import uuid
+import logging
+import json
+import time
+import datetime
+from typing import List
+from platform import release
+import logging as log
+from pytimeparse.timeparse import timeparse
+import vsc
+from distutils.dir_util import create_tree
+from requests.adapters import HTTPAdapter, Retry
+from packaging.version import Version
+
+
+class VSCUpdateDefinition(object):
+
+    def __init__(self, platform=None, architecture=None, buildtype=None, quality=None,
+                 updateurl=None, name=None, version=None, productVersion=None,
+                 hashs=None, timestamp=None, sha256hash=None, supportsFastUpdate=None):
+
+        if not vsc.Utility.validate_platform(platform):
+            raise ValueError(f"Platform {platform} invalid or not implemented")
+
+        if not vsc.Utility.validate_architecture(architecture):
+            raise ValueError(
+                f"Architecture {architecture} invalid or not implemented")
+
+        if not vsc.Utility.validate_buildtype(buildtype):
+            raise ValueError(
+                f"Buildtype {buildtype} invalid or not implemented")
+
+        if not vsc.Utility.validate_quality(quality):
+            raise ValueError(f"Quality {quality} invalid or not implemented")
+
+        self.identity = platform
+
+        if architecture:
+            self.identity += f'-{architecture}'
+        if buildtype:
+            self.identity += f'-{buildtype}'
+
+        self.platform = platform
+        self.architecture = architecture
+        self.buildtype = buildtype
+        self.quality = quality
+        self.updateurl = updateurl
+        self.name = name
+        self.version = version
+        self.productVersion = productVersion
+        self.hash = hashs
+        self.timestamp = timestamp
+        self.sha256hash = sha256hash
+        self.supportsFastUpdate = supportsFastUpdate
+        self.checkedForUpdate = False
+
+    def check_for_update(self, old_commit_id=None):
+        if not old_commit_id:
+            # To trigger the API to delta
+            old_commit_id = '7c4205b5c6e52a53b81c69d2b2dc8a627abaa0ba'
+
+        url = vsc.URL_BINUPDATES + \
+            f"{self.identity}/{self.quality}/{old_commit_id}"
+
+        log.debug(f'Update url {url}')
+        result = requests.get(url, allow_redirects=True, timeout=vsc.TIMEOUT)
+        self.checkedForUpdate = True
+
+        if result.status_code == 204:
+            # No update available
+            return False
+        elif result.status_code != 200:
+            # Unhandled response from API
+            log.warning(
+                f"Update url failed {url}. Unhandled status code {result.status_code}")
+            return False
+
+        jresult = result.json()
+
+        self.updateurl = jresult['url']
+        self.name = jresult['name']
+        self.version = jresult['version']
+        self.productVersion = jresult['productVersion']
+        self.hash = jresult['hash']
+        self.timestamp = jresult['timestamp']
+        self.sha256hash = jresult['sha256hash']
+
+        if 'supportsFastUpdate' in jresult:
+            self.supportsFastUpdate = jresult['supportsFastUpdate']
+        if self.updateurl:
+            return True
+        else:
+            return False
+
+    def download_update(self, destination):
+        if not self.checkedForUpdate:
+            log.warning(
+                'Cannot download update if the update definition has not been downloaded')
+            return
+        if not self.updateurl:
+            log.warning(
+                'Cannot download update if there is no url to download from')
+            return
+
+        destination = os.path.join(destination, self.identity, self.quality)
+        if not os.path.isdir(destination):
+            os.makedirs(destination)
+        suffix = pathlib.Path(self.updateurl).suffix
+        if '.gz' in suffix:
+            suffix = ''.join(pathlib.Path(self.updateurl).suffixes)
+        destfile = os.path.join(destination, f'vscode-{self.name}{suffix}')
+
+        if os.path.exists(destfile) and vsc.Utility.hash_file_and_check(destfile, self.sha256hash):
+            log.debug(f'Previously downloaded {self}')
+        else:
+            log.info(f'Downloading {self} to {destfile}')
+            result = requests.get(
+                self.updateurl, allow_redirects=True, timeout=vsc.TIMEOUT)
+            open(destfile, 'wb').write(result.content)
+
+        if not vsc.Utility.hash_file_and_check(destfile, self.sha256hash):
+            log.warning(
+                f'HASH MISMATCH for {self} at {destfile} expected {self.sha256hash}. Removing local file.')
+            os.remove(destfile)
+            return False
+        log.debug(f'Hash ok for {self} with {self.sha256hash}')
+        return True
+
+    def save_state(self, destination):
+        destination = os.path.join(destination, self.identity)
+        if not os.path.isdir(destination):
+            os.makedirs(destination)
+        # Write version details blob as latest
+        vsc.Utility.write_json(os.path.join(
+            destination, self.quality, 'latest.json'), self)
+        # Write version details blob as the commit id
+        if self.version:
+            vsc.Utility.write_json(os.path.join(
+                destination, self.quality, f'{self.version}.json'), self)
+
+    def __repr__(self):
+        strs = f"<{self.__class__.__name__}> {self.quality}/{self.identity}"
+        if self.updateurl:
+            strs += f" - Version: {self.name} ({self.version})"
+        elif self.checkedForUpdate:
+            strs += f" - Latest version not available"
+        return strs
+
+
+@dataclass
+class File:
+    assetType: str
+    source: str
+
+    @staticmethod
+    def from_dict(obj) -> 'File':
+        _assetType = str(obj.get("assetType"))
+        _source = str(obj.get("source"))
+        return File(_assetType, _source)
+
+
+@dataclass
+class Property:
+    key: str
+    value: str
+
+    @staticmethod
+    def from_dict(obj) -> 'Property':
+        _key = str(obj.get("key"))
+        _value = str(obj.get("value"))
+        return Property(_key, _value)
+
+
+@dataclass
+class VSCExtensionVersionDefinition:
+
+    version: str
+    flags: str
+    lastUpdated: str
+    files: List[File]
+    properties: List[Property]
+    assetUri: str
+    fallbackAssetUri: str
+
+    @staticmethod
+    def from_dict(obj) -> 'VSCExtensionVersionDefinition':
+        _version = str(obj.get("version"))
+        _flags = str(obj.get("flags"))
+        _lastUpdated = str(obj.get("lastUpdated"))
+        _files = [File.from_dict(y) for y in obj.get("files")]
+        _properties = [Property.from_dict(y) for y in obj.get("properties")] if obj.get(
+            "properties") else []  # older versions do not have properties so we need to set to empty array
+        _assetUri = str(obj.get("assetUri"))
+        _fallbackAssetUri = str(obj.get("fallbackAssetUri"))
+        return VSCExtensionVersionDefinition(_version, _flags, _lastUpdated, _files, _properties, _assetUri, _fallbackAssetUri)
+
+    def isprerelease(self):
+        prerelease = False
+        for property in self.properties:
+            # if property["key"] == "Microsoft.VisualStudio.Code.PreRelease" and property["value"] == "true":
+            if property.key == "Microsoft.VisualStudio.Code.PreRelease" and property.value == "true":
+                prerelease = True
+        return prerelease
+
+    def __repr__(self):
+        strs = f"<{self.__class__.__name__}> {self.version} ({self.lastUpdated}) - Version: {self.version}"
+        return strs
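+
+# For reference, a single marketplace "version" entry, as consumed by
+# VSCExtensionVersionDefinition.from_dict() above, typically looks like the
+# following (shape inferred from the fields handled in this file; an optional
+# "targetPlatform" key also appears for platform-specific builds):
+#   {"version": "1.2.3", "flags": "validated", "lastUpdated": "2023-06-05T00:00:00Z",
+#    "files": [{"assetType": "Microsoft.VisualStudio.Code.Manifest", "source": "https://..."}],
+#    "properties": [{"key": "Microsoft.VisualStudio.Code.PreRelease", "value": "true"}],
+#    "assetUri": "https://...", "fallbackAssetUri": "https://..."}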
+
+
+class VSCExtensionDefinition(object):
+
+    def __init__(self, identity, raw=None):
+        self.identity = identity
+        self.extensionId = None
+        self.recommended = False
+        self.versions: List[VSCExtensionVersionDefinition] = []
+        if raw:
+            self.__dict__.update(raw)
+            if 'extensionId' in raw:
+                self.extensionId = raw['extensionId']
+
+    def download_assets(self, destination, session):
+        for version in self.versions:
+            targetplatform = ''
+            if "targetPlatform" in version:
+                targetplatform = version["targetPlatform"]
+            ver_destination = os.path.join(
+                destination, self.identity, version["version"], targetplatform)
+            for file in version["files"]:
+                url = file["source"]
+                if not url:
+                    log.warning(
+                        'download_asset() cannot download update as asset url is missing')
+                    return
+                asset = file["assetType"]
+                destfile = os.path.join(ver_destination, f'{asset}')
+                create_tree(os.path.abspath(os.sep), (destfile,))
+                if not os.path.exists(destfile):
+                    for i in range(5):
+                        try:
+                            if i == 0:
+                                log.debug(f'Downloading {self.identity} {asset} to {destfile}')
+                            else:
+                                log.info(f'Retrying {i+1}, download {self.identity} {asset} to {destfile}')
+                            result = session.get(
+                                url, allow_redirects=True, timeout=vsc.TIMEOUT)
+                            with open(destfile, 'wb') as dest:
+                                dest.write(result.content)
+                            break
+                        except requests.exceptions.ProxyError:
+                            log.info("ProxyError: Retrying.")
+                        except requests.exceptions.ReadTimeout:
+                            log.info("ReadTimeout: Retrying.")
+
+    def process_embedded_extensions(self, destination, mp):
+        """
+        Check an extension's Manifest for an extension pack (e.g. more extensions to download)
+        """
+        bonusextensions = []
+        for version in self.versions:
+            targetplatform = ''
+            if "targetPlatform" in version:
+                targetplatform = version["targetPlatform"]
+            manifestpath = os.path.join(
+                destination, self.identity, version["version"], targetplatform, 'Microsoft.VisualStudio.Code.Manifest')
+            manifest = vsc.Utility.load_json(manifestpath)
+            if manifest and 'extensionPack' in manifest:
+                for extname in manifest['extensionPack']:
+                    bonusextension = mp.search_by_extension_name(extname)
+                    if bonusextension:
+                        bonusextensions.append(bonusextension)
+        return bonusextensions
+
+    def save_state(self, destination):
+        destination = os.path.join(destination, self.identity)
+        if not os.path.isdir(destination):
+            os.makedirs(destination)
+        # Save as latest
+        with open(os.path.join(destination, 'latest.json'), 'w') as outfile:
+            json.dump(self, outfile, cls=vsc.MagicJsonEncoder, indent=4)
+        # Save in the version folder
+        for version in self.versions:
+            with open(os.path.join(destination, version["version"], 'extension.json'), 'w') as outfile:
+                json.dump(self, outfile, cls=vsc.MagicJsonEncoder, indent=4)
+
+    def isprerelease(self):
+        prerelease = False
+        if "properties" in self.versions[0].keys():
+            for property in self.versions[0]["properties"]:
+                if property["key"] == "Microsoft.VisualStudio.Code.PreRelease" and property["value"] == "true":
+                    prerelease = True
+        return prerelease
+
+    def get_latest_release_versions(self):
+        if self.versions and len(self.versions) > 1:
+            releaseVersions = list(filter(lambda x: VSCExtensionVersionDefinition.from_dict(
+                x).isprerelease() == False, self.versions))
+            if(len(releaseVersions) > 0):
+                releaseVersions.sort(
+                    reverse=True, key=lambda x: x["lastUpdated"])
+                latestversion = releaseVersions[0]["version"]
+
+                filteredversions = []
+                for version in releaseVersions:
+                    if version["version"] == latestversion:
+                        filteredversions.append(version)
+
+                return filteredversions
+        return self.versions
+
+    def version(self):
+        if self.versions and len(self.versions) > 1:
+            return ";".join(list(map(lambda x: x['version'], self.versions)))
+        return self.versions[0]['version']
+
+    def set_recommended(self):
+        self.recommended = True
+
+    def __repr__(self):
+        strs = f"<{self.__class__.__name__}> {self.identity} ({self.extensionId}) - Version: {self.version()}"
+        return strs
+
+
+class VSCExtension:
+
+    @staticmethod
+    def remove_old(artifactdir_extensions):
+        for path, directoryNames, fileNames in os.walk(artifactdir_extensions):
+            path = path.replace(artifactdir_extensions, '')
+            if path == '':
+                continue  # Skip root directory
+            if str(path).count(os.path.sep) > 1:
+                continue  # Skip any subdirectory
+            versions = sorted(directoryNames, key=lambda dir: Version(dir), reverse=True)
+            if len(versions) <= 1:
+                continue  # Only a single version exists (or none at all)
+            versions.pop(0)  # Keep the newest version
+            for version in versions:
+                # Delete all left over versions
+                log.debug(f'Remove version {version} of {os.path.basename(path)}')
+
+
+class VSCUpdates(object):
+
+    @staticmethod
+    def latest_versions(insider=False):
+        versions = {}
+        for platform in vsc.PLATFORMS:
+            for architecture in vsc.ARCHITECTURES:
+                for buildtype in vsc.BUILDTYPES:
+                    for quality in vsc.QUALITIES:
+                        if quality == 'insider' and not insider:
+                            continue
+                        if platform == 'win32-x64' and architecture == 'ia32':
+                            continue
+                        if platform == 'darwin' and (architecture != '' or buildtype != ''):
+                            continue
+                        if 'linux' in platform and (architecture == '' or buildtype != ''):
+                            continue
+                        ver = VSCUpdateDefinition(
+                            platform, architecture, buildtype, quality)
+                        ver.check_for_update()
+                        log.info(ver)
+                        versions[f'{ver.identity}-{ver.quality}'] = ver
+        return versions
+
+    @staticmethod
+    def latest_version(insider=False):
+        versions = VSCUpdates.latest_versions(insider)
+        latestVersion = Version('0.0.0')
+        for version in versions.items():
+            productVersion = version[1].productVersion
+            if not productVersion:
+                continue  # Skip entries where no product version is known
+            productVersion = Version(productVersion)
+            if productVersion > latestVersion:
+                latestVersion = productVersion
+        return str(latestVersion)
+
+    @staticmethod
+    def signal_updated(artifactdir):
+        signalpath = os.path.join(artifactdir, 'updated.json')
+        result = {
+            'updated': datetime.datetime.now(datetime.timezone.utc)
+        }
+        with open(signalpath, 'w') as outfile:
+            json.dump(result, outfile, cls=vsc.MagicJsonEncoder, indent=4)
+
+    @staticmethod
+    def remove_old(artifactdir_installers):
+        for path, directoryNames, fileNames in os.walk(artifactdir_installers):
+            path = path.replace(artifactdir_installers, '')
+            if path == '':
+                continue  # Skip root directory
+            if str(path).count(os.path.sep) == 1:
+                continue  # Skip version directory
+            if str(path).count(os.path.sep) > 2:
+                continue  # Skip any subdirectory
+            filtered = filter(lambda file: not file.endswith('.json'), fileNames)
+            versions = sorted(filtered, key=lambda file: Version(re.findall(r'\d+\.\d+\.\d+', file)[0]), reverse=True)
+            if not versions:
+                # Versions could not be determined
+                log.debug(f'Versions of {path.lstrip(os.path.sep)} could not be determined')
+                continue
+            versions.remove(versions[0])
+            if len(versions) == 0:
+                continue  # Only a single version exists
+            for version in versions:
+                # Delete all left over versions
+                log.debug(f'Remove version {version} of {path.lstrip(os.path.sep)}')
+
+
+class VSCMarketplace(object):
+
+    def __init__(self, insider, prerelease, version, session):
+        self.insider = insider
+        self.prerelease = prerelease
+        self.version = version
+        self.session = session
+        self.backoff = 1
+
+    def get_recommendations(self, destination, totalrecommended):
+        recommendations = self.search_top_n(totalrecommended)
+        recommended_old = self.get_recommendations_old(destination)
+
+        for extension in recommendations:
+            # If the extension has already been found then prevent it from being collected again when processing the old recommendation list
+            if extension.identity in recommended_old.keys():
+                del recommended_old[extension.identity]
+
+        for packagename in recommended_old:
+            extension = self.search_by_extension_name(packagename)
+            if extension:
+                recommendations.append(extension)
+            else:
+                log.debug(
+                    f'get_recommendations failed finding a recommended extension by name for {packagename}. This extension has likely been removed.')
+
+        prereleasecount = 0
+        for recommendation in recommendations:
+            recommendation.set_recommended()
+            # If the found extension is a prerelease version search for the next available release version
+            if not self.prerelease and recommendation.isprerelease():
+                prereleasecount += 1
+                extension = self.search_release_by_extension_id(
+                    recommendation.extensionId)
+                if extension:
+                    recommendation.versions = extension.get_latest_release_versions()
+        return recommendations
+
+    def get_recommendations_old(self, destination):
+        result = self.session.get(vsc.URL_RECOMMENDATIONS,
+                                  allow_redirects=True, timeout=vsc.TIMEOUT)
+        if result.status_code != 200:
+            log.warning(
+                f"get_recommendations failed accessing url {vsc.URL_RECOMMENDATIONS}, unhandled status code {result.status_code}")
+            return {}  # Return an empty dictionary instead of False
+
+        jresult = result.json()
+        with open(os.path.join(destination, 'recommendations.json'), 'w') as outfile:
+            json.dump(jresult, outfile, cls=vsc.MagicJsonEncoder, indent=4)
+
+        # To dict to remove duplicates
+        packages = {}
+        for recommendation in jresult['workspaceRecommendations']:
+            for package in recommendation['recommendations']:
+                packages[package] = None
+
+        return packages
+
+    def get_malicious(self, destination, extensions=None):
+        if not extensions:
+            return
+
+        # Query Microsoft's list
+        result = self.session.get(
+            vsc.URL_MALICIOUS, allow_redirects=True, timeout=vsc.TIMEOUT)
+        if result.status_code != 200:
+            log.warning(
+                f"get_malicious failed accessing url {vsc.URL_MALICIOUS}, unhandled status code {result.status_code}")
+            return False
+        # Remove random utf-8 nbsp from server response
+        stripped = result.content.decode(
+            'utf-8', 'ignore').replace(u'\xa0', u'')
+        jresult = json.loads(stripped)
+
+        # Output to malicious.json (used by VS Code)
+        with open(os.path.join(destination, 'malicious.json'), 'w') as outfile:
+            json.dump(jresult, outfile, cls=vsc.MagicJsonEncoder, indent=4)
+
+        # Remove malicious extensions from collection
+        for extension in (extensions.copy()):
+            if extension in jresult['malicious']:
+                log.warning(
+                    f'Preventing malicious extension {extension} from being downloaded')
+                del extensions[extension]
+
+    def get_specified(self, specifiedpath):
+        if not os.path.exists(specifiedpath):
+            result = {
+                'extensions': []
+            }
+            with open(specifiedpath, 'w') as outfile:
+                json.dump(result, outfile, cls=vsc.MagicJsonEncoder, indent=4)
+            log.info(
+                f'Created empty list of custom extensions to mirror at {specifiedpath}')
+            return
+        else:
+            with open(specifiedpath, 'r') as fp:
+                specifiedextensions = json.load(fp)
+            if specifiedextensions and 'extensions' in specifiedextensions:
+                specified = []
+                for packagename in specifiedextensions['extensions']:
+                    extension = self.search_by_extension_name(packagename)
+                    if extension:
+                        log.info(f'Adding extension to mirror {packagename}')
+                        specified.append(extension)
+                    else:
+                        log.debug(
+                            f'get_custom failed finding a recommended extension by name for {packagename}. This extension has likely been removed.')
+                return specified
+
+    def search_by_text(self, searchtext):
+        if searchtext == '*':
+            searchtext = ''
+
+        return self._query_marketplace(vsc.FilterType.SearchText, searchtext)
+
+    def search_top_n(self, n=200):
+        log.info(f'Searching for top {n} recommended extensions')
+        return self._query_marketplace(vsc.FilterType.SearchText, '', limit=n, sortOrder=vsc.SortOrder.Descending, sortBy=vsc.SortBy.InstallCount)
+
+    def search_by_extension_id(self, extensionid):
+        result = self._query_marketplace(
+            vsc.FilterType.ExtensionId, extensionid)
+        if result and len(result) == 1:
+            return result[0]
+        else:
+            log.warning(f"search_by_extension_id failed {extensionid}")
+            return False
+
+    def search_by_extension_name(self, extensionname):
+        if self.prerelease:
+            result = self._query_marketplace(
+                vsc.FilterType.ExtensionName, extensionname)
+        else:
+            releaseQueryFlags = vsc.QueryFlags.IncludeFiles | vsc.QueryFlags.IncludeVersionProperties | vsc.QueryFlags.IncludeAssetUri | \
+                vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeVersions
+            result = self._query_marketplace(
+                vsc.FilterType.ExtensionName, extensionname, queryFlags=releaseQueryFlags)
+            if result and len(result) == 1:
+                result[0].versions = result[0].get_latest_release_versions()
+
+        if result and len(result) == 1:
+            return result[0]
+        else:
+            #log.debug(f"search_by_extension_name failed {extensionname} got {result}")
+            return False
+
+    def search_release_by_extension_id(self, extensionid):
+        log.debug(
+            f'Searching for release candidate by extensionId: {extensionid}')
+        releaseQueryFlags = vsc.QueryFlags.IncludeFiles | vsc.QueryFlags.IncludeVersionProperties | vsc.QueryFlags.IncludeAssetUri | \
+            vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeVersions
+        result = self._query_marketplace(
+            vsc.FilterType.ExtensionId, extensionid, queryFlags=releaseQueryFlags)
+        if result and len(result) == 1:
+            return result[0]
+        else:
+            log.warning(f"search_release_by_extension_id failed {extensionid}")
+            return False
+
+    def backoff_reset(self):
+        self.backoff = 1
+
+    def backoff_sleep(self):
+        time.sleep(self.backoff)
+        self.backoff *= 2
+
+    def get_existing(self, artifactdir_extensions):
+        extensions = []
+        for extension in glob.glob(os.path.join(artifactdir_extensions, '*', 'latest.json')):
+            manifest = vsc.Utility.load_json(extension)
+            result = self.search_by_extension_id(manifest['extensionId'])
+            if result:
+                extensions.append(result)
+        return extensions
+
+    def _query_marketplace(self, filtertype, filtervalue, pageNumber=0, pageSize=500, limit=0, sortOrder=vsc.SortOrder.Default, sortBy=vsc.SortBy.NoneOrRelevance, queryFlags=0):
+        extensions = {}
+        total = 0
+        count = 0
+
+        if 0 < limit < pageSize:
+            pageSize = limit
+
+        while count <= total:
+            # log.debug(f'Query marketplace count {count} / total {total} - pagenumber {pageNumber}, pagesize {pageSize}')
+            pageNumber = pageNumber + 1
+            query = self._query(filtertype, filtervalue,
+                                pageNumber, pageSize, queryFlags)
+            result = None
+            i = 0
+            while i < 10:
+                try:
+                    result = self.session.post(vsc.URL_MARKETPLACEQUERY, headers=self._headers(
+                    ), json=query, allow_redirects=True, timeout=vsc.TIMEOUT)
+                    if result:
+                        self.backoff_reset()
+                        break
+                    elif result.status_code == 429:
+                        # Server is rate limiting us. Backoff.
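+                        # Note: backoff_sleep() doubles self.backoff on every call,
+                        # so successive 429 responses wait 1s, 2s, 4s, ... while any
+                        # successful response resets the delay to 1s via backoff_reset().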
+
+    def get_existing(self, artifactdir_extensions):
+        extensions = []
+        for extension in glob.glob(os.path.join(artifactdir_extensions, '*', 'latest.json')):
+            manifest = vsc.Utility.load_json(extension)
+            result = self.search_by_extension_id(manifest['extensionId'])
+            if result:
+                extensions.append(result)
+        return extensions
+
+    def _query_marketplace(self, filtertype, filtervalue, pageNumber=0, pageSize=500, limit=0, sortOrder=vsc.SortOrder.Default, sortBy=vsc.SortBy.NoneOrRelevance, queryFlags=0):
+        extensions = {}
+        total = 0
+        count = 0
+
+        if 0 < limit < pageSize:
+            pageSize = limit
+
+        while count <= total:
+            # log.debug(f'Query marketplace count {count} / total {total} - pagenumber {pageNumber}, pagesize {pageSize}')
+            pageNumber = pageNumber + 1
+            query = self._query(filtertype, filtervalue,
+                                pageNumber, pageSize, sortOrder, sortBy, queryFlags)
+            result = None
+            i = 0
+            while i < 10:
+                try:
+                    result = self.session.post(vsc.URL_MARKETPLACEQUERY, headers=self._headers(
+                    ), json=query, allow_redirects=True, timeout=vsc.TIMEOUT)
+                    if result:
+                        self.backoff_reset()
+                        break
+                    elif result.status_code == 429:
+                        # Server is rate limiting us. Back off, and let the attempt
+                        # count so a persistently throttling server cannot stall us forever.
+                        self.backoff_sleep()
+                except requests.exceptions.ProxyError:
+                    log.info("ProxyError: Retrying.")
+                except requests.exceptions.ReadTimeout:
+                    log.info("ReadTimeout: Retrying.")
+                i += 1
+                if i < 10:
+                    log.info("Retrying pull page %d attempt %d." %
+                             (pageNumber, i+1))
+            if not result:
+                log.info("Failed 10 attempts to query marketplace. Giving up.")
+                break
+            jresult = result.json()
+            count = count + pageSize
+            if 'results' in jresult:
+                for jres in jresult['results']:
+                    for extension in jres['extensions']:
+                        identity = extension['publisher']['publisherName'] + \
+                            '.' + extension['extensionName']
+                        mpd = VSCExtensionDefinition(
+                            identity=identity, raw=extension)
+                        extensions[identity] = mpd
+
+                    if 'resultMetadata' in jres:
+                        for resmd in jres['resultMetadata']:
+                            if 'ResultCount' in resmd['metadataType']:
+                                total = resmd['metadataItems'][0]['count']
+            if limit > 0 and count >= limit:
+                break
+
+        return list(extensions.values())
+
+    def _query(self, filtertype, filtervalue, pageNumber, pageSize, sortOrder=vsc.SortOrder.Default, sortBy=vsc.SortBy.NoneOrRelevance, queryFlags=0):
+        if queryFlags == 0:
+            queryFlags = self._query_flags()
+        payload = {
+            'assetTypes': [],
+            'filters': [self._query_filter(filtertype, filtervalue, pageNumber, pageSize, sortOrder, sortBy)],
+            'flags': int(queryFlags)
+        }
+        return payload
+
+    def _query_filter(self, filtertype, filtervalue, pageNumber, pageSize, sortOrder=vsc.SortOrder.Default, sortBy=vsc.SortBy.NoneOrRelevance):
+        # Forward the caller's sort options rather than hardcoding the defaults,
+        # so e.g. search_top_n can actually sort by install count.
+        result = {
+            'pageNumber': pageNumber,
+            'pageSize': pageSize,
+            'sortBy': int(sortBy),
+            'sortOrder': int(sortOrder),
+            'criteria': [
+                self._query_filter_criteria(
+                    vsc.FilterType.Target, 'Microsoft.VisualStudio.Code'),
+                self._query_filter_criteria(
+                    vsc.FilterType.ExcludeWithFlags, str(int(vsc.QueryFlags.Unpublished)))
+            ]
+        }
+
+        if filtervalue != '':
+            result['criteria'].append(
+                self._query_filter_criteria(filtertype, filtervalue)
+            )
+
+        return result
+
+    def _query_filter_criteria(self, filtertype, queryvalue):
+        return {
+            'filterType': int(filtertype),
+            'value': queryvalue
+        }
+
+    def _query_flags(self):
+        # Equivalent to QueryFlags(914)
+        return vsc.QueryFlags.IncludeFiles | vsc.QueryFlags.IncludeVersionProperties | vsc.QueryFlags.IncludeAssetUri | \
+            vsc.QueryFlags.IncludeStatistics | vsc.QueryFlags.IncludeLatestVersionOnly
+
+    def _headers(self):
+        if self.insider:
+            insider = '-insider'
+        else:
+            insider = ''
+        return {
+            'content-type': 'application/json',
+            'accept': 'application/json;api-version=3.0-preview.1',
+            'accept-encoding': 'gzip, deflate, br',
+            'User-Agent': f'VSCode {self.version}{insider}',
+            'x-market-client-Id': f'VSCode {self.version}{insider}',
+            'x-market-user-Id': str(uuid.uuid4())
+        }
+
+    def __repr__(self):
+        return f"<{self.__class__.__name__}>"
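Taken together, `_query`, `_query_filter` and `_query_filter_criteria` assemble the JSON body posted to the marketplace. A sketch of what a single-name lookup produces on the first page with the default flags (`ms-python.python` is an illustrative value):

```
# Approximate body built by _query() for
# _query_marketplace(vsc.FilterType.ExtensionName, 'ms-python.python')
payload = {
    'assetTypes': [],
    'filters': [{
        'pageNumber': 1,
        'pageSize': 500,
        'sortBy': 0,        # SortBy.NoneOrRelevance
        'sortOrder': 0,     # SortOrder.Default
        'criteria': [
            {'filterType': 8, 'value': 'Microsoft.VisualStudio.Code'},  # FilterType.Target
            {'filterType': 12, 'value': '4096'},  # ExcludeWithFlags: QueryFlags.Unpublished
            {'filterType': 7, 'value': 'ms-python.python'},  # FilterType.ExtensionName
        ],
    }],
    'flags': 914,  # default _query_flags(); see the QueryFlags note further below
}
```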
+
+
+if __name__ == '__main__':
+    parser = argparse.ArgumentParser(
+        description='Synchronises VSCode in an Offline Environment')
+    parser.add_argument('--sync', '-s',
+                        dest='sync',
+                        action='store_true',
+                        help='The basic-user sync. It includes stable binaries and typical extensions'
+                        )
+    parser.add_argument('--syncall', '-a',
+                        dest='syncall',
+                        action='store_true',
+                        help='The power-user sync. It includes all binaries and extensions'
+                        )
+    parser.add_argument('--artifacts', '-d',
+                        dest='artifactdir',
+                        default='../artifacts/',
+                        help='Path to downloaded artifacts'
+                        )
+    parser.add_argument('--frequency', '-f',
+                        dest='frequency',
+                        default=None,
+                        help='The frequency to try and update (e.g. sleep for \'12h\' and try again)'
+                        )
+
+    # Arguments to tweak behaviour
+    # ToDo Implement action=argparse.BooleanOptionalAction to combine --check-binaries
+    # and --skip-binaries into a single argument (see the sketch after this argument block)
+    parser.add_argument('--check-binaries',
+                        dest='checkbinaries',
+                        action='store_true',
+                        help='Check for updated binaries'
+                        )
+    parser.add_argument('--check-insider', '-i',
+                        dest='checkinsider',
+                        action='store_true',
+                        help='Check for updated insider binaries'
+                        )
+    parser.add_argument('--check-recommended-extensions', '-r',
+                        dest='checkrecommended',
+                        action='store_true',
+                        help='Check for recommended extensions'
+                        )
+    parser.add_argument('--check-specified-extensions', '-w',
+                        dest='checkspecified',
+                        action='store_true',
+                        help='Check for extensions in /specified.json'
+                        )
+    # ToDo Allow for list of names (action='extend' nargs='+')
+    parser.add_argument('--extension-name', '-n',
+                        dest='extensionname',
+                        help='Find a specific extension by name'
+                        )
+    # ToDo Allow for list of names (action='extend' nargs='+')
+    parser.add_argument('--extension-search',
+                        dest='extensionsearch',
+                        help='Search for a set of extensions'
+                        )
+    parser.add_argument('--prerelease-extensions', '-p',
+                        dest='prerelease',
+                        action='store_true',
+                        help='Download prerelease extensions. Defaults to false.'
+                        )
+    parser.add_argument('--update-binaries', '-b',
+                        dest='updatebinaries',
+                        action='store_true',
+                        help='Download binaries'
+                        )
+    parser.add_argument('--update-extensions', '-u',
+                        dest='updateextensions',
+                        action='store_true',
+                        help='Download extensions'
+                        )
+    parser.add_argument('--update-malicious-extensions', '-m',
+                        dest='updatemalicious',
+                        action='store_true',
+                        help='Update the malicious extension list'
+                        )
+    parser.add_argument('--skip-binaries', '-B',
+                        dest='skipbinaries',
+                        action='store_true',
+                        help='Skip downloading binaries'
+                        )
+    parser.add_argument('--vscode-version', '-v',
+                        dest='version',
+                        default=VSCUpdates.latest_version(),
+                        help='VSCode version to search extensions as.'
+                        )
+    parser.add_argument('--total-recommended',
+                        type=int,
+                        dest='totalrecommended',
+                        default=500,
+                        help='Total number of recommended extensions to sync from Search API. Defaults to 500'
+                        )
+    parser.add_argument('--debug',
+                        dest='debug',
+                        action='store_true',
+                        help='Show debug output'
+                        )
+    parser.add_argument('--logfile', '-l',
+                        dest='logfile',
+                        default=None,
+                        help='Sets a logfile to store logging output'
+                        )
+    parser.add_argument('--include-existing', '-e',
+                        dest='existing',
+                        action='store_true',
+                        help='Include existing extensions in the update process'
+                        )
+    parser.add_argument('--skip-existing', '-E',
+                        dest='skipExisting',
+                        action='store_true',
+                        help='Skip inclusion of existing extensions in the update process'
+                        )
+    parser.add_argument('--skip-recommended', '-R',
+                        dest='skipRecommended',
+                        action='store_true',
+                        help='Skip inclusion of recommended extensions in the update process'
+                        )
+    parser.add_argument('--garbage-collection', '-g',
+                        dest='garbageCollection',
+                        action='store_true',
+                        help='Remove old versions of artifacts (binaries / extensions)'
+                        )
+    config = parser.parse_args()
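The ToDo above refers to `argparse.BooleanOptionalAction` (stdlib, Python 3.9+), which derives a paired `--flag/--no-flag` switch from a single declaration. A sketch of how `--check-binaries`/`--skip-binaries` could collapse into one argument:

```
# Sketch only: one declaration yields both --check-binaries and --no-check-binaries.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--check-binaries',
                    dest='checkbinaries',
                    action=argparse.BooleanOptionalAction,
                    default=False,
                    help='Check for updated binaries')
print(parser.parse_args(['--no-check-binaries']))  # Namespace(checkbinaries=False)
```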
+
+    if config.debug:
+        loglevel = logging.DEBUG
+    else:
+        loglevel = logging.INFO
+
+    if config.logfile:
+        log_dir = os.path.dirname(os.path.abspath(config.logfile))
+        if not os.path.exists(log_dir):
+            raise FileNotFoundError(
+                f'Log directory does not exist at {log_dir}')
+        logging.basicConfig(filename=config.logfile, encoding='utf-8', level=loglevel)
+    else:
+        logging.basicConfig(
+            format='[%(levelname)1.1s %(asctime)s %(module)s:%(lineno)d] %(message)s',
+            datefmt='%y%m%d %H:%M:%S',
+            level=loglevel
+        )
+
+    config.artifactdir_installers = os.path.join(
+        os.path.abspath(config.artifactdir), 'installers')
+    config.artifactdir_extensions = os.path.join(
+        os.path.abspath(config.artifactdir), 'extensions')
+
+    if config.sync or config.syncall:
+        config.checkbinaries = True
+        config.checkrecommended = True
+        config.updatebinaries = True
+        config.updateextensions = True
+        config.updatemalicious = True
+        config.checkspecified = True
+        if not config.frequency:
+            config.frequency = '12h'
+        config.existing = True
+
+    if config.syncall:
+        config.extensionsearch = '*'
+        config.checkinsider = True
+
+    if config.artifactdir:
+        if not os.path.isdir(config.artifactdir):
+            raise FileNotFoundError(
+                f'Artifact directory does not exist at {config.artifactdir}')
+
+    if config.updatebinaries and not config.checkbinaries:
+        config.checkbinaries = True
+
+    if config.existing:
+        config.updateextensions = True
+
+    if config.frequency:
+        config.frequency = timeparse(config.frequency)
+
+    session = requests.Session()
+    retries = Retry(total=5,
+                    backoff_factor=0.1,
+                    status_forcelist=[500, 502, 503, 504])
+    session.mount('https://', HTTPAdapter(max_retries=retries))
+
+    while True:
+        versions = []
+        extensions = {}
+        mp = VSCMarketplace(config.checkinsider,
+                            config.prerelease, config.version, session)
+
+        if config.checkbinaries and not config.skipbinaries:
+            log.info('Syncing VS Code Update Binaries')
+            versions = VSCUpdates.latest_versions(config.checkinsider)
+
+        if config.updatebinaries and not config.skipbinaries:
+            log.info('Syncing VS Code Binaries')
+            for idkey in versions:
+                if versions[idkey].updateurl:
+                    result = versions[idkey].download_update(
+                        config.artifactdir_installers)
+
+                    # Only save the reference json if the download was successful
+                    if result:
+                        versions[idkey].save_state(
+                            config.artifactdir_installers)
+
+        if config.garbageCollection:
+            log.info('Removing old VS Code Binaries')
+            VSCUpdates.remove_old(config.artifactdir_installers)
+            log.info('Removing old VS Code Extensions')
+            VSCExtension.remove_old(config.artifactdir_extensions)
+
+        if config.existing and not config.skipExisting:
+            log.info('Get existing extensions from artifact directory')
+            existing = mp.get_existing(config.artifactdir_extensions)
+            if existing:
+                for item in existing:
+                    extensions[item.identity] = item
+
+        if config.checkspecified:
+            log.info('Syncing VS Code Specified Extensions')
+            specifiedpath = os.path.join(os.path.abspath(
+                config.artifactdir), 'specified.json')
+            specified = mp.get_specified(specifiedpath)
+            if specified:
+                for item in specified:
+                    log.info(item)
+                    extensions[item.identity] = item
+
+        if config.extensionsearch:
+            log.info(
+                f'Searching for VS Code Extension: {config.extensionsearch}')
+            results = mp.search_by_text(config.extensionsearch)
+            log.info(f'Found {len(results)} extensions')
+            for item in results:
+                log.debug(item)
+                extensions[item.identity] = item
+
+        if config.extensionname:
+            log.info(
+                f'Checking Specific VS Code Extension: {config.extensionname}')
+            result = mp.search_by_extension_name(config.extensionname)
+            if result:
+                extensions[result.identity] = result
+
+        if config.checkrecommended and not config.skipRecommended:
+            log.info('Syncing VS Code Recommended Extensions')
+            recommended = mp.get_recommendations(os.path.abspath(
+                config.artifactdir), config.totalrecommended)
+            for item in recommended:
+                extensions[item.identity] = item
+
+        if config.updatemalicious:
+            log.info('Syncing VS Code Malicious Extension List')
+            malicious = mp.get_malicious(
+                os.path.abspath(config.artifactdir), extensions)
+
+        if config.updateextensions:
+            log.info(
+                f'Checking and Downloading Updates for {len(extensions)} Extensions')
+            count = 0
+            bonus = []
+            for identity in extensions:
+                log.debug(f'Fetching extension: {identity}')
+                if count % 100 == 0:
+                    log.info(
+                        f'Progress {count}/{len(extensions)} ({count/len(extensions)*100:.1f}%)')
+                extensions[identity].download_assets(
+                    config.artifactdir_extensions, session)
+                bonus = extensions[identity].process_embedded_extensions(
+                    config.artifactdir_extensions, mp) + bonus
+                extensions[identity].save_state(config.artifactdir_extensions)
+                count = count + 1
+
+            for bonusextension in bonus:
+                log.debug(f'Processing Embedded Extension: {bonusextension}')
+                bonusextension.download_assets(config.artifactdir_extensions, session)
+                bonusextension.save_state(config.artifactdir_extensions)
+
+        # Check if we did anything
+        if config.checkbinaries or config.checkrecommended or config.updatebinaries or config.updateextensions or config.updatemalicious or config.checkspecified or config.checkinsider:
+            log.info('Complete')
+            VSCUpdates.signal_updated(os.path.abspath(config.artifactdir))
+
+            # Check if we need to sleep
+            if config.frequency:
+                log.info(
+                    f'Going to sleep for {vsc.Utility.seconds_to_human_time(config.frequency)}')
+                time.sleep(config.frequency)
+            else:
+                break
+        else:
+            log.info('Nothing to do')
+            break
+
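For orientation, a couple of invocations composed from the arguments defined above (paths, cadence and the extension name are illustrative):

```
# Mirror stable binaries and typical extensions, re-checking every 12 hours
python sync.py --sync --artifacts ../artifacts/ --frequency 12h

# One-shot: mirror a single named extension without touching binaries
python sync.py --extension-name ms-python.python --update-extensions --skip-binaries
```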
diff --git a/vscoffline/vsc.py b/vscoffline/vsc.py
index 61cb8a6..65b97c1 100644
--- a/vscoffline/vsc.py
+++ b/vscoffline/vsc.py
@@ -1,194 +1,194 @@
-import datetime
-import hashlib
-import json
-import os
-import pathlib
-from enum import IntFlag
-from typing import Any, Dict, List, Union
-import logging as log
-
-PLATFORMS = ["win32", "linux", "linux-deb", "linux-rpm", "darwin", "linux-snap", "server-linux", "server-linux-legacy", "cli-alpine"]
-ARCHITECTURES = ["", "x64"]
-BUILDTYPES = ["", "archive", "user"]
-QUALITIES = ["stable", "insider"]
-
-URL_BINUPDATES = r"https://update.code.visualstudio.com/api/update/"
-URL_RECOMMENDATIONS = r"https://az764295.vo.msecnd.net/extensions/workspaceRecommendations.json.gz"
r"https://az764295.vo.msecnd.net/extensions/workspaceRecommendations.json.gz" -URL_MARKETPLACEQUERY = r"https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery" -URL_MALICIOUS = r"https://az764295.vo.msecnd.net/extensions/marketplace.json" - -URLROOT = "https://update.code.visualstudio.com" -ARTIFACTS = "/artifacts/" -ARTIFACTS_INSTALLERS = "/artifacts/installers" -ARTIFACTS_EXTENSIONS = "/artifacts/extensions" -ARTIFACT_RECOMMENDATION = "/artifacts/recommendations.json" -ARTIFACT_MALICIOUS = "/artifacts/malicious.json" - -TIMEOUT = 12 - - -class QueryFlags(IntFlag): - __no_flags_name__ = "NoneDefined" - NoneDefined = 0x0 - IncludeVersions = 0x1 - IncludeFiles = 0x2 - IncludeCategoryAndTags = 0x4 - IncludeSharedAccounts = 0x8 - IncludeVersionProperties = 0x10 - ExcludeNonValidated = 0x20 - IncludeInstallationTargets = 0x40 - IncludeAssetUri = 0x80 - IncludeStatistics = 0x100 - IncludeLatestVersionOnly = 0x200 - Unpublished = 0x1000 - - -class FilterType(IntFlag): - __no_flags_name__ = "Target" - Tag = 1 - ExtensionId = 4 - Category = 5 - ExtensionName = 7 - Target = 8 - Featured = 9 - SearchText = 10 - ExcludeWithFlags = 12 - UndefinedType = 14 - - -class SortBy(IntFlag): - __no_flags_name__ = "NoneOrRelevance" - NoneOrRelevance = 0 - LastUpdatedDate = 1 - Title = 2 - PublisherName = 3 - InstallCount = 4 - PublishedDate = 5 - AverageRating = 6 - WeightedRating = 12 - - -class SortOrder(IntFlag): - __no_flags_name__ = "Default" - Default = 0 - Ascending = 1 - Descending = 2 - - -class MagicJsonEncoder(json.JSONEncoder): - def default(self, o: Any) -> Union[str, Dict[str, Any]]: - try: - return super().default(o) - except TypeError as err: - # could be datetime - if isinstance(o, datetime.datetime): - return o.isoformat() - # could also be cls with slots - try: - return {key: getattr(o, key, None) for key in o.__slots__} - except AttributeError: - pass - # finally, should have a dict if it is a dataclass or another cls - try: - return o.__dict__ - except AttributeError: - raise TypeError( - "Can't encode object. Tried isoformat of datetime, class slots and class dict" - ) from err - - -class Utility: - """ - Utility tool - """ - - @staticmethod - def hash_file_and_check(filepath: Union[str, pathlib.Path], expectedchecksum: str) -> bool: - """ - Hashes a file and checks for the expected checksum. - Checksum is sha256 default implementation. - """ - h = hashlib.sha256() - with open(filepath, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - h.update(chunk) - return expectedchecksum == h.hexdigest() - - @staticmethod - def load_json(filepath: Union[str, pathlib.Path]) -> Union[List[Any], Dict[str, Any]]: - if isinstance(filepath, str): - filepath: pathlib.Path = pathlib.Path(filepath) - - result = [] - if not filepath.exists(): - log.debug(f"Unable to load json from {filepath.absolute()}. Does not exist.") - return result - elif filepath.is_dir(): - log.debug(f"Cannot load json at path {filepath.absolute()}. 
It is a directory") - return result - - with open(filepath, "r", encoding="utf-8-sig") as fp: - try: - result = json.load(fp) - if not result: - return [] - except json.decoder.JSONDecodeError as err: - log.debug(f"JSONDecodeError while processing {filepath.absolute()} \n error: {str(err)}") - return [] - except UnicodeDecodeError as err: - log.debug(f"UnicodeDecodeError while processing {filepath.absolute()} \n error: {str(err)}") - return [] - return result - - @staticmethod - def write_json(filepath: Union[str, pathlib.Path], content: Dict[str, Any]) -> None: - with open(filepath, "w") as outfile: - json.dump(content, outfile, cls=MagicJsonEncoder, indent=4) - - @staticmethod - def first_file(filepath: Union[str, pathlib.Path], pattern: str, reverse: bool = False) -> Union[str, bool]: - if isinstance(filepath, str): - filepath = pathlib.Path(filepath) - results = [*filepath.glob(pattern)] - if not results: - return False - elif len(results) >= 1 and reverse: - results.sort(reverse=True) - return str(results[0].absolute()) - - @staticmethod - def folders_in_folder(filepath: str) -> List[str]: - listing = [f for f in os.listdir(filepath) if os.path.isdir(os.path.join(filepath, f))] - listing.sort() - return listing - - @staticmethod - def files_in_folder(filepath: str) -> List[str]: - listing = [f for f in os.listdir(filepath) if os.path.isfile(os.path.join(filepath, f))] - listing.sort() - return listing - - - @staticmethod - def seconds_to_human_time(seconds: int) -> str: - return str(datetime.timedelta(seconds=seconds)) - - @staticmethod - def from_json_datetime(jsondate: str) -> datetime.datetime: - return datetime.datetime.strptime(jsondate, "%Y-%m-%dT%H:%M:%S.%fZ") - - @staticmethod - def validate_platform(platform: str) -> bool: - return platform in PLATFORMS - - @staticmethod - def validate_architecture(arch: str) -> bool: - return arch in ARCHITECTURES - - @staticmethod - def validate_buildtype(buildtype: str) -> bool: - return buildtype in BUILDTYPES - - @staticmethod - def validate_quality(quality: str) -> bool: - return quality in QUALITIES +import datetime +import hashlib +import json +import os +import pathlib +from enum import IntFlag +from typing import Any, Dict, List, Union +import logging as log + +PLATFORMS = ["win32-x64", "linux", "linux-deb", "linux-rpm", "darwin", "darwin-arm64", "darwin-universal", "linux-snap", "server-linux", "server-linux-legacy", "cli-alpine"] +ARCHITECTURES = ["", "x64", "arm64", "armhf", "alpine"] +BUILDTYPES = ["", "archive", "user", "web"] +QUALITIES = ["stable", "insider"] + +URL_BINUPDATES = r"https://update.code.visualstudio.com/api/update/" +URL_RECOMMENDATIONS = r"https://main.vscode-cdn.net/extensions/marketplace.json" +URL_MARKETPLACEQUERY = r"https://marketplace.visualstudio.com/_apis/public/gallery/extensionquery" +URL_MALICIOUS = r"https://main.vscode-cdn.net/extensions/marketplace.json" + +URLROOT = "https://update.code.visualstudio.com" +ARTIFACTS = "/artifacts/" +ARTIFACTS_INSTALLERS = "/artifacts/installers" +ARTIFACTS_EXTENSIONS = "/artifacts/extensions" +ARTIFACT_RECOMMENDATION = "/artifacts/recommendations.json" +ARTIFACT_MALICIOUS = "/artifacts/malicious.json" + +TIMEOUT = 12 + + +class QueryFlags(IntFlag): + __no_flags_name__ = "NoneDefined" + NoneDefined = 0x0 + IncludeVersions = 0x1 + IncludeFiles = 0x2 + IncludeCategoryAndTags = 0x4 + IncludeSharedAccounts = 0x8 + IncludeVersionProperties = 0x10 + ExcludeNonValidated = 0x20 + IncludeInstallationTargets = 0x40 + IncludeAssetUri = 0x80 + IncludeStatistics = 0x100 + 
+
+
+class FilterType(IntFlag):
+    __no_flags_name__ = "Target"
+    Tag = 1
+    ExtensionId = 4
+    Category = 5
+    ExtensionName = 7
+    Target = 8
+    Featured = 9
+    SearchText = 10
+    ExcludeWithFlags = 12
+    UndefinedType = 14
+
+
+class SortBy(IntFlag):
+    __no_flags_name__ = "NoneOrRelevance"
+    NoneOrRelevance = 0
+    LastUpdatedDate = 1
+    Title = 2
+    PublisherName = 3
+    InstallCount = 4
+    PublishedDate = 5
+    AverageRating = 6
+    WeightedRating = 12
+
+
+class SortOrder(IntFlag):
+    __no_flags_name__ = "Default"
+    Default = 0
+    Ascending = 1
+    Descending = 2
+
+
+class MagicJsonEncoder(json.JSONEncoder):
+    def default(self, o: Any) -> Union[str, Dict[str, Any]]:
+        try:
+            return super().default(o)
+        except TypeError as err:
+            # could be datetime
+            if isinstance(o, datetime.datetime):
+                return o.isoformat()
+            # could also be cls with slots
+            try:
+                return {key: getattr(o, key, None) for key in o.__slots__}
+            except AttributeError:
+                pass
+            # finally, should have a dict if it is a dataclass or another cls
+            try:
+                return o.__dict__
+            except AttributeError:
+                raise TypeError(
+                    "Can't encode object. Tried isoformat of datetime, class slots and class dict"
+                ) from err
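`MagicJsonEncoder` falls back from the stock encoder to ISO-formatted datetimes, then `__slots__`, then `__dict__`, so simple value objects serialise without per-class encoders. A small sketch, assuming it runs where `MagicJsonEncoder` is importable:

```
import datetime
import json

class Artifact:
    # No __slots__, so MagicJsonEncoder serialises via __dict__;
    # the nested datetime is handled by the isoformat branch.
    def __init__(self):
        self.name = 'example'
        self.updated = datetime.datetime(2023, 6, 5)

print(json.dumps(Artifact(), cls=MagicJsonEncoder))
# {"name": "example", "updated": "2023-06-05T00:00:00"}
```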
It is a directory") + return result + + with open(filepath, "r", encoding="utf-8-sig") as fp: + try: + result = json.load(fp) + if not result: + return [] + except json.decoder.JSONDecodeError as err: + log.debug(f"JSONDecodeError while processing {filepath.absolute()} \n error: {str(err)}") + return [] + except UnicodeDecodeError as err: + log.debug(f"UnicodeDecodeError while processing {filepath.absolute()} \n error: {str(err)}") + return [] + return result + + @staticmethod + def write_json(filepath: Union[str, pathlib.Path], content: Dict[str, Any]) -> None: + with open(filepath, "w") as outfile: + json.dump(content, outfile, cls=MagicJsonEncoder, indent=4) + + @staticmethod + def first_file(filepath: Union[str, pathlib.Path], pattern: str, reverse: bool = False) -> Union[str, bool]: + if isinstance(filepath, str): + filepath = pathlib.Path(filepath) + results = [*filepath.glob(pattern)] + if not results: + return False + elif len(results) >= 1 and reverse: + results.sort(reverse=True) + return str(results[0].absolute()) + + @staticmethod + def folders_in_folder(filepath: str) -> List[str]: + listing = [f for f in os.listdir(filepath) if os.path.isdir(os.path.join(filepath, f))] + listing.sort() + return listing + + @staticmethod + def files_in_folder(filepath: str) -> List[str]: + listing = [f for f in os.listdir(filepath) if os.path.isfile(os.path.join(filepath, f))] + listing.sort() + return listing + + + @staticmethod + def seconds_to_human_time(seconds: int) -> str: + return str(datetime.timedelta(seconds=seconds)) + + @staticmethod + def from_json_datetime(jsondate: str) -> datetime.datetime: + return datetime.datetime.strptime(jsondate, "%Y-%m-%dT%H:%M:%S.%fZ") + + @staticmethod + def validate_platform(platform: str) -> bool: + return platform in PLATFORMS + + @staticmethod + def validate_architecture(arch: str) -> bool: + return arch in ARCHITECTURES + + @staticmethod + def validate_buildtype(buildtype: str) -> bool: + return buildtype in BUILDTYPES + + @staticmethod + def validate_quality(quality: str) -> bool: + return quality in QUALITIES diff --git a/vscoffline/vscsync/requirements.txt b/vscoffline/vscsync/requirements.txt index 2b98212..13ec905 100644 --- a/vscoffline/vscsync/requirements.txt +++ b/vscoffline/vscsync/requirements.txt @@ -1,3 +1,4 @@ -requests -pytimeparse -setuptools +requests +pytimeparse +setuptools +packaging \ No newline at end of file