Merge branch 'master' into 7162-support-service-types
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "sync"
24         "time"
25 )
26
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
29 //
30 func MakeRESTRouter() *mux.Router {
31         rest := mux.NewRouter()
32
33         rest.HandleFunc(
34                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}+{hints}`,
37                 GetBlockHandler).Methods("GET", "HEAD")
38
39         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41         // List all blocks stored here. Privileged client only.
42         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43         // List blocks stored here whose hash has the given prefix.
44         // Privileged client only.
45         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
46
47         // List volumes: path, device number, bytes used/avail.
48         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
49
50         // Replace the current pull queue.
51         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
52
53         // Replace the current trash queue.
54         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
55
56         // Any request which does not match any of these routes gets
57         // 400 Bad Request.
58         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
59
60         return rest
61 }
62
63 // BadRequestHandler is a HandleFunc to address bad requests.
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
66 }
67
68 // GetBlockHandler is a HandleFunc to address Get block requests.
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70         if enforcePermissions {
71                 locator := req.URL.Path[1:] // strip leading slash
72                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
73                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
74                         return
75                 }
76         }
77
78         block, err := GetBlock(mux.Vars(req)["hash"])
79         if err != nil {
80                 // This type assertion is safe because the only errors
81                 // GetBlock can return are DiskHashError or NotFoundError.
82                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
83                 return
84         }
85         defer bufs.Put(block)
86
87         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
88         resp.Header().Set("Content-Type", "application/octet-stream")
89         resp.Write(block)
90 }
91
92 // PutBlockHandler is a HandleFunc to address Put block requests.
93 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
94         hash := mux.Vars(req)["hash"]
95
96         // Detect as many error conditions as possible before reading
97         // the body: avoid transmitting data that will not end up
98         // being written anyway.
99
100         if req.ContentLength == -1 {
101                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
102                 return
103         }
104
105         if req.ContentLength > BlockSize {
106                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
107                 return
108         }
109
110         if len(KeepVM.AllWritable()) == 0 {
111                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
112                 return
113         }
114
115         buf := bufs.Get(int(req.ContentLength))
116         _, err := io.ReadFull(req.Body, buf)
117         if err != nil {
118                 http.Error(resp, err.Error(), 500)
119                 bufs.Put(buf)
120                 return
121         }
122
123         err = PutBlock(buf, hash)
124         bufs.Put(buf)
125
126         if err != nil {
127                 ke := err.(*KeepError)
128                 http.Error(resp, ke.Error(), ke.HTTPCode)
129                 return
130         }
131
132         // Success; add a size hint, sign the locator if possible, and
133         // return it to the client.
134         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
135         apiToken := GetApiToken(req)
136         if PermissionSecret != nil && apiToken != "" {
137                 expiry := time.Now().Add(blobSignatureTTL)
138                 returnHash = SignLocator(returnHash, apiToken, expiry)
139         }
140         resp.Write([]byte(returnHash + "\n"))
141 }
142
143 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
144 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
145         // Reject unauthorized requests.
146         if !IsDataManagerToken(GetApiToken(req)) {
147                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
148                 return
149         }
150
151         prefix := mux.Vars(req)["prefix"]
152
153         for _, vol := range KeepVM.AllReadable() {
154                 if err := vol.IndexTo(prefix, resp); err != nil {
155                         // The only errors returned by IndexTo are
156                         // write errors returned by resp.Write(),
157                         // which probably means the client has
158                         // disconnected and this error will never be
159                         // reported to the client -- but it will
160                         // appear in our own error log.
161                         http.Error(resp, err.Error(), http.StatusInternalServerError)
162                         return
163                 }
164         }
165         // An empty line at EOF is the only way the client can be
166         // assured the entire index was received.
167         resp.Write([]byte{'\n'})
168 }
169
170 // StatusHandler
171 //     Responds to /status.json requests with the current node status,
172 //     described in a JSON structure.
173 //
174 //     The data given in a status.json response includes:
175 //        volumes - a list of Keep volumes currently in use by this server
176 //          each volume is an object with the following fields:
177 //            * mount_point
178 //            * device_num (an integer identifying the underlying filesystem)
179 //            * bytes_free
180 //            * bytes_used
181
182 // PoolStatus struct
183 type PoolStatus struct {
184         Alloc uint64 `json:"BytesAllocated"`
185         Cap   int    `json:"BuffersMax"`
186         Len   int    `json:"BuffersInUse"`
187 }
188
189 // NodeStatus struct
190 type NodeStatus struct {
191         Volumes    []*VolumeStatus `json:"volumes"`
192         BufferPool PoolStatus
193         PullQueue  WorkQueueStatus
194         TrashQueue WorkQueueStatus
195         Memory     runtime.MemStats
196 }
197
198 var st NodeStatus
199 var stLock sync.Mutex
200
201 // StatusHandler addresses /status.json requests.
202 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
203         stLock.Lock()
204         readNodeStatus(&st)
205         jstat, err := json.Marshal(&st)
206         stLock.Unlock()
207         if err == nil {
208                 resp.Write(jstat)
209         } else {
210                 log.Printf("json.Marshal: %s", err)
211                 log.Printf("NodeStatus = %v", &st)
212                 http.Error(resp, err.Error(), 500)
213         }
214 }
215
216 // populate the given NodeStatus struct with current values.
217 func readNodeStatus(st *NodeStatus) {
218         vols := KeepVM.AllReadable()
219         if cap(st.Volumes) < len(vols) {
220                 st.Volumes = make([]*VolumeStatus, len(vols))
221         }
222         st.Volumes = st.Volumes[:0]
223         for _, vol := range vols {
224                 if s := vol.Status(); s != nil {
225                         st.Volumes = append(st.Volumes, s)
226                 }
227         }
228         st.BufferPool.Alloc = bufs.Alloc()
229         st.BufferPool.Cap = bufs.Cap()
230         st.BufferPool.Len = bufs.Len()
231         st.PullQueue = getWorkQueueStatus(pullq)
232         st.TrashQueue = getWorkQueueStatus(trashq)
233         runtime.ReadMemStats(&st.Memory)
234 }
235
236 // return a WorkQueueStatus for the given queue. If q is nil (which
237 // should never happen except in test suites), return a zero status
238 // value instead of crashing.
239 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
240         if q == nil {
241                 // This should only happen during tests.
242                 return WorkQueueStatus{}
243         }
244         return q.Status()
245 }
246
247 // DeleteHandler processes DELETE requests.
248 //
249 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
250 // from all connected volumes.
251 //
252 // Only the Data Manager, or an Arvados admin with scope "all", are
253 // allowed to issue DELETE requests.  If a DELETE request is not
254 // authenticated or is issued by a non-admin user, the server returns
255 // a PermissionError.
256 //
257 // Upon receiving a valid request from an authorized user,
258 // DeleteHandler deletes all copies of the specified block on local
259 // writable volumes.
260 //
261 // Response format:
262 //
263 // If the requested blocks was not found on any volume, the response
264 // code is HTTP 404 Not Found.
265 //
266 // Otherwise, the response code is 200 OK, with a response body
267 // consisting of the JSON message
268 //
269 //    {"copies_deleted":d,"copies_failed":f}
270 //
271 // where d and f are integers representing the number of blocks that
272 // were successfully and unsuccessfully deleted.
273 //
274 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
275         hash := mux.Vars(req)["hash"]
276
277         // Confirm that this user is an admin and has a token with unlimited scope.
278         var tok = GetApiToken(req)
279         if tok == "" || !CanDelete(tok) {
280                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
281                 return
282         }
283
284         if neverDelete {
285                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
286                 return
287         }
288
289         // Delete copies of this block from all available volumes.
290         // Report how many blocks were successfully deleted, and how
291         // many were found on writable volumes but not deleted.
292         var result struct {
293                 Deleted int `json:"copies_deleted"`
294                 Failed  int `json:"copies_failed"`
295         }
296         for _, vol := range KeepVM.AllWritable() {
297                 if err := vol.Delete(hash); err == nil {
298                         result.Deleted++
299                 } else if os.IsNotExist(err) {
300                         continue
301                 } else {
302                         result.Failed++
303                         log.Println("DeleteHandler:", err)
304                 }
305         }
306
307         var st int
308
309         if result.Deleted == 0 && result.Failed == 0 {
310                 st = http.StatusNotFound
311         } else {
312                 st = http.StatusOK
313         }
314
315         resp.WriteHeader(st)
316
317         if st == http.StatusOK {
318                 if body, err := json.Marshal(result); err == nil {
319                         resp.Write(body)
320                 } else {
321                         log.Printf("json.Marshal: %s (result = %v)", err, result)
322                         http.Error(resp, err.Error(), 500)
323                 }
324         }
325 }
326
327 /* PullHandler processes "PUT /pull" requests for the data manager.
328    The request body is a JSON message containing a list of pull
329    requests in the following format:
330
331    [
332       {
333          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
334          "servers":[
335                         "keep0.qr1hi.arvadosapi.com:25107",
336                         "keep1.qr1hi.arvadosapi.com:25108"
337                  ]
338           },
339           {
340                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
341                  "servers":[
342                         "10.0.1.5:25107",
343                         "10.0.1.6:25107",
344                         "10.0.1.7:25108"
345                  ]
346           },
347           ...
348    ]
349
350    Each pull request in the list consists of a block locator string
351    and an ordered list of servers.  Keepstore should try to fetch the
352    block from each server in turn.
353
354    If the request has not been sent by the Data Manager, return 401
355    Unauthorized.
356
357    If the JSON unmarshalling fails, return 400 Bad Request.
358 */
359
360 // PullRequest consists of a block locator and an ordered list of servers
361 type PullRequest struct {
362         Locator string   `json:"locator"`
363         Servers []string `json:"servers"`
364 }
365
366 // PullHandler processes "PUT /pull" requests for the data manager.
367 func PullHandler(resp http.ResponseWriter, req *http.Request) {
368         // Reject unauthorized requests.
369         if !IsDataManagerToken(GetApiToken(req)) {
370                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
371                 return
372         }
373
374         // Parse the request body.
375         var pr []PullRequest
376         r := json.NewDecoder(req.Body)
377         if err := r.Decode(&pr); err != nil {
378                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
379                 return
380         }
381
382         // We have a properly formatted pull list sent from the data
383         // manager.  Report success and send the list to the pull list
384         // manager for further handling.
385         resp.WriteHeader(http.StatusOK)
386         resp.Write([]byte(
387                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
388
389         plist := list.New()
390         for _, p := range pr {
391                 plist.PushBack(p)
392         }
393         pullq.ReplaceQueue(plist)
394 }
395
396 // TrashRequest consists of a block locator and it's Mtime
397 type TrashRequest struct {
398         Locator    string `json:"locator"`
399         BlockMtime int64  `json:"block_mtime"`
400 }
401
402 // TrashHandler processes /trash requests.
403 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
404         // Reject unauthorized requests.
405         if !IsDataManagerToken(GetApiToken(req)) {
406                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
407                 return
408         }
409
410         // Parse the request body.
411         var trash []TrashRequest
412         r := json.NewDecoder(req.Body)
413         if err := r.Decode(&trash); err != nil {
414                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
415                 return
416         }
417
418         // We have a properly formatted trash list sent from the data
419         // manager.  Report success and send the list to the trash work
420         // queue for further handling.
421         resp.WriteHeader(http.StatusOK)
422         resp.Write([]byte(
423                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
424
425         tlist := list.New()
426         for _, t := range trash {
427                 tlist.PushBack(t)
428         }
429         trashq.ReplaceQueue(tlist)
430 }
431
432 // ==============================
433 // GetBlock and PutBlock implement lower-level code for handling
434 // blocks by rooting through volumes connected to the local machine.
435 // Once the handler has determined that system policy permits the
436 // request, it calls these methods to perform the actual operation.
437 //
438 // TODO(twp): this code would probably be better located in the
439 // VolumeManager interface. As an abstraction, the VolumeManager
440 // should be the only part of the code that cares about which volume a
441 // block is stored on, so it should be responsible for figuring out
442 // which volume to check for fetching blocks, storing blocks, etc.
443 // ==============================
444
445 // GetBlock fetches and returns the block identified by "hash".
446 //
447 // On success, GetBlock returns a byte slice with the block data, and
448 // a nil error.
449 //
450 // If the block cannot be found on any volume, returns NotFoundError.
451 //
452 // If the block found does not have the correct MD5 hash, returns
453 // DiskHashError.
454 //
455 func GetBlock(hash string) ([]byte, error) {
456         // Attempt to read the requested hash from a keep volume.
457         errorToCaller := NotFoundError
458
459         for _, vol := range KeepVM.AllReadable() {
460                 buf, err := vol.Get(hash)
461                 if err != nil {
462                         // IsNotExist is an expected error and may be
463                         // ignored. All other errors are logged. In
464                         // any case we continue trying to read other
465                         // volumes. If all volumes report IsNotExist,
466                         // we return a NotFoundError.
467                         if !os.IsNotExist(err) {
468                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
469                         }
470                         continue
471                 }
472                 // Check the file checksum.
473                 //
474                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
475                 if filehash != hash {
476                         // TODO: Try harder to tell a sysadmin about
477                         // this.
478                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
479                                 vol, hash, filehash)
480                         errorToCaller = DiskHashError
481                         bufs.Put(buf)
482                         continue
483                 }
484                 if errorToCaller == DiskHashError {
485                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
486                                 vol, hash)
487                 }
488                 return buf, nil
489         }
490         return nil, errorToCaller
491 }
492
493 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
494 //
495 // PutBlock(block, hash)
496 //   Stores the BLOCK (identified by the content id HASH) in Keep.
497 //
498 //   The MD5 checksum of the block must be identical to the content id HASH.
499 //   If not, an error is returned.
500 //
501 //   PutBlock stores the BLOCK on the first Keep volume with free space.
502 //   A failure code is returned to the user only if all volumes fail.
503 //
504 //   On success, PutBlock returns nil.
505 //   On failure, it returns a KeepError with one of the following codes:
506 //
507 //   500 Collision
508 //          A different block with the same hash already exists on this
509 //          Keep server.
510 //   422 MD5Fail
511 //          The MD5 hash of the BLOCK does not match the argument HASH.
512 //   503 Full
513 //          There was not enough space left in any Keep volume to store
514 //          the object.
515 //   500 Fail
516 //          The object could not be stored for some other reason (e.g.
517 //          all writes failed). The text of the error message should
518 //          provide as much detail as possible.
519 //
520 func PutBlock(block []byte, hash string) error {
521         // Check that BLOCK's checksum matches HASH.
522         blockhash := fmt.Sprintf("%x", md5.Sum(block))
523         if blockhash != hash {
524                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
525                 return RequestHashError
526         }
527
528         // If we already have this data, it's intact on disk, and we
529         // can update its timestamp, return success. If we have
530         // different data with the same hash, return failure.
531         if err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
532                 return err
533         }
534
535         // Choose a Keep volume to write to.
536         // If this volume fails, try all of the volumes in order.
537         if vol := KeepVM.NextWritable(); vol != nil {
538                 if err := vol.Put(hash, block); err == nil {
539                         return nil // success!
540                 }
541         }
542
543         writables := KeepVM.AllWritable()
544         if len(writables) == 0 {
545                 log.Print("No writable volumes.")
546                 return FullError
547         }
548
549         allFull := true
550         for _, vol := range writables {
551                 err := vol.Put(hash, block)
552                 if err == nil {
553                         return nil // success!
554                 }
555                 if err != FullError {
556                         // The volume is not full but the
557                         // write did not succeed.  Report the
558                         // error and continue trying.
559                         allFull = false
560                         log.Printf("%s: Write(%s): %s", vol, hash, err)
561                 }
562         }
563
564         if allFull {
565                 log.Print("All volumes are full.")
566                 return FullError
567         }
568         // Already logged the non-full errors.
569         return GenericError
570 }
571
572 // CompareAndTouch returns nil if one of the volumes already has the
573 // given content and it successfully updates the relevant block's
574 // modification time in order to protect it from premature garbage
575 // collection.
576 func CompareAndTouch(hash string, buf []byte) error {
577         var bestErr error = NotFoundError
578         for _, vol := range KeepVM.AllWritable() {
579                 if err := vol.Compare(hash, buf); err == CollisionError {
580                         // Stop if we have a block with same hash but
581                         // different content. (It will be impossible
582                         // to tell which one is wanted if we have
583                         // both, so there's no point writing it even
584                         // on a different volume.)
585                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
586                         return err
587                 } else if os.IsNotExist(err) {
588                         // Block does not exist. This is the only
589                         // "normal" error: we don't log anything.
590                         continue
591                 } else if err != nil {
592                         // Couldn't open file, data is corrupt on
593                         // disk, etc.: log this abnormal condition,
594                         // and try the next volume.
595                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
596                         continue
597                 }
598                 if err := vol.Touch(hash); err != nil {
599                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
600                         bestErr = err
601                         continue
602                 }
603                 // Compare and Touch both worked --> done.
604                 return nil
605         }
606         return bestErr
607 }
608
609 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
610
611 // IsValidLocator returns true if the specified string is a valid Keep locator.
612 //   When Keep is extended to support hash types other than MD5,
613 //   this should be updated to cover those as well.
614 //
615 func IsValidLocator(loc string) bool {
616         return validLocatorRe.MatchString(loc)
617 }
618
619 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
620
621 // GetApiToken returns the OAuth2 token from the Authorization
622 // header of a HTTP request, or an empty string if no matching
623 // token is found.
624 func GetApiToken(req *http.Request) string {
625         if auth, ok := req.Header["Authorization"]; ok {
626                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
627                         return match[1]
628                 }
629         }
630         return ""
631 }
632
633 // IsExpired returns true if the given Unix timestamp (expressed as a
634 // hexadecimal string) is in the past, or if timestampHex cannot be
635 // parsed as a hexadecimal string.
636 func IsExpired(timestampHex string) bool {
637         ts, err := strconv.ParseInt(timestampHex, 16, 0)
638         if err != nil {
639                 log.Printf("IsExpired: %s", err)
640                 return true
641         }
642         return time.Unix(ts, 0).Before(time.Now())
643 }
644
645 // CanDelete returns true if the user identified by apiToken is
646 // allowed to delete blocks.
647 func CanDelete(apiToken string) bool {
648         if apiToken == "" {
649                 return false
650         }
651         // Blocks may be deleted only when Keep has been configured with a
652         // data manager.
653         if IsDataManagerToken(apiToken) {
654                 return true
655         }
656         // TODO(twp): look up apiToken with the API server
657         // return true if is_admin is true and if the token
658         // has unlimited scope
659         return false
660 }
661
662 // IsDataManagerToken returns true if apiToken represents the data
663 // manager's token.
664 func IsDataManagerToken(apiToken string) bool {
665         return dataManagerToken != "" && apiToken == dataManagerToken
666 }