2960: Refactor keepstore into a streaming server.
[arvados.git] / services / keepstore / s3aws_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 func init() {
37         driver["S3"] = newS3AWSVolume
38 }
39
40 const (
41         s3DefaultReadTimeout        = arvados.Duration(10 * time.Minute)
42         s3DefaultConnectTimeout     = arvados.Duration(time.Minute)
43         maxClockSkew                = 600 * time.Second
44         nearlyRFC1123               = "Mon, 2 Jan 2006 15:04:05 GMT"
45         s3downloaderPartSize        = 5 * 1024 * 1024
46         s3downloaderReadConcurrency = 13
47         s3uploaderPartSize          = 5 * 1024 * 1024
48         s3uploaderWriteConcurrency  = 5
49 )
50
51 var (
52         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
53 )
54
55 // S3AWSVolume implements Volume using an S3 bucket.
56 type S3AWSVolume struct {
57         arvados.S3VolumeDriverParameters
58         AuthToken      string    // populated automatically when IAMRole is used
59         AuthExpiration time.Time // populated automatically when IAMRole is used
60
61         cluster    *arvados.Cluster
62         volume     arvados.Volume
63         logger     logrus.FieldLogger
64         metrics    *volumeMetricsVecs
65         bufferPool *bufferPool
66         bucket     *s3AWSbucket
67         region     string
68         startOnce  sync.Once
69 }
70
71 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
72 // wrapped bucket can be replaced atomically with SetBucket in order
73 // to update credentials.
74 type s3AWSbucket struct {
75         bucket string
76         svc    *s3.Client
77         stats  s3awsbucketStats
78         mu     sync.Mutex
79 }
80
81 const ()
82
83 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
84 var s3AWSZeroTime time.Time
85
86 func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
87         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
88                 s = s[v.PrefixLength+1:]
89         }
90         return s, s3AWSKeepBlockRegexp.MatchString(s)
91 }
92
93 // Return the key used for a given loc. If PrefixLength==0 then
94 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
95 // "abc/abcdef0123", etc.
96 func (v *S3AWSVolume) key(loc string) string {
97         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
98                 return loc[:v.PrefixLength] + "/" + loc
99         } else {
100                 return loc
101         }
102 }
103
104 func newS3AWSVolume(params newVolumeParams) (volume, error) {
105         v := &S3AWSVolume{
106                 cluster:    params.Cluster,
107                 volume:     params.ConfigVolume,
108                 metrics:    params.MetricsVecs,
109                 bufferPool: params.BufferPool,
110         }
111         err := json.Unmarshal(params.ConfigVolume.DriverParameters, v)
112         if err != nil {
113                 return nil, err
114         }
115         v.logger = params.Logger.WithField("Volume", v.DeviceID())
116         return v, v.check("")
117 }
118
119 func (v *S3AWSVolume) translateError(err error) error {
120         if _, ok := err.(*aws.RequestCanceledError); ok {
121                 return context.Canceled
122         } else if aerr, ok := err.(awserr.Error); ok {
123                 if aerr.Code() == "NotFound" {
124                         return os.ErrNotExist
125                 } else if aerr.Code() == "NoSuchKey" {
126                         return os.ErrNotExist
127                 }
128         }
129         return err
130 }
131
132 // safeCopy calls CopyObjectRequest, and checks the response to make
133 // sure the copy succeeded and updated the timestamp on the
134 // destination object
135 //
136 // (If something goes wrong during the copy, the error will be
137 // embedded in the 200 OK response)
138 func (v *S3AWSVolume) safeCopy(dst, src string) error {
139         input := &s3.CopyObjectInput{
140                 Bucket:      aws.String(v.bucket.bucket),
141                 ContentType: aws.String("application/octet-stream"),
142                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
143                 Key:         aws.String(dst),
144         }
145
146         req := v.bucket.svc.CopyObjectRequest(input)
147         resp, err := req.Send(context.Background())
148
149         err = v.translateError(err)
150         if os.IsNotExist(err) {
151                 return err
152         } else if err != nil {
153                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
154         }
155
156         if resp.CopyObjectResult.LastModified == nil {
157                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
158         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
159                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
160         }
161         return nil
162 }
163
164 func (v *S3AWSVolume) check(ec2metadataHostname string) error {
165         if v.Bucket == "" {
166                 return errors.New("DriverParameters: Bucket must be provided")
167         }
168         if v.IndexPageSize == 0 {
169                 v.IndexPageSize = 1000
170         }
171         if v.RaceWindow < 0 {
172                 return errors.New("DriverParameters: RaceWindow must not be negative")
173         }
174
175         if v.V2Signature {
176                 return errors.New("DriverParameters: V2Signature is not supported")
177         }
178
179         defaultResolver := endpoints.NewDefaultResolver()
180
181         cfg := defaults.Config()
182
183         if v.Endpoint == "" && v.Region == "" {
184                 return fmt.Errorf("AWS region or endpoint must be specified")
185         } else if v.Endpoint != "" || ec2metadataHostname != "" {
186                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
187                         if v.Endpoint != "" && service == "s3" {
188                                 return aws.Endpoint{
189                                         URL:           v.Endpoint,
190                                         SigningRegion: region,
191                                 }, nil
192                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
193                                 return aws.Endpoint{
194                                         URL: ec2metadataHostname,
195                                 }, nil
196                         } else {
197                                 return defaultResolver.ResolveEndpoint(service, region)
198                         }
199                 }
200                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
201         }
202         if v.Region == "" {
203                 // Endpoint is already specified (otherwise we would
204                 // have errored out above), but Region is also
205                 // required by the aws sdk, in order to determine
206                 // SignatureVersions.
207                 v.Region = "us-east-1"
208         }
209         cfg.Region = v.Region
210
211         // Zero timeouts mean "wait forever", which is a bad
212         // default. Default to long timeouts instead.
213         if v.ConnectTimeout == 0 {
214                 v.ConnectTimeout = s3DefaultConnectTimeout
215         }
216         if v.ReadTimeout == 0 {
217                 v.ReadTimeout = s3DefaultReadTimeout
218         }
219
220         creds := aws.NewChainProvider(
221                 []aws.CredentialsProvider{
222                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
223                         ec2rolecreds.New(ec2metadata.New(cfg)),
224                 })
225
226         cfg.Credentials = creds
227
228         v.bucket = &s3AWSbucket{
229                 bucket: v.Bucket,
230                 svc:    s3.New(cfg),
231         }
232
233         // Set up prometheus metrics
234         lbls := prometheus.Labels{"device_id": v.DeviceID()}
235         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
236
237         return nil
238 }
239
240 // DeviceID returns a globally unique ID for the storage bucket.
241 func (v *S3AWSVolume) DeviceID() string {
242         return "s3://" + v.Endpoint + "/" + v.Bucket
243 }
244
245 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
246 // and deletes them from the volume.
247 func (v *S3AWSVolume) EmptyTrash() {
248         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
249
250         // Define "ready to delete" as "...when EmptyTrash started".
251         startT := time.Now()
252
253         emptyOneKey := func(trash *s3.Object) {
254                 key := strings.TrimPrefix(*trash.Key, "trash/")
255                 loc, isblk := v.isKeepBlock(key)
256                 if !isblk {
257                         return
258                 }
259                 atomic.AddInt64(&bytesInTrash, *trash.Size)
260                 atomic.AddInt64(&blocksInTrash, 1)
261
262                 trashT := *trash.LastModified
263                 recent, err := v.head("recent/" + key)
264                 if err != nil && os.IsNotExist(v.translateError(err)) {
265                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
266                         err = v.BlockUntrash(loc)
267                         if err != nil {
268                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
269                         }
270                         return
271                 } else if err != nil {
272                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
273                         return
274                 }
275                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
276                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
277                                 // recent/key is too old to protect
278                                 // loc from being Trashed again during
279                                 // the raceWindow that starts if we
280                                 // delete trash/X now.
281                                 //
282                                 // Note this means (TrashSweepInterval
283                                 // < BlobSigningTTL - raceWindow) is
284                                 // necessary to avoid starvation.
285                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
286                                 v.fixRace(key)
287                                 v.BlockTouch(loc)
288                                 return
289                         }
290                         _, err := v.head(key)
291                         if os.IsNotExist(err) {
292                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
293                                 v.fixRace(key)
294                                 return
295                         } else if err != nil {
296                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
297                                 return
298                         }
299                 }
300                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
301                         return
302                 }
303                 err = v.bucket.Del(*trash.Key)
304                 if err != nil {
305                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
306                         return
307                 }
308                 atomic.AddInt64(&bytesDeleted, *trash.Size)
309                 atomic.AddInt64(&blocksDeleted, 1)
310
311                 _, err = v.head(*trash.Key)
312                 if err == nil {
313                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
314                         return
315                 }
316                 if !os.IsNotExist(v.translateError(err)) {
317                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
318                         return
319                 }
320                 err = v.bucket.Del("recent/" + key)
321                 if err != nil {
322                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
323                 }
324         }
325
326         var wg sync.WaitGroup
327         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
328         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
329                 wg.Add(1)
330                 go func() {
331                         defer wg.Done()
332                         for key := range todo {
333                                 emptyOneKey(key)
334                         }
335                 }()
336         }
337
338         trashL := s3awsLister{
339                 Logger:   v.logger,
340                 Bucket:   v.bucket,
341                 Prefix:   "trash/",
342                 PageSize: v.IndexPageSize,
343                 Stats:    &v.bucket.stats,
344         }
345         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
346                 todo <- trash
347         }
348         close(todo)
349         wg.Wait()
350
351         if err := trashL.Error(); err != nil {
352                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
353         }
354         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
355 }
356
357 // fixRace(X) is called when "recent/X" exists but "X" doesn't
358 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
359 // was a race between Put and Trash, fixRace recovers from the race by
360 // Untrashing the block.
361 func (v *S3AWSVolume) fixRace(key string) bool {
362         trash, err := v.head("trash/" + key)
363         if err != nil {
364                 if !os.IsNotExist(v.translateError(err)) {
365                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
366                 }
367                 return false
368         }
369
370         recent, err := v.head("recent/" + key)
371         if err != nil {
372                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
373                 return false
374         }
375
376         recentTime := *recent.LastModified
377         trashTime := *trash.LastModified
378         ageWhenTrashed := trashTime.Sub(recentTime)
379         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
380                 // No evidence of a race: block hasn't been written
381                 // since it became eligible for Trash. No fix needed.
382                 return false
383         }
384
385         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
386         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
387         err = v.safeCopy(key, "trash/"+key)
388         if err != nil {
389                 v.logger.WithError(err).Error("fixRace: copy failed")
390                 return false
391         }
392         return true
393 }
394
395 func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
396         input := &s3.HeadObjectInput{
397                 Bucket: aws.String(v.bucket.bucket),
398                 Key:    aws.String(key),
399         }
400
401         req := v.bucket.svc.HeadObjectRequest(input)
402         res, err := req.Send(context.TODO())
403
404         v.bucket.stats.TickOps("head")
405         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
406         v.bucket.stats.TickErr(err)
407
408         if err != nil {
409                 return nil, v.translateError(err)
410         }
411         result = res.HeadObjectOutput
412         return
413 }
414
415 // BlockRead reads a Keep block that has been stored as a block blob
416 // in the S3 bucket.
417 func (v *S3AWSVolume) BlockRead(ctx context.Context, hash string, writeTo io.Writer) (int, error) {
418         key := v.key(hash)
419         buf, err := v.bufferPool.GetContext(ctx)
420         if err != nil {
421                 return 0, err
422         }
423         defer v.bufferPool.Put(buf)
424
425         streamer := newStreamWriterAt(writeTo, 65536, buf)
426         defer streamer.Close()
427         err = v.readWorker(ctx, key, streamer)
428         if err != nil {
429                 err = v.translateError(err)
430                 if !os.IsNotExist(err) {
431                         return 0, err
432                 }
433                 if streamer.WroteAt() > 0 {
434                         return 0, errors.New("bug? readWorker returned ErrNotExist after writing to streamer")
435                 }
436
437                 _, err = v.head("recent/" + key)
438                 err = v.translateError(err)
439                 if err != nil {
440                         // If we can't read recent/X, there's no point in
441                         // trying fixRace. Give up.
442                         return 0, err
443                 }
444                 if !v.fixRace(key) {
445                         err = os.ErrNotExist
446                         return 0, err
447                 }
448
449                 err = v.readWorker(ctx, key, streamer)
450                 if err != nil {
451                         v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
452                         err = v.translateError(err)
453                         return 0, err
454                 }
455         }
456         err = streamer.Close()
457         if err != nil {
458                 return 0, v.translateError(err)
459         }
460         return streamer.Wrote(), nil
461 }
462
463 func (v *S3AWSVolume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
464         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
465                 u.PartSize = s3downloaderPartSize
466                 u.Concurrency = s3downloaderReadConcurrency
467         })
468         count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{
469                 Bucket: aws.String(v.bucket.bucket),
470                 Key:    aws.String(key),
471         })
472         v.bucket.stats.TickOps("get")
473         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
474         v.bucket.stats.TickErr(err)
475         v.bucket.stats.TickInBytes(uint64(count))
476         return v.translateError(err)
477 }
478
479 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
480         if r == nil {
481                 // r == nil leads to a memory violation in func readFillBuf in
482                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
483                 r = bytes.NewReader(nil)
484         }
485
486         uploadInput := s3manager.UploadInput{
487                 Bucket: aws.String(v.bucket.bucket),
488                 Key:    aws.String(key),
489                 Body:   r,
490         }
491
492         if loc, ok := v.isKeepBlock(key); ok {
493                 var contentMD5 string
494                 md5, err := hex.DecodeString(loc)
495                 if err != nil {
496                         return v.translateError(err)
497                 }
498                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
499                 uploadInput.ContentMD5 = &contentMD5
500         }
501
502         // Experimentation indicated that using concurrency 5 yields the best
503         // throughput, better than higher concurrency (10 or 13) by ~5%.
504         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
505         // is detrimental to throughput (minus ~15%).
506         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
507                 u.PartSize = s3uploaderPartSize
508                 u.Concurrency = s3uploaderWriteConcurrency
509         })
510
511         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
512         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
513         // block, so there is no extra memory use to be concerned about. See
514         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
515         // calculating the Sha-256 because we don't need it; we already use md5sum
516         // hashes that match the name of the block.
517         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
518                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
519         }))
520
521         v.bucket.stats.TickOps("put")
522         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
523         v.bucket.stats.TickErr(err)
524
525         return v.translateError(err)
526 }
527
528 // Put writes a block.
529 func (v *S3AWSVolume) BlockWrite(ctx context.Context, hash string, data []byte) error {
530         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
531         // sdk to avoid memory allocation there. See #17339 for more information.
532         rdr := bytes.NewReader(data)
533         r := newCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
534         key := v.key(hash)
535         err := v.writeObject(ctx, key, r)
536         if err != nil {
537                 return err
538         }
539         return v.writeObject(ctx, "recent/"+key, nil)
540 }
541
542 type s3awsLister struct {
543         Logger            logrus.FieldLogger
544         Bucket            *s3AWSbucket
545         Prefix            string
546         PageSize          int
547         Stats             *s3awsbucketStats
548         ContinuationToken string
549         buf               []s3.Object
550         err               error
551 }
552
553 // First fetches the first page and returns the first item. It returns
554 // nil if the response is the empty set or an error occurs.
555 func (lister *s3awsLister) First() *s3.Object {
556         lister.getPage()
557         return lister.pop()
558 }
559
560 // Next returns the next item, fetching the next page if necessary. It
561 // returns nil if the last available item has already been fetched, or
562 // an error occurs.
563 func (lister *s3awsLister) Next() *s3.Object {
564         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
565                 lister.getPage()
566         }
567         return lister.pop()
568 }
569
570 // Return the most recent error encountered by First or Next.
571 func (lister *s3awsLister) Error() error {
572         return lister.err
573 }
574
575 func (lister *s3awsLister) getPage() {
576         lister.Stats.TickOps("list")
577         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
578
579         var input *s3.ListObjectsV2Input
580         if lister.ContinuationToken == "" {
581                 input = &s3.ListObjectsV2Input{
582                         Bucket:  aws.String(lister.Bucket.bucket),
583                         MaxKeys: aws.Int64(int64(lister.PageSize)),
584                         Prefix:  aws.String(lister.Prefix),
585                 }
586         } else {
587                 input = &s3.ListObjectsV2Input{
588                         Bucket:            aws.String(lister.Bucket.bucket),
589                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
590                         Prefix:            aws.String(lister.Prefix),
591                         ContinuationToken: &lister.ContinuationToken,
592                 }
593         }
594
595         req := lister.Bucket.svc.ListObjectsV2Request(input)
596         resp, err := req.Send(context.Background())
597         if err != nil {
598                 if aerr, ok := err.(awserr.Error); ok {
599                         lister.err = aerr
600                 } else {
601                         lister.err = err
602                 }
603                 return
604         }
605
606         if *resp.IsTruncated {
607                 lister.ContinuationToken = *resp.NextContinuationToken
608         } else {
609                 lister.ContinuationToken = ""
610         }
611         lister.buf = make([]s3.Object, 0, len(resp.Contents))
612         for _, key := range resp.Contents {
613                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
614                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
615                         continue
616                 }
617                 lister.buf = append(lister.buf, key)
618         }
619 }
620
621 func (lister *s3awsLister) pop() (k *s3.Object) {
622         if len(lister.buf) > 0 {
623                 k = &lister.buf[0]
624                 lister.buf = lister.buf[1:]
625         }
626         return
627 }
628
629 // Index writes a complete list of locators with the given prefix
630 // for which Get() can retrieve data.
631 func (v *S3AWSVolume) Index(ctx context.Context, prefix string, writer io.Writer) error {
632         prefix = v.key(prefix)
633         // Use a merge sort to find matching sets of X and recent/X.
634         dataL := s3awsLister{
635                 Logger:   v.logger,
636                 Bucket:   v.bucket,
637                 Prefix:   prefix,
638                 PageSize: v.IndexPageSize,
639                 Stats:    &v.bucket.stats,
640         }
641         recentL := s3awsLister{
642                 Logger:   v.logger,
643                 Bucket:   v.bucket,
644                 Prefix:   "recent/" + prefix,
645                 PageSize: v.IndexPageSize,
646                 Stats:    &v.bucket.stats,
647         }
648         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
649                 if ctx.Err() != nil {
650                         return ctx.Err()
651                 }
652                 if *data.Key >= "g" {
653                         // Conveniently, "recent/*" and "trash/*" are
654                         // lexically greater than all hex-encoded data
655                         // hashes, so stopping here avoids iterating
656                         // over all of them needlessly with dataL.
657                         break
658                 }
659                 loc, isblk := v.isKeepBlock(*data.Key)
660                 if !isblk {
661                         continue
662                 }
663
664                 // stamp is the list entry we should use to report the
665                 // last-modified time for this data block: it will be
666                 // the recent/X entry if one exists, otherwise the
667                 // entry for the data block itself.
668                 stamp := data
669
670                 // Advance to the corresponding recent/X marker, if any
671                 for recent != nil && recentL.Error() == nil {
672                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
673                                 recent = recentL.Next()
674                                 continue
675                         } else if cmp == 0 {
676                                 stamp = recent
677                                 recent = recentL.Next()
678                                 break
679                         } else {
680                                 // recent/X marker is missing: we'll
681                                 // use the timestamp on the data
682                                 // object.
683                                 break
684                         }
685                 }
686                 if err := recentL.Error(); err != nil {
687                         return err
688                 }
689                 // We truncate sub-second precision here. Otherwise
690                 // timestamps will never match the RFC1123-formatted
691                 // Last-Modified values parsed by Mtime().
692                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
693         }
694         return dataL.Error()
695 }
696
697 // Mtime returns the stored timestamp for the given locator.
698 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
699         key := v.key(loc)
700         _, err := v.head(key)
701         if err != nil {
702                 return s3AWSZeroTime, v.translateError(err)
703         }
704         resp, err := v.head("recent/" + key)
705         err = v.translateError(err)
706         if os.IsNotExist(err) {
707                 // The data object X exists, but recent/X is missing.
708                 err = v.writeObject(context.Background(), "recent/"+key, nil)
709                 if err != nil {
710                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
711                         return s3AWSZeroTime, v.translateError(err)
712                 }
713                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
714                 resp, err = v.head("recent/" + key)
715                 if err != nil {
716                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
717                         return s3AWSZeroTime, v.translateError(err)
718                 }
719         } else if err != nil {
720                 // HEAD recent/X failed for some other reason.
721                 return s3AWSZeroTime, err
722         }
723         return *resp.LastModified, err
724 }
725
726 // InternalStats returns bucket I/O and API call counters.
727 func (v *S3AWSVolume) InternalStats() interface{} {
728         return &v.bucket.stats
729 }
730
731 // BlockTouch sets the timestamp for the given locator to the current time.
732 func (v *S3AWSVolume) BlockTouch(hash string) error {
733         key := v.key(hash)
734         _, err := v.head(key)
735         err = v.translateError(err)
736         if os.IsNotExist(err) && v.fixRace(key) {
737                 // The data object got trashed in a race, but fixRace
738                 // rescued it.
739         } else if err != nil {
740                 return err
741         }
742         err = v.writeObject(context.Background(), "recent/"+key, nil)
743         return v.translateError(err)
744 }
745
746 // checkRaceWindow returns a non-nil error if trash/key is, or might
747 // be, in the race window (i.e., it's not safe to trash key).
748 func (v *S3AWSVolume) checkRaceWindow(key string) error {
749         resp, err := v.head("trash/" + key)
750         err = v.translateError(err)
751         if os.IsNotExist(err) {
752                 // OK, trash/X doesn't exist so we're not in the race
753                 // window
754                 return nil
755         } else if err != nil {
756                 // Error looking up trash/X. We don't know whether
757                 // we're in the race window
758                 return err
759         }
760         t := resp.LastModified
761         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
762         if safeWindow <= 0 {
763                 // We can't count on "touch trash/X" to prolong
764                 // trash/X's lifetime. The new timestamp might not
765                 // become visible until now+raceWindow, and EmptyTrash
766                 // is allowed to delete trash/X before then.
767                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
768         }
769         // trash/X exists, but it won't be eligible for deletion until
770         // after now+raceWindow, so it's safe to overwrite it.
771         return nil
772 }
773
774 func (b *s3AWSbucket) Del(path string) error {
775         input := &s3.DeleteObjectInput{
776                 Bucket: aws.String(b.bucket),
777                 Key:    aws.String(path),
778         }
779         req := b.svc.DeleteObjectRequest(input)
780         _, err := req.Send(context.Background())
781         b.stats.TickOps("delete")
782         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
783         b.stats.TickErr(err)
784         return err
785 }
786
787 // Trash a Keep block.
788 func (v *S3AWSVolume) BlockTrash(loc string) error {
789         if t, err := v.Mtime(loc); err != nil {
790                 return err
791         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
792                 return nil
793         }
794         key := v.key(loc)
795         if v.cluster.Collections.BlobTrashLifetime == 0 {
796                 if !v.UnsafeDelete {
797                         return ErrS3TrashDisabled
798                 }
799                 return v.translateError(v.bucket.Del(key))
800         }
801         err := v.checkRaceWindow(key)
802         if err != nil {
803                 return err
804         }
805         err = v.safeCopy("trash/"+key, key)
806         if err != nil {
807                 return err
808         }
809         return v.translateError(v.bucket.Del(key))
810 }
811
812 // BlockUntrash moves block from trash back into store
813 func (v *S3AWSVolume) BlockUntrash(hash string) error {
814         key := v.key(hash)
815         err := v.safeCopy(key, "trash/"+key)
816         if err != nil {
817                 return err
818         }
819         err = v.writeObject(context.Background(), "recent/"+key, nil)
820         return v.translateError(err)
821 }
822
823 type s3awsbucketStats struct {
824         statsTicker
825         Ops     uint64
826         GetOps  uint64
827         PutOps  uint64
828         HeadOps uint64
829         DelOps  uint64
830         ListOps uint64
831 }
832
833 func (s *s3awsbucketStats) TickErr(err error) {
834         if err == nil {
835                 return
836         }
837         errType := fmt.Sprintf("%T", err)
838         if aerr, ok := err.(awserr.Error); ok {
839                 if reqErr, ok := err.(awserr.RequestFailure); ok {
840                         // A service error occurred
841                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
842                 } else {
843                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
844                 }
845         }
846         s.statsTicker.TickErr(err, errType)
847 }