pdhOnly := true
tmpcount := 0
- arvMountCmd := []string{"--foreground", "--allow-other"}
+ arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
collectionPaths := []string{}
runner.Binds = nil
if staterr != nil {
return fmt.Errorf("While Stat on temp dir: %v", staterr)
}
- err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid)
+ err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
- if staterr != nil {
+ // Test the Chmod result (err). staterr was already checked right
+ // after the Stat call, so testing it again here would silently
+ // ignore a failed Chmod.
+ if err != nil {
return fmt.Errorf("While Chmod temp dir: %v", err)
}
_, readerr := io.ReadAtLeast(containerReader, header, 8)
if readerr == nil {
- readsize := int64(header[4]) | (int64(header[5]) << 8) | (int64(header[6]) << 16) | (int64(header[7]) << 24)
+ readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
if header[0] == 1 {
// stdout
_, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+ // Record which host is executing the container. Failing to look up
+ // the hostname is logged but does not stop the run.
+ hostname, hosterr := os.Hostname()
+ if hosterr != nil {
+ runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
+ } else {
+ // The format has a single %s verb, so pass only the hostname; an
+ // extra argument would be rendered as "%!(EXTRA ...)" and the
+ // container UUID would be printed in place of the host.
+ runner.CrunchLog.Printf("Executing on host '%s'", hostname)
+ }
+
var runerr, waiterr error
defer func() {
cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
cr.ContainerRecord.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+ cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
return cr
}
func main() {
flag.Parse()
+ containerId := flag.Arg(0)
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
api.Retries = 8
var kc *keepclient.KeepClient
kc, err = keepclient.MakeKeepClient(&api)
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
kc.Retries = 4
var docker *dockerclient.DockerClient
docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker, flag.Arg(0))
+ cr := NewContainerRunner(api, kc, docker, containerId)
err = cr.Run()
if err != nil {
- log.Fatalf("%s: %v", flag.Arg(0), err)
+ log.Fatalf("%s: %v", containerId, err)
}
}
by := []byte(msg)
header := make([]byte, 8+len(by))
header[0] = fd
- header[4] = byte(len(by))
+ header[7] = byte(len(by))
copy(header[8:], by)
return header
}
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
cr.CleanupDirs()
}
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
cr.CleanupDirs()
}
err := cr.SetupMounts()
c.Check(err, IsNil)
- c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
var ss sort.StringSlice = cr.Binds
ss.Sort()
c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
stop bool
flusherDone chan bool
Timestamper
+ Immediate *log.Logger
}
// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
sc := bufio.NewScanner(bytes.NewBuffer(p))
for sc.Scan() {
_, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+ if tl.Immediate != nil {
+ tl.Immediate.Printf("%s %s\n", now, sc.Text())
+ }
}
return len(p), err
}
kc := &KeepTestClient{}
cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+ cr.CrunchLog.Immediate = nil
for i := 0; i < 2000000; i += 1 {
cr.CrunchLog.Printf("Hello %d", i)
str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
c.Check(err, IsNil)
- c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+
+ // streams can get added in either order because of scheduling
+ // of goroutines.
+ if str != `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-`)
+` && str != `./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+` {
+ c.Error("Did not get expected manifest text")
+ }
}
func (s *TestSuite) TestSimpleUploadLarge(c *C) {