8555: Implement trash with race-recovery for S3 volumes.
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "strings"
14         "time"
15
16         "github.com/AdRoll/goamz/aws"
17         "github.com/AdRoll/goamz/s3"
18 )
19
20 var (
21         ErrS3TrashNotAvailable = fmt.Errorf("trash is not possible because -trash-lifetime=0 and -s3-unsafe-delete=false")
22
23         s3AccessKeyFile string
24         s3SecretKeyFile string
25         s3RegionName    string
26         s3Endpoint      string
27         s3Replication   int
28         s3UnsafeDelete  bool
29         s3RaceWindow    time.Duration
30
31         s3ACL = s3.Private
32 )
33
34 const (
35         maxClockSkew  = 600 * time.Second
36         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
37 )
38
39 type s3VolumeAdder struct {
40         *volumeSet
41 }
42
43 func (s *s3VolumeAdder) Set(bucketName string) error {
44         if trashLifetime != 0 {
45                 return ErrNotImplemented
46         }
47         if bucketName == "" {
48                 return fmt.Errorf("no container name given")
49         }
50         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
51                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
52         }
53         region, ok := aws.Regions[s3RegionName]
54         if s3Endpoint == "" {
55                 if !ok {
56                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
57                 }
58         } else {
59                 if ok {
60                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
61                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
62                 }
63                 region = aws.Region{
64                         Name:       s3RegionName,
65                         S3Endpoint: s3Endpoint,
66                 }
67         }
68         var err error
69         var auth aws.Auth
70         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
71         if err != nil {
72                 return err
73         }
74         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
75         if err != nil {
76                 return err
77         }
78         if flagSerializeIO {
79                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
80         }
81         v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
82         if err := v.Check(); err != nil {
83                 return err
84         }
85         *s.volumeSet = append(*s.volumeSet, v)
86         return nil
87 }
88
89 func s3regions() (okList []string) {
90         for r := range aws.Regions {
91                 okList = append(okList, r)
92         }
93         return
94 }
95
96 func init() {
97         flag.Var(&s3VolumeAdder{&volumes},
98                 "s3-bucket-volume",
99                 "Use the given bucket as a storage volume. Can be given multiple times.")
100         flag.StringVar(
101                 &s3RegionName,
102                 "s3-region",
103                 "",
104                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
105         flag.StringVar(
106                 &s3Endpoint,
107                 "s3-endpoint",
108                 "",
109                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
110         flag.StringVar(
111                 &s3AccessKeyFile,
112                 "s3-access-key-file",
113                 "",
114                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
115         flag.StringVar(
116                 &s3SecretKeyFile,
117                 "s3-secret-key-file",
118                 "",
119                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
120         flag.DurationVar(
121                 &s3RaceWindow,
122                 "s3-race-window",
123                 24*time.Hour,
124                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
125         flag.IntVar(
126                 &s3Replication,
127                 "s3-replication",
128                 2,
129                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
130         flag.BoolVar(
131                 &s3UnsafeDelete,
132                 "s3-unsafe-delete",
133                 false,
134                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
135 }
136
137 type S3Volume struct {
138         *s3.Bucket
139         raceWindow    time.Duration
140         readonly      bool
141         replication   int
142         indexPageSize int
143 }
144
145 // NewS3Volume returns a new S3Volume using the given auth, region,
146 // and bucket name. The replication argument specifies the replication
147 // level to report when writing data.
148 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
149         return &S3Volume{
150                 Bucket: &s3.Bucket{
151                         S3:   s3.New(auth, region),
152                         Name: bucket,
153                 },
154                 raceWindow:    raceWindow,
155                 readonly:      readonly,
156                 replication:   replication,
157                 indexPageSize: 1000,
158         }
159 }
160
161 func (v *S3Volume) Check() error {
162         return nil
163 }
164
165 func (v *S3Volume) getReaderWithFixRace(loc string) (rdr io.ReadCloser, err error) {
166         rdr, err = v.Bucket.GetReader(loc)
167         err = v.translateError(err)
168         if err == nil || !os.IsNotExist(err) {
169                 return
170         }
171         _, err = v.Bucket.Head("recent/"+loc, nil)
172         err = v.translateError(err)
173         if err != nil {
174                 // If we can't read recent/X, there's no point in
175                 // trying fixRace. Give up.
176                 return
177         }
178         if !v.fixRace(loc) {
179                 err = os.ErrNotExist
180                 return
181         }
182         rdr, err = v.Bucket.GetReader(loc)
183         if err != nil {
184                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
185                 err = v.translateError(err)
186         }
187         return
188 }
189
190 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
191         rdr, err := v.getReaderWithFixRace(loc)
192         if err != nil {
193                 return 0, err
194         }
195         defer rdr.Close()
196         n, err := io.ReadFull(rdr, buf)
197         switch err {
198         case nil, io.EOF, io.ErrUnexpectedEOF:
199                 return n, nil
200         default:
201                 return 0, v.translateError(err)
202         }
203 }
204
205 func (v *S3Volume) Compare(loc string, expect []byte) error {
206         rdr, err := v.getReaderWithFixRace(loc)
207         if err != nil {
208                 return err
209         }
210         defer rdr.Close()
211         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
212 }
213
214 func (v *S3Volume) Put(loc string, block []byte) error {
215         if v.readonly {
216                 return MethodDisabledError
217         }
218         var opts s3.Options
219         if len(block) > 0 {
220                 md5, err := hex.DecodeString(loc)
221                 if err != nil {
222                         return err
223                 }
224                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
225         }
226         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
227         if err != nil {
228                 return v.translateError(err)
229         }
230         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
231         return v.translateError(err)
232 }
233
234 func (v *S3Volume) Touch(loc string) error {
235         if v.readonly {
236                 return MethodDisabledError
237         }
238         _, err := v.Bucket.Head(loc, nil)
239         err = v.translateError(err)
240         if os.IsNotExist(err) && v.fixRace(loc) {
241                 // The data object got trashed in a race, but fixRace
242                 // rescued it.
243         } else if err != nil {
244                 return err
245         }
246         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
247         return v.translateError(err)
248 }
249
250 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
251         _, err := v.Bucket.Head(loc, nil)
252         if err != nil {
253                 return zeroTime, v.translateError(err)
254         }
255         resp, err := v.Bucket.Head("recent/"+loc, nil)
256         err = v.translateError(err)
257         if os.IsNotExist(err) {
258                 // The data object X exists, but recent/X is missing.
259                 err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
260                 if err != nil {
261                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
262                         return zeroTime, v.translateError(err)
263                 }
264                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
265                 resp, err = v.Bucket.Head("recent/"+loc, nil)
266                 if err != nil {
267                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
268                         return zeroTime, v.translateError(err)
269                 }
270         } else if err != nil {
271                 // HEAD recent/X failed for some other reason.
272                 return zeroTime, err
273         }
274         return v.lastModified(resp)
275 }
276
277 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
278         // Use a merge sort to find matching sets of X and recent/X.
279         dataL := s3Lister{
280                 Bucket:   v.Bucket,
281                 Prefix:   prefix,
282                 PageSize: v.indexPageSize,
283         }
284         recentL := s3Lister{
285                 Bucket:   v.Bucket,
286                 Prefix:   "recent/"+prefix,
287                 PageSize: v.indexPageSize,
288         }
289         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
290                 if data.Key >= "g" {
291                         // Conveniently, "recent/*" and "trash/*" are
292                         // lexically greater than all hex-encoded data
293                         // hashes, so stopping here avoids iterating
294                         // over all of them needlessly with dataL.
295                         break
296                 }
297                 if !v.isKeepBlock(data.Key) {
298                         continue
299                 }
300
301                 // stamp is the list entry we should use to report the
302                 // last-modified time for this data block: it will be
303                 // the recent/X entry if one exists, otherwise the
304                 // entry for the data block itself.
305                 stamp := data
306
307                 // Advance to the corresponding recent/X marker, if any
308                 for recent != nil {
309                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
310                                 recent = recentL.Next()
311                                 continue
312                         } else if cmp == 0 {
313                                 stamp = recent
314                                 recent = recentL.Next()
315                                 break
316                         } else {
317                                 // recent/X marker is missing: we'll
318                                 // use the timestamp on the data
319                                 // object.
320                                 break
321                         }
322                 }
323                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
324                 if err != nil {
325                         return err
326                 }
327                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
328         }
329         return nil
330 }
331
332 // Trash a Keep block.
333 func (v *S3Volume) Trash(loc string) error {
334         if v.readonly {
335                 return MethodDisabledError
336         }
337         if t, err := v.Mtime(loc); err != nil {
338                 return err
339         } else if time.Since(t) < blobSignatureTTL {
340                 return nil
341         }
342         if trashLifetime == 0 {
343                 if !s3UnsafeDelete {
344                         return ErrS3TrashNotAvailable
345                 }
346                 return v.Bucket.Del(loc)
347         }
348
349         // Make sure we're not in the race window.
350         {
351                 resp, err := v.Bucket.Head("trash/"+loc, nil)
352                 err = v.translateError(err)
353                 if os.IsNotExist(err) {
354                         // OK, trash/X doesn't exist so we're not in
355                         // the race window
356                 } else if err != nil {
357                         // Error looking up trash/X. Unsafe to proceed
358                         // without knowing whether we're in the race
359                         // window
360                         return err
361                 } else if t, err := v.lastModified(resp); err != nil {
362                         // Can't parse timestamp
363                         return err
364                 } else if safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow)); safeWindow <= 0 {
365                         // We can't count on "touch trash/X" to
366                         // prolong trash/X's lifetime. The new
367                         // timestamp might not become visible until
368                         // now+raceWindow, and EmptyTrash is allowed
369                         // to delete trash/X before then.
370                         return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
371                 }
372         }
373
374         err := v.safeCopy("trash/"+loc, loc)
375         if err != nil {
376                 return err
377         }
378         return v.translateError(v.Bucket.Del(loc))
379 }
380
381 func (v *S3Volume) safeCopy(dst, src string) error {
382         resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
383                 ContentType:       "application/octet-stream",
384                 MetadataDirective: "REPLACE",
385         }, v.Bucket.Name+"/"+src)
386         err = v.translateError(err)
387         if err != nil {
388                 return err
389         }
390         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
391                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
392         } else if time.Now().Sub(t) > maxClockSkew {
393                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
394         }
395         return nil
396 }
397
398 // Get the LastModified header from resp, and parse it as RFC1123 or
399 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
400 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
401         s := resp.Header.Get("Last-Modified")
402         t, err = time.Parse(time.RFC1123, s)
403         if err != nil && s != "" {
404                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
405                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
406                 // as required by HTTP spec. If it's not a valid HTTP
407                 // header value, it's probably AWS (or s3test) giving
408                 // us a nearly-RFC1123 timestamp.
409                 t, err = time.Parse(nearlyRFC1123, s)
410         }
411         return
412 }
413
414 func (v *S3Volume) Untrash(loc string) error {
415         return v.safeCopy(loc, "trash/"+loc)
416 }
417
418 func (v *S3Volume) Status() *VolumeStatus {
419         return &VolumeStatus{
420                 DeviceNum: 1,
421                 BytesFree: BlockSize * 1000,
422                 BytesUsed: 1,
423         }
424 }
425
426 func (v *S3Volume) String() string {
427         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
428 }
429
430 func (v *S3Volume) Writable() bool {
431         return !v.readonly
432 }
433 func (v *S3Volume) Replication() int {
434         return v.replication
435 }
436
437 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
438
439 func (v *S3Volume) isKeepBlock(s string) bool {
440         return s3KeepBlockRegexp.MatchString(s)
441 }
442
443 // fixRace(X) is called when "recent/X" exists but "X" doesn't
444 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
445 // there was a race between Put and Trash, fixRace recovers from the
446 // race by Untrashing the block.
447 func (v *S3Volume) fixRace(loc string) bool {
448         trash, err := v.Bucket.Head("trash/"+loc, nil)
449         if err != nil {
450                 if !os.IsNotExist(v.translateError(err)) {
451                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
452                 }
453                 return false
454         }
455         trashTime, err := v.lastModified(trash)
456         if err != nil {
457                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
458                 return false
459         }
460
461         recent, err := v.Bucket.Head("recent/"+loc, nil)
462         if err != nil {
463                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
464                 return false
465         }
466         recentTime, err := v.lastModified(recent)
467         if err != nil {
468                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
469                 return false
470         }
471
472         ageWhenTrashed := trashTime.Sub(recentTime)
473         if ageWhenTrashed >= blobSignatureTTL {
474                 // No evidence of a race: block hasn't been written
475                 // since it became eligible for Trash. No fix needed.
476                 return false
477         }
478
479         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
480         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
481         err = v.safeCopy(loc, "trash/"+loc)
482         if err != nil {
483                 log.Printf("error: fixRace: %s", err)
484                 return false
485         }
486         return true
487 }
488
489 func (v *S3Volume) translateError(err error) error {
490         switch err := err.(type) {
491         case *s3.Error:
492                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
493                         strings.Contains(err.Error(), "Not Found") {
494                         return os.ErrNotExist
495                 }
496                 // Other 404 errors like NoSuchVersion and
497                 // NoSuchBucket are different problems which should
498                 // get called out downstream, so we don't convert them
499                 // to os.ErrNotExist.
500         }
501         return err
502 }
503
504 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
505 // and deletes them from the volume.
506 func (v *S3Volume) EmptyTrash() {
507         // Use a merge sort to find matching sets of trash/X and recent/X.
508         trashL := s3Lister{
509                 Bucket:   v.Bucket,
510                 Prefix:   "trash/",
511                 PageSize: v.indexPageSize,
512         }
513         // Define "ready to delete" as "...when EmptyTrash started".
514         now := time.Now()
515         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
516                 loc := trash.Key[6:]
517                 if !v.isKeepBlock(loc) {
518                         continue
519                 }
520                 recent, err := v.Bucket.Head("recent/"+loc, nil)
521                 if err != nil {
522                         log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
523                         continue
524                 }
525                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
526                 if err != nil {
527                         continue
528                 }
529                 recentT, err := v.lastModified(recent)
530                 if err != nil {
531                         continue
532                 }
533                 if trashT.Sub(recentT) < blobSignatureTTL {
534                         v.fixRace(loc)
535                         continue
536                 }
537                 if now.Sub(trashT) < trashLifetime {
538                         continue
539                 }
540                 err = v.Bucket.Del(trash.Key)
541                 if err != nil {
542                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
543                         continue
544                 }
545                 _, err = v.Bucket.Head(loc, nil)
546                 if os.IsNotExist(err) {
547                         err = v.Bucket.Del("recent/"+loc)
548                         if err != nil {
549                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
550                         }
551                 } else if err != nil {
552                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
553                 }
554         }
555         if err := trashL.Error(); err != nil {
556                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
557         }
558 }
559
560 type s3Lister struct {
561         Bucket      *s3.Bucket
562         Prefix      string
563         PageSize    int
564         nextMarker  string
565         buf         []s3.Key
566         err         error
567 }
568
569 // First fetches the first page and returns the first item. It returns
570 // nil if the response is the empty set or an error occurs.
571 func (lister *s3Lister) First() *s3.Key {
572         lister.getPage()
573         return lister.pop()
574 }
575
576 // Next returns the next item, fetching the next page if necessary. It
577 // returns nil if the last available item has already been fetched, or
578 // an error occurs.
579 func (lister *s3Lister) Next() *s3.Key {
580         if len(lister.buf) == 0 && lister.nextMarker != "" {
581                 lister.getPage()
582         }
583         return lister.pop()
584 }
585
586 // Return the most recent error encountered by First or Next.
587 func (lister *s3Lister) Error() error {
588         return lister.err
589 }
590
591 func (lister *s3Lister) getPage() {
592         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
593         lister.nextMarker = ""
594         if err != nil {
595                 lister.err = err
596                 return
597         }
598         if resp.IsTruncated {
599                 lister.nextMarker = resp.NextMarker
600         }
601         lister.buf = make([]s3.Key, 0, len(resp.Contents))
602         for _, key := range resp.Contents {
603                 if !strings.HasPrefix(key.Key, lister.Prefix) {
604                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
605                         continue
606                 }
607                 lister.buf = append(lister.buf, key)
608         }
609 }
610
611 func (lister *s3Lister) pop() (k *s3.Key) {
612         if len(lister.buf) > 0 {
613                 k = &lister.buf[0]
614                 lister.buf = lister.buf[1:]
615         }
616         return
617 }