Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions pkg/sources/s3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,23 @@ type metricsCollector interface {
RecordObjectSkipped(bucket, reason string, sizeBytes float64)
RecordObjectError(bucket string)

// Bucket metrics.

RecordBucketListError(bucket, roleArn string)

// Role metrics.

RecordRoleScanned(roleArn string)
RecordBucketForRole(roleArn string)
}

type collector struct {
objectsScanned *prometheus.HistogramVec
objectsSkipped *prometheus.HistogramVec
objectsErrors *prometheus.CounterVec
rolesScanned *prometheus.GaugeVec
bucketsPerRole *prometheus.GaugeVec
objectsScanned *prometheus.HistogramVec
objectsSkipped *prometheus.HistogramVec
objectsErrors *prometheus.CounterVec
bucketListErrors *prometheus.CounterVec
rolesScanned *prometheus.GaugeVec
bucketsPerRole *prometheus.GaugeVec
}

var metricsInstance metricsCollector
Expand Down Expand Up @@ -58,6 +63,13 @@ func init() {
Help: "Total number of errors encountered during S3 scan",
}, []string{"bucket"}),

bucketListErrors: promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Name: "bucket_list_errors_total",
Help: "Total number of failures to list objects in an S3 bucket, by bucket and role",
}, []string{"bucket", "role_arn"}),

rolesScanned: promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: common.MetricsNamespace,
Subsystem: common.MetricsSubsystem,
Expand Down Expand Up @@ -86,6 +98,13 @@ func (c *collector) RecordObjectError(bucket string) {
c.objectsErrors.WithLabelValues(bucket).Inc()
}

func (c *collector) RecordBucketListError(bucket, roleArn string) {
if roleArn == "" {
roleArn = defaultRoleARN
}
c.bucketListErrors.WithLabelValues(bucket, roleArn).Inc()
}

const defaultRoleARN = "default"

func (c *collector) RecordRoleScanned(roleArn string) {
Expand Down
33 changes: 26 additions & 7 deletions pkg/sources/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,17 +285,21 @@ func determineResumePosition(ctx context.Context, tracker *Checkpointer, buckets
}
}

// scanBuckets scans the given buckets using the given role and adds the number
// of objects it scanned to totalObjectCount. The counter is owned by Chunks and
// shared across role passes so that the completion message reflects the whole
// scan, not just the last role's pass.
func (s *Source) scanBuckets(
ctx context.Context,
client *s3.Client,
role string,
bucketsToScan []string,
chunksChan chan *sources.Chunk,
totalObjectCount *uint64,
) {
if role != "" {
ctx = context.WithValue(ctx, "role", role)
}
var totalObjectCount uint64

checkpointer := NewCheckpointer(ctx, &s.Progress, false)
pos := determineResumePosition(ctx, checkpointer, bucketsToScan)
Expand Down Expand Up @@ -340,13 +344,13 @@ func (s *Source) scanBuckets(
}

objectCount := s.scanBucket(ctx, client, role, bucket, sources.ChanReporter{Ch: chunksChan}, startAfter, checkpointer)
totalObjectCount += objectCount
*totalObjectCount += objectCount
}

s.SetProgressComplete(
len(bucketsToScan),
len(bucketsToScan),
fmt.Sprintf("Completed scanning source %s. %d objects scanned.", s.name, totalObjectCount),
fmt.Sprintf("Completed scanning source %s. %d objects scanned.", s.name, *totalObjectCount),
"",
)
}
Expand Down Expand Up @@ -390,15 +394,19 @@ func (s *Source) scanBucket(
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
if role == "" {
ctx.Logger().Error(err, "could not list objects in bucket")
} else {
if s.listErrorsAreExpected(role) {
// Our documentation blesses specifying a role to assume without specifying buckets to scan, which will
// often cause this to happen a lot (because in that case the scanner tries to scan every bucket in the
// account, but the role probably doesn't have access to all of them). This makes it expected behavior
// and therefore not an error.
ctx.Logger().V(3).Info("could not list objects in bucket", "err", err)
} else {
// This can also be a failure to assume the role itself: role credentials
// are resolved lazily, so the first request that needs them surfaces the
// STS error here rather than at client construction.
ctx.Logger().Error(err, "could not list objects in bucket")
}
s.metricsCollector.RecordBucketListError(bucket, role)
break
}
pageMetadata := pageMetadata{
Expand All @@ -419,10 +427,21 @@ func (s *Source) scanBucket(
return objectCount
}

// listErrorsAreExpected reports whether a failure to list a bucket's objects
// should be suppressed rather than logged as an error. When a role is assumed
// without an explicit bucket list, the scanner attempts every bucket in the
// account and is expected to be denied on some of them. When buckets are
// explicitly configured, a listing failure means a configured target is being
// silently skipped, so it is always an error, even under an assumed role.
func (s *Source) listErrorsAreExpected(role string) bool {
return role != "" && len(s.conn.GetBuckets()) == 0
}

// Chunks emits chunks of bytes over a channel.
func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk, _ ...sources.ChunkingTarget) error {
var totalObjectCount uint64
visitor := func(c context.Context, defaultRegionClient *s3.Client, roleArn string, buckets []string) error {
s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan)
s.scanBuckets(c, defaultRegionClient, roleArn, buckets, chunksChan, &totalObjectCount)
return nil
}

Expand Down
70 changes: 70 additions & 0 deletions pkg/sources/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,76 @@ func TestSource_Init_IncludeAndIgnoreBucketsError(t *testing.T) {
assert.Error(t, err)
}

func TestSource_ListErrorsAreExpected(t *testing.T) {
tests := []struct {
name string
role string
buckets []string
want bool
}{
{
name: "no role, no explicit buckets",
want: false,
},
{
name: "no role, explicit buckets",
buckets: []string{"bucket-a"},
want: false,
},
{
name: "role without explicit buckets, denials are expected",
role: "arn:aws:iam::123456789012:role/some-role",
want: true,
},
{
name: "role with explicit buckets, denials are errors",
role: "arn:aws:iam::123456789012:role/some-role",
buckets: []string{"bucket-a"},
want: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
conn, err := anypb.New(&sourcespb.S3{
Credential: &sourcespb.S3_AccessKey{
AccessKey: &credentialspb.KeySecret{
Key: "ignored for test",
Secret: "ignored for test",
},
},
Buckets: tt.buckets,
Roles: []string{tt.role},
})
require.NoError(t, err)

s := Source{}
require.NoError(t, s.Init(context.Background(), "s3 test source", 0, 0, false, conn, 1))

assert.Equal(t, tt.want, s.listErrorsAreExpected(tt.role))
})
}
}

func TestSource_ScanBucketsReportsCumulativeObjectCount(t *testing.T) {
conn, err := anypb.New(&sourcespb.S3{
Credential: &sourcespb.S3_Unauthenticated{},
})
require.NoError(t, err)

s := Source{}
require.NoError(t, s.Init(context.Background(), "s3 test source", 0, 0, false, conn, 1))

// Simulate a later role pass after an earlier pass already scanned three
// objects. The pass below scans no buckets, so the completion message must
// still report the cumulative total rather than resetting to zero.
totalObjectCount := uint64(3)
s.scanBuckets(context.Background(), nil, "", nil, make(chan *sources.Chunk, 1), &totalObjectCount)

assert.Equal(t, uint64(3), totalObjectCount)
assert.Contains(t, s.Message, "3 objects scanned")
}

func TestSource_Chunks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
Expand Down
Loading