# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import base64
import json
import logging
import os
import time

try:
    import collections.abc as collections
except ImportError:
    import collections

logger = logging.getLogger(__name__)


class FileCache(collections.MutableMapping):
    """A simple dict-like class that is backed by a JSON file.

    All direct modifications will save the file. Indirect modifications should
    be followed by a call to `save_with_retry` or `save`.
    """

    def __init__(self, file_name, max_age=0):
        super(FileCache, self).__init__()
        self.file_name = file_name
        self.max_age = max_age
        self.data = {}
        self.initial_load_occurred = False

    def load(self):
        self.data = {}
        try:
            if os.path.isfile(self.file_name):
                if self.max_age > 0 and os.stat(self.file_name).st_mtime + self.max_age < time.time():
                    logger.debug('Cache file expired: %s', self.file_name)
                    os.remove(self.file_name)
                else:
                    logger.debug('Loading cache file: %s', self.file_name)
                    self.data = get_file_json(self.file_name, throw_on_empty=False) or {}
            else:
                logger.debug('Cache file does not exist: %s', self.file_name)
        except Exception as ex:
            logger.debug(ex, exc_info=True)
            # file is missing or corrupt so attempt to delete it
            try:
                os.remove(self.file_name)
            except Exception as ex2:
                logger.debug(ex2, exc_info=True)
        self.initial_load_occurred = True

    def save(self):
        self._check_for_initial_load()
        self._save()

    def _save(self):
        if self.file_name:
            # Open with mode 0o600 so the cache file is readable only by the owner.
            with os.fdopen(os.open(self.file_name, os.O_RDWR | os.O_CREAT | os.O_TRUNC, 0o600), 'w+') as cred_file:
                cred_file.write(json.dumps(self.data))

    def save_with_retry(self, retries=5):
        self._check_for_initial_load()
        # Retry transient failures; the for/else ensures the final attempt
        # propagates its exception instead of being swallowed.
        for _ in range(retries - 1):
            try:
                self.save()
                break
            except OSError:
                time.sleep(0.1)
        else:
            self.save()

    def clear(self):
        if os.path.isfile(self.file_name):
            logger.info("Deleting file: " + self.file_name)
            os.remove(self.file_name)
        else:
            logger.info("File does not exist: " + self.file_name)

    def get(self, key, default=None):
        self._check_for_initial_load()
        return self.data.get(key, default)

    def __getitem__(self, key):
        self._check_for_initial_load()
        return self.data.setdefault(key, {})

    def __setitem__(self, key, value):
        self._check_for_initial_load()
        self.data[key] = value
        self.save_with_retry()

    def __delitem__(self, key):
        self._check_for_initial_load()
        del self.data[key]
        self.save_with_retry()

    def __iter__(self):
        self._check_for_initial_load()
        return iter(self.data)

    def __len__(self):
        self._check_for_initial_load()
        return len(self.data)

    def _check_for_initial_load(self):
        if not self.initial_load_occurred:
            self.load()
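

# Illustrative usage sketch (not part of the original module): direct item
# assignment persists immediately through __setitem__, while mutating the
# nested dict returned by __getitem__ is an indirect change that needs an
# explicit save, as the class docstring describes. The file path below is
# hypothetical.
def _demo_file_cache():
    cache = FileCache('/tmp/demo_cache.json', max_age=3600)
    cache['token'] = {'value': 'abc'}  # direct write: saved to disk at once
    cache['token']['value'] = 'xyz'    # indirect write: in memory only so far
    cache.save_with_retry()            # persists the indirect change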


def get_cache_dir():
    azure_devops_cache_dir = os.getenv('AZURE_DEVOPS_CACHE_DIR', None) \
        or os.path.expanduser(os.path.join('~', '.azure-devops', 'python-sdk', 'cache'))
    if not os.path.exists(azure_devops_cache_dir):
        os.makedirs(azure_devops_cache_dir)
    return azure_devops_cache_dir


DEFAULT_MAX_AGE = 3600 * 12  # 12 hours
DEFAULT_CACHE_DIR = get_cache_dir()


def get_cache(name, max_age=DEFAULT_MAX_AGE, cache_dir=DEFAULT_CACHE_DIR):
    file_name = os.path.join(cache_dir, name + '.json')
    return FileCache(file_name, max_age)


OPTIONS_CACHE = get_cache('options')
RESOURCE_CACHE = get_cache('resources')
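

# Illustrative usage sketch: get_cache builds a JSON-backed FileCache under
# the shared cache directory, and the whole file is discarded on load once it
# is older than max_age seconds. The cache name 'demo_versions' is
# hypothetical.
def _demo_named_cache():
    versions = get_cache('demo_versions', max_age=DEFAULT_MAX_AGE)
    versions['api'] = {'latest': '7.1'}  # stored in <cache_dir>/demo_versions.json
    return versions.get('api')           # {'latest': '7.1'}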


# Code below this point from azure-cli-core
# https://github.com/Azure/azure-cli/blob/master/src/azure-cli-core/azure/cli/core/util.py

def get_file_json(file_path, throw_on_empty=True, preserve_order=False):
    content = read_file_content(file_path)
    if not content and not throw_on_empty:
        return None
    return shell_safe_json_parse(content, preserve_order)


def read_file_content(file_path, allow_binary=False):
    from codecs import open as codecs_open
    # Note, always put 'utf-8-sig' first, so that BOM in WinOS won't cause trouble.
    for encoding in ['utf-8-sig', 'utf-8', 'utf-16', 'utf-16le', 'utf-16be']:
        try:
            with codecs_open(file_path, encoding=encoding) as f:
                logger.debug("attempting to read file %s as %s", file_path, encoding)
                return f.read()
        except UnicodeDecodeError:
            if allow_binary:
                with open(file_path, 'rb') as input_file:
                    logger.debug("attempting to read file %s as binary", file_path)
                    return base64.b64encode(input_file.read()).decode("utf-8")
            else:
                raise
        except UnicodeError:
            pass
    raise ValueError('Failed to decode file {} - unknown decoding'.format(file_path))


def shell_safe_json_parse(json_or_dict_string, preserve_order=False):
    """Allows the passing of JSON or Python dictionary strings. This is needed because certain
    JSON strings in CMD shell are not received in main's argv. This allows the user to specify
    the alternative notation, which does not have this problem (but is technically not JSON).
    """
    try:
        if not preserve_order:
            return json.loads(json_or_dict_string)
        from collections import OrderedDict
        return json.loads(json_or_dict_string, object_pairs_hook=OrderedDict)
    except ValueError:
        import ast
        return ast.literal_eval(json_or_dict_string)
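

# Illustrative usage sketch: both strict JSON and Python-literal notation
# parse to the same dict; the literal form survives shells (e.g. CMD) that
# strip double quotes from arguments, which is the fallback's purpose.
def _demo_shell_safe_parse():
    assert shell_safe_json_parse('{"a": 1}') == {'a': 1}  # strict JSON
    assert shell_safe_json_parse("{'a': 1}") == {'a': 1}  # Python literal fallback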