13937: Refactors approach to pass volume metrics as curried vecs (WIP)
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/health"
25         "git.curoverse.com/arvados.git/sdk/go/httpserver"
26         "github.com/gorilla/mux"
27         "github.com/prometheus/client_golang/prometheus"
28 )
29
30 type router struct {
31         *mux.Router
32         limiter     httpserver.RequestCounter
33         cluster     *arvados.Cluster
34         remoteProxy remoteProxy
35         metrics     *nodeMetrics
36 }
37
38 // MakeRESTRouter returns a new router that forwards all Keep requests
39 // to the appropriate handlers.
40 func MakeRESTRouter(cluster *arvados.Cluster, reg *prometheus.Registry) http.Handler {
41         rtr := &router{
42                 Router:  mux.NewRouter(),
43                 cluster: cluster,
44                 metrics: &nodeMetrics{reg: reg},
45         }
46
47         rtr.HandleFunc(
48                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
49         rtr.HandleFunc(
50                 `/{hash:[0-9a-f]{32}}+{hints}`,
51                 rtr.handleGET).Methods("GET", "HEAD")
52
53         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
54         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
55         // List all blocks stored here. Privileged client only.
56         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
57         // List blocks stored here whose hash has the given prefix.
58         // Privileged client only.
59         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
60
61         // Internals/debugging info (runtime.MemStats)
62         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
63
64         // List volumes: path, device number, bytes used/avail.
65         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
66
67         // List mounts: UUID, readonly, tier, device ID, ...
68         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
69         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
70         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
71
72         // Replace the current pull queue.
73         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
74
75         // Replace the current trash queue.
76         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
77
78         // Untrash moves blocks from trash back into store
79         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
80
81         rtr.Handle("/_health/{check}", &health.Handler{
82                 Token:  theConfig.ManagementToken,
83                 Prefix: "/_health/",
84         }).Methods("GET")
85
86         // Any request which does not match any of these routes gets
87         // 400 Bad Request.
88         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
89
90         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
91         rtr.metrics.setupBufferPoolMetrics(bufs)
92         rtr.metrics.setupWorkQueueMetrics(pullq, "pull")
93         rtr.metrics.setupWorkQueueMetrics(trashq, "trash")
94         rtr.metrics.setupRequestMetrics(rtr.limiter)
95
96         instrumented := httpserver.Instrument(rtr.metrics.reg, nil,
97                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
98         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
99 }
100
101 // BadRequestHandler is a HandleFunc to address bad requests.
102 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
103         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
104 }
105
106 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
107         ctx, cancel := contextForResponse(context.TODO(), resp)
108         defer cancel()
109
110         locator := req.URL.Path[1:]
111         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
112                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
113                 return
114         }
115
116         if theConfig.RequireSignatures {
117                 locator := req.URL.Path[1:] // strip leading slash
118                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
119                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
120                         return
121                 }
122         }
123
124         // TODO: Probe volumes to check whether the block _might_
125         // exist. Some volumes/types could support a quick existence
126         // check without causing other operations to suffer. If all
127         // volumes support that, and assure us the block definitely
128         // isn't here, we can return 404 now instead of waiting for a
129         // buffer.
130
131         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
132         if err != nil {
133                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
134                 return
135         }
136         defer bufs.Put(buf)
137
138         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
139         if err != nil {
140                 code := http.StatusInternalServerError
141                 if err, ok := err.(*KeepError); ok {
142                         code = err.HTTPCode
143                 }
144                 http.Error(resp, err.Error(), code)
145                 return
146         }
147
148         resp.Header().Set("Content-Length", strconv.Itoa(size))
149         resp.Header().Set("Content-Type", "application/octet-stream")
150         resp.Write(buf[:size])
151 }
152
153 // Return a new context that gets cancelled by resp's CloseNotifier.
154 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
155         ctx, cancel := context.WithCancel(parent)
156         if cn, ok := resp.(http.CloseNotifier); ok {
157                 go func(c <-chan bool) {
158                         select {
159                         case <-c:
160                                 theConfig.debugLogf("cancel context")
161                                 cancel()
162                         case <-ctx.Done():
163                         }
164                 }(cn.CloseNotify())
165         }
166         return ctx, cancel
167 }
168
169 // Get a buffer from the pool -- but give up and return a non-nil
170 // error if ctx ends before we get a buffer.
171 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
172         bufReady := make(chan []byte)
173         go func() {
174                 bufReady <- bufs.Get(bufSize)
175         }()
176         select {
177         case buf := <-bufReady:
178                 return buf, nil
179         case <-ctx.Done():
180                 go func() {
181                         // Even if closeNotifier happened first, we
182                         // need to keep waiting for our buf so we can
183                         // return it to the pool.
184                         bufs.Put(<-bufReady)
185                 }()
186                 return nil, ErrClientDisconnect
187         }
188 }
189
190 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
191         ctx, cancel := contextForResponse(context.TODO(), resp)
192         defer cancel()
193
194         hash := mux.Vars(req)["hash"]
195
196         // Detect as many error conditions as possible before reading
197         // the body: avoid transmitting data that will not end up
198         // being written anyway.
199
200         if req.ContentLength == -1 {
201                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
202                 return
203         }
204
205         if req.ContentLength > BlockSize {
206                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
207                 return
208         }
209
210         if len(KeepVM.AllWritable()) == 0 {
211                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
212                 return
213         }
214
215         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
216         if err != nil {
217                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
218                 return
219         }
220
221         _, err = io.ReadFull(req.Body, buf)
222         if err != nil {
223                 http.Error(resp, err.Error(), 500)
224                 bufs.Put(buf)
225                 return
226         }
227
228         replication, err := PutBlock(ctx, buf, hash)
229         bufs.Put(buf)
230
231         if err != nil {
232                 code := http.StatusInternalServerError
233                 if err, ok := err.(*KeepError); ok {
234                         code = err.HTTPCode
235                 }
236                 http.Error(resp, err.Error(), code)
237                 return
238         }
239
240         // Success; add a size hint, sign the locator if possible, and
241         // return it to the client.
242         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
243         apiToken := GetAPIToken(req)
244         if theConfig.blobSigningKey != nil && apiToken != "" {
245                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
246                 returnHash = SignLocator(returnHash, apiToken, expiry)
247         }
248         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
249         resp.Write([]byte(returnHash + "\n"))
250 }
251
252 // IndexHandler responds to "/index", "/index/{prefix}", and
253 // "/mounts/{uuid}/blocks" requests.
254 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
255         if !IsSystemAuth(GetAPIToken(req)) {
256                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
257                 return
258         }
259
260         prefix := mux.Vars(req)["prefix"]
261         if prefix == "" {
262                 req.ParseForm()
263                 prefix = req.Form.Get("prefix")
264         }
265
266         uuid := mux.Vars(req)["uuid"]
267
268         var vols []Volume
269         if uuid == "" {
270                 vols = KeepVM.AllReadable()
271         } else if v := KeepVM.Lookup(uuid, false); v == nil {
272                 http.Error(resp, "mount not found", http.StatusNotFound)
273                 return
274         } else {
275                 vols = []Volume{v}
276         }
277
278         for _, v := range vols {
279                 if err := v.IndexTo(prefix, resp); err != nil {
280                         // The only errors returned by IndexTo are
281                         // write errors returned by resp.Write(),
282                         // which probably means the client has
283                         // disconnected and this error will never be
284                         // reported to the client -- but it will
285                         // appear in our own error log.
286                         http.Error(resp, err.Error(), http.StatusInternalServerError)
287                         return
288                 }
289         }
290         // An empty line at EOF is the only way the client can be
291         // assured the entire index was received.
292         resp.Write([]byte{'\n'})
293 }
294
295 // MountsHandler responds to "GET /mounts" requests.
296 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
297         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
298         if err != nil {
299                 http.Error(resp, err.Error(), http.StatusInternalServerError)
300         }
301 }
302
303 // PoolStatus struct
304 type PoolStatus struct {
305         Alloc uint64 `json:"BytesAllocatedCumulative"`
306         Cap   int    `json:"BuffersMax"`
307         Len   int    `json:"BuffersInUse"`
308 }
309
310 type volumeStatusEnt struct {
311         Label         string
312         Status        *VolumeStatus `json:",omitempty"`
313         VolumeStats   *ioStats      `json:",omitempty"`
314         InternalStats interface{}   `json:",omitempty"`
315 }
316
317 // NodeStatus struct
318 type NodeStatus struct {
319         Volumes         []*volumeStatusEnt
320         BufferPool      PoolStatus
321         PullQueue       WorkQueueStatus
322         TrashQueue      WorkQueueStatus
323         RequestsCurrent int
324         RequestsMax     int
325         Version         string
326 }
327
328 var st NodeStatus
329 var stLock sync.Mutex
330
331 // DebugHandler addresses /debug.json requests.
332 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
333         type debugStats struct {
334                 MemStats runtime.MemStats
335         }
336         var ds debugStats
337         runtime.ReadMemStats(&ds.MemStats)
338         err := json.NewEncoder(resp).Encode(&ds)
339         if err != nil {
340                 http.Error(resp, err.Error(), 500)
341         }
342 }
343
344 // StatusHandler addresses /status.json requests.
345 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
346         stLock.Lock()
347         rtr.readNodeStatus(&st)
348         jstat, err := json.Marshal(&st)
349         stLock.Unlock()
350         if err == nil {
351                 resp.Write(jstat)
352         } else {
353                 log.Printf("json.Marshal: %s", err)
354                 log.Printf("NodeStatus = %v", &st)
355                 http.Error(resp, err.Error(), 500)
356         }
357 }
358
359 // populate the given NodeStatus struct with current values.
360 func (rtr *router) readNodeStatus(st *NodeStatus) {
361         st.Version = version
362         vols := KeepVM.AllReadable()
363         if cap(st.Volumes) < len(vols) {
364                 st.Volumes = make([]*volumeStatusEnt, len(vols))
365         }
366         st.Volumes = st.Volumes[:0]
367         for _, vol := range vols {
368                 var internalStats interface{}
369                 if vol, ok := vol.(InternalStatser); ok {
370                         internalStats = vol.InternalStats()
371                 }
372                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
373                         Label:         vol.String(),
374                         Status:        vol.Status(),
375                         InternalStats: internalStats,
376                         //VolumeStats: KeepVM.VolumeStats(vol),
377                 })
378         }
379         st.BufferPool.Alloc = bufs.Alloc()
380         st.BufferPool.Cap = bufs.Cap()
381         st.BufferPool.Len = bufs.Len()
382         st.PullQueue = getWorkQueueStatus(pullq)
383         st.TrashQueue = getWorkQueueStatus(trashq)
384         if rtr.limiter != nil {
385                 st.RequestsCurrent = rtr.limiter.Current()
386                 st.RequestsMax = rtr.limiter.Max()
387         }
388 }
389
390 // return a WorkQueueStatus for the given queue. If q is nil (which
391 // should never happen except in test suites), return a zero status
392 // value instead of crashing.
393 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
394         if q == nil {
395                 // This should only happen during tests.
396                 return WorkQueueStatus{}
397         }
398         return q.Status()
399 }
400
401 // DeleteHandler processes DELETE requests.
402 //
403 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
404 // from all connected volumes.
405 //
406 // Only the Data Manager, or an Arvados admin with scope "all", are
407 // allowed to issue DELETE requests.  If a DELETE request is not
408 // authenticated or is issued by a non-admin user, the server returns
409 // a PermissionError.
410 //
411 // Upon receiving a valid request from an authorized user,
412 // DeleteHandler deletes all copies of the specified block on local
413 // writable volumes.
414 //
415 // Response format:
416 //
417 // If the requested blocks was not found on any volume, the response
418 // code is HTTP 404 Not Found.
419 //
420 // Otherwise, the response code is 200 OK, with a response body
421 // consisting of the JSON message
422 //
423 //    {"copies_deleted":d,"copies_failed":f}
424 //
425 // where d and f are integers representing the number of blocks that
426 // were successfully and unsuccessfully deleted.
427 //
428 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
429         hash := mux.Vars(req)["hash"]
430
431         // Confirm that this user is an admin and has a token with unlimited scope.
432         var tok = GetAPIToken(req)
433         if tok == "" || !CanDelete(tok) {
434                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
435                 return
436         }
437
438         if !theConfig.EnableDelete {
439                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
440                 return
441         }
442
443         // Delete copies of this block from all available volumes.
444         // Report how many blocks were successfully deleted, and how
445         // many were found on writable volumes but not deleted.
446         var result struct {
447                 Deleted int `json:"copies_deleted"`
448                 Failed  int `json:"copies_failed"`
449         }
450         for _, vol := range KeepVM.AllWritable() {
451                 if err := vol.Trash(hash); err == nil {
452                         result.Deleted++
453                 } else if os.IsNotExist(err) {
454                         continue
455                 } else {
456                         result.Failed++
457                         log.Println("DeleteHandler:", err)
458                 }
459         }
460
461         var st int
462
463         if result.Deleted == 0 && result.Failed == 0 {
464                 st = http.StatusNotFound
465         } else {
466                 st = http.StatusOK
467         }
468
469         resp.WriteHeader(st)
470
471         if st == http.StatusOK {
472                 if body, err := json.Marshal(result); err == nil {
473                         resp.Write(body)
474                 } else {
475                         log.Printf("json.Marshal: %s (result = %v)", err, result)
476                         http.Error(resp, err.Error(), 500)
477                 }
478         }
479 }
480
481 /* PullHandler processes "PUT /pull" requests for the data manager.
482    The request body is a JSON message containing a list of pull
483    requests in the following format:
484
485    [
486       {
487          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
488          "servers":[
489                         "keep0.qr1hi.arvadosapi.com:25107",
490                         "keep1.qr1hi.arvadosapi.com:25108"
491                  ]
492           },
493           {
494                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
495                  "servers":[
496                         "10.0.1.5:25107",
497                         "10.0.1.6:25107",
498                         "10.0.1.7:25108"
499                  ]
500           },
501           ...
502    ]
503
504    Each pull request in the list consists of a block locator string
505    and an ordered list of servers.  Keepstore should try to fetch the
506    block from each server in turn.
507
508    If the request has not been sent by the Data Manager, return 401
509    Unauthorized.
510
511    If the JSON unmarshalling fails, return 400 Bad Request.
512 */
513
514 // PullRequest consists of a block locator and an ordered list of servers
515 type PullRequest struct {
516         Locator string   `json:"locator"`
517         Servers []string `json:"servers"`
518
519         // Destination mount, or "" for "anywhere"
520         MountUUID string `json:"mount_uuid"`
521 }
522
523 // PullHandler processes "PUT /pull" requests for the data manager.
524 func PullHandler(resp http.ResponseWriter, req *http.Request) {
525         // Reject unauthorized requests.
526         if !IsSystemAuth(GetAPIToken(req)) {
527                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
528                 return
529         }
530
531         // Parse the request body.
532         var pr []PullRequest
533         r := json.NewDecoder(req.Body)
534         if err := r.Decode(&pr); err != nil {
535                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
536                 return
537         }
538
539         // We have a properly formatted pull list sent from the data
540         // manager.  Report success and send the list to the pull list
541         // manager for further handling.
542         resp.WriteHeader(http.StatusOK)
543         resp.Write([]byte(
544                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
545
546         plist := list.New()
547         for _, p := range pr {
548                 plist.PushBack(p)
549         }
550         pullq.ReplaceQueue(plist)
551 }
552
553 // TrashRequest consists of a block locator and its Mtime
554 type TrashRequest struct {
555         Locator    string `json:"locator"`
556         BlockMtime int64  `json:"block_mtime"`
557
558         // Target mount, or "" for "everywhere"
559         MountUUID string `json:"mount_uuid"`
560 }
561
562 // TrashHandler processes /trash requests.
563 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
564         // Reject unauthorized requests.
565         if !IsSystemAuth(GetAPIToken(req)) {
566                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
567                 return
568         }
569
570         // Parse the request body.
571         var trash []TrashRequest
572         r := json.NewDecoder(req.Body)
573         if err := r.Decode(&trash); err != nil {
574                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
575                 return
576         }
577
578         // We have a properly formatted trash list sent from the data
579         // manager.  Report success and send the list to the trash work
580         // queue for further handling.
581         resp.WriteHeader(http.StatusOK)
582         resp.Write([]byte(
583                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
584
585         tlist := list.New()
586         for _, t := range trash {
587                 tlist.PushBack(t)
588         }
589         trashq.ReplaceQueue(tlist)
590 }
591
592 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
593 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
594         // Reject unauthorized requests.
595         if !IsSystemAuth(GetAPIToken(req)) {
596                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
597                 return
598         }
599
600         hash := mux.Vars(req)["hash"]
601
602         if len(KeepVM.AllWritable()) == 0 {
603                 http.Error(resp, "No writable volumes", http.StatusNotFound)
604                 return
605         }
606
607         var untrashedOn, failedOn []string
608         var numNotFound int
609         for _, vol := range KeepVM.AllWritable() {
610                 err := vol.Untrash(hash)
611
612                 if os.IsNotExist(err) {
613                         numNotFound++
614                 } else if err != nil {
615                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
616                         failedOn = append(failedOn, vol.String())
617                 } else {
618                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
619                         untrashedOn = append(untrashedOn, vol.String())
620                 }
621         }
622
623         if numNotFound == len(KeepVM.AllWritable()) {
624                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
625                 return
626         }
627
628         if len(failedOn) == len(KeepVM.AllWritable()) {
629                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
630         } else {
631                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
632                 if len(failedOn) > 0 {
633                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
634                 }
635                 resp.Write([]byte(respBody))
636         }
637 }
638
639 // GetBlock and PutBlock implement lower-level code for handling
640 // blocks by rooting through volumes connected to the local machine.
641 // Once the handler has determined that system policy permits the
642 // request, it calls these methods to perform the actual operation.
643 //
644 // TODO(twp): this code would probably be better located in the
645 // VolumeManager interface. As an abstraction, the VolumeManager
646 // should be the only part of the code that cares about which volume a
647 // block is stored on, so it should be responsible for figuring out
648 // which volume to check for fetching blocks, storing blocks, etc.
649
650 // GetBlock fetches the block identified by "hash" into the provided
651 // buf, and returns the data size.
652 //
653 // If the block cannot be found on any volume, returns NotFoundError.
654 //
655 // If the block found does not have the correct MD5 hash, returns
656 // DiskHashError.
657 //
658 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
659         // Attempt to read the requested hash from a keep volume.
660         errorToCaller := NotFoundError
661
662         for _, vol := range KeepVM.AllReadable() {
663                 size, err := vol.Get(ctx, hash, buf)
664                 select {
665                 case <-ctx.Done():
666                         return 0, ErrClientDisconnect
667                 default:
668                 }
669                 if err != nil {
670                         // IsNotExist is an expected error and may be
671                         // ignored. All other errors are logged. In
672                         // any case we continue trying to read other
673                         // volumes. If all volumes report IsNotExist,
674                         // we return a NotFoundError.
675                         if !os.IsNotExist(err) {
676                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
677                         }
678                         continue
679                 }
680                 // Check the file checksum.
681                 //
682                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
683                 if filehash != hash {
684                         // TODO: Try harder to tell a sysadmin about
685                         // this.
686                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
687                                 vol, hash, filehash)
688                         errorToCaller = DiskHashError
689                         continue
690                 }
691                 if errorToCaller == DiskHashError {
692                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
693                                 vol, hash)
694                 }
695                 return size, nil
696         }
697         return 0, errorToCaller
698 }
699
700 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
701 //
702 // PutBlock(ctx, block, hash)
703 //   Stores the BLOCK (identified by the content id HASH) in Keep.
704 //
705 //   The MD5 checksum of the block must be identical to the content id HASH.
706 //   If not, an error is returned.
707 //
708 //   PutBlock stores the BLOCK on the first Keep volume with free space.
709 //   A failure code is returned to the user only if all volumes fail.
710 //
711 //   On success, PutBlock returns nil.
712 //   On failure, it returns a KeepError with one of the following codes:
713 //
714 //   500 Collision
715 //          A different block with the same hash already exists on this
716 //          Keep server.
717 //   422 MD5Fail
718 //          The MD5 hash of the BLOCK does not match the argument HASH.
719 //   503 Full
720 //          There was not enough space left in any Keep volume to store
721 //          the object.
722 //   500 Fail
723 //          The object could not be stored for some other reason (e.g.
724 //          all writes failed). The text of the error message should
725 //          provide as much detail as possible.
726 //
727 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
728         // Check that BLOCK's checksum matches HASH.
729         blockhash := fmt.Sprintf("%x", md5.Sum(block))
730         if blockhash != hash {
731                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
732                 return 0, RequestHashError
733         }
734
735         // If we already have this data, it's intact on disk, and we
736         // can update its timestamp, return success. If we have
737         // different data with the same hash, return failure.
738         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
739                 return n, err
740         } else if ctx.Err() != nil {
741                 return 0, ErrClientDisconnect
742         }
743
744         // Choose a Keep volume to write to.
745         // If this volume fails, try all of the volumes in order.
746         if vol := KeepVM.NextWritable(); vol != nil {
747                 if err := vol.Put(ctx, hash, block); err == nil {
748                         return vol.Replication(), nil // success!
749                 }
750                 if ctx.Err() != nil {
751                         return 0, ErrClientDisconnect
752                 }
753         }
754
755         writables := KeepVM.AllWritable()
756         if len(writables) == 0 {
757                 log.Print("No writable volumes.")
758                 return 0, FullError
759         }
760
761         allFull := true
762         for _, vol := range writables {
763                 err := vol.Put(ctx, hash, block)
764                 if ctx.Err() != nil {
765                         return 0, ErrClientDisconnect
766                 }
767                 if err == nil {
768                         return vol.Replication(), nil // success!
769                 }
770                 if err != FullError {
771                         // The volume is not full but the
772                         // write did not succeed.  Report the
773                         // error and continue trying.
774                         allFull = false
775                         log.Printf("%s: Write(%s): %s", vol, hash, err)
776                 }
777         }
778
779         if allFull {
780                 log.Print("All volumes are full.")
781                 return 0, FullError
782         }
783         // Already logged the non-full errors.
784         return 0, GenericError
785 }
786
787 // CompareAndTouch returns the current replication level if one of the
788 // volumes already has the given content and it successfully updates
789 // the relevant block's modification time in order to protect it from
790 // premature garbage collection. Otherwise, it returns a non-nil
791 // error.
792 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
793         var bestErr error = NotFoundError
794         for _, vol := range KeepVM.AllWritable() {
795                 err := vol.Compare(ctx, hash, buf)
796                 if ctx.Err() != nil {
797                         return 0, ctx.Err()
798                 } else if err == CollisionError {
799                         // Stop if we have a block with same hash but
800                         // different content. (It will be impossible
801                         // to tell which one is wanted if we have
802                         // both, so there's no point writing it even
803                         // on a different volume.)
804                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
805                         return 0, err
806                 } else if os.IsNotExist(err) {
807                         // Block does not exist. This is the only
808                         // "normal" error: we don't log anything.
809                         continue
810                 } else if err != nil {
811                         // Couldn't open file, data is corrupt on
812                         // disk, etc.: log this abnormal condition,
813                         // and try the next volume.
814                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
815                         continue
816                 }
817                 if err := vol.Touch(hash); err != nil {
818                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
819                         bestErr = err
820                         continue
821                 }
822                 // Compare and Touch both worked --> done.
823                 return vol.Replication(), nil
824         }
825         return 0, bestErr
826 }
827
828 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
829
830 // IsValidLocator returns true if the specified string is a valid Keep locator.
831 //   When Keep is extended to support hash types other than MD5,
832 //   this should be updated to cover those as well.
833 //
834 func IsValidLocator(loc string) bool {
835         return validLocatorRe.MatchString(loc)
836 }
837
838 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
839
840 // GetAPIToken returns the OAuth2 token from the Authorization
841 // header of a HTTP request, or an empty string if no matching
842 // token is found.
843 func GetAPIToken(req *http.Request) string {
844         if auth, ok := req.Header["Authorization"]; ok {
845                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
846                         return match[2]
847                 }
848         }
849         return ""
850 }
851
852 // IsExpired returns true if the given Unix timestamp (expressed as a
853 // hexadecimal string) is in the past, or if timestampHex cannot be
854 // parsed as a hexadecimal string.
855 func IsExpired(timestampHex string) bool {
856         ts, err := strconv.ParseInt(timestampHex, 16, 0)
857         if err != nil {
858                 log.Printf("IsExpired: %s", err)
859                 return true
860         }
861         return time.Unix(ts, 0).Before(time.Now())
862 }
863
864 // CanDelete returns true if the user identified by apiToken is
865 // allowed to delete blocks.
866 func CanDelete(apiToken string) bool {
867         if apiToken == "" {
868                 return false
869         }
870         // Blocks may be deleted only when Keep has been configured with a
871         // data manager.
872         if IsSystemAuth(apiToken) {
873                 return true
874         }
875         // TODO(twp): look up apiToken with the API server
876         // return true if is_admin is true and if the token
877         // has unlimited scope
878         return false
879 }
880
881 // IsSystemAuth returns true if the given token is allowed to perform
882 // system level actions like deleting data.
883 func IsSystemAuth(token string) bool {
884         return token != "" && token == theConfig.systemAuthToken
885 }