Merge branch '1885-keep-proxy' closes #1885
[arvados.git] / sdk / go / src / arvados.org / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "arvados.org/streamer"
6         "crypto/md5"
7         "crypto/tls"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "net/http"
13         "os"
14         "sort"
15         "sync"
16         "sync/atomic"
17         "unsafe"
18 )
19
20 // A Keep "block" is 64MB.
21 const BLOCKSIZE = 64 * 1024 * 1024
22
23 var BlockNotFound = errors.New("Block not found")
24 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
25 var OversizeBlockError = errors.New("Block too big")
26 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
27 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
28
29 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
30 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
31
32 // Information about Arvados and Keep servers.
33 type KeepClient struct {
34         ApiServer     string
35         ApiToken      string
36         ApiInsecure   bool
37         Want_replicas int
38         Client        *http.Client
39         Using_proxy   bool
40         External      bool
41         service_roots *[]string
42         lock          sync.Mutex
43 }
44
45 // Create a new KeepClient, initialized with standard Arvados environment
46 // variables ARVADOS_API_HOST, ARVADOS_API_TOKEN, and (optionally)
47 // ARVADOS_API_HOST_INSECURE.  This will contact the API server to discover
48 // Keep servers.
49 func MakeKeepClient() (kc KeepClient, err error) {
50         insecure := (os.Getenv("ARVADOS_API_HOST_INSECURE") == "true")
51         external := (os.Getenv("ARVADOS_EXTERNAL_CLIENT") == "true")
52
53         kc = KeepClient{
54                 ApiServer:     os.Getenv("ARVADOS_API_HOST"),
55                 ApiToken:      os.Getenv("ARVADOS_API_TOKEN"),
56                 ApiInsecure:   insecure,
57                 Want_replicas: 2,
58                 Client: &http.Client{Transport: &http.Transport{
59                         TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
60                 Using_proxy: false,
61                 External:    external}
62
63         if os.Getenv("ARVADOS_API_HOST") == "" {
64                 return kc, MissingArvadosApiHost
65         }
66         if os.Getenv("ARVADOS_API_TOKEN") == "" {
67                 return kc, MissingArvadosApiToken
68         }
69
70         err = (&kc).DiscoverKeepServers()
71
72         return kc, err
73 }
74
75 // Put a block given the block hash, a reader with the block data, and the
76 // expected length of that data.  The desired number of replicas is given in
77 // KeepClient.Want_replicas.  Returns the number of replicas that were written
78 // and if there was an error.  Note this will return InsufficientReplias
79 // whenever 0 <= replicas < this.Wants_replicas.
80 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (replicas int, err error) {
81
82         // Buffer for reads from 'r'
83         var bufsize int
84         if expectedLength > 0 {
85                 if expectedLength > BLOCKSIZE {
86                         return 0, OversizeBlockError
87                 }
88                 bufsize = int(expectedLength)
89         } else {
90                 bufsize = BLOCKSIZE
91         }
92
93         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
94         defer t.Close()
95
96         return this.putReplicas(hash, t, expectedLength)
97 }
98
99 // Put a block given the block hash and a byte buffer.  The desired number of
100 // replicas is given in KeepClient.Want_replicas.  Returns the number of
101 // replicas that were written and if there was an error.  Note this will return
102 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
103 func (this KeepClient) PutHB(hash string, buf []byte) (replicas int, err error) {
104         t := streamer.AsyncStreamFromSlice(buf)
105         defer t.Close()
106
107         return this.putReplicas(hash, t, int64(len(buf)))
108 }
109
110 // Put a block given a buffer.  The hash will be computed.  The desired number
111 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
112 // replicas that were written and if there was an error.  Note this will return
113 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
114 func (this KeepClient) PutB(buffer []byte) (hash string, replicas int, err error) {
115         hash = fmt.Sprintf("%x", md5.Sum(buffer))
116         replicas, err = this.PutHB(hash, buffer)
117         return hash, replicas, err
118 }
119
120 // Put a block, given a Reader.  This will read the entire reader into a buffer
121 // to computed the hash.  The desired number of replicas is given in
122 // KeepClient.Want_replicas.  Returns the number of replicas that were written
123 // and if there was an error.  Note this will return InsufficientReplias
124 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
125 // hash and data size are available, PutHR() is more efficient.
126 func (this KeepClient) PutR(r io.Reader) (hash string, replicas int, err error) {
127         if buffer, err := ioutil.ReadAll(r); err != nil {
128                 return "", 0, err
129         } else {
130                 return this.PutB(buffer)
131         }
132 }
133
134 // Get a block given a hash.  Return a reader, the expected data length, the
135 // URL the block was fetched from, and if there was an error.  If the block
136 // checksum does not match, the final Read() on the reader returned by this
137 // method will return a BadChecksum error instead of EOF.
138 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
139         contentLength int64, url string, err error) {
140         return this.AuthorizedGet(hash, "", "")
141 }
142
143 // Get a block given a hash, with additional authorization provided by
144 // signature and timestamp.  Return a reader, the expected data length, the URL
145 // the block was fetched from, and if there was an error.  If the block
146 // checksum does not match, the final Read() on the reader returned by this
147 // method will return a BadChecksum error instead of EOF.
148 func (this KeepClient) AuthorizedGet(hash string,
149         signature string,
150         timestamp string) (reader io.ReadCloser,
151         contentLength int64, url string, err error) {
152
153         // Calculate the ordering for asking servers
154         sv := this.shuffledServiceRoots(hash)
155
156         for _, host := range sv {
157                 var req *http.Request
158                 var err error
159                 var url string
160                 if signature != "" {
161                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
162                                 signature, timestamp)
163                 } else {
164                         url = fmt.Sprintf("%s/%s", host, hash)
165                 }
166                 if req, err = http.NewRequest("GET", url, nil); err != nil {
167                         continue
168                 }
169
170                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
171
172                 var resp *http.Response
173                 if resp, err = this.Client.Do(req); err != nil {
174                         continue
175                 }
176
177                 if resp.StatusCode == http.StatusOK {
178                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
179                 }
180         }
181
182         return nil, 0, "", BlockNotFound
183 }
184
185 // Determine if a block with the given hash is available and readable, but does
186 // not return the block contents.
187 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
188         return this.AuthorizedAsk(hash, "", "")
189 }
190
191 // Determine if a block with the given hash is available and readable with the
192 // given signature and timestamp, but does not return the block contents.
193 func (this KeepClient) AuthorizedAsk(hash string, signature string,
194         timestamp string) (contentLength int64, url string, err error) {
195         // Calculate the ordering for asking servers
196         sv := this.shuffledServiceRoots(hash)
197
198         for _, host := range sv {
199                 var req *http.Request
200                 var err error
201                 if signature != "" {
202                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
203                                 signature, timestamp)
204                 } else {
205                         url = fmt.Sprintf("%s/%s", host, hash)
206                 }
207
208                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
209                         continue
210                 }
211
212                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.ApiToken))
213
214                 var resp *http.Response
215                 if resp, err = this.Client.Do(req); err != nil {
216                         continue
217                 }
218
219                 if resp.StatusCode == http.StatusOK {
220                         return resp.ContentLength, url, nil
221                 }
222         }
223
224         return 0, "", BlockNotFound
225
226 }
227
228 // Atomically read the service_roots field.
229 func (this *KeepClient) ServiceRoots() []string {
230         r := (*[]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
231         return *r
232 }
233
234 // Atomically update the service_roots field.  Enables you to update
235 // service_roots without disrupting any GET or PUT operations that might
236 // already be in progress.
237 func (this *KeepClient) SetServiceRoots(svc []string) {
238         // Must be sorted for ShuffledServiceRoots() to produce consistent
239         // results.
240         roots := make([]string, len(svc))
241         copy(roots, svc)
242         sort.Strings(roots)
243         atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
244                 unsafe.Pointer(&roots))
245 }