Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/dev' into parse-time
Browse files Browse the repository at this point in the history
  • Loading branch information
bossenti committed Dec 7, 2023
2 parents f2dbec0 + 5afa814 commit c86cb28
Show file tree
Hide file tree
Showing 41 changed files with 915 additions and 223 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ jobs:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.8

Expand Down Expand Up @@ -151,7 +151,7 @@ jobs:
- uses: actions/checkout@v4

- name: Set up Python v${{ matrix.python }}
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python }}

Expand Down Expand Up @@ -179,7 +179,7 @@ jobs:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.8

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pypi-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.8

Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python-docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
git config user.email 'github-actions[bot]@users.noreply.github.com'
- name: Set up Python
uses: actions/setup-python@v4
uses: actions/setup-python@v5
with:
python-version: 3.8

Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
<apache-sis-referencing.version>1.2</apache-sis-referencing.version>
<boofcv.version>1.1.0</boofcv.version>
<classindex.version>3.9</classindex.version>
<checker-qual.version>3.39.0</checker-qual.version>
<commons-codec.version>1.15</commons-codec.version>
<checker-qual.version>3.41.0</checker-qual.version>
<commons-codec.version>1.16.0</commons-codec.version>
<commons-collections.version>3.2.2</commons-collections.version>
<commons-compress.version>1.25.0</commons-compress.version>
<commons-io.version>2.11.0</commons-io.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.streampipes.connect.management.util.WorkerPaths;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -41,13 +40,10 @@ public class AdapterHealthCheck implements Runnable {
private final IAdapterStorage adapterStorage;
private final AdapterMasterManagement adapterMasterManagement;

public AdapterHealthCheck() {
this.adapterStorage = CouchDbStorageManager.INSTANCE.getAdapterInstanceStorage();
this.adapterMasterManagement = new AdapterMasterManagement();
}

public AdapterHealthCheck(IAdapterStorage adapterStorage,
AdapterMasterManagement adapterMasterManagement) {
public AdapterHealthCheck(
IAdapterStorage adapterStorage,
AdapterMasterManagement adapterMasterManagement
) {
this.adapterStorage = adapterStorage;
this.adapterMasterManagement = adapterMasterManagement;
}
Expand Down Expand Up @@ -82,8 +78,14 @@ public void checkAndRestoreAdapters() {
public Map<String, AdapterDescription> getAllRunningInstancesAdapterDescriptions() {
Map<String, AdapterDescription> result = new HashMap<>();
List<AdapterDescription> allRunningInstancesAdapterDescription = this.adapterStorage.getAllAdapters();
allRunningInstancesAdapterDescription.forEach(adapterDescription ->
result.put(adapterDescription.getElementId(), adapterDescription));
allRunningInstancesAdapterDescription
.stream()
.filter(AdapterDescription::isRunning)
.forEach(adapterDescription ->
result.put(
adapterDescription.getElementId(),
adapterDescription
));

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.couchdb.CouchDbStorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.slf4j.Logger;
Expand All @@ -47,7 +48,11 @@ public class WorkerAdministrationManagement {
private final AdapterHealthCheck adapterHealthCheck;

public WorkerAdministrationManagement() {
this.adapterHealthCheck = new AdapterHealthCheck();
this.adapterHealthCheck = new AdapterHealthCheck(
StorageDispatcher.INSTANCE.getNoSqlStore()
.getAdapterInstanceStorage(),
new AdapterMasterManagement()
);
this.adapterDescriptionStorage = CouchDbStorageManager.INSTANCE.getAdapterDescriptionStorage();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.connect.management.health;

import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.storage.api.IAdapterStorage;

import org.junit.Before;
import org.junit.Test;

import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class AdapterHealthCheckTest {

private IAdapterStorage adapterInstanceStorageMock;

@Before
public void setUp() {
adapterInstanceStorageMock = mock(IAdapterStorage.class);
}

@Test
public void getAllRunningInstancesAdapterDescriptionsEmpty() {
when(adapterInstanceStorageMock.getAllAdapters()).thenReturn(List.of());

var healthCheck = new AdapterHealthCheck(adapterInstanceStorageMock, new AdapterMasterManagement());
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();

assertTrue(result.isEmpty());
}

@Test
public void getAllRunningInstancesAdapterDescriptionsMixed() {

var nameRunningAdapter = "running-adapter";
var nameStoppedAdapter = "stopped-adapter";

var stoppedAdapter = new AdapterDescription();
stoppedAdapter.setElementId(nameStoppedAdapter);
stoppedAdapter.setRunning(false);

var runningAdapter = new AdapterDescription();
runningAdapter.setElementId(nameRunningAdapter);
runningAdapter.setRunning(true);

when(adapterInstanceStorageMock.getAllAdapters()).thenReturn(List.of(stoppedAdapter, runningAdapter));

var healthCheck = new AdapterHealthCheck(adapterInstanceStorageMock, new AdapterMasterManagement());
var result = healthCheck.getAllRunningInstancesAdapterDescriptions();

assertEquals(1, result.size());
assertTrue(result.containsKey(nameRunningAdapter));
assertEquals(runningAdapter, result.get(nameRunningAdapter));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public TimeSeriesStore(Environment environment,
DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);

if (enableImageStore) {
// TODO check if event properties are replaces correctly
this.imageStore = new ImageStore(measure, environment);
}

Expand All @@ -67,11 +66,6 @@ public boolean onEvent(Event event) throws SpRuntimeException {
return true;
}


public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) {
return true;
}

public void close() throws SpRuntimeException {
if (imageStore != null) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,10 @@ public boolean databaseExists(
return false;
}


private static Environment getEnvironment() {
return Environments.getEnvironment();
}


}
12 changes: 11 additions & 1 deletion streampipes-data-explorer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@
<artifactId>streampipes-storage-management</artifactId>
<version>0.95.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
<version>0.95.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
Expand Down
Loading

0 comments on commit c86cb28

Please sign in to comment.