8178: All three currently supported volumes return error when trash-lifetime period...
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "time"
14
15         "github.com/AdRoll/goamz/aws"
16         "github.com/AdRoll/goamz/s3"
17 )
18
19 var (
20         ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
21
22         s3AccessKeyFile string
23         s3SecretKeyFile string
24         s3RegionName    string
25         s3Endpoint      string
26         s3Replication   int
27         s3UnsafeDelete  bool
28
29         s3ACL = s3.Private
30 )
31
32 const (
33         maxClockSkew  = 600 * time.Second
34         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
35 )
36
37 type s3VolumeAdder struct {
38         *volumeSet
39 }
40
41 func (s *s3VolumeAdder) Set(bucketName string) error {
42         if trashLifetime <= 0 {
43                 log.Print("Missing required configuration parameter: trash-lifetime")
44                 return ErrNotImplemented
45         }
46         if bucketName == "" {
47                 return fmt.Errorf("no container name given")
48         }
49         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
50                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
51         }
52         region, ok := aws.Regions[s3RegionName]
53         if s3Endpoint == "" {
54                 if !ok {
55                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
56                 }
57         } else {
58                 if ok {
59                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
60                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
61                 }
62                 region = aws.Region{
63                         Name:       s3RegionName,
64                         S3Endpoint: s3Endpoint,
65                 }
66         }
67         var err error
68         var auth aws.Auth
69         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
70         if err != nil {
71                 return err
72         }
73         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
74         if err != nil {
75                 return err
76         }
77         if flagSerializeIO {
78                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
79         }
80         v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
81         if err := v.Check(); err != nil {
82                 return err
83         }
84         *s.volumeSet = append(*s.volumeSet, v)
85         return nil
86 }
87
88 func s3regions() (okList []string) {
89         for r, _ := range aws.Regions {
90                 okList = append(okList, r)
91         }
92         return
93 }
94
95 func init() {
96         flag.Var(&s3VolumeAdder{&volumes},
97                 "s3-bucket-volume",
98                 "Use the given bucket as a storage volume. Can be given multiple times.")
99         flag.StringVar(
100                 &s3RegionName,
101                 "s3-region",
102                 "",
103                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104         flag.StringVar(
105                 &s3Endpoint,
106                 "s3-endpoint",
107                 "",
108                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
109         flag.StringVar(
110                 &s3AccessKeyFile,
111                 "s3-access-key-file",
112                 "",
113                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
114         flag.StringVar(
115                 &s3SecretKeyFile,
116                 "s3-secret-key-file",
117                 "",
118                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
119         flag.IntVar(
120                 &s3Replication,
121                 "s3-replication",
122                 2,
123                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
124         flag.BoolVar(
125                 &s3UnsafeDelete,
126                 "s3-unsafe-delete",
127                 false,
128                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
129 }
130
131 type S3Volume struct {
132         *s3.Bucket
133         readonly      bool
134         replication   int
135         indexPageSize int
136 }
137
138 // NewS3Volume returns a new S3Volume using the given auth, region,
139 // and bucket name. The replication argument specifies the replication
140 // level to report when writing data.
141 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
142         return &S3Volume{
143                 Bucket: &s3.Bucket{
144                         S3:   s3.New(auth, region),
145                         Name: bucket,
146                 },
147                 readonly:      readonly,
148                 replication:   replication,
149                 indexPageSize: 1000,
150         }
151 }
152
153 func (v *S3Volume) Check() error {
154         return nil
155 }
156
157 func (v *S3Volume) Get(loc string) ([]byte, error) {
158         rdr, err := v.Bucket.GetReader(loc)
159         if err != nil {
160                 return nil, v.translateError(err)
161         }
162         defer rdr.Close()
163         buf := bufs.Get(BlockSize)
164         n, err := io.ReadFull(rdr, buf)
165         switch err {
166         case nil, io.EOF, io.ErrUnexpectedEOF:
167                 return buf[:n], nil
168         default:
169                 bufs.Put(buf)
170                 return nil, v.translateError(err)
171         }
172 }
173
174 func (v *S3Volume) Compare(loc string, expect []byte) error {
175         rdr, err := v.Bucket.GetReader(loc)
176         if err != nil {
177                 return v.translateError(err)
178         }
179         defer rdr.Close()
180         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
181 }
182
183 func (v *S3Volume) Put(loc string, block []byte) error {
184         if v.readonly {
185                 return MethodDisabledError
186         }
187         var opts s3.Options
188         if len(block) > 0 {
189                 md5, err := hex.DecodeString(loc)
190                 if err != nil {
191                         return err
192                 }
193                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
194         }
195         return v.translateError(
196                 v.Bucket.Put(
197                         loc, block, "application/octet-stream", s3ACL, opts))
198 }
199
200 func (v *S3Volume) Touch(loc string) error {
201         if v.readonly {
202                 return MethodDisabledError
203         }
204         result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
205                 ContentType:       "application/octet-stream",
206                 MetadataDirective: "REPLACE",
207         }, v.Bucket.Name+"/"+loc)
208         if err != nil {
209                 return v.translateError(err)
210         }
211         t, err := time.Parse(time.RFC3339, result.LastModified)
212         if err != nil {
213                 return err
214         }
215         if time.Since(t) > maxClockSkew {
216                 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
217         }
218         return nil
219 }
220
221 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
222         resp, err := v.Bucket.Head(loc, nil)
223         if err != nil {
224                 return zeroTime, v.translateError(err)
225         }
226         hdr := resp.Header.Get("Last-Modified")
227         t, err := time.Parse(time.RFC1123, hdr)
228         if err != nil && hdr != "" {
229                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
230                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
231                 // as required by HTTP spec. If it's not a valid HTTP
232                 // header value, it's probably AWS (or s3test) giving
233                 // us a nearly-RFC1123 timestamp.
234                 t, err = time.Parse(nearlyRFC1123, hdr)
235         }
236         return t, err
237 }
238
239 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
240         nextMarker := ""
241         for {
242                 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
243                 if err != nil {
244                         return err
245                 }
246                 for _, key := range listResp.Contents {
247                         t, err := time.Parse(time.RFC3339, key.LastModified)
248                         if err != nil {
249                                 return err
250                         }
251                         if !v.isKeepBlock(key.Key) {
252                                 continue
253                         }
254                         fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
255                 }
256                 if !listResp.IsTruncated {
257                         break
258                 }
259                 nextMarker = listResp.NextMarker
260         }
261         return nil
262 }
263
264 func (v *S3Volume) Trash(loc string) error {
265         if v.readonly {
266                 return MethodDisabledError
267         }
268         if t, err := v.Mtime(loc); err != nil {
269                 return err
270         } else if time.Since(t) < blobSignatureTTL {
271                 return nil
272         }
273         if !s3UnsafeDelete {
274                 return ErrS3DeleteNotAvailable
275         }
276         return v.Bucket.Del(loc)
277 }
278
279 // TBD
280 func (v *S3Volume) Untrash(loc string) error {
281         return nil
282 }
283
284 func (v *S3Volume) Status() *VolumeStatus {
285         return &VolumeStatus{
286                 DeviceNum: 1,
287                 BytesFree: BlockSize * 1000,
288                 BytesUsed: 1,
289         }
290 }
291
292 func (v *S3Volume) String() string {
293         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
294 }
295
296 func (v *S3Volume) Writable() bool {
297         return !v.readonly
298 }
299 func (v *S3Volume) Replication() int {
300         return v.replication
301 }
302
303 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
304
305 func (v *S3Volume) isKeepBlock(s string) bool {
306         return s3KeepBlockRegexp.MatchString(s)
307 }
308
309 func (v *S3Volume) translateError(err error) error {
310         switch err := err.(type) {
311         case *s3.Error:
312                 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
313                         return os.ErrNotExist
314                 }
315                 // Other 404 errors like NoSuchVersion and
316                 // NoSuchBucket are different problems which should
317                 // get called out downstream, so we don't convert them
318                 // to os.ErrNotExist.
319         }
320         return err
321 }