1 /* Provides low-level Get/Put primitives for accessing Arvados Keep blocks. */
8 "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
9 "git.curoverse.com/arvados.git/sdk/go/streamer"
22 // A Keep "block" is at most 64 MiB; BLOCKSIZE is that limit in bytes (PutHR rejects a larger expectedLength).
23 const BLOCKSIZE = 64 * 1024 * 1024
// Sentinel errors returned by the Get/Put primitives in this file.
25 var BlockNotFound = errors.New("Block not found") // AuthorizedGet/AuthorizedAsk: no server answered 200 OK
26 var InsufficientReplicasError = errors.New("Could not write sufficient replicas") // per the Put* docs: fewer replicas written than wanted
27 var OversizeBlockError = errors.New("Block too big") // PutHR: expectedLength > BLOCKSIZE
28 var MissingArvadosApiHost = errors.New("Missing required environment variable ARVADOS_API_HOST")
29 var MissingArvadosApiToken = errors.New("Missing required environment variable ARVADOS_API_TOKEN")
// Header names for the desired and the stored replica counts.
// NOTE(review): presumably exchanged with Keep servers on the PUT path — confirm against putReplicas.
31 const X_Keep_Desired_Replicas = "X-Keep-Desired-Replicas"
32 const X_Keep_Replicas_Stored = "X-Keep-Replicas-Stored"
34 // KeepClient holds information about Arvados and Keep servers used by the Get/Put primitives.
35 type KeepClient struct {
36 Arvados *arvadosclient.ArvadosClient // API client; its ApiToken is sent in the Authorization header of Keep requests
39 service_roots *map[string]string // loaded and stored atomically by ServiceRoots and SetServiceRoots
44 // Create a new KeepClient. This will contact the API server to discover Keep
46 func MakeKeepClient(arv *arvadosclient.ArvadosClient) (kc KeepClient, err error) {
// NOTE(review): a zero-value http.Client has no timeout; consider setting Timeout.
51 Client: &http.Client{},
53 err = (&kc).DiscoverKeepServers() // result is assigned to the named return value err
58 // PutHR stores a block given the block hash, a reader with the block data, and
59 // the expected length of that data. The desired number of replicas is given in
60 // KeepClient.Want_replicas. Returns the locator, the number of replicas that
61 // were written, and an error, if any. Note this will return
62 // InsufficientReplicasError whenever 0 <= replicas < KeepClient.Want_replicas.
63 func (this KeepClient) PutHR(hash string, r io.Reader, expectedLength int64) (locator string, replicas int, err error) {
65 // Buffer for reads from 'r': sized to expectedLength when that is positive.
67 if expectedLength > 0 {
68 if expectedLength > BLOCKSIZE {
69 return "", 0, OversizeBlockError
71 bufsize = int(expectedLength)
// The reader is wrapped in a HashCheckingReader built from r, a fresh MD5
// state and the expected hash (presumably so the streamed data is verified
// against hash — confirm in HashCheckingReader).
76 t := streamer.AsyncStreamFromReader(bufsize, HashCheckingReader{r, md5.New(), hash})
79 return this.putReplicas(hash, t, expectedLength)
82 // PutHB stores a block given the block hash and a byte buffer. The desired
83 // number of replicas is given in KeepClient.Want_replicas. Returns the locator,
84 // the number of replicas that were written, and an error, if any. Note this will
85 // return InsufficientReplicasError whenever 0 <= replicas < KeepClient.Want_replicas.
86 func (this KeepClient) PutHB(hash string, buf []byte) (locator string, replicas int, err error) {
87 t := streamer.AsyncStreamFromSlice(buf)
90 return this.putReplicas(hash, t, int64(len(buf))) // the expected length is simply len(buf)
93 // PutB stores a block given a buffer. The hash will be computed. The desired
94 // number of replicas is given in KeepClient.Want_replicas. Returns the locator,
95 // the number of replicas that were written, and an error, if any. Note this will
96 // return InsufficientReplicasError whenever 0 <= replicas < KeepClient.Want_replicas.
97 func (this KeepClient) PutB(buffer []byte) (locator string, replicas int, err error) {
98 hash := fmt.Sprintf("%x", md5.Sum(buffer)) // lowercase hex MD5 of the whole buffer
99 return this.PutHB(hash, buffer)
102 // PutR stores a block, given a Reader. This will read the entire reader into a
103 // buffer to compute the hash. The desired number of replicas is given in
104 // KeepClient.Want_replicas. Returns the locator, the number of replicas that
105 // were written, and an error, if any. Note this will return InsufficientReplicasError
106 // whenever 0 <= replicas < KeepClient.Want_replicas. Also note that if the block
107 // hash and data size are available, PutHR() is more efficient.
108 func (this KeepClient) PutR(r io.Reader) (locator string, replicas int, err error) {
109 if buffer, err := ioutil.ReadAll(r); err != nil {
112 return this.PutB(buffer)
116 // Get fetches a block given a hash. Returns a reader, the expected data length,
117 // the URL the block was fetched from, and an error, if any. If the block
118 // checksum does not match, the final Read() on the reader returned by this
119 // method will return a BadChecksum error instead of EOF.
120 func (this KeepClient) Get(hash string) (reader io.ReadCloser,
121 contentLength int64, url string, err error) {
122 return this.AuthorizedGet(hash, "", "") // empty signature and timestamp: unsigned request
125 // AuthorizedGet fetches a block given a hash, with additional authorization
126 // provided by signature and timestamp. Returns a reader, the expected data
127 // length, the URL the block was fetched from, and an error, if any. If the
128 // block checksum does not match, the final Read() on the reader returned by
129 // this method will return a BadChecksum error instead of EOF.
130 func (this KeepClient) AuthorizedGet(hash string,
132 timestamp string) (reader io.ReadCloser,
133 contentLength int64, url string, err error) {
135 // Hash the locator together with the current time in order to identify this
136 // specific transaction in log statements (first 8 hex digits of the MD5).
137 requestId := fmt.Sprintf("%x", md5.Sum([]byte(hash+time.Now().String())))[0:8]
139 // Calculate the ordering for asking servers
140 sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
142 for _, host := range sv {
143 var req *http.Request
// Signed form of the URL: <host>/<hash>+A<signature>@<timestamp>.
147 url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
148 signature, timestamp)
// Unsigned form of the URL: <host>/<hash>.
150 url = fmt.Sprintf("%s/%s", host, hash)
152 if req, err = http.NewRequest("GET", url, nil); err != nil {
156 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
158 log.Printf("[%v] Begin download %s", requestId, url)
160 var resp *http.Response
// NOTE(review): when Do returns an error, resp is nil, so the resp.Body and
// resp.StatusCode uses inside this branch would panic. Handle err != nil
// separately from the non-200 case. No resp.Body.Close() is visible for the
// non-200 response either — confirm it is closed.
161 if resp, err = this.Client.Do(req); err != nil || resp.StatusCode != http.StatusOK {
162 respbody, _ := ioutil.ReadAll(&io.LimitedReader{resp.Body, 4096}) // read at most 4096 bytes of the error body for the log
163 response := strings.TrimSpace(string(respbody))
164 log.Printf("[%v] Download %v status code: %v error: \"%v\" response: \"%v\"",
165 requestId, url, resp.StatusCode, err, response)
169 if resp.StatusCode == http.StatusOK {
170 log.Printf("[%v] Download %v status code: %v", requestId, url, resp.StatusCode)
// The caller receives the response body wrapped in a HashCheckingReader.
171 return HashCheckingReader{resp.Body, md5.New(), hash}, resp.ContentLength, url, nil
175 return nil, 0, "", BlockNotFound
178 // Ask determines if a block with the given hash is available and readable, but
179 // does not return the block contents.
180 func (this KeepClient) Ask(hash string) (contentLength int64, url string, err error) {
181 return this.AuthorizedAsk(hash, "", "") // empty signature and timestamp: unsigned request
184 // AuthorizedAsk checks, via HEAD requests, whether a block with the given hash is
185 // available and readable with the given signature and timestamp; no contents are returned.
186 func (this KeepClient) AuthorizedAsk(hash string, signature string,
187 timestamp string) (contentLength int64, url string, err error) {
188 // Calculate the ordering for asking servers
189 sv := NewRootSorter(this.ServiceRoots(), hash).GetSortedRoots()
191 for _, host := range sv {
192 var req *http.Request
// Signed form of the URL: <host>/<hash>+A<signature>@<timestamp>.
195 url = fmt.Sprintf("%s/%s+A%s@%s", host, hash,
196 signature, timestamp)
// Unsigned form of the URL: <host>/<hash>.
198 url = fmt.Sprintf("%s/%s", host, hash)
201 if req, err = http.NewRequest("HEAD", url, nil); err != nil {
205 req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", this.Arvados.ApiToken))
207 var resp *http.Response
208 if resp, err = this.Client.Do(req); err != nil {
// NOTE(review): no resp.Body.Close() is visible on this path — confirm the
// response is closed so the connection can be reused.
212 if resp.StatusCode == http.StatusOK {
213 return resp.ContentLength, url, nil
217 return 0, "", BlockNotFound
221 // ServiceRoots atomically reads the service_roots field.
222 func (this *KeepClient) ServiceRoots() map[string]string {
// The field is reinterpreted as an unsafe.Pointer so that it can be read with atomic.LoadPointer.
223 r := (*map[string]string)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots))))
227 // SetServiceRoots atomically updates the service_roots field. Enables you to
228 // update service_roots without disrupting any GET or PUT operations that might
229 // already be in progress.
230 func (this *KeepClient) SetServiceRoots(new_roots map[string]string) {
231 roots := make(map[string]string) // a fresh map is published (presumably filled from new_roots in the loop below)
232 for uuid, root := range new_roots {
// Store a pointer to the new map in a single atomic pointer write.
235 atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&this.service_roots)),
236 unsafe.Pointer(&roots))
// Locator is the parsed form of a Keep locator; MakeLocator2 populates Size,
// Signature and Timestamp from the locator's hints.
239 type Locator struct {
// MakeLocator2 builds a Locator from a block hash and a "+"-separated hint
// string. An all-digit hint is scanned into Size; a hint of the form
// A<hex digits>@<8 hex digits> supplies Signature and Timestamp.
246 func MakeLocator2(hash string, hints string) (locator Locator) {
// NOTE(review): this pattern is recompiled on every call and the compile error
// is discarded; a package-level regexp.MustCompile would do the work once.
249 signature_pat, _ := regexp.Compile("^A([[:xdigit:]]+)@([[:xdigit:]]{8})$")
250 for _, hint := range strings.Split(hints, "+") {
252 if match, _ := regexp.MatchString("^[[:digit:]]+$", hint); match {
253 fmt.Sscanf(hint, "%d", &locator.Size)
254 } else if m := signature_pat.FindStringSubmatch(hint); m != nil {
255 locator.Signature = m[1]
256 locator.Timestamp = m[2]
257 } else if match, _ := regexp.MatchString("^[:upper:]", hint); match {
258 // Any unknown hint that starts with an uppercase letter is
259 // presumed to be valid and ignored, to permit forward compatibility.
// NOTE(review): "^[:upper:]" is a bracket expression containing the literal
// characters ':', 'u', 'p', 'e' and 'r', not the POSIX upper-case class, so
// this branch does not match hints that start with an uppercase letter.
// The pattern that matches the comment above is "^[[:upper:]]".
261 // Unknown format; not a valid locator.
262 return Locator{"", 0, "", ""}
// MakeLocator parses a locator path made of 32 lowercase hex digits optionally
// followed by "+"-introduced hints, and hands the two parts to MakeLocator2.
// The "Failed match" path logs the input and returns an empty Locator.
270 func MakeLocator(path string) Locator {
// NOTE(review): the pattern is constant; compiling it once at package level
// with regexp.MustCompile would remove the per-call compile and error path.
271 pathpattern, err := regexp.Compile("^([0-9a-f]{32})([+].*)?$")
273 log.Print("Don't like regexp", err)
276 sm := pathpattern.FindStringSubmatch(path)
278 log.Print("Failed match ", path)
279 return Locator{"", 0, "", ""}
282 return MakeLocator2(sm[1], sm[2]) // sm[1] is the hash; sm[2] is the hint string including its leading "+"