Finish basic state change & record protection unit tests.
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "sync"
24         "time"
25 )
26
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
29 //
30 func MakeRESTRouter() *mux.Router {
31         rest := mux.NewRouter()
32
33         rest.HandleFunc(
34                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}+{hints}`,
37                 GetBlockHandler).Methods("GET", "HEAD")
38
39         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41         // List all blocks stored here. Privileged client only.
42         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43         // List blocks stored here whose hash has the given prefix.
44         // Privileged client only.
45         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
46
47         // List volumes: path, device number, bytes used/avail.
48         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
49
50         // Replace the current pull queue.
51         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
52
53         // Replace the current trash queue.
54         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
55
56         // Any request which does not match any of these routes gets
57         // 400 Bad Request.
58         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
59
60         return rest
61 }
62
63 // BadRequestHandler is a HandleFunc to address bad requests.
64 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
65         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
66 }
67
68 // GetBlockHandler is a HandleFunc to address Get block requests.
69 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
70         if enforcePermissions {
71                 locator := req.URL.Path[1:] // strip leading slash
72                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
73                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
74                         return
75                 }
76         }
77
78         block, err := GetBlock(mux.Vars(req)["hash"])
79         if err != nil {
80                 // This type assertion is safe because the only errors
81                 // GetBlock can return are DiskHashError or NotFoundError.
82                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
83                 return
84         }
85         defer bufs.Put(block)
86
87         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
88         resp.Header().Set("Content-Type", "application/octet-stream")
89         resp.Write(block)
90 }
91
92 // PutBlockHandler is a HandleFunc to address Put block requests.
93 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
94         hash := mux.Vars(req)["hash"]
95
96         // Detect as many error conditions as possible before reading
97         // the body: avoid transmitting data that will not end up
98         // being written anyway.
99
100         if req.ContentLength == -1 {
101                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
102                 return
103         }
104
105         if req.ContentLength > BlockSize {
106                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
107                 return
108         }
109
110         if len(KeepVM.AllWritable()) == 0 {
111                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
112                 return
113         }
114
115         buf := bufs.Get(int(req.ContentLength))
116         _, err := io.ReadFull(req.Body, buf)
117         if err != nil {
118                 http.Error(resp, err.Error(), 500)
119                 bufs.Put(buf)
120                 return
121         }
122
123         replication, err := PutBlock(buf, hash)
124         bufs.Put(buf)
125
126         if err != nil {
127                 ke := err.(*KeepError)
128                 http.Error(resp, ke.Error(), ke.HTTPCode)
129                 return
130         }
131
132         // Success; add a size hint, sign the locator if possible, and
133         // return it to the client.
134         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
135         apiToken := GetApiToken(req)
136         if PermissionSecret != nil && apiToken != "" {
137                 expiry := time.Now().Add(blobSignatureTTL)
138                 returnHash = SignLocator(returnHash, apiToken, expiry)
139         }
140         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
141         resp.Write([]byte(returnHash + "\n"))
142 }
143
144 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
145 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
146         // Reject unauthorized requests.
147         if !IsDataManagerToken(GetApiToken(req)) {
148                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
149                 return
150         }
151
152         prefix := mux.Vars(req)["prefix"]
153
154         for _, vol := range KeepVM.AllReadable() {
155                 if err := vol.IndexTo(prefix, resp); err != nil {
156                         // The only errors returned by IndexTo are
157                         // write errors returned by resp.Write(),
158                         // which probably means the client has
159                         // disconnected and this error will never be
160                         // reported to the client -- but it will
161                         // appear in our own error log.
162                         http.Error(resp, err.Error(), http.StatusInternalServerError)
163                         return
164                 }
165         }
166         // An empty line at EOF is the only way the client can be
167         // assured the entire index was received.
168         resp.Write([]byte{'\n'})
169 }
170
171 // StatusHandler
172 //     Responds to /status.json requests with the current node status,
173 //     described in a JSON structure.
174 //
175 //     The data given in a status.json response includes:
176 //        volumes - a list of Keep volumes currently in use by this server
177 //          each volume is an object with the following fields:
178 //            * mount_point
179 //            * device_num (an integer identifying the underlying filesystem)
180 //            * bytes_free
181 //            * bytes_used
182
183 // PoolStatus struct
184 type PoolStatus struct {
185         Alloc uint64 `json:"BytesAllocated"`
186         Cap   int    `json:"BuffersMax"`
187         Len   int    `json:"BuffersInUse"`
188 }
189
190 // NodeStatus struct
191 type NodeStatus struct {
192         Volumes    []*VolumeStatus `json:"volumes"`
193         BufferPool PoolStatus
194         PullQueue  WorkQueueStatus
195         TrashQueue WorkQueueStatus
196         Memory     runtime.MemStats
197 }
198
199 var st NodeStatus
200 var stLock sync.Mutex
201
202 // StatusHandler addresses /status.json requests.
203 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
204         stLock.Lock()
205         readNodeStatus(&st)
206         jstat, err := json.Marshal(&st)
207         stLock.Unlock()
208         if err == nil {
209                 resp.Write(jstat)
210         } else {
211                 log.Printf("json.Marshal: %s", err)
212                 log.Printf("NodeStatus = %v", &st)
213                 http.Error(resp, err.Error(), 500)
214         }
215 }
216
217 // populate the given NodeStatus struct with current values.
218 func readNodeStatus(st *NodeStatus) {
219         vols := KeepVM.AllReadable()
220         if cap(st.Volumes) < len(vols) {
221                 st.Volumes = make([]*VolumeStatus, len(vols))
222         }
223         st.Volumes = st.Volumes[:0]
224         for _, vol := range vols {
225                 if s := vol.Status(); s != nil {
226                         st.Volumes = append(st.Volumes, s)
227                 }
228         }
229         st.BufferPool.Alloc = bufs.Alloc()
230         st.BufferPool.Cap = bufs.Cap()
231         st.BufferPool.Len = bufs.Len()
232         st.PullQueue = getWorkQueueStatus(pullq)
233         st.TrashQueue = getWorkQueueStatus(trashq)
234         runtime.ReadMemStats(&st.Memory)
235 }
236
237 // return a WorkQueueStatus for the given queue. If q is nil (which
238 // should never happen except in test suites), return a zero status
239 // value instead of crashing.
240 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
241         if q == nil {
242                 // This should only happen during tests.
243                 return WorkQueueStatus{}
244         }
245         return q.Status()
246 }
247
248 // DeleteHandler processes DELETE requests.
249 //
250 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
251 // from all connected volumes.
252 //
253 // Only the Data Manager, or an Arvados admin with scope "all", are
254 // allowed to issue DELETE requests.  If a DELETE request is not
255 // authenticated or is issued by a non-admin user, the server returns
256 // a PermissionError.
257 //
258 // Upon receiving a valid request from an authorized user,
259 // DeleteHandler deletes all copies of the specified block on local
260 // writable volumes.
261 //
262 // Response format:
263 //
264 // If the requested blocks was not found on any volume, the response
265 // code is HTTP 404 Not Found.
266 //
267 // Otherwise, the response code is 200 OK, with a response body
268 // consisting of the JSON message
269 //
270 //    {"copies_deleted":d,"copies_failed":f}
271 //
272 // where d and f are integers representing the number of blocks that
273 // were successfully and unsuccessfully deleted.
274 //
275 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
276         hash := mux.Vars(req)["hash"]
277
278         // Confirm that this user is an admin and has a token with unlimited scope.
279         var tok = GetApiToken(req)
280         if tok == "" || !CanDelete(tok) {
281                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
282                 return
283         }
284
285         if neverDelete {
286                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
287                 return
288         }
289
290         // Delete copies of this block from all available volumes.
291         // Report how many blocks were successfully deleted, and how
292         // many were found on writable volumes but not deleted.
293         var result struct {
294                 Deleted int `json:"copies_deleted"`
295                 Failed  int `json:"copies_failed"`
296         }
297         for _, vol := range KeepVM.AllWritable() {
298                 if err := vol.Delete(hash); err == nil {
299                         result.Deleted++
300                 } else if os.IsNotExist(err) {
301                         continue
302                 } else {
303                         result.Failed++
304                         log.Println("DeleteHandler:", err)
305                 }
306         }
307
308         var st int
309
310         if result.Deleted == 0 && result.Failed == 0 {
311                 st = http.StatusNotFound
312         } else {
313                 st = http.StatusOK
314         }
315
316         resp.WriteHeader(st)
317
318         if st == http.StatusOK {
319                 if body, err := json.Marshal(result); err == nil {
320                         resp.Write(body)
321                 } else {
322                         log.Printf("json.Marshal: %s (result = %v)", err, result)
323                         http.Error(resp, err.Error(), 500)
324                 }
325         }
326 }
327
328 /* PullHandler processes "PUT /pull" requests for the data manager.
329    The request body is a JSON message containing a list of pull
330    requests in the following format:
331
332    [
333       {
334          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
335          "servers":[
336                         "keep0.qr1hi.arvadosapi.com:25107",
337                         "keep1.qr1hi.arvadosapi.com:25108"
338                  ]
339           },
340           {
341                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
342                  "servers":[
343                         "10.0.1.5:25107",
344                         "10.0.1.6:25107",
345                         "10.0.1.7:25108"
346                  ]
347           },
348           ...
349    ]
350
351    Each pull request in the list consists of a block locator string
352    and an ordered list of servers.  Keepstore should try to fetch the
353    block from each server in turn.
354
355    If the request has not been sent by the Data Manager, return 401
356    Unauthorized.
357
358    If the JSON unmarshalling fails, return 400 Bad Request.
359 */
360
361 // PullRequest consists of a block locator and an ordered list of servers
362 type PullRequest struct {
363         Locator string   `json:"locator"`
364         Servers []string `json:"servers"`
365 }
366
367 // PullHandler processes "PUT /pull" requests for the data manager.
368 func PullHandler(resp http.ResponseWriter, req *http.Request) {
369         // Reject unauthorized requests.
370         if !IsDataManagerToken(GetApiToken(req)) {
371                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
372                 return
373         }
374
375         // Parse the request body.
376         var pr []PullRequest
377         r := json.NewDecoder(req.Body)
378         if err := r.Decode(&pr); err != nil {
379                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
380                 return
381         }
382
383         // We have a properly formatted pull list sent from the data
384         // manager.  Report success and send the list to the pull list
385         // manager for further handling.
386         resp.WriteHeader(http.StatusOK)
387         resp.Write([]byte(
388                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
389
390         plist := list.New()
391         for _, p := range pr {
392                 plist.PushBack(p)
393         }
394         pullq.ReplaceQueue(plist)
395 }
396
397 // TrashRequest consists of a block locator and it's Mtime
398 type TrashRequest struct {
399         Locator    string `json:"locator"`
400         BlockMtime int64  `json:"block_mtime"`
401 }
402
403 // TrashHandler processes /trash requests.
404 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
405         // Reject unauthorized requests.
406         if !IsDataManagerToken(GetApiToken(req)) {
407                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
408                 return
409         }
410
411         // Parse the request body.
412         var trash []TrashRequest
413         r := json.NewDecoder(req.Body)
414         if err := r.Decode(&trash); err != nil {
415                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
416                 return
417         }
418
419         // We have a properly formatted trash list sent from the data
420         // manager.  Report success and send the list to the trash work
421         // queue for further handling.
422         resp.WriteHeader(http.StatusOK)
423         resp.Write([]byte(
424                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
425
426         tlist := list.New()
427         for _, t := range trash {
428                 tlist.PushBack(t)
429         }
430         trashq.ReplaceQueue(tlist)
431 }
432
433 // ==============================
434 // GetBlock and PutBlock implement lower-level code for handling
435 // blocks by rooting through volumes connected to the local machine.
436 // Once the handler has determined that system policy permits the
437 // request, it calls these methods to perform the actual operation.
438 //
439 // TODO(twp): this code would probably be better located in the
440 // VolumeManager interface. As an abstraction, the VolumeManager
441 // should be the only part of the code that cares about which volume a
442 // block is stored on, so it should be responsible for figuring out
443 // which volume to check for fetching blocks, storing blocks, etc.
444 // ==============================
445
446 // GetBlock fetches and returns the block identified by "hash".
447 //
448 // On success, GetBlock returns a byte slice with the block data, and
449 // a nil error.
450 //
451 // If the block cannot be found on any volume, returns NotFoundError.
452 //
453 // If the block found does not have the correct MD5 hash, returns
454 // DiskHashError.
455 //
456 func GetBlock(hash string) ([]byte, error) {
457         // Attempt to read the requested hash from a keep volume.
458         errorToCaller := NotFoundError
459
460         for _, vol := range KeepVM.AllReadable() {
461                 buf, err := vol.Get(hash)
462                 if err != nil {
463                         // IsNotExist is an expected error and may be
464                         // ignored. All other errors are logged. In
465                         // any case we continue trying to read other
466                         // volumes. If all volumes report IsNotExist,
467                         // we return a NotFoundError.
468                         if !os.IsNotExist(err) {
469                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
470                         }
471                         continue
472                 }
473                 // Check the file checksum.
474                 //
475                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
476                 if filehash != hash {
477                         // TODO: Try harder to tell a sysadmin about
478                         // this.
479                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
480                                 vol, hash, filehash)
481                         errorToCaller = DiskHashError
482                         bufs.Put(buf)
483                         continue
484                 }
485                 if errorToCaller == DiskHashError {
486                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
487                                 vol, hash)
488                 }
489                 return buf, nil
490         }
491         return nil, errorToCaller
492 }
493
494 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
495 //
496 // PutBlock(block, hash)
497 //   Stores the BLOCK (identified by the content id HASH) in Keep.
498 //
499 //   The MD5 checksum of the block must be identical to the content id HASH.
500 //   If not, an error is returned.
501 //
502 //   PutBlock stores the BLOCK on the first Keep volume with free space.
503 //   A failure code is returned to the user only if all volumes fail.
504 //
505 //   On success, PutBlock returns nil.
506 //   On failure, it returns a KeepError with one of the following codes:
507 //
508 //   500 Collision
509 //          A different block with the same hash already exists on this
510 //          Keep server.
511 //   422 MD5Fail
512 //          The MD5 hash of the BLOCK does not match the argument HASH.
513 //   503 Full
514 //          There was not enough space left in any Keep volume to store
515 //          the object.
516 //   500 Fail
517 //          The object could not be stored for some other reason (e.g.
518 //          all writes failed). The text of the error message should
519 //          provide as much detail as possible.
520 //
521 func PutBlock(block []byte, hash string) (int, error) {
522         // Check that BLOCK's checksum matches HASH.
523         blockhash := fmt.Sprintf("%x", md5.Sum(block))
524         if blockhash != hash {
525                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
526                 return 0, RequestHashError
527         }
528
529         // If we already have this data, it's intact on disk, and we
530         // can update its timestamp, return success. If we have
531         // different data with the same hash, return failure.
532         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
533                 return n, err
534         }
535
536         // Choose a Keep volume to write to.
537         // If this volume fails, try all of the volumes in order.
538         if vol := KeepVM.NextWritable(); vol != nil {
539                 if err := vol.Put(hash, block); err == nil {
540                         return vol.Replication(), nil // success!
541                 }
542         }
543
544         writables := KeepVM.AllWritable()
545         if len(writables) == 0 {
546                 log.Print("No writable volumes.")
547                 return 0, FullError
548         }
549
550         allFull := true
551         for _, vol := range writables {
552                 err := vol.Put(hash, block)
553                 if err == nil {
554                         return vol.Replication(), nil // success!
555                 }
556                 if err != FullError {
557                         // The volume is not full but the
558                         // write did not succeed.  Report the
559                         // error and continue trying.
560                         allFull = false
561                         log.Printf("%s: Write(%s): %s", vol, hash, err)
562                 }
563         }
564
565         if allFull {
566                 log.Print("All volumes are full.")
567                 return 0, FullError
568         }
569         // Already logged the non-full errors.
570         return 0, GenericError
571 }
572
573 // CompareAndTouch returns the current replication level if one of the
574 // volumes already has the given content and it successfully updates
575 // the relevant block's modification time in order to protect it from
576 // premature garbage collection. Otherwise, it returns a non-nil
577 // error.
578 func CompareAndTouch(hash string, buf []byte) (int, error) {
579         var bestErr error = NotFoundError
580         for _, vol := range KeepVM.AllWritable() {
581                 if err := vol.Compare(hash, buf); err == CollisionError {
582                         // Stop if we have a block with same hash but
583                         // different content. (It will be impossible
584                         // to tell which one is wanted if we have
585                         // both, so there's no point writing it even
586                         // on a different volume.)
587                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
588                         return 0, err
589                 } else if os.IsNotExist(err) {
590                         // Block does not exist. This is the only
591                         // "normal" error: we don't log anything.
592                         continue
593                 } else if err != nil {
594                         // Couldn't open file, data is corrupt on
595                         // disk, etc.: log this abnormal condition,
596                         // and try the next volume.
597                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
598                         continue
599                 }
600                 if err := vol.Touch(hash); err != nil {
601                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
602                         bestErr = err
603                         continue
604                 }
605                 // Compare and Touch both worked --> done.
606                 return vol.Replication(), nil
607         }
608         return 0, bestErr
609 }
610
611 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
612
613 // IsValidLocator returns true if the specified string is a valid Keep locator.
614 //   When Keep is extended to support hash types other than MD5,
615 //   this should be updated to cover those as well.
616 //
617 func IsValidLocator(loc string) bool {
618         return validLocatorRe.MatchString(loc)
619 }
620
621 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
622
623 // GetApiToken returns the OAuth2 token from the Authorization
624 // header of a HTTP request, or an empty string if no matching
625 // token is found.
626 func GetApiToken(req *http.Request) string {
627         if auth, ok := req.Header["Authorization"]; ok {
628                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
629                         return match[1]
630                 }
631         }
632         return ""
633 }
634
635 // IsExpired returns true if the given Unix timestamp (expressed as a
636 // hexadecimal string) is in the past, or if timestampHex cannot be
637 // parsed as a hexadecimal string.
638 func IsExpired(timestampHex string) bool {
639         ts, err := strconv.ParseInt(timestampHex, 16, 0)
640         if err != nil {
641                 log.Printf("IsExpired: %s", err)
642                 return true
643         }
644         return time.Unix(ts, 0).Before(time.Now())
645 }
646
647 // CanDelete returns true if the user identified by apiToken is
648 // allowed to delete blocks.
649 func CanDelete(apiToken string) bool {
650         if apiToken == "" {
651                 return false
652         }
653         // Blocks may be deleted only when Keep has been configured with a
654         // data manager.
655         if IsDataManagerToken(apiToken) {
656                 return true
657         }
658         // TODO(twp): look up apiToken with the API server
659         // return true if is_admin is true and if the token
660         // has unlimited scope
661         return false
662 }
663
664 // IsDataManagerToken returns true if apiToken represents the data
665 // manager's token.
666 func IsDataManagerToken(apiToken string) bool {
667         return dataManagerToken != "" && apiToken == dataManagerToken
668 }