19234: s3v2 + non-aws: default to us-east-1 signing settings.
[arvados.git] / services / keepstore / s3aws_volume.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package keepstore
6
7 import (
8         "bytes"
9         "context"
10         "encoding/base64"
11         "encoding/hex"
12         "encoding/json"
13         "errors"
14         "fmt"
15         "io"
16         "os"
17         "regexp"
18         "strings"
19         "sync"
20         "sync/atomic"
21         "time"
22
23         "git.arvados.org/arvados.git/sdk/go/arvados"
24         "github.com/aws/aws-sdk-go-v2/aws"
25         "github.com/aws/aws-sdk-go-v2/aws/awserr"
26         "github.com/aws/aws-sdk-go-v2/aws/defaults"
27         "github.com/aws/aws-sdk-go-v2/aws/ec2metadata"
28         "github.com/aws/aws-sdk-go-v2/aws/ec2rolecreds"
29         "github.com/aws/aws-sdk-go-v2/aws/endpoints"
30         "github.com/aws/aws-sdk-go-v2/service/s3"
31         "github.com/aws/aws-sdk-go-v2/service/s3/s3manager"
32         "github.com/prometheus/client_golang/prometheus"
33         "github.com/sirupsen/logrus"
34 )
35
36 // S3AWSVolume implements Volume using an S3 bucket.
37 type S3AWSVolume struct {
38         arvados.S3VolumeDriverParameters
39         AuthToken      string    // populated automatically when IAMRole is used
40         AuthExpiration time.Time // populated automatically when IAMRole is used
41
42         cluster   *arvados.Cluster
43         volume    arvados.Volume
44         logger    logrus.FieldLogger
45         metrics   *volumeMetricsVecs
46         bucket    *s3AWSbucket
47         region    string
48         startOnce sync.Once
49 }
50
51 // s3bucket wraps s3.bucket and counts I/O and API usage stats. The
52 // wrapped bucket can be replaced atomically with SetBucket in order
53 // to update credentials.
54 type s3AWSbucket struct {
55         bucket string
56         svc    *s3.Client
57         stats  s3awsbucketStats
58         mu     sync.Mutex
59 }
60
61 // chooseS3VolumeDriver distinguishes between the old goamz driver and
62 // aws-sdk-go based on the UseAWSS3v2Driver feature flag
63 func chooseS3VolumeDriver(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
64         v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
65         // Default value will be overriden if it happens to be defined in the config
66         v.S3VolumeDriverParameters.UseAWSS3v2Driver = true
67         err := json.Unmarshal(volume.DriverParameters, v)
68         if err != nil {
69                 return nil, err
70         }
71         if v.UseAWSS3v2Driver {
72                 logger.Debugln("Using AWS S3 v2 driver")
73                 return newS3AWSVolume(cluster, volume, logger, metrics)
74         }
75         logger.Debugln("Using goamz S3 driver")
76         return newS3Volume(cluster, volume, logger, metrics)
77 }
78
79 const (
80         PartSize         = 5 * 1024 * 1024
81         ReadConcurrency  = 13
82         WriteConcurrency = 5
83 )
84
85 var s3AWSKeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
86 var s3AWSZeroTime time.Time
87
88 func (v *S3AWSVolume) isKeepBlock(s string) (string, bool) {
89         if v.PrefixLength > 0 && len(s) == v.PrefixLength+33 && s[:v.PrefixLength] == s[v.PrefixLength+1:v.PrefixLength*2+1] {
90                 s = s[v.PrefixLength+1:]
91         }
92         return s, s3AWSKeepBlockRegexp.MatchString(s)
93 }
94
95 // Return the key used for a given loc. If PrefixLength==0 then
96 // key("abcdef0123") is "abcdef0123", if PrefixLength==3 then key is
97 // "abc/abcdef0123", etc.
98 func (v *S3AWSVolume) key(loc string) string {
99         if v.PrefixLength > 0 && v.PrefixLength < len(loc)-1 {
100                 return loc[:v.PrefixLength] + "/" + loc
101         } else {
102                 return loc
103         }
104 }
105
106 func newS3AWSVolume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
107         v := &S3AWSVolume{cluster: cluster, volume: volume, metrics: metrics}
108         err := json.Unmarshal(volume.DriverParameters, v)
109         if err != nil {
110                 return nil, err
111         }
112         v.logger = logger.WithField("Volume", v.String())
113         return v, v.check("")
114 }
115
116 func (v *S3AWSVolume) translateError(err error) error {
117         if _, ok := err.(*aws.RequestCanceledError); ok {
118                 return context.Canceled
119         } else if aerr, ok := err.(awserr.Error); ok {
120                 if aerr.Code() == "NotFound" {
121                         return os.ErrNotExist
122                 } else if aerr.Code() == "NoSuchKey" {
123                         return os.ErrNotExist
124                 }
125         }
126         return err
127 }
128
129 // safeCopy calls CopyObjectRequest, and checks the response to make
130 // sure the copy succeeded and updated the timestamp on the
131 // destination object
132 //
133 // (If something goes wrong during the copy, the error will be
134 // embedded in the 200 OK response)
135 func (v *S3AWSVolume) safeCopy(dst, src string) error {
136         input := &s3.CopyObjectInput{
137                 Bucket:      aws.String(v.bucket.bucket),
138                 ContentType: aws.String("application/octet-stream"),
139                 CopySource:  aws.String(v.bucket.bucket + "/" + src),
140                 Key:         aws.String(dst),
141         }
142
143         req := v.bucket.svc.CopyObjectRequest(input)
144         resp, err := req.Send(context.Background())
145
146         err = v.translateError(err)
147         if os.IsNotExist(err) {
148                 return err
149         } else if err != nil {
150                 return fmt.Errorf("PutCopy(%q ← %q): %s", dst, v.bucket.bucket+"/"+src, err)
151         }
152
153         if resp.CopyObjectResult.LastModified == nil {
154                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.CopyObjectResult.LastModified, err)
155         } else if time.Now().Sub(*resp.CopyObjectResult.LastModified) > maxClockSkew {
156                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.CopyObjectResult.LastModified, resp.CopyObjectResult.LastModified)
157         }
158         return nil
159 }
160
161 func (v *S3AWSVolume) check(ec2metadataHostname string) error {
162         if v.Bucket == "" {
163                 return errors.New("DriverParameters: Bucket must be provided")
164         }
165         if v.IndexPageSize == 0 {
166                 v.IndexPageSize = 1000
167         }
168         if v.RaceWindow < 0 {
169                 return errors.New("DriverParameters: RaceWindow must not be negative")
170         }
171
172         if v.V2Signature {
173                 return errors.New("DriverParameters: V2Signature is not supported")
174         }
175
176         defaultResolver := endpoints.NewDefaultResolver()
177
178         cfg := defaults.Config()
179
180         if v.Endpoint == "" && v.Region == "" {
181                 return fmt.Errorf("AWS region or endpoint must be specified")
182         } else if v.Endpoint != "" || ec2metadataHostname != "" {
183                 myCustomResolver := func(service, region string) (aws.Endpoint, error) {
184                         if v.Endpoint != "" && service == "s3" {
185                                 return aws.Endpoint{
186                                         URL:           v.Endpoint,
187                                         SigningRegion: region,
188                                 }, nil
189                         } else if service == "ec2metadata" && ec2metadataHostname != "" {
190                                 return aws.Endpoint{
191                                         URL: ec2metadataHostname,
192                                 }, nil
193                         } else {
194                                 return defaultResolver.ResolveEndpoint(service, region)
195                         }
196                 }
197                 cfg.EndpointResolver = aws.EndpointResolverFunc(myCustomResolver)
198         }
199         if v.Region == "" {
200                 // Endpoint is already specified (otherwise we would
201                 // have errored out above), but Region is also
202                 // required by the aws sdk, in order to determine
203                 // SignatureVersions.
204                 v.Region = "us-east-1"
205         }
206         cfg.Region = v.Region
207
208         // Zero timeouts mean "wait forever", which is a bad
209         // default. Default to long timeouts instead.
210         if v.ConnectTimeout == 0 {
211                 v.ConnectTimeout = s3DefaultConnectTimeout
212         }
213         if v.ReadTimeout == 0 {
214                 v.ReadTimeout = s3DefaultReadTimeout
215         }
216
217         creds := aws.NewChainProvider(
218                 []aws.CredentialsProvider{
219                         aws.NewStaticCredentialsProvider(v.AccessKeyID, v.SecretAccessKey, v.AuthToken),
220                         ec2rolecreds.New(ec2metadata.New(cfg)),
221                 })
222
223         cfg.Credentials = creds
224
225         v.bucket = &s3AWSbucket{
226                 bucket: v.Bucket,
227                 svc:    s3.New(cfg),
228         }
229
230         // Set up prometheus metrics
231         lbls := prometheus.Labels{"device_id": v.GetDeviceID()}
232         v.bucket.stats.opsCounters, v.bucket.stats.errCounters, v.bucket.stats.ioBytes = v.metrics.getCounterVecsFor(lbls)
233
234         return nil
235 }
236
237 // String implements fmt.Stringer.
238 func (v *S3AWSVolume) String() string {
239         return fmt.Sprintf("s3-bucket:%+q", v.Bucket)
240 }
241
242 // GetDeviceID returns a globally unique ID for the storage bucket.
243 func (v *S3AWSVolume) GetDeviceID() string {
244         return "s3://" + v.Endpoint + "/" + v.Bucket
245 }
246
247 // Compare the given data with the stored data.
248 func (v *S3AWSVolume) Compare(ctx context.Context, loc string, expect []byte) error {
249         key := v.key(loc)
250         errChan := make(chan error, 1)
251         go func() {
252                 _, err := v.head("recent/" + key)
253                 errChan <- err
254         }()
255         var err error
256         select {
257         case <-ctx.Done():
258                 return ctx.Err()
259         case err = <-errChan:
260         }
261         if err != nil {
262                 // Checking for the key itself here would interfere
263                 // with future GET requests.
264                 //
265                 // On AWS, if X doesn't exist, a HEAD or GET request
266                 // for X causes X's non-existence to be cached. Thus,
267                 // if we test for X, then create X and return a
268                 // signature to our client, the client might still get
269                 // 404 from all keepstores when trying to read it.
270                 //
271                 // To avoid this, we avoid doing HEAD X or GET X until
272                 // we know X has been written.
273                 //
274                 // Note that X might exist even though recent/X
275                 // doesn't: for example, the response to HEAD recent/X
276                 // might itself come from a stale cache. In such
277                 // cases, we will return a false negative and
278                 // PutHandler might needlessly create another replica
279                 // on a different volume. That's not ideal, but it's
280                 // better than passing the eventually-consistent
281                 // problem on to our clients.
282                 return v.translateError(err)
283         }
284
285         input := &s3.GetObjectInput{
286                 Bucket: aws.String(v.bucket.bucket),
287                 Key:    aws.String(key),
288         }
289
290         req := v.bucket.svc.GetObjectRequest(input)
291         result, err := req.Send(ctx)
292         if err != nil {
293                 return v.translateError(err)
294         }
295         return v.translateError(compareReaderWithBuf(ctx, result.Body, expect, loc[:32]))
296 }
297
298 // EmptyTrash looks for trashed blocks that exceeded BlobTrashLifetime
299 // and deletes them from the volume.
300 func (v *S3AWSVolume) EmptyTrash() {
301         if v.cluster.Collections.BlobDeleteConcurrency < 1 {
302                 return
303         }
304
305         var bytesInTrash, blocksInTrash, bytesDeleted, blocksDeleted int64
306
307         // Define "ready to delete" as "...when EmptyTrash started".
308         startT := time.Now()
309
310         emptyOneKey := func(trash *s3.Object) {
311                 key := strings.TrimPrefix(*trash.Key, "trash/")
312                 loc, isblk := v.isKeepBlock(key)
313                 if !isblk {
314                         return
315                 }
316                 atomic.AddInt64(&bytesInTrash, *trash.Size)
317                 atomic.AddInt64(&blocksInTrash, 1)
318
319                 trashT := *trash.LastModified
320                 recent, err := v.head("recent/" + key)
321                 if err != nil && os.IsNotExist(v.translateError(err)) {
322                         v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", *trash.Key, "recent/"+key, err)
323                         err = v.Untrash(loc)
324                         if err != nil {
325                                 v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
326                         }
327                         return
328                 } else if err != nil {
329                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+key)
330                         return
331                 }
332                 if trashT.Sub(*recent.LastModified) < v.cluster.Collections.BlobSigningTTL.Duration() {
333                         if age := startT.Sub(*recent.LastModified); age >= v.cluster.Collections.BlobSigningTTL.Duration()-time.Duration(v.RaceWindow) {
334                                 // recent/key is too old to protect
335                                 // loc from being Trashed again during
336                                 // the raceWindow that starts if we
337                                 // delete trash/X now.
338                                 //
339                                 // Note this means (TrashSweepInterval
340                                 // < BlobSigningTTL - raceWindow) is
341                                 // necessary to avoid starvation.
342                                 v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
343                                 v.fixRace(key)
344                                 v.Touch(loc)
345                                 return
346                         }
347                         _, err := v.head(key)
348                         if os.IsNotExist(err) {
349                                 v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
350                                 v.fixRace(key)
351                                 return
352                         } else if err != nil {
353                                 v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
354                                 return
355                         }
356                 }
357                 if startT.Sub(trashT) < v.cluster.Collections.BlobTrashLifetime.Duration() {
358                         return
359                 }
360                 err = v.bucket.Del(*trash.Key)
361                 if err != nil {
362                         v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", *trash.Key)
363                         return
364                 }
365                 atomic.AddInt64(&bytesDeleted, *trash.Size)
366                 atomic.AddInt64(&blocksDeleted, 1)
367
368                 _, err = v.head(*trash.Key)
369                 if err == nil {
370                         v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
371                         return
372                 }
373                 if !os.IsNotExist(v.translateError(err)) {
374                         v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", key)
375                         return
376                 }
377                 err = v.bucket.Del("recent/" + key)
378                 if err != nil {
379                         v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+key)
380                 }
381         }
382
383         var wg sync.WaitGroup
384         todo := make(chan *s3.Object, v.cluster.Collections.BlobDeleteConcurrency)
385         for i := 0; i < v.cluster.Collections.BlobDeleteConcurrency; i++ {
386                 wg.Add(1)
387                 go func() {
388                         defer wg.Done()
389                         for key := range todo {
390                                 emptyOneKey(key)
391                         }
392                 }()
393         }
394
395         trashL := s3awsLister{
396                 Logger:   v.logger,
397                 Bucket:   v.bucket,
398                 Prefix:   "trash/",
399                 PageSize: v.IndexPageSize,
400                 Stats:    &v.bucket.stats,
401         }
402         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
403                 todo <- trash
404         }
405         close(todo)
406         wg.Wait()
407
408         if err := trashL.Error(); err != nil {
409                 v.logger.WithError(err).Error("EmptyTrash: lister failed")
410         }
411         v.logger.Infof("EmptyTrash: stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
412 }
413
414 // fixRace(X) is called when "recent/X" exists but "X" doesn't
415 // exist. If the timestamps on "recent/X" and "trash/X" indicate there
416 // was a race between Put and Trash, fixRace recovers from the race by
417 // Untrashing the block.
418 func (v *S3AWSVolume) fixRace(key string) bool {
419         trash, err := v.head("trash/" + key)
420         if err != nil {
421                 if !os.IsNotExist(v.translateError(err)) {
422                         v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+key)
423                 }
424                 return false
425         }
426
427         recent, err := v.head("recent/" + key)
428         if err != nil {
429                 v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+key)
430                 return false
431         }
432
433         recentTime := *recent.LastModified
434         trashTime := *trash.LastModified
435         ageWhenTrashed := trashTime.Sub(recentTime)
436         if ageWhenTrashed >= v.cluster.Collections.BlobSigningTTL.Duration() {
437                 // No evidence of a race: block hasn't been written
438                 // since it became eligible for Trash. No fix needed.
439                 return false
440         }
441
442         v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", key, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
443         v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+key, key)
444         err = v.safeCopy(key, "trash/"+key)
445         if err != nil {
446                 v.logger.WithError(err).Error("fixRace: copy failed")
447                 return false
448         }
449         return true
450 }
451
452 func (v *S3AWSVolume) head(key string) (result *s3.HeadObjectOutput, err error) {
453         input := &s3.HeadObjectInput{
454                 Bucket: aws.String(v.bucket.bucket),
455                 Key:    aws.String(key),
456         }
457
458         req := v.bucket.svc.HeadObjectRequest(input)
459         res, err := req.Send(context.TODO())
460
461         v.bucket.stats.TickOps("head")
462         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.HeadOps)
463         v.bucket.stats.TickErr(err)
464
465         if err != nil {
466                 return nil, v.translateError(err)
467         }
468         result = res.HeadObjectOutput
469         return
470 }
471
472 // Get a block: copy the block data into buf, and return the number of
473 // bytes copied.
474 func (v *S3AWSVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
475         // Do not use getWithPipe here: the BlockReader interface does not pass
476         // through 'buf []byte', and we don't want to allocate two buffers for each
477         // read request. Instead, use a version of ReadBlock that accepts 'buf []byte'
478         // as an input.
479         key := v.key(loc)
480         count, err := v.readWorker(ctx, key, buf)
481         if err == nil {
482                 return count, err
483         }
484
485         err = v.translateError(err)
486         if !os.IsNotExist(err) {
487                 return 0, err
488         }
489
490         _, err = v.head("recent/" + key)
491         err = v.translateError(err)
492         if err != nil {
493                 // If we can't read recent/X, there's no point in
494                 // trying fixRace. Give up.
495                 return 0, err
496         }
497         if !v.fixRace(key) {
498                 err = os.ErrNotExist
499                 return 0, err
500         }
501
502         count, err = v.readWorker(ctx, key, buf)
503         if err != nil {
504                 v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
505                 err = v.translateError(err)
506                 return 0, err
507         }
508         return count, err
509 }
510
511 func (v *S3AWSVolume) readWorker(ctx context.Context, key string, buf []byte) (int, error) {
512         awsBuf := aws.NewWriteAtBuffer(buf)
513         downloader := s3manager.NewDownloaderWithClient(v.bucket.svc, func(u *s3manager.Downloader) {
514                 u.PartSize = PartSize
515                 u.Concurrency = ReadConcurrency
516         })
517
518         v.logger.Debugf("Partsize: %d; Concurrency: %d\n", downloader.PartSize, downloader.Concurrency)
519
520         count, err := downloader.DownloadWithContext(ctx, awsBuf, &s3.GetObjectInput{
521                 Bucket: aws.String(v.bucket.bucket),
522                 Key:    aws.String(key),
523         })
524         v.bucket.stats.TickOps("get")
525         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.GetOps)
526         v.bucket.stats.TickErr(err)
527         v.bucket.stats.TickInBytes(uint64(count))
528         return int(count), v.translateError(err)
529 }
530
531 func (v *S3AWSVolume) writeObject(ctx context.Context, key string, r io.Reader) error {
532         if r == nil {
533                 // r == nil leads to a memory violation in func readFillBuf in
534                 // aws-sdk-go-v2@v0.23.0/service/s3/s3manager/upload.go
535                 r = bytes.NewReader(nil)
536         }
537
538         uploadInput := s3manager.UploadInput{
539                 Bucket: aws.String(v.bucket.bucket),
540                 Key:    aws.String(key),
541                 Body:   r,
542         }
543
544         if loc, ok := v.isKeepBlock(key); ok {
545                 var contentMD5 string
546                 md5, err := hex.DecodeString(loc)
547                 if err != nil {
548                         return v.translateError(err)
549                 }
550                 contentMD5 = base64.StdEncoding.EncodeToString(md5)
551                 uploadInput.ContentMD5 = &contentMD5
552         }
553
554         // Experimentation indicated that using concurrency 5 yields the best
555         // throughput, better than higher concurrency (10 or 13) by ~5%.
556         // Defining u.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(64 * 1024 * 1024)
557         // is detrimental to througput (minus ~15%).
558         uploader := s3manager.NewUploaderWithClient(v.bucket.svc, func(u *s3manager.Uploader) {
559                 u.PartSize = PartSize
560                 u.Concurrency = WriteConcurrency
561         })
562
563         // Unlike the goamz S3 driver, we don't need to precompute ContentSHA256:
564         // the aws-sdk-go v2 SDK uses a ReadSeeker to avoid having to copy the
565         // block, so there is no extra memory use to be concerned about. See
566         // makeSha256Reader in aws/signer/v4/v4.go. In fact, we explicitly disable
567         // calculating the Sha-256 because we don't need it; we already use md5sum
568         // hashes that match the name of the block.
569         _, err := uploader.UploadWithContext(ctx, &uploadInput, s3manager.WithUploaderRequestOptions(func(r *aws.Request) {
570                 r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
571         }))
572
573         v.bucket.stats.TickOps("put")
574         v.bucket.stats.Tick(&v.bucket.stats.Ops, &v.bucket.stats.PutOps)
575         v.bucket.stats.TickErr(err)
576
577         return v.translateError(err)
578 }
579
580 // Put writes a block.
581 func (v *S3AWSVolume) Put(ctx context.Context, loc string, block []byte) error {
582         // Do not use putWithPipe here; we want to pass an io.ReadSeeker to the S3
583         // sdk to avoid memory allocation there. See #17339 for more information.
584         if v.volume.ReadOnly {
585                 return MethodDisabledError
586         }
587
588         rdr := bytes.NewReader(block)
589         r := NewCountingReaderAtSeeker(rdr, v.bucket.stats.TickOutBytes)
590         key := v.key(loc)
591         err := v.writeObject(ctx, key, r)
592         if err != nil {
593                 return err
594         }
595         return v.writeObject(ctx, "recent/"+key, nil)
596 }
597
598 type s3awsLister struct {
599         Logger            logrus.FieldLogger
600         Bucket            *s3AWSbucket
601         Prefix            string
602         PageSize          int
603         Stats             *s3awsbucketStats
604         ContinuationToken string
605         buf               []s3.Object
606         err               error
607 }
608
609 // First fetches the first page and returns the first item. It returns
610 // nil if the response is the empty set or an error occurs.
611 func (lister *s3awsLister) First() *s3.Object {
612         lister.getPage()
613         return lister.pop()
614 }
615
616 // Next returns the next item, fetching the next page if necessary. It
617 // returns nil if the last available item has already been fetched, or
618 // an error occurs.
619 func (lister *s3awsLister) Next() *s3.Object {
620         if len(lister.buf) == 0 && lister.ContinuationToken != "" {
621                 lister.getPage()
622         }
623         return lister.pop()
624 }
625
626 // Return the most recent error encountered by First or Next.
627 func (lister *s3awsLister) Error() error {
628         return lister.err
629 }
630
631 func (lister *s3awsLister) getPage() {
632         lister.Stats.TickOps("list")
633         lister.Stats.Tick(&lister.Stats.Ops, &lister.Stats.ListOps)
634
635         var input *s3.ListObjectsV2Input
636         if lister.ContinuationToken == "" {
637                 input = &s3.ListObjectsV2Input{
638                         Bucket:  aws.String(lister.Bucket.bucket),
639                         MaxKeys: aws.Int64(int64(lister.PageSize)),
640                         Prefix:  aws.String(lister.Prefix),
641                 }
642         } else {
643                 input = &s3.ListObjectsV2Input{
644                         Bucket:            aws.String(lister.Bucket.bucket),
645                         MaxKeys:           aws.Int64(int64(lister.PageSize)),
646                         Prefix:            aws.String(lister.Prefix),
647                         ContinuationToken: &lister.ContinuationToken,
648                 }
649         }
650
651         req := lister.Bucket.svc.ListObjectsV2Request(input)
652         resp, err := req.Send(context.Background())
653         if err != nil {
654                 if aerr, ok := err.(awserr.Error); ok {
655                         lister.err = aerr
656                 } else {
657                         lister.err = err
658                 }
659                 return
660         }
661
662         if *resp.IsTruncated {
663                 lister.ContinuationToken = *resp.NextContinuationToken
664         } else {
665                 lister.ContinuationToken = ""
666         }
667         lister.buf = make([]s3.Object, 0, len(resp.Contents))
668         for _, key := range resp.Contents {
669                 if !strings.HasPrefix(*key.Key, lister.Prefix) {
670                         lister.Logger.Warnf("s3awsLister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, *key.Key)
671                         continue
672                 }
673                 lister.buf = append(lister.buf, key)
674         }
675 }
676
677 func (lister *s3awsLister) pop() (k *s3.Object) {
678         if len(lister.buf) > 0 {
679                 k = &lister.buf[0]
680                 lister.buf = lister.buf[1:]
681         }
682         return
683 }
684
685 // IndexTo writes a complete list of locators with the given prefix
686 // for which Get() can retrieve data.
687 func (v *S3AWSVolume) IndexTo(prefix string, writer io.Writer) error {
688         prefix = v.key(prefix)
689         // Use a merge sort to find matching sets of X and recent/X.
690         dataL := s3awsLister{
691                 Logger:   v.logger,
692                 Bucket:   v.bucket,
693                 Prefix:   prefix,
694                 PageSize: v.IndexPageSize,
695                 Stats:    &v.bucket.stats,
696         }
697         recentL := s3awsLister{
698                 Logger:   v.logger,
699                 Bucket:   v.bucket,
700                 Prefix:   "recent/" + prefix,
701                 PageSize: v.IndexPageSize,
702                 Stats:    &v.bucket.stats,
703         }
704         for data, recent := dataL.First(), recentL.First(); data != nil && dataL.Error() == nil; data = dataL.Next() {
705                 if *data.Key >= "g" {
706                         // Conveniently, "recent/*" and "trash/*" are
707                         // lexically greater than all hex-encoded data
708                         // hashes, so stopping here avoids iterating
709                         // over all of them needlessly with dataL.
710                         break
711                 }
712                 loc, isblk := v.isKeepBlock(*data.Key)
713                 if !isblk {
714                         continue
715                 }
716
717                 // stamp is the list entry we should use to report the
718                 // last-modified time for this data block: it will be
719                 // the recent/X entry if one exists, otherwise the
720                 // entry for the data block itself.
721                 stamp := data
722
723                 // Advance to the corresponding recent/X marker, if any
724                 for recent != nil && recentL.Error() == nil {
725                         if cmp := strings.Compare((*recent.Key)[7:], *data.Key); cmp < 0 {
726                                 recent = recentL.Next()
727                                 continue
728                         } else if cmp == 0 {
729                                 stamp = recent
730                                 recent = recentL.Next()
731                                 break
732                         } else {
733                                 // recent/X marker is missing: we'll
734                                 // use the timestamp on the data
735                                 // object.
736                                 break
737                         }
738                 }
739                 if err := recentL.Error(); err != nil {
740                         return err
741                 }
742                 // We truncate sub-second precision here. Otherwise
743                 // timestamps will never match the RFC1123-formatted
744                 // Last-Modified values parsed by Mtime().
745                 fmt.Fprintf(writer, "%s+%d %d\n", loc, *data.Size, stamp.LastModified.Unix()*1000000000)
746         }
747         return dataL.Error()
748 }
749
750 // Mtime returns the stored timestamp for the given locator.
751 func (v *S3AWSVolume) Mtime(loc string) (time.Time, error) {
752         key := v.key(loc)
753         _, err := v.head(key)
754         if err != nil {
755                 return s3AWSZeroTime, v.translateError(err)
756         }
757         resp, err := v.head("recent/" + key)
758         err = v.translateError(err)
759         if os.IsNotExist(err) {
760                 // The data object X exists, but recent/X is missing.
761                 err = v.writeObject(context.Background(), "recent/"+key, nil)
762                 if err != nil {
763                         v.logger.WithError(err).Errorf("error creating %q", "recent/"+key)
764                         return s3AWSZeroTime, v.translateError(err)
765                 }
766                 v.logger.Infof("Mtime: created %q to migrate existing block to new storage scheme", "recent/"+key)
767                 resp, err = v.head("recent/" + key)
768                 if err != nil {
769                         v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+key)
770                         return s3AWSZeroTime, v.translateError(err)
771                 }
772         } else if err != nil {
773                 // HEAD recent/X failed for some other reason.
774                 return s3AWSZeroTime, err
775         }
776         return *resp.LastModified, err
777 }
778
779 // Status returns a *VolumeStatus representing the current in-use
780 // storage capacity and a fake available capacity that doesn't make
781 // the volume seem full or nearly-full.
782 func (v *S3AWSVolume) Status() *VolumeStatus {
783         return &VolumeStatus{
784                 DeviceNum: 1,
785                 BytesFree: BlockSize * 1000,
786                 BytesUsed: 1,
787         }
788 }
789
790 // InternalStats returns bucket I/O and API call counters.
791 func (v *S3AWSVolume) InternalStats() interface{} {
792         return &v.bucket.stats
793 }
794
795 // Touch sets the timestamp for the given locator to the current time.
796 func (v *S3AWSVolume) Touch(loc string) error {
797         if v.volume.ReadOnly {
798                 return MethodDisabledError
799         }
800         key := v.key(loc)
801         _, err := v.head(key)
802         err = v.translateError(err)
803         if os.IsNotExist(err) && v.fixRace(key) {
804                 // The data object got trashed in a race, but fixRace
805                 // rescued it.
806         } else if err != nil {
807                 return err
808         }
809         err = v.writeObject(context.Background(), "recent/"+key, nil)
810         return v.translateError(err)
811 }
812
813 // checkRaceWindow returns a non-nil error if trash/key is, or might
814 // be, in the race window (i.e., it's not safe to trash key).
815 func (v *S3AWSVolume) checkRaceWindow(key string) error {
816         resp, err := v.head("trash/" + key)
817         err = v.translateError(err)
818         if os.IsNotExist(err) {
819                 // OK, trash/X doesn't exist so we're not in the race
820                 // window
821                 return nil
822         } else if err != nil {
823                 // Error looking up trash/X. We don't know whether
824                 // we're in the race window
825                 return err
826         }
827         t := resp.LastModified
828         safeWindow := t.Add(v.cluster.Collections.BlobTrashLifetime.Duration()).Sub(time.Now().Add(time.Duration(v.RaceWindow)))
829         if safeWindow <= 0 {
830                 // We can't count on "touch trash/X" to prolong
831                 // trash/X's lifetime. The new timestamp might not
832                 // become visible until now+raceWindow, and EmptyTrash
833                 // is allowed to delete trash/X before then.
834                 return fmt.Errorf("%s: same block is already in trash, and safe window ended %s ago", key, -safeWindow)
835         }
836         // trash/X exists, but it won't be eligible for deletion until
837         // after now+raceWindow, so it's safe to overwrite it.
838         return nil
839 }
840
841 func (b *s3AWSbucket) Del(path string) error {
842         input := &s3.DeleteObjectInput{
843                 Bucket: aws.String(b.bucket),
844                 Key:    aws.String(path),
845         }
846         req := b.svc.DeleteObjectRequest(input)
847         _, err := req.Send(context.Background())
848         b.stats.TickOps("delete")
849         b.stats.Tick(&b.stats.Ops, &b.stats.DelOps)
850         b.stats.TickErr(err)
851         return err
852 }
853
854 // Trash a Keep block.
855 func (v *S3AWSVolume) Trash(loc string) error {
856         if v.volume.ReadOnly {
857                 return MethodDisabledError
858         }
859         if t, err := v.Mtime(loc); err != nil {
860                 return err
861         } else if time.Since(t) < v.cluster.Collections.BlobSigningTTL.Duration() {
862                 return nil
863         }
864         key := v.key(loc)
865         if v.cluster.Collections.BlobTrashLifetime == 0 {
866                 if !v.UnsafeDelete {
867                         return ErrS3TrashDisabled
868                 }
869                 return v.translateError(v.bucket.Del(key))
870         }
871         err := v.checkRaceWindow(key)
872         if err != nil {
873                 return err
874         }
875         err = v.safeCopy("trash/"+key, key)
876         if err != nil {
877                 return err
878         }
879         return v.translateError(v.bucket.Del(key))
880 }
881
882 // Untrash moves block from trash back into store
883 func (v *S3AWSVolume) Untrash(loc string) error {
884         key := v.key(loc)
885         err := v.safeCopy(key, "trash/"+key)
886         if err != nil {
887                 return err
888         }
889         err = v.writeObject(context.Background(), "recent/"+key, nil)
890         return v.translateError(err)
891 }
892
893 type s3awsbucketStats struct {
894         statsTicker
895         Ops     uint64
896         GetOps  uint64
897         PutOps  uint64
898         HeadOps uint64
899         DelOps  uint64
900         ListOps uint64
901 }
902
903 func (s *s3awsbucketStats) TickErr(err error) {
904         if err == nil {
905                 return
906         }
907         errType := fmt.Sprintf("%T", err)
908         if aerr, ok := err.(awserr.Error); ok {
909                 if reqErr, ok := err.(awserr.RequestFailure); ok {
910                         // A service error occurred
911                         errType = errType + fmt.Sprintf(" %d %s", reqErr.StatusCode(), aerr.Code())
912                 } else {
913                         errType = errType + fmt.Sprintf(" 000 %s", aerr.Code())
914                 }
915         }
916         s.statsTicker.TickErr(err, errType)
917 }