Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ plugins {
}

group = "org.eclipse.dataplane-core"
version = "0.0.11-SNAPSHOT"
version = "0.0.12-SNAPSHOT"

subprojects {
apply(plugin = "signing")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@
import java.util.Set;
import java.util.UUID;

import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES;
import static jakarta.ws.rs.core.HttpHeaders.AUTHORIZATION;
import static java.util.Collections.emptyMap;

public class Dataplane {

private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
private final ObjectMapper objectMapper = new ObjectMapper()
.configure(FAIL_ON_UNKNOWN_PROPERTIES, false)
.setDefaultPropertyInclusion(NON_NULL);
private DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper);
private ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper);
private String id;
Expand Down Expand Up @@ -265,8 +267,8 @@ public Result<Void> notifyCompleted(String dataFlowId) {
return dataFlowStore.findById(dataFlowId)
.compose(dataFlow -> {
dataFlow.transitionToCompleted();

return notifyControlPlane("completed", dataFlow, emptyMap());
var message = new DataFlowStatusMessage(dataFlowId, dataFlow.getState().name(), null, null);
return notifyControlPlane("completed", dataFlow, message);
});
}

Expand Down Expand Up @@ -372,7 +374,7 @@ private Result<Void> notifyControlPlane(String action, DataFlow dataFlow, Object
})
.onSuccess(authorizationHeader -> requestBuilder.header(AUTHORIZATION, authorizationHeader));

return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.discarding());
return httpClient.send(requestBuilder.build(), HttpResponse.BodyHandlers.ofString());
})
.compose(response -> {
var successful = response.statusCode() >= 200 && response.statusCode() < 300;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,21 @@

package org.eclipse.dataplane.domain;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

@JsonIgnoreProperties(value = {"@type"}, allowGetters = true)
public record DataAddress(
@JsonProperty("@type") String type,
String endpointType,
String endpoint,
List<EndpointProperty> endpointProperties
) {

public DataAddress(String endpointType, String endpoint, List<EndpointProperty> endpointProperties) {
this("DataAddress", endpointType, endpoint, endpointProperties);
@JsonProperty("@type")
public String getType() {
return "DataAddress";
}

public record EndpointProperty(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ public void transitionToTerminated(String reason) {
}

public boolean isPush() {
return transferType.split("-")[1].equalsIgnoreCase("PUSH");
return transferTypeLastToken().equalsIgnoreCase("push");
}

public boolean isPull() {
return transferTypeLastToken().equalsIgnoreCase("pull");
}

public boolean isInitiating() {
Expand All @@ -147,10 +151,6 @@ public boolean isStarted() {
return state == State.STARTED;
}

public boolean isPull() {
return transferType.split("-")[1].equalsIgnoreCase("PULL");
}

public void setDataAddress(DataAddress dataAddress) {
this.dataAddress = dataAddress;
}
Expand All @@ -167,8 +167,12 @@ public Type getType() {
return type;
}

private String transferTypeLastToken() {
return transferType.substring(transferType.lastIndexOf('-') + 1);
}

public enum Type {
PROVIDER, CONSUMER
PROVIDER, CONSUMER;
}

public static class Builder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.dataplane.domain.DataAddress;

public record DataFlowStartedNotificationMessage(
String messageId,
DataAddress dataAddress
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,16 @@

import org.eclipse.dataplane.domain.DataAddress;

import java.util.UUID;

public record DataFlowStatusMessage(
String messageId,
String dataFlowId,
String state,
DataAddress dataAddress,
String error
) {
public DataFlowStatusMessage(String dataFlowId, String state, DataAddress dataAddress, String error) {
this(UUID.randomUUID().toString(), dataFlowId, state, dataAddress, error);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.eclipse.dataplane.domain.dataflow;

public record DataFlowStatusResponseMessage(
String dataflowId,
String dataFlowId,
String state
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.dataplane.domain.dataflow;

public record DataFlowSuspendMessage(
String messageId,
String reason
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.eclipse.dataplane.domain.dataflow;

public record DataFlowTerminateMessage(
String messageId,
String reason
) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@

public class DataFlowNotifyControlPlaneFailed extends Exception {
private final String action;
private final HttpResponse<Void> response;
private final HttpResponse<String> response;

public DataFlowNotifyControlPlaneFailed(String action, HttpResponse<Void> response) {
super("control-plane responded with %s".formatted(response.statusCode()));
public DataFlowNotifyControlPlaneFailed(String action, HttpResponse<String> response) {
super("control-plane responded with %s: %s".formatted(response.statusCode(), response.body()));
this.action = action;
this.response = response;
}

public HttpResponse<Void> getResponse() {
public HttpResponse<String> getResponse() {
return response;
}

Expand Down
11 changes: 6 additions & 5 deletions e2e-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,20 @@ dependencies {

testImplementation(libs.nimbus.jwt)

testImplementation(platform(libs.junit.bom))
testImplementation(libs.junit.jupiter)
testRuntimeOnly(libs.junit.launcher)
testImplementation(libs.restAssured)
testImplementation(platform(libs.junit.bom))
testImplementation(libs.assertJ)
testImplementation(libs.awaitility)

testImplementation(libs.jakarta.rsApi)
testImplementation(libs.jersey.servlet)
testImplementation(libs.jersey.hk2)
testImplementation(libs.jersey.jackson)
testImplementation(libs.jersey.servlet)
testImplementation(libs.jetty.ee10.servlet)
testImplementation(libs.jetty.server)
testImplementation(libs.junit.jupiter)
testImplementation(libs.parsson)
testImplementation(libs.restAssured)
testImplementation(libs.tck.dps)
testImplementation(libs.wiremock.jetty12)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.dataplane.domain.dataflow.DataFlowTerminateMessage;

import java.net.URI;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
Expand Down Expand Up @@ -174,7 +175,8 @@ public void errored(@PathParam("transferId") String transferId, @Context Contain
}
executor.submit(() -> {
var idPart = transferId.split("_")[1];
counterPart.terminate("consumer_" + idPart, new DataFlowTerminateMessage("terminated by provider")).statusCode(200);
var message = new DataFlowTerminateMessage(UUID.randomUUID().toString(), "terminated by provider");
counterPart.terminate("consumer_" + idPart, message).statusCode(200);
});
}

Expand Down
22 changes: 16 additions & 6 deletions e2e-tests/src/test/java/org/eclipse/dataplane/HttpServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.glassfish.hk2.utilities.binding.AbstractBinder;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.jspecify.annotations.NonNull;

import static org.eclipse.jetty.ee10.servlet.ServletContextHandler.NO_SESSIONS;

Expand All @@ -49,7 +50,19 @@ public void start() {
}

public void deploy(String basePath, Object controller) {
var servletHolder = createServletHolder(controller);
deploy(basePath, createServletContainerFor(controller));
}

public void deploy(String basePath, ResourceConfig resourceConfig) {
var servlet = new ServletContainer(resourceConfig);
var servletHolder = new ServletHolder(Source.EMBEDDED);
servletHolder.setServlet(servlet);
servletContextHandler.getServletHandler().addServletWithMapping(servletHolder, basePath + "/*");
}

public void deploy(String basePath, ServletContainer servlet) {
var servletHolder = new ServletHolder(Source.EMBEDDED);
servletHolder.setServlet(servlet);
servletContextHandler.getServletHandler().addServletWithMapping(servletHolder, basePath + "/*");
}

Expand All @@ -61,7 +74,7 @@ public void stop() {
}
}

private ServletHolder createServletHolder(Object controller) {
private @NonNull ServletContainer createServletContainerFor(Object controller) {
var resourceConfig = new ResourceConfig();
resourceConfig.registerClasses(controller.getClass());
resourceConfig.registerInstances(new AbstractBinder() {
Expand All @@ -70,10 +83,7 @@ protected void configure() {
bind(controller).to((Class<? super Object>) controller.getClass());
}
});
var servlet = new ServletContainer(resourceConfig);
var servletHolder = new ServletHolder(Source.EMBEDDED);
servletHolder.setServlet(servlet);
return servletHolder;
return new ServletContainer(resourceConfig);
}

public int port() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ void shouldPullDataFromProvider() {
assertThat(startResponse.state()).isEqualTo(STARTED.name());
assertThat(startResponse.dataAddress()).isNotNull();

controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(startResponse.dataAddress())).statusCode(200);
var message = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), startResponse.dataAddress());
controlPlane.consumerStarted(consumerProcessId, message).statusCode(200);

await().untilAsserted(() -> {
assertThat(consumerDataPlane.storage.toFile().listFiles()).hasSize(filesAvailableOnProvider);
Expand Down Expand Up @@ -197,7 +198,7 @@ private Result<DataFlow> prepareSourceDataAddress(DataFlow dataFlow) {
Files.writeString(path, UUID.randomUUID().toString());
}

var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList());
var dataAddress = new DataAddress("FileSystem", destinationDirectory.toString(), emptyList());
dataFlow.setDataAddress(dataAddress);
return Result.success(dataFlow);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,15 @@ void shouldPullDataFromProvider_thenProviderTerminatesIt() {
assertThat(startResponse.state()).isEqualTo(STARTED.name());
assertThat(startResponse.dataAddress()).isNotNull();

controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(startResponse.dataAddress())).statusCode(200);
var startedMessage = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), startResponse.dataAddress());
controlPlane.consumerStarted(consumerProcessId, startedMessage).statusCode(200);

await().untilAsserted(() -> {
assertThat(consumerDataPlane.storage.toFile().listFiles()).hasSizeGreaterThan(20);
});

controlPlane.providerTerminate(providerProcessId, new DataFlowTerminateMessage("a good reason")).statusCode(200);
var terminateMessage = new DataFlowTerminateMessage(UUID.randomUUID().toString(), "a good reason");
controlPlane.providerTerminate(providerProcessId, terminateMessage).statusCode(200);

consumerDataPlane.assertNoMoreDataIsTransferred();
}
Expand All @@ -126,19 +128,22 @@ void shouldSuspendAndResumeOnProvider() {
assertThat(startResponse.state()).isEqualTo(STARTED.name());
assertThat(startResponse.dataAddress()).isNotNull();

controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(startResponse.dataAddress())).statusCode(200);
var firstStarted = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), startResponse.dataAddress());
controlPlane.consumerStarted(consumerProcessId, firstStarted).statusCode(200);

consumerDataPlane.assertDataIsFlowing();

controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage("a reason")).statusCode(200);
var suspendMessage = new DataFlowSuspendMessage(UUID.randomUUID().toString(), "a reason");
controlPlane.providerSuspend(providerProcessId, suspendMessage).statusCode(200);

consumerDataPlane.assertNoMoreDataIsTransferred();

var resumeMessage = resumeMessage(providerProcessId);
var providerResumeResponse = controlPlane.providerResume(providerProcessId, resumeMessage)
.statusCode(200).extract().as(DataFlowStatusMessage.class);
assertThat(providerResumeResponse.dataAddress()).isNotNull();
controlPlane.consumerStarted(consumerProcessId, new DataFlowStartedNotificationMessage(providerResumeResponse.dataAddress()))
var secondStarted = new DataFlowStartedNotificationMessage(UUID.randomUUID().toString(), providerResumeResponse.dataAddress());
controlPlane.consumerStarted(consumerProcessId, secondStarted)
.statusCode(200);

var consumerResumeResponse = controlPlane.consumerResume(consumerProcessId, resumeMessage(consumerProcessId))
Expand Down Expand Up @@ -252,7 +257,7 @@ private Result<DataFlow> onStart(DataFlow dataFlow) {

flows.put(dataFlow.getId(), flow);

var dataAddress = new DataAddress("FileSystem", "directory", destinationDirectory.toString(), emptyList());
var dataAddress = new DataAddress("FileSystem", destinationDirectory.toString(), emptyList());
dataFlow.setDataAddress(dataAddress);

return Result.success(dataFlow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ void shouldSuspendAndResumeByConsumer() {
assertThat(startResponse.state()).isEqualTo(STARTED.name());
consumerDataPlane.assertDataIsFlowing(consumerProcessId);

controlPlane.consumerSuspend(consumerProcessId, new DataFlowSuspendMessage("a reason"))
controlPlane.consumerSuspend(consumerProcessId, new DataFlowSuspendMessage(UUID.randomUUID().toString(), "a reason"))
.statusCode(200);

controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage("a reason"))
controlPlane.providerSuspend(providerProcessId, new DataFlowSuspendMessage(UUID.randomUUID().toString(), "a reason"))
.statusCode(200);

consumerDataPlane.assertNoMoreDataIsTransferred(consumerProcessId);
Expand Down Expand Up @@ -205,7 +205,7 @@ private static class ConsumerDataPlane {
private Result<DataFlow> onPrepare(DataFlow dataFlow) {
try {
var destinationFolder = Files.createTempDirectory("consumer-dest");
var dataAddress = new DataAddress("FileSystem", "folder", destinationFolder.toString(), emptyList());
var dataAddress = new DataAddress("FileSystem", destinationFolder.toString(), emptyList());

dataFlow.setDataAddress(dataAddress);
destinations.put(dataFlow.getId(), dataAddress);
Expand Down
Loading