*.gz.report
*.ico
*.jpg
+*.svg
+*.odg
*.json
*LICENSE*.html
.licenseignore
services/keepproxy/pkg-extras/etc/default/keepproxy
*.tar
tools/crunchstat-summary/tests/crunchstat_error_messages.txt
+tools/crunchstat-summary/crunchstat_summary/synchronizer.js
+build/package-build-dockerfiles/debian9/D39DC0E3.asc
+build/package-test-dockerfiles/debian9/D39DC0E3.asc
+sdk/R/DESCRIPTION
+sdk/R/NAMESPACE
+sdk/R/.Rbuildignore
+sdk/R/ArvadosR.Rproj
+*.Rd
President and Fellows of Harvard College <*@harvard.edu>
Thomas Mooney <tmooney@genome.wustl.edu>
Chen Chen <aflyhorse@gmail.com>
+Veritas Genetics, Inc. <*@veritasgenetics.com>
+<!-- Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 -->
+
<div data-mount-mithril="TestComponent"></div>
+<%# Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: AGPL-3.0 %>
+
<%= javascript_tag do %>
function update_visibility() {
if (sessionStorage.getItem('link_account_api_token') &&
end
if Capybara.current_driver == :selenium
page.execute_script("window.localStorage.clear()")
+ else
+ page.driver.restart if defined?(page.driver.restart)
end
Capybara.reset_sessions!
end
#distribution(s)|name|version|iteration|type|architecture|extra fpm arguments
debian8,debian9,centos7|python-gflags|2.0|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,centos7|google-api-python-client|1.6.2|2|python|all
-debian8,debian9,ubuntu1404,ubuntu1604,centos7|apache-libcloud|2.3.0|3|python|all|--depends 'python-requests >= 2.4.3'
debian8,debian9,ubuntu1404,centos7|oauth2client|1.5.2|2|python|all
debian8,debian9,ubuntu1404,centos7|pyasn1|0.1.7|2|python|all
debian8,debian9,ubuntu1404,centos7|pyasn1-modules|0.0.5|2|python|all
centos7|pyparsing|2.1.10|2|python|all
centos7|keepalive|0.5|2|python|all
debian8,debian9,ubuntu1404,ubuntu1604,centos7|lockfile|0.12.2|2|python|all|--epoch 1
-debian8,debian9,ubuntu1404,ubuntu1604,centos7|subprocess32|3.5.0rc1|2|python|all
+debian8,debian9,ubuntu1404,ubuntu1604,centos7|subprocess32|3.5.1|2|python|all
all|ruamel.yaml|0.14.12|2|python|amd64|--python-setup-py-arguments --single-version-externally-managed
-all|cwltest|1.0.20180416154033|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32'
+all|cwltest|1.0.20180518074130|4|python|all|--depends 'python-futures >= 3.0.5' --depends 'python-subprocess32 >= 3.5.0'
all|junit-xml|1.8|3|python|all
all|rdflib-jsonld|0.4.0|2|python|all
all|futures|3.0.5|2|python|all
| */nodemanager/doc/*.cfg \
| */nodemanager/tests/fake*.cfg.template \
| */nginx.conf \
- | build/build.list)
+ | build/build.list | *.R)
fixer=fixer
cc="#"
;;
wantBYSAmd="[comment]: # (Copyright © The Arvados Authors. All rights reserved.)
[comment]: # ()
[comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0)"
- found=$(head -n20 "$fnm" | egrep -A${grepAfter} -B${grepBefore} 'Copyright.*Arvados' || true)
+ found=$(head -n20 "$fnm" | egrep -A${grepAfter} -B${grepBefore} 'Copyright.*All rights reserved.' || true)
case ${fnm} in
Makefile | build/* | lib/* | tools/* | apps/* | services/* | sdk/cli/bin/crunch-job)
want=${wantGPL}
#
# SPDX-License-Identifier: AGPL-3.0
-LIBCLOUD_PIN=2.3.0
+LIBCLOUD_PIN=2.3.1.dev1
-using_fork=false
+using_fork=true
if [[ $using_fork = true ]]; then
LIBCLOUD_PIN_SRC="https://github.com/curoverse/libcloud/archive/apache-libcloud-$LIBCLOUD_PIN.zip"
else
fi
test_package_presence ${PYTHON2_PKG_PREFIX}-arvados-cwl-runner "$arvados_cwl_runner_version" python "$arvados_cwl_runner_iteration"
if [[ "$?" == "0" ]]; then
- fpm_build $WORKSPACE/sdk/cwl "${PYTHON2_PKG_PREFIX}-arvados-cwl-runner" 'Curoverse, Inc.' 'python' "$arvados_cwl_runner_version" "--url=https://arvados.org" "--description=The Arvados CWL runner" --depends "${PYTHON2_PKG_PREFIX}-setuptools" --depends "${PYTHON2_PKG_PREFIX}-subprocess32 >= 3.5.0rc1" --depends "${PYTHON2_PKG_PREFIX}-pathlib2" --depends "${PYTHON2_PKG_PREFIX}-scandir" "${iterargs[@]}"
+ fpm_build $WORKSPACE/sdk/cwl "${PYTHON2_PKG_PREFIX}-arvados-cwl-runner" 'Curoverse, Inc.' 'python' "$arvados_cwl_runner_version" "--url=https://arvados.org" "--description=The Arvados CWL runner" --depends "${PYTHON2_PKG_PREFIX}-setuptools" --depends "${PYTHON2_PKG_PREFIX}-subprocess32 >= 3.5.0" --depends "${PYTHON2_PKG_PREFIX}-pathlib2" --depends "${PYTHON2_PKG_PREFIX}-scandir" "${iterargs[@]}"
fi
# schema_salad. This is a python dependency of arvados-cwl-runner,
fpm_build $WORKSPACE/tools/crunchstat-summary ${PYTHON2_PKG_PREFIX}-crunchstat-summary 'Curoverse, Inc.' 'python' "$crunchstat_summary_version" "--url=https://arvados.org" "--description=Crunchstat-summary reads Arvados Crunch log files and summarize resource usage" --iteration "$iteration"
fi
-## if libcloud becomes our own fork see
-## https://dev.arvados.org/issues/12268#note-27
+# Forked libcloud
+if test_package_presence "$PYTHON2_PKG_PREFIX"-apache-libcloud "$LIBCLOUD_PIN" python 2
+then
+ LIBCLOUD_DIR=$(mktemp -d)
+ (
+ cd $LIBCLOUD_DIR
+ git clone $DASHQ_UNLESS_DEBUG https://github.com/curoverse/libcloud.git .
+ git checkout $DASHQ_UNLESS_DEBUG apache-libcloud-$LIBCLOUD_PIN
+ # libcloud is absurdly noisy without -q, so force -q here
+ OLD_DASHQ_UNLESS_DEBUG=$DASHQ_UNLESS_DEBUG
+ DASHQ_UNLESS_DEBUG=-q
+ handle_python_package
+ DASHQ_UNLESS_DEBUG=$OLD_DASHQ_UNLESS_DEBUG
+ )
+
+ # libcloud >= 2.3.0 now requires python-requests 2.4.3 or higher, otherwise
+ # it throws
+ # ImportError: No module named packages.urllib3.poolmanager
+ # when loaded. We only see this problem on ubuntu1404, because that is our
+ # only supported distribution that ships with a python-requests older than
+ # 2.4.3.
+ fpm_build $LIBCLOUD_DIR "$PYTHON2_PKG_PREFIX"-apache-libcloud "" python "" --iteration 2 --depends 'python-requests >= 2.4.3'
+ rm -rf $LIBCLOUD_DIR
+fi
# Python 2 dependencies
declare -a PIP_DOWNLOAD_SWITCHES=(--no-deps)
declare -A skip
declare -A testargs
skip[apps/workbench_profile]=1
+# nodemanager_integration tests are not reliable, see #12061.
+skip[services/nodemanager_integration]=1
while [[ -n "$1" ]]
do
set -e
mkdir -p "$GOPATH/src/git.curoverse.com"
rmdir -v --parents --ignore-fail-on-non-empty "${temp}/GOPATH"
+ if [[ ! -h "$GOPATH/src/git.curoverse.com/arvados.git" ]]; then
+ for d in \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp/GOPATH" \
+ "$GOPATH/src/git.curoverse.com/arvados.git/tmp" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
+ [[ -d "$d" ]] && rmdir "$d"
+ done
+ fi
for d in \
- "$GOPATH/src/git.curoverse.com/arvados.git/arvados.git" \
- "$GOPATH/src/git.curoverse.com/arvados.git"; do
- [[ -d "$d" ]] && rmdir "$d"
+ "$GOPATH/src/git.curoverse.com/arvados.git/arvados" \
+ "$GOPATH/src/git.curoverse.com/arvados.git"; do
[[ -h "$d" ]] && rm "$d"
done
- ln -vsnfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
+ ln -vsfT "$WORKSPACE" "$GOPATH/src/git.curoverse.com/arvados.git"
go get -v github.com/kardianos/govendor
cd "$GOPATH/src/git.curoverse.com/arvados.git"
if [[ -n "$short" ]]; then
|@is_a@|string|Arvados object type|@["head_uuid","is_a","arvados#collection"]@|
|@exists@|string|Test if a subproperty is present.|@["properties","exists","my_subproperty"]@|
-h4. Filtering on subproperties
+h4(#subpropertyfilters). Filtering on subproperties
Some record types have an additional @properties@ attribute that allows recording and filtering on additional key-value pairs. To filter on a subproperty, the value in the @attribute@ position has the form @properties.user_property@. You may also use JSON-LD / RDF style URIs for property keys by enclosing them in @<...>@ for example @properties.<http://example.com/user_property>@. Alternatively, you may provide a JSON-LD "@context" field, however at this time JSON-LD contexts are not interpreted by Arvados.
|_. Attribute|_. Type|_. Description|_. Example|
|name|string|||
|description|text|||
+|properties|hash|User-defined metadata; may be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters ||
|portable_data_hash|string|The MD5 sum of the manifest text stripped of block hints other than the size hint.||
|manifest_text|text|||
|replication_desired|number|Minimum storage replication level desired for each data block referenced by this collection. A value of @null@ signifies that the site default replication level (typically 2) is desired.|@2@|
|_. Attribute|_. Type|_. Description|_. Notes|
|name|string|The name of the container_request.||
|description|string|The description of the container_request.||
-|properties|hash|Client-defined structured data that does not affect how the container is run.||
+|properties|hash|User-defined metadata that does not affect how the container is run. May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters||
|state|string|The allowed states are "Uncommitted", "Committed", and "Final".|Once a request is Committed, the only attributes that can be modified are priority, container_uuid, and container_count_max. A request in the "Final" state cannot have any of its functional parts modified (i.e., only name, description, and properties fields can be modified).|
|requesting_container_uuid|string|The uuid of the parent container that created this container_request, if any. Represents a process tree.|The priority of this container_request is inherited from the parent container, if the parent container is cancelled, this container_request will be cancelled as well.|
|container_uuid|string|The uuid of the container that satisfies this container_request. The system may return a preexisting Container that matches the container request criteria. See "Container reuse":#container_reuse for more details.|Container reuse is the default behavior, but may be disabled with @use_existing: false@ to always create a new container.|
|group_class|string|Type of group. This does not affect behavior, but determines how the group is presented in the user interface. For example, @project@ indicates that the group should be displayed by Workbench and arv-mount as a project for organizing and naming objects.|@"project"@
null|
|description|text|||
+|properties|hash|User-defined metadata; may be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters ||
|writable_by|array|List of UUID strings identifying Users and other Groups that have write permission for this Group. Only users who are allowed to administer the Group will receive a full list. Other users will receive a partial list that includes the Group's owner_uuid and (if applicable) their own user UUID.||
|trash_at|datetime|If @trash_at@ is non-null and in the past, this group and all objects directly or indirectly owned by the group will be hidden from API calls. May be untrashed.||
|delete_at|datetime|If @delete_at@ is non-null and in the past, the group and all objects directly or indirectly owned by the group may be permanently deleted.||
|tail_uuid|string|The origin or actor in the description or action (may be null).|
|link_class|string|Type of link|
|name|string|Primary value of the link.|
-|properties|hash|Additional information, expressed as a key→value hash. Key: string. Value: string, number, array, or hash.|
+|properties|hash|Additional information, expressed as a key→value hash. Key: string. Value: string, number, array, or hash. May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters|
h2. Link classes
|job_uuid|string|The UUID of the job that this node is assigned to work on. If you do not have permission to read the job, this will be null.||
|first_ping_at|datetime|||
|last_ping_at|datetime|||
-|info|hash|||
+|info|hash|Sensitive information about the node (only visible to admin) such as 'ping_secret' and 'ec2_instance_id'. May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters||
+|properties|hash|Public information about the node, such as 'total_cpu_cores', 'total_ram_mb', and 'total_scratch_mb'. May be used in queries using "subproperty filters":{{site.baseurl}}/api/methods.html#subpropertyfilters||
h2. Methods
+/* Copyright (C) The Arvados Authors. All rights reserved.
+
+SPDX-License-Identifier: CC-BY-SA-3.0 */
+
img.full-width {
width: 100%
}
"errors"
"log"
"os/exec"
+ "sort"
"strings"
"time"
)
var (
- ErrConstraintsNotSatisfiable = errors.New("constraints not satisfiable by any configured instance type")
ErrInstanceTypesNotConfigured = errors.New("site configuration does not list any instance types")
discountConfiguredRAMPercent = 5
)
+// ConstraintsNotSatisfiableError includes a list of available instance types
+// to be reported back to the user.
+type ConstraintsNotSatisfiableError struct {
+ error
+ AvailableTypes []arvados.InstanceType
+}
+
// ChooseInstanceType returns the cheapest available
// arvados.InstanceType big enough to run ctr.
func ChooseInstanceType(cc *arvados.Cluster, ctr *arvados.Container) (best arvados.InstanceType, err error) {
needRAM := ctr.RuntimeConstraints.RAM + ctr.RuntimeConstraints.KeepCacheRAM
needRAM = (needRAM * 100) / int64(100-discountConfiguredRAMPercent)
- err = ErrConstraintsNotSatisfiable
+ availableTypes := make([]arvados.InstanceType, len(cc.InstanceTypes))
+ copy(availableTypes, cc.InstanceTypes)
+ sort.Slice(availableTypes, func(a, b int) bool {
+ return availableTypes[a].Price < availableTypes[b].Price
+ })
+ err = ConstraintsNotSatisfiableError{
+ errors.New("constraints not satisfiable by any configured instance type"),
+ availableTypes,
+ }
for _, it := range cc.InstanceTypes {
switch {
case err == nil && it.Price > best.Price:
case it.Scratch < needScratch:
case it.RAM < needRAM:
case it.VCPUs < needVCPUs:
+ case it.Preemptable != ctr.SchedulingParameters.Preemptable:
case it.Price == best.Price && (it.RAM < best.RAM || it.VCPUs < best.VCPUs):
// Equal price, but worse specs
default:
{Price: 2.2, RAM: 2000000000, VCPUs: 4, Name: "small2"},
{Price: 4.4, RAM: 4000000000, VCPUs: 8, Name: "small4", Scratch: GiB},
}}, ctr)
- c.Check(err, check.Equals, ErrConstraintsNotSatisfiable)
+ c.Check(err, check.FitsTypeOf, ConstraintsNotSatisfiableError{})
}
for _, rc := range []arvados.RuntimeConstraints{
c.Check(best.Scratch >= 2*GiB, check.Equals, true)
}
}
+
+func (*NodeSizeSuite) TestChoosePreemptable(c *check.C) {
+ menu := []arvados.InstanceType{
+ {Price: 4.4, RAM: 4000000000, VCPUs: 8, Scratch: 2 * GiB, Preemptable: true, Name: "costly"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Name: "almost best"},
+ {Price: 2.2, RAM: 2000000000, VCPUs: 4, Scratch: 2 * GiB, Preemptable: true, Name: "best"},
+ {Price: 1.1, RAM: 1000000000, VCPUs: 2, Scratch: 2 * GiB, Preemptable: true, Name: "small"},
+ }
+ best, err := ChooseInstanceType(&arvados.Cluster{InstanceTypes: menu}, &arvados.Container{
+ Mounts: map[string]arvados.Mount{
+ "/tmp": {Kind: "tmp", Capacity: 2 * GiB},
+ },
+ RuntimeConstraints: arvados.RuntimeConstraints{
+ VCPUs: 2,
+ RAM: 987654321,
+ KeepCacheRAM: 123456789,
+ },
+ SchedulingParameters: arvados.SchedulingParameters{
+ Preemptable: true,
+ },
+ })
+ c.Check(err, check.IsNil)
+ c.Check(best.Name, check.Equals, "best")
+ c.Check(best.RAM >= 1234567890, check.Equals, true)
+ c.Check(best.VCPUs >= 2, check.Equals, true)
+ c.Check(best.Scratch >= 2*GiB, check.Equals, true)
+ c.Check(best.Preemptable, check.Equals, true)
+}
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
#' users.get
#'
#' users.get is a method defined in Arvados class.
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("./R/util.R")
#' ArvadosFile
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("./R/Subcollection.R")
source("./R/ArvadosFile.R")
source("./R/RESTService.R")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("./R/Subcollection.R")
source("./R/ArvadosFile.R")
source("./R/util.R")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
HttpParser <- R6::R6Class(
"HttrParser",
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("./R/util.R")
HttpRequest <- R6::R6Class(
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
RESTService <- R6::R6Class(
"RESTService",
headers <- list(Authorization = paste("OAuth2", self$token))
- serverResponse <- self$http$execute("GET", discoveryDocumentURL, headers,
- retryTimes = self$numRetries)
+ serverResponse <- self$http$exec("GET", discoveryDocumentURL, headers,
+ retryTimes = self$numRetries)
discoveryDocument <- self$httpParser$parseJSONResponse(serverResponse)
private$webDavHostName <- discoveryDocument$keepWebServiceUrl
uuid, "/", relativePath);
headers <- list(Authorization = paste("OAuth2", self$token))
- serverResponse <- self$http$execute("DELETE", fileURL, headers,
- retryTimes = self$numRetries)
+ serverResponse <- self$http$exec("DELETE", fileURL, headers,
+ retryTimes = self$numRetries)
if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
stop(paste("Server code:", serverResponse$status_code))
headers <- list("Authorization" = paste("OAuth2", self$token),
"Destination" = toURL)
- serverResponse <- self$http$execute("MOVE", fromURL, headers,
- retryTimes = self$numRetries)
+ serverResponse <- self$http$exec("MOVE", fromURL, headers,
+ retryTimes = self$numRetries)
if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
stop(paste("Server code:", serverResponse$status_code))
headers <- list("Authorization" = paste("OAuth2", self$token))
- response <- self$http$execute("PROPFIND", collectionURL, headers,
- retryTimes = self$numRetries)
+ response <- self$http$exec("PROPFIND", collectionURL, headers,
+ retryTimes = self$numRetries)
if(all(response == ""))
stop("Response is empty, request may be misconfigured")
headers <- list("Authorization" = paste("OAuth2", self$token))
- response <- self$http$execute("PROPFIND", subcollectionURL, headers,
- retryTimes = self$numRetries)
+ response <- self$http$exec("PROPFIND", subcollectionURL, headers,
+ retryTimes = self$numRetries)
if(all(response == ""))
stop("Response is empty, request may be misconfigured")
if(!(contentType %in% self$httpParser$validContentTypes))
stop("Invalid contentType. Please use text or raw.")
- serverResponse <- self$http$execute("GET", fileURL, headers,
- retryTimes = self$numRetries)
+ serverResponse <- self$http$exec("GET", fileURL, headers,
+ retryTimes = self$numRetries)
if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
stop(paste("Server code:", serverResponse$status_code))
"Content-Type" = contentType)
body <- content
- serverResponse <- self$http$execute("PUT", fileURL, headers, body,
- retryTimes = self$numRetries)
+ serverResponse <- self$http$exec("PUT", fileURL, headers, body,
+ retryTimes = self$numRetries)
if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
stop(paste("Server code:", serverResponse$status_code))
"Content-Type" = contentType)
body <- NULL
- serverResponse <- self$http$execute("PUT", fileURL, headers, body,
- retryTimes = self$numRetries)
+ serverResponse <- self$http$exec("PUT", fileURL, headers, body,
+ retryTimes = self$numRetries)
if(serverResponse$status_code < 200 || serverResponse$status_code >= 300)
stop(paste("Server code:", serverResponse$status_code))
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("./R/util.R")
#' Subcollection
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
getAPIDocument <- function(){
url <- "https://4xphq.arvadosapi.com/discovery/v1/apis/arvados/v1/rest"
serverResponse <- httr::RETRY("GET", url = url)
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
#' listAll
#'
#' List all resources even if the number of items is greater than maximum API limit.
+[comment]: # (Copyright (c) The Arvados Authors. All rights reserved.)
+[comment]: # ()
+[comment]: # (SPDX-License-Identifier: CC-BY-SA-3.0)
+
## R SDK for Arvados
This SDK focuses on providing support for accessing Arvados projects, collections, and the files within collections.
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
#Run script with $Rscript createDoc.R input.Rmd output.html
require(knitr) # required for knitting from rmd to md
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
options(repos=structure(c(CRAN="http://cran.wustl.edu/")))
if (!requireNamespace("devtools")) {
install.packages("devtools")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
results <- devtools::test()
any_error <- any(as.data.frame(results)$error)
if (any_error) {
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
library(testthat)
library(ArvadosR)
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
FakeArvados <- R6::R6Class(
"FakeArvados",
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
FakeHttpParser <- R6::R6Class(
"FakeHttrParser",
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
FakeHttpRequest <- R6::R6Class(
"FakeHttpRequest",
self$serverMaxElementsPerRequest <- 5
},
- execute = function(verb, url, headers = NULL, body = NULL, query = NULL,
- limit = NULL, offset = NULL, retryTimes = 0)
+ exec = function(verb, url, headers = NULL, body = NULL, query = NULL,
+ limit = NULL, offset = NULL, retryTimes = 0)
{
private$validateURL(url)
private$validateHeaders(headers)
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
FakeRESTService <- R6::R6Class(
"FakeRESTService",
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("fakes/FakeRESTService.R")
context("ArvadosFile")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("fakes/FakeRESTService.R")
context("Collection")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
context("CollectionTree")
test_that("constructor creates file tree from character array properly", {
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
context("Http Parser")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
context("Http Request")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("fakes/FakeArvados.R")
source("fakes/FakeHttpRequest.R")
source("fakes/FakeHttpParser.R")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
source("fakes/FakeRESTService.R")
context("Subcollection")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
context("Utility function")
test_that("listAll always returns all resource items from server", {
from .arvcontainer import ArvadosContainer, RunnerContainer
from .arvjob import ArvadosJob, RunnerJob, RunnerTemplate
-from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps, upload_dependencies
+from. runner import Runner, upload_docker, upload_job_order, upload_workflow_deps
from .arvtool import ArvadosCommandTool
from .arvworkflow import ArvadosWorkflow, upload_workflow
from .fsaccess import CollectionFsAccess, CollectionFetcher, collectionResolver, CollectionCache
self.workflow_eval_lock = threading.Condition(threading.RLock())
self.final_output = None
self.final_status = None
- self.uploaded = {}
self.num_retries = num_retries
self.uuid = None
self.stop_polling = threading.Event()
finally:
self.stop_polling.set()
- def get_uploaded(self):
- return self.uploaded.copy()
-
- def add_uploaded(self, src, pair):
- self.uploaded[src] = pair
-
def add_intermediate_output(self, uuid):
if uuid:
self.intermediate_output_collections.append(uuid)
if self.intermediate_output_ttl < 0:
raise Exception("Invalid value %d for --intermediate-output-ttl, cannot be less than zero" % self.intermediate_output_ttl)
+ if kwargs.get("submit_request_uuid") and self.work_api != "containers":
+ raise Exception("--submit-request-uuid requires containers API, but using '{}' api".format(self.work_api))
+
if not kwargs.get("name"):
kwargs["name"] = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
help="Docker image for workflow runner job, default arvados/jobs:%s" % __version__,
default=None)
+ parser.add_argument("--submit-request-uuid", type=str,
+ default=None,
+ help="Update and commit supplied container request instead of creating a new one (containers API only).")
+
parser.add_argument("--name", type=str,
help="Name to use for workflow execution instance.",
default=None)
container_request = {
"command": self.command_line,
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": self.outdir,
"cwd": self.outdir,
}
runtime_constraints = {}
+ if self.arvrunner.project_uuid:
+ container_request["owner_uuid"] = self.arvrunner.project_uuid
+
if self.arvrunner.secret_store.has_secret(self.command_line):
raise WorkflowException("Secret material leaked on command line, only file literals may contain secrets")
self.output_callback = self.arvrunner.get_wrapped_callback(self.output_callback)
try:
- response = self.arvrunner.api.container_requests().create(
- body=container_request
- ).execute(num_retries=self.arvrunner.num_retries)
+ if kwargs.get("submit_request_uuid"):
+ response = self.arvrunner.api.container_requests().update(
+ uuid=kwargs["submit_request_uuid"],
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=container_request
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
self.job_order[param] = {"$include": mnt}
container_req = {
- "owner_uuid": self.arvrunner.project_uuid,
"name": self.name,
"output_path": "/var/spool/cwl",
"cwd": "/var/spool/cwl",
def run(self, **kwargs):
kwargs["keepprefix"] = "keep:"
job_spec = self.arvados_job_spec(**kwargs)
- job_spec.setdefault("owner_uuid", self.arvrunner.project_uuid)
+ if self.arvrunner.project_uuid:
+ job_spec["owner_uuid"] = self.arvrunner.project_uuid
- response = self.arvrunner.api.container_requests().create(
- body=job_spec
- ).execute(num_retries=self.arvrunner.num_retries)
+ if kwargs.get("submit_request_uuid"):
+ response = self.arvrunner.api.container_requests().update(
+ uuid=kwargs["submit_request_uuid"],
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
+ else:
+ response = self.arvrunner.api.container_requests().create(
+ body=job_spec
+ ).execute(num_retries=self.arvrunner.num_retries)
self.uuid = response["uuid"]
self.arvrunner.process_submitted(self)
# Workaround for #13365
builderargs = kwargs.copy()
builderargs["toplevel"] = True
+ builderargs["tmp_outdir_prefix"] = ""
builder = self._init_job(joborder, **builderargs)
joborder = builder.job
})
kwargs["loader"] = self.doc_loader
kwargs["avsc_names"] = self.doc_schema
+ kwargs["metadata"] = self.metadata
return ArvadosCommandTool(self.arvrunner, wf_runner, **kwargs).job(joborder_resolved, output_callback, **kwargs)
else:
return super(ArvadosWorkflow, self).job(joborder, output_callback, **kwargs)
args.priority = arvados_cwl.DEFAULT_PRIORITY
args.do_validate = True
args.disable_js_validation = False
+ args.tmp_outdir_prefix = "tmp"
runner.arv_executor(t, job_order_object, **vars(args))
except Exception as e:
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import requests
+import email.utils
+import time
+import datetime
+import re
+import arvados
+import arvados.collection
+import urlparse
+import logging
+import calendar
+
+logger = logging.getLogger('arvados.cwl-runner')
+
+def my_formatdate(dt):
+ return email.utils.formatdate(timeval=calendar.timegm(dt.timetuple()),
+ localtime=False, usegmt=True)
+
+def my_parsedate(text):
+ parsed = email.utils.parsedate_tz(text)
+ if parsed:
+ if parsed[9]:
+ # Adjust to UTC
+ return datetime.datetime(*parsed[:6]) + datetime.timedelta(seconds=parsed[9])
+ else:
+ # TZ is zero or missing, assume UTC.
+ return datetime.datetime(*parsed[:6])
+ else:
+ return datetime.datetime(1970, 1, 1)
+
+def fresh_cache(url, properties, now):
+ pr = properties[url]
+ expires = None
+
+ logger.debug("Checking cache freshness for %s using %s", url, pr)
+
+ if "Cache-Control" in pr:
+ if re.match(r"immutable", pr["Cache-Control"]):
+ return True
+
+ g = re.match(r"(s-maxage|max-age)=(\d+)", pr["Cache-Control"])
+ if g:
+ expires = my_parsedate(pr["Date"]) + datetime.timedelta(seconds=int(g.group(2)))
+
+ if expires is None and "Expires" in pr:
+ expires = my_parsedate(pr["Expires"])
+
+ if expires is None:
+ # Use a default cache time of 24 hours if upstream didn't set
+ # any cache headers, to reduce redundant downloads.
+ expires = my_parsedate(pr["Date"]) + datetime.timedelta(hours=24)
+
+ if not expires:
+ return False
+
+ return (now < expires)
+
+def remember_headers(url, properties, headers, now):
+ properties.setdefault(url, {})
+ for h in ("Cache-Control", "ETag", "Expires", "Date", "Content-Length"):
+ if h in headers:
+ properties[url][h] = headers[h]
+ if "Date" not in headers:
+ properties[url]["Date"] = my_formatdate(now)
+
+
+def changed(url, properties, now):
+ req = requests.head(url, allow_redirects=True)
+ remember_headers(url, properties, req.headers, now)
+
+ if req.status_code != 200:
+ raise Exception("Got status %s" % req.status_code)
+
+ pr = properties[url]
+ if "ETag" in pr and "ETag" in req.headers:
+ if pr["ETag"] == req.headers["ETag"]:
+ return False
+
+ return True
+
+def http_to_keep(api, project_uuid, url, utcnow=datetime.datetime.utcnow):
+ r = api.collections().list(filters=[["properties", "exists", url]]).execute()
+
+ now = utcnow()
+
+ for item in r["items"]:
+ properties = item["properties"]
+ if fresh_cache(url, properties, now):
+ # Do nothing
+ cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+ return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+
+ if not changed(url, properties, now):
+ # ETag didn't change, same content, just update headers
+ api.collections().update(uuid=item["uuid"], body={"collection":{"properties": properties}}).execute()
+ cr = arvados.collection.CollectionReader(item["portable_data_hash"], api_client=api)
+ return "keep:%s/%s" % (item["portable_data_hash"], cr.keys()[0])
+
+ properties = {}
+ req = requests.get(url, stream=True, allow_redirects=True)
+
+ if req.status_code != 200:
+ raise Exception("Failed to download '%s' got status %s " % (url, req.status_code))
+
+ remember_headers(url, properties, req.headers, now)
+
+ if "Content-Length" in properties[url]:
+ cl = int(properties[url]["Content-Length"])
+ logger.info("Downloading %s (%s bytes)", url, cl)
+ else:
+ cl = None
+ logger.info("Downloading %s (unknown size)", url)
+
+ c = arvados.collection.Collection()
+
+ if req.headers.get("Content-Disposition"):
+ grp = re.search(r'filename=("((\"|[^"])+)"|([^][()<>@,;:\"/?={} ]+))', req.headers["Content-Disposition"])
+ if grp.group(2):
+ name = grp.group(2)
+ else:
+ name = grp.group(4)
+ else:
+ name = urlparse.urlparse(url).path.split("/")[-1]
+
+ count = 0
+ start = time.time()
+ checkpoint = start
+ with c.open(name, "w") as f:
+ for chunk in req.iter_content(chunk_size=1024):
+ count += len(chunk)
+ f.write(chunk)
+ loopnow = time.time()
+ if (loopnow - checkpoint) > 20:
+ bps = (float(count)/float(loopnow - start))
+ if cl is not None:
+ logger.info("%2.1f%% complete, %3.2f MiB/s, %1.0f seconds left",
+ float(count * 100) / float(cl),
+ bps/(1024*1024),
+ (cl-count)/bps)
+ else:
+ logger.info("%d downloaded, %3.2f MiB/s", count, bps/(1024*1024))
+ checkpoint = loopnow
+
+ c.save_new(name="Downloaded from %s" % url, owner_uuid=project_uuid, ensure_unique_name=True)
+
+ api.collections().update(uuid=c.manifest_locator(), body={"collection":{"properties": properties}}).execute()
+
+ return "keep:%s/%s" % (c.portable_data_hash(), name)
from cwltool.pathmapper import PathMapper, MapperEnt, abspath, adjustFileObjs, adjustDirObjs
from cwltool.workflow import WorkflowException
+from .http import http_to_keep
+
logger = logging.getLogger('arvados.cwl-runner')
def trim_listing(obj):
raise WorkflowException("File literal '%s' is missing `contents`" % src)
if srcobj["class"] == "Directory" and "listing" not in srcobj:
raise WorkflowException("Directory literal '%s' is missing `listing`" % src)
+ elif src.startswith("http:") or src.startswith("https:"):
+ keepref = http_to_keep(self.arvrunner.api, self.arvrunner.project_uuid, src)
+ logger.info("%s is %s", src, keepref)
+ self._pathmap[src] = MapperEnt(keepref, keepref, srcobj["class"], True)
else:
self._pathmap[src] = MapperEnt(src, src, srcobj["class"], True)
keep_client=self.arvrunner.keep_client,
num_retries=self.arvrunner.num_retries)
- already_uploaded = self.arvrunner.get_uploaded()
- copied_files = set()
- for k in referenced_files:
- loc = k["location"]
- if loc in already_uploaded:
- v = already_uploaded[loc]
- self._pathmap[loc] = MapperEnt(v.resolved, self.collection_pattern % urllib.unquote(v.resolved[5:]), v.type, True)
- if self.single_collection:
- basename = k["basename"]
- if basename not in collection:
- self.addentry({"location": loc, "class": v.type, "basename": basename}, collection, ".", [])
- copied_files.add((loc, basename, v.type))
-
for srcobj in referenced_files:
self.visit(srcobj, uploadfiles)
fnPattern="keep:%s/%s",
name=self.name,
project=self.arvrunner.project_uuid,
- collection=collection)
+ collection=collection,
+ packed=False)
for src, ab, st in uploadfiles:
self._pathmap[src] = MapperEnt(urllib.quote(st.fn, "/:+@"), self.collection_pattern % st.fn[5:],
"Directory" if os.path.isdir(ab) else "File", True)
- self.arvrunner.add_uploaded(src, self._pathmap[src])
-
- for loc, basename, cls in copied_files:
- fn = "keep:%s/%s" % (collection.portable_data_hash(), basename)
- self._pathmap[loc] = MapperEnt(urllib.quote(fn, "/:+@"), self.collection_pattern % fn[5:], cls, True)
for srcobj in referenced_files:
remap = []
# that external references in $include and $mixin are captured.
scanobj = loadref("", workflowobj["id"])
- sc = scandeps(uri, scanobj,
+ sc_result = scandeps(uri, scanobj,
loadref_fields,
set(("$include", "$schemas", "location")),
loadref, urljoin=document_loader.fetcher.urljoin)
+ sc = []
+ def only_real(obj):
+ if obj.get("location", "").startswith("file:"):
+ sc.append(obj)
+
+ visit_class(sc_result, ("File", "Directory"), only_real)
+
normalizeFilesDirs(sc)
if include_primary and "id" in workflowobj:
else:
try:
save_version(setup_dir, module, git_latest_tag() + git_timestamp_tag())
- except subprocess.CalledProcessError:
+ except (subprocess.CalledProcessError, OSError):
pass
return read_version(setup_dir, module)
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20180508202931',
+ 'cwltool==1.0.20180524215209',
'schema-salad==2.7.20180501211602',
- 'typing==3.5.3.0',
+ 'typing >= 3.5.3',
'ruamel.yaml >=0.13.11, <0.15',
- 'arvados-python-client>=1.1.4.20180507184611',
+ 'arvados-python-client>=1.1.4.20180607143841',
'setuptools',
- 'ciso8601 >=1.0.6'
+ 'ciso8601 >=1.0.6, <2.0.0'
],
data_files=[
('share/doc/arvados-cwl-runner', ['LICENSE-2.0.txt', 'README.rst']),
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: ExpressionTool
requirements:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
dir:
class: Directory
location: samples
\ No newline at end of file
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
requirements:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
requirements:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
{
"cwlVersion": "v1.0",
"arguments": [
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
$namespaces:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
i:
class: File
location: keep:f225e6259bdd63bc7240599648dde9f1+97/hg19.fa
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
$namespaces:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
class: Workflow
cwlVersion: v1.0
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
class: CommandLineTool
cwlVersion: v1.0
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
toplevel_input:
class: File
location: keep:4d8a70b1e63b2aad6984e40e338e2373+69/hello.txt
\ No newline at end of file
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
pw: blorp
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_"+str(enable_reuse),
make_fs_access=make_fs_access, tmpdir="/tmp"):
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}))
+ loader=Loader({}), metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_resource_requirements",
make_fs_access=make_fs_access, tmpdir="/tmp"):
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers",
avsc_names=avsc_names, make_fs_access=make_fs_access,
- loader=Loader({}))
+ loader=Loader({}), metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_initial_work_dir",
make_fs_access=make_fs_access, tmpdir="/tmp"):
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", name="test_run_redirect",
make_fs_access=make_fs_access, tmpdir="/tmp"):
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
job_order = {
"p1": {
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="containers", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
job_order = {"pw": "blorp"}
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+import copy
+import cStringIO
+import functools
+import hashlib
+import json
+import logging
+import mock
+import sys
+import unittest
+import datetime
+
+import arvados
+import arvados.collection
+import arvados_cwl
+import arvados_cwl.runner
+import arvados.keep
+
+from .matcher import JsonDiffMatcher, StripYAMLComments
+from .mock_discovery import get_rootDesc
+
+import arvados_cwl.http
+
+import ruamel.yaml as yaml
+
+
+class TestHttpToKeep(unittest.TestCase):
+
+ @mock.patch("requests.get")
+ @mock.patch("arvados.collection.Collection")
+ def test_http_get(self, collectionmock, getmock):
+ api = mock.MagicMock()
+
+ api.collections().list().execute.return_value = {
+ "items": []
+ }
+
+ cm = mock.MagicMock()
+ cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+ cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+ collectionmock.return_value = cm
+
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.headers = {}
+ req.iter_content.return_value = ["abc"]
+ getmock.return_value = req
+
+ utcnow = mock.MagicMock()
+ utcnow.return_value = datetime.datetime(2018, 5, 15)
+
+ r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+ getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+
+ cm.open.assert_called_with("file1.txt", "w")
+ cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
+ owner_uuid=None, ensure_unique_name=True)
+
+ api.collections().update.assert_has_calls([
+ mock.call(uuid=cm.manifest_locator(),
+ body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
+ ])
+
+
+ @mock.patch("requests.get")
+ @mock.patch("arvados.collection.CollectionReader")
+ def test_http_expires(self, collectionmock, getmock):
+ api = mock.MagicMock()
+
+ api.collections().list().execute.return_value = {
+ "items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+ "portable_data_hash": "99999999999999999999999999999998+99",
+ "properties": {
+ 'http://example.com/file1.txt': {
+ 'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+ 'Expires': 'Tue, 17 May 2018 00:00:00 GMT'
+ }
+ }
+ }]
+ }
+
+ cm = mock.MagicMock()
+ cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+ cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+ cm.keys.return_value = ["file1.txt"]
+ collectionmock.return_value = cm
+
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.headers = {}
+ req.iter_content.return_value = ["abc"]
+ getmock.return_value = req
+
+ utcnow = mock.MagicMock()
+ utcnow.return_value = datetime.datetime(2018, 5, 16)
+
+ r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+ getmock.assert_not_called()
+
+
+ @mock.patch("requests.get")
+ @mock.patch("arvados.collection.CollectionReader")
+ def test_http_cache_control(self, collectionmock, getmock):
+ api = mock.MagicMock()
+
+ api.collections().list().execute.return_value = {
+ "items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+ "portable_data_hash": "99999999999999999999999999999998+99",
+ "properties": {
+ 'http://example.com/file1.txt': {
+ 'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+ 'Cache-Control': 'max-age=172800'
+ }
+ }
+ }]
+ }
+
+ cm = mock.MagicMock()
+ cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+ cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+ cm.keys.return_value = ["file1.txt"]
+ collectionmock.return_value = cm
+
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.headers = {}
+ req.iter_content.return_value = ["abc"]
+ getmock.return_value = req
+
+ utcnow = mock.MagicMock()
+ utcnow.return_value = datetime.datetime(2018, 5, 16)
+
+ r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+ getmock.assert_not_called()
+
+
+ @mock.patch("requests.get")
+ @mock.patch("requests.head")
+ @mock.patch("arvados.collection.Collection")
+ def test_http_expired(self, collectionmock, headmock, getmock):
+ api = mock.MagicMock()
+
+ api.collections().list().execute.return_value = {
+ "items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+ "portable_data_hash": "99999999999999999999999999999998+99",
+ "properties": {
+ 'http://example.com/file1.txt': {
+ 'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+ 'Expires': 'Tue, 16 May 2018 00:00:00 GMT'
+ }
+ }
+ }]
+ }
+
+ cm = mock.MagicMock()
+ cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz4"
+ cm.portable_data_hash.return_value = "99999999999999999999999999999997+99"
+ cm.keys.return_value = ["file1.txt"]
+ collectionmock.return_value = cm
+
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.headers = {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}
+ req.iter_content.return_value = ["def"]
+ getmock.return_value = req
+ headmock.return_value = req
+
+ utcnow = mock.MagicMock()
+ utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+ r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, "keep:99999999999999999999999999999997+99/file1.txt")
+
+ getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
+
+ cm.open.assert_called_with("file1.txt", "w")
+ cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
+ owner_uuid=None, ensure_unique_name=True)
+
+ api.collections().update.assert_has_calls([
+ mock.call(uuid=cm.manifest_locator(),
+ body={"collection":{"properties": {'http://example.com/file1.txt': {'Date': 'Tue, 17 May 2018 00:00:00 GMT'}}}})
+ ])
+
+
+ @mock.patch("requests.get")
+ @mock.patch("requests.head")
+ @mock.patch("arvados.collection.CollectionReader")
+ def test_http_etag(self, collectionmock, headmock, getmock):
+ api = mock.MagicMock()
+
+ api.collections().list().execute.return_value = {
+ "items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzz3",
+ "portable_data_hash": "99999999999999999999999999999998+99",
+ "properties": {
+ 'http://example.com/file1.txt': {
+ 'Date': 'Tue, 15 May 2018 00:00:00 GMT',
+ 'Expires': 'Tue, 16 May 2018 00:00:00 GMT',
+ 'ETag': '123456'
+ }
+ }
+ }]
+ }
+
+ cm = mock.MagicMock()
+ cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+ cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+ cm.keys.return_value = ["file1.txt"]
+ collectionmock.return_value = cm
+
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.headers = {
+ 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+ 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+ 'ETag': '123456'
+ }
+ headmock.return_value = req
+
+ utcnow = mock.MagicMock()
+ utcnow.return_value = datetime.datetime(2018, 5, 17)
+
+ r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+ getmock.assert_not_called()
+ cm.open.assert_not_called()
+
+ api.collections().update.assert_has_calls([
+ mock.call(uuid=cm.manifest_locator(),
+ body={"collection":{"properties": {'http://example.com/file1.txt': {
+ 'Date': 'Tue, 17 May 2018 00:00:00 GMT',
+ 'Expires': 'Tue, 19 May 2018 00:00:00 GMT',
+ 'ETag': '123456'
+ }}}})
+ ])
+
+ @mock.patch("requests.get")
+ @mock.patch("arvados.collection.Collection")
+ def test_http_content_disp(self, collectionmock, getmock):
+ api = mock.MagicMock()
+
+ api.collections().list().execute.return_value = {
+ "items": []
+ }
+
+ cm = mock.MagicMock()
+ cm.manifest_locator.return_value = "zzzzz-4zz18-zzzzzzzzzzzzzz3"
+ cm.portable_data_hash.return_value = "99999999999999999999999999999998+99"
+ collectionmock.return_value = cm
+
+ req = mock.MagicMock()
+ req.status_code = 200
+ req.headers = {"Content-Disposition": "attachment; filename=file1.txt"}
+ req.iter_content.return_value = ["abc"]
+ getmock.return_value = req
+
+ utcnow = mock.MagicMock()
+ utcnow.return_value = datetime.datetime(2018, 5, 15)
+
+ r = arvados_cwl.http.http_to_keep(api, None, "http://example.com/download?fn=/file1.txt", utcnow=utcnow)
+ self.assertEqual(r, "keep:99999999999999999999999999999998+99/file1.txt")
+
+ getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
+
+ cm.open.assert_called_with("file1.txt", "w")
+ cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt",
+ owner_uuid=None, ensure_unique_name=True)
+
+ api.collections().update.assert_has_calls([
+ mock.call(uuid=cm.manifest_locator(),
+ body={"collection":{"properties": {"http://example.com/download?fn=/file1.txt": {'Date': 'Tue, 15 May 2018 00:00:00 GMT'}}}})
+ ])
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- basedir="", make_fs_access=make_fs_access, loader=Loader({}))
+ basedir="", make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
j.run(enable_reuse=enable_reuse)
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
arvtool = arvados_cwl.ArvadosCommandTool(runner, tool, work_api="jobs", avsc_names=avsc_names,
- make_fs_access=make_fs_access, loader=Loader({}))
+ make_fs_access=make_fs_access, loader=Loader({}),
+ metadata={"cwlVersion": "v1.0"})
arvtool.formatgraph = None
for j in arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access):
j.run(enable_reuse=True)
basedir="", make_fs_access=make_fs_access, loader=document_loader,
makeTool=runner.arv_make_tool, metadata=metadata)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+ it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
it.next().run()
it.next().run()
basedir="", make_fs_access=make_fs_access, loader=document_loader,
makeTool=runner.arv_make_tool, metadata=metadata)
arvtool.formatgraph = None
- it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access)
+ it = arvtool.job({}, mock.MagicMock(), basedir="", make_fs_access=make_fs_access, tmp_outdir_prefix="")
it.next().run()
it.next().run()
from arvados_cwl.pathmapper import ArvPathMapper
-def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None, collection=None):
+def upload_mock(files, api, dry_run=False, num_retries=0, project=None, fnPattern="$(file %s/%s)", name=None, collection=None, packed=None):
pdh = "99999999999999999999999999999991+99"
for c in files:
c.keepref = "%s/%s" % (pdh, os.path.basename(c.fn))
self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999991+99/hw.py', target='/test/99999999999999999999999999999991+99/hw.py', type='File', staged=True)},
p._pathmap)
- @mock.patch("arvados.commands.run.uploadfiles")
- def test_prev_uploaded(self, upl):
- """Test pathmapper handling previously uploaded files."""
-
- arvrunner = arvados_cwl.ArvCwlRunner(self.api)
- arvrunner.add_uploaded('file:tests/hw.py', MapperEnt(resolved='keep:99999999999999999999999999999992+99/hw.py', target='', type='File', staged=True))
-
- upl.side_effect = upload_mock
-
- p = ArvPathMapper(arvrunner, [{
- "class": "File",
- "location": "file:tests/hw.py"
- }], "", "/test/%s", "/test/%s/%s")
-
- self.assertEqual({'file:tests/hw.py': MapperEnt(resolved='keep:99999999999999999999999999999992+99/hw.py', target='/test/99999999999999999999999999999992+99/hw.py', type='File', staged=True)},
- p._pathmap)
-
@mock.patch("arvados.commands.run.uploadfiles")
@mock.patch("arvados.commands.run.statfile")
def test_statfile(self, statfile, upl):
},
'secret_mounts': {},
'state': 'Committed',
- 'owner_uuid': None,
'command': ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
'--eval-timeout=20', '--thread-count=4',
'kind': 'json'
}
}, 'state': 'Committed',
- 'owner_uuid': None,
'output_path': '/var/spool/cwl',
'name': 'expect_arvworkflow.cwl#main',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
'kind': 'json'
}
}, 'state': 'Committed',
- 'owner_uuid': None,
'output_path': '/var/spool/cwl',
'name': 'a test workflow',
'container_image': 'arvados/jobs:'+arvados_cwl.__version__,
},
"name": "secret_wf.cwl",
"output_path": "/var/spool/cwl",
- "owner_uuid": None,
"priority": 500,
"properties": {},
"runtime_constraints": {
self.assertEqual(capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
+ @stubs
+ def test_submit_request_uuid(self, stubs):
+ stubs.expect_container_request_uuid = "zzzzz-xvhdp-yyyyyyyyyyyyyyy"
+
+ stubs.api.container_requests().update().execute.return_value = {
+ "uuid": stubs.expect_container_request_uuid,
+ "container_uuid": "zzzzz-dz642-zzzzzzzzzzzzzzz",
+ "state": "Queued"
+ }
+
+ capture_stdout = cStringIO.StringIO()
+ try:
+ exited = arvados_cwl.main(
+ ["--submit", "--no-wait", "--api=containers", "--debug", "--submit-request-uuid=zzzzz-xvhdp-yyyyyyyyyyyyyyy",
+ "tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
+ capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
+ self.assertEqual(exited, 0)
+ except:
+ logging.exception("")
+
+ stubs.api.container_requests().update.assert_called_with(
+ uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
+ self.assertEqual(capture_stdout.getvalue(),
+ stubs.expect_container_request_uuid + '\n')
+
class TestCreateTemplate(unittest.TestCase):
existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
inputs:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
inputs: []
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
import arvados
import sys
import os
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
requirements:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
$namespaces:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
requirements:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
requirements:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: CommandLineTool
$namespaces:
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
cwlVersion: v1.0
class: Workflow
$namespaces:
RAM int64
Scratch int64
Price float64
+ Preemptable bool
}
// GetThisSystemNode returns a SystemNode for the node we're running
// SchedulingParameters specify a container's scheduling parameters
// such as Partitions
type SchedulingParameters struct {
- Partitions []string `json:"partitions"`
+ Partitions []string `json:"partitions"`
+ Preemptable bool `json:"preemptable"`
}
// ContainerList is an arvados#containerList resource.
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package arvados
import (
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package health
import (
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package health
import (
Retries int
BlockCache *BlockCache
RequestID string
+ StorageClasses []string
// set to 1 if all writable services are of disk type, otherwise 0
replicasPerService int
}
type StubPutHandler struct {
- c *C
- expectPath string
- expectApiToken string
- expectBody string
- handled chan string
+ c *C
+ expectPath string
+ expectApiToken string
+ expectBody string
+ expectStorageClass string
+ handled chan string
}
func (sph StubPutHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
sph.c.Check(req.URL.Path, Equals, "/"+sph.expectPath)
sph.c.Check(req.Header.Get("Authorization"), Equals, fmt.Sprintf("OAuth2 %s", sph.expectApiToken))
+ sph.c.Check(req.Header.Get("X-Keep-Storage-Classes"), Equals, sph.expectStorageClass)
body, err := ioutil.ReadAll(req.Body)
sph.c.Check(err, Equals, nil)
sph.c.Check(body, DeepEquals, []byte(sph.expectBody))
"acbd18db4cc2f85cedef654fccc4a4d8",
"abc123",
"foo",
+ "hot",
make(chan string)}
UploadToStubHelper(c, st,
func(kc *KeepClient, url string, reader io.ReadCloser, writer io.WriteCloser, upload_status chan uploadStatus) {
-
+ kc.StorageClasses = []string{"hot"}
go kc.uploadToKeepServer(url, st.expectPath, reader, upload_status, int64(len("foo")), kc.getRequestID())
writer.Write([]byte("foo"))
"acbd18db4cc2f85cedef654fccc4a4d8",
"abc123",
"foo",
+ "",
make(chan string)}
UploadToStubHelper(c, st,
hash,
"abc123",
"foo",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
hash,
"abc123",
"foo",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
hash,
"abc123",
"foo",
+ "",
make(chan string, 4)}
fh := FailHandler{
hash,
"abc123",
"foo",
+ "",
make(chan string, 1)}
fh := FailHandler{
hash,
"abc123",
"foo",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
hash,
"abc123",
"foo",
+ "",
make(chan string, 5)}
arv, _ := arvadosclient.MakeArvadosClient()
Md5String("foo"),
"abc123",
"foo",
+ "",
make(chan string, 5)}}
arv, _ := arvadosclient.MakeArvadosClient()
req.Header.Add("Authorization", "OAuth2 "+this.Arvados.ApiToken)
req.Header.Add("Content-Type", "application/octet-stream")
req.Header.Add(X_Keep_Desired_Replicas, fmt.Sprint(this.Want_replicas))
+ if len(this.StorageClasses) > 0 {
+ req.Header.Add("X-Keep-Storage-Classes", strings.Join(this.StorageClasses, ", "))
+ }
var resp *http.Response
if resp, err = this.httpClient().Do(req); err != nil {
import re
import errno
import hashlib
+import datetime
+import ciso8601
import time
import threading
def root_collection(self):
return self
+ def get_properties(self):
+ if self._api_response and self._api_response["properties"]:
+ return self._api_response["properties"]
+ else:
+ return {}
+
+ def get_trash_at(self):
+ if self._api_response and self._api_response["trash_at"]:
+ return ciso8601.parse_datetime(self._api_response["trash_at"])
+ else:
+ return None
+
def stream_name(self):
return "."
@must_be_writable
@synchronized
@retry_method
- def save(self, merge=True, num_retries=None):
+ def save(self,
+ properties=None,
+ storage_classes=None,
+ trash_at=None,
+ merge=True,
+ num_retries=None):
"""Save collection to an existing collection record.
Commit pending buffer blocks to Keep, merge with remote record (if
- merge=True, the default), and update the collection record. Returns
+ merge=True, the default), and update the collection record. Returns
the current manifest text.
Will raise AssertionError if not associated with a collection record on
the API server. If you want to save a manifest to Keep only, see
`save_new()`.
+    :properties:
+      Additional properties of the collection. This value will replace any existing
+      properties of the collection.
+
+    :storage_classes:
+      Specify desired storage classes to be used when writing data to Keep.
+
+ :trash_at:
+ A collection is *expiring* when it has a *trash_at* time in the future.
+ An expiring collection can be accessed as normal,
+ but is scheduled to be trashed automatically at the *trash_at* time.
+
:merge:
Update and merge remote changes before saving. Otherwise, any
remote changes will be ignored and overwritten.
Retry count on API calls (if None, use the collection default)
"""
+ if properties and type(properties) is not dict:
+ raise errors.ArgumentError("properties must be dictionary type.")
+
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+
+ if trash_at and type(trash_at) is not datetime.datetime:
+ raise errors.ArgumentError("trash_at must be datetime type.")
+
+ body={}
+ if properties:
+ body["properties"] = properties
+ if storage_classes:
+ body["storage_classes_desired"] = storage_classes
+ if trash_at:
+ t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ body["trash_at"] = t
+
if not self.committed():
if not self._has_collection_uuid():
raise AssertionError("Collection manifest_locator is not a collection uuid. Use save_new() for new collections.")
self.update()
text = self.manifest_text(strip=False)
+ body['manifest_text'] = text
+
self._remember_api_response(self._my_api().collections().update(
uuid=self._manifest_locator,
- body={'manifest_text': text}
- ).execute(
- num_retries=num_retries))
+ body=body
+ ).execute(num_retries=num_retries))
self._manifest_text = self._api_response["manifest_text"]
self._portable_data_hash = self._api_response["portable_data_hash"]
self.set_committed(True)
+ elif body:
+ self._remember_api_response(self._my_api().collections().update(
+ uuid=self._manifest_locator,
+ body=body
+ ).execute(num_retries=num_retries))
return self._manifest_text
def save_new(self, name=None,
create_collection_record=True,
owner_uuid=None,
+ properties=None,
+ storage_classes=None,
+ trash_at=None,
ensure_unique_name=False,
num_retries=None):
"""Save collection to a new collection record.
Commit pending buffer blocks to Keep and, when create_collection_record
is True (default), create a new collection record. After creating a
new collection record, this Collection object will be associated with
- the new record used by `save()`. Returns the current manifest text.
+ the new record used by `save()`. Returns the current manifest text.
:name:
The collection name.
the user, or project uuid that will own this collection.
If None, defaults to the current user.
+ :properties:
+ Additional properties of the collection. This value will replace any existing
+ properties of the collection.
+
+ :storage_classes:
+ Specify desired storage classes to be used when writing data to Keep.
+
+ :trash_at:
+ A collection is *expiring* when it has a *trash_at* time in the future.
+ An expiring collection can be accessed as normal,
+ but is scheduled to be trashed automatically at the *trash_at* time.
+
:ensure_unique_name:
If True, ask the API server to rename the collection
if it conflicts with a collection with the same name and owner. If
Retry count on API calls (if None, use the collection default)
"""
+ if properties and type(properties) is not dict:
+ raise errors.ArgumentError("properties must be dictionary type.")
+
+ if storage_classes and type(storage_classes) is not list:
+ raise errors.ArgumentError("storage_classes must be list type.")
+
+ if trash_at and type(trash_at) is not datetime.datetime:
+ raise errors.ArgumentError("trash_at must be datetime type.")
+
self._my_block_manager().commit_all()
text = self.manifest_text(strip=False)
"replication_desired": self.replication_desired}
if owner_uuid:
body["owner_uuid"] = owner_uuid
+ if properties:
+ body["properties"] = properties
+ if storage_classes:
+ body["storage_classes_desired"] = storage_classes
+ if trash_at:
+ t = trash_at.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
+ body["trash_at"] = t
self._remember_api_response(self._my_api().collections().create(ensure_unique_name=ensure_unique_name, body=body).execute(num_retries=num_retries))
text = self._api_response["manifest_text"]
block. Default is to use the server-provided default (if any) or 2.
""")
+upload_opts.add_argument('--storage-classes', help="""
+Specify comma separated list of storage classes to be used when saving data to Keep.
+""")
+
upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
help="""
Set the number of upload threads to be used. Take into account that
def __init__(self, paths, resume=True, use_cache=True, reporter=None,
name=None, owner_uuid=None, api_client=None,
ensure_unique_name=False, num_retries=None,
- put_threads=None, replication_desired=None,
- filename=None, update_time=60.0, update_collection=None,
+ put_threads=None, replication_desired=None, filename=None,
+ update_time=60.0, update_collection=None, storage_classes=None,
logger=logging.getLogger('arvados.arv_put'), dry_run=False,
follow_links=True, exclude_paths=[], exclude_names=None):
self.paths = paths
self.replication_desired = replication_desired
self.put_threads = put_threads
self.filename = filename
+ self.storage_classes = storage_classes
self._api_client = api_client
self._state_lock = threading.Lock()
self._state = None # Previous run state (file list & manifest)
else:
# The file already exist on remote collection, skip it.
pass
- self._remote_collection.save(num_retries=self.num_retries)
+ self._remote_collection.save(storage_classes=self.storage_classes,
+ num_retries=self.num_retries)
else:
+ if self.storage_classes is None:
+ self.storage_classes = ['default']
self._local_collection.save_new(
name=self.name, owner_uuid=self.owner_uuid,
+ storage_classes=self.storage_classes,
ensure_unique_name=self.ensure_unique_name,
num_retries=self.num_retries)
else:
reporter = None
+ # Split storage-classes argument
+ storage_classes = None
+ if args.storage_classes:
+ storage_classes = args.storage_classes.strip().split(',')
+ if len(storage_classes) > 1:
+ logger.error("Multiple storage classes are not supported currently.")
+ sys.exit(1)
+
+
# Setup exclude regex from all the --exclude arguments provided
name_patterns = []
exclude_paths = []
owner_uuid = project_uuid,
ensure_unique_name = True,
update_collection = args.update_collection,
+ storage_classes=storage_classes,
logger=logger,
dry_run=args.dry_run,
follow_links=args.follow_links,
return prefix+fn
-def write_file(collection, pathprefix, fn):
+def write_file(collection, pathprefix, fn, flush=False):
with open(os.path.join(pathprefix, fn)) as src:
dst = collection.open(fn, "w")
r = src.read(1024*128)
while r:
dst.write(r)
r = src.read(1024*128)
- dst.close(flush=False)
+ dst.close(flush=flush)
def uploadfiles(files, api, dry_run=False, num_retries=0,
project=None,
fnPattern="$(file %s/%s)",
name=None,
- collection=None):
+ collection=None,
+ packed=True):
# Find the smallest path prefix that includes all the files that need to be uploaded.
# This starts at the root and iteratively removes common parent directory prefixes
# until all file paths no longer have a common parent.
continue
prev = localpath
if os.path.isfile(localpath):
- write_file(collection, pathprefix, f.fn)
+ write_file(collection, pathprefix, f.fn, not packed)
elif os.path.isdir(localpath):
for root, dirs, iterfiles in os.walk(localpath):
root = root[len(pathprefix):]
for src in iterfiles:
- write_file(collection, pathprefix, os.path.join(root, src))
+ write_file(collection, pathprefix, os.path.join(root, src), not packed)
filters=[["portable_data_hash", "=", collection.portable_data_hash()]]
if name:
('share/doc/arvados-python-client', ['LICENSE-2.0.txt', 'README.rst']),
],
install_requires=[
- 'ciso8601 >=1.0.6',
+ 'ciso8601 >=1.0.6, <2.0.0',
'future',
'google-api-python-client >=1.6.2, <1.7',
'httplib2 >=0.9.2',
self.call_main_with_args,
['--project-uuid', self.Z_UUID, '--stream'])
+ def test_error_when_multiple_storage_classes_specified(self):
+ self.assertRaises(SystemExit,
+ self.call_main_with_args,
+ ['--storage-classes', 'hot,cold'])
+
def test_error_when_excluding_absolute_path(self):
tmpdir = self.make_tmpdir()
self.assertRaises(SystemExit,
'--project-uuid', self.PROJECT_UUID])
self.assertEqual(link_name, collection['name'])
+ def test_put_collection_with_storage_classes_specified(self):
+ collection = self.run_and_find_collection("", ['--storage-classes', 'hot'])
+
+ self.assertEqual(len(collection['storage_classes_desired']), 1)
+ self.assertEqual(collection['storage_classes_desired'][0], 'hot')
+
+ def test_put_collection_without_storage_classes_specified(self):
+ collection = self.run_and_find_collection("")
+
+ self.assertEqual(len(collection['storage_classes_desired']), 1)
+ self.assertEqual(collection['storage_classes_desired'][0], 'default')
+
def test_exclude_filename_pattern(self):
tmpdir = self.make_tmpdir()
tmpsubdir = os.path.join(tmpdir, 'subdir')
import re
import sys
import tempfile
+import datetime
+import ciso8601
import time
import unittest
self.assertEqual(fn0, c.items()[0][0])
self.assertEqual(fn1, c.items()[1][0])
+ def test_get_properties(self):
+ c = Collection()
+ self.assertEqual(c.get_properties(), {})
+ c.save_new(properties={"foo":"bar"})
+ self.assertEqual(c.get_properties(), {"foo":"bar"})
+
+ def test_get_trash_at(self):
+ c = Collection()
+ self.assertEqual(c.get_trash_at(), None)
+ c.save_new(trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+ self.assertEqual(c.get_trash_at(), ciso8601.parse_datetime('2111-01-01T11:11:11.111111000Z'))
+
class CollectionOpenModes(run_test_server.TestCaseWithServers):
def test_create_and_save(self):
c = self.create_count_txt()
- c.save()
+ c.save(properties={'type' : 'Intermediate'},
+ storage_classes=['archive'],
+ trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+
self.assertRegex(
c.manifest_text(),
r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+ self.assertEqual(c.api_response()["storage_classes_desired"], ['archive'])
+ self.assertEqual(c.api_response()["properties"], {'type' : 'Intermediate'})
+ self.assertEqual(c.api_response()["trash_at"], '2111-01-01T11:11:11.111111000Z')
+
def test_create_and_save_new(self):
c = self.create_count_txt()
- c.save_new()
+ c.save_new(properties={'type' : 'Intermediate'},
+ storage_classes=['archive'],
+ trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+
self.assertRegex(
c.manifest_text(),
r"^\. 781e5e245d69b566979b86e28d23f2c7\+10\+A[a-f0-9]{40}@[a-f0-9]{8} 0:10:count\.txt$",)
+ self.assertEqual(c.api_response()["storage_classes_desired"], ['archive'])
+ self.assertEqual(c.api_response()["properties"], {'type' : 'Intermediate'})
+ self.assertEqual(c.api_response()["trash_at"], '2111-01-01T11:11:11.111111000Z')
+
+ def test_create_and_save_after_commiting(self):
+ c = self.create_count_txt()
+ c.save(properties={'type' : 'Intermediate'},
+ storage_classes=['hot'],
+ trash_at=datetime.datetime(2111, 1, 1, 11, 11, 11, 111111))
+ c.save(properties={'type' : 'Output'},
+ storage_classes=['cold'],
+ trash_at=datetime.datetime(2222, 2, 2, 22, 22, 22, 222222))
+
+ self.assertEqual(c.api_response()["storage_classes_desired"], ['cold'])
+ self.assertEqual(c.api_response()["properties"], {'type' : 'Output'})
+ self.assertEqual(c.api_response()["trash_at"], '2222-02-02T22:22:22.222222000Z')
def test_create_diff_apply(self):
c1 = self.create_count_txt()
end
def update
- # container updates can trigger container request lookups, which
- # can deadlock if we don't lock the container_requests table
- # first.
- @object.transaction do
- ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
+ @object.with_lock do
@object.reload
super
end
require 'log_reuse_info'
require 'whitelist_update'
require 'safe_json'
+require 'update_priority'
class Container < ArvadosModel
include ArvadosModelUpdates
before_save :scrub_secret_mounts
after_save :handle_completed
after_save :propagate_priority
+ after_commit { UpdatePriority.run_update_thread }
has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
# Update the priority of child container requests to match new
# priority of the parent container (ignoring requests with no
# container assigned, because their priority doesn't matter).
- ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
ContainerRequest.
where(requesting_container_uuid: self.uuid,
state: ContainerRequest::Committed).
# (because state might have changed while acquiring the lock).
check_lock_fail
transaction do
- # Locking involves assigning auth_uuid, which involves looking
- # up container requests, so we must lock both tables in the
- # proper order to avoid deadlock.
- ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
reload
check_lock_fail
update_attributes!(state: Locked)
if self.state_changed? and self.final?
act_as_system_user do
- ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
if self.state == Cancelled
retryable_requests = ContainerRequest.where("container_uuid = ? and priority > 0 and state = 'Committed' and container_count < container_count_max", uuid)
else
before_validation :fill_field_defaults, :if => :new_record?
before_validation :validate_runtime_constraints
- before_validation :validate_scheduling_parameters
before_validation :set_container
+ before_validation :set_default_preemptable_scheduling_parameter
validates :command, :container_image, :output_path, :cwd, :presence => true
validates :output_ttl, numericality: { only_integer: true, greater_than_or_equal_to: 0 }
validates :priority, numericality: { only_integer: true, greater_than_or_equal_to: 0, less_than_or_equal_to: 1000 }
+ validate :validate_scheduling_parameters
validate :validate_state_change
validate :check_update_whitelist
validate :secret_mounts_key_conflict
Committed => [Final]
}
- AttrsPermittedAlways = [:owner_uuid, :state, :name, :description]
+ AttrsPermittedAlways = [:owner_uuid, :state, :name, :description, :properties]
AttrsPermittedBeforeCommit = [:command, :container_count_max,
:container_image, :cwd, :environment, :filters, :mounts,
- :output_path, :priority, :properties,
+ :output_path, :priority,
:runtime_constraints, :state, :container_uuid, :use_existing,
:scheduling_parameters, :secret_mounts, :output_name, :output_ttl]
end
end
+ def set_default_preemptable_scheduling_parameter
+ if self.state == Committed
+ # If preemptable instances (eg: AWS Spot Instances) are allowed,
+ # request them for child containers by default.
+ if Rails.configuration.preemptable_instances and
+ !self.requesting_container_uuid.nil? and
+ self.scheduling_parameters['preemptable'].nil?
+ self.scheduling_parameters['preemptable'] = true
+ end
+ end
+ end
+
def validate_runtime_constraints
case self.state
when Committed
scheduling_parameters['partitions'].size)
errors.add :scheduling_parameters, "partitions must be an array of strings"
end
+ if !Rails.configuration.preemptable_instances and scheduling_parameters['preemptable']
+ errors.add :scheduling_parameters, "preemptable instances are not allowed"
+ end
end
end
def update_priority
return unless state_changed? || priority_changed? || container_uuid_changed?
act_as_system_user do
- ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
Container.
where('uuid in (?)', [self.container_uuid_was, self.container_uuid].compact).
map(&:update_priority!)
def set_requesting_container_uuid
return if !current_api_client_authorization
- ActiveRecord::Base.connection.execute('LOCK container_requests, containers IN EXCLUSIVE MODE')
if (c = Container.where('auth_uuid=?', current_api_client_authorization.uuid).select([:uuid, :priority]).first)
self.requesting_container_uuid = c.uuid
self.priority = c.priority>0 ? 1 : 0
include CanBeAnOwner
include Trashable
+ serialize :properties, Hash
+
after_create :invalidate_permissions_cache
after_update :maybe_invalidate_permissions_cache
before_create :assign_name
t.add :delete_at
t.add :trash_at
t.add :is_trashed
+ t.add :properties
end
def maybe_invalidate_permissions_cache
### Crunch, DNS & compute node management
###
+ # Preemptable instance support (e.g. AWS Spot Instances)
+ # When true, child containers will get created with the preemptable
+ # scheduling parameter set.
+ preemptable_instances: false
+
# Docker image to be used when none found in runtime_constraints of a job
default_docker_image_for_jobs: false
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
require 'migrate_yaml_to_json'
class YamlToJson < ActiveRecord::Migration
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
require './db/migrate/20161213172944_full_text_search_indexes'
class JsonCollectionProperties < ActiveRecord::Migration
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class AddIndexToContainers < ActiveRecord::Migration
def up
ActiveRecord::Base.connection.execute("CREATE INDEX index_containers_on_modified_at_uuid ON containers USING btree (modified_at desc, uuid asc)")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class FixTrashFlagFollow < ActiveRecord::Migration
def change
ActiveRecord::Base.connection.execute("DROP MATERIALIZED VIEW materialized_permission_view")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class AddGinIndexToCollectionProperties < ActiveRecord::Migration
def up
ActiveRecord::Base.connection.execute("CREATE INDEX collection_index_on_properties ON collections USING gin (properties);")
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class AddSecretMountsToContainers < ActiveRecord::Migration
def change
add_column :container_requests, :secret_mounts, :jsonb, default: {}
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class ChangeContainerPriorityBigint < ActiveRecord::Migration
def change
change_column :containers, :priority, :integer, limit: 8
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class AddRedirectToUserUuidToUsers < ActiveRecord::Migration
def up
add_column :users, :redirect_to_user_uuid, :string
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
class AddContainerAuthUuidIndex < ActiveRecord::Migration
def change
add_index :containers, :auth_uuid
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require './db/migrate/20161213172944_full_text_search_indexes'
+
+class PropertiesToJsonb < ActiveRecord::Migration
+
+ @@tables_columns = [["nodes", "properties"],
+ ["nodes", "info"],
+ ["container_requests", "properties"],
+ ["links", "properties"]]
+
+ def up
+ @@tables_columns.each do |table, column|
+ # Drop the FT index before changing column type to avoid
+ # "PG::DatatypeMismatch: ERROR: COALESCE types jsonb and text
+ # cannot be matched".
+ ActiveRecord::Base.connection.execute "DROP INDEX IF EXISTS #{table}_full_text_search_idx"
+ ActiveRecord::Base.connection.execute "ALTER TABLE #{table} ALTER COLUMN #{column} TYPE jsonb USING #{column}::jsonb"
+ ActiveRecord::Base.connection.execute "CREATE INDEX #{table}_index_on_#{column} ON #{table} USING gin (#{column})"
+ end
+ FullTextSearchIndexes.new.replace_index("container_requests")
+ end
+
+ def down
+ @@tables_columns.each do |table, column|
+ ActiveRecord::Base.connection.execute "DROP INDEX IF EXISTS #{table}_index_on_#{column}"
+ ActiveRecord::Base.connection.execute "ALTER TABLE #{table} ALTER COLUMN #{column} TYPE text"
+ end
+ end
+end
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require './db/migrate/20161213172944_full_text_search_indexes'
+
+class AddPropertiesToGroups < ActiveRecord::Migration
+ def up
+ add_column :groups, :properties, :jsonb, default: {}
+ ActiveRecord::Base.connection.execute("CREATE INDEX group_index_on_properties ON groups USING gin (properties);")
+ FullTextSearchIndexes.new.replace_index('groups')
+ end
+
+ def down
+ ActiveRecord::Base.connection.execute("DROP INDEX IF EXISTS group_index_on_properties")
+ remove_column :groups, :properties
+ end
+end
modified_by_user_uuid character varying(255),
name character varying(255),
description text,
- properties text,
+ properties jsonb,
state character varying(255),
requesting_container_uuid character varying(255),
container_uuid character varying(255),
group_class character varying(255),
trash_at timestamp without time zone,
is_trashed boolean DEFAULT false NOT NULL,
- delete_at timestamp without time zone
+ delete_at timestamp without time zone,
+ properties jsonb DEFAULT '{}'::jsonb
);
link_class character varying(255),
name character varying(255),
head_uuid character varying(255),
- properties text,
+ properties jsonb,
updated_at timestamp without time zone NOT NULL
);
ip_address character varying(255),
first_ping_at timestamp without time zone,
last_ping_at timestamp without time zone,
- info text,
+ info jsonb,
updated_at timestamp without time zone NOT NULL,
- properties text,
+ properties jsonb,
job_uuid character varying(255)
);
-- Name: container_requests_full_text_search_idx; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE(properties, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text)));
+CREATE INDEX container_requests_full_text_search_idx ON container_requests USING gin (to_tsvector('english'::regconfig, (((((((((((((((((((((((((((((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || COALESCE(description, ''::text)) || ' '::text) || COALESCE((properties)::text, ''::text)) || ' '::text) || (COALESCE(state, ''::character varying))::text) || ' '::text) || (COALESCE(requesting_container_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(container_uuid, ''::character varying))::text) || ' '::text) || COALESCE(runtime_constraints, ''::text)) || ' '::text) || (COALESCE(container_image, ''::character varying))::text) || ' '::text) || COALESCE(environment, ''::text)) || ' '::text) || (COALESCE(cwd, ''::character varying))::text) || ' '::text) || COALESCE(command, ''::text)) || ' '::text) || (COALESCE(output_path, ''::character varying))::text) || ' '::text) || COALESCE(filters, ''::text)) || ' '::text) || COALESCE(scheduling_parameters, ''::text)) || ' '::text) || (COALESCE(output_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(log_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(output_name, ''::character varying))::text)));
+
+
+--
+-- Name: container_requests_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX container_requests_index_on_properties ON container_requests USING gin (properties);
--
CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image, auth_uuid, locked_by_uuid);
+--
+-- Name: group_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX group_index_on_properties ON groups USING gin (properties);
+
+
--
-- Name: groups_full_text_search_idx; Type: INDEX; Schema: public; Owner: -
--
-CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text)));
+CREATE INDEX groups_full_text_search_idx ON groups USING gin (to_tsvector('english'::regconfig, (((((((((((((((COALESCE(uuid, ''::character varying))::text || ' '::text) || (COALESCE(owner_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_client_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(modified_by_user_uuid, ''::character varying))::text) || ' '::text) || (COALESCE(name, ''::character varying))::text) || ' '::text) || (COALESCE(description, ''::character varying))::text) || ' '::text) || (COALESCE(group_class, ''::character varying))::text) || ' '::text) || COALESCE((properties)::text, ''::text))));
--
CREATE INDEX keep_services_search_index ON keep_services USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, service_host, service_type);
+--
+-- Name: links_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX links_index_on_properties ON links USING gin (properties);
+
+
--
-- Name: links_search_index; Type: INDEX; Schema: public; Owner: -
--
CREATE INDEX logs_search_index ON logs USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, object_uuid, event_type, object_owner_uuid);
+--
+-- Name: nodes_index_on_info; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX nodes_index_on_info ON nodes USING gin (info);
+
+
+--
+-- Name: nodes_index_on_properties; Type: INDEX; Schema: public; Owner: -
+--
+
+CREATE INDEX nodes_index_on_properties ON nodes USING gin (properties);
+
+
--
-- Name: nodes_search_index; Type: INDEX; Schema: public; Owner: -
--
INSERT INTO schema_migrations (version) VALUES ('20180514135529');
+INSERT INTO schema_migrations (version) VALUES ('20180608123145');
+
+INSERT INTO schema_migrations (version) VALUES ('20180607175050');
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+module UpdatePriority
+ # Clean up after races: if container priority>0 but there are no
+ # committed container requests for it, reset priority to 0.
+ def self.update_priority
+ if !File.owned?(Rails.root.join('tmp'))
+ Rails.logger.warn("UpdatePriority: not owner of #{Rails.root}/tmp, skipping")
+ return
+ end
+ lockfile = Rails.root.join('tmp', 'update_priority.lock')
+ File.open(lockfile, File::RDWR|File::CREAT, 0600) do |f|
+ return unless f.flock(File::LOCK_NB|File::LOCK_EX)
+ ActiveRecord::Base.connection.execute("UPDATE containers AS c SET priority=0 WHERE state='Queued' AND priority>0 AND uuid NOT IN (SELECT container_uuid FROM container_requests WHERE priority>0);")
+ end
+ end
+
+ def self.run_update_thread
+ need = false
+ Rails.cache.fetch('UpdatePriority', expires_in: 5.seconds) do
+ need = true
+ end
+ return if !need
+
+ Thread.new do
+ Thread.current.abort_on_exception = false
+ begin
+ update_priority
+ rescue => e
+ Rails.logger.error "#{e.class}: #{e}\n#{e.backtrace.join("\n\t")}"
+ ensure
+ ActiveRecord::Base.connection.close
+ end
+ end
+ end
+end
properties: {'foo' => 'bar'}.with_indifferent_access)
raw = ActiveRecord::Base.connection.
select_value("select properties from links where uuid='#{link.uuid}'")
- assert_equal '{"foo":"bar"}', raw
+ assert_equal '{"foo": "bar"}', raw
end
test "store long string" do
assert_equal ContainerRequest::Final, cr3.state
end
+ [
+ [false, ActiveRecord::RecordInvalid],
+ [true, nil],
+ ].each do |preemptable_conf, expected|
+ test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, create preemptable container request and verify #{expected}" do
+ sp = {"preemptable" => true}
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: sp,
+ mounts: {"test" => {"kind" => "json"}}}
+ Rails.configuration.preemptable_instances = preemptable_conf
+ set_user_from_auth :active
+
+ cr = create_minimal_req!(common_attrs)
+ cr.state = ContainerRequest::Committed
+
+ if !expected.nil?
+ assert_raises(expected) do
+ cr.save!
+ end
+ else
+ cr.save!
+ assert_equal sp, cr.scheduling_parameters
+ end
+ end
+ end
+
+ [
+ 'zzzzz-dz642-runningcontainr',
+ nil,
+ ].each do |requesting_c|
+ test "having preemptable instances active on the API server, a committed #{requesting_c.nil? ? 'non-':''}child CR should not ask for preemptable instance if parameter already set to false" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ scheduling_parameters: {"preemptable" => false},
+ mounts: {"test" => {"kind" => "json"}}}
+
+ Rails.configuration.preemptable_instances = true
+ set_user_from_auth :active
+
+ if requesting_c
+ cr = with_container_auth(Container.find_by_uuid requesting_c) do
+ create_minimal_req!(common_attrs)
+ end
+ assert_not_nil cr.requesting_container_uuid
+ else
+ cr = create_minimal_req!(common_attrs)
+ end
+
+ cr.state = ContainerRequest::Committed
+ cr.save!
+
+ assert_equal false, cr.scheduling_parameters['preemptable']
+ end
+ end
+
+ [
+ [true, 'zzzzz-dz642-runningcontainr', true],
+ [true, nil, nil],
+ [false, 'zzzzz-dz642-runningcontainr', nil],
+ [false, nil, nil],
+ ].each do |preemptable_conf, requesting_c, schedule_preemptable|
+ test "having Rails.configuration.preemptable_instances=#{preemptable_conf}, #{requesting_c.nil? ? 'non-':''}child CR should #{schedule_preemptable ? '':'not'} ask for preemptable instance by default" do
+ common_attrs = {cwd: "test",
+ priority: 1,
+ command: ["echo", "hello"],
+ output_path: "test",
+ mounts: {"test" => {"kind" => "json"}}}
+
+ Rails.configuration.preemptable_instances = preemptable_conf
+ set_user_from_auth :active
+
+ if requesting_c
+ cr = with_container_auth(Container.find_by_uuid requesting_c) do
+ create_minimal_req!(common_attrs)
+ end
+ assert_not_nil cr.requesting_container_uuid
+ else
+ cr = create_minimal_req!(common_attrs)
+ end
+
+ cr.state = ContainerRequest::Committed
+ cr.save!
+
+ assert_equal schedule_preemptable, cr.scheduling_parameters['preemptable']
+ end
+ end
+
[
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Committed, ActiveRecord::RecordInvalid],
[{"partitions" => ["fastcpu","vfastcpu", 100]}, ContainerRequest::Uncommitted],
// Dispatcher service for Crunch that submits containers to the slurm queue.
import (
+ "bytes"
"context"
"flag"
"fmt"
log.Printf("Submitting container %s to slurm", ctr.UUID)
if err := disp.submit(ctr, disp.CrunchRunCommand); err != nil {
var text string
- if err == dispatchcloud.ErrConstraintsNotSatisfiable {
- text = fmt.Sprintf("cannot run container %s: %s", ctr.UUID, err)
+ if err, ok := err.(dispatchcloud.ConstraintsNotSatisfiableError); ok {
+ var logBuf bytes.Buffer
+ fmt.Fprintf(&logBuf, "cannot run container %s: %s\n", ctr.UUID, err)
+ if len(err.AvailableTypes) == 0 {
+ fmt.Fprint(&logBuf, "No instance types are configured.\n")
+ } else {
+ fmt.Fprint(&logBuf, "Available instance types:\n")
+ for _, t := range err.AvailableTypes {
+ fmt.Fprintf(&logBuf,
+ "Type %q: %d VCPUs, %d RAM, %d Scratch, %f Price\n",
+ t.Name, t.VCPUs, t.RAM, t.Scratch, t.Price,
+ )
+ }
+ }
+ text = logBuf.String()
disp.UpdateState(ctr.UUID, dispatch.Cancelled)
} else {
text = fmt.Sprintf("Error submitting container %s to slurm: %s", ctr.UUID, err)
types: []arvados.InstanceType{
{Name: "a1.tiny", Price: 0.02, RAM: 128000000, VCPUs: 1},
},
- err: dispatchcloud.ErrConstraintsNotSatisfiable,
+ err: dispatchcloud.ConstraintsNotSatisfiableError{},
},
} {
c.Logf("%#v", trial)
s.disp.cluster = &arvados.Cluster{InstanceTypes: trial.types}
args, err := s.disp.sbatchArgs(container)
- c.Check(err, Equals, trial.err)
+ c.Check(err == nil, Equals, trial.err == nil)
if trial.err == nil {
c.Check(args, DeepEquals, append([]string{"--job-name=123", "--nice=10000"}, trial.sbatchArgs...))
+ } else {
+ c.Check(len(err.(dispatchcloud.ConstraintsNotSatisfiableError).AvailableTypes), Equals, len(trial.types))
}
}
}
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
# llfuse 1.3.4 fails to install via pip
'llfuse >=1.2, <1.3.4',
'python-daemon',
- 'ciso8601 >=1.0.6',
+ 'ciso8601 >=1.0.6, <2.0.0',
'setuptools'
],
test_suite='tests',
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import (
return
}
}
+ bal.cleanupMounts()
if err = bal.CheckSanityEarly(&config.Client); err != nil {
return
})
}
+func (bal *Balancer) cleanupMounts() {
+ rwdev := map[string]*KeepService{}
+ for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ if !mnt.ReadOnly && mnt.DeviceID != "" {
+ rwdev[mnt.DeviceID] = srv
+ }
+ }
+ }
+ // Drop the readonly mounts whose device is mounted RW
+ // elsewhere.
+ for _, srv := range bal.KeepServices {
+ var dedup []*KeepMount
+ for _, mnt := range srv.mounts {
+ if mnt.ReadOnly && rwdev[mnt.DeviceID] != nil {
+ bal.logf("skipping srv %s readonly mount %q because same device %q is mounted read-write on srv %s", srv, mnt.UUID, mnt.DeviceID, rwdev[mnt.DeviceID])
+ } else {
+ dedup = append(dedup, mnt)
+ }
+ }
+ srv.mounts = dedup
+ }
+ for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ if mnt.Replication <= 0 {
+ log.Printf("%s: mount %s reports replication=%d, using replication=1", srv, mnt.UUID, mnt.Replication)
+ mnt.Replication = 1
+ }
+ }
+ }
+}
+
// CheckSanityEarly checks for configuration and runtime errors that
// can be detected before GetCurrentState() and ComputeChangeSets()
// are called.
errs := make(chan error, 2+len(bal.KeepServices))
wg := sync.WaitGroup{}
- // Start one goroutine for each KeepService: retrieve the
- // index, and add the returned blocks to BlockStateMap.
+ // When a device is mounted more than once, we will get its
+ // index only once, and call AddReplicas on all of the mounts.
+ // equivMount keys are the mounts that will be indexed, and
+ // each value is a list of mounts to apply the received index
+ // to.
+ equivMount := map[*KeepMount][]*KeepMount{}
+ // deviceMount maps each device ID to the one mount that will
+ // be indexed for that device.
+ deviceMount := map[string]*KeepMount{}
for _, srv := range bal.KeepServices {
+ for _, mnt := range srv.mounts {
+ equiv := deviceMount[mnt.DeviceID]
+ if equiv == nil {
+ equiv = mnt
+ if mnt.DeviceID != "" {
+ deviceMount[mnt.DeviceID] = equiv
+ }
+ }
+ equivMount[equiv] = append(equivMount[equiv], mnt)
+ }
+ }
+
+ // Start one goroutine for each (non-redundant) mount:
+ // retrieve the index, and add the returned blocks to
+ // BlockStateMap.
+ for _, mounts := range equivMount {
wg.Add(1)
- go func(srv *KeepService) {
+ go func(mounts []*KeepMount) {
defer wg.Done()
- bal.logf("%s: retrieve indexes", srv)
- for _, mount := range srv.mounts {
- bal.logf("%s: retrieve index", mount)
- idx, err := srv.IndexMount(c, mount.UUID, "")
- if err != nil {
- errs <- fmt.Errorf("%s: retrieve index: %v", mount, err)
- return
- }
- if len(errs) > 0 {
- // Some other goroutine encountered an
- // error -- any further effort here
- // will be wasted.
- return
- }
+ bal.logf("mount %s: retrieve index from %s", mounts[0], mounts[0].KeepService)
+ idx, err := mounts[0].KeepService.IndexMount(c, mounts[0].UUID, "")
+ if err != nil {
+ errs <- fmt.Errorf("%s: retrieve index: %v", mounts[0], err)
+ return
+ }
+ if len(errs) > 0 {
+ // Some other goroutine encountered an
+ // error -- any further effort here
+ // will be wasted.
+ return
+ }
+ for _, mount := range mounts {
bal.logf("%s: add %d replicas to map", mount, len(idx))
bal.BlockStateMap.AddReplicas(mount, idx)
- bal.logf("%s: done", mount)
+ bal.logf("%s: added %d replicas", mount, len(idx))
}
- bal.logf("%s: done", srv)
- }(srv)
+ bal.logf("mount %s: index done", mounts[0])
+ }(mounts)
}
// collQ buffers incoming collections so we can start fetching
blkid arvados.SizedDigest
blk *BlockState
}
- nWorkers := 1 + runtime.NumCPU()
- todo := make(chan balanceTask, nWorkers)
- results := make(chan balanceResult, 16)
- var wg sync.WaitGroup
- for i := 0; i < nWorkers; i++ {
- wg.Add(1)
- go func() {
- for work := range todo {
- results <- bal.balanceBlock(work.blkid, work.blk)
+ workers := runtime.GOMAXPROCS(-1)
+ todo := make(chan balanceTask, workers)
+ go func() {
+ bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
+ todo <- balanceTask{
+ blkid: blkid,
+ blk: blk,
}
- wg.Done()
- }()
- }
- bal.BlockStateMap.Apply(func(blkid arvados.SizedDigest, blk *BlockState) {
- todo <- balanceTask{
- blkid: blkid,
- blk: blk,
- }
- })
- close(todo)
+ })
+ close(todo)
+ }()
+ results := make(chan balanceResult, workers)
go func() {
+ var wg sync.WaitGroup
+ for i := 0; i < workers; i++ {
+ wg.Add(1)
+ go func() {
+ for work := range todo {
+ results <- bal.balanceBlock(work.blkid, work.blk)
+ }
+ wg.Done()
+ }()
+ }
wg.Wait()
close(results)
}()
for _, class := range bal.classes {
desired := blk.Desired[class]
+ countedDev := map[string]bool{}
have := 0
for _, slot := range slots {
- if slot.repl != nil && bal.mountsByClass[class][slot.mnt] {
- have++
+ if slot.repl != nil && bal.mountsByClass[class][slot.mnt] && !countedDev[slot.mnt.DeviceID] {
+ have += slot.mnt.Replication
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
+ }
}
}
classState[class] = balancedBlockState{
}
})
- // Servers and mounts (with or without existing
+ // Servers/mounts/devices (with or without existing
// replicas) that are part of the best achievable
// layout for this storage class.
wantSrv := map[*KeepService]bool{}
wantMnt := map[*KeepMount]bool{}
+ wantDev := map[string]bool{}
// Positions (with existing replicas) that have been
// protected (via unsafeToDelete) to ensure we don't
// reduce replication below desired level when
// trashing replicas that aren't optimal positions for
// any storage class.
protMnt := map[*KeepMount]bool{}
+ // Replication planned so far (corresponds to wantMnt).
+ replWant := 0
+ // Protected replication (corresponds to protMnt).
+ replProt := 0
// trySlot tries using a slot to meet requirements,
// and returns true if all requirements are met.
trySlot := func(i int) bool {
slot := slots[i]
- if len(protMnt) < desired && slot.repl != nil {
+ if wantMnt[slot.mnt] || wantDev[slot.mnt.DeviceID] {
+ // Already allocated a replica to this
+ // backend device, possibly on a
+ // different server.
+ return false
+ }
+ if replProt < desired && slot.repl != nil && !protMnt[slot.mnt] {
unsafeToDelete[slot.repl.Mtime] = true
protMnt[slot.mnt] = true
+ replProt += slot.mnt.Replication
}
- if len(wantMnt) < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
+ if replWant < desired && (slot.repl != nil || !slot.mnt.ReadOnly) {
slots[i].want = true
wantSrv[slot.mnt.KeepService] = true
wantMnt[slot.mnt] = true
+ if slot.mnt.DeviceID != "" {
+ wantDev[slot.mnt.DeviceID] = true
+ }
+ replWant += slot.mnt.Replication
}
- return len(protMnt) >= desired && len(wantMnt) >= desired
+ return replProt >= desired && replWant >= desired
}
// First try to achieve desired replication without
if slot.repl == nil || !bal.mountsByClass[class][slot.mnt] {
continue
}
- if safe++; safe >= desired {
+ if safe += slot.mnt.Replication; safe >= desired {
break
}
}
cs.unachievable = true
classState[class] = cs
}
+
+ // Avoid deleting wanted replicas from devices that
+ // are mounted on multiple servers -- even if they
+ // haven't already been added to unsafeToDelete
+ // because the servers report different Mtimes.
+ for _, slot := range slots {
+ if slot.repl != nil && wantDev[slot.mnt.DeviceID] {
+ unsafeToDelete[slot.repl.Mtime] = true
+ }
+ }
}
// TODO: If multiple replicas are trashable, prefer the oldest
// replica that doesn't have a timestamp collision with
// others.
+ countedDev := map[string]bool{}
var have, want int
for _, slot := range slots {
+ if countedDev[slot.mnt.DeviceID] {
+ continue
+ }
if slot.want {
- want++
+ want += slot.mnt.Replication
}
if slot.repl != nil {
- have++
+ have += slot.mnt.Replication
+ }
+ if slot.mnt.DeviceID != "" {
+ countedDev[slot.mnt.DeviceID] = true
}
}
case surplus > 0:
s.overrep.replicas += surplus
s.overrep.blocks++
- s.overrep.bytes += bytes * int64(len(result.blk.Replicas)-result.want)
+ s.overrep.bytes += bytes * int64(result.have-result.want)
default:
s.justright.replicas += result.want
s.justright.blocks++
s.desired.blocks++
s.desired.bytes += bytes * int64(result.want)
}
- if len(result.blk.Replicas) > 0 {
- s.current.replicas += len(result.blk.Replicas)
+ if result.have > 0 {
+ s.current.replicas += result.have
s.current.blocks++
- s.current.bytes += bytes * int64(len(result.blk.Replicas))
+ s.current.bytes += bytes * int64(result.have)
}
- for len(s.replHistogram) <= len(result.blk.Replicas) {
+ for len(s.replHistogram) <= result.have {
s.replHistogram = append(s.replHistogram, 0)
}
- s.replHistogram[len(result.blk.Replicas)]++
+ s.replHistogram[result.have]++
}
for _, srv := range bal.KeepServices {
s.pulls += len(srv.ChangeSet.Pulls)
shouldPullMounts []string
shouldTrashMounts []string
+
+ expectResult balanceResult
}
func (bal *balancerSuite) SetUpSuite(c *check.C) {
}
bal.MinMtime = time.Now().UnixNano() - bal.signatureTTL*1e9
+ bal.cleanupMounts()
}
func (bal *balancerSuite) TestPerfect(c *check.C) {
shouldTrash: slots{2}})
}
+func (bal *balancerSuite) TestCleanupMounts(c *check.C) {
+ bal.srvs[3].mounts[0].KeepMount.ReadOnly = true
+ bal.srvs[3].mounts[0].KeepMount.DeviceID = "abcdef"
+ bal.srvs[14].mounts[0].KeepMount.DeviceID = "abcdef"
+ c.Check(len(bal.srvs[3].mounts), check.Equals, 1)
+ bal.cleanupMounts()
+ c.Check(len(bal.srvs[3].mounts), check.Equals, 0)
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{1},
+ shouldPull: slots{2}})
+}
+
+func (bal *balancerSuite) TestVolumeReplication(c *check.C) {
+ bal.srvs[0].mounts[0].KeepMount.Replication = 2 // srv 0
+ bal.srvs[14].mounts[0].KeepMount.Replication = 2 // srv e
+ bal.cleanupMounts()
+ // block 0 rendezvous is 3,e,a -- so slot 1 has repl=2
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{1},
+ shouldPull: slots{0}})
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{0, 1},
+ shouldPull: nil})
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{0, 1, 2},
+ shouldTrash: slots{2}})
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 3},
+ current: slots{0, 2, 3, 4},
+ shouldPull: slots{1},
+ shouldTrash: slots{4},
+ expectResult: balanceResult{
+ have: 4,
+ want: 3,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 3,
+ surplus: 1,
+ unachievable: false}}}})
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 3},
+ current: slots{0, 1, 2, 3, 4},
+ shouldTrash: slots{2, 3, 4}})
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 4},
+ current: slots{0, 1, 2, 3, 4},
+ shouldTrash: slots{3, 4},
+ expectResult: balanceResult{
+ have: 6,
+ want: 4,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 4,
+ surplus: 2,
+ unachievable: false}}}})
+ // block 1 rendezvous is 0,9,7 -- so slot 0 has repl=2
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 2},
+ current: slots{0},
+ expectResult: balanceResult{
+ have: 2,
+ want: 2,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 2,
+ surplus: 0,
+ unachievable: false}}}})
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 3},
+ current: slots{0},
+ shouldPull: slots{1}})
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 4},
+ current: slots{0},
+ shouldPull: slots{1, 2}})
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 4},
+ current: slots{2},
+ shouldPull: slots{0, 1}})
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 4},
+ current: slots{7},
+ shouldPull: slots{0, 1, 2},
+ expectResult: balanceResult{
+ have: 1,
+ want: 4,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 4,
+ surplus: -3,
+ unachievable: false}}}})
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 2},
+ current: slots{1, 2, 3, 4},
+ shouldPull: slots{0},
+ shouldTrash: slots{3, 4}})
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 2},
+ current: slots{0, 1, 2},
+ shouldTrash: slots{1, 2},
+ expectResult: balanceResult{
+ have: 4,
+ want: 2,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 2,
+ surplus: 2,
+ unachievable: false}}}})
+}
+
+func (bal *balancerSuite) TestDeviceRWMountedByMultipleServers(c *check.C) {
+ bal.srvs[0].mounts[0].KeepMount.DeviceID = "abcdef"
+ bal.srvs[9].mounts[0].KeepMount.DeviceID = "abcdef"
+ bal.srvs[14].mounts[0].KeepMount.DeviceID = "abcdef"
+ // block 0 belongs on servers 3 and e, which have different
+ // device IDs.
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{1},
+ shouldPull: slots{0}})
+ // block 1 belongs on servers 0 and 9, which both report
+ // having a replica, but the replicas are on the same device
+ // ID -- so we should pull to the third position (7).
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 2},
+ current: slots{0, 1},
+ shouldPull: slots{2}})
+ // block 1 can be pulled to the doubly-mounted device, but the
+ // pull should only be done on the first of the two servers.
+ bal.try(c, tester{
+ known: 1,
+ desired: map[string]int{"default": 2},
+ current: slots{2},
+ shouldPull: slots{0}})
+ // block 0 has one replica on a single device mounted on two
+ // servers (e,9 at positions 1,9). Trashing the replica on 9
+ // would lose the block.
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{1, 9},
+ shouldPull: slots{0},
+ expectResult: balanceResult{
+ have: 1,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 2,
+ surplus: -1,
+ unachievable: false}}}})
+ // block 0 is overreplicated, but the second and third
+ // replicas are the same replica according to DeviceID
+ // (despite different Mtimes). Don't trash the third replica.
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{0, 1, 9},
+ expectResult: balanceResult{
+ have: 2,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 2,
+ surplus: 0,
+ unachievable: false}}}})
+ // block 0 is overreplicated; the third and fifth replicas are
+ // extra, but the fourth is another view of the second and
+ // shouldn't be trashed.
+ bal.try(c, tester{
+ known: 0,
+ desired: map[string]int{"default": 2},
+ current: slots{0, 1, 5, 9, 12},
+ shouldTrash: slots{5, 12},
+ expectResult: balanceResult{
+ have: 4,
+ classState: map[string]balancedBlockState{"default": {
+ desired: 2,
+ surplus: 2,
+ unachievable: false}}}})
+}
+
func (bal *balancerSuite) TestChangeStorageClasses(c *check.C) {
// For known blocks 0/1/2/3, server 9 is slot 9/1/14/0 in
// probe order. For these tests we give it two mounts, one
for _, srv := range bal.srvs {
srv.ChangeSet = &ChangeSet{}
}
- bal.balanceBlock(knownBlkid(t.known), blk)
+ result := bal.balanceBlock(knownBlkid(t.known), blk)
var didPull, didTrash slots
var didPullMounts, didTrashMounts []string
sort.Strings(didTrashMounts)
c.Check(didTrashMounts, check.DeepEquals, t.shouldTrashMounts)
}
+ if t.expectResult.have > 0 {
+ c.Check(result.have, check.Equals, t.expectResult.have)
+ }
+ if t.expectResult.want > 0 {
+ c.Check(result.want, check.Equals, t.expectResult.want)
+ }
+ if t.expectResult.classState != nil {
+ c.Check(result.classState, check.DeepEquals, t.expectResult.classState)
+ }
}
// srvList returns the KeepServices, sorted in rendezvous order and
"flag"
"fmt"
"log"
+ "net/http"
"os"
"os/signal"
"syscall"
// more memory, but can reduce store-and-forward latency when
// fetching pages)
CollectionBuffers int
+
+ // Timeout for outgoing http request/response cycle.
+ RequestTimeout arvados.Duration
}
// RunOptions controls runtime behavior. The flags/options that belong
log.Fatal(config.DumpAndExit(cfg))
}
+ to := time.Duration(cfg.RequestTimeout)
+ if to == 0 {
+ to = 30 * time.Minute
+ }
+ arvados.DefaultSecureClient.Timeout = to
+ arvados.InsecureHTTPClient.Timeout = to
+ http.DefaultClient.Timeout = to
+
log.Printf("keep-balance %s started", version)
if *debugFlag {
- disk
RunPeriod: 600s
CollectionBatchSize: 100000
-CollectionBuffers: 1000`)
+CollectionBuffers: 1000
+RequestTimeout: 30m`)
func usage() {
fmt.Fprintf(os.Stderr, `
while the current page is still being processed. If this is zero
or omitted, pages are processed serially.
+ RequestTimeout is the maximum time keep-balance will spend on a
+ single HTTP request (getting a page of collections, getting the
+ block index from a keepstore server, or sending a trash or pull
+ list to a keepstore server). Defaults to 30 minutes.
+
Limitations:
keep-balance does not attempt to discover whether committed pull
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
package main
import "golang.org/x/net/webdav"
locatorIn := mux.Vars(req)["locator"]
+ // Check if the client specified storage classes
+ if req.Header.Get("X-Keep-Storage-Classes") != "" {
+ var scl []string
+ for _, sc := range strings.Split(req.Header.Get("X-Keep-Storage-Classes"), ",") {
+ scl = append(scl, strings.Trim(sc, " "))
+ }
+ kc.StorageClasses = scl
+ }
+
_, err = fmt.Sscanf(req.Header.Get("Content-Length"), "%d", &expectLength)
if err != nil || expectLength < 0 {
err = LengthRequiredError
c.Check(err, ErrorMatches, `.*loop detected.*`)
}
+func (s *ServerRequiredSuite) TestStorageClassesHeader(c *C) {
+ kc := runProxy(c, nil, false)
+ defer closeListener()
+
+ // Set up fake keepstore to record request headers
+ var hdr http.Header
+ ts := httptest.NewServer(http.HandlerFunc(
+ func(w http.ResponseWriter, r *http.Request) {
+ hdr = r.Header
+ http.Error(w, "Error", http.StatusInternalServerError)
+ }))
+ defer ts.Close()
+
+ // Point keepproxy router's keepclient to the fake keepstore
+ sr := map[string]string{
+ TestProxyUUID: ts.URL,
+ }
+ router.(*proxyHandler).KeepClient.SetServiceRoots(sr, sr, sr)
+
+ // Set up client to ask for storage classes to keepproxy
+ kc.StorageClasses = []string{"secure"}
+ content := []byte("Very important data")
+ _, _, err := kc.PutB(content)
+ c.Check(err, NotNil)
+ c.Check(hdr.Get("X-Keep-Storage-Classes"), Equals, "secure")
+}
+
func (s *ServerRequiredSuite) TestDesiredReplicas(c *C) {
kc := runProxy(c, nil, false)
defer closeListener()
}
func (s *ServerRequiredSuite) TestAskGetKeepProxyConnectionError(c *C) {
- arv, err := arvadosclient.MakeArvadosClient()
- c.Assert(err, Equals, nil)
+ kc := runProxy(c, nil, false)
+ defer closeListener()
- // keepclient with no such keep server
- kc := keepclient.New(arv)
+	// Point keepproxy to a non-existent keepstore
locals := map[string]string{
TestProxyUUID: "http://localhost:12345",
}
- kc.SetServiceRoots(locals, nil, nil)
+ router.(*proxyHandler).KeepClient.SetServiceRoots(locals, nil, nil)
- // Ask should result in temporary connection refused error
+ // Ask should result in temporary bad gateway error
hash := fmt.Sprintf("%x", md5.Sum([]byte("foo")))
- _, _, err = kc.Ask(hash)
+ _, _, err := kc.Ask(hash)
c.Check(err, NotNil)
errNotFound, _ := err.(*keepclient.ErrNotFound)
c.Check(errNotFound.Temporary(), Equals, true)
- c.Assert(err, ErrorMatches, ".*connection refused.*")
+ c.Assert(err, ErrorMatches, ".*HTTP 502.*")
- // Get should result in temporary connection refused error
+ // Get should result in temporary bad gateway error
_, _, _, err = kc.Get(hash)
c.Check(err, NotNil)
errNotFound, _ = err.(*keepclient.ErrNotFound)
c.Check(errNotFound.Temporary(), Equals, true)
- c.Assert(err, ErrorMatches, ".*connection refused.*")
+ c.Assert(err, ErrorMatches, ".*HTTP 502.*")
}
func (s *NoKeepServerSuite) TestAskGetNoKeepServerError(c *C) {
case <-ctx.Done():
theConfig.debugLogf("%s: taking PutReader's input away: %s", v, ctx.Err())
// Our pipe might be stuck in Write(), waiting for
- // io.Copy() to read. If so, un-stick it. This means
+ // PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
// size and MD5 won't match, so the write will fail.
go io.Copy(ioutil.Discard, bufr)
theConfig.debugLogf("%s: abandoning PutReader goroutine", v)
return ctx.Err()
case <-ready:
+ // Unblock pipe in case PutReader did not consume it.
+ io.Copy(ioutil.Discard, bufr)
return v.translateError(err)
}
}
subsecs = float(subsec_match.group(1))
timestr = timestr[:subsec_match.start()] + 'Z'
return calendar.timegm(time.strptime(timestr + 'UTC',
- ARVADOS_TIMEFMT + '%Z'))
+ ARVADOS_TIMEFMT + '%Z')) + subsecs
def timestamp_fresh(timestamp, fresh_time):
return (time.time() - timestamp) < fresh_time
@RetryMixin._retry()
def create_cloud_node(self):
self._logger.info("Sending create_node request for node size %s.",
- self.cloud_size.name)
+ self.cloud_size.id)
try:
self.cloud_node = self._cloud.create_node(self.cloud_size,
self.arvados_node)
reason for the decision.
"""
+ # If this node's size is invalid (because it has a stale arvados_node_size
+ # tag), return True so that it's properly shut down.
+ if self.cloud_node.size.id == 'invalid':
+ return (True, "node's size tag '%s' not recognizable" % (self.cloud_node.extra['arvados_node_size'],))
+
# Collect states and then consult state transition table whether we
# should shut down. Possible states are:
# crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
try:
kwargs = self.create_kwargs.copy()
kwargs.update(self.arvados_create_kwargs(size, arvados_node))
- kwargs['size'] = size
+ kwargs['size'] = size.real
return self.real.create_node(**kwargs)
except CLOUD_ERRORS as create_error:
# Workaround for bug #6702: sometimes the create node request
def arvados_create_kwargs(self, size, arvados_node):
tags = {
+ # Set up tag indicating the Arvados assigned Cloud Size id.
+ 'arvados_node_size': size.id,
'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
'arv-ping-url': self._make_ping_url(arvados_node)
}
# Do our own filtering based on tag.
nodes = [node for node in
super(ComputeNodeDriver, self).list_nodes(ex_fetch_nic=False, ex_fetch_power_state=False)
- if node.extra["tags"].get("arvados-class") == self.tags["arvados-class"]]
+ if node.extra.get("tags", {}).get("arvados-class") == self.tags["arvados-class"]]
for n in nodes:
# Need to populate Node.size
if not n.size:
n.size = self.sizes[n.extra["properties"]["hardwareProfile"]["vmSize"]]
+ n.extra['arvados_node_size'] = n.extra.get('tags', {}).get('arvados_node_size')
return nodes
def broken(self, cloud_node):
"VolumeSize": volsize,
"VolumeType": "gp2"
}}]
+ if size.preemptable:
+ # Request a Spot instance for this node
+ kw['ex_spot_market'] = True
return kw
def sync_node(self, cloud_node, arvados_node):
self.real.ex_create_tags(cloud_node,
{'Name': arvados_node_fqdn(arvados_node)})
+ def create_node(self, size, arvados_node):
+ # Set up tag indicating the Arvados assigned Cloud Size id.
+ self.create_kwargs['ex_metadata'].update({'arvados_node_size': size.id})
+ return super(ComputeNodeDriver, self).create_node(size, arvados_node)
+
def list_nodes(self):
# Need to populate Node.size
nodes = super(ComputeNodeDriver, self).list_nodes()
for n in nodes:
if not n.size:
n.size = self.sizes[n.extra["instance_type"]]
+ n.extra['arvados_node_size'] = n.extra.get('tags', {}).get('arvados_node_size')
return nodes
@classmethod
'ex_disks_gce_struct': disks,
}
result['ex_metadata'].update({
- 'arv-ping-url': self._make_ping_url(arvados_node),
- 'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
- 'hostname': arvados_node_fqdn(arvados_node),
- })
+ 'arvados_node_size': size.id,
+ 'arv-ping-url': self._make_ping_url(arvados_node),
+ 'booted_at': time.strftime(ARVADOS_TIMEFMT, time.gmtime()),
+ 'hostname': arvados_node_fqdn(arvados_node),
+ })
return result
-
def list_nodes(self):
# The GCE libcloud driver only supports filtering node lists by zone.
# Do our own filtering based on tag list.
nodelist = [node for node in
super(ComputeNodeDriver, self).list_nodes()
if self.node_tags.issubset(node.extra.get('tags', []))]
- # As of 0.18, the libcloud GCE driver sets node.size to the size's name.
- # It's supposed to be the actual size object. Check that it's not,
- # and monkeypatch the results when that's the case.
- if nodelist and not hasattr(nodelist[0].size, 'id'):
- for node in nodelist:
+ for node in nodelist:
+ # As of 0.18, the libcloud GCE driver sets node.size to the size's name.
+ # It's supposed to be the actual size object. Check that it's not,
+ # and monkeypatch the results when that's the case.
+ if not hasattr(node.size, 'id'):
node.size = self._sizes_by_id[node.size]
+ # Get arvados-assigned cloud size id
+ node.extra['arvados_node_size'] = node.extra.get('metadata', {}).get('arvados_node_size')
return nodelist
@classmethod
from .baseactor import BaseNodeManagerActor
+from functools import partial
from libcloud.common.types import LibcloudError
from libcloud.common.exceptions import BaseHTTPError
if not self.has_option(sec_name, opt_name):
self.set(sec_name, opt_name, value)
- def get_section(self, section, transformer=None):
+ def get_section(self, section, transformers={}, default_transformer=None):
+ transformer_map = {
+ str: self.get,
+ int: self.getint,
+ bool: self.getboolean,
+ float: self.getfloat,
+ }
result = self._dict()
for key, value in self.items(section):
+ transformer = None
+ if transformers.get(key) in transformer_map:
+ transformer = partial(transformer_map[transformers[key]], section)
+ elif default_transformer in transformer_map:
+ transformer = partial(transformer_map[default_transformer], section)
if transformer is not None:
try:
- value = transformer(value)
+ value = transformer(key)
except (TypeError, ValueError):
pass
result[key] = value
self.get_section('Cloud Create'),
driver_class=driver_class)
- def node_sizes(self, all_sizes):
+ def node_sizes(self):
"""Finds all acceptable NodeSizes for our installation.
Returns a list of (NodeSize, kwargs) pairs for each NodeSize object
returned by libcloud that matches a size listed in our config file.
"""
-
+ all_sizes = self.new_cloud_client().list_sizes()
size_kwargs = {}
+ section_types = {
+ 'instance_type': str,
+ 'price': float,
+ 'preemptable': bool,
+ }
for sec_name in self.sections():
sec_words = sec_name.split(None, 2)
if sec_words[0] != 'Size':
continue
- size_spec = self.get_section(sec_name, int)
- if 'price' in size_spec:
- size_spec['price'] = float(size_spec['price'])
+ size_spec = self.get_section(sec_name, section_types, int)
+ if 'preemptable' not in size_spec:
+ size_spec['preemptable'] = False
+ if 'instance_type' not in size_spec:
+ # Assume instance type is Size name if missing
+ size_spec['instance_type'] = sec_words[1]
+ size_spec['id'] = sec_words[1]
size_kwargs[sec_words[1]] = size_spec
# EC2 node sizes are identified by id. GCE sizes are identified by name.
matching_sizes = []
for size in all_sizes:
- if size.id in size_kwargs:
- matching_sizes.append((size, size_kwargs[size.id]))
- elif size.name in size_kwargs:
- matching_sizes.append((size, size_kwargs[size.name]))
+ matching_sizes += [
+ (size, size_kwargs[s]) for s in size_kwargs
+ if size_kwargs[s]['instance_type'] == size.id
+ or size_kwargs[s]['instance_type'] == size.name
+ ]
return matching_sizes
def shutdown_windows(self):
- return [int(n)
+ return [float(n)
for n in self.get('Cloud', 'shutdown_windows').split(',')]
busy_count = counts["busy"]
wishlist_count = self._size_wishlist(size)
- self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.name,
+ self._logger.info("%s: wishlist %i, up %i (booting %i, unpaired %i, idle %i, busy %i), down %i, shutdown %i", size.id,
wishlist_count,
up_count,
counts["booting"],
can_boot = int((self.max_total_price - total_price) / size.price)
if can_boot == 0:
self._logger.info("Not booting %s (price %s) because with it would exceed max_total_price of %s (current total_price is %s)",
- size.name, size.price, self.max_total_price, total_price)
+ size.id, size.price, self.max_total_price, total_price)
return can_boot
else:
return wanted
return None
arvados_node = self.arvados_nodes.find_stale_node(self.node_stale_after)
self._logger.info("Want %i more %s nodes. Booting a node.",
- nodes_wanted, cloud_size.name)
+ nodes_wanted, cloud_size.id)
new_setup = self._node_setup.start(
timer_actor=self._timer,
arvados_client=self._new_arvados(),
that would best satisfy the jobs, choosing the cheapest size that
satisfies each job, and ignoring jobs that can't be satisfied.
"""
+ class InvalidCloudSize(object):
+ """
+ Dummy CloudSizeWrapper-like class, to be used when a cloud node doesn't
+ have a recognizable arvados_node_size tag.
+ """
+ def __init__(self):
+ self.id = 'invalid'
+ self.name = 'invalid'
+ self.ram = 0
+ self.disk = 0
+ self.scratch = 0
+ self.cores = 0
+ self.bandwidth = 0
+ self.price = 9999999
+ self.preemptable = False
+ self.extra = {}
+
+ def meets_constraints(self, **kwargs):
+ return False
+
class CloudSizeWrapper(object):
def __init__(self, real_size, node_mem_scaling, **kwargs):
self.disk = 0
self.scratch = self.disk * 1000
self.ram = int(self.ram * node_mem_scaling)
+ self.preemptable = False
for name, override in kwargs.iteritems():
+ if name == 'instance_type': continue
if not hasattr(self, name):
raise ValueError("unrecognized size field '%s'" % (name,))
setattr(self, name, override)
wants = {'cores': want_value('min_cores_per_node'),
'ram': want_value('min_ram_mb_per_node'),
'scratch': want_value('min_scratch_mb_per_node')}
+ # EC2 node sizes are identified by id. GCE sizes are identified by name.
for size in self.cloud_sizes:
if (size.meets_constraints(**wants) and
- (specified_size is None or size.id == specified_size)):
- return size
+ (specified_size is None or
+ size.id == specified_size or size.name == specified_size)):
+ return size
return None
def servers_for_queue(self, queue):
"Job's min_nodes constraint is greater than the configured "
"max_nodes (%d)" % self.max_nodes)
elif (want_count*cloud_size.price <= self.max_price):
- servers.extend([cloud_size.real] * want_count)
+ servers.extend([cloud_size] * want_count)
else:
unsatisfiable_jobs[job['uuid']] = (
"Job's price (%d) is above system's max_price "
for s in self.cloud_sizes:
if s.id == sizeid:
return s
- return None
+ return self.InvalidCloudSize()
class JobQueueMonitorActor(clientactor.RemotePollLoopActor):
job_uuid,
error)
self._logger.debug("Calculated wishlist: %s",
- ', '.join(s.name for s in server_list) or "(empty)")
+ ', '.join(s.id for s in server_list) or "(empty)")
return super(JobQueueMonitorActor, self)._got_response(server_list)
return root_logger
def build_server_calculator(config):
- cloud_size_list = config.node_sizes(config.new_cloud_client().list_sizes())
+ cloud_size_list = config.node_sizes()
if not cloud_size_list:
abort("No valid node sizes configured")
return ServerCalculator(cloud_size_list,
config.getfloat('Daemon', 'node_mem_scaling'))
def launch_pollers(config, server_calculator):
- poll_time = config.getint('Daemon', 'poll_time')
+ poll_time = config.getfloat('Daemon', 'poll_time')
max_poll_time = config.getint('Daemon', 'max_poll_time')
timer = TimedCallBackActor.start(poll_time / 10.0).tell_proxy()
def _send_request(self):
nodes = self._client.list_nodes()
for n in nodes:
- # Replace with libcloud NodeSize object with compatible
+ # Replace the libcloud NodeSize object with compatible
# CloudSizeWrapper object which merges the size info reported from
# the cloud with size information from the configuration file.
- n.size = self._calculator.find_size(n.size.id)
+ n.size = self._calculator.find_size(n.extra['arvados_node_size'])
return nodes
global all_nodes, create_calls
create_calls += 1
nodeid = "node%i" % create_calls
+ if ex_tags is None:
+ ex_tags = {}
+ ex_tags.update({'arvados_node_size': size.id})
n = Node(nodeid, nodeid, NodeState.RUNNING, [], [], self, size=size, extra={"tags": ex_tags})
all_nodes.append(n)
if ex_customdata:
ping_url = re.search(r"echo '(.*)' > /var/tmp/arv-node-data/arv-ping-url", ex_customdata).groups(1)[0]
if ex_userdata:
ping_url = ex_userdata
- if ex_metadata:
+ elif ex_metadata:
ping_url = ex_metadata["arv-ping-url"]
ping_url += "&instance_id=" + nodeid
ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
create_calls += 1
if create_calls < 2:
raise RateLimitReachedError(429, "Rate limit exceeded",
- headers={'retry-after': '12'})
+ headers={'retry-after': '2'})
elif create_calls < 3:
raise BaseHTTPError(429, "Rate limit exceeded",
- {'retry-after': '2'})
+ {'retry-after': '1'})
else:
return super(RetryDriver, self).create_node(name=name,
size=size,
auth=auth,
ex_metadata=ex_metadata,
ex_userdata=ex_userdata)
- n.extra = {"launch_time": time.strftime(ARVADOS_TIMEFMT, time.gmtime())[:-1]}
+ n.extra = {
+ "launch_time": time.strftime(ARVADOS_TIMEFMT, time.gmtime())[:-1],
+ "tags" : {
+ "arvados_node_size": size.id
+ }
+ }
return n
def list_sizes(self, **kwargs):
ex_metadata=ex_metadata)
n.extra = {
"metadata": {
- "items": [{"key": k, "value": v} for k,v in ex_metadata.iteritems()]
+ "items": [{"key": k, "value": v} for k,v in ex_metadata.iteritems()],
+ "arvados_node_size": size.id
},
"zone": "fake"
}
('share/doc/arvados-node-manager', ['agpl-3.0.txt', 'README.rst']),
],
install_requires=[
- 'apache-libcloud>=2.3',
+ 'apache-libcloud>=2.3.1.dev1',
'arvados-python-client>=0.1.20170731145219',
'future',
'pykka',
'python-daemon',
'setuptools'
],
+ dependency_links=[
+ "https://github.com/curoverse/libcloud/archive/apache-libcloud-2.3.1.dev1.zip"
+ ],
test_suite='tests',
tests_require=[
'requests',
'pbr<1.7.0',
'mock>=1.0',
- 'apache-libcloud>=2.3',
+ 'apache-libcloud>=2.3.1.dev1',
],
zip_safe=False
)
max_total_price = 0
# Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 5
+poll_time = 0.5
# Polls have exponential backoff when services fail to respond.
# This is the longest time to wait between polls.
-max_poll_time = 300
+max_poll_time = 1
# If Node Manager can't successfully poll a service for this long,
# it will never start or stop compute nodes, on the assumption that its
# information is too outdated.
-poll_stale_after = 600
+poll_stale_after = 1
# If Node Manager boots a cloud node, and it does not pair with an Arvados
# node before this long, assume that there was a cloud bootstrap failure and
# Azure bills by the minute, so it makes sense to aggressively shut down idle
# nodes. Specify at least two windows. You can add as many as you need beyond
# that.
-shutdown_windows = 1, 999999
+shutdown_windows = 0.05, 999999
[Cloud Credentials]
# Use "azure account list" with the azure CLI to get these values.
max_total_price = 0
# Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 5
+poll_time = 0.5
# Polls have exponential backoff when services fail to respond.
# This is the longest time to wait between polls.
-max_poll_time = 300
+max_poll_time = 1
# If Node Manager can't successfully poll a service for this long,
# it will never start or stop compute nodes, on the assumption that its
# information is too outdated.
-poll_stale_after = 600
+poll_stale_after = 1
# If Node Manager boots a cloud node, and it does not pair with an Arvados
# node before this long, assume that there was a cloud bootstrap failure and
# Azure bills by the minute, so it makes sense to aggressively shut down idle
# nodes. Specify at least two windows. You can add as many as you need beyond
# that.
-shutdown_windows = 1, 999999
+shutdown_windows = 0.05, 999999
[Cloud Credentials]
max_total_price = 0
# Poll Azure nodes and Arvados for new information every N seconds.
-poll_time = 5
+poll_time = 0.5
# Polls have exponential backoff when services fail to respond.
# This is the longest time to wait between polls.
-max_poll_time = 300
+max_poll_time = 1
# If Node Manager can't successfully poll a service for this long,
# it will never start or stop compute nodes, on the assumption that its
# information is too outdated.
-poll_stale_after = 600
+poll_stale_after = 1
# If Node Manager boots a cloud node, and it does not pair with an Arvados
# node before this long, assume that there was a cloud bootstrap failure and
# Azure bills by the minute, so it makes sense to aggressively shut down idle
# nodes. Specify at least two windows. You can add as many as you need beyond
# that.
-shutdown_windows = 1, 999999
+shutdown_windows = 0.05, 999999
[Cloud Credentials]
key = 00000000-0000-0000-0000-000000000000
return 0
-def remaining_jobs(g):
- update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
- "\n".join("echo '%s|alloc|(null)'" % (v) for k,v in compute_nodes.items()))
-
- for k,v in all_jobs.items():
- all_jobs[k] = "Running"
-
- set_squeue(g)
-
- return 0
-
-
def node_busy(g):
update_script(os.path.join(fake_slurm, "sinfo"), "#!/bin/sh\n" +
"\n".join("echo '%s|idle|(null)'" % (v) for k,v in compute_nodes.items()))
def node_shutdown(g):
global compute_nodes
- del compute_nodes[g.group(1)]
+ if g.group(1) in compute_nodes:
+ del compute_nodes[g.group(1)]
return 0
def jobs_req(g):
driver_class=driver_class,
ssh_key=os.path.join(fake_slurm, "id_rsa.pub")))
- # Tests must complete in less than 3 minutes.
- timeout = time.time() + 180
+ # Tests must complete in less than 30 seconds.
+ timeout = time.time() + 30
terminated = False
# Now start node manager
if code != 0:
detail.error("Check failed")
if not terminated:
- p.terminate()
+ p.kill()
terminated = True
if terminated:
detail.error("Exceeded timeout with actions remaining: %s", actions)
code += 1
if not terminated:
- p.terminate()
+ p.kill()
terminated = True
k, v = actions[0]
code += v(g)
if code != 0:
detail.error("Action failed")
- p.terminate()
+ p.kill()
terminated = True
if not actions:
- p.terminate()
+ p.kill()
terminated = True
except KeyboardInterrupt:
p.kill()
],
# Checks (things that shouldn't happen)
{
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 4),
r".*Setting node quota.*": fail,
},
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown)
],
# Checks (things that shouldn't happen)
{
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 2),
r".*Sending create_node request.*": partial(expect_count, 5)
},
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", node_paired),
(r".*ComputeNodeMonitorActor\..*\.([^[]*).*Not eligible for shut down because node state is \('busy', 'open', .*\)", node_busy),
- (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", remaining_jobs),
+ (r".*ComputeNodeMonitorActor\..*\.([^[]*).*Suggesting shutdown because node state is \('idle', 'open', .*\)", noop),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*ComputeNodeShutdownActor\..*\.([^[]*).*Shutdown success", node_shutdown),
(r".*sending request", jobs_req),
],
# Checks (things that shouldn't happen)
{
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 6),
r".*Sending create_node request.*": partial(expect_count, 9)
},
# Actions (pattern -> action)
[
(r".*Daemon started", set_squeue),
- (r".*Rate limit exceeded - scheduling retry in 12 seconds", noop),
(r".*Rate limit exceeded - scheduling retry in 2 seconds", noop),
+ (r".*Rate limit exceeded - scheduling retry in 1 seconds", noop),
(r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)", noop),
],
# Checks (things that shouldn't happen)
],
# Checks (things that shouldn't happen)
{
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
],
# Checks (things that shouldn't happen)
{
- r".*Suggesting shutdown because node state is \('down', .*\)": fail,
r".*Cloud node (\S+) is now paired with Arvados node (\S+) with hostname (\S+)": partial(expect_count, 1),
r".*Setting node quota.*": fail,
},
time_mock.return_value += 200
self.assertEqual(961, timer.next_opening())
self.assertFalse(timer.window_open())
+
+
+class ArvadosTimestamp(unittest.TestCase):
+ def test_arvados_timestamp(self):
+ self.assertEqual(1527710178, cnode.arvados_timestamp('2018-05-30T19:56:18Z'))
+ self.assertEqual(1527710178.999371, cnode.arvados_timestamp('2018-05-30T19:56:18.999371Z'))
self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
(False, "node state is ('unpaired', 'open', 'boot wait', 'idle exceeded')"))
+ def test_shutdown_when_invalid_cloud_node_size(self):
+ self.make_mocks(1)
+ self.cloud_mock.size.id = 'invalid'
+ self.cloud_mock.extra['arvados_node_size'] = 'stale.type'
+ self.make_actor()
+ self.shutdowns._set_state(True, 600)
+ self.assertEquals((True, "node's size tag 'stale.type' not recognizable"),
+ self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+
def test_shutdown_without_arvados_node(self):
self.make_actor(start_time=0)
self.shutdowns._set_state(True, 600)
for an_error, is_cloud_error in errors:
self.driver_mock().create_node.side_effect = an_error
with self.assertRaises(an_error):
- driver.create_node('1', 'id_1')
+ driver.create_node(testutil.MockSize(1), 'id_1')
if is_cloud_error:
error_count += 1
self.assertEqual(error_count, status.tracker.get('create_node_errors'))
self.assertIn('ping_secret=ssshh',
create_method.call_args[1].get('ex_tags', {}).get('arv-ping-url', ""))
+ def test_create_includes_arvados_node_size(self):
+ arv_node = testutil.arvados_node_mock()
+ arv_node["hostname"] = None
+ size = testutil.MockSize(1)
+ driver = self.new_driver()
+ driver.create_node(size, arv_node)
+ create_method = self.driver_mock().create_node
+ self.assertTrue(create_method.called)
+ self.assertIn(
+ ('arvados_node_size', size.id),
+ create_method.call_args[1].get('ex_tags', {'tags': 'missing'}).items()
+ )
+
def test_name_from_new_arvados_node(self):
arv_node = testutil.arvados_node_mock(hostname=None)
driver = self.new_driver()
self.assertEqual('compute-000000000000063-zzzzz',
driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['name'])
-
-
def check_node_tagged(self, cloud_node, expected_tags):
tag_mock = self.driver_mock().ex_create_tags
self.assertTrue(tag_mock.called)
""",
driver.arvados_create_kwargs(testutil.MockSize(1), arv_node)['ex_customdata'])
+ def test_list_nodes_ignores_nodes_without_tags(self):
+ driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
+ # Mock cloud node without tags
+ nodelist = [testutil.cloud_node_mock(1)]
+ self.driver_mock().list_nodes.return_value = nodelist
+ n = driver.list_nodes()
+ self.assertEqual([], n)
+
def test_create_raises_but_actually_succeeded(self):
arv_node = testutil.arvados_node_mock(1, hostname=None)
driver = self.new_driver(create_kwargs={"tag_arvados-class": "dynamic-compute"})
driver.create_node(testutil.MockSize(1), arv_node)
create_method = self.driver_mock().create_node
self.assertTrue(create_method.called)
+ self.assertIn(
+ ('test', 'testvalue'),
+ create_method.call_args[1].get('ex_metadata', {'arg': 'missing'}).items()
+ )
+
+ def test_create_includes_arvados_node_size(self):
+ arv_node = testutil.arvados_node_mock()
+ size = testutil.MockSize(1)
+ driver = self.new_driver()
+ driver.create_node(size, arv_node)
+ create_method = self.driver_mock().create_node
+ self.assertTrue(create_method.called)
+ self.assertIn(
+ ('arvados_node_size', size.id),
+ create_method.call_args[1].get('ex_metadata', {'arg': 'missing'}).items()
+ )
+
+ def test_create_preemptable_instance(self):
+ arv_node = testutil.arvados_node_mock()
+ driver = self.new_driver()
+ driver.create_node(testutil.MockSize(1, preemptable=True), arv_node)
+ create_method = self.driver_mock().create_node
+ self.assertTrue(create_method.called)
self.assertEqual(
- {'test':'testvalue'},
- create_method.call_args[1].get('ex_metadata', {'arg': 'missing'})
+ True,
+ create_method.call_args[1].get('ex_spot_market', 'arg missing')
)
def test_hostname_from_arvados_node(self):
metadata = self.driver_mock().create_node.call_args[1]['ex_metadata']
self.assertIn('ping_secret=ssshh', metadata.get('arv-ping-url'))
+ def test_create_includes_arvados_node_size(self):
+ arv_node = testutil.arvados_node_mock()
+ size = testutil.MockSize(1)
+ driver = self.new_driver()
+ driver.create_node(size, arv_node)
+ create_method = self.driver_mock().create_node
+ self.assertIn(
+ ('arvados_node_size', size.id),
+ create_method.call_args[1].get('ex_metadata', {'metadata':'missing'}).items()
+ )
+
def test_create_raises_but_actually_succeeded(self):
arv_node = testutil.arvados_node_mock(1, hostname=None)
driver = self.new_driver()
cores = 1
price = 0.8
+[Size 1.preemptable]
+instance_type = 1
+preemptable = true
+cores = 1
+price = 0.8
+
[Logging]
file = /dev/null
level = DEBUG
def test_list_sizes(self):
config = self.load_config()
- client = config.new_cloud_client()
- sizes = config.node_sizes(client.list_sizes())
- self.assertEqual(1, len(sizes))
+ sizes = config.node_sizes()
+ self.assertEqual(2, len(sizes))
size, kwargs = sizes[0]
self.assertEqual('Small', size.name)
self.assertEqual(1, kwargs['cores'])
self.assertEqual(0.8, kwargs['price'])
+ # preemptable is False by default
+ self.assertEqual(False, kwargs['preemptable'])
+ # instance_type == arvados node size id by default
+ self.assertEqual(kwargs['id'], kwargs['instance_type'])
+ # Now retrieve the preemptable version
+ size, kwargs = sizes[1]
+ self.assertEqual('Small', size.name)
+ self.assertEqual('1.preemptable', kwargs['id'])
+ self.assertEqual(1, kwargs['cores'])
+ self.assertEqual(0.8, kwargs['price'])
+ self.assertEqual(True, kwargs['preemptable'])
+ self.assertEqual('1', kwargs['instance_type'])
+
def test_default_node_mem_scaling(self):
config = self.load_config()
from arvnodeman.computenode.dispatch import ComputeNodeMonitorActor
from . import testutil
from . import test_status
+from . import pykka_timeout
import logging
class NodeManagerDaemonActorTestCase(testutil.ActorTestMixin,
unittest.TestCase):
+ def assertwait(self, f, timeout=pykka_timeout*2):
+ deadline = time.time() + timeout
+ while True:
+ try:
+ return f()
+ except AssertionError:
+ if time.time() > deadline:
+ raise
+ pass
+ time.sleep(.1)
+ self.daemon.ping().get(self.TIMEOUT)
+
def busywait(self, f):
for n in xrange(200):
ok = f()
self.assertIn('node_quota', status.tracker._latest)
def check_monitors_arvados_nodes(self, *arv_nodes):
- self.busywait(lambda: len(arv_nodes) == len(self.monitored_arvados_nodes()))
- self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes())
+ self.assertwait(lambda: self.assertItemsEqual(arv_nodes, self.monitored_arvados_nodes()))
def test_node_pairing(self):
cloud_node = testutil.cloud_node_mock(1)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
arvados_nodes=[testutil.arvados_node_mock(1),
testutil.arvados_node_mock(2, last_ping_at='1970-01-01T01:02:03.04050607Z')],
want_sizes=[size])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
get_cloud_node = mock.MagicMock(name="get_cloud_node")
get_cloud_node.get.return_value = cloud_nodes[1]
mock_node_monitor = mock.MagicMock()
self.daemon.cloud_nodes.get()[cloud_nodes[1].id].shutdown_actor = mock_shutdown.proxy()
- self.busywait(lambda: 2 == self.alive_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.alive_monitor_count()))
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.busywait(lambda: 1 == self.node_shutdown.start.call_count)
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
[size], avail_sizes=[(size, {"cores":1})])
- self.busywait(lambda: 1 == self.paired_monitor_count())
- self.busywait(lambda: self.node_setup.start.called)
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
+ self.assertwait(lambda: self.assertEqual(1, self.node_setup.start.called))
def test_boot_new_node_below_min_nodes(self):
min_size = testutil.MockSize(1)
arv_node = testutil.arvados_node_mock(1)
size = testutil.MockSize(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], want_sizes=[size])
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
cloud_node = testutil.cloud_node_mock(1)
arv_node = testutil.arvados_node_mock(1)
self.make_daemon(cloud_nodes=[cloud_node], arvados_nodes=[arv_node], min_nodes=1)
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.stop_proxy(self.daemon)
arv_nodes = [testutil.arvados_node_mock(3, job_uuid=True),
testutil.arvados_node_mock(4, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = False
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.last_shutdown.success.get.return_value = True
self.last_shutdown.stop.side_effect = lambda: monitor.stop()
self.daemon.node_finished_shutdown(self.last_shutdown).get(self.TIMEOUT)
- self.busywait(lambda: 0 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(0, self.paired_monitor_count()))
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
cloud_node = testutil.cloud_node_mock(7)
self.make_daemon([cloud_node], [testutil.arvados_node_mock(7)],
max_nodes=1)
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
monitor = self.monitor_list()[0].proxy()
self.daemon.node_can_shutdown(monitor).get(self.TIMEOUT)
self.assertTrue(self.node_shutdown.start.called)
arv_nodes = [testutil.arvados_node_mock(n, size=size) for n in [8, 9]]
self.make_daemon(cloud_nodes, arv_nodes, [size],
avail_sizes=[(size, {"cores":1})])
- self.busywait(lambda: 2 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(2, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
self.daemon.node_can_shutdown(mon_ref.proxy()).get(self.TIMEOUT)
self.assertEqual(1, self.node_shutdown.start.call_count)
cloud_nodes = [testutil.cloud_node_mock(1, size=size)]
arv_nodes = [testutil.arvados_node_mock(1, job_uuid=None)]
self.make_daemon(cloud_nodes, arv_nodes, [size])
- self.busywait(lambda: 1 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(1, self.paired_monitor_count()))
for mon_ref in self.monitor_list():
monitor = mon_ref.proxy()
if monitor.cloud_node.get(self.TIMEOUT) is cloud_nodes[-1]:
testutil.arvados_node_mock(3)],
want_sizes=[small, small, big],
avail_sizes=avail_sizes)
- self.busywait(lambda: 3 == self.paired_monitor_count())
+ self.assertwait(lambda: self.assertEqual(3, self.paired_monitor_count()))
self.daemon.update_server_wishlist([small, big, big]).get(self.TIMEOUT)
self.assertEqual(0, self.node_shutdown.start.call_count)
def test_nonfatal_error(self):
status.tracker.update({'actor_exceptions': 0})
kill_mock = mock.Mock('os.kill')
- act = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock).tell_proxy()
+ bgact = BogusActor.start(OSError(errno.ENOENT, ""), killfunc=kill_mock)
+ act_thread = bgact.proxy().get_thread().get()
+ act = bgact.tell_proxy()
act.doStuff()
act.actor_ref.stop(block=True)
+ act_thread.join()
self.assertFalse(kill_mock.called)
self.assertEqual(1, status.tracker.get('actor_exceptions'))
self.public_ips = []
self.size = testutil.MockSize(1)
self.state = 0
+ self.extra = {'arvados_node_size': self.size.id}
def build_monitor(self, side_effect, *args, **kwargs):
class MockSize(object):
- def __init__(self, factor):
+ def __init__(self, factor, preemptable=False):
self.id = 'z{}.test'.format(factor)
self.name = 'test size '+self.id
self.ram = 128 * factor
self.bandwidth = 16 * factor
self.price = float(factor)
self.extra = {}
+ self.real = self
+ self.preemptable = preemptable
def __eq__(self, other):
return self.id == other.id
#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
# system one time tasks
PATH=/command:/sbin:/bin:/usr/sbin:/usr/bin
#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
PATH=/usr/local/bin:/usr/local/sbin:/bin:/sbin:/usr/bin:/usr/sbin:/usr/X11R6/bin
#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
exec 2>&1
PATH=/command:/sbin:/bin:/usr/sbin:/usr/bin
#!/bin/sh
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
exec 2>&1
PATH=/command:/sbin:/bin:/usr/sbin:/usr/bin
+// Copyright (c) 2009 Dan Vanderkam. All rights reserved.
+//
+// SPDX-License-Identifier: MIT
+
/**
* Synchronize zooming and/or selections between a set of dygraphs.
*
* You may also set `range: false` if you wish to only sync the x-axis.
* The `range` option has no effect unless `zoom` is true (the default).
*
- * SPDX-License-Identifier: MIT
* Original source: https://github.com/danvk/dygraphs/blob/master/src/extras/synchronizer.js
* at commit b55a71d768d2f8de62877c32b3aec9e9975ac389
*