7159: Return benign os.ErrNotExist error from Compare to avoid excessive logs. refs...
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "strings"
13         "time"
14
15         "github.com/curoverse/azure-sdk-for-go/storage"
16 )
17
18 var (
19         azureStorageAccountName    string
20         azureStorageAccountKeyFile string
21         azureStorageReplication    int
22         azureWriteRaceInterval     time.Duration = 15 * time.Second
23         azureWriteRacePollTime     time.Duration = time.Second
24 )
25
26 func readKeyFromFile(file string) (string, error) {
27         buf, err := ioutil.ReadFile(file)
28         if err != nil {
29                 return "", errors.New("reading key from " + file + ": " + err.Error())
30         }
31         accountKey := strings.TrimSpace(string(buf))
32         if accountKey == "" {
33                 return "", errors.New("empty account key in " + file)
34         }
35         return accountKey, nil
36 }
37
38 type azureVolumeAdder struct {
39         *volumeSet
40 }
41
42 func (s *azureVolumeAdder) Set(containerName string) error {
43         if containerName == "" {
44                 return errors.New("no container name given")
45         }
46         if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
47                 return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
48         }
49         accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
50         if err != nil {
51                 return err
52         }
53         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
54         if err != nil {
55                 return errors.New("creating Azure storage client: " + err.Error())
56         }
57         if flagSerializeIO {
58                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
59         }
60         v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
61         if err := v.Check(); err != nil {
62                 return err
63         }
64         *s.volumeSet = append(*s.volumeSet, v)
65         return nil
66 }
67
68 func init() {
69         flag.Var(&azureVolumeAdder{&volumes},
70                 "azure-storage-container-volume",
71                 "Use the given container as a storage volume. Can be given multiple times.")
72         flag.StringVar(
73                 &azureStorageAccountName,
74                 "azure-storage-account-name",
75                 "",
76                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
77         flag.StringVar(
78                 &azureStorageAccountKeyFile,
79                 "azure-storage-account-key-file",
80                 "",
81                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
82         flag.IntVar(
83                 &azureStorageReplication,
84                 "azure-storage-replication",
85                 3,
86                 "Replication level to report to clients when data is stored in an Azure container.")
87 }
88
89 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
90 // container.
91 type AzureBlobVolume struct {
92         azClient      storage.Client
93         bsClient      storage.BlobStorageClient
94         containerName string
95         readonly      bool
96         replication   int
97 }
98
99 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
100         return &AzureBlobVolume{
101                 azClient:      client,
102                 bsClient:      client.GetBlobService(),
103                 containerName: containerName,
104                 readonly:      readonly,
105                 replication:   replication,
106         }
107 }
108
109 // Check returns nil if the volume is usable.
110 func (v *AzureBlobVolume) Check() error {
111         ok, err := v.bsClient.ContainerExists(v.containerName)
112         if err != nil {
113                 return err
114         }
115         if !ok {
116                 return errors.New("container does not exist")
117         }
118         return nil
119 }
120
121 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
122         var deadline time.Time
123         haveDeadline := false
124         buf, err := v.get(loc)
125         for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
126                 // Seeing a brand new empty block probably means we're
127                 // in a race with CreateBlob, which under the hood
128                 // (apparently) does "CreateEmpty" and "CommitData"
129                 // with no additional transaction locking.
130                 if !haveDeadline {
131                         t, err := v.Mtime(loc)
132                         if err != nil {
133                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
134                                 break
135                         }
136                         deadline = t.Add(azureWriteRaceInterval)
137                         if time.Now().After(deadline) {
138                                 break
139                         }
140                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
141                         haveDeadline = true
142                 } else if time.Now().After(deadline) {
143                         break
144                 }
145                 bufs.Put(buf)
146                 time.Sleep(azureWriteRacePollTime)
147                 buf, err = v.get(loc)
148         }
149         if haveDeadline {
150                 log.Printf("Race ended with len(buf)==%d", len(buf))
151         }
152         return buf, err
153 }
154
155 func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
156         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
157         if err != nil {
158                 return nil, v.translateError(err)
159         }
160         defer rdr.Close()
161         buf := bufs.Get(BlockSize)
162         n, err := io.ReadFull(rdr, buf)
163         switch err {
164         case nil, io.EOF, io.ErrUnexpectedEOF:
165                 return buf[:n], nil
166         default:
167                 bufs.Put(buf)
168                 return nil, err
169         }
170 }
171
172 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
173         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
174         if err != nil {
175                 return v.translateError(err)
176         }
177         defer rdr.Close()
178         return compareReaderWithBuf(rdr, expect, loc[:32])
179 }
180
181 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
182         if v.readonly {
183                 return MethodDisabledError
184         }
185         return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
186 }
187
188 func (v *AzureBlobVolume) Touch(loc string) error {
189         if v.readonly {
190                 return MethodDisabledError
191         }
192         return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
193                 "touch": fmt.Sprintf("%d", time.Now()),
194         })
195 }
196
197 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
198         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
199         if err != nil {
200                 return time.Time{}, err
201         }
202         return time.Parse(time.RFC1123, props.LastModified)
203 }
204
205 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
206         params := storage.ListBlobsParameters{
207                 Prefix: prefix,
208         }
209         for {
210                 resp, err := v.bsClient.ListBlobs(v.containerName, params)
211                 if err != nil {
212                         return err
213                 }
214                 for _, b := range resp.Blobs {
215                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
216                         if err != nil {
217                                 return err
218                         }
219                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
220                                 // A new zero-length blob is probably
221                                 // just a new non-empty blob that
222                                 // hasn't committed its data yet (see
223                                 // Get()), and in any case has no
224                                 // value.
225                                 continue
226                         }
227                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
228                 }
229                 if resp.NextMarker == "" {
230                         return nil
231                 }
232                 params.Marker = resp.NextMarker
233         }
234 }
235
236 func (v *AzureBlobVolume) Delete(loc string) error {
237         if v.readonly {
238                 return MethodDisabledError
239         }
240         // Ideally we would use If-Unmodified-Since, but that
241         // particular condition seems to be ignored by Azure. Instead,
242         // we get the Etag before checking Mtime, and use If-Match to
243         // ensure we don't delete data if Put() or Touch() happens
244         // between our calls to Mtime() and DeleteBlob().
245         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
246         if err != nil {
247                 return err
248         }
249         if t, err := v.Mtime(loc); err != nil {
250                 return err
251         } else if time.Since(t) < blobSignatureTTL {
252                 return nil
253         }
254         return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
255                 "If-Match": props.Etag,
256         })
257 }
258
259 func (v *AzureBlobVolume) Status() *VolumeStatus {
260         return &VolumeStatus{
261                 DeviceNum: 1,
262                 BytesFree: BlockSize * 1000,
263                 BytesUsed: 1,
264         }
265 }
266
267 func (v *AzureBlobVolume) String() string {
268         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
269 }
270
271 func (v *AzureBlobVolume) Writable() bool {
272         return !v.readonly
273 }
274
275 func (v *AzureBlobVolume) Replication() int {
276         return v.replication
277 }
278
279 // If possible, translate an Azure SDK error to a recognizable error
280 // like os.ErrNotExist.
281 func (v *AzureBlobVolume) translateError(err error) error {
282         switch {
283         case err == nil:
284                 return err
285         case strings.Contains(err.Error(), "404 Not Found"):
286                 // "storage: service returned without a response body (404 Not Found)"
287                 return os.ErrNotExist
288         default:
289                 return err
290         }
291 }