Skip to content

Commit 860ce60

Browse files
committed
implement create/drop table/db for paimon
1 parent accd342 commit 860ce60

8 files changed

Lines changed: 864 additions & 38 deletions

File tree

fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOperations.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.doris.datasource.hive.HMSExternalCatalog;
2222
import org.apache.doris.datasource.hive.HiveMetadataOps;
2323
import org.apache.doris.datasource.iceberg.IcebergMetadataOps;
24+
import org.apache.doris.datasource.paimon.PaimonMetadataOps;
2425

2526
import org.apache.hadoop.hive.conf.HiveConf;
2627
import org.apache.iceberg.catalog.Catalog;
@@ -35,4 +36,9 @@ public static HiveMetadataOps newHiveMetadataOps(HiveConf hiveConf, HMSExternalC
3536
public static IcebergMetadataOps newIcebergMetadataOps(ExternalCatalog dorisCatalog, Catalog catalog) {
3637
return new IcebergMetadataOps(dorisCatalog, catalog);
3738
}
39+
40+
public static PaimonMetadataOps newPaimonMetaOps(ExternalCatalog dorisCatalog,
41+
org.apache.paimon.catalog.Catalog catalog) {
42+
return new PaimonMetadataOps(dorisCatalog, catalog);
43+
}
3844
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.datasource.paimon;
19+
20+
import org.apache.doris.catalog.ArrayType;
21+
import org.apache.doris.catalog.MapType;
22+
import org.apache.doris.catalog.PrimitiveType;
23+
import org.apache.doris.catalog.ScalarType;
24+
import org.apache.doris.catalog.StructField;
25+
import org.apache.doris.catalog.StructType;
26+
import org.apache.doris.catalog.Type;
27+
import org.apache.doris.datasource.DorisTypeVisitor;
28+
29+
import org.apache.paimon.types.BigIntType;
30+
import org.apache.paimon.types.BooleanType;
31+
import org.apache.paimon.types.DataField;
32+
import org.apache.paimon.types.DataType;
33+
import org.apache.paimon.types.DateType;
34+
import org.apache.paimon.types.DecimalType;
35+
import org.apache.paimon.types.DoubleType;
36+
import org.apache.paimon.types.FloatType;
37+
import org.apache.paimon.types.IntType;
38+
import org.apache.paimon.types.RowType;
39+
import org.apache.paimon.types.TimestampType;
40+
import org.apache.paimon.types.VarBinaryType;
41+
import org.apache.paimon.types.VarCharType;
42+
import org.apache.paimon.types.VariantType;
43+
44+
import java.util.ArrayList;
45+
import java.util.List;
46+
import java.util.concurrent.atomic.AtomicInteger;
47+
48+
public class DorisToPaimonTypeVisitor extends DorisTypeVisitor<DataType> {
49+
50+
@Override
51+
public DataType struct(StructType struct, List<DataType> fieldResults) {
52+
List<StructField> fields = struct.getFields();
53+
List<DataField> newFields = new ArrayList<>(fields.size());
54+
AtomicInteger atomicInteger = new AtomicInteger(-1);
55+
for (int i = 0; i < fields.size(); i++) {
56+
StructField field = fields.get(i);
57+
DataType fieldType = fieldResults.get(i).copy(field.getContainsNull());
58+
String comment = field.getComment();
59+
DataField dataField = new DataField(atomicInteger.incrementAndGet(), field.getName(), fieldType, comment);
60+
newFields.add(dataField);
61+
}
62+
return new RowType(newFields);
63+
}
64+
65+
@Override
66+
public DataType field(StructField field, DataType typeResult) {
67+
return typeResult;
68+
}
69+
70+
@Override
71+
public DataType array(ArrayType array, DataType elementResult) {
72+
return new org.apache.paimon.types.ArrayType(elementResult.copy(array.getContainsNull()));
73+
}
74+
75+
@Override
76+
public DataType map(MapType map, DataType keyResult, DataType valueResult) {
77+
return new org.apache.paimon.types.MapType(keyResult.copy(false),
78+
valueResult.copy(map.getIsValueContainsNull()));
79+
}
80+
81+
@Override
82+
public DataType atomic(Type atomic) {
83+
PrimitiveType primitiveType = atomic.getPrimitiveType();
84+
if (primitiveType.equals(PrimitiveType.BOOLEAN)) {
85+
return new BooleanType();
86+
} else if (primitiveType.equals(PrimitiveType.INT)) {
87+
return new IntType();
88+
} else if (primitiveType.equals(PrimitiveType.BIGINT)) {
89+
return new BigIntType();
90+
} else if (primitiveType.equals(PrimitiveType.FLOAT)) {
91+
return new FloatType();
92+
} else if (primitiveType.equals(PrimitiveType.DOUBLE)) {
93+
return new DoubleType();
94+
} else if (primitiveType.isCharFamily()) {
95+
return new VarCharType(VarCharType.MAX_LENGTH);
96+
} else if (primitiveType.equals(PrimitiveType.DATE) || primitiveType.equals(PrimitiveType.DATEV2)) {
97+
return new DateType();
98+
} else if (primitiveType.equals(PrimitiveType.DECIMALV2) || primitiveType.isDecimalV3Type()) {
99+
return new DecimalType(((ScalarType) atomic).getScalarPrecision(), ((ScalarType) atomic).getScalarScale());
100+
} else if (primitiveType.equals(PrimitiveType.DATETIME) || primitiveType.equals(PrimitiveType.DATETIMEV2)) {
101+
return new TimestampType();
102+
} else if (primitiveType.isVarbinaryType()) {
103+
return new VarBinaryType(VarBinaryType.MAX_LENGTH);
104+
} else if (primitiveType.isVariantType()) {
105+
return new VariantType();
106+
}
107+
throw new UnsupportedOperationException("Not a supported type: " + primitiveType);
108+
}
109+
}

fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonExternalCatalog.java

Lines changed: 4 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,13 @@
2323
import org.apache.doris.datasource.InitCatalogLog;
2424
import org.apache.doris.datasource.NameMapping;
2525
import org.apache.doris.datasource.SessionContext;
26+
import org.apache.doris.datasource.operations.ExternalMetadataOperations;
2627
import org.apache.doris.datasource.property.metastore.AbstractPaimonProperties;
2728

2829
import org.apache.commons.lang3.exception.ExceptionUtils;
2930
import org.apache.logging.log4j.LogManager;
3031
import org.apache.logging.log4j.Logger;
3132
import org.apache.paimon.catalog.Catalog;
32-
import org.apache.paimon.catalog.Catalog.TableNotExistException;
3333
import org.apache.paimon.catalog.Identifier;
3434
import org.apache.paimon.partition.Partition;
3535

@@ -62,6 +62,7 @@ protected void initLocalObjectsImpl() {
6262
catalogType = paimonProperties.getPaimonCatalogType();
6363
catalog = createCatalog();
6464
initPreExecutionAuthenticator();
65+
metadataOps = ExternalMetadataOperations.newPaimonMetaOps(this, catalog);
6566
}
6667

6768
@Override
@@ -76,49 +77,16 @@ public String getCatalogType() {
7677
return catalogType;
7778
}
7879

79-
protected List<String> listDatabaseNames() {
80-
try {
81-
return executionAuthenticator.execute(() -> new ArrayList<>(catalog.listDatabases()));
82-
} catch (Exception e) {
83-
throw new RuntimeException("Failed to list databases names, catalog name: " + getName(), e);
84-
}
85-
}
86-
8780
@Override
8881
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
8982
makeSureInitialized();
90-
try {
91-
return executionAuthenticator.execute(() -> {
92-
try {
93-
catalog.getTable(Identifier.create(dbName, tblName));
94-
return true;
95-
} catch (TableNotExistException e) {
96-
return false;
97-
}
98-
});
99-
100-
} catch (Exception e) {
101-
throw new RuntimeException("Failed to check table existence, catalog name: " + getName()
102-
+ "error message is:" + ExceptionUtils.getRootCauseMessage(e), e);
103-
}
83+
return metadataOps.tableExist(dbName, tblName);
10484
}
10585

10686
@Override
10787
public List<String> listTableNames(SessionContext ctx, String dbName) {
10888
makeSureInitialized();
109-
try {
110-
return executionAuthenticator.execute(() -> {
111-
List<String> tableNames = null;
112-
try {
113-
tableNames = catalog.listTables(dbName);
114-
} catch (Catalog.DatabaseNotExistException e) {
115-
LOG.warn("DatabaseNotExistException", e);
116-
}
117-
return tableNames;
118-
});
119-
} catch (Exception e) {
120-
throw new RuntimeException("Failed to list table names, catalog name: " + getName(), e);
121-
}
89+
return metadataOps.listTableNames(dbName);
12290
}
12391

12492
public List<Partition> getPaimonPartitions(NameMapping nameMapping) {

0 commit comments

Comments
 (0)