1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.arvados.org/arvados.git/lib/cmd"
25 "git.arvados.org/arvados.git/sdk/go/arvados"
26 "git.arvados.org/arvados.git/sdk/go/ctxlog"
27 "git.arvados.org/arvados.git/sdk/go/health"
28 "git.arvados.org/arvados.git/sdk/go/httpserver"
29 "github.com/gorilla/mux"
30 "github.com/prometheus/client_golang/prometheus"
31 "github.com/sirupsen/logrus"
36 cluster *arvados.Cluster
37 logger logrus.FieldLogger
38 remoteProxy remoteProxy
40 volmgr *RRVolumeManager
45 // MakeRESTRouter returns a new router that forwards all Keep requests
46 // to the appropriate handlers.
47 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
49 Router: mux.NewRouter(),
51 logger: ctxlog.FromContext(ctx),
52 metrics: &nodeMetrics{reg: reg},
59 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
61 `/{hash:[0-9a-f]{32}}+{hints}`,
62 rtr.handleGET).Methods("GET", "HEAD")
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
65 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
66 // List all blocks stored here. Privileged client only.
67 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
68 // List blocks stored here whose hash has the given prefix.
69 // Privileged client only.
70 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
71 // Update timestamp on existing block. Privileged client only.
72 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
74 // Internals/debugging info (runtime.MemStats)
75 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
77 // List volumes: path, device number, bytes used/avail.
78 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
80 // List mounts: UUID, readonly, tier, device ID, ...
81 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
82 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
83 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
85 // Replace the current pull queue.
86 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
88 // Replace the current trash queue.
89 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
91 // Untrash moves blocks from trash back into store
92 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
94 rtr.Handle("/_health/{check}", &health.Handler{
95 Token: cluster.ManagementToken,
99 // Any request which does not match any of these routes gets
101 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
103 rtr.metrics.setupBufferPoolMetrics(bufs)
104 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
105 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
110 // BadRequestHandler is a HandleFunc to address bad requests.
111 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
112 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
115 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
116 locator := req.URL.Path[1:]
117 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
118 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
122 if rtr.cluster.Collections.BlobSigning {
123 locator := req.URL.Path[1:] // strip leading slash
124 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
125 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
130 // TODO: Probe volumes to check whether the block _might_
131 // exist. Some volumes/types could support a quick existence
132 // check without causing other operations to suffer. If all
133 // volumes support that, and assure us the block definitely
134 // isn't here, we can return 404 now instead of waiting for a
137 buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
139 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
144 size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
146 code := http.StatusInternalServerError
147 if err, ok := err.(*KeepError); ok {
150 http.Error(resp, err.Error(), code)
154 resp.Header().Set("Content-Length", strconv.Itoa(size))
155 resp.Header().Set("Content-Type", "application/octet-stream")
156 resp.Write(buf[:size])
159 // Get a buffer from the pool -- but give up and return a non-nil
160 // error if ctx ends before we get a buffer.
161 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
162 bufReady := make(chan []byte)
164 bufReady <- bufs.Get(bufSize)
167 case buf := <-bufReady:
171 // Even if closeNotifier happened first, we
172 // need to keep waiting for our buf so we can
173 // return it to the pool.
176 return nil, ErrClientDisconnect
180 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
181 if !rtr.isSystemAuth(GetAPIToken(req)) {
182 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
185 hash := mux.Vars(req)["hash"]
186 vols := rtr.volmgr.AllWritable()
188 http.Error(resp, "no volumes", http.StatusNotFound)
192 for _, mnt := range vols {
193 err = mnt.Touch(hash)
201 case os.IsNotExist(err):
202 http.Error(resp, err.Error(), http.StatusNotFound)
204 http.Error(resp, err.Error(), http.StatusInternalServerError)
208 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
209 hash := mux.Vars(req)["hash"]
211 // Detect as many error conditions as possible before reading
212 // the body: avoid transmitting data that will not end up
213 // being written anyway.
215 if req.ContentLength == -1 {
216 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
220 if req.ContentLength > BlockSize {
221 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
225 if len(rtr.volmgr.AllWritable()) == 0 {
226 http.Error(resp, FullError.Error(), FullError.HTTPCode)
230 var wantStorageClasses []string
231 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
232 wantStorageClasses = strings.Split(hdr, ",")
233 for i, sc := range wantStorageClasses {
234 wantStorageClasses[i] = strings.TrimSpace(sc)
237 // none specified -- use configured default
238 for class, cfg := range rtr.cluster.StorageClasses {
240 wantStorageClasses = append(wantStorageClasses, class)
245 buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
247 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
251 _, err = io.ReadFull(req.Body, buf)
253 http.Error(resp, err.Error(), 500)
258 result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
262 code := http.StatusInternalServerError
263 if err, ok := err.(*KeepError); ok {
266 http.Error(resp, err.Error(), code)
270 // Success; add a size hint, sign the locator if possible, and
271 // return it to the client.
272 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
273 apiToken := GetAPIToken(req)
274 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
275 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
276 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
278 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
279 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
280 resp.Write([]byte(returnHash + "\n"))
283 // IndexHandler responds to "/index", "/index/{prefix}", and
284 // "/mounts/{uuid}/blocks" requests.
285 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
286 if !rtr.isSystemAuth(GetAPIToken(req)) {
287 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
291 prefix := mux.Vars(req)["prefix"]
294 prefix = req.Form.Get("prefix")
297 uuid := mux.Vars(req)["uuid"]
299 var vols []*VolumeMount
301 vols = rtr.volmgr.AllReadable()
302 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
303 http.Error(resp, "mount not found", http.StatusNotFound)
306 vols = []*VolumeMount{mnt}
309 for _, v := range vols {
310 if err := v.IndexTo(prefix, resp); err != nil {
311 // We can't send an error status/message to
312 // the client because IndexTo() might have
313 // already written body content. All we can do
314 // is log the error in our own logs.
316 // The client must notice the lack of trailing
317 // newline as an indication that the response
319 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
323 // An empty line at EOF is the only way the client can be
324 // assured the entire index was received.
325 resp.Write([]byte{'\n'})
328 // MountsHandler responds to "GET /mounts" requests.
329 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
330 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
332 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer-pool usage for /status.json.
type PoolStatus struct {
	Alloc uint64 `json:"BytesAllocatedCumulative"`
	Cap   int    `json:"BuffersMax"`
	Len   int    `json:"BuffersInUse"`
}
343 type volumeStatusEnt struct {
345 Status *VolumeStatus `json:",omitempty"`
346 VolumeStats *ioStats `json:",omitempty"`
347 InternalStats interface{} `json:",omitempty"`
351 type NodeStatus struct {
352 Volumes []*volumeStatusEnt
353 BufferPool PoolStatus
354 PullQueue WorkQueueStatus
355 TrashQueue WorkQueueStatus
362 var stLock sync.Mutex
364 // DebugHandler addresses /debug.json requests.
365 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
366 type debugStats struct {
367 MemStats runtime.MemStats
370 runtime.ReadMemStats(&ds.MemStats)
371 data, err := json.Marshal(&ds)
373 http.Error(resp, err.Error(), http.StatusInternalServerError)
379 // StatusHandler addresses /status.json requests.
380 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
382 rtr.readNodeStatus(&st)
383 data, err := json.Marshal(&st)
386 http.Error(resp, err.Error(), http.StatusInternalServerError)
392 // populate the given NodeStatus struct with current values.
393 func (rtr *router) readNodeStatus(st *NodeStatus) {
394 st.Version = strings.SplitN(cmd.Version.String(), " ", 2)[0]
395 vols := rtr.volmgr.AllReadable()
396 if cap(st.Volumes) < len(vols) {
397 st.Volumes = make([]*volumeStatusEnt, len(vols))
399 st.Volumes = st.Volumes[:0]
400 for _, vol := range vols {
401 var internalStats interface{}
402 if vol, ok := vol.Volume.(InternalStatser); ok {
403 internalStats = vol.InternalStats()
405 st.Volumes = append(st.Volumes, &volumeStatusEnt{
407 Status: vol.Status(),
408 InternalStats: internalStats,
409 //VolumeStats: rtr.volmgr.VolumeStats(vol),
412 st.BufferPool.Alloc = bufs.Alloc()
413 st.BufferPool.Cap = bufs.Cap()
414 st.BufferPool.Len = bufs.Len()
415 st.PullQueue = getWorkQueueStatus(rtr.pullq)
416 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
419 // return a WorkQueueStatus for the given queue. If q is nil (which
420 // should never happen except in test suites), return a zero status
421 // value instead of crashing.
422 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
424 // This should only happen during tests.
425 return WorkQueueStatus{}
430 // handleDELETE processes DELETE requests.
432 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
433 // from all connected volumes.
435 // Only the Data Manager, or an Arvados admin with scope "all", are
436 // allowed to issue DELETE requests. If a DELETE request is not
437 // authenticated or is issued by a non-admin user, the server returns
438 // a PermissionError.
440 // Upon receiving a valid request from an authorized user,
441 // handleDELETE deletes all copies of the specified block on local
446 // If the requested blocks was not found on any volume, the response
447 // code is HTTP 404 Not Found.
449 // Otherwise, the response code is 200 OK, with a response body
450 // consisting of the JSON message
452 // {"copies_deleted":d,"copies_failed":f}
454 // where d and f are integers representing the number of blocks that
455 // were successfully and unsuccessfully deleted.
456 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
457 hash := mux.Vars(req)["hash"]
459 // Confirm that this user is an admin and has a token with unlimited scope.
460 var tok = GetAPIToken(req)
461 if tok == "" || !rtr.canDelete(tok) {
462 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
466 if !rtr.cluster.Collections.BlobTrash {
467 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
471 // Delete copies of this block from all available volumes.
472 // Report how many blocks were successfully deleted, and how
473 // many were found on writable volumes but not deleted.
475 Deleted int `json:"copies_deleted"`
476 Failed int `json:"copies_failed"`
478 for _, vol := range rtr.volmgr.Mounts() {
479 if !vol.KeepMount.AllowTrash {
481 } else if err := vol.Trash(hash); err == nil {
483 } else if os.IsNotExist(err) {
487 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
490 if result.Deleted == 0 && result.Failed == 0 {
491 resp.WriteHeader(http.StatusNotFound)
494 body, err := json.Marshal(result)
496 http.Error(resp, err.Error(), http.StatusInternalServerError)
/* PullHandler processes "PUT /pull" requests for the data manager.
   The request body is a JSON message containing a list of pull
   requests in the following format:

   [
      {
         "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
         "servers":[
            "keep0.qr1hi.arvadosapi.com:25107",
            "keep1.qr1hi.arvadosapi.com:25108"
         ]
      },
      {
         "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
         "servers":[
            "10.0.1.5:25107",
            "10.0.1.6:25107"
         ]
      },
      ...
   ]

   Each pull request in the list consists of a block locator string
   and an ordered list of servers. Keepstore should try to fetch the
   block from each server in turn.

   If the request has not been sent by the Data Manager, return 401
   Unauthorized.

   If the JSON unmarshalling fails, return 400 Bad Request.
*/
// PullRequest consists of a block locator and an ordered list of servers
type PullRequest struct {
	Locator string   `json:"locator"`
	Servers []string `json:"servers"`

	// Destination mount, or "" for "anywhere"
	MountUUID string `json:"mount_uuid"`
}
544 // PullHandler processes "PUT /pull" requests for the data manager.
545 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
546 // Reject unauthorized requests.
547 if !rtr.isSystemAuth(GetAPIToken(req)) {
548 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
552 // Parse the request body.
554 r := json.NewDecoder(req.Body)
555 if err := r.Decode(&pr); err != nil {
556 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
560 // We have a properly formatted pull list sent from the data
561 // manager. Report success and send the list to the pull list
562 // manager for further handling.
563 resp.WriteHeader(http.StatusOK)
565 fmt.Sprintf("Received %d pull requests\n", len(pr))))
568 for _, p := range pr {
571 rtr.pullq.ReplaceQueue(plist)
// TrashRequest consists of a block locator and its Mtime
type TrashRequest struct {
	Locator    string `json:"locator"`
	BlockMtime int64  `json:"block_mtime"`

	// Target mount, or "" for "everywhere"
	MountUUID string `json:"mount_uuid"`
}
583 // TrashHandler processes /trash requests.
584 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
585 // Reject unauthorized requests.
586 if !rtr.isSystemAuth(GetAPIToken(req)) {
587 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
591 // Parse the request body.
592 var trash []TrashRequest
593 r := json.NewDecoder(req.Body)
594 if err := r.Decode(&trash); err != nil {
595 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
599 // We have a properly formatted trash list sent from the data
600 // manager. Report success and send the list to the trash work
601 // queue for further handling.
602 resp.WriteHeader(http.StatusOK)
604 fmt.Sprintf("Received %d trash requests\n", len(trash))))
607 for _, t := range trash {
610 rtr.trashq.ReplaceQueue(tlist)
613 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
614 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
615 // Reject unauthorized requests.
616 if !rtr.isSystemAuth(GetAPIToken(req)) {
617 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
621 log := ctxlog.FromContext(req.Context())
622 hash := mux.Vars(req)["hash"]
624 if len(rtr.volmgr.AllWritable()) == 0 {
625 http.Error(resp, "No writable volumes", http.StatusNotFound)
629 var untrashedOn, failedOn []string
631 for _, vol := range rtr.volmgr.AllWritable() {
632 err := vol.Untrash(hash)
634 if os.IsNotExist(err) {
636 } else if err != nil {
637 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
638 failedOn = append(failedOn, vol.String())
640 log.Infof("Untrashed %v on volume %v", hash, vol.String())
641 untrashedOn = append(untrashedOn, vol.String())
645 if numNotFound == len(rtr.volmgr.AllWritable()) {
646 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
647 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
648 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
650 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
651 if len(failedOn) > 0 {
652 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
653 http.Error(resp, respBody, http.StatusInternalServerError)
655 fmt.Fprintln(resp, respBody)
// GetBlock and PutBlock implement lower-level code for handling
// blocks by rooting through volumes connected to the local machine.
// Once the handler has determined that system policy permits the
// request, it calls these methods to perform the actual operation.
//
// TODO(twp): this code would probably be better located in the
// VolumeManager interface. As an abstraction, the VolumeManager
// should be the only part of the code that cares about which volume a
// block is stored on, so it should be responsible for figuring out
// which volume to check for fetching blocks, storing blocks, etc.
671 // GetBlock fetches the block identified by "hash" into the provided
672 // buf, and returns the data size.
674 // If the block cannot be found on any volume, returns NotFoundError.
676 // If the block found does not have the correct MD5 hash, returns
678 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
679 log := ctxlog.FromContext(ctx)
681 // Attempt to read the requested hash from a keep volume.
682 errorToCaller := NotFoundError
684 for _, vol := range volmgr.AllReadable() {
685 size, err := vol.Get(ctx, hash, buf)
688 return 0, ErrClientDisconnect
692 // IsNotExist is an expected error and may be
693 // ignored. All other errors are logged. In
694 // any case we continue trying to read other
695 // volumes. If all volumes report IsNotExist,
696 // we return a NotFoundError.
697 if !os.IsNotExist(err) {
698 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
700 // If some volume returns a transient error, return it to the caller
701 // instead of "Not found" so it can retry.
702 if err == VolumeBusyError {
703 errorToCaller = err.(*KeepError)
707 // Check the file checksum.
708 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
709 if filehash != hash {
710 // TODO: Try harder to tell a sysadmin about
712 log.Errorf("checksum mismatch for block %s (actual %s), size %d on %s", hash, filehash, size, vol)
713 errorToCaller = DiskHashError
716 if errorToCaller == DiskHashError {
717 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
721 return 0, errorToCaller
724 type putProgress struct {
725 classNeeded map[string]bool
726 classTodo map[string]bool
727 mountUsed map[*VolumeMount]bool
729 classDone map[string]int
732 // Number of distinct replicas stored. "2" can mean the block was
733 // stored on 2 different volumes with replication 1, or on 1 volume
734 // with replication 2.
735 func (pr putProgress) TotalReplication() string {
736 return strconv.Itoa(pr.totalReplication)
739 // Number of replicas satisfying each storage class, formatted like
740 // "default=2; special=1".
741 func (pr putProgress) ClassReplication() string {
743 for k, v := range pr.classDone {
747 s += k + "=" + strconv.Itoa(v)
752 func (pr *putProgress) Add(mnt *VolumeMount) {
753 if pr.mountUsed[mnt] {
754 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
757 pr.mountUsed[mnt] = true
758 pr.totalReplication += mnt.Replication
759 for class := range mnt.StorageClasses {
760 pr.classDone[class] += mnt.Replication
761 delete(pr.classTodo, class)
765 func (pr *putProgress) Sub(mnt *VolumeMount) {
766 if !pr.mountUsed[mnt] {
767 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
770 pr.mountUsed[mnt] = false
771 pr.totalReplication -= mnt.Replication
772 for class := range mnt.StorageClasses {
773 pr.classDone[class] -= mnt.Replication
774 if pr.classNeeded[class] {
775 pr.classTodo[class] = true
780 func (pr *putProgress) Done() bool {
781 return len(pr.classTodo) == 0 && pr.totalReplication > 0
784 func (pr *putProgress) Want(mnt *VolumeMount) bool {
785 if pr.Done() || pr.mountUsed[mnt] {
788 if len(pr.classTodo) == 0 {
789 // none specified == "any"
792 for class := range mnt.StorageClasses {
793 if pr.classTodo[class] {
800 func (pr *putProgress) Copy() *putProgress {
802 classNeeded: pr.classNeeded,
803 classTodo: make(map[string]bool, len(pr.classTodo)),
804 classDone: make(map[string]int, len(pr.classDone)),
805 mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
806 totalReplication: pr.totalReplication,
808 for k, v := range pr.classTodo {
811 for k, v := range pr.classDone {
814 for k, v := range pr.mountUsed {
820 func newPutProgress(classes []string) putProgress {
822 classNeeded: make(map[string]bool, len(classes)),
823 classTodo: make(map[string]bool, len(classes)),
824 classDone: map[string]int{},
825 mountUsed: map[*VolumeMount]bool{},
827 for _, c := range classes {
829 pr.classNeeded[c] = true
830 pr.classTodo[c] = true
836 // PutBlock stores the given block on one or more volumes.
838 // The MD5 checksum of the block must match the given hash.
840 // The block is written to each writable volume (ordered by priority
841 // and then UUID, see volume.go) until at least one replica has been
842 // stored in each of the requested storage classes.
844 // The returned error, if any, is a KeepError with one of the
849 // A different block with the same hash already exists on this
854 // The MD5 hash of the BLOCK does not match the argument HASH.
858 // There was not enough space left in any Keep volume to store
863 // The object could not be stored for some other reason (e.g.
864 // all writes failed). The text of the error message should
865 // provide as much detail as possible.
866 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
867 log := ctxlog.FromContext(ctx)
869 // Check that BLOCK's checksum matches HASH.
870 blockhash := fmt.Sprintf("%x", md5.Sum(block))
871 if blockhash != hash {
872 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
873 return putProgress{}, RequestHashError
876 result := newPutProgress(wantStorageClasses)
878 // If we already have this data, it's intact on disk, and we
879 // can update its timestamp, return success. If we have
880 // different data with the same hash, return failure.
881 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
884 if ctx.Err() != nil {
885 return result, ErrClientDisconnect
888 writables := volmgr.NextWritable()
889 if len(writables) == 0 {
890 log.Error("no writable volumes")
891 return result, FullError
894 var wg sync.WaitGroup
896 cond := sync.Cond{L: &mtx}
897 // pending predicts what result will be if all pending writes
899 pending := result.Copy()
900 var allFull atomic.Value
903 // We hold the lock for the duration of the "each volume" loop
904 // below, except when it is released during cond.Wait().
907 for _, mnt := range writables {
908 // Wait until our decision to use this mount does not
909 // depend on the outcome of pending writes.
910 for result.Want(mnt) && !pending.Want(mnt) {
913 if !result.Want(mnt) {
920 log.Debugf("PutBlock: start write to %s", mnt.UUID)
922 err := mnt.Put(ctx, hash, block)
926 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
929 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
935 if err != nil && err != FullError && ctx.Err() == nil {
936 // The volume is not full but the
937 // write did not succeed. Report the
938 // error and continue trying.
940 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
946 if ctx.Err() != nil {
947 return result, ErrClientDisconnect
953 if result.totalReplication > 0 {
954 // Some, but not all, of the storage classes were
955 // satisfied. This qualifies as success.
957 } else if allFull.Load().(bool) {
958 log.Error("all volumes with qualifying storage classes are full")
959 return putProgress{}, FullError
961 // Already logged the non-full errors.
962 return putProgress{}, GenericError
966 // CompareAndTouch looks for volumes where the given content already
967 // exists and its modification time can be updated (i.e., it is
968 // protected from garbage collection), and updates result accordingly.
969 // It returns when the result is Done() or all volumes have been
971 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
972 log := ctxlog.FromContext(ctx)
973 for _, mnt := range volmgr.AllWritable() {
974 if !result.Want(mnt) {
977 err := mnt.Compare(ctx, hash, buf)
978 if ctx.Err() != nil {
980 } else if err == CollisionError {
981 // Stop if we have a block with same hash but
982 // different content. (It will be impossible
983 // to tell which one is wanted if we have
984 // both, so there's no point writing it even
985 // on a different volume.)
986 log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
987 return CollisionError
988 } else if os.IsNotExist(err) {
989 // Block does not exist. This is the only
990 // "normal" error: we don't log anything.
992 } else if err != nil {
993 // Couldn't open file, data is corrupt on
994 // disk, etc.: log this abnormal condition,
995 // and try the next volume.
996 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
999 if err := mnt.Touch(hash); err != nil {
1000 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
1003 // Compare and Touch both worked --> done.
// validLocatorRe matches a bare 32-hex-digit MD5 locator.
var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)

// IsValidLocator returns true if the specified string is a valid Keep
// locator. When Keep is extended to support hash types other than
// MD5, this should be updated to cover those as well.
func IsValidLocator(loc string) bool {
	return validLocatorRe.MatchString(loc)
}
// authRe matches "OAuth2 <token>" or "Bearer <token>" authorization
// header values; capture group 2 is the token itself.
var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)

// GetAPIToken returns the OAuth2 token from the Authorization
// header of a HTTP request, or an empty string if no matching
// token is found.
func GetAPIToken(req *http.Request) string {
	if auth, ok := req.Header["Authorization"]; ok {
		if match := authRe.FindStringSubmatch(auth[0]); match != nil {
			return match[2]
		}
	}
	return ""
}
1035 // canDelete returns true if the user identified by apiToken is
1036 // allowed to delete blocks.
1037 func (rtr *router) canDelete(apiToken string) bool {
1041 // Blocks may be deleted only when Keep has been configured with a
1043 if rtr.isSystemAuth(apiToken) {
1046 // TODO(twp): look up apiToken with the API server
1047 // return true if is_admin is true and if the token
1048 // has unlimited scope
1052 // isSystemAuth returns true if the given token is allowed to perform
1053 // system level actions like deleting data.
1054 func (rtr *router) isSystemAuth(token string) bool {
1055 return token != "" && token == rtr.cluster.SystemRootToken