Merge branch 'master' into 7167-keep-rsync-test-setup
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "strings"
13         "time"
14
15         "github.com/curoverse/azure-sdk-for-go/storage"
16 )
17
18 var (
19         azureStorageAccountName    string
20         azureStorageAccountKeyFile string
21         azureStorageReplication    int
22 )
23
24 func readKeyFromFile(file string) (string, error) {
25         buf, err := ioutil.ReadFile(file)
26         if err != nil {
27                 return "", errors.New("reading key from " + file + ": " + err.Error())
28         }
29         accountKey := strings.TrimSpace(string(buf))
30         if accountKey == "" {
31                 return "", errors.New("empty account key in " + file)
32         }
33         return accountKey, nil
34 }
35
36 type azureVolumeAdder struct {
37         *volumeSet
38 }
39
40 func (s *azureVolumeAdder) Set(containerName string) error {
41         if containerName == "" {
42                 return errors.New("no container name given")
43         }
44         if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
45                 return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
46         }
47         accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
48         if err != nil {
49                 return err
50         }
51         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
52         if err != nil {
53                 return errors.New("creating Azure storage client: " + err.Error())
54         }
55         if flagSerializeIO {
56                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
57         }
58         v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
59         if err := v.Check(); err != nil {
60                 return err
61         }
62         *s.volumeSet = append(*s.volumeSet, v)
63         return nil
64 }
65
66 func init() {
67         flag.Var(&azureVolumeAdder{&volumes},
68                 "azure-storage-container-volume",
69                 "Use the given container as a storage volume. Can be given multiple times.")
70         flag.StringVar(
71                 &azureStorageAccountName,
72                 "azure-storage-account-name",
73                 "",
74                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
75         flag.StringVar(
76                 &azureStorageAccountKeyFile,
77                 "azure-storage-account-key-file",
78                 "",
79                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
80         flag.IntVar(
81                 &azureStorageReplication,
82                 "azure-storage-replication",
83                 3,
84                 "Replication level to report to clients when data is stored in an Azure container.")
85 }
86
87 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
88 // container.
89 type AzureBlobVolume struct {
90         azClient      storage.Client
91         bsClient      storage.BlobStorageClient
92         containerName string
93         readonly      bool
94         replication   int
95 }
96
97 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
98         return &AzureBlobVolume{
99                 azClient:      client,
100                 bsClient:      client.GetBlobService(),
101                 containerName: containerName,
102                 readonly:      readonly,
103                 replication:   replication,
104         }
105 }
106
107 // Check returns nil if the volume is usable.
108 func (v *AzureBlobVolume) Check() error {
109         ok, err := v.bsClient.ContainerExists(v.containerName)
110         if err != nil {
111                 return err
112         }
113         if !ok {
114                 return errors.New("container does not exist")
115         }
116         return nil
117 }
118
119 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
120         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
121         if err != nil {
122                 if strings.Contains(err.Error(), "404 Not Found") {
123                         // "storage: service returned without a response body (404 Not Found)"
124                         return nil, os.ErrNotExist
125                 }
126                 return nil, err
127         }
128         switch err := err.(type) {
129         case nil:
130         default:
131                 log.Printf("ERROR IN Get(): %T %#v", err, err)
132                 return nil, err
133         }
134         defer rdr.Close()
135         buf := bufs.Get(BlockSize)
136         n, err := io.ReadFull(rdr, buf)
137         switch err {
138         case io.EOF, io.ErrUnexpectedEOF:
139                 return buf[:n], nil
140         default:
141                 bufs.Put(buf)
142                 return nil, err
143         }
144 }
145
146 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
147         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
148         if err != nil {
149                 return err
150         }
151         defer rdr.Close()
152         return compareReaderWithBuf(rdr, expect, loc[:32])
153 }
154
155 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
156         if v.readonly {
157                 return MethodDisabledError
158         }
159         return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
160 }
161
162 func (v *AzureBlobVolume) Touch(loc string) error {
163         if v.readonly {
164                 return MethodDisabledError
165         }
166         return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
167                 "touch": fmt.Sprintf("%d", time.Now()),
168         })
169 }
170
171 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
172         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
173         if err != nil {
174                 return time.Time{}, err
175         }
176         return time.Parse(time.RFC1123, props.LastModified)
177 }
178
179 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
180         params := storage.ListBlobsParameters{
181                 Prefix: prefix,
182         }
183         for {
184                 resp, err := v.bsClient.ListBlobs(v.containerName, params)
185                 if err != nil {
186                         return err
187                 }
188                 for _, b := range resp.Blobs {
189                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
190                         if err != nil {
191                                 return err
192                         }
193                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
194                 }
195                 if resp.NextMarker == "" {
196                         return nil
197                 }
198                 params.Marker = resp.NextMarker
199         }
200 }
201
202 func (v *AzureBlobVolume) Delete(loc string) error {
203         if v.readonly {
204                 return MethodDisabledError
205         }
206         // Ideally we would use If-Unmodified-Since, but that
207         // particular condition seems to be ignored by Azure. Instead,
208         // we get the Etag before checking Mtime, and use If-Match to
209         // ensure we don't delete data if Put() or Touch() happens
210         // between our calls to Mtime() and DeleteBlob().
211         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
212         if err != nil {
213                 return err
214         }
215         if t, err := v.Mtime(loc); err != nil {
216                 return err
217         } else if time.Since(t) < blobSignatureTTL {
218                 return nil
219         }
220         return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
221                 "If-Match": props.Etag,
222         })
223 }
224
225 func (v *AzureBlobVolume) Status() *VolumeStatus {
226         return &VolumeStatus{
227                 DeviceNum: 1,
228                 BytesFree: BlockSize * 1000,
229                 BytesUsed: 1,
230         }
231 }
232
233 func (v *AzureBlobVolume) String() string {
234         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
235 }
236
237 func (v *AzureBlobVolume) Writable() bool {
238         return !v.readonly
239 }
240
241 func (v *AzureBlobVolume) Replication() int {
242         return v.replication
243 }