-
Notifications
You must be signed in to change notification settings - Fork 194
/
Copy pathckan_pycsw.py
291 lines (223 loc) · 8.67 KB
/
ckan_pycsw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
import sys
import logging
import datetime
import io
import os
import argparse
from configparser import SafeConfigParser
import requests
from lxml import etree
from pycsw.core import metadata, repository, util
import pycsw.core.config
import pycsw.core.admin
# Module logger; the root handler prints bare messages at INFO and above.
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format="%(message)s")
def setup_db(pycsw_config):
    """Set up the pycsw records table, including the CKAN bookkeeping columns.

    Reads ``repository/database`` and ``repository/table`` (default
    ``records``) from ``pycsw_config`` and delegates to
    ``pycsw.core.admin.setup_db`` with two extra columns:

    * ``ckan_id`` (indexed) - the CKAN dataset id a record was built from,
      used by ``load`` to match records with datasets;
    * ``ckan_modified`` - the dataset's CKAN ``metadata_modified`` value,
      used by ``load`` to detect changed datasets.

    :param pycsw_config: parsed pycsw configuration.
    """
    from sqlalchemy import Column, Text

    database = pycsw_config.get("repository", "database")
    # In Python 3 configparser, raw/vars/fallback are keyword-only: passing the
    # default positionally raises TypeError, so it must go through fallback=.
    table_name = pycsw_config.get("repository", "table", fallback="records")

    ckan_columns = [
        Column("ckan_id", Text, index=True),
        Column("ckan_modified", Text),
    ]

    pycsw.core.admin.setup_db(
        database,
        table_name,
        "",
        create_plpythonu_functions=False,
        extra_columns=ckan_columns,
    )
def set_keywords(pycsw_config_file, pycsw_config, ckan_url, limit=20):
    """Set the pycsw service metadata keywords from the top ``limit`` CKAN tags.

    Fetches ``<ckan_url>api/tag_counts``, de-duplicates the entries, keeps
    the ``limit`` entries with the highest counts and stores their names as
    a comma-separated list in ``metadata:main/identification_keywords``.
    The whole configuration is then written back to ``pycsw_config_file``.

    NOTE: configparser does not preserve comments from the original file
    when writing it back.

    :param pycsw_config_file: path the updated configuration is written to.
    :param pycsw_config: parsed configuration object; modified in place.
    :param ckan_url: base CKAN URL, expected to end with ``/``.
    :param limit: maximum number of tags to keep.
    :raises requests.HTTPError: if the tag API returns an error status.
    """
    log.info("Fetching tags from %s", ckan_url)
    url = ckan_url + "api/tag_counts"
    response = requests.get(url)
    response.raise_for_status()
    tags = response.json()

    log.info("Deriving top %d tags", limit)
    # Entries are indexed as [name, count] below; presumably that is the
    # shape returned by CKAN's tag_counts API - TODO confirm.
    unique_tags = {tuple(tag) for tag in tags}
    top_tags = sorted(unique_tags, key=lambda tag: tag[1], reverse=True)[:limit]
    keywords = ",".join(str(tag[0]) for tag in top_tags)

    log.info("Setting tags in pycsw configuration file %s", pycsw_config_file)
    pycsw_config.set("metadata:main", "identification_keywords", keywords)
    # ConfigParser.write emits str, so the file must be opened in text mode
    # ("wb" raises TypeError on Python 3).
    with open(pycsw_config_file, "w") as configfile:
        pycsw_config.write(configfile)
def _gather_ckan_records(ckan_url, page_size=1000):
    """Page through the CKAN search API and collect every harvested dataset.

    Only datasets that have a ``harvest_object_id`` extra are requested (the
    ``q`` clause of the query below).

    :param ckan_url: base CKAN URL, expected to end with ``/``.
    :param page_size: number of results requested per API call.
    :returns: dict mapping CKAN dataset id to a dict with the keys
        ``metadata_modified``, ``harvest_object_id`` and ``source``.
    :raises requests.HTTPError: if an API call returns an error status.
    :raises RuntimeError: if an API response body is not a JSON object.
    """
    query = 'api/search/dataset?qjson={"fl":"id,metadata_modified,extras_harvest_object_id,extras_metadata_source", "q":"harvest_object_id:[\\"\\" TO *]", "limit":%d, "start":%d}'
    start = 0
    gathered_records = {}
    while True:
        response = requests.get(ckan_url + query % (page_size, start))
        # Fail loudly on HTTP errors: an error body without "results" would
        # otherwise end paging early, and load() would then treat every
        # dataset not yet gathered as deleted.
        response.raise_for_status()
        listing = response.json()
        if not isinstance(listing, dict):
            raise RuntimeError("Wrong API response: %s" % listing)
        results = listing.get("results")
        if not results:
            break
        for result in results:
            extras = result["extras"]
            gathered_records[result["id"]] = {
                "metadata_modified": result["metadata_modified"],
                "harvest_object_id": extras["harvest_object_id"],
                "source": extras.get("metadata_source"),
            }
        start += page_size
        log.debug("Gathered %s", len(gathered_records))
    return gathered_records


def _get_existing_records(repo):
    """Return ``{ckan_id: ckan_modified}`` for every record stored in ``repo``."""
    try:
        rows = repo.session.query(repo.dataset.ckan_id, repo.dataset.ckan_modified)
        return {ckan_id: ckan_modified for ckan_id, ckan_modified in rows}
    finally:
        repo.session.close()


def load(pycsw_config, ckan_url):
    """Synchronise the pycsw repository with the harvested datasets of a CKAN site.

    Datasets present in CKAN but not in the repository are inserted, records
    whose CKAN dataset disappeared are deleted, and records whose dataset has
    a newer ``metadata_modified`` (or no stored value) are updated.

    :param pycsw_config: parsed pycsw configuration.
    :param ckan_url: base CKAN URL, expected to end with ``/``.
    :raises RuntimeError: if the CKAN API answers with something unexpected,
        or if updating a changed record fails.
    """
    database = pycsw_config.get("repository", "database")
    # raw/vars/fallback are keyword-only in Python 3 configparser.
    table_name = pycsw_config.get("repository", "table", fallback="records")

    context = pycsw.core.config.StaticContext()
    repo = repository.Repository(database, context, table=table_name)

    log.info(
        "Started gathering CKAN datasets identifiers: {0}".format(
            str(datetime.datetime.now())
        )
    )
    gathered_records = _gather_ckan_records(ckan_url)
    log.info(
        "Gather finished ({0} datasets): {1}".format(
            len(gathered_records), str(datetime.datetime.now())
        )
    )

    existing_records = _get_existing_records(repo)

    gathered_ids = set(gathered_records)
    existing_ids = set(existing_records)
    new = gathered_ids - existing_ids
    deleted = existing_ids - gathered_ids
    # Timestamps are compared as strings (assumes both sides use the same
    # ISO-8601 format - TODO confirm). A NULL ckan_modified cannot be compared
    # with a str in Python 3, so such records are simply refreshed.
    changed = {
        ckan_id
        for ckan_id in gathered_ids & existing_ids
        if existing_records[ckan_id] is None
        or gathered_records[ckan_id]["metadata_modified"] > existing_records[ckan_id]
    }

    for ckan_id in deleted:
        try:
            repo.session.begin()
            repo.session.query(repo.dataset.ckan_id).filter_by(ckan_id=ckan_id).delete()
            log.info("Deleted %s", ckan_id)
            repo.session.commit()
        except Exception:
            repo.session.rollback()
            raise

    for ckan_id in new:
        record = get_record(context, repo, ckan_url, ckan_id, gathered_records[ckan_id])
        if not record:
            log.info("Skipped record %s", ckan_id)
            continue
        try:
            repo.insert(record, "local", util.get_today_and_now())
            log.info("Inserted %s", ckan_id)
        except Exception as err:
            # Best effort: one bad record must not abort the whole load.
            log.error("ERROR: not inserted %s Error:%s", ckan_id, err)

    for ckan_id in changed:
        record = get_record(context, repo, ckan_url, ckan_id, gathered_records[ckan_id])
        if not record:
            continue
        # Map every mapped attribute of the freshly parsed record onto its
        # column, skipping SQLAlchemy's internal state attribute.
        update_dict = {
            getattr(repo.dataset, key): getattr(record, key)
            for key in record.__dict__
            if key != "_sa_instance_state"
        }
        try:
            repo.session.begin()
            repo.session.query(repo.dataset).filter_by(ckan_id=ckan_id).update(
                update_dict
            )
            repo.session.commit()
            log.info("Changed %s", ckan_id)
        except Exception as err:
            repo.session.rollback()
            raise RuntimeError("ERROR: %s" % str(err)) from err
def clear(pycsw_config):
    """Delete every row from the pycsw records table; the table itself is kept.

    :param pycsw_config: parsed pycsw configuration; ``repository/database``
        and ``repository/table`` (default ``records``) are read from it.
    """
    from sqlalchemy import create_engine, MetaData, Table

    database = pycsw_config.get("repository", "database")
    # raw/vars/fallback are keyword-only in Python 3 configparser.
    table_name = pycsw_config.get("repository", "table", fallback="records")

    log.debug("Creating engine")
    engine = create_engine(database)
    # No column definitions are needed for an unconditional DELETE.
    records = Table(table_name, MetaData())
    # engine.begin() commits on success and rolls back on error. Unlike bound
    # MetaData + Executable.execute(), it also works on SQLAlchemy 2.x.
    try:
        with engine.begin() as connection:
            connection.execute(records.delete())
    finally:
        engine.dispose()
    log.info("Table cleared")
def get_record(context, repo, ckan_url, ckan_id, ckan_info):
    """Build a pycsw record for one harvested CKAN dataset.

    Downloads the harvest object ``<ckan_url>harvest/object/<harvest_object_id>``,
    parses it as XML and hands it to ``pycsw.core.metadata.parse_record``.

    :param context: pycsw static context used for parsing.
    :param repo: pycsw repository the record is parsed for.
    :param ckan_url: base CKAN URL, expected to end with ``/``.
    :param ckan_id: CKAN dataset id; stored on the record as ``ckan_id`` and
        used as its identifier when the document provides none.
    :param ckan_info: dict with ``harvest_object_id``, ``source`` and
        ``metadata_modified`` for the dataset.
    :returns: the parsed record, or ``None`` when the dataset is skipped
        (ArcGIS source) or cannot be fetched/parsed (the reason is logged).
    """
    # ArcGIS-sourced datasets are never loaded: decide before any HTTP work.
    if ckan_info["source"] == "arcgis":
        return None

    # Format only the path, so a '%' inside ckan_url cannot break formatting.
    url = ckan_url + "harvest/object/%s" % ckan_info["harvest_object_id"]
    response = requests.get(url)
    if not response.ok:
        log.error(
            "Could not fetch harvest object for %s, HTTP status: %s",
            ckan_id,
            response.status_code,
        )
        return None

    # The document comes from a remote server: do not expand entities (XXE).
    parser = etree.XMLParser(resolve_entities=False)
    try:
        xml = etree.parse(io.BytesIO(response.content), parser)
    except Exception as err:
        log.error("Could not parse xml doc from %s, Error: %s" % (ckan_id, err))
        return None

    try:
        record = metadata.parse_record(context, xml, repo)[0]
    except Exception as err:
        log.error("Could not extract metadata from %s, Error: %s" % (ckan_id, err))
        return None

    if not record.identifier:
        record.identifier = ckan_id
    record.ckan_id = ckan_id
    record.ckan_modified = ckan_info["metadata_modified"]
    return record
# Command-line help text, also passed to argparse as its ``usage`` string.
usage = """
Manages the CKAN-pycsw integration

    python ckan_pycsw.py setup [-p <path to pycsw config>]
        Sets up the necessary pycsw table on the db.

    python ckan_pycsw.py set_keywords [-p <path to pycsw config>] -u <CKAN URL>
        Sets pycsw server metadata keywords from CKAN site tag list.

    python ckan_pycsw.py load [-p <path to pycsw config>] -u <CKAN URL>
        Loads CKAN datasets as records into the pycsw db.

    python ckan_pycsw.py clear [-p <path to pycsw config>]
        Removes all records from the pycsw table.

All commands require the pycsw configuration file. By default it will try
to find a file called 'default.cfg' in the current working directory, but
you'll probably need to provide the actual location via the -p option:

    python ckan_pycsw.py setup -p /etc/ckan/default/pycsw.cfg

The load and set_keywords commands require a CKAN URL to pull data from:

    python ckan_pycsw.py load -p /etc/ckan/default/pycsw.cfg -u http://localhost
"""
def _load_config(file_path):
    """Read the pycsw configuration file at ``file_path``.

    :param file_path: path to the config file; relative paths are resolved
        against the current working directory.
    :returns: a ``ConfigParser`` holding the file's contents.
    :raises AssertionError: if the file does not exist or cannot be read.
    """
    # SafeConfigParser was only a deprecated alias of ConfigParser and was
    # removed in Python 3.12.
    from configparser import ConfigParser

    abs_path = os.path.abspath(file_path)
    if not os.path.exists(abs_path):
        raise AssertionError("pycsw config file {0} does not exist.".format(abs_path))
    config = ConfigParser()
    # read() silently skips files it cannot open (e.g. a directory) and
    # returns the list of files actually parsed.
    if not config.read(abs_path):
        raise AssertionError("pycsw config file {0} could not be read.".format(abs_path))
    return config
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        # First non-blank line of the usage text. (The previous
        # '"\n".split(usage)[0]' split the newline by the usage text and
        # therefore always produced "\n".)
        description=usage.strip().splitlines()[0],
        usage=usage,
    )
    parser.add_argument("command", help="Command to perform")
    parser.add_argument(
        "-p",
        "--pycsw_config",
        action="store",
        default="default.cfg",
        help="pycsw config file to use.",
    )
    parser.add_argument(
        "-u",
        "--ckan_url",
        action="store",
        help="CKAN instance to import the datasets from.",
    )

    if len(sys.argv) <= 1:
        parser.print_usage()
        sys.exit(1)

    arg = parser.parse_args()
    pycsw_config = _load_config(arg.pycsw_config)

    if arg.command == "setup":
        setup_db(pycsw_config)
    elif arg.command in ("load", "set_keywords"):
        if not arg.ckan_url:
            # Report a usage error (exit status 2) instead of a traceback.
            parser.error("You need to provide a CKAN URL with -u or --ckan_url")
        # Normalise to exactly one trailing slash; API paths are appended to it.
        ckan_url = arg.ckan_url.rstrip("/") + "/"
        if arg.command == "load":
            load(pycsw_config, ckan_url)
        else:
            set_keywords(arg.pycsw_config, pycsw_config, ckan_url)
    elif arg.command == "clear":
        clear(pycsw_config)
    else:
        print("Unknown command {0}".format(arg.command))
        sys.exit(1)