7241: Add Put() and Check()
[arvados.git] / services / keepstore / azure_blob_volume.go
1 package main
2
3 import (
4         "errors"
5         "flag"
6         "fmt"
7         "io"
8         "io/ioutil"
9         "log"
10         "strings"
11         "time"
12
13         "github.com/Azure/azure-sdk-for-go/storage"
14 )
15
16 var (
17         azureStorageAccountName    string
18         azureStorageAccountKeyFile string
19 )
20
21 type azureVolumeAdder struct {
22         *volumeSet
23 }
24
25 func (s *azureVolumeAdder) Set(containerName string) error {
26         if containerName == "" {
27                 return errors.New("no container name given")
28         }
29         buf, err := ioutil.ReadFile(azureStorageAccountKeyFile)
30         if err != nil {
31                 return errors.New("reading key from " + azureStorageAccountKeyFile + ": " + err.Error())
32         }
33         accountKey := strings.TrimSpace(string(buf))
34         if accountKey == "" {
35                 return errors.New("empty account key in " + azureStorageAccountKeyFile)
36         }
37         azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
38         if err != nil {
39                 return errors.New("creating Azure storage client: " + err.Error())
40         }
41         if flagSerializeIO {
42                 log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
43         }
44         v := NewAzureBlobVolume(azClient, containerName, flagReadonly)
45         if err := v.Check(); err != nil {
46                 return err
47         }
48         *s.volumeSet = append(*s.volumeSet, v)
49         return nil
50 }
51
52 func init() {
53         flag.Var(&azureVolumeAdder{&volumes},
54                 "azure-storage-container-volume",
55                 "Use the given container as a storage volume. Can be given multiple times.")
56         flag.StringVar(
57                 &azureStorageAccountName,
58                 "azure-storage-account-name",
59                 "",
60                 "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
61         flag.StringVar(
62                 &azureStorageAccountKeyFile,
63                 "azure-storage-account-key-file",
64                 "",
65                 "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
66 }
67
68 // An AzureBlobVolume stores and retrieves blocks in an Azure Blob
69 // container.
70 type AzureBlobVolume struct {
71         azClient      storage.Client
72         bsClient      storage.BlobStorageClient
73         containerName string
74         readonly      bool
75 }
76
77 func NewAzureBlobVolume(client storage.Client, containerName string, readonly bool) *AzureBlobVolume {
78         return &AzureBlobVolume{
79                 azClient: client,
80                 bsClient: client.GetBlobService(),
81                 containerName: containerName,
82                 readonly: readonly,
83         }
84 }
85
86 // Check returns nil if the volume is usable.
87 func (v *AzureBlobVolume) Check() error {
88         ok, err := v.bsClient.ContainerExists(v.containerName)
89         if err != nil {
90                 return err
91         }
92         if !ok {
93                 return errors.New("container does not exist")
94         }
95         return nil
96 }
97
98 func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
99         rdr, err := v.bsClient.GetBlob(v.containerName, loc)
100         if err != nil {
101                 return nil, err
102         }
103         defer rdr.Close()
104         buf := bufs.Get(BlockSize)
105         n, err := io.ReadFull(rdr, buf)
106         switch err {
107         case io.EOF, io.ErrUnexpectedEOF:
108                 return buf[:n], nil
109         default:
110                 bufs.Put(buf)
111                 return nil, err
112         }
113 }
114
115 func (v *AzureBlobVolume) Compare(loc string, data []byte) error {
116         return NotFoundError
117 }
118
119 func (v *AzureBlobVolume) Put(loc string, block []byte) error {
120         if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil {
121                 return err
122         }
123         // We use the same block ID, base64("0")=="MA==", for everything.
124         if err := v.bsClient.PutBlock(v.containerName, loc, "MA==", block); err != nil {
125                 return err
126         }
127         return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusUncommitted}})
128 }
129
130 func (v *AzureBlobVolume) Touch(loc string) error {
131         return NotFoundError
132 }
133
134 func (v *AzureBlobVolume) Mtime(loc string) (time.Time, error) {
135         return time.Time{}, NotFoundError
136 }
137
138 func (v *AzureBlobVolume) IndexTo(prefix string, writer io.Writer) error {
139         return nil
140 }
141
142 func (v *AzureBlobVolume) Delete(loc string) error {
143         return NotFoundError
144 }
145
146 func (v *AzureBlobVolume) Status() *VolumeStatus {
147         return &VolumeStatus{
148                 DeviceNum: 1,
149                 BytesFree: BlockSize * 1000,
150                 BytesUsed: 1,
151         }
152 }
153
154 func (v *AzureBlobVolume) String() string {
155         return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
156 }
157
158 func (v *AzureBlobVolume) Writable() bool {
159         return !v.readonly
160 }