14819: Merge branch 'master' into 14819-arvados-jobs-on-stretch
[arvados.git] / services / keepstore / handlers.go
1 // Copyright (C) The Arvados Authors. All rights reserved.
2 //
3 // SPDX-License-Identifier: AGPL-3.0
4
5 package main
6
7 import (
8         "container/list"
9         "context"
10         "crypto/md5"
11         "encoding/json"
12         "fmt"
13         "io"
14         "net/http"
15         "os"
16         "regexp"
17         "runtime"
18         "strconv"
19         "strings"
20         "sync"
21         "time"
22
23         "git.curoverse.com/arvados.git/sdk/go/arvados"
24         "git.curoverse.com/arvados.git/sdk/go/health"
25         "git.curoverse.com/arvados.git/sdk/go/httpserver"
26         "github.com/gorilla/mux"
27 )
28
29 type router struct {
30         *mux.Router
31         limiter     httpserver.RequestCounter
32         cluster     *arvados.Cluster
33         remoteProxy remoteProxy
34 }
35
36 // MakeRESTRouter returns a new router that forwards all Keep requests
37 // to the appropriate handlers.
38 func MakeRESTRouter(cluster *arvados.Cluster) http.Handler {
39         rtr := &router{
40                 Router:  mux.NewRouter(),
41                 cluster: cluster,
42         }
43
44         rtr.HandleFunc(
45                 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
46         rtr.HandleFunc(
47                 `/{hash:[0-9a-f]{32}}+{hints}`,
48                 rtr.handleGET).Methods("GET", "HEAD")
49
50         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
51         rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
52         // List all blocks stored here. Privileged client only.
53         rtr.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
54         // List blocks stored here whose hash has the given prefix.
55         // Privileged client only.
56         rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
57
58         // Internals/debugging info (runtime.MemStats)
59         rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
60
61         // List volumes: path, device number, bytes used/avail.
62         rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
63
64         // List mounts: UUID, readonly, tier, device ID, ...
65         rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
66         rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
67         rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
68
69         // Replace the current pull queue.
70         rtr.HandleFunc(`/pull`, PullHandler).Methods("PUT")
71
72         // Replace the current trash queue.
73         rtr.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
74
75         // Untrash moves blocks from trash back into store
76         rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
77
78         rtr.Handle("/_health/{check}", &health.Handler{
79                 Token:  theConfig.ManagementToken,
80                 Prefix: "/_health/",
81         }).Methods("GET")
82
83         // Any request which does not match any of these routes gets
84         // 400 Bad Request.
85         rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
86
87         rtr.limiter = httpserver.NewRequestLimiter(theConfig.MaxRequests, rtr)
88
89         instrumented := httpserver.Instrument(nil, nil,
90                 httpserver.AddRequestIDs(httpserver.LogRequests(nil, rtr.limiter)))
91         return instrumented.ServeAPI(theConfig.ManagementToken, instrumented)
92 }
93
94 // BadRequestHandler is a HandleFunc to address bad requests.
95 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
96         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
97 }
98
99 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
100         ctx, cancel := contextForResponse(context.TODO(), resp)
101         defer cancel()
102
103         locator := req.URL.Path[1:]
104         if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
105                 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster)
106                 return
107         }
108
109         if theConfig.RequireSignatures {
110                 locator := req.URL.Path[1:] // strip leading slash
111                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
112                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
113                         return
114                 }
115         }
116
117         // TODO: Probe volumes to check whether the block _might_
118         // exist. Some volumes/types could support a quick existence
119         // check without causing other operations to suffer. If all
120         // volumes support that, and assure us the block definitely
121         // isn't here, we can return 404 now instead of waiting for a
122         // buffer.
123
124         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
125         if err != nil {
126                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
127                 return
128         }
129         defer bufs.Put(buf)
130
131         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
132         if err != nil {
133                 code := http.StatusInternalServerError
134                 if err, ok := err.(*KeepError); ok {
135                         code = err.HTTPCode
136                 }
137                 http.Error(resp, err.Error(), code)
138                 return
139         }
140
141         resp.Header().Set("Content-Length", strconv.Itoa(size))
142         resp.Header().Set("Content-Type", "application/octet-stream")
143         resp.Write(buf[:size])
144 }
145
146 // Return a new context that gets cancelled by resp's CloseNotifier.
147 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
148         ctx, cancel := context.WithCancel(parent)
149         if cn, ok := resp.(http.CloseNotifier); ok {
150                 go func(c <-chan bool) {
151                         select {
152                         case <-c:
153                                 theConfig.debugLogf("cancel context")
154                                 cancel()
155                         case <-ctx.Done():
156                         }
157                 }(cn.CloseNotify())
158         }
159         return ctx, cancel
160 }
161
162 // Get a buffer from the pool -- but give up and return a non-nil
163 // error if ctx ends before we get a buffer.
164 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
165         bufReady := make(chan []byte)
166         go func() {
167                 bufReady <- bufs.Get(bufSize)
168         }()
169         select {
170         case buf := <-bufReady:
171                 return buf, nil
172         case <-ctx.Done():
173                 go func() {
174                         // Even if closeNotifier happened first, we
175                         // need to keep waiting for our buf so we can
176                         // return it to the pool.
177                         bufs.Put(<-bufReady)
178                 }()
179                 return nil, ErrClientDisconnect
180         }
181 }
182
183 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
184         ctx, cancel := contextForResponse(context.TODO(), resp)
185         defer cancel()
186
187         hash := mux.Vars(req)["hash"]
188
189         // Detect as many error conditions as possible before reading
190         // the body: avoid transmitting data that will not end up
191         // being written anyway.
192
193         if req.ContentLength == -1 {
194                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
195                 return
196         }
197
198         if req.ContentLength > BlockSize {
199                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
200                 return
201         }
202
203         if len(KeepVM.AllWritable()) == 0 {
204                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
205                 return
206         }
207
208         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
209         if err != nil {
210                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
211                 return
212         }
213
214         _, err = io.ReadFull(req.Body, buf)
215         if err != nil {
216                 http.Error(resp, err.Error(), 500)
217                 bufs.Put(buf)
218                 return
219         }
220
221         replication, err := PutBlock(ctx, buf, hash)
222         bufs.Put(buf)
223
224         if err != nil {
225                 code := http.StatusInternalServerError
226                 if err, ok := err.(*KeepError); ok {
227                         code = err.HTTPCode
228                 }
229                 http.Error(resp, err.Error(), code)
230                 return
231         }
232
233         // Success; add a size hint, sign the locator if possible, and
234         // return it to the client.
235         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
236         apiToken := GetAPIToken(req)
237         if theConfig.blobSigningKey != nil && apiToken != "" {
238                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
239                 returnHash = SignLocator(returnHash, apiToken, expiry)
240         }
241         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
242         resp.Write([]byte(returnHash + "\n"))
243 }
244
245 // IndexHandler responds to "/index", "/index/{prefix}", and
246 // "/mounts/{uuid}/blocks" requests.
247 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
248         if !IsSystemAuth(GetAPIToken(req)) {
249                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
250                 return
251         }
252
253         prefix := mux.Vars(req)["prefix"]
254         if prefix == "" {
255                 req.ParseForm()
256                 prefix = req.Form.Get("prefix")
257         }
258
259         uuid := mux.Vars(req)["uuid"]
260
261         var vols []Volume
262         if uuid == "" {
263                 vols = KeepVM.AllReadable()
264         } else if v := KeepVM.Lookup(uuid, false); v == nil {
265                 http.Error(resp, "mount not found", http.StatusNotFound)
266                 return
267         } else {
268                 vols = []Volume{v}
269         }
270
271         for _, v := range vols {
272                 if err := v.IndexTo(prefix, resp); err != nil {
273                         // The only errors returned by IndexTo are
274                         // write errors returned by resp.Write(),
275                         // which probably means the client has
276                         // disconnected and this error will never be
277                         // reported to the client -- but it will
278                         // appear in our own error log.
279                         http.Error(resp, err.Error(), http.StatusInternalServerError)
280                         return
281                 }
282         }
283         // An empty line at EOF is the only way the client can be
284         // assured the entire index was received.
285         resp.Write([]byte{'\n'})
286 }
287
288 // MountsHandler responds to "GET /mounts" requests.
289 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
290         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
291         if err != nil {
292                 http.Error(resp, err.Error(), http.StatusInternalServerError)
293         }
294 }
295
296 // PoolStatus struct
297 type PoolStatus struct {
298         Alloc uint64 `json:"BytesAllocatedCumulative"`
299         Cap   int    `json:"BuffersMax"`
300         Len   int    `json:"BuffersInUse"`
301 }
302
303 type volumeStatusEnt struct {
304         Label         string
305         Status        *VolumeStatus `json:",omitempty"`
306         VolumeStats   *ioStats      `json:",omitempty"`
307         InternalStats interface{}   `json:",omitempty"`
308 }
309
310 // NodeStatus struct
311 type NodeStatus struct {
312         Volumes         []*volumeStatusEnt
313         BufferPool      PoolStatus
314         PullQueue       WorkQueueStatus
315         TrashQueue      WorkQueueStatus
316         RequestsCurrent int
317         RequestsMax     int
318         Version         string
319 }
320
321 var st NodeStatus
322 var stLock sync.Mutex
323
324 // DebugHandler addresses /debug.json requests.
325 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
326         type debugStats struct {
327                 MemStats runtime.MemStats
328         }
329         var ds debugStats
330         runtime.ReadMemStats(&ds.MemStats)
331         err := json.NewEncoder(resp).Encode(&ds)
332         if err != nil {
333                 http.Error(resp, err.Error(), 500)
334         }
335 }
336
337 // StatusHandler addresses /status.json requests.
338 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
339         stLock.Lock()
340         rtr.readNodeStatus(&st)
341         jstat, err := json.Marshal(&st)
342         stLock.Unlock()
343         if err == nil {
344                 resp.Write(jstat)
345         } else {
346                 log.Printf("json.Marshal: %s", err)
347                 log.Printf("NodeStatus = %v", &st)
348                 http.Error(resp, err.Error(), 500)
349         }
350 }
351
352 // populate the given NodeStatus struct with current values.
353 func (rtr *router) readNodeStatus(st *NodeStatus) {
354         st.Version = version
355         vols := KeepVM.AllReadable()
356         if cap(st.Volumes) < len(vols) {
357                 st.Volumes = make([]*volumeStatusEnt, len(vols))
358         }
359         st.Volumes = st.Volumes[:0]
360         for _, vol := range vols {
361                 var internalStats interface{}
362                 if vol, ok := vol.(InternalStatser); ok {
363                         internalStats = vol.InternalStats()
364                 }
365                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
366                         Label:         vol.String(),
367                         Status:        vol.Status(),
368                         InternalStats: internalStats,
369                         //VolumeStats: KeepVM.VolumeStats(vol),
370                 })
371         }
372         st.BufferPool.Alloc = bufs.Alloc()
373         st.BufferPool.Cap = bufs.Cap()
374         st.BufferPool.Len = bufs.Len()
375         st.PullQueue = getWorkQueueStatus(pullq)
376         st.TrashQueue = getWorkQueueStatus(trashq)
377         if rtr.limiter != nil {
378                 st.RequestsCurrent = rtr.limiter.Current()
379                 st.RequestsMax = rtr.limiter.Max()
380         }
381 }
382
383 // return a WorkQueueStatus for the given queue. If q is nil (which
384 // should never happen except in test suites), return a zero status
385 // value instead of crashing.
386 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
387         if q == nil {
388                 // This should only happen during tests.
389                 return WorkQueueStatus{}
390         }
391         return q.Status()
392 }
393
394 // DeleteHandler processes DELETE requests.
395 //
396 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
397 // from all connected volumes.
398 //
399 // Only the Data Manager, or an Arvados admin with scope "all", are
400 // allowed to issue DELETE requests.  If a DELETE request is not
401 // authenticated or is issued by a non-admin user, the server returns
402 // a PermissionError.
403 //
404 // Upon receiving a valid request from an authorized user,
405 // DeleteHandler deletes all copies of the specified block on local
406 // writable volumes.
407 //
408 // Response format:
409 //
410 // If the requested blocks was not found on any volume, the response
411 // code is HTTP 404 Not Found.
412 //
413 // Otherwise, the response code is 200 OK, with a response body
414 // consisting of the JSON message
415 //
416 //    {"copies_deleted":d,"copies_failed":f}
417 //
418 // where d and f are integers representing the number of blocks that
419 // were successfully and unsuccessfully deleted.
420 //
421 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
422         hash := mux.Vars(req)["hash"]
423
424         // Confirm that this user is an admin and has a token with unlimited scope.
425         var tok = GetAPIToken(req)
426         if tok == "" || !CanDelete(tok) {
427                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
428                 return
429         }
430
431         if !theConfig.EnableDelete {
432                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
433                 return
434         }
435
436         // Delete copies of this block from all available volumes.
437         // Report how many blocks were successfully deleted, and how
438         // many were found on writable volumes but not deleted.
439         var result struct {
440                 Deleted int `json:"copies_deleted"`
441                 Failed  int `json:"copies_failed"`
442         }
443         for _, vol := range KeepVM.AllWritable() {
444                 if err := vol.Trash(hash); err == nil {
445                         result.Deleted++
446                 } else if os.IsNotExist(err) {
447                         continue
448                 } else {
449                         result.Failed++
450                         log.Println("DeleteHandler:", err)
451                 }
452         }
453
454         var st int
455
456         if result.Deleted == 0 && result.Failed == 0 {
457                 st = http.StatusNotFound
458         } else {
459                 st = http.StatusOK
460         }
461
462         resp.WriteHeader(st)
463
464         if st == http.StatusOK {
465                 if body, err := json.Marshal(result); err == nil {
466                         resp.Write(body)
467                 } else {
468                         log.Printf("json.Marshal: %s (result = %v)", err, result)
469                         http.Error(resp, err.Error(), 500)
470                 }
471         }
472 }
473
474 /* PullHandler processes "PUT /pull" requests for the data manager.
475    The request body is a JSON message containing a list of pull
476    requests in the following format:
477
478    [
479       {
480          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
481          "servers":[
482                         "keep0.qr1hi.arvadosapi.com:25107",
483                         "keep1.qr1hi.arvadosapi.com:25108"
484                  ]
485           },
486           {
487                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
488                  "servers":[
489                         "10.0.1.5:25107",
490                         "10.0.1.6:25107",
491                         "10.0.1.7:25108"
492                  ]
493           },
494           ...
495    ]
496
497    Each pull request in the list consists of a block locator string
498    and an ordered list of servers.  Keepstore should try to fetch the
499    block from each server in turn.
500
501    If the request has not been sent by the Data Manager, return 401
502    Unauthorized.
503
504    If the JSON unmarshalling fails, return 400 Bad Request.
505 */
506
507 // PullRequest consists of a block locator and an ordered list of servers
508 type PullRequest struct {
509         Locator string   `json:"locator"`
510         Servers []string `json:"servers"`
511
512         // Destination mount, or "" for "anywhere"
513         MountUUID string `json:"mount_uuid"`
514 }
515
516 // PullHandler processes "PUT /pull" requests for the data manager.
517 func PullHandler(resp http.ResponseWriter, req *http.Request) {
518         // Reject unauthorized requests.
519         if !IsSystemAuth(GetAPIToken(req)) {
520                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
521                 return
522         }
523
524         // Parse the request body.
525         var pr []PullRequest
526         r := json.NewDecoder(req.Body)
527         if err := r.Decode(&pr); err != nil {
528                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
529                 return
530         }
531
532         // We have a properly formatted pull list sent from the data
533         // manager.  Report success and send the list to the pull list
534         // manager for further handling.
535         resp.WriteHeader(http.StatusOK)
536         resp.Write([]byte(
537                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
538
539         plist := list.New()
540         for _, p := range pr {
541                 plist.PushBack(p)
542         }
543         pullq.ReplaceQueue(plist)
544 }
545
546 // TrashRequest consists of a block locator and its Mtime
547 type TrashRequest struct {
548         Locator    string `json:"locator"`
549         BlockMtime int64  `json:"block_mtime"`
550
551         // Target mount, or "" for "everywhere"
552         MountUUID string `json:"mount_uuid"`
553 }
554
555 // TrashHandler processes /trash requests.
556 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
557         // Reject unauthorized requests.
558         if !IsSystemAuth(GetAPIToken(req)) {
559                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
560                 return
561         }
562
563         // Parse the request body.
564         var trash []TrashRequest
565         r := json.NewDecoder(req.Body)
566         if err := r.Decode(&trash); err != nil {
567                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
568                 return
569         }
570
571         // We have a properly formatted trash list sent from the data
572         // manager.  Report success and send the list to the trash work
573         // queue for further handling.
574         resp.WriteHeader(http.StatusOK)
575         resp.Write([]byte(
576                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
577
578         tlist := list.New()
579         for _, t := range trash {
580                 tlist.PushBack(t)
581         }
582         trashq.ReplaceQueue(tlist)
583 }
584
585 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
586 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
587         // Reject unauthorized requests.
588         if !IsSystemAuth(GetAPIToken(req)) {
589                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
590                 return
591         }
592
593         hash := mux.Vars(req)["hash"]
594
595         if len(KeepVM.AllWritable()) == 0 {
596                 http.Error(resp, "No writable volumes", http.StatusNotFound)
597                 return
598         }
599
600         var untrashedOn, failedOn []string
601         var numNotFound int
602         for _, vol := range KeepVM.AllWritable() {
603                 err := vol.Untrash(hash)
604
605                 if os.IsNotExist(err) {
606                         numNotFound++
607                 } else if err != nil {
608                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
609                         failedOn = append(failedOn, vol.String())
610                 } else {
611                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
612                         untrashedOn = append(untrashedOn, vol.String())
613                 }
614         }
615
616         if numNotFound == len(KeepVM.AllWritable()) {
617                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
618                 return
619         }
620
621         if len(failedOn) == len(KeepVM.AllWritable()) {
622                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
623         } else {
624                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
625                 if len(failedOn) > 0 {
626                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
627                 }
628                 resp.Write([]byte(respBody))
629         }
630 }
631
632 // GetBlock and PutBlock implement lower-level code for handling
633 // blocks by rooting through volumes connected to the local machine.
634 // Once the handler has determined that system policy permits the
635 // request, it calls these methods to perform the actual operation.
636 //
637 // TODO(twp): this code would probably be better located in the
638 // VolumeManager interface. As an abstraction, the VolumeManager
639 // should be the only part of the code that cares about which volume a
640 // block is stored on, so it should be responsible for figuring out
641 // which volume to check for fetching blocks, storing blocks, etc.
642
643 // GetBlock fetches the block identified by "hash" into the provided
644 // buf, and returns the data size.
645 //
646 // If the block cannot be found on any volume, returns NotFoundError.
647 //
648 // If the block found does not have the correct MD5 hash, returns
649 // DiskHashError.
650 //
651 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
652         // Attempt to read the requested hash from a keep volume.
653         errorToCaller := NotFoundError
654
655         for _, vol := range KeepVM.AllReadable() {
656                 size, err := vol.Get(ctx, hash, buf)
657                 select {
658                 case <-ctx.Done():
659                         return 0, ErrClientDisconnect
660                 default:
661                 }
662                 if err != nil {
663                         // IsNotExist is an expected error and may be
664                         // ignored. All other errors are logged. In
665                         // any case we continue trying to read other
666                         // volumes. If all volumes report IsNotExist,
667                         // we return a NotFoundError.
668                         if !os.IsNotExist(err) {
669                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
670                         }
671                         // If some volume returns a transient error, return it to the caller
672                         // instead of "Not found" so it can retry.
673                         if err == VolumeBusyError {
674                                 errorToCaller = err.(*KeepError)
675                         }
676                         continue
677                 }
678                 // Check the file checksum.
679                 //
680                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
681                 if filehash != hash {
682                         // TODO: Try harder to tell a sysadmin about
683                         // this.
684                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
685                                 vol, hash, filehash)
686                         errorToCaller = DiskHashError
687                         continue
688                 }
689                 if errorToCaller == DiskHashError {
690                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
691                                 vol, hash)
692                 }
693                 return size, nil
694         }
695         return 0, errorToCaller
696 }
697
698 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
699 //
700 // PutBlock(ctx, block, hash)
701 //   Stores the BLOCK (identified by the content id HASH) in Keep.
702 //
703 //   The MD5 checksum of the block must be identical to the content id HASH.
704 //   If not, an error is returned.
705 //
706 //   PutBlock stores the BLOCK on the first Keep volume with free space.
707 //   A failure code is returned to the user only if all volumes fail.
708 //
709 //   On success, PutBlock returns nil.
710 //   On failure, it returns a KeepError with one of the following codes:
711 //
712 //   500 Collision
713 //          A different block with the same hash already exists on this
714 //          Keep server.
715 //   422 MD5Fail
716 //          The MD5 hash of the BLOCK does not match the argument HASH.
717 //   503 Full
718 //          There was not enough space left in any Keep volume to store
719 //          the object.
720 //   500 Fail
721 //          The object could not be stored for some other reason (e.g.
722 //          all writes failed). The text of the error message should
723 //          provide as much detail as possible.
724 //
725 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
726         // Check that BLOCK's checksum matches HASH.
727         blockhash := fmt.Sprintf("%x", md5.Sum(block))
728         if blockhash != hash {
729                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
730                 return 0, RequestHashError
731         }
732
733         // If we already have this data, it's intact on disk, and we
734         // can update its timestamp, return success. If we have
735         // different data with the same hash, return failure.
736         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
737                 return n, err
738         } else if ctx.Err() != nil {
739                 return 0, ErrClientDisconnect
740         }
741
742         // Choose a Keep volume to write to.
743         // If this volume fails, try all of the volumes in order.
744         if vol := KeepVM.NextWritable(); vol != nil {
745                 if err := vol.Put(ctx, hash, block); err == nil {
746                         return vol.Replication(), nil // success!
747                 }
748                 if ctx.Err() != nil {
749                         return 0, ErrClientDisconnect
750                 }
751         }
752
753         writables := KeepVM.AllWritable()
754         if len(writables) == 0 {
755                 log.Print("No writable volumes.")
756                 return 0, FullError
757         }
758
759         allFull := true
760         for _, vol := range writables {
761                 err := vol.Put(ctx, hash, block)
762                 if ctx.Err() != nil {
763                         return 0, ErrClientDisconnect
764                 }
765                 if err == nil {
766                         return vol.Replication(), nil // success!
767                 }
768                 if err != FullError {
769                         // The volume is not full but the
770                         // write did not succeed.  Report the
771                         // error and continue trying.
772                         allFull = false
773                         log.Printf("%s: Write(%s): %s", vol, hash, err)
774                 }
775         }
776
777         if allFull {
778                 log.Print("All volumes are full.")
779                 return 0, FullError
780         }
781         // Already logged the non-full errors.
782         return 0, GenericError
783 }
784
785 // CompareAndTouch returns the current replication level if one of the
786 // volumes already has the given content and it successfully updates
787 // the relevant block's modification time in order to protect it from
788 // premature garbage collection. Otherwise, it returns a non-nil
789 // error.
790 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
791         var bestErr error = NotFoundError
792         for _, vol := range KeepVM.AllWritable() {
793                 err := vol.Compare(ctx, hash, buf)
794                 if ctx.Err() != nil {
795                         return 0, ctx.Err()
796                 } else if err == CollisionError {
797                         // Stop if we have a block with same hash but
798                         // different content. (It will be impossible
799                         // to tell which one is wanted if we have
800                         // both, so there's no point writing it even
801                         // on a different volume.)
802                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
803                         return 0, err
804                 } else if os.IsNotExist(err) {
805                         // Block does not exist. This is the only
806                         // "normal" error: we don't log anything.
807                         continue
808                 } else if err != nil {
809                         // Couldn't open file, data is corrupt on
810                         // disk, etc.: log this abnormal condition,
811                         // and try the next volume.
812                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
813                         continue
814                 }
815                 if err := vol.Touch(hash); err != nil {
816                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
817                         bestErr = err
818                         continue
819                 }
820                 // Compare and Touch both worked --> done.
821                 return vol.Replication(), nil
822         }
823         return 0, bestErr
824 }
825
826 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
827
828 // IsValidLocator returns true if the specified string is a valid Keep locator.
829 //   When Keep is extended to support hash types other than MD5,
830 //   this should be updated to cover those as well.
831 //
832 func IsValidLocator(loc string) bool {
833         return validLocatorRe.MatchString(loc)
834 }
835
836 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
837
838 // GetAPIToken returns the OAuth2 token from the Authorization
839 // header of a HTTP request, or an empty string if no matching
840 // token is found.
841 func GetAPIToken(req *http.Request) string {
842         if auth, ok := req.Header["Authorization"]; ok {
843                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
844                         return match[2]
845                 }
846         }
847         return ""
848 }
849
850 // IsExpired returns true if the given Unix timestamp (expressed as a
851 // hexadecimal string) is in the past, or if timestampHex cannot be
852 // parsed as a hexadecimal string.
853 func IsExpired(timestampHex string) bool {
854         ts, err := strconv.ParseInt(timestampHex, 16, 0)
855         if err != nil {
856                 log.Printf("IsExpired: %s", err)
857                 return true
858         }
859         return time.Unix(ts, 0).Before(time.Now())
860 }
861
862 // CanDelete returns true if the user identified by apiToken is
863 // allowed to delete blocks.
864 func CanDelete(apiToken string) bool {
865         if apiToken == "" {
866                 return false
867         }
868         // Blocks may be deleted only when Keep has been configured with a
869         // data manager.
870         if IsSystemAuth(apiToken) {
871                 return true
872         }
873         // TODO(twp): look up apiToken with the API server
874         // return true if is_admin is true and if the token
875         // has unlimited scope
876         return false
877 }
878
879 // IsSystemAuth returns true if the given token is allowed to perform
880 // system level actions like deleting data.
881 func IsSystemAuth(token string) bool {
882         return token != "" && token == theConfig.systemAuthToken
883 }