7241: Add -azure-storage-replication flag.
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "errors"
5         "flag"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "os"
11         "strings"
12         "time"
13
14         "github.com/Azure/azure-sdk-for-go/storage"
15 )
16
17 var (
18         azureStorageAccountName    string
19         azureStorageAccountKeyFile string
20         azureStorageReplication    int
21 )
22
23 func readKeyFromFile(file string) (string, error) {
24         buf, err := ioutil.ReadFile(file)
25         if err != nil {
26                 return "", errors.New("reading key from " + file + ": " + err.Error())
27         }
28         accountKey := strings.TrimSpace(string(buf))
29         if accountKey == "" {
30                 return "", errors.New("empty account key in " + file)
31         }
32         return accountKey, nil
33 }
34
35 type azureVolumeAdder struct {
36         *volumeSet
37 }
38
39 func (s *azureVolumeAdder) Set(containerName string) error {
40         if containerName == "" {
41                 return errors.New("no container name given")
42         }
43         accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
44         if err != nil {
45                 return err
46         }
47         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
48         if err != nil {
49                 return errors.New("creating Azure storage client: " + err.Error())
50         }
51         if flagSerializeIO {
52                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
53         }
54         v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
55         if err := v.Check(); err != nil {
56                 return err
57         }
58         *s.volumeSet = append(*s.volumeSet, v)
59         return nil
60 }
61
62 func init() {
63         flag.Var(&azureVolumeAdder{&volumes},
64                 "azure-storage-container-volume",
65                 "Use the given container as a storage volume. Can be given multiple times.")
66         flag.StringVar(
67                 &azureStorageAccountName,
68                 "azure-storage-account-name",
69                 "",
70                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
71         flag.StringVar(
72                 &azureStorageAccountKeyFile,
73                 "azure-storage-account-key-file",
74                 "",
75                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
76         flag.IntVar(
77                 &azureStorageReplication,
78                 "azure-storage-replication",
79                 3,
80                 "Replication level to report to clients when data is stored in an Azure container.")
81 }
82
83 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
84 // container.
85 type AzureBlobVolume struct {
86         azClient      storage.Client
87         bsClient      storage.BlobStorageClient
88         containerName string
89         readonly      bool
90         replication   int
91 }
92
93 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
94         return &AzureBlobVolume{
95                 azClient: client,
96                 bsClient: client.GetBlobService(),
97                 containerName: containerName,
98                 readonly: readonly,
99                 replication: replication,
100         }
101 }
102
103 // Check returns nil if the volume is usable.
104 func (v *AzureBlobVolume) Check() error {
105         ok, err := v.bsClient.ContainerExists(v.containerName)
106         if err != nil {
107                 return err
108         }
109         if !ok {
110                 return errors.New("container does not exist")
111         }
112         return nil
113 }
114
115 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
116         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
117         if err != nil {
118                 if strings.Contains(err.Error(), "404 Not Found") {
119                         // "storage: service returned without a response body (404 Not Found)"
120                         return nil, os.ErrNotExist
121                 }
122                 return nil, err
123         }
124         switch err := err.(type) {
125         case nil:
126         default:
127                 log.Printf("ERROR IN Get(): %T %#v", err, err)
128                 return nil, err
129         }
130         defer rdr.Close()
131         buf := bufs.Get(BlockSize)
132         n, err := io.ReadFull(rdr, buf)
133         switch err {
134         case io.EOF, io.ErrUnexpectedEOF:
135                 return buf[:n], nil
136         default:
137                 bufs.Put(buf)
138                 return nil, err
139         }
140 }
141
142 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
143         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
144         if err != nil {
145                 return err
146         }
147         defer rdr.Close()
148         return compareReaderWithBuf(rdr, expect, loc[:32])
149 }
150
151 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
152         if v.readonly {
153                 return MethodDisabledError
154         }
155         if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil {
156                 return err
157         }
158         // We use the same block ID, base64("0")=="MA==", for everything.
159         if err := v.bsClient.PutBlock(v.containerName, loc, "MA==", block); err != nil {
160                 return err
161         }
162         return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusUncommitted}})
163 }
164
165 func (v *AzureBlobVolume) Touch(loc string) error {
166         if v.readonly {
167                 return MethodDisabledError
168         }
169         if exists, err := v.bsClient.BlobExists(v.containerName, loc); err != nil {
170                 return err
171         } else if !exists {
172                 return os.ErrNotExist
173         }
174         return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusCommitted}})
175 }
176
177 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
178         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
179         if err != nil {
180                 return time.Time{}, err
181         }
182         return time.Parse(time.RFC1123, props.LastModified)
183 }
184
185 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
186         params := storage.ListBlobsParameters{
187                 Prefix: prefix,
188         }
189         for {
190                 resp, err := v.bsClient.ListBlobs(v.containerName, params)
191                 if err != nil {
192                         return err
193                 }
194                 for _, b := range resp.Blobs {
195                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
196                         if err != nil {
197                                 return err
198                         }
199                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
200                 }
201                 if resp.NextMarker == "" {
202                         return nil
203                 }
204                 params.Marker = resp.NextMarker
205         }
206 }
207
208 func (v *AzureBlobVolume) Delete(loc string) error {
209         // TODO: Use leases to handle races with Touch and Put.
210         if v.readonly {
211                 return MethodDisabledError
212         }
213         if t, err := v.Mtime(loc); err != nil {
214                 return err
215         } else if time.Since(t) < blobSignatureTTL {
216                 return nil
217         }
218         return v.bsClient.DeleteBlob(v.containerName, loc)
219 }
220
221 func (v *AzureBlobVolume) Status() *VolumeStatus {
222         return &VolumeStatus{
223                 DeviceNum: 1,
224                 BytesFree: BlockSize * 1000,
225                 BytesUsed: 1,
226         }
227 }
228
229 func (v *AzureBlobVolume) String() string {
230         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
231 }
232
233 func (v *AzureBlobVolume) Writable() bool {
234         return !v.readonly
235 }
236
237 func (v *AzureBlobVolume) Replication() int {
238         return v.replication
239 }