13647: Use cluster config instead of custom keepstore config.
[arvados.git] / services / keepstore / pull_worker.go
index 42b5d5889d30984685d43655f66fb3857f98de92..100d08838d84f3597403d0d56374cc035d7fdddb 100644 (file)
@@ -6,7 +6,6 @@ package main
 
 import (
        "context"
-       "crypto/rand"
        "fmt"
        "io"
        "io/ioutil"
@@ -18,15 +17,15 @@ import (
 // RunPullWorker receives PullRequests from pullq, invokes
 // PullItemAndProcess on each one. After each PR, it logs a message
 // indicating whether the pull was successful.
-func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
+func (h *handler) runPullWorker(pullq *WorkQueue) {
        for item := range pullq.NextItem {
                pr := item.(PullRequest)
-               err := PullItemAndProcess(pr, keepClient)
+               err := h.pullItemAndProcess(pr)
                pullq.DoneItem <- struct{}{}
                if err == nil {
-                       log.Printf("Pull %s success", pr)
+                       h.Logger.Printf("Pull %s success", pr)
                } else {
-                       log.Printf("Pull %s error: %s", pr, err)
+                       h.Logger.Printf("Pull %s error: %s", pr, err)
                }
        }
 }
@@ -39,28 +38,28 @@ func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
 // only attempt to write the data to the corresponding
 // volume. Otherwise it writes to any local volume, as a PUT request
 // would.
-func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClient) error {
-       var vol Volume
+func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
+       var vol *VolumeMount
        if uuid := pullRequest.MountUUID; uuid != "" {
-               vol = KeepVM.Lookup(pullRequest.MountUUID, true)
+               vol = h.volmgr.Lookup(pullRequest.MountUUID, true)
                if vol == nil {
                        return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
                }
        }
 
-       keepClient.Arvados.ApiToken = randomToken
-
+       // Make a private copy of keepClient so we can set
+       // ServiceRoots to the source servers specified in the pull
+       // request.
+       keepClient := *h.keepClient
        serviceRoots := make(map[string]string)
        for _, addr := range pullRequest.Servers {
                serviceRoots[addr] = addr
        }
        keepClient.SetServiceRoots(serviceRoots, nil, nil)
 
-       // Generate signature with a random token
-       expiresAt := time.Now().Add(60 * time.Second)
-       signedLocator := SignLocator(pullRequest.Locator, randomToken, expiresAt)
+       signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
 
-       reader, contentLen, _, err := GetContent(signedLocator, keepClient)
+       reader, contentLen, _, err := GetContent(signedLocator, &keepClient)
        if err != nil {
                return err
        }
@@ -78,8 +77,7 @@ func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClie
                return fmt.Errorf("Content not found for: %s", signedLocator)
        }
 
-       writePulledBlock(vol, readContent, pullRequest.Locator)
-       return nil
+       return writePulledBlock(h.volmgr, vol, readContent, pullRequest.Locator)
 }
 
 // Fetch the content for the given locator using keepclient.
@@ -87,24 +85,11 @@ var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
        return keepClient.Get(signedLocator)
 }
 
-var writePulledBlock = func(volume Volume, data []byte, locator string) {
-       var err error
+var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, locator string) error {
        if volume != nil {
-               err = volume.Put(context.Background(), locator, data)
+               return volume.Put(context.Background(), locator, data)
        } else {
-               _, err = PutBlock(context.Background(), data, locator)
-       }
-       if err != nil {
-               log.Printf("error writing pulled block %q: %s", locator, err)
+               _, err := PutBlock(context.Background(), volmgr, data, locator)
+               return err
        }
 }
-
-var randomToken = func() string {
-       const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
-       var bytes = make([]byte, 36)
-       rand.Read(bytes)
-       for i, b := range bytes {
-               bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
-       }
-       return (string(bytes))
-}()