// For example, filterable.js writes filters in
// infiniteContentParamsFilterable ("search for text foo")
// without worrying about clobbering the filters set up by the
- // tab pane ("only show jobs and pipelines in this tab").
+ // tab pane ("only show container requests and pipeline instances
+ // in this tab").
$.each($container.data(), function(datakey, datavalue) {
// Note: We attach these data to DOM elements using
// <element data-foo-bar="baz">. We store/retrieve them
# Column names should always be qualified by a table name; a direction is optional and defaults to asc
# (e.g. "collections.name" or "collections.name desc").
# If a column name is specified, that table will be sorted by that column.
- # If there are objects from different models that will be shown (such as in Jobs and Pipelines tab),
+ # If there are objects from different models that will be shown (such as in Pipelines and processes tab),
# then a sort column name can optionally be specified for each model, passed as a comma-separated list (e.g. "jobs.script, pipeline_instances.name")
# Currently only one sort column name and direction can be specified for each model.
def load_filters_and_paging_params
class ContainerRequestsController < ApplicationController
+ skip_around_filter :require_thread_api_token, if: proc { |ctrl|
+ Rails.configuration.anonymous_user_token and
+ 'show' == ctrl.action_name
+ }
+
def show_pane_list
%w(Status Log Advanced)
end
class ContainersController < ApplicationController
+ skip_around_filter :require_thread_api_token, if: proc { |ctrl|
+ Rails.configuration.anonymous_user_token and
+ 'show' == ctrl.action_name
+ }
+
def show_pane_list
%w(Status Log Advanced)
end
end
def logs
- @logs = Log.select(%w(event_type object_uuid event_at properties))
- .order('event_at DESC')
- .filter([["event_type", "=", "stderr"],
- ["object_uuid", "in", [@object.uuid]]])
- .limit(500)
- .results
- .to_a
- .map{ |e| e.serializable_hash.merge({ 'prepend' => true }) }
+ @logs = @object.
+ stderr_log_query(Rails.configuration.running_job_log_records_to_fetch).
+ map { |e| e.serializable_hash.merge({ 'prepend' => true }) }
respond_to do |format|
format.json { render json: @logs }
end
}
pane_list <<
{
- :name => 'Jobs_and_pipelines',
- :filters => [%w(uuid is_a) + [%w(arvados#job arvados#pipelineInstance)]]
+ :name => 'Pipelines_and_processes',
+ :filters => [%w(uuid is_a) + [%w(arvados#containerRequest arvados#pipelineInstance)]]
}
pane_list <<
{
@name_link_for = {}
kind_filters.each do |attr,op,val|
(val.is_a?(Array) ? val : [val]).each do |type|
+ filters = @filters - kind_filters + [['uuid', 'is_a', type]]
+ if type == 'arvados#containerRequest'
+ filters = filters + [['container_requests.requesting_container_uuid', '=', nil]]
+ end
objects = @object.contents(order: @order,
limit: @limit,
- filters: (@filters - kind_filters + [['uuid', 'is_a', type]]),
+ filters: filters,
)
objects.each do |object|
@name_link_for[object.andand.uuid] = objects.links_for(object, 'name').first
[ 'description' ]
end
+ def self.goes_in_projects?
+ true
+ end
+
def work_unit(label=nil)
ContainerWorkUnit.new(self, label)
end
+++ /dev/null
-<%= render_pane 'tab_contents', to_string: true, locals: {
- limit: 50,
- filters: [['uuid', 'is_a', ["arvados#job", "arvados#pipelineInstance"]]],
- sortable_columns: { 'name' => 'jobs.script, pipeline_instances.name', 'description' => 'jobs.description, pipeline_instances.description' }
- }.merge(local_assigns) %>
--- /dev/null
+<%= render_pane 'tab_contents', to_string: true, locals: {
+ limit: 50,
+ filters: [['uuid', 'is_a', ["arvados#containerRequest", "arvados#pipelineInstance"]]],
+ sortable_columns: { 'name' => 'container_requests.name, pipeline_instances.name', 'description' => 'container_requests.description, pipeline_instances.description' }
+ }.merge(local_assigns) %>
[
["active", 5, ["aproject", "asubproject"], "anonymously_accessible_project"],
- ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_jobs"],
+ ["user1_with_load", 2, ["project_with_10_collections"], "project_with_2_pipelines_and_60_crs"],
["admin", 5, ["anonymously_accessible_project", "subproject_in_anonymous_accessible_project"], "aproject"],
].each do |user, page_size, tree_segment, unexpected|
test "build my projects tree for #{user} user and verify #{unexpected} is omitted" do
assert_selector 'a', text: 'Description'
assert_selector 'a', text: 'Data collections'
- assert_selector 'a', text: 'Jobs and pipelines'
+ assert_selector 'a', text: 'Pipelines and processes'
assert_selector 'a', text: 'Pipeline templates'
assert_selector 'a', text: 'Subprojects'
assert_selector 'a', text: 'Advanced'
end
[
- 'running_job',
- 'completed_job',
+ 'running anonymously accessible cr',
'pipelineInstance'
- ].each do |type|
- test "anonymous user accesses jobs and pipelines tab in shared project and clicks on #{type}" do
+ ].each do |proc|
+ test "anonymous user accesses pipelines and processes tab in shared project and clicks on '#{proc}'" do
visit PUBLIC_PROJECT
click_link 'Data collections'
assert_text 'GNU General Public License'
- click_link 'Jobs and pipelines'
+ click_link 'Pipelines and processes'
assert_text 'Pipeline in publicly accessible project'
- # click on the specified job
- if type.include? 'job'
- verify_job_row type
- else
+ if proc.include? 'pipeline'
verify_pipeline_instance_row
+ else
+ verify_container_request_row proc
end
end
end
- def verify_job_row look_for
+ def verify_container_request_row look_for
within first('tr', text: look_for) do
click_link 'Show'
end
assert_text 'Public Projects Unrestricted public data'
- assert_text 'script_version'
+ assert_text 'command'
assert_text 'zzzzz-tpzed-xurymjxw79nv3jz' # modified by user
assert_no_selector 'a', text: 'zzzzz-tpzed-xurymjxw79nv3jz'
- assert_no_selector 'a', text: 'Move job'
assert_no_selector 'button', text: 'Cancel'
- assert_no_selector 'button', text: 'Re-run job'
end
def verify_pipeline_instance_row
within "#collection_files" do
find('[title~=Download]').click
end
- wait_for_download 'w a z', 'w a z'
+ wait_for_download 'w a z', 'w a z', timeout: 6
end
- def wait_for_download filename, expect_data
+ def wait_for_download filename, expect_data, timeout: 3
data = nil
tries = 0
- while tries < 20
+ while tries < timeout*10
sleep 0.1
tries += 1
data = File.read(DownloadHelper.path.join filename) rescue nil
wait_for_ajax
end
- click_link 'Jobs and pipelines'
+ click_link 'Pipelines and processes'
find('tr[data-kind="arvados#pipelineInstance"]', text: '(none)').
find('a', text: 'Show').
click
[
['project_with_10_pipelines', 10, 0],
- ['project_with_2_pipelines_and_60_jobs', 2, 60],
+ ['project_with_2_pipelines_and_60_crs', 2, 60],
['project_with_25_pipelines', 25, 0],
- ].each do |project_name, num_pipelines, num_jobs|
- test "scroll pipeline instances tab for #{project_name} with #{num_pipelines} pipelines and #{num_jobs} jobs" do
- item_list_parameter = "Jobs_and_pipelines"
+ ].each do |project_name, num_pipelines, num_crs|
+ test "scroll pipeline instances tab for #{project_name} with #{num_pipelines} pipelines and #{num_crs} container requests" do
+ item_list_parameter = "Pipelines_and_processes"
scroll_setup project_name,
- num_pipelines + num_jobs,
+ num_pipelines + num_crs,
item_list_parameter
# check the general scrolling and the pipelines
scroll_items_check num_pipelines,
"pipeline_",
item_list_parameter,
'tr[data-kind="arvados#pipelineInstance"]'
- # Check job count separately
- jobs_found = page.all('tr[data-kind="arvados#job"]')
- found_job_count = jobs_found.count
- assert_equal num_jobs, found_job_count, 'Did not find expected number of jobs'
+ # Check container request count separately
+ crs_found = page.all('tr[data-kind="arvados#containerRequest"]')
+ found_cr_count = crs_found.count
+ assert_equal num_crs, found_cr_count, 'Did not find expected number of container requests'
end
end
assert_no_selector 'li.disabled', text: 'Copy selected'
end
- # Go to Jobs and pipelines tab and assert none selected
- click_link 'Jobs and pipelines'
+ # Go to Pipelines and processes tab and assert none selected
+ click_link 'Pipelines and processes'
wait_for_ajax
# Since this is the first visit to this tab, all selection options should be disabled
# So we build this thing separately.
#
# Ward, 2016-03-17
-fpm_build schema_salad "" "" python 1.12.20160610104117
+fpm_build schema_salad "" "" python 1.14.20160708181155
# And schema_salad now depends on ruamel-yaml, which apparently has a braindead setup.py that requires special arguments to build (otherwise, it aborts with 'error: you have to install with "pip install ."'). Sigh.
# Ward, 2016-05-26
-fpm_build ruamel.yaml "" "" python "" --python-setup-py-arguments "--single-version-externally-managed"
+# ...and schema_salad 1.12.20160610104117 doesn't work with ruamel-yaml > 0.11.11.
+fpm_build ruamel.yaml "" "" python 0.11.11 --python-setup-py-arguments "--single-version-externally-managed"
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm_build cwltool "" "" python 1.0.20160630171631
+fpm_build cwltool "" "" python 1.0.20160714182449
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
fpm_build rdflib-jsonld "" "" python 0.3.0
sdk/go/manifest
sdk/go/streamer
sdk/go/crunchrunner
+ lib/crunchstat
services/arv-git-httpd
services/crunchstat
services/keep-web
import json
import argparse
from arvados.api import OrderedJsonModel
-from cwltool.process import adjustFiles
+from cwltool.process import adjustFileObjs
from cwltool.load_tool import load_tool
# Print package versions
def keeppath(v):
if arvados.util.keep_locator_pattern.match(v):
- return "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], v)
+ return "keep:%s" % v
else:
return v
- job_order_object["cwl:tool"] = keeppath(job_order_object["cwl:tool"])
+ def keeppathObj(v):
+ v["location"] = keeppath(v["location"])
+
+ job_order_object["cwl:tool"] = "file://%s/%s" % (os.environ['TASK_KEEPMOUNT'], job_order_object["cwl:tool"])
for k,v in job_order_object.items():
if isinstance(v, basestring) and arvados.util.keep_locator_pattern.match(v):
job_order_object[k] = {
"class": "File",
- "path": keeppath(v)
+ "location": "keep:%s" % v
}
- adjustFiles(job_order_object, keeppath)
+ adjustFileObjs(job_order_object, keeppathObj)
runner = arvados_cwl.ArvCwlRunner(api_client=arvados.api('v1', model=OrderedJsonModel()))
outputObj = runner.arvExecutor(t, job_order_object, **vars(args))
files = {}
- def capture(path):
+ def capture(fileobj):
+ path = fileobj["location"]
sp = path.split("/")
col = sp[0][5:]
if col not in files:
files[col] = set()
files[col].add("/".join(sp[1:]))
- return path
+ fileobj["location"] = path
- adjustFiles(outputObj, capture)
+ adjustFileObjs(outputObj, capture)
final = arvados.collection.Collection()
for f in c:
final.copy(f, f, c, True)
- def makeRelative(path):
- return "/".join(path.split("/")[1:])
+ def makeRelative(fileobj):
+ fileobj["location"] = "/".join(fileobj["location"].split("/")[1:])
- adjustFiles(outputObj, makeRelative)
+ adjustFileObjs(outputObj, makeRelative)
with final.open("cwl.output.json", "w") as f:
json.dump(outputObj, f, indent=4)
Note: Because adding access tokens to manifests can be computationally expensive, the @manifest_text@ field is not included in listed collections. If you need it, request a "list of collections":{{site.baseurl}}/api/methods/collections.html with the filter @["owner_uuid", "=", GROUP_UUID]@, and @"manifest_text"@ listed in the select parameter.
+Note: Use filters with the attribute format @<item type>.<field name>@ to filter items of a specific type. For example, @["pipeline_instances.state", "=", "Complete"]@ filters @pipeline_instances@ whose @state@ is @Complete@. Items of other types owned by this group are not affected by this filter and are still included.
+
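As an illustration of such a type-qualified filter, here is a minimal sketch using the Arvados Go SDK (the group UUID is a placeholder, and the client setup assumes the usual @ARVADOS_API_HOST@/@ARVADOS_API_TOKEN@ environment variables):

<notextile>
<pre><code>package main

import (
	"fmt"
	"log"

	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
)

func main() {
	arv, err := arvadosclient.MakeArvadosClient()
	if err != nil {
		log.Fatal(err)
	}
	var resp map[string]interface{}
	// Only pipeline instances in state Complete are matched by this filter;
	// items of other types owned by the group are returned unfiltered.
	err = arv.Call("GET", "groups", "zzzzz-j7d0g-xxxxxxxxxxxxxxx", "contents", arvadosclient.Dict{
		"filters": [][]interface{}{{"pipeline_instances.state", "=", "Complete"}},
	}, &resp)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(resp["items_available"])
}
</code></pre>
</notextile>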
h2. create
Create a new Group.
<notextile>
<pre><code>~$ <span class="userinput">keepstore -h</span>
-2015/05/08 13:41:16 keepstore starting, pid 2565
+2016/07/01 14:06:21 keepstore starting, pid 32339
Usage of ./keepstore:
- -azure-storage-account-key-file="": File containing the account key used for subsequent --azure-storage-container-volume arguments.
- -azure-storage-account-name="": Azure storage account name used for subsequent --azure-storage-container-volume arguments.
- -azure-storage-container-volume=[]: Use the given container as a storage volume. Can be given multiple times.
- -azure-storage-replication=3: Replication level to report to clients when data is stored in an Azure container.
- -blob-signature-ttl=1209600: Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. See services/api/config/application.default.yml.
- -blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
- -data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
- -enforce-permissions=false: Enforce permission signatures on requests.
- -listen=":25107": Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces.
- -max-buffers=128: Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released.
+ -azure-max-get-bytes int
+ Maximum bytes to request in a single GET request. If smaller than 67108864, use multiple concurrent range requests to retrieve a block. (default 67108864)
+ -azure-storage-account-key-file string
+ File containing the account key used for subsequent --azure-storage-container-volume arguments.
+ -azure-storage-account-name string
+ Azure storage account name used for subsequent --azure-storage-container-volume arguments.
+ -azure-storage-container-volume value
+ Use the given container as a storage volume. Can be given multiple times. (default [])
+ -azure-storage-replication int
+ Replication level to report to clients when data is stored in an Azure container. (default 3)
+ -blob-signature-ttl int
+ Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. See services/api/config/application.default.yml. (default 1209600)
+ -blob-signing-key-file string
+ File containing the secret key for generating and verifying blob permission signatures.
+ -data-manager-token-file string
+ File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
+ -enforce-permissions
+ Enforce permission signatures on requests.
+ -listen string
+ Listening address, in the form "host:port". e.g., 10.0.1.24:8000. Omit the host part to listen on all interfaces. (default ":25107")
+ -max-buffers int
+ Maximum RAM to use for data buffers, given in multiples of block size (64 MiB). When this limit is reached, HTTP requests requiring buffers (like GET and PUT) will wait for buffer space to be released. (default 128)
-max-requests int
- Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
- -never-delete=false: If set, nothing will be deleted. HTTP 405 will be returned for valid DELETE requests.
- -permission-key-file="": Synonym for -blob-signing-key-file.
- -permission-ttl=0: Synonym for -blob-signature-ttl.
- -pid="": Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so `fuser -k pidfile` is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.
- -readonly=false: Do not write, delete, or touch anything on the following volumes.
- -serialize=false: Serialize read and write operations on the following volumes.
- -volume=[]: Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named "keep" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.
- -volumes=[]: Deprecated synonym for -volume.
+ Maximum concurrent requests. When this limit is reached, new requests will receive 503 responses. Note: this limit does not include idle connections from clients using HTTP keepalive, so it does not strictly limit the number of concurrent connections. (default 2 * max-buffers)
+ -never-delete
+ If true, nothing will be deleted. Warning: the relevant features in keepstore and data manager have not been extensively tested. You should leave this option alone unless you can afford to lose data. (default true)
+ -permission-key-file string
+ Synonym for -blob-signing-key-file.
+ -permission-ttl int
+ Synonym for -blob-signature-ttl.
+ -pid fuser -k pidfile
+ Path to write pid file during startup. This file is kept open and locked with LOCK_EX until keepstore exits, so fuser -k pidfile is one way to shut down. Exit immediately if there is an error opening, locking, or writing the pid file.
+ -readonly
+ Do not write, delete, or touch anything on the following volumes.
+ -s3-access-key-file string
+ File containing the access key used for subsequent -s3-bucket-volume arguments.
+ -s3-bucket-volume value
+ Use the given bucket as a storage volume. Can be given multiple times. (default [])
+ -s3-endpoint string
+ Endpoint URL used for subsequent -s3-bucket-volume arguments. If blank, use the AWS endpoint corresponding to the -s3-region argument. For Google Storage, use "https://storage.googleapis.com".
+ -s3-region string
+ AWS region used for subsequent -s3-bucket-volume arguments. Allowed values are ["ap-southeast-1" "eu-west-1" "us-gov-west-1" "sa-east-1" "cn-north-1" "ap-northeast-1" "ap-southeast-2" "eu-central-1" "us-east-1" "us-west-1" "us-west-2"].
+ -s3-replication int
+ Replication level reported to clients for subsequent -s3-bucket-volume arguments. (default 2)
+ -s3-secret-key-file string
+ File containing the secret key used for subsequent -s3-bucket-volume arguments.
+ -s3-unsafe-delete
+ EXPERIMENTAL. Enable deletion (garbage collection), even though there are known race conditions that can cause data loss.
+ -serialize
+ Serialize read and write operations on the following volumes.
+ -trash-check-interval duration
+ Time duration at which the emptyTrash goroutine will check and delete expired trashed blocks. Default is one day. (default 24h0m0s)
+ -trash-lifetime duration
+ Time duration after a block is trashed during which it can be recovered using an /untrash request
+ -volume value
+ Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named "keep" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead. (default [])
+ -volumes value
+ Deprecated synonym for -volume. (default [])
</code></pre>
</notextile>
ARG COMMIT=latest
RUN echo $COMMIT && apt-get update -q
-RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
+RUN apt-get install -qy git python-pip python-virtualenv python-arvados-python-client python-dev libgnutls28-dev libcurl4-gnutls-dev nodejs python-arvados-cwl-runner
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
--- /dev/null
+// Package crunchstat reports resource usage (CPU, memory, disk,
+// network) for a cgroup.
+package crunchstat
+
+import (
+ "bufio"
+ "bytes"
+ "errors"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// This magically allows us to look up userHz via _SC_CLK_TCK:
+
+/*
+#include <unistd.h>
+#include <sys/types.h>
+#include <pwd.h>
+#include <stdlib.h>
+*/
+import "C"
+
+// A Reporter gathers statistics for a cgroup and writes them to a
+// log.Logger.
+type Reporter struct {
+ // CID of the container to monitor. If empty, read the CID
+ // from CIDFile (first waiting until a non-empty file appears
+ // at CIDFile). If CIDFile is also empty, report host
+ // statistics.
+ CID string
+
+ // Path to a file we can read CID from.
+ CIDFile string
+
+ // Where cgroup accounting files live on this system, e.g.,
+ // "/sys/fs/cgroup".
+ CgroupRoot string
+
+ // Parent cgroup, e.g., "docker".
+ CgroupParent string
+
+ // Interval between samples. Must be positive.
+ PollPeriod time.Duration
+
+ // Where to write statistics. Must not be nil.
+ Logger *log.Logger
+
+ reportedStatFile map[string]string
+ lastNetSample map[string]ioSample
+ lastDiskSample map[string]ioSample
+ lastCPUSample cpuSample
+
+ done chan struct{}
+}
+
+// Start starts monitoring in a new goroutine, and returns
+// immediately.
+//
+// The monitoring goroutine waits for a non-empty CIDFile to appear
+// (unless CID is non-empty). Then it waits for the accounting files
+// to appear for the monitored container. Then it collects and reports
+// statistics until Stop is called.
+//
+// Callers should not call Start more than once.
+//
+// Callers should not modify public data fields after calling Start.
+func (r *Reporter) Start() {
+ r.done = make(chan struct{})
+ go r.run()
+}
+
+// Stop reporting. Do not call more than once, or before calling
+// Start.
+//
+// Nothing will be logged after Stop returns.
+func (r *Reporter) Stop() {
+ close(r.done)
+}
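
A minimal usage sketch (not part of this package; the CIDFile path, cgroup layout, and import path are illustrative assumptions): a caller fills in the Reporter fields, calls Start before launching the contained process, and Stop when it is done.

package main

import (
	"log"
	"os"
	"time"

	"git.curoverse.com/arvados.git/lib/crunchstat"
)

func main() {
	reporter := &crunchstat.Reporter{
		CIDFile:      "/tmp/container.cid", // e.g. written by `docker run --cidfile`
		CgroupRoot:   "/sys/fs/cgroup",
		CgroupParent: "docker",
		PollPeriod:   10 * time.Second,
		Logger:       log.New(os.Stderr, "crunchstat: ", 0),
	}
	reporter.Start()
	// ... launch the contained process and wait for it to finish ...
	reporter.Stop()
}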
+
+func (r *Reporter) readAllOrWarn(in io.Reader) ([]byte, error) {
+ content, err := ioutil.ReadAll(in)
+ if err != nil {
+ r.Logger.Print(err)
+ }
+ return content, err
+}
+
+// Open the cgroup stats file in /sys/fs corresponding to the target
+// cgroup, and return an io.ReadCloser. If no stats file is available,
+// return nil.
+//
+// Log the file that was opened, if it isn't the same file opened on
+// the last openStatFile for this stat.
+//
+// Log "not available" if no file is found and either this stat has
+// been available in the past, or verbose==true.
+//
+// TODO: Instead of trying all options, choose a process in the
+// container, and read /proc/PID/cgroup to determine the appropriate
+// cgroup root for the given statgroup. (This will avoid falling back
+// to host-level stats during container setup and teardown.)
+func (r *Reporter) openStatFile(statgroup, stat string, verbose bool) (io.ReadCloser, error) {
+ var paths []string
+ if r.CID != "" {
+ // Collect container's stats
+ paths = []string{
+ fmt.Sprintf("%s/%s/%s/%s/%s", r.CgroupRoot, statgroup, r.CgroupParent, r.CID, stat),
+ fmt.Sprintf("%s/%s/%s/%s", r.CgroupRoot, r.CgroupParent, r.CID, stat),
+ }
+ } else {
+ // Collect this host's stats
+ paths = []string{
+ fmt.Sprintf("%s/%s/%s", r.CgroupRoot, statgroup, stat),
+ fmt.Sprintf("%s/%s", r.CgroupRoot, stat),
+ }
+ }
+ var path string
+ var file *os.File
+ var err error
+ for _, path = range paths {
+ file, err = os.Open(path)
+ if err == nil {
+ break
+ } else {
+ path = ""
+ }
+ }
+ if pathWas := r.reportedStatFile[stat]; pathWas != path {
+ // Log whenever we start using a new/different cgroup
+ // stat file for a given statistic. This typically
+ // happens 1 to 3 times per statistic, depending on
+ // whether we happen to collect stats [a] before any
+ // processes have been created in the container and
+ // [b] after all contained processes have exited.
+ if path == "" && verbose {
+ r.Logger.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, r.CID, r.CgroupParent, r.CgroupRoot)
+ } else if pathWas != "" {
+ r.Logger.Printf("notice: stats moved from %s to %s\n", r.reportedStatFile[stat], path)
+ } else {
+ r.Logger.Printf("notice: reading stats from %s\n", path)
+ }
+ r.reportedStatFile[stat] = path
+ }
+ return file, err
+}
+
+func (r *Reporter) getContainerNetStats() (io.Reader, error) {
+ procsFile, err := r.openStatFile("cpuacct", "cgroup.procs", true)
+ if err != nil {
+ return nil, err
+ }
+ defer procsFile.Close()
+ reader := bufio.NewScanner(procsFile)
+ for reader.Scan() {
+ taskPid := reader.Text()
+ statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
+ stats, err := ioutil.ReadFile(statsFilename)
+ if err != nil {
+ r.Logger.Print(err)
+ continue
+ }
+ return strings.NewReader(string(stats)), nil
+ }
+ return nil, errors.New("Could not read stats for any proc in container")
+}
+
+type ioSample struct {
+ sampleTime time.Time
+ txBytes int64
+ rxBytes int64
+}
+
+func (r *Reporter) doBlkIOStats() {
+ c, err := r.openStatFile("blkio", "blkio.io_service_bytes", true)
+ if err != nil {
+ return
+ }
+ defer c.Close()
+ b := bufio.NewScanner(c)
+ var sampleTime = time.Now()
+ newSamples := make(map[string]ioSample)
+ for b.Scan() {
+ var device, op string
+ var val int64
+ if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
+ continue
+ }
+ var thisSample ioSample
+ var ok bool
+ if thisSample, ok = newSamples[device]; !ok {
+ thisSample = ioSample{sampleTime, -1, -1}
+ }
+ switch op {
+ case "Read":
+ thisSample.rxBytes = val
+ case "Write":
+ thisSample.txBytes = val
+ }
+ newSamples[device] = thisSample
+ }
+ for dev, sample := range newSamples {
+ if sample.txBytes < 0 || sample.rxBytes < 0 {
+ continue
+ }
+ delta := ""
+ if prev, ok := r.lastDiskSample[dev]; ok {
+ delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
+ sample.sampleTime.Sub(prev.sampleTime).Seconds(),
+ sample.txBytes-prev.txBytes,
+ sample.rxBytes-prev.rxBytes)
+ }
+ r.Logger.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
+ r.lastDiskSample[dev] = sample
+ }
+}
+
+type memSample struct {
+ sampleTime time.Time
+ memStat map[string]int64
+}
+
+func (r *Reporter) doMemoryStats() {
+ c, err := r.openStatFile("memory", "memory.stat", true)
+ if err != nil {
+ return
+ }
+ defer c.Close()
+ b := bufio.NewScanner(c)
+ thisSample := memSample{time.Now(), make(map[string]int64)}
+ wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
+ for b.Scan() {
+ var stat string
+ var val int64
+ if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
+ continue
+ }
+ thisSample.memStat[stat] = val
+ }
+ var outstat bytes.Buffer
+ for _, key := range wantStats {
+ if val, ok := thisSample.memStat[key]; ok {
+ outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+ }
+ }
+ r.Logger.Printf("mem%s\n", outstat.String())
+}
+
+func (r *Reporter) doNetworkStats() {
+ sampleTime := time.Now()
+ stats, err := r.getContainerNetStats()
+ if err != nil {
+ return
+ }
+
+ scanner := bufio.NewScanner(stats)
+ for scanner.Scan() {
+ var ifName string
+ var rx, tx int64
+ words := strings.Fields(scanner.Text())
+ if len(words) != 17 {
+ // Skip lines with wrong format
+ continue
+ }
+ ifName = strings.TrimRight(words[0], ":")
+ if ifName == "lo" || ifName == "" {
+ // Skip loopback interface and lines with wrong format
+ continue
+ }
+ if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
+ continue
+ }
+ if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
+ continue
+ }
+ nextSample := ioSample{}
+ nextSample.sampleTime = sampleTime
+ nextSample.txBytes = tx
+ nextSample.rxBytes = rx
+ var delta string
+ if prev, ok := r.lastNetSample[ifName]; ok {
+ interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
+ delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
+ interval,
+ tx-prev.txBytes,
+ rx-prev.rxBytes)
+ }
+ r.Logger.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
+ r.lastNetSample[ifName] = nextSample
+ }
+}
+
+type cpuSample struct {
+ hasData bool // to distinguish the zero value from real data
+ sampleTime time.Time
+ user float64
+ sys float64
+ cpus int64
+}
+
+// Return the number of CPUs available in the container. Return 0 if
+// we can't figure out the real number of CPUs.
+func (r *Reporter) getCPUCount() int64 {
+ cpusetFile, err := r.openStatFile("cpuset", "cpuset.cpus", true)
+ if err != nil {
+ return 0
+ }
+ defer cpusetFile.Close()
+ b, err := r.readAllOrWarn(cpusetFile)
+ if err != nil {
+ return 0
+ }
+ sp := strings.Split(string(b), ",")
+ cpus := int64(0)
+ for _, v := range sp {
+ var min, max int64
+ n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
+ if n == 2 {
+ cpus += (max - min) + 1
+ } else {
+ cpus++
+ }
+ }
+ return cpus
+}
+
+func (r *Reporter) doCPUStats() {
+ statFile, err := r.openStatFile("cpuacct", "cpuacct.stat", true)
+ if err != nil {
+ return
+ }
+ defer statFile.Close()
+ b, err := r.readAllOrWarn(statFile)
+ if err != nil {
+ return
+ }
+
+ var userTicks, sysTicks int64
+ fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
+ userHz := float64(C.sysconf(C._SC_CLK_TCK))
+ nextSample := cpuSample{
+ hasData: true,
+ sampleTime: time.Now(),
+ user: float64(userTicks) / userHz,
+ sys: float64(sysTicks) / userHz,
+ cpus: r.getCPUCount(),
+ }
+
+ delta := ""
+ if r.lastCPUSample.hasData {
+ delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
+ nextSample.sampleTime.Sub(r.lastCPUSample.sampleTime).Seconds(),
+ nextSample.user-r.lastCPUSample.user,
+ nextSample.sys-r.lastCPUSample.sys)
+ }
+ r.Logger.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
+ nextSample.user, nextSample.sys, nextSample.cpus, delta)
+ r.lastCPUSample = nextSample
+}
+
+// Report stats periodically until we learn (via r.done) that someone
+// called Stop.
+func (r *Reporter) run() {
+ r.reportedStatFile = make(map[string]string)
+
+ if !r.waitForCIDFile() || !r.waitForCgroup() {
+ return
+ }
+
+ r.lastNetSample = make(map[string]ioSample)
+ r.lastDiskSample = make(map[string]ioSample)
+
+ ticker := time.NewTicker(r.PollPeriod)
+ for {
+ r.doMemoryStats()
+ r.doCPUStats()
+ r.doBlkIOStats()
+ r.doNetworkStats()
+ select {
+ case <-r.done:
+ return
+ case <-ticker.C:
+ }
+ }
+}
+
+// If CID is empty, wait for it to appear in CIDFile. Return true if
+// we get it before we learn (via r.done) that someone called Stop.
+func (r *Reporter) waitForCIDFile() bool {
+ if r.CID != "" || r.CIDFile == "" {
+ return true
+ }
+
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ cid, err := ioutil.ReadFile(r.CIDFile)
+ if err == nil && len(cid) > 0 {
+ r.CID = string(cid)
+ return true
+ }
+ select {
+ case <-ticker.C:
+ case <-r.done:
+ r.Logger.Printf("CID never appeared in %+q: %v", r.CIDFile, err)
+ return false
+ }
+ }
+}
+
+// Wait for the cgroup stats files to appear in cgroup_root. Return
+// true if they appear before r.done indicates someone called Stop. If
+// they don't appear within one poll interval, log a warning and keep
+// waiting.
+func (r *Reporter) waitForCgroup() bool {
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ warningTimer := time.After(r.PollPeriod)
+ for {
+ c, err := r.openStatFile("cpuacct", "cgroup.procs", false)
+ if err == nil {
+ c.Close()
+ return true
+ }
+ select {
+ case <-ticker.C:
+ case <-warningTimer:
+ r.Logger.Printf("cgroup stats files have not appeared after %v (config error?) -- still waiting...", r.PollPeriod)
+ case <-r.done:
+ r.Logger.Printf("cgroup stats files never appeared for %v", r.CID)
+ return false
+ }
+ }
+}
--- /dev/null
+package crunchstat
+
+import (
+ "bufio"
+ "io"
+ "log"
+ "os"
+ "regexp"
+ "testing"
+)
+
+func bufLogger() (*log.Logger, *bufio.Reader) {
+ r, w := io.Pipe()
+ logger := log.New(w, "", 0)
+ return logger, bufio.NewReader(r)
+}
+
+func TestReadAllOrWarnFail(t *testing.T) {
+ logger, rcv := bufLogger()
+ rep := Reporter{Logger: logger}
+
+ done := make(chan bool)
+ var msg []byte
+ var err error
+ go func() {
+ msg, err = rcv.ReadBytes('\n')
+ close(done)
+ }()
+ {
+ // The special file /proc/self/mem can be opened for
+ // reading, but reading from byte 0 returns an error.
+ f, err := os.Open("/proc/self/mem")
+ if err != nil {
+ t.Fatalf("Opening /proc/self/mem: %s", err)
+ }
+ if x, err := rep.readAllOrWarn(f); err == nil {
+ t.Fatalf("Expected error, got %v", x)
+ }
+ }
+ <-done
+ if err != nil {
+ t.Fatal(err)
+ } else if matched, err := regexp.MatchString("^read /proc/self/mem: .*", string(msg)); err != nil || !matched {
+ t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
+ }
+}
+
+func TestReadAllOrWarnSuccess(t *testing.T) {
+ rep := Reporter{Logger: log.New(os.Stderr, "", 0)}
+
+ f, err := os.Open("./crunchstat_test.go")
+ if err != nil {
+ t.Fatalf("Opening ./crunchstat_test.go: %s", err)
+ }
+ data, err := rep.readAllOrWarn(f)
+ if err != nil {
+ t.Fatalf("got error %s", err)
+ }
+ if matched, err := regexp.MatchString("^package crunchstat\n", string(data)); err != nil || !matched {
+ t.Fatalf("data failed regexp: err %v, matched %v", err, matched)
+ }
+}
useruuid = self.api.users().current().execute()["uuid"]
self.project_uuid = kwargs.get("project_uuid") if kwargs.get("project_uuid") else useruuid
self.pipeline = None
+ self.fs_access = CollectionFsAccess(kwargs["basedir"], api_client=self.api)
if kwargs.get("create_template"):
tmpl = RunnerTemplate(self, tool, job_order, kwargs.get("enable_reuse"))
self.debug = kwargs.get("debug")
self.ignore_docker_for_reuse = kwargs.get("ignore_docker_for_reuse")
- self.fs_access = CollectionFsAccess(kwargs["basedir"])
kwargs["fs_access"] = self.fs_access
kwargs["enable_reuse"] = kwargs.get("enable_reuse")
+ kwargs["use_container"] = True
+ kwargs["tmpdir_prefix"] = "tmp"
+ kwargs["on_error"] = "continue"
if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["docker_outdir"] = "/var/spool/cwl"
kwargs["tmpdir"] = "/tmp"
elif self.work_api == "jobs":
kwargs["outdir"] = "$(task.outdir)"
+ kwargs["docker_outdir"] = "$(task.outdir)"
kwargs["tmpdir"] = "$(task.tmpdir)"
runnerjob = None
self.uuid = kwargs.get("cwl_runner_job").get('uuid')
jobiter = tool.job(job_order,
self.output_callback,
- docker_outdir="$(task.outdir)",
**kwargs)
try:
if self.final_status == "UnsupportedRequirement":
raise UnsupportedRequirement("Check log for details.")
+ if self.final_status != "success":
+ raise WorkflowException("Workflow failed.")
+
if self.final_output is None:
raise WorkflowException("Workflow did not return a result.")
return 1
arvargs.conformance_test = None
+ arvargs.use_container = True
return cwltool.main.main(args=arvargs,
stdout=stdout,
import os
from cwltool.errors import WorkflowException
-from cwltool.process import get_feature, adjustFiles, UnsupportedRequirement, shortname
+from cwltool.process import get_feature, UnsupportedRequirement, shortname
+from cwltool.pathmapper import adjustFiles
import arvados.collection
}
}
+ dirs = set()
for f in self.pathmapper.files():
- _, p = self.pathmapper.mapper(f)
- mounts[p] = {
- "kind": "collection",
- "portable_data_hash": p[6:]
- }
+ _, p, tp = self.pathmapper.mapper(f)
+ if tp == "Directory" and '/' not in p[6:]:
+ mounts[p] = {
+ "kind": "collection",
+ "portable_data_hash": p[6:]
+ }
+ dirs.add(p[6:])
+ for f in self.pathmapper.files():
+ _, p, tp = self.pathmapper.mapper(f)
+ if p[6:].split("/")[0] not in dirs:
+ mounts[p] = {
+ "kind": "collection",
+ "portable_data_hash": p[6:]
+ }
- if self.generatefiles:
+ if self.generatefiles["listing"]:
raise UnsupportedRequirement("Generate files not supported")
container_request["environment"] = {"TMPDIR": "/tmp"}
if self.stdin:
raise UnsupportedRequirement("Stdin redirection currently not suppported")
+ if self.stderr:
+            raise UnsupportedRequirement("Stderr redirection currently not supported")
+
if self.stdout:
mounts["stdout"] = {"kind": "file",
"path": "%s/%s" % (self.outdir, self.stdout)}
from cwltool.process import get_feature, shortname
from cwltool.errors import WorkflowException
-from cwltool.draft2tool import revmap_file, remove_hostfs, CommandLineTool
+from cwltool.draft2tool import revmap_file, CommandLineTool
from cwltool.load_tool import fetch_document
from cwltool.builder import Builder
from .arvdocker import arv_docker_get_image
from .runner import Runner
+from .pathmapper import InitialWorkDirPathMapper
from . import done
logger = logging.getLogger('arvados.cwl-runner')
}
runtime_constraints = {}
- if self.generatefiles:
+ if self.generatefiles["listing"]:
vwd = arvados.collection.Collection()
script_parameters["task.vwd"] = {}
- for t in self.generatefiles:
- if isinstance(self.generatefiles[t], dict):
- src, rest = self.arvrunner.fs_access.get_collection(self.generatefiles[t]["path"].replace("$(task.keep)/", "keep:"))
- vwd.copy(rest, t, source_collection=src)
- else:
- with vwd.open(t, "w") as f:
- f.write(self.generatefiles[t].encode('utf-8'))
+ generatemapper = InitialWorkDirPathMapper([self.generatefiles], "", "",
+ separateDirs=False)
+ for f, p in generatemapper.items():
+ if p.type == "CreateFile":
+ with vwd.open(p.target, "w") as n:
+ n.write(p.resolved.encode("utf-8"))
vwd.save_new()
- for t in self.generatefiles:
- script_parameters["task.vwd"][t] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), t)
+ for f, p in generatemapper.items():
+ if p.type == "File":
+ script_parameters["task.vwd"][p.target] = p.resolved
+ if p.type == "CreateFile":
+ script_parameters["task.vwd"][p.target] = "$(task.keep)/%s/%s" % (vwd.portable_data_hash(), p.target)
script_parameters["task.env"] = {"TMPDIR": "$(task.tmpdir)"}
if self.environment:
script_parameters["task.env"].update(self.environment)
if self.stdin:
- script_parameters["task.stdin"] = self.pathmapper.mapper(self.stdin)[1]
+ script_parameters["task.stdin"] = self.stdin
if self.stdout:
script_parameters["task.stdout"] = self.stdout
+ if self.stderr:
+ script_parameters["task.stderr"] = self.stderr
+
+ if self.successCodes:
+ script_parameters["task.successCodes"] = self.successCodes
+ if self.temporaryFailCodes:
+ script_parameters["task.temporaryFailCodes"] = self.temporaryFailCodes
+ if self.permanentFailCodes:
+ script_parameters["task.permanentFailCodes"] = self.permanentFailCodes
+
(docker_req, docker_is_req) = get_feature(self, "DockerRequirement")
if docker_req and kwargs.get("use_container") is not False:
runtime_constraints["docker_image"] = arv_docker_get_image(self.arvrunner.api, docker_req, pull_image, self.arvrunner.project_uuid)
# Title and description...
title = param.pop('label', '')
- descr = param.pop('description', '').rstrip('\n')
+ descr = param.pop('doc', '').rstrip('\n')
if title:
param['title'] = title
if descr:
pass
elif not isinstance(value, dict):
param['value'] = value
- elif param.get('dataclass') == 'File' and value.get('path'):
- param['value'] = value['path']
+ elif param.get('dataclass') == 'File' and value.get('location'):
+ param['value'] = value['location']
spec['script_parameters'][param_id] = param
spec['script_parameters']['cwl:tool'] = job_params['cwl:tool']
elif self.work_api == "jobs":
return ArvadosJob(self.arvrunner)
- def makePathMapper(self, reffiles, **kwargs):
+ def makePathMapper(self, reffiles, stagedir, **kwargs):
+ # type: (List[Any], unicode, **Any) -> PathMapper
if self.work_api == "containers":
return ArvPathMapper(self.arvrunner, reffiles, kwargs["basedir"],
"/keep/%s",
def job(self, joborder, output_callback, **kwargs):
if self.work_api == "containers":
kwargs["outdir"] = "/var/spool/cwl"
+ kwargs["docker_outdir"] = "/var/spool/cwl"
elif self.work_api == "jobs":
kwargs["outdir"] = "$(task.outdir)"
+ kwargs["docker_outdir"] = "$(task.outdir)"
return super(ArvadosCommandTool, self).job(joborder, output_callback, **kwargs)
import os
import cwltool.process
+from cwltool.pathmapper import abspath
import arvados.util
import arvados.collection
+import arvados.arvfile
class CollectionFsAccess(cwltool.process.StdFsAccess):
"""Implement the cwltool FsAccess interface for Arvados Collections."""
- def __init__(self, basedir):
+ def __init__(self, basedir, api_client=None):
super(CollectionFsAccess, self).__init__(basedir)
+ self.api_client = api_client
self.collections = {}
def get_collection(self, path):
if p[0].startswith("keep:") and arvados.util.keep_locator_pattern.match(p[0][5:]):
pdh = p[0][5:]
if pdh not in self.collections:
- self.collections[pdh] = arvados.collection.CollectionReader(pdh)
+ self.collections[pdh] = arvados.collection.CollectionReader(pdh, api_client=self.api_client)
return (self.collections[pdh], "/".join(p[1:]))
else:
return (None, path)
def glob(self, pattern):
collection, rest = self.get_collection(pattern)
+ if collection and not rest:
+ return [pattern]
patternsegments = rest.split("/")
return self._match(collection, patternsegments, "keep:" + collection.manifest_locator())
if collection:
return collection.open(rest, mode)
else:
- return open(self._abs(fn), mode)
+ return super(CollectionFsAccess, self).open(self._abs(fn), mode)
def exists(self, fn):
collection, rest = self.get_collection(fn)
if collection:
return collection.exists(rest)
else:
- return os.path.exists(self._abs(fn))
+ return super(CollectionFsAccess, self).exists(fn)
+
+ def isfile(self, fn): # type: (unicode) -> bool
+ collection, rest = self.get_collection(fn)
+ if collection:
+ if rest:
+ return isinstance(collection.find(rest), arvados.arvfile.ArvadosFile)
+ else:
+ return False
+ else:
+ return super(CollectionFsAccess, self).isfile(fn)
+
+ def isdir(self, fn): # type: (unicode) -> bool
+ collection, rest = self.get_collection(fn)
+ if collection:
+ if rest:
+ return isinstance(collection.find(rest), arvados.collection.Collection)
+ else:
+ return True
+ else:
+ return super(CollectionFsAccess, self).isdir(fn)
+
+ def listdir(self, fn): # type: (unicode) -> List[unicode]
+ collection, rest = self.get_collection(fn)
+ if rest:
+ dir = collection.find(rest)
+ else:
+ dir = collection
+ if collection:
+ return [abspath(l, fn) for l in dir.keys()]
+ else:
+ return super(CollectionFsAccess, self).listdir(fn)
+
+ def join(self, path, *paths): # type: (unicode, *unicode) -> unicode
+ if paths and paths[-1].startswith("keep:") and arvados.util.keep_locator_pattern.match(paths[-1][5:]):
+ return paths[-1]
+ return os.path.join(path, *paths)
import re
+import logging
+import uuid
import arvados.commands.run
import arvados.collection
-import cwltool.pathmapper
-class ArvPathMapper(cwltool.pathmapper.PathMapper):
+from cwltool.pathmapper import PathMapper, MapperEnt, abspath
+from cwltool.workflow import WorkflowException
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+class ArvPathMapper(PathMapper):
"""Convert container-local paths to and from Keep collection ids."""
+ pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+$')
+ pdh_dirpath = re.compile(r'^keep:[0-9a-f]{32}\+\d+(/.+)?$')
+
def __init__(self, arvrunner, referenced_files, input_basedir,
collection_pattern, file_pattern, name=None, **kwargs):
- self._pathmap = arvrunner.get_uploaded()
- uploadfiles = set()
-
- pdh_path = re.compile(r'^keep:[0-9a-f]{32}\+\d+/.+')
+ self.arvrunner = arvrunner
+ self.input_basedir = input_basedir
+ self.collection_pattern = collection_pattern
+ self.file_pattern = file_pattern
+ self.name = name
+ super(ArvPathMapper, self).__init__(referenced_files, input_basedir, None)
- for src in referenced_files:
- if isinstance(src, basestring) and pdh_path.match(src):
- self._pathmap[src] = (src, collection_pattern % src[5:])
+ def visit(self, srcobj, uploadfiles):
+ src = srcobj["location"]
+ if srcobj["class"] == "File":
if "#" in src:
src = src[:src.index("#")]
+ if isinstance(src, basestring) and ArvPathMapper.pdh_path.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "File")
if src not in self._pathmap:
- ab = cwltool.pathmapper.abspath(src, input_basedir)
- st = arvados.commands.run.statfile("", ab, fnPattern=file_pattern)
- if kwargs.get("conformance_test"):
- self._pathmap[src] = (src, ab)
- elif isinstance(st, arvados.commands.run.UploadFile):
+ # Local FS ref, may need to be uploaded or may be on keep
+ # mount.
+ ab = abspath(src, self.input_basedir)
+ st = arvados.commands.run.statfile("", ab, fnPattern=self.file_pattern)
+ if isinstance(st, arvados.commands.run.UploadFile):
uploadfiles.add((src, ab, st))
elif isinstance(st, arvados.commands.run.ArvFile):
- self._pathmap[src] = (ab, st.fn)
+ self._pathmap[src] = MapperEnt(ab, st.fn, "File")
+ elif src.startswith("_:") and "contents" in srcobj:
+ pass
else:
- raise cwltool.workflow.WorkflowException("Input file path '%s' is invalid" % st)
+ raise WorkflowException("Input file path '%s' is invalid" % st)
+ if "secondaryFiles" in srcobj:
+ for l in srcobj["secondaryFiles"]:
+ self.visit(l, uploadfiles)
+ elif srcobj["class"] == "Directory":
+ if isinstance(src, basestring) and ArvPathMapper.pdh_dirpath.match(src):
+ self._pathmap[src] = MapperEnt(src, self.collection_pattern % src[5:], "Directory")
+ else:
+ for l in srcobj["listing"]:
+ self.visit(l, uploadfiles)
+
+ def addentry(self, obj, c, path, subdirs):
+ if obj["location"] in self._pathmap:
+ src, srcpath = self.arvrunner.fs_access.get_collection(self._pathmap[obj["location"]].resolved)
+ c.copy(srcpath, path + "/" + obj["basename"], source_collection=src, overwrite=True)
+ for l in obj.get("secondaryFiles", []):
+ self.addentry(l, c, path, subdirs)
+ elif obj["class"] == "Directory":
+ for l in obj["listing"]:
+ self.addentry(l, c, path + "/" + obj["basename"], subdirs)
+ subdirs.append((obj["location"], path + "/" + obj["basename"]))
+ elif obj["location"].startswith("_:") and "contents" in obj:
+ with c.open(path + "/" + obj["basename"], "w") as f:
+ f.write(obj["contents"].encode("utf-8"))
+ else:
+ raise WorkflowException("Don't know what to do with '%s'" % obj["location"])
+
+ def setup(self, referenced_files, basedir):
+ # type: (List[Any], unicode) -> None
+ self._pathmap = self.arvrunner.get_uploaded()
+ uploadfiles = set()
+
+ for srcobj in referenced_files:
+ self.visit(srcobj, uploadfiles)
if uploadfiles:
arvados.commands.run.uploadfiles([u[2] for u in uploadfiles],
- arvrunner.api,
- dry_run=kwargs.get("dry_run"),
- num_retries=3,
- fnPattern=file_pattern,
- name=name,
- project=arvrunner.project_uuid)
+ self.arvrunner.api,
+ dry_run=False,
+ num_retries=self.arvrunner.num_retries,
+ fnPattern=self.file_pattern,
+ name=self.name,
+ project=self.arvrunner.project_uuid)
for src, ab, st in uploadfiles:
- arvrunner.add_uploaded(src, (ab, st.fn))
- self._pathmap[src] = (ab, st.fn)
+ self._pathmap[src] = MapperEnt("keep:" + st.keepref, st.fn, "File")
+ self.arvrunner.add_uploaded(src, self._pathmap[src])
+
+ for srcobj in referenced_files:
+ if srcobj["class"] == "Directory":
+ if srcobj["location"] not in self._pathmap:
+ c = arvados.collection.Collection(api_client=self.arvrunner.api,
+ num_retries=self.arvrunner.num_retries)
+ subdirs = []
+ for l in srcobj["listing"]:
+ self.addentry(l, c, ".", subdirs)
+
+ check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+ if not check["items"]:
+ c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+ ab = self.collection_pattern % c.portable_data_hash()
+ self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "Directory")
+ for loc, sub in subdirs:
+ ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+ self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
+ elif srcobj["class"] == "File" and (srcobj.get("secondaryFiles") or
+ (srcobj["location"].startswith("_:") and "contents" in srcobj)):
+
+ c = arvados.collection.Collection(api_client=self.arvrunner.api,
+ num_retries=self.arvrunner.num_retries )
+ subdirs = []
+ self.addentry(srcobj, c, ".", subdirs)
+
+ check = self.arvrunner.api.collections().list(filters=[["portable_data_hash", "=", c.portable_data_hash()]], limit=1).execute(num_retries=self.arvrunner.num_retries)
+ if not check["items"]:
+ c.save_new(owner_uuid=self.arvrunner.project_uuid)
+
+ ab = self.file_pattern % (c.portable_data_hash(), srcobj["basename"])
+ self._pathmap[srcobj["location"]] = MapperEnt(ab, ab, "File")
+ if srcobj.get("secondaryFiles"):
+ ab = self.collection_pattern % c.portable_data_hash()
+ self._pathmap["_:" + unicode(uuid.uuid4())] = MapperEnt(ab, ab, "Directory")
+ for loc, sub in subdirs:
+ ab = self.file_pattern % (c.portable_data_hash(), sub[2:])
+ self._pathmap[loc] = MapperEnt(ab, ab, "Directory")
self.keepdir = None
return (target, "keep:" + target[len(self.keepdir)+1:])
else:
return super(ArvPathMapper, self).reversemap(target)
+
+class InitialWorkDirPathMapper(PathMapper):
+ def setup(self, referenced_files, basedir):
+ # type: (List[Any], unicode) -> None
+
+ # Go through each file and set the target to its own directory along
+ # with any secondary files.
+ stagedir = self.stagedir
+ for fob in referenced_files:
+ self.visit(fob, stagedir, basedir)
+
+ for path, (ab, tgt, type) in self._pathmap.items():
+ if type in ("File", "Directory") and ab.startswith("keep:"):
+ self._pathmap[path] = MapperEnt("$(task.keep)/%s" % ab[5:], tgt, type)
from functools import partial
import logging
import json
+import re
+import cwltool.draft2tool
from cwltool.draft2tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import get_feature, scandeps, adjustFiles, UnsupportedRequirement
+from cwltool.process import get_feature, scandeps, UnsupportedRequirement, normalizeFilesDirs
from cwltool.load_tool import fetch_document
+from cwltool.pathmapper import adjustFileObjs, adjustDirObjs
import arvados.collection
logger = logging.getLogger('arvados.cwl-runner')
+cwltool.draft2tool.ACCEPTLIST_RE = re.compile(r"^[a-zA-Z0-9._+-]+$")
+
class Runner(object):
def __init__(self, runner, tool, job_order, enable_reuse):
self.arvrunner = runner
def arvados_job_spec(self, *args, **kwargs):
self.upload_docker(self.tool)
- workflowfiles = set()
- jobfiles = set()
- workflowfiles.add(self.tool.tool["id"])
+ workflowfiles = []
+ jobfiles = []
+ workflowfiles.append({"class":"File", "location": self.tool.tool["id"]})
self.name = os.path.basename(self.tool.tool["id"])
def visitFiles(files, path):
- files.add(path)
- return path
+ files.append(path)
document_loader, workflowobj, uri = fetch_document(self.tool.tool["id"])
loaded = set()
sc = scandeps(uri, workflowobj,
set(("$import", "run")),
- set(("$include", "$schemas", "path")),
+ set(("$include", "$schemas", "path", "location")),
loadref)
- adjustFiles(sc, partial(visitFiles, workflowfiles))
- adjustFiles(self.job_order, partial(visitFiles, jobfiles))
+ adjustFileObjs(sc, partial(visitFiles, workflowfiles))
+ adjustFileObjs(self.job_order, partial(visitFiles, jobfiles))
+ adjustDirObjs(sc, partial(visitFiles, workflowfiles))
+ adjustDirObjs(self.job_order, partial(visitFiles, jobfiles))
+
+ normalizeFilesDirs(jobfiles)
+ normalizeFilesDirs(workflowfiles)
keepprefix = kwargs.get("keepprefix", "")
workflowmapper = ArvPathMapper(self.arvrunner, workflowfiles, "",
name=os.path.basename(self.job_order.get("id", "#")),
**kwargs)
- adjustFiles(self.job_order, lambda p: jobmapper.mapper(p)[1])
+ def setloc(p):
+ p["location"] = jobmapper.mapper(p["location"])[1]
+ adjustFileObjs(self.job_order, setloc)
+ adjustDirObjs(self.job_order, setloc)
if "id" in self.job_order:
del self.job_order["id"]
outc = arvados.collection.Collection(record["output"])
with outc.open("cwl.output.json") as f:
outputs = json.load(f)
- def keepify(path):
+ def keepify(fileobj):
+ path = fileobj["location"]
if not path.startswith("keep:"):
- return "keep:%s/%s" % (record["output"], path)
- else:
- return path
- adjustFiles(outputs, keepify)
+ fileobj["location"] = "keep:%s/%s" % (record["output"], path)
+ adjustFileObjs(outputs, keepify)
+ adjustDirObjs(outputs, keepify)
except Exception as e:
logger.error("While getting final output object: %s", e)
self.arvrunner.output_callback(outputs, processStatus)
'bin/cwl-runner',
'bin/arvados-cwl-runner'
],
+ # Make sure to update arvados/build/run-build-packages.sh as well
+ # when updating the cwltool version pin.
install_requires=[
- 'cwltool==1.0.20160630171631',
+ 'cwltool==1.0.20160714182449',
'arvados-python-client>=0.1.20160322001610'
],
data_files=[
#!/bin/sh
+set -x
+
if ! which arvbox >/dev/null ; then
export PATH=$PATH:$(readlink -f $(dirname $0)/../../tools/arvbox/bin)
fi
leave_running=0
config=dev
docker_pull=1
+tag=""
while test -n "$1" ; do
arg="$1"
docker_pull=0
shift
;;
+ --tag)
+ tag=$2
+ shift ; shift
+ ;;
-h|--help)
- echo "$0 [--no-reset-container] [--leave-running] [--no-docker-pull] [--config dev|localdemo]"
+ echo "$0 [--no-reset-container] [--leave-running] [--no-docker-pull] [--config dev|localdemo] [--tag docker_tag]"
exit
;;
*)
arvbox reset -f
fi
-arvbox start $config
+arvbox start $config $tag
arvbox pipe <<EOF
set -eu -o pipefail
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
if test $docker_pull = 1 ; then
- arv-keepdocker --pull arvados/jobs
+ arv-keepdocker --pull arvados/jobs $tag
fi
cat >/tmp/cwltest/arv-cwl-jobs <<EOF2
stubs.api.collections().create().execute.side_effect = ({
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz1",
"portable_data_hash": "99999999999999999999999999999991+99",
+ "manifest_text": ""
}, {
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz2",
"portable_data_hash": "99999999999999999999999999999992+99",
+ "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"
},
{
"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz4",
"portable_data_hash": "99999999999999999999999999999994+99",
"manifest_text": ""
- })
+ },
+ {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz5",
+ "portable_data_hash": "99999999999999999999999999999995+99",
+ "manifest_text": ""
+ } )
stubs.api.collections().get().execute.return_value = {
- "portable_data_hash": "99999999999999999999999999999993+99"}
+ "portable_data_hash": "99999999999999999999999999999993+99", "manifest_text": "./tool 00000000000000000000000000000000+0 0:0:submit_tool.cwl 0:0:blub.txt"}
stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
stubs.api.jobs().create().execute.return_value = {
},
'script_parameters': {
'x': {
- 'path': '99999999999999999999999999999992+99/blorp.txt',
+ 'basename': 'blorp.txt',
+ 'location': '99999999999999999999999999999994+99/blorp.txt',
'class': 'File'
},
'cwl:tool':
'kind': 'file'
},
'/var/lib/cwl/job/cwl.input.json': {
- 'portable_data_hash': '33be5c865fe12e1e4788d2f1bc627f7a+60/cwl.input.json',
+ 'portable_data_hash': '765fda0d9897729ff467a4609879c00a+60/cwl.input.json',
'kind': 'collection'
}
},
mock.call(),
mock.call(body={
'manifest_text':
- './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
- '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
- 'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+ './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
+ '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
+ '4d31c5fefd087faf67ca8db0111af36c+353 0:353:submit_wf.cwl\n',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
'name': 'submit_wf.cwl',
}, ensure_unique_name=True),
mock.call().execute(),
+ mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
+ '0:0:blub.txt 0:0:submit_tool.cwl\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': 'New collection'},
+ ensure_unique_name=True),
+ mock.call().execute(num_retries=4),
mock.call(body={
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
mock.call(),
mock.call(body={
'manifest_text':
- './tool a3954c369b8924d40547ec8cf5f6a7f4+449 '
- '0:16:blub.txt 16:433:submit_tool.cwl\n./wf '
- 'e046cace0b1a0a6ee645f6ea8688f7e2+364 0:364:submit_wf.cwl\n',
+ './tool d51232d96b6116d964a69bfb7e0c73bf+450 '
+ '0:16:blub.txt 16:434:submit_tool.cwl\n./wf '
+ '4d31c5fefd087faf67ca8db0111af36c+353 0:353:submit_wf.cwl\n',
'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
'name': 'submit_wf.cwl',
}, ensure_unique_name=True),
mock.call().execute(),
+ mock.call(body={'manifest_text': '. d41d8cd98f00b204e9800998ecf8427e+0 '
+ '0:0:blub.txt 0:0:submit_tool.cwl\n',
+ 'owner_uuid': 'zzzzz-tpzed-zzzzzzzzzzzzzzz',
+ 'name': 'New collection'},
+ ensure_unique_name=True),
+ mock.call().execute(num_retries=4),
mock.call(body={
'manifest_text':
'. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
capture_stdout = cStringIO.StringIO()
exited = arvados_cwl.main(
- ["--create-template", "--no-wait",
+ ["--create-template", "--no-wait", "--debug",
"--project-uuid", project_uuid,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
capture_stdout, sys.stderr, api_client=stubs.api)
'dataclass': 'File',
'required': True,
'type': 'File',
- 'value': '99999999999999999999999999999992+99/blorp.txt',
+ 'value': '99999999999999999999999999999994+99/blorp.txt',
}
expect_template = {
"components": {
expect_template["owner_uuid"] = stubs.fake_user_uuid
params = expect_template[
"components"]["inputs_test.cwl"]["script_parameters"]
- params["fileInput"]["value"] = '99999999999999999999999999999992+99/blorp.txt'
+ params["fileInput"]["value"] = '99999999999999999999999999999994+99/blorp.txt'
params["floatInput"]["value"] = 1.234
params["boolInput"]["value"] = True
# value blub.txt) and uploading to Keep works as intended.
class: CommandLineTool
-cwlVersion: draft-3
+cwlVersion: v1.0
requirements:
- class: DockerRequirement
dockerPull: debian:8
type: File
default:
class: File
- path: blub.txt
+ location: blub.txt
inputBinding:
position: 1
outputs: []
# various input types as script_parameters in pipeline templates.
class: Workflow
-cwlVersion: draft-3
+cwlVersion: v1.0
inputs:
- id: "#fileInput"
type: File
label: It's a file; we expect to find some characters in it.
- description: |
+ doc: |
If there were anything further to say, it would be said here,
or here.
- id: "#boolInput"
outputs: []
steps:
- id: step1
- inputs:
- - { id: x, source: "#x" }
- outputs: []
+ in:
+ - { id: x, source: "#fileInput" }
+ out: []
run: ../tool/submit_tool.cwl
# (e.g. submit_tool.cwl) and uploading to Keep works as intended.
class: Workflow
-cwlVersion: draft-3
+cwlVersion: v1.0
inputs:
- id: x
type: File
outputs: []
steps:
- id: step1
- inputs:
+ in:
- { id: x, source: "#x" }
- outputs: []
+ out: []
run: ../tool/submit_tool.cwl
},
},
{
- in: map[string]interface{}{"foo": map[string]interface{}{"bar":1.234}},
+ in: map[string]interface{}{"foo": map[string]interface{}{"bar": 1.234}},
ok: func(out url.Values) bool {
return out.Get("foo") == `{"bar":1.234}`
},
// us about a stored block.
type KeepServiceIndexEntry struct {
SizedDigest
+ // Time of last write, in nanoseconds since Unix epoch
Mtime int64
}
if err != nil {
return nil, fmt.Errorf("Malformed index line %q: mtime: %v", line, err)
}
+ if mtime < 1e12 {
+ // An old version of keepstore is giving us
+ // timestamps in seconds instead of
+ // nanoseconds. (This threshold correctly
+ // handles all times between 1970-01-02 and
+ // 33658-09-27.)
+ mtime = mtime * 1e9
+ }
entries = append(entries, KeepServiceIndexEntry{
SizedDigest: SizedDigest(fields[0]),
Mtime: mtime,
Env map[string]string `json:"task.env"`
Stdin string `json:"task.stdin"`
Stdout string `json:"task.stdout"`
+ Stderr string `json:"task.stderr"`
Vwd map[string]string `json:"task.vwd"`
SuccessCodes []int `json:"task.successCodes"`
PermanentFailCodes []int `json:"task.permanentFailCodes"`
return nil
}
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
if taskp.Vwd != nil {
for k, v := range taskp.Vwd {
v = substitute(v, replacements)
err = checkOutputFilename(outdir, k)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
os.Symlink(v, outdir+"/"+k)
}
stdin = substitute(taskp.Stdin, replacements)
cmd.Stdin, err = os.Open(stdin)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
}
if taskp.Stdout != "" {
err = checkOutputFilename(outdir, taskp.Stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
// Set up stdout redirection
stdout = outdir + "/" + taskp.Stdout
cmd.Stdout, err = os.Create(stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
} else {
cmd.Stdout = os.Stdout
}
- cmd.Stderr = os.Stderr
+ if taskp.Stderr != "" {
+ err = checkOutputFilename(outdir, taskp.Stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ // Set up stderr redirection
+ stderr = outdir + "/" + taskp.Stderr
+ cmd.Stderr, err = os.Create(stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ } else {
+ cmd.Stderr = os.Stderr
+ }
if taskp.Env != nil {
// Set up subprocess environment
cmd.Env = append(cmd.Env, k+"="+v)
}
}
- return stdin, stdout, nil
+ return stdin, stdout, stderr, nil
}
// Set up signal handlers. Go sends signal notifications to a "signal
cmd.Dir = outdir
- var stdin, stdout string
- stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+ var stdin, stdout, stderr string
+ stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
if err != nil {
return err
}
if stdout != "" {
stdout = " > " + stdout
}
- log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+ if stderr != "" {
+ stderr = " 2> " + stderr
+ }
+ log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
var caughtSignal os.Signal
sigChan := setupSignals(cmd)
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"}}}}},
Task{Sequence: 0})
c.Check(err, IsNil)
tmpdir,
"",
Job{Script_parameters: Tasks{[]TaskDef{
- TaskDef{Command: []string{"echo", "bar"}},
- TaskDef{Command: []string{"echo", "foo"}}}}},
+ {Command: []string{"echo", "bar"}},
+ {Command: []string{"echo", "foo"}}}}},
Task{Parameters: TaskDef{
Command: []string{"echo", "foo"},
Stdout: "output.txt"},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"cat"},
Stdout: "output.txt",
Stdin: tmpfile.Name()}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $BAR"},
Stdout: "output.txt",
Env: map[string]string{"BAR": "foo"}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"foo\n",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $BAR"},
Stdout: "output.txt",
Env: map[string]string{"BAR": "$(task.keep)"}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "echo $PATH"},
Stdout: "output.txt",
Env: map[string]string{"PATH": "foo"}}}}},
func (s *TestSuite) TestScheduleSubtask(c *C) {
api := SubtaskTestClient{c, []Task{
- Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+ {Job_uuid: "zzzz-8i9sb-111111111111111",
Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
Sequence: 1,
Parameters: TaskDef{
Command: []string{"echo", "bar"}}},
- Task{Job_uuid: "zzzz-8i9sb-111111111111111",
+ {Job_uuid: "zzzz-8i9sb-111111111111111",
Created_by_job_task_uuid: "zzzz-ot0gb-111111111111111",
Sequence: 1,
Parameters: TaskDef{
tmpdir,
"",
Job{Script_parameters: Tasks{[]TaskDef{
- TaskDef{Command: []string{"echo", "bar"}},
- TaskDef{Command: []string{"echo", "foo"}}}}},
+ {Command: []string{"echo", "bar"}},
+ {Command: []string{"echo", "foo"}}}}},
Task{Sequence: 0})
c.Check(err, IsNil)
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"}}}}},
Task{Sequence: 0})
c.Check(err, FitsTypeOf, PermFail{})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"},
SuccessCodes: []int{0, 1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 0"},
PermanentFailCodes: []int{0, 1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"/bin/sh", "-c", "exit 1"},
TemporaryFailCodes: []int{1}}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"ls", "output.txt"},
Vwd: map[string]string{
"output.txt": tmpfile.Name()}}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
keepmount,
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"cat"},
Stdout: "output.txt",
Stdin: "$(task.keep)/file1.txt"}}}},
"zzzz-ot0gb-111111111111111",
tmpdir,
keepmount,
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"cat", "$(task.keep)/file1.txt"},
Stdout: "output.txt"}}}},
Task{Sequence: 0})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"sleep", "4"}}}}},
Task{Sequence: 0})
c.Check(err, FitsTypeOf, PermFail{})
"zzzz-ot0gb-111111111111111",
tmpdir,
"",
- Job{Script_parameters: Tasks{[]TaskDef{TaskDef{
+ Job{Script_parameters: Tasks{[]TaskDef{{
Command: []string{"echo", "foo"},
Stdout: "s ub:dir/:e vi\nl"}}}},
Task{Sequence: 0})
// In case we exited the above loop early: before returning,
// drain the toGet channel so its sender doesn't sit around
// blocking forever.
- for _ = range r.toGet {
+ for range r.toGet {
}
}
pdh = item["portable_data_hash"]
for c in files:
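+ # Keep a plain "<pdh>/<filename>" reference before c.fn is rewritten with fnPattern below.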
+ c.keepref = "%s/%s" % (pdh, c.fn)
c.fn = fnPattern % (pdh, c.fn)
os.chdir(orgdir)
import re
import socket
import ssl
+import sys
import threading
import timer
global_client_object = None
+# Monkey patch TCP constants when not available (macOS). Values sourced from:
+# http://www.opensource.apple.com/source/xnu/xnu-2422.115.4/bsd/netinet/tcp.h
+if sys.platform == 'darwin':
+ if not hasattr(socket, 'TCP_KEEPALIVE'):
+ socket.TCP_KEEPALIVE = 0x010
+ if not hasattr(socket, 'TCP_KEEPINTVL'):
+ socket.TCP_KEEPINTVL = 0x101
+ if not hasattr(socket, 'TCP_KEEPCNT'):
+ socket.TCP_KEEPCNT = 0x102
+
+
class KeepLocator(object):
EPOCH_DATETIME = datetime.datetime.utcfromtimestamp(0)
HINT_RE = re.compile(r'^[A-Z][A-Za-z0-9@_-]+$')
"""Because pycurl doesn't have CURLOPT_TCP_KEEPALIVE"""
s = socket.socket(family, socktype, protocol)
s.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
- s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
+ # On macOS this setsockopt call would raise an "invalid protocol" error; the hasattr check prevents that.
+ if hasattr(socket, 'TCP_KEEPIDLE'):
+ s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 75)
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 75)
return s
raise
os.mkfifo(fifo, 0700)
subprocess.Popen(
- ['sed', '-e', 's/^/['+label+'] /', fifo],
+ ['stdbuf', '-i0', '-oL', '-eL', 'sed', '-e', 's/^/['+label+'] /', fifo],
stdout=sys.stderr)
return fifo
request_orders = @orders.clone
@orders = []
- [Group,
- Job, PipelineInstance, PipelineTemplate,
+ request_filters = @filters
+
+ klasses = [Group,
+ Job, PipelineInstance, PipelineTemplate, ContainerRequest,
Collection,
- Human, Specimen, Trait].each do |klass|
+ Human, Specimen, Trait]
+
+ table_names = klasses.map(&:table_name)
+ request_filters.each do |col, op, val|
+ if col.index('.') && !table_names.include?(col.split('.', 2)[0])
+ raise ArgumentError.new("Invalid attribute '#{col}' in filter")
+ end
+ end
+
+ klasses.each do |klass|
# If the currently requested orders specifically match the
# table_name for the current klass, apply that order.
# Otherwise, order by recency.
where_conds[:group_class] = "project"
end
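+ # Pass unqualified filters through as-is; strip the table prefix from
+ # filters qualified for this klass, and drop filters aimed at other tables.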
+ @filters = request_filters.map do |col, op, val|
+ if !col.index('.')
+ [col, op, val]
+ elsif (col = col.split('.', 2))[0] == klass.table_name
+ [col[1], op, val]
+ else
+ nil
+ end
+ end.compact
+
@objects = klass.readable_by(*@read_users).
order(request_order).where(where_conds)
@limit = limit_all - all_objects.count
begin
# Must have at least one filter set up to receive events
if ws.filters.length > 0
- # Start with log rows readable by user, sorted in ascending order
- logs = Log.readable_by(ws.user).order("id asc")
+ # Start with log rows readable by user
+ logs = Log.readable_by(ws.user)
cond_id = nil
cond_out = []
logs = logs.where(cond_id, *param_out)
end
- # Execute query and actually send the matching log rows
- logs.each do |l|
+ # Execute query and actually send the matching log rows. Load
+ # the full log records only when we're ready to send them,
+ # though: otherwise, (1) postgres has to build the whole
+ # result set and return it to us before we can send the first
+ # event, and (2) we store lots of records in memory while
+ # waiting to spool them out to the client. Both of these are
+ # troublesome when log records are large (e.g., a collection
+ # update contains both old and new manifest_text).
+ #
+ # Note: find_each implies order('id asc'), which is what we
+ # want.
+ logs.select(:id).find_each do |l|
if not ws.sent_ids.include?(l.id)
# only send if not a duplicate
- ws.send(l.as_api_response.to_json)
+ ws.send(Log.find(l.id).as_api_response.to_json)
end
if not ws.last_log_id.nil?
# record ids only when sending "catchup" messages, not notifies
output_path: test
command: ["echo", "hello"]
requesting_container_uuid: zzzzz-dz642-requestercntnr1
+
+running_anonymous_accessible:
+ uuid: zzzzz-xvhdp-runninganonaccs
+ owner_uuid: zzzzz-j7d0g-zhxawtyetzwc5f0
+ name: running anonymously accessible cr
+ state: Committed
+ priority: 1
+ created_at: 2016-01-11 11:11:11.111111111 Z
+ updated_at: 2016-01-11 11:11:11.111111111 Z
+ modified_at: 2016-01-11 11:11:11.111111111 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ container_image: test
+ cwd: test
+ output_path: test
+ command: ["echo", "hello"]
+ container_uuid: zzzzz-dz642-runningcontain2
+
+# Test Helper trims the rest of the file
+
+# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
+
+# container requests in project_with_2_pipelines_and_60_crs
+<% for i in 1..60 do %>
+cr_<%=i%>_of_60:
+ uuid: zzzzz-xvhdp-oneof60crs<%= i.to_s.rjust(5, '0') %>
+ created_at: <%= ((i+5)/5).hour.ago.to_s(:db) %>
+ owner_uuid: zzzzz-j7d0g-nnncrspipelines
+ name: cr-<%= i.to_s %>
+ output_path: test
+ command: ["echo", "hello"]
+<% end %>
+
+# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
description: project with 10 pipelines
group_class: project
-project_with_2_pipelines_and_60_jobs:
- uuid: zzzzz-j7d0g-nnjobspipelines
+project_with_2_pipelines_and_60_crs:
+ uuid: zzzzz-j7d0g-nnncrspipelines
owner_uuid: zzzzz-tpzed-user1withloadab
created_at: 2014-04-21 15:37:48 -0400
modified_by_client_uuid: zzzzz-ozdt8-brczlopd8u8d0jr
modified_by_user_uuid: zzzzz-tpzed-user1withloadab
modified_at: 2014-04-21 15:37:48 -0400
updated_at: 2014-04-21 15:37:48 -0400
- name: project with 2 pipelines and 60 jobs
+ name: project with 2 pipelines and 60 crs
description: This will result in two pages in the display
group_class: project
components:
component1: zzzzz-8i9sb-jyq01m7in1jlofj
component2: zzzzz-d1hrv-partdonepipelin
-
-# Test Helper trims the rest of the file
-
-# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
-
-# jobs in project_with_2_pipelines_and_60_jobs
-<% for i in 1..60 do %>
-job_<%=i%>_of_60:
- uuid: zzzzz-8i9sb-oneof100jobs<%= i.to_s.rjust(3, '0') %>
- created_at: <%= ((i+5)/5).minute.ago.to_s(:db) %>
- owner_uuid: zzzzz-j7d0g-nnjobspipelines
- script_version: 7def43a4d3f20789dda4700f703b5514cc3ed250
- state: Complete
-<% end %>
-
-# Do not add your fixtures below this line as the rest of this file will be trimmed by test_helper
title: foo instance input
<% end %>
-# pipelines in project_with_2_pipelines_and_100_jobs
+# pipelines in project_with_2_pipelines_and_60_crs
<% for i in 1..2 do %>
-pipeline_<%=i%>_of_2_pipelines_and_100_jobs:
+pipeline_<%=i%>_of_2_pipelines_and_60_crs:
name: pipeline_<%= i %>
state: New
uuid: zzzzz-d1hrv-abcgneyn6brx<%= i.to_s.rjust(3, '0') %>
- owner_uuid: zzzzz-j7d0g-nnjobspipelines
+ owner_uuid: zzzzz-j7d0g-nnncrspipelines
created_at: <%= i.minute.ago.to_s(:db) %>
components:
foo:
end
assert_equal true, found_projects.include?(groups(:starred_and_shared_active_user_project).uuid)
end
+
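+ # Each case: [filter, expected response code, uuid expected in results,
+ # uuid expected to be absent]; the uuids are omitted for error cases.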
+ [
+ [['owner_uuid', '!=', 'zzzzz-tpzed-xurymjxw79nv3jz'], 200,
+ 'zzzzz-d1hrv-subprojpipeline', 'zzzzz-d1hrv-1xfj6xkicf2muk2'],
+ [["pipeline_instances.state", "not in", ["Complete", "Failed"]], 200,
+ 'zzzzz-d1hrv-1xfj6xkicf2muk2', 'zzzzz-d1hrv-i3e77t9z5y8j9cc'],
+ [['container_requests.requesting_container_uuid', '=', nil], 200,
+ 'zzzzz-xvhdp-cr4queuedcontnr', 'zzzzz-xvhdp-cr4requestercn2'],
+ [['container_requests.no_such_column', '=', nil], 422],
+ [['container_requests.', '=', nil], 422],
+ [['.requesting_container_uuid', '=', nil], 422],
+ [['no_such_table.uuid', '!=', 'zzzzz-tpzed-xurymjxw79nv3jz'], 422],
+ ].each do |filter, expect_code, expect_uuid, not_expect_uuid|
+ test "get contents with '#{filter}' filter" do
+ authorize_with :active
+ get :contents, filters: [filter], format: :json
+ assert_response expect_code
+ if expect_code == 200
+ assert_not_empty json_response['items']
+ item_uuids = json_response['items'].collect {|item| item['uuid']}
+ assert_includes(item_uuids, expect_uuid)
+ assert_not_includes(item_uuids, not_expect_uuid)
+ end
+ end
+ end
end
EM.run {
if token
- ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket?api_token=#{api_client_authorizations(token).api_token}")
+ ws = Faye::WebSocket::Client.new("ws://localhost:#{WEBSOCKET_PORT}/websocket?api_token=#{api_client_authorizations(token).api_token}")
else
- ws = Faye::WebSocket::Client.new("ws://localhost:3002/websocket")
+ ws = Faye::WebSocket::Client.new("ws://localhost:#{WEBSOCKET_PORT}/websocket")
end
ws.on :open do |event|
require 'bundler'
+require 'socket'
$ARV_API_SERVER_DIR = File.expand_path('../..', __FILE__)
-SERVER_PID_PATH = 'tmp/pids/passenger.3002.pid'
+
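+# Bind to port 0 so the OS picks a free port, then release it for passenger to reuse.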
+s = TCPServer.new('0.0.0.0', 0)
+WEBSOCKET_PORT = s.addr[1]
+s.close
+SERVER_PID_PATH = "tmp/pids/passenger.#{WEBSOCKET_PORT}.pid"
class WebsocketTestRunner < MiniTest::Unit
def _system(*cmd)
def _run(args=[])
server_pid = Dir.chdir($ARV_API_SERVER_DIR) do |apidir|
# Only passenger seems to be able to run the websockets server successfully.
- _system('passenger', 'start', '-d', '-p3002')
+ _system('passenger', 'start', '-d', "-p#{WEBSOCKET_PORT}")
timeout = Time.now.tv_sec + 10
begin
sleep 0.2
super(args)
ensure
Dir.chdir($ARV_API_SERVER_DIR) do
- _system('passenger', 'stop', '-p3002')
+ _system('passenger', 'stop', "-p#{WEBSOCKET_PORT}")
end
# DatabaseCleaner leaves the database empty. Prefer to leave it full.
dc = DatabaseController.new
}
// drain any subsequent status changes
- for _ = range status {
+ for range status {
}
log.Printf("Finalized container %v", uuid)
// There should be no queued containers now
params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
+ "filters": [][]string{{"state", "=", "Queued"}},
}
var containers arvados.ContainerList
err = arv.List("containers", params, &containers)
// sbatchCmd
func sbatchFunc(container arvados.Container) *exec.Cmd {
memPerCPU := math.Ceil(float64(container.RuntimeConstraints.RAM) / (float64(container.RuntimeConstraints.VCPUs) * 1048576))
- return exec.Command("sbatch", "--share", "--parsable",
+ return exec.Command("sbatch", "--share",
fmt.Sprintf("--job-name=%s", container.UUID),
fmt.Sprintf("--mem-per-cpu=%d", int(memPerCPU)),
fmt.Sprintf("--cpus-per-task=%d", container.RuntimeConstraints.VCPUs),
// Submit job to slurm using sbatch.
func submit(dispatcher *dispatch.Dispatcher,
- container arvados.Container, crunchRunCommand string) (jobid string, submitErr error) {
- submitErr = nil
-
+ container arvados.Container, crunchRunCommand string) (submitErr error) {
defer func() {
// If we didn't get as far as submitting a slurm job,
// unlock the container and return it to the queue.
return
}
- // If everything worked out, got the jobid on stdout
- jobid = strings.TrimSpace(string(stdoutMsg))
-
+ log.Printf("sbatch succeeded: %s", strings.TrimSpace(string(stdoutMsg)))
return
}
log.Printf("About to submit queued container %v", container.UUID)
- if _, err := submit(dispatcher, container, *crunchRunCommand); err != nil {
+ if err := submit(dispatcher, container, *crunchRunCommand); err != nil {
log.Printf("Error submitting container %s to slurm: %v",
container.UUID, err)
// maybe sbatch is broken, put it back to queued
}
func (s *TestSuite) TestIntegrationMissingFromSqueue(c *C) {
- container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share", "--parsable",
+ container := s.integrationTest(c, func() *exec.Cmd { return exec.Command("echo") }, []string{"sbatch", "--share",
fmt.Sprintf("--job-name=%s", "zzzzz-dz642-queuedcontainer"),
fmt.Sprintf("--mem-per-cpu=%d", 2862),
fmt.Sprintf("--cpus-per-task=%d", 4),
// There should be no queued containers now
params := arvadosclient.Dict{
- "filters": [][]string{[]string{"state", "=", "Queued"}},
+ "filters": [][]string{{"state", "=", "Queued"}},
}
var containers arvados.ContainerList
err = arv.List("containers", params, &containers)
"errors"
"flag"
"fmt"
+ "git.curoverse.com/arvados.git/lib/crunchstat"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
SigChan chan os.Signal
ArvMountExit chan error
finalState string
+
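+ // State for periodic resource usage reporting via crunchstat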
+ statLogger io.WriteCloser
+ statReporter *crunchstat.Reporter
+ statInterval time.Duration
+ cgroupRoot string
+ cgroupParent string
}
// SetupSignals sets up signal handling to gracefully terminate the underlying
signal.Notify(runner.SigChan, syscall.SIGQUIT)
go func(sig <-chan os.Signal) {
- for _ = range sig {
+ for range sig {
if !runner.Cancelled {
runner.CancelLock.Lock()
runner.Cancelled = true
runner.CrunchLog.Printf("While closing stderr logs: %v", closeerr)
}
+ if runner.statReporter != nil {
+ runner.statReporter.Stop()
+ closeerr = runner.statLogger.Close()
+ if closeerr != nil {
+ runner.CrunchLog.Printf("While closing crunchstat logs: %v", closeerr)
+ }
+ }
+
runner.loggingDone <- true
close(runner.loggingDone)
return
}
}
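+// StartCrunchstat sets up and starts a crunchstat.Reporter that logs the
+// container's periodic resource usage to a "crunchstat" log stream.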
+func (runner *ContainerRunner) StartCrunchstat() {
+ runner.statLogger = NewThrottledLogger(runner.NewLogWriter("crunchstat"))
+ runner.statReporter = &crunchstat.Reporter{
+ CID: runner.ContainerID,
+ Logger: log.New(runner.statLogger, "", 0),
+ CgroupParent: runner.cgroupParent,
+ CgroupRoot: runner.cgroupRoot,
+ PollPeriod: runner.statInterval,
+ }
+ runner.statReporter.Start()
+}
+
// AttachStreams connects the docker container stdout and stderr logs to the
// Arvados logger which logs to Keep and the API server logs table.
func (runner *ContainerRunner) AttachStreams() (err error) {
return
}
+ runner.StartCrunchstat()
+
if runner.IsCancelled() {
return
}
}
func main() {
+ statInterval := flag.Duration("crunchstat-interval", 10*time.Second, "sampling period for periodic resource usage reporting")
+ cgroupRoot := flag.String("cgroup-root", "/sys/fs/cgroup", "path to sysfs cgroup tree")
+ cgroupParent := flag.String("cgroup-parent", "docker", "name of container's parent cgroup")
flag.Parse()
containerId := flag.Arg(0)
}
cr := NewContainerRunner(api, kc, docker, containerId)
+ cr.statInterval = *statInterval
+ cr.cgroupRoot = *cgroupRoot
+ cr.cgroupParent = *cgroupParent
err = cr.Run()
if err != nil {
. "gopkg.in/check.v1"
"io"
"io/ioutil"
- "log"
"os"
"os/exec"
"sort"
client.Mutex.Lock()
defer client.Mutex.Unlock()
- client.Calls += 1
+ client.Calls++
client.Content = append(client.Content, parameters)
if resourceType == "logs" {
func (client *ArvTestClient) Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) {
client.Mutex.Lock()
defer client.Mutex.Unlock()
- client.Calls += 1
+ client.Calls++
client.Content = append(client.Content, parameters)
if resourceType == "containers" {
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
// parameters match jpath/string. E.g., CalledWith(c, "foo.bar",
// "baz") returns parameters with parameters["foo"]["bar"]=="baz". If
// no call matches, it returns nil.
-func (client *ArvTestClient) CalledWith(jpath, expect string) arvadosclient.Dict {
+func (client *ArvTestClient) CalledWith(jpath string, expect interface{}) arvadosclient.Dict {
call:
for _, content := range client.Content {
var v interface{} = content
v = dict[k]
}
}
- if v, ok := v.(string); ok && v == expect {
+ if v == expect {
return content
}
}
api = &ArvTestClient{Container: rec}
cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
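+ // Use a short crunchstat poll interval so tests see its output quickly.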
+ cr.statInterval = 100 * time.Millisecond
am := &ArvMountCmdLine{}
cr.RunArvMount = am.ArvMountTest
t.finish <- dockerclient.WaitResult{}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
}
+func (s *TestSuite) TestCrunchstat(c *C) {
+ api, _ := FullRunHelper(c, `{
+ "command": ["sleep", "1"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": ".",
+ "environment": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`, func(t *TestDockerClient) {
+ time.Sleep(time.Second)
+ t.logWriter.Close()
+ t.finish <- dockerclient.WaitResult{}
+ })
+
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+
+ // We didn't actually start a container, so crunchstat didn't
+ // find accounting files and therefore didn't log any stats.
+ // It should have logged a "can't find accounting files"
+ // message after one poll interval, though, so we can confirm
+ // it's alive:
+ c.Assert(api.Logs["crunchstat"], NotNil)
+ c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files have not appeared after 100ms.*`)
+
+ // The "files never appeared" log assures us that we called
+ // (*crunchstat.Reporter)Stop(), and that we set it up with
+ // the correct container ID "abcde":
+ c.Check(api.Logs["crunchstat"].String(), Matches, `(?ms).*cgroup stats files never appeared for abcde\n`)
+}
+
func (s *TestSuite) TestFullRunStderr(c *C) {
api, _ := FullRunHelper(c, `{
"command": ["/bin/sh", "-c", "echo hello ; echo world 1>&2 ; exit 1"],
t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
- c.Assert(api.Calls, Equals, 8)
- c.Check(api.Content[7]["container"].(arvadosclient.Dict)["log"], NotNil)
- c.Check(api.Content[7]["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
- c.Check(api.Content[7]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+ final := api.CalledWith("container.state", "Complete")
+ c.Assert(final, NotNil)
+ c.Check(final["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+ c.Check(final["container"].(arvadosclient.Dict)["log"], NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
- log.Print(api.Logs["stdout"].String())
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Log(api.Logs["stdout"])
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
}
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
}
}
}
- c.Assert(api.Calls, Equals, 6)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["log"], IsNil)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
+ c.Check(api.CalledWith("container.log", nil), NotNil)
+ c.Check(api.CalledWith("container.state", "Cancelled"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
}
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Calls, Equals, 7)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[6]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
-
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
}
}
func (s *TestSuite) TestStdout(c *C) {
- helperRecord := `{`
- helperRecord += `"command": ["/bin/sh", "-c", "echo $FROBIZ"],`
- helperRecord += `"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",`
- helperRecord += `"cwd": "/bin",`
- helperRecord += `"environment": {"FROBIZ": "bilbo"},`
- helperRecord += `"mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },`
- helperRecord += `"output_path": "/tmp",`
- helperRecord += `"priority": 1,`
- helperRecord += `"runtime_constraints": {}`
- helperRecord += `}`
+ helperRecord := `{
+ "command": ["/bin/sh", "-c", "echo $FROBIZ"],
+ "container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
+ "cwd": "/bin",
+ "environment": {"FROBIZ": "bilbo"},
+ "mounts": {"/tmp": {"kind": "tmp"}, "stdout": {"kind": "file", "path": "/tmp/a/b/c.out"} },
+ "output_path": "/tmp",
+ "priority": 1,
+ "runtime_constraints": {}
+ }`
api, _ := FullRunHelper(c, helperRecord, func(t *TestDockerClient) {
t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Assert(api.Calls, Equals, 6)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
- c.Check(api.Content[5]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
- c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), Not(IsNil))
+ c.Check(api.CalledWith("container.exit_code", 0), NotNil)
+ c.Check(api.CalledWith("container.state", "Complete"), NotNil)
+ c.Check(api.CalledWith("collection.manifest_text", "./a/b 307372fa8fd5c146b22ae7a45b49bc31+6 0:6:c.out\n"), NotNil)
}
// Used by the TestStdoutWithWrongPath*()
Immediate *log.Logger
}
-// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
-// because the RFC3339Nano format isn't fixed width.
-const RFC3339Fixed = "2006-01-02T15:04:05.000000Z07:00"
+// RFC3339NanoFixed is a fixed-width version of time.RFC3339Nano.
+const RFC3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
-// RFC3339Timestamp return a RFC3339 formatted timestamp using RFC3339Fixed
-func RFC3339Timestamp(now time.Time) string {
- return now.Format(RFC3339Fixed)
+// RFC3339Timestamp formats t as RFC3339NanoFixed.
+func RFC3339Timestamp(t time.Time) string {
+ return t.Format(RFC3339NanoFixed)
}
-// Write to the internal buffer. Prepend a timestamp to each line of the input
-// data.
+// Write prepends a timestamp to each line of the input data and
+// appends to the internal buffer. Each line is also logged to
+// tl.Immediate, if tl.Immediate is not nil.
func (tl *ThrottledLogger) Write(p []byte) (n int, err error) {
tl.Mutex.Lock()
if tl.buf == nil {
now := tl.Timestamper(time.Now().UTC())
sc := bufio.NewScanner(bytes.NewBuffer(p))
- for sc.Scan() {
- _, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+ for err == nil && sc.Scan() {
+ out := fmt.Sprintf("%s %s\n", now, sc.Bytes())
if tl.Immediate != nil {
- tl.Immediate.Printf("%s %s\n", now, sc.Text())
+ tl.Immediate.Print(out[:len(out)-1])
}
+ _, err = io.WriteString(tl.buf, out)
}
- return len(p), err
+ if err == nil {
+ err = sc.Err()
+ if err == nil {
+ n = len(p)
+ }
+ }
+ return
}
// Periodically check the current buffer; if not empty, send it on the
// (b) batches log messages and only calls the underlying Writer at most once
// per second.
func NewThrottledLogger(writer io.WriteCloser) *ThrottledLogger {
- alw := &ThrottledLogger{}
- alw.flusherDone = make(chan bool)
- alw.writer = writer
- alw.Logger = log.New(alw, "", 0)
- alw.Timestamper = RFC3339Timestamp
- go alw.flusher()
- return alw
+ tl := &ThrottledLogger{}
+ tl.flusherDone = make(chan bool)
+ tl.writer = writer
+ tl.Logger = log.New(tl, "", 0)
+ tl.Timestamper = RFC3339Timestamp
+ go tl.flusher()
+ return tl
}
-// ArvLogWriter implements a writer that writes to each of a WriteCloser
-// (typically CollectionFileWriter) and creates an API server log entry.
+// ArvLogWriter is an io.WriteCloser that processes each write by
+// writing it through to another io.WriteCloser (typically a
+// CollectionFileWriter) and creating an Arvados log entry.
type ArvLogWriter struct {
ArvClient IArvadosClient
UUID string
func (this *TestTimestamper) Timestamp(t time.Time) string {
this.count += 1
- return fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count)
+ t, err := time.ParseInLocation(time.RFC3339Nano, fmt.Sprintf("2015-12-29T15:51:45.%09dZ", this.count), t.Location())
+ if err != nil {
+ panic(err)
+ }
+ return RFC3339Timestamp(t)
}
// Gocheck boilerplate
import (
"bufio"
- "bytes"
- "errors"
"flag"
- "fmt"
"io"
- "io/ioutil"
"log"
"os"
"os/exec"
"os/signal"
- "strconv"
- "strings"
"syscall"
"time"
-)
-/*
-#include <unistd.h>
-#include <sys/types.h>
-#include <pwd.h>
-#include <stdlib.h>
-*/
-import "C"
+ "git.curoverse.com/arvados.git/lib/crunchstat"
+)
-// The above block of magic allows us to look up user_hz via _SC_CLK_TCK.
+const MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-type Cgroup struct {
- root string
- parent string
- cid string
-}
+func main() {
+ reporter := crunchstat.Reporter{
+ Logger: log.New(os.Stderr, "crunchstat: ", 0),
+ }
-var childLog = log.New(os.Stderr, "", 0)
-var statLog = log.New(os.Stderr, "crunchstat: ", 0)
+ flag.StringVar(&reporter.CgroupRoot, "cgroup-root", "", "Root of cgroup tree")
+ flag.StringVar(&reporter.CgroupParent, "cgroup-parent", "", "Name of container parent under cgroup")
+ flag.StringVar(&reporter.CIDFile, "cgroup-cid", "", "Path to container id file")
+ pollMsec := flag.Int64("poll", 1000, "Reporting interval, in milliseconds")
-const (
- MaxLogLine = 1 << 14 // Child stderr lines >16KiB will be split
-)
+ flag.Parse()
-func CopyPipeToChildLog(in io.ReadCloser, done chan<- bool) {
- reader := bufio.NewReaderSize(in, MaxLogLine)
- var prefix string
- for {
- line, isPrefix, err := reader.ReadLine()
- if err == io.EOF {
- break
- } else if err != nil {
- statLog.Fatal("error reading child stderr:", err)
- }
- var suffix string
- if isPrefix {
- suffix = "[...]"
- }
- childLog.Print(prefix, string(line), suffix)
- // Set up prefix for following line
- if isPrefix {
- prefix = "[...]"
- } else {
- prefix = ""
- }
+ if reporter.CgroupRoot == "" {
+ reporter.Logger.Fatal("error: must provide -cgroup-root")
}
- done <- true
- in.Close()
-}
+ reporter.PollPeriod = time.Duration(*pollMsec) * time.Millisecond
-func ReadAllOrWarn(in *os.File) ([]byte, error) {
- content, err := ioutil.ReadAll(in)
- if err != nil {
- statLog.Printf("error reading %s: %s\n", in.Name(), err)
- }
- return content, err
-}
+ reporter.Start()
+ err := runCommand(flag.Args(), reporter.Logger)
+ reporter.Stop()
-var reportedStatFile = map[string]string{}
+ if err, ok := err.(*exec.ExitError); ok {
+ // The program has exited with an exit code != 0
-// Open the cgroup stats file in /sys/fs corresponding to the target
-// cgroup, and return an *os.File. If no stats file is available,
-// return nil.
-//
-// TODO: Instead of trying all options, choose a process in the
-// container, and read /proc/PID/cgroup to determine the appropriate
-// cgroup root for the given statgroup. (This will avoid falling back
-// to host-level stats during container setup and teardown.)
-func OpenStatFile(cgroup Cgroup, statgroup string, stat string) (*os.File, error) {
- var paths []string
- if cgroup.cid != "" {
- // Collect container's stats
- paths = []string{
- fmt.Sprintf("%s/%s/%s/%s/%s", cgroup.root, statgroup, cgroup.parent, cgroup.cid, stat),
- fmt.Sprintf("%s/%s/%s/%s", cgroup.root, cgroup.parent, cgroup.cid, stat),
- }
- } else {
- // Collect this host's stats
- paths = []string{
- fmt.Sprintf("%s/%s/%s", cgroup.root, statgroup, stat),
- fmt.Sprintf("%s/%s", cgroup.root, stat),
- }
- }
- var path string
- var file *os.File
- var err error
- for _, path = range paths {
- file, err = os.Open(path)
- if err == nil {
- break
+ // This works on both Unix and Windows. Although
+ // package syscall is generally platform dependent,
+ // WaitStatus is defined for both Unix and Windows and
+ // in both cases has an ExitStatus() method with the
+ // same signature.
+ if status, ok := err.Sys().(syscall.WaitStatus); ok {
+ os.Exit(status.ExitStatus())
} else {
- path = ""
+ reporter.Logger.Fatalln("ExitError without WaitStatus:", err)
}
+ } else if err != nil {
+ reporter.Logger.Fatalln("error in cmd.Wait:", err)
}
- if pathWas, ok := reportedStatFile[stat]; !ok || pathWas != path {
- // Log whenever we start using a new/different cgroup
- // stat file for a given statistic. This typically
- // happens 1 to 3 times per statistic, depending on
- // whether we happen to collect stats [a] before any
- // processes have been created in the container and
- // [b] after all contained processes have exited.
- if path == "" {
- statLog.Printf("notice: stats not available: stat %s, statgroup %s, cid %s, parent %s, root %s\n", stat, statgroup, cgroup.cid, cgroup.parent, cgroup.root)
- } else if ok {
- statLog.Printf("notice: stats moved from %s to %s\n", reportedStatFile[stat], path)
- } else {
- statLog.Printf("notice: reading stats from %s\n", path)
- }
- reportedStatFile[stat] = path
- }
- return file, err
}
-func GetContainerNetStats(cgroup Cgroup) (io.Reader, error) {
- procsFile, err := OpenStatFile(cgroup, "cpuacct", "cgroup.procs")
- if err != nil {
- return nil, err
- }
- defer procsFile.Close()
- reader := bufio.NewScanner(procsFile)
- for reader.Scan() {
- taskPid := reader.Text()
- statsFilename := fmt.Sprintf("/proc/%s/net/dev", taskPid)
- stats, err := ioutil.ReadFile(statsFilename)
- if err != nil {
- statLog.Printf("error reading %s: %s\n", statsFilename, err)
- continue
- }
- return strings.NewReader(string(stats)), nil
- }
- return nil, errors.New("Could not read stats for any proc in container")
-}
+func runCommand(argv []string, logger *log.Logger) error {
+ cmd := exec.Command(argv[0], argv[1:]...)
-type IoSample struct {
- sampleTime time.Time
- txBytes int64
- rxBytes int64
-}
+ logger.Println("Running", argv)
-func DoBlkIoStats(cgroup Cgroup, lastSample map[string]IoSample) {
- c, err := OpenStatFile(cgroup, "blkio", "blkio.io_service_bytes")
- if err != nil {
- return
- }
- defer c.Close()
- b := bufio.NewScanner(c)
- var sampleTime = time.Now()
- newSamples := make(map[string]IoSample)
- for b.Scan() {
- var device, op string
- var val int64
- if _, err := fmt.Sscanf(string(b.Text()), "%s %s %d", &device, &op, &val); err != nil {
- continue
- }
- var thisSample IoSample
- var ok bool
- if thisSample, ok = newSamples[device]; !ok {
- thisSample = IoSample{sampleTime, -1, -1}
- }
- switch op {
- case "Read":
- thisSample.rxBytes = val
- case "Write":
- thisSample.txBytes = val
- }
- newSamples[device] = thisSample
- }
- for dev, sample := range newSamples {
- if sample.txBytes < 0 || sample.rxBytes < 0 {
- continue
- }
- delta := ""
- if prev, ok := lastSample[dev]; ok {
- delta = fmt.Sprintf(" -- interval %.4f seconds %d write %d read",
- sample.sampleTime.Sub(prev.sampleTime).Seconds(),
- sample.txBytes-prev.txBytes,
- sample.rxBytes-prev.rxBytes)
- }
- statLog.Printf("blkio:%s %d write %d read%s\n", dev, sample.txBytes, sample.rxBytes, delta)
- lastSample[dev] = sample
- }
-}
-
-type MemSample struct {
- sampleTime time.Time
- memStat map[string]int64
-}
+ // Child process will use our stdin and stdout pipes
+ // (we close our copies below)
+ cmd.Stdin = os.Stdin
+ cmd.Stdout = os.Stdout
-func DoMemoryStats(cgroup Cgroup) {
- c, err := OpenStatFile(cgroup, "memory", "memory.stat")
- if err != nil {
- return
- }
- defer c.Close()
- b := bufio.NewScanner(c)
- thisSample := MemSample{time.Now(), make(map[string]int64)}
- wantStats := [...]string{"cache", "swap", "pgmajfault", "rss"}
- for b.Scan() {
- var stat string
- var val int64
- if _, err := fmt.Sscanf(string(b.Text()), "%s %d", &stat, &val); err != nil {
- continue
- }
- thisSample.memStat[stat] = val
- }
- var outstat bytes.Buffer
- for _, key := range wantStats {
- if val, ok := thisSample.memStat[key]; ok {
- outstat.WriteString(fmt.Sprintf(" %d %s", val, key))
+ // Forward SIGINT and SIGTERM to child process
+ sigChan := make(chan os.Signal, 1)
+ go func(sig <-chan os.Signal) {
+ catch := <-sig
+ if cmd.Process != nil {
+ cmd.Process.Signal(catch)
}
- }
- statLog.Printf("mem%s\n", outstat.String())
-}
+ logger.Println("notice: caught signal:", catch)
+ }(sigChan)
+ signal.Notify(sigChan, syscall.SIGTERM)
+ signal.Notify(sigChan, syscall.SIGINT)
-func DoNetworkStats(cgroup Cgroup, lastSample map[string]IoSample) {
- sampleTime := time.Now()
- stats, err := GetContainerNetStats(cgroup)
+ // Funnel the child's stderr through our logger
+ stderr_pipe, err := cmd.StderrPipe()
if err != nil {
- return
+ logger.Fatalln("error in StderrPipe:", err)
}
- scanner := bufio.NewScanner(stats)
- for scanner.Scan() {
- var ifName string
- var rx, tx int64
- words := strings.Fields(scanner.Text())
- if len(words) != 17 {
- // Skip lines with wrong format
- continue
- }
- ifName = strings.TrimRight(words[0], ":")
- if ifName == "lo" || ifName == "" {
- // Skip loopback interface and lines with wrong format
- continue
- }
- if tx, err = strconv.ParseInt(words[9], 10, 64); err != nil {
- continue
- }
- if rx, err = strconv.ParseInt(words[1], 10, 64); err != nil {
- continue
- }
- nextSample := IoSample{}
- nextSample.sampleTime = sampleTime
- nextSample.txBytes = tx
- nextSample.rxBytes = rx
- var delta string
- if prev, ok := lastSample[ifName]; ok {
- interval := nextSample.sampleTime.Sub(prev.sampleTime).Seconds()
- delta = fmt.Sprintf(" -- interval %.4f seconds %d tx %d rx",
- interval,
- tx-prev.txBytes,
- rx-prev.rxBytes)
- }
- statLog.Printf("net:%s %d tx %d rx%s\n", ifName, tx, rx, delta)
- lastSample[ifName] = nextSample
+ // Run subprocess
+ if err := cmd.Start(); err != nil {
+ logger.Fatalln("error in cmd.Start:", err)
}
-}
-type CpuSample struct {
- hasData bool // to distinguish the zero value from real data
- sampleTime time.Time
- user float64
- sys float64
- cpus int64
-}
+ // Close stdin/stdout in this (parent) process
+ os.Stdin.Close()
+ os.Stdout.Close()
-// Return the number of CPUs available in the container. Return 0 if
-// we can't figure out the real number of CPUs.
-func GetCpuCount(cgroup Cgroup) int64 {
- cpusetFile, err := OpenStatFile(cgroup, "cpuset", "cpuset.cpus")
- if err != nil {
- return 0
- }
- defer cpusetFile.Close()
- b, err := ReadAllOrWarn(cpusetFile)
- sp := strings.Split(string(b), ",")
- cpus := int64(0)
- for _, v := range sp {
- var min, max int64
- n, _ := fmt.Sscanf(v, "%d-%d", &min, &max)
- if n == 2 {
- cpus += (max - min) + 1
- } else {
- cpus += 1
- }
- }
- return cpus
-}
+ copyPipeToChildLog(stderr_pipe, log.New(os.Stderr, "", 0))
-func DoCpuStats(cgroup Cgroup, lastSample *CpuSample) {
- statFile, err := OpenStatFile(cgroup, "cpuacct", "cpuacct.stat")
- if err != nil {
- return
- }
- defer statFile.Close()
- b, err := ReadAllOrWarn(statFile)
- if err != nil {
- return
- }
-
- nextSample := CpuSample{true, time.Now(), 0, 0, GetCpuCount(cgroup)}
- var userTicks, sysTicks int64
- fmt.Sscanf(string(b), "user %d\nsystem %d", &userTicks, &sysTicks)
- user_hz := float64(C.sysconf(C._SC_CLK_TCK))
- nextSample.user = float64(userTicks) / user_hz
- nextSample.sys = float64(sysTicks) / user_hz
-
- delta := ""
- if lastSample.hasData {
- delta = fmt.Sprintf(" -- interval %.4f seconds %.4f user %.4f sys",
- nextSample.sampleTime.Sub(lastSample.sampleTime).Seconds(),
- nextSample.user-lastSample.user,
- nextSample.sys-lastSample.sys)
- }
- statLog.Printf("cpu %.4f user %.4f sys %d cpus%s\n",
- nextSample.user, nextSample.sys, nextSample.cpus, delta)
- *lastSample = nextSample
+ return cmd.Wait()
}
-func PollCgroupStats(cgroup Cgroup, poll int64, stop_poll_chan <-chan bool) {
- var lastNetSample = map[string]IoSample{}
- var lastDiskSample = map[string]IoSample{}
- var lastCpuSample = CpuSample{}
-
- poll_chan := make(chan bool, 1)
- go func() {
- // Send periodic poll events.
- poll_chan <- true
- for {
- time.Sleep(time.Duration(poll) * time.Millisecond)
- poll_chan <- true
- }
- }()
+func copyPipeToChildLog(in io.ReadCloser, logger *log.Logger) {
+ reader := bufio.NewReaderSize(in, MaxLogLine)
+ var prefix string
for {
- select {
- case <-stop_poll_chan:
- return
- case <-poll_chan:
- // Emit stats, then select again.
- }
- DoMemoryStats(cgroup)
- DoCpuStats(cgroup, &lastCpuSample)
- DoBlkIoStats(cgroup, lastDiskSample)
- DoNetworkStats(cgroup, lastNetSample)
- }
-}
-
-func run(logger *log.Logger) error {
-
- var (
- cgroup_root string
- cgroup_parent string
- cgroup_cidfile string
- wait int64
- poll int64
- )
-
- flag.StringVar(&cgroup_root, "cgroup-root", "", "Root of cgroup tree")
- flag.StringVar(&cgroup_parent, "cgroup-parent", "", "Name of container parent under cgroup")
- flag.StringVar(&cgroup_cidfile, "cgroup-cid", "", "Path to container id file")
- flag.Int64Var(&wait, "wait", 5, "Maximum time (in seconds) to wait for cid file to show up")
- flag.Int64Var(&poll, "poll", 1000, "Polling frequency, in milliseconds")
-
- flag.Parse()
-
- if cgroup_root == "" {
- statLog.Fatal("error: must provide -cgroup-root")
- }
-
- finish_chan := make(chan bool)
- defer close(finish_chan)
-
- var cmd *exec.Cmd
-
- if len(flag.Args()) > 0 {
- // Set up subprocess
- cmd = exec.Command(flag.Args()[0], flag.Args()[1:]...)
-
- childLog.Println("Running", flag.Args())
-
- // Child process will use our stdin and stdout pipes
- // (we close our copies below)
- cmd.Stdin = os.Stdin
- cmd.Stdout = os.Stdout
-
- // Forward SIGINT and SIGTERM to inner process
- sigChan := make(chan os.Signal, 1)
- go func(sig <-chan os.Signal) {
- catch := <-sig
- if cmd.Process != nil {
- cmd.Process.Signal(catch)
- }
- statLog.Println("notice: caught signal:", catch)
- }(sigChan)
- signal.Notify(sigChan, syscall.SIGTERM)
- signal.Notify(sigChan, syscall.SIGINT)
-
- // Funnel stderr through our channel
- stderr_pipe, err := cmd.StderrPipe()
- if err != nil {
- statLog.Fatalln("error in StderrPipe:", err)
- }
- go CopyPipeToChildLog(stderr_pipe, finish_chan)
-
- // Run subprocess
- if err := cmd.Start(); err != nil {
- statLog.Fatalln("error in cmd.Start:", err)
- }
-
- // Close stdin/stdout in this (parent) process
- os.Stdin.Close()
- os.Stdout.Close()
- }
-
- // Read the cid file
- var container_id string
- if cgroup_cidfile != "" {
- // wait up to 'wait' seconds for the cid file to appear
- ok := false
- var i time.Duration
- for i = 0; i < time.Duration(wait)*time.Second; i += (100 * time.Millisecond) {
- cid, err := ioutil.ReadFile(cgroup_cidfile)
- if err == nil && len(cid) > 0 {
- ok = true
- container_id = string(cid)
- break
- }
- time.Sleep(100 * time.Millisecond)
+ line, isPrefix, err := reader.ReadLine()
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ logger.Fatal("error reading child stderr:", err)
}
- if !ok {
- statLog.Println("error reading cid file:", cgroup_cidfile)
+ var suffix string
+ if isPrefix {
+ suffix = "[...]"
}
- }
-
- stop_poll_chan := make(chan bool, 1)
- cgroup := Cgroup{cgroup_root, cgroup_parent, container_id}
- go PollCgroupStats(cgroup, poll, stop_poll_chan)
-
- // When the child exits, tell the polling goroutine to stop.
- defer func() { stop_poll_chan <- true }()
-
- // Wait for CopyPipeToChan to consume child's stderr pipe
- <-finish_chan
-
- return cmd.Wait()
-}
-
-func main() {
- logger := log.New(os.Stderr, "crunchstat: ", 0)
- if err := run(logger); err != nil {
- if exiterr, ok := err.(*exec.ExitError); ok {
- // The program has exited with an exit code != 0
-
- // This works on both Unix and
- // Windows. Although package syscall is
- // generally platform dependent, WaitStatus is
- // defined for both Unix and Windows and in
- // both cases has an ExitStatus() method with
- // the same signature.
- if status, ok := exiterr.Sys().(syscall.WaitStatus); ok {
- os.Exit(status.ExitStatus())
- }
+ logger.Print(prefix, string(line), suffix)
+ // Set up prefix for following line
+ if isPrefix {
+ prefix = "[...]"
} else {
- statLog.Fatalln("error in cmd.Wait:", err)
+ prefix = ""
}
}
+ in.Close()
}
"io"
"log"
"math/rand"
- "os"
- "regexp"
"testing"
"time"
)
-func TestReadAllOrWarnFail(t *testing.T) {
- rcv := captureLogs()
- defer uncaptureLogs()
- go func() {
- // The special file /proc/self/mem can be opened for
- // reading, but reading from byte 0 returns an error.
- f, err := os.Open("/proc/self/mem")
- if err != nil {
- t.Fatalf("Opening /proc/self/mem: %s", err)
- }
- if x, err := ReadAllOrWarn(f); err == nil {
- t.Fatalf("Expected error, got %v", x)
- }
- }()
- if msg, err := rcv.ReadBytes('\n'); err != nil {
- t.Fatal(err)
- } else if matched, err := regexp.MatchString("^crunchstat: .*error.*", string(msg)); err != nil || !matched {
- t.Fatalf("Expected error message about unreadable file, got \"%s\"", msg)
- }
-}
-
-func TestReadAllOrWarnSuccess(t *testing.T) {
- f, err := os.Open("./crunchstat_test.go")
- if err != nil {
- t.Fatalf("Opening ./crunchstat_test.go: %s", err)
- }
- data, err := ReadAllOrWarn(f)
- if err != nil {
- t.Fatalf("got error %s", err)
- }
- if matched, err := regexp.MatchString("^package main\n", string(data)); err != nil || !matched {
- t.Fatalf("data failed regexp: %s", err)
- }
-}
-
// Test that CopyPipeToChildLog works even on lines longer than
// bufio.MaxScanTokenSize.
func TestCopyPipeToChildLogLongLines(t *testing.T) {
- rcv := captureLogs()
- defer uncaptureLogs()
+ logger, logBuf := bufLogger()
- control := make(chan bool)
pipeIn, pipeOut := io.Pipe()
- go CopyPipeToChildLog(pipeIn, control)
+ copied := make(chan bool)
+ go func() {
+ copyPipeToChildLog(pipeIn, logger)
+ close(copied)
+ }()
sentBytes := make([]byte, bufio.MaxScanTokenSize+MaxLogLine+(1<<22))
go func() {
pipeOut.Close()
}()
- if before, err := rcv.ReadBytes('\n'); err != nil || string(before) != "before\n" {
+ if before, err := logBuf.ReadBytes('\n'); err != nil || string(before) != "before\n" {
t.Fatalf("\"before\n\" not received (got \"%s\", %s)", before, err)
}
var receivedBytes []byte
done := false
for !done {
- line, err := rcv.ReadBytes('\n')
+ line, err := logBuf.ReadBytes('\n')
if err != nil {
t.Fatal(err)
}
}
line = line[5:]
}
- if len(line) >= 6 && string(line[len(line)-6:len(line)]) == "[...]\n" {
+ if len(line) >= 6 && string(line[len(line)-6:]) == "[...]\n" {
line = line[:len(line)-6]
} else {
done = true
t.Fatalf("sent %d bytes, got %d different bytes", len(sentBytes), len(receivedBytes))
}
- if after, err := rcv.ReadBytes('\n'); err != nil || string(after) != "after\n" {
+ if after, err := logBuf.ReadBytes('\n'); err != nil || string(after) != "after\n" {
t.Fatalf("\"after\n\" not received (got \"%s\", %s)", after, err)
}
select {
case <-time.After(time.Second):
t.Fatal("Timeout")
- case <-control:
+ case <-copied:
// Done.
}
}
-func captureLogs() *bufio.Reader {
- // Send childLog to our bufio reader instead of stderr
- stderrIn, stderrOut := io.Pipe()
- childLog = log.New(stderrOut, "", 0)
- statLog = log.New(stderrOut, "crunchstat: ", 0)
- return bufio.NewReader(stderrIn)
-}
-
-func uncaptureLogs() {
- childLog = log.New(os.Stderr, "", 0)
- statLog = log.New(os.Stderr, "crunchstat: ", 0)
+func bufLogger() (*log.Logger, *bufio.Reader) {
+ r, w := io.Pipe()
+ logger := log.New(w, "", 0)
+ return logger, bufio.NewReader(r)
}
// SdkCollectionInfo holds collection info from api
type SdkCollectionInfo struct {
- UUID string `json:"uuid"`
- OwnerUUID string `json:"owner_uuid"`
- ReplicationDesired int `json:"replication_desired"`
- ModifiedAt time.Time `json:"modified_at"`
- ManifestText string `json:"manifest_text"`
+ UUID string `json:"uuid"`
+ OwnerUUID string `json:"owner_uuid"`
+ ReplicationDesired int `json:"replication_desired"`
+ ModifiedAt time.Time `json:"modified_at"`
+ ManifestText string `json:"manifest_text"`
}
// SdkCollectionList lists collections from api
sdkParams := arvadosclient.Dict{
"select": fieldsWanted,
"order": []string{"modified_at ASC", "uuid ASC"},
- "filters": [][]string{[]string{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
+ "filters": [][]string{{"modified_at", ">=", "1900-01-01T00:00:00Z"}},
"offset": 0}
if params.BatchSize > 0 {
}
func (s *MySuite) TestSummarizeSimple(checker *C) {
- rc := MakeTestReadCollections([]TestCollectionSpec{TestCollectionSpec{
+ rc := MakeTestReadCollections([]TestCollectionSpec{{
ReplicationLevel: 5,
Blocks: []int{1, 2},
}})
expected := ExpectedSummary{
OwnerToCollectionSize: map[string]int{c.OwnerUUID: c.TotalSize},
BlockToDesiredReplication: map[blockdigest.DigestWithSize]int{blockDigest1: 5, blockDigest2: 5},
- BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: []string{c.UUID}, blockDigest2: []string{c.UUID}},
+ BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{blockDigest1: {c.UUID}, blockDigest2: {c.UUID}},
}
CompareSummarizedReadCollections(checker, rc, expected)
func (s *MySuite) TestSummarizeOverlapping(checker *C) {
rc := MakeTestReadCollections([]TestCollectionSpec{
- TestCollectionSpec{
+ {
ReplicationLevel: 5,
Blocks: []int{1, 2},
},
- TestCollectionSpec{
+ {
ReplicationLevel: 8,
Blocks: []int{2, 3},
},
blockDigest3: 8,
},
BlockToCollectionUuids: map[blockdigest.DigestWithSize][]string{
- blockDigest1: []string{c0.UUID},
- blockDigest2: []string{c0.UUID, c1.UUID},
- blockDigest3: []string{c1.UUID},
+ blockDigest1: {c0.UUID},
+ blockDigest2: {c0.UUID, c1.UUID},
+ blockDigest3: {c1.UUID},
},
}
// GetKeepServers from api server
func GetKeepServers(params GetKeepServersParams) (results ReadServers, err error) {
sdkParams := arvadosclient.Dict{
- "filters": [][]string{[]string{"service_type", "!=", "proxy"}},
+ "filters": [][]string{{"service_type", "!=", "proxy"}},
}
if params.Limit > 0 {
sdkParams["limit"] = params.Limit
return
}
- blockInfo.Mtime, err = strconv.ParseInt(tokens[1], 10, 64)
+ var ns int64
+ ns, err = strconv.ParseInt(tokens[1], 10, 64)
if err != nil {
return
}
- blockInfo.Digest =
- blockdigest.DigestWithSize{Digest: locator.Digest,
- Size: uint32(locator.Size)}
+ if ns < 1e12 {
+ // An old version of keepstore is giving us timestamps
+ // in seconds instead of nanoseconds. (This threshold
+ // correctly handles all times between 1970-01-02 and
+ // 33658-09-27.)
+ ns = ns * 1e9
+ }
+ blockInfo.Mtime = ns
+ blockInfo.Digest = blockdigest.DigestWithSize{
+ Digest: locator.Digest,
+ Size: uint32(locator.Size),
+ }
return
}
defer server.Close()
tl := map[string]TrashList{
- server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+ server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
arv := arvadosclient.ArvadosClient{ApiToken: "abc123"}
kc := keepclient.KeepClient{Arvados: &arv, Client: &http.Client{}}
func sendTrashListError(c *C, server *httptest.Server) {
tl := map[string]TrashList{
- server.URL: TrashList{TrashRequest{"000000000000000000000000deadbeef", 99}}}
+ server.URL: {TrashRequest{"000000000000000000000000deadbeef", 99}}}
arv := arvadosclient.ArvadosClient{ApiToken: "abc123"}
kc := keepclient.KeepClient{Arvados: &arv, Client: &http.Client{}}
locator1 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xBadBeef)}
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{To: []string{}, From: []string{}}}),
+ locator1: {To: []string{}, From: []string{}}}),
PullListMapEquals,
map[string]PullList{})
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{To: []string{}, From: []string{"f1", "f2"}}}),
+ locator1: {To: []string{}, From: []string{"f1", "f2"}}}),
PullListMapEquals,
map[string]PullList{})
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}}}),
+ locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}}}),
PullListMapEquals,
map[string]PullList{
- "t1": PullList{PullRequest{locator1, []string{"f1", "f2"}}}})
+ "t1": {PullRequest{locator1, []string{"f1", "f2"}}}})
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{To: []string{"t1"}, From: []string{}}}),
+ locator1: {To: []string{"t1"}, From: []string{}}}),
PullListMapEquals,
- map[string]PullList{"t1": PullList{
+ map[string]PullList{"t1": {
PullRequest{locator1, []string{}}}})
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{
+ locator1: {
To: []string{"t1", "t2"},
From: []string{"f1", "f2"},
}}),
PullListMapEquals,
map[string]PullList{
- "t1": PullList{PullRequest{locator1, []string{"f1", "f2"}}},
- "t2": PullList{PullRequest{locator1, []string{"f1", "f2"}}},
+ "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
+ "t2": {PullRequest{locator1, []string{"f1", "f2"}}},
})
locator2 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xCabbed)}
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{To: []string{"t1"}, From: []string{"f1", "f2"}},
- locator2: PullServers{To: []string{"t2"}, From: []string{"f3", "f4"}}}),
+ locator1: {To: []string{"t1"}, From: []string{"f1", "f2"}},
+ locator2: {To: []string{"t2"}, From: []string{"f3", "f4"}}}),
PullListMapEquals,
map[string]PullList{
- "t1": PullList{PullRequest{locator1, []string{"f1", "f2"}}},
- "t2": PullList{PullRequest{locator2, []string{"f3", "f4"}}},
+ "t1": {PullRequest{locator1, []string{"f1", "f2"}}},
+ "t2": {PullRequest{locator2, []string{"f3", "f4"}}},
})
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{
+ locator1: {
To: []string{"t1"},
From: []string{"f1", "f2"}},
- locator2: PullServers{
+ locator2: {
To: []string{"t2", "t1"},
From: []string{"f3", "f4"}},
}),
PullListMapEquals,
map[string]PullList{
- "t1": PullList{
+ "t1": {
PullRequest{locator1, []string{"f1", "f2"}},
PullRequest{locator2, []string{"f3", "f4"}},
},
- "t2": PullList{
+ "t2": {
PullRequest{locator2, []string{"f3", "f4"}},
},
})
locator4 := Locator{Digest: blockdigest.MakeTestBlockDigest(0xFedBeef)}
c.Check(
BuildPullLists(map[Locator]PullServers{
- locator1: PullServers{
+ locator1: {
To: []string{"t1"},
From: []string{"f1", "f2"}},
- locator2: PullServers{
+ locator2: {
To: []string{"t2", "t1"},
From: []string{"f3", "f4"}},
- locator3: PullServers{
+ locator3: {
To: []string{"t3", "t2", "t1"},
From: []string{"f4", "f5"}},
- locator4: PullServers{
+ locator4: {
To: []string{"t4", "t3", "t2", "t1"},
From: []string{"f1", "f5"}},
}),
PullListMapEquals,
map[string]PullList{
- "t1": PullList{
+ "t1": {
PullRequest{locator1, []string{"f1", "f2"}},
PullRequest{locator2, []string{"f3", "f4"}},
PullRequest{locator3, []string{"f4", "f5"}},
PullRequest{locator4, []string{"f1", "f5"}},
},
- "t2": PullList{
+ "t2": {
PullRequest{locator2, []string{"f3", "f4"}},
PullRequest{locator3, []string{"f4", "f5"}},
PullRequest{locator4, []string{"f1", "f5"}},
},
- "t3": PullList{
+ "t3": {
PullRequest{locator3, []string{"f4", "f5"}},
PullRequest{locator4, []string{"f1", "f5"}},
},
- "t4": PullList{
+ "t4": {
PullRequest{locator4, []string{"f1", "f5"}},
},
})
}
func TestToCollectionIndexSet(t *testing.T) {
- VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: []int{0}}, []int{0})
- VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: []int{1}}, []int{1})
- VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: []int{1, 9}}, []int{1, 9})
+ VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{6: {0}}, []int{0})
+ VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1}}, []int{1})
+ VerifyToCollectionIndexSet(t, []int{4}, map[int][]int{4: {1, 9}}, []int{1, 9})
VerifyToCollectionIndexSet(t, []int{5, 6},
- map[int][]int{5: []int{2, 3}, 6: []int{3, 4}},
+ map[int][]int{5: {2, 3}, 6: {3, 4}},
[]int{2, 3, 4})
VerifyToCollectionIndexSet(t, []int{5, 6},
- map[int][]int{5: []int{8}, 6: []int{4}},
+ map[int][]int{5: {8}, 6: {4}},
[]int{4, 8})
- VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: []int{0}}, []int{})
+ VerifyToCollectionIndexSet(t, []int{6}, map[int][]int{5: {0}}, []int{})
}
func TestSimpleSummary(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
+ {ReplicationLevel: 1, Blocks: []int{1, 2}},
})
rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
func TestMissingBlock(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
+ {ReplicationLevel: 1, Blocks: []int{1, 2}},
})
rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
func TestUnderAndOverReplicatedBlocks(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{1, 2}},
+ {ReplicationLevel: 2, Blocks: []int{1, 2}},
})
rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
func TestMixedReplication(t *testing.T) {
rc := collection.MakeTestReadCollections([]collection.TestCollectionSpec{
- collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{1, 2}},
- collection.TestCollectionSpec{ReplicationLevel: 1, Blocks: []int{3, 4}},
- collection.TestCollectionSpec{ReplicationLevel: 2, Blocks: []int{5, 6}},
+ {ReplicationLevel: 1, Blocks: []int{1, 2}},
+ {ReplicationLevel: 1, Blocks: []int{3, 4}},
+ {ReplicationLevel: 2, Blocks: []int{5, 6}},
})
rc.Summarize(nil)
cIndex := rc.CollectionIndicesForTesting()
ttl := int64(_ttl.(float64))
// expire unreferenced blocks more than "ttl" seconds old.
- expiry := time.Now().UTC().Unix() - ttl
+ expiry := time.Now().UTC().UnixNano() - ttl*1e9
return buildTrashListsInternal(writableServers, keepServerInfo, expiry, keepBlocksNotInCollections), nil
}
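// Unit note on the cutoff above, with an illustrative TTL: a 14-day TTL
// arrives as ttl = 1209600 seconds, so expiry becomes
// time.Now().UTC().UnixNano() - 1209600*1e9. Keepstore now reports block
// mtimes in nanoseconds in its index, so an unreferenced block becomes
// eligible for the trash list only when its mtime falls before this
// nanosecond cutoff.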
var keepServerInfo = keep.ReadServers{
KeepServerIndexToAddress: []keep.ServerAddress{sv0, sv1},
BlockToServers: map[blockdigest.DigestWithSize][]keep.BlockServerInfo{
- block0: []keep.BlockServerInfo{
- keep.BlockServerInfo{0, 99},
- keep.BlockServerInfo{1, 101}},
- block1: []keep.BlockServerInfo{
- keep.BlockServerInfo{0, 99},
- keep.BlockServerInfo{1, 101}}}}
+ block0: {
+ {0, 99},
+ {1, 101}},
+ block1: {
+ {0, 99},
+ {1, 101}}}}
// only block0 is in delete set
var bs = make(BlockSet)
// Test trash list where only sv0 is on writable list.
c.Check(buildTrashListsInternal(
map[string]struct{}{
- sv0.URL(): struct{}{}},
+ sv0.URL(): {}},
&keepServerInfo,
110,
bs),
DeepEquals,
map[string]keep.TrashList{
- "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
+ "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
// Test trash list where both sv0 and sv1 are on writable list.
c.Check(buildTrashListsInternal(
map[string]struct{}{
- sv0.URL(): struct{}{},
- sv1.URL(): struct{}{}},
+ sv0.URL(): {},
+ sv1.URL(): {}},
&keepServerInfo,
110,
bs),
DeepEquals,
map[string]keep.TrashList{
- "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
- "http://keep1.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
+ "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}},
+ "http://keep1.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 101}}})
// Test trash list where only block on sv0 is expired
c.Check(buildTrashListsInternal(
map[string]struct{}{
- sv0.URL(): struct{}{},
- sv1.URL(): struct{}{}},
+ sv0.URL(): {},
+ sv1.URL(): {}},
&keepServerInfo,
100,
bs),
DeepEquals,
map[string]keep.TrashList{
- "http://keep0.example.com:80": keep.TrashList{keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
+ "http://keep0.example.com:80": {keep.TrashRequest{"000000000000000000000000deadbeef", 99}}})
}
"math"
"os"
"runtime"
+ "sort"
"strings"
"sync"
"time"
}
// Run performs a balance operation using the given config and
-// runOptions. It should only be called once on a given Balancer
-// object. Typical usage:
+// runOptions, and returns RunOptions suitable for passing to a
+// subsequent balance operation.
//
-// err = (&Balancer{}).Run(config, runOptions)
-func (bal *Balancer) Run(config Config, runOptions RunOptions) (err error) {
+// Run should only be called once on a given Balancer object.
+//
+// Typical usage:
+//
+// runOptions, err = (&Balancer{}).Run(config, runOptions)
+func (bal *Balancer) Run(config Config, runOptions RunOptions) (nextRunOptions RunOptions, err error) {
+ nextRunOptions = runOptions
+
bal.Dumper = runOptions.Dumper
bal.Logger = runOptions.Logger
if bal.Logger == nil {
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
}
- if runOptions.CommitTrash {
+ rs := bal.rendezvousState()
+ if runOptions.CommitTrash && rs != runOptions.SafeRendezvousState {
+ if runOptions.SafeRendezvousState != "" {
+ bal.logf("notice: KeepServices list has changed since last run")
+ }
+ bal.logf("clearing existing trash lists, in case the new rendezvous order differs from previous run")
if err = bal.ClearTrashLists(&config.Client); err != nil {
return
}
+ // The current rendezvous state becomes "safe" (i.e.,
+ // OK to compute changes for that state without
+ // clearing existing trash lists) only now, after we
+ // succeed in clearing existing trash lists.
+ nextRunOptions.SafeRendezvousState = rs
}
if err = bal.GetCurrentState(&config.Client, config.CollectionBatchSize, config.CollectionBuffers); err != nil {
return
return nil
}
+// rendezvousState returns a fingerprint (e.g., a sorted list of
+// UUID+host+port) of the current set of keep services.
+func (bal *Balancer) rendezvousState() string {
+ srvs := make([]string, 0, len(bal.KeepServices))
+ for _, srv := range bal.KeepServices {
+ srvs = append(srvs, srv.String())
+ }
+ sort.Strings(srvs)
+ return strings.Join(srvs, "; ")
+}
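// Note on the sort above: bal.KeepServices is a map keyed by UUID, and Go
// randomizes map iteration order, so joining entries in iteration order
// would yield a different string on every run even when the service set
// is unchanged. Sorting first makes the fingerprint stable, so it only
// changes when a keep service is added, removed, or re-addressed.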
+
// ClearTrashLists sends an empty trash list to each keep
// service. Calling this before GetCurrentState avoids races.
//
return err
}
bal.DefaultReplication = dd.DefaultCollectionReplication
- bal.MinMtime = time.Now().Unix() - dd.BlobSignatureTTL
+ bal.MinMtime = time.Now().UnixNano() - dd.BlobSignatureTTL*1e9
errs := make(chan error, 2+len(bal.KeepServices))
wg := sync.WaitGroup{}
}(srv)
}
var lastErr error
- for _ = range bal.KeepServices {
+ for range bal.KeepServices {
if err := <-errs; err != nil {
bal.logf("%v", err)
lastErr = err
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.ErrorMatches, "received zero collections")
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveFourDiskKeepServices()
indexReqs := s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(indexReqs.Count(), check.Equals, 0)
c.Check(trashReqs.Count(), check.Equals, 0)
s.stub.serveFourDiskKeepServices()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.ErrorMatches, "current user .* is not .* admin user")
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
s.stub.serveKeepstoreIndexFoo4Bar1()
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
- err := (&Balancer{}).Run(s.config, opts)
+ _, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.ErrorMatches, `Retrieved 2 collections with modtime <= .* but server now reports there are 3 collections.*`)
c.Check(trashReqs.Count(), check.Equals, 4)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
var bal Balancer
- err := bal.Run(s.config, opts)
+ _, err := bal.Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 0)
c.Check(pullReqs.Count(), check.Equals, 0)
trashReqs := s.stub.serveKeepstoreTrash()
pullReqs := s.stub.serveKeepstorePull()
var bal Balancer
- err := bal.Run(s.config, opts)
+ _, err := bal.Run(s.config, opts)
c.Check(err, check.IsNil)
c.Check(trashReqs.Count(), check.Equals, 8)
c.Check(pullReqs.Count(), check.Equals, 4)
s.config.RunPeriod = arvados.Duration(time.Millisecond)
go RunForever(s.config, opts, stop)
- // Each run should send 4 clear trash lists + 4 pull lists + 4
- // trash lists. We should complete four runs in much less than
+ // Each run should send 4 pull lists + 4 trash lists. The
+ // first run should also send 4 empty trash lists at
+ // startup. We should complete all four runs in much less than
// a second.
for t0 := time.Now(); pullReqs.Count() < 16 && time.Since(t0) < 10*time.Second; {
time.Sleep(time.Millisecond)
}
stop <- true
c.Check(pullReqs.Count() >= 16, check.Equals, true)
- c.Check(trashReqs.Count(), check.Equals, 2*pullReqs.Count())
+ c.Check(trashReqs.Count(), check.Equals, pullReqs.Count()+4)
}
bal.KeepServices[srv.UUID] = srv
}
- bal.MinMtime = time.Now().Unix() - bal.signatureTTL
+ bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
}
func (bal *balancerSuite) TestPerfect(c *check.C) {
// replList is like srvList but returns an "existing replicas" slice,
// suitable for a BlockState test fixture.
func (bal *balancerSuite) replList(knownBlockID int, order slots) (repls []Replica) {
- mtime := time.Now().Unix() - bal.signatureTTL - 86400
+ mtime := time.Now().UnixNano() - (bal.signatureTTL+86400)*1e9
for _, srv := range bal.srvList(knownBlockID, order) {
repls = append(repls, Replica{srv, mtime})
mtime++
CommitTrash: true,
Logger: log.New(logBuf, "", log.LstdFlags),
}
- err := (&Balancer{}).Run(s.config, opts)
+ nextOpts, err := (&Balancer{}).Run(s.config, opts)
c.Check(err, check.IsNil)
+ c.Check(nextOpts.SafeRendezvousState, check.Not(check.Equals), "")
+ c.Check(nextOpts.CommitPulls, check.Equals, true)
if iter == 0 {
c.Check(logBuf.String(), check.Matches, `(?ms).*ChangeSet{Pulls:1.*`)
c.Check(logBuf.String(), check.Not(check.Matches), `(?ms).*ChangeSet{.*Trashes:[^0]}*`)
CommitTrash bool
Logger *log.Logger
Dumper *log.Logger
+
+ // SafeRendezvousState from the most recent balance operation,
+ // or "" if unknown. If this changes from one run to the next,
+ // we need to watch out for races. See
+ // (*Balancer)ClearTrashLists.
+ SafeRendezvousState string
}
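// A minimal sketch of how a caller can thread RunOptions through
// successive balance operations so SafeRendezvousState carries over
// (runLoop and interval are illustrative names; this assumes the
// package's existing log and time imports; the real service loop is
// RunForever):
func runLoop(config Config, runOptions RunOptions, interval time.Duration) {
	for {
		nextRunOptions, err := (&Balancer{}).Run(config, runOptions)
		if err != nil {
			log.Printf("run failed: %v", err)
		} else {
			// Reuse the returned options (including
			// SafeRendezvousState) so trash lists are only
			// cleared again if the set of keep services changes.
			runOptions = nextRunOptions
		}
		time.Sleep(interval)
	}
}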
var debugf = func(string, ...interface{}) {}
if err != nil {
// (don't run)
} else if runOptions.Once {
- err = (&Balancer{}).Run(config, runOptions)
+ _, err = (&Balancer{}).Run(config, runOptions)
} else {
err = RunForever(config, runOptions, nil)
}
logger.Print("======= Consider using -commit-pulls and -commit-trash flags.")
}
- err := (&Balancer{}).Run(config, runOptions)
+ bal := &Balancer{}
+ var err error
+ runOptions, err = bal.Run(config, runOptions)
if err != nil {
logger.Print("run failed: ", err)
} else {
// Trashed blob; exclude it from response
continue
}
- fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.Unix())
+ fmt.Fprintf(writer, "%s+%d %d\n", b.Name, b.Properties.ContentLength, t.UnixNano())
}
if resp.NextMarker == "" {
return nil
flag.IntVar(
&permissionTTLSec,
"blob-signature-ttl",
- int(time.Duration(2*7*24*time.Hour).Seconds()),
- "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
+ 2*7*24*3600,
+ "Lifetime of blob permission signatures in seconds. Modifying the ttl will invalidate all existing signatures. "+
"See services/api/config/application.default.yml.")
flag.BoolVar(
&flagSerializeIO,
}
func s3regions() (okList []string) {
- for r, _ := range aws.Regions {
+ for r := range aws.Regions {
okList = append(okList, r)
}
return
if !v.isKeepBlock(key.Key) {
continue
}
- fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.Unix())
+ fmt.Fprintf(writer, "%s+%d %d\n", key.Key, key.Size, t.UnixNano())
}
if !listResp.IsTruncated {
break
// TrashItem deletes the indicated block from every writable volume.
func TrashItem(trashRequest TrashRequest) {
- reqMtime := time.Unix(trashRequest.BlockMtime, 0)
+ reqMtime := time.Unix(0, trashRequest.BlockMtime)
if time.Since(reqMtime) < blobSignatureTTL {
log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
time.Since(reqMtime),
log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
continue
}
- if trashRequest.BlockMtime != mtime.Unix() {
- log.Printf("%v Delete(%v): mtime on volume is %v does not match trash list value %v", volume, trashRequest.Locator, mtime.Unix(), trashRequest.BlockMtime)
+ if trashRequest.BlockMtime != mtime.UnixNano() {
+ log.Printf("%v Delete(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
continue
}
// Create TrashRequest for the test
trashRequest := TrashRequest{
Locator: testData.DeleteLocator,
- BlockMtime: oldBlockTime.Unix(),
+ BlockMtime: oldBlockTime.UnixNano(),
}
// Run trash worker and put the trashRequest on trashq
"os"
"regexp"
"sort"
+ "strconv"
"strings"
"time"
v := factory(t)
defer v.Teardown()
+ // minMtime and maxMtime are the minimum and maximum
+ // acceptable values the index can report for our test
+ // blocks; 1-second precision is sufficient.
+ minMtime := time.Now().UTC().UnixNano()
+ minMtime -= minMtime % 1e9
+
v.PutRaw(TestHash, TestBlock)
v.PutRaw(TestHash2, TestBlock2)
v.PutRaw(TestHash3, TestBlock3)
+ maxMtime := time.Now().UTC().UnixNano()
+ if maxMtime%1e9 > 0 {
+ maxMtime -= maxMtime % 1e9
+ maxMtime += 1e9
+ }
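// Worked example of the bounds above (times illustrative): if the PutRaw
// calls run at 12:00:00.25, minMtime floors the "before" timestamp to
// 12:00:00 and maxMtime rounds the "after" timestamp up to the next whole
// second, so an index that reports mtimes with only 1-second precision
// still lands inside [minMtime, maxMtime].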
+
// Blocks whose names aren't Keep hashes should be omitted from
// index
v.PutRaw("fffffffffnotreallyahashfffffffff", nil)
indexRows := strings.Split(string(buf.Bytes()), "\n")
sort.Strings(indexRows)
sortedIndex := strings.Join(indexRows, "\n")
- m, err := regexp.MatchString(
- `^\n`+TestHash+`\+\d+ \d+\n`+
- TestHash3+`\+\d+ \d+\n`+
- TestHash2+`\+\d+ \d+$`,
- sortedIndex)
- if err != nil {
- t.Error(err)
- } else if !m {
+ m := regexp.MustCompile(
+ `^\n` + TestHash + `\+\d+ (\d+)\n` +
+ TestHash3 + `\+\d+ \d+\n` +
+ TestHash2 + `\+\d+ \d+$`,
+ ).FindStringSubmatch(sortedIndex)
+ if m == nil {
t.Errorf("Got index %q for empty prefix", sortedIndex)
+ } else {
+ mtime, err := strconv.ParseInt(m[1], 10, 64)
+ if err != nil {
+ t.Error(err)
+ } else if mtime < minMtime || mtime > maxMtime {
+ t.Errorf("got %d for TestHash timestamp, expected %d <= t <= %d",
+ mtime, minMtime, maxMtime)
+ }
}
for _, prefix := range []string{"f", "f15", "f15ac"} {
for _, prefix := range []string{"zero", "zip", "zilch"} {
buf = new(bytes.Buffer)
- v.IndexTo(prefix, buf)
+ err := v.IndexTo(prefix, buf)
if err != nil {
t.Errorf("Got error on IndexTo with no such prefix %v", err.Error())
} else if buf.Len() != 0 {
return e
}
defer unlockfile(f)
- now := time.Now().Unix()
- utime := syscall.Utimbuf{now, now}
- return syscall.Utime(p, &utime)
+ ts := syscall.NsecToTimespec(time.Now().UnixNano())
+ return syscall.UtimesNano(p, []syscall.Timespec{ts, ts})
}
// Mtime returns the stored timestamp for the given locator.
_, err = fmt.Fprint(w,
name,
"+", fileInfo[0].Size(),
- " ", fileInfo[0].ModTime().Unix(),
+ " ", fileInfo[0].ModTime().UnixNano(),
"\n")
}
blockdir.Close()
gate := make(chan struct{})
go func() {
<-gate
- for _ = range b.NextItem {
+ for range b.NextItem {
<-gate
time.Sleep(time.Millisecond)
b.DoneItem <- struct{}{}
# libcloud compute drivers typically raise bare Exceptions to
# represent API errors. Return True for any exception that is
# exactly an Exception, or a better-known higher-level exception.
- if (exception is BaseHTTPError and
- self.message and self.message.startswith("InvalidInstanceID.NotFound")):
+ if (type(exception) is BaseHTTPError and
+ exception.message and
+ (exception.message.startswith("InvalidInstanceID.NotFound") or
+ exception.message.startswith("InstanceLimitExceeded"))):
return True
return (isinstance(exception, cls.CLOUD_ERRORS) or
type(exception) is Exception)
import pykka
import threading
+from libcloud.common.exceptions import BaseHTTPError
+
import arvnodeman.computenode.dispatch as dispatch
+from arvnodeman.computenode.driver import BaseComputeNodeDriver
from . import testutil
class ComputeNodeSetupActorTestCase(testutil.ActorTestMixin, unittest.TestCase):
self.api_client.nodes().update().execute.side_effect = arvados_effect
self.cloud_client = mock.MagicMock(name='cloud_client')
self.cloud_client.create_node.return_value = testutil.cloud_node_mock(1)
+ self.cloud_client.is_cloud_exception = BaseComputeNodeDriver.is_cloud_exception
def make_actor(self, arv_node=None):
if not hasattr(self, 'timer'):
self.make_actor()
self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ def test_unknown_basehttperror_not_retried(self):
+ self.make_mocks()
+ self.cloud_client.create_node.side_effect = [
+ BaseHTTPError(400, "Unknown"),
+ self.cloud_client.create_node.return_value,
+ ]
+ self.make_actor()
+ finished = threading.Event()
+ self.setup_actor.subscribe(lambda _: finished.set())
+ assert(finished.wait(self.TIMEOUT))
+ self.assertEqual(0, self.cloud_client.post_create_node.call_count)
+
+ def test_known_basehttperror_retried(self):
+ self.make_mocks()
+ self.cloud_client.create_node.side_effect = [
+ BaseHTTPError(400, "InstanceLimitExceeded"),
+ self.cloud_client.create_node.return_value,
+ ]
+ self.make_actor()
+ self.wait_for_assignment(self.setup_actor, 'cloud_node')
+ self.assertEqual(1, self.cloud_client.post_create_node.call_count)
+
def test_failed_post_create_retried(self):
self.make_mocks()
self.cloud_client.post_create_node.side_effect = [
}
run() {
+ CONFIG=$1
+ TAG=$2
+
if docker ps -a --filter "status=running" | grep -E "$ARVBOX_CONTAINER$" -q ; then
echo "Container $ARVBOX_CONTAINER is already running"
exit 0
echo "Container $ARVBOX_CONTAINER already exists but is not running; use restart or rebuild"
exit 1
fi
+
+ if test ! -z "$TAG"
+ then
+ TAG=":$TAG"
+ fi
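# For example (the tag value is illustrative): "arvbox run dev 1.3.0"
# starts the arvados/arvbox-dev:1.3.0 image, while "arvbox run dev"
# with no tag keeps using the plain arvados/arvbox-dev image.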
- if echo "$1" | grep '^public' ; then
+ if echo "$CONFIG" | grep '^public' ; then
if test -n "$ARVBOX_PUBLISH_IP" ; then
localip=$ARVBOX_PUBLISH_IP
else
PUBLIC=""
fi
- if echo "$1" | grep 'demo$' ; then
+ if echo "$CONFIG" | grep 'demo$' ; then
if test -d "$ARVBOX_DATA" ; then
echo "It looks like you already have a development container named $ARVBOX_CONTAINER."
echo "Set ARVBOX_CONTAINER to set a different name for your demo container"
--privileged \
--volumes-from $ARVBOX_CONTAINER-data \
$PUBLIC \
- arvados/arvbox-demo
+ arvados/arvbox-demo$TAG
updateconf
wait_for_arvbox
else
git clone https://github.com/curoverse/sso-devise-omniauth-provider.git "$SSO_ROOT"
fi
- if test "$1" = test ; then
- shift
+ if test "$CONFIG" = test ; then
mkdir -p $VAR_DATA/test
"--volume=$GEMS:/var/lib/gems:rw" \
"--volume=$PIPCACHE:/var/lib/pip:rw" \
"--volume=$GOSTUFF:/var/lib/gopath:rw" \
- arvados/arvbox-dev \
+ arvados/arvbox-dev$TAG \
/usr/local/bin/runsvinit -svdir=/etc/test-service
docker exec -ti \
WORKSPACE=/usr/src/arvados \
GEM_HOME=/var/lib/gems \
"$@"
- elif echo "$1" | grep 'dev$' ; then
+ elif echo "$CONFIG" | grep 'dev$' ; then
docker run \
--detach \
--name=$ARVBOX_CONTAINER \
"--volume=$PIPCACHE:/var/lib/pip:rw" \
"--volume=$GOSTUFF:/var/lib/gopath:rw" \
$PUBLIC \
- arvados/arvbox-dev
+ arvados/arvbox-dev$TAG
updateconf
wait_for_arvbox
echo "The Arvados source code is checked out at: $ARVADOS_ROOT"
else
- echo "Unknown configuration '$1'"
+ echo "Unknown configuration '$CONFIG'"
fi
fi
}
echo
echo "build <config> build arvbox Docker image"
echo "rebuild <config> build arvbox Docker image, no layer cache"
- echo "start|run <config> start $ARVBOX_CONTAINER container"
+ echo "start|run <config> [tag] start $ARVBOX_CONTAINER container"
echo "open open arvbox workbench in a web browser"
echo "shell enter arvbox shell"
echo "ip print arvbox docker container ip address"