Skip to content

Commit 21e6d3f

Browse files
authored
Pipe: add mark-as-general-write-request parameter in pipe to force forwarding event (#15572)
1 parent cf75b60 commit 21e6d3f

2 files changed

Lines changed: 29 additions & 3 deletions

File tree

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeConnectorConstant.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,12 @@ public class PipeConnectorConstant {
255255
public static final String SINK_MARK_AS_PIPE_REQUEST_KEY = "sink.mark-as-pipe-request";
256256
public static final boolean CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE = true;
257257

258+
public static final String CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY =
259+
"connector.mark-as-general-write-request";
260+
public static final String SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY =
261+
"sink.mark-as-general-write-request";
262+
public static final boolean CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE = false;
263+
258264
public static final String CONNECTOR_SKIP_IF_KEY = "connector.skipif";
259265
public static final String SINK_SKIP_IF_KEY = "sink.skipif";
260266
public static final String CONNECTOR_IOTDB_SKIP_IF_NO_PRIVILEGES = "no-privileges";

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/protocol/IoTDBConnector.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@
103103
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_STRATEGY_SYNC_VALUE;
104104
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_DEFAULT_VALUE;
105105
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_LOAD_TSFILE_VALIDATION_KEY;
106+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE;
107+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY;
106108
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE;
107109
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_MARK_AS_PIPE_REQUEST_KEY;
108110
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_RATE_LIMIT_DEFAULT_VALUE;
@@ -132,6 +134,7 @@
132134
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_BALANCE_STRATEGY_KEY;
133135
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_STRATEGY_KEY;
134136
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_LOAD_TSFILE_VALIDATION_KEY;
137+
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY;
135138
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_MARK_AS_PIPE_REQUEST_KEY;
136139
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_RATE_LIMIT_KEY;
137140
import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.SINK_SKIP_IF_KEY;
@@ -229,6 +232,13 @@ public void validate(final PipeParameterValidator validator) throws Exception {
229232
Arrays.asList(CONNECTOR_IOTDB_BATCH_DELAY_SECONDS_KEY, SINK_IOTDB_BATCH_DELAY_SECONDS_KEY),
230233
false);
231234

235+
// Check coexistence of mark-as-pipe-request and mark-as-general-write-request
236+
validator.validateSynonymAttributes(
237+
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
238+
Arrays.asList(
239+
CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY, SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY),
240+
false);
241+
232242
username =
233243
parameters.getStringOrDefault(
234244
Arrays.asList(
@@ -383,10 +393,20 @@ public void customize(
383393
.equals(CONNECTOR_FORMAT_TS_FILE_VALUE);
384394
LOGGER.info("IoTDBConnector isTabletBatchModeEnabled: {}", isTabletBatchModeEnabled);
385395

386-
shouldMarkAsPipeRequest =
396+
final boolean shouldMarkAsGeneralWriteRequest =
387397
parameters.getBooleanOrDefault(
388-
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
389-
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
398+
Arrays.asList(
399+
CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_KEY,
400+
SINK_MARK_AS_GENERAL_WRITE_REQUEST_KEY),
401+
CONNECTOR_MARK_AS_GENERAL_WRITE_REQUEST_DEFAULT_VALUE);
402+
if (shouldMarkAsGeneralWriteRequest) {
403+
shouldMarkAsPipeRequest = false;
404+
} else {
405+
shouldMarkAsPipeRequest =
406+
parameters.getBooleanOrDefault(
407+
Arrays.asList(CONNECTOR_MARK_AS_PIPE_REQUEST_KEY, SINK_MARK_AS_PIPE_REQUEST_KEY),
408+
CONNECTOR_MARK_AS_PIPE_REQUEST_DEFAULT_VALUE);
409+
}
390410
LOGGER.info("IoTDBConnector shouldMarkAsPipeRequest: {}", shouldMarkAsPipeRequest);
391411

392412
final String connectorSkipIfValue =

0 commit comments

Comments
 (0)