feat: Introduce policy stream mapping (#15982)
**What this PR does / why we need it**:
Introduces the concept of policies to Loki: incoming streams are assigned to a policy based on configurable stream selectors.
This is an improved version of #15561 and built on top of #15875.
A policy mapping can be configured as follows:
```yaml
12345:
  policy_stream_mapping:
    policy6:
    - selector: '{env="prod"}'
      priority: 2
    - selector: '{env=~"prod|staging"}'
      priority: 1
    - selector: '{team="finance"}'
      priority: 4
    policy7:
    - selector: '{env=~"prod|dev"}'
      priority: 3
```
With that configuration, pushes to tenant `12345` with the labels `{env="prod", team="finance"}` are assigned to `policy6`: its third selector (`{team="finance"}`, priority 4) matches these labels and has a higher priority than any other matching selector.
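
To make the priority rule concrete, here is a minimal Go sketch of the selection logic. The `priorityStream` type and `policyFor` helper are hypothetical stand-ins, selectors are simplified to exact label matches rather than full LogQL matchers, and this is not the actual `PolicyStreamMapping.PolicyFor` implementation:

```go
package main

import "fmt"

// priorityStream is a hypothetical stand-in for validation.PriorityStream:
// a stream selector (simplified here to exact label matches) plus the
// priority used to pick between several matching selectors.
type priorityStream struct {
	matches  map[string]string // label name -> required value
	priority int
}

// policyFor returns the policy whose matching selector has the highest
// priority; an empty string means no policy matched.
func policyFor(lbs map[string]string, mapping map[string][]priorityStream) string {
	bestPolicy, bestPriority := "", -1
	for policy, streams := range mapping {
		for _, s := range streams {
			matched := true
			for name, value := range s.matches {
				if lbs[name] != value {
					matched = false
					break
				}
			}
			if matched && s.priority > bestPriority {
				bestPolicy, bestPriority = policy, s.priority
			}
		}
	}
	return bestPolicy
}

func main() {
	mapping := map[string][]priorityStream{
		"policy6": {
			{matches: map[string]string{"env": "prod"}, priority: 2},
			{matches: map[string]string{"team": "finance"}, priority: 4},
		},
		"policy7": {
			{matches: map[string]string{"env": "prod"}, priority: 3},
		},
	}
	// {env="prod", team="finance"}: several selectors match, priority 4 wins.
	fmt.Println(policyFor(map[string]string{"env": "prod", "team": "finance"}, mapping))
}
```

Running it prints `policy6`, consistent with the example above.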
DylanGuedes authored Feb 5, 2025
1 parent 2587f34 commit 5c8e832
Showing 15 changed files with 359 additions and 84 deletions.
14 changes: 14 additions & 0 deletions docs/sources/shared/configuration.md
@@ -3613,6 +3613,20 @@ otlp_config:
# CLI flag: -validation.enforced-labels
[enforced_labels: <list of strings> | default = []]

# Map of policies to stream selectors with a priority. Experimental.
# Example:
#  policy_stream_mapping:
#    finance:
#      - selectors: ['{namespace="prod", container="billing"}']
#        priority: 2
#    ops:
#      - selectors: ['{namespace="prod", container="ops"}']
#        priority: 1
#    staging:
#      - selectors: ['{namespace="staging"}', '{namespace="dev"}']
#        priority: 1
[policy_stream_mapping: <map of string to list of PriorityStreams>]

# The number of partitions a tenant's data should be sharded to when using kafka
# ingestion. Tenants are sharded across partitions using shuffle-sharding. 0
# disables shuffle sharding and tenant is sharded across all partitions.
1 change: 1 addition & 0 deletions pkg/compactor/retention/expiration.go
@@ -42,6 +42,7 @@ type Limits interface {
StreamRetention(userID string) []validation.StreamRetention
AllByUserID() map[string]*validation.Limits
DefaultLimits() *validation.Limits
PoliciesStreamMapping(userID string) validation.PolicyStreamMapping
}

func NewExpirationChecker(limits Limits) ExpirationChecker {
9 changes: 7 additions & 2 deletions pkg/compactor/retention/expiration_test.go
@@ -13,8 +13,9 @@ import (
)

type retentionLimit struct {
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
retentionPeriod time.Duration
streamRetention []validation.StreamRetention
policyStreamMapping validation.PolicyStreamMapping
}

func (r retentionLimit) convertToValidationLimit() *validation.Limits {
@@ -33,6 +34,10 @@ func (f fakeLimits) RetentionPeriod(userID string) time.Duration {
return f.perTenant[userID].retentionPeriod
}

func (f fakeLimits) PoliciesStreamMapping(_ string) validation.PolicyStreamMapping {
return f.perTenant["user0"].policyStreamMapping
}

func (f fakeLimits) StreamRetention(userID string) []validation.StreamRetention {
return f.perTenant[userID].streamRetention
}
48 changes: 27 additions & 21 deletions pkg/distributor/distributor.go
@@ -528,24 +528,24 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
d.truncateLines(validationContext, &stream)

var lbs labels.Labels
var retentionHours string
lbs, stream.Labels, stream.Hash, retentionHours, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
var retentionHours, policy string
lbs, stream.Labels, stream.Hash, retentionHours, policy, err = d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, retentionHours).Add(float64(len(stream.Entries)))
validation.DiscardedSamples.WithLabelValues(validation.InvalidLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID, retentionHours).Add(float64(discardedBytes))
validation.DiscardedBytes.WithLabelValues(validation.InvalidLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
continue
}

if missing, lbsMissing := d.missingEnforcedLabels(lbs, tenantID); missing {
err := fmt.Errorf(validation.MissingEnforcedLabelsErrorMsg, strings.Join(lbsMissing, ","), tenantID)
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours).Add(float64(len(stream.Entries)))
validation.DiscardedSamples.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(len(stream.Entries)))
discardedBytes := util.EntriesTotalSize(stream.Entries)
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours).Add(float64(discardedBytes))
validation.DiscardedBytes.WithLabelValues(validation.MissingEnforcedLabels, tenantID, retentionHours, policy).Add(float64(discardedBytes))
continue
}

@@ -554,7 +554,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
prevTs := stream.Entries[0].Timestamp

for _, entry := range stream.Entries {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, retentionHours); err != nil {
if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry, retentionHours, policy); err != nil {
d.writeFailuresManager.Log(tenantID, err)
validationErrors.Add(err)
continue
@@ -609,7 +609,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
}

n++
validationContext.validationMetrics.compute(entry, retentionHours)
validationContext.validationMetrics.compute(entry, retentionHours, policy)
pushSize += len(entry.Line)
}
stream.Entries = stream.Entries[:n]
@@ -647,10 +647,10 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
return nil, httpgrpc.Errorf(retStatusCode, "%s", err.Error())
}

if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.lineSize) {
if !d.ingestionRateLimiter.AllowN(now, tenantID, validationContext.validationMetrics.aggregatedPushStats.lineSize) {
d.trackDiscardedData(ctx, req, validationContext, tenantID, validationContext.validationMetrics, validation.RateLimited)

err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationContext.validationMetrics.lineCount, validationContext.validationMetrics.lineSize)
err = fmt.Errorf(validation.RateLimitedErrorMsg, tenantID, int(d.ingestionRateLimiter.Limit(now, tenantID)), validationContext.validationMetrics.aggregatedPushStats.lineCount, validationContext.validationMetrics.aggregatedPushStats.lineSize)
d.writeFailuresManager.Log(tenantID, err)
// Return a 429 to indicate to the client they are being rate limited
return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s", err.Error())
@@ -787,14 +787,16 @@ func (d *Distributor) trackDiscardedData(
validationMetrics validationMetrics,
reason string,
) {
for retentionHours, count := range validationMetrics.lineCountPerRetentionHours {
validation.DiscardedSamples.WithLabelValues(reason, tenantID, retentionHours).Add(float64(count))
validation.DiscardedBytes.WithLabelValues(reason, tenantID, retentionHours).Add(float64(validationMetrics.lineSizePerRetentionHours[retentionHours]))
for policy, retentionToStats := range validationMetrics.policyPushStats {
for retentionHours, stats := range retentionToStats {
validation.DiscardedSamples.WithLabelValues(reason, tenantID, retentionHours, policy).Add(float64(stats.lineCount))
validation.DiscardedBytes.WithLabelValues(reason, tenantID, retentionHours, policy).Add(float64(stats.lineSize))
}
}

if d.usageTracker != nil {
for _, stream := range req.Streams {
lbs, _, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
lbs, _, _, _, _, err := d.parseStreamLabels(validationContext, stream.Labels, stream)
if err != nil {
continue
}
@@ -1173,28 +1175,32 @@ type labelData struct {
hash uint64
}

func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, string, error) {
func (d *Distributor) parseStreamLabels(vContext validationContext, key string, stream logproto.Stream) (labels.Labels, string, uint64, string, string, error) {
mapping := d.validator.Limits.PoliciesStreamMapping(vContext.userID)
if val, ok := d.labelCache.Get(key); ok {
retentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, val.ls)
return val.ls, val.ls.String(), val.hash, retentionHours, nil
policy := mapping.PolicyFor(val.ls)
return val.ls, val.ls.String(), val.hash, retentionHours, policy, nil
}

ls, err := syntax.ParseLabels(key)
if err != nil {
tenantRetentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, nil)
return nil, "", 0, tenantRetentionHours, fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
retentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, nil)
// TODO: check for global policy.
return nil, "", 0, retentionHours, mapping.PolicyFor(nil), fmt.Errorf(validation.InvalidLabelsErrorMsg, key, err)
}

policy := mapping.PolicyFor(ls)
retentionHours := d.tenantsRetention.RetentionHoursFor(vContext.userID, ls)

if err := d.validator.ValidateLabels(vContext, ls, stream, retentionHours); err != nil {
return nil, "", 0, retentionHours, err
if err := d.validator.ValidateLabels(vContext, ls, stream, retentionHours, policy); err != nil {
return nil, "", 0, retentionHours, policy, err
}

lsHash := ls.Hash()

d.labelCache.Add(key, labelData{ls, lsHash})
return ls, ls.String(), lsHash, retentionHours, nil
return ls, ls.String(), lsHash, retentionHours, policy, nil
}

// shardCountFor returns the right number of shards to be used by the given stream.
63 changes: 61 additions & 2 deletions pkg/distributor/distributor_test.go
@@ -1233,7 +1233,7 @@ func Benchmark_SortLabelsOnPush(b *testing.B) {
for n := 0; n < b.N; n++ {
stream := request.Streams[0]
stream.Labels = `{buzz="f", a="b"}`
_, _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream)
_, _, _, _, _, err := d.parseStreamLabels(vCtx, stream.Labels, stream)
if err != nil {
panic("parseStreamLabels fail,err:" + err.Error())
}
@@ -1279,7 +1279,7 @@ func TestParseStreamLabels(t *testing.T) {
vCtx := d.validator.getValidationContextForTime(testTime, "123")

t.Run(tc.name, func(t *testing.T) {
lbs, lbsString, hash, _, err := d.parseStreamLabels(vCtx, tc.origLabels, logproto.Stream{
lbs, lbsString, hash, _, _, err := d.parseStreamLabels(vCtx, tc.origLabels, logproto.Stream{
Labels: tc.origLabels,
})
if tc.expectedErr != nil {
@@ -2063,3 +2063,62 @@ func TestDistributor_StructuredMetadataSanitization(t *testing.T) {
assert.Equal(t, tc.numSanitizations, testutil.ToFloat64(distributors[0].tenantPushSanitizedStructuredMetadata.WithLabelValues("test")))
}
}

func BenchmarkDistributor_PushWithPolicies(b *testing.B) {
baselineLimits := &validation.Limits{}
flagext.DefaultValues(baselineLimits)
lbs := `{foo="bar", env="prod", daz="baz", container="loki", pod="loki-0"}`

b.Run("push without policies", func(b *testing.B) {
limits := baselineLimits
limits.PolicyStreamMapping = make(validation.PolicyStreamMapping)
distributors, _ := prepare(&testing.T{}, 1, 3, limits, nil)
req := makeWriteRequestWithLabels(10, 10, []string{lbs}, false, false, false)
b.ResetTimer()
for i := 0; i < b.N; i++ {
distributors[0].Push(ctx, req) //nolint:errcheck
}
})

for numPolicies := 1; numPolicies <= 100; numPolicies *= 10 {
b.Run(fmt.Sprintf("push with %d policies", numPolicies), func(b *testing.B) {
limits := baselineLimits
limits.PolicyStreamMapping = make(validation.PolicyStreamMapping)
for i := 1; i <= numPolicies; i++ {
limits.PolicyStreamMapping[fmt.Sprintf("policy%d", i)] = []*validation.PriorityStream{
{
Selector: `{foo="bar"}`, Priority: i,
},
}
}

req := makeWriteRequestWithLabels(10, 10, []string{lbs}, false, false, false)
distributors, _ := prepare(&testing.T{}, 1, 3, limits, nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
distributors[0].Push(ctx, req) //nolint:errcheck
}
})
}

for numMatchers := 1; numMatchers <= 100; numMatchers *= 10 {
b.Run(fmt.Sprintf("push with %d matchers", numMatchers), func(b *testing.B) {
limits := baselineLimits
limits.PolicyStreamMapping = make(validation.PolicyStreamMapping)
for i := 1; i <= numMatchers; i++ {
limits.PolicyStreamMapping["policy0"] = append(limits.PolicyStreamMapping["policy0"], &validation.PriorityStream{
Selector: `{foo="bar"}`,
Matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")},
Priority: i,
})
}

req := makeWriteRequestWithLabels(10, 10, []string{lbs}, false, false, false)
distributors, _ := prepare(&testing.T{}, 1, 3, limits, nil)
b.ResetTimer()
for i := 0; i < b.N; i++ {
distributors[0].Push(ctx, req) //nolint:errcheck
}
})
}
}
40 changes: 27 additions & 13 deletions pkg/distributor/validation_metrics.go
@@ -5,26 +5,40 @@ import (
"github.com/grafana/loki/v3/pkg/util"
)

type pushStats struct {
lineSize int
lineCount int
}

type validationMetrics struct {
lineSizePerRetentionHours map[string]int
lineCountPerRetentionHours map[string]int
lineSize int
lineCount int
tenantRetentionHours string
policyPushStats map[string]map[string]pushStats // policy -> retentionHours -> pushStats
tenantRetentionHours string
aggregatedPushStats pushStats
}

func newValidationMetrics(tenantRetentionHours string) validationMetrics {
return validationMetrics{
lineSizePerRetentionHours: make(map[string]int),
lineCountPerRetentionHours: make(map[string]int),
tenantRetentionHours: tenantRetentionHours,
policyPushStats: make(map[string]map[string]pushStats),
tenantRetentionHours: tenantRetentionHours,
}
}

func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string) {
func (v *validationMetrics) compute(entry logproto.Entry, retentionHours string, policy string) {
if _, ok := v.policyPushStats[policy]; !ok {
v.policyPushStats[policy] = make(map[string]pushStats)
}

if _, ok := v.policyPushStats[policy][retentionHours]; !ok {
v.policyPushStats[policy][retentionHours] = pushStats{}
}

totalEntrySize := util.EntryTotalSize(&entry)
v.lineSizePerRetentionHours[retentionHours] += totalEntrySize
v.lineCountPerRetentionHours[retentionHours]++
v.lineSize += totalEntrySize
v.lineCount++

v.aggregatedPushStats.lineSize += totalEntrySize
v.aggregatedPushStats.lineCount++

stats := v.policyPushStats[policy][retentionHours]
stats.lineCount++
stats.lineSize += totalEntrySize
v.policyPushStats[policy][retentionHours] = stats
}
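
As an aside, the nested per-policy, per-retention aggregation above can be exercised in isolation. The following is a rough, standalone sketch: the `policyStats` type, the `record` method, and the sample retention strings are hypothetical, and the real code updates Prometheus counters in `trackDiscardedData` rather than printing:

```go
package main

import "fmt"

// pushStats mirrors the struct added above: a line count plus total size.
type pushStats struct {
	lineSize  int
	lineCount int
}

// policyStats mirrors validationMetrics.policyPushStats:
// stats keyed by policy name, then by retention period (hours, as a string).
type policyStats map[string]map[string]pushStats

func (p policyStats) record(lineSize int, retentionHours, policy string) {
	if _, ok := p[policy]; !ok {
		p[policy] = make(map[string]pushStats)
	}
	s := p[policy][retentionHours]
	s.lineCount++
	s.lineSize += lineSize
	p[policy][retentionHours] = s
}

func main() {
	stats := make(policyStats)
	stats.record(120, "744", "policy6") // 120-byte entry, 31-day retention
	stats.record(80, "744", "policy6")
	stats.record(50, "168", "") // entry that matched no policy

	// Read back the same way trackDiscardedData iterates the nested map.
	for policy, perRetention := range stats {
		for retention, s := range perRetention {
			fmt.Printf("policy=%q retention=%sh lines=%d bytes=%d\n",
				policy, retention, s.lineCount, s.lineSize)
		}
	}
}
```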