10666: Added Version information to status.json on keep-web,
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 // REST handlers for Keep are implemented here.
8 //
9 // GetBlockHandler (GET /locator)
10 // PutBlockHandler (PUT /locator)
11 // IndexHandler    (GET /index, GET /index/prefix)
12 // StatusHandler   (GET /status.json)
13
14 import (
15         "container/list"
16         "context"
17         "crypto/md5"
18         "encoding/json"
19         "fmt"
20         "io"
21         "net/http"
22         "os"
23         "regexp"
24         "runtime"
25         "strconv"
26         "strings"
27         "sync"
28         "time"
29
30         "github.com/gorilla/mux"
31
32         "git.curoverse.com/arvados.git/sdk/go/health"
33         "git.curoverse.com/arvados.git/sdk/go/httpserver"
34         arvadosVersion "git.curoverse.com/arvados.git/sdk/go/version"
35         log "github.com/Sirupsen/logrus"
36 )
37
38 type router struct {
39         *mux.Router
40         limiter httpserver.RequestCounter
41 }
42
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter() *router {
46         rest := mux.NewRouter()
47         rtr := &router{Router: rest}
48
49         rest.HandleFunc(
50                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
51         rest.HandleFunc(
52                 `/{hash:[0-9a-f]{32}}+{hints}`,
53                 GetBlockHandler).Methods("GET", "HEAD")
54
55         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
56         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
57         // List all blocks stored here. Privileged client only.
58         rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
59         // List blocks stored here whose hash has the given prefix.
60         // Privileged client only.
61         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
62
63         // Internals/debugging info (runtime.MemStats)
64         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
65
66         // List volumes: path, device number, bytes used/avail.
67         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
68
69         // List mounts: UUID, readonly, tier, device ID, ...
70         rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
71         rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
72         rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
73
74         // Replace the current pull queue.
75         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
76
77         // Replace the current trash queue.
78         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
79
80         // Untrash moves blocks from trash back into store
81         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
82
83         rest.Handle("/_health/{check}", &health.Handler{
84                 Token:  theConfig.ManagementToken,
85                 Prefix: "/_health/",
86         }).Methods("GET")
87
88         // Any request which does not match any of these routes gets
89         // 400 Bad Request.
90         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
91
92         return rtr
93 }
94
95 // BadRequestHandler is a HandleFunc to address bad requests.
96 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
97         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
98 }
99
100 // GetBlockHandler is a HandleFunc to address Get block requests.
101 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
102         ctx, cancel := contextForResponse(context.TODO(), resp)
103         defer cancel()
104
105         if theConfig.RequireSignatures {
106                 locator := req.URL.Path[1:] // strip leading slash
107                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
108                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
109                         return
110                 }
111         }
112
113         // TODO: Probe volumes to check whether the block _might_
114         // exist. Some volumes/types could support a quick existence
115         // check without causing other operations to suffer. If all
116         // volumes support that, and assure us the block definitely
117         // isn't here, we can return 404 now instead of waiting for a
118         // buffer.
119
120         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
121         if err != nil {
122                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
123                 return
124         }
125         defer bufs.Put(buf)
126
127         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
128         if err != nil {
129                 code := http.StatusInternalServerError
130                 if err, ok := err.(*KeepError); ok {
131                         code = err.HTTPCode
132                 }
133                 http.Error(resp, err.Error(), code)
134                 return
135         }
136
137         resp.Header().Set("Content-Length", strconv.Itoa(size))
138         resp.Header().Set("Content-Type", "application/octet-stream")
139         resp.Write(buf[:size])
140 }
141
142 // Return a new context that gets cancelled by resp's CloseNotifier.
143 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
144         ctx, cancel := context.WithCancel(parent)
145         if cn, ok := resp.(http.CloseNotifier); ok {
146                 go func(c <-chan bool) {
147                         select {
148                         case <-c:
149                                 theConfig.debugLogf("cancel context")
150                                 cancel()
151                         case <-ctx.Done():
152                         }
153                 }(cn.CloseNotify())
154         }
155         return ctx, cancel
156 }
157
158 // Get a buffer from the pool -- but give up and return a non-nil
159 // error if ctx ends before we get a buffer.
160 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
161         bufReady := make(chan []byte)
162         go func() {
163                 bufReady <- bufs.Get(bufSize)
164         }()
165         select {
166         case buf := <-bufReady:
167                 return buf, nil
168         case <-ctx.Done():
169                 go func() {
170                         // Even if closeNotifier happened first, we
171                         // need to keep waiting for our buf so we can
172                         // return it to the pool.
173                         bufs.Put(<-bufReady)
174                 }()
175                 return nil, ErrClientDisconnect
176         }
177 }
178
179 // PutBlockHandler is a HandleFunc to address Put block requests.
180 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
181         ctx, cancel := contextForResponse(context.TODO(), resp)
182         defer cancel()
183
184         hash := mux.Vars(req)["hash"]
185
186         // Detect as many error conditions as possible before reading
187         // the body: avoid transmitting data that will not end up
188         // being written anyway.
189
190         if req.ContentLength == -1 {
191                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
192                 return
193         }
194
195         if req.ContentLength > BlockSize {
196                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
197                 return
198         }
199
200         if len(KeepVM.AllWritable()) == 0 {
201                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
202                 return
203         }
204
205         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
206         if err != nil {
207                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
208                 return
209         }
210
211         _, err = io.ReadFull(req.Body, buf)
212         if err != nil {
213                 http.Error(resp, err.Error(), 500)
214                 bufs.Put(buf)
215                 return
216         }
217
218         replication, err := PutBlock(ctx, buf, hash)
219         bufs.Put(buf)
220
221         if err != nil {
222                 code := http.StatusInternalServerError
223                 if err, ok := err.(*KeepError); ok {
224                         code = err.HTTPCode
225                 }
226                 http.Error(resp, err.Error(), code)
227                 return
228         }
229
230         // Success; add a size hint, sign the locator if possible, and
231         // return it to the client.
232         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
233         apiToken := GetAPIToken(req)
234         if theConfig.blobSigningKey != nil && apiToken != "" {
235                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
236                 returnHash = SignLocator(returnHash, apiToken, expiry)
237         }
238         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
239         resp.Write([]byte(returnHash + "\n"))
240 }
241
242 // IndexHandler responds to "/index", "/index/{prefix}", and
243 // "/mounts/{uuid}/blocks" requests.
244 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
245         if !IsSystemAuth(GetAPIToken(req)) {
246                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
247                 return
248         }
249
250         prefix := mux.Vars(req)["prefix"]
251         if prefix == "" {
252                 req.ParseForm()
253                 prefix = req.Form.Get("prefix")
254         }
255
256         uuid := mux.Vars(req)["uuid"]
257
258         var vols []Volume
259         if uuid == "" {
260                 vols = KeepVM.AllReadable()
261         } else if v := KeepVM.Lookup(uuid, false); v == nil {
262                 http.Error(resp, "mount not found", http.StatusNotFound)
263                 return
264         } else {
265                 vols = []Volume{v}
266         }
267
268         for _, v := range vols {
269                 if err := v.IndexTo(prefix, resp); err != nil {
270                         // The only errors returned by IndexTo are
271                         // write errors returned by resp.Write(),
272                         // which probably means the client has
273                         // disconnected and this error will never be
274                         // reported to the client -- but it will
275                         // appear in our own error log.
276                         http.Error(resp, err.Error(), http.StatusInternalServerError)
277                         return
278                 }
279         }
280         // An empty line at EOF is the only way the client can be
281         // assured the entire index was received.
282         resp.Write([]byte{'\n'})
283 }
284
285 // MountsHandler responds to "GET /mounts" requests.
286 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
287         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
288         if err != nil {
289                 http.Error(resp, err.Error(), http.StatusInternalServerError)
290         }
291 }
292
293 // PoolStatus struct
294 type PoolStatus struct {
295         Alloc uint64 `json:"BytesAllocatedCumulative"`
296         Cap   int    `json:"BuffersMax"`
297         Len   int    `json:"BuffersInUse"`
298 }
299
300 type volumeStatusEnt struct {
301         Label         string
302         Status        *VolumeStatus `json:",omitempty"`
303         VolumeStats   *ioStats      `json:",omitempty"`
304         InternalStats interface{}   `json:",omitempty"`
305 }
306
307 // NodeStatus struct
308 type NodeStatus struct {
309         Volumes         []*volumeStatusEnt
310         BufferPool      PoolStatus
311         PullQueue       WorkQueueStatus
312         TrashQueue      WorkQueueStatus
313         RequestsCurrent int
314         RequestsMax     int
315         Version         string
316 }
317
318 var st NodeStatus
319 var stLock sync.Mutex
320
321 // DebugHandler addresses /debug.json requests.
322 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
323         type debugStats struct {
324                 MemStats runtime.MemStats
325         }
326         var ds debugStats
327         runtime.ReadMemStats(&ds.MemStats)
328         err := json.NewEncoder(resp).Encode(&ds)
329         if err != nil {
330                 http.Error(resp, err.Error(), 500)
331         }
332 }
333
334 // StatusHandler addresses /status.json requests.
335 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
336         stLock.Lock()
337         rtr.readNodeStatus(&st)
338         jstat, err := json.Marshal(&st)
339         stLock.Unlock()
340         if err == nil {
341                 resp.Write(jstat)
342         } else {
343                 log.Printf("json.Marshal: %s", err)
344                 log.Printf("NodeStatus = %v", &st)
345                 http.Error(resp, err.Error(), 500)
346         }
347 }
348
349 // populate the given NodeStatus struct with current values.
350 func (rtr *router) readNodeStatus(st *NodeStatus) {
351         st.Version = arvadosVersion.GetVersion()
352         vols := KeepVM.AllReadable()
353         if cap(st.Volumes) < len(vols) {
354                 st.Volumes = make([]*volumeStatusEnt, len(vols))
355         }
356         st.Volumes = st.Volumes[:0]
357         for _, vol := range vols {
358                 var internalStats interface{}
359                 if vol, ok := vol.(InternalStatser); ok {
360                         internalStats = vol.InternalStats()
361                 }
362                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
363                         Label:         vol.String(),
364                         Status:        vol.Status(),
365                         InternalStats: internalStats,
366                         //VolumeStats: KeepVM.VolumeStats(vol),
367                 })
368         }
369         st.BufferPool.Alloc = bufs.Alloc()
370         st.BufferPool.Cap = bufs.Cap()
371         st.BufferPool.Len = bufs.Len()
372         st.PullQueue = getWorkQueueStatus(pullq)
373         st.TrashQueue = getWorkQueueStatus(trashq)
374         if rtr.limiter != nil {
375                 st.RequestsCurrent = rtr.limiter.Current()
376                 st.RequestsMax = rtr.limiter.Max()
377         }
378 }
379
380 // return a WorkQueueStatus for the given queue. If q is nil (which
381 // should never happen except in test suites), return a zero status
382 // value instead of crashing.
383 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
384         if q == nil {
385                 // This should only happen during tests.
386                 return WorkQueueStatus{}
387         }
388         return q.Status()
389 }
390
391 // DeleteHandler processes DELETE requests.
392 //
393 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
394 // from all connected volumes.
395 //
396 // Only the Data Manager, or an Arvados admin with scope "all", are
397 // allowed to issue DELETE requests.  If a DELETE request is not
398 // authenticated or is issued by a non-admin user, the server returns
399 // a PermissionError.
400 //
401 // Upon receiving a valid request from an authorized user,
402 // DeleteHandler deletes all copies of the specified block on local
403 // writable volumes.
404 //
405 // Response format:
406 //
407 // If the requested blocks was not found on any volume, the response
408 // code is HTTP 404 Not Found.
409 //
410 // Otherwise, the response code is 200 OK, with a response body
411 // consisting of the JSON message
412 //
413 //    {"copies_deleted":d,"copies_failed":f}
414 //
415 // where d and f are integers representing the number of blocks that
416 // were successfully and unsuccessfully deleted.
417 //
418 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
419         hash := mux.Vars(req)["hash"]
420
421         // Confirm that this user is an admin and has a token with unlimited scope.
422         var tok = GetAPIToken(req)
423         if tok == "" || !CanDelete(tok) {
424                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
425                 return
426         }
427
428         if !theConfig.EnableDelete {
429                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
430                 return
431         }
432
433         // Delete copies of this block from all available volumes.
434         // Report how many blocks were successfully deleted, and how
435         // many were found on writable volumes but not deleted.
436         var result struct {
437                 Deleted int `json:"copies_deleted"`
438                 Failed  int `json:"copies_failed"`
439         }
440         for _, vol := range KeepVM.AllWritable() {
441                 if err := vol.Trash(hash); err == nil {
442                         result.Deleted++
443                 } else if os.IsNotExist(err) {
444                         continue
445                 } else {
446                         result.Failed++
447                         log.Println("DeleteHandler:", err)
448                 }
449         }
450
451         var st int
452
453         if result.Deleted == 0 && result.Failed == 0 {
454                 st = http.StatusNotFound
455         } else {
456                 st = http.StatusOK
457         }
458
459         resp.WriteHeader(st)
460
461         if st == http.StatusOK {
462                 if body, err := json.Marshal(result); err == nil {
463                         resp.Write(body)
464                 } else {
465                         log.Printf("json.Marshal: %s (result = %v)", err, result)
466                         http.Error(resp, err.Error(), 500)
467                 }
468         }
469 }
470
471 /* PullHandler processes "PUT /pull" requests for the data manager.
472    The request body is a JSON message containing a list of pull
473    requests in the following format:
474
475    [
476       {
477          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
478          "servers":[
479                         "keep0.qr1hi.arvadosapi.com:25107",
480                         "keep1.qr1hi.arvadosapi.com:25108"
481                  ]
482           },
483           {
484                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
485                  "servers":[
486                         "10.0.1.5:25107",
487                         "10.0.1.6:25107",
488                         "10.0.1.7:25108"
489                  ]
490           },
491           ...
492    ]
493
494    Each pull request in the list consists of a block locator string
495    and an ordered list of servers.  Keepstore should try to fetch the
496    block from each server in turn.
497
498    If the request has not been sent by the Data Manager, return 401
499    Unauthorized.
500
501    If the JSON unmarshalling fails, return 400 Bad Request.
502 */
503
504 // PullRequest consists of a block locator and an ordered list of servers
505 type PullRequest struct {
506         Locator string   `json:"locator"`
507         Servers []string `json:"servers"`
508
509         // Destination mount, or "" for "anywhere"
510         MountUUID string
511 }
512
513 // PullHandler processes "PUT /pull" requests for the data manager.
514 func PullHandler(resp http.ResponseWriter, req *http.Request) {
515         // Reject unauthorized requests.
516         if !IsSystemAuth(GetAPIToken(req)) {
517                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
518                 return
519         }
520
521         // Parse the request body.
522         var pr []PullRequest
523         r := json.NewDecoder(req.Body)
524         if err := r.Decode(&pr); err != nil {
525                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
526                 return
527         }
528
529         // We have a properly formatted pull list sent from the data
530         // manager.  Report success and send the list to the pull list
531         // manager for further handling.
532         resp.WriteHeader(http.StatusOK)
533         resp.Write([]byte(
534                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
535
536         plist := list.New()
537         for _, p := range pr {
538                 plist.PushBack(p)
539         }
540         pullq.ReplaceQueue(plist)
541 }
542
543 // TrashRequest consists of a block locator and it's Mtime
544 type TrashRequest struct {
545         Locator    string `json:"locator"`
546         BlockMtime int64  `json:"block_mtime"`
547
548         // Target mount, or "" for "everywhere"
549         MountUUID string
550 }
551
552 // TrashHandler processes /trash requests.
553 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
554         // Reject unauthorized requests.
555         if !IsSystemAuth(GetAPIToken(req)) {
556                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
557                 return
558         }
559
560         // Parse the request body.
561         var trash []TrashRequest
562         r := json.NewDecoder(req.Body)
563         if err := r.Decode(&trash); err != nil {
564                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
565                 return
566         }
567
568         // We have a properly formatted trash list sent from the data
569         // manager.  Report success and send the list to the trash work
570         // queue for further handling.
571         resp.WriteHeader(http.StatusOK)
572         resp.Write([]byte(
573                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
574
575         tlist := list.New()
576         for _, t := range trash {
577                 tlist.PushBack(t)
578         }
579         trashq.ReplaceQueue(tlist)
580 }
581
582 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
583 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
584         // Reject unauthorized requests.
585         if !IsSystemAuth(GetAPIToken(req)) {
586                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
587                 return
588         }
589
590         hash := mux.Vars(req)["hash"]
591
592         if len(KeepVM.AllWritable()) == 0 {
593                 http.Error(resp, "No writable volumes", http.StatusNotFound)
594                 return
595         }
596
597         var untrashedOn, failedOn []string
598         var numNotFound int
599         for _, vol := range KeepVM.AllWritable() {
600                 err := vol.Untrash(hash)
601
602                 if os.IsNotExist(err) {
603                         numNotFound++
604                 } else if err != nil {
605                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
606                         failedOn = append(failedOn, vol.String())
607                 } else {
608                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
609                         untrashedOn = append(untrashedOn, vol.String())
610                 }
611         }
612
613         if numNotFound == len(KeepVM.AllWritable()) {
614                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
615                 return
616         }
617
618         if len(failedOn) == len(KeepVM.AllWritable()) {
619                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
620         } else {
621                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
622                 if len(failedOn) > 0 {
623                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
624                 }
625                 resp.Write([]byte(respBody))
626         }
627 }
628
629 // GetBlock and PutBlock implement lower-level code for handling
630 // blocks by rooting through volumes connected to the local machine.
631 // Once the handler has determined that system policy permits the
632 // request, it calls these methods to perform the actual operation.
633 //
634 // TODO(twp): this code would probably be better located in the
635 // VolumeManager interface. As an abstraction, the VolumeManager
636 // should be the only part of the code that cares about which volume a
637 // block is stored on, so it should be responsible for figuring out
638 // which volume to check for fetching blocks, storing blocks, etc.
639
640 // GetBlock fetches the block identified by "hash" into the provided
641 // buf, and returns the data size.
642 //
643 // If the block cannot be found on any volume, returns NotFoundError.
644 //
645 // If the block found does not have the correct MD5 hash, returns
646 // DiskHashError.
647 //
648 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
649         // Attempt to read the requested hash from a keep volume.
650         errorToCaller := NotFoundError
651
652         for _, vol := range KeepVM.AllReadable() {
653                 size, err := vol.Get(ctx, hash, buf)
654                 select {
655                 case <-ctx.Done():
656                         return 0, ErrClientDisconnect
657                 default:
658                 }
659                 if err != nil {
660                         // IsNotExist is an expected error and may be
661                         // ignored. All other errors are logged. In
662                         // any case we continue trying to read other
663                         // volumes. If all volumes report IsNotExist,
664                         // we return a NotFoundError.
665                         if !os.IsNotExist(err) {
666                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
667                         }
668                         continue
669                 }
670                 // Check the file checksum.
671                 //
672                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
673                 if filehash != hash {
674                         // TODO: Try harder to tell a sysadmin about
675                         // this.
676                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
677                                 vol, hash, filehash)
678                         errorToCaller = DiskHashError
679                         continue
680                 }
681                 if errorToCaller == DiskHashError {
682                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
683                                 vol, hash)
684                 }
685                 return size, nil
686         }
687         return 0, errorToCaller
688 }
689
690 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
691 //
692 // PutBlock(ctx, block, hash)
693 //   Stores the BLOCK (identified by the content id HASH) in Keep.
694 //
695 //   The MD5 checksum of the block must be identical to the content id HASH.
696 //   If not, an error is returned.
697 //
698 //   PutBlock stores the BLOCK on the first Keep volume with free space.
699 //   A failure code is returned to the user only if all volumes fail.
700 //
701 //   On success, PutBlock returns nil.
702 //   On failure, it returns a KeepError with one of the following codes:
703 //
704 //   500 Collision
705 //          A different block with the same hash already exists on this
706 //          Keep server.
707 //   422 MD5Fail
708 //          The MD5 hash of the BLOCK does not match the argument HASH.
709 //   503 Full
710 //          There was not enough space left in any Keep volume to store
711 //          the object.
712 //   500 Fail
713 //          The object could not be stored for some other reason (e.g.
714 //          all writes failed). The text of the error message should
715 //          provide as much detail as possible.
716 //
717 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
718         // Check that BLOCK's checksum matches HASH.
719         blockhash := fmt.Sprintf("%x", md5.Sum(block))
720         if blockhash != hash {
721                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
722                 return 0, RequestHashError
723         }
724
725         // If we already have this data, it's intact on disk, and we
726         // can update its timestamp, return success. If we have
727         // different data with the same hash, return failure.
728         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
729                 return n, err
730         } else if ctx.Err() != nil {
731                 return 0, ErrClientDisconnect
732         }
733
734         // Choose a Keep volume to write to.
735         // If this volume fails, try all of the volumes in order.
736         if vol := KeepVM.NextWritable(); vol != nil {
737                 if err := vol.Put(ctx, hash, block); err == nil {
738                         return vol.Replication(), nil // success!
739                 }
740                 if ctx.Err() != nil {
741                         return 0, ErrClientDisconnect
742                 }
743         }
744
745         writables := KeepVM.AllWritable()
746         if len(writables) == 0 {
747                 log.Print("No writable volumes.")
748                 return 0, FullError
749         }
750
751         allFull := true
752         for _, vol := range writables {
753                 err := vol.Put(ctx, hash, block)
754                 if ctx.Err() != nil {
755                         return 0, ErrClientDisconnect
756                 }
757                 if err == nil {
758                         return vol.Replication(), nil // success!
759                 }
760                 if err != FullError {
761                         // The volume is not full but the
762                         // write did not succeed.  Report the
763                         // error and continue trying.
764                         allFull = false
765                         log.Printf("%s: Write(%s): %s", vol, hash, err)
766                 }
767         }
768
769         if allFull {
770                 log.Print("All volumes are full.")
771                 return 0, FullError
772         }
773         // Already logged the non-full errors.
774         return 0, GenericError
775 }
776
777 // CompareAndTouch returns the current replication level if one of the
778 // volumes already has the given content and it successfully updates
779 // the relevant block's modification time in order to protect it from
780 // premature garbage collection. Otherwise, it returns a non-nil
781 // error.
782 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
783         var bestErr error = NotFoundError
784         for _, vol := range KeepVM.AllWritable() {
785                 err := vol.Compare(ctx, hash, buf)
786                 if ctx.Err() != nil {
787                         return 0, ctx.Err()
788                 } else if err == CollisionError {
789                         // Stop if we have a block with same hash but
790                         // different content. (It will be impossible
791                         // to tell which one is wanted if we have
792                         // both, so there's no point writing it even
793                         // on a different volume.)
794                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
795                         return 0, err
796                 } else if os.IsNotExist(err) {
797                         // Block does not exist. This is the only
798                         // "normal" error: we don't log anything.
799                         continue
800                 } else if err != nil {
801                         // Couldn't open file, data is corrupt on
802                         // disk, etc.: log this abnormal condition,
803                         // and try the next volume.
804                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
805                         continue
806                 }
807                 if err := vol.Touch(hash); err != nil {
808                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
809                         bestErr = err
810                         continue
811                 }
812                 // Compare and Touch both worked --> done.
813                 return vol.Replication(), nil
814         }
815         return 0, bestErr
816 }
817
818 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
819
820 // IsValidLocator returns true if the specified string is a valid Keep locator.
821 //   When Keep is extended to support hash types other than MD5,
822 //   this should be updated to cover those as well.
823 //
824 func IsValidLocator(loc string) bool {
825         return validLocatorRe.MatchString(loc)
826 }
827
828 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
829
830 // GetAPIToken returns the OAuth2 token from the Authorization
831 // header of a HTTP request, or an empty string if no matching
832 // token is found.
833 func GetAPIToken(req *http.Request) string {
834         if auth, ok := req.Header["Authorization"]; ok {
835                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
836                         return match[1]
837                 }
838         }
839         return ""
840 }
841
842 // IsExpired returns true if the given Unix timestamp (expressed as a
843 // hexadecimal string) is in the past, or if timestampHex cannot be
844 // parsed as a hexadecimal string.
845 func IsExpired(timestampHex string) bool {
846         ts, err := strconv.ParseInt(timestampHex, 16, 0)
847         if err != nil {
848                 log.Printf("IsExpired: %s", err)
849                 return true
850         }
851         return time.Unix(ts, 0).Before(time.Now())
852 }
853
854 // CanDelete returns true if the user identified by apiToken is
855 // allowed to delete blocks.
856 func CanDelete(apiToken string) bool {
857         if apiToken == "" {
858                 return false
859         }
860         // Blocks may be deleted only when Keep has been configured with a
861         // data manager.
862         if IsSystemAuth(apiToken) {
863                 return true
864         }
865         // TODO(twp): look up apiToken with the API server
866         // return true if is_admin is true and if the token
867         // has unlimited scope
868         return false
869 }
870
871 // IsSystemAuth returns true if the given token is allowed to perform
872 // system level actions like deleting data.
873 func IsSystemAuth(token string) bool {
874         return token != "" && token == theConfig.systemAuthToken
875 }