8784: Fix test for latest firefox.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "context"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "io"
17         "net/http"
18         "os"
19         "regexp"
20         "runtime"
21         "strconv"
22         "strings"
23         "sync"
24         "time"
25
26         "github.com/gorilla/mux"
27
28         "git.curoverse.com/arvados.git/sdk/go/httpserver"
29         log "github.com/Sirupsen/logrus"
30 )
31
32 type router struct {
33         *mux.Router
34         limiter httpserver.RequestCounter
35 }
36
37 // MakeRESTRouter returns a new router that forwards all Keep requests
38 // to the appropriate handlers.
39 func MakeRESTRouter() *router {
40         rest := mux.NewRouter()
41         rtr := &router{Router: rest}
42
43         rest.HandleFunc(
44                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
45         rest.HandleFunc(
46                 `/{hash:[0-9a-f]{32}}+{hints}`,
47                 GetBlockHandler).Methods("GET", "HEAD")
48
49         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
50         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
51         // List all blocks stored here. Privileged client only.
52         rest.HandleFunc(`/index`, rtr.IndexHandler).Methods("GET", "HEAD")
53         // List blocks stored here whose hash has the given prefix.
54         // Privileged client only.
55         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.IndexHandler).Methods("GET", "HEAD")
56
57         // Internals/debugging info (runtime.MemStats)
58         rest.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
59
60         // List volumes: path, device number, bytes used/avail.
61         rest.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
62
63         // List mounts: UUID, readonly, tier, device ID, ...
64         rest.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
65         rest.HandleFunc(`/mounts/{uuid}/blocks`, rtr.IndexHandler).Methods("GET")
66         rest.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.IndexHandler).Methods("GET")
67
68         // Replace the current pull queue.
69         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
70
71         // Replace the current trash queue.
72         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
73
74         // Untrash moves blocks from trash back into store
75         rest.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, UntrashHandler).Methods("PUT")
76
77         // Any request which does not match any of these routes gets
78         // 400 Bad Request.
79         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
80
81         return rtr
82 }
83
84 // BadRequestHandler is a HandleFunc to address bad requests.
85 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
86         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
87 }
88
89 // GetBlockHandler is a HandleFunc to address Get block requests.
90 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
91         ctx, cancel := contextForResponse(context.TODO(), resp)
92         defer cancel()
93
94         if theConfig.RequireSignatures {
95                 locator := req.URL.Path[1:] // strip leading slash
96                 if err := VerifySignature(locator, GetAPIToken(req)); err != nil {
97                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
98                         return
99                 }
100         }
101
102         // TODO: Probe volumes to check whether the block _might_
103         // exist. Some volumes/types could support a quick existence
104         // check without causing other operations to suffer. If all
105         // volumes support that, and assure us the block definitely
106         // isn't here, we can return 404 now instead of waiting for a
107         // buffer.
108
109         buf, err := getBufferWithContext(ctx, bufs, BlockSize)
110         if err != nil {
111                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
112                 return
113         }
114         defer bufs.Put(buf)
115
116         size, err := GetBlock(ctx, mux.Vars(req)["hash"], buf, resp)
117         if err != nil {
118                 code := http.StatusInternalServerError
119                 if err, ok := err.(*KeepError); ok {
120                         code = err.HTTPCode
121                 }
122                 http.Error(resp, err.Error(), code)
123                 return
124         }
125
126         resp.Header().Set("Content-Length", strconv.Itoa(size))
127         resp.Header().Set("Content-Type", "application/octet-stream")
128         resp.Write(buf[:size])
129 }
130
131 // Return a new context that gets cancelled by resp's CloseNotifier.
132 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
133         ctx, cancel := context.WithCancel(parent)
134         if cn, ok := resp.(http.CloseNotifier); ok {
135                 go func(c <-chan bool) {
136                         select {
137                         case <-c:
138                                 theConfig.debugLogf("cancel context")
139                                 cancel()
140                         case <-ctx.Done():
141                         }
142                 }(cn.CloseNotify())
143         }
144         return ctx, cancel
145 }
146
147 // Get a buffer from the pool -- but give up and return a non-nil
148 // error if ctx ends before we get a buffer.
149 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
150         bufReady := make(chan []byte)
151         go func() {
152                 bufReady <- bufs.Get(bufSize)
153         }()
154         select {
155         case buf := <-bufReady:
156                 return buf, nil
157         case <-ctx.Done():
158                 go func() {
159                         // Even if closeNotifier happened first, we
160                         // need to keep waiting for our buf so we can
161                         // return it to the pool.
162                         bufs.Put(<-bufReady)
163                 }()
164                 return nil, ErrClientDisconnect
165         }
166 }
167
168 // PutBlockHandler is a HandleFunc to address Put block requests.
169 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
170         ctx, cancel := contextForResponse(context.TODO(), resp)
171         defer cancel()
172
173         hash := mux.Vars(req)["hash"]
174
175         // Detect as many error conditions as possible before reading
176         // the body: avoid transmitting data that will not end up
177         // being written anyway.
178
179         if req.ContentLength == -1 {
180                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
181                 return
182         }
183
184         if req.ContentLength > BlockSize {
185                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
186                 return
187         }
188
189         if len(KeepVM.AllWritable()) == 0 {
190                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
191                 return
192         }
193
194         buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
195         if err != nil {
196                 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
197                 return
198         }
199
200         _, err = io.ReadFull(req.Body, buf)
201         if err != nil {
202                 http.Error(resp, err.Error(), 500)
203                 bufs.Put(buf)
204                 return
205         }
206
207         replication, err := PutBlock(ctx, buf, hash)
208         bufs.Put(buf)
209
210         if err != nil {
211                 code := http.StatusInternalServerError
212                 if err, ok := err.(*KeepError); ok {
213                         code = err.HTTPCode
214                 }
215                 http.Error(resp, err.Error(), code)
216                 return
217         }
218
219         // Success; add a size hint, sign the locator if possible, and
220         // return it to the client.
221         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
222         apiToken := GetAPIToken(req)
223         if theConfig.blobSigningKey != nil && apiToken != "" {
224                 expiry := time.Now().Add(theConfig.BlobSignatureTTL.Duration())
225                 returnHash = SignLocator(returnHash, apiToken, expiry)
226         }
227         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
228         resp.Write([]byte(returnHash + "\n"))
229 }
230
231 // IndexHandler responds to "/index", "/index/{prefix}", and
232 // "/mounts/{uuid}/blocks" requests.
233 func (rtr *router) IndexHandler(resp http.ResponseWriter, req *http.Request) {
234         if !IsSystemAuth(GetAPIToken(req)) {
235                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
236                 return
237         }
238
239         prefix := mux.Vars(req)["prefix"]
240         if prefix == "" {
241                 req.ParseForm()
242                 prefix = req.Form.Get("prefix")
243         }
244
245         uuid := mux.Vars(req)["uuid"]
246
247         var vols []Volume
248         if uuid == "" {
249                 vols = KeepVM.AllReadable()
250         } else if v := KeepVM.Lookup(uuid, false); v == nil {
251                 http.Error(resp, "mount not found", http.StatusNotFound)
252                 return
253         } else {
254                 vols = []Volume{v}
255         }
256
257         for _, v := range vols {
258                 if err := v.IndexTo(prefix, resp); err != nil {
259                         // The only errors returned by IndexTo are
260                         // write errors returned by resp.Write(),
261                         // which probably means the client has
262                         // disconnected and this error will never be
263                         // reported to the client -- but it will
264                         // appear in our own error log.
265                         http.Error(resp, err.Error(), http.StatusInternalServerError)
266                         return
267                 }
268         }
269         // An empty line at EOF is the only way the client can be
270         // assured the entire index was received.
271         resp.Write([]byte{'\n'})
272 }
273
274 // MountsHandler responds to "GET /mounts" requests.
275 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
276         err := json.NewEncoder(resp).Encode(KeepVM.Mounts())
277         if err != nil {
278                 http.Error(resp, err.Error(), http.StatusInternalServerError)
279         }
280 }
281
282 // PoolStatus struct
283 type PoolStatus struct {
284         Alloc uint64 `json:"BytesAllocated"`
285         Cap   int    `json:"BuffersMax"`
286         Len   int    `json:"BuffersInUse"`
287 }
288
289 type volumeStatusEnt struct {
290         Label         string
291         Status        *VolumeStatus `json:",omitempty"`
292         VolumeStats   *ioStats      `json:",omitempty"`
293         InternalStats interface{}   `json:",omitempty"`
294 }
295
296 // NodeStatus struct
297 type NodeStatus struct {
298         Volumes         []*volumeStatusEnt
299         BufferPool      PoolStatus
300         PullQueue       WorkQueueStatus
301         TrashQueue      WorkQueueStatus
302         RequestsCurrent int
303         RequestsMax     int
304 }
305
306 var st NodeStatus
307 var stLock sync.Mutex
308
309 // DebugHandler addresses /debug.json requests.
310 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
311         type debugStats struct {
312                 MemStats runtime.MemStats
313         }
314         var ds debugStats
315         runtime.ReadMemStats(&ds.MemStats)
316         err := json.NewEncoder(resp).Encode(&ds)
317         if err != nil {
318                 http.Error(resp, err.Error(), 500)
319         }
320 }
321
322 // StatusHandler addresses /status.json requests.
323 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
324         stLock.Lock()
325         rtr.readNodeStatus(&st)
326         jstat, err := json.Marshal(&st)
327         stLock.Unlock()
328         if err == nil {
329                 resp.Write(jstat)
330         } else {
331                 log.Printf("json.Marshal: %s", err)
332                 log.Printf("NodeStatus = %v", &st)
333                 http.Error(resp, err.Error(), 500)
334         }
335 }
336
337 // populate the given NodeStatus struct with current values.
338 func (rtr *router) readNodeStatus(st *NodeStatus) {
339         vols := KeepVM.AllReadable()
340         if cap(st.Volumes) < len(vols) {
341                 st.Volumes = make([]*volumeStatusEnt, len(vols))
342         }
343         st.Volumes = st.Volumes[:0]
344         for _, vol := range vols {
345                 var internalStats interface{}
346                 if vol, ok := vol.(InternalStatser); ok {
347                         internalStats = vol.InternalStats()
348                 }
349                 st.Volumes = append(st.Volumes, &volumeStatusEnt{
350                         Label:         vol.String(),
351                         Status:        vol.Status(),
352                         InternalStats: internalStats,
353                         //VolumeStats: KeepVM.VolumeStats(vol),
354                 })
355         }
356         st.BufferPool.Alloc = bufs.Alloc()
357         st.BufferPool.Cap = bufs.Cap()
358         st.BufferPool.Len = bufs.Len()
359         st.PullQueue = getWorkQueueStatus(pullq)
360         st.TrashQueue = getWorkQueueStatus(trashq)
361         if rtr.limiter != nil {
362                 st.RequestsCurrent = rtr.limiter.Current()
363                 st.RequestsMax = rtr.limiter.Max()
364         }
365 }
366
367 // return a WorkQueueStatus for the given queue. If q is nil (which
368 // should never happen except in test suites), return a zero status
369 // value instead of crashing.
370 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
371         if q == nil {
372                 // This should only happen during tests.
373                 return WorkQueueStatus{}
374         }
375         return q.Status()
376 }
377
378 // DeleteHandler processes DELETE requests.
379 //
380 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
381 // from all connected volumes.
382 //
383 // Only the Data Manager, or an Arvados admin with scope "all", are
384 // allowed to issue DELETE requests.  If a DELETE request is not
385 // authenticated or is issued by a non-admin user, the server returns
386 // a PermissionError.
387 //
388 // Upon receiving a valid request from an authorized user,
389 // DeleteHandler deletes all copies of the specified block on local
390 // writable volumes.
391 //
392 // Response format:
393 //
394 // If the requested blocks was not found on any volume, the response
395 // code is HTTP 404 Not Found.
396 //
397 // Otherwise, the response code is 200 OK, with a response body
398 // consisting of the JSON message
399 //
400 //    {"copies_deleted":d,"copies_failed":f}
401 //
402 // where d and f are integers representing the number of blocks that
403 // were successfully and unsuccessfully deleted.
404 //
405 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
406         hash := mux.Vars(req)["hash"]
407
408         // Confirm that this user is an admin and has a token with unlimited scope.
409         var tok = GetAPIToken(req)
410         if tok == "" || !CanDelete(tok) {
411                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
412                 return
413         }
414
415         if !theConfig.EnableDelete {
416                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
417                 return
418         }
419
420         // Delete copies of this block from all available volumes.
421         // Report how many blocks were successfully deleted, and how
422         // many were found on writable volumes but not deleted.
423         var result struct {
424                 Deleted int `json:"copies_deleted"`
425                 Failed  int `json:"copies_failed"`
426         }
427         for _, vol := range KeepVM.AllWritable() {
428                 if err := vol.Trash(hash); err == nil {
429                         result.Deleted++
430                 } else if os.IsNotExist(err) {
431                         continue
432                 } else {
433                         result.Failed++
434                         log.Println("DeleteHandler:", err)
435                 }
436         }
437
438         var st int
439
440         if result.Deleted == 0 && result.Failed == 0 {
441                 st = http.StatusNotFound
442         } else {
443                 st = http.StatusOK
444         }
445
446         resp.WriteHeader(st)
447
448         if st == http.StatusOK {
449                 if body, err := json.Marshal(result); err == nil {
450                         resp.Write(body)
451                 } else {
452                         log.Printf("json.Marshal: %s (result = %v)", err, result)
453                         http.Error(resp, err.Error(), 500)
454                 }
455         }
456 }
457
458 /* PullHandler processes "PUT /pull" requests for the data manager.
459    The request body is a JSON message containing a list of pull
460    requests in the following format:
461
462    [
463       {
464          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
465          "servers":[
466                         "keep0.qr1hi.arvadosapi.com:25107",
467                         "keep1.qr1hi.arvadosapi.com:25108"
468                  ]
469           },
470           {
471                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
472                  "servers":[
473                         "10.0.1.5:25107",
474                         "10.0.1.6:25107",
475                         "10.0.1.7:25108"
476                  ]
477           },
478           ...
479    ]
480
481    Each pull request in the list consists of a block locator string
482    and an ordered list of servers.  Keepstore should try to fetch the
483    block from each server in turn.
484
485    If the request has not been sent by the Data Manager, return 401
486    Unauthorized.
487
488    If the JSON unmarshalling fails, return 400 Bad Request.
489 */
490
491 // PullRequest consists of a block locator and an ordered list of servers
492 type PullRequest struct {
493         Locator string   `json:"locator"`
494         Servers []string `json:"servers"`
495
496         // Destination mount, or "" for "anywhere"
497         MountUUID string
498 }
499
500 // PullHandler processes "PUT /pull" requests for the data manager.
501 func PullHandler(resp http.ResponseWriter, req *http.Request) {
502         // Reject unauthorized requests.
503         if !IsSystemAuth(GetAPIToken(req)) {
504                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
505                 return
506         }
507
508         // Parse the request body.
509         var pr []PullRequest
510         r := json.NewDecoder(req.Body)
511         if err := r.Decode(&pr); err != nil {
512                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
513                 return
514         }
515
516         // We have a properly formatted pull list sent from the data
517         // manager.  Report success and send the list to the pull list
518         // manager for further handling.
519         resp.WriteHeader(http.StatusOK)
520         resp.Write([]byte(
521                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
522
523         plist := list.New()
524         for _, p := range pr {
525                 plist.PushBack(p)
526         }
527         pullq.ReplaceQueue(plist)
528 }
529
530 // TrashRequest consists of a block locator and it's Mtime
531 type TrashRequest struct {
532         Locator    string `json:"locator"`
533         BlockMtime int64  `json:"block_mtime"`
534
535         // Target mount, or "" for "everywhere"
536         MountUUID string
537 }
538
539 // TrashHandler processes /trash requests.
540 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
541         // Reject unauthorized requests.
542         if !IsSystemAuth(GetAPIToken(req)) {
543                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
544                 return
545         }
546
547         // Parse the request body.
548         var trash []TrashRequest
549         r := json.NewDecoder(req.Body)
550         if err := r.Decode(&trash); err != nil {
551                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
552                 return
553         }
554
555         // We have a properly formatted trash list sent from the data
556         // manager.  Report success and send the list to the trash work
557         // queue for further handling.
558         resp.WriteHeader(http.StatusOK)
559         resp.Write([]byte(
560                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
561
562         tlist := list.New()
563         for _, t := range trash {
564                 tlist.PushBack(t)
565         }
566         trashq.ReplaceQueue(tlist)
567 }
568
569 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
570 func UntrashHandler(resp http.ResponseWriter, req *http.Request) {
571         // Reject unauthorized requests.
572         if !IsSystemAuth(GetAPIToken(req)) {
573                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
574                 return
575         }
576
577         hash := mux.Vars(req)["hash"]
578
579         if len(KeepVM.AllWritable()) == 0 {
580                 http.Error(resp, "No writable volumes", http.StatusNotFound)
581                 return
582         }
583
584         var untrashedOn, failedOn []string
585         var numNotFound int
586         for _, vol := range KeepVM.AllWritable() {
587                 err := vol.Untrash(hash)
588
589                 if os.IsNotExist(err) {
590                         numNotFound++
591                 } else if err != nil {
592                         log.Printf("Error untrashing %v on volume %v", hash, vol.String())
593                         failedOn = append(failedOn, vol.String())
594                 } else {
595                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
596                         untrashedOn = append(untrashedOn, vol.String())
597                 }
598         }
599
600         if numNotFound == len(KeepVM.AllWritable()) {
601                 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
602                 return
603         }
604
605         if len(failedOn) == len(KeepVM.AllWritable()) {
606                 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
607         } else {
608                 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
609                 if len(failedOn) > 0 {
610                         respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
611                 }
612                 resp.Write([]byte(respBody))
613         }
614 }
615
616 // GetBlock and PutBlock implement lower-level code for handling
617 // blocks by rooting through volumes connected to the local machine.
618 // Once the handler has determined that system policy permits the
619 // request, it calls these methods to perform the actual operation.
620 //
621 // TODO(twp): this code would probably be better located in the
622 // VolumeManager interface. As an abstraction, the VolumeManager
623 // should be the only part of the code that cares about which volume a
624 // block is stored on, so it should be responsible for figuring out
625 // which volume to check for fetching blocks, storing blocks, etc.
626
627 // GetBlock fetches the block identified by "hash" into the provided
628 // buf, and returns the data size.
629 //
630 // If the block cannot be found on any volume, returns NotFoundError.
631 //
632 // If the block found does not have the correct MD5 hash, returns
633 // DiskHashError.
634 //
635 func GetBlock(ctx context.Context, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
636         // Attempt to read the requested hash from a keep volume.
637         errorToCaller := NotFoundError
638
639         for _, vol := range KeepVM.AllReadable() {
640                 size, err := vol.Get(ctx, hash, buf)
641                 select {
642                 case <-ctx.Done():
643                         return 0, ErrClientDisconnect
644                 default:
645                 }
646                 if err != nil {
647                         // IsNotExist is an expected error and may be
648                         // ignored. All other errors are logged. In
649                         // any case we continue trying to read other
650                         // volumes. If all volumes report IsNotExist,
651                         // we return a NotFoundError.
652                         if !os.IsNotExist(err) {
653                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
654                         }
655                         continue
656                 }
657                 // Check the file checksum.
658                 //
659                 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
660                 if filehash != hash {
661                         // TODO: Try harder to tell a sysadmin about
662                         // this.
663                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
664                                 vol, hash, filehash)
665                         errorToCaller = DiskHashError
666                         continue
667                 }
668                 if errorToCaller == DiskHashError {
669                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
670                                 vol, hash)
671                 }
672                 return size, nil
673         }
674         return 0, errorToCaller
675 }
676
677 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
678 //
679 // PutBlock(ctx, block, hash)
680 //   Stores the BLOCK (identified by the content id HASH) in Keep.
681 //
682 //   The MD5 checksum of the block must be identical to the content id HASH.
683 //   If not, an error is returned.
684 //
685 //   PutBlock stores the BLOCK on the first Keep volume with free space.
686 //   A failure code is returned to the user only if all volumes fail.
687 //
688 //   On success, PutBlock returns nil.
689 //   On failure, it returns a KeepError with one of the following codes:
690 //
691 //   500 Collision
692 //          A different block with the same hash already exists on this
693 //          Keep server.
694 //   422 MD5Fail
695 //          The MD5 hash of the BLOCK does not match the argument HASH.
696 //   503 Full
697 //          There was not enough space left in any Keep volume to store
698 //          the object.
699 //   500 Fail
700 //          The object could not be stored for some other reason (e.g.
701 //          all writes failed). The text of the error message should
702 //          provide as much detail as possible.
703 //
704 func PutBlock(ctx context.Context, block []byte, hash string) (int, error) {
705         // Check that BLOCK's checksum matches HASH.
706         blockhash := fmt.Sprintf("%x", md5.Sum(block))
707         if blockhash != hash {
708                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
709                 return 0, RequestHashError
710         }
711
712         // If we already have this data, it's intact on disk, and we
713         // can update its timestamp, return success. If we have
714         // different data with the same hash, return failure.
715         if n, err := CompareAndTouch(ctx, hash, block); err == nil || err == CollisionError {
716                 return n, err
717         } else if ctx.Err() != nil {
718                 return 0, ErrClientDisconnect
719         }
720
721         // Choose a Keep volume to write to.
722         // If this volume fails, try all of the volumes in order.
723         if vol := KeepVM.NextWritable(); vol != nil {
724                 if err := vol.Put(ctx, hash, block); err == nil {
725                         return vol.Replication(), nil // success!
726                 }
727                 if ctx.Err() != nil {
728                         return 0, ErrClientDisconnect
729                 }
730         }
731
732         writables := KeepVM.AllWritable()
733         if len(writables) == 0 {
734                 log.Print("No writable volumes.")
735                 return 0, FullError
736         }
737
738         allFull := true
739         for _, vol := range writables {
740                 err := vol.Put(ctx, hash, block)
741                 if ctx.Err() != nil {
742                         return 0, ErrClientDisconnect
743                 }
744                 if err == nil {
745                         return vol.Replication(), nil // success!
746                 }
747                 if err != FullError {
748                         // The volume is not full but the
749                         // write did not succeed.  Report the
750                         // error and continue trying.
751                         allFull = false
752                         log.Printf("%s: Write(%s): %s", vol, hash, err)
753                 }
754         }
755
756         if allFull {
757                 log.Print("All volumes are full.")
758                 return 0, FullError
759         }
760         // Already logged the non-full errors.
761         return 0, GenericError
762 }
763
764 // CompareAndTouch returns the current replication level if one of the
765 // volumes already has the given content and it successfully updates
766 // the relevant block's modification time in order to protect it from
767 // premature garbage collection. Otherwise, it returns a non-nil
768 // error.
769 func CompareAndTouch(ctx context.Context, hash string, buf []byte) (int, error) {
770         var bestErr error = NotFoundError
771         for _, vol := range KeepVM.AllWritable() {
772                 err := vol.Compare(ctx, hash, buf)
773                 if ctx.Err() != nil {
774                         return 0, ctx.Err()
775                 } else if err == CollisionError {
776                         // Stop if we have a block with same hash but
777                         // different content. (It will be impossible
778                         // to tell which one is wanted if we have
779                         // both, so there's no point writing it even
780                         // on a different volume.)
781                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
782                         return 0, err
783                 } else if os.IsNotExist(err) {
784                         // Block does not exist. This is the only
785                         // "normal" error: we don't log anything.
786                         continue
787                 } else if err != nil {
788                         // Couldn't open file, data is corrupt on
789                         // disk, etc.: log this abnormal condition,
790                         // and try the next volume.
791                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
792                         continue
793                 }
794                 if err := vol.Touch(hash); err != nil {
795                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
796                         bestErr = err
797                         continue
798                 }
799                 // Compare and Touch both worked --> done.
800                 return vol.Replication(), nil
801         }
802         return 0, bestErr
803 }
804
805 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
806
807 // IsValidLocator returns true if the specified string is a valid Keep locator.
808 //   When Keep is extended to support hash types other than MD5,
809 //   this should be updated to cover those as well.
810 //
811 func IsValidLocator(loc string) bool {
812         return validLocatorRe.MatchString(loc)
813 }
814
815 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
816
817 // GetAPIToken returns the OAuth2 token from the Authorization
818 // header of a HTTP request, or an empty string if no matching
819 // token is found.
820 func GetAPIToken(req *http.Request) string {
821         if auth, ok := req.Header["Authorization"]; ok {
822                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
823                         return match[1]
824                 }
825         }
826         return ""
827 }
828
829 // IsExpired returns true if the given Unix timestamp (expressed as a
830 // hexadecimal string) is in the past, or if timestampHex cannot be
831 // parsed as a hexadecimal string.
832 func IsExpired(timestampHex string) bool {
833         ts, err := strconv.ParseInt(timestampHex, 16, 0)
834         if err != nil {
835                 log.Printf("IsExpired: %s", err)
836                 return true
837         }
838         return time.Unix(ts, 0).Before(time.Now())
839 }
840
841 // CanDelete returns true if the user identified by apiToken is
842 // allowed to delete blocks.
843 func CanDelete(apiToken string) bool {
844         if apiToken == "" {
845                 return false
846         }
847         // Blocks may be deleted only when Keep has been configured with a
848         // data manager.
849         if IsSystemAuth(apiToken) {
850                 return true
851         }
852         // TODO(twp): look up apiToken with the API server
853         // return true if is_admin is true and if the token
854         // has unlimited scope
855         return false
856 }
857
858 // IsSystemAuth returns true if the given token is allowed to perform
859 // system level actions like deleting data.
860 func IsSystemAuth(token string) bool {
861         return token != "" && token == theConfig.systemAuthToken
862 }