13219: Moves time out code from dispatch to crunch-run
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 27 Jun 2018 20:43:58 +0000 (17:43 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Wed, 27 Jun 2018 20:43:58 +0000 (17:43 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

sdk/go/arvados/container.go
sdk/go/dispatch/dispatch.go
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 1bdf0890230e96c27d9f77ae0239bfdd3b65b403..210ed9981c07292ec3c1508da978eaac351acae7 100644 (file)
@@ -20,7 +20,6 @@ type Container struct {
        OutputPath           string               `json:"output_path"`
        Priority             int                  `json:"priority"`
        RuntimeConstraints   RuntimeConstraints   `json:"runtime_constraints"`
-       StartedAt            time.Time            `json:"started_at"`
        State                ContainerState       `json:"state"`
        SchedulingParameters SchedulingParameters `json:"scheduling_parameters"`
 }
index ca2dbc48d4053f4e3036f0db6bcdaf4e03a0e064..3289c67b013f37a67ae8ddeaa52d3fd74abe34e5 100644 (file)
@@ -195,13 +195,6 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
                        case Queued:
                                tracker.close()
                        case Locked, Running:
-                               if c.SchedulingParameters.MaxRunTime > 0 {
-                                       maxRunTime := time.Duration(c.SchedulingParameters.MaxRunTime) * time.Second
-                                       if time.Since(c.StartedAt) >= maxRunTime {
-                                               // Time's up, schedule container for cancellation
-                                               c.Priority = 0
-                                       }
-                               }
                                tracker.update(c)
                        case Cancelled, Complete:
                                tracker.close()
index 2f9ccf52460a667215cdfb9156b7df56605712a5..adce853a531e0f5012d88796613b30e4ff5a33d3 100644 (file)
@@ -1074,10 +1074,14 @@ func (runner *ContainerRunner) StartContainer() error {
 // WaitFinish waits for the container to terminate, capture the exit code, and
 // close the stdout/stderr logging.
 func (runner *ContainerRunner) WaitFinish() error {
+       var runTimeExceeded <-chan time.Time
        runner.CrunchLog.Print("Waiting for container to finish")
 
        waitOk, waitErr := runner.Docker.ContainerWait(context.TODO(), runner.ContainerID, dockercontainer.WaitConditionNotRunning)
        arvMountExit := runner.ArvMountExit
+       if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
+               runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
+       }
        for {
                select {
                case waitBody := <-waitOk:
@@ -1098,6 +1102,11 @@ func (runner *ContainerRunner) WaitFinish() error {
                        // arvMountExit will always be ready now that
                        // it's closed, but that doesn't interest us.
                        arvMountExit = nil
+
+               case <-runTimeExceeded:
+                       runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
+                       runner.stop(nil)
+                       runTimeExceeded = nil
                }
        }
 }
index c76682f1c69be0297606f88ceaaa8b8aa260d71a..8ad487d7719b6a0dc4936ddda6312f8e0ec2948f 100644 (file)
@@ -793,7 +793,7 @@ func (s *TestSuite) TestFullRunHello(c *C) {
     "mounts": {"/tmp": {"kind": "tmp"} },
     "output_path": "/tmp",
     "priority": 1,
-    "runtime_constraints": {}
+       "runtime_constraints": {}
 }`, nil, 0, func(t *TestDockerClient) {
                t.logWriter.Write(dockerLog(1, "hello world\n"))
                t.logWriter.Close()
@@ -805,6 +805,26 @@ func (s *TestSuite) TestFullRunHello(c *C) {
 
 }
 
+func (s *TestSuite) TestRunTimeExceeded(c *C) {
+       api, _, _ := s.fullRunHelper(c, `{
+    "command": ["sleep", "3"],
+    "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+    "cwd": ".",
+    "environment": {},
+    "mounts": {"/tmp": {"kind": "tmp"} },
+    "output_path": "/tmp",
+    "priority": 1,
+       "runtime_constraints": {},
+       "scheduling_parameters":{"max_run_time": 1}
+}`, nil, 0, func(t *TestDockerClient) {
+               time.Sleep(3 * time.Second)
+               t.logWriter.Close()
+       })
+
+       c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
+       c.Check(api.Logs["crunch-run"].String(), Matches, "(?ms).*maximum run time exceeded.*")
+}
+
 func (s *TestSuite) TestCrunchstat(c *C) {
        api, _, _ := s.fullRunHelper(c, `{
                "command": ["sleep", "1"],