Fix a typo in Crunch Dispatch documentation. Arvados-DCO-1.1-Signed-off-by: Pavel...
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/health"
33         "git.curoverse.com/arvados.git/sdk/go/httpserver"
34 )
35
36 type router struct {
37         *mux.Router
38         limiter httpserver.RequestCounter
39 }
40
41 // MakeRESTRouter returns a new router that forwards all Keep requests
42 // to the appropriate handlers.
43 func MakeRESTRouter() http.Handler {
44         rtr := &router{Router: mux.NewRouter()}
45
46         rtr.HandleFunc(
47                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
48         rtr.HandleFunc(
49                 `/{hash:[0-9a-f]{32}}+{hints}`,
50                 GetBlockHandler).Methods("GET", "HEAD")
51
52         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
53         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
54         // List all blocks stored here. Privileged client only.
55         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
56         // List blocks stored here whose hash has the given prefix.
57         // Privileged client only.
58         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
59
60         // Internals/debugging info (runtime.MemStats)
61         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
62
63         // List volumes: path, device number, bytes used/avail.
64         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
65
66         // List mounts: UUID, readonly, tier, device ID, ...
67         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
68         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
69         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
70
71         // Replace the current pull queue.
72         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
73
74         // Replace the current trash queue.
75         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
76
77         // Untrash moves blocks from trash back into store
78         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
79
80         rtr.Handle("/_health/{check}", &health.Handler{
81                 Token:  theConfig.ManagementToken,
82                 Prefix: "/_health/",
83         }).Methods("GET")
84
85         // Any request which does not match any of these routes gets
86         // 400 Bad Request.
87         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
88
89         theConfig.metrics.setup()
90
91         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
92
93         mux := http.NewServeMux()
94         mux.Handle("/", theConfig.metrics.Instrument(
95                 httpserver.AddRequestIDs(httpserver.LogRequests(rtr.limiter))))
96         mux.HandleFunc("/metrics.json", theConfig.metrics.exportJSON)
97         mux.Handle("/metrics", theConfig.metrics.exportProm)
98
99         return mux
100 }
101
102 // BadRequestHandler is a HandleFunc to address bad requests.
103 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
104         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
105 }
106
107 // GetBlockHandler is a HandleFunc to address Get block requests.
108 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
109         ctx, cancel := contextForResponse(context.TODO(), resp)
110         defer cancel()
111
112         if theConfig.RequireSignatures {
113                 locator := req.URL.Path[1:] // strip leading slash
114                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
115                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
116                         return
117                 }
118         }
119
120         // TODO: Probe volumes to check whether the block _might_
121         // exist. Some volumes/types could support a quick existence
122         // check without causing other operations to suffer. If all
123         // volumes support that, and assure us the block definitely
124         // isn't here, we can return 404 now instead of waiting for a
125         // buffer.
126
127         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
128         if err != nil {
129                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
130                 return
131         }
132         defer bufs.Put(buf)
133
134         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
135         if err != nil {
136                 code := http.StatusInternalServerError
137                 if err, ok := err.(*KeepError); ok {
138                         code = err.HTTPCode
139                 }
140                 http.Error(resp, err.Error(), code)
141                 return
142         }
143
144         resp.Header().Set("Content-Length", strconv.Itoa(size))
145         resp.Header().Set("Content-Type", "application/octet-stream")
146         resp.Write(buf[:size])
147 }
148
149 // Return a new context that gets cancelled by resp's CloseNotifier.
150 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
151         ctx, cancel := context.WithCancel(parent)
152         if cn, ok := resp.(http.CloseNotifier); ok {
153                 go func(c <-chan bool) {
154                         select {
155                         case <-c:
156                                 theConfig.debugLogf("cancel context")
157                                 cancel()
158                         case <-ctx.Done():
159                         }
160                 }(cn.CloseNotify())
161         }
162         return ctx, cancel
163 }
164
165 // Get a buffer from the pool -- but give up and return a non-nil
166 // error if ctx ends before we get a buffer.
167 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
168         bufReady := make(chan []byte)
169         go func() {
170                 bufReady <- bufs.Get(bufSize)
171         }()
172         select {
173         case buf := <-bufReady:
174                 return buf, nil
175         case <-ctx.Done():
176                 go func() {
177                         // Even if closeNotifier happened first, we
178                         // need to keep waiting for our buf so we can
179                         // return it to the pool.
180                         bufs.Put(<-bufReady)
181                 }()
182                 return nil, ErrClientDisconnect
183         }
184 }
185
186 // PutBlockHandler is a HandleFunc to address Put block requests.
187 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
188         ctx, cancel := contextForResponse(context.TODO(), resp)
189         defer cancel()
190
191         hash := mux.Vars(req)["hash"]
192
193         // Detect as many error conditions as possible before reading
194         // the body: avoid transmitting data that will not end up
195         // being written anyway.
196
197         if req.ContentLength == -1 {
198                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
199                 return
200         }
201
202         if req.ContentLength > BlockSize {
203                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
204                 return
205         }
206
207         if len(KeepVM.AllWritable()) == 0 {
208                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
209                 return
210         }
211
212         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
213         if err != nil {
214                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
215                 return
216         }
217
218         _, err = io.ReadFull(req.Body, buf)
219         if err != nil {
220                 http.Error(resp, err.Error(), 500)
221                 bufs.Put(buf)
222                 return
223         }
224
225         replication, err := PutBlock(ctx, buf, hash)
226         bufs.Put(buf)
227
228         if err != nil {
229                 code := http.StatusInternalServerError
230                 if err, ok := err.(*KeepError); ok {
231                         code = err.HTTPCode
232                 }
233                 http.Error(resp, err.Error(), code)
234                 return
235         }
236
237         // Success; add a size hint, sign the locator if possible, and
238         // return it to the client.
239         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
240         apiToken := GetAPIToken(req)
241         if theConfig.blobSigningKey != nil && apiToken != "" {
242                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
243                 returnHash = SignLocator(returnHash, apiToken, expiry)
244         }
245         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
246         resp.Write([]byte(returnHash + "\n"))
247 }
248
249 // IndexHandler responds to "/index", "/index/{prefix}", and
250 // "/mounts/{uuid}/blocks" requests.
251 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
252         if !IsSystemAuth(GetAPIToken(req)) {
253                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
254                 return
255         }
256
257         prefix := mux.Vars(req)["prefix"]
258         if prefix == "" {
259                 req.ParseForm()
260                 prefix = req.Form.Get("prefix")
261         }
262
263         uuid := mux.Vars(req)["uuid"]
264
265         var vols []Volume
266         if uuid == "" {
267                 vols = KeepVM.AllReadable()
268         } else if v := KeepVM.Lookup(uuid, false); v == nil {
269                 http.Error(resp, "mount not found", http.StatusNotFound)
270                 return
271         } else {
272                 vols = []Volume{v}
273         }
274
275         for _, v := range vols {
276                 if err := v.IndexTo(prefix, resp); err != nil {
277                         // The only errors returned by IndexTo are
278                         // write errors returned by resp.Write(),
279                         // which probably means the client has
280                         // disconnected and this error will never be
281                         // reported to the client -- but it will
282                         // appear in our own error log.
283                         http.Error(resp, err.Error(), http.StatusInternalServerError)
284                         return
285                 }
286         }
287         // An empty line at EOF is the only way the client can be
288         // assured the entire index was received.
289         resp.Write([]byte{'\n'})
290 }
291
292 // MountsHandler responds to "GET /mounts" requests.
293 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
294         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
295         if err != nil {
296                 http.Error(resp, err.Error(), http.StatusInternalServerError)
297         }
298 }
299
300 // PoolStatus struct
301 type PoolStatus struct {
302         Alloc uint64 `json:"BytesAllocatedCumulative"`
303         Cap   int    `json:"BuffersMax"`
304         Len   int    `json:"BuffersInUse"`
305 }
306
307 type volumeStatusEnt struct {
308         Label         string
309         Status        *VolumeStatus `json:",omitempty"`
310         VolumeStats   *ioStats      `json:",omitempty"`
311         InternalStats interface{}   `json:",omitempty"`
312 }
313
314 // NodeStatus struct
315 type NodeStatus struct {
316         Volumes         []*volumeStatusEnt
317         BufferPool      PoolStatus
318         PullQueue       WorkQueueStatus
319         TrashQueue      WorkQueueStatus
320         RequestsCurrent int
321         RequestsMax     int
322         Version         string
323 }
324
325 var st NodeStatus
326 var stLock sync.Mutex
327
328 // DebugHandler addresses /debug.json requests.
329 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
330         type debugStats struct {
331                 MemStats runtime.MemStats
332         }
333         var ds debugStats
334         runtime.ReadMemStats(&ds.MemStats)
335         err := json.NewEncoder(resp).Encode(&ds)
336         if err != nil {
337                 http.Error(resp, err.Error(), 500)
338         }
339 }
340
341 // StatusHandler addresses /status.json requests.
342 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
343         stLock.Lock()
344         rtr.readNodeStatus(&st)
345         jstat, err := json.Marshal(&st)
346         stLock.Unlock()
347         if err == nil {
348                 resp.Write(jstat)
349         } else {
350                 log.Printf("json.Marshal: %s", err)
351                 log.Printf("NodeStatus = %v", &st)
352                 http.Error(resp, err.Error(), 500)
353         }
354 }
355
356 // populate the given NodeStatus struct with current values.
357 func (rtr *router) readNodeStatus(st *NodeStatus) {
358         st.Version = version
359         vols := KeepVM.AllReadable()
360         if cap(st.Volumes) < len(vols) {
361                 st.Volumes = make([]*volumeStatusEnt, len(vols))
362         }
363         st.Volumes = st.Volumes[:0]
364         for _, vol := range vols {
365                 var internalStats interface{}
366                 if vol, ok := vol.(InternalStatser); ok {
367                         internalStats = vol.InternalStats()
368                 }
369                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
370                         Label:         vol.String(),
371                         Status:        vol.Status(),
372                         InternalStats: internalStats,
373                         //VolumeStats: KeepVM.VolumeStats(vol),
374                 })
375         }
376         st.BufferPool.Alloc = bufs.Alloc()
377         st.BufferPool.Cap = bufs.Cap()
378         st.BufferPool.Len = bufs.Len()
379         st.PullQueue = getWorkQueueStatus(pullq)
380         st.TrashQueue = getWorkQueueStatus(trashq)
381         if rtr.limiter != nil {
382                 st.RequestsCurrent = rtr.limiter.Current()
383                 st.RequestsMax = rtr.limiter.Max()
384         }
385 }
386
387 // return a WorkQueueStatus for the given queue. If q is nil (which
388 // should never happen except in test suites), return a zero status
389 // value instead of crashing.
390 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
391         if q == nil {
392                 // This should only happen during tests.
393                 return WorkQueueStatus{}
394         }
395         return q.Status()
396 }
397
398 // DeleteHandler processes DELETE requests.
399 //
400 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
401 // from all connected volumes.
402 //
403 // Only the Data Manager, or an Arvados admin with scope "all", are
404 // allowed to issue DELETE requests.  If a DELETE request is not
405 // authenticated or is issued by a non-admin user, the server returns
406 // a PermissionError.
407 //
408 // Upon receiving a valid request from an authorized user,
409 // DeleteHandler deletes all copies of the specified block on local
410 // writable volumes.
411 //
412 // Response format:
413 //
414 // If the requested blocks was not found on any volume, the response
415 // code is HTTP 404 Not Found.
416 //
417 // Otherwise, the response code is 200 OK, with a response body
418 // consisting of the JSON message
419 //
420 //    {"copies_deleted":d,"copies_failed":f}
421 //
422 // where d and f are integers representing the number of blocks that
423 // were successfully and unsuccessfully deleted.
424 //
425 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
426         hash := mux.Vars(req)["hash"]
427
428         // Confirm that this user is an admin and has a token with unlimited scope.
429         var tok = GetAPIToken(req)
430         if tok == "" || !CanDelete(tok) {
431                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
432                 return
433         }
434
435         if !theConfig.EnableDelete {
436                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
437                 return
438         }
439
440         // Delete copies of this block from all available volumes.
441         // Report how many blocks were successfully deleted, and how
442         // many were found on writable volumes but not deleted.
443         var result struct {
444                 Deleted int `json:"copies_deleted"`
445                 Failed  int `json:"copies_failed"`
446         }
447         for _, vol := range KeepVM.AllWritable() {
448                 if err := vol.Trash(hash); err == nil {
449                         result.Deleted++
450                 } else if os.IsNotExist(err) {
451                         continue
452                 } else {
453                         result.Failed++
454                         log.Println("DeleteHandler:", err)
455                 }
456         }
457
458         var st int
459
460         if result.Deleted == 0 && result.Failed == 0 {
461                 st = http.StatusNotFound
462         } else {
463                 st = http.StatusOK
464         }
465
466         resp.WriteHeader(st)
467
468         if st == http.StatusOK {
469                 if body, err := json.Marshal(result); err == nil {
470                         resp.Write(body)
471                 } else {
472                         log.Printf("json.Marshal: %s (result = %v)", err, result)
473                         http.Error(resp, err.Error(), 500)
474                 }
475         }
476 }
477
478 /* PullHandler processes "PUT /pull" requests for the data manager.
479    The request body is a JSON message containing a list of pull
480    requests in the following format:
481
482    [
483       {
484          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
485          "servers":[
486                         "keep0.qr1hi.arvadosapi.com:25107",
487                         "keep1.qr1hi.arvadosapi.com:25108"
488                  ]
489           },
490           {
491                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
492                  "servers":[
493                         "10.0.1.5:25107",
494                         "10.0.1.6:25107",
495                         "10.0.1.7:25108"
496                  ]
497           },
498           ...
499    ]
500
501    Each pull request in the list consists of a block locator string
502    and an ordered list of servers.  Keepstore should try to fetch the
503    block from each server in turn.
504
505    If the request has not been sent by the Data Manager, return 401
506    Unauthorized.
507
508    If the JSON unmarshalling fails, return 400 Bad Request.
509 */
510
511 // PullRequest consists of a block locator and an ordered list of servers
512 type PullRequest struct {
513         Locator string   `json:"locator"`
514         Servers []string `json:"servers"`
515
516         // Destination mount, or "" for "anywhere"
517         MountUUID string `json:"mount_uuid"`
518 }
519
520 // PullHandler processes "PUT /pull" requests for the data manager.
521 func PullHandler(resp http.ResponseWriter, req *http.Request) {
522         // Reject unauthorized requests.
523         if !IsSystemAuth(GetAPIToken(req)) {
524                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
525                 return
526         }
527
528         // Parse the request body.
529         var pr []PullRequest
530         r := json.NewDecoder(req.Body)
531         if err := r.Decode(&pr); err != nil {
532                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
533                 return
534         }
535
536         // We have a properly formatted pull list sent from the data
537         // manager.  Report success and send the list to the pull list
538         // manager for further handling.
539         resp.WriteHeader(http.StatusOK)
540         resp.Write([]byte(
541                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
542
543         plist := list.New()
544         for _, p := range pr {
545                 plist.PushBack(p)
546         }
547         pullq.ReplaceQueue(plist)
548 }
549
550 // TrashRequest consists of a block locator and it's Mtime
551 type TrashRequest struct {
552         Locator    string `json:"locator"`
553         BlockMtime int64  `json:"block_mtime"`
554
555         // Target mount, or "" for "everywhere"
556         MountUUID string `json:"mount_uuid"`
557 }
558
559 // TrashHandler processes /trash requests.
560 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
561         // Reject unauthorized requests.
562         if !IsSystemAuth(GetAPIToken(req)) {
563                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
564                 return
565         }
566
567         // Parse the request body.
568         var trash []TrashRequest
569         r := json.NewDecoder(req.Body)
570         if err := r.Decode(&trash); err != nil {
571                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
572                 return
573         }
574
575         // We have a properly formatted trash list sent from the data
576         // manager.  Report success and send the list to the trash work
577         // queue for further handling.
578         resp.WriteHeader(http.StatusOK)
579         resp.Write([]byte(
580                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
581
582         tlist := list.New()
583         for _, t := range trash {
584                 tlist.PushBack(t)
585         }
586         trashq.ReplaceQueue(tlist)
587 }
588
589 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
590 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
591         // Reject unauthorized requests.
592         if !IsSystemAuth(GetAPIToken(req)) {
593                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
594                 return
595         }
596
597         hash := mux.Vars(req)["hash"]
598
599         if len(KeepVM.AllWritable()) == 0 {
600                 http.Error(resp, "No writable volumes", http.StatusNotFound)
601                 return
602         }
603
604         var untrashedOn, failedOn []string
605         var numNotFound int
606         for _, vol := range KeepVM.AllWritable() {
607                 err := vol.Untrash(hash)
608
609                 if os.IsNotExist(err) {
610                         numNotFound++
611                 } else if err != nil {
612                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
613                         failedOn = append(failedOn, vol.String())
614                 } else {
615                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
616                         untrashedOn = append(untrashedOn, vol.String())
617                 }
618         }
619
620         if numNotFound == len(KeepVM.AllWritable()) {
621                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
622                 return
623         }
624
625         if len(failedOn) == len(KeepVM.AllWritable()) {
626                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
627         } else {
628                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
629                 if len(failedOn) > 0 {
630                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
631                 }
632                 resp.Write([]byte(respBody))
633         }
634 }
635
636 // GetBlock and PutBlock implement lower-level code for handling
637 // blocks by rooting through volumes connected to the local machine.
638 // Once the handler has determined that system policy permits the
639 // request, it calls these methods to perform the actual operation.
640 //
641 // TODO(twp): this code would probably be better located in the
642 // VolumeManager interface. As an abstraction, the VolumeManager
643 // should be the only part of the code that cares about which volume a
644 // block is stored on, so it should be responsible for figuring out
645 // which volume to check for fetching blocks, storing blocks, etc.
646
647 // GetBlock fetches the block identified by "hash" into the provided
648 // buf, and returns the data size.
649 //
650 // If the block cannot be found on any volume, returns NotFoundError.
651 //
652 // If the block found does not have the correct MD5 hash, returns
653 // DiskHashError.
654 //
655 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
656         // Attempt to read the requested hash from a keep volume.
657         errorToCaller := NotFoundError
658
659         for _, vol := range KeepVM.AllReadable() {
660                 size, err := vol.Get(ctx, hash, buf)
661                 select {
662                 case <-ctx.Done():
663                         return 0, ErrClientDisconnect
664                 default:
665                 }
666                 if err != nil {
667                         // IsNotExist is an expected error and may be
668                         // ignored. All other errors are logged. In
669                         // any case we continue trying to read other
670                         // volumes. If all volumes report IsNotExist,
671                         // we return a NotFoundError.
672                         if !os.IsNotExist(err) {
673                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
674                         }
675                         continue
676                 }
677                 // Check the file checksum.
678                 //
679                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
680                 if filehash != hash {
681                         // TODO: Try harder to tell a sysadmin about
682                         // this.
683                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
684                                 vol, hash, filehash)
685                         errorToCaller = DiskHashError
686                         continue
687                 }
688                 if errorToCaller == DiskHashError {
689                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
690                                 vol, hash)
691                 }
692                 return size, nil
693         }
694         return 0, errorToCaller
695 }
696
697 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
698 //
699 // PutBlock(ctx, block, hash)
700 //   Stores the BLOCK (identified by the content id HASH) in Keep.
701 //
702 //   The MD5 checksum of the block must be identical to the content id HASH.
703 //   If not, an error is returned.
704 //
705 //   PutBlock stores the BLOCK on the first Keep volume with free space.
706 //   A failure code is returned to the user only if all volumes fail.
707 //
708 //   On success, PutBlock returns nil.
709 //   On failure, it returns a KeepError with one of the following codes:
710 //
711 //   500 Collision
712 //          A different block with the same hash already exists on this
713 //          Keep server.
714 //   422 MD5Fail
715 //          The MD5 hash of the BLOCK does not match the argument HASH.
716 //   503 Full
717 //          There was not enough space left in any Keep volume to store
718 //          the object.
719 //   500 Fail
720 //          The object could not be stored for some other reason (e.g.
721 //          all writes failed). The text of the error message should
722 //          provide as much detail as possible.
723 //
724 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
725         // Check that BLOCK's checksum matches HASH.
726         blockhash := fmt.Sprintf("%x", md5.Sum(block))
727         if blockhash != hash {
728                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
729                 return 0, RequestHashError
730         }
731
732         // If we already have this data, it's intact on disk, and we
733         // can update its timestamp, return success. If we have
734         // different data with the same hash, return failure.
735         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
736                 return n, err
737         } else if ctx.Err() != nil {
738                 return 0, ErrClientDisconnect
739         }
740
741         // Choose a Keep volume to write to.
742         // If this volume fails, try all of the volumes in order.
743         if vol := KeepVM.NextWritable(); vol != nil {
744                 if err := vol.Put(ctx, hash, block); err == nil {
745                         return vol.Replication(), nil // success!
746                 }
747                 if ctx.Err() != nil {
748                         return 0, ErrClientDisconnect
749                 }
750         }
751
752         writables := KeepVM.AllWritable()
753         if len(writables) == 0 {
754                 log.Print("No writable volumes.")
755                 return 0, FullError
756         }
757
758         allFull := true
759         for _, vol := range writables {
760                 err := vol.Put(ctx, hash, block)
761                 if ctx.Err() != nil {
762                         return 0, ErrClientDisconnect
763                 }
764                 if err == nil {
765                         return vol.Replication(), nil // success!
766                 }
767                 if err != FullError {
768                         // The volume is not full but the
769                         // write did not succeed.  Report the
770                         // error and continue trying.
771                         allFull = false
772                         log.Printf("%s: Write(%s): %s", vol, hash, err)
773                 }
774         }
775
776         if allFull {
777                 log.Print("All volumes are full.")
778                 return 0, FullError
779         }
780         // Already logged the non-full errors.
781         return 0, GenericError
782 }
783
784 // CompareAndTouch returns the current replication level if one of the
785 // volumes already has the given content and it successfully updates
786 // the relevant block's modification time in order to protect it from
787 // premature garbage collection. Otherwise, it returns a non-nil
788 // error.
789 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
790         var bestErr error = NotFoundError
791         for _, vol := range KeepVM.AllWritable() {
792                 err := vol.Compare(ctx, hash, buf)
793                 if ctx.Err() != nil {
794                         return 0, ctx.Err()
795                 } else if err == CollisionError {
796                         // Stop if we have a block with same hash but
797                         // different content. (It will be impossible
798                         // to tell which one is wanted if we have
799                         // both, so there's no point writing it even
800                         // on a different volume.)
801                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
802                         return 0, err
803                 } else if os.IsNotExist(err) {
804                         // Block does not exist. This is the only
805                         // "normal" error: we don't log anything.
806                         continue
807                 } else if err != nil {
808                         // Couldn't open file, data is corrupt on
809                         // disk, etc.: log this abnormal condition,
810                         // and try the next volume.
811                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
812                         continue
813                 }
814                 if err := vol.Touch(hash); err != nil {
815                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
816                         bestErr = err
817                         continue
818                 }
819                 // Compare and Touch both worked --> done.
820                 return vol.Replication(), nil
821         }
822         return 0, bestErr
823 }
824
825 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
826
827 // IsValidLocator returns true if the specified string is a valid Keep locator.
828 //   When Keep is extended to support hash types other than MD5,
829 //   this should be updated to cover those as well.
830 //
831 func IsValidLocator(loc string) bool {
832         return validLocatorRe.MatchString(loc)
833 }
834
835 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
836
837 // GetAPIToken returns the OAuth2 token from the Authorization
838 // header of a HTTP request, or an empty string if no matching
839 // token is found.
840 func GetAPIToken(req *http.Request) string {
841         if auth, ok := req.Header["Authorization"]; ok {
842                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
843                         return match[1]
844                 }
845         }
846         return ""
847 }
848
849 // IsExpired returns true if the given Unix timestamp (expressed as a
850 // hexadecimal string) is in the past, or if timestampHex cannot be
851 // parsed as a hexadecimal string.
852 func IsExpired(timestampHex string) bool {
853         ts, err := strconv.ParseInt(timestampHex, 16, 0)
854         if err != nil {
855                 log.Printf("IsExpired: %s", err)
856                 return true
857         }
858         return time.Unix(ts, 0).Before(time.Now())
859 }
860
861 // CanDelete returns true if the user identified by apiToken is
862 // allowed to delete blocks.
863 func CanDelete(apiToken string) bool {
864         if apiToken == "" {
865                 return false
866         }
867         // Blocks may be deleted only when Keep has been configured with a
868         // data manager.
869         if IsSystemAuth(apiToken) {
870                 return true
871         }
872         // TODO(twp): look up apiToken with the API server
873         // return true if is_admin is true and if the token
874         // has unlimited scope
875         return false
876 }
877
878 // IsSystemAuth returns true if the given token is allowed to perform
879 // system level actions like deleting data.
880 func IsSystemAuth(token string) bool {
881         return token != "" && token == theConfig.systemAuthToken
882 }