Merge branch '8087-arv-cli-request-body-from-file' of https://github.com/wtsi-hgi...
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "time"
14
15         "github.com/AdRoll/goamz/aws"
16         "github.com/AdRoll/goamz/s3"
17 )
18
19 var (
20         ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
21
22         s3AccessKeyFile string
23         s3SecretKeyFile string
24         s3RegionName    string
25         s3Endpoint      string
26         s3Replication   int
27         s3UnsafeDelete  bool
28
29         s3ACL = s3.Private
30 )
31
32 const (
33         maxClockSkew  = 600 * time.Second
34         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
35 )
36
37 type s3VolumeAdder struct {
38         *volumeSet
39 }
40
41 func (s *s3VolumeAdder) Set(bucketName string) error {
42         if trashLifetime != 0 {
43                 return ErrNotImplemented
44         }
45         if bucketName == "" {
46                 return fmt.Errorf("no container name given")
47         }
48         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
49                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
50         }
51         region, ok := aws.Regions[s3RegionName]
52         if s3Endpoint == "" {
53                 if !ok {
54                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
55                 }
56         } else {
57                 if ok {
58                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
59                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
60                 }
61                 region = aws.Region{
62                         Name:       s3RegionName,
63                         S3Endpoint: s3Endpoint,
64                 }
65         }
66         var err error
67         var auth aws.Auth
68         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
69         if err != nil {
70                 return err
71         }
72         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
73         if err != nil {
74                 return err
75         }
76         if flagSerializeIO {
77                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
78         }
79         v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
80         if err := v.Check(); err != nil {
81                 return err
82         }
83         *s.volumeSet = append(*s.volumeSet, v)
84         return nil
85 }
86
87 func s3regions() (okList []string) {
88         for r, _ := range aws.Regions {
89                 okList = append(okList, r)
90         }
91         return
92 }
93
94 func init() {
95         flag.Var(&s3VolumeAdder{&volumes},
96                 "s3-bucket-volume",
97                 "Use the given bucket as a storage volume. Can be given multiple times.")
98         flag.StringVar(
99                 &s3RegionName,
100                 "s3-region",
101                 "",
102                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
103         flag.StringVar(
104                 &s3Endpoint,
105                 "s3-endpoint",
106                 "",
107                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
108         flag.StringVar(
109                 &s3AccessKeyFile,
110                 "s3-access-key-file",
111                 "",
112                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
113         flag.StringVar(
114                 &s3SecretKeyFile,
115                 "s3-secret-key-file",
116                 "",
117                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
118         flag.IntVar(
119                 &s3Replication,
120                 "s3-replication",
121                 2,
122                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
123         flag.BoolVar(
124                 &s3UnsafeDelete,
125                 "s3-unsafe-delete",
126                 false,
127                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
128 }
129
130 type S3Volume struct {
131         *s3.Bucket
132         readonly      bool
133         replication   int
134         indexPageSize int
135 }
136
137 // NewS3Volume returns a new S3Volume using the given auth, region,
138 // and bucket name. The replication argument specifies the replication
139 // level to report when writing data.
140 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
141         return &S3Volume{
142                 Bucket: &s3.Bucket{
143                         S3:   s3.New(auth, region),
144                         Name: bucket,
145                 },
146                 readonly:      readonly,
147                 replication:   replication,
148                 indexPageSize: 1000,
149         }
150 }
151
152 func (v *S3Volume) Check() error {
153         return nil
154 }
155
156 func (v *S3Volume) Get(loc string) ([]byte, error) {
157         rdr, err := v.Bucket.GetReader(loc)
158         if err != nil {
159                 return nil, v.translateError(err)
160         }
161         defer rdr.Close()
162         buf := bufs.Get(BlockSize)
163         n, err := io.ReadFull(rdr, buf)
164         switch err {
165         case nil, io.EOF, io.ErrUnexpectedEOF:
166                 return buf[:n], nil
167         default:
168                 bufs.Put(buf)
169                 return nil, v.translateError(err)
170         }
171 }
172
173 func (v *S3Volume) Compare(loc string, expect []byte) error {
174         rdr, err := v.Bucket.GetReader(loc)
175         if err != nil {
176                 return v.translateError(err)
177         }
178         defer rdr.Close()
179         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
180 }
181
182 func (v *S3Volume) Put(loc string, block []byte) error {
183         if v.readonly {
184                 return MethodDisabledError
185         }
186         var opts s3.Options
187         if len(block) > 0 {
188                 md5, err := hex.DecodeString(loc)
189                 if err != nil {
190                         return err
191                 }
192                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
193         }
194         return v.translateError(
195                 v.Bucket.Put(
196                         loc, block, "application/octet-stream", s3ACL, opts))
197 }
198
199 func (v *S3Volume) Touch(loc string) error {
200         if v.readonly {
201                 return MethodDisabledError
202         }
203         result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
204                 ContentType:       "application/octet-stream",
205                 MetadataDirective: "REPLACE",
206         }, v.Bucket.Name+"/"+loc)
207         if err != nil {
208                 return v.translateError(err)
209         }
210         t, err := time.Parse(time.RFC3339, result.LastModified)
211         if err != nil {
212                 return err
213         }
214         if time.Since(t) > maxClockSkew {
215                 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
216         }
217         return nil
218 }
219
220 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
221         resp, err := v.Bucket.Head(loc, nil)
222         if err != nil {
223                 return zeroTime, v.translateError(err)
224         }
225         hdr := resp.Header.Get("Last-Modified")
226         t, err := time.Parse(time.RFC1123, hdr)
227         if err != nil && hdr != "" {
228                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
229                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
230                 // as required by HTTP spec. If it's not a valid HTTP
231                 // header value, it's probably AWS (or s3test) giving
232                 // us a nearly-RFC1123 timestamp.
233                 t, err = time.Parse(nearlyRFC1123, hdr)
234         }
235         return t, err
236 }
237
238 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
239         nextMarker := ""
240         for {
241                 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
242                 if err != nil {
243                         return err
244                 }
245                 for _, key := range listResp.Contents {
246                         t, err := time.Parse(time.RFC3339, key.LastModified)
247                         if err != nil {
248                                 return err
249                         }
250                         if !v.isKeepBlock(key.Key) {
251                                 continue
252                         }
253                         fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
254                 }
255                 if !listResp.IsTruncated {
256                         break
257                 }
258                 nextMarker = listResp.NextMarker
259         }
260         return nil
261 }
262
263 // Trash a Keep block.
264 func (v *S3Volume) Trash(loc string) error {
265         if v.readonly {
266                 return MethodDisabledError
267         }
268         if trashLifetime != 0 {
269                 return ErrNotImplemented
270         }
271         if t, err := v.Mtime(loc); err != nil {
272                 return err
273         } else if time.Since(t) < blobSignatureTTL {
274                 return nil
275         }
276         if !s3UnsafeDelete {
277                 return ErrS3DeleteNotAvailable
278         }
279         return v.Bucket.Del(loc)
280 }
281
282 // TBD
283 func (v *S3Volume) Untrash(loc string) error {
284         return ErrNotImplemented
285 }
286
287 func (v *S3Volume) Status() *VolumeStatus {
288         return &VolumeStatus{
289                 DeviceNum: 1,
290                 BytesFree: BlockSize * 1000,
291                 BytesUsed: 1,
292         }
293 }
294
295 func (v *S3Volume) String() string {
296         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
297 }
298
299 func (v *S3Volume) Writable() bool {
300         return !v.readonly
301 }
302 func (v *S3Volume) Replication() int {
303         return v.replication
304 }
305
306 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
307
308 func (v *S3Volume) isKeepBlock(s string) bool {
309         return s3KeepBlockRegexp.MatchString(s)
310 }
311
312 func (v *S3Volume) translateError(err error) error {
313         switch err := err.(type) {
314         case *s3.Error:
315                 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
316                         return os.ErrNotExist
317                 }
318                 // Other 404 errors like NoSuchVersion and
319                 // NoSuchBucket are different problems which should
320                 // get called out downstream, so we don't convert them
321                 // to os.ErrNotExist.
322         }
323         return err
324 }
325
326 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
327 // and deletes them from the volume.
328 // TBD
329 func (v *S3Volume) EmptyTrash() {
330 }