5737: Merge branch 'master' into 5737-ruby231
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "strings"
25         "sync"
26         "time"
27 )
28
29 // MakeRESTRouter returns a new mux.Router that forwards all Keep
30 // requests to the appropriate handlers.
31 //
32 func MakeRESTRouter() *mux.Router {
33         rest := mux.NewRouter()
34
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
37         rest.HandleFunc(
38                 `/{hash:[0-9a-f]{32}}+{hints}`,
39                 GetBlockHandler).Methods("GET", "HEAD")
40
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
42         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
43         // List all blocks stored here. Privileged client only.
44         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
45         // List blocks stored here whose hash has the given prefix.
46         // Privileged client only.
47         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
48
49         // Internals/debugging info (runtime.MemStats)
50         rest.HandleFunc(`/debug.json`, DebugHandler).Methods("GET", "HEAD")
51
52         // List volumes: path, device number, bytes used/avail.
53         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
54
55         // Replace the current pull queue.
56         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
57
58         // Replace the current trash queue.
59         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
60
61         // Untrash moves blocks from trash back into store
62         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
63
64         // Any request which does not match any of these routes gets
65         // 400 Bad Request.
66         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
67
68         return rest
69 }
70
71 // BadRequestHandler is a HandleFunc to address bad requests.
72 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
73         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
74 }
75
76 // GetBlockHandler is a HandleFunc to address Get block requests.
77 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
78         ctx, cancel := contextForResponse(context.TODO(), resp)
79         defer cancel()
80
81         if theConfig.RequireSignatures {
82                 locator := req.URL.Path[1:] // strip leading slash
83                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
84                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
85                         return
86                 }
87         }
88
89         // TODO: Probe volumes to check whether the block _might_
90         // exist. Some volumes/types could support a quick existence
91         // check without causing other operations to suffer. If all
92         // volumes support that, and assure us the block definitely
93         // isn't here, we can return 404 now instead of waiting for a
94         // buffer.
95
96         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
97         if err != nil {
98                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
99                 return
100         }
101         defer bufs.Put(buf)
102
103         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
104         if err != nil {
105                 code := http.StatusInternalServerError
106                 if err, ok := err.(*KeepError); ok {
107                         code = err.HTTPCode
108                 }
109                 http.Error(resp, err.Error(), code)
110                 return
111         }
112
113         resp.Header().Set("Content-Length", strconv.Itoa(size))
114         resp.Header().Set("Content-Type", "application/octet-stream")
115         resp.Write(buf[:size])
116 }
117
118 // Return a new context that gets cancelled by resp's CloseNotifier.
119 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
120         ctx, cancel := context.WithCancel(parent)
121         if cn, ok := resp.(http.CloseNotifier); ok {
122                 go func(c <-chan bool) {
123                         select {
124                         case <-c:
125                                 theConfig.debugLogf("cancel context")
126                                 cancel()
127                         case <-ctx.Done():
128                         }
129                 }(cn.CloseNotify())
130         }
131         return ctx, cancel
132 }
133
134 // Get a buffer from the pool -- but give up and return a non-nil
135 // error if ctx ends before we get a buffer.
136 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
137         bufReady := make(chan []byte)
138         go func() {
139                 bufReady <- bufs.Get(bufSize)
140         }()
141         select {
142         case buf := <-bufReady:
143                 return buf, nil
144         case <-ctx.Done():
145                 go func() {
146                         // Even if closeNotifier happened first, we
147                         // need to keep waiting for our buf so we can
148                         // return it to the pool.
149                         bufs.Put(<-bufReady)
150                 }()
151                 return nil, ErrClientDisconnect
152         }
153 }
154
155 // PutBlockHandler is a HandleFunc to address Put block requests.
156 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
157         ctx, cancel := contextForResponse(context.TODO(), resp)
158         defer cancel()
159
160         hash := mux.Vars(req)["hash"]
161
162         // Detect as many error conditions as possible before reading
163         // the body: avoid transmitting data that will not end up
164         // being written anyway.
165
166         if req.ContentLength == -1 {
167                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
168                 return
169         }
170
171         if req.ContentLength > BlockSize {
172                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
173                 return
174         }
175
176         if len(KeepVM.AllWritable()) == 0 {
177                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
178                 return
179         }
180
181         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
182         if err != nil {
183                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
184                 return
185         }
186
187         _, err = io.ReadFull(req.Body, buf)
188         if err != nil {
189                 http.Error(resp, err.Error(), 500)
190                 bufs.Put(buf)
191                 return
192         }
193
194         replication, err := PutBlock(ctx, buf, hash)
195         bufs.Put(buf)
196
197         if err != nil {
198                 code := http.StatusInternalServerError
199                 if err, ok := err.(*KeepError); ok {
200                         code = err.HTTPCode
201                 }
202                 http.Error(resp, err.Error(), code)
203                 return
204         }
205
206         // Success; add a size hint, sign the locator if possible, and
207         // return it to the client.
208         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
209         apiToken := GetAPIToken(req)
210         if theConfig.blobSigningKey != nil && apiToken != "" {
211                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
212                 returnHash = SignLocator(returnHash, apiToken, expiry)
213         }
214         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
215         resp.Write([]byte(returnHash + "\n"))
216 }
217
218 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
219 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
220         // Reject unauthorized requests.
221         if !IsSystemAuth(GetAPIToken(req)) {
222                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
223                 return
224         }
225
226         prefix := mux.Vars(req)["prefix"]
227
228         for _, vol := range KeepVM.AllReadable() {
229                 if err := vol.IndexTo(prefix, resp); err != nil {
230                         // The only errors returned by IndexTo are
231                         // write errors returned by resp.Write(),
232                         // which probably means the client has
233                         // disconnected and this error will never be
234                         // reported to the client -- but it will
235                         // appear in our own error log.
236                         http.Error(resp, err.Error(), http.StatusInternalServerError)
237                         return
238                 }
239         }
240         // An empty line at EOF is the only way the client can be
241         // assured the entire index was received.
242         resp.Write([]byte{'\n'})
243 }
244
245 // PoolStatus struct
246 type PoolStatus struct {
247         Alloc uint64 `json:"BytesAllocated"`
248         Cap   int    `json:"BuffersMax"`
249         Len   int    `json:"BuffersInUse"`
250 }
251
252 type volumeStatusEnt struct {
253         Label         string
254         Status        *VolumeStatus `json:",omitempty"`
255         VolumeStats   *ioStats      `json:",omitempty"`
256         InternalStats interface{}   `json:",omitempty"`
257 }
258
259 // NodeStatus struct
260 type NodeStatus struct {
261         Volumes    []*volumeStatusEnt
262         BufferPool PoolStatus
263         PullQueue  WorkQueueStatus
264         TrashQueue WorkQueueStatus
265 }
266
267 var st NodeStatus
268 var stLock sync.Mutex
269
270 // DebugHandler addresses /debug.json requests.
271 func DebugHandler(resp http.ResponseWriter, req *http.Request) {
272         type debugStats struct {
273                 MemStats runtime.MemStats
274         }
275         var ds debugStats
276         runtime.ReadMemStats(&ds.MemStats)
277         err := json.NewEncoder(resp).Encode(&ds)
278         if err != nil {
279                 http.Error(resp, err.Error(), 500)
280         }
281 }
282
283 // StatusHandler addresses /status.json requests.
284 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
285         stLock.Lock()
286         readNodeStatus(&st)
287         jstat, err := json.Marshal(&st)
288         stLock.Unlock()
289         if err == nil {
290                 resp.Write(jstat)
291         } else {
292                 log.Printf("json.Marshal: %s", err)
293                 log.Printf("NodeStatus = %v", &st)
294                 http.Error(resp, err.Error(), 500)
295         }
296 }
297
298 // populate the given NodeStatus struct with current values.
299 func readNodeStatus(st *NodeStatus) {
300         vols := KeepVM.AllReadable()
301         if cap(st.Volumes) < len(vols) {
302                 st.Volumes = make([]*volumeStatusEnt, len(vols))
303         }
304         st.Volumes = st.Volumes[:0]
305         for _, vol := range vols {
306                 var internalStats interface{}
307                 if vol, ok := vol.(InternalStatser); ok {
308                         internalStats = vol.InternalStats()
309                 }
310                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
311                         Label:         vol.String(),
312                         Status:        vol.Status(),
313                         InternalStats: internalStats,
314                         //VolumeStats: KeepVM.VolumeStats(vol),
315                 })
316         }
317         st.BufferPool.Alloc = bufs.Alloc()
318         st.BufferPool.Cap = bufs.Cap()
319         st.BufferPool.Len = bufs.Len()
320         st.PullQueue = getWorkQueueStatus(pullq)
321         st.TrashQueue = getWorkQueueStatus(trashq)
322 }
323
324 // return a WorkQueueStatus for the given queue. If q is nil (which
325 // should never happen except in test suites), return a zero status
326 // value instead of crashing.
327 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
328         if q == nil {
329                 // This should only happen during tests.
330                 return WorkQueueStatus{}
331         }
332         return q.Status()
333 }
334
335 // DeleteHandler processes DELETE requests.
336 //
337 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
338 // from all connected volumes.
339 //
340 // Only the Data Manager, or an Arvados admin with scope "all", are
341 // allowed to issue DELETE requests.  If a DELETE request is not
342 // authenticated or is issued by a non-admin user, the server returns
343 // a PermissionError.
344 //
345 // Upon receiving a valid request from an authorized user,
346 // DeleteHandler deletes all copies of the specified block on local
347 // writable volumes.
348 //
349 // Response format:
350 //
351 // If the requested blocks was not found on any volume, the response
352 // code is HTTP 404 Not Found.
353 //
354 // Otherwise, the response code is 200 OK, with a response body
355 // consisting of the JSON message
356 //
357 //    {"copies_deleted":d,"copies_failed":f}
358 //
359 // where d and f are integers representing the number of blocks that
360 // were successfully and unsuccessfully deleted.
361 //
362 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
363         hash := mux.Vars(req)["hash"]
364
365         // Confirm that this user is an admin and has a token with unlimited scope.
366         var tok = GetAPIToken(req)
367         if tok == "" || !CanDelete(tok) {
368                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
369                 return
370         }
371
372         if !theConfig.EnableDelete {
373                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
374                 return
375         }
376
377         // Delete copies of this block from all available volumes.
378         // Report how many blocks were successfully deleted, and how
379         // many were found on writable volumes but not deleted.
380         var result struct {
381                 Deleted int `json:"copies_deleted"`
382                 Failed  int `json:"copies_failed"`
383         }
384         for _, vol := range KeepVM.AllWritable() {
385                 if err := vol.Trash(hash); err == nil {
386                         result.Deleted++
387                 } else if os.IsNotExist(err) {
388                         continue
389                 } else {
390                         result.Failed++
391                         log.Println("DeleteHandler:", err)
392                 }
393         }
394
395         var st int
396
397         if result.Deleted == 0 && result.Failed == 0 {
398                 st = http.StatusNotFound
399         } else {
400                 st = http.StatusOK
401         }
402
403         resp.WriteHeader(st)
404
405         if st == http.StatusOK {
406                 if body, err := json.Marshal(result); err == nil {
407                         resp.Write(body)
408                 } else {
409                         log.Printf("json.Marshal: %s (result = %v)", err, result)
410                         http.Error(resp, err.Error(), 500)
411                 }
412         }
413 }
414
415 /* PullHandler processes "PUT /pull" requests for the data manager.
416    The request body is a JSON message containing a list of pull
417    requests in the following format:
418
419    [
420       {
421          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
422          "servers":[
423                         "keep0.qr1hi.arvadosapi.com:25107",
424                         "keep1.qr1hi.arvadosapi.com:25108"
425                  ]
426           },
427           {
428                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
429                  "servers":[
430                         "10.0.1.5:25107",
431                         "10.0.1.6:25107",
432                         "10.0.1.7:25108"
433                  ]
434           },
435           ...
436    ]
437
438    Each pull request in the list consists of a block locator string
439    and an ordered list of servers.  Keepstore should try to fetch the
440    block from each server in turn.
441
442    If the request has not been sent by the Data Manager, return 401
443    Unauthorized.
444
445    If the JSON unmarshalling fails, return 400 Bad Request.
446 */
447
448 // PullRequest consists of a block locator and an ordered list of servers
449 type PullRequest struct {
450         Locator string   `json:"locator"`
451         Servers []string `json:"servers"`
452 }
453
454 // PullHandler processes "PUT /pull" requests for the data manager.
455 func PullHandler(resp http.ResponseWriter, req *http.Request) {
456         // Reject unauthorized requests.
457         if !IsSystemAuth(GetAPIToken(req)) {
458                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
459                 return
460         }
461
462         // Parse the request body.
463         var pr []PullRequest
464         r := json.NewDecoder(req.Body)
465         if err := r.Decode(&pr); err != nil {
466                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
467                 return
468         }
469
470         // We have a properly formatted pull list sent from the data
471         // manager.  Report success and send the list to the pull list
472         // manager for further handling.
473         resp.WriteHeader(http.StatusOK)
474         resp.Write([]byte(
475                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
476
477         plist := list.New()
478         for _, p := range pr {
479                 plist.PushBack(p)
480         }
481         pullq.ReplaceQueue(plist)
482 }
483
484 // TrashRequest consists of a block locator and it's Mtime
485 type TrashRequest struct {
486         Locator    string `json:"locator"`
487         BlockMtime int64  `json:"block_mtime"`
488 }
489
490 // TrashHandler processes /trash requests.
491 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
492         // Reject unauthorized requests.
493         if !IsSystemAuth(GetAPIToken(req)) {
494                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
495                 return
496         }
497
498         // Parse the request body.
499         var trash []TrashRequest
500         r := json.NewDecoder(req.Body)
501         if err := r.Decode(&trash); err != nil {
502                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
503                 return
504         }
505
506         // We have a properly formatted trash list sent from the data
507         // manager.  Report success and send the list to the trash work
508         // queue for further handling.
509         resp.WriteHeader(http.StatusOK)
510         resp.Write([]byte(
511                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
512
513         tlist := list.New()
514         for _, t := range trash {
515                 tlist.PushBack(t)
516         }
517         trashq.ReplaceQueue(tlist)
518 }
519
520 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
521 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
522         // Reject unauthorized requests.
523         if !IsSystemAuth(GetAPIToken(req)) {
524                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
525                 return
526         }
527
528         hash := mux.Vars(req)["hash"]
529
530         if len(KeepVM.AllWritable()) == 0 {
531                 http.Error(resp, "No writable volumes", http.StatusNotFound)
532                 return
533         }
534
535         var untrashedOn, failedOn []string
536         var numNotFound int
537         for _, vol := range KeepVM.AllWritable() {
538                 err := vol.Untrash(hash)
539
540                 if os.IsNotExist(err) {
541                         numNotFound++
542                 } else if err != nil {
543                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
544                         failedOn = append(failedOn, vol.String())
545                 } else {
546                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
547                         untrashedOn = append(untrashedOn, vol.String())
548                 }
549         }
550
551         if numNotFound == len(KeepVM.AllWritable()) {
552                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
553                 return
554         }
555
556         if len(failedOn) == len(KeepVM.AllWritable()) {
557                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
558         } else {
559                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
560                 if len(failedOn) > 0 {
561                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
562                 }
563                 resp.Write([]byte(respBody))
564         }
565 }
566
567 // GetBlock and PutBlock implement lower-level code for handling
568 // blocks by rooting through volumes connected to the local machine.
569 // Once the handler has determined that system policy permits the
570 // request, it calls these methods to perform the actual operation.
571 //
572 // TODO(twp): this code would probably be better located in the
573 // VolumeManager interface. As an abstraction, the VolumeManager
574 // should be the only part of the code that cares about which volume a
575 // block is stored on, so it should be responsible for figuring out
576 // which volume to check for fetching blocks, storing blocks, etc.
577
578 // GetBlock fetches the block identified by "hash" into the provided
579 // buf, and returns the data size.
580 //
581 // If the block cannot be found on any volume, returns NotFoundError.
582 //
583 // If the block found does not have the correct MD5 hash, returns
584 // DiskHashError.
585 //
586 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
587         // Attempt to read the requested hash from a keep volume.
588         errorToCaller := NotFoundError
589
590         for _, vol := range KeepVM.AllReadable() {
591                 size, err := vol.Get(ctx, hash, buf)
592                 select {
593                 case <-ctx.Done():
594                         return 0, ErrClientDisconnect
595                 default:
596                 }
597                 if err != nil {
598                         // IsNotExist is an expected error and may be
599                         // ignored. All other errors are logged. In
600                         // any case we continue trying to read other
601                         // volumes. If all volumes report IsNotExist,
602                         // we return a NotFoundError.
603                         if !os.IsNotExist(err) {
604                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
605                         }
606                         continue
607                 }
608                 // Check the file checksum.
609                 //
610                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
611                 if filehash != hash {
612                         // TODO: Try harder to tell a sysadmin about
613                         // this.
614                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
615                                 vol, hash, filehash)
616                         errorToCaller = DiskHashError
617                         continue
618                 }
619                 if errorToCaller == DiskHashError {
620                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
621                                 vol, hash)
622                 }
623                 return size, nil
624         }
625         return 0, errorToCaller
626 }
627
628 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
629 //
630 // PutBlock(ctx, block, hash)
631 //   Stores the BLOCK (identified by the content id HASH) in Keep.
632 //
633 //   The MD5 checksum of the block must be identical to the content id HASH.
634 //   If not, an error is returned.
635 //
636 //   PutBlock stores the BLOCK on the first Keep volume with free space.
637 //   A failure code is returned to the user only if all volumes fail.
638 //
639 //   On success, PutBlock returns nil.
640 //   On failure, it returns a KeepError with one of the following codes:
641 //
642 //   500 Collision
643 //          A different block with the same hash already exists on this
644 //          Keep server.
645 //   422 MD5Fail
646 //          The MD5 hash of the BLOCK does not match the argument HASH.
647 //   503 Full
648 //          There was not enough space left in any Keep volume to store
649 //          the object.
650 //   500 Fail
651 //          The object could not be stored for some other reason (e.g.
652 //          all writes failed). The text of the error message should
653 //          provide as much detail as possible.
654 //
655 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
656         // Check that BLOCK's checksum matches HASH.
657         blockhash := fmt.Sprintf("%x", md5.Sum(block))
658         if blockhash != hash {
659                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
660                 return 0, RequestHashError
661         }
662
663         // If we already have this data, it's intact on disk, and we
664         // can update its timestamp, return success. If we have
665         // different data with the same hash, return failure.
666         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
667                 return n, err
668         } else if ctx.Err() != nil {
669                 return 0, ErrClientDisconnect
670         }
671
672         // Choose a Keep volume to write to.
673         // If this volume fails, try all of the volumes in order.
674         if vol := KeepVM.NextWritable(); vol != nil {
675                 if err := vol.Put(ctx, hash, block); err == nil {
676                         return vol.Replication(), nil // success!
677                 }
678                 if ctx.Err() != nil {
679                         return 0, ErrClientDisconnect
680                 }
681         }
682
683         writables := KeepVM.AllWritable()
684         if len(writables) == 0 {
685                 log.Print("No writable volumes.")
686                 return 0, FullError
687         }
688
689         allFull := true
690         for _, vol := range writables {
691                 err := vol.Put(ctx, hash, block)
692                 if ctx.Err() != nil {
693                         return 0, ErrClientDisconnect
694                 }
695                 if err == nil {
696                         return vol.Replication(), nil // success!
697                 }
698                 if err != FullError {
699                         // The volume is not full but the
700                         // write did not succeed.  Report the
701                         // error and continue trying.
702                         allFull = false
703                         log.Printf("%s: Write(%s): %s", vol, hash, err)
704                 }
705         }
706
707         if allFull {
708                 log.Print("All volumes are full.")
709                 return 0, FullError
710         }
711         // Already logged the non-full errors.
712         return 0, GenericError
713 }
714
715 // CompareAndTouch returns the current replication level if one of the
716 // volumes already has the given content and it successfully updates
717 // the relevant block's modification time in order to protect it from
718 // premature garbage collection. Otherwise, it returns a non-nil
719 // error.
720 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
721         var bestErr error = NotFoundError
722         for _, vol := range KeepVM.AllWritable() {
723                 err := vol.Compare(ctx, hash, buf)
724                 if ctx.Err() != nil {
725                         return 0, ctx.Err()
726                 } else if err == CollisionError {
727                         // Stop if we have a block with same hash but
728                         // different content. (It will be impossible
729                         // to tell which one is wanted if we have
730                         // both, so there's no point writing it even
731                         // on a different volume.)
732                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
733                         return 0, err
734                 } else if os.IsNotExist(err) {
735                         // Block does not exist. This is the only
736                         // "normal" error: we don't log anything.
737                         continue
738                 } else if err != nil {
739                         // Couldn't open file, data is corrupt on
740                         // disk, etc.: log this abnormal condition,
741                         // and try the next volume.
742                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
743                         continue
744                 }
745                 if err := vol.Touch(hash); err != nil {
746                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
747                         bestErr = err
748                         continue
749                 }
750                 // Compare and Touch both worked --> done.
751                 return vol.Replication(), nil
752         }
753         return 0, bestErr
754 }
755
756 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
757
758 // IsValidLocator returns true if the specified string is a valid Keep locator.
759 //   When Keep is extended to support hash types other than MD5,
760 //   this should be updated to cover those as well.
761 //
762 func IsValidLocator(loc string) bool {
763         return validLocatorRe.MatchString(loc)
764 }
765
766 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
767
768 // GetAPIToken returns the OAuth2 token from the Authorization
769 // header of a HTTP request, or an empty string if no matching
770 // token is found.
771 func GetAPIToken(req *http.Request) string {
772         if auth, ok := req.Header["Authorization"]; ok {
773                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
774                         return match[1]
775                 }
776         }
777         return ""
778 }
779
780 // IsExpired returns true if the given Unix timestamp (expressed as a
781 // hexadecimal string) is in the past, or if timestampHex cannot be
782 // parsed as a hexadecimal string.
783 func IsExpired(timestampHex string) bool {
784         ts, err := strconv.ParseInt(timestampHex, 16, 0)
785         if err != nil {
786                 log.Printf("IsExpired: %s", err)
787                 return true
788         }
789         return time.Unix(ts, 0).Before(time.Now())
790 }
791
792 // CanDelete returns true if the user identified by apiToken is
793 // allowed to delete blocks.
794 func CanDelete(apiToken string) bool {
795         if apiToken == "" {
796                 return false
797         }
798         // Blocks may be deleted only when Keep has been configured with a
799         // data manager.
800         if IsSystemAuth(apiToken) {
801                 return true
802         }
803         // TODO(twp): look up apiToken with the API server
804         // return true if is_admin is true and if the token
805         // has unlimited scope
806         return false
807 }
808
809 // IsSystemAuth returns true if the given token is allowed to perform
810 // system level actions like deleting data.
811 func IsSystemAuth(token string) bool {
812         return token != "" && token == theConfig.systemAuthToken
813 }