8178: All three currently supported volumes return error when trash-lifetime period...
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "errors"
6         "flag"
7         "fmt"
8         "io"
9         "io/ioutil"
10         "log"
11         "os"
12         "regexp"
13         "strings"
14         "sync"
15         "time"
16
17         "github.com/curoverse/azure-sdk-for-go/storage"
18 )
19
20 var (
21         azureMaxGetBytes           int
22         azureStorageAccountName    string
23         azureStorageAccountKeyFile string
24         azureStorageReplication    int
25         azureWriteRaceInterval     = 15 * time.Second
26         azureWriteRacePollTime     = time.Second
27 )
28
29 func readKeyFromFile(file string) (string, error) {
30         buf, err := ioutil.ReadFile(file)
31         if err != nil {
32                 return "", errors.New("reading key from " + file + ": " + err.Error())
33         }
34         accountKey := strings.TrimSpace(string(buf))
35         if accountKey == "" {
36                 return "", errors.New("empty account key in " + file)
37         }
38         return accountKey, nil
39 }
40
41 type azureVolumeAdder struct {
42         *volumeSet
43 }
44
45 func (s *azureVolumeAdder) Set(containerName string) error {
46         if trashLifetime <= 0 {
47                 log.Print("Missing required configuration parameter: trash-lifetime")
48                 return ErrNotImplemented
49         }
50
51         if containerName == "" {
52                 return errors.New("no container name given")
53         }
54         if azureStorageAccountName == "" || azureStorageAccountKeyFile == "" {
55                 return errors.New("-azure-storage-account-name and -azure-storage-account-key-file arguments must given before -azure-storage-container-volume")
56         }
57         accountKey, err := readKeyFromFile(azureStorageAccountKeyFile)
58         if err != nil {
59                 return err
60         }
61         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
62         if err != nil {
63                 return errors.New("creating Azure storage client: " + err.Error())
64         }
65         if flagSerializeIO {
66                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
67         }
68         v := NewAzureBlobVolume(azClient, containerName, flagReadonly, azureStorageReplication)
69         if err := v.Check(); err != nil {
70                 return err
71         }
72         *s.volumeSet = append(*s.volumeSet, v)
73         return nil
74 }
75
76 func init() {
77         flag.Var(&azureVolumeAdder{&volumes},
78                 "azure-storage-container-volume",
79                 "Use the given container as a storage volume. Can be given multiple times.")
80         flag.StringVar(
81                 &azureStorageAccountName,
82                 "azure-storage-account-name",
83                 "",
84                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
85         flag.StringVar(
86                 &azureStorageAccountKeyFile,
87                 "azure-storage-account-key-file",
88                 "",
89                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
90         flag.IntVar(
91                 &azureStorageReplication,
92                 "azure-storage-replication",
93                 3,
94                 "Replication level to report to clients when data is stored in an Azure container.")
95         flag.IntVar(
96                 &azureMaxGetBytes,
97                 "azure-max-get-bytes",
98                 BlockSize,
99                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
100 }
101
102 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
103 // container.
104 type AzureBlobVolume struct {
105         azClient      storage.Client
106         bsClient      storage.BlobStorageClient
107         containerName string
108         readonly      bool
109         replication   int
110 }
111
112 // NewAzureBlobVolume returns a new AzureBlobVolume using the given
113 // client and container name. The replication argument specifies the
114 // replication level to report when writing data.
115 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool, replication int) *AzureBlobVolume {
116         return &AzureBlobVolume{
117                 azClient:      client,
118                 bsClient:      client.GetBlobService(),
119                 containerName: containerName,
120                 readonly:      readonly,
121                 replication:   replication,
122         }
123 }
124
125 // Check returns nil if the volume is usable.
126 func (v *AzureBlobVolume) Check() error {
127         ok, err := v.bsClient.ContainerExists(v.containerName)
128         if err != nil {
129                 return err
130         }
131         if !ok {
132                 return errors.New("container does not exist")
133         }
134         return nil
135 }
136
137 // Get reads a Keep block that has been stored as a block blob in the
138 // container.
139 //
140 // If the block is younger than azureWriteRaceInterval and is
141 // unexpectedly empty, assume a PutBlob operation is in progress, and
142 // wait for it to finish writing.
143 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
144         var deadline time.Time
145         haveDeadline := false
146         buf, err := v.get(loc)
147         for err == nil && len(buf) == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
148                 // Seeing a brand new empty block probably means we're
149                 // in a race with CreateBlob, which under the hood
150                 // (apparently) does "CreateEmpty" and "CommitData"
151                 // with no additional transaction locking.
152                 if !haveDeadline {
153                         t, err := v.Mtime(loc)
154                         if err != nil {
155                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
156                                 break
157                         }
158                         deadline = t.Add(azureWriteRaceInterval)
159                         if time.Now().After(deadline) {
160                                 break
161                         }
162                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
163                         haveDeadline = true
164                 } else if time.Now().After(deadline) {
165                         break
166                 }
167                 bufs.Put(buf)
168                 time.Sleep(azureWriteRacePollTime)
169                 buf, err = v.get(loc)
170         }
171         if haveDeadline {
172                 log.Printf("Race ended with len(buf)==%d", len(buf))
173         }
174         return buf, err
175 }
176
177 func (v *AzureBlobVolume) get(loc string) ([]byte, error) {
178         expectSize := BlockSize
179         if azureMaxGetBytes < BlockSize {
180                 // Unfortunately the handler doesn't tell us how long the blob
181                 // is expected to be, so we have to ask Azure.
182                 props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
183                 if err != nil {
184                         return nil, v.translateError(err)
185                 }
186                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
187                         return nil, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
188                 }
189                 expectSize = int(props.ContentLength)
190         }
191
192         buf := bufs.Get(expectSize)
193         if expectSize == 0 {
194                 return buf, nil
195         }
196
197         // We'll update this actualSize if/when we get the last piece.
198         actualSize := -1
199         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
200         errors := make([]error, pieces)
201         var wg sync.WaitGroup
202         wg.Add(pieces)
203         for p := 0; p < pieces; p++ {
204                 go func(p int) {
205                         defer wg.Done()
206                         startPos := p * azureMaxGetBytes
207                         endPos := startPos + azureMaxGetBytes
208                         if endPos > expectSize {
209                                 endPos = expectSize
210                         }
211                         var rdr io.ReadCloser
212                         var err error
213                         if startPos == 0 && endPos == expectSize {
214                                 rdr, err = v.bsClient.GetBlob(v.containerName, loc)
215                         } else {
216                                 rdr, err = v.bsClient.GetBlobRange(v.containerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1))
217                         }
218                         if err != nil {
219                                 errors[p] = err
220                                 return
221                         }
222                         defer rdr.Close()
223                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
224                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
225                                 // If we don't know the actual size,
226                                 // and just tried reading 64 MiB, it's
227                                 // normal to encounter EOF.
228                         } else if err != nil {
229                                 errors[p] = err
230                         }
231                         if p == pieces-1 {
232                                 actualSize = startPos + n
233                         }
234                 }(p)
235         }
236         wg.Wait()
237         for _, err := range errors {
238                 if err != nil {
239                         bufs.Put(buf)
240                         return nil, v.translateError(err)
241                 }
242         }
243         return buf[:actualSize], nil
244 }
245
246 // Compare the given data with existing stored data.
247 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
248         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
249         if err != nil {
250                 return v.translateError(err)
251         }
252         defer rdr.Close()
253         return compareReaderWithBuf(rdr, expect, loc[:32])
254 }
255
256 // Put stores a Keep block as a block blob in the container.
257 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
258         if v.readonly {
259                 return MethodDisabledError
260         }
261         return v.bsClient.CreateBlockBlobFromReader(v.containerName, loc, uint64(len(block)), bytes.NewReader(block))
262 }
263
264 // Touch updates the last-modified property of a block blob.
265 func (v *AzureBlobVolume) Touch(loc string) error {
266         if v.readonly {
267                 return MethodDisabledError
268         }
269         return v.bsClient.SetBlobMetadata(v.containerName, loc, map[string]string{
270                 "touch": fmt.Sprintf("%d", time.Now()),
271         })
272 }
273
274 // Mtime returns the last-modified property of a block blob.
275 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
276         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
277         if err != nil {
278                 return time.Time{}, err
279         }
280         return time.Parse(time.RFC1123, props.LastModified)
281 }
282
283 // IndexTo writes a list of Keep blocks that are stored in the
284 // container.
285 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
286         params := storage.ListBlobsParameters{
287                 Prefix: prefix,
288         }
289         for {
290                 resp, err := v.bsClient.ListBlobs(v.containerName, params)
291                 if err != nil {
292                         return err
293                 }
294                 for _, b := range resp.Blobs {
295                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
296                         if err != nil {
297                                 return err
298                         }
299                         if !v.isKeepBlock(b.Name) {
300                                 continue
301                         }
302                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
303                                 // A new zero-length blob is probably
304                                 // just a new non-empty blob that
305                                 // hasn't committed its data yet (see
306                                 // Get()), and in any case has no
307                                 // value.
308                                 continue
309                         }
310                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
311                 }
312                 if resp.NextMarker == "" {
313                         return nil
314                 }
315                 params.Marker = resp.NextMarker
316         }
317 }
318
319 // Trash a Keep block.
320 func (v *AzureBlobVolume) Trash(loc string) error {
321         if v.readonly {
322                 return MethodDisabledError
323         }
324         // Ideally we would use If-Unmodified-Since, but that
325         // particular condition seems to be ignored by Azure. Instead,
326         // we get the Etag before checking Mtime, and use If-Match to
327         // ensure we don't delete data if Put() or Touch() happens
328         // between our calls to Mtime() and DeleteBlob().
329         props, err := v.bsClient.GetBlobProperties(v.containerName, loc)
330         if err != nil {
331                 return err
332         }
333         if t, err := v.Mtime(loc); err != nil {
334                 return err
335         } else if time.Since(t) < blobSignatureTTL {
336                 return nil
337         }
338         return v.bsClient.DeleteBlob(v.containerName, loc, map[string]string{
339                 "If-Match": props.Etag,
340         })
341 }
342
343 // Untrash a Keep block.
344 // TBD
345 func (v *AzureBlobVolume) Untrash(loc string) error {
346         return nil
347 }
348
349 // Status returns a VolumeStatus struct with placeholder data.
350 func (v *AzureBlobVolume) Status() *VolumeStatus {
351         return &VolumeStatus{
352                 DeviceNum: 1,
353                 BytesFree: BlockSize * 1000,
354                 BytesUsed: 1,
355         }
356 }
357
358 // String returns a volume label, including the container name.
359 func (v *AzureBlobVolume) String() string {
360         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
361 }
362
363 // Writable returns true, unless the -readonly flag was on when the
364 // volume was added.
365 func (v *AzureBlobVolume) Writable() bool {
366         return !v.readonly
367 }
368
369 // Replication returns the replication level of the container, as
370 // specified by the -azure-storage-replication argument.
371 func (v *AzureBlobVolume) Replication() int {
372         return v.replication
373 }
374
375 // If possible, translate an Azure SDK error to a recognizable error
376 // like os.ErrNotExist.
377 func (v *AzureBlobVolume) translateError(err error) error {
378         switch {
379         case err == nil:
380                 return err
381         case strings.Contains(err.Error(), "404 Not Found"):
382                 // "storage: service returned without a response body (404 Not Found)"
383                 return os.ErrNotExist
384         default:
385                 return err
386         }
387 }
388
389 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
390
391 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
392         return keepBlockRegexp.MatchString(s)
393 }