Merge branch '21504-arv-mount-reference'
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 func init() {
37         driver["S3"] = news3Volume
38 }
39
40 const (
41         s3DefaultReadTimeout        = arvados.Duration(10 * time.Minute)
42         s3DefaultConnectTimeout     = arvados.Duration(time.Minute)
43         maxClockSkew                = 600 * time.Second
44         nearlyRFC1123               = "Mon, 2 Jan 2006 15:04:05 GMT"
45         s3downloaderPartSize        = 6 * 1024 * 1024
46         s3downloaderReadConcurrency = 11
47         s3uploaderPartSize          = 5 * 1024 * 1024
48         s3uploaderWriteConcurrency  = 5
49 )
50
51 var (
52         errS3TrashDisabled   = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
53         s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
54         s3AWSZeroTime        time.Time
55 )
56
57 // s3Volume implements Volume using an S3 bucket.
58 type s3Volume struct {
59         arvados.S3VolumeDriverParameters
60         AuthToken      string    // populated automatically when IAMRole is used
61         AuthExpiration time.Time // populated automatically when IAMRole is used
62
63         cluster    *arvados.Cluster
64         volume     arvados.Volume
65         logger     logrus.FieldLogger
66         metrics    *volumeMetricsVecs
67         bufferPool *bufferPool
68         bucket     *s3Bucket
69         region     string
70         startOnce  sync.Once
71 }
72
73 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
74 // wrapped bucket can be replaced atomically with SetBucket in order
75 // to update credentials.
76 type s3Bucket struct {
77         bucket string
78         svc    *s3.Client
79         stats  s3awsbucketStats
80         mu     sync.Mutex
81 }
82
83 func (v *s3Volume) isKeepBlock(s string) (string, bool) {
84         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
85                 s = s[v.PrefixLength+1:]
86         }
87         return s, s3AWSKeepBlockRegexp.MatchString(s)
88 }
89
90 // Return the key used for a given loc. If PrefixLength==0 then
91 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
92 // "abc/abcdef0123", etc.
93 func (v *s3Volume) key(loc string) string {
94         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
95                 return loc[:v.PrefixLength] + "/" + loc
96         } else {
97                 return loc
98         }
99 }
100
101 func news3Volume(params newVolumeParams) (volume, error) {
102         v := &s3Volume{
103                 cluster:    params.Cluster,
104                 volume:     params.ConfigVolume,
105                 metrics:    params.MetricsVecs,
106                 bufferPool: params.BufferPool,
107         }
108         err := json.Unmarshal(params.ConfigVolume.DriverParameters, v)
109         if err != nil {
110                 return nil, err
111         }
112         v.logger = params.Logger.WithField("Volume", v.DeviceID())
113         return v, v.check("")
114 }
115
116 func (v *s3Volume) translateError(err error) error {
117         if _, ok := err.(*aws.RequestCanceledError); ok {
118                 return context.Canceled
119         } else if aerr, ok := err.(awserr.Error); ok {
120                 if aerr.Code() == "NotFound" {
121                         return os.ErrNotExist
122                 } else if aerr.Code() == "NoSuchKey" {
123                         return os.ErrNotExist
124                 }
125         }
126         return err
127 }
128
129 // safeCopy calls CopyObjectRequest, and checks the response to make
130 // sure the copy succeeded and updated the timestamp on the
131 // destination object
132 //
133 // (If something goes wrong during the copy, the error will be
134 // embedded in the 200 OK response)
135 func (v *s3Volume) safeCopy(dst, src string) error {
136         input := &s3.CopyObjectInput{
137                 Bucket:      aws.String(v.bucket.bucket),
138                 ContentType: aws.String("application/octet-stream"),
139                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
140                 Key:         aws.String(dst),
141         }
142
143         req := v.bucket.svc.CopyObjectRequest(input)
144         resp, err := req.Send(context.Background())
145
146         err = v.translateError(err)
147         if os.IsNotExist(err) {
148                 return err
149         } else if err != nil {
150                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
151         }
152
153         if resp.CopyObjectResult.LastModified == nil {
154                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
155         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
156                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
157         }
158         return nil
159 }
160
161 func (v *s3Volume) check(ec2metadataHostname string) error {
162         if v.Bucket == "" {
163                 return errors.New("DriverParameters: Bucket must be provided")
164         }
165         if v.IndexPageSize == 0 {
166                 v.IndexPageSize = 1000
167         }
168         if v.RaceWindow < 0 {
169                 return errors.New("DriverParameters: RaceWindow must not be negative")
170         }
171
172         if v.V2Signature {
173                 return errors.New("DriverParameters: V2Signature is not supported")
174         }
175
176         defaultResolver := endpoints.NewDefaultResolver()
177
178         cfg := defaults.Config()
179
180         if v.Endpoint == "" && v.Region == "" {
181                 return fmt.Errorf("AWS region or endpoint must be specified")
182         } else if v.Endpoint != "" || ec2metadataHostname != "" {
183                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
184                         if v.Endpoint != "" && service == "s3" {
185                                 return aws.Endpoint{
186                                         URL:           v.Endpoint,
187                                         SigningRegion: region,
188                                 }, nil
189                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
190                                 return aws.Endpoint{
191                                         URL: ec2metadataHostname,
192                                 }, nil
193                         } else {
194                                 return defaultResolver.ResolveEndpoint(service, region)
195                         }
196                 }
197                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
198         }
199         if v.Region == "" {
200                 // Endpoint is already specified (otherwise we would
201                 // have errored out above), but Region is also
202                 // required by the aws sdk, in order to determine
203                 // SignatureVersions.
204                 v.Region = "us-east-1"
205         }
206         cfg.Region = v.Region
207
208         // Zero timeouts mean "wait forever", which is a bad
209         // default. Default to long timeouts instead.
210         if v.ConnectTimeout == 0 {
211                 v.ConnectTimeout = s3DefaultConnectTimeout
212         }
213         if v.ReadTimeout == 0 {
214                 v.ReadTimeout = s3DefaultReadTimeout
215         }
216
217         creds := aws.NewChainProvider(
218                 []aws.CredentialsProvider{
219                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
220                         ec2rolecreds.New(ec2metadata.New(cfg)),
221                 })
222
223         cfg.Credentials = creds
224
225         v.bucket = &s3Bucket{
226                 bucket: v.Bucket,
227                 svc:    s3.New(cfg),
228         }
229
230         // Set up prometheus metrics
231         lbls := prometheus.Labels{"device_id": v.DeviceID()}
232         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
233
234         return nil
235 }
236
237 // DeviceID returns a globally unique ID for the storage bucket.
238 func (v *s3Volume) DeviceID() string {
239         return "s3://" + v.Endpoint + "/" + v.Bucket
240 }
241
242 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
243 // and deletes them from the volume.
244 func (v *s3Volume) EmptyTrash() {
245         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
246
247         // Define "ready to delete" as "...when EmptyTrash started".
248         startT := time.Now()
249
250         emptyOneKey := func(trash *s3.Object) {
251                 key := strings.TrimPrefix(*trash.Key, "trash/")
252                 loc, isblk := v.isKeepBlock(key)
253                 if !isblk {
254                         return
255                 }
256                 atomic.AddInt64(&bytesInTrash, *trash.Size)
257                 atomic.AddInt64(&blocksInTrash, 1)
258
259                 trashT := *trash.LastModified
260                 recent, err := v.head("recent/" + key)
261                 if err != nil && os.IsNotExist(v.translateError(err)) {
262                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
263                         err = v.BlockUntrash(loc)
264                         if err != nil {
265                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
266                         }
267                         return
268                 } else if err != nil {
269                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
270                         return
271                 }
272                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
273                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
274                                 // recent/key is too old to protect
275                                 // loc from being Trashed again during
276                                 // the raceWindow that starts if we
277                                 // delete trash/X now.
278                                 //
279                                 // Note this means (TrashSweepInterval
280                                 // < BlobSigningTTL - raceWindow) is
281                                 // necessary to avoid starvation.
282                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
283                                 v.fixRace(key)
284                                 v.BlockTouch(loc)
285                                 return
286                         }
287                         _, err := v.head(key)
288                         if os.IsNotExist(err) {
289                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
290                                 v.fixRace(key)
291                                 return
292                         } else if err != nil {
293                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
294                                 return
295                         }
296                 }
297                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
298                         return
299                 }
300                 err = v.bucket.Del(*trash.Key)
301                 if err != nil {
302                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
303                         return
304                 }
305                 atomic.AddInt64(&bytesDeleted, *trash.Size)
306                 atomic.AddInt64(&blocksDeleted, 1)
307
308                 _, err = v.head(*trash.Key)
309                 if err == nil {
310                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
311                         return
312                 }
313                 if !os.IsNotExist(v.translateError(err)) {
314                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
315                         return
316                 }
317                 err = v.bucket.Del("recent/" + key)
318                 if err != nil {
319                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
320                 }
321         }
322
323         var wg sync.WaitGroup
324         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
325         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
326                 wg.Add(1)
327                 go func() {
328                         defer wg.Done()
329                         for key := range todo {
330                                 emptyOneKey(key)
331                         }
332                 }()
333         }
334
335         trashL := s3awsLister{
336                 Logger:   v.logger,
337                 Bucket:   v.bucket,
338                 Prefix:   "trash/",
339                 PageSize: v.IndexPageSize,
340                 Stats:    &v.bucket.stats,
341         }
342         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
343                 todo <- trash
344         }
345         close(todo)
346         wg.Wait()
347
348         if err := trashL.Error(); err != nil {
349                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
350         }
351         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
352 }
353
354 // fixRace(X) is called when "recent/X" exists but "X" doesn't
355 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
356 // was a race between Put and Trash, fixRace recovers from the race by
357 // Untrashing the block.
358 func (v *s3Volume) fixRace(key string) bool {
359         trash, err := v.head("trash/" + key)
360         if err != nil {
361                 if !os.IsNotExist(v.translateError(err)) {
362                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
363                 }
364                 return false
365         }
366
367         recent, err := v.head("recent/" + key)
368         if err != nil {
369                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
370                 return false
371         }
372
373         recentTime := *recent.LastModified
374         trashTime := *trash.LastModified
375         ageWhenTrashed := trashTime.Sub(recentTime)
376         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
377                 // No evidence of a race: block hasn't been written
378                 // since it became eligible for Trash. No fix needed.
379                 return false
380         }
381
382         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
383         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
384         err = v.safeCopy(key, "trash/"+key)
385         if err != nil {
386                 v.logger.WithError(err).Error("fixRace: copy failed")
387                 return false
388         }
389         return true
390 }
391
392 func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
393         input := &s3.HeadObjectInput{
394                 Bucket: aws.String(v.bucket.bucket),
395                 Key:    aws.String(key),
396         }
397
398         req := v.bucket.svc.HeadObjectRequest(input)
399         res, err := req.Send(context.TODO())
400
401         v.bucket.stats.TickOps("head")
402         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
403         v.bucket.stats.TickErr(err)
404
405         if err != nil {
406                 return nil, v.translateError(err)
407         }
408         result = res.HeadObjectOutput
409         return
410 }
411
412 // BlockRead reads a Keep block that has been stored as a block blob
413 // in the S3 bucket.
414 func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
415         key := v.key(hash)
416         err := v.readWorker(ctx, key, w)
417         if err != nil {
418                 err = v.translateError(err)
419                 if !os.IsNotExist(err) {
420                         return err
421                 }
422
423                 _, err = v.head("recent/" + key)
424                 err = v.translateError(err)
425                 if err != nil {
426                         // If we can't read recent/X, there's no point in
427                         // trying fixRace. Give up.
428                         return err
429                 }
430                 if !v.fixRace(key) {
431                         err = os.ErrNotExist
432                         return err
433                 }
434
435                 err = v.readWorker(ctx, key, w)
436                 if err != nil {
437                         v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
438                         err = v.translateError(err)
439                         return err
440                 }
441         }
442         return nil
443 }
444
445 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
446         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
447                 u.PartSize = s3downloaderPartSize
448                 u.Concurrency = s3downloaderReadConcurrency
449         })
450         count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{
451                 Bucket: aws.String(v.bucket.bucket),
452                 Key:    aws.String(key),
453         })
454         v.bucket.stats.TickOps("get")
455         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
456         v.bucket.stats.TickErr(err)
457         v.bucket.stats.TickInBytes(uint64(count))
458         return v.translateError(err)
459 }
460
461 func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
462         if r == nil {
463                 // r == nil leads to a memory violation in func readFillBuf in
464                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
465                 r = bytes.NewReader(nil)
466         }
467
468         uploadInput := s3manager.UploadInput{
469                 Bucket: aws.String(v.bucket.bucket),
470                 Key:    aws.String(key),
471                 Body:   r,
472         }
473
474         if loc, ok := v.isKeepBlock(key); ok {
475                 var contentMD5 string
476                 md5, err := hex.DecodeString(loc)
477                 if err != nil {
478                         return v.translateError(err)
479                 }
480                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
481                 uploadInput.ContentMD5 = &contentMD5
482         }
483
484         // Experimentation indicated that using concurrency 5 yields the best
485         // throughput, better than higher concurrency (10 or 13) by ~5%.
486         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
487         // is detrimental to throughput (minus ~15%).
488         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
489                 u.PartSize = s3uploaderPartSize
490                 u.Concurrency = s3uploaderWriteConcurrency
491         })
492
493         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
494         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
495         // block, so there is no extra memory use to be concerned about. See
496         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
497         // calculating the Sha-256 because we don't need it; we already use md5sum
498         // hashes that match the name of the block.
499         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
500                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
501         }))
502
503         v.bucket.stats.TickOps("put")
504         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
505         v.bucket.stats.TickErr(err)
506
507         return v.translateError(err)
508 }
509
510 // Put writes a block.
511 func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
512         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
513         // sdk to avoid memory allocation there. See #17339 for more information.
514         rdr := bytes.NewReader(data)
515         r := newCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
516         key := v.key(hash)
517         err := v.writeObject(ctx, key, r)
518         if err != nil {
519                 return err
520         }
521         return v.writeObject(ctx, "recent/"+key, nil)
522 }
523
524 type s3awsLister struct {
525         Logger            logrus.FieldLogger
526         Bucket            *s3Bucket
527         Prefix            string
528         PageSize          int
529         Stats             *s3awsbucketStats
530         ContinuationToken string
531         buf               []s3.Object
532         err               error
533 }
534
535 // First fetches the first page and returns the first item. It returns
536 // nil if the response is the empty set or an error occurs.
537 func (lister *s3awsLister) First() *s3.Object {
538         lister.getPage()
539         return lister.pop()
540 }
541
542 // Next returns the next item, fetching the next page if necessary. It
543 // returns nil if the last available item has already been fetched, or
544 // an error occurs.
545 func (lister *s3awsLister) Next() *s3.Object {
546         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
547                 lister.getPage()
548         }
549         return lister.pop()
550 }
551
552 // Return the most recent error encountered by First or Next.
553 func (lister *s3awsLister) Error() error {
554         return lister.err
555 }
556
557 func (lister *s3awsLister) getPage() {
558         lister.Stats.TickOps("list")
559         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
560
561         var input *s3.ListObjectsV2Input
562         if lister.ContinuationToken == "" {
563                 input = &s3.ListObjectsV2Input{
564                         Bucket:  aws.String(lister.Bucket.bucket),
565                         MaxKeys: aws.Int64(int64(lister.PageSize)),
566                         Prefix:  aws.String(lister.Prefix),
567                 }
568         } else {
569                 input = &s3.ListObjectsV2Input{
570                         Bucket:            aws.String(lister.Bucket.bucket),
571                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
572                         Prefix:            aws.String(lister.Prefix),
573                         ContinuationToken: &lister.ContinuationToken,
574                 }
575         }
576
577         req := lister.Bucket.svc.ListObjectsV2Request(input)
578         resp, err := req.Send(context.Background())
579         if err != nil {
580                 if aerr, ok := err.(awserr.Error); ok {
581                         lister.err = aerr
582                 } else {
583                         lister.err = err
584                 }
585                 return
586         }
587
588         if *resp.IsTruncated {
589                 lister.ContinuationToken = *resp.NextContinuationToken
590         } else {
591                 lister.ContinuationToken = ""
592         }
593         lister.buf = make([]s3.Object, 0, len(resp.Contents))
594         for _, key := range resp.Contents {
595                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
596                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
597                         continue
598                 }
599                 lister.buf = append(lister.buf, key)
600         }
601 }
602
603 func (lister *s3awsLister) pop() (k *s3.Object) {
604         if len(lister.buf) > 0 {
605                 k = &lister.buf[0]
606                 lister.buf = lister.buf[1:]
607         }
608         return
609 }
610
611 // Index writes a complete list of locators with the given prefix
612 // for which Get() can retrieve data.
613 func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
614         prefix = v.key(prefix)
615         // Use a merge sort to find matching sets of X and recent/X.
616         dataL := s3awsLister{
617                 Logger:   v.logger,
618                 Bucket:   v.bucket,
619                 Prefix:   prefix,
620                 PageSize: v.IndexPageSize,
621                 Stats:    &v.bucket.stats,
622         }
623         recentL := s3awsLister{
624                 Logger:   v.logger,
625                 Bucket:   v.bucket,
626                 Prefix:   "recent/" + prefix,
627                 PageSize: v.IndexPageSize,
628                 Stats:    &v.bucket.stats,
629         }
630         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
631                 if ctx.Err() != nil {
632                         return ctx.Err()
633                 }
634                 if *data.Key >= "g" {
635                         // Conveniently, "recent/*" and "trash/*" are
636                         // lexically greater than all hex-encoded data
637                         // hashes, so stopping here avoids iterating
638                         // over all of them needlessly with dataL.
639                         break
640                 }
641                 loc, isblk := v.isKeepBlock(*data.Key)
642                 if !isblk {
643                         continue
644                 }
645
646                 // stamp is the list entry we should use to report the
647                 // last-modified time for this data block: it will be
648                 // the recent/X entry if one exists, otherwise the
649                 // entry for the data block itself.
650                 stamp := data
651
652                 // Advance to the corresponding recent/X marker, if any
653                 for recent != nil && recentL.Error() == nil {
654                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
655                                 recent = recentL.Next()
656                                 continue
657                         } else if cmp == 0 {
658                                 stamp = recent
659                                 recent = recentL.Next()
660                                 break
661                         } else {
662                                 // recent/X marker is missing: we'll
663                                 // use the timestamp on the data
664                                 // object.
665                                 break
666                         }
667                 }
668                 if err := recentL.Error(); err != nil {
669                         return err
670                 }
671                 // We truncate sub-second precision here. Otherwise
672                 // timestamps will never match the RFC1123-formatted
673                 // Last-Modified values parsed by Mtime().
674                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
675         }
676         return dataL.Error()
677 }
678
679 // Mtime returns the stored timestamp for the given locator.
680 func (v *s3Volume) Mtime(loc string) (time.Time, error) {
681         key := v.key(loc)
682         _, err := v.head(key)
683         if err != nil {
684                 return s3AWSZeroTime, v.translateError(err)
685         }
686         resp, err := v.head("recent/" + key)
687         err = v.translateError(err)
688         if os.IsNotExist(err) {
689                 // The data object X exists, but recent/X is missing.
690                 err = v.writeObject(context.Background(), "recent/"+key, nil)
691                 if err != nil {
692                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
693                         return s3AWSZeroTime, v.translateError(err)
694                 }
695                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
696                 resp, err = v.head("recent/" + key)
697                 if err != nil {
698                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
699                         return s3AWSZeroTime, v.translateError(err)
700                 }
701         } else if err != nil {
702                 // HEAD recent/X failed for some other reason.
703                 return s3AWSZeroTime, err
704         }
705         return *resp.LastModified, err
706 }
707
708 // InternalStats returns bucket I/O and API call counters.
709 func (v *s3Volume) InternalStats() interface{} {
710         return &v.bucket.stats
711 }
712
713 // BlockTouch sets the timestamp for the given locator to the current time.
714 func (v *s3Volume) BlockTouch(hash string) error {
715         key := v.key(hash)
716         _, err := v.head(key)
717         err = v.translateError(err)
718         if os.IsNotExist(err) && v.fixRace(key) {
719                 // The data object got trashed in a race, but fixRace
720                 // rescued it.
721         } else if err != nil {
722                 return err
723         }
724         err = v.writeObject(context.Background(), "recent/"+key, nil)
725         return v.translateError(err)
726 }
727
728 // checkRaceWindow returns a non-nil error if trash/key is, or might
729 // be, in the race window (i.e., it's not safe to trash key).
730 func (v *s3Volume) checkRaceWindow(key string) error {
731         resp, err := v.head("trash/" + key)
732         err = v.translateError(err)
733         if os.IsNotExist(err) {
734                 // OK, trash/X doesn't exist so we're not in the race
735                 // window
736                 return nil
737         } else if err != nil {
738                 // Error looking up trash/X. We don't know whether
739                 // we're in the race window
740                 return err
741         }
742         t := resp.LastModified
743         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
744         if safeWindow <= 0 {
745                 // We can't count on "touch trash/X" to prolong
746                 // trash/X's lifetime. The new timestamp might not
747                 // become visible until now+raceWindow, and EmptyTrash
748                 // is allowed to delete trash/X before then.
749                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
750         }
751         // trash/X exists, but it won't be eligible for deletion until
752         // after now+raceWindow, so it's safe to overwrite it.
753         return nil
754 }
755
756 func (b *s3Bucket) Del(path string) error {
757         input := &s3.DeleteObjectInput{
758                 Bucket: aws.String(b.bucket),
759                 Key:    aws.String(path),
760         }
761         req := b.svc.DeleteObjectRequest(input)
762         _, err := req.Send(context.Background())
763         b.stats.TickOps("delete")
764         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
765         b.stats.TickErr(err)
766         return err
767 }
768
769 // Trash a Keep block.
770 func (v *s3Volume) BlockTrash(loc string) error {
771         if t, err := v.Mtime(loc); err != nil {
772                 return err
773         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
774                 return nil
775         }
776         key := v.key(loc)
777         if v.cluster.Collections.BlobTrashLifetime == 0 {
778                 if !v.UnsafeDelete {
779                         return errS3TrashDisabled
780                 }
781                 return v.translateError(v.bucket.Del(key))
782         }
783         err := v.checkRaceWindow(key)
784         if err != nil {
785                 return err
786         }
787         err = v.safeCopy("trash/"+key, key)
788         if err != nil {
789                 return err
790         }
791         return v.translateError(v.bucket.Del(key))
792 }
793
794 // BlockUntrash moves block from trash back into store
795 func (v *s3Volume) BlockUntrash(hash string) error {
796         key := v.key(hash)
797         err := v.safeCopy(key, "trash/"+key)
798         if err != nil {
799                 return err
800         }
801         err = v.writeObject(context.Background(), "recent/"+key, nil)
802         return v.translateError(err)
803 }
804
805 type s3awsbucketStats struct {
806         statsTicker
807         Ops     uint64
808         GetOps  uint64
809         PutOps  uint64
810         HeadOps uint64
811         DelOps  uint64
812         ListOps uint64
813 }
814
815 func (s *s3awsbucketStats) TickErr(err error) {
816         if err == nil {
817                 return
818         }
819         errType := fmt.Sprintf("%T", err)
820         if aerr, ok := err.(awserr.Error); ok {
821                 if reqErr, ok := err.(awserr.RequestFailure); ok {
822                         // A service error occurred
823                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
824                 } else {
825                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
826                 }
827         }
828         s.statsTicker.TickErr(err, errType)
829 }