7159: Work around CreateBlob race by polling for updates when a brand new blob is...
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "strings"
13         "time"
14
15         "github.com/curoverse/azure-sdk-for-go/storage"
16 )
17
18 var (
19         azureStorageAccountName    string
20         azureStorageAccountKeyFile string
21         azureStorageReplication    int
22         azureWriteRaceInterval     time.Duration = 15 * time.Second
23         azureWriteRacePollTime     time.Duration = time.Second
24 )
25
26 func readKeyFromFile(file string) (string, error) {
27         buf, err := ioutil.ReadFile(file)
28         if err != nil {
29                 return "", errors.New("reading key from " + file + ": " + err.Error())
30         }
31         accountKey := strings.TrimSpace(string(buf))
32         if accountKey == "" {
33                 return "", errors.New("empty account key in " + file)
34         }
35         return accountKey, nil
36 }
37
38 type azureVolumeAdder struct {
39         *volumeSet
40 }
41
42 func (s *azureVolumeAdder) Set(containerName string) error {
43         if containerName == "" {
44                 return errors.New("no container name given")
45         }
46         if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
47                 return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
48         }
49         accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
50         if err != nil {
51                 return err
52         }
53         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
54         if err != nil {
55                 return errors.New("creating Azure storage client: " + err.Error())
56         }
57         if flagSerializeIO {
58                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
59         }
60         v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
61         if err := v.Check(); err != nil {
62                 return err
63         }
64         *s.volumeSet = append(*s.volumeSet, v)
65         return nil
66 }
67
68 func init() {
69         flag.Var(&azureVolumeAdder{&volumes},
70                 "azure-storage-container-volume",
71                 "Use the given container as a storage volume. Can be given multiple times.")
72         flag.StringVar(
73                 &azureStorageAccountName,
74                 "azure-storage-account-name",
75                 "",
76                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
77         flag.StringVar(
78                 &azureStorageAccountKeyFile,
79                 "azure-storage-account-key-file",
80                 "",
81                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
82         flag.IntVar(
83                 &azureStorageReplication,
84                 "azure-storage-replication",
85                 3,
86                 "Replication level to report to clients when data is stored in an Azure container.")
87 }
88
89 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
90 // container.
91 type AzureBlobVolume struct {
92         azClient      storage.Client
93         bsClient      storage.BlobStorageClient
94         containerName string
95         readonly      bool
96         replication   int
97 }
98
99 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
100         return &AzureBlobVolume{
101                 azClient:      client,
102                 bsClient:      client.GetBlobService(),
103                 containerName: containerName,
104                 readonly:      readonly,
105                 replication:   replication,
106         }
107 }
108
109 // Check returns nil if the volume is usable.
110 func (v *AzureBlobVolume) Check() error {
111         ok, err := v.bsClient.ContainerExists(v.containerName)
112         if err != nil {
113                 return err
114         }
115         if !ok {
116                 return errors.New("container does not exist")
117         }
118         return nil
119 }
120
121 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
122         var deadline time.Time
123         haveDeadline := false
124         buf, err := v.get(loc)
125         for err == nil && len(buf) == 0 && loc[:32] != "d41d8cd98f00b204e9800998ecf8427e" {
126                 // Seeing a brand new empty block probably means we're
127                 // in a race with CreateBlob, which under the hood
128                 // (apparently) does "CreateEmpty" and "CommitData"
129                 // with no additional transaction locking.
130                 if !haveDeadline {
131                         t, err := v.Mtime(loc)
132                         if err != nil {
133                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
134                                 break
135                         }
136                         deadline = t.Add(azureWriteRaceInterval)
137                         haveDeadline = true
138                 }
139                 if time.Now().After(deadline) {
140                         break
141                 }
142                 bufs.Put(buf)
143                 time.Sleep(azureWriteRacePollTime)
144                 buf, err = v.get(loc)
145         }
146         return buf, err
147 }
148
149 func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
150         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
151         if err != nil {
152                 if strings.Contains(err.Error(), "404 Not Found") {
153                         // "storage: service returned without a response body (404 Not Found)"
154                         return nil, os.ErrNotExist
155                 }
156                 return nil, err
157         }
158         switch err := err.(type) {
159         case nil:
160         default:
161                 log.Printf("ERROR IN Get(): %T %#v", err, err)
162                 return nil, err
163         }
164         defer rdr.Close()
165         buf := bufs.Get(BlockSize)
166         n, err := io.ReadFull(rdr, buf)
167         switch err {
168         case io.EOF, io.ErrUnexpectedEOF:
169                 return buf[:n], nil
170         default:
171                 bufs.Put(buf)
172                 return nil, err
173         }
174 }
175
176 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
177         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
178         if err != nil {
179                 return err
180         }
181         defer rdr.Close()
182         return compareReaderWithBuf(rdr, expect, loc[:32])
183 }
184
185 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
186         if v.readonly {
187                 return MethodDisabledError
188         }
189         return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
190 }
191
192 func (v *AzureBlobVolume) Touch(loc string) error {
193         if v.readonly {
194                 return MethodDisabledError
195         }
196         return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
197                 "touch": fmt.Sprintf("%d", time.Now()),
198         })
199 }
200
201 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
202         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
203         if err != nil {
204                 return time.Time{}, err
205         }
206         return time.Parse(time.RFC1123, props.LastModified)
207 }
208
209 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
210         params := storage.ListBlobsParameters{
211                 Prefix: prefix,
212         }
213         for {
214                 resp, err := v.bsClient.ListBlobs(v.containerName, params)
215                 if err != nil {
216                         return err
217                 }
218                 for _, b := range resp.Blobs {
219                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
220                         if err != nil {
221                                 return err
222                         }
223                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
224                 }
225                 if resp.NextMarker == "" {
226                         return nil
227                 }
228                 params.Marker = resp.NextMarker
229         }
230 }
231
232 func (v *AzureBlobVolume) Delete(loc string) error {
233         if v.readonly {
234                 return MethodDisabledError
235         }
236         // Ideally we would use If-Unmodified-Since, but that
237         // particular condition seems to be ignored by Azure. Instead,
238         // we get the Etag before checking Mtime, and use If-Match to
239         // ensure we don't delete data if Put() or Touch() happens
240         // between our calls to Mtime() and DeleteBlob().
241         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
242         if err != nil {
243                 return err
244         }
245         if t, err := v.Mtime(loc); err != nil {
246                 return err
247         } else if time.Since(t) < blobSignatureTTL {
248                 return nil
249         }
250         return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
251                 "If-Match": props.Etag,
252         })
253 }
254
255 func (v *AzureBlobVolume) Status() *VolumeStatus {
256         return &VolumeStatus{
257                 DeviceNum: 1,
258                 BytesFree: BlockSize * 1000,
259                 BytesUsed: 1,
260         }
261 }
262
263 func (v *AzureBlobVolume) String() string {
264         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
265 }
266
267 func (v *AzureBlobVolume) Writable() bool {
268         return !v.readonly
269 }
270
271 func (v *AzureBlobVolume) Replication() int {
272         return v.replication
273 }