Go SDK improvements:
[arvados.git] / sdk / go / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "crypto/md5"
6         "crypto/tls"
7         "errors"
8         "fmt"
9         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
10         "git.curoverse.com/arvados.git/sdk/go/streamer"
11         "io"
12         "io/ioutil"
13         "log"
14         "net/http"
15         "os"
16         "regexp"
17         "strings"
18         "sync"
19         "sync/atomic"
20         "time"
21         "unsafe"
22 )
23
24 // A Keep "block" is 64MB.
25 const BLOCKSIZE = 64 * 1024 * 1024
26
27 var BlockNotFound = errors.New("Block not found")
28 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
29 var OversizeBlockError = errors.New("Block too big")
30 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
31 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
32
33 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
34 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
35
36 // Information about Arvados and Keep servers.
37 type KeepClient struct {
38         Arvados       *arvadosclient.ArvadosClient
39         Want_replicas int
40         Using_proxy   bool
41         service_roots *map[string]string
42         lock          sync.Mutex
43         Client        *http.Client
44 }
45
46 // Create a new KeepClient.  This will contact the API server to discover Keep
47 // servers.
48 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
49         var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
50         insecure := matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
51         kc = KeepClient{
52                 Arvados:       arv,
53                 Want_replicas: 2,
54                 Using_proxy:   false,
55                 Client: &http.Client{Transport: &http.Transport{
56                         TLSClientConfig: &tls.Config{InsecureSkipVerify: insecure}}},
57         }
58         _, err = (&kc).DiscoverKeepServers()
59
60         return kc, err
61 }
62
63 // Put a block given the block hash, a reader with the block data, and the
64 // expected length of that data.  The desired number of replicas is given in
65 // KeepClient.Want_replicas.  Returns the number of replicas that were written
66 // and if there was an error.  Note this will return InsufficientReplias
67 // whenever 0 <= replicas < this.Wants_replicas.
68 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
69
70         // Buffer for reads from 'r'
71         var bufsize int
72         if expectedLength > 0 {
73                 if expectedLength > BLOCKSIZE {
74                         return "", 0, OversizeBlockError
75                 }
76                 bufsize = int(expectedLength)
77         } else {
78                 bufsize = BLOCKSIZE
79         }
80
81         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
82         defer t.Close()
83
84         return this.putReplicas(hash, t, expectedLength)
85 }
86
87 // Put a block given the block hash and a byte buffer.  The desired number of
88 // replicas is given in KeepClient.Want_replicas.  Returns the number of
89 // replicas that were written and if there was an error.  Note this will return
90 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
91 func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
92         t := streamer.AsyncStreamFromSlice(buf)
93         defer t.Close()
94
95         return this.putReplicas(hash, t, int64(len(buf)))
96 }
97
98 // Put a block given a buffer.  The hash will be computed.  The desired number
99 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
100 // replicas that were written and if there was an error.  Note this will return
101 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
102 func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
103         hash := fmt.Sprintf("%x", md5.Sum(buffer))
104         return this.PutHB(hash, buffer)
105 }
106
107 // Put a block, given a Reader.  This will read the entire reader into a buffer
108 // to compute the hash.  The desired number of replicas is given in
109 // KeepClient.Want_replicas.  Returns the number of replicas that were written
110 // and if there was an error.  Note this will return InsufficientReplias
111 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
112 // hash and data size are available, PutHR() is more efficient.
113 func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
114         if buffer, err := ioutil.ReadAll(r); err != nil {
115                 return "", 0, err
116         } else {
117                 return this.PutB(buffer)
118         }
119 }
120
121 // Get a block given a hash.  Return a reader, the expected data length, the
122 // URL the block was fetched from, and if there was an error.  If the block
123 // checksum does not match, the final Read() on the reader returned by this
124 // method will return a BadChecksum error instead of EOF.
125 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
126         contentLength int64, url string, err error) {
127         return this.AuthorizedGet(hash, "", "")
128 }
129
130 // Get a block given a hash, with additional authorization provided by
131 // signature and timestamp.  Return a reader, the expected data length, the URL
132 // the block was fetched from, and if there was an error.  If the block
133 // checksum does not match, the final Read() on the reader returned by this
134 // method will return a BadChecksum error instead of EOF.
135 func (this KeepClient) AuthorizedGet(hash string,
136         signature string,
137         timestamp string) (reader io.ReadCloser,
138         contentLength int64, url string, err error) {
139
140         // Take the hash of locator and timestamp in order to identify this
141         // specific transaction in log statements.
142         requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
143
144         // Calculate the ordering for asking servers
145         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
146
147         for _, host := range sv {
148                 var req *http.Request
149                 var err error
150                 var url string
151                 if signature != "" {
152                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
153                                 signature, timestamp)
154                 } else {
155                         url = fmt.Sprintf("%s/%s", host, hash)
156                 }
157                 if req, err = http.NewRequest("GET", url, nil); err != nil {
158                         continue
159                 }
160
161                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
162
163                 log.Printf("[%v] Begin download %s", requestId, url)
164
165                 var resp *http.Response
166                 if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
167                         statusCode := -1
168                         var respbody []byte
169                         if resp != nil {
170                                 statusCode = resp.StatusCode
171                                 if resp.Body != nil {
172                                         respbody, _ = ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096})
173                                 }
174                         }
175                         response := strings.TrimSpace(string(respbody))
176                         log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
177                                 requestId, url, statusCode, err, response)
178                         continue
179                 }
180
181                 if resp.StatusCode == http.StatusOK {
182                         log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
183                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
184                 }
185         }
186
187         return nil, 0, "", BlockNotFound
188 }
189
190 // Determine if a block with the given hash is available and readable, but does
191 // not return the block contents.
192 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
193         return this.AuthorizedAsk(hash, "", "")
194 }
195
196 // Determine if a block with the given hash is available and readable with the
197 // given signature and timestamp, but does not return the block contents.
198 func (this KeepClient) AuthorizedAsk(hash string, signature string,
199         timestamp string) (contentLength int64, url string, err error) {
200         // Calculate the ordering for asking servers
201         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
202
203         for _, host := range sv {
204                 var req *http.Request
205                 var err error
206                 if signature != "" {
207                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
208                                 signature, timestamp)
209                 } else {
210                         url = fmt.Sprintf("%s/%s", host, hash)
211                 }
212
213                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
214                         continue
215                 }
216
217                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
218
219                 var resp *http.Response
220                 if resp, err = this.Client.Do(req); err != nil {
221                         continue
222                 }
223
224                 if resp.StatusCode == http.StatusOK {
225                         return resp.ContentLength, url, nil
226                 }
227         }
228
229         return 0, "", BlockNotFound
230
231 }
232
233 // Atomically read the service_roots field.
234 func (this *KeepClient) ServiceRoots() map[string]string {
235         r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
236         return *r
237 }
238
239 // Atomically update the service_roots field.  Enables you to update
240 // service_roots without disrupting any GET or PUT operations that might
241 // already be in progress.
242 func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
243         roots := make(map[string]string)
244         for uuid, root := range new_roots {
245                 roots[uuid] = root
246         }
247         atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
248                 unsafe.Pointer(&roots))
249 }
250
251 type Locator struct {
252         Hash      string
253         Size      int
254         Signature string
255         Timestamp string
256 }
257
258 func MakeLocator2(hash string, hints string) (locator Locator) {
259         locator.Hash = hash
260         if hints != "" {
261                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
262                 for _, hint := range strings.Split(hints, "+") {
263                         if hint != "" {
264                                 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
265                                         fmt.Sscanf(hint, "%d", &locator.Size)
266                                 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
267                                         locator.Signature = m[1]
268                                         locator.Timestamp = m[2]
269                                 } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
270                                         // Any unknown hint that starts with an uppercase letter is
271                                         // presumed to be valid and ignored, to permit forward compatibility.
272                                 } else {
273                                         // Unknown format; not a valid locator.
274                                         return Locator{"", 0, "", ""}
275                                 }
276                         }
277                 }
278         }
279         return locator
280 }
281
282 func MakeLocator(path string) Locator {
283         pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
284         if err != nil {
285                 log.Print("Don't like regexp", err)
286         }
287
288         sm := pathpattern.FindStringSubmatch(path)
289         if sm == nil {
290                 log.Print("Failed match ", path)
291                 return Locator{"", 0, "", ""}
292         }
293
294         return MakeLocator2(sm[1], sm[2])
295 }