closes #10524
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "strings"
24         "sync"
25         "time"
26
27         log "github.com/Sirupsen/logrus"
28 )
29
30 // MakeRESTRouter returns a new mux.Router that forwards all Keep
31 // requests to the appropriate handlers.
32 //
33 func MakeRESTRouter() *mux.Router {
34         rest := mux.NewRouter()
35
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
38         rest.HandleFunc(
39                 `/{hash:[0-9a-f]{32}}+{hints}`,
40                 GetBlockHandler).Methods("GET", "HEAD")
41
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
43         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
44         // List all blocks stored here. Privileged client only.
45         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
46         // List blocks stored here whose hash has the given prefix.
47         // Privileged client only.
48         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
49
50         // Internals/debugging info (runtime.MemStats)
51         rest.HandleFunc(`/debug.json`, DebugHandler).Methods("GET", "HEAD")
52
53         // List volumes: path, device number, bytes used/avail.
54         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
55
56         // Replace the current pull queue.
57         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
58
59         // Replace the current trash queue.
60         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
61
62         // Untrash moves blocks from trash back into store
63         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
64
65         // Any request which does not match any of these routes gets
66         // 400 Bad Request.
67         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
68
69         return rest
70 }
71
72 // BadRequestHandler is a HandleFunc to address bad requests.
73 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
74         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
75 }
76
77 // GetBlockHandler is a HandleFunc to address Get block requests.
78 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
79         ctx, cancel := contextForResponse(context.TODO(), resp)
80         defer cancel()
81
82         if theConfig.RequireSignatures {
83                 locator := req.URL.Path[1:] // strip leading slash
84                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
85                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
86                         return
87                 }
88         }
89
90         // TODO: Probe volumes to check whether the block _might_
91         // exist. Some volumes/types could support a quick existence
92         // check without causing other operations to suffer. If all
93         // volumes support that, and assure us the block definitely
94         // isn't here, we can return 404 now instead of waiting for a
95         // buffer.
96
97         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
98         if err != nil {
99                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
100                 return
101         }
102         defer bufs.Put(buf)
103
104         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
105         if err != nil {
106                 code := http.StatusInternalServerError
107                 if err, ok := err.(*KeepError); ok {
108                         code = err.HTTPCode
109                 }
110                 http.Error(resp, err.Error(), code)
111                 return
112         }
113
114         resp.Header().Set("Content-Length", strconv.Itoa(size))
115         resp.Header().Set("Content-Type", "application/octet-stream")
116         resp.Write(buf[:size])
117 }
118
119 // Return a new context that gets cancelled by resp's CloseNotifier.
120 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
121         ctx, cancel := context.WithCancel(parent)
122         if cn, ok := resp.(http.CloseNotifier); ok {
123                 go func(c <-chan bool) {
124                         select {
125                         case <-c:
126                                 theConfig.debugLogf("cancel context")
127                                 cancel()
128                         case <-ctx.Done():
129                         }
130                 }(cn.CloseNotify())
131         }
132         return ctx, cancel
133 }
134
135 // Get a buffer from the pool -- but give up and return a non-nil
136 // error if ctx ends before we get a buffer.
137 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
138         bufReady := make(chan []byte)
139         go func() {
140                 bufReady <- bufs.Get(bufSize)
141         }()
142         select {
143         case buf := <-bufReady:
144                 return buf, nil
145         case <-ctx.Done():
146                 go func() {
147                         // Even if closeNotifier happened first, we
148                         // need to keep waiting for our buf so we can
149                         // return it to the pool.
150                         bufs.Put(<-bufReady)
151                 }()
152                 return nil, ErrClientDisconnect
153         }
154 }
155
156 // PutBlockHandler is a HandleFunc to address Put block requests.
157 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
158         ctx, cancel := contextForResponse(context.TODO(), resp)
159         defer cancel()
160
161         hash := mux.Vars(req)["hash"]
162
163         // Detect as many error conditions as possible before reading
164         // the body: avoid transmitting data that will not end up
165         // being written anyway.
166
167         if req.ContentLength == -1 {
168                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
169                 return
170         }
171
172         if req.ContentLength > BlockSize {
173                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
174                 return
175         }
176
177         if len(KeepVM.AllWritable()) == 0 {
178                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
179                 return
180         }
181
182         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
183         if err != nil {
184                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
185                 return
186         }
187
188         _, err = io.ReadFull(req.Body, buf)
189         if err != nil {
190                 http.Error(resp, err.Error(), 500)
191                 bufs.Put(buf)
192                 return
193         }
194
195         replication, err := PutBlock(ctx, buf, hash)
196         bufs.Put(buf)
197
198         if err != nil {
199                 code := http.StatusInternalServerError
200                 if err, ok := err.(*KeepError); ok {
201                         code = err.HTTPCode
202                 }
203                 http.Error(resp, err.Error(), code)
204                 return
205         }
206
207         // Success; add a size hint, sign the locator if possible, and
208         // return it to the client.
209         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
210         apiToken := GetAPIToken(req)
211         if theConfig.blobSigningKey != nil && apiToken != "" {
212                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
213                 returnHash = SignLocator(returnHash, apiToken, expiry)
214         }
215         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
216         resp.Write([]byte(returnHash + "\n"))
217 }
218
219 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
220 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
221         // Reject unauthorized requests.
222         if !IsSystemAuth(GetAPIToken(req)) {
223                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
224                 return
225         }
226
227         prefix := mux.Vars(req)["prefix"]
228
229         for _, vol := range KeepVM.AllReadable() {
230                 if err := vol.IndexTo(prefix, resp); err != nil {
231                         // The only errors returned by IndexTo are
232                         // write errors returned by resp.Write(),
233                         // which probably means the client has
234                         // disconnected and this error will never be
235                         // reported to the client -- but it will
236                         // appear in our own error log.
237                         http.Error(resp, err.Error(), http.StatusInternalServerError)
238                         return
239                 }
240         }
241         // An empty line at EOF is the only way the client can be
242         // assured the entire index was received.
243         resp.Write([]byte{'\n'})
244 }
245
246 // PoolStatus struct
247 type PoolStatus struct {
248         Alloc uint64 `json:"BytesAllocated"`
249         Cap   int    `json:"BuffersMax"`
250         Len   int    `json:"BuffersInUse"`
251 }
252
253 type volumeStatusEnt struct {
254         Label         string
255         Status        *VolumeStatus `json:",omitempty"`
256         VolumeStats   *ioStats      `json:",omitempty"`
257         InternalStats interface{}   `json:",omitempty"`
258 }
259
260 // NodeStatus struct
261 type NodeStatus struct {
262         Volumes    []*volumeStatusEnt
263         BufferPool PoolStatus
264         PullQueue  WorkQueueStatus
265         TrashQueue WorkQueueStatus
266 }
267
268 var st NodeStatus
269 var stLock sync.Mutex
270
271 // DebugHandler addresses /debug.json requests.
272 func DebugHandler(resp http.ResponseWriter, req *http.Request) {
273         type debugStats struct {
274                 MemStats runtime.MemStats
275         }
276         var ds debugStats
277         runtime.ReadMemStats(&ds.MemStats)
278         err := json.NewEncoder(resp).Encode(&ds)
279         if err != nil {
280                 http.Error(resp, err.Error(), 500)
281         }
282 }
283
284 // StatusHandler addresses /status.json requests.
285 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
286         stLock.Lock()
287         readNodeStatus(&st)
288         jstat, err := json.Marshal(&st)
289         stLock.Unlock()
290         if err == nil {
291                 resp.Write(jstat)
292         } else {
293                 log.Printf("json.Marshal: %s", err)
294                 log.Printf("NodeStatus = %v", &st)
295                 http.Error(resp, err.Error(), 500)
296         }
297 }
298
299 // populate the given NodeStatus struct with current values.
300 func readNodeStatus(st *NodeStatus) {
301         vols := KeepVM.AllReadable()
302         if cap(st.Volumes) < len(vols) {
303                 st.Volumes = make([]*volumeStatusEnt, len(vols))
304         }
305         st.Volumes = st.Volumes[:0]
306         for _, vol := range vols {
307                 var internalStats interface{}
308                 if vol, ok := vol.(InternalStatser); ok {
309                         internalStats = vol.InternalStats()
310                 }
311                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
312                         Label:         vol.String(),
313                         Status:        vol.Status(),
314                         InternalStats: internalStats,
315                         //VolumeStats: KeepVM.VolumeStats(vol),
316                 })
317         }
318         st.BufferPool.Alloc = bufs.Alloc()
319         st.BufferPool.Cap = bufs.Cap()
320         st.BufferPool.Len = bufs.Len()
321         st.PullQueue = getWorkQueueStatus(pullq)
322         st.TrashQueue = getWorkQueueStatus(trashq)
323 }
324
325 // return a WorkQueueStatus for the given queue. If q is nil (which
326 // should never happen except in test suites), return a zero status
327 // value instead of crashing.
328 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
329         if q == nil {
330                 // This should only happen during tests.
331                 return WorkQueueStatus{}
332         }
333         return q.Status()
334 }
335
336 // DeleteHandler processes DELETE requests.
337 //
338 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
339 // from all connected volumes.
340 //
341 // Only the Data Manager, or an Arvados admin with scope "all", are
342 // allowed to issue DELETE requests.  If a DELETE request is not
343 // authenticated or is issued by a non-admin user, the server returns
344 // a PermissionError.
345 //
346 // Upon receiving a valid request from an authorized user,
347 // DeleteHandler deletes all copies of the specified block on local
348 // writable volumes.
349 //
350 // Response format:
351 //
352 // If the requested blocks was not found on any volume, the response
353 // code is HTTP 404 Not Found.
354 //
355 // Otherwise, the response code is 200 OK, with a response body
356 // consisting of the JSON message
357 //
358 //    {"copies_deleted":d,"copies_failed":f}
359 //
360 // where d and f are integers representing the number of blocks that
361 // were successfully and unsuccessfully deleted.
362 //
363 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
364         hash := mux.Vars(req)["hash"]
365
366         // Confirm that this user is an admin and has a token with unlimited scope.
367         var tok = GetAPIToken(req)
368         if tok == "" || !CanDelete(tok) {
369                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
370                 return
371         }
372
373         if !theConfig.EnableDelete {
374                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
375                 return
376         }
377
378         // Delete copies of this block from all available volumes.
379         // Report how many blocks were successfully deleted, and how
380         // many were found on writable volumes but not deleted.
381         var result struct {
382                 Deleted int `json:"copies_deleted"`
383                 Failed  int `json:"copies_failed"`
384         }
385         for _, vol := range KeepVM.AllWritable() {
386                 if err := vol.Trash(hash); err == nil {
387                         result.Deleted++
388                 } else if os.IsNotExist(err) {
389                         continue
390                 } else {
391                         result.Failed++
392                         log.Println("DeleteHandler:", err)
393                 }
394         }
395
396         var st int
397
398         if result.Deleted == 0 && result.Failed == 0 {
399                 st = http.StatusNotFound
400         } else {
401                 st = http.StatusOK
402         }
403
404         resp.WriteHeader(st)
405
406         if st == http.StatusOK {
407                 if body, err := json.Marshal(result); err == nil {
408                         resp.Write(body)
409                 } else {
410                         log.Printf("json.Marshal: %s (result = %v)", err, result)
411                         http.Error(resp, err.Error(), 500)
412                 }
413         }
414 }
415
416 /* PullHandler processes "PUT /pull" requests for the data manager.
417    The request body is a JSON message containing a list of pull
418    requests in the following format:
419
420    [
421       {
422          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
423          "servers":[
424                         "keep0.qr1hi.arvadosapi.com:25107",
425                         "keep1.qr1hi.arvadosapi.com:25108"
426                  ]
427           },
428           {
429                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
430                  "servers":[
431                         "10.0.1.5:25107",
432                         "10.0.1.6:25107",
433                         "10.0.1.7:25108"
434                  ]
435           },
436           ...
437    ]
438
439    Each pull request in the list consists of a block locator string
440    and an ordered list of servers.  Keepstore should try to fetch the
441    block from each server in turn.
442
443    If the request has not been sent by the Data Manager, return 401
444    Unauthorized.
445
446    If the JSON unmarshalling fails, return 400 Bad Request.
447 */
448
449 // PullRequest consists of a block locator and an ordered list of servers
450 type PullRequest struct {
451         Locator string   `json:"locator"`
452         Servers []string `json:"servers"`
453 }
454
455 // PullHandler processes "PUT /pull" requests for the data manager.
456 func PullHandler(resp http.ResponseWriter, req *http.Request) {
457         // Reject unauthorized requests.
458         if !IsSystemAuth(GetAPIToken(req)) {
459                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
460                 return
461         }
462
463         // Parse the request body.
464         var pr []PullRequest
465         r := json.NewDecoder(req.Body)
466         if err := r.Decode(&pr); err != nil {
467                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
468                 return
469         }
470
471         // We have a properly formatted pull list sent from the data
472         // manager.  Report success and send the list to the pull list
473         // manager for further handling.
474         resp.WriteHeader(http.StatusOK)
475         resp.Write([]byte(
476                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
477
478         plist := list.New()
479         for _, p := range pr {
480                 plist.PushBack(p)
481         }
482         pullq.ReplaceQueue(plist)
483 }
484
485 // TrashRequest consists of a block locator and it's Mtime
486 type TrashRequest struct {
487         Locator    string `json:"locator"`
488         BlockMtime int64  `json:"block_mtime"`
489 }
490
491 // TrashHandler processes /trash requests.
492 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
493         // Reject unauthorized requests.
494         if !IsSystemAuth(GetAPIToken(req)) {
495                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
496                 return
497         }
498
499         // Parse the request body.
500         var trash []TrashRequest
501         r := json.NewDecoder(req.Body)
502         if err := r.Decode(&trash); err != nil {
503                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
504                 return
505         }
506
507         // We have a properly formatted trash list sent from the data
508         // manager.  Report success and send the list to the trash work
509         // queue for further handling.
510         resp.WriteHeader(http.StatusOK)
511         resp.Write([]byte(
512                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
513
514         tlist := list.New()
515         for _, t := range trash {
516                 tlist.PushBack(t)
517         }
518         trashq.ReplaceQueue(tlist)
519 }
520
521 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
522 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
523         // Reject unauthorized requests.
524         if !IsSystemAuth(GetAPIToken(req)) {
525                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
526                 return
527         }
528
529         hash := mux.Vars(req)["hash"]
530
531         if len(KeepVM.AllWritable()) == 0 {
532                 http.Error(resp, "No writable volumes", http.StatusNotFound)
533                 return
534         }
535
536         var untrashedOn, failedOn []string
537         var numNotFound int
538         for _, vol := range KeepVM.AllWritable() {
539                 err := vol.Untrash(hash)
540
541                 if os.IsNotExist(err) {
542                         numNotFound++
543                 } else if err != nil {
544                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
545                         failedOn = append(failedOn, vol.String())
546                 } else {
547                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
548                         untrashedOn = append(untrashedOn, vol.String())
549                 }
550         }
551
552         if numNotFound == len(KeepVM.AllWritable()) {
553                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
554                 return
555         }
556
557         if len(failedOn) == len(KeepVM.AllWritable()) {
558                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
559         } else {
560                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
561                 if len(failedOn) > 0 {
562                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
563                 }
564                 resp.Write([]byte(respBody))
565         }
566 }
567
568 // GetBlock and PutBlock implement lower-level code for handling
569 // blocks by rooting through volumes connected to the local machine.
570 // Once the handler has determined that system policy permits the
571 // request, it calls these methods to perform the actual operation.
572 //
573 // TODO(twp): this code would probably be better located in the
574 // VolumeManager interface. As an abstraction, the VolumeManager
575 // should be the only part of the code that cares about which volume a
576 // block is stored on, so it should be responsible for figuring out
577 // which volume to check for fetching blocks, storing blocks, etc.
578
579 // GetBlock fetches the block identified by "hash" into the provided
580 // buf, and returns the data size.
581 //
582 // If the block cannot be found on any volume, returns NotFoundError.
583 //
584 // If the block found does not have the correct MD5 hash, returns
585 // DiskHashError.
586 //
587 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
588         // Attempt to read the requested hash from a keep volume.
589         errorToCaller := NotFoundError
590
591         for _, vol := range KeepVM.AllReadable() {
592                 size, err := vol.Get(ctx, hash, buf)
593                 select {
594                 case <-ctx.Done():
595                         return 0, ErrClientDisconnect
596                 default:
597                 }
598                 if err != nil {
599                         // IsNotExist is an expected error and may be
600                         // ignored. All other errors are logged. In
601                         // any case we continue trying to read other
602                         // volumes. If all volumes report IsNotExist,
603                         // we return a NotFoundError.
604                         if !os.IsNotExist(err) {
605                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
606                         }
607                         continue
608                 }
609                 // Check the file checksum.
610                 //
611                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
612                 if filehash != hash {
613                         // TODO: Try harder to tell a sysadmin about
614                         // this.
615                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
616                                 vol, hash, filehash)
617                         errorToCaller = DiskHashError
618                         continue
619                 }
620                 if errorToCaller == DiskHashError {
621                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
622                                 vol, hash)
623                 }
624                 return size, nil
625         }
626         return 0, errorToCaller
627 }
628
629 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
630 //
631 // PutBlock(ctx, block, hash)
632 //   Stores the BLOCK (identified by the content id HASH) in Keep.
633 //
634 //   The MD5 checksum of the block must be identical to the content id HASH.
635 //   If not, an error is returned.
636 //
637 //   PutBlock stores the BLOCK on the first Keep volume with free space.
638 //   A failure code is returned to the user only if all volumes fail.
639 //
640 //   On success, PutBlock returns nil.
641 //   On failure, it returns a KeepError with one of the following codes:
642 //
643 //   500 Collision
644 //          A different block with the same hash already exists on this
645 //          Keep server.
646 //   422 MD5Fail
647 //          The MD5 hash of the BLOCK does not match the argument HASH.
648 //   503 Full
649 //          There was not enough space left in any Keep volume to store
650 //          the object.
651 //   500 Fail
652 //          The object could not be stored for some other reason (e.g.
653 //          all writes failed). The text of the error message should
654 //          provide as much detail as possible.
655 //
656 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
657         // Check that BLOCK's checksum matches HASH.
658         blockhash := fmt.Sprintf("%x", md5.Sum(block))
659         if blockhash != hash {
660                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
661                 return 0, RequestHashError
662         }
663
664         // If we already have this data, it's intact on disk, and we
665         // can update its timestamp, return success. If we have
666         // different data with the same hash, return failure.
667         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
668                 return n, err
669         } else if ctx.Err() != nil {
670                 return 0, ErrClientDisconnect
671         }
672
673         // Choose a Keep volume to write to.
674         // If this volume fails, try all of the volumes in order.
675         if vol := KeepVM.NextWritable(); vol != nil {
676                 if err := vol.Put(ctx, hash, block); err == nil {
677                         return vol.Replication(), nil // success!
678                 }
679                 if ctx.Err() != nil {
680                         return 0, ErrClientDisconnect
681                 }
682         }
683
684         writables := KeepVM.AllWritable()
685         if len(writables) == 0 {
686                 log.Print("No writable volumes.")
687                 return 0, FullError
688         }
689
690         allFull := true
691         for _, vol := range writables {
692                 err := vol.Put(ctx, hash, block)
693                 if ctx.Err() != nil {
694                         return 0, ErrClientDisconnect
695                 }
696                 if err == nil {
697                         return vol.Replication(), nil // success!
698                 }
699                 if err != FullError {
700                         // The volume is not full but the
701                         // write did not succeed.  Report the
702                         // error and continue trying.
703                         allFull = false
704                         log.Printf("%s: Write(%s): %s", vol, hash, err)
705                 }
706         }
707
708         if allFull {
709                 log.Print("All volumes are full.")
710                 return 0, FullError
711         }
712         // Already logged the non-full errors.
713         return 0, GenericError
714 }
715
716 // CompareAndTouch returns the current replication level if one of the
717 // volumes already has the given content and it successfully updates
718 // the relevant block's modification time in order to protect it from
719 // premature garbage collection. Otherwise, it returns a non-nil
720 // error.
721 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
722         var bestErr error = NotFoundError
723         for _, vol := range KeepVM.AllWritable() {
724                 err := vol.Compare(ctx, hash, buf)
725                 if ctx.Err() != nil {
726                         return 0, ctx.Err()
727                 } else if err == CollisionError {
728                         // Stop if we have a block with same hash but
729                         // different content. (It will be impossible
730                         // to tell which one is wanted if we have
731                         // both, so there's no point writing it even
732                         // on a different volume.)
733                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
734                         return 0, err
735                 } else if os.IsNotExist(err) {
736                         // Block does not exist. This is the only
737                         // "normal" error: we don't log anything.
738                         continue
739                 } else if err != nil {
740                         // Couldn't open file, data is corrupt on
741                         // disk, etc.: log this abnormal condition,
742                         // and try the next volume.
743                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
744                         continue
745                 }
746                 if err := vol.Touch(hash); err != nil {
747                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
748                         bestErr = err
749                         continue
750                 }
751                 // Compare and Touch both worked --> done.
752                 return vol.Replication(), nil
753         }
754         return 0, bestErr
755 }
756
757 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
758
759 // IsValidLocator returns true if the specified string is a valid Keep locator.
760 //   When Keep is extended to support hash types other than MD5,
761 //   this should be updated to cover those as well.
762 //
763 func IsValidLocator(loc string) bool {
764         return validLocatorRe.MatchString(loc)
765 }
766
767 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
768
769 // GetAPIToken returns the OAuth2 token from the Authorization
770 // header of a HTTP request, or an empty string if no matching
771 // token is found.
772 func GetAPIToken(req *http.Request) string {
773         if auth, ok := req.Header["Authorization"]; ok {
774                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
775                         return match[1]
776                 }
777         }
778         return ""
779 }
780
781 // IsExpired returns true if the given Unix timestamp (expressed as a
782 // hexadecimal string) is in the past, or if timestampHex cannot be
783 // parsed as a hexadecimal string.
784 func IsExpired(timestampHex string) bool {
785         ts, err := strconv.ParseInt(timestampHex, 16, 0)
786         if err != nil {
787                 log.Printf("IsExpired: %s", err)
788                 return true
789         }
790         return time.Unix(ts, 0).Before(time.Now())
791 }
792
793 // CanDelete returns true if the user identified by apiToken is
794 // allowed to delete blocks.
795 func CanDelete(apiToken string) bool {
796         if apiToken == "" {
797                 return false
798         }
799         // Blocks may be deleted only when Keep has been configured with a
800         // data manager.
801         if IsSystemAuth(apiToken) {
802                 return true
803         }
804         // TODO(twp): look up apiToken with the API server
805         // return true if is_admin is true and if the token
806         // has unlimited scope
807         return false
808 }
809
810 // IsSystemAuth returns true if the given token is allowed to perform
811 // system level actions like deleting data.
812 func IsSystemAuth(token string) bool {
813         return token != "" && token == theConfig.systemAuthToken
814 }