diff --git a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java index fec5b97b6fbf1..2c56b2336601e 100644 --- a/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java +++ b/example/mqtt-customize/src/main/java/org/apache/iotdb/mqtt/server/CustomizedJsonPayloadFormatter.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.protocol.mqtt.TreeMessage; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; import java.util.ArrayList; import java.util.Arrays; @@ -33,7 +34,7 @@ public class CustomizedJsonPayloadFormatter implements PayloadFormatter { @Override - public List format(ByteBuf payload) { + public List format(String topic, ByteBuf payload) { // Suppose the payload is a json format if (payload == null) { return Collections.emptyList(); @@ -54,6 +55,12 @@ public List format(ByteBuf payload) { return ret; } + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + @Override public String getName() { // set the value of mqtt_payload_formatter in iotdb-common.properties as the following string: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java index 258c98d636454..745fbd7215b6b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatter.java @@ -27,6 +27,7 @@ import com.google.gson.JsonParseException; import com.google.gson.reflect.TypeToken; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; import org.apache.tsfile.enums.TSDataType; import java.nio.charset.StandardCharsets; @@ -50,7 +51,7 @@ public class JSONPayloadFormatter implements PayloadFormatter { private static final Gson GSON = new GsonBuilder().create(); @Override - public List format(ByteBuf payload) { + public List format(String topic, ByteBuf payload) { if (payload == null) { return new ArrayList<>(); } @@ -81,6 +82,12 @@ public List format(ByteBuf payload) { throw new JsonParseException("payload is invalidate"); } + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + private List formatJson(JsonObject jsonObject) { TreeMessage message = new TreeMessage(); message.setDevice(jsonObject.get(JSON_KEY_DEVICE).getAsString()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java index 8e389f78760b1..63767a7a450bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatter.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.protocol.mqtt; import io.netty.buffer.ByteBuf; +import org.apache.commons.lang3.NotImplementedException; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; @@ -63,7 +64,7 @@ public LinePayloadFormatter() { } @Override - public List format(ByteBuf payload) { + public List format(String topic, ByteBuf payload) { List messages = new ArrayList<>(); if (payload == null) { return messages; @@ -71,6 +72,8 @@ public List format(ByteBuf payload) { String txt = payload.toString(StandardCharsets.UTF_8); String[] lines = txt.split(LINE_BREAK); + // '/' previously defined as a database name + String database = !topic.contains("/") ? topic : topic.substring(0, topic.indexOf("/")); for (String line : lines) { if (line.trim().startsWith(WELL)) { continue; @@ -83,6 +86,9 @@ public List format(ByteBuf payload) { continue; } + // Parsing Database Name + message.setDatabase((database)); + // Parsing Table Names message.setTable(matcher.group(TABLE)); @@ -121,6 +127,12 @@ public List format(ByteBuf payload) { return messages; } + @Override + @Deprecated + public List format(ByteBuf payload) { + throw new NotImplementedException(); + } + private boolean setTags(Matcher matcher, TableMessage message) { List tagKeys = new ArrayList<>(); List tagValues = new ArrayList<>(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java index 288efb85d6d44..ad97dd310c3d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/MPPPublishHandler.java @@ -136,7 +136,7 @@ public void onPublish(InterceptPublishMessage msg) { topic, payload); - List messages = payloadFormat.format(payload); + List messages = payloadFormat.format(topic, payload); if (messages == null) { return; } @@ -146,14 +146,7 @@ public void onPublish(InterceptPublishMessage msg) { continue; } if (useTableInsert) { - TableMessage tableMessage = (TableMessage) message; - // '/' previously defined as a database name - String database = - !msg.getTopicName().contains("/") - ? msg.getTopicName() - : msg.getTopicName().substring(0, msg.getTopicName().indexOf("/")); - tableMessage.setDatabase(database); - insertTable(tableMessage, session); + insertTable((TableMessage) message, session); } else { insertTree((TreeMessage) message, session); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java index 278d6eb3743ec..c86648ac16136 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/mqtt/PayloadFormatter.java @@ -40,8 +40,20 @@ public interface PayloadFormatter { * @param payload * @return */ + @Deprecated List format(ByteBuf payload); + /** + * format a payload of a topic to a list of messages + * + * @param topic + * @param payload + * @return + */ + default List format(String topic, ByteBuf payload) { + return format(payload); + } + /** * get the formatter name * diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java index bc721406d9a8a..deecf607d8162 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/JSONPayloadFormatterTest.java @@ -38,9 +38,10 @@ public void formatJson() { + " }"; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(buf).get(0); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(0); assertEquals("root.sg.d1", message.getDevice()); assertEquals(Long.valueOf(1586076045524L), message.getTimestamp()); @@ -59,9 +60,10 @@ public void formatBatchJson() { + " }"; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(buf).get(1); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1); assertEquals("root.sg.d1", message.getDevice()); assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); @@ -88,9 +90,10 @@ public void formatJsonArray() { + "]"; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(buf).get(1); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(1); assertEquals("root.sg.d2", message.getDevice()); assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); @@ -117,9 +120,10 @@ public void formatBatchJsonArray() { + "]"; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; JSONPayloadFormatter formatter = new JSONPayloadFormatter(); - TreeMessage message = (TreeMessage) formatter.format(buf).get(3); + TreeMessage message = (TreeMessage) formatter.format(topic, buf).get(3); assertEquals("root.sg.d2", message.getDevice()); assertEquals(Long.valueOf(1586076065526L), message.getTimestamp()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java index 5651ca49b97d2..7bf9bce0702d3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/protocol/mqtt/LinePayloadFormatterTest.java @@ -37,9 +37,10 @@ public void formatLine() { "test1,tag1=t1,tag2=t2 attr1=a1,attr2=a2 field1=\"value1\",field2=1i,field3=2u,field4=3i32,field5=t,field6=false,field7=4,field8=5f 1"; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; LinePayloadFormatter formatter = new LinePayloadFormatter(); - TableMessage message = (TableMessage) formatter.format(buf).get(0); + TableMessage message = (TableMessage) formatter.format(topic, buf).get(0); assertEquals("test1", message.getTable()); assertEquals(Long.valueOf(1L), message.getTimestamp()); @@ -64,9 +65,10 @@ public void formatBatchLine() { + "test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 "; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; LinePayloadFormatter formatter = new LinePayloadFormatter(); - TableMessage message = (TableMessage) formatter.format(buf).get(1); + TableMessage message = (TableMessage) formatter.format(topic, buf).get(1); assertEquals("test2", message.getTable()); assertEquals(Long.valueOf(2L), message.getTimestamp()); @@ -82,9 +84,10 @@ public void formatLineAnnotation() { + " # test2,tag3=t3,tag4=t4 attr3=a3,attr4=a4 field4=\"value4\",field5=10i,field6=10i32 2 "; ByteBuf buf = Unpooled.copiedBuffer(payload, StandardCharsets.UTF_8); + String topic = ""; LinePayloadFormatter formatter = new LinePayloadFormatter(); - List message = formatter.format(buf); + List message = formatter.format(topic, buf); assertEquals(1, message.size()); }