Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Changelog

## master / unreleased
* [FEATURE] Distributor: Add experimental `-distributor.num-query-workers` flag to use a goroutine worker pool for query fan-out calls to ingesters. Reuses pre-grown goroutine stacks to eliminate the `runtime.copystack` overhead (~8% CPU) observed on rulers with wide ingester fan-out. Falls back to spawning a new goroutine when no worker is available. #7623
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. These now use `model.Duration` instead of `time.Duration` to support serialization, which limits the minimum unit to 1ms (nanoseconds/microseconds are not supported). #7160
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
* [CHANGE] HA Tracker: Move `-distributor.ha-tracker.failover-timeout` from a global config to a per-tenant runtime config. The flag name and default value (30s) remain the same. #7481
* [FEATURE] Distributor: Add experimental `-distributor.num-query-workers` flag to use a goroutine worker pool for query fan-out calls to ingesters. Reuses pre-grown goroutine stacks to eliminate the `runtime.copystack` overhead (~8% CPU) observed on rulers with wide ingester fan-out. Falls back to spawning a new goroutine when no worker is available. #7623
Comment on lines -4 to +7

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix ordering

* [FEATURE] Ingester: Add experimental active series tracker that counts active series by configurable label matchers (including regex) per tenant and exposes `cortex_ingester_active_series_per_tracker` metric. Configured via `active_series_trackers` in runtime config overrides. #7476
* [FEATURE] Ingester: Add experimental head-only queried series metric. `cortex_ingester_queried_head_series` tracks unique series queried from head via HLL. Enabled via `-ingester.head-queried-series-metrics-enabled`. #7500
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Includes a `jsonEscape` template function for safely embedding expressions in JSON-encoded URL parameters (e.g., Grafana Explore panes). Supports Grafana Explore, Perses, and other UIs. #7302
Expand Down Expand Up @@ -35,6 +35,7 @@
* [ENHANCEMENT] Distributor: Added `cortex_distributor_received_histogram_buckets` metric to track number of buckets in received native histogram samples before validation, per user. #7569
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [ENHANCEMENT] Query Frontend: Improve the slow query log with `source`, `user_agent`, `engine_type`, `block_store_type`, and query stats fields to aid slow query diagnosis. #7601
* [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626
* [BUGFIX] Querier: Fix `queryWithRetry` and `labelsWithRetry` returning `(nil, nil)` on cancelled context by propagating `ctx.Err()`. #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
Expand Down
44 changes: 42 additions & 2 deletions pkg/frontend/transport/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
queryString = f.parseRequestQueryString(r, buf)
}
if shouldReportSlowQuery {
f.reportSlowQuery(r, queryString, queryResponseTime)
f.reportSlowQuery(r, queryString, queryResponseTime, source, stats)
if f.cfg.QueryStatsEnabled {
f.getOrCreateSlowQueryMetric().WithLabelValues(source, userID).Inc()
}
Expand Down Expand Up @@ -423,18 +423,58 @@ func (f *Handler) logQueryRequest(r *http.Request, queryString url.Values, sourc
}

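// reportSlowQuery logs a "slow query detected" message at info level, including the request source, selected request headers, non-zero query stats, and the query string parameters.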
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration) {
func (f *Handler) reportSlowQuery(r *http.Request, queryString url.Values, queryResponseTime time.Duration, source string, stats *querier_stats.QueryStats) {
logMessage := []any{
"msg", "slow query detected",
"method", r.Method,
"host", r.Host,
"path", r.URL.Path,
"source", source,
"time_taken", queryResponseTime.String(),
}

grafanaFields := formatGrafanaStatsFields(r)
if len(grafanaFields) > 0 {
logMessage = append(logMessage, grafanaFields...)
}

if userAgent := r.Header.Get("User-Agent"); len(userAgent) > 0 {
logMessage = append(logMessage, "user_agent", userAgent)
}
if engineType := r.Header.Get(engine.TypeHeader); len(engineType) > 0 {
logMessage = append(logMessage, "engine_type", engineType)
}
if blockStoreType := r.Header.Get(querier.BlockStoreTypeHeader); len(blockStoreType) > 0 {
logMessage = append(logMessage, "block_store_type", blockStoreType)
}
if wallTime := stats.LoadWallTime(); wallTime > 0 {
logMessage = append(logMessage, "query_wall_time_seconds", wallTime.Seconds())
}
if storageWallTime := stats.LoadQueryStorageWallTime(); storageWallTime > 0 {
logMessage = append(logMessage, "query_storage_wall_time_seconds", storageWallTime.Seconds())
}
if n := stats.LoadFetchedSeries(); n > 0 {
logMessage = append(logMessage, "fetched_series_count", n)
}
if n := stats.LoadFetchedChunks(); n > 0 {
logMessage = append(logMessage, "fetched_chunks_count", n)
}
if n := stats.LoadFetchedSamples(); n > 0 {
logMessage = append(logMessage, "fetched_samples_count", n)
}
if n := stats.LoadScannedSamples(); n > 0 {
logMessage = append(logMessage, "samples_scanned", n)
}
if n := stats.LoadFetchedChunkBytes(); n > 0 {
logMessage = append(logMessage, "fetched_chunks_bytes", n)
}
if n := stats.LoadFetchedDataBytes(); n > 0 {
logMessage = append(logMessage, "fetched_data_bytes", n)
}
if n := stats.LoadSplitQueries(); n > 0 {
logMessage = append(logMessage, "split_queries", n)
}

logMessage = append(logMessage, formatQueryString(queryString)...)

level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
Expand Down
91 changes: 91 additions & 0 deletions pkg/frontend/transport/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,97 @@ func TestReportQueryStatsFormat(t *testing.T) {
}
}

// TestReportSlowQueryFormat verifies the exact logfmt line emitted by
// Handler.reportSlowQuery for different combinations of query string,
// request headers, and query stats.
//
// Every case issues the same request (GET on
// http://localhost:8080/prometheus/api/v1/query) with a fixed response time of
// one second, so the expected lines differ only in the optional fields that
// follow the base fields.
func TestReportSlowQueryFormat(t *testing.T) {
	responseTime := time.Second

	type testCase struct {
		queryString url.Values               // parameters appended at the end of the line as param_<name>
		queryStats  *querier_stats.QueryStats // may be nil
		header      http.Header               // request headers; nil means no headers
		source      string                    // request source, logged as the "source" field
		expectedLog string                    // expected log line without the trailing newline
	}

	tests := map[string]testCase{
		// Uses an empty (non-nil) stats value so that this case is distinct from
		// the nil-stats case below.
		"should log only base fields when stats and headers are empty": {
			source:      requestmeta.SourceAPI,
			queryStats:  &querier_stats.QueryStats{},
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s`,
		},
		"should log only base fields when stats is nil": {
			source:      requestmeta.SourceAPI,
			queryStats:  nil,
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s`,
		},
		"should include the query string at the end": {
			source:      requestmeta.SourceAPI,
			queryString: url.Values(map[string][]string{"query": {"up"}}),
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s param_query=up`,
		},
		"should include grafana dashboard and panel id": {
			source: requestmeta.SourceAPI,
			header: http.Header{
				"X-Dashboard-Uid": []string{"dashboard-1"},
				"X-Panel-Id":      []string{"panel-1"},
			},
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s X-Dashboard-Uid=dashboard-1 X-Panel-Id=panel-1`,
		},
		"should include user agent, engine type and block store type headers": {
			source: requestmeta.SourceAPI,
			header: http.Header{
				"User-Agent": []string{"Grafana"},
				http.CanonicalHeaderKey(engine.TypeHeader):            []string{string(engine.Thanos)},
				http.CanonicalHeaderKey(querier.BlockStoreTypeHeader): []string{"parquet"},
			},
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s user_agent=Grafana engine_type=thanos block_store_type=parquet`,
		},
		"should include query stats fields when set": {
			source: requestmeta.SourceAPI,
			queryStats: &querier_stats.QueryStats{
				Stats: querier_stats.Stats{
					WallTime:             3 * time.Second,
					QueryStorageWallTime: 100 * time.Minute, // logged in seconds: 6000
					FetchedSeriesCount:   100,
					FetchedChunksCount:   200,
					FetchedSamplesCount:  300,
					ScannedSamples:       400,
					FetchedChunkBytes:    1024,
					FetchedDataBytes:     2048,
					SplitQueries:         10,
				},
			},
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s query_wall_time_seconds=3 query_storage_wall_time_seconds=6000 fetched_series_count=100 fetched_chunks_count=200 fetched_samples_count=300 samples_scanned=400 fetched_chunks_bytes=1024 fetched_data_bytes=2048 split_queries=10`,
		},
		"should not include query stats fields that are zero": {
			source: requestmeta.SourceAPI,
			queryStats: &querier_stats.QueryStats{
				Stats: querier_stats.Stats{
					WallTime:           3 * time.Second,
					FetchedSeriesCount: 100,
				},
			},
			expectedLog: `level=info msg="slow query detected" method=GET host=localhost:8080 path=/prometheus/api/v1/query source=api time_taken=1s query_wall_time_seconds=3 fetched_series_count=100`,
		},
	}

	for testName, testData := range tests {
		t.Run(testName, func(t *testing.T) {
			// Capture the handler's log output in memory so it can be compared verbatim.
			outputBuf := bytes.NewBuffer(nil)
			logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
			handler := NewHandler(HandlerConfig{QueryStatsEnabled: true}, tenantfederation.Config{}, http.DefaultTransport, logger, nil)

			// Fail fast on a malformed request instead of dereferencing a nil *http.Request below.
			req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/prometheus/api/v1/query", nil)
			require.NoError(t, err)
			req.Header = testData.header
			req = req.WithContext(requestmeta.ContextWithRequestSource(context.Background(), testData.source))

			handler.reportSlowQuery(req, testData.queryString, responseTime, testData.source, testData.queryStats)

			data, err := io.ReadAll(outputBuf)
			require.NoError(t, err)
			require.Equal(t, testData.expectedLog+"\n", string(data))
		})
	}
}

func TestReportQueryStatsRejectionReason(t *testing.T) {
outputBuf := bytes.NewBuffer(nil)
logger := log.NewSyncLogger(log.NewLogfmtLogger(outputBuf))
Expand Down
Loading