8178: rename Delete api as Trash; add Untrash to volume interface; add UndeleteHandle...
[arvados.git] / services / keepstore / handlers.go
1 package main
2
3 // REST handlers for Keep are implemented here.
4 //
5 // GetBlockHandler (GET /locator)
6 // PutBlockHandler (PUT /locator)
7 // IndexHandler    (GET /index, GET /index/prefix)
8 // StatusHandler   (GET /status.json)
9
10 import (
11         "container/list"
12         "crypto/md5"
13         "encoding/json"
14         "fmt"
15         "github.com/gorilla/mux"
16         "io"
17         "log"
18         "net/http"
19         "os"
20         "regexp"
21         "runtime"
22         "strconv"
23         "sync"
24         "time"
25 )
26
27 // MakeRESTRouter returns a new mux.Router that forwards all Keep
28 // requests to the appropriate handlers.
29 //
30 func MakeRESTRouter() *mux.Router {
31         rest := mux.NewRouter()
32
33         rest.HandleFunc(
34                 `/{hash:[0-9a-f]{32}}`, GetBlockHandler).Methods("GET", "HEAD")
35         rest.HandleFunc(
36                 `/{hash:[0-9a-f]{32}}+{hints}`,
37                 GetBlockHandler).Methods("GET", "HEAD")
38
39         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, PutBlockHandler).Methods("PUT")
40         rest.HandleFunc(`/{hash:[0-9a-f]{32}}`, DeleteHandler).Methods("DELETE")
41         // List all blocks stored here. Privileged client only.
42         rest.HandleFunc(`/index`, IndexHandler).Methods("GET", "HEAD")
43         // List blocks stored here whose hash has the given prefix.
44         // Privileged client only.
45         rest.HandleFunc(`/index/{prefix:[0-9a-f]{0,32}}`, IndexHandler).Methods("GET", "HEAD")
46
47         // List volumes: path, device number, bytes used/avail.
48         rest.HandleFunc(`/status.json`, StatusHandler).Methods("GET", "HEAD")
49
50         // Replace the current pull queue.
51         rest.HandleFunc(`/pull`, PullHandler).Methods("PUT")
52
53         // Replace the current trash queue.
54         rest.HandleFunc(`/trash`, TrashHandler).Methods("PUT")
55
56         // Undelete moves blocks from trash back into store
57         rest.HandleFunc(`/undelete/{hash:[0-9a-f]{32}}`, UndeleteHandler).Methods("PUT")
58
59         // Any request which does not match any of these routes gets
60         // 400 Bad Request.
61         rest.NotFoundHandler = http.HandlerFunc(BadRequestHandler)
62
63         return rest
64 }
65
66 // BadRequestHandler is a HandleFunc to address bad requests.
67 func BadRequestHandler(w http.ResponseWriter, r *http.Request) {
68         http.Error(w, BadRequestError.Error(), BadRequestError.HTTPCode)
69 }
70
71 // GetBlockHandler is a HandleFunc to address Get block requests.
72 func GetBlockHandler(resp http.ResponseWriter, req *http.Request) {
73         if enforcePermissions {
74                 locator := req.URL.Path[1:] // strip leading slash
75                 if err := VerifySignature(locator, GetApiToken(req)); err != nil {
76                         http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
77                         return
78                 }
79         }
80
81         block, err := GetBlock(mux.Vars(req)["hash"])
82         if err != nil {
83                 // This type assertion is safe because the only errors
84                 // GetBlock can return are DiskHashError or NotFoundError.
85                 http.Error(resp, err.Error(), err.(*KeepError).HTTPCode)
86                 return
87         }
88         defer bufs.Put(block)
89
90         resp.Header().Set("Content-Length", strconv.Itoa(len(block)))
91         resp.Header().Set("Content-Type", "application/octet-stream")
92         resp.Write(block)
93 }
94
95 // PutBlockHandler is a HandleFunc to address Put block requests.
96 func PutBlockHandler(resp http.ResponseWriter, req *http.Request) {
97         hash := mux.Vars(req)["hash"]
98
99         // Detect as many error conditions as possible before reading
100         // the body: avoid transmitting data that will not end up
101         // being written anyway.
102
103         if req.ContentLength == -1 {
104                 http.Error(resp, SizeRequiredError.Error(), SizeRequiredError.HTTPCode)
105                 return
106         }
107
108         if req.ContentLength > BlockSize {
109                 http.Error(resp, TooLongError.Error(), TooLongError.HTTPCode)
110                 return
111         }
112
113         if len(KeepVM.AllWritable()) == 0 {
114                 http.Error(resp, FullError.Error(), FullError.HTTPCode)
115                 return
116         }
117
118         buf := bufs.Get(int(req.ContentLength))
119         _, err := io.ReadFull(req.Body, buf)
120         if err != nil {
121                 http.Error(resp, err.Error(), 500)
122                 bufs.Put(buf)
123                 return
124         }
125
126         replication, err := PutBlock(buf, hash)
127         bufs.Put(buf)
128
129         if err != nil {
130                 ke := err.(*KeepError)
131                 http.Error(resp, ke.Error(), ke.HTTPCode)
132                 return
133         }
134
135         // Success; add a size hint, sign the locator if possible, and
136         // return it to the client.
137         returnHash := fmt.Sprintf("%s+%d", hash, req.ContentLength)
138         apiToken := GetApiToken(req)
139         if PermissionSecret != nil && apiToken != "" {
140                 expiry := time.Now().Add(blobSignatureTTL)
141                 returnHash = SignLocator(returnHash, apiToken, expiry)
142         }
143         resp.Header().Set("X-Keep-Replicas-Stored", strconv.Itoa(replication))
144         resp.Write([]byte(returnHash + "\n"))
145 }
146
147 // IndexHandler is a HandleFunc to address /index and /index/{prefix} requests.
148 func IndexHandler(resp http.ResponseWriter, req *http.Request) {
149         // Reject unauthorized requests.
150         if !IsDataManagerToken(GetApiToken(req)) {
151                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
152                 return
153         }
154
155         prefix := mux.Vars(req)["prefix"]
156
157         for _, vol := range KeepVM.AllReadable() {
158                 if err := vol.IndexTo(prefix, resp); err != nil {
159                         // The only errors returned by IndexTo are
160                         // write errors returned by resp.Write(),
161                         // which probably means the client has
162                         // disconnected and this error will never be
163                         // reported to the client -- but it will
164                         // appear in our own error log.
165                         http.Error(resp, err.Error(), http.StatusInternalServerError)
166                         return
167                 }
168         }
169         // An empty line at EOF is the only way the client can be
170         // assured the entire index was received.
171         resp.Write([]byte{'\n'})
172 }
173
174 // StatusHandler
175 //     Responds to /status.json requests with the current node status,
176 //     described in a JSON structure.
177 //
178 //     The data given in a status.json response includes:
179 //        volumes - a list of Keep volumes currently in use by this server
180 //          each volume is an object with the following fields:
181 //            * mount_point
182 //            * device_num (an integer identifying the underlying filesystem)
183 //            * bytes_free
184 //            * bytes_used
185
186 // PoolStatus struct
187 type PoolStatus struct {
188         Alloc uint64 `json:"BytesAllocated"`
189         Cap   int    `json:"BuffersMax"`
190         Len   int    `json:"BuffersInUse"`
191 }
192
193 // NodeStatus struct
194 type NodeStatus struct {
195         Volumes    []*VolumeStatus `json:"volumes"`
196         BufferPool PoolStatus
197         PullQueue  WorkQueueStatus
198         TrashQueue WorkQueueStatus
199         Memory     runtime.MemStats
200 }
201
202 var st NodeStatus
203 var stLock sync.Mutex
204
205 // StatusHandler addresses /status.json requests.
206 func StatusHandler(resp http.ResponseWriter, req *http.Request) {
207         stLock.Lock()
208         readNodeStatus(&st)
209         jstat, err := json.Marshal(&st)
210         stLock.Unlock()
211         if err == nil {
212                 resp.Write(jstat)
213         } else {
214                 log.Printf("json.Marshal: %s", err)
215                 log.Printf("NodeStatus = %v", &st)
216                 http.Error(resp, err.Error(), 500)
217         }
218 }
219
220 // populate the given NodeStatus struct with current values.
221 func readNodeStatus(st *NodeStatus) {
222         vols := KeepVM.AllReadable()
223         if cap(st.Volumes) < len(vols) {
224                 st.Volumes = make([]*VolumeStatus, len(vols))
225         }
226         st.Volumes = st.Volumes[:0]
227         for _, vol := range vols {
228                 if s := vol.Status(); s != nil {
229                         st.Volumes = append(st.Volumes, s)
230                 }
231         }
232         st.BufferPool.Alloc = bufs.Alloc()
233         st.BufferPool.Cap = bufs.Cap()
234         st.BufferPool.Len = bufs.Len()
235         st.PullQueue = getWorkQueueStatus(pullq)
236         st.TrashQueue = getWorkQueueStatus(trashq)
237         runtime.ReadMemStats(&st.Memory)
238 }
239
240 // return a WorkQueueStatus for the given queue. If q is nil (which
241 // should never happen except in test suites), return a zero status
242 // value instead of crashing.
243 func getWorkQueueStatus(q *WorkQueue) WorkQueueStatus {
244         if q == nil {
245                 // This should only happen during tests.
246                 return WorkQueueStatus{}
247         }
248         return q.Status()
249 }
250
251 // DeleteHandler processes DELETE requests.
252 //
253 // DELETE /{hash:[0-9a-f]{32} will delete the block with the specified hash
254 // from all connected volumes.
255 //
256 // Only the Data Manager, or an Arvados admin with scope "all", are
257 // allowed to issue DELETE requests.  If a DELETE request is not
258 // authenticated or is issued by a non-admin user, the server returns
259 // a PermissionError.
260 //
261 // Upon receiving a valid request from an authorized user,
262 // DeleteHandler deletes all copies of the specified block on local
263 // writable volumes.
264 //
265 // Response format:
266 //
267 // If the requested blocks was not found on any volume, the response
268 // code is HTTP 404 Not Found.
269 //
270 // Otherwise, the response code is 200 OK, with a response body
271 // consisting of the JSON message
272 //
273 //    {"copies_deleted":d,"copies_failed":f}
274 //
275 // where d and f are integers representing the number of blocks that
276 // were successfully and unsuccessfully deleted.
277 //
278 func DeleteHandler(resp http.ResponseWriter, req *http.Request) {
279         hash := mux.Vars(req)["hash"]
280
281         // Confirm that this user is an admin and has a token with unlimited scope.
282         var tok = GetApiToken(req)
283         if tok == "" || !CanDelete(tok) {
284                 http.Error(resp, PermissionError.Error(), PermissionError.HTTPCode)
285                 return
286         }
287
288         if neverDelete {
289                 http.Error(resp, MethodDisabledError.Error(), MethodDisabledError.HTTPCode)
290                 return
291         }
292
293         // Delete copies of this block from all available volumes.
294         // Report how many blocks were successfully deleted, and how
295         // many were found on writable volumes but not deleted.
296         var result struct {
297                 Deleted int `json:"copies_deleted"`
298                 Failed  int `json:"copies_failed"`
299         }
300         for _, vol := range KeepVM.AllWritable() {
301                 if err := vol.Trash(hash); err == nil {
302                         result.Deleted++
303                 } else if os.IsNotExist(err) {
304                         continue
305                 } else {
306                         result.Failed++
307                         log.Println("DeleteHandler:", err)
308                 }
309         }
310
311         var st int
312
313         if result.Deleted == 0 && result.Failed == 0 {
314                 st = http.StatusNotFound
315         } else {
316                 st = http.StatusOK
317         }
318
319         resp.WriteHeader(st)
320
321         if st == http.StatusOK {
322                 if body, err := json.Marshal(result); err == nil {
323                         resp.Write(body)
324                 } else {
325                         log.Printf("json.Marshal: %s (result = %v)", err, result)
326                         http.Error(resp, err.Error(), 500)
327                 }
328         }
329 }
330
331 /* PullHandler processes "PUT /pull" requests for the data manager.
332    The request body is a JSON message containing a list of pull
333    requests in the following format:
334
335    [
336       {
337          "locator":"e4d909c290d0fb1ca068ffaddf22cbd0+4985",
338          "servers":[
339                         "keep0.qr1hi.arvadosapi.com:25107",
340                         "keep1.qr1hi.arvadosapi.com:25108"
341                  ]
342           },
343           {
344                  "locator":"55ae4d45d2db0793d53f03e805f656e5+658395",
345                  "servers":[
346                         "10.0.1.5:25107",
347                         "10.0.1.6:25107",
348                         "10.0.1.7:25108"
349                  ]
350           },
351           ...
352    ]
353
354    Each pull request in the list consists of a block locator string
355    and an ordered list of servers.  Keepstore should try to fetch the
356    block from each server in turn.
357
358    If the request has not been sent by the Data Manager, return 401
359    Unauthorized.
360
361    If the JSON unmarshalling fails, return 400 Bad Request.
362 */
363
364 // PullRequest consists of a block locator and an ordered list of servers
365 type PullRequest struct {
366         Locator string   `json:"locator"`
367         Servers []string `json:"servers"`
368 }
369
370 // PullHandler processes "PUT /pull" requests for the data manager.
371 func PullHandler(resp http.ResponseWriter, req *http.Request) {
372         // Reject unauthorized requests.
373         if !IsDataManagerToken(GetApiToken(req)) {
374                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
375                 return
376         }
377
378         // Parse the request body.
379         var pr []PullRequest
380         r := json.NewDecoder(req.Body)
381         if err := r.Decode(&pr); err != nil {
382                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
383                 return
384         }
385
386         // We have a properly formatted pull list sent from the data
387         // manager.  Report success and send the list to the pull list
388         // manager for further handling.
389         resp.WriteHeader(http.StatusOK)
390         resp.Write([]byte(
391                 fmt.Sprintf("Received %d pull requests\n", len(pr))))
392
393         plist := list.New()
394         for _, p := range pr {
395                 plist.PushBack(p)
396         }
397         pullq.ReplaceQueue(plist)
398 }
399
400 // TrashRequest consists of a block locator and it's Mtime
401 type TrashRequest struct {
402         Locator    string `json:"locator"`
403         BlockMtime int64  `json:"block_mtime"`
404 }
405
406 // TrashHandler processes /trash requests.
407 func TrashHandler(resp http.ResponseWriter, req *http.Request) {
408         // Reject unauthorized requests.
409         if !IsDataManagerToken(GetApiToken(req)) {
410                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
411                 return
412         }
413
414         // Parse the request body.
415         var trash []TrashRequest
416         r := json.NewDecoder(req.Body)
417         if err := r.Decode(&trash); err != nil {
418                 http.Error(resp, err.Error(), BadRequestError.HTTPCode)
419                 return
420         }
421
422         // We have a properly formatted trash list sent from the data
423         // manager.  Report success and send the list to the trash work
424         // queue for further handling.
425         resp.WriteHeader(http.StatusOK)
426         resp.Write([]byte(
427                 fmt.Sprintf("Received %d trash requests\n", len(trash))))
428
429         tlist := list.New()
430         for _, t := range trash {
431                 tlist.PushBack(t)
432         }
433         trashq.ReplaceQueue(tlist)
434 }
435
436 // UndeleteHandler processes "PUT /undelete/{hash:[0-9a-f]{32}}" requests for the data manager.
437 func UndeleteHandler(resp http.ResponseWriter, req *http.Request) {
438         // Reject unauthorized requests.
439         if !IsDataManagerToken(GetApiToken(req)) {
440                 http.Error(resp, UnauthorizedError.Error(), UnauthorizedError.HTTPCode)
441                 return
442         }
443
444         hash := mux.Vars(req)["hash"]
445         successResp := "Untrashed on volume: "
446         var st int
447         for _, vol := range KeepVM.AllWritable() {
448                 if err := vol.Untrash(hash); err == nil {
449                         log.Printf("Untrashed %v on volume %v", hash, vol.String())
450                         st = http.StatusOK
451                         successResp += vol.String()
452                         break
453                 } else {
454                         log.Printf("Error untrashing %v on volume %v: %v", hash, vol.String(), err)
455                         st = 500
456                 }
457         }
458
459         if st == http.StatusOK {
460                 resp.Write([]byte(successResp))
461         }
462
463         resp.WriteHeader(st)
464 }
465
466 // ==============================
467 // GetBlock and PutBlock implement lower-level code for handling
468 // blocks by rooting through volumes connected to the local machine.
469 // Once the handler has determined that system policy permits the
470 // request, it calls these methods to perform the actual operation.
471 //
472 // TODO(twp): this code would probably be better located in the
473 // VolumeManager interface. As an abstraction, the VolumeManager
474 // should be the only part of the code that cares about which volume a
475 // block is stored on, so it should be responsible for figuring out
476 // which volume to check for fetching blocks, storing blocks, etc.
477 // ==============================
478
479 // GetBlock fetches and returns the block identified by "hash".
480 //
481 // On success, GetBlock returns a byte slice with the block data, and
482 // a nil error.
483 //
484 // If the block cannot be found on any volume, returns NotFoundError.
485 //
486 // If the block found does not have the correct MD5 hash, returns
487 // DiskHashError.
488 //
489 func GetBlock(hash string) ([]byte, error) {
490         // Attempt to read the requested hash from a keep volume.
491         errorToCaller := NotFoundError
492
493         for _, vol := range KeepVM.AllReadable() {
494                 buf, err := vol.Get(hash)
495                 if err != nil {
496                         // IsNotExist is an expected error and may be
497                         // ignored. All other errors are logged. In
498                         // any case we continue trying to read other
499                         // volumes. If all volumes report IsNotExist,
500                         // we return a NotFoundError.
501                         if !os.IsNotExist(err) {
502                                 log.Printf("%s: Get(%s): %s", vol, hash, err)
503                         }
504                         continue
505                 }
506                 // Check the file checksum.
507                 //
508                 filehash := fmt.Sprintf("%x", md5.Sum(buf))
509                 if filehash != hash {
510                         // TODO: Try harder to tell a sysadmin about
511                         // this.
512                         log.Printf("%s: checksum mismatch for request %s (actual %s)",
513                                 vol, hash, filehash)
514                         errorToCaller = DiskHashError
515                         bufs.Put(buf)
516                         continue
517                 }
518                 if errorToCaller == DiskHashError {
519                         log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
520                                 vol, hash)
521                 }
522                 return buf, nil
523         }
524         return nil, errorToCaller
525 }
526
527 // PutBlock Stores the BLOCK (identified by the content id HASH) in Keep.
528 //
529 // PutBlock(block, hash)
530 //   Stores the BLOCK (identified by the content id HASH) in Keep.
531 //
532 //   The MD5 checksum of the block must be identical to the content id HASH.
533 //   If not, an error is returned.
534 //
535 //   PutBlock stores the BLOCK on the first Keep volume with free space.
536 //   A failure code is returned to the user only if all volumes fail.
537 //
538 //   On success, PutBlock returns nil.
539 //   On failure, it returns a KeepError with one of the following codes:
540 //
541 //   500 Collision
542 //          A different block with the same hash already exists on this
543 //          Keep server.
544 //   422 MD5Fail
545 //          The MD5 hash of the BLOCK does not match the argument HASH.
546 //   503 Full
547 //          There was not enough space left in any Keep volume to store
548 //          the object.
549 //   500 Fail
550 //          The object could not be stored for some other reason (e.g.
551 //          all writes failed). The text of the error message should
552 //          provide as much detail as possible.
553 //
554 func PutBlock(block []byte, hash string) (int, error) {
555         // Check that BLOCK's checksum matches HASH.
556         blockhash := fmt.Sprintf("%x", md5.Sum(block))
557         if blockhash != hash {
558                 log.Printf("%s: MD5 checksum %s did not match request", hash, blockhash)
559                 return 0, RequestHashError
560         }
561
562         // If we already have this data, it's intact on disk, and we
563         // can update its timestamp, return success. If we have
564         // different data with the same hash, return failure.
565         if n, err := CompareAndTouch(hash, block); err == nil || err == CollisionError {
566                 return n, err
567         }
568
569         // Choose a Keep volume to write to.
570         // If this volume fails, try all of the volumes in order.
571         if vol := KeepVM.NextWritable(); vol != nil {
572                 if err := vol.Put(hash, block); err == nil {
573                         return vol.Replication(), nil // success!
574                 }
575         }
576
577         writables := KeepVM.AllWritable()
578         if len(writables) == 0 {
579                 log.Print("No writable volumes.")
580                 return 0, FullError
581         }
582
583         allFull := true
584         for _, vol := range writables {
585                 err := vol.Put(hash, block)
586                 if err == nil {
587                         return vol.Replication(), nil // success!
588                 }
589                 if err != FullError {
590                         // The volume is not full but the
591                         // write did not succeed.  Report the
592                         // error and continue trying.
593                         allFull = false
594                         log.Printf("%s: Write(%s): %s", vol, hash, err)
595                 }
596         }
597
598         if allFull {
599                 log.Print("All volumes are full.")
600                 return 0, FullError
601         }
602         // Already logged the non-full errors.
603         return 0, GenericError
604 }
605
606 // CompareAndTouch returns the current replication level if one of the
607 // volumes already has the given content and it successfully updates
608 // the relevant block's modification time in order to protect it from
609 // premature garbage collection. Otherwise, it returns a non-nil
610 // error.
611 func CompareAndTouch(hash string, buf []byte) (int, error) {
612         var bestErr error = NotFoundError
613         for _, vol := range KeepVM.AllWritable() {
614                 if err := vol.Compare(hash, buf); err == CollisionError {
615                         // Stop if we have a block with same hash but
616                         // different content. (It will be impossible
617                         // to tell which one is wanted if we have
618                         // both, so there's no point writing it even
619                         // on a different volume.)
620                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
621                         return 0, err
622                 } else if os.IsNotExist(err) {
623                         // Block does not exist. This is the only
624                         // "normal" error: we don't log anything.
625                         continue
626                 } else if err != nil {
627                         // Couldn't open file, data is corrupt on
628                         // disk, etc.: log this abnormal condition,
629                         // and try the next volume.
630                         log.Printf("%s: Compare(%s): %s", vol, hash, err)
631                         continue
632                 }
633                 if err := vol.Touch(hash); err != nil {
634                         log.Printf("%s: Touch %s failed: %s", vol, hash, err)
635                         bestErr = err
636                         continue
637                 }
638                 // Compare and Touch both worked --> done.
639                 return vol.Replication(), nil
640         }
641         return 0, bestErr
642 }
643
644 var validLocatorRe = regexp.MustCompile(`^[0-9a-f]{32}$`)
645
646 // IsValidLocator returns true if the specified string is a valid Keep locator.
647 //   When Keep is extended to support hash types other than MD5,
648 //   this should be updated to cover those as well.
649 //
650 func IsValidLocator(loc string) bool {
651         return validLocatorRe.MatchString(loc)
652 }
653
654 var authRe = regexp.MustCompile(`^OAuth2\s+(.*)`)
655
656 // GetApiToken returns the OAuth2 token from the Authorization
657 // header of a HTTP request, or an empty string if no matching
658 // token is found.
659 func GetApiToken(req *http.Request) string {
660         if auth, ok := req.Header["Authorization"]; ok {
661                 if match := authRe.FindStringSubmatch(auth[0]); match != nil {
662                         return match[1]
663                 }
664         }
665         return ""
666 }
667
668 // IsExpired returns true if the given Unix timestamp (expressed as a
669 // hexadecimal string) is in the past, or if timestampHex cannot be
670 // parsed as a hexadecimal string.
671 func IsExpired(timestampHex string) bool {
672         ts, err := strconv.ParseInt(timestampHex, 16, 0)
673         if err != nil {
674                 log.Printf("IsExpired: %s", err)
675                 return true
676         }
677         return time.Unix(ts, 0).Before(time.Now())
678 }
679
680 // CanDelete returns true if the user identified by apiToken is
681 // allowed to delete blocks.
682 func CanDelete(apiToken string) bool {
683         if apiToken == "" {
684                 return false
685         }
686         // Blocks may be deleted only when Keep has been configured with a
687         // data manager.
688         if IsDataManagerToken(apiToken) {
689                 return true
690         }
691         // TODO(twp): look up apiToken with the API server
692         // return true if is_admin is true and if the token
693         // has unlimited scope
694         return false
695 }
696
697 // IsDataManagerToken returns true if apiToken represents the data
698 // manager's token.
699 func IsDataManagerToken(apiToken string) bool {
700         return dataManagerToken != "" && apiToken == dataManagerToken
701 }