1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -89,6 +89,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [FEATURE] StoreGateway: Add a flag `-blocks-storage.bucket-store.honor-projection-hints`. If enabled, Store Gateway in Parquet mode will honor projection hints and only materialize requested labels. #7206
* [FEATURE] Update prometheus Alertmanager version to v0.31.1 and add new integration to IncidentIO and Mattermost. #7092 #7267
* [FEATURE] Tenant Federation: Add experimental support for partial responses using the `-tenant-federation.allow-partial-data` flag. When enabled, failures from individual tenants during a federated query are treated as warnings, allowing results from successful tenants to be returned. #7232
* [FEATURE] Alertmanager: Add `-alertmanager.disable-replica-set-extension` flag to limit blast radius during config corruption incidents. #7153
6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
@@ -2098,6 +2098,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If enabled, Store Gateway will honor projection hints and
# only materialize requested labels. It only takes effect when
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# Maximum number of concurrent goroutines per query applied at each level of
# parquet processing: shard querying, row group processing, and column
# materialization. Note: this limit is applied independently at each level,
6 changes: 6 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -2156,6 +2156,12 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If enabled, Store Gateway will honor projection hints and
# only materialize requested labels. It only takes effect when
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# Maximum number of concurrent goroutines per query applied at each level of
# parquet processing: shard querying, row group processing, and column
# materialization. Note: this limit is applied independently at each level,
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2735,6 +2735,12 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.parquet-shard-cache-ttl
[parquet_shard_cache_ttl: <duration> | default = 24h]

# [Experimental] If enabled, Store Gateway will honor projection hints and
# only materialize requested labels. It only takes effect when
# `-blocks-storage.bucket-store.bucket-store-type` is parquet.
# CLI flag: -blocks-storage.bucket-store.honor-projection-hints
[honor_projection_hints: <boolean> | default = false]

# Maximum number of concurrent goroutines per query applied at each level of
# parquet processing: shard querying, row group processing, and column
# materialization. Note: this limit is applied independently at each level, so
2 changes: 2 additions & 0 deletions docs/configuration/v1-guarantees.md
@@ -91,6 +91,8 @@ Currently experimental features are:
- Store Gateway Zone Stable Shuffle Sharding
- `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag
- `zone_stable_shuffle_sharding` (boolean) field in config file
- Store Gateway HonorProjectionHints in Parquet Mode
- `-blocks-storage.bucket-store.honor-projection-hints` CLI flag
- Basic Lifecycler (Storegateway, Alertmanager, Ruler) Final Sleep on shutdown, which tells the pod wait before shutdown, allowing a delay to propagate ring changes.
- `-ruler.ring.final-sleep` (duration) CLI flag
- `store-gateway.sharding-ring.final-sleep` (duration) CLI flag
241 changes: 241 additions & 0 deletions integration/parquet_store_gateway_test.go
@@ -0,0 +1,241 @@
//go:build integration_querier

package integration

import (
"context"
"fmt"
"math/rand"
"path/filepath"
"slices"
"testing"
"time"

"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/integration/e2e"
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
"github.com/cortexproject/cortex/integration/e2ecortex"
"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/util/log"
cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
)

func TestParquetBucketStore_ProjectionHint(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

consul := e2edb.NewConsulWithName("consul")
minio := e2edb.NewMinio(9000, bucketName)
memcached := e2ecache.NewMemcached()
require.NoError(t, s.StartAndWaitReady(consul, minio, memcached))

// Define configuration flags.
flags := BlocksStorageFlags()
flags = mergeFlags(flags, map[string]string{
// Enable Thanos engine and projection optimization.
"-querier.thanos-engine": "true",
"-querier.optimizers": "projection",

// Enable honor-projection-hints on both the querier and the Store Gateway.
"-querier.honor-projection-hints": "true",
"-blocks-storage.bucket-store.honor-projection-hints": "true",
// Enable Store Gateway Parquet mode.
"-blocks-storage.bucket-store.bucket-store-type": "parquet",

// Set query-ingesters-within to 1h so queries older than 1h don't hit ingesters
"-limits.query-ingesters-within": "1h",

// Configure Parquet Converter
"-parquet-converter.enabled": "true",
"-parquet-converter.conversion-interval": "1s",
"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
"-compactor.block-ranges": "1ms,12h",
// Enable cache
"-blocks-storage.bucket-store.parquet-labels-cache.backend": "inmemory,memcached",
"-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
"-blocks-storage.bucket-store.sync-interval": "1s",

// Compactor
"-compactor.cleanup-interval": "1s", // to update bucket index quickly
})

// Store Gateway
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(storeGateway))

// Parquet Converter
parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(parquetConverter))

// Querier (honor projection hints enabled on the querier side)
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
}), "")
require.NoError(t, s.StartAndWaitReady(querier))

// Querier with honor-projection-hints disabled on the querier side. Even though the querier
// does not honor projection hints itself, the hints are still propagated to the Store Gateway.
querierHonorOff := e2ecortex.NewQuerier("querier-honor-off", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
"-querier.store-gateway-addresses": storeGateway.NetworkGRPCEndpoint(),
"-querier.honor-projection-hints": "false",
}), "")
require.NoError(t, s.StartAndWaitReady(querierHonorOff))

require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, querierHonorOff.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))

// Create block
now := time.Now()
// Time range: [Now - 24h] to [Now - 20h]
start := now.Add(-24 * time.Hour)
end := now.Add(-20 * time.Hour)

ctx := context.Background()

rnd := rand.New(rand.NewSource(time.Now().Unix()))
dir := filepath.Join(s.SharedDir(), "data")
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
methods := []string{"GET", "POST", "PUT", "DELETE"}

numSeries := 10
numSamples := 100

lbls := make([]labels.Labels, 0, numSeries)
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.FromStrings(
labels.MetricName, "http_requests_total",
"job", "api-server",
"instance", fmt.Sprintf("instance-%d", i),
"status_code", statusCodes[i%len(statusCodes)],
"method", methods[i%len(methods)],
"path", fmt.Sprintf("/api/v1/endpoint%d", i%3),
"cluster", "test-cluster",
))
}

id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)

storage, err := e2ecortex.NewS3ClientForMinio(minio, bucketName)
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)

// Upload TSDB Block
require.NoError(t, block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc))

// Start compactor to create the bucket index.
compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "")
require.NoError(t, s.StartAndWaitReady(compactor))

// Wait until the parquet converter has converted the block.
require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics))

// Create clients for both queriers (honor enabled / disabled on the querier side).
c, err := e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
cHonorOff, err := e2ecortex.NewClient("", querierHonorOff.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

cortex_testutil.Poll(t, 60*time.Second, true, func() interface{} {
labelSets, err := c.Series([]string{`{job="api-server"}`}, start, end)
if err != nil {
t.Logf("Series query failed: %v", err)
return false
}
return len(labelSets) > 0
})

testCases := []struct {
name string
query string
expectedLabels []string // query result should contain these labels
}{
{
name: "vector_selector_query",
query: `http_requests_total`,
expectedLabels: []string{
"__name__", "job", "instance", "status_code", "method", "path", "cluster",
},
},
{
name: "simple_sum_by_job",
query: `sum by (job) (http_requests_total)`,
expectedLabels: []string{"job"},
},
{
name: "rate_with_aggregation",
query: `sum by (method) (rate(http_requests_total[5m]))`,
expectedLabels: []string{"method"},
},
{
name: "multiple_grouping_labels",
query: `sum by (job, status_code) (http_requests_total)`,
expectedLabels: []string{"job", "status_code"},
},
{
name: "aggregation_without",
query: `sum without (instance, method) (http_requests_total)`,
expectedLabels: []string{"job", "status_code", "path", "cluster"},
},
}
// Run the same projection assertions against both queriers. The querier with
// honor-projection-hints disabled must produce identical results because the hints are
// still propagated to (and honored by) the Store Gateway.
clients := []struct {
name string
client *e2ecortex.Client
}{
{name: "querier_honor_on", client: c},
{name: "querier_honor_off", client: cHonorOff},
}

for _, qc := range clients {
t.Run(qc.name, func(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
t.Logf("Testing: %s", tc.query)

// Execute instant query
result, err := qc.client.Query(tc.query, end)
require.NoError(t, err)
require.NotNil(t, result)

// Verify we got results
vector, ok := result.(model.Vector)
require.True(t, ok, "result should be a vector")
require.NotEmpty(t, vector, "query should return results")

for _, sample := range vector {
actualLabels := make(map[string]struct{})
for label := range sample.Metric {
actualLabels[string(label)] = struct{}{}
}

// Check that all expected labels are present
for _, expectedLabel := range tc.expectedLabels {
_, ok := actualLabels[expectedLabel]
require.True(t, ok,
"series should have %s label", expectedLabel)
}

// Check that no unexpected labels are present
for lbl := range actualLabels {
if !slices.Contains(tc.expectedLabels, lbl) {
// require.Fail does not format its message; use Failf so %s is substituted.
require.Failf(t, "unexpected label", "series should not have unexpected label: %s", lbl)
}
}
}
})
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/storage/tsdb/config.go
@@ -336,7 +336,8 @@ type BucketStoreConfig struct {
// Token bucket configs
TokenBucketBytesLimiter TokenBucketBytesLimiterConfig `yaml:"token_bucket_bytes_limiter"`
// Parquet shard cache config
- ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
+ ParquetShardCache parquetutil.CacheConfig `yaml:",inline"`
+ HonorProjectionHints bool `yaml:"honor_projection_hints"`

// ParquetQueryConcurrency controls the maximum number of concurrent goroutines
// per query at each level of parquet processing: shard querying, row group
@@ -406,6 +407,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MatchersCacheMaxItems, "blocks-storage.bucket-store.matchers-cache-max-items", 0, "Maximum number of entries in the regex matchers cache. 0 to disable.")
f.IntVar(&cfg.ParquetQueryConcurrency, "blocks-storage.bucket-store.parquet-query-concurrency", 4, "Maximum number of concurrent goroutines per query applied at each level of parquet processing: shard querying, row group processing, and column materialization. Note: this limit is applied independently at each level, so the total goroutines per query can grow multiplicatively (up to N^3 in the worst case).")
cfg.ParquetShardCache.RegisterFlagsWithPrefix("blocks-storage.bucket-store.", f)
f.BoolVar(&cfg.HonorProjectionHints, "blocks-storage.bucket-store.honor-projection-hints", false, "[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels. It only takes effect when `-blocks-storage.bucket-store.bucket-store-type` is parquet.")
}
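
For readers unfamiliar with the registration pattern above, here is a minimal standalone sketch using only the standard `flag` package. The flag name and default are taken from this diff; `bucketStoreConfig`, `registerFlags`, and `parseConfig` are simplified, hypothetical stand-ins rather than the Cortex types.

```go
package main

import (
	"flag"
	"fmt"
)

// bucketStoreConfig is a trimmed-down, hypothetical stand-in for BucketStoreConfig.
type bucketStoreConfig struct {
	HonorProjectionHints bool
}

// registerFlags binds the config field to its CLI flag, defaulting to false.
func (cfg *bucketStoreConfig) registerFlags(f *flag.FlagSet) {
	f.BoolVar(&cfg.HonorProjectionHints,
		"blocks-storage.bucket-store.honor-projection-hints",
		false,
		"[Experimental] If enabled, Store Gateway will honor projection hints and only materialize requested labels.")
}

// parseConfig builds a config from command-line style arguments.
func parseConfig(args []string) (bucketStoreConfig, error) {
	var cfg bucketStoreConfig
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	cfg.registerFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseConfig([]string{"-blocks-storage.bucket-store.honor-projection-hints=true"})
	if err != nil {
		panic(err)
	}
	fmt.Println("honor projection hints:", cfg.HonorProjectionHints)
}
```

Because the default is `false`, existing deployments keep their current behavior unless the flag (or the `honor_projection_hints` YAML field) is set explicitly.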

// Validate the config.
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_stores_bench_test.go
@@ -128,7 +128,7 @@ func benchmarkBatching(b *testing.B, client storepb.StoreClient, userID string,
}
}

- func generateBenchmarkBlock(b *testing.B, storageDir, userID string, numSeries, numSamples int) {
+ func generateBenchmarkBlock(b testing.TB, storageDir, userID string, numSeries, numSamples int) {
userDir := filepath.Join(storageDir, userID)
if err := os.MkdirAll(userDir, os.ModePerm); err != nil {
b.Fatal(err)