b3b761324094ed9234f5b60df39c915be382c305
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "regexp"
13         "strings"
14         "time"
15
16         "github.com/curoverse/azure-sdk-for-go/storage"
17 )
18
19 var (
20         azureStorageAccountName    string
21         azureStorageAccountKeyFile string
22         azureStorageReplication    int
23         azureWriteRaceInterval     time.Duration = 15 * time.Second
24         azureWriteRacePollTime     time.Duration = time.Second
25 )
26
27 func readKeyFromFile(file string) (string, error) {
28         buf, err := ioutil.ReadFile(file)
29         if err != nil {
30                 return "", errors.New("reading key from " + file + ": " + err.Error())
31         }
32         accountKey := strings.TrimSpace(string(buf))
33         if accountKey == "" {
34                 return "", errors.New("empty account key in " + file)
35         }
36         return accountKey, nil
37 }
38
39 type azureVolumeAdder struct {
40         *volumeSet
41 }
42
43 func (s *azureVolumeAdder) Set(containerName string) error {
44         if containerName == "" {
45                 return errors.New("no container name given")
46         }
47         if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
48                 return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
49         }
50         accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
51         if err != nil {
52                 return err
53         }
54         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
55         if err != nil {
56                 return errors.New("creating Azure storage client: " + err.Error())
57         }
58         if flagSerializeIO {
59                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
60         }
61         v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
62         if err := v.Check(); err != nil {
63                 return err
64         }
65         *s.volumeSet = append(*s.volumeSet, v)
66         return nil
67 }
68
69 func init() {
70         flag.Var(&azureVolumeAdder{&volumes},
71                 "azure-storage-container-volume",
72                 "Use the given container as a storage volume. Can be given multiple times.")
73         flag.StringVar(
74                 &azureStorageAccountName,
75                 "azure-storage-account-name",
76                 "",
77                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
78         flag.StringVar(
79                 &azureStorageAccountKeyFile,
80                 "azure-storage-account-key-file",
81                 "",
82                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
83         flag.IntVar(
84                 &azureStorageReplication,
85                 "azure-storage-replication",
86                 3,
87                 "Replication level to report to clients when data is stored in an Azure container.")
88 }
89
90 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
91 // container.
92 type AzureBlobVolume struct {
93         azClient      storage.Client
94         bsClient      storage.BlobStorageClient
95         containerName string
96         readonly      bool
97         replication   int
98 }
99
100 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
101         return &AzureBlobVolume{
102                 azClient:      client,
103                 bsClient:      client.GetBlobService(),
104                 containerName: containerName,
105                 readonly:      readonly,
106                 replication:   replication,
107         }
108 }
109
110 // Check returns nil if the volume is usable.
111 func (v *AzureBlobVolume) Check() error {
112         ok, err := v.bsClient.ContainerExists(v.containerName)
113         if err != nil {
114                 return err
115         }
116         if !ok {
117                 return errors.New("container does not exist")
118         }
119         return nil
120 }
121
122 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
123         var deadline time.Time
124         haveDeadline := false
125         buf, err := v.get(loc)
126         for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
127                 // Seeing a brand new empty block probably means we're
128                 // in a race with CreateBlob, which under the hood
129                 // (apparently) does "CreateEmpty" and "CommitData"
130                 // with no additional transaction locking.
131                 if !haveDeadline {
132                         t, err := v.Mtime(loc)
133                         if err != nil {
134                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
135                                 break
136                         }
137                         deadline = t.Add(azureWriteRaceInterval)
138                         if time.Now().After(deadline) {
139                                 break
140                         }
141                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
142                         haveDeadline = true
143                 } else if time.Now().After(deadline) {
144                         break
145                 }
146                 bufs.Put(buf)
147                 time.Sleep(azureWriteRacePollTime)
148                 buf, err = v.get(loc)
149         }
150         if haveDeadline {
151                 log.Printf("Race ended with len(buf)==%d", len(buf))
152         }
153         return buf, err
154 }
155
156 func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
157         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
158         if err != nil {
159                 return nil, v.translateError(err)
160         }
161         defer rdr.Close()
162         buf := bufs.Get(BlockSize)
163         n, err := io.ReadFull(rdr, buf)
164         switch err {
165         case nil, io.EOF, io.ErrUnexpectedEOF:
166                 return buf[:n], nil
167         default:
168                 bufs.Put(buf)
169                 return nil, err
170         }
171 }
172
173 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
174         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
175         if err != nil {
176                 return v.translateError(err)
177         }
178         defer rdr.Close()
179         return compareReaderWithBuf(rdr, expect, loc[:32])
180 }
181
182 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
183         if v.readonly {
184                 return MethodDisabledError
185         }
186         return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
187 }
188
189 func (v *AzureBlobVolume) Touch(loc string) error {
190         if v.readonly {
191                 return MethodDisabledError
192         }
193         return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
194                 "touch": fmt.Sprintf("%d", time.Now()),
195         })
196 }
197
198 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
199         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
200         if err != nil {
201                 return time.Time{}, err
202         }
203         return time.Parse(time.RFC1123, props.LastModified)
204 }
205
206 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
207         params := storage.ListBlobsParameters{
208                 Prefix: prefix,
209         }
210         for {
211                 resp, err := v.bsClient.ListBlobs(v.containerName, params)
212                 if err != nil {
213                         return err
214                 }
215                 for _, b := range resp.Blobs {
216                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
217                         if err != nil {
218                                 return err
219                         }
220                         if !v.isKeepBlock(b.Name) {
221                                 continue
222                         }
223                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
224                                 // A new zero-length blob is probably
225                                 // just a new non-empty blob that
226                                 // hasn't committed its data yet (see
227                                 // Get()), and in any case has no
228                                 // value.
229                                 continue
230                         }
231                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
232                 }
233                 if resp.NextMarker == "" {
234                         return nil
235                 }
236                 params.Marker = resp.NextMarker
237         }
238 }
239
240 func (v *AzureBlobVolume) Delete(loc string) error {
241         if v.readonly {
242                 return MethodDisabledError
243         }
244         // Ideally we would use If-Unmodified-Since, but that
245         // particular condition seems to be ignored by Azure. Instead,
246         // we get the Etag before checking Mtime, and use If-Match to
247         // ensure we don't delete data if Put() or Touch() happens
248         // between our calls to Mtime() and DeleteBlob().
249         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
250         if err != nil {
251                 return err
252         }
253         if t, err := v.Mtime(loc); err != nil {
254                 return err
255         } else if time.Since(t) < blobSignatureTTL {
256                 return nil
257         }
258         return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
259                 "If-Match": props.Etag,
260         })
261 }
262
263 func (v *AzureBlobVolume) Status() *VolumeStatus {
264         return &VolumeStatus{
265                 DeviceNum: 1,
266                 BytesFree: BlockSize * 1000,
267                 BytesUsed: 1,
268         }
269 }
270
271 func (v *AzureBlobVolume) String() string {
272         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
273 }
274
275 func (v *AzureBlobVolume) Writable() bool {
276         return !v.readonly
277 }
278
279 func (v *AzureBlobVolume) Replication() int {
280         return v.replication
281 }
282
283 // If possible, translate an Azure SDK error to a recognizable error
284 // like os.ErrNotExist.
285 func (v *AzureBlobVolume) translateError(err error) error {
286         switch {
287         case err == nil:
288                 return err
289         case strings.Contains(err.Error(), "404 Not Found"):
290                 // "storage: service returned without a response body (404 Not Found)"
291                 return os.ErrNotExist
292         default:
293                 return err
294         }
295 }
296
297 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
298 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
299         return keepBlockRegexp.MatchString(s)
300 }