Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Gateway): forward EDR + refactor #597

Merged
merged 2 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ maven/mavencentral/org.eclipse.edc/util/0.1.3, Apache-2.0, approved, technology.
maven/mavencentral/org.eclipse.edc/validator-core/0.1.3, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/validator-spi/0.1.3, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/vault-azure/0.1.3, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/vault-filesystem/0.1.3, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.edc/web-spi/0.1.3, Apache-2.0, approved, technology.edc
maven/mavencentral/org.eclipse.jetty.toolchain/jetty-jakarta-servlet-api/5.0.2, EPL-2.0 OR Apache-2.0, approved, rt.jetty
maven/mavencentral/org.eclipse.jetty.toolchain/jetty-jakarta-websocket-api/2.0.0, EPL-2.0 OR Apache-2.0, approved, rt.jetty
Expand Down
20 changes: 20 additions & 0 deletions edc-extensions/control-plane-adapter-api/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Control Plane Adapter API (EDR management)

This module provide a new APIs on top of the EDC management APIs for dealing with EDRs token.

The APIs are mounted in the same context of the `management` APIs. So no additional configuration is required.

The base path of the API will be `<mgmtContext>/adapter/edrs`

This module for now provides three APIs:

- Initiating an EDR negotiation token
- Fetching the available EDRs
- Fetching the single EDR

The initiate negotiation EDR leverage the callbacks mechanism introduced in the latest EDC, and it handles
the contract negotiation and the transfer request in one API call. Once the transfer has been completed
the provider will return the EDR that will be stored into the consumer EDR store/cache. Users can interact
with the EDR store/cache for fetching the EDR and then requesting the data, or can use the `proxy` API described [here](../dataplane-proxy/edc-dataplane-proxy-consumer-api/README.md)

An overview on how to use the EDR APIs is available [here](../../docs/samples/edr-api-overview/edr-api-overview.md)
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# DataPlane Proxy Consumer API

This is an API extension that interacts with the EDR/cache for directly fetching the data
without knowing the EDR.

It contains only one endpoint with `POST` for fetching data:

The path is `<proxyContext>/aas/request` and the body is something like this example:

```json
{
"assetId": "1",
"endpointUrl": "http://localhost:8181/api/gateway/aas/test"
}
```

The body should contain the `assetId` or the `transferProcessId` which identify the data that we want to fetch
and an `endpointUrl` which is the provider gateway on which the data is available. More info [here](../edc-dataplane-proxy-provider-api/README.md) on the gateway.

## Configuration

| Key | Required | Default | Description |
|---------------------------------|----------|--------------------------------------------|
| web.http.proxy.port | | 8186 | |
| web.http.proxy.path | | /proxy | |
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# DataPlane Proxy Provider API

This extension provide additional dataplane extension for proxying requests to backends.
The configuration of the proxy can be found [here](../edc-dataplane-proxy-provider-core/README.md)

The provider proxy is mounted into the EDC default context, and it's available in the path `<defaultContext>/gateway`

The proxy will look for subPath in the request and match the subpath with the configured ones and forward
the rest of the path and query parameters.

For example:

with this URL `http://localhost:8181/api/gateway/aas/test` it will look for the `aas` alias in the configuration,
and it will compose the final url to call based on that configuration appending to it the remaining part of the path and query
parameters.

When the proxy receive a request, it must contain the EDR, which will be decoded with the `token` validation endpoint.

## Configuration

| Key | Required | Default | Description |
|---------------------------------|----------|----------------------------------------------------------------------------------------|
| tx.dpf.provider.proxy.thread.pool | | 10 | Thread pool size for the provider data plane proxy gateway |
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,8 @@ dependencies {
implementation(libs.nimbus.jwt)

implementation(project(":edc-extensions:dataplane-proxy:edc-dataplane-proxy-provider-spi"))

testImplementation(libs.edc.junit)
testImplementation(libs.okhttp.mockwebserver)
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.web.spi.WebService;
import org.eclipse.tractusx.edc.dataplane.proxy.provider.api.gateway.ProviderGatewayController;
import org.eclipse.tractusx.edc.dataplane.proxy.provider.api.validation.ProxyProviderDataAddressResolver;
import org.eclipse.tractusx.edc.dataplane.proxy.spi.provider.gateway.authorization.AuthorizationHandlerRegistry;
import org.eclipse.tractusx.edc.dataplane.proxy.spi.provider.gateway.configuration.GatewayConfigurationRegistry;

Expand All @@ -35,13 +38,12 @@
*/
@Extension(value = DataPlaneProxyProviderApiExtension.NAME)
public class DataPlaneProxyProviderApiExtension implements ServiceExtension {
public static final int DEFAULT_THREAD_POOL = 10;
static final String NAME = "Data Plane Proxy Provider API";

@Setting(value = "Thread pool size for the provider data plane proxy gateway", type = "int")
private static final String THREAD_POOL_SIZE = "tx.dpf.provider.proxy.thread.pool";

public static final int DEFAULT_THREAD_POOL = 10;

@Setting
private static final String CONTROL_PLANE_VALIDATION_ENDPOINT = "edc.dataplane.token.validation.endpoint";
@Inject
private WebService webService;

Expand All @@ -57,6 +59,12 @@ public class DataPlaneProxyProviderApiExtension implements ServiceExtension {
@Inject
private AuthorizationHandlerRegistry authorizationRegistry;

@Inject
private TypeManager typeManager;

@Inject
private EdcHttpClient httpClient;

private ExecutorService executorService;

@Override
Expand All @@ -68,7 +76,12 @@ public String name() {
public void initialize(ServiceExtensionContext context) {
executorService = newFixedThreadPool(context.getSetting(THREAD_POOL_SIZE, DEFAULT_THREAD_POOL));

var validationEndpoint = context.getConfig().getString(CONTROL_PLANE_VALIDATION_ENDPOINT);

var dataAddressResolver = new ProxyProviderDataAddressResolver(httpClient, validationEndpoint, typeManager.getMapper());

var controller = new ProviderGatewayController(dataPlaneManager,
dataAddressResolver,
configurationRegistry,
authorizationRegistry,
executorService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import jakarta.ws.rs.core.StreamingOutput;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver;
import org.eclipse.edc.connector.dataplane.util.sink.AsyncStreamingDataSink;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.HttpDataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.tractusx.edc.dataplane.proxy.spi.provider.gateway.authorization.AuthorizationHandlerRegistry;
import org.eclipse.tractusx.edc.dataplane.proxy.spi.provider.gateway.configuration.GatewayConfiguration;
Expand All @@ -43,6 +46,7 @@
import static jakarta.ws.rs.core.Response.Status.UNAUTHORIZED;
import static jakarta.ws.rs.core.Response.status;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.util.UUID.randomUUID;
import static java.util.stream.Collectors.joining;
import static org.eclipse.tractusx.edc.dataplane.proxy.provider.api.response.ResponseHelper.createMessageResponse;
Expand All @@ -53,8 +57,6 @@
@Path("/" + ProviderGatewayController.GATEWAY_PATH)
public class ProviderGatewayController implements ProviderGatewayApi {
protected static final String GATEWAY_PATH = "gateway";

private static final String HTTP_DATA = "HttpData";
private static final String BASE_URL = "baseUrl";
private static final String ASYNC = "async";

Expand All @@ -65,16 +67,20 @@ public class ProviderGatewayController implements ProviderGatewayApi {
private final GatewayConfigurationRegistry configurationRegistry;
private final AuthorizationHandlerRegistry authorizationRegistry;

private final DataAddressResolver dataAddressResolver;

private final Monitor monitor;

private final ExecutorService executorService;

public ProviderGatewayController(DataPlaneManager dataPlaneManager,
DataAddressResolver dataAddressResolver,
GatewayConfigurationRegistry configurationRegistry,
AuthorizationHandlerRegistry authorizationRegistry,
ExecutorService executorService,
Monitor monitor) {
this.dataPlaneManager = dataPlaneManager;
this.dataAddressResolver = dataAddressResolver;
this.configurationRegistry = configurationRegistry;
this.authorizationRegistry = authorizationRegistry;
this.executorService = executorService;
Expand All @@ -98,6 +104,7 @@ public void requestAsset(@Context ContainerRequestContext context, @Suspended As
token = token.substring(BEARER_PREFIX.length());
}


var uriInfo = context.getUriInfo();
var segments = uriInfo.getPathSegments();
if (segments.size() < 3 || !GATEWAY_PATH.equals(segments.get(0).getPath())) {
Expand All @@ -112,6 +119,22 @@ public void requestAsset(@Context ContainerRequestContext context, @Suspended As
return;
}

var httpDataAddressResult = extractSourceDataAddress(token, configuration);
HttpDataAddress httpDataAddress = null;

if (httpDataAddressResult.succeeded()) {
httpDataAddress = httpDataAddressResult.getContent();
} else {
monitor.debug(join(", ", httpDataAddressResult.getFailureMessages()));
response.resume(createMessageResponse(UNAUTHORIZED, "Failed to decode data address", context.getMediaType()));
return;
}

if (!configuration.getProxiedPath().startsWith(httpDataAddress.getBaseUrl())) {
response.resume(createMessageResponse(NOT_FOUND, "Data address path not matched", context.getMediaType()));
return;
}

// calculate the sub-path, which all segments after the GATEWAY segment, including the alias segment
var subPath = segments.stream().skip(1).map(PathSegment::getPath).collect(joining("/"));
if (!authenticate(token, configuration.getAuthorizationType(), subPath, context, response)) {
Expand All @@ -120,7 +143,7 @@ public void requestAsset(@Context ContainerRequestContext context, @Suspended As

// calculate the request path, which all segments after the alias segment
var requestPath = segments.stream().skip(2).map(PathSegment::getPath).collect(joining("/"));
var flowRequest = createRequest(requestPath, configuration);
var flowRequest = createRequest(requestPath, configuration, httpDataAddress);

// transfer the data asynchronously
var sink = new AsyncStreamingDataSink(consumer -> response.resume((StreamingOutput) consumer::accept), executorService, monitor);
Expand All @@ -132,13 +155,13 @@ public void requestAsset(@Context ContainerRequestContext context, @Suspended As
}
}

private DataFlowRequest createRequest(String subPath, GatewayConfiguration configuration) {
private DataFlowRequest createRequest(String subPath, GatewayConfiguration configuration, HttpDataAddress httpDataAddress) {
var path = configuration.getProxiedPath() + "/" + subPath;

var sourceAddress = DataAddress.Builder.newInstance()
.type(HTTP_DATA)
.property(BASE_URL, path)
.build();
var sourceAddressBuilder = HttpDataAddress.Builder.newInstance()
.property(BASE_URL, path);

httpDataAddress.getAdditionalHeaders().forEach(sourceAddressBuilder::addAdditionalHeader);

var destinationAddress = DataAddress.Builder.newInstance()
.type(ASYNC)
Expand All @@ -147,7 +170,7 @@ private DataFlowRequest createRequest(String subPath, GatewayConfiguration confi
return DataFlowRequest.Builder.newInstance()
.processId(randomUUID().toString())
.trackable(false)
.sourceDataAddress(sourceAddress)
.sourceDataAddress(sourceAddressBuilder.build())
.destinationDataAddress(destinationAddress)
.traceContext(Map.of())
.build();
Expand Down Expand Up @@ -203,4 +226,16 @@ private void reportError(AsyncResponse response, Throwable throwable) {
response.resume(entity);
}


private Result<HttpDataAddress> extractSourceDataAddress(String token, GatewayConfiguration configuration) {
return dataAddressResolver.resolve(token).map(dataAddress -> mapToHttpDataAddress(dataAddress, configuration, token));
}

private HttpDataAddress mapToHttpDataAddress(DataAddress dataAddress, GatewayConfiguration configuration, String token) {
var builder = HttpDataAddress.Builder.newInstance().copyFrom(dataAddress);
if (configuration.isForwardEdrToken()) {
builder.addAdditionalHeader(configuration.getForwardEdrTokenHeaderKey(), token);
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright (c) 2022 Amadeus
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Amadeus - initial API and implementation
*
*/

package org.eclipse.tractusx.edc.dataplane.proxy.provider.api.validation;

import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.ws.rs.core.HttpHeaders;
import okhttp3.Request;
import org.eclipse.edc.connector.dataplane.spi.resolver.DataAddressResolver;
import org.eclipse.edc.spi.http.EdcHttpClient;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.io.IOException;

import static java.lang.String.format;

public class ProxyProviderDataAddressResolver implements DataAddressResolver {

private final EdcHttpClient httpClient;
private final String endpoint;
private final ObjectMapper mapper;

public ProxyProviderDataAddressResolver(EdcHttpClient httpClient, String endpoint, ObjectMapper mapper) {
this.httpClient = httpClient;
this.endpoint = endpoint;
this.mapper = mapper;
}

/**
* Resolves access token received in input of Data Plane public API (consumer pull) into the {@link DataAddress}
* of the requested data.
*
* @param token Access token received in input of the Data Plane public API
* @return Data address
*/
@Override
public Result<DataAddress> resolve(String token) {
var request = new Request.Builder().url(endpoint).header(HttpHeaders.AUTHORIZATION, token).get().build();
try (var response = httpClient.execute(request)) {
var body = response.body();
var stringBody = body != null ? body.string() : null;
if (stringBody == null) {
return Result.failure("Token validation server returned null body");
}

if (response.isSuccessful()) {
return Result.success(mapper.readValue(stringBody, DataAddress.class));
} else {
return Result.failure(format("Call to token validation sever failed: %s - %s. %s", response.code(), response.message(), stringBody));
}
} catch (IOException e) {
return Result.failure("Unhandled exception occurred during call to token validation server: " + e.getMessage());
}
}
}
Loading