closes #8464
[arvados.git] / services / keepstore / s3_volume.go
1 package main
2
3 import (
4         "encoding/base64"
5         "encoding/hex"
6         "flag"
7         "fmt"
8         "io"
9         "log"
10         "net/http"
11         "os"
12         "regexp"
13         "time"
14
15         "github.com/AdRoll/goamz/aws"
16         "github.com/AdRoll/goamz/s3"
17 )
18
19 var (
20         ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
21
22         s3AccessKeyFile string
23         s3SecretKeyFile string
24         s3RegionName    string
25         s3Endpoint      string
26         s3Replication   int
27         s3UnsafeDelete  bool
28
29         s3ACL = s3.Private
30 )
31
32 const (
33         maxClockSkew  = 600 * time.Second
34         nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
35 )
36
37 type s3VolumeAdder struct {
38         *volumeSet
39 }
40
41 func (s *s3VolumeAdder) Set(bucketName string) error {
42         if trashLifetime != 0 {
43                 return ErrNotImplemented
44         }
45         if bucketName == "" {
46                 return fmt.Errorf("no container name given")
47         }
48         if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
49                 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
50         }
51         region, ok := aws.Regions[s3RegionName]
52         if s3Endpoint == "" {
53                 if !ok {
54                         return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
55                 }
56         } else {
57                 if ok {
58                         return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
59                                 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
60                 }
61                 region = aws.Region{
62                         Name:       s3RegionName,
63                         S3Endpoint: s3Endpoint,
64                 }
65         }
66         var err error
67         var auth aws.Auth
68         auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
69         if err != nil {
70                 return err
71         }
72         auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
73         if err != nil {
74                 return err
75         }
76         if flagSerializeIO {
77                 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
78         }
79         v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
80         if err := v.Check(); err != nil {
81                 return err
82         }
83         *s.volumeSet = append(*s.volumeSet, v)
84         return nil
85 }
86
87 func s3regions() (okList []string) {
88         for r, _ := range aws.Regions {
89                 okList = append(okList, r)
90         }
91         return
92 }
93
94 func init() {
95         flag.Var(&s3VolumeAdder{&volumes},
96                 "s3-bucket-volume",
97                 "Use the given bucket as a storage volume. Can be given multiple times.")
98         flag.StringVar(
99                 &s3RegionName,
100                 "s3-region",
101                 "",
102                 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
103         flag.StringVar(
104                 &s3Endpoint,
105                 "s3-endpoint",
106                 "",
107                 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
108         flag.StringVar(
109                 &s3AccessKeyFile,
110                 "s3-access-key-file",
111                 "",
112                 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
113         flag.StringVar(
114                 &s3SecretKeyFile,
115                 "s3-secret-key-file",
116                 "",
117                 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
118         flag.IntVar(
119                 &s3Replication,
120                 "s3-replication",
121                 2,
122                 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
123         flag.BoolVar(
124                 &s3UnsafeDelete,
125                 "s3-unsafe-delete",
126                 false,
127                 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
128 }
129
130 type S3Volume struct {
131         *s3.Bucket
132         readonly      bool
133         replication   int
134         indexPageSize int
135 }
136
137 // NewS3Volume returns a new S3Volume using the given auth, region,
138 // and bucket name. The replication argument specifies the replication
139 // level to report when writing data.
140 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
141         return &S3Volume{
142                 Bucket: &s3.Bucket{
143                         S3:   s3.New(auth, region),
144                         Name: bucket,
145                 },
146                 readonly:      readonly,
147                 replication:   replication,
148                 indexPageSize: 1000,
149         }
150 }
151
152 func (v *S3Volume) Check() error {
153         return nil
154 }
155
156 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
157         rdr, err := v.Bucket.GetReader(loc)
158         if err != nil {
159                 return 0, v.translateError(err)
160         }
161         defer rdr.Close()
162         n, err := io.ReadFull(rdr, buf)
163         switch err {
164         case nil, io.EOF, io.ErrUnexpectedEOF:
165                 return n, nil
166         default:
167                 return 0, v.translateError(err)
168         }
169 }
170
171 func (v *S3Volume) Compare(loc string, expect []byte) error {
172         rdr, err := v.Bucket.GetReader(loc)
173         if err != nil {
174                 return v.translateError(err)
175         }
176         defer rdr.Close()
177         return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
178 }
179
180 func (v *S3Volume) Put(loc string, block []byte) error {
181         if v.readonly {
182                 return MethodDisabledError
183         }
184         var opts s3.Options
185         if len(block) > 0 {
186                 md5, err := hex.DecodeString(loc)
187                 if err != nil {
188                         return err
189                 }
190                 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
191         }
192         return v.translateError(
193                 v.Bucket.Put(
194                         loc, block, "application/octet-stream", s3ACL, opts))
195 }
196
197 func (v *S3Volume) Touch(loc string) error {
198         if v.readonly {
199                 return MethodDisabledError
200         }
201         result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
202                 ContentType:       "application/octet-stream",
203                 MetadataDirective: "REPLACE",
204         }, v.Bucket.Name+"/"+loc)
205         if err != nil {
206                 return v.translateError(err)
207         }
208         t, err := time.Parse(time.RFC3339, result.LastModified)
209         if err != nil {
210                 return err
211         }
212         if time.Since(t) > maxClockSkew {
213                 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
214         }
215         return nil
216 }
217
218 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
219         resp, err := v.Bucket.Head(loc, nil)
220         if err != nil {
221                 return zeroTime, v.translateError(err)
222         }
223         hdr := resp.Header.Get("Last-Modified")
224         t, err := time.Parse(time.RFC1123, hdr)
225         if err != nil && hdr != "" {
226                 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
227                 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
228                 // as required by HTTP spec. If it's not a valid HTTP
229                 // header value, it's probably AWS (or s3test) giving
230                 // us a nearly-RFC1123 timestamp.
231                 t, err = time.Parse(nearlyRFC1123, hdr)
232         }
233         return t, err
234 }
235
236 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
237         nextMarker := ""
238         for {
239                 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
240                 if err != nil {
241                         return err
242                 }
243                 for _, key := range listResp.Contents {
244                         t, err := time.Parse(time.RFC3339, key.LastModified)
245                         if err != nil {
246                                 return err
247                         }
248                         if !v.isKeepBlock(key.Key) {
249                                 continue
250                         }
251                         fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
252                 }
253                 if !listResp.IsTruncated {
254                         break
255                 }
256                 nextMarker = listResp.NextMarker
257         }
258         return nil
259 }
260
261 // Trash a Keep block.
262 func (v *S3Volume) Trash(loc string) error {
263         if v.readonly {
264                 return MethodDisabledError
265         }
266         if trashLifetime != 0 {
267                 return ErrNotImplemented
268         }
269         if t, err := v.Mtime(loc); err != nil {
270                 return err
271         } else if time.Since(t) < blobSignatureTTL {
272                 return nil
273         }
274         if !s3UnsafeDelete {
275                 return ErrS3DeleteNotAvailable
276         }
277         return v.Bucket.Del(loc)
278 }
279
280 // TBD
281 func (v *S3Volume) Untrash(loc string) error {
282         return ErrNotImplemented
283 }
284
285 func (v *S3Volume) Status() *VolumeStatus {
286         return &VolumeStatus{
287                 DeviceNum: 1,
288                 BytesFree: BlockSize * 1000,
289                 BytesUsed: 1,
290         }
291 }
292
293 func (v *S3Volume) String() string {
294         return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
295 }
296
297 func (v *S3Volume) Writable() bool {
298         return !v.readonly
299 }
300 func (v *S3Volume) Replication() int {
301         return v.replication
302 }
303
304 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
305
306 func (v *S3Volume) isKeepBlock(s string) bool {
307         return s3KeepBlockRegexp.MatchString(s)
308 }
309
310 func (v *S3Volume) translateError(err error) error {
311         switch err := err.(type) {
312         case *s3.Error:
313                 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
314                         return os.ErrNotExist
315                 }
316                 // Other 404 errors like NoSuchVersion and
317                 // NoSuchBucket are different problems which should
318                 // get called out downstream, so we don't convert them
319                 // to os.ErrNotExist.
320         }
321         return err
322 }
323
324 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
325 // and deletes them from the volume.
326 // TBD
327 func (v *S3Volume) EmptyTrash() {
328 }