From fc33486543013d376725bc87f60553da00b9b4e5 Mon Sep 17 00:00:00 2001 From: bossenti Date: Mon, 27 Feb 2023 21:47:07 +0100 Subject: [PATCH 1/6] feature(#1259): implement version endpoint Signed-off-by: bossenti --- .../streampipes/client/client.py | 7 +- .../streampipes/endpoint/api/__init__.py | 2 + .../streampipes/endpoint/api/version.py | 130 ++++++++++++++++++ .../streampipes/model/container/__init__.py | 2 + .../streampipes/model/container/versions.py | 55 ++++++++ .../streampipes/model/resource/__init__.py | 2 + .../streampipes/model/resource/version.py | 43 ++++++ 7 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 streampipes-client-python/streampipes/endpoint/api/version.py create mode 100644 streampipes-client-python/streampipes/model/container/versions.py create mode 100644 streampipes-client-python/streampipes/model/resource/version.py diff --git a/streampipes-client-python/streampipes/client/client.py b/streampipes-client-python/streampipes/client/client.py index d98b2d5f47..07303ab5ee 100644 --- a/streampipes-client-python/streampipes/client/client.py +++ b/streampipes-client-python/streampipes/client/client.py @@ -29,7 +29,11 @@ from requests import Session from streampipes.client.config import StreamPipesClientConfig -from streampipes.endpoint.api import DataLakeMeasureEndpoint, DataStreamEndpoint +from streampipes.endpoint.api import ( + DataLakeMeasureEndpoint, + DataStreamEndpoint, + VersionEndpoint, +) from streampipes.endpoint.endpoint import APIEndpoint logger = logging.getLogger(__name__) @@ -126,6 +130,7 @@ def __init__( # name of the endpoint needs to be consistent with the Java client self.dataLakeMeasureApi = DataLakeMeasureEndpoint(parent_client=self) self.dataStreamApi = DataStreamEndpoint(parent_client=self) + self.versionApi = VersionEndpoint(parent_client=self) @staticmethod def _set_up_logging(logging_level: int) -> None: diff --git a/streampipes-client-python/streampipes/endpoint/api/__init__.py b/streampipes-client-python/streampipes/endpoint/api/__init__.py index 3d4e6e9cdd..28bcf688ae 100644 --- a/streampipes-client-python/streampipes/endpoint/api/__init__.py +++ b/streampipes-client-python/streampipes/endpoint/api/__init__.py @@ -17,8 +17,10 @@ from .data_lake_measure import DataLakeMeasureEndpoint from .data_stream import DataStreamEndpoint +from .version import VersionEndpoint __all__ = [ "DataLakeMeasureEndpoint", "DataStreamEndpoint", + "VersionEndpoint", ] diff --git a/streampipes-client-python/streampipes/endpoint/api/version.py b/streampipes-client-python/streampipes/endpoint/api/version.py new file mode 100644 index 0000000000..aa1fe0cf3a --- /dev/null +++ b/streampipes-client-python/streampipes/endpoint/api/version.py @@ -0,0 +1,130 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Specific implementation of the StreamPipes API's version endpoint. +""" + +__all__ = [ + "VersionEndpoint", +] + +from typing import Tuple, Type + +from streampipes.endpoint import APIEndpoint +from streampipes.model.container import Versions +from streampipes.model.container.resource_container import ResourceContainer +from streampipes.model.resource.resource import Resource + + +class VersionEndpoint(APIEndpoint): + """Implementation of the Versions endpoint. + + This endpoint provides metadata about the StreamPipes version of the connected instance. + It only allows to apply the `get()` method with an empty string as identifier. + + Parameters + ---------- + parent_client: StreamPipesClient + The instance of [StreamPipesClient][streampipes.client.StreamPipesClient] the endpoint is attached to. + + Examples + -------- + + >>> from streampipes.client import StreamPipesClient + >>> from streampipes.client.config import StreamPipesClientConfig + >>> from streampipes.client.credential_provider import StreamPipesApiKeyCredentials + + >>> client_config = StreamPipesClientConfig( + ... credential_provider=StreamPipesApiKeyCredentials(username="test-user", api_key="api-key"), + ... host_address="localhost", + ... port=8082, + ... https_disabled=True + ... ) + + >>> client = StreamPipesClient.create(client_config=client_config) + + >>> client.versionApi.get(identifier="").to_dict(use_source_names=False) + {'backend_version': '0.92.0-SNAPSHOT'} + """ + + @property + def _container_cls(self) -> Type[ResourceContainer]: + """Defines the model container class the endpoint refers to. + + Returns + ------- + [Versions][streampipes.model.container.Versions] + """ + + return Versions + + @property + def _relative_api_path(self) -> Tuple[str, ...]: + """Defines the relative api path to the DataStream endpoint. + + Each path within the URL is defined as an own string. + + Returns + ------- + api_path: Tuple[str, ...] + a tuple of strings of which every represents a path value of the endpoint's API URL. + """ + + return "api", "v2", "info", "versions" + + def all(self) -> ResourceContainer: + """Usually, this method returns information about all resources provided by this endpoint. + However, this endpoint does not support this kind of operation. + + Raises + ------ + NotImplementedError + this endpoint does not return multiple entries, therefore this method is not available + + """ + raise NotImplementedError("The `all()` method is not supported by this endpoint.") + + def get(self, identifier: str, **kwargs) -> Resource: + """Queries the resource from the API endpoint. + + For this endpoint only one resource is available. + + Parameters + ---------- + identifier: str + Not supported by this endpoint, is set to an empty string. + + Returns + ------- + versions: Version + The specified resource as an instance of the corresponding model class([Version][streampipes.model.resource.Version]). # noqa: 501 + """ + + return super().get(identifier="") + + def post(self, resource: Resource) -> None: + """Usually, this method allows to create via this endpoint. + Since the data represented by this endpoint is immutable, it does not support this kind of operation. + + Raises + ------ + NotImplementedError + this endpoint does not allow for POST requests, therefore this method is not available + + """ + raise NotImplementedError("The `post()` method is not supported by this endpoint.") diff --git a/streampipes-client-python/streampipes/model/container/__init__.py b/streampipes-client-python/streampipes/model/container/__init__.py index 53b0ded938..59d688ce8a 100644 --- a/streampipes-client-python/streampipes/model/container/__init__.py +++ b/streampipes-client-python/streampipes/model/container/__init__.py @@ -17,8 +17,10 @@ from .data_lake_measures import DataLakeMeasures from .data_streams import DataStreams +from .versions import Versions __all__ = [ "DataLakeMeasures", "DataStreams", + "Versions", ] diff --git a/streampipes-client-python/streampipes/model/container/versions.py b/streampipes-client-python/streampipes/model/container/versions.py new file mode 100644 index 0000000000..f2e952fb34 --- /dev/null +++ b/streampipes-client-python/streampipes/model/container/versions.py @@ -0,0 +1,55 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Implementation of a resource container for the versions endpoint. +""" + +__all__ = [ + "Versions", +] + +from typing import Type + +from streampipes.model.container.resource_container import ResourceContainer +from streampipes.model.resource import Version +from streampipes.model.resource.resource import Resource + + +class Versions(ResourceContainer): + """Implementation of the resource container for the versions endpoint. + + This resource container is a collection of versions returned by the StreamPipes API. + It is capable of parsing the response content directly into a list of queried `Version`. + Furthermore, the resource container makes them accessible in a pythonic manner. + + Parameters + ---------- + resources: List[Version] + A list of resources ([Version][streampipes.model.resource.Version]) to be contained in the `ResourceContainer`. + + """ + + @classmethod + def _resource_cls(cls) -> Type[Resource]: + """Returns the class of the resource that are bundled. + + Returns + ------- + [Version][streampipes.model.resource.Version] + """ + return Version diff --git a/streampipes-client-python/streampipes/model/resource/__init__.py b/streampipes-client-python/streampipes/model/resource/__init__.py index 09590c1dae..0e6672cbe9 100644 --- a/streampipes-client-python/streampipes/model/resource/__init__.py +++ b/streampipes-client-python/streampipes/model/resource/__init__.py @@ -19,10 +19,12 @@ from .data_series import DataSeries from .data_stream import DataStream from .function_definition import FunctionDefinition +from .version import Version __all__ = [ "DataLakeMeasure", "DataSeries", "DataStream", "FunctionDefinition", + "Version", ] diff --git a/streampipes-client-python/streampipes/model/resource/version.py b/streampipes-client-python/streampipes/model/resource/version.py new file mode 100644 index 0000000000..c0405caecf --- /dev/null +++ b/streampipes-client-python/streampipes/model/resource/version.py @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +__all__ = [ + "Version", +] + +from typing import Dict + +from pydantic import StrictStr +from streampipes.model.resource.resource import Resource + + +class Version(Resource): + """Metadata about the version of the connected StreamPipes server. + + Attributes + ---------- + backend_version: str + version of the StreamPipes backend the client is connected to + """ + + def convert_to_pandas_representation(self) -> Dict: + """Returns the dictionary representation of the version metadata + to be used when creating a pandas Dataframe. + """ + return self.to_dict(use_source_names=False) + + backend_version: StrictStr From 5bbe9286e8def53198ec22f07302ec94d344a53f Mon Sep 17 00:00:00 2001 From: bossenti Date: Mon, 27 Feb 2023 22:12:27 +0100 Subject: [PATCH 2/6] chore: reformatting Signed-off-by: bossenti --- .../streampipes/endpoint/api/data_stream.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streampipes-client-python/streampipes/endpoint/api/data_stream.py b/streampipes-client-python/streampipes/endpoint/api/data_stream.py index ce5da71d2b..6067e0b35d 100644 --- a/streampipes-client-python/streampipes/endpoint/api/data_stream.py +++ b/streampipes-client-python/streampipes/endpoint/api/data_stream.py @@ -18,15 +18,14 @@ """ Specific implementation of the StreamPipes API's data stream endpoints. """ -from typing import Tuple, Type - -from streampipes.endpoint.endpoint import APIEndpoint -from streampipes.model.container import DataStreams __all__ = [ "DataStreamEndpoint", ] +from typing import Tuple, Type +from streampipes.endpoint.endpoint import APIEndpoint +from streampipes.model.container import DataStreams from streampipes.model.container.resource_container import ResourceContainer From 2ea31caa8dcd55d7896f588d01ca43d9cc2e2a74 Mon Sep 17 00:00:00 2001 From: bossenti Date: Mon, 27 Feb 2023 22:12:49 +0100 Subject: [PATCH 3/6] fix: typos Signed-off-by: bossenti --- .../streampipes/endpoint/api/data_lake_measure.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py index c7493683ca..aa44e238b0 100644 --- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py +++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py @@ -25,14 +25,13 @@ from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, validator from streampipes.endpoint.endpoint import APIEndpoint from streampipes.model.container import DataLakeMeasures +from streampipes.model.resource.query_result import QueryResult from streampipes.model.container.resource_container import ResourceContainer __all__ = [ "DataLakeMeasureEndpoint", ] -from streampipes.model.resource.query_result import QueryResult - class StreamPipesQueryValidationError(Exception): """A custom exception to be raised when the validation of query parameter @@ -159,9 +158,9 @@ def build_query_string(self) -> str: class DataLakeMeasureEndpoint(APIEndpoint): """Implementation of the DataLakeMeasure endpoint. - This endpoint provides an interfact to all data stored in the StreamPipes data lake. + This endpoint provides an interface to all data stored in the StreamPipes data lake. - Consequently, it allows uerying metadata about available data sets (see `all()` method). + Consequently, it allows querying metadata about available data sets (see `all()` method). The metadata is returned as an instance of [`DataLakeMeasures`][streampipes.model.container.DataLakeMeasures]. In addition, the endpoint provides direct access to the data stored in the data laka by querying a From a1e4041464c64e0c8ea00805fea77ce2534a28e7 Mon Sep 17 00:00:00 2001 From: bossenti Date: Sat, 6 May 2023 08:14:14 +0200 Subject: [PATCH 4/6] [1259] retrieve version on start-up to verify authentication Signed-off-by: bossenti --- .../streampipes/client/client.py | 45 +++++++++++++++++-- .../tests/client/test_client.py | 24 +++++++--- .../tests/client/test_endpoint.py | 2 + 3 files changed, 62 insertions(+), 9 deletions(-) diff --git a/streampipes-client-python/streampipes/client/client.py b/streampipes-client-python/streampipes/client/client.py index 07303ab5ee..afb8c78888 100644 --- a/streampipes-client-python/streampipes/client/client.py +++ b/streampipes-client-python/streampipes/client/client.py @@ -29,12 +29,12 @@ from requests import Session from streampipes.client.config import StreamPipesClientConfig +from streampipes.endpoint import APIEndpoint from streampipes.endpoint.api import ( DataLakeMeasureEndpoint, DataStreamEndpoint, VersionEndpoint, ) -from streampipes.endpoint.endpoint import APIEndpoint logger = logging.getLogger(__name__) @@ -132,6 +132,35 @@ def __init__( self.dataStreamApi = DataStreamEndpoint(parent_client=self) self.versionApi = VersionEndpoint(parent_client=self) + self.server_version = self._get_server_version() + + def _get_server_version(self) -> str: + """Connects to the StreamPipes server and retrieves its version. + + In addition to querying the server version, this method implicitly checks the specified credentials. + + Returns + ------- + sp_version: str + version of the connected StreamPipes instance + + """ + + # retrieve metadata from the API via the Streampipes server + # as a side effect, the specified credentials are also tested to ensure that authentication is successful. + version_dict = self.versionApi.get(identifier="").to_dict(use_source_names=False) + + # remove SNAPSHOT-suffix if present + sp_version = version_dict["backend_version"].replace("-SNAPSHOT", "") + + logger.info( + "The StreamPipes version was successfully retrieved from the backend: %s. " + "By means of that, authentication via the provided credentials is also tested successfully.", + sp_version, + ) + + return sp_version + @staticmethod def _set_up_logging(logging_level: int) -> None: """Configures the logging behavior of the `StreamPipesClient`. @@ -233,9 +262,17 @@ def describe(self) -> None: """ # get all endpoints of this client - available_endpoints = [ + available_endpoints = { attr_name for attr_name in dir(self) if isinstance(self.__getattribute__(attr_name), APIEndpoint) - ] + } + + # remove endpoints that are not suitable for the describe method + # this is mainly due to not providing the `all()` method + available_endpoints = available_endpoints.symmetric_difference( + { + "versionApi", + } + ) # collect the number of available resources per endpoint endpoint_stats = { @@ -252,7 +289,7 @@ def describe(self) -> None: f"\nHi there!\n" f"You are connected to a StreamPipes instance running at " f"{'http://' if self.client_config.https_disabled else 'https://'}" - f"{self.client_config.host_address}:{self.client_config.port}.\n" + f"{self.client_config.host_address}:{self.client_config.port} with version {self.server_version}.\n" f"The following StreamPipes resources are available with this client:\n" ) diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py index b5b6448fb0..a08904438b 100644 --- a/streampipes-client-python/tests/client/test_client.py +++ b/streampipes-client-python/tests/client/test_client.py @@ -17,7 +17,9 @@ import json from collections import namedtuple from unittest import TestCase -from unittest.mock import MagicMock, call, patch +from unittest.mock import patch, MagicMock, call + +from requests import Response from streampipes.client import StreamPipesClient from streampipes.client.config import StreamPipesClientConfig @@ -74,10 +76,18 @@ def test_client_create(self): @patch("builtins.print") @patch("streampipes.endpoint.endpoint.APIEndpoint._make_request", autospec=True) def test_client_describe(self, make_request: MagicMock, mocked_print: MagicMock): + + class MockResponse: + def __init__(self, text): + self.text = text + + def json(self): + return json.loads(self.text) + def simulate_response(*args, **kwargs): - Response = namedtuple("Response", ["text"]) + if "measurements" in kwargs["url"]: - return Response( + return MockResponse( json.dumps( [ { @@ -90,11 +100,15 @@ def simulate_response(*args, **kwargs): ) ) if "streams" in kwargs["url"]: - return Response( + return MockResponse( json.dumps( [{"elementId": "test-stream", "name": "test", "eventGrounding": {"transportProtocols": []}}] ) ) + if "versions" in kwargs["url"]: + return MockResponse( + json.dumps({"backendVersion": "SP-dev"}) + ) make_request.side_effect = simulate_response StreamPipesClient.create( @@ -109,7 +123,7 @@ def simulate_response(*args, **kwargs): mocked_print.assert_has_calls( calls=[ call( - "\nHi there!\nYou are connected to a StreamPipes instance running at https://localhost:443.\n" + "\nHi there!\nYou are connected to a StreamPipes instance running at https://localhost:443 with version SP-dev.\n" "The following StreamPipes resources are available with this client:\n" "1x DataLakeMeasures\n1x DataStreams" ), diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py index 5e4c0e1f9a..9b67df64b8 100644 --- a/streampipes-client-python/tests/client/test_endpoint.py +++ b/streampipes-client-python/tests/client/test_endpoint.py @@ -418,6 +418,7 @@ class TestMessagingEndpoint(TestCase): ) ) + @patch("streampipes.client.client.Session", autospec=True) def test_messaging_endpoint_happy_path(self): demo_endpoint = MessagingEndpoint(parent_client=self.client) @@ -425,6 +426,7 @@ def test_messaging_endpoint_happy_path(self): self.assertTrue(isinstance(demo_endpoint.broker, NatsConsumer)) + @patch("streampipes.client.client.Session", autospec=True) def test_messaging_endpoint_missing_configure(self): demo_endpoint = MessagingEndpoint(parent_client=self.client) From a2f35ff7dac26e0b873fd5201dcb25aef68e502d Mon Sep 17 00:00:00 2001 From: bossenti Date: Sat, 6 May 2023 08:14:22 +0200 Subject: [PATCH 5/6] reorder imports Signed-off-by: bossenti --- .../streampipes/endpoint/api/data_lake_measure.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py index aa44e238b0..1c494895a9 100644 --- a/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py +++ b/streampipes-client-python/streampipes/endpoint/api/data_lake_measure.py @@ -25,8 +25,8 @@ from pydantic import BaseModel, Extra, Field, StrictInt, ValidationError, validator from streampipes.endpoint.endpoint import APIEndpoint from streampipes.model.container import DataLakeMeasures -from streampipes.model.resource.query_result import QueryResult from streampipes.model.container.resource_container import ResourceContainer +from streampipes.model.resource.query_result import QueryResult __all__ = [ "DataLakeMeasureEndpoint", From ff4f3adc7d99534bd8df93a669a80aef93a42d13 Mon Sep 17 00:00:00 2001 From: bossenti Date: Sun, 7 May 2023 19:52:38 +0200 Subject: [PATCH 6/6] feature(1259): adapt tests Signed-off-by: bossenti --- .../streampipes/client/client.py | 7 +- .../tests/client/test_client.py | 18 +++-- .../tests/client/test_data_lake_series.py | 18 ++++- .../tests/client/test_endpoint.py | 78 ++++++++++++++----- .../tests/functions/test_function_handler.py | 29 +++++-- .../tests/functions/test_river_function.py | 8 ++ 6 files changed, 122 insertions(+), 36 deletions(-) diff --git a/streampipes-client-python/streampipes/client/client.py b/streampipes-client-python/streampipes/client/client.py index afb8c78888..00f15b0bac 100644 --- a/streampipes-client-python/streampipes/client/client.py +++ b/streampipes-client-python/streampipes/client/client.py @@ -274,10 +274,13 @@ def describe(self) -> None: } ) + # ensure deterministic order + ordered_available_endpoints = sorted(available_endpoints) + # collect the number of available resources per endpoint endpoint_stats = { (all_items := self.__getattribute__(endpoint_name).all()).__class__.__name__: len(all_items) - for endpoint_name in available_endpoints + for endpoint_name in ordered_available_endpoints } # sort the endpoints descending based on the number of resources @@ -295,4 +298,4 @@ def describe(self) -> None: endpoint_stats_message = "\n".join(f"{count}x {name}" for name, count in sorted_endpoint_stats.items()) - print(base_message + endpoint_stats_message) + logger.info(base_message + endpoint_stats_message) diff --git a/streampipes-client-python/tests/client/test_client.py b/streampipes-client-python/tests/client/test_client.py index a08904438b..19a8988332 100644 --- a/streampipes-client-python/tests/client/test_client.py +++ b/streampipes-client-python/tests/client/test_client.py @@ -28,7 +28,11 @@ class TestStreamPipesClient(TestCase): - def test_client_init(self): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_client_init(self, server_version: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + result = StreamPipesClient( client_config=StreamPipesClientConfig( credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), @@ -50,7 +54,11 @@ def test_client_init(self): self.assertTrue(isinstance(result.dataLakeMeasureApi, DataLakeMeasureEndpoint)) self.assertEqual(result.base_api_path, "http://localhost:80/streampipes-backend/") - def test_client_create(self): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_client_create(self, server_version: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + result = StreamPipesClient.create( client_config=StreamPipesClientConfig( credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), @@ -73,9 +81,9 @@ def test_client_create(self): self.assertTrue(isinstance(result.dataLakeMeasureApi, DataLakeMeasureEndpoint)) self.assertEqual(result.base_api_path, "https://localhost:443/streampipes-backend/") - @patch("builtins.print") + @patch("streampipes.client.client.logger", autospec=True) @patch("streampipes.endpoint.endpoint.APIEndpoint._make_request", autospec=True) - def test_client_describe(self, make_request: MagicMock, mocked_print: MagicMock): + def test_client_describe(self, make_request: MagicMock, mocked_logger: MagicMock): class MockResponse: def __init__(self, text): @@ -120,7 +128,7 @@ def simulate_response(*args, **kwargs): ) ).describe() - mocked_print.assert_has_calls( + mocked_logger.info.assert_has_calls( calls=[ call( "\nHi there!\nYou are connected to a StreamPipes instance running at https://localhost:443 with version SP-dev.\n" diff --git a/streampipes-client-python/tests/client/test_data_lake_series.py b/streampipes-client-python/tests/client/test_data_lake_series.py index d634a1262b..f6ebf41ee8 100644 --- a/streampipes-client-python/tests/client/test_data_lake_series.py +++ b/streampipes-client-python/tests/client/test_data_lake_series.py @@ -94,7 +94,11 @@ def get_result_as_panda(http_session: MagicMock, data: dict): return result.to_pandas() @patch("streampipes.client.client.Session", autospec=True) - def test_to_pandas(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_to_pandas(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + query_result = { "total": 1, "headers": self.headers, @@ -114,7 +118,11 @@ def test_to_pandas(self, http_session: MagicMock): self.assertEqual(73.37740325927734, result_pd["level"][0]) @patch("streampipes.client.client.Session", autospec=True) - def test_group_by_to_pandas(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_group_by_to_pandas(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + query_result = { "total": 2, "headers": self.headers, @@ -135,7 +143,11 @@ def test_group_by_to_pandas(self, http_session: MagicMock): self.assertEqual(70.03279876708984, result_pd["level"][3]) @patch("streampipes.client.client.Session", autospec=True) - def test_different_headers_exception(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_different_headers_exception(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + query_result = { "total": 1, "headers": ['one'], diff --git a/streampipes-client-python/tests/client/test_endpoint.py b/streampipes-client-python/tests/client/test_endpoint.py index 9b67df64b8..c610339ee1 100644 --- a/streampipes-client-python/tests/client/test_endpoint.py +++ b/streampipes-client-python/tests/client/test_endpoint.py @@ -115,7 +115,7 @@ def setUp(self) -> None: "topicDefinition": { "@class": "org.apache.streampipes.model.grounding.SimpleTopicDefinition", "actualTopicName": "org.apache.streampipes.connect." - "fc22b8f6-698a-4127-aa71-e11854dc57c5", + "fc22b8f6-698a-4127-aa71-e11854dc57c5", }, "port": 4222, } @@ -170,7 +170,7 @@ def setUp(self) -> None: "measurementObject": None, "index": 0, "correspondingAdapterId": "urn:streampipes.apache.org:spi:org.apache.streampipes.connect." - "iiot.adapters.simulator.machine:11934d37-135b-4ef6-b5f1-4f520cc81a43", + "iiot.adapters.simulator.machine:11934d37-135b-4ef6-b5f1-4f520cc81a43", "category": None, "uri": "urn:streampipes.apache.org:eventstream:uPDKLI", "dom": None, @@ -188,11 +188,14 @@ def setUp(self) -> None: self.data_lake_measure_all_json_validation = json.dumps(self.dlm_all_manipulated) @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_get(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_get(self, server_version: MagicMock, http_session: MagicMock): http_session_mock = MagicMock() http_session_mock.get.return_value.json.return_value = self.data_stream_get http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": "0.x.y"} + client = StreamPipesClient( client_config=StreamPipesClientConfig( credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), @@ -206,7 +209,7 @@ def test_endpoint_get(self, http_session: MagicMock): calls=[ call().get( url="https://localhost:80/streampipes-backend/api/v2/streams/urn:streampipes." - "apache.org:eventstream:uPDKLI" + "apache.org:eventstream:uPDKLI" ) ], any_order=True, @@ -215,10 +218,13 @@ def test_endpoint_get(self, http_session: MagicMock): self.assertEqual(result.to_dict(use_source_names=True), self.data_stream_get) @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_post(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_post(self, server_version: MagicMock, http_session: MagicMock): http_session_mock = MagicMock() http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": "0.x.y"} + client = StreamPipesClient( client_config=StreamPipesClientConfig( credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), @@ -235,7 +241,11 @@ def test_endpoint_post(self, http_session: MagicMock): ) @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_data_stream_happy_path(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_data_stream_happy_path(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + http_session_mock = MagicMock() http_session_mock.get.return_value.text = self.data_stream_all_json http_session.return_value = http_session_mock @@ -301,7 +311,11 @@ def test_endpoint_data_stream_happy_path(self, http_session: MagicMock): ) @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_data_lake_measure_happy_path(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_data_lake_measure_happy_path(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + http_session_mock = MagicMock() http_session_mock.get.return_value.text = self.data_lake_measure_all_json http_session.return_value = http_session_mock @@ -350,7 +364,11 @@ def test_endpoint_data_lake_measure_happy_path(self, http_session: MagicMock): self.assertEqual(2, result_pd["num_event_properties"][0]) @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_data_lake_measure_bad_return_code(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_data_lake_measure_bad_return_code(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + response_mock = MagicMock() response_mock.status_code = 405 response_mock.text = "Test error" @@ -376,7 +394,11 @@ def test_endpoint_data_lake_measure_bad_return_code(self, http_session: MagicMoc ) @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_data_lake_measure_json_error(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_data_lake_measure_json_error(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + http_session_mock = MagicMock() http_session_mock.get.return_value.text = self.data_lake_measure_all_json_error http_session.return_value = http_session_mock @@ -392,7 +414,11 @@ def test_endpoint_data_lake_measure_json_error(self, http_session: MagicMock): client.dataLakeMeasureApi.all() @patch("streampipes.client.client.Session", autospec=True) - def test_endpoint_data_lake_measure_validation_error(self, http_session: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_endpoint_data_lake_measure_validation_error(self, server_version: MagicMock, http_session: MagicMock): + + server_version.return_value = {"backendVersion": "0.x.y"} + http_session_mock = MagicMock() http_session_mock.get.return_value.text = self.data_lake_measure_all_json_validation http_session.return_value = http_session_mock @@ -411,24 +437,34 @@ def test_endpoint_data_lake_measure_validation_error(self, http_session: MagicMo class TestMessagingEndpoint(TestCase): - client = StreamPipesClient( - client_config=StreamPipesClientConfig( - credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), - host_address="localhost", + + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_messaging_endpoint_happy_path(self, _: MagicMock): + + client = StreamPipesClient( + client_config=StreamPipesClientConfig( + credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), + host_address="localhost", + ) ) - ) - @patch("streampipes.client.client.Session", autospec=True) - def test_messaging_endpoint_happy_path(self): - demo_endpoint = MessagingEndpoint(parent_client=self.client) + + demo_endpoint = MessagingEndpoint(parent_client=client) demo_endpoint.configure(broker=NatsConsumer()) self.assertTrue(isinstance(demo_endpoint.broker, NatsConsumer)) - @patch("streampipes.client.client.Session", autospec=True) - def test_messaging_endpoint_missing_configure(self): - demo_endpoint = MessagingEndpoint(parent_client=self.client) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_messaging_endpoint_missing_configure(self, _:MagicMock): + client = StreamPipesClient( + client_config=StreamPipesClientConfig( + credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), + host_address="localhost", + ) + ) + + demo_endpoint = MessagingEndpoint(parent_client=client) with self.assertRaises(MessagingEndpointNotConfiguredError): demo_endpoint.broker diff --git a/streampipes-client-python/tests/functions/test_function_handler.py b/streampipes-client-python/tests/functions/test_function_handler.py index 8ff3252995..fdb27ea46c 100644 --- a/streampipes-client-python/tests/functions/test_function_handler.py +++ b/streampipes-client-python/tests/functions/test_function_handler.py @@ -174,11 +174,14 @@ def setUp(self) -> None: @patch("streampipes.functions.broker.nats.nats_consumer.connect", autospec=True) @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True) @patch("streampipes.client.client.Session", autospec=True) - def test_function_handler_nats(self, http_session: MagicMock, get_messages: MagicMock, connection: AsyncMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_function_handler_nats(self, server_version: MagicMock, http_session: MagicMock, get_messages: MagicMock, connection: AsyncMock): http_session_mock = MagicMock() http_session_mock.get.return_value.json.return_value = self.data_stream_nats http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": '0.x.y'} + get_messages.return_value = TestMessageIterator(self.test_stream_data1) client = StreamPipesClient( @@ -206,11 +209,14 @@ def test_function_handler_nats(self, http_session: MagicMock, get_messages: Magi @patch("streampipes.functions.broker.kafka.kafka_consumer.KafkaConnection", autospec=True) @patch("streampipes.client.client.Session", autospec=True) - def test_function_handler_kafka(self, http_session: MagicMock, connection: MagicMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_function_handler_kafka(self, server_version: MagicMock, http_session: MagicMock, connection: MagicMock): http_session_mock = MagicMock() http_session_mock.get.return_value.json.return_value = self.data_stream_kafka http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": '0.x.y'} + connection_mock = MagicMock() connection_mock.poll.side_effect = TestKafkaMessageContainer(self.test_stream_data1).get_data connection.return_value = connection_mock @@ -240,11 +246,14 @@ def test_function_handler_kafka(self, http_session: MagicMock, connection: Magic @patch("streampipes.functions.broker.nats.nats_consumer.connect", autospec=True) @patch("streampipes.client.client.Session", autospec=True) - def test_function_handler_unsupported_broker(self, http_session: MagicMock, connection: AsyncMock): + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_function_handler_unsupported_broker(self, server_version: MagicMock, http_session: MagicMock, connection: AsyncMock): http_session_mock = MagicMock() http_session_mock.get.return_value.json.return_value = self.data_stream_unsupported_broker http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": "0.x.y"} + client = StreamPipesClient( client_config=StreamPipesClientConfig( credential_provider=StreamPipesApiKeyCredentials(username="user", api_key="key"), @@ -261,8 +270,9 @@ def test_function_handler_unsupported_broker(self, http_session: MagicMock, conn @patch("streampipes.functions.broker.nats.nats_consumer.connect", autospec=True) @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True) - @patch("streampipes.endpoint.endpoint.APIEndpoint.get", autospec=True) - def test_two_streams_nats(self, endpoint: MagicMock, nats_broker: MagicMock, *args: Tuple[AsyncMock]): + @patch("streampipes.endpoint.api.DataStreamEndpoint.get", autospec=True) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) + def test_two_streams_nats(self, server_version: MagicMock, endpoint: MagicMock, nats_broker: MagicMock, *args: Tuple[AsyncMock]): def get_stream(endpoint, stream_id): if stream_id == "urn:streampipes.apache.org:eventstream:uPDKLI": return DataStream(**self.data_stream_nats) @@ -272,6 +282,7 @@ def get_stream(endpoint, stream_id): data_stream.event_grounding.transport_protocols[0].topic_definition.actual_topic_name = "test2" return data_stream + server_version.return_value = {"backendVersion": "0.x.y"} endpoint.side_effect = get_stream def get_message(broker): @@ -318,8 +329,10 @@ def get_message(broker): @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True) @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True) @patch("streampipes.client.client.Session", autospec=True) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) def test_function_output_stream_nats( self, + server_version: MagicMock, http_session: MagicMock, pulish_event: MagicMock, get_message: MagicMock, @@ -330,6 +343,8 @@ def test_function_output_stream_nats( http_session_mock.get.return_value.json.return_value = self.data_stream_nats http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": "0.x.y"} + output_events = [] def save_event(self, event: Dict[str, Any]): @@ -370,8 +385,10 @@ def save_event(self, event: Dict[str, Any]): @patch("streampipes.functions.broker.kafka.kafka_consumer.KafkaConnection", autospec=True) @patch("streampipes.functions.broker.KafkaPublisher.publish_event", autospec=True) @patch("streampipes.client.client.Session", autospec=True) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) def test_function_output_stream_kafka( self, + server_version: MagicMock, http_session: MagicMock, pulish_event: MagicMock, connection: MagicMock, @@ -384,6 +401,8 @@ def test_function_output_stream_kafka( http_session_mock.get.return_value.json.return_value = self.data_stream_kafka http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": "0.x.y"} + output_events = [] def save_event(self, event: Dict[str, Any]): diff --git a/streampipes-client-python/tests/functions/test_river_function.py b/streampipes-client-python/tests/functions/test_river_function.py index a850501efb..985a1359a7 100644 --- a/streampipes-client-python/tests/functions/test_river_function.py +++ b/streampipes-client-python/tests/functions/test_river_function.py @@ -76,8 +76,10 @@ def setUp(self) -> None: @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True) @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True) @patch("streampipes.client.client.Session", autospec=True) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) def test_river_function_unsupervised( self, + server_version: MagicMock, http_session: MagicMock, pulish_event: MagicMock, get_message: MagicMock, @@ -88,6 +90,8 @@ def test_river_function_unsupervised( http_session_mock.get.return_value.json.return_value = self.data_stream http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": '0.x.y'} + output_events = [] def save_event(self, event: Dict[str, Any]): @@ -141,8 +145,10 @@ def on_event(self, event, stream_id): @patch("streampipes.functions.broker.NatsConsumer.get_message", autospec=True) @patch("streampipes.functions.broker.NatsPublisher.publish_event", autospec=True) @patch("streampipes.client.client.Session", autospec=True) + @patch("streampipes.client.client.StreamPipesClient._get_server_version", autospec=True) def test_river_function_supervised( self, + server_version: MagicMock, http_session: MagicMock, pulish_event: MagicMock, get_message: MagicMock, @@ -153,6 +159,8 @@ def test_river_function_supervised( http_session_mock.get.return_value.json.return_value = self.data_stream http_session.return_value = http_session_mock + server_version.return_value = {"backendVersion": '0.x.y'} + output_events = [] def save_event(self, event: Dict[str, Any]):