20318: Route (*KeepClient)Get() through disk cache layer.
[arvados.git] / services / keepstore / pull_worker.go
index 100d08838d84f3597403d0d56374cc035d7fdddb..348bfb4df00087a1726ef36cbd186fe0eb5ea4c7 100644 (file)
@@ -2,7 +2,7 @@
 //
 // SPDX-License-Identifier: AGPL-3.0
 
-package main
+package keepstore
 
 import (
        "context"
@@ -11,7 +11,7 @@ import (
        "io/ioutil"
        "time"
 
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       "git.arvados.org/arvados.git/sdk/go/keepclient"
 )
 
 // RunPullWorker receives PullRequests from pullq, invokes
@@ -50,7 +50,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
        // Make a private copy of keepClient so we can set
        // ServiceRoots to the source servers specified in the pull
        // request.
-       keepClient := *h.keepClient
+       keepClient := h.keepClient.Clone()
        serviceRoots := make(map[string]string)
        for _, addr := range pullRequest.Servers {
                serviceRoots[addr] = addr
@@ -59,7 +59,7 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
 
        signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
 
-       reader, contentLen, _, err := GetContent(signedLocator, &keepClient)
+       reader, _, _, err := GetContent(signedLocator, keepClient)
        if err != nil {
                return err
        }
@@ -73,14 +73,14 @@ func (h *handler) pullItemAndProcess(pullRequest PullRequest) error {
                return err
        }
 
-       if (readContent == nil) || (int64(len(readContent)) != contentLen) {
+       if readContent == nil {
                return fmt.Errorf("Content not found for: %s", signedLocator)
        }
 
        return writePulledBlock(h.volmgr, vol, readContent, pullRequest.Locator)
 }
 
-// Fetch the content for the given locator using keepclient.
+// GetContent fetches the content for the given locator using keepclient.
 var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
        return keepClient.Get(signedLocator)
 }
@@ -88,8 +88,7 @@ var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (
 var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, locator string) error {
        if volume != nil {
                return volume.Put(context.Background(), locator, data)
-       } else {
-               _, err := PutBlock(context.Background(), volmgr, data, locator)
-               return err
        }
+       _, err := PutBlock(context.Background(), volmgr, data, locator, nil)
+       return err
 }