15 "github.com/AdRoll/goamz/aws"
16 "github.com/AdRoll/goamz/s3"
20 ErrS3DeleteNotAvailable = fmt.Errorf("delete without -s3-unsafe-delete is not implemented")
22 s3AccessKeyFile string
23 s3SecretKeyFile string
33 maxClockSkew = 600 * time.Second
34 nearlyRFC1123 = "Mon, 2 Jan 2006 15:04:05 GMT"
37 type s3VolumeAdder struct {
41 func (s *s3VolumeAdder) Set(bucketName string) error {
42 if trashLifetime <= 0 {
43 log.Print("Missing required configuration parameter: trash-lifetime")
44 return ErrNotImplemented
47 return fmt.Errorf("no container name given")
49 if s3AccessKeyFile == "" || s3SecretKeyFile == "" {
50 return fmt.Errorf("-s3-access-key-file and -s3-secret-key-file arguments must given before -s3-bucket-volume")
52 region, ok := aws.Regions[s3RegionName]
55 return fmt.Errorf("unrecognized region %+q; try specifying -s3-endpoint instead", s3RegionName)
59 return fmt.Errorf("refusing to use AWS region name %+q with endpoint %+q; "+
60 "specify empty endpoint (\"-s3-endpoint=\") or use a different region name", s3RegionName, s3Endpoint)
64 S3Endpoint: s3Endpoint,
69 auth.AccessKey, err = readKeyFromFile(s3AccessKeyFile)
73 auth.SecretKey, err = readKeyFromFile(s3SecretKeyFile)
78 log.Print("Notice: -serialize is not supported by s3-bucket volumes.")
80 v := NewS3Volume(auth, region, bucketName, flagReadonly, s3Replication)
81 if err := v.Check(); err != nil {
84 *s.volumeSet = append(*s.volumeSet, v)
88 func s3regions() (okList []string) {
89 for r, _ := range aws.Regions {
90 okList = append(okList, r)
96 flag.Var(&s3VolumeAdder{&volumes},
98 "Use the given bucket as a storage volume. Can be given multiple times.")
103 fmt.Sprintf("AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are %+q.", s3regions()))
108 "Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use \"https://storage.googleapis.com\".")
111 "s3-access-key-file",
113 "File containing the access key used for subsequent -s3-bucket-volume arguments.")
116 "s3-secret-key-file",
118 "File containing the secret key used for subsequent -s3-bucket-volume arguments.")
123 "Replication level reported to clients for subsequent -s3-bucket-volume arguments.")
128 "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
131 type S3Volume struct {
138 // NewS3Volume returns a new S3Volume using the given auth, region,
139 // and bucket name. The replication argument specifies the replication
140 // level to report when writing data.
141 func NewS3Volume(auth aws.Auth, region aws.Region, bucket string, readonly bool, replication int) *S3Volume {
144 S3: s3.New(auth, region),
148 replication: replication,
153 func (v *S3Volume) Check() error {
157 func (v *S3Volume) Get(loc string) ([]byte, error) {
158 rdr, err := v.Bucket.GetReader(loc)
160 return nil, v.translateError(err)
163 buf := bufs.Get(BlockSize)
164 n, err := io.ReadFull(rdr, buf)
166 case nil, io.EOF, io.ErrUnexpectedEOF:
170 return nil, v.translateError(err)
174 func (v *S3Volume) Compare(loc string, expect []byte) error {
175 rdr, err := v.Bucket.GetReader(loc)
177 return v.translateError(err)
180 return v.translateError(compareReaderWithBuf(rdr, expect, loc[:32]))
183 func (v *S3Volume) Put(loc string, block []byte) error {
185 return MethodDisabledError
189 md5, err := hex.DecodeString(loc)
193 opts.ContentMD5 = base64.StdEncoding.EncodeToString(md5)
195 return v.translateError(
197 loc, block, "application/octet-stream", s3ACL, opts))
200 func (v *S3Volume) Touch(loc string) error {
202 return MethodDisabledError
204 result, err := v.Bucket.PutCopy(loc, s3ACL, s3.CopyOptions{
205 ContentType: "application/octet-stream",
206 MetadataDirective: "REPLACE",
207 }, v.Bucket.Name+"/"+loc)
209 return v.translateError(err)
211 t, err := time.Parse(time.RFC3339, result.LastModified)
215 if time.Since(t) > maxClockSkew {
216 return fmt.Errorf("PutCopy returned old LastModified %s => %s (%s ago)", result.LastModified, t, time.Since(t))
221 func (v *S3Volume) Mtime(loc string) (time.Time, error) {
222 resp, err := v.Bucket.Head(loc, nil)
224 return zeroTime, v.translateError(err)
226 hdr := resp.Header.Get("Last-Modified")
227 t, err := time.Parse(time.RFC1123, hdr)
228 if err != nil && hdr != "" {
229 // AWS example is "Sun, 1 Jan 2006 12:00:00 GMT",
230 // which isn't quite "Sun, 01 Jan 2006 12:00:00 GMT"
231 // as required by HTTP spec. If it's not a valid HTTP
232 // header value, it's probably AWS (or s3test) giving
233 // us a nearly-RFC1123 timestamp.
234 t, err = time.Parse(nearlyRFC1123, hdr)
239 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
242 listResp, err := v.Bucket.List(prefix, "", nextMarker, v.indexPageSize)
246 for _, key := range listResp.Contents {
247 t, err := time.Parse(time.RFC3339, key.LastModified)
251 if !v.isKeepBlock(key.Key) {
254 fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
256 if !listResp.IsTruncated {
259 nextMarker = listResp.NextMarker
264 func (v *S3Volume) Trash(loc string) error {
266 return MethodDisabledError
268 if t, err := v.Mtime(loc); err != nil {
270 } else if time.Since(t) < blobSignatureTTL {
274 return ErrS3DeleteNotAvailable
276 return v.Bucket.Del(loc)
280 func (v *S3Volume) Untrash(loc string) error {
284 func (v *S3Volume) Status() *VolumeStatus {
285 return &VolumeStatus{
287 BytesFree: BlockSize * 1000,
292 func (v *S3Volume) String() string {
293 return fmt.Sprintf("s3-bucket:%+q", v.Bucket.Name)
296 func (v *S3Volume) Writable() bool {
299 func (v *S3Volume) Replication() int {
303 var s3KeepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
305 func (v *S3Volume) isKeepBlock(s string) bool {
306 return s3KeepBlockRegexp.MatchString(s)
309 func (v *S3Volume) translateError(err error) error {
310 switch err := err.(type) {
312 if err.StatusCode == http.StatusNotFound && err.Code == "NoSuchKey" {
313 return os.ErrNotExist
315 // Other 404 errors like NoSuchVersion and
316 // NoSuchBucket are different problems which should
317 // get called out downstream, so we don't convert them
318 // to os.ErrNotExist.