8555: Move checkRaceWindow out to a func, tweak comments
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "strings"
14         "time"
15
16         "github.com/AdRoll/goamz/aws"
17         "github.com/AdRoll/goamz/s3"
18 )
19
20 var (
21         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
22
23         s3AccessKeyFile string
24         s3SecretKeyFile string
25         s3RegionName    string
26         s3Endpoint      string
27         s3Replication   int
28         s3UnsafeDelete  bool
29         s3RaceWindow    time.Duration
30
31         s3ACL = s3.Private
32 )
33
34 const (
35         maxClockSkew  = 600 * time.Second
36         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
37 )
38
39 type s3VolumeAdder struct {
40         *volumeSet
41 }
42
43 func (s *s3VolumeAdder) Set(bucketName string) error {
44         if trashLifetime != 0 {
45                 return ErrNotImplemented
46         }
47         if bucketName == "" {
48                 return fmt.Errorf("no container name given")
49         }
50         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
51                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
52         }
53         region, ok := aws.Regions[s3RegionName]
54         if s3Endpoint == "" {
55                 if !ok {
56                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
57                 }
58         } else {
59                 if ok {
60                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
61                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
62                 }
63                 region = aws.Region{
64                         Name:       s3RegionName,
65                         S3Endpoint: s3Endpoint,
66                 }
67         }
68         var err error
69         var auth aws.Auth
70         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
71         if err != nil {
72                 return err
73         }
74         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
75         if err != nil {
76                 return err
77         }
78         if flagSerializeIO {
79                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
80         }
81         v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
82         if err := v.Check(); err != nil {
83                 return err
84         }
85         *s.volumeSet = append(*s.volumeSet, v)
86         return nil
87 }
88
89 func s3regions() (okList []string) {
90         for r := range aws.Regions {
91                 okList = append(okList, r)
92         }
93         return
94 }
95
96 func init() {
97         flag.Var(&s3VolumeAdder{&volumes},
98                 "s3-bucket-volume",
99                 "Use the given bucket as a storage volume. Can be given multiple times.")
100         flag.StringVar(
101                 &s3RegionName,
102                 "s3-region",
103                 "",
104                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
105         flag.StringVar(
106                 &s3Endpoint,
107                 "s3-endpoint",
108                 "",
109                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
110         flag.StringVar(
111                 &s3AccessKeyFile,
112                 "s3-access-key-file",
113                 "",
114                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
115         flag.StringVar(
116                 &s3SecretKeyFile,
117                 "s3-secret-key-file",
118                 "",
119                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
120         flag.DurationVar(
121                 &s3RaceWindow,
122                 "s3-race-window",
123                 24*time.Hour,
124                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
125         flag.IntVar(
126                 &s3Replication,
127                 "s3-replication",
128                 2,
129                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
130         flag.BoolVar(
131                 &s3UnsafeDelete,
132                 "s3-unsafe-delete",
133                 false,
134                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
135 }
136
137 type S3Volume struct {
138         *s3.Bucket
139         raceWindow    time.Duration
140         readonly      bool
141         replication   int
142         indexPageSize int
143 }
144
145 // NewS3Volume returns a new S3Volume using the given auth, region,
146 // and bucket name. The replication argument specifies the replication
147 // level to report when writing data.
148 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
149         return &S3Volume{
150                 Bucket: &s3.Bucket{
151                         S3:   s3.New(auth, region),
152                         Name: bucket,
153                 },
154                 raceWindow:    raceWindow,
155                 readonly:      readonly,
156                 replication:   replication,
157                 indexPageSize: 1000,
158         }
159 }
160
161 func (v *S3Volume) Check() error {
162         return nil
163 }
164
165 // getReader wraps (Bucket)GetReader.
166 //
167 // In situations where (Bucket)GetReader would fail because the block
168 // disappeared in a Trash race, getReader calls fixRace to recover the
169 // data, and tries again.
170 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
171         rdr, err = v.Bucket.GetReader(loc)
172         err = v.translateError(err)
173         if err == nil || !os.IsNotExist(err) {
174                 return
175         }
176         _, err = v.Bucket.Head("recent/"+loc, nil)
177         err = v.translateError(err)
178         if err != nil {
179                 // If we can't read recent/X, there's no point in
180                 // trying fixRace. Give up.
181                 return
182         }
183         if !v.fixRace(loc) {
184                 err = os.ErrNotExist
185                 return
186         }
187         rdr, err = v.Bucket.GetReader(loc)
188         if err != nil {
189                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
190                 err = v.translateError(err)
191         }
192         return
193 }
194
195 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
196         rdr, err := v.getReader(loc)
197         if err != nil {
198                 return 0, err
199         }
200         defer rdr.Close()
201         n, err := io.ReadFull(rdr, buf)
202         switch err {
203         case nil, io.EOF, io.ErrUnexpectedEOF:
204                 return n, nil
205         default:
206                 return 0, v.translateError(err)
207         }
208 }
209
210 func (v *S3Volume) Compare(loc string, expect []byte) error {
211         rdr, err := v.getReader(loc)
212         if err != nil {
213                 return err
214         }
215         defer rdr.Close()
216         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
217 }
218
219 func (v *S3Volume) Put(loc string, block []byte) error {
220         if v.readonly {
221                 return MethodDisabledError
222         }
223         var opts s3.Options
224         if len(block) > 0 {
225                 md5, err := hex.DecodeString(loc)
226                 if err != nil {
227                         return err
228                 }
229                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
230         }
231         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
232         if err != nil {
233                 return v.translateError(err)
234         }
235         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
236         return v.translateError(err)
237 }
238
239 func (v *S3Volume) Touch(loc string) error {
240         if v.readonly {
241                 return MethodDisabledError
242         }
243         _, err := v.Bucket.Head(loc, nil)
244         err = v.translateError(err)
245         if os.IsNotExist(err) && v.fixRace(loc) {
246                 // The data object got trashed in a race, but fixRace
247                 // rescued it.
248         } else if err != nil {
249                 return err
250         }
251         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
252         return v.translateError(err)
253 }
254
255 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
256         _, err := v.Bucket.Head(loc, nil)
257         if err != nil {
258                 return zeroTime, v.translateError(err)
259         }
260         resp, err := v.Bucket.Head("recent/"+loc, nil)
261         err = v.translateError(err)
262         if os.IsNotExist(err) {
263                 // The data object X exists, but recent/X is missing.
264                 err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
265                 if err != nil {
266                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
267                         return zeroTime, v.translateError(err)
268                 }
269                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
270                 resp, err = v.Bucket.Head("recent/"+loc, nil)
271                 if err != nil {
272                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
273                         return zeroTime, v.translateError(err)
274                 }
275         } else if err != nil {
276                 // HEAD recent/X failed for some other reason.
277                 return zeroTime, err
278         }
279         return v.lastModified(resp)
280 }
281
282 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
283         // Use a merge sort to find matching sets of X and recent/X.
284         dataL := s3Lister{
285                 Bucket:   v.Bucket,
286                 Prefix:   prefix,
287                 PageSize: v.indexPageSize,
288         }
289         recentL := s3Lister{
290                 Bucket:   v.Bucket,
291                 Prefix:   "recent/"+prefix,
292                 PageSize: v.indexPageSize,
293         }
294         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
295                 if data.Key >= "g" {
296                         // Conveniently, "recent/*" and "trash/*" are
297                         // lexically greater than all hex-encoded data
298                         // hashes, so stopping here avoids iterating
299                         // over all of them needlessly with dataL.
300                         break
301                 }
302                 if !v.isKeepBlock(data.Key) {
303                         continue
304                 }
305
306                 // stamp is the list entry we should use to report the
307                 // last-modified time for this data block: it will be
308                 // the recent/X entry if one exists, otherwise the
309                 // entry for the data block itself.
310                 stamp := data
311
312                 // Advance to the corresponding recent/X marker, if any
313                 for recent != nil {
314                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
315                                 recent = recentL.Next()
316                                 continue
317                         } else if cmp == 0 {
318                                 stamp = recent
319                                 recent = recentL.Next()
320                                 break
321                         } else {
322                                 // recent/X marker is missing: we'll
323                                 // use the timestamp on the data
324                                 // object.
325                                 break
326                         }
327                 }
328                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
329                 if err != nil {
330                         return err
331                 }
332                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
333         }
334         return nil
335 }
336
337 // Trash a Keep block.
338 func (v *S3Volume) Trash(loc string) error {
339         if v.readonly {
340                 return MethodDisabledError
341         }
342         if t, err := v.Mtime(loc); err != nil {
343                 return err
344         } else if time.Since(t) < blobSignatureTTL {
345                 return nil
346         }
347         if trashLifetime == 0 {
348                 if !s3UnsafeDelete {
349                         return ErrS3TrashDisabled
350                 }
351                 return v.Bucket.Del(loc)
352         }
353         err := v.checkRaceWindow(loc)
354         if err != nil {
355                 return err
356         }
357         err = v.safeCopy("trash/"+loc, loc)
358         if err != nil {
359                 return err
360         }
361         return v.translateError(v.Bucket.Del(loc))
362 }
363
364 // checkRaceWindow returns a non-nil error if trash/loc is, or might
365 // be, in the race window (i.e., it's not safe to trash loc).
366 func (v *S3Volume) checkRaceWindow(loc string) error {
367         resp, err := v.Bucket.Head("trash/"+loc, nil)
368         err = v.translateError(err)
369         if os.IsNotExist(err) {
370                 // OK, trash/X doesn't exist so we're not in the race
371                 // window
372                 return nil
373         } else if err != nil {
374                 // Error looking up trash/X. We don't know whether
375                 // we're in the race window
376                 return err
377         }
378         t, err := v.lastModified(resp)
379         if err != nil {
380                 // Can't parse timestamp
381                 return err
382         }
383         safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
384         if safeWindow <= 0 {
385                 // We can't count on "touch trash/X" to prolong
386                 // trash/X's lifetime. The new timestamp might not
387                 // become visible until now+raceWindow, and EmptyTrash
388                 // is allowed to delete trash/X before then.
389                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
390         }
391         // trash/X exists, but it won't be eligible for deletion until
392         // after now+raceWindow, so it's safe to overwrite it.
393         return nil
394 }
395
396 func (v *S3Volume) safeCopy(dst, src string) error {
397         resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
398                 ContentType:       "application/octet-stream",
399                 MetadataDirective: "REPLACE",
400         }, v.Bucket.Name+"/"+src)
401         err = v.translateError(err)
402         if err != nil {
403                 return err
404         }
405         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
406                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
407         } else if time.Now().Sub(t) > maxClockSkew {
408                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
409         }
410         return nil
411 }
412
413 // Get the LastModified header from resp, and parse it as RFC1123 or
414 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
415 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
416         s := resp.Header.Get("Last-Modified")
417         t, err = time.Parse(time.RFC1123, s)
418         if err != nil && s != "" {
419                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
420                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
421                 // as required by HTTP spec. If it's not a valid HTTP
422                 // header value, it's probably AWS (or s3test) giving
423                 // us a nearly-RFC1123 timestamp.
424                 t, err = time.Parse(nearlyRFC1123, s)
425         }
426         return
427 }
428
429 func (v *S3Volume) Untrash(loc string) error {
430         return v.safeCopy(loc, "trash/"+loc)
431 }
432
433 func (v *S3Volume) Status() *VolumeStatus {
434         return &VolumeStatus{
435                 DeviceNum: 1,
436                 BytesFree: BlockSize * 1000,
437                 BytesUsed: 1,
438         }
439 }
440
441 func (v *S3Volume) String() string {
442         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
443 }
444
445 func (v *S3Volume) Writable() bool {
446         return !v.readonly
447 }
448 func (v *S3Volume) Replication() int {
449         return v.replication
450 }
451
452 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
453
454 func (v *S3Volume) isKeepBlock(s string) bool {
455         return s3KeepBlockRegexp.MatchString(s)
456 }
457
458 // fixRace(X) is called when "recent/X" exists but "X" doesn't
459 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
460 // there was a race between Put and Trash, fixRace recovers from the
461 // race by Untrashing the block.
462 func (v *S3Volume) fixRace(loc string) bool {
463         trash, err := v.Bucket.Head("trash/"+loc, nil)
464         if err != nil {
465                 if !os.IsNotExist(v.translateError(err)) {
466                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
467                 }
468                 return false
469         }
470         trashTime, err := v.lastModified(trash)
471         if err != nil {
472                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
473                 return false
474         }
475
476         recent, err := v.Bucket.Head("recent/"+loc, nil)
477         if err != nil {
478                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
479                 return false
480         }
481         recentTime, err := v.lastModified(recent)
482         if err != nil {
483                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
484                 return false
485         }
486
487         ageWhenTrashed := trashTime.Sub(recentTime)
488         if ageWhenTrashed >= blobSignatureTTL {
489                 // No evidence of a race: block hasn't been written
490                 // since it became eligible for Trash. No fix needed.
491                 return false
492         }
493
494         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
495         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
496         err = v.safeCopy(loc, "trash/"+loc)
497         if err != nil {
498                 log.Printf("error: fixRace: %s", err)
499                 return false
500         }
501         return true
502 }
503
504 func (v *S3Volume) translateError(err error) error {
505         switch err := err.(type) {
506         case *s3.Error:
507                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
508                         strings.Contains(err.Error(), "Not Found") {
509                         return os.ErrNotExist
510                 }
511                 // Other 404 errors like NoSuchVersion and
512                 // NoSuchBucket are different problems which should
513                 // get called out downstream, so we don't convert them
514                 // to os.ErrNotExist.
515         }
516         return err
517 }
518
519 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
520 // and deletes them from the volume.
521 func (v *S3Volume) EmptyTrash() {
522         // Use a merge sort to find matching sets of trash/X and recent/X.
523         trashL := s3Lister{
524                 Bucket:   v.Bucket,
525                 Prefix:   "trash/",
526                 PageSize: v.indexPageSize,
527         }
528         // Define "ready to delete" as "...when EmptyTrash started".
529         now := time.Now()
530         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
531                 loc := trash.Key[6:]
532                 if !v.isKeepBlock(loc) {
533                         continue
534                 }
535                 recent, err := v.Bucket.Head("recent/"+loc, nil)
536                 if err != nil {
537                         log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
538                         continue
539                 }
540                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
541                 if err != nil {
542                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
543                         continue
544                 }
545                 recentT, err := v.lastModified(recent)
546                 if err != nil {
547                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
548                         continue
549                 }
550                 if trashT.Sub(recentT) < blobSignatureTTL {
551                         v.fixRace(loc)
552                         continue
553                 }
554                 if now.Sub(trashT) < trashLifetime {
555                         continue
556                 }
557                 err = v.Bucket.Del(trash.Key)
558                 if err != nil {
559                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
560                         continue
561                 }
562                 _, err = v.Bucket.Head(loc, nil)
563                 if os.IsNotExist(err) {
564                         err = v.Bucket.Del("recent/"+loc)
565                         if err != nil {
566                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
567                         }
568                 } else if err != nil {
569                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
570                 }
571         }
572         if err := trashL.Error(); err != nil {
573                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
574         }
575 }
576
577 type s3Lister struct {
578         Bucket      *s3.Bucket
579         Prefix      string
580         PageSize    int
581         nextMarker  string
582         buf         []s3.Key
583         err         error
584 }
585
586 // First fetches the first page and returns the first item. It returns
587 // nil if the response is the empty set or an error occurs.
588 func (lister *s3Lister) First() *s3.Key {
589         lister.getPage()
590         return lister.pop()
591 }
592
593 // Next returns the next item, fetching the next page if necessary. It
594 // returns nil if the last available item has already been fetched, or
595 // an error occurs.
596 func (lister *s3Lister) Next() *s3.Key {
597         if len(lister.buf) == 0 && lister.nextMarker != "" {
598                 lister.getPage()
599         }
600         return lister.pop()
601 }
602
603 // Return the most recent error encountered by First or Next.
604 func (lister *s3Lister) Error() error {
605         return lister.err
606 }
607
608 func (lister *s3Lister) getPage() {
609         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
610         lister.nextMarker = ""
611         if err != nil {
612                 lister.err = err
613                 return
614         }
615         if resp.IsTruncated {
616                 lister.nextMarker = resp.NextMarker
617         }
618         lister.buf = make([]s3.Key, 0, len(resp.Contents))
619         for _, key := range resp.Contents {
620                 if !strings.HasPrefix(key.Key, lister.Prefix) {
621                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
622                         continue
623                 }
624                 lister.buf = append(lister.buf, key)
625         }
626 }
627
628 func (lister *s3Lister) pop() (k *s3.Key) {
629         if len(lister.buf) > 0 {
630                 k = &lister.buf[0]
631                 lister.buf = lister.buf[1:]
632         }
633         return
634 }