Merge branch '11925-nodemanager-watchdog-test' refs #11925
author Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 9 Aug 2017 19:11:35 +0000 (15:11 -0400)
committer Peter Amstutz <peter.amstutz@curoverse.com>
Wed, 9 Aug 2017 19:11:40 +0000 (15:11 -0400)
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <peter.amstutz@curoverse.com>

14 files changed:
build/run-build-packages.sh
build/run-library.sh
sdk/cli/bin/crunch-job
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/tests/test_submit.py
sdk/go/arvados/container.go
sdk/go/arvados/error.go
sdk/perl/lib/Arvados/Request.pm
services/keepstore/s3_volume.go
services/nodemanager/arvnodeman/jobqueue.py
services/ws/event_source.go
services/ws/handler.go
services/ws/session_v0.go
vendor/vendor.json

index 39c40934311c9786ba0ed7786818807d77086ab6..a0a6c5f2645f6a17ca201f51672e160f29d37e86 100755 (executable)
@@ -340,6 +340,7 @@ fi
 # Go binaries
 cd $WORKSPACE/packages/$TARGET
 export GOPATH=$(mktemp -d)
+go get -v github.com/kardianos/govendor
 package_go_binary sdk/go/crunchrunner crunchrunner \
     "Crunchrunner executes a command inside a container and uploads the output"
 package_go_binary services/arv-git-httpd arvados-git-httpd \
index ae5ad6d49bff34557c68eacebb8976d97ab4b2db..cf7755b68de780631cee4319ea720160146ffdff 100755 (executable)
@@ -103,19 +103,27 @@ package_go_binary() {
 
     mkdir -p "$GOPATH/src/git.curoverse.com"
     ln -sfn "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+    (cd "$GOPATH/src/git.curoverse.com/arvados.git" && "$GOPATH/bin/govendor" sync -v)
 
     cd "$GOPATH/src/git.curoverse.com/arvados.git/$src_path"
     local version="$(version_from_git)"
     local timestamp="$(timestamp_from_git)"
 
-    # If the command imports anything from the Arvados SDK, bump the
-    # version number and build a new package whenever the SDK changes.
+    # Update the version number and build a new package if the vendor
+    # bundle has changed, or the command imports anything from the
+    # Arvados SDK and the SDK has changed.
+    declare -a checkdirs=(vendor)
     if grep -qr git.curoverse.com/arvados .; then
-        cd "$GOPATH/src/git.curoverse.com/arvados.git/sdk/go"
-        if [[ $(timestamp_from_git) -gt "$timestamp" ]]; then
+        checkdirs+=(sdk/go)
+    fi
+    for dir in ${checkdirs[@]}; do
+        cd "$GOPATH/src/git.curoverse.com/arvados.git/$dir"
+        ts="$(timestamp_from_git)"
+        if [[ "$ts" -gt "$timestamp" ]]; then
             version=$(version_from_git)
+            timestamp="$ts"
         fi
-    fi
+    done
 
     cd $WORKSPACE/packages/$TARGET
     test_package_presence $prog $version go
index 5a92176e7f02fba11525190ccee511339819e1d2..5e6c3a084ed49d4f5d9e8beff6d9e38f815a2c5c 100755 (executable)
@@ -1544,7 +1544,7 @@ sub preprocess_stderr
         $st->{node}->{fail_count}++;
       }
     }
-    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b)/i) {
+    elsif ($line =~ /srun: error: .*?\b(Node failure on|Aborting, .*?\bio error\b|cannot communicate with node .* aborting job)/i) {
       $jobstep[$jobstepidx]->{tempfail} = 1;
       if (defined($job_slot_index)) {
         $slot[$job_slot_index]->{node}->{fail_count}++;
index 4ab65d9d8774708613787b3a694f64bf876004da..769a63bce3f56763e7fa1767317d5af9828a03d0 100644 (file)
@@ -363,6 +363,9 @@ class RunnerContainer(Runner):
         if self.arvrunner.trash_intermediate:
             command.append("--trash-intermediate")
 
+        if self.arvrunner.project_uuid:
+            command.append("--project-uuid="+self.arvrunner.project_uuid)
+
         command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
 
         container_req["command"] = command
index 8ab0a8de9c2448e999475b7ac8735dcedc928099..49545a83dc7ac34eea9acc11dc3e022f2839f22c 100644 (file)
@@ -860,6 +860,31 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
 
 
+    @stubs
+    def test_submit_container_project(self, stubs):
+        project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+        capture_stdout = cStringIO.StringIO()
+        try:
+            exited = arvados_cwl.main(
+                ["--submit", "--no-wait", "--api=containers", "--debug", "--project-uuid="+project_uuid,
+                 "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+                capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+            self.assertEqual(exited, 0)
+        except:
+            logging.exception("")
+
+        expect_container = copy.deepcopy(stubs.expect_container_spec)
+        expect_container["owner_uuid"] = project_uuid
+        expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
+                                       '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid,
+                                       '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+        stubs.api.container_requests().create.assert_called_with(
+            body=JsonDiffMatcher(expect_container))
+        self.assertEqual(capture_stdout.getvalue(),
+                         stubs.expect_container_request_uuid + '\n')
+
+
     @stubs
     def test_submit_job_runner_image(self, stubs):
         capture_stdout = cStringIO.StringIO()
index 7d39d678f8ec02c52f6446461c4de9f8e95c142a..7e588be17bb16c04cdbd6098b8dbff8f7c599d18 100644 (file)
@@ -31,7 +31,7 @@ type Mount struct {
        Path              string      `json:"path"`
        Content           interface{} `json:"content"`
        ExcludeFromOutput bool        `json:"exclude_from_output"`
-       Capacity          int64       `json:capacity`
+       Capacity          int64       `json:"capacity"`
 }
 
 // RuntimeConstraints specify a container's compute resources (RAM,
index 29eebdbf729d557a88e121b582cdd78171e31bdd..773a2e6f9c7d787406511f85a6a5585596153738 100644 (file)
@@ -21,7 +21,7 @@ type TransactionError struct {
 }
 
 func (e TransactionError) Error() (s string) {
-       s = fmt.Sprintf("request failed: %s", e.URL)
+       s = fmt.Sprintf("request failed: %s", e.URL.String())
        if e.Status != "" {
                s = s + ": " + e.Status
        }
index 03d542805161481f4023c7c9084a2e0cce3c7b16..4523f7d6b3ac38561e17c50b889201166f22baad 100644 (file)
@@ -46,9 +46,12 @@ sub process_request
     $self->{'req'} = new HTTP::Request (%req);
     $self->{'req'}->header('Authorization' => ('OAuth2 ' . $self->{'authToken'})) if $self->{'authToken'};
     $self->{'req'}->header('Accept' => 'application/json');
+
+    # allow_nonref lets us encode JSON::true and JSON::false, see #12078
+    my $json = JSON->new->allow_nonref;
     my ($p, $v);
     while (($p, $v) = each %{$self->{'queryParams'}}) {
-        $content{$p} = (ref($v) eq "") ? $v : JSON::encode_json($v);
+        $content{$p} = (ref($v) eq "") ? $v : $json->encode($v);
     }
     my $content;
     while (($p, $v) = each %content) {
index 0fe773a59e278b93264bf1d63457a14d9b709ef8..0ab3e969a0ebaa3f0d007e0c764a1e7507f6ec8a 100644 (file)
@@ -133,7 +133,7 @@ func init() {
                &s3UnsafeDelete,
                "s3-unsafe-delete",
                false,
-               "EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.")
+               "EXPERIMENTAL. Enable deletion (garbage collection) even when trash lifetime is zero, even though there are known race conditions that can cause data loss.")
 }
 
 // S3Volume implements Volume using an S3 bucket.
index fa1ea983015f0e041ea258bba085eabda1924d77..4d2d3e0c0ace3e6ff9db5832d3f8a9dcc4b7ad9a 100644 (file)
@@ -156,7 +156,7 @@ class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
             for out in squeue_out.splitlines():
                 try:
                     cpu, ram, disk, reason, jobname = out.split("|", 4)
-                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason):
+                    if ("ReqNodeNotAvail" in reason) or ("Resources" in reason) or ("Priority" in reason):
                         queuelist.append({
                             "uuid": jobname,
                             "runtime_constraints": {
index edeb647e4628e675be696cb68f4b61892b4cc606..cfb828b2a5d84c6d16407866374e1f4900185f84 100644 (file)
@@ -248,7 +248,8 @@ func (ps *pgEventSource) DB() *sql.DB {
 }
 
 func (ps *pgEventSource) DBHealth() error {
-       ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+       defer cancel()
        var i int
        return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
 }
index f9f7f53edc58430f231e9a52d5d95bb1a025084a..d527c39ba1c4eeb12c0cbae63526150da27f096d 100644 (file)
@@ -60,6 +60,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // Receive websocket frames from the client and pass them to
        // sess.Receive().
        go func() {
+               defer cancel()
                buf := make([]byte, 2<<20)
                for {
                        select {
@@ -75,16 +76,14 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                err = errFrameTooBig
                        }
                        if err != nil {
-                               if err != io.EOF {
+                               if err != io.EOF && ctx.Err() == nil {
                                        log.WithError(err).Info("read error")
                                }
-                               cancel()
                                return
                        }
                        err = sess.Receive(buf)
                        if err != nil {
                                log.WithError(err).Error("sess.Receive() failed")
-                               cancel()
                                return
                        }
                }
@@ -94,6 +93,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // sess.EventMessage() as needed, and send them to the client
        // as websocket frames.
        go func() {
+               defer cancel()
                for {
                        var ok bool
                        var data interface{}
@@ -119,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                buf, err = sess.EventMessage(e)
                                if err != nil {
                                        log.WithError(err).Error("EventMessage failed")
-                                       cancel()
-                                       break
+                                       return
                                } else if len(buf) == 0 {
                                        log.Debug("skip")
                                        continue
@@ -135,9 +134,10 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                        t0 := time.Now()
                        _, err = ws.Write(buf)
                        if err != nil {
-                               log.WithError(err).Error("write failed")
-                               cancel()
-                               break
+                               if ctx.Err() == nil {
+                                       log.WithError(err).Error("write failed")
+                               }
+                               return
                        }
                        log.Debug("sent")
 
@@ -159,6 +159,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
        // is done/cancelled or the incoming event stream ends. Shut
        // down the handler if the outgoing queue fills up.
        go func() {
+               defer cancel()
                ticker := time.NewTicker(h.PingTimeout)
                defer ticker.Stop()
 
@@ -178,10 +179,8 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                        default:
                                        }
                                }
-                               continue
                        case e, ok := <-incoming.Channel():
                                if !ok {
-                                       cancel()
                                        return
                                }
                                if !sess.Filter(e) {
@@ -191,7 +190,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
                                case queue <- e:
                                default:
                                        log.WithError(errQueueFull).Error("terminate")
-                                       cancel()
                                        return
                                }
                        }
index 58c64231cb53c1204ceed70b0ea030a7050ebb95..4fbfc489cf30fe0e56425e37d909c250f83d967d 100644 (file)
@@ -205,6 +205,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
                        // client will probably reconnect and do the
                        // same thing all over again.
                        time.Sleep(100 * time.Millisecond)
+                       if sess.ws.Request().Context().Err() != nil {
+                               // Session terminated while we were sleeping
+                               return
+                       }
                }
                now := time.Now()
                e := &event{
index 958c81d4df8f408ade30f5047c2996af35e8c271..f9c46e7325aed6358e1cc8296d0a364fea27a908 100644 (file)
@@ -3,22 +3,25 @@
        "ignore": "test",
        "package": [
                {
-                       "checksumSHA1": "b68aaMZImS90FjnReAxpbp20FGA=",
+                       "checksumSHA1": "jf7K+UTQNIzRdlG5F4zX/8b++/E=",
                        "origin": "github.com/curoverse/goamz/aws",
                        "path": "github.com/AdRoll/goamz/aws",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "ey9ddXTW9dncjJz/COKpeYm+sgg=",
+                       "checksumSHA1": "9nUwQXI+pNxZo6bnR7NslpMpfPI=",
                        "origin": "github.com/curoverse/goamz/s3",
                        "path": "github.com/AdRoll/goamz/s3",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
-                       "checksumSHA1": "pDHYVqUQtRsPYw/X4kUrdK7pxMs=",
+                       "checksumSHA1": "tvxbsTkdjB0C/uxEglqD6JfVnMg=",
                        "origin": "github.com/curoverse/goamz/s3/s3test",
                        "path": "github.com/AdRoll/goamz/s3/s3test",
-                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9"
+                       "revision": "21e563311c2dc5ac53464a2c31cb91fb833c6cb9",
+                       "revisionTime": "2017-07-27T13:52:37Z"
                },
                {
                        "checksumSHA1": "Rjy2uYZkQ8Kjht6ZFU0qzm2I/kI=",
                        "revisionTime": "2017-03-24T20:46:54Z"
                },
                {
-                       "checksumSHA1": "Gk3jTNQ5uGDUE0WMJFWcYz9PMps=",
+                       "checksumSHA1": "q5SZBWFVC3wOIzftf+l/h5WLG1k=",
                        "path": "github.com/lib/pq/oid",
                        "revision": "2704adc878c21e1329f46f6e56a1c387d788ff94",
                        "revisionTime": "2017-03-24T20:46:54Z"
                        "revisionTime": "2017-05-12T22:20:15Z"
                },
                {
-                       "checksumSHA1": "ENl6I8+3AaBanbn9CVExMjDTHPc=",
+                       "checksumSHA1": "dUfdXzRJupI9VpqNR2LlppeZvLc=",
                        "origin": "github.com/docker/docker/vendor/golang.org/x/sys/unix",
                        "path": "golang.org/x/sys/unix",
                        "revision": "280327cb4d1e1fe4f118d00596ce0b3a6ae6d07e",