Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [ENHANCEMENT] Ingester: Add lazy regex evaluation on head postings cache miss. Defers expensive regex matchers on high-cardinality labels to per-series filtering when a selective equality matcher already narrows the result set. Configured via `-blocks-storage.expanded_postings_cache.head.lazy-matcher-max-cardinality` (disabled by default). #7553
* [ENHANCEMENT] Ring: Add ring metric to count number of duplicate tokens. #7626
* [ENHANCEMENT] Ring: Cache `ShuffleShardWithLookback` subrings. The cached entry is invalidated on topology change or once `now` reaches the earliest `RegisteredTimestamp + lookbackPeriod` of any included instance. #7628
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
96 changes: 66 additions & 30 deletions pkg/ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ type Ring struct {

// Cache of shuffle-sharded subrings per identifier. Invalidated when topology changes.
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring
shuffledSubringCache map[subringCacheKey]*cachedSubring

memberOwnershipGaugeVec *prometheus.GaugeVec
numMembersGaugeVec *prometheus.GaugeVec
Expand All @@ -224,6 +224,20 @@ type subringCacheKey struct {
shardSize int

zoneStableSharding bool

// lookbackPeriod distinguishes subrings built with ShuffleShardWithLookback (>0) from
// plain shuffle-shard subrings (0).
lookbackPeriod time.Duration
}

// cachedSubring is a cached shuffle-shard subring together with its cache metadata, kept separate
// from the Ring so that ring data and cache bookkeeping aren't mixed.
type cachedSubring struct {
ring *Ring

// shuffleShardExpiry is the time at which the subring's membership would next change because
// an instance falls out of the lookback window. Zero means no time-based expiry.
shuffleShardExpiry time.Time
}

// New creates a new Ring. Being a service, Ring needs to be started to do anything.
Expand Down Expand Up @@ -254,7 +268,7 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client
KVClient: store,
strategy: strategy,
ringDesc: &Desc{},
shuffledSubringCache: map[subringCacheKey]*Ring{},
shuffledSubringCache: map[subringCacheKey]*cachedSubring{},
memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_member_ownership_percent",
Help: "The percent ownership of the ring by member",
Expand Down Expand Up @@ -347,7 +361,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.ringDesc = ringDesc
if rc == EqualButReadOnly && r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
r.shuffledSubringCache = make(map[subringCacheKey]*cachedSubring)
}
r.updateRingMetrics(rc)
r.mtx.Unlock()
Expand All @@ -373,7 +387,7 @@ func (r *Ring) updateRingState(ringDesc *Desc) {
r.lastTopologyChange = now
if r.shuffledSubringCache != nil {
// Invalidate all cached subrings.
r.shuffledSubringCache = make(map[subringCacheKey]*Ring)
r.shuffledSubringCache = make(map[subringCacheKey]*cachedSubring)
}
r.updateRingMetrics(rc)
}
Expand Down Expand Up @@ -755,11 +769,11 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
// - Shuffling: probabilistically, for a large enough cluster each identifier gets a different
// set of instances, with a reduced number of overlapping instances between two identifiers.
func (r *Ring) ShuffleShard(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, false)
return r.shuffleShardWithCache(identifier, size, 0, time.Now(), false)
}

func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing {
return r.shuffleShardWithCache(identifier, size, true)
return r.shuffleShardWithCache(identifier, size, 0, time.Now(), true)
}

// ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances
Expand All @@ -768,34 +782,35 @@ func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRi
// The returned subring may be unbalanced with regard to zones and should never be used for write
// operations (read only).
//
// This function doesn't support caching.
// The returned subring is cached and reused until its membership would change: either when an
// instance ages out of the lookback window (a per-entry expiry) or on a topology change.
func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

return r.shuffleShard(identifier, size, lookbackPeriod, now, false)
return r.shuffleShardWithCache(identifier, size, lookbackPeriod, now, false)
}

func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing {
func (r *Ring) shuffleShardWithCache(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) ReadRing {
// Nothing to do if the shard size is not smaller than the actual ring.
if size <= 0 || r.InstancesCount() <= size {
return r
}

if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil {
if cached := r.getCachedShuffledSubring(identifier, size, lookbackPeriod, now, zoneStableSharding); cached != nil {
return cached
}

result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding)
result, expiry := r.shuffleShard(identifier, size, lookbackPeriod, now, zoneStableSharding)

r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result)
r.setCachedShuffledSubring(identifier, size, lookbackPeriod, zoneStableSharding, result, expiry)
return result
}

func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) (*Ring, time.Time) {
lookbackUntil := now.Add(-lookbackPeriod).Unix()
lookbackInSeconds := int64(lookbackPeriod / time.Second)

// Earliest time (unix seconds) the subring's membership changes because a lookback-included
// instance ages out of the window. Zero means it has no time-based expiry.
var minLookbackExpiry int64

r.mtx.RLock()
defer r.mtx.RUnlock()
Expand Down Expand Up @@ -877,7 +892,15 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
// then we should include it in the subring but continuing selecting instances.
// If an instance is in READONLY we should always extend. The write path will filter it out when GetRing.
// The read path should extend to get new ingester used on write
if (lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil) || instance.State == READONLY {
withinLookback := lookbackPeriod > 0 && instance.RegisteredTimestamp >= lookbackUntil
if withinLookback {
// Track when this instance will leave the lookback window; that's the earliest
// point at which the cached subring's membership would change.
if expiry := instance.RegisteredTimestamp + lookbackInSeconds; minLookbackExpiry == 0 || expiry < minLookbackExpiry {
minLookbackExpiry = expiry
}
}
if withinLookback || instance.State == READONLY {
continue
}

Expand All @@ -898,6 +921,11 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
shardDesc := &Desc{Ingesters: shard}
shardTokensByZone := shardDesc.getTokensByZone()

var shuffleShardExpiry time.Time
if minLookbackExpiry > 0 {
shuffleShardExpiry = time.Unix(minLookbackExpiry, 0)
}

return &Ring{
cfg: r.cfg,
strategy: r.strategy,
Expand All @@ -912,9 +940,9 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur
// with the subring.
ringInstanceByToken: r.ringInstanceByToken,

// For caching to work, remember these values.
// For caching to work, remember this value.
lastTopologyChange: r.lastTopologyChange,
}
}, shuffleShardExpiry
}

// GetInstanceState returns the current state of an instance or an error if the
Expand Down Expand Up @@ -951,7 +979,7 @@ func (r *Ring) HasInstance(instanceID string) bool {
return ok
}

func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring {
func (r *Ring) getCachedShuffledSubring(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring {
if r.cfg.SubringCacheDisabled {
return nil
}
Expand All @@ -960,26 +988,34 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableS
defer r.mtx.RUnlock()

// if shuffledSubringCache map is nil, reading it returns default value (nil pointer).
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}]
cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, lookbackPeriod: lookbackPeriod}]
if cached == nil {
return nil
}

cached.mtx.Lock()
defer cached.mtx.Unlock()
// For lookback subrings the cached membership is only valid until the earliest instance
// leaves the lookback window. Once "now" reaches that point, treat it as a cache miss so
// the caller recomputes.
if !cached.shuffleShardExpiry.IsZero() && !now.Before(cached.shuffleShardExpiry) {
return nil
}

subring := cached.ring
subring.mtx.Lock()
defer subring.mtx.Unlock()

// Update instance states and timestamps. We know that the topology is the same,
// so zones and tokens are equal.
for name, cachedIng := range cached.ringDesc.Ingesters {
for name, cachedIng := range subring.ringDesc.Ingesters {
ing := r.ringDesc.Ingesters[name]
cachedIng.State = ing.State
cachedIng.Timestamp = ing.Timestamp
cached.ringDesc.Ingesters[name] = cachedIng
subring.ringDesc.Ingesters[name] = cachedIng
}
return cached
return subring
}

func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) {
func (r *Ring) setCachedShuffledSubring(identifier string, size int, lookbackPeriod time.Duration, zoneStableSharding bool, subring *Ring, shuffleShardExpiry time.Time) {
if subring == nil || r.cfg.SubringCacheDisabled {
return
}
Expand All @@ -991,7 +1027,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableS
// (which can happen between releasing the read lock and getting read-write lock).
// Note that shuffledSubringCache can be only nil when set by test.
if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) {
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring
r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding, lookbackPeriod: lookbackPeriod}] = &cachedSubring{ring: subring, shuffleShardExpiry: shuffleShardExpiry}
}
}

Expand Down
126 changes: 121 additions & 5 deletions pkg/ring/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2666,7 +2666,8 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
currTime := time.Now().Add(lookbackPeriod).Add(time.Minute)

// Add the initial shard to the history.
rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read)
subRing, _ := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding)
rs, err := subRing.GetReplicationSetForOperation(Read)
require.NoError(t, err)

history := map[time.Time]ReplicationSet{
Expand Down Expand Up @@ -2732,7 +2733,8 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) {
}

// Add the current shard to the history.
rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read)
subRing, _ := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding)
rs, err = subRing.GetReplicationSetForOperation(Read)
require.NoError(t, err)
history[currTime] = rs

Expand Down Expand Up @@ -2808,7 +2810,7 @@ func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, s
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
shuffledSubringCache: map[subringCacheKey]*cachedSubring{},
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
KVClient: &MockClient{},
Expand All @@ -2835,7 +2837,7 @@ func BenchmarkRing_Get(b *testing.B) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
shuffledSubringCache: map[subringCacheKey]*cachedSubring{},
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
KVClient: &MockClient{},
Expand All @@ -2862,7 +2864,7 @@ func TestRing_Get_NoMemoryAllocations(t *testing.T) {
ringTokensByZone: ringDesc.getTokensByZone(),
ringInstanceByToken: ringDesc.getTokensInfo(),
ringZones: getZones(ringDesc.getTokensByZone()),
shuffledSubringCache: map[subringCacheKey]*Ring{},
shuffledSubringCache: map[subringCacheKey]*cachedSubring{},
strategy: NewDefaultReplicationStrategy(),
lastTopologyChange: time.Now(),
KVClient: &MockClient{},
Expand Down Expand Up @@ -3165,6 +3167,120 @@ func TestShuffleShardWithCaching(t *testing.T) {
require.False(t, subring == newSubring)
}

func TestShuffleShardWithLookbackCaching(t *testing.T) {
inmem, closer := consul.NewInMemoryClientWithConfig(GetCodec(), consul.Config{
MaxCasRetries: 20,
CasRetryDelay: 100 * time.Millisecond,
}, log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })

cfg := Config{
KVStore: kv.Config{Mock: inmem},
HeartbeatTimeout: 1 * time.Minute,
ReplicationFactor: 3,
ZoneAwarenessEnabled: true,
}

ring, err := New(cfg, "test", "test", log.NewNopLogger(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ring))
t.Cleanup(func() {
_ = services.StopAndAwaitTerminated(context.Background(), ring)
})

const numLifecyclers = 6
const zones = 3

lcs := []*Lifecycler(nil)
for i := range numLifecyclers {
lc := startLifecycler(t, cfg, 500*time.Millisecond, i, zones)

lcs = append(lcs, lc)
}

// Wait until all instances in the ring are ACTIVE.
test.Poll(t, 5*time.Second, numLifecyclers, func() any {
active := 0
rs, _ := ring.GetReplicationSetForOperation(Read)
for _, ing := range rs.Instances {
if ing.State == ACTIVE {
active++
}
}
return active
})

const (
shardSize = zones
user = "user"
lookbackPeriod = time.Hour
)

// All instances were registered just now, so with an hour lookback they are all within the
// lookback window and the resulting subring carries a non-zero expiry.
now := time.Now()

// This lookback subring should be cached and reused while now is before its expiry.
subring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now)

// Repeated calls with now still before the expiry reuse the cached subring.
const iters = 100
sleep := (2 * time.Second) / iters
for range iters {
newSubring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, time.Now())
require.True(t, subring == newSubring, "cached lookback subring reused before expiry")
time.Sleep(sleep)
}

// On a cache hit the subring still has up-to-date instance timestamps.
{
rs, err := subring.GetReplicationSetForOperation(Read)
require.NoError(t, err)

nowTs := time.Now()
for _, ing := range rs.Instances {
// Lifecyclers use 500ms refresh, but timestamps use 1s resolution, so give it some buffer.
assert.InDelta(t, nowTs.UnixNano(), time.Unix(ing.Timestamp, 0).UnixNano(), float64(2*time.Second.Nanoseconds()))
}
}

// Advancing now past the lookback window forces a recompute: instances registered ~now are no
// longer within the window, so the subring shrinks and a new one is returned.
subringAfterExpiry := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now.Add(lookbackPeriod+time.Minute))
require.False(t, subring == subringAfterExpiry, "expired lookback subring is recomputed")
require.NotEqual(t, subring.InstancesCount(), subringAfterExpiry.InstancesCount())

// The recomputed subring has no instances within lookback, so no time-based expiry, and is reused.
subring = subringAfterExpiry
newSubring := ring.ShuffleShardWithLookback(user, shardSize, lookbackPeriod, now.Add(lookbackPeriod+2*time.Minute))
require.True(t, subring == newSubring, "recomputed lookback subring reused")

// A plain shuffle shard for the same user and size is cached under a separate key.
require.False(t, ring.ShuffleShard(user, shardSize) == newSubring, "plain and lookback subrings are cached separately")

// Change of instances (topology) invalidates the cache.
before := ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now())
require.True(t, before == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "cached before topology change")

// Stop one instance per zone. Subring needs to be recomputed.
for i := range zones {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), lcs[i]))
}
test.Poll(t, 5*time.Second, numLifecyclers-zones, func() any {
return ring.InstancesCount()
})
require.False(t, before == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "recomputed after topology change")

// Change of shard size needs a different subring.
sizeOne := ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now())
require.False(t, sizeOne == ring.ShuffleShardWithLookback(user, 2, lookbackPeriod, time.Now()), "different shard sizes cached separately")

// Same size reuses the cache, and cleanup invalidates it.
require.True(t, sizeOne == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()))
ring.CleanupShuffleShardCache(user)
require.False(t, sizeOne == ring.ShuffleShardWithLookback(user, 1, lookbackPeriod, time.Now()), "recomputed after cache cleanup")
}

// User shuffle shard token.
func userToken(user, zone string, skip int) uint32 {
r := rand.New(rand.NewSource(shard.ShuffleShardSeed(user, zone)))
Expand Down
Loading