Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ dependencies {
testImplementation("org.mockito:mockito-core:5.23.0")
testImplementation("org.slf4j:slf4j-simple:2.0.17")
testImplementation("org.wiremock:wiremock-jetty12:3.13.2")
testImplementation("org.testcontainers:testcontainers-junit-jupiter:2.0.5")
testImplementation("org.testcontainers:testcontainers-postgresql:2.0.5")
testImplementation("org.postgresql:postgresql:42.7.11")
}

tasks.test {
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/org/eclipse/dataplane/Dataplane.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.eclipse.dataplane.port.store.DataFlowStore;
import org.eclipse.dataplane.port.store.InMemoryControlPlaneStore;
import org.eclipse.dataplane.port.store.InMemoryDataFlowStore;
import org.eclipse.dataplane.port.store.Stores;

import java.net.URI;
import java.net.http.HttpClient;
Expand All @@ -69,8 +70,8 @@
public class Dataplane {

private final ObjectMapper objectMapper = new ObjectMapper().configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
private final DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper);
private final ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper);
private DataFlowStore dataFlowStore = new InMemoryDataFlowStore(objectMapper);
private ControlPlaneStore controlPlaneStore = new InMemoryControlPlaneStore(objectMapper);
private String id;
private URI endpoint;
private final Set<String> transferTypes = new HashSet<>();
Expand Down Expand Up @@ -454,6 +455,12 @@ public Builder label(String label) {
return this;
}

public Builder stores(Stores stores) {
dataplane.dataFlowStore = stores.dataFlowStore();
dataplane.controlPlaneStore = stores.controlPlaneStore();
return this;
}

public Builder onPrepare(OnPrepare onPrepare) {
dataplane.onPrepare = onPrepare;
return this;
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/org/eclipse/dataplane/domain/dataflow/DataFlow.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,16 @@ public Builder callbackAddress(URI callbackAddress) {
return this;
}

public Builder suspensionReason(String suspensionReason) {
dataFlow.suspensionReason = suspensionReason;
return this;
}

public Builder terminationReason(String terminationReason) {
dataFlow.terminationReason = terminationReason;
return this;
}

public Builder metadata(Map<String, Object> metadata) {
dataFlow.metadata = metadata;
return this;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
*
*/

package org.eclipse.dataplane.port.exception;

/**
* Indicates an error during database interactions, i.e. an error occurred persisting, reading or
* deleting an entry.
*/
public class PersistenceException extends RuntimeException {

public PersistenceException(String message) {
super(message);
}

public PersistenceException(String message, Throwable cause) {
super(message, cause);
}
}
24 changes: 24 additions & 0 deletions src/main/java/org/eclipse/dataplane/port/store/Stores.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
*
*/

package org.eclipse.dataplane.port.store;

/**
* Data class that bundles the stores used by the dataplane.
*
* @param dataFlowStore store for data flows
* @param controlPlaneStore store for control planes
*/
public record Stores(DataFlowStore dataFlowStore, ControlPlaneStore controlPlaneStore) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
*
*/

package org.eclipse.dataplane.port.store.sql;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.dataplane.port.exception.PersistenceException;

import java.sql.Connection;
import java.sql.DriverManager;

/**
* Base class for SQL-based store implementations that provides methods for common functionality
* like connection handling and JSON parsing.
*/
public abstract class AbstractSqlStore {

protected ObjectMapper objectMapper;

private final String databaseUrl;
private final String databaseUsername;
private final String databasePassword;

public AbstractSqlStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) {
this.objectMapper = objectMapper;
this.databaseUrl = databaseUrl;
this.databaseUsername = databaseUsername;
this.databasePassword = databasePassword;
}

protected Connection getConnection() {
try {
return DriverManager.getConnection(databaseUrl, databaseUsername, databasePassword);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this makes me think that we should create a refactor issue to permit to setup an external connection pool if needed (very likely in any production environment)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, was thinking about that as well. Will create a follow-up issue once this PR is merged.

} catch (Exception e) {
throw new PersistenceException("Failed to connect to database.", e);
}
}

protected void closeConnection(Connection connection) {
try {
connection.close();
} catch (Exception e) {
throw new PersistenceException("Failed to commit transaction.", e);
}
}

protected String toJson(Object object) {
if (object == null) {
return null;
}

try {
return object instanceof String ? object.toString() : objectMapper.writeValueAsString(object);
} catch (JsonProcessingException e) {
throw new PersistenceException("Failed to convert object to JSON.", e);
}
}

protected <T> T fromJson(String json, Class<T> type) {
return fromJson(json, objectMapper.getTypeFactory().constructType(type));
}

protected <T> T fromJson(String json, TypeReference<T> type) {
return fromJson(json, objectMapper.getTypeFactory().constructType(type));
}

protected <T> T fromJson(String json, JavaType type) {
if (json == null) {
return null;
}

try {
return objectMapper.readValue(json, type);
} catch (JsonProcessingException e) {
throw new PersistenceException("Failed to convert JSON to object.", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright (c) 2026 Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. - initial API and implementation
*
*/

package org.eclipse.dataplane.port.store.sql;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.eclipse.dataplane.domain.Result;
import org.eclipse.dataplane.domain.controlplane.ControlPlane;
import org.eclipse.dataplane.domain.registration.AuthorizationProfile;
import org.eclipse.dataplane.port.exception.PersistenceException;
import org.eclipse.dataplane.port.exception.ResourceNotFoundException;
import org.eclipse.dataplane.port.store.ControlPlaneStore;

import java.net.URI;

import static java.lang.String.format;

public class PostgresControlPlaneStore extends AbstractSqlStore implements ControlPlaneStore {

public PostgresControlPlaneStore(ObjectMapper objectMapper, String databaseUrl, String databaseUsername, String databasePassword) {
super(objectMapper, databaseUrl, databaseUsername, databasePassword);
}

@Override
public Result<Void> save(ControlPlane controlPlane) {
var connection = getConnection();

try (var statement = connection.prepareStatement(upsertControlPlaneTemplate())) {
statement.setString(1, controlPlane.getId());
statement.setString(2, controlPlane.getEndpoint().toString());
statement.setString(3, toJson(controlPlane.getAuthorization()));

statement.executeUpdate();
return Result.success();
} catch (Exception e) {
return Result.failure(new PersistenceException(format("Failed to persist ControlPlane with id %s.", controlPlane.getId()), e));
} finally {
closeConnection(connection);
}
}

@Override
public Result<ControlPlane> findById(String controlplaneId) {
var connection = getConnection();

try (var statement = connection.prepareStatement(findControlPlaneByIdTemplate())) {
statement.setString(1, controlplaneId);
var resultSet = statement.executeQuery();

if (!resultSet.next()) {
return Result.failure(new ResourceNotFoundException(format("ControlPlane with id %s not found.", controlplaneId)));
}

var controlplane = ControlPlane.newInstance()
.id(controlplaneId)
.endpoint(URI.create(resultSet.getString("endpoint")))
.authorization(fromJson(resultSet.getString("auth"), AuthorizationProfile.class))
.build();
return Result.success(controlplane);
} catch (Exception e) {
return Result.failure(new PersistenceException(format("Failed to read ControlPlane with id %s.", controlplaneId), e));
} finally {
closeConnection(connection);
}
}

@Override
public Result<Void> delete(String id) {
var connection = getConnection();

try (var statement = connection.prepareStatement(deleteControlPlaneByIdTemplate())) {
statement.setString(1, id);
var rows = statement.executeUpdate();
if (rows < 1) {
return Result.failure(new ResourceNotFoundException(format("ControlPlane with id %s not found.", id)));
}
return Result.success();
} catch (Exception e) {
return Result.failure(new PersistenceException(format("Failed to delete ControlPlane with id %s.", id), e));
} finally {
closeConnection(connection);
}
}

@Override
public boolean exists(String controlplaneId) {
var connection = getConnection();

try (var statement = connection.prepareStatement(countControlPlaneByIdTemplate())) {
statement.setString(1, controlplaneId);
var resultSet = statement.executeQuery();
resultSet.next();
return resultSet.getInt(1) > 0;
} catch (Exception e) {
throw new PersistenceException(format("Failed to check for existence of ControlPlane with id %s.", controlplaneId), e);
} finally {
closeConnection(connection);
}
}

private String upsertControlPlaneTemplate() {
return "INSERT INTO control_planes (id, endpoint, auth) VALUES (?, ?, ?::json)" +
" ON CONFLICT (id) DO UPDATE SET" +
" endpoint = EXCLUDED.endpoint," +
" auth = EXCLUDED.auth";
}

private String findControlPlaneByIdTemplate() {
return "SELECT * FROM control_planes WHERE id = ?";
}

private String deleteControlPlaneByIdTemplate() {
return "DELETE FROM control_planes WHERE id = ?";
}

private String countControlPlaneByIdTemplate() {
return "SELECT COUNT(*) FROM control_planes WHERE id = ?";
}
}
Loading