6260: Pull entire status object out of WorkQueue atomically.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "bytes"
12         "container/list"
13         "crypto/md5"
14         "encoding/json"
15         "fmt"
16         "github.com/gorilla/mux"
17         "io"
18         "log"
19         "net/http"
20         "os"
21         "regexp"
22         "runtime"
23         "strconv"
24         "sync"
25         "time"
26 )
27
28 // MakeRESTRouter returns a new mux.Router that forwards all Keep
29 // requests to the appropriate handlers.
30 //
31 func MakeRESTRouter() *mux.Router {
32         rest := mux.NewRouter()
33
34         rest.HandleFunc(
35                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
36         rest.HandleFunc(
37                 `/{hash:[0-9a-f]{32}}+{hints}`,
38                 GetBlockHandler).Methods("GET", "HEAD")
39
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
41         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
42         // List all blocks stored here. Privileged client only.
43         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
44         // List blocks stored here whose hash has the given prefix.
45         // Privileged client only.
46         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
47
48         // List volumes: path, device number, bytes used/avail.
49         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
50
51         // Replace the current pull queue.
52         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
53
54         // Replace the current trash queue.
55         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
56
57         // Any request which does not match any of these routes gets
58         // 400 Bad Request.
59         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
60
61         return rest
62 }
63
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
66 }
67
68 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
69         if enforce_permissions {
70                 locator := req.URL.Path[1:] // strip leading slash
71                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
72                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
73                         return
74                 }
75         }
76
77         block, err := GetBlock(mux.Vars(req)["hash"], false)
78         if err != nil {
79                 // This type assertion is safe because the only errors
80                 // GetBlock can return are DiskHashError or NotFoundError.
81                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
82                 return
83         }
84         defer bufs.Put(block)
85
86         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
87         resp.Header().Set("Content-Type", "application/octet-stream")
88         resp.Write(block)
89 }
90
91 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
92         hash := mux.Vars(req)["hash"]
93
94         // Detect as many error conditions as possible before reading
95         // the body: avoid transmitting data that will not end up
96         // being written anyway.
97
98         if req.ContentLength == -1 {
99                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
100                 return
101         }
102
103         if req.ContentLength > BLOCKSIZE {
104                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
105                 return
106         }
107
108         if len(KeepVM.AllWritable()) == 0 {
109                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
110                 return
111         }
112
113         buf := bufs.Get(int(req.ContentLength))
114         _, err := io.ReadFull(req.Body, buf)
115         if err != nil {
116                 http.Error(resp, err.Error(), 500)
117                 bufs.Put(buf)
118                 return
119         }
120
121         err = PutBlock(buf, hash)
122         bufs.Put(buf)
123
124         if err != nil {
125                 ke := err.(*KeepError)
126                 http.Error(resp, ke.Error(), ke.HTTPCode)
127                 return
128         }
129
130         // Success; add a size hint, sign the locator if possible, and
131         // return it to the client.
132         return_hash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
133         api_token := GetApiToken(req)
134         if PermissionSecret != nil && api_token != "" {
135                 expiry := time.Now().Add(blob_signature_ttl)
136                 return_hash = SignLocator(return_hash, api_token, expiry)
137         }
138         resp.Write([]byte(return_hash + "\n"))
139 }
140
141 // IndexHandler
142 //     A HandleFunc to address /index and /index/{prefix} requests.
143 //
144 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
145         // Reject unauthorized requests.
146         if !IsDataManagerToken(GetApiToken(req)) {
147                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
148                 return
149         }
150
151         prefix := mux.Vars(req)["prefix"]
152
153         for _, vol := range KeepVM.AllReadable() {
154                 if err := vol.IndexTo(prefix, resp); err != nil {
155                         // The only errors returned by IndexTo are
156                         // write errors returned by resp.Write(),
157                         // which probably means the client has
158                         // disconnected and this error will never be
159                         // reported to the client -- but it will
160                         // appear in our own error log.
161                         http.Error(resp, err.Error(), http.StatusInternalServerError)
162                         return
163                 }
164         }
165         // An empty line at EOF is the only way the client can be
166         // assured the entire index was received.
167         resp.Write([]byte{'\n'})
168 }
169
170 // StatusHandler
171 //     Responds to /status.json requests with the current node status,
172 //     described in a JSON structure.
173 //
174 //     The data given in a status.json response includes:
175 //        volumes - a list of Keep volumes currently in use by this server
176 //          each volume is an object with the following fields:
177 //            * mount_point
178 //            * device_num (an integer identifying the underlying filesystem)
179 //            * bytes_free
180 //            * bytes_used
181 //
182 type VolumeStatus struct {
183         MountPoint string `json:"mount_point"`
184         DeviceNum  uint64 `json:"device_num"`
185         BytesFree  uint64 `json:"bytes_free"`
186         BytesUsed  uint64 `json:"bytes_used"`
187 }
188
189 type PoolStatus struct {
190         Alloc uint64 `json:"BytesAllocated"`
191         Cap   int    `json:"BuffersMax"`
192         Len   int    `json:"BuffersInUse"`
193 }
194
195 type NodeStatus struct {
196         Volumes    []*VolumeStatus `json:"volumes"`
197         BufferPool PoolStatus
198         PullQueue  WorkQueueStatus
199         TrashQueue WorkQueueStatus
200         Memory     runtime.MemStats
201 }
202
203 var st NodeStatus
204 var stLock sync.Mutex
205
206 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
207         stLock.Lock()
208         readNodeStatus(&st)
209         jstat, err := json.Marshal(&st)
210         stLock.Unlock()
211         if err == nil {
212                 resp.Write(jstat)
213         } else {
214                 log.Printf("json.Marshal: %s\n", err)
215                 log.Printf("NodeStatus = %v\n", &st)
216                 http.Error(resp, err.Error(), 500)
217         }
218 }
219
220 // populate the given NodeStatus struct with current values.
221 func readNodeStatus(st *NodeStatus) {
222         vols := KeepVM.AllReadable()
223         if cap(st.Volumes) < len(vols) {
224                 st.Volumes = make([]*VolumeStatus, len(vols))
225         }
226         st.Volumes = st.Volumes[:0]
227         for _, vol := range vols {
228                 if s := vol.Status(); s != nil {
229                         st.Volumes = append(st.Volumes, s)
230                 }
231         }
232         st.BufferPool.Alloc = bufs.Alloc()
233         st.BufferPool.Cap = bufs.Cap()
234         st.BufferPool.Len = bufs.Len()
235         st.PullQueue = getWorkQueueStatus(pullq)
236         st.TrashQueue = getWorkQueueStatus(trashq)
237         runtime.ReadMemStats(&st.Memory)
238 }
239
240 // return a WorkQueueStatus for the given queue. If q is nil (which
241 // should never happen except in test suites), return a zero status
242 // value instead of crashing.
243 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
244         if q == nil {
245                 // This should only happen during tests.
246                 return WorkQueueStatus{}
247         }
248         return q.Status()
249 }
250
251 // DeleteHandler processes DELETE requests.
252 //
253 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
254 // from all connected volumes.
255 //
256 // Only the Data Manager, or an Arvados admin with scope "all", are
257 // allowed to issue DELETE requests.  If a DELETE request is not
258 // authenticated or is issued by a non-admin user, the server returns
259 // a PermissionError.
260 //
261 // Upon receiving a valid request from an authorized user,
262 // DeleteHandler deletes all copies of the specified block on local
263 // writable volumes.
264 //
265 // Response format:
266 //
267 // If the requested blocks was not found on any volume, the response
268 // code is HTTP 404 Not Found.
269 //
270 // Otherwise, the response code is 200 OK, with a response body
271 // consisting of the JSON message
272 //
273 //    {"copies_deleted":d,"copies_failed":f}
274 //
275 // where d and f are integers representing the number of blocks that
276 // were successfully and unsuccessfully deleted.
277 //
278 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
279         hash := mux.Vars(req)["hash"]
280
281         // Confirm that this user is an admin and has a token with unlimited scope.
282         var tok = GetApiToken(req)
283         if tok == "" || !CanDelete(tok) {
284                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
285                 return
286         }
287
288         if never_delete {
289                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
290                 return
291         }
292
293         // Delete copies of this block from all available volumes.
294         // Report how many blocks were successfully deleted, and how
295         // many were found on writable volumes but not deleted.
296         var result struct {
297                 Deleted int `json:"copies_deleted"`
298                 Failed  int `json:"copies_failed"`
299         }
300         for _, vol := range KeepVM.AllWritable() {
301                 if err := vol.Delete(hash); err == nil {
302                         result.Deleted++
303                 } else if os.IsNotExist(err) {
304                         continue
305                 } else {
306                         result.Failed++
307                         log.Println("DeleteHandler:", err)
308                 }
309         }
310
311         var st int
312
313         if result.Deleted == 0 && result.Failed == 0 {
314                 st = http.StatusNotFound
315         } else {
316                 st = http.StatusOK
317         }
318
319         resp.WriteHeader(st)
320
321         if st == http.StatusOK {
322                 if body, err := json.Marshal(result); err == nil {
323                         resp.Write(body)
324                 } else {
325                         log.Printf("json.Marshal: %s (result = %v)\n", err, result)
326                         http.Error(resp, err.Error(), 500)
327                 }
328         }
329 }
330
331 /* PullHandler processes "PUT /pull" requests for the data manager.
332    The request body is a JSON message containing a list of pull
333    requests in the following format:
334
335    [
336       {
337          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
338          "servers":[
339                         "keep0.qr1hi.arvadosapi.com:25107",
340                         "keep1.qr1hi.arvadosapi.com:25108"
341                  ]
342           },
343           {
344                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
345                  "servers":[
346                         "10.0.1.5:25107",
347                         "10.0.1.6:25107",
348                         "10.0.1.7:25108"
349                  ]
350           },
351           ...
352    ]
353
354    Each pull request in the list consists of a block locator string
355    and an ordered list of servers.  Keepstore should try to fetch the
356    block from each server in turn.
357
358    If the request has not been sent by the Data Manager, return 401
359    Unauthorized.
360
361    If the JSON unmarshalling fails, return 400 Bad Request.
362 */
363
364 type PullRequest struct {
365         Locator string   `json:"locator"`
366         Servers []string `json:"servers"`
367 }
368
369 func PullHandler(resp http.ResponseWriter, req *http.Request) {
370         // Reject unauthorized requests.
371         if !IsDataManagerToken(GetApiToken(req)) {
372                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
373                 return
374         }
375
376         // Parse the request body.
377         var pr []PullRequest
378         r := json.NewDecoder(req.Body)
379         if err := r.Decode(&pr); err != nil {
380                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
381                 return
382         }
383
384         // We have a properly formatted pull list sent from the data
385         // manager.  Report success and send the list to the pull list
386         // manager for further handling.
387         resp.WriteHeader(http.StatusOK)
388         resp.Write([]byte(
389                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
390
391         plist := list.New()
392         for _, p := range pr {
393                 plist.PushBack(p)
394         }
395         pullq.ReplaceQueue(plist)
396 }
397
398 type TrashRequest struct {
399         Locator    string `json:"locator"`
400         BlockMtime int64  `json:"block_mtime"`
401 }
402
403 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
404         // Reject unauthorized requests.
405         if !IsDataManagerToken(GetApiToken(req)) {
406                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
407                 return
408         }
409
410         // Parse the request body.
411         var trash []TrashRequest
412         r := json.NewDecoder(req.Body)
413         if err := r.Decode(&trash); err != nil {
414                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
415                 return
416         }
417
418         // We have a properly formatted trash list sent from the data
419         // manager.  Report success and send the list to the trash work
420         // queue for further handling.
421         resp.WriteHeader(http.StatusOK)
422         resp.Write([]byte(
423                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
424
425         tlist := list.New()
426         for _, t := range trash {
427                 tlist.PushBack(t)
428         }
429         trashq.ReplaceQueue(tlist)
430 }
431
432 // ==============================
433 // GetBlock and PutBlock implement lower-level code for handling
434 // blocks by rooting through volumes connected to the local machine.
435 // Once the handler has determined that system policy permits the
436 // request, it calls these methods to perform the actual operation.
437 //
438 // TODO(twp): this code would probably be better located in the
439 // VolumeManager interface. As an abstraction, the VolumeManager
440 // should be the only part of the code that cares about which volume a
441 // block is stored on, so it should be responsible for figuring out
442 // which volume to check for fetching blocks, storing blocks, etc.
443
444 // ==============================
445 // GetBlock fetches and returns the block identified by "hash".  If
446 // the update_timestamp argument is true, GetBlock also updates the
447 // block's file modification time (for the sake of PutBlock, which
448 // must update the file's timestamp when the block already exists).
449 //
450 // On success, GetBlock returns a byte slice with the block data, and
451 // a nil error.
452 //
453 // If the block cannot be found on any volume, returns NotFoundError.
454 //
455 // If the block found does not have the correct MD5 hash, returns
456 // DiskHashError.
457 //
458
459 func GetBlock(hash string, update_timestamp bool) ([]byte, error) {
460         // Attempt to read the requested hash from a keep volume.
461         error_to_caller := NotFoundError
462
463         var vols []Volume
464         if update_timestamp {
465                 // Pointless to find the block on an unwritable volume
466                 // because Touch() will fail -- this is as good as
467                 // "not found" for purposes of callers who need to
468                 // update_timestamp.
469                 vols = KeepVM.AllWritable()
470         } else {
471                 vols = KeepVM.AllReadable()
472         }
473
474         for _, vol := range vols {
475                 buf, err := vol.Get(hash)
476                 if err != nil {
477                         // IsNotExist is an expected error and may be
478                         // ignored. All other errors are logged. In
479                         // any case we continue trying to read other
480                         // volumes. If all volumes report IsNotExist,
481                         // we return a NotFoundError.
482                         if !os.IsNotExist(err) {
483                                 log.Printf("GetBlock: reading %s: %s\n", hash, err)
484                         }
485                         continue
486                 }
487                 // Check the file checksum.
488                 //
489                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
490                 if filehash != hash {
491                         // TODO: Try harder to tell a sysadmin about
492                         // this.
493                         log.Printf("%s: checksum mismatch for request %s (actual %s)\n",
494                                 vol, hash, filehash)
495                         error_to_caller = DiskHashError
496                         bufs.Put(buf)
497                         continue
498                 }
499                 if error_to_caller == DiskHashError {
500                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
501                                 vol, hash)
502                 }
503                 if update_timestamp {
504                         if err := vol.Touch(hash); err != nil {
505                                 error_to_caller = GenericError
506                                 log.Printf("%s: Touch %s failed: %s",
507                                         vol, hash, error_to_caller)
508                                 bufs.Put(buf)
509                                 continue
510                         }
511                 }
512                 return buf, nil
513         }
514         return nil, error_to_caller
515 }
516
517 /* PutBlock(block, hash)
518    Stores the BLOCK (identified by the content id HASH) in Keep.
519
520    The MD5 checksum of the block must be identical to the content id HASH.
521    If not, an error is returned.
522
523    PutBlock stores the BLOCK on the first Keep volume with free space.
524    A failure code is returned to the user only if all volumes fail.
525
526    On success, PutBlock returns nil.
527    On failure, it returns a KeepError with one of the following codes:
528
529    500 Collision
530           A different block with the same hash already exists on this
531           Keep server.
532    422 MD5Fail
533           The MD5 hash of the BLOCK does not match the argument HASH.
534    503 Full
535           There was not enough space left in any Keep volume to store
536           the object.
537    500 Fail
538           The object could not be stored for some other reason (e.g.
539           all writes failed). The text of the error message should
540           provide as much detail as possible.
541 */
542
543 func PutBlock(block []byte, hash string) error {
544         // Check that BLOCK's checksum matches HASH.
545         blockhash := fmt.Sprintf("%x", md5.Sum(block))
546         if blockhash != hash {
547                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
548                 return RequestHashError
549         }
550
551         // If we already have a block on disk under this identifier, return
552         // success (but check for MD5 collisions).  While fetching the block,
553         // update its timestamp.
554         // The only errors that GetBlock can return are DiskHashError and NotFoundError.
555         // In either case, we want to write our new (good) block to disk,
556         // so there is nothing special to do if err != nil.
557         //
558         if oldblock, err := GetBlock(hash, true); err == nil {
559                 defer bufs.Put(oldblock)
560                 if bytes.Compare(block, oldblock) == 0 {
561                         // The block already exists; return success.
562                         return nil
563                 } else {
564                         return CollisionError
565                 }
566         }
567
568         // Choose a Keep volume to write to.
569         // If this volume fails, try all of the volumes in order.
570         if vol := KeepVM.NextWritable(); vol != nil {
571                 if err := vol.Put(hash, block); err == nil {
572                         return nil // success!
573                 }
574         }
575
576         writables := KeepVM.AllWritable()
577         if len(writables) == 0 {
578                 log.Print("No writable volumes.")
579                 return FullError
580         }
581
582         allFull := true
583         for _, vol := range writables {
584                 err := vol.Put(hash, block)
585                 if err == nil {
586                         return nil // success!
587                 }
588                 if err != FullError {
589                         // The volume is not full but the
590                         // write did not succeed.  Report the
591                         // error and continue trying.
592                         allFull = false
593                         log.Printf("%s: Write(%s): %s\n", vol, hash, err)
594                 }
595         }
596
597         if allFull {
598                 log.Print("All volumes are full.")
599                 return FullError
600         } else {
601                 // Already logged the non-full errors.
602                 return GenericError
603         }
604 }
605
606 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
607
608 // IsValidLocator
609 //     Return true if the specified string is a valid Keep locator.
610 //     When Keep is extended to support hash types other than MD5,
611 //     this should be updated to cover those as well.
612 //
613 func IsValidLocator(loc string) bool {
614         return validLocatorRe.MatchString(loc)
615 }
616
617 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
618
619 // GetApiToken returns the OAuth2 token from the Authorization
620 // header of a HTTP request, or an empty string if no matching
621 // token is found.
622 func GetApiToken(req *http.Request) string {
623         if auth, ok := req.Header["Authorization"]; ok {
624                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
625                         return match[1]
626                 }
627         }
628         return ""
629 }
630
631 // IsExpired returns true if the given Unix timestamp (expressed as a
632 // hexadecimal string) is in the past, or if timestamp_hex cannot be
633 // parsed as a hexadecimal string.
634 func IsExpired(timestamp_hex string) bool {
635         ts, err := strconv.ParseInt(timestamp_hex, 16, 0)
636         if err != nil {
637                 log.Printf("IsExpired: %s\n", err)
638                 return true
639         }
640         return time.Unix(ts, 0).Before(time.Now())
641 }
642
643 // CanDelete returns true if the user identified by api_token is
644 // allowed to delete blocks.
645 func CanDelete(api_token string) bool {
646         if api_token == "" {
647                 return false
648         }
649         // Blocks may be deleted only when Keep has been configured with a
650         // data manager.
651         if IsDataManagerToken(api_token) {
652                 return true
653         }
654         // TODO(twp): look up api_token with the API server
655         // return true if is_admin is true and if the token
656         // has unlimited scope
657         return false
658 }
659
660 // IsDataManagerToken returns true if api_token represents the data
661 // manager's token.
662 func IsDataManagerToken(api_token string) bool {
663         return data_manager_token != "" && api_token == data_manager_token
664 }