oo-admin-upgrade
#!/usr/bin/env oo-ruby
require 'rubygems'
require 'pp'
require 'thread'
require 'getoptlong'
require 'stringio'
require 'set'
require 'json'
require 'thor'
module OpenShift
module Upgrader
##
# A simple base class for objects which are likely to be marshalled to and
# from JSON files. Allows for consistent construction with both symbol-based
# option hashes and JSON-derived hashes with string keys.
#
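# Illustrative round trip (example values are assumptions, not taken from this script):
#   entry = NodeQueueEntry.new(server_identity: 'node1.example.com', version: '2.0')
#   NodeQueueEntry.from_json(entry.to_json).server_identity  # => "node1.example.com"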
class UpgradePersistable
def initialize(args)
args.each{|k, v| send("#{k.to_s}=", v)}
end
def jsonish_self
hash = {}
instance_variables.each {|v| hash[v.to_s[1..-1]] = instance_variable_get(v)}
hash
end
def to_json
JSON.dump(jsonish_self)
end
def self.from_json(json)
self.new(JSON.load(json))
end
def self.from_json_file(file)
self.new(JSON.parse(IO.read(file)))
end
def self.entries_from_json_array_file(file)
entries = []
File.readlines(file).each do |json|
entries << self.from_json(json)
end
entries
end
end
##
# High level metadata about the portion of the cluster
# being upgraded by this upgrader (e.g. the subset of
# nodes to be upgraded).
#
class ClusterMetadata < UpgradePersistable
attr_accessor :logins_count,
:upgrader_position_nodes,
:times
def initialize(args = {})
super(args)
@logins_count ||= 0
@times ||= {}
@upgrader_position_nodes ||= []
end
end
##
# An individual node queue entry representing a specific
# node on which an upgrade should be performed.
#
class NodeQueueEntry < UpgradePersistable
attr_accessor :server_identity,
:active_gear_length,
:inactive_gear_length,
:gears_length,
:version
def initialize(args)
super(args)
@gears_length ||= 0
@active_gear_length ||= 0
@inactive_gear_length ||= 0
end
end
##
# An individual gear queue entry representing a specific
# gear on a node to be upgraded, and all the metadata
# necessary to feed back into a separate Upgrader instance
# to execute the work.
#
class GearQueueEntry < UpgradePersistable
attr_accessor :server_identity,
:version,
:gear_uuid,
:gear_name,
:app_name,
:login,
:active
def initialize(args)
super(args)
end
end
##
# A summary object containing the overall cluster upgrade
# result metadata across all nodes for a given upgrade
# attempt.
#
class ClusterUpgradeResult < UpgradePersistable
attr_accessor :times,
:gear_count,
:gear_counts,
:cluster_metadata,
:starting_node_queue,
:incomplete_node_queue
def initialize(args = {})
super(args)
@times ||= {}
@gear_count ||= 0
@gear_counts ||= []
@starting_node_queue ||= []
@incomplete_node_queue ||= []
end
end
##
# A summary object containing the upgrade result metadata
# from a single gear upgrade attempt.
#
class GearUpgradeResult < UpgradePersistable
attr_accessor :login,
:plan_id,
:active,
:app_name,
:gear_uuid,
:namespace,
:hostname,
:version,
:errors,
:warnings,
:times,
:attempts,
:remote_upgrade_result,
:remote_exit_code
def initialize(args = {})
super(args)
@attempts ||= []
@errors ||= []
@warnings ||= []
@times ||= {}
end
end
end
end
class Upgrader
WORK_DIR = '/var/log/openshift/broker/upgrade'
##
# The primary upgrade method. Executes cluster-wide upgrades using the following process:
#
# 1. Create or reuse a node queue representing the ordered list of nodes in the cluster to upgrade,
# as well as high level metadata about the cluster.
#
# Part of the node queue creation includes the creation of gear queues for each node containing
# gears to upgrade.
#
# 2. Create worker threads to process each node in the node queue. These workers process gear queues
# to perform the actual gear upgrades. Results are written to files on disk per node. Errors are
# written to a different file, and also to a re-run gear queue.
#
# 3. Recreate the node queue with any incompletely upgraded nodes. A node upgrade is considered incomplete
# if any gears in the node's gear queue weren't processed at all, or if any of the processed gears
# resulted in errors.
#
# 4. Generate reports based on all output and emit them to stdout.
#
# The upgrade process is meant to be re-entrant. If a node queue already exists, the existing gear queues
# for those nodes will continue to be processed. If only rerun gear queues exist, processing will proceed using
# the rerun queues (effectively re-running the errors from the previous run). If no gear queues at all exist
# for the node, the upgrade for that node is considered complete and no processing will occur for the node.
#
# The upgrade is considered a success if there are no nodes left in the node queue.
#
# Expects an argument hash:
#
# :version - The target upgrade version.
#
# :ignore_cartridge_version - Passed through to gear level upgrades; if +true+, upgrades
# will be performed regardless of the source and target versions.
# Default is +false+.
#
# :target_server_identity - If present, the node queue will be forced to contain only the
# specified node regardless of any computations.
#
# :num_upgraders - The total number of upgrader processes intended to be run. Must be used in
# conjunction with +upgrade_position+; +num_upgraders+ defines the number of
# slots, and each upgrader process claims exactly one of them via +upgrade_position+.
# All positions must be filled for the whole cluster to be upgraded.
#
# :upgrade_position - Position of this upgrader (1-based) among the total number of upgraders (+num_upgraders+)
#
# :max_threads - The maximum number of threads to use when processing nodes from the node queue.
# Default is +12+.
#
# Returns a +ClusterUpgradeResult+ containing information about this upgrade execution.
#
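# Illustrative invocation (the version and counts are assumed example values):
#
#   Upgrader.new.upgrade(version: '2.0.33', max_threads: 6,
#                        num_upgraders: 2, upgrade_position: 1)
#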
def upgrade(args={})
defaults = {
version: nil,
ignore_cartridge_version: false,
target_server_identity: nil,
upgrade_position: 1,
num_upgraders: 1,
max_threads: 12,
gear_whitelist: [],
num_tries: 2
}
opts = defaults.merge(args) {|k, default, arg| arg.nil? ? default : arg}
version = opts[:version]
ignore_cartridge_version = opts[:ignore_cartridge_version]
target_server_identity = opts[:target_server_identity]
upgrade_position = opts[:upgrade_position]
num_upgraders = opts[:num_upgraders]
max_threads = opts[:max_threads]
gear_whitelist = opts[:gear_whitelist]
num_tries = opts[:num_tries]
puts "Upgrader started with options: #{opts.inspect}"
FileUtils.mkdir_p WORK_DIR if not Dir.exists?(WORK_DIR)
upgrade_result = OpenShift::Upgrader::ClusterUpgradeResult.new
start_time = current_time
upgrade_result.times["start"] = start_time
# only rebuild the queues and meta if none exists
if File.exists?(cluster_metadata_path)
puts "Reusing existing queues and cluster metadata"
else
puts "Building new upgrade queues and cluster metadata"
create_upgrade_queues(target_server_identity: target_server_identity,
upgrade_position: upgrade_position,
num_upgraders: num_upgraders,
version: version,
gear_whitelist: gear_whitelist)
end
raise "Couldn't find node queue at #{node_queue_path}" unless File.exists?(node_queue_path)
raise "Couldn't find cluster metadata at #{cluster_metadata_path}" unless File.exists?(cluster_metadata_path)
puts "Loading cluster metadata from #{cluster_metadata_path}"
metadata = OpenShift::Upgrader::ClusterMetadata.from_json_file(cluster_metadata_path)
puts "Loading node queue from #{node_queue_path}"
node_queue = OpenShift::Upgrader::NodeQueueEntry.entries_from_json_array_file(node_queue_path)
# Bail out if there's nothing to do
if node_queue.empty?
puts "Node queue is empty; there's nothing for the upgrader to do. Exiting."
return upgrade_result
end
upgrade_result.starting_node_queue = OpenShift::Upgrader::NodeQueueEntry.entries_from_json_array_file(node_queue_path)
upgrade_result.cluster_metadata = metadata
if !metadata.upgrader_position_nodes.empty?
puts "#####################################################"
puts 'Nodes this upgrader is handling:'
puts metadata.upgrader_position_nodes.pretty_inspect
puts "#####################################################"
end
# build a work queue that ensures active gears are processed across all
# nodes prior to any inactive gears being upgraded
work_queue = Queue.new
node_queue.each {|node| work_queue << { node: node, active_only: true }}
node_threads = []
completed_node_count = 0
# consume the work across the max number of threads
max_threads.times do |thread_num|
node_threads << Thread.new do
while completed_node_count != node_queue.length
begin
work = work_queue.pop(true)
rescue ThreadError => e
# wait for more work
sleep 1
next
end
node = work[:node]
active_only = work[:active_only]
upgrade_result.gear_counts[thread_num] = node.gears_length
# perform the node upgrade
begin
upgrade_node(node.server_identity, active_only, ignore_cartridge_version, num_tries)
rescue Exception => e
puts e.message
puts e.backtrace.join("\n")
end
# queue the remaining (inactive) work for the node if we
# already did a pass at just the active gears
if active_only && File.exists?(gear_queue_path(node.server_identity))
work_queue << { node: node, active_only: false }
else
completed_node_count += 1
upgrade_result.gear_count += node.gears_length
puts "#{completed_node_count} of #{node_queue.length} nodes completed"
end
end
end
end
# wait for all the threads to finish
node_threads.each {|t| t.join}
# rewrite the node queue with any leftover work
node_queue.each do |node|
upgrade_result.incomplete_node_queue << node if node_has_remaining_work?(node.server_identity)
end
puts "Writing updated node queue to #{node_queue_path}"
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
FileUtils.mv(node_queue_path, "#{node_queue_path}-#{timestamp}")
FileUtils.touch node_queue_path
upgrade_result.incomplete_node_queue.each do |node|
append_to_file(node_queue_path, node.to_json)
end
upgrade_result.times["total"] = current_time - start_time
# print a report
puts build_text_summary(upgrade_result)
upgrade_result
end
##
# Returns +true+ if the given node has any outstanding gear queues.
#
def node_has_remaining_work?(server_identity)
return File.exists?(gear_queue_path(server_identity)) || File.exists?(gear_rerun_queue_path(server_identity))
end
##
# Finds all the nodes containing gears to be upgraded, and writes:
#
# 1. A JSON dump of a +ClusterMetadata+ instance
#
# 2. A node queue file where each line contains JSON dump of a +NodeQueueEntry+ representing
# the ordered list of nodes to be upgraded
#
# 3. A gear queue file per node, where each line in the queue file is a JSON dump of a
# +GearQueueEntry+ representing a gear to be upgraded.
#
# Expects an argument hash with data to store along with each node queue entry used to construct
# the gear level node queues. Each argument is typically just passed through from +upgrade+; see
# that method for details.
#
# :version
# :ignore_cartridge_version
# :target_server_identity
# :upgrade_position
# :num_upgraders
# :max_threads
#
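# Illustrative call (argument values are assumptions, not from this script):
#
#   create_upgrade_queues(version: '2.0.33', upgrade_position: 1, num_upgraders: 1,
#                         target_server_identity: nil, gear_whitelist: [])
#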
def create_upgrade_queues(opts)
raise "Node queue file already exists at #{node_queue_path}" if File.exists?(node_queue_path)
FileUtils.touch(node_queue_path)
target_server_identity = opts[:target_server_identity]
upgrade_position = opts[:upgrade_position]
num_upgraders = opts[:num_upgraders]
version = opts[:version]
gear_whitelist = opts[:gear_whitelist]
metadata = OpenShift::Upgrader::ClusterMetadata.new
puts "Getting all active gears..."
gather_active_gears_start_time = current_time
active_gears_map = OpenShift::ApplicationContainerProxy.get_all_active_gears
metadata.times["gather_active_gears_total_time"] = current_time - gather_active_gears_start_time
puts "Getting all logins..."
gather_users_start_time = current_time
query = {"gears.0" => {"$exists" => true}}
options = {:fields => [
"domain_id",
"name",
"gears.uuid",
"gears.server_identity",
"gears.name"],
:timeout => false}
user_map = {}
OpenShift::DataStore.find(:cloud_users, {}, {:fields => ['_id', 'uuid', 'login'], :timeout => false}) do |hash|
metadata.logins_count += 1
user_uuid = hash['uuid']
user_login = hash['login']
user_map[hash['_id'].to_s] = [user_uuid, user_login]
end
domain_map = {}
OpenShift::DataStore.find(:domains, {}, {:fields => ["_id" , "owner_id"], :timeout => false}) do |hash|
domain_map[hash['_id'].to_s] = hash['owner_id'].to_s
end
node_to_gears = {}
OpenShift::DataStore.find(:applications, query, options) do |app|
user_id = domain_map[app['domain_id'].to_s]
if user_id.nil?
relocated_domain = Domain.where(_id: Moped::BSON::ObjectId(app['domain_id'])).first
next if relocated_domain.nil?
user_id = relocated_domain.owner._id.to_s
user_uuid = user_id
user_login = relocated_domain.owner.login
else
if user_map.has_key? user_id
user_uuid,user_login = user_map[user_id]
else
relocated_user = CloudUser.where(_id: Moped::BSON::ObjectId(user_id)).first
next if relocated_user.nil?
user_uuid = relocated_user._id.to_s
user_login = relocated_user.login
end
end
app['gears'].each do |gear|
server_identity = gear['server_identity']
if server_identity && (!target_server_identity || (server_identity == target_server_identity))
node_to_gears[server_identity] = [] unless node_to_gears[server_identity]
node_to_gears[server_identity] << {:server_identity => server_identity, :uuid => gear['uuid'], :name => gear['name'], :app_name => app['name'], :login => user_login}
end
end
end
metadata.times["gather_users_total_time"] = current_time - gather_users_start_time
position = upgrade_position - 1
if num_upgraders > 1
server_identities = node_to_gears.keys.sort
server_identities.each_with_index do |server_identity, index|
if index == position
metadata.upgrader_position_nodes << server_identity
position += num_upgraders
else
node_to_gears.delete(server_identity)
end
end
end
# populate the node queue for persistence
node_queue = []
node_to_gears.each do |server_identity, gears|
node_to_gears[server_identity] = nil
break if gears.empty?
# build the node for the queue with just metadata about the
# node and gears
node = OpenShift::Upgrader::NodeQueueEntry.new({
server_identity: server_identity,
version: version,
active_gear_length: 0,
inactive_gear_length: 0
})
# sort gears by active/inactive
active_gears = []
inactive_gears = []
gears.each do |gear|
if gear_whitelist && gear_whitelist.length > 0
unless gear_whitelist.include?(gear[:uuid])
puts "Gear #{gear[:uuid]} is not in the whitelist and will be skipped"
next
end
end
if active_gears_map.include?(server_identity) && active_gears_map[server_identity].include?(gear[:uuid])
gear[:active] = true
active_gears << gear
else
gear[:active] = false
inactive_gears << gear
end
end
node.active_gear_length = active_gears.length
node.inactive_gear_length = inactive_gears.length
node.gears_length = gears.length
node_queue << node
# ensure we can process active gears first
write_gear_queue(node, active_gears) unless active_gears.empty?
write_gear_queue(node, inactive_gears) unless inactive_gears.empty?
end
node_to_gears.clear
# process the largest nodes first
node_queue = node_queue.sort_by { |node| -node.active_gear_length }
puts "Writing node queue to #{node_queue_path}"
node_queue.each do |node|
append_to_file(node_queue_path, node.to_json)
end
puts "Writing cluster metadata to #{cluster_metadata_path}"
append_to_file(cluster_metadata_path, metadata.to_json)
end
##
# Writes a gear queue to a file in a consolidated format containing
# one line per gear queue entry, where each line is a JSON dump of
# a +GearQueueEntry+ instance.
#
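# Each line in the resulting file is a single JSON object, roughly like the following
# (illustrative values, shown wrapped here but written as one line):
#
#   {"server_identity":"node1.example.com","version":"2.0.33","gear_uuid":"abc123",
#    "gear_name":"myapp","app_name":"myapp","login":"user@example.com","active":true}
#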
def write_gear_queue(node, gears)
gear_queue_file = gear_queue_path(node.server_identity)
puts "Writing #{gears.length} entries to gear queue for node #{node.server_identity} at #{gear_queue_file}"
gears.each do |gear|
queue_entry = OpenShift::Upgrader::GearQueueEntry.new({
server_identity: node.server_identity,
version: node.version,
gear_uuid: gear[:uuid],
gear_name: gear[:name],
app_name: gear[:app_name],
login: gear[:login],
active: gear[:active]
})
append_to_file(gear_queue_file, queue_entry.to_json)
end
end
##
# Performs a node upgrade of the specified +server_identity+ by setting up the output
# files and forking a call back to +upgrade_node_from_gear_queue_file+.
#
# The upgrade logic will continue or retry failures depending on the presence of the appropriate
# gear queue files, and will do nothing if there are no queues to process.
#
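# The forked command resembles the following (node name is illustrative; the exact
# string is built just below):
#
#   oo-admin-upgrade upgrade-from-file --upgrade-file '/var/log/openshift/broker/upgrade/gear_queue_node1.example.com' --active-only true --num-tries 2
#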
def upgrade_node(server_identity, active_only, ignore_cartridge_version=false, num_tries=2)
raise "No server identity specified" unless server_identity
gear_queue_file = gear_queue_path(server_identity)
rerun_queue_file = gear_rerun_queue_path(server_identity)
error_file = error_file_path(server_identity)
# if there's no queue of any sort, there's nothing to do on this node
unless File.exists?(gear_queue_file) || File.exists?(rerun_queue_file)
puts "Nothing to migrate for node #{server_identity}; no gear queue or rerun queue files exist"
return
end
if File.exists?(gear_queue_file)
puts "Upgrading node #{server_identity} from gear queue file at #{gear_queue_file}"
elsif File.exists?(rerun_queue_file)
puts "Re-running failures from previous upgrade of node #{server_identity} from rerun gear queue at #{rerun_queue_file}"
timestamp = Time.now.strftime("%Y-%m-%d-%H%M%S")
archive_error_file = "#{error_file}_#{timestamp}"
puts "Archiving previous error file from #{error_file} to #{archive_error_file}"
FileUtils.mv error_file, archive_error_file
# convert the rerun queue into the new normal queue
FileUtils.mv rerun_queue_file, gear_queue_file
end
# fork the call to +upgrade_node_from_gear_queue_file+ (TODO: revisit this later; the
# fork is apparently used to work around long-forgotten MCollective threading issues)
upgrade_node_cmd = "#{__FILE__} upgrade-from-file --upgrade-file '#{gear_queue_file}' --active-only #{active_only} --num-tries #{num_tries}"
upgrade_node_cmd += " --ignore-cartridge-version" if ignore_cartridge_version
execute_script(upgrade_node_cmd)
end
##
# Processes JSON-serialized +GearQueueEntry+, one per line in the given gear queue +file+.
#
# Successful upgrade results are written as JSON dumps of +GearUpgradeResult+ instances
# to +results_file+.
#
# Results containing errors are written as JSON dumps of +GearUpgradeResult+ instances to
# +error_file+ only after +num_tries+ attempts have failed. In addition, when a result
# contains errors, the source +GearQueueEntry+ is written as JSON to +gear_rerun_queue_path+
# to facilitate reruns of past errors.
#
# Each time a result is written to disk, the source line from +file+ is deleted. When
# all lines are processed, the input file is deleted.
#
# This method returns nothing; callers must inspect the result file contents for
# upgrade details.
#
def upgrade_node_from_gear_queue_file(file, active_only=false, ignore_cartridge_version=false, num_tries=2)
File.open(file, 'r').each_line do |queue_entry_json|
next if queue_entry_json.nil? || queue_entry_json.empty?
gear = OpenShift::Upgrader::GearQueueEntry.from_json(queue_entry_json)
next if active_only && !gear.active
log_file = log_file_path(gear.server_identity)
results_file = results_file_path(gear.server_identity)
error_file = error_file_path(gear.server_identity)
rerun_file = gear_rerun_queue_path(gear.server_identity)
append_to_file(log_file, "Migrating app '#{gear.app_name}' gear '#{gear.gear_name}' with uuid '#{gear.gear_uuid}' on node '#{gear.server_identity}' for user: #{gear.login}")
attempts_results = []
# start the upgrades in a retry loop
(1..num_tries).each do |i|
# perform the upgrade
result = upgrade_gear(gear.login, gear.app_name, gear.gear_uuid, gear.version, ignore_cartridge_version, gear.active)
# write the results if all is well
if result.errors.empty?
if !attempts_results.empty?
result.attempts = attempts_results
end
append_to_file(results_file, result.to_json)
break
end
# dump the error to disk if the retry limit is hit and we're still failing
if i == num_tries
if !attempts_results.empty?
result.attempts = attempts_results
end
result.errors << "Failed upgrade after #{num_tries} tries"
append_to_file(error_file, result.to_json)
append_to_file(rerun_file, gear.to_json)
break
else
attempts_results << result.remote_upgrade_result
end
# verify the user still exists
user = nil
begin
user = CloudUser.with(consistency: :eventual).find_by(login: gear.login)
rescue Mongoid::Errors::DocumentNotFound
end
# if not, throw a warning and move on
unless user && Application.find_by_user(user, gear.app_name)
result.warnings << "App '#{gear.app_name}' no longer found in datastore with uuid '#{gear.gear_uuid}'. Ignoring..."
append_to_file(results_file, result.to_json)
break
end
# if so, we're ready for a retry
sleep 4
end
# The gear has been processed; delete the entry from the file. Inefficient, but
# it only happens between upgrades which are measured in minutes, and the last
# non-skipped gear should be near the head of the file anyway. Consider using
# a database rather than a flat file at a later time.
gear_line = 1
File.open(file, 'r').each_line do |line|
gear_to_delete = OpenShift::Upgrader::GearQueueEntry.from_json(line)
if gear_to_delete.gear_uuid == gear.gear_uuid
`sed -i '#{gear_line},#{gear_line}d' #{file}`
break
end
gear_line += 1
end
end
# double-check to ensure all lines were processed, and remove
# the input file if we're all done.
FileUtils.rm_f file if `wc -l #{file}`.to_i == 0
end
##
# Performs a gear upgrade via an RPC call to MCollective on a remote node.
#
# Returns a +GearUpgradeResult+, which also includes the (decoded) remote
# upgrade JSON containing the gear level upgrade details.
#
# NOTE: All exceptions are trapped and added to +GearUpgradeResult.errors+.
# This method always returns a result object and never bubbles exceptions.
#
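# Illustrative call (all values are assumptions):
#
#   result = upgrade_gear('user@example.com', 'myapp', 'abc123-uuid', '2.0.33')
#   puts result.errors.inspect unless result.errors.empty?
#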
def upgrade_gear(login, app_name, gear_uuid, version, ignore_cartridge_version=false, active=false)
gear_result = OpenShift::Upgrader::GearUpgradeResult.new({
login: login,
app_name: app_name,
gear_uuid: gear_uuid,
version: version
})
total_upgrade_gear_start_time = current_time
begin
user = nil
begin
user = CloudUser.with(consistency: :eventual).find_by(login: login)
rescue Mongoid::Errors::DocumentNotFound
end
raise "User not found: #{login}" unless user
gear_result.plan_id = user.plan_id
gear_result.active = active
app, gear = Application.find_by_gear_uuid(gear_uuid)
gear_result.warnings << "App '#{app_name}' not found" unless app
gear_result.warnings << "Gear not found with uuid #{gear_uuid} for app '#{app_name}' and user '#{login}'" unless gear
if app && gear
server_identity = gear.server_identity
gear_result.hostname = server_identity
gear_result.namespace = app.domain.namespace
remote_result_json = nil
remote_output = nil
response_processed = false
upgrade_on_node_start_time = current_time
upgrade_args = {
:uuid => gear_uuid,
:namespace => app.domain.namespace,
:app_uuid => app.uuid,
:version => version,
:scalable => app.scalable,
:ignore_cartridge_version => ignore_cartridge_version.to_s
}
Timeout::timeout(420) do
OpenShift::MCollectiveApplicationContainerProxy.rpc_exec('openshift', server_identity) do |client|
client.upgrade(upgrade_args) do |response|
remote_result_json = response[:body][:data][:upgrade_result_json]
remote_output = response[:body][:data][:output]
response_processed = true
end
end
end
if response_processed
if remote_result_json.nil?
gear_result.errors << "No upgrade result JSON was returned in the upgrade response. The outcome of the upgrade request is unknown. "\
"The following output was returned: #{remote_output}"
else
gear_result.remote_upgrade_result = JSON.load(remote_result_json)
if gear_result.remote_upgrade_result['upgrade_complete']
directives = gear_result.remote_upgrade_result['broker_directives']
directives.each do |command|
res = ResultIO.new
res.parse_output(command, gear_uuid)
app.process_commands(res, nil, gear)
end
app.execute_connections unless directives.empty?
else
gear_result.errors << "Gear upgrade result is marked incomplete"
end
end
else
gear_result.errors << "The MCollective upgrade execution didn't explicitly fail, but the response handler was not invoked. "\
"The outcome of the upgrade request is unknown."
end
gear_result.times["time_upgrade_on_node_measured_from_broker"] = current_time - upgrade_on_node_start_time
end
rescue Timeout::Error => e
gear_result.errors << "Timed out waiting on a response from the MCollective upgrade call"
rescue Exception => e
gear_result.errors << "Upgrade failed with an unhandled exception: #{e.message}\n#{e.backtrace.join("\n")}"
end
total_upgrade_gear_time = current_time - total_upgrade_gear_start_time
gear_result.times["time_total_upgrade_gear_measured_from_broker"] = total_upgrade_gear_time
gear_result
end
##
# Builds a simple textual summary of a +ClusterUpgradeResult+.
#
# Returns the report as a string.
#
def build_text_summary(upgrade_result)
buf = []
leftover_gears_per_node = {}
failed_gears_per_node = {}
failed_gears_per_plan = {}
failed_gears_per_state = {}
upgrade_result.starting_node_queue.each do |node|
server_identity = node.server_identity
gear_queue_file = gear_queue_path(server_identity)
gear_rerun_queue_file = gear_rerun_queue_path(server_identity)
if File.exists?(gear_queue_file)
leftover_gears_per_node[server_identity] = `wc -l #{gear_queue_file}`.to_i
end
if File.exists?(gear_rerun_queue_file)
failed_gears_per_node[server_identity] = `wc -l #{gear_rerun_queue_file}`.to_i
end
error_file_path = error_file_path(server_identity)
if File.exists? error_file_path
failed_gurs = OpenShift::Upgrader::GearUpgradeResult.entries_from_json_array_file(error_file_path)
failed_gurs.each do |failed_gur|
if failed_gur.plan_id
failed_gears_per_plan[failed_gur.plan_id] = 0 unless failed_gears_per_plan[failed_gur.plan_id]
failed_gears_per_plan[failed_gur.plan_id] += 1
end
state = failed_gur.active ? 'Active' : 'Inactive'
failed_gears_per_state[state] = 0 unless failed_gears_per_state[state]
failed_gears_per_state[state] += 1
end
end
end
failed_gears_total = failed_gears_per_node.values.inject(0, :+)
leftover_gears_total = leftover_gears_per_node.values.inject(0, :+)
if leftover_gears_per_node.length > 0 || failed_gears_per_node.length > 0
buf << "!!!!!!!!!!WARNING!!!!!!!!!!!!!WARNING!!!!!!!!!!!!WARNING!!!!!!!!!!"
buf << "The upgrade was incomplete due to unprocessed or failed gears"
buf << "remaining in node gear queues:"
unless leftover_gears_per_node.empty?
buf << ""
buf << "#{leftover_gears_total} unprocessed gears:"
leftover_gears_per_node.each do |server_identity, count|
buf << " #{server_identity}: #{count}"
end
end
unless failed_gears_per_node.empty?
buf << ""
buf << "#{failed_gears_total} failed gears:"
failed_gears_per_node.each do |server_identity, count|
buf << " #{server_identity}: #{count}"
end
end
buf << ""
buf << "You can run the upgrade again with the same arguments to continue."
buf << "!!!!!!!!!!WARNING!!!!!!!!!!!!!WARNING!!!!!!!!!!!!WARNING!!!!!!!!!!"
end
upgrader_position_nodes = upgrade_result.cluster_metadata.upgrader_position_nodes
buf << "#####################################################"
buf << "Summary:"
buf << "# of users: #{upgrade_result.cluster_metadata.logins_count}"
buf << "# of gears: #{upgrade_result.gear_count}"
buf << "# of failures: #{failed_gears_total}"
if failed_gears_per_plan.any?
buf << "# of failures per plan:"
failed_gears_per_plan.each do |plan_id, count|
buf << " #{plan_id}: #{count}"
end
end
if failed_gears_per_state.any?
buf << "# of failures per state:"
failed_gears_per_state.each do |state, count|
buf << " #{state}: #{count}"
end
end
buf << "# of leftovers: #{leftover_gears_total}"
buf << "Gear counts per thread: #{upgrade_result.gear_counts.pretty_inspect}"
buf << "Nodes upgraded: #{upgrader_position_nodes.pretty_inspect}" if !upgrader_position_nodes.empty?
buf << "Timings:"
upgrade_result.times.each do |topic, time_in_millis|
buf << " #{topic}=#{time_in_millis.to_f/1000}s"
end
buf << "Additional timings:"
upgrade_result.cluster_metadata.times.each do |topic, time_in_millis|
buf << " #{topic}=#{time_in_millis.to_f/1000}s"
end
buf << "#####################################################"
buf.join("\n")
end
def log_file_path(server_identity)
"#{WORK_DIR}/upgrade_log_#{server_identity}"
end
def gear_queue_path(server_identity)
"#{WORK_DIR}/gear_queue_#{server_identity}"
end
def gear_rerun_queue_path(server_identity)
"#{WORK_DIR}/gear_rerun_queue_#{server_identity}"
end
def results_file_path(server_identity)
"#{WORK_DIR}/upgrade_results_#{server_identity}"
end
def error_file_path(server_identity)
"#{WORK_DIR}/upgrade_errors_#{server_identity}"
end
def node_queue_path
"#{WORK_DIR}/node_queue"
end
def cluster_metadata_path
"#{WORK_DIR}/cluster_metadata"
end
##
# Appends +value+ to +filename+.
#
def append_to_file(filename, value)
FileUtils.touch filename unless File.exists?(filename)
file = File.open(filename, 'a')
begin
file.puts value
ensure
file.close
end
end
##
# Executes +cmd+ up to +num_tries+ times, waiting for a zero exit code, with
# a per-attempt timeout of +timeout+ seconds.
#
# Returns [+output+, +exitcode+] of the process.
#
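# Illustrative usage (the command is a stand-in, not from this script):
#
#   output, exitcode = execute_script("/bin/true", 2, 60)
#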
def execute_script(cmd, num_tries=1, timeout=28800)
exitcode = nil
output = ''
(1..num_tries).each do |i|
pid = nil
begin
Timeout::timeout(timeout) do
read, write = IO.pipe
pid = fork {
# child
$stdout.reopen write
read.close
exec(cmd)
}
# parent
write.close
read.each do |line|
output << line
end
Process.waitpid(pid)
exitcode = $?.exitstatus
end
break
rescue Timeout::Error
begin
Process.kill("TERM", pid) if pid
rescue Exception => e
puts "execute_script: WARNING - Failed to kill cmd: '#{cmd}' with message: #{e.message}"
end
puts "Command '#{cmd}' timed out"
raise if i == num_tries
end
end
return output, exitcode
end
def current_time
(Time.now.to_f * 1000).to_i
end
end
class UpgraderCli < Thor
no_tasks do
def with_upgrader
STDOUT.sync = STDERR.sync = true
# Disable analytics for admin scripts