91989bd4794fcde574a9c0849e9979ad0dc86cb5
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "crypto/md5"
7         "crypto/tls"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "os"
14 )
15
16 // A Keep "block" is 64MB.
17 const BLOCKSIZE = 64 * 1024 * 1024
18
19 var BlockNotFound = errors.New("Block not found")
20 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
21 var OversizeBlockError = errors.New("Block too big")
22
23 // Information about Arvados and Keep servers.
24 type KeepClient struct {
25         ApiServer     string
26         ApiToken      string
27         ApiInsecure   bool
28         Service_roots []string
29         Want_replicas int
30         Client        *http.Client
31         Using_proxy   bool
32         External      bool
33 }
34
35 // Create a new KeepClient, initialized with standard Arvados environment
36 // variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
37 // ARVADOS_API_HOST_INSECURE.  This will contact the API server to discover
38 // Keep servers.
39 func MakeKeepClient() (kc KeepClient, err error) {
40         insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
41         external := (os.Getenv("ARVADOS_EXTERNAL_CLIENT") == "true")
42
43         kc = KeepClient{
44                 ApiServer:     os.Getenv("ARVADOS_API_HOST"),
45                 ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
46                 ApiInsecure:   insecure,
47                 Want_replicas: 2,
48                 Client: &http.Client{Transport: &http.Transport{
49                         TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
50                 Using_proxy: false,
51                 External:    external}
52
53         err = (&kc).discoverKeepServers()
54
55         return kc, err
56 }
57
58 // Put a block given the block hash, a reader with the block data, and the
59 // expected length of that data.  The desired number of replicas is given in
60 // KeepClient.Want_replicas.  Returns the number of replicas that were written
61 // and if there was an error.  Note this will return InsufficientReplias
62 // whenever 0 <= replicas < this.Wants_replicas.
63 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
64
65         // Buffer for reads from 'r'
66         var bufsize int
67         if expectedLength > 0 {
68                 if expectedLength > BLOCKSIZE {
69                         return 0, OversizeBlockError
70                 }
71                 bufsize = int(expectedLength)
72         } else {
73                 bufsize = BLOCKSIZE
74         }
75
76         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
77         defer t.Close()
78
79         return this.putReplicas(hash, t, expectedLength)
80 }
81
82 // Put a block given the block hash and a byte buffer.  The desired number of
83 // replicas is given in KeepClient.Want_replicas.  Returns the number of
84 // replicas that were written and if there was an error.  Note this will return
85 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
86 func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
87         t := streamer.AsyncStreamFromSlice(buf)
88         defer t.Close()
89
90         return this.putReplicas(hash, t, int64(len(buf)))
91 }
92
93 // Put a block given a buffer.  The hash will be computed.  The desired number
94 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
95 // replicas that were written and if there was an error.  Note this will return
96 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
97 func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
98         hash = fmt.Sprintf("%x", md5.Sum(buffer))
99         replicas, err = this.PutHB(hash, buffer)
100         return hash, replicas, err
101 }
102
103 // Put a block, given a Reader.  This will read the entire reader into a buffer
104 // to computed the hash.  The desired number of replicas is given in
105 // KeepClient.Want_replicas.  Returns the number of replicas that were written
106 // and if there was an error.  Note this will return InsufficientReplias
107 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
108 // hash and data size are available, PutHR() is more efficient.
109 func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
110         if buffer, err := ioutil.ReadAll(r); err != nil {
111                 return "", 0, err
112         } else {
113                 return this.PutB(buffer)
114         }
115 }
116
117 // Get a block given a hash.  Return a reader, the expected data length, the
118 // URL the block was fetched from, and if there was an error.  If the block
119 // checksum does not match, the final Read() on the reader returned by this
120 // method will return a BadChecksum error instead of EOF.
121 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
122         contentLength int64, url string, err error) {
123         return this.AuthorizedGet(hash, "", "")
124 }
125
126 // Get a block given a hash, with additional authorization provided by
127 // signature and timestamp.  Return a reader, the expected data length, the URL
128 // the block was fetched from, and if there was an error.  If the block
129 // checksum does not match, the final Read() on the reader returned by this
130 // method will return a BadChecksum error instead of EOF.
131 func (this KeepClient) AuthorizedGet(hash string,
132         signature string,
133         timestamp string) (reader io.ReadCloser,
134         contentLength int64, url string, err error) {
135
136         // Calculate the ordering for asking servers
137         sv := this.shuffledServiceRoots(hash)
138
139         for _, host := range sv {
140                 var req *http.Request
141                 var err error
142                 var url string
143                 if signature != "" {
144                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
145                                 signature, timestamp)
146                 } else {
147                         url = fmt.Sprintf("%s/%s", host, hash)
148                 }
149                 if req, err = http.NewRequest("GET", url, nil); err != nil {
150                         continue
151                 }
152
153                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
154
155                 var resp *http.Response
156                 if resp, err = this.Client.Do(req); err != nil {
157                         continue
158                 }
159
160                 if resp.StatusCode == http.StatusOK {
161                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
162                 }
163         }
164
165         return nil, 0, "", BlockNotFound
166 }
167
168 // Determine if a block with the given hash is available and readable, but does
169 // not return the block contents.
170 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
171         return this.AuthorizedAsk(hash, "", "")
172 }
173
174 // Determine if a block with the given hash is available and readable with the
175 // given signature and timestamp, but does not return the block contents.
176 func (this KeepClient) AuthorizedAsk(hash string, signature string,
177         timestamp string) (contentLength int64, url string, err error) {
178         // Calculate the ordering for asking servers
179         sv := this.shuffledServiceRoots(hash)
180
181         for _, host := range sv {
182                 var req *http.Request
183                 var err error
184                 if signature != "" {
185                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
186                                 signature, timestamp)
187                 } else {
188                         url = fmt.Sprintf("%s/%s", host, hash)
189                 }
190
191                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
192                         continue
193                 }
194
195                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
196
197                 var resp *http.Response
198                 if resp, err = this.Client.Do(req); err != nil {
199                         continue
200                 }
201
202                 if resp.StatusCode == http.StatusOK {
203                         return resp.ContentLength, url, nil
204                 }
205         }
206
207         return 0, "", BlockNotFound
208
209 }