refactor as procedural
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "strings"
24         "sync"
25         "time"
26
27         "git.curoverse.com/arvados.git/sdk/go/httpserver"
28         log "github.com/Sirupsen/logrus"
29 )
30
31 type router struct {
32         *mux.Router
33         limiter httpserver.RequestCounter
34 }
35
36 // MakeRESTRouter returns a new router that forwards all Keep requests
37 // to the appropriate handlers.
38 func MakeRESTRouter() *router {
39         rest := mux.NewRouter()
40         rtr := &router{Router: rest}
41
42         rest.HandleFunc(
43                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
44         rest.HandleFunc(
45                 `/{hash:[0-9a-f]{32}}+{hints}`,
46                 GetBlockHandler).Methods("GET", "HEAD")
47
48         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
49         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
50         // List all blocks stored here. Privileged client only.
51         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
52         // List blocks stored here whose hash has the given prefix.
53         // Privileged client only.
54         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
55
56         // Internals/debugging info (runtime.MemStats)
57         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
58
59         // List volumes: path, device number, bytes used/avail.
60         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
61
62         // Replace the current pull queue.
63         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
64
65         // Replace the current trash queue.
66         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
67
68         // Untrash moves blocks from trash back into store
69         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
70
71         // Any request which does not match any of these routes gets
72         // 400 Bad Request.
73         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
74
75         return rtr
76 }
77
78 // BadRequestHandler is a HandleFunc to address bad requests.
79 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
80         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
81 }
82
83 // GetBlockHandler is a HandleFunc to address Get block requests.
84 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
85         ctx, cancel := contextForResponse(context.TODO(), resp)
86         defer cancel()
87
88         if theConfig.RequireSignatures {
89                 locator := req.URL.Path[1:] // strip leading slash
90                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
91                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
92                         return
93                 }
94         }
95
96         // TODO: Probe volumes to check whether the block _might_
97         // exist. Some volumes/types could support a quick existence
98         // check without causing other operations to suffer. If all
99         // volumes support that, and assure us the block definitely
100         // isn't here, we can return 404 now instead of waiting for a
101         // buffer.
102
103         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
104         if err != nil {
105                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
106                 return
107         }
108         defer bufs.Put(buf)
109
110         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
111         if err != nil {
112                 code := http.StatusInternalServerError
113                 if err, ok := err.(*KeepError); ok {
114                         code = err.HTTPCode
115                 }
116                 http.Error(resp, err.Error(), code)
117                 return
118         }
119
120         resp.Header().Set("Content-Length", strconv.Itoa(size))
121         resp.Header().Set("Content-Type", "application/octet-stream")
122         resp.Write(buf[:size])
123 }
124
125 // Return a new context that gets cancelled by resp's CloseNotifier.
126 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
127         ctx, cancel := context.WithCancel(parent)
128         if cn, ok := resp.(http.CloseNotifier); ok {
129                 go func(c <-chan bool) {
130                         select {
131                         case <-c:
132                                 theConfig.debugLogf("cancel context")
133                                 cancel()
134                         case <-ctx.Done():
135                         }
136                 }(cn.CloseNotify())
137         }
138         return ctx, cancel
139 }
140
141 // Get a buffer from the pool -- but give up and return a non-nil
142 // error if ctx ends before we get a buffer.
143 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
144         bufReady := make(chan []byte)
145         go func() {
146                 bufReady <- bufs.Get(bufSize)
147         }()
148         select {
149         case buf := <-bufReady:
150                 return buf, nil
151         case <-ctx.Done():
152                 go func() {
153                         // Even if closeNotifier happened first, we
154                         // need to keep waiting for our buf so we can
155                         // return it to the pool.
156                         bufs.Put(<-bufReady)
157                 }()
158                 return nil, ErrClientDisconnect
159         }
160 }
161
162 // PutBlockHandler is a HandleFunc to address Put block requests.
163 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
164         ctx, cancel := contextForResponse(context.TODO(), resp)
165         defer cancel()
166
167         hash := mux.Vars(req)["hash"]
168
169         // Detect as many error conditions as possible before reading
170         // the body: avoid transmitting data that will not end up
171         // being written anyway.
172
173         if req.ContentLength == -1 {
174                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
175                 return
176         }
177
178         if req.ContentLength > BlockSize {
179                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
180                 return
181         }
182
183         if len(KeepVM.AllWritable()) == 0 {
184                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
185                 return
186         }
187
188         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
189         if err != nil {
190                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
191                 return
192         }
193
194         _, err = io.ReadFull(req.Body, buf)
195         if err != nil {
196                 http.Error(resp, err.Error(), 500)
197                 bufs.Put(buf)
198                 return
199         }
200
201         replication, err := PutBlock(ctx, buf, hash)
202         bufs.Put(buf)
203
204         if err != nil {
205                 code := http.StatusInternalServerError
206                 if err, ok := err.(*KeepError); ok {
207                         code = err.HTTPCode
208                 }
209                 http.Error(resp, err.Error(), code)
210                 return
211         }
212
213         // Success; add a size hint, sign the locator if possible, and
214         // return it to the client.
215         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
216         apiToken := GetAPIToken(req)
217         if theConfig.blobSigningKey != nil && apiToken != "" {
218                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
219                 returnHash = SignLocator(returnHash, apiToken, expiry)
220         }
221         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
222         resp.Write([]byte(returnHash + "\n"))
223 }
224
225 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
226 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
227         // Reject unauthorized requests.
228         if !IsSystemAuth(GetAPIToken(req)) {
229                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
230                 return
231         }
232
233         prefix := mux.Vars(req)["prefix"]
234
235         for _, vol := range KeepVM.AllReadable() {
236                 if err := vol.IndexTo(prefix, resp); err != nil {
237                         // The only errors returned by IndexTo are
238                         // write errors returned by resp.Write(),
239                         // which probably means the client has
240                         // disconnected and this error will never be
241                         // reported to the client -- but it will
242                         // appear in our own error log.
243                         http.Error(resp, err.Error(), http.StatusInternalServerError)
244                         return
245                 }
246         }
247         // An empty line at EOF is the only way the client can be
248         // assured the entire index was received.
249         resp.Write([]byte{'\n'})
250 }
251
252 // PoolStatus struct
253 type PoolStatus struct {
254         Alloc uint64 `json:"BytesAllocated"`
255         Cap   int    `json:"BuffersMax"`
256         Len   int    `json:"BuffersInUse"`
257 }
258
259 type volumeStatusEnt struct {
260         Label         string
261         Status        *VolumeStatus `json:",omitempty"`
262         VolumeStats   *ioStats      `json:",omitempty"`
263         InternalStats interface{}   `json:",omitempty"`
264 }
265
266 // NodeStatus struct
267 type NodeStatus struct {
268         Volumes         []*volumeStatusEnt
269         BufferPool      PoolStatus
270         PullQueue       WorkQueueStatus
271         TrashQueue      WorkQueueStatus
272         RequestsCurrent int
273         RequestsMax     int
274 }
275
276 var st NodeStatus
277 var stLock sync.Mutex
278
279 // DebugHandler addresses /debug.json requests.
280 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
281         type debugStats struct {
282                 MemStats runtime.MemStats
283         }
284         var ds debugStats
285         runtime.ReadMemStats(&ds.MemStats)
286         err := json.NewEncoder(resp).Encode(&ds)
287         if err != nil {
288                 http.Error(resp, err.Error(), 500)
289         }
290 }
291
292 // StatusHandler addresses /status.json requests.
293 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
294         stLock.Lock()
295         rtr.readNodeStatus(&st)
296         jstat, err := json.Marshal(&st)
297         stLock.Unlock()
298         if err == nil {
299                 resp.Write(jstat)
300         } else {
301                 log.Printf("json.Marshal: %s", err)
302                 log.Printf("NodeStatus = %v", &st)
303                 http.Error(resp, err.Error(), 500)
304         }
305 }
306
307 // populate the given NodeStatus struct with current values.
308 func (rtr *router) readNodeStatus(st *NodeStatus) {
309         vols := KeepVM.AllReadable()
310         if cap(st.Volumes) < len(vols) {
311                 st.Volumes = make([]*volumeStatusEnt, len(vols))
312         }
313         st.Volumes = st.Volumes[:0]
314         for _, vol := range vols {
315                 var internalStats interface{}
316                 if vol, ok := vol.(InternalStatser); ok {
317                         internalStats = vol.InternalStats()
318                 }
319                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
320                         Label:         vol.String(),
321                         Status:        vol.Status(),
322                         InternalStats: internalStats,
323                         //VolumeStats: KeepVM.VolumeStats(vol),
324                 })
325         }
326         st.BufferPool.Alloc = bufs.Alloc()
327         st.BufferPool.Cap = bufs.Cap()
328         st.BufferPool.Len = bufs.Len()
329         st.PullQueue = getWorkQueueStatus(pullq)
330         st.TrashQueue = getWorkQueueStatus(trashq)
331         if rtr.limiter != nil {
332                 st.RequestsCurrent = rtr.limiter.Current()
333                 st.RequestsMax = rtr.limiter.Max()
334         }
335 }
336
337 // return a WorkQueueStatus for the given queue. If q is nil (which
338 // should never happen except in test suites), return a zero status
339 // value instead of crashing.
340 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
341         if q == nil {
342                 // This should only happen during tests.
343                 return WorkQueueStatus{}
344         }
345         return q.Status()
346 }
347
348 // DeleteHandler processes DELETE requests.
349 //
350 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
351 // from all connected volumes.
352 //
353 // Only the Data Manager, or an Arvados admin with scope "all", are
354 // allowed to issue DELETE requests.  If a DELETE request is not
355 // authenticated or is issued by a non-admin user, the server returns
356 // a PermissionError.
357 //
358 // Upon receiving a valid request from an authorized user,
359 // DeleteHandler deletes all copies of the specified block on local
360 // writable volumes.
361 //
362 // Response format:
363 //
364 // If the requested blocks was not found on any volume, the response
365 // code is HTTP 404 Not Found.
366 //
367 // Otherwise, the response code is 200 OK, with a response body
368 // consisting of the JSON message
369 //
370 //    {"copies_deleted":d,"copies_failed":f}
371 //
372 // where d and f are integers representing the number of blocks that
373 // were successfully and unsuccessfully deleted.
374 //
375 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
376         hash := mux.Vars(req)["hash"]
377
378         // Confirm that this user is an admin and has a token with unlimited scope.
379         var tok = GetAPIToken(req)
380         if tok == "" || !CanDelete(tok) {
381                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
382                 return
383         }
384
385         if !theConfig.EnableDelete {
386                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
387                 return
388         }
389
390         // Delete copies of this block from all available volumes.
391         // Report how many blocks were successfully deleted, and how
392         // many were found on writable volumes but not deleted.
393         var result struct {
394                 Deleted int `json:"copies_deleted"`
395                 Failed  int `json:"copies_failed"`
396         }
397         for _, vol := range KeepVM.AllWritable() {
398                 if err := vol.Trash(hash); err == nil {
399                         result.Deleted++
400                 } else if os.IsNotExist(err) {
401                         continue
402                 } else {
403                         result.Failed++
404                         log.Println("DeleteHandler:", err)
405                 }
406         }
407
408         var st int
409
410         if result.Deleted == 0 && result.Failed == 0 {
411                 st = http.StatusNotFound
412         } else {
413                 st = http.StatusOK
414         }
415
416         resp.WriteHeader(st)
417
418         if st == http.StatusOK {
419                 if body, err := json.Marshal(result); err == nil {
420                         resp.Write(body)
421                 } else {
422                         log.Printf("json.Marshal: %s (result = %v)", err, result)
423                         http.Error(resp, err.Error(), 500)
424                 }
425         }
426 }
427
428 /* PullHandler processes "PUT /pull" requests for the data manager.
429    The request body is a JSON message containing a list of pull
430    requests in the following format:
431
432    [
433       {
434          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
435          "servers":[
436                         "keep0.qr1hi.arvadosapi.com:25107",
437                         "keep1.qr1hi.arvadosapi.com:25108"
438                  ]
439           },
440           {
441                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
442                  "servers":[
443                         "10.0.1.5:25107",
444                         "10.0.1.6:25107",
445                         "10.0.1.7:25108"
446                  ]
447           },
448           ...
449    ]
450
451    Each pull request in the list consists of a block locator string
452    and an ordered list of servers.  Keepstore should try to fetch the
453    block from each server in turn.
454
455    If the request has not been sent by the Data Manager, return 401
456    Unauthorized.
457
458    If the JSON unmarshalling fails, return 400 Bad Request.
459 */
460
461 // PullRequest consists of a block locator and an ordered list of servers
462 type PullRequest struct {
463         Locator string   `json:"locator"`
464         Servers []string `json:"servers"`
465 }
466
467 // PullHandler processes "PUT /pull" requests for the data manager.
468 func PullHandler(resp http.ResponseWriter, req *http.Request) {
469         // Reject unauthorized requests.
470         if !IsSystemAuth(GetAPIToken(req)) {
471                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
472                 return
473         }
474
475         // Parse the request body.
476         var pr []PullRequest
477         r := json.NewDecoder(req.Body)
478         if err := r.Decode(&pr); err != nil {
479                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
480                 return
481         }
482
483         // We have a properly formatted pull list sent from the data
484         // manager.  Report success and send the list to the pull list
485         // manager for further handling.
486         resp.WriteHeader(http.StatusOK)
487         resp.Write([]byte(
488                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
489
490         plist := list.New()
491         for _, p := range pr {
492                 plist.PushBack(p)
493         }
494         pullq.ReplaceQueue(plist)
495 }
496
497 // TrashRequest consists of a block locator and it's Mtime
498 type TrashRequest struct {
499         Locator    string `json:"locator"`
500         BlockMtime int64  `json:"block_mtime"`
501 }
502
503 // TrashHandler processes /trash requests.
504 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
505         // Reject unauthorized requests.
506         if !IsSystemAuth(GetAPIToken(req)) {
507                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
508                 return
509         }
510
511         // Parse the request body.
512         var trash []TrashRequest
513         r := json.NewDecoder(req.Body)
514         if err := r.Decode(&trash); err != nil {
515                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
516                 return
517         }
518
519         // We have a properly formatted trash list sent from the data
520         // manager.  Report success and send the list to the trash work
521         // queue for further handling.
522         resp.WriteHeader(http.StatusOK)
523         resp.Write([]byte(
524                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
525
526         tlist := list.New()
527         for _, t := range trash {
528                 tlist.PushBack(t)
529         }
530         trashq.ReplaceQueue(tlist)
531 }
532
533 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
534 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
535         // Reject unauthorized requests.
536         if !IsSystemAuth(GetAPIToken(req)) {
537                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
538                 return
539         }
540
541         hash := mux.Vars(req)["hash"]
542
543         if len(KeepVM.AllWritable()) == 0 {
544                 http.Error(resp, "No writable volumes", http.StatusNotFound)
545                 return
546         }
547
548         var untrashedOn, failedOn []string
549         var numNotFound int
550         for _, vol := range KeepVM.AllWritable() {
551                 err := vol.Untrash(hash)
552
553                 if os.IsNotExist(err) {
554                         numNotFound++
555                 } else if err != nil {
556                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
557                         failedOn = append(failedOn, vol.String())
558                 } else {
559                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
560                         untrashedOn = append(untrashedOn, vol.String())
561                 }
562         }
563
564         if numNotFound == len(KeepVM.AllWritable()) {
565                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
566                 return
567         }
568
569         if len(failedOn) == len(KeepVM.AllWritable()) {
570                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
571         } else {
572                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
573                 if len(failedOn) > 0 {
574                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
575                 }
576                 resp.Write([]byte(respBody))
577         }
578 }
579
580 // GetBlock and PutBlock implement lower-level code for handling
581 // blocks by rooting through volumes connected to the local machine.
582 // Once the handler has determined that system policy permits the
583 // request, it calls these methods to perform the actual operation.
584 //
585 // TODO(twp): this code would probably be better located in the
586 // VolumeManager interface. As an abstraction, the VolumeManager
587 // should be the only part of the code that cares about which volume a
588 // block is stored on, so it should be responsible for figuring out
589 // which volume to check for fetching blocks, storing blocks, etc.
590
591 // GetBlock fetches the block identified by "hash" into the provided
592 // buf, and returns the data size.
593 //
594 // If the block cannot be found on any volume, returns NotFoundError.
595 //
596 // If the block found does not have the correct MD5 hash, returns
597 // DiskHashError.
598 //
599 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
600         // Attempt to read the requested hash from a keep volume.
601         errorToCaller := NotFoundError
602
603         for _, vol := range KeepVM.AllReadable() {
604                 size, err := vol.Get(ctx, hash, buf)
605                 select {
606                 case <-ctx.Done():
607                         return 0, ErrClientDisconnect
608                 default:
609                 }
610                 if err != nil {
611                         // IsNotExist is an expected error and may be
612                         // ignored. All other errors are logged. In
613                         // any case we continue trying to read other
614                         // volumes. If all volumes report IsNotExist,
615                         // we return a NotFoundError.
616                         if !os.IsNotExist(err) {
617                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
618                         }
619                         continue
620                 }
621                 // Check the file checksum.
622                 //
623                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
624                 if filehash != hash {
625                         // TODO: Try harder to tell a sysadmin about
626                         // this.
627                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
628                                 vol, hash, filehash)
629                         errorToCaller = DiskHashError
630                         continue
631                 }
632                 if errorToCaller == DiskHashError {
633                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
634                                 vol, hash)
635                 }
636                 return size, nil
637         }
638         return 0, errorToCaller
639 }
640
641 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
642 //
643 // PutBlock(ctx, block, hash)
644 //   Stores the BLOCK (identified by the content id HASH) in Keep.
645 //
646 //   The MD5 checksum of the block must be identical to the content id HASH.
647 //   If not, an error is returned.
648 //
649 //   PutBlock stores the BLOCK on the first Keep volume with free space.
650 //   A failure code is returned to the user only if all volumes fail.
651 //
652 //   On success, PutBlock returns nil.
653 //   On failure, it returns a KeepError with one of the following codes:
654 //
655 //   500 Collision
656 //          A different block with the same hash already exists on this
657 //          Keep server.
658 //   422 MD5Fail
659 //          The MD5 hash of the BLOCK does not match the argument HASH.
660 //   503 Full
661 //          There was not enough space left in any Keep volume to store
662 //          the object.
663 //   500 Fail
664 //          The object could not be stored for some other reason (e.g.
665 //          all writes failed). The text of the error message should
666 //          provide as much detail as possible.
667 //
668 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
669         // Check that BLOCK's checksum matches HASH.
670         blockhash := fmt.Sprintf("%x", md5.Sum(block))
671         if blockhash != hash {
672                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
673                 return 0, RequestHashError
674         }
675
676         // If we already have this data, it's intact on disk, and we
677         // can update its timestamp, return success. If we have
678         // different data with the same hash, return failure.
679         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
680                 return n, err
681         } else if ctx.Err() != nil {
682                 return 0, ErrClientDisconnect
683         }
684
685         // Choose a Keep volume to write to.
686         // If this volume fails, try all of the volumes in order.
687         if vol := KeepVM.NextWritable(); vol != nil {
688                 if err := vol.Put(ctx, hash, block); err == nil {
689                         return vol.Replication(), nil // success!
690                 }
691                 if ctx.Err() != nil {
692                         return 0, ErrClientDisconnect
693                 }
694         }
695
696         writables := KeepVM.AllWritable()
697         if len(writables) == 0 {
698                 log.Print("No writable volumes.")
699                 return 0, FullError
700         }
701
702         allFull := true
703         for _, vol := range writables {
704                 err := vol.Put(ctx, hash, block)
705                 if ctx.Err() != nil {
706                         return 0, ErrClientDisconnect
707                 }
708                 if err == nil {
709                         return vol.Replication(), nil // success!
710                 }
711                 if err != FullError {
712                         // The volume is not full but the
713                         // write did not succeed.  Report the
714                         // error and continue trying.
715                         allFull = false
716                         log.Printf("%s: Write(%s): %s", vol, hash, err)
717                 }
718         }
719
720         if allFull {
721                 log.Print("All volumes are full.")
722                 return 0, FullError
723         }
724         // Already logged the non-full errors.
725         return 0, GenericError
726 }
727
728 // CompareAndTouch returns the current replication level if one of the
729 // volumes already has the given content and it successfully updates
730 // the relevant block's modification time in order to protect it from
731 // premature garbage collection. Otherwise, it returns a non-nil
732 // error.
733 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
734         var bestErr error = NotFoundError
735         for _, vol := range KeepVM.AllWritable() {
736                 err := vol.Compare(ctx, hash, buf)
737                 if ctx.Err() != nil {
738                         return 0, ctx.Err()
739                 } else if err == CollisionError {
740                         // Stop if we have a block with same hash but
741                         // different content. (It will be impossible
742                         // to tell which one is wanted if we have
743                         // both, so there's no point writing it even
744                         // on a different volume.)
745                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
746                         return 0, err
747                 } else if os.IsNotExist(err) {
748                         // Block does not exist. This is the only
749                         // "normal" error: we don't log anything.
750                         continue
751                 } else if err != nil {
752                         // Couldn't open file, data is corrupt on
753                         // disk, etc.: log this abnormal condition,
754                         // and try the next volume.
755                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
756                         continue
757                 }
758                 if err := vol.Touch(hash); err != nil {
759                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
760                         bestErr = err
761                         continue
762                 }
763                 // Compare and Touch both worked --> done.
764                 return vol.Replication(), nil
765         }
766         return 0, bestErr
767 }
768
769 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
770
771 // IsValidLocator returns true if the specified string is a valid Keep locator.
772 //   When Keep is extended to support hash types other than MD5,
773 //   this should be updated to cover those as well.
774 //
775 func IsValidLocator(loc string) bool {
776         return validLocatorRe.MatchString(loc)
777 }
778
779 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
780
781 // GetAPIToken returns the OAuth2 token from the Authorization
782 // header of a HTTP request, or an empty string if no matching
783 // token is found.
784 func GetAPIToken(req *http.Request) string {
785         if auth, ok := req.Header["Authorization"]; ok {
786                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
787                         return match[1]
788                 }
789         }
790         return ""
791 }
792
793 // IsExpired returns true if the given Unix timestamp (expressed as a
794 // hexadecimal string) is in the past, or if timestampHex cannot be
795 // parsed as a hexadecimal string.
796 func IsExpired(timestampHex string) bool {
797         ts, err := strconv.ParseInt(timestampHex, 16, 0)
798         if err != nil {
799                 log.Printf("IsExpired: %s", err)
800                 return true
801         }
802         return time.Unix(ts, 0).Before(time.Now())
803 }
804
805 // CanDelete returns true if the user identified by apiToken is
806 // allowed to delete blocks.
807 func CanDelete(apiToken string) bool {
808         if apiToken == "" {
809                 return false
810         }
811         // Blocks may be deleted only when Keep has been configured with a
812         // data manager.
813         if IsSystemAuth(apiToken) {
814                 return true
815         }
816         // TODO(twp): look up apiToken with the API server
817         // return true if is_admin is true and if the token
818         // has unlimited scope
819         return false
820 }
821
822 // IsSystemAuth returns true if the given token is allowed to perform
823 // system level actions like deleting data.
824 func IsSystemAuth(token string) bool {
825         return token != "" && token == theConfig.systemAuthToken
826 }