diff --git a/build.gradle.kts b/build.gradle.kts index ad72c9c..0b32309 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -8,7 +8,7 @@ plugins { } group = "org.eclipse.dataplane-core" -version = "0.0.11-SNAPSHOT" +version = "0.0.12-SNAPSHOT" subprojects { apply(plugin = "signing") diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/Dataplane.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/Dataplane.java index 10c01e2..a821e2a 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -62,13 +62,15 @@ import java.util.Set; import java.util.UUID; +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION; -import static java.util.Collections.emptyMap; public class Dataplane { - private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + private final ObjectMapper objectMapper = new ObjectMapper() + .configure(FAIL_ON_UNKNOWN_PROPERTIES, false) + .setDefaultPropertyInclusion(NON_NULL); private DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper); private ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper); private String id; @@ -265,8 +267,8 @@ public Result notifyCompleted(String dataFlowId) { return dataFlowStore.findById(dataFlowId) .compose(dataFlow -> { dataFlow.transitionToCompleted(); - - return notifyControlPlane("completed", dataFlow, emptyMap()); + var message = new DataFlowStatusMessage(dataFlowId, dataFlow.getState().name(), null, null); + return notifyControlPlane("completed", dataFlow, message); }); } @@ -372,7 +374,7 @@ private Result notifyControlPlane(String action, DataFlow dataFlow, Object }) .onSuccess(authorizationHeader -> requestBuilder.header(AUTHORIZATION, authorizationHeader)); - return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.discarding()); + return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString()); }) .compose(response -> { var successful = response.statusCode() >= 200 && response.statusCode() < 300; diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/DataAddress.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/DataAddress.java index 073f040..d495280 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/DataAddress.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/DataAddress.java @@ -14,19 +14,21 @@ package org.eclipse.dataplane.domain; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; import java.util.List; +@JsonIgnoreProperties(value = {"@type"}, allowGetters = true) public record DataAddress( - @JsonProperty("@type") String type, String endpointType, String endpoint, List endpointProperties ) { - public DataAddress(String endpointType, String endpoint, List endpointProperties) { - this("DataAddress", endpointType, endpoint, endpointProperties); + @JsonProperty("@type") + public String getType() { + return "DataAddress"; } public record EndpointProperty( diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index b1b8f36..99055cf 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -132,7 +132,11 @@ public void transitionToTerminated(String reason) { } public boolean isPush() { - return transferType.split("-")[1].equalsIgnoreCase("PUSH"); + return transferTypeLastToken().equalsIgnoreCase("push"); + } + + public boolean isPull() { + return transferTypeLastToken().equalsIgnoreCase("pull"); } public boolean isInitiating() { @@ -147,10 +151,6 @@ public boolean isStarted() { return state == State.STARTED; } - public boolean isPull() { - return transferType.split("-")[1].equalsIgnoreCase("PULL"); - } - public void setDataAddress(DataAddress dataAddress) { this.dataAddress = dataAddress; } @@ -167,8 +167,12 @@ public Type getType() { return type; } + private String transferTypeLastToken() { + return transferType.substring(transferType.lastIndexOf('-') + 1); + } + public enum Type { - PROVIDER, CONSUMER + PROVIDER, CONSUMER; } public static class Builder { diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStartedNotificationMessage.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStartedNotificationMessage.java index 7fadb04..06168b1 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStartedNotificationMessage.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStartedNotificationMessage.java @@ -17,6 +17,7 @@ import org.eclipse.dataplane.domain.DataAddress; public record DataFlowStartedNotificationMessage( + String messageId, DataAddress dataAddress ) { } diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java index 8fbc9cb..44a0231 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusMessage.java @@ -17,10 +17,16 @@ import org.eclipse.dataplane.domain.DataAddress; +import java.util.UUID; + public record DataFlowStatusMessage( + String messageId, String dataFlowId, String state, DataAddress dataAddress, String error ) { + public DataFlowStatusMessage(String dataFlowId, String state, DataAddress dataAddress, String error) { + this(UUID.randomUUID().toString(), dataFlowId, state, dataAddress, error); + } } diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusResponseMessage.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusResponseMessage.java index 851e946..f14dfa8 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusResponseMessage.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowStatusResponseMessage.java @@ -15,7 +15,7 @@ package org.eclipse.dataplane.domain.dataflow; public record DataFlowStatusResponseMessage( - String dataflowId, + String dataFlowId, String state ) { } diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowSuspendMessage.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowSuspendMessage.java index a8d372e..9b27ffb 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowSuspendMessage.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowSuspendMessage.java @@ -15,6 +15,7 @@ package org.eclipse.dataplane.domain.dataflow; public record DataFlowSuspendMessage( + String messageId, String reason ) { } diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowTerminateMessage.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowTerminateMessage.java index fffb46c..1060f16 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowTerminateMessage.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlowTerminateMessage.java @@ -15,6 +15,7 @@ package org.eclipse.dataplane.domain.dataflow; public record DataFlowTerminateMessage( + String messageId, String reason ) { } diff --git a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java index 0a05fa4..c84d52a 100644 --- a/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java +++ b/dataplane-sdk-core/src/main/java/org/eclipse/dataplane/port/exception/DataFlowNotifyControlPlaneFailed.java @@ -18,15 +18,15 @@ public class DataFlowNotifyControlPlaneFailed extends Exception { private final String action; - private final HttpResponse response; + private final HttpResponse response; - public DataFlowNotifyControlPlaneFailed(String action, HttpResponse response) { - super("control-plane responded with %s".formatted(response.statusCode())); + public DataFlowNotifyControlPlaneFailed(String action, HttpResponse response) { + super("control-plane responded with %s: %s".formatted(response.statusCode(), response.body())); this.action = action; this.response = response; } - public HttpResponse getResponse() { + public HttpResponse getResponse() { return response; } diff --git a/e2e-tests/build.gradle.kts b/e2e-tests/build.gradle.kts index daf2e50..78b3dae 100644 --- a/e2e-tests/build.gradle.kts +++ b/e2e-tests/build.gradle.kts @@ -22,19 +22,20 @@ dependencies { testImplementation(libs.nimbus.jwt) - testImplementation(platform(libs.junit.bom)) - testImplementation(libs.junit.jupiter) testRuntimeOnly(libs.junit.launcher) - testImplementation(libs.restAssured) + testImplementation(platform(libs.junit.bom)) testImplementation(libs.assertJ) testImplementation(libs.awaitility) - testImplementation(libs.jakarta.rsApi) - testImplementation(libs.jersey.servlet) testImplementation(libs.jersey.hk2) testImplementation(libs.jersey.jackson) + testImplementation(libs.jersey.servlet) testImplementation(libs.jetty.ee10.servlet) testImplementation(libs.jetty.server) + testImplementation(libs.junit.jupiter) + testImplementation(libs.parsson) + testImplementation(libs.restAssured) + testImplementation(libs.tck.dps) testImplementation(libs.wiremock.jetty12) } diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/ControlPlane.java b/e2e-tests/src/test/java/org/eclipse/dataplane/ControlPlane.java index e136ac7..be8601b 100644 --- a/e2e-tests/src/test/java/org/eclipse/dataplane/ControlPlane.java +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/ControlPlane.java @@ -32,6 +32,7 @@ import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage; import java.net.URI; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.function.Predicate; @@ -174,7 +175,8 @@ public void errored(@PathParam("transferId") String transferId, @Context Contain } executor.submit(() -> { var idPart = transferId.split("_")[1]; - counterPart.terminate("consumer_" + idPart, new DataFlowTerminateMessage("terminated by provider")).statusCode(200); + var message = new DataFlowTerminateMessage(UUID.randomUUID().toString(), "terminated by provider"); + counterPart.terminate("consumer_" + idPart, message).statusCode(200); }); } diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/HttpServer.java b/e2e-tests/src/test/java/org/eclipse/dataplane/HttpServer.java index 4751e53..88c9fd9 100644 --- a/e2e-tests/src/test/java/org/eclipse/dataplane/HttpServer.java +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/HttpServer.java @@ -23,6 +23,7 @@ import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; +import org.jspecify.annotations.NonNull; import static org.eclipse.jetty.ee10.servlet.ServletContextHandler.NO_SESSIONS; @@ -49,7 +50,19 @@ public void start() { } public void deploy(String basePath, Object controller) { - var servletHolder = createServletHolder(controller); + deploy(basePath, createServletContainerFor(controller)); + } + + public void deploy(String basePath, ResourceConfig resourceConfig) { + var servlet = new ServletContainer(resourceConfig); + var servletHolder = new ServletHolder(Source.EMBEDDED); + servletHolder.setServlet(servlet); + servletContextHandler.getServletHandler().addServletWithMapping(servletHolder, basePath + "/*"); + } + + public void deploy(String basePath, ServletContainer servlet) { + var servletHolder = new ServletHolder(Source.EMBEDDED); + servletHolder.setServlet(servlet); servletContextHandler.getServletHandler().addServletWithMapping(servletHolder, basePath + "/*"); } @@ -61,7 +74,7 @@ public void stop() { } } - private ServletHolder createServletHolder(Object controller) { + private @NonNull ServletContainer createServletContainerFor(Object controller) { var resourceConfig = new ResourceConfig(); resourceConfig.registerClasses(controller.getClass()); resourceConfig.registerInstances(new AbstractBinder() { @@ -70,10 +83,7 @@ protected void configure() { bind(controller).to((Class) controller.getClass()); } }); - var servlet = new ServletContainer(resourceConfig); - var servletHolder = new ServletHolder(Source.EMBEDDED); - servletHolder.setServlet(servlet); - return servletHolder; + return new ServletContainer(resourceConfig); } public int port() { diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java b/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java index 92f9972..8127429 100644 --- a/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/ConsumerPullTest.java @@ -91,7 +91,8 @@ void shouldPullDataFromProvider() { assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); - controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(startResponse.dataAddress())).statusCode(200); + var message = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), startResponse.dataAddress()); + controlPlane.consumerStarted(consumerProcessId, message).statusCode(200); await().untilAsserted(() -> { assertThat(consumerDataPlane.storage.toFile().listFiles()).hasSize(filesAvailableOnProvider); @@ -197,7 +198,7 @@ private Result prepareSourceDataAddress(DataFlow dataFlow) { Files.writeString(path, UUID.randomUUID().toString()); } - var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList()); + var dataAddress = new DataAddress("FileSystem", destinationDirectory.toString(), emptyList()); dataFlow.setDataAddress(dataAddress); return Result.success(dataFlow); } catch (IOException e) { diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java b/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java index 6498834..25acf10 100644 --- a/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPullTest.java @@ -98,13 +98,15 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() { assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); - controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(startResponse.dataAddress())).statusCode(200); + var startedMessage = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), startResponse.dataAddress()); + controlPlane.consumerStarted(consumerProcessId, startedMessage).statusCode(200); await().untilAsserted(() -> { assertThat(consumerDataPlane.storage.toFile().listFiles()).hasSizeGreaterThan(20); }); - controlPlane.providerTerminate(providerProcessId, new DataFlowTerminateMessage("a good reason")).statusCode(200); + var terminateMessage = new DataFlowTerminateMessage(UUID.randomUUID().toString(), "a good reason"); + controlPlane.providerTerminate(providerProcessId, terminateMessage).statusCode(200); consumerDataPlane.assertNoMoreDataIsTransferred(); } @@ -126,11 +128,13 @@ void shouldSuspendAndResumeOnProvider() { assertThat(startResponse.state()).isEqualTo(STARTED.name()); assertThat(startResponse.dataAddress()).isNotNull(); - controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(startResponse.dataAddress())).statusCode(200); + var firstStarted = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), startResponse.dataAddress()); + controlPlane.consumerStarted(consumerProcessId, firstStarted).statusCode(200); consumerDataPlane.assertDataIsFlowing(); - controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage("a reason")).statusCode(200); + var suspendMessage = new DataFlowSuspendMessage(UUID.randomUUID().toString(), "a reason"); + controlPlane.providerSuspend(providerProcessId, suspendMessage).statusCode(200); consumerDataPlane.assertNoMoreDataIsTransferred(); @@ -138,7 +142,8 @@ void shouldSuspendAndResumeOnProvider() { var providerResumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage) .statusCode(200).extract().as(DataFlowStatusMessage.class); assertThat(providerResumeResponse.dataAddress()).isNotNull(); - controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(providerResumeResponse.dataAddress())) + var secondStarted = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), providerResumeResponse.dataAddress()); + controlPlane.consumerStarted(consumerProcessId, secondStarted) .statusCode(200); var consumerResumeResponse = controlPlane.consumerResume(consumerProcessId, resumeMessage(consumerProcessId)) @@ -252,7 +257,7 @@ private Result onStart(DataFlow dataFlow) { flows.put(dataFlow.getId(), flow); - var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList()); + var dataAddress = new DataAddress("FileSystem", destinationDirectory.toString(), emptyList()); dataFlow.setDataAddress(dataAddress); return Result.success(dataFlow); diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java b/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java index b48f016..da101de 100644 --- a/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/scenario/StreamingPushTest.java @@ -117,10 +117,10 @@ void shouldSuspendAndResumeByConsumer() { assertThat(startResponse.state()).isEqualTo(STARTED.name()); consumerDataPlane.assertDataIsFlowing(consumerProcessId); - controlPlane.consumerSuspend(consumerProcessId, new DataFlowSuspendMessage("a reason")) + controlPlane.consumerSuspend(consumerProcessId, new DataFlowSuspendMessage(UUID.randomUUID().toString(), "a reason")) .statusCode(200); - controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage("a reason")) + controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage(UUID.randomUUID().toString(), "a reason")) .statusCode(200); consumerDataPlane.assertNoMoreDataIsTransferred(consumerProcessId); @@ -205,7 +205,7 @@ private static class ConsumerDataPlane { private Result onPrepare(DataFlow dataFlow) { try { var destinationFolder = Files.createTempDirectory("consumer-dest"); - var dataAddress = new DataAddress("FileSystem", "folder", destinationFolder.toString(), emptyList()); + var dataAddress = new DataAddress("FileSystem", destinationFolder.toString(), emptyList()); dataFlow.setDataAddress(dataAddress); destinations.put(dataFlow.getId(), dataAddress); diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/tck/DpsTckTest.java b/e2e-tests/src/test/java/org/eclipse/dataplane/tck/DpsTckTest.java new file mode 100644 index 0000000..58a6498 --- /dev/null +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/tck/DpsTckTest.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.tck; + +import org.eclipse.dataplane.HttpServer; +import org.eclipse.dataplane.domain.registration.ControlPlaneRegistrationMessage; +import org.eclipse.dataplane.port.DataPlaneSignalingApiController; +import org.eclipse.dataspacetck.dps.system.DpsSystemLauncher; +import org.eclipse.dataspacetck.runtime.TckRuntime; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.net.URI; +import java.util.Map; +import java.util.Properties; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Runs the DPS TCK data plane verification tests against the dataplane-sdk-java HTTP server. + * + *

The SDK data plane (the CUT) is started in {@code @BeforeAll}. The TCK uses DpsSystemLauncher as the simulated + * control plane, pointed at the SDK server + * via the {@code dataspacetck.dps.dataplane.url} property. + * + *

Tests signal their intent via a sentinel {@code agreementId} in {@code dps.tck.properties}: + * {@code "complete"} causes the data plane to autonomously send a completed callback after the + * transfer starts; {@code "async"} causes it to respond 202+transitional then send a started/prepared callback. + */ +public class DpsTckTest { + + private HttpServer httpServer; + + @BeforeEach + void startDataplane() { + var tckControlPlaneId = "tck-control-plane"; + var tckDataplane = new TckDataplane(tckControlPlaneId); + tckDataplane.getDataplane().registerControlPlane(new ControlPlaneRegistrationMessage(tckControlPlaneId, URI.create("http://localhost"))); + + httpServer = new HttpServer(); + httpServer.start(); + httpServer.deploy("/dataplane", new DataPlaneSignalingApiController(tckDataplane.getDataplane())); + } + + @AfterEach + void stopDataplane() { + if (httpServer != null) { + httpServer.stop(); + } + } + + @Test + void runDpsTckTests() { + var properties = loadProperties("dps.tck.properties"); + properties.put("dataspacetck.debug", "true"); + properties.put("dataspacetck.dps.dataplane.url", "http://localhost:" + httpServer.port() + "/dataplane/v1"); + + var result = TckRuntime.Builder.newInstance() + .launcher(DpsSystemLauncher.class) + .properties(properties) + .addPackage("org.eclipse.dataspacetck.dps.verification.dataplane") + .build() + .execute(); + + assertThat(result.getFailures()).isEmpty(); + } + + private Map loadProperties(String resource) { + var url = getClass().getClassLoader().getResource(resource); + assertThat(url).as("Resource not found: %s", resource).isNotNull(); + var properties = new Properties(); + try (var stream = url.openStream()) { + properties.load(stream); + } catch (IOException e) { + throw new RuntimeException(e); + } + return properties.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().toString(), e -> e.getValue().toString())); + } + +} diff --git a/e2e-tests/src/test/java/org/eclipse/dataplane/tck/TckDataplane.java b/e2e-tests/src/test/java/org/eclipse/dataplane/tck/TckDataplane.java new file mode 100644 index 0000000..03be89a --- /dev/null +++ b/e2e-tests/src/test/java/org/eclipse/dataplane/tck/TckDataplane.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2026 Think-it GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Think-it GmbH - initial API and implementation + * + */ + +package org.eclipse.dataplane.tck; + +import org.eclipse.dataplane.Dataplane; +import org.eclipse.dataplane.domain.DataAddress; +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.domain.registration.Authorization; +import org.eclipse.dataplane.domain.registration.AuthorizationProfile; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptyList; + +public class TckDataplane { + + private static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor(); + + private final Dataplane dataplane; + + public TckDataplane(String tckControlPlaneId) { + this.dataplane = Dataplane.newInstance() + .id("sdk-dataplane") + .registerAuthorization(new TckAuthorization(tckControlPlaneId)) + .onPrepare(this::handlePrepare) + .onStart(this::handleStart) + .onStarted(this::handleStarted) + .onSuspend(Result::success) + .onResume(Result::success) + .onTerminate(Result::success) + .onCompleted(Result::success) + .build(); + } + + public Dataplane getDataplane() { + return dataplane; + } + + private Result handlePrepare(DataFlow dataFlow) { + if ("async".equals(dataFlow.getAgreementId())) { + dataFlow.transitionToPreparing(); + var dataFlowId = dataFlow.getId(); + EXECUTOR_SERVICE.schedule(() -> { + var result = dataplane.notifyPrepared(dataFlowId, Result::success); + if (result.failed()) { + throw new RuntimeException("[TCK async-prepare] notifyPrepared failed: " + result.getException()); + } + }, 50, TimeUnit.MILLISECONDS); + } else { + dataFlow.setDataAddress(new DataAddress("endpointType", "http://any", emptyList())); + } + return Result.success(dataFlow); + } + + private Result handleStart(DataFlow dataFlow) { + var dataFlowId = dataFlow.getId(); + if ("async".equals(dataFlow.getAgreementId())) { + dataFlow.transitionToStarting(); + EXECUTOR_SERVICE.schedule(() -> { + var result = dataplane.notifyStarted(dataFlowId, Result::success); + if (result.failed()) { + throw new RuntimeException("[TCK async-start] notifyStarted failed: " + result.getException()); + } + }, 50, TimeUnit.MILLISECONDS); + } else { + dataFlow.setDataAddress(new DataAddress("endpointType", "http://any", emptyList())); + if ("complete".equals(dataFlow.getAgreementId())) { + EXECUTOR_SERVICE.schedule(() -> { + var result = dataplane.notifyCompleted(dataFlowId); + if (result.failed()) { + throw new RuntimeException("[TCK auto-complete] notifyCompleted failed: " + result.getException()); + } + }, 250, TimeUnit.MILLISECONDS); + } + } + return Result.success(dataFlow); + } + + private Result handleStarted(DataFlow dataFlow) { + if ("complete".equals(dataFlow.getAgreementId())) { + var dataFlowId = dataFlow.getId(); + EXECUTOR_SERVICE.schedule(() -> { + var result = dataplane.notifyCompleted(dataFlowId); + if (result.failed()) { + throw new RuntimeException("[TCK auto-complete] notifyCompleted failed: " + result.getException()); + } + }, 250, TimeUnit.MILLISECONDS); + } + return Result.success(dataFlow); + } + + static class TckAuthorization implements Authorization { + + private final String tckControlPlaneId; + + TckAuthorization(String tckControlPlaneId) { + this.tckControlPlaneId = tckControlPlaneId; + } + + @Override + public String type() { + return "tck"; + } + + @Override + public Result authorizationHeader(AuthorizationProfile profile) { + return Result.success("dummy"); + } + + @Override + public Result extractCallerId(String authorizationHeader) { + return Result.success(tckControlPlaneId); + } + } + +} diff --git a/e2e-tests/src/test/resources/dps.tck.properties b/e2e-tests/src/test/resources/dps.tck.properties new file mode 100644 index 0000000..1acf4c7 --- /dev/null +++ b/e2e-tests/src/test/resources/dps.tck.properties @@ -0,0 +1,22 @@ +# +# Copyright (c) 2026 Think-it GmbH +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Think-it GmbH - initial API and implementation +# + +# Tests signal their intent via a sentinel agreementId: +# "complete" -> data plane autonomously sends a /dataflow/completed callback after the transfer starts +# "async" -> data plane responds 202+transitional, then sends a started/prepared callback +DP_C_PULL_03_01_AGREEMENTID=complete +DP_P_PULL_04_01_AGREEMENTID=async +DP_C_PULL_04_01_AGREEMENTID=async +DP_P_PUSH_03_01_AGREEMENTID=complete +DP_P_PUSH_04_01_AGREEMENTID=async +DP_C_PUSH_04_01_AGREEMENTID=async diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index fa2c3d6..31bbdfd 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -14,6 +14,7 @@ nimbusJoseJwt = "10.9" postgresql = "42.7.11" restAssured = "6.0.0" slf4j = "2.0.18" +tck-dps = "1.1.0" testcontainers = "2.0.5" wiremock = "3.13.2" @@ -35,10 +36,11 @@ restAssured = { module = "io.rest-assured:rest-assured", version.ref = "restAssu assertJ = { module = "org.assertj:assertj-core", version.ref = "assertJ" } awaitility = { module = "org.awaitility:awaitility", version.ref = "awaitility" } mockito-core = { module = "org.mockito:mockito-core", version.ref = "mockito" } - +parsson = { module = "org.eclipse.parsson:parsson", version = "1.1.9" } +postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" } +tck-dps = { module = "org.eclipse.dataspacetck.dps:dps-tck", version.ref = "tck-dps" } testcontainers-junit-jupiter = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" } testcontainers-postgresql = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" } -postgresql = { module = "org.postgresql:postgresql", version.ref = "postgresql" } jetty-ee10-servlet = { module = "org.eclipse.jetty.ee10:jetty-ee10-servlet", version.ref = "jetty" } jetty-server = { module = "org.eclipse.jetty:jetty-server", version.ref = "jetty" }