15 "github.com/AdRoll/goamz/aws"
16 "github.com/AdRoll/goamz/s3"
// Package-level declarations (fragment: the enclosing var/const blocks
// and several sibling declarations are elided in this excerpt).
// ErrS3DeleteNotAvailable is the sentinel returned by Trash when the
// operator has not opted in with -s3-unsafe-delete.
// NOTE(review): no format verbs here — errors.New would be the
// conventional constructor instead of fmt.Errorf.
20 ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
// Paths of files holding the AWS credentials; populated by the
// -s3-access-key-file / -s3-secret-key-file flags registered below.
22 s3AccessKeyFile string
23 s3SecretKeyFile string
// maxClockSkew bounds how stale a PutCopy LastModified may be before
// Touch treats it as an error (see Touch below).
33 maxClockSkew = 600 * time.Second
// nearlyRFC1123 matches AWS/s3test timestamps that use a non-padded
// day-of-month ("2" not "02"), which time.RFC1123 rejects; used as a
// fallback layout in Mtime.
34 nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
37 type s3VolumeAdder struct {
// Set (flag.Value) validates the accumulated -s3-* flags, builds auth
// credentials from the key files, constructs an S3Volume for bucketName,
// sanity-checks it, and appends it to the volume set. Several validation
// and error-return lines are elided in this excerpt.
41 func (s *s3VolumeAdder) Set(bucketName string) error {
// Reject an empty bucket name (guard condition on elided line 42).
43 return fmt.Errorf("no container name given")
// Both credential files must be supplied before any bucket flag.
45 if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
// FIXME(review): typo in user-facing message — "must given" should read
// "must be given" (string is runtime behavior; not changed in this pass).
46 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
// Resolve the named AWS region; a custom -s3-endpoint bypasses this table.
48 region, ok := aws.Regions[s3RegionName]
51 return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
// Giving both a known region name and an explicit endpoint is ambiguous,
// so it is refused outright.
55 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
56 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
// Custom-endpoint case: a synthetic aws.Region is presumably built on the
// elided lines around here, with only the endpoint filled in.
60 S3Endpoint: s3Endpoint,
// Load credentials from the files named on the command line; error
// handling for each read is on elided lines.
65 auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
69 auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
// -serialize has no effect for this backend; warn rather than fail.
74 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
76 v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
// Fail fast if the bucket is unusable before registering the volume.
77 if err := v.Check(); err != nil {
80 *s.volumeSet = append(*s.volumeSet, v)
// s3regions returns the recognized AWS region names, used to build the
// -s3-region flag's help text. Order is map-iteration order (random);
// acceptable for usage output. (Return statement elided in this excerpt.)
84 func s3regions() (okList []string) {
// NOTE(review): `for r, _ := range` — the blank value is redundant;
// idiomatic Go is `for r := range aws.Regions`.
85 for r, _ := range aws.Regions {
86 okList = append(okList, r)
// Command-line flag registration (fragment: the enclosing func — most
// likely init() — and the flag.StringVar/IntVar/BoolVar call headers are
// elided; only selected argument lines are visible).
// -s3-bucket-volume is repeatable: each occurrence routes through
// s3VolumeAdder.Set above.
92 flag.Var(&s3VolumeAdder{&volumes},
94 "Use the given bucket as a storage volume. Can be given multiple times.")
// -s3-region help text enumerates the valid names via s3regions().
99 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
104 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
107 "s3-access-key-file",
109 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
112 "s3-secret-key-file",
114 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
119 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
// The unsafe-delete flag gates Trash (see ErrS3DeleteNotAvailable).
124 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
127 type S3Volume struct {
134 // NewS3Volume returns a new S3Volume using the given auth, region,
135 // and bucket name. The replication argument specifies the replication
136 // level to report when writing data.
137 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
// Construct the goamz client; remaining struct fields (bucket name,
// readonly, indexPageSize, …) are initialized on elided lines.
140 S3: s3.New(auth, region),
144 replication: replication,
149 func (v *S3Volume) Check() error {
// Get fetches the object named loc and returns its contents in a buffer
// taken from the bufs pool. (Several body lines, including the switch
// header over the read error, are elided in this excerpt.)
153 func (v *S3Volume) Get(loc string) ([]byte, error) {
154 rdr, err := v.Bucket.GetReader(loc)
// Normalize S3 errors (404/NoSuchKey -> os.ErrNotExist).
156 return nil, v.translateError(err)
// NOTE(review): rdr must be closed after use — the Close call is
// presumably on one of the elided lines; confirm there is no leak.
159 buf := bufs.Get(BlockSize)
160 n, err := io.ReadFull(rdr, buf)
// EOF / short read are expected when the stored block is smaller than
// BlockSize, so they count as success here.
162 case nil, io.EOF, io.ErrUnexpectedEOF:
// Any other read error: presumably the buffer is returned to the pool on
// an elided line before reporting the translated error.
166 return nil, v.translateError(err)
// Compare streams the stored object and checks it against expect without
// buffering the whole block. loc[:32] is the md5 portion of the locator,
// passed so the comparison helper can hash-verify. (Some body lines,
// likely including a Close, are elided.)
170 func (v *S3Volume) Compare(loc string, expect []byte) error {
171 rdr, err := v.Bucket.GetReader(loc)
173 return v.translateError(err)
176 return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
// Put uploads block under the key loc, sending a Content-MD5 header
// derived from the locator's hash so S3 rejects corrupted uploads.
// (Guard headers and error checks partially elided in this excerpt.)
179 func (v *S3Volume) Put(loc string, block []byte) error {
// Read-only volumes refuse writes (condition on elided line 180).
181 return MethodDisabledError
// loc's leading 32 hex chars are the block's md5; decode to raw bytes…
185 md5, err := hex.DecodeString(loc)
// …and base64-encode for the Content-MD5 request header per the S3 API.
189 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
191 return v.translateError(
193 loc, block, "application/octet-stream", s3ACL, opts))
// Touch updates the object's modification time by copying it onto itself
// (PutCopy with MetadataDirective=REPLACE), since S3 has no native
// "touch". It then checks the resulting LastModified is recent enough.
196 func (v *S3Volume) Touch(loc string) error {
// Read-only volumes refuse Touch (condition on elided line 197).
198 return MethodDisabledError
200 result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
201 ContentType: "application/octet-stream",
// REPLACE forces S3 to write fresh metadata, which updates Last-Modified.
202 MetadataDirective: "REPLACE",
203 }, v.Bucket.Name+"/"+loc)
205 return v.translateError(err)
// goamz returns LastModified as a string; parse as RFC3339.
207 t, err := time.Parse(time.RFC3339, result.LastModified)
// If the copy's timestamp is older than maxClockSkew, the touch did not
// take effect (or clocks disagree badly) — treat as failure.
// NOTE(review): time.Since(t) is evaluated twice here; the two values can
// differ slightly. Hoisting it into a local would be cleaner.
211 if time.Since(t) > maxClockSkew {
212 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
// Mtime returns the object's Last-Modified time via a HEAD request,
// tolerating AWS's almost-but-not-quite RFC1123 date format.
217 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
218 resp, err := v.Bucket.Head(loc, nil)
220 return zeroTime, v.translateError(err)
222 hdr := resp.Header.Get("Last-Modified")
// First try the strict HTTP-date layout…
223 t, err := time.Parse(time.RFC1123, hdr)
224 if err != nil && hdr != "" {
225 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
226 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
227 // as required by HTTP spec. If it's not a valid HTTP
228 // header value, it's probably AWS (or s3test) giving
229 // us a nearly-RFC1123 timestamp.
230 t, err = time.Parse(nearlyRFC1123, hdr)
// IndexTo writes one "<locator>+<size> <unix-mtime>\n" line per keep
// block whose key matches prefix, paging through bucket listings
// indexPageSize at a time. (Loop header and some error handling elided.)
235 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
238 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
242 for _, key := range listResp.Contents {
// Listing timestamps are RFC3339 (unlike the Last-Modified header
// handled in Mtime).
243 t, err := time.Parse(time.RFC3339, key.LastModified)
// Skip keys that aren't 32-hex-digit block locators (continue is
// presumably on an elided line).
247 if !v.isKeepBlock(key.Key) {
250 fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
// Stop when S3 reports the listing is complete; otherwise continue from
// the returned marker.
252 if !listResp.IsTruncated {
255 nextMarker = listResp.NextMarker
// Trash deletes loc, but only when the volume is writable, the block is
// older than blobSignatureTTL (so no client holds a still-valid read
// token for it), and the operator enabled -s3-unsafe-delete. S3 offers
// no atomic check-and-delete, hence the known race acknowledged by the
// flag's help text. (Guard headers partially elided in this excerpt.)
260 func (v *S3Volume) Trash(loc string) error {
// Read-only volumes refuse deletion (condition on elided line 261).
262 return MethodDisabledError
264 if t, err := v.Mtime(loc); err != nil {
// Too new to delete safely — a reader may still reach it.
266 } else if time.Since(t) < blobSignatureTTL {
// Without -s3-unsafe-delete, deletion is not implemented at all.
270 return ErrS3DeleteNotAvailable
272 return v.Bucket.Del(loc)
276 func (v *S3Volume) Untrash(loc string) error {
// Status reports volume health/capacity. BytesFree is a fixed synthetic
// figure — S3 has no real capacity limit to report.
280 func (v *S3Volume) Status() *VolumeStatus {
281 return &VolumeStatus{
283 BytesFree: BlockSize * 1000,
// String identifies the volume in logs as s3-bucket:"<name>".
288 func (v *S3Volume) String() string {
289 return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
292 func (v *S3Volume) Writable() bool {
295 func (v *S3Volume) Replication() int {
299 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
// isKeepBlock reports whether s looks like a keep block hash, so IndexTo
// can skip unrelated keys sharing the bucket.
301 func (v *S3Volume) isKeepBlock(s string) bool {
302 return s3KeepBlockRegexp.MatchString(s)
// translateError maps S3 errors onto errors the generic volume layer
// understands. Only 404/NoSuchKey becomes os.ErrNotExist; everything
// else passes through. (The switch's case line and the function's tail
// are elided / past the end of this excerpt — presumably a *s3.Error
// type case given the StatusCode/Code field access.)
305 func (v *S3Volume) translateError(err error) error {
306 switch err := err.(type) {
308 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
309 return os.ErrNotExist
311 // Other 404 errors like NoSuchVersion and
312 // NoSuchBucket are different problems which should
313 // get called out downstream, so we don't convert them
314 // to os.ErrNotExist.