1 // Copyright (C) The Arvados Authors. All rights reserved.
3 // SPDX-License-Identifier: AGPL-3.0
24 "git.arvados.org/arvados.git/sdk/go/arvados"
25 "git.arvados.org/arvados.git/sdk/go/ctxlog"
26 "git.arvados.org/arvados.git/sdk/go/health"
27 "git.arvados.org/arvados.git/sdk/go/httpserver"
28 "github.com/gorilla/mux"
29 "github.com/prometheus/client_golang/prometheus"
30 "github.com/sirupsen/logrus"
// Fields of the router type, which dispatches Keep HTTP requests to
// the handler methods below.
// NOTE(review): this excerpt is truncated — the "type router struct {"
// header and at least one field (original line 38) are elided.
35 cluster *arvados.Cluster
36 logger logrus.FieldLogger
37 remoteProxy remoteProxy
39 volmgr *RRVolumeManager
44 // MakeRESTRouter returns a new router that forwards all Keep requests
45 // to the appropriate handlers.
// NOTE(review): intervening source lines are elided in this excerpt
// (struct-literal braces, several HandleFunc call heads, return stmt).
46 func MakeRESTRouter(ctx context.Context, cluster *arvados.Cluster, reg *prometheus.Registry, volmgr *RRVolumeManager, pullq, trashq *WorkQueue) http.Handler {
48 Router: mux.NewRouter(),
50 logger: ctxlog.FromContext(ctx),
51 metrics: &nodeMetrics{reg: reg},
// GET/HEAD a block by bare hash, or by hash with "+hints" suffix.
58 `/{hash:[0-9a-f]{32}}`, rtr.handleGET).Methods("GET", "HEAD")
60 `/{hash:[0-9a-f]{32}}+{hints}`,
61 rtr.handleGET).Methods("GET", "HEAD")
63 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handlePUT).Methods("PUT")
64 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleDELETE).Methods("DELETE")
65 // List all blocks stored here. Privileged client only.
66 rtr.HandleFunc(`/index`, rtr.handleIndex).Methods("GET", "HEAD")
67 // List blocks stored here whose hash has the given prefix.
68 // Privileged client only.
69 rtr.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, rtr.handleIndex).Methods("GET", "HEAD")
70 // Update timestamp on existing block. Privileged client only.
71 rtr.HandleFunc(`/{hash:[0-9a-f]{32}}`, rtr.handleTOUCH).Methods("TOUCH")
72 // Internals/debugging info (runtime.MemStats)
73 // Internals/debugging info (runtime.MemStats)
74 rtr.HandleFunc(`/debug.json`, rtr.DebugHandler).Methods("GET", "HEAD")
76 // List volumes: path, device number, bytes used/avail.
77 rtr.HandleFunc(`/status.json`, rtr.StatusHandler).Methods("GET", "HEAD")
79 // List mounts: UUID, readonly, tier, device ID, ...
80 rtr.HandleFunc(`/mounts`, rtr.MountsHandler).Methods("GET")
81 rtr.HandleFunc(`/mounts/{uuid}/blocks`, rtr.handleIndex).Methods("GET")
82 rtr.HandleFunc(`/mounts/{uuid}/blocks/`, rtr.handleIndex).Methods("GET")
84 // Replace the current pull queue.
85 rtr.HandleFunc(`/pull`, rtr.handlePull).Methods("PUT")
87 // Replace the current trash queue.
88 rtr.HandleFunc(`/trash`, rtr.handleTrash).Methods("PUT")
90 // Untrash moves blocks from trash back into store
91 rtr.HandleFunc(`/untrash/{hash:[0-9a-f]{32}}`, rtr.handleUntrash).Methods("PUT")
// Health check endpoint, gated by the cluster management token.
93 rtr.Handle("/_health/{check}", &health.Handler{
94 Token: cluster.ManagementToken,
98 // Any request which does not match any of these routes gets
100 rtr.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
// Register Prometheus metrics for the buffer pool and both work queues.
102 rtr.metrics.setupBufferPoolMetrics(bufs)
103 rtr.metrics.setupWorkQueueMetrics(rtr.pullq, "pull")
104 rtr.metrics.setupWorkQueueMetrics(rtr.trashq, "trash")
109 // BadRequestHandler is a HandleFunc to address bad requests.
// It responds with BadRequestError's message and HTTP status code.
110 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
111 http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
// handleGET serves GET/HEAD requests for a block: remote-proxy
// requests (+R hint without +A signature) are delegated to
// remoteProxy.Get; otherwise the blob signature is verified (when
// signing is enabled), a buffer is acquired, and the block is read
// via GetBlock and written to the response.
// NOTE(review): several original lines (returns, closing braces) are
// elided in this excerpt.
114 func (rtr *router) handleGET(resp http.ResponseWriter, req *http.Request) {
115 locator := req.URL.Path[1:]
116 if strings.Contains(locator, "+R") && !strings.Contains(locator, "+A") {
117 rtr.remoteProxy.Get(req.Context(), resp, req, rtr.cluster, rtr.volmgr)
121 if rtr.cluster.Collections.BlobSigning {
122 locator := req.URL.Path[1:] // strip leading slash
123 if err := VerifySignature(rtr.cluster, locator, GetAPIToken(req)); err != nil {
124 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
129 // TODO: Probe volumes to check whether the block _might_
130 // exist. Some volumes/types could support a quick existence
131 // check without causing other operations to suffer. If all
132 // volumes support that, and assure us the block definitely
133 // isn't here, we can return 404 now instead of waiting for a
// 503 if no buffer is available before the client gives up.
136 buf, err := getBufferWithContext(req.Context(), bufs, BlockSize)
138 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
143 size, err := GetBlock(req.Context(), rtr.volmgr, mux.Vars(req)["hash"], buf, resp)
// Map KeepError values to their own HTTP codes; anything else is 500.
145 code := http.StatusInternalServerError
146 if err, ok := err.(*KeepError); ok {
149 http.Error(resp, err.Error(), code)
153 resp.Header().Set("Content-Length", strconv.Itoa(size))
154 resp.Header().Set("Content-Type", "application/octet-stream")
155 resp.Write(buf[:size])
158 // Get a buffer from the pool -- but give up and return a non-nil
159 // error if ctx ends before we get a buffer.
// NOTE(review): the goroutine launch, select statement, and closing
// braces are elided in this excerpt.
160 func getBufferWithContext(ctx context.Context, bufs *bufferPool, bufSize int) ([]byte, error) {
161 bufReady := make(chan []byte)
// Fetch from the pool in a goroutine so we can abandon the wait
// (but not the buffer) if ctx is cancelled first.
163 bufReady <- bufs.Get(bufSize)
166 case buf := <-bufReady:
170 // Even if closeNotifier happened first, we
171 // need to keep waiting for our buf so we can
172 // return it to the pool.
175 return nil, ErrClientDisconnect
// handleTOUCH processes TOUCH requests: system-auth only. It updates
// the timestamp of an existing block on the writable volumes,
// responding 404 if no volume has the block (or none are writable).
// NOTE(review): loop body details and closing braces are elided in
// this excerpt.
179 func (rtr *router) handleTOUCH(resp http.ResponseWriter, req *http.Request) {
180 if !rtr.isSystemAuth(GetAPIToken(req)) {
181 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
184 hash := mux.Vars(req)["hash"]
185 vols := rtr.volmgr.AllWritable()
187 http.Error(resp, "no volumes", http.StatusNotFound)
// Try each writable mount until one succeeds.
191 for _, mnt := range vols {
192 err = mnt.Touch(hash)
// Block absent -> 404; any other failure -> 500.
200 case os.IsNotExist(err):
201 http.Error(resp, err.Error(), http.StatusNotFound)
203 http.Error(resp, err.Error(), http.StatusInternalServerError)
// handlePUT stores a block: validates Content-Length, resolves the
// requested (or default) storage classes, reads the body into a
// pooled buffer, writes it via PutBlock, then returns a size-hinted,
// optionally signed locator with replication headers.
// NOTE(review): several original lines (returns, braces, a Default
// check around line 238) are elided in this excerpt.
207 func (rtr *router) handlePUT(resp http.ResponseWriter, req *http.Request) {
208 hash := mux.Vars(req)["hash"]
210 // Detect as many error conditions as possible before reading
211 // the body: avoid transmitting data that will not end up
212 // being written anyway.
214 if req.ContentLength == -1 {
215 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
219 if req.ContentLength > BlockSize {
220 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
224 if len(rtr.volmgr.AllWritable()) == 0 {
225 http.Error(resp, FullError.Error(), FullError.HTTPCode)
// Storage classes come from the X-Keep-Storage-Classes header, or
// from the cluster's configured defaults when the header is absent.
229 var wantStorageClasses []string
230 if hdr := req.Header.Get("X-Keep-Storage-Classes"); hdr != "" {
231 wantStorageClasses = strings.Split(hdr, ",")
232 for i, sc := range wantStorageClasses {
233 wantStorageClasses[i] = strings.TrimSpace(sc)
236 // none specified -- use configured default
237 for class, cfg := range rtr.cluster.StorageClasses {
239 wantStorageClasses = append(wantStorageClasses, class)
244 buf, err := getBufferWithContext(req.Context(), bufs, int(req.ContentLength))
246 http.Error(resp, err.Error(), http.StatusServiceUnavailable)
250 _, err = io.ReadFull(req.Body, buf)
252 http.Error(resp, err.Error(), 500)
257 result, err := PutBlock(req.Context(), rtr.volmgr, buf, hash, wantStorageClasses)
261 code := http.StatusInternalServerError
262 if err, ok := err.(*KeepError); ok {
265 http.Error(resp, err.Error(), code)
269 // Success; add a size hint, sign the locator if possible, and
270 // return it to the client.
271 returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
272 apiToken := GetAPIToken(req)
273 if rtr.cluster.Collections.BlobSigningKey != "" && apiToken != "" {
274 expiry := time.Now().Add(rtr.cluster.Collections.BlobSigningTTL.Duration())
275 returnHash = SignLocator(rtr.cluster, returnHash, apiToken, expiry)
277 resp.Header().Set("X-Keep-Replicas-Stored", result.TotalReplication())
278 resp.Header().Set("X-Keep-Storage-Classes-Confirmed", result.ClassReplication())
279 resp.Write([]byte(returnHash + "\n"))
282 // IndexHandler responds to "/index", "/index/{prefix}", and
283 // "/mounts/{uuid}/blocks" requests.
// System-auth only. Streams an index of stored blocks (optionally
// filtered by hash prefix or restricted to one mount) and terminates
// with a blank line so clients can detect a complete response.
// NOTE(review): some original lines (req.ParseForm area, braces) are
// elided in this excerpt.
284 func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
285 if !rtr.isSystemAuth(GetAPIToken(req)) {
286 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
290 prefix := mux.Vars(req)["prefix"]
293 prefix = req.Form.Get("prefix")
296 uuid := mux.Vars(req)["uuid"]
// No uuid -> all readable volumes; otherwise just the named mount.
298 var vols []*VolumeMount
300 vols = rtr.volmgr.AllReadable()
301 } else if mnt := rtr.volmgr.Lookup(uuid, false); mnt == nil {
302 http.Error(resp, "mount not found", http.StatusNotFound)
305 vols = []*VolumeMount{mnt}
308 for _, v := range vols {
309 if err := v.IndexTo(prefix, resp); err != nil {
310 // We can't send an error status/message to
311 // the client because IndexTo() might have
312 // already written body content. All we can do
313 // is log the error in our own logs.
315 // The client must notice the lack of trailing
316 // newline as an indication that the response
318 ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
322 // An empty line at EOF is the only way the client can be
323 // assured the entire index was received.
324 resp.Write([]byte{'\n'})
327 // MountsHandler responds to "GET /mounts" requests.
// It JSON-encodes the volume manager's mount list directly into the
// response; encoding failure yields a 500.
328 func (rtr *router) MountsHandler(resp http.ResponseWriter, req *http.Request) {
329 err := json.NewEncoder(resp).Encode(rtr.volmgr.Mounts())
331 httpserver.Error(resp, err.Error(), http.StatusInternalServerError)
// PoolStatus reports buffer pool usage for /status.json.
336 type PoolStatus struct {
337 Alloc uint64 `json:"BytesAllocatedCumulative"`
338 Cap int `json:"BuffersMax"`
339 Len int `json:"BuffersInUse"`
// volumeStatusEnt is one volume's entry in NodeStatus.Volumes.
// NOTE(review): a field at original line 343 (likely the volume
// label/name) is elided in this excerpt.
342 type volumeStatusEnt struct {
344 Status *VolumeStatus `json:",omitempty"`
345 VolumeStats *ioStats `json:",omitempty"`
346 InternalStats interface{} `json:",omitempty"`
// NodeStatus is the response body for /status.json, populated by
// readNodeStatus. Further fields (original lines 355-359) are elided
// in this excerpt.
350 type NodeStatus struct {
351 Volumes []*volumeStatusEnt
352 BufferPool PoolStatus
353 PullQueue WorkQueueStatus
354 TrashQueue WorkQueueStatus
// stLock serializes access to the shared status struct(s).
361 var stLock sync.Mutex
363 // DebugHandler addresses /debug.json requests.
// It marshals a snapshot of runtime.MemStats; marshal failure -> 500.
364 func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
365 type debugStats struct {
366 MemStats runtime.MemStats
369 runtime.ReadMemStats(&ds.MemStats)
370 data, err := json.Marshal(&ds)
372 http.Error(resp, err.Error(), http.StatusInternalServerError)
378 // StatusHandler addresses /status.json requests.
// It fills a NodeStatus via readNodeStatus and marshals it; marshal
// failure -> 500. (Locking and response write elided in this excerpt.)
379 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
381 rtr.readNodeStatus(&st)
382 data, err := json.Marshal(&st)
385 http.Error(resp, err.Error(), http.StatusInternalServerError)
391 // populate the given NodeStatus struct with current values.
// Reuses st.Volumes' backing array when capacity allows, then appends
// one entry per readable volume plus buffer-pool and queue stats.
392 func (rtr *router) readNodeStatus(st *NodeStatus) {
394 vols := rtr.volmgr.AllReadable()
395 if cap(st.Volumes) < len(vols) {
396 st.Volumes = make([]*volumeStatusEnt, len(vols))
398 st.Volumes = st.Volumes[:0]
399 for _, vol := range vols {
400 var internalStats interface{}
// Volumes implementing InternalStatser expose extra stats.
401 if vol, ok := vol.Volume.(InternalStatser); ok {
402 internalStats = vol.InternalStats()
404 st.Volumes = append(st.Volumes, &volumeStatusEnt{
406 Status: vol.Status(),
407 InternalStats: internalStats,
408 //VolumeStats: rtr.volmgr.VolumeStats(vol),
411 st.BufferPool.Alloc = bufs.Alloc()
412 st.BufferPool.Cap = bufs.Cap()
413 st.BufferPool.Len = bufs.Len()
414 st.PullQueue = getWorkQueueStatus(rtr.pullq)
415 st.TrashQueue = getWorkQueueStatus(rtr.trashq)
418 // return a WorkQueueStatus for the given queue. If q is nil (which
419 // should never happen except in test suites), return a zero status
420 // value instead of crashing.
// NOTE(review): the nil check and non-nil return path are elided in
// this excerpt.
421 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
423 // This should only happen during tests.
424 return WorkQueueStatus{}
429 // handleDELETE processes DELETE requests.
431 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
432 // from all connected volumes.
434 // Only the Data Manager, or an Arvados admin with scope "all", are
435 // allowed to issue DELETE requests. If a DELETE request is not
436 // authenticated or is issued by a non-admin user, the server returns
437 // a PermissionError.
439 // Upon receiving a valid request from an authorized user,
440 // handleDELETE deletes all copies of the specified block on local
445 // If the requested blocks was not found on any volume, the response
446 // code is HTTP 404 Not Found.
448 // Otherwise, the response code is 200 OK, with a response body
449 // consisting of the JSON message
451 // {"copies_deleted":d,"copies_failed":f}
453 // where d and f are integers representing the number of blocks that
454 // were successfully and unsuccessfully deleted.
// NOTE(review): result-struct declaration head, counter increments,
// and response write are elided in this excerpt.
456 func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
457 hash := mux.Vars(req)["hash"]
459 // Confirm that this user is an admin and has a token with unlimited scope.
460 var tok = GetAPIToken(req)
461 if tok == "" || !rtr.canDelete(tok) {
462 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
// Reject outright when trash/delete is disabled cluster-wide.
466 if !rtr.cluster.Collections.BlobTrash {
467 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
471 // Delete copies of this block from all available volumes.
472 // Report how many blocks were successfully deleted, and how
473 // many were found on writable volumes but not deleted.
475 Deleted int `json:"copies_deleted"`
476 Failed int `json:"copies_failed"`
478 for _, vol := range rtr.volmgr.AllWritable() {
479 if err := vol.Trash(hash); err == nil {
481 } else if os.IsNotExist(err) {
485 ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
// Nothing deleted and nothing failed means the block wasn't found.
488 if result.Deleted == 0 && result.Failed == 0 {
489 resp.WriteHeader(http.StatusNotFound)
492 body, err := json.Marshal(result)
494 http.Error(resp, err.Error(), http.StatusInternalServerError)
500 /* PullHandler processes "PUT /pull" requests for the data manager.
501 The request body is a JSON message containing a list of pull
502 requests in the following format:
506 "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
508 "keep0.qr1hi.arvadosapi.com:25107",
509 "keep1.qr1hi.arvadosapi.com:25108"
513 "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
523 Each pull request in the list consists of a block locator string
524 and an ordered list of servers. Keepstore should try to fetch the
525 block from each server in turn.
527 If the request has not been sent by the Data Manager, return 401
530 If the JSON unmarshalling fails, return 400 Bad Request.
533 // PullRequest consists of a block locator and an ordered list of servers
534 type PullRequest struct {
535 Locator string `json:"locator"`
536 Servers []string `json:"servers"`
538 // Destination mount, or "" for "anywhere"
539 MountUUID string `json:"mount_uuid"`
542 // PullHandler processes "PUT /pull" requests for the data manager.
// System-auth only: decodes a JSON list of PullRequests, acknowledges
// with 200 and a count message, then replaces the pull queue.
// NOTE(review): the pr declaration, resp.Write call head, and the
// plist construction inside the loop are elided in this excerpt.
543 func (rtr *router) handlePull(resp http.ResponseWriter, req *http.Request) {
544 // Reject unauthorized requests.
545 if !rtr.isSystemAuth(GetAPIToken(req)) {
546 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
550 // Parse the request body.
552 r := json.NewDecoder(req.Body)
553 if err := r.Decode(&pr); err != nil {
554 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
558 // We have a properly formatted pull list sent from the data
559 // manager. Report success and send the list to the pull list
560 // manager for further handling.
561 resp.WriteHeader(http.StatusOK)
563 fmt.Sprintf("Received %d pull requests\n", len(pr))))
566 for _, p := range pr {
569 rtr.pullq.ReplaceQueue(plist)
572 // TrashRequest consists of a block locator and its Mtime
573 type TrashRequest struct {
574 Locator string `json:"locator"`
575 BlockMtime int64 `json:"block_mtime"`
577 // Target mount, or "" for "everywhere"
578 MountUUID string `json:"mount_uuid"`
581 // TrashHandler processes /trash requests.
// System-auth only: decodes a JSON list of TrashRequests, acknowledges
// with 200 and a count message, then replaces the trash queue.
// NOTE(review): the resp.Write call head and the tlist construction
// inside the loop are elided in this excerpt.
582 func (rtr *router) handleTrash(resp http.ResponseWriter, req *http.Request) {
583 // Reject unauthorized requests.
584 if !rtr.isSystemAuth(GetAPIToken(req)) {
585 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
589 // Parse the request body.
590 var trash []TrashRequest
591 r := json.NewDecoder(req.Body)
592 if err := r.Decode(&trash); err != nil {
593 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
597 // We have a properly formatted trash list sent from the data
598 // manager. Report success and send the list to the trash work
599 // queue for further handling.
600 resp.WriteHeader(http.StatusOK)
602 fmt.Sprintf("Received %d trash requests\n", len(trash))))
605 for _, t := range trash {
608 rtr.trashq.ReplaceQueue(tlist)
611 // UntrashHandler processes "PUT /untrash/{hash:[0-9a-f]{32}}" requests for the data manager.
// System-auth only: attempts Untrash on every writable volume and
// reports which volumes succeeded/failed. All-not-found -> 404,
// all-failed -> 500, partial failure -> 500 with detail, else 200.
// NOTE(review): numNotFound declaration/increment and some braces are
// elided in this excerpt.
612 func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
613 // Reject unauthorized requests.
614 if !rtr.isSystemAuth(GetAPIToken(req)) {
615 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
619 log := ctxlog.FromContext(req.Context())
620 hash := mux.Vars(req)["hash"]
622 if len(rtr.volmgr.AllWritable()) == 0 {
623 http.Error(resp, "No writable volumes", http.StatusNotFound)
627 var untrashedOn, failedOn []string
629 for _, vol := range rtr.volmgr.AllWritable() {
630 err := vol.Untrash(hash)
632 if os.IsNotExist(err) {
634 } else if err != nil {
635 log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
636 failedOn = append(failedOn, vol.String())
638 log.Infof("Untrashed %v on volume %v", hash, vol.String())
639 untrashedOn = append(untrashedOn, vol.String())
643 if numNotFound == len(rtr.volmgr.AllWritable()) {
644 http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
645 } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
646 http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
648 respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
649 if len(failedOn) > 0 {
650 respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
651 http.Error(resp, respBody, http.StatusInternalServerError)
653 fmt.Fprintln(resp, respBody)
658 // GetBlock and PutBlock implement lower-level code for handling
659 // blocks by rooting through volumes connected to the local machine.
660 // Once the handler has determined that system policy permits the
661 // request, it calls these methods to perform the actual operation.
663 // TODO(twp): this code would probably be better located in the
664 // VolumeManager interface. As an abstraction, the VolumeManager
665 // should be the only part of the code that cares about which volume a
666 // block is stored on, so it should be responsible for figuring out
667 // which volume to check for fetching blocks, storing blocks, etc.
669 // GetBlock fetches the block identified by "hash" into the provided
670 // buf, and returns the data size.
672 // If the block cannot be found on any volume, returns NotFoundError.
674 // If the block found does not have the correct MD5 hash, returns
// DiskHashError (unless a good copy is found on another volume).
// NOTE(review): several control-flow lines (braces, continue, the
// success return) are elided in this excerpt.
677 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
678 log := ctxlog.FromContext(ctx)
680 // Attempt to read the requested hash from a keep volume.
681 errorToCaller := NotFoundError
683 for _, vol := range volmgr.AllReadable() {
684 size, err := vol.Get(ctx, hash, buf)
687 return 0, ErrClientDisconnect
691 // IsNotExist is an expected error and may be
692 // ignored. All other errors are logged. In
693 // any case we continue trying to read other
694 // volumes. If all volumes report IsNotExist,
695 // we return a NotFoundError.
696 if !os.IsNotExist(err) {
697 log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
699 // If some volume returns a transient error, return it to the caller
700 // instead of "Not found" so it can retry.
701 if err == VolumeBusyError {
702 errorToCaller = err.(*KeepError)
706 // Check the file checksum.
707 filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
708 if filehash != hash {
709 // TODO: Try harder to tell a sysadmin about
// NOTE(review): log.Error is called with printf-style format args;
// logrus's Error does not format — this should be log.Errorf.
711 log.Error("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
712 errorToCaller = DiskHashError
// NOTE(review): same defect here — should be log.Warnf.
715 if errorToCaller == DiskHashError {
716 log.Warn("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
720 return 0, errorToCaller
// putProgress tracks replication progress of a PutBlock call:
// which storage classes are needed/still outstanding, which mounts
// have been written, and per-class replica counts.
// NOTE(review): a field (original line 727, likely totalReplication)
// is elided in this excerpt; it is referenced by the methods below.
723 type putProgress struct {
724 classNeeded map[string]bool
725 classTodo map[string]bool
726 mountUsed map[*VolumeMount]bool
728 classDone map[string]int
731 // Number of distinct replicas stored. "2" can mean the block was
732 // stored on 2 different volumes with replication 1, or on 1 volume
733 // with replication 2.
// Returned as a decimal string for the X-Keep-Replicas-Stored header.
734 func (pr putProgress) TotalReplication() string {
735 return strconv.Itoa(pr.totalReplication)
738 // Number of replicas satisfying each storage class, formatted like
739 // "default=2; special=1".
// NOTE(review): the declaration of s and the separator-append lines
// are elided in this excerpt.
740 func (pr putProgress) ClassReplication() string {
742 for k, v := range pr.classDone {
746 s += k + "=" + strconv.Itoa(v)
// Add records a successful write to mnt: marks the mount used, adds
// its replication to the total and to each of its storage classes,
// and removes those classes from the todo set. Logs (and otherwise
// ignores) a duplicate Add for the same mount.
751 func (pr *putProgress) Add(mnt *VolumeMount) {
752 if pr.mountUsed[mnt] {
753 logrus.Warnf("BUG? superfluous extra write to mount %s", mnt.UUID)
756 pr.mountUsed[mnt] = true
757 pr.totalReplication += mnt.Replication
758 for class := range mnt.StorageClasses {
759 pr.classDone[class] += mnt.Replication
760 delete(pr.classTodo, class)
// Sub reverses a prior Add for mnt (e.g., after a failed write):
// unmarks the mount, subtracts its replication, and re-adds any
// still-needed storage classes to the todo set. Logs (and otherwise
// ignores) a Sub without a matching Add.
764 func (pr *putProgress) Sub(mnt *VolumeMount) {
765 if !pr.mountUsed[mnt] {
766 logrus.Warnf("BUG? Sub called with no prior matching Add: %s", mnt.UUID)
769 pr.mountUsed[mnt] = false
770 pr.totalReplication -= mnt.Replication
771 for class := range mnt.StorageClasses {
772 pr.classDone[class] -= mnt.Replication
773 if pr.classNeeded[class] {
774 pr.classTodo[class] = true
// Done reports whether every requested storage class is satisfied and
// at least one replica has been stored.
779 func (pr *putProgress) Done() bool {
780 return len(pr.classTodo) == 0 && pr.totalReplication > 0
// Want reports whether writing to mnt would make progress: false when
// already done or the mount was used; true when no classes were
// requested ("any") or mnt serves an outstanding class.
// NOTE(review): the return statements inside the branches are elided
// in this excerpt.
783 func (pr *putProgress) Want(mnt *VolumeMount) bool {
784 if pr.Done() || pr.mountUsed[mnt] {
787 if len(pr.classTodo) == 0 {
788 // none specified == "any"
791 for class := range mnt.StorageClasses {
792 if pr.classTodo[class] {
// Copy returns a deep copy of pr. classNeeded is shared (treated as
// immutable after construction); the mutable maps are cloned.
// NOTE(review): the composite-literal head, map-copy assignments, and
// return are elided in this excerpt.
799 func (pr *putProgress) Copy() *putProgress {
801 classNeeded: pr.classNeeded,
802 classTodo: make(map[string]bool, len(pr.classTodo)),
803 classDone: make(map[string]int, len(pr.classDone)),
804 mountUsed: make(map[*VolumeMount]bool, len(pr.mountUsed)),
805 totalReplication: pr.totalReplication,
807 for k, v := range pr.classTodo {
810 for k, v := range pr.classDone {
813 for k, v := range pr.mountUsed {
// newPutProgress builds a putProgress with every requested class
// marked both needed and todo.
// NOTE(review): the composite-literal head and return are elided in
// this excerpt.
819 func newPutProgress(classes []string) putProgress {
821 classNeeded: make(map[string]bool, len(classes)),
822 classTodo: make(map[string]bool, len(classes)),
823 classDone: map[string]int{},
824 mountUsed: map[*VolumeMount]bool{},
826 for _, c := range classes {
828 pr.classNeeded[c] = true
829 pr.classTodo[c] = true
835 // PutBlock stores the given block on one or more volumes.
837 // The MD5 checksum of the block must match the given hash.
839 // The block is written to each writable volume (ordered by priority
840 // and then UUID, see volume.go) until at least one replica has been
841 // stored in each of the requested storage classes.
843 // The returned error, if any, is a KeepError with one of the
// following meanings (per the elided comment labels): CollisionError,
847 // A different block with the same hash already exists on this
// volume; RequestHashError,
850 // The MD5 hash of the BLOCK does not match the argument HASH.
// FullError,
852 // There was not enough space left in any Keep volume to store
// the object; GenericError,
855 // The object could not be stored for some other reason (e.g.
856 // all writes failed). The text of the error message should
857 // provide as much detail as possible.
// NOTE(review): substantial portions of this function (goroutine
// launch around mnt.Put, lock/unlock, cond.Wait, wg bookkeeping,
// pending.Add/Sub, several returns and braces) are elided in this
// excerpt.
858 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string, wantStorageClasses []string) (putProgress, error) {
859 log := ctxlog.FromContext(ctx)
861 // Check that BLOCK's checksum matches HASH.
862 blockhash := fmt.Sprintf("%x", md5.Sum(block))
863 if blockhash != hash {
864 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
865 return putProgress{}, RequestHashError
868 result := newPutProgress(wantStorageClasses)
870 // If we already have this data, it's intact on disk, and we
871 // can update its timestamp, return success. If we have
872 // different data with the same hash, return failure.
873 if err := CompareAndTouch(ctx, volmgr, hash, block, &result); err != nil || result.Done() {
876 if ctx.Err() != nil {
877 return result, ErrClientDisconnect
880 writables := volmgr.NextWritable()
881 if len(writables) == 0 {
882 log.Error("no writable volumes")
883 return result, FullError
886 var wg sync.WaitGroup
888 cond := sync.Cond{L: &mtx}
889 // pending predicts what result will be if all pending writes
// succeed; used to decide whether more writes are worth starting.
891 pending := result.Copy()
892 var allFull atomic.Value
895 // We hold the lock for the duration of the "each volume" loop
896 // below, except when it is released during cond.Wait().
899 for _, mnt := range writables {
900 // Wait until our decision to use this mount does not
901 // depend on the outcome of pending writes.
902 for result.Want(mnt) && !pending.Want(mnt) {
905 if !result.Want(mnt) {
912 log.Debugf("PutBlock: start write to %s", mnt.UUID)
914 err := mnt.Put(ctx, hash, block)
918 log.Debugf("PutBlock: write to %s failed", mnt.UUID)
921 log.Debugf("PutBlock: write to %s succeeded", mnt.UUID)
927 if err != nil && err != FullError && ctx.Err() == nil {
928 // The volume is not full but the
929 // write did not succeed. Report the
930 // error and continue trying.
932 log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
938 if ctx.Err() != nil {
939 return result, ErrClientDisconnect
945 if result.totalReplication > 0 {
946 // Some, but not all, of the storage classes were
947 // satisfied. This qualifies as success.
949 } else if allFull.Load().(bool) {
950 log.Error("all volumes with qualifying storage classes are full")
951 return putProgress{}, FullError
953 // Already logged the non-full errors.
954 return putProgress{}, GenericError
958 // CompareAndTouch looks for volumes where the given content already
959 // exists and its modification time can be updated (i.e., it is
960 // protected from garbage collection), and updates result accordingly.
961 // It returns when the result is Done() or all volumes have been
// tried. NOTE(review): continue statements, some returns, and the
// tail of the loop (original lines 993-1002) are elided here.
963 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, result *putProgress) error {
964 log := ctxlog.FromContext(ctx)
965 for _, mnt := range volmgr.AllWritable() {
966 if !result.Want(mnt) {
969 err := mnt.Compare(ctx, hash, buf)
970 if ctx.Err() != nil {
972 } else if err == CollisionError {
973 // Stop if we have a block with same hash but
974 // different content. (It will be impossible
975 // to tell which one is wanted if we have
976 // both, so there's no point writing it even
977 // on a different volume.)
// NOTE(review): log.Error is called with printf-style format args;
// logrus's Error does not format — this should be log.Errorf.
978 log.Error("collision in Compare(%s) on volume %s", hash, mnt.Volume)
979 return CollisionError
980 } else if os.IsNotExist(err) {
981 // Block does not exist. This is the only
982 // "normal" error: we don't log anything.
984 } else if err != nil {
985 // Couldn't open file, data is corrupt on
986 // disk, etc.: log this abnormal condition,
987 // and try the next volume.
988 log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
991 if err := mnt.Touch(hash); err != nil {
992 log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
995 // Compare and Touch both worked --> done.
// validLocatorRe matches a bare 32-hex-digit MD5 hash (no hints).
1004 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
1006 // IsValidLocator returns true if the specified string is a valid Keep locator.
1007 // When Keep is extended to support hash types other than MD5,
1008 // this should be updated to cover those as well.
1010 func IsValidLocator(loc string) bool {
1011 return validLocatorRe.MatchString(loc)
// authRe matches "OAuth2 <token>" or "Bearer <token>" auth headers.
1014 var authRe = regexp.MustCompile(`^(OAuth2|Bearer)\s+(.*)`)
1016 // GetAPIToken returns the OAuth2 token from the Authorization
1017 // header of a HTTP request, or an empty string if no matching
// header is found. (The token-returning and fallthrough lines are
// elided in this excerpt.)
1019 func GetAPIToken(req *http.Request) string {
1020 if auth, ok := req.Header["Authorization"]; ok {
1021 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
1028 // canDelete returns true if the user identified by apiToken is
1029 // allowed to delete blocks.
// Currently only the system-auth token qualifies; the admin-token
// lookup is still a TODO. (Return statements elided in this excerpt.)
1030 func (rtr *router) canDelete(apiToken string) bool {
1034 // Blocks may be deleted only when Keep has been configured with a
1036 if rtr.isSystemAuth(apiToken) {
1039 // TODO(twp): look up apiToken with the API server
1040 // return true if is_admin is true and if the token
1041 // has unlimited scope
1045 // isSystemAuth returns true if the given token is allowed to perform
1046 // system level actions like deleting data.
// A non-empty token must exactly match the cluster's SystemRootToken.
1047 func (rtr *router) isSystemAuth(token string) bool {
1048 return token != "" && token == rtr.cluster.SystemRootToken