15 "github.com/AdRoll/goamz/aws"
16 "github.com/AdRoll/goamz/s3"
// NOTE(review): fragmentary package-level declarations; the enclosing
// var/const blocks are not fully visible in this view.
// Sentinel error returned by Trash when deletion is requested without
// the -s3-unsafe-delete flag.
20 ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
// Paths of the files holding AWS credentials (set via the
// -s3-access-key-file / -s3-secret-key-file flags below).
22 s3AccessKeyFile string
23 s3SecretKeyFile string
// Maximum tolerated clock skew when validating PutCopy's LastModified
// timestamp (see Touch).
33 maxClockSkew = 600 * time.Second
// Layout for the not-quite-RFC1123 Last-Modified variant AWS sometimes
// emits (single-digit day, see Mtime).
34 nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
37 type s3VolumeAdder struct {
// Set implements flag.Value: it validates the current s3-* flag
// settings, builds an S3Volume for bucketName via NewS3Volume, and
// appends it to *s.volumeSet. Interior lines of this method are missing
// from this view; comments annotate only the visible fragments.
41 func (s *s3VolumeAdder) Set(bucketName string) error {
// Deferred trash (non-zero trashLifetime) is unsupported on S3 volumes.
42 if trashLifetime != 0 {
43 return ErrNotImplemented
46 return fmt.Errorf("no container name given")
// Both credential files must be supplied before any bucket flag.
// NOTE(review): message reads "must given" — likely meant "must be
// given" (string literal left untouched here).
48 if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
49 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
51 region, ok := aws.Regions[s3RegionName]
54 return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
// A recognized region name and an explicit endpoint are mutually
// exclusive: the region already implies an endpoint.
58 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
59 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
63 S3Endpoint: s3Endpoint,
// Load the access and secret keys from the configured key files.
68 auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
72 auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
77 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
79 v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
// Fail fast if the bucket is unreachable or misconfigured.
80 if err := v.Check(); err != nil {
83 *s.volumeSet = append(*s.volumeSet, v)
// s3regions returns the names of all regions known to the aws package,
// used to build the -s3-region usage string.
87 func s3regions() (okList []string) {
// NOTE(review): `for r, _ := range` should be simplified to
// `for r := range` (gofmt/staticcheck S1005) — not changed here because
// the function body is incomplete in this view.
88 for r, _ := range aws.Regions {
89 okList = append(okList, r)
// Command-line flag registration (the enclosing function header is not
// visible in this fragment; presumably an init function — TODO confirm).
// Each s3-* flag configures subsequent -s3-bucket-volume arguments.
95 flag.Var(&s3VolumeAdder{&volumes},
97 "Use the given bucket as a storage volume. Can be given multiple times.")
102 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
107 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
110 "s3-access-key-file",
112 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
115 "s3-secret-key-file",
117 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
122 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
// This flag gates ErrS3DeleteNotAvailable in Trash (see below).
127 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
130 type S3Volume struct {
137 // NewS3Volume returns a new S3Volume using the given auth, region,
138 // and bucket name. The replication argument specifies the replication
139 // level to report when writing data.
// (Interior lines of the constructor are missing from this view.)
140 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
143 S3: s3.New(auth, region),
147 replication: replication,
152 func (v *S3Volume) Check() error {
// Get retrieves the block stored at loc. The returned buffer comes from
// the bufs pool and holds at most BlockSize bytes.
156 func (v *S3Volume) Get(loc string) ([]byte, error) {
157 rdr, err := v.Bucket.GetReader(loc)
// translateError maps S3 404/NoSuchKey to os.ErrNotExist.
159 return nil, v.translateError(err)
162 buf := bufs.Get(BlockSize)
163 n, err := io.ReadFull(rdr, buf)
// EOF / ErrUnexpectedEOF just mean the object is shorter than
// BlockSize, which is normal for blocks.
165 case nil, io.EOF, io.ErrUnexpectedEOF:
169 return nil, v.translateError(err)
// Compare streams the block at loc and compares it against expect.
// loc[:32] is the md5-hex portion of the locator (Put decodes loc as
// the block's md5).
173 func (v *S3Volume) Compare(loc string, expect []byte) error {
174 rdr, err := v.Bucket.GetReader(loc)
176 return v.translateError(err)
179 return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
// Put stores block at loc. On a read-only volume it fails with
// MethodDisabledError. (Interior lines are missing from this view.)
182 func (v *S3Volume) Put(loc string, block []byte) error {
184 return MethodDisabledError
// loc is the block's md5 in hex; re-encode it as base64 Content-MD5 so
// S3 verifies payload integrity server-side.
188 md5, err := hex.DecodeString(loc)
192 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
194 return v.translateError(
196 loc, block, "application/octet-stream", s3ACL, opts))
// Touch refreshes the modification time of the object at loc by copying
// it onto itself (PutCopy with MetadataDirective REPLACE), then
// sanity-checks that the resulting LastModified is recent.
199 func (v *S3Volume) Touch(loc string) error {
201 return MethodDisabledError
203 result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
204 ContentType: "application/octet-stream",
// REPLACE forces S3 to write new metadata (and a new mtime) rather
// than copying the old metadata verbatim.
205 MetadataDirective: "REPLACE",
206 }, v.Bucket.Name+"/"+loc)
208 return v.translateError(err)
210 t, err := time.Parse(time.RFC3339, result.LastModified)
// If the copy's timestamp is older than maxClockSkew, the touch
// evidently did not take effect; report it as an error.
214 if time.Since(t) > maxClockSkew {
215 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
// Mtime returns the Last-Modified time of the object at loc, falling
// back to a near-RFC1123 layout when the header is not strictly
// RFC1123 (see comment below).
220 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
221 resp, err := v.Bucket.Head(loc, nil)
223 return zeroTime, v.translateError(err)
225 hdr := resp.Header.Get("Last-Modified")
226 t, err := time.Parse(time.RFC1123, hdr)
// Only retry the lenient layout when a non-empty header failed to parse.
227 if err != nil && hdr != "" {
228 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
229 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
230 // as required by HTTP spec. If it's not a valid HTTP
231 // header value, it's probably AWS (or s3test) giving
232 // us a nearly-RFC1123 timestamp.
233 t, err = time.Parse(nearlyRFC1123, hdr)
// IndexTo writes an index of blocks whose keys start with prefix, one
// "<hash>+<size> <unix-mtime>" line per block, paging through the
// bucket listing v.indexPageSize keys at a time.
238 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
241 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
245 for _, key := range listResp.Contents {
246 t, err := time.Parse(time.RFC3339, key.LastModified)
// Skip keys that don't look like Keep block hashes (32 hex digits).
250 if !v.isKeepBlock(key.Key) {
253 fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
// Keep paging until the listing is no longer truncated.
255 if !listResp.IsTruncated {
258 nextMarker = listResp.NextMarker
263 // Trash a Keep block.
// Deletion is immediate (no trash/untrash cycle) and is gated behind
// several safety checks; interior lines are missing from this view.
264 func (v *S3Volume) Trash(loc string) error {
266 return MethodDisabledError
// Deferred trash (non-zero trashLifetime) is not implemented for S3.
268 if trashLifetime != 0 {
269 return ErrNotImplemented
// Refuse to delete blocks modified within blobSignatureTTL — clients
// may still hold valid signed locators for them.
271 if t, err := v.Mtime(loc); err != nil {
273 } else if time.Since(t) < blobSignatureTTL {
// Without -s3-unsafe-delete, deletion is disabled entirely.
277 return ErrS3DeleteNotAvailable
279 return v.Bucket.Del(loc)
// Untrash is not supported on S3 volumes: Trash deletes immediately
// (see Bucket.Del above), so there is nothing to restore.
283 func (v *S3Volume) Untrash(loc string) error {
284 return ErrNotImplemented
// Status returns a VolumeStatus with a placeholder free-space figure
// (BlockSize * 1000) — presumably because S3 reports no real capacity
// number; TODO confirm against the VolumeStatus consumer.
287 func (v *S3Volume) Status() *VolumeStatus {
288 return &VolumeStatus{
290 BytesFree: BlockSize * 1000,
// String returns a human-readable identifier for the volume,
// e.g. s3-bucket:"my-bucket", for use in logs and status output.
295 func (v *S3Volume) String() string {
296 return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
299 func (v *S3Volume) Writable() bool {
302 func (v *S3Volume) Replication() int {
306 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
// isKeepBlock reports whether s looks like a Keep block hash
// (32 lowercase hex digits); used by IndexTo to filter foreign keys.
308 func (v *S3Volume) isKeepBlock(s string) bool {
309 return s3KeepBlockRegexp.MatchString(s)
// translateError converts S3 client errors into errors the generic
// volume code understands: a 404 with code "NoSuchKey" becomes
// os.ErrNotExist; everything else passes through unchanged.
312 func (v *S3Volume) translateError(err error) error {
// Type-switch on the concrete goamz error type (case arms not fully
// visible in this fragment).
313 switch err := err.(type) {
315 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
316 return os.ErrNotExist
318 // Other 404 errors like NoSuchVersion and
319 // NoSuchBucket are different problems which should
320 // get called out downstream, so we don't convert them
321 // to os.ErrNotExist.
326 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
327 // and deletes them from the volume.
// (Body not visible in this fragment; note Trash above returns
// ErrNotImplemented for non-zero trashLifetime.)
329 func (v *S3Volume) EmptyTrash() {