-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathapi.py
277 lines (243 loc) · 12.8 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
import logging
from typing import TYPE_CHECKING, AnyStr, Optional
import six
from beaker.cache import cache_region
from cornice.service import get_services
from cornice_swagger import CorniceSwagger
from pyramid.authentication import Authenticated, IAuthenticationPolicy
from pyramid.exceptions import PredicateMismatch
from pyramid.httpexceptions import (
HTTPException,
HTTPForbidden,
HTTPMethodNotAllowed,
HTTPNotFound,
HTTPOk,
HTTPServerError,
HTTPUnauthorized
)
from pyramid.renderers import render_to_response
from pyramid.request import Request
from pyramid.response import Response
from pyramid.settings import asbool
from simplejson import JSONDecodeError
from six.moves.urllib.parse import urlparse
from weaver import __meta__
from weaver.formats import (
CONTENT_TYPE_APP_JSON,
CONTENT_TYPE_APP_PDF,
CONTENT_TYPE_APP_XML,
CONTENT_TYPE_TEXT_HTML
)
from weaver.owsexceptions import OWSException
from weaver.utils import get_header, get_settings, get_weaver_url
from weaver.wps_restapi import swagger_definitions as sd
from weaver.wps_restapi.colander_extras import CustomTypeConversionDispatcher
from weaver.wps_restapi.utils import OUTPUT_FORMAT_JSON, get_wps_restapi_base_url, wps_restapi_base_path
if TYPE_CHECKING:
from weaver.typedefs import JSON, SettingsType # noqa: F401
LOGGER = logging.getLogger(__name__)
@sd.api_frontpage_service.get(tags=[sd.TAG_API], renderer=OUTPUT_FORMAT_JSON,
schema=sd.FrontpageEndpoint(), response_schemas=sd.get_api_frontpage_responses)
def api_frontpage(request):
"""Frontpage of Weaver."""
settings = get_settings(request)
return api_frontpage_body(settings)
@cache_region("doc", sd.api_frontpage_service.name)
def api_frontpage_body(settings):
# type: (SettingsType) -> JSON
"""Generates the JSON body describing the Weaver API and documentation references."""
# import here to avoid circular import errors
from weaver.config import get_weaver_configuration
from weaver.wps import get_wps_url
weaver_url = get_weaver_url(settings)
weaver_config = get_weaver_configuration(settings)
weaver_api = asbool(settings.get("weaver.wps_restapi"))
weaver_api_url = get_wps_restapi_base_url(settings) if weaver_api else None
weaver_api_def = weaver_api_url + sd.api_swagger_ui_service.path if weaver_api else None
weaver_api_spec = weaver_api_url + sd.api_swagger_json_service.path if weaver_api else None
weaver_wps = asbool(settings.get("weaver.wps"))
weaver_wps_url = get_wps_url(settings) if weaver_wps else None
weaver_conform_url = weaver_url + sd.api_conformance_service.path
weaver_process_url = weaver_url + sd.processes_service.path
weaver_links = [
{"href": weaver_url, "rel": "self", "type": CONTENT_TYPE_APP_JSON, "title": "This document"},
{"href": weaver_conform_url, "rel": "conformance", "type": CONTENT_TYPE_APP_JSON,
"title": "WPS conformance classes implemented by this service."},
]
if weaver_api:
weaver_links.extend([
{"href": weaver_api_url,
"rel": "service", "type": CONTENT_TYPE_APP_JSON,
"title": "WPS REST API endpoint of this service."},
{"href": weaver_api_def,
"rel": "swagger", "type": CONTENT_TYPE_TEXT_HTML,
"title": "WPS REST API definition of this service."},
{"href": weaver_api_spec,
"rel": "OpenAPI", "type": CONTENT_TYPE_APP_JSON,
"title": "WPS REST API specification of this service."},
{"href": "https://raw.githubusercontent.com/opengeospatial/wps-rest-binding/develop/docs/18-062.pdf",
"rel": "documentation", "type": CONTENT_TYPE_APP_PDF,
"title": "API documentation about this service."},
{"href": "https://app.swaggerhub.com/apis/geoprocessing/WPS/",
"rel": "wps-rest-swagger", "type": CONTENT_TYPE_TEXT_HTML,
"title": "API reference specification of this service."},
{"href": weaver_process_url,
"rel": "processes", "type": CONTENT_TYPE_APP_JSON,
"title": "Processes offered by this service."}
])
if weaver_wps:
weaver_links.extend([
{"href": weaver_wps,
"rel": "wps", "type": CONTENT_TYPE_APP_XML,
"title": "WPS 1.0.0/2.0 XML endpoint of this service."},
{"href": "http://docs.opengeospatial.org/is/14-065/14-065.html",
"rel": "wps-xml-specification", "type": CONTENT_TYPE_TEXT_HTML,
"title": "WPS 1.0.0/2.0 definition of this service."},
{"href": "http://schemas.opengis.net/wps/",
"rel": "wps-xml-schema", "type": CONTENT_TYPE_APP_XML,
"title": "WPS 1.0.0/2.0 XML validation schemas."}
])
return {
"message": "Weaver Information",
"configuration": weaver_config,
"parameters": [
{"name": "api", "enabled": weaver_api,
"url": weaver_api_url,
"api": weaver_api_def},
{"name": "wps", "enabled": weaver_wps,
"url": weaver_wps_url},
],
"links": weaver_links,
}
@sd.api_versions_service.get(tags=[sd.TAG_API], renderer=OUTPUT_FORMAT_JSON,
schema=sd.VersionsEndpoint(), response_schemas=sd.get_api_versions_responses)
def api_versions(request): # noqa: F811
# type: (Request) -> HTTPException
"""Weaver versions information."""
weaver_info = {"name": "weaver", "version": __meta__.__version__, "type": "api"}
return HTTPOk(json={"versions": [weaver_info]})
@sd.api_conformance_service.get(tags=[sd.TAG_API], renderer=OUTPUT_FORMAT_JSON,
schema=sd.ConformanceEndpoint(), response_schemas=sd.get_api_conformance_responses)
def api_conformance(request): # noqa: F811
# type: (Request) -> HTTPException
"""Weaver specification conformance information."""
# TODO: follow updates with https://github.com/geopython/pygeoapi/issues/198
conformance = {"conformsTo": [
# "http://www.opengis.net/spec/wfs-1/3.0/req/core",
# "http://www.opengis.net/spec/wfs-1/3.0/req/oas30",
# "http://www.opengis.net/spec/wfs-1/3.0/req/html",
# "http://www.opengis.net/spec/wfs-1/3.0/req/geojson",
"http://schemas.opengis.net/wps/1.0.0/",
"http://schemas.opengis.net/wps/2.0/",
"http://www.opengis.net/spec/WPS/2.0/req/service/binding/rest-json/core",
# "http://www.opengis.net/spec/WPS/2.0/req/service/binding/rest-json/oas30",
# "http://www.opengis.net/spec/WPS/2.0/req/service/binding/rest-json/html"
"https://github.com/opengeospatial/wps-rest-binding",
]}
return HTTPOk(json=conformance)
@sd.api_swagger_json_service.get(tags=[sd.TAG_API], renderer=OUTPUT_FORMAT_JSON,
schema=sd.SwaggerJSONEndpoint(), response_schemas=sd.get_api_swagger_json_responses)
def api_swagger_json(request, use_docstring_summary=True):
# type: (Request, bool) -> dict
"""Weaver REST API schema generation in JSON format."""
CorniceSwagger.type_converter = CustomTypeConversionDispatcher
swagger = CorniceSwagger(get_services())
# function docstrings are used to create the route's summary in Swagger-UI
swagger.summary_docstrings = use_docstring_summary
swagger_base_spec = {"schemes": [request.scheme]}
# obtain 'server' host and api-base-path, which doesn't correspond necessarily to the app's host and path
# ex: 'server' adds '/weaver' with proxy redirect before API routes
weaver_server_url = get_weaver_url(request)
LOGGER.debug("Request app URL: [%s]", request.url)
LOGGER.debug("Weaver config URL: [%s]", weaver_server_url)
if weaver_server_url:
weaver_parsed_url = urlparse(weaver_server_url)
swagger_base_spec["host"] = weaver_parsed_url.netloc
swagger_base_path = weaver_parsed_url.path
else:
swagger_base_spec["host"] = request.host
swagger_base_path = sd.api_frontpage_service.path
swagger.swagger = swagger_base_spec
swagger_json = swagger.generate(title=sd.API_TITLE, version=__meta__.__version__,
base_path=swagger_base_path, openapi_spec=3)
swagger_json["externalDocs"] = sd.API_DOCS
return swagger_json
@sd.api_swagger_ui_service.get(tags=[sd.TAG_API],
schema=sd.SwaggerUIEndpoint(), response_schemas=sd.get_api_swagger_ui_responses)
def api_swagger_ui(request):
"""Weaver REST API swagger-ui schema documentation (this page)."""
json_path = wps_restapi_base_path(request.registry.settings) + sd.api_swagger_json_service.path
json_path = json_path.lstrip("/") # if path starts by '/', swagger-ui doesn't find it on remote
data_mako = {"api_title": sd.API_TITLE, "api_swagger_json_path": json_path, "api_version": __meta__.__version__}
return render_to_response("templates/swagger_ui.mako", data_mako, request=request)
def get_request_info(request, detail=None):
# type: (Request, Optional[AnyStr]) -> JSON
"""Provided additional response details based on the request and execution stack on failure."""
content = {u"route": str(request.upath_info), u"url": str(request.url), u"method": request.method}
if isinstance(detail, six.string_types):
content.update({"detail": detail})
if hasattr(request, "exception"):
# handle error raised simply by checking for 'json' property in python 3 when body is invalid
has_json = False
try:
has_json = hasattr(request.exception, "json")
except JSONDecodeError:
pass
if has_json and isinstance(request.exception.json, dict):
content.update(request.exception.json)
elif isinstance(request.exception, HTTPServerError) and hasattr(request.exception, "message"):
content.update({u"exception": str(request.exception.message)})
elif hasattr(request, "matchdict"):
if request.matchdict is not None and request.matchdict != "":
content.update(request.matchdict)
return content
def ows_json_format(function):
"""Decorator that adds additional detail in the response's JSON body if this is the returned content-type."""
def format_response_details(response, request):
# type: (Response, Request) -> HTTPException
http_response = function(request)
http_headers = get_header("Content-Type", http_response.headers) or []
req_headers = get_header("Accept", request.headers) or []
if any([CONTENT_TYPE_APP_JSON in http_headers, CONTENT_TYPE_APP_JSON in req_headers]):
body = OWSException.json_formatter(http_response.status, response.message or "",
http_response.title, request.environ)
body["detail"] = get_request_info(request)
http_response._json = body
if http_response.status_code != response.status_code:
raise http_response # re-raise if code was fixed
return http_response
return format_response_details
@ows_json_format
def not_found_or_method_not_allowed(request):
"""
Overrides the default is HTTPNotFound [404] by appropriate HTTPMethodNotAllowed [405] when applicable.
Not found response can correspond to underlying process operation not finding a required item, or a completely
unknown route (path did not match any existing API definition).
Method not allowed is more specific to the case where the path matches an existing API route, but the specific
request method (GET, POST, etc.) is not allowed on this path.
Without this fix, both situations return [404] regardless.
"""
if isinstance(request.exception, PredicateMismatch) and \
request.method not in request.exception._safe_methods: # noqa: W0212
http_err = HTTPMethodNotAllowed
http_msg = "" # auto-generated by HTTPMethodNotAllowed
else:
http_err = HTTPNotFound
http_msg = str(request.exception)
return http_err(http_msg)
@ows_json_format
def unauthorized_or_forbidden(request):
"""
Overrides the default is HTTPForbidden [403] by appropriate HTTPUnauthorized [401] when applicable.
Unauthorized response is for restricted user access according to credentials and/or authorization headers.
Forbidden response is for operation refused by the underlying process operations.
Without this fix, both situations return [403] regardless.
.. seealso::
- http://www.restapitutorial.com/httpstatuscodes.html
"""
authn_policy = request.registry.queryUtility(IAuthenticationPolicy)
if authn_policy:
principals = authn_policy.effective_principals(request)
if Authenticated not in principals:
return HTTPUnauthorized("Unauthorized access to this resource.")
return HTTPForbidden("Forbidden operation under this resource.")