Merge branch '14012-arvput-check-cache'
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 13 Dec 2018 19:34:21 +0000 (16:34 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Thu, 13 Dec 2018 19:34:21 +0000 (16:34 -0300)
Closes #14012

Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

33 files changed:
apps/workbench/app/controllers/users_controller.rb
apps/workbench/test/controllers/users_controller_test.rb
build/run-build-docker-jobs-image.sh
doc/admin/upgrading.html.textile.liquid
doc/install/install-controller.html.textile.liquid
docker/jobs/apt.arvados.org.list
lib/dispatchcloud/node_size.go
lib/dispatchcloud/node_size_test.go
sdk/cwl/arvados_cwl/__init__.py
sdk/cwl/arvados_cwl/arvcontainer.py
sdk/cwl/arvados_cwl/arvjob.py
sdk/cwl/arvados_cwl/arvtool.py
sdk/cwl/arvados_cwl/arvworkflow.py
sdk/cwl/arvados_cwl/crunch_script.py
sdk/cwl/arvados_cwl/executor.py
sdk/cwl/arvados_cwl/runner.py
sdk/cwl/setup.py
sdk/cwl/tests/submit_test_job_missing.json [new file with mode: 0644]
sdk/cwl/tests/test_submit.py
sdk/go/arvados/contextgroup.go [new file with mode: 0644]
sdk/go/arvados/fs_collection.go
sdk/go/arvados/fs_collection_test.go
sdk/go/arvados/throttle.go [new file with mode: 0644]
sdk/python/arvados/arvfile.py
sdk/python/arvados/collection.py
sdk/python/arvados/commands/run.py
sdk/python/tests/test_arvfile.py
sdk/python/tests/test_collections.py
services/api/app/models/arvados_model.rb
services/api/app/models/container.rb
services/api/test/unit/container_test.rb
services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go

index 8cfc2c10f1c29eee67a11074acfabe70da19aaba..c954944e0b4b8ad75ff75a805caf2927893a8c50 100644 (file)
@@ -144,7 +144,7 @@ class UsersController < ApplicationController
                                       owner_uuid: @object.uuid
                                     }
                                   })
-    redirect_to root_url(api_token: resp[:api_token])
+    redirect_to root_url(api_token: "v2/#{resp[:uuid]}/#{resp[:api_token]}")
   end
 
   def home
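
Note: the "v2" token format is v2/<token uuid>/<token secret>. Embedding the api_client_authorization UUID lets newer components such as arvados-controller identify the token record (and its cluster of origin) directly, which the bare-secret form does not allow; the new test below checks that the redirect URL carries the URL-encoded "v2%2F" prefix.
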
index 50b35021c093f23facb414667e74b84890f311b0..393b864dc53a61f3a81a91af6abd49e836a5e831 100644 (file)
@@ -35,6 +35,14 @@ class UsersControllerTest < ActionController::TestCase
     assert_match /\/users\/welcome/, @response.redirect_url
   end
 
+  test "'log in as user' feature uses a v2 token" do
+    post :sudo, {
+      id: api_fixture('users')['active']['uuid']
+    }, session_for('admin_trustedclient')
+    assert_response :redirect
+    assert_match /api_token=v2%2F/, @response.redirect_url
+  end
+
   test "request shell access" do
     user = api_fixture('users')['spectator']
 
index b1e99fc66b27c40d3ac13542e6f7de76ba37e202..83bb5ae7165cb2cfd11879e85411253472887b26 100755 (executable)
@@ -118,6 +118,11 @@ timer_reset
 # clean up the docker build environment
 cd "$WORKSPACE"
 
+if [[ -z "$ARVADOS_BUILDING_VERSION" ]] && ! [[ -z "$version_tag" ]]; then
+       ARVADOS_BUILDING_VERSION="$version_tag"
+       ARVADOS_BUILDING_ITERATION="1"
+fi
+
 python_sdk_ts=$(cd sdk/python && timestamp_from_git)
 cwl_runner_ts=$(cd sdk/cwl && timestamp_from_git)
 
@@ -130,11 +135,25 @@ fi
 
 echo cwl_runner_version $cwl_runner_version python_sdk_version $python_sdk_version
 
+if [[ "${python_sdk_version}" != "${ARVADOS_BUILDING_VERSION}" ]]; then
+       python_sdk_version="${python_sdk_version}-2"
+else
+       python_sdk_version="${ARVADOS_BUILDING_VERSION}-${ARVADOS_BUILDING_ITERATION}"
+fi
+
+cwl_runner_version_orig=$cwl_runner_version
+
+if [[ "${cwl_runner_version}" != "${ARVADOS_BUILDING_VERSION}" ]]; then
+       cwl_runner_version="${cwl_runner_version}-4"
+else
+       cwl_runner_version="${ARVADOS_BUILDING_VERSION}-${ARVADOS_BUILDING_ITERATION}"
+fi
+
 cd docker/jobs
 docker build $NOCACHE \
-       --build-arg python_sdk_version=${python_sdk_version}-2 \
-       --build-arg cwl_runner_version=${cwl_runner_version}-4 \
-       -t arvados/jobs:$cwl_runner_version .
+       --build-arg python_sdk_version=${python_sdk_version} \
+       --build-arg cwl_runner_version=${cwl_runner_version} \
+       -t arvados/jobs:$cwl_runner_version_orig .
 
 ECODE=$?
 
@@ -157,9 +176,9 @@ if docker --version |grep " 1\.[0-9]\." ; then
     FORCE=-f
 fi
 if ! [[ -z "$version_tag" ]]; then
-    docker tag $FORCE arvados/jobs:$cwl_runner_version arvados/jobs:"$version_tag"
+    docker tag $FORCE arvados/jobs:$cwl_runner_version_orig arvados/jobs:"$version_tag"
 else
-    docker tag $FORCE arvados/jobs:$cwl_runner_version arvados/jobs:latest
+    docker tag $FORCE arvados/jobs:$cwl_runner_version_orig arvados/jobs:latest
 fi
 
 ECODE=$?
@@ -185,7 +204,7 @@ else
         if ! [[ -z "$version_tag" ]]; then
             docker_push arvados/jobs:"$version_tag"
         else
-           docker_push arvados/jobs:$cwl_runner_version
+           docker_push arvados/jobs:$cwl_runner_version_orig
            docker_push arvados/jobs:latest
         fi
         title "upload arvados images finished (`timer`)"
index 0941fef339273f903d71ceabdb0aa99813773124..6a3e000ca0165649b0418dba566a43999be1be09 100644 (file)
@@ -30,6 +30,14 @@ Note to developers: Add new items at the top. Include the date, issue number, co
 TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
 {% endcomment %}
 
+h3. v1.3.0 (2018-12-05)
+
+This release includes several database migrations, which will be executed automatically as part of the API server upgrade. On large Arvados installations, these migrations will take a while. We've seen the upgrade take 30 minutes or more on installations with a lot of collections.
+
+The @arvados-controller@ component now requires the @/etc/arvados/config.yml@ file to be present. See <a href="{{ site.baseurl }}/install/install-controller.html#configuration">the @arvados-controller@ installation instructions</a>.
+
+Support for the deprecated "jobs" API is broken in this release.  Users who rely on it should not upgrade.  This will be fixed in an upcoming 1.3.1 patch release; however, users are "encouraged to migrate":upgrade-crunch2.html, as support for the "jobs" API will be dropped in an upcoming release.  Users who are already using the "containers" API are not affected.
+
 h3. v1.2.1 (2018-11-26)
 
 There are no special upgrade notes for this release.
index ccb8d980aebc1f3f658a5ed603459ca15878736d..3e94b290d54076e77a12a44097061f6ed935f79f 100644 (file)
@@ -85,7 +85,7 @@ Restart Nginx to apply the new configuration.
 </code></pre>
 </notextile>
 
-h3. Configure arvados-controller
+h3(#configuration). Configure arvados-controller
 
 Create the cluster configuration file @/etc/arvados/config.yml@ using the following template.
 
index ae1a0862a67437e257fa1fdec2919473799ac56c..11b98e25bb0f3905ef299c1bc1d1a94f08c4a664 100644 (file)
@@ -1,3 +1,2 @@
 # apt.arvados.org
-deb http://apt.arvados.org/ jessie main
 deb http://apt.arvados.org/ jessie-dev main
index 1c36d6cf5bb770cb447b6f7f177d39c5ff7ef469..339e042c1aa0d15cb3f6cf1994c6146bf34de11d 100644 (file)
@@ -8,7 +8,9 @@ import (
        "errors"
        "log"
        "os/exec"
+       "regexp"
        "sort"
+       "strconv"
        "strings"
        "time"
 
@@ -27,6 +29,67 @@ type ConstraintsNotSatisfiableError struct {
        AvailableTypes []arvados.InstanceType
 }
 
+var pdhRegexp = regexp.MustCompile(`^[0-9a-f]{32}\+(\d+)$`)
+
+// estimateDockerImageSize estimates how much disk space will be used
+// by a Docker image, given the PDH of a collection containing a
+// Docker image that was created by "arv-keepdocker".  Returns
+// estimated number of bytes of disk space that should be reserved.
+func estimateDockerImageSize(collectionPDH string) int64 {
+       m := pdhRegexp.FindStringSubmatch(collectionPDH)
+       if m == nil {
+               log.Printf("estimateDockerImageSize: '%v' did not match pdhRegexp, returning 0", collectionPDH)
+               return 0
+       }
+       n, err := strconv.ParseInt(m[1], 10, 64)
+       if err != nil || n < 122 {
+               log.Printf("estimateDockerImageSize: short manifest %v or error (%v), returning 0", n, err)
+               return 0
+       }
+       // To avoid having to fetch the collection, take advantage of
+       // the fact that the manifest storing a container image
+       // uploaded by arv-keepdocker has a predictable format, which
+       // allows us to estimate the size of the image based on just
+       // the size of the manifest.
+       //
+       // Use the following heuristic:
+       // - Start with the length of the manifest (n)
+       // - Subtract 80 characters for the filename and file segment
+       // - Divide by 42 to get the number of block identifiers ('hash\+size\ ' is 32+1+8+1)
+       // - Assume each block is full, multiply by 64 MiB
+       return ((n - 80) / 42) * (64 * 1024 * 1024)
+}
+
+// EstimateScratchSpace estimates how much available disk space (in
+// bytes) is needed to run the container by summing the capacity
+// requested by 'tmp' mounts plus disk space required to load the
+// Docker image.
+func EstimateScratchSpace(ctr *arvados.Container) (needScratch int64) {
+       for _, m := range ctr.Mounts {
+               if m.Kind == "tmp" {
+                       needScratch += m.Capacity
+               }
+       }
+
+       // Account for disk space usage by Docker, assuming the following behavior:
+       // - Layer tarballs are buffered to disk during "docker load".
+       // - Individual layer tarballs are extracted from the buffered
+       // copy to the filesystem.
+       dockerImageSize := estimateDockerImageSize(ctr.ContainerImage)
+
+       // The buffer is only needed during image load, so make sure
+       // the baseline scratch space at least covers dockerImageSize,
+       // and assume it will be released to the job afterwards.
+       if needScratch < dockerImageSize {
+               needScratch = dockerImageSize
+       }
+
+       // Now reserve space for the extracted image on disk.
+       needScratch += dockerImageSize
+
+       return
+}
+
 // ChooseInstanceType returns the cheapest available
 // arvados.InstanceType big enough to run ctr.
 func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
@@ -35,12 +98,7 @@ func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvad
                return
        }
 
-       needScratch := int64(0)
-       for _, m := range ctr.Mounts {
-               if m.Kind == "tmp" {
-                       needScratch += m.Capacity
-               }
-       }
+       needScratch := EstimateScratchSpace(ctr)
 
        needVCPUs := ctr.RuntimeConstraints.VCPUs
 
index 91c6bb1049fb381d9070e747b1f076eec2f95dbc..eef86f74775134b4c6b0848d0b2897c5d47bef29 100644 (file)
@@ -119,3 +119,25 @@ func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
        c.Check(best.Scratch >= 2*GiB, check.Equals, true)
        c.Check(best.Preemptible, check.Equals, true)
 }
+
+func (*NodeSizeSuite) TestScratchForDockerImage(c *check.C) {
+       n := EstimateScratchSpace(&arvados.Container{
+               ContainerImage: "d5025c0f29f6eef304a7358afa82a822+342",
+       })
+       // Actual image is 371.1 MiB (according to workbench)
+       // Estimated size is 384 MiB (402653184 bytes)
+       // Want to reserve 2x the estimated size, so 805306368 bytes
+       c.Check(n, check.Equals, int64(805306368))
+
+       n = EstimateScratchSpace(&arvados.Container{
+               ContainerImage: "d5025c0f29f6eef304a7358afa82a822+-342",
+       })
+       // Parse error will return 0
+       c.Check(n, check.Equals, int64(0))
+
+       n = EstimateScratchSpace(&arvados.Container{
+               ContainerImage: "d5025c0f29f6eef304a7358afa82a822+34",
+       })
+       // Short manifest will return 0
+       c.Check(n, check.Equals, int64(0))
+}
index 225741f947e396e3dcc86ad67ca1cdc056e03cfc..7e149528308fc9c6e38e0021af858da5450b58f8 100644 (file)
@@ -195,7 +195,7 @@ def arg_parser():  # type: () -> argparse.ArgumentParser
                         help=argparse.SUPPRESS)
 
     parser.add_argument("--thread-count", type=int,
-                        default=4, help="Number of threads to use for job submit and output collection.")
+                        default=1, help="Number of threads to use for job submit and output collection.")
 
     parser.add_argument("--http-timeout", type=int,
                         default=5*60, dest="http_timeout", help="API request timeout in seconds. Default is 300 seconds (5 minutes).")
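
Note: the default --thread-count for arvados-cwl-runner drops from 4 to 1 here; crunch_script.py already forces thread_count = 1, and the test_submit.py expectations below change to match. The commit does not state the motivation, but serializing job submission and output collection by default is the more conservative setting.
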
index 4c49a449b2a68fdf1eaaa5cd674129ae257dfc6e..4f8c0338b3f04fb9a51afaf21a6d1ebbc47bf992 100644 (file)
@@ -414,8 +414,8 @@ class RunnerContainer(Runner):
             "properties": {}
         }
 
-        if self.tool.tool.get("id", "").startswith("keep:"):
-            sp = self.tool.tool["id"].split('/')
+        if self.embedded_tool.tool.get("id", "").startswith("keep:"):
+            sp = self.embedded_tool.tool["id"].split('/')
             workflowcollection = sp[0][5:]
             workflowname = "/".join(sp[1:])
             workflowpath = "/var/lib/cwl/workflow/%s" % workflowname
@@ -424,14 +424,14 @@ class RunnerContainer(Runner):
                 "portable_data_hash": "%s" % workflowcollection
             }
         else:
-            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
             workflowpath = "/var/lib/cwl/workflow.json#main"
             container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
                 "kind": "json",
                 "content": packed
             }
-            if self.tool.tool.get("id", "").startswith("arvwf:"):
-                container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
+            if self.embedded_tool.tool.get("id", "").startswith("arvwf:"):
+                container_req["properties"]["template_uuid"] = self.embedded_tool.tool["id"][6:33]
 
 
         # --local means execute the workflow instead of submitting a container request
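
Note on the self.tool → self.embedded_tool rename in this and the following hunks: Runner now subclasses cwltool's Process (see the sdk/cwl/arvados_cwl/runner.py hunk below), and Process keeps the raw tool document in self.tool, so the wrapped ArvadosCommandTool/ArvadosWorkflow object moves to the separate attribute embedded_tool to avoid the collision.
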
index 9a03372d32de9375e9401fe4fc4099dce61f1181..2e4ef55015157c4196c649797029b1e9912e00bb 100644 (file)
@@ -296,10 +296,10 @@ class RunnerJob(Runner):
         a pipeline template or pipeline instance.
         """
 
-        if self.tool.tool["id"].startswith("keep:"):
-            self.job_order["cwl:tool"] = self.tool.tool["id"][5:]
+        if self.embedded_tool.tool["id"].startswith("keep:"):
+            self.job_order["cwl:tool"] = self.embedded_tool.tool["id"][5:]
         else:
-            packed = packed_workflow(self.arvrunner, self.tool, self.merged_map)
+            packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
             wf_pdh = upload_workflow_collection(self.arvrunner, self.name, packed)
             self.job_order["cwl:tool"] = "%s/workflow.cwl#main" % wf_pdh
 
@@ -386,19 +386,21 @@ class RunnerTemplate(object):
     }
 
     def __init__(self, runner, tool, job_order, enable_reuse, uuid,
-                 submit_runner_ram=0, name=None, merged_map=None):
+                 submit_runner_ram=0, name=None, merged_map=None,
+                 loadingContext=None):
         self.runner = runner
-        self.tool = tool
+        self.embedded_tool = tool
         self.job = RunnerJob(
             runner=runner,
             tool=tool,
-            job_order=job_order,
             enable_reuse=enable_reuse,
             output_name=None,
             output_tags=None,
             submit_runner_ram=submit_runner_ram,
             name=name,
-            merged_map=merged_map)
+            merged_map=merged_map,
+            loadingContext=loadingContext)
+        self.job.job_order = job_order
         self.uuid = uuid
 
     def pipeline_component_spec(self):
@@ -420,7 +422,7 @@ class RunnerTemplate(object):
         job_params = spec['script_parameters']
         spec['script_parameters'] = {}
 
-        for param in self.tool.tool['inputs']:
+        for param in self.embedded_tool.tool['inputs']:
             param = copy.deepcopy(param)
 
             # Data type and "required" flag...
index cd319e55b12137db6170b37b763a0dccb36d497b..c4e9f44abb0b20ecb66a7bdc13c5240beaaeeccb 100644 (file)
@@ -2,7 +2,7 @@
 #
 # SPDX-License-Identifier: Apache-2.0
 
-from cwltool.command_line_tool import CommandLineTool
+from cwltool.command_line_tool import CommandLineTool, ExpressionTool
 from cwltool.builder import Builder
 from .arvjob import ArvadosJob
 from .arvcontainer import ArvadosContainer
@@ -105,3 +105,15 @@ class ArvadosCommandTool(CommandLineTool):
             runtimeContext.tmpdir = "$(task.tmpdir)"
             runtimeContext.docker_tmpdir = "$(task.tmpdir)"
         return super(ArvadosCommandTool, self).job(joborder, output_callback, runtimeContext)
+
+class ArvadosExpressionTool(ExpressionTool):
+    def __init__(self, arvrunner, toolpath_object, loadingContext):
+        super(ArvadosExpressionTool, self).__init__(toolpath_object, loadingContext)
+        self.arvrunner = arvrunner
+
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callback,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):
+        return super(ArvadosExpressionTool, self).job(job_order, self.arvrunner.get_wrapped_callback(output_callback), runtimeContext)
index eb78a25fedbd4754752ff8598d7e1faa6b1585db..ea167d4044d76fa91953eb401962107afd6b878e 100644 (file)
@@ -205,6 +205,9 @@ class ArvadosWorkflow(Workflow):
                                                     raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
                                 if not dyn:
                                     self.static_resource_req.append(req)
+                            if req["class"] == "DockerRequirement":
+                                if "http://arvados.org/cwl#dockerCollectionPDH" in req:
+                                    del req["http://arvados.org/cwl#dockerCollectionPDH"]
 
                 visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
 
index 7512d5bef27f28014f650d897d24e3d59cb7b3c4..61f9cbbe0dc80a7ce7c4894ccb2697c0b0310652 100644 (file)
@@ -103,6 +103,7 @@ def run():
         arvargs.output_name = output_name
         arvargs.output_tags = output_tags
         arvargs.thread_count = 1
+        arvargs.collection_cache_size = None
 
         runner = arvados_cwl.ArvCwlExecutor(api_client=arvados.safeapi.ThreadSafeApiCache(
             api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
index 9595b55915477b6cb8beb858e8cc8e4b89f3d603..27774b2f7cf6bd1fbb9bd8474f5dde4e7e4d6d51 100644 (file)
@@ -27,7 +27,7 @@ import arvados_cwl.util
 from .arvcontainer import RunnerContainer
 from .arvjob import RunnerJob, RunnerTemplate
 from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
-from .arvtool import ArvadosCommandTool, validate_cluster_target
+from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
 from .arvworkflow import ArvadosWorkflow, upload_workflow
 from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
 from .perf import Perf
@@ -195,8 +195,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             return ArvadosCommandTool(self, toolpath_object, loadingContext)
         elif "class" in toolpath_object and toolpath_object["class"] == "Workflow":
             return ArvadosWorkflow(self, toolpath_object, loadingContext)
+        elif "class" in toolpath_object and toolpath_object["class"] == "ExpressionTool":
+            return ArvadosExpressionTool(self, toolpath_object, loadingContext)
         else:
-            return cwltool.workflow.default_make_tool(toolpath_object, loadingContext)
+            raise Exception("Unknown tool %s" % toolpath_object.get("class"))
 
     def output_callback(self, out, processStatus):
         with self.workflow_eval_lock:
@@ -557,7 +559,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                       uuid=existing_uuid,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
                                       name=runtimeContext.name,
-                                      merged_map=merged_map)
+                                      merged_map=merged_map,
+                                      loadingContext=loadingContext)
                 tmpl.save()
                 # cwltool.main will write our return value to stdout.
                 return (tmpl.uuid, "success")
@@ -616,11 +619,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             if self.work_api == "containers":
                 if tool.tool["class"] == "CommandLineTool" and runtimeContext.wait and (not runtimeContext.always_submit_runner):
                     runtimeContext.runnerjob = tool.tool["id"]
-                    runnerjob = tool.job(job_order,
-                                         self.output_callback,
-                                         runtimeContext).next()
                 else:
-                    runnerjob = RunnerContainer(self, tool, job_order, runtimeContext.enable_reuse,
+                    tool = RunnerContainer(self, tool, loadingContext, runtimeContext.enable_reuse,
                                                 self.output_name,
                                                 self.output_tags,
                                                 submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -634,7 +634,7 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                                                 collection_cache_size=runtimeContext.collection_cache_size,
                                                 collection_cache_is_default=self.should_estimate_cache_size)
             elif self.work_api == "jobs":
-                runnerjob = RunnerJob(self, tool, job_order, runtimeContext.enable_reuse,
+                tool = RunnerJob(self, tool, loadingContext, runtimeContext.enable_reuse,
                                       self.output_name,
                                       self.output_tags,
                                       submit_runner_ram=runtimeContext.submit_runner_ram,
@@ -652,10 +652,16 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
                     "state": "RunningOnClient"}).execute(num_retries=self.num_retries)
             logger.info("Pipeline instance %s", self.pipeline["uuid"])
 
-        if runnerjob and not runtimeContext.wait:
-            submitargs = runtimeContext.copy()
-            submitargs.submit = False
-            runnerjob.run(submitargs)
+        if runtimeContext.cwl_runner_job is not None:
+            self.uuid = runtimeContext.cwl_runner_job.get('uuid')
+
+        jobiter = tool.job(job_order,
+                           self.output_callback,
+                           runtimeContext)
+
+        if runtimeContext.submit and not runtimeContext.wait:
+            runnerjob = jobiter.next()
+            runnerjob.run(runtimeContext)
             return (runnerjob.uuid, "success")
 
         current_container = arvados_cwl.util.get_current_container(self.api, self.num_retries, logger)
@@ -670,14 +676,6 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
 
         try:
             self.workflow_eval_lock.acquire()
-            if runnerjob:
-                jobiter = iter((runnerjob,))
-            else:
-                if runtimeContext.cwl_runner_job is not None:
-                    self.uuid = runtimeContext.cwl_runner_job.get('uuid')
-                jobiter = tool.job(job_order,
-                                   self.output_callback,
-                                   runtimeContext)
 
             # Holds the lock while this code runs and releases it when
             # it is safe to do so in self.workflow_eval_lock.wait(),
@@ -726,8 +724,10 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
             if self.pipeline:
                 self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
                                                      body={"state": "Failed"}).execute(num_retries=self.num_retries)
-            if runnerjob and runnerjob.uuid and self.work_api == "containers":
-                self.api.container_requests().update(uuid=runnerjob.uuid,
+            if runtimeContext.submit and isinstance(tool, Runner):
+                runnerjob = tool
+                if runnerjob.uuid and self.work_api == "containers":
+                    self.api.container_requests().update(uuid=runnerjob.uuid,
                                                      body={"priority": "0"}).execute(num_retries=self.num_retries)
         finally:
             self.workflow_eval_lock.release()
@@ -742,8 +742,8 @@ http://doc.arvados.org/install/install-api-server.html#disable_api_methods
         if self.final_output is None:
             raise WorkflowException("Workflow did not return a result.")
 
-        if runtimeContext.submit and isinstance(runnerjob, Runner):
-            logger.info("Final output collection %s", runnerjob.final_output)
+        if runtimeContext.submit and isinstance(tool, Runner):
+            logger.info("Final output collection %s", tool.final_output)
         else:
             if self.output_name is None:
                 self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
index 6094cfe245872b1b58976901668bd80a8b5b91b0..1d5f98f200c16bb444878da3b418f01b57bf1002 100644 (file)
@@ -16,7 +16,7 @@ from schema_salad.sourceline import SourceLine, cmap
 
 from cwltool.command_line_tool import CommandLineTool
 import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname
+from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
 from cwltool.load_tool import fetch_document
 from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
 from cwltool.utils import aslist
@@ -356,25 +356,28 @@ def upload_workflow_collection(arvrunner, name, packed):
     return collection.portable_data_hash()
 
 
-class Runner(object):
+class Runner(Process):
     """Base class for runner processes, which submit an instance of
     arvados-cwl-runner and wait for the final result."""
 
-    def __init__(self, runner, tool, job_order, enable_reuse,
+    def __init__(self, runner, tool, loadingContext, enable_reuse,
                  output_name, output_tags, submit_runner_ram=0,
                  name=None, on_error=None, submit_runner_image=None,
                  intermediate_output_ttl=0, merged_map=None,
                  priority=None, secret_store=None,
                  collection_cache_size=256,
                  collection_cache_is_default=True):
+
+        super(Runner, self).__init__(tool.tool, loadingContext)
+
         self.arvrunner = runner
-        self.tool = tool
-        self.job_order = job_order
+        self.embedded_tool = tool
+        self.job_order = None
         self.running = False
         if enable_reuse:
             # If reuse is permitted by command line arguments but
             # disabled by the workflow itself, disable it.
-            reuse_req, _ = self.tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
+            reuse_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#ReuseRequirement")
             if reuse_req:
                 enable_reuse = reuse_req["enableReuse"]
         self.enable_reuse = enable_reuse
@@ -393,7 +396,7 @@ class Runner(object):
         self.submit_runner_ram = 1024  # default 1 GiB
         self.collection_cache_size = collection_cache_size
 
-        runner_resource_req, _ = self.tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
+        runner_resource_req, _ = self.embedded_tool.get_requirement("http://arvados.org/cwl#WorkflowRunnerResources")
         if runner_resource_req:
             if runner_resource_req.get("coresMin"):
                 self.submit_runner_cores = runner_resource_req["coresMin"]
@@ -414,6 +417,15 @@ class Runner(object):
 
         self.merged_map = merged_map or {}
 
+    def job(self,
+            job_order,         # type: Mapping[Text, Text]
+            output_callbacks,  # type: Callable[[Any, Any], Any]
+            runtimeContext     # type: RuntimeContext
+           ):  # type: (...) -> Generator[Any, None, None]
+        self.job_order = job_order
+        self._init_job(job_order, runtimeContext)
+        yield self
+
     def update_pipeline_component(self, record):
         pass
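
Note: making Runner a cwltool Process gives it the standard job() generator shown above, which simply initializes the job order and yields itself. This is what allows the executor.py changes earlier in this commit to drop the special-cased runnerjob variable and drive submission through the same jobiter = tool.job(...) path used for ordinary tools.
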
 
index 9d25a562ab32d09dcdfba627fc2089260879cce1..f731db9555e4f1c0e73b0f300fdf3349ddb89f42 100644 (file)
@@ -33,8 +33,8 @@ setup(name='arvados-cwl-runner',
       # Note that arvados/build/run-build-packages.sh looks at this
       # file to determine what version of cwltool and schema-salad to build.
       install_requires=[
-          'cwltool==1.0.20181116032456',
-          'schema-salad==2.7.20181116024232',
+          'cwltool==1.0.20181201184214',
+          'schema-salad==3.0.20181129082112',
           'typing >= 3.6.4',
           'ruamel.yaml >=0.15.54, <=0.15.77',
           'arvados-python-client>=1.2.1.20181130020805',
diff --git a/sdk/cwl/tests/submit_test_job_missing.json b/sdk/cwl/tests/submit_test_job_missing.json
new file mode 100644 (file)
index 0000000..02d61fa
--- /dev/null
@@ -0,0 +1,14 @@
+{
+    "x": {
+        "class": "File",
+        "path": "input/blorp.txt"
+    },
+    "y": {
+        "class": "Directory",
+        "location": "keep:99999999999999999999999999999998+99",
+        "listing": [{
+            "class": "File",
+            "location": "keep:99999999999999999999999999999998+99/file1.txt"
+        }]
+    }
+}
index 8a40fbaf066fb9c42fc741a0138a809d5f9ed4ad..90dab01471ef61ab380955e6301a73306648edef 100644 (file)
@@ -273,7 +273,7 @@ def stubs(func):
             'state': 'Committed',
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
-                        '--eval-timeout=20', '--thread-count=4',
+                        '--eval-timeout=20', '--thread-count=1',
                         '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'name': 'submit_wf.cwl',
@@ -558,7 +558,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = [
             'arvados-cwl-runner', '--local', '--api=containers',
             '--no-log-timestamps', '--disable-validate',
-            '--eval-timeout=20', '--thread-count=4',
+            '--eval-timeout=20', '--thread-count=1',
             '--disable-reuse', "--collection-cache-size=256",
             '--debug', '--on-error=continue',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -584,7 +584,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["command"] = [
             'arvados-cwl-runner', '--local', '--api=containers',
             '--no-log-timestamps', '--disable-validate',
-            '--eval-timeout=20', '--thread-count=4',
+            '--eval-timeout=20', '--thread-count=1',
             '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
             '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
         expect_container["use_existing"] = False
@@ -621,7 +621,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256",
                                        '--debug', '--on-error=stop',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -648,7 +648,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256",
                                        "--output-name="+output_name, '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -674,7 +674,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256", "--debug",
                                        "--storage-classes=foo", '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -741,7 +741,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256", '--debug',
                                        '--on-error=continue',
                                        "--intermediate-output-ttl=3600",
@@ -767,7 +767,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256",
                                        '--debug', '--on-error=continue',
                                        "--trash-intermediate",
@@ -795,7 +795,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256",
                                        "--output-tags="+output_tags, '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -880,7 +880,7 @@ class TestSubmit(unittest.TestCase):
             'container_image': '999999999999999999999999999999d3+99',
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
-                        '--eval-timeout=20', '--thread-count=4',
+                        '--eval-timeout=20', '--thread-count=1',
                         '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
@@ -1002,7 +1002,7 @@ class TestSubmit(unittest.TestCase):
             'container_image': "999999999999999999999999999999d3+99",
             'command': ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
-                        '--eval-timeout=20', '--thread-count=4',
+                        '--eval-timeout=20', '--thread-count=1',
                         '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
             'cwd': '/var/spool/cwl',
@@ -1045,6 +1045,23 @@ class TestSubmit(unittest.TestCase):
                          stubs.expect_container_request_uuid + '\n')
 
 
+    @stubs
+    def test_submit_missing_input(self, stubs):
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 0)
+
+        capture_stdout = cStringIO.StringIO()
+        exited = arvados_cwl.main(
+            ["--submit", "--no-wait", "--api=containers", "--debug",
+             "tests/wf/submit_wf.cwl", "tests/submit_test_job_missing.json"],
+            capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+        self.assertEqual(exited, 1)
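
This new test submits the same workflow twice: the first run, with the existing tests/submit_test_job.json, is expected to succeed, while the second uses the new submit_test_job_missing.json fixture, which (per its name) references an input file missing from the stubbed collection, and is expected to fail with exit status 1.
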
+
+
     @stubs
     def test_submit_container_project(self, stubs):
         project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
@@ -1062,7 +1079,7 @@ class TestSubmit(unittest.TestCase):
         expect_container["owner_uuid"] = project_uuid
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       "--eval-timeout=20", "--thread-count=4",
+                                       "--eval-timeout=20", "--thread-count=1",
                                        '--enable-reuse', "--collection-cache-size=256", '--debug',
                                        '--on-error=continue',
                                        '--project-uuid='+project_uuid,
@@ -1089,7 +1106,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=60.0', '--thread-count=4',
+                                       '--eval-timeout=60.0', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=256",
                                        '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -1115,7 +1132,7 @@ class TestSubmit(unittest.TestCase):
         expect_container = copy.deepcopy(stubs.expect_container_spec)
         expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
                                        '--no-log-timestamps', '--disable-validate',
-                                       '--eval-timeout=20', '--thread-count=4',
+                                       '--eval-timeout=20', '--thread-count=1',
                                        '--enable-reuse', "--collection-cache-size=500",
                                        '--debug', '--on-error=continue',
                                        '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
@@ -1247,7 +1264,7 @@ class TestSubmit(unittest.TestCase):
         }
         expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
                         '--no-log-timestamps', '--disable-validate',
-                        '--eval-timeout=20', '--thread-count=4',
+                        '--eval-timeout=20', '--thread-count=1',
                         '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
                         '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
 
@@ -1317,7 +1334,7 @@ class TestSubmit(unittest.TestCase):
                 "--no-log-timestamps",
                 "--disable-validate",
                 "--eval-timeout=20",
-                '--thread-count=4',
+                '--thread-count=1',
                 "--enable-reuse",
                 "--collection-cache-size=256",
                 '--debug',
diff --git a/sdk/go/arvados/contextgroup.go b/sdk/go/arvados/contextgroup.go
new file mode 100644 (file)
index 0000000..fa0de24
--- /dev/null
@@ -0,0 +1,95 @@
+package arvados
+
+import (
+       "context"
+       "sync"
+)
+
+// A contextGroup is a context-aware variation on sync.WaitGroup. It
+// provides a child context for the added funcs to use, so they can
+// exit early if another added func returns an error. Its Wait()
+// method returns the first error returned by any added func.
+//
+// Example:
+//
+//     err := errors.New("oops")
+//     cg := newContextGroup()
+//     defer cg.Cancel()
+//     cg.Go(func() error {
+//             someFuncWithContext(cg.Context())
+//             return nil
+//     })
+//     cg.Go(func() error {
+//             return err // this cancels cg.Context()
+//     })
+//     return cg.Wait() // returns err after both goroutines have ended
+type contextGroup struct {
+       ctx    context.Context
+       cancel context.CancelFunc
+       wg     sync.WaitGroup
+       err    error
+       mtx    sync.Mutex
+}
+
+// newContextGroup returns a new contextGroup. The caller must
+// eventually call the Cancel() method of the returned contextGroup.
+func newContextGroup(ctx context.Context) *contextGroup {
+       ctx, cancel := context.WithCancel(ctx)
+       return &contextGroup{
+               ctx:    ctx,
+               cancel: cancel,
+       }
+}
+
+// Cancel cancels the context group.
+func (cg *contextGroup) Cancel() {
+       cg.cancel()
+}
+
+// Context returns a context.Context which will be canceled when all
+// funcs have succeeded or one has failed.
+func (cg *contextGroup) Context() context.Context {
+       return cg.ctx
+}
+
+// Go calls f in a new goroutine. If f returns an error, the
+// contextGroup is canceled.
+//
+// If f notices cg.Context() is done, it should abandon further work
+// and return. In this case, f's return value will be ignored.
+func (cg *contextGroup) Go(f func() error) {
+       cg.mtx.Lock()
+       defer cg.mtx.Unlock()
+       if cg.err != nil {
+               return
+       }
+       cg.wg.Add(1)
+       go func() {
+               defer cg.wg.Done()
+               err := f()
+               cg.mtx.Lock()
+               defer cg.mtx.Unlock()
+               if err != nil && cg.err == nil {
+                       cg.err = err
+                       cg.cancel()
+               }
+       }()
+}
+
+// Wait waits for all added funcs to return, and returns the first
+// non-nil error.
+//
+// If the parent context is canceled before a func returns an error,
+// Wait returns the parent context's Err().
+//
+// Wait returns nil if all funcs return nil before the parent context
+// is canceled.
+func (cg *contextGroup) Wait() error {
+       cg.wg.Wait()
+       cg.mtx.Lock()
+       defer cg.mtx.Unlock()
+       if cg.err != nil {
+               return cg.err
+       }
+       return cg.ctx.Err()
+}
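
Note: contextGroup follows the same first-error-wins, cancel-the-shared-context pattern as golang.org/x/sync/errgroup's Group; keeping a small private copy here presumably avoids adding a dependency to the SDK, and unlike errgroup, Wait() also reports the parent context's cancellation error when no func failed.
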
index b996542abd52cf7be04549962fdb31dfb7a366a0..6644f4cfb8e93ef7d601e667cee21a9dbce5d39b 100644 (file)
@@ -5,6 +5,7 @@
 package arvados
 
 import (
+       "context"
        "encoding/json"
        "fmt"
        "io"
@@ -18,7 +19,11 @@ import (
        "time"
 )
 
-var maxBlockSize = 1 << 26
+var (
+       maxBlockSize      = 1 << 26
+       concurrentWriters = 4 // max goroutines writing to Keep during sync()
+       writeAheadBlocks  = 1 // max background jobs flushing to Keep before blocking writes
+)
 
 // A CollectionFileSystem is a FileSystem that can be serialized as a
 // manifest and stored as a collection.
@@ -136,7 +141,7 @@ func (fs *collectionFileSystem) Sync() error {
 func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
        fs.fileSystem.root.Lock()
        defer fs.fileSystem.root.Unlock()
-       return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
+       return fs.fileSystem.root.(*dirnode).marshalManifest(context.TODO(), prefix, newThrottle(concurrentWriters))
 }
 
 func (fs *collectionFileSystem) Size() int64 {
@@ -228,6 +233,7 @@ type filenode struct {
        memsize  int64 // bytes in memSegments
        sync.RWMutex
        nullnode
+       throttle *throttle
 }
 
 // caller must have lock
@@ -490,30 +496,75 @@ func (fn *filenode) Write(p []byte, startPtr filenodePtr) (n int, ptr filenodePt
 // Write some data out to disk to reduce memory use. Caller must have
 // write lock.
 func (fn *filenode) pruneMemSegments() {
-       // TODO: async (don't hold Lock() while waiting for Keep)
        // TODO: share code with (*dirnode)sync()
        // TODO: pack/flush small blocks too, when fragmented
+       if fn.throttle == nil {
+               // TODO: share a throttle with filesystem
+               fn.throttle = newThrottle(writeAheadBlocks)
+       }
        for idx, seg := range fn.segments {
                seg, ok := seg.(*memSegment)
-               if !ok || seg.Len() < maxBlockSize {
-                       continue
-               }
-               locator, _, err := fn.FS().PutB(seg.buf)
-               if err != nil {
-                       // TODO: stall (or return errors from)
-                       // subsequent writes until flushing
-                       // starts to succeed
+               if !ok || seg.Len() < maxBlockSize || seg.flushing != nil {
                        continue
                }
-               fn.memsize -= int64(seg.Len())
-               fn.segments[idx] = storedSegment{
-                       kc:      fn.FS(),
-                       locator: locator,
-                       size:    seg.Len(),
-                       offset:  0,
-                       length:  seg.Len(),
+               // Setting seg.flushing guarantees seg.buf will not be
+               // modified in place: WriteAt and Truncate will
+               // allocate a new buf instead, if necessary.
+               idx, buf := idx, seg.buf
+               done := make(chan struct{})
+               seg.flushing = done
+               // If lots of background writes are already in
+               // progress, block here until one finishes, rather
+               // than pile up an unlimited number of buffered writes
+               // and network flush operations.
+               fn.throttle.Acquire()
+               go func() {
+                       defer close(done)
+                       locator, _, err := fn.FS().PutB(buf)
+                       fn.throttle.Release()
+                       fn.Lock()
+                       defer fn.Unlock()
+                       if curbuf := seg.buf[:1]; &curbuf[0] != &buf[0] {
+                               // A new seg.buf has been allocated.
+                               return
+                       }
+                       seg.flushing = nil
+                       if err != nil {
+                               // TODO: stall (or return errors from)
+                               // subsequent writes until flushing
+                               // starts to succeed.
+                               return
+                       }
+                       if len(fn.segments) <= idx || fn.segments[idx] != seg || len(seg.buf) != len(buf) {
+                               // Segment has been dropped/moved/resized.
+                               return
+                       }
+                       fn.memsize -= int64(len(buf))
+                       fn.segments[idx] = storedSegment{
+                               kc:      fn.FS(),
+                               locator: locator,
+                               size:    len(buf),
+                               offset:  0,
+                               length:  len(buf),
+                       }
+               }()
+       }
+}
+
+// Block until all pending pruneMemSegments work is finished. Caller
+// must NOT have lock.
+func (fn *filenode) waitPrune() {
+       var pending []<-chan struct{}
+       fn.Lock()
+       for _, seg := range fn.segments {
+               if seg, ok := seg.(*memSegment); ok && seg.flushing != nil {
+                       pending = append(pending, seg.flushing)
                }
        }
+       fn.Unlock()
+       for _, p := range pending {
+               <-p
+       }
 }
 
 type dirnode struct {
@@ -546,46 +597,67 @@ func (dn *dirnode) Child(name string, replace func(inode) (inode, error)) (inode
        return dn.treenode.Child(name, replace)
 }
 
+type fnSegmentRef struct {
+       fn  *filenode
+       idx int
+}
+
+// commitBlock concatenates the data from the given filenode segments
+// (which must be *memSegments), writes the data out to Keep as a
+// single block, and replaces the filenodes' *memSegments with
+// storedSegments that reference the relevant portions of the new
+// block.
+//
+// Caller must have write lock.
+func (dn *dirnode) commitBlock(ctx context.Context, throttle *throttle, refs []fnSegmentRef) error {
+       if len(refs) == 0 {
+               return nil
+       }
+       throttle.Acquire()
+       defer throttle.Release()
+       if err := ctx.Err(); err != nil {
+               return err
+       }
+       block := make([]byte, 0, maxBlockSize)
+       for _, ref := range refs {
+               block = append(block, ref.fn.segments[ref.idx].(*memSegment).buf...)
+       }
+       locator, _, err := dn.fs.PutB(block)
+       if err != nil {
+               return err
+       }
+       off := 0
+       for _, ref := range refs {
+               data := ref.fn.segments[ref.idx].(*memSegment).buf
+               ref.fn.segments[ref.idx] = storedSegment{
+                       kc:      dn.fs,
+                       locator: locator,
+                       size:    len(block),
+                       offset:  off,
+                       length:  len(data),
+               }
+               off += len(data)
+               ref.fn.memsize -= int64(len(data))
+       }
+       return nil
+}
+
 // sync flushes in-memory data and remote block references (for the
 // children with the given names, which must be children of dn) to
 // local persistent storage. Caller must have write lock on dn and the
 // named children.
-func (dn *dirnode) sync(names []string) error {
-       type shortBlock struct {
-               fn  *filenode
-               idx int
-       }
-       var pending []shortBlock
-       var pendingLen int
+func (dn *dirnode) sync(ctx context.Context, throttle *throttle, names []string) error {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
 
-       flush := func(sbs []shortBlock) error {
-               if len(sbs) == 0 {
-                       return nil
-               }
-               block := make([]byte, 0, maxBlockSize)
-               for _, sb := range sbs {
-                       block = append(block, sb.fn.segments[sb.idx].(*memSegment).buf...)
-               }
-               locator, _, err := dn.fs.PutB(block)
-               if err != nil {
-                       return err
-               }
-               off := 0
-               for _, sb := range sbs {
-                       data := sb.fn.segments[sb.idx].(*memSegment).buf
-                       sb.fn.segments[sb.idx] = storedSegment{
-                               kc:      dn.fs,
-                               locator: locator,
-                               size:    len(block),
-                               offset:  off,
-                               length:  len(data),
-                       }
-                       off += len(data)
-                       sb.fn.memsize -= int64(len(data))
-               }
-               return nil
+       goCommit := func(refs []fnSegmentRef) {
+               cg.Go(func() error {
+                       return dn.commitBlock(cg.Context(), throttle, refs)
+               })
        }
 
+       var pending []fnSegmentRef
+       var pendingLen int = 0
        localLocator := map[string]string{}
        for _, name := range names {
                fn, ok := dn.inodes[name].(*filenode)
@@ -608,39 +680,29 @@ func (dn *dirnode) sync(names []string) error {
                                fn.segments[idx] = seg
                        case *memSegment:
                                if seg.Len() > maxBlockSize/2 {
-                                       if err := flush([]shortBlock{{fn, idx}}); err != nil {
-                                               return err
-                                       }
+                                       goCommit([]fnSegmentRef{{fn, idx}})
                                        continue
                                }
                                if pendingLen+seg.Len() > maxBlockSize {
-                                       if err := flush(pending); err != nil {
-                                               return err
-                                       }
+                                       goCommit(pending)
                                        pending = nil
                                        pendingLen = 0
                                }
-                               pending = append(pending, shortBlock{fn, idx})
+                               pending = append(pending, fnSegmentRef{fn, idx})
                                pendingLen += seg.Len()
                        default:
                                panic(fmt.Sprintf("can't sync segment type %T", seg))
                        }
                }
        }
-       return flush(pending)
+       goCommit(pending)
+       return cg.Wait()
 }
 
 // caller must have write lock.
-func (dn *dirnode) marshalManifest(prefix string) (string, error) {
-       var streamLen int64
-       type filepart struct {
-               name   string
-               offset int64
-               length int64
-       }
-       var fileparts []filepart
-       var subdirs string
-       var blocks []string
+func (dn *dirnode) marshalManifest(ctx context.Context, prefix string, throttle *throttle) (string, error) {
+       cg := newContextGroup(ctx)
+       defer cg.Cancel()
 
        if len(dn.inodes) == 0 {
                if prefix == "." {
@@ -658,26 +720,61 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                names = append(names, name)
        }
        sort.Strings(names)
+
+       // Wait for children to finish any pending write operations
+       // before locking them.
        for _, name := range names {
                node := dn.inodes[name]
-               node.Lock()
-               defer node.Unlock()
-       }
-       if err := dn.sync(names); err != nil {
-               return "", err
+               if fn, ok := node.(*filenode); ok {
+                       fn.waitPrune()
+               }
        }
+
+       var dirnames []string
+       var filenames []string
        for _, name := range names {
-               switch node := dn.inodes[name].(type) {
+               node := dn.inodes[name]
+               node.Lock()
+               defer node.Unlock()
+               switch node := node.(type) {
                case *dirnode:
-                       subdir, err := node.marshalManifest(prefix + "/" + name)
-                       if err != nil {
-                               return "", err
-                       }
-                       subdirs = subdirs + subdir
+                       dirnames = append(dirnames, name)
                case *filenode:
+                       filenames = append(filenames, name)
+               default:
+                       panic(fmt.Sprintf("can't marshal inode type %T", node))
+               }
+       }
+
+       subdirs := make([]string, len(dirnames))
+       rootdir := ""
+       for i, name := range dirnames {
+               i, name := i, name
+               cg.Go(func() error {
+                       txt, err := dn.inodes[name].(*dirnode).marshalManifest(cg.Context(), prefix+"/"+name, throttle)
+                       subdirs[i] = txt
+                       return err
+               })
+       }
+
+       cg.Go(func() error {
+               var streamLen int64
+               type filepart struct {
+                       name   string
+                       offset int64
+                       length int64
+               }
+
+               var fileparts []filepart
+               var blocks []string
+               if err := dn.sync(cg.Context(), throttle, names); err != nil {
+                       return err
+               }
+               for _, name := range filenames {
+                       node := dn.inodes[name].(*filenode)
                        if len(node.segments) == 0 {
                                fileparts = append(fileparts, filepart{name: name})
-                               break
+                               continue
                        }
                        for _, seg := range node.segments {
                                switch seg := seg.(type) {
@@ -707,20 +804,21 @@ func (dn *dirnode) marshalManifest(prefix string) (string, error) {
                                        panic(fmt.Sprintf("can't marshal segment type %T", seg))
                                }
                        }
-               default:
-                       panic(fmt.Sprintf("can't marshal inode type %T", node))
                }
-       }
-       var filetokens []string
-       for _, s := range fileparts {
-               filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
-       }
-       if len(filetokens) == 0 {
-               return subdirs, nil
-       } else if len(blocks) == 0 {
-               blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
-       }
-       return manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n" + subdirs, nil
+               var filetokens []string
+               for _, s := range fileparts {
+                       filetokens = append(filetokens, fmt.Sprintf("%d:%d:%s", s.offset, s.length, manifestEscape(s.name)))
+               }
+               if len(filetokens) == 0 {
+                       return nil
+               } else if len(blocks) == 0 {
+                       blocks = []string{"d41d8cd98f00b204e9800998ecf8427e+0"}
+               }
+               rootdir = manifestEscape(prefix) + " " + strings.Join(blocks, " ") + " " + strings.Join(filetokens, " ") + "\n"
+               return nil
+       })
+       err := cg.Wait()
+       return rootdir + strings.Join(subdirs, ""), err
 }
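
marshalManifest() now descends into subdirectories concurrently while keeping the output deterministic: each child goroutine writes its text into its own slot of the preallocated subdirs slice, and the slots are concatenated in sorted-name order only after cg.Wait(). (A stream holding only zero-length files still gets the placeholder locator d41d8cd98f00b204e9800998ecf8427e+0, the MD5 of zero bytes.) The indexed fan-out idiom in isolation, with hypothetical directory contents:

    package main

    import (
    	"fmt"
    	"sort"
    	"strings"
    	"sync"
    )

    func main() {
    	dirs := map[string]string{"b": "./b ...\n", "a": "./a ...\n", "c": "./c ...\n"}
    	names := make([]string, 0, len(dirs))
    	for name := range dirs {
    		names = append(names, name)
    	}
    	sort.Strings(names)

    	// One slot per child: goroutines finish in any order, but
    	// the join below always reads them in sorted order.
    	out := make([]string, len(names))
    	var wg sync.WaitGroup
    	for i, name := range names {
    		wg.Add(1)
    		go func(i int, name string) {
    			defer wg.Done()
    			out[i] = dirs[name] // stand-in for node.marshalManifest(...)
    		}(i, name)
    	}
    	wg.Wait()
    	fmt.Print(strings.Join(out, "")) // ./a ... then ./b ... then ./c ...
    }
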
 
 func (dn *dirnode) loadManifest(txt string) error {
@@ -936,6 +1034,11 @@ type segment interface {
 
 type memSegment struct {
        buf []byte
+       // If flushing is not nil, then a) buf is being shared by a
+       // pruneMemSegments goroutine, and must be copied on write;
+       // and b) the flushing channel will close when the goroutine
+       // finishes, whether it succeeds or not.
+       flushing <-chan struct{}
 }
 
 func (me *memSegment) Len() int {
@@ -952,28 +1055,31 @@ func (me *memSegment) Slice(off, length int) segment {
 }
 
 func (me *memSegment) Truncate(n int) {
-       if n > cap(me.buf) {
+       if n > cap(me.buf) || (me.flushing != nil && n > len(me.buf)) {
                newsize := 1024
                for newsize < n {
                        newsize = newsize << 2
                }
                newbuf := make([]byte, n, newsize)
                copy(newbuf, me.buf)
-               me.buf = newbuf
+               me.buf, me.flushing = newbuf, nil
        } else {
-               // Zero unused part when shrinking, in case we grow
-               // and start using it again later.
-               for i := n; i < len(me.buf); i++ {
+               // reclaim existing capacity, and zero reclaimed part
+               oldlen := len(me.buf)
+               me.buf = me.buf[:n]
+               for i := oldlen; i < n; i++ {
                        me.buf[i] = 0
                }
        }
-       me.buf = me.buf[:n]
 }
 
 func (me *memSegment) WriteAt(p []byte, off int) {
        if off+len(p) > len(me.buf) {
                panic("overflowed segment")
        }
+       if me.flushing != nil {
+               me.buf, me.flushing = append([]byte(nil), me.buf...), nil
+       }
        copy(me.buf[off:], p)
 }
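
Truncate and WriteAt both honor the new flushing marker: while a pruneMemSegments goroutine is sharing buf, any mutation first copies the buffer and detaches (me.flushing = nil), so the flusher keeps reading a stable snapshot. A stripped-down copy-on-write sketch of that rule; cowBuf is an illustrative stand-in for memSegment, with locking and the completion channel omitted:

    package main

    import "fmt"

    type cowBuf struct {
    	buf      []byte
    	flushing bool // set while a background flush is reading buf
    }

    func (b *cowBuf) WriteAt(p []byte, off int) {
    	if b.flushing {
    		// The flusher still reads the old slice; mutate a copy.
    		b.buf = append([]byte(nil), b.buf...)
    		b.flushing = false
    	}
    	copy(b.buf[off:], p)
    }

    func main() {
    	b := &cowBuf{buf: []byte("hello"), flushing: true}
    	snapshot := b.buf // what the flusher would be reading
    	b.WriteAt([]byte("J"), 0)
    	fmt.Println(string(snapshot), string(b.buf)) // hello Jello
    }
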
 
index a6d4ab1e5b71baccabafdbdf810db0ee264420a5..2ae2bd8924e23b583a267091cc6b9985e52d3422 100644 (file)
@@ -19,6 +19,7 @@ import (
        "runtime"
        "strings"
        "sync"
+       "sync/atomic"
        "testing"
        "time"
 
@@ -31,6 +32,7 @@ var _ = check.Suite(&CollectionFSSuite{})
 type keepClientStub struct {
        blocks      map[string][]byte
        refreshable map[string]bool
+       onPut       func(bufcopy []byte) // called from PutB, before acquiring lock
        sync.RWMutex
 }
 
@@ -50,6 +52,9 @@ func (kcs *keepClientStub) PutB(p []byte) (string, int, error) {
        locator := fmt.Sprintf("%x+%d+A12345@abcde", md5.Sum(p), len(p))
        buf := make([]byte, len(p))
        copy(buf, p)
+       if kcs.onPut != nil {
+               kcs.onPut(buf)
+       }
        kcs.Lock()
        defer kcs.Unlock()
        kcs.blocks[locator[:32]] = buf
@@ -583,7 +588,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
        const ngoroutines = 256
 
        var wg sync.WaitGroup
-       for n := 0; n < nfiles; n++ {
+       for n := 0; n < ngoroutines; n++ {
                wg.Add(1)
                go func(n int) {
                        defer wg.Done()
@@ -592,7 +597,7 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                        f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDWR|os.O_CREATE|os.O_EXCL, 0)
                        c.Assert(err, check.IsNil)
                        defer f.Close()
-                       for i := 0; i < ngoroutines; i++ {
+                       for i := 0; i < nfiles; i++ {
                                trunc := rand.Intn(65)
                                woff := rand.Intn(trunc + 1)
                                wbytes = wbytes[:rand.Intn(64-woff+1)]
@@ -618,11 +623,18 @@ func (s *CollectionFSSuite) TestRandomWrites(c *check.C) {
                                c.Check(string(buf), check.Equals, string(expect))
                                c.Check(err, check.IsNil)
                        }
-                       s.checkMemSize(c, f)
                }(n)
        }
        wg.Wait()
 
+       for n := 0; n < ngoroutines; n++ {
+               f, err := s.fs.OpenFile(fmt.Sprintf("random-%d", n), os.O_RDONLY, 0)
+               c.Assert(err, check.IsNil)
+               f.(*filehandle).inode.(*filenode).waitPrune()
+               s.checkMemSize(c, f)
+               defer f.Close()
+       }
+
        root, err := s.fs.Open("/")
        c.Assert(err, check.IsNil)
        defer root.Close()
@@ -1029,8 +1041,37 @@ func (s *CollectionFSSuite) TestOpenFileFlags(c *check.C) {
 }
 
 func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
+       defer func(wab, mbs int) {
+               writeAheadBlocks = wab
+               maxBlockSize = mbs
+       }(writeAheadBlocks, maxBlockSize)
+       writeAheadBlocks = 2
        maxBlockSize = 1024
-       defer func() { maxBlockSize = 2 << 26 }()
+
+       proceed := make(chan struct{})
+       var started, concurrent int32
+       blk2done := false
+       s.kc.onPut = func([]byte) {
+               atomic.AddInt32(&concurrent, 1)
+               switch atomic.AddInt32(&started, 1) {
+               case 1:
+                       // Wait until block 2 starts and finishes, and block 3 starts
+                       select {
+                       case <-proceed:
+                               c.Check(blk2done, check.Equals, true)
+                       case <-time.After(time.Second):
+                               c.Error("timed out")
+                       }
+               case 2:
+                       time.Sleep(time.Millisecond)
+                       blk2done = true
+               case 3:
+                       close(proceed)
+               default:
+                       time.Sleep(time.Millisecond)
+               }
+               c.Check(atomic.AddInt32(&concurrent, -1) < int32(writeAheadBlocks), check.Equals, true)
+       }
 
        fs, err := (&Collection{}).FileSystem(s.client, s.kc)
        c.Assert(err, check.IsNil)
@@ -1056,6 +1097,7 @@ func (s *CollectionFSSuite) TestFlushFullBlocks(c *check.C) {
                }
                return
        }
+       f.(*filehandle).inode.(*filenode).waitPrune()
        c.Check(currentMemExtents(), check.HasLen, 1)
 
        m, err := fs.MarshalManifest(".")
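
TestFlushFullBlocks temporarily lowers the package-level tunables writeAheadBlocks and maxBlockSize, restoring them with a deferred closure that captured the original values as arguments before the overrides took effect (replacing the old hard-coded restore). The save/restore idiom on its own (assumed scaffolding, not code from this change):

    package example

    import "testing"

    var maxBlockSize = 1 << 26 // package-level tunable

    func TestSmallBlocks(t *testing.T) {
    	// Evaluate the argument now, restore on return -- even if
    	// the test fails or panics partway through.
    	defer func(orig int) { maxBlockSize = orig }(maxBlockSize)
    	maxBlockSize = 1024

    	if maxBlockSize != 1024 {
    		t.Fatal("override not applied")
    	}
    }
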
diff --git a/sdk/go/arvados/throttle.go b/sdk/go/arvados/throttle.go
new file mode 100644 (file)
index 0000000..464b73b
--- /dev/null
@@ -0,0 +1,17 @@
+package arvados
+
+type throttle struct {
+       c chan struct{}
+}
+
+func newThrottle(n int) *throttle {
+       return &throttle{c: make(chan struct{}, n)}
+}
+
+func (t *throttle) Acquire() {
+       t.c <- struct{}{}
+}
+
+func (t *throttle) Release() {
+       <-t.c
+}
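
throttle is a counting semaphore over a buffered channel: Acquire blocks once n tokens are outstanding, and Release returns one. In this branch it caps the number of concurrent block commits (writeAheadBlocks). A self-contained usage sketch; the worker loop is illustrative:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type throttle struct{ c chan struct{} }

    func newThrottle(n int) *throttle { return &throttle{c: make(chan struct{}, n)} }
    func (t *throttle) Acquire()      { t.c <- struct{}{} }
    func (t *throttle) Release()      { <-t.c }

    func main() {
    	t := newThrottle(2) // at most 2 workers inside the critical section
    	var wg sync.WaitGroup
    	for i := 0; i < 5; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			t.Acquire()
    			defer t.Release()
    			fmt.Println("worker", i, "committing a block")
    		}(i)
    	}
    	wg.Wait()
    }
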
index 3281d78e209db3a0e69726d285c59b456ea93035..37666eb8e8b8f7e2d8f4cbbdf76ff7bda56b003b 100644 (file)
@@ -88,9 +88,6 @@ class _FileLikeObjectBase(object):
 class ArvadosFileReaderBase(_FileLikeObjectBase):
     def __init__(self, name, mode, num_retries=None):
         super(ArvadosFileReaderBase, self).__init__(name, mode)
-        self._binary = 'b' in mode
-        if sys.version_info >= (3, 0) and not self._binary:
-            raise NotImplementedError("text mode {!r} is not implemented".format(mode))
         self._filepos = 0
         self.num_retries = num_retries
         self._readline_cache = (None, None)
@@ -1278,6 +1275,11 @@ class ArvadosFileReader(ArvadosFileReaderBase):
     def stream_name(self):
         return self.arvadosfile.parent.stream_name()
 
+    def readinto(self, b):
+        data = self.read(len(b))
+        b[:len(data)] = data
+        return len(data)
+
     @_FileLikeObjectBase._before_close
     @retry_method
     def read(self, size=None, num_retries=None):
@@ -1356,3 +1358,33 @@ class ArvadosFileWriter(ArvadosFileReader):
         if not self.closed:
             self.arvadosfile.remove_writer(self, flush)
             super(ArvadosFileWriter, self).close()
+
+
+class WrappableFile(object):
+    """An interface to an Arvados file that's compatible with io wrappers.
+
+    """
+    def __init__(self, f):
+        self.f = f
+        self.closed = False
+    def close(self):
+        self.closed = True
+        return self.f.close()
+    def flush(self):
+        return self.f.flush()
+    def read(self, *args, **kwargs):
+        return self.f.read(*args, **kwargs)
+    def readable(self):
+        return self.f.readable()
+    def readinto(self, *args, **kwargs):
+        return self.f.readinto(*args, **kwargs)
+    def seek(self, *args, **kwargs):
+        return self.f.seek(*args, **kwargs)
+    def seekable(self):
+        return self.f.seekable()
+    def tell(self):
+        return self.f.tell()
+    def writable(self):
+        return self.f.writable()
+    def write(self, *args, **kwargs):
+        return self.f.write(*args, **kwargs)
index 48fdaf03ecd685f3e420437cebb3d46fd1741085..627f0346db2c6760710db3edaf356f4cb724bf91 100644 (file)
@@ -7,21 +7,23 @@ from future.utils import listitems, listvalues, viewkeys
 from builtins import str
 from past.builtins import basestring
 from builtins import object
+import ciso8601
+import datetime
+import errno
 import functools
+import hashlib
+import io
 import logging
 import os
 import re
-import errno
-import hashlib
-import datetime
-import ciso8601
-import time
+import sys
 import threading
+import time
 
 from collections import deque
 from stat import *
 
-from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, _BlockManager, synchronized, must_be_writable, NoopLock
+from .arvfile import split, _FileLikeObjectBase, ArvadosFile, ArvadosFileWriter, ArvadosFileReader, WrappableFile, _BlockManager, synchronized, must_be_writable, NoopLock
 from .keep import KeepLocator, KeepClient
 from .stream import StreamReader
 from ._normalize_stream import normalize_stream
@@ -35,6 +37,21 @@ from arvados.retry import retry_method
 
 _logger = logging.getLogger('arvados.collection')
 
+
+if sys.version_info >= (3, 0):
+    TextIOWrapper = io.TextIOWrapper
+else:
+    class TextIOWrapper(io.TextIOWrapper):
+        """To maintain backward compatibility, cast str to unicode in
+        write('foo').
+
+        """
+        def write(self, data):
+            if isinstance(data, basestring):
+                data = unicode(data)
+            return super(TextIOWrapper, self).write(data)
+
+
 class CollectionBase(object):
     """Abstract base class for Collection classes."""
 
@@ -654,7 +671,7 @@ class RichCollectionBase(CollectionBase):
 
         return self.find_or_create(path, COLLECTION)
 
-    def open(self, path, mode="r"):
+    def open(self, path, mode="r", encoding=None):
         """Open a file-like object for access.
 
         :path:
@@ -676,6 +693,7 @@ class RichCollectionBase(CollectionBase):
             opens for reading and writing.  All writes are appended to
             the end of the file.  Writing does not affect the file pointer for
             reading.
+
         """
 
         if not re.search(r'^[rwa][bt]?\+?$', mode):
@@ -698,7 +716,12 @@ class RichCollectionBase(CollectionBase):
         if mode[0] == 'w':
             arvfile.truncate(0)
 
-        return fclass(arvfile, mode=mode, num_retries=self.num_retries)
+        binmode = mode[0] + 'b' + re.sub('[bt]', '', mode[1:])
+        f = fclass(arvfile, mode=binmode, num_retries=self.num_retries)
+        if 'b' not in mode:
+            bufferclass = io.BufferedRandom if f.writable() else io.BufferedReader
+            f = TextIOWrapper(bufferclass(WrappableFile(f)), encoding=encoding)
+        return f
 
     def modified(self):
         """Determine if the collection has been modified since last commited."""
index 96f5bdd44a12ae42c25fbe64f68b342cb0356fcf..b17ed291807ab88de5948cfcdfaf6562bea5d009 100644 (file)
@@ -151,8 +151,8 @@ def statfile(prefix, fn, fnPattern="$(file %s/%s)", dirPattern="$(dir %s/%s/)",
     return prefix+fn
 
 def write_file(collection, pathprefix, fn, flush=False):
-    with open(os.path.join(pathprefix, fn)) as src:
-        dst = collection.open(fn, "w")
+    with open(os.path.join(pathprefix, fn), "rb") as src:
+        dst = collection.open(fn, "wb")
         r = src.read(1024*128)
         while r:
             dst.write(r)
index faad29872de541621ae258ac5dfb635c35ca9bf6..a760255dd6da8e01470dacafa6bcfafeddc50058 100644 (file)
@@ -217,26 +217,41 @@ class ArvadosFileWriterTestCase(unittest.TestCase):
         keep = ArvadosFileWriterTestCase.MockKeep({
             "781e5e245d69b566979b86e28d23f2c7+10": b"0123456789",
         })
-        c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
-        writer = c.open("count.txt", "ab+")
-        self.assertEqual(writer.read(20), b"0123456789")
-
-        writer.seek(0, os.SEEK_SET)
-        writer.write("hello")
-        self.assertEqual(writer.read(), b"")
-        writer.seek(-5, os.SEEK_CUR)
-        self.assertEqual(writer.read(3), b"hel")
-        self.assertEqual(writer.read(), b"lo")
-        writer.seek(0, os.SEEK_SET)
-        self.assertEqual(writer.read(), b"0123456789hello")
-
-        writer.seek(0)
-        writer.write("world")
-        self.assertEqual(writer.read(), b"")
-        writer.seek(0)
-        self.assertEqual(writer.read(), b"0123456789helloworld")
-
-        self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
+        for (mode, convert) in (
+                ('a+', lambda data: data.decode(encoding='utf-8')),
+                ('at+', lambda data: data.decode(encoding='utf-8')),
+                ('ab+', lambda data: data)):
+            c = Collection('. 781e5e245d69b566979b86e28d23f2c7+10 0:10:count.txt\n', keep_client=keep)
+            writer = c.open("count.txt", mode)
+            self.assertEqual(writer.read(20), convert(b"0123456789"))
+
+            writer.seek(0, os.SEEK_SET)
+            writer.write(convert(b"hello"))
+            self.assertEqual(writer.read(), convert(b""))
+            if 'b' in mode:
+                writer.seek(-5, os.SEEK_CUR)
+                self.assertEqual(writer.read(3), convert(b"hel"))
+                self.assertEqual(writer.read(), convert(b"lo"))
+            else:
+                with self.assertRaises(IOError):
+                    writer.seek(-5, os.SEEK_CUR)
+                with self.assertRaises(IOError):
+                    writer.seek(-3, os.SEEK_END)
+            writer.seek(0, os.SEEK_SET)
+            writer.read(7)
+            self.assertEqual(7, writer.tell())
+            self.assertEqual(7, writer.seek(7, os.SEEK_SET))
+
+            writer.seek(0, os.SEEK_SET)
+            self.assertEqual(writer.read(), convert(b"0123456789hello"))
+
+            writer.seek(0)
+            writer.write(convert(b"world"))
+            self.assertEqual(writer.read(), convert(b""))
+            writer.seek(0)
+            self.assertEqual(writer.read(), convert(b"0123456789helloworld"))
+
+            self.assertEqual(". 781e5e245d69b566979b86e28d23f2c7+10 fc5e038d38a57032085441e7fe7010b0+10 0:20:count.txt\n", c.portable_manifest_text())
 
     def test_write_at_beginning(self):
         keep = ArvadosFileWriterTestCase.MockKeep({
index ac18c44c6844c2f54fbc54d91acf094778834b3c..de01006741e91b12047f70d6b82dfff04f80bfdc 100644 (file)
@@ -836,17 +836,58 @@ class CollectionOpenModes(run_test_server.TestCaseWithServers):
         with c.open('foo', 'wb') as f:
             f.write('foo')
         for mode in ['r', 'rt', 'r+', 'rt+', 'w', 'wt', 'a', 'at']:
-            if sys.version_info >= (3, 0):
-                with self.assertRaises(NotImplementedError):
-                    c.open('foo', mode)
-            else:
-                with c.open('foo', mode) as f:
-                    if mode[0] == 'r' and '+' not in mode:
-                        self.assertEqual('foo', f.read(3))
-                    else:
-                        f.write('bar')
-                        f.seek(-3, os.SEEK_CUR)
-                        self.assertEqual('bar', f.read(3))
+            with c.open('foo', mode) as f:
+                if mode[0] == 'r' and '+' not in mode:
+                    self.assertEqual('foo', f.read(3))
+                else:
+                    f.write('bar')
+                    f.seek(0, os.SEEK_SET)
+                    self.assertEqual('bar', f.read(3))
+
+
+class TextModes(run_test_server.TestCaseWithServers):
+
+    def setUp(self):
+        arvados.config.KEEP_BLOCK_SIZE = 4
+        if sys.version_info < (3, 0):
+            import unicodedata
+            self.sailboat = unicodedata.lookup('SAILBOAT')
+            self.snowman = unicodedata.lookup('SNOWMAN')
+        else:
+            self.sailboat = '\N{SAILBOAT}'
+            self.snowman = '\N{SNOWMAN}'
+
+    def tearDown(self):
+        arvados.config.KEEP_BLOCK_SIZE = 2 ** 26
+
+    def test_read_sailboat_across_block_boundary(self):
+        c = Collection()
+        f = c.open('sailboats', 'wb')
+        data = self.sailboat.encode('utf-8')
+        f.write(data)
+        f.write(data[:1])
+        f.write(data[1:])
+        f.write(b'\n')
+        f.close()
+        self.assertRegex(c.portable_manifest_text(), r'\+4 .*\+3 ')
+
+        f = c.open('sailboats', 'r')
+        string = f.readline()
+        self.assertEqual(string, self.sailboat+self.sailboat+'\n')
+        f.close()
+
+    def test_write_snowman_across_block_boundary(self):
+        c = Collection()
+        f = c.open('snowmany', 'w')
+        data = self.snowman
+        f.write(data+data+'\n'+data+'\n')
+        f.close()
+        self.assertRegex(c.portable_manifest_text(), r'\+4 .*\+4 .*\+3 ')
+
+        f = c.open('snowmany', 'r')
+        self.assertEqual(f.readline(), self.snowman+self.snowman+'\n')
+        self.assertEqual(f.readline(), self.snowman+'\n')
+        f.close()
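
These tests pin KEEP_BLOCK_SIZE to 4 bytes so that a 3-byte UTF-8 code point necessarily straddles a block boundary, then check that text-mode reads reassemble it. The same hazard expressed in Go, purely as an illustration: a rune split across two readers is still decoded whole by a buffering reader:

    package main

    import (
    	"bufio"
    	"fmt"
    	"io"
    	"strings"
    )

    func main() {
    	snowman := "\u2603" // 3 UTF-8 bytes: e2 98 83
    	// Split the encoding across two readers, like a file whose
    	// block ends mid-code-point.
    	r := io.MultiReader(strings.NewReader(snowman[:2]), strings.NewReader(snowman[2:]))
    	ch, size, err := bufio.NewReader(r).ReadRune()
    	fmt.Println(string(ch), size, err) // ☃ 3 <nil>
    }
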
 
 
 class NewCollectionTestCase(unittest.TestCase, CollectionTestMixin):
index 4e7f8f0bf7b2672e0dc264022cd1d160e76d868c..93d5b9a0239753a8820d86b883abcbdf1a06b776 100644 (file)
@@ -274,9 +274,8 @@ class ArvadosModel < ActiveRecord::Base
       if !include_trash
         if sql_table != "api_client_authorizations"
           # Only include records where the owner is not trashed
-          sql_conds = "NOT EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} "+
-                      "WHERE trashed = 1 AND "+
-                      "(#{sql_table}.owner_uuid = target_uuid)) #{exclude_trashed_records}"
+          sql_conds = "#{sql_table}.owner_uuid NOT IN (SELECT target_uuid FROM #{PERMISSION_VIEW} "+
+                      "WHERE trashed = 1) #{exclude_trashed_records}"
         end
       end
     else
@@ -294,14 +293,14 @@ class ArvadosModel < ActiveRecord::Base
       # see issue 13208 for details.
 
       # Match a direct read permission link from the user to the record uuid
-      direct_check = "EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} "+
-                     "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check} AND target_uuid = #{sql_table}.uuid)"
+      direct_check = "#{sql_table}.uuid IN (SELECT target_uuid FROM #{PERMISSION_VIEW} "+
+                     "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check})"
 
       # Match a read permission link from the user to the record's owner_uuid
       owner_check = ""
       if sql_table != "api_client_authorizations" and sql_table != "groups" then
-        owner_check = "OR EXISTS(SELECT 1 FROM #{PERMISSION_VIEW} "+
-          "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check} AND target_uuid = #{sql_table}.owner_uuid AND target_owner_uuid IS NOT NULL) "
+        owner_check = "OR #{sql_table}.owner_uuid IN (SELECT target_uuid FROM #{PERMISSION_VIEW} "+
+          "WHERE user_uuid IN (:user_uuids) AND perm_level >= 1 #{trashed_check} AND target_owner_uuid IS NOT NULL) "
       end
 
       links_cond = ""
index ac67040edf799465c1dda671e0a4d0eb80cf9483..bd586907ee2eaf205616251be126bc7cf9c94b09 100644 (file)
@@ -279,14 +279,6 @@ class Container < ArvadosModel
     candidates = candidates.where_serialized(:runtime_constraints, resolve_runtime_constraints(attrs[:runtime_constraints]), md5: true)
     log_reuse_info(candidates) { "after filtering on runtime_constraints #{attrs[:runtime_constraints].inspect}" }
 
-    candidates = candidates.where('runtime_user_uuid = ? or (runtime_user_uuid is NULL and runtime_auth_scopes is NULL)',
-                                  attrs[:runtime_user_uuid])
-    log_reuse_info(candidates) { "after filtering on runtime_user_uuid #{attrs[:runtime_user_uuid].inspect}" }
-
-    candidates = candidates.where('runtime_auth_scopes = ? or (runtime_user_uuid is NULL and runtime_auth_scopes is NULL)',
-                                  SafeJSON.dump(attrs[:runtime_auth_scopes].sort))
-    log_reuse_info(candidates) { "after filtering on runtime_auth_scopes #{attrs[:runtime_auth_scopes].inspect}" }
-
     log_reuse_info { "checking for state=Complete with readable output and log..." }
 
     select_readable_pdh = Collection.
index 90b4f13bf597b5b9ea306dec04b698e75fb98ae3..2a9ff5bf4cc6985a413f62a03d7b9555e9c0f938 100644 (file)
@@ -558,7 +558,8 @@ class ContainerTest < ActiveSupport::TestCase
     c1, _ = minimal_new(common_attrs.merge({runtime_token: api_client_authorizations(:active).token}))
     assert_equal Container::Queued, c1.state
     reused = Container.find_reusable(common_attrs.merge(runtime_token_attr(:container_runtime_token)))
-    assert_nil reused
+    # See #14584
+    assert_equal c1.uuid, reused.uuid
   end
 
   test "find_reusable method with nil runtime_token, then runtime_token with different user" do
@@ -567,7 +568,8 @@ class ContainerTest < ActiveSupport::TestCase
     c1, _ = minimal_new(common_attrs.merge({runtime_token: nil}))
     assert_equal Container::Queued, c1.state
     reused = Container.find_reusable(common_attrs.merge(runtime_token_attr(:container_runtime_token)))
-    assert_nil reused
+    # See #14584
+    assert_equal c1.uuid, reused.uuid
   end
 
   test "find_reusable method with different runtime_token, different scope, same user" do
@@ -576,7 +578,8 @@ class ContainerTest < ActiveSupport::TestCase
     c1, _ = minimal_new(common_attrs.merge({runtime_token: api_client_authorizations(:runtime_token_limited_scope).token}))
     assert_equal Container::Queued, c1.state
     reused = Container.find_reusable(common_attrs.merge(runtime_token_attr(:container_runtime_token)))
-    assert_nil reused
+    # See #14584
+    assert_equal c1.uuid, reused.uuid
   end
 
   test "Container running" do
index 084700d39bfad76b109078f29e81ecf82c40c5be..29fad32bd150749d1e1bcb1df696359adbde0a0f 100644 (file)
@@ -229,12 +229,7 @@ func (disp *Dispatcher) checkSqueueForOrphans() {
 func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []string {
        mem := int64(math.Ceil(float64(container.RuntimeConstraints.RAM+container.RuntimeConstraints.KeepCacheRAM+disp.ReserveExtraRAM) / float64(1048576)))
 
-       var disk int64
-       for _, m := range container.Mounts {
-               if m.Kind == "tmp" {
-                       disk += m.Capacity
-               }
-       }
+       disk := dispatchcloud.EstimateScratchSpace(&container)
        disk = int64(math.Ceil(float64(disk) / float64(1048576)))
        return []string{
                fmt.Sprintf("--mem=%d", mem),
@@ -246,7 +241,7 @@ func (disp *Dispatcher) slurmConstraintArgs(container arvados.Container) []strin
 func (disp *Dispatcher) sbatchArgs(container arvados.Container) ([]string, error) {
        var args []string
        args = append(args, disp.SbatchArguments...)
-       args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue))
+       args = append(args, "--job-name="+container.UUID, fmt.Sprintf("--nice=%d", initialNiceValue), "--no-requeue")
 
        if disp.cluster == nil {
                // no instance types configured
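
Two dispatcher changes here: every sbatch invocation now carries --no-requeue, so slurm reports a failure back to the dispatcher instead of silently requeueing the job, and scratch space comes from dispatchcloud.EstimateScratchSpace rather than an inline sum over tmp mounts. --mem and --tmp are still byte counts rounded up to whole MiB. A sketch of the resulting argument construction; the byte counts are made up, and ceilMiB mirrors the math.Ceil conversion above:

    package main

    import (
    	"fmt"
    	"math"
    )

    // ceilMiB rounds a byte count up to whole mebibytes, as the
    // dispatcher does for --mem and --tmp.
    func ceilMiB(n int64) int64 {
    	return int64(math.Ceil(float64(n) / 1048576))
    }

    func main() {
    	ram, keepCache, scratch := int64(12e9), int64(256<<20), int64(48e9)
    	args := []string{
    		"--job-name=zzzzz-dz642-queuedcontainer",
    		"--nice=10000",
    		"--no-requeue", // never let slurm requeue this job
    		fmt.Sprintf("--mem=%d", ceilMiB(ram+keepCache)),
    		fmt.Sprintf("--cpus-per-task=%d", 4),
    		fmt.Sprintf("--tmp=%d", ceilMiB(scratch)),
    	}
    	fmt.Println(args)
    }
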
index b76ece314d47806afcfb328ba12970b9171b58d5..10fdb07124cc817cc857f35384ded22ca0193d63 100644 (file)
@@ -202,6 +202,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
                [][]string{{
                        fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
                        fmt.Sprintf("--nice=%d", 10000),
+                       "--no-requeue",
                        fmt.Sprintf("--mem=%d", 11445),
                        fmt.Sprintf("--cpus-per-task=%d", 4),
                        fmt.Sprintf("--tmp=%d", 45777),
@@ -217,7 +218,7 @@ func (s *IntegrationSuite) TestMissingFromSqueue(c *C) {
 func (s *IntegrationSuite) TestSbatchFail(c *C) {
        s.slurm = slurmFake{errBatch: errors.New("something terrible happened")}
        container := s.integrationTest(c,
-               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
+               [][]string{{"--job-name=zzzzz-dz642-queuedcontainer", "--nice=10000", "--no-requeue", "--mem=11445", "--cpus-per-task=4", "--tmp=45777"}},
                func(dispatcher *dispatch.Dispatcher, container arvados.Container) {
                        dispatcher.UpdateState(container.UUID, dispatch.Running)
                        dispatcher.UpdateState(container.UUID, dispatch.Complete)
@@ -362,7 +363,7 @@ func (s *StubbedSuite) TestSbatchArgs(c *C) {
                s.disp.SbatchArguments = defaults
 
                args, err := s.disp.sbatchArgs(container)
-               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
+               c.Check(args, DeepEquals, append(defaults, "--job-name=123", "--nice=10000", "--no-requeue", "--mem=239", "--cpus-per-task=2", "--tmp=0"))
                c.Check(err, IsNil)
        }
 }
@@ -408,7 +409,7 @@ func (s *StubbedSuite) TestSbatchInstanceTypeConstraint(c *C) {
                args, err := s.disp.sbatchArgs(container)
                c.Check(err == nil, Equals, trial.err == nil)
                if trial.err == nil {
-                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000"}, trial.sbatchArgs...))
+                       c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000", "--no-requeue"}, trial.sbatchArgs...))
                } else {
                        c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
                }
@@ -425,7 +426,7 @@ func (s *StubbedSuite) TestSbatchPartition(c *C) {
 
        args, err := s.disp.sbatchArgs(container)
        c.Check(args, DeepEquals, []string{
-               "--job-name=123", "--nice=10000",
+               "--job-name=123", "--nice=10000", "--no-requeue",
                "--mem=239", "--cpus-per-task=1", "--tmp=0",
                "--partition=blurb,b2",
        })