16 "github.com/AdRoll/goamz/aws"
17 "github.com/AdRoll/goamz/s3"
// Package-level configuration for the s3 volume driver.
// NOTE(review): fragmentary listing — the enclosing var/const block
// delimiters fall on lines not shown here.

// Returned by Trash when deletion is requested without -s3-unsafe-delete.
21 ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
// Paths of credential files given on the command line; read via
// readKeyFromFile when a -s3-bucket-volume flag is processed (see Set).
23 s3AccessKeyFile string
24 s3SecretKeyFile string
// Maximum tolerated difference between our clock and the LastModified
// time reported by the server after a PutCopy (see Touch).
34 maxClockSkew = 600 * time.Second
// AWS-style Last-Modified layout that is almost, but not exactly,
// RFC1123 (day-of-month has no leading zero); see Mtime.
35 nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
38 type s3VolumeAdder struct {
// Set is invoked by the flag package once per -s3-bucket-volume
// argument: it validates the s3-* flags seen so far, loads the
// credentials, builds an S3Volume, checks it, and appends it to
// s.volumeSet. NOTE(review): fragmentary listing — lines between the
// numbered statements below are omitted.
42 func (s *s3VolumeAdder) Set(bucketName string) error {
// Trash with a nonzero lifetime (trash/recover cycle) is not
// implemented for s3 volumes (see Trash/Untrash below).
43 if trashLifetime != 0 {
44 return ErrNotImplemented
// Empty bucket name is rejected.
47 return fmt.Errorf("no container name given")
// Credential files must precede the bucket flag on the command line,
// because flag.Var processes arguments strictly in order.
49 if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
// NOTE(review): message reads "must given" — probably meant
// "must be given"; left unchanged here since it is runtime output.
50 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
// Resolve the configured region name against goamz's region table.
52 region, ok := aws.Regions[s3RegionName]
55 return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
// Supplying both a known region name and a custom endpoint is
// ambiguous, so refuse the combination.
59 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
60 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
64 S3Endpoint: s3Endpoint,
// Load access/secret keys from their respective files.
69 auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
73 auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
// -serialize has no effect for this volume type; warn, don't fail.
78 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
// Construct the volume, fail fast if the bucket is unusable, then
// register it.
80 v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
81 if err := v.Check(); err != nil {
84 *s.volumeSet = append(*s.volumeSet, v)
88 func s3regions() (okList []string) {
89 for r, _ := range aws.Regions {
90 okList = append(okList, r)
// Command-line flag registration for the s3 driver. Order on the
// command line matters: region/endpoint/credential/replication flags
// configure the -s3-bucket-volume flags that FOLLOW them (enforced in
// s3VolumeAdder.Set). NOTE(review): fragmentary listing — the
// flag.String calls' target variables and flag names are partly on
// omitted lines.
96 flag.Var(&s3VolumeAdder{&volumes},
98 "Use the given bucket as a storage volume. Can be given multiple times.")
// -s3-region: must be a key of aws.Regions (listed via s3regions).
103 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
// -s3-endpoint: alternative to -s3-region for non-AWS S3 services.
108 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
111 "s3-access-key-file",
113 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
116 "s3-secret-key-file",
118 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
// Reported to clients via Replication(); stored in NewS3Volume.
123 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
// Gates actual deletion in Trash; see ErrS3DeleteNotAvailable.
128 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
131 type S3Volume struct {
138 // NewS3Volume returns a new S3Volume using the given auth, region,
139 // and bucket name. The replication argument specifies the replication
140 // level to report when writing data.
141 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
// NOTE(review): fragmentary listing — other struct-literal fields
// (e.g. bucket name, readonly) fall on omitted lines.
144 S3: s3.New(auth, region),
148 replication: replication,
153 func (v *S3Volume) Check() error {
// Get reads the block named loc into buf, returning the byte count.
// Bucket errors are normalized via translateError (404/NoSuchKey
// becomes os.ErrNotExist).
157 func (v *S3Volume) Get(loc string, buf []byte) (int, error) {
158 rdr, err := v.Bucket.GetReader(loc)
160 return 0, v.translateError(err)
// ReadFull fills buf; EOF / ErrUnexpectedEOF merely mean the object
// is shorter than buf, which is expected for short blocks, so those
// are treated as success below.
163 n, err := io.ReadFull(rdr, buf)
165 case nil, io.EOF, io.ErrUnexpectedEOF:
// NOTE(review): no rdr.Close() is visible in this listing — confirm
// the reader is closed on one of the omitted lines.
168 return 0, v.translateError(err)
// Compare streams the stored object loc and checks it against
// expect via compareReaderWithBuf; loc[:32] is the hash portion of
// the locator (cf. s3KeepBlockRegexp, 32 hex digits).
172 func (v *S3Volume) Compare(loc string, expect []byte) error {
173 rdr, err := v.Bucket.GetReader(loc)
175 return v.translateError(err)
// Errors from the comparison (including mismatch) are normalized the
// same way as bucket errors.
178 return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
// Put stores block under key loc. The locator is hex-decoded and
// sent as Content-MD5 so the server verifies the payload end-to-end
// (S3 requires the header as base64 of the raw digest bytes).
181 func (v *S3Volume) Put(loc string, block []byte) error {
// Writes are rejected on read-only volumes (guard on omitted line).
183 return MethodDisabledError
187 md5, err := hex.DecodeString(loc)
191 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
// NOTE(review): the Put call itself starts on an omitted line; the
// visible arguments are key, payload, content type, ACL, options.
193 return v.translateError(
195 loc, block, "application/octet-stream", s3ACL, opts))
// Touch updates loc's modification time by server-side copying the
// object onto itself with MetadataDirective REPLACE (no data
// transfer), then sanity-checks the LastModified the server reports.
198 func (v *S3Volume) Touch(loc string) error {
// Read-only volumes cannot be touched.
200 return MethodDisabledError
202 result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
203 ContentType: "application/octet-stream",
204 MetadataDirective: "REPLACE",
205 }, v.Bucket.Name+"/"+loc)
207 return v.translateError(err)
// PutCopy reports LastModified in RFC3339 form.
209 t, err := time.Parse(time.RFC3339, result.LastModified)
// If the reported timestamp is more than maxClockSkew in the past,
// the copy evidently did not refresh the mtime — treat as failure.
213 if time.Since(t) > maxClockSkew {
214 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
// Mtime returns loc's modification time, taken from the
// Last-Modified header of a HEAD request. Tries strict RFC1123
// first, then the lenient AWS variant (nearlyRFC1123).
219 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
220 resp, err := v.Bucket.Head(loc, nil)
222 return zeroTime, v.translateError(err)
224 hdr := resp.Header.Get("Last-Modified")
225 t, err := time.Parse(time.RFC1123, hdr)
226 if err != nil && hdr != "" {
227 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
228 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
229 // as required by HTTP spec. If it's not a valid HTTP
230 // header value, it's probably AWS (or s3test) giving
231 // us a nearly-RFC1123 timestamp.
232 t, err = time.Parse(nearlyRFC1123, hdr)
// IndexTo writes one "<key>+<size> <unix-mtime>" line to writer for
// each Keep block in the bucket whose key starts with prefix, paging
// through the listing with nextMarker until IsTruncated is false.
237 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// One page of results per iteration (page size v.indexPageSize).
240 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
244 for _, key := range listResp.Contents {
245 t, err := time.Parse(time.RFC3339, key.LastModified)
// Skip keys that don't look like block hashes (see isKeepBlock).
249 if !v.isKeepBlock(key.Key) {
252 fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
// Stop when the server reports the listing is complete; otherwise
// continue from the marker it returned.
254 if !listResp.IsTruncated {
257 nextMarker = listResp.NextMarker
262 // Trash a Keep block.
// With trashLifetime == 0 this deletes immediately — and only when
// -s3-unsafe-delete is set; otherwise ErrS3DeleteNotAvailable.
263 func (v *S3Volume) Trash(loc string) error {
// Read-only volumes cannot delete.
265 return MethodDisabledError
// A trash/recover cycle (nonzero lifetime) is not implemented here.
267 if trashLifetime != 0 {
268 return ErrNotImplemented
// Refuse to delete blocks younger than blobSignatureTTL: a client
// holding a still-valid signature may be relying on them.
270 if t, err := v.Mtime(loc); err != nil {
272 } else if time.Since(t) < blobSignatureTTL {
// -s3-unsafe-delete gate is checked on an omitted line; without it,
// deletion is unavailable entirely.
276 return ErrS3DeleteNotAvailable
278 return v.Bucket.Del(loc)
// Untrash is unsupported: since Trash never moves blocks to a trash
// area (trashLifetime != 0 is rejected), there is nothing to recover.
282 func (v *S3Volume) Untrash(loc string) error {
283 return ErrNotImplemented
// Status returns a VolumeStatus for this volume. BytesFree is a
// synthetic constant (1000 blocks' worth) — presumably because an S3
// bucket has no meaningful capacity limit; TODO(review): confirm.
286 func (v *S3Volume) Status() *VolumeStatus {
287 return &VolumeStatus{
289 BytesFree: BlockSize * 1000,
// String identifies the volume in logs as s3-bucket:"name"
// (%+q quotes and escapes the bucket name).
294 func (v *S3Volume) String() string {
295 return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
// Writable reports whether the volume accepts writes.
// NOTE(review): body falls on lines not shown in this listing.
298 func (v *S3Volume) Writable() bool {
// Replication returns the replication level configured at
// construction time (see NewS3Volume and the -s3-replication flag).
// NOTE(review): body falls on lines not shown in this listing.
301 func (v *S3Volume) Replication() int {
// s3KeepBlockRegexp matches a Keep block locator hash: exactly 32
// lowercase hex digits. Compiled once at package scope.
305 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
// isKeepBlock reports whether s looks like a block hash; IndexTo
// uses it to skip unrelated keys in the bucket.
307 func (v *S3Volume) isKeepBlock(s string) bool {
308 return s3KeepBlockRegexp.MatchString(s)
// translateError normalizes errors from the S3 layer for callers:
// a 404 NoSuchKey (or a generic "Not Found" message) becomes
// os.ErrNotExist so callers can test with os.IsNotExist.
311 func (v *S3Volume) translateError(err error) error {
// NOTE(review): the case's concrete type (likely *s3.Error, which
// carries StatusCode/Code) is on an omitted line — confirm.
312 switch err := err.(type) {
314 if (err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey") ||
315 strings.Contains(err.Error(), "Not Found") {
316 return os.ErrNotExist
318 // Other 404 errors like NoSuchVersion and
319 // NoSuchBucket are different problems which should
320 // get called out downstream, so we don't convert them
321 // to os.ErrNotExist.
326 // EmptyTrash looks for trashed blocks that exceeded trashLifetime
327 // and deletes them from the volume.
329 func (v *S3Volume) EmptyTrash() {