.DS_Store
.vscode
.Rproj.user
+_version.py
\ No newline at end of file
url = 'https://' + url;
}
url = new URL(url);
- return m.request(url.origin + '/discovery/v1/apis/arvados/v1/rest').then(function() {
+ return db.discoveryDoc({baseURL: url.origin}).map(function() {
return url.origin + '/';
}).catch(function(err) {
// If url is a Workbench site (and isn't too old),
}
var session = db.loadLocal();
var apiHostname = new URL(session.baseURL).hostname;
- m.request(session.baseURL+'discovery/v1/apis/arvados/v1/rest').then(function(localDD) {
+ db.discoveryDoc(session).map(function(localDD) {
var uuidPrefix = localDD.uuidPrefix;
- m.request(baseURL+'discovery/v1/apis/arvados/v1/rest').then(function(dd) {
+ db.discoveryDoc({baseURL: baseURL}).map(function(dd) {
if (uuidPrefix in dd.remoteHosts ||
(dd.remoteHostsViaDNS && apiHostname.endsWith('.arvadosapi.com'))) {
// Federated identity login via salted token
var cache = db.discoveryCache[session.baseURL];
if (!cache) {
db.discoveryCache[session.baseURL] = cache = m.stream();
- m.request(session.baseURL+'discovery/v1/apis/arvados/v1/rest').then(cache);
+ m.request(session.baseURL+'discovery/v1/apis/arvados/v1/rest')
+ .then(function (dd) {
+ // Just in case we're talking with an old API server.
+ dd.remoteHosts = dd.remoteHosts || {};
+ if (dd.remoteHostsViaDNS === undefined) {
+ dd.remoteHostsViaDNS = false;
+ }
+ return dd;
+ })
+ .then(cache);
}
return cache;
},
if (userUUIDPrefix === session.user.owner_uuid.slice(0, 5)) {
return;
}
- var doc = db.discoveryDoc(session);
- doc.map(function(d) {
+ db.discoveryDoc(session).map(function (d) {
// Guess the remote host from the local discovery doc settings
var rHost = null;
if (d.remoteHosts[userUUIDPrefix]) {
}
// Get the remote cluster workbench url & redirect there.
db.findAPI(rHost).then(function (apiUrl) {
- var doc = db.discoveryDoc({baseURL: apiUrl});
- doc.map(function (d) {
+ db.discoveryDoc({baseURL: apiUrl}).map(function (d) {
document.location = d.workbenchUrl + path;
});
});
set -e
if [[ -z "$WORKSPACE" ]] ; then
- echo "$helpmessage"
- echo
- echo "Must set WORKSPACE"
- exit 1
+ export WORKSPACE=$(readlink -f $(dirname $0)/..)
+ echo "Using WORKSPACE $WORKSPACE"
fi
if [[ -z "$ARVADOS_API_HOST" || -z "$ARVADOS_API_TOKEN" ]] ; then
debian8,debian9,ubuntu1204,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
debian8,ubuntu1404,centos7|subprocess32|3.2.7|2|python|all
all|ruamel.yaml|0.13.7|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
-all|cwltest|1.0.20180209171722|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32'
-all|junit-xml|1.7|3|python|all
+all|cwltest|1.0.20180416154033|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32'
+all|junit-xml|1.8|3|python|all
all|rdflib-jsonld|0.4.0|2|python|all
all|futures|3.0.5|2|python|all
all|future|0.16.0|2|python|all
timer_reset
if [ ${#failures[@]} -eq 0 ]; then
+ echo "/usr/local/arvados-dev/jenkins/run_upload_packages.py -H jenkinsapt@apt.arvados.org -o Port=2222 --workspace $WORKSPACE $TARGET"
/usr/local/arvados-dev/jenkins/run_upload_packages.py -H jenkinsapt@apt.arvados.org -o Port=2222 --workspace $WORKSPACE $TARGET
else
echo "Skipping package upload, there were errors building and/or testing the packages"
fi
declare $(format_last_commit_here "git_ts=%ct git_hash=%h")
- echo "${prefix}.$(date -ud "@$git_ts" +%Y%m%d%H%M%S).$git_hash"
-}
+ ARVADOS_BUILDING_VERSION="$(git describe --abbrev=0).$(date -ud "@$git_ts" +%Y%m%d%H%M%S)"
+ echo "$ARVADOS_BUILDING_VERSION"
+}
nohash_version_from_git() {
if [[ -n "$ARVADOS_BUILDING_VERSION" ]]; then
echo "$ARVADOS_BUILDING_VERSION"
return
fi
- version_from_git $1 | cut -d. -f1-3
+ version_from_git $1 | cut -d. -f1-4
}
timestamp_from_git() {
# See if we can skip building the package, only if it already exists in the
# processed/ directory. If so, move it back to the packages directory to make
# sure it gets picked up by the test and/or upload steps.
- if [[ -e "processed/$complete_pkgname" ]]; then
- echo "Package $complete_pkgname exists, not rebuilding!"
- mv processed/$complete_pkgname .
- return 1
+ # Get the list of packages from the repos
+
+ if [[ "$FORMAT" == "deb" ]]; then
+ debian_distros="jessie precise stretch trusty wheezy xenial"
+
+ for D in ${debian_distros}; do
+ if [ ${pkgname:0:3} = "lib" ]; then
+ repo_subdir=${pkgname:0:4}
+ else
+ repo_subdir=${pkgname:0:1}
+ fi
+
+ repo_pkg_list=$(curl -o - http://apt.arvados.org/pool/${D}/main/${repo_subdir}/)
+ echo ${repo_pkg_list} |grep -q ${complete_pkgname}
+ if [ $? -eq 0 ]; then
+ echo "Package $complete_pkgname exists, not rebuilding!"
+ curl -o ./${complete_pkgname} http://apt.arvados.org/pool/${D}/main/${repo_subdir}/${complete_pkgname}
+ return 1
+ else
+ echo "Package $complete_pkgname not found, building"
+ return 0
+ fi
+ done
else
- echo "Package $complete_pkgname not found, building"
- return 0
+ centos_repo="http://rpm.arvados.org/CentOS/7/dev/x86_64/"
+
+ repo_pkg_list=$(curl -o - ${centos_repo})
+ echo ${repo_pkg_list} |grep -q ${complete_pkgname}
+ if [ $? -eq 0 ]; then
+ echo "Package $complete_pkgname exists, not rebuilding!"
+ curl -o ./${complete_pkgname} ${centos_repo}${complete_pkgname}
+ return 1
+ else
+ echo "Package $complete_pkgname not found, building"
+ return 0
+ fi
fi
}
# The following directions are based on
# https://azure.microsoft.com/en-us/documentation/articles/resource-group-authenticate-service-principal/
+# and updated for v2 of the Azure CLI tool.
#
-# azure config mode arm
-# azure ad app create --name "<Your Application Display Name>" --home-page "<https://YourApplicationHomePage>" --identifier-uris "<https://YouApplicationUri>" --password <Your_Password>
-# azure ad sp create "<Application_Id>"
-# azure role assignment create --objectId "<Object_Id>" -o Owner -c /subscriptions/{subscriptionId}/
+# az ad app create --display-name "Node Manager" --homepage "https://arvados.org" --identifier-uris "https://<Your_Application_Uri>" --password <Your_Password>
+# az ad sp create --id "<Application_Id>"
+# az role assignment create --assignee "<Application_Id>" --role Owner --resource-group "<Your_Azure_Arvados_Resource_Group>"
#
# Use <Application_Id> for "key" and the <Your_Password> for "secret"
#
# If the authorization origins are not displayed, clicking on *Create Client ID* will take you to *Consent screen* settings.
## On consent screen settings, enter the appropriate details and click on *Save*.
## This will return you to the *Create Client ID* dialog box.
-# You must set the authorization origins. Edit @sso.your-site.com@ to the appropriate hostname that you will use to access the SSO service:
-## JavaScript origin should be @https://sso.your-site.com/@
-## Redirect URI should be @https://sso.your-site.com/users/auth/google_oauth2/callback@
+# You must set the authorization origins. Edit @auth.your.domain@ to the appropriate hostname that you will use to access the SSO service:
+## JavaScript origin should be @https://auth.your.domain/@
+## Redirect URI should be @https://auth.your.domain/users/auth/google_oauth2/callback@
# Copy the values of *Client ID* and *Client secret* from the Google Developers Console into the Google section of @config/application.yml@, like this:
<notextile>
exit
end
+git_latest_tag = `git describe --abbrev=0`
+git_latest_tag = git_latest_tag.encode('utf-8').strip
git_timestamp, git_hash = `git log -n1 --first-parent --format=%ct:%H .`.chomp.split(":")
git_timestamp = Time.at(git_timestamp.to_i).utc
Gem::Specification.new do |s|
s.name = 'arvados-cli'
- s.version = "0.1.#{git_timestamp.strftime('%Y%m%d%H%M%S')}"
+ s.version = "#{git_latest_tag}.#{git_timestamp.strftime('%Y%m%d%H%M%S')}"
s.date = git_timestamp.strftime("%Y-%m-%d")
s.summary = "Arvados CLI tools"
s.description = "Arvados command line tools, git commit #{git_hash}"
include LICENSE-2.0.txt
include README.rst
+include arvados_version.py
\ No newline at end of file
import re
from functools import partial
import pkg_resources # part of setuptools
+import Queue
+import time
+import signal
+import thread
from cwltool.errors import WorkflowException
import cwltool.main
import arvados.config
from arvados.keep import KeepClient
from arvados.errors import ApiError
+import arvados.commands._util as arv_cmd
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
from .perf import Perf
from .pathmapper import NoFollowPathMapper
+from .task_queue import TaskQueue
from ._version import __version__
from cwltool.pack import pack
"""
- def __init__(self, api_client, work_api=None, keep_client=None, output_name=None, output_tags=None, num_retries=4):
+ def __init__(self, api_client, work_api=None, keep_client=None,
+ output_name=None, output_tags=None, num_retries=4,
+ thread_count=4):
self.api = api_client
self.processes = {}
- self.lock = threading.Lock()
- self.cond = threading.Condition(self.lock)
+ self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
self.uploaded = {}
self.intermediate_output_ttl = 0
self.intermediate_output_collections = []
self.trash_intermediate = False
+ self.thread_count = thread_count
+ self.poll_interval = 12
if keep_client is not None:
self.keep_client = keep_client
return cwltool.workflow.defaultMakeTool(toolpath_object, **kwargs)
def output_callback(self, out, processStatus):
- if processStatus == "success":
- logger.info("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Complete"}).execute(num_retries=self.num_retries)
- else:
- logger.warn("Overall process status is %s", processStatus)
- if self.pipeline:
- self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
- body={"state": "Failed"}).execute(num_retries=self.num_retries)
- self.final_status = processStatus
- self.final_output = out
+ with self.workflow_eval_lock:
+ if processStatus == "success":
+ logger.info("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Complete"}).execute(num_retries=self.num_retries)
+ else:
+ logger.warn("Overall process status is %s", processStatus)
+ if self.pipeline:
+ self.api.pipeline_instances().update(uuid=self.pipeline["uuid"],
+ body={"state": "Failed"}).execute(num_retries=self.num_retries)
+ self.final_status = processStatus
+ self.final_output = out
+ self.workflow_eval_lock.notifyAll()
+
+
+ def start_run(self, runnable, kwargs):  # Submit runnable.run(**kwargs) to self.task_queue.
+ self.task_queue.add(partial(runnable.run, **kwargs))
+
+ def process_submitted(self, container):  # Record container in self.processes, keyed by its uuid.
+ with self.workflow_eval_lock:  # the update is made while holding the workflow lock
+ self.processes[container.uuid] = container
+
+ def process_done(self, uuid):  # Drop uuid from self.processes; a uuid that is not tracked is ignored.
+ with self.workflow_eval_lock:
+ if uuid in self.processes:
+ del self.processes[uuid]
+
+ def wrapped_callback(self, cb, obj, st):  # Call cb(obj, st) under workflow_eval_lock, then notify all waiters.
+ with self.workflow_eval_lock:
+ cb(obj, st)
+ self.workflow_eval_lock.notifyAll()  # wake threads blocked in workflow_eval_lock.wait()
+
+ def get_wrapped_callback(self, cb):  # Return a callable that forwards (obj, st) to cb via wrapped_callback.
+ return partial(self.wrapped_callback, cb)
def on_message(self, event):
- if "object_uuid" in event:
- if event["object_uuid"] in self.processes and event["event_type"] == "update":
- if event["properties"]["new_attributes"]["state"] == "Running" and self.processes[event["object_uuid"]].running is False:
- uuid = event["object_uuid"]
- with self.lock:
- j = self.processes[uuid]
- logger.info("%s %s is Running", self.label(j), uuid)
+ if event.get("object_uuid") in self.processes and event["event_type"] == "update":
+ uuid = event["object_uuid"]
+ if event["properties"]["new_attributes"]["state"] == "Running":
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ if j.running is False:
j.running = True
j.update_pipeline_component(event["properties"]["new_attributes"])
- elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
- uuid = event["object_uuid"]
- try:
- self.cond.acquire()
- j = self.processes[uuid]
- logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
- with Perf(metrics, "done %s" % j.name):
- j.done(event["properties"]["new_attributes"])
- self.cond.notify()
- finally:
- self.cond.release()
+ logger.info("%s %s is Running", self.label(j), uuid)
+ elif event["properties"]["new_attributes"]["state"] in ("Complete", "Failed", "Cancelled", "Final"):
+ with self.workflow_eval_lock:
+ j = self.processes[uuid]
+ self.task_queue.add(partial(j.done, event["properties"]["new_attributes"]))
+ logger.info("%s %s is %s", self.label(j), uuid, event["properties"]["new_attributes"]["state"])
def label(self, obj):
return "[%s %s]" % (self.work_api[0:-1], obj.name)
"""
try:
+ remain_wait = self.poll_interval
while True:
- self.stop_polling.wait(15)
+ if remain_wait > 0:
+ self.stop_polling.wait(remain_wait)
if self.stop_polling.is_set():
break
- with self.lock:
- keys = self.processes.keys()
+ with self.workflow_eval_lock:
+ keys = list(self.processes.keys())
if not keys:
+ remain_wait = self.poll_interval
continue
+ begin_poll = time.time()
if self.work_api == "containers":
table = self.poll_api.container_requests()
elif self.work_api == "jobs":
proc_states = table.list(filters=[["uuid", "in", keys]]).execute(num_retries=self.num_retries)
except Exception as e:
logger.warn("Error checking states on API server: %s", e)
+ remain_wait = self.poll_interval
continue
for p in proc_states["items"]:
"new_attributes": p
}
})
+ finish_poll = time.time()
+ remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
- logger.error("Fatal error in state polling thread.", exc_info=(sys.exc_info()[1] if self.debug else False))
- self.cond.acquire()
- self.processes.clear()
- self.cond.notify()
- self.cond.release()
+ logger.exception("Fatal error in state polling thread.")
+ with self.workflow_eval_lock:
+ self.processes.clear()
+ self.workflow_eval_lock.notifyAll()
finally:
self.stop_polling.set()
self.api.collections().delete(uuid=i).execute(num_retries=self.num_retries)
except:
logger.warn("Failed to delete intermediate output: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
- if sys.exc_info()[0] is KeyboardInterrupt:
+ if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
break
def check_features(self, obj):
# Reload tool object which may have been updated by
# upload_workflow_deps
+ # Don't validate this time because it will just print redundant errors.
tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
makeTool=self.arv_make_tool,
loader=tool.doc_loader,
avsc_names=tool.doc_schema,
- metadata=tool.metadata)
+ metadata=tool.metadata,
+ do_validate=False)
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % kwargs["name"],
logger.info("Pipeline instance %s", self.pipeline["uuid"])
if runnerjob and not kwargs.get("wait"):
- runnerjob.run(wait=kwargs.get("wait"))
+ submitargs = kwargs.copy()
+ submitargs['submit'] = False
+ runnerjob.run(**submitargs)
return (runnerjob.uuid, "success")
self.poll_api = arvados.api('v1')
self.polling_thread = threading.Thread(target=self.poll_states)
self.polling_thread.start()
+ self.task_queue = TaskQueue(self.workflow_eval_lock, self.thread_count)
+
if runnerjob:
jobiter = iter((runnerjob,))
else:
**kwargs)
try:
- self.cond.acquire()
- # Will continue to hold the lock for the duration of this code
- # except when in cond.wait(), at which point on_message can update
- # job state and process output callbacks.
+ self.workflow_eval_lock.acquire()
+ # Holds the lock while this code runs and releases it when
+ # it is safe to do so in self.workflow_eval_lock.wait(),
+ # at which point on_message can update job state and
+ # process output callbacks.
loopperf = Perf(metrics, "jobiter")
loopperf.__enter__()
if self.stop_polling.is_set():
break
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+
if runnable:
with Perf(metrics, "run"):
- runnable.run(**kwargs)
+ self.start_run(runnable, kwargs)
else:
- if self.processes:
- self.cond.wait(1)
+ if (self.task_queue.in_flight + len(self.processes)) > 0:
+ self.workflow_eval_lock.wait(3)
else:
- logger.error("Workflow is deadlocked, no runnable jobs and not waiting on any pending jobs.")
+ logger.error("Workflow is deadlocked, no runnable processes and not waiting on any pending processes.")
break
loopperf.__enter__()
loopperf.__exit__()
- while self.processes:
- self.cond.wait(1)
+ while (self.task_queue.in_flight + len(self.processes)) > 0:
+ if self.task_queue.error is not None:
+ raise self.task_queue.error
+ self.workflow_eval_lock.wait(3)
except UnsupportedRequirement:
raise
except:
- if sys.exc_info()[0] is KeyboardInterrupt:
- logger.error("Interrupted, marking pipeline as failed")
+ if sys.exc_info()[0] is KeyboardInterrupt or sys.exc_info()[0] is SystemExit:
+ logger.error("Interrupted, workflow will be cancelled")
else:
logger.error("Execution failed: %s", sys.exc_info()[1], exc_info=(sys.exc_info()[1] if self.debug else False))
if self.pipeline:
self.api.container_requests().update(uuid=runnerjob.uuid,
body={"priority": "0"}).execute(num_retries=self.num_retries)
finally:
- self.cond.release()
+ self.workflow_eval_lock.release()
+ self.task_queue.drain()
self.stop_polling.set()
self.polling_thread.join()
+ self.task_queue.join()
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
help="Workflow priority (range 1..1000, higher has precedence over lower, containers api only)",
default=DEFAULT_PRIORITY)
+ parser.add_argument("--disable-validate", dest="do_validate",
+ action="store_false", default=True,
+ help=argparse.SUPPRESS)
+
+ parser.add_argument("--disable-js-validation",
+ action="store_true", default=False,
+ help=argparse.SUPPRESS)
+
+ parser.add_argument("--thread-count", type=int,
+ default=4, help="Number of threads to use for job submit and output collection.")
+
exgroup = parser.add_mutually_exclusive_group()
exgroup.add_argument("--trash-intermediate", action="store_true",
default=False, dest="trash_intermediate",
"http://arvados.org/cwl#ReuseRequirement"
])
-def main(args, stdout, stderr, api_client=None, keep_client=None):
+def exit_signal_handler(sigcode, frame):  # Log the received signal number, then exit via sys.exit(-sigcode).
+ logger.error("Caught signal {}, exiting.".format(sigcode))
+ sys.exit(-sigcode)
+
+def main(args, stdout, stderr, api_client=None, keep_client=None,
+ install_sig_handlers=True):
parser = arg_parser()
job_order_object = None
arvargs = parser.parse_args(args)
+ if install_sig_handlers:
+ arv_cmd.install_signal_handlers()
+
if arvargs.update_workflow:
if arvargs.update_workflow.find('-7fd4e-') == 5:
want_api = 'containers'
try:
if api_client is None:
- api_client=arvados.api('v1', model=OrderedJsonModel())
+ api_client = arvados.safeapi.ThreadSafeApiCache(api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4})
+ keep_client = api_client.keep
if keep_client is None:
keep_client = arvados.keep.KeepClient(api_client=api_client, num_retries=4)
runner = ArvCwlRunner(api_client, work_api=arvargs.work_api, keep_client=keep_client,
num_retries=4, output_name=arvargs.output_name,
- output_tags=arvargs.output_tags)
+ output_tags=arvargs.output_tags,
+ thread_count=arvargs.thread_count)
except Exception as e:
logger.error(e)
return 1
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import pkg_resources
-
-__version__ = pkg_resources.require('arvados-cwl-runner')[0].version
container_request["name"] = wfrecord["name"]
container_request["properties"]["template_uuid"] = wfuuid
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
response = self.arvrunner.api.container_requests().create(
body=container_request
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if response["state"] == "Final":
logger.info("%s reused container %s", self.arvrunner.label(self), response["container_uuid"])
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
class RunnerContainer(Runner):
container_req["properties"]["template_uuid"] = self.tool.tool["id"][6:33]
- command = ["arvados-cwl-runner", "--local", "--api=containers", "--no-log-timestamps"]
+ # --local means execute the workflow instead of submitting a container request
+ # --api=containers means use the containers API
+ # --no-log-timestamps means don't add timestamps (the logging infrastructure does this)
+ # --disable-validate because we already validated so don't need to do it again
+ # --eval-timeout is the timeout for javascript invocation
+ # --thread-count is the number of threads to use for job submission
+ # --enable/disable-reuse sets desired job reuse
+ command = ["arvados-cwl-runner",
+ "--local",
+ "--api=containers",
+ "--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=%s" % self.arvrunner.eval_timeout,
+ "--thread-count=%s" % self.arvrunner.thread_count,
+ "--enable-reuse" if self.enable_reuse else "--disable-reuse"]
+
if self.output_name:
command.append("--output-name=" + self.output_name)
container_req["output_name"] = self.output_name
if kwargs.get("debug"):
command.append("--debug")
- if self.enable_reuse:
- command.append("--enable-reuse")
- else:
- command.append("--disable-reuse")
-
if self.on_error:
command.append("--on-error=" + self.on_error)
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
- command.append("--eval-timeout=%s" % self.arvrunner.eval_timeout)
-
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
return container_req
- def run(self, *args, **kwargs):
+ def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
response = self.arvrunner.api.container_requests().create(
).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
logger.info("%s submitted container %s", self.arvrunner.label(self), response["uuid"])
else:
super(RunnerContainer, self).done(container)
finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
args.append(image_tag)
logger.info("Uploading Docker image %s:%s", image_name, image_tag)
try:
- arvados.commands.keepdocker.main(args, stdout=sys.stderr)
+ arvados.commands.put.api_client = api_client
+ arvados.commands.keepdocker.main(args, stdout=sys.stderr, install_sig_handlers=False, api=api_client)
except SystemExit as e:
if e.code:
raise WorkflowException("keepdocker exited with code %s" % e.code)
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
+ self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
+
try:
with Perf(metrics, "create %s" % self.name):
response = self.arvrunner.api.jobs().create(
find_or_create=enable_reuse
).execute(num_retries=self.arvrunner.num_retries)
- self.arvrunner.processes[response["uuid"]] = self
+ self.uuid = response["uuid"]
+ self.arvrunner.process_submitted(self)
self.update_pipeline_component(response)
self.output_callback({}, "permanentFail")
def update_pipeline_component(self, record):
- if self.arvrunner.pipeline:
- self.arvrunner.pipeline["components"][self.name] = {"job": record}
- with Perf(metrics, "update_pipeline_component %s" % self.name):
- self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
- uuid=self.arvrunner.pipeline["uuid"],
- body={
- "components": self.arvrunner.pipeline["components"]
- }).execute(num_retries=self.arvrunner.num_retries)
- if self.arvrunner.uuid:
- try:
- job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
- if job:
- components = job["components"]
- components[self.name] = record["uuid"]
- self.arvrunner.api.jobs().update(
- uuid=self.arvrunner.uuid,
+ with self.arvrunner.workflow_eval_lock:
+ if self.arvrunner.pipeline:
+ self.arvrunner.pipeline["components"][self.name] = {"job": record}
+ with Perf(metrics, "update_pipeline_component %s" % self.name):
+ self.arvrunner.pipeline = self.arvrunner.api.pipeline_instances().update(
+ uuid=self.arvrunner.pipeline["uuid"],
body={
- "components": components
+ "components": self.arvrunner.pipeline["components"]
}).execute(num_retries=self.arvrunner.num_retries)
- except Exception as e:
- logger.info("Error adding to components: %s", e)
+ if self.arvrunner.uuid:
+ try:
+ job = self.arvrunner.api.jobs().get(uuid=self.arvrunner.uuid).execute()
+ if job:
+ components = job["components"]
+ components[self.name] = record["uuid"]
+ self.arvrunner.api.jobs().update(
+ uuid=self.arvrunner.uuid,
+ body={
+ "components": components
+ }).execute(num_retries=self.arvrunner.num_retries)
+ except Exception as e:
+ logger.info("Error adding to components: %s", e)
def done(self, record):
try:
processStatus = "permanentFail"
finally:
self.output_callback(outputs, processStatus)
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
+
class RunnerJob(Runner):
"""Submit and manage a Crunch job that runs crunch_scripts/cwl-runner."""
}
}
- def run(self, *args, **kwargs):
- job_spec = self.arvados_job_spec(*args, **kwargs)
+ def run(self, **kwargs):
+ job_spec = self.arvados_job_spec(**kwargs)
job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
return
self.uuid = job["uuid"]
- self.arvrunner.processes[self.uuid] = self
+ self.arvrunner.process_submitted(self)
if job["state"] in ("Complete", "Failed", "Cancelled"):
self.done(job)
if isinstance(res_req[a], int): # integer check
all_res_req[a].append(res_req[a])
else:
- msg = SourceLine(res_req).makeError(
+ msg = SourceLine(res_req, a).makeError(
"Non-top-level ResourceRequirement in single container cannot have expressions")
exception_msgs.append(msg)
if exception_msgs:
self.arvrunner = arvrunner
self.work_api = kwargs["work_api"]
self.wf_pdh = None
+ self.dynamic_resource_req = []
+ self.static_resource_req = []
def job(self, joborder, output_callback, **kwargs):
kwargs["work_api"] = self.work_api
builder.hints = workflowobj["hints"]
builder.resources = {}
- res_reqs = {"requirements": [], "hints": []}
- for t in ("requirements", "hints"):
- for item in packed["$graph"]:
- if t in item:
- if item["id"] == "#main": # evaluate potential expressions in the top-level requirements/hints
- for req in item[t]:
- if req["class"] == "ResourceRequirement":
- eval_req = {"class": "ResourceRequirement"}
- for a in max_res_pars + sum_res_pars:
- if a in req:
- eval_req[a] = builder.do_eval(req[a])
- res_reqs[t].append(eval_req)
- else:
- for req in item[t]:
- if req["class"] == "ResourceRequirement":
- res_reqs[t].append(req)
- overall_res_req = {"requirements": get_overall_res_req(res_reqs["requirements"]),
- "hints": get_overall_res_req(res_reqs["hints"])}
-
- new_spec = {"requirements": self.requirements, "hints": self.hints}
- for t in ("requirements", "hints"):
- for req in new_spec[t]:
- if req["class"] == "ResourceRequirement":
- new_spec[t].remove(req)
- if overall_res_req[t]:
- new_spec[t].append(overall_res_req[t])
+ def visit(item):
+ for t in ("hints", "requirements"):
+ if t not in item:
+ continue
+ for req in item[t]:
+ if req["class"] == "ResourceRequirement":
+ dyn = False
+ for k in max_res_pars + sum_res_pars:
+ if k in req:
+ if isinstance(req[k], basestring):
+ if item["id"] == "#main":
+ # only the top-level requirements/hints may contain expressions
+ self.dynamic_resource_req.append(req)
+ dyn = True
+ break
+ else:
+ with SourceLine(req, k, WorkflowException):
+ raise WorkflowException("Non-top-level ResourceRequirement in single container cannot have expressions")
+ if not dyn:
+ self.static_resource_req.append(req)
+
+ visit_class(packed["$graph"], ("Workflow", "CommandLineTool"), visit)
+
+ if self.static_resource_req:
+ self.static_resource_req = [get_overall_res_req(self.static_resource_req)]
upload_dependencies(self.arvrunner,
kwargs.get("name", ""),
uri,
False)
+ if self.dynamic_resource_req:
+ builder = Builder()
+ builder.job = joborder
+ builder.requirements = self.requirements
+ builder.hints = self.hints
+ builder.resources = {}
+
+ # Evaluate dynamic resource requirements using current builder
+ rs = copy.copy(self.static_resource_req)
+ for dyn_rs in self.dynamic_resource_req:
+ eval_req = {"class": "ResourceRequirement"}
+ for a in max_res_pars + sum_res_pars:
+ if a in dyn_rs:
+ eval_req[a] = builder.do_eval(dyn_rs[a])
+ rs.append(eval_req)
+ job_res_reqs = [get_overall_res_req(rs)]
+ else:
+ job_res_reqs = self.static_resource_req
+
with Perf(metrics, "subworkflow adjust"):
joborder_resolved = copy.deepcopy(joborder)
joborder_keepmount = copy.deepcopy(joborder)
"inputs": self.tool["inputs"],
"outputs": self.tool["outputs"],
"stdout": "cwl.output.json",
- "requirements": self.requirements+[
+ "requirements": self.requirements+job_res_reqs+[
{
"class": "InitialWorkDirRequirement",
"listing": [{
debug = job_order_object["arv:debug"]
del job_order_object["arv:debug"]
- runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()),
+ runner = arvados_cwl.ArvCwlRunner(api_client=arvados.safeapi.ThreadSafeApiCache(
+ api_params={"model": OrderedJsonModel()}, keep_params={"num_retries": 4}),
output_name=output_name, output_tags=output_tags)
make_fs_access = functools.partial(CollectionFsAccess,
args.trash_intermediate = False
args.intermediate_output_ttl = 0
args.priority = arvados_cwl.DEFAULT_PRIORITY
+ args.do_validate = True
+ args.disable_js_validation = False
runner.arv_executor(t, job_order_object, **vars(args))
except Exception as e:
else:
self.arvrunner.output_callback(outputs, processStatus)
finally:
- if record["uuid"] in self.arvrunner.processes:
- del self.arvrunner.processes[record["uuid"]]
+ self.arvrunner.process_done(record["uuid"])
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import Queue
+import threading
+import logging
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+class TaskQueue(object):  # Runs queued callables on a set of worker threads and counts the ones still outstanding.
+ def __init__(self, lock, thread_count):  # lock: guards in_flight (and add()); thread_count: number of workers to start.
+ self.thread_count = thread_count
+ self.task_queue = Queue.Queue()
+ self.task_queue_threads = []
+ self.lock = lock
+ self.in_flight = 0  # tasks queued by add() that a worker has not finished yet
+ self.error = None  # most recent exception raised by a task, kept for the caller to inspect
+
+ for r in xrange(0, self.thread_count):
+ t = threading.Thread(target=self.task_queue_func)
+ self.task_queue_threads.append(t)
+ t.start()  # workers begin waiting on the queue immediately
+
+ def task_queue_func(self):  # Worker loop: take tasks off the queue and run them until a None sentinel arrives.
+
+ while True:
+ task = self.task_queue.get()
+ if task is None:  # sentinel queued by join()
+ return
+ try:
+ task()
+ except Exception as e:
+ logger.exception("Unhandled exception running task")
+ self.error = e  # logged and recorded, not re-raised
+
+ with self.lock:
+ self.in_flight -= 1
+
+ def add(self, task):  # Queue task for a worker, or run it right here when thread_count is 1 or less.
+ with self.lock:
+ if self.thread_count > 1:
+ self.in_flight += 1
+ self.task_queue.put(task)
+ else:
+ task()  # runs synchronously in the calling thread while self.lock is held
+
+ def drain(self):  # Discard tasks still waiting in the queue. NOTE(review): in_flight is not adjusted for them.
+ try:
+ # Drain queue
+ while not self.task_queue.empty():
+ self.task_queue.get(True, .1)
+ except Queue.Empty:  # the queue emptied between the empty() check and the get()
+ pass
+
+ def join(self):  # Queue one None sentinel per worker thread, then wait for every worker to exit.
+ for t in self.task_queue_threads:
+ self.task_queue.put(None)
+ for t in self.task_queue_threads:
+ t.join()
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():  # Return the stripped, UTF-8-decoded output of `git describe --abbrev=0`.
+ gitinfo = subprocess.check_output(
+ ['git', 'describe', '--abbrev=0']).strip()
+ return str(gitinfo.decode('utf-8'))
+
+def git_timestamp_tag():  # Return '.YYYYMMDDHHMMSS' (UTC) for the commit time git reports for '.'.
+ gitinfo = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', '.']).strip()
+ return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo))))  # %ct output is parsed as integer seconds
+
+def save_version(setup_dir, module, v):  # Overwrite <setup_dir>/<module>/_version.py with a single __version__ line for v.
+ with open(os.path.join(setup_dir, module, "_version.py"), 'w') as fp:
+ return fp.write("__version__ = '%s'\n" % v)  # returns whatever fp.write() returns
+
+def read_version(setup_dir, module):  # Return the version string stored in <setup_dir>/<module>/_version.py.
+ with open(os.path.join(setup_dir, module, "_version.py"), 'r') as fp:
+ return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]  # NOTE(review): AttributeError if the file does not match
+
+def get_version(setup_dir, module):  # Refresh <setup_dir>/<module>/_version.py when possible and return the version it holds.
+ env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+ if env_version:  # a version supplied through the environment is used instead of asking git
+ save_version(setup_dir, module, env_version)
+ else:
+ try:
+ save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
+ except subprocess.CalledProcessError:  # git command failed: _version.py is left as it was
+ pass
+
+ return read_version(setup_dir, module)
If a build tag has already been set (e.g., "egg_info -b", building
from source package), leave it alone.
"""
+    def git_latest_tag(self):
+        # Tag name printed by `git describe --abbrev=0`, decoded to str.
+        describe_cmd = ['git', 'describe', '--abbrev=0']
+        tag_bytes = subprocess.check_output(describe_cmd).strip()
+        return str(tag_bytes.decode('utf-8'))
def git_timestamp_tag(self):
gitinfo = subprocess.check_output(
def tags(self):
if self.tag_build is None:
- self.tag_build = self.git_timestamp_tag()
+ self.tag_build = self.git_latest_tag() + self.git_timestamp_tag()
return egg_info.tags(self)
#
# SPDX-License-Identifier: Apache-2.0
+from __future__ import absolute_import
import os
import sys
-import subprocess
-import setuptools.command.egg_info as egg_info_cmd
from setuptools import setup, find_packages
SETUP_DIR = os.path.dirname(__file__) or '.'
README = os.path.join(SETUP_DIR, 'README.rst')
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "1.0"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "arvados_cwl")
setup(name='arvados-cwl-runner',
version=version,
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180322194411',
+ 'cwltool==1.0.20180403145700',
'schema-salad==2.6.20171201034858',
'typing==3.5.3.0',
'ruamel.yaml==0.13.7',
- 'arvados-python-client>=0.1.20170526013812',
+ 'arvados-python-client>=1.1.4.20180418202329',
'setuptools',
'ciso8601 >=1.0.0, <=1.0.4',
],
],
test_suite='tests',
tests_require=['mock>=1.0'],
- zip_safe=True,
- cmdclass={'egg_info': tagger},
+ zip_safe=True
)
}
tool: wf/secret_wf.cwl
doc: "Test secret input parameters"
+
+- job: null
+ output:
+ out: null
+ tool: wf/runin-reqs-wf.cwl
+ doc: "RunInSingleContainer handles dynamic resource requests on step"
+
+- job: null
+ output:
+ out: null
+ tool: wf/runin-reqs-wf2.cwl
+ doc: "RunInSingleContainer handles dynamic resource requests on embedded subworkflow"
+
+- job: null
+ output:
+ out: null
+ tool: wf/runin-reqs-wf3.cwl
+ should_fail: true
+ doc: "RunInSingleContainer disallows dynamic resource request on subworkflow steps"
+
+- job: null
+ output:
+ out: null
+ tool: wf/runin-reqs-wf4.cwl
+ doc: "RunInSingleContainer discovers static resource request in subworkflow steps"
}
]}},
'cwl:tool': '3fffdeaa75e018172e1b583425f4ebff+60/workflow.cwl#main',
+ 'arv:debug': True,
'arv:enable_reuse': True,
'arv:on_error': 'continue'
},
'secret_mounts': {},
'state': 'Committed',
'owner_uuid': None,
- 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'name': 'submit_wf.cwl',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
exited = arvados_cwl.main(
- ["--submit", "--no-wait",
+ ["--submit", "--no-wait", "--debug",
"--project-uuid", project_uuid,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
sys.stdout, sys.stderr, api_client=stubs.api)
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = [
- 'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
+ 'arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--disable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = [
- 'arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--disable-reuse', '--on-error=continue', '--eval-timeout=20',
+ 'arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--disable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
expect_container["name"] = "submit_wf_no_reuse.cwl"
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=stop', '--eval-timeout=20',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=stop',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-name="+output_name, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse',
+ "--output-name="+output_name, '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["output_name"] = output_name
stubs.api.container_requests().create.assert_called_with(
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
- "--intermediate-output-ttl=3600", '--eval-timeout=20',
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=continue',
+ "--intermediate-output-ttl=3600",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue',
- "--trash-intermediate", '--eval-timeout=20',
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=continue',
+ "--trash-intermediate",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- "--output-tags="+output_tags, '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse',
+ "--output-tags="+output_tags, '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
'output_path': '/var/spool/cwl',
'name': 'expect_arvworkflow.cwl#main',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
- 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
'output_path': '/var/spool/cwl',
'name': 'a test workflow',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
- 'command': ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--eval-timeout=20',
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["owner_uuid"] = project_uuid
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--project-uuid='+project_uuid, '--eval-timeout=20',
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ "--eval-timeout=20", "--thread-count=4",
+ '--enable-reuse', '--debug', '--on-error=continue',
+ '--project-uuid='+project_uuid,
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
logging.exception("")
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers', '--no-log-timestamps',
- '--enable-reuse', '--on-error=continue', '--eval-timeout=60.0',
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=60.0', '--thread-count=4',
+ '--enable-reuse', '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
+
+ stubs.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher(expect_container))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
+
+ @stubs
+ def test_submit_container_thread_count(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--thread-count=20",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ expect_container = copy.deepcopy(stubs.expect_container_spec)
+ expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate',
+ '--eval-timeout=20', '--thread-count=20',
+ '--enable-reuse', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
"--local",
"--api=containers",
"--no-log-timestamps",
+ "--disable-validate",
+ "--eval-timeout=20",
+ '--thread-count=4',
"--enable-reuse",
+ '--debug',
"--on-error=continue",
- "--eval-timeout=20",
"/var/lib/cwl/workflow.json#main",
"/var/lib/cwl/cwl.input.json"
],
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import functools
+import mock
+import sys
+import unittest
+import json
+import logging
+import os
+import threading
+
+from arvados_cwl.task_queue import TaskQueue
+
+def success_task():
+    # Task that completes normally without doing anything.
+    return None
+
+def fail_task():
+    # Task that always fails, used to exercise TaskQueue error handling.
+    failure = Exception("Testing error handling")
+    raise failure
+
+class TestTaskQueue(unittest.TestCase):
+    # Runs batches of tasks through a TaskQueue backed by two worker
+    # threads and checks what ends up in its ``error`` attribute.
+
+    def _run_tasks(self, tasks):
+        # Create a fresh queue, check it starts without an error, feed it
+        # the tasks in order, wait for the workers and hand the queue back.
+        tq = TaskQueue(threading.Lock(), 2)
+
+        self.assertIsNone(tq.error)
+
+        for task in tasks:
+            tq.add(task)
+
+        tq.join()
+        return tq
+
+    def test_tq(self):
+        # Every task succeeds, so no error may be recorded.
+        tq = self._run_tasks([success_task, success_task,
+                              success_task, success_task])
+
+        self.assertIsNone(tq.error)
+
+    def test_tq_error(self):
+        # One task raises; its exception must be recorded on the queue.
+        tq = self._run_tasks([success_task, success_task,
+                              fail_task, success_task])
+
+        self.assertIsNotNone(tq.error)
--- /dev/null
+import arvados
+import sys
+import os
+
+# Compare the RAM requested for the job/container this script runs in
+# with the expected amount given as the first command line argument.
+# Exit status is 0 when they match and 1 otherwise.
+if "JOB_UUID" in os.environ:
+    # Running under the jobs API: read min_ram_mb_per_node from the job record.
+    requested = arvados.api().jobs().get(uuid=os.environ["JOB_UUID"]).execute()["runtime_constraints"]["min_ram_mb_per_node"]
+else:
+    # Running under the containers API: "ram" is scaled down by 1024*1024
+    # (presumably bytes to MiB -- confirm against the API documentation).
+    requested = arvados.api().containers().current().execute()["runtime_constraints"]["ram"]/(1024*1024)
+
+expected = int(sys.argv[1])
+
+print("Requested %d expected %d" % (requested, expected))
+
+# sys.exit() rather than the site-provided exit(), which is intended for
+# the interactive interpreter and is not guaranteed to be defined.
+sys.exit(0 if requested == expected else 1)
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+inputs:
+ count:
+ type: int[]
+ default: [1, 2, 3, 4]
+ script:
+ type: File
+ default:
+ class: File
+ location: check_mem.py
+outputs:
+ out: []
+requirements:
+ SubworkflowFeatureRequirement: {}
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+ StepInputExpressionRequirement: {}
+steps:
+ substep:
+ in:
+ count: count
+ script: script
+ out: []
+ hints:
+ - class: arv:RunInSingleContainer
+ - class: ResourceRequirement
+ ramMin: $(inputs.count*128)
+ - class: arv:APIRequirement
+ scatter: count
+ run:
+ class: Workflow
+ id: mysub
+ inputs:
+ count: int
+ script: File
+ outputs: []
+ steps:
+ sleep1:
+ in:
+ count: count
+ script: script
+ out: []
+ run:
+ class: CommandLineTool
+ id: subtool
+ inputs:
+ count:
+ type: int
+ script: File
+ outputs: []
+ arguments: [python, $(inputs.script), $(inputs.count * 128)]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+inputs:
+ count:
+ type: int[]
+ default: [1, 2, 3, 4]
+ script:
+ type: File
+ default:
+ class: File
+ location: check_mem.py
+outputs:
+ out: []
+requirements:
+ SubworkflowFeatureRequirement: {}
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+ StepInputExpressionRequirement: {}
+steps:
+ substep:
+ in:
+ count: count
+ script: script
+ out: []
+ hints:
+ - class: arv:RunInSingleContainer
+ - class: arv:APIRequirement
+ scatter: count
+ run:
+ class: Workflow
+ id: mysub
+ inputs:
+ count: int
+ script: File
+ outputs: []
+ hints:
+ - class: ResourceRequirement
+ ramMin: $(inputs.count*128)
+ steps:
+ sleep1:
+ in:
+ count: count
+ script: script
+ out: []
+ run:
+ class: CommandLineTool
+ id: subtool
+ inputs:
+ count:
+ type: int
+ script: File
+ outputs: []
+ arguments: [python, $(inputs.script), $(inputs.count * 128)]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+inputs:
+ count:
+ type: int[]
+ default: [1, 2, 3, 4]
+ script:
+ type: File
+ default:
+ class: File
+ location: check_mem.py
+outputs:
+ out: []
+requirements:
+ SubworkflowFeatureRequirement: {}
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+ StepInputExpressionRequirement: {}
+steps:
+ substep:
+ in:
+ count: count
+ script: script
+ out: []
+ hints:
+ - class: arv:RunInSingleContainer
+ - class: arv:APIRequirement
+ scatter: count
+ run:
+ class: Workflow
+ id: mysub
+ inputs:
+ count: int
+ script: File
+ outputs: []
+ steps:
+ sleep1:
+ in:
+ count: count
+ script: script
+ out: []
+ run:
+ class: CommandLineTool
+ id: subtool
+ hints:
+ - class: ResourceRequirement
+ ramMin: $(inputs.count*128)
+ inputs:
+ count:
+ type: int
+ script: File
+ outputs: []
+ arguments: [python, $(inputs.script), $(inputs.count * 128)]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.0
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+inputs:
+ count:
+ type: int[]
+ default: [1, 2, 3, 4]
+ script:
+ type: File
+ default:
+ class: File
+ location: check_mem.py
+outputs:
+ out: []
+requirements:
+ SubworkflowFeatureRequirement: {}
+ ScatterFeatureRequirement: {}
+ InlineJavascriptRequirement: {}
+ StepInputExpressionRequirement: {}
+steps:
+ substep:
+ in:
+ count: count
+ script: script
+ out: []
+ hints:
+ - class: arv:RunInSingleContainer
+ - class: arv:APIRequirement
+ scatter: count
+ run:
+ class: Workflow
+ id: mysub
+ inputs:
+ count: int
+ script: File
+ outputs: []
+ steps:
+ sleep1:
+ in:
+ count: count
+ script: script
+ out: []
+ run:
+ class: CommandLineTool
+ id: subtool
+ hints:
+ - class: ResourceRequirement
+ ramMin: 128
+ inputs:
+ count:
+ type: int
+ script: File
+ outputs: []
+ arguments: [python, $(inputs.script), "128"]
c.Check(f.Close(), check.IsNil)
m, err := s.fs.MarshalManifest(".")
+ c.Assert(err, check.IsNil)
c.Check(m, check.Matches, `. 37b51d194a7513e45b56f6524f2d51f2\+3\+\S+ 0:3:new-file\\0401\n./dir1 .* 3:3:bar 0:3:foo\n`)
}
c.Check(err, check.IsNil)
pos, err = f.Seek(0, io.SeekCurrent)
c.Check(pos, check.Equals, int64(18))
+ c.Check(err, check.IsNil)
pos, err = f.Seek(-18, io.SeekCurrent)
+ c.Check(pos, check.Equals, int64(0))
c.Check(err, check.IsNil)
n, err = io.ReadFull(f, buf)
c.Check(n, check.Equals, 18)
// truncate to current size
err = f.Truncate(18)
+ c.Check(err, check.IsNil)
f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
// shrink to block/extent boundary
err = f.Truncate(32)
+ c.Check(err, check.IsNil)
f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
// shrink to partial block/extent
err = f.Truncate(15)
+ c.Check(err, check.IsNil)
f2.Seek(0, io.SeekStart)
buf2, err = ioutil.ReadAll(f2)
c.Check(err, check.IsNil)
checkSize := func(size int64) {
fi, err := f.Stat()
+ c.Assert(err, check.IsNil)
c.Check(fi.Size(), check.Equals, size)
f, err := fs.OpenFile("test", os.O_CREATE|os.O_RDWR, 0755)
c.Assert(err, check.IsNil)
defer f.Close()
fi, err = f.Stat()
+ c.Check(err, check.IsNil)
c.Check(fi.Size(), check.Equals, size)
pos, err := f.Seek(0, io.SeekEnd)
+ c.Check(err, check.IsNil)
c.Check(pos, check.Equals, size)
}
expect := map[string][]byte{
"0": nil,
- "00": []byte{},
- "one": []byte{1},
+ "00": {},
+ "one": {1},
"dir/0": nil,
- "dir/two": []byte{1, 2},
+ "dir/two": {1, 2},
"dir/zero": nil,
"dir/zerodir/zero": nil,
"zero/zero/zero": nil,
c.Assert(err, check.IsNil)
for name, data := range expect {
- f, err := persisted.Open("bogus-" + name)
+ _, err = persisted.Open("bogus-" + name)
c.Check(err, check.NotNil)
- f, err = persisted.Open(name)
+ f, err := persisted.Open(name)
c.Assert(err, check.IsNil)
if data == nil {
c.Check(n, check.Equals, 1)
c.Check(buf[:1], check.DeepEquals, []byte{1})
pos, err = f.Seek(0, io.SeekCurrent)
+ c.Assert(err, check.IsNil)
c.Check(pos, check.Equals, int64(1))
f.Write([]byte{4, 5, 6})
pos, err = f.Seek(0, io.SeekCurrent)
+ c.Assert(err, check.IsNil)
c.Check(pos, check.Equals, int64(6))
f.Seek(0, io.SeekStart)
n, err = f.Read(buf)
c.Check(pos, check.Equals, int64(3))
f.Write([]byte{7, 8, 9})
pos, err = f.Seek(0, io.SeekCurrent)
+ c.Check(err, check.IsNil)
c.Check(pos, check.Equals, int64(9))
f.Close()
for {
if err := os.Chdir("sdk/python/tests"); err == nil {
pythonTestDir, err = os.Getwd()
+ if err != nil {
+ log.Fatal(err)
+ }
return
}
if parent, err := os.Getwd(); err != nil || parent == "/" {
c.Check(offset, check.Equals, int64(a))
buf := make([]byte, b-a)
n, err := io.ReadFull(rdr, buf)
+ c.Check(err, check.IsNil)
c.Check(n, check.Equals, b-a)
c.Check(string(buf), check.Equals, want[a:b])
}
arv.DiscoveryDoc["defaultCollectionReplication"] = 1.0
kc, err = MakeKeepClient(arv)
+ c.Check(err, IsNil)
c.Assert(kc.Want_replicas, Equals, 1)
}
include examples/shellinabox
include lib/libpam_arvados.py
include pam-configs/arvados
+include arvados_version.py
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():
+    """Return the tag name printed by ``git describe --abbrev=0`` as a str.
+
+    Raises subprocess.CalledProcessError if git exits with an error.
+    """
+    describe_cmd = ['git', 'describe', '--abbrev=0']
+    tag_bytes = subprocess.check_output(describe_cmd).strip()
+    return str(tag_bytes.decode('utf-8'))
+
+def git_timestamp_tag():
+    """Return a version suffix built from the latest commit under ``.``.
+
+    Asks ``git log`` for the ``%ct`` timestamp of the most recent
+    first-parent commit touching the current directory and formats it, in
+    UTC, as ``.YYYYmmddHHMMSS``.
+    """
+    log_cmd = ['git', 'log', '--first-parent', '--max-count=1',
+               '--format=format:%ct', '.']
+    commit_epoch = int(subprocess.check_output(log_cmd).strip())
+    return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(commit_epoch)))
+
+def save_version(setup_dir, module, v):
+    """Write ``__version__ = '<v>'`` to ``<setup_dir>/<module>/_version.py``.
+
+    Any existing file is overwritten.  Returns whatever ``fp.write``
+    returns.
+    """
+    version_path = os.path.join(setup_dir, module, "_version.py")
+    with open(version_path, 'w') as fp:
+        written = fp.write("__version__ = '%s'\n" % v)
+        return written
+
+def read_version(setup_dir, module):
+    """Return the version string stored in ``<setup_dir>/<module>/_version.py``.
+
+    The file is expected to contain the single assignment written by
+    save_version().
+    """
+    version_path = os.path.join(setup_dir, module, "_version.py")
+    with open(version_path, 'r') as fp:
+        contents = fp.read()
+        found = re.match("__version__ = '(.*)'$", contents)
+        return found.groups()[0]
+
+def get_version(setup_dir, module):
+    """Determine the package version and record it in ``_version.py``.
+
+    The version comes from, in order of preference:
+
+    1. the ARVADOS_BUILDING_VERSION environment variable;
+    2. git (latest tag plus commit timestamp suffix);
+    3. a ``_version.py`` left by an earlier run (e.g. when building from
+       a source package outside a git checkout).
+
+    In the first two cases ``<setup_dir>/<module>/_version.py`` is
+    rewritten.  The value returned is always read back from that file.
+    """
+    env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+    if env_version:
+        save_version(setup_dir, module, env_version)
+    else:
+        try:
+            git_version = git_latest_tag() + git_timestamp_tag()
+        except (subprocess.CalledProcessError, OSError):
+            # CalledProcessError: git ran but failed (not a checkout, no
+            # tags, ...).  OSError: git could not be executed at all (not
+            # installed).  Either way fall back to the existing
+            # _version.py instead of aborting the build.
+            git_version = None
+        if git_version is not None:
+            save_version(setup_dir, module, git_version)
+
+    return read_version(setup_dir, module)
#
# SPDX-License-Identifier: Apache-2.0
+from __future__ import absolute_import
import glob
import os
import sys
-import setuptools.command.egg_info as egg_info_cmd
+import re
import subprocess
from setuptools import setup, find_packages
SETUP_DIR = os.path.dirname(__file__) or '.'
README = os.path.join(SETUP_DIR, 'README.rst')
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "0.1"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "arvados_pam")
+
+# Note whether --short-tests-only was given and take it out of sys.argv
+# so that the setup() call below does not see it.
+short_tests_only = '--short-tests-only' in sys.argv
+if short_tests_only:
+    sys.argv.remove('--short-tests-only')
setup(name='arvados-pam',
version=version,
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'python-pam'],
- zip_safe=False,
- cmdclass={'egg_info': tagger},
+ zip_safe=False
)
include LICENSE-2.0.txt
include README.rst
+include arvados_version.py
\ No newline at end of file
1. Add this Arvados repository to your sources list::
- deb http://apt.arvados.org/ wheezy main
+ deb http://apt.arvados.org/ stretch main
2. Update your package list.
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: Apache-2.0
-
-import pkg_resources
-
-__version__ = pkg_resources.require('arvados-python-client')[0].version
import argparse
import errno
import os
+import logging
+import signal
+from future.utils import listitems, listvalues
+import sys
def _pos_int(s):
num = int(s)
if mode is not None:
os.chmod(abs_path, mode)
return abs_path
+
+# Signals for which install_signal_handlers() installs exit_signal_handler.
+CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
+
+def exit_signal_handler(sigcode, frame):
+    """Signal handler: log the signal on the 'arvados' logger and exit.
+
+    The process exits through sys.exit() with the negated signal number;
+    ``frame`` is accepted because the signal module passes it but is not
+    used.
+    """
+    log = logging.getLogger('arvados')
+    log.error("Caught signal {}, exiting.".format(sigcode))
+    sys.exit(-sigcode)
+
+def install_signal_handlers():
+    """Install exit_signal_handler for every signal in CAUGHT_SIGNALS.
+
+    The handlers that were in place before are remembered in the module
+    global ``orig_signal_handlers`` (signal number -> previous handler)
+    for restore_signal_handlers().
+    """
+    global orig_signal_handlers
+    previous = {}
+    for sigcode in CAUGHT_SIGNALS:
+        previous[sigcode] = signal.signal(sigcode, exit_signal_handler)
+    orig_signal_handlers = previous
+
+def restore_signal_handlers():
+    """Put back the handlers replaced by install_signal_handlers().
+
+    Does nothing when install_signal_handlers() has not been called, so
+    it is safe to call unconditionally during cleanup.
+    """
+    # orig_signal_handlers only becomes a module global once
+    # install_signal_handlers() has run; treat its absence as "nothing to
+    # restore" instead of failing with NameError.
+    saved_handlers = globals().get('orig_signal_handlers', {})
+    for sigcode, orig_handler in listitems(saved_handlers):
+        signal.signal(sigcode, orig_handler)
import tempfile
import shutil
import _strptime
+import fcntl
from operator import itemgetter
from stat import *
except STAT_CACHE_ERRORS:
pass # We won't resume from this cache. No big deal.
+def get_cache_dir():
+    # Directory for cached Docker image dumps: .cache/arvados/docker,
+    # obtained from arv_cmd.make_home_conf_dir with mode 0o700.  Callers
+    # check the result for None / falsiness before using it.
+    docker_cache = os.path.join('.cache', 'arvados', 'docker')
+    return arv_cmd.make_home_conf_dir(docker_cache, 0o700)
+
def prep_image_file(filename):
# Return a file object ready to save a Docker image,
# and a boolean indicating whether or not we need to actually save the
# image (False if a cached save is available).
- cache_dir = arv_cmd.make_home_conf_dir(
- os.path.join('.cache', 'arvados', 'docker'), 0o700)
+ cache_dir = get_cache_dir()
if cache_dir is None:
image_file = tempfile.NamedTemporaryFile(suffix='.tar')
need_save = True
select=['portable_data_hash'],
).execute()['items'][0]['portable_data_hash']
-def main(arguments=None, stdout=sys.stdout):
+def main(arguments=None, stdout=sys.stdout, install_sig_handlers=True, api=None):
args = arg_parser.parse_args(arguments)
- api = arvados.api('v1')
+ if api is None:
+ api = arvados.api('v1')
if args.image is None or args.image == 'images':
fmt = "{:30} {:10} {:12} {:29} {:20}\n"
else:
collection_name = args.name
- if not args.force:
- # Check if this image is already in Arvados.
-
- # Project where everything should be owned
- if args.project_uuid:
- parent_project_uuid = args.project_uuid
- else:
- parent_project_uuid = api.users().current().execute(
- num_retries=args.retries)['uuid']
-
- # Find image hash tags
- existing_links = _get_docker_links(
- api, args.retries,
- filters=[['link_class', '=', 'docker_image_hash'],
- ['name', '=', image_hash]])
- if existing_links:
- # get readable collections
- collections = api.collections().list(
- filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
- select=["uuid", "owner_uuid", "name", "manifest_text"]
- ).execute(num_retries=args.retries)['items']
-
- if collections:
- # check for repo+tag links on these collections
- if image_repo_tag:
- existing_repo_tag = _get_docker_links(
- api, args.retries,
- filters=[['link_class', '=', 'docker_image_repo+tag'],
- ['name', '=', image_repo_tag],
- ['head_uuid', 'in', [c["uuid"] for c in collections]]])
- else:
- existing_repo_tag = []
-
- try:
- coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
- except StopIteration:
- # create new collection owned by the project
- coll_uuid = api.collections().create(
- body={"manifest_text": collections[0]['manifest_text'],
- "name": collection_name,
- "owner_uuid": parent_project_uuid},
- ensure_unique_name=True
- ).execute(num_retries=args.retries)['uuid']
-
- link_base = {'owner_uuid': parent_project_uuid,
- 'head_uuid': coll_uuid,
- 'properties': existing_links[0]['properties']}
-
- if not any(items_owned_by(parent_project_uuid, existing_links)):
- # create image link owned by the project
- make_link(api, args.retries,
- 'docker_image_hash', image_hash, **link_base)
-
- if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
- # create repo+tag link owned by the project
- make_link(api, args.retries, 'docker_image_repo+tag',
- image_repo_tag, **link_base)
-
- stdout.write(coll_uuid + "\n")
-
- sys.exit(0)
-
- # Open a file for the saved image, and write it if needed.
+ # Acquire a lock so that only one arv-keepdocker process will
+ # dump/upload a particular docker image at a time. Do this before
+ # checking if the image already exists in Arvados so that if there
+ # is an upload already underway, when that upload completes and
+ # this process gets a turn, it will discover the Docker image is
+ # already available and exit quickly.
outfile_name = '{}.tar'.format(image_hash)
- image_file, need_save = prep_image_file(outfile_name)
- if need_save:
- save_image(image_hash, image_file)
+ lockfile_name = '{}.lock'.format(outfile_name)
+ lockfile = None
+ cache_dir = get_cache_dir()
+ if cache_dir:
+ lockfile = open(os.path.join(cache_dir, lockfile_name), 'w+')
+ fcntl.flock(lockfile, fcntl.LOCK_EX)
- # Call arv-put with switches we inherited from it
- # (a.k.a., switches that aren't our own).
- put_args = keepdocker_parser.parse_known_args(arguments)[1]
-
- if args.name is None:
- put_args += ['--name', collection_name]
+ try:
+ if not args.force:
+ # Check if this image is already in Arvados.
- coll_uuid = arv_put.main(
- put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
+ # Project where everything should be owned
+ parent_project_uuid = args.project_uuid or api.users().current().execute(
+ num_retries=args.retries)['uuid']
- # Read the image metadata and make Arvados links from it.
- image_file.seek(0)
- image_tar = tarfile.open(fileobj=image_file)
- image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
- if image_hash_type:
- json_filename = raw_image_hash + '.json'
- else:
- json_filename = raw_image_hash + '/json'
- json_file = image_tar.extractfile(image_tar.getmember(json_filename))
- image_metadata = json.load(json_file)
- json_file.close()
- image_tar.close()
- link_base = {'head_uuid': coll_uuid, 'properties': {}}
- if 'created' in image_metadata:
- link_base['properties']['image_timestamp'] = image_metadata['created']
- if args.project_uuid is not None:
- link_base['owner_uuid'] = args.project_uuid
-
- make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
- if image_repo_tag:
- make_link(api, args.retries,
- 'docker_image_repo+tag', image_repo_tag, **link_base)
-
- # Clean up.
- image_file.close()
- for filename in [stat_cache_name(image_file), image_file.name]:
- try:
- os.unlink(filename)
- except OSError as error:
- if error.errno != errno.ENOENT:
- raise
+ # Find image hash tags
+ existing_links = _get_docker_links(
+ api, args.retries,
+ filters=[['link_class', '=', 'docker_image_hash'],
+ ['name', '=', image_hash]])
+ if existing_links:
+ # get readable collections
+ collections = api.collections().list(
+ filters=[['uuid', 'in', [link['head_uuid'] for link in existing_links]]],
+ select=["uuid", "owner_uuid", "name", "manifest_text"]
+ ).execute(num_retries=args.retries)['items']
+
+ if collections:
+ # check for repo+tag links on these collections
+ if image_repo_tag:
+ existing_repo_tag = _get_docker_links(
+ api, args.retries,
+ filters=[['link_class', '=', 'docker_image_repo+tag'],
+ ['name', '=', image_repo_tag],
+ ['head_uuid', 'in', [c["uuid"] for c in collections]]])
+ else:
+ existing_repo_tag = []
+
+ try:
+ coll_uuid = next(items_owned_by(parent_project_uuid, collections))['uuid']
+ except StopIteration:
+ # create new collection owned by the project
+ coll_uuid = api.collections().create(
+ body={"manifest_text": collections[0]['manifest_text'],
+ "name": collection_name,
+ "owner_uuid": parent_project_uuid},
+ ensure_unique_name=True
+ ).execute(num_retries=args.retries)['uuid']
+
+ link_base = {'owner_uuid': parent_project_uuid,
+ 'head_uuid': coll_uuid,
+ 'properties': existing_links[0]['properties']}
+
+ if not any(items_owned_by(parent_project_uuid, existing_links)):
+ # create image link owned by the project
+ make_link(api, args.retries,
+ 'docker_image_hash', image_hash, **link_base)
+
+ if image_repo_tag and not any(items_owned_by(parent_project_uuid, existing_repo_tag)):
+ # create repo+tag link owned by the project
+ make_link(api, args.retries, 'docker_image_repo+tag',
+ image_repo_tag, **link_base)
+
+ stdout.write(coll_uuid + "\n")
+
+ sys.exit(0)
+
+ # Open a file for the saved image, and write it if needed.
+ image_file, need_save = prep_image_file(outfile_name)
+ if need_save:
+ save_image(image_hash, image_file)
+
+ # Call arv-put with switches we inherited from it
+ # (a.k.a., switches that aren't our own).
+ put_args = keepdocker_parser.parse_known_args(arguments)[1]
+
+ if args.name is None:
+ put_args += ['--name', collection_name]
+
+ coll_uuid = arv_put.main(
+ put_args + ['--filename', outfile_name, image_file.name], stdout=stdout,
+ install_sig_handlers=install_sig_handlers).strip()
+
+ # Read the image metadata and make Arvados links from it.
+ image_file.seek(0)
+ image_tar = tarfile.open(fileobj=image_file)
+ image_hash_type, _, raw_image_hash = image_hash.rpartition(':')
+ if image_hash_type:
+ json_filename = raw_image_hash + '.json'
+ else:
+ json_filename = raw_image_hash + '/json'
+ json_file = image_tar.extractfile(image_tar.getmember(json_filename))
+ image_metadata = json.load(json_file)
+ json_file.close()
+ image_tar.close()
+ link_base = {'head_uuid': coll_uuid, 'properties': {}}
+ if 'created' in image_metadata:
+ link_base['properties']['image_timestamp'] = image_metadata['created']
+ if args.project_uuid is not None:
+ link_base['owner_uuid'] = args.project_uuid
+
+ make_link(api, args.retries, 'docker_image_hash', image_hash, **link_base)
+ if image_repo_tag:
+ make_link(api, args.retries,
+ 'docker_image_repo+tag', image_repo_tag, **link_base)
+
+ # Clean up.
+ image_file.close()
+ for filename in [stat_cache_name(image_file), image_file.name]:
+ try:
+ os.unlink(filename)
+ except OSError as error:
+ if error.errno != errno.ENOENT:
+ raise
+ finally:
+ if lockfile is not None:
+ # Closing the lockfile unlocks it.
+ lockfile.close()
if __name__ == '__main__':
main()
import arvados.commands._util as arv_cmd
-CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None
upload_opts = argparse.ArgumentParser(add_help=False)
outfile.write(progress_func(bytes_written, bytes_expected))
return write_progress
-def exit_signal_handler(sigcode, frame):
- logging.getLogger('arvados.arv_put').error("Caught signal {}, exiting.".format(sigcode))
- sys.exit(-sigcode)
-
def desired_project_uuid(api_client, project_uuid, num_retries):
if not project_uuid:
query = api_client.users().current()
raise ValueError("Not a valid project UUID: {}".format(project_uuid))
return query.execute(num_retries=num_retries)['uuid']
-def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
+def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr,
+ install_sig_handlers=True):
global api_client
args = parse_arguments(arguments)
if api_client is None:
api_client = arvados.api('v1', request_id=request_id)
- # Install our signal handler for each code in CAUGHT_SIGNALS, and save
- # the originals.
- orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
- for sigcode in CAUGHT_SIGNALS}
+ if install_sig_handlers:
+ arv_cmd.install_signal_handlers()
# Determine the name to use
if args.name:
if not output.endswith('\n'):
stdout.write('\n')
- for sigcode, orig_handler in listitems(orig_signal_handlers):
- signal.signal(sigcode, orig_handler)
+ if install_sig_handlers:
+ arv_cmd.restore_signal_handlers()
if status != 0:
sys.exit(status)
"""
- def __init__(self, apiconfig=None, keep_params={}):
+ def __init__(self, apiconfig=None, keep_params={}, api_params={}):
if apiconfig is None:
apiconfig = config.settings()
self.apiconfig = copy.copy(apiconfig)
+ self.api_params = api_params
self.local = threading.local()
self.keep = keep.KeepClient(api_client=self, **keep_params)
def localapi(self):
if 'api' not in self.local.__dict__:
- self.local.api = arvados.api_from_config('v1', apiconfig=self.apiconfig)
+ self.local.api = arvados.api_from_config('v1', apiconfig=self.apiconfig,
+ **self.api_params)
return self.local.api
def __getattr__(self, name):
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'describe', '--abbrev=0']).strip()
+ return str(gitinfo.decode('utf-8'))
+
+def git_timestamp_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', '.']).strip()
+ return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo))))
+
+def save_version(setup_dir, module, v):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'w') as fp:
+ return fp.write("__version__ = '%s'\n" % v)
+
+def read_version(setup_dir, module):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'r') as fp:
+ return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
+
+def get_version(setup_dir, module):
+ env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+ if env_version:
+ save_version(setup_dir, module, env_version)
+ else:
+ try:
+ save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
+ except subprocess.CalledProcessError:
+ pass
+
+ return read_version(setup_dir, module)
If a build tag has already been set (e.g., "egg_info -b", building
from source package), leave it alone.
"""
+ def git_latest_tag(self):
+ gitinfo = subprocess.check_output(
+ ['git', 'describe', '--abbrev=0']).strip()
+ return str(gitinfo.decode('utf-8'))
+
def git_timestamp_tag(self):
gitinfo = subprocess.check_output(
['git', 'log', '--first-parent', '--max-count=1',
def tags(self):
if self.tag_build is None:
- self.tag_build = self.git_timestamp_tag()
+ self.tag_build = self.git_latest_tag()+self.git_timestamp_tag()
return egg_info.tags(self)
#
# SPDX-License-Identifier: Apache-2.0
+from __future__ import absolute_import
import os
import sys
-import setuptools.command.egg_info as egg_info_cmd
+import re
from setuptools import setup, find_packages
SETUP_DIR = os.path.dirname(__file__) or '.'
README = os.path.join(SETUP_DIR, 'README.rst')
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "0.1"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "arvados")
short_tests_only = False
if '--short-tests-only' in sys.argv:
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
- zip_safe=False,
- cmdclass={'egg_info': tagger},
+ zip_safe=False
)
exit
end
+git_latest_tag = `git describe --abbrev=0`
+git_latest_tag = git_latest_tag.encode('utf-8').strip
git_timestamp, git_hash = `git log -n1 --first-parent --format=%ct:%H .`.chomp.split(":")
git_timestamp = Time.at(git_timestamp.to_i).utc
Gem::Specification.new do |s|
s.name = 'arvados'
- s.version = "0.1.#{git_timestamp.strftime('%Y%m%d%H%M%S')}"
+ s.version = "#{git_latest_tag}.#{git_timestamp.strftime('%Y%m%d%H%M%S')}"
s.date = git_timestamp.strftime("%Y-%m-%d")
s.summary = "Arvados client library"
s.description = "Arvados client library, git commit #{git_hash}"
#
# SPDX-License-Identifier: AGPL-3.0
+require 'arvados_model_updates'
require 'has_uuid'
require 'record_filters'
require 'serializers'
class ArvadosModel < ActiveRecord::Base
self.abstract_class = true
+ include ArvadosModelUpdates
include CurrentApiClient # current_user, current_api_client, etc.
include DbCurrentTime
extend RecordFilters
self.updated_at = current_time
self.owner_uuid ||= current_default_owner if self.respond_to? :owner_uuid=
self.modified_at = current_time
- self.modified_by_user_uuid = current_user ? current_user.uuid : nil
+ if !anonymous_updater
+ self.modified_by_user_uuid = current_user ? current_user.uuid : nil
+ end
self.modified_by_client_uuid = current_api_client ? current_api_client.uuid : nil
true
end
# SPDX-License-Identifier: AGPL-3.0
require 'arvados/keep'
-require 'sweep_trashed_collections'
+require 'sweep_trashed_objects'
require 'trashable'
class Collection < ArvadosModel
end
def self.where *args
- SweepTrashedCollections.sweep_if_stale
+ SweepTrashedObjects.sweep_if_stale
super
end
require 'safe_json'
class Container < ArvadosModel
+ include ArvadosModelUpdates
include HasUuid
include KindAndEtag
include CommonApiTemplate
c = Container.create! c_attrs
retryable_requests.each do |cr|
cr.with_lock do
- # Use row locking because this increments container_count
- cr.container_uuid = c.uuid
- cr.save!
+ leave_modified_by_user_alone do
+ # Use row locking because this increments container_count
+ cr.container_uuid = c.uuid
+ cr.save!
+ end
end
end
end
# Notify container requests associated with this container
ContainerRequest.where(container_uuid: uuid,
state: ContainerRequest::Committed).each do |cr|
- cr.finalize!
+ leave_modified_by_user_alone do
+ cr.finalize!
+ end
end
# Cancel outstanding container requests made by this container.
includes(:container).
where(requesting_container_uuid: uuid,
state: ContainerRequest::Committed).each do |cr|
- cr.update_attributes!(priority: 0)
- cr.container.reload
- if cr.container.state == Container::Queued || cr.container.state == Container::Locked
- # If the child container hasn't started yet, finalize the
- # child CR now instead of leaving it "on hold", i.e.,
- # Queued with priority 0. (OTOH, if the child is already
- # running, leave it alone so it can get cancelled the
- # usual way, get a copy of the log collection, etc.)
- cr.update_attributes!(state: ContainerRequest::Final)
+ leave_modified_by_user_alone do
+ cr.update_attributes!(priority: 0)
+ cr.container.reload
+ if cr.container.state == Container::Queued || cr.container.state == Container::Locked
+ # If the child container hasn't started yet, finalize the
+ # child CR now instead of leaving it "on hold", i.e.,
+ # Queued with priority 0. (OTOH, if the child is already
+ # running, leave it alone so it can get cancelled the
+ # usual way, get a copy of the log collection, etc.)
+ cr.update_attributes!(state: ContainerRequest::Final)
+ end
end
end
end
end
end
-
end
require 'whitelist_update'
class ContainerRequest < ArvadosModel
+ include ArvadosModelUpdates
include HasUuid
include KindAndEtag
include CommonApiTemplate
if state == Committed && Container.find_by_uuid(container_uuid).final?
reload
act_as_system_user do
- finalize!
+ leave_modified_by_user_alone do
+ finalize!
+ end
end
end
end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+module ArvadosModelUpdates
+ # ArvadosModel checks this to decide whether it should update the
+ # 'modified_by_user_uuid' field.
+ def anonymous_updater
+ Thread.current[:anonymous_updater] || false
+ end
+
+ def leave_modified_by_user_alone
+ anonymous_updater_was = anonymous_updater
+ begin
+ Thread.current[:anonymous_updater] = true
+ yield
+ ensure
+ Thread.current[:anonymous_updater] = anonymous_updater_was
+ end
+ end
+end
end
def act_as_user user
- #auth_was = Thread.current[:api_client_authorization]
user_was = Thread.current[:user]
Thread.current[:user] = user
- #Thread.current[:api_client_authorization] = ApiClientAuthorization.
- # where('user_id=? and scopes is null', user.id).
- # order('expires_at desc').
- # first
begin
yield
ensure
Thread.current[:user] = user_was
- #Thread.current[:api_client_authorization] = auth_was
end
end
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-require 'current_api_client'
-
-module SweepTrashedCollections
- extend CurrentApiClient
-
- def self.sweep_now
- act_as_system_user do
- Collection.
- where('delete_at is not null and delete_at < statement_timestamp()').
- destroy_all
- Collection.
- where('is_trashed = false and trash_at < statement_timestamp()').
- update_all('is_trashed = true')
- end
- end
-
- def self.sweep_if_stale
- return if Rails.configuration.trash_sweep_interval <= 0
- exp = Rails.configuration.trash_sweep_interval.seconds
- need = false
- Rails.cache.fetch('SweepTrashedCollections', expires_in: exp) do
- need = true
- end
- if need
- Thread.new do
- Thread.current.abort_on_exception = false
- begin
- sweep_now
- rescue => e
- Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
- ensure
- ActiveRecord::Base.connection.close
- end
- end
- end
- end
-end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'current_api_client'
+
+# Cleanup of trashed collections and projects: objects whose trash_at
+# has passed are flagged is_trashed, and objects whose delete_at has
+# passed are destroyed (projects together with everything they own).
+module SweepTrashedObjects
+ extend CurrentApiClient
+
+ # Destroy the project with uuid p_uuid: its sub-projects first
+ # (recursively), then every record it owns, then the project itself.
+ # Raises RuntimeError if p_uuid does not exist or is not a project.
+ def self.delete_project_and_contents(p_uuid)
+ p = Group.find_by_uuid(p_uuid)
+ if !p || p.group_class != 'project'
+ raise "can't sweep group '#{p_uuid}', it may not exist or not be a project"
+ end
+ # First delete sub projects
+ Group.where({group_class: 'project', owner_uuid: p_uuid}).each do |sub_project|
+ delete_project_and_contents(sub_project.uuid)
+ end
+ # Next, iterate over all tables which have owner_uuid fields, with some
+ # exceptions, and delete records owned by this project
+ # (Group is skipped because sub-projects were handled above and the
+ # project itself is destroyed last.)
+ skipped_classes = ['Group', 'User']
+ ActiveRecord::Base.descendants.reject(&:abstract_class?).each do |klass|
+ if !skipped_classes.include?(klass.name) && klass.columns.collect(&:name).include?('owner_uuid')
+ klass.where({owner_uuid: p_uuid}).destroy_all
+ end
+ end
+ # Finally delete the project itself
+ p.destroy
+ end
+
+ # Perform one sweep immediately, acting as the system user.
+ # Expired objects are destroyed first; then objects past their
+ # trash_at that are not yet flagged are marked is_trashed.
+ def self.sweep_now
+ act_as_system_user do
+ # Sweep trashed collections
+ Collection.
+ where('delete_at is not null and delete_at < statement_timestamp()').
+ destroy_all
+ Collection.
+ where('is_trashed = false and trash_at < statement_timestamp()').
+ update_all('is_trashed = true')
+
+ # Sweep trashed projects and their contents
+ Group.
+ where({group_class: 'project'}).
+ where('delete_at is not null and delete_at < statement_timestamp()').each do |project|
+ delete_project_and_contents(project.uuid)
+ end
+ Group.
+ where({group_class: 'project'}).
+ where('is_trashed = false and trash_at < statement_timestamp()').
+ update_all('is_trashed = true')
+ end
+ end
+
+ # Start sweep_now in a background thread, unless a sweep was already
+ # started within the last trash_sweep_interval seconds. Does nothing
+ # when trash_sweep_interval is zero or negative.
+ def self.sweep_if_stale
+ return if Rails.configuration.trash_sweep_interval <= 0
+ exp = Rails.configuration.trash_sweep_interval.seconds
+ need = false
+ # The block only runs when the cache entry is missing or expired, so
+ # 'need' is set at most once per interval.
+ Rails.cache.fetch('SweepTrashedObjects', expires_in: exp) do
+ need = true
+ end
+ if need
+ Thread.new do
+ Thread.current.abort_on_exception = false
+ begin
+ sweep_now
+ rescue => e
+ # Log and carry on: a failed sweep must not take down the caller.
+ Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
+ ensure
+ # Release the database connection used by this thread.
+ ActiveRecord::Base.connection.close
+ end
+ end
+ end
+ end
+end
Get an existing anonymous user token. If no such token exists \
or if this option is omitted, a new token is created and returned.
eos
+ opt :token, "token to create (optional)", :type => :string
end
get_existing = opts[:get]
+supplied_token = opts[:token]
require File.dirname(__FILE__) + '/../config/environment'
include ApplicationHelper
act_as_system_user
-def create_api_client_auth
+def create_api_client_auth(supplied_token=nil)
+
+ # If token is supplied, see if it exists
+ if supplied_token
+ api_client_auth = ApiClientAuthorization.
+ where(api_token: supplied_token).
+ first
+ if !api_client_auth
+ # fall through to create a token
+ else
+ raise "Token exists, aborting!"
+ end
+ end
+
api_client_auth = ApiClientAuthorization.
new(user: anonymous_user,
api_client_id: 0,
expires_at: Time.now + 100.years,
- scopes: ['GET /'])
+ scopes: ['GET /'],
+ api_token: supplied_token)
api_client_auth.save!
api_client_auth.reload
+ api_client_auth
end
if get_existing
# either not a get or no api_client_auth was found
if !api_client_auth
- api_client_auth = create_api_client_auth
+ api_client_auth = create_api_client_auth(supplied_token)
end
# print it to the console
name: trashed project
group_class: project
trash_at: 2001-01-01T00:00:00Z
- delete_at: 2038-03-01T00:00:00Z
+ delete_at: 2008-03-01T00:00:00Z
is_trashed: true
modified_at: 2001-01-01T00:00:00Z
trash_at: 2001-01-01T00:00:00Z
delete_at: 2038-03-01T00:00:00Z
is_trashed: true
+ modified_at: 2001-01-01T00:00:00Z
+
+trashed_on_next_sweep:
+ uuid: zzzzz-j7d0g-soontobetrashed
+ owner_uuid: zzzzz-j7d0g-xurymjxw79nv3jz
+ name: soon to be trashed project
+ group_class: project
+ trash_at: 2001-01-01T00:00:00Z
+ delete_at: 2038-03-01T00:00:00Z
+ is_trashed: false
modified_at: 2001-01-01T00:00:00Z
\ No newline at end of file
state: Complete
script_parameters_digest: 99914b932bd37a50b983c5e7c90ae93b
+job_in_trashed_project:
+ uuid: zzzzz-8i9sb-subprojectjob02
+ created_at: 2014-10-15 12:00:00
+ owner_uuid: zzzzz-j7d0g-trashedproject2
+ log: ~
+ repository: active/foo
+ script: hash
+ script_version: 4fe459abe02d9b365932b8f5dc419439ab4e2577
+ state: Complete
+ script_parameters_digest: 99914b932bd37a50b983c5e7c90ae93b
+
running_will_be_completed:
uuid: zzzzz-8i9sb-rshmckwoma9pjh8
owner_uuid: zzzzz-j7d0g-v955i6s2oi1cbso
# SPDX-License-Identifier: AGPL-3.0
require 'test_helper'
-require 'sweep_trashed_collections'
+require 'sweep_trashed_objects'
class CollectionTest < ActiveSupport::TestCase
include DbCurrentTime
assert_includes(coll_uuids, collections(:docker_image).uuid)
end
- test "move to trash in SweepTrashedCollections" do
+ test "move collections to trash in SweepTrashedObjects" do
c = collections(:trashed_on_next_sweep)
refute_empty Collection.where('uuid=? and is_trashed=false', c.uuid)
assert_raises(ActiveRecord::RecordNotUnique) do
name: c.name)
end
end
- SweepTrashedCollections.sweep_now
+ SweepTrashedObjects.sweep_now
c = Collection.where('uuid=? and is_trashed=true', c.uuid).first
assert c
act_as_user users(:active) do
end
end
- test "delete in SweepTrashedCollections" do
+ test "delete collections in SweepTrashedObjects" do
uuid = 'zzzzz-4zz18-3u1p5umicfpqszp' # deleted_on_next_sweep
assert_not_empty Collection.where(uuid: uuid)
- SweepTrashedCollections.sweep_now
+ SweepTrashedObjects.sweep_now
assert_empty Collection.where(uuid: uuid)
end
- test "delete referring links in SweepTrashedCollections" do
+ test "delete referring links in SweepTrashedObjects" do
uuid = collections(:trashed_on_next_sweep).uuid
act_as_system_user do
Link.create!(head_uuid: uuid,
Collection.where(uuid: uuid).
update_all(is_trashed: true, trash_at: past, delete_at: past)
assert_not_empty Collection.where(uuid: uuid)
- SweepTrashedCollections.sweep_now
+ SweepTrashedObjects.sweep_now
assert_empty Collection.where(uuid: uuid)
end
end
test "Request is finalized when its container is cancelled" do
set_user_from_auth :active
cr = create_minimal_req!(priority: 1, state: "Committed", container_count_max: 1)
+ assert_equal users(:active).uuid, cr.modified_by_user_uuid
act_as_system_user do
Container.find_by_uuid(cr.container_uuid).
cr.reload
assert_equal "Final", cr.state
+ assert_equal users(:active).uuid, cr.modified_by_user_uuid
end
test "Request is finalized when its container is completed" do
cr = create_minimal_req!(owner_uuid: project.uuid,
priority: 1,
state: "Committed")
+ assert_equal users(:active).uuid, cr.modified_by_user_uuid
c = act_as_system_user do
c = Container.find_by_uuid(cr.container_uuid)
cr.reload
assert_equal "Final", cr.state
+ assert_equal users(:active).uuid, cr.modified_by_user_uuid
['output', 'log'].each do |out_type|
pdh = Container.find_by_uuid(cr.container_uuid).send(out_type)
assert_equal(1, Collection.where(portable_data_hash: pdh,
cr2 = create_minimal_req!
cr2.update_attributes!(priority: 10, state: "Committed", requesting_container_uuid: c.uuid, command: ["echo", "foo2"], container_count_max: 1)
cr2.reload
+ assert_equal users(:active).uuid, cr2.modified_by_user_uuid
c2 = Container.find_by_uuid cr2.container_uuid
assert_operator 0, :<, c2.priority
cr2.reload
assert_equal 0, cr2.priority
+ assert_equal users(:active).uuid, cr2.modified_by_user_uuid
c2.reload
assert_equal 0, c2.priority
assert g_foo.errors.messages[:owner_uuid].join(" ").match(/ownership cycle/)
end
- test "delete group hides contents" do
+ test "trash group hides contents" do
set_user_from_auth :active_trustedclient
g_foo = Group.create!(name: "foo")
assert Collection.readable_by(users(:active)).where(uuid: col.uuid).any?
end
- test "delete group" do
+ test "trash group" do
set_user_from_auth :active_trustedclient
g_foo = Group.create!(name: "foo")
end
- test "delete subgroup" do
+ test "trash subgroup" do
set_user_from_auth :active_trustedclient
g_foo = Group.create!(name: "foo")
assert Group.readable_by(users(:active), {:include_trash => true}).where(uuid: g_baz.uuid).any?
end
- test "delete subsubgroup" do
+ test "trash subsubgroup" do
set_user_from_auth :active_trustedclient
g_foo = Group.create!(name: "foo")
end
- test "delete group propagates to subgroups" do
+ test "trash group propagates to subgroups" do
set_user_from_auth :active_trustedclient
g_foo = groups(:trashed_project)
assert Group.readable_by(users(:active)).where(uuid: g_bar.uuid).any?
assert Collection.readable_by(users(:active)).where(uuid: col.uuid).any?
- # this one should still be deleted.
+ # this one should still be trashed.
assert Group.readable_by(users(:active)).where(uuid: g_baz.uuid).empty?
g_baz.update! is_trashed: false
assert User.readable_by(users(:admin)).where(uuid: u_bar.uuid).any?
end
+ # A project that is not yet flagged as trashed must be flagged by
+ # sweep_now.
+ test "move projects to trash in SweepTrashedObjects" do
+ p = groups(:trashed_on_next_sweep)
+ assert_empty Group.where('uuid=? and is_trashed=true', p.uuid)
+ SweepTrashedObjects.sweep_now
+ assert_not_empty Group.where('uuid=? and is_trashed=true', p.uuid)
+ end
+
+ # sweep_now must destroy the trashed project fixtures together with
+ # the collection, job and container request fixtures named below, and
+ # must not destroy anything else.
+ test "delete projects and their contents in SweepTrashedObjects" do
+ g_foo = groups(:trashed_project)
+ g_bar = groups(:trashed_subproject)
+ g_baz = groups(:trashed_subproject3)
+ col = collections(:collection_in_trashed_subproject)
+ job = jobs(:job_in_trashed_project)
+ cr = container_requests(:cr_in_trashed_project)
+ # Record how many objects exist before the sweep
+ user_nr_was = User.all.length
+ coll_nr_was = Collection.all.length
+ group_nr_was = Group.where('group_class<>?', 'project').length
+ project_nr_was = Group.where(group_class: 'project').length
+ cr_nr_was = ContainerRequest.all.length
+ job_nr_was = Job.all.length
+ assert_not_empty Group.where(uuid: g_foo.uuid)
+ assert_not_empty Group.where(uuid: g_bar.uuid)
+ assert_not_empty Group.where(uuid: g_baz.uuid)
+ assert_not_empty Collection.where(uuid: col.uuid)
+ assert_not_empty Job.where(uuid: job.uuid)
+ assert_not_empty ContainerRequest.where(uuid: cr.uuid)
+ SweepTrashedObjects.sweep_now
+ assert_empty Group.where(uuid: g_foo.uuid)
+ assert_empty Group.where(uuid: g_bar.uuid)
+ assert_empty Group.where(uuid: g_baz.uuid)
+ assert_empty Collection.where(uuid: col.uuid)
+ assert_empty Job.where(uuid: job.uuid)
+ assert_empty ContainerRequest.where(uuid: cr.uuid)
+ # No unwanted deletions should have happened
+ assert_equal user_nr_was, User.all.length
+ assert_equal coll_nr_was-2, # collection_in_trashed_subproject
+ Collection.all.length # & deleted_on_next_sweep collections
+ assert_equal group_nr_was, Group.where('group_class<>?', 'project').length
+ assert_equal project_nr_was-3, Group.where(group_class: 'project').length
+ assert_equal cr_nr_was-1, ContainerRequest.all.length
+ assert_equal job_nr_was-1, Job.all.length
+ end
end
ExecStart=/usr/bin/crunch-dispatch-slurm
Restart=always
RestartSec=1
+LimitNOFILE=1000000
# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
StartLimitInterval=0
{"object_uuid", "=", container.UUID},
{"event_type", "=", "dispatch"},
}}, &ll)
+ c.Assert(err, IsNil)
c.Assert(len(ll.Items), Equals, 1)
}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "os"
+ "path/filepath"
+ "sort"
+ "strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
+)
+
+// printfer is the minimal logging interface copier needs; copyFile
+// calls Printf once for each file it copies.
+type printfer interface {
+ Printf(string, ...interface{})
+}
+
+// errTooManySymlinks is returned by walkHostFS when it reaches a
+// symlink after its symlink budget has been used up.
+var errTooManySymlinks = errors.New("too many symlinks, or symlink cycle")
+
+// limitFollowSymlinks is the symlink budget Copy passes to walkMount.
+const limitFollowSymlinks = 10
+
+// filetodo describes one regular file that copyFile will write into
+// the output collection.
+type filetodo struct {
+ src string // path opened with os.Open: a host file, or os.DevNull for ".keep" placeholders
+ dst string // path to create in the output collection filesystem
+ size int64 // size reported by Lstat for regular files; logged by copyFile
+}
+
+// copier copies data from a finished container's output path to a new
+// Arvados collection.
+//
+// Regular files (and symlinks to regular files) in hostOutputDir are
+// copied from the local filesystem.
+//
+// Symlinks to mounted collections, and any collections mounted under
+// ctrOutputDir, are copied by transforming the relevant parts of the
+// existing manifests, without moving any data around.
+//
+// Symlinks to other parts of the container's filesystem result in
+// errors.
+//
+// Use:
+//
+// manifest, err := (&copier{...}).Copy()
+type copier struct {
+ client *arvados.Client // passed to Collection.FileSystem in Copy
+ arvClient IArvadosClient // used by getManifest to fetch collection records
+ keepClient IKeepClient // passed to Collection.FileSystem in Copy
+ hostOutputDir string // host directory corresponding to ctrOutputDir
+ ctrOutputDir string // output directory path as seen inside the container
+ binds []string // colon-separated "hostpath:ctrpath[:...]" entries, consulted by hostRoot
+ mounts map[string]arvados.Mount // keyed by mount point path inside the container
+ secretMounts map[string]arvados.Mount // keyed by container path; these are omitted from output
+ logger printfer // receives one message per copied file
+
+ // Work accumulated by walkMount/walkHostFS and consumed by Copy.
+ dirs []string // directories to create in the output collection
+ files []filetodo // regular files to copy into the output collection
+ manifest string // manifest text extracted from mounted collections
+
+ manifestCache map[string]*manifest.Manifest // keyed by portable data hash; created lazily by getManifest
+}
+
+// Copy walks the container's output directory, builds a collection
+// filesystem from the manifest fragments gathered along the way,
+// creates the recorded directories, copies the recorded regular files
+// into it, and returns the resulting manifest text.
+func (cp *copier) Copy() (string, error) {
+	if err := cp.walkMount("", cp.ctrOutputDir, limitFollowSymlinks, true); err != nil {
+		return "", err
+	}
+	coll := arvados.Collection{ManifestText: cp.manifest}
+	fs, err := coll.FileSystem(cp.client, cp.keepClient)
+	if err != nil {
+		return "", err
+	}
+	for _, dir := range cp.dirs {
+		if err := fs.Mkdir(dir, 0777); err != nil {
+			return "", err
+		}
+	}
+	for _, todo := range cp.files {
+		if err := cp.copyFile(fs, todo); err != nil {
+			return "", err
+		}
+	}
+	return fs.MarshalManifest(".")
+}
+
+// copyFile logs the transfer, then streams the file at f.src into a
+// newly created file f.dst in the collection filesystem fs. The
+// destination is closed on every path; its Close error is returned
+// when the copy itself succeeded.
+func (cp *copier) copyFile(fs arvados.CollectionFileSystem, f filetodo) error {
+	cp.logger.Printf("copying %q (%d bytes)", f.dst, f.size)
+	out, err := fs.OpenFile(f.dst, os.O_CREATE|os.O_WRONLY, 0666)
+	if err != nil {
+		return err
+	}
+	in, err := os.Open(f.src)
+	if err != nil {
+		out.Close()
+		return err
+	}
+	defer in.Close()
+	if _, err = io.Copy(out, in); err != nil {
+		out.Close()
+		return err
+	}
+	return out.Close()
+}
+
+// Append to cp.manifest, cp.files, and cp.dirs so as to copy src (an
+// absolute path in the container's filesystem) to dest (an absolute
+// path in the output collection, or "" for output root).
+//
+// src must be (or be a descendant of) a readonly "collection" mount,
+// a writable collection mounted at ctrOutputPath, or a "tmp" mount.
+//
+// If walkMountsBelow is true, include contents of any collection
+// mounted below src as well.
+//
+// Returns nil without copying anything if src is inside a secret
+// mount, and an error if src is not inside any mount or the mount
+// kind is not supported in output.
+func (cp *copier) walkMount(dest, src string, maxSymlinks int, walkMountsBelow bool) error {
+ // srcRoot, srcMount indicate the innermost mount that
+ // contains src.
+ var srcRoot string
+ var srcMount arvados.Mount
+ for root, mnt := range cp.mounts {
+ // Compare with trailing slashes so that /a/bc is not
+ // mistaken for something inside /a/b.
+ if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
+ srcRoot, srcMount = root, mnt
+ }
+ }
+ for root := range cp.secretMounts {
+ if len(root) > len(srcRoot) && strings.HasPrefix(src+"/", root+"/") {
+ // Silently omit secrets, and symlinks to
+ // secrets.
+ return nil
+ }
+ }
+ if srcRoot == "" {
+ return fmt.Errorf("cannot output file %q: not in any mount", src)
+ }
+
+ // srcRelPath is the path to the file/dir we are trying to
+ // copy, relative to its mount point. filepath.Join cleans
+ // the result, so it looks like "." or "foo.txt" (no leading
+ // "./").
+ srcRelPath := filepath.Join(".", srcMount.Path, src[len(srcRoot):])
+
+ switch {
+ case srcMount.ExcludeFromOutput:
+ // Contribute nothing from this mount itself; mounts
+ // below it are still considered after the switch.
+ case srcMount.Kind == "tmp":
+ // Handle by walking the host filesystem.
+ return cp.walkHostFS(dest, src, maxSymlinks, walkMountsBelow)
+ case srcMount.Kind != "collection":
+ return fmt.Errorf("%q: unsupported mount %q in output (kind is %q)", src, srcRoot, srcMount.Kind)
+ case !srcMount.Writable:
+ // Read-only collection: reuse the relevant part of
+ // the collection's own manifest.
+ mft, err := cp.getManifest(srcMount.PortableDataHash)
+ if err != nil {
+ return err
+ }
+ cp.manifest += mft.Extract(srcRelPath, dest).Text
+ default:
+ // Writable collection: read the collection record
+ // from the ".arvados#collection" file in the mount's
+ // host directory and use its manifest.
+ hostRoot, err := cp.hostRoot(srcRoot)
+ if err != nil {
+ return err
+ }
+ f, err := os.Open(filepath.Join(hostRoot, ".arvados#collection"))
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ var coll arvados.Collection
+ err = json.NewDecoder(f).Decode(&coll)
+ if err != nil {
+ return err
+ }
+ mft := manifest.Manifest{Text: coll.ManifestText}
+ cp.manifest += mft.Extract(srcRelPath, dest).Text
+ }
+ if walkMountsBelow {
+ return cp.walkMountsBelow(dest, src)
+ } else {
+ return nil
+ }
+}
+
+// walkMountsBelow adds to the output every mount located strictly
+// below src, placing each one at the corresponding path below dest.
+func (cp *copier) walkMountsBelow(dest, src string) error {
+	below := src + "/"
+	for mnt, mntinfo := range cp.mounts {
+		// Skip mounts that are not below src. Also skip mounts
+		// that were materialized as regular files in the nearest
+		// parent mount during setup: those are picked up as
+		// regular files when the parent is processed, so the
+		// output reflects any changes and deletions made by the
+		// container.
+		if !strings.HasPrefix(mnt, below) || cp.copyRegularFiles(mntinfo) {
+			continue
+		}
+		// Example: with dest=/foo and src=/mnt1/dir1 (say, after
+		// following a symlink /outdir/foo -> /mnt1/dir1), the
+		// caller has already dealt with the collection mounted at
+		// /mnt1. If /mnt1/dir1/mnt2 is a mount too, it has to be
+		// copied from src=/mnt1/dir1/mnt2 to dest=/foo/mnt2.
+		//
+		// Every descendant of src is handled right here in this
+		// loop rather than recursively: /mnt1/dir1/mnt2/mnt3 lies
+		// below both /mnt1 and /mnt1/dir1/mnt2 but must be walked
+		// only once. (This shortcut is safe because mounted
+		// collections cannot contain symlinks.)
+		if err := cp.walkMount(dest+mnt[len(src):], mnt, 0, false); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Add entries to cp.dirs and cp.files so as to copy src (an absolute
+// path in the container's filesystem which corresponds to a real file
+// or directory in cp.hostOutputDir) to dest (an absolute path in the
+// output collection, or "" for output root).
+//
+// Always follow symlinks.
+//
+// If includeMounts is true, include mounts at and below src.
+// Otherwise, skip them.
+//
+// Returns errTooManySymlinks when a symlink is found and maxSymlinks
+// is already negative, and an error for anything that is not a
+// symlink, directory, or regular file.
+func (cp *copier) walkHostFS(dest, src string, maxSymlinks int, includeMounts bool) error {
+ if includeMounts {
+ err := cp.walkMountsBelow(dest, src)
+ if err != nil {
+ return err
+ }
+ }
+
+ // Translate the container path to the corresponding host path.
+ // NOTE(review): this slices src by len(cp.ctrOutputDir), so it
+ // assumes src is at or below cp.ctrOutputDir -- confirm for
+ // "tmp" mounts located elsewhere.
+ hostsrc := cp.hostOutputDir + src[len(cp.ctrOutputDir):]
+
+ // If src is a symlink, walk its target.
+ fi, err := os.Lstat(hostsrc)
+ if err != nil {
+ return fmt.Errorf("lstat %q: %s", src, err)
+ }
+ if fi.Mode()&os.ModeSymlink != 0 {
+ if maxSymlinks < 0 {
+ return errTooManySymlinks
+ }
+ target, err := os.Readlink(hostsrc)
+ if err != nil {
+ return fmt.Errorf("readlink %q: %s", src, err)
+ }
+ // Resolve a relative target against the directory
+ // containing the link, in container path terms.
+ if !strings.HasPrefix(target, "/") {
+ target = filepath.Join(filepath.Dir(src), target)
+ }
+ // The target may be in any mount, so go back through
+ // walkMount with one less symlink to spend.
+ return cp.walkMount(dest, target, maxSymlinks-1, true)
+ }
+
+ // If src is a regular directory, append it to cp.dirs and
+ // walk each of its children. (If there are no children,
+ // create an empty file "dest/.keep".)
+ if fi.Mode().IsDir() {
+ if dest != "" {
+ cp.dirs = append(cp.dirs, dest)
+ }
+ dir, err := os.Open(hostsrc)
+ if err != nil {
+ return fmt.Errorf("open %q: %s", src, err)
+ }
+ names, err := dir.Readdirnames(-1)
+ dir.Close()
+ if err != nil {
+ return fmt.Errorf("readdirnames %q: %s", src, err)
+ }
+ if len(names) == 0 {
+ // No placeholder is added for an empty output root.
+ if dest != "" {
+ cp.files = append(cp.files, filetodo{
+ src: os.DevNull,
+ dst: dest + "/.keep",
+ })
+ }
+ return nil
+ }
+ // Visit children in a deterministic order.
+ sort.Strings(names)
+ for _, name := range names {
+ // Shadow dest and src with the child's paths.
+ dest, src := dest+"/"+name, src+"/"+name
+ if _, isSecret := cp.secretMounts[src]; isSecret {
+ continue
+ }
+ if mntinfo, isMount := cp.mounts[src]; isMount && !cp.copyRegularFiles(mntinfo) {
+ // If a regular file/dir somehow
+ // exists at a path that's also a
+ // mount target, ignore the file --
+ // the mount has already been included
+ // with walkMountsBelow().
+ //
+ // (...except mount types that are
+ // handled as regular files.)
+ continue
+ }
+ err = cp.walkHostFS(dest, src, maxSymlinks, false)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+
+ // If src is a regular file, append it to cp.files.
+ if fi.Mode().IsRegular() {
+ cp.files = append(cp.files, filetodo{
+ src: hostsrc,
+ dst: dest,
+ size: fi.Size(),
+ })
+ return nil
+ }
+
+ return fmt.Errorf("Unsupported file type (mode %o) in output dir: %q", fi.Mode(), src)
+}
+
+// hostRoot maps a mount point inside the container to the host path
+// mounted there: the output directory maps to cp.hostOutputDir, and
+// anything else is looked up in cp.binds (colon-separated entries,
+// host path first, container path second). An error is returned when
+// no entry matches.
+func (cp *copier) hostRoot(ctrRoot string) (string, error) {
+	if ctrRoot == cp.ctrOutputDir {
+		return cp.hostOutputDir, nil
+	}
+	for i := range cp.binds {
+		parts := strings.Split(cp.binds[i], ":")
+		if len(parts) < 2 || parts[1] != ctrRoot {
+			continue
+		}
+		return parts[0], nil
+	}
+	return "", fmt.Errorf("not bind-mounted: %q", ctrRoot)
+}
+
+// copyRegularFiles reports whether a mount of this kind is treated as
+// ordinary files in its parent mount: "text" and "json" mounts always,
+// "collection" mounts only when writable.
+func (cp *copier) copyRegularFiles(m arvados.Mount) bool {
+	switch m.Kind {
+	case "text", "json":
+		return true
+	case "collection":
+		return m.Writable
+	default:
+		return false
+	}
+}
+
+// getManifest returns the manifest of the collection identified by
+// pdh. The collection record is fetched through cp.arvClient on first
+// use and kept in cp.manifestCache for later calls.
+func (cp *copier) getManifest(pdh string) (*manifest.Manifest, error) {
+	if cached, found := cp.manifestCache[pdh]; found {
+		return cached, nil
+	}
+	var coll arvados.Collection
+	if err := cp.arvClient.Get("collections", pdh, nil, &coll); err != nil {
+		return nil, fmt.Errorf("error retrieving collection record for %q: %s", pdh, err)
+	}
+	if cp.manifestCache == nil {
+		// The cache map is created on first use.
+		cp.manifestCache = make(map[string]*manifest.Manifest)
+	}
+	mft := &manifest.Manifest{Text: coll.ManifestText}
+	cp.manifestCache[pdh] = mft
+	return mft, nil
+}
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package main
+
+import (
+ "io"
+ "io/ioutil"
+ "os"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&copierSuite{})
+
+// copierSuite holds the copier under test; SetUpTest replaces it
+// before each test.
+type copierSuite struct {
+ cp copier
+}
+
+// SetUpTest builds a copier whose output directory "/ctr/outdir" is a
+// "tmp" mount backed by a fresh temporary directory on the host, plus
+// one secret text mount.
+func (s *copierSuite) SetUpTest(c *check.C) {
+ tmpdir, err := ioutil.TempDir("", "crunch-run.test.")
+ c.Assert(err, check.IsNil)
+ api, err := arvadosclient.MakeArvadosClient()
+ c.Assert(err, check.IsNil)
+ s.cp = copier{
+ client: arvados.NewClientFromEnv(),
+ arvClient: api,
+ hostOutputDir: tmpdir,
+ ctrOutputDir: "/ctr/outdir",
+ mounts: map[string]arvados.Mount{
+ "/ctr/outdir": {Kind: "tmp"},
+ },
+ secretMounts: map[string]arvados.Mount{
+ "/secret_text": {Kind: "text", Content: "xyzzy"},
+ },
+ }
+}
+
+// TearDownTest removes the temporary host output directory created
+// by SetUpTest. A cleanup failure is reported as a test failure
+// instead of being silently ignored, so leftover temp dirs are
+// noticed.
+func (s *copierSuite) TearDownTest(c *check.C) {
+	c.Check(os.RemoveAll(s.cp.hostOutputDir), check.IsNil)
+}
+
+func (s *copierSuite) TestEmptyOutput(c *check.C) {
+ err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ c.Check(s.cp.dirs, check.DeepEquals, []string(nil))
+ c.Check(len(s.cp.files), check.Equals, 0)
+}
+
+// TestRegularFilesAndDirs creates nested directories and one regular
+// file in the output dir, then checks that the walk records every
+// directory, a ".keep" placeholder (sourced from os.DevNull) for the
+// empty leaf directory, and the regular file with its size.
+func (s *copierSuite) TestRegularFilesAndDirs(c *check.C) {
+	c.Assert(os.MkdirAll(s.cp.hostOutputDir+"/dir1/dir2/dir3", 0755), check.IsNil)
+	// Use the suite's shared helper instead of repeating the
+	// open/write/close sequence here.
+	s.writeFileInOutputDir(c, "dir1/foo", "foo")
+
+	err := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(err, check.IsNil)
+	c.Check(s.cp.dirs, check.DeepEquals, []string{"/dir1", "/dir1/dir2", "/dir1/dir2/dir3"})
+	c.Check(s.cp.files, check.DeepEquals, []filetodo{
+		{src: os.DevNull, dst: "/dir1/dir2/dir3/.keep"},
+		{src: s.cp.hostOutputDir + "/dir1/foo", dst: "/dir1/foo", size: 3},
+	})
+}
+
+// TestSymlinkCycle makes two directories that link to each other and
+// expects the walk to fail with an error mentioning "cycle".
+func (s *copierSuite) TestSymlinkCycle(c *check.C) {
+	outdir := s.cp.hostOutputDir
+	for _, name := range []string{"/dir1", "/dir2"} {
+		c.Assert(os.Mkdir(outdir+name, 0755), check.IsNil)
+	}
+	c.Assert(os.Symlink("../dir2", outdir+"/dir1/l_dir2"), check.IsNil)
+	c.Assert(os.Symlink("../dir1", outdir+"/dir2/l_dir1"), check.IsNil)
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.ErrorMatches, `.*cycle.*`)
+}
+
+// TestSymlinkTargetMissing puts a dangling relative symlink in the
+// output dir and expects the walk error to mention the container
+// path of the missing target.
+func (s *copierSuite) TestSymlinkTargetMissing(c *check.C) {
+	link := s.cp.hostOutputDir + "/symlink"
+	c.Assert(os.Symlink("./missing", link), check.IsNil)
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.ErrorMatches, `.*/ctr/outdir/missing.*`)
+}
+
+// TestSymlinkTargetNotMounted links to a path above the output dir
+// that is not covered by any mount, and expects the walk error to
+// mention that container path.
+func (s *copierSuite) TestSymlinkTargetNotMounted(c *check.C) {
+	link := s.cp.hostOutputDir + "/symlink"
+	c.Assert(os.Symlink("../boop", link), check.IsNil)
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.ErrorMatches, `.*/ctr/boop.*`)
+}
+
+// TestSymlinkToSecret links from the output dir to the secret mount
+// and expects the walk to succeed while recording no dirs or files.
+func (s *copierSuite) TestSymlinkToSecret(c *check.C) {
+	link := s.cp.hostOutputDir + "/symlink"
+	c.Assert(os.Symlink("/secret_text", link), check.IsNil)
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.IsNil)
+	ndirs, nfiles := len(s.cp.dirs), len(s.cp.files)
+	c.Check(ndirs, check.Equals, 0)
+	c.Check(nfiles, check.Equals, 0)
+}
+
+// TestSecretInOutputDir registers a secret mount located inside the
+// output dir, creates the corresponding file on the host, and expects
+// the walk to succeed while recording no dirs or files.
+func (s *copierSuite) TestSecretInOutputDir(c *check.C) {
+	secret := s.cp.secretMounts["/secret_text"]
+	s.cp.secretMounts["/ctr/outdir/secret_text"] = secret
+	s.writeFileInOutputDir(c, "secret_text", "xyzzy")
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.IsNil)
+	ndirs, nfiles := len(s.cp.dirs), len(s.cp.files)
+	c.Check(ndirs, check.Equals, 0)
+	c.Check(nfiles, check.Equals, 0)
+}
+
+// TestSymlinkToMountedCollection checks that symlinks from the
+// output dir into collection mounts (one read-only, one writable)
+// end up as entries in the copier's manifest text.
+func (s *copierSuite) TestSymlinkToMountedCollection(c *check.C) {
+ // Simulate a read-only collection mounted at /mnt. No bind is
+ // registered for it; only the mount record exists.
+ s.cp.mounts["/mnt"] = arvados.Mount{
+ Kind: "collection",
+ PortableDataHash: arvadostest.FooPdh,
+ }
+
+ // Simulate a writable collection mounted at /mnt-w: a host temp
+ // dir containing a ".arvados#collection" JSON file whose
+ // manifest_text lists a single 3-byte file "bar".
+ bindtmp, err := ioutil.TempDir("", "crunch-run.test.")
+ c.Assert(err, check.IsNil)
+ defer os.RemoveAll(bindtmp)
+ f, err := os.OpenFile(bindtmp+"/.arvados#collection", os.O_CREATE|os.O_WRONLY, 0644)
+ c.Assert(err, check.IsNil)
+ _, err = io.WriteString(f, `{"manifest_text":". 37b51d194a7513e45b56f6524f2d51f2+3 0:3:bar\n"}`)
+ c.Assert(err, check.IsNil)
+ c.Assert(f.Close(), check.IsNil)
+ s.cp.mounts["/mnt-w"] = arvados.Mount{
+ Kind: "collection",
+ PortableDataHash: arvadostest.FooPdh,
+ Writable: true,
+ }
+ // Bind entry: host temp dir first, container mount point second.
+ s.cp.binds = append(s.cp.binds, bindtmp+":/mnt-w")
+
+ // One relative link to the read-only mount's root, one absolute
+ // link to a file in it, and one absolute link to a file in the
+ // writable mount.
+ c.Assert(os.Symlink("../../mnt", s.cp.hostOutputDir+"/l_dir"), check.IsNil)
+ c.Assert(os.Symlink("/mnt/foo", s.cp.hostOutputDir+"/l_file"), check.IsNil)
+ c.Assert(os.Symlink("/mnt-w/bar", s.cp.hostOutputDir+"/l_file_w"), check.IsNil)
+
+ err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ // Expect three manifest lines: stream ./l_dir holding "foo", then
+ // "l_file" and "l_file_w" in the root stream. The 37b5... block is
+ // the one written to the metafile above; the acbd... block
+ // presumably comes from the FooPdh fixture collection.
+ c.Check(s.cp.manifest, check.Matches, `(?ms)\./l_dir acbd\S+ 0:3:foo\n\. acbd\S+ 0:3:l_file\n\. 37b5\S+ 0:3:l_file_w\n`)
+}
+
+// TestSymlink builds a tree containing absolute and relative
+// symlinks to files and directories inside the output dir (including
+// a link to a link), then checks the exact dirs and files the walk
+// records.
+func (s *copierSuite) TestSymlink(c *check.C) {
+ // Every file entry expected below resolves to this one host file.
+ hostfile := s.cp.hostOutputDir + "/dir1/file"
+
+ err := os.MkdirAll(s.cp.hostOutputDir+"/dir1/dir2/dir3", 0755)
+ c.Assert(err, check.IsNil)
+ s.writeFileInOutputDir(c, "dir1/file", "file")
+ // The slice elements are evaluated in order while the literal is
+ // built, so "morelinks" exists before links are created inside it.
+ for _, err := range []error{
+ // Absolute links use container paths (ctrOutputDir), not host paths.
+ os.Symlink(s.cp.ctrOutputDir+"/dir1/file", s.cp.hostOutputDir+"/l_abs_file"),
+ os.Symlink(s.cp.ctrOutputDir+"/dir1/dir2", s.cp.hostOutputDir+"/l_abs_dir2"),
+ os.Symlink("../../dir1/file", s.cp.hostOutputDir+"/dir1/dir2/l_rel_file"),
+ os.Symlink("dir1/file", s.cp.hostOutputDir+"/l_rel_file"),
+ os.MkdirAll(s.cp.hostOutputDir+"/morelinks", 0755),
+ os.Symlink("../dir1/dir2", s.cp.hostOutputDir+"/morelinks/l_rel_dir2"),
+ os.Symlink("dir1/dir2/dir3", s.cp.hostOutputDir+"/l_rel_dir3"),
+ // rel. symlink -> rel. symlink -> regular file
+ os.Symlink("../dir1/dir2/l_rel_file", s.cp.hostOutputDir+"/morelinks/l_rel_l_rel_file"),
+ } {
+ c.Assert(err, check.IsNil)
+ }
+
+ err = s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+ c.Check(err, check.IsNil)
+ // Directory symlinks are expected to appear as directories under
+ // the link's own name, with the target's subdirectories below.
+ c.Check(s.cp.dirs, check.DeepEquals, []string{
+ "/dir1", "/dir1/dir2", "/dir1/dir2/dir3",
+ "/l_abs_dir2", "/l_abs_dir2/dir3",
+ "/l_rel_dir3",
+ "/morelinks", "/morelinks/l_rel_dir2", "/morelinks/l_rel_dir2/dir3",
+ })
+ // File symlinks are expected as entries whose src is the real
+ // host file (size 4, the length of "file"); empty directories get
+ // a ".keep" entry sourced from os.DevNull.
+ c.Check(s.cp.files, check.DeepEquals, []filetodo{
+ {dst: "/dir1/dir2/dir3/.keep", src: os.DevNull},
+ {dst: "/dir1/dir2/l_rel_file", src: hostfile, size: 4},
+ {dst: "/dir1/file", src: hostfile, size: 4},
+ {dst: "/l_abs_dir2/dir3/.keep", src: os.DevNull},
+ {dst: "/l_abs_dir2/l_rel_file", src: hostfile, size: 4},
+ {dst: "/l_abs_file", src: hostfile, size: 4},
+ {dst: "/l_rel_dir3/.keep", src: os.DevNull},
+ {dst: "/l_rel_file", src: hostfile, size: 4},
+ {dst: "/morelinks/l_rel_dir2/dir3/.keep", src: os.DevNull},
+ {dst: "/morelinks/l_rel_dir2/l_rel_file", src: hostfile, size: 4},
+ {dst: "/morelinks/l_rel_l_rel_file", src: hostfile, size: 4},
+ })
+}
+
+// TestUnsupportedOutputMount replaces the output dir's mount with an
+// unknown kind ("waz") and expects the walk to return an error.
+func (s *copierSuite) TestUnsupportedOutputMount(c *check.C) {
+	bogus := arvados.Mount{Kind: "waz"}
+	s.cp.mounts["/ctr/outdir"] = bogus
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.NotNil)
+}
+
+// TestUnsupportedMountKindBelow adds a mount of unknown kind ("waz")
+// underneath the output dir and expects the walk to return an error.
+func (s *copierSuite) TestUnsupportedMountKindBelow(c *check.C) {
+	bogus := arvados.Mount{Kind: "waz"}
+	s.cp.mounts["/ctr/outdir/dirk"] = bogus
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.NotNil)
+}
+
+// TestWritableMountBelow places a writable collection mount under
+// the output dir and expects files both beside and inside that mount
+// to be recorded as regular files read from the host.
+func (s *copierSuite) TestWritableMountBelow(c *check.C) {
+	outdir := s.cp.hostOutputDir
+	s.cp.mounts["/ctr/outdir/mount"] = arvados.Mount{
+		Kind:             "collection",
+		PortableDataHash: arvadostest.FooPdh,
+		Writable:         true,
+	}
+	c.Assert(os.MkdirAll(outdir+"/mount", 0755), check.IsNil)
+	for _, f := range []struct{ path, data string }{
+		{"file", "file"},
+		{"mount/foo", "foo"},
+	} {
+		s.writeFileInOutputDir(c, f.path, f.data)
+	}
+
+	walkErr := s.cp.walkMount("", s.cp.ctrOutputDir, 10, true)
+	c.Check(walkErr, check.IsNil)
+
+	wantDirs := []string{"/mount"}
+	wantFiles := []filetodo{
+		{src: outdir + "/file", dst: "/file", size: 4},
+		{src: outdir + "/mount/foo", dst: "/mount/foo", size: 3},
+	}
+	c.Check(s.cp.dirs, check.DeepEquals, wantDirs)
+	c.Check(s.cp.files, check.DeepEquals, wantFiles)
+}
+
+// writeFileInOutputDir writes data to path, interpreted relative to
+// the host output directory, creating the file with mode 0644 (or
+// truncating it if it already exists). Any failure aborts the test.
+//
+// ioutil.WriteFile opens, writes and closes in one call, so no file
+// handle is left open when the write fails.
+func (s *copierSuite) writeFileInOutputDir(c *check.C, path, data string) {
+	err := ioutil.WriteFile(s.cp.hostOutputDir+"/"+path, []byte(data), 0644)
+	c.Assert(err, check.IsNil)
+}
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
- PutHB(hash string, buf []byte) (string, int, error)
+ PutB(buf []byte) (string, int, error)
+ ReadAt(locator string, p []byte, off int) (int, error)
ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error)
ClearBlockCache()
}
// NewLogWriter is a factory function to create a new log writer.
-type NewLogWriter func(name string) io.WriteCloser
+type NewLogWriter func(name string) (io.WriteCloser, error)
type RunArvMount func(args []string, tok string) (*exec.Cmd, error)
// container.
type ContainerRunner struct {
Docker ThinDockerClient
+ client *arvados.Client
ArvClient IArvadosClient
Kc IKeepClient
arvados.Container
CrunchLog *ThrottledLogger
Stdout io.WriteCloser
Stderr io.WriteCloser
- LogCollection *CollectionWriter
+ LogCollection arvados.CollectionFileSystem
LogsPDH *string
RunArvMount
MkTempDir
}
c.Env = append(c.Env, "ARVADOS_API_TOKEN="+token)
- runner.arvMountLog = NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+ w, err := runner.NewLogWriter("arv-mount")
+ if err != nil {
+ return nil, err
+ }
+ runner.arvMountLog = NewThrottledLogger(w)
c.Stdout = runner.arvMountLog
c.Stderr = runner.arvMountLog
return nil
}
-func (runner *ContainerRunner) startHoststat() {
- runner.hoststatLogger = NewThrottledLogger(runner.NewLogWriter("hoststat"))
+func (runner *ContainerRunner) startHoststat() error {
+ w, err := runner.NewLogWriter("hoststat")
+ if err != nil {
+ return err
+ }
+ runner.hoststatLogger = NewThrottledLogger(w)
runner.hoststatReporter = &crunchstat.Reporter{
Logger: log.New(runner.hoststatLogger, "", 0),
CgroupRoot: runner.cgroupRoot,
PollPeriod: runner.statInterval,
}
runner.hoststatReporter.Start()
+ return nil
}
-func (runner *ContainerRunner) startCrunchstat() {
- runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+func (runner *ContainerRunner) startCrunchstat() error {
+ w, err := runner.NewLogWriter("crunchstat")
+ if err != nil {
+ return err
+ }
+ runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
CID: runner.ContainerID,
Logger: log.New(runner.statLogger, "", 0),
PollPeriod: runner.statInterval,
}
runner.statReporter.Start()
+ return nil
}
type infoCommand struct {
// might differ from what's described in the node record (see
// LogNodeRecord).
func (runner *ContainerRunner) LogHostInfo() (err error) {
- w := runner.NewLogWriter("node-info")
+ w, err := runner.NewLogWriter("node-info")
+ if err != nil {
+ return
+ }
commands := []infoCommand{
{
}
func (runner *ContainerRunner) logAPIResponse(label, path string, params map[string]interface{}, munge func(interface{})) (logged bool, err error) {
+ writer, err := runner.LogCollection.OpenFile(label+".json", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return false, err
+ }
w := &ArvLogWriter{
ArvClient: runner.ArvClient,
UUID: runner.Container.UUID,
loggingStream: label,
- writeCloser: runner.LogCollection.Open(label + ".json"),
+ writeCloser: writer,
}
reader, err := runner.ArvClient.CallRaw("GET", path, "", "", arvadosclient.Dict(params))
return err
}
runner.Stdout = stdoutFile
+ } else if w, err := runner.NewLogWriter("stdout"); err != nil {
+ return err
} else {
- runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ runner.Stdout = NewThrottledLogger(w)
}
if stderrMnt, ok := runner.Container.Mounts["stderr"]; ok {
return err
}
runner.Stderr = stderrFile
+ } else if w, err := runner.NewLogWriter("stderr"); err != nil {
+ return err
} else {
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+ runner.Stderr = NewThrottledLogger(w)
}
if stdinRdr != nil {
}
}
-var ErrNotInOutputDir = fmt.Errorf("Must point to path within the output directory")
-
-func (runner *ContainerRunner) derefOutputSymlink(path string, startinfo os.FileInfo) (tgt string, readlinktgt string, info os.FileInfo, err error) {
- // Follow symlinks if necessary
- info = startinfo
- tgt = path
- readlinktgt = ""
- nextlink := path
- for followed := 0; info.Mode()&os.ModeSymlink != 0; followed++ {
- if followed >= limitFollowSymlinks {
- // Got stuck in a loop or just a pathological number of links, give up.
- err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
- return
- }
-
- readlinktgt, err = os.Readlink(nextlink)
- if err != nil {
- return
- }
-
- tgt = readlinktgt
- if !strings.HasPrefix(tgt, "/") {
- // Relative symlink, resolve it to host path
- tgt = filepath.Join(filepath.Dir(path), tgt)
- }
- if strings.HasPrefix(tgt, runner.Container.OutputPath+"/") && !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
- // Absolute symlink to container output path, adjust it to host output path.
- tgt = filepath.Join(runner.HostOutputDir, tgt[len(runner.Container.OutputPath):])
- }
- if !strings.HasPrefix(tgt, runner.HostOutputDir+"/") {
- // After dereferencing, symlink target must either be
- // within output directory, or must point to a
- // collection mount.
- err = ErrNotInOutputDir
- return
- }
-
- info, err = os.Lstat(tgt)
- if err != nil {
- // tgt
- err = fmt.Errorf("Symlink in output %q points to invalid location %q: %v",
- path[len(runner.HostOutputDir):], readlinktgt, err)
- return
- }
-
- nextlink = tgt
- }
-
- return
-}
-
-var limitFollowSymlinks = 10
-
-// UploadFile uploads files within the output directory, with special handling
-// for symlinks. If the symlink leads to a keep mount, copy the manifest text
-// from the keep mount into the output manifestText. Ensure that whether
-// symlinks are relative or absolute, every symlink target (even targets that
-// are symlinks themselves) must point to a path in either the output directory
-// or a collection mount.
-//
-// Assumes initial value of "path" is absolute, and located within runner.HostOutputDir.
-func (runner *ContainerRunner) UploadOutputFile(
- path string,
- info os.FileInfo,
- infoerr error,
- binds []string,
- walkUpload *WalkUpload,
- relocateFrom string,
- relocateTo string,
- followed int) (manifestText string, err error) {
-
- if infoerr != nil {
- return "", infoerr
- }
-
- if info.Mode().IsDir() {
- // if empty, need to create a .keep file
- dir, direrr := os.Open(path)
- if direrr != nil {
- return "", direrr
- }
- defer dir.Close()
- names, eof := dir.Readdirnames(1)
- if len(names) == 0 && eof == io.EOF && path != runner.HostOutputDir {
- containerPath := runner.OutputPath + path[len(runner.HostOutputDir):]
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
- // Check if there is a bind for this
- // directory, in which case assume we don't need .keep
- if (containerPath == bind || strings.HasPrefix(containerPath, bind+"/")) && mnt.PortableDataHash != "d41d8cd98f00b204e9800998ecf8427e+0" {
- return
- }
- }
- outputSuffix := path[len(runner.HostOutputDir)+1:]
- return fmt.Sprintf("./%v d41d8cd98f00b204e9800998ecf8427e+0 0:0:.keep\n", outputSuffix), nil
- }
- return
- }
-
- if followed >= limitFollowSymlinks {
- // Got stuck in a loop or just a pathological number of
- // directory links, give up.
- err = fmt.Errorf("Followed more than %v symlinks from path %q", limitFollowSymlinks, path)
- return
- }
-
- // "path" is the actual path we are visiting
- // "tgt" is the target of "path" (a non-symlink) after following symlinks
- // "relocated" is the path in the output manifest where the file should be placed,
- // but has HostOutputDir as a prefix.
-
- // The destination path in the output manifest may need to be
- // logically relocated to some other path in order to appear
- // in the correct location as a result of following a symlink.
- // Remove the relocateFrom prefix and replace it with
- // relocateTo.
- relocated := relocateTo + path[len(relocateFrom):]
-
- tgt, readlinktgt, info, derefErr := runner.derefOutputSymlink(path, info)
- if derefErr != nil && derefErr != ErrNotInOutputDir {
- return "", derefErr
- }
-
- // go through mounts and try reverse map to collection reference
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
- if (tgt == bind || strings.HasPrefix(tgt, bind+"/")) && !mnt.Writable {
- // get path relative to bind
- targetSuffix := tgt[len(bind):]
-
- // Copy mount and adjust the path to add path relative to the bind
- adjustedMount := mnt
- adjustedMount.Path = filepath.Join(adjustedMount.Path, targetSuffix)
-
- // Terminates in this keep mount, so add the
- // manifest text at appropriate location.
- outputSuffix := relocated[len(runner.HostOutputDir):]
- manifestText, err = runner.getCollectionManifestForPath(adjustedMount, outputSuffix)
- return
- }
- }
-
- // If target is not a collection mount, it must be located within the
- // output directory, otherwise it is an error.
- if derefErr == ErrNotInOutputDir {
- err = fmt.Errorf("Symlink in output %q points to invalid location %q, must point to path within the output directory.",
- path[len(runner.HostOutputDir):], readlinktgt)
- return
- }
-
- if info.Mode().IsRegular() {
- return "", walkUpload.UploadFile(relocated, tgt)
- }
-
- if info.Mode().IsDir() {
- // Symlink leads to directory. Walk() doesn't follow
- // directory symlinks, so we walk the target directory
- // instead. Within the walk, file paths are relocated
- // so they appear under the original symlink path.
- err = filepath.Walk(tgt, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
- var m string
- m, walkerr = runner.UploadOutputFile(walkpath, walkinfo, walkerr,
- binds, walkUpload, tgt, relocated, followed+1)
- if walkerr == nil {
- manifestText = manifestText + m
- }
- return walkerr
- })
- return
- }
-
- return
-}
-
-// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
+// CaptureOutput saves data from the container's output directory if
+// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput() error {
if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
// Output may have been set directly by the container, so
}
}
- if runner.HostOutputDir == "" {
- return nil
- }
-
- _, err := os.Stat(runner.HostOutputDir)
+ txt, err := (&copier{
+ client: runner.client,
+ arvClient: runner.ArvClient,
+ keepClient: runner.Kc,
+ hostOutputDir: runner.HostOutputDir,
+ ctrOutputDir: runner.Container.OutputPath,
+ binds: runner.Binds,
+ mounts: runner.Container.Mounts,
+ secretMounts: runner.SecretMounts,
+ logger: runner.CrunchLog,
+ }).Copy()
if err != nil {
- return fmt.Errorf("While checking host output path: %v", err)
- }
-
- // Pre-populate output from the configured mount points
- var binds []string
- for bind, mnt := range runner.Container.Mounts {
- if mnt.Kind == "collection" {
- binds = append(binds, bind)
- }
- }
- sort.Strings(binds)
-
- // Delete secret mounts so they don't get saved to the output collection.
- for bind := range runner.SecretMounts {
- if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- err = os.Remove(runner.HostOutputDir + bind[len(runner.Container.OutputPath):])
- if err != nil {
- return fmt.Errorf("Unable to remove secret mount: %v", err)
- }
- }
- }
-
- var manifestText string
-
- collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
- _, err = os.Stat(collectionMetafile)
- if err != nil {
- // Regular directory
-
- cw := CollectionWriter{0, runner.Kc, nil, nil, sync.Mutex{}}
- walkUpload := cw.BeginUpload(runner.HostOutputDir, runner.CrunchLog.Logger)
-
- var m string
- err = filepath.Walk(runner.HostOutputDir, func(path string, info os.FileInfo, err error) error {
- m, err = runner.UploadOutputFile(path, info, err, binds, walkUpload, "", "", 0)
- if err == nil {
- manifestText = manifestText + m
- }
- return err
- })
-
- cw.EndUpload(walkUpload)
-
- if err != nil {
- return fmt.Errorf("While uploading output files: %v", err)
- }
-
- m, err = cw.ManifestText()
- manifestText = manifestText + m
- if err != nil {
- return fmt.Errorf("While uploading output files: %v", err)
- }
- } else {
- // FUSE mount directory
- file, openerr := os.Open(collectionMetafile)
- if openerr != nil {
- return fmt.Errorf("While opening FUSE metafile: %v", err)
- }
- defer file.Close()
-
- var rec arvados.Collection
- err = json.NewDecoder(file).Decode(&rec)
- if err != nil {
- return fmt.Errorf("While reading FUSE metafile: %v", err)
- }
- manifestText = rec.ManifestText
- }
-
- for _, bind := range binds {
- mnt := runner.Container.Mounts[bind]
-
- bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
-
- if bindSuffix == bind || len(bindSuffix) <= 0 {
- // either does not start with OutputPath or is OutputPath itself
- continue
- }
-
- if mnt.ExcludeFromOutput == true || mnt.Writable {
- continue
- }
-
- // append to manifest_text
- m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
- if err != nil {
- return err
- }
-
- manifestText = manifestText + m
- }
-
- // Save output
- var response arvados.Collection
- manifest := manifest.Manifest{Text: manifestText}
- manifestText = manifest.Extract(".", ".").Text
- err = runner.ArvClient.Create("collections",
- arvadosclient.Dict{
- "ensure_unique_name": true,
- "collection": arvadosclient.Dict{
- "is_trashed": true,
- "name": "output for " + runner.Container.UUID,
- "manifest_text": manifestText}},
- &response)
+ return err
+ }
+ var resp arvados.Collection
+ err = runner.ArvClient.Create("collections", arvadosclient.Dict{
+ "ensure_unique_name": true,
+ "collection": arvadosclient.Dict{
+ "is_trashed": true,
+ "name": "output for " + runner.Container.UUID,
+ "manifest_text": txt,
+ },
+ }, &resp)
if err != nil {
- return fmt.Errorf("While creating output collection: %v", err)
+ return fmt.Errorf("error creating output collection: %v", err)
}
- runner.OutputPDH = &response.PortableDataHash
+ runner.OutputPDH = &resp.PortableDataHash
return nil
}
-var outputCollections = make(map[string]arvados.Collection)
-
-// Fetch the collection for the mnt.PortableDataHash
-// Return the manifest_text fragment corresponding to the specified mnt.Path
-// after making any required updates.
-// Ex:
-// If mnt.Path is not specified,
-// return the entire manifest_text after replacing any "." with bindSuffix
-// If mnt.Path corresponds to one stream,
-// return the manifest_text for that stream after replacing that stream name with bindSuffix
-// Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
-// for that stream after replacing stream name with bindSuffix minus the last word
-// and the file name with last word of the bindSuffix
-// Allowed path examples:
-// "path":"/"
-// "path":"/subdir1"
-// "path":"/subdir1/subdir2"
-// "path":"/subdir/filename" etc
-func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
- collection := outputCollections[mnt.PortableDataHash]
- if collection.PortableDataHash == "" {
- err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
- if err != nil {
- return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
- }
- outputCollections[mnt.PortableDataHash] = collection
- }
-
- if collection.ManifestText == "" {
- runner.CrunchLog.Printf("No manifest text for collection %v", collection.PortableDataHash)
- return "", nil
- }
-
- mft := manifest.Manifest{Text: collection.ManifestText}
- extracted := mft.Extract(mnt.Path, bindSuffix)
- if extracted.Err != nil {
- return "", fmt.Errorf("Error parsing manifest for %v: %v", mnt.PortableDataHash, extracted.Err.Error())
- }
- return extracted.Text, nil
-}
-
func (runner *ContainerRunner) CleanupDirs() {
if runner.ArvMount != nil {
var delay int64 = 8
// point, but re-open crunch log with ArvClient in case there are any
// other further errors (such as failing to write the log to Keep!)
// while shutting down
- runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
- UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
+ runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{
+ ArvClient: runner.ArvClient,
+ UUID: runner.Container.UUID,
+ loggingStream: "crunch-run",
+ writeCloser: nil,
+ })
runner.CrunchLog.Immediate = log.New(os.Stderr, runner.Container.UUID+" ", 0)
}()
return nil
}
- mt, err := runner.LogCollection.ManifestText()
+ mt, err := runner.LogCollection.MarshalManifest(".")
if err != nil {
return fmt.Errorf("While creating log manifest: %v", err)
}
}
// NewArvLogWriter creates an ArvLogWriter
-func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
+func (runner *ContainerRunner) NewArvLogWriter(name string) (io.WriteCloser, error) {
+ writer, err := runner.LogCollection.OpenFile(name+".txt", os.O_CREATE|os.O_WRONLY, 0666)
+ if err != nil {
+ return nil, err
+ }
return &ArvLogWriter{
ArvClient: runner.ArvClient,
UUID: runner.Container.UUID,
loggingStream: name,
- writeCloser: runner.LogCollection.Open(name + ".txt")}
+ writeCloser: writer,
+ }, nil
}
// Run the full container lifecycle.
return
}
runner.setupSignals()
- runner.startHoststat()
+ err = runner.startHoststat()
+ if err != nil {
+ return
+ }
// check for and/or load image
err = runner.LoadImage()
}
runner.finalState = "Cancelled"
- runner.startCrunchstat()
+ err = runner.startCrunchstat()
+ if err != nil {
+ return
+ }
err = runner.StartContainer()
if err != nil {
}
// NewContainerRunner creates a new container runner.
-func NewContainerRunner(api IArvadosClient,
- kc IKeepClient,
- docker ThinDockerClient,
- containerUUID string) *ContainerRunner {
-
- cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
+func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClient, docker ThinDockerClient, containerUUID string) (*ContainerRunner, error) {
+ cr := &ContainerRunner{
+ client: client,
+ ArvClient: api,
+ Kc: kc,
+ Docker: docker,
+ }
cr.NewLogWriter = cr.NewArvLogWriter
cr.RunArvMount = cr.ArvMountCmd
cr.MkTempDir = ioutil.TempDir
cl.ApiToken = token
return cl, nil
}
- cr.LogCollection = &CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
+ var err error
+ cr.LogCollection, err = (&arvados.Collection{}).FileSystem(cr.client, cr.Kc)
+ if err != nil {
+ return nil, err
+ }
cr.Container.UUID = containerUUID
- cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+ w, err := cr.NewLogWriter("crunch-run")
+ if err != nil {
+ return nil, err
+ }
+ cr.CrunchLog = NewThrottledLogger(w)
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
loadLogThrottleParams(api)
- return cr
+ return cr, nil
}
func main() {
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- cr := NewContainerRunner(api, kc, docker, containerId)
+ cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+ if err != nil {
+ log.Fatal(err)
+ }
if dockererr != nil {
cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
cr.checkBrokenNode(dockererr)
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
- log.Printf("could not create memory profile: ", err)
+ log.Printf("could not create memory profile: %s", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
- log.Printf("could not write memory profile: ", err)
+ log.Printf("could not write memory profile: %s", err)
}
closeerr := f.Close()
if closeerr != nil {
- log.Printf("closing memprofile file: ", err)
+ // Log the error returned by Close; err here is the os.Create result.
+ log.Printf("closing memprofile file: %s", closeerr)
}
}
"fmt"
"io"
"io/ioutil"
- "log"
"net"
"os"
"os/exec"
var _ = Suite(&TestSuite{})
type TestSuite struct {
+ client *arvados.Client
docker *TestDockerClient
}
func (s *TestSuite) SetUpTest(c *C) {
+ s.client = arvados.NewClientFromEnv()
s.docker = NewTestDockerClient()
}
return nil
}
-func (client *KeepTestClient) PutHB(hash string, buf []byte) (string, int, error) {
+func (client *KeepTestClient) PutB(buf []byte) (string, int, error) {
client.Content = buf
- return fmt.Sprintf("%s+%d", hash, len(buf)), len(buf), nil
+ return fmt.Sprintf("%x+%d", md5.Sum(buf), len(buf)), len(buf), nil
}
-func (*KeepTestClient) ClearBlockCache() {
+func (client *KeepTestClient) ReadAt(string, []byte, int) (int, error) {
+ return 0, errors.New("not implemented")
+}
+
+func (client *KeepTestClient) ClearBlockCache() {
}
func (client *KeepTestClient) Close() {
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
- _, err := cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ _, err = cr.Docker.ImageRemove(nil, hwImageId, dockertypes.ImageRemoveOptions{})
+ c.Check(err, IsNil)
_, _, err = cr.Docker.ImageInspectWithRaw(nil, hwImageId)
c.Check(err, NotNil)
return discoveryMap[key], nil
}
-type KeepErrorTestClient struct{}
-
-func (KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- return "", 0, errors.New("KeepError")
+type KeepErrorTestClient struct {
+ KeepTestClient
}
-func (KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (arvados.File, error) {
+func (*KeepErrorTestClient) ManifestFileReader(manifest.Manifest, string) (arvados.File, error) {
return nil, errors.New("KeepError")
}
-func (KeepErrorTestClient) ClearBlockCache() {
+func (*KeepErrorTestClient) PutB(buf []byte) (string, int, error) {
+ return "", 0, errors.New("KeepError")
}
-type KeepReadErrorTestClient struct{}
-
-func (KeepReadErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- return "", 0, nil
+type KeepReadErrorTestClient struct {
+ KeepTestClient
}
-func (KeepReadErrorTestClient) ClearBlockCache() {
+func (*KeepReadErrorTestClient) ReadAt(string, []byte, int) (int, error) {
+ return 0, errors.New("KeepError")
}
type ErrorReader struct {
// (1) Arvados error
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, ArvErrorTestClient{}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
}
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
+ c.Assert(err, NotNil)
c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
}
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = otherPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
}
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Keep read error: the Keep client's ReadAt always fails
- cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, &KeepReadErrorTestClient{}, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.Container.ContainerImage = hwPDH
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err, NotNil)
}
Stderr ClosableBuffer
}
-func (tl *TestLogs) NewTestLoggingWriter(logstr string) io.WriteCloser {
+// NewTestLoggingWriter returns the in-memory buffer that captures the
+// named log stream: "stdout" maps to tl.Stdout and "stderr" to tl.Stderr.
+// Any other name yields a nil writer and an error naming the stream.
+func (tl *TestLogs) NewTestLoggingWriter(logstr string) (io.WriteCloser, error) {
if logstr == "stdout" {
- return &tl.Stdout
+ return &tl.Stdout, nil
}
if logstr == "stderr" {
- return &tl.Stderr
+ return &tl.Stderr, nil
}
- return nil
+ // Name the offending stream so a failing test is diagnosable.
+ return nil, errors.New("NewTestLoggingWriter: unknown log stream " + logstr)
}
func dockerLog(fd byte, msg string) []byte {
}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{}, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
cr.Container.ContainerImage = hwPDH
cr.Container.Command = []string{"./hw"}
- err := cr.LoadImage()
+ err = cr.LoadImage()
c.Check(err, IsNil)
err = cr.CreateContainer()
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
cr.CrunchLog.Print("Goodbye")
cr.finalState = "Complete"
- err := cr.CommitLogs()
+ err = cr.CommitLogs()
c.Check(err, IsNil)
c.Check(api.Calls, Equals, 2)
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
- err := cr.UpdateContainerRunning()
+ err = cr.UpdateContainerRunning()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["state"], Equals, "Running")
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.LogsPDH = new(string)
*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
*cr.ExitCode = 42
cr.finalState = "Complete"
- err := cr.UpdateContainerFinal()
+ err = cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.cCancelled = true
cr.finalState = "Cancelled"
- err := cr.UpdateContainerFinal()
+ err = cr.UpdateContainerFinal()
c.Check(err, IsNil)
c.Check(api.Content[0]["container"].(arvadosclient.Dict)["log"], IsNil)
err = json.Unmarshal([]byte(record), &sm)
c.Check(err, IsNil)
secretMounts, err := json.Marshal(sm)
- log.Printf("%q %q", sm, secretMounts)
+ c.Logf("%s %q", sm, secretMounts)
c.Check(err, IsNil)
s.docker.exitCode = exitCode
s.docker.api = api
kc := &KeepTestClient{}
defer kc.Close()
- cr = NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
api := &ArvTestClient{Container: rec}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.RunArvMount = func([]string, string) (*exec.Cmd, error) { return nil, nil }
cr.MkArvClient = func(token string) (IArvadosClient, error) {
return &ArvTestClient{}, nil
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
api = &ArvTestClient{Container: rec}
kc := &KeepTestClient{}
defer kc.Close()
- cr = NewContainerRunner(api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err = NewContainerRunner(s.client, api, kc, s.docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
cr.MkArvClient = func(token string) (IArvadosClient, error) {
}
func (s *TestSuite) TestFullRunWithAPI(c *C) {
+ defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
- defer os.Unsetenv("ARVADOS_API_HOST")
api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
}
func (s *TestSuite) TestFullRunSetOutput(c *C) {
+ defer os.Setenv("ARVADOS_API_HOST", os.Getenv("ARVADOS_API_HOST"))
os.Setenv("ARVADOS_API_HOST", "test.arvados.org")
- defer os.Unsetenv("ARVADOS_API_HOST")
api, _, _ := s.fullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo $ARVADOS_API_HOST"],
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
manifest := collection["manifest_text"].(string)
c.Check(manifest, Equals, `./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out
-./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 9:18:bar 9:18:sub1file2
+./foo 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0abcdefgh11234567890@569fa8c3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:bar 36:18:sub1file2
./foo/baz 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 9:18:sub2file2
./foo/sub1 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
./foo/sub1/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
"environment": {"FROBIZ": "bilbo"},
"mounts": {
"/tmp": {"kind": "tmp"},
- "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt"},
+ "/tmp/foo/bar": {"kind": "collection", "portable_data_hash": "b0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
"stdout": {"kind": "file", "path": "/tmp/a/b/c.out"}
},
"output_path": "/tmp",
}
}
-func (s *TestSuite) TestOutputSymlinkToInput(c *C) {
- helperRecord := `{
- "command": ["/bin/sh", "-c", "echo $FROBIZ"],
- "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
- "cwd": "/bin",
- "environment": {"FROBIZ": "bilbo"},
- "mounts": {
- "/tmp": {"kind": "tmp"},
- "/keep/foo/sub1file2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367", "path": "/subdir1/file2_in_subdir1.txt"},
- "/keep/foo2": {"kind": "collection", "portable_data_hash": "a0def87f80dd594d4675809e83bd4f15+367"}
- },
- "output_path": "/tmp",
- "priority": 1,
- "runtime_constraints": {}
- }`
-
- extraMounts := []string{
- "a0def87f80dd594d4675809e83bd4f15+367/subdir1/file2_in_subdir1.txt",
- }
-
- api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- os.Symlink("/keep/foo/sub1file2", t.realTemp+"/tmp2/baz")
- os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz2")
- os.Symlink("/keep/foo2/subdir1", t.realTemp+"/tmp2/baz3")
- os.Mkdir(t.realTemp+"/tmp2/baz4", 0700)
- os.Symlink("/keep/foo2/subdir1/file2_in_subdir1.txt", t.realTemp+"/tmp2/baz4/baz5")
- t.logWriter.Close()
- })
-
- c.Check(api.CalledWith("container.exit_code", 0), NotNil)
- c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range api.Content {
- if v["collection"] != nil {
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
- c.Check(manifest, Equals, `. 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:baz 9:18:baz2
-./baz3 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 0:9:file1_in_subdir1.txt 9:18:file2_in_subdir1.txt
-./baz3/subdir2 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396c73c0bcdefghijk544332211@569fa8c5 0:9:file1_in_subdir2.txt 9:18:file2_in_subdir2.txt
-./baz4 3e426d509afffb85e06c4c96a7c15e91+27+Aa124ac75e5168396cabcdefghij6419876543234@569fa8c4 9:18:baz5
-`)
- }
- }
- }
-}
-
func (s *TestSuite) TestOutputError(c *C) {
helperRecord := `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
}
-func (s *TestSuite) TestOutputSymlinkToOutput(c *C) {
- helperRecord := `{
- "command": ["/bin/sh", "-c", "echo $FROBIZ"],
- "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
- "cwd": "/bin",
- "environment": {"FROBIZ": "bilbo"},
- "mounts": {
- "/tmp": {"kind": "tmp"}
- },
- "output_path": "/tmp",
- "priority": 1,
- "runtime_constraints": {}
- }`
-
- extraMounts := []string{}
-
- api, _, _ := s.fullRunHelper(c, helperRecord, extraMounts, 0, func(t *TestDockerClient) {
- rf, _ := os.Create(t.realTemp + "/tmp2/realfile")
- rf.Write([]byte("foo"))
- rf.Close()
-
- os.Mkdir(t.realTemp+"/tmp2/realdir", 0700)
- rf, _ = os.Create(t.realTemp + "/tmp2/realdir/subfile")
- rf.Write([]byte("bar"))
- rf.Close()
-
- os.Symlink("/tmp/realfile", t.realTemp+"/tmp2/file1")
- os.Symlink("realfile", t.realTemp+"/tmp2/file2")
- os.Symlink("/tmp/file1", t.realTemp+"/tmp2/file3")
- os.Symlink("file2", t.realTemp+"/tmp2/file4")
- os.Symlink("realdir", t.realTemp+"/tmp2/dir1")
- os.Symlink("/tmp/realdir", t.realTemp+"/tmp2/dir2")
- t.logWriter.Close()
- })
-
- c.Check(api.CalledWith("container.exit_code", 0), NotNil)
- c.Check(api.CalledWith("container.state", "Complete"), NotNil)
- for _, v := range api.Content {
- if v["collection"] != nil {
- collection := v["collection"].(arvadosclient.Dict)
- if strings.Index(collection["name"].(string), "output") == 0 {
- manifest := collection["manifest_text"].(string)
- c.Check(manifest, Equals,
- `. 7a2c86e102dcc231bd232aad99686dfa+15 0:3:file1 3:3:file2 6:3:file3 9:3:file4 12:3:realfile
-./dir1 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-./dir2 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-./realdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:subfile
-`)
- }
- }
- }
-}
-
func (s *TestSuite) TestStdinCollectionMountPoint(c *C) {
helperRecord := `{
"command": ["/bin/sh", "-c", "echo $FROBIZ"],
func (s *TestSuite) TestNumberRoundTrip(c *C) {
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, &ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.fetchContainerRecord()
jsondata, err := json.Marshal(cr.Container.Mounts["/json"].Content)
c.Check(string(jsondata), Equals, `{"number":123456789123456789}`)
}
-func (s *TestSuite) TestEvalSymlinks(c *C) {
- kc := &KeepTestClient{}
- defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-
- realTemp, err := ioutil.TempDir("", "crunchrun_test-")
- c.Assert(err, IsNil)
- defer os.RemoveAll(realTemp)
-
- cr.HostOutputDir = realTemp
-
- // Absolute path outside output dir
- os.Symlink("/etc/passwd", realTemp+"/p1")
-
- // Relative outside output dir
- os.Symlink("../zip", realTemp+"/p2")
-
- // Circular references
- os.Symlink("p4", realTemp+"/p3")
- os.Symlink("p5", realTemp+"/p4")
- os.Symlink("p3", realTemp+"/p5")
-
- // Target doesn't exist
- os.Symlink("p99", realTemp+"/p6")
-
- for _, v := range []string{"p1", "p2", "p3", "p4", "p5"} {
- info, err := os.Lstat(realTemp + "/" + v)
- _, _, _, err = cr.derefOutputSymlink(realTemp+"/"+v, info)
- c.Assert(err, NotNil)
- }
-}
-
-func (s *TestSuite) TestEvalSymlinkDir(c *C) {
- kc := &KeepTestClient{}
- defer kc.Close()
- cr := NewContainerRunner(&ArvTestClient{callraw: true}, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
-
- realTemp, err := ioutil.TempDir("", "crunchrun_test-")
- c.Assert(err, IsNil)
- defer os.RemoveAll(realTemp)
-
- cr.HostOutputDir = realTemp
-
- // Absolute path outside output dir
- os.Symlink(".", realTemp+"/loop")
-
- v := "loop"
- info, err := os.Lstat(realTemp + "/" + v)
- _, err = cr.UploadOutputFile(realTemp+"/"+v, info, err, []string{}, nil, "", "", 0)
- c.Assert(err, NotNil)
-}
-
func (s *TestSuite) TestFullBrokenDocker1(c *C) {
tf, err := ioutil.TempFile("", "brokenNodeHook-")
c.Assert(err, IsNil)
"testing"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
)
-type LoggingTestSuite struct{}
+type LoggingTestSuite struct {
+ client *arvados.Client
+}
type TestTimestamper struct {
count int
// Gocheck boilerplate
var _ = Suite(&LoggingTestSuite{})
+// SetUpTest stores a client built by arvados.NewClientFromEnv on the
+// suite. gocheck runs this fixture method before each test in the suite.
+func (s *LoggingTestSuite) SetUpTest(c *C) {
+ s.client = arvados.NewClientFromEnv()
+}
+
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
c.Check(api.Calls, Equals, 1)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Immediate = nil
c.Check(api.Calls > 1, Equals, true)
c.Check(api.Calls < 2000000, Equals, true)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
c.Check(mt, Equals, ". 9c2c05d1fae6aaa8af85113ba725716d+67108864 80b821383a07266c2a66a4566835e26e+21780065 0:88888929:crunch-run.txt\n")
}
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
ts := &TestTimestamper{}
cr.CrunchLog.Timestamper = ts.Timestamp
- stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
+ w, err := cr.NewLogWriter("stdout")
+ c.Assert(err, IsNil)
+ stdout := NewThrottledLogger(w)
stdout.Timestamper = ts.Timestamp
cr.CrunchLog.Print("Hello world!")
2015-12-29T15:51:45.000000004Z Blurb
`)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
- c.Check(mt, Equals, ""+
- ". 408672f5b5325f7d20edfbf899faee42+83 0:83:crunch-run.txt\n"+
- ". c556a293010069fa79a6790a931531d5+80 0:80:stdout.txt\n")
+ c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
}
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
- testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
+ s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
}
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleLines(c *C) {
- testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
+ s.testWriteLogsWithRateLimit(c, "crunchLogThrottleLines", 1, 1024, "Exceeded rate 1 lines per 60 seconds")
}
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytesPerEvent(c *C) {
- testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
+ s.testWriteLogsWithRateLimit(c, "crunchLimitLogBytesPerJob", 50, 67108864, "Exceeded log limit 50 bytes (crunch_limit_log_bytes_per_job)")
}
-func testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
+func (s *LoggingTestSuite) testWriteLogsWithRateLimit(c *C, throttleParam string, throttleValue int, throttleDefault int, expected string) {
discoveryMap[throttleParam] = float64(throttleValue)
defer func() {
discoveryMap[throttleParam] = float64(throttleDefault)
api := &ArvTestClient{}
kc := &KeepTestClient{}
defer kc.Close()
- cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
c.Check(api.Calls, Equals, 1)
- mt, err := cr.LogCollection.ManifestText()
+ mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
c.Check(mt, Equals, ". 74561df9ae65ee9f35d5661d42454264+83 0:83:crunch-run.txt\n")
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-// Originally based on sdk/go/crunchrunner/upload.go
-//
-// Unlike the original, which iterates over a directory tree and uploads each
-// file sequentially, this version supports opening and writing multiple files
-// in a collection simultaneously.
-//
-// Eventually this should move into the Arvados Go SDK for a more comprehensive
-// implementation of Collections.
-
-import (
- "bytes"
- "crypto/md5"
- "errors"
- "fmt"
- "io"
- "log"
- "os"
- "path/filepath"
- "strings"
- "sync"
-
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
-)
-
-// Block is a data block in a manifest stream
-type Block struct {
- data []byte
- offset int64
-}
-
-// CollectionFileWriter is a Writer that permits writing to a file in a Keep Collection.
-type CollectionFileWriter struct {
- IKeepClient
- *manifest.ManifestStream
- offset uint64
- length uint64
- *Block
- uploader chan *Block
- finish chan []error
- fn string
-}
-
-// Write to a file in a keep collection
-func (m *CollectionFileWriter) Write(p []byte) (int, error) {
- n, err := m.ReadFrom(bytes.NewReader(p))
- return int(n), err
-}
-
-// ReadFrom a Reader and write to the Keep collection file.
-func (m *CollectionFileWriter) ReadFrom(r io.Reader) (n int64, err error) {
- var total int64
- var count int
-
- for err == nil {
- if m.Block == nil {
- m.Block = &Block{make([]byte, keepclient.BLOCKSIZE), 0}
- }
- count, err = r.Read(m.Block.data[m.Block.offset:])
- total += int64(count)
- m.Block.offset += int64(count)
- if m.Block.offset == keepclient.BLOCKSIZE {
- m.uploader <- m.Block
- m.Block = nil
- }
- }
-
- m.length += uint64(total)
-
- if err == io.EOF {
- return total, nil
- }
- return total, err
-}
-
-// Close stops writing a file and adds it to the parent manifest.
-func (m *CollectionFileWriter) Close() error {
- m.ManifestStream.FileStreamSegments = append(m.ManifestStream.FileStreamSegments,
- manifest.FileStreamSegment{m.offset, m.length, m.fn})
- return nil
-}
-
-func (m *CollectionFileWriter) NewFile(fn string) {
- m.offset += m.length
- m.length = 0
- m.fn = fn
-}
-
-func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
- var mtx sync.Mutex
- var wg sync.WaitGroup
-
- var errors []error
- uploader := m.uploader
- finish := m.finish
- for block := range uploader {
- mtx.Lock()
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
- blockIndex := len(m.ManifestStream.Blocks) - 1
- mtx.Unlock()
-
- workers <- struct{}{} // wait for an available worker slot
- wg.Add(1)
-
- go func(block *Block, blockIndex int) {
- hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, err := m.IKeepClient.PutHB(hash, block.data[0:block.offset])
- <-workers
-
- mtx.Lock()
- if err != nil {
- errors = append(errors, err)
- } else {
- m.ManifestStream.Blocks[blockIndex] = signedHash
- }
- mtx.Unlock()
-
- wg.Done()
- }(block, blockIndex)
- }
- wg.Wait()
-
- finish <- errors
-}
-
-// CollectionWriter implements creating new Keep collections by opening files
-// and writing to them.
-type CollectionWriter struct {
- MaxWriters int
- IKeepClient
- Streams []*CollectionFileWriter
- workers chan struct{}
- mtx sync.Mutex
-}
-
-// Open a new file for writing in the Keep collection.
-func (m *CollectionWriter) Open(path string) io.WriteCloser {
- var dir string
- var fn string
-
- i := strings.Index(path, "/")
- if i > -1 {
- dir = "./" + path[0:i]
- fn = path[i+1:]
- } else {
- dir = "."
- fn = path
- }
-
- fw := &CollectionFileWriter{
- m.IKeepClient,
- &manifest.ManifestStream{StreamName: dir},
- 0,
- 0,
- nil,
- make(chan *Block),
- make(chan []error),
- fn}
-
- m.mtx.Lock()
- defer m.mtx.Unlock()
- if m.workers == nil {
- if m.MaxWriters < 1 {
- m.MaxWriters = 2
- }
- m.workers = make(chan struct{}, m.MaxWriters)
- }
-
- go fw.goUpload(m.workers)
-
- m.Streams = append(m.Streams, fw)
-
- return fw
-}
-
-// Finish writing the collection, wait for all blocks to complete uploading.
-func (m *CollectionWriter) Finish() error {
- var errstring string
- m.mtx.Lock()
- defer m.mtx.Unlock()
-
- for _, stream := range m.Streams {
- if stream.uploader == nil {
- continue
- }
- if stream.Block != nil {
- stream.uploader <- stream.Block
- stream.Block = nil
- }
- close(stream.uploader)
- stream.uploader = nil
-
- errors := <-stream.finish
- close(stream.finish)
- stream.finish = nil
-
- for _, r := range errors {
- errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
- }
- }
- if errstring != "" {
- return errors.New(errstring)
- }
- return nil
-}
-
-// ManifestText returns the manifest text of the collection. Calls Finish()
-// first to ensure that all blocks are written and that signed locators and
-// available.
-func (m *CollectionWriter) ManifestText() (mt string, err error) {
- err = m.Finish()
- if err != nil {
- return "", err
- }
-
- var buf bytes.Buffer
-
- m.mtx.Lock()
- defer m.mtx.Unlock()
- for _, v := range m.Streams {
- if len(v.FileStreamSegments) == 0 {
- continue
- }
- k := v.StreamName
- if k == "." {
- buf.WriteString(".")
- } else {
- k = strings.Replace(k, " ", "\\040", -1)
- k = strings.Replace(k, "\n", "", -1)
- buf.WriteString("./" + k)
- }
- if len(v.Blocks) > 0 {
- for _, b := range v.Blocks {
- buf.WriteString(" ")
- buf.WriteString(b)
- }
- } else {
- buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
- }
- for _, f := range v.FileStreamSegments {
- buf.WriteString(" ")
- name := strings.Replace(f.Name, " ", "\\040", -1)
- name = strings.Replace(name, "\n", "", -1)
- buf.WriteString(fmt.Sprintf("%v:%v:%v", f.SegPos, f.SegLen, name))
- }
- buf.WriteString("\n")
- }
- return buf.String(), nil
-}
-
-type WalkUpload struct {
- MaxWriters int
- kc IKeepClient
- stripPrefix string
- streamMap map[string]*CollectionFileWriter
- status *log.Logger
- workers chan struct{}
- mtx sync.Mutex
-}
-
-func (m *WalkUpload) UploadFile(path string, sourcePath string) error {
- var dir string
- basename := filepath.Base(path)
- if len(path) > (len(m.stripPrefix) + len(basename) + 1) {
- dir = path[len(m.stripPrefix)+1 : (len(path) - len(basename) - 1)]
- }
- if dir == "" {
- dir = "."
- }
-
- fn := path[(len(path) - len(basename)):]
-
- info, err := os.Stat(sourcePath)
- if err != nil {
- return err
- }
- file, err := os.Open(sourcePath)
- if err != nil {
- return err
- }
- defer file.Close()
-
- if m.streamMap[dir] == nil {
- m.streamMap[dir] = &CollectionFileWriter{
- m.kc,
- &manifest.ManifestStream{StreamName: dir},
- 0,
- 0,
- nil,
- make(chan *Block),
- make(chan []error),
- ""}
-
- m.mtx.Lock()
- if m.workers == nil {
- if m.MaxWriters < 1 {
- m.MaxWriters = 2
- }
- m.workers = make(chan struct{}, m.MaxWriters)
- }
- m.mtx.Unlock()
-
- go m.streamMap[dir].goUpload(m.workers)
- }
-
- fileWriter := m.streamMap[dir]
-
- // Reset the CollectionFileWriter for a new file
- fileWriter.NewFile(fn)
-
- m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
-
- _, err = io.Copy(fileWriter, file)
- if err != nil {
- m.status.Printf("Uh oh")
- return err
- }
-
- // Commits the current file. Legal to call this repeatedly.
- fileWriter.Close()
-
- return nil
-}
-
-func (cw *CollectionWriter) BeginUpload(root string, status *log.Logger) *WalkUpload {
- streamMap := make(map[string]*CollectionFileWriter)
- return &WalkUpload{0, cw.IKeepClient, root, streamMap, status, nil, sync.Mutex{}}
-}
-
-func (cw *CollectionWriter) EndUpload(wu *WalkUpload) {
- cw.mtx.Lock()
- for _, st := range wu.streamMap {
- cw.Streams = append(cw.Streams, st)
- }
- cw.mtx.Unlock()
-}
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "io/ioutil"
- "log"
- "os"
- "path/filepath"
- "sync"
- "syscall"
-
- . "gopkg.in/check.v1"
-)
-
-type UploadTestSuite struct{}
-
-// Gocheck boilerplate
-var _ = Suite(&UploadTestSuite{})
-
-func writeTree(cw *CollectionWriter, root string, status *log.Logger) (mt string, err error) {
- walkUpload := cw.BeginUpload(root, status)
-
- err = filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
- info, _ = os.Stat(path)
- if info.Mode().IsRegular() {
- return walkUpload.UploadFile(path, path)
- }
- return nil
- })
-
- cw.EndUpload(walkUpload)
- if err != nil {
- return "", err
- }
- mt, err = cw.ManifestText()
- return
-}
-
-func (s *TestSuite) TestSimpleUpload(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
- c.Check(err, IsNil)
- c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
-}
-
-func (s *TestSuite) TestUploadThreeFiles(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- for _, err := range []error{
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600),
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600),
- os.Symlink("./file2.txt", tmpdir+"/file3.txt"),
- syscall.Mkfifo(tmpdir+"/ignore.fifo", 0600),
- } {
- c.Assert(err, IsNil)
- }
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, ". aa65a413921163458c52fea478d5d3ee+9 0:3:file1.txt 3:3:file2.txt 6:3:file3.txt\n")
-}
-
-func (s *TestSuite) TestSimpleUploadSubdir(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- os.Mkdir(tmpdir+"/subdir", 0700)
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
- ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
-
- // streams can get added in either order because of scheduling
- // of goroutines.
- if str != `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-` && str != `./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
-. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-` {
- c.Error("Did not get expected manifest text")
- }
-}
-
-func (s *TestSuite) TestSimpleUploadLarge(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- file, _ := os.Create(tmpdir + "/" + "file1.txt")
- data := make([]byte, 1024*1024-1)
- for i := range data {
- data[i] = byte(i % 10)
- }
- for i := 0; i < 65; i++ {
- file.Write(data)
- }
- file.Close()
-
- ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
-}
-
-func (s *TestSuite) TestUploadEmptySubdir(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- os.Mkdir(tmpdir+"/subdir", 0700)
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
-`)
-}
-
-func (s *TestSuite) TestUploadEmptyFile(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
-
- kc := &KeepTestClient{}
- defer kc.Close()
- cw := CollectionWriter{0, kc, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, IsNil)
- c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
-`)
-}
-
-func (s *TestSuite) TestUploadError(c *C) {
- tmpdir, _ := ioutil.TempDir("", "")
- defer func() {
- os.RemoveAll(tmpdir)
- }()
-
- ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
-
- cw := CollectionWriter{0, &KeepErrorTestClient{}, nil, nil, sync.Mutex{}}
- str, err := writeTree(&cw, tmpdir, log.New(os.Stdout, "", 0))
-
- c.Check(err, NotNil)
- c.Check(str, Equals, "")
-}
include agpl-3.0.txt
include arvados-docker-cleaner.service
+include arvados_version.py
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():
+    """Return the tag printed by ``git describe --abbrev=0`` as text.
+
+    A non-zero exit from git raises subprocess.CalledProcessError.
+    """
+    raw_tag = subprocess.check_output(['git', 'describe', '--abbrev=0'])
+    stripped_tag = raw_tag.strip()
+    return str(stripped_tag.decode('utf-8'))
+
+def git_timestamp_tag():
+    """Return a '.YYYYmmddHHMMSS' suffix for the newest commit touching '.'.
+
+    The commit is the first one listed by ``git log --first-parent`` for
+    the current directory; its %ct timestamp is rendered in UTC.
+    """
+    git_command = [
+        'git', 'log', '--first-parent', '--max-count=1',
+        '--format=format:%ct', '.',
+    ]
+    commit_epoch = subprocess.check_output(git_command).strip()
+    commit_time = time.gmtime(int(commit_epoch))
+    return str(time.strftime('.%Y%m%d%H%M%S', commit_time))
+
+def save_version(setup_dir, module, v):
+    """Write ``__version__ = '<v>'`` to <setup_dir>/<module>/_version.py.
+
+    Any existing file is overwritten. Returns whatever the file object's
+    write() call returns.
+    """
+    version_path = os.path.join(setup_dir, module, "_version.py")
+    version_line = "__version__ = '%s'\n" % v
+    with open(version_path, 'w') as fp:
+        return fp.write(version_line)
+
+def read_version(setup_dir, module):
+    """Return the version recorded in <setup_dir>/<module>/_version.py.
+
+    The file must start with a ``__version__ = '...'`` line, as written
+    by save_version().
+
+    Raises ValueError if the file does not have that form, instead of
+    failing with an AttributeError on a None match object.
+    """
+    version_path = os.path.join(setup_dir, module, "_version.py")
+    with open(version_path, 'r') as fp:
+        match = re.match("__version__ = '(.*)'$", fp.read())
+    if match is None:
+        raise ValueError(
+            "no __version__ assignment found in %s" % version_path)
+    return match.group(1)
+
+def get_version(setup_dir, module):
+    """Work out the package version, record it in _version.py, and return it.
+
+    A non-empty ARVADOS_BUILDING_VERSION environment variable takes
+    precedence. Otherwise the version is the latest git tag followed by
+    the commit-timestamp suffix. If git cannot be run (the command fails,
+    or the executable is missing), nothing is written and the value
+    already stored in _version.py is returned.
+    """
+    version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+    if not version:
+        try:
+            version = git_latest_tag() + git_timestamp_tag()
+        except (subprocess.CalledProcessError, OSError):
+            # Deliberate best effort: check_output raises
+            # CalledProcessError when git exits non-zero and OSError when
+            # the git executable cannot be started. In both cases fall
+            # back to the previously saved _version.py.
+            version = None
+
+    if version:
+        save_version(setup_dir, module, version)
+
+    return read_version(setup_dir, module)
#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
import os
import sys
-import setuptools.command.egg_info as egg_info_cmd
+import re
from setuptools import setup, find_packages
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "0.1"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+SETUP_DIR = os.path.dirname(__file__) or '.'
+README = os.path.join(SETUP_DIR, 'README.rst')
+
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "arvados_docker")
+
+short_tests_only = False
+if '--short-tests-only' in sys.argv:
+ short_tests_only = True
+ sys.argv.remove('--short-tests-only')
setup(name="arvados-docker-cleaner",
version=version,
'mock',
],
test_suite='tests',
- zip_safe=False,
- cmdclass={'egg_info': tagger},
+ zip_safe=False
)
include agpl-3.0.txt
include README.rst
+include arvados_version.py
\ No newline at end of file
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-import pkg_resources
-
-__version__ = pkg_resources.require('arvados_fuse')[0].version
README_TEXT = """
This directory provides access to Arvados collections as subdirectories listed
by uuid (in the form 'zzzzz-4zz18-1234567890abcde') or portable data hash (in
-the form '1234567890abcdef0123456789abcdef+123').
+the form '1234567890abcdef0123456789abcdef+123'), and Arvados projects by uuid
+(in the form 'zzzzz-j7d0g-1234567890abcde').
Note that this directory will appear empty until you attempt to access a
-specific collection subdirectory (such as trying to 'cd' into it), at which
-point the collection will actually be looked up on the server and the directory
-will appear if it exists.
+specific collection or project subdirectory (such as trying to 'cd' into it),
+at which point the collection or project will actually be looked up on the server
+and the directory will appear if it exists.
""".lstrip()
try:
e = None
- e = self.inodes.add_entry(CollectionDirectory(
- self.inode, self.inodes, self.api, self.num_retries, k))
+
+ if group_uuid_pattern.match(k):
+ project = self.api.groups().list(
+ filters=[['group_class', '=', 'project'], ["uuid", "=", k]]).execute(num_retries=self.num_retries)
+ if project[u'items_available'] == 0:
+ return False
+ e = self.inodes.add_entry(ProjectDirectory(
+ self.inode, self.inodes, self.api, self.num_retries, project[u'items'][0]))
+ else:
+ e = self.inodes.add_entry(CollectionDirectory(
+ self.inode, self.inodes, self.api, self.num_retries, k))
if e.update():
if k not in self._entries:
self.inodes.add_entry(self.project_object_file)
if not self._full_listing:
- return
+ return True
def samefn(a, i):
if isinstance(a, CollectionDirectory) or isinstance(a, ProjectDirectory):
self.namefn,
samefn,
self.createDirectory)
+ return True
finally:
self._updating_lock.release()
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'describe', '--abbrev=0']).strip()
+ return str(gitinfo.decode('utf-8'))
+
+def git_timestamp_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', '.']).strip()
+ return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo))))
+
+def save_version(setup_dir, module, v):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'w') as fp:
+ return fp.write("__version__ = '%s'\n" % v)
+
+def read_version(setup_dir, module):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'r') as fp:
+ return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
+
+def get_version(setup_dir, module):
+ env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+ if env_version:
+ save_version(setup_dir, module, env_version)
+ else:
+ try:
+ save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
+ except subprocess.CalledProcessError:
+ pass
+
+ return read_version(setup_dir, module)
#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
import os
import sys
-import setuptools.command.egg_info as egg_info_cmd
+import re
from setuptools import setup, find_packages
SETUP_DIR = os.path.dirname(__file__) or '.'
README = os.path.join(SETUP_DIR, 'README.rst')
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "0.1"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "arvados_fuse")
short_tests_only = False
if '--short-tests-only' in sys.argv:
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0', 'PyYAML'],
- zip_safe=False,
- cmdclass={'egg_info': tagger},
+ zip_safe=False
)
def setUp(self, api=None):
super(FuseMagicTest, self).setUp(api=api)
+ self.test_project = run_test_server.fixture('groups')['aproject']['uuid']
+ self.non_project_group = run_test_server.fixture('groups')['public']['uuid']
+ self.collection_in_test_project = run_test_server.fixture('collections')['foo_collection_in_aproject']['name']
+
cw = arvados.CollectionWriter()
cw.start_new_file('thing1.txt')
self.assertFalse(any(arvados.util.keep_locator_pattern.match(fn) or
arvados.util.uuid_pattern.match(fn)
for fn in mount_ls),
- "new FUSE MagicDirectory lists Collection")
+ "new FUSE MagicDirectory has no collections or projects")
self.assertDirContents(self.testcollection, ['thing1.txt'])
self.assertDirContents(os.path.join('by_id', self.testcollection),
['thing1.txt'])
+ self.assertIn(self.collection_in_test_project,
+ llfuse.listdir(os.path.join(self.mounttmp, self.test_project)))
+ self.assertIn(self.collection_in_test_project,
+ llfuse.listdir(os.path.join(self.mounttmp, 'by_id', self.test_project)))
+
mount_ls = llfuse.listdir(self.mounttmp)
self.assertIn('README', mount_ls)
self.assertIn(self.testcollection, mount_ls)
self.assertIn(self.testcollection,
llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+ self.assertIn(self.test_project, mount_ls)
+ self.assertIn(self.test_project,
+ llfuse.listdir(os.path.join(self.mounttmp, 'by_id')))
+
+ with self.assertRaises(OSError):
+ llfuse.listdir(os.path.join(self.mounttmp, 'by_id', self.non_project_group))
files = {}
files[os.path.join(self.mounttmp, self.testcollection, 'thing1.txt')] = 'data 1'
// Discard (but measure size of) anything past 128 MiB.
var discarded int64
if err == io.ErrUnexpectedEOF {
- err = nil
buf = buf[:n]
} else {
c.Assert(err, check.Equals, nil)
}
func (fs *webdavFS) makeparents(name string) {
- dir, name := path.Split(name)
+ dir, _ := path.Split(name)
if dir == "" || dir == "/" {
return
}
// Now try to put the block through
if locatorIn == "" {
- if bytes, err := ioutil.ReadAll(req.Body); err != nil {
- err = errors.New(fmt.Sprintf("Error reading request body: %s", err))
+ bytes, err2 := ioutil.ReadAll(req.Body)
+ if err2 != nil {
+ _ = errors.New(fmt.Sprintf("Error reading request body: %s", err2))
status = http.StatusInternalServerError
return
- } else {
- locatorOut, wroteReplicas, err = kc.PutB(bytes)
}
+ locatorOut, wroteReplicas, err = kc.PutB(bytes)
} else {
locatorOut, wroteReplicas, err = kc.PutHR(locatorIn, req.Body, expectLength)
}
req, err := http.NewRequest("POST",
"http://"+listener.Addr().String()+"/",
strings.NewReader("TestViaHeader"))
+ c.Assert(err, Equals, nil)
req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
resp, err := (&http.Client{}).Do(req)
c.Assert(err, Equals, nil)
reader, blocklen, _, err := kc.Get(hash2)
c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
+ c.Check(err, IsNil)
c.Check(all, DeepEquals, []byte("foo"))
c.Check(blocklen, Equals, int64(3))
c.Log("Finished Get (expected success)")
reader, blocklen, _, err := kc.Get("d41d8cd98f00b204e9800998ecf8427e")
c.Assert(err, Equals, nil)
all, err := ioutil.ReadAll(reader)
+ c.Check(err, IsNil)
c.Check(all, DeepEquals, []byte(""))
c.Check(blocklen, Equals, int64(0))
c.Log("Finished Get zero block")
req, err := http.NewRequest("OPTIONS",
fmt.Sprintf("http://%s/%x+3", listener.Addr().String(), md5.Sum([]byte("foo"))),
nil)
+ c.Assert(err, IsNil)
req.Header.Add("Access-Control-Request-Method", "PUT")
req.Header.Add("Access-Control-Request-Headers", "Authorization, X-Keep-Desired-Replicas")
resp, err := client.Do(req)
c.Check(err, Equals, nil)
c.Check(resp.StatusCode, Equals, 200)
body, err := ioutil.ReadAll(resp.Body)
+ c.Check(err, IsNil)
c.Check(string(body), Equals, "")
c.Check(resp.Header.Get("Access-Control-Allow-Methods"), Equals, "GET, HEAD, POST, PUT, OPTIONS")
c.Check(resp.Header.Get("Access-Control-Allow-Origin"), Equals, "*")
req, err := http.NewRequest("POST",
"http://"+listener.Addr().String()+"/",
strings.NewReader("qux"))
+ c.Check(err, IsNil)
req.Header.Add("Authorization", "OAuth2 "+arvadostest.ActiveToken)
req.Header.Add("Content-Type", "application/octet-stream")
resp, err := client.Do(req)
c.Check(err, Equals, nil)
reader, blocklen, _, err := kc.Get(hash)
- c.Assert(err, Equals, nil)
+ c.Assert(err, IsNil)
c.Check(blocklen, Equals, int64(10))
all, err := ioutil.ReadAll(reader)
+ c.Assert(err, IsNil)
c.Check(all, DeepEquals, data)
// Put some more blocks
- _, rep, err = kc.PutB([]byte("some-more-index-data"))
- c.Check(err, Equals, nil)
+ _, _, err = kc.PutB([]byte("some-more-index-data"))
+ c.Check(err, IsNil)
kc.Arvados.ApiToken = arvadostest.DataManagerToken
}
// Call Trash, then check canTrash and canGetAfterTrash
- loc, blk = setupScenario()
+ loc, _ = setupScenario()
err = v.Trash(loc)
c.Check(err == nil, check.Equals, scenario.canTrash)
_, err = v.Get(context.Background(), loc, buf)
}
// Call Untrash, then check canUntrash
- loc, blk = setupScenario()
+ loc, _ = setupScenario()
err = v.Untrash(loc)
c.Check(err == nil, check.Equals, scenario.canUntrash)
if scenario.dataT != none || scenario.trashT != none {
// Call EmptyTrash, then check haveTrashAfterEmpty and
// freshAfterEmpty
- loc, blk = setupScenario()
+ loc, _ = setupScenario()
v.EmptyTrash()
_, err = v.bucket.Head("trash/"+loc, nil)
c.Check(err == nil, check.Equals, scenario.haveTrashAfterEmpty)
// goes away.
// (In Azure volumes, un/trash changes Mtime, so first backdate again)
v.TouchWithDate(TestHash, time.Now().Add(-2*theConfig.BlobSignatureTTL.Duration()))
- err = v.Trash(TestHash)
+ _ = v.Trash(TestHash)
err = checkGet()
if err == nil || !os.IsNotExist(err) {
t.Fatalf("os.IsNotExist(%v) should have been true", err)
"+", fileInfo[0].Size(),
" ", fileInfo[0].ModTime().UnixNano(),
"\n")
+ if err != nil {
+ log.Print("Error writing : ", err)
+ lastErr = err
+ break
+ }
}
blockdir.Close()
}
exit
end
+git_latest_tag = `git describe --abbrev=0`
+git_latest_tag = git_latest_tag.encode('utf-8').strip
git_timestamp, git_hash = `git log -n1 --first-parent --format=%ct:%H .`.chomp.split(":")
git_timestamp = Time.at(git_timestamp.to_i).utc
Gem::Specification.new do |s|
s.name = 'arvados-login-sync'
- s.version = "0.1.#{git_timestamp.strftime('%Y%m%d%H%M%S')}"
+ s.version = "#{git_latest_tag}.#{git_timestamp.strftime('%Y%m%d%H%M%S')}"
s.date = git_timestamp.strftime("%Y-%m-%d")
s.summary = "Set up local login accounts for Arvados users"
s.description = "Creates and updates local login accounts for Arvados users. Built from git commit #{git_hash}"
include agpl-3.0.txt
include README.rst
+include arvados_version.py
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'describe', '--abbrev=0']).strip()
+ return str(gitinfo.decode('utf-8'))
+
+def git_timestamp_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', '.']).strip()
+ return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo))))
+
+def save_version(setup_dir, module, v):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'w') as fp:
+ return fp.write("__version__ = '%s'\n" % v)
+
+def read_version(setup_dir, module):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'r') as fp:
+ return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
+
+def get_version(setup_dir, module):
+ env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+ if env_version:
+ save_version(setup_dir, module, env_version)
+ else:
+ try:
+ save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
+ except subprocess.CalledProcessError:
+ pass
+
+ return read_version(setup_dir, module)
+++ /dev/null
-# Copyright (C) The Arvados Authors. All rights reserved.
-#
-# SPDX-License-Identifier: AGPL-3.0
-
-import pkg_resources
-
-__version__ = pkg_resources.require('arvados-node-manager')[0].version
import pykka
+from .status import tracker
+
class _TellCallableProxy(object):
"""Internal helper class for proxying callables."""
exception_type is OSError and exception_value.errno == errno.ENOMEM):
lg.critical("Unhandled exception is a fatal error, killing Node Manager")
self._killfunc(os.getpid(), signal.SIGKILL)
+ tracker.counter_add('actor_exceptions')
def ping(self):
return True
import time
from ..config import CLOUD_ERRORS
+from ..status import tracker
from libcloud.common.exceptions import BaseHTTPError, RateLimitReachedError
ARVADOS_TIMEFMT = '%Y-%m-%dT%H:%M:%SZ'
if error.code == 429 or error.code >= 500:
should_retry = True
except CLOUD_ERRORS as error:
+ tracker.counter_add('cloud_errors')
should_retry = True
except errors as error:
should_retry = True
# As a libcloud workaround for drivers that don't use
# typed exceptions, consider bare Exception() objects
# retryable.
- should_retry = type(error) is Exception
+ if type(error) is Exception:
+ tracker.counter_add('cloud_errors')
+ should_retry = True
else:
- # No exception,
+ # No exception
self.retry_wait = self.min_retry_wait
return ret
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
+from ... import status
from .transitions import transitions
QuotaExceeded = "QuotaExceeded"
self.cancel_shutdown("No longer eligible for shut down because %s" % reason,
try_resume=True)
return
+ # If boot failed, count the event
+ if self._monitor.get_state().get() == 'unpaired':
+ status.tracker.counter_add('boot_failures')
self._destroy_node()
def _destroy_node(self):
#if state == 'idle' and self.arvados_node['job_uuid']:
# state = 'busy'
+ # Update idle node times tracker
+ if state == 'idle':
+ status.tracker.idle_in(self.arvados_node['hostname'])
+ else:
+ status.tracker.idle_out(self.arvados_node['hostname'])
+
return state
def in_state(self, *states):
from libcloud.compute.base import NodeDriver, NodeAuthSSHKey
from ...config import CLOUD_ERRORS
+from ...status import tracker
from .. import RetryMixin
class BaseComputeNodeDriver(RetryMixin):
def list_nodes(self, **kwargs):
l = self.list_kwargs.copy()
l.update(kwargs)
- return self.real.list_nodes(**l)
+ try:
+ return self.real.list_nodes(**l)
+ except CLOUD_ERRORS:
+ tracker.counter_add('list_nodes_errors')
+ raise
def create_cloud_name(self, arvados_node):
"""Return a cloud node name for the given Arvados node record.
try:
return self.search_for_now(kwargs['name'], 'list_nodes', self._name_key)
except ValueError:
+ tracker.counter_add('create_node_errors')
raise create_error
def post_create_node(self, cloud_node):
def destroy_node(self, cloud_node):
try:
return self.real.destroy_node(cloud_node)
- except CLOUD_ERRORS as destroy_error:
+ except CLOUD_ERRORS:
# Sometimes the destroy node request succeeds but times out and
# raises an exception instead of returning success. If this
# happens, we get a noisy stack trace. Check if the node is still
# it, which means destroy_node actually succeeded.
return True
# The node is still on the list. Re-raise.
+ tracker.counter_add('destroy_node_errors')
raise
# Now that we've defined all our own methods, delegate generic, public
if hasattr(record.cloud_node, "_nodemanager_recently_booted"):
self.cloud_nodes.add(record)
else:
- # Node disappeared from the cloud node list. Stop the monitor
- # actor if necessary and forget about the node.
+ # Node disappeared from the cloud node list. If it's paired,
+ # remove its idle time counter.
+ if record.arvados_node:
+ status.tracker.idle_out(record.arvados_node.get('hostname'))
+ # Stop the monitor actor if necessary and forget about the node.
if record.actor:
try:
record.actor.stop()
updates.setdefault('nodes_'+s, 0)
updates['nodes_'+s] += 1
updates['nodes_wish'] = len(self.last_wishlist)
+ updates['node_quota'] = self.node_quota
status.tracker.update(updates)
def _state_counts(self, size):
def update_server_wishlist(self, wishlist):
self._update_poll_time('server_wishlist')
- self.last_wishlist = wishlist
+ requestable_nodes = self.node_quota - (self._nodes_booting(None) + len(self.cloud_nodes))
+ self.last_wishlist = wishlist[:requestable_nodes]
for size in reversed(self.server_calculator.cloud_sizes):
try:
nodes_wanted = self._nodes_wanted(size)
self._later.start_node(size)
elif (nodes_wanted < 0) and self.booting:
self._later.stop_booting_node(size)
- except Exception as e:
+ except Exception:
self._logger.exception("while calculating nodes wanted for size %s", getattr(size, "id", "(id not available)"))
try:
self._update_tracker()
def _send_request(self):
queuelist = []
if self.slurm_queue:
- # cpus, memory, tempory disk space, reason, job name, feature constraints
- squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f"])
+ # cpus, memory, tempory disk space, reason, job name, feature constraints, priority
+ squeue_out = subprocess.check_output(["squeue", "--state=PENDING", "--noheader", "--format=%c|%m|%d|%r|%j|%f|%Q"])
for out in squeue_out.splitlines():
try:
- cpu, ram, disk, reason, jobname, features = out.split("|", 5)
+ cpu, ram, disk, reason, jobname, features, priority = out.split("|", 6)
except ValueError:
self._logger.warning("ignored malformed line in squeue output: %r", out)
continue
"uuid": jobname,
"runtime_constraints": {
"instance_type": instance_type,
- }
+ },
+ "priority": int(priority)
})
break
else:
"min_cores_per_node": cpu,
"min_ram_mb_per_node": self.coerce_to_mb(ram),
"min_scratch_mb_per_node": self.coerce_to_mb(disk)
- }
+ },
+ "priority": int(priority)
})
+ queuelist.sort(key=lambda x: x.get('priority', 1), reverse=True)
if self.jobs_queue:
queuelist.extend(self._client.jobs().queue().execute()['items'])
from future import standard_library
import http.server
+import time
import json
import logging
import socketserver
return
self._config = config
self._tracker = tracker
+ self._tracker.update({'config_max_nodes': config.getint('Daemon', 'max_nodes')})
super(Server, self).__init__(
(config.get('Manage', 'address'), port), Handler)
self._thread = threading.Thread(target=self.serve_forever)
class Tracker(object):
def __init__(self):
self._mtx = threading.Lock()
- self._latest = {}
+ self._latest = {
+ 'list_nodes_errors': 0,
+ 'create_node_errors': 0,
+ 'destroy_node_errors': 0,
+ 'boot_failures': 0,
+ 'actor_exceptions': 0
+ }
self._version = {'Version' : __version__}
+ self._idle_nodes = {}
def get_json(self):
with self._mtx:
- return json.dumps(dict(self._latest, **self._version))
+ times = {'idle_times' : {}}
+ now = time.time()
+ for node, ts in self._idle_nodes.items():
+ times['idle_times'][node] = int(now - ts)
+ return json.dumps(
+ dict(dict(self._latest, **self._version), **times))
def keys(self):
with self._mtx:
return self._latest.keys()
+ def get(self, key):
+ with self._mtx:
+ return self._latest.get(key)
+
def update(self, updates):
with self._mtx:
self._latest.update(updates)
+ def counter_add(self, counter, value=1):
+ with self._mtx:
+ self._latest.setdefault(counter, 0)
+ self._latest[counter] += value
+
+ def idle_in(self, nodename):
+ with self._mtx:
+ if self._idle_nodes.get(nodename):
+ return
+ self._idle_nodes[nodename] = time.time()
+
+ def idle_out(self, nodename):
+ with self._mtx:
+ try:
+ del self._idle_nodes[nodename]
+ except KeyError:
+ pass
tracker = Tracker()
#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
import os
import sys
-import setuptools.command.egg_info as egg_info_cmd
+import re
from setuptools import setup, find_packages
-SETUP_DIR = os.path.dirname(__file__) or "."
+SETUP_DIR = os.path.dirname(__file__) or '.'
README = os.path.join(SETUP_DIR, 'README.rst')
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "0.1"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "arvnodeman")
+
+short_tests_only = False
+if '--short-tests-only' in sys.argv:
+ short_tests_only = True
+ sys.argv.remove('--short-tests-only')
setup(name='arvados-node-manager',
version=version,
'mock>=1.0',
'apache-libcloud>=2.3',
],
- zip_safe=False,
- cmdclass={'egg_info': tagger},
+ zip_safe=False
)
def set_squeue(g):
global all_jobs
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '1|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '1|100|100|%s|%s|(null)|1234567890'" % (v, k) for k,v in all_jobs.items()))
return 0
def set_queue_unsatisfiable(g):
global all_jobs, unsatisfiable_job_scancelled
# Simulate a job requesting a 99 core node.
update_script(os.path.join(fake_slurm, "squeue"), "#!/bin/sh\n" +
- "\n".join("echo '99|100|100|%s|%s|(null)'" % (v, k) for k,v in all_jobs.items()))
+ "\n".join("echo '99|100|100|%s|%s|(null)|1234567890'" % (v, k) for k,v in all_jobs.items()))
update_script(os.path.join(fake_slurm, "scancel"), "#!/bin/sh\n" +
"\ntouch %s" % unsatisfiable_job_scancelled)
return 0
# Provider
"azure"),
"test_single_node_azure": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
"azure"),
"test_multiple_nodes": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_hit_quota": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
r".*Sending create_node request.*": partial(expect_count, 5)
},
+ # Driver class
"arvnodeman.test.fake_driver.QuotaDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_probe_quota": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*sending request", jobs_req),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
r".*Sending create_node request.*": partial(expect_count, 9)
},
+ # Driver class
"arvnodeman.test.fake_driver.QuotaDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_no_hang_failing_node_create": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Client error: nope", noop),
(r".*Client error: nope", noop),
(r".*Client error: nope", noop),
],
+ # Checks (things that shouldn't happen)
{},
+ # Driver class
"arvnodeman.test.fake_driver.FailingDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf2": "ReqNodeNotAvail",
"34t0i-dz642-h42bg3hq4bdfpf3": "ReqNodeNotAvail",
- "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"
- }, "azure"),
+ "34t0i-dz642-h42bg3hq4bdfpf4": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_retry_create": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
(r".*Rate limit exceeded - scheduling retry in 2 seconds", noop),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
],
+ # Checks (things that shouldn't happen)
{},
+ # Driver class
"arvnodeman.test.fake_driver.RetryDriver",
- {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"
- }, "azure"),
+ # Jobs
+ {"34t0i-dz642-h42bg3hq4bdfpf1": "ReqNodeNotAvail"},
+ # Provider
+ "azure"),
"test_single_node_aws": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeAwsDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
"ec2"),
"test_single_node_gce": (
+ # Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
- ], {
+ ],
+ # Checks (things that shouldn't happen)
+ {
r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
+ # Driver class
"arvnodeman.test.fake_driver.FakeGceDriver",
+ # Jobs
{"34t0i-dz642-h42bg3hq4bdfpf9": "ReqNodeNotAvail"},
+ # Provider
"gce")
}
from libcloud.common.exceptions import BaseHTTPError
import arvnodeman.computenode.dispatch as dispatch
+import arvnodeman.status as status
from arvnodeman.computenode.driver import BaseComputeNodeDriver
from . import testutil
def check_success_flag(self, expected, allow_msg_count=1):
# allow_msg_count is the number of internal messages that may
# need to be handled for shutdown to finish.
- for try_num in range(1 + allow_msg_count):
+ for _ in range(1 + allow_msg_count):
last_flag = self.shutdown_actor.success.get(self.TIMEOUT)
if last_flag is expected:
break
else:
self.fail("success flag {} is not {}".format(last_flag, expected))
+ def test_boot_failure_counting(self, *mocks):
+ # A boot failure happens when a node transitions from unpaired to shutdown
+ status.tracker.update({'boot_failures': 0})
+ self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="unpaired"))
+ self.cloud_client.destroy_node.return_value = True
+ self.make_actor(cancellable=False)
+ self.check_success_flag(True, 2)
+ self.assertTrue(self.cloud_client.destroy_node.called)
+ self.assertEqual(1, status.tracker.get('boot_failures'))
+
def test_cancellable_shutdown(self, *mocks):
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.assertFalse(self.cloud_client.destroy_node.called)
def test_uncancellable_shutdown(self, *mocks):
+ status.tracker.update({'boot_failures': 0})
self.make_mocks(shutdown_open=True, arvados_node=testutil.arvados_node_mock(crunch_worker_state="busy"))
self.cloud_client.destroy_node.return_value = True
self.make_actor(cancellable=False)
self.check_success_flag(True, 4)
self.assertTrue(self.cloud_client.destroy_node.called)
+ # A normal shutdown shouldn't be counted as boot failure
+ self.assertEqual(0, status.tracker.get('boot_failures'))
def test_arvados_node_cleaned_after_shutdown(self, *mocks):
if len(mocks) == 1:
self.assertTrue(self.node_state('down'))
def test_in_idle_state(self):
+ idle_nodes_before = status.tracker._idle_nodes.keys()
self.make_actor(2, arv_node=testutil.arvados_node_mock(job_uuid=None))
self.assertTrue(self.node_state('idle'))
self.assertFalse(self.node_state('busy'))
self.assertTrue(self.node_state('idle', 'busy'))
+ idle_nodes_after = status.tracker._idle_nodes.keys()
+ new_idle_nodes = [n for n in idle_nodes_after if n not in idle_nodes_before]
+ # There should be 1 additional idle node
+ self.assertEqual(1, len(new_idle_nodes))
def test_in_busy_state(self):
+ idle_nodes_before = status.tracker._idle_nodes.keys()
self.make_actor(3, arv_node=testutil.arvados_node_mock(job_uuid=True))
self.assertFalse(self.node_state('idle'))
self.assertTrue(self.node_state('busy'))
self.assertTrue(self.node_state('idle', 'busy'))
+ idle_nodes_after = status.tracker._idle_nodes.keys()
+ new_idle_nodes = [n for n in idle_nodes_after if n not in idle_nodes_before]
+ # There shouldn't be any additional idle node
+ self.assertEqual(0, len(new_idle_nodes))
def test_init_shutdown_scheduling(self):
self.make_actor()
import mock
import arvnodeman.computenode.driver as driver_base
+import arvnodeman.status as status
+import arvnodeman.config as config
from . import testutil
class ComputeNodeDriverTestCase(unittest.TestCase):
self.assertIs(driver.search_for('id_1', 'list_images'),
driver.search_for('id_1', 'list_images'))
self.assertEqual(1, self.driver_mock().list_images.call_count)
+
+
+ class TestBaseComputeNodeDriver(driver_base.BaseComputeNodeDriver):
+ def arvados_create_kwargs(self, size, arvados_node):
+ return {'name': arvados_node}
+
+
+ def test_create_node_only_cloud_errors_are_counted(self):
+ status.tracker.update({'create_node_errors': 0})
+ errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+ self.driver_mock().list_images.return_value = []
+ driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ error_count = 0
+ for an_error, is_cloud_error in errors:
+ self.driver_mock().create_node.side_effect = an_error
+ with self.assertRaises(an_error):
+ driver.create_node('1', 'id_1')
+ if is_cloud_error:
+ error_count += 1
+ self.assertEqual(error_count, status.tracker.get('create_node_errors'))
+
+ def test_list_nodes_only_cloud_errors_are_counted(self):
+ status.tracker.update({'list_nodes_errors': 0})
+ errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+ driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ error_count = 0
+ for an_error, is_cloud_error in errors:
+ self.driver_mock().list_nodes.side_effect = an_error
+ with self.assertRaises(an_error):
+ driver.list_nodes()
+ if is_cloud_error:
+ error_count += 1
+ self.assertEqual(error_count, status.tracker.get('list_nodes_errors'))
+
+ def test_destroy_node_only_cloud_errors_are_counted(self):
+ status.tracker.update({'destroy_node_errors': 0})
+ errors = [(config.CLOUD_ERRORS[0], True), (KeyError, False)]
+ self.driver_mock().list_nodes.return_value = [testutil.MockSize(1)]
+ driver = self.TestBaseComputeNodeDriver({}, {}, {}, self.driver_mock)
+ error_count = 0
+ for an_error, is_cloud_error in errors:
+ self.driver_mock().destroy_node.side_effect = an_error
+ with self.assertRaises(an_error):
+ driver.destroy_node(testutil.MockSize(1))
+ if is_cloud_error:
+ error_count += 1
+ self.assertEqual(error_count, status.tracker.get('destroy_node_errors'))
size = testutil.MockSize(1)
self.make_daemon(want_sizes=[size])
self.busywait(lambda: self.node_setup.start.called)
+ self.assertIn('node_quota', status.tracker._latest)
def check_monitors_arvados_nodes(self, *arv_nodes):
self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
self.busywait(lambda: 1 == self.last_shutdown.stop.call_count)
+ def test_idle_node_disappearing_clears_status_idle_time_counter(self):
+ size = testutil.MockSize(1)
+ status.tracker._idle_nodes = {}
+ cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
+ arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
+ self.make_daemon(cloud_nodes, arv_nodes, [size])
+ self.busywait(lambda: 1 == self.paired_monitor_count())
+ for mon_ref in self.monitor_list():
+ monitor = mon_ref.proxy()
+ if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
+ break
+ else:
+ self.fail("monitor for idle node not found")
+ self.assertEqual(1, status.tracker.get('nodes_idle'))
+ hostname = monitor.arvados_node.get()['hostname']
+ self.assertIn(hostname, status.tracker._idle_nodes)
+ # Simulate the node disappearing from the cloud node list
+ self.daemon.update_cloud_nodes([]).get(self.TIMEOUT)
+ self.busywait(lambda: 0 == self.alive_monitor_count())
+ self.assertNotIn(hostname, status.tracker._idle_nodes)
+
def test_shutdown_actor_cleanup_copes_with_dead_actors(self):
self.make_daemon(cloud_nodes=[testutil.cloud_node_mock()])
self.assertEqual(1, self.alive_monitor_count())
big = testutil.MockSize(2)
avail_sizes = [(testutil.MockSize(1), {"cores":1}),
(testutil.MockSize(2), {"cores":2})]
- self.make_daemon(want_sizes=[small, small, small, big],
+ self.make_daemon(want_sizes=[small, small, big, small],
avail_sizes=avail_sizes, max_nodes=3)
# the daemon runs in another thread, so we need to wait and see
self.assertEqual(2, sizecounts[small.id])
self.assertEqual(1, sizecounts[big.id])
+ def test_wishlist_ordering(self):
+ # Check that big nodes aren't prioritized; since #12199 containers are
+ # scheduled on specific node sizes.
+ small = testutil.MockSize(1)
+ big = testutil.MockSize(2)
+ avail_sizes = [(testutil.MockSize(1), {"cores":1}),
+ (testutil.MockSize(2), {"cores":2})]
+ self.make_daemon(want_sizes=[small, small, small, big],
+ avail_sizes=avail_sizes, max_nodes=3)
+
+ # the daemon runs in another thread, so we need to wait and see
+ # if it does all the work we're expecting it to do before stopping it.
+ self.busywait(lambda: self.node_setup.start.call_count == 3)
+ booting = self.daemon.booting.get(self.TIMEOUT)
+ self.stop_proxy(self.daemon)
+ sizecounts = {a[0].id: 0 for a in avail_sizes}
+ for b in booting.itervalues():
+ sizecounts[b.cloud_size.get().id] += 1
+ self.assertEqual(3, sizecounts[small.id])
+ self.assertEqual(0, sizecounts[big.id])
+
def test_wishlist_reconfigure(self):
small = testutil.MockSize(1)
big = testutil.MockSize(2)
from . import testutil
import arvnodeman.baseactor
+import arvnodeman.status as status
class BogusActor(arvnodeman.baseactor.BaseNodeManagerActor):
def __init__(self, e, killfunc=None):
self.assertTrue(kill_mock.called)
def test_nonfatal_error(self):
+ status.tracker.update({'actor_exceptions': 0})
kill_mock = mock.Mock('os.kill')
act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
self.assertFalse(kill_mock.called)
+ self.assertEqual(1, status.tracker.get('actor_exceptions'))
class WatchdogActorTest(testutil.ActorTestMixin, unittest.TestCase):
def test_unsatisfiable_jobs(self, mock_squeue, mock_scancel):
job_uuid = 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'
container_uuid = 'yyyyy-dz642-yyyyyyyyyyyyyyy'
- mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "|\n"
+ mock_squeue.return_value = "1|1024|0|(Resources)|" + container_uuid + "||1234567890\n"
self.build_monitor([{'items': [{'uuid': job_uuid}]}],
self.MockCalculatorUnsatisfiableJobs(), True, True)
@mock.patch("subprocess.check_output")
def test_squeue_server_list(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
-2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)|1234567890
+2|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)|1234567890
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@mock.patch("subprocess.check_output")
def test_squeue_server_list_suffix(self, mock_squeue):
- mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)
-1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)
+ mock_squeue.return_value = """1|1024M|0|(ReqNodeNotAvail, UnavailableNodes:compute123)|zzzzz-dz642-zzzzzzzzzzzzzzy|(null)|1234567890
+1|2G|0|(ReqNodeNotAvail)|zzzzz-dz642-zzzzzzzzzzzzzzz|(null)|1234567890
"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
@mock.patch("subprocess.check_output")
def test_squeue_server_list_instancetype_constraint(self, mock_squeue):
- mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test\n"""
+ mock_squeue.return_value = """1|1024|0|(Resources)|zzzzz-dz642-zzzzzzzzzzzzzzy|instancetype=z2.test|1234567890\n"""
super(JobQueueMonitorActorTestCase, self).build_monitor(jobqueue.ServerCalculator(
[(testutil.MockSize(n), {'cores': n, 'ram': n*1024, 'scratch': n}) for n in range(1, 3)]),
True, True)
from future import standard_library
import json
+import mock
+import random
import requests
import unittest
self.assertEqual(n, resp['nodes_'+str(n)])
self.assertEqual(1, resp['nodes_1'])
self.assertIn('Version', resp)
+ self.assertIn('config_max_nodes', resp)
+
+ def test_counters(self):
+ with TestServer() as srv:
+ resp = srv.get_status()
+ # Test counters existence
+ for counter in ['list_nodes_errors', 'create_node_errors',
+ 'destroy_node_errors', 'boot_failures', 'actor_exceptions']:
+ self.assertIn(counter, resp)
+ # Test counter increment
+ for count in range(1, 3):
+ status.tracker.counter_add('a_counter')
+ resp = srv.get_status()
+ self.assertEqual(count, resp['a_counter'])
+
+ @mock.patch('time.time')
+ def test_idle_times(self, time_mock):
+ with TestServer() as srv:
+ resp = srv.get_status()
+ node_name = 'idle_compute{}'.format(random.randint(1, 1024))
+ self.assertIn('idle_times', resp)
+ # Test adding an idle node
+ time_mock.return_value = 10
+ status.tracker.idle_in(node_name)
+ time_mock.return_value += 10
+ resp = srv.get_status()
+ self.assertEqual(10, resp['idle_times'][node_name])
+ # Test adding the same idle node a 2nd time
+ time_mock.return_value += 10
+ status.tracker.idle_in(node_name)
+ time_mock.return_value += 10
+ resp = srv.get_status()
+ # Idle timestamp doesn't get reset if it already exists
+ self.assertEqual(30, resp['idle_times'][node_name])
+ # Test removing an idle node
+ status.tracker.idle_out(node_name)
+ resp = srv.get_status()
+ self.assertNotIn(node_name, resp['idle_times'])
class StatusServerDisabled(unittest.TestCase):
;;
sh*)
- exec docker exec -ti $ARVBOX_CONTAINER /usr/bin/env TERM=$TERM GEM_HOME=/var/lib/gems /bin/bash
+ exec docker exec -ti -e LINES=$(tput lines) -e COLUMNS=$(tput cols) -e TERM=$TERM -e GEM_HOME=/var/lib/gems $ARVBOX_CONTAINER /bin/bash
;;
pipe)
# multiple packages, because it will blindly install the latest version of each
# dependency requested by each package, even if a compatible package version is
# already installed.
-pip_install pip
+pip_install pip==9.0.3
pip_install wheel
include agpl-3.0.txt
include crunchstat_summary/dygraphs.js
include crunchstat_summary/synchronizer.js
+include arvados_version.py
\ No newline at end of file
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import subprocess
+import time
+import os
+import re
+
+def git_latest_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'describe', '--abbrev=0']).strip()
+ return str(gitinfo.decode('utf-8'))
+
+def git_timestamp_tag():
+ gitinfo = subprocess.check_output(
+ ['git', 'log', '--first-parent', '--max-count=1',
+ '--format=format:%ct', '.']).strip()
+ return str(time.strftime('.%Y%m%d%H%M%S', time.gmtime(int(gitinfo))))
+
+def save_version(setup_dir, module, v):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'w') as fp:
+ return fp.write("__version__ = '%s'\n" % v)
+
+def read_version(setup_dir, module):
+ with open(os.path.join(setup_dir, module, "_version.py"), 'r') as fp:
+ return re.match("__version__ = '(.*)'$", fp.read()).groups()[0]
+
+def get_version(setup_dir, module):
+    """Return the package version, refreshing <module>/_version.py first.
+
+    When the ARVADOS_BUILDING_VERSION environment variable is set and
+    non-empty, its value is written to <setup_dir>/<module>/_version.py.
+    Otherwise the version is built from the latest git tag plus the
+    timestamp of the latest commit, and written to the same file.
+
+    If git cannot supply a version (the command fails, or the git
+    executable cannot be started at all), nothing is written and the
+    _version.py already on disk is used.
+
+    The returned value is always read back from _version.py.
+    """
+    env_version = os.environ.get("ARVADOS_BUILDING_VERSION")
+
+    if env_version:
+        save_version(setup_dir, module, env_version)
+    else:
+        try:
+            git_version = git_latest_tag() + git_timestamp_tag()
+        except (subprocess.CalledProcessError, OSError):
+            # CalledProcessError: git ran but exited non-zero (for example
+            # when building outside a git checkout).
+            # OSError: the git executable could not be launched.
+            # In both cases fall back to the existing _version.py.
+            git_version = None
+        # Saving happens outside the try block so that a failure to write
+        # _version.py is still reported instead of being swallowed.
+        if git_version is not None:
+            save_version(setup_dir, module, git_version)
+
+    return read_version(setup_dir, module)
#
# SPDX-License-Identifier: AGPL-3.0
+from __future__ import absolute_import
import os
import sys
-import setuptools.command.egg_info as egg_info_cmd
+import re
from setuptools import setup, find_packages
SETUP_DIR = os.path.dirname(__file__) or '.'
+README = os.path.join(SETUP_DIR, 'README.rst')
-tagger = egg_info_cmd.egg_info
-version = os.environ.get("ARVADOS_BUILDING_VERSION")
-if not version:
- version = "0.1"
- try:
- import gittaggers
- tagger = gittaggers.EggInfoFromGit
- except ImportError:
- pass
+import arvados_version
+version = arvados_version.get_version(SETUP_DIR, "crunchstat_summary")
+
+short_tests_only = False
+if '--short-tests-only' in sys.argv:
+ short_tests_only = True
+ sys.argv.remove('--short-tests-only')
setup(name='crunchstat_summary',
version=version,
],
test_suite='tests',
tests_require=['pbr<1.7.0', 'mock>=1.0'],
- zip_safe=False,
- cmdclass={'egg_info': tagger},
+ zip_safe=False
)