projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch 'master' into 16811-public-favs
[arvados.git]
/
services
/
keepstore
/
pull_worker.go
diff --git
a/services/keepstore/pull_worker.go
b/services/keepstore/pull_worker.go
index 58266a19ce27b4cdba514ea9c461c5247908560f..670fa1a4140fc14229279d1ff920d76959679afd 100644
(file)
--- a/
services/keepstore/pull_worker.go
+++ b/
services/keepstore/pull_worker.go
@@
-1,30
+1,31
@@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
"context"
package main
import (
"context"
- "crypto/rand"
"fmt"
"io"
"io/ioutil"
"time"
"fmt"
"io"
"io/ioutil"
"time"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
-
- log "github.com/Sirupsen/logrus"
+ "git.arvados.org/arvados.git/sdk/go/keepclient"
)
// RunPullWorker receives PullRequests from pullq, invokes
// PullItemAndProcess on each one. After each PR, it logs a message
// indicating whether the pull was successful.
)
// RunPullWorker receives PullRequests from pullq, invokes
// PullItemAndProcess on each one. After each PR, it logs a message
// indicating whether the pull was successful.
-func
RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient
) {
+func
(h *handler) runPullWorker(pullq *WorkQueue
) {
for item := range pullq.NextItem {
pr := item.(PullRequest)
for item := range pullq.NextItem {
pr := item.(PullRequest)
- err :=
PullItemAndProcess(pr, keepClient
)
+ err :=
h.pullItemAndProcess(pr
)
pullq.DoneItem <- struct{}{}
if err == nil {
pullq.DoneItem <- struct{}{}
if err == nil {
-
log
.Printf("Pull %s success", pr)
+
h.Logger
.Printf("Pull %s success", pr)
} else {
} else {
-
log
.Printf("Pull %s error: %s", pr, err)
+
h.Logger
.Printf("Pull %s error: %s", pr, err)
}
}
}
}
}
}
@@
-37,28
+38,28
@@
func RunPullWorker(pullq *WorkQueue, keepClient *keepclient.KeepClient) {
// only attempt to write the data to the corresponding
// volume. Otherwise it writes to any local volume, as a PUT request
// would.
// only attempt to write the data to the corresponding
// volume. Otherwise it writes to any local volume, as a PUT request
// would.
-func
PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClien
t) error {
- var vol
Volume
+func
(h *handler) pullItemAndProcess(pullRequest PullReques
t) error {
+ var vol
*VolumeMount
if uuid := pullRequest.MountUUID; uuid != "" {
if uuid := pullRequest.MountUUID; uuid != "" {
- vol =
KeepVM
.Lookup(pullRequest.MountUUID, true)
+ vol =
h.volmgr
.Lookup(pullRequest.MountUUID, true)
if vol == nil {
return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
}
}
if vol == nil {
return fmt.Errorf("pull req has nonexistent mount: %v", pullRequest)
}
}
- keepClient.Arvados.ApiToken = randomToken
-
+ // Make a private copy of keepClient so we can set
+ // ServiceRoots to the source servers specified in the pull
+ // request.
+ keepClient := *h.keepClient
serviceRoots := make(map[string]string)
for _, addr := range pullRequest.Servers {
serviceRoots[addr] = addr
}
keepClient.SetServiceRoots(serviceRoots, nil, nil)
serviceRoots := make(map[string]string)
for _, addr := range pullRequest.Servers {
serviceRoots[addr] = addr
}
keepClient.SetServiceRoots(serviceRoots, nil, nil)
- // Generate signature with a random token
- expiresAt := time.Now().Add(60 * time.Second)
- signedLocator := SignLocator(pullRequest.Locator, randomToken, expiresAt)
+ signedLocator := SignLocator(h.Cluster, pullRequest.Locator, keepClient.Arvados.ApiToken, time.Now().Add(time.Minute))
- reader, contentLen, _, err := GetContent(signedLocator, keepClient)
+ reader, contentLen, _, err := GetContent(signedLocator,
&
keepClient)
if err != nil {
return err
}
if err != nil {
return err
}
@@
-76,33
+77,18
@@
func PullItemAndProcess(pullRequest PullRequest, keepClient *keepclient.KeepClie
return fmt.Errorf("Content not found for: %s", signedLocator)
}
return fmt.Errorf("Content not found for: %s", signedLocator)
}
- writePulledBlock(vol, readContent, pullRequest.Locator)
- return nil
+ return writePulledBlock(h.volmgr, vol, readContent, pullRequest.Locator)
}
}
-//
Fetch
the content for the given locator using keepclient.
+//
GetContent fetches
the content for the given locator using keepclient.
var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
return keepClient.Get(signedLocator)
}
var GetContent = func(signedLocator string, keepClient *keepclient.KeepClient) (io.ReadCloser, int64, string, error) {
return keepClient.Get(signedLocator)
}
-var writePulledBlock = func(volume Volume, data []byte, locator string) {
- var err error
+var writePulledBlock = func(volmgr *RRVolumeManager, volume Volume, data []byte, locator string) error {
if volume != nil {
if volume != nil {
- err = volume.Put(context.Background(), locator, data)
- } else {
- _, err = PutBlock(context.Background(), data, locator)
- }
- if err != nil {
- log.Printf("error writing pulled block %q: %s", locator, err)
+ return volume.Put(context.Background(), locator, data)
}
}
+ _, err := PutBlock(context.Background(), volmgr, data, locator)
+ return err
}
}
-
-var randomToken = func() string {
- const alphaNumeric = "0123456789abcdefghijklmnopqrstuvwxyz"
- var bytes = make([]byte, 36)
- rand.Read(bytes)
- for i, b := range bytes {
- bytes[i] = alphaNumeric[b%byte(len(alphaNumeric))]
- }
- return (string(bytes))
-}()