# shearline.py (forked from gondoi/shearline)
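"""Synchronize the contents of a publicly readable Amazon S3 bucket into a
Rackspace Cloud Files container using a pool of worker processes.

Written for Python 2; depends on the boto and python-cloudfiles libraries.
"""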
import multiprocessing
import os
import sys
import signal
import time
import Queue
import argparse
import syslog
import cloudfiles
from boto.s3.connection import S3Connection
from boto.exception import S3ResponseError
from boto.s3.key import Key


class CommandError(Exception):
    pass


def env(e):
    return os.environ.get(e, '')


class Shearline(object):
    def __init__(self):
        self.parser = argparse.ArgumentParser()
        self.parser.add_argument("-q", "--quiet", action="store_true",
                                 help="disable progress output")
        self.parser.add_argument("-v", "--verbose", action="store_true",
                                 help="enable verbose output")
        self.parser.add_argument("--bucket", default=env('S3_BUCKET'),
                                 help='defaults to env[S3_BUCKET]')
        self.parser.add_argument("--username",
                                 default=env('CF_USERNAME'),
                                 help='defaults to env[CF_USERNAME]')
        self.parser.add_argument("--apikey",
                                 default=env('CF_APIKEY'),
                                 help='defaults to env[CF_APIKEY]')
        self.parser.add_argument("--container",
                                 default=env('CF_CONTAINER'),
                                 help='defaults to env[CF_CONTAINER]')
        self.parser.add_argument("--processes", type=int, default=1,
                                 help='number of synchronization processes at a time')
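
    # synchronize() copies a single S3 object into the Cloud Files container.
    # It skips zero-byte objects, skips objects whose S3 etag already matches
    # the Cloud Files copy, and retries after 10 seconds on unexpected errors.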
    def synchronize(self, key):
        status = None
        while True:
            try:
                syslog.syslog('Synchronizing item: %s' % key)
                if self.verbose:
                    print 'Started: %s' % key
                s3 = S3Connection(is_secure=True, anon=True)
                bucket = s3.get_bucket(self.s3_bucket)
                item = bucket.get_key(key)
                cf = cloudfiles.get_connection(self.cf_username, self.cf_apikey)
                container = cf.create_container(self.cf_container)
                cf_object = container.create_object(key)
                if item.size > 0:
                    if cf_object.etag is None or item.etag != '"%s"' % cf_object.etag:
                        cf_object.send(item)
                        status = "Created on Cloud Files: %s" % item.key
                    else:
                        status = "Already exists and is up-to-date: %s" % item.key
                else:
                    status = "Skipping empty item: %s" % item.key
                syslog.syslog(status)
                if self.verbose:
                    print status
                break
            except S3ResponseError:
                break
            except Exception, e:
                syslog.syslog(syslog.LOG_ERR, 'ERROR: %s' % e)
                print "ERROR: %s" % e
                time.sleep(10)
        return status
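
    # process() is the entry point for each worker: it ignores SIGINT (the
    # parent handles Ctrl-C), drains the shared job queue, reports progress
    # unless --quiet is set, and pushes each synchronize() status onto the
    # result queue.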
    def process(self, total, job_queue, result_queue):
        signal.signal(signal.SIGINT, signal.SIG_IGN)
        while not job_queue.empty():
            try:
                if not self.quiet:
                    progress = 100 - float(job_queue.qsize()) / total * 100
                    print "Progress: %.2f%%" % progress
                job = job_queue.get(block=False)
                result_queue.put(self.synchronize(job))
            except Queue.Empty:
                pass
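
    # main() parses the command line, validates the required S3 and Cloud
    # Files settings, queues every key in the bucket, and fans the work out
    # to --processes worker processes.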
    def main(self, argv):
        args = self.parser.parse_args(argv)
        self.s3_bucket = args.bucket
        self.cf_username = args.username
        self.cf_apikey = args.apikey
        self.cf_container = args.container
        self.processes = args.processes
        self.verbose = args.verbose
        self.quiet = args.quiet
        if not self.s3_bucket:
            raise CommandError("You must provide an S3 bucket, either via "
                               "--bucket or via env[S3_BUCKET]")
        if not self.cf_username:
            raise CommandError("You must provide a Cloud Files username, either via "
                               "--username or via env[CF_USERNAME]")
        if not self.cf_apikey:
            raise CommandError("You must provide a Cloud Files API key, either via "
                               "--apikey or via env[CF_APIKEY]")
        if not self.cf_container:
            raise CommandError("You must provide a Cloud Files container, either via "
                               "--container or via env[CF_CONTAINER]")
        syslog.syslog('Processing started')
        job_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        syslog.syslog('Connecting to S3 bucket: %s' % self.s3_bucket)
        s3 = S3Connection(is_secure=True, anon=True)
        bucket = s3.get_bucket(self.s3_bucket)
        for item in bucket.list():
            job_queue.put(item.key)
        total = job_queue.qsize()
        syslog.syslog('Total size of S3 bucket: %s items' % total)
        workers = []
        for i in range(self.processes):
            tmp = multiprocessing.Process(target=self.process,
                                          args=(total, job_queue, result_queue))
            tmp.start()
            workers.append(tmp)
        try:
            for worker in workers:
                worker.join()
        except KeyboardInterrupt:
            for worker in workers:
                worker.terminate()
                worker.join()
        if self.verbose:
            while not result_queue.empty():
                print result_queue.get(block=False)
        syslog.syslog('Processing completed')
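
# Command-line entry point: configuration errors are reported on stderr and
# produce a non-zero exit status.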
def main():
    try:
        Shearline().main(sys.argv[1:])
    except CommandError, e:
        print >> sys.stderr, e
        sys.exit(1)


if __name__ == '__main__':
    main()
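
# Example invocation (hypothetical bucket, container, and credentials):
#   S3_BUCKET=example-bucket CF_USERNAME=cfuser CF_APIKEY=secret \
#   CF_CONTAINER=mirror ./shearline.py --processes 4 --verbose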