Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,12 @@ parquet_converter:
# CLI flag: -parquet-converter.max-rows-per-row-group
[max_rows_per_row_group: <int> | default = 1000000]

# Maximum number of columns per Parquet file. When exceeded, conversion will
# automatically shard the data into multiple files. 0 uses the library default
# (32767).
# CLI flag: -parquet-converter.max-num-columns
[max_num_columns: <int> | default = 0]

# Enable disk-based write buffering to reduce memory consumption during
# parquet file generation.
# CLI flag: -parquet-converter.file-buffer-enabled
Expand Down
3 changes: 3 additions & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,6 @@ Currently experimental features are:
- `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h)
- `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size
- `-ingester.head-queried-series-metrics-sample-rate` query sampling rate
- Parquet Converter: Maximum number of columns per file
- `-parquet-converter.max-num-columns` (int) CLI flag
- Automatically shards parquet files when the number of columns exceeds the configured limit
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.26.4
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94
github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373
github.com/prometheus/client_golang/exp v0.0.0-20251212205219-7ba246a648ca
github.com/prometheus/procfs v0.16.1
github.com/sercand/kuberesolver/v5 v5.1.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -863,8 +863,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8=
github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373 h1:IkWKd4KK4KOhjjj6SnEyenQ4eNW0N3IPQGAw4GOZxEY=
github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY=
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
github.com/prometheus/alertmanager v0.32.1 h1:BQ3jHXNq2A7VSD9Kh0Qx+kXbifNbHSDuKVbMmdRHHJ0=
Expand Down
6 changes: 6 additions & 0 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
MaxNumColumns int `yaml:"max_num_columns"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`

DataDir string `yaml:"data_dir"`
Expand Down Expand Up @@ -108,6 +109,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.")
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.")
f.IntVar(&cfg.MaxNumColumns, "parquet-converter.max-num-columns", 0, "Maximum number of columns per Parquet file. When exceeded, conversion will automatically shard the data into multiple files. 0 uses the library default (32767).")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.")
}

Expand Down Expand Up @@ -144,6 +146,10 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
},
}

if cfg.MaxNumColumns > 0 {
c.baseConverterOptions = append(c.baseConverterOptions, convert.WithMaxNumColumns(cfg.MaxNumColumns))
}

c.Service = services.NewBasicService(c.starting, c.running, c.stopping)
return c
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"math/rand"
"path"
"strings"
Expand All @@ -15,6 +16,7 @@ import (

"github.com/go-kit/log"
"github.com/oklog/ulid/v2"
"github.com/prometheus-community/parquet-common/convert"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -488,3 +490,56 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) {
// It should be 0 since the block was already converted
assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user)))
}

// TestConvertWithMaxNumColumns verifies how the column limit passed through
// convert.WithMaxNumColumns affects the number of shards reported by
// convert.ConvertTSDBBlock: a limit of 20 must yield more than one shard for
// the generated data, while a limit of 10000 must yield exactly one.
func TestConvertWithMaxNumColumns(t *testing.T) {
	ctx := context.Background()

	tsdbOpts := &tsdb.Options{
		RetentionDuration: int64(24 * time.Hour / time.Millisecond),
		NoLockfile:        true,
	}
	db, err := tsdb.Open(t.TempDir(), nil, nil, tsdbOpts, nil)
	require.NoError(t, err)
	t.Cleanup(func() { _ = db.Close() })

	// Write 10 series, each carrying 5 label names that no other series
	// shares, so the block holds 50 distinct label names besides the metric
	// name. This is what pushes the conversion past the low column limit.
	const (
		seriesCount     = 10
		labelsPerSeries = 5
	)
	appender := db.Appender(ctx)
	for seriesIdx := 0; seriesIdx < seriesCount; seriesIdx++ {
		b := labels.NewBuilder(labels.EmptyLabels())
		b.Set(labels.MetricName, fmt.Sprintf("metric_%d", seriesIdx))
		for labelIdx := 0; labelIdx < labelsPerSeries; labelIdx++ {
			name := fmt.Sprintf("label_%d_%d", seriesIdx, labelIdx)
			value := fmt.Sprintf("value_%d", labelIdx)
			b.Set(name, value)
		}
		_, appendErr := appender.Append(0, b.Labels(), int64(seriesIdx)*1000, float64(seriesIdx))
		require.NoError(t, appendErr)
	}
	require.NoError(t, appender.Commit())

	head := db.Head()

	// convertWithLimit converts the head into a fresh filesystem bucket using
	// the given column limit and returns the shard count reported by the
	// converter. Each call gets its own bucket so the runs do not share output.
	convertWithLimit := func(maxColumns int) int {
		bucket, bktErr := filesystem.NewBucket(t.TempDir())
		require.NoError(t, bktErr)
		t.Cleanup(func() { _ = bucket.Close() })

		numShards, convErr := convert.ConvertTSDBBlock(
			ctx, bucket, head.MinTime(), head.MaxTime(),
			[]convert.Convertible{head},
			slog.Default(),
			convert.WithMaxNumColumns(maxColumns),
		)
		require.NoError(t, convErr)
		return numShards
	}

	// A low limit forces the data to be split across several shards.
	lowLimitShards := convertWithLimit(20)
	require.Greater(t, lowLimitShards, 1, "expected multiple shards with low column limit")

	// A generous limit lets everything fit into one shard.
	highLimitShards := convertWithLimit(10000)
	require.Equal(t, 1, highLimitShards, "expected single shard with high column limit")
}
6 changes: 6 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -9420,6 +9420,12 @@
"type": "boolean",
"x-cli-flag": "parquet-converter.file-buffer-enabled"
},
"max_num_columns": {
"default": 0,
"description": "Maximum number of columns per Parquet file. When exceeded, conversion will automatically shard the data into multiple files. 0 uses the library default (32767).",
"type": "number",
"x-cli-flag": "parquet-converter.max-num-columns"
},
"max_rows_per_row_group": {
"default": 1000000,
"description": "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.",
Expand Down
Loading
Loading