*/script/rails
sdk/cwl/tests/input/blorp.txt
sdk/cwl/tests/tool/blub.txt
+sdk/cwl/tests/19109-upload-secondary/*
sdk/cwl/tests/federation/data/*
sdk/cwl/tests/fake-keep-mount/fake_collection_dir/.arvados#collection
sdk/go/manifest/testdata/*_manifest
net-ssh-gateway (2.0.0)
net-ssh (>= 4.0.0)
nio4r (2.5.8)
- nokogiri (1.13.4)
+ nokogiri (1.13.6)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
npm-rails (0.2.1)
"Provide authenticated http access to Arvados-hosted git repositories"
package_go_binary services/crunch-dispatch-local crunch-dispatch-local "$FORMAT" "$ARCH" \
"Dispatch Crunch containers on the local system"
-package_go_binary services/crunch-dispatch-slurm crunch-dispatch-slurm "$FORMAT" "$ARCH" \
+package_go_binary cmd/arvados-server crunch-dispatch-slurm "$FORMAT" "$ARCH" \
"Dispatch Crunch containers to a SLURM cluster"
package_go_binary cmd/arvados-server crunch-run "$FORMAT" "$ARCH" \
"Supervise a single Crunch container"
package_go_binary services/crunchstat crunchstat "$FORMAT" "$ARCH" \
"Gather cpu/memory/network statistics of running Crunch jobs"
-package_go_binary services/health arvados-health "$FORMAT" "$ARCH" \
+package_go_binary cmd/arvados-server arvados-health "$FORMAT" "$ARCH" \
"Check health of all Arvados cluster services"
package_go_binary cmd/arvados-server keep-balance "$FORMAT" "$ARCH" \
"Rebalance and garbage-collect data blocks stored in Arvados Keep"
StartLimitIntervalSec=0
[Service]
-Type=simple
+Type=notify
+EnvironmentFile=-/etc/arvados/environment
ExecStart=/usr/bin/arvados-health
# Set a reasonable default for the open file limit
LimitNOFILE=65536
package main
import (
+ "context"
"encoding/json"
"fmt"
"io"
"git.arvados.org/arvados.git/lib/install"
"git.arvados.org/arvados.git/lib/lsf"
"git.arvados.org/arvados.git/lib/recovercollection"
+ "git.arvados.org/arvados.git/lib/service"
+ "git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/health"
"git.arvados.org/arvados.git/services/githttpd"
keepbalance "git.arvados.org/arvados.git/services/keep-balance"
"git.arvados.org/arvados.git/services/keepproxy"
"git.arvados.org/arvados.git/services/keepstore"
"git.arvados.org/arvados.git/services/ws"
+ "github.com/prometheus/client_golang/prometheus"
)
var (
"dispatch-cloud": dispatchcloud.Command,
"dispatch-lsf": lsf.DispatchCommand,
"git-httpd": githttpd.Command,
+ "health": healthCommand,
"install": install.Command,
"init": install.InitCommand,
"keep-balance": keepbalance.Command,
}
return 0
}
+
+// healthCommand is the handler for the "health" subcommand: it runs
+// a health.Aggregator for the cluster as a service, registering an
+// arvados_health_clock_skew_seconds gauge in the service's Prometheus
+// registry so the aggregator can publish the clock skew observed in
+// its most recent health check.
+var healthCommand cmd.Handler = service.Command(arvados.ServiceNameHealth, func(ctx context.Context, cluster *arvados.Cluster, _ string, reg *prometheus.Registry) service.Handler {
+	mClockSkew := prometheus.NewGauge(prometheus.GaugeOpts{
+		Namespace: "arvados",
+		Subsystem: "health",
+		Name:      "clock_skew_seconds",
+		Help:      "Clock skew observed in most recent health check",
+	})
+	reg.MustRegister(mClockSkew)
+	return &health.Aggregator{
+		Cluster:         cluster,
+		MetricClockSkew: mClockSkew,
+	}
+})
Description=Arvados Crunch Dispatcher for SLURM
Documentation=https://doc.arvados.org/
After=network.target
+AssertPathExists=/etc/arvados/config.yml
# systemd>=230 (debian:9) obeys StartLimitIntervalSec in the [Unit] section
StartLimitIntervalSec=0
[Service]
Type=notify
+EnvironmentFile=-/etc/arvados/environment
ExecStart=/usr/bin/crunch-dispatch-slurm
# Set a reasonable default for the open file limit
LimitNOFILE=65536
Restart=always
RestartSec=1
-LimitNOFILE=1000000
# systemd<=219 (centos:7, debian:8, ubuntu:trusty) obeys StartLimitInterval in the [Service] section
StartLimitInterval=0
client_max_body_size 128m;
location / {
- proxy_pass http://controller;
- proxy_redirect off;
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
+ proxy_pass http://controller;
+ proxy_redirect off;
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;
"previous: Upgrading to 2.4.0":#v2_4_0
+h3. Slurm dispatcher requires configuration update
+
+If you use the Slurm dispatcher (@crunch-dispatch-slurm@) you must add a @Services.DispatchSLURM.InternalURLs@ section to your configuration file, as shown on the "updated install page":{{site.baseurl}}/install/crunch2-slurm/install-dispatch.html.
+
+h3. New proxy parameters for arvados-controller
+
+We now recommend disabling nginx proxy caching for arvados-controller, to avoid truncation of large responses.
+
+In your Nginx configuration file (@/etc/nginx/conf.d/arvados-api-and-controller.conf@), add the following lines to the @location /@ block with @http://controller@ (see "Update nginx configuration":{{site.baseurl}}/install/install-api-server.html#update-nginx for an example) and reload/restart Nginx (@sudo nginx -s reload@).
+
+<pre>
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
+</pre>
+
h3. Now recommending Singularity 3.9.9
The compute image "build script":{{site.baseurl}}/install/crunch2-cloud/install-compute-node.html now installs Singularity 3.9.9 instead of 3.7.4. The newer version includes a bugfix that should resolve "intermittent loopback device errors":https://dev.arvados.org/issues/18489 when running containers.
+h3. Changes to @arvados-cwl-runner --create-workflow@ and @--update-workflow@
+
+When using @arvados-cwl-runner --create-workflow@ or @--update-workflow@, by default it will now make a copy of all collection and Docker image dependencies in the target project. Running workflows retains the old behavior (use the dependencies wherever they are found). This can be controlled explicitly with @--copy-deps@ and @--no-copy-deps@.
+
h2(#v2_4_0). v2.4.0 (2022-04-08)
"previous: Upgrading to 2.3.1":#v2_3_1
|runtime_user_uuid|string|The user permission that will be granted to this container.||
|runtime_auth_scopes|array of string|The scopes associated with the auth token used to run this container.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container request|default is ["default"]|
+|output_properties|hash|User metadata properties to set on the output collection. The output collection will also have default properties "type" ("intermediate" or "output") and "container_request" (the uuid of the container request that produced the collection).||
h2(#priority). Priority
|gateway_address|string|Address (host:port) of gateway server.|Internal use only.|
|interactive_session_started|boolean|Indicates whether @arvados-client shell@ has been used to run commands in the container, which may have altered the container's behavior and output.||
|output_storage_classes|array of strings|The storage classes that will be used for the log and output collections of this container||
+|output_properties|hash|User metadata properties to set on the output collection.||
h2(#container_states). Container states
The Arvados Slurm dispatcher can run on any node that can submit requests to both the Arvados API server and the Slurm controller (via @sbatch@). It is not resource-intensive, so you can run it on the API server node.
-h2(#update-config). Update config.yml (optional)
+h2(#update-config). Update config.yml
-Crunch-dispatch-slurm reads the common configuration file at @config.yml@.
+Crunch-dispatch-slurm reads the common configuration file at @/etc/arvados/config.yml@.
+
+Add a DispatchSLURM entry to the Services section, using the hostname where @crunch-dispatch-slurm@ will run, and an available port:
+
+<notextile>
+<pre> Services:
+ DispatchSLURM:
+ InternalURLs:
+ "http://<code class="userinput">hostname.zzzzz.arvadosapi.com:9007</code>": {}</pre>
+</notextile>
The following configuration parameters are optional.
client_max_body_size 128m;
location / {
- proxy_pass http://controller;
- proxy_redirect off;
- proxy_connect_timeout 90s;
- proxy_read_timeout 300s;
+ proxy_pass http://controller;
+ proxy_redirect off;
+ proxy_connect_timeout 90s;
+ proxy_read_timeout 300s;
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_set_header Upgrade $http_upgrade;
property1: value1
property2: $(inputs.value2)
+ arv:OutputCollectionProperties:
+ outputProperties:
+ property1: value1
+ property2: $(inputs.value2)
+
cwltool:CUDARequirement:
cudaVersionMin: "11.0"
cudaComputeCapability: "9.0"
table(table table-bordered table-condensed).
|_. Field |_. Type |_. Description |
-|processProperties|key-value map, or list of objects with the fields {propertyName, propertyValue}|The properties that will be set on the container request. May include expressions that reference `$(inputs)` of the current workflow or tool.|
+|processProperties|key-value map, or list of objects with the fields {propertyName, propertyValue}|The properties that will be set on the container request. May include expressions that reference @$(inputs)@ of the current workflow or tool.|
+
+h2(#OutputCollectionProperties). arv:OutputCollectionProperties
+
+Specify custom "properties":{{site.baseurl}}/api/methods.html#subpropertyfilters that will be set on the output collection of the workflow step.
+
+table(table table-bordered table-condensed).
+|_. Field |_. Type |_. Description |
+|outputProperties|key-value map, or list of objects with the fields {propertyName, propertyValue}|The properties that will be set on the output collection. May include expressions that reference @$(inputs)@ of the current workflow or tool.|
h2(#CUDARequirement). cwltool:CUDARequirement
The @--match-submitter-images@ option will check the id of the image in the local Docker instance and compare it to the id of the image already in Arvados with the same name and tag. If they are different, it will choose the image matching the local image id, which will be uploaded if necessary. This is helpful for development: if you locally rebuild the image with the 'latest' tag, the @--match-submitter-images@ option will ensure that the newer version is used.
+h3(#dependencies). Dependencies
+
+Dependencies include collections and Docker images referenced by the workflow. Dependencies are automatically uploaded by @arvados-cwl-runner@ if they are not present or need to be updated. When running a workflow, dependencies that already exist somewhere on the Arvados instance (from a previous upload) will not be uploaded or copied, regardless of the project they are located in. Sometimes this creates problems when sharing a workflow run with others. In this case, use @--copy-deps@ to indicate that you want all dependencies to be copied into the destination project (specified by @--project-uuid@).
+
h3. Command line options
See "arvados-cwl-runner options":{{site.baseurl}}/user/cwl/cwl-run-options.html
h2(#registering). Registering a workflow to use in Workbench
-Use @--create-workflow@ to register a CWL workflow with Arvados. This enables you to share workflows with other Arvados users, and run them by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard and on the command line by UUID.
+Use @--create-workflow@ to register a CWL workflow with Arvados. Use @--project-uuid@ to upload the workflow to a specific project, along with its dependencies. You can share the workflow with other Arvados users by sharing that project. You can run the workflow by clicking the <span class="btn btn-sm btn-primary"><i class="fa fa-fw fa-gear"></i> Run a process...</span> button on the Workbench Dashboard, and on the command line by UUID.
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --create-workflow bwa-mem.cwl</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
You can provide a partial input file to set default values for the workflow input parameters. You can also use the @--name@ option to set the name of the workflow:
<notextile>
-<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --name "My workflow with defaults" --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --name "My workflow with defaults" --project-uuid zzzzz-j7d0g-p32bi47ogkjke11 --create-workflow bwa-mem.cwl bwa-mem-template.yml</span>
arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Upload local files: "bwa-mem.cwl"
2016-07-01 14:09:50 arvados.arv-run[3730] INFO: Uploaded to zzzzz-4zz18-0f91qkovk4ml18o
</code></pre>
</notextile>
+Use @--update-workflow <uuid>@ to update an existing workflow.
+
+<notextile>
+<pre><code>~/arvados/doc/user/cwl/bwa-mem$ <span class="userinput">arvados-cwl-runner --update-workflow zzzzz-p5p6p-zuniv58hn8d0qd8 bwa-mem.cwl</span>
+arvados-cwl-runner 1.0.20160628195002, arvados-python-client 0.1.20160616015107, cwltool 1.0.20160629140624
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Upload local files: "bwa-mem.cwl"
+2016-07-01 12:21:01 arvados.arv-run[15796] INFO: Uploaded to zzzzz-4zz18-7e0hedrmkuyoei3
+2016-07-01 12:21:01 arvados.cwl-runner[15796] INFO: Created template zzzzz-p5p6p-zuniv58hn8d0qd8
+zzzzz-p5p6p-zuniv58hn8d0qd8
+</code></pre>
+</notextile>
+
+When using @--create-workflow@ or @--update-workflow@, the @--copy-deps@ and @--match-submitter-images@ options are enabled by default.
+
h3. Running registered workflows at the command line
You can run a registered workflow at the command line by its UUID:
runNginx{},
runServiceCommand{name: "controller", svc: super.cluster.Services.Controller, depends: []supervisedTask{seedDatabase{}}},
runServiceCommand{name: "git-httpd", svc: super.cluster.Services.GitHTTP},
- runGoProgram{src: "services/health", svc: super.cluster.Services.Health},
+ runServiceCommand{name: "health", svc: super.cluster.Services.Health},
runServiceCommand{name: "keepproxy", svc: super.cluster.Services.Keepproxy, depends: []supervisedTask{runPassenger{src: "services/api"}}},
runServiceCommand{name: "keepstore", svc: super.cluster.Services.Keepstore},
runServiceCommand{name: "keep-web", svc: super.cluster.Services.WebDAV},
DispatchLSF:
InternalURLs: {SAMPLE: {}}
ExternalURL: ""
+ DispatchSLURM:
+ InternalURLs: {SAMPLE: {}}
+ ExternalURL: ""
Keepproxy:
InternalURLs: {SAMPLE: {}}
ExternalURL: ""
ManagementToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
SystemRootToken: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
Collections:
- BlobSigningKey: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa`, &logbuf).Load()
+ BlobSigningKey: aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
+ InstanceTypes:
+ abc:
+ IncludedScratch: 123456
+`, &logbuf).Load()
c.Assert(err, check.IsNil)
yaml, err := yaml.Marshal(cfg)
c.Assert(err, check.IsNil)
},
{
arvados.EndpointAPIClientAuthorizationList,
- func() interface{} { return &arvados.ListOptions{} },
+ func() interface{} { return &arvados.ListOptions{Limit: -1} },
func(ctx context.Context, opts interface{}) (interface{}, error) {
return rtr.backend.APIClientAuthorizationList(ctx, *opts.(*arvados.ListOptions))
},
shouldCall: "CollectionList",
withOptions: arvados.ListOptions{Limit: -1},
},
+ {
+ method: "GET",
+ path: "/arvados/v1/api_client_authorizations",
+ shouldCall: "APIClientAuthorizationList",
+ withOptions: arvados.ListOptions{Limit: -1},
+ },
{
method: "GET",
path: "/arvados/v1/collections?limit=123&offset=456&include_trash=true&include_old_versions=1",
"--storage-classes", strings.Join(runner.Container.OutputStorageClasses, ","),
fmt.Sprintf("--crunchstat-interval=%v", runner.statInterval.Seconds())}
- if runner.executor.Runtime() == "docker" {
+ if _, isdocker := runner.executor.(*dockerExecutor); isdocker {
arvMountCmd = append(arvMountCmd, "--allow-other")
}
func (runner *ContainerRunner) Run() (err error) {
runner.CrunchLog.Printf("crunch-run %s started", cmd.Version.String())
runner.CrunchLog.Printf("%s", currentUserAndGroups())
- runner.CrunchLog.Printf("Executing container '%s' using %s runtime", runner.Container.UUID, runner.executor.Runtime())
+ v, _ := exec.Command("arv-mount", "--version").CombinedOutput()
+ runner.CrunchLog.Printf("Using FUSE mount: %s", v)
+ runner.CrunchLog.Printf("Using container runtime: %s", runner.executor.Runtime())
+ runner.CrunchLog.Printf("Executing container: %s", runner.Container.UUID)
hostname, hosterr := os.Hostname()
if hosterr != nil {
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
return e.loadErr
}
func (e *stubExecutor) Runtime() string { return "stub" }
+func (e *stubExecutor) Version() string { return "stub " + cmd.Version.String() }
func (e *stubExecutor) Create(spec containerSpec) error { e.created = spec; return e.createErr }
func (e *stubExecutor) Start() error { e.exit = make(chan int, 1); go e.runFunc(); return e.startErr }
func (e *stubExecutor) CgroupID() string { return "cgroupid" }
c.Assert(s.api.Logs["crunch-run"], NotNil)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run \S+ \(go\S+\) start.*`)
c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*crunch-run process has uid=\d+\(.+\) gid=\d+\(.+\) groups=\d+\(.+\)(,\d+\(.+\))*\n.*`)
- c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container 'zzzzz-zzzzz-zzzzzzzzzzzzzzz' using stub runtime.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Executing container: zzzzz-zzzzz-zzzzzzzzzzzzzzz.*`)
+ c.Check(s.api.Logs["crunch-run"].String(), Matches, `(?ms).*Using container runtime: stub.*`)
}
func (s *TestSuite) TestContainerRecordLog(c *C) {
}, err
}
-func (e *dockerExecutor) Runtime() string { return "docker" }
+// Runtime reports the container runtime name and version (e.g.
+// "docker Engine 20.10.16") by querying the Docker server version and
+// joining its component name/version pairs with ", ". If the query
+// fails or reports no components, it returns
+// "docker (unknown version)".
+func (e *dockerExecutor) Runtime() string {
+	v, _ := e.dockerclient.ServerVersion(context.Background())
+	info := ""
+	for _, cv := range v.Components {
+		if info != "" {
+			info += ", "
+		}
+		info += cv.Name + " " + cv.Version
+	}
+	if info == "" {
+		info = "(unknown version)"
+	}
+	return "docker " + info
+}
func (e *dockerExecutor) LoadImage(imageID string, imageTarballPath string, container arvados.Container, arvMountPoint string,
containerClient *arvados.Client) error {
// Release resources (temp dirs, stopped containers)
Close()
- // Name of runtime engine ("docker", "singularity")
+ // Name and version of runtime engine ("docker 20.10.16", "singularity-ce version 3.9.9")
Runtime() string
GatewayTarget
}
func (s *executorSuite) TestExecTrivialContainer(c *C) {
+ c.Logf("Using container runtime: %s", s.executor.Runtime())
s.spec.Command = []string{"echo", "ok"}
s.checkRun(c, 0)
c.Check(s.stdout.String(), Equals, "ok\n")
func (s *integrationSuite) TestRunTrivialContainerWithDocker(c *C) {
s.engine = "docker"
s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: docker Engine \d+\.\d+.*`)
}
func (s *integrationSuite) TestRunTrivialContainerWithSingularity(c *C) {
s.engine = "singularity"
s.testRunTrivialContainer(c)
+ c.Check(s.logFiles["crunch-run.txt"], Matches, `(?ms).*Using container runtime: singularity.* version 3\.\d+.*`)
}
func (s *integrationSuite) TestRunTrivialContainerWithLocalKeepstore(c *C) {
"regexp"
"sort"
"strconv"
+ "strings"
"syscall"
"time"
}, nil
}
-func (e *singularityExecutor) Runtime() string { return "singularity" }
+// Runtime reports the container runtime name and version by running
+// "singularity --version" and trimming the trailing newline from its
+// output. If the command fails, it returns
+// "singularity (unknown version)".
+func (e *singularityExecutor) Runtime() string {
+	buf, err := exec.Command("singularity", "--version").CombinedOutput()
+	if err != nil {
+		return "singularity (unknown version)"
+	}
+	return strings.TrimSuffix(string(buf), "\n")
+}
func (e *singularityExecutor) getOrCreateProject(ownerUuid string, name string, containerClient *arvados.Client) (*arvados.Group, error) {
var gp arvados.GroupList
// us to select specific devices we need to propagate that.
env = append(env, "SINGULARITYENV_CUDA_VISIBLE_DEVICES="+cudaVisibleDevices)
}
+ // Singularity's default behavior is to evaluate each
+ // SINGULARITYENV_* env var with a shell as a double-quoted
+ // string and pass the result to the contained
+ // process. Singularity 3.10+ has an option to pass env vars
+ // through literally without evaluating, which is what we
+ // want. See https://github.com/sylabs/singularity/pull/704
+ // and https://dev.arvados.org/issues/19081
+ env = append(env, "SINGULARITY_NO_EVAL=1")
args = append(args, e.imageFilename)
args = append(args, e.spec.Command...)
e.imageFilename = "/fake/image.sif"
cmd := e.execCmd("./singularity")
c.Check(cmd.Args, DeepEquals, []string{"./singularity", "exec", "--containall", "--cleanenv", "--pwd=/WorkingDir", "--net", "--network=none", "--nv", "--bind", "/hostpath:/mnt:ro", "/fake/image.sif"})
- c.Check(cmd.Env, DeepEquals, []string{"SINGULARITYENV_FOO=bar"})
+ c.Check(cmd.Env, DeepEquals, []string{"SINGULARITYENV_FOO=bar", "SINGULARITY_NO_EVAL=1"})
}
}
if dev || test {
pkgs = append(pkgs, "squashfs-tools") // for singularity
+ pkgs = append(pkgs, "gnupg") // for docker install recipe
}
switch {
case osv.Debian && osv.Major >= 11:
}
}
+ if dev || test {
+ if havedockerversion, err := exec.Command("docker", "--version").CombinedOutput(); err == nil {
+ logger.Printf("%s installed, assuming that version is ok", bytes.TrimSuffix(havedockerversion, []byte("\n")))
+ } else if osv.Debian {
+ var codename string
+ switch osv.Major {
+ case 10:
+ codename = "buster"
+ case 11:
+ codename = "bullseye"
+ default:
+ err = fmt.Errorf("don't know how to install docker-ce for debian %d", osv.Major)
+ return 1
+ }
+ err = inst.runBash(`
+rm -f /usr/share/keyrings/docker-archive-keyring.gpg
+curl -fsSL https://download.docker.com/linux/debian/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
+echo 'deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/debian/ `+codename+` stable' | \
+ tee /etc/apt/sources.list.d/docker.list
+apt-get update
+DEBIAN_FRONTEND=noninteractive apt-get --yes --no-install-recommends install docker-ce
+`, stdout, stderr)
+ if err != nil {
+ return 1
+ }
+ } else {
+ err = fmt.Errorf("don't know how to install docker for osversion %v", osv)
+ return 1
+ }
+ }
+
os.Mkdir("/var/lib/arvados", 0755)
os.Mkdir("/var/lib/arvados/tmp", 0700)
if prod || pkg {
exgroup.add_argument("--enable-preemptible", dest="enable_preemptible", default=None, action="store_true", help="Use preemptible instances. Control individual steps with arv:UsePreemptible hint.")
exgroup.add_argument("--disable-preemptible", dest="enable_preemptible", default=None, action="store_false", help="Don't use preemptible instances.")
+ exgroup = parser.add_mutually_exclusive_group()
+ exgroup.add_argument("--copy-deps", dest="copy_deps", default=None, action="store_true", help="Copy dependencies into the destination project.")
+ exgroup.add_argument("--no-copy-deps", dest="copy_deps", default=None, action="store_false", help="Leave dependencies where they are.")
+
parser.add_argument(
"--skip-schemas",
action="store_true",
"http://arvados.org/cwl#ProcessProperties",
"http://commonwl.org/cwltool#CUDARequirement",
"http://arvados.org/cwl#UsePreemptible",
+ "http://arvados.org/cwl#OutputCollectionProperties",
])
def exit_signal_handler(sigcode, frame):
logger_handler=arvados.log_handler,
custom_schema_callback=add_arv_hints,
loadingContext=executor.loadingContext,
- runtimeContext=executor.runtimeContext,
+ runtimeContext=executor.toplevel_runtimeContext,
input_required=not (arvargs.create_workflow or arvargs.update_workflow))
- type: record
name: PropertyDef
doc: |
- Define a property that will be set on the submitted container
- request associated with this workflow or step.
+ Define an arvados metadata property that will be set on a
+ container request or output collection.
fields:
- name: propertyName
type: string
_id: "@type"
_type: "@vocab"
usePreemptible: boolean
+
+- name: OutputCollectionProperties
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify metadata properties that will be set on the output
+ collection associated with this workflow or step.
+ fields:
+ class:
+ type: string
+      doc: "Always 'arv:OutputCollectionProperties'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputProperties:
+ type: PropertyDef[]
+ jsonldPredicate:
+ mapSubject: propertyName
+ mapPredicate: propertyValue
_id: "@type"
_type: "@vocab"
usePreemptible: boolean
+
+- name: OutputCollectionProperties
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify metadata properties that will be set on the output
+ collection associated with this workflow or step.
+ fields:
+ class:
+ type: string
+      doc: "Always 'arv:OutputCollectionProperties'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputProperties:
+ type: PropertyDef[]
+ jsonldPredicate:
+ mapSubject: propertyName
+ mapPredicate: propertyValue
_id: "@type"
_type: "@vocab"
usePreemptible: boolean
+
+- name: OutputCollectionProperties
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify metadata properties that will be set on the output
+ collection associated with this workflow or step.
+ fields:
+ class:
+ type: string
+      doc: "Always 'arv:OutputCollectionProperties'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputProperties:
+ type: PropertyDef[]
+ jsonldPredicate:
+ mapSubject: propertyName
+ mapPredicate: propertyValue
mounts[targetdir]["path"] = path
prevdir = targetdir + "/"
+ intermediate_collection_info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
+
with Perf(metrics, "generatefiles %s" % self.name):
if self.generatefiles["listing"]:
vwd = arvados.collection.Collection(api_client=self.arvrunner.api,
if not runtimeContext.current_container:
runtimeContext.current_container = arvados_cwl.util.get_current_container(self.arvrunner.api, self.arvrunner.num_retries, logger)
- info = arvados_cwl.util.get_intermediate_collection_info(self.name, runtimeContext.current_container, runtimeContext.intermediate_output_ttl)
- vwd.save_new(name=info["name"],
+ vwd.save_new(name=intermediate_collection_info["name"],
owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
- trash_at=info["trash_at"],
- properties=info["properties"])
+ trash_at=intermediate_collection_info["trash_at"],
+ properties=intermediate_collection_info["properties"])
prev = None
for f, p in sorteditems:
runtimeContext.project_uuid,
runtimeContext.force_docker_pull,
runtimeContext.tmp_outdir_prefix,
- runtimeContext.match_local_docker)
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
network_req, _ = self.get_requirement("NetworkAccess")
if network_req:
if runtimeContext.submit_runner_cluster:
extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
- container_request["output_name"] = "Output for step %s" % (self.name)
+ container_request["output_name"] = "Output from step %s" % (self.name)
container_request["output_ttl"] = self.output_ttl
container_request["mounts"] = mounts
container_request["secret_mounts"] = secret_mounts
for pr in properties_req["processProperties"]:
container_request["properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ output_properties_req, _ = self.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+ if output_properties_req:
+ if self.arvrunner.api._rootDesc["revision"] >= "20220510":
+ container_request["output_properties"] = {}
+ for pr in output_properties_req["outputProperties"]:
+ container_request["output_properties"][pr["propertyName"]] = self.builder.do_eval(pr["propertyValue"])
+ else:
+ logger.warning("%s API revision is %s, revision %s is required to support setting properties on output collections.",
+ self.arvrunner.label(self), self.arvrunner.api._rootDesc["revision"], "20220510")
+
if runtimeContext.runnerjob.startswith("arvwf:"):
wfuuid = runtimeContext.runnerjob[6:runtimeContext.runnerjob.index("#")]
wfrecord = self.arvrunner.api.workflows().get(uuid=wfuuid).execute(num_retries=self.arvrunner.num_retries)
"cwd": "/var/spool/cwl",
"priority": self.priority,
"state": "Committed",
- "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image),
+ "container_image": arvados_jobs_image(self.arvrunner, self.jobs_image, runtimeContext),
"mounts": {
"/var/lib/cwl/cwl.input.json": {
"kind": "json",
"portable_data_hash": "%s" % workflowcollection
}
else:
- packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map)
+ packed = packed_workflow(self.arvrunner, self.embedded_tool, self.merged_map, runtimeContext)
workflowpath = "/var/lib/cwl/workflow.json#main"
container_req["mounts"]["/var/lib/cwl/workflow.json"] = {
"kind": "json",
if runtimeContext.intermediate_storage_classes != "default" and runtimeContext.intermediate_storage_classes:
command.append("--intermediate-storage-classes=" + runtimeContext.intermediate_storage_classes)
- if self.on_error:
+ if runtimeContext.on_error:
command.append("--on-error=" + self.on_error)
- if self.intermediate_output_ttl:
- command.append("--intermediate-output-ttl=%d" % self.intermediate_output_ttl)
+ if runtimeContext.intermediate_output_ttl:
+ command.append("--intermediate-output-ttl=%d" % runtimeContext.intermediate_output_ttl)
- if self.arvrunner.trash_intermediate:
+ if runtimeContext.trash_intermediate:
command.append("--trash-intermediate")
- if self.arvrunner.project_uuid:
- command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if runtimeContext.project_uuid:
+ command.append("--project-uuid="+runtimeContext.project_uuid)
if self.enable_dev:
command.append("--enable-dev")
def run(self, runtimeContext):
runtimeContext.keepprefix = "keep:"
job_spec = self.arvados_job_spec(runtimeContext)
- if self.arvrunner.project_uuid:
- job_spec["owner_uuid"] = self.arvrunner.project_uuid
+ if runtimeContext.project_uuid:
+ job_spec["owner_uuid"] = runtimeContext.project_uuid
extra_submit_params = {}
if runtimeContext.submit_runner_cluster:
def arv_docker_get_image(api_client, dockerRequirement, pull_image, project_uuid,
- force_pull, tmp_outdir_prefix, match_local_docker):
+ force_pull, tmp_outdir_prefix, match_local_docker, copy_deps):
"""Check if a Docker image is available in Keep, if not, upload it using arv-keepdocker."""
if "http://arvados.org/cwl#dockerCollectionPDH" in dockerRequirement:
image_name = sp[0]
image_tag = sp[1] if len(sp) > 1 else "latest"
- images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ out_of_project_images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=None)
- if images and match_local_docker:
+ if copy_deps:
+ # Only images that are available in the destination project
+ images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
+ image_name=image_name,
+ image_tag=image_tag,
+ project_uuid=project_uuid)
+ else:
+ images = out_of_project_images
+
+ if match_local_docker:
local_image_id = determine_image_id(dockerRequirement["dockerImageId"])
if local_image_id:
# find it in the list
# force re-upload.
images = []
+ for i in out_of_project_images:
+ if i[1]["dockerhash"] == local_image_id:
+ found = True
+ out_of_project_images = [i]
+ break
+ if not found:
+ # force re-upload.
+ out_of_project_images = []
+
if not images:
- # Fetch Docker image if necessary.
- try:
- result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
- force_pull, tmp_outdir_prefix)
- if not result:
- raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
- except OSError as e:
- raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
+ if not out_of_project_images:
+ # Fetch Docker image if necessary.
+ try:
+ result = cwltool.docker.DockerCommandLineJob.get_image(dockerRequirement, pull_image,
+ force_pull, tmp_outdir_prefix)
+ if not result:
+ raise WorkflowException("Docker image '%s' not available" % dockerRequirement["dockerImageId"])
+ except OSError as e:
+ raise WorkflowException("While trying to get Docker image '%s', failed to execute 'docker': %s" % (dockerRequirement["dockerImageId"], e))
# Upload image to Arvados
args = []
images = arvados.commands.keepdocker.list_images_in_arv(api_client, 3,
image_name=image_name,
- image_tag=image_tag)
+ image_tag=image_tag,
+ project_uuid=project_uuid)
if not images:
raise WorkflowException("Could not find Docker image %s:%s" % (image_name, image_tag))
max_res_pars = ("coresMin", "coresMax", "ramMin", "ramMax", "tmpdirMin", "tmpdirMax")
sum_res_pars = ("outdirMin", "outdirMax")
-def upload_workflow(arvRunner, tool, job_order, project_uuid, uuid=None,
+def upload_workflow(arvRunner, tool, job_order, project_uuid,
+ runtimeContext, uuid=None,
submit_runner_ram=0, name=None, merged_map=None,
submit_runner_image=None):
- packed = packed_workflow(arvRunner, tool, merged_map)
+ packed = packed_workflow(arvRunner, tool, merged_map, runtimeContext)
adjustDirObjs(job_order, trim_listing)
adjustFileObjs(job_order, trim_anonymous_location)
name = tool.tool.get("label", os.path.basename(tool.tool["id"]))
upload_dependencies(arvRunner, name, tool.doc_loader,
- packed, tool.tool["id"], False)
+ packed, tool.tool["id"], False,
+ runtimeContext)
wf_runner_resources = None
wf_runner_resources = {"class": "http://arvados.org/cwl#WorkflowRunnerResources"}
hints.append(wf_runner_resources)
- wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner, submit_runner_image or "arvados/jobs:"+__version__)
+ wf_runner_resources["acrContainerImage"] = arvados_jobs_image(arvRunner,
+ submit_runner_image or "arvados/jobs:"+__version__,
+ runtimeContext)
if submit_runner_ram:
wf_runner_resources["ramMin"] = submit_runner_ram
self.doc_loader,
joborder,
joborder.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if self.wf_pdh is None:
packed = pack(self.loadingContext, self.tool["id"], loader=self.doc_loader)
self.doc_loader,
packed,
self.tool["id"],
- False)
+ False,
+ runtimeContext)
# Discover files/directories referenced by the
# workflow (mainly "default" values)
if self.wf_pdh is None:
adjustFileObjs(packed, keepmount)
adjustDirObjs(packed, keepmount)
- self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed, runtimeContext)
self.loadingContext = self.loadingContext.copy()
self.loadingContext.metadata = self.loadingContext.metadata.copy()
self.collection_cache_size = 256
self.match_local_docker = False
self.enable_preemptible = None
+ self.copy_deps = None
super(ArvRuntimeContext, self).__init__(kwargs)
import arvados_cwl.util
from .arvcontainer import RunnerContainer
-from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
+from .runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, make_builder
from .arvtool import ArvadosCommandTool, validate_cluster_target, ArvadosExpressionTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache, pdh_size
handler = RuntimeStatusLoggingHandler(self.runtime_status_update)
root_logger.addHandler(handler)
- self.runtimeContext = ArvRuntimeContext(vars(arvargs))
- self.runtimeContext.make_fs_access = partial(CollectionFsAccess,
+ self.toplevel_runtimeContext = ArvRuntimeContext(vars(arvargs))
+ self.toplevel_runtimeContext.make_fs_access = partial(CollectionFsAccess,
collection_cache=self.collection_cache)
- validate_cluster_target(self, self.runtimeContext)
+ validate_cluster_target(self, self.toplevel_runtimeContext)
def arv_make_tool(self, toolpath_object, loadingContext):
with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
self.check_features(v, parentfield=parentfield)
- def make_output_collection(self, name, storage_classes, tagsString, outputObj):
+ def make_output_collection(self, name, storage_classes, tagsString, output_properties, outputObj):
outputObj = copy.deepcopy(outputObj)
files = []
res = str(json.dumps(outputObj, sort_keys=True, indent=4, separators=(',',': '), ensure_ascii=False))
f.write(res)
- final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes, ensure_unique_name=True)
+
+ final.save_new(name=name, owner_uuid=self.project_uuid, storage_classes=storage_classes,
+ ensure_unique_name=True, properties=output_properties)
logger.info("Final output collection %s \"%s\" (%s)", final.portable_data_hash(),
final.api_response()["name"],
self.api.containers().update(uuid=current['uuid'],
body={
'output': self.final_output_collection.portable_data_hash(),
+ 'output_properties': self.final_output_collection.get_properties(),
}).execute(num_retries=self.num_retries)
self.api.collections().update(uuid=self.final_output_collection.manifest_locator(),
body={
updated_tool.visit(self.check_features)
- self.project_uuid = runtimeContext.project_uuid
self.pipeline = None
self.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
self.secret_store = runtimeContext.secret_store
if runtimeContext.submit_request_uuid and self.work_api != "containers":
raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+ runtimeContext = runtimeContext.copy()
+
default_storage_classes = ",".join([k for k,v in self.api.config().get("StorageClasses", {"default": {"Default": True}}).items() if v.get("Default") is True])
if runtimeContext.storage_classes == "default":
runtimeContext.storage_classes = default_storage_classes
if not runtimeContext.name:
runtimeContext.name = self.name = updated_tool.tool.get("label") or updated_tool.metadata.get("label") or os.path.basename(updated_tool.tool["id"])
+ if runtimeContext.copy_deps is None and (runtimeContext.create_workflow or runtimeContext.update_workflow):
+ # When creating or updating workflow record, by default
+ # always copy dependencies and ensure Docker images are up
+ # to date.
+ runtimeContext.copy_deps = True
+ runtimeContext.match_local_docker = True
+
+ if runtimeContext.update_workflow and self.project_uuid is None:
+ # If we are updating a workflow, make sure anything that
+ # gets uploaded goes into the same parent project, unless
+ # an alternate --project-uuid was provided.
+ existing_wf = self.api.workflows().get(uuid=runtimeContext.update_workflow).execute()
+ runtimeContext.project_uuid = existing_wf["owner_uuid"]
+
+ self.project_uuid = runtimeContext.project_uuid
+
# Upload local file references in the job order.
job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- updated_tool, job_order)
+ updated_tool, job_order, runtimeContext)
# the last clause means: if it is a command line tool, and we
# are going to wait for the result, and always_submit_runner
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
- merged_map = upload_workflow_deps(self, tool)
+ merged_map = upload_workflow_deps(self, tool, runtimeContext)
# Recreate process object (ArvadosWorkflow or
# ArvadosCommandTool) because tool document may have been
loadingContext.metadata = tool.metadata
tool = load_tool(tool.tool, loadingContext)
- existing_uuid = runtimeContext.update_workflow
- if existing_uuid or runtimeContext.create_workflow:
+ if runtimeContext.update_workflow or runtimeContext.create_workflow:
# Create a pipeline template or workflow record and exit.
if self.work_api == "containers":
uuid = upload_workflow(self, tool, job_order,
- self.project_uuid,
- uuid=existing_uuid,
- submit_runner_ram=runtimeContext.submit_runner_ram,
- name=runtimeContext.name,
- merged_map=merged_map,
- submit_runner_image=runtimeContext.submit_runner_image)
+ runtimeContext.project_uuid,
+ runtimeContext,
+ uuid=runtimeContext.update_workflow,
+ submit_runner_ram=runtimeContext.submit_runner_ram,
+ name=runtimeContext.name,
+ merged_map=merged_map,
+ submit_runner_image=runtimeContext.submit_runner_image)
self.stdout.write(uuid + "\n")
return (None, "success")
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
- runtimeContext = runtimeContext.copy()
runtimeContext.use_container = True
runtimeContext.tmpdir_prefix = "tmp"
runtimeContext.work_api = self.work_api
+ if not self.output_name:
+ self.output_name = "Output from workflow %s" % runtimeContext.name
+
if self.work_api == "containers":
if self.ignore_docker_for_reuse:
raise Exception("--ignore-docker-for-reuse not supported with containers API.")
if workbench2 or workbench1:
logger.info("Output at %scollections/%s", workbench2 or workbench1, tool.final_output)
else:
- if self.output_name is None:
- self.output_name = "Output of %s" % (shortname(tool.tool["id"]))
if self.output_tags is None:
self.output_tags = ""
else:
storage_classes = runtimeContext.storage_classes.strip().split(",")
- self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes, self.output_tags, self.final_output)
+ output_properties = {}
+ output_properties_req, _ = tool.get_requirement("http://arvados.org/cwl#OutputCollectionProperties")
+ if output_properties_req:
+ builder = make_builder(job_order, tool.hints, tool.requirements, runtimeContext, tool.metadata)
+ for pr in output_properties_req["outputProperties"]:
+ output_properties[pr["propertyName"]] = builder.do_eval(pr["propertyValue"])
+
+ self.final_output, self.final_output_collection = self.make_output_collection(self.output_name, storage_classes,
+ self.output_tags, output_properties,
+ self.final_output)
self.set_crunch_output()
if runtimeContext.compute_checksum:
import schema_salad.validate as validate
import arvados.collection
+import arvados.util
from .util import collectionUUID
from ruamel.yaml import YAML
from ruamel.yaml.comments import CommentedMap, CommentedSeq
set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
return
+ if inputschema == "File":
+ inputschema = {"type": "File"}
+
if isinstance(inputschema, basestring):
sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
if sd:
set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
elif (inputschema["type"] == "File" and
- secondaryspec and
isinstance(primary, Mapping) and
- primary.get("class") == "File" and
- "secondaryFiles" not in primary):
+ primary.get("class") == "File"):
+
+ if "secondaryFiles" in primary or not secondaryspec:
+ # Nothing to do.
+ return
+
#
# Found a file, check for secondaryFiles
#
primary["secondaryFiles"] = secondaryspec
for i, sf in enumerate(aslist(secondaryspec)):
if builder.cwlVersion == "v1.0":
- pattern = builder.do_eval(sf, context=primary)
+ pattern = sf
else:
- pattern = builder.do_eval(sf["pattern"], context=primary)
+ pattern = sf["pattern"]
if pattern is None:
continue
if isinstance(pattern, list):
"Expression must return list, object, string or null")
if pattern is not None:
- sfpath = substitute(primary["location"], pattern)
+ if "${" in pattern or "$(" in pattern:
+ sfname = builder.do_eval(pattern, context=primary)
+ else:
+ sfname = substitute(primary["basename"], pattern)
+
+ if sfname is None:
+ continue
+
+ p_location = primary["location"]
+ if "/" in p_location:
+ sfpath = (
+ p_location[0 : p_location.rindex("/") + 1]
+ + sfname
+ )
required = builder.do_eval(required, context=primary)
primary["secondaryFiles"] = cmap(found)
if discovered is not None:
discovered[primary["location"]] = primary["secondaryFiles"]
- elif inputschema["type"] not in primitive_types_set:
+ elif inputschema["type"] not in primitive_types_set and inputschema["type"] not in ("File", "Directory"):
set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
def upload_dependencies(arvrunner, name, document_loader,
- workflowobj, uri, loadref_run,
+ workflowobj, uri, loadref_run, runtimeContext,
include_primary=True, discovered_secondaryfiles=None):
"""Upload the dependencies of the workflowobj document to Keep.
single_collection=True,
optional_deps=optional_deps)
+ keeprefs = set()
+ def addkeepref(k):
+ if k.startswith("keep:"):
+ keeprefs.add(collection_pdh_pattern.match(k).group(1))
+
def setloc(p):
loc = p.get("location")
if loc and (not loc.startswith("_:")) and (not loc.startswith("keep:")):
p["location"] = mapper.mapper(p["location"]).resolved
+ addkeepref(p["location"])
return
if not loc:
gp = collection_uuid_pattern.match(loc)
if not gp:
+ # Not a uuid pattern (must be a pdh pattern)
+ addkeepref(p["location"])
return
+
uuid = gp.groups()[0]
if uuid not in uuid_map:
raise SourceLine(p, "location", validate.ValidationException).makeError(
for d in discovered:
discovered_secondaryfiles[mapper.mapper(d).resolved] = discovered[d]
+ if runtimeContext.copy_deps:
+ # Find referenced collections and copy them into the
+ # destination project, for easy sharing.
+ already_present = list(arvados.util.keyset_list_all(arvrunner.api.collections().list,
+ filters=[["portable_data_hash", "in", list(keeprefs)],
+ ["owner_uuid", "=", runtimeContext.project_uuid]],
+ select=["uuid", "portable_data_hash", "created_at"]))
+
+ keeprefs = keeprefs - set(a["portable_data_hash"] for a in already_present)
+ for kr in keeprefs:
+ col = arvrunner.api.collections().list(filters=[["portable_data_hash", "=", kr]],
+ order="created_at desc",
+ select=["name", "description", "properties", "portable_data_hash", "manifest_text", "storage_classes_desired", "trash_at"],
+ limit=1).execute()
+ if len(col["items"]) == 0:
+ logger.warning("Cannot find collection with portable data hash %s", kr)
+ continue
+ col = col["items"][0]
+ try:
+ arvrunner.api.collections().create(body={"collection": {
+ "owner_uuid": runtimeContext.project_uuid,
+ "name": col["name"],
+ "description": col["description"],
+ "properties": col["properties"],
+ "portable_data_hash": col["portable_data_hash"],
+ "manifest_text": col["manifest_text"],
+ "storage_classes_desired": col["storage_classes_desired"],
+ "trash_at": col["trash_at"]
+ }}, ensure_unique_name=True).execute()
+ except Exception as e:
+ logger.warning("Unable copy collection to destination: %s", e)
+
if "$schemas" in workflowobj:
sch = CommentedSeq()
for s in workflowobj["$schemas"]:
return mapper
-def upload_docker(arvrunner, tool):
+def upload_docker(arvrunner, tool, runtimeContext):
"""Uploads Docker images used in CommandLineTool objects."""
+# NOTE(review): configuration (project_uuid, force_docker_pull,
+# tmp_outdir_prefix, match_local_docker, and the new copy_deps flag) is
+# now read from the explicit runtimeContext parameter instead of the
+# global arvrunner.runtimeContext, matching the other upload_* helpers
+# changed in this patch.
if isinstance(tool, CommandLineTool):
(docker_req, docker_is_req) = tool.get_requirement("DockerRequirement")
if docker_req:
if docker_req.get("dockerOutputDirectory") and arvrunner.work_api != "containers":
- # TODO: can be supported by containers API, but not jobs API.
raise SourceLine(docker_req, "dockerOutputDirectory", UnsupportedRequirement).makeError(
"Option 'dockerOutputDirectory' of DockerRequirement not supported.")
- arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+
+ arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, docker_req, True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
else:
+# No explicit DockerRequirement: fall back to the arvados/jobs image
+# pinned to this package's version.
arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": "arvados/jobs:"+__version__},
- True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
elif isinstance(tool, cwltool.workflow.Workflow):
+# Workflows have no image of their own; recurse into each step's
+# embedded tool, threading the same runtimeContext through.
for s in tool.steps:
- upload_docker(arvrunner, s.embedded_tool)
+ upload_docker(arvrunner, s.embedded_tool, runtimeContext)
-def packed_workflow(arvrunner, tool, merged_map):
+def packed_workflow(arvrunner, tool, merged_map, runtimeContext):
"""Create a packed workflow.
A "packed" workflow is one where all the components have been combined into a single document."""
v["secondaryFiles"] = merged_map[cur_id].secondaryFiles[v["location"]]
if v.get("class") == "DockerRequirement":
v["http://arvados.org/cwl#dockerCollectionPDH"] = arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, v, True,
- arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
for l in v:
visit(v[l], cur_id)
if isinstance(v, list):
packed["http://schema.org/version"] = githash
-def upload_job_order(arvrunner, name, tool, job_order):
+def upload_job_order(arvrunner, name, tool, job_order, runtimeContext):
"""Upload local files referenced in the input object and return updated input
object with 'location' updated to the proper keep references.
"""
tool.doc_loader,
job_order,
job_order.get("id", "#"),
- False)
+ False,
+ runtimeContext)
if "id" in job_order:
del job_order["id"]
FileUpdates = namedtuple("FileUpdates", ["resolved", "secondaryFiles"])
-def upload_workflow_deps(arvrunner, tool):
+def upload_workflow_deps(arvrunner, tool, runtimeContext):
# Ensure that Docker images needed by this workflow are available
- upload_docker(arvrunner, tool)
+ upload_docker(arvrunner, tool, runtimeContext)
document_loader = tool.doc_loader
deptool,
deptool["id"],
False,
+ runtimeContext,
include_primary=False,
discovered_secondaryfiles=discovered_secondaryfiles)
document_loader.idx[deptool["id"]] = deptool
return merged_map
-def arvados_jobs_image(arvrunner, img):
+def arvados_jobs_image(arvrunner, img, runtimeContext):
"""Determine if the right arvados/jobs image version is available. If not, try to pull and upload it."""
+# NOTE(review): settings now come from the explicit runtimeContext
+# argument (including the new copy_deps flag) rather than
+# arvrunner.runtimeContext, consistent with upload_docker.
try:
- return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img}, True, arvrunner.project_uuid,
- arvrunner.runtimeContext.force_docker_pull,
- arvrunner.runtimeContext.tmp_outdir_prefix,
- arvrunner.runtimeContext.match_local_docker)
+ return arvados_cwl.arvdocker.arv_docker_get_image(arvrunner.api, {"dockerPull": img},
+ True,
+ runtimeContext.project_uuid,
+ runtimeContext.force_docker_pull,
+ runtimeContext.tmp_outdir_prefix,
+ runtimeContext.match_local_docker,
+ runtimeContext.copy_deps)
except Exception as e:
+# Any failure (image not found, pull/upload error) is re-raised with
+# the image name prepended for context.
raise Exception("Docker image %s is not available\n%s" % (img, e) )
-def upload_workflow_collection(arvrunner, name, packed):
+def upload_workflow_collection(arvrunner, name, packed, runtimeContext):
collection = arvados.collection.Collection(api_client=arvrunner.api,
keep_client=arvrunner.keep_client,
num_retries=arvrunner.num_retries)
filters = [["portable_data_hash", "=", collection.portable_data_hash()],
["name", "like", name+"%"]]
- if arvrunner.project_uuid:
- filters.append(["owner_uuid", "=", arvrunner.project_uuid])
+ if runtimeContext.project_uuid:
+ filters.append(["owner_uuid", "=", runtimeContext.project_uuid])
exists = arvrunner.api.collections().list(filters=filters).execute(num_retries=arvrunner.num_retries)
if exists["items"]:
logger.info("Using collection %s", exists["items"][0]["uuid"])
else:
collection.save_new(name=name,
- owner_uuid=arvrunner.project_uuid,
+ owner_uuid=runtimeContext.project_uuid,
ensure_unique_name=True,
num_retries=arvrunner.num_retries)
logger.info("Uploaded to %s", collection.manifest_locator())
if intermediate_output_ttl > 0:
trash_time = datetime.datetime.utcnow() + datetime.timedelta(seconds=intermediate_output_ttl)
container_uuid = None
+ props = {"type": "intermediate"}
if current_container:
- container_uuid = current_container['uuid']
- props = {"type": "intermediate", "container": container_uuid}
+ props["container"] = current_container['uuid']
return {"name" : name, "trash_at" : trash_time, "properties" : props}
arvbox start $config $tag
+# Copy the integration test suite from our local arvados clone instead
+# of using the one inside the container, so we can make changes to the
+# integration tests without necessarily having to rebuild the
+# container image.
+docker cp -L $(readlink -f $(dirname $0)/tests) $ARVBOX_CONTAINER:/usr/src/arvados/sdk/cwl
+
arvbox pipe <<EOF
set -eu -o pipefail
if test -n "$build" ; then
/usr/src/arvados/build/build-dev-docker-jobs-image.sh
-elif test "$tag" = "latest" ; then
- arv-keepdocker --pull arvados/jobs $tag
-else
- set +u
- export WORKSPACE=/usr/src/arvados
- . /usr/src/arvados/build/run-library.sh
- TMPHERE=\$(pwd)
- cd /usr/src/arvados
-
- # This defines python_sdk_version and cwl_runner_version with python-style
- # package suffixes (.dev/rc)
- calculate_python_sdk_cwl_package_versions
-
- cd \$TMPHERE
- set -u
-
- arv-keepdocker --pull arvados/jobs \$cwl_runner_version
- docker tag arvados/jobs:\$cwl_runner_version arvados/jobs:latest
- arv-keepdocker arvados/jobs latest
fi
EXTRA=--compute-checksum
fi
env
+
+# Skip docker_entrypoint test because it fails on singularity
+#
+# Skip timelimit_invalid_wf test because the timeout is very short
+# (5s) and singularity containers loading off an arv-mount take too long
+# to start and get incorrectly terminated
+#
+# Skip test 199 in the v1.1 suite because it has different output
+# depending on whether there is a pty associated with stdout (fixed in
+# the v1.2 suite)
+#
+# Skip test 307 in the v1.2 suite because the test relied on
+# secondary file behavior of cwltool that wasn't actually correct per the specification
+
if [[ "$suite" = "integration" ]] ; then
cd /usr/src/arvados/sdk/cwl/tests
exec ./arvados-tests.sh $@
elif [[ "$suite" = "conformance-v1.2" ]] ; then
- exec cwltest --tool arvados-cwl-runner --test conformance_tests.yaml -N307 $@ -- \$EXTRA
-else
- exec ./run_test.sh RUNNER=arvados-cwl-runner EXTRA="\$EXTRA" $@
+ exec cwltest --tool arvados-cwl-runner --test conformance_tests.yaml -Sdocker_entrypoint,timelimit_invalid_wf -N307 $@ -- \$EXTRA
+elif [[ "$suite" = "conformance-v1.1" ]] ; then
+ exec cwltest --tool arvados-cwl-runner --test conformance_tests.yaml -Sdocker_entrypoint,timelimit_invalid_wf -N199 $@ -- \$EXTRA
+elif [[ "$suite" = "conformance-v1.0" ]] ; then
+ exec cwltest --tool arvados-cwl-runner --test v1.0/conformance_test_v1.0.yaml -Sdocker_entrypoint $@ -- \$EXTRA
fi
EOF
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+class: Workflow
+cwlVersion: v1.2
+$namespaces:
+ arv: "http://arvados.org/cwl#"
+hints:
+ arv:OutputCollectionProperties:
+ outputProperties:
+ foo: bar
+ baz: $(inputs.inp.basename)
+inputs:
+ inp: File
+steps:
+ cat:
+ in:
+ inp: inp
+ run: cat.cwl
+ out: []
+outputs: []
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+baseCommand: echo
+inputs:
+ message:
+ type: File
+ inputBinding:
+ position: 1
+ default:
+ class: File
+ location: keep:d7514270f356df848477718d58308cc4+94/b
+
+outputs: []
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: Workflow
+
+requirements:
+ InlineJavascriptRequirement: {}
+
+inputs:
+ file1:
+ type: File?
+ secondaryFiles:
+ - pattern: .tbi
+ required: true
+ file2:
+ type: File
+ secondaryFiles:
+ - pattern: |
+ ${
+ return self.basename + '.tbi';
+ }
+ required: true
+outputs:
+ out:
+ type: File
+ outputSource: cat/out
+ out2:
+ type: File
+ outputSource: cat2/out
+steps:
+ cat:
+ in:
+ inp: file1
+ run: cat2.cwl
+ out: [out]
+ cat2:
+ in:
+ inp: file2
+ run: cat2.cwl
+ out: [out]
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+file1:
+ class: File
+ location: 19109-upload-secondary/file1.txt
+file2:
+ class: File
+ location: 19109-upload-secondary/file2.txt
--- /dev/null
+strawberry
--- /dev/null
+blueberry
\ No newline at end of file
--- /dev/null
+mango
\ No newline at end of file
-#!/bin/sh
+#!/bin/bash
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
+#
+# This in an additional integration test suite for Arvados specific
+# bugs and features that are not covered by the unit tests or CWL
+# conformance tests.
+#
-set -e
+set -ex
if ! arv-get d7514270f356df848477718d58308cc4+94 > /dev/null ; then
arv-put --portable-data-hash testdir/*
arv-put --portable-data-hash samples/sample1_S01_R1_001.fastq.gz
fi
+# Use the python executor associated with the installed OS package, if present.
+python=$(((ls /usr/share/python3*/dist/python3-arvados-cwl-runner/bin/python || echo python3) | head -n1) 2>/dev/null)
+
+# Test for #18888
+# This is a standalone test because the bug was observed with this
+# command line and was thought to be due to command line handling.
arvados-cwl-runner 18888-download_def.cwl --scripts scripts/
+# Test for #19070
+# The most effective way to test this seemed to be to write an
+# integration test to check for the expected behavior.
+$python test_copy_deps.py
+
+# Test for #17004
+# Checks that the final output collection has the expected properties.
+python test_set_output_prop.py
+
+# Run integration tests
exec cwltest --test arvados-tests.yml --tool arvados-cwl-runner $@ -- --disable-reuse --compute-checksum --api=containers
output: {}
tool: 18994-basename/wf_ren.cwl
doc: "Test issue 18994 - correctly stage file with modified basename"
+
+- job: 19109-upload-secondary.yml
+ output: {
+ "out": {
+ "basename": "file1.catted",
+ "class": "File",
+ "location": "file1.catted",
+ "size": 20,
+ "checksum": "sha1$c4cead17cebdd829f38c48e18a28f1da72339ef7"
+ },
+ "out2": {
+ "basename": "file2.catted",
+ "checksum": "sha1$6f71c5d1512519ede45bedfdd624e05fd8037b0d",
+ "class": "File",
+ "location": "file2.catted",
+ "size": 12
+ }
+ }
+ tool: 19109-upload-secondary.cwl
+ doc: "Test issue 19109 - correctly discover & upload secondary files"
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+cwlVersion: v1.2
+class: CommandLineTool
+inputs:
+ - id: inp
+ type: File
+ secondaryFiles:
+ - pattern: .tbi
+ required: true
+stdout: $(inputs.inp.nameroot).catted
+outputs:
+ out:
+ type: stdout
+arguments: [cat, '$(inputs.inp.path)', '$(inputs.inp.secondaryFiles[0].path)']
+#!/bin/sh
+
# Copyright (C) The Arvados Authors. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0
-#!/bin/bash
-
echo bubble
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_'+str(enable_reuse),
+ 'output_name': 'Output from step test_run_'+str(enable_reuse),
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 5242880000 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_resource_requirements',
+ 'output_name': 'Output from step test_resource_requirements',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 7200,
}
},
'state': 'Committed',
- 'output_name': 'Output for step test_initial_work_dir',
+ 'output_name': 'Output from step test_initial_work_dir',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
},
},
'state': 'Committed',
- "output_name": "Output for step test_run_redirect",
+ "output_name": "Output from step test_run_redirect",
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_mounts',
+ 'output_name': 'Output from step test_run_mounts',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_secrets',
+ 'output_name': 'Output from step test_secrets',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True',
+ 'output_name': 'Output from step test_run_True',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True',
+ 'output_name': 'Output from step test_run_True',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True' + ("" if test_case == 0 else "_"+str(test_case+1)),
+ 'output_name': 'Output from step test_run_True' + ("" if test_case == 0 else "_"+str(test_case+1)),
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step test_run_True',
+ 'output_name': 'Output from step test_run_True',
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
runtimeContext.match_local_docker = True
container_request['container_image'] = '99999999999999999999999999999993+99'
container_request['name'] = 'test_run_True_2'
- container_request['output_name'] = 'Output for step test_run_True_2'
+ container_request['output_name'] = 'Output from step test_run_True_2'
for j in arvtool.job({}, mock.MagicMock(), runtimeContext):
j.run(runtimeContext)
runner.api.container_requests().create.assert_called_with(
"capacity": 1073741824 }
},
'state': 'Committed',
- 'output_name': 'Output for step '+runtimeContext.name,
+ 'output_name': 'Output from step '+runtimeContext.name,
'owner_uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz',
'output_path': '/var/spool/cwl',
'output_ttl': 0,
}))
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ def test_output_properties(self, keepdocker):
+ # Verifies that the arv:OutputCollectionProperties hint is evaluated
+ # (expressions like $(inputs.inp) resolved against the job inputs) and
+ # sent as 'output_properties' in the container request body — but only
+ # when the API discovery document revision is >= 20220510; for older
+ # revisions the field must be absent.
+ arvados_cwl.add_arv_hints()
+ for rev in ["20210628", "20220510"]:
+ runner = mock.MagicMock()
+ runner.ignore_docker_for_reuse = False
+ runner.intermediate_output_ttl = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+ runner.api._rootDesc = {"revision": rev}
+
+ keepdocker.return_value = [("zzzzz-4zz18-zzzzzzzzzzzzzz3", "")]
+ runner.api.collections().get().execute.return_value = {
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+
+ tool = cmap({
+ "inputs": [{
+ "id": "inp",
+ "type": "string"
+ }],
+ "outputs": [],
+ "baseCommand": "ls",
+ "arguments": [{"valueFrom": "$(runtime.outdir)"}],
+ "id": "",
+ "cwlVersion": "v1.2",
+ "class": "CommandLineTool",
+ "hints": [
+ {
+ "class": "http://arvados.org/cwl#OutputCollectionProperties",
+ "outputProperties": {
+ "foo": "bar",
+ "baz": "$(inputs.inp)"
+ }
+ }
+ ]
+ })
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runtimeContext.name = "test_timelimit"
+
+ arvtool = cwltool.load_tool.load_tool(tool, loadingContext)
+ arvtool.formatgraph = None
+
+ for j in arvtool.job({"inp": "quux"}, mock.MagicMock(), runtimeContext):
+ j.run(runtimeContext)
+
+ # "baz" must have been resolved from $(inputs.inp) to the literal
+ # job input value "quux".
+ _, kwargs = runner.api.container_requests().create.call_args
+ if rev == "20220510":
+ self.assertEqual({"foo": "bar", "baz": "quux"}, kwargs['body'].get('output_properties'))
+ else:
+ self.assertEqual(None, kwargs['body'].get('output_properties'))
+
+
class TestWorkflow(unittest.TestCase):
def setUp(self):
}
},
"name": "scatterstep",
- "output_name": "Output for step scatterstep",
+ "output_name": "Output from step scatterstep",
"output_path": "/var/spool/cwl",
"output_ttl": 0,
"priority": 500,
u'cwl.input.yml'
],
'use_existing': True,
- 'output_name': u'Output for step echo-subwf',
+ 'output_name': u'Output from step echo-subwf',
'cwd': '/var/spool/cwl',
'output_storage_classes': ["default"]
}))
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import subprocess
+
+api = arvados.api()
+
+def check_contents(group, wf_uuid):
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 3:
+        raise Exception("Expected 3 items in "+group["uuid"]+" was "+str(len(contents["items"])))
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow in "+group["uuid"])
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+
+def test_create():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-1", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, by default should also copy dependencies
+ cmd = ["arvados-cwl-runner", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_update():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-2", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Create workflow, but with --no-copy-deps it shouldn't copy anything
+ cmd = ["arvados-cwl-runner", "--no-copy-deps", "--create-workflow", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 1:
+            raise Exception("Expected 1 item")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#workflow" and c["uuid"] == wf_uuid:
+ found = True
+ if not found:
+ raise Exception("Couldn't find workflow")
+
+ # Updating by default will copy missing items
+ cmd = ["arvados-cwl-runner", "--update-workflow", wf_uuid, "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+ check_contents(group, wf_uuid)
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+
+def test_execute():
+ group = api.groups().create(body={"group": {"name": "test-19070-project-3", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ # Execute workflow, shouldn't copy anything.
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ # container request
+ # final output collection
+ # container log
+ # step output collection
+ # container request log
+ if len(contents["items"]) != 5:
+ raise Exception("Expected 5 items")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if found:
+ raise Exception("Didn't expect to find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if found:
+ raise Exception("Didn't expect to find jobs image dependency")
+
+ # Execute workflow with --copy-deps
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "--copy-deps", "19070-copy-deps.cwl"]
+ print(" ".join(cmd))
+ wf_uuid = subprocess.check_output(cmd)
+ wf_uuid = wf_uuid.decode("utf-8").strip()
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["portable_data_hash"] == "d7514270f356df848477718d58308cc4+94":
+ found = True
+ if not found:
+ raise Exception("Couldn't find collection dependency")
+
+ found = False
+ for c in contents["items"]:
+ if c["kind"] == "arvados#collection" and c["name"].startswith("Docker image arvados jobs"):
+ found = True
+ if not found:
+ raise Exception("Couldn't find jobs image dependency")
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+if __name__ == '__main__':
+ test_create()
+ test_update()
+ test_execute()
final.open.return_value = openmock
openmock.__enter__.return_value = cwlout
- _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "tag0,tag1,tag2", {
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "tag0,tag1,tag2", {}, {
"foo": {
"class": "File",
"location": "keep:99999999999999999999999999999991+99/foo.txt",
final.copy.assert_has_calls([mock.call('bar.txt', 'baz.txt', overwrite=False, source_collection=readermock)])
final.copy.assert_has_calls([mock.call('foo.txt', 'foo.txt', overwrite=False, source_collection=readermock)])
- final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz', storage_classes=['foo'])])
+ final.save_new.assert_has_calls([mock.call(ensure_unique_name=True, name='Test output', owner_uuid='zzzzz-j7d0g-zzzzzzzzzzzzzzz', properties={}, storage_classes=['foo'])])
self.assertEqual("""{
"bar": {
"basename": "baz.txt",
reader.return_value = readermock
# This output describes a single file listed in 2 different directories
- _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", { 'out': [
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", {}, { 'out': [
{
'basename': 'testdir1',
'listing': [
reader.return_value = readermock
# This output describes two literals with the same basename
- _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", [
+ _, runner.final_output_collection = runner.make_output_collection("Test output", ["foo"], "", {}, [
{
'lit':
{
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import arvados
+import subprocess
+
+api = arvados.api()
+
+def test_execute():
+ group = api.groups().create(body={"group": {"name": "test-17004-project", "group_class": "project"}}, ensure_unique_name=True).execute()
+ try:
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+ if len(contents["items"]) != 0:
+ raise Exception("Expected 0 items")
+
+ cmd = ["arvados-cwl-runner", "--project-uuid", group["uuid"], "17004-output-props.cwl", "--inp", "scripts/download_all_data.sh"]
+ print(" ".join(cmd))
+ subprocess.check_output(cmd)
+
+ contents = api.groups().contents(uuid=group["uuid"]).execute()
+
+ found = False
+ for c in contents["items"]:
+ if (c["kind"] == "arvados#collection" and
+ c["properties"].get("type") == "output" and
+ c["properties"].get("foo") == "bar" and
+ c["properties"].get("baz") == "download_all_data.sh"):
+ found = True
+ if not found:
+ raise Exception("Didn't find collection with properties")
+
+ finally:
+ api.groups().delete(uuid=group["uuid"]).execute()
+
+if __name__ == '__main__':
+ test_execute()
_rootDesc = None
-def stubs(func):
- @functools.wraps(func)
- @mock.patch("uuid.uuid4")
- @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
- @mock.patch("arvados.collection.KeepClient")
- @mock.patch("arvados.keep.KeepClient")
- @mock.patch("arvados.events.subscribe")
- def wrapped(self, events, keep_client1, keep_client2, keepdocker, uuid4, *args, **kwargs):
- class Stubs(object):
- pass
- stubs = Stubs()
- stubs.events = events
- stubs.keepdocker = keepdocker
-
- uuid4.side_effect = ["df80736f-f14d-4b10-b2e3-03aa27f034bb", "df80736f-f14d-4b10-b2e3-03aa27f034b1",
- "df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
- "df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
-
- def putstub(p, **kwargs):
- return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
- keep_client1().put.side_effect = putstub
- keep_client1.put.side_effect = putstub
- keep_client2().put.side_effect = putstub
- keep_client2.put.side_effect = putstub
-
- stubs.keep_client = keep_client2
- stubs.docker_images = {
- "arvados/jobs:"+arvados_cwl.__version__: [("zzzzz-4zz18-zzzzzzzzzzzzzd3", {})],
- "debian:buster-slim": [("zzzzz-4zz18-zzzzzzzzzzzzzd4", {})],
- "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
- "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
- }
- def kd(a, b, image_name=None, image_tag=None):
- return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
- stubs.keepdocker.side_effect = kd
+def stubs(wfname='submit_wf.cwl'):
+ def outer_wrapper(func, *rest):
+ @functools.wraps(func)
+ @mock.patch("arvados_cwl.arvdocker.determine_image_id")
+ @mock.patch("uuid.uuid4")
+ @mock.patch("arvados.commands.keepdocker.list_images_in_arv")
+ @mock.patch("arvados.collection.KeepClient")
+ @mock.patch("arvados.keep.KeepClient")
+ @mock.patch("arvados.events.subscribe")
+ def wrapped(self, events, keep_client1, keep_client2, keepdocker,
+ uuid4, determine_image_id, *args, **kwargs):
+ class Stubs(object):
+ pass
+ stubs = Stubs()
+ stubs.events = events
+ stubs.keepdocker = keepdocker
+
+ uuid4.side_effect = ["df80736f-f14d-4b10-b2e3-03aa27f034bb", "df80736f-f14d-4b10-b2e3-03aa27f034b1",
+ "df80736f-f14d-4b10-b2e3-03aa27f034b2", "df80736f-f14d-4b10-b2e3-03aa27f034b3",
+ "df80736f-f14d-4b10-b2e3-03aa27f034b4", "df80736f-f14d-4b10-b2e3-03aa27f034b5"]
+
+ determine_image_id.return_value = None
+
+ def putstub(p, **kwargs):
+ return "%s+%i" % (hashlib.md5(p).hexdigest(), len(p))
+ keep_client1().put.side_effect = putstub
+ keep_client1.put.side_effect = putstub
+ keep_client2().put.side_effect = putstub
+ keep_client2.put.side_effect = putstub
+
+ stubs.keep_client = keep_client2
+ stubs.docker_images = {
+ "arvados/jobs:"+arvados_cwl.__version__: [("zzzzz-4zz18-zzzzzzzzzzzzzd3", {})],
+ "debian:buster-slim": [("zzzzz-4zz18-zzzzzzzzzzzzzd4", {})],
+ "arvados/jobs:123": [("zzzzz-4zz18-zzzzzzzzzzzzzd5", {})],
+ "arvados/jobs:latest": [("zzzzz-4zz18-zzzzzzzzzzzzzd6", {})],
+ }
+ def kd(a, b, image_name=None, image_tag=None, project_uuid=None):
+ return stubs.docker_images.get("%s:%s" % (image_name, image_tag), [])
+ stubs.keepdocker.side_effect = kd
- stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
- stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
+ stubs.fake_user_uuid = "zzzzz-tpzed-zzzzzzzzzzzzzzz"
+ stubs.fake_container_uuid = "zzzzz-dz642-zzzzzzzzzzzzzzz"
- if sys.version_info[0] < 3:
- stubs.capture_stdout = BytesIO()
- else:
- stubs.capture_stdout = StringIO()
+ if sys.version_info[0] < 3:
+ stubs.capture_stdout = BytesIO()
+ else:
+ stubs.capture_stdout = StringIO()
- stubs.api = mock.MagicMock()
- stubs.api._rootDesc = get_rootDesc()
- stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
- stubs.api._rootDesc["revision"] = "20210628"
+ stubs.api = mock.MagicMock()
+ stubs.api._rootDesc = get_rootDesc()
+ stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
+ stubs.api._rootDesc["revision"] = "20210628"
- stubs.api.users().current().execute.return_value = {
- "uuid": stubs.fake_user_uuid,
- }
- stubs.api.collections().list().execute.return_value = {"items": []}
- stubs.api.containers().current().execute.return_value = {
- "uuid": stubs.fake_container_uuid,
- }
- stubs.api.config()["StorageClasses"].items.return_value = {
- "default": {
- "Default": True
- }
- }.items()
-
- class CollectionExecute(object):
- def __init__(self, exe):
- self.exe = exe
- def execute(self, num_retries=None):
- return self.exe
-
- def collection_createstub(created_collections, body, ensure_unique_name=None):
- mt = body["manifest_text"].encode('utf-8')
- uuid = "zzzzz-4zz18-zzzzzzzzzzzzzx%d" % len(created_collections)
- pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
- created_collections[uuid] = {
- "uuid": uuid,
- "portable_data_hash": pdh,
- "manifest_text": mt.decode('utf-8')
+ stubs.api.users().current().execute.return_value = {
+ "uuid": stubs.fake_user_uuid,
}
- return CollectionExecute(created_collections[uuid])
-
- def collection_getstub(created_collections, uuid):
- for v in viewvalues(created_collections):
- if uuid in (v["uuid"], v["portable_data_hash"]):
- return CollectionExecute(v)
-
- created_collections = {
- "99999999999999999999999999999998+99": {
- "uuid": "",
- "portable_data_hash": "99999999999999999999999999999998+99",
- "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
- },
- "99999999999999999999999999999997+99": {
- "uuid": "",
- "portable_data_hash": "99999999999999999999999999999997+99",
- "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
- },
- "99999999999999999999999999999994+99": {
- "uuid": "",
- "portable_data_hash": "99999999999999999999999999999994+99",
- "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd3": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd3",
- "portable_data_hash": "999999999999999999999999999999d3+99",
- "manifest_text": ""
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd4": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd4",
- "portable_data_hash": "999999999999999999999999999999d4+99",
- "manifest_text": ""
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd5": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd5",
- "portable_data_hash": "999999999999999999999999999999d5+99",
- "manifest_text": ""
- },
- "zzzzz-4zz18-zzzzzzzzzzzzzd6": {
- "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd6",
- "portable_data_hash": "999999999999999999999999999999d6+99",
- "manifest_text": ""
+ stubs.api.collections().list().execute.return_value = {"items": []}
+ stubs.api.containers().current().execute.return_value = {
+ "uuid": stubs.fake_container_uuid,
}
- }
- stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
- stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
-
- stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
- stubs.api.jobs().create().execute.return_value = {
- "uuid": stubs.expect_job_uuid,
- "state": "Queued",
- }
-
- stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
- stubs.api.container_requests().create().execute.return_value = {
- "uuid": stubs.expect_container_request_uuid,
- "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
- "state": "Queued"
- }
-
- stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.api.pipeline_templates().create().execute.return_value = {
- "uuid": stubs.expect_pipeline_template_uuid,
- }
- stubs.expect_job_spec = {
- 'runtime_constraints': {
- 'docker_image': '999999999999999999999999999999d3+99',
- 'min_ram_mb_per_node': 1024
- },
- 'script_parameters': {
- 'x': {
- 'basename': 'blorp.txt',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- 'class': 'File'
+ stubs.api.config()["StorageClasses"].items.return_value = {
+ "default": {
+ "Default": True
+ }
+ }.items()
+
+ class CollectionExecute(object):
+ def __init__(self, exe):
+ self.exe = exe
+ def execute(self, num_retries=None):
+ return self.exe
+
+ def collection_createstub(created_collections, body, ensure_unique_name=None):
+ mt = body["manifest_text"].encode('utf-8')
+ uuid = "zzzzz-4zz18-zzzzzzzzzzzzzx%d" % len(created_collections)
+ pdh = "%s+%i" % (hashlib.md5(mt).hexdigest(), len(mt))
+ created_collections[uuid] = {
+ "uuid": uuid,
+ "portable_data_hash": pdh,
+ "manifest_text": mt.decode('utf-8')
+ }
+ return CollectionExecute(created_collections[uuid])
+
+ def collection_getstub(created_collections, uuid):
+ for v in viewvalues(created_collections):
+ if uuid in (v["uuid"], v["portable_data_hash"]):
+ return CollectionExecute(v)
+
+ created_collections = {
+ "99999999999999999999999999999998+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999998+99",
+ "manifest_text": ". 99999999999999999999999999999998+99 0:0:file1.txt"
},
- 'y': {
- 'basename': '99999999999999999999999999999998+99',
- 'location': 'keep:99999999999999999999999999999998+99',
- 'class': 'Directory'
+ "99999999999999999999999999999997+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999997+99",
+ "manifest_text": ". 99999999999999999999999999999997+99 0:0:file1.txt"
},
- 'z': {
- 'basename': 'anonymous',
- "listing": [{
- "basename": "renamed.txt",
- "class": "File",
- "location": "keep:99999999999999999999999999999998+99/file1.txt",
- "size": 0
- }],
- 'class': 'Directory'
+ "99999999999999999999999999999994+99": {
+ "uuid": "",
+ "portable_data_hash": "99999999999999999999999999999994+99",
+ "manifest_text": ". 99999999999999999999999999999994+99 0:0:expect_arvworkflow.cwl"
},
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner'
- }
- stubs.pipeline_component = stubs.expect_job_spec.copy()
- stubs.expect_pipeline_instance = {
- 'name': 'submit_wf.cwl',
- 'state': 'RunningOnServer',
- 'owner_uuid': None,
- "components": {
- "cwl-runner": {
- 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
- 'script_parameters': {
- 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
- 'x': {"value": {
- 'basename': 'blorp.txt',
- 'class': 'File',
- 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- "size": 16
- }},
- 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
- 'listing': [
- {
- 'basename': 'renamed.txt',
- 'class': 'File', 'location':
- 'keep:99999999999999999999999999999998+99/file1.txt',
- 'size': 0
- }
- ]}},
- 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
- 'arv:debug': True,
- 'arv:enable_reuse': True,
- 'arv:on_error': 'continue'
- },
- 'repository': 'arvados',
- 'script_version': 'master',
- 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
- 'script': 'cwl-runner',
- 'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
+ "zzzzz-4zz18-zzzzzzzzzzzzzd3": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd3",
+ "portable_data_hash": "999999999999999999999999999999d3+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd4": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd4",
+ "portable_data_hash": "999999999999999999999999999999d4+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd5": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd5",
+ "portable_data_hash": "999999999999999999999999999999d5+99",
+ "manifest_text": ""
+ },
+ "zzzzz-4zz18-zzzzzzzzzzzzzd6": {
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzd6",
+ "portable_data_hash": "999999999999999999999999999999d6+99",
+ "manifest_text": ""
}
}
- }
- stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
- stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
- stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
- stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
- stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
- "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
- "state": "Queued"
- }
- stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
- stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+ stubs.api.collections().create.side_effect = functools.partial(collection_createstub, created_collections)
+ stubs.api.collections().get.side_effect = functools.partial(collection_getstub, created_collections)
- with open("tests/wf/submit_wf_packed.cwl") as f:
- expect_packed_workflow = yaml.round_trip_load(f)
+ stubs.expect_job_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ stubs.api.jobs().create().execute.return_value = {
+ "uuid": stubs.expect_job_uuid,
+ "state": "Queued",
+ }
- stubs.expect_container_spec = {
- 'priority': 500,
- 'mounts': {
- '/var/spool/cwl': {
- 'writable': True,
- 'kind': 'collection'
- },
- '/var/lib/cwl/workflow.json': {
- 'content': expect_packed_workflow,
- 'kind': 'json'
+ stubs.expect_container_request_uuid = "zzzzz-xvhdp-zzzzzzzzzzzzzzz"
+ stubs.api.container_requests().create().execute.return_value = {
+ "uuid": stubs.expect_container_request_uuid,
+ "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+
+ stubs.expect_pipeline_template_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.api.pipeline_templates().create().execute.return_value = {
+ "uuid": stubs.expect_pipeline_template_uuid,
+ }
+ stubs.expect_job_spec = {
+ 'runtime_constraints': {
+ 'docker_image': '999999999999999999999999999999d3+99',
+ 'min_ram_mb_per_node': 1024
},
- 'stdout': {
- 'path': '/var/spool/cwl/cwl.output.json',
- 'kind': 'file'
+ 'script_parameters': {
+ 'x': {
+ 'basename': 'blorp.txt',
+ 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
+ 'class': 'File'
+ },
+ 'y': {
+ 'basename': '99999999999999999999999999999998+99',
+ 'location': 'keep:99999999999999999999999999999998+99',
+ 'class': 'Directory'
+ },
+ 'z': {
+ 'basename': 'anonymous',
+ "listing": [{
+ "basename": "renamed.txt",
+ "class": "File",
+ "location": "keep:99999999999999999999999999999998+99/file1.txt",
+ "size": 0
+ }],
+ 'class': 'Directory'
+ },
+ 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main'
},
- '/var/lib/cwl/cwl.input.json': {
- 'kind': 'json',
- 'content': {
- 'y': {
- 'basename': '99999999999999999999999999999998+99',
- 'location': 'keep:99999999999999999999999999999998+99',
- 'class': 'Directory'},
- 'x': {
- 'basename': u'blorp.txt',
- 'class': 'File',
- 'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
- "size": 16
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
+ 'script': 'cwl-runner'
+ }
+ stubs.pipeline_component = stubs.expect_job_spec.copy()
+ stubs.expect_pipeline_instance = {
+ 'name': 'submit_wf.cwl',
+ 'state': 'RunningOnServer',
+ 'owner_uuid': None,
+ "components": {
+ "cwl-runner": {
+ 'runtime_constraints': {'docker_image': '999999999999999999999999999999d3+99', 'min_ram_mb_per_node': 1024},
+ 'script_parameters': {
+ 'y': {"value": {'basename': '99999999999999999999999999999998+99', 'location': 'keep:99999999999999999999999999999998+99', 'class': 'Directory'}},
+ 'x': {"value": {
+ 'basename': 'blorp.txt',
+ 'class': 'File',
+ 'location': 'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
+ "size": 16
+ }},
+ 'z': {"value": {'basename': 'anonymous', 'class': 'Directory',
+ 'listing': [
+ {
+ 'basename': 'renamed.txt',
+ 'class': 'File', 'location':
+ 'keep:99999999999999999999999999999998+99/file1.txt',
+ 'size': 0
+ }
+ ]}},
+ 'cwl:tool': '57ad063d64c60dbddc027791f0649211+60/workflow.cwl#main',
+ 'arv:debug': True,
+ 'arv:enable_reuse': True,
+ 'arv:on_error': 'continue'
},
- 'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
- {'basename': 'renamed.txt',
- 'class': 'File',
- 'location': 'keep:99999999999999999999999999999998+99/file1.txt',
- 'size': 0
- }
- ]}
- },
- 'kind': 'json'
+ 'repository': 'arvados',
+ 'script_version': 'master',
+ 'minimum_script_version': '570509ab4d2ef93d870fd2b1f2eab178afb1bad9',
+ 'script': 'cwl-runner',
+ 'job': {'state': 'Queued', 'uuid': 'zzzzz-8i9sb-zzzzzzzzzzzzzzz'}
+ }
}
- },
- 'secret_mounts': {},
- 'state': 'Committed',
- 'command': ['arvados-cwl-runner', '--local', '--api=containers',
- '--no-log-timestamps', '--disable-validate', '--disable-color',
- '--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
- '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
- 'name': 'submit_wf.cwl',
- 'container_image': '999999999999999999999999999999d3+99',
- 'output_path': '/var/spool/cwl',
- 'cwd': '/var/spool/cwl',
- 'runtime_constraints': {
- 'API': True,
- 'vcpus': 1,
- 'ram': (1024+256)*1024*1024
- },
- 'use_existing': False,
- 'properties': {},
- 'secret_mounts': {}
- }
-
- stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
- stubs.api.workflows().create().execute.return_value = {
- "uuid": stubs.expect_workflow_uuid,
- }
- def update_mock(**kwargs):
- stubs.updated_uuid = kwargs.get('uuid')
- return mock.DEFAULT
- stubs.api.workflows().update.side_effect = update_mock
- stubs.api.workflows().update().execute.side_effect = lambda **kwargs: {
- "uuid": stubs.updated_uuid,
- }
+ }
+ stubs.pipeline_create = copy.deepcopy(stubs.expect_pipeline_instance)
+ stubs.expect_pipeline_uuid = "zzzzz-d1hrv-zzzzzzzzzzzzzzz"
+ stubs.pipeline_create["uuid"] = stubs.expect_pipeline_uuid
+ stubs.pipeline_with_job = copy.deepcopy(stubs.pipeline_create)
+ stubs.pipeline_with_job["components"]["cwl-runner"]["job"] = {
+ "uuid": "zzzzz-8i9sb-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+ stubs.api.pipeline_instances().create().execute.return_value = stubs.pipeline_create
+ stubs.api.pipeline_instances().get().execute.return_value = stubs.pipeline_with_job
+
+ with open("tests/wf/submit_wf_packed.cwl") as f:
+ expect_packed_workflow = yaml.round_trip_load(f)
+
+ stubs.expect_container_spec = {
+ 'priority': 500,
+ 'mounts': {
+ '/var/spool/cwl': {
+ 'writable': True,
+ 'kind': 'collection'
+ },
+ '/var/lib/cwl/workflow.json': {
+ 'content': expect_packed_workflow,
+ 'kind': 'json'
+ },
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/var/lib/cwl/cwl.input.json': {
+ 'kind': 'json',
+ 'content': {
+ 'y': {
+ 'basename': '99999999999999999999999999999998+99',
+ 'location': 'keep:99999999999999999999999999999998+99',
+ 'class': 'Directory'},
+ 'x': {
+ 'basename': u'blorp.txt',
+ 'class': 'File',
+ 'location': u'keep:169f39d466a5438ac4a90e779bf750c7+53/blorp.txt',
+ "size": 16
+ },
+ 'z': {'basename': 'anonymous', 'class': 'Directory', 'listing': [
+ {'basename': 'renamed.txt',
+ 'class': 'File',
+ 'location': 'keep:99999999999999999999999999999998+99/file1.txt',
+ 'size': 0
+ }
+ ]}
+ },
+ 'kind': 'json'
+ }
+ },
+ 'secret_mounts': {},
+ 'state': 'Committed',
+ 'command': ['arvados-cwl-runner', '--local', '--api=containers',
+ '--no-log-timestamps', '--disable-validate', '--disable-color',
+ '--eval-timeout=20', '--thread-count=0',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow '+wfname,
+ '--debug', '--on-error=continue',
+ '/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json'],
+ 'name': wfname,
+ 'container_image': '999999999999999999999999999999d3+99',
+ 'output_name': 'Output from workflow '+wfname,
+ 'output_path': '/var/spool/cwl',
+ 'cwd': '/var/spool/cwl',
+ 'runtime_constraints': {
+ 'API': True,
+ 'vcpus': 1,
+ 'ram': (1024+256)*1024*1024
+ },
+ 'use_existing': False,
+ 'properties': {},
+ 'secret_mounts': {}
+ }
- return func(self, stubs, *args, **kwargs)
- return wrapped
+ stubs.expect_workflow_uuid = "zzzzz-7fd4e-zzzzzzzzzzzzzzz"
+ stubs.api.workflows().create().execute.return_value = {
+ "uuid": stubs.expect_workflow_uuid,
+ }
+ def update_mock(**kwargs):
+ stubs.updated_uuid = kwargs.get('uuid')
+ return mock.DEFAULT
+ stubs.api.workflows().update.side_effect = update_mock
+ stubs.api.workflows().update().execute.side_effect = lambda **kwargs: {
+ "uuid": stubs.updated_uuid,
+ }
+ return func(self, stubs, *args, **kwargs)
+ return wrapped
+ return outer_wrapper
class TestSubmit(unittest.TestCase):
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--disable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
'--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs('submit_wf_no_reuse.cwl')
def test_submit_container_reuse_disabled_by_workflow(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
'arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--disable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--disable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf_no_reuse.cwl', '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["use_existing"] = False
- expect_container["name"] = "submit_wf_no_reuse.cwl"
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
"class": "http://arvados.org/cwl#ReuseRequirement",
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
'--debug', '--on-error=stop',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", "--debug",
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow submit_wf.cwl',
+ "--debug",
"--storage-classes=foo", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", "--debug",
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
+ "--debug",
"--storage-classes=foo,bar", "--intermediate-storage-classes=baz", '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
make_output.return_value = ({},final_output_c)
def set_final_output(job_order, output_callback, runtimeContext):
- output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ output_callback({"out": "zzzzz"}, "success")
return []
job.side_effect = set_final_output
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+ make_output.assert_called_with(u'Output of submit_wf.cwl', ['foo'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@mock.patch("cwltool.task_queue.TaskQueue")
stubs.api.config().get.return_value = {"default": {"Default": True}}
def set_final_output(job_order, output_callback, runtimeContext):
- output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ output_callback({"out": "zzzzz"}, "success")
return []
job.side_effect = set_final_output
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+ make_output.assert_called_with(u'Output of submit_wf.cwl', ['default'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@mock.patch("cwltool.task_queue.TaskQueue")
make_output.return_value = ({},final_output_c)
def set_final_output(job_order, output_callback, runtimeContext):
- output_callback("zzzzz-4zz18-zzzzzzzzzzzzzzzz", "success")
+ output_callback({"out": "zzzzz"}, "success")
return []
job.side_effect = set_final_output
"tests/wf/submit_storage_class_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
- make_output.assert_called_with(u'Output of submit_storage_class_wf.cwl', ['foo', 'bar'], '', 'zzzzz-4zz18-zzzzzzzzzzzzzzzz')
+ make_output.assert_called_with(u'Output of submit_storage_class_wf.cwl', ['foo', 'bar'], '', {}, {"out": "zzzzz"})
self.assertEqual(exited, 0)
@stubs
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug',
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl", '--debug',
'--on-error=continue',
"--intermediate-output-ttl=3600",
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
'--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl",
"--output-tags="+output_tags, '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
}, 'state': 'Committed',
'output_path': '/var/spool/cwl',
'name': 'expect_arvworkflow.cwl#main',
+ 'output_name': 'Output from workflow expect_arvworkflow.cwl#main',
'container_image': '999999999999999999999999999999d3+99',
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=256", '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=256",
+ '--output-name=Output from workflow expect_arvworkflow.cwl#main',
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow/expect_arvworkflow.cwl#main', '/var/lib/cwl/cwl.input.json'],
'cwd': '/var/spool/cwl',
'runtime_constraints': {
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs('hello container 123')
def test_submit_container_name(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--name=hello container 123",
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["name"] = "hello container 123"
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
"--eval-timeout=20", "--thread-count=0",
- '--enable-reuse', "--collection-cache-size=256", '--debug',
+ '--enable-reuse', "--collection-cache-size=256",
+ "--output-name=Output from workflow submit_wf.cwl", '--debug',
'--on-error=continue',
'--project-uuid='+project_uuid,
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
- @stubs
+ @stubs('submit_wf_runner_resources.cwl')
def test_submit_wf_runner_resources(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"vcpus": 2,
"ram": (2000+512) * 2**20
}
- expect_container["name"] = "submit_wf_runner_resources.cwl"
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
"class": "http://arvados.org/cwl#WorkflowRunnerResources",
expect_container['command'] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate', '--disable-color',
'--eval-timeout=20', '--thread-count=0',
- '--enable-reuse', "--collection-cache-size=512", '--debug', '--on-error=continue',
+ '--enable-reuse', "--collection-cache-size=512",
+ '--output-name=Output from workflow submit_wf_runner_resources.cwl',
+ '--debug', '--on-error=continue',
'/var/lib/cwl/workflow.json#main', '/var/lib/cwl/cwl.input.json']
stubs.api.container_requests().create.assert_called_with(
"name": "arvados/jobs:"+arvados_cwl.__version__,
"owner_uuid": "",
"properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "",
+ "link_class": "docker_image_hash",
+ "name": "123456",
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
+ {"items": [{"created_at": "",
+ "head_uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "link_class": "docker_image_repo+tag",
+ "name": "arvados/jobs:"+arvados_cwl.__version__,
+ "owner_uuid": "",
+ "properties": {"image_timestamp": ""}}], "items_available": 1, "offset": 0},
{"items": [{"created_at": "",
"head_uuid": "",
"link_class": "docker_image_hash",
"owner_uuid": "",
"manifest_text": "",
"properties": ""
- }], "items_available": 1, "offset": 0},)
+ }], "items_available": 1, "offset": 0},
+ {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
+ "owner_uuid": "",
+ "manifest_text": "",
+ "properties": ""
+ }], "items_available": 1, "offset": 0})
arvrunner.api.collections().create().execute.return_value = {"uuid": ""}
arvrunner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzb",
"portable_data_hash": "9999999999999999999999999999999b+99"}
+
self.assertEqual("9999999999999999999999999999999b+99",
- arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__))
+ arvados_cwl.runner.arvados_jobs_image(arvrunner, "arvados/jobs:"+arvados_cwl.__version__, arvrunner.runtimeContext))
@stubs
'--thread-count=0',
"--enable-reuse",
"--collection-cache-size=256",
+                                       '--output-name=Output from workflow secret_wf.cwl',
'--debug',
"--on-error=continue",
"/var/lib/cwl/workflow.json#main",
}
},
"name": "secret_wf.cwl",
+ "output_name": "Output from workflow secret_wf.cwl",
"output_path": "/var/spool/cwl",
"priority": 500,
"properties": {},
finally:
cwltool_logger.removeHandler(stderr_logger)
- @stubs
+ @stubs('submit_wf_process_properties.cwl')
def test_submit_set_process_properties(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
expect_container = copy.deepcopy(stubs.expect_container_spec)
- expect_container["name"] = "submit_wf_process_properties.cwl"
+
expect_container["mounts"]["/var/lib/cwl/workflow.json"]["content"]["$graph"][1]["hints"] = [
{
"class": "http://arvados.org/cwl#ProcessProperties",
@stubs
def test_update(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug",
"name": "submit_wf.cwl",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
@stubs
def test_update_name(self, stubs):
+ project_uuid = 'zzzzz-j7d0g-zzzzzzzzzzzzzzz'
+ stubs.api.workflows().get().execute.return_value = {"owner_uuid": project_uuid}
+
exited = arvados_cwl.main(
["--update-workflow", self.existing_workflow_uuid,
"--debug", "--name", "testing 123",
"name": "testing 123",
"description": "",
"definition": self.expect_workflow,
+ "owner_uuid": project_uuid
}
}
stubs.api.workflows().update.assert_called_with(
Controller Service
DispatchCloud Service
DispatchLSF Service
+ DispatchSLURM Service
GitHTTP Service
GitSSH Service
Health Service
}
type InstanceType struct {
- Name string
+ Name string `json:"-"`
ProviderType string
VCPUs int
RAM ByteSize
- Scratch ByteSize
+ Scratch ByteSize `json:"-"`
IncludedScratch ByteSize
AddedScratch ByteSize
Price float64
var errDuplicateInstanceTypeName = errors.New("duplicate instance type name")
-// UnmarshalJSON handles old config files that provide an array of
-// instance types instead of a hash.
+// UnmarshalJSON does special handling of InstanceTypes:
+// * populate computed fields (Name and Scratch)
+// * error out if InstanceTypes are populated as an array, which was
+// deprecated in Arvados 1.2.0
func (it *InstanceTypeMap) UnmarshalJSON(data []byte) error {
fixup := func(t InstanceType) (InstanceType, error) {
if t.ProviderType == "" {
t.ProviderType = t.Name
}
- if t.Scratch == 0 {
- t.Scratch = t.IncludedScratch + t.AddedScratch
- } else if t.AddedScratch == 0 {
- t.AddedScratch = t.Scratch - t.IncludedScratch
- } else if t.IncludedScratch == 0 {
- t.IncludedScratch = t.Scratch - t.AddedScratch
- }
-
- if t.Scratch != (t.IncludedScratch + t.AddedScratch) {
- return t, fmt.Errorf("InstanceType %q: Scratch != (IncludedScratch + AddedScratch)", t.Name)
- }
+ // If t.Scratch is set in the configuration file, it will be ignored and overwritten.
+ // It will also generate a "deprecated or unknown config entry" warning.
+ t.Scratch = t.IncludedScratch + t.AddedScratch
return t, nil
}
if len(data) > 0 && data[0] == '[' {
- var arr []InstanceType
- err := json.Unmarshal(data, &arr)
- if err != nil {
- return err
- }
- if len(arr) == 0 {
- *it = nil
- return nil
- }
- *it = make(map[string]InstanceType, len(arr))
- for _, t := range arr {
- if _, ok := (*it)[t.Name]; ok {
- return errDuplicateInstanceTypeName
- }
- t, err := fixup(t)
- if err != nil {
- return err
- }
- (*it)[t.Name] = t
- }
- return nil
+ return fmt.Errorf("InstanceTypes must be specified as a map, not an array, see https://doc.arvados.org/admin/config.html")
}
var hash map[string]InstanceType
err := json.Unmarshal(data, &hash)
ServiceNameController ServiceName = "arvados-controller"
ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
ServiceNameDispatchLSF ServiceName = "arvados-dispatch-lsf"
+ ServiceNameDispatchSLURM ServiceName = "crunch-dispatch-slurm"
ServiceNameGitHTTP ServiceName = "arvados-git-httpd"
ServiceNameHealth ServiceName = "arvados-health"
ServiceNameKeepbalance ServiceName = "keep-balance"
ServiceNameController: svcs.Controller,
ServiceNameDispatchCloud: svcs.DispatchCloud,
ServiceNameDispatchLSF: svcs.DispatchLSF,
+ ServiceNameDispatchSLURM: svcs.DispatchSLURM,
ServiceNameGitHTTP: svcs.GitHTTP,
ServiceNameHealth: svcs.Health,
ServiceNameKeepbalance: svcs.Keepbalance,
type ConfigSuite struct{}
-func (s *ConfigSuite) TestInstanceTypesAsArray(c *check.C) {
+func (s *ConfigSuite) TestStringSetAsArray(c *check.C) {
var cluster Cluster
yaml.Unmarshal([]byte(`
API:
c.Check(ok, check.Equals, true)
}
-func (s *ConfigSuite) TestStringSetAsArray(c *check.C) {
- var cluster Cluster
- yaml.Unmarshal([]byte("InstanceTypes:\n- Name: foo\n"), &cluster)
- c.Check(len(cluster.InstanceTypes), check.Equals, 1)
- c.Check(cluster.InstanceTypes["foo"].Name, check.Equals, "foo")
-}
-
func (s *ConfigSuite) TestInstanceTypesAsHash(c *check.C) {
var cluster Cluster
yaml.Unmarshal([]byte("InstanceTypes:\n foo:\n ProviderType: bar\n"), &cluster)
func (s *ConfigSuite) TestInstanceTypeSize(c *check.C) {
var it InstanceType
- err := yaml.Unmarshal([]byte("Name: foo\nScratch: 4GB\nRAM: 4GiB\n"), &it)
+ err := yaml.Unmarshal([]byte("Name: foo\nIncludedScratch: 4GB\nRAM: 4GiB\n"), &it)
c.Check(err, check.IsNil)
- c.Check(int64(it.Scratch), check.Equals, int64(4000000000))
+ c.Check(int64(it.IncludedScratch), check.Equals, int64(4000000000))
c.Check(int64(it.RAM), check.Equals, int64(4294967296))
}
func (s *ConfigSuite) TestInstanceTypeFixup(c *check.C) {
for _, confdata := range []string{
// Current format: map of entries
- `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, Scratch: 8GB}}`,
- // Legacy format: array of entries with key in "Name" field
- `[{Name: foo4, IncludedScratch: 4GB}, {Name: foo8, ProviderType: foo_8, Scratch: 8GB}]`,
+ `{foo4: {IncludedScratch: 4GB}, foo8: {ProviderType: foo_8, AddedScratch: 8GB}}`,
} {
c.Log(confdata)
var itm InstanceTypeMap
Filters []Filter `json:"filters"`
ContainerCount int `json:"container_count"`
OutputStorageClasses []string `json:"output_storage_classes"`
+ OutputProperties map[string]interface{} `json:"output_properties"`
}
// Mount is special behavior to attach to a filesystem path or device.
"git.arvados.org/arvados.git/sdk/go/auth"
"git.arvados.org/arvados.git/sdk/go/ctxlog"
"github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
-const defaultTimeout = arvados.Duration(2 * time.Second)
+const (
+ defaultTimeout = arvados.Duration(2 * time.Second)
+ maxClockSkew = time.Minute
+)
// Aggregator implements service.Handler. It handles "GET /_health/all"
// by checking the health of all configured services on the cluster
// If non-nil, Log is called after handling each request.
Log func(*http.Request, error)
+
+ // If non-nil, report clock skew on each health-check.
+ MetricClockSkew prometheus.Gauge
}
func (agg *Aggregator) setup() {
// anywhere."
Services map[arvados.ServiceName]ServiceHealth `json:"services"`
+ // Difference between min/max timestamps in individual
+ // health-check responses.
+ ClockSkew arvados.Duration
+
Errors []string `json:"errors"`
}
HTTPStatusText string `json:",omitempty"`
Response map[string]interface{} `json:"response"`
ResponseTime json.Number `json:"responseTime"`
+ ClockTime time.Time `json:"clockTime"`
Metrics Metrics `json:"-"`
+ respTime time.Duration
}
type Metrics struct {
}
}
+ var maxResponseTime time.Duration
+ var clockMin, clockMax time.Time
+ for _, result := range resp.Checks {
+ if result.ClockTime.IsZero() {
+ continue
+ }
+ if clockMin.IsZero() || result.ClockTime.Before(clockMin) {
+ clockMin = result.ClockTime
+ }
+ if result.ClockTime.After(clockMax) {
+ clockMax = result.ClockTime
+ }
+ if result.respTime > maxResponseTime {
+ maxResponseTime = result.respTime
+ }
+ }
+ skew := clockMax.Sub(clockMin)
+ resp.ClockSkew = arvados.Duration(skew)
+ if skew > maxClockSkew+maxResponseTime {
+ msg := fmt.Sprintf("clock skew detected: maximum timestamp spread is %s (exceeds warning threshold of %s)", resp.ClockSkew, arvados.Duration(maxClockSkew))
+ resp.Errors = append(resp.Errors, msg)
+ resp.Health = "ERROR"
+ }
+ if agg.MetricClockSkew != nil {
+ agg.MetricClockSkew.Set(skew.Seconds())
+ }
+
var newest Metrics
for _, result := range resp.Checks {
if result.Metrics.ConfigSourceTimestamp.After(newest.ConfigSourceTimestamp) {
func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
t0 := time.Now()
defer func() {
- result.ResponseTime = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+ result.respTime = time.Since(t0)
+ result.ResponseTime = json.Number(fmt.Sprintf("%.6f", result.respTime.Seconds()))
}()
result.Health = "ERROR"
}
}
result.Health = "OK"
+ result.ClockTime, _ = time.Parse(time.RFC1123, resp.Header.Get("Date"))
return
}
s.checkOK(c)
}
+func (s *AggregatorSuite) TestClockSkew(c *check.C) {
+ // srv1: report real wall clock time
+ handler1 := healthyHandler{}
+ srv1, listen1 := s.stubServer(&handler1)
+ defer srv1.Close()
+ // srv2: report near-future time
+ handler2 := healthyHandler{headerDate: time.Now().Add(3 * time.Second)}
+ srv2, listen2 := s.stubServer(&handler2)
+ defer srv2.Close()
+ // srv3: report far-future time
+ handler3 := healthyHandler{headerDate: time.Now().Add(3*time.Minute + 3*time.Second)}
+ srv3, listen3 := s.stubServer(&handler3)
+ defer srv3.Close()
+
+ s.setAllServiceURLs(listen1)
+
+ // near-future time => OK
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.DispatchCloud,
+ "http://localhost"+listen2+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkOK(c)
+
+ // far-future time => error
+ s.resp = httptest.NewRecorder()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.WebDAV,
+ "http://localhost"+listen3+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkUnhealthy(c)
+ if c.Check(len(resp.Errors) > 0, check.Equals, true) {
+ c.Check(resp.Errors[0], check.Matches, `clock skew detected: maximum timestamp spread is 3m.* \(exceeds warning threshold of 1m\)`)
+ }
+}
+
func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
s.handler.timeout = arvados.Duration(100 * time.Millisecond)
srv, listen := s.stubServer(&slowHandler{})
&svcs.Controller,
&svcs.DispatchCloud,
&svcs.DispatchLSF,
+ &svcs.DispatchSLURM,
&svcs.GitHTTP,
&svcs.Keepbalance,
&svcs.Keepproxy,
type healthyHandler struct {
configHash string
configTime time.Time
+ headerDate time.Time
}
func (h *healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if !h.headerDate.IsZero() {
+ resp.Header().Set("Date", h.headerDate.Format(time.RFC1123))
+ }
authOK := req.Header.Get("Authorization") == "Bearer "+arvadostest.ManagementToken
if req.URL.Path == "/_health/ping" {
if !authOK {
http._request_id = util.new_request_id
return http
+def _close_connections(self):
+ for conn in self._http.connections.values():
+ conn.close()
+
# Monkey patch discovery._cast() so objects and arrays get serialized
# with json.dumps() instead of str().
_cast_orig = apiclient_discovery._cast
svc.request_id = request_id
svc.config = lambda: util.get_config_once(svc)
svc.vocabulary = lambda: util.get_vocabulary_once(svc)
+ svc.close_connections = types.MethodType(_close_connections, svc)
kwargs['http'].max_request_size = svc._rootDesc.get('maxRequestSize', 0)
kwargs['http'].cache = None
kwargs['http']._request_id = lambda: svc.request_id or util.new_request_id()
from arvados._version import __version__
-api_client = None
logger = logging.getLogger('arvados.arv-get')
parser = argparse.ArgumentParser(
return args
def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
- global api_client
-
if stdout is sys.stdout and hasattr(stdout, 'buffer'):
# in Python 3, write to stdout as binary
stdout = stdout.buffer
request_id = arvados.util.new_request_id()
logger.info('X-Request-Id: '+request_id)
- if api_client is None:
- api_client = arvados.api('v1', request_id=request_id)
+ api_client = arvados.api('v1', request_id=request_id)
r = re.search(r'^(.*?)(/.*)?$', args.locator)
col_loc = r.group(1)
def popen_docker(cmd, *args, **kwargs):
manage_stdin = ('stdin' not in kwargs)
kwargs.setdefault('stdin', subprocess.PIPE)
- kwargs.setdefault('stdout', sys.stderr)
+ kwargs.setdefault('stdout', subprocess.PIPE)
+ kwargs.setdefault('stderr', subprocess.PIPE)
try:
docker_proc = subprocess.Popen(['docker'] + cmd, *args, **kwargs)
except OSError: # No docker in $PATH, try docker.io
'tag': tag,
}
-def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None):
+def list_images_in_arv(api_client, num_retries, image_name=None, image_tag=None, project_uuid=None):
"""List all Docker images known to the api_client with image_name and
image_tag. If no image_name is given, defaults to listing all
Docker images.
search_filters = []
repo_links = None
hash_links = None
+
+ project_filter = []
+ if project_uuid is not None:
+ project_filter = [["owner_uuid", "=", project_uuid]]
+
if image_name:
# Find images with the name the user specified.
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_repo+tag'],
['name', '=',
- '{}:{}'.format(image_name, image_tag or 'latest')]])
+ '{}:{}'.format(image_name, image_tag or 'latest')]]+project_filter)
if search_links:
repo_links = search_links
else:
search_links = _get_docker_links(
api_client, num_retries,
filters=[['link_class', '=', 'docker_image_hash'],
- ['name', 'ilike', image_name + '%']])
+ ['name', 'ilike', image_name + '%']]+project_filter)
hash_links = search_links
# Only list information about images that were found in the search.
search_filters.append(['head_uuid', 'in',
if hash_links is None:
hash_links = _get_docker_links(
api_client, num_retries,
- filters=search_filters + [['link_class', '=', 'docker_image_hash']])
+ filters=search_filters + [['link_class', '=', 'docker_image_hash']]+project_filter)
hash_link_map = {link['head_uuid']: link for link in reversed(hash_links)}
# Each collection may have more than one name (though again, one name
repo_links = _get_docker_links(
api_client, num_retries,
filters=search_filters + [['link_class', '=',
- 'docker_image_repo+tag']])
+ 'docker_image_repo+tag']]+project_filter)
seen_image_names = collections.defaultdict(set)
images = []
for link in repo_links:
# Remove any image listings that refer to unknown collections.
existing_coll_uuids = {coll['uuid'] for coll in arvados.util.list_all(
api_client.collections().list, num_retries,
- filters=[['uuid', 'in', [im['collection'] for im in images]]],
+ filters=[['uuid', 'in', [im['collection'] for im in images]]]+project_filter,
select=['uuid'])}
return [(image['collection'], image) for image in images
if image['collection'] in existing_coll_uuids]
if args.pull and not find_image_hashes(args.image):
pull_image(args.image, args.tag)
+ images_in_arv = list_images_in_arv(api, args.retries, args.image, args.tag)
+
+ image_hash = None
try:
image_hash = find_one_image_hash(args.image, args.tag)
+ if not docker_image_compatible(api, image_hash):
+ if args.force_image_format:
+ logger.warning("forcing incompatible image")
+ else:
+ logger.error("refusing to store " \
+ "incompatible format (use --force-image-format to override)")
+ sys.exit(1)
except DockerError as error:
- logger.error(str(error))
- sys.exit(1)
-
- if not docker_image_compatible(api, image_hash):
- if args.force_image_format:
- logger.warning("forcing incompatible image")
+ if images_in_arv:
+ # We don't have Docker / we don't have the image locally,
+ # use image that's already uploaded to Arvados
+ image_hash = images_in_arv[0][1]['dockerhash']
else:
- logger.error("refusing to store " \
- "incompatible format (use --force-image-format to override)")
+ logger.error(str(error))
sys.exit(1)
image_repo_tag = '{}:{}'.format(args.image, args.tag) if not image_hash.startswith(args.image.lower()) else None
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto https;
proxy_redirect off;
+ proxy_max_temp_file_size 0;
+ proxy_request_buffering off;
+ proxy_buffering off;
+ proxy_http_version 1.1;
}
}
upstream arv-git-http {
'bar.txt' : 'bar',
'subdir/baz.txt' : 'baz',
}):
- c = collection.Collection()
+ api = arvados.api()
+ c = collection.Collection(api_client=api)
for path, data in listitems(contents):
with c.open(path, 'wb') as f:
f.write(data)
c.save_new()
+ api.close_connections()
+
return (c.manifest_locator(),
c.portable_data_hash(),
c.manifest_text(strip=strip_manifest))
self.run_arv_keepdocker(['--version'], sys.stderr)
self.assertVersionOutput(out, err)
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_image_format_compatibility(self, _1, _2):
+ def test_image_format_compatibility(self, _1, _2, _3):
old_id = hashlib.sha256(b'old').hexdigest()
new_id = 'sha256:'+hashlib.sha256(b'new').hexdigest()
for supported, img_id, expect_ok in [
self.run_arv_keepdocker(['[::1]/repo/img'], sys.stderr)
find_image_mock.assert_called_with('[::1]/repo/img', 'latest')
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv',
+ return_value=[])
@mock.patch('arvados.commands.keepdocker.find_image_hashes',
return_value=['abc123'])
@mock.patch('arvados.commands.keepdocker.find_one_image_hash',
return_value='abc123')
- def test_collection_property_update(self, _1, _2):
+ def test_collection_property_update(self, _1, _2, _3):
image_id = 'sha256:'+hashlib.sha256(b'image').hexdigest()
fakeDD = arvados.api('v1')._rootDesc
fakeDD['dockerImageFormats'] = ['v2']
multi_json (1.15.0)
multipart-post (2.1.1)
nio4r (2.5.8)
- nokogiri (1.13.4)
+ nokogiri (1.13.6)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
oj (3.9.2)
# format is YYYYMMDD, must be fixed width (needs to be lexically
# sortable), updated manually, may be used by clients to
# determine availability of API server features.
- revision: "20220222",
+ revision: "20220510",
source_version: AppVersion.hash,
sourceVersion: AppVersion.hash, # source_version should be deprecated in the future
packageVersion: AppVersion.package_version,
attribute :runtime_status, :jsonbHash, default: {}
attribute :runtime_auth_scopes, :jsonbArray, default: []
attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+ attribute :output_properties, :jsonbHash, default: {}
serialize :environment, Hash
serialize :mounts, Hash
t.add :gateway_address
t.add :interactive_session_started
t.add :output_storage_classes
+ t.add :output_properties
end
# Supported states for a container
def validate_change
permitted = [:state]
- progress_attrs = [:progress, :runtime_status, :log, :output]
+ progress_attrs = [:progress, :runtime_status, :log, :output, :output_properties]
final_attrs = [:exit_code, :finished_at]
if self.new_record?
permitted.push :priority
when Running
- permitted.push :priority, *progress_attrs
+ permitted.push :priority, :output_properties, *progress_attrs
if self.state_changed?
permitted.push :started_at, :gateway_address
end
attribute :properties, :jsonbHash, default: {}
attribute :secret_mounts, :jsonbHash, default: {}
attribute :output_storage_classes, :jsonbArray, default: lambda { Rails.configuration.DefaultStorageClasses }
+ attribute :output_properties, :jsonbHash, default: {}
serialize :environment, Hash
serialize :mounts, Hash
t.add :state
t.add :use_existing
t.add :output_storage_classes
+ t.add :output_properties
end
# Supported states for a container request
:output_path, :priority, :runtime_token,
:runtime_constraints, :state, :container_uuid, :use_existing,
:scheduling_parameters, :secret_mounts, :output_name, :output_ttl,
- :output_storage_classes]
+ :output_storage_classes, :output_properties]
def self.any_preemptible_instances?
Rails.configuration.InstanceTypes.any? do |k, v|
owner_uuid: self.owner_uuid,
name: coll_name,
manifest_text: "",
- storage_classes_desired: self.output_storage_classes,
- properties: {
- 'type' => out_type,
- 'container_request' => uuid,
- })
+ storage_classes_desired: self.output_storage_classes)
end
if out_type == "log"
manifest = dst.manifest_text
end
+ merged_properties = {}
+ merged_properties['container_request'] = uuid
+
+ if out_type == 'output' and !requesting_container_uuid.nil?
+ # output of a child process, give it "intermediate" type by
+ # default.
+ merged_properties['type'] = 'intermediate'
+ else
+ merged_properties['type'] = out_type
+ end
+
+ if out_type == "output"
+ merged_properties.update(container.output_properties)
+ merged_properties.update(self.output_properties)
+ end
+
coll.assign_attributes(
portable_data_hash: Digest::MD5.hexdigest(manifest) + '+' + manifest.bytesize.to_s,
manifest_text: manifest,
trash_at: trash_at,
- delete_at: trash_at)
+ delete_at: trash_at,
+ properties: merged_properties)
coll.save_with_unique_name!
self.send(out_type + '_uuid=', coll.uuid)
end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+class AddOutputProperties < ActiveRecord::Migration[5.2]
+ def trgm_indexes
+ {
+ "container_requests" => "container_requests_trgm_text_search_idx",
+ }
+ end
+
+ def up
+ add_column :container_requests, :output_properties, :jsonb, default: {}
+ add_column :containers, :output_properties, :jsonb, default: {}
+
+ trgm_indexes.each do |model, indx|
+ execute "DROP INDEX IF EXISTS #{indx}"
+ execute "CREATE INDEX #{indx} ON #{model} USING gin((#{model.classify.constantize.full_text_trgm}) gin_trgm_ops)"
+ end
+ end
+
+ def down
+ remove_column :container_requests, :output_properties
+ remove_column :containers, :output_properties
+
+ trgm_indexes.each do |model, indx|
+ execute "DROP INDEX IF EXISTS #{indx}"
+ execute "CREATE INDEX #{indx} ON #{model} USING gin((#{model.classify.constantize.full_text_trgm}) gin_trgm_ops)"
+ end
+ end
+end
output_ttl integer DEFAULT 0 NOT NULL,
secret_mounts jsonb DEFAULT '{}'::jsonb,
runtime_token text,
- output_storage_classes jsonb DEFAULT '["default"]'::jsonb
+ output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
+ output_properties jsonb DEFAULT '{}'::jsonb
);
lock_count integer DEFAULT 0 NOT NULL,
gateway_address character varying,
interactive_session_started boolean DEFAULT false NOT NULL,
- output_storage_classes jsonb DEFAULT '["default"]'::jsonb
+ output_storage_classes jsonb DEFAULT '["default"]'::jsonb,
+ output_properties jsonb DEFAULT '{}'::jsonb
);
-- Name: container_requests_trgm_text_search_idx; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX container_requests_trgm_text_search_idx ON public.container_requests USING gin (((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE((properties)::text, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text)) public.gin_trgm_ops);
+CREATE INDEX container_requests_trgm_text_search_idx ON public.container_requests USING gin (((((((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE((properties)::text, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text) || ' '::text) || COALESCE((output_properties)::text, ''::text))) public.gin_trgm_ops);
--
('20220224203102'),
('20220301155729'),
('20220303204419'),
-('20220401153101');
+('20220401153101'),
+('20220505112900');
].each do |token, expected, expected_priority|
test "create as #{token} and expect requesting_container_uuid to be #{expected}" do
set_user_from_auth token
- cr = ContainerRequest.create(container_image: "img", output_path: "/tmp", command: ["echo", "foo"])
+ cr = create_minimal_req!
assert_not_nil cr.uuid, 'uuid should be set for newly created container_request'
assert_equal expected, cr.requesting_container_uuid
assert_equal expected_priority, cr.priority
end
end
+ [
+ ['running_container_auth', 'zzzzz-dz642-runningcontainr', 501],
+ ].each do |token, expected, expected_priority|
+ test "create as #{token} with requesting_container_uuid set and expect output to be intermediate" do
+ set_user_from_auth token
+ cr = create_minimal_req!
+ assert_not_nil cr.uuid, 'uuid should be set for newly created container_request'
+ assert_equal expected, cr.requesting_container_uuid
+ assert_equal expected_priority, cr.priority
+
+ cr.state = ContainerRequest::Committed
+ cr.save!
+
+ run_container(cr)
+ cr.reload
+ output = Collection.find_by_uuid(cr.output_uuid)
+ props = {"type": "intermediate", "container_request": cr.uuid}
+ assert_equal props.symbolize_keys, output.properties.symbolize_keys
+ end
+ end
+
test "create as container_runtime_token and expect requesting_container_uuid to be zzzzz-dz642-20isqbkl8xwnsao" do
set_user_from_auth :container_runtime_token
Thread.current[:token] = "#{Thread.current[:token]}/zzzzz-dz642-20isqbkl8xwnsao"
assert_equal ["foo_storage_class"], output1.storage_classes_desired
assert_equal ["bar_storage_class"], output2.storage_classes_desired
end
+
+ [
+ [{}, {}, {"type": "output"}],
+ [{"a1": "b1"}, {}, {"type": "output", "a1": "b1"}],
+ [{}, {"a1": "b1"}, {"type": "output", "a1": "b1"}],
+ [{"a1": "b1"}, {"a1": "c1"}, {"type": "output", "a1": "b1"}],
+ [{"a1": "b1"}, {"a2": "c2"}, {"type": "output", "a1": "b1", "a2": "c2"}],
+ [{"type": "blah"}, {}, {"type": "blah"}],
+ ].each do |cr_prop, container_prop, expect_prop|
+ test "setting output_properties #{cr_prop} #{container_prop} on current container" do
+ act_as_user users(:active) do
+ cr = create_minimal_req!(priority: 1,
+ state: ContainerRequest::Committed,
+ output_name: 'foo',
+ output_properties: cr_prop)
+
+ act_as_system_user do
+ logc = Collection.new(owner_uuid: system_user_uuid,
+ manifest_text: ". ef772b2f28e2c8ca84de45466ed19ee9+7815 0:0:arv-mount.txt\n")
+ logc.save!
+
+ c = Container.find_by_uuid(cr.container_uuid)
+ c.update_attributes!(state: Container::Locked)
+ c.update_attributes!(state: Container::Running)
+
+ c.update_attributes!(output_properties: container_prop)
+
+ c.update_attributes!(state: Container::Complete,
+ exit_code: 0,
+ output: '1f4b0bc7583c2a7f9102c395f4ffc5e3+45',
+ log: logc.portable_data_hash)
+ logc.destroy
+ end
+
+ cr.reload
+ expect_prop["container_request"] = cr.uuid
+ output = Collection.find_by_uuid(cr.output_uuid)
+ assert_equal expect_prop.symbolize_keys, output.properties.symbolize_keys
+ end
+ end
+ end
+
end
//
// SPDX-License-Identifier: AGPL-3.0
-package main
-
// Dispatcher service for Crunch that submits containers to the slurm queue.
+package dispatchslurm
import (
"context"
- "flag"
"fmt"
"log"
"math"
+ "net/http"
"os"
"regexp"
"strings"
"time"
"git.arvados.org/arvados.git/lib/cmd"
- "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud"
+ "git.arvados.org/arvados.git/lib/service"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/dispatch"
"github.com/coreos/go-systemd/daemon"
- "github.com/ghodss/yaml"
+ "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
)
+var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchSLURM, newHandler)
+
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
+ logger := ctxlog.FromContext(ctx)
+ disp := &Dispatcher{logger: logger, cluster: cluster}
+ if err := disp.configure(); err != nil {
+ return service.ErrorHandler(ctx, cluster, err)
+ }
+ disp.setup()
+ go func() {
+ disp.err = disp.run()
+ close(disp.done)
+ }()
+ return disp
+}
+
type logger interface {
dispatch.Logger
Fatalf(string, ...interface{})
const initialNiceValue int64 = 10000
-var (
- version = "dev"
-)
-
type Dispatcher struct {
*dispatch.Dispatcher
logger logrus.FieldLogger
sqCheck *SqueueChecker
slurm Slurm
+ done chan struct{}
+ err error
+
Client arvados.Client
}
-func main() {
- logger := logrus.StandardLogger()
- if os.Getenv("DEBUG") != "" {
- logger.SetLevel(logrus.DebugLevel)
- }
- logger.Formatter = &logrus.JSONFormatter{
- TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
- }
- disp := &Dispatcher{logger: logger}
- err := disp.Run(os.Args[0], os.Args[1:])
- if err != nil {
- logrus.Fatalf("%s", err)
- }
+func (disp *Dispatcher) CheckHealth() error {
+ return disp.err
}
-func (disp *Dispatcher) Run(prog string, args []string) error {
- if err := disp.configure(prog, args); err != nil {
- return err
- }
- disp.setup()
- return disp.run()
+func (disp *Dispatcher) Done() <-chan struct{} {
+ return disp.done
+}
+
+func (disp *Dispatcher) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ http.NotFound(w, r)
}
-// configure() loads config files. Tests skip this.
-func (disp *Dispatcher) configure(prog string, args []string) error {
+// configure() loads config files. Some tests skip this (see
+// StubbedSuite).
+func (disp *Dispatcher) configure() error {
if disp.logger == nil {
disp.logger = logrus.StandardLogger()
}
- flags := flag.NewFlagSet(prog, flag.ContinueOnError)
- flags.Usage = func() { usage(flags) }
-
- loader := config.NewLoader(nil, disp.logger)
- loader.SetupFlags(flags)
-
- dumpConfig := flag.Bool(
- "dump-config",
- false,
- "write current configuration to stdout and exit")
- getVersion := flags.Bool(
- "version",
- false,
- "Print version information and exit.")
-
- args = loader.MungeLegacyConfigArgs(disp.logger, args, "-legacy-crunch-dispatch-slurm-config")
- if ok, code := cmd.ParseFlags(flags, prog, args, "", os.Stderr); !ok {
- os.Exit(code)
- }
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("crunch-dispatch-slurm %s\n", version)
- return nil
- }
-
- disp.logger.Printf("crunch-dispatch-slurm %s started", version)
-
- cfg, err := loader.Load()
- if err != nil {
- return err
- }
-
- if disp.cluster, err = cfg.GetCluster(""); err != nil {
- return fmt.Errorf("config error: %s", err)
- }
-
disp.logger = disp.logger.WithField("ClusterID", disp.cluster.ClusterID)
+ disp.logger.Printf("crunch-dispatch-slurm %s started", cmd.Version.String())
disp.Client.APIHost = disp.cluster.Services.Controller.ExternalURL.Host
disp.Client.AuthToken = disp.cluster.SystemRootToken
} else {
disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
-
- if *dumpConfig {
- out, err := yaml.Marshal(cfg)
- if err != nil {
- return err
- }
- _, err = os.Stdout.Write(out)
- if err != nil {
- return err
- }
- }
-
return nil
}
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
+ disp.done = make(chan struct{})
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
disp.logger.Fatalf("Error making Arvados client: %v", err)
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"bytes"
"context"
"errors"
+ "flag"
"fmt"
"io"
"io/ioutil"
"testing"
"time"
+ "git.arvados.org/arvados.git/lib/cmd"
+ "git.arvados.org/arvados.git/lib/config"
"git.arvados.org/arvados.git/lib/dispatchcloud"
"git.arvados.org/arvados.git/sdk/go/arvados"
"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/arvadostest"
+ "git.arvados.org/arvados.git/sdk/go/ctxlog"
"git.arvados.org/arvados.git/sdk/go/dispatch"
"github.com/sirupsen/logrus"
. "gopkg.in/check.v1"
}
func (s *StubbedSuite) TestLoadLegacyConfig(c *C) {
+ log := ctxlog.TestLogger(c)
content := []byte(`
Client:
APIHost: example.com
MinRetryPeriod: 13s
BatchSize: 99
`)
- tmpfile, err := ioutil.TempFile("", "example")
- if err != nil {
- c.Error(err)
- }
-
- defer os.Remove(tmpfile.Name()) // clean up
-
- if _, err := tmpfile.Write(content); err != nil {
- c.Error(err)
- }
- if err := tmpfile.Close(); err != nil {
- c.Error(err)
+ tmpfile := c.MkDir() + "/config.yml"
+ err := ioutil.WriteFile(tmpfile, content, 0777)
+ c.Assert(err, IsNil)
- }
os.Setenv("ARVADOS_KEEP_SERVICES", "")
- err = s.disp.configure("crunch-dispatch-slurm", []string{"-config", tmpfile.Name()})
- c.Check(err, IsNil)
- c.Check(s.disp.cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com", Path: "/"})
- c.Check(s.disp.cluster.SystemRootToken, Equals, "abcdefg")
- c.Check(s.disp.cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"})
- c.Check(s.disp.cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second))
- c.Check(s.disp.cluster.Containers.SLURM.PrioritySpread, Equals, int64(42))
- c.Check(s.disp.cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run")
- c.Check(s.disp.cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"})
- c.Check(s.disp.cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
- c.Check(s.disp.cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
- c.Check(s.disp.cluster.API.MaxItemsPerResponse, Equals, 99)
- c.Check(s.disp.cluster.Containers.SLURM.SbatchEnvironmentVariables, DeepEquals, map[string]string{
+ flags := flag.NewFlagSet("", flag.ContinueOnError)
+ flags.SetOutput(os.Stderr)
+ loader := config.NewLoader(&bytes.Buffer{}, log)
+ loader.SetupFlags(flags)
+ args := loader.MungeLegacyConfigArgs(log, []string{"-config", tmpfile}, "-legacy-"+string(arvados.ServiceNameDispatchSLURM)+"-config")
+ ok, _ := cmd.ParseFlags(flags, "crunch-dispatch-slurm", args, "", os.Stderr)
+ c.Check(ok, Equals, true)
+ cfg, err := loader.Load()
+ c.Assert(err, IsNil)
+ cluster, err := cfg.GetCluster("")
+ c.Assert(err, IsNil)
+
+ c.Check(cluster.Services.Controller.ExternalURL, Equals, arvados.URL{Scheme: "https", Host: "example.com", Path: "/"})
+ c.Check(cluster.SystemRootToken, Equals, "abcdefg")
+ c.Check(cluster.Containers.SLURM.SbatchArgumentsList, DeepEquals, []string{"--foo", "bar"})
+ c.Check(cluster.Containers.CloudVMs.PollInterval, Equals, arvados.Duration(12*time.Second))
+ c.Check(cluster.Containers.SLURM.PrioritySpread, Equals, int64(42))
+ c.Check(cluster.Containers.CrunchRunCommand, Equals, "x-crunch-run")
+ c.Check(cluster.Containers.CrunchRunArgumentsList, DeepEquals, []string{"--cgroup-parent-subsystem=memory"})
+ c.Check(cluster.Containers.ReserveExtraRAM, Equals, arvados.ByteSize(12345))
+ c.Check(cluster.Containers.MinRetryPeriod, Equals, arvados.Duration(13*time.Second))
+ c.Check(cluster.API.MaxItemsPerResponse, Equals, 99)
+ c.Check(cluster.Containers.SLURM.SbatchEnvironmentVariables, DeepEquals, map[string]string{
"ARVADOS_KEEP_SERVICES": "https://example.com/keep1 https://example.com/keep2",
})
+
+ // Ensure configure() copies SbatchEnvironmentVariables into
+ // the current process's environment (that's how they end up
+ // getting passed to sbatch).
+ s.disp.cluster = cluster
+ s.disp.configure()
c.Check(os.Getenv("ARVADOS_KEEP_SERVICES"), Equals, "https://example.com/keep1 https://example.com/keep2")
}
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"log"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
const defaultSpread int64 = 10
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
. "gopkg.in/check.v1"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"strings"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
. "gopkg.in/check.v1"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"fmt"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"bytes"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"time"
//
// SPDX-License-Identifier: AGPL-3.0
-package main
+package dispatchslurm
import (
"flag"
+++ /dev/null
-// Copyright (C) The Arvados Authors. All rights reserved.
-//
-// SPDX-License-Identifier: AGPL-3.0
-
-package main
-
-import (
- "context"
- "os"
-
- "git.arvados.org/arvados.git/lib/cmd"
- "git.arvados.org/arvados.git/lib/service"
- "git.arvados.org/arvados.git/sdk/go/arvados"
- "git.arvados.org/arvados.git/sdk/go/health"
- "github.com/prometheus/client_golang/prometheus"
-)
-
-var (
- version = "dev"
- command cmd.Handler = service.Command(arvados.ServiceNameHealth, newHandler)
-)
-
-func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string, _ *prometheus.Registry) service.Handler {
- return &health.Aggregator{Cluster: cluster}
-}
-
-func main() {
- os.Exit(command.RunCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
-}
RUN apt-key add --no-tty /tmp/8D81803C0EBFCD88.asc && \
rm -f /tmp/8D81803C0EBFCD88.asc
-RUN mkdir -p /etc/apt/sources.list.d && \
- echo deb https://download.docker.com/linux/debian/ buster stable > /etc/apt/sources.list.d/docker.list && \
- apt-get update && \
- apt-get -yq --no-install-recommends install docker-ce=5:20.10.6~3-0~debian-buster && \
- apt-get clean
+# docker is now installed by arvados-server install
+# RUN mkdir -p /etc/apt/sources.list.d && \
+# echo deb https://download.docker.com/linux/debian/ buster stable > /etc/apt/sources.list.d/docker.list && \
+# apt-get update && \
+# apt-get -yq --no-install-recommends install docker-ce=5:20.10.6~3-0~debian-buster && \
+# apt-get clean
# Set UTF-8 locale
RUN echo en_US.UTF-8 UTF-8 > /etc/locale.gen && locale-gen
exit
fi
+API_HOST=${localip}:${services[controller-ssl]}
+
+if test -f /usr/src/workbench2/public/API_HOST ; then
+ API_HOST=$(cat /usr/src/workbench2/public/API_HOST)
+fi
+
cat <<EOF > /usr/src/workbench2/public/config.json
{
- "API_HOST": "${localip}:${services[controller-ssl]}",
- "VOCABULARY_URL": "/vocabulary-example.json",
- "FILE_VIEWERS_CONFIG_URL": "/file-viewers-example.json"
+ "API_HOST": "$API_HOST"
}
EOF
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_max_temp_file_size: 0
+ - proxy_request_buffering: 'off'
+ - proxy_buffering: 'off'
+ - proxy_http_version: '1.1'
- include: snippets/ssl_hardening_default.conf
- ssl_certificate: __CERT_PEM__
- ssl_certificate_key: __CERT_KEY__
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_max_temp_file_size: 0
+ - proxy_request_buffering: 'off'
+ - proxy_buffering: 'off'
+ - proxy_http_version: '1.1'
- include: snippets/ssl_hardening_default.conf
- ssl_certificate: __CERT_PEM__
- ssl_certificate_key: __CERT_KEY__
- proxy_set_header: 'X-Real-IP $remote_addr'
- proxy_set_header: 'X-Forwarded-For $proxy_add_x_forwarded_for'
- proxy_set_header: 'X-External-Client $external_client'
+ - proxy_max_temp_file_size: 0
+ - proxy_request_buffering: 'off'
+ - proxy_buffering: 'off'
+ - proxy_http_version: '1.1'
- include: snippets/ssl_hardening_default.conf
- ssl_certificate: __CERT_PEM__
- ssl_certificate_key: __CERT_KEY__