Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* [ENHANCEMENT] Upgrade prometheus alertmanager version to v0.32.1. #7462
* [ENHANCEMENT] Tenant Federation: Avoid purging the regex resolver LRU cache on user-sync ticks when the set of known users has not changed. #7489
* [ENHANCEMENT] Memberlist: Add `-memberlist.packet-read-timeout`, `-memberlist.max-packet-size`, and `-memberlist.max-concurrent-connections` flags to bound inbound gossip TCP connections, preventing slow-read, OOM, and connection-flood attacks on the gossip port. #7518
* [ENHANCEMENT] Parquet Converter: Add `parquet-converter.max-block-label-names` limit to skip conversion of TSDB blocks with too many label names. #7625
* [ENHANCEMENT] Parquet Converter: Add a ring status page to expose the ring status. #7455
* [ENHANCEMENT] Parquet: Add a row ranges cache for parquet query filtering in querier and store-gateway. #7478
* [ENHANCEMENT] Ingester: Add WAL record metrics to help evaluate the effectiveness of WAL compression type (e.g. snappy, zstd): `cortex_ingester_tsdb_wal_record_part_writes_total`, `cortex_ingester_tsdb_wal_record_parts_bytes_written_total`, and `cortex_ingester_tsdb_wal_record_bytes_saved_total`. #7420
Expand Down
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4779,6 +4779,12 @@ query_rejection:
# CLI flag: -parquet-converter.sort-columns
[parquet_converter_sort_columns: <list of string> | default = []]

# [Experimental] Maximum number of distinct label names allowed in a TSDB block
# for parquet conversion. If exceeded, the converter writes a no-convert marker.
# 0 to disable.
# CLI flag: -parquet-converter.max-block-label-names
[parquet_converter_max_block_label_names: <int> | default = 0]

# S3 server-side encryption type. Required to enable server-side encryption
# overrides for a specific tenant. If not set, the default S3 client settings
# are used.
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/v1-guarantees.md
Original file line number Diff line number Diff line change
Expand Up @@ -150,3 +150,4 @@ Currently experimental features are:
- `-ingester.head-queried-series-metrics-windows` time windows to report (default: 2h)
- `-ingester.head-queried-series-metrics-window-duration` HLL sub-window size
- `-ingester.head-queried-series-metrics-sample-rate` query sampling rate
- Parquet Converter: `-parquet-converter.max-block-label-names` (int) - If enabled, adds a no-convert mark and skips blocks with too many labels.
142 changes: 142 additions & 0 deletions integration/parquet_converter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//go:build integration

package integration

import (
"context"
"fmt"
"math/rand"
"path/filepath"
"testing"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

func TestParquetConverter_NoConvertMarkWithTooManyLabels(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, memcached))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-target": "all,parquet-converter",
"-blocks-storage.tsdb.block-ranges-period": "1m,24h",
"-blocks-storage.tsdb.ship-interval": "1s",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl": "1s",
"-blocks-storage.bucket-store.bucket-index.idle-timeout": "1s",
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
// compactor
"-compactor.cleanup-interval": "1s",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
// Enable vertical sharding.
"-frontend.query-vertical-shard-size": "3",
"-frontend.max-cache-freshness": "1m",
// enable experimental promQL funcs
"-querier.enable-promql-experimental-functions": "true",
// parquet-converter
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.enabled": "true",
"-parquet-converter.max-block-label-names": "1",
// Querier
"-querier.enable-parquet-queryable": "true",
// Enable cache for parquet labels and chunks
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-blocks-storage.bucket-store.chunks-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.chunks-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
},
)

// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
dir := filepath.Join(s.SharedDir(), "data")
lbls := []labels.Labels{
labels.FromStrings("__name__", "test_series_a", "job", "test"),
}

numSamples := 60
scrapeInterval := time.Minute
now := time.Now()
start := now.Add(-time.Hour * 24)
end := now.Add(-time.Hour)

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples,
start.UnixMilli(),
end.UnixMilli(),
scrapeInterval.Milliseconds(), 10,
)
require.NoError(t, err)

err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait for the converter to write the no-convert marker
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
noConvertMarkerPath := fmt.Sprintf("%s/parquet-no-convert-mark.json", id.String())
found := false
err := bkt.Iter(ctx, "", func(name string) error {
if name == noConvertMarkerPath {
found = true
}
return nil
}, objstore.WithRecursiveIter())
require.NoError(t, err)
return found
})

// confirm the conversion did not happen (check both paths)
blockID := id.String()
markerPaths := []string{
fmt.Sprintf("%s/parquet-converter-mark.json", blockID),
fmt.Sprintf("parquet-markers/%s-parquet-converter-mark.json", blockID),
}
for _, markerPath := range markerPaths {
exists, err := bkt.Exists(ctx, markerPath)
require.NoError(t, err)
require.False(t, exists, "converter mark should not exist at %s", markerPath)
}
}
65 changes: 64 additions & 1 deletion pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
ringKey = "parquet-converter"

converterMetaPrefix = "converter-meta-"

parquetConverterDataColumnDuration = time.Hour * 8
parquetConverterSystemColumnCount = 2 // s_col_indexes and s_series_hash.
)

var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
Expand Down Expand Up @@ -139,7 +142,7 @@ func newConverter(cfg Config, bkt objstore.InstrumentedBucket, storageCfg cortex
metrics: newMetrics(registerer),
bkt: bkt,
baseConverterOptions: []convert.ConvertOption{
convert.WithColDuration(time.Hour * 8),
convert.WithColDuration(parquetConverterDataColumnDuration),
convert.WithRowGroupSize(cfg.MaxRowsPerRowGroup),
},
}
Expand Down Expand Up @@ -396,6 +399,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
continue
}

configuredMaxBlockLabelNames := c.limits.ParquetConverterMaxBlockLabelNames(userID)
maxBlockLabelNames := effectiveMaxBlockLabelNames(configuredMaxBlockLabelNames, b.MinTime, b.MaxTime)

noConvertMark, err := cortex_parquet.ReadNoConvertMark(ctx, b.ULID, uBucket, logger)
if err != nil {
level.Error(logger).Log("msg", "failed to read parquet no-convert marker", "block", b.ULID.String(), "err", err)
continue
}
if cortex_parquet.ValidNoConvertMarkVersion(noConvertMark.Version) {
level.Debug(logger).Log("msg", "skipping block, no-convert marker already exists", "block", b.ULID.String())
c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonMarkerExists).Inc()
continue
}

if err := os.RemoveAll(c.compactRootDir()); err != nil {
level.Error(logger).Log("msg", "failed to remove work directory", "path", c.compactRootDir(), "err", err)
if c.checkConvertError(userID, err) {
Expand Down Expand Up @@ -425,6 +442,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
continue
}

if configuredMaxBlockLabelNames > 0 {
labelNames, err := tsdbBlock.LabelNames(ctx)
if err != nil {
_ = tsdbBlock.Close()
level.Error(logger).Log("msg", "failed to get label names", "block", b.ULID.String(), "err", err)
if c.checkConvertError(userID, err) {
return err
}
continue
}
labelNamesCount := len(labelNames)
if labelNamesCount > maxBlockLabelNames {
if err := cortex_parquet.WriteNoConvertMark(ctx, b.ULID, uBucket, labelNamesCount, maxBlockLabelNames); err != nil {
_ = tsdbBlock.Close()
level.Error(logger).Log("msg", "failed to write parquet no-convert marker", "block", b.ULID.String(), "err", err)
if c.checkConvertError(userID, err) {
return err
}
continue
}
level.Debug(logger).Log("msg", "skipping parquet conversion for block with too many label names", "block", b.ULID.String(), "label_names", labelNamesCount, "limit", maxBlockLabelNames)
c.metrics.skippedBlocks.WithLabelValues(userID, cortex_parquet.NoConvertReasonTooManyLabels).Inc()
_ = tsdbBlock.Close()
continue
}
}

level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
start := time.Now()

Expand Down Expand Up @@ -486,6 +530,25 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
return nil
}

func effectiveMaxBlockLabelNames(configuredMaxBlockLabelNames int, mint, maxt int64) int {
if configuredMaxBlockLabelNames <= 0 {
return configuredMaxBlockLabelNames
}

dataColumnCount := 0
if maxt >= mint {
dataColumnCount = int((maxt-mint)/parquetConverterDataColumnDuration.Milliseconds()) + 1
}

// Reserve for s_col_indexes, s_series_hash, and generated s_data_* columns.
maxBlockLabelNames := max(parquet.MaxColumnIndex-parquetConverterSystemColumnCount-dataColumnCount, 0)

if configuredMaxBlockLabelNames > maxBlockLabelNames {
return maxBlockLabelNames
}
return configuredMaxBlockLabelNames
}

func (c *Converter) checkConvertError(userID string, err error) (terminate bool) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || c.isCausedByPermissionDenied(err) {
terminate = true
Expand Down
Loading
Loading