Merge branch '21705-go-deps'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "net/url"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "github.com/aws/aws-sdk-go-v2/aws"
26         v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
27         "github.com/aws/aws-sdk-go-v2/config"
28         "github.com/aws/aws-sdk-go-v2/credentials"
29         "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/types"
32         "github.com/aws/smithy-go"
33         "github.com/prometheus/client_golang/prometheus"
34         "github.com/sirupsen/logrus"
35 )
36
37 func init() {
38         driver["S3"] = news3Volume
39 }
40
41 const (
42         s3DefaultReadTimeout        = arvados.Duration(10 * time.Minute)
43         s3DefaultConnectTimeout     = arvados.Duration(time.Minute)
44         maxClockSkew                = 600 * time.Second
45         nearlyRFC1123               = "Mon, 2 Jan 2006 15:04:05 GMT"
46         s3downloaderPartSize        = 6 * 1024 * 1024
47         s3downloaderReadConcurrency = 11
48         s3uploaderPartSize          = 5 * 1024 * 1024
49         s3uploaderWriteConcurrency  = 5
50 )
51
52 var (
53         errS3TrashDisabled        = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
54         s3AWSKeepBlockRegexp      = regexp.MustCompile(`^[0-9a-f]{32}$`)
55         s3AWSZeroTime             time.Time
56         defaultEndpointResolverV2 = s3.NewDefaultEndpointResolverV2()
57
58         // Returned by an aws.EndpointResolverWithOptions to indicate
59         // that the default resolver should be used.
60         errEndpointNotOverridden = &aws.EndpointNotFoundError{Err: errors.New("endpoint not overridden")}
61 )
62
63 // s3Volume implements Volume using an S3 bucket.
64 type s3Volume struct {
65         arvados.S3VolumeDriverParameters
66         AuthToken      string    // populated automatically when IAMRole is used
67         AuthExpiration time.Time // populated automatically when IAMRole is used
68
69         cluster    *arvados.Cluster
70         volume     arvados.Volume
71         logger     logrus.FieldLogger
72         metrics    *volumeMetricsVecs
73         bufferPool *bufferPool
74         bucket     *s3Bucket
75         region     string
76         startOnce  sync.Once
77
78         overrideEndpoint *aws.Endpoint
79         usePathStyle     bool // used by test suite
80 }
81
82 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
83 // wrapped bucket can be replaced atomically with SetBucket in order
84 // to update credentials.
85 type s3Bucket struct {
86         bucket string
87         svc    *s3.Client
88         stats  s3awsbucketStats
89         mu     sync.Mutex
90 }
91
92 func (v *s3Volume) isKeepBlock(s string) (string, bool) {
93         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
94                 s = s[v.PrefixLength+1:]
95         }
96         return s, s3AWSKeepBlockRegexp.MatchString(s)
97 }
98
99 // Return the key used for a given loc. If PrefixLength==0 then
100 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
101 // "abc/abcdef0123", etc.
102 func (v *s3Volume) key(loc string) string {
103         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
104                 return loc[:v.PrefixLength] + "/" + loc
105         } else {
106                 return loc
107         }
108 }
109
110 func news3Volume(params newVolumeParams) (volume, error) {
111         v := &s3Volume{
112                 cluster:    params.Cluster,
113                 volume:     params.ConfigVolume,
114                 metrics:    params.MetricsVecs,
115                 bufferPool: params.BufferPool,
116         }
117         err := json.Unmarshal(params.ConfigVolume.DriverParameters, v)
118         if err != nil {
119                 return nil, err
120         }
121         v.logger = params.Logger.WithField("Volume", v.DeviceID())
122         return v, v.check("")
123 }
124
125 func (v *s3Volume) translateError(err error) error {
126         if cerr := (interface{ CanceledError() bool })(nil); errors.As(err, &cerr) && cerr.CanceledError() {
127                 // *aws.RequestCanceledError and *smithy.CanceledError
128                 // implement this interface.
129                 return context.Canceled
130         }
131         var aerr smithy.APIError
132         if errors.As(err, &aerr) {
133                 switch aerr.ErrorCode() {
134                 case "NotFound", "NoSuchKey":
135                         return os.ErrNotExist
136                 }
137         }
138         return err
139 }
140
141 // safeCopy calls CopyObjectRequest, and checks the response to make
142 // sure the copy succeeded and updated the timestamp on the
143 // destination object
144 //
145 // (If something goes wrong during the copy, the error will be
146 // embedded in the 200 OK response)
147 func (v *s3Volume) safeCopy(dst, src string) error {
148         input := &s3.CopyObjectInput{
149                 Bucket:      aws.String(v.bucket.bucket),
150                 ContentType: aws.String("application/octet-stream"),
151                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
152                 Key:         aws.String(dst),
153         }
154
155         resp, err := v.bucket.svc.CopyObject(context.Background(), input)
156
157         err = v.translateError(err)
158         if os.IsNotExist(err) {
159                 return err
160         } else if err != nil {
161                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
162         } else if resp.CopyObjectResult.LastModified == nil {
163                 return fmt.Errorf("PutCopy(%q ← %q): succeeded but did not return a timestamp", dst, v.bucket.bucket+"/"+src)
164         } else if skew := time.Now().UTC().Sub(*resp.CopyObjectResult.LastModified); skew > maxClockSkew {
165                 return fmt.Errorf("PutCopy succeeded but returned old timestamp %s (skew %v > max %v, now %s)", resp.CopyObjectResult.LastModified, skew, maxClockSkew, time.Now())
166         }
167         return nil
168 }
169
170 func (v *s3Volume) check(ec2metadataHostname string) error {
171         if v.Bucket == "" {
172                 return errors.New("DriverParameters: Bucket must be provided")
173         }
174         if v.IndexPageSize == 0 {
175                 v.IndexPageSize = 1000
176         }
177         if v.RaceWindow < 0 {
178                 return errors.New("DriverParameters: RaceWindow must not be negative")
179         }
180
181         if v.V2Signature {
182                 return errors.New("DriverParameters: V2Signature is not supported")
183         }
184
185         if v.Endpoint == "" && v.Region == "" {
186                 return fmt.Errorf("AWS region or endpoint must be specified")
187         } else if v.Endpoint != "" {
188                 _, err := url.Parse(v.Endpoint)
189                 if err != nil {
190                         return fmt.Errorf("error parsing custom S3 endpoint %q: %w", v.Endpoint, err)
191                 }
192                 v.overrideEndpoint = &aws.Endpoint{
193                         URL:               v.Endpoint,
194                         HostnameImmutable: true,
195                         Source:            aws.EndpointSourceCustom,
196                 }
197         }
198         if v.Region == "" {
199                 // Endpoint is already specified (otherwise we would
200                 // have errored out above), but Region is also
201                 // required by the aws sdk, in order to determine
202                 // SignatureVersions.
203                 v.Region = "us-east-1"
204         }
205
206         // Zero timeouts mean "wait forever", which is a bad
207         // default. Default to long timeouts instead.
208         if v.ConnectTimeout == 0 {
209                 v.ConnectTimeout = s3DefaultConnectTimeout
210         }
211         if v.ReadTimeout == 0 {
212                 v.ReadTimeout = s3DefaultReadTimeout
213         }
214
215         cfg, err := config.LoadDefaultConfig(context.TODO(),
216                 config.WithRegion(v.Region),
217                 config.WithCredentialsCacheOptions(func(o *aws.CredentialsCacheOptions) {
218                         // (from aws-sdk-go-v2 comments) "allow the
219                         // credentials to trigger refreshing prior to
220                         // the credentials actually expiring. This is
221                         // beneficial so race conditions with expiring
222                         // credentials do not cause request to fail
223                         // unexpectedly due to ExpiredTokenException
224                         // exceptions."
225                         //
226                         // (from
227                         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
228                         // "We make new credentials available at least
229                         // five minutes before the expiration of the
230                         // old credentials."
231                         o.ExpiryWindow = 5 * time.Minute
232                 }),
233                 func(o *config.LoadOptions) error {
234                         if v.AccessKeyID == "" && v.SecretAccessKey == "" {
235                                 // Use default sdk behavior (IAM / IMDS)
236                                 return nil
237                         }
238                         v.logger.Debug("using static credentials")
239                         o.Credentials = credentials.StaticCredentialsProvider{
240                                 Value: aws.Credentials{
241                                         AccessKeyID:     v.AccessKeyID,
242                                         SecretAccessKey: v.SecretAccessKey,
243                                         Source:          "Arvados configuration",
244                                 },
245                         }
246                         return nil
247                 },
248                 func(o *config.LoadOptions) error {
249                         if ec2metadataHostname != "" {
250                                 o.EC2IMDSEndpoint = ec2metadataHostname
251                         }
252                         if v.overrideEndpoint != nil {
253                                 o.EndpointResolverWithOptions = aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
254                                         if service == "S3" {
255                                                 return *v.overrideEndpoint, nil
256                                         }
257                                         return aws.Endpoint{}, errEndpointNotOverridden // use default resolver
258                                 })
259                         }
260                         return nil
261                 },
262         )
263         if err != nil {
264                 return fmt.Errorf("error loading aws client config: %w", err)
265         }
266
267         v.bucket = &s3Bucket{
268                 bucket: v.Bucket,
269                 svc: s3.NewFromConfig(cfg, func(o *s3.Options) {
270                         if v.usePathStyle {
271                                 o.UsePathStyle = true
272                         }
273                 }),
274         }
275
276         // Set up prometheus metrics
277         lbls := prometheus.Labels{"device_id": v.DeviceID()}
278         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
279
280         return nil
281 }
282
283 // DeviceID returns a globally unique ID for the storage bucket.
284 func (v *s3Volume) DeviceID() string {
285         return "s3://" + v.Endpoint + "/" + v.Bucket
286 }
287
288 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
289 // and deletes them from the volume.
290 func (v *s3Volume) EmptyTrash() {
291         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
292
293         // Define "ready to delete" as "...when EmptyTrash started".
294         startT := time.Now()
295
296         emptyOneKey := func(trash *types.Object) {
297                 key := strings.TrimPrefix(*trash.Key, "trash/")
298                 loc, isblk := v.isKeepBlock(key)
299                 if !isblk {
300                         return
301                 }
302                 atomic.AddInt64(&bytesInTrash, *trash.Size)
303                 atomic.AddInt64(&blocksInTrash, 1)
304
305                 trashT := *trash.LastModified
306                 recent, err := v.head("recent/" + key)
307                 if err != nil && os.IsNotExist(v.translateError(err)) {
308                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
309                         err = v.BlockUntrash(loc)
310                         if err != nil {
311                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
312                         }
313                         return
314                 } else if err != nil {
315                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
316                         return
317                 }
318                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
319                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
320                                 // recent/key is too old to protect
321                                 // loc from being Trashed again during
322                                 // the raceWindow that starts if we
323                                 // delete trash/X now.
324                                 //
325                                 // Note this means (TrashSweepInterval
326                                 // < BlobSigningTTL - raceWindow) is
327                                 // necessary to avoid starvation.
328                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
329                                 v.fixRace(key)
330                                 v.BlockTouch(loc)
331                                 return
332                         }
333                         _, err := v.head(key)
334                         if os.IsNotExist(err) {
335                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
336                                 v.fixRace(key)
337                                 return
338                         } else if err != nil {
339                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
340                                 return
341                         }
342                 }
343                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
344                         return
345                 }
346                 err = v.bucket.Del(*trash.Key)
347                 if err != nil {
348                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
349                         return
350                 }
351                 atomic.AddInt64(&bytesDeleted, *trash.Size)
352                 atomic.AddInt64(&blocksDeleted, 1)
353
354                 _, err = v.head(*trash.Key)
355                 if err == nil {
356                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
357                         return
358                 }
359                 if !os.IsNotExist(v.translateError(err)) {
360                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
361                         return
362                 }
363                 err = v.bucket.Del("recent/" + key)
364                 if err != nil {
365                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
366                 }
367         }
368
369         var wg sync.WaitGroup
370         todo := make(chan *types.Object, v.cluster.Collections.BlobDeleteConcurrency)
371         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
372                 wg.Add(1)
373                 go func() {
374                         defer wg.Done()
375                         for key := range todo {
376                                 emptyOneKey(key)
377                         }
378                 }()
379         }
380
381         trashL := s3awsLister{
382                 Logger:   v.logger,
383                 Bucket:   v.bucket,
384                 Prefix:   "trash/",
385                 PageSize: v.IndexPageSize,
386                 Stats:    &v.bucket.stats,
387         }
388         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
389                 todo <- trash
390         }
391         close(todo)
392         wg.Wait()
393
394         if err := trashL.Error(); err != nil {
395                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
396         }
397         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
398 }
399
400 // fixRace(X) is called when "recent/X" exists but "X" doesn't
401 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
402 // was a race between Put and Trash, fixRace recovers from the race by
403 // Untrashing the block.
404 func (v *s3Volume) fixRace(key string) bool {
405         trash, err := v.head("trash/" + key)
406         if err != nil {
407                 if !os.IsNotExist(v.translateError(err)) {
408                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
409                 }
410                 return false
411         }
412
413         recent, err := v.head("recent/" + key)
414         if err != nil {
415                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
416                 return false
417         }
418
419         recentTime := *recent.LastModified
420         trashTime := *trash.LastModified
421         ageWhenTrashed := trashTime.Sub(recentTime)
422         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
423                 // No evidence of a race: block hasn't been written
424                 // since it became eligible for Trash. No fix needed.
425                 return false
426         }
427
428         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
429         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
430         err = v.safeCopy(key, "trash/"+key)
431         if err != nil {
432                 v.logger.WithError(err).Error("fixRace: copy failed")
433                 return false
434         }
435         return true
436 }
437
438 func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
439         input := &s3.HeadObjectInput{
440                 Bucket: aws.String(v.bucket.bucket),
441                 Key:    aws.String(key),
442         }
443
444         res, err := v.bucket.svc.HeadObject(context.Background(), input)
445
446         v.bucket.stats.TickOps("head")
447         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
448         v.bucket.stats.TickErr(err)
449
450         if err != nil {
451                 return nil, v.translateError(err)
452         }
453         return res, nil
454 }
455
456 // BlockRead reads a Keep block that has been stored as a block blob
457 // in the S3 bucket.
458 func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
459         key := v.key(hash)
460         err := v.readWorker(ctx, key, w)
461         if err != nil {
462                 err = v.translateError(err)
463                 if !os.IsNotExist(err) {
464                         return err
465                 }
466
467                 _, err = v.head("recent/" + key)
468                 err = v.translateError(err)
469                 if err != nil {
470                         // If we can't read recent/X, there's no point in
471                         // trying fixRace. Give up.
472                         return err
473                 }
474                 if !v.fixRace(key) {
475                         err = os.ErrNotExist
476                         return err
477                 }
478
479                 err = v.readWorker(ctx, key, w)
480                 if err != nil {
481                         v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
482                         err = v.translateError(err)
483                         return err
484                 }
485         }
486         return nil
487 }
488
489 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
490         downloader := manager.NewDownloader(v.bucket.svc, func(u *manager.Downloader) {
491                 u.PartSize = s3downloaderPartSize
492                 u.Concurrency = s3downloaderReadConcurrency
493         })
494         count, err := downloader.Download(ctx, dst, &s3.GetObjectInput{
495                 Bucket: aws.String(v.bucket.bucket),
496                 Key:    aws.String(key),
497         })
498         v.bucket.stats.TickOps("get")
499         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
500         v.bucket.stats.TickErr(err)
501         v.bucket.stats.TickInBytes(uint64(count))
502         return v.translateError(err)
503 }
504
505 func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
506         if r == nil {
507                 // r == nil leads to a memory violation in func readFillBuf in
508                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
509                 r = bytes.NewReader(nil)
510         }
511
512         uploadInput := s3.PutObjectInput{
513                 Bucket: aws.String(v.bucket.bucket),
514                 Key:    aws.String(key),
515                 Body:   r,
516         }
517
518         if loc, ok := v.isKeepBlock(key); ok {
519                 var contentMD5 string
520                 md5, err := hex.DecodeString(loc)
521                 if err != nil {
522                         return v.translateError(err)
523                 }
524                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
525                 uploadInput.ContentMD5 = &contentMD5
526         }
527
528         // Experimentation indicated that using concurrency 5 yields the best
529         // throughput, better than higher concurrency (10 or 13) by ~5%.
530         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
531         // is detrimental to throughput (minus ~15%).
532         uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
533                 u.PartSize = s3uploaderPartSize
534                 u.Concurrency = s3uploaderWriteConcurrency
535         })
536
537         _, err := uploader.Upload(ctx, &uploadInput,
538                 // Avoid precomputing SHA256 before sending.
539                 manager.WithUploaderRequestOptions(s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)),
540         )
541
542         v.bucket.stats.TickOps("put")
543         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
544         v.bucket.stats.TickErr(err)
545
546         return v.translateError(err)
547 }
548
549 // Put writes a block.
550 func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
551         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
552         // sdk to avoid memory allocation there. See #17339 for more information.
553         rdr := bytes.NewReader(data)
554         r := newCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
555         key := v.key(hash)
556         err := v.writeObject(ctx, key, r)
557         if err != nil {
558                 return err
559         }
560         return v.writeObject(ctx, "recent/"+key, nil)
561 }
562
563 type s3awsLister struct {
564         Logger            logrus.FieldLogger
565         Bucket            *s3Bucket
566         Prefix            string
567         PageSize          int
568         Stats             *s3awsbucketStats
569         ContinuationToken string
570         buf               []types.Object
571         err               error
572 }
573
574 // First fetches the first page and returns the first item. It returns
575 // nil if the response is the empty set or an error occurs.
576 func (lister *s3awsLister) First() *types.Object {
577         lister.getPage()
578         return lister.pop()
579 }
580
581 // Next returns the next item, fetching the next page if necessary. It
582 // returns nil if the last available item has already been fetched, or
583 // an error occurs.
584 func (lister *s3awsLister) Next() *types.Object {
585         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
586                 lister.getPage()
587         }
588         return lister.pop()
589 }
590
591 // Return the most recent error encountered by First or Next.
592 func (lister *s3awsLister) Error() error {
593         return lister.err
594 }
595
596 func (lister *s3awsLister) getPage() {
597         lister.Stats.TickOps("list")
598         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
599
600         var input *s3.ListObjectsV2Input
601         if lister.ContinuationToken == "" {
602                 input = &s3.ListObjectsV2Input{
603                         Bucket:  aws.String(lister.Bucket.bucket),
604                         MaxKeys: aws.Int32(int32(lister.PageSize)),
605                         Prefix:  aws.String(lister.Prefix),
606                 }
607         } else {
608                 input = &s3.ListObjectsV2Input{
609                         Bucket:            aws.String(lister.Bucket.bucket),
610                         MaxKeys:           aws.Int32(int32(lister.PageSize)),
611                         Prefix:            aws.String(lister.Prefix),
612                         ContinuationToken: &lister.ContinuationToken,
613                 }
614         }
615
616         resp, err := lister.Bucket.svc.ListObjectsV2(context.Background(), input)
617         if err != nil {
618                 var aerr smithy.APIError
619                 if errors.As(err, &aerr) {
620                         lister.err = aerr
621                 } else {
622                         lister.err = err
623                 }
624                 return
625         }
626
627         if *resp.IsTruncated {
628                 lister.ContinuationToken = *resp.NextContinuationToken
629         } else {
630                 lister.ContinuationToken = ""
631         }
632         lister.buf = make([]types.Object, 0, len(resp.Contents))
633         for _, key := range resp.Contents {
634                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
635                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
636                         continue
637                 }
638                 lister.buf = append(lister.buf, key)
639         }
640 }
641
642 func (lister *s3awsLister) pop() (k *types.Object) {
643         if len(lister.buf) > 0 {
644                 k = &lister.buf[0]
645                 lister.buf = lister.buf[1:]
646         }
647         return
648 }
649
650 // Index writes a complete list of locators with the given prefix
651 // for which Get() can retrieve data.
652 func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
653         prefix = v.key(prefix)
654         // Use a merge sort to find matching sets of X and recent/X.
655         dataL := s3awsLister{
656                 Logger:   v.logger,
657                 Bucket:   v.bucket,
658                 Prefix:   prefix,
659                 PageSize: v.IndexPageSize,
660                 Stats:    &v.bucket.stats,
661         }
662         recentL := s3awsLister{
663                 Logger:   v.logger,
664                 Bucket:   v.bucket,
665                 Prefix:   "recent/" + prefix,
666                 PageSize: v.IndexPageSize,
667                 Stats:    &v.bucket.stats,
668         }
669         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
670                 if ctx.Err() != nil {
671                         return ctx.Err()
672                 }
673                 if *data.Key >= "g" {
674                         // Conveniently, "recent/*" and "trash/*" are
675                         // lexically greater than all hex-encoded data
676                         // hashes, so stopping here avoids iterating
677                         // over all of them needlessly with dataL.
678                         break
679                 }
680                 loc, isblk := v.isKeepBlock(*data.Key)
681                 if !isblk {
682                         continue
683                 }
684
685                 // stamp is the list entry we should use to report the
686                 // last-modified time for this data block: it will be
687                 // the recent/X entry if one exists, otherwise the
688                 // entry for the data block itself.
689                 stamp := data
690
691                 // Advance to the corresponding recent/X marker, if any
692                 for recent != nil && recentL.Error() == nil {
693                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
694                                 recent = recentL.Next()
695                                 continue
696                         } else if cmp == 0 {
697                                 stamp = recent
698                                 recent = recentL.Next()
699                                 break
700                         } else {
701                                 // recent/X marker is missing: we'll
702                                 // use the timestamp on the data
703                                 // object.
704                                 break
705                         }
706                 }
707                 if err := recentL.Error(); err != nil {
708                         return err
709                 }
710                 // We truncate sub-second precision here. Otherwise
711                 // timestamps will never match the RFC1123-formatted
712                 // Last-Modified values parsed by Mtime().
713                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
714         }
715         return dataL.Error()
716 }
717
718 // Mtime returns the stored timestamp for the given locator.
719 func (v *s3Volume) Mtime(loc string) (time.Time, error) {
720         key := v.key(loc)
721         _, err := v.head(key)
722         if err != nil {
723                 return s3AWSZeroTime, v.translateError(err)
724         }
725         resp, err := v.head("recent/" + key)
726         err = v.translateError(err)
727         if os.IsNotExist(err) {
728                 // The data object X exists, but recent/X is missing.
729                 err = v.writeObject(context.Background(), "recent/"+key, nil)
730                 if err != nil {
731                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
732                         return s3AWSZeroTime, v.translateError(err)
733                 }
734                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
735                 resp, err = v.head("recent/" + key)
736                 if err != nil {
737                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
738                         return s3AWSZeroTime, v.translateError(err)
739                 }
740         } else if err != nil {
741                 // HEAD recent/X failed for some other reason.
742                 return s3AWSZeroTime, err
743         }
744         return *resp.LastModified, err
745 }
746
747 // InternalStats returns bucket I/O and API call counters.
748 func (v *s3Volume) InternalStats() interface{} {
749         return &v.bucket.stats
750 }
751
752 // BlockTouch sets the timestamp for the given locator to the current time.
753 func (v *s3Volume) BlockTouch(hash string) error {
754         key := v.key(hash)
755         _, err := v.head(key)
756         err = v.translateError(err)
757         if os.IsNotExist(err) && v.fixRace(key) {
758                 // The data object got trashed in a race, but fixRace
759                 // rescued it.
760         } else if err != nil {
761                 return err
762         }
763         err = v.writeObject(context.Background(), "recent/"+key, nil)
764         return v.translateError(err)
765 }
766
767 // checkRaceWindow returns a non-nil error if trash/key is, or might
768 // be, in the race window (i.e., it's not safe to trash key).
769 func (v *s3Volume) checkRaceWindow(key string) error {
770         resp, err := v.head("trash/" + key)
771         err = v.translateError(err)
772         if os.IsNotExist(err) {
773                 // OK, trash/X doesn't exist so we're not in the race
774                 // window
775                 return nil
776         } else if err != nil {
777                 // Error looking up trash/X. We don't know whether
778                 // we're in the race window
779                 return err
780         }
781         t := resp.LastModified
782         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
783         if safeWindow <= 0 {
784                 // We can't count on "touch trash/X" to prolong
785                 // trash/X's lifetime. The new timestamp might not
786                 // become visible until now+raceWindow, and EmptyTrash
787                 // is allowed to delete trash/X before then.
788                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
789         }
790         // trash/X exists, but it won't be eligible for deletion until
791         // after now+raceWindow, so it's safe to overwrite it.
792         return nil
793 }
794
795 func (b *s3Bucket) Del(path string) error {
796         input := &s3.DeleteObjectInput{
797                 Bucket: aws.String(b.bucket),
798                 Key:    aws.String(path),
799         }
800         _, err := b.svc.DeleteObject(context.Background(), input)
801         b.stats.TickOps("delete")
802         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
803         b.stats.TickErr(err)
804         return err
805 }
806
807 // Trash a Keep block.
808 func (v *s3Volume) BlockTrash(loc string) error {
809         if t, err := v.Mtime(loc); err != nil {
810                 return err
811         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
812                 return nil
813         }
814         key := v.key(loc)
815         if v.cluster.Collections.BlobTrashLifetime == 0 {
816                 if !v.UnsafeDelete {
817                         return errS3TrashDisabled
818                 }
819                 return v.translateError(v.bucket.Del(key))
820         }
821         err := v.checkRaceWindow(key)
822         if err != nil {
823                 return err
824         }
825         err = v.safeCopy("trash/"+key, key)
826         if err != nil {
827                 return err
828         }
829         return v.translateError(v.bucket.Del(key))
830 }
831
832 // BlockUntrash moves block from trash back into store
833 func (v *s3Volume) BlockUntrash(hash string) error {
834         key := v.key(hash)
835         err := v.safeCopy(key, "trash/"+key)
836         if err != nil {
837                 return err
838         }
839         err = v.writeObject(context.Background(), "recent/"+key, nil)
840         return v.translateError(err)
841 }
842
843 type s3awsbucketStats struct {
844         statsTicker
845         Ops     uint64
846         GetOps  uint64
847         PutOps  uint64
848         HeadOps uint64
849         DelOps  uint64
850         ListOps uint64
851 }
852
853 func (s *s3awsbucketStats) TickErr(err error) {
854         if err == nil {
855                 return
856         }
857         errType := fmt.Sprintf("%T", err)
858         if aerr := smithy.APIError(nil); errors.As(err, &aerr) {
859                 if rerr := interface{ HTTPStatusCode() int }(nil); errors.As(err, &rerr) {
860                         errType = errType + fmt.Sprintf(" %d %s", rerr.HTTPStatusCode(), aerr.ErrorCode())
861                 } else {
862                         errType = errType + fmt.Sprintf(" 000 %s", aerr.ErrorCode())
863                 }
864         }
865         s.statsTicker.TickErr(err, errType)
866 }