-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrun-consumer_calibration.py
141 lines (112 loc) · 5.04 KB
/
run-consumer_calibration.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/python
# -*- coding: UTF-8
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
# Authors:
# Michael Berg-Mohnicke <[email protected]>
#
# Maintainers:
# Currently maintained by the authors.
#
# This file has been created at the Institute of
# Landscape Systems Analysis at the ZALF.
# Copyright (C: Leibniz Centre for Agricultural Landscape Research (ZALF)
import capnp
from collections import defaultdict
from datetime import datetime
import json
import os
from pathlib import Path
import sys
import zmq
PATH_TO_REPO = Path(os.path.realpath(__file__)).parent
PATH_TO_MAS_INFRASTRUCTURE_REPO = PATH_TO_REPO / "../mas-infrastructure"
PATH_TO_PYTHON_CODE = PATH_TO_MAS_INFRASTRUCTURE_REPO / "src/python"
if str(PATH_TO_PYTHON_CODE) not in sys.path:
sys.path.insert(1, str(PATH_TO_PYTHON_CODE))
from pkgs.common import common
PATH_TO_CAPNP_SCHEMAS = (PATH_TO_MAS_INFRASTRUCTURE_REPO / "capnproto_schemas").resolve()
abs_imports = [str(PATH_TO_CAPNP_SCHEMAS)]
fbp_capnp = capnp.load(str(PATH_TO_CAPNP_SCHEMAS / "fbp.capnp"), imports=abs_imports)
PATHS = {
"remoteConsumer-remoteMonica": {
"path-to-data-dir": "./data/",
"path-to-output-dir": "/out/out/",
"path-to-csv-output-dir": "/out/csv-out/"
}
}
def run_consumer(server=None, port=None):
"""collect data from workers"""
config = {
"mode": "remoteConsumer-remoteMonica",
"port": port if port else "7777", # local 7778, remote 7777
"server": server if server else "login01.cluster.zalf.de",
"writer_sr": None,
"path_to_out": "out/",
"timeout": 600000 # 10min
}
common.update_config(config, sys.argv, print_config=True, allow_new_keys=False)
path_to_out_file = config["path_to_out"] + "/consumer.out"
if not os.path.exists(config["path_to_out"]):
try:
os.makedirs(config["path_to_out"])
except OSError:
print("run-calibration-consumer.py: Couldn't create dir:", config["path_to_out"], "!")
with open(path_to_out_file, "a") as _:
_.write(f"config: {config}\n")
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.connect("tcp://" + config["server"] + ":" + config["port"])
socket.RCVTIMEO = config["timeout"]
nuts3_region_id_to_year_to_yields = defaultdict(lambda: defaultdict(list))
conman = common.ConnectionManager()
writer = conman.try_connect(config["writer_sr"], cast_as=fbp_capnp.Channel.Writer, retry_secs=1) #None
envs_received = 0
no_of_envs_expected = None
while True:
try:
msg: dict = socket.recv_json() # encoding="latin-1"
custom_id = msg["customId"]
if "no_of_sent_envs" in custom_id:
no_of_envs_expected = custom_id["no_of_sent_envs"]
else:
envs_received += 1
#with open(path_to_out_file, "a") as _:
# _.write(f"received result customId: {custom_id}\n")
#print("received result customId:", custom_id)
nuts3_region_id = custom_id["nuts3_region_id"]
for data in msg.get("data", []):
results = data.get("results", [])
for vals in results:
if "Year" in vals:
nuts3_region_id_to_year_to_yields[nuts3_region_id][int(vals["Year"])].append(vals["Yield"]*1.16)
if no_of_envs_expected == envs_received and writer:
with open(path_to_out_file, "a") as _:
_.write(f"{datetime.now()} last expected env received\n")
#print("last expected env received")
nuts3_region_id_and_year_to_avg_yield = {}
for nuts3_region_id, rest in nuts3_region_id_to_year_to_yields.items():
for year, yields in rest.items():
no_of_yields = len(yields)
if no_of_yields > 0:
nuts3_region_id_and_year_to_avg_yield[f"{nuts3_region_id}|{year}"] = sum(yields) / no_of_yields
out_ip = fbp_capnp.IP.new_message(content=json.dumps(nuts3_region_id_and_year_to_avg_yield))
writer.write(value=out_ip).wait()
# reset and wait for next round
nuts3_region_id_to_year_to_yields.clear()
no_of_envs_expected = None
envs_received = 0
except zmq.error.Again as _e:
with open(path_to_out_file, "a") as _:
_.write(f"no response from the server (with {socket.RCVTIMEO} ms timeout)\n")
print('no response from the server (with "timeout"=%d ms) ' % socket.RCVTIMEO)
continue
except Exception as e:
with open(path_to_out_file, "a") as _:
_.write(f"Exception: {e}\n")
print("Exception:", e)
break
#print("exiting run_consumer()")
if __name__ == "__main__":
run_consumer()