diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e7cf316a53..c979a414488 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462 * [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489 * [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518 +* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. #7625 * [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455 * [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478 * [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420 diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 2da366f7358..e64a47ead8c 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -4779,6 +4779,12 @@ query_rejection: # CLI flag: -parquet-converter.sort-columns [parquet_converter_sort_columns: <list of string> | default = []] +# [Experimental] Maximum number of distinct label names allowed in a TSDB block +# for parquet conversion. If exceeded, the converter writes a no-convert marker. +# 0 to disable. +# CLI flag: -parquet-converter.max-block-label-names +[parquet_converter_max_block_label_names: <int> | default = 0] + # S3 server-side encryption type. 
Required to enable server-side encryption # overrides for a specific tenant. If not set, the default S3 client settings # are used. diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 52b1f884612..2b6d91cf6e8 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -150,3 +150,4 @@ Currently experimental features are: - `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h) - `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size - `-ingester.head-queried-series-metrics-sample-rate` query sampling rate +- Parquet Converter: `-parquet-converter.max-block-label-names` (int) - If enabled, adds a no-convert mark and skips blocks with too many labels. diff --git a/integration/parquet_converter_test.go b/integration/parquet_converter_test.go new file mode 100644 index 00000000000..4a10d3f1e0f --- /dev/null +++ b/integration/parquet_converter_test.go @@ -0,0 +1,142 @@ +//go:build integration + +package integration + +import ( + "context" + "fmt" + "math/rand" + "path/filepath" + "testing" + "time" + + "github.com/prometheus/prometheus/model/labels" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + + "github.com/cortexproject/cortex/integration/e2e" + e2ecache "github.com/cortexproject/cortex/integration/e2e/cache" + e2edb "github.com/cortexproject/cortex/integration/e2e/db" + "github.com/cortexproject/cortex/integration/e2ecortex" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/log" + cortex_testutil "github.com/cortexproject/cortex/pkg/util/test" +) + +func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + consul := 
e2edb.NewConsulWithName("consul") + memcached := e2ecache.NewMemcached() + require.NoError(t, s.StartAndWaitReady(consul, memcached)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-target": "all,parquet-converter", + "-blocks-storage.tsdb.block-ranges-period": "1m,24h", + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s", + "-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s", + "-blocks-storage.bucket-store.bucket-index.enabled": "true", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + // compactor + "-compactor.cleanup-interval": "1s", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // Enable vertical sharding. 
+ "-frontend.query-vertical-shard-size": "3", + "-frontend.max-cache-freshness": "1m", + // enable experimental promQL funcs + "-querier.enable-promql-experimental-functions": "true", + // parquet-converter + "-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-parquet-converter.conversion-interval": "1s", + "-parquet-converter.enabled": "true", + "-parquet-converter.max-block-label-names": "1", + // Querier + "-querier.enable-parquet-queryable": "true", + // Enable cache for parquet labels and chunks + "-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + "-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached", + "-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort), + }, + ) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + lbls := []labels.Labels{ + labels.FromStrings("__name__", "test_series_a", "job", "test"), + } + + numSamples := 60 + scrapeInterval := time.Minute + now := time.Now() + start := now.Add(-time.Hour * 24) + end := now.Add(-time.Hour) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, + start.UnixMilli(), + end.UnixMilli(), + 
scrapeInterval.Milliseconds(), 10, + ) + require.NoError(t, err) + + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait for the converter to write the no-convert marker + cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { + noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String()) + found := false + err := bkt.Iter(ctx, "", func(name string) error { + if name == noConvertMarkerPath { + found = true + } + return nil + }, objstore.WithRecursiveIter()) + require.NoError(t, err) + return found + }) + + // confirm the conversion did not happen (check both paths) + blockID := id.String() + markerPaths := []string{ + fmt.Sprintf("%s/parquet-converter-mark.json", blockID), + fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID), + } + for _, markerPath := range markerPaths { + exists, err := bkt.Exists(ctx, markerPath) + require.NoError(t, err) + require.False(t, exists, "converter mark should not exist at %s", markerPath) + } +} diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index a5ed6ce0c05..39b18d725f4 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -49,6 +49,9 @@ const ( ringKey = "parquet-converter" converterMetaPrefix = "converter-meta-" + + parquetConverterDataColumnDuration = time.Hour * 8 + parquetConverterSystemColumnCount = 2 // s_col_indexes and s_series_hash. 
) var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil) @@ -139,7 +142,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex metrics: newMetrics(registerer), bkt: bkt, baseConverterOptions: []convert.ConvertOption{ - convert.WithColDuration(time.Hour * 8), + convert.WithColDuration(parquetConverterDataColumnDuration), convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup), }, } @@ -396,6 +399,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } + configuredMaxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID) + maxBlockLabelNames := effectiveMaxBlockLabelNames(configuredMaxBlockLabelNames, b.MinTime, b.MaxTime) + + noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger) + if err != nil { + level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err) + continue + } + if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) { + level.Debug(logger).Log("msg", "skipping block, no-convert marker already exists", "block", b.ULID.String()) + c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonMarkerExists).Inc() + continue + } + if err := os.RemoveAll(c.compactRootDir()); err != nil { level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err) if c.checkConvertError(userID, err) { @@ -425,6 +442,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin continue } + if configuredMaxBlockLabelNames > 0 { + labelNames, err := tsdbBlock.LabelNames(ctx) + if err != nil { + _ = tsdbBlock.Close() + level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err) + if c.checkConvertError(userID, err) { + return err + } + continue + } + labelNamesCount := len(labelNames) + if labelNamesCount > maxBlockLabelNames { + if err := 
cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil { + _ = tsdbBlock.Close() + level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err) + if c.checkConvertError(userID, err) { + return err + } + continue + } + level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames) + c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc() + _ = tsdbBlock.Close() + continue + } + } + level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir) start := time.Now() @@ -486,6 +530,25 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin return nil } +func effectiveMaxBlockLabelNames(configuredMaxBlockLabelNames int, mint, maxt int64) int { + if configuredMaxBlockLabelNames <= 0 { + return configuredMaxBlockLabelNames + } + + dataColumnCount := 0 + if maxt >= mint { + dataColumnCount = int((maxt-mint)/parquetConverterDataColumnDuration.Milliseconds()) + 1 + } + + // Reserve for s_col_indexes, s_series_hash, and generated s_data_* columns. 
+ maxBlockLabelNames := max(parquet.MaxColumnIndex-parquetConverterSystemColumnCount-dataColumnCount, 0) + + if configuredMaxBlockLabelNames > maxBlockLabelNames { + return maxBlockLabelNames + } + return configuredMaxBlockLabelNames +} + func (c *Converter) checkConvertError(userID string, err error) (terminate bool) { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || c.isCausedByPermissionDenied(err) { terminate = true diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index bdcf46b3d36..985ee3a4fcd 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -15,6 +15,7 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid/v2" + parquetgo "github.com/parquet-go/parquet-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" @@ -435,6 +436,7 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) limits.ParquetConverterEnabled = true + limits.ParquetConverterMaxBlockLabelNames = 1 c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) @@ -488,3 +490,133 @@ func TestConverter_SkipBlocksWithExistingValidMarker(t *testing.T) { // It should be 0 since the block was already converted assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) } + +func TestConverter_WriteNoConvertMarkForBlockWithTooManyLabels(t *testing.T) { + cfg := prepareConfig() + user := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + 
require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + limits.ParquetConverterMaxBlockLabelNames = 1 + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + lbls := labels.FromStrings("__name__", "test", "job", "foo") + + // Create a block + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + // 2h blocks are skipped by ShouldConvertBlockToParquet + blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + + // Upload the block to the bucket + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck + + // Start the converter + err = c.convertUser(ctx, logger, c.ring, user) + require.NoError(t, err) + + // Verify the marker was written correctly + readNoConvertMark, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + require.True(t, parquet.ValidNoConvertMarkVersion(readNoConvertMark.Version)) + require.Equal(t, fmt.Sprintf("%s: label_names_count=%d threshold=%d", parquet.NoConvertReasonTooManyLabels, 2, 1), readNoConvertMark.Reason) + + // Confirm conversion did not happen + assert.Equal(t, 0.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) + assert.Equal(t, 1.0, testutil.ToFloat64(c.metrics.skippedBlocks.WithLabelValues(user, parquet.NoConvertReasonTooManyLabels))) +} + +func TestConverter_SkipBlockWhenNoConvertMarkAlreadyExists(t *testing.T) { + cfg := 
prepareConfig() + user := "user" + ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + dir := t.TempDir() + + cfg.Ring.InstanceID = "parquet-converter-1" + cfg.Ring.InstanceAddr = "1.2.3.4" + cfg.Ring.KVStore.Mock = ringStore + bucketClient, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + userBucket := bucket.NewPrefixedBucketClient(bucketClient, user) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + limits.ParquetConverterEnabled = true + + c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits, nil) + + ctx := context.Background() + + lbls := labels.FromStrings("__name__", "test", "job", "foo") + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + // 2h blocks are skipped by ShouldConvertBlockToParquet + blockID, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, 0, + 4*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + + blockDir := fmt.Sprintf("%s/%s", dir, blockID.String()) + b, err := tsdb.OpenBlock(nil, blockDir, nil, nil) + require.NoError(t, err) + err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc) + require.NoError(t, err) + + markerV1 := parquet.NoConvertMark{ + Version: parquet.CurrentNoConvertMarkVersion, + Reason: "manually uploaded", + } + markerBytes, err := json.Marshal(markerV1) + require.NoError(t, err) + markerPath := path.Join(blockID.String(), parquet.NoConvertMarkerFileName) + err = userBucket.Upload(ctx, markerPath, bytes.NewReader(markerBytes)) + require.NoError(t, err) + + err = services.StartAndAwaitRunning(context.Background(), c) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(ctx, c) // nolint:errcheck + + // start converter + err = c.convertUser(ctx, logger, c.ring, user) + require.NoError(t, err) + + // confirm conversion was skipped + assert.Equal(t, 0.0, 
testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(user))) + assert.Equal(t, 1.0, testutil.ToFloat64(c.metrics.skippedBlocks.WithLabelValues(user, parquet.NoConvertReasonMarkerExists))) + + markerAfter, err := parquet.ReadNoConvertMark(ctx, blockID, userBucket, logger) + require.NoError(t, err) + require.True(t, parquet.ValidNoConvertMarkVersion(markerAfter.Version)) + require.Equal(t, "manually uploaded", markerAfter.Reason) +} + +func TestEffectiveMaxBlockLabelNamesLeavesRoomForGeneratedColumns(t *testing.T) { + mint := int64(0) + maxt := 2 * parquetConverterDataColumnDuration.Milliseconds() + expectedReservedColumns := parquetConverterSystemColumnCount + 3 + + require.Equal(t, 10, effectiveMaxBlockLabelNames(10, mint, maxt)) + require.Equal(t, parquetgo.MaxColumnIndex-expectedReservedColumns, effectiveMaxBlockLabelNames(parquetgo.MaxColumnIndex, mint, maxt)) + require.Equal(t, 0, effectiveMaxBlockLabelNames(0, mint, maxt)) +} diff --git a/pkg/parquetconverter/metrics.go b/pkg/parquetconverter/metrics.go index 2b3e80b0cfd..97c2985b9dd 100644 --- a/pkg/parquetconverter/metrics.go +++ b/pkg/parquetconverter/metrics.go @@ -8,6 +8,7 @@ import ( type metrics struct { convertedBlocks *prometheus.CounterVec convertBlockFailures *prometheus.CounterVec + skippedBlocks *prometheus.CounterVec convertBlockDuration *prometheus.GaugeVec convertParquetBlockDelay prometheus.Histogram ownedUsers prometheus.Gauge @@ -23,6 +24,10 @@ func newMetrics(reg prometheus.Registerer) *metrics { Name: "cortex_parquet_converter_block_convert_failures_total", Help: "Total number of failed block conversions per user.", }, []string{"user"}), + skippedBlocks: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "cortex_parquet_converter_blocks_skipped_total", + Help: "Total number of blocks skipped during parquet conversion per user and reason.", + }, []string{"user", "reason"}), convertBlockDuration: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: 
"cortex_parquet_converter_convert_block_duration_seconds", Help: "Time taken to for the latest block conversion for the user.", @@ -42,5 +47,6 @@ func newMetrics(reg prometheus.Registerer) *metrics { func (m *metrics) deleteMetricsForTenant(userID string) { m.convertedBlocks.DeleteLabelValues(userID) m.convertBlockFailures.DeleteLabelValues(userID) + m.skippedBlocks.DeletePartialMatch(prometheus.Labels{"user": userID}) m.convertBlockDuration.DeleteLabelValues(userID) } diff --git a/pkg/storage/parquet/no_convert_marker.go b/pkg/storage/parquet/no_convert_marker.go new file mode 100644 index 00000000000..9f48ff97511 --- /dev/null +++ b/pkg/storage/parquet/no_convert_marker.go @@ -0,0 +1,72 @@ +package parquet + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "path" + + "github.com/efficientgo/core/errors" + "github.com/go-kit/log" + "github.com/oklog/ulid/v2" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/runutil" + + "github.com/cortexproject/cortex/pkg/storage/bucket" +) + +const ( + NoConvertMarkerFileName = "parquet-no-convert-mark.json" + + CurrentNoConvertMarkVersion = NoConvertMarkVersion1 + NoConvertMarkVersion1 = 1 + + NoConvertReasonTooManyLabels = "too_many_labels" + NoConvertReasonMarkerExists = "marker_exists" +) + +type NoConvertMark struct { + Version int `json:"version"` + Reason string `json:"reason"` +} + +func ReadNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*NoConvertMark, error) { + markerPath := path.Join(id.String(), NoConvertMarkerFileName) + reader, err := userBkt.WithExpectedErrs(bucket.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath) + if err != nil { + if userBkt.IsObjNotFoundErr(err) || userBkt.IsAccessDeniedErr(err) { + return &NoConvertMark{}, nil + } + + return &NoConvertMark{}, err + } + defer runutil.CloseWithLogOnErr(logger, reader, "close parquet no-convert marker file reader") + 
+ markerContent, err := io.ReadAll(reader) + if err != nil { + return &NoConvertMark{}, errors.Wrapf(err, "read file: %s", NoConvertMarkerFileName) + } + + marker := NoConvertMark{} + err = json.Unmarshal(markerContent, &marker) + return &marker, err +} + +func WriteNoConvertMark(ctx context.Context, id ulid.ULID, userBkt objstore.Bucket, labelNamesCount int, maxBlockLabelNames int) error { + noConvertMarker := NoConvertMark{ + Version: CurrentNoConvertMarkVersion, + Reason: fmt.Sprintf("%s: label_names_count=%d threshold=%d", NoConvertReasonTooManyLabels, labelNamesCount, maxBlockLabelNames), + } + noConvertMarkerPath := path.Join(id.String(), NoConvertMarkerFileName) + b, err := json.Marshal(noConvertMarker) + if err != nil { + return err + } + return userBkt.Upload(ctx, noConvertMarkerPath, bytes.NewReader(b)) +} + +func ValidNoConvertMarkVersion(version int) bool { + return version == NoConvertMarkVersion1 +} diff --git a/pkg/util/validation/exporter_test.go b/pkg/util/validation/exporter_test.go index 6067ed96067..1414d3f2bb5 100644 --- a/pkg/util/validation/exporter_test.go +++ b/pkg/util/validation/exporter_test.go @@ -101,6 +101,7 @@ func TestOverridesExporter_withConfig(t *testing.T) { cortex_overrides{limit_name="out_of_order_results_cache_ttl",user="tenant-a"} 0 cortex_overrides{limit_name="out_of_order_time_window",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_enabled",user="tenant-a"} 0 + cortex_overrides{limit_name="parquet_converter_max_block_label_names",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_converter_tenant_shard_size",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_chunk_bytes",user="tenant-a"} 0 cortex_overrides{limit_name="parquet_max_fetched_data_bytes",user="tenant-a"} 0 diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 019a5adc3ed..fc37e4a4054 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -241,9 +241,10 @@ type Limits 
struct { CompactorPartitionSeriesCount int64 `yaml:"compactor_partition_series_count" json:"compactor_partition_series_count"` // Parquet converter - ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"` - ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"` - ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"` + ParquetConverterEnabled bool `yaml:"parquet_converter_enabled" json:"parquet_converter_enabled"` + ParquetConverterTenantShardSize float64 `yaml:"parquet_converter_tenant_shard_size" json:"parquet_converter_tenant_shard_size"` + ParquetConverterSortColumns []string `yaml:"parquet_converter_sort_columns" json:"parquet_converter_sort_columns"` + ParquetConverterMaxBlockLabelNames int `yaml:"parquet_converter_max_block_label_names" json:"parquet_converter_max_block_label_names"` // This config doesn't have a CLI flag registered here because they're registered in // their own original config struct. S3SSEType string `yaml:"s3_sse_type" json:"s3_sse_type" doc:"nocli|description=S3 server-side encryption type. Required to enable server-side encryption overrides for a specific tenant. If not set, the default S3 client settings are used."` @@ -369,6 +370,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&l.ParquetConverterTenantShardSize, "parquet-converter.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the parquet converter. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. 
If the value is < 1 and > 0 the shard size will be a percentage of the total parquet converters.") f.BoolVar(&l.ParquetConverterEnabled, "parquet-converter.enabled", false, "If set, enables the Parquet converter to create the parquet files.") f.Var((*flagext.StringSlice)(&l.ParquetConverterSortColumns), "parquet-converter.sort-columns", "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.") + f.IntVar(&l.ParquetConverterMaxBlockLabelNames, "parquet-converter.max-block-label-names", 0, "[Experimental] Maximum number of distinct label names allowed in a TSDB block for parquet conversion. If exceeded, the converter writes a no-convert marker. 0 to disable.") // Parquet Queryable enforced limits. f.IntVar(&l.ParquetMaxFetchedRowCount, "querier.parquet-queryable.max-fetched-row-count", 0, "The maximum number of rows that can be fetched when querying parquet storage. Each row maps to a series in a parquet file. This limit applies before materializing chunks. 0 to disable.") @@ -1029,6 +1031,11 @@ func (o *Overrides) ParquetConverterSortColumns(userID string) []string { return o.GetOverridesForUser(userID).ParquetConverterSortColumns } +// ParquetConverterMaxBlockLabelNames returns the maximum number of distinct label names allowed in a TSDB block for parquet conversion. +func (o *Overrides) ParquetConverterMaxBlockLabelNames(userID string) int { + return o.GetOverridesForUser(userID).ParquetConverterMaxBlockLabelNames +} + // ParquetMaxFetchedRowCount returns the maximum number of rows that can be fetched when querying parquet storage. 
func (o *Overrides) ParquetMaxFetchedRowCount(userID string) int { return o.GetOverridesForUser(userID).ParquetMaxFetchedRowCount diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index a8c501c75f2..e1aca5ca9bd 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -5835,6 +5835,12 @@ "type": "boolean", "x-cli-flag": "parquet-converter.enabled" }, + "parquet_converter_max_block_label_names": { + "default": 0, + "description": "[Experimental] Maximum number of distinct label names allowed in a TSDB block for parquet conversion. If exceeded, the converter writes a no-convert marker. 0 to disable.", + "type": "number", + "x-cli-flag": "parquet-converter.max-block-label-names" + }, "parquet_converter_sort_columns": { "default": [], "description": "Additional label names for specific tenants to sort by after metric name, in order of precedence. These are applied during Parquet file generation.",