14 "github.com/Azure/azure-sdk-for-go/storage"
// Package-level settings for Azure volumes. The flag registrations in this
// file bind them to -azure-storage-account-name,
// -azure-storage-account-key-file and -azure-storage-replication, and
// azureVolumeAdder.Set reads them each time a container volume is added.
18 azureStorageAccountName string
19 azureStorageAccountKeyFile string
20 azureStorageReplication int
// readKeyFromFile reads the named file and returns its contents, converted
// to a string with leading and trailing whitespace removed, as the account
// key. Both error returns name the file in their message: one wraps the
// read error, the other reports an empty account key.
// NOTE(review): the "empty account key" error is presumably returned when
// the trimmed key is "" -- confirm against the guard condition.
23 func readKeyFromFile(file string) (string, error) {
24 buf, err := ioutil.ReadFile(file)
26 return "", errors.New("reading key from " + file + ": " + err.Error())
// Strip surrounding whitespace (such as a trailing newline) from the key.
28 accountKey := strings.TrimSpace(string(buf))
30 return "", errors.New("empty account key in " + file)
32 return accountKey, nil
// azureVolumeAdder is the flag value type registered for
// -azure-storage-container-volume. Its Set method appends a new
// AzureBlobVolume to the volume set it points at.
35 type azureVolumeAdder struct {
// Set handles one -azure-storage-container-volume argument. It builds an
// AzureBlobVolume for containerName from the current values of
// azureStorageAccountName, azureStorageAccountKeyFile, flagReadonly and
// azureStorageReplication, runs Check on it, and appends it to the adder's
// volume set. An empty containerName is rejected with an error.
39 func (s *azureVolumeAdder) Set(containerName string) error {
40 if containerName == "" {
41 return errors.New("no container name given")
// The key file is read on every call, so the key file and account name in
// effect are whatever the settings hold at the time this argument is parsed.
43 accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
47 azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
49 return errors.New("creating Azure storage client: " + err.Error())
// NOTE(review): presumably this notice is only logged when -serialize was
// requested -- confirm against the enclosing condition.
52 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
54 v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
// Check runs before the volume is appended to the set.
55 if err := v.Check(); err != nil {
58 *s.volumeSet = append(*s.volumeSet, v)
// Register the Azure command-line flags. Per the help texts, the
// container-volume flag may be given multiple times, and the account name
// and key file apply to subsequent container-volume arguments.
63 flag.Var(&azureVolumeAdder{&volumes},
64 "azure-storage-container-volume",
65 "Use the given container as a storage volume. Can be given multiple times.")
67 &azureStorageAccountName,
68 "azure-storage-account-name",
70 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
72 &azureStorageAccountKeyFile,
73 "azure-storage-account-key-file",
75 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
77 &azureStorageReplication,
78 "azure-storage-replication",
80 "Replication level to report to clients when data is stored in an Azure container.")
83 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
85 type AzureBlobVolume struct {
// azClient is the storage account client.
// NOTE(review): presumably the client passed to NewAzureBlobVolume -- confirm.
86 azClient storage.Client
// bsClient is the blob service client; NewAzureBlobVolume sets it from
// client.GetBlobService(). All blob and container operations go through it.
87 bsClient storage.BlobStorageClient
// NewAzureBlobVolume returns a new AzureBlobVolume for the named container.
// The volume uses the blob service obtained from client and stores the
// given container name and replication level.
// NOTE(review): readonly is presumably stored on the volume as well -- confirm.
93 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
94 return &AzureBlobVolume{
96 bsClient: client.GetBlobService(),
97 containerName: containerName,
99 replication: replication,
103 // Check returns nil if the volume is usable.
// It asks the blob service whether the volume's container exists; the
// failure case visible here is the error "container does not exist".
104 func (v *AzureBlobVolume) Check() error {
105 ok, err := v.bsClient.ContainerExists(v.containerName)
110 return errors.New("container does not exist")
// Get fetches the blob named loc from the volume's container and reads its
// data into a buffer. A fetch error whose text contains "404 Not Found" is
// reported as os.ErrNotExist.
115 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
116 rdr, err := v.bsClient.GetBlob(v.containerName, loc)
// A missing blob is detected by matching the error text, not the error type.
118 if strings.Contains(err.Error(), "404 Not Found") {
119 // "storage: service returned without a response body (404 Not Found)"
120 return nil, os.ErrNotExist
124 switch err := err.(type) {
127 log.Printf("ERROR IN Get(): %T %#v", err, err)
// Read the blob into a BlockSize buffer obtained from bufs.
131 buf := bufs.Get(BlockSize)
132 n, err := io.ReadFull(rdr, buf)
// NOTE(review): a blob shorter than BlockSize makes ReadFull return
// io.EOF or io.ErrUnexpectedEOF; presumably this case treats that as a
// normal end of data -- confirm against the case body.
134 case io.EOF, io.ErrUnexpectedEOF:
// Compare fetches the blob named loc and passes its reader, the expected
// data and the first 32 bytes of loc to compareReaderWithBuf, returning
// that function's result.
142 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
143 rdr, err := v.bsClient.GetBlob(v.containerName, loc)
// loc[:32] requires len(loc) >= 32.
// NOTE(review): presumably the 32-character hash part of the locator -- confirm.
148 return compareReaderWithBuf(rdr, expect, loc[:32])
// Put stores block as the blob named loc in three steps: create a block
// blob, upload the data as a single block, then submit a block list that
// contains only that block.
// NOTE(review): MethodDisabledError is presumably returned when the volume
// is read-only -- confirm against the guard condition.
151 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
153 return MethodDisabledError
155 if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil {
158 // We use the same block ID, base64("0")=="MA==", for everything.
159 if err := v.bsClient.PutBlock(v.containerName, loc, "MA==", block); err != nil {
// The list names the block as uncommitted: it is the one just uploaded by
// PutBlock above.
162 return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusUncommitted}})
// Touch checks that the blob named loc exists and then re-submits its block
// list, naming the block "MA==" as committed. The os.ErrNotExist return
// sits in the existence check.
// NOTE(review): re-submitting the list is presumably done to refresh the
// blob's last-modified time, which Mtime reads -- confirm. MethodDisabledError
// is presumably returned when the volume is read-only -- confirm.
165 func (v *AzureBlobVolume) Touch(loc string) error {
167 return MethodDisabledError
169 if exists, err := v.bsClient.BlobExists(v.containerName, loc); err != nil {
172 return os.ErrNotExist
174 return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusCommitted}})
// Mtime returns the LastModified property of the blob named loc, parsed as
// a time.RFC1123 timestamp. The error path visible here returns the zero
// time.Time together with the error.
177 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
178 props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
180 return time.Time{}, err
182 return time.Parse(time.RFC1123, props.LastModified)
// IndexTo writes one line per listed blob to writer in the form
// "<name>+<content length> <mtime>", where mtime is the blob's LastModified
// time (parsed as time.RFC1123) in Unix seconds. Listing is paginated
// through the NextMarker value of each response.
// NOTE(review): prefix is presumably passed as the listing prefix in
// params -- confirm.
185 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
186 params := storage.ListBlobsParameters{
190 resp, err := v.bsClient.ListBlobs(v.containerName, params)
194 for _, b := range resp.Blobs {
195 t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
// The return value of Fprintf is not captured here.
199 fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
// An empty NextMarker means there are no further pages.
201 if resp.NextMarker == "" {
// Otherwise continue the listing from where this page ended.
204 params.Marker = resp.NextMarker
// Delete removes the blob named loc with DeleteBlob. Before deleting it
// looks up the blob's Mtime and compares the blob's age with
// blobSignatureTTL.
// NOTE(review): presumably a blob younger than blobSignatureTTL is left in
// place, and MethodDisabledError is returned when the volume is read-only --
// confirm against the branch bodies.
208 func (v *AzureBlobVolume) Delete(loc string) error {
209 // TODO: Use leases to handle races with Touch and Put.
211 return MethodDisabledError
213 if t, err := v.Mtime(loc); err != nil {
215 } else if time.Since(t) < blobSignatureTTL {
218 return v.bsClient.DeleteBlob(v.containerName, loc)
// Status returns a VolumeStatus for this volume. BytesFree is the fixed
// value BlockSize * 1000, not a measurement taken from the storage service.
221 func (v *AzureBlobVolume) Status() *VolumeStatus {
222 return &VolumeStatus{
224 BytesFree: BlockSize * 1000,
// String returns a label of the form azure-storage-container:"name". The
// container name is formatted with %+q, so it is double-quoted and any
// non-ASCII characters are escaped.
229 func (v *AzureBlobVolume) String() string {
230 return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
// Writable returns a bool describing the volume.
// NOTE(review): presumably true when the volume is not read-only -- confirm
// against the method body.
233 func (v *AzureBlobVolume) Writable() bool {
237 func (v *AzureBlobVolume) Replication() int {