simplecov-html (0.10.2)
simplecov-rcov (0.2.3)
simplecov (>= 0.4.1)
- sprockets (3.7.1)
+ sprockets (3.7.2)
concurrent-ruby (~> 1.0)
rack (> 1, < 3)
sprockets-rails (3.2.1)
arvados_api_client.discovery[:source_version]
end
+ # Get the packageVersion given in the API server's discovery
+ # document.
+ def api_package_version
+ arvados_api_client.discovery[:packageVersion]
+ end
+
# URL for browsing source code for the given version.
def version_link_target version
"https://arvados.org/projects/arvados/repository/changes?rev=#{version.sub(/-.*/, "")}"
additional_info_str = additional_info.map {|k,v| "#{k}=#{v}"}.join("\n")
additional_info['api_source_version'] = api_source_version
+ additional_info['api_package_version'] = api_package_version
additional_info['generated_at'] = generated_at
additional_info['workbench_version'] = AppVersion.hash
+ additional_info['workbench_package_version'] = AppVersion.package_version
additional_info['arvados_base'] = arvados_base
additional_info['support_email'] = support_email
additional_info['error_message'] = params[:error_message] if params[:error_message]
<label for="wb_version" class="col-sm-4 control-label"> Workbench version </label>
<div class="col-sm-8">
<p class="form-control-static" name="wb_version">
- <%= link_to AppVersion.hash, version_link_target(AppVersion.hash) %>
+ <%= AppVersion.package_version %> (<%= link_to AppVersion.hash, version_link_target(AppVersion.hash) %>)
</p>
</div>
</div>
<label for="server_version" class="col-sm-4 control-label"> API version </label>
<div class="col-sm-8">
<p class="form-control-static" name="server_version">
- <%= link_to api_source_version, version_link_target(api_source_version) %>
+ <%= api_package_version %> (<%= link_to api_source_version, version_link_target(api_source_version) %>)
</p>
</div>
</div>
# "git log".
source_version: false
+ # Override the automatic package string. With the default value of
+ # false, the package string is read from package-build.version in
+ # Rails.root (included in vendor packages).
+ package_version: false
+
# report notification to and from addresses
issue_reporter_email_from: arvados@example.com
issue_reporter_email_to: arvados@example.com
def self.forget
@hash = nil
+ @package_version = nil
end
# Return abbrev commit hash for current code version: "abc1234", or
@hash || "unknown"
end
+
+ def self.package_version
+ if (cached = Rails.configuration.package_version || @package_version)
+ return cached
+ end
+
+ begin
+ @package_version = IO.read(Rails.root.join("package-build.version")).strip
+ rescue Errno::ENOENT
+ @package_version = "unknown"
+ end
+
+ @package_version
+ end
end
type Multi map[string]Handler
func (m Multi) RunCommand(prog string, args []string, stdin io.Reader, stdout, stderr io.Writer) int {
- if len(args) < 1 {
- fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
- m.Usage(stderr)
- return 2
- }
_, basename := filepath.Split(prog)
- if strings.HasPrefix(basename, "arvados-") {
- basename = basename[8:]
- } else if strings.HasPrefix(basename, "crunch-") {
- basename = basename[7:]
- }
+ basename = strings.TrimPrefix(basename, "arvados-")
+ basename = strings.TrimPrefix(basename, "crunch-")
if cmd, ok := m[basename]; ok {
return cmd.RunCommand(prog, args, stdin, stdout, stderr)
+ } else if len(args) < 1 {
+ fmt.Fprintf(stderr, "usage: %s command [args]\n", prog)
+ m.Usage(stderr)
+ return 2
} else if cmd, ok = m[args[0]]; ok {
return cmd.RunCommand(prog+" "+args[0], args[1:], stdin, stdout, stderr)
} else {
import ciso8601
import uuid
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
import ruamel.yaml as yaml
from cwltool.errors import WorkflowException
keepemptydirs(vwd)
- with Perf(metrics, "generatefiles.save_new %s" % self.name):
- vwd.save_new()
+ if not runtimeContext.current_container:
+ runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+ vwd.save_new(name=info["name"],
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
prev = None
for f, p in sorteditems:
if self.timelimit is not None:
scheduling_parameters["max_run_time"] = self.timelimit
+ container_request["output_name"] = "Output for step %s" % (self.name)
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
from schema_salad.sourceline import SourceLine
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
import ruamel.yaml as yaml
import arvados.collection
if vwd:
with Perf(metrics, "generatefiles.save_new %s" % self.name):
- vwd.save_new()
+ if not runtimeContext.current_container:
+ runtimeContext.current_container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+ vwd.save_new(name=info["name"],
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
for f, p in generatemapper.items():
if p.type == "File":
"workflow": {
"name": name,
"description": tool.tool.get("doc", ""),
- "definition":yaml.round_trip_dump(packed)
+ "definition":json.dumps(packed, sort_keys=True, indent=4, separators=(',',': '))
}}
if project_uuid:
body["workflow"]["owner_uuid"] = project_uuid
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
from cwltool.context import LoadingContext, RuntimeContext
class ArvLoadingContext(LoadingContext):
self.wait = True
self.cwl_runner_job = None
self.storage_classes = "default"
+ self.current_container = None
super(ArvRuntimeContext, self).__init__(kwargs)
import os
import urllib
+from arvados_cwl.util import get_current_container, get_intermediate_collection_info
import arvados.commands.run
import arvados.collection
from schema_salad.sourceline import SourceLine
+from arvados.errors import ApiError
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
for l in srcobj.get("listing", []):
self.addentry(l, c, ".", remap)
- check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
- if not check["items"]:
- c.save_new(owner_uuid=self.arvrunner.project_uuid)
+ container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+
+ c.save_new(name=info["name"],
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
ab = self.collection_pattern % c.portable_data_hash()
self._pathmap[srcobj["location"]] = MapperEnt("keep:"+c.portable_data_hash(), ab, "Directory", True)
num_retries=self.arvrunner.num_retries )
self.addentry(srcobj, c, ".", remap)
- check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
- if not check["items"]:
- c.save_new(owner_uuid=self.arvrunner.project_uuid)
+ container = get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
+ info = get_intermediate_collection_info(None, container, self.arvrunner.intermediate_output_ttl)
+
+ c.save_new(name=info["name"],
+ owner_uuid=self.arvrunner.project_uuid,
+ ensure_unique_name=True,
+ trash_at=info["trash_at"],
+ properties=info["properties"])
ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
self._pathmap[srcobj["location"]] = MapperEnt("keep:%s/%s" % (c.portable_data_hash(), srcobj["basename"]),
else:
return None
+
class StagingPathMapper(PathMapper):
_follow_dirs = True
from functools import partial
import logging
import json
-import subprocess
+import subprocess32 as subprocess
from collections import namedtuple
from StringIO import StringIO
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import datetime
+from arvados.errors import ApiError
+
+def get_intermediate_collection_info(workflow_step_name, current_container, intermediate_output_ttl):
+ if workflow_step_name:
+ name = "Intermediate collection for step %s" % (workflow_step_name)
+ else:
+ name = "Intermediate collection"
+ trash_time = None
+ if intermediate_output_ttl > 0:
+ trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl)
+ container_uuid = None
+ if current_container:
+ container_uuid = current_container['uuid']
+ props = {"type": "intermediate", "container": container_uuid}
+
+ return {"name" : name, "trash_at" : trash_time, "properties" : props}
+
+def get_current_container(api, num_retries=0, logger=None):
+ current_container = None
+ try:
+ current_container = api.containers().current().execute(num_retries=num_retries)
+ except ApiError as e:
+ # Status code 404 just means we're not running in a container.
+ if e.resp.status != 404 and logger:
+ logger.info("Getting current container: %s", e)
+ return current_container
'ruamel.yaml >=0.13.11, <0.15',
'arvados-python-client>=1.1.4.20180607143841',
'setuptools',
- 'ciso8601 >=1.0.6, <2.0.0'
+ 'ciso8601 >=1.0.6, <2.0.0',
+ 'subprocess32>=3.5.1',
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
],
test_suite='tests',
- tests_require=['mock>=1.0'],
+ tests_require=[
+ 'mock>=1.0',
+ 'subprocess32>=3.5.1',
+ ],
zip_safe=True
)
#
# SPDX-License-Identifier: Apache-2.0
-cwlVersion: v1.0
-$graph:
-- class: Workflow
- inputs: []
- outputs: []
- steps:
- - in: []
- out: []
- run: '#step1.cwl'
- id: '#main/step1'
- - in: []
- out: []
- run: '#step2.cwl'
- id: '#main/step2'
- id: '#main'
-- class: CommandLineTool
- inputs:
- - type: File
- default:
- class: File
- location: keep:b9fca8bf06b170b8507b80b2564ee72b+57/a.txt
- id: '#step1.cwl/a'
- - type: File
- default:
- class: File
- location: keep:b9fca8bf06b170b8507b80b2564ee72b+57/b.txt
- id: '#step1.cwl/b'
- outputs: []
- arguments: [echo, $(inputs.a), $(inputs.b)]
- id: '#step1.cwl'
-- class: CommandLineTool
- inputs:
- - type: File
- default:
- class: File
- location: keep:8e2d09a066d96cdffdd2be41579e4e2e+57/b.txt
- id: '#step2.cwl/b'
- - type: File
- default:
- class: File
- location: keep:8e2d09a066d96cdffdd2be41579e4e2e+57/c.txt
- id: '#step2.cwl/c'
- outputs: []
- arguments: [echo, $(inputs.c), $(inputs.b)]
- id: '#step2.cwl'
+{
+ "$graph": [
+ {
+ "class": "Workflow",
+ "id": "#main",
+ "inputs": [],
+ "outputs": [],
+ "steps": [
+ {
+ "id": "#main/step1",
+ "in": [],
+ "out": [],
+ "run": "#step1.cwl"
+ },
+ {
+ "id": "#main/step2",
+ "in": [],
+ "out": [],
+ "run": "#step2.cwl"
+ }
+ ]
+ },
+ {
+ "arguments": [
+ "echo",
+ "$(inputs.a)",
+ "$(inputs.b)"
+ ],
+ "class": "CommandLineTool",
+ "id": "#step1.cwl",
+ "inputs": [
+ {
+ "default": {
+ "class": "File",
+ "location": "keep:b9fca8bf06b170b8507b80b2564ee72b+57/a.txt"
+ },
+ "id": "#step1.cwl/a",
+ "type": "File"
+ },
+ {
+ "default": {
+ "class": "File",
+ "location": "keep:b9fca8bf06b170b8507b80b2564ee72b+57/b.txt"
+ },
+ "id": "#step1.cwl/b",
+ "type": "File"
+ }
+ ],
+ "outputs": []
+ },
+ {
+ "arguments": [
+ "echo",
+ "$(inputs.c)",
+ "$(inputs.b)"
+ ],
+ "class": "CommandLineTool",
+ "id": "#step2.cwl",
+ "inputs": [
+ {
+ "default": {
+ "class": "File",
+ "location": "keep:8e2d09a066d96cdffdd2be41579e4e2e+57/b.txt"
+ },
+ "id": "#step2.cwl/b",
+ "type": "File"
+ },
+ {
+ "default": {
+ "class": "File",
+ "location": "keep:8e2d09a066d96cdffdd2be41579e4e2e+57/c.txt"
+ },
+ "id": "#step2.cwl/c",
+ "type": "File"
+ }
+ ],
+ "outputs": []
+ }
+ ],
+ "cwlVersion": "v1.0"
+}
\ No newline at end of file
--- /dev/null
+class: CommandLineTool
+cwlVersion: v1.0
+requirements:
+ InitialWorkDirRequirement:
+ listing:
+ - $(inputs.inp1)
+ - $(inputs.inp2)
+ - $(inputs.inp3)
+inputs:
+ inp1: File
+ inp2: [File, Directory]
+ inp3: Directory
+outputs: []
+arguments: [echo, $(inputs.inp1), $(inputs.inp2), $(inputs.inp3)]
--- /dev/null
+cwlVersion: v1.0
+class: Workflow
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+requirements:
+ SubworkflowFeatureRequirement: {}
+inputs:
+ inp1:
+ type: File
+ default:
+ class: File
+ location: hello1.txt
+ inp2:
+ type: [File, Directory]
+ default:
+ class: File
+ basename: "hello2.txt"
+ contents: "Hello world"
+ inp3:
+ type: [File, Directory]
+ default:
+ class: Directory
+ basename: inp3
+ listing:
+ - class: File
+ basename: "hello3.txt"
+ contents: "hello world"
+outputs: []
+steps:
+ step1:
+ requirements:
+ arv:RunInSingleContainer: {}
+ in:
+ inp1: inp1
+ inp2: inp2
+ inp3: inp3
+ out: []
+ run: subwf.cwl
--- /dev/null
+cwlVersion: v1.0
+class: Workflow
+inputs:
+ inp1: File
+ inp2: File
+ inp3: Directory
+outputs: []
+steps:
+ step1:
+ in:
+ inp1: inp1
+ inp2: inp2
+ inp3: inp3
+ out: []
+ run: echo.cwl
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
logging.getLogger('arvados.arv-run').setLevel(logging.WARN)
-
class TestContainer(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
"capacity": 1073741824 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_run_'+str(enable_reuse),
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 5242880000 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_resource_requirements',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 7200,
}
},
'state': 'Committed',
+ 'output_name': 'Output for step test_initial_work_dir',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
},
},
'state': 'Committed',
+ "output_name": "Output for step test_run_redirect",
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_run_mounts',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
+ 'output_name': 'Output for step test_secrets',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import unittest
+import mock
+import datetime
+import httplib2
+
+from arvados_cwl.util import *
+from arvados.errors import ApiError
+
+class MockDateTime(datetime.datetime):
+ @classmethod
+ def utcnow(cls):
+ return datetime.datetime(2018, 1, 1, 0, 0, 0, 0)
+
+datetime.datetime = MockDateTime
+
+class TestUtil(unittest.TestCase):
+ def test_get_intermediate_collection_info(self):
+ name = "one"
+ current_container = {"uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+ intermediate_output_ttl = 120
+
+ info = get_intermediate_collection_info(name, current_container, intermediate_output_ttl)
+
+ self.assertEqual(info["name"], "Intermediate collection for step one")
+ self.assertEqual(info["trash_at"], datetime.datetime(2018, 1, 1, 0, 2, 0, 0))
+ self.assertEqual(info["properties"], {"type" : "intermediate", "container" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+ def test_get_current_container_success(self):
+ api = mock.MagicMock()
+ api.containers().current().execute.return_value = {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"}
+
+ current_container = get_current_container(api)
+
+ self.assertEqual(current_container, {"uuid" : "zzzzz-8i9sb-zzzzzzzzzzzzzzz"})
+
+ def test_get_current_container_error(self):
+ api = mock.MagicMock()
+ api.containers().current().execute.side_effect = ApiError(httplib2.Response({"status": 300}), "")
+ logger = mock.MagicMock()
+
+ current_container = get_current_container(api, num_retries=0, logger=logger)
+
+ self.assertEqual(current_container, None)
+ logger.info.assert_called()
#
# SPDX-License-Identifier: Apache-2.0
-cwlVersion: v1.0
-$graph:
-- class: CommandLineTool
- requirements:
- - class: DockerRequirement
- dockerPull: debian:8
- inputs:
- - id: '#submit_tool.cwl/x'
- type: File
- default:
- class: File
- location: keep:5d373e7629203ce39e7c22af98a0f881+52/blub.txt
- inputBinding:
- position: 1
- outputs: []
- baseCommand: cat
- id: '#submit_tool.cwl'
-- class: Workflow
- inputs:
- - id: '#main/x'
- type: File
- default: {class: File, location: keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt,
- size: 16, basename: blorp.txt, nameroot: blorp, nameext: .txt}
- - id: '#main/y'
- type: Directory
- default: {class: Directory, location: keep:99999999999999999999999999999998+99,
- basename: 99999999999999999999999999999998+99}
- - id: '#main/z'
- type: Directory
- default: {class: Directory, basename: anonymous, listing: [{basename: renamed.txt,
- class: File, location: keep:99999999999999999999999999999998+99/file1.txt,
- nameroot: renamed, nameext: .txt}]}
- outputs: []
- steps:
- - id: '#main/step1'
- in:
- - {id: '#main/step1/x', source: '#main/x'}
- out: []
- run: '#submit_tool.cwl'
- id: '#main'
+{
+ "$graph": [
+ {
+ "baseCommand": "cat",
+ "class": "CommandLineTool",
+ "id": "#submit_tool.cwl",
+ "inputs": [
+ {
+ "default": {
+ "class": "File",
+ "location": "keep:5d373e7629203ce39e7c22af98a0f881+52/blub.txt"
+ },
+ "id": "#submit_tool.cwl/x",
+ "inputBinding": {
+ "position": 1
+ },
+ "type": "File"
+ }
+ ],
+ "outputs": [],
+ "requirements": [
+ {
+ "class": "DockerRequirement",
+ "dockerPull": "debian:8"
+ }
+ ]
+ },
+ {
+ "class": "Workflow",
+ "id": "#main",
+ "inputs": [
+ {
+ "default": {
+ "basename": "blorp.txt",
+ "class": "File",
+ "location": "keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt",
+ "nameext": ".txt",
+ "nameroot": "blorp",
+ "size": 16
+ },
+ "id": "#main/x",
+ "type": "File"
+ },
+ {
+ "default": {
+ "basename": "99999999999999999999999999999998+99",
+ "class": "Directory",
+ "location": "keep:99999999999999999999999999999998+99"
+ },
+ "id": "#main/y",
+ "type": "Directory"
+ },
+ {
+ "default": {
+ "basename": "anonymous",
+ "class": "Directory",
+ "listing": [
+ {
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "nameext": ".txt",
+ "nameroot": "renamed"
+ }
+ ]
+ },
+ "id": "#main/z",
+ "type": "Directory"
+ }
+ ],
+ "outputs": [],
+ "steps": [
+ {
+ "id": "#main/step1",
+ "in": [
+ {
+ "id": "#main/step1/x",
+ "source": "#main/x"
+ }
+ ],
+ "out": [],
+ "run": "#submit_tool.cwl"
+ }
+ ]
+ }
+ ],
+ "cwlVersion": "v1.0"
+}
\ No newline at end of file
import pprint
import re
import string
-import subprocess
import sys
-import threading
import time
import types
import zlib
import json
import os
import re
-import subprocess
+import subprocess32 as subprocess
import sys
import tarfile
import tempfile
'ruamel.yaml >=0.13.11, <0.15',
'setuptools',
'ws4py <0.4',
+ 'subprocess32>=3.5.1',
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
simplecov-html (0.7.1)
simplecov-rcov (0.2.3)
simplecov (>= 0.4.1)
- sprockets (2.12.4)
+ sprockets (2.12.5)
hike (~> 1.2)
multi_json (~> 1.0)
rack (~> 1.0)
version: "v1",
revision: "20131114",
source_version: AppVersion.hash,
+ sourceVersion: AppVersion.hash, # source_version should be deprecated in the future
+ packageVersion: AppVersion.package_version,
generatedAt: db_current_time.iso8601,
title: "Arvados API",
description: "The API to interact with Arvados.",
# "git log".
source_version: false
+ # Override the automatic package version string. With the default value of
+ # false, the package version is read from package-build.version in Rails.root
+ # (included in vendor packages).
+ package_version: false
+
# Enable asynchronous permission graph rebuild. Must run
# script/permission-updater.rb as a separate process. When the permission
# cache is invalidated, the background process will update the permission
def self.forget
@hash = nil
+ @package_version = nil
end
# Return abbrev commit hash for current code version: "abc1234", or
@hash || "unknown"
end
+
+ def self.package_version
+ if (cached = Rails.configuration.package_version || @package_version)
+ return cached
+ end
+
+ begin
+ @package_version = IO.read(Rails.root.join("package-build.version")).strip
+ rescue Errno::ENOENT
+ @package_version = "unknown"
+ end
+
+ @package_version
+ end
end
assert_includes discovery_doc, 'defaultTrashLifetime'
assert_equal discovery_doc['defaultTrashLifetime'], Rails.application.config.default_trash_lifetime
assert_match(/^[0-9a-f]+(-modified)?$/, discovery_doc['source_version'])
+ assert_match(/^[0-9a-f]+(-modified)?$/, discovery_doc['sourceVersion'])
+ assert_match(/^unknown$/, discovery_doc['packageVersion'])
assert_equal discovery_doc['websocketUrl'], Rails.application.config.websocket_address
assert_equal discovery_doc['workbenchUrl'], Rails.application.config.workbench_address
assert_equal('zzzzz', discovery_doc['uuidPrefix'])
end
- test "discovery document overrides source_version with config" do
+ test "discovery document overrides source_version & sourceVersion with config" do
Rails.configuration.source_version = 'aaa888fff'
get :index
assert_response :success
discovery_doc = JSON.parse(@response.body)
+ # Key source_version will be replaced with sourceVersion
assert_equal 'aaa888fff', discovery_doc['source_version']
+ assert_equal 'aaa888fff', discovery_doc['sourceVersion']
+ end
+
+ test "discovery document overrides packageVersion with config" do
+ Rails.configuration.package_version = '1.0.0-stable'
+ get :index
+ assert_response :success
+ discovery_doc = JSON.parse(@response.body)
+ assert_equal '1.0.0-stable', discovery_doc['packageVersion']
end
test "empty disable_api_methods" do
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
+ "github.com/shirou/gopsutil/process"
"golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
ImageRemove(ctx context.Context, image string, options dockertypes.ImageRemoveOptions) ([]dockertypes.ImageDeleteResponseItem, error)
}
+type PsProcess interface {
+ CmdlineSlice() ([]string, error)
+}
+
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
finalState string
parentTemp string
+ ListProcesses func() ([]PsProcess, error)
+
statLogger io.WriteCloser
statReporter *crunchstat.Reporter
hoststatLogger io.WriteCloser
cStateLock sync.Mutex
cCancelled bool // StopContainer() invoked
- enableNetwork string // one of "default" or "always"
- networkMode string // passed through to HostConfig.NetworkMode
- arvMountLog *ThrottledLogger
+ enableNetwork string // one of "default" or "always"
+ networkMode string // passed through to HostConfig.NetworkMode
+ arvMountLog *ThrottledLogger
+ checkContainerd time.Duration
}
// setupSignals sets up signal handling to gracefully terminate the underlying
var errorBlacklist = []string{
"(?ms).*[Cc]annot connect to the Docker daemon.*",
"(?ms).*oci runtime error.*starting container process.*container init.*mounting.*to rootfs.*no such file or directory.*",
+ "(?ms).*grpc: the connection is unavailable.*",
}
var brokenNodeHook *string = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+func (runner *ContainerRunner) runBrokenNodeHook() {
+ if *brokenNodeHook == "" {
+ runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+ } else {
+ runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+ // run killme script
+ c := exec.Command(*brokenNodeHook)
+ c.Stdout = runner.CrunchLog
+ c.Stderr = runner.CrunchLog
+ err := c.Run()
+ if err != nil {
+ runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+ }
+ }
+}
+
func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
for _, d := range errorBlacklist {
if m, e := regexp.MatchString(d, goterr.Error()); m && e == nil {
runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
- if *brokenNodeHook == "" {
- runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
- } else {
- runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
- // run killme script
- c := exec.Command(*brokenNodeHook)
- c.Stdout = runner.CrunchLog
- c.Stderr = runner.CrunchLog
- err := c.Run()
- if err != nil {
- runner.CrunchLog.Printf("Error running broken node hook: %v", err)
- }
- }
+ runner.runBrokenNodeHook()
return true
}
}
runner.ContainerConfig.Volumes = runner.Volumes
maxRAM := int64(runner.Container.RuntimeConstraints.RAM)
+ if maxRAM < 4*1024*1024 {
+ // Docker daemon won't let you set a limit less than 4 MiB
+ maxRAM = 4 * 1024 * 1024
+ }
runner.HostConfig = dockercontainer.HostConfig{
Binds: runner.Binds,
LogConfig: dockercontainer.LogConfig{
return nil
}
+// CheckContainerd checks if "containerd" is present in the process list.
+func (runner *ContainerRunner) CheckContainerd() error {
+ if runner.checkContainerd == 0 {
+ return nil
+ }
+ p, _ := runner.ListProcesses()
+ for _, i := range p {
+ e, _ := i.CmdlineSlice()
+ if len(e) > 0 {
+ if strings.Index(e[0], "containerd") > -1 {
+ return nil
+ }
+ }
+ }
+
+ // Not found
+ runner.runBrokenNodeHook()
+ runner.stop(nil)
+ return fmt.Errorf("'containerd' not found in process list.")
+}
+
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
if timeout := runner.Container.SchedulingParameters.MaxRunTime; timeout > 0 {
runTimeExceeded = time.After(time.Duration(timeout) * time.Second)
}
+
+ containerdGone := make(chan error)
+ defer close(containerdGone)
+ if runner.checkContainerd > 0 {
+ go func() {
+ ticker := time.NewTicker(time.Duration(runner.checkContainerd))
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if ck := runner.CheckContainerd(); ck != nil {
+ containerdGone <- ck
+ return
+ }
+ case <-containerdGone:
+ // Channel closed, quit goroutine
+ return
+ }
+ }
+ }()
+ }
+
for {
select {
case waitBody := <-waitOk:
runner.CrunchLog.Printf("maximum run time exceeded. Stopping container.")
runner.stop(nil)
runTimeExceeded = nil
+
+ case err := <-containerdGone:
+ return err
}
}
}
return
}
+ // Sanity check that containerd is running.
+ err = runner.CheckContainerd()
+ if err != nil {
+ return
+ }
+
// check for and/or load image
err = runner.LoadImage()
if err != nil {
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
+ cr.ListProcesses = func() ([]PsProcess, error) {
+ pr, err := process.Processes()
+ if err != nil {
+ return nil, err
+ }
+ ps := make([]PsProcess, len(pr))
+ for i, j := range pr {
+ ps[i] = j
+ }
+ return ps, nil
+ }
cr.MkArvClient = func(token string) (IArvadosClient, error) {
cl, err := arvadosclient.MakeArvadosClient()
if err != nil {
`)
memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
getVersion := flag.Bool("version", false, "Print version information and exit.")
+ checkContainerd := flag.Duration("check-containerd", 60*time.Second, "Periodic check if (docker-)containerd is running (use 0s to disable).")
flag.Parse()
// Print version information if requested
cr.expectCgroupParent = *cgroupParent
cr.enableNetwork = *enableNetwork
cr.networkMode = *networkMode
+ cr.checkContainerd = *checkContainerd
if *cgroupParentSubsystem != "" {
p := findCgroup(*cgroupParentSubsystem)
cr.setCgroupParent = p
c.Check(api.CalledWith("collection.manifest_text", ". 34819d7beeabb9260a5c854bc85b3e44+10 0:10:secret.conf\n"), IsNil)
c.Check(api.CalledWith("collection.manifest_text", ""), NotNil)
}
+
+type FakeProcess struct {
+ cmdLine []string
+}
+
+func (fp FakeProcess) CmdlineSlice() ([]string, error) {
+ return fp.cmdLine, nil
+}
+
+func (s *TestSuite) helpCheckContainerd(c *C, lp func() ([]PsProcess, error)) error {
+ kc := &KeepTestClient{}
+ defer kc.Close()
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
+ cr.checkContainerd = time.Duration(100 * time.Millisecond)
+ cr.ListProcesses = lp
+
+ s.docker.fn = func(t *TestDockerClient) {
+ time.Sleep(1 * time.Second)
+ t.logWriter.Close()
+ }
+
+ err = cr.CreateContainer()
+ c.Check(err, IsNil)
+
+ err = cr.StartContainer()
+ c.Check(err, IsNil)
+
+ err = cr.WaitFinish()
+ return err
+
+}
+
+func (s *TestSuite) TestCheckContainerdPresent(c *C) {
+ err := s.helpCheckContainerd(c, func() ([]PsProcess, error) {
+ return []PsProcess{FakeProcess{[]string{"docker-containerd"}}}, nil
+ })
+ c.Check(err, IsNil)
+}
+
+func (s *TestSuite) TestCheckContainerdMissing(c *C) {
+ err := s.helpCheckContainerd(c, func() ([]PsProcess, error) {
+ return []PsProcess{FakeProcess{[]string{"abc"}}}, nil
+ })
+ c.Check(err, ErrorMatches, `'containerd' not found in process list.`)
+}
cr.CrunchLog.Print("Goodbye")
cr.CrunchLog.Close()
- c.Check(api.Calls > 1, Equals, true)
+ c.Check(api.Calls > 0, Equals, true)
c.Check(api.Calls < 2000000, Equals, true)
mt, err := cr.LogCollection.MarshalManifest(".")
from __future__ import absolute_import, print_function
-import subprocess
+import subprocess32 as subprocess
import time
from . import ComputeNodeMonitorActor
import logging
import re
-import subprocess
+import subprocess32 as subprocess
import arvados.util
from __future__ import absolute_import, print_function
-import subprocess
+import subprocess32 as subprocess
from . import clientactor
from . import config
'future',
'pykka',
'python-daemon',
- 'setuptools'
+ 'setuptools',
+ 'subprocess32>=3.5.1',
],
dependency_links=[
"https://github.com/curoverse/libcloud/archive/apache-libcloud-2.3.1.dev1.zip"
'pbr<1.7.0',
'mock>=1.0',
'apache-libcloud>=2.3.1.dev1',
+ 'subprocess32>=3.5.1',
],
zip_safe=False
)
"""
-import subprocess
+import subprocess32 as subprocess
import os
import sys
import re
from __future__ import absolute_import, print_function
-import subprocess
+import subprocess32 as subprocess
import time
import unittest
ComputeNodeSetupActorTestCase, \
ComputeNodeUpdateActorTestCase
-@mock.patch('subprocess.check_output')
+@mock.patch('subprocess32.check_output')
class SLURMComputeNodeShutdownActorTestCase(ComputeNodeShutdownActorMixin,
unittest.TestCase):
ACTOR_CLASS = slurm_dispatch.ComputeNodeShutdownActor
super(SLURMComputeNodeShutdownActorTestCase,
self).test_uncancellable_shutdown()
-@mock.patch('subprocess.check_output')
+@mock.patch('subprocess32.check_output')
class SLURMComputeNodeUpdateActorTestCase(ComputeNodeUpdateActorTestCase):
ACTOR_CLASS = slurm_dispatch.ComputeNodeUpdateActor
class SLURMComputeNodeSetupActorTestCase(ComputeNodeSetupActorTestCase):
ACTOR_CLASS = slurm_dispatch.ComputeNodeSetupActor
- @mock.patch('subprocess.check_output')
+ @mock.patch('subprocess32.check_output')
def test_update_node_features(self, check_output):
# `scontrol update` happens only if the Arvados node record
# has a hostname. ComputeNodeSetupActorTestCase.make_mocks
self.wait_for_assignment(self.setup_actor, 'cloud_node')
check_output.assert_called_with(['scontrol', 'update', 'NodeName=compute99', 'Weight=1000', 'Features=instancetype=z1.test'])
- @mock.patch('subprocess.check_output')
+ @mock.patch('subprocess32.check_output')
def test_failed_arvados_calls_retried(self, check_output):
super(SLURMComputeNodeSetupActorTestCase, self).test_failed_arvados_calls_retried()
- @mock.patch('subprocess.check_output')
+ @mock.patch('subprocess32.check_output')
def test_subscribe(self, check_output):
super(SLURMComputeNodeSetupActorTestCase, self).test_subscribe()
- @mock.patch('subprocess.check_output')
+ @mock.patch('subprocess32.check_output')
def test_creation_with_arvados_node(self, check_output):
super(SLURMComputeNodeSetupActorTestCase, self).test_creation_with_arvados_node()
super(JobQueueMonitorActorTestCase, self).build_monitor(*args, **kwargs)
self.client.jobs().queue().execute.side_effect = side_effect
- @mock.patch("subprocess.check_call")
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_call")
+ @mock.patch("subprocess32.check_output")
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
self.client.jobs().cancel.assert_called_with(uuid=job_uuid)
mock_scancel.assert_called_with(['scancel', '--name='+container_uuid])
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_output")
def test_subscribers_get_server_lists(self, mock_squeue):
mock_squeue.return_value = ""
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_output")
def test_squeue_server_list(self, mock_squeue):
mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)|1234567890
2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)|1234567890
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)|1234567890
1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)|1234567890
self.subscriber.assert_called_with([testutil.MockSize(1),
testutil.MockSize(2)])
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_output")
def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test|1234567890\n"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
*args, **kwargs)
self.client.nodes().list().execute.side_effect = side_effect
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_output")
def test_uuid_is_subscription_key(self, sinfo_mock):
sinfo_mock.return_value = ""
node = testutil.arvados_node_mock()
self.subscriber.assert_called_with(node)
self.assertEqual("down", node["crunch_worker_state"])
- @mock.patch("subprocess.check_output")
+ @mock.patch("subprocess32.check_output")
def test_update_from_sinfo(self, sinfo_mock):
sinfo_mock.return_value = """compute1|idle|instancetype=a1.test
compute2|alloc|(null)
"revision": "d682213848ed68c0a260ca37d6dd5ace8423f5ba",
"revisionTime": "2017-12-05T20:32:29Z"
},
+ {
+ "checksumSHA1": "st4vb0GmDeoKbsfxdpNZ2MPl76M=",
+ "path": "github.com/StackExchange/wmi",
+ "revision": "cdffdb33acae0e14efff2628f9bae377b597840e",
+ "revisionTime": "2018-04-12T20:51:11Z"
+ },
{
"checksumSHA1": "spyv5/YFBjYyZLZa1U2LBfDR8PM=",
"path": "github.com/beorn7/perks/quantile",
"revision": "0ca9ea5df5451ffdf184b4428c902747c2c11cd7",
"revisionTime": "2017-03-27T23:54:44Z"
},
+ {
+ "checksumSHA1": "Kqv7bA4oJG0nPwQvGWDwGGaKONo=",
+ "path": "github.com/go-ole/go-ole",
+ "revision": "7a0fa49edf48165190530c675167e2f319a05268",
+ "revisionTime": "2018-06-25T08:58:08Z"
+ },
+ {
+ "checksumSHA1": "PArleDBtadu2qO4hJwHR8a3IOTA=",
+ "path": "github.com/go-ole/go-ole/oleutil",
+ "revision": "7a0fa49edf48165190530c675167e2f319a05268",
+ "revisionTime": "2018-06-25T08:58:08Z"
+ },
{
"checksumSHA1": "wn2shNJMwRZpvuvkf1s7h0wvqHI=",
"path": "github.com/gogo/protobuf/proto",
"revision": "1744e2970ca51c86172c8190fadad617561ed6e7",
"revisionTime": "2017-11-10T11:01:46Z"
},
+ {
+ "checksumSHA1": "q14d3C3xvWevU3dSv4P5K0+OSD0=",
+ "path": "github.com/shirou/gopsutil/cpu",
+ "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
+ "revisionTime": "2018-07-05T13:28:12Z"
+ },
+ {
+ "checksumSHA1": "LZ9GloiGLTISmQ4dalK2XspH6Wo=",
+ "path": "github.com/shirou/gopsutil/host",
+ "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
+ "revisionTime": "2018-07-05T13:28:12Z"
+ },
+ {
+ "checksumSHA1": "cyoqI0gryzjxGTkaAfyUqMiuUR0=",
+ "path": "github.com/shirou/gopsutil/internal/common",
+ "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
+ "revisionTime": "2018-07-05T13:28:12Z"
+ },
+ {
+ "checksumSHA1": "vEQLjAO5T5K9zXblEMYdoaBZzj0=",
+ "path": "github.com/shirou/gopsutil/mem",
+ "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
+ "revisionTime": "2018-07-05T13:28:12Z"
+ },
+ {
+ "checksumSHA1": "KMWFRa0DVpabo9d8euB4RYjUBQE=",
+ "path": "github.com/shirou/gopsutil/net",
+ "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
+ "revisionTime": "2018-07-05T13:28:12Z"
+ },
+ {
+ "checksumSHA1": "fbO7c1gv1kSvWKOb/+5HUWFkBaA=",
+ "path": "github.com/shirou/gopsutil/process",
+ "revision": "63728fcf6b24475ecfea044e22242447666c2f52",
+ "revisionTime": "2018-07-05T13:28:12Z"
+ },
+ {
+ "checksumSHA1": "Nve7SpDmjsv6+rhkXAkfg/UQx94=",
+ "path": "github.com/shirou/w32",
+ "revision": "bb4de0191aa41b5507caa14b0650cdbddcd9280b",
+ "revisionTime": "2016-09-30T03:27:40Z"
+ },
{
"checksumSHA1": "8QeSG127zQqbA+YfkO1WkKx/iUI=",
"path": "github.com/src-d/gcfg",