From: Tom Clegg
Date: Thu, 1 Feb 2018 16:51:40 +0000 (-0500)
Subject: Merge branch '12902-queued-to-cancelled'
X-Git-Tag: 1.1.3~4
X-Git-Url: https://git.arvados.org/arvados.git/commitdiff_plain/b3e8a483835334becbef9cba2bebbcf08df47c15?hp=9359659d79a0c17265ff8a09e896920243a1b800

Merge branch '12902-queued-to-cancelled'

refs #12902

Arvados-DCO-1.1-Signed-off-by: Tom Clegg
---

diff --git a/doc/_includes/_mount_types.liquid b/doc/_includes/_mount_types.liquid
index 734b07c8b7..fc8a7991b3 100644
--- a/doc/_includes/_mount_types.liquid
+++ b/doc/_includes/_mount_types.liquid
@@ -64,7 +64,7 @@ When a container's output_path is a tmp mount backed by local disk, this output
 1. Only mount points of kind @collection@ are supported.
 
-2. Mount points underneath output_path must not use @"writable":true@. If any of them are set as @writable@, the API will refuse to create/update the container request, and crunch-run will fail the container.
+2. Mount points underneath output_path that have @"writable":true@ are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be big enough to accommodate copies of the inner writable mounts.
 
 3. If any such mount points are configured as @"exclude_from_output":true@, they will be excluded from the output.
 
diff --git a/sdk/cwl/arvados_cwl/__init__.py b/sdk/cwl/arvados_cwl/__init__.py
index e82fd9feef..71ddd17221 100644
--- a/sdk/cwl/arvados_cwl/__init__.py
+++ b/sdk/cwl/arvados_cwl/__init__.py
@@ -224,13 +224,16 @@ class ArvCwlRunner(object):
 
     def check_features(self, obj):
         if isinstance(obj, dict):
-            if obj.get("writable"):
-                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+            if obj.get("writable") and self.work_api != "containers":
+                raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
             if obj.get("class") == "DockerRequirement":
                 if obj.get("dockerOutputDirectory"):
-                    # TODO: can be supported by containers API, but not jobs API.
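
An aside on the _mount_types.liquid change above: here is a hedged sketch of a mounts map that the API now accepts, written with the Go SDK's arvados.Mount type that crunch-run uses later in this diff (the paths and the hg19 collection hash are illustrative, borrowed from this diff's test fixtures; field names are the ones crunch-run reads: Kind, PortableDataHash, Writable):

    mounts := map[string]arvados.Mount{
        // output_path itself is a writable tmp mount on local disk.
        "/var/spool/cwl": {Kind: "tmp"},
        // Previously rejected; now legal: a writable collection mount
        // beneath output_path. crunch-run copies its contents into the
        // output dir at startup, and the container may modify the copies.
        "/var/spool/cwl/ref": {
            Kind:             "collection",
            PortableDataHash: "f225e6259bdd63bc7240599648dde9f1+97",
            Writable:         true,
        },
    }
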
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError( - "Option 'dockerOutputDirectory' of DockerRequirement not supported.") + if self.work_api != "containers": + raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError( + "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.") + if not obj.get("dockerOutputDirectory").startswith('/'): + raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError( + "Option 'dockerOutputDirectory' must be an absolute path.") for v in obj.itervalues(): self.check_features(v) elif isinstance(obj, list): @@ -279,7 +282,7 @@ class ArvCwlRunner(object): def rewrite(fileobj): fileobj["location"] = generatemapper.mapper(fileobj["location"]).target - for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"): + for k in ("listing", "contents", "nameext", "nameroot", "dirname"): if k in fileobj: del fileobj[k] diff --git a/sdk/cwl/arvados_cwl/arvcontainer.py b/sdk/cwl/arvados_cwl/arvcontainer.py index 014e1b94aa..abe67c8fb3 100644 --- a/sdk/cwl/arvados_cwl/arvcontainer.py +++ b/sdk/cwl/arvados_cwl/arvcontainer.py @@ -105,17 +105,32 @@ class ArvadosContainer(object): generatemapper = NoFollowPathMapper([self.generatefiles], "", "", separateDirs=False) + logger.debug("generatemapper is %s", generatemapper._pathmap) + with Perf(metrics, "createfiles %s" % self.name): for f, p in generatemapper.items(): if not p.target: pass - elif p.type in ("File", "Directory"): - source, path = self.arvrunner.fs_access.get_collection(p.resolved) - vwd.copy(path, p.target, source_collection=source) + elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"): + if p.resolved.startswith("_:"): + vwd.mkdirs(p.target) + else: + source, path = self.arvrunner.fs_access.get_collection(p.resolved) + vwd.copy(path, p.target, source_collection=source) elif p.type == "CreateFile": with vwd.open(p.target, "w") as n: n.write(p.resolved.encode("utf-8")) + def keepemptydirs(p): + if isinstance(p, arvados.collection.RichCollectionBase): + if len(p) == 0: + p.open(".keep", "w").close() + else: + for c in p: + keepemptydirs(p[c]) + + keepemptydirs(vwd) + with Perf(metrics, "generatefiles.save_new %s" % self.name): vwd.save_new() @@ -126,6 +141,8 @@ class ArvadosContainer(object): mounts[mountpoint] = {"kind": "collection", "portable_data_hash": vwd.portable_data_hash(), "path": p.target} + if p.type.startswith("Writable"): + mounts[mountpoint]["writable"] = True container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir} if self.environment: diff --git a/sdk/cwl/arvados_cwl/arvtool.py b/sdk/cwl/arvados_cwl/arvtool.py index b667dac1ca..81faff44e6 100644 --- a/sdk/cwl/arvados_cwl/arvtool.py +++ b/sdk/cwl/arvados_cwl/arvtool.py @@ -36,8 +36,13 @@ class ArvadosCommandTool(CommandLineTool): def job(self, joborder, output_callback, **kwargs): if self.work_api == "containers": - kwargs["outdir"] = "/var/spool/cwl" - kwargs["docker_outdir"] = "/var/spool/cwl" + dockerReq, is_req = self.get_requirement("DockerRequirement") + if dockerReq and dockerReq.get("dockerOutputDirectory"): + kwargs["outdir"] = dockerReq.get("dockerOutputDirectory") + kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory") + else: + kwargs["outdir"] = "/var/spool/cwl" + kwargs["docker_outdir"] = "/var/spool/cwl" elif self.work_api == "jobs": kwargs["outdir"] = "$(task.outdir)" kwargs["docker_outdir"] = "$(task.outdir)" diff --git 
a/sdk/cwl/arvados_cwl/pathmapper.py b/sdk/cwl/arvados_cwl/pathmapper.py index 914ccaa5a1..998890a31c 100644 --- a/sdk/cwl/arvados_cwl/pathmapper.py +++ b/sdk/cwl/arvados_cwl/pathmapper.py @@ -225,12 +225,16 @@ class StagingPathMapper(PathMapper): tgt = os.path.join(stagedir, obj["basename"]) basetgt, baseext = os.path.splitext(tgt) n = 1 - while tgt in self.targets: - n += 1 - tgt = "%s_%i%s" % (basetgt, n, baseext) + if tgt in self.targets and (self.reversemap(tgt)[0] != loc): + while tgt in self.targets: + n += 1 + tgt = "%s_%i%s" % (basetgt, n, baseext) self.targets.add(tgt) if obj["class"] == "Directory": - self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged) + if obj.get("writable"): + self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged) + else: + self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged) if loc.startswith("_:") or self._follow_dirs: self.visitlisting(obj.get("listing", []), tgt, basedir) elif obj["class"] == "File": @@ -239,7 +243,7 @@ class StagingPathMapper(PathMapper): if "contents" in obj and loc.startswith("_:"): self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged) else: - if copy: + if copy or obj.get("writable"): self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged) else: self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged) diff --git a/sdk/cwl/arvados_cwl/runner.py b/sdk/cwl/arvados_cwl/runner.py index 2ca63cfe50..fb5d036e94 100644 --- a/sdk/cwl/arvados_cwl/runner.py +++ b/sdk/cwl/arvados_cwl/runner.py @@ -161,7 +161,7 @@ def upload_docker(arvrunner, tool): if isinstance(tool, CommandLineTool): (docker_req, docker_is_req) = get_feature(tool, "DockerRequirement") if docker_req: - if docker_req.get("dockerOutputDirectory"): + if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers": # TODO: can be supported by containers API, but not jobs API. raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError( "Option 'dockerOutputDirectory' of DockerRequirement not supported.") diff --git a/sdk/cwl/setup.py b/sdk/cwl/setup.py index 88e3d80db3..e5484651ed 100644 --- a/sdk/cwl/setup.py +++ b/sdk/cwl/setup.py @@ -41,7 +41,7 @@ setup(name='arvados-cwl-runner', # Note that arvados/build/run-build-packages.sh looks at this # file to determine what version of cwltool and schema-salad to build. install_requires=[ - 'cwltool==1.0.20180116213856', + 'cwltool==1.0.20180130110340', 'schema-salad==2.6.20171201034858', 'typing==3.5.3.0', 'ruamel.yaml==0.13.7', diff --git a/sdk/cwl/tests/arvados-tests.sh b/sdk/cwl/tests/arvados-tests.sh index 26b31615ea..d3c1e90637 100755 --- a/sdk/cwl/tests/arvados-tests.sh +++ b/sdk/cwl/tests/arvados-tests.sh @@ -6,4 +6,7 @@ if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then arv-put --portable-data-hash testdir/* fi +if ! 
arv-get f225e6259bdd63bc7240599648dde9f1+97 > /dev/null ; then + arv-put --portable-data-hash hg19/* +fi exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum diff --git a/sdk/cwl/tests/hg19/hg19.fa b/sdk/cwl/tests/hg19/hg19.fa new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/cwl/tests/hg19/hg19.fa.amb b/sdk/cwl/tests/hg19/hg19.fa.amb new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/cwl/tests/hg19/hg19.fa.ann b/sdk/cwl/tests/hg19/hg19.fa.ann new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/cwl/tests/hg19/hg19.fa.fai b/sdk/cwl/tests/hg19/hg19.fa.fai new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sdk/cwl/tests/test_make_output.py b/sdk/cwl/tests/test_make_output.py index 05c9ee7410..806d63ab85 100644 --- a/sdk/cwl/tests/test_make_output.py +++ b/sdk/cwl/tests/test_make_output.py @@ -59,11 +59,13 @@ class TestMakeOutput(unittest.TestCase): final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz')]) self.assertEqual("""{ "bar": { + "basename": "baz.txt", "class": "File", "location": "baz.txt", "size": 4 }, "foo": { + "basename": "foo.txt", "class": "File", "location": "foo.txt", "size": 3 diff --git a/sdk/go/arvadostest/fixtures.go b/sdk/go/arvadostest/fixtures.go index 5e530658ab..d057c09b22 100644 --- a/sdk/go/arvadostest/fixtures.go +++ b/sdk/go/arvadostest/fixtures.go @@ -13,6 +13,7 @@ const ( DataManagerToken = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1" ManagementToken = "jg3ajndnq63sywcd50gbs5dskdc9ckkysb0nsqmfz08nwf17nl" ActiveUserUUID = "zzzzz-tpzed-xurymjxw79nv3jz" + FederatedActiveUserUUID = "zbbbb-tpzed-xurymjxw79nv3jz" SpectatorUserUUID = "zzzzz-tpzed-l1s2piq4t4mps8r" UserAgreementCollection = "zzzzz-4zz18-uukreo9rbgwsujr" // user_agreement_in_anonymously_accessible_project FooCollection = "zzzzz-4zz18-fy296fx3hot09f7" diff --git a/services/api/test/fixtures/users.yml b/services/api/test/fixtures/users.yml index 8087952133..8fb800c5f9 100644 --- a/services/api/test/fixtures/users.yml +++ b/services/api/test/fixtures/users.yml @@ -84,6 +84,22 @@ active: role: Computational biologist getting_started_shown: 2015-03-26 12:34:56.789000000 Z +federated_active: + owner_uuid: zzzzz-tpzed-000000000000000 + uuid: zbbbb-tpzed-xurymjxw79nv3jz + email: zbbbb-active-user@arvados.local + first_name: Active + last_name: User + identity_url: https://active-user.openid.local + is_active: true + is_admin: false + username: federatedactive + prefs: + profile: + organization: example.com + role: Computational biologist + getting_started_shown: 2015-03-26 12:34:56.789000000 Z + project_viewer: owner_uuid: zzzzz-tpzed-000000000000000 uuid: zzzzz-tpzed-projectviewer1a diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go index 3c89103f38..ae2ca58421 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go @@ -7,14 +7,12 @@ package main // Dispatcher service for Crunch that submits containers to the slurm queue. 
import ( - "bytes" "context" "flag" "fmt" "log" "math" "os" - "os/exec" "regexp" "strings" "time" @@ -43,9 +41,12 @@ type Config struct { // Minimum time between two attempts to run the same container MinRetryPeriod arvados.Duration + + slurm Slurm } func main() { + theConfig.slurm = &slurmCLI{} err := doMain() if err != nil { log.Fatal(err) @@ -175,8 +176,7 @@ func niceness(priority int) int { return (1000 - priority) * 10 } -// sbatchCmd -func sbatchFunc(container arvados.Container) *exec.Cmd { +func sbatchArgs(container arvados.Container) []string { mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576))) var disk int64 @@ -198,61 +198,22 @@ func sbatchFunc(container arvados.Container) *exec.Cmd { sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ","))) } - return exec.Command("sbatch", sbatchArgs...) -} - -// scancelCmd -func scancelFunc(container arvados.Container) *exec.Cmd { - return exec.Command("scancel", "--name="+container.UUID) -} - -// scontrolCmd -func scontrolFunc(container arvados.Container) *exec.Cmd { - return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority))) + return sbatchArgs } -// Wrap these so that they can be overridden by tests -var sbatchCmd = sbatchFunc -var scancelCmd = scancelFunc -var scontrolCmd = scontrolFunc - -// Submit job to slurm using sbatch. func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error { - cmd := sbatchCmd(container) - - // Send a tiny script on stdin to execute the crunch-run - // command (slurm requires this to be a #! script) - // append() here avoids modifying crunchRunCommand's // underlying array, which is shared with other goroutines. - args := append([]string(nil), crunchRunCommand...) - args = append(args, container.UUID) - cmd.Stdin = strings.NewReader(execScript(args)) - - var stdout, stderr bytes.Buffer - cmd.Stdout = &stdout - cmd.Stderr = &stderr + crArgs := append([]string(nil), crunchRunCommand...) + crArgs = append(crArgs, container.UUID) + crScript := strings.NewReader(execScript(crArgs)) - // Mutex between squeue sync and running sbatch or scancel. 
sqCheck.L.Lock() defer sqCheck.L.Unlock() - log.Printf("exec sbatch %+q", cmd.Args) - err := cmd.Run() - - switch err.(type) { - case nil: - log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String())) - return nil - - case *exec.ExitError: - dispatcher.Unlock(container.UUID) - return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes()) - - default: - dispatcher.Unlock(container.UUID) - return fmt.Errorf("exec failed: %v", err) - } + sbArgs := sbatchArgs(container) + log.Printf("running sbatch %+q", sbArgs) + return theConfig.slurm.Batch(crScript, sbArgs) } // Submit a container to the slurm queue (or resume monitoring if it's @@ -313,10 +274,8 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados } else if updated.Priority == 0 { log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority) scancel(ctr) - } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 { - // dynamically adjust priority - log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID)) - scontrolUpdate(updated) + } else { + renice(updated) } } } @@ -324,12 +283,11 @@ func run(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados func scancel(ctr arvados.Container) { sqCheck.L.Lock() - cmd := scancelCmd(ctr) - msg, err := cmd.CombinedOutput() + err := theConfig.slurm.Cancel(ctr.UUID) sqCheck.L.Unlock() if err != nil { - log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg) + log.Printf("scancel: %s", err) time.Sleep(time.Second) } else if sqCheck.HasUUID(ctr.UUID) { log.Printf("container %s is still in squeue after scancel", ctr.UUID) @@ -337,17 +295,24 @@ func scancel(ctr arvados.Container) { } } -func scontrolUpdate(ctr arvados.Container) { +func renice(ctr arvados.Container) { + nice := niceness(ctr.Priority) + oldnice := sqCheck.GetNiceness(ctr.UUID) + if nice == oldnice || oldnice == -1 { + return + } + log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice) sqCheck.L.Lock() - cmd := scontrolCmd(ctr) - msg, err := cmd.CombinedOutput() + err := theConfig.slurm.Renice(ctr.UUID, nice) sqCheck.L.Unlock() if err != nil { - log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg) + log.Printf("renice: %s", err) time.Sleep(time.Second) - } else if sqCheck.HasUUID(ctr.UUID) { - log.Printf("Container %s priority is now %v, niceness is now %v", + return + } + if sqCheck.HasUUID(ctr.UUID) { + log.Printf("container %s has arvados priority %d, slurm nice %d", ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID)) } } diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go index a823755379..830976d66b 100644 --- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go +++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go @@ -7,6 +7,7 @@ package main import ( "bytes" "context" + "errors" "fmt" "io" "io/ioutil" @@ -64,51 +65,55 @@ func (s *MockArvadosServerSuite) TearDownTest(c *C) { arvadostest.ResetEnv() } -func (s *TestSuite) integrationTest(c *C, - newSqueueCmd func() *exec.Cmd, - newScancelCmd func(arvados.Container) *exec.Cmd, - newSbatchCmd func(arvados.Container) *exec.Cmd, - newScontrolCmd func(arvados.Container) *exec.Cmd, - sbatchCmdComps []string, - runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container { - arvadostest.ResetEnv() +type slurmFake struct { 
+ didBatch [][]string + didCancel []string + didRenice [][]string + queue string + // If non-nil, run this func during the 2nd+ call to Cancel() + onCancel func() + // Error returned by Batch() + errBatch error +} - arv, err := arvadosclient.MakeArvadosClient() - c.Assert(err, IsNil) +func (sf *slurmFake) Batch(script io.Reader, args []string) error { + sf.didBatch = append(sf.didBatch, args) + return sf.errBatch +} - var sbatchCmdLine []string +func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd { + return exec.Command("echo", sf.queue) +} - // Override sbatchCmd - defer func(orig func(arvados.Container) *exec.Cmd) { - sbatchCmd = orig - }(sbatchCmd) +func (sf *slurmFake) Renice(name string, nice int) error { + sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)}) + return nil +} - if newSbatchCmd != nil { - sbatchCmd = newSbatchCmd - } else { - sbatchCmd = func(container arvados.Container) *exec.Cmd { - sbatchCmdLine = sbatchFunc(container).Args - return exec.Command("sh") - } +func (sf *slurmFake) Cancel(name string) error { + sf.didCancel = append(sf.didCancel, name) + if len(sf.didCancel) == 1 { + // simulate error on first attempt + return errors.New("something terrible happened") + } + if sf.onCancel != nil { + sf.onCancel() } + return nil +} - // Override squeueCmd - defer func(orig func() *exec.Cmd) { - squeueCmd = orig - }(squeueCmd) - squeueCmd = newSqueueCmd +func (s *TestSuite) integrationTest(c *C, slurm *slurmFake, + expectBatch [][]string, + runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container { + arvadostest.ResetEnv() - // Override scancel - defer func(orig func(arvados.Container) *exec.Cmd) { - scancelCmd = orig - }(scancelCmd) - scancelCmd = newScancelCmd + arv, err := arvadosclient.MakeArvadosClient() + c.Assert(err, IsNil) - // Override scontrol - defer func(orig func(arvados.Container) *exec.Cmd) { - scontrolCmd = orig - }(scontrolCmd) - scontrolCmd = newScontrolCmd + defer func(orig Slurm) { + theConfig.slurm = orig + }(theConfig.slurm) + theConfig.slurm = slurm // There should be one queued container params := arvadosclient.Dict{ @@ -130,6 +135,7 @@ func (s *TestSuite) integrationTest(c *C, RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) { go func() { runContainer(disp, ctr) + slurm.queue = "" doneRun <- struct{}{} }() run(disp, ctr, status) @@ -145,7 +151,7 @@ func (s *TestSuite) integrationTest(c *C, sqCheck.Stop() - c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps) + c.Check(slurm.didBatch, DeepEquals, expectBatch) // There should be no queued containers now err = arv.List("containers", params, &containers) @@ -160,77 +166,47 @@ func (s *TestSuite) integrationTest(c *C, } func (s *TestSuite) TestIntegrationNormal(c *C) { - done := false container := s.integrationTest(c, - func() *exec.Cmd { - if done { - return exec.Command("true") - } else { - return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100") - } - }, - nil, + &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}, nil, - nil, - []string(nil), func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) time.Sleep(3 * time.Second) dispatcher.UpdateState(container.UUID, dispatch.Complete) - done = true }) c.Check(container.State, Equals, arvados.ContainerStateComplete) } func (s *TestSuite) TestIntegrationCancel(c *C) { - var cmd *exec.Cmd - var scancelCmdLine []string - attempt := 0 - + slurm := &slurmFake{queue: 
"zzzzz-dz642-queuedcontainer 9990 100\n"} + readyToCancel := make(chan bool) + slurm.onCancel = func() { <-readyToCancel } container := s.integrationTest(c, - func() *exec.Cmd { - if cmd != nil && cmd.ProcessState != nil { - return exec.Command("true") - } else { - return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100") - } - }, - func(container arvados.Container) *exec.Cmd { - if attempt++; attempt == 1 { - return exec.Command("false") - } else { - scancelCmdLine = scancelFunc(container).Args - cmd = exec.Command("echo") - return cmd - } - }, - nil, + slurm, nil, - []string(nil), func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) - time.Sleep(1 * time.Second) + time.Sleep(time.Second) dispatcher.Arv.Update("containers", container.UUID, arvadosclient.Dict{ "container": arvadosclient.Dict{"priority": 0}}, nil) + readyToCancel <- true + close(readyToCancel) }) c.Check(container.State, Equals, arvados.ContainerStateCancelled) - c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"}) + c.Check(len(slurm.didCancel) > 1, Equals, true) + c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"}) } func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) { - container := s.integrationTest(c, - func() *exec.Cmd { return exec.Command("echo") }, - nil, - nil, - nil, - []string{"sbatch", + container := s.integrationTest(c, &slurmFake{}, + [][]string{{ fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"), fmt.Sprintf("--mem=%d", 11445), fmt.Sprintf("--cpus-per-task=%d", 4), fmt.Sprintf("--tmp=%d", 45777), - fmt.Sprintf("--nice=%d", 9990)}, + fmt.Sprintf("--nice=%d", 9990)}}, func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) time.Sleep(3 * time.Second) @@ -241,13 +217,8 @@ func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) { func (s *TestSuite) TestSbatchFail(c *C) { container := s.integrationTest(c, - func() *exec.Cmd { return exec.Command("echo") }, - nil, - func(container arvados.Container) *exec.Cmd { - return exec.Command("false") - }, - nil, - []string(nil), + &slurmFake{errBatch: errors.New("something terrible happened")}, + [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}}, func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) dispatcher.UpdateState(container.UUID, dispatch.Complete) @@ -387,71 +358,47 @@ func (s *MockArvadosServerSuite) TestSbatchFuncWithConfigArgs(c *C) { } func testSbatchFuncWithArgs(c *C, args []string) { + defer func() { theConfig.SbatchArguments = nil }() theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...) container := arvados.Container{ UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2}, Priority: 1} - sbatchCmd := sbatchFunc(container) var expected []string - expected = append(expected, "sbatch") expected = append(expected, theConfig.SbatchArguments...) 
expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990") - - c.Check(sbatchCmd.Args, DeepEquals, expected) + c.Check(sbatchArgs(container), DeepEquals, expected) } func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) { - theConfig.SbatchArguments = nil container := arvados.Container{ UUID: "123", RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1}, SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}}, Priority: 1} - sbatchCmd := sbatchFunc(container) - var expected []string - expected = append(expected, "sbatch") - expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2") - - c.Check(sbatchCmd.Args, DeepEquals, expected) + c.Check(sbatchArgs(container), DeepEquals, []string{ + "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", + "--partition=blurb,b2", + }) } func (s *TestSuite) TestIntegrationChangePriority(c *C) { - var scontrolCmdLine []string - step := 0 - - container := s.integrationTest(c, - func() *exec.Cmd { - if step == 0 { - return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100") - } else if step == 1 { - return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100") - } else { - return exec.Command("echo") - } - }, - func(arvados.Container) *exec.Cmd { return exec.Command("true") }, - nil, - func(container arvados.Container) *exec.Cmd { - scontrolCmdLine = scontrolFunc(container).Args - step = 1 - return exec.Command("true") - }, - []string(nil), + slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"} + container := s.integrationTest(c, slurm, nil, func(dispatcher *dispatch.Dispatcher, container arvados.Container) { dispatcher.UpdateState(container.UUID, dispatch.Running) - time.Sleep(1 * time.Second) + time.Sleep(time.Second) dispatcher.Arv.Update("containers", container.UUID, arvadosclient.Dict{ "container": arvadosclient.Dict{"priority": 600}}, nil) - time.Sleep(1 * time.Second) - step = 2 + time.Sleep(time.Second) dispatcher.UpdateState(container.UUID, dispatch.Complete) }) c.Check(container.State, Equals, arvados.ContainerStateComplete) - c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"}) + c.Assert(len(slurm.didRenice), Not(Equals), 0) + c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"}) } diff --git a/services/crunch-dispatch-slurm/slurm.go b/services/crunch-dispatch-slurm/slurm.go new file mode 100644 index 0000000000..bd193778b3 --- /dev/null +++ b/services/crunch-dispatch-slurm/slurm.go @@ -0,0 +1,73 @@ +// Copyright (C) The Arvados Authors. All rights reserved. +// +// SPDX-License-Identifier: AGPL-3.0 + +package main + +import ( + "fmt" + "io" + "log" + "os/exec" + "strings" +) + +type Slurm interface { + Cancel(name string) error + Renice(name string, nice int) error + QueueCommand(args []string) *exec.Cmd + Batch(script io.Reader, args []string) error +} + +type slurmCLI struct{} + +func (scli *slurmCLI) Batch(script io.Reader, args []string) error { + return scli.run(script, "sbatch", args) +} + +func (scli *slurmCLI) Cancel(name string) error { + for _, args := range [][]string{ + // If the slurm job hasn't started yet, remove it from + // the queue. + {"--state=pending"}, + // If the slurm job has started, send SIGTERM. 
If we + // cancel a running job without a --signal argument, + // slurm will send SIGTERM and then (after some + // site-configured interval) SIGKILL. This would kill + // crunch-run without stopping the container, which we + // don't want. + {"--batch", "--signal=TERM", "--state=running"}, + {"--batch", "--signal=TERM", "--state=suspended"}, + } { + err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...)) + if err != nil { + // scancel exits 0 if no job matches the given + // name and state. Any error from scancel here + // really indicates something is wrong. + return err + } + } + return nil +} + +func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd { + return exec.Command("squeue", args...) +} + +func (scli *slurmCLI) Renice(name string, nice int) error { + return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)}) +} + +func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error { + cmd := exec.Command(prog, args...) + cmd.Stdin = stdin + out, err := cmd.CombinedOutput() + outTrim := strings.TrimSpace(string(out)) + if err != nil || len(out) > 0 { + log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim) + } + if err != nil { + err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim) + } + return err +} diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go index 819c2d2510..5ecfe8ff2f 100644 --- a/services/crunch-dispatch-slurm/squeue.go +++ b/services/crunch-dispatch-slurm/squeue.go @@ -8,7 +8,6 @@ import ( "bytes" "fmt" "log" - "os/exec" "strings" "sync" "time" @@ -29,12 +28,6 @@ type SqueueChecker struct { sync.Cond } -func squeueFunc() *exec.Cmd { - return exec.Command("squeue", "--all", "--format=%j %y %Q") -} - -var squeueCmd = squeueFunc - // HasUUID checks if a given container UUID is in the slurm queue. // This does not run squeue directly, but instead blocks until woken // up by next successful update of squeue. 
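
The point of the Slurm interface above is testability: submit(), scancel(), and renice() now go through theConfig.slurm, so the test suite (see slurmFake earlier in this diff) can swap the sbatch/scancel/scontrol/squeue CLIs for an in-memory stub. A minimal sketch of such a stub, assuming it lives in the same package as the interface (recordingSlurm is a hypothetical name):

    import (
        "io"
        "os/exec"
    )

    // recordingSlurm is a hypothetical Slurm implementation that records
    // sbatch calls instead of shelling out.
    type recordingSlurm struct {
        batches [][]string
        queue   string
    }

    func (s *recordingSlurm) Batch(script io.Reader, args []string) error {
        s.batches = append(s.batches, args) // remember args for assertions
        return nil
    }

    func (s *recordingSlurm) Cancel(name string) error           { return nil }
    func (s *recordingSlurm) Renice(name string, nice int) error { return nil }

    // QueueCommand stands in for `squeue --all --format=%j %y %Q`,
    // whose output is one "name nice priority" line per job.
    func (s *recordingSlurm) QueueCommand(args []string) *exec.Cmd {
        return exec.Command("echo", s.queue)
    }

    var _ Slurm = (*recordingSlurm)(nil) // compile-time interface check

A test would then set theConfig.slurm = &recordingSlurm{...} and assert on batches, exactly as the integrationTest helper above does with slurmFake.
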
@@ -84,7 +77,7 @@ func (sqc *SqueueChecker) check() { sqc.L.Lock() defer sqc.L.Unlock() - cmd := squeueCmd() + cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"}) stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{} cmd.Stdout, cmd.Stderr = stdout, stderr if err := cmd.Run(); err != nil { diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go index b480c068c5..a9f1c25d37 100644 --- a/services/crunch-run/crunchrun.go +++ b/services/crunch-run/crunchrun.go @@ -6,7 +6,6 @@ package main import ( "bytes" - "context" "encoding/json" "errors" "flag" @@ -33,6 +32,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/keepclient" "git.curoverse.com/arvados.git/sdk/go/manifest" + "golang.org/x/net/context" dockertypes "github.com/docker/docker/api/types" dockercontainer "github.com/docker/docker/api/types/container" @@ -75,60 +75,13 @@ type ThinDockerClient interface { ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error - ContainerStop(ctx context.Context, container string, timeout *time.Duration) error + ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) } -// ThinDockerClientProxy is a proxy implementation of ThinDockerClient -// that executes the docker requests on dockerclient.Client -type ThinDockerClientProxy struct { - Docker *dockerclient.Client -} - -// ContainerAttach invokes dockerclient.Client.ContainerAttach -func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) { - return proxy.Docker.ContainerAttach(ctx, container, options) -} - -// ContainerCreate invokes dockerclient.Client.ContainerCreate -func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig, - networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) { - return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName) -} - -// ContainerStart invokes dockerclient.Client.ContainerStart -func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error { - return proxy.Docker.ContainerStart(ctx, container, options) -} - -// ContainerStop invokes dockerclient.Client.ContainerStop -func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error { - return proxy.Docker.ContainerStop(ctx, container, timeout) -} - -// ContainerWait invokes dockerclient.Client.ContainerWait -func (proxy ThinDockerClientProxy) 
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { - return proxy.Docker.ContainerWait(ctx, container, condition) -} - -// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw -func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) { - return proxy.Docker.ImageInspectWithRaw(ctx, image) -} - -// ImageLoad invokes dockerclient.Client.ImageLoad -func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) { - return proxy.Docker.ImageLoad(ctx, input, quiet) -} - -// ImageRemove invokes dockerclient.Client.ImageRemove -func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) { - return proxy.Docker.ImageRemove(ctx, image, options) -} - // ContainerRunner is the main stateful struct used for a single execution of a // container. type ContainerRunner struct { @@ -150,21 +103,23 @@ type ContainerRunner struct { LogsPDH *string RunArvMount MkTempDir - ArvMount *exec.Cmd - ArvMountPoint string - HostOutputDir string - CleanupTempDir []string - Binds []string - Volumes map[string]struct{} - OutputPDH *string - SigChan chan os.Signal - ArvMountExit chan error - finalState string - - statLogger io.WriteCloser - statReporter *crunchstat.Reporter - statInterval time.Duration - cgroupRoot string + ArvMount *exec.Cmd + ArvMountPoint string + HostOutputDir string + Binds []string + Volumes map[string]struct{} + OutputPDH *string + SigChan chan os.Signal + ArvMountExit chan error + finalState string + parentTemp string + + statLogger io.WriteCloser + statReporter *crunchstat.Reporter + hoststatLogger io.WriteCloser + hoststatReporter *crunchstat.Reporter + statInterval time.Duration + cgroupRoot string // What we expect the container's cgroup parent to be. 
expectCgroupParent string // What we tell docker to use as the container's cgroup @@ -180,7 +135,6 @@ type ContainerRunner struct { setCgroupParent string cStateLock sync.Mutex - cStarted bool // StartContainer() succeeded cCancelled bool // StopContainer() invoked enableNetwork string // one of "default" or "always" @@ -197,11 +151,10 @@ func (runner *ContainerRunner) setupSignals() { signal.Notify(runner.SigChan, syscall.SIGQUIT) go func(sig chan os.Signal) { - s := <-sig - if s != nil { - runner.CrunchLog.Printf("Caught signal %v", s) + for s := range sig { + runner.CrunchLog.Printf("caught signal: %v", s) + runner.stop() } - runner.stop() }(runner.SigChan) } @@ -209,25 +162,20 @@ func (runner *ContainerRunner) setupSignals() { func (runner *ContainerRunner) stop() { runner.cStateLock.Lock() defer runner.cStateLock.Unlock() - if runner.cCancelled { + if runner.ContainerID == "" { return } runner.cCancelled = true - if runner.cStarted { - timeout := time.Duration(10) - err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout)) - if err != nil { - runner.CrunchLog.Printf("StopContainer failed: %s", err) - } - // Suppress multiple calls to stop() - runner.cStarted = false + runner.CrunchLog.Printf("removing container") + err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true}) + if err != nil { + runner.CrunchLog.Printf("error removing container: %s", err) } } func (runner *ContainerRunner) stopSignals() { if runner.SigChan != nil { signal.Stop(runner.SigChan) - close(runner.SigChan) } } @@ -379,11 +327,42 @@ func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string, token string) ( func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) { if runner.ArvMountPoint == "" { - runner.ArvMountPoint, err = runner.MkTempDir("", prefix) + runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix) } return } +func copyfile(src string, dst string) (err error) { + srcfile, err := os.Open(src) + if err != nil { + return + } + + os.MkdirAll(path.Dir(dst), 0777) + + dstfile, err := os.Create(dst) + if err != nil { + return + } + _, err = io.Copy(dstfile, srcfile) + if err != nil { + return + } + + err = srcfile.Close() + err2 := dstfile.Close() + + if err != nil { + return + } + + if err2 != nil { + return err2 + } + + return nil +} + func (runner *ContainerRunner) SetupMounts() (err error) { err = runner.SetupArvMountPoint("keep") if err != nil { @@ -411,6 +390,11 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.Binds = nil runner.Volumes = make(map[string]struct{}) needCertMount := true + type copyFile struct { + src string + bind string + } + var copyFiles []copyFile var binds []string for bind := range runner.Container.Mounts { @@ -466,7 +450,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { pdhOnly = false src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID) } else if mnt.PortableDataHash != "" { - if mnt.Writable { + if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") { return fmt.Errorf("Can never write to a collection specified by portable data hash") } idx := strings.Index(mnt.PortableDataHash, "/") @@ -493,10 +477,12 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if mnt.Writable { if bind == runner.Container.OutputPath { runner.HostOutputDir = src + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") 
{ - return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind) + copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]}) + } else { + runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) } - runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind)) } else { runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind)) } @@ -504,7 +490,7 @@ func (runner *ContainerRunner) SetupMounts() (err error) { case mnt.Kind == "tmp": var tmpdir string - tmpdir, err = runner.MkTempDir("", "") + tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp") if err != nil { return fmt.Errorf("While creating mount temp dir: %v", err) } @@ -516,7 +502,6 @@ func (runner *ContainerRunner) SetupMounts() (err error) { if staterr != nil { return fmt.Errorf("While Chmod temp dir: %v", err) } - runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir) runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind)) if bind == runner.Container.OutputPath { runner.HostOutputDir = tmpdir @@ -532,11 +517,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) { // can ensure the file is world-readable // inside the container, without having to // make it world-readable on the docker host. - tmpdir, err := runner.MkTempDir("", "") + tmpdir, err := runner.MkTempDir(runner.parentTemp, "json") if err != nil { return fmt.Errorf("creating temp dir: %v", err) } - runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir) tmpfn := filepath.Join(tmpdir, "mountdata.json") err = ioutil.WriteFile(tmpfn, jsondata, 0644) if err != nil { @@ -545,11 +529,10 @@ func (runner *ContainerRunner) SetupMounts() (err error) { runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind)) case mnt.Kind == "git_tree": - tmpdir, err := runner.MkTempDir("", "") + tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree") if err != nil { return fmt.Errorf("creating temp dir: %v", err) } - runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir) err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token) if err != nil { return err @@ -591,59 +574,118 @@ func (runner *ContainerRunner) SetupMounts() (err error) { } } + for _, cp := range copyFiles { + st, err := os.Stat(cp.src) + if err != nil { + return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err) + } + if st.IsDir() { + err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error { + if walkerr != nil { + return walkerr + } + target := path.Join(cp.bind, walkpath[len(cp.src):]) + if walkinfo.Mode().IsRegular() { + copyerr := copyfile(walkpath, target) + if copyerr != nil { + return copyerr + } + return os.Chmod(target, walkinfo.Mode()|0777) + } else if walkinfo.Mode().IsDir() { + mkerr := os.MkdirAll(target, 0777) + if mkerr != nil { + return mkerr + } + return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777) + } else { + return fmt.Errorf("Source %q is not a regular file or directory", cp.src) + } + }) + } else if st.Mode().IsRegular() { + err = copyfile(cp.src, cp.bind) + if err == nil { + err = os.Chmod(cp.bind, st.Mode()|0777) + } + } + if err != nil { + return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err) + } + } + return nil } func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) { // Handle docker log protocol // 
https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container + defer close(runner.loggingDone) header := make([]byte, 8) - for { - _, readerr := io.ReadAtLeast(containerReader, header, 8) - - if readerr == nil { - readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24) - if header[0] == 1 { - // stdout - _, readerr = io.CopyN(runner.Stdout, containerReader, readsize) - } else { - // stderr - _, readerr = io.CopyN(runner.Stderr, containerReader, readsize) + var err error + for err == nil { + _, err = io.ReadAtLeast(containerReader, header, 8) + if err != nil { + if err == io.EOF { + err = nil } + break } + readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24) + if header[0] == 1 { + // stdout + _, err = io.CopyN(runner.Stdout, containerReader, readsize) + } else { + // stderr + _, err = io.CopyN(runner.Stderr, containerReader, readsize) + } + } - if readerr != nil { - if readerr != io.EOF { - runner.CrunchLog.Printf("While reading docker logs: %v", readerr) - } - - closeerr := runner.Stdout.Close() - if closeerr != nil { - runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr) - } + if err != nil { + runner.CrunchLog.Printf("error reading docker logs: %v", err) + } - closeerr = runner.Stderr.Close() - if closeerr != nil { - runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr) - } + err = runner.Stdout.Close() + if err != nil { + runner.CrunchLog.Printf("error closing stdout logs: %v", err) + } - if runner.statReporter != nil { - runner.statReporter.Stop() - closeerr = runner.statLogger.Close() - if closeerr != nil { - runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr) - } - } + err = runner.Stderr.Close() + if err != nil { + runner.CrunchLog.Printf("error closing stderr logs: %v", err) + } - runner.loggingDone <- true - close(runner.loggingDone) - return + if runner.statReporter != nil { + runner.statReporter.Stop() + err = runner.statLogger.Close() + if err != nil { + runner.CrunchLog.Printf("error closing crunchstat logs: %v", err) } } } -func (runner *ContainerRunner) StartCrunchstat() { +func (runner *ContainerRunner) stopHoststat() error { + if runner.hoststatReporter == nil { + return nil + } + runner.hoststatReporter.Stop() + err := runner.hoststatLogger.Close() + if err != nil { + return fmt.Errorf("error closing hoststat logs: %v", err) + } + return nil +} + +func (runner *ContainerRunner) startHoststat() { + runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat")) + runner.hoststatReporter = &crunchstat.Reporter{ + Logger: log.New(runner.hoststatLogger, "", 0), + CgroupRoot: runner.cgroupRoot, + PollPeriod: runner.statInterval, + } + runner.hoststatReporter.Start() +} + +func (runner *ContainerRunner) startCrunchstat() { runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat")) runner.statReporter = &crunchstat.Reporter{ CID: runner.ContainerID, @@ -973,46 +1015,38 @@ func (runner *ContainerRunner) StartContainer() error { } return fmt.Errorf("could not start container: %v%s", err, advice) } - runner.cStarted = true return nil } // WaitFinish waits for the container to terminate, capture the exit code, and // close the stdout/stderr logging. 
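
A note on the ProcessDockerAttach rewrite above: the 8-byte header it reads is Docker's multiplexed attach framing. Byte 0 selects the stream (1 = stdout, otherwise stderr here) and bytes 4-7 hold the payload length as a big-endian uint32, which is what the manual shift-and-or expression computes. A standalone sketch of the same demultiplexing, assuming only the standard library (io.ReadFull plays the role of io.ReadAtLeast(r, header, 8) in the real code):

    package main

    import (
        "encoding/binary"
        "fmt"
        "io"
        "strings"
    )

    // demux splits a Docker attach stream into stdout/stderr. Each frame
    // is an 8-byte header (stream byte, 3 zero bytes, big-endian length)
    // followed by that many payload bytes.
    func demux(r io.Reader, stdout, stderr io.Writer) error {
        header := make([]byte, 8)
        for {
            if _, err := io.ReadFull(r, header); err != nil {
                if err == io.EOF {
                    return nil // clean end of stream
                }
                return err
            }
            size := int64(binary.BigEndian.Uint32(header[4:8]))
            w := stderr
            if header[0] == 1 {
                w = stdout
            }
            if _, err := io.CopyN(w, r, size); err != nil {
                return err
            }
        }
    }

    func main() {
        // One stdout frame carrying "hi\n": stream 1, length 3.
        frame := append([]byte{1, 0, 0, 0, 0, 0, 0, 3}, "hi\n"...)
        var out, errOut strings.Builder
        if err := demux(strings.NewReader(string(frame)), &out, &errOut); err != nil {
            fmt.Println("demux:", err)
        }
        fmt.Print(out.String()) // prints: hi
    }
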
-func (runner *ContainerRunner) WaitFinish() (err error) { +func (runner *ContainerRunner) WaitFinish() error { runner.CrunchLog.Print("Waiting for container to finish") - waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running") + waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning) + arvMountExit := runner.ArvMountExit + for { + select { + case waitBody := <-waitOk: + runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) + code := int(waitBody.StatusCode) + runner.ExitCode = &code + + // wait for stdout/stderr to complete + <-runner.loggingDone + return nil + + case err := <-waitErr: + return fmt.Errorf("container wait: %v", err) - go func() { - <-runner.ArvMountExit - if runner.cStarted { + case <-arvMountExit: runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.") runner.stop() + // arvMountExit will always be ready now that + // it's closed, but that doesn't interest us. + arvMountExit = nil } - }() - - var waitBody dockercontainer.ContainerWaitOKBody - select { - case waitBody = <-waitOk: - case err = <-waitErr: - } - - // Container isn't running any more - runner.cStarted = false - - if err != nil { - return fmt.Errorf("container wait: %v", err) } - - runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode) - code := int(waitBody.StatusCode) - runner.ExitCode = &code - - // wait for stdout/stderr to complete - <-runner.loggingDone - - return nil } var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory") @@ -1141,7 +1175,7 @@ func (runner *ContainerRunner) UploadOutputFile( // go through mounts and try reverse map to collection reference for _, bind := range binds { mnt := runner.Container.Mounts[bind] - if tgt == bind || strings.HasPrefix(tgt, bind+"/") { + if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable { // get path relative to bind targetSuffix := tgt[len(bind):] @@ -1191,10 +1225,6 @@ func (runner *ContainerRunner) UploadOutputFile( // HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories func (runner *ContainerRunner) CaptureOutput() error { - if runner.finalState != "Complete" { - return nil - } - if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI { // Output may have been set directly by the container, so // refresh the container record to check. 
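
One detail of the WaitFinish rewrite above deserves a gloss: after ArvMountExit fires (the channel is closed, so it would be ready on every subsequent pass through the loop), the case is switched off by assigning nil to the local variable, because receiving from a nil channel blocks forever. A tiny standalone illustration of the idiom:

    package main

    import "fmt"

    func main() {
        done := make(chan struct{}) // stands in for ArvMountExit
        result := make(chan int, 1) // stands in for waitOk
        close(done)
        result <- 42

        for i := 0; i < 2; i++ {
            select {
            case v := <-result:
                fmt.Println("container exited:", v)
            case <-done:
                fmt.Println("done is closed; disabling this case")
                done = nil // a nil channel never becomes ready
            }
        }
        // Both branches run exactly once (in either order); without the
        // nil assignment, the closed channel could win every iteration.
    }
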
@@ -1284,7 +1314,7 @@ func (runner *ContainerRunner) CaptureOutput() error { continue } - if mnt.ExcludeFromOutput == true { + if mnt.ExcludeFromOutput == true || mnt.Writable { continue } @@ -1406,10 +1436,8 @@ func (runner *ContainerRunner) CleanupDirs() { } } - for _, tmpdir := range runner.CleanupTempDir { - if rmerr := os.RemoveAll(tmpdir); rmerr != nil { - runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr) - } + if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil { + runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr) } } @@ -1579,6 +1607,7 @@ func (runner *ContainerRunner) Run() (err error) { } checkErr(runner.CaptureOutput()) + checkErr(runner.stopHoststat()) checkErr(runner.CommitLogs()) checkErr(runner.UpdateContainerFinal()) }() @@ -1587,9 +1616,8 @@ func (runner *ContainerRunner) Run() (err error) { if err != nil { return } - - // setup signal handling runner.setupSignals() + runner.startHoststat() // check for and/or load image err = runner.LoadImage() @@ -1638,7 +1666,7 @@ func (runner *ContainerRunner) Run() (err error) { } runner.finalState = "Cancelled" - runner.StartCrunchstat() + runner.startCrunchstat() err = runner.StartContainer() if err != nil { @@ -1647,7 +1675,7 @@ func (runner *ContainerRunner) Run() (err error) { } err = runner.WaitFinish() - if err == nil { + if err == nil && !runner.IsCancelled() { runner.finalState = "Complete" } return @@ -1739,10 +1767,8 @@ func main() { // API version 1.21 corresponds to Docker 1.9, which is currently the // minimum version we want to support. docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil) - dockerClientProxy := ThinDockerClientProxy{Docker: docker} - - cr := NewContainerRunner(api, kc, dockerClientProxy, containerId) + cr := NewContainerRunner(api, kc, docker, containerId) if dockererr != nil { cr.CrunchLog.Printf("%s: %v", containerId, dockererr) cr.checkBrokenNode(dockererr) @@ -1750,6 +1776,12 @@ func main() { os.Exit(1) } + parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".") + if tmperr != nil { + log.Fatalf("%s: %v", containerId, tmperr) + } + + cr.parentTemp = parentTemp cr.statInterval = *statInterval cr.cgroupRoot = *cgroupRoot cr.expectCgroupParent = *cgroupParent diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go index 4979cf8a0c..94b713355d 100644 --- a/services/crunch-run/crunchrun_test.go +++ b/services/crunch-run/crunchrun_test.go @@ -7,7 +7,6 @@ package main import ( "bufio" "bytes" - "context" "crypto/md5" "encoding/json" "errors" @@ -17,7 +16,6 @@ import ( "net" "os" "os/exec" - "path/filepath" "runtime/pprof" "sort" "strings" @@ -30,6 +28,7 @@ import ( "git.curoverse.com/arvados.git/sdk/go/arvadosclient" "git.curoverse.com/arvados.git/sdk/go/arvadostest" "git.curoverse.com/arvados.git/sdk/go/manifest" + "golang.org/x/net/context" dockertypes "github.com/docker/docker/api/types" dockercontainer "github.com/docker/docker/api/types/container" @@ -42,11 +41,17 @@ func TestCrunchExec(t *testing.T) { TestingT(t) } -type TestSuite struct{} - // Gocheck boilerplate var _ = Suite(&TestSuite{}) +type TestSuite struct { + docker *TestDockerClient +} + +func (s *TestSuite) SetUpTest(c *C) { + s.docker = NewTestDockerClient() +} + type ArvTestClient struct { Total int64 Calls int @@ -88,18 +93,18 @@ type TestDockerClient struct { logReader io.ReadCloser logWriter io.WriteCloser fn func(t *TestDockerClient) - finish int + 
exitCode int stop chan bool cwd string env []string api *ArvTestClient realTemp string + calledWait bool } -func NewTestDockerClient(exitCode int) *TestDockerClient { +func NewTestDockerClient() *TestDockerClient { t := &TestDockerClient{} t.logReader, t.logWriter = io.Pipe() - t.finish = exitCode t.stop = make(chan bool, 1) t.cwd = "/" return t @@ -131,16 +136,16 @@ func (t *TestDockerClient) ContainerCreate(ctx context.Context, config *dockerco } func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error { - if t.finish == 3 { + if t.exitCode == 3 { return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`) } - if t.finish == 4 { + if t.exitCode == 4 { return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`) } - if t.finish == 5 { + if t.exitCode == 5 { return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`) } - if t.finish == 6 { + if t.exitCode == 6 { return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`) } @@ -152,25 +157,24 @@ func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, } } -func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error { +func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error { t.stop <- true return nil } func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) { - body := make(chan dockercontainer.ContainerWaitOKBody) + t.calledWait = true + body := make(chan dockercontainer.ContainerWaitOKBody, 1) err := make(chan error) go func() { t.fn(t) - body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)} - close(body) - close(err) + body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)} }() return body, err } func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) { - if t.finish == 2 { + if t.exitCode == 2 { return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?") } @@ -182,7 +186,7 @@ func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string } func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) { - if t.finish == 2 { + if t.exitCode == 2 { return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. 
Is the docker daemon running?") } _, err := io.Copy(ioutil.Discard, input) @@ -396,8 +400,7 @@ func (client *KeepTestClient) ManifestFileReader(m manifest.Manifest, filename s func (s *TestSuite) TestLoadImage(c *C) { kc := &KeepTestClient{} - docker := NewTestDockerClient(0) - cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) @@ -512,8 +515,7 @@ func (s *TestSuite) TestLoadImageArvError(c *C) { func (s *TestSuite) TestLoadImageKeepError(c *C) { // (2) Keep error - docker := NewTestDockerClient(0) - cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") cr.Container.ContainerImage = hwPDH err := cr.LoadImage() @@ -531,8 +533,7 @@ func (s *TestSuite) TestLoadImageCollectionError(c *C) { func (s *TestSuite) TestLoadImageKeepReadError(c *C) { // (4) Collection doesn't contain image - docker := NewTestDockerClient(0) - cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") cr.Container.ContainerImage = hwPDH err := cr.LoadImage() @@ -572,12 +573,11 @@ func dockerLog(fd byte, msg string) []byte { } func (s *TestSuite) TestRunContainer(c *C) { - docker := NewTestDockerClient(0) - docker.fn = func(t *TestDockerClient) { + s.docker.fn = func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, "Hello world\n")) t.logWriter.Close() } - cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") var logs TestLogs cr.NewLogWriter = logs.NewTestLoggingWriter @@ -667,18 +667,18 @@ func (s *TestSuite) TestUpdateContainerCancelled(c *C) { // Used by the TestFullRun*() test below to DRY up boilerplate setup to do full // dress rehearsal of the Run() function, starting from a JSON container record. 
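
The conversion of FullRunHelper into a suite method (below) pairs with the SetUpTest hook added earlier in this file: gocheck calls SetUpTest before every test method, so each test gets a fresh TestDockerClient and per-test mutations such as exitCode cannot leak between tests. A minimal sketch of that fixture pattern, with hypothetical names:

    package example_test

    import (
        "testing"

        . "gopkg.in/check.v1"
    )

    func Test(t *testing.T) { TestingT(t) }

    type fakeDocker struct{ exitCode int }

    type S struct{ docker *fakeDocker }

    var _ = Suite(&S{})

    // SetUpTest runs before every test method in the suite.
    func (s *S) SetUpTest(c *C) {
        s.docker = &fakeDocker{} // fresh fake per test
    }

    func (s *S) TestExitCode(c *C) {
        s.docker.exitCode = 2 // only this test sees the mutation
        c.Check(s.docker.exitCode, Equals, 2)
    }
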
-func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) { +func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) { rec := arvados.Container{} err := json.Unmarshal([]byte(record), &rec) c.Check(err, IsNil) - docker := NewTestDockerClient(exitCode) - docker.fn = fn - docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) + s.docker.exitCode = exitCode + s.docker.fn = fn + s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) api = &ArvTestClient{Container: rec} - docker.api = api - cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + s.docker.api = api + cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") cr.statInterval = 100 * time.Millisecond am := &ArvMountCmdLine{} cr.RunArvMount = am.ArvMountTest @@ -687,7 +687,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f c.Assert(err, IsNil) defer os.RemoveAll(realTemp) - docker.realTemp = realTemp + s.docker.realTemp = realTemp tempcount := 0 cr.MkTempDir = func(_ string, prefix string) (string, error) { @@ -730,7 +730,7 @@ func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn f } func (s *TestSuite) TestFullRunHello(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -751,7 +751,7 @@ func (s *TestSuite) TestFullRunHello(c *C) { } func (s *TestSuite) TestCrunchstat(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["sleep", "1"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -784,7 +784,7 @@ func (s *TestSuite) TestCrunchstat(c *C) { func (s *TestSuite) TestNodeInfoLog(c *C) { os.Setenv("SLURMD_NODENAME", "compute2") - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["sleep", "1"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -818,7 +818,7 @@ func (s *TestSuite) TestNodeInfoLog(c *C) { } func (s *TestSuite) TestContainerRecordLog(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["sleep", "1"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -841,7 +841,7 @@ func (s *TestSuite) TestContainerRecordLog(c *C) { } func (s *TestSuite) TestFullRunStderr(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -866,7 +866,7 @@ func (s *TestSuite) TestFullRunStderr(c *C) { } func (s *TestSuite) TestFullRunDefaultCwd(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["pwd"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -887,7 +887,7 @@ func (s *TestSuite) TestFullRunDefaultCwd(c *C) { } func (s *TestSuite) TestFullRunSetCwd(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["pwd"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": "/bin", @@ -909,7 +909,7 @@ func (s *TestSuite) TestFullRunSetCwd(c *C) { func (s *TestSuite) TestStopOnSignal(c 
*C) { s.testStopContainer(c, func(cr *ContainerRunner) { go func() { - for !cr.cStarted { + for !s.docker.calledWait { time.Sleep(time.Millisecond) } cr.SigChan <- syscall.SIGINT @@ -943,16 +943,15 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) { err := json.Unmarshal([]byte(record), &rec) c.Check(err, IsNil) - docker := NewTestDockerClient(0) - docker.fn = func(t *TestDockerClient) { + s.docker.fn = func(t *TestDockerClient) { <-t.stop t.logWriter.Write(dockerLog(1, "foo\n")) t.logWriter.Close() } - docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) + s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) api := &ArvTestClient{Container: rec} - cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr := NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil } setup(cr) @@ -978,7 +977,7 @@ func (s *TestSuite) testStopContainer(c *C, setup func(cr *ContainerRunner)) { } func (s *TestSuite) TestFullRunSetEnv(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo $FROBIZ"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": "/bin", @@ -1029,6 +1028,8 @@ func (s *TestSuite) TestSetupMounts(c *C) { c.Assert(err, IsNil) stubCertPath := stubCert(certTemp) + cr.parentTemp = realTemp + defer os.RemoveAll(realTemp) defer os.RemoveAll(certTemp) @@ -1045,11 +1046,12 @@ func (s *TestSuite) TestSetupMounts(c *C) { } checkEmpty := func() { - filepath.Walk(realTemp, func(path string, _ os.FileInfo, err error) error { - c.Check(path, Equals, realTemp) - c.Check(err, IsNil) - return nil - }) + // Should be deleted. + _, err := os.Stat(realTemp) + c.Assert(os.IsNotExist(err), Equals, true) + + // Now recreate it for the next test. 
+ c.Assert(os.Mkdir(realTemp, 0777), IsNil) } { @@ -1064,7 +1066,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"}) + c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp"}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1083,7 +1085,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"}) + c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/out", realTemp + "/tmp3:/tmp"}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1104,7 +1106,7 @@ func (s *TestSuite) TestSetupMounts(c *C) { c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"}) + c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1200,8 +1202,8 @@ func (s *TestSuite) TestSetupMounts(c *C) { err := cr.SetupMounts() c.Check(err, IsNil) sort.StringSlice(cr.Binds).Sort() - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2/mountdata.json:/mnt/test.json:ro"}) - content, err := ioutil.ReadFile(realTemp + "/2/mountdata.json") + c.Check(cr.Binds, DeepEquals, []string{realTemp + "/json2/mountdata.json:/mnt/test.json:ro"}) + content, err := ioutil.ReadFile(realTemp + "/json2/mountdata.json") c.Check(err, IsNil) c.Check(content, DeepEquals, []byte(test.out)) os.RemoveAll(cr.ArvMountPoint) @@ -1227,26 +1229,42 @@ func (s *TestSuite) TestSetupMounts(c *C) { c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--crunchstat-interval=5", "--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"}) - c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"}) + c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"}) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() } - // Writable mount points are not allowed underneath output_dir mount point + // Writable mount points copied to output_dir mount point { i = 0 cr.ArvMountPoint = "" cr.Container.Mounts = make(map[string]arvados.Mount) cr.Container.Mounts = map[string]arvados.Mount{ - "/tmp": {Kind: "tmp"}, - "/tmp/foo": {Kind: "collection", Writable: true}, + "/tmp": {Kind: "tmp"}, + "/tmp/foo": {Kind: "collection", + PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53", + Writable: true}, + "/tmp/bar": {Kind: "collection", + PortableDataHash: "59389a8f9ee9d399be35462a0f92541d+53", + Path: "baz", + Writable: true}, } cr.OutputPath = "/tmp" + os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm) + os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm) + + rf, _ := os.Create(realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz/quux") + rf.Write([]byte("bar")) + rf.Close() + err := cr.SetupMounts() - 
c.Check(err, NotNil) - c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`) + c.Check(err, IsNil) + _, err = os.Stat(cr.HostOutputDir + "/foo") + c.Check(err, IsNil) + _, err = os.Stat(cr.HostOutputDir + "/bar/quux") + c.Check(err, IsNil) os.RemoveAll(cr.ArvMountPoint) cr.CleanupDirs() checkEmpty() @@ -1359,7 +1377,7 @@ func (s *TestSuite) TestStdout(c *C) { "runtime_constraints": {} }` - api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { + api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) t.logWriter.Close() }) @@ -1370,17 +1388,16 @@ func (s *TestSuite) TestStdout(c *C) { } // Used by the TestStdoutWithWrongPath*() -func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) { +func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) { rec := arvados.Container{} err = json.Unmarshal([]byte(record), &rec) c.Check(err, IsNil) - docker := NewTestDockerClient(0) - docker.fn = fn - docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) + s.docker.fn = fn + s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{}) api = &ArvTestClient{Container: rec} - cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") + cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz") am := &ArvMountCmdLine{} cr.RunArvMount = am.ArvMountTest @@ -1389,7 +1406,7 @@ func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (ap } func (s *TestSuite) TestStdoutWithWrongPath(c *C) { - _, _, err := StdoutErrorRunHelper(c, `{ + _, _, err := s.stdoutErrorRunHelper(c, `{ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} }, "output_path": "/tmp" }`, func(t *TestDockerClient) {}) @@ -1399,7 +1416,7 @@ func (s *TestSuite) TestStdoutWithWrongPath(c *C) { } func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) { - _, _, err := StdoutErrorRunHelper(c, `{ + _, _, err := s.stdoutErrorRunHelper(c, `{ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} }, "output_path": "/tmp" }`, func(t *TestDockerClient) {}) @@ -1409,7 +1426,7 @@ func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) { } func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) { - _, _, err := StdoutErrorRunHelper(c, `{ + _, _, err := s.stdoutErrorRunHelper(c, `{ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} }, "output_path": "/tmp" }`, func(t *TestDockerClient) {}) @@ -1421,7 +1438,7 @@ func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) { func (s *TestSuite) TestFullRunWithAPI(c *C) { os.Setenv("ARVADOS_API_HOST", "test.arvados.org") defer os.Unsetenv("ARVADOS_API_HOST") - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": "/bin", @@ -1444,7 +1461,7 @@ func (s *TestSuite) TestFullRunWithAPI(c *C) { func (s *TestSuite) TestFullRunSetOutput(c *C) { os.Setenv("ARVADOS_API_HOST", "test.arvados.org") defer os.Unsetenv("ARVADOS_API_HOST") - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"], "container_image": 
"d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": "/bin", @@ -1484,7 +1501,7 @@ func (s *TestSuite) TestStdoutWithExcludeFromOutputMountPointUnderOutputDir(c *C extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"} - api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) t.logWriter.Close() }) @@ -1519,12 +1536,12 @@ func (s *TestSuite) TestStdoutWithMultipleMountPointsUnderOutputDir(c *C) { "a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt", } - api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) t.logWriter.Close() }) - c.Check(runner.Binds, DeepEquals, []string{realtemp + "/2:/tmp", + c.Check(runner.Binds, DeepEquals, []string{realtemp + "/tmp2:/tmp", realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt:/tmp/foo/bar:ro", realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt:/tmp/foo/baz/sub2file2:ro", realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1:/tmp/foo/sub1:ro", @@ -1571,7 +1588,7 @@ func (s *TestSuite) TestStdoutWithMountPointsUnderOutputDirDenormalizedManifest( "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", } - api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) t.logWriter.Close() }) @@ -1612,12 +1629,12 @@ func (s *TestSuite) TestOutputSymlinkToInput(c *C) { "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt", } - api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - os.Symlink("/keep/foo/sub1file2", t.realTemp+"/2/baz") - os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz2") - os.Symlink("/keep/foo2/subdir1", t.realTemp+"/2/baz3") - os.Mkdir(t.realTemp+"/2/baz4", 0700) - os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz4/baz5") + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + os.Symlink("/keep/foo/sub1file2", t.realTemp+"/tmp2/baz") + os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz2") + os.Symlink("/keep/foo2/subdir1", t.realTemp+"/tmp2/baz3") + os.Mkdir(t.realTemp+"/tmp2/baz4", 0700) + os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz4/baz5") t.logWriter.Close() }) @@ -1654,8 +1671,8 @@ func (s *TestSuite) TestOutputError(c *C) { extraMounts := []string{} - api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - os.Symlink("/etc/hosts", t.realTemp+"/2/baz") + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + os.Symlink("/etc/hosts", t.realTemp+"/tmp2/baz") t.logWriter.Close() }) @@ -1678,22 +1695,22 @@ func (s *TestSuite) TestOutputSymlinkToOutput(c *C) { extraMounts := []string{} - api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { - rf, _ := os.Create(t.realTemp + "/2/realfile") + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + rf, _ := 
os.Create(t.realTemp + "/tmp2/realfile") rf.Write([]byte("foo")) rf.Close() - os.Mkdir(t.realTemp+"/2/realdir", 0700) - rf, _ = os.Create(t.realTemp + "/2/realdir/subfile") + os.Mkdir(t.realTemp+"/tmp2/realdir", 0700) + rf, _ = os.Create(t.realTemp + "/tmp2/realdir/subfile") rf.Write([]byte("bar")) rf.Close() - os.Symlink("/tmp/realfile", t.realTemp+"/2/file1") - os.Symlink("realfile", t.realTemp+"/2/file2") - os.Symlink("/tmp/file1", t.realTemp+"/2/file3") - os.Symlink("file2", t.realTemp+"/2/file4") - os.Symlink("realdir", t.realTemp+"/2/dir1") - os.Symlink("/tmp/realdir", t.realTemp+"/2/dir2") + os.Symlink("/tmp/realfile", t.realTemp+"/tmp2/file1") + os.Symlink("realfile", t.realTemp+"/tmp2/file2") + os.Symlink("/tmp/file1", t.realTemp+"/tmp2/file3") + os.Symlink("file2", t.realTemp+"/tmp2/file4") + os.Symlink("realdir", t.realTemp+"/tmp2/dir1") + os.Symlink("/tmp/realdir", t.realTemp+"/tmp2/dir2") t.logWriter.Close() }) @@ -1735,7 +1752,7 @@ func (s *TestSuite) TestStdinCollectionMountPoint(c *C) { "b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt", } - api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { + api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) t.logWriter.Close() }) @@ -1770,7 +1787,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) { "runtime_constraints": {} }` - api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { + api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) { t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n")) t.logWriter.Close() }) @@ -1790,7 +1807,7 @@ func (s *TestSuite) TestStdinJsonMountPoint(c *C) { } func (s *TestSuite) TestStderrMount(c *C) { - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["/bin/sh", "-c", "echo hello;exit 1"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -1887,7 +1904,7 @@ exec echo killme ech := tf.Name() brokenNodeHook = &ech - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -1912,7 +1929,7 @@ func (s *TestSuite) TestFullBrokenDocker2(c *C) { ech := "" brokenNodeHook = &ech - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -1935,7 +1952,7 @@ func (s *TestSuite) TestFullBrokenDocker3(c *C) { ech := "" brokenNodeHook = &ech - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -1957,7 +1974,7 @@ func (s *TestSuite) TestBadCommand1(c *C) { ech := "" brokenNodeHook = &ech - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -1979,7 +1996,7 @@ func (s *TestSuite) TestBadCommand2(c *C) { ech := "" brokenNodeHook = &ech - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": ["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", @@ -2001,7 +2018,7 @@ func (s *TestSuite) TestBadCommand3(c *C) { ech := "" brokenNodeHook = &ech - api, _, _ := FullRunHelper(c, `{ + api, _, _ := s.fullRunHelper(c, `{ "command": 
["echo", "hello world"], "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122", "cwd": ".", diff --git a/services/fuse/arvados_fuse/fusedir.py b/services/fuse/arvados_fuse/fusedir.py index becd66975f..34fd594be2 100644 --- a/services/fuse/arvados_fuse/fusedir.py +++ b/services/fuse/arvados_fuse/fusedir.py @@ -1028,64 +1028,71 @@ class SharedDirectory(Directory): self.current_user = api.users().current().execute(num_retries=num_retries) self._poll = True self._poll_time = poll_time + self._updating_lock = threading.Lock() @use_counter def update(self): - with llfuse.lock_released: - all_projects = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['group_class','=','project']]) - objects = {} - for ob in all_projects: - objects[ob['uuid']] = ob - - roots = [] - root_owners = {} - for ob in all_projects: - if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects: - roots.append(ob) - root_owners[ob['owner_uuid']] = True - - lusers = arvados.util.list_all( - self.api.users().list, self.num_retries, - filters=[['uuid','in', list(root_owners)]]) - lgroups = arvados.util.list_all( - self.api.groups().list, self.num_retries, - filters=[['uuid','in', list(root_owners)]]) - - users = {} - groups = {} - - for l in lusers: - objects[l["uuid"]] = l - for l in lgroups: - objects[l["uuid"]] = l - - contents = {} - for r in root_owners: - if r in objects: - obr = objects[r] - if obr.get("name"): - contents[obr["name"]] = obr - #elif obr.get("username"): - # contents[obr["username"]] = obr - elif "first_name" in obr: - contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr - - - for r in roots: - if r['owner_uuid'] not in objects: - contents[r['name']] = r - - # end with llfuse.lock_released, re-acquire lock - try: + with llfuse.lock_released: + self._updating_lock.acquire() + if not self.stale(): + return + + all_projects = arvados.util.list_all( + self.api.groups().list, self.num_retries, + filters=[['group_class','=','project']], + select=["uuid", "owner_uuid"]) + objects = {} + for ob in all_projects: + objects[ob['uuid']] = ob + + roots = [] + root_owners = set() + current_uuid = self.current_user['uuid'] + for ob in all_projects: + if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects: + roots.append(ob['uuid']) + root_owners.add(ob['owner_uuid']) + + lusers = arvados.util.list_all( + self.api.users().list, self.num_retries, + filters=[['uuid','in', list(root_owners)]]) + lgroups = arvados.util.list_all( + self.api.groups().list, self.num_retries, + filters=[['uuid','in', list(root_owners)+roots]]) + + for l in lusers: + objects[l["uuid"]] = l + for l in lgroups: + objects[l["uuid"]] = l + + contents = {} + for r in root_owners: + if r in objects: + obr = objects[r] + if obr.get("name"): + contents[obr["name"]] = obr + #elif obr.get("username"): + # contents[obr["username"]] = obr + elif "first_name" in obr: + contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr + + for r in roots: + if r in objects: + obr = objects[r] + if obr['owner_uuid'] not in objects: + contents[obr["name"]] = obr + + # end with llfuse.lock_released, re-acquire lock + self.merge(contents.items(), lambda i: i[0], lambda a, i: a.uuid() == i[1]['uuid'], lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time)) except Exception: - _logger.exception() + _logger.exception("arv-mount shared dir error") + finally: + self._updating_lock.release() def 
want_event_subscribe(self): return True diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go index c64ac7a8fe..62c856da34 100644 --- a/services/keepstore/azure_blob_volume.go +++ b/services/keepstore/azure_blob_volume.go @@ -402,7 +402,17 @@ func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) err }() errChan := make(chan error) go func() { - errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil) + var body io.Reader = bufr + if len(block) == 0 { + // We must send a "Content-Length: 0" header, + // but the http client interprets + // ContentLength==0 as "unknown" unless it can + // confirm by introspection that Body will + // read 0 bytes. + body = http.NoBody + bufr.Close() + } + errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil) }() select { case <-ctx.Done(): @@ -722,7 +732,9 @@ func (c *azureBlobClient) GetBlobRange(cname, bname, byterange string, hdrs map[ func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error { c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps) - rdr = NewCountingReader(rdr, c.stats.TickOutBytes) + if size != 0 { + rdr = NewCountingReader(rdr, c.stats.TickOutBytes) + } err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs) c.stats.TickErr(err) return err diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go index 4256ec0d0c..06216edcb8 100644 --- a/services/keepstore/azure_blob_volume_test.go +++ b/services/keepstore/azure_blob_volume_test.go @@ -124,6 +124,11 @@ func (h *azStubHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { return } + if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" { + rw.WriteHeader(http.StatusLengthRequired) + return + } + body, err := ioutil.ReadAll(r.Body) if err != nil { return diff --git a/tools/arvbox/lib/arvbox/docker/Dockerfile.base b/tools/arvbox/lib/arvbox/docker/Dockerfile.base index 83d507b62b..18d430dd7c 100644 --- a/tools/arvbox/lib/arvbox/docker/Dockerfile.base +++ b/tools/arvbox/lib/arvbox/docker/Dockerfile.base @@ -90,6 +90,7 @@ ADD crunch-setup.sh gitolite.rc \ keep-setup.sh common.sh createusers.sh \ logger runsu.sh waitforpostgres.sh \ application_yml_override.py api-setup.sh \ + go-setup.sh \ /usr/local/lib/arvbox/ ADD runit /etc/runit diff --git a/tools/arvbox/lib/arvbox/docker/Dockerfile.demo b/tools/arvbox/lib/arvbox/docker/Dockerfile.demo index 80344c16f2..7cb51edfdc 100644 --- a/tools/arvbox/lib/arvbox/docker/Dockerfile.demo +++ b/tools/arvbox/lib/arvbox/docker/Dockerfile.demo @@ -24,13 +24,16 @@ RUN echo "production" > /var/lib/arvados/workbench_rails_env RUN chown -R 1000:1000 /usr/src && /usr/local/lib/arvbox/createusers.sh +RUN sudo -u arvbox /var/lib/arvbox/service/composer/run-service --only-deps +RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/sso/run-service --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/api/run-service --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/workbench/run-service --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/doc/run-service --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/vm/run-service --only-deps -RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/keepproxy/run-service 
--only-deps RUN sudo -u arvbox /var/lib/arvbox/service/arv-git-httpd/run-service --only-deps +RUN sudo -u arvbox /var/lib/arvbox/service/crunch-dispatch-local/run-service --only-deps +RUN sudo -u arvbox /var/lib/arvbox/service/websockets/run-service --only-deps RUN sudo -u arvbox /usr/local/lib/arvbox/keep-setup.sh --only-deps RUN sudo -u arvbox /var/lib/arvbox/service/sdk/run-service diff --git a/tools/arvbox/lib/arvbox/docker/crunch-setup.sh b/tools/arvbox/lib/arvbox/docker/crunch-setup.sh index 30ecafb889..b3ec5cd104 100755 --- a/tools/arvbox/lib/arvbox/docker/crunch-setup.sh +++ b/tools/arvbox/lib/arvbox/docker/crunch-setup.sh @@ -7,16 +7,11 @@ exec 2>&1 set -eux -o pipefail . /usr/local/lib/arvbox/common.sh +. /usr/local/lib/arvbox/go-setup.sh -mkdir -p /var/lib/gopath -cd /var/lib/gopath - -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunchstat" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/sdk/go/crunchrunner" -install bin/crunchstat bin/crunchrunner /usr/local/bin +install $GOPATH/bin/crunchstat $GOPATH/bin/crunchrunner /usr/local/bin if test -s /var/lib/arvados/api_rails_env ; then RAILS_ENV=$(cat /var/lib/arvados/api_rails_env) diff --git a/tools/arvbox/lib/arvbox/docker/go-setup.sh b/tools/arvbox/lib/arvbox/docker/go-setup.sh new file mode 100644 index 0000000000..f068ce6842 --- /dev/null +++ b/tools/arvbox/lib/arvbox/docker/go-setup.sh @@ -0,0 +1,16 @@ +#!/bin/bash +# Copyright (C) The Arvados Authors. All rights reserved. +# +# SPDX-License-Identifier: AGPL-3.0 + +mkdir -p /var/lib/gopath +cd /var/lib/gopath + +export GOPATH=$PWD +mkdir -p "$GOPATH/src/git.curoverse.com" +ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" + +flock /var/lib/gopath/gopath.lock go get -t github.com/kardianos/govendor +cd "$GOPATH/src/git.curoverse.com/arvados.git" +flock /var/lib/gopath/gopath.lock go get -v -d ... +flock /var/lib/gopath/gopath.lock "$GOPATH/bin/govendor" sync diff --git a/tools/arvbox/lib/arvbox/docker/keep-setup.sh b/tools/arvbox/lib/arvbox/docker/keep-setup.sh index 5da2cfac44..8ef66a6068 100755 --- a/tools/arvbox/lib/arvbox/docker/keep-setup.sh +++ b/tools/arvbox/lib/arvbox/docker/keep-setup.sh @@ -8,15 +8,10 @@ sleep 2 set -eux -o pipefail . /usr/local/lib/arvbox/common.sh +. /usr/local/lib/arvbox/go-setup.sh -mkdir -p /var/lib/gopath -cd /var/lib/gopath - -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepstore" -install bin/keepstore /usr/local/bin +install $GOPATH/bin/keepstore /usr/local/bin if test "$1" = "--only-deps" ; then exit diff --git a/tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service b/tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service index 806f9cd37a..1383f7140f 100755 --- a/tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service +++ b/tools/arvbox/lib/arvbox/docker/service/arv-git-httpd/run-service @@ -7,15 +7,10 @@ exec 2>&1 set -ex -o pipefail . /usr/local/lib/arvbox/common.sh +. 
/usr/local/lib/arvbox/go-setup.sh -mkdir -p /var/lib/gopath -cd /var/lib/gopath - -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/arv-git-httpd" -install bin/arv-git-httpd /usr/local/bin +install $GOPATH/bin/arv-git-httpd /usr/local/bin if test "$1" = "--only-deps" ; then exit diff --git a/tools/arvbox/lib/arvbox/docker/service/composer/run-service b/tools/arvbox/lib/arvbox/docker/service/composer/run-service index ac4441de09..abd350f073 100755 --- a/tools/arvbox/lib/arvbox/docker/service/composer/run-service +++ b/tools/arvbox/lib/arvbox/docker/service/composer/run-service @@ -10,13 +10,13 @@ set -ex -o pipefail cd /usr/src/composer -npm -d install yarn - -PATH=$PATH:/usr/src/composer/node_modules/.bin +npm -d install --prefix /usr/local --global yarn yarn install -if test "$1" != "--only-deps" ; then - echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml - exec ng serve --host 0.0.0.0 --port 4200 --env=webdev +if test "$1" = "--only-deps" ; then + exit fi + +echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml +exec node_modules/.bin/ng serve --host 0.0.0.0 --port 4200 --env=webdev diff --git a/tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service b/tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service index e7a3026821..decbccddee 100755 --- a/tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service +++ b/tools/arvbox/lib/arvbox/docker/service/crunch-dispatch-local/run-service @@ -4,19 +4,18 @@ # SPDX-License-Identifier: AGPL-3.0 exec 2>&1 -set -eux -o pipefail +set -ex -o pipefail . /usr/local/lib/arvbox/common.sh +. /usr/local/lib/arvbox/go-setup.sh -mkdir -p /var/lib/gopath -cd /var/lib/gopath - -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-run" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-dispatch-local" -install bin/crunch-run bin/crunch-dispatch-local /usr/local/bin +install $GOPATH/bin/crunch-run $GOPATH/bin/crunch-dispatch-local /usr/local/bin + +if test "$1" = "--only-deps" ; then + exit +fi cat > /usr/local/bin/crunch-run.sh <&1 set -ex -o pipefail . /usr/local/lib/arvbox/common.sh +. /usr/local/lib/arvbox/go-setup.sh -mkdir -p /var/lib/gopath -cd /var/lib/gopath - -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keep-web" -install bin/keep-web /usr/local/bin +install $GOPATH/bin/keep-web /usr/local/bin if test "$1" = "--only-deps" ; then exit diff --git a/tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service b/tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service index dae2dfdd78..199247b7a0 100755 --- a/tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service +++ b/tools/arvbox/lib/arvbox/docker/service/keepproxy/run-service @@ -8,15 +8,10 @@ sleep 2 set -ex -o pipefail . /usr/local/lib/arvbox/common.sh +. 
/usr/local/lib/arvbox/go-setup.sh -mkdir -p /var/lib/gopath -cd /var/lib/gopath - -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepproxy" -install bin/keepproxy /usr/local/bin +install $GOPATH/bin/keepproxy /usr/local/bin if test "$1" = "--only-deps" ; then exit diff --git a/tools/arvbox/lib/arvbox/docker/service/postgres/run b/tools/arvbox/lib/arvbox/docker/service/postgres/run index 45407cb9ab..3ef78ee455 100755 --- a/tools/arvbox/lib/arvbox/docker/service/postgres/run +++ b/tools/arvbox/lib/arvbox/docker/service/postgres/run @@ -5,6 +5,8 @@ flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh +make-ssl-cert generate-default-snakeoil --force-overwrite + . /usr/local/lib/arvbox/common.sh chown -R $PGUSER:$PGGROUP /var/lib/postgresql diff --git a/tools/arvbox/lib/arvbox/docker/service/websockets/run-service b/tools/arvbox/lib/arvbox/docker/service/websockets/run-service index cb56ac7f4d..2d01d90798 100755 --- a/tools/arvbox/lib/arvbox/docker/service/websockets/run-service +++ b/tools/arvbox/lib/arvbox/docker/service/websockets/run-service @@ -14,14 +14,10 @@ else RAILS_ENV=development fi -mkdir -p /var/lib/gopath -cd /var/lib/gopath +. /usr/local/lib/arvbox/go-setup.sh -export GOPATH=$PWD -mkdir -p "$GOPATH/src/git.curoverse.com" -ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git" flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/ws" -install bin/ws /usr/local/bin/arvados-ws +install $GOPATH/bin/ws /usr/local/bin/arvados-ws if test "$1" = "--only-deps" ; then exit diff --git a/tools/sync-groups/sync-groups.go b/tools/sync-groups/sync-groups.go index ebc40b13cb..10569b2e13 100644 --- a/tools/sync-groups/sync-groups.go +++ b/tools/sync-groups/sync-groups.go @@ -307,7 +307,7 @@ func doMain(cfg *ConfigParams) error { } userIDToUUID[uID] = u.UUID if cfg.Verbose { - log.Printf("Seen user %q (%s)", u.Username, u.Email) + log.Printf("Seen user %q (%s)", u.Username, u.UUID) } } @@ -317,6 +317,11 @@ func doMain(cfg *ConfigParams) error { return err } log.Printf("Found %d remote groups", len(remoteGroups)) + if cfg.Verbose { + for groupUUID := range remoteGroups { + log.Printf("- Group %q: %d users", remoteGroups[groupUUID].Group.Name, len(remoteGroups[groupUUID].PreviousMembers)) + } + } membershipsRemoved := 0 @@ -504,9 +509,9 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot Operator: "=", Operand: group.UUID, }, { - Attr: "head_kind", - Operator: "=", - Operand: "arvados#user", + Attr: "head_uuid", + Operator: "like", + Operand: "%-tpzed-%", }}, } // User -> Group filter @@ -528,9 +533,9 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot Operator: "=", Operand: group.UUID, }, { - Attr: "tail_kind", - Operator: "=", - Operand: "arvados#user", + Attr: "tail_uuid", + Operator: "like", + Operand: "%-tpzed-%", }}, } g2uLinks, err := GetAll(cfg.Client, "links", g2uFilter, &LinkList{}) @@ -579,7 +584,7 @@ func GetRemoteGroups(cfg *ConfigParams, allUsers map[string]arvados.User) (remot // RemoveMemberFromGroup remove all links related to the membership func RemoveMemberFromGroup(cfg *ConfigParams, user arvados.User, group arvados.Group) error { if cfg.Verbose { - log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Email, user.UUID, group.Name, group.UUID) + 
log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Username, user.UUID, group.Name, group.UUID) } var links []interface{} // Search for all group<->user links (both ways) diff --git a/tools/sync-groups/sync-groups_test.go b/tools/sync-groups/sync-groups_test.go index e776648a80..4a3e470c42 100644 --- a/tools/sync-groups/sync-groups_test.go +++ b/tools/sync-groups/sync-groups_test.go @@ -83,7 +83,6 @@ func (s *TestSuite) SetUpTest(c *C) { c.Assert(len(s.users), Not(Equals), 0) } -// Clean any membership link and remote group created by the test func (s *TestSuite) TearDownTest(c *C) { var dst interface{} // Reset database to fixture state after every test run. @@ -93,7 +92,7 @@ func (s *TestSuite) TearDownTest(c *C) { var _ = Suite(&TestSuite{}) -// MakeTempCVSFile creates a temp file with data as comma separated values +// MakeTempCSVFile creates a temp file with data as comma separated values func MakeTempCSVFile(data [][]string) (f *os.File, err error) { f, err = ioutil.TempFile("", "test_sync_remote_groups") if err != nil { @@ -266,11 +265,15 @@ func (s *TestSuite) TestIgnoreSpaces(c *C) { // The absence of a user membership on the CSV file implies its removal func (s *TestSuite) TestMembershipRemoval(c *C) { - activeUserEmail := s.users[arvadostest.ActiveUserUUID].Email - activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID + localUserEmail := s.users[arvadostest.ActiveUserUUID].Email + localUserUUID := s.users[arvadostest.ActiveUserUUID].UUID + remoteUserEmail := s.users[arvadostest.FederatedActiveUserUUID].Email + remoteUserUUID := s.users[arvadostest.FederatedActiveUserUUID].UUID data := [][]string{ - {"TestGroup1", activeUserEmail}, - {"TestGroup2", activeUserEmail}, + {"TestGroup1", localUserEmail}, + {"TestGroup1", remoteUserEmail}, + {"TestGroup2", localUserEmail}, + {"TestGroup2", remoteUserEmail}, } tmpfile, err := MakeTempCSVFile(data) c.Assert(err, IsNil) @@ -283,11 +286,13 @@ func (s *TestSuite) TestMembershipRemoval(c *C) { groupUUID, err := RemoteGroupExists(s.cfg, groupName) c.Assert(err, IsNil) c.Assert(groupUUID, Not(Equals), "") - c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true) + c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true) + c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true) } - // New CSV with one previous membership missing + // New CSV with some previous membership missing data = [][]string{ - {"TestGroup1", activeUserEmail}, + {"TestGroup1", localUserEmail}, + {"TestGroup2", remoteUserEmail}, } tmpfile2, err := MakeTempCSVFile(data) c.Assert(err, IsNil) @@ -295,16 +300,18 @@ func (s *TestSuite) TestMembershipRemoval(c *C) { s.cfg.Path = tmpfile2.Name() err = doMain(s.cfg) c.Assert(err, IsNil) - // Confirm TestGroup1 membership still exist + // Confirm TestGroup1 memberships groupUUID, err := RemoteGroupExists(s.cfg, "TestGroup1") c.Assert(err, IsNil) c.Assert(groupUUID, Not(Equals), "") - c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true) - // Confirm TestGroup2 membership was removed + c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true) + c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, false) + // Confirm TestGroup1 memberships groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup2") c.Assert(err, IsNil) c.Assert(groupUUID, Not(Equals), "") - c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, 
groupUUID), Equals, false) + c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, false) + c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true) } // If a group doesn't exist on the system, create it before adding users
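The rewritten TestSetupMounts case ("Writable mount points copied to output_dir mount point") pins down crunch-run's new behavior: writable collection mounts under the output path are no longer rejected; SetupMounts materializes their contents inside the host output directory, which is why the test can stat HostOutputDir + "/foo" and HostOutputDir + "/bar/quux" afterwards. A rough sketch of just the selection rule the test implies, with an invented helper name (the real copying happens inside SetupMounts itself):

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

type mount struct {
	Kind     string
	Writable bool
}

// writableCopies lists the host-side destinations the test expects to exist:
// writable collection mounts whose bind target lies under outputPath get a
// copy rooted in the host output directory before the container starts.
func writableCopies(mounts map[string]mount, outputPath, hostOutputDir string) []string {
	var dsts []string
	for bind, m := range mounts {
		if m.Kind == "collection" && m.Writable && strings.HasPrefix(bind, outputPath+"/") {
			rel, _ := filepath.Rel(outputPath, bind) // e.g. "foo" or "bar"
			dsts = append(dsts, filepath.Join(hostOutputDir, rel))
		}
	}
	return dsts
}

func main() {
	m := map[string]mount{
		"/tmp":     {Kind: "tmp"},
		"/tmp/foo": {Kind: "collection", Writable: true},
		"/tmp/bar": {Kind: "collection", Writable: true},
	}
	// Two entries under the host dir; map iteration order varies.
	fmt.Println(writableCopies(m, "/tmp", "/real/tmp2"))
}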
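On the FUSE side, SharedDirectory.update() gains a dedicated _updating_lock (released in a finally block) and returns early when the directory is no longer stale, so a burst of concurrent lookups performs the expensive groups/users listing once instead of once per caller. The error-path fix is also substantive: the old bare _logger.exception() call was itself a bug, since logging's exception() requires a message argument. The same serialize-then-recheck pattern, sketched in Go with invented names:

package main

import (
	"fmt"
	"sync"
	"time"
)

// cachedDir sketches the pattern: callers serialize on an update lock and
// re-check staleness after acquiring it, so a refresh another caller just
// finished is not repeated.
type cachedDir struct {
	mu        sync.Mutex
	updatedAt time.Time
	ttl       time.Duration
	contents  map[string]string
}

func (d *cachedDir) stale() bool { return time.Since(d.updatedAt) > d.ttl }

func (d *cachedDir) update(fetch func() map[string]string) {
	d.mu.Lock()
	defer d.mu.Unlock() // plays the role of the try/finally release
	if !d.stale() {
		return // someone else refreshed while we waited for the lock
	}
	d.contents = fetch() // the expensive listing runs at most once per TTL
	d.updatedAt = time.Now()
}

func main() {
	d := &cachedDir{ttl: time.Minute}
	var wg sync.WaitGroup
	calls := 0
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.update(func() map[string]string { calls++; return nil })
		}()
	}
	wg.Wait()
	fmt.Println("fetches:", calls) // 1
}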
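The AzureBlobVolume.Put change addresses a net/http corner case: on an outgoing request, ContentLength == 0 with a non-nil body means "length unknown", so a zero-length block would be sent without any Content-Length header, and the azStubHandler above now answers such PUT/POST requests with http.StatusLengthRequired to keep that regression covered. Passing http.NoBody lets the client prove the body is empty and emit an explicit "Content-Length: 0". A self-contained demonstration; example.invalid is a placeholder host and the request is only dumped, never sent:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httputil"
)

// bodyFor mirrors the Put fix above: empty blocks become http.NoBody so the
// transport treats the length as a known zero rather than unknown.
func bodyFor(block []byte) io.Reader {
	if len(block) == 0 {
		return http.NoBody
	}
	return bytes.NewReader(block)
}

func main() {
	req, _ := http.NewRequest("PUT", "http://example.invalid/container/blob", bodyFor(nil))
	dump, _ := httputil.DumpRequestOut(req, false)
	fmt.Printf("%s", dump) // wire format includes "Content-Length: 0"
}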
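Finally, GetRemoteGroups stops classifying link endpoints with head_kind/tail_kind = "arvados#user" and instead matches the UUID text with like "%-tpzed-%": every Arvados user UUID carries the tpzed type infix no matter which cluster minted it, which is what lets the updated TestMembershipRemoval mix local and federated (FederatedActiveUserUUID) users. A compile-alone sketch of one direction of that query; the Filter struct is redeclared here for self-containment, and the choice of tail_uuid as the group side of this particular link is an assumption inferred from the shape of the hunk:

package main

import "fmt"

// Filter mirrors the {Attr, Operator, Operand} triples built in
// GetRemoteGroups above.
type Filter struct {
	Attr     string
	Operator string
	Operand  interface{}
}

// groupToUserLinkFilters selects links whose user side is a user on any
// cluster: user UUIDs always look like zzzzz-tpzed-..., so a substring match
// on "-tpzed-" replaces the old head_kind equality, which missed federated
// users. (Attribute names on the group side are assumed, not shown above.)
func groupToUserLinkFilters(groupUUID string) []Filter {
	return []Filter{
		{Attr: "tail_uuid", Operator: "=", Operand: groupUUID},
		{Attr: "head_uuid", Operator: "like", Operand: "%-tpzed-%"},
	}
}

func main() {
	for _, f := range groupToUserLinkFilters("zzzzz-j7d0g-012345678901234") {
		fmt.Printf("%s %s %v\n", f.Attr, f.Operator, f.Operand)
	}
}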