-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path: cache.py
130 lines (111 loc) · 4.42 KB
/
cache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import logging
from datetime import timedelta
from typing import Union
from obspy import read_inventory
from obspy.core.inventory import Channel, Network
from obspy.core.inventory.inventory import Inventory
from requests import HTTPError
from apps.restriction import Epoch, Restriction
from apps.redis_client import RedisClient
from config import Config
# Configure root logging: stream to stderr at INFO with a fixed-format,
# UTC-suffixed timestamp (the "+0000" is literal; %(asctime)s itself is
# rendered in local time unless logging.Formatter.converter is changed).
logging.basicConfig(
    handlers=[logging.StreamHandler()],
    level=logging.INFO,
    # Plain string — there is nothing to interpolate, so no f-string.
    format="[%(asctime)s] [0] [%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S +0000",
)
# Module-level logger, per the stdlib convention.
logger = logging.getLogger(__name__)
class Cache:
    """Harvest the channel-level station inventory from an FDSNWS-Station
    service and publish it, keyed by SEED id, to a shared Redis cache.

    The in-memory structure (``self._inv``) maps a SEED id string to a
    list of ``Epoch`` objects, each tagged with a ``Restriction`` status.
    """

    def __init__(self):
        self._config = Config()
        # SEED id -> list[Epoch]; filled by build_cache().
        self._inv = {}

    @staticmethod
    def _is_obspy_restricted(
        cha_or_net: Union[Network, Channel]
    ) -> Union[Restriction, None]:
        """Map an ObsPy ``restricted_status`` to a ``Restriction``.

        Returns ``None`` for any other status (e.g. "partial" or unset),
        so the caller can fall back to a coarser level or a default.
        """
        if cha_or_net.restricted_status == "open":
            return Restriction.OPEN
        if cha_or_net.restricted_status == "closed":
            return Restriction.RESTRICTED
        return None

    def build_cache(self):
        """Fetch the full inventory, derive per-epoch restrictions, and
        store the result in Redis under ``CACHE_INVENTORY_KEY``.

        Network errors during harvesting are logged and abort the build
        (the method returns without touching the cache).
        """
        logger.info("Getting inventory from FDSNWS-Station...")
        inventory = Inventory()
        try:
            # First pass: network codes only, to enumerate what to fetch.
            url = f"{self._config.FDSNWS_STATION_URL}?level=network"
            cat = read_inventory(url)
            logger.info(
                "Harvesting %s from %s: %s",
                len(cat.networks),
                url,
                ",".join(n.code for n in cat.networks),
            )
        except HTTPError as err:
            logger.exception(err)
            return
        try:
            # Second pass: one channel-level request per network.
            for n in cat:
                url = (
                    f"{self._config.FDSNWS_STATION_URL}?network={n.code}&level=channel"
                )
                i = read_inventory(url)
                inventory.networks += i.networks
                logger.info(
                    "Added network %s with %s stations: %s",
                    i.networks[0].code,
                    len(i.networks[0].stations),
                    ",".join(s.code for s in i.networks[0].stations),
                )
        except HTTPError as err:
            logger.exception(err)
            return
        # Walk the ObsPy inventory and build one Epoch per channel epoch.
        for net in inventory:
            for sta in net:
                for cha in sta:
                    epoch = Epoch(
                        net.code,
                        sta.code,
                        cha.location_code,
                        cha.code,
                        (cha.start_date.date if cha.start_date else None),
                        (cha.end_date.date if cha.end_date else None),
                    )
                    seed_id = epoch.seed_id
                    if seed_id not in self._inv:
                        self._inv[seed_id] = []
                    else:
                        # BUGFIX: was `if seed_id in self._inv:`, which is
                        # always true after the branch above and logged
                        # "Repeated channel" for every epoch, including the
                        # first one seen for a channel.
                        logger.debug(
                            "Repeated channel: %s %s %s",
                            seed_id,
                            epoch.start,
                            epoch.end,
                        )
                    cha_status = self._is_obspy_restricted(cha)
                    if cha_status is not None:
                        epoch.restriction = cha_status
                    else:
                        # Channel status inconclusive: go up to network level.
                        net_status = self._is_obspy_restricted(net)
                        if net_status is not None:
                            epoch.restriction = net_status
                        else:
                            logger.debug("%s defaulting to OPEN", seed_id)
                            epoch.restriction = Restriction.OPEN
                    self._inv[seed_id].append(epoch)
        for seed_id in self._inv:
            # Sort epochs by start date. None-safe key: epochs without a
            # start date sort last instead of raising TypeError (dates and
            # None are not comparable).
            self._inv[seed_id].sort(
                key=lambda epoch: (epoch.start is None, epoch.start)
            )
            # Fill a missing end date from the next epoch's start, minus
            # one day, so consecutive epochs do not overlap.
            for i in range(len(self._inv[seed_id]) - 1):
                if self._inv[seed_id][i].end is None:
                    self._inv[seed_id][i].end = self._inv[seed_id][
                        i + 1
                    ].start - timedelta(days=1)
        # Store inventory in the shared cache instance.
        rc = RedisClient(self._config.CACHE_HOST, self._config.CACHE_PORT)
        rc.set(self._config.CACHE_INVENTORY_KEY, self._inv)
        logger.info("Completed caching inventory from FDSNWS-Station")
if __name__ == "__main__":
    # Script entry point: build and publish the inventory cache.
    Cache().build_cache()