From 10de685fd3d048d35c04c5ba5aa33b686e63fd4b Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Mon, 18 May 2026 17:54:03 +0200 Subject: [PATCH 1/2] Cherry-pick of https://github.com/Altinity/ClickHouse/pull/1751 with unresolved conflict markers (resolution in next commit) --- Original cherry-pick message follows: Merge pull request #1751 from Altinity/feature/antalya-26.3/pr-1631 Antalya 26.3: Fix condition for using parquet metadata cache # Conflicts: # src/Storages/ObjectStorage/StorageObjectStorageSource.cpp # tests/integration/test_storage_iceberg_with_spark/test_read_constant_columns_optimization.py --- .../StorageObjectStorageSource.cpp | 5 + ...test_read_constant_columns_optimization.py | 289 ++++++++++++++++++ 2 files changed, 294 insertions(+) create mode 100644 tests/integration/test_storage_iceberg_with_spark/test_read_constant_columns_optimization.py diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index 5acfd7ef7a0c..e6bc7e431302 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -802,8 +802,13 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade logIcebergFileStats(object_info, log); InputFormatPtr input_format; +<<<<<<< HEAD if (context_->getSettingsRef()[Setting::use_parquet_metadata_cache] && (Poco::toLower(object_info->getFileFormat().value_or(configuration->format)) == "parquet") +======= + if (context_->getSettingsRef()[Setting::use_parquet_metadata_cache] && use_native_reader_v3 + && (Poco::toLower(object_info->getFileFormat().value_or(configuration->getFormat())) == "parquet") +>>>>>>> 7e5f7d649ab (Merge pull request #1751 from Altinity/feature/antalya-26.3/pr-1631) && !object_info->getObjectMetadata()->etag.empty()) { std::optional object_with_metadata = object_info->relative_path_with_metadata; diff --git a/tests/integration/test_storage_iceberg_with_spark/test_read_constant_columns_optimization.py b/tests/integration/test_storage_iceberg_with_spark/test_read_constant_columns_optimization.py new file mode 100644 index 000000000000..58ac641f0074 --- /dev/null +++ b/tests/integration/test_storage_iceberg_with_spark/test_read_constant_columns_optimization.py @@ -0,0 +1,289 @@ +import pytest + +from helpers.iceberg_utils import ( + get_uuid_str, + get_creation_expression, + execute_spark_query_general, +) + + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_read_constant_columns_optimization(started_cluster_iceberg_with_spark, storage_type, run_on_cluster): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_read_constant_columns_optimization_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + date DATE, + date2 DATE, + name VARCHAR(50), + number BIGINT + ) + USING iceberg + PARTITIONED BY (identity(tag), years(date)) + OPTIONS('format-version'='2') + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasya', 5), + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasilisa', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (2, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 5), + (2, DATE '2025-01-21', DATE '2025-01-20', 'vasilisa', 5) + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} ALTER COLUMN number FIRST; + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (5, 3, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa'), + (5, 3, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa') + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} RENAME COLUMN name TO name_old; + """ + ) + + execute_spark_query( + f""" + ALTER TABLE {TABLE_NAME} + ADD COLUMNS ( + name string + ); + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (5, 4, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 4, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 'iceberg'), + (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 5, DATE '2025-01-20', DATE '2024-01-20', 'vasilisa', 'icebreaker'), + (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg'), + (5, 6, DATE '2025-01-20', DATE '2024-01-20', 'vasya', 'iceberg') + """ + ) + + # Totally must be 7 files + # Partitioned column 'tag' is constant in each file + # Column 'date' is constant in 6 files, has different values in (2-2025) + # Column 'date2' is constant in 4 files (1-2024, 2-2025, 5-2025, 6-2025) + # Column 'name_old' is constant in 3 files (1-2025, 2-2025 as 'name', 6-2025 as 'name_old') + # Column 'number' is globally constant + # New column 'name2' is present only in 3 files (4-2025, 5-2025, 6-2025), constant in two (4-2025, 6-2025) + # Files 1-2025 and 6-2025 have only constant columns + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True, run_on_cluster=run_on_cluster + ) + + # Warm up metadata cache + for replica in started_cluster_iceberg_with_spark.instances.values(): + replica.query(f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0") + + all_data_expected_query_id = get_uuid_str() + all_data_expected = instance.query( + f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=all_data_expected_query_id, + ) + const_only_expected_query_id = get_uuid_str() + const_only_expected = instance.query( + f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_only_expected_query_id, + ) + const_partial_expected_query_id = get_uuid_str() + const_partial_expected = instance.query( + f"SELECT tag, date2, number, name_old FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_partial_expected_query_id, + ) + const_partial2_expected_query_id = get_uuid_str() + const_partial2_expected = instance.query( + f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=const_partial2_expected_query_id, + ) + count_expected_query_id = get_uuid_str() + count_expected = instance.query( + f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0", + query_id=count_expected_query_id, + ) + + all_data_query_id = get_uuid_str() + all_data_optimized = instance.query( + f"SELECT * FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=all_data_query_id, + ) + const_only_query_id = get_uuid_str() + const_only_optimized = instance.query( + f"SELECT tag, number FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_only_query_id, + ) + const_partial_query_id = get_uuid_str() + const_partial_optimized = instance.query( + f"SELECT tag, date2, number, name_old FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_partial_query_id, + ) + const_partial2_query_id = get_uuid_str() + const_partial2_optimized = instance.query( + f"SELECT tag, date2, number, name FROM {creation_expression} ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=const_partial2_query_id, + ) + count_query_id = get_uuid_str() + count_optimized = instance.query( + f"SELECT count(),tag FROM {creation_expression} GROUP BY ALL ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1", + query_id=count_query_id, + ) + + assert all_data_expected == all_data_optimized + assert const_only_expected == const_only_optimized + assert const_partial_expected == const_partial_optimized + assert const_partial2_expected == const_partial2_optimized + assert count_expected == count_optimized + + for replica in started_cluster_iceberg_with_spark.instances.values(): + replica.query("SYSTEM FLUSH LOGS") + + # Number of object-get requests per data file that are NOT served from caches + # after the warmup query above. The parquet metadata cache (enabled by default) + # caches the parquet footer keyed by the object's etag; the warmup query then + # populates it, so any subsequent read of the same file skips one object-get + # (the footer read). However, AzureObjectStorage::getObjectMetadata does NOT + # populate etag, so the cache guard `!etag.empty()` in + # StorageObjectStorageSource::createReader always fails for Azure, and the + # cache path is never taken there. As a result the multiplier is: + # S3: 2 (footer served from cache, data-only gets remain) + # Azure: 3 (cache never engaged, footer + data gets) + per_file_gets = 2 if storage_type == "s3" else 3 + + def check_events(query_id, event, is_cluster, expected): + res = instance.query( + f""" + SELECT + sum(tupleElement(arrayJoin(ProfileEvents),2)) as value + FROM + clusterAllReplicas('cluster_simple', system.query_log) + WHERE + type='QueryFinish' + AND tupleElement(arrayJoin(ProfileEvents),1)='{event}' + AND initial_query_id='{query_id}' + GROUP BY ALL + FORMAT CSV + """) + assert int(res) == expected + + # Each file contains one row group, so number of reded row groups is equal to readed data files + event = "ParquetReadRowGroups" + + # Without optimization clickhouse reads all 7 files + check_events(all_data_expected_query_id, event, run_on_cluster, 7) + check_events(const_only_expected_query_id, event, run_on_cluster, 7) + check_events(const_partial_expected_query_id, event, run_on_cluster, 7) + check_events(const_partial2_expected_query_id, event, run_on_cluster, 7) + check_events(count_expected_query_id, event, run_on_cluster, 7) + + # If file has only constant columns it is not read + check_events(all_data_query_id, event, run_on_cluster, 5) # 1-2025, 6-2025 must not be read + check_events(const_only_query_id, event, run_on_cluster, 0) # All must not be read + check_events(const_partial_query_id, event, run_on_cluster, 4) # 1-2025, 6-2025 and 2-2025 must not be read + check_events(const_partial2_query_id, event, run_on_cluster, 3) # 6-2025 must not be read, 1-2024, 1-2025, 2-2025 don't have new column 'name' + check_events(count_query_id, event, run_on_cluster, 0) # All must not be read + + def compare_selects(query): + result_expected = instance.query(f"{query} SETTINGS allow_experimental_iceberg_read_optimization=0") + result_optimized = instance.query(f"{query} SETTINGS allow_experimental_iceberg_read_optimization=1") + assert result_expected == result_optimized + + compare_selects(f"SELECT _path,* FROM {creation_expression} ORDER BY ALL") + compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE name_old='vasily' ORDER BY ALL") + compare_selects(f"SELECT _path,* FROM {creation_expression} WHERE ((tag + length(name_old)) % 2 = 1) ORDER BY ALL") + + +@pytest.mark.parametrize("storage_type", ["s3", "azure"]) +@pytest.mark.parametrize("run_on_cluster", [False, True]) +def test_read_constant_columns_optimization_view(started_cluster_iceberg_with_spark, storage_type, run_on_cluster): + instance = started_cluster_iceberg_with_spark.instances["node1"] + spark = started_cluster_iceberg_with_spark.spark_session + TABLE_NAME = "test_read_constant_columns_optimization_view_" + storage_type + "_" + get_uuid_str() + + def execute_spark_query(query: str): + return execute_spark_query_general( + spark, + started_cluster_iceberg_with_spark, + storage_type, + TABLE_NAME, + query, + ) + + execute_spark_query( + f""" + CREATE TABLE {TABLE_NAME} ( + tag INT, + date DATE, + date2 DATE, + name VARCHAR(50), + number BIGINT + ) + USING iceberg + PARTITIONED BY (identity(tag), years(date)) + OPTIONS('format-version'='2') + """ + ) + + execute_spark_query( + f""" + INSERT INTO {TABLE_NAME} VALUES + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasya', 5), + (1, DATE '2024-01-20', DATE '2024-01-20', 'vasilisa', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (1, DATE '2025-01-20', DATE '2025-01-20', 'vasya', 5), + (2, DATE '2025-01-20', DATE '2025-01-20', 'vasilisa', 5), + (2, DATE '2025-01-21', DATE '2025-01-20', 'vasilisa', 5) + """ + ) + + creation_expression = get_creation_expression( + storage_type, TABLE_NAME, started_cluster_iceberg_with_spark, table_function=True, run_on_cluster=run_on_cluster + ) + + # Check that view over Iceberg table works + instance.query(f"CREATE VIEW {TABLE_NAME}_view AS SELECT * FROM {creation_expression}") + + expected = instance.query(f"SELECT * FROM {TABLE_NAME}_view ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=0") + # All data + optimized = instance.query(f"SELECT * FROM {TABLE_NAME}_view ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1") + assert expected == optimized + # Constant column in where + optimized = instance.query(f"SELECT * FROM {TABLE_NAME}_view WHERE number=5 ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1") + assert expected == optimized + # Partition columns in where + optimized = instance.query(f"SELECT * FROM {TABLE_NAME}_view WHERE tag>0 AND date>'2020-01-01' ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1") + assert expected == optimized + # Non-constant column in where + optimized = instance.query(f"SELECT * FROM {TABLE_NAME}_view WHERE date2!='2020-01-01' ORDER BY ALL SETTINGS allow_experimental_iceberg_read_optimization=1") + assert expected == optimized From a1bb8f1ef390b38dcb80e2506236dfe1961073c8 Mon Sep 17 00:00:00 2001 From: Andrey Zvonov <32552679+zvonand@users.noreply.github.com> Date: Thu, 25 Jun 2026 02:10:09 +0200 Subject: [PATCH 2/2] Resolve conflicts in cherry-pick of #1751 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Kept "ours" (antalya-26.5) side of the conflict in `StorageObjectStorageSource.cpp`. The actual fix from #1751 — wrapping the format comparison with `Poco::toLower` and using lowercase `"parquet"` — was already present on antalya-26.5. The "theirs" side additionally included `&& use_native_reader_v3`, which is not part of #1751's diff (it was pre-existing context on antalya-26.3) and does not exist on antalya-26.5, so it was not introduced. `configuration->format` (field access on antalya-26.5) is kept instead of `configuration->getFormat()` (method used on antalya-26.3). --- src/Storages/ObjectStorage/StorageObjectStorageSource.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp index e6bc7e431302..5acfd7ef7a0c 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageSource.cpp @@ -802,13 +802,8 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade logIcebergFileStats(object_info, log); InputFormatPtr input_format; -<<<<<<< HEAD if (context_->getSettingsRef()[Setting::use_parquet_metadata_cache] && (Poco::toLower(object_info->getFileFormat().value_or(configuration->format)) == "parquet") -======= - if (context_->getSettingsRef()[Setting::use_parquet_metadata_cache] && use_native_reader_v3 - && (Poco::toLower(object_info->getFileFormat().value_or(configuration->getFormat())) == "parquet") ->>>>>>> 7e5f7d649ab (Merge pull request #1751 from Altinity/feature/antalya-26.3/pr-1631) && !object_info->getObjectMetadata()->etag.empty()) { std::optional object_with_metadata = object_info->relative_path_with_metadata;