1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
23 "git.arvados.org/arvados.git/sdk/go/arvados"
24 "git.arvados.org/arvados.git/sdk/go/ctxlog"
25 "git.arvados.org/arvados.git/sdk/go/health"
26 "git.arvados.org/arvados.git/sdk/go/httpserver"
27 "github.com/gorilla/mux"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/sirupsen/logrus"
34 cluster *arvados.Cluster
35 logger logrus.FieldLogger
36 remoteProxy remoteProxy
38 volmgr *RRVolumeManager
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47 Router: mux.NewRouter(),
49 logger: ctxlog.FromContext(ctx),
50 metrics: &nodeMetrics{reg: reg},
57 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59 `/{hash:[0-9a-f]{32}}+{hints}`,
60 rtr.handleGET).Methods("GET", "HEAD")
62 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64 // List all blocks stored here. Privileged client only.
65 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66 // List blocks stored here whose hash has the given prefix.
67 // Privileged client only.
68 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69 // Update timestamp on existing block. Privileged client only.
70 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72 // Internals/debugging info (runtime.MemStats)
73 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
75 // List volumes: path, device number, bytes used/avail.
76 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
78 // List mounts: UUID, readonly, tier, device ID, ...
79 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
83 // Replace the current pull queue.
84 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
86 // Replace the current trash queue.
87 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
89 // Untrash moves blocks from trash back into store
90 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
92 rtr.Handle("/_health/{check}", &health.Handler{
93 Token: cluster.ManagementToken,
97 // Any request which does not match any of these routes gets
99 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
101 rtr.metrics.setupBufferPoolMetrics(bufs)
102 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114 ctx, cancel := contextForResponse(context.TODO(), resp)
117 locator := req.URL.Path[1:]
118 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
123 if rtr.cluster.Collections.BlobSigning {
124 locator := req.URL.Path[1:] // strip leading slash
125 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
131 // TODO: Probe volumes to check whether the block _might_
132 // exist. Some volumes/types could support a quick existence
133 // check without causing other operations to suffer. If all
134 // volumes support that, and assure us the block definitely
135 // isn't here, we can return 404 now instead of waiting for a
138 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
140 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
145 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
147 code := http.StatusInternalServerError
148 if err, ok := err.(*KeepError); ok {
151 http.Error(resp, err.Error(), code)
155 resp.Header().Set("Content-Length", strconv.Itoa(size))
156 resp.Header().Set("Content-Type", "application/octet-stream")
157 resp.Write(buf[:size])
160 // Return a new context that gets cancelled by resp's CloseNotifier.
161 func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
162 ctx, cancel := context.WithCancel(parent)
163 if cn, ok := resp.(http.CloseNotifier); ok {
164 go func(c <-chan bool) {
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178 bufReady := make(chan []byte)
180 bufReady <- bufs.Get(bufSize)
183 case buf := <-bufReady:
187 // Even if closeNotifier happened first, we
188 // need to keep waiting for our buf so we can
189 // return it to the pool.
192 return nil, ErrClientDisconnect
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197 if !rtr.isSystemAuth(GetAPIToken(req)) {
198 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
201 hash := mux.Vars(req)["hash"]
202 vols := rtr.volmgr.AllWritable()
204 http.Error(resp, "no volumes", http.StatusNotFound)
208 for _, mnt := range vols {
209 err = mnt.Touch(hash)
217 case os.IsNotExist(err):
218 http.Error(resp, err.Error(), http.StatusNotFound)
220 http.Error(resp, err.Error(), http.StatusInternalServerError)
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225 ctx, cancel := contextForResponse(context.TODO(), resp)
228 hash := mux.Vars(req)["hash"]
230 // Detect as many error conditions as possible before reading
231 // the body: avoid transmitting data that will not end up
232 // being written anyway.
234 if req.ContentLength == -1 {
235 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
239 if req.ContentLength > BlockSize {
240 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
244 if len(rtr.volmgr.AllWritable()) == 0 {
245 http.Error(resp, FullError.Error(), FullError.HTTPCode)
249 var wantStorageClasses []string
250 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
251 wantStorageClasses = strings.Split(hdr, ",")
252 for i, sc := range wantStorageClasses {
253 wantStorageClasses[i] = strings.TrimSpace(sc)
257 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
259 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
263 _, err = io.ReadFull(req.Body, buf)
265 http.Error(resp, err.Error(), 500)
270 result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
274 code := http.StatusInternalServerError
275 if err, ok := err.(*KeepError); ok {
278 http.Error(resp, err.Error(), code)
282 // Success; add a size hint, sign the locator if possible, and
283 // return it to the client.
284 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
285 apiToken := GetAPIToken(req)
286 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
287 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
288 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
290 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
291 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
292 resp.Write([]byte(returnHash + "\n"))
295 // handleIndex responds to "/index", "/index/{prefix}", and
296 // "/mounts/{uuid}/blocks" requests.
297 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
298 if !rtr.isSystemAuth(GetAPIToken(req)) {
299 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
303 prefix := mux.Vars(req)["prefix"]
306 prefix = req.Form.Get("prefix")
309 uuid := mux.Vars(req)["uuid"]
311 var vols []*VolumeMount
313 vols = rtr.volmgr.AllReadable()
314 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
315 http.Error(resp, "mount not found", http.StatusNotFound)
318 vols = []*VolumeMount{mnt}
321 for _, v := range vols {
322 if err := v.IndexTo(prefix, resp); err != nil {
323 // We can't send an error status/message to
324 // the client because IndexTo() might have
325 // already written body content. All we can do
326 // is log the error in our own logs.
328 // The client must notice the lack of trailing
329 // newline as an indication that the response
331 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
335 // An empty line at EOF is the only way the client can be
336 // assured the entire index was received.
337 resp.Write([]byte{'\n'})
340 // MountsHandler responds to "GET /mounts" requests.
341 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
342 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
344 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
349 type PoolStatus struct {
350 Alloc uint64 `json:"BytesAllocatedCumulative"`
351 Cap int `json:"BuffersMax"`
352 Len int `json:"BuffersInUse"`
355 type volumeStatusEnt struct {
357 Status *VolumeStatus `json:",omitempty"`
358 VolumeStats *ioStats `json:",omitempty"`
359 InternalStats interface{} `json:",omitempty"`
363 type NodeStatus struct {
364 Volumes []*volumeStatusEnt
365 BufferPool PoolStatus
366 PullQueue WorkQueueStatus
367 TrashQueue WorkQueueStatus
374 var stLock sync.Mutex
376 // DebugHandler addresses /debug.json requests.
377 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
378 type debugStats struct {
379 MemStats runtime.MemStats
382 runtime.ReadMemStats(&ds.MemStats)
383 data, err := json.Marshal(&ds)
385 http.Error(resp, err.Error(), http.StatusInternalServerError)
391 // StatusHandler addresses /status.json requests.
392 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
394 rtr.readNodeStatus(&st)
395 data, err := json.Marshal(&st)
398 http.Error(resp, err.Error(), http.StatusInternalServerError)
404 // populate the given NodeStatus struct with current values.
405 func (rtr *router) readNodeStatus(st *NodeStatus) {
407 vols := rtr.volmgr.AllReadable()
408 if cap(st.Volumes) < len(vols) {
409 st.Volumes = make([]*volumeStatusEnt, len(vols))
411 st.Volumes = st.Volumes[:0]
412 for _, vol := range vols {
413 var internalStats interface{}
414 if vol, ok := vol.Volume.(InternalStatser); ok {
415 internalStats = vol.InternalStats()
417 st.Volumes = append(st.Volumes, &volumeStatusEnt{
419 Status: vol.Status(),
420 InternalStats: internalStats,
421 //VolumeStats: rtr.volmgr.VolumeStats(vol),
424 st.BufferPool.Alloc = bufs.Alloc()
425 st.BufferPool.Cap = bufs.Cap()
426 st.BufferPool.Len = bufs.Len()
427 st.PullQueue = getWorkQueueStatus(rtr.pullq)
428 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
431 // return a WorkQueueStatus for the given queue. If q is nil (which
432 // should never happen except in test suites), return a zero status
433 // value instead of crashing.
434 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
436 // This should only happen during tests.
437 return WorkQueueStatus{}
442 // handleDELETE processes DELETE requests.
444 // DELETE /{hash:[0-9a-f]{32}} will delete the block with the specified hash
445 // from all connected volumes.
447 // Only the Data Manager, or an Arvados admin with scope "all", are
448 // allowed to issue DELETE requests. If a DELETE request is not
449 // authenticated or is issued by a non-admin user, the server returns
450 // a PermissionError.
452 // Upon receiving a valid request from an authorized user,
453 // handleDELETE deletes all copies of the specified block on local
458 // If the requested block was not found on any volume, the response
459 // code is HTTP 404 Not Found.
461 // Otherwise, the response code is 200 OK, with a response body
462 // consisting of the JSON message
464 // {"copies_deleted":d,"copies_failed":f}
466 // where d and f are integers representing the number of blocks that
467 // were successfully and unsuccessfully deleted.
469 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
470 hash := mux.Vars(req)["hash"]
472 // Confirm that this user is an admin and has a token with unlimited scope.
473 var tok = GetAPIToken(req)
474 if tok == "" || !rtr.canDelete(tok) {
475 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
479 if !rtr.cluster.Collections.BlobTrash {
480 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
484 // Delete copies of this block from all available volumes.
485 // Report how many blocks were successfully deleted, and how
486 // many were found on writable volumes but not deleted.
488 Deleted int `json:"copies_deleted"`
489 Failed int `json:"copies_failed"`
491 for _, vol := range rtr.volmgr.AllWritable() {
492 if err := vol.Trash(hash); err == nil {
494 } else if os.IsNotExist(err) {
498 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
501 if result.Deleted == 0 && result.Failed == 0 {
502 resp.WriteHeader(http.StatusNotFound)
505 body, err := json.Marshal(result)
507 http.Error(resp, err.Error(), http.StatusInternalServerError)
513 /* PullHandler processes "PUT /pull" requests for the data manager.
514 The request body is a JSON message containing a list of pull
515 requests in the following format:
519 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
521 "keep0.qr1hi.arvadosapi.com:25107",
522 "keep1.qr1hi.arvadosapi.com:25108"
526 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
536 Each pull request in the list consists of a block locator string
537 and an ordered list of servers. Keepstore should try to fetch the
538 block from each server in turn.
540 If the request has not been sent by the Data Manager, return 401
543 If the JSON unmarshalling fails, return 400 Bad Request.
546 // PullRequest consists of a block locator and an ordered list of servers
547 type PullRequest struct {
548 Locator string `json:"locator"`
549 Servers []string `json:"servers"`
551 // Destination mount, or "" for "anywhere"
552 MountUUID string `json:"mount_uuid"`
555 // handlePull processes "PUT /pull" requests for the data manager.
556 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
557 // Reject unauthorized requests.
558 if !rtr.isSystemAuth(GetAPIToken(req)) {
559 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
563 // Parse the request body.
565 r := json.NewDecoder(req.Body)
566 if err := r.Decode(&pr); err != nil {
567 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
571 // We have a properly formatted pull list sent from the data
572 // manager. Report success and send the list to the pull list
573 // manager for further handling.
574 resp.WriteHeader(http.StatusOK)
576 fmt.Sprintf("Received %d pull requests\n", len(pr))))
579 for _, p := range pr {
582 rtr.pullq.ReplaceQueue(plist)
585 // TrashRequest consists of a block locator and its Mtime
586 type TrashRequest struct {
587 Locator string `json:"locator"`
588 BlockMtime int64 `json:"block_mtime"`
590 // Target mount, or "" for "everywhere"
591 MountUUID string `json:"mount_uuid"`
594 // handleTrash processes "PUT /trash" requests.
595 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
596 // Reject unauthorized requests.
597 if !rtr.isSystemAuth(GetAPIToken(req)) {
598 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
602 // Parse the request body.
603 var trash []TrashRequest
604 r := json.NewDecoder(req.Body)
605 if err := r.Decode(&trash); err != nil {
606 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
610 // We have a properly formatted trash list sent from the data
611 // manager. Report success and send the list to the trash work
612 // queue for further handling.
613 resp.WriteHeader(http.StatusOK)
615 fmt.Sprintf("Received %d trash requests\n", len(trash))))
618 for _, t := range trash {
621 rtr.trashq.ReplaceQueue(tlist)
624 // handleUntrash processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
625 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
626 // Reject unauthorized requests.
627 if !rtr.isSystemAuth(GetAPIToken(req)) {
628 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
632 log := ctxlog.FromContext(req.Context())
633 hash := mux.Vars(req)["hash"]
635 if len(rtr.volmgr.AllWritable()) == 0 {
636 http.Error(resp, "No writable volumes", http.StatusNotFound)
640 var untrashedOn, failedOn []string
642 for _, vol := range rtr.volmgr.AllWritable() {
643 err := vol.Untrash(hash)
645 if os.IsNotExist(err) {
647 } else if err != nil {
648 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
649 failedOn = append(failedOn, vol.String())
651 log.Infof("Untrashed %v on volume %v", hash, vol.String())
652 untrashedOn = append(untrashedOn, vol.String())
656 if numNotFound == len(rtr.volmgr.AllWritable()) {
657 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
658 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
659 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
661 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
662 if len(failedOn) > 0 {
663 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
664 http.Error(resp, respBody, http.StatusInternalServerError)
666 fmt.Fprintln(resp, respBody)
671 // GetBlock and PutBlock implement lower-level code for handling
672 // blocks by rooting through volumes connected to the local machine.
673 // Once the handler has determined that system policy permits the
674 // request, it calls these methods to perform the actual operation.
676 // TODO(twp): this code would probably be better located in the
677 // VolumeManager interface. As an abstraction, the VolumeManager
678 // should be the only part of the code that cares about which volume a
679 // block is stored on, so it should be responsible for figuring out
680 // which volume to check for fetching blocks, storing blocks, etc.
682 // GetBlock fetches the block identified by "hash" into the provided
683 // buf, and returns the data size.
685 // If the block cannot be found on any volume, returns NotFoundError.
687 // If the block found does not have the correct MD5 hash, returns
690 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
691 log := ctxlog.FromContext(ctx)
693 // Attempt to read the requested hash from a keep volume.
694 errorToCaller := NotFoundError
696 for _, vol := range volmgr.AllReadable() {
697 size, err := vol.Get(ctx, hash, buf)
700 return 0, ErrClientDisconnect
704 // IsNotExist is an expected error and may be
705 // ignored. All other errors are logged. In
706 // any case we continue trying to read other
707 // volumes. If all volumes report IsNotExist,
708 // we return a NotFoundError.
709 if !os.IsNotExist(err) {
710 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
712 // If some volume returns a transient error, return it to the caller
713 // instead of "Not found" so it can retry.
714 if err == VolumeBusyError {
715 errorToCaller = err.(*KeepError)
719 // Check the file checksum.
720 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
721 if filehash != hash {
722 // TODO: Try harder to tell a sysadmin about
724 log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
725 errorToCaller = DiskHashError
728 if errorToCaller == DiskHashError {
729 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
733 return 0, errorToCaller
736 type putProgress struct {
737 classTodo map[string]bool
738 mountUsed map[*VolumeMount]bool
740 classDone map[string]int
743 // Number of distinct replicas stored. "2" can mean the block was
744 // stored on 2 different volumes with replication 1, or on 1 volume
745 // with replication 2.
746 func (pr putProgress) TotalReplication() string {
747 return strconv.Itoa(pr.totalReplication)
750 // Number of replicas satisfying each storage class, formatted like
751 // "default=2; special=1".
752 func (pr putProgress) ClassReplication() string {
754 for k, v := range pr.classDone {
758 s += k + "=" + strconv.Itoa(v)
763 func (pr *putProgress) Add(mnt *VolumeMount) {
764 if pr.mountUsed[mnt] {
765 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
768 pr.mountUsed[mnt] = true
769 pr.totalReplication += mnt.Replication
770 for class := range mnt.StorageClasses {
771 pr.classDone[class] += mnt.Replication
772 delete(pr.classTodo, class)
776 func (pr *putProgress) Done() bool {
777 return len(pr.classTodo) == 0 && pr.totalReplication > 0
780 func (pr *putProgress) Want(mnt *VolumeMount) bool {
781 if pr.Done() || pr.mountUsed[mnt] {
784 if len(pr.classTodo) == 0 {
785 // none specified == "any"
788 for class := range mnt.StorageClasses {
789 if pr.classTodo[class] {
796 func newPutResult(classes []string) putProgress {
798 classTodo: make(map[string]bool, len(classes)),
799 classDone: map[string]int{},
800 mountUsed: map[*VolumeMount]bool{},
802 for _, c := range classes {
804 pr.classTodo[c] = true
810 // PutBlock stores the BLOCK (identified by the content id HASH) in Keep.
812 // PutBlock(ctx, volmgr, block, hash, wantStorageClasses)
813 // Stores the BLOCK (identified by the content id HASH) in Keep.
815 // The MD5 checksum of the block must be identical to the content id HASH.
816 // If not, an error is returned.
818 // PutBlock stores the BLOCK on the first Keep volume with free space.
819 // A failure code is returned to the user only if all volumes fail.
821 // On success, PutBlock returns the put progress and a nil error.
822 // On failure, it returns a KeepError with one of the following codes:
825 // A different block with the same hash already exists on this
828 // The MD5 hash of the BLOCK does not match the argument HASH.
830 // There was not enough space left in any Keep volume to store
833 // The object could not be stored for some other reason (e.g.
834 // all writes failed). The text of the error message should
835 // provide as much detail as possible.
837 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
838 log := ctxlog.FromContext(ctx)
840 // Check that BLOCK's checksum matches HASH.
841 blockhash := fmt.Sprintf("%x", md5.Sum(block))
842 if blockhash != hash {
843 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
844 return putProgress{}, RequestHashError
847 result := newPutResult(wantStorageClasses)
849 // If we already have this data, it's intact on disk, and we
850 // can update its timestamp, return success. If we have
851 // different data with the same hash, return failure.
852 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
855 if ctx.Err() != nil {
856 return result, ErrClientDisconnect
859 // Choose a Keep volume to write to.
860 // If this volume fails, try all of the volumes in order.
861 if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
862 // fall through to "try all volumes" below
863 } else if err := mnt.Put(ctx, hash, block); err != nil {
864 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
871 if ctx.Err() != nil {
872 return putProgress{}, ErrClientDisconnect
875 writables := volmgr.AllWritable()
876 if len(writables) == 0 {
877 log.Error("no writable volumes")
878 return putProgress{}, FullError
882 for _, mnt := range writables {
883 if !result.Want(mnt) {
886 err := mnt.Put(ctx, hash, block)
887 if ctx.Err() != nil {
888 return result, ErrClientDisconnect
900 // The volume is not full but the
901 // write did not succeed. Report the
902 // error and continue trying.
904 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
908 if result.totalReplication > 0 {
909 // Some, but not all, of the storage classes were
910 // satisfied. This qualifies as success.
913 log.Error("all volumes with qualifying storage classes are full")
914 return putProgress{}, FullError
916 // Already logged the non-full errors.
917 return putProgress{}, GenericError
921 // CompareAndTouch looks for volumes where the given content already
922 // exists and its modification time can be updated (i.e., it is
923 // protected from garbage collection), and updates result accordingly.
924 // It returns when the result is Done() or all volumes have been
926 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
927 log := ctxlog.FromContext(ctx)
928 for _, mnt := range volmgr.AllWritable() {
929 if !result.Want(mnt) {
932 err := mnt.Compare(ctx, hash, buf)
933 if ctx.Err() != nil {
935 } else if err == CollisionError {
936 // Stop if we have a block with same hash but
937 // different content. (It will be impossible
938 // to tell which one is wanted if we have
939 // both, so there's no point writing it even
940 // on a different volume.)
941 log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
942 return CollisionError
943 } else if os.IsNotExist(err) {
944 // Block does not exist. This is the only
945 // "normal" error: we don't log anything.
947 } else if err != nil {
948 // Couldn't open file, data is corrupt on
949 // disk, etc.: log this abnormal condition,
950 // and try the next volume.
951 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
954 if err := mnt.Touch(hash); err != nil {
955 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
958 // Compare and Touch both worked --> done.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator reports whether loc is a valid Keep locator, i.e.
// exactly 32 lowercase hexadecimal digits (an MD5 hash).
// When Keep is extended to support hash types other than MD5,
// this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRe.FindStringIndex(loc) != nil
}
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the token from the Authorization header of an
// HTTP request when the header uses the "OAuth2" or "Bearer" scheme,
// or an empty string if there is no such header or it does not match.
// Only the first Authorization value is examined.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" both for a missing key and for a key
	// mapped to an empty slice, so there is no auth[0] index that
	// could panic.
	match := authRe.FindStringSubmatch(req.Header.Get("Authorization"))
	if match == nil {
		return ""
	}
	return match[2]
}
991 // canDelete returns true if the user identified by apiToken is
992 // allowed to delete blocks.
993 func (rtr *router) canDelete(apiToken string) bool {
997 // Blocks may be deleted only when Keep has been configured with a
999 if rtr.isSystemAuth(apiToken) {
1002 // TODO(twp): look up apiToken with the API server
1003 // return true if is_admin is true and if the token
1004 // has unlimited scope
1008 // isSystemAuth returns true if the given token is allowed to perform
1009 // system level actions like deleting data.
1010 func (rtr *router) isSystemAuth(token string) bool {
1011 return token != "" && token == rtr.cluster.SystemRootToken