// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
package keepstore

import (
	"container/list"
	"context"
	"crypto/md5"
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"regexp"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.arvados.org/arvados.git/sdk/go/arvados"
	"git.arvados.org/arvados.git/sdk/go/ctxlog"
	"git.arvados.org/arvados.git/sdk/go/health"
	"git.arvados.org/arvados.git/sdk/go/httpserver"
	"github.com/gorilla/mux"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)
34 cluster *arvados.Cluster
35 logger logrus.FieldLogger
36 remoteProxy remoteProxy
38 volmgr *RRVolumeManager
43 // MakeRESTRouter returns a new router that forwards all Keep requests
44 // to the appropriate handlers.
45 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
47 Router: mux.NewRouter(),
49 logger: ctxlog.FromContext(ctx),
50 metrics: &nodeMetrics{reg: reg},
57 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
59 `/{hash:[0-9a-f]{32}}+{hints}`,
60 rtr.handleGET).Methods("GET", "HEAD")
62 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
64 // List all blocks stored here. Privileged client only.
65 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
66 // List blocks stored here whose hash has the given prefix.
67 // Privileged client only.
68 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
69 // Update timestamp on existing block. Privileged client only.
70 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72 // Internals/debugging info (runtime.MemStats)
73 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
75 // List volumes: path, device number, bytes used/avail.
76 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
78 // List mounts: UUID, readonly, tier, device ID, ...
79 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
80 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
81 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
83 // Replace the current pull queue.
84 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
86 // Replace the current trash queue.
87 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
89 // Untrash moves blocks from trash back into store
90 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
92 rtr.Handle("/_health/{check}", &health.Handler{
93 Token: cluster.ManagementToken,
97 // Any request which does not match any of these routes gets
99 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
101 rtr.metrics.setupBufferPoolMetrics(bufs)
102 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
103 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
108 // BadRequestHandler is a HandleFunc to address bad requests.
109 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
110 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
113 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
114 ctx, cancel := contextForResponse(context.TODO(), resp)
117 locator := req.URL.Path[1:]
118 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
119 rtr.remoteProxy.Get(ctx, resp, req, rtr.cluster, rtr.volmgr)
123 if rtr.cluster.Collections.BlobSigning {
124 locator := req.URL.Path[1:] // strip leading slash
125 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
126 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
131 // TODO: Probe volumes to check whether the block _might_
132 // exist. Some volumes/types could support a quick existence
133 // check without causing other operations to suffer. If all
134 // volumes support that, and assure us the block definitely
135 // isn't here, we can return 404 now instead of waiting for a
138 buf, err := getBufferWithContext(ctx, bufs, BlockSize)
140 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
145 size, err := GetBlock(ctx, rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
147 code := http.StatusInternalServerError
148 if err, ok := err.(*KeepError); ok {
151 http.Error(resp, err.Error(), code)
155 resp.Header().Set("Content-Length", strconv.Itoa(size))
156 resp.Header().Set("Content-Type", "application/octet-stream")
157 resp.Write(buf[:size])
// contextForResponse returns a child of parent that is cancelled when
// the returned CancelFunc is called or, if resp implements
// http.CloseNotifier, when resp reports that the client has gone
// away. Callers must call the CancelFunc (typically via defer) so the
// watcher goroutine exits.
func contextForResponse(parent context.Context, resp http.ResponseWriter) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(parent)
	// http.CloseNotifier is deprecated in favor of Request.Context,
	// but it is still honored for ResponseWriters that provide it.
	if cn, ok := resp.(http.CloseNotifier); ok {
		go func(closed <-chan bool) {
			select {
			case <-closed:
				cancel()
			case <-ctx.Done():
			}
		}(cn.CloseNotify())
	}
	return ctx, cancel
}
175 // Get a buffer from the pool -- but give up and return a non-nil
176 // error if ctx ends before we get a buffer.
177 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
178 bufReady := make(chan []byte)
180 bufReady <- bufs.Get(bufSize)
183 case buf := <-bufReady:
187 // Even if closeNotifier happened first, we
188 // need to keep waiting for our buf so we can
189 // return it to the pool.
192 return nil, ErrClientDisconnect
196 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
197 if !rtr.isSystemAuth(GetAPIToken(req)) {
198 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
201 hash := mux.Vars(req)["hash"]
202 vols := rtr.volmgr.AllWritable()
204 http.Error(resp, "no volumes", http.StatusNotFound)
208 for _, mnt := range vols {
209 err = mnt.Touch(hash)
217 case os.IsNotExist(err):
218 http.Error(resp, err.Error(), http.StatusNotFound)
220 http.Error(resp, err.Error(), http.StatusInternalServerError)
224 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
225 ctx, cancel := contextForResponse(context.TODO(), resp)
228 hash := mux.Vars(req)["hash"]
230 // Detect as many error conditions as possible before reading
231 // the body: avoid transmitting data that will not end up
232 // being written anyway.
234 if req.ContentLength == -1 {
235 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
239 if req.ContentLength > BlockSize {
240 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
244 if len(rtr.volmgr.AllWritable()) == 0 {
245 http.Error(resp, FullError.Error(), FullError.HTTPCode)
249 var wantStorageClasses []string
250 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
251 wantStorageClasses = strings.Split(hdr, ",")
252 for i, sc := range wantStorageClasses {
253 wantStorageClasses[i] = strings.TrimSpace(sc)
256 // none specified -- use configured default
257 for class, cfg := range rtr.cluster.StorageClasses {
259 wantStorageClasses = append(wantStorageClasses, class)
264 buf, err := getBufferWithContext(ctx, bufs, int(req.ContentLength))
266 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
270 _, err = io.ReadFull(req.Body, buf)
272 http.Error(resp, err.Error(), 500)
277 result, err := PutBlock(ctx, rtr.volmgr, buf, hash, wantStorageClasses)
281 code := http.StatusInternalServerError
282 if err, ok := err.(*KeepError); ok {
285 http.Error(resp, err.Error(), code)
289 // Success; add a size hint, sign the locator if possible, and
290 // return it to the client.
291 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
292 apiToken := GetAPIToken(req)
293 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
294 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
295 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
297 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
298 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
299 resp.Write([]byte(returnHash + "\n"))
302 // IndexHandler responds to "/index", "/index/{prefix}", and
303 // "/mounts/{uuid}/blocks" requests.
304 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
305 if !rtr.isSystemAuth(GetAPIToken(req)) {
306 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
310 prefix := mux.Vars(req)["prefix"]
313 prefix = req.Form.Get("prefix")
316 uuid := mux.Vars(req)["uuid"]
318 var vols []*VolumeMount
320 vols = rtr.volmgr.AllReadable()
321 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
322 http.Error(resp, "mount not found", http.StatusNotFound)
325 vols = []*VolumeMount{mnt}
328 for _, v := range vols {
329 if err := v.IndexTo(prefix, resp); err != nil {
330 // We can't send an error status/message to
331 // the client because IndexTo() might have
332 // already written body content. All we can do
333 // is log the error in our own logs.
335 // The client must notice the lack of trailing
336 // newline as an indication that the response
338 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
342 // An empty line at EOF is the only way the client can be
343 // assured the entire index was received.
344 resp.Write([]byte{'\n'})
347 // MountsHandler responds to "GET /mounts" requests.
348 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
349 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
351 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage; readNodeStatus fills it from
// bufs. The JSON tags give each field its external name.
type PoolStatus struct {
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	Cap   int    `json:"BuffersMax"`
	Len   int    `json:"BuffersInUse"`
}
// volumeStatusEnt describes one readable volume in the status report
// built by readNodeStatus. VolumeStats and InternalStats are omitted
// from the JSON when nil.
//
// NOTE(review): original lines 363 and 367 are absent from this
// listing. 367 is presumably the closing brace; 363 is presumably one
// more field (readNodeStatus's volumeStatusEnt literal also lacks a
// line, 425). Restore both from version control -- do not guess.
362 type volumeStatusEnt struct {
364 Status *VolumeStatus `json:",omitempty"`
365 VolumeStats *ioStats `json:",omitempty"`
366 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the document StatusHandler marshals to JSON for
// /status.json requests; readNodeStatus fills it in.
//
// NOTE(review): original lines 375-378 are absent from this listing.
// They hold the rest of this struct (possibly further fields, and the
// closing brace); restore them from version control.
370 type NodeStatus struct {
371 Volumes []*volumeStatusEnt
372 BufferPool PoolStatus
373 PullQueue WorkQueueStatus
374 TrashQueue WorkQueueStatus
381 var stLock sync.Mutex
383 // DebugHandler addresses /debug.json requests.
384 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
385 type debugStats struct {
386 MemStats runtime.MemStats
389 runtime.ReadMemStats(&ds.MemStats)
390 data, err := json.Marshal(&ds)
392 http.Error(resp, err.Error(), http.StatusInternalServerError)
398 // StatusHandler addresses /status.json requests.
399 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
401 rtr.readNodeStatus(&st)
402 data, err := json.Marshal(&st)
405 http.Error(resp, err.Error(), http.StatusInternalServerError)
// readNodeStatus overwrites *st with current statistics for the
// readable volumes, the buffer pool (bufs) and the pull and trash
// work queues. st.Volumes is reallocated only when its capacity is too
// small, so passing the same st on every call reuses the slice.
//
// NOTE(review): this listing lacks original lines 413, 417, 423, 425,
// 429, 430 and 436. Most look like closing braces, but 413 (just after
// the func line) and 425 (an entry in the volumeStatusEnt literal) may
// carry logic -- restore them from version control rather than
// guessing.
411 // populate the given NodeStatus struct with current values.
412 func (rtr *router) readNodeStatus(st *NodeStatus) {
414 vols := rtr.volmgr.AllReadable()
// Keep the existing backing array when it is already big enough.
415 if cap(st.Volumes) < len(vols) {
416 st.Volumes = make([]*volumeStatusEnt, len(vols))
418 st.Volumes = st.Volumes[:0]
419 for _, vol := range vols {
420 var internalStats interface{}
// This vol shadows the loop variable: it is the underlying Volume
// asserted to InternalStatser, in scope only inside this if.
421 if vol, ok := vol.Volume.(InternalStatser); ok {
422 internalStats = vol.InternalStats()
424 st.Volumes = append(st.Volumes, &volumeStatusEnt{
426 Status: vol.Status(),
427 InternalStats: internalStats,
// VolumeStats is never set (the line below is commented out), so
// its omitempty tag keeps it out of the JSON.
428 //VolumeStats: rtr.volmgr.VolumeStats(vol),
431 st.BufferPool.Alloc = bufs.Alloc()
432 st.BufferPool.Cap = bufs.Cap()
433 st.BufferPool.Len = bufs.Len()
434 st.PullQueue = getWorkQueueStatus(rtr.pullq)
435 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
438 // return a WorkQueueStatus for the given queue. If q is nil (which
439 // should never happen except in test suites), return a zero status
440 // value instead of crashing.
441 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
443 // This should only happen during tests.
444 return WorkQueueStatus{}
449 // handleDELETE processes DELETE requests.
451 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
452 // from all connected volumes.
454 // Only the Data Manager, or an Arvados admin with scope "all", are
455 // allowed to issue DELETE requests. If a DELETE request is not
456 // authenticated or is issued by a non-admin user, the server returns
457 // a PermissionError.
459 // Upon receiving a valid request from an authorized user,
460 // handleDELETE deletes all copies of the specified block on local
465 // If the requested blocks was not found on any volume, the response
466 // code is HTTP 404 Not Found.
468 // Otherwise, the response code is 200 OK, with a response body
469 // consisting of the JSON message
471 // {"copies_deleted":d,"copies_failed":f}
473 // where d and f are integers representing the number of blocks that
474 // were successfully and unsuccessfully deleted.
476 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
477 hash := mux.Vars(req)["hash"]
479 // Confirm that this user is an admin and has a token with unlimited scope.
480 var tok = GetAPIToken(req)
481 if tok == "" || !rtr.canDelete(tok) {
482 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
486 if !rtr.cluster.Collections.BlobTrash {
487 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
491 // Delete copies of this block from all available volumes.
492 // Report how many blocks were successfully deleted, and how
493 // many were found on writable volumes but not deleted.
495 Deleted int `json:"copies_deleted"`
496 Failed int `json:"copies_failed"`
498 for _, vol := range rtr.volmgr.AllWritable() {
499 if err := vol.Trash(hash); err == nil {
501 } else if os.IsNotExist(err) {
505 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
508 if result.Deleted == 0 && result.Failed == 0 {
509 resp.WriteHeader(http.StatusNotFound)
512 body, err := json.Marshal(result)
514 http.Error(resp, err.Error(), http.StatusInternalServerError)
/* PullHandler processes "PUT /pull" requests for the data manager.
   The request body is a JSON message containing a list of pull
   requests in the following format:

   [
      {
         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
         "servers":[
            "keep0.qr1hi.arvadosapi.com:25107",
            "keep1.qr1hi.arvadosapi.com:25108"
         ]
      },
      {
         "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
         "servers":[
            ...
         ]
      },
      ...
   ]

   Each pull request in the list consists of a block locator string
   and an ordered list of servers. Keepstore should try to fetch the
   block from each server in turn. An entry may also carry a
   "mount_uuid" naming the destination mount.

   If the request has not been sent by the Data Manager, return 401
   Unauthorized.

   If the JSON unmarshalling fails, return 400 Bad Request.
*/
// PullRequest consists of a block locator and an ordered list of
// servers to fetch it from, optionally with a destination mount.
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`

	// Destination mount, or "" for "anywhere"
	MountUUID string `json:"mount_uuid"`
}
562 // PullHandler processes "PUT /pull" requests for the data manager.
563 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
564 // Reject unauthorized requests.
565 if !rtr.isSystemAuth(GetAPIToken(req)) {
566 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
570 // Parse the request body.
572 r := json.NewDecoder(req.Body)
573 if err := r.Decode(&pr); err != nil {
574 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
578 // We have a properly formatted pull list sent from the data
579 // manager. Report success and send the list to the pull list
580 // manager for further handling.
581 resp.WriteHeader(http.StatusOK)
583 fmt.Sprintf("Received %d pull requests\n", len(pr))))
586 for _, p := range pr {
589 rtr.pullq.ReplaceQueue(plist)
// TrashRequest consists of a block locator and its Mtime, optionally
// restricted to one mount.
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`

	// Target mount, or "" for "everywhere"
	MountUUID string `json:"mount_uuid"`
}
601 // TrashHandler processes /trash requests.
602 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
603 // Reject unauthorized requests.
604 if !rtr.isSystemAuth(GetAPIToken(req)) {
605 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
609 // Parse the request body.
610 var trash []TrashRequest
611 r := json.NewDecoder(req.Body)
612 if err := r.Decode(&trash); err != nil {
613 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
617 // We have a properly formatted trash list sent from the data
618 // manager. Report success and send the list to the trash work
619 // queue for further handling.
620 resp.WriteHeader(http.StatusOK)
622 fmt.Sprintf("Received %d trash requests\n", len(trash))))
625 for _, t := range trash {
628 rtr.trashq.ReplaceQueue(tlist)
631 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
632 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
633 // Reject unauthorized requests.
634 if !rtr.isSystemAuth(GetAPIToken(req)) {
635 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
639 log := ctxlog.FromContext(req.Context())
640 hash := mux.Vars(req)["hash"]
642 if len(rtr.volmgr.AllWritable()) == 0 {
643 http.Error(resp, "No writable volumes", http.StatusNotFound)
647 var untrashedOn, failedOn []string
649 for _, vol := range rtr.volmgr.AllWritable() {
650 err := vol.Untrash(hash)
652 if os.IsNotExist(err) {
654 } else if err != nil {
655 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
656 failedOn = append(failedOn, vol.String())
658 log.Infof("Untrashed %v on volume %v", hash, vol.String())
659 untrashedOn = append(untrashedOn, vol.String())
663 if numNotFound == len(rtr.volmgr.AllWritable()) {
664 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
665 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
666 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
668 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
669 if len(failedOn) > 0 {
670 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
671 http.Error(resp, respBody, http.StatusInternalServerError)
673 fmt.Fprintln(resp, respBody)
678 // GetBlock and PutBlock implement lower-level code for handling
679 // blocks by rooting through volumes connected to the local machine.
680 // Once the handler has determined that system policy permits the
681 // request, it calls these methods to perform the actual operation.
683 // TODO(twp): this code would probably be better located in the
684 // VolumeManager interface. As an abstraction, the VolumeManager
685 // should be the only part of the code that cares about which volume a
686 // block is stored on, so it should be responsible for figuring out
687 // which volume to check for fetching blocks, storing blocks, etc.
689 // GetBlock fetches the block identified by "hash" into the provided
690 // buf, and returns the data size.
692 // If the block cannot be found on any volume, returns NotFoundError.
694 // If the block found does not have the correct MD5 hash, returns
697 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
698 log := ctxlog.FromContext(ctx)
700 // Attempt to read the requested hash from a keep volume.
701 errorToCaller := NotFoundError
703 for _, vol := range volmgr.AllReadable() {
704 size, err := vol.Get(ctx, hash, buf)
707 return 0, ErrClientDisconnect
711 // IsNotExist is an expected error and may be
712 // ignored. All other errors are logged. In
713 // any case we continue trying to read other
714 // volumes. If all volumes report IsNotExist,
715 // we return a NotFoundError.
716 if !os.IsNotExist(err) {
717 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
719 // If some volume returns a transient error, return it to the caller
720 // instead of "Not found" so it can retry.
721 if err == VolumeBusyError {
722 errorToCaller = err.(*KeepError)
726 // Check the file checksum.
727 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
728 if filehash != hash {
729 // TODO: Try harder to tell a sysadmin about
731 log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
732 errorToCaller = DiskHashError
735 if errorToCaller == DiskHashError {
736 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
740 return 0, errorToCaller
743 type putProgress struct {
744 classTodo map[string]bool
745 mountUsed map[*VolumeMount]bool
747 classDone map[string]int
750 // Number of distinct replicas stored. "2" can mean the block was
751 // stored on 2 different volumes with replication 1, or on 1 volume
752 // with replication 2.
753 func (pr putProgress) TotalReplication() string {
754 return strconv.Itoa(pr.totalReplication)
757 // Number of replicas satisfying each storage class, formatted like
758 // "default=2; special=1".
// handlePUT sends this string as the X-Keep-Storage-Classes-Confirmed
// response header.
//
// NOTE(review): original lines 760, 762-764 and 766-768 are absent
// from this listing; presumably they include the separator inserted
// between entries. Confirm it matches the "; " in the example above
// and what clients parsing the header expect. Also, pr.classDone is a
// map, so entry order varies between calls; sorting the class names
// would make the header deterministic.
759 func (pr putProgress) ClassReplication() string {
761 for k, v := range pr.classDone {
765 s += k + "=" + strconv.Itoa(v)
770 func (pr *putProgress) Add(mnt *VolumeMount) {
771 if pr.mountUsed[mnt] {
772 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt)
775 pr.mountUsed[mnt] = true
776 pr.totalReplication += mnt.Replication
777 for class := range mnt.StorageClasses {
778 pr.classDone[class] += mnt.Replication
779 delete(pr.classTodo, class)
783 func (pr *putProgress) Done() bool {
784 return len(pr.classTodo) == 0 && pr.totalReplication > 0
787 func (pr *putProgress) Want(mnt *VolumeMount) bool {
788 if pr.Done() || pr.mountUsed[mnt] {
791 if len(pr.classTodo) == 0 {
792 // none specified == "any"
795 for class := range mnt.StorageClasses {
796 if pr.classTodo[class] {
803 func newPutResult(classes []string) putProgress {
805 classTodo: make(map[string]bool, len(classes)),
806 classDone: map[string]int{},
807 mountUsed: map[*VolumeMount]bool{},
809 for _, c := range classes {
811 pr.classTodo[c] = true
817 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
819 // PutBlock(ctx, block, hash)
820 // Stores the BLOCK (identified by the content id HASH) in Keep.
822 // The MD5 checksum of the block must be identical to the content id HASH.
823 // If not, an error is returned.
825 // PutBlock stores the BLOCK on the first Keep volume with free space.
826 // A failure code is returned to the user only if all volumes fail.
828 // On success, PutBlock returns nil.
829 // On failure, it returns a KeepError with one of the following codes:
832 // A different block with the same hash already exists on this
835 // The MD5 hash of the BLOCK does not match the argument HASH.
837 // There was not enough space left in any Keep volume to store
840 // The object could not be stored for some other reason (e.g.
841 // all writes failed). The text of the error message should
842 // provide as much detail as possible.
844 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
845 log := ctxlog.FromContext(ctx)
847 // Check that BLOCK's checksum matches HASH.
848 blockhash := fmt.Sprintf("%x", md5.Sum(block))
849 if blockhash != hash {
850 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
851 return putProgress{}, RequestHashError
854 result := newPutResult(wantStorageClasses)
856 // If we already have this data, it's intact on disk, and we
857 // can update its timestamp, return success. If we have
858 // different data with the same hash, return failure.
859 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil {
862 if ctx.Err() != nil {
863 return result, ErrClientDisconnect
866 // Choose a Keep volume to write to.
867 // If this volume fails, try all of the volumes in order.
868 if mnt := volmgr.NextWritable(); mnt == nil || !result.Want(mnt) {
869 // fall through to "try all volumes" below
870 } else if err := mnt.Put(ctx, hash, block); err != nil {
871 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
878 if ctx.Err() != nil {
879 return putProgress{}, ErrClientDisconnect
882 writables := volmgr.AllWritable()
883 if len(writables) == 0 {
884 log.Error("no writable volumes")
885 return putProgress{}, FullError
889 for _, mnt := range writables {
890 if !result.Want(mnt) {
893 err := mnt.Put(ctx, hash, block)
894 if ctx.Err() != nil {
895 return result, ErrClientDisconnect
907 // The volume is not full but the
908 // write did not succeed. Report the
909 // error and continue trying.
911 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
915 if result.totalReplication > 0 {
916 // Some, but not all, of the storage classes were
917 // satisfied. This qualifies as success.
920 log.Error("all volumes with qualifying storage classes are full")
921 return putProgress{}, FullError
923 // Already logged the non-full errors.
924 return putProgress{}, GenericError
928 // CompareAndTouch looks for volumes where the given content already
929 // exists and its modification time can be updated (i.e., it is
930 // protected from garbage collection), and updates result accordingly.
931 // It returns when the result is Done() or all volumes have been
933 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
934 log := ctxlog.FromContext(ctx)
935 for _, mnt := range volmgr.AllWritable() {
936 if !result.Want(mnt) {
939 err := mnt.Compare(ctx, hash, buf)
940 if ctx.Err() != nil {
942 } else if err == CollisionError {
943 // Stop if we have a block with same hash but
944 // different content. (It will be impossible
945 // to tell which one is wanted if we have
946 // both, so there's no point writing it even
947 // on a different volume.)
948 log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
949 return CollisionError
950 } else if os.IsNotExist(err) {
951 // Block does not exist. This is the only
952 // "normal" error: we don't log anything.
954 } else if err != nil {
955 // Couldn't open file, data is corrupt on
956 // disk, etc.: log this abnormal condition,
957 // and try the next volume.
958 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
961 if err := mnt.Touch(hash); err != nil {
962 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
965 // Compare and Touch both worked --> done.
// validLocatorRe matches exactly 32 lowercase hexadecimal digits (a
// bare MD5 hash, with no size or other hints).
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator, i.e. a bare lowercase MD5 hex digest.
//
// When Keep is extended to support hash types other than MD5, this
// should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
// authRe matches an Authorization header value using the OAuth2 or
// Bearer scheme; the second submatch is the token.
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the token from an "OAuth2 <token>" or
// "Bearer <token>" Authorization header of req, or "" if there is no
// such header.
func GetAPIToken(req *http.Request) string {
	// Header.Get yields "" when the key is absent or has no values,
	// whereas indexing req.Header["Authorization"][0] would panic on a
	// present-but-empty value slice.
	if match := authRe.FindStringSubmatch(req.Header.Get("Authorization")); match != nil {
		return match[2]
	}
	return ""
}
998 // canDelete returns true if the user identified by apiToken is
999 // allowed to delete blocks.
1000 func (rtr *router) canDelete(apiToken string) bool {
1004 // Blocks may be deleted only when Keep has been configured with a
1006 if rtr.isSystemAuth(apiToken) {
1009 // TODO(twp): look up apiToken with the API server
1010 // return true if is_admin is true and if the token
1011 // has unlimited scope
1015 // isSystemAuth returns true if the given token is allowed to perform
1016 // system level actions like deleting data.
1017 func (rtr *router) isSystemAuth(token string) bool {
1018 return token != "" && token == rtr.cluster.SystemRootToken