21700: Install Bundler system-wide in Rails postinst
[arvados.git] / services / keepstore / s3_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 func init() {
37         driver["S3"] = news3Volume
38 }
39
40 const (
41         s3DefaultReadTimeout        = arvados.Duration(10 * time.Minute)
42         s3DefaultConnectTimeout     = arvados.Duration(time.Minute)
43         maxClockSkew                = 600 * time.Second
44         nearlyRFC1123               = "Mon, 2 Jan 2006 15:04:05 GMT"
45         s3downloaderPartSize        = 6 * 1024 * 1024
46         s3downloaderReadConcurrency = 11
47         s3uploaderPartSize          = 5 * 1024 * 1024
48         s3uploaderWriteConcurrency  = 5
49 )
50
51 var (
52         errS3TrashDisabled   = fmt.Errorf("trash function is disabled because Collections.BlobTrashLifetime=0 and DriverParameters.UnsafeDelete=false")
53         s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
54         s3AWSZeroTime        time.Time
55 )
56
57 // s3Volume implements Volume using an S3 bucket.
58 type s3Volume struct {
59         arvados.S3VolumeDriverParameters
60         AuthToken      string    // populated automatically when IAMRole is used
61         AuthExpiration time.Time // populated automatically when IAMRole is used
62
63         cluster    *arvados.Cluster
64         volume     arvados.Volume
65         logger     logrus.FieldLogger
66         metrics    *volumeMetricsVecs
67         bufferPool *bufferPool
68         bucket     *s3Bucket
69         region     string
70         startOnce  sync.Once
71 }
72
73 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
74 // wrapped bucket can be replaced atomically with SetBucket in order
75 // to update credentials.
76 type s3Bucket struct {
77         bucket string
78         svc    *s3.Client
79         stats  s3awsbucketStats
80         mu     sync.Mutex
81 }
82
83 func (v *s3Volume) isKeepBlock(s string) (string, bool) {
84         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
85                 s = s[v.PrefixLength+1:]
86         }
87         return s, s3AWSKeepBlockRegexp.MatchString(s)
88 }
89
90 // Return the key used for a given loc. If PrefixLength==0 then
91 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
92 // "abc/abcdef0123", etc.
93 func (v *s3Volume) key(loc string) string {
94         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
95                 return loc[:v.PrefixLength] + "/" + loc
96         } else {
97                 return loc
98         }
99 }
100
101 func news3Volume(params newVolumeParams) (volume, error) {
102         v := &s3Volume{
103                 cluster:    params.Cluster,
104                 volume:     params.ConfigVolume,
105                 metrics:    params.MetricsVecs,
106                 bufferPool: params.BufferPool,
107         }
108         err := json.Unmarshal(params.ConfigVolume.DriverParameters, v)
109         if err != nil {
110                 return nil, err
111         }
112         v.logger = params.Logger.WithField("Volume", v.DeviceID())
113         return v, v.check("")
114 }
115
116 func (v *s3Volume) translateError(err error) error {
117         if _, ok := err.(*aws.RequestCanceledError); ok {
118                 return context.Canceled
119         } else if aerr, ok := err.(awserr.Error); ok {
120                 if aerr.Code() == "NotFound" {
121                         return os.ErrNotExist
122                 } else if aerr.Code() == "NoSuchKey" {
123                         return os.ErrNotExist
124                 }
125         }
126         return err
127 }
128
129 // safeCopy calls CopyObjectRequest, and checks the response to make
130 // sure the copy succeeded and updated the timestamp on the
131 // destination object
132 //
133 // (If something goes wrong during the copy, the error will be
134 // embedded in the 200 OK response)
135 func (v *s3Volume) safeCopy(dst, src string) error {
136         input := &s3.CopyObjectInput{
137                 Bucket:      aws.String(v.bucket.bucket),
138                 ContentType: aws.String("application/octet-stream"),
139                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
140                 Key:         aws.String(dst),
141         }
142
143         req := v.bucket.svc.CopyObjectRequest(input)
144         resp, err := req.Send(context.Background())
145
146         err = v.translateError(err)
147         if os.IsNotExist(err) {
148                 return err
149         } else if err != nil {
150                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
151         }
152
153         if resp.CopyObjectResult.LastModified == nil {
154                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
155         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
156                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
157         }
158         return nil
159 }
160
161 func (v *s3Volume) check(ec2metadataHostname string) error {
162         if v.Bucket == "" {
163                 return errors.New("DriverParameters: Bucket must be provided")
164         }
165         if v.IndexPageSize == 0 {
166                 v.IndexPageSize = 1000
167         }
168         if v.RaceWindow < 0 {
169                 return errors.New("DriverParameters: RaceWindow must not be negative")
170         }
171
172         if v.V2Signature {
173                 return errors.New("DriverParameters: V2Signature is not supported")
174         }
175
176         defaultResolver := endpoints.NewDefaultResolver()
177
178         cfg := defaults.Config()
179
180         if v.Endpoint == "" && v.Region == "" {
181                 return fmt.Errorf("AWS region or endpoint must be specified")
182         } else if v.Endpoint != "" || ec2metadataHostname != "" {
183                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
184                         if v.Endpoint != "" && service == "s3" {
185                                 return aws.Endpoint{
186                                         URL:           v.Endpoint,
187                                         SigningRegion: region,
188                                 }, nil
189                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
190                                 return aws.Endpoint{
191                                         URL: ec2metadataHostname,
192                                 }, nil
193                         } else {
194                                 return defaultResolver.ResolveEndpoint(service, region)
195                         }
196                 }
197                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
198         }
199         if v.Region == "" {
200                 // Endpoint is already specified (otherwise we would
201                 // have errored out above), but Region is also
202                 // required by the aws sdk, in order to determine
203                 // SignatureVersions.
204                 v.Region = "us-east-1"
205         }
206         cfg.Region = v.Region
207
208         // Zero timeouts mean "wait forever", which is a bad
209         // default. Default to long timeouts instead.
210         if v.ConnectTimeout == 0 {
211                 v.ConnectTimeout = s3DefaultConnectTimeout
212         }
213         if v.ReadTimeout == 0 {
214                 v.ReadTimeout = s3DefaultReadTimeout
215         }
216
217         creds := aws.NewChainProvider(
218                 []aws.CredentialsProvider{
219                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
220                         ec2rolecreds.New(ec2metadata.New(cfg), func(opts *ec2rolecreds.ProviderOptions) {
221                                 // (from aws-sdk-go-v2 comments)
222                                 // "allow the credentials to trigger
223                                 // refreshing prior to the credentials
224                                 // actually expiring. This is
225                                 // beneficial so race conditions with
226                                 // expiring credentials do not cause
227                                 // request to fail unexpectedly due to
228                                 // ExpiredTokenException exceptions."
229                                 //
230                                 // (from
231                                 // https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
232                                 // "We make new credentials available
233                                 // at least five minutes before the
234                                 // expiration of the old credentials."
235                                 opts.ExpiryWindow = 5 * time.Minute
236                         }),
237                 })
238
239         cfg.Credentials = creds
240
241         v.bucket = &s3Bucket{
242                 bucket: v.Bucket,
243                 svc:    s3.New(cfg),
244         }
245
246         // Set up prometheus metrics
247         lbls := prometheus.Labels{"device_id": v.DeviceID()}
248         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
249
250         return nil
251 }
252
253 // DeviceID returns a globally unique ID for the storage bucket.
254 func (v *s3Volume) DeviceID() string {
255         return "s3://" + v.Endpoint + "/" + v.Bucket
256 }
257
258 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
259 // and deletes them from the volume.
260 func (v *s3Volume) EmptyTrash() {
261         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
262
263         // Define "ready to delete" as "...when EmptyTrash started".
264         startT := time.Now()
265
266         emptyOneKey := func(trash *s3.Object) {
267                 key := strings.TrimPrefix(*trash.Key, "trash/")
268                 loc, isblk := v.isKeepBlock(key)
269                 if !isblk {
270                         return
271                 }
272                 atomic.AddInt64(&bytesInTrash, *trash.Size)
273                 atomic.AddInt64(&blocksInTrash, 1)
274
275                 trashT := *trash.LastModified
276                 recent, err := v.head("recent/" + key)
277                 if err != nil && os.IsNotExist(v.translateError(err)) {
278                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
279                         err = v.BlockUntrash(loc)
280                         if err != nil {
281                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
282                         }
283                         return
284                 } else if err != nil {
285                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
286                         return
287                 }
288                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
289                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
290                                 // recent/key is too old to protect
291                                 // loc from being Trashed again during
292                                 // the raceWindow that starts if we
293                                 // delete trash/X now.
294                                 //
295                                 // Note this means (TrashSweepInterval
296                                 // < BlobSigningTTL - raceWindow) is
297                                 // necessary to avoid starvation.
298                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
299                                 v.fixRace(key)
300                                 v.BlockTouch(loc)
301                                 return
302                         }
303                         _, err := v.head(key)
304                         if os.IsNotExist(err) {
305                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
306                                 v.fixRace(key)
307                                 return
308                         } else if err != nil {
309                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
310                                 return
311                         }
312                 }
313                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
314                         return
315                 }
316                 err = v.bucket.Del(*trash.Key)
317                 if err != nil {
318                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
319                         return
320                 }
321                 atomic.AddInt64(&bytesDeleted, *trash.Size)
322                 atomic.AddInt64(&blocksDeleted, 1)
323
324                 _, err = v.head(*trash.Key)
325                 if err == nil {
326                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
327                         return
328                 }
329                 if !os.IsNotExist(v.translateError(err)) {
330                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
331                         return
332                 }
333                 err = v.bucket.Del("recent/" + key)
334                 if err != nil {
335                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
336                 }
337         }
338
339         var wg sync.WaitGroup
340         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
341         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
342                 wg.Add(1)
343                 go func() {
344                         defer wg.Done()
345                         for key := range todo {
346                                 emptyOneKey(key)
347                         }
348                 }()
349         }
350
351         trashL := s3awsLister{
352                 Logger:   v.logger,
353                 Bucket:   v.bucket,
354                 Prefix:   "trash/",
355                 PageSize: v.IndexPageSize,
356                 Stats:    &v.bucket.stats,
357         }
358         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
359                 todo <- trash
360         }
361         close(todo)
362         wg.Wait()
363
364         if err := trashL.Error(); err != nil {
365                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
366         }
367         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.DeviceID(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
368 }
369
370 // fixRace(X) is called when "recent/X" exists but "X" doesn't
371 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
372 // was a race between Put and Trash, fixRace recovers from the race by
373 // Untrashing the block.
374 func (v *s3Volume) fixRace(key string) bool {
375         trash, err := v.head("trash/" + key)
376         if err != nil {
377                 if !os.IsNotExist(v.translateError(err)) {
378                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
379                 }
380                 return false
381         }
382
383         recent, err := v.head("recent/" + key)
384         if err != nil {
385                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
386                 return false
387         }
388
389         recentTime := *recent.LastModified
390         trashTime := *trash.LastModified
391         ageWhenTrashed := trashTime.Sub(recentTime)
392         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
393                 // No evidence of a race: block hasn't been written
394                 // since it became eligible for Trash. No fix needed.
395                 return false
396         }
397
398         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
399         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
400         err = v.safeCopy(key, "trash/"+key)
401         if err != nil {
402                 v.logger.WithError(err).Error("fixRace: copy failed")
403                 return false
404         }
405         return true
406 }
407
408 func (v *s3Volume) head(key string) (result *s3.HeadObjectOutput, err error) {
409         input := &s3.HeadObjectInput{
410                 Bucket: aws.String(v.bucket.bucket),
411                 Key:    aws.String(key),
412         }
413
414         req := v.bucket.svc.HeadObjectRequest(input)
415         res, err := req.Send(context.TODO())
416
417         v.bucket.stats.TickOps("head")
418         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
419         v.bucket.stats.TickErr(err)
420
421         if err != nil {
422                 return nil, v.translateError(err)
423         }
424         result = res.HeadObjectOutput
425         return
426 }
427
428 // BlockRead reads a Keep block that has been stored as a block blob
429 // in the S3 bucket.
430 func (v *s3Volume) BlockRead(ctx context.Context, hash string, w io.WriterAt) error {
431         key := v.key(hash)
432         err := v.readWorker(ctx, key, w)
433         if err != nil {
434                 err = v.translateError(err)
435                 if !os.IsNotExist(err) {
436                         return err
437                 }
438
439                 _, err = v.head("recent/" + key)
440                 err = v.translateError(err)
441                 if err != nil {
442                         // If we can't read recent/X, there's no point in
443                         // trying fixRace. Give up.
444                         return err
445                 }
446                 if !v.fixRace(key) {
447                         err = os.ErrNotExist
448                         return err
449                 }
450
451                 err = v.readWorker(ctx, key, w)
452                 if err != nil {
453                         v.logger.Warnf("reading %s after successful fixRace: %s", hash, err)
454                         err = v.translateError(err)
455                         return err
456                 }
457         }
458         return nil
459 }
460
461 func (v *s3Volume) readWorker(ctx context.Context, key string, dst io.WriterAt) error {
462         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
463                 u.PartSize = s3downloaderPartSize
464                 u.Concurrency = s3downloaderReadConcurrency
465         })
466         count, err := downloader.DownloadWithContext(ctx, dst, &s3.GetObjectInput{
467                 Bucket: aws.String(v.bucket.bucket),
468                 Key:    aws.String(key),
469         })
470         v.bucket.stats.TickOps("get")
471         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
472         v.bucket.stats.TickErr(err)
473         v.bucket.stats.TickInBytes(uint64(count))
474         return v.translateError(err)
475 }
476
477 func (v *s3Volume) writeObject(ctx context.Context, key string, r io.Reader) error {
478         if r == nil {
479                 // r == nil leads to a memory violation in func readFillBuf in
480                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
481                 r = bytes.NewReader(nil)
482         }
483
484         uploadInput := s3manager.UploadInput{
485                 Bucket: aws.String(v.bucket.bucket),
486                 Key:    aws.String(key),
487                 Body:   r,
488         }
489
490         if loc, ok := v.isKeepBlock(key); ok {
491                 var contentMD5 string
492                 md5, err := hex.DecodeString(loc)
493                 if err != nil {
494                         return v.translateError(err)
495                 }
496                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
497                 uploadInput.ContentMD5 = &contentMD5
498         }
499
500         // Experimentation indicated that using concurrency 5 yields the best
501         // throughput, better than higher concurrency (10 or 13) by ~5%.
502         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
503         // is detrimental to throughput (minus ~15%).
504         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
505                 u.PartSize = s3uploaderPartSize
506                 u.Concurrency = s3uploaderWriteConcurrency
507         })
508
509         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
510         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
511         // block, so there is no extra memory use to be concerned about. See
512         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
513         // calculating the Sha-256 because we don't need it; we already use md5sum
514         // hashes that match the name of the block.
515         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
516                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
517         }))
518
519         v.bucket.stats.TickOps("put")
520         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
521         v.bucket.stats.TickErr(err)
522
523         return v.translateError(err)
524 }
525
526 // Put writes a block.
527 func (v *s3Volume) BlockWrite(ctx context.Context, hash string, data []byte) error {
528         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
529         // sdk to avoid memory allocation there. See #17339 for more information.
530         rdr := bytes.NewReader(data)
531         r := newCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
532         key := v.key(hash)
533         err := v.writeObject(ctx, key, r)
534         if err != nil {
535                 return err
536         }
537         return v.writeObject(ctx, "recent/"+key, nil)
538 }
539
540 type s3awsLister struct {
541         Logger            logrus.FieldLogger
542         Bucket            *s3Bucket
543         Prefix            string
544         PageSize          int
545         Stats             *s3awsbucketStats
546         ContinuationToken string
547         buf               []s3.Object
548         err               error
549 }
550
551 // First fetches the first page and returns the first item. It returns
552 // nil if the response is the empty set or an error occurs.
553 func (lister *s3awsLister) First() *s3.Object {
554         lister.getPage()
555         return lister.pop()
556 }
557
558 // Next returns the next item, fetching the next page if necessary. It
559 // returns nil if the last available item has already been fetched, or
560 // an error occurs.
561 func (lister *s3awsLister) Next() *s3.Object {
562         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
563                 lister.getPage()
564         }
565         return lister.pop()
566 }
567
568 // Return the most recent error encountered by First or Next.
569 func (lister *s3awsLister) Error() error {
570         return lister.err
571 }
572
573 func (lister *s3awsLister) getPage() {
574         lister.Stats.TickOps("list")
575         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
576
577         var input *s3.ListObjectsV2Input
578         if lister.ContinuationToken == "" {
579                 input = &s3.ListObjectsV2Input{
580                         Bucket:  aws.String(lister.Bucket.bucket),
581                         MaxKeys: aws.Int64(int64(lister.PageSize)),
582                         Prefix:  aws.String(lister.Prefix),
583                 }
584         } else {
585                 input = &s3.ListObjectsV2Input{
586                         Bucket:            aws.String(lister.Bucket.bucket),
587                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
588                         Prefix:            aws.String(lister.Prefix),
589                         ContinuationToken: &lister.ContinuationToken,
590                 }
591         }
592
593         req := lister.Bucket.svc.ListObjectsV2Request(input)
594         resp, err := req.Send(context.Background())
595         if err != nil {
596                 if aerr, ok := err.(awserr.Error); ok {
597                         lister.err = aerr
598                 } else {
599                         lister.err = err
600                 }
601                 return
602         }
603
604         if *resp.IsTruncated {
605                 lister.ContinuationToken = *resp.NextContinuationToken
606         } else {
607                 lister.ContinuationToken = ""
608         }
609         lister.buf = make([]s3.Object, 0, len(resp.Contents))
610         for _, key := range resp.Contents {
611                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
612                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
613                         continue
614                 }
615                 lister.buf = append(lister.buf, key)
616         }
617 }
618
619 func (lister *s3awsLister) pop() (k *s3.Object) {
620         if len(lister.buf) > 0 {
621                 k = &lister.buf[0]
622                 lister.buf = lister.buf[1:]
623         }
624         return
625 }
626
627 // Index writes a complete list of locators with the given prefix
628 // for which Get() can retrieve data.
629 func (v *s3Volume) Index(ctx context.Context, prefix string, writer io.Writer) error {
630         prefix = v.key(prefix)
631         // Use a merge sort to find matching sets of X and recent/X.
632         dataL := s3awsLister{
633                 Logger:   v.logger,
634                 Bucket:   v.bucket,
635                 Prefix:   prefix,
636                 PageSize: v.IndexPageSize,
637                 Stats:    &v.bucket.stats,
638         }
639         recentL := s3awsLister{
640                 Logger:   v.logger,
641                 Bucket:   v.bucket,
642                 Prefix:   "recent/" + prefix,
643                 PageSize: v.IndexPageSize,
644                 Stats:    &v.bucket.stats,
645         }
646         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
647                 if ctx.Err() != nil {
648                         return ctx.Err()
649                 }
650                 if *data.Key >= "g" {
651                         // Conveniently, "recent/*" and "trash/*" are
652                         // lexically greater than all hex-encoded data
653                         // hashes, so stopping here avoids iterating
654                         // over all of them needlessly with dataL.
655                         break
656                 }
657                 loc, isblk := v.isKeepBlock(*data.Key)
658                 if !isblk {
659                         continue
660                 }
661
662                 // stamp is the list entry we should use to report the
663                 // last-modified time for this data block: it will be
664                 // the recent/X entry if one exists, otherwise the
665                 // entry for the data block itself.
666                 stamp := data
667
668                 // Advance to the corresponding recent/X marker, if any
669                 for recent != nil && recentL.Error() == nil {
670                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
671                                 recent = recentL.Next()
672                                 continue
673                         } else if cmp == 0 {
674                                 stamp = recent
675                                 recent = recentL.Next()
676                                 break
677                         } else {
678                                 // recent/X marker is missing: we'll
679                                 // use the timestamp on the data
680                                 // object.
681                                 break
682                         }
683                 }
684                 if err := recentL.Error(); err != nil {
685                         return err
686                 }
687                 // We truncate sub-second precision here. Otherwise
688                 // timestamps will never match the RFC1123-formatted
689                 // Last-Modified values parsed by Mtime().
690                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
691         }
692         return dataL.Error()
693 }
694
695 // Mtime returns the stored timestamp for the given locator.
696 func (v *s3Volume) Mtime(loc string) (time.Time, error) {
697         key := v.key(loc)
698         _, err := v.head(key)
699         if err != nil {
700                 return s3AWSZeroTime, v.translateError(err)
701         }
702         resp, err := v.head("recent/" + key)
703         err = v.translateError(err)
704         if os.IsNotExist(err) {
705                 // The data object X exists, but recent/X is missing.
706                 err = v.writeObject(context.Background(), "recent/"+key, nil)
707                 if err != nil {
708                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
709                         return s3AWSZeroTime, v.translateError(err)
710                 }
711                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
712                 resp, err = v.head("recent/" + key)
713                 if err != nil {
714                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
715                         return s3AWSZeroTime, v.translateError(err)
716                 }
717         } else if err != nil {
718                 // HEAD recent/X failed for some other reason.
719                 return s3AWSZeroTime, err
720         }
721         return *resp.LastModified, err
722 }
723
724 // InternalStats returns bucket I/O and API call counters.
725 func (v *s3Volume) InternalStats() interface{} {
726         return &v.bucket.stats
727 }
728
729 // BlockTouch sets the timestamp for the given locator to the current time.
730 func (v *s3Volume) BlockTouch(hash string) error {
731         key := v.key(hash)
732         _, err := v.head(key)
733         err = v.translateError(err)
734         if os.IsNotExist(err) && v.fixRace(key) {
735                 // The data object got trashed in a race, but fixRace
736                 // rescued it.
737         } else if err != nil {
738                 return err
739         }
740         err = v.writeObject(context.Background(), "recent/"+key, nil)
741         return v.translateError(err)
742 }
743
744 // checkRaceWindow returns a non-nil error if trash/key is, or might
745 // be, in the race window (i.e., it's not safe to trash key).
746 func (v *s3Volume) checkRaceWindow(key string) error {
747         resp, err := v.head("trash/" + key)
748         err = v.translateError(err)
749         if os.IsNotExist(err) {
750                 // OK, trash/X doesn't exist so we're not in the race
751                 // window
752                 return nil
753         } else if err != nil {
754                 // Error looking up trash/X. We don't know whether
755                 // we're in the race window
756                 return err
757         }
758         t := resp.LastModified
759         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
760         if safeWindow <= 0 {
761                 // We can't count on "touch trash/X" to prolong
762                 // trash/X's lifetime. The new timestamp might not
763                 // become visible until now+raceWindow, and EmptyTrash
764                 // is allowed to delete trash/X before then.
765                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
766         }
767         // trash/X exists, but it won't be eligible for deletion until
768         // after now+raceWindow, so it's safe to overwrite it.
769         return nil
770 }
771
772 func (b *s3Bucket) Del(path string) error {
773         input := &s3.DeleteObjectInput{
774                 Bucket: aws.String(b.bucket),
775                 Key:    aws.String(path),
776         }
777         req := b.svc.DeleteObjectRequest(input)
778         _, err := req.Send(context.Background())
779         b.stats.TickOps("delete")
780         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
781         b.stats.TickErr(err)
782         return err
783 }
784
785 // Trash a Keep block.
786 func (v *s3Volume) BlockTrash(loc string) error {
787         if t, err := v.Mtime(loc); err != nil {
788                 return err
789         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
790                 return nil
791         }
792         key := v.key(loc)
793         if v.cluster.Collections.BlobTrashLifetime == 0 {
794                 if !v.UnsafeDelete {
795                         return errS3TrashDisabled
796                 }
797                 return v.translateError(v.bucket.Del(key))
798         }
799         err := v.checkRaceWindow(key)
800         if err != nil {
801                 return err
802         }
803         err = v.safeCopy("trash/"+key, key)
804         if err != nil {
805                 return err
806         }
807         return v.translateError(v.bucket.Del(key))
808 }
809
810 // BlockUntrash moves block from trash back into store
811 func (v *s3Volume) BlockUntrash(hash string) error {
812         key := v.key(hash)
813         err := v.safeCopy(key, "trash/"+key)
814         if err != nil {
815                 return err
816         }
817         err = v.writeObject(context.Background(), "recent/"+key, nil)
818         return v.translateError(err)
819 }
820
821 type s3awsbucketStats struct {
822         statsTicker
823         Ops     uint64
824         GetOps  uint64
825         PutOps  uint64
826         HeadOps uint64
827         DelOps  uint64
828         ListOps uint64
829 }
830
831 func (s *s3awsbucketStats) TickErr(err error) {
832         if err == nil {
833                 return
834         }
835         errType := fmt.Sprintf("%T", err)
836         if aerr, ok := err.(awserr.Error); ok {
837                 if reqErr, ok := err.(awserr.RequestFailure); ok {
838                         // A service error occurred
839                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
840                 } else {
841                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
842                 }
843         }
844         s.statsTicker.TickErr(err, errType)
845 }