Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1259] verify authentication on startup of python client #1557

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 52 additions & 7 deletions streampipes-client-python/streampipes/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@

from requests import Session
from streampipes.client.config import StreamPipesClientConfig
from streampipes.endpoint.api import DataLakeMeasureEndpoint, DataStreamEndpoint
from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.endpoint import APIEndpoint
from streampipes.endpoint.api import (
DataLakeMeasureEndpoint,
DataStreamEndpoint,
VersionEndpoint,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -126,6 +130,36 @@ def __init__(
# name of the endpoint needs to be consistent with the Java client
self.dataLakeMeasureApi = DataLakeMeasureEndpoint(parent_client=self)
self.dataStreamApi = DataStreamEndpoint(parent_client=self)
self.versionApi = VersionEndpoint(parent_client=self)

self.server_version = self._get_server_version()

def _get_server_version(self) -> str:
"""Connects to the StreamPipes server and retrieves its version.

In addition to querying the server version, this method implicitly checks the specified credentials.

Returns
-------
sp_version: str
version of the connected StreamPipes instance

"""

# retrieve metadata from the API via the Streampipes server
# as a side effect, the specified credentials are also tested to ensure that authentication is successful.
version_dict = self.versionApi.get(identifier="").to_dict(use_source_names=False)

# remove SNAPSHOT-suffix if present
sp_version = version_dict["backend_version"].replace("-SNAPSHOT", "")

logger.info(
"The StreamPipes version was successfully retrieved from the backend: %s. "
"By means of that, authentication via the provided credentials is also tested successfully.",
sp_version,
)

return sp_version

@staticmethod
def _set_up_logging(logging_level: int) -> None:
Expand Down Expand Up @@ -228,14 +262,25 @@ def describe(self) -> None:
"""

# get all endpoints of this client
available_endpoints = [
available_endpoints = {
attr_name for attr_name in dir(self) if isinstance(self.__getattribute__(attr_name), APIEndpoint)
]
}

# remove endpoints that are not suitable for the describe method
# this is mainly due to not providing the `all()` method
available_endpoints = available_endpoints.symmetric_difference(
{
"versionApi",
}
)

# ensure deterministic order
ordered_available_endpoints = sorted(available_endpoints)

# collect the number of available resources per endpoint
endpoint_stats = {
(all_items := self.__getattribute__(endpoint_name).all()).__class__.__name__: len(all_items)
for endpoint_name in available_endpoints
for endpoint_name in ordered_available_endpoints
}

# sort the endpoints descending based on the number of resources
Expand All @@ -247,10 +292,10 @@ def describe(self) -> None:
f"\nHi there!\n"
f"You are connected to a StreamPipes instance running at "
f"{'http://' if self.client_config.https_disabled else 'https://'}"
f"{self.client_config.host_address}:{self.client_config.port}.\n"
f"{self.client_config.host_address}:{self.client_config.port} with version {self.server_version}.\n"
f"The following StreamPipes resources are available with this client:\n"
)

endpoint_stats_message = "\n".join(f"{count}x {name}" for name, count in sorted_endpoint_stats.items())

print(base_message + endpoint_stats_message)
logger.info(base_message + endpoint_stats_message)
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

from .data_lake_measure import DataLakeMeasureEndpoint
from .data_stream import DataStreamEndpoint
from .version import VersionEndpoint

__all__ = [
"DataLakeMeasureEndpoint",
"DataStreamEndpoint",
"VersionEndpoint",
]
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.model.container import DataLakeMeasures
from streampipes.model.container.resource_container import ResourceContainer
from streampipes.model.resource.query_result import QueryResult

__all__ = [
"DataLakeMeasureEndpoint",
]

from streampipes.model.resource.query_result import QueryResult


class StreamPipesQueryValidationError(Exception):
"""A custom exception to be raised when the validation of query parameter
Expand Down Expand Up @@ -159,9 +158,9 @@ def build_query_string(self) -> str:
class DataLakeMeasureEndpoint(APIEndpoint):
"""Implementation of the DataLakeMeasure endpoint.

This endpoint provides an interfact to all data stored in the StreamPipes data lake.
This endpoint provides an interface to all data stored in the StreamPipes data lake.

Consequently, it allows uerying metadata about available data sets (see `all()` method).
Consequently, it allows querying metadata about available data sets (see `all()` method).
The metadata is returned as an instance of [`DataLakeMeasures`][streampipes.model.container.DataLakeMeasures].

In addition, the endpoint provides direct access to the data stored in the data laka by querying a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@
"""
Specific implementation of the StreamPipes API's data stream endpoints.
"""
from typing import Tuple, Type

from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.model.container import DataStreams

__all__ = [
"DataStreamEndpoint",
]
from typing import Tuple, Type

from streampipes.endpoint.endpoint import APIEndpoint
from streampipes.model.container import DataStreams
from streampipes.model.container.resource_container import ResourceContainer


Expand Down
130 changes: 130 additions & 0 deletions streampipes-client-python/streampipes/endpoint/api/version.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Specific implementation of the StreamPipes API's version endpoint.
"""

__all__ = [
"VersionEndpoint",
]

from typing import Tuple, Type

from streampipes.endpoint import APIEndpoint
from streampipes.model.container import Versions
from streampipes.model.container.resource_container import ResourceContainer
from streampipes.model.resource.resource import Resource


class VersionEndpoint(APIEndpoint):
"""Implementation of the Versions endpoint.

This endpoint provides metadata about the StreamPipes version of the connected instance.
It only allows to apply the `get()` method with an empty string as identifier.

Parameters
----------
parent_client: StreamPipesClient
The instance of [StreamPipesClient][streampipes.client.StreamPipesClient] the endpoint is attached to.

Examples
--------

>>> from streampipes.client import StreamPipesClient
>>> from streampipes.client.config import StreamPipesClientConfig
>>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials

>>> client_config = StreamPipesClientConfig(
... credential_provider=StreamPipesApiKeyCredentials(username="test-user", api_key="api-key"),
... host_address="localhost",
... port=8082,
... https_disabled=True
... )

>>> client = StreamPipesClient.create(client_config=client_config)

>>> client.versionApi.get(identifier="").to_dict(use_source_names=False)
{'backend_version': '0.92.0-SNAPSHOT'}
"""

@property
def _container_cls(self) -> Type[ResourceContainer]:
"""Defines the model container class the endpoint refers to.

Returns
-------
[Versions][streampipes.model.container.Versions]
"""

return Versions

@property
def _relative_api_path(self) -> Tuple[str, ...]:
"""Defines the relative api path to the DataStream endpoint.

Each path within the URL is defined as an own string.

Returns
-------
api_path: Tuple[str, ...]
a tuple of strings of which every represents a path value of the endpoint's API URL.
"""

return "api", "v2", "info", "versions"

def all(self) -> ResourceContainer:
"""Usually, this method returns information about all resources provided by this endpoint.
However, this endpoint does not support this kind of operation.

Raises
------
NotImplementedError
this endpoint does not return multiple entries, therefore this method is not available

"""
raise NotImplementedError("The `all()` method is not supported by this endpoint.")

def get(self, identifier: str, **kwargs) -> Resource:
"""Queries the resource from the API endpoint.

For this endpoint only one resource is available.

Parameters
----------
identifier: str
Not supported by this endpoint, is set to an empty string.

Returns
-------
versions: Version
The specified resource as an instance of the corresponding model class([Version][streampipes.model.resource.Version]). # noqa: 501
"""

return super().get(identifier="")

def post(self, resource: Resource) -> None:
"""Usually, this method allows to create via this endpoint.
Since the data represented by this endpoint is immutable, it does not support this kind of operation.

Raises
------
NotImplementedError
this endpoint does not allow for POST requests, therefore this method is not available

"""
raise NotImplementedError("The `post()` method is not supported by this endpoint.")
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

from .data_lake_measures import DataLakeMeasures
from .data_streams import DataStreams
from .versions import Versions

__all__ = [
"DataLakeMeasures",
"DataStreams",
"Versions",
]
55 changes: 55 additions & 0 deletions streampipes-client-python/streampipes/model/container/versions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Implementation of a resource container for the versions endpoint.
"""

__all__ = [
"Versions",
]

from typing import Type

from streampipes.model.container.resource_container import ResourceContainer
from streampipes.model.resource import Version
from streampipes.model.resource.resource import Resource


class Versions(ResourceContainer):
"""Implementation of the resource container for the versions endpoint.

This resource container is a collection of versions returned by the StreamPipes API.
It is capable of parsing the response content directly into a list of queried `Version`.
Furthermore, the resource container makes them accessible in a pythonic manner.

Parameters
----------
resources: List[Version]
A list of resources ([Version][streampipes.model.resource.Version]) to be contained in the `ResourceContainer`.

"""

@classmethod
def _resource_cls(cls) -> Type[Resource]:
"""Returns the class of the resource that are bundled.

Returns
-------
[Version][streampipes.model.resource.Version]
"""
return Version
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
from .data_series import DataSeries
from .data_stream import DataStream
from .function_definition import FunctionDefinition
from .version import Version

__all__ = [
"DataLakeMeasure",
"DataSeries",
"DataStream",
"FunctionDefinition",
"Version",
]
Loading