-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.rb
42 lines (39 loc) · 1 KB
/
main.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
require 'grpc'
require 'logger'
require 'map_reduce'
# Boots a MapReduce master, serves it over gRPC, and registers a word-count job.
#
# The gRPC server is run on a background thread; this method then waits for
# workers, distributes the input, and hands the master a map proc and a
# reduce proc:
#
# * map: strips every character that is neither a word character nor
#   whitespace, then emits each remaining word with a count of 1.
# * reduce: sums the counts per key and emits one total per key.
#
# NOTE(review): +emit_intermediate+ and +emit+ are not defined in this method;
# presumably MapReduce evaluates the procs in a context that provides them —
# confirm against the MapReduce implementation.
#
# @param file [Object] passed to MapReduce.new as +file:+
# @param logger [#info] passed to MapReduce.new as +logger:+ and used for the
#   server start message
# @param map_count [Object] passed to MapReduce.new as +map_count:+
# @param address [String] host:port the gRPC server binds to
# @return [MapReduce] the master created by this call
def start_master(file, logger, map_count, address: '0.0.0.0:50051')
  master = MapReduce.new(logger:, map_count:, file:)

  grpc_server = GRPC::RpcServer.new
  # The :this_port_is_insecure marker is passed in place of credentials.
  grpc_server.add_http2_port(address, :this_port_is_insecure)
  grpc_server.handle(master)

  # The server loop gets its own thread so the setup calls below still run.
  Thread.new do
    logger.info('[Master] Master gRPC server start')
    grpc_server.run_till_terminated
  end

  master.wait_for_enough_workers
  master.distribute_input

  master.map do
    proc do |input|
      # scan(/\S+/) never yields an empty string, whereas split(/\s+/) returns
      # a leading "" when the cleaned text starts with whitespace.
      input.gsub(/[^\w\s]/, '').scan(/\S+/).each do |word|
        emit_intermediate(word, count: 1)
      end
    end
  end

  master.reduce do
    proc do |input|
      totals = input.each_with_object(Hash.new(0)) do |(word, count), acc|
        acc[word] += count
      end
      # Emit each key with its own total; an empty input emits nothing.
      totals.each { |word, total| emit(word, count: total) }
    end
  end

  master
end
# Script entry point: starts the master for ./test/file.txt with a map_count
# of 5, logs the master's data, then keeps the process alive.

# Pathname is used below; require it explicitly instead of relying on some
# other library having loaded it.
require 'pathname'

logger = Logger.new($stdout)
master = start_master(Pathname.new('./test/file.txt'), logger, 5)
logger.info(master.data)

# start_master leaves the gRPC server on a background thread; idle here so the
# main thread does not exit.
loop { sleep 1 }