<div class="releasenotes">
</notextile>
-h2(#main). development main (as of 2021-06-03)
+h2(#main). development main (as of 2021-07-15)
"Upgrading from 2.2.0":#v2_2_0
+h3. crunch-dispatch-local now requires config.yml
+
+The @crunch-dispatch-local@ dispatcher now reads the API host and token from the system-wide @/etc/arvados/config.yml@ . It will fail to start if that file is not found or not readable.
+
h2(#v2_2_0). v2.2.0 (2021-06-03)
"Upgrading from 2.1.0":#v2_1_0
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
- PutB(buf []byte) (string, int, error)
+ BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error)
ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
LocalLocator(locator string) (string, error)
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- executor containerExecutor
+ executor containerExecutor
+ executorStdin io.Closer
+ executorStdout io.Closer
+ executorStderr io.Closer
// Dispatcher client is initialized with the Dispatcher token.
// This is a privileged token used to manage container status
ExitCode *int
NewLogWriter NewLogWriter
CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
logUUID string
logMtx sync.Mutex
LogCollection arvados.CollectionFileSystem
// CreateContainer creates the docker container.
func (runner *ContainerRunner) CreateContainer(imageID string, bindmounts map[string]bindmount) error {
- var stdin io.ReadCloser
+ var stdin io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
if mnt, ok := runner.Container.Mounts["stdin"]; ok {
switch mnt.Kind {
case "collection":
if !runner.enableMemoryLimit {
ram = 0
}
+ runner.executorStdin = stdin
+ runner.executorStdout = stdout
+ runner.executorStderr = stderr
return runner.executor.Create(containerSpec{
Image: imageID,
VCPUs: runner.Container.RuntimeConstraints.VCPUs,
}
runner.ExitCode = &exitcode
+ var returnErr error
+ if err = runner.executorStdin.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdin: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ returnErr = err
+ }
+ if err = runner.executorStdout.Close(); err != nil {
+ err = fmt.Errorf("error closing container stdout: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+ if err = runner.executorStderr.Close(); err != nil {
+ err = fmt.Errorf("error closing container stderr: %s", err)
+ runner.CrunchLog.Printf("%s", err)
+ if returnErr == nil {
+ returnErr = err
+ }
+ }
+
if runner.statReporter != nil {
runner.statReporter.Stop()
err = runner.statLogger.Close()
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
- return nil
+ return returnErr
}
func (runner *ContainerRunner) updateLogs() {
func (e *stubExecutor) Stop() error { e.stopped = true; go func() { e.exit <- -1 }(); return e.stopErr }
func (e *stubExecutor) Close() { e.closed = true }
func (e *stubExecutor) Wait(context.Context) (int, error) {
- defer e.created.Stdout.Close()
- defer e.created.Stderr.Close()
return <-e.exit, e.waitErr
}
return locator, nil
}
-func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
- client.Content = buf
- return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
+func (client *KeepTestClient) BlockWrite(_ context.Context, opts arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ client.Content = opts.Data
+ return arvados.BlockWriteResponse{
+ Locator: fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)),
+ }, nil
}
func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
return nil, errors.New("KeepError")
}
-func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
- return "", 0, errors.New("KeepError")
+func (*KeepErrorTestClient) BlockWrite(context.Context, arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ return arvados.BlockWriteResponse{}, errors.New("KeepError")
}
func (*KeepErrorTestClient) LocalLocator(string) (string, error) {
func (s *TestSuite) TestRunContainer(c *C) {
s.executor.runFunc = func() {
fmt.Fprintf(s.executor.created.Stdout, "Hello world\n")
- s.executor.created.Stdout.Close()
- s.executor.created.Stderr.Close()
s.executor.exit <- 0
}
}
}
-func (e *dockerExecutor) startIO(stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
+func (e *dockerExecutor) startIO(stdin io.Reader, stdout, stderr io.Writer) error {
resp, err := e.dockerclient.ContainerAttach(context.TODO(), e.containerID, dockertypes.ContainerAttachOptions{
Stream: true,
Stdin: stdin != nil,
return nil
}
-func (e *dockerExecutor) handleStdin(stdin io.ReadCloser, conn io.Writer, closeConn func() error) error {
- defer stdin.Close()
+func (e *dockerExecutor) handleStdin(stdin io.Reader, conn io.Writer, closeConn func() error) error {
defer closeConn()
_, err := io.Copy(conn, stdin)
if err != nil {
// Handle docker log protocol; see
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
-func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.WriteCloser, reader io.Reader) error {
+func (e *dockerExecutor) handleStdoutStderr(stdout, stderr io.Writer, reader io.Reader) error {
header := make([]byte, 8)
var err error
for err == nil {
if err != nil {
return fmt.Errorf("error copying stdout/stderr from docker: %v", err)
}
- err = stdout.Close()
- if err != nil {
- return fmt.Errorf("error writing stdout: close: %v", err)
- }
- err = stderr.Close()
- if err != nil {
- return fmt.Errorf("error writing stderr: close: %v", err)
- }
return nil
}
EnableNetwork bool
NetworkMode string // docker network mode, normally "default"
CgroupParent string
- Stdin io.ReadCloser
- Stdout io.WriteCloser
- Stderr io.WriteCloser
+ Stdin io.Reader
+ Stdout io.Writer
+ Stderr io.Writer
}
// containerExecutor is an interface to a container runtime
}
}
+// TestExecWorkingDir sets the spec's WorkingDir to /tmp, runs "pwd"
+// in the container, and checks that the captured stdout is "/tmp\n".
+func (s *executorSuite) TestExecWorkingDir(c *C) {
+ s.spec.WorkingDir = "/tmp"
+ s.spec.Command = []string{"sh", "-c", "pwd"}
+ s.checkRun(c, 0)
+ c.Check(s.stdout.String(), Equals, "/tmp\n")
+}
+
func (s *executorSuite) TestExecStdoutStderr(c *C) {
s.spec.Command = []string{"sh", "-c", "echo foo; echo -n bar >&2; echo baz; echo waz >&2"}
s.checkRun(c, 0)
"io/ioutil"
"os"
"os/exec"
+ "sort"
"syscall"
"golang.org/x/net/context"
}
func (e *singularityExecutor) Start() error {
- args := []string{"singularity", "exec", "--containall", "--no-home", "--cleanenv"}
+ args := []string{"singularity", "exec", "--containall", "--no-home", "--cleanenv", "--pwd", e.spec.WorkingDir}
if !e.spec.EnableNetwork {
args = append(args, "--net", "--network=none")
}
false: "rw",
true: "ro",
}
- for path, mount := range e.spec.BindMounts {
+ var binds []string
+ for path, _ := range e.spec.BindMounts {
+ binds = append(binds, path)
+ }
+ sort.Strings(binds)
+ for _, path := range binds {
+ mount := e.spec.BindMounts[path]
args = append(args, "--bind", mount.HostPath+":"+path+":"+readonlyflag[mount.ReadOnly])
}
args = append(args, e.imageFilename)
"bufio"
"context"
"encoding/json"
+ "io"
"net"
"github.com/sirupsen/logrus"
ReturnTo string `json:"return_to"` // Redirect to this URL after logging out
}
+// BlockWriteOptions specifies the data and parameters for a
+// BlockWrite call.
+//
+// As implemented by keepclient's KeepClient.BlockWrite: Data and
+// Reader must not both be nil, DataSize must not be negative, and
+// zero values of Hash, StorageClasses (nil), Replicas, RequestID,
+// and Attempts are replaced with computed or client-configured
+// defaults. Other implementations (e.g., test stubs) may support
+// only a subset of these fields.
+type BlockWriteOptions struct {
+ // Hash is the hex md5 digest of the block. KeepClient.BlockWrite
+ // computes it from the data when it is empty.
+ Hash string
+ // Data is the block content. If nil, the content is read from
+ // Reader instead.
+ Data []byte
+ Reader io.Reader
+ DataSize int // Must be set if Data is nil.
+ RequestID string
+ StorageClasses []string
+ Replicas int
+ Attempts int
+}
+
+// BlockWriteResponse is the result of a BlockWrite call: the locator
+// of the written block and the number of replicas reported stored.
+type BlockWriteResponse struct {
+ Locator string
+ Replicas int
+}
+
type API interface {
ConfigGet(ctx context.Context) (json.RawMessage, error)
Login(ctx context.Context, options LoginOptions) (LoginResponse, error)
package arvados
-import "io"
+import (
+ "context"
+ "io"
+)
type fsBackend interface {
keepClient
type keepClient interface {
ReadAt(locator string, p []byte, off int) (int, error)
- PutB(p []byte) (string, int, error)
+ BlockWrite(context.Context, BlockWriteOptions) (BlockWriteResponse, error)
LocalLocator(locator string) (string, error)
}
type collectionFileSystem struct {
fileSystem
- uuid string
+ uuid string
+ replicas int
+ storageClasses []string
}
// FileSystem returns a CollectionFileSystem for the collection.
modTime = time.Now()
}
fs := &collectionFileSystem{
- uuid: c.UUID,
+ uuid: c.UUID,
+ storageClasses: c.StorageClassesDesired,
fileSystem: fileSystem{
fsBackend: keepBackend{apiClient: client, keepClient: kc},
thr: newThrottle(concurrentWriters),
},
}
+ if r := c.ReplicationDesired; r != nil {
+ fs.replicas = *r
+ }
root := &dirnode{
fs: fs,
treenode: treenode{
// filenode implements inode.
type filenode struct {
parent inode
- fs FileSystem
+ fs *collectionFileSystem
fileinfo fileinfo
segments []segment
// number of times `segments` has changed in a
fn.fs.throttle().Acquire()
go func() {
defer close(done)
- locator, _, err := fn.FS().PutB(buf)
+ resp, err := fn.FS().BlockWrite(context.Background(), BlockWriteOptions{
+ Data: buf,
+ Replicas: fn.fs.replicas,
+ StorageClasses: fn.fs.storageClasses,
+ })
fn.fs.throttle().Release()
fn.Lock()
defer fn.Unlock()
fn.memsize -= int64(len(buf))
fn.segments[idx] = storedSegment{
kc: fn.FS(),
- locator: locator,
+ locator: resp.Locator,
size: len(buf),
offset: 0,
length: len(buf),
go func() {
defer close(done)
defer close(errs)
- locator, _, err := dn.fs.PutB(block)
+ resp, err := dn.fs.BlockWrite(context.Background(), BlockWriteOptions{
+ Data: block,
+ Replicas: dn.fs.replicas,
+ StorageClasses: dn.fs.storageClasses,
+ })
dn.fs.throttle().Release()
if err != nil {
errs <- err
data := ref.fn.segments[ref.idx].(*memSegment).buf
ref.fn.segments[ref.idx] = storedSegment{
kc: dn.fs,
- locator: locator,
+ locator: resp.Locator,
size: blocksize,
offset: offsets[idx],
length: len(data),
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
type keepClientStub struct {
blocks map[string][]byte
refreshable map[string]bool
- onPut func(bufcopy []byte) // called from PutB, before acquiring lock
+ onWrite func(bufcopy []byte) // called from BlockWrite, before acquiring lock
authToken string // client's auth token (used for signing locators)
sigkey string // blob signing key
sigttl time.Duration // blob signing ttl
return copy(p, buf[off:]), nil
}
-func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
- locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(p), len(p)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
- buf := make([]byte, len(p))
- copy(buf, p)
- if kcs.onPut != nil {
- kcs.onPut(buf)
+func (kcs *keepClientStub) BlockWrite(_ context.Context, opts BlockWriteOptions) (BlockWriteResponse, error) {
+ if opts.Data == nil {
+ panic("oops, stub is not made for this")
+ }
+ locator := SignLocator(fmt.Sprintf("%x+%d", md5.Sum(opts.Data), len(opts.Data)), kcs.authToken, time.Now().Add(kcs.sigttl), kcs.sigttl, []byte(kcs.sigkey))
+ buf := make([]byte, len(opts.Data))
+ copy(buf, opts.Data)
+ if kcs.onWrite != nil {
+ kcs.onWrite(buf)
+ }
+ for _, sc := range opts.StorageClasses {
+ if sc != "default" {
+ return BlockWriteResponse{}, fmt.Errorf("stub does not write storage class %q", sc)
+ }
}
kcs.Lock()
defer kcs.Unlock()
kcs.blocks[locator[:32]] = buf
- return locator, 1, nil
+ return BlockWriteResponse{Locator: locator, Replicas: 1}, nil
}
var reRemoteSignature = regexp.MustCompile(`\+[AR][^+]*`)
c.Check(ok, check.Equals, true)
}
+// TestUnattainableStorageClasses creates a collection filesystem
+// whose StorageClassesDesired is "unobtainium", writes a small file,
+// and checks that MarshalManifest fails with the keep client stub's
+// "does not write storage class" error -- i.e., that the collection's
+// storage classes are passed along when blocks are written.
+func (s *CollectionFSSuite) TestUnattainableStorageClasses(c *check.C) {
+ fs, err := (&Collection{
+ StorageClassesDesired: []string{"unobtainium"},
+ }).FileSystem(s.client, s.kc)
+ c.Assert(err, check.IsNil)
+
+ f, err := fs.OpenFile("/foo", os.O_CREATE|os.O_WRONLY, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("food"))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ _, err = fs.MarshalManifest(".")
+ c.Assert(err, check.ErrorMatches, `.*stub does not write storage class \"unobtainium\"`)
+}
+
func (s *CollectionFSSuite) TestColonInFilename(c *check.C) {
fs, err := (&Collection{
ManifestText: "./foo:foo 3858f62230ac3c915f300c664312c63f+3 0:3:bar:bar\n",
proceed := make(chan struct{})
var started, concurrent int32
blk2done := false
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
atomic.AddInt32(&concurrent, 1)
switch atomic.AddInt32(&started, 1) {
case 1:
fs, err := (&Collection{}).FileSystem(s.client, s.kc)
c.Assert(err, check.IsNil)
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
// discard flushed data -- otherwise the stub will use
// unlimited memory
time.Sleep(time.Millisecond)
c.Assert(err, check.IsNil)
var flushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
atomic.AddInt64(&flushed, int64(len(p)))
}
time.AfterFunc(10*time.Second, func() { close(timeout) })
var putCount, concurrency int64
var unflushed int64
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
defer atomic.AddInt64(&unflushed, -int64(len(p)))
cur := atomic.AddInt64(&concurrency, 1)
defer atomic.AddInt64(&concurrency, -1)
})
wrote := 0
- s.kc.onPut = func(p []byte) {
+ s.kc.onWrite = func(p []byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
wrote++
}
func (s *CollectionFSSuite) TestFlushShort(c *check.C) {
- s.kc.onPut = func([]byte) {
+ s.kc.onWrite = func([]byte) {
s.kc.Lock()
s.kc.blocks = map[string][]byte{}
s.kc.Unlock()
// Importing arvadostest would be an import cycle, so these
// fixtures are duplicated here [until fs moves to a separate
// package].
- fixtureActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
- fixtureAProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
- fixtureThisFilterGroupUUID = "zzzzz-j7d0g-thisfiltergroup"
- fixtureAFilterGroupTwoUUID = "zzzzz-j7d0g-afiltergrouptwo"
- fixtureAFilterGroupThreeUUID = "zzzzz-j7d0g-filtergroupthre"
- fixtureFooAndBarFilesInDirUUID = "zzzzz-4zz18-foonbarfilesdir"
- fixtureFooCollectionName = "zzzzz-4zz18-fy296fx3hot09f7 added sometime"
- fixtureFooCollectionPDH = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
- fixtureFooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
- fixtureNonexistentCollection = "zzzzz-4zz18-totallynotexist"
- fixtureBlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
- fixtureBlobSigningTTL = 336 * time.Hour
+ fixtureActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ fixtureAProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
+ fixtureThisFilterGroupUUID = "zzzzz-j7d0g-thisfiltergroup"
+ fixtureAFilterGroupTwoUUID = "zzzzz-j7d0g-afiltergrouptwo"
+ fixtureAFilterGroupThreeUUID = "zzzzz-j7d0g-filtergroupthre"
+ fixtureFooAndBarFilesInDirUUID = "zzzzz-4zz18-foonbarfilesdir"
+ fixtureFooCollectionName = "zzzzz-4zz18-fy296fx3hot09f7 added sometime"
+ fixtureFooCollectionPDH = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
+ fixtureFooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
+ fixtureNonexistentCollection = "zzzzz-4zz18-totallynotexist"
+ fixtureStorageClassesDesiredArchive = "zzzzz-4zz18-3t236wr12769qqa"
+ fixtureBlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+ fixtureBlobSigningTTL = 336 * time.Hour
)
var _ = check.Suite(&SiteFSSuite{})
c.Check(len(fis), check.Equals, 0)
}
+// TestUpdateStorageClasses writes a new file into the
+// fixtureStorageClassesDesiredArchive collection via /by_id and
+// checks that Sync fails with the stub's error for storage class
+// "archive" -- i.e., that the existing collection's desired storage
+// classes are used when its new data is written.
+func (s *SiteFSSuite) TestUpdateStorageClasses(c *check.C) {
+ f, err := s.fs.OpenFile("/by_id/"+fixtureStorageClassesDesiredArchive+"/newfile", os.O_CREATE|os.O_RDWR, 0777)
+ c.Assert(err, check.IsNil)
+ _, err = f.Write([]byte("nope"))
+ c.Assert(err, check.IsNil)
+ err = f.Close()
+ c.Assert(err, check.IsNil)
+ err = s.fs.Sync()
+ c.Assert(err, check.ErrorMatches, `.*stub does not write storage class "archive"`)
+}
+
func (s *SiteFSSuite) TestByUUIDAndPDH(c *check.C) {
f, err := s.fs.Open("/by_id")
c.Assert(err, check.IsNil)
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
"sync"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
- "git.arvados.org/arvados.git/sdk/go/asyncbuf"
"git.arvados.org/arvados.git/sdk/go/httpserver"
)
// Returns an InsufficientReplicasError if 0 <= replicas <
// kc.Wants_replicas.
func (kc *KeepClient) PutHR(hash string, r io.Reader, dataBytes int64) (string, int, error) {
- // Buffer for reads from 'r'
- var bufsize int
- if dataBytes > 0 {
- if dataBytes > BLOCKSIZE {
- return "", 0, ErrOversizeBlock
- }
- bufsize = int(dataBytes)
- } else {
- bufsize = BLOCKSIZE
- }
-
- buf := asyncbuf.NewBuffer(make([]byte, 0, bufsize))
- go func() {
- _, err := io.Copy(buf, HashCheckingReader{r, md5.New(), hash})
- buf.CloseWithError(err)
- }()
- return kc.putReplicas(hash, buf.NewReader, dataBytes)
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Hash: hash,
+ Reader: r,
+ DataSize: int(dataBytes),
+ })
+ return resp.Locator, resp.Replicas, err
}
// PutHB writes a block to Keep. The hash of the bytes is given in
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutHB(hash string, buf []byte) (string, int, error) {
- newReader := func() io.Reader { return bytes.NewBuffer(buf) }
- return kc.putReplicas(hash, newReader, int64(len(buf)))
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Hash: hash,
+ Data: buf,
+ })
+ return resp.Locator, resp.Replicas, err
}
// PutB writes a block to Keep. It computes the hash itself.
//
// Return values are the same as for PutHR.
func (kc *KeepClient) PutB(buffer []byte) (string, int, error) {
- hash := fmt.Sprintf("%x", md5.Sum(buffer))
- return kc.PutHB(hash, buffer)
+ resp, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Data: buffer,
+ })
+ return resp.Locator, resp.Replicas, err
}
// PutR writes a block to Keep. It first reads all data from r into a buffer
import (
"bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
"testing"
"time"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
. "gopkg.in/check.v1"
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
- go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+ go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
writer.Write([]byte("foo"))
writer.Close()
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, uploadStatusChan chan uploadStatus) {
- go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, int64(len("foo")), kc.getRequestID())
+ go kc.uploadToKeepServer(url, st.expectPath, nil, reader, uploadStatusChan, len("foo"), kc.getRequestID())
writer.Write([]byte("foo"))
writer.Close()
func (s *StandaloneSuite) TestPutWithStorageClasses(c *C) {
nServers := 5
for _, trial := range []struct {
- replicas int
- classes []string
- minRequests int
- maxRequests int
- success bool
+ replicas int
+ clientClasses []string
+ putClasses []string // putClasses takes precedence over clientClasses
+ minRequests int
+ maxRequests int
+ success bool
}{
- {1, []string{"class1"}, 1, 1, true},
- {2, []string{"class1"}, 1, 2, true},
- {3, []string{"class1"}, 2, 3, true},
- {1, []string{"class1", "class2"}, 1, 1, true},
- {nServers*2 + 1, []string{"class1"}, nServers, nServers, false},
- {1, []string{"class404"}, nServers, nServers, false},
- {1, []string{"class1", "class404"}, nServers, nServers, false},
+ {1, []string{"class1"}, nil, 1, 1, true},
+ {2, []string{"class1"}, nil, 1, 2, true},
+ {3, []string{"class1"}, nil, 2, 3, true},
+ {1, []string{"class1", "class2"}, nil, 1, 1, true},
+ {3, nil, []string{"class1"}, 2, 3, true},
+ {1, nil, []string{"class1", "class2"}, 1, 1, true},
+ {1, []string{"class404"}, []string{"class1", "class2"}, 1, 1, true},
+ {1, []string{"class1"}, []string{"class404", "class2"}, nServers, nServers, false},
+ {nServers*2 + 1, []string{"class1"}, nil, nServers, nServers, false},
+ {1, []string{"class404"}, nil, nServers, nServers, false},
+ {1, []string{"class1", "class404"}, nil, nServers, nServers, false},
+ {1, nil, []string{"class1", "class404"}, nServers, nServers, false},
} {
c.Logf("%+v", trial)
st := &StubPutHandler{
arv, _ := arvadosclient.MakeArvadosClient()
kc, _ := MakeKeepClient(arv)
kc.Want_replicas = trial.replicas
- kc.StorageClasses = trial.classes
+ kc.StorageClasses = trial.clientClasses
arv.ApiToken = "abc123"
localRoots := make(map[string]string)
writableLocalRoots := make(map[string]string)
}
kc.SetServiceRoots(localRoots, writableLocalRoots, nil)
- _, _, err := kc.PutB([]byte("foo"))
+ _, err := kc.BlockWrite(context.Background(), arvados.BlockWriteOptions{
+ Data: []byte("foo"),
+ StorageClasses: trial.putClasses,
+ })
if trial.success {
c.Check(err, check.IsNil)
} else {
package keepclient
import (
+ "bytes"
+ "context"
"crypto/md5"
"errors"
"fmt"
"strconv"
"strings"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/asyncbuf"
)
// DebugPrintf emits debug messages. The easiest way to enable
}
func (kc *KeepClient) uploadToKeepServer(host string, hash string, classesTodo []string, body io.Reader,
- uploadStatusChan chan<- uploadStatus, expectedLength int64, reqid string) {
+ uploadStatusChan chan<- uploadStatus, expectedLength int, reqid string) {
var req *http.Request
var err error
return
}
- req.ContentLength = expectedLength
+ req.ContentLength = int64(expectedLength)
if expectedLength > 0 {
req.Body = ioutil.NopCloser(body)
} else {
}
}
-func (kc *KeepClient) putReplicas(
- hash string,
- getReader func() io.Reader,
- expectedLength int64) (locator string, replicas int, err error) {
-
- reqid := kc.getRequestID()
+func (kc *KeepClient) BlockWrite(ctx context.Context, req arvados.BlockWriteOptions) (arvados.BlockWriteResponse, error) {
+ var resp arvados.BlockWriteResponse
+ var getReader func() io.Reader
+ if req.Data == nil && req.Reader == nil {
+ return resp, errors.New("invalid BlockWriteOptions: Data and Reader are both nil")
+ }
+ if req.DataSize < 0 {
+ return resp, fmt.Errorf("invalid BlockWriteOptions: negative DataSize %d", req.DataSize)
+ }
+ if req.DataSize > BLOCKSIZE || len(req.Data) > BLOCKSIZE {
+ return resp, ErrOversizeBlock
+ }
+ if req.Data != nil {
+ if req.DataSize > len(req.Data) {
+ return resp, errors.New("invalid BlockWriteOptions: DataSize > len(Data)")
+ }
+ if req.DataSize == 0 {
+ req.DataSize = len(req.Data)
+ }
+ getReader = func() io.Reader { return bytes.NewReader(req.Data[:req.DataSize]) }
+ } else {
+ buf := asyncbuf.NewBuffer(make([]byte, 0, req.DataSize))
+ go func() {
+ _, err := io.Copy(buf, HashCheckingReader{req.Reader, md5.New(), req.Hash})
+ buf.CloseWithError(err)
+ }()
+ getReader = buf.NewReader
+ }
+ if req.Hash == "" {
+ m := md5.New()
+ _, err := io.Copy(m, getReader())
+ if err != nil {
+ return resp, err
+ }
+ req.Hash = fmt.Sprintf("%x", m.Sum(nil))
+ }
+ if req.StorageClasses == nil {
+ req.StorageClasses = kc.StorageClasses
+ }
+ if req.Replicas == 0 {
+ req.Replicas = kc.Want_replicas
+ }
+ if req.RequestID == "" {
+ req.RequestID = kc.getRequestID()
+ }
+ if req.Attempts == 0 {
+ req.Attempts = 1 + kc.Retries
+ }
// Calculate the ordering for uploading to servers
- sv := NewRootSorter(kc.WritableLocalRoots(), hash).GetSortedRoots()
+ sv := NewRootSorter(kc.WritableLocalRoots(), req.Hash).GetSortedRoots()
// The next server to try contacting
nextServer := 0
}()
}()
- replicasWanted := kc.Want_replicas
replicasTodo := map[string]int{}
- for _, c := range kc.StorageClasses {
- replicasTodo[c] = replicasWanted
+ for _, c := range req.StorageClasses {
+ replicasTodo[c] = req.Replicas
}
- replicasDone := 0
replicasPerThread := kc.replicasPerService
if replicasPerThread < 1 {
// unlimited or unknown
- replicasPerThread = replicasWanted
+ replicasPerThread = req.Replicas
}
- retriesRemaining := 1 + kc.Retries
+ retriesRemaining := req.Attempts
var retryServers []string
lastError := make(map[string]string)
}
}
if !trackingClasses {
- maxConcurrency = replicasWanted - replicasDone
+ maxConcurrency = req.Replicas - resp.Replicas
}
if maxConcurrency < 1 {
// If there are no non-zero entries in
for active*replicasPerThread < maxConcurrency {
// Start some upload requests
if nextServer < len(sv) {
- DebugPrintf("DEBUG: [%s] Begin upload %s to %s", reqid, hash, sv[nextServer])
- go kc.uploadToKeepServer(sv[nextServer], hash, classesTodo, getReader(), uploadStatusChan, expectedLength, reqid)
+ DebugPrintf("DEBUG: [%s] Begin upload %s to %s", req.RequestID, req.Hash, sv[nextServer])
+ go kc.uploadToKeepServer(sv[nextServer], req.Hash, classesTodo, getReader(), uploadStatusChan, req.DataSize, req.RequestID)
nextServer++
active++
} else {
msg += resp + "; "
}
msg = msg[:len(msg)-2]
- return locator, replicasDone, InsufficientReplicasError(errors.New(msg))
+ return resp, InsufficientReplicasError(errors.New(msg))
}
break
}
}
- DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", reqid, replicasTodo, active)
+ DebugPrintf("DEBUG: [%s] Replicas remaining to write: %v active uploads: %v", req.RequestID, replicasTodo, active)
if active < 1 {
break
}
if status.statusCode == http.StatusOK {
delete(lastError, status.url)
- replicasDone += status.replicasStored
+ resp.Replicas += status.replicasStored
if len(status.classesStored) == 0 {
// Server doesn't report
// storage classes. Give up
delete(replicasTodo, className)
}
}
- locator = status.response
+ resp.Locator = status.response
} else {
msg := fmt.Sprintf("[%d] %s", status.statusCode, status.response)
if len(msg) > 100 {
sv = retryServers
}
- return locator, replicasDone, nil
+ return resp, nil
}
func parseStorageClassesConfirmedHeader(hdr string) (map[string]int, error) {
"syscall"
"time"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/dispatch"
return nil
}
+	// Load the cluster configuration. Check the Load error before
+	// touching cfg: if loading failed (e.g., config file missing or
+	// unreadable), cfg is not usable and GetCluster must not be called.
+	loader := config.NewLoader(nil, logger)
+	cfg, err := loader.Load()
+	if err != nil {
+		return fmt.Errorf("error loading config: %s", err)
+	}
+	cluster, err := cfg.GetCluster("")
+	if err != nil {
+		return fmt.Errorf("config error: %s", err)
+	}
+
logger.Printf("crunch-dispatch-local %s started", version)
runningCmds = make(map[string]*exec.Cmd)
+ var client arvados.Client
+ client.APIHost = cluster.Services.Controller.ExternalURL.Host
+ client.AuthToken = cluster.SystemRootToken
+ client.Insecure = cluster.TLS.Insecure
+
+ if client.APIHost != "" || client.AuthToken != "" {
+ // Copy real configs into env vars so [a]
+ // MakeArvadosClient() uses them, and [b] they get
+ // propagated to the crunch-run child processes
+ // (presumably via the inherited environment: this
+ // dispatcher starts crunch-run directly, not via SLURM).
+ os.Setenv("ARVADOS_API_HOST", client.APIHost)
+ os.Setenv("ARVADOS_API_TOKEN", client.AuthToken)
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "")
+ if client.Insecure {
+ os.Setenv("ARVADOS_API_HOST_INSECURE", "1")
+ }
+ os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
+ } else {
+ logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
+ }
+
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
logger.Errorf("error making Arvados client: %v", err)
dispatcher := dispatch.Dispatcher{
Logger: logger,
Arv: arv,
- RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx}).run,
+ RunContainer: (&LocalRun{startFunc, make(chan bool, 8), ctx, cluster}).run,
PollPeriod: time.Duration(*pollInterval) * time.Second,
}
startCmd func(container arvados.Container, cmd *exec.Cmd) error
concurrencyLimit chan bool
ctx context.Context
+ cluster *arvados.Cluster
}
// Run a container.
waitGroup.Add(1)
defer waitGroup.Done()
- cmd := exec.Command(*crunchRunCommand, uuid)
+ cmd := exec.Command(*crunchRunCommand, "--runtime-engine="+lr.cluster.Containers.RuntimeEngine, uuid)
cmd.Stdin = nil
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stderr
return cmd.Start()
}
+ cl := arvados.Cluster{Containers: arvados.ContainersConfig{RuntimeEngine: "docker"}}
+
dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
- (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+ (&LocalRun{startCmd, make(chan bool, 8), ctx, &cl}).run(d, c, s)
cancel()
}
return cmd.Start()
}
+ cl := arvados.Cluster{Containers: arvados.ContainersConfig{RuntimeEngine: "docker"}}
+
dispatcher.RunContainer = func(d *dispatch.Dispatcher, c arvados.Container, s <-chan arvados.Container) {
- (&LocalRun{startCmd, make(chan bool, 8), ctx}).run(d, c, s)
+ (&LocalRun{startCmd, make(chan bool, 8), ctx, &cl}).run(d, c, s)
cancel()
}
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
crArgs := append([]string(nil), crunchRunCommand...)
+ crArgs = append(crArgs, "--runtime-engine="+disp.cluster.Containers.RuntimeEngine)
crArgs = append(crArgs, container.UUID)
crScript := strings.NewReader(execScript(crArgs))
+++ /dev/null
-/usr/local/lib/arvbox/runsu.sh
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# Installs crunch-dispatch-local and a crunch-run wrapper script,
+# then (unless called with --only-deps) runs the dispatcher.
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
+
+flock /var/lib/gopath/gopath.lock go install "git.arvados.org/arvados.git/services/crunch-dispatch-local"
+install "$GOPATH/bin/crunch-dispatch-local" /usr/local/bin
+ln -sf arvados-server /usr/local/bin/crunch-run
+
+# With --only-deps, stop after installing the binaries.
+if test "$1" = "--only-deps" ; then
+    exit
+fi
+
+# Generate a wrapper that adds networking flags to crunch-run. The
+# escaped \$@ is written literally into the wrapper; it is quoted so
+# each argument is passed through unchanged (no word splitting or
+# globbing).
+cat > /usr/local/bin/crunch-run.sh <<EOF
+#!/bin/sh
+exec /usr/local/bin/crunch-run -container-enable-networking=default -container-network-mode=host "\$@"
+EOF
+chmod +x /usr/local/bin/crunch-run.sh
+
+export ARVADOS_API_HOST=$localip:${services[controller-ssl]}
+export ARVADOS_API_HOST_INSECURE=1
+export ARVADOS_API_TOKEN=$(cat "$ARVADOS_CONTAINER_PATH/superuser_token")
+
+exec /usr/local/bin/crunch-dispatch-local -crunch-run-command=/usr/local/bin/crunch-run.sh -poll-interval=1
+++ /dev/null
-#!/bin/bash
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-exec 2>&1
-set -ex -o pipefail
-
-. /usr/local/lib/arvbox/common.sh
-. /usr/local/lib/arvbox/go-setup.sh
-
-flock /var/lib/gopath/gopath.lock go install "git.arvados.org/arvados.git/services/crunch-dispatch-local"
-install $GOPATH/bin/crunch-dispatch-local /usr/local/bin
-ln -sf arvados-server /usr/local/bin/crunch-run
-
-if test "$1" = "--only-deps" ; then
- exit
-fi
-
-cat > /usr/local/bin/crunch-run.sh <<EOF
-#!/bin/sh
-exec /usr/local/bin/crunch-run -container-enable-networking=default -container-network-mode=host \$@
-EOF
-chmod +x /usr/local/bin/crunch-run.sh
-
-export ARVADOS_API_HOST=$localip:${services[controller-ssl]}
-export ARVADOS_API_HOST_INSECURE=1
-export ARVADOS_API_TOKEN=$(cat $ARVADOS_CONTAINER_PATH/superuser_token)
-
-exec /usr/local/bin/crunch-dispatch-local -crunch-run-command=/usr/local/bin/crunch-run.sh -poll-interval=1
fi
done
-if ! (ps x | grep -v grep | grep "crunch-dispatch") > /dev/null ; then
+if ! (ps ax | grep -v grep | grep "crunch-dispatch") > /dev/null ; then
waiting="$waiting crunch-dispatch"
fi