4621: collate_output pipes to python
[arvados.git] / sdk / go / keepclient / keepclient.go
1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
2 package keepclient
3
4 import (
5         "git.curoverse.com/arvados.git/sdk/go/streamer"
6         "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
7         "crypto/md5"
8         "errors"
9         "fmt"
10         "io"
11         "io/ioutil"
12         "log"
13         "net/http"
14         "regexp"
15         "strings"
16         "sync"
17         "sync/atomic"
18         "unsafe"
19 )
20
21 // A Keep "block" is 64MB.
22 const BLOCKSIZE = 64 * 1024 * 1024
23
24 var BlockNotFound = errors.New("Block not found")
25 var InsufficientReplicasError = errors.New("Could not write sufficient replicas")
26 var OversizeBlockError = errors.New("Block too big")
27 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
28 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
29
30 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
31 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
32
33 // Information about Arvados and Keep servers.
34 type KeepClient struct {
35         Arvados       *arvadosclient.ArvadosClient
36         Want_replicas int
37         Using_proxy   bool
38         service_roots *map[string]string
39         lock          sync.Mutex
40         Client        *http.Client
41 }
42
43 // Create a new KeepClient.  This will contact the API server to discover Keep
44 // servers.
45 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
46         kc = KeepClient{
47                 Arvados:       arv,
48                 Want_replicas: 2,
49                 Using_proxy:   false,
50                 Client:        &http.Client{Transport: &http.Transport{}}}
51
52         err = (&kc).DiscoverKeepServers()
53
54         return kc, err
55 }
56
57 // Put a block given the block hash, a reader with the block data, and the
58 // expected length of that data.  The desired number of replicas is given in
59 // KeepClient.Want_replicas.  Returns the number of replicas that were written
60 // and if there was an error.  Note this will return InsufficientReplias
61 // whenever 0 <= replicas < this.Wants_replicas.
62 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
63
64         // Buffer for reads from 'r'
65         var bufsize int
66         if expectedLength > 0 {
67                 if expectedLength > BLOCKSIZE {
68                         return "", 0, OversizeBlockError
69                 }
70                 bufsize = int(expectedLength)
71         } else {
72                 bufsize = BLOCKSIZE
73         }
74
75         t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
76         defer t.Close()
77
78         return this.putReplicas(hash, t, expectedLength)
79 }
80
81 // Put a block given the block hash and a byte buffer.  The desired number of
82 // replicas is given in KeepClient.Want_replicas.  Returns the number of
83 // replicas that were written and if there was an error.  Note this will return
84 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
85 func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
86         t := streamer.AsyncStreamFromSlice(buf)
87         defer t.Close()
88
89         return this.putReplicas(hash, t, int64(len(buf)))
90 }
91
92 // Put a block given a buffer.  The hash will be computed.  The desired number
93 // of replicas is given in KeepClient.Want_replicas.  Returns the number of
94 // replicas that were written and if there was an error.  Note this will return
95 // InsufficientReplias whenever 0 <= replicas < this.Wants_replicas.
96 func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
97         hash := fmt.Sprintf("%x", md5.Sum(buffer))
98         return this.PutHB(hash, buffer)
99 }
100
101 // Put a block, given a Reader.  This will read the entire reader into a buffer
102 // to compute the hash.  The desired number of replicas is given in
103 // KeepClient.Want_replicas.  Returns the number of replicas that were written
104 // and if there was an error.  Note this will return InsufficientReplias
105 // whenever 0 <= replicas < this.Wants_replicas.  Also nhote that if the block
106 // hash and data size are available, PutHR() is more efficient.
107 func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
108         if buffer, err := ioutil.ReadAll(r); err != nil {
109                 return "", 0, err
110         } else {
111                 return this.PutB(buffer)
112         }
113 }
114
115 // Get a block given a hash.  Return a reader, the expected data length, the
116 // URL the block was fetched from, and if there was an error.  If the block
117 // checksum does not match, the final Read() on the reader returned by this
118 // method will return a BadChecksum error instead of EOF.
119 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
120         contentLength int64, url string, err error) {
121         return this.AuthorizedGet(hash, "", "")
122 }
123
124 // Get a block given a hash, with additional authorization provided by
125 // signature and timestamp.  Return a reader, the expected data length, the URL
126 // the block was fetched from, and if there was an error.  If the block
127 // checksum does not match, the final Read() on the reader returned by this
128 // method will return a BadChecksum error instead of EOF.
129 func (this KeepClient) AuthorizedGet(hash string,
130         signature string,
131         timestamp string) (reader io.ReadCloser,
132         contentLength int64, url string, err error) {
133
134         // Calculate the ordering for asking servers
135         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
136
137         for _, host := range sv {
138                 var req *http.Request
139                 var err error
140                 var url string
141                 if signature != "" {
142                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
143                                 signature, timestamp)
144                 } else {
145                         url = fmt.Sprintf("%s/%s", host, hash)
146                 }
147                 if req, err = http.NewRequest("GET", url, nil); err != nil {
148                         continue
149                 }
150
151                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
152
153                 var resp *http.Response
154                 if resp, err = this.Client.Do(req); err != nil {
155                         continue
156                 }
157
158                 if resp.StatusCode == http.StatusOK {
159                         return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
160                 }
161         }
162
163         return nil, 0, "", BlockNotFound
164 }
165
166 // Determine if a block with the given hash is available and readable, but does
167 // not return the block contents.
168 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
169         return this.AuthorizedAsk(hash, "", "")
170 }
171
172 // Determine if a block with the given hash is available and readable with the
173 // given signature and timestamp, but does not return the block contents.
174 func (this KeepClient) AuthorizedAsk(hash string, signature string,
175         timestamp string) (contentLength int64, url string, err error) {
176         // Calculate the ordering for asking servers
177         sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
178
179         for _, host := range sv {
180                 var req *http.Request
181                 var err error
182                 if signature != "" {
183                         url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
184                                 signature, timestamp)
185                 } else {
186                         url = fmt.Sprintf("%s/%s", host, hash)
187                 }
188
189                 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
190                         continue
191                 }
192
193                 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
194
195                 var resp *http.Response
196                 if resp, err = this.Client.Do(req); err != nil {
197                         continue
198                 }
199
200                 if resp.StatusCode == http.StatusOK {
201                         return resp.ContentLength, url, nil
202                 }
203         }
204
205         return 0, "", BlockNotFound
206
207 }
208
209 // Atomically read the service_roots field.
210 func (this *KeepClient) ServiceRoots() map[string]string {
211         r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
212         return *r
213 }
214
215 // Atomically update the service_roots field.  Enables you to update
216 // service_roots without disrupting any GET or PUT operations that might
217 // already be in progress.
218 func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
219         roots := make(map[string]string)
220         for uuid, root := range new_roots {
221                 roots[uuid] = root
222         }
223         atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
224                 unsafe.Pointer(&roots))
225 }
226
227 type Locator struct {
228         Hash      string
229         Size      int
230         Signature string
231         Timestamp string
232 }
233
234 func MakeLocator2(hash string, hints string) (locator Locator) {
235         locator.Hash = hash
236         if hints != "" {
237                 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
238                 for _, hint := range strings.Split(hints, "+") {
239                         if hint != "" {
240                                 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
241                                         fmt.Sscanf(hint, "%d", &locator.Size)
242                                 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
243                                         locator.Signature = m[1]
244                                         locator.Timestamp = m[2]
245                                 } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
246                                         // Any unknown hint that starts with an uppercase letter is
247                                         // presumed to be valid and ignored, to permit forward compatibility.
248                                 } else {
249                                         // Unknown format; not a valid locator.
250                                         return Locator{"", 0, "", ""}
251                                 }
252                         }
253                 }
254         }
255         return locator
256 }
257
258 func MakeLocator(path string) Locator {
259         pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
260         if err != nil {
261                 log.Print("Don't like regexp", err)
262         }
263
264         sm := pathpattern.FindStringSubmatch(path)
265         if sm == nil {
266                 log.Print("Failed match ", path)
267                 return Locator{"", 0, "", ""}
268         }
269
270         return MakeLocator2(sm[1], sm[2])
271 }