diff --git a/build.gradle.kts b/build.gradle.kts index 15b9940..dab8889 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -34,6 +34,9 @@ dependencies { testImplementation("org.mockito:mockito-core:5.23.0") testImplementation("org.slf4j:slf4j-simple:2.0.17") testImplementation("org.wiremock:wiremock-jetty12:3.13.2") + testImplementation("org.testcontainers:testcontainers-junit-jupiter:2.0.5") + testImplementation("org.testcontainers:testcontainers-postgresql:2.0.5") + testImplementation("org.postgresql:postgresql:42.7.11") } tasks.test { diff --git a/src/main/java/org/eclipse/dataplane/Dataplane.java b/src/main/java/org/eclipse/dataplane/Dataplane.java index a027ed8..94a3185 100644 --- a/src/main/java/org/eclipse/dataplane/Dataplane.java +++ b/src/main/java/org/eclipse/dataplane/Dataplane.java @@ -51,6 +51,7 @@ import org.eclipse.dataplane.port.store.DataFlowStore; import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore; import org.eclipse.dataplane.port.store.InMemoryDataFlowStore; +import org.eclipse.dataplane.port.store.Stores; import java.net.URI; import java.net.http.HttpClient; @@ -69,8 +70,8 @@ public class Dataplane { private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); - private final DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper); - private final ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper); + private DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper); + private ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper); private String id; private URI endpoint; private final Set transferTypes = new HashSet<>(); @@ -454,6 +455,12 @@ public Builder label(String label) { return this; } + public Builder stores(Stores stores) { + dataplane.dataFlowStore = stores.dataFlowStore(); + dataplane.controlPlaneStore = stores.controlPlaneStore(); + return this; + } + public Builder onPrepare(OnPrepare onPrepare) { dataplane.onPrepare = onPrepare; return this; diff --git a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java index c5efead..b1b8f36 100644 --- a/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java +++ b/src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java @@ -243,6 +243,16 @@ public Builder callbackAddress(URI callbackAddress) { return this; } + public Builder suspensionReason(String suspensionReason) { + dataFlow.suspensionReason = suspensionReason; + return this; + } + + public Builder terminationReason(String terminationReason) { + dataFlow.terminationReason = terminationReason; + return this; + } + public Builder metadata(Map metadata) { dataFlow.metadata = metadata; return this; diff --git a/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java b/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java new file mode 100644 index 0000000..64156b0 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/exception/PersistenceException.java @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.exception; + +/** + * Indicates an error during database interactions, i.e. an error occurred persisting, reading or + * deleting an entry. + */ +public class PersistenceException extends RuntimeException { + + public PersistenceException(String message) { + super(message); + } + + public PersistenceException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/Stores.java b/src/main/java/org/eclipse/dataplane/port/store/Stores.java new file mode 100644 index 0000000..8bf2693 --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/Stores.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.store; + +/** + * Data class that bundles the stores used by the dataplane. + * + * @param dataFlowStore store for data flows + * @param controlPlaneStore store for control planes + */ +public record Stores(DataFlowStore dataFlowStore, ControlPlaneStore controlPlaneStore) { +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java new file mode 100644 index 0000000..58c8bbc --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/AbstractSqlStore.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.store.sql; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.exception.PersistenceException; + +import java.sql.Connection; +import java.sql.DriverManager; + +/** + * Base class for SQL-based store implementations that provides methods for common functionality + * like connection handling and JSON parsing. + */ +public abstract class AbstractSqlStore { + + protected ObjectMapper objectMapper; + + private final String databaseUrl; + private final String databaseUsername; + private final String databasePassword; + + public AbstractSqlStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) { + this.objectMapper = objectMapper; + this.databaseUrl = databaseUrl; + this.databaseUsername = databaseUsername; + this.databasePassword = databasePassword; + } + + protected Connection getConnection() { + try { + return DriverManager.getConnection(databaseUrl, databaseUsername, databasePassword); + } catch (Exception e) { + throw new PersistenceException("Failed to connect to database.", e); + } + } + + protected void closeConnection(Connection connection) { + try { + connection.close(); + } catch (Exception e) { + throw new PersistenceException("Failed to commit transaction.", e); + } + } + + protected String toJson(Object object) { + if (object == null) { + return null; + } + + try { + return object instanceof String ? object.toString() : objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new PersistenceException("Failed to convert object to JSON.", e); + } + } + + protected T fromJson(String json, Class type) { + return fromJson(json, objectMapper.getTypeFactory().constructType(type)); + } + + protected T fromJson(String json, TypeReference type) { + return fromJson(json, objectMapper.getTypeFactory().constructType(type)); + } + + protected T fromJson(String json, JavaType type) { + if (json == null) { + return null; + } + + try { + return objectMapper.readValue(json, type); + } catch (JsonProcessingException e) { + throw new PersistenceException("Failed to convert JSON to object.", e); + } + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresControlPlaneStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresControlPlaneStore.java new file mode 100644 index 0000000..026bdec --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresControlPlaneStore.java @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.controlplane.ControlPlane; +import org.eclipse.dataplane.domain.registration.AuthorizationProfile; +import org.eclipse.dataplane.port.exception.PersistenceException; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.ControlPlaneStore; + +import java.net.URI; + +import static java.lang.String.format; + +public class PostgresControlPlaneStore extends AbstractSqlStore implements ControlPlaneStore { + + public PostgresControlPlaneStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) { + super(objectMapper, databaseUrl, databaseUsername, databasePassword); + } + + @Override + public Result save(ControlPlane controlPlane) { + var connection = getConnection(); + + try (var statement = connection.prepareStatement(upsertControlPlaneTemplate())) { + statement.setString(1, controlPlane.getId()); + statement.setString(2, controlPlane.getEndpoint().toString()); + statement.setString(3, toJson(controlPlane.getAuthorization())); + + statement.executeUpdate(); + return Result.success(); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to persist ControlPlane with id %s.", controlPlane.getId()), e)); + } finally { + closeConnection(connection); + } + } + + @Override + public Result findById(String controlplaneId) { + var connection = getConnection(); + + try (var statement = connection.prepareStatement(findControlPlaneByIdTemplate())) { + statement.setString(1, controlplaneId); + var resultSet = statement.executeQuery(); + + if (!resultSet.next()) { + return Result.failure(new ResourceNotFoundException(format("ControlPlane with id %s not found.", controlplaneId))); + } + + var controlplane = ControlPlane.newInstance() + .id(controlplaneId) + .endpoint(URI.create(resultSet.getString("endpoint"))) + .authorization(fromJson(resultSet.getString("auth"), AuthorizationProfile.class)) + .build(); + return Result.success(controlplane); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to read ControlPlane with id %s.", controlplaneId), e)); + } finally { + closeConnection(connection); + } + } + + @Override + public Result delete(String id) { + var connection = getConnection(); + + try (var statement = connection.prepareStatement(deleteControlPlaneByIdTemplate())) { + statement.setString(1, id); + var rows = statement.executeUpdate(); + if (rows < 1) { + return Result.failure(new ResourceNotFoundException(format("ControlPlane with id %s not found.", id))); + } + return Result.success(); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to delete ControlPlane with id %s.", id), e)); + } finally { + closeConnection(connection); + } + } + + @Override + public boolean exists(String controlplaneId) { + var connection = getConnection(); + + try (var statement = connection.prepareStatement(countControlPlaneByIdTemplate())) { + statement.setString(1, controlplaneId); + var resultSet = statement.executeQuery(); + resultSet.next(); + return resultSet.getInt(1) > 0; + } catch (Exception e) { + throw new PersistenceException(format("Failed to check for existence of ControlPlane with id %s.", controlplaneId), e); + } finally { + closeConnection(connection); + } + } + + private String upsertControlPlaneTemplate() { + return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" + + " ON CONFLICT (id) DO UPDATE SET" + + " endpoint = EXCLUDED.endpoint," + + " auth = EXCLUDED.auth"; + } + + private String findControlPlaneByIdTemplate() { + return "SELECT * FROM control_planes WHERE id = ?"; + } + + private String deleteControlPlaneByIdTemplate() { + return "DELETE FROM control_planes WHERE id = ?"; + } + + private String countControlPlaneByIdTemplate() { + return "SELECT COUNT(*) FROM control_planes WHERE id = ?"; + } +} diff --git a/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresDataFlowStore.java b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresDataFlowStore.java new file mode 100644 index 0000000..f49b81f --- /dev/null +++ b/src/main/java/org/eclipse/dataplane/port/store/sql/PostgresDataFlowStore.java @@ -0,0 +1,132 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.port.store.sql; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.domain.DataAddress; +import org.eclipse.dataplane.domain.Result; +import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.port.exception.PersistenceException; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.DataFlowStore; + +import java.net.URI; + +import static java.lang.String.format; + +public class PostgresDataFlowStore extends AbstractSqlStore implements DataFlowStore { + + public PostgresDataFlowStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) { + super(objectMapper, databaseUrl, databaseUsername, databasePassword); + } + + @Override + public Result save(DataFlow dataFlow) { + var connection = getConnection(); + + try (var statement = connection.prepareStatement(upsertDataFlowTemplate())) { + statement.setString(1, dataFlow.getId()); + statement.setString(2, dataFlow.getTransferType()); + statement.setString(3, dataFlow.getType().name()); + statement.setString(4, dataFlow.getState().name()); + statement.setString(5, dataFlow.getDatasetId()); + statement.setString(6, dataFlow.getAgreementId()); + statement.setString(7, dataFlow.getParticipantId()); + statement.setString(8, dataFlow.getCounterPartyId()); + statement.setString(9, dataFlow.getDataspaceContext()); + statement.setString(10, dataFlow.getCallbackAddress().toString()); + statement.setString(11, dataFlow.getSuspensionReason()); + statement.setString(12, dataFlow.getTerminationReason()); + statement.setString(13, toJson(dataFlow.getLabels())); + statement.setString(14, toJson(dataFlow.getMetadata())); + statement.setString(15, toJson(dataFlow.getDataAddress())); + statement.setString(16, dataFlow.getControlplaneId()); + + statement.executeUpdate(); + return Result.success(); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to persist DataFlow with id %s.", dataFlow.getId()), e)); + } finally { + closeConnection(connection); + } + } + + @Override + public Result findById(String flowId) { + var connection = getConnection(); + + try (var statement = connection.prepareStatement(findDataFlowByIdTemplate())) { + statement.setString(1, flowId); + var resultSet = statement.executeQuery(); + + if (!resultSet.next()) { + return Result.failure(new ResourceNotFoundException(format("DataFlow with id %s not found.", flowId))); + } + + var dataFlow = DataFlow.newInstance() + .id(flowId) + .state(DataFlow.State.valueOf(resultSet.getString("state"))) + .transferType(resultSet.getString("transfer_type")) + .datasetId(resultSet.getString("dataset_id")) + .agreementId(resultSet.getString("agreement_id")) + .participantId(resultSet.getString("participant_id")) + .counterPartyId(resultSet.getString("counter_party_id")) + .dataspaceContext(resultSet.getString("dataspace_context")) + .callbackAddress(URI.create(resultSet.getString("callback_address"))) + .suspensionReason(resultSet.getString("suspension_reason")) + .terminationReason(resultSet.getString("termination_reason")) + .labels(fromJson(resultSet.getString("labels"), new TypeReference<>() {})) + .metadata(fromJson(resultSet.getString("metadata"), new TypeReference<>() {})) + .dataAddress(fromJson(resultSet.getString("data_address"), DataAddress.class)) + .controlplaneId(resultSet.getString("controlplane_id")) + .type(DataFlow.Type.valueOf(resultSet.getString("type"))) + .build(); + + return Result.success(dataFlow); + } catch (Exception e) { + return Result.failure(new PersistenceException(format("Failed to read DataFlow with id %s.", flowId), e)); + } finally { + closeConnection(connection); + } + } + + private String upsertDataFlowTemplate() { + return "INSERT INTO data_flows (id, transfer_type, type, state, dataset_id, agreement_id, participant_id," + + " counter_party_id, dataspace_context, callback_address, suspension_reason, termination_reason," + + " labels, metadata, data_address, controlplane_id) VALUES" + + " (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?::json, ?::json, ?::json, ?)" + + " ON CONFLICT (id) DO UPDATE SET" + + " transfer_type = EXCLUDED.transfer_type," + + " type = EXCLUDED.type," + + " state = EXCLUDED.state," + + " dataset_id = EXCLUDED.dataset_id," + + " agreement_id = EXCLUDED.agreement_id," + + " participant_id = EXCLUDED.participant_id," + + " counter_party_id = EXCLUDED.counter_party_id," + + " dataspace_context = EXCLUDED.dataspace_context," + + " callback_address = EXCLUDED.callback_address," + + " suspension_reason = EXCLUDED.suspension_reason," + + " termination_reason = EXCLUDED.termination_reason," + + " labels = EXCLUDED.labels," + + " metadata = EXCLUDED.metadata," + + " data_address = EXCLUDED.data_address," + + " controlplane_id = EXCLUDED.controlplane_id"; + } + + private String findDataFlowByIdTemplate() { + return "SELECT * FROM data_flows WHERE id = ?"; + } +} diff --git a/src/main/resources/sql/control_plane_schema.sql b/src/main/resources/sql/control_plane_schema.sql new file mode 100644 index 0000000..60ef4d1 --- /dev/null +++ b/src/main/resources/sql/control_plane_schema.sql @@ -0,0 +1,23 @@ +-- +-- Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial script +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + +CREATE TABLE IF NOT EXISTS control_planes +( + id VARCHAR PRIMARY KEY, + endpoint VARCHAR, + auth JSON +); + +COMMENT ON COLUMN control_planes.auth IS 'Authorization profile serialized as JSON'; diff --git a/src/main/resources/sql/data_flow_schema.sql b/src/main/resources/sql/data_flow_schema.sql new file mode 100644 index 0000000..30469bb --- /dev/null +++ b/src/main/resources/sql/data_flow_schema.sql @@ -0,0 +1,38 @@ +-- +-- Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial script +-- + +-- THIS SCHEMA HAS BEEN WRITTEN AND TESTED ONLY FOR POSTGRES + +CREATE TABLE IF NOT EXISTS data_flows +( + id VARCHAR PRIMARY KEY, + transfer_type VARCHAR, + type VARCHAR, + state VARCHAR NOT NULL, + dataset_id VARCHAR, + agreement_id VARCHAR, + participant_id VARCHAR, + counter_party_id VARCHAR, + dataspace_context VARCHAR, + callback_address VARCHAR, + suspension_reason VARCHAR, + termination_reason VARCHAR, + labels JSON, + metadata JSON, + data_address JSON, + controlplane_id VARCHAR +); + +COMMENT ON COLUMN data_flows.labels IS 'List of labels serialized as JSON'; +COMMENT ON COLUMN data_flows.metadata IS 'Metadata serialized as JSON'; +COMMENT ON COLUMN data_flows.data_address IS 'Data address serialized as JSON'; diff --git a/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java new file mode 100644 index 0000000..94e8d57 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/ControlPlaneStoreTestBase.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.store; + +import org.eclipse.dataplane.domain.controlplane.ControlPlane; +import org.eclipse.dataplane.domain.registration.AuthorizationProfile; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.ControlPlaneStore; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.net.URI; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class ControlPlaneStoreTestBase { + + @Nested + class Save { + @Test + void save_newControlPlane_shouldCreate() { + var id = "id"; + var controlPlane = controlPlane(id); + + var result = store().save(controlPlane); + assertThat(result.succeeded()).isTrue(); + + var persisted = store().findById(id).getContent(); + assertThat(persisted).isNotNull(); + assertThat(persisted).usingRecursiveComparison().isEqualTo(controlPlane); + } + + @Test + void save_existingControlPlane_shouldUpdate() { + var id = "toUpdate"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var newControlPlane = ControlPlane.newInstance() + .id(id) + .endpoint(URI.create("http://some-other-endpoint")) + .authorization(controlPlane.getAuthorization()) + .build(); + store().save(newControlPlane); + + var updated = store().findById(id).getContent(); + assertThat(updated.getEndpoint()) + .isEqualTo(newControlPlane.getEndpoint()) + .isNotEqualTo(controlPlane.getEndpoint()); + assertThat(updated.getAuthorization()).usingRecursiveComparison() + .isEqualTo(controlPlane.getAuthorization()) + .isEqualTo(newControlPlane.getAuthorization()); + } + } + + @Nested + class FindById { + @Test + void findById_exists_shouldReturnControlPlane() { + var id = "id"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var result = store().findById(id); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent()).isNotNull(); + } + + @Test + void findById_doesNotExist_shouldReturnNotFound() { + var result = store().findById("nonExistent"); + + assertThat(result.failed()).isTrue(); + assertThat(result.getException()).isInstanceOf(ResourceNotFoundException.class); + } + } + + @Nested + class Delete { + @Test + void delete_exists_shouldDelete() { + var id = "id"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var deleteResult = store().delete(id); + assertThat(deleteResult.succeeded()).isTrue(); + + var findResult = store().findById(id); + assertThat(findResult.failed()).isTrue(); + assertThat(findResult.getException()).isInstanceOf(ResourceNotFoundException.class); + } + + @Test + void delete_doesNotExist_shouldReturnNotFound() { + var deleteResult = store().delete("nonExistent"); + + assertThat(deleteResult.failed()).isTrue(); + assertThat(deleteResult.getException()).isInstanceOf(ResourceNotFoundException.class); + } + } + + @Nested + class Exists { + @Test + void exists_exists_shouldReturnTrue() { + var id = "id"; + var controlPlane = controlPlane(id); + store().save(controlPlane); + + var exists = store().exists(id); + + assertThat(exists).isTrue(); + } + + @Test + void exists_doesNotExists_shouldReturnFalse() { + var exists = store().exists("nonExistent"); + + assertThat(exists).isFalse(); + } + } + + protected abstract ControlPlaneStore store(); + + private ControlPlane controlPlane(String id) { + return ControlPlane.newInstance() + .id(id) + .endpoint(URI.create("https://controlplane")) + .authorization(new AuthorizationProfile("token") + .withAttribute("Authorization", "authToken")) + .build(); + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java b/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java new file mode 100644 index 0000000..99650ec --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/DataFlowStoreTestBase.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.store; + +import org.eclipse.dataplane.domain.DataAddress; +import org.eclipse.dataplane.domain.dataflow.DataFlow; +import org.eclipse.dataplane.port.exception.ResourceNotFoundException; +import org.eclipse.dataplane.port.store.DataFlowStore; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +public abstract class DataFlowStoreTestBase { + + @Nested + class Save { + @Test + void save_newDataFlow_shouldCreate() { + var id = "id"; + var dataFlow = dataFlow(id); + + var result = store().save(dataFlow); + assertThat(result.succeeded()).isTrue(); + + var persisted = store().findById(id).getContent(); + assertThat(persisted).isNotNull(); + assertThat(persisted).usingRecursiveComparison().isEqualTo(dataFlow); + } + + @Test + void save_existingDataFlow_shouldUpdate() { + var id = "toUpdate"; + var dataFlow = dataFlow(id); + store().save(dataFlow); + var persisted = store().findById(id).getContent(); + assertThat(persisted.getState()).isEqualTo(DataFlow.State.INITIATING); + assertThat(persisted.getSuspensionReason()).isNull(); + + var suspensionReason = "suspend"; + dataFlow.transitionToSuspended(suspensionReason); + store().save(dataFlow); + + var updated = store().findById(id).getContent(); + assertThat(updated.getState()).isEqualTo(DataFlow.State.SUSPENDED); + assertThat(updated.getSuspensionReason()).isNotNull().isEqualTo(suspensionReason); + } + } + + @Nested + class FindById { + @Test + void findById_exists_shouldReturnDataFlow() { + var id = "id"; + var dataFlow = dataFlow(id); + store().save(dataFlow); + + var result = store().findById(id); + + assertThat(result.succeeded()).isTrue(); + assertThat(result.getContent()).isNotNull(); + } + + @Test + void findById_doesNotExist_shouldReturnNotFound() { + var result = store().findById("nonExistent"); + + assertThat(result.failed()).isTrue(); + assertThat(result.getException()).isInstanceOf(ResourceNotFoundException.class); + } + } + + protected abstract DataFlowStore store(); + + private DataFlow dataFlow(String id) { + return DataFlow.newInstance() + .id(id) + .state(DataFlow.State.INITIATING) + .transferType("HTTP-PUSH") + .datasetId("dataset") + .agreementId("agreement") + .participantId("participant") + .counterPartyId("counterParty") + .dataspaceContext("dataspaceContext") + .callbackAddress(URI.create("https://callbackAddress")) + .labels(List.of("label1", "label2")) + .metadata(Map.of("key1", "value1", "key2", "value2")) + .dataAddress(new DataAddress("http", "https://endpoint", List.of())) + .controlplaneId("controlPlane") + .type(DataFlow.Type.PROVIDER) + .build(); + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java new file mode 100644 index 0000000..3809c56 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/InMemoryControlPlaneStoreTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.ControlPlaneStore; +import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +class InMemoryControlPlaneStoreTest extends ControlPlaneStoreTestBase { + + private InMemoryControlPlaneStore store = new InMemoryControlPlaneStore(new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)); + + @Override + protected ControlPlaneStore store() { + return store; + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java new file mode 100644 index 0000000..e88c103 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/InMemoryDataFlowStoreTest.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.DataFlowStore; +import org.eclipse.dataplane.port.store.InMemoryDataFlowStore; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +class InMemoryDataFlowStoreTest extends DataFlowStoreTestBase { + + private InMemoryDataFlowStore store = new InMemoryDataFlowStore(new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false)); + + @Override + protected DataFlowStore store() { + return store; + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java new file mode 100644 index 0000000..a7ef3c0 --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresControlPlaneStoreTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.ControlPlaneStore; +import org.eclipse.dataplane.port.store.sql.PostgresControlPlaneStore; +import org.eclipse.dataplane.store.ControlPlaneStoreTestBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.postgresql.PostgreSQLContainer; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +@Testcontainers +class PostgresControlPlaneStoreTest extends ControlPlaneStoreTestBase { + + private static final String POSTGRES_IMAGE = "postgres:18.3"; + private static final String DATABASE = "dataplane"; + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + + private final ObjectMapper mapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + private PostgresControlPlaneStore store; + + @Container + static PostgreSQLContainer postgres = new PostgreSQLContainer(POSTGRES_IMAGE) + .withDatabaseName(DATABASE) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withInitScript("sql/control_plane_schema.sql"); + + @BeforeAll + static void init() { + postgres.start(); + } + + @AfterAll + static void cleanUp() { + postgres.stop(); + postgres.close(); + } + + @BeforeEach + void initStore() { + store = new PostgresControlPlaneStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); + } + + @Override + protected ControlPlaneStore store() { + return store; + } +} diff --git a/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java new file mode 100644 index 0000000..e5c060d --- /dev/null +++ b/src/test/java/org/eclipse/dataplane/store/sql/PostgresDataFlowStoreTest.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation + * + */ + +package org.eclipse.dataplane.store.sql; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.dataplane.port.store.DataFlowStore; +import org.eclipse.dataplane.port.store.sql.PostgresDataFlowStore; +import org.eclipse.dataplane.store.DataFlowStoreTestBase; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.postgresql.PostgreSQLContainer; + +import static com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES; + +@Testcontainers +class PostgresDataFlowStoreTest extends DataFlowStoreTestBase { + + private static final String POSTGRES_IMAGE = "postgres:18.3"; + private static final String DATABASE = "dataplane"; + private static final String USERNAME = "user"; + private static final String PASSWORD = "password"; + + private final ObjectMapper mapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false); + private PostgresDataFlowStore store; + + @Container + static PostgreSQLContainer postgres = new PostgreSQLContainer(POSTGRES_IMAGE) + .withDatabaseName(DATABASE) + .withUsername(USERNAME) + .withPassword(PASSWORD) + .withInitScript("sql/data_flow_schema.sql"); + + @BeforeAll + static void init() { + postgres.start(); + } + + @AfterAll + static void cleanUp() { + postgres.stop(); + postgres.close(); + } + + @BeforeEach + void initStore() { + store = new PostgresDataFlowStore(mapper, postgres.getJdbcUrl(), postgres.getUsername(), postgres.getPassword()); + } + + @Override + protected DataFlowStore store() { + return store; + } +}