-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathassets.py
98 lines (76 loc) · 3.11 KB
/
assets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
import datetime
import json
import os
import urllib.request
import ijson
import pandas as pd
import zstandard
from dagster import MetadataValue, Output, asset
from .resources import SpacescopeResource
@asset(compute_kind="python")
def raw_datacapstats_verified_clients() -> Output[pd.DataFrame]:
    """
    Verified Clients information from Datacapstats API.

    Fetches the full verified-clients listing and flattens the nested JSON
    records into a tabular DataFrame. Attaches a 5-row markdown sample as
    Dagster metadata.
    """
    url = "https://api.datacapstats.io/api/getVerifiedClients"
    # The API wraps the payload in a top-level "data" key.
    data = pd.read_json(url, typ="series")["data"]
    df = pd.json_normalize(data)
    # NOTE(review): removed a no-op self-assignment of "allowanceArray"
    # that was present here — it had no effect on the DataFrame.
    return Output(df, metadata={"Sample": MetadataValue.md(df.sample(5).to_markdown())})
@asset(compute_kind="python")
def raw_storage_providers_location_provider_quest() -> Output[pd.DataFrame]:
    """
    Storage Providers location information from Provider Quest (https://provider.quest).

    Reads the latest synthetic-locations feed and flattens the
    "providerLocations" records into a DataFrame, attaching a 5-row
    markdown sample as Dagster metadata.
    """
    source = "https://geoip.feeds.provider.quest/synthetic-locations-latest.json"
    payload = pd.read_json(source, typ="series")
    locations = pd.json_normalize(payload["providerLocations"])
    sample_md = MetadataValue.md(locations.sample(5).to_markdown())
    return Output(locations, metadata={"Sample": sample_md})
@asset(compute_kind="API")
def raw_storage_provider_daily_power(
    spacescope_api: SpacescopeResource,
) -> Output[pd.DataFrame]:
    """
    Storage Providers daily power from Spacescope API.

    Queries the API one day at a time from Filecoin mainnet launch
    (2020-10-15) up to two days ago — presumably the API lags behind the
    current day (TODO confirm) — and concatenates the daily results into a
    single DataFrame.
    """
    FILECOIN_FIRST_DAY = datetime.date(2020, 10, 15)
    today = datetime.date.today()
    latest_day = today - datetime.timedelta(days=2)

    # Collect the per-day frames in a list and concatenate once at the end.
    # The previous pd.concat-inside-the-loop pattern re-copied the whole
    # accumulated frame every iteration (quadratic over ~1000+ days).
    daily_frames = []
    for day in pd.date_range(FILECOIN_FIRST_DAY, latest_day, freq="d"):
        power_data = spacescope_api.get_storage_provider_power(
            date=day.strftime("%Y-%m-%d"), storage_provider=None
        )
        daily_frames.append(pd.DataFrame(power_data))
    df_power_data = pd.concat(daily_frames, ignore_index=True)

    return Output(
        df_power_data,
        metadata={"Sample": MetadataValue.md(df_power_data.sample(5).to_markdown())},
    )
@asset(compute_kind="python")
def raw_filecoin_state_market_deals(context) -> None:
    """
    State Market Deals snapshot from Gliff S3 JSON.

    Downloads the zstd-compressed snapshot to /tmp, streams it through
    ijson to emit one JSON object per line (folding each deal's ID into
    the record as "DealID"), then recompresses the line-delimited output
    with the external ``zstd`` binary. Produces no in-memory asset value;
    the side effect is /tmp/ParsedStateMarketDeals.json.zst on disk.
    """
    input_path = "/tmp/StateMarketDeals.json.zst"
    output_path = "/tmp/ParsedStateMarketDeals.json"

    urllib.request.urlretrieve(
        "https://marketdeals.s3.amazonaws.com/StateMarketDeals.json.zst",
        input_path,
    )
    context.log.info("Downloaded StateMarketDeals.json.zst")

    dctx = zstandard.ZstdDecompressor()
    # Equivalent of:
    # jq --stream -c 'fromstream(1|truncate_stream(inputs))' /tmp/StateMarketDeals.json.zst > /tmp/ParsedStateMarketDeals.json
    with open(input_path, "rb") as ifh, open(output_path, "wb") as ofh:
        # Fixed: the stream reader was previously never closed; the context
        # manager releases the decompressor's native resources even if
        # parsing raises mid-stream.
        with dctx.stream_reader(ifh) as reader:
            for deal_id, deal in ijson.kvitems(reader, ""):
                deal["DealID"] = deal_id
                ofh.write(json.dumps(deal).encode("utf-8") + b"\n")
    context.log.info("Decompressed and parsed StateMarketDeals.json.zst")

    # Remove the downloaded input to free disk space before recompressing.
    os.remove("/tmp/StateMarketDeals.json.zst")
    # Compress the parsed file; --rm deletes the uncompressed input on
    # success. Static command string, so os.system carries no injection
    # risk here.
    os.system(
        "zstd --rm -q -f -T0 /tmp/ParsedStateMarketDeals.json -o /tmp/ParsedStateMarketDeals.json.zst"
    )