Skip to content

Commit 3155daa

Browse files
authored
[FLINK-38263][table] Add SecretStore related interfaces
This closes #27394.
1 parent 4e088de commit 3155daa

18 files changed

Lines changed: 1025 additions & 4 deletions

File tree

flink-python/pyflink/table/tests/test_environment_settings_completeness.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ def java_class(cls):
3838
def excluded_methods(cls):
3939
# internal interfaces, no need to expose to users.
4040
return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore',
41-
'toConfiguration', 'fromConfiguration', 'getSqlFactory'}
41+
'getSecretStore', 'toConfiguration', 'fromConfiguration', 'getSqlFactory'}
4242

4343

4444
class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
@@ -59,7 +59,7 @@ def java_class(cls):
5959
def excluded_methods(cls):
6060
# internal interfaces, no need to expose to users.
6161
# withSqlFactory - needs to be implemented
62-
return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'}
62+
return {'withClassLoader', 'withCatalogStore', 'withSecretStore', 'withSqlFactory'}
6363

6464
if __name__ == '__main__':
6565
import unittest

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.table.catalog.CatalogStore;
2626
import org.apache.flink.table.expressions.SqlFactory;
2727
import org.apache.flink.table.functions.UserDefinedFunction;
28+
import org.apache.flink.table.secret.SecretStore;
2829

2930
import javax.annotation.Nullable;
3031

@@ -66,16 +67,19 @@ public class EnvironmentSettings {
6667

6768
private final @Nullable CatalogStore catalogStore;
6869
private final @Nullable SqlFactory sqlFactory;
70+
private final @Nullable SecretStore secretStore;
6971

7072
private EnvironmentSettings(
7173
Configuration configuration,
7274
ClassLoader classLoader,
7375
CatalogStore catalogStore,
74-
SqlFactory sqlFactory) {
76+
SqlFactory sqlFactory,
77+
SecretStore secretStore) {
7578
this.configuration = configuration;
7679
this.classLoader = classLoader;
7780
this.catalogStore = catalogStore;
7881
this.sqlFactory = sqlFactory;
82+
this.secretStore = secretStore;
7983
}
8084

8185
/**
@@ -153,6 +157,11 @@ public Optional<SqlFactory> getSqlFactory() {
153157
return Optional.ofNullable(sqlFactory);
154158
}
155159

160+
@Internal
161+
public Optional<SecretStore> getSecretStore() {
162+
return Optional.ofNullable(secretStore);
163+
}
164+
156165
/** A builder for {@link EnvironmentSettings}. */
157166
@PublicEvolving
158167
public static class Builder {
@@ -162,6 +171,7 @@ public static class Builder {
162171

163172
private @Nullable CatalogStore catalogStore;
164173
private @Nullable SqlFactory sqlFactory;
174+
private @Nullable SecretStore secretStore;
165175

166176
public Builder() {}
167177

@@ -250,12 +260,28 @@ public Builder withSqlFactory(SqlFactory sqlFactory) {
250260
return this;
251261
}
252262

263+
/**
264+
* Specifies the {@link SecretStore} to be used for managing secrets in the {@link
265+
* TableEnvironment}.
266+
*
267+
* <p>The secret store allows for secure storage and retrieval of sensitive configuration
268+
* data such as credentials, tokens, and passwords.
269+
*
270+
* @param secretStore the secret store instance to use
271+
* @return this builder
272+
*/
273+
public Builder withSecretStore(SecretStore secretStore) {
274+
this.secretStore = secretStore;
275+
return this;
276+
}
277+
253278
/** Returns an immutable instance of {@link EnvironmentSettings}. */
254279
public EnvironmentSettings build() {
255280
if (classLoader == null) {
256281
classLoader = Thread.currentThread().getContextClassLoader();
257282
}
258-
return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory);
283+
return new EnvironmentSettings(
284+
configuration, classLoader, catalogStore, sqlFactory, secretStore);
259285
}
260286
}
261287
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@
120120
import org.apache.flink.table.resource.ResourceManager;
121121
import org.apache.flink.table.resource.ResourceType;
122122
import org.apache.flink.table.resource.ResourceUri;
123+
import org.apache.flink.table.secret.SecretStore;
124+
import org.apache.flink.table.secret.SecretStoreFactory;
123125
import org.apache.flink.table.types.AbstractDataType;
124126
import org.apache.flink.table.types.DataType;
125127
import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -258,6 +260,13 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
258260
final CatalogStore catalogStore = catalogStoreResult.getCatalogStore();
259261
final CatalogStoreFactory catalogStoreFactory = catalogStoreResult.getCatalogStoreFactory();
260262

263+
final ApiFactoryUtil.SecretStoreResult secretStoreResult =
264+
ApiFactoryUtil.getOrCreateSecretStore(
265+
settings.getSecretStore(), settings.getConfiguration(), userClassLoader);
266+
final SecretStore secretStore = secretStoreResult.getSecretStore();
267+
final SecretStoreFactory secretStoreFactory = secretStoreResult.getSecretStoreFactory();
268+
// TODO (FLINK-38261): pass secret store to catalog manager for encryption/decryption
269+
261270
// use configuration to init table config
262271
final TableConfig tableConfig = TableConfig.getDefault();
263272
tableConfig.setRootConfiguration(executor.getConfiguration());

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919
package org.apache.flink.table.factories;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.annotation.VisibleForTesting;
2223
import org.apache.flink.configuration.Configuration;
2324
import org.apache.flink.configuration.DelegatingConfiguration;
2425
import org.apache.flink.table.catalog.CatalogStore;
2526
import org.apache.flink.table.catalog.CommonCatalogOptions;
27+
import org.apache.flink.table.secret.CommonSecretOptions;
28+
import org.apache.flink.table.secret.SecretStore;
29+
import org.apache.flink.table.secret.SecretStoreFactory;
2630

2731
import javax.annotation.Nullable;
2832

@@ -127,4 +131,101 @@ public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
127131

128132
return context;
129133
}
134+
135+
/** Result holder for secret store and factory. */
136+
@Internal
137+
public static class SecretStoreResult {
138+
private final SecretStore secretStore;
139+
@Nullable private final SecretStoreFactory secretStoreFactory;
140+
141+
public SecretStoreResult(
142+
SecretStore secretStore, @Nullable SecretStoreFactory secretStoreFactory) {
143+
this.secretStore = secretStore;
144+
this.secretStoreFactory = secretStoreFactory;
145+
}
146+
147+
public SecretStore getSecretStore() {
148+
return secretStore;
149+
}
150+
151+
@Nullable
152+
public SecretStoreFactory getSecretStoreFactory() {
153+
return secretStoreFactory;
154+
}
155+
}
156+
157+
/**
158+
* Gets or creates a {@link SecretStore}. If a secret store is provided in settings, it will be
159+
* used directly. Otherwise, a new secret store will be created using the factory.
160+
*
161+
* @param providedSecretStore the secret store from settings, if present
162+
* @param configuration the configuration
163+
* @param classLoader the user classloader
164+
* @return a result containing the secret store and factory (factory is null if store was
165+
* provided)
166+
*/
167+
public static SecretStoreResult getOrCreateSecretStore(
168+
Optional<SecretStore> providedSecretStore,
169+
Configuration configuration,
170+
ClassLoader classLoader) {
171+
if (providedSecretStore.isPresent()) {
172+
return new SecretStoreResult(providedSecretStore.get(), null);
173+
} else {
174+
SecretStoreFactory secretStoreFactory =
175+
findAndCreateSecretStoreFactory(configuration, classLoader);
176+
SecretStoreFactory.Context secretStoreFactoryContext =
177+
buildSecretStoreFactoryContext(configuration, classLoader);
178+
secretStoreFactory.open(secretStoreFactoryContext);
179+
SecretStore secretStore = secretStoreFactory.createSecretStore();
180+
return new SecretStoreResult(secretStore, secretStoreFactory);
181+
}
182+
}
183+
184+
/**
185+
* Finds and creates a {@link SecretStoreFactory} using the provided {@link Configuration} and
186+
* user classloader.
187+
*
188+
* <p>The configuration format should be as follows:
189+
*
190+
* <pre>{@code
191+
* table.secret-store.kind: {identifier}
192+
* table.secret-store.{identifier}.{param1}: xxx
193+
* table.secret-store.{identifier}.{param2}: xxx
194+
* }</pre>
195+
*/
196+
@VisibleForTesting
197+
static SecretStoreFactory findAndCreateSecretStoreFactory(
198+
Configuration configuration, ClassLoader classLoader) {
199+
String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
200+
201+
SecretStoreFactory secretStoreFactory =
202+
FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier);
203+
204+
return secretStoreFactory;
205+
}
206+
207+
/**
208+
* Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}.
209+
*
210+
* <p>The configuration format should be as follows:
211+
*
212+
* <pre>{@code
213+
* table.secret-store.kind: {identifier}
214+
* table.secret-store.{identifier}.{param1}: xxx
215+
* table.secret-store.{identifier}.{param2}: xxx
216+
* }</pre>
217+
*/
218+
@VisibleForTesting
219+
static SecretStoreFactory.Context buildSecretStoreFactoryContext(
220+
Configuration configuration, ClassLoader classLoader) {
221+
String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
222+
String secretStoreOptionPrefix =
223+
CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + ".";
224+
Map<String, String> options =
225+
new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap();
226+
SecretStoreFactory.Context context =
227+
new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader);
228+
229+
return context;
230+
}
130231
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.secret;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.secret.exceptions.SecretNotFoundException;
23+
24+
import java.util.Collections;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.UUID;
28+
29+
import static org.apache.flink.util.Preconditions.checkNotNull;
30+
31+
/**
32+
* A generic in-memory implementation of both {@link ReadableSecretStore} and {@link
33+
* WritableSecretStore}.
34+
*
35+
* <p>This implementation stores secrets in memory as immutable Map objects. It is suitable for
36+
* testing and development purposes but should not be used in production environments as secrets are
37+
* not encrypted.
38+
*/
39+
@Internal
40+
public class GenericInMemorySecretStore implements ReadableSecretStore, WritableSecretStore {
41+
42+
private final Map<String, Map<String, String>> secrets;
43+
44+
public GenericInMemorySecretStore() {
45+
this.secrets = new HashMap<>();
46+
}
47+
48+
@Override
49+
public Map<String, String> getSecret(String secretId) throws SecretNotFoundException {
50+
checkNotNull(secretId, "Secret ID cannot be null");
51+
52+
Map<String, String> secretData = secrets.get(secretId);
53+
if (secretData == null) {
54+
throw new SecretNotFoundException(
55+
String.format("Secret with ID '%s' not found", secretId));
56+
}
57+
58+
return secretData;
59+
}
60+
61+
@Override
62+
public String storeSecret(Map<String, String> secretData) {
63+
checkNotNull(secretData, "Secret data cannot be null");
64+
65+
String secretId = UUID.randomUUID().toString();
66+
secrets.put(secretId, Collections.unmodifiableMap(new HashMap<>(secretData)));
67+
return secretId;
68+
}
69+
70+
@Override
71+
public void removeSecret(String secretId) {
72+
checkNotNull(secretId, "Secret ID cannot be null");
73+
secrets.remove(secretId);
74+
}
75+
76+
@Override
77+
public void updateSecret(String secretId, Map<String, String> newSecretData)
78+
throws SecretNotFoundException {
79+
checkNotNull(secretId, "Secret ID cannot be null");
80+
checkNotNull(newSecretData, "New secret data cannot be null");
81+
82+
if (!secrets.containsKey(secretId)) {
83+
throw new SecretNotFoundException(
84+
String.format("Secret with ID '%s' not found", secretId));
85+
}
86+
87+
secrets.put(secretId, Collections.unmodifiableMap(new HashMap<>(newSecretData)));
88+
}
89+
90+
/** Clears all secrets from the store (for testing purposes). */
91+
void clear() {
92+
secrets.clear();
93+
}
94+
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.secret;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.ConfigOption;
23+
import org.apache.flink.table.catalog.exceptions.CatalogException;
24+
25+
import java.util.Set;
26+
27+
/**
28+
* Factory for creating {@link GenericInMemorySecretStore} instances.
29+
*
30+
* <p>This factory creates in-memory secret stores that are suitable for testing and development
31+
* purposes. Secrets are stored in plaintext JSON format in memory and are not persisted.
32+
*/
33+
@Internal
34+
public class GenericInMemorySecretStoreFactory implements SecretStoreFactory {
35+
36+
public static final String IDENTIFIER = "generic_in_memory";
37+
38+
@Override
39+
public String factoryIdentifier() {
40+
return IDENTIFIER;
41+
}
42+
43+
@Override
44+
public Set<ConfigOption<?>> requiredOptions() {
45+
return Set.of();
46+
}
47+
48+
@Override
49+
public Set<ConfigOption<?>> optionalOptions() {
50+
return Set.of();
51+
}
52+
53+
@Override
54+
public SecretStore createSecretStore() {
55+
return new GenericInMemorySecretStore();
56+
}
57+
58+
@Override
59+
public void open(Context context) {}
60+
61+
@Override
62+
public void close() throws CatalogException {}
63+
}

0 commit comments

Comments
 (0)