Skip to content

Commit 366fb38

Browse files
authored
Merge pull request #1349 from Altinity/feature/antalya-25.8/timezone_for_partitioning
Timezone for partitioning of Iceberg tables
2 parents 5e4ac41 + f30b2b2 commit 366fb38

12 files changed

Lines changed: 251 additions & 15 deletions

File tree

src/Core/Settings.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6934,6 +6934,15 @@ Throw an error if there are pending patch parts when exporting a merge tree part
69346934
)", 0) \
69356935
DECLARE(Bool, serialize_string_in_memory_with_zero_byte, true, R"(
69366936
Serialize String values during aggregation with zero byte at the end. Enable to keep compatibility when querying cluster of incompatible versions.
6937+
)", 0) \
6938+
DECLARE(Timezone, iceberg_partition_timezone, "", R"(
6939+
Time zone by which partitioning of Iceberg tables was performed.
6940+
Possible values:
6941+
6942+
- Any valid timezone, e.g. `Europe/Berlin`, `UTC` or `Zulu`
6943+
- `` (empty value) - use server or session timezone
6944+
6945+
Default value is empty.
69376946
)", 0) \
69386947
\
69396948
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
6565
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
6666
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
6767
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
68+
{"iceberg_partition_timezone", "", "", "New setting."},
6869
{"export_merge_tree_part_throw_on_pending_mutations", true, true, "New setting."},
6970
{"export_merge_tree_part_throw_on_pending_patch_parts", true, true, "New setting."},
7071
});

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <Analyzer/FunctionNode.h>
22
#include <Columns/ColumnsNumber.h>
3+
#include <Columns/ColumnString.h>
34
#include <Columns/IColumn_fwd.h>
45
#include <Core/ColumnWithTypeAndName.h>
56
#include <Core/ColumnsWithTypeAndName.h>
@@ -78,6 +79,7 @@ namespace Setting
7879
extern const SettingsUInt64 output_format_compression_level;
7980
extern const SettingsUInt64 output_format_compression_zstd_window_log;
8081
extern const SettingsBool write_full_path_in_iceberg_metadata;
82+
extern const SettingsTimezone iceberg_partition_timezone;
8183
}
8284

8385
namespace DataLakeStorageSetting
@@ -966,7 +968,7 @@ ChunkPartitioner::ChunkPartitioner(
966968

967969
auto & factory = FunctionFactory::instance();
968970

969-
auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name);
971+
auto transform_and_argument = Iceberg::parseTransformAndArgument(transform_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
970972
if (!transform_and_argument)
971973
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown transform {}", transform_name);
972974

@@ -980,6 +982,7 @@ ChunkPartitioner::ChunkPartitioner(
980982
result_data_types.push_back(function->getReturnType(columns_for_function));
981983
functions.push_back(function);
982984
function_params.push_back(transform_and_argument->argument);
985+
function_time_zones.push_back(transform_and_argument->time_zone);
983986
columns_to_apply.push_back(column_name);
984987
}
985988
}
@@ -1016,6 +1019,14 @@ ChunkPartitioner::partitionChunk(const Chunk & chunk)
10161019
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "#"));
10171020
}
10181021
arguments.push_back(name_to_column[columns_to_apply[transform_ind]]);
1022+
if (function_time_zones[transform_ind].has_value())
1023+
{
1024+
auto type = std::make_shared<DataTypeString>();
1025+
auto column_value = ColumnString::create();
1026+
column_value->insert(*function_time_zones[transform_ind]);
1027+
auto const_column = ColumnConst::create(std::move(column_value), chunk.getNumRows());
1028+
arguments.push_back(ColumnWithTypeAndName(const_column->clone(), type, "PartitioningTimezone"));
1029+
}
10191030
auto result
10201031
= functions[transform_ind]->build(arguments)->execute(arguments, std::make_shared<DataTypeString>(), chunk.getNumRows(), false);
10211032
for (size_t i = 0; i < chunk.getNumRows(); ++i)

src/Storages/ObjectStorage/DataLakes/Iceberg/IcebergWrites.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ class ChunkPartitioner
195195

196196
std::vector<FunctionOverloadResolverPtr> functions;
197197
std::vector<std::optional<size_t>> function_params;
198+
std::vector<std::optional<String>> function_time_zones;
198199
std::vector<String> columns_to_apply;
199200
std::vector<DataTypePtr> result_data_types;
200201
};

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@
55
#include <compare>
66
#include <optional>
77

8+
#include <Interpreters/Context.h>
89
#include <Interpreters/IcebergMetadataLog.h>
910

1011
#include <Storages/ObjectStorage/DataLakes/Iceberg/Constant.h>
1112
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFile.h>
1213
#include <Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h>
1314
#include <Storages/ObjectStorage/Utils.h>
1415

16+
#include <Core/Settings.h>
1517
#include <Core/TypeId.h>
1618
#include <DataTypes/DataTypesDecimal.h>
1719
#include <Poco/JSON/Parser.h>
@@ -32,6 +34,11 @@ namespace DB::ErrorCodes
3234
extern const int BAD_ARGUMENTS;
3335
}
3436

37+
namespace DB::Setting
38+
{
39+
extern const SettingsTimezone iceberg_partition_timezone;
40+
}
41+
3542
namespace DB::Iceberg
3643
{
3744

@@ -217,7 +224,7 @@ ManifestFileContent::ManifestFileContent(
217224
auto transform_name = partition_specification_field->getValue<String>(f_partition_transform);
218225
auto partition_name = partition_specification_field->getValue<String>(f_partition_name);
219226
common_partition_specification.emplace_back(source_id, transform_name, partition_name);
220-
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name);
227+
auto partition_ast = getASTFromTransform(transform_name, numeric_column_name, context->getSettingsRef()[Setting::iceberg_partition_timezone]);
221228
/// Unsupported partition key expression
222229
if (partition_ast == nullptr)
223230
continue;

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ using namespace DB;
2626
namespace DB::Iceberg
2727
{
2828

29-
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name)
29+
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone)
3030
{
31-
auto transform_and_argument = parseTransformAndArgument(transform_name_src);
31+
auto transform_and_argument = parseTransformAndArgument(transform_name_src, time_zone);
3232
if (!transform_and_argument)
3333
{
3434
LOG_WARNING(&Poco::Logger::get("Iceberg Partition Pruning"), "Cannot parse iceberg transform name: {}.", transform_name_src);
@@ -47,6 +47,13 @@ DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String &
4747
return makeASTFunction(
4848
transform_and_argument->transform_name, std::make_shared<DB::ASTLiteral>(*transform_and_argument->argument), std::make_shared<DB::ASTIdentifier>(column_name));
4949
}
50+
if (transform_and_argument->time_zone)
51+
{
52+
return makeASTFunction(
53+
transform_and_argument->transform_name,
54+
std::make_shared<DB::ASTIdentifier>(column_name),
55+
std::make_shared<DB::ASTLiteral>(*transform_and_argument->time_zone));
56+
}
5057
return makeASTFunction(transform_and_argument->transform_name, std::make_shared<DB::ASTIdentifier>(column_name));
5158
}
5259

src/Storages/ObjectStorage/DataLakes/Iceberg/ManifestFilesPruning.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ namespace DB::Iceberg
3030
struct ManifestFileEntry;
3131
class ManifestFileContent;
3232

33-
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name);
33+
DB::ASTPtr getASTFromTransform(const String & transform_name_src, const String & column_name, const String & time_zone);
3434

3535
/// Prune specific data files based on manifest content
3636
class ManifestFilesPruner

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.cpp

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ namespace ProfileEvents
7474
namespace DB::Setting
7575
{
7676
extern const SettingsUInt64 output_format_compression_level;
77+
extern const SettingsTimezone iceberg_partition_timezone;
7778
}
7879

7980
namespace DB::Iceberg
@@ -112,27 +113,32 @@ void writeMessageToFile(
112113
}
113114
}
114115

115-
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src)
116+
117+
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone)
116118
{
117119
std::string transform_name = Poco::toLower(transform_name_src);
118120

121+
std::optional<String> time_zone_opt;
122+
if (!time_zone.empty())
123+
time_zone_opt = time_zone;
124+
119125
if (transform_name == "year" || transform_name == "years")
120-
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt};
126+
return TransformAndArgument{"toYearNumSinceEpoch", std::nullopt, time_zone_opt};
121127

122128
if (transform_name == "month" || transform_name == "months")
123-
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt};
129+
return TransformAndArgument{"toMonthNumSinceEpoch", std::nullopt, time_zone_opt};
124130

125131
if (transform_name == "day" || transform_name == "date" || transform_name == "days" || transform_name == "dates")
126-
return TransformAndArgument{"toRelativeDayNum", std::nullopt};
132+
return TransformAndArgument{"toRelativeDayNum", std::nullopt, time_zone_opt};
127133

128134
if (transform_name == "hour" || transform_name == "hours")
129-
return TransformAndArgument{"toRelativeHourNum", std::nullopt};
135+
return TransformAndArgument{"toRelativeHourNum", std::nullopt, time_zone_opt};
130136

131137
if (transform_name == "identity")
132-
return TransformAndArgument{"identity", std::nullopt};
138+
return TransformAndArgument{"identity", std::nullopt, std::nullopt};
133139

134140
if (transform_name == "void")
135-
return TransformAndArgument{"tuple", std::nullopt};
141+
return TransformAndArgument{"tuple", std::nullopt, std::nullopt};
136142

137143
if (transform_name.starts_with("truncate") || transform_name.starts_with("bucket"))
138144
{
@@ -156,11 +162,11 @@ std::optional<TransformAndArgument> parseTransformAndArgument(const String & tra
156162

157163
if (transform_name.starts_with("truncate"))
158164
{
159-
return TransformAndArgument{"icebergTruncate", argument};
165+
return TransformAndArgument{"icebergTruncate", argument, std::nullopt};
160166
}
161167
else if (transform_name.starts_with("bucket"))
162168
{
163-
return TransformAndArgument{"icebergBucket", argument};
169+
return TransformAndArgument{"icebergBucket", argument, std::nullopt};
164170
}
165171
}
166172
return std::nullopt;

src/Storages/ObjectStorage/DataLakes/Iceberg/Utils.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,14 @@ struct TransformAndArgument
3838
{
3939
String transform_name;
4040
std::optional<size_t> argument;
41+
/// When an Iceberg table is partitioned by time, splitting into partitions can be done using a different timezone
42+
/// (UTC in most cases). This timezone can be set with the `iceberg_partition_timezone` setting; its value is stored in this member.
43+
/// When an Iceberg partition condition is converted to a ClickHouse function in the `parseTransformAndArgument` method,
44+
/// `time_zone` is added as the second argument to functions like `toRelativeDayNum`, `toYearNumSinceEpoch`, etc.
45+
std::optional<String> time_zone;
4146
};
4247

43-
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src);
48+
std::optional<TransformAndArgument> parseTransformAndArgument(const String & transform_name_src, const String & time_zone);
4449

4550
Poco::JSON::Object::Ptr getMetadataJSONObject(
4651
const String & metadata_file_path,
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<clickhouse>
2+
<profiles>
3+
<default>
4+
<iceberg_partition_timezone>UTC</iceberg_partition_timezone>
5+
</default>
6+
</profiles>
7+
</clickhouse>

0 commit comments

Comments
 (0)