Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(CPA): obliviates the control plane adapter term and module refactor #606

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/edr-cache-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ dependencies {
implementation(libs.edc.config.filesystem)
implementation(libs.edc.util)

implementation(project(":spi:edr-cache-spi"))
implementation(project(":spi:edr-spi"))

testImplementation(testFixtures(project(":spi:edr-cache-spi")))
testImplementation(testFixtures(project(":spi:edr-spi")))

}

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.tractusx.edc.edr.core.defaults.InMemoryEndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

/**
* Registers default services for the EDR cache.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,19 @@
package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.edc.spi.query.BaseCriterionToPredicateConverter;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;

public class EdrCacheEntryPredicateConverter extends BaseCriterionToPredicateConverter<EndpointDataReferenceEntry> {

@Override
protected Object property(String key, Object object) {
if (object instanceof EndpointDataReferenceEntry) {
var entry = (EndpointDataReferenceEntry) object;
switch (key) {
case "assetId":
return entry.getAssetId();
case "agreementId":
return entry.getAgreementId();
default:
return null;
}
if (object instanceof EndpointDataReferenceEntry entry) {
return switch (key) {
case "assetId" -> entry.getAssetId();
case "agreementId" -> entry.getAgreementId();
case "providerId" -> entry.getProviderId();
default -> null;
};
}
throw new IllegalArgumentException("Can only handle objects of type " + EndpointDataReferenceEntry.class.getSimpleName() + " but received an " + object.getClass().getSimpleName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.eclipse.edc.spi.result.StoreResult;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.edc.util.concurrency.LockManager;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
Expand Down Expand Up @@ -66,12 +67,22 @@ public InMemoryEndpointDataReferenceCache() {

@Override
@NotNull
public List<EndpointDataReference> referencesForAsset(String assetId) {
public List<EndpointDataReference> referencesForAsset(String assetId, String providerId) {
var entries = entriesByAssetId.get(assetId);

Predicate<EndpointDataReferenceEntry> providerIdFilter = (cached) ->
Optional.ofNullable(providerId)
.map(id -> id.equals(cached.getProviderId()))
.orElse(true);

if (entries == null) {
return emptyList();
}
return entries.stream().map(e -> resolveReference(e.getTransferProcessId())).filter(Objects::nonNull).collect(toList());
return entries.stream()
.filter(providerIdFilter)
.map(e -> resolveReference(e.getTransferProcessId()))
.filter(Objects::nonNull)
.collect(toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;

/**
* A wrapper to persist {@link EndpointDataReferenceEntry}s and {@link EndpointDataReference}s.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

package org.eclipse.tractusx.edc.edr.core.defaults;

import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCacheBaseTest;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

class InMemoryEndpointDataReferenceCacheTest extends EndpointDataReferenceCacheBaseTest {
private final InMemoryEndpointDataReferenceCache cache = new InMemoryEndpointDataReferenceCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.junit.jupiter.api.Test;

import static java.util.UUID.randomUUID;
Expand All @@ -39,6 +39,7 @@ void verify_serializeDeserialize() throws JsonProcessingException {
.assetId(randomUUID().toString())
.agreementId(randomUUID().toString())
.transferProcessId(randomUUID().toString())
.providerId(randomUUID().toString())
.build();

var serialized = mapper.writeValueAsString(new PersistentCacheEntry(edrEntry, edr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@

plugins {
`java-library`
`maven-publish`
}

dependencies {
implementation(project(":spi:control-plane-adapter-spi"))
implementation(project(":spi:edr-cache-spi"))
implementation(libs.edc.spi.core)
implementation(libs.edc.spi.transfer)
implementation(libs.edc.config.filesystem)
implementation(libs.edc.util)
implementation(libs.edc.spi.aggregateservices)
implementation(libs.edc.spi.contract)
implementation(libs.edc.spi.controlplane)
implementation(libs.edc.spi.aggregateservices)

implementation(project(":spi:edr-spi"))

testImplementation(libs.edc.junit)
testImplementation(testFixtures(project(":spi:edr-spi")))

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core;

import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.tractusx.edc.edr.core.service.EdrServiceImpl;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;

/**
* Registers default services for the EDR cache.
*/
@Extension(value = EdrCoreExtension.NAME)
public class EdrCoreExtension implements ServiceExtension {
static final String NAME = "EDR Core";


@Inject
private Monitor monitor;

@Inject
private ContractNegotiationService contractNegotiationService;

@Inject
private EndpointDataReferenceCache endpointDataReferenceCache;

@Override
public String name() {
return NAME;
}


@Provider
public EdrService adapterTransferProcessService() {
return new EdrServiceImpl(contractNegotiationService, endpointDataReferenceCache);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,20 @@
*
*/

package org.eclipse.tractusx.edc.cp.adapter.callback;
package org.eclipse.tractusx.edc.edr.core.service;

import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestData;
import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.service.spi.result.ServiceResult;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.callback.CallbackAddress;
import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.spi.cp.adapter.model.NegotiateEdrRequest;
import org.eclipse.tractusx.edc.spi.cp.adapter.service.AdapterTransferProcessService;
import org.eclipse.tractusx.edc.edr.spi.service.EdrService;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.eclipse.tractusx.edc.edr.spi.types.EndpointDataReferenceEntry;
import org.eclipse.tractusx.edc.edr.spi.types.NegotiateEdrRequest;

import java.util.List;
import java.util.Optional;
Expand All @@ -35,10 +34,8 @@
import java.util.stream.Stream;

import static java.lang.String.format;
import static org.eclipse.edc.service.spi.result.ServiceResult.notFound;
import static org.eclipse.edc.service.spi.result.ServiceResult.success;

public class AdapterTransferProcessServiceImpl implements AdapterTransferProcessService {
public class EdrServiceImpl implements EdrService {

public static final String LOCAL_ADAPTER_URI = "local://adapter";
public static final Set<String> LOCAL_EVENTS = Set.of("contract.negotiation", "transfer.process");
Expand All @@ -51,63 +48,50 @@ public class AdapterTransferProcessServiceImpl implements AdapterTransferProcess

private final EndpointDataReferenceCache endpointDataReferenceCache;

public AdapterTransferProcessServiceImpl(ContractNegotiationService contractNegotiationService, EndpointDataReferenceCache endpointDataReferenceCache) {
public EdrServiceImpl(ContractNegotiationService contractNegotiationService, EndpointDataReferenceCache endpointDataReferenceCache) {
this.contractNegotiationService = contractNegotiationService;
this.endpointDataReferenceCache = endpointDataReferenceCache;
}

@Override
public ServiceResult<ContractNegotiation> initiateEdrNegotiation(NegotiateEdrRequest request) {
var contractNegotiation = contractNegotiationService.initiateNegotiation(createContractRequest(request));
return success(contractNegotiation);
}

private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

var requestData = ContractRequestData.Builder.newInstance()
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.callbackAddresses(callbacks).build();
return ServiceResult.success(contractNegotiation);
}

@Override
public ServiceResult<EndpointDataReference> findByTransferProcessId(String transferProcessId) {
var edr = endpointDataReferenceCache.resolveReference(transferProcessId);
return Optional.ofNullable(edr)
.map(ServiceResult::success)
.orElse(notFound(format("No Edr found associated to the transfer process with id: %s", transferProcessId)));
.orElse(ServiceResult.notFound(format("No Edr found associated to the transfer process with id: %s", transferProcessId)));
}

@Override
public ServiceResult<List<EndpointDataReferenceEntry>> findByAssetAndAgreement(String assetId, String agreementId) {
var results = queryEdrs(assetId, agreementId).collect(Collectors.toList());
return success(results);
public ServiceResult<List<EndpointDataReferenceEntry>> findBy(QuerySpec querySpec) {
var results = endpointDataReferenceCache.queryForEntries(querySpec).collect(Collectors.toList());
return ServiceResult.success(results);
}

private Stream<EndpointDataReferenceEntry> queryEdrs(String assetId, String agreementId) {
var queryBuilder = QuerySpec.Builder.newInstance();
if (assetId != null) {
queryBuilder.filter(fieldFilter("assetId", assetId));
}
if (agreementId != null) {
queryBuilder.filter(fieldFilter("agreementId", agreementId));
}
return endpointDataReferenceCache.queryForEntries(queryBuilder.build());
@Override
public ServiceResult<EndpointDataReferenceEntry> deleteByTransferProcessId(String transferProcessId) {
var deleted = endpointDataReferenceCache.deleteByTransferProcessId(transferProcessId);
return ServiceResult.from(deleted);
}

private ContractRequest createContractRequest(NegotiateEdrRequest request) {
var callbacks = Stream.concat(request.getCallbackAddresses().stream(), Stream.of(LOCAL_CALLBACK)).collect(Collectors.toList());

private Criterion fieldFilter(String field, String value) {
return Criterion.Builder.newInstance()
.operandLeft(field)
.operator("=")
.operandRight(value)
var requestData = ContractRequestData.Builder.newInstance()
.contractOffer(request.getOffer())
.protocol(request.getProtocol())
.counterPartyAddress(request.getConnectorAddress())
.connectorId(request.getConnectorId())
.build();

return ContractRequest.Builder.newInstance()
.requestData(requestData)
.callbackAddresses(callbacks).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
#
#

org.eclipse.tractusx.edc.api.cp.adapter.AdapterApiExtension
org.eclipse.tractusx.edc.edr.core.EdrCoreExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.edr.core.service;

import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.injection.ObjectFactory;
import org.eclipse.tractusx.edc.edr.core.EdrCoreExtension;
import org.eclipse.tractusx.edc.edr.spi.store.EndpointDataReferenceCache;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

@ExtendWith(DependencyInjectionExtension.class)
public class EdrCoreExtensionTest {

EdrCoreExtension extension;

@BeforeEach
void setUp(ObjectFactory factory, ServiceExtensionContext context) {
context.registerService(ContractNegotiationService.class, mock(ContractNegotiationService.class));
context.registerService(EndpointDataReferenceCache.class, mock(EndpointDataReferenceCache.class));
extension = factory.constructInstance(EdrCoreExtension.class);
}

@Test
void shouldInitializeTheExtension(ServiceExtensionContext context) {
extension.initialize(context);

var service = extension.adapterTransferProcessService();
assertThat(service).isInstanceOf(EdrServiceImpl.class);

}
}
Loading