13937: Export stats as prometheus metrics. (WIP)
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/health"
25         "git.curoverse.com/arvados.git/sdk/go/httpserver"
26         "github.com/gorilla/mux"
27         "github.com/prometheus/client_golang/prometheus"
28 )
29
30 type router struct {
31         *mux.Router
32         limiter     httpserver.RequestCounter
33         cluster     *arvados.Cluster
34         remoteProxy remoteProxy
35         registry    *prometheus.Registry
36         metrics     nodeMetrics
37 }
38
39 // MakeRESTRouter returns a new router that forwards all Keep requests
40 // to the appropriate handlers.
41 func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
42         rtr := &router{
43                 Router:   mux.NewRouter(),
44                 cluster:  cluster,
45                 registry: prometheus.NewRegistry(),
46         }
47
48         rtr.HandleFunc(
49                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
50         rtr.HandleFunc(
51                 `/{hash:[0-9a-f]{32}}+{hints}`,
52                 rtr.handleGET).Methods("GET", "HEAD")
53
54         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
55         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
56         // List all blocks stored here. Privileged client only.
57         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
58         // List blocks stored here whose hash has the given prefix.
59         // Privileged client only.
60         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
61
62         // Internals/debugging info (runtime.MemStats)
63         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
64
65         // List volumes: path, device number, bytes used/avail.
66         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
67
68         // List mounts: UUID, readonly, tier, device ID, ...
69         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
70         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
71         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
72
73         // Replace the current pull queue.
74         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
75
76         // Replace the current trash queue.
77         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
78
79         // Untrash moves blocks from trash back into store
80         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
81
82         rtr.Handle("/_health/{check}", &health.Handler{
83                 Token:  theConfig.ManagementToken,
84                 Prefix: "/_health/",
85         }).Methods("GET")
86
87         // Any request which does not match any of these routes gets
88         // 400 Bad Request.
89         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
90
91         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
92         rtr.metrics = nodeMetrics{
93                 reg: rtr.registry,
94                 rc:  rtr.limiter,
95         }
96         rtr.metrics.setup()
97
98         instrumented := httpserver.Instrument(rtr.registry, nil,
99                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
100         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
101 }
102
103 // BadRequestHandler is a HandleFunc to address bad requests.
104 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
105         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
106 }
107
108 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
109         ctx, cancel := contextForResponse(context.TODO(), resp)
110         defer cancel()
111
112         locator := req.URL.Path[1:]
113         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
114                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
115                 return
116         }
117
118         if theConfig.RequireSignatures {
119                 locator := req.URL.Path[1:] // strip leading slash
120                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
121                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
122                         return
123                 }
124         }
125
126         // TODO: Probe volumes to check whether the block _might_
127         // exist. Some volumes/types could support a quick existence
128         // check without causing other operations to suffer. If all
129         // volumes support that, and assure us the block definitely
130         // isn't here, we can return 404 now instead of waiting for a
131         // buffer.
132
133         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
134         if err != nil {
135                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
136                 return
137         }
138         defer bufs.Put(buf)
139
140         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
141         if err != nil {
142                 code := http.StatusInternalServerError
143                 if err, ok := err.(*KeepError); ok {
144                         code = err.HTTPCode
145                 }
146                 http.Error(resp, err.Error(), code)
147                 return
148         }
149
150         resp.Header().Set("Content-Length", strconv.Itoa(size))
151         resp.Header().Set("Content-Type", "application/octet-stream")
152         resp.Write(buf[:size])
153 }
154
155 // Return a new context that gets cancelled by resp's CloseNotifier.
156 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
157         ctx, cancel := context.WithCancel(parent)
158         if cn, ok := resp.(http.CloseNotifier); ok {
159                 go func(c <-chan bool) {
160                         select {
161                         case <-c:
162                                 theConfig.debugLogf("cancel context")
163                                 cancel()
164                         case <-ctx.Done():
165                         }
166                 }(cn.CloseNotify())
167         }
168         return ctx, cancel
169 }
170
171 // Get a buffer from the pool -- but give up and return a non-nil
172 // error if ctx ends before we get a buffer.
173 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
174         bufReady := make(chan []byte)
175         go func() {
176                 bufReady <- bufs.Get(bufSize)
177         }()
178         select {
179         case buf := <-bufReady:
180                 return buf, nil
181         case <-ctx.Done():
182                 go func() {
183                         // Even if closeNotifier happened first, we
184                         // need to keep waiting for our buf so we can
185                         // return it to the pool.
186                         bufs.Put(<-bufReady)
187                 }()
188                 return nil, ErrClientDisconnect
189         }
190 }
191
192 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
193         ctx, cancel := contextForResponse(context.TODO(), resp)
194         defer cancel()
195
196         hash := mux.Vars(req)["hash"]
197
198         // Detect as many error conditions as possible before reading
199         // the body: avoid transmitting data that will not end up
200         // being written anyway.
201
202         if req.ContentLength == -1 {
203                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
204                 return
205         }
206
207         if req.ContentLength > BlockSize {
208                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
209                 return
210         }
211
212         if len(KeepVM.AllWritable()) == 0 {
213                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
214                 return
215         }
216
217         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
218         if err != nil {
219                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
220                 return
221         }
222
223         _, err = io.ReadFull(req.Body, buf)
224         if err != nil {
225                 http.Error(resp, err.Error(), 500)
226                 bufs.Put(buf)
227                 return
228         }
229
230         replication, err := PutBlock(ctx, buf, hash)
231         bufs.Put(buf)
232
233         if err != nil {
234                 code := http.StatusInternalServerError
235                 if err, ok := err.(*KeepError); ok {
236                         code = err.HTTPCode
237                 }
238                 http.Error(resp, err.Error(), code)
239                 return
240         }
241
242         // Success; add a size hint, sign the locator if possible, and
243         // return it to the client.
244         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
245         apiToken := GetAPIToken(req)
246         if theConfig.blobSigningKey != nil && apiToken != "" {
247                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
248                 returnHash = SignLocator(returnHash, apiToken, expiry)
249         }
250         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
251         resp.Write([]byte(returnHash + "\n"))
252 }
253
254 // IndexHandler responds to "/index", "/index/{prefix}", and
255 // "/mounts/{uuid}/blocks" requests.
256 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
257         if !IsSystemAuth(GetAPIToken(req)) {
258                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
259                 return
260         }
261
262         prefix := mux.Vars(req)["prefix"]
263         if prefix == "" {
264                 req.ParseForm()
265                 prefix = req.Form.Get("prefix")
266         }
267
268         uuid := mux.Vars(req)["uuid"]
269
270         var vols []Volume
271         if uuid == "" {
272                 vols = KeepVM.AllReadable()
273         } else if v := KeepVM.Lookup(uuid, false); v == nil {
274                 http.Error(resp, "mount not found", http.StatusNotFound)
275                 return
276         } else {
277                 vols = []Volume{v}
278         }
279
280         for _, v := range vols {
281                 if err := v.IndexTo(prefix, resp); err != nil {
282                         // The only errors returned by IndexTo are
283                         // write errors returned by resp.Write(),
284                         // which probably means the client has
285                         // disconnected and this error will never be
286                         // reported to the client -- but it will
287                         // appear in our own error log.
288                         http.Error(resp, err.Error(), http.StatusInternalServerError)
289                         return
290                 }
291         }
292         // An empty line at EOF is the only way the client can be
293         // assured the entire index was received.
294         resp.Write([]byte{'\n'})
295 }
296
297 // MountsHandler responds to "GET /mounts" requests.
298 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
299         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
300         if err != nil {
301                 http.Error(resp, err.Error(), http.StatusInternalServerError)
302         }
303 }
304
305 // PoolStatus struct
306 type PoolStatus struct {
307         Alloc uint64 `json:"BytesAllocatedCumulative"`
308         Cap   int    `json:"BuffersMax"`
309         Len   int    `json:"BuffersInUse"`
310 }
311
312 type volumeStatusEnt struct {
313         Label         string
314         Status        *VolumeStatus `json:",omitempty"`
315         VolumeStats   *ioStats      `json:",omitempty"`
316         InternalStats interface{}   `json:",omitempty"`
317 }
318
319 // NodeStatus struct
320 type NodeStatus struct {
321         Volumes         []*volumeStatusEnt
322         BufferPool      PoolStatus
323         PullQueue       WorkQueueStatus
324         TrashQueue      WorkQueueStatus
325         RequestsCurrent int
326         RequestsMax     int
327         Version         string
328 }
329
330 var st NodeStatus
331 var stLock sync.Mutex
332
333 // DebugHandler addresses /debug.json requests.
334 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
335         type debugStats struct {
336                 MemStats runtime.MemStats
337         }
338         var ds debugStats
339         runtime.ReadMemStats(&ds.MemStats)
340         err := json.NewEncoder(resp).Encode(&ds)
341         if err != nil {
342                 http.Error(resp, err.Error(), 500)
343         }
344 }
345
346 // StatusHandler addresses /status.json requests.
347 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
348         stLock.Lock()
349         rtr.readNodeStatus(&st)
350         jstat, err := json.Marshal(&st)
351         stLock.Unlock()
352         if err == nil {
353                 resp.Write(jstat)
354         } else {
355                 log.Printf("json.Marshal: %s", err)
356                 log.Printf("NodeStatus = %v", &st)
357                 http.Error(resp, err.Error(), 500)
358         }
359 }
360
361 // populate the given NodeStatus struct with current values.
362 func (rtr *router) readNodeStatus(st *NodeStatus) {
363         st.Version = version
364         vols := KeepVM.AllReadable()
365         if cap(st.Volumes) < len(vols) {
366                 st.Volumes = make([]*volumeStatusEnt, len(vols))
367         }
368         st.Volumes = st.Volumes[:0]
369         for _, vol := range vols {
370                 var internalStats interface{}
371                 if vol, ok := vol.(InternalStatser); ok {
372                         internalStats = vol.InternalStats()
373                 }
374                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
375                         Label:         vol.String(),
376                         Status:        vol.Status(),
377                         InternalStats: internalStats,
378                         //VolumeStats: KeepVM.VolumeStats(vol),
379                 })
380         }
381         st.BufferPool.Alloc = bufs.Alloc()
382         st.BufferPool.Cap = bufs.Cap()
383         st.BufferPool.Len = bufs.Len()
384         st.PullQueue = getWorkQueueStatus(pullq)
385         st.TrashQueue = getWorkQueueStatus(trashq)
386         if rtr.limiter != nil {
387                 st.RequestsCurrent = rtr.limiter.Current()
388                 st.RequestsMax = rtr.limiter.Max()
389         }
390 }
391
392 // return a WorkQueueStatus for the given queue. If q is nil (which
393 // should never happen except in test suites), return a zero status
394 // value instead of crashing.
395 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
396         if q == nil {
397                 // This should only happen during tests.
398                 return WorkQueueStatus{}
399         }
400         return q.Status()
401 }
402
403 // DeleteHandler processes DELETE requests.
404 //
405 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
406 // from all connected volumes.
407 //
408 // Only the Data Manager, or an Arvados admin with scope "all", are
409 // allowed to issue DELETE requests.  If a DELETE request is not
410 // authenticated or is issued by a non-admin user, the server returns
411 // a PermissionError.
412 //
413 // Upon receiving a valid request from an authorized user,
414 // DeleteHandler deletes all copies of the specified block on local
415 // writable volumes.
416 //
417 // Response format:
418 //
419 // If the requested blocks was not found on any volume, the response
420 // code is HTTP 404 Not Found.
421 //
422 // Otherwise, the response code is 200 OK, with a response body
423 // consisting of the JSON message
424 //
425 //    {"copies_deleted":d,"copies_failed":f}
426 //
427 // where d and f are integers representing the number of blocks that
428 // were successfully and unsuccessfully deleted.
429 //
430 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
431         hash := mux.Vars(req)["hash"]
432
433         // Confirm that this user is an admin and has a token with unlimited scope.
434         var tok = GetAPIToken(req)
435         if tok == "" || !CanDelete(tok) {
436                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
437                 return
438         }
439
440         if !theConfig.EnableDelete {
441                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
442                 return
443         }
444
445         // Delete copies of this block from all available volumes.
446         // Report how many blocks were successfully deleted, and how
447         // many were found on writable volumes but not deleted.
448         var result struct {
449                 Deleted int `json:"copies_deleted"`
450                 Failed  int `json:"copies_failed"`
451         }
452         for _, vol := range KeepVM.AllWritable() {
453                 if err := vol.Trash(hash); err == nil {
454                         result.Deleted++
455                 } else if os.IsNotExist(err) {
456                         continue
457                 } else {
458                         result.Failed++
459                         log.Println("DeleteHandler:", err)
460                 }
461         }
462
463         var st int
464
465         if result.Deleted == 0 && result.Failed == 0 {
466                 st = http.StatusNotFound
467         } else {
468                 st = http.StatusOK
469         }
470
471         resp.WriteHeader(st)
472
473         if st == http.StatusOK {
474                 if body, err := json.Marshal(result); err == nil {
475                         resp.Write(body)
476                 } else {
477                         log.Printf("json.Marshal: %s (result = %v)", err, result)
478                         http.Error(resp, err.Error(), 500)
479                 }
480         }
481 }
482
483 /* PullHandler processes "PUT /pull" requests for the data manager.
484    The request body is a JSON message containing a list of pull
485    requests in the following format:
486
487    [
488       {
489          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
490          "servers":[
491                         "keep0.qr1hi.arvadosapi.com:25107",
492                         "keep1.qr1hi.arvadosapi.com:25108"
493                  ]
494           },
495           {
496                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
497                  "servers":[
498                         "10.0.1.5:25107",
499                         "10.0.1.6:25107",
500                         "10.0.1.7:25108"
501                  ]
502           },
503           ...
504    ]
505
506    Each pull request in the list consists of a block locator string
507    and an ordered list of servers.  Keepstore should try to fetch the
508    block from each server in turn.
509
510    If the request has not been sent by the Data Manager, return 401
511    Unauthorized.
512
513    If the JSON unmarshalling fails, return 400 Bad Request.
514 */
515
516 // PullRequest consists of a block locator and an ordered list of servers
517 type PullRequest struct {
518         Locator string   `json:"locator"`
519         Servers []string `json:"servers"`
520
521         // Destination mount, or "" for "anywhere"
522         MountUUID string `json:"mount_uuid"`
523 }
524
525 // PullHandler processes "PUT /pull" requests for the data manager.
526 func PullHandler(resp http.ResponseWriter, req *http.Request) {
527         // Reject unauthorized requests.
528         if !IsSystemAuth(GetAPIToken(req)) {
529                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
530                 return
531         }
532
533         // Parse the request body.
534         var pr []PullRequest
535         r := json.NewDecoder(req.Body)
536         if err := r.Decode(&pr); err != nil {
537                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
538                 return
539         }
540
541         // We have a properly formatted pull list sent from the data
542         // manager.  Report success and send the list to the pull list
543         // manager for further handling.
544         resp.WriteHeader(http.StatusOK)
545         resp.Write([]byte(
546                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
547
548         plist := list.New()
549         for _, p := range pr {
550                 plist.PushBack(p)
551         }
552         pullq.ReplaceQueue(plist)
553 }
554
555 // TrashRequest consists of a block locator and its Mtime
556 type TrashRequest struct {
557         Locator    string `json:"locator"`
558         BlockMtime int64  `json:"block_mtime"`
559
560         // Target mount, or "" for "everywhere"
561         MountUUID string `json:"mount_uuid"`
562 }
563
564 // TrashHandler processes /trash requests.
565 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
566         // Reject unauthorized requests.
567         if !IsSystemAuth(GetAPIToken(req)) {
568                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
569                 return
570         }
571
572         // Parse the request body.
573         var trash []TrashRequest
574         r := json.NewDecoder(req.Body)
575         if err := r.Decode(&trash); err != nil {
576                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
577                 return
578         }
579
580         // We have a properly formatted trash list sent from the data
581         // manager.  Report success and send the list to the trash work
582         // queue for further handling.
583         resp.WriteHeader(http.StatusOK)
584         resp.Write([]byte(
585                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
586
587         tlist := list.New()
588         for _, t := range trash {
589                 tlist.PushBack(t)
590         }
591         trashq.ReplaceQueue(tlist)
592 }
593
594 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
595 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
596         // Reject unauthorized requests.
597         if !IsSystemAuth(GetAPIToken(req)) {
598                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
599                 return
600         }
601
602         hash := mux.Vars(req)["hash"]
603
604         if len(KeepVM.AllWritable()) == 0 {
605                 http.Error(resp, "No writable volumes", http.StatusNotFound)
606                 return
607         }
608
609         var untrashedOn, failedOn []string
610         var numNotFound int
611         for _, vol := range KeepVM.AllWritable() {
612                 err := vol.Untrash(hash)
613
614                 if os.IsNotExist(err) {
615                         numNotFound++
616                 } else if err != nil {
617                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
618                         failedOn = append(failedOn, vol.String())
619                 } else {
620                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
621                         untrashedOn = append(untrashedOn, vol.String())
622                 }
623         }
624
625         if numNotFound == len(KeepVM.AllWritable()) {
626                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
627                 return
628         }
629
630         if len(failedOn) == len(KeepVM.AllWritable()) {
631                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
632         } else {
633                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
634                 if len(failedOn) > 0 {
635                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
636                 }
637                 resp.Write([]byte(respBody))
638         }
639 }
640
641 // GetBlock and PutBlock implement lower-level code for handling
642 // blocks by rooting through volumes connected to the local machine.
643 // Once the handler has determined that system policy permits the
644 // request, it calls these methods to perform the actual operation.
645 //
646 // TODO(twp): this code would probably be better located in the
647 // VolumeManager interface. As an abstraction, the VolumeManager
648 // should be the only part of the code that cares about which volume a
649 // block is stored on, so it should be responsible for figuring out
650 // which volume to check for fetching blocks, storing blocks, etc.
651
652 // GetBlock fetches the block identified by "hash" into the provided
653 // buf, and returns the data size.
654 //
655 // If the block cannot be found on any volume, returns NotFoundError.
656 //
657 // If the block found does not have the correct MD5 hash, returns
658 // DiskHashError.
659 //
660 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
661         // Attempt to read the requested hash from a keep volume.
662         errorToCaller := NotFoundError
663
664         for _, vol := range KeepVM.AllReadable() {
665                 size, err := vol.Get(ctx, hash, buf)
666                 select {
667                 case <-ctx.Done():
668                         return 0, ErrClientDisconnect
669                 default:
670                 }
671                 if err != nil {
672                         // IsNotExist is an expected error and may be
673                         // ignored. All other errors are logged. In
674                         // any case we continue trying to read other
675                         // volumes. If all volumes report IsNotExist,
676                         // we return a NotFoundError.
677                         if !os.IsNotExist(err) {
678                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
679                         }
680                         continue
681                 }
682                 // Check the file checksum.
683                 //
684                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
685                 if filehash != hash {
686                         // TODO: Try harder to tell a sysadmin about
687                         // this.
688                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
689                                 vol, hash, filehash)
690                         errorToCaller = DiskHashError
691                         continue
692                 }
693                 if errorToCaller == DiskHashError {
694                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
695                                 vol, hash)
696                 }
697                 return size, nil
698         }
699         return 0, errorToCaller
700 }
701
702 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
703 //
704 // PutBlock(ctx, block, hash)
705 //   Stores the BLOCK (identified by the content id HASH) in Keep.
706 //
707 //   The MD5 checksum of the block must be identical to the content id HASH.
708 //   If not, an error is returned.
709 //
710 //   PutBlock stores the BLOCK on the first Keep volume with free space.
711 //   A failure code is returned to the user only if all volumes fail.
712 //
713 //   On success, PutBlock returns nil.
714 //   On failure, it returns a KeepError with one of the following codes:
715 //
716 //   500 Collision
717 //          A different block with the same hash already exists on this
718 //          Keep server.
719 //   422 MD5Fail
720 //          The MD5 hash of the BLOCK does not match the argument HASH.
721 //   503 Full
722 //          There was not enough space left in any Keep volume to store
723 //          the object.
724 //   500 Fail
725 //          The object could not be stored for some other reason (e.g.
726 //          all writes failed). The text of the error message should
727 //          provide as much detail as possible.
728 //
729 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
730         // Check that BLOCK's checksum matches HASH.
731         blockhash := fmt.Sprintf("%x", md5.Sum(block))
732         if blockhash != hash {
733                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
734                 return 0, RequestHashError
735         }
736
737         // If we already have this data, it's intact on disk, and we
738         // can update its timestamp, return success. If we have
739         // different data with the same hash, return failure.
740         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
741                 return n, err
742         } else if ctx.Err() != nil {
743                 return 0, ErrClientDisconnect
744         }
745
746         // Choose a Keep volume to write to.
747         // If this volume fails, try all of the volumes in order.
748         if vol := KeepVM.NextWritable(); vol != nil {
749                 if err := vol.Put(ctx, hash, block); err == nil {
750                         return vol.Replication(), nil // success!
751                 }
752                 if ctx.Err() != nil {
753                         return 0, ErrClientDisconnect
754                 }
755         }
756
757         writables := KeepVM.AllWritable()
758         if len(writables) == 0 {
759                 log.Print("No writable volumes.")
760                 return 0, FullError
761         }
762
763         allFull := true
764         for _, vol := range writables {
765                 err := vol.Put(ctx, hash, block)
766                 if ctx.Err() != nil {
767                         return 0, ErrClientDisconnect
768                 }
769                 if err == nil {
770                         return vol.Replication(), nil // success!
771                 }
772                 if err != FullError {
773                         // The volume is not full but the
774                         // write did not succeed.  Report the
775                         // error and continue trying.
776                         allFull = false
777                         log.Printf("%s: Write(%s): %s", vol, hash, err)
778                 }
779         }
780
781         if allFull {
782                 log.Print("All volumes are full.")
783                 return 0, FullError
784         }
785         // Already logged the non-full errors.
786         return 0, GenericError
787 }
788
789 // CompareAndTouch returns the current replication level if one of the
790 // volumes already has the given content and it successfully updates
791 // the relevant block's modification time in order to protect it from
792 // premature garbage collection. Otherwise, it returns a non-nil
793 // error.
794 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
795         var bestErr error = NotFoundError
796         for _, vol := range KeepVM.AllWritable() {
797                 err := vol.Compare(ctx, hash, buf)
798                 if ctx.Err() != nil {
799                         return 0, ctx.Err()
800                 } else if err == CollisionError {
801                         // Stop if we have a block with same hash but
802                         // different content. (It will be impossible
803                         // to tell which one is wanted if we have
804                         // both, so there's no point writing it even
805                         // on a different volume.)
806                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
807                         return 0, err
808                 } else if os.IsNotExist(err) {
809                         // Block does not exist. This is the only
810                         // "normal" error: we don't log anything.
811                         continue
812                 } else if err != nil {
813                         // Couldn't open file, data is corrupt on
814                         // disk, etc.: log this abnormal condition,
815                         // and try the next volume.
816                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
817                         continue
818                 }
819                 if err := vol.Touch(hash); err != nil {
820                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
821                         bestErr = err
822                         continue
823                 }
824                 // Compare and Touch both worked --> done.
825                 return vol.Replication(), nil
826         }
827         return 0, bestErr
828 }
829
830 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
831
832 // IsValidLocator returns true if the specified string is a valid Keep locator.
833 //   When Keep is extended to support hash types other than MD5,
834 //   this should be updated to cover those as well.
835 //
836 func IsValidLocator(loc string) bool {
837         return validLocatorRe.MatchString(loc)
838 }
839
840 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
841
842 // GetAPIToken returns the OAuth2 token from the Authorization
843 // header of a HTTP request, or an empty string if no matching
844 // token is found.
845 func GetAPIToken(req *http.Request) string {
846         if auth, ok := req.Header["Authorization"]; ok {
847                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
848                         return match[2]
849                 }
850         }
851         return ""
852 }
853
854 // IsExpired returns true if the given Unix timestamp (expressed as a
855 // hexadecimal string) is in the past, or if timestampHex cannot be
856 // parsed as a hexadecimal string.
857 func IsExpired(timestampHex string) bool {
858         ts, err := strconv.ParseInt(timestampHex, 16, 0)
859         if err != nil {
860                 log.Printf("IsExpired: %s", err)
861                 return true
862         }
863         return time.Unix(ts, 0).Before(time.Now())
864 }
865
866 // CanDelete returns true if the user identified by apiToken is
867 // allowed to delete blocks.
868 func CanDelete(apiToken string) bool {
869         if apiToken == "" {
870                 return false
871         }
872         // Blocks may be deleted only when Keep has been configured with a
873         // data manager.
874         if IsSystemAuth(apiToken) {
875                 return true
876         }
877         // TODO(twp): look up apiToken with the API server
878         // return true if is_admin is true and if the token
879         // has unlimited scope
880         return false
881 }
882
883 // IsSystemAuth returns true if the given token is allowed to perform
884 // system level actions like deleting data.
885 func IsSystemAuth(token string) bool {
886         return token != "" && token == theConfig.systemAuthToken
887 }