13100: Use collection FS for output and logs.
author Tom Clegg <tclegg@veritasgenetics.com>
Thu, 5 Apr 2018 06:31:10 +0000 (02:31 -0400)
committer Tom Clegg <tclegg@veritasgenetics.com>
Thu, 5 Apr 2018 19:49:44 +0000 (15:49 -0400)
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg@veritasgenetics.com>

services/crunch-run/copier.go [new file with mode: 0644]
services/crunch-run/copier_test.go [new file with mode: 0644]
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go
services/crunch-run/logging_test.go
services/crunch-run/upload.go [deleted file]
services/crunch-run/upload_test.go [deleted file]

diff --git a/services/crunch-run/copier.go b/services/crunch-run/copier.go
new file mode 100644 (file)
index 0000000..273ec79
--- /dev/null
@@ -0,0 +1,337 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "encoding/json"
+       "errors"
+       "fmt"
+       "io"
+       "os"
+       "path/filepath"
+       "sort"
+       "strings"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
// printfer is the only logging capability the copier needs: any value
// with a Printf method can receive its progress messages.
type printfer interface {
	Printf(format string, args ...interface{})
}

// errTooManySymlinks is returned by walkHostFS when the remaining
// symlink budget is exhausted, which is also how symlink cycles end.
var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")

// limitFollowSymlinks is the symlink budget Copy starts its walk with.
const limitFollowSymlinks = 10

// filetodo is one local file queued for copying into the output
// collection.
type filetodo struct {
	// src is the path of the file to read on the host.
	src string
	// dst is the destination path inside the output collection.
	dst string
	// size is the file size in bytes; it is only used in the
	// progress message logged by copyFile.
	size int64
}
+
+// copier copies data from a finished container's output path to a new
+// Arvados collection.
+//
+// Regular files (and symlinks to regular files) in hostOutputDir are
+// copied from the local filesystem.
+//
+// Symlinks to mounted collections, and any collections mounted under
+// ctrOutputDir, are copied by transforming the relevant parts of the
+// existing manifests, without moving any data around.
+//
+// Symlinks to other parts of the container's filesystem result in
+// errors.
+//
+// To use a copier: first call walkMount() to inspect the output
+// directory, grab the necessary parts of already-stored collections,
+// and prepare a list of files that need to be copied from the local
+// filesystem; then call commit() to copy the file data and return a
+// complete output manifest.
+type copier struct {
+       client        *arvados.Client
+       arvClient     IArvadosClient
+       keepClient    IKeepClient
+       hostOutputDir string
+       ctrOutputDir  string
+       mounts        map[string]arvados.Mount
+       secretMounts  map[string]arvados.Mount
+       logger        printfer
+
+       dirs     []string
+       files    []filetodo
+       manifest string
+
+       manifestCache map[string]*manifest.Manifest
+}
+
+// Copy copies data as needed, and returns a new manifest.
+func (cp *copier) Copy() (string, error) {
+       err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true)
+       if err != nil {
+               return "", err
+       }
+       fs, err := (&arvados.Collection{ManifestText: cp.manifest}).FileSystem(cp.client, cp.keepClient)
+       if err != nil {
+               return "", err
+       }
+       for _, d := range cp.dirs {
+               err = fs.Mkdir(d, 0777)
+               if err != nil {
+                       return "", err
+               }
+       }
+       for _, f := range cp.files {
+               err = cp.copyFile(fs, f)
+               if err != nil {
+                       return "", err
+               }
+       }
+       return fs.MarshalManifest(".")
+}
+
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+       cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
+       dst, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
+       if err != nil {
+               return err
+       }
+       src, err := os.Open(f.src)
+       if err != nil {
+               dst.Close()
+               return err
+       }
+       defer src.Close()
+       _, err = io.Copy(dst, src)
+       if err != nil {
+               dst.Close()
+               return err
+       }
+       return dst.Close()
+}
+
+// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
+// absolute path in the container's filesystem) to dest (an absolute
+// path in the output collection, or "" for output root).
+//
+// src must be (or be a descendant of) a readonly "collection" mount,
+// a writable collection mounted at ctrOutputPath, or a "tmp" mount.
+//
+// If walkMountsBelow is true, include contents of any collection
+// mounted below src as well.
+func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
+       // srcRoot, srcMount indicate the innermost mount that
+       // contains src.
+       var srcRoot string
+       var srcMount arvados.Mount
+       for root, mnt := range cp.mounts {
+               if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
+                       srcRoot, srcMount = root, mnt
+               }
+       }
+       for root := range cp.secretMounts {
+               if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
+                       // Silently omit secrets, and symlinks to
+                       // secrets.
+                       return nil
+               }
+       }
+       if srcRoot == "" {
+               return fmt.Errorf("cannot output file %q: not in any mount", src)
+       }
+
+       // srcRelPath is the path to the file/dir we are trying to
+       // copy, relative to its mount point -- ".", "./foo.txt", ...
+       srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
+
+       switch {
+       case srcMount.ExcludeFromOutput:
+       case srcMount.Kind == "tmp":
+               // Handle by walking the host filesystem.
+               return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
+       case srcMount.Kind != "collection":
+               return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
+       case !srcMount.Writable:
+               mft, err := cp.getManifest(srcMount.PortableDataHash)
+               if err != nil {
+                       return err
+               }
+               cp.manifest += mft.Extract(srcRelPath, dest).Text
+       case srcRoot == cp.ctrOutputDir:
+               f, err := os.Open(filepath.Join(cp.hostOutputDir, ".arvados#collection"))
+               if err != nil {
+                       return err
+               }
+               defer f.Close()
+               var coll arvados.Collection
+               err = json.NewDecoder(f).Decode(&coll)
+               if err != nil {
+                       return err
+               }
+               mft := manifest.Manifest{Text: coll.ManifestText}
+               cp.manifest += mft.Extract(srcRelPath, dest).Text
+       default:
+               return fmt.Errorf("cannot output %q as %q: writable collection mounted at %q", src, dest, srcRoot)
+       }
+       if walkMountsBelow {
+               return cp.walkMountsBelow(dest, src)
+       } else {
+               return nil
+       }
+}
+
+func (cp *copier) walkMountsBelow(dest, src string) error {
+       for mnt, mntinfo := range cp.mounts {
+               if !strings.HasPrefix(mnt, src+"/") {
+                       continue
+               }
+               if mntinfo.Kind == "text" || mntinfo.Kind == "json" {
+                       // These got copied into the nearest parent
+                       // mount as regular files during setup, so
+                       // they get copied as regular files when we
+                       // process the parent. Output will reflect any
+                       // changes and deletions done by the
+                       // container.
+                       continue
+               }
+               // Example: we are processing dest=/foo src=/mnt1/dir1
+               // (perhaps we followed a symlink /outdir/foo ->
+               // /mnt1/dir1). Caller has already processed the
+               // collection mounted at /mnt1, but now we find that
+               // /mnt1/dir1/mnt2 is also a mount, so we need to copy
+               // src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
+               //
+               // We handle all descendants of /mnt1/dir1 in this
+               // loop instead of using recursion:
+               // /mnt1/dir1/mnt2/mnt3 is a child of both /mnt1 and
+               // /mnt1/dir1/mnt2, but we only want to walk it
+               // once. (This simplification is safe because mounted
+               // collections cannot contain symlinks.)
+               err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false)
+               if err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+// Add entries to cp.dirs and cp.files so as to copy src (an absolute
+// path in the container's filesystem which corresponds to a real file
+// or directory in cp.hostOutputDir) to dest (an absolute path in the
+// output collection, or "" for output root).
+//
+// Always follow symlinks.
+//
+// If includeMounts is true, include mounts at and below src.
+// Otherwise, skip them.
+func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
+       if includeMounts {
+               err := cp.walkMountsBelow(dest, src)
+               if err != nil {
+                       return err
+               }
+       }
+
+       hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
+
+       // If src is a symlink, walk its target.
+       fi, err := os.Lstat(hostsrc)
+       if err != nil {
+               return fmt.Errorf("lstat %q: %s", src, err)
+       }
+       if fi.Mode()&os.ModeSymlink != 0 {
+               if maxSymlinks < 0 {
+                       return errTooManySymlinks
+               }
+               target, err := os.Readlink(hostsrc)
+               if err != nil {
+                       return fmt.Errorf("readlink %q: %s", src, err)
+               }
+               if !strings.HasPrefix(target, "/") {
+                       target = filepath.Join(filepath.Dir(src), target)
+               }
+               return cp.walkMount(dest, target, maxSymlinks-1, true)
+       }
+
+       // If src is a regular directory, append it to cp.dirs and
+       // walk each of its children. (If there are no children,
+       // create an empty file "dest/.keep".)
+       if fi.Mode().IsDir() {
+               if dest != "" {
+                       cp.dirs = append(cp.dirs, dest)
+               }
+               dir, err := os.Open(hostsrc)
+               if err != nil {
+                       return fmt.Errorf("open %q: %s", src, err)
+               }
+               names, err := dir.Readdirnames(-1)
+               dir.Close()
+               if err != nil {
+                       return fmt.Errorf("readdirnames %q: %s", src, err)
+               }
+               if len(names) == 0 {
+                       if dest != "" {
+                               cp.files = append(cp.files, filetodo{
+                                       src: os.DevNull,
+                                       dst: dest + "/.keep",
+                               })
+                       }
+                       return nil
+               }
+               sort.Strings(names)
+               for _, name := range names {
+                       dest, src := dest+"/"+name, src+"/"+name
+                       if _, isSecret := cp.secretMounts[src]; isSecret {
+                               continue
+                       }
+                       if mntinfo, isMount := cp.mounts[src]; isMount && mntinfo.Kind != "text" && mntinfo.Kind != "json" {
+                               // If a regular file/dir somehow
+                               // exists at a path that's also a
+                               // mount target, ignore the file --
+                               // the mount has already been included
+                               // with walkMountsBelow().
+                               //
+                               // (...except json/text mounts, which
+                               // are handled as regular files.)
+                               continue
+                       }
+                       err = cp.walkHostFS(dest, src, maxSymlinks, false)
+                       if err != nil {
+                               return err
+                       }
+               }
+               return nil
+       }
+
+       // If src is a regular file, append it to cp.files.
+       if fi.Mode().IsRegular() {
+               cp.files = append(cp.files, filetodo{
+                       src:  hostsrc,
+                       dst:  dest,
+                       size: fi.Size(),
+               })
+               return nil
+       }
+
+       return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
+}
+
+func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
+       if mft, ok := cp.manifestCache[pdh]; ok {
+               return mft, nil
+       }
+       var coll arvados.Collection
+       err := cp.arvClient.Get("collections", pdh, nil, &coll)
+       if err != nil {
+               return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
+       }
+       mft := &manifest.Manifest{Text: coll.ManifestText}
+       if cp.manifestCache == nil {
+               cp.manifestCache = map[string]*manifest.Manifest{pdh: mft}
+       } else {
+               cp.manifestCache[pdh] = mft
+       }
+       return mft, nil
+}
diff --git a/services/crunch-run/copier_test.go b/services/crunch-run/copier_test.go
new file mode 100644 (file)
index 0000000..7b13ef9
--- /dev/null
@@ -0,0 +1,191 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+       "io"
+       "io/ioutil"
+       "os"
+
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
+       "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+       "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+       check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&copierSuite{})
+
+type copierSuite struct {
+       cp copier
+}
+
+func (s *copierSuite) SetUpTest(c *check.C) {
+       tmpdir, err := ioutil.TempDir("", "crunch-run.test.")
+       c.Assert(err, check.IsNil)
+       api, err := arvadosclient.MakeArvadosClient()
+       c.Assert(err, check.IsNil)
+       s.cp = copier{
+               client:        arvados.NewClientFromEnv(),
+               arvClient:     api,
+               hostOutputDir: tmpdir,
+               ctrOutputDir:  "/ctr/outdir",
+               mounts: map[string]arvados.Mount{
+                       "/ctr/outdir": {Kind: "tmp"},
+               },
+               secretMounts: map[string]arvados.Mount{
+                       "/secret_text": {Kind: "text", Content: "xyzzy"},
+               },
+       }
+}
+
+func (s *copierSuite) TearDownTest(c *check.C) {
+       os.RemoveAll(s.cp.hostOutputDir)
+}
+
+func (s *copierSuite) TestEmptyOutput(c *check.C) {
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.IsNil)
+       c.Check(s.cp.dirs, check.DeepEquals, []string(nil))
+       c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+func (s *copierSuite) TestRegularFilesAndDirs(c *check.C) {
+       err := os.MkdirAll(s.cp.hostOutputDir+"/dir1/dir2/dir3", 0755)
+       c.Assert(err, check.IsNil)
+       f, err := os.OpenFile(s.cp.hostOutputDir+"/dir1/foo", os.O_CREATE|os.O_WRONLY, 0644)
+       c.Assert(err, check.IsNil)
+       _, err = io.WriteString(f, "foo")
+       c.Assert(err, check.IsNil)
+       c.Assert(f.Close(), check.IsNil)
+
+       err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.IsNil)
+       c.Check(s.cp.dirs, check.DeepEquals, []string{"/dir1", "/dir1/dir2", "/dir1/dir2/dir3"})
+       c.Check(s.cp.files, check.DeepEquals, []filetodo{
+               {src: os.DevNull, dst: "/dir1/dir2/dir3/.keep"},
+               {src: s.cp.hostOutputDir + "/dir1/foo", dst: "/dir1/foo", size: 3},
+       })
+}
+
+func (s *copierSuite) TestSymlinkCycle(c *check.C) {
+       c.Assert(os.Mkdir(s.cp.hostOutputDir+"/dir1", 0755), check.IsNil)
+       c.Assert(os.Mkdir(s.cp.hostOutputDir+"/dir2", 0755), check.IsNil)
+       c.Assert(os.Symlink("../dir2", s.cp.hostOutputDir+"/dir1/l_dir2"), check.IsNil)
+       c.Assert(os.Symlink("../dir1", s.cp.hostOutputDir+"/dir2/l_dir1"), check.IsNil)
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.ErrorMatches, `.*cycle.*`)
+}
+
+func (s *copierSuite) TestSymlinkTargetMissing(c *check.C) {
+       c.Assert(os.Symlink("./missing", s.cp.hostOutputDir+"/symlink"), check.IsNil)
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.ErrorMatches, `.*/ctr/outdir/missing.*`)
+}
+
+func (s *copierSuite) TestSymlinkTargetNotMounted(c *check.C) {
+       c.Assert(os.Symlink("../boop", s.cp.hostOutputDir+"/symlink"), check.IsNil)
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.ErrorMatches, `.*/ctr/boop.*`)
+}
+
+func (s *copierSuite) TestSymlinkToSecret(c *check.C) {
+       c.Assert(os.Symlink("/secret_text", s.cp.hostOutputDir+"/symlink"), check.IsNil)
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.IsNil)
+       c.Check(len(s.cp.dirs), check.Equals, 0)
+       c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+func (s *copierSuite) TestSecretInOutputDir(c *check.C) {
+       s.cp.secretMounts["/ctr/outdir/secret_text"] = s.cp.secretMounts["/secret_text"]
+       s.writeFileInOutputDir(c, "secret_text", "xyzzy")
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.IsNil)
+       c.Check(len(s.cp.dirs), check.Equals, 0)
+       c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+func (s *copierSuite) TestSymlinkToMountedCollection(c *check.C) {
+       s.cp.mounts["/mnt"] = arvados.Mount{
+               Kind:             "collection",
+               PortableDataHash: arvadostest.FooPdh,
+       }
+       c.Assert(os.Symlink("../../mnt", s.cp.hostOutputDir+"/l_dir"), check.IsNil)
+       c.Assert(os.Symlink("/mnt/foo", s.cp.hostOutputDir+"/l_file"), check.IsNil)
+
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.IsNil)
+       c.Check(s.cp.manifest, check.Matches, `(?ms)\./l_dir acbd\S+ 0:3:foo\n\. acbd\S+ 0:3:l_file\n`)
+}
+
+func (s *copierSuite) TestSymlink(c *check.C) {
+       hostfile := s.cp.hostOutputDir + "/dir1/file"
+
+       err := os.MkdirAll(s.cp.hostOutputDir+"/dir1/dir2/dir3", 0755)
+       c.Assert(err, check.IsNil)
+       s.writeFileInOutputDir(c, "dir1/file", "file")
+       for _, err := range []error{
+               os.Symlink(s.cp.ctrOutputDir+"/dir1/file", s.cp.hostOutputDir+"/l_abs_file"),
+               os.Symlink(s.cp.ctrOutputDir+"/dir1/dir2", s.cp.hostOutputDir+"/l_abs_dir2"),
+               os.Symlink("../../dir1/file", s.cp.hostOutputDir+"/dir1/dir2/l_rel_file"),
+               os.Symlink("dir1/file", s.cp.hostOutputDir+"/l_rel_file"),
+               os.MkdirAll(s.cp.hostOutputDir+"/morelinks", 0755),
+               os.Symlink("../dir1/dir2", s.cp.hostOutputDir+"/morelinks/l_rel_dir2"),
+               os.Symlink("dir1/dir2/dir3", s.cp.hostOutputDir+"/l_rel_dir3"),
+       } {
+               c.Assert(err, check.IsNil)
+       }
+
+       err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.IsNil)
+       c.Check(s.cp.dirs, check.DeepEquals, []string{
+               "/dir1", "/dir1/dir2", "/dir1/dir2/dir3",
+               "/l_abs_dir2", "/l_abs_dir2/dir3",
+               "/l_rel_dir3",
+               "/morelinks", "/morelinks/l_rel_dir2", "/morelinks/l_rel_dir2/dir3",
+       })
+       c.Check(s.cp.files, check.DeepEquals, []filetodo{
+               {dst: "/dir1/dir2/dir3/.keep", src: os.DevNull},
+               {dst: "/dir1/dir2/l_rel_file", src: hostfile, size: 4},
+               {dst: "/dir1/file", src: hostfile, size: 4},
+               {dst: "/l_abs_dir2/dir3/.keep", src: os.DevNull},
+               {dst: "/l_abs_dir2/l_rel_file", src: hostfile, size: 4},
+               {dst: "/l_abs_file", src: hostfile, size: 4},
+               {dst: "/l_rel_dir3/.keep", src: os.DevNull},
+               {dst: "/l_rel_file", src: hostfile, size: 4},
+               {dst: "/morelinks/l_rel_dir2/dir3/.keep", src: os.DevNull},
+               {dst: "/morelinks/l_rel_dir2/l_rel_file", src: hostfile, size: 4},
+       })
+}
+
+func (s *copierSuite) TestUnsupportedOutputMount(c *check.C) {
+       s.cp.mounts["/ctr/outdir"] = arvados.Mount{Kind: "waz"}
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.NotNil)
+}
+
+func (s *copierSuite) TestUnsupportedMountKindBelow(c *check.C) {
+       s.cp.mounts["/ctr/outdir/dirk"] = arvados.Mount{Kind: "waz"}
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.NotNil)
+}
+
+func (s *copierSuite) TestUnsupportedWritableMountBelow(c *check.C) {
+       s.cp.mounts["/ctr/outdir/dirk"] = arvados.Mount{
+               Kind:             "collection",
+               PortableDataHash: arvadostest.FooPdh,
+               Writable:         true,
+       }
+       err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+       c.Check(err, check.NotNil)
+}
+
+func (s *copierSuite) writeFileInOutputDir(c *check.C, path, data string) {
+       f, err := os.OpenFile(s.cp.hostOutputDir+"/"+path, os.O_CREATE|os.O_WRONLY, 0644)
+       c.Assert(err, check.IsNil)
+       _, err = io.WriteString(f, data)
+       c.Assert(err, check.IsNil)
+       c.Assert(f.Close(), check.IsNil)
+}
index 53815cbe1c8222d4e6c9614ce889d649224af7e1..aea5917be4c0c0d2b118d17584284bf6231ab2ef 100644 (file)
@@ -57,13 +57,14 @@ var ErrCancelled = errors.New("Cancelled")
 
 // IKeepClient is the minimal Keep API methods used by crunch-run.
 type IKeepClient interface {
-       PutHB(hash string, buf []byte) (string, int, error)
+       PutB(buf []byte) (string, int, error)
+       ReadAt(locator string, p []byte, off int) (int, error)
        ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
        ClearBlockCache()
 }
 
 // NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) io.WriteCloser
+type NewLogWriter func(name string) (io.WriteCloser, error)
 
 type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
 
@@ -86,6 +87,7 @@ type ThinDockerClient interface {
 // container.
 type ContainerRunner struct {
        Docker    ThinDockerClient
+       client    *arvados.Client
        ArvClient IArvadosClient
        Kc        IKeepClient
        arvados.Container
@@ -99,7 +101,7 @@ type ContainerRunner struct {
        CrunchLog     *ThrottledLogger
        Stdout        io.WriteCloser
        Stderr        io.WriteCloser
-       LogCollection *CollectionWriter
+       LogCollection arvados.CollectionFileSystem
        LogsPDH       *string
        RunArvMount
        MkTempDir
@@ -275,7 +277,11 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) (
        }
        c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
 
-       runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+       w, err := runner.NewLogWriter("arv-mount")
+       if err != nil {
+               return nil, err
+       }
+       runner.arvMountLog = NewThrottledLogger(w)
        c.Stdout = runner.arvMountLog
        c.Stderr = runner.arvMountLog
 
@@ -696,18 +702,27 @@ func (runner *ContainerRunner) stopHoststat() error {
        return nil
 }
 
-func (runner *ContainerRunner) startHoststat() {
-       runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+func (runner *ContainerRunner) startHoststat() error {
+       w, err := runner.NewLogWriter("hoststat")
+       if err != nil {
+               return err
+       }
+       runner.hoststatLogger = NewThrottledLogger(w)
        runner.hoststatReporter = &crunchstat.Reporter{
                Logger:     log.New(runner.hoststatLogger, "", 0),
                CgroupRoot: runner.cgroupRoot,
                PollPeriod: runner.statInterval,
        }
        runner.hoststatReporter.Start()
+       return nil
 }
 
-func (runner *ContainerRunner) startCrunchstat() {
-       runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+func (runner *ContainerRunner) startCrunchstat() error {
+       w, err := runner.NewLogWriter("crunchstat")
+       if err != nil {
+               return err
+       }
+       runner.statLogger = NewThrottledLogger(w)
        runner.statReporter = &crunchstat.Reporter{
                CID:          runner.ContainerID,
                Logger:       log.New(runner.statLogger, "", 0),
@@ -716,6 +731,7 @@ func (runner *ContainerRunner) startCrunchstat() {
                PollPeriod:   runner.statInterval,
        }
        runner.statReporter.Start()
+       return nil
 }
 
 type infoCommand struct {
@@ -729,7 +745,10 @@ type infoCommand struct {
 // might differ from what's described in the node record (see
 // LogNodeRecord).
 func (runner *ContainerRunner) LogHostInfo() (err error) {
-       w := runner.NewLogWriter("node-info")
+       w, err := runner.NewLogWriter("node-info")
+       if err != nil {
+               return
+       }
 
        commands := []infoCommand{
                {
@@ -802,11 +821,15 @@ func (runner *ContainerRunner) LogNodeRecord() error {
 }
 
 func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
+       writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
+       if err != nil {
+               return false, err
+       }
        w := &ArvLogWriter{
                ArvClient:     runner.ArvClient,
                UUID:          runner.Container.UUID,
                loggingStream: label,
-               writeCloser:   runner.LogCollection.Open(label + ".json"),
+               writeCloser:   writer,
        }
 
        reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
@@ -893,8 +916,10 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        return err
                }
                runner.Stdout = stdoutFile
+       } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+               return err
        } else {
-               runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+               runner.Stdout = NewThrottledLogger(w)
        }
 
        if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
@@ -903,8 +928,10 @@ func (runner *ContainerRunner) AttachStreams() (err error) {
                        return err
                }
                runner.Stderr = stderrFile
+       } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+               return err
        } else {
-               runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+               runner.Stderr = NewThrottledLogger(w)
        }
 
        if stdinRdr != nil {
@@ -1075,181 +1102,8 @@ func (runner *ContainerRunner) WaitFinish() error {
        }
 }
 
-var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
-
-func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) {
-       // Follow symlinks if necessary
-       info = startinfo
-       tgt = path
-       readlinktgt = ""
-       nextlink := path
-       for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ {
-               if followed >= limitFollowSymlinks {
-                       // Got stuck in a loop or just a pathological number of links, give up.
-                       err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
-                       return
-               }
-
-               readlinktgt, err = os.Readlink(nextlink)
-               if err != nil {
-                       return
-               }
-
-               tgt = readlinktgt
-               if !strings.HasPrefix(tgt, "/") {
-                       // Relative symlink, resolve it to host path
-                       tgt = filepath.Join(filepath.Dir(path), tgt)
-               }
-               if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
-                       // Absolute symlink to container output path, adjust it to host output path.
-                       tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
-               }
-               if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
-                       // After dereferencing, symlink target must either be
-                       // within output directory, or must point to a
-                       // collection mount.
-                       err = ErrNotInOutputDir
-                       return
-               }
-
-               info, err = os.Lstat(tgt)
-               if err != nil {
-                       // tgt
-                       err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v",
-                               path[len(runner.HostOutputDir):], readlinktgt, err)
-                       return
-               }
-
-               nextlink = tgt
-       }
-
-       return
-}
-
-var limitFollowSymlinks = 10
-
-// UploadFile uploads files within the output directory, with special handling
-// for symlinks. If the symlink leads to a keep mount, copy the manifest text
-// from the keep mount into the output manifestText.  Ensure that whether
-// symlinks are relative or absolute, every symlink target (even targets that
-// are symlinks themselves) must point to a path in either the output directory
-// or a collection mount.
-//
-// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
-func (runner *ContainerRunner) UploadOutputFile(
-       path string,
-       info os.FileInfo,
-       infoerr error,
-       binds []string,
-       walkUpload *WalkUpload,
-       relocateFrom string,
-       relocateTo string,
-       followed int) (manifestText string, err error) {
-
-       if infoerr != nil {
-               return "", infoerr
-       }
-
-       if info.Mode().IsDir() {
-               // if empty, need to create a .keep file
-               dir, direrr := os.Open(path)
-               if direrr != nil {
-                       return "", direrr
-               }
-               defer dir.Close()
-               names, eof := dir.Readdirnames(1)
-               if len(names) == 0 && eof == io.EOF && path != runner.HostOutputDir {
-                       containerPath := runner.OutputPath + path[len(runner.HostOutputDir):]
-                       for _, bind := range binds {
-                               mnt := runner.Container.Mounts[bind]
-                               // Check if there is a bind for this
-                               // directory, in which case assume we don't need .keep
-                               if (containerPath == bind || strings.HasPrefix(containerPath, bind+"/")) && mnt.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
-                                       return
-                               }
-                       }
-                       outputSuffix := path[len(runner.HostOutputDir)+1:]
-                       return fmt.Sprintf("./%v d41d8cd98f00b204e9800998ecf8427e+0 0:0:.keep\n", outputSuffix), nil
-               }
-               return
-       }
-
-       if followed >= limitFollowSymlinks {
-               // Got stuck in a loop or just a pathological number of
-               // directory links, give up.
-               err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
-               return
-       }
-
-       // "path" is the actual path we are visiting
-       // "tgt" is the target of "path" (a non-symlink) after following symlinks
-       // "relocated" is the path in the output manifest where the file should be placed,
-       // but has HostOutputDir as a prefix.
-
-       // The destination path in the output manifest may need to be
-       // logically relocated to some other path in order to appear
-       // in the correct location as a result of following a symlink.
-       // Remove the relocateFrom prefix and replace it with
-       // relocateTo.
-       relocated := relocateTo + path[len(relocateFrom):]
-
-       tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
-       if derefErr != nil && derefErr != ErrNotInOutputDir {
-               return "", derefErr
-       }
-
-       // go through mounts and try reverse map to collection reference
-       for _, bind := range binds {
-               mnt := runner.Container.Mounts[bind]
-               if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
-                       // get path relative to bind
-                       targetSuffix := tgt[len(bind):]
-
-                       // Copy mount and adjust the path to add path relative to the bind
-                       adjustedMount := mnt
-                       adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
-
-                       // Terminates in this keep mount, so add the
-                       // manifest text at appropriate location.
-                       outputSuffix := relocated[len(runner.HostOutputDir):]
-                       manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
-                       return
-               }
-       }
-
-       // If target is not a collection mount, it must be located within the
-       // output directory, otherwise it is an error.
-       if derefErr == ErrNotInOutputDir {
-               err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.",
-                       path[len(runner.HostOutputDir):], readlinktgt)
-               return
-       }
-
-       if info.Mode().IsRegular() {
-               return "", walkUpload.UploadFile(relocated, tgt)
-       }
-
-       if info.Mode().IsDir() {
-               // Symlink leads to directory.  Walk() doesn't follow
-               // directory symlinks, so we walk the target directory
-               // instead.  Within the walk, file paths are relocated
-               // so they appear under the original symlink path.
-               err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
-                       var m string
-                       m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr,
-                               binds, walkUpload, tgt, relocated, followed+1)
-                       if walkerr == nil {
-                               manifestText = manifestText + m
-                       }
-                       return walkerr
-               })
-               return
-       }
-
-       return
-}
-
-// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
+// CaptureOutput saves data from the container's output directory if
+// needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput() error {
        if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
                // Output may have been set directly by the container, so
@@ -1266,163 +1120,35 @@ func (runner *ContainerRunner) CaptureOutput() error {
                }
        }
 
-       if runner.HostOutputDir == "" {
-               return nil
-       }
-
-       _, err := os.Stat(runner.HostOutputDir)
+       txt, err := (&copier{
+               client:        runner.client,
+               arvClient:     runner.ArvClient,
+               keepClient:    runner.Kc,
+               hostOutputDir: runner.HostOutputDir,
+               ctrOutputDir:  runner.Container.OutputPath,
+               mounts:        runner.Container.Mounts,
+               secretMounts:  runner.SecretMounts,
+               logger:        runner.CrunchLog,
+       }).Copy()
        if err != nil {
-               return fmt.Errorf("While checking host output path: %v", err)
-       }
-
-       // Pre-populate output from the configured mount points
-       var binds []string
-       for bind, mnt := range runner.Container.Mounts {
-               if mnt.Kind == "collection" {
-                       binds = append(binds, bind)
-               }
-       }
-       sort.Strings(binds)
-
-       // Delete secret mounts so they don't get saved to the output collection.
-       for bind := range runner.SecretMounts {
-               if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
-                       err = os.Remove(runner.HostOutputDir + bind[len(runner.Container.OutputPath):])
-                       if err != nil {
-                               return fmt.Errorf("Unable to remove secret mount: %v", err)
-                       }
-               }
-       }
-
-       var manifestText string
-
-       collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
-       _, err = os.Stat(collectionMetafile)
-       if err != nil {
-               // Regular directory
-
-               cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
-               walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
-
-               var m string
-               err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
-                       m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0)
-                       if err == nil {
-                               manifestText = manifestText + m
-                       }
-                       return err
-               })
-
-               cw.EndUpload(walkUpload)
-
-               if err != nil {
-                       return fmt.Errorf("While uploading output files: %v", err)
-               }
-
-               m, err = cw.ManifestText()
-               manifestText = manifestText + m
-               if err != nil {
-                       return fmt.Errorf("While uploading output files: %v", err)
-               }
-       } else {
-               // FUSE mount directory
-               file, openerr := os.Open(collectionMetafile)
-               if openerr != nil {
-                       return fmt.Errorf("While opening FUSE metafile: %v", err)
-               }
-               defer file.Close()
-
-               var rec arvados.Collection
-               err = json.NewDecoder(file).Decode(&rec)
-               if err != nil {
-                       return fmt.Errorf("While reading FUSE metafile: %v", err)
-               }
-               manifestText = rec.ManifestText
-       }
-
-       for _, bind := range binds {
-               mnt := runner.Container.Mounts[bind]
-
-               bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
-
-               if bindSuffix == bind || len(bindSuffix) <= 0 {
-                       // either does not start with OutputPath or is OutputPath itself
-                       continue
-               }
-
-               if mnt.ExcludeFromOutput == true || mnt.Writable {
-                       continue
-               }
-
-               // append to manifest_text
-               m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
-               if err != nil {
-                       return err
-               }
-
-               manifestText = manifestText + m
-       }
-
-       // Save output
-       var response arvados.Collection
-       manifest := manifest.Manifest{Text: manifestText}
-       manifestText = manifest.Extract(".", ".").Text
-       err = runner.ArvClient.Create("collections",
-               arvadosclient.Dict{
-                       "ensure_unique_name": true,
-                       "collection": arvadosclient.Dict{
-                               "is_trashed":    true,
-                               "name":          "output for " + runner.Container.UUID,
-                               "manifest_text": manifestText}},
-               &response)
+               return err
+       }
+       var resp arvados.Collection
+       err = runner.ArvClient.Create("collections", arvadosclient.Dict{
+               "ensure_unique_name": true,
+               "collection": arvadosclient.Dict{
+                       "is_trashed":    true,
+                       "name":          "output for " + runner.Container.UUID,
+                       "manifest_text": txt,
+               },
+       }, &resp)
        if err != nil {
-               return fmt.Errorf("While creating output collection: %v", err)
+               return fmt.Errorf("error creating output collection: %v", err)
        }
-       runner.OutputPDH = &response.PortableDataHash
+       runner.OutputPDH = &resp.PortableDataHash
        return nil
 }
 
-var outputCollections = make(map[string]arvados.Collection)
-
-// Fetch the collection for the mnt.PortableDataHash
-// Return the manifest_text fragment corresponding to the specified mnt.Path
-//  after making any required updates.
-//  Ex:
-//    If mnt.Path is not specified,
-//      return the entire manifest_text after replacing any "." with bindSuffix
-//    If mnt.Path corresponds to one stream,
-//      return the manifest_text for that stream after replacing that stream name with bindSuffix
-//    Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
-//      for that stream after replacing stream name with bindSuffix minus the last word
-//      and the file name with last word of the bindSuffix
-//  Allowed path examples:
-//    "path":"/"
-//    "path":"/subdir1"
-//    "path":"/subdir1/subdir2"
-//    "path":"/subdir/filename" etc
-func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
-       collection := outputCollections[mnt.PortableDataHash]
-       if collection.PortableDataHash == "" {
-               err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
-               if err != nil {
-                       return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
-               }
-               outputCollections[mnt.PortableDataHash] = collection
-       }
-
-       if collection.ManifestText == "" {
-               runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
-               return "", nil
-       }
-
-       mft := manifest.Manifest{Text: collection.ManifestText}
-       extracted := mft.Extract(mnt.Path, bindSuffix)
-       if extracted.Err != nil {
-               return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
-       }
-       return extracted.Text, nil
-}
-
 func (runner *ContainerRunner) CleanupDirs() {
        if runner.ArvMount != nil {
                var delay int64 = 8
@@ -1495,8 +1221,12 @@ func (runner *ContainerRunner) CommitLogs() error {
                // point, but re-open crunch log with ArvClient in case there are any
                // other further errors (such as failing to write the log to Keep!)
                // while shutting down
-               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
-                       UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+               runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
+                       ArvClient:     runner.ArvClient,
+                       UUID:          runner.Container.UUID,
+                       loggingStream: "crunch-run",
+                       writeCloser:   nil,
+               })
                runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
        }()
 
@@ -1509,7 +1239,7 @@ func (runner *ContainerRunner) CommitLogs() error {
                return nil
        }
 
-       mt, err := runner.LogCollection.ManifestText()
+       mt, err := runner.LogCollection.MarshalManifest(".")
        if err != nil {
                return fmt.Errorf("While creating log manifest: %v", err)
        }
@@ -1584,12 +1314,17 @@ func (runner *ContainerRunner) IsCancelled() bool {
 }
 
 // NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
+       writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
+       if err != nil {
+               return nil, err
+       }
        return &ArvLogWriter{
                ArvClient:     runner.ArvClient,
                UUID:          runner.Container.UUID,
                loggingStream: name,
-               writeCloser:   runner.LogCollection.Open(name + ".txt")}
+               writeCloser:   writer,
+       }, nil
 }
 
 // Run the full container lifecycle.
@@ -1658,7 +1393,10 @@ func (runner *ContainerRunner) Run() (err error) {
                return
        }
        runner.setupSignals()
-       runner.startHoststat()
+       err = runner.startHoststat()
+       if err != nil {
+               return
+       }
 
        // check for and/or load image
        err = runner.LoadImage()
@@ -1707,7 +1445,10 @@ func (runner *ContainerRunner) Run() (err error) {
        }
        runner.finalState = "Cancelled"
 
-       runner.startCrunchstat()
+       err = runner.startCrunchstat()
+       if err != nil {
+               return
+       }
 
        err = runner.StartContainer()
        if err != nil {
@@ -1766,12 +1507,13 @@ func (runner *ContainerRunner) fetchContainerRecord() error {
 }
 
 // NewContainerRunner creates a new container runner.
-func NewContainerRunner(api IArvadosClient,
-       kc IKeepClient,
-       docker ThinDockerClient,
-       containerUUID string) *ContainerRunner {
-
-       cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
+func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
+       cr := &ContainerRunner{
+               client:    client,
+               ArvClient: api,
+               Kc:        kc,
+               Docker:    docker,
+       }
        cr.NewLogWriter = cr.NewArvLogWriter
        cr.RunArvMount = cr.ArvMountCmd
        cr.MkTempDir = ioutil.TempDir
@@ -1783,14 +1525,22 @@ func NewContainerRunner(api IArvadosClient,
                cl.ApiToken = token
                return cl, nil
        }
-       cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
+       var err error
+       cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
+       if err != nil {
+               return nil, err
+       }
        cr.Container.UUID = containerUUID
-       cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+       w, err := cr.NewLogWriter("crunch-run")
+       if err != nil {
+               return nil, err
+       }
+       cr.CrunchLog = NewThrottledLogger(w)
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
 
        loadLogThrottleParams(api)
 
-       return cr
+       return cr, nil
 }
 
 func main() {
@@ -1842,7 +1592,10 @@ func main() {
        // minimum version we want to support.
        docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
 
-       cr := NewContainerRunner(api, kc, docker, containerId)
+       cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+       if err != nil {
+               log.Fatal(err)
+       }
        if dockererr != nil {
                cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
                cr.checkBrokenNode(dockererr)
@@ -1872,15 +1625,15 @@ func main() {
        if *memprofile != "" {
                f, err := os.Create(*memprofile)
                if err != nil {
-                       log.Printf("could not create memory profile: ", err)
+                       log.Printf("could not create memory profile: %s", err)
                }
                runtime.GC() // get up-to-date statistics
                if err := pprof.WriteHeapProfile(f); err != nil {
-                       log.Printf("could not write memory profile: ", err)
+                       log.Printf("could not write memory profile: %s", err)
                }
                closeerr := f.Close()
                if closeerr != nil {
-                       log.Printf("closing memprofile file: ", err)
+                       log.Printf("closing memprofile file: %s", err)
                }
        }
 
index ba9195999d3e7a3f867b3ece3eecea03a600c0e7..8ee462586d4b994743b141c712f445ee02268253 100644 (file)
@@ -13,7 +13,6 @@ import (
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "os"
        "os/exec"
@@ -46,10 +45,12 @@ func TestCrunchExec(t *testing.T) {
 var _ = Suite(&TestSuite{})
 
 type TestSuite struct {
+       client *arvados.Client
        docker *TestDockerClient
 }
 
 func (s *TestSuite) SetUpTest(c *C) {
+       s.client = arvados.NewClientFromEnv()
        s.docker = NewTestDockerClient()
 }
 
@@ -356,12 +357,16 @@ call:
        return nil
 }
 
-func (client *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
        client.Content = buf
-       return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
+       return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
 }
 
-func (*KeepTestClient) ClearBlockCache() {
+func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
+       return 0, errors.New("not implemented")
+}
+
+func (client *KeepTestClient) ClearBlockCache() {
 }
 
 func (client *KeepTestClient) Close() {
@@ -413,9 +418,10 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s
 func (s *TestSuite) TestLoadImage(c *C) {
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
 
-       _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+       _, err = cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
        c.Check(err, IsNil)
 
        _, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
@@ -480,26 +486,24 @@ func (ArvErrorTestClient) Discovery(key string) (interface{}, error) {
        return discoveryMap[key], nil
 }
 
-type KeepErrorTestClient struct{}
-
-func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
-       return "", 0, errors.New("KeepError")
+type KeepErrorTestClient struct {
+       KeepTestClient // embedded for ReadAt, ClearBlockCache, Close; ManifestFileReader and PutB are overridden below to fail
 }
 
-func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+func (*KeepErrorTestClient) ManifestFileReader(manifest.Manifest, string) (arvados.File, error) {
        return nil, errors.New("KeepError")
 }
 
-func (KeepErrorTestClient) ClearBlockCache() {
+func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
+       return "", 0, errors.New("KeepError")
 }
 
-type KeepReadErrorTestClient struct{}
-
-func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
-       return "", 0, nil
+type KeepReadErrorTestClient struct {
+       KeepTestClient // embedded for PutB, ManifestFileReader, ClearBlockCache, Close; only ReadAt is overridden to fail
 }
 
-func (KeepReadErrorTestClient) ClearBlockCache() {
+func (*KeepReadErrorTestClient) ReadAt(string, []byte, int) (int, error) {
+       return 0, errors.New("KeepError")
 }
 
 type ErrorReader struct {
@@ -522,37 +526,42 @@ func (s *TestSuite) TestLoadImageArvError(c *C) {
        // (1) Arvados error
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.Container.ContainerImage = hwPDH
 
-       err := cr.LoadImage()
+       err = cr.LoadImage()
        c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
 }
 
 func (s *TestSuite) TestLoadImageKeepError(c *C) {
        // (2) Keep error
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.Container.ContainerImage = hwPDH
 
-       err := cr.LoadImage()
+       err = cr.LoadImage()
+       c.Assert(err, NotNil)
        c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
 }
 
 func (s *TestSuite) TestLoadImageCollectionError(c *C) {
        // (3) Collection doesn't contain image
-       cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.Container.ContainerImage = otherPDH
 
-       err := cr.LoadImage()
+       err = cr.LoadImage()
        c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
 }
 
 func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
        // (4) Collection doesn't contain image
-       cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.Container.ContainerImage = hwPDH
 
-       err := cr.LoadImage()
+       err = cr.LoadImage()
        c.Check(err, NotNil)
 }
 
@@ -569,14 +578,14 @@ type TestLogs struct {
        Stderr ClosableBuffer
 }
 
-func (tl *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
        if logstr == "stdout" {
-               return &tl.Stdout
+               return &tl.Stdout, nil
        }
        if logstr == "stderr" {
-               return &tl.Stderr
+               return &tl.Stderr, nil
        }
-       return nil
+       return nil, fmt.Errorf("TestLogs: unexpected log name %q", logstr)
 }
 
 func dockerLog(fd byte, msg string) []byte {
@@ -595,13 +604,14 @@ func (s *TestSuite) TestRunContainer(c *C) {
        }
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
 
        var logs TestLogs
        cr.NewLogWriter = logs.NewTestLoggingWriter
        cr.Container.ContainerImage = hwPDH
        cr.Container.Command = []string{"./hw"}
-       err := cr.LoadImage()
+       err = cr.LoadImage()
        c.Check(err, IsNil)
 
        err = cr.CreateContainer()
@@ -621,14 +631,15 @@ func (s *TestSuite) TestCommitLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
        cr.CrunchLog.Print("Hello world!")
        cr.CrunchLog.Print("Goodbye")
        cr.finalState = "Complete"
 
-       err := cr.CommitLogs()
+       err = cr.CommitLogs()
        c.Check(err, IsNil)
 
        c.Check(api.Calls, Equals, 2)
@@ -642,9 +653,10 @@ func (s *TestSuite) TestUpdateContainerRunning(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
 
-       err := cr.UpdateContainerRunning()
+       err = cr.UpdateContainerRunning()
        c.Check(err, IsNil)
 
        c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
@@ -654,7 +666,8 @@ func (s *TestSuite) TestUpdateContainerComplete(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
 
        cr.LogsPDH = new(string)
        *cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
@@ -663,7 +676,7 @@ func (s *TestSuite) TestUpdateContainerComplete(c *C) {
        *cr.ExitCode = 42
        cr.finalState = "Complete"
 
-       err := cr.UpdateContainerFinal()
+       err = cr.UpdateContainerFinal()
        c.Check(err, IsNil)
 
        c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
@@ -675,11 +688,12 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.cCancelled = true
        cr.finalState = "Cancelled"
 
-       err := cr.UpdateContainerFinal()
+       err = cr.UpdateContainerFinal()
        c.Check(err, IsNil)
 
        c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
@@ -700,7 +714,7 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        err = json.Unmarshal([]byte(record), &sm)
        c.Check(err, IsNil)
        secretMounts, err := json.Marshal(sm)
-       log.Printf("%q %q", sm, secretMounts)
+       c.Logf("%s %q", sm, secretMounts)
        c.Check(err, IsNil)
 
        s.docker.exitCode = exitCode
@@ -711,7 +725,8 @@ func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exi
        s.docker.api = api
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr = NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.statInterval = 100 * time.Millisecond
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
@@ -989,7 +1004,8 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) {
        api := &ArvTestClient{Container: rec}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
        cr.MkArvClient = func(token string) (IArvadosClient, error) {
                return &ArvTestClient{}, nil
@@ -1060,7 +1076,8 @@ func (s *TestSuite) TestSetupMounts(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
 
@@ -1470,7 +1487,8 @@ func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDoc
        api = &ArvTestClient{Container: rec}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr = NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        am := &ArvMountCmdLine{}
        cr.RunArvMount = am.ArvMountTest
        cr.MkArvClient = func(token string) (IArvadosClient, error) {
@@ -1512,8 +1530,8 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
 }
 
 func (s *TestSuite) TestFullRunWithAPI(c *C) {
+       defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
-       defer os.Unsetenv("ARVADOS_API_HOST")
        api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -1535,8 +1553,8 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) {
 }
 
 func (s *TestSuite) TestFullRunSetOutput(c *C) {
+       defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
        os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
-       defer os.Unsetenv("ARVADOS_API_HOST")
        api, _, _ := s.fullRunHelper(c, `{
     "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
     "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
@@ -1634,7 +1652,7 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) {
                                manifest := collection["manifest_text"].(string)
 
                                c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
-./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 9:18:bar 9:18:sub1file2
+./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
 ./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
 ./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
 ./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
@@ -1652,7 +1670,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
                "environment": {"FROBIZ": "bilbo"},
                "mounts": {
         "/tmp": {"kind": "tmp"},
-        "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt"},
+        "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
         "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
     },
                "output_path": "/tmp",
@@ -1685,52 +1703,6 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest(
        }
 }
 
-func (s *TestSuite) TestOutputSymlinkToInput(c *C) {
-       helperRecord := `{
-               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
-               "cwd": "/bin",
-               "environment": {"FROBIZ": "bilbo"},
-               "mounts": {
-        "/tmp": {"kind": "tmp"},
-        "/keep/foo/sub1file2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
-        "/keep/foo2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367"}
-    },
-               "output_path": "/tmp",
-               "priority": 1,
-               "runtime_constraints": {}
-       }`
-
-       extraMounts := []string{
-               "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
-       }
-
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               os.Symlink("/keep/foo/sub1file2", t.realTemp+"/tmp2/baz")
-               os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz2")
-               os.Symlink("/keep/foo2/subdir1", t.realTemp+"/tmp2/baz3")
-               os.Mkdir(t.realTemp+"/tmp2/baz4", 0700)
-               os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz4/baz5")
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       for _, v := range api.Content {
-               if v["collection"] != nil {
-                       collection := v["collection"].(arvadosclient.Dict)
-                       if strings.Index(collection["name"].(string), "output") == 0 {
-                               manifest := collection["manifest_text"].(string)
-                               c.Check(manifest, Equals, `. 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:baz 9:18:baz2
-./baz3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
-./baz3/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
-./baz4 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:baz5
-`)
-                       }
-               }
-       }
-}
-
 func (s *TestSuite) TestOutputError(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
@@ -1755,59 +1727,6 @@ func (s *TestSuite) TestOutputError(c *C) {
        c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
 }
 
-func (s *TestSuite) TestOutputSymlinkToOutput(c *C) {
-       helperRecord := `{
-               "command": ["/bin/sh", "-c", "echo $FROBIZ"],
-               "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
-               "cwd": "/bin",
-               "environment": {"FROBIZ": "bilbo"},
-               "mounts": {
-        "/tmp": {"kind": "tmp"}
-    },
-               "output_path": "/tmp",
-               "priority": 1,
-               "runtime_constraints": {}
-       }`
-
-       extraMounts := []string{}
-
-       api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
-               rf, _ := os.Create(t.realTemp + "/tmp2/realfile")
-               rf.Write([]byte("foo"))
-               rf.Close()
-
-               os.Mkdir(t.realTemp+"/tmp2/realdir", 0700)
-               rf, _ = os.Create(t.realTemp + "/tmp2/realdir/subfile")
-               rf.Write([]byte("bar"))
-               rf.Close()
-
-               os.Symlink("/tmp/realfile", t.realTemp+"/tmp2/file1")
-               os.Symlink("realfile", t.realTemp+"/tmp2/file2")
-               os.Symlink("/tmp/file1", t.realTemp+"/tmp2/file3")
-               os.Symlink("file2", t.realTemp+"/tmp2/file4")
-               os.Symlink("realdir", t.realTemp+"/tmp2/dir1")
-               os.Symlink("/tmp/realdir", t.realTemp+"/tmp2/dir2")
-               t.logWriter.Close()
-       })
-
-       c.Check(api.CalledWith("container.exit_code", 0), NotNil)
-       c.Check(api.CalledWith("container.state", "Complete"), NotNil)
-       for _, v := range api.Content {
-               if v["collection"] != nil {
-                       collection := v["collection"].(arvadosclient.Dict)
-                       if strings.Index(collection["name"].(string), "output") == 0 {
-                               manifest := collection["manifest_text"].(string)
-                               c.Check(manifest, Equals,
-                                       `. 7a2c86e102dcc231bd232aad99686dfa+15 0:3:file1 3:3:file2 6:3:file3 9:3:file4 12:3:realfile
-./dir1 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-./dir2 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-./realdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-`)
-                       }
-               }
-       }
-}
-
 func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
        helperRecord := `{
                "command": ["/bin/sh", "-c", "echo $FROBIZ"],
@@ -1911,7 +1830,8 @@ func (s *TestSuite) TestStderrMount(c *C) {
 func (s *TestSuite) TestNumberRoundTrip(c *C) {
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.fetchContainerRecord()
 
        jsondata, err := json.Marshal(cr.Container.Mounts["/json"].Content)
@@ -1920,59 +1840,6 @@ func (s *TestSuite) TestNumberRoundTrip(c *C) {
        c.Check(string(jsondata), Equals, `{"number":123456789123456789}`)
 }
 
-func (s *TestSuite) TestEvalSymlinks(c *C) {
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-
-       realTemp, err := ioutil.TempDir("", "crunchrun_test-")
-       c.Assert(err, IsNil)
-       defer os.RemoveAll(realTemp)
-
-       cr.HostOutputDir = realTemp
-
-       // Absolute path outside output dir
-       os.Symlink("/etc/passwd", realTemp+"/p1")
-
-       // Relative outside output dir
-       os.Symlink("../zip", realTemp+"/p2")
-
-       // Circular references
-       os.Symlink("p4", realTemp+"/p3")
-       os.Symlink("p5", realTemp+"/p4")
-       os.Symlink("p3", realTemp+"/p5")
-
-       // Target doesn't exist
-       os.Symlink("p99", realTemp+"/p6")
-
-       for _, v := range []string{"p1", "p2", "p3", "p4", "p5"} {
-               info, err := os.Lstat(realTemp + "/" + v)
-               c.Assert(err, IsNil)
-               _, _, _, err = cr.derefOutputSymlink(realTemp+"/"+v, info)
-               c.Assert(err, NotNil)
-       }
-}
-
-func (s *TestSuite) TestEvalSymlinkDir(c *C) {
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-
-       realTemp, err := ioutil.TempDir("", "crunchrun_test-")
-       c.Assert(err, IsNil)
-       defer os.RemoveAll(realTemp)
-
-       cr.HostOutputDir = realTemp
-
-       // Absolute path outside output dir
-       os.Symlink(".", realTemp+"/loop")
-
-       v := "loop"
-       info, err := os.Lstat(realTemp + "/" + v)
-       _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
-       c.Assert(err, NotNil)
-}
-
 func (s *TestSuite) TestFullBrokenDocker1(c *C) {
        tf, err := ioutil.TempFile("", "brokenNodeHook-")
        c.Assert(err, IsNil)
index abac2bbecc6e997886283f11012d50a44a9beab9..86f8cec04ae8037e37a1d4c9250216416f2f9bd6 100644 (file)
@@ -10,11 +10,14 @@ import (
        "testing"
        "time"
 
+       "git.curoverse.com/arvados.git/sdk/go/arvados"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        . "gopkg.in/check.v1"
 )
 
-type LoggingTestSuite struct{}
+type LoggingTestSuite struct {
+       client *arvados.Client
+}
 
 type TestTimestamper struct {
        count int
@@ -32,11 +35,16 @@ func (this *TestTimestamper) Timestamp(t time.Time) string {
 // Gocheck boilerplate
 var _ = Suite(&LoggingTestSuite{})
 
+func (s *LoggingTestSuite) SetUpTest(c *C) {
+       s.client = arvados.NewClientFromEnv()
+}
+
 func (s *LoggingTestSuite) TestWriteLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
        cr.CrunchLog.Print("Hello world!")
@@ -45,7 +53,7 @@ func (s *LoggingTestSuite) TestWriteLogs(c *C) {
 
        c.Check(api.Calls, Equals, 1)
 
-       mt, err := cr.LogCollection.ManifestText()
+       mt, err := cr.LogCollection.MarshalManifest(".")
        c.Check(err, IsNil)
        c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
 
@@ -64,7 +72,8 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
        cr.CrunchLog.Immediate = nil
 
@@ -77,7 +86,7 @@ func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
        c.Check(api.Calls > 1, Equals, true)
        c.Check(api.Calls < 2000000, Equals, true)
 
-       mt, err := cr.LogCollection.ManifestText()
+       mt, err := cr.LogCollection.MarshalManifest(".")
        c.Check(err, IsNil)
        c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
 }
@@ -86,10 +95,13 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        ts := &TestTimestamper{}
        cr.CrunchLog.Timestamper = ts.Timestamp
-       stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
+       w, err := cr.NewLogWriter("stdout")
+       c.Assert(err, IsNil)
+       stdout := NewThrottledLogger(w)
        stdout.Timestamper = ts.Timestamp
 
        cr.CrunchLog.Print("Hello world!")
@@ -112,26 +124,24 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
 2015-12-29T15:51:45.000000004Z Blurb
 `)
 
-       mt, err := cr.LogCollection.ManifestText()
+       mt, err := cr.LogCollection.MarshalManifest(".")
        c.Check(err, IsNil)
-       c.Check(mt, Equals, ""+
-               ". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunch-run.txt\n"+
-               ". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+       c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
 }
 
 func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
-       testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
+       s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
 }
 
 func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
-       testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
+       s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
 }
 
 func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
-       testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
+       s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
 }
 
-func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
+func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
        discoveryMap[throttleParam] = float64(throttleValue)
        defer func() {
                discoveryMap[throttleParam] = float64(throttleDefault)
@@ -140,7 +150,8 @@ func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, t
        api := &ArvTestClient{}
        kc := &KeepTestClient{}
        defer kc.Close()
-       cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+       c.Assert(err, IsNil)
        cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
 
        cr.CrunchLog.Print("Hello world!")
@@ -149,7 +160,7 @@ func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, t
 
        c.Check(api.Calls, Equals, 1)
 
-       mt, err := cr.LogCollection.ManifestText()
+       mt, err := cr.LogCollection.MarshalManifest(".")
        c.Check(err, IsNil)
        c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
 
diff --git a/services/crunch-run/upload.go b/services/crunch-run/upload.go
deleted file mode 100644 (file)
index ddad8bf..0000000
+++ /dev/null
@@ -1,342 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// Originally based on sdk/go/crunchrunner/upload.go
-//
-// Unlike the original, which iterates over a directory tree and uploads each
-// file sequentially, this version supports opening and writing multiple files
-// in a collection simultaneously.
-//
-// Eventually this should move into the Arvados Go SDK for a more comprehensive
-// implementation of Collections.
-
-import (
-       "bytes"
-       "crypto/md5"
-       "errors"
-       "fmt"
-       "io"
-       "log"
-       "os"
-       "path/filepath"
-       "strings"
-       "sync"
-
-       "git.curoverse.com/arvados.git/sdk/go/keepclient"
-       "git.curoverse.com/arvados.git/sdk/go/manifest"
-)
-
-// Block is a data block in a manifest stream
-type Block struct {
-       data   []byte
-       offset int64
-}
-
-// CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
-type CollectionFileWriter struct {
-       IKeepClient
-       *manifest.ManifestStream
-       offset uint64
-       length uint64
-       *Block
-       uploader chan *Block
-       finish   chan []error
-       fn       string
-}
-
-// Write to a file in a keep collection
-func (m *CollectionFileWriter) Write(p []byte) (int, error) {
-       n, err := m.ReadFrom(bytes.NewReader(p))
-       return int(n), err
-}
-
-// ReadFrom a Reader and write to the Keep collection file.
-func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
-       var total int64
-       var count int
-
-       for err == nil {
-               if m.Block == nil {
-                       m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
-               }
-               count, err = r.Read(m.Block.data[m.Block.offset:])
-               total += int64(count)
-               m.Block.offset += int64(count)
-               if m.Block.offset == keepclient.BLOCKSIZE {
-                       m.uploader <- m.Block
-                       m.Block = nil
-               }
-       }
-
-       m.length += uint64(total)
-
-       if err == io.EOF {
-               return total, nil
-       }
-       return total, err
-}
-
-// Close stops writing a file and adds it to the parent manifest.
-func (m *CollectionFileWriter) Close() error {
-       m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
-               manifest.FileStreamSegment{m.offset, m.length, m.fn})
-       return nil
-}
-
-func (m *CollectionFileWriter) NewFile(fn string) {
-       m.offset += m.length
-       m.length = 0
-       m.fn = fn
-}
-
-func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
-       var mtx sync.Mutex
-       var wg sync.WaitGroup
-
-       var errors []error
-       uploader := m.uploader
-       finish := m.finish
-       for block := range uploader {
-               mtx.Lock()
-               m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
-               blockIndex := len(m.ManifestStream.Blocks) - 1
-               mtx.Unlock()
-
-               workers <- struct{}{} // wait for an available worker slot
-               wg.Add(1)
-
-               go func(block *Block, blockIndex int) {
-                       hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
-                       signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
-                       <-workers
-
-                       mtx.Lock()
-                       if err != nil {
-                               errors = append(errors, err)
-                       } else {
-                               m.ManifestStream.Blocks[blockIndex] = signedHash
-                       }
-                       mtx.Unlock()
-
-                       wg.Done()
-               }(block, blockIndex)
-       }
-       wg.Wait()
-
-       finish <- errors
-}
-
-// CollectionWriter implements creating new Keep collections by opening files
-// and writing to them.
-type CollectionWriter struct {
-       MaxWriters int
-       IKeepClient
-       Streams []*CollectionFileWriter
-       workers chan struct{}
-       mtx     sync.Mutex
-}
-
-// Open a new file for writing in the Keep collection.
-func (m *CollectionWriter) Open(path string) io.WriteCloser {
-       var dir string
-       var fn string
-
-       i := strings.Index(path, "/")
-       if i > -1 {
-               dir = "./" + path[0:i]
-               fn = path[i+1:]
-       } else {
-               dir = "."
-               fn = path
-       }
-
-       fw := &CollectionFileWriter{
-               m.IKeepClient,
-               &manifest.ManifestStream{StreamName: dir},
-               0,
-               0,
-               nil,
-               make(chan *Block),
-               make(chan []error),
-               fn}
-
-       m.mtx.Lock()
-       defer m.mtx.Unlock()
-       if m.workers == nil {
-               if m.MaxWriters < 1 {
-                       m.MaxWriters = 2
-               }
-               m.workers = make(chan struct{}, m.MaxWriters)
-       }
-
-       go fw.goUpload(m.workers)
-
-       m.Streams = append(m.Streams, fw)
-
-       return fw
-}
-
-// Finish writing the collection, wait for all blocks to complete uploading.
-func (m *CollectionWriter) Finish() error {
-       var errstring string
-       m.mtx.Lock()
-       defer m.mtx.Unlock()
-
-       for _, stream := range m.Streams {
-               if stream.uploader == nil {
-                       continue
-               }
-               if stream.Block != nil {
-                       stream.uploader <- stream.Block
-                       stream.Block = nil
-               }
-               close(stream.uploader)
-               stream.uploader = nil
-
-               errors := <-stream.finish
-               close(stream.finish)
-               stream.finish = nil
-
-               for _, r := range errors {
-                       errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
-               }
-       }
-       if errstring != "" {
-               return errors.New(errstring)
-       }
-       return nil
-}
-
-// ManifestText returns the manifest text of the collection.  Calls Finish()
-// first to ensure that all blocks are written and that signed locators and
-// available.
-func (m *CollectionWriter) ManifestText() (mt string, err error) {
-       err = m.Finish()
-       if err != nil {
-               return "", err
-       }
-
-       var buf bytes.Buffer
-
-       m.mtx.Lock()
-       defer m.mtx.Unlock()
-       for _, v := range m.Streams {
-               if len(v.FileStreamSegments) == 0 {
-                       continue
-               }
-               k := v.StreamName
-               if k == "." {
-                       buf.WriteString(".")
-               } else {
-                       k = strings.Replace(k, " ", "\\040", -1)
-                       k = strings.Replace(k, "\n", "", -1)
-                       buf.WriteString("./" + k)
-               }
-               if len(v.Blocks) > 0 {
-                       for _, b := range v.Blocks {
-                               buf.WriteString(" ")
-                               buf.WriteString(b)
-                       }
-               } else {
-                       buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
-               }
-               for _, f := range v.FileStreamSegments {
-                       buf.WriteString(" ")
-                       name := strings.Replace(f.Name, " ", "\\040", -1)
-                       name = strings.Replace(name, "\n", "", -1)
-                       buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
-               }
-               buf.WriteString("\n")
-       }
-       return buf.String(), nil
-}
-
-type WalkUpload struct {
-       MaxWriters  int
-       kc          IKeepClient
-       stripPrefix string
-       streamMap   map[string]*CollectionFileWriter
-       status      *log.Logger
-       workers     chan struct{}
-       mtx         sync.Mutex
-}
-
-func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
-       var dir string
-       basename := filepath.Base(path)
-       if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
-               dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
-       }
-       if dir == "" {
-               dir = "."
-       }
-
-       fn := path[(len(path) - len(basename)):]
-
-       info, err := os.Stat(sourcePath)
-       if err != nil {
-               return err
-       }
-       file, err := os.Open(sourcePath)
-       if err != nil {
-               return err
-       }
-       defer file.Close()
-
-       if m.streamMap[dir] == nil {
-               m.streamMap[dir] = &CollectionFileWriter{
-                       m.kc,
-                       &manifest.ManifestStream{StreamName: dir},
-                       0,
-                       0,
-                       nil,
-                       make(chan *Block),
-                       make(chan []error),
-                       ""}
-
-               m.mtx.Lock()
-               if m.workers == nil {
-                       if m.MaxWriters < 1 {
-                               m.MaxWriters = 2
-                       }
-                       m.workers = make(chan struct{}, m.MaxWriters)
-               }
-               m.mtx.Unlock()
-
-               go m.streamMap[dir].goUpload(m.workers)
-       }
-
-       fileWriter := m.streamMap[dir]
-
-       // Reset the CollectionFileWriter for a new file
-       fileWriter.NewFile(fn)
-
-       m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
-
-       _, err = io.Copy(fileWriter, file)
-       if err != nil {
-               m.status.Printf("Uh oh")
-               return err
-       }
-
-       // Commits the current file.  Legal to call this repeatedly.
-       fileWriter.Close()
-
-       return nil
-}
-
-func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
-       streamMap := make(map[string]*CollectionFileWriter)
-       return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-}
-
-func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
-       cw.mtx.Lock()
-       for _, st := range wu.streamMap {
-               cw.Streams = append(cw.Streams, st)
-       }
-       cw.mtx.Unlock()
-}
diff --git a/services/crunch-run/upload_test.go b/services/crunch-run/upload_test.go
deleted file mode 100644 (file)
index 24333c3..0000000
+++ /dev/null
@@ -1,189 +0,0 @@
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
-       "io/ioutil"
-       "log"
-       "os"
-       "path/filepath"
-       "sync"
-       "syscall"
-
-       . "gopkg.in/check.v1"
-)
-
-type UploadTestSuite struct{}
-
-// Gocheck boilerplate
-var _ = Suite(&UploadTestSuite{})
-
-func writeTree(cw *CollectionWriter, root string, status *log.Logger) (mt string, err error) {
-       walkUpload := cw.BeginUpload(root, status)
-
-       err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
-               info, _ = os.Stat(path)
-               if info.Mode().IsRegular() {
-                       return walkUpload.UploadFile(path, path)
-               }
-               return nil
-       })
-
-       cw.EndUpload(walkUpload)
-       if err != nil {
-               return "", err
-       }
-       mt, err = cw.ManifestText()
-       return
-}
-
-func (s *TestSuite) TestSimpleUpload(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-       c.Check(err, IsNil)
-       c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
-}
-
-func (s *TestSuite) TestUploadThreeFiles(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       for _, err := range []error{
-               ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
-               ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
-               os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
-               syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
-       } {
-               c.Assert(err, IsNil)
-       }
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
-       c.Check(err, IsNil)
-       c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
-}
-
-func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       os.Mkdir(tmpdir+"/subdir", 0700)
-
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-       ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
-       c.Check(err, IsNil)
-
-       // streams can get added in either order because of scheduling
-       // of goroutines.
-       if str != `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-` && str != `./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-` {
-               c.Error("Did not get expected manifest text")
-       }
-}
-
-func (s *TestSuite) TestSimpleUploadLarge(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       file, _ := os.Create(tmpdir + "/" + "file1.txt")
-       data := make([]byte, 1024*1024-1)
-       for i := range data {
-               data[i] = byte(i % 10)
-       }
-       for i := 0; i < 65; i++ {
-               file.Write(data)
-       }
-       file.Close()
-
-       ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
-       c.Check(err, IsNil)
-       c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
-}
-
-func (s *TestSuite) TestUploadEmptySubdir(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       os.Mkdir(tmpdir+"/subdir", 0700)
-
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
-       c.Check(err, IsNil)
-       c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-`)
-}
-
-func (s *TestSuite) TestUploadEmptyFile(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
-
-       kc := &KeepTestClient{}
-       defer kc.Close()
-       cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
-       c.Check(err, IsNil)
-       c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
-`)
-}
-
-func (s *TestSuite) TestUploadError(c *C) {
-       tmpdir, _ := ioutil.TempDir("", "")
-       defer func() {
-               os.RemoveAll(tmpdir)
-       }()
-
-       ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
-       cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
-       str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
-       c.Check(err, NotNil)
-       c.Check(str, Equals, "")
-}