-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathgcs.py
142 lines (124 loc) · 5.94 KB
/
gcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from .base import Output, NotConfiguredException
from google.cloud import storage
import base64
import os
class GcsOutput(Output):
    """
    Writes contents to Google Cloud Storage.

    Args:
        bucket (str): Target bucket name (rendered as a Jinja template).
        object (str, optional): Target object name (rendered as a Jinja
            template). Either specify object or objects.
        objects (list, optional): Target objects when writing multiple files.
            Contents template will be called multiple times with `filename`
            and `key` variables. Requires `contents`; `file` is only
            supported together with `object`.
        contents (str, optional): Contents to write to target file (rendered
            as a Jinja template). Either specify contents or file.
        file (str, optional): Local file to upload.
        base64decode (bool, optional): If truthy, the rendered contents are
            base64-decoded before upload. Not applied when `file` is used.
        project (str, optional): Google Cloud project to issue Cloud Storage
            API calls against.
    """

    def output(self):
        """Upload the configured contents (or local file) to Cloud Storage.

        Raises:
            NotConfiguredException: if the bucket, the destination object(s)
                or the contents/file are not configured, or if `objects` is
                configured without `contents`.
        """
        if 'bucket' not in self.output_config:
            raise NotConfiguredException(
                'No destination bucket defined in GCS output.')
        if 'object' not in self.output_config and 'objects' not in self.output_config:
            raise NotConfiguredException(
                'No destination object(s) defined in GCS output.')
        if 'contents' not in self.output_config and 'file' not in self.output_config:
            raise NotConfiguredException('No GCS contents defined in output.')
        if 'objects' in self.output_config and 'contents' not in self.output_config:
            # Multi-object mode renders the contents template once per object;
            # without this check a `file`-only config crashed with KeyError.
            raise NotConfiguredException(
                'No GCS contents defined in output for multiple objects.')

        bucket_template = self.jinja_environment.from_string(
            self.output_config['bucket'])
        bucket_template.name = 'bucket'
        destination_bucket = bucket_template.render()

        storage_client = storage.Client(
            client_info=self._get_grpc_client_info(),
            project=self.output_config.get('project'))
        bucket = storage_client.bucket(destination_bucket)

        if 'objects' in self.output_config:
            self._gcs_write_objects(bucket, destination_bucket)
        else:  # Single object
            self._gcs_write_object(bucket, destination_bucket)

    @staticmethod
    def _gcs_url(destination_bucket, destination_object):
        """Build the gs:// URL used in log records."""
        return 'gs://%s/%s' % (destination_bucket, destination_object)

    @staticmethod
    def _gcs_byte_size(contents):
        """Return the number of bytes that will be uploaded for `contents`."""
        # upload_from_string() encodes text as UTF-8, so len(str) would
        # under-report non-ASCII contents.
        if isinstance(contents, str):
            return len(contents.encode('utf-8'))
        return len(contents)

    def _gcs_contents_template(self):
        """Compile the `contents` Jinja template."""
        contents_template = self.jinja_environment.from_string(
            self.output_config['contents'])
        contents_template.name = 'contents'
        return contents_template

    def _gcs_maybe_base64decode(self, contents):
        """Base64-decode rendered contents when `base64decode` is set."""
        if self.output_config.get('base64decode'):
            return base64.decodebytes(contents.encode('ascii'))
        return contents

    def _gcs_upload_contents(self, blob, url, contents):
        """Upload in-memory contents to `blob` and log the result."""
        blob.upload_from_string(contents)
        self.logger.info('Object created in Cloud Storage bucket.',
                         extra={
                             'url': url,
                             'size': self._gcs_byte_size(contents),
                         })

    def _gcs_write_objects(self, bucket, destination_bucket):
        """Render and upload one object per entry of `objects`."""
        all_objects = self._jinja_var_to_list(self.output_config['objects'])
        if hasattr(all_objects, 'items'):
            # Mapping of object name -> key (original behaviour).
            object_pairs = all_objects.items()
        else:
            # NOTE(review): a plain sequence previously crashed on .items();
            # each entry is used as the object name and its index as `key`.
            # Confirm this matches what _jinja_var_to_list returns.
            object_pairs = ((name, index)
                            for index, name in enumerate(all_objects))

        # The template does not depend on the object, so compile it once.
        contents_template = self._gcs_contents_template()
        for destination_object, destination_key in object_pairs:
            url = self._gcs_url(destination_bucket, destination_object)
            self.logger.debug('Creating destination file in bucket.',
                              extra={'url': url})
            contents = self._gcs_maybe_base64decode(
                contents_template.render({
                    'filename': destination_object,
                    'key': destination_key
                }))
            self._gcs_upload_contents(bucket.blob(destination_object), url,
                                      contents)

    def _gcs_write_object(self, bucket, destination_bucket):
        """Upload rendered contents, or a local file, to a single object."""
        object_template = self.jinja_environment.from_string(
            self.output_config['object'])
        object_template.name = 'object'
        destination_object = object_template.render()
        url = self._gcs_url(destination_bucket, destination_object)
        self.logger.debug('Creating destination file in bucket.',
                          extra={'url': url})
        blob = bucket.blob(destination_object)

        if 'contents' in self.output_config:
            contents = self._gcs_maybe_base64decode(
                self._gcs_contents_template().render())
            self._gcs_upload_contents(blob, url, contents)
            return

        filename = self._jinja_expand_string(self.output_config['file'],
                                             'file')
        blob.upload_from_filename(filename)
        file_stats = os.stat(filename)
        self.logger.info('Object created in Cloud Storage bucket.',
                         extra={
                             'url': url,
                             'file_name': filename,
                             'size': file_stats.st_size,
                         })