Skip to content

Commit 82a4fbd

Browse files
committed
draft of getInMemoryMetadata / getStorageSnapshot for TieredDistributedMerge
1 parent 73c5f0e commit 82a4fbd

2 files changed

Lines changed: 150 additions & 8 deletions

File tree

src/Storages/StorageDistributed.cpp

Lines changed: 141 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <DataTypes/DataTypeString.h>
2121
#include <DataTypes/ObjectUtils.h>
2222
#include <DataTypes/NestedUtils.h>
23+
#include <DataTypes/getLeastSupertype.h>
2324

2425
#include <Disks/IVolume.h>
2526

@@ -28,6 +29,7 @@
2829
#include <Storages/StorageFactory.h>
2930
#include <Storages/AlterCommands.h>
3031
#include <Storages/getStructureOfRemoteTable.h>
32+
#include <TableFunctions/TableFunctionFactory.h>
3133
#include <Storages/checkAndGetLiteralArgument.h>
3234
#include <Storages/StorageDummy.h>
3335
#include <Storages/removeGroupingFunctionSpecializations.h>
@@ -765,11 +767,6 @@ static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr
765767
return false;
766768
}
767769

768-
StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const
769-
{
770-
/// TODO: support additional table functions
771-
return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context);
772-
}
773770

774771
/// TODO: support additional table functions
775772
StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery(
@@ -1188,15 +1185,15 @@ void StorageDistributed::read(
11881185
auto table_function = TableFunctionFactory::instance().get(additional_table_functions[i].table_function_ast, local_context);
11891186
if (!table_function)
11901187
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid table function in TieredDistributedMerge engine");
1191-
1188+
11921189
storage = table_function->execute(
11931190
additional_table_functions[i].table_function_ast,
11941191
local_context,
11951192
getStorageID().table_name, // Use the current table name
11961193
{}, // columns - will be determined from storage
11971194
false, // use_global_context = false
11981195
false); // is_insert_query = false
1199-
1196+
12001197
// Handle StorageTableFunctionProxy if present
12011198
if (auto proxy = std::dynamic_pointer_cast<StorageTableFunctionProxy>(storage))
12021199
{
@@ -2424,7 +2421,7 @@ void registerStorageTieredDistributedMerge(StorageFactory & factory)
24242421
throw Exception(ErrorCodes::BAD_ARGUMENTS,
24252422
"Argument #{} must be a valid SQL expression: {}", i + 1, e.message());
24262423
}
2427-
2424+
24282425
// Validate table function or table identifier
24292426
if (const auto * func = table_function_ast->as<ASTFunction>())
24302427
{
@@ -2545,4 +2542,140 @@ bool StorageDistributed::initializeDiskOnConfigChange(const std::set<String> & n
25452542

25462543
return true;
25472544
}
2545+
2546+
/// Returns the in-memory metadata of the table.
///
/// Without additional table functions (regular Distributed engine) this is exactly the
/// base IStorage metadata. With additional table functions (TieredDistributedMerge engine)
/// the column list of the base metadata is replaced by the columns merged from all layers,
/// provided that merged description is not empty.
StorageInMemoryMetadata StorageDistributed::getInMemoryMetadata() const
{
    auto base_metadata = IStorage::getInMemoryMetadata();

    /// Regular Distributed engine: there are no layers to merge.
    if (additional_table_functions.empty())
        return base_metadata;

    /// TieredDistributedMerge engine: take the schema merged across all layers.
    const auto layered_columns = getColumnsDescriptionFromLayers(getContext());
    if (!layered_columns.empty())
        base_metadata.setColumns(layered_columns);

    /// Virtual columns are intentionally not modified here: this method is const, so they
    /// have to be set in the constructor or in a non-const method.
    return base_metadata;
}
2569+
2570+
/// Builds a storage snapshot for the given metadata.
///
/// Regular Distributed engine (no additional table functions): forwards to
/// getStorageSnapshotForQuery() with a null query.
///
/// TieredDistributedMerge engine: if requiresObjectColumns() reports that no Object columns
/// are needed, the snapshot carries an empty object-columns description. Otherwise the
/// extended object descriptions are fetched for the remote table of the cluster and
/// converted into concrete object columns. Note that this branch uses the storage's own
/// context (getContext()); `query_context` is only used by the regular-engine branch.
StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const
{
    if (additional_table_functions.empty())
        return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context);

    auto data = std::make_unique<SnapshotData>();
    const auto & declared_columns = metadata_snapshot->getColumns();

    if (requiresObjectColumns(declared_columns, nullptr))
    {
        /// Object columns are present: collect their descriptions from the remote tables.
        data->objects_by_shard = getExtendedObjectsOfRemoteTables(
            *getCluster(),
            StorageID{remote_database, remote_table},
            declared_columns,
            getContext());

        auto concrete_object_columns = DB::getConcreteObjectColumns(
            data->objects_by_shard.begin(),
            data->objects_by_shard.end(),
            declared_columns,
            [](const auto & shard_entry) -> const auto & { return shard_entry.second; });

        return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, std::move(concrete_object_columns), std::move(data));
    }

    /// No Object columns required: skip the extra round trip and use an empty description.
    return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, ColumnsDescription{}, std::move(data));
}
2599+
2600+
/// Merges the column descriptions of the main distributed table and of every additional layer.
///
/// The result starts from the columns of the base IStorage metadata. For each layer whose
/// snapshot can be obtained:
///  - a column that is not yet present is added as is;
///  - a column that is present but differs gets its type replaced by
///    getLeastSupertypeOrVariant() of both types, and its default is cleared when the
///    defaults disagree.
/// This is best effort: a layer that yields no snapshot is skipped, and a DB::Exception
/// raised while processing a layer is logged as a warning and the loop moves on to the
/// next layer.
ColumnsDescription StorageDistributed::getColumnsDescriptionFromLayers(const ContextPtr & query_context) const
{
    /// Seed with the columns declared on the distributed table itself.
    ColumnsDescription merged = IStorage::getInMemoryMetadata().getColumns();

    for (const auto & layer : additional_table_functions)
    {
        try
        {
            const auto snapshot = getStorageSnapshotForLayer(layer, query_context);
            if (!snapshot)
                continue;

            const auto layer_columns = snapshot->getAllColumnsDescription();
            for (const auto & layer_column : layer_columns)
            {
                if (!merged.has(layer_column.name))
                {
                    merged.add(layer_column);
                    continue;
                }

                if (layer_column != merged.get(layer_column.name))
                {
                    /// Same name, different definition: widen the type (similar to StorageMerge).
                    merged.modify(layer_column.name, [&layer_column](ColumnDescription & current)
                    {
                        current.type = getLeastSupertypeOrVariant(DataTypes{current.type, layer_column.type});
                        if (current.default_desc != layer_column.default_desc)
                            current.default_desc = {};
                    });
                }
            }
        }
        catch (const Exception & e)
        {
            LOG_WARNING(log, "Failed to get schema from layer {}: {}", layer.table_function_ast->formatForErrorMessage(), e.message());
        }
    }

    return merged;
}
2646+
/// Resolves one additional layer to a storage and returns that storage's snapshot.
///
/// When the layer carries a storage id, the table is looked up in the DatabaseCatalog.
/// Otherwise the layer's table function AST is resolved through the TableFunctionFactory
/// and executed (with an empty table name) to obtain the storage.
///
/// Returns nullptr if a DB::Exception is thrown along the way; the failure is logged as a warning.
StorageSnapshotPtr StorageDistributed::getStorageSnapshotForLayer(const TableFunctionEntry & layer, const ContextPtr & query_context) const
{
    /// Both kinds of layer end up as a storage; only the way it is obtained differs.
    const auto resolve_layer_storage = [&]
    {
        if (layer.storage_id.has_value())
            return DatabaseCatalog::instance().getTable(*layer.storage_id, query_context);

        auto table_function = TableFunctionFactory::instance().get(layer.table_function_ast, query_context);
        return table_function->execute(layer.table_function_ast, query_context, "");
    };

    try
    {
        auto layer_storage = resolve_layer_storage();
        return layer_storage->getStorageSnapshot(layer_storage->getInMemoryMetadataPtr(), query_context);
    }
    catch (const Exception & e)
    {
        LOG_WARNING(log, "Failed to get storage snapshot for layer: {}", e.message());
        return nullptr;
    }
}
2670+
2671+
/// Builds the virtual columns description for the TieredDistributedMerge engine:
/// everything createVirtuals() provides, plus an ephemeral UInt32 `_table_index` column.
VirtualColumnsDescription StorageDistributed::createVirtualsForTieredDistributedMerge() const
{
    /// Start from the virtual columns returned by createVirtuals().
    auto virtuals = createVirtuals();

    virtuals.addEphemeral(
        "_table_index",
        std::make_shared<DataTypeUInt32>(),
        "Index of the table layer (0 for main layer, 1+ for additional layers)");

    return virtuals;
}
2680+
25482681
}

src/Storages/StorageDistributed.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ class StorageDistributed final : public IStorage, WithContext
120120
StorageSnapshotPtr getStorageSnapshotForQuery(
121121
const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr query_context) const override;
122122

123+
/// Override for TieredDistributedMerge to merge schemas from all layers
124+
StorageInMemoryMetadata getInMemoryMetadata() const override;
125+
123126
QueryProcessingStage::Enum
124127
getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
125128

@@ -325,6 +328,12 @@ class StorageDistributed final : public IStorage, WithContext
325328

326329
/// Additional table functions for TieredDistributedMerge engine
327330
std::vector<TableFunctionEntry> additional_table_functions;
331+
332+
private:
333+
/// Helper methods for TieredDistributedMerge
334+
ColumnsDescription getColumnsDescriptionFromLayers(const ContextPtr & query_context) const;
335+
StorageSnapshotPtr getStorageSnapshotForLayer(const TableFunctionEntry & layer, const ContextPtr & query_context) const;
336+
VirtualColumnsDescription createVirtualsForTieredDistributedMerge() const;
328337
};
329338

330339
}

0 commit comments

Comments
 (0)