-
Notifications
You must be signed in to change notification settings - Fork 249
/
Copy pathartifacttool_updater.py
203 lines (168 loc) · 8.85 KB
/
artifacttool_updater.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import io
import os
import platform
import shutil
import stat
import sys
import time
import uuid
import zipfile
import humanfriendly
import requests
from knack.log import get_logger
from knack.util import CLIError
import distro
from azext_devops.dev.common.services import get_connection
from azext_devops.dev.common.config import AZ_DEVOPS_GLOBAL_CONFIG_DIR
from azext_devops.dev.common.const import (ARTIFACTTOOL_OVERRIDE_PATH_ENVKEY,
ARTIFACTTOOL_OVERRIDE_URL_ENVKEY,
ARTIFACTTOOL_OVERRIDE_VERSION_ENVKEY)
logger = get_logger(__name__)
# pylint: disable=too-few-public-methods
class ArtifactToolUpdater:
def get_latest_artifacttool(self, organization):
artifacttool_binary_override_path = os.environ.get(ARTIFACTTOOL_OVERRIDE_PATH_ENVKEY)
if artifacttool_binary_override_path is not None:
artifacttool_binary_path = artifacttool_binary_override_path
logger.debug("ArtifactTool path was overriden to '%s' due to environment variable %s",
artifacttool_binary_path, ARTIFACTTOOL_OVERRIDE_PATH_ENVKEY)
else:
logger.debug("Checking for a new ArtifactTool")
artifacttool_binary_path = self._get_artifacttool(organization)
return artifacttool_binary_path
def _get_artifacttool(self, organization): # pylint: disable=no-self-use
logger.debug("Checking for ArtifactTool updates")
# Call the auto-update API to find the current version of ArtifactTool
# If AZURE_DEVOPS_EXT_ARTIFACTTOOL_OVERRIDE_URL is set, instead always download from the URL
artifacttool_override_url = os.environ.get(ARTIFACTTOOL_OVERRIDE_URL_ENVKEY)
if artifacttool_override_url is not None:
release_uri = artifacttool_override_url
release_id = "custom_{}".format(uuid.uuid4())
logger.debug("ArtifactTool download URL is being overridden to '%s' (ID '%s')", release_uri, release_id)
else:
override_version = os.environ.get(ARTIFACTTOOL_OVERRIDE_VERSION_ENVKEY)
try:
release = _get_current_release(organization, override_version)
except Exception as ex:
logger.debug(ex, exc_info=True)
raise CLIError('Failed to update Universal Packages tooling.\n {}'.format(ex))
release_uri, release_id = release
# Determine the path for the release, and skip downloading if it already exists
logger.debug("Checking if we already have ArtifactTool release '%s'", release_id)
release_dir = _compute_release_dir(release_id)
if os.path.exists(release_dir):
logger.debug("Not updating ArtifactTool because the current release already exists at '%s'", release_dir)
return release_dir
# Doesn't already exist. Download and extract the release.
logger.debug("Updating to ArtifactTool release %s since it doesn't exist at %s", release_id, release_dir)
_update_artifacttool(release_uri, release_id)
return release_dir
def _update_artifacttool(uri, release_id):
root = _compute_artifacttool_root()
# Remove all existing releases. In the future we may maintain some old versions,
# but right now we always delete them.
if os.path.isdir(root):
for item in os.listdir(root):
path = os.path.join(root, item)
if os.path.isdir(path):
logger.debug("Trying to remove old release %s", item)
shutil.rmtree(path, ignore_errors=True) # Failing cleanup is not fatal
with humanfriendly.Spinner( # pylint: disable=no-member
label="Downloading Universal Packages tooling ({})"
.format(release_id), total=100, stream=sys.stderr) as spinner:
spinner.step()
logger.debug("Downloading ArtifactTool from %s", uri)
# Make the request, determine the total size
response = requests.get(uri, stream=True)
content_length_header = response.headers['Content-Length'].strip()
content_length = int(content_length_header)
# Do the download, updating the progress bar
content = io.BytesIO()
bytes_so_far = 0
for chunk in response.iter_content(chunk_size=1024 * 512):
if chunk:
content.write(chunk)
bytes_so_far += len(chunk)
spinner.step(100 * float(bytes_so_far) / float(content_length))
# Extract the zip
release_temp_dir = os.path.join(root, str(uuid.uuid4()))
logger.debug("Extracting ArtifactTool to %s", release_temp_dir)
f = zipfile.ZipFile(content)
try:
_mkdir_if_not_exist(release_temp_dir)
f.extractall(path=release_temp_dir)
# For Linux, ensure the executable bit is set on the binary "ArtifactTool" if it exists.
# Python has a bug https://bugs.python.org/issue15795 where file permissions are not preserved.
artifacttool_binary = os.path.join(release_temp_dir, "artifacttool")
if os.path.exists(artifacttool_binary):
artifacttool_stat = os.stat(artifacttool_binary)
os.chmod(artifacttool_binary,
artifacttool_stat.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
# Move the release into the real releases location
release_dir = _compute_release_dir(release_id)
if os.path.exists(release_dir):
logger.warning(
"The Universal Packages tool already exists at the location %s. Skipping download.",
release_dir)
else:
logger.debug("Moving downloaded ArtifactTool from %s to %s", release_temp_dir, release_dir)
# number of times to retry
retries = 10
for _ in range(retries - 1):
try:
os.rename(release_temp_dir, release_dir)
break
except BaseException as ex: # pylint: disable=broad-except
logger.debug(
"An error occurred while renaming the Universal Packages tooling: %s. Retrying...", ex)
time.sleep(1)
else:
os.rename(release_temp_dir, release_dir)
logger.info("Downloaded Universal Packages tooling successfully")
except BaseException as ex: # pylint: disable=broad-except
logger.error("An error occurred while extracting the Universal Packages tooling: %s", ex)
logger.debug("Removing temporary directory %s", release_temp_dir)
shutil.rmtree(release_temp_dir, ignore_errors=True)
def _get_current_release(organization, override_version):
connection = get_connection(organization)
client = connection.get_client('azext_devops.dev.common.client_tool.client_tool_client.ClientToolClient')
logger.debug("Looking up current version of ArtifactTool...")
# Distro returns empty strings on Windows currently, so don't even send
distro_name = distro.id() or None
distro_version = distro.version() or None
os_name = platform.system()
arch = platform.machine()
# For M1 macs, there is no version of artifact tool. However, the x86_64
# version can run under Rosetta, so we use that instead.
if os_name == "Darwin" and arch in ["amd64", "arm64"]:
arch = "x86_64"
# Similarly for Windows ARM64 targets there is no version of artifact tool. However, the x86_64
# version can run under emulation, so we use that instead.
if os_name == "Windows" and arch == "ARM64":
arch = "x86_64"
release = client.get_clienttool_release(
"ArtifactTool",
os_name=os_name,
arch=arch,
distro_name=distro_name,
distro_version=distro_version,
version=override_version)
return (release.uri, _compute_id(release)) if release is not None else None
def _mkdir_if_not_exist(path):
try:
os.makedirs(path)
except OSError:
# Ignore errors that were likely because the directory already exists
if not os.path.isdir(path):
raise
def _compute_id(release):
return "{}_{}_{}".format(release.name, release.rid, release.version)
def _compute_artifacttool_root():
az_devops_cli_root = os.path.join(AZ_DEVOPS_GLOBAL_CONFIG_DIR, 'cli', 'tools')
return os.path.join(az_devops_cli_root, "artifacttool")
def _compute_release_dir(release_id):
return os.path.join(_compute_artifacttool_root(), release_id)