// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package keepstore
import (
"bufio"
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"os"
"regexp"
"sync/atomic"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"github.com/AdRoll/goamz/aws"
"github.com/AdRoll/goamz/s3"
"github.com/prometheus/client_golang/prometheus"
)
func init() {
- driver["S3"] = newS3Volume
+ driver["S3"] = chooseS3VolumeDriver
}
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
- v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
- err := json.Unmarshal(volume.DriverParameters, &v)
+ v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
+ err := json.Unmarshal(volume.DriverParameters, v)
if err != nil {
return nil, err
}
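+ // Derive a per-volume logger so every message carries the volume
+ // identity; individual log calls below no longer prepend "%s: ".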
+ v.logger = logger.WithField("Volume", v.String())
return v, v.check()
}
return errors.New("DriverParameters: RaceWindow must not be negative")
}
- var ok bool
- v.region, ok = aws.Regions[v.Region]
if v.Endpoint == "" {
+ r, ok := aws.Regions[v.Region]
if !ok {
return fmt.Errorf("unrecognized region %+q; try specifying endpoint instead", v.Region)
}
- } else if ok {
- return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
- "specify empty endpoint or use a different region name", v.Region, v.Endpoint)
+ v.region = r
} else {
v.region = aws.Region{
Name: v.Region,
// S3Volume implements Volume using an S3 bucket.
type S3Volume struct {
- AccessKey string
- SecretKey string
- AuthToken string // populated automatically when IAMRole is used
- AuthExpiration time.Time // populated automatically when IAMRole is used
- IAMRole string
- Endpoint string
- Region string
- Bucket string
- LocationConstraint bool
- IndexPageSize int
- ConnectTimeout arvados.Duration
- ReadTimeout arvados.Duration
- RaceWindow arvados.Duration
- UnsafeDelete bool
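+ // The externally configurable parameters formerly listed here
+ // are now defined once in the SDK config schema and embedded: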
+ arvados.S3VolumeDriverParameters
+ AuthToken string // populated automatically when IAMRole is used
+ AuthExpiration time.Time // populated automatically when IAMRole is used
cluster *arvados.Cluster
volume arvados.Volume
}
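+// A volume's DriverParameters unmarshal directly into the embedded
+// S3VolumeDriverParameters, e.g. (illustrative values only):
+//
+//   {"AccessKeyID": "...", "SecretAccessKey": "...",
+//    "Region": "us-east-1", "Bucket": "example-bucket",
+//    "PrefixLength": 3}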
func (v *S3Volume) bootstrapIAMCredentials() error {
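+ // Static credentials and IAMRole are mutually exclusive; when no
+ // static keys are configured, credentials are fetched from the
+ // instance metadata service below.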
- if v.AccessKey != "" || v.SecretKey != "" {
+ if v.AccessKeyID != "" || v.SecretAccessKey != "" {
if v.IAMRole != "" {
- return errors.New("invalid DriverParameters: AccessKey and SecretKey must be blank if IAMRole is specified")
+ return errors.New("invalid DriverParameters: AccessKeyID and SecretAccessKey must be blank if IAMRole is specified")
}
return nil
}
}
func (v *S3Volume) newS3Client() *s3.S3 {
- auth := aws.NewAuth(v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration)
+ auth := aws.NewAuth(v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration)
client := s3.New(*auth, v.region)
- if v.region.EC2Endpoint.Signer == aws.V4Signature {
- // Currently affects only eu-central-1
+ if !v.V2Signature {
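+ // V4 signatures are now the default; V2 is used only when the
+ // volume is explicitly configured for it.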
client.Signature = aws.V4Signature
}
client.ConnectTimeout = time.Duration(v.ConnectTimeout)
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound {
- return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKey and SecretKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
+ return 0, fmt.Errorf("this instance does not have an IAM role assigned -- either assign a role, or configure AccessKeyID and SecretAccessKey explicitly in DriverParameters (error getting %s: HTTP status %s)", url, resp.Status)
} else if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("error getting %s: HTTP status %s", url, resp.Status)
}
if err != nil {
return 0, fmt.Errorf("error decoding credentials from %s: %s", url, err)
}
- v.AccessKey, v.SecretKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
+ v.AccessKeyID, v.SecretAccessKey, v.AuthToken, v.AuthExpiration = cred.AccessKeyID, cred.SecretAccessKey, cred.Token, cred.Expiration
v.bucket.SetBucket(&s3.Bucket{
S3: v.newS3Client(),
Name: v.Bucket,
return ttl, nil
}
-func (v *S3Volume) getReaderWithContext(ctx context.Context, loc string) (rdr io.ReadCloser, err error) {
+func (v *S3Volume) getReaderWithContext(ctx context.Context, key string) (rdr io.ReadCloser, err error) {
ready := make(chan bool)
go func() {
- rdr, err = v.getReader(loc)
+ rdr, err = v.getReader(key)
close(ready)
}()
select {
case <-ready:
return
case <-ctx.Done():
- v.logger.Debugf("s3: abandoning getReader(): %s", ctx.Err())
+ v.logger.Debugf("s3: abandoning getReader(%s): %s", key, ctx.Err())
go func() {
<-ready
if err == nil {
// In situations where (Bucket)GetReader would fail because the block
// disappeared in a Trash race, getReader calls fixRace to recover the
// data, and tries again.
-func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
- rdr, err = v.bucket.GetReader(loc)
+func (v *S3Volume) getReader(key string) (rdr io.ReadCloser, err error) {
+ rdr, err = v.bucket.GetReader(key)
err = v.translateError(err)
if err == nil || !os.IsNotExist(err) {
return
}
- _, err = v.bucket.Head("recent/"+loc, nil)
+ _, err = v.bucket.Head("recent/"+key, nil)
err = v.translateError(err)
if err != nil {
// If we can't read recent/X, there's no point in
// trying fixRace. Give up.
return
}
- if !v.fixRace(loc) {
+ if !v.fixRace(key) {
err = os.ErrNotExist
return
}
- rdr, err = v.bucket.GetReader(loc)
+ rdr, err = v.bucket.GetReader(key)
if err != nil {
- log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+ v.logger.Warnf("reading %s after successful fixRace: %s", key, err)
err = v.translateError(err)
}
return
// Get a block: copy the block data into buf, and return the number of
// bytes copied.
func (v *S3Volume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
- rdr, err := v.getReaderWithContext(ctx, loc)
+ key := v.key(loc)
+ rdr, err := v.getReaderWithContext(ctx, key)
if err != nil {
return 0, err
}
// Compare the given data with the stored data.
func (v *S3Volume) Compare(ctx context.Context, loc string, expect []byte) error {
+ key := v.key(loc)
errChan := make(chan error, 1)
go func() {
- _, err := v.bucket.Head("recent/"+loc, nil)
+ _, err := v.bucket.Head("recent/"+key, nil)
errChan <- err
}()
var err error
// problem on to our clients.
return v.translateError(err)
}
- rdr, err := v.getReaderWithContext(ctx, loc)
+ rdr, err := v.getReaderWithContext(ctx, key)
if err != nil {
return err
}
opts.ContentSHA256 = fmt.Sprintf("%x", sha256.Sum256(block))
}
+ key := v.key(loc)
+
// Send the block data through a pipe, so that (if we need to)
// we can close the pipe early and abandon our PutReader()
// goroutine, without worrying about PutReader() accessing our
go func() {
defer func() {
if ctx.Err() != nil {
- v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
}
}()
defer close(ready)
- err = v.bucket.PutReader(loc, bufr, int64(size), "application/octet-stream", s3ACL, opts)
+ err = v.bucket.PutReader(key, bufr, int64(size), "application/octet-stream", s3ACL, opts)
if err != nil {
return
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
}()
select {
case <-ctx.Done():
- v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
// Our pipe might be stuck in Write(), waiting for
// PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
go io.Copy(ioutil.Discard, bufr)
// CloseWithError() will return once pending I/O is done.
bufw.CloseWithError(ctx.Err())
- v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+ v.logger.Debugf("abandoning PutReader goroutine")
return ctx.Err()
case <-ready:
// Unblock pipe in case PutReader did not consume it.
if v.volume.ReadOnly {
return MethodDisabledError
}
- _, err := v.bucket.Head(loc, nil)
+ key := v.key(loc)
+ _, err := v.bucket.Head(key, nil)
err = v.translateError(err)
- if os.IsNotExist(err) && v.fixRace(loc) {
+ if os.IsNotExist(err) && v.fixRace(key) {
// The data object got trashed in a race, but fixRace
// rescued it.
} else if err != nil {
return err
}
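+ // Touching a block means rewriting its zero-length "recent/X"
+ // marker; the data object itself is never modified.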
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
// Mtime returns the stored timestamp for the given locator.
func (v *S3Volume) Mtime(loc string) (time.Time, error) {
- _, err := v.bucket.Head(loc, nil)
+ key := v.key(loc)
+ _, err := v.bucket.Head(key, nil)
if err != nil {
return zeroTime, v.translateError(err)
}
- resp, err := v.bucket.Head("recent/"+loc, nil)
+ resp, err := v.bucket.Head("recent/"+key, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// The data object X exists, but recent/X is missing.
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("error: creating %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
return zeroTime, v.translateError(err)
}
- log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
- resp, err = v.bucket.Head("recent/"+loc, nil)
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+key)
+ resp, err = v.bucket.Head("recent/"+key, nil)
if err != nil {
- log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
return zeroTime, v.translateError(err)
}
} else if err != nil {
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
- Prefix: prefix,
+ Prefix: v.key(prefix),
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
- Prefix: "recent/" + prefix,
+ Prefix: "recent/" + v.key(prefix),
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
// over all of them needlessly with dataL.
break
}
- if !v.isKeepBlock(data.Key) {
+ loc, isBlk := v.isKeepBlock(data.Key)
+ if !isBlk {
continue
}
if err != nil {
return err
}
- fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
+ // We truncate sub-second precision here. Otherwise
+ // timestamps will never match the RFC1123-formatted
+ // Last-Modified values parsed by Mtime().
+ fmt.Fprintf(writer, "%s+%d %d\n", loc, data.Size, t.Unix()*1000000000)
}
return dataL.Error()
}
} else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
return nil
}
+ key := v.key(loc)
if v.cluster.Collections.BlobTrashLifetime == 0 {
if !v.UnsafeDelete {
return ErrS3TrashDisabled
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
- err := v.checkRaceWindow(loc)
+ err := v.checkRaceWindow(key)
if err != nil {
return err
}
- err = v.safeCopy("trash/"+loc, loc)
+ err = v.safeCopy("trash/"+key, key)
if err != nil {
return err
}
- return v.translateError(v.bucket.Del(loc))
+ return v.translateError(v.bucket.Del(key))
}
-// checkRaceWindow returns a non-nil error if trash/loc is, or might
-// be, in the race window (i.e., it's not safe to trash loc).
-func (v *S3Volume) checkRaceWindow(loc string) error {
- resp, err := v.bucket.Head("trash/"+loc, nil)
+// checkRaceWindow returns a non-nil error if trash/key is, or might
+// be, in the race window (i.e., it's not safe to trash key).
+func (v *S3Volume) checkRaceWindow(key string) error {
+ resp, err := v.bucket.Head("trash/"+key, nil)
err = v.translateError(err)
if os.IsNotExist(err) {
// OK, trash/X doesn't exist so we're not in the race
// trash/X's lifetime. The new timestamp might not
// become visible until now+raceWindow, and EmptyTrash
// is allowed to delete trash/X before then.
- return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
+ return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
}
// trash/X exists, but it won't be eligible for deletion until
// after now+raceWindow, so it's safe to overwrite it.
// Untrash moves block from trash back into store
func (v *S3Volume) Untrash(loc string) error {
- err := v.safeCopy(loc, "trash/"+loc)
+ key := v.key(loc)
+ err := v.safeCopy(key, "trash/"+key)
if err != nil {
return err
}
- err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
+ err = v.bucket.PutReader("recent/"+key, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
return v.translateError(err)
}
var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
-func (v *S3Volume) isKeepBlock(s string) bool {
- return s3KeepBlockRegexp.MatchString(s)
+func (v *S3Volume) isKeepBlock(s string) (string, bool) {
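+ // With PrefixLength==3 a block is stored under a key like
+ // "abc/abcdef...", where the prefix repeats the first characters
+ // of the hash; strip the prefix directory before matching.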
+ if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
+ s = s[v.PrefixLength+1:]
+ }
+ return s, s3KeepBlockRegexp.MatchString(s)
+}
+
+// key returns the object key used for a given loc. If
+// PrefixLength==0 then key("abcdef0123") is "abcdef0123"; if
+// PrefixLength==3 then it is "abc/abcdef0123", and so on.
+func (v *S3Volume) key(loc string) string {
+ if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
+ return loc[:v.PrefixLength] + "/" + loc
+ } else {
+ return loc
+ }
}
// fixRace(X) is called when "recent/X" exists but "X" doesn't
-// exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
-// there was a race between Put and Trash, fixRace recovers from the
-// race by Untrashing the block.
-func (v *S3Volume) fixRace(loc string) bool {
- trash, err := v.bucket.Head("trash/"+loc, nil)
+// exist. If the timestamps on "recent/X" and "trash/X" indicate there
+// was a race between Put and Trash, fixRace recovers from the race by
+// Untrashing the block.
+func (v *S3Volume) fixRace(key string) bool {
+ trash, err := v.bucket.Head("trash/"+key, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
}
return false
}
trashTime, err := v.lastModified(trash)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
return false
}
- recent, err := v.bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+key, nil)
if err != nil {
- log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
return false
}
recentTime, err := v.lastModified(recent)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
return false
}
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
- err = v.safeCopy(loc, "trash/"+loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
+ err = v.safeCopy(key, "trash/"+key)
if err != nil {
- log.Printf("error: fixRace: %s", err)
+ v.logger.WithError(err).Error("fixRace: copy failed")
return false
}
return true
startT := time.Now()
emptyOneKey := func(trash *s3.Key) {
- loc := trash.Key[6:]
- if !v.isKeepBlock(loc) {
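+ // Strip the "trash/" prefix, then recover the bare locator
+ // from the (possibly prefix-sharded) key.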
+ key := trash.Key[6:]
+ loc, isBlk := v.isKeepBlock(key)
+ if !isBlk {
return
}
atomic.AddInt64(&bytesInTrash, trash.Size)
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+ v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
return
}
- recent, err := v.bucket.Head("recent/"+loc, nil)
+ recent, err := v.bucket.Head("recent/"+key, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
if err != nil {
- log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
}
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
return
}
recentT, err := v.lastModified(recent)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+key, recent.Header.Get("Last-Modified"))
return
}
if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
// Note this means (TrashSweepInterval
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
- log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
- v.fixRace(loc)
+ v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
+ v.fixRace(key)
v.Touch(loc)
return
}
- _, err := v.bucket.Head(loc, nil)
+ _, err := v.bucket.Head(key, nil)
if os.IsNotExist(err) {
- log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
- v.fixRace(loc)
+ v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
+ v.fixRace(key)
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
}
}
err = v.bucket.Del(trash.Key)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
return
}
atomic.AddInt64(&bytesDeleted, trash.Size)
atomic.AddInt64(&blocksDeleted, 1)
- _, err = v.bucket.Head(loc, nil)
+ _, err = v.bucket.Head(key, nil)
if err == nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", key, key)
return
}
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
return
}
- err = v.bucket.Del("recent/" + loc)
+ err = v.bucket.Del("recent/" + key)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
}
}
}
trashL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "trash/",
PageSize: v.IndexPageSize,
wg.Wait()
if err := trashL.Error(); err != nil {
- log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+ v.logger.WithError(err).Error("EmptyTrash: lister failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type s3Lister struct {
+ Logger logrus.FieldLogger
Bucket *s3.Bucket
Prefix string
PageSize int
lister.buf = make([]s3.Key, 0, len(resp.Contents))
for _, key := range resp.Contents {
if !strings.HasPrefix(key.Key, lister.Prefix) {
- log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+ lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
continue
}
lister.buf = append(lister.buf, key)