13996: Unit tests pass
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/health"
25         "git.curoverse.com/arvados.git/sdk/go/httpserver"
26         "github.com/gorilla/mux"
27         "github.com/prometheus/client_golang/prometheus"
28 )
29
30 type router struct {
31         *mux.Router
32         limiter     httpserver.RequestCounter
33         cluster     *arvados.Cluster
34         remoteProxy remoteProxy
35         metrics     *nodeMetrics
36 }
37
38 // MakeRESTRouter returns a new router that forwards all Keep requests
39 // to the appropriate handlers.
40 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
41         rtr := &router{
42                 Router:  mux.NewRouter(),
43                 cluster: cluster,
44                 metrics: &nodeMetrics{reg: reg},
45         }
46
47         rtr.HandleFunc(
48                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
49         rtr.HandleFunc(
50                 `/{hash:[0-9a-f]{32}}+{hints}`,
51                 rtr.handleGET).Methods("GET", "HEAD")
52
53         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
54         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55         // List all blocks stored here. Privileged client only.
56         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57         // List blocks stored here whose hash has the given prefix.
58         // Privileged client only.
59         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60
61         // Internals/debugging info (runtime.MemStats)
62         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63
64         // List volumes: path, device number, bytes used/avail.
65         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66
67         // List mounts: UUID, readonly, tier, device ID, ...
68         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71
72         // Replace the current pull queue.
73         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74
75         // Replace the current trash queue.
76         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77
78         // Untrash moves blocks from trash back into store
79         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80
81         rtr.Handle("/_health/{check}", &health.Handler{
82                 Token:  theConfig.ManagementToken,
83                 Prefix: "/_health/",
84         }).Methods("GET")
85
86         // Any request which does not match any of these routes gets
87         // 400 Bad Request.
88         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
89
90         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
91         rtr.metrics.setupBufferPoolMetrics(bufs)
92         rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
93         rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
94         rtr.metrics.setupRequestMetrics(rtr.limiter)
95
96         instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
97                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
98         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
99 }
100
101 // BadRequestHandler is a HandleFunc to address bad requests.
102 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
103         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
104 }
105
106 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
107         ctx, cancel := contextForResponse(context.TODO(), resp)
108         defer cancel()
109
110         locator := req.URL.Path[1:]
111         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
112                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
113                 return
114         }
115
116         if theConfig.RequireSignatures {
117                 locator := req.URL.Path[1:] // strip leading slash
118                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
119                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
120                         return
121                 }
122         }
123
124         // TODO: Probe volumes to check whether the block _might_
125         // exist. Some volumes/types could support a quick existence
126         // check without causing other operations to suffer. If all
127         // volumes support that, and assure us the block definitely
128         // isn't here, we can return 404 now instead of waiting for a
129         // buffer.
130
131         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
132         if err != nil {
133                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
134                 return
135         }
136         defer bufs.Put(buf)
137
138         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
139         if err != nil {
140                 code := http.StatusInternalServerError
141                 if err, ok := err.(*KeepError); ok {
142                         code = err.HTTPCode
143                 }
144                 http.Error(resp, err.Error(), code)
145                 return
146         }
147
148         resp.Header().Set("Content-Length", strconv.Itoa(size))
149         resp.Header().Set("Content-Type", "application/octet-stream")
150         resp.Write(buf[:size])
151 }
152
153 // Return a new context that gets cancelled by resp's CloseNotifier.
154 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
155         ctx, cancel := context.WithCancel(parent)
156         if cn, ok := resp.(http.CloseNotifier); ok {
157                 go func(c <-chan bool) {
158                         select {
159                         case <-c:
160                                 theConfig.debugLogf("cancel context")
161                                 cancel()
162                         case <-ctx.Done():
163                         }
164                 }(cn.CloseNotify())
165         }
166         return ctx, cancel
167 }
168
169 // Get a buffer from the pool -- but give up and return a non-nil
170 // error if ctx ends before we get a buffer.
171 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
172         bufReady := make(chan []byte)
173         go func() {
174                 bufReady <- bufs.Get(bufSize)
175         }()
176         select {
177         case buf := <-bufReady:
178                 return buf, nil
179         case <-ctx.Done():
180                 go func() {
181                         // Even if closeNotifier happened first, we
182                         // need to keep waiting for our buf so we can
183                         // return it to the pool.
184                         bufs.Put(<-bufReady)
185                 }()
186                 return nil, ErrClientDisconnect
187         }
188 }
189
190 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
191         ctx, cancel := contextForResponse(context.TODO(), resp)
192         defer cancel()
193
194         hash := mux.Vars(req)["hash"]
195
196         // Detect as many error conditions as possible before reading
197         // the body: avoid transmitting data that will not end up
198         // being written anyway.
199
200         if req.ContentLength == -1 {
201                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
202                 return
203         }
204
205         if req.ContentLength > BlockSize {
206                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
207                 return
208         }
209
210         if len(KeepVM.AllWritable()) == 0 {
211                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
212                 return
213         }
214
215         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
216         if err != nil {
217                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
218                 return
219         }
220
221         _, err = io.ReadFull(req.Body, buf)
222         if err != nil {
223                 http.Error(resp, err.Error(), 500)
224                 bufs.Put(buf)
225                 return
226         }
227
228         replication, err := PutBlock(ctx, buf, hash)
229         bufs.Put(buf)
230
231         if err != nil {
232                 code := http.StatusInternalServerError
233                 if err, ok := err.(*KeepError); ok {
234                         code = err.HTTPCode
235                 }
236                 http.Error(resp, err.Error(), code)
237                 return
238         }
239
240         // Success; add a size hint, sign the locator if possible, and
241         // return it to the client.
242         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
243         apiToken := GetAPIToken(req)
244         if theConfig.blobSigningKey != nil && apiToken != "" {
245                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
246                 returnHash = SignLocator(returnHash, apiToken, expiry)
247         }
248         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
249         resp.Write([]byte(returnHash + "\n"))
250 }
251
252 // IndexHandler responds to "/index", "/index/{prefix}", and
253 // "/mounts/{uuid}/blocks" requests.
254 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
255         if !IsSystemAuth(GetAPIToken(req)) {
256                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
257                 return
258         }
259
260         prefix := mux.Vars(req)["prefix"]
261         if prefix == "" {
262                 req.ParseForm()
263                 prefix = req.Form.Get("prefix")
264         }
265
266         uuid := mux.Vars(req)["uuid"]
267
268         var vols []Volume
269         if uuid == "" {
270                 vols = KeepVM.AllReadable()
271         } else if v := KeepVM.Lookup(uuid, false); v == nil {
272                 http.Error(resp, "mount not found", http.StatusNotFound)
273                 return
274         } else {
275                 vols = []Volume{v}
276         }
277
278         for _, v := range vols {
279                 if err := v.IndexTo(prefix, resp); err != nil {
280                         // The only errors returned by IndexTo are
281                         // write errors returned by resp.Write(),
282                         // which probably means the client has
283                         // disconnected and this error will never be
284                         // reported to the client -- but it will
285                         // appear in our own error log.
286                         http.Error(resp, err.Error(), http.StatusInternalServerError)
287                         return
288                 }
289         }
290         // An empty line at EOF is the only way the client can be
291         // assured the entire index was received.
292         resp.Write([]byte{'\n'})
293 }
294
295 // MountsHandler responds to "GET /mounts" requests.
296 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
297         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
298         if err != nil {
299                 http.Error(resp, err.Error(), http.StatusInternalServerError)
300         }
301 }
302
303 // PoolStatus struct
304 type PoolStatus struct {
305         Alloc uint64 `json:"BytesAllocatedCumulative"`
306         Cap   int    `json:"BuffersMax"`
307         Len   int    `json:"BuffersInUse"`
308 }
309
310 type volumeStatusEnt struct {
311         Label         string
312         Status        *VolumeStatus `json:",omitempty"`
313         VolumeStats   *ioStats      `json:",omitempty"`
314         InternalStats interface{}   `json:",omitempty"`
315 }
316
317 // NodeStatus struct
318 type NodeStatus struct {
319         Volumes         []*volumeStatusEnt
320         BufferPool      PoolStatus
321         PullQueue       WorkQueueStatus
322         TrashQueue      WorkQueueStatus
323         RequestsCurrent int
324         RequestsMax     int
325         Version         string
326 }
327
328 var st NodeStatus
329 var stLock sync.Mutex
330
331 // DebugHandler addresses /debug.json requests.
332 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
333         type debugStats struct {
334                 MemStats runtime.MemStats
335         }
336         var ds debugStats
337         runtime.ReadMemStats(&ds.MemStats)
338         err := json.NewEncoder(resp).Encode(&ds)
339         if err != nil {
340                 http.Error(resp, err.Error(), 500)
341         }
342 }
343
344 // StatusHandler addresses /status.json requests.
345 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
346         stLock.Lock()
347         rtr.readNodeStatus(&st)
348         jstat, err := json.Marshal(&st)
349         stLock.Unlock()
350         if err == nil {
351                 resp.Write(jstat)
352         } else {
353                 log.Printf("json.Marshal: %s", err)
354                 log.Printf("NodeStatus = %v", &st)
355                 http.Error(resp, err.Error(), 500)
356         }
357 }
358
359 // populate the given NodeStatus struct with current values.
360 func (rtr *router) readNodeStatus(st *NodeStatus) {
361         st.Version = version
362         vols := KeepVM.AllReadable()
363         if cap(st.Volumes) < len(vols) {
364                 st.Volumes = make([]*volumeStatusEnt, len(vols))
365         }
366         st.Volumes = st.Volumes[:0]
367         for _, vol := range vols {
368                 var internalStats interface{}
369                 if vol, ok := vol.(InternalStatser); ok {
370                         internalStats = vol.InternalStats()
371                 }
372                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
373                         Label:         vol.String(),
374                         Status:        vol.Status(),
375                         InternalStats: internalStats,
376                         //VolumeStats: KeepVM.VolumeStats(vol),
377                 })
378         }
379         st.BufferPool.Alloc = bufs.Alloc()
380         st.BufferPool.Cap = bufs.Cap()
381         st.BufferPool.Len = bufs.Len()
382         st.PullQueue = getWorkQueueStatus(pullq)
383         st.TrashQueue = getWorkQueueStatus(trashq)
384         if rtr.limiter != nil {
385                 st.RequestsCurrent = rtr.limiter.Current()
386                 st.RequestsMax = rtr.limiter.Max()
387         }
388 }
389
390 // return a WorkQueueStatus for the given queue. If q is nil (which
391 // should never happen except in test suites), return a zero status
392 // value instead of crashing.
393 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
394         if q == nil {
395                 // This should only happen during tests.
396                 return WorkQueueStatus{}
397         }
398         return q.Status()
399 }
400
401 // DeleteHandler processes DELETE requests.
402 //
403 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
404 // from all connected volumes.
405 //
406 // Only the Data Manager, or an Arvados admin with scope "all", are
407 // allowed to issue DELETE requests.  If a DELETE request is not
408 // authenticated or is issued by a non-admin user, the server returns
409 // a PermissionError.
410 //
411 // Upon receiving a valid request from an authorized user,
412 // DeleteHandler deletes all copies of the specified block on local
413 // writable volumes.
414 //
415 // Response format:
416 //
417 // If the requested blocks was not found on any volume, the response
418 // code is HTTP 404 Not Found.
419 //
420 // Otherwise, the response code is 200 OK, with a response body
421 // consisting of the JSON message
422 //
423 //    {"copies_deleted":d,"copies_failed":f}
424 //
425 // where d and f are integers representing the number of blocks that
426 // were successfully and unsuccessfully deleted.
427 //
428 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
429         hash := mux.Vars(req)["hash"]
430
431         // Confirm that this user is an admin and has a token with unlimited scope.
432         var tok = GetAPIToken(req)
433         if tok == "" || !CanDelete(tok) {
434                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
435                 return
436         }
437
438         if !theConfig.EnableDelete {
439                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
440                 return
441         }
442
443         // Delete copies of this block from all available volumes.
444         // Report how many blocks were successfully deleted, and how
445         // many were found on writable volumes but not deleted.
446         var result struct {
447                 Deleted int `json:"copies_deleted"`
448                 Failed  int `json:"copies_failed"`
449         }
450         for _, vol := range KeepVM.AllWritable() {
451                 if err := vol.Trash(hash); err == nil {
452                         result.Deleted++
453                 } else if os.IsNotExist(err) {
454                         continue
455                 } else {
456                         result.Failed++
457                         log.Println("DeleteHandler:", err)
458                 }
459         }
460
461         var st int
462
463         if result.Deleted == 0 && result.Failed == 0 {
464                 st = http.StatusNotFound
465         } else {
466                 st = http.StatusOK
467         }
468
469         resp.WriteHeader(st)
470
471         if st == http.StatusOK {
472                 if body, err := json.Marshal(result); err == nil {
473                         resp.Write(body)
474                 } else {
475                         log.Printf("json.Marshal: %s (result = %v)", err, result)
476                         http.Error(resp, err.Error(), 500)
477                 }
478         }
479 }
480
481 /* PullHandler processes "PUT /pull" requests for the data manager.
482    The request body is a JSON message containing a list of pull
483    requests in the following format:
484
485    [
486       {
487          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
488          "servers":[
489                         "keep0.qr1hi.arvadosapi.com:25107",
490                         "keep1.qr1hi.arvadosapi.com:25108"
491                  ]
492           },
493           {
494                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
495                  "servers":[
496                         "10.0.1.5:25107",
497                         "10.0.1.6:25107",
498                         "10.0.1.7:25108"
499                  ]
500           },
501           ...
502    ]
503
504    Each pull request in the list consists of a block locator string
505    and an ordered list of servers.  Keepstore should try to fetch the
506    block from each server in turn.
507
508    If the request has not been sent by the Data Manager, return 401
509    Unauthorized.
510
511    If the JSON unmarshalling fails, return 400 Bad Request.
512 */
513
514 // PullRequest consists of a block locator and an ordered list of servers
515 type PullRequest struct {
516         Locator string   `json:"locator"`
517         Servers []string `json:"servers"`
518
519         // Destination mount, or "" for "anywhere"
520         MountUUID string `json:"mount_uuid"`
521 }
522
523 // PullHandler processes "PUT /pull" requests for the data manager.
524 func PullHandler(resp http.ResponseWriter, req *http.Request) {
525         // Reject unauthorized requests.
526         if !IsSystemAuth(GetAPIToken(req)) {
527                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
528                 return
529         }
530
531         // Parse the request body.
532         var pr []PullRequest
533         r := json.NewDecoder(req.Body)
534         if err := r.Decode(&pr); err != nil {
535                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
536                 return
537         }
538
539         // We have a properly formatted pull list sent from the data
540         // manager.  Report success and send the list to the pull list
541         // manager for further handling.
542         resp.WriteHeader(http.StatusOK)
543         resp.Write([]byte(
544                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
545
546         plist := list.New()
547         for _, p := range pr {
548                 plist.PushBack(p)
549         }
550         pullq.ReplaceQueue(plist)
551 }
552
553 // TrashRequest consists of a block locator and its Mtime
554 type TrashRequest struct {
555         Locator    string `json:"locator"`
556         BlockMtime int64  `json:"block_mtime"`
557
558         // Target mount, or "" for "everywhere"
559         MountUUID string `json:"mount_uuid"`
560 }
561
562 // TrashHandler processes /trash requests.
563 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
564         // Reject unauthorized requests.
565         if !IsSystemAuth(GetAPIToken(req)) {
566                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
567                 return
568         }
569
570         // Parse the request body.
571         var trash []TrashRequest
572         r := json.NewDecoder(req.Body)
573         if err := r.Decode(&trash); err != nil {
574                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
575                 return
576         }
577
578         // We have a properly formatted trash list sent from the data
579         // manager.  Report success and send the list to the trash work
580         // queue for further handling.
581         resp.WriteHeader(http.StatusOK)
582         resp.Write([]byte(
583                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
584
585         tlist := list.New()
586         for _, t := range trash {
587                 tlist.PushBack(t)
588         }
589         trashq.ReplaceQueue(tlist)
590 }
591
592 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
593 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
594         // Reject unauthorized requests.
595         if !IsSystemAuth(GetAPIToken(req)) {
596                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
597                 return
598         }
599
600         hash := mux.Vars(req)["hash"]
601
602         if len(KeepVM.AllWritable()) == 0 {
603                 http.Error(resp, "No writable volumes", http.StatusNotFound)
604                 return
605         }
606
607         var untrashedOn, failedOn []string
608         var numNotFound int
609         for _, vol := range KeepVM.AllWritable() {
610                 err := vol.Untrash(hash)
611
612                 if os.IsNotExist(err) {
613                         numNotFound++
614                 } else if err != nil {
615                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
616                         failedOn = append(failedOn, vol.String())
617                 } else {
618                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
619                         untrashedOn = append(untrashedOn, vol.String())
620                 }
621         }
622
623         if numNotFound == len(KeepVM.AllWritable()) {
624                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
625                 return
626         }
627
628         if len(failedOn) == len(KeepVM.AllWritable()) {
629                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
630         } else {
631                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
632                 if len(failedOn) > 0 {
633                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
634                 }
635                 resp.Write([]byte(respBody))
636         }
637 }
638
639 // GetBlock and PutBlock implement lower-level code for handling
640 // blocks by rooting through volumes connected to the local machine.
641 // Once the handler has determined that system policy permits the
642 // request, it calls these methods to perform the actual operation.
643 //
644 // TODO(twp): this code would probably be better located in the
645 // VolumeManager interface. As an abstraction, the VolumeManager
646 // should be the only part of the code that cares about which volume a
647 // block is stored on, so it should be responsible for figuring out
648 // which volume to check for fetching blocks, storing blocks, etc.
649
650 // GetBlock fetches the block identified by "hash" into the provided
651 // buf, and returns the data size.
652 //
653 // If the block cannot be found on any volume, returns NotFoundError.
654 //
655 // If the block found does not have the correct MD5 hash, returns
656 // DiskHashError.
657 //
658 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
659         // Attempt to read the requested hash from a keep volume.
660         errorToCaller := NotFoundError
661
662         for _, vol := range KeepVM.AllReadable() {
663                 size, err := vol.Get(ctx, hash, buf)
664                 select {
665                 case <-ctx.Done():
666                         return 0, ErrClientDisconnect
667                 default:
668                 }
669                 if err != nil {
670                         // IsNotExist is an expected error and may be
671                         // ignored. All other errors are logged. In
672                         // any case we continue trying to read other
673                         // volumes. If all volumes report IsNotExist,
674                         // we return a NotFoundError.
675                         if !os.IsNotExist(err) {
676                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
677                         }
678                         // If some volume returns a transient error, return it to the caller
679                         // instead of "Not found" so it can retry.
680                         if err == VolumeBusyError {
681                                 errorToCaller = err.(*KeepError)
682                         }
683                         continue
684                 }
685                 // Check the file checksum.
686                 //
687                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
688                 if filehash != hash {
689                         // TODO: Try harder to tell a sysadmin about
690                         // this.
691                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
692                                 vol, hash, filehash)
693                         errorToCaller = DiskHashError
694                         continue
695                 }
696                 if errorToCaller == DiskHashError {
697                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
698                                 vol, hash)
699                 }
700                 return size, nil
701         }
702         return 0, errorToCaller
703 }
704
705 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
706 //
707 // PutBlock(ctx, block, hash)
708 //   Stores the BLOCK (identified by the content id HASH) in Keep.
709 //
710 //   The MD5 checksum of the block must be identical to the content id HASH.
711 //   If not, an error is returned.
712 //
713 //   PutBlock stores the BLOCK on the first Keep volume with free space.
714 //   A failure code is returned to the user only if all volumes fail.
715 //
716 //   On success, PutBlock returns nil.
717 //   On failure, it returns a KeepError with one of the following codes:
718 //
719 //   500 Collision
720 //          A different block with the same hash already exists on this
721 //          Keep server.
722 //   422 MD5Fail
723 //          The MD5 hash of the BLOCK does not match the argument HASH.
724 //   503 Full
725 //          There was not enough space left in any Keep volume to store
726 //          the object.
727 //   500 Fail
728 //          The object could not be stored for some other reason (e.g.
729 //          all writes failed). The text of the error message should
730 //          provide as much detail as possible.
731 //
732 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
733         // Check that BLOCK's checksum matches HASH.
734         blockhash := fmt.Sprintf("%x", md5.Sum(block))
735         if blockhash != hash {
736                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
737                 return 0, RequestHashError
738         }
739
740         // If we already have this data, it's intact on disk, and we
741         // can update its timestamp, return success. If we have
742         // different data with the same hash, return failure.
743         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
744                 return n, err
745         } else if ctx.Err() != nil {
746                 return 0, ErrClientDisconnect
747         }
748
749         // Choose a Keep volume to write to.
750         // If this volume fails, try all of the volumes in order.
751         if vol := KeepVM.NextWritable(); vol != nil {
752                 if err := vol.Put(ctx, hash, block); err == nil {
753                         return vol.Replication(), nil // success!
754                 }
755                 if ctx.Err() != nil {
756                         return 0, ErrClientDisconnect
757                 }
758         }
759
760         writables := KeepVM.AllWritable()
761         if len(writables) == 0 {
762                 log.Print("No writable volumes.")
763                 return 0, FullError
764         }
765
766         allFull := true
767         for _, vol := range writables {
768                 err := vol.Put(ctx, hash, block)
769                 if ctx.Err() != nil {
770                         return 0, ErrClientDisconnect
771                 }
772                 if err == nil {
773                         return vol.Replication(), nil // success!
774                 }
775                 if err != FullError {
776                         // The volume is not full but the
777                         // write did not succeed.  Report the
778                         // error and continue trying.
779                         allFull = false
780                         log.Printf("%s: Write(%s): %s", vol, hash, err)
781                 }
782         }
783
784         if allFull {
785                 log.Print("All volumes are full.")
786                 return 0, FullError
787         }
788         // Already logged the non-full errors.
789         return 0, GenericError
790 }
791
792 // CompareAndTouch returns the current replication level if one of the
793 // volumes already has the given content and it successfully updates
794 // the relevant block's modification time in order to protect it from
795 // premature garbage collection. Otherwise, it returns a non-nil
796 // error.
797 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
798         var bestErr error = NotFoundError
799         for _, vol := range KeepVM.AllWritable() {
800                 err := vol.Compare(ctx, hash, buf)
801                 if ctx.Err() != nil {
802                         return 0, ctx.Err()
803                 } else if err == CollisionError {
804                         // Stop if we have a block with same hash but
805                         // different content. (It will be impossible
806                         // to tell which one is wanted if we have
807                         // both, so there's no point writing it even
808                         // on a different volume.)
809                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
810                         return 0, err
811                 } else if os.IsNotExist(err) {
812                         // Block does not exist. This is the only
813                         // "normal" error: we don't log anything.
814                         continue
815                 } else if err != nil {
816                         // Couldn't open file, data is corrupt on
817                         // disk, etc.: log this abnormal condition,
818                         // and try the next volume.
819                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
820                         continue
821                 }
822                 if err := vol.Touch(hash); err != nil {
823                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
824                         bestErr = err
825                         continue
826                 }
827                 // Compare and Touch both worked --> done.
828                 return vol.Replication(), nil
829         }
830         return 0, bestErr
831 }
832
833 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
834
835 // IsValidLocator returns true if the specified string is a valid Keep locator.
836 //   When Keep is extended to support hash types other than MD5,
837 //   this should be updated to cover those as well.
838 //
839 func IsValidLocator(loc string) bool {
840         return validLocatorRe.MatchString(loc)
841 }
842
843 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
844
845 // GetAPIToken returns the OAuth2 token from the Authorization
846 // header of a HTTP request, or an empty string if no matching
847 // token is found.
848 func GetAPIToken(req *http.Request) string {
849         if auth, ok := req.Header["Authorization"]; ok {
850                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
851                         return match[2]
852                 }
853         }
854         return ""
855 }
856
857 // IsExpired returns true if the given Unix timestamp (expressed as a
858 // hexadecimal string) is in the past, or if timestampHex cannot be
859 // parsed as a hexadecimal string.
860 func IsExpired(timestampHex string) bool {
861         ts, err := strconv.ParseInt(timestampHex, 16, 0)
862         if err != nil {
863                 log.Printf("IsExpired: %s", err)
864                 return true
865         }
866         return time.Unix(ts, 0).Before(time.Now())
867 }
868
869 // CanDelete returns true if the user identified by apiToken is
870 // allowed to delete blocks.
871 func CanDelete(apiToken string) bool {
872         if apiToken == "" {
873                 return false
874         }
875         // Blocks may be deleted only when Keep has been configured with a
876         // data manager.
877         if IsSystemAuth(apiToken) {
878                 return true
879         }
880         // TODO(twp): look up apiToken with the API server
881         // return true if is_admin is true and if the token
882         // has unlimited scope
883         return false
884 }
885
886 // IsSystemAuth returns true if the given token is allowed to perform
887 // system level actions like deleting data.
888 func IsSystemAuth(token string) bool {
889         return token != "" && token == theConfig.systemAuthToken
890 }