b21f68d6830ab98eeeb8d556dd601bbbb6d6b1fa
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "bytes"
5         "context"
6         "errors"
7         "flag"
8         "fmt"
9         "io"
10         "io/ioutil"
11         "log"
12         "net/http"
13         "os"
14         "regexp"
15         "strconv"
16         "strings"
17         "sync"
18         "time"
19
20         "git.curoverse.com/arvados.git/sdk/go/arvados"
21         "github.com/curoverse/azure-sdk-for-go/storage"
22 )
23
24 const azureDefaultRequestTimeout = arvados.Duration(10 * time.Minute)
25
26 var (
27         azureMaxGetBytes           int
28         azureStorageAccountName    string
29         azureStorageAccountKeyFile string
30         azureStorageReplication    int
31         azureWriteRaceInterval     = 15 * time.Second
32         azureWriteRacePollTime     = time.Second
33 )
34
35 func readKeyFromFile(file string) (string, error) {
36         buf, err := ioutil.ReadFile(file)
37         if err != nil {
38                 return "", errors.New("reading key from " + file + ": " + err.Error())
39         }
40         accountKey := strings.TrimSpace(string(buf))
41         if accountKey == "" {
42                 return "", errors.New("empty account key in " + file)
43         }
44         return accountKey, nil
45 }
46
47 type azureVolumeAdder struct {
48         *Config
49 }
50
51 // String implements flag.Value
52 func (s *azureVolumeAdder) String() string {
53         return "-"
54 }
55
56 func (s *azureVolumeAdder) Set(containerName string) error {
57         s.Config.Volumes = append(s.Config.Volumes, &AzureBlobVolume{
58                 ContainerName:         containerName,
59                 StorageAccountName:    azureStorageAccountName,
60                 StorageAccountKeyFile: azureStorageAccountKeyFile,
61                 AzureReplication:      azureStorageReplication,
62                 ReadOnly:              deprecated.flagReadonly,
63         })
64         return nil
65 }
66
67 func init() {
68         VolumeTypes = append(VolumeTypes, func() VolumeWithExamples { return &AzureBlobVolume{} })
69
70         flag.Var(&azureVolumeAdder{theConfig},
71                 "azure-storage-container-volume",
72                 "Use the given container as a storage volume. Can be given multiple times.")
73         flag.StringVar(
74                 &azureStorageAccountName,
75                 "azure-storage-account-name",
76                 "",
77                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
78         flag.StringVar(
79                 &azureStorageAccountKeyFile,
80                 "azure-storage-account-key-file",
81                 "",
82                 "`File` containing the account key used for subsequent --azure-storage-container-volume arguments.")
83         flag.IntVar(
84                 &azureStorageReplication,
85                 "azure-storage-replication",
86                 3,
87                 "Replication level to report to clients when data is stored in an Azure container.")
88         flag.IntVar(
89                 &azureMaxGetBytes,
90                 "azure-max-get-bytes",
91                 BlockSize,
92                 fmt.Sprintf("Maximum bytes to request in a single GET request. If smaller than %d, use multiple concurrent range requests to retrieve a block.", BlockSize))
93 }
94
95 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
96 // container.
97 type AzureBlobVolume struct {
98         StorageAccountName    string
99         StorageAccountKeyFile string
100         ContainerName         string
101         AzureReplication      int
102         ReadOnly              bool
103         RequestTimeout        arvados.Duration
104
105         azClient storage.Client
106         bsClient storage.BlobStorageClient
107 }
108
109 // Examples implements VolumeWithExamples.
110 func (*AzureBlobVolume) Examples() []Volume {
111         return []Volume{
112                 &AzureBlobVolume{
113                         StorageAccountName:    "example-account-name",
114                         StorageAccountKeyFile: "/etc/azure_storage_account_key.txt",
115                         ContainerName:         "example-container-name",
116                         AzureReplication:      3,
117                         RequestTimeout:        azureDefaultRequestTimeout,
118                 },
119         }
120 }
121
122 // Type implements Volume.
123 func (v *AzureBlobVolume) Type() string {
124         return "Azure"
125 }
126
127 // Start implements Volume.
128 func (v *AzureBlobVolume) Start() error {
129         if v.ContainerName == "" {
130                 return errors.New("no container name given")
131         }
132         if v.StorageAccountName == "" || v.StorageAccountKeyFile == "" {
133                 return errors.New("StorageAccountName and StorageAccountKeyFile must be given")
134         }
135         accountKey, err := readKeyFromFile(v.StorageAccountKeyFile)
136         if err != nil {
137                 return err
138         }
139         v.azClient, err = storage.NewBasicClient(v.StorageAccountName, accountKey)
140         if err != nil {
141                 return fmt.Errorf("creating Azure storage client: %s", err)
142         }
143
144         if v.RequestTimeout == 0 {
145                 v.RequestTimeout = azureDefaultRequestTimeout
146         }
147         v.azClient.HTTPClient = &http.Client{
148                 Timeout: time.Duration(v.RequestTimeout),
149         }
150         v.bsClient = v.azClient.GetBlobService()
151
152         ok, err := v.bsClient.ContainerExists(v.ContainerName)
153         if err != nil {
154                 return err
155         }
156         if !ok {
157                 return fmt.Errorf("Azure container %q does not exist", v.ContainerName)
158         }
159         return nil
160 }
161
162 // Return true if expires_at metadata attribute is found on the block
163 func (v *AzureBlobVolume) checkTrashed(loc string) (bool, map[string]string, error) {
164         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
165         if err != nil {
166                 return false, metadata, v.translateError(err)
167         }
168         if metadata["expires_at"] != "" {
169                 return true, metadata, nil
170         }
171         return false, metadata, nil
172 }
173
174 // Get reads a Keep block that has been stored as a block blob in the
175 // container.
176 //
177 // If the block is younger than azureWriteRaceInterval and is
178 // unexpectedly empty, assume a PutBlob operation is in progress, and
179 // wait for it to finish writing.
180 func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int, error) {
181         trashed, _, err := v.checkTrashed(loc)
182         if err != nil {
183                 return 0, err
184         }
185         if trashed {
186                 return 0, os.ErrNotExist
187         }
188         var deadline time.Time
189         haveDeadline := false
190         size, err := v.get(loc, buf)
191         for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
192                 // Seeing a brand new empty block probably means we're
193                 // in a race with CreateBlob, which under the hood
194                 // (apparently) does "CreateEmpty" and "CommitData"
195                 // with no additional transaction locking.
196                 if !haveDeadline {
197                         t, err := v.Mtime(loc)
198                         if err != nil {
199                                 log.Print("Got empty block (possible race) but Mtime failed: ", err)
200                                 break
201                         }
202                         deadline = t.Add(azureWriteRaceInterval)
203                         if time.Now().After(deadline) {
204                                 break
205                         }
206                         log.Printf("Race? Block %s is 0 bytes, %s old. Polling until %s", loc, time.Since(t), deadline)
207                         haveDeadline = true
208                 } else if time.Now().After(deadline) {
209                         break
210                 }
211                 time.Sleep(azureWriteRacePollTime)
212                 size, err = v.get(loc, buf)
213         }
214         if haveDeadline {
215                 log.Printf("Race ended with size==%d", size)
216         }
217         return size, err
218 }
219
220 func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
221         expectSize := len(buf)
222         if azureMaxGetBytes < BlockSize {
223                 // Unfortunately the handler doesn't tell us how long the blob
224                 // is expected to be, so we have to ask Azure.
225                 props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
226                 if err != nil {
227                         return 0, v.translateError(err)
228                 }
229                 if props.ContentLength > int64(BlockSize) || props.ContentLength < 0 {
230                         return 0, fmt.Errorf("block %s invalid size %d (max %d)", loc, props.ContentLength, BlockSize)
231                 }
232                 expectSize = int(props.ContentLength)
233         }
234
235         if expectSize == 0 {
236                 return 0, nil
237         }
238
239         // We'll update this actualSize if/when we get the last piece.
240         actualSize := -1
241         pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
242         errors := make([]error, pieces)
243         var wg sync.WaitGroup
244         wg.Add(pieces)
245         for p := 0; p < pieces; p++ {
246                 go func(p int) {
247                         defer wg.Done()
248                         startPos := p * azureMaxGetBytes
249                         endPos := startPos + azureMaxGetBytes
250                         if endPos > expectSize {
251                                 endPos = expectSize
252                         }
253                         var rdr io.ReadCloser
254                         var err error
255                         if startPos == 0 && endPos == expectSize {
256                                 rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
257                         } else {
258                                 rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
259                         }
260                         if err != nil {
261                                 errors[p] = err
262                                 return
263                         }
264                         defer rdr.Close()
265                         n, err := io.ReadFull(rdr, buf[startPos:endPos])
266                         if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
267                                 // If we don't know the actual size,
268                                 // and just tried reading 64 MiB, it's
269                                 // normal to encounter EOF.
270                         } else if err != nil {
271                                 errors[p] = err
272                         }
273                         if p == pieces-1 {
274                                 actualSize = startPos + n
275                         }
276                 }(p)
277         }
278         wg.Wait()
279         for _, err := range errors {
280                 if err != nil {
281                         return 0, v.translateError(err)
282                 }
283         }
284         return actualSize, nil
285 }
286
287 // Compare the given data with existing stored data.
288 func (v *AzureBlobVolume) Compare(loc string, expect []byte) error {
289         trashed, _, err := v.checkTrashed(loc)
290         if err != nil {
291                 return err
292         }
293         if trashed {
294                 return os.ErrNotExist
295         }
296         rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
297         if err != nil {
298                 return v.translateError(err)
299         }
300         defer rdr.Close()
301         return compareReaderWithBuf(rdr, expect, loc[:32])
302 }
303
304 // Put stores a Keep block as a block blob in the container.
305 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
306         if v.ReadOnly {
307                 return MethodDisabledError
308         }
309         return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
310 }
311
312 // Touch updates the last-modified property of a block blob.
313 func (v *AzureBlobVolume) Touch(loc string) error {
314         if v.ReadOnly {
315                 return MethodDisabledError
316         }
317         trashed, metadata, err := v.checkTrashed(loc)
318         if err != nil {
319                 return err
320         }
321         if trashed {
322                 return os.ErrNotExist
323         }
324
325         metadata["touch"] = fmt.Sprintf("%d", time.Now())
326         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
327 }
328
329 // Mtime returns the last-modified property of a block blob.
330 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
331         trashed, _, err := v.checkTrashed(loc)
332         if err != nil {
333                 return time.Time{}, err
334         }
335         if trashed {
336                 return time.Time{}, os.ErrNotExist
337         }
338
339         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
340         if err != nil {
341                 return time.Time{}, err
342         }
343         return time.Parse(time.RFC1123, props.LastModified)
344 }
345
346 // IndexTo writes a list of Keep blocks that are stored in the
347 // container.
348 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
349         params := storage.ListBlobsParameters{
350                 Prefix:  prefix,
351                 Include: "metadata",
352         }
353         for {
354                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
355                 if err != nil {
356                         return err
357                 }
358                 for _, b := range resp.Blobs {
359                         t, err := time.Parse(time.RFC1123, b.Properties.LastModified)
360                         if err != nil {
361                                 return err
362                         }
363                         if !v.isKeepBlock(b.Name) {
364                                 continue
365                         }
366                         if b.Properties.ContentLength == 0 && t.Add(azureWriteRaceInterval).After(time.Now()) {
367                                 // A new zero-length blob is probably
368                                 // just a new non-empty blob that
369                                 // hasn't committed its data yet (see
370                                 // Get()), and in any case has no
371                                 // value.
372                                 continue
373                         }
374                         if b.Metadata["expires_at"] != "" {
375                                 // Trashed blob; exclude it from response
376                                 continue
377                         }
378                         fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
379                 }
380                 if resp.NextMarker == "" {
381                         return nil
382                 }
383                 params.Marker = resp.NextMarker
384         }
385 }
386
387 // Trash a Keep block.
388 func (v *AzureBlobVolume) Trash(loc string) error {
389         if v.ReadOnly {
390                 return MethodDisabledError
391         }
392
393         // Ideally we would use If-Unmodified-Since, but that
394         // particular condition seems to be ignored by Azure. Instead,
395         // we get the Etag before checking Mtime, and use If-Match to
396         // ensure we don't delete data if Put() or Touch() happens
397         // between our calls to Mtime() and DeleteBlob().
398         props, err := v.bsClient.GetBlobProperties(v.ContainerName, loc)
399         if err != nil {
400                 return err
401         }
402         if t, err := v.Mtime(loc); err != nil {
403                 return err
404         } else if time.Since(t) < theConfig.BlobSignatureTTL.Duration() {
405                 return nil
406         }
407
408         // If TrashLifetime == 0, just delete it
409         if theConfig.TrashLifetime == 0 {
410                 return v.bsClient.DeleteBlob(v.ContainerName, loc, map[string]string{
411                         "If-Match": props.Etag,
412                 })
413         }
414
415         // Otherwise, mark as trash
416         return v.bsClient.SetBlobMetadata(v.ContainerName, loc, map[string]string{
417                 "expires_at": fmt.Sprintf("%d", time.Now().Add(theConfig.TrashLifetime.Duration()).Unix()),
418         }, map[string]string{
419                 "If-Match": props.Etag,
420         })
421 }
422
423 // Untrash a Keep block.
424 // Delete the expires_at metadata attribute
425 func (v *AzureBlobVolume) Untrash(loc string) error {
426         // if expires_at does not exist, return NotFoundError
427         metadata, err := v.bsClient.GetBlobMetadata(v.ContainerName, loc)
428         if err != nil {
429                 return v.translateError(err)
430         }
431         if metadata["expires_at"] == "" {
432                 return os.ErrNotExist
433         }
434
435         // reset expires_at metadata attribute
436         metadata["expires_at"] = ""
437         err = v.bsClient.SetBlobMetadata(v.ContainerName, loc, metadata, nil)
438         return v.translateError(err)
439 }
440
441 // Status returns a VolumeStatus struct with placeholder data.
442 func (v *AzureBlobVolume) Status() *VolumeStatus {
443         return &VolumeStatus{
444                 DeviceNum: 1,
445                 BytesFree: BlockSize * 1000,
446                 BytesUsed: 1,
447         }
448 }
449
450 // String returns a volume label, including the container name.
451 func (v *AzureBlobVolume) String() string {
452         return fmt.Sprintf("azure-storage-container:%+q", v.ContainerName)
453 }
454
455 // Writable returns true, unless the -readonly flag was on when the
456 // volume was added.
457 func (v *AzureBlobVolume) Writable() bool {
458         return !v.ReadOnly
459 }
460
461 // Replication returns the replication level of the container, as
462 // specified by the -azure-storage-replication argument.
463 func (v *AzureBlobVolume) Replication() int {
464         return v.AzureReplication
465 }
466
467 // If possible, translate an Azure SDK error to a recognizable error
468 // like os.ErrNotExist.
469 func (v *AzureBlobVolume) translateError(err error) error {
470         switch {
471         case err == nil:
472                 return err
473         case strings.Contains(err.Error(), "Not Found"):
474                 // "storage: service returned without a response body (404 Not Found)"
475                 return os.ErrNotExist
476         default:
477                 return err
478         }
479 }
480
481 var keepBlockRegexp = regexp.MustCompile(`^[0-9a-f]{32}$`)
482
483 func (v *AzureBlobVolume) isKeepBlock(s string) bool {
484         return keepBlockRegexp.MatchString(s)
485 }
486
487 // EmptyTrash looks for trashed blocks that exceeded TrashLifetime
488 // and deletes them from the volume.
489 func (v *AzureBlobVolume) EmptyTrash() {
490         var bytesDeleted, bytesInTrash int64
491         var blocksDeleted, blocksInTrash int
492         params := storage.ListBlobsParameters{Include: "metadata"}
493
494         for {
495                 resp, err := v.bsClient.ListBlobs(v.ContainerName, params)
496                 if err != nil {
497                         log.Printf("EmptyTrash: ListBlobs: %v", err)
498                         break
499                 }
500                 for _, b := range resp.Blobs {
501                         // Check if the block is expired
502                         if b.Metadata["expires_at"] == "" {
503                                 continue
504                         }
505
506                         blocksInTrash++
507                         bytesInTrash += b.Properties.ContentLength
508
509                         expiresAt, err := strconv.ParseInt(b.Metadata["expires_at"], 10, 64)
510                         if err != nil {
511                                 log.Printf("EmptyTrash: ParseInt(%v): %v", b.Metadata["expires_at"], err)
512                                 continue
513                         }
514
515                         if expiresAt > time.Now().Unix() {
516                                 continue
517                         }
518
519                         err = v.bsClient.DeleteBlob(v.ContainerName, b.Name, map[string]string{
520                                 "If-Match": b.Properties.Etag,
521                         })
522                         if err != nil {
523                                 log.Printf("EmptyTrash: DeleteBlob(%v): %v", b.Name, err)
524                                 continue
525                         }
526                         blocksDeleted++
527                         bytesDeleted += b.Properties.ContentLength
528                 }
529                 if resp.NextMarker == "" {
530                         break
531                 }
532                 params.Marker = resp.NextMarker
533         }
534
535         log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
536 }