8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandle...
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "time"
14
15         "github.com/AdRoll/goamz/aws"
16         "github.com/AdRoll/goamz/s3"
17 )
18
19 var (
20         ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
21
22         s3AccessKeyFile string
23         s3SecretKeyFile string
24         s3RegionName    string
25         s3Endpoint      string
26         s3Replication   int
27         s3UnsafeDelete  bool
28
29         s3ACL = s3.Private
30 )
31
32 const (
33         maxClockSkew  = 600 * time.Second
34         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
35 )
36
37 type s3VolumeAdder struct {
38         *volumeSet
39 }
40
41 func (s *s3VolumeAdder) Set(bucketName string) error {
42         if bucketName == "" {
43                 return fmt.Errorf("no container name given")
44         }
45         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
46                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
47         }
48         region, ok := aws.Regions[s3RegionName]
49         if s3Endpoint == "" {
50                 if !ok {
51                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
52                 }
53         } else {
54                 if ok {
55                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
56                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
57                 }
58                 region = aws.Region{
59                         Name:       s3RegionName,
60                         S3Endpoint: s3Endpoint,
61                 }
62         }
63         var err error
64         var auth aws.Auth
65         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
66         if err != nil {
67                 return err
68         }
69         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
70         if err != nil {
71                 return err
72         }
73         if flagSerializeIO {
74                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
75         }
76         v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
77         if err := v.Check(); err != nil {
78                 return err
79         }
80         *s.volumeSet = append(*s.volumeSet, v)
81         return nil
82 }
83
84 func s3regions() (okList []string) {
85         for r, _ := range aws.Regions {
86                 okList = append(okList, r)
87         }
88         return
89 }
90
91 func init() {
92         flag.Var(&s3VolumeAdder{&volumes},
93                 "s3-bucket-volume",
94                 "Use the given bucket as a storage volume. Can be given multiple times.")
95         flag.StringVar(
96                 &s3RegionName,
97                 "s3-region",
98                 "",
99                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
100         flag.StringVar(
101                 &s3Endpoint,
102                 "s3-endpoint",
103                 "",
104                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
105         flag.StringVar(
106                 &s3AccessKeyFile,
107                 "s3-access-key-file",
108                 "",
109                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
110         flag.StringVar(
111                 &s3SecretKeyFile,
112                 "s3-secret-key-file",
113                 "",
114                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
115         flag.IntVar(
116                 &s3Replication,
117                 "s3-replication",
118                 2,
119                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
120         flag.BoolVar(
121                 &s3UnsafeDelete,
122                 "s3-unsafe-delete",
123                 false,
124                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
125 }
126
127 type S3Volume struct {
128         *s3.Bucket
129         readonly      bool
130         replication   int
131         indexPageSize int
132 }
133
134 // NewS3Volume returns a new S3Volume using the given auth, region,
135 // and bucket name. The replication argument specifies the replication
136 // level to report when writing data.
137 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
138         return &S3Volume{
139                 Bucket: &s3.Bucket{
140                         S3:   s3.New(auth, region),
141                         Name: bucket,
142                 },
143                 readonly:      readonly,
144                 replication:   replication,
145                 indexPageSize: 1000,
146         }
147 }
148
149 func (v *S3Volume) Check() error {
150         return nil
151 }
152
153 func (v *S3Volume) Get(loc string) ([]byte, error) {
154         rdr, err := v.Bucket.GetReader(loc)
155         if err != nil {
156                 return nil, v.translateError(err)
157         }
158         defer rdr.Close()
159         buf := bufs.Get(BlockSize)
160         n, err := io.ReadFull(rdr, buf)
161         switch err {
162         case nil, io.EOF, io.ErrUnexpectedEOF:
163                 return buf[:n], nil
164         default:
165                 bufs.Put(buf)
166                 return nil, v.translateError(err)
167         }
168 }
169
170 func (v *S3Volume) Compare(loc string, expect []byte) error {
171         rdr, err := v.Bucket.GetReader(loc)
172         if err != nil {
173                 return v.translateError(err)
174         }
175         defer rdr.Close()
176         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
177 }
178
179 func (v *S3Volume) Put(loc string, block []byte) error {
180         if v.readonly {
181                 return MethodDisabledError
182         }
183         var opts s3.Options
184         if len(block) > 0 {
185                 md5, err := hex.DecodeString(loc)
186                 if err != nil {
187                         return err
188                 }
189                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
190         }
191         return v.translateError(
192                 v.Bucket.Put(
193                         loc, block, "application/octet-stream", s3ACL, opts))
194 }
195
196 func (v *S3Volume) Touch(loc string) error {
197         if v.readonly {
198                 return MethodDisabledError
199         }
200         result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
201                 ContentType:       "application/octet-stream",
202                 MetadataDirective: "REPLACE",
203         }, v.Bucket.Name+"/"+loc)
204         if err != nil {
205                 return v.translateError(err)
206         }
207         t, err := time.Parse(time.RFC3339, result.LastModified)
208         if err != nil {
209                 return err
210         }
211         if time.Since(t) > maxClockSkew {
212                 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
213         }
214         return nil
215 }
216
217 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
218         resp, err := v.Bucket.Head(loc, nil)
219         if err != nil {
220                 return zeroTime, v.translateError(err)
221         }
222         hdr := resp.Header.Get("Last-Modified")
223         t, err := time.Parse(time.RFC1123, hdr)
224         if err != nil && hdr != "" {
225                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
226                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
227                 // as required by HTTP spec. If it's not a valid HTTP
228                 // header value, it's probably AWS (or s3test) giving
229                 // us a nearly-RFC1123 timestamp.
230                 t, err = time.Parse(nearlyRFC1123, hdr)
231         }
232         return t, err
233 }
234
235 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
236         nextMarker := ""
237         for {
238                 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
239                 if err != nil {
240                         return err
241                 }
242                 for _, key := range listResp.Contents {
243                         t, err := time.Parse(time.RFC3339, key.LastModified)
244                         if err != nil {
245                                 return err
246                         }
247                         if !v.isKeepBlock(key.Key) {
248                                 continue
249                         }
250                         fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
251                 }
252                 if !listResp.IsTruncated {
253                         break
254                 }
255                 nextMarker = listResp.NextMarker
256         }
257         return nil
258 }
259
260 func (v *S3Volume) Trash(loc string) error {
261         if v.readonly {
262                 return MethodDisabledError
263         }
264         if t, err := v.Mtime(loc); err != nil {
265                 return err
266         } else if time.Since(t) < blobSignatureTTL {
267                 return nil
268         }
269         if !s3UnsafeDelete {
270                 return ErrS3DeleteNotAvailable
271         }
272         return v.Bucket.Del(loc)
273 }
274
275 // TBD
276 func (v *S3Volume) Untrash(loc string) error {
277         return nil
278 }
279
280 func (v *S3Volume) Status() *VolumeStatus {
281         return &VolumeStatus{
282                 DeviceNum: 1,
283                 BytesFree: BlockSize * 1000,
284                 BytesUsed: 1,
285         }
286 }
287
288 func (v *S3Volume) String() string {
289         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
290 }
291
292 func (v *S3Volume) Writable() bool {
293         return !v.readonly
294 }
295 func (v *S3Volume) Replication() int {
296         return v.replication
297 }
298
299 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
300
301 func (v *S3Volume) isKeepBlock(s string) bool {
302         return s3KeepBlockRegexp.MatchString(s)
303 }
304
305 func (v *S3Volume) translateError(err error) error {
306         switch err := err.(type) {
307         case *s3.Error:
308                 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
309                         return os.ErrNotExist
310                 }
311                 // Other 404 errors like NoSuchVersion and
312                 // NoSuchBucket are different problems which should
313                 // get called out downstream, so we don't convert them
314                 // to os.ErrNotExist.
315         }
316         return err
317 }