1. Only mount points of kind @collection@ are supported.
-2. Mount points underneath output_path must not use @"writable":true@. If any of them are set as @writable@, the API will refuse to create/update the container request, and crunch-run will fail the container.
+2. Mount points underneath output_path that have @"writable":true@ are copied into output_path during container initialization and may be updated, renamed, or deleted by the running container. The original collection is not modified. On container completion, files remaining in the output are saved to the output collection. The mount at output_path must be big enough to accommodate copies of the inner writable mounts (see the example below).
3. If any such mount points are configured as @"exclude_from_output":true@, they will be excluded from the output.
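+
+For example, a container request that stages an existing collection as a writable directory inside its output directory might use mounts like the following sketch (the portable data hash and paths are illustrative placeholders):
+
+<pre>
+"mounts": {
+  "/out": {
+    "kind": "tmp"
+  },
+  "/out/subdir": {
+    "kind": "collection",
+    "portable_data_hash": "99999999999999999999999999999999+99",
+    "writable": true
+  }
+},
+"output_path": "/out"
+</pre>
+
+At container startup the contents of the collection are copied into @/out/subdir@; whatever remains there when the container finishes is saved as part of the output collection.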
def check_features(self, obj):
if isinstance(obj, dict):
- if obj.get("writable"):
- raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported")
+ if obj.get("writable") and self.work_api != "containers":
+ raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
if obj.get("class") == "DockerRequirement":
if obj.get("dockerOutputDirectory"):
- # TODO: can be supported by containers API, but not jobs API.
- raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
- "Option 'dockerOutputDirectory' of DockerRequirement not supported.")
+ if self.work_api != "containers":
+ raise SourceLine(obj, "dockerOutputDirectory", UnsupportedRequirement).makeError(
+ "Option 'dockerOutputDirectory' of DockerRequirement not supported with --api=jobs.")
+ if not obj.get("dockerOutputDirectory").startswith('/'):
+ raise SourceLine(obj, "dockerOutputDirectory", validate.ValidationException).makeError(
+ "Option 'dockerOutputDirectory' must be an absolute path.")
for v in obj.itervalues():
self.check_features(v)
elif isinstance(obj, list):
def rewrite(fileobj):
fileobj["location"] = generatemapper.mapper(fileobj["location"]).target
- for k in ("basename", "listing", "contents", "nameext", "nameroot", "dirname"):
+ for k in ("listing", "contents", "nameext", "nameroot", "dirname"):
if k in fileobj:
del fileobj[k]
generatemapper = NoFollowPathMapper([self.generatefiles], "", "",
separateDirs=False)
+ logger.debug("generatemapper is %s", generatemapper._pathmap)
+
with Perf(metrics, "createfiles %s" % self.name):
for f, p in generatemapper.items():
if not p.target:
pass
- elif p.type in ("File", "Directory"):
- source, path = self.arvrunner.fs_access.get_collection(p.resolved)
- vwd.copy(path, p.target, source_collection=source)
+ elif p.type in ("File", "Directory", "WritableFile", "WritableDirectory"):
+ if p.resolved.startswith("_:"):
+ vwd.mkdirs(p.target)
+ else:
+ source, path = self.arvrunner.fs_access.get_collection(p.resolved)
+ vwd.copy(path, p.target, source_collection=source)
elif p.type == "CreateFile":
with vwd.open(p.target, "w") as n:
n.write(p.resolved.encode("utf-8"))
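+ # Write a ".keep" placeholder file into any directory that would
+ # otherwise be empty, so empty directories are preserved when the
+ # staged collection is saved.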
+ def keepemptydirs(p):
+ if isinstance(p, arvados.collection.RichCollectionBase):
+ if len(p) == 0:
+ p.open(".keep", "w").close()
+ else:
+ for c in p:
+ keepemptydirs(p[c])
+
+ keepemptydirs(vwd)
+
with Perf(metrics, "generatefiles.save_new %s" % self.name):
vwd.save_new()
mounts[mountpoint] = {"kind": "collection",
"portable_data_hash": vwd.portable_data_hash(),
"path": p.target}
+ if p.type.startswith("Writable"):
+ mounts[mountpoint]["writable"] = True
container_request["environment"] = {"TMPDIR": self.tmpdir, "HOME": self.outdir}
if self.environment:
def job(self, joborder, output_callback, **kwargs):
if self.work_api == "containers":
- kwargs["outdir"] = "/var/spool/cwl"
- kwargs["docker_outdir"] = "/var/spool/cwl"
+ dockerReq, is_req = self.get_requirement("DockerRequirement")
+ if dockerReq and dockerReq.get("dockerOutputDirectory"):
+ kwargs["outdir"] = dockerReq.get("dockerOutputDirectory")
+ kwargs["docker_outdir"] = dockerReq.get("dockerOutputDirectory")
+ else:
+ kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["docker_outdir"] = "/var/spool/cwl"
elif self.work_api == "jobs":
kwargs["outdir"] = "$(task.outdir)"
kwargs["docker_outdir"] = "$(task.outdir)"
tgt = os.path.join(stagedir, obj["basename"])
basetgt, baseext = os.path.splitext(tgt)
n = 1
- while tgt in self.targets:
- n += 1
- tgt = "%s_%i%s" % (basetgt, n, baseext)
+ if tgt in self.targets and (self.reversemap(tgt)[0] != loc):
+ while tgt in self.targets:
+ n += 1
+ tgt = "%s_%i%s" % (basetgt, n, baseext)
self.targets.add(tgt)
if obj["class"] == "Directory":
- self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
+ if obj.get("writable"):
+ self._pathmap[loc] = MapperEnt(loc, tgt, "WritableDirectory", staged)
+ else:
+ self._pathmap[loc] = MapperEnt(loc, tgt, "Directory", staged)
if loc.startswith("_:") or self._follow_dirs:
self.visitlisting(obj.get("listing", []), tgt, basedir)
elif obj["class"] == "File":
if "contents" in obj and loc.startswith("_:"):
self._pathmap[loc] = MapperEnt(obj["contents"], tgt, "CreateFile", staged)
else:
- if copy:
+ if copy or obj.get("writable"):
self._pathmap[loc] = MapperEnt(loc, tgt, "WritableFile", staged)
else:
self._pathmap[loc] = MapperEnt(loc, tgt, "File", staged)
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = get_feature(tool, "DockerRequirement")
if docker_req:
- if docker_req.get("dockerOutputDirectory"):
+ if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
# dockerOutputDirectory is supported by the containers API, but not the jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180116213856',
+ 'cwltool==1.0.20180130110340',
'schema-salad==2.6.20171201034858',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
arv-put --portable-data-hash testdir/*
fi
+if ! arv-get f225e6259bdd63bc7240599648dde9f1+97 > /dev/null ; then
+ arv-put --portable-data-hash hg19/*
+fi
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum
final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz')])
self.assertEqual("""{
"bar": {
+ "basename": "baz.txt",
"class": "File",
"location": "baz.txt",
"size": 4
},
"foo": {
+ "basename": "foo.txt",
"class": "File",
"location": "foo.txt",
"size": 3
DataManagerToken = "320mkve8qkswstz7ff61glpk3mhgghmg67wmic7elw4z41pke1"
ManagementToken = "jg3ajndnq63sywcd50gbs5dskdc9ckkysb0nsqmfz08nwf17nl"
ActiveUserUUID = "zzzzz-tpzed-xurymjxw79nv3jz"
+ FederatedActiveUserUUID = "zbbbb-tpzed-xurymjxw79nv3jz"
SpectatorUserUUID = "zzzzz-tpzed-l1s2piq4t4mps8r"
UserAgreementCollection = "zzzzz-4zz18-uukreo9rbgwsujr" // user_agreement_in_anonymously_accessible_project
FooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
role: Computational biologist
getting_started_shown: 2015-03-26 12:34:56.789000000 Z
+federated_active:
+ owner_uuid: zzzzz-tpzed-000000000000000
+ uuid: zbbbb-tpzed-xurymjxw79nv3jz
+ email: zbbbb-active-user@arvados.local
+ first_name: Active
+ last_name: User
+ identity_url: https://active-user.openid.local
+ is_active: true
+ is_admin: false
+ username: federatedactive
+ prefs:
+ profile:
+ organization: example.com
+ role: Computational biologist
+ getting_started_shown: 2015-03-26 12:34:56.789000000 Z
+
project_viewer:
owner_uuid: zzzzz-tpzed-000000000000000
uuid: zzzzz-tpzed-projectviewer1a
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
- "bytes"
"context"
"flag"
"fmt"
"log"
"math"
"os"
- "os/exec"
"regexp"
"strings"
"time"
// Minimum time between two attempts to run the same container
MinRetryPeriod arvados.Duration
+
+ slurm Slurm
}
func main() {
+ theConfig.slurm = &slurmCLI{}
err := doMain()
if err != nil {
log.Fatal(err)
return (1000 - priority) * 10
}
-// sbatchCmd
-func sbatchFunc(container arvados.Container) *exec.Cmd {
+func sbatchArgs(container arvados.Container) []string {
mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM) / float64(1048576)))
var disk int64
sbatchArgs = append(sbatchArgs, fmt.Sprintf("--partition=%s", strings.Join(container.SchedulingParameters.Partitions, ",")))
}
- return exec.Command("sbatch", sbatchArgs...)
-}
-
-// scancelCmd
-func scancelFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scancel", "--name="+container.UUID)
-}
-
-// scontrolCmd
-func scontrolFunc(container arvados.Container) *exec.Cmd {
- return exec.Command("scontrol", "update", "JobName="+container.UUID, fmt.Sprintf("Nice=%d", niceness(container.Priority)))
+ return sbatchArgs
}
-// Wrap these so that they can be overridden by tests
-var sbatchCmd = sbatchFunc
-var scancelCmd = scancelFunc
-var scontrolCmd = scontrolFunc
-
-// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher, container arvados.Container, crunchRunCommand []string) error {
- cmd := sbatchCmd(container)
-
- // Send a tiny script on stdin to execute the crunch-run
- // command (slurm requires this to be a #! script)
-
// append() here avoids modifying crunchRunCommand's
// underlying array, which is shared with other goroutines.
- args := append([]string(nil), crunchRunCommand...)
- args = append(args, container.UUID)
- cmd.Stdin = strings.NewReader(execScript(args))
-
- var stdout, stderr bytes.Buffer
- cmd.Stdout = &stdout
- cmd.Stderr = &stderr
+ crArgs := append([]string(nil), crunchRunCommand...)
+ crArgs = append(crArgs, container.UUID)
+ crScript := strings.NewReader(execScript(crArgs))
- // Mutex between squeue sync and running sbatch or scancel.
sqCheck.L.Lock()
defer sqCheck.L.Unlock()
- log.Printf("exec sbatch %+q", cmd.Args)
- err := cmd.Run()
-
- switch err.(type) {
- case nil:
- log.Printf("sbatch succeeded: %q", strings.TrimSpace(stdout.String()))
- return nil
-
- case *exec.ExitError:
- dispatcher.Unlock(container.UUID)
- return fmt.Errorf("sbatch %+q failed: %v (stderr: %q)", cmd.Args, err, stderr.Bytes())
-
- default:
- dispatcher.Unlock(container.UUID)
- return fmt.Errorf("exec failed: %v", err)
- }
+ sbArgs := sbatchArgs(container)
+ log.Printf("running sbatch %+q", sbArgs)
+ return theConfig.slurm.Batch(crScript, sbArgs)
}
// Submit a container to the slurm queue (or resume monitoring if it's
} else if updated.Priority == 0 {
log.Printf("Container %s has state %q, priority %d: cancel slurm job", ctr.UUID, updated.State, updated.Priority)
scancel(ctr)
- } else if niceness(updated.Priority) != sqCheck.GetNiceness(ctr.UUID) && sqCheck.GetNiceness(ctr.UUID) != -1 {
- // dynamically adjust priority
- log.Printf("Container priority %v != %v", niceness(updated.Priority), sqCheck.GetNiceness(ctr.UUID))
- scontrolUpdate(updated)
+ } else {
+ renice(updated)
}
}
}
func scancel(ctr arvados.Container) {
sqCheck.L.Lock()
- cmd := scancelCmd(ctr)
- msg, err := cmd.CombinedOutput()
+ err := theConfig.slurm.Cancel(ctr.UUID)
sqCheck.L.Unlock()
if err != nil {
- log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+ log.Printf("scancel: %s", err)
time.Sleep(time.Second)
} else if sqCheck.HasUUID(ctr.UUID) {
log.Printf("container %s is still in squeue after scancel", ctr.UUID)
}
}
-func scontrolUpdate(ctr arvados.Container) {
+func renice(ctr arvados.Container) {
+ nice := niceness(ctr.Priority)
+ oldnice := sqCheck.GetNiceness(ctr.UUID)
+ if nice == oldnice || oldnice == -1 {
+ return
+ }
+ log.Printf("updating slurm nice value to %d (was %d)", nice, oldnice)
sqCheck.L.Lock()
- cmd := scontrolCmd(ctr)
- msg, err := cmd.CombinedOutput()
+ err := theConfig.slurm.Renice(ctr.UUID, nice)
sqCheck.L.Unlock()
if err != nil {
- log.Printf("%q %q: %s %q", cmd.Path, cmd.Args, err, msg)
+ log.Printf("renice: %s", err)
time.Sleep(time.Second)
- } else if sqCheck.HasUUID(ctr.UUID) {
- log.Printf("Container %s priority is now %v, niceness is now %v",
+ return
+ }
+ if sqCheck.HasUUID(ctr.UUID) {
+ log.Printf("container %s has arvados priority %d, slurm nice %d",
ctr.UUID, ctr.Priority, sqCheck.GetNiceness(ctr.UUID))
}
}
import (
"bytes"
"context"
+ "errors"
"fmt"
"io"
"io/ioutil"
arvadostest.ResetEnv()
}
-func (s *TestSuite) integrationTest(c *C,
- newSqueueCmd func() *exec.Cmd,
- newScancelCmd func(arvados.Container) *exec.Cmd,
- newSbatchCmd func(arvados.Container) *exec.Cmd,
- newScontrolCmd func(arvados.Container) *exec.Cmd,
- sbatchCmdComps []string,
- runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
- arvadostest.ResetEnv()
+type slurmFake struct {
+ didBatch [][]string
+ didCancel []string
+ didRenice [][]string
+ queue string
+ // If non-nil, run this func during the 2nd+ call to Cancel()
+ onCancel func()
+ // Error returned by Batch()
+ errBatch error
+}
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, IsNil)
+func (sf *slurmFake) Batch(script io.Reader, args []string) error {
+ sf.didBatch = append(sf.didBatch, args)
+ return sf.errBatch
+}
- var sbatchCmdLine []string
+func (sf *slurmFake) QueueCommand(args []string) *exec.Cmd {
+ return exec.Command("echo", sf.queue)
+}
- // Override sbatchCmd
- defer func(orig func(arvados.Container) *exec.Cmd) {
- sbatchCmd = orig
- }(sbatchCmd)
+func (sf *slurmFake) Renice(name string, nice int) error {
+ sf.didRenice = append(sf.didRenice, []string{name, fmt.Sprintf("%d", nice)})
+ return nil
+}
- if newSbatchCmd != nil {
- sbatchCmd = newSbatchCmd
- } else {
- sbatchCmd = func(container arvados.Container) *exec.Cmd {
- sbatchCmdLine = sbatchFunc(container).Args
- return exec.Command("sh")
- }
+func (sf *slurmFake) Cancel(name string) error {
+ sf.didCancel = append(sf.didCancel, name)
+ if len(sf.didCancel) == 1 {
+ // simulate error on first attempt
+ return errors.New("something terrible happened")
+ }
+ if sf.onCancel != nil {
+ sf.onCancel()
}
+ return nil
+}
- // Override squeueCmd
- defer func(orig func() *exec.Cmd) {
- squeueCmd = orig
- }(squeueCmd)
- squeueCmd = newSqueueCmd
+func (s *TestSuite) integrationTest(c *C, slurm *slurmFake,
+ expectBatch [][]string,
+ runContainer func(*dispatch.Dispatcher, arvados.Container)) arvados.Container {
+ arvadostest.ResetEnv()
- // Override scancel
- defer func(orig func(arvados.Container) *exec.Cmd) {
- scancelCmd = orig
- }(scancelCmd)
- scancelCmd = newScancelCmd
+ arv, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, IsNil)
- // Override scontrol
- defer func(orig func(arvados.Container) *exec.Cmd) {
- scontrolCmd = orig
- }(scontrolCmd)
- scontrolCmd = newScontrolCmd
+ defer func(orig Slurm) {
+ theConfig.slurm = orig
+ }(theConfig.slurm)
+ theConfig.slurm = slurm
// There should be one queued container
params := arvadosclient.Dict{
RunContainer: func(disp *dispatch.Dispatcher, ctr arvados.Container, status <-chan arvados.Container) {
go func() {
runContainer(disp, ctr)
+ slurm.queue = ""
doneRun <- struct{}{}
}()
run(disp, ctr, status)
sqCheck.Stop()
- c.Check(sbatchCmdLine, DeepEquals, sbatchCmdComps)
+ c.Check(slurm.didBatch, DeepEquals, expectBatch)
// There should be no queued containers now
err = arv.List("containers", params, &containers)
}
func (s *TestSuite) TestIntegrationNormal(c *C) {
- done := false
container := s.integrationTest(c,
- func() *exec.Cmd {
- if done {
- return exec.Command("true")
- } else {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
- }
- },
- nil,
+ &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"},
nil,
- nil,
- []string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
- done = true
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
}
func (s *TestSuite) TestIntegrationCancel(c *C) {
- var cmd *exec.Cmd
- var scancelCmdLine []string
- attempt := 0
-
+ slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ readyToCancel := make(chan bool)
+ slurm.onCancel = func() { <-readyToCancel }
container := s.integrationTest(c,
- func() *exec.Cmd {
- if cmd != nil && cmd.ProcessState != nil {
- return exec.Command("true")
- } else {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
- }
- },
- func(container arvados.Container) *exec.Cmd {
- if attempt++; attempt == 1 {
- return exec.Command("false")
- } else {
- scancelCmdLine = scancelFunc(container).Args
- cmd = exec.Command("echo")
- return cmd
- }
- },
- nil,
+ slurm,
nil,
- []string(nil),
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(1 * time.Second)
+ time.Sleep(time.Second)
dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"priority": 0}},
nil)
+ readyToCancel <- true
+ close(readyToCancel)
})
c.Check(container.State, Equals, arvados.ContainerStateCancelled)
- c.Check(scancelCmdLine, DeepEquals, []string{"scancel", "--name=zzzzz-dz642-queuedcontainer"})
+ c.Check(len(slurm.didCancel) > 1, Equals, true)
+ c.Check(slurm.didCancel[:2], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "zzzzz-dz642-queuedcontainer"})
}
func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- container := s.integrationTest(c,
- func() *exec.Cmd { return exec.Command("echo") },
- nil,
- nil,
- nil,
- []string{"sbatch",
+ container := s.integrationTest(c, &slurmFake{},
+ [][]string{{
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--mem=%d", 11445),
fmt.Sprintf("--cpus-per-task=%d", 4),
fmt.Sprintf("--tmp=%d", 45777),
- fmt.Sprintf("--nice=%d", 9990)},
+ fmt.Sprintf("--nice=%d", 9990)}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
time.Sleep(3 * time.Second)
func (s *TestSuite) TestSbatchFail(c *C) {
container := s.integrationTest(c,
- func() *exec.Cmd { return exec.Command("echo") },
- nil,
- func(container arvados.Container) *exec.Cmd {
- return exec.Command("false")
- },
- nil,
- []string(nil),
+ &slurmFake{errBatch: errors.New("something terrible happened")},
+ [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--mem=11445", "--cpus-per-task=4", "--tmp=45777", "--nice=9990"}},
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
}
func testSbatchFuncWithArgs(c *C, args []string) {
+ defer func() { theConfig.SbatchArguments = nil }()
theConfig.SbatchArguments = append(theConfig.SbatchArguments, args...)
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 2},
Priority: 1}
- sbatchCmd := sbatchFunc(container)
var expected []string
- expected = append(expected, "sbatch")
expected = append(expected, theConfig.SbatchArguments...)
expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=2", "--tmp=0", "--nice=9990")
-
- c.Check(sbatchCmd.Args, DeepEquals, expected)
+ c.Check(sbatchArgs(container), DeepEquals, expected)
}
func (s *MockArvadosServerSuite) TestSbatchPartition(c *C) {
- theConfig.SbatchArguments = nil
container := arvados.Container{
UUID: "123",
RuntimeConstraints: arvados.RuntimeConstraints{RAM: 250000000, VCPUs: 1},
SchedulingParameters: arvados.SchedulingParameters{Partitions: []string{"blurb", "b2"}},
Priority: 1}
- sbatchCmd := sbatchFunc(container)
- var expected []string
- expected = append(expected, "sbatch")
- expected = append(expected, "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990", "--partition=blurb,b2")
-
- c.Check(sbatchCmd.Args, DeepEquals, expected)
+ c.Check(sbatchArgs(container), DeepEquals, []string{
+ "--job-name=123", "--mem=239", "--cpus-per-task=1", "--tmp=0", "--nice=9990",
+ "--partition=blurb,b2",
+ })
}
func (s *TestSuite) TestIntegrationChangePriority(c *C) {
- var scontrolCmdLine []string
- step := 0
-
- container := s.integrationTest(c,
- func() *exec.Cmd {
- if step == 0 {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 9990 100")
- } else if step == 1 {
- return exec.Command("echo", "zzzzz-dz642-queuedcontainer 4000 100")
- } else {
- return exec.Command("echo")
- }
- },
- func(arvados.Container) *exec.Cmd { return exec.Command("true") },
- nil,
- func(container arvados.Container) *exec.Cmd {
- scontrolCmdLine = scontrolFunc(container).Args
- step = 1
- return exec.Command("true")
- },
- []string(nil),
+ slurm := &slurmFake{queue: "zzzzz-dz642-queuedcontainer 9990 100\n"}
+ container := s.integrationTest(c, slurm, nil,
func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
dispatcher.UpdateState(container.UUID, dispatch.Running)
- time.Sleep(1 * time.Second)
+ time.Sleep(time.Second)
dispatcher.Arv.Update("containers", container.UUID,
arvadosclient.Dict{
"container": arvadosclient.Dict{"priority": 600}},
nil)
- time.Sleep(1 * time.Second)
- step = 2
+ time.Sleep(time.Second)
dispatcher.UpdateState(container.UUID, dispatch.Complete)
})
c.Check(container.State, Equals, arvados.ContainerStateComplete)
- c.Check(scontrolCmdLine, DeepEquals, []string{"scontrol", "update", "JobName=zzzzz-dz642-queuedcontainer", "Nice=4000"})
+ c.Assert(len(slurm.didRenice), Not(Equals), 0)
+ c.Check(slurm.didRenice[len(slurm.didRenice)-1], DeepEquals, []string{"zzzzz-dz642-queuedcontainer", "4000"})
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "fmt"
+ "io"
+ "log"
+ "os/exec"
+ "strings"
+)
+
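+// Slurm is the subset of slurm functionality used by the dispatcher,
+// expressed as an interface so tests can substitute a fake
+// implementation for the real command line tools.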
+type Slurm interface {
+ Cancel(name string) error
+ Renice(name string, nice int) error
+ QueueCommand(args []string) *exec.Cmd
+ Batch(script io.Reader, args []string) error
+}
+
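+// slurmCLI implements Slurm by shelling out to the sbatch, scancel,
+// squeue, and scontrol command line tools.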
+type slurmCLI struct{}
+
+func (scli *slurmCLI) Batch(script io.Reader, args []string) error {
+ return scli.run(script, "sbatch", args)
+}
+
+func (scli *slurmCLI) Cancel(name string) error {
+ for _, args := range [][]string{
+ // If the slurm job hasn't started yet, remove it from
+ // the queue.
+ {"--state=pending"},
+ // If the slurm job has started, send SIGTERM. If we
+ // cancel a running job without a --signal argument,
+ // slurm will send SIGTERM and then (after some
+ // site-configured interval) SIGKILL. This would kill
+ // crunch-run without stopping the container, which we
+ // don't want.
+ {"--batch", "--signal=TERM", "--state=running"},
+ {"--batch", "--signal=TERM", "--state=suspended"},
+ } {
+ err := scli.run(nil, "scancel", append([]string{"--name=" + name}, args...))
+ if err != nil {
+ // scancel exits 0 if no job matches the given
+ // name and state. Any error from scancel here
+ // really indicates something is wrong.
+ return err
+ }
+ }
+ return nil
+}
+
+func (scli *slurmCLI) QueueCommand(args []string) *exec.Cmd {
+ return exec.Command("squeue", args...)
+}
+
+func (scli *slurmCLI) Renice(name string, nice int) error {
+ return scli.run(nil, "scontrol", []string{"update", "JobName=" + name, fmt.Sprintf("Nice=%d", nice)})
+}
+
+func (scli *slurmCLI) run(stdin io.Reader, prog string, args []string) error {
+ cmd := exec.Command(prog, args...)
+ cmd.Stdin = stdin
+ out, err := cmd.CombinedOutput()
+ outTrim := strings.TrimSpace(string(out))
+ if err != nil || len(out) > 0 {
+ log.Printf("%q %q: %q", cmd.Path, cmd.Args, outTrim)
+ }
+ if err != nil {
+ err = fmt.Errorf("%s: %s (%q)", cmd.Path, err, outTrim)
+ }
+ return err
+}
"bytes"
"fmt"
"log"
- "os/exec"
"strings"
"sync"
"time"
sync.Cond
}
-func squeueFunc() *exec.Cmd {
- return exec.Command("squeue", "--all", "--format=%j %y %Q")
-}
-
-var squeueCmd = squeueFunc
-
// HasUUID checks if a given container UUID is in the slurm queue.
// This does not run squeue directly, but instead blocks until woken
// up by the next successful update of squeue.
sqc.L.Lock()
defer sqc.L.Unlock()
- cmd := squeueCmd()
+ cmd := theConfig.slurm.QueueCommand([]string{"--all", "--format=%j %y %Q"})
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
import (
"bytes"
- "context"
"encoding/json"
"errors"
"flag"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error)
ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error
- ContainerStop(ctx context.Context, container string, timeout *time.Duration) error
+ ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error
ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error)
ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error)
ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error)
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
-// ThinDockerClientProxy is a proxy implementation of ThinDockerClient
-// that executes the docker requests on dockerclient.Client
-type ThinDockerClientProxy struct {
- Docker *dockerclient.Client
-}
-
-// ContainerAttach invokes dockerclient.Client.ContainerAttach
-func (proxy ThinDockerClientProxy) ContainerAttach(ctx context.Context, container string, options dockertypes.ContainerAttachOptions) (dockertypes.HijackedResponse, error) {
- return proxy.Docker.ContainerAttach(ctx, container, options)
-}
-
-// ContainerCreate invokes dockerclient.Client.ContainerCreate
-func (proxy ThinDockerClientProxy) ContainerCreate(ctx context.Context, config *dockercontainer.Config, hostConfig *dockercontainer.HostConfig,
- networkingConfig *dockernetwork.NetworkingConfig, containerName string) (dockercontainer.ContainerCreateCreatedBody, error) {
- return proxy.Docker.ContainerCreate(ctx, config, hostConfig, networkingConfig, containerName)
-}
-
-// ContainerStart invokes dockerclient.Client.ContainerStart
-func (proxy ThinDockerClientProxy) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
- return proxy.Docker.ContainerStart(ctx, container, options)
-}
-
-// ContainerStop invokes dockerclient.Client.ContainerStop
-func (proxy ThinDockerClientProxy) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
- return proxy.Docker.ContainerStop(ctx, container, timeout)
-}
-
-// ContainerWait invokes dockerclient.Client.ContainerWait
-func (proxy ThinDockerClientProxy) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
- return proxy.Docker.ContainerWait(ctx, container, condition)
-}
-
-// ImageInspectWithRaw invokes dockerclient.Client.ImageInspectWithRaw
-func (proxy ThinDockerClientProxy) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
- return proxy.Docker.ImageInspectWithRaw(ctx, image)
-}
-
-// ImageLoad invokes dockerclient.Client.ImageLoad
-func (proxy ThinDockerClientProxy) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
- return proxy.Docker.ImageLoad(ctx, input, quiet)
-}
-
-// ImageRemove invokes dockerclient.Client.ImageRemove
-func (proxy ThinDockerClientProxy) ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error) {
- return proxy.Docker.ImageRemove(ctx, image, options)
-}
-
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
LogsPDH *string
RunArvMount
MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- CleanupTempDir []string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- finalState string
-
- statLogger io.WriteCloser
- statReporter *crunchstat.Reporter
- statInterval time.Duration
- cgroupRoot string
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ finalState string
+ parentTemp string
+
+ statLogger io.WriteCloser
+ statReporter *crunchstat.Reporter
+ hoststatLogger io.WriteCloser
+ hoststatReporter *crunchstat.Reporter
+ statInterval time.Duration
+ cgroupRoot string
// What we expect the container's cgroup parent to be.
expectCgroupParent string
// What we tell docker to use as the container's cgroup
setCgroupParent string
cStateLock sync.Mutex
- cStarted bool // StartContainer() succeeded
cCancelled bool // StopContainer() invoked
enableNetwork string // one of "default" or "always"
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig chan os.Signal) {
- s := <-sig
- if s != nil {
- runner.CrunchLog.Printf("Caught signal %v", s)
+ for s := range sig {
+ runner.CrunchLog.Printf("caught signal: %v", s)
+ runner.stop()
}
- runner.stop()
}(runner.SigChan)
}
func (runner *ContainerRunner) stop() {
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
- if runner.cCancelled {
+ if runner.ContainerID == "" {
return
}
runner.cCancelled = true
- if runner.cStarted {
- timeout := time.Duration(10)
- err := runner.Docker.ContainerStop(context.TODO(), runner.ContainerID, &(timeout))
- if err != nil {
- runner.CrunchLog.Printf("StopContainer failed: %s", err)
- }
- // Suppress multiple calls to stop()
- runner.cStarted = false
+ runner.CrunchLog.Printf("removing container")
+ err := runner.Docker.ContainerRemove(context.TODO(), runner.ContainerID, dockertypes.ContainerRemoveOptions{Force: true})
+ if err != nil {
+ runner.CrunchLog.Printf("error removing container: %s", err)
}
}
func (runner *ContainerRunner) stopSignals() {
if runner.SigChan != nil {
signal.Stop(runner.SigChan)
- close(runner.SigChan)
}
}
func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
if runner.ArvMountPoint == "" {
- runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+ runner.ArvMountPoint, err = runner.MkTempDir(runner.parentTemp, prefix)
}
return
}
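+// copyfile copies a single regular file from src to dst, creating
+// dst's parent directories as needed.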
+func copyfile(src string, dst string) (err error) {
+ srcfile, err := os.Open(src)
+ if err != nil {
+ return
+ }
+
+ os.MkdirAll(path.Dir(dst), 0777)
+
+ dstfile, err := os.Create(dst)
+ if err != nil {
+ return
+ }
+ _, err = io.Copy(dstfile, srcfile)
+ if err != nil {
+ return
+ }
+
+ err = srcfile.Close()
+ err2 := dstfile.Close()
+
+ if err != nil {
+ return
+ }
+
+ if err2 != nil {
+ return err2
+ }
+
+ return nil
+}
+
func (runner *ContainerRunner) SetupMounts() (err error) {
err = runner.SetupArvMountPoint("keep")
if err != nil {
runner.Binds = nil
runner.Volumes = make(map[string]struct{})
needCertMount := true
+ type copyFile struct {
+ src string
+ bind string
+ }
+ var copyFiles []copyFile
var binds []string
for bind := range runner.Container.Mounts {
pdhOnly = false
src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
} else if mnt.PortableDataHash != "" {
- if mnt.Writable {
+ if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
return fmt.Errorf("Can never write to a collection specified by portable data hash")
}
idx := strings.Index(mnt.PortableDataHash, "/")
if mnt.Writable {
if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
+ copyFiles = append(copyFiles, copyFile{src, runner.HostOutputDir + bind[len(runner.Container.OutputPath):]})
+ } else {
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
}
- runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else {
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
}
case mnt.Kind == "tmp":
var tmpdir string
- tmpdir, err = runner.MkTempDir("", "")
+ tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
if err != nil {
return fmt.Errorf("While creating mount temp dir: %v", err)
}
if staterr != nil {
return fmt.Errorf("While Chmod temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
if bind == runner.Container.OutputPath {
runner.HostOutputDir = tmpdir
// can ensure the file is world-readable
// inside the container, without having to
// make it world-readable on the docker host.
- tmpdir, err := runner.MkTempDir("", "")
+ tmpdir, err := runner.MkTempDir(runner.parentTemp, "json")
if err != nil {
return fmt.Errorf("creating temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
tmpfn := filepath.Join(tmpdir, "mountdata.json")
err = ioutil.WriteFile(tmpfn, jsondata, 0644)
if err != nil {
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", tmpfn, bind))
case mnt.Kind == "git_tree":
- tmpdir, err := runner.MkTempDir("", "")
+ tmpdir, err := runner.MkTempDir(runner.parentTemp, "git_tree")
if err != nil {
return fmt.Errorf("creating temp dir: %v", err)
}
- runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
err = gitMount(mnt).extractTree(runner.ArvClient, tmpdir, token)
if err != nil {
return err
}
}
+ for _, cp := range copyFiles {
+ st, err := os.Stat(cp.src)
+ if err != nil {
+ return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ }
+ if st.IsDir() {
+ err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
+ if walkerr != nil {
+ return walkerr
+ }
+ target := path.Join(cp.bind, walkpath[len(cp.src):])
+ if walkinfo.Mode().IsRegular() {
+ copyerr := copyfile(walkpath, target)
+ if copyerr != nil {
+ return copyerr
+ }
+ return os.Chmod(target, walkinfo.Mode()|0777)
+ } else if walkinfo.Mode().IsDir() {
+ mkerr := os.MkdirAll(target, 0777)
+ if mkerr != nil {
+ return mkerr
+ }
+ return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
+ } else {
+ return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+ }
+ })
+ } else if st.Mode().IsRegular() {
+ err = copyfile(cp.src, cp.bind)
+ if err == nil {
+ err = os.Chmod(cp.bind, st.Mode()|0777)
+ }
+ }
+ if err != nil {
+ return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ }
+ }
+
return nil
}
func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
// Handle docker log protocol
// https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+ defer close(runner.loggingDone)
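+ // Each frame starts with an 8-byte header: byte 0 is the stream
+ // type (1 = stdout, 2 = stderr) and bytes 4-7 hold the payload
+ // size as a big-endian uint32.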
header := make([]byte, 8)
- for {
- _, readerr := io.ReadAtLeast(containerReader, header, 8)
-
- if readerr == nil {
- readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
- if header[0] == 1 {
- // stdout
- _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
- } else {
- // stderr
- _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+ var err error
+ for err == nil {
+ _, err = io.ReadAtLeast(containerReader, header, 8)
+ if err != nil {
+ if err == io.EOF {
+ err = nil
}
+ break
}
+ readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+ if header[0] == 1 {
+ // stdout
+ _, err = io.CopyN(runner.Stdout, containerReader, readsize)
+ } else {
+ // stderr
+ _, err = io.CopyN(runner.Stderr, containerReader, readsize)
+ }
+ }
- if readerr != nil {
- if readerr != io.EOF {
- runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
- }
-
- closeerr := runner.Stdout.Close()
- if closeerr != nil {
- runner.CrunchLog.Printf("While closing stdout logs: %v", closeerr)
- }
+ if err != nil {
+ runner.CrunchLog.Printf("error reading docker logs: %v", err)
+ }
- closeerr = runner.Stderr.Close()
- if closeerr != nil {
- runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
- }
+ err = runner.Stdout.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing stdout logs: %v", err)
+ }
- if runner.statReporter != nil {
- runner.statReporter.Stop()
- closeerr = runner.statLogger.Close()
- if closeerr != nil {
- runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
- }
- }
+ err = runner.Stderr.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing stderr logs: %v", err)
+ }
- runner.loggingDone <- true
- close(runner.loggingDone)
- return
+ if runner.statReporter != nil {
+ runner.statReporter.Stop()
+ err = runner.statLogger.Close()
+ if err != nil {
+ runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
}
-func (runner *ContainerRunner) StartCrunchstat() {
+func (runner *ContainerRunner) stopHoststat() error {
+ if runner.hoststatReporter == nil {
+ return nil
+ }
+ runner.hoststatReporter.Stop()
+ err := runner.hoststatLogger.Close()
+ if err != nil {
+ return fmt.Errorf("error closing hoststat logs: %v", err)
+ }
+ return nil
+}
+
+func (runner *ContainerRunner) startHoststat() {
+ runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+ runner.hoststatReporter = &crunchstat.Reporter{
+ Logger: log.New(runner.hoststatLogger, "", 0),
+ CgroupRoot: runner.cgroupRoot,
+ PollPeriod: runner.statInterval,
+ }
+ runner.hoststatReporter.Start()
+}
+
+func (runner *ContainerRunner) startCrunchstat() {
runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
runner.statReporter = &crunchstat.Reporter{
CID: runner.ContainerID,
}
return fmt.Errorf("could not start container: %v%s", err, advice)
}
- runner.cStarted = true
return nil
}
// WaitFinish waits for the container to terminate, captures the exit code, and
// closes the stdout/stderr logging.
-func (runner *ContainerRunner) WaitFinish() (err error) {
+func (runner *ContainerRunner) WaitFinish() error {
runner.CrunchLog.Print("Waiting for container to finish")
- waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, "not-running")
+ waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
+ arvMountExit := runner.ArvMountExit
+ for {
+ select {
+ case waitBody := <-waitOk:
+ runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
+ code := int(waitBody.StatusCode)
+ runner.ExitCode = &code
+
+ // wait for stdout/stderr to complete
+ <-runner.loggingDone
+ return nil
+
+ case err := <-waitErr:
+ return fmt.Errorf("container wait: %v", err)
- go func() {
- <-runner.ArvMountExit
- if runner.cStarted {
+ case <-arvMountExit:
runner.CrunchLog.Printf("arv-mount exited while container is still running. Stopping container.")
runner.stop()
+ // arvMountExit will always be ready now that
+ // it's closed, but that doesn't interest us.
+ arvMountExit = nil
}
- }()
-
- var waitBody dockercontainer.ContainerWaitOKBody
- select {
- case waitBody = <-waitOk:
- case err = <-waitErr:
- }
-
- // Container isn't running any more
- runner.cStarted = false
-
- if err != nil {
- return fmt.Errorf("container wait: %v", err)
}
-
- runner.CrunchLog.Printf("Container exited with code: %v", waitBody.StatusCode)
- code := int(waitBody.StatusCode)
- runner.ExitCode = &code
-
- // wait for stdout/stderr to complete
- <-runner.loggingDone
-
- return nil
}
var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
// go through mounts and try reverse map to collection reference
for _, bind := range binds {
mnt := runner.Container.Mounts[bind]
- if tgt == bind || strings.HasPrefix(tgt, bind+"/") {
+ if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
// get path relative to bind
targetSuffix := tgt[len(bind):]
// CaptureOutput saves the container's output collection and records its portable data hash
func (runner *ContainerRunner) CaptureOutput() error {
- if runner.finalState != "Complete" {
- return nil
- }
-
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
// refresh the container record to check.
continue
}
- if mnt.ExcludeFromOutput == true {
+ if mnt.ExcludeFromOutput == true || mnt.Writable {
continue
}
}
}
- for _, tmpdir := range runner.CleanupTempDir {
- if rmerr := os.RemoveAll(tmpdir); rmerr != nil {
- runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
- }
+ if rmerr := os.RemoveAll(runner.parentTemp); rmerr != nil {
+ runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", runner.parentTemp, rmerr)
}
}
}
checkErr(runner.CaptureOutput())
+ checkErr(runner.stopHoststat())
checkErr(runner.CommitLogs())
checkErr(runner.UpdateContainerFinal())
}()
if err != nil {
return
}
-
- // setup signal handling
runner.setupSignals()
+ runner.startHoststat()
// check for and/or load image
err = runner.LoadImage()
}
runner.finalState = "Cancelled"
- runner.StartCrunchstat()
+ runner.startCrunchstat()
err = runner.StartContainer()
if err != nil {
}
err = runner.WaitFinish()
- if err == nil {
+ if err == nil && !runner.IsCancelled() {
runner.finalState = "Complete"
}
return
// API version 1.21 corresponds to Docker 1.9, which is currently the
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- dockerClientProxy := ThinDockerClientProxy{Docker: docker}
-
- cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+ cr := NewContainerRunner(api, kc, docker, containerId)
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)
os.Exit(1)
}
+ parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+ if tmperr != nil {
+ log.Fatalf("%s: %v", containerId, tmperr)
+ }
+
+ cr.parentTemp = parentTemp
cr.statInterval = *statInterval
cr.cgroupRoot = *cgroupRoot
cr.expectCgroupParent = *cgroupParent
import (
"bufio"
"bytes"
- "context"
"crypto/md5"
"encoding/json"
"errors"
"net"
"os"
"os/exec"
- "path/filepath"
"runtime/pprof"
"sort"
"strings"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container"
TestingT(t)
}
-type TestSuite struct{}
-
// Gocheck boilerplate
var _ = Suite(&TestSuite{})
+type TestSuite struct {
+ docker *TestDockerClient
+}
+
+func (s *TestSuite) SetUpTest(c *C) {
+ s.docker = NewTestDockerClient()
+}
+
type ArvTestClient struct {
Total int64
Calls int
logReader io.ReadCloser
logWriter io.WriteCloser
fn func(t *TestDockerClient)
- finish int
+ exitCode int
stop chan bool
cwd string
env []string
api *ArvTestClient
realTemp string
+ calledWait bool
}
-func NewTestDockerClient(exitCode int) *TestDockerClient {
+func NewTestDockerClient() *TestDockerClient {
t := &TestDockerClient{}
t.logReader, t.logWriter = io.Pipe()
- t.finish = exitCode
t.stop = make(chan bool, 1)
t.cwd = "/"
return t
}
func (t *TestDockerClient) ContainerStart(ctx context.Context, container string, options dockertypes.ContainerStartOptions) error {
- if t.finish == 3 {
+ if t.exitCode == 3 {
return errors.New(`Error response from daemon: oci runtime error: container_linux.go:247: starting container process caused "process_linux.go:359: container init caused \"rootfs_linux.go:54: mounting \\\"/tmp/keep453790790/by_id/99999999999999999999999999999999+99999/myGenome\\\" to rootfs \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged\\\" at \\\"/tmp/docker/overlay2/9999999999999999999999999999999999999999999999999999999999999999/merged/keep/99999999999999999999999999999999+99999/myGenome\\\" caused \\\"no such file or directory\\\"\""`)
}
- if t.finish == 4 {
+ if t.exitCode == 4 {
return errors.New(`panic: standard_init_linux.go:175: exec user process caused "no such file or directory"`)
}
- if t.finish == 5 {
+ if t.exitCode == 5 {
return errors.New(`Error response from daemon: Cannot start container 41f26cbc43bcc1280f4323efb1830a394ba8660c9d1c2b564ba42bf7f7694845: [8] System error: no such file or directory`)
}
- if t.finish == 6 {
+ if t.exitCode == 6 {
return errors.New(`Error response from daemon: Cannot start container 58099cd76c834f3dc2a4fb76c8028f049ae6d4fdf0ec373e1f2cfea030670c2d: [8] System error: exec: "foobar": executable file not found in $PATH`)
}
}
}
-func (t *TestDockerClient) ContainerStop(ctx context.Context, container string, timeout *time.Duration) error {
+func (t *TestDockerClient) ContainerRemove(ctx context.Context, container string, options dockertypes.ContainerRemoveOptions) error {
t.stop <- true
return nil
}
func (t *TestDockerClient) ContainerWait(ctx context.Context, container string, condition dockercontainer.WaitCondition) (<-chan dockercontainer.ContainerWaitOKBody, <-chan error) {
- body := make(chan dockercontainer.ContainerWaitOKBody)
+ t.calledWait = true
+ body := make(chan dockercontainer.ContainerWaitOKBody, 1)
err := make(chan error)
go func() {
t.fn(t)
- body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.finish)}
- close(body)
- close(err)
+ body <- dockercontainer.ContainerWaitOKBody{StatusCode: int64(t.exitCode)}
}()
return body, err
}
func (t *TestDockerClient) ImageInspectWithRaw(ctx context.Context, image string) (dockertypes.ImageInspect, []byte, error) {
- if t.finish == 2 {
+ if t.exitCode == 2 {
return dockertypes.ImageInspect{}, nil, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
}
}
func (t *TestDockerClient) ImageLoad(ctx context.Context, input io.Reader, quiet bool) (dockertypes.ImageLoadResponse, error) {
- if t.finish == 2 {
+ if t.exitCode == 2 {
return dockertypes.ImageLoadResponse{}, fmt.Errorf("Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?")
}
_, err := io.Copy(ioutil.Discard, input)
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
- docker := NewTestDockerClient(0)
- cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
_, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- docker := NewTestDockerClient(0)
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
- docker := NewTestDockerClient(0)
- cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Container.ContainerImage = hwPDH
err := cr.LoadImage()
}
func (s *TestSuite) TestRunContainer(c *C) {
- docker := NewTestDockerClient(0)
- docker.fn = func(t *TestDockerClient) {
+ s.docker.fn = func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, "Hello world\n"))
t.logWriter.Close()
}
- cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
// dress rehearsal of the Run() function, starting from a JSON container record.
-func FullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
+func (s *TestSuite) fullRunHelper(c *C, record string, extraMounts []string, exitCode int, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, realTemp string) {
rec := arvados.Container{}
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient(exitCode)
- docker.fn = fn
- docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ s.docker.exitCode = exitCode
+ s.docker.fn = fn
+ s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
- docker.api = api
- cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ s.docker.api = api
+ cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
c.Assert(err, IsNil)
defer os.RemoveAll(realTemp)
- docker.realTemp = realTemp
+ s.docker.realTemp = realTemp
tempcount := 0
cr.MkTempDir = func(_ string, prefix string) (string, error) {
}
func (s *TestSuite) TestFullRunHello(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestCrunchstat(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
func (s *TestSuite) TestNodeInfoLog(c *C) {
os.Setenv("SLURMD_NODENAME", "compute2")
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestContainerRecordLog(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["sleep", "1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestFullRunStderr(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestFullRunDefaultCwd(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["pwd"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
}
func (s *TestSuite) TestFullRunSetCwd(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["pwd"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
func (s *TestSuite) TestStopOnSignal(c *C) {
s.testStopContainer(c, func(cr *ContainerRunner) {
go func() {
- for !cr.cStarted {
+ for !s.docker.calledWait {
time.Sleep(time.Millisecond)
}
cr.SigChan <- syscall.SIGINT
err := json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient(0)
- docker.fn = func(t *TestDockerClient) {
+ s.docker.fn = func(t *TestDockerClient) {
<-t.stop
t.logWriter.Write(dockerLog(1, "foo\n"))
t.logWriter.Close()
}
- docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api := &ArvTestClient{Container: rec}
- cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr := NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
setup(cr)
}
func (s *TestSuite) TestFullRunSetEnv(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
c.Assert(err, IsNil)
stubCertPath := stubCert(certTemp)
+ cr.parentTemp = realTemp
+
defer os.RemoveAll(realTemp)
defer os.RemoveAll(certTemp)
}
checkEmpty := func() {
- filepath.Walk(realTemp, func(path string, _ os.FileInfo, err error) error {
- c.Check(path, Equals, realTemp)
- c.Check(err, IsNil)
- return nil
- })
+ // Should be deleted.
+ _, err := os.Stat(realTemp)
+ c.Assert(os.IsNotExist(err), Equals, true)
+
+ // Now recreate it for the next test.
+ c.Assert(os.Mkdir(realTemp, 0777), IsNil)
}
{
c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
"--read-write", "--crunchstat-interval=5",
"--mount-by-pdh", "by_id", realTemp + "/keep1"})
- c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp"})
+ c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp"})
os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
"--read-write", "--crunchstat-interval=5",
"--mount-by-pdh", "by_id", realTemp + "/keep1"})
- c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
+ c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/out", realTemp + "/tmp3:/tmp"})
os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
"--read-write", "--crunchstat-interval=5",
"--mount-by-pdh", "by_id", realTemp + "/keep1"})
- c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
+ c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", stubCertPath + ":/etc/arvados/ca-certificates.crt:ro"})
os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
err := cr.SetupMounts()
c.Check(err, IsNil)
sort.StringSlice(cr.Binds).Sort()
- c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2/mountdata.json:/mnt/test.json:ro"})
- content, err := ioutil.ReadFile(realTemp + "/2/mountdata.json")
+ c.Check(cr.Binds, DeepEquals, []string{realTemp + "/json2/mountdata.json:/mnt/test.json:ro"})
+ content, err := ioutil.ReadFile(realTemp + "/json2/mountdata.json")
c.Check(err, IsNil)
c.Check(content, DeepEquals, []byte(test.out))
os.RemoveAll(cr.ArvMountPoint)
c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other",
"--read-write", "--crunchstat-interval=5",
"--file-cache", "512", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
- c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
+ c.Check(cr.Binds, DeepEquals, []string{realTemp + "/tmp2:/tmp", realTemp + "/keep1/tmp0:/tmp/foo:ro"})
os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
}
- // Writable mount points are not allowed underneath output_dir mount point
+ // Writable mount points copied to output_dir mount point
{
i = 0
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts = map[string]arvados.Mount{
- "/tmp": {Kind: "tmp"},
- "/tmp/foo": {Kind: "collection", Writable: true},
+ "/tmp": {Kind: "tmp"},
+ "/tmp/foo": {Kind: "collection",
+ PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53",
+ Writable: true},
+ "/tmp/bar": {Kind: "collection",
+ PortableDataHash: "59389a8f9ee9d399be35462a0f92541d+53",
+ Path: "baz",
+ Writable: true},
}
cr.OutputPath = "/tmp"
+ os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+ os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
+
+ rf, _ := os.Create(realTemp + "/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz/quux")
+ rf.Write([]byte("bar"))
+ rf.Close()
+
err := cr.SetupMounts()
- c.Check(err, NotNil)
- c.Check(err, ErrorMatches, `Writable mount points are not permitted underneath the output_path.*`)
+ c.Check(err, IsNil)
+ _, err = os.Stat(cr.HostOutputDir + "/foo")
+ c.Check(err, IsNil)
+ _, err = os.Stat(cr.HostOutputDir + "/bar/quux")
+ c.Check(err, IsNil)
os.RemoveAll(cr.ArvMountPoint)
cr.CleanupDirs()
checkEmpty()
"runtime_constraints": {}
}`
- api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
}
// Used by the TestStdoutWithWrongPath*() tests
-func StdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
+func (s *TestSuite) stdoutErrorRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner, err error) {
rec := arvados.Container{}
err = json.Unmarshal([]byte(record), &rec)
c.Check(err, IsNil)
- docker := NewTestDockerClient(0)
- docker.fn = fn
- docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ s.docker.fn = fn
+ s.docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
api = &ArvTestClient{Container: rec}
- cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr = NewContainerRunner(api, &KeepTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
}
func (s *TestSuite) TestStdoutWithWrongPath(c *C) {
- _, _, err := StdoutErrorRunHelper(c, `{
+ _, _, err := s.stdoutErrorRunHelper(c, `{
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path":"/tmpa.out"} },
"output_path": "/tmp"
}`, func(t *TestDockerClient) {})
}
func (s *TestSuite) TestStdoutWithWrongKindTmp(c *C) {
- _, _, err := StdoutErrorRunHelper(c, `{
+ _, _, err := s.stdoutErrorRunHelper(c, `{
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "tmp", "path":"/tmp/a.out"} },
"output_path": "/tmp"
}`, func(t *TestDockerClient) {})
}
func (s *TestSuite) TestStdoutWithWrongKindCollection(c *C) {
- _, _, err := StdoutErrorRunHelper(c, `{
+ _, _, err := s.stdoutErrorRunHelper(c, `{
"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "collection", "path":"/tmp/a.out"} },
"output_path": "/tmp"
}`, func(t *TestDockerClient) {})
func (s *TestSuite) TestFullRunWithAPI(c *C) {
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
defer os.Unsetenv("ARVADOS_API_HOST")
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
func (s *TestSuite) TestFullRunSetOutput(c *C) {
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
defer os.Unsetenv("ARVADOS_API_HOST")
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
extraMounts := []string{"a3e8f74c6f101eae01fa08bfb4e49b3a+54"}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt",
}
- api, runner, realtemp := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, runner, realtemp := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
- c.Check(runner.Binds, DeepEquals, []string{realtemp + "/2:/tmp",
+ c.Check(runner.Binds, DeepEquals, []string{realtemp + "/tmp2:/tmp",
realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/file2_in_main.txt:/tmp/foo/bar:ro",
realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1/subdir2/file2_in_subdir2.txt:/tmp/foo/baz/sub2file2:ro",
realtemp + "/keep1/by_id/a0def87f80dd594d4675809e83bd4f15+367/subdir1:/tmp/foo/sub1:ro",
"b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- os.Symlink("/keep/foo/sub1file2", t.realTemp+"/2/baz")
- os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz2")
- os.Symlink("/keep/foo2/subdir1", t.realTemp+"/2/baz3")
- os.Mkdir(t.realTemp+"/2/baz4", 0700)
- os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/2/baz4/baz5")
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ os.Symlink("/keep/foo/sub1file2", t.realTemp+"/tmp2/baz")
+ os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz2")
+ os.Symlink("/keep/foo2/subdir1", t.realTemp+"/tmp2/baz3")
+ os.Mkdir(t.realTemp+"/tmp2/baz4", 0700)
+ os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz4/baz5")
t.logWriter.Close()
})
extraMounts := []string{}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- os.Symlink("/etc/hosts", t.realTemp+"/2/baz")
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ os.Symlink("/etc/hosts", t.realTemp+"/tmp2/baz")
t.logWriter.Close()
})
extraMounts := []string{}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- rf, _ := os.Create(t.realTemp + "/2/realfile")
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ rf, _ := os.Create(t.realTemp + "/tmp2/realfile")
rf.Write([]byte("foo"))
rf.Close()
- os.Mkdir(t.realTemp+"/2/realdir", 0700)
- rf, _ = os.Create(t.realTemp + "/2/realdir/subfile")
+ os.Mkdir(t.realTemp+"/tmp2/realdir", 0700)
+ rf, _ = os.Create(t.realTemp + "/tmp2/realdir/subfile")
rf.Write([]byte("bar"))
rf.Close()
- os.Symlink("/tmp/realfile", t.realTemp+"/2/file1")
- os.Symlink("realfile", t.realTemp+"/2/file2")
- os.Symlink("/tmp/file1", t.realTemp+"/2/file3")
- os.Symlink("file2", t.realTemp+"/2/file4")
- os.Symlink("realdir", t.realTemp+"/2/dir1")
- os.Symlink("/tmp/realdir", t.realTemp+"/2/dir2")
+ os.Symlink("/tmp/realfile", t.realTemp+"/tmp2/file1")
+ os.Symlink("realfile", t.realTemp+"/tmp2/file2")
+ os.Symlink("/tmp/file1", t.realTemp+"/tmp2/file3")
+ os.Symlink("file2", t.realTemp+"/tmp2/file4")
+ os.Symlink("realdir", t.realTemp+"/tmp2/dir1")
+ os.Symlink("/tmp/realdir", t.realTemp+"/tmp2/dir2")
t.logWriter.Close()
})
"b0def87f80dd594d4675809e83bd4f15+367/file1_in_main.txt",
}
- api, _, _ := FullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
"runtime_constraints": {}
}`
- api, _, _ := FullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
+ api, _, _ := s.fullRunHelper(c, helperRecord, nil, 0, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.logWriter.Close()
})
}
func (s *TestSuite) TestStderrMount(c *C) {
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello;exit 1"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := tf.Name()
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
ech := ""
brokenNodeHook = &ech
- api, _, _ := FullRunHelper(c, `{
+ api, _, _ := s.fullRunHelper(c, `{
"command": ["echo", "hello world"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
self.current_user = api.users().current().execute(num_retries=num_retries)
self._poll = True
self._poll_time = poll_time
+ self._updating_lock = threading.Lock()
@use_counter
def update(self):
- with llfuse.lock_released:
- all_projects = arvados.util.list_all(
- self.api.groups().list, self.num_retries,
- filters=[['group_class','=','project']])
- objects = {}
- for ob in all_projects:
- objects[ob['uuid']] = ob
-
- roots = []
- root_owners = {}
- for ob in all_projects:
- if ob['owner_uuid'] != self.current_user['uuid'] and ob['owner_uuid'] not in objects:
- roots.append(ob)
- root_owners[ob['owner_uuid']] = True
-
- lusers = arvados.util.list_all(
- self.api.users().list, self.num_retries,
- filters=[['uuid','in', list(root_owners)]])
- lgroups = arvados.util.list_all(
- self.api.groups().list, self.num_retries,
- filters=[['uuid','in', list(root_owners)]])
-
- users = {}
- groups = {}
-
- for l in lusers:
- objects[l["uuid"]] = l
- for l in lgroups:
- objects[l["uuid"]] = l
-
- contents = {}
- for r in root_owners:
- if r in objects:
- obr = objects[r]
- if obr.get("name"):
- contents[obr["name"]] = obr
- #elif obr.get("username"):
- # contents[obr["username"]] = obr
- elif "first_name" in obr:
- contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
-
-
- for r in roots:
- if r['owner_uuid'] not in objects:
- contents[r['name']] = r
-
- # end with llfuse.lock_released, re-acquire lock
-
try:
+ with llfuse.lock_released:
+ self._updating_lock.acquire()
+ if not self.stale():
+ return
+
+ all_projects = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['group_class','=','project']],
+ select=["uuid", "owner_uuid"])
+ objects = {}
+ for ob in all_projects:
+ objects[ob['uuid']] = ob
+
+ roots = []
+ root_owners = set()
+ current_uuid = self.current_user['uuid']
+ for ob in all_projects:
+ if ob['owner_uuid'] != current_uuid and ob['owner_uuid'] not in objects:
+ roots.append(ob['uuid'])
+ root_owners.add(ob['owner_uuid'])
+
+ lusers = arvados.util.list_all(
+ self.api.users().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)]])
+ lgroups = arvados.util.list_all(
+ self.api.groups().list, self.num_retries,
+ filters=[['uuid','in', list(root_owners)+roots]])
+
+ for l in lusers:
+ objects[l["uuid"]] = l
+ for l in lgroups:
+ objects[l["uuid"]] = l
+
+ contents = {}
+ for r in root_owners:
+ if r in objects:
+ obr = objects[r]
+ if obr.get("name"):
+ contents[obr["name"]] = obr
+ #elif obr.get("username"):
+ # contents[obr["username"]] = obr
+ elif "first_name" in obr:
+ contents[u"{} {}".format(obr["first_name"], obr["last_name"])] = obr
+
+ for r in roots:
+ if r in objects:
+ obr = objects[r]
+ if obr['owner_uuid'] not in objects:
+ contents[obr["name"]] = obr
+
+ # end with llfuse.lock_released, re-acquire lock
+
self.merge(contents.items(),
lambda i: i[0],
lambda a, i: a.uuid() == i[1]['uuid'],
lambda i: ProjectDirectory(self.inode, self.inodes, self.api, self.num_retries, i[1], poll=self._poll, poll_time=self._poll_time))
except Exception:
- _logger.exception()
+ _logger.exception("arv-mount shared dir error")
+ finally:
+ self._updating_lock.release()
def want_event_subscribe(self):
return True
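Structurally, the change above is a double-checked refresh: drop the global llfuse lock, serialize concurrent refreshes behind the new _updating_lock, and call stale() again once the lock is held so a caller that lost the race returns without repeating the groups/users listing. A rough Go analog of that guard pattern, with invented names and a stub in place of the API calls:

package main

import (
	"fmt"
	"sync"
	"time"
)

// sharedDir caches a directory listing and refreshes it at most once per
// poll interval, no matter how many callers ask at the same time.
type sharedDir struct {
	mu        sync.Mutex // serializes refreshes (analog of _updating_lock)
	fresh     time.Time
	pollEvery time.Duration
	contents  []string
}

func (d *sharedDir) stale() bool {
	return time.Since(d.fresh) > d.pollEvery
}

func (d *sharedDir) update() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if !d.stale() {
		// another caller refreshed while we were waiting for the lock
		return
	}
	// expensive listing stands in for the groups/users API calls
	d.contents = []string{"Project A", "Project B"}
	d.fresh = time.Now()
}

func main() {
	d := &sharedDir{pollEvery: 15 * time.Second}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() { defer wg.Done(); d.update() }()
	}
	wg.Wait()
	fmt.Println(d.contents)
}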
}()
errChan := make(chan error)
go func() {
- errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
+ var body io.Reader = bufr
+ if len(block) == 0 {
+ // We must send a "Content-Length: 0" header,
+ // but the http client interprets
+ // ContentLength==0 as "unknown" unless it can
+ // confirm by introspection that Body will
+ // read 0 bytes.
+ body = http.NoBody
+ bufr.Close()
+ }
+ errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), body, nil)
}()
select {
case <-ctx.Done():
func (c *azureBlobClient) CreateBlockBlobFromReader(cname, bname string, size uint64, rdr io.Reader, hdrs map[string]string) error {
c.stats.Tick(&c.stats.Ops, &c.stats.CreateOps)
- rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+ if size != 0 {
+ rdr = NewCountingReader(rdr, c.stats.TickOutBytes)
+ }
err := c.client.CreateBlockBlobFromReader(cname, bname, size, rdr, hdrs)
c.stats.TickErr(err)
return err
return
}
+ if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
+ rw.WriteHeader(http.StatusLengthRequired)
+ return
+ }
+
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return
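The reason for http.NoBody in the writer above, and for the matching 411 check in the stub server: Go's HTTP client treats a request whose ContentLength is 0 but whose Body is an arbitrary non-nil reader as "length unknown", so it switches to chunked encoding and sends no Content-Length header at all; http.NoBody is the sentinel that tells it the body really is zero bytes, which yields an explicit Content-Length: 0. A self-contained sketch of the difference (the stub handler copies the rule from the test-server change above; everything else is illustrative):

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

func main() {
	// Stub server with the same rule as the new test-server check:
	// PUT/POST without an explicit Content-Length header gets 411.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if (r.Method == "PUT" || r.Method == "POST") && r.Header.Get("Content-Length") == "" {
			w.WriteHeader(http.StatusLengthRequired)
			return
		}
		w.WriteHeader(http.StatusCreated)
	}))
	defer srv.Close()

	do := func(body io.Reader) string {
		req, err := http.NewRequest("PUT", srv.URL, body)
		if err != nil {
			return err.Error()
		}
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err.Error()
		}
		resp.Body.Close()
		return resp.Status
	}

	// An empty body behind an opaque reader type: the client cannot prove it
	// is empty, so it uses chunked encoding and omits Content-Length.
	opaque := struct{ *bytes.Reader }{bytes.NewReader(nil)}
	fmt.Println("opaque empty reader:", do(opaque)) // 411 Length Required

	// http.NoBody marks the body as really zero bytes: Content-Length: 0 is sent.
	fmt.Println("http.NoBody:        ", do(http.NoBody)) // 201 Created
}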
keep-setup.sh common.sh createusers.sh \
logger runsu.sh waitforpostgres.sh \
application_yml_override.py api-setup.sh \
+ go-setup.sh \
/usr/local/lib/arvbox/
ADD runit /etc/runit
RUN chown -R 1000:1000 /usr/src && /usr/local/lib/arvbox/createusers.sh
+RUN sudo -u arvbox /var/lib/arvbox/service/composer/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/sso/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/api/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/workbench/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/doc/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/vm/run-service --only-deps
-RUN sudo -u arvbox /var/lib/arvbox/service/keep-web/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/keepproxy/run-service --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/arv-git-httpd/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/crunch-dispatch-local/run-service --only-deps
+RUN sudo -u arvbox /var/lib/arvbox/service/websockets/run-service --only-deps
RUN sudo -u arvbox /usr/local/lib/arvbox/keep-setup.sh --only-deps
RUN sudo -u arvbox /var/lib/arvbox/service/sdk/run-service
set -eux -o pipefail
. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunchstat"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/sdk/go/crunchrunner"
-install bin/crunchstat bin/crunchrunner /usr/local/bin
+install $GOPATH/bin/crunchstat $GOPATH/bin/crunchrunner /usr/local/bin
if test -s /var/lib/arvados/api_rails_env ; then
RAILS_ENV=$(cat /var/lib/arvados/api_rails_env)
--- /dev/null
+#!/bin/bash
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+mkdir -p /var/lib/gopath
+cd /var/lib/gopath
+
+export GOPATH=$PWD
+mkdir -p "$GOPATH/src/git.curoverse.com"
+ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
+
+flock /var/lib/gopath/gopath.lock go get -t github.com/kardianos/govendor
+cd "$GOPATH/src/git.curoverse.com/arvados.git"
+flock /var/lib/gopath/gopath.lock go get -v -d ...
+flock /var/lib/gopath/gopath.lock "$GOPATH/bin/govendor" sync
set -eux -o pipefail
. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepstore"
-install bin/keepstore /usr/local/bin
+install $GOPATH/bin/keepstore /usr/local/bin
if test "$1" = "--only-deps" ; then
exit
set -ex -o pipefail
. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/arv-git-httpd"
-install bin/arv-git-httpd /usr/local/bin
+install $GOPATH/bin/arv-git-httpd /usr/local/bin
if test "$1" = "--only-deps" ; then
exit
cd /usr/src/composer
-npm -d install yarn
-
-PATH=$PATH:/usr/src/composer/node_modules/.bin
+npm -d install --prefix /usr/local --global yarn
yarn install
-if test "$1" != "--only-deps" ; then
- echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml
- exec ng serve --host 0.0.0.0 --port 4200 --env=webdev
+if test "$1" = "--only-deps" ; then
+ exit
fi
+
+echo "apiEndPoint: https://${localip}:${services[api]}" > /usr/src/composer/src/composer.yml
+exec node_modules/.bin/ng serve --host 0.0.0.0 --port 4200 --env=webdev
# SPDX-License-Identifier: AGPL-3.0
exec 2>&1
-set -eux -o pipefail
+set -ex -o pipefail
. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-run"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/crunch-dispatch-local"
-install bin/crunch-run bin/crunch-dispatch-local /usr/local/bin
+install $GOPATH/bin/crunch-run $GOPATH/bin/crunch-dispatch-local /usr/local/bin
+
+if test "$1" = "--only-deps" ; then
+ exit
+fi
cat > /usr/local/bin/crunch-run.sh <<EOF
#!/bin/sh
set -ex -o pipefail
. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keep-web"
-install bin/keep-web /usr/local/bin
+install $GOPATH/bin/keep-web /usr/local/bin
if test "$1" = "--only-deps" ; then
exit
set -ex -o pipefail
. /usr/local/lib/arvbox/common.sh
+. /usr/local/lib/arvbox/go-setup.sh
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
-
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/keepproxy"
-install bin/keepproxy /usr/local/bin
+install $GOPATH/bin/keepproxy /usr/local/bin
if test "$1" = "--only-deps" ; then
exit
flock /var/lib/arvados/createusers.lock /usr/local/lib/arvbox/createusers.sh
+make-ssl-cert generate-default-snakeoil --force-overwrite
+
. /usr/local/lib/arvbox/common.sh
chown -R $PGUSER:$PGGROUP /var/lib/postgresql
RAILS_ENV=development
fi
-mkdir -p /var/lib/gopath
-cd /var/lib/gopath
+. /usr/local/lib/arvbox/go-setup.sh
-export GOPATH=$PWD
-mkdir -p "$GOPATH/src/git.curoverse.com"
-ln -sfn "/usr/src/arvados" "$GOPATH/src/git.curoverse.com/arvados.git"
flock /var/lib/gopath/gopath.lock go get -t "git.curoverse.com/arvados.git/services/ws"
-install bin/ws /usr/local/bin/arvados-ws
+install $GOPATH/bin/ws /usr/local/bin/arvados-ws
if test "$1" = "--only-deps" ; then
exit
}
userIDToUUID[uID] = u.UUID
if cfg.Verbose {
- log.Printf("Seen user %q (%s)", u.Username, u.Email)
+ log.Printf("Seen user %q (%s)", u.Username, u.UUID)
}
}
return err
}
log.Printf("Found %d remote groups", len(remoteGroups))
+ if cfg.Verbose {
+ for groupUUID := range remoteGroups {
+ log.Printf("- Group %q: %d users", remoteGroups[groupUUID].Group.Name, len(remoteGroups[groupUUID].PreviousMembers))
+ }
+ }
membershipsRemoved := 0
Operator: "=",
Operand: group.UUID,
}, {
- Attr: "head_kind",
- Operator: "=",
- Operand: "arvados#user",
+ Attr: "head_uuid",
+ Operator: "like",
+ Operand: "%-tpzed-%",
}},
}
// User -> Group filter
Operator: "=",
Operand: group.UUID,
}, {
- Attr: "tail_kind",
- Operator: "=",
- Operand: "arvados#user",
+ Attr: "tail_uuid",
+ Operator: "like",
+ Operand: "%-tpzed-%",
}},
}
g2uLinks, err := GetAll(cfg.Client, "links", g2uFilter, &LinkList{})
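Context for the filter change above: the old head_kind/tail_kind clauses do not match links that point at federated users, whereas every Arvados user UUID, local or remote, carries the -tpzed- type infix, so a LIKE match on the UUID catches both. A tiny standalone check of that UUID convention (example UUIDs are illustrative):

package main

import (
	"fmt"
	"strings"
)

// Arvados object UUIDs look like <cluster id>-<type infix>-<15 chars>;
// user records use the "tpzed" infix on every cluster, which is what the
// "like %-tpzed-%" operands above rely on.
func isUserUUID(uuid string) bool {
	return strings.Contains(uuid, "-tpzed-")
}

func main() {
	fmt.Println(isUserUUID("zzzzz-tpzed-xurymjxw79nv3jz")) // local user: true
	fmt.Println(isUserUUID("zbbbb-tpzed-xurymjxw79nv3jz")) // federated user: true
	fmt.Println(isUserUUID("zzzzz-j7d0g-publicfavorites")) // group: false
}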
// RemoveMemberFromGroup remove all links related to the membership
func RemoveMemberFromGroup(cfg *ConfigParams, user arvados.User, group arvados.Group) error {
if cfg.Verbose {
- log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Email, user.UUID, group.Name, group.UUID)
+ log.Printf("Getting group membership links for user %q (%s) on group %q (%s)", user.Username, user.UUID, group.Name, group.UUID)
}
var links []interface{}
// Search for all group<->user links (both ways)
c.Assert(len(s.users), Not(Equals), 0)
}
-// Clean any membership link and remote group created by the test
func (s *TestSuite) TearDownTest(c *C) {
var dst interface{}
// Reset database to fixture state after every test run.
var _ = Suite(&TestSuite{})
-// MakeTempCVSFile creates a temp file with data as comma separated values
+// MakeTempCSVFile creates a temp file with data as comma separated values
func MakeTempCSVFile(data [][]string) (f *os.File, err error) {
f, err = ioutil.TempFile("", "test_sync_remote_groups")
if err != nil {
// The absence of a user membership on the CSV file implies its removal
func (s *TestSuite) TestMembershipRemoval(c *C) {
- activeUserEmail := s.users[arvadostest.ActiveUserUUID].Email
- activeUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+ localUserEmail := s.users[arvadostest.ActiveUserUUID].Email
+ localUserUUID := s.users[arvadostest.ActiveUserUUID].UUID
+ remoteUserEmail := s.users[arvadostest.FederatedActiveUserUUID].Email
+ remoteUserUUID := s.users[arvadostest.FederatedActiveUserUUID].UUID
data := [][]string{
- {"TestGroup1", activeUserEmail},
- {"TestGroup2", activeUserEmail},
+ {"TestGroup1", localUserEmail},
+ {"TestGroup1", remoteUserEmail},
+ {"TestGroup2", localUserEmail},
+ {"TestGroup2", remoteUserEmail},
}
tmpfile, err := MakeTempCSVFile(data)
c.Assert(err, IsNil)
groupUUID, err := RemoteGroupExists(s.cfg, groupName)
c.Assert(err, IsNil)
c.Assert(groupUUID, Not(Equals), "")
- c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true)
+ c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true)
+ c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true)
}
- // New CSV with one previous membership missing
+ // New CSV with some previous memberships missing
data = [][]string{
- {"TestGroup1", activeUserEmail},
+ {"TestGroup1", localUserEmail},
+ {"TestGroup2", remoteUserEmail},
}
tmpfile2, err := MakeTempCSVFile(data)
c.Assert(err, IsNil)
s.cfg.Path = tmpfile2.Name()
err = doMain(s.cfg)
c.Assert(err, IsNil)
- // Confirm TestGroup1 membership still exist
+ // Confirm TestGroup1 memberships
groupUUID, err := RemoteGroupExists(s.cfg, "TestGroup1")
c.Assert(err, IsNil)
c.Assert(groupUUID, Not(Equals), "")
- c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, true)
- // Confirm TestGroup2 membership was removed
+ c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, true)
+ c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, false)
+ // Confirm TestGroup2 memberships
groupUUID, err = RemoteGroupExists(s.cfg, "TestGroup2")
c.Assert(err, IsNil)
c.Assert(groupUUID, Not(Equals), "")
- c.Assert(GroupMembershipExists(s.cfg.Client, activeUserUUID, groupUUID), Equals, false)
+ c.Assert(GroupMembershipExists(s.cfg.Client, localUserUUID, groupUUID), Equals, false)
+ c.Assert(GroupMembershipExists(s.cfg.Client, remoteUserUUID, groupUUID), Equals, true)
}
// If a group doesn't exist on the system, create it before adding users