8555: gofmt
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "strings"
14         "time"
15
16         "github.com/AdRoll/goamz/aws"
17         "github.com/AdRoll/goamz/s3"
18 )
19
20 var (
21         ErrS3TrashDisabled = fmt.Errorf("trash function is disabled because -trash-lifetime=0 and -s3-unsafe-delete=false")
22
23         s3AccessKeyFile string
24         s3SecretKeyFile string
25         s3RegionName    string
26         s3Endpoint      string
27         s3Replication   int
28         s3UnsafeDelete  bool
29         s3RaceWindow    time.Duration
30
31         s3ACL = s3.Private
32 )
33
34 const (
35         maxClockSkew  = 600 * time.Second
36         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
37 )
38
39 type s3VolumeAdder struct {
40         *volumeSet
41 }
42
43 func (s *s3VolumeAdder) Set(bucketName string) error {
44         if trashLifetime != 0 {
45                 return ErrNotImplemented
46         }
47         if bucketName == "" {
48                 return fmt.Errorf("no container name given")
49         }
50         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
51                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
52         }
53         region, ok := aws.Regions[s3RegionName]
54         if s3Endpoint == "" {
55                 if !ok {
56                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
57                 }
58         } else {
59                 if ok {
60                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
61                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
62                 }
63                 region = aws.Region{
64                         Name:       s3RegionName,
65                         S3Endpoint: s3Endpoint,
66                 }
67         }
68         var err error
69         var auth aws.Auth
70         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
71         if err != nil {
72                 return err
73         }
74         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
75         if err != nil {
76                 return err
77         }
78         if flagSerializeIO {
79                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
80         }
81         v := NewS3Volume(auth, region, bucketName, s3RaceWindow, flagReadonly, s3Replication)
82         if err := v.Check(); err != nil {
83                 return err
84         }
85         *s.volumeSet = append(*s.volumeSet, v)
86         return nil
87 }
88
89 func s3regions() (okList []string) {
90         for r := range aws.Regions {
91                 okList = append(okList, r)
92         }
93         return
94 }
95
96 func init() {
97         flag.Var(&s3VolumeAdder{&volumes},
98                 "s3-bucket-volume",
99                 "Use the given bucket as a storage volume. Can be given multiple times.")
100         flag.StringVar(
101                 &s3RegionName,
102                 "s3-region",
103                 "",
104                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
105         flag.StringVar(
106                 &s3Endpoint,
107                 "s3-endpoint",
108                 "",
109                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
110         flag.StringVar(
111                 &s3AccessKeyFile,
112                 "s3-access-key-file",
113                 "",
114                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
115         flag.StringVar(
116                 &s3SecretKeyFile,
117                 "s3-secret-key-file",
118                 "",
119                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
120         flag.DurationVar(
121                 &s3RaceWindow,
122                 "s3-race-window",
123                 24*time.Hour,
124                 "Maximum eventual consistency latency for subsequent -s3-bucket-volume arguments.")
125         flag.IntVar(
126                 &s3Replication,
127                 "s3-replication",
128                 2,
129                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
130         flag.BoolVar(
131                 &s3UnsafeDelete,
132                 "s3-unsafe-delete",
133                 false,
134                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
135 }
136
137 type S3Volume struct {
138         *s3.Bucket
139         raceWindow    time.Duration
140         readonly      bool
141         replication   int
142         indexPageSize int
143 }
144
145 // NewS3Volume returns a new S3Volume using the given auth, region,
146 // and bucket name. The replication argument specifies the replication
147 // level to report when writing data.
148 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, raceWindow time.Duration, readonly bool, replication int) *S3Volume {
149         return &S3Volume{
150                 Bucket: &s3.Bucket{
151                         S3:   s3.New(auth, region),
152                         Name: bucket,
153                 },
154                 raceWindow:    raceWindow,
155                 readonly:      readonly,
156                 replication:   replication,
157                 indexPageSize: 1000,
158         }
159 }
160
161 func (v *S3Volume) Check() error {
162         return nil
163 }
164
165 // getReader wraps (Bucket)GetReader.
166 //
167 // In situations where (Bucket)GetReader would fail because the block
168 // disappeared in a Trash race, getReader calls fixRace to recover the
169 // data, and tries again.
170 func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
171         rdr, err = v.Bucket.GetReader(loc)
172         err = v.translateError(err)
173         if err == nil || !os.IsNotExist(err) {
174                 return
175         }
176         _, err = v.Bucket.Head("recent/"+loc, nil)
177         err = v.translateError(err)
178         if err != nil {
179                 // If we can't read recent/X, there's no point in
180                 // trying fixRace. Give up.
181                 return
182         }
183         if !v.fixRace(loc) {
184                 err = os.ErrNotExist
185                 return
186         }
187         rdr, err = v.Bucket.GetReader(loc)
188         if err != nil {
189                 log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
190                 err = v.translateError(err)
191         }
192         return
193 }
194
195 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
196         rdr, err := v.getReader(loc)
197         if err != nil {
198                 return 0, err
199         }
200         defer rdr.Close()
201         n, err := io.ReadFull(rdr, buf)
202         switch err {
203         case nil, io.EOF, io.ErrUnexpectedEOF:
204                 return n, nil
205         default:
206                 return 0, v.translateError(err)
207         }
208 }
209
210 func (v *S3Volume) Compare(loc string, expect []byte) error {
211         rdr, err := v.getReader(loc)
212         if err != nil {
213                 return err
214         }
215         defer rdr.Close()
216         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
217 }
218
219 func (v *S3Volume) Put(loc string, block []byte) error {
220         if v.readonly {
221                 return MethodDisabledError
222         }
223         var opts s3.Options
224         if len(block) > 0 {
225                 md5, err := hex.DecodeString(loc)
226                 if err != nil {
227                         return err
228                 }
229                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
230         }
231         err := v.Bucket.Put(loc, block, "application/octet-stream", s3ACL, opts)
232         if err != nil {
233                 return v.translateError(err)
234         }
235         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
236         return v.translateError(err)
237 }
238
239 func (v *S3Volume) Touch(loc string) error {
240         if v.readonly {
241                 return MethodDisabledError
242         }
243         _, err := v.Bucket.Head(loc, nil)
244         err = v.translateError(err)
245         if os.IsNotExist(err) && v.fixRace(loc) {
246                 // The data object got trashed in a race, but fixRace
247                 // rescued it.
248         } else if err != nil {
249                 return err
250         }
251         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
252         return v.translateError(err)
253 }
254
255 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
256         _, err := v.Bucket.Head(loc, nil)
257         if err != nil {
258                 return zeroTime, v.translateError(err)
259         }
260         resp, err := v.Bucket.Head("recent/"+loc, nil)
261         err = v.translateError(err)
262         if os.IsNotExist(err) {
263                 // The data object X exists, but recent/X is missing.
264                 err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
265                 if err != nil {
266                         log.Printf("error: creating %q: %s", "recent/"+loc, err)
267                         return zeroTime, v.translateError(err)
268                 }
269                 log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
270                 resp, err = v.Bucket.Head("recent/"+loc, nil)
271                 if err != nil {
272                         log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
273                         return zeroTime, v.translateError(err)
274                 }
275         } else if err != nil {
276                 // HEAD recent/X failed for some other reason.
277                 return zeroTime, err
278         }
279         return v.lastModified(resp)
280 }
281
282 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
283         // Use a merge sort to find matching sets of X and recent/X.
284         dataL := s3Lister{
285                 Bucket:   v.Bucket,
286                 Prefix:   prefix,
287                 PageSize: v.indexPageSize,
288         }
289         recentL := s3Lister{
290                 Bucket:   v.Bucket,
291                 Prefix:   "recent/" + prefix,
292                 PageSize: v.indexPageSize,
293         }
294         for data, recent := dataL.First(), recentL.First(); data != nil; data = dataL.Next() {
295                 if data.Key >= "g" {
296                         // Conveniently, "recent/*" and "trash/*" are
297                         // lexically greater than all hex-encoded data
298                         // hashes, so stopping here avoids iterating
299                         // over all of them needlessly with dataL.
300                         break
301                 }
302                 if !v.isKeepBlock(data.Key) {
303                         continue
304                 }
305
306                 // stamp is the list entry we should use to report the
307                 // last-modified time for this data block: it will be
308                 // the recent/X entry if one exists, otherwise the
309                 // entry for the data block itself.
310                 stamp := data
311
312                 // Advance to the corresponding recent/X marker, if any
313                 for recent != nil {
314                         if cmp := strings.Compare(recent.Key[7:], data.Key); cmp < 0 {
315                                 recent = recentL.Next()
316                                 continue
317                         } else if cmp == 0 {
318                                 stamp = recent
319                                 recent = recentL.Next()
320                                 break
321                         } else {
322                                 // recent/X marker is missing: we'll
323                                 // use the timestamp on the data
324                                 // object.
325                                 break
326                         }
327                 }
328                 t, err := time.Parse(time.RFC3339, stamp.LastModified)
329                 if err != nil {
330                         return err
331                 }
332                 fmt.Fprintf(writer, "%s+%d %d\n", data.Key, data.Size, t.UnixNano())
333         }
334         return nil
335 }
336
337 // Trash a Keep block.
338 func (v *S3Volume) Trash(loc string) error {
339         if v.readonly {
340                 return MethodDisabledError
341         }
342         if t, err := v.Mtime(loc); err != nil {
343                 return err
344         } else if time.Since(t) < blobSignatureTTL {
345                 return nil
346         }
347         if trashLifetime == 0 {
348                 if !s3UnsafeDelete {
349                         return ErrS3TrashDisabled
350                 }
351                 return v.Bucket.Del(loc)
352         }
353         err := v.checkRaceWindow(loc)
354         if err != nil {
355                 return err
356         }
357         err = v.safeCopy("trash/"+loc, loc)
358         if err != nil {
359                 return err
360         }
361         return v.translateError(v.Bucket.Del(loc))
362 }
363
364 // checkRaceWindow returns a non-nil error if trash/loc is, or might
365 // be, in the race window (i.e., it's not safe to trash loc).
366 func (v *S3Volume) checkRaceWindow(loc string) error {
367         resp, err := v.Bucket.Head("trash/"+loc, nil)
368         err = v.translateError(err)
369         if os.IsNotExist(err) {
370                 // OK, trash/X doesn't exist so we're not in the race
371                 // window
372                 return nil
373         } else if err != nil {
374                 // Error looking up trash/X. We don't know whether
375                 // we're in the race window
376                 return err
377         }
378         t, err := v.lastModified(resp)
379         if err != nil {
380                 // Can't parse timestamp
381                 return err
382         }
383         safeWindow := t.Add(trashLifetime).Sub(time.Now().Add(v.raceWindow))
384         if safeWindow <= 0 {
385                 // We can't count on "touch trash/X" to prolong
386                 // trash/X's lifetime. The new timestamp might not
387                 // become visible until now+raceWindow, and EmptyTrash
388                 // is allowed to delete trash/X before then.
389                 return fmt.Errorf("same block is already in trash, and safe window ended %s ago", -safeWindow)
390         }
391         // trash/X exists, but it won't be eligible for deletion until
392         // after now+raceWindow, so it's safe to overwrite it.
393         return nil
394 }
395
396 func (v *S3Volume) safeCopy(dst, src string) error {
397         resp, err := v.Bucket.PutCopy(dst, s3ACL, s3.CopyOptions{
398                 ContentType:       "application/octet-stream",
399                 MetadataDirective: "REPLACE",
400         }, v.Bucket.Name+"/"+src)
401         err = v.translateError(err)
402         if err != nil {
403                 return err
404         }
405         if t, err := time.Parse(time.RFC3339Nano, resp.LastModified); err != nil {
406                 return fmt.Errorf("PutCopy succeeded but did not return a timestamp: %q: %s", resp.LastModified, err)
407         } else if time.Now().Sub(t) > maxClockSkew {
408                 return fmt.Errorf("PutCopy succeeded but returned an old timestamp: %q: %s", resp.LastModified, t)
409         }
410         return nil
411 }
412
413 // Get the LastModified header from resp, and parse it as RFC1123 or
414 // -- if it isn't valid RFC1123 -- as Amazon's variant of RFC1123.
415 func (v *S3Volume) lastModified(resp *http.Response) (t time.Time, err error) {
416         s := resp.Header.Get("Last-Modified")
417         t, err = time.Parse(time.RFC1123, s)
418         if err != nil && s != "" {
419                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
420                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
421                 // as required by HTTP spec. If it's not a valid HTTP
422                 // header value, it's probably AWS (or s3test) giving
423                 // us a nearly-RFC1123 timestamp.
424                 t, err = time.Parse(nearlyRFC1123, s)
425         }
426         return
427 }
428
429 func (v *S3Volume) Untrash(loc string) error {
430         err := v.safeCopy(loc, "trash/"+loc)
431         if err != nil {
432                 return err
433         }
434         err = v.Bucket.Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
435         return v.translateError(err)
436 }
437
438 func (v *S3Volume) Status() *VolumeStatus {
439         return &VolumeStatus{
440                 DeviceNum: 1,
441                 BytesFree: BlockSize * 1000,
442                 BytesUsed: 1,
443         }
444 }
445
446 func (v *S3Volume) String() string {
447         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
448 }
449
450 func (v *S3Volume) Writable() bool {
451         return !v.readonly
452 }
453 func (v *S3Volume) Replication() int {
454         return v.replication
455 }
456
457 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
458
459 func (v *S3Volume) isKeepBlock(s string) bool {
460         return s3KeepBlockRegexp.MatchString(s)
461 }
462
463 // fixRace(X) is called when "recent/X" exists but "X" doesn't
464 // exist. If the timestamps on "recent/"+loc and "trash/"+loc indicate
465 // there was a race between Put and Trash, fixRace recovers from the
466 // race by Untrashing the block.
467 func (v *S3Volume) fixRace(loc string) bool {
468         trash, err := v.Bucket.Head("trash/"+loc, nil)
469         if err != nil {
470                 if !os.IsNotExist(v.translateError(err)) {
471                         log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
472                 }
473                 return false
474         }
475         trashTime, err := v.lastModified(trash)
476         if err != nil {
477                 log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
478                 return false
479         }
480
481         recent, err := v.Bucket.Head("recent/"+loc, nil)
482         if err != nil {
483                 log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
484                 return false
485         }
486         recentTime, err := v.lastModified(recent)
487         if err != nil {
488                 log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
489                 return false
490         }
491
492         ageWhenTrashed := trashTime.Sub(recentTime)
493         if ageWhenTrashed >= blobSignatureTTL {
494                 // No evidence of a race: block hasn't been written
495                 // since it became eligible for Trash. No fix needed.
496                 return false
497         }
498
499         log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, blobSignatureTTL)
500         log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
501         err = v.safeCopy(loc, "trash/"+loc)
502         if err != nil {
503                 log.Printf("error: fixRace: %s", err)
504                 return false
505         }
506         return true
507 }
508
509 func (v *S3Volume) translateError(err error) error {
510         switch err := err.(type) {
511         case *s3.Error:
512                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
513                         strings.Contains(err.Error(), "Not Found") {
514                         return os.ErrNotExist
515                 }
516                 // Other 404 errors like NoSuchVersion and
517                 // NoSuchBucket are different problems which should
518                 // get called out downstream, so we don't convert them
519                 // to os.ErrNotExist.
520         }
521         return err
522 }
523
524 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
525 // and deletes them from the volume.
526 func (v *S3Volume) EmptyTrash() {
527         // Use a merge sort to find matching sets of trash/X and recent/X.
528         trashL := s3Lister{
529                 Bucket:   v.Bucket,
530                 Prefix:   "trash/",
531                 PageSize: v.indexPageSize,
532         }
533         // Define "ready to delete" as "...when EmptyTrash started".
534         now := time.Now()
535         for trash := trashL.First(); trash != nil; trash = trashL.Next() {
536                 loc := trash.Key[6:]
537                 if !v.isKeepBlock(loc) {
538                         continue
539                 }
540                 recent, err := v.Bucket.Head("recent/"+loc, nil)
541                 if err != nil {
542                         log.Printf("warning: %s: EmptyTrash: cannot delete trash %q with no corresponding recent/* marker", v, trash.Key)
543                         continue
544                 }
545                 trashT, err := time.Parse(time.RFC3339, trash.LastModified)
546                 if err != nil {
547                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
548                         continue
549                 }
550                 recentT, err := v.lastModified(recent)
551                 if err != nil {
552                         log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
553                         continue
554                 }
555                 if trashT.Sub(recentT) < blobSignatureTTL {
556                         v.fixRace(loc)
557                         continue
558                 }
559                 if now.Sub(trashT) < trashLifetime {
560                         continue
561                 }
562                 err = v.Bucket.Del(trash.Key)
563                 if err != nil {
564                         log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
565                         continue
566                 }
567                 _, err = v.Bucket.Head(loc, nil)
568                 if os.IsNotExist(err) {
569                         err = v.Bucket.Del("recent/" + loc)
570                         if err != nil {
571                                 log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
572                         }
573                 } else if err != nil {
574                         log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
575                 }
576         }
577         if err := trashL.Error(); err != nil {
578                 log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
579         }
580 }
581
582 type s3Lister struct {
583         Bucket     *s3.Bucket
584         Prefix     string
585         PageSize   int
586         nextMarker string
587         buf        []s3.Key
588         err        error
589 }
590
591 // First fetches the first page and returns the first item. It returns
592 // nil if the response is the empty set or an error occurs.
593 func (lister *s3Lister) First() *s3.Key {
594         lister.getPage()
595         return lister.pop()
596 }
597
598 // Next returns the next item, fetching the next page if necessary. It
599 // returns nil if the last available item has already been fetched, or
600 // an error occurs.
601 func (lister *s3Lister) Next() *s3.Key {
602         if len(lister.buf) == 0 && lister.nextMarker != "" {
603                 lister.getPage()
604         }
605         return lister.pop()
606 }
607
608 // Return the most recent error encountered by First or Next.
609 func (lister *s3Lister) Error() error {
610         return lister.err
611 }
612
613 func (lister *s3Lister) getPage() {
614         resp, err := lister.Bucket.List(lister.Prefix, "", lister.nextMarker, lister.PageSize)
615         lister.nextMarker = ""
616         if err != nil {
617                 lister.err = err
618                 return
619         }
620         if resp.IsTruncated {
621                 lister.nextMarker = resp.NextMarker
622         }
623         lister.buf = make([]s3.Key, 0, len(resp.Contents))
624         for _, key := range resp.Contents {
625                 if !strings.HasPrefix(key.Key, lister.Prefix) {
626                         log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
627                         continue
628                 }
629                 lister.buf = append(lister.buf, key)
630         }
631 }
632
633 func (lister *s3Lister) pop() (k *s3.Key) {
634         if len(lister.buf) > 0 {
635                 k = &lister.buf[0]
636                 lister.buf = lister.buf[1:]
637         }
638         return
639 }