80a7c89f2ed4f6669566711c40c4d0a59940e439
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "strings"
14         "time"
15
16         "github.com/AdRoll/goamz/aws"
17         "github.com/AdRoll/goamz/s3"
18 )
19
20 var (
21         ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
22
23         s3AccessKeyFile string
24         s3SecretKeyFile string
25         s3RegionName    string
26         s3Endpoint      string
27         s3Replication   int
28         s3UnsafeDelete  bool
29
30         s3ACL = s3.Private
31 )
32
33 const (
34         maxClockSkew  = 600 * time.Second
35         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
36 )
37
38 type s3VolumeAdder struct {
39         *volumeSet
40 }
41
42 func (s *s3VolumeAdder) Set(bucketName string) error {
43         if trashLifetime != 0 {
44                 return ErrNotImplemented
45         }
46         if bucketName == "" {
47                 return fmt.Errorf("no container name given")
48         }
49         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
50                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
51         }
52         region, ok := aws.Regions[s3RegionName]
53         if s3Endpoint == "" {
54                 if !ok {
55                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
56                 }
57         } else {
58                 if ok {
59                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
60                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
61                 }
62                 region = aws.Region{
63                         Name:       s3RegionName,
64                         S3Endpoint: s3Endpoint,
65                 }
66         }
67         var err error
68         var auth aws.Auth
69         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
70         if err != nil {
71                 return err
72         }
73         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
74         if err != nil {
75                 return err
76         }
77         if flagSerializeIO {
78                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
79         }
80         v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
81         if err := v.Check(); err != nil {
82                 return err
83         }
84         *s.volumeSet = append(*s.volumeSet, v)
85         return nil
86 }
87
88 func s3regions() (okList []string) {
89         for r, _ := range aws.Regions {
90                 okList = append(okList, r)
91         }
92         return
93 }
94
95 func init() {
96         flag.Var(&s3VolumeAdder{&volumes},
97                 "s3-bucket-volume",
98                 "Use the given bucket as a storage volume. Can be given multiple times.")
99         flag.StringVar(
100                 &s3RegionName,
101                 "s3-region",
102                 "",
103                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104         flag.StringVar(
105                 &s3Endpoint,
106                 "s3-endpoint",
107                 "",
108                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
109         flag.StringVar(
110                 &s3AccessKeyFile,
111                 "s3-access-key-file",
112                 "",
113                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
114         flag.StringVar(
115                 &s3SecretKeyFile,
116                 "s3-secret-key-file",
117                 "",
118                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
119         flag.IntVar(
120                 &s3Replication,
121                 "s3-replication",
122                 2,
123                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
124         flag.BoolVar(
125                 &s3UnsafeDelete,
126                 "s3-unsafe-delete",
127                 false,
128                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
129 }
130
131 type S3Volume struct {
132         *s3.Bucket
133         readonly      bool
134         replication   int
135         indexPageSize int
136 }
137
138 // NewS3Volume returns a new S3Volume using the given auth, region,
139 // and bucket name. The replication argument specifies the replication
140 // level to report when writing data.
141 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
142         return &S3Volume{
143                 Bucket: &s3.Bucket{
144                         S3:   s3.New(auth, region),
145                         Name: bucket,
146                 },
147                 readonly:      readonly,
148                 replication:   replication,
149                 indexPageSize: 1000,
150         }
151 }
152
153 func (v *S3Volume) Check() error {
154         return nil
155 }
156
157 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
158         rdr, err := v.Bucket.GetReader(loc)
159         if err != nil {
160                 return 0, v.translateError(err)
161         }
162         defer rdr.Close()
163         n, err := io.ReadFull(rdr, buf)
164         switch err {
165         case nil, io.EOF, io.ErrUnexpectedEOF:
166                 return n, nil
167         default:
168                 return 0, v.translateError(err)
169         }
170 }
171
172 func (v *S3Volume) Compare(loc string, expect []byte) error {
173         rdr, err := v.Bucket.GetReader(loc)
174         if err != nil {
175                 return v.translateError(err)
176         }
177         defer rdr.Close()
178         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
179 }
180
181 func (v *S3Volume) Put(loc string, block []byte) error {
182         if v.readonly {
183                 return MethodDisabledError
184         }
185         var opts s3.Options
186         if len(block) > 0 {
187                 md5, err := hex.DecodeString(loc)
188                 if err != nil {
189                         return err
190                 }
191                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
192         }
193         return v.translateError(
194                 v.Bucket.Put(
195                         loc, block, "application/octet-stream", s3ACL, opts))
196 }
197
198 func (v *S3Volume) Touch(loc string) error {
199         if v.readonly {
200                 return MethodDisabledError
201         }
202         result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
203                 ContentType:       "application/octet-stream",
204                 MetadataDirective: "REPLACE",
205         }, v.Bucket.Name+"/"+loc)
206         if err != nil {
207                 return v.translateError(err)
208         }
209         t, err := time.Parse(time.RFC3339, result.LastModified)
210         if err != nil {
211                 return err
212         }
213         if time.Since(t) > maxClockSkew {
214                 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
215         }
216         return nil
217 }
218
219 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
220         resp, err := v.Bucket.Head(loc, nil)
221         if err != nil {
222                 return zeroTime, v.translateError(err)
223         }
224         hdr := resp.Header.Get("Last-Modified")
225         t, err := time.Parse(time.RFC1123, hdr)
226         if err != nil && hdr != "" {
227                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
228                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
229                 // as required by HTTP spec. If it's not a valid HTTP
230                 // header value, it's probably AWS (or s3test) giving
231                 // us a nearly-RFC1123 timestamp.
232                 t, err = time.Parse(nearlyRFC1123, hdr)
233         }
234         return t, err
235 }
236
237 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
238         nextMarker := ""
239         for {
240                 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
241                 if err != nil {
242                         return err
243                 }
244                 for _, key := range listResp.Contents {
245                         t, err := time.Parse(time.RFC3339, key.LastModified)
246                         if err != nil {
247                                 return err
248                         }
249                         if !v.isKeepBlock(key.Key) {
250                                 continue
251                         }
252                         fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
253                 }
254                 if !listResp.IsTruncated {
255                         break
256                 }
257                 nextMarker = listResp.NextMarker
258         }
259         return nil
260 }
261
262 // Trash a Keep block.
263 func (v *S3Volume) Trash(loc string) error {
264         if v.readonly {
265                 return MethodDisabledError
266         }
267         if trashLifetime != 0 {
268                 return ErrNotImplemented
269         }
270         if t, err := v.Mtime(loc); err != nil {
271                 return err
272         } else if time.Since(t) < blobSignatureTTL {
273                 return nil
274         }
275         if !s3UnsafeDelete {
276                 return ErrS3DeleteNotAvailable
277         }
278         return v.Bucket.Del(loc)
279 }
280
281 // TBD
282 func (v *S3Volume) Untrash(loc string) error {
283         return ErrNotImplemented
284 }
285
286 func (v *S3Volume) Status() *VolumeStatus {
287         return &VolumeStatus{
288                 DeviceNum: 1,
289                 BytesFree: BlockSize * 1000,
290                 BytesUsed: 1,
291         }
292 }
293
294 func (v *S3Volume) String() string {
295         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
296 }
297
298 func (v *S3Volume) Writable() bool {
299         return !v.readonly
300 }
301 func (v *S3Volume) Replication() int {
302         return v.replication
303 }
304
305 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
306
307 func (v *S3Volume) isKeepBlock(s string) bool {
308         return s3KeepBlockRegexp.MatchString(s)
309 }
310
311 func (v *S3Volume) translateError(err error) error {
312         switch err := err.(type) {
313         case *s3.Error:
314                 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
315                         strings.Contains(err.Error(), "Not Found") {
316                         return os.ErrNotExist
317                 }
318                 // Other 404 errors like NoSuchVersion and
319                 // NoSuchBucket are different problems which should
320                 // get called out downstream, so we don't convert them
321                 // to os.ErrNotExist.
322         }
323         return err
324 }
325
326 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
327 // and deletes them from the volume.
328 // TBD
329 func (v *S3Volume) EmptyTrash() {
330 }