From bf79b91bc77482865c6d287097b50b1a5ac3c712 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 16 Jun 2026 00:12:31 +0000 Subject: [PATCH] feat(parquetconverter): Add max-num-columns config for automatic parquet sharding Bump parquet-common to include PR #131 which adds automatic parquet file sharding based on a configurable column limit. The parquet-go library has a hard limit of 32767 columns; this feature allows users to set a lower limit and automatically shard into multiple files when the number of unique label names would exceed it. Changes: - Bump parquet-common to 5f32460b5373 (merged column sharding PR) - Add -parquet-converter.max-num-columns flag (default 0 = library default) - Add MaxNumColumns to Config struct and wire into convert options - Add TestConvertWithMaxNumColumns integration test - Update v1-guarantees.md with experimental feature entry - Regenerate docs and JSON schema Signed-off-by: Ben Ye --- docs/configuration/config-file-reference.md | 6 + docs/configuration/v1-guarantees.md | 3 + go.mod | 2 +- go.sum | 4 +- pkg/parquetconverter/converter.go | 6 + pkg/parquetconverter/converter_test.go | 55 ++++ schemas/cortex-config-schema.json | 6 + .../parquet-common/convert/convert.go | 276 +++++++++++++----- vendor/modules.txt | 2 +- 9 files changed, 276 insertions(+), 84 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2da366f7358..26a1fc7b424 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -186,6 +186,12 @@ parquet_converter: # CLI flag: -parquet-converter.max-rows-per-row-group [max_rows_per_row_group: | default = 1000000] + # Maximum number of columns per Parquet file. When exceeded, conversion will + # automatically shard the data into multiple files. 0 uses the library default + # (32767). 
+ # CLI flag: -parquet-converter.max-num-columns + [max_num_columns: | default = 0] + # Enable disk-based write buffering to reduce memory consumption during # parquet file generation. # CLI flag: -parquet-converter.file-buffer-enabled diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 52b1f884612..e1ccfa5451e 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -150,3 +150,6 @@ Currently experimental features are: - `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h) - `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size - `-ingester.head-queried-series-metrics-sample-rate` query sampling rate +- Parquet Converter: Maximum number of columns per file + - `-parquet-converter.max-num-columns` (int) CLI flag + - Automatically shards parquet files when the number of columns exceeds the configured limit diff --git a/go.mod b/go.mod index ff195426da8..2e2156e3363 100644 --- a/go.mod +++ b/go.mod @@ -89,7 +89,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.26.4 - github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 + github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373 github.com/prometheus/client_golang/exp v0.0.0-20251212205219-7ba246a648ca github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 diff --git a/go.sum b/go.sum index 725e2bbbf99..0ae952e5fdc 100644 --- a/go.sum +++ b/go.sum @@ -863,8 +863,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= 
-github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 h1:6WmPxbqGMjBKLOZvurIZR5eEBF0Rd0t1oQ06PMWaHe8= -github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= +github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373 h1:IkWKd4KK4KOhjjj6SnEyenQ4eNW0N3IPQGAw4GOZxEY= +github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373/go.mod h1:gewN7ZuOXJh0X2I57iGHyDLbLvL891P2Ynko2QM5axY= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.32.1 h1:BQ3jHXNq2A7VSD9Kh0Qx+kXbifNbHSDuKVbMmdRHHJ0= diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index a5ed6ce0c05..a4504624380 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -57,6 +57,7 @@ type Config struct { MetaSyncConcurrency int `yaml:"meta_sync_concurrency"` ConversionInterval time.Duration `yaml:"conversion_interval"` MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"` + MaxNumColumns int `yaml:"max_num_columns"` FileBufferEnabled bool `yaml:"file_buffer_enabled"` DataDir string `yaml:"data_dir"` @@ -108,6 +109,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Maximum concurrent goroutines for downloading block metadata from object storage.") f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Maximum number of time series per parquet row group. 
Larger values improve compression but may reduce performance during reads.") f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "How often to check for new TSDB blocks to convert to parquet format.") + f.IntVar(&cfg.MaxNumColumns, "parquet-converter.max-num-columns", 0, "Maximum number of columns per Parquet file. When exceeded, conversion will automatically shard the data into multiple files. 0 uses the library default (32767).") f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Enable disk-based write buffering to reduce memory consumption during parquet file generation.") } @@ -144,6 +146,10 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex }, } + if cfg.MaxNumColumns > 0 { + c.baseConverterOptions = append(c.baseConverterOptions, convert.WithMaxNumColumns(cfg.MaxNumColumns)) + } + c.Service = services.NewBasicService(c.starting, c.running, c.stopping) return c } diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index bdcf46b3d36..98e92b911b1 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "log/slog" "math/rand" "path" "strings" @@ -15,6 +16,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid/v2" + "github.com/prometheus-community/parquet-common/convert" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" @@ -488,3 +490,56 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) { // It should be 0 since the block was already converted assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) } + +func TestConvertWithMaxNumColumns(t *testing.T) { + ctx := context.Background() + dbDir := t.TempDir() + db, err := tsdb.Open(dbDir, nil, nil, &tsdb.Options{ + 
RetentionDuration: int64(24 * time.Hour / time.Millisecond), + NoLockfile: true, + }, nil) + require.NoError(t, err) + t.Cleanup(func() { _ = db.Close() }) + + // Create series with many unique label names to exceed column limit + app := db.Appender(ctx) + for i := range 10 { + lblBuilder := labels.NewBuilder(labels.EmptyLabels()) + lblBuilder.Set(labels.MetricName, fmt.Sprintf("metric_%d", i)) + for j := range 5 { + lblBuilder.Set(fmt.Sprintf("label_%d_%d", i, j), fmt.Sprintf("value_%d", j)) + } + _, err := app.Append(0, lblBuilder.Labels(), int64(i)*1000, float64(i)) + require.NoError(t, err) + } + require.NoError(t, app.Commit()) + + head := db.Head() + bkt, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + t.Cleanup(func() { _ = bkt.Close() }) + + // With low column limit, should produce multiple shards + shards, err := convert.ConvertTSDBBlock( + ctx, bkt, head.MinTime(), head.MaxTime(), + []convert.Convertible{head}, + slog.Default(), + convert.WithMaxNumColumns(20), + ) + require.NoError(t, err) + require.Greater(t, shards, 1, "expected multiple shards with low column limit") + + // With high column limit, should produce a single shard + bkt2, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + t.Cleanup(func() { _ = bkt2.Close() }) + + shards2, err := convert.ConvertTSDBBlock( + ctx, bkt2, head.MinTime(), head.MaxTime(), + []convert.Convertible{head}, + slog.Default(), + convert.WithMaxNumColumns(10000), + ) + require.NoError(t, err) + require.Equal(t, 1, shards2, "expected single shard with high column limit") +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index a8c501c75f2..673390a15ad 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -9420,6 +9420,12 @@ "type": "boolean", "x-cli-flag": "parquet-converter.file-buffer-enabled" }, + "max_num_columns": { + "default": 0, + "description": "Maximum number of columns per Parquet file. 
When exceeded, conversion will automatically shard the data into multiple files. 0 uses the library default (32767).", + "type": "number", + "x-cli-flag": "parquet-converter.max-num-columns" + }, "max_rows_per_row_group": { "default": 1000000, "description": "Maximum number of time series per parquet row group. Larger values improve compression but may reduce performance during reads.", diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 3780b4c3909..969c355c2df 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -39,6 +39,9 @@ import ( "github.com/prometheus-community/parquet-common/schema" ) +// 2 system columns: s_col_indexes and s_series_hash +const systemColumns = 2 + var DefaultConvertOpts = convertOpts{ name: "block", rowGroupSize: 1e6, @@ -52,6 +55,7 @@ var DefaultConvertOpts = convertOpts{ readConcurrency: runtime.GOMAXPROCS(0), writeConcurrency: 1, maxSamplesPerChunk: tsdb.DefaultSamplesPerChunk, + maxNumColumns: parquet.MaxColumnIndex, // max column index supported by parquet-go } type Convertible interface { @@ -74,6 +78,7 @@ type convertOpts struct { readConcurrency int writeConcurrency int maxSamplesPerChunk int + maxNumColumns int labelsCompressionOpts []schema.CompressionOpts chunksCompressionOpts []schema.CompressionOpts } @@ -277,6 +282,27 @@ func WithMaxSamplesPerChunk(samplesPerChunk int) ConvertOption { } } +// WithMaxNumColumns sets the maximum number of columns allowed in a Parquet file. +// Parquet-go library has a limit of max column index supported. When this limit is exceeded, +// the conversion will automatically shard the data into multiple files. This option allows +// users to control the number of columns in the converted parquet file. 
+// +// The limit includes both label columns (one per unique label name) and 2 system columns +// (s_col_indexes and s_series_hash). For example, if maxColumns is 1000, then at most +// 998 unique label names can be included in a single shard. +// +// Parameters: +// - maxColumns: Maximum number of columns per Parquet file, including system columns +// +// Example: +// +// WithMaxNumColumns(1000) // Use a smaller limit for testing +func WithMaxNumColumns(maxColumns int) ConvertOption { + return func(opts *convertOpts) { + opts.maxNumColumns = maxColumns + } +} + func WithColumnPageBuffers(buffers parquet.BufferPool) ConvertOption { return func(opts *convertOpts) { opts.columnPageBuffers = buffers @@ -359,17 +385,16 @@ func ConvertTSDBBlock( } var ( - rr *TSDBRowReader shardedRowReaders []*TSDBRowReader err error ) // If numRowGroups is not specified, we can use a single row reader. + // However, we may still need to shard if column limits are exceeded. if cfg.numRowGroups == math.MaxInt32 { - rr, err = singleTSDBRowReader(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) + shardedRowReaders, err = singleTSDBRowReader(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) if err != nil { return 0, errors.Wrap(err, "failed to create TSDB row readers") } - shardedRowReaders = []*TSDBRowReader{rr} } else { logger.Info("sharding input series") shardedRowReaders, err = shardedTSDBRowReaders(ctx, mint, maxt, cfg.colDuration.Milliseconds(), blocks, cfg) @@ -439,21 +464,76 @@ type blockSeries struct { labels labels.Labels } -// singleTSDBRowReader is a shortcut when we know there is only one final shard. -// This can happen when numRowGroups is not specified. +// singleTSDBRowReader creates row readers when numRowGroups is not specified. +// It may create multiple shards if the column limit would be exceeded. 
func singleTSDBRowReader( ctx context.Context, mint, maxt, colDuration int64, blocks []Convertible, opts convertOpts, -) (*TSDBRowReader, error) { +) ([]*TSDBRowReader, error) { + // First, check if we need to shard based on column limits by getting all unique label names. + // This is cheaper than fully initializing indexReaders and postings, so we do this first. + allLabelNames := make(map[string]struct{}) + for _, blk := range blocks { + indexr, err := blk.Index() + if err != nil { + return nil, errors.Wrap(err, "failed to get label names from index reader") + } + lblns, err := indexr.LabelNames(ctx) + if err != nil { + _ = indexr.Close() + return nil, errors.Wrap(err, "failed to get label names from index reader") + } + for _, lbln := range lblns { + allLabelNames[lbln] = struct{}{} + } + _ = indexr.Close() + } + + // If total unique label names exceed the limit, we need to shard based only on column limits. + // Equality is allowed (exactly maxNumColumns columns is fine). + if len(allLabelNames)+systemColumns > opts.maxNumColumns { + indexReaders := make([]blockIndexReader, len(blocks)) + defer func() { + for _, indexReader := range indexReaders { + if indexReader.reader != nil { + _ = indexReader.reader.Close() + } + } + }() + for i, blk := range blocks { + indexReader, err := blk.Index() + if err != nil { + return nil, errors.Wrap(err, "failed to get index reader from block") + } + indexReaders[i] = blockIndexReader{ + blockID: blk.Meta().ULID, + idx: i, + reader: indexReader, + postings: tsdb.AllSortedPostings(ctx, indexReader), + } + } + + // Use shardSeries to shard based only on column limits (no row group limits). 
+ uniqueSeriesCount, shardedSeries, err := shardSeries(indexReaders, mint, maxt, opts) + if err != nil { + return nil, errors.Wrap(err, "failed to determine unique series count") + } + if uniqueSeriesCount == 0 { + return nil, errors.New("no series found in the specified time range") + } + + // Create row readers from sharded series (same logic as shardedTSDBRowReaders). + return createShardedRowReaders(ctx, mint, maxt, colDuration, blocks, shardedSeries, opts) + } + + // No sharding needed - create a single row reader (original behavior) var ( seriesSets = make([]storage.ChunkSeriesSet, 0, len(blocks)) closers = make([]io.Closer, 0, len(blocks)) ok = false ) - // If we fail to build the row reader, make sure we release resources. - // This could be either a controlled error or a panic. defer func() { if !ok { for i := range closers { @@ -470,7 +550,6 @@ func singleTSDBRowReader( return c } } - return labels.Compare(a, b) } @@ -510,45 +589,18 @@ func singleTSDBRowReader( return nil, fmt.Errorf("unable to build schema reader from block: %w", err) } ok = true - return newTSDBRowReader(ctx, closers, cseriesSet, s, opts), nil + return []*TSDBRowReader{newTSDBRowReader(ctx, closers, cseriesSet, s, opts)}, nil } -func shardedTSDBRowReaders( +// createShardedRowReaders creates TSDBRowReader instances from sharded series. +// This is a shared helper used by both singleTSDBRowReader and shardedTSDBRowReaders. +func createShardedRowReaders( ctx context.Context, mint, maxt, colDuration int64, blocks []Convertible, + shardedSeries []map[int][]blockSeries, opts convertOpts, ) ([]*TSDBRowReader, error) { - // Blocks can have multiple entries with the same of ULID in the case of head blocks; - // track all blocks by their index in the input slice rather than assuming unique ULIDs. - indexReaders := make([]blockIndexReader, len(blocks)) - // Simpler to track and close these readers separate from those used by shard conversion reader/writers. 
- defer func() { - for _, indexReader := range indexReaders { - _ = indexReader.reader.Close() - } - }() - for i, blk := range blocks { - indexReader, err := blk.Index() - if err != nil { - return nil, errors.Wrap(err, "failed to get index reader from block") - } - indexReaders[i] = blockIndexReader{ - blockID: blk.Meta().ULID, - idx: i, - reader: indexReader, - postings: tsdb.AllSortedPostings(ctx, indexReader), - } - } - - uniqueSeriesCount, shardedSeries, err := shardSeries(indexReaders, mint, maxt, opts) - if err != nil { - return nil, errors.Wrap(err, "failed to determine unique series count") - } - if uniqueSeriesCount == 0 { - return nil, errors.Wrap(err, "no series found in the specified time range") - } - shardTSDBRowReaders := make([]*TSDBRowReader, len(shardedSeries)) // We close everything if any errors or panic occur @@ -579,7 +631,7 @@ func shardedTSDBRowReaders( blk := blocks[blockSeries[0].blockIdx] // Init all readers for block & add to closers - // Init separate index readers from above indexReaders to simplify closing logic + // Init separate index readers to simplify closing logic indexr, err := blk.Index() if err != nil { return nil, errors.Wrap(err, "failed to get index reader from block") @@ -630,6 +682,45 @@ func shardedTSDBRowReaders( return shardTSDBRowReaders, nil } +func shardedTSDBRowReaders( + ctx context.Context, + mint, maxt, colDuration int64, + blocks []Convertible, + opts convertOpts, +) ([]*TSDBRowReader, error) { + // Blocks can have multiple entries with the same of ULID in the case of head blocks; + // track all blocks by their index in the input slice rather than assuming unique ULIDs. + indexReaders := make([]blockIndexReader, len(blocks)) + // Simpler to track and close these readers separate from those used by shard conversion reader/writers. 
+ defer func() { + for _, indexReader := range indexReaders { + _ = indexReader.reader.Close() + } + }() + for i, blk := range blocks { + indexReader, err := blk.Index() + if err != nil { + return nil, errors.Wrap(err, "failed to get index reader from block") + } + indexReaders[i] = blockIndexReader{ + blockID: blk.Meta().ULID, + idx: i, + reader: indexReader, + postings: tsdb.AllSortedPostings(ctx, indexReader), + } + } + + uniqueSeriesCount, shardedSeries, err := shardSeries(indexReaders, mint, maxt, opts) + if err != nil { + return nil, errors.Wrap(err, "failed to determine unique series count") + } + if uniqueSeriesCount == 0 { + return nil, errors.New("no series found in the specified time range") // err is nil here; errors.Wrap(nil, ...) would return a nil error + } + + return createShardedRowReaders(ctx, mint, maxt, colDuration, blocks, shardedSeries, opts) +} + func shardSeries( blockIndexReaders []blockIndexReader, mint, maxt int64, @@ -681,50 +772,75 @@ func shardSeries( } } - // Divide rows evenly across shards to avoid one small shard at the end; - // Use (a + b - 1) / b equivalence to math.Ceil(a / b) - // so integer division does not cut off the remainder series and to avoid floating point issues. - totalShards := (uniqueSeriesCount + (opts.numRowGroups * opts.rowGroupSize) - 1) / (opts.numRowGroups * opts.rowGroupSize) - rowsPerShard := (uniqueSeriesCount + totalShards - 1) / totalShards + // Calculate row-based sharding only if numRowGroups is set (not MaxInt32) + var targetTotalShards int + var rowsPerShard int + if opts.numRowGroups != math.MaxInt32 { + // Divide rows evenly across shards to avoid one small shard at the end; + // Use (a + b - 1) / b equivalence to math.Ceil(a / b) + // so integer division does not cut off the remainder series and to avoid floating point issues.
+ targetTotalShards = (uniqueSeriesCount + (opts.numRowGroups * opts.rowGroupSize) - 1) / (opts.numRowGroups * opts.rowGroupSize) + rowsPerShard = (uniqueSeriesCount + targetTotalShards - 1) / targetTotalShards + } // For each shard index i, shardSeries[i] is a map of blockIdx -> []series. - shardSeries := make([]map[int][]blockSeries, totalShards) - for i := range shardSeries { - shardSeries[i] = make(map[int][]blockSeries) - } - - shardIdx, allSeriesIdx := 0, 0 - for shardIdx < totalShards { - seriesToShard := allSeries[allSeriesIdx:] - - i, uniqueCount := 0, 0 - matchLabels := labels.Labels{} - for i < len(seriesToShard) { - current := seriesToShard[i] - if labels.Compare(current.labels, matchLabels) != 0 { - // New unique series - - if uniqueCount >= rowsPerShard { - // Stop before adding current series if it would exceed the unique series count for the shard. - // Do not increment, we will start the next shard with this series. - break + // Start with one shard and dynamically create more as needed to avoid column limit. + // If row-based sharding is enabled, pre-allocate capacity for expected shards. + initialCapacity := 1 + if opts.numRowGroups != math.MaxInt32 && targetTotalShards > 1 { + initialCapacity = targetTotalShards + } + shardSeries := make([]map[int][]blockSeries, 1, initialCapacity) + shardSeries[0] = make(map[int][]blockSeries) + + shardIdx, uniqueCount := 0, 0 + matchLabels := labels.Labels{} + labelColumns := make(map[string]struct{}) + for _, series := range allSeries { + if labels.Compare(series.labels, matchLabels) != 0 { + // New unique series + // Count how many new label names this series would add + newLabelCount := 0 + series.labels.Range(func(label labels.Label) { + if _, exists := labelColumns[label.Name]; !exists { + newLabelCount++ } + }) + + // Create a new shard if: + // 1. Row-based sharding is enabled AND the row limit is reached, OR + // 2. 
Adding this series to a non-empty shard would exceed the column limit (equality is allowed); an empty shard always accepts the series so no empty shard is emitted + shouldCreateNewShard := false + if opts.numRowGroups != math.MaxInt32 && uniqueCount >= rowsPerShard { + shouldCreateNewShard = true + } + if uniqueCount > 0 && len(labelColumns)+newLabelCount+systemColumns > opts.maxNumColumns { + shouldCreateNewShard = true + } - // Unique series limit is not hit yet for the shard; add the series. - shardSeries[shardIdx][current.blockIdx] = append(shardSeries[shardIdx][current.blockIdx], current) - // Increment unique count, update labels to compare against, and move on to next series - uniqueCount++ - matchLabels = current.labels - i++ - } else { - // Same labelset as previous series, add it to the shard but do not increment unique count - shardSeries[shardIdx][current.blockIdx] = append(shardSeries[shardIdx][current.blockIdx], current) - // Move on to next series - i++ + if shouldCreateNewShard { + // Create a new shard and start with this series + shardIdx++ + shardSeries = append(shardSeries, make(map[int][]blockSeries)) + labelColumns = make(map[string]struct{}) + uniqueCount = 0 } - allSeriesIdx++ + + // Track unique label names for this shard (after potentially creating a new shard) + series.labels.Range(func(label labels.Label) { + labelColumns[label.Name] = struct{}{} + }) + + // Add the series to the current (possibly newly created) shard.
+ shardSeries[shardIdx][series.blockIdx] = append(shardSeries[shardIdx][series.blockIdx], series) + // Increment unique count, update labels to compare against, and move on to next series + uniqueCount++ + matchLabels = series.labels + } else { + // Same labelset as previous series, add it to the shard but do not increment unique count + shardSeries[shardIdx][series.blockIdx] = append(shardSeries[shardIdx][series.blockIdx], series) + // Move on to next series } - shardIdx++ } return uniqueSeriesCount, shardSeries, nil diff --git a/vendor/modules.txt b/vendor/modules.txt index af452941bcd..8996e359fdf 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -967,7 +967,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20251211092633-65ebeae24e94 +# github.com/prometheus-community/parquet-common v0.0.0-20260614025832-5f32460b5373 ## explicit; go 1.24.0 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable