Merge branch '21704-eject-cra'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "net/url"
17         "os"
18         "regexp"
19         "strings"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         "git.arvados.org/arvados.git/sdk/go/arvados"
25         "github.com/aws/aws-sdk-go-v2/aws"
26         v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
27         "github.com/aws/aws-sdk-go-v2/config"
28         "github.com/aws/aws-sdk-go-v2/credentials"
29         "github.com/aws/aws-sdk-go-v2/feature/s3/manager"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/types"
32         "github.com/aws/smithy-go"
33         "github.com/prometheus/client_golang/prometheus"
34         "github.com/sirupsen/logrus"
35 )
36
37 func init() {
38         driver["S3"] = news3Volume
39 }
40
41 const (
42         s3DefaultReadTimeout        = arvados.Duration(10 * time.Minute)
43         s3DefaultConnectTimeout     = arvados.Duration(time.Minute)
44         maxClockSkew                = 600 * time.Second
45         nearlyRFC1123               = "Mon, 2 Jan 2006 15:04:05 GMT"
46         s3downloaderPartSize        = 6 * 1024 * 1024
47         s3downloaderReadConcurrency = 11
48         s3uploaderPartSize          = 5 * 1024 * 1024
49         s3uploaderWriteConcurrency  = 5
50 )
51
52 var (
53         errS3TrashDisabled        = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
54         s3AWSKeepBlockRegexp      = regexp.MustCompile(`^[0-9a-f]{32}$`)
55         s3AWSZeroTime             time.Time
56         defaultEndpointResolverV2 = s3.NewDefaultEndpointResolverV2()
57
58         // Returned by an aws.EndpointResolverWithOptions to indicate
59         // that the default resolver should be used.
60         errEndpointNotOverridden = &aws.EndpointNotFoundError{Err: errors.New("endpoint not overridden")}
61 )
62
63 // s3Volume implements Volume using an S3 bucket.
64 type s3Volume struct {
65         arvados.S3VolumeDriverParameters
66
67         cluster    *arvados.Cluster
68         volume     arvados.Volume
69         logger     logrus.FieldLogger
70         metrics    *volumeMetricsVecs
71         bufferPool *bufferPool
72         bucket     *s3Bucket
73         region     string
74         startOnce  sync.Once
75
76         overrideEndpoint *aws.Endpoint
77         usePathStyle     bool // used by test suite
78 }
79
80 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
81 // wrapped bucket can be replaced atomically with SetBucket in order
82 // to update credentials.
83 type s3Bucket struct {
84         bucket string
85         svc    *s3.Client
86         stats  s3awsbucketStats
87         mu     sync.Mutex
88 }
89
90 func (v *s3Volume) isKeepBlock(s string) (string, bool) {
91         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
92                 s = s[v.PrefixLength+1:]
93         }
94         return s, s3AWSKeepBlockRegexp.MatchString(s)
95 }
96
97 // Return the key used for a given loc. If PrefixLength==0 then
98 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
99 // "abc/abcdef0123", etc.
100 func (v *s3Volume) key(loc string) string {
101         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
102                 return loc[:v.PrefixLength] + "/" + loc
103         } else {
104                 return loc
105         }
106 }
107
108 func news3Volume(params newVolumeParams) (volume, error) {
109         v := &s3Volume{
110                 cluster:    params.Cluster,
111                 volume:     params.ConfigVolume,
112                 metrics:    params.MetricsVecs,
113                 bufferPool: params.BufferPool,
114         }
115         err := json.Unmarshal(params.ConfigVolume.DriverParameters, v)
116         if err != nil {
117                 return nil, err
118         }
119         v.logger = params.Logger.WithField("Volume", v.DeviceID())
120         return v, v.check("")
121 }
122
123 func (v *s3Volume) translateError(err error) error {
124         if cerr := (interface{ CanceledError() bool })(nil); errors.As(err, &cerr) && cerr.CanceledError() {
125                 // *aws.RequestCanceledError and *smithy.CanceledError
126                 // implement this interface.
127                 return context.Canceled
128         }
129         var aerr smithy.APIError
130         if errors.As(err, &aerr) {
131                 switch aerr.ErrorCode() {
132                 case "NotFound", "NoSuchKey":
133                         return os.ErrNotExist
134                 }
135         }
136         return err
137 }
138
139 // safeCopy calls CopyObjectRequest, and checks the response to make
140 // sure the copy succeeded and updated the timestamp on the
141 // destination object
142 //
143 // (If something goes wrong during the copy, the error will be
144 // embedded in the 200 OK response)
145 func (v *s3Volume) safeCopy(dst, src string) error {
146         input := &s3.CopyObjectInput{
147                 Bucket:      aws.String(v.bucket.bucket),
148                 ContentType: aws.String("application/octet-stream"),
149                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
150                 Key:         aws.String(dst),
151         }
152
153         resp, err := v.bucket.svc.CopyObject(context.Background(), input)
154
155         err = v.translateError(err)
156         if os.IsNotExist(err) {
157                 return err
158         } else if err != nil {
159                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
160         } else if resp.CopyObjectResult.LastModified == nil {
161                 return fmt.Errorf("PutCopy(%q ← %q): succeeded but did not return a timestamp", dst, v.bucket.bucket+"/"+src)
162         } else if skew := time.Now().UTC().Sub(*resp.CopyObjectResult.LastModified); skew > maxClockSkew {
163                 return fmt.Errorf("PutCopy succeeded but returned old timestamp %s (skew %v > max %v, now %s)", resp.CopyObjectResult.LastModified, skew, maxClockSkew, time.Now())
164         }
165         return nil
166 }
167
168 func (v *s3Volume) check(ec2metadataHostname string) error {
169         if v.Bucket == "" {
170                 return errors.New("DriverParameters: Bucket must be provided")
171         }
172         if v.IndexPageSize == 0 {
173                 v.IndexPageSize = 1000
174         }
175         if v.RaceWindow < 0 {
176                 return errors.New("DriverParameters: RaceWindow must not be negative")
177         }
178
179         if v.V2Signature {
180                 return errors.New("DriverParameters: V2Signature is not supported")
181         }
182
183         if v.Endpoint == "" && v.Region == "" {
184                 return fmt.Errorf("AWS region or endpoint must be specified")
185         } else if v.Endpoint != "" {
186                 _, err := url.Parse(v.Endpoint)
187                 if err != nil {
188                         return fmt.Errorf("error parsing custom S3 endpoint %q: %w", v.Endpoint, err)
189                 }
190                 v.overrideEndpoint = &aws.Endpoint{
191                         URL:               v.Endpoint,
192                         HostnameImmutable: true,
193                         Source:            aws.EndpointSourceCustom,
194                 }
195         }
196         if v.Region == "" {
197                 // Endpoint is already specified (otherwise we would
198                 // have errored out above), but Region is also
199                 // required by the aws sdk, in order to determine
200                 // SignatureVersions.
201                 v.Region = "us-east-1"
202         }
203
204         // Zero timeouts mean "wait forever", which is a bad
205         // default. Default to long timeouts instead.
206         if v.ConnectTimeout == 0 {
207                 v.ConnectTimeout = s3DefaultConnectTimeout
208         }
209         if v.ReadTimeout == 0 {
210                 v.ReadTimeout = s3DefaultReadTimeout
211         }
212
213         cfg, err := config.LoadDefaultConfig(context.TODO(),
214                 config.WithRegion(v.Region),
215                 config.WithCredentialsCacheOptions(func(o *aws.CredentialsCacheOptions) {
216                         // (from aws-sdk-go-v2 comments) "allow the
217                         // credentials to trigger refreshing prior to
218                         // the credentials actually expiring. This is
219                         // beneficial so race conditions with expiring
220                         // credentials do not cause request to fail
221                         // unexpectedly due to ExpiredTokenException
222                         // exceptions."
223                         //
224                         // (from
225                         // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
226                         // "We make new credentials available at least
227                         // five minutes before the expiration of the
228                         // old credentials."
229                         o.ExpiryWindow = 5 * time.Minute
230                 }),
231                 func(o *config.LoadOptions) error {
232                         if v.AccessKeyID == "" && v.SecretAccessKey == "" {
233                                 // Use default sdk behavior (IAM / IMDS)
234                                 return nil
235                         }
236                         v.logger.Debug("using static credentials")
237                         o.Credentials = credentials.StaticCredentialsProvider{
238                                 Value: aws.Credentials{
239                                         AccessKeyID:     v.AccessKeyID,
240                                         SecretAccessKey: v.SecretAccessKey,
241                                         Source:          "Arvados configuration",
242                                 },
243                         }
244                         return nil
245                 },
246                 func(o *config.LoadOptions) error {
247                         if ec2metadataHostname != "" {
248                                 o.EC2IMDSEndpoint = ec2metadataHostname
249                         }
250                         if v.overrideEndpoint != nil {
251                                 o.EndpointResolverWithOptions = aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
252                                         if service == "S3" {
253                                                 return *v.overrideEndpoint, nil
254                                         }
255                                         return aws.Endpoint{}, errEndpointNotOverridden // use default resolver
256                                 })
257                         }
258                         return nil
259                 },
260         )
261         if err != nil {
262                 return fmt.Errorf("error loading aws client config: %w", err)
263         }
264
265         v.bucket = &s3Bucket{
266                 bucket: v.Bucket,
267                 svc: s3.NewFromConfig(cfg, func(o *s3.Options) {
268                         if v.usePathStyle {
269                                 o.UsePathStyle = true
270                         }
271                 }),
272         }
273
274         // Set up prometheus metrics
275         lbls := prometheus.Labels{"device_id": v.DeviceID()}
276         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
277
278         return nil
279 }
280
281 // DeviceID returns a globally unique ID for the storage bucket.
282 func (v *s3Volume) DeviceID() string {
283         return "s3://" + v.Endpoint + "/" + v.Bucket
284 }
285
286 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
287 // and deletes them from the volume.
288 func (v *s3Volume) EmptyTrash() {
289         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
290
291         // Define "ready to delete" as "...when EmptyTrash started".
292         startT := time.Now()
293
294         emptyOneKey := func(trash *types.Object) {
295                 key := strings.TrimPrefix(*trash.Key, "trash/")
296                 loc, isblk := v.isKeepBlock(key)
297                 if !isblk {
298                         return
299                 }
300                 atomic.AddInt64(&bytesInTrash, *trash.Size)
301                 atomic.AddInt64(&blocksInTrash, 1)
302
303                 trashT := *trash.LastModified
304                 recent, err := v.head("recent/" + key)
305                 if err != nil && os.IsNotExist(v.translateError(err)) {
306                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
307                         err = v.BlockUntrash(loc)
308                         if err != nil {
309                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
310                         }
311                         return
312                 } else if err != nil {
313                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
314                         return
315                 }
316                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
317                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
318                                 // recent/key is too old to protect
319                                 // loc from being Trashed again during
320                                 // the raceWindow that starts if we
321                                 // delete trash/X now.
322                                 //
323                                 // Note this means (TrashSweepInterval
324                                 // < BlobSigningTTL - raceWindow) is
325                                 // necessary to avoid starvation.
326                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
327                                 v.fixRace(key)
328                                 v.BlockTouch(loc)
329                                 return
330                         }
331                         _, err := v.head(key)
332                         if os.IsNotExist(err) {
333                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
334                                 v.fixRace(key)
335                                 return
336                         } else if err != nil {
337                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
338                                 return
339                         }
340                 }
341                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
342                         return
343                 }
344                 err = v.bucket.Del(*trash.Key)
345                 if err != nil {
346                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
347                         return
348                 }
349                 atomic.AddInt64(&bytesDeleted, *trash.Size)
350                 atomic.AddInt64(&blocksDeleted, 1)
351
352                 _, err = v.head(*trash.Key)
353                 if err == nil {
354                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
355                         return
356                 }
357                 if !os.IsNotExist(v.translateError(err)) {
358                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
359                         return
360                 }
361                 err = v.bucket.Del("recent/" + key)
362                 if err != nil {
363                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
364                 }
365         }
366
367         var wg sync.WaitGroup
368         todo := make(chan *types.Object, v.cluster.Collections.BlobDeleteConcurrency)
369         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
370                 wg.Add(1)
371                 go func() {
372                         defer wg.Done()
373                         for key := range todo {
374                                 emptyOneKey(key)
375                         }
376                 }()
377         }
378
379         trashL := s3awsLister{
380                 Logger:   v.logger,
381                 Bucket:   v.bucket,
382                 Prefix:   "trash/",
383                 PageSize: v.IndexPageSize,
384                 Stats:    &v.bucket.stats,
385         }
386         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
387                 todo <- trash
388         }
389         close(todo)
390         wg.Wait()
391
392         if err := trashL.Error(); err != nil {
393                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
394         }
395         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
396 }
397
398 // fixRace(X) is called when "recent/X" exists but "X" doesn't
399 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
400 // was a race between Put and Trash, fixRace recovers from the race by
401 // Untrashing the block.
402 func (v *s3Volume) fixRace(key string) bool {
403         trash, err := v.head("trash/" + key)
404         if err != nil {
405                 if !os.IsNotExist(v.translateError(err)) {
406                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
407                 }
408                 return false
409         }
410
411         recent, err := v.head("recent/" + key)
412         if err != nil {
413                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
414                 return false
415         }
416
417         recentTime := *recent.LastModified
418         trashTime := *trash.LastModified
419         ageWhenTrashed := trashTime.Sub(recentTime)
420         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
421                 // No evidence of a race: block hasn't been written
422                 // since it became eligible for Trash. No fix needed.
423                 return false
424         }
425
426         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
427         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
428         err = v.safeCopy(key, "trash/"+key)
429         if err != nil {
430                 v.logger.WithError(err).Error("fixRace: copy failed")
431                 return false
432         }
433         return true
434 }
435
436 func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
437         input := &s3.HeadObjectInput{
438                 Bucket: aws.String(v.bucket.bucket),
439                 Key:    aws.String(key),
440         }
441
442         res, err := v.bucket.svc.HeadObject(context.Background(), input)
443
444         v.bucket.stats.TickOps("head")
445         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
446         v.bucket.stats.TickErr(err)
447
448         if err != nil {
449                 return nil, v.translateError(err)
450         }
451         return res, nil
452 }
453
454 // BlockRead reads a Keep block that has been stored as a block blob
455 // in the S3 bucket.
456 func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
457         key := v.key(hash)
458         err := v.readWorker(ctx, key, w)
459         if err != nil {
460                 err = v.translateError(err)
461                 if !os.IsNotExist(err) {
462                         return err
463                 }
464
465                 _, err = v.head("recent/" + key)
466                 err = v.translateError(err)
467                 if err != nil {
468                         // If we can't read recent/X, there's no point in
469                         // trying fixRace. Give up.
470                         return err
471                 }
472                 if !v.fixRace(key) {
473                         err = os.ErrNotExist
474                         return err
475                 }
476
477                 err = v.readWorker(ctx, key, w)
478                 if err != nil {
479                         v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
480                         err = v.translateError(err)
481                         return err
482                 }
483         }
484         return nil
485 }
486
487 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
488         downloader := manager.NewDownloader(v.bucket.svc, func(u *manager.Downloader) {
489                 u.PartSize = s3downloaderPartSize
490                 u.Concurrency = s3downloaderReadConcurrency
491         })
492         count, err := downloader.Download(ctx, dst, &s3.GetObjectInput{
493                 Bucket: aws.String(v.bucket.bucket),
494                 Key:    aws.String(key),
495         })
496         v.bucket.stats.TickOps("get")
497         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
498         v.bucket.stats.TickErr(err)
499         v.bucket.stats.TickInBytes(uint64(count))
500         return v.translateError(err)
501 }
502
503 func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
504         if r == nil {
505                 // r == nil leads to a memory violation in func readFillBuf in
506                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
507                 r = bytes.NewReader(nil)
508         }
509
510         uploadInput := s3.PutObjectInput{
511                 Bucket: aws.String(v.bucket.bucket),
512                 Key:    aws.String(key),
513                 Body:   r,
514         }
515
516         if loc, ok := v.isKeepBlock(key); ok {
517                 var contentMD5 string
518                 md5, err := hex.DecodeString(loc)
519                 if err != nil {
520                         return v.translateError(err)
521                 }
522                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
523                 uploadInput.ContentMD5 = &contentMD5
524         }
525
526         // Experimentation indicated that using concurrency 5 yields the best
527         // throughput, better than higher concurrency (10 or 13) by ~5%.
528         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
529         // is detrimental to throughput (minus ~15%).
530         uploader := manager.NewUploader(v.bucket.svc, func(u *manager.Uploader) {
531                 u.PartSize = s3uploaderPartSize
532                 u.Concurrency = s3uploaderWriteConcurrency
533         })
534
535         _, err := uploader.Upload(ctx, &uploadInput,
536                 // Avoid precomputing SHA256 before sending.
537                 manager.WithUploaderRequestOptions(s3.WithAPIOptions(v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware)),
538         )
539
540         v.bucket.stats.TickOps("put")
541         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
542         v.bucket.stats.TickErr(err)
543
544         return v.translateError(err)
545 }
546
547 // Put writes a block.
548 func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
549         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
550         // sdk to avoid memory allocation there. See #17339 for more information.
551         rdr := bytes.NewReader(data)
552         r := newCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
553         key := v.key(hash)
554         err := v.writeObject(ctx, key, r)
555         if err != nil {
556                 return err
557         }
558         return v.writeObject(ctx, "recent/"+key, nil)
559 }
560
561 type s3awsLister struct {
562         Logger            logrus.FieldLogger
563         Bucket            *s3Bucket
564         Prefix            string
565         PageSize          int
566         Stats             *s3awsbucketStats
567         ContinuationToken string
568         buf               []types.Object
569         err               error
570 }
571
572 // First fetches the first page and returns the first item. It returns
573 // nil if the response is the empty set or an error occurs.
574 func (lister *s3awsLister) First() *types.Object {
575         lister.getPage()
576         return lister.pop()
577 }
578
579 // Next returns the next item, fetching the next page if necessary. It
580 // returns nil if the last available item has already been fetched, or
581 // an error occurs.
582 func (lister *s3awsLister) Next() *types.Object {
583         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
584                 lister.getPage()
585         }
586         return lister.pop()
587 }
588
589 // Return the most recent error encountered by First or Next.
590 func (lister *s3awsLister) Error() error {
591         return lister.err
592 }
593
594 func (lister *s3awsLister) getPage() {
595         lister.Stats.TickOps("list")
596         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
597
598         var input *s3.ListObjectsV2Input
599         if lister.ContinuationToken == "" {
600                 input = &s3.ListObjectsV2Input{
601                         Bucket:  aws.String(lister.Bucket.bucket),
602                         MaxKeys: aws.Int32(int32(lister.PageSize)),
603                         Prefix:  aws.String(lister.Prefix),
604                 }
605         } else {
606                 input = &s3.ListObjectsV2Input{
607                         Bucket:            aws.String(lister.Bucket.bucket),
608                         MaxKeys:           aws.Int32(int32(lister.PageSize)),
609                         Prefix:            aws.String(lister.Prefix),
610                         ContinuationToken: &lister.ContinuationToken,
611                 }
612         }
613
614         resp, err := lister.Bucket.svc.ListObjectsV2(context.Background(), input)
615         if err != nil {
616                 var aerr smithy.APIError
617                 if errors.As(err, &aerr) {
618                         lister.err = aerr
619                 } else {
620                         lister.err = err
621                 }
622                 return
623         }
624
625         if *resp.IsTruncated {
626                 lister.ContinuationToken = *resp.NextContinuationToken
627         } else {
628                 lister.ContinuationToken = ""
629         }
630         lister.buf = make([]types.Object, 0, len(resp.Contents))
631         for _, key := range resp.Contents {
632                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
633                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
634                         continue
635                 }
636                 lister.buf = append(lister.buf, key)
637         }
638 }
639
640 func (lister *s3awsLister) pop() (k *types.Object) {
641         if len(lister.buf) > 0 {
642                 k = &lister.buf[0]
643                 lister.buf = lister.buf[1:]
644         }
645         return
646 }
647
648 // Index writes a complete list of locators with the given prefix
649 // for which Get() can retrieve data.
650 func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
651         prefix = v.key(prefix)
652         // Use a merge sort to find matching sets of X and recent/X.
653         dataL := s3awsLister{
654                 Logger:   v.logger,
655                 Bucket:   v.bucket,
656                 Prefix:   prefix,
657                 PageSize: v.IndexPageSize,
658                 Stats:    &v.bucket.stats,
659         }
660         recentL := s3awsLister{
661                 Logger:   v.logger,
662                 Bucket:   v.bucket,
663                 Prefix:   "recent/" + prefix,
664                 PageSize: v.IndexPageSize,
665                 Stats:    &v.bucket.stats,
666         }
667         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
668                 if ctx.Err() != nil {
669                         return ctx.Err()
670                 }
671                 if *data.Key >= "g" {
672                         // Conveniently, "recent/*" and "trash/*" are
673                         // lexically greater than all hex-encoded data
674                         // hashes, so stopping here avoids iterating
675                         // over all of them needlessly with dataL.
676                         break
677                 }
678                 loc, isblk := v.isKeepBlock(*data.Key)
679                 if !isblk {
680                         continue
681                 }
682
683                 // stamp is the list entry we should use to report the
684                 // last-modified time for this data block: it will be
685                 // the recent/X entry if one exists, otherwise the
686                 // entry for the data block itself.
687                 stamp := data
688
689                 // Advance to the corresponding recent/X marker, if any
690                 for recent != nil && recentL.Error() == nil {
691                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
692                                 recent = recentL.Next()
693                                 continue
694                         } else if cmp == 0 {
695                                 stamp = recent
696                                 recent = recentL.Next()
697                                 break
698                         } else {
699                                 // recent/X marker is missing: we'll
700                                 // use the timestamp on the data
701                                 // object.
702                                 break
703                         }
704                 }
705                 if err := recentL.Error(); err != nil {
706                         return err
707                 }
708                 // We truncate sub-second precision here. Otherwise
709                 // timestamps will never match the RFC1123-formatted
710                 // Last-Modified values parsed by Mtime().
711                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
712         }
713         return dataL.Error()
714 }
715
716 // Mtime returns the stored timestamp for the given locator.
717 func (v *s3Volume) Mtime(loc string) (time.Time, error) {
718         key := v.key(loc)
719         _, err := v.head(key)
720         if err != nil {
721                 return s3AWSZeroTime, v.translateError(err)
722         }
723         resp, err := v.head("recent/" + key)
724         err = v.translateError(err)
725         if os.IsNotExist(err) {
726                 // The data object X exists, but recent/X is missing.
727                 err = v.writeObject(context.Background(), "recent/"+key, nil)
728                 if err != nil {
729                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
730                         return s3AWSZeroTime, v.translateError(err)
731                 }
732                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
733                 resp, err = v.head("recent/" + key)
734                 if err != nil {
735                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
736                         return s3AWSZeroTime, v.translateError(err)
737                 }
738         } else if err != nil {
739                 // HEAD recent/X failed for some other reason.
740                 return s3AWSZeroTime, err
741         }
742         return *resp.LastModified, err
743 }
744
745 // InternalStats returns bucket I/O and API call counters.
746 func (v *s3Volume) InternalStats() interface{} {
747         return &v.bucket.stats
748 }
749
750 // BlockTouch sets the timestamp for the given locator to the current time.
751 func (v *s3Volume) BlockTouch(hash string) error {
752         key := v.key(hash)
753         _, err := v.head(key)
754         err = v.translateError(err)
755         if os.IsNotExist(err) && v.fixRace(key) {
756                 // The data object got trashed in a race, but fixRace
757                 // rescued it.
758         } else if err != nil {
759                 return err
760         }
761         err = v.writeObject(context.Background(), "recent/"+key, nil)
762         return v.translateError(err)
763 }
764
765 // checkRaceWindow returns a non-nil error if trash/key is, or might
766 // be, in the race window (i.e., it's not safe to trash key).
767 func (v *s3Volume) checkRaceWindow(key string) error {
768         resp, err := v.head("trash/" + key)
769         err = v.translateError(err)
770         if os.IsNotExist(err) {
771                 // OK, trash/X doesn't exist so we're not in the race
772                 // window
773                 return nil
774         } else if err != nil {
775                 // Error looking up trash/X. We don't know whether
776                 // we're in the race window
777                 return err
778         }
779         t := resp.LastModified
780         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
781         if safeWindow <= 0 {
782                 // We can't count on "touch trash/X" to prolong
783                 // trash/X's lifetime. The new timestamp might not
784                 // become visible until now+raceWindow, and EmptyTrash
785                 // is allowed to delete trash/X before then.
786                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
787         }
788         // trash/X exists, but it won't be eligible for deletion until
789         // after now+raceWindow, so it's safe to overwrite it.
790         return nil
791 }
792
793 func (b *s3Bucket) Del(path string) error {
794         input := &s3.DeleteObjectInput{
795                 Bucket: aws.String(b.bucket),
796                 Key:    aws.String(path),
797         }
798         _, err := b.svc.DeleteObject(context.Background(), input)
799         b.stats.TickOps("delete")
800         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
801         b.stats.TickErr(err)
802         return err
803 }
804
805 // Trash a Keep block.
806 func (v *s3Volume) BlockTrash(loc string) error {
807         if t, err := v.Mtime(loc); err != nil {
808                 return err
809         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
810                 return nil
811         }
812         key := v.key(loc)
813         if v.cluster.Collections.BlobTrashLifetime == 0 {
814                 if !v.UnsafeDelete {
815                         return errS3TrashDisabled
816                 }
817                 return v.translateError(v.bucket.Del(key))
818         }
819         err := v.checkRaceWindow(key)
820         if err != nil {
821                 return err
822         }
823         err = v.safeCopy("trash/"+key, key)
824         if err != nil {
825                 return err
826         }
827         return v.translateError(v.bucket.Del(key))
828 }
829
830 // BlockUntrash moves block from trash back into store
831 func (v *s3Volume) BlockUntrash(hash string) error {
832         key := v.key(hash)
833         err := v.safeCopy(key, "trash/"+key)
834         if err != nil {
835                 return err
836         }
837         err = v.writeObject(context.Background(), "recent/"+key, nil)
838         return v.translateError(err)
839 }
840
841 type s3awsbucketStats struct {
842         statsTicker
843         Ops     uint64
844         GetOps  uint64
845         PutOps  uint64
846         HeadOps uint64
847         DelOps  uint64
848         ListOps uint64
849 }
850
851 func (s *s3awsbucketStats) TickErr(err error) {
852         if err == nil {
853                 return
854         }
855         errType := fmt.Sprintf("%T", err)
856         if aerr := smithy.APIError(nil); errors.As(err, &aerr) {
857                 if rerr := interface{ HTTPStatusCode() int }(nil); errors.As(err, &rerr) {
858                         errType = errType + fmt.Sprintf(" %d %s", rerr.HTTPStatusCode(), aerr.ErrorCode())
859                 } else {
860                         errType = errType + fmt.Sprintf(" 000 %s", aerr.ErrorCode())
861                 }
862         }
863         s.statsTicker.TickErr(err, errType)
864 }