MAINTAINER Brett Smith <brett@curoverse.com>
# Install build dependencies provided in base distribution
-RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar scl-utils centos-release-SCL postgresql-devel
+RUN yum -q -y install make automake gcc gcc-c++ libyaml-devel patch readline-devel zlib-devel libffi-devel openssl-devel bzip2 libtool bison sqlite-devel rpm-build git perl-ExtUtils-MakeMaker libattr-devel nss-devel libcurl-devel which tar unzip scl-utils centos-release-SCL postgresql-devel
# Install golang binary
ADD generated/golang-amd64.tar.gz /usr/local/
RUN touch /var/lib/rpm/* && yum -q -y install python27 python33
RUN scl enable python33 "easy_install-3.3 pip" && scl enable python27 "easy_install-2.7 pip"
+# fpm requires ffi which now wants xz-libs-5 which isn't packaged for centos6
+# but the library from xz-libs-4.999 appears to be good enough.
+RUN ln -s /usr/lib64/liblzma.so.0 /usr/lib64/lzma.so.5
+
RUN cd /tmp && \
curl -OL 'http://pkgs.repoforge.org/rpmforge-release/rpmforge-release-0.5.3-1.el6.rf.x86_64.rpm' && \
rpm -ivh rpmforge-release-0.5.3-1.el6.rf.x86_64.rpm && \
MAINTAINER Ward Vandewege <ward@curoverse.com>
# Install dependencies and set up system.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libpq-dev python-pip
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libpq-dev python-pip unzip
# Install RVM
RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
MAINTAINER Ward Vandewege <ward@curoverse.com>
# Install dependencies and set up system.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git procps libattr1-dev libfuse-dev libgnutls28-dev libpq-dev python-pip unzip
# Install RVM
RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
MAINTAINER Ward Vandewege <ward@curoverse.com>
# Install dependencies and set up system.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip build-essential unzip
# Install RVM
RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
MAINTAINER Brett Smith <brett@curoverse.com>
# Install dependencies and set up system.
-RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip
+RUN /usr/bin/apt-get update && /usr/bin/apt-get install -q -y python2.7-dev python3 python-setuptools python3-setuptools libcurl4-gnutls-dev curl git libattr1-dev libfuse-dev libpq-dev python-pip unzip
# Install RVM
RUN gpg --keyserver pool.sks-keyservers.net --recv-keys D39DC0E3 && \
crunchstat
keepproxy
keep-rsync
+ keep-block-check
keepstore
keep-web
libarvados-perl"
PYTHON2_PKG_PREFIX=python
PYTHON3_PACKAGE=python$PYTHON3_VERSION
PYTHON3_PKG_PREFIX=python3
- PYTHON_BACKPORTS=(python-gflags google-api-python-client==1.4.2 \
+ PYTHON_BACKPORTS=(python-gflags==2.0 google-api-python-client==1.4.2 \
oauth2client==1.5.2 pyasn1==0.1.7 pyasn1-modules==0.0.5 \
rsa uritemplate httplib2 ws4py pykka six pyexecjs jsonschema \
ciso8601 pycrypto backports.ssl_match_hostname llfuse==0.41.1 \
PYTHON2_PKG_PREFIX=python
PYTHON3_PACKAGE=python$PYTHON3_VERSION
PYTHON3_PKG_PREFIX=python3
- PYTHON_BACKPORTS=(python-gflags google-api-python-client==1.4.2 \
+ PYTHON_BACKPORTS=(python-gflags==2.0 google-api-python-client==1.4.2 \
oauth2client==1.5.2 pyasn1==0.1.7 pyasn1-modules==0.0.5 \
rsa uritemplate httplib2 ws4py pykka six pyexecjs jsonschema \
ciso8601 pycrypto backports.ssl_match_hostname llfuse==0.41.1 \
PYTHON2_PKG_PREFIX=python
PYTHON3_PACKAGE=python$PYTHON3_VERSION
PYTHON3_PKG_PREFIX=python3
- PYTHON_BACKPORTS=(python-gflags google-api-python-client==1.4.2 \
+ PYTHON_BACKPORTS=(python-gflags==2.0 google-api-python-client==1.4.2 \
oauth2client==1.5.2 pyasn1==0.1.7 pyasn1-modules==0.0.5 \
rsa uritemplate httplib2 ws4py pykka six pyexecjs jsonschema \
ciso8601 pycrypto backports.ssl_match_hostname llfuse==0.41.1 \
PYTHON2_PKG_PREFIX=$PYTHON2_PACKAGE
PYTHON3_PACKAGE=$(rpm -qf "$(which python$PYTHON3_VERSION)" --queryformat '%{NAME}\n')
PYTHON3_PKG_PREFIX=$PYTHON3_PACKAGE
- PYTHON_BACKPORTS=(python-gflags google-api-python-client==1.4.2 \
+ PYTHON_BACKPORTS=(python-gflags==2.0 google-api-python-client==1.4.2 \
oauth2client==1.5.2 pyasn1==0.1.7 pyasn1-modules==0.0.5 \
rsa uritemplate httplib2 ws4py pykka six pyexecjs jsonschema \
ciso8601 pycrypto backports.ssl_match_hostname 'pycurl<7.21.5' \
rpm2cpio ${LIBFUSE_DIR}/fuse-2.9.2-6.el7.src.rpm | cpio -i
perl -pi -e 's/Conflicts:\s*filesystem.*//g' fuse.spec
)
- # build rpms from source
+ # build rpms from source
rpmbuild -bb /root/rpmbuild/SOURCES/fuse.spec
rm -f fuse-2.9.2-6.el7.src.rpm
# move built RPMs to LIBFUSE_DIR
"Gather cpu/memory/network statistics of running Crunch jobs"
package_go_binary tools/keep-rsync keep-rsync \
"Copy all data from one set of Keep servers to another"
+package_go_binary tools/keep-block-check keep-block-check \
+ "Verify that all data from one set of Keep servers to another was copied"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
fpm --maintainer='Ward Vandewege <ward@curoverse.com>' -s python -t $FORMAT --exclude=*/dist-packages/tests/* --exclude=*/site-packages/tests/* --deb-ignore-iteration-in-dependencies -n "${PYTHON2_PKG_PREFIX}-schema-salad" --iteration 1 --python-bin python2.7 --python-easyinstall "$EASY_INSTALL2" --python-package-name-prefix "$PYTHON2_PKG_PREFIX" --depends "$PYTHON2_PACKAGE" -v 1.7.20160316203940 schema_salad
# And for cwltool we have the same problem as for schema_salad. Ward, 2016-03-17
-fpm --maintainer='Ward Vandewege <ward@curoverse.com>' -s python -t $FORMAT --exclude=*/dist-packages/tests/* --exclude=*/site-packages/tests/* --deb-ignore-iteration-in-dependencies -n "${PYTHON2_PKG_PREFIX}-cwltool" --iteration 1 --python-bin python2.7 --python-easyinstall "$EASY_INSTALL2" --python-package-name-prefix "$PYTHON2_PKG_PREFIX" --depends "$PYTHON2_PACKAGE" -v 1.0.20160325200114 cwltool
+fpm --maintainer='Ward Vandewege <ward@curoverse.com>' -s python -t $FORMAT --exclude=*/dist-packages/tests/* --exclude=*/site-packages/tests/* --deb-ignore-iteration-in-dependencies -n "${PYTHON2_PKG_PREFIX}-cwltool" --iteration 1 --python-bin python2.7 --python-easyinstall "$EASY_INSTALL2" --python-package-name-prefix "$PYTHON2_PKG_PREFIX" --depends "$PYTHON2_PACKAGE" -v 1.0.20160421140153 cwltool
# FPM eats the trailing .0 in the python-rdflib-jsonld package when built with 'rdflib-jsonld>=0.3.0'. Force the version. Ward, 2016-03-25
fpm --maintainer='Ward Vandewege <ward@curoverse.com>' -s python -t $FORMAT --exclude=*/dist-packages/tests/* --exclude=*/site-packages/tests/* --deb-ignore-iteration-in-dependencies --verbose --log info -n "${PYTHON2_PKG_PREFIX}-rdflib-jsonld" --iteration 1 --python-bin python2.7 --python-easyinstall "$EASY_INSTALL2" --python-package-name-prefix "$PYTHON2_PKG_PREFIX" --depends "$PYTHON2_PACKAGE" -v 0.3.0 rdflib-jsonld
set -e
cd "$pyfpm_workdir"
pip install "${PIP_DOWNLOAD_SWITCHES[@]}" --download . "$deppkg"
- tar -xf "$deppkg"-*.tar*
+ # Sometimes pip gives us a tarball, sometimes a zip file...
+ DOWNLOADED=`ls $deppkg-*`
+ [[ "$DOWNLOADED" =~ ".tar" ]] && tar -xf $DOWNLOADED
+ [[ "$DOWNLOADED" =~ ".zip" ]] && unzip $DOWNLOADED
cd "$deppkg"-*/
"python$PYTHON2_VERSION" setup.py $DASHQ_UNLESS_DEBUG egg_info build
chmod -R go+rX .
sdk/cwl
tools/crunchstat-summary
tools/keep-rsync
+tools/keep-block-check
EOF
services/crunch-dispatch-slurm
services/crunch-run
tools/keep-rsync
+ tools/keep-block-check
)
for g in "${gostuff[@]}"
do
args.submit = False
args.debug = True
args.quiet = False
+ args.ignore_docker_for_reuse = False
outputObj = runner.arvExecutor(t, job_order_object, "", args, cwl_runner_job={"uuid": arvados.current_job()["uuid"], "state": arvados.current_job()["state"]})
files = {}
$ arvbox
Arvados-in-a-box http://arvados.org
-arvbox (build|start|run|open|shell|ip|stop|rebuild|reset|destroy|log|svrestart)
-
-build <config> build arvbox Docker image
+build <config> build arvbox Docker image
+rebuild <config> build arvbox Docker image, no layer cache
start|run <config> start arvbox container
open open arvbox workbench in a web browser
shell enter arvbox shell
status print some information about current arvbox
stop stop arvbox container
restart <config> stop, then run again
-rebuild <config> stop, build arvbox Docker image, run
+reboot <config> stop, build arvbox Docker image, run
reset delete arvbox arvados data (be careful!)
destroy delete all arvbox code and data (be careful!)
log <service> tail log of specified service
In "dev" and "localdemo" mode, Arvbox can only be accessed on the same host it is running on. To publish Arvbox service ports to the host's service ports and advertise the host's IP address for services, use @publicdev@ or @publicdemo@:
<pre>
-$ arvbox rebuild publicdemo
+$ arvbox start publicdemo
</pre>
This attempts to auto-detect the correct IP address to use by taking the IP address of the default route device. If the auto-detection is wrong, if you want to publish a hostname instead of a raw address, or if you need to access it through a different device (such as a router or firewall), set @ARVBOX_PUBLISH_IP@ to the desired hostname or IP address.
<pre>
$ export ARVBOX_PUBLISH_IP=example.com
-$ arvbox rebuild publicdemo
+$ arvbox start publicdemo
</pre>
Note: this expects to bind the host's port 80 (http) for workbench, so you cannot have a conflicting web server already running on the host. It does not attempt to bind the host's port 22 (ssh); as a result the arvbox ssh port is not published.
On Debian-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install perl python-virtualenv fuse python-arvados-python-client python-arvados-fuse crunchstat arvados-docker-cleaner iptables ca-certificates</span>
+<pre><code>~$ <span class="userinput">sudo apt-get install perl python-virtualenv fuse python-arvados-python-client python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
</code></pre>
</notextile>
On Red Hat-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install perl python27-python-virtualenv fuse python27-python-arvados-python-client python27-python-arvados-fuse crunchstat arvados-docker-cleaner iptables ca-certificates</span>
+<pre><code>~$ <span class="userinput">sudo yum install perl python27-python-virtualenv fuse python27-python-arvados-python-client python27-python-arvados-fuse crunchrunner crunchstat arvados-docker-cleaner iptables ca-certificates</span>
</code></pre>
</notextile>
-azure-storage-account-name="": Azure storage account name used for subsequent --azure-storage-container-volume arguments.
-azure-storage-container-volume=[]: Use the given container as a storage volume. Can be given multiple times.
-azure-storage-replication=3: Replication level to report to clients when data is stored in an Azure container.
- -blob-signature-ttl=1209600: Lifetime of blob permission signatures. See services/api/config/application.default.yml.
+ -blob-signature-ttl=1209600: Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. See services/api/config/application.default.yml.
-blob-signing-key-file="": File containing the secret key for generating and verifying blob permission signatures.
-data-manager-token-file="": File with the API token used by the Data Manager. All DELETE requests or GET /index requests must carry this token.
-enforce-permissions=false: Enforce permission signatures on requests.
On Debian-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo apt-get install python-arvados-python-client python-arvados-fuse</span>
+<pre><code>~$ <span class="userinput">sudo apt-get install python-arvados-python-client python-arvados-fuse crunchrunner</span>
</code></pre>
</notextile>
On Red Hat-based systems:
<notextile>
-<pre><code>~$ <span class="userinput">sudo yum install python27-python-arvados-python-client python27-python-arvados-fuse</span>
+<pre><code>~$ <span class="userinput">sudo yum install python27-python-arvados-python-client python27-python-arvados-fuse crunchrunner</span>
</code></pre>
</notextile>
require 'trollop'
require 'google/api_client'
rescue LoadError => l
- puts $:
+ $stderr.puts $:
abort <<-EOS
#{$0}: fatal: #{l.message}
Some runtime dependencies may be missing.
abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
end
elsif not $options[:template]
- puts "error: you must supply a --template or --instance."
+ $stderr.puts "error: you must supply a --template or --instance."
p.educate
abort
end
.q{&& SWAP=$(awk '($1 == "SwapTotal:"){print $2}' </proc/meminfo) }
."&& MEMLIMIT=\$(( (\$MEM * 95) / ($ENV{CRUNCH_NODE_SLOTS} * 100) )) "
."&& let SWAPLIMIT=\$MEMLIMIT+\$SWAP "
- # $VOLUME_CRUNCHRUNNER and $VOLUME_CERTS will be passed unquoted as
- # arguments to `docker run`. They must contain their own quoting.
- .q{&& VOLUME_CRUNCHRUNNER="" VOLUME_CERTS="" }
- .q{&& if which crunchrunner >/dev/null ; then VOLUME_CRUNCHRUNNER=--volume=$(which crunchrunner):/usr/local/bin/crunchrunner ; fi }
- .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUME_CERTS=--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt ; }
- .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUME_CERTS=--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt ; fi };
+ .q{&& declare -a VOLUMES=() }
+ .q{&& if which crunchrunner >/dev/null ; then VOLUMES+=("--volume=$(which crunchrunner):/usr/local/bin/crunchrunner") ; fi }
+ .q{&& if test -f /etc/ssl/certs/ca-certificates.crt ; then VOLUMES+=("--volume=/etc/ssl/certs/ca-certificates.crt:/etc/arvados/ca-certificates.crt") ; }
+ .q{elif test -f /etc/pki/tls/certs/ca-bundle.crt ; then VOLUMES+=("--volume=/etc/pki/tls/certs/ca-bundle.crt:/etc/arvados/ca-certificates.crt") ; fi };
$command .= "&& exec arv-mount --read-write --mount-by-pdh=by_pdh --mount-tmp=tmp --crunchstat-interval=10 --allow-other $arv_file_cache \Q$keep_mnt\E --exec ";
$ENV{TASK_KEEPMOUNT} = "$keep_mnt/by_pdh";
# Bind mount the crunchrunner binary and host TLS certificates file into
# the container.
- $command .= "\$VOLUME_CRUNCHRUNNER \$VOLUME_CERTS ";
+ $command .= '"${VOLUMES[@]}" ';
while (my ($env_key, $env_val) = each %ENV)
{
'bin/arvados-cwl-runner'
],
install_requires=[
- 'cwltool>=1.0.20160325200114',
+ 'cwltool==1.0.20160421140153',
'arvados-python-client>=0.1.20160322001610'
],
test_suite='tests',
}
MD5CollisionMD5 = "cee9a457e790cf20d4bdaa6d69f01e41"
)
+
+const BlobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
}
var buf = make([]byte, fs.Offset+fs.Len)
_, err = io.ReadFull(rdr, buf)
+ errClosing := rdr.Close()
+ if err == nil {
+ err = errClosing
+ }
if err != nil {
r.err = err
close(r.errNotNil)
c.Check(err, check.NotNil)
c.Check(err, check.Not(check.Equals), io.EOF)
}
+ c.Check(rdr.Close(), check.NotNil)
}
// makePermSignature generates a SHA-1 HMAC digest for the given blob,
// token, expiry, and site secret.
-func makePermSignature(blobHash, apiToken, expiry string, permissionSecret []byte) string {
+func makePermSignature(blobHash, apiToken, expiry, blobSignatureTTL string, permissionSecret []byte) string {
hmac := hmac.New(sha1.New, permissionSecret)
hmac.Write([]byte(blobHash))
hmac.Write([]byte("@"))
hmac.Write([]byte(apiToken))
hmac.Write([]byte("@"))
hmac.Write([]byte(expiry))
+ hmac.Write([]byte("@"))
+ hmac.Write([]byte(blobSignatureTTL))
digest := hmac.Sum(nil)
return fmt.Sprintf("%x", digest)
}
//
// This function is intended to be used by system components and admin
// utilities: userland programs do not know the permissionSecret.
-func SignLocator(blobLocator, apiToken string, expiry time.Time, permissionSecret []byte) string {
+func SignLocator(blobLocator, apiToken string, expiry time.Time, blobSignatureTTL time.Duration, permissionSecret []byte) string {
if len(permissionSecret) == 0 || apiToken == "" {
return blobLocator
}
// Strip off all hints: only the hash is used to sign.
blobHash := strings.Split(blobLocator, "+")[0]
timestampHex := fmt.Sprintf("%08x", expiry.Unix())
+ blobSignatureTTLHex := strconv.FormatInt(int64(blobSignatureTTL.Seconds()), 16)
return blobLocator +
- "+A" + makePermSignature(blobHash, apiToken, timestampHex, permissionSecret) +
+ "+A" + makePermSignature(blobHash, apiToken, timestampHex, blobSignatureTTLHex, permissionSecret) +
"@" + timestampHex
}
//
// This function is intended to be used by system components and admin
// utilities: userland programs do not know the permissionSecret.
-func VerifySignature(signedLocator, apiToken string, permissionSecret []byte) error {
+func VerifySignature(signedLocator, apiToken string, blobSignatureTTL time.Duration, permissionSecret []byte) error {
matches := signedLocatorRe.FindStringSubmatch(signedLocator)
if matches == nil {
return ErrSignatureMissing
} else if expiryTime.Before(time.Now()) {
return ErrSignatureExpired
}
- if signatureHex != makePermSignature(blobHash, apiToken, expiryHex, permissionSecret) {
+ blobSignatureTTLHex := strconv.FormatInt(int64(blobSignatureTTL.Seconds()), 16)
+ if signatureHex != makePermSignature(blobHash, apiToken, expiryHex, blobSignatureTTLHex, permissionSecret) {
return ErrSignatureInvalid
}
return nil
"gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
"vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
"786u5rw2a9gx743dj3fgq2irk"
- knownSignature = "257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a"
+ knownSignature = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
knownTimestamp = "7fffffff"
knownSigHint = "+A" + knownSignature + "@" + knownTimestamp
knownSignedLocator = knownLocator + knownSigHint
+ blobSignatureTTL = 1209600 * time.Second
)
func TestSignLocator(t *testing.T) {
if ts, err := parseHexTimestamp(knownTimestamp); err != nil {
t.Errorf("bad knownTimestamp %s", knownTimestamp)
} else {
- if knownSignedLocator != SignLocator(knownLocator, knownToken, ts, []byte(knownKey)) {
+ if knownSignedLocator != SignLocator(knownLocator, knownToken, ts, blobSignatureTTL, []byte(knownKey)) {
t.Fail()
}
}
}
func TestVerifySignature(t *testing.T) {
- if VerifySignature(knownSignedLocator, knownToken, []byte(knownKey)) != nil {
+ if VerifySignature(knownSignedLocator, knownToken, blobSignatureTTL, []byte(knownKey)) != nil {
t.Fail()
}
}
func TestVerifySignatureExtraHints(t *testing.T) {
- if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint, knownToken, []byte(knownKey)) != nil {
+ if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint, knownToken, blobSignatureTTL, []byte(knownKey)) != nil {
t.Fatal("Verify cannot handle hint before permission signature")
}
- if VerifySignature(knownLocator+knownSigHint+"+Zfoo", knownToken, []byte(knownKey)) != nil {
+ if VerifySignature(knownLocator+knownSigHint+"+Zfoo", knownToken, blobSignatureTTL, []byte(knownKey)) != nil {
t.Fatal("Verify cannot handle hint after permission signature")
}
- if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint+"+Zfoo", knownToken, []byte(knownKey)) != nil {
+ if VerifySignature(knownLocator+"+K@xyzzy"+knownSigHint+"+Zfoo", knownToken, blobSignatureTTL, []byte(knownKey)) != nil {
t.Fatal("Verify cannot handle hints around permission signature")
}
}
// The size hint on the locator string should not affect signature validation.
func TestVerifySignatureWrongSize(t *testing.T) {
- if VerifySignature(knownHash+"+999999"+knownSigHint, knownToken, []byte(knownKey)) != nil {
+ if VerifySignature(knownHash+"+999999"+knownSigHint, knownToken, blobSignatureTTL, []byte(knownKey)) != nil {
t.Fatal("Verify cannot handle incorrect size hint")
}
- if VerifySignature(knownHash+knownSigHint, knownToken, []byte(knownKey)) != nil {
+ if VerifySignature(knownHash+knownSigHint, knownToken, blobSignatureTTL, []byte(knownKey)) != nil {
t.Fatal("Verify cannot handle missing size hint")
}
}
func TestVerifySignatureBadSig(t *testing.T) {
badLocator := knownLocator + "+Aaaaaaaaaaaaaaaa@" + knownTimestamp
- if VerifySignature(badLocator, knownToken, []byte(knownKey)) != ErrSignatureMissing {
+ if VerifySignature(badLocator, knownToken, blobSignatureTTL, []byte(knownKey)) != ErrSignatureMissing {
t.Fail()
}
}
func TestVerifySignatureBadTimestamp(t *testing.T) {
badLocator := knownLocator + "+A" + knownSignature + "@OOOOOOOl"
- if VerifySignature(badLocator, knownToken, []byte(knownKey)) != ErrSignatureMissing {
+ if VerifySignature(badLocator, knownToken, blobSignatureTTL, []byte(knownKey)) != ErrSignatureMissing {
t.Fail()
}
}
func TestVerifySignatureBadSecret(t *testing.T) {
- if VerifySignature(knownSignedLocator, knownToken, []byte("00000000000000000000")) != ErrSignatureInvalid {
+ if VerifySignature(knownSignedLocator, knownToken, blobSignatureTTL, []byte("00000000000000000000")) != ErrSignatureInvalid {
t.Fail()
}
}
func TestVerifySignatureBadToken(t *testing.T) {
- if VerifySignature(knownSignedLocator, "00000000", []byte(knownKey)) != ErrSignatureInvalid {
+ if VerifySignature(knownSignedLocator, "00000000", blobSignatureTTL, []byte(knownKey)) != ErrSignatureInvalid {
t.Fail()
}
}
func TestVerifySignatureExpired(t *testing.T) {
yesterday := time.Now().AddDate(0, 0, -1)
- expiredLocator := SignLocator(knownHash, knownToken, yesterday, []byte(knownKey))
- if VerifySignature(expiredLocator, knownToken, []byte(knownKey)) != ErrSignatureExpired {
+ expiredLocator := SignLocator(knownHash, knownToken, yesterday, blobSignatureTTL, []byte(knownKey))
+ if VerifySignature(expiredLocator, knownToken, blobSignatureTTL, []byte(knownKey)) != ErrSignatureExpired {
t.Fail()
}
}
return ch
}
-// Blocks may appear mulitple times within the same manifest if they
+// Blocks may appear multiple times within the same manifest if they
// are used by multiple files. In that case this Iterator will output
// the same block multiple times.
//
Meanwhile, the transfer() function selects() on two channels, the "requests"
channel and the "slices" channel.
-When a message is recieved on the "slices" channel, this means the a new
+When a message is received on the "slices" channel, this means that a new
section of the buffer has data, or an error is signaled. Since the data has
been read directly into the source_buffer, it is able to simply increase the
size of the body slice to encompass the newly filled in section. Then any
pending reads are serviced with handleReadRequest (described below).
-When a message is recieved on the "requests" channel, it means a StreamReader
+When a message is received on the "requests" channel, it means a StreamReader
wants access to a slice of the buffer. This is passed to handleReadRequest().
The handleReadRequest() function takes a sliceRequest consisting of a buffer
put_args += ['--name', collection_name]
coll_uuid = arv_put.main(
- put_args + ['--filename', outfile_name, image_file.name]).strip()
+ put_args + ['--filename', outfile_name, image_file.name], stdout=stdout).strip()
# Read the image metadata and make Arvados links from it.
image_file.seek(0)
s.files = ["lib/arvados.rb", "lib/arvados/google_api_client.rb",
"lib/arvados/collection.rb", "lib/arvados/keep.rb",
"README", "LICENSE-2.0.txt"]
- s.required_ruby_version = '>= 2.1.0'
+ s.required_ruby_version = '>= 1.8.7'
# activesupport <4.2.6 only because https://dev.arvados.org/issues/8222
- s.add_dependency('activesupport', '>= 3.2.13', '< 4.2.6')
+ s.add_dependency('activesupport', '>= 3', '< 4.2.6')
s.add_dependency('andand', '~> 1.3', '>= 1.3.3')
- s.add_dependency('google-api-client', '~> 0.6.3', '>= 0.6.3')
+ s.add_dependency('google-api-client', '>= 0.7', '< 0.9')
+ # work around undeclared dependency on i18n in some activesupport 3.x.x:
+ s.add_dependency('i18n', '~> 0')
s.add_dependency('json', '~> 1.7', '>= 1.7.7')
- s.add_runtime_dependency('jwt', '>= 0.1.5', '< 1.0.0')
+ s.add_runtime_dependency('jwt', '<2', '>= 0.1.5')
s.homepage =
'https://arvados.org'
end
:parameters => parameters,
:body_object => body,
:headers => {
- authorization: 'OAuth2 '+arvados.config['ARVADOS_API_TOKEN']
+ :authorization => 'OAuth2 '+arvados.config['ARVADOS_API_TOKEN']
})
resp = JSON.parse result.body, :symbolize_names => true
if resp[:errors]
elsif resp[:uuid] and resp[:etag]
self.new(resp)
elsif resp[:items].is_a? Array
- resp.merge(items: resp[:items].collect do |i|
+ resp.merge(:items => resp[:items].collect do |i|
self.new(i)
end)
else
end
def cp_r(source, target, source_collection=nil)
- opts = {descend_target: !source.end_with?("/")}
+ opts = {:descend_target => !source.end_with?("/")}
copy(:merge, source.chomp("/"), target, source_collection, opts)
end
end
def rm_r(source)
- remove(source, recursive: true)
+ remove(source, :recursive => true)
end
protected
modified
end
- LocatorSegment = Struct.new(:locators, :start_pos, :length)
+ Struct.new("LocatorSegment", :locators, :start_pos, :length)
class LocatorRange < Range
attr_reader :locator
end_index = search_for_byte(start_pos + length - 1, start_index)
end
seg_ranges = @ranges[start_index..end_index]
- LocatorSegment.new(seg_ranges.map(&:locator),
- start_pos - seg_ranges.first.begin,
- length)
+ Struct::LocatorSegment.new(seg_ranges.map(&:locator),
+ start_pos - seg_ranges.first.begin,
+ length)
end
private
raise ArgumentError.new "locator is nil or empty"
end
- m = LOCATOR_REGEXP.match(tok.strip)
+ m = LOCATOR_REGEXP.match(tok)
unless m
raise ArgumentError.new "not a valid locator #{tok}"
end
- tokhash, _, toksize, _, trailer = m[1..5]
+ tokhash, _, toksize, _, _, trailer = m[1..6]
tokhints = []
if trailer
trailer.split('+').each do |hint|
- if hint =~ /^[[:upper:]][[:alnum:]@_-]+$/
+ if hint =~ /^[[:upper:]][[:alnum:]@_-]*$/
tokhints.push(hint)
else
- raise ArgumentError.new "unknown hint #{hint}"
+ raise ArgumentError.new "invalid hint #{hint}"
end
end
end
[true, 'd41d8cd98f00b204e9800998ecf8427e+0', '+0','0',nil],
[true, 'd41d8cd98f00b204e9800998ecf8427e+0+Fizz+Buzz','+0','0','+Fizz+Buzz'],
[true, 'd41d8cd98f00b204e9800998ecf8427e+Fizz+Buzz', nil,nil,'+Fizz+Buzz'],
+ [true, 'd41d8cd98f00b204e9800998ecf8427e+0+Ad41d8cd98f00b204e9800998ecf8427e00000000+Foo', '+0','0','+Ad41d8cd98f00b204e9800998ecf8427e00000000+Foo'],
+ [true, 'd41d8cd98f00b204e9800998ecf8427e+Ad41d8cd98f00b204e9800998ecf8427e00000000+Foo', nil,nil,'+Ad41d8cd98f00b204e9800998ecf8427e00000000+Foo'],
[true, 'd41d8cd98f00b204e9800998ecf8427e+0+Z', '+0','0','+Z'],
[true, 'd41d8cd98f00b204e9800998ecf8427e+Z', nil,nil,'+Z'],
].each do |ok, locator, match2, match3, match4|
assert_equal match4, match[4]
end
end
+ define_method "test_parse_method_on_#{locator.inspect}" do
+ loc = Keep::Locator.parse locator
+ if !ok
+ assert_nil loc
+ else
+ refute_nil loc
+ assert loc.is_a?(Keep::Locator)
+ #assert loc.hash
+ #assert loc.size
+ #assert loc.hints.is_a?(Array)
+ end
+ end
end
[
[true, ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:\\040\n"],
[true, ". 00000000000000000000000000000000+0 0:0:0\n"],
[true, ". 00000000000000000000000000000000+0 0:0:d41d8cd98f00b204e9800998ecf8427e+0+Ad41d8cd98f00b204e9800998ecf8427e00000000@ffffffff\n"],
+ [true, ". d41d8cd98f00b204e9800998ecf8427e+0+Ad41d8cd98f00b204e9800998ecf8427e00000000@ffffffff 0:0:empty.txt\n"],
[false, '. d41d8cd98f00b204e9800998ecf8427e 0:0:abc.txt',
"Invalid manifest: does not end with newline"],
[false, "abc d41d8cd98f00b204e9800998ecf8427e 0:0:abc.txt\n",
end
timestamp_hex = timestamp.to_s(16)
# => "53163cb4"
+ blob_signature_ttl = Rails.configuration.blob_signature_ttl.to_s(16)
# Generate a signature.
signature =
generate_signature((opts[:key] or Rails.configuration.blob_signing_key),
- blob_hash, opts[:api_token], timestamp_hex)
+ blob_hash, opts[:api_token], timestamp_hex, blob_signature_ttl)
blob_locator + '+A' + signature + '@' + timestamp_hex
end
if timestamp.to_i(16) < (opts[:now] or db_current_time.to_i)
raise Blob::InvalidSignatureError.new 'Signature expiry time has passed.'
end
+ blob_signature_ttl = Rails.configuration.blob_signature_ttl.to_s(16)
my_signature =
generate_signature((opts[:key] or Rails.configuration.blob_signing_key),
- blob_hash, opts[:api_token], timestamp)
+ blob_hash, opts[:api_token], timestamp, blob_signature_ttl)
if my_signature != given_signature
raise Blob::InvalidSignatureError.new 'Signature is invalid.'
true
end
- def self.generate_signature key, blob_hash, api_token, timestamp
+ def self.generate_signature key, blob_hash, api_token, timestamp, blob_signature_ttl
OpenSSL::HMAC.hexdigest('sha1', key,
[blob_hash,
api_token,
- timestamp].join('@'))
+ timestamp,
+ blob_signature_ttl].join('@'))
end
end
# generate permission signatures for Keep locators. It must be
# identical to the permission key given to Keep. IMPORTANT: This is
# a site secret. It should be at least 50 characters.
+ #
+ # Modifying blob_signing_key will invalidate all existing
+ # signatures, which can cause programs to fail (e.g., arv-put,
+ # arv-get, and Crunch jobs). To avoid errors, rotate keys only when
+ # no such processes are running.
blob_signing_key: ~
# These settings are provided by your OAuth2 provider (e.g.,
# still has permission) the client can retrieve the collection again
# to get fresh signatures.
#
- # Datamanager considers an unreferenced block older than this to be
- # eligible for garbage collection. Therefore, it should never be
- # smaller than the corresponding value used by any local keepstore
- # service (see keepstore -blob-signature-ttl flag). This rule
- # prevents datamanager from trying to garbage-collect recently
- # written blocks while clients are still holding valid signatures.
+ # This must be exactly equal to the -blob-signature-ttl flag used by
+ # keepstore servers. Otherwise, reading data blocks and saving
+ # collections will fail with HTTP 403 permission errors.
+ #
+ # Modifying blob_signature_ttl invalidates existing signatures; see
+ # blob_signing_key note above.
#
# The default is 2 weeks.
blob_signature_ttl: 1209600
'vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei' +
'786u5rw2a9gx743dj3fgq2irk'
@@known_signed_locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3' +
- '+A257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a@7fffffff'
+ '+A89118b78732c33104a4d6231e8b5a5fa1e4301e3@7fffffff'
test 'generate predictable invincible signature' do
signed = Blob.sign_locator @@known_locator, {
Blob.verify_signature!(@@blob_locator, api_token: @@api_token, key: @@key)
end
end
+
+ test 'signature changes when ttl changes' do
+ signed = Blob.sign_locator @@known_locator, {
+ api_token: @@known_token,
+ key: @@known_key,
+ expire: 0x7fffffff,
+ }
+
+ original_ttl = Rails.configuration.blob_signature_ttl
+ Rails.configuration.blob_signature_ttl = original_ttl*2
+ signed2 = Blob.sign_locator @@known_locator, {
+ api_token: @@known_token,
+ key: @@known_key,
+ expire: 0x7fffffff,
+ }
+ Rails.configuration.blob_signature_ttl = original_ttl
+
+ assert_not_equal signed, signed2
+ end
end
}
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
-// dress rehersal of the Run() function, starting from a JSON container record.
+// dress rehearsal of the Run() function, starting from a JSON container record.
func FullRunHelper(c *C, record string, fn func(t *TestDockerClient)) (api *ArvTestClient, cr *ContainerRunner) {
rec := ContainerRecord{}
err := json.NewDecoder(strings.NewReader(record)).Decode(&rec)
t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
- c.Check(api.Calls, Equals, 8)
+ c.Assert(api.Calls, Equals, 8)
c.Check(api.Content[7]["container"].(arvadosclient.Dict)["log"], NotNil)
c.Check(api.Content[7]["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
c.Check(api.Content[7]["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
"time"
)
-// Useful to call at the begining of execution to log info about the
+// Useful to call at the beginning of execution to log info about the
// current run.
func LogRunInfo(arvLogger *logger.Logger) {
if arvLogger != nil {
blockToDesiredReplication map[blockdigest.DigestWithSize]int,
underReplicated BlockSet) (m map[Locator]PullServers) {
m = map[Locator]PullServers{}
- // We use CanonicalString to avoid filling memory with dupicate
+ // We use CanonicalString to avoid filling memory with duplicate
// copies of the same string.
var cs CanonicalString
@mock.patch('arvados.keep.KeepClient.get')
def runTest(self, mocked_get):
- logging.getLogger('arvados.arvados_fuse').setLevel(logging.DEBUG)
self.api._rootDesc = {"blobSignatureTtl": 2}
mnt = self.make_mount(fuse.CollectionDirectory, collection_record='zzzzz-4zz18-op4e2lbej01tcvu')
mocked_get.return_value = 'fake data'
statusCode, statusText = http.StatusInternalServerError, err.Error()
return
}
+ if kc.Client != nil && kc.Client.Transport != nil {
+ // Workaround for https://dev.arvados.org/issues/9005
+ if t, ok := kc.Client.Transport.(*http.Transport); ok {
+ defer t.CloseIdleConnections()
+ }
+ }
rdr, err := kc.CollectionFileReader(collection, filename)
if os.IsNotExist(err) {
statusCode = http.StatusNotFound
}
if cache.RecallToken(tok) {
- // Valid in the cache, short circut
+ // Valid in the cache, short circuit
return true, tok
}
&permissionTTLSec,
"blob-signature-ttl",
int(time.Duration(2*7*24*time.Hour).Seconds()),
- "Lifetime of blob permission signatures. "+
+ "Lifetime of blob permission signatures. Modifying the ttl will invalidate all existing signatures. "+
"See services/api/config/application.default.yml.")
flag.BoolVar(
&flagSerializeIO,
// SignLocator takes a blobLocator, an apiToken and an expiry time, and
// returns a signed locator string.
func SignLocator(blobLocator, apiToken string, expiry time.Time) string {
- return keepclient.SignLocator(blobLocator, apiToken, expiry, PermissionSecret)
+ return keepclient.SignLocator(blobLocator, apiToken, expiry, blobSignatureTTL, PermissionSecret)
}
// VerifySignature returns nil if the signature on the signedLocator
// something the client could have figured out independently) or
// PermissionError.
func VerifySignature(signedLocator, apiToken string) error {
- err := keepclient.VerifySignature(signedLocator, apiToken, PermissionSecret)
+ err := keepclient.VerifySignature(signedLocator, apiToken, blobSignatureTTL, PermissionSecret)
if err == keepclient.ErrSignatureExpired {
return ExpiredError
} else if err != nil {
"gokee3eamvjy8qq1fvy238838enjmy5wzy2md7yvsitp5vztft6j4q866efym7e6" +
"vu5wm9fpnwjyxfldw3vbo01mgjs75rgo7qioh8z8ij7jpyp8508okhgbbex3ceei" +
"786u5rw2a9gx743dj3fgq2irk"
- knownSignature = "257f3f5f5f0a4e4626a18fc74bd42ec34dcb228a"
+ knownSignatureTTL = 1209600 * time.Second
+ knownSignature = "89118b78732c33104a4d6231e8b5a5fa1e4301e3"
knownTimestamp = "7fffffff"
knownSigHint = "+A" + knownSignature + "@" + knownTimestamp
knownSignedLocator = knownLocator + knownSigHint
}
t0 := time.Unix(tsInt, 0)
+ blobSignatureTTL = knownSignatureTTL
+
PermissionSecret = []byte(knownKey)
if x := SignLocator(knownLocator, knownToken, t0); x != knownSignedLocator {
t.Fatalf("Got %+q, expected %+q", x, knownSignedLocator)
PermissionSecret = b
}(PermissionSecret)
+ blobSignatureTTL = knownSignatureTTL
+
PermissionSecret = []byte(knownKey)
if err := VerifySignature(knownSignedLocator, knownToken); err != nil {
t.Fatal(err)
}
/* Allow default Trash Life time to be used. Thus, the newly created block
- will not be deleted becuase its Mtime is within the trash life time.
+ will not be deleted because its Mtime is within the trash life time.
*/
func TestTrashWorkerIntegration_SameLocatorInTwoVolumesWithDefaultTrashLifeTime(t *testing.T) {
neverDelete = false
arvados_node_missing, RetryMixin
from ...clientactor import _notify_subscribers
from ... import config
+from .transitions import transitions
class ComputeNodeStateChangeBase(config.actor_class, RetryMixin):
"""Base class for actors that change a compute node's state.
self._logger.info("Shutdown cancelled: %s.", reason)
self._finished(success_flag=False)
- def _stop_if_window_closed(orig_func):
- @functools.wraps(orig_func)
- def stop_wrapper(self, *args, **kwargs):
- if (self.cancellable and
- (self._monitor.shutdown_eligible().get() is not True)):
- self._later.cancel_shutdown(self.WINDOW_CLOSED)
- return None
- else:
- return orig_func(self, *args, **kwargs)
- return stop_wrapper
-
def _cancel_on_exception(orig_func):
@functools.wraps(orig_func)
def finish_wrapper(self, *args, **kwargs):
return finish_wrapper
@_cancel_on_exception
- @_stop_if_window_closed
@RetryMixin._retry()
def shutdown_node(self):
self._logger.info("Starting shutdown")
self._clean_arvados_node(arvados_node, "Shut down by Node Manager")
self._finished(success_flag=True)
- # Make the decorator available to subclasses.
- _stop_if_window_closed = staticmethod(_stop_if_window_closed)
-
class ComputeNodeUpdateActor(config.actor_class):
"""Actor to dispatch one-off cloud management requests.
return result
def shutdown_eligible(self):
- """Return True if eligible for shutdown, or a string explaining why the node
- is not eligible for shutdown."""
+        """Determine whether the node is a candidate for shut down.
+
+        Returns a tuple of (boolean, string) where the first value is whether
+        the node is a candidate for shut down, and the second value is the
+        reason for the decision.
+        """
+
+        # Collect the node's states, then consult the state transition table
+        # to decide whether to shut down.  Possible states are:
+ # crunch_worker_state = ['unpaired', 'busy', 'idle', 'down']
+ # window = ["open", "closed"]
+ # boot_grace = ["boot wait", "boot exceeded"]
+ # idle_grace = ["not idle", "idle wait", "idle exceeded"]
- if not self._shutdowns.window_open():
- return "shutdown window is not open."
if self.arvados_node is None:
- # Node is unpaired.
- # If it hasn't pinged Arvados after boot_fail seconds, shut it down
- if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
- return "node is still booting, will be considered a failed boot at %s" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(self.cloud_node_start_time + self.boot_fail_after))
- else:
- return True
- missing = arvados_node_missing(self.arvados_node, self.node_stale_after)
- if missing and self._cloud.broken(self.cloud_node):
- # Node is paired, but Arvados says it is missing and the cloud says the node
- # is in an error state, so shut it down.
- return True
- if missing is None and self._cloud.broken(self.cloud_node):
- self._logger.info(
- "Cloud node considered 'broken' but paired node %s last_ping_at is None, " +
- "cannot check node_stale_after (node may be shut down and we just haven't gotten the message yet).",
- self.arvados_node['uuid'])
- if self.in_state('idle'):
- return True
+ crunch_worker_state = 'unpaired'
+ elif not timestamp_fresh(arvados_node_mtime(self.arvados_node), self.node_stale_after):
+ return (False, "node state is stale")
+ elif self.arvados_node['crunch_worker_state']:
+ crunch_worker_state = self.arvados_node['crunch_worker_state']
+ else:
+ return (False, "node is paired but crunch_worker_state is '%s'" % self.arvados_node['crunch_worker_state'])
+
+ window = "open" if self._shutdowns.window_open() else "closed"
+
+ if timestamp_fresh(self.cloud_node_start_time, self.boot_fail_after):
+ boot_grace = "boot wait"
else:
- return "node is not idle."
+ boot_grace = "boot exceeded"
- def resume_node(self):
- pass
+        # Idle-time tracking is not implemented on the API server side yet,
+        # so always treat the idle grace period as exceeded.
+ idle_grace = 'idle exceeded'
+
+ node_state = (crunch_worker_state, window, boot_grace, idle_grace)
+ t = transitions[node_state]
+ if t is not None:
+ # yes, shutdown eligible
+ return (True, "node state is %s" % (node_state,))
+ else:
+ # no, return a reason
+ return (False, "node state is %s" % (node_state,))
def consider_shutdown(self):
try:
+ eligible, reason = self.shutdown_eligible()
next_opening = self._shutdowns.next_opening()
- eligible = self.shutdown_eligible()
- if eligible is True:
- self._debug("Suggesting shutdown.")
+ if eligible:
+ self._debug("Suggesting shutdown because %s", reason)
_notify_subscribers(self.actor_ref.proxy(), self.subscribers)
- elif self._shutdowns.window_open():
- self._debug("Cannot shut down because %s", eligible)
- elif self.last_shutdown_opening != next_opening:
- self._debug("Shutdown window closed. Next at %s.",
- time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
- self._timer.schedule(next_opening, self._later.consider_shutdown)
- self.last_shutdown_opening = next_opening
+ else:
+ self._debug("Not eligible for shut down because %s", reason)
+
+ if self.last_shutdown_opening != next_opening:
+ self._debug("Shutdown window closed. Next at %s.",
+ time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_opening)))
+ self._timer.schedule(next_opening, self._later.consider_shutdown)
+ self.last_shutdown_opening = next_opening
except Exception:
self._logger.exception("Unexpected exception")
import time
from . import \
- ComputeNodeSetupActor, ComputeNodeUpdateActor
+ ComputeNodeSetupActor, ComputeNodeUpdateActor, ComputeNodeMonitorActor
from . import ComputeNodeShutdownActor as ShutdownActorBase
-from . import ComputeNodeMonitorActor as MonitorActorBase
from .. import RetryMixin
class SlurmMixin(object):
return super(ComputeNodeShutdownActor, self).cancel_shutdown(reason)
@RetryMixin._retry((subprocess.CalledProcessError,))
- @ShutdownActorBase._stop_if_window_closed
def issue_slurm_drain(self):
- self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
- self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
- self._later.await_slurm_drain()
+ if self.cancel_reason is not None:
+ return
+ if self._nodename:
+ self._set_node_state(self._nodename, 'DRAIN', 'Reason=Node Manager shutdown')
+ self._logger.info("Waiting for SLURM node %s to drain", self._nodename)
+ self._later.await_slurm_drain()
+ else:
+ self._later.shutdown_node()
@RetryMixin._retry((subprocess.CalledProcessError,))
- @ShutdownActorBase._stop_if_window_closed
def await_slurm_drain(self):
+ if self.cancel_reason is not None:
+ return
output = self._get_slurm_state(self._nodename)
- if output in self.SLURM_END_STATES:
- self._later.shutdown_node()
- else:
+ if output in ("drng\n", "alloc\n", "drng*\n", "alloc*\n"):
self._timer.schedule(time.time() + 10,
self._later.await_slurm_drain)
-
-
-class ComputeNodeMonitorActor(SlurmMixin, MonitorActorBase):
-
- def shutdown_eligible(self):
- if self.arvados_node is not None:
- state = self._get_slurm_state(self.arvados_node['hostname'])
- # Automatically eligible for shutdown if it's down or failed, but
- # not drain to avoid a race condition with resume_node().
- if state in self.SLURM_END_STATES:
- if state in self.SLURM_DRAIN_STATES:
- return "node is draining"
- else:
- return True
- return super(ComputeNodeMonitorActor, self).shutdown_eligible()
-
- def resume_node(self):
- try:
- if (self.arvados_node is not None and
- self._get_slurm_state(self.arvados_node['hostname']) in self.SLURM_DRAIN_STATES):
- # Resume from "drng" or "drain"
- self._set_node_state(self.arvados_node['hostname'], 'RESUME')
- except Exception as error:
- self._logger.warn(
- "Exception reenabling node: %s", error, exc_info=error)
+        elif output in ("idle\n",):
+ # Not in "drng" so cancel self.
+ self.cancel_shutdown("slurm state is %s" % output.strip())
+ else:
+            # Any other state (e.g. "down", "drain", "fail"): proceed to shut down.
+ self._later.shutdown_node()
--- /dev/null
+transitions = {
+ ('busy', 'closed', 'boot exceeded', 'idle exceeded'): None,
+ ('busy', 'closed', 'boot exceeded', 'idle wait'): None,
+ ('busy', 'closed', 'boot exceeded', 'not idle'): None,
+ ('busy', 'closed', 'boot wait', 'idle exceeded'): None,
+ ('busy', 'closed', 'boot wait', 'idle wait'): None,
+ ('busy', 'closed', 'boot wait', 'not idle'): None,
+ ('busy', 'open', 'boot exceeded', 'idle exceeded'): None,
+ ('busy', 'open', 'boot exceeded', 'idle wait'): None,
+ ('busy', 'open', 'boot exceeded', 'not idle'): None,
+ ('busy', 'open', 'boot wait', 'idle exceeded'): None,
+ ('busy', 'open', 'boot wait', 'idle wait'): None,
+ ('busy', 'open', 'boot wait', 'not idle'): None,
+
+ ('down', 'closed', 'boot exceeded', 'idle exceeded'): "START_SHUTDOWN",
+ ('down', 'closed', 'boot exceeded', 'idle wait'): "START_SHUTDOWN",
+ ('down', 'closed', 'boot exceeded', 'not idle'): "START_SHUTDOWN",
+ ('down', 'closed', 'boot wait', 'idle exceeded'): None,
+ ('down', 'closed', 'boot wait', 'idle wait'): None,
+ ('down', 'closed', 'boot wait', 'not idle'): None,
+ ('down', 'open', 'boot exceeded', 'idle exceeded'): "START_SHUTDOWN",
+ ('down', 'open', 'boot exceeded', 'idle wait'): "START_SHUTDOWN",
+ ('down', 'open', 'boot exceeded', 'not idle'): "START_SHUTDOWN",
+ ('down', 'open', 'boot wait', 'idle exceeded'): "START_SHUTDOWN",
+ ('down', 'open', 'boot wait', 'idle wait'): "START_SHUTDOWN",
+ ('down', 'open', 'boot wait', 'not idle'): "START_SHUTDOWN",
+
+ ('idle', 'closed', 'boot exceeded', 'idle exceeded'): None,
+ ('idle', 'closed', 'boot exceeded', 'idle wait'): None,
+ ('idle', 'closed', 'boot exceeded', 'not idle'): None,
+ ('idle', 'closed', 'boot wait', 'idle exceeded'): None,
+ ('idle', 'closed', 'boot wait', 'idle wait'): None,
+ ('idle', 'closed', 'boot wait', 'not idle'): None,
+ ('idle', 'open', 'boot exceeded', 'idle exceeded'): "START_DRAIN",
+ ('idle', 'open', 'boot exceeded', 'idle wait'): None,
+ ('idle', 'open', 'boot exceeded', 'not idle'): None,
+ ('idle', 'open', 'boot wait', 'idle exceeded'): "START_DRAIN",
+ ('idle', 'open', 'boot wait', 'idle wait'): None,
+ ('idle', 'open', 'boot wait', 'not idle'): None,
+
+ ('unpaired', 'closed', 'boot exceeded', 'idle exceeded'): "START_SHUTDOWN",
+ ('unpaired', 'closed', 'boot exceeded', 'idle wait'): "START_SHUTDOWN",
+ ('unpaired', 'closed', 'boot exceeded', 'not idle'): "START_SHUTDOWN",
+ ('unpaired', 'closed', 'boot wait', 'idle exceeded'): None,
+ ('unpaired', 'closed', 'boot wait', 'idle wait'): None,
+ ('unpaired', 'closed', 'boot wait', 'not idle'): None,
+ ('unpaired', 'open', 'boot exceeded', 'idle exceeded'): "START_SHUTDOWN",
+ ('unpaired', 'open', 'boot exceeded', 'idle wait'): "START_SHUTDOWN",
+ ('unpaired', 'open', 'boot exceeded', 'not idle'): "START_SHUTDOWN",
+ ('unpaired', 'open', 'boot wait', 'idle exceeded'): None,
+ ('unpaired', 'open', 'boot wait', 'idle wait'): None,
+ ('unpaired', 'open', 'boot wait', 'not idle'): None}
Arguments:
* term: The value that identifies a matching item.
- * list_method: A string that names the method to call on this
- instance's libcloud driver for a list of objects.
+ * list_method: A string that names the method to call for a
+ list of objects.
* key: A function that accepts a cloud object and returns a
value search for a `term` match on each item. Returns the
object's 'id' attribute by default.
"""
- items = getattr(self.real, list_method)(**kwargs)
+ items = getattr(self, list_method)(**kwargs)
results = [item for item in items if key(item) == term]
count = len(results)
if count != 1:
def _update_poll_time(self, poll_key):
self.last_polls[poll_key] = time.time()
- def _resume_node(self, node_record):
- node_record.actor.resume_node()
-
def _pair_nodes(self, node_record, arvados_node):
self._logger.info("Cloud node %s is now paired with Arvados node %s",
node_record.cloud_node.name, arvados_node['uuid'])
if cloud_rec.actor.offer_arvados_pair(arv_node).get():
self._pair_nodes(cloud_rec, arv_node)
break
- for rec in self.cloud_nodes.nodes.itervalues():
- # crunch-dispatch turns all slurm states that are not either "idle"
- # or "alloc" into "down", but in case that behavior changes, assume
- # any state that is not "idle" or "alloc" could be a state we want
- # to try to resume from.
- if (rec.arvados_node is not None and
- rec.arvados_node["info"].get("slurm_state") not in ("idle", "alloc") and
- rec.cloud_node.id not in self.shutdowns):
- self._resume_node(rec)
def _nodes_booting(self, size):
s = sum(1
booting_count = self._nodes_booting(size) + self._nodes_unpaired(size)
shutdown_count = self._size_shutdowns(size)
busy_count = self._nodes_busy(size)
- up_count = self._nodes_up(size) - (shutdown_count + busy_count + self._nodes_missing(size))
+ idle_count = self._nodes_up(size) - (busy_count + self._nodes_missing(size))
self._logger.info("%s: wishlist %i, up %i (booting %i, idle %i, busy %i), shutting down %i", size.name,
self._size_wishlist(size),
- up_count + busy_count,
+ idle_count + busy_count,
booting_count,
- up_count - booting_count,
+ idle_count - booting_count,
busy_count,
shutdown_count)
- wanted = self._size_wishlist(size) - up_count
+ wanted = self._size_wishlist(size) - idle_count
if wanted > 0 and self.max_total_price and ((total_price + (size.price*wanted)) > self.max_total_price):
can_boot = int((self.max_total_price - total_price) / size.price)
if can_boot == 0:
return wanted
def _nodes_excess(self, size):
- up_count = self._nodes_up(size) - self._size_shutdowns(size)
+ up_count = (self._nodes_booting(size) + self._nodes_booted(size)) - self._size_shutdowns(size)
if size.id == self.min_cloud_size.id:
up_count -= self.min_nodes
return up_count - self._nodes_busy(size) - self._size_wishlist(size)
cloud_node = testutil.cloud_node_mock(61)
arv_node = testutil.arvados_node_mock(61)
self.make_mocks(cloud_node, arv_node, shutdown_open=False)
+ self.cloud_client.destroy_node.return_value = False
self.make_actor(cancellable=True)
+ self.shutdown_actor.cancel_shutdown("test")
self.check_success_flag(False, 2)
self.assertFalse(self.arvados_client.nodes().update.called)
self.check_success_flag(True)
self.assertTrue(self.cloud_client.destroy_node.called)
- def test_shutdown_cancelled_when_window_closes(self):
- self.make_mocks(shutdown_open=False)
- self.make_actor()
- self.check_success_flag(False, 2)
- self.assertFalse(self.cloud_client.destroy_node.called)
- self.assertEqual(self.ACTOR_CLASS.WINDOW_CLOSED,
- self.shutdown_actor.cancel_reason.get(self.TIMEOUT))
-
def test_shutdown_retries_when_cloud_fails(self):
self.make_mocks()
self.cloud_client.destroy_node.return_value = False
def test_no_shutdown_booting(self):
self.make_actor()
self.shutdowns._set_state(True, 600)
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is still booting"))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
+ (False, "node state is ('unpaired', 'open', 'boot wait', 'idle exceeded')"))
def test_shutdown_without_arvados_node(self):
self.make_actor(start_time=0)
self.shutdowns._set_state(True, 600)
- self.assertIs(True, self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('unpaired', 'open', 'boot exceeded', 'idle exceeded')"))
- def test_no_shutdown_missing(self):
+ def test_shutdown_missing(self):
arv_node = testutil.arvados_node_mock(10, job_uuid=None,
crunch_worker_state="down",
last_ping_at='1970-01-01T01:02:03.04050607Z')
self.make_actor(10, arv_node)
self.shutdowns._set_state(True, 600)
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"))
- def test_no_shutdown_running_broken(self):
+ def test_shutdown_running_broken(self):
arv_node = testutil.arvados_node_mock(12, job_uuid=None,
crunch_worker_state="down")
self.make_actor(12, arv_node)
self.shutdowns._set_state(True, 600)
self.cloud_client.broken.return_value = True
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"))
def test_shutdown_missing_broken(self):
arv_node = testutil.arvados_node_mock(11, job_uuid=None,
self.make_actor(11, arv_node)
self.shutdowns._set_state(True, 600)
self.cloud_client.broken.return_value = True
- self.assertIs(True, self.node_actor.shutdown_eligible().get(self.TIMEOUT))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT), (True, "node state is ('down', 'open', 'boot wait', 'idle exceeded')"))
def test_no_shutdown_when_window_closed(self):
self.make_actor(3, testutil.arvados_node_mock(3, job_uuid=None))
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("shutdown window is not open."))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
+ (False, "node state is ('idle', 'closed', 'boot wait', 'idle exceeded')"))
def test_no_shutdown_when_node_running_job(self):
self.make_actor(4, testutil.arvados_node_mock(4, job_uuid=True))
self.shutdowns._set_state(True, 600)
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
+ (False, "node state is ('busy', 'open', 'boot wait', 'idle exceeded')"))
def test_no_shutdown_when_node_state_unknown(self):
self.make_actor(5, testutil.arvados_node_mock(
5, crunch_worker_state=None))
self.shutdowns._set_state(True, 600)
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
+ (False, "node is paired but crunch_worker_state is 'None'"))
def test_no_shutdown_when_node_state_stale(self):
self.make_actor(6, testutil.arvados_node_mock(6, age=90000))
self.shutdowns._set_state(True, 600)
- self.assertTrue(self.node_actor.shutdown_eligible().get(self.TIMEOUT).startswith("node is not idle."))
+ self.assertEquals(self.node_actor.shutdown_eligible().get(self.TIMEOUT),
+ (False, "node state is stale"))
def test_arvados_node_match(self):
self.make_actor(2)
self.check_success_after_reset(proc_mock, end_state)
return test
- for wait_state in ['alloc\n', 'drng\n', 'idle*\n']:
+ for wait_state in ['alloc\n', 'drng\n']:
locals()['test_wait_while_' + wait_state.strip()
] = make_wait_state_test(start_state=wait_state)
- for end_state in ['down\n', 'down*\n', 'drain\n', 'fail\n']:
+ for end_state in ['idle*\n', 'down\n', 'down*\n', 'drain\n', 'fail\n']:
locals()['test_wait_until_' + end_state.strip()
] = make_wait_state_test(end_state=end_state)
def test_slurm_bypassed_when_no_arvados_node(self, proc_mock):
# Test we correctly handle a node that failed to bootstrap.
- proc_mock.return_value = 'idle\n'
+ proc_mock.return_value = 'down\n'
self.make_actor(start_time=0)
self.check_success_flag(True)
self.assertFalse(proc_mock.called)
- def test_node_undrained_when_shutdown_window_closes(self, proc_mock):
- proc_mock.side_effect = iter(['drng\n', 'idle\n'])
- self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
- self.make_actor()
- self.check_success_flag(False, 2)
- self.check_slurm_got_args(proc_mock, 'NodeName=compute99', 'State=RESUME')
-
- def test_alloc_node_undrained_when_shutdown_window_closes(self, proc_mock):
- proc_mock.side_effect = iter(['alloc\n'])
- self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
- self.make_actor()
- self.check_success_flag(False, 2)
- self.check_slurm_got_args(proc_mock, 'sinfo', '--noheader', '-o', '%t', '-n', 'compute99')
+ def test_node_undrained_when_shutdown_cancelled(self, proc_mock):
+ try:
+ proc_mock.side_effect = iter(['', 'drng\n', 'drng\n', ''])
+ self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
+ self.timer = testutil.MockTimer(False)
+ self.make_actor()
+ self.busywait(lambda: proc_mock.call_args is not None)
+ self.shutdown_actor.cancel_shutdown("test").get(self.TIMEOUT)
+ self.check_success_flag(False, 2)
+ self.assertEqual(proc_mock.call_args_list,
+ [mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=DRAIN', 'Reason=Node Manager shutdown']),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
+ mock.call(['sinfo', '--noheader', '-o', '%t', '-n', 'compute99']),
+ mock.call(['scontrol', 'update', 'NodeName=compute99', 'State=RESUME'])])
+ finally:
+ self.shutdown_actor.actor_ref.stop()
def test_cancel_shutdown_retry(self, proc_mock):
- proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n'])
+ proc_mock.side_effect = iter([OSError, 'drain\n', OSError, 'idle\n', 'idle\n'])
self.make_mocks(arvados_node=testutil.arvados_node_mock(job_uuid=True))
self.make_actor()
self.check_success_flag(False, 2)
proc_mock.return_value = 'drain\n'
super(SLURMComputeNodeShutdownActorTestCase,
self).test_arvados_node_cleaned_after_shutdown()
-
-class SLURMComputeNodeMonitorActorTestCase(testutil.ActorTestMixin,
- unittest.TestCase):
-
- def make_mocks(self, node_num):
- self.shutdowns = testutil.MockShutdownTimer()
- self.shutdowns._set_state(False, 300)
- self.timer = mock.MagicMock(name='timer_mock')
- self.updates = mock.MagicMock(name='update_mock')
- self.cloud_mock = testutil.cloud_node_mock(node_num)
- self.subscriber = mock.Mock(name='subscriber_mock')
- self.cloud_client = mock.MagicMock(name='cloud_client')
- self.cloud_client.broken.return_value = False
-
- def make_actor(self, node_num=1, arv_node=None, start_time=None):
- if not hasattr(self, 'cloud_mock'):
- self.make_mocks(node_num)
- if start_time is None:
- start_time = time.time()
- self.node_actor = slurm_dispatch.ComputeNodeMonitorActor.start(
- self.cloud_mock, start_time, self.shutdowns,
- testutil.cloud_node_fqdn, self.timer, self.updates, self.cloud_client,
- arv_node, boot_fail_after=300).proxy()
- self.node_actor.subscribe(self.subscriber).get(self.TIMEOUT)
-
- @mock.patch("subprocess.check_output")
- def test_resume_node(self, check_output):
- arv_node = testutil.arvados_node_mock()
- self.make_actor(arv_node=arv_node)
- check_output.return_value = "drain\n"
- self.node_actor.resume_node().get(self.TIMEOUT)
- self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
- self.assertIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
-
- @mock.patch("subprocess.check_output")
- def test_no_resume_idle_node(self, check_output):
- arv_node = testutil.arvados_node_mock()
- self.make_actor(arv_node=arv_node)
- check_output.return_value = "idle\n"
- self.node_actor.resume_node().get(self.TIMEOUT)
- self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
- self.assertNotIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
-
- @mock.patch("subprocess.check_output")
- def test_resume_node_exception(self, check_output):
- arv_node = testutil.arvados_node_mock()
- self.make_actor(arv_node=arv_node)
- check_output.side_effect = Exception()
- self.node_actor.resume_node().get(self.TIMEOUT)
- self.assertIn(mock.call(['sinfo', '--noheader', '-o', '%t', '-n', arv_node['hostname']]), check_output.call_args_list)
- self.assertNotIn(mock.call(['scontrol', 'update', 'NodeName=' + arv_node['hostname'], 'State=RESUME']), check_output.call_args_list)
-
- @mock.patch("subprocess.check_output")
- def test_shutdown_down_node(self, check_output):
- check_output.return_value = "down\n"
- self.make_actor(arv_node=testutil.arvados_node_mock())
- self.assertIs(True, self.node_actor.shutdown_eligible().get(self.TIMEOUT))
-
- @mock.patch("subprocess.check_output")
- def test_no_shutdown_drain_node(self, check_output):
- check_output.return_value = "drain\n"
- self.make_actor(arv_node=testutil.arvados_node_mock())
- self.assertEquals('node is draining', self.node_actor.shutdown_eligible().get(self.TIMEOUT))
n = driver.list_nodes()
self.assertEqual(nodelist, n)
self.driver_mock().list_nodes.assert_called_with(ex_fetch_nic=False, ex_resource_group='TestResourceGroup')
+
+ def test_create_can_find_node_after_timeout(self):
+ super(AzureComputeNodeDriverTestCase,
+ self).test_create_can_find_node_after_timeout(
+ create_kwargs={'tag_arvados-class': 'test'},
+ node_extra={'tags': {'arvados-class': 'test'}})
+
+ def test_node_found_after_timeout_has_fixed_size(self):
+ size = testutil.MockSize(4)
+ node_props = {'hardwareProfile': {'vmSize': size.id}}
+ cloud_node = testutil.cloud_node_mock(
+ size=None, tags={'arvados-class': 'test'}, properties=node_props)
+ self.check_node_found_after_timeout_has_fixed_size(
+ size, cloud_node, {'tag_arvados-class': 'test'})
self.assertIs(node, nodelist[0])
self.assertIs(size, nodelist[0].size)
+ def test_node_found_after_timeout_has_fixed_size(self):
+ size = testutil.MockSize(4)
+ cloud_node = testutil.cloud_node_mock(size=size.id)
+ self.check_node_found_after_timeout_has_fixed_size(size, cloud_node)
+
def test_list_empty_nodes(self):
self.driver_mock().list_nodes.return_value = []
self.assertEqual([], self.new_driver().list_nodes())
arv_node = testutil.arvados_node_mock(2, job_uuid=True)
self.make_daemon([testutil.cloud_node_mock(2, size=size)], [arv_node],
[size], avail_sizes=[(size, {"cores":1})])
+ self.busywait(lambda: self.node_setup.start.called)
self.stop_proxy(self.daemon)
self.assertTrue(self.node_setup.start.called)
def test_nodes_shutting_down_replaced_below_max_nodes(self):
size = testutil.MockSize(6)
cloud_node = testutil.cloud_node_mock(6, size=size)
- self.make_daemon([cloud_node], [testutil.arvados_node_mock(6)],
+ self.make_daemon([cloud_node], [testutil.arvados_node_mock(6, crunch_worker_state='down')],
avail_sizes=[(size, {"cores":1})])
self.assertEqual(1, self.alive_monitor_count())
monitor = self.monitor_list()[0].proxy()
self.stop_proxy(self.daemon)
self.assertEqual(1, self.last_shutdown.stop.call_count)
- def busywait(self, f):
- n = 0
- while not f() and n < 10:
- time.sleep(.1)
- n += 1
- self.assertTrue(f())
-
def test_node_create_two_sizes(self):
small = testutil.MockSize(1)
big = testutil.MockSize(2)
# test for that.
self.assertEqual(2, sizecounts[small.id])
self.assertEqual(1, sizecounts[big.id])
-
- @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
- def test_resume_drained_nodes(self, resume_node):
- cloud_node = testutil.cloud_node_mock(1)
- arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
- self.make_daemon([cloud_node], [arv_node])
- resume_node.assert_called_with(self.daemon.cloud_nodes.get(self.TIMEOUT).nodes.values()[0])
- self.stop_proxy(self.daemon)
-
- @mock.patch("arvnodeman.daemon.NodeManagerDaemonActor._resume_node")
- def test_no_resume_shutdown_nodes(self, resume_node):
- cloud_node = testutil.cloud_node_mock(1)
- arv_node = testutil.arvados_node_mock(1, info={"ec2_instance_id": "1", "slurm_state": "down"})
-
- self.make_daemon([cloud_node], [])
-
- self.node_shutdown = mock.MagicMock(name='shutdown_mock')
- self.daemon.shutdowns.get(self.TIMEOUT)[cloud_node.id] = self.node_shutdown
-
- self.daemon.update_arvados_nodes([arv_node]).get(self.TIMEOUT)
- self.stop_proxy(self.daemon)
- resume_node.assert_not_called()
if result is not unassigned:
return result
+ def busywait(self, f):
+ n = 0
+ while not f() and n < 10:
+ time.sleep(.1)
+ n += 1
+ self.assertTrue(f())
+
class DriverTestMixin(object):
def setUp(self):
self.assertTrue(self.driver_mock.called)
self.assertIs(driver.real, driver_mock2)
- def test_create_can_find_node_after_timeout(self):
- driver = self.new_driver()
+ def test_create_can_find_node_after_timeout(self, create_kwargs={}, node_extra={}):
+ driver = self.new_driver(create_kwargs=create_kwargs)
arv_node = arvados_node_mock()
- cloud_node = cloud_node_mock()
+ cloud_node = cloud_node_mock(**node_extra)
cloud_node.name = driver.create_cloud_name(arv_node)
create_method = self.driver_mock().create_node
create_method.side_effect = cloud_types.LibcloudError("fake timeout")
driver.create_node(MockSize(1), arv_node)
self.assertIs(create_method.side_effect, exc_test.exception)
+ def check_node_found_after_timeout_has_fixed_size(self, size, cloud_node,
+ create_kwargs={}):
+ # This method needs to be called explicitly by driver test suites
+ # that need it.
+ self.driver_mock().list_sizes.return_value = [size]
+ driver = self.new_driver(create_kwargs=create_kwargs)
+ arv_node = arvados_node_mock()
+ cloud_node.name = driver.create_cloud_name(arv_node)
+ create_method = self.driver_mock().create_node
+ create_method.side_effect = cloud_types.LibcloudError("fake timeout")
+ self.driver_mock().list_nodes.return_value = [cloud_node]
+ actual = driver.create_node(size, arv_node)
+ self.assertIs(size, actual.size)
+
class RemotePollLoopActorTestMixin(ActorTestMixin):
def build_monitor(self, *args, **kwargs):
}
run() {
+ if docker ps -a --filter "status=running" | grep -E "$ARVBOX_CONTAINER$" -q ; then
+ echo "Container $ARVBOX_CONTAINER is already running"
+ exit 0
+ fi
+
if docker ps -a | grep -E "$ARVBOX_CONTAINER$" -q ; then
- echo "Container $ARVBOX_CONTAINER is already running, use stop, restart or rebuild"
+ echo "Container $ARVBOX_CONTAINER already exists but is not running; use restart or rebuild"
exit 1
fi
--publish=25100:25100
--publish=25107:25107
--publish=25108:25108
- --publish=8001:8001"
+ --publish=8001:8001
+ --publish=8002:8002"
else
PUBLIC=""
fi
echo "Could not find Dockerfile (expected it at $ARVBOX_DOCKER/Dockerfile.base)"
exit 1
fi
- docker build -t arvados/arvbox-base -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
+ docker build $NO_CACHE -t arvados/arvbox-base -f "$ARVBOX_DOCKER/Dockerfile.base" "$ARVBOX_DOCKER"
if test "$1" = localdemo -o "$1" = publicdemo ; then
- docker build -t arvados/arvbox-demo -f "$ARVBOX_DOCKER/Dockerfile.demo" "$ARVBOX_DOCKER"
+ docker build $NO_CACHE -t arvados/arvbox-demo -f "$ARVBOX_DOCKER/Dockerfile.demo" "$ARVBOX_DOCKER"
else
- docker build -t arvados/arvbox-dev -f "$ARVBOX_DOCKER/Dockerfile.dev" "$ARVBOX_DOCKER"
+ docker build $NO_CACHE -t arvados/arvbox-dev -f "$ARVBOX_DOCKER/Dockerfile.dev" "$ARVBOX_DOCKER"
fi
}
build $@
;;
+ rebuild)
+ check $@
+ NO_CACHE=--no-cache build $@
+ ;;
+
start|run)
check $@
run $@
run $@
;;
- rebuild)
+ reboot)
check $@
stop
build $@
*)
echo "Arvados-in-a-box http://arvados.org"
echo
- echo "$(basename $0) (build|start|run|open|shell|ip|stop|rebuild|reset|destroy|log|svrestart)"
- echo
- echo "build <config> build arvbox Docker image"
+ echo "build <config> build arvbox Docker image"
+ echo "rebuild <config> build arvbox Docker image, no layer cache"
echo "start|run <config> start $ARVBOX_CONTAINER container"
echo "open open arvbox workbench in a web browser"
echo "shell enter arvbox shell"
echo "status print some information about current arvbox"
echo "stop stop arvbox container"
echo "restart <config> stop, then run again"
- echo "rebuild <config> stop, build arvbox Docker image, run"
+ echo "reboot <config> stop, build arvbox Docker image, run"
echo "reset delete arvbox arvados data (be careful!)"
echo "destroy delete all arvbox code and data (be careful!)"
echo "log <service> tail log of specified service"
ADD crunch-setup.sh gitolite.rc \
keep-setup.sh common.sh createusers.sh \
logger runsu.sh waitforpostgres.sh \
- application_yml_override.py \
+ application_yml_override.py api-setup.sh \
/usr/local/lib/arvbox/
# Start the supervisor.
--- /dev/null
+#!/bin/bash
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/arvados/services/api
+export RAILS_ENV=development
+
+set -u
+
+if ! test -s /var/lib/arvados/api_uuid_prefix ; then
+ ruby -e 'puts "#{rand(2**64).to_s(36)[0,5]}"' > /var/lib/arvados/api_uuid_prefix
+fi
+uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
+
+if ! test -s /var/lib/arvados/api_secret_token ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/api_secret_token
+fi
+secret_token=$(cat /var/lib/arvados/api_secret_token)
+
+if ! test -s /var/lib/arvados/blob_signing_key ; then
+ ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/blob_signing_key
+fi
+blob_signing_key=$(cat /var/lib/arvados/blob_signing_key)
+
+# The self-signed key will be created by the SSO server script.
+test -s /var/lib/arvados/self-signed.key
+
+sso_app_secret=$(cat /var/lib/arvados/sso_app_secret)
+
+if test -s /var/lib/arvados/vm-uuid ; then
+ vm_uuid=$(cat /var/lib/arvados/vm-uuid)
+else
+ vm_uuid=$uuid_prefix-2x53u-$(ruby -e 'puts rand(2**400).to_s(36)[0,15]')
+ echo $vm_uuid > /var/lib/arvados/vm-uuid
+fi
+
+cat >config/application.yml <<EOF
+development:
+ uuid_prefix: $uuid_prefix
+ secret_token: $secret_token
+ blob_signing_key: $blob_signing_key
+ sso_app_secret: $sso_app_secret
+ sso_app_id: arvados-server
+ sso_provider_url: "https://$localip:${services[sso]}"
+ sso_insecure: true
+ workbench_address: "http://$localip/"
+ websocket_address: "ws://$localip:${services[websockets]}/websocket"
+ git_repo_ssh_base: "git@$localip:"
+ git_repo_https_base: "http://$localip:${services[arv-git-httpd]}/"
+ new_users_are_active: true
+ auto_admin_first_user: true
+ auto_setup_new_users: true
+ auto_setup_new_users_with_vm_uuid: $vm_uuid
+ auto_setup_new_users_with_repository: true
+ default_collection_replication: 1
+EOF
+
+(cd config && /usr/local/lib/arvbox/application_yml_override.py)
+
+if ! test -f /var/lib/arvados/api_database_pw ; then
+ ruby -e 'puts rand(2**128).to_s(36)' > /var/lib/arvados/api_database_pw
+fi
+database_pw=$(cat /var/lib/arvados/api_database_pw)
+
+if ! (psql postgres -c "\du" | grep "^ arvados ") >/dev/null ; then
+ psql postgres -c "create user arvados with password '$database_pw'"
+ psql postgres -c "ALTER USER arvados CREATEDB;"
+fi
+
+sed "s/password:.*/password: $database_pw/" <config/database.yml.example >config/database.yml
+
+if ! test -f /var/lib/arvados/api_database_setup ; then
+ bundle exec rake db:setup
+ touch /var/lib/arvados/api_database_setup
+fi
+
+if ! test -s /var/lib/arvados/superuser_token ; then
+ bundle exec ./script/create_superuser_token.rb > /var/lib/arvados/superuser_token
+fi
+
+rm -rf tmp
+
+bundle exec rake db:migrate
[keepstore1]=25108
[ssh]=22
[doc]=8001
+ [websockets]=8002
)
if test "$(id arvbox -u 2>/dev/null)" = 0 ; then
HOSTGID=$(ls -nd /usr/src/arvados | sed 's/ */ /' | cut -d' ' -f5)
FUSEGID=$(ls -nd /dev/fuse | sed 's/ */ /' | cut -d' ' -f5)
- mkdir -p /var/lib/arvados/git /var/lib/gems /var/lib/passenger
+ mkdir -p /var/lib/arvados/git /var/lib/gems \
+ /var/lib/passenger /var/lib/gopath /var/lib/pip
groupadd --gid $HOSTGID --non-unique arvbox
groupadd --gid $FUSEGID --non-unique fuse
chown arvbox:arvbox -R /usr/local /var/lib/arvados /var/lib/gems \
/var/lib/passenger /var/lib/postgresql \
- /var/lib/nginx /var/log/nginx /etc/ssl/private
+ /var/lib/nginx /var/log/nginx /etc/ssl/private \
+ /var/lib/gopath /var/lib/pip
mkdir -p /var/lib/gems/ruby/2.1.0
chown arvbox:arvbox -R /var/lib/gems/ruby/2.1.0
exit
fi
-set -u
-
-if ! test -s /var/lib/arvados/api_uuid_prefix ; then
- ruby -e 'puts "#{rand(2**64).to_s(36)[0,5]}"' > /var/lib/arvados/api_uuid_prefix
-fi
-uuid_prefix=$(cat /var/lib/arvados/api_uuid_prefix)
-
-if ! test -s /var/lib/arvados/api_secret_token ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/api_secret_token
-fi
-secret_token=$(cat /var/lib/arvados/api_secret_token)
-
-if ! test -s /var/lib/arvados/blob_signing_key ; then
- ruby -e 'puts rand(2**400).to_s(36)' > /var/lib/arvados/blob_signing_key
-fi
-blob_signing_key=$(cat /var/lib/arvados/blob_signing_key)
-
-# self signed key will be created by SSO server script.
-test -s /var/lib/arvados/self-signed.key
-
-sso_app_secret=$(cat /var/lib/arvados/sso_app_secret)
-
-if test -s /var/lib/arvados/vm-uuid ; then
- vm_uuid=$(cat /var/lib/arvados/vm-uuid)
-else
- vm_uuid=$uuid_prefix-2x53u-$(ruby -e 'puts rand(2**400).to_s(36)[0,15]')
- echo $vm_uuid > /var/lib/arvados/vm-uuid
-fi
-
-cat >config/application.yml <<EOF
-development:
- uuid_prefix: $uuid_prefix
- secret_token: $secret_token
- blob_signing_key: $blob_signing_key
- sso_app_secret: $sso_app_secret
- sso_app_id: arvados-server
- sso_provider_url: "https://$localip:${services[sso]}"
- sso_insecure: true
- workbench_address: "http://$localip/"
- git_repo_ssh_base: "git@$localip:"
- git_repo_https_base: "http://$localip:${services[arv-git-httpd]}/"
- new_users_are_active: true
- auto_admin_first_user: true
- auto_setup_new_users: true
- auto_setup_new_users_with_vm_uuid: $vm_uuid
- auto_setup_new_users_with_repository: true
- default_collection_replication: 1
-EOF
-
-(cd config && /usr/local/lib/arvbox/application_yml_override.py)
-
-if ! test -f /var/lib/arvados/api_database_pw ; then
- ruby -e 'puts rand(2**128).to_s(36)' > /var/lib/arvados/api_database_pw
-fi
-database_pw=$(cat /var/lib/arvados/api_database_pw)
-
-if ! (psql postgres -c "\du" | grep "^ arvados ") >/dev/null ; then
- psql postgres -c "create user arvados with password '$database_pw'"
- psql postgres -c "ALTER USER arvados CREATEDB;"
-fi
-
-sed "s/password:.*/password: $database_pw/" <config/database.yml.example >config/database.yml
-
-if ! test -f /var/lib/arvados/api_database_setup ; then
- bundle exec rake db:setup
- touch /var/lib/arvados/api_database_setup
-fi
-
-if ! test -s /var/lib/arvados/superuser_token ; then
- bundle exec ./script/create_superuser_token.rb > /var/lib/arvados/superuser_token
-fi
-
-rm -rf tmp
-
-bundle exec rake db:migrate
+flock /var/lib/arvados/api.lock /usr/local/lib/arvbox/api-setup.sh
set +u
if test "$1" = "--only-setup" ; then
exit
fi
-ARVADOS_WEBSOCKETS=1 exec bundle exec passenger start --port=${services[api]} \
+exec bundle exec passenger start --port=${services[api]} \
--runtime-dir=/var/lib/passenger \
--ssl --ssl-certificate=/var/lib/arvados/self-signed.pem \
--ssl-certificate-key=/var/lib/arvados/self-signed.key
--- /dev/null
+/usr/local/lib/arvbox/logger
\ No newline at end of file
--- /dev/null
+/usr/local/lib/arvbox/runsu.sh
\ No newline at end of file
--- /dev/null
+#!/bin/bash
+
+exec 2>&1
+set -ex -o pipefail
+
+. /usr/local/lib/arvbox/common.sh
+
+cd /usr/src/arvados/services/api
+export RAILS_ENV=development
+
+run_bundler --without=development
+
+if test "$1" = "--only-deps" ; then
+ exit
+fi
+
+flock /var/lib/arvados/api.lock /usr/local/lib/arvbox/api-setup.sh
+
+set +u
+if test "$1" = "--only-setup" ; then
+ exit
+fi
+
+export ARVADOS_WEBSOCKETS=ws-only
+
+# serving ssl directly doesn't work, gets
+# Rack app error: #<TypeError: no implicit conversion of Puma::MiniSSL::Socket into Integer>
+#exec bundle exec puma -b "ssl://0.0.0.0:${services[websockets]}?cert=/var/lib/arvados/self-signed.pem&key=/var/lib/arvados/self-signed.key"
+
+exec bundle exec puma -p${services[websockets]}
import arvados
import Queue
import threading
+import _strptime
from crunchstat_summary import logger
import re
import sys
import threading
+import _strptime
from arvados.api import OrderedJsonModel
from crunchstat_summary import logger
--- /dev/null
+keep-block-check
--- /dev/null
+package main
+
+import (
+ "crypto/tls"
+ "errors"
+ "flag"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io/ioutil"
+ "log"
+ "net/http"
+ "os"
+ "regexp"
+ "strings"
+ "time"
+)
+
+func main() {
+ err := doMain(os.Args[1:])
+ if err != nil {
+ log.Fatalf("%v", err)
+ }
+}
+
+func doMain(args []string) error {
+ flags := flag.NewFlagSet("keep-block-check", flag.ExitOnError)
+
+ configFile := flags.String(
+ "config",
+ "",
+ "Configuration filename. May be either a pathname to a config file, or (for example) 'foo' as shorthand for $HOME/.config/arvados/foo.conf file. This file is expected to specify the values for ARVADOS_API_TOKEN, ARVADOS_API_HOST, ARVADOS_API_HOST_INSECURE, and ARVADOS_BLOB_SIGNING_KEY for the source.")
+
+ keepServicesJSON := flags.String(
+ "keep-services-json",
+ "",
+ "An optional list of available keepservices. "+
+ "If not provided, this list is obtained from api server configured in config-file.")
+
+ locatorFile := flags.String(
+ "block-hash-file",
+ "",
+ "Filename containing the block hashes to be checked. This is required. "+
+ "This file contains the block hashes one per line.")
+
+ prefix := flags.String(
+ "prefix",
+ "",
+ "Block hash prefix. When a prefix is specified, only hashes listed in the file with this prefix will be checked.")
+
+ blobSignatureTTLFlag := flags.Duration(
+ "blob-signature-ttl",
+ 0,
+ "Lifetime of blob permission signatures on the keepservers. If not provided, this will be retrieved from the API server's discovery document.")
+
+ verbose := flags.Bool(
+ "v",
+ false,
+ "Log progress of each block verification")
+
+	// Parse args; the caller has already stripped the command name from args
+ flags.Parse(args)
+
+ config, blobSigningKey, err := loadConfig(*configFile)
+ if err != nil {
+ return fmt.Errorf("Error loading configuration from file: %s", err.Error())
+ }
+
+ // get list of block locators to be checked
+ blockLocators, err := getBlockLocators(*locatorFile, *prefix)
+ if err != nil {
+ return fmt.Errorf("Error reading block hashes to be checked from file: %s", err.Error())
+ }
+
+ // setup keepclient
+ kc, blobSignatureTTL, err := setupKeepClient(config, *keepServicesJSON, *blobSignatureTTLFlag)
+ if err != nil {
+ return fmt.Errorf("Error configuring keepclient: %s", err.Error())
+ }
+
+ return performKeepBlockCheck(kc, blobSignatureTTL, blobSigningKey, blockLocators, *verbose)
+}
+
+type apiConfig struct {
+ APIToken string
+ APIHost string
+ APIHostInsecure bool
+ ExternalClient bool
+}
+
+// Load config from given file
+func loadConfig(configFile string) (config apiConfig, blobSigningKey string, err error) {
+ if configFile == "" {
+ err = errors.New("Client config file not specified")
+ return
+ }
+
+ config, blobSigningKey, err = readConfigFromFile(configFile)
+ return
+}
+
+var matchTrue = regexp.MustCompile("^(?i:1|yes|true)$")
+
+// Read config from file
+func readConfigFromFile(filename string) (config apiConfig, blobSigningKey string, err error) {
+ if !strings.Contains(filename, "/") {
+ filename = os.Getenv("HOME") + "/.config/arvados/" + filename + ".conf"
+ }
+
+ content, err := ioutil.ReadFile(filename)
+
+ if err != nil {
+ return
+ }
+
+ lines := strings.Split(string(content), "\n")
+ for _, line := range lines {
+ if line == "" {
+ continue
+ }
+
+ kv := strings.SplitN(line, "=", 2)
+ if len(kv) == 2 {
+ key := strings.TrimSpace(kv[0])
+ value := strings.TrimSpace(kv[1])
+
+ switch key {
+ case "ARVADOS_API_TOKEN":
+ config.APIToken = value
+ case "ARVADOS_API_HOST":
+ config.APIHost = value
+ case "ARVADOS_API_HOST_INSECURE":
+ config.APIHostInsecure = matchTrue.MatchString(value)
+ case "ARVADOS_EXTERNAL_CLIENT":
+ config.ExternalClient = matchTrue.MatchString(value)
+ case "ARVADOS_BLOB_SIGNING_KEY":
+ blobSigningKey = value
+ }
+ }
+ }
+
+ return
+}
+
+// setup keepclient using the config provided
+func setupKeepClient(config apiConfig, keepServicesJSON string, blobSignatureTTL time.Duration) (kc *keepclient.KeepClient, ttl time.Duration, err error) {
+ arv := arvadosclient.ArvadosClient{
+ ApiToken: config.APIToken,
+ ApiServer: config.APIHost,
+ ApiInsecure: config.APIHostInsecure,
+ Client: &http.Client{Transport: &http.Transport{
+ TLSClientConfig: &tls.Config{InsecureSkipVerify: config.APIHostInsecure}}},
+ External: config.ExternalClient,
+ }
+
+ // if keepServicesJSON is provided, use it to load services; else, use DiscoverKeepServers
+ if keepServicesJSON == "" {
+ kc, err = keepclient.MakeKeepClient(&arv)
+ if err != nil {
+ return
+ }
+ } else {
+ kc = keepclient.New(&arv)
+ err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
+ if err != nil {
+ return
+ }
+ }
+
+	// If blobSignatureTTL was not provided, fetch it from the API server's discovery document
+ ttl = blobSignatureTTL
+ if blobSignatureTTL == 0 {
+ value, err := arv.Discovery("blobSignatureTtl")
+ if err == nil {
+ ttl = time.Duration(int(value.(float64))) * time.Second
+ } else {
+ return nil, 0, err
+ }
+ }
+
+ return
+}
+
+// Get list of unique block locators from the given file
+func getBlockLocators(locatorFile, prefix string) (locators []string, err error) {
+ if locatorFile == "" {
+ err = errors.New("block-hash-file not specified")
+ return
+ }
+
+ content, err := ioutil.ReadFile(locatorFile)
+ if err != nil {
+ return
+ }
+
+ locatorMap := make(map[string]bool)
+ for _, line := range strings.Split(string(content), "\n") {
+ line = strings.TrimSpace(line)
+ if line == "" || !strings.HasPrefix(line, prefix) || locatorMap[line] {
+ continue
+ }
+ locators = append(locators, line)
+ locatorMap[line] = true
+ }
+
+ return
+}
+
+// Get block headers from keep. Log any errors.
+func performKeepBlockCheck(kc *keepclient.KeepClient, blobSignatureTTL time.Duration, blobSigningKey string, blockLocators []string, verbose bool) error {
+ totalBlocks := len(blockLocators)
+ notFoundBlocks := 0
+ current := 0
+ for _, locator := range blockLocators {
+ current++
+ if verbose {
+ log.Printf("Verifying block %d of %d: %v", current, totalBlocks, locator)
+ }
+ getLocator := locator
+ if blobSigningKey != "" {
+ expiresAt := time.Now().AddDate(0, 0, 1)
+ getLocator = keepclient.SignLocator(locator, kc.Arvados.ApiToken, expiresAt, blobSignatureTTL, []byte(blobSigningKey))
+ }
+
+ _, _, err := kc.Ask(getLocator)
+ if err != nil {
+ notFoundBlocks++
+ log.Printf("Error verifying block %v: %v", locator, err)
+ }
+ }
+
+ log.Printf("Verify block totals: %d attempts, %d successes, %d errors", totalBlocks, totalBlocks-notFoundBlocks, notFoundBlocks)
+
+ if notFoundBlocks > 0 {
+ return fmt.Errorf("Block verification failed for %d out of %d blocks with matching prefix.", notFoundBlocks, totalBlocks)
+ }
+
+ return nil
+}
--- /dev/null
+package main
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "log"
+ "os"
+ "regexp"
+ "strings"
+ "testing"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+
+ . "gopkg.in/check.v1"
+)
+
+// Gocheck boilerplate
+func Test(t *testing.T) {
+ TestingT(t)
+}
+
+// Gocheck boilerplate
+var _ = Suite(&ServerRequiredSuite{})
+var _ = Suite(&DoMainTestSuite{})
+
+type ServerRequiredSuite struct{}
+type DoMainTestSuite struct{}
+
+var kc *keepclient.KeepClient
+var logBuffer bytes.Buffer
+
+var TestHash = "aaaa09c290d0fb1ca068ffaddf22cbd0"
+var TestHash2 = "aaaac516f788aec4f30932ffb6395c39"
+
+var blobSignatureTTL = time.Duration(2*7*24) * time.Hour
+
+func (s *ServerRequiredSuite) SetUpSuite(c *C) {
+ arvadostest.StartAPI()
+}
+
+func (s *ServerRequiredSuite) TearDownSuite(c *C) {
+ arvadostest.StopAPI()
+ arvadostest.ResetEnv()
+}
+
+func (s *ServerRequiredSuite) SetUpTest(c *C) {
+ logOutput := io.MultiWriter(&logBuffer)
+ log.SetOutput(logOutput)
+}
+
+func (s *ServerRequiredSuite) TearDownTest(c *C) {
+ arvadostest.StopKeep(2)
+ log.SetOutput(os.Stdout)
+ log.Printf("%v", logBuffer.String())
+}
+
+func (s *DoMainTestSuite) SetUpSuite(c *C) {
+}
+
+func (s *DoMainTestSuite) SetUpTest(c *C) {
+ logOutput := io.MultiWriter(&logBuffer)
+ log.SetOutput(logOutput)
+}
+
+func (s *DoMainTestSuite) TearDownTest(c *C) {
+ log.SetOutput(os.Stdout)
+ log.Printf("%v", logBuffer.String())
+}
+
+func setupKeepBlockCheck(c *C, enforcePermissions bool, keepServicesJSON string) {
+ setupKeepBlockCheckWithTTL(c, enforcePermissions, keepServicesJSON, blobSignatureTTL)
+}
+
+func setupKeepBlockCheckWithTTL(c *C, enforcePermissions bool, keepServicesJSON string, ttl time.Duration) {
+ var config apiConfig
+ config.APIHost = os.Getenv("ARVADOS_API_HOST")
+ config.APIToken = arvadostest.DataManagerToken
+ config.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+
+ // Start Keep servers
+ arvadostest.StartKeep(2, enforcePermissions)
+
+ // setup keepclients
+ var err error
+ kc, ttl, err = setupKeepClient(config, keepServicesJSON, ttl)
+ c.Assert(ttl, Equals, blobSignatureTTL)
+ c.Check(err, IsNil)
+}
+
+// Setup test data
+func setupTestData(c *C) []string {
+ allLocators := []string{}
+
+ // Put a few blocks
+ for i := 0; i < 5; i++ {
+ hash, _, err := kc.PutB([]byte(fmt.Sprintf("keep-block-check-test-data-%d", i)))
+ c.Check(err, IsNil)
+ allLocators = append(allLocators, strings.Split(hash, "+A")[0])
+ }
+
+ return allLocators
+}
+
+func setupConfigFile(c *C, fileName string) string {
+ // Setup a config file
+ file, err := ioutil.TempFile(os.TempDir(), fileName)
+ c.Check(err, IsNil)
+
+	// Write the config to the file, deliberately adding extra whitespace to exercise the parser's trimming
+ fileContent := "ARVADOS_API_HOST=" + os.Getenv("ARVADOS_API_HOST") + "\n"
+ fileContent += "ARVADOS_API_TOKEN=" + arvadostest.DataManagerToken + "\n"
+ fileContent += "\n"
+ fileContent += "ARVADOS_API_HOST_INSECURE=" + os.Getenv("ARVADOS_API_HOST_INSECURE") + "\n"
+ fileContent += " ARVADOS_EXTERNAL_CLIENT = false \n"
+ fileContent += " NotANameValuePairAndShouldGetIgnored \n"
+ fileContent += "ARVADOS_BLOB_SIGNING_KEY=abcdefg\n"
+
+ _, err = file.Write([]byte(fileContent))
+ c.Check(err, IsNil)
+
+ return file.Name()
+}
+
+func setupBlockHashFile(c *C, name string, blocks []string) string {
+ // Setup a block hash file
+ file, err := ioutil.TempFile(os.TempDir(), name)
+ c.Check(err, IsNil)
+
+	// Write the hashes to the file, deliberately adding extra whitespace to exercise the parser's trimming
+ fileContent := ""
+ for _, hash := range blocks {
+ fileContent += fmt.Sprintf(" %s \n", hash)
+ }
+ fileContent += "\n"
+ _, err = file.Write([]byte(fileContent))
+ c.Check(err, IsNil)
+
+ return file.Name()
+}
+
+func checkErrorLog(c *C, blocks []string, prefix, suffix string) {
+ for _, hash := range blocks {
+ expected := prefix + `.*` + hash + `.*` + suffix
+ match, _ := regexp.MatchString(expected, logBuffer.String())
+ c.Assert(match, Equals, true)
+ }
+}
+
+func checkNoErrorsLogged(c *C, prefix, suffix string) {
+ expected := prefix + `.*` + suffix
+ match, _ := regexp.MatchString(expected, logBuffer.String())
+ c.Assert(match, Equals, false)
+}
+
+func (s *ServerRequiredSuite) TestBlockCheck(c *C) {
+ setupKeepBlockCheck(c, false, "")
+ allLocators := setupTestData(c)
+ err := performKeepBlockCheck(kc, blobSignatureTTL, "", allLocators, true)
+ c.Check(err, IsNil)
+ checkNoErrorsLogged(c, "Error verifying block", "Block not found")
+}
+
+func (s *ServerRequiredSuite) TestBlockCheckWithBlobSigning(c *C) {
+ setupKeepBlockCheck(c, true, "")
+ allLocators := setupTestData(c)
+ err := performKeepBlockCheck(kc, blobSignatureTTL, arvadostest.BlobSigningKey, allLocators, true)
+ c.Check(err, IsNil)
+ checkNoErrorsLogged(c, "Error verifying block", "Block not found")
+}
+
+func (s *ServerRequiredSuite) TestBlockCheckWithBlobSigningAndTTLFromDiscovery(c *C) {
+ setupKeepBlockCheckWithTTL(c, true, "", 0)
+ allLocators := setupTestData(c)
+ err := performKeepBlockCheck(kc, blobSignatureTTL, arvadostest.BlobSigningKey, allLocators, true)
+ c.Check(err, IsNil)
+ checkNoErrorsLogged(c, "Error verifying block", "Block not found")
+}
+
+func (s *ServerRequiredSuite) TestBlockCheck_NoSuchBlock(c *C) {
+ setupKeepBlockCheck(c, false, "")
+ allLocators := setupTestData(c)
+ allLocators = append(allLocators, TestHash)
+ allLocators = append(allLocators, TestHash2)
+ err := performKeepBlockCheck(kc, blobSignatureTTL, "", allLocators, true)
+ c.Check(err, NotNil)
+ c.Assert(err.Error(), Equals, "Block verification failed for 2 out of 7 blocks with matching prefix.")
+ checkErrorLog(c, []string{TestHash, TestHash2}, "Error verifying block", "Block not found")
+}
+
+func (s *ServerRequiredSuite) TestBlockCheck_NoSuchBlock_WithMatchingPrefix(c *C) {
+ setupKeepBlockCheck(c, false, "")
+ allLocators := setupTestData(c)
+ allLocators = append(allLocators, TestHash)
+ allLocators = append(allLocators, TestHash2)
+ locatorFile := setupBlockHashFile(c, "block-hash", allLocators)
+ defer os.Remove(locatorFile)
+ locators, err := getBlockLocators(locatorFile, "aaa")
+ c.Check(err, IsNil)
+ err = performKeepBlockCheck(kc, blobSignatureTTL, "", locators, true)
+ c.Check(err, NotNil)
+ // Of the 7 blocks in allLocators, only two match the prefix and hence only those are checked
+ c.Assert(err.Error(), Equals, "Block verification failed for 2 out of 2 blocks with matching prefix.")
+ checkErrorLog(c, []string{TestHash, TestHash2}, "Error verifying block", "Block not found")
+}
+
+func (s *ServerRequiredSuite) TestBlockCheck_NoSuchBlock_WithPrefixMismatch(c *C) {
+ setupKeepBlockCheck(c, false, "")
+ allLocators := setupTestData(c)
+ allLocators = append(allLocators, TestHash)
+ allLocators = append(allLocators, TestHash2)
+ locatorFile := setupBlockHashFile(c, "block-hash", allLocators)
+ defer os.Remove(locatorFile)
+ locators, err := getBlockLocators(locatorFile, "999")
+ c.Check(err, IsNil)
+ err = performKeepBlockCheck(kc, blobSignatureTTL, "", locators, true)
+ c.Check(err, IsNil) // there were no matching locators in file and hence nothing was checked
+}
+
+func (s *ServerRequiredSuite) TestBlockCheck_BadSignature(c *C) {
+ setupKeepBlockCheck(c, true, "")
+ setupTestData(c)
+ err := performKeepBlockCheck(kc, blobSignatureTTL, "badblobsigningkey", []string{TestHash, TestHash2}, false)
+ c.Assert(err.Error(), Equals, "Block verification failed for 2 out of 2 blocks with matching prefix.")
+ checkErrorLog(c, []string{TestHash, TestHash2}, "Error verifying block", "HTTP 403")
+ // verbose logging not requested
+ c.Assert(strings.Contains(logBuffer.String(), "Verifying block 1 of 2"), Equals, false)
+}
+
+var testKeepServicesJSON = `{
+ "kind":"arvados#keepServiceList",
+ "etag":"",
+ "self_link":"",
+ "offset":null, "limit":null,
+ "items":[
+ {"href":"/keep_services/zzzzz-bi6l4-123456789012340",
+ "kind":"arvados#keepService",
+ "uuid":"zzzzz-bi6l4-123456789012340",
+ "service_host":"keep0.zzzzz.arvadosapi.com",
+ "service_port":25107,
+ "service_ssl_flag":false,
+ "service_type":"disk",
+ "read_only":false },
+ {"href":"/keep_services/zzzzz-bi6l4-123456789012341",
+ "kind":"arvados#keepService",
+ "uuid":"zzzzz-bi6l4-123456789012341",
+ "service_host":"keep0.zzzzz.arvadosapi.com",
+ "service_port":25108,
+ "service_ssl_flag":false,
+ "service_type":"disk",
+ "read_only":false }
+ ],
+ "items_available":2 }`
+
+// Setup block-check using keepServicesJSON with fake keepservers.
+// Expect error during performKeepBlockCheck due to unreachable keepservers.
+func (s *ServerRequiredSuite) TestErrorDuringKeepBlockCheck_FakeKeepservers(c *C) {
+ setupKeepBlockCheck(c, false, testKeepServicesJSON)
+ err := performKeepBlockCheck(kc, blobSignatureTTL, "", []string{TestHash, TestHash2}, true)
+ c.Assert(err.Error(), Equals, "Block verification failed for 2 out of 2 blocks with matching prefix.")
+ checkErrorLog(c, []string{TestHash, TestHash2}, "Error verifying block", "")
+}
+
+// Test keep-block-check initialization with keepServicesJSON
+func (s *ServerRequiredSuite) TestKeepBlockCheck_InitializeWithKeepServicesJSON(c *C) {
+ setupKeepBlockCheck(c, false, testKeepServicesJSON)
+ found := 0
+ for k := range kc.LocalRoots() {
+ if k == "zzzzz-bi6l4-123456789012340" || k == "zzzzz-bi6l4-123456789012341" {
+ found++
+ }
+ }
+ c.Check(found, Equals, 2)
+}
+
+// Test loadConfig func
+func (s *ServerRequiredSuite) TestLoadConfig(c *C) {
+ // Setup config file
+ configFile := setupConfigFile(c, "config")
+ defer os.Remove(configFile)
+
+ // load configuration from the file
+ config, blobSigningKey, err := loadConfig(configFile)
+ c.Check(err, IsNil)
+
+ c.Assert(config.APIHost, Equals, os.Getenv("ARVADOS_API_HOST"))
+ c.Assert(config.APIToken, Equals, arvadostest.DataManagerToken)
+ c.Assert(config.APIHostInsecure, Equals, matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE")))
+ c.Assert(config.ExternalClient, Equals, false)
+ c.Assert(blobSigningKey, Equals, "abcdefg")
+}
+
+func (s *DoMainTestSuite) Test_doMain_WithNoConfig(c *C) {
+ args := []string{"-prefix", "a"}
+ err := doMain(args)
+ c.Check(err, NotNil)
+ c.Assert(strings.Contains(err.Error(), "config file not specified"), Equals, true)
+}
+
+func (s *DoMainTestSuite) Test_doMain_WithNoSuchConfigFile(c *C) {
+ args := []string{"-config", "no-such-file"}
+ err := doMain(args)
+ c.Check(err, NotNil)
+ c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
+}
+
+func (s *DoMainTestSuite) Test_doMain_WithNoBlockHashFile(c *C) {
+ config := setupConfigFile(c, "config")
+ defer os.Remove(config)
+
+ // Start keepservers.
+ arvadostest.StartKeep(2, false)
+ defer arvadostest.StopKeep(2)
+
+ args := []string{"-config", config}
+ err := doMain(args)
+ c.Assert(strings.Contains(err.Error(), "block-hash-file not specified"), Equals, true)
+}
+
+func (s *DoMainTestSuite) Test_doMain_WithNoSuchBlockHashFile(c *C) {
+ config := setupConfigFile(c, "config")
+ defer os.Remove(config)
+
+ arvadostest.StartKeep(2, false)
+ defer arvadostest.StopKeep(2)
+
+ args := []string{"-config", config, "-block-hash-file", "no-such-file"}
+ err := doMain(args)
+ c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
+}
+
+func (s *DoMainTestSuite) Test_doMain(c *C) {
+ // Start keepservers.
+ arvadostest.StartKeep(2, false)
+ defer arvadostest.StopKeep(2)
+
+ config := setupConfigFile(c, "config")
+ defer os.Remove(config)
+
+ locatorFile := setupBlockHashFile(c, "block-hash", []string{TestHash, TestHash2})
+ defer os.Remove(locatorFile)
+
+ args := []string{"-config", config, "-block-hash-file", locatorFile, "-v"}
+ err := doMain(args)
+ c.Check(err, NotNil)
+ c.Assert(err.Error(), Equals, "Block verification failed for 2 out of 2 blocks with matching prefix.")
+ checkErrorLog(c, []string{TestHash, TestHash2}, "Error verifying block", "Block not found")
+ c.Assert(strings.Contains(logBuffer.String(), "Verifying block 1 of 2"), Equals, true)
+}
"",
"Index prefix")
+ srcBlobSignatureTTLFlag := flags.Duration(
+ "src-blob-signature-ttl",
+ 0,
+ "Lifetime of blob permission signatures on source keepservers. If not provided, this will be retrieved from the API server's discovery document.")
+
// Parse args; omit the first arg which is the command name
flags.Parse(os.Args[1:])
}
// setup src and dst keepclients
- kcSrc, err := setupKeepClient(srcConfig, *srcKeepServicesJSON, false, 0)
+ kcSrc, srcBlobSignatureTTL, err := setupKeepClient(srcConfig, *srcKeepServicesJSON, false, 0, *srcBlobSignatureTTLFlag)
if err != nil {
return fmt.Errorf("Error configuring src keepclient: %s", err.Error())
}
- kcDst, err := setupKeepClient(dstConfig, *dstKeepServicesJSON, true, *replications)
+ kcDst, _, err := setupKeepClient(dstConfig, *dstKeepServicesJSON, true, *replications, 0)
if err != nil {
return fmt.Errorf("Error configuring dst keepclient: %s", err.Error())
}
// Copy blocks not found in dst from src
- err = performKeepRsync(kcSrc, kcDst, srcBlobSigningKey, *prefix)
+ err = performKeepRsync(kcSrc, kcDst, srcBlobSignatureTTL, srcBlobSigningKey, *prefix)
if err != nil {
return fmt.Errorf("Error while syncing data: %s", err.Error())
}
}
// setup keepclient using the config provided
-func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int) (kc *keepclient.KeepClient, err error) {
+func setupKeepClient(config apiConfig, keepServicesJSON string, isDst bool, replications int, srcBlobSignatureTTL time.Duration) (kc *keepclient.KeepClient, blobSignatureTTL time.Duration, err error) {
arv := arvadosclient.ArvadosClient{
ApiToken: config.APIToken,
ApiServer: config.APIHost,
if keepServicesJSON == "" {
kc, err = keepclient.MakeKeepClient(&arv)
if err != nil {
- return nil, err
+ return nil, 0, err
}
} else {
kc = keepclient.New(&arv)
err = kc.LoadKeepServicesFromJSON(keepServicesJSON)
if err != nil {
- return kc, err
+ return kc, 0, err
}
}
if err == nil {
replications = int(value.(float64))
} else {
- return nil, err
+ return nil, 0, err
}
}
kc.Want_replicas = replications
}
- return kc, nil
+ // If srcBlobSignatureTTL is not provided, get it from API server discovery doc
+ blobSignatureTTL = srcBlobSignatureTTL
+ if !isDst && srcBlobSignatureTTL == 0 {
+ value, err := arv.Discovery("blobSignatureTtl")
+ if err == nil {
+ blobSignatureTTL = time.Duration(int(value.(float64))) * time.Second
+ } else {
+ return nil, 0, err
+ }
+ }
+
+ return kc, blobSignatureTTL, nil
}
// Get unique block locators from src and dst
// Copy any blocks missing in dst
-func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, blobSigningKey, prefix string) error {
+func performKeepRsync(kcSrc, kcDst *keepclient.KeepClient, srcBlobSignatureTTL time.Duration, blobSigningKey, prefix string) error {
// Get unique locators from src
srcIndex, err := getUniqueLocators(kcSrc, prefix)
if err != nil {
log.Printf("Before keep-rsync, there are %d blocks in src and %d blocks in dst. Start copying %d blocks from src not found in dst.",
len(srcIndex), len(dstIndex), len(toBeCopied))
- err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, blobSigningKey)
+ err = copyBlocksToDst(toBeCopied, kcSrc, kcDst, srcBlobSignatureTTL, blobSigningKey)
return err
}
}
// Copy blocks from src to dst; only those that are missing in dst are copied
-func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, blobSigningKey string) error {
+func copyBlocksToDst(toBeCopied []string, kcSrc, kcDst *keepclient.KeepClient, srcBlobSignatureTTL time.Duration, blobSigningKey string) error {
total := len(toBeCopied)
startedAt := time.Now()
getLocator := locator
expiresAt := time.Now().AddDate(0, 0, 1)
if blobSigningKey != "" {
- getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, []byte(blobSigningKey))
+ getLocator = keepclient.SignLocator(getLocator, kcSrc.Arvados.ApiToken, expiresAt, srcBlobSignatureTTL, []byte(blobSigningKey))
}
reader, len, _, err := kcSrc.Get(getLocator)
var kcSrc, kcDst *keepclient.KeepClient
var srcKeepServicesJSON, dstKeepServicesJSON, blobSigningKey string
+var blobSignatureTTL = time.Duration(2*7*24) * time.Hour
func (s *ServerRequiredSuite) SetUpTest(c *C) {
// reset all variables between tests
dstConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
if enforcePermissions {
- blobSigningKey = "zfhgfenhffzltr9dixws36j1yhksjoll2grmku38mi7yxd66h5j4q9w4jzanezacp8s6q0ro3hxakfye02152hncy6zml2ed0uc"
+ blobSigningKey = arvadostest.BlobSigningKey
}
// Start Keep servers
// setup keepclients
var err error
- kcSrc, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0)
+ kcSrc, _, err = setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0, blobSignatureTTL)
c.Check(err, IsNil)
- kcDst, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications)
+ kcDst, _, err = setupKeepClient(dstConfig, dstKeepServicesJSON, true, replications, 0)
c.Check(err, IsNil)
for uuid := range kcSrc.LocalRoots() {
c.Assert(err, Equals, nil)
locator = strings.Split(locator, "+")[0]
- _, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), []byte(blobSigningKey)))
+ _, _, _, err = kc2.Get(keepclient.SignLocator(locator, kc2.Arvados.ApiToken, time.Now().AddDate(0, 0, 1), blobSignatureTTL, []byte(blobSigningKey)))
c.Assert(err, NotNil)
c.Check(err.Error(), Equals, "Block not found")
}
// setupTestData
setupTestData(c, prefix)
- err := performKeepRsync(kcSrc, kcDst, blobSigningKey, prefix)
+ err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, blobSigningKey, prefix)
c.Check(err, IsNil)
// Now GetIndex from dst and verify that all 5 from src and the 2 extra blocks are found
setupRsync(c, false, 1)
- err := performKeepRsync(kcSrc, kcDst, "", "")
+ err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, "", "")
log.Printf("Err = %v", err)
c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
}
setupRsync(c, false, 1)
- err := performKeepRsync(kcSrc, kcDst, "", "")
+ err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, "", "")
log.Printf("Err = %v", err)
c.Check(strings.Contains(err.Error(), "no such host"), Equals, true)
}
// Change blob signing key to a fake key, so that Get from src fails
blobSigningKey = "thisisfakeblobsigningkey"
- err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+ err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, blobSigningKey, "")
c.Check(strings.Contains(err.Error(), "HTTP 403 \"Forbidden\""), Equals, true)
}
// Increase Want_replicas on dst to result in insufficient replicas error during Put
kcDst.Want_replicas = 2
- err := performKeepRsync(kcSrc, kcDst, blobSigningKey, "")
+ err := performKeepRsync(kcSrc, kcDst, blobSignatureTTL, blobSigningKey, "")
c.Check(strings.Contains(err.Error(), "Could not write sufficient replicas"), Equals, true)
}
c.Assert(strings.Contains(err.Error(), "no such file or directory"), Equals, true)
}
+// TestSetupKeepClient_NoBlobSignatureTTL verifies that when no TTL is
+// provided (srcBlobSignatureTTL == 0) and this is the src side (isDst ==
+// false), setupKeepClient falls back to the API server's discovery document
+// and returns its blobSignatureTtl (the 2-week default, blobSignatureTTL).
+func (s *ServerNotRequiredSuite) TestSetupKeepClient_NoBlobSignatureTTL(c *C) {
+ var srcConfig apiConfig
+ srcConfig.APIHost = os.Getenv("ARVADOS_API_HOST")
+ srcConfig.APIToken = arvadostest.DataManagerToken
+ srcConfig.APIHostInsecure = matchTrue.MatchString(os.Getenv("ARVADOS_API_HOST_INSECURE"))
+ arvadostest.StartKeep(2, false)
+ // Stop the keepservers when done, matching the cleanup in Test_doMain;
+ // without this the started servers leak into subsequent tests.
+ defer arvadostest.StopKeep(2)
+
+ _, ttl, err := setupKeepClient(srcConfig, srcKeepServicesJSON, false, 0, 0)
+ c.Check(err, IsNil)
+ c.Assert(ttl, Equals, blobSignatureTTL)
+}
+
func setupConfigFile(c *C, name string) *os.File {
// Setup a config file
file, err := ioutil.TempFile(os.TempDir(), name)