10671: Merge branch 'master' into 10671-pipeline-instance-finish-time
author Lucas Di Pentima <lucas@curoverse.com>
Wed, 21 Dec 2016 17:28:41 +0000 (14:28 -0300)
committer Lucas Di Pentima <lucas@curoverse.com>
Wed, 21 Dec 2016 17:28:41 +0000 (14:28 -0300)
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/tests/test_job.py
sdk/cwl/tests/test_submit.py
sdk/dev-jobs.dockerfile
services/crunch-run/crunchrun.go
services/keepstore/azure_blob_volume.go
services/keepstore/azure_blob_volume_test.go

diff --git a/sdk/cwl/arvados_cwl/arvjob.py b/sdk/cwl/arvados_cwl/arvjob.py
index aba2f21ac96ea66220b4e9c5bf21e39f2f702338..04e94aececd05468b8452a19ce732a51b96038ae 100644
@@ -24,6 +24,8 @@ metrics = logging.getLogger('arvados.cwl-runner.metrics')
 
 crunchrunner_re = re.compile(r"^\S+ \S+ \d+ \d+ stderr \S+ \S+ crunchrunner: \$\(task\.(tmpdir|outdir|keep)\)=(.*)")
 
+crunchrunner_git_commit = 'a3f2cb186e437bfce0031b024b2157b73ed2717d'
+
 class ArvadosJob(object):
     """Submit and manage a Crunch job for executing a CWL CommandLineTool."""
 
@@ -109,7 +111,7 @@ class ArvadosJob(object):
 
         filters = [["repository", "=", "arvados"],
                    ["script", "=", "crunchrunner"],
-                   ["script_version", "in git", "9e5b98e8f5f4727856b53447191f9c06e3da2ba6"]]
+                   ["script_version", "in git", crunchrunner_git_commit]]
         if not self.arvrunner.ignore_docker_for_reuse:
             filters.append(["docker_image_locator", "in docker", runtime_constraints["docker_image"]])
 
@@ -121,7 +123,7 @@ class ArvadosJob(object):
                         "script": "crunchrunner",
                         "repository": "arvados",
                         "script_version": "master",
-                        "minimum_script_version": "9e5b98e8f5f4727856b53447191f9c06e3da2ba6",
+                        "minimum_script_version": crunchrunner_git_commit,
                         "script_parameters": {"tasks": [script_parameters]},
                         "runtime_constraints": runtime_constraints
                     },
@@ -247,7 +249,8 @@ class RunnerJob(Runner):
 
         return {
             "script": "cwl-runner",
-            "script_version": __version__,
+            "script_version": "master",
+            "minimum_script_version": "570509ab4d2ef93d870fd2b1f2eab178afb1bad9",
             "repository": "arvados",
             "script_parameters": self.job_order,
             "runtime_constraints": {
diff --git a/sdk/cwl/arvados_cwl/crunch_script.py b/sdk/cwl/arvados_cwl/crunch_script.py
index 849b177aebbd7c4f5a507a8d0bb05ec915cf3b58..6f84d6825190b6df6fc930d4e10989e3ed936b38 100644
@@ -90,6 +90,7 @@ def run():
         args.quiet = False
         args.ignore_docker_for_reuse = False
         args.basedir = os.getcwd()
+        args.name = None
         args.cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]}
         outputObj = runner.arv_executor(t, job_order_object, **vars(args))
     except Exception as e:
diff --git a/sdk/cwl/tests/test_job.py b/sdk/cwl/tests/test_job.py
index 7dbc9c8ca101bddb96735053bda5f83049a82368..15da596eae2ffa2cde76eca07c4a7e098e5aa19b 100644
@@ -12,6 +12,7 @@ import arvados_cwl
 import cwltool.process
 from schema_salad.ref_resolver import Loader
 from .mock_discovery import get_rootDesc
+from .matcher import JsonDiffMatcher
 
 if not os.getenv('ARVADOS_DEBUG'):
     logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
@@ -46,7 +47,7 @@ class TestJob(unittest.TestCase):
             for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
                 j.run(enable_reuse=enable_reuse)
                 runner.api.jobs().create.assert_called_with(
-                    body={
+                    body=JsonDiffMatcher({
                         'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                         'runtime_constraints': {},
                         'script_parameters': {
@@ -56,7 +57,7 @@ class TestJob(unittest.TestCase):
                             }],
                         },
                         'script_version': 'master',
-                        'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+                        'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
                         'repository': 'arvados',
                         'script': 'crunchrunner',
                         'runtime_constraints': {
@@ -65,11 +66,11 @@ class TestJob(unittest.TestCase):
                             'min_ram_mb_per_node': 1024,
                             'min_scratch_mb_per_node': 2048 # tmpdirSize + outdirSize
                         }
-                    },
+                    }),
                     find_or_create=enable_reuse,
                     filters=[['repository', '=', 'arvados'],
                              ['script', '=', 'crunchrunner'],
-                             ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+                             ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
                              ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]]
                 )
 
@@ -113,7 +114,7 @@ class TestJob(unittest.TestCase):
         for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
             j.run()
         runner.api.jobs().create.assert_called_with(
-            body={
+            body=JsonDiffMatcher({
                 'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
                 'runtime_constraints': {},
                 'script_parameters': {
@@ -124,7 +125,7 @@ class TestJob(unittest.TestCase):
                     }]
             },
             'script_version': 'master',
-                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+                'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
                 'repository': 'arvados',
                 'script': 'crunchrunner',
                 'runtime_constraints': {
@@ -134,11 +135,11 @@ class TestJob(unittest.TestCase):
                     'min_scratch_mb_per_node': 5024, # tmpdirSize + outdirSize
                     'keep_cache_mb_per_task': 512
                 }
-            },
+            }),
             find_or_create=True,
             filters=[['repository', '=', 'arvados'],
                      ['script', '=', 'crunchrunner'],
-                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+                     ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
                      ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]])
 
     @mock.patch("arvados.collection.CollectionReader")
@@ -275,8 +276,8 @@ class TestWorkflow(unittest.TestCase):
             subwf = f.read()
 
         runner.api.jobs().create.assert_called_with(
-            body={
-                'minimum_script_version': '9e5b98e8f5f4727856b53447191f9c06e3da2ba6',
+            body=JsonDiffMatcher({
+                'minimum_script_version': 'a3f2cb186e437bfce0031b024b2157b73ed2717d',
                 'repository': 'arvados',
                 'script_version': 'master',
                 'script': 'crunchrunner',
@@ -296,10 +297,10 @@ class TestWorkflow(unittest.TestCase):
                     'docker_image': 'arvados/jobs:'+arvados_cwl.__version__,
                     'min_ram_mb_per_node': 1024
                 },
-                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'},
+                'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}),
             filters=[['repository', '=', 'arvados'],
                      ['script', '=', 'crunchrunner'],
-                     ['script_version', 'in git', '9e5b98e8f5f4727856b53447191f9c06e3da2ba6'],
+                     ['script_version', 'in git', 'a3f2cb186e437bfce0031b024b2157b73ed2717d'],
                      ['docker_image_locator', 'in docker', 'arvados/jobs:'+arvados_cwl.__version__]],
             find_or_create=True)
 
diff --git a/sdk/cwl/tests/test_submit.py b/sdk/cwl/tests/test_submit.py
index 4418ee3fd153ac1d3ddf92a798ce7a0b28ace361..a39972b3b82f93dddf6d8a13463fdbec47a22d22 100644
@@ -125,7 +125,8 @@ def stubs(func):
                 '99999999999999999999999999999991+99/wf/submit_wf.cwl'
             },
             'repository': 'arvados',
-            'script_version': arvados_cwl.__version__,
+            'script_version': 'master',
+            'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
             'script': 'cwl-runner'
         }
         stubs.pipeline_component = stubs.expect_job_spec.copy()
@@ -147,7 +148,8 @@ def stubs(func):
                         'arv:enable_reuse': True
                     },
                     'repository': 'arvados',
-                    'script_version': arvados_cwl.__version__,
+                    'script_version': 'master',
+                    'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
                     'script': 'cwl-runner',
                     'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
                 }
@@ -1039,7 +1041,8 @@ class TestTemplateInputs(unittest.TestCase):
                     },
                 },
                 'repository': 'arvados',
-                'script_version': arvados_cwl.__version__,
+                'script_version': 'master',
+                'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
                 'script': 'cwl-runner',
             },
         },
diff --git a/sdk/dev-jobs.dockerfile b/sdk/dev-jobs.dockerfile
index bd80a0c1af25f74b21a158cdab99c68488af4d0d..38fefd0c4df3d0519f6834749842793ce539d8b8 100644
@@ -14,7 +14,7 @@ MAINTAINER Ward Vandewege <ward@curoverse.com>
 
 ENV DEBIAN_FRONTEND noninteractive
 
-RUN apt-get update -q && apt-get install -qy git python-pip python-virtualenv python-dev libcurl4-gnutls-dev libgnutls28-dev nodejs
+RUN apt-get update -q && apt-get install -qy git python-pip python-virtualenv python-dev libcurl4-gnutls-dev libgnutls28-dev nodejs python-pyasn1-modules
 
 RUN pip install -U setuptools
 
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index ebee1a8f1eb739f0de7096ef6d5176d66c15836c..e13033edb3920ff9de4c9fa5061e6d54471febfd 100644
@@ -751,10 +751,10 @@ func (runner *ContainerRunner) ContainerToken() (string, error) {
 func (runner *ContainerRunner) UpdateContainerFinal() error {
        update := arvadosclient.Dict{}
        update["state"] = runner.finalState
+       if runner.LogsPDH != nil {
+               update["log"] = *runner.LogsPDH
+       }
        if runner.finalState == "Complete" {
-               if runner.LogsPDH != nil {
-                       update["log"] = *runner.LogsPDH
-               }
                if runner.ExitCode != nil {
                        update["exit_code"] = *runner.ExitCode
                }
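
This crunchrun.go hunk moves the log update out of the Complete-only branch, so a container's log PDH is now recorded for any final state (notably Cancelled), while exit_code stays Complete-only. A minimal self-contained sketch of the resulting logic, assuming arvadosclient.Dict behaves like a plain map:

package main

import "fmt"

// Dict is a stand-in for arvadosclient.Dict (assumed to behave like a
// plain map); finalUpdate mirrors the post-patch UpdateContainerFinal.
type Dict map[string]interface{}

func finalUpdate(finalState string, logsPDH *string, exitCode *int) Dict {
        update := Dict{"state": finalState}
        if logsPDH != nil {
                update["log"] = *logsPDH // now recorded for Cancelled as well as Complete
        }
        if finalState == "Complete" && exitCode != nil {
                update["exit_code"] = *exitCode // still Complete-only
        }
        return update
}

func main() {
        pdh := "d41d8cd98f00b204e9800998ecf8427e+0"
        fmt.Println(finalUpdate("Cancelled", &pdh, nil))
        // map[log:d41d8cd98f00b204e9800998ecf8427e+0 state:Cancelled]
}
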
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 43cf83a07ead3db94b2620be74375c738d4e5d08..75344890ab082ba0ef837741b747ba0889e44f67 100644
@@ -187,7 +187,7 @@ func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int,
        }
        var deadline time.Time
        haveDeadline := false
-       size, err := v.get(loc, buf)
+       size, err := v.get(ctx, loc, buf)
        for err == nil && size == 0 && loc != "d41d8cd98f00b204e9800998ecf8427e" {
                // Seeing a brand new empty block probably means we're
                // in a race with CreateBlob, which under the hood
@@ -208,8 +208,12 @@ func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int,
                } else if time.Now().After(deadline) {
                        break
                }
-               time.Sleep(azureWriteRacePollTime)
-               size, err = v.get(loc, buf)
+               select {
+               case <-ctx.Done():
+                       return 0, ctx.Err()
+               case <-time.After(azureWriteRacePollTime):
+               }
+               size, err = v.get(ctx, loc, buf)
        }
        if haveDeadline {
                log.Printf("Race ended with size==%d", size)
@@ -217,7 +221,9 @@ func (v *AzureBlobVolume) Get(ctx context.Context, loc string, buf []byte) (int,
        return size, err
 }
 
-func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
+func (v *AzureBlobVolume) get(ctx context.Context, loc string, buf []byte) (int, error) {
+       ctx, cancel := context.WithCancel(ctx)
+       defer cancel()
        expectSize := len(buf)
        if azureMaxGetBytes < BlockSize {
                // Unfortunately the handler doesn't tell us how long the blob
@@ -239,10 +245,18 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
        // We'll update this actualSize if/when we get the last piece.
        actualSize := -1
        pieces := (expectSize + azureMaxGetBytes - 1) / azureMaxGetBytes
-       errors := make([]error, pieces)
+       errors := make(chan error, pieces)
        var wg sync.WaitGroup
        wg.Add(pieces)
        for p := 0; p < pieces; p++ {
+               // Each goroutine retrieves one piece. If we hit an
+               // error, it is sent to the errors chan so get() can
+               // return it -- but only if the error happens before
+               // ctx is done. This way, if ctx is done before we hit
+               // any other error (e.g., requesting client has hung
+               // up), we return the original ctx.Err() instead of
+               // the secondary errors from the transfers that got
+               // interrupted as a result.
                go func(p int) {
                        defer wg.Done()
                        startPos := p * azureMaxGetBytes
@@ -252,23 +266,51 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
                        }
                        var rdr io.ReadCloser
                        var err error
-                       if startPos == 0 && endPos == expectSize {
-                               rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
-                       } else {
-                               rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+                       gotRdr := make(chan struct{})
+                       go func() {
+                               defer close(gotRdr)
+                               if startPos == 0 && endPos == expectSize {
+                                       rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+                               } else {
+                                       rdr, err = v.bsClient.GetBlobRange(v.ContainerName, loc, fmt.Sprintf("%d-%d", startPos, endPos-1), nil)
+                               }
+                       }()
+                       select {
+                       case <-ctx.Done():
+                               go func() {
+                                       <-gotRdr
+                                       if err == nil {
+                                               rdr.Close()
+                                       }
+                               }()
+                               return
+                       case <-gotRdr:
                        }
                        if err != nil {
-                               errors[p] = err
+                               errors <- err
+                               cancel()
                                return
                        }
-                       defer rdr.Close()
+                       go func() {
+                               // Close the reader when the client
+                               // hangs up or another piece fails
+                               // (possibly interrupting ReadFull())
+                               // or when all pieces succeed and
+                               // get() returns.
+                               <-ctx.Done()
+                               rdr.Close()
+                       }()
                        n, err := io.ReadFull(rdr, buf[startPos:endPos])
                        if pieces == 1 && (err == io.ErrUnexpectedEOF || err == io.EOF) {
                                // If we don't know the actual size,
                                // and just tried reading 64 MiB, it's
                                // normal to encounter EOF.
                        } else if err != nil {
-                               errors[p] = err
+                               if ctx.Err() == nil {
+                                       errors <- err
+                               }
+                               cancel()
+                               return
                        }
                        if p == pieces-1 {
                                actualSize = startPos + n
@@ -276,10 +318,12 @@ func (v *AzureBlobVolume) get(loc string, buf []byte) (int, error) {
                }(p)
        }
        wg.Wait()
-       for _, err := range errors {
-               if err != nil {
-                       return 0, v.translateError(err)
-               }
+       close(errors)
+       if len(errors) > 0 {
+               return 0, v.translateError(<-errors)
+       }
+       if ctx.Err() != nil {
+               return 0, ctx.Err()
        }
        return actualSize, nil
 }
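
The comment inside the new get() describes the error-reporting discipline; here is a self-contained sketch of that fan-out pattern under illustrative names (fetchAll and fetch are not from the keepstore source). Each worker reports its error on a buffered channel only while the shared context is still live, then cancels the rest, so the caller sees the first real failure (or ctx.Err()) rather than the secondary errors from transfers that were interrupted as a result:

package main

import (
        "context"
        "fmt"
        "sync"
)

func fetchAll(ctx context.Context, pieces int, fetch func(context.Context, int) error) error {
        ctx, cancel := context.WithCancel(ctx)
        defer cancel()
        errs := make(chan error, pieces) // buffered so no worker blocks on send
        var wg sync.WaitGroup
        for p := 0; p < pieces; p++ {
                wg.Add(1)
                go func(p int) {
                        defer wg.Done()
                        if err := fetch(ctx, p); err != nil {
                                if ctx.Err() == nil {
                                        errs <- err // report only errors that precede cancellation
                                }
                                cancel() // interrupt the remaining pieces
                        }
                }(p)
        }
        wg.Wait()
        close(errs)
        if err, ok := <-errs; ok {
                return err
        }
        return ctx.Err() // nil unless the caller hung up first
}

func main() {
        err := fetchAll(context.Background(), 4, func(_ context.Context, p int) error {
                if p == 2 {
                        return fmt.Errorf("piece %d failed", p)
                }
                return nil
        })
        fmt.Println(err) // piece 2 failed
}
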
@@ -293,7 +337,23 @@ func (v *AzureBlobVolume) Compare(ctx context.Context, loc string, expect []byte
        if trashed {
                return os.ErrNotExist
        }
-       rdr, err := v.bsClient.GetBlob(v.ContainerName, loc)
+       var rdr io.ReadCloser
+       gotRdr := make(chan struct{})
+       go func() {
+               defer close(gotRdr)
+               rdr, err = v.bsClient.GetBlob(v.ContainerName, loc)
+       }()
+       select {
+       case <-ctx.Done():
+               go func() {
+                       <-gotRdr
+                       if err == nil {
+                               rdr.Close()
+                       }
+               }()
+               return ctx.Err()
+       case <-gotRdr:
+       }
        if err != nil {
                return v.translateError(err)
        }
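
Both get() and Compare() now use the same gotRdr trick to bound a blocking SDK call with a context. A self-contained sketch of the pattern, with illustrative names (openWithContext is not from the keepstore source): run the blocking call in its own goroutine, and on cancellation return immediately while leaving a cleanup goroutine that closes the reader whenever the abandoned call finally returns, so nothing leaks:

package main

import (
        "context"
        "fmt"
        "io"
        "io/ioutil"
        "strings"
        "time"
)

// openWithContext wraps any blocking open-style call (like
// bsClient.GetBlob) so the caller can give up when ctx is done
// without leaking the ReadCloser the call eventually produces.
func openWithContext(ctx context.Context, open func() (io.ReadCloser, error)) (io.ReadCloser, error) {
        var rdr io.ReadCloser
        var err error
        gotRdr := make(chan struct{})
        go func() {
                defer close(gotRdr)
                rdr, err = open() // may return long after ctx is done
        }()
        select {
        case <-ctx.Done():
                go func() {
                        <-gotRdr // wait out the abandoned call...
                        if err == nil {
                                rdr.Close() // ...and release whatever it opened
                        }
                }()
                return nil, ctx.Err()
        case <-gotRdr:
                return rdr, err
        }
}

func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
        defer cancel()
        _, err := openWithContext(ctx, func() (io.ReadCloser, error) {
                time.Sleep(time.Second) // simulate a slow storage request
                return ioutil.NopCloser(strings.NewReader("data")), nil
        })
        fmt.Println(err) // context deadline exceeded
}
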
@@ -306,7 +366,36 @@ func (v *AzureBlobVolume) Put(ctx context.Context, loc string, block []byte) err
        if v.ReadOnly {
                return MethodDisabledError
        }
-       return v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bytes.NewReader(block), nil)
+       // Send the block data through a pipe, so that (if we need to)
+       // we can close the pipe early and abandon our
+       // CreateBlockBlobFromReader() goroutine, without worrying
+       // about CreateBlockBlobFromReader() accessing our block
+       // buffer after we release it.
+       bufr, bufw := io.Pipe()
+       go func() {
+               io.Copy(bufw, bytes.NewReader(block))
+               bufw.Close()
+       }()
+       errChan := make(chan error)
+       go func() {
+               errChan <- v.bsClient.CreateBlockBlobFromReader(v.ContainerName, loc, uint64(len(block)), bufr, nil)
+       }()
+       select {
+       case <-ctx.Done():
+               theConfig.debugLogf("%s: taking CreateBlockBlobFromReader's input away: %s", v, ctx.Err())
+               // Our pipe might be stuck in Write(), waiting for
+               // io.Copy() to read. If so, un-stick it. This means
+               // CreateBlockBlobFromReader will get corrupt data,
+               // but that's OK: the size won't match, so the write
+               // will fail.
+               go io.Copy(ioutil.Discard, bufr)
+               // CloseWithError() will return once pending I/O is done.
+               bufw.CloseWithError(ctx.Err())
+               theConfig.debugLogf("%s: abandoning CreateBlockBlobFromReader goroutine", v)
+               return ctx.Err()
+       case err := <-errChan:
+               return err
+       }
 }
 
 // Touch updates the last-modified property of a block blob.
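
The comments inside the new Put() explain the pipe trick; for clarity, here is a self-contained sketch of the same abandon-a-blocked-uploader pattern (putWithAbandon and the upload callback are illustrative stand-ins for Put and CreateBlockBlobFromReader). The block is streamed through an io.Pipe, so on cancellation the caller can drain and close the pipe, return immediately, and be sure the abandoned goroutine never touches the caller's buffer again:

package main

import (
        "bytes"
        "context"
        "fmt"
        "io"
        "io/ioutil"
        "time"
)

func putWithAbandon(ctx context.Context, block []byte, upload func(io.Reader) error) error {
        bufr, bufw := io.Pipe()
        go func() {
                // Only this goroutine reads block, so the caller may
                // safely reuse the buffer once putWithAbandon returns.
                io.Copy(bufw, bytes.NewReader(block))
                bufw.Close()
        }()
        errChan := make(chan error, 1)
        go func() { errChan <- upload(bufr) }()
        select {
        case <-ctx.Done():
                // Un-stick a pending pipe Write(), then close with an error;
                // the uploader sees short/failed input and gives up harmlessly.
                go io.Copy(ioutil.Discard, bufr)
                bufw.CloseWithError(ctx.Err())
                return ctx.Err()
        case err := <-errChan:
                return err
        }
}

func main() {
        ctx, cancel := context.WithCancel(context.Background())
        cancel() // simulate the requesting client hanging up
        err := putWithAbandon(ctx, make([]byte, 1<<20), func(r io.Reader) error {
                time.Sleep(time.Second) // stand-in for a slow network upload
                _, err := io.Copy(ioutil.Discard, r)
                return err
        })
        fmt.Println(err) // context canceled
}
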
diff --git a/services/keepstore/azure_blob_volume_test.go b/services/keepstore/azure_blob_volume_test.go
index c5dbc8f5831402aa3e223391c3ad0ece918de0a3..232382c4216a60fd3c9f658a549957441805633a 100644
@@ -576,6 +576,70 @@ func TestAzureBlobVolumeCreateBlobRaceDeadline(t *testing.T) {
        }
 }
 
+func TestAzureBlobVolumeContextCancelGet(t *testing.T) {
+       testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               v.PutRaw(TestHash, TestBlock)
+               _, err := v.Get(ctx, TestHash, make([]byte, BlockSize))
+               return err
+       })
+}
+
+func TestAzureBlobVolumeContextCancelPut(t *testing.T) {
+       testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               return v.Put(ctx, TestHash, make([]byte, BlockSize))
+       })
+}
+
+func TestAzureBlobVolumeContextCancelCompare(t *testing.T) {
+       testAzureBlobVolumeContextCancel(t, func(ctx context.Context, v *TestableAzureBlobVolume) error {
+               v.PutRaw(TestHash, TestBlock)
+               return v.Compare(ctx, TestHash, TestBlock2)
+       })
+}
+
+func testAzureBlobVolumeContextCancel(t *testing.T, testFunc func(context.Context, *TestableAzureBlobVolume) error) {
+       defer func(t http.RoundTripper) {
+               http.DefaultTransport = t
+       }(http.DefaultTransport)
+       http.DefaultTransport = &http.Transport{
+               Dial: (&azStubDialer{}).Dial,
+       }
+
+       v := NewTestableAzureBlobVolume(t, false, 3)
+       defer v.Teardown()
+       v.azHandler.race = make(chan chan struct{})
+
+       ctx, cancel := context.WithCancel(context.Background())
+       allDone := make(chan struct{})
+       go func() {
+               defer close(allDone)
+               err := testFunc(ctx, v)
+               if err != context.Canceled {
+                       t.Errorf("got %T %q, expected %q", err, err, context.Canceled)
+               }
+       }()
+       releaseHandler := make(chan struct{})
+       select {
+       case <-allDone:
+               t.Error("testFunc finished without waiting for v.azHandler.race")
+       case <-time.After(10 * time.Second):
+               t.Error("timed out waiting to enter handler")
+       case v.azHandler.race <- releaseHandler:
+       }
+
+       cancel()
+
+       select {
+       case <-time.After(10 * time.Second):
+               t.Error("timed out waiting to cancel")
+       case <-allDone:
+       }
+
+       go func() {
+               <-releaseHandler
+       }()
+}
+
 func (v *TestableAzureBlobVolume) PutRaw(locator string, data []byte) {
        v.azHandler.PutRaw(v.ContainerName, locator, data)
 }
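
The test above drives cancellation through v.azHandler.race; the handler side of that hook is outside this diff, but a hypothetical sketch of how such a stub could park requests (type and method names here are invented for illustration):

package main

import "fmt"

// stubHandler is a hypothetical stand-in for azStubHandler: when race
// is non-nil, each request parks inside the handler until the test
// hands it a release channel and later closes that channel.
type stubHandler struct {
        race chan chan struct{}
}

func (h *stubHandler) serve() {
        if h.race != nil {
                release := make(chan struct{})
                h.race <- release // the test now knows a request is in flight
                <-release         // stay blocked until the test releases us
        }
        fmt.Println("request completed")
}

func main() {
        h := &stubHandler{race: make(chan chan struct{})}
        done := make(chan struct{})
        go func() { h.serve(); close(done) }()
        release := <-h.race // test side: a request has entered the handler
        close(release)      // release it (the real test does this after cancel())
        <-done
}
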