cd "$WORKSPACE"
py=python
+pipcmd=pip
if [[ -n "$PYCMD" ]] ; then
- py="$PYCMD" ;
+ py="$PYCMD"
+ if [[ $py = python3 ]] ; then
+ pipcmd=pip3
+ fi
fi
(cd sdk/python && python setup.py sdist)
cwl_runner_version=$(cd sdk/python && nohash_version_from_git 1.0)
fi
-docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg salad=$salad --build-arg cwltool=$cwltool --build-arg pythoncmd=$py -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$cwl_runner_version "$WORKSPACE/sdk"
+docker build --build-arg sdk=$sdk --build-arg runner=$runner --build-arg salad=$salad --build-arg cwltool=$cwltool --build-arg pythoncmd=$py --build-arg pipcmd=$pipcmd -f "$WORKSPACE/sdk/dev-jobs.dockerfile" -t arvados/jobs:$cwl_runner_version "$WORKSPACE/sdk"
echo arv-keepdocker arvados/jobs $cwl_runner_version
arv-keepdocker arvados/jobs $cwl_runner_version
WORKSPACE=/path/to/arvados $(basename $0) [options]
--target <target>
- Distribution to build packages for (default: debian8)
+ Distribution to build packages for (default: debian9)
--command
Build command to execute (default: use built-in Docker image command)
--test-packages
exit 1
fi
-TARGET=debian8
+TARGET=debian9
COMMAND=
DEBUG=
set +e
mv -f ${WORKSPACE}/packages/${TARGET}/* ${WORKSPACE}/packages/${TARGET}/processed/ 2>/dev/null
set -e
+set -x
# Build packages. The ulimit option can be removed when debian8 and ubuntu1404 are retired
if docker run --ulimit nofile=4096:4096 \
--rm \
--debug
Output debug information (default: false)
--target
- Distribution to build packages for (default: debian8)
+ Distribution to build packages for (default: debian9)
WORKSPACE=path Path to the Arvados SSO source tree to build packages from
EXITCODE=0
DEBUG=${ARVADOS_DEBUG:-0}
-TARGET=debian8
+TARGET=debian9
PARSEDOPTS=$(getopt --name "$0" --longoptions \
help,build-bundle-packages,debug,target: \
WORKSPACE=/path/to/arvados $(basename $0) [options]
--target <target>
- Distribution to build packages for (default: debian8)
+ Distribution to build packages for (default: debian9)
--upload
If the build and test steps are successful, upload the packages
to a remote apt repository (default: false)
exit 1
fi
-TARGET=debian8
+TARGET=debian9
UPLOAD=0
RC=0
title "End of upload packages (`timer`)"
fi
-exit_cleanly
\ No newline at end of file
+exit_cleanly
services/ws
sdk/cli
sdk/pam
+sdk/pam:py3
sdk/python
sdk/python:py3
sdk/ruby
testfuncargs[$dir:py3]="$dir pip $VENV3DIR/bin/"
done
+testfuncargs["sdk/cli"]="sdk/cli"
+
if [[ -z ${interactive} ]]; then
install_all
test_all
TODO: extract this information based on git commit messages and generate changelogs / release notes automatically.
{% endcomment %}
-h3. current master branch
+table(table table-bordered table-condensed).
+|_. development|"master":#master|\3.|
+|_. latest stable|"v1.4.0":#v1_4_0|\3.|
+|_\5. past stable|
+|"v1.3.3":#v1_3_3|"v1.3.0":#v1_3_0|\3.|
+|"v1.2.1":#v1_2_1|"v1.2.0":#v1_2_0|\3.|
+|"v1.1.4":#v1_1_4|"v1.1.3":#v1_1_3|"v1.1.2":#v1_1_2|"v1.1.1":#v1_1_1|"v1.1.0":#v1_1_0|
+|\5. "older":#older|
+
+h3(#master). development master (as of 2019-06-07)
+
+h4. No longer stripping ':' from strings in serialized database columns
+
+ (bug #15311) Strings read from serialized columns in the database with a leading ':' used to have the ':' stripped when the record was loaded. This was a holdover from legacy serialization, which stored Ruby symbols with a leading ':'; unfortunately, it corrupted fields where the leading ':' was intentional. The stripping behavior has been removed.
+
+You can test if any records in your database are affected by going to the API server directory and running @bundle exec rake symbols:check@. This will report which records contain fields with a leading ':' that would previously have been stripped. If there are records to be updated, you can update the database using @bundle exec rake symbols:stringify@.
+
+h3(#v1_4_0). v1.4.0 (2019-06-05)
+
+h4. Populating the new file_count and file_size_total columns on the collections table
+
+As part of story "#14484":https://dev.arvados.org/issues/14484, two new columns were added to the collections table in a database migration. If your installation has a large collections table, this migration may take some time. We've seen it take ~5 minutes on an installation with 250k collections, but your mileage may vary.
+
+The new columns are initialized with a zero value. In order to populate them, it is necessary to run a script called <code class="userinput">populate-file-info-columns-in-collections.rb</code> from the scripts directory of the API server. This can be done out of band, ideally directly after the API server has been upgraded to v1.4.0.
h4. Stricter collection manifest validation on the API server
Arvados is migrating to a centralized configuration file for all components. During the migration, legacy configuration files will continue to be loaded. See "Migrating Configuration":config-migration.html for details.
-h3. v1.3.0 (2018-12-05)
+h3(#v1_3_3). v1.3.3 (2019-05-14)
+
+This release corrects a potential data loss issue. If you are running Arvados 1.3.0 or 1.3.1, we strongly recommend disabling @keep-balance@ until you can upgrade to 1.3.3 or 1.4.0. With keep-balance disabled, there is no chance of data loss.
+
+We've put together a "wiki page":https://dev.arvados.org/projects/arvados/wiki/Recovering_lost_data which outlines how to recover blocks which have been put in the trash but not yet deleted, as well as how to identify any collections with missing blocks so that they can be regenerated. The keep-balance component has been enhanced to report missing blocks and affected collections, and we've provided a "utility script":https://github.com/curoverse/arvados/blob/master/tools/keep-xref/keep-xref.py which identifies the workflows that generated those collections and who ran them, so that they can be rerun.
+
+h3(#v1_3_0). v1.3.0 (2018-12-05)
This release includes several database migrations, which will be executed automatically as part of the API server upgrade. On large Arvados installations, these migrations will take a while. We've seen the upgrade take 30 minutes or more on installations with a lot of collections.
Support for the deprecated "jobs" API is broken in this release. Users who rely on it should not upgrade. This will be fixed in an upcoming 1.3.1 patch release; however, users are "encouraged to migrate":upgrade-crunch2.html as support for the "jobs" API will be dropped in an upcoming release. Users who are already using the "containers" API are not affected.
-h3. v1.2.1 (2018-11-26)
+h3(#v1_2_1). v1.2.1 (2018-11-26)
There are no special upgrade notes for this release.
-h3. v1.2.0 (2018-09-05)
+h3(#v1_2_0). v1.2.0 (2018-09-05)
h4. Regenerate Postgres table statistics
Verify your setup by confirming that API calls appear in the controller's logs (_e.g._, @journalctl -fu arvados-controller@) while loading a workbench page.
-h3. v1.1.4 (2018-04-10)
+h3(#v1_1_4). v1.1.4 (2018-04-10)
h4. arvados-cwl-runner regressions (2018-04-05)
This bug has been fixed in Arvados release v1.2.0.
-h3. v1.1.3 (2018-02-08)
+h3(#v1_1_3). v1.1.3 (2018-02-08)
There are no special upgrade notes for this release.
-h3. v1.1.2 (2017-12-22)
+h3(#v1_1_2). v1.1.2 (2017-12-22)
h4. The minimum version for Postgres is now 9.4 (2017-12-08)
*# Install the @rh-postgresql94@ backport package from either Software Collections: http://doc.arvados.org/install/install-postgresql.html or the Postgres developers: https://www.postgresql.org/download/linux/redhat/
*# Restore from the backup using @psql@
-h3. v1.1.1 (2017-11-30)
+h3(#v1_1_1). v1.1.1 (2017-11-30)
There are no special upgrade notes for this release.
-h3. v1.1.0 (2017-10-24)
+h3(#v1_1_0). v1.1.0 (2017-10-24)
h4. The minimum version for Postgres is now 9.3 (2017-09-25)
*# Install the @rh-postgresql94@ backport package from either Software Collections: http://doc.arvados.org/install/install-postgresql.html or the Postgres developers: https://www.postgresql.org/download/linux/redhat/
*# Restore from the backup using @psql@
-h3. Older versions
+h3(#older). Older versions
h4. Upgrade slower than usual (2017-06-30)
<notextile>
<pre><code>Clusters:
<span class="userinput">uuid_prefix</span>:
- NodeProfiles:
- apiserver:
- arvados-controller:
- Listen: ":<span class="userinput">9004</span>" # must match the "upstream controller" section of your Nginx config
+ Services:
+ Controller:
+ InternalURLs:
+ "http://localhost:<span class="userinput">9004</span>": {} # must match the "upstream controller" section of your Nginx config
+ RailsAPI:
-        arvados-api-server:
-          Listen: ":<span class="userinput">8000</span>" # must match the "upstream api" section of your Nginx config
+        InternalURLs:
+          "http://localhost:<span class="userinput">8000</span>": {} # must match the "upstream api" section of your Nginx config
PostgreSQL:
ConnectionPool: 128
Connection:
<span class="userinput">uuid_prefix</span>:
ManagementToken: xyzzy
SystemRootToken: <span class="userinput">zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz</span>
- NodeProfiles:
- # The key "apiserver" corresponds to ARVADOS_NODE_PROFILE in environment file (see below).
- apiserver:
- arvados-dispatch-cloud:
- Listen: ":9006"
Services:
Controller:
ExternalURL: "https://<span class="userinput">uuid_prefix.arvadosapi.com</span>"
- CloudVMs:
- # BootProbeCommand is a shell command that succeeds when an instance is ready for service
- BootProbeCommand: "sudo systemctl status docker"
+ DispatchCloud:
+ InternalURLs:
+ "http://localhost:9006": {}
+ Containers:
+ CloudVMs:
+ # BootProbeCommand is a shell command that succeeds when an instance is ready for service
+ BootProbeCommand: "sudo systemctl status docker"
- <b># --- driver-specific configuration goes here --- see Amazon and Azure examples below ---</b>
+ <b># --- driver-specific configuration goes here --- see Amazon and Azure examples below ---</b>
- Dispatch:
- PrivateKey: |
+ DispatchPrivateKey: |
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAqXoCzcOBkFQ7w4dvXf9B++1ctgZRqEbgRYL3SstuMV4oawks
ttUuxJycDdsPmeYcHsKo8vsEZpN6iYsX6ZZzhkO5nEayUTU8sBjmg1ZCTo4QqKXr
<notextile>
<pre><code>Clusters:
<span class="userinput">uuid_prefix</span>:
- CloudVMs:
- ImageID: ami-01234567890abcdef
- Driver: ec2
- DriverParameters:
- AccessKeyID: EALMF21BJC7MKNF9FVVR
- SecretAccessKey: yKJAPmoCQOMtYWzEUQ1tKTyrocTcbH60CRvGP3pM
- SecurityGroupIDs:
- - sg-0123abcd
- SubnetID: subnet-0123abcd
- Region: us-east-1
- EBSVolumeType: gp2
- AdminUsername: debian
+ Containers:
+ CloudVMs:
+ ImageID: ami-01234567890abcdef
+ Driver: ec2
+ DriverParameters:
+ AccessKeyID: EALMF21BJC7MKNF9FVVR
+ SecretAccessKey: yKJAPmoCQOMtYWzEUQ1tKTyrocTcbH60CRvGP3pM
+ SecurityGroupIDs:
+ - sg-0123abcd
+ SubnetID: subnet-0123abcd
+ Region: us-east-1
+ EBSVolumeType: gp2
+ AdminUsername: debian
</code></pre>
</notextile>
<notextile>
<pre><code>Clusters:
<span class="userinput">uuid_prefix</span>:
- CloudVMs:
- ImageID: "https://zzzzzzzz.blob.core.windows.net/system/Microsoft.Compute/Images/images/zzzzz-compute-osDisk.55555555-5555-5555-5555-555555555555.vhd"
- Driver: azure
- DriverParameters:
- SubscriptionID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
- ClientID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
- ClientSecret: 2WyXt0XFbEtutnf2hp528t6Wk9S5bOHWkRaaWwavKQo=
- TenantID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
- CloudEnvironment: AzurePublicCloud
- ResourceGroup: zzzzz
- Location: centralus
- Network: zzzzz
- Subnet: zzzzz-subnet-private
- StorageAccount: example
- BlobContainer: vhds
- DeleteDanglingResourcesAfter: 20s
- AdminUsername: arvados
-</code></pre>
-</notextile>
-
-Create the host configuration file @/etc/arvados/environment@.
-
-<notextile>
-<pre><code>ARVADOS_NODE_PROFILE=apiserver
+ Containers:
+ CloudVMs:
+ ImageID: "https://zzzzzzzz.blob.core.windows.net/system/Microsoft.Compute/Images/images/zzzzz-compute-osDisk.55555555-5555-5555-5555-555555555555.vhd"
+ Driver: azure
+ DriverParameters:
+ SubscriptionID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+ ClientID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+ ClientSecret: 2WyXt0XFbEtutnf2hp528t6Wk9S5bOHWkRaaWwavKQo=
+ TenantID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+ CloudEnvironment: AzurePublicCloud
+ ResourceGroup: zzzzz
+ Location: centralus
+ Network: zzzzz
+ Subnet: zzzzz-subnet-private
+ StorageAccount: example
+ BlobContainer: vhds
+ DeleteDanglingResourcesAfter: 20s
+ AdminUsername: arvados
</code></pre>
</notextile>
AdminUsername string
}
-const tagKeyInstanceSecret = "InstanceSecret"
-
type containerWrapper interface {
GetBlobReference(name string) *storage.Blob
ListBlobs(params storage.ListBlobsParameters) (storage.BlobListResponse, error)
logger logrus.FieldLogger
}
-func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+func newAzureInstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
azcfg := azureInstanceSetConfig{}
err = json.Unmarshal(config, &azcfg)
if err != nil {
name = az.namePrefix + name
- timestamp := time.Now().Format(time.RFC3339Nano)
-
- tags := make(map[string]*string)
- tags["created-at"] = ×tamp
+ tags := map[string]*string{}
for k, v := range newTags {
- newstr := v
- tags["dispatch-"+k] = &newstr
+ tags[k] = to.StringPtr(v)
}
+ tags["created-at"] = to.StringPtr(time.Now().Format(time.RFC3339Nano))
nicParameters := network.Interface{
Location: &az.azconfig.Location,
return nil, wrapAzureError(err)
}
- instances := make([]cloud.Instance, 0)
-
+ var instances []cloud.Instance
for ; result.NotDone(); err = result.Next() {
if err != nil {
return nil, wrapAzureError(err)
}
- if strings.HasPrefix(*result.Value().Name, az.namePrefix) {
- instances = append(instances, &azureInstance{
- provider: az,
- vm: result.Value(),
- nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID]})
- }
+ instances = append(instances, &azureInstance{
+ provider: az,
+ vm: result.Value(),
+ nic: interfaces[*(*result.Value().NetworkProfile.NetworkInterfaces)[0].ID],
+ })
}
return instances, nil
}
// ManageNics returns a list of Azure network interface resources.
-// Also performs garbage collection of NICs which have "namePrefix", are
-// not associated with a virtual machine and have a "create-at" time
-// more than DeleteDanglingResourcesAfter (to prevent racing and
-// deleting newly created NICs) in the past are deleted.
+// Also performs garbage collection: NICs which have "namePrefix",
+// are not associated with a virtual machine, and have a "created-at"
+// time more than DeleteDanglingResourcesAfter in the past are
+// deleted (the delay prevents racing against, and deleting, newly
+// created NICs).
func (az *azureInstanceSet) manageNics() (map[string]network.Interface, error) {
az.stopWg.Add(1)
ai.provider.stopWg.Add(1)
defer ai.provider.stopWg.Done()
- tags := make(map[string]*string)
-
+ tags := map[string]*string{}
for k, v := range ai.vm.Tags {
- if !strings.HasPrefix(k, "dispatch-") {
- tags[k] = v
- }
+ tags[k] = v
}
for k, v := range newTags {
- newstr := v
- tags["dispatch-"+k] = &newstr
+ tags[k] = to.StringPtr(v)
}
vmParameters := compute.VirtualMachine{
}
func (ai *azureInstance) Tags() cloud.InstanceTags {
- tags := make(map[string]string)
-
+ tags := cloud.InstanceTags{}
for k, v := range ai.vm.Tags {
- if strings.HasPrefix(k, "dispatch-") {
- tags[k[9:]] = *v
- }
+ tags[k] = *v
}
-
return tags
}
"net"
"net/http"
"os"
+ "strings"
"testing"
"time"
var _ = check.Suite(&AzureInstanceSetSuite{})
+const testNamePrefix = "compute-test123-"
+
type VirtualMachinesClientStub struct{}
func (*VirtualMachinesClientStub) createOrUpdate(ctx context.Context,
return nil, cloud.ImageID(""), cluster, err
}
- ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", logrus.StandardLogger())
+ ap, err := newAzureInstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
}
ap := azureInstanceSet{
BlobContainer: "vhds",
},
dispatcherID: "test123",
- namePrefix: "compute-test123-",
+ namePrefix: testNamePrefix,
logger: logrus.StandardLogger(),
deleteNIC: make(chan string),
deleteBlob: make(chan storage.Blob),
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
- for _, i := range l {
+ for _, i := range filterInstances(c, l) {
c.Check(i.Destroy(), check.IsNil)
}
}
if err != nil {
c.Fatal("Error making provider", err)
}
+
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
-
+ l = filterInstances(c, l)
if len(l) > 0 {
err = l[0].SetTags(map[string]string{"foo": "bar"})
if err != nil {
c.Fatal("Error setting tags", err)
}
}
+
l, err = ap.Instances(nil)
c.Assert(err, check.IsNil)
+ l = filterInstances(c, l)
if len(l) > 0 {
tg := l[0].Tags()
}
l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
+ l = filterInstances(c, l)
if len(l) > 0 {
sshclient, err := SetupSSHClient(c, l[0])
return client, nil
}
+
+func filterInstances(c *check.C, instances []cloud.Instance) []cloud.Instance {
+ var r []cloud.Instance
+ for _, i := range instances {
+ if !strings.HasPrefix(i.String(), testNamePrefix) {
+ c.Logf("ignoring instance %s", i)
+ continue
+ }
+ r = append(r, i)
+ }
+ return r
+}
"encoding/json"
"fmt"
"math/big"
- "strings"
"sync"
"git.curoverse.com/arvados.git/lib/cloud"
"golang.org/x/crypto/ssh"
)
-const arvadosDispatchID = "arvados-dispatch-id"
-const tagPrefix = "arvados-dispatch-tag-"
-
// Driver is the ec2 implementation of the cloud.Driver interface.
var Driver = cloud.DriverFunc(newEC2InstanceSet)
}
type ec2InstanceSet struct {
- ec2config ec2InstanceSetConfig
- dispatcherID cloud.InstanceSetID
- logger logrus.FieldLogger
- client ec2Interface
- keysMtx sync.Mutex
- keys map[string]string
+ ec2config ec2InstanceSetConfig
+ instanceSetID cloud.InstanceSetID
+ logger logrus.FieldLogger
+ client ec2Interface
+ keysMtx sync.Mutex
+ keys map[string]string
}
-func newEC2InstanceSet(config json.RawMessage, dispatcherID cloud.InstanceSetID, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
+func newEC2InstanceSet(config json.RawMessage, instanceSetID cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (prv cloud.InstanceSet, err error) {
instanceSet := &ec2InstanceSet{
- dispatcherID: dispatcherID,
- logger: logger,
+ instanceSetID: instanceSetID,
+ logger: logger,
}
err = json.Unmarshal(config, &instanceSet.ec2config)
if err != nil {
}
instanceSet.keysMtx.Unlock()
- ec2tags := []*ec2.Tag{
- &ec2.Tag{
- Key: aws.String(arvadosDispatchID),
- Value: aws.String(string(instanceSet.dispatcherID)),
- },
- &ec2.Tag{
- Key: aws.String("arvados-class"),
- Value: aws.String("dynamic-compute"),
- },
- }
+ ec2tags := []*ec2.Tag{}
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
- Key: aws.String(tagPrefix + k),
+ Key: aws.String(k),
Value: aws.String(v),
})
}
}},
DisableApiTermination: aws.Bool(false),
InstanceInitiatedShutdownBehavior: aws.String("terminate"),
- UserData: aws.String(base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))),
TagSpecifications: []*ec2.TagSpecification{
&ec2.TagSpecification{
ResourceType: aws.String("instance"),
Tags: ec2tags,
}},
+ UserData: aws.String(base64.StdEncoding.EncodeToString([]byte("#!/bin/sh\n" + initCommand + "\n"))),
}
if instanceType.AddedScratch > 0 {
}, nil
}
-func (instanceSet *ec2InstanceSet) Instances(cloud.InstanceTags) (instances []cloud.Instance, err error) {
- dii := &ec2.DescribeInstancesInput{
- Filters: []*ec2.Filter{&ec2.Filter{
- Name: aws.String("tag:" + arvadosDispatchID),
- Values: []*string{aws.String(string(instanceSet.dispatcherID))},
- }}}
-
+func (instanceSet *ec2InstanceSet) Instances(tags cloud.InstanceTags) (instances []cloud.Instance, err error) {
+ var filters []*ec2.Filter
+ for k, v := range tags {
+ filters = append(filters, &ec2.Filter{
+ Name: aws.String("tag:" + k),
+ Values: []*string{aws.String(v)},
+ })
+ }
+ dii := &ec2.DescribeInstancesInput{Filters: filters}
for {
dio, err := instanceSet.client.DescribeInstances(dii)
if err != nil {
}
func (inst *ec2Instance) SetTags(newTags cloud.InstanceTags) error {
- ec2tags := []*ec2.Tag{
- &ec2.Tag{
- Key: aws.String(arvadosDispatchID),
- Value: aws.String(string(inst.provider.dispatcherID)),
- },
- }
+ var ec2tags []*ec2.Tag
for k, v := range newTags {
ec2tags = append(ec2tags, &ec2.Tag{
- Key: aws.String(tagPrefix + k),
+ Key: aws.String(k),
Value: aws.String(v),
})
}
tags := make(map[string]string)
for _, t := range inst.instance.Tags {
- if strings.HasPrefix(*t.Key, tagPrefix) {
- tags[(*t.Key)[len(tagPrefix):]] = *t.Value
- }
+ tags[*t.Key] = *t.Value
}
return tags
return nil, cloud.ImageID(""), cluster, err
}
- ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", logrus.StandardLogger())
+ ap, err := newEC2InstanceSet(exampleCfg.DriverParameters, "test123", nil, logrus.StandardLogger())
return ap, cloud.ImageID(exampleCfg.ImageIDForTestSuite), cluster, err
}
ap := ec2InstanceSet{
- ec2config: ec2InstanceSetConfig{},
- dispatcherID: "test123",
- logger: logrus.StandardLogger(),
- client: &ec2stub{},
- keys: make(map[string]string),
+ ec2config: ec2InstanceSetConfig{},
+ instanceSetID: "test123",
+ logger: logrus.StandardLogger(),
+ client: &ec2stub{},
+ keys: make(map[string]string),
}
return &ap, cloud.ImageID("blob"), cluster, nil
}
error
}
+type SharedResourceTags map[string]string
type InstanceSetID string
type InstanceTags map[string]string
type InstanceID string
// A Driver returns an InstanceSet that uses the given InstanceSetID
// and driver-dependent configuration parameters.
//
+// If the driver creates cloud resources that aren't attached to a
+// single VM instance (like SSH key pairs on AWS) and support tagging,
+// they should be tagged with the provided SharedResourceTags.
+//
// The supplied id will be of the form "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
// where each z can be any alphanum. The returned InstanceSet must use
// this id to tag long-lived cloud resources that it creates, and must
// other mechanism. The tags must be visible to another instance of
// the same driver running on a different host.
//
-// The returned InstanceSet must ignore existing resources that are
-// visible but not tagged with the given id, except that it should log
-// a summary of such resources -- only once -- when it starts
-// up. Thus, two identically configured InstanceSets running on
-// different hosts with different ids should log about the existence
-// of each other's resources at startup, but will not interfere with
-// each other.
+// The returned InstanceSet must not modify or delete cloud resources
+// unless they are tagged with the given InstanceSetID or the caller
+// (dispatcher) calls Destroy() on them. It may log a summary of
+// untagged resources once at startup, though. Thus, two identically
+// configured InstanceSets running on different hosts with different
+// ids should log about the existence of each other's resources at
+// startup, but will not interfere with each other.
+//
+// The dispatcher always passes the InstanceSetID as a tag when
+// calling Create() and Instances(), so the driver does not need to
+// tag/filter VMs by InstanceSetID itself.
//
// Example:
//
//
// type exampleDriver struct {}
//
-// func (*exampleDriver) InstanceSet(config json.RawMessage, id InstanceSetID) (InstanceSet, error) {
+// func (*exampleDriver) InstanceSet(config json.RawMessage, id cloud.InstanceSetID, tags cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
// var is exampleInstanceSet
// if err := json.Unmarshal(config, &is); err != nil {
// return nil, err
//
// var _ = registerCloudDriver("example", &exampleDriver{})
type Driver interface {
- InstanceSet(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)
+ InstanceSet(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error)
}
// DriverFunc makes a Driver using the provided function as its
// InstanceSet method. This is similar to http.HandlerFunc.
-func DriverFunc(fn func(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)) Driver {
+func DriverFunc(fn func(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error)) Driver {
return driverFunc(fn)
}
-type driverFunc func(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error)
+type driverFunc func(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error)
-func (df driverFunc) InstanceSet(config json.RawMessage, id InstanceSetID, logger logrus.FieldLogger) (InstanceSet, error) {
- return df(config, id, logger)
+func (df driverFunc) InstanceSet(config json.RawMessage, id InstanceSetID, tags SharedResourceTags, logger logrus.FieldLogger) (InstanceSet, error) {
+ return df(config, id, tags, logger)
}
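
For driver authors, here is a minimal sketch of the updated contract. Everything named example* is hypothetical (not part of this change); the InstanceSet methods (Create, Instances, Stop) are assumed from their use elsewhere in this diff.

package example

import (
	"encoding/json"
	"errors"

	"git.curoverse.com/arvados.git/lib/cloud"
	"git.curoverse.com/arvados.git/sdk/go/arvados"
	"github.com/sirupsen/logrus"
	"golang.org/x/crypto/ssh"
)

// exampleInstanceSet is a hypothetical driver skeleton showing the
// updated constructor signature and the new tag semantics.
type exampleInstanceSet struct {
	logger     logrus.FieldLogger
	sharedTags cloud.SharedResourceTags // applied to non-VM resources, e.g. SSH key pairs
}

// Driver wraps the constructor, as the azure and ec2 drivers do.
var Driver = cloud.DriverFunc(newExampleInstanceSet)

func newExampleInstanceSet(config json.RawMessage, id cloud.InstanceSetID, tags cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
	is := &exampleInstanceSet{logger: logger, sharedTags: tags}
	if err := json.Unmarshal(config, is); err != nil {
		return nil, err
	}
	return is, nil
}

// Create tags the new VM with exactly the supplied tags -- the
// dispatcher already includes the InstanceSetID tag among them, so
// the driver does not add it itself.
func (is *exampleInstanceSet) Create(it arvados.InstanceType, img cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
	return nil, errors.New("not implemented") // hypothetical placeholder
}

// Instances translates the supplied tags directly into the cloud
// provider's tag filters, as the ec2 driver above does.
func (is *exampleInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
	return nil, nil
}

func (is *exampleInstanceSet) Stop() {}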
c.Check(stderr.String(), check.Matches, `(?ms).*unexpected object in config entry: Clusters.z1234.PostgreSQL.ConnectionPool\n.*`)
}
+func (s *CommandSuite) TestDumpFormatting(c *check.C) {
+ var stdout, stderr bytes.Buffer
+ in := `
+Clusters:
+ z1234:
+ Containers:
+ CloudVMs:
+ TimeoutBooting: 600s
+ Services:
+ Controller:
+ InternalURLs:
+ http://localhost:12345: {}
+`
+ code := DumpCommand.RunCommand("arvados config-dump", nil, bytes.NewBufferString(in), &stdout, &stderr)
+ c.Check(code, check.Equals, 0)
+ c.Check(stdout.String(), check.Matches, `(?ms).*TimeoutBooting: 10m\n.*`)
+ c.Check(stdout.String(), check.Matches, `(?ms).*http://localhost:12345: {}\n.*`)
+}
+
func (s *CommandSuite) TestDumpUnknownKey(c *check.C) {
var stdout, stderr bytes.Buffer
in := `
Services:
RailsAPI:
InternalURLs: {}
- GitHTTP:
- InternalURLs: {}
- ExternalURL: ""
- Keepstore:
- InternalURLs: {}
+ ExternalURL: "-"
Controller:
InternalURLs: {}
ExternalURL: ""
ExternalURL: ""
Keepbalance:
InternalURLs: {}
+ ExternalURL: "-"
GitHTTP:
InternalURLs: {}
ExternalURL: ""
ExternalURL: ""
DispatchCloud:
InternalURLs: {}
+ ExternalURL: "-"
SSO:
ExternalURL: ""
Keepproxy:
ExternalURL: ""
Keepstore:
InternalURLs: {}
+ ExternalURL: "-"
Composer:
ExternalURL: ""
WebShell:
ExternalURL: ""
Workbench2:
ExternalURL: ""
+ Nodemanager:
+ InternalURLs: {}
+ ExternalURL: "-"
+ Health:
+ InternalURLs: {}
+ ExternalURL: "-"
+
PostgreSQL:
# max concurrent connections per arvados server daemon
ConnectionPool: 32
# site secret. It should be at least 50 characters.
RailsSessionSecretToken: ""
+ # Maximum wall clock time to spend handling an incoming request.
+ RequestTimeout: 5m
+
Users:
# Config parameters to automatically set up new users. If enabled,
# these users will be able to self-activate. Enable this if you want
UnloggedAttributes: []
SystemLogs:
+
+ # Logging threshold: panic, fatal, error, warn, info, debug, or
+ # trace
+ LogLevel: info
+
+ # Logging format: json or text
+ Format: json
+
# Maximum characters of (JSON-encoded) query parameters to include
# in each request log entry. When params exceed this size, they will
# be JSON-encoded, truncated to this size, and logged as
Repositories: /var/lib/arvados/git/repositories
TLS:
+ Certificate: ""
+ Key: ""
Insecure: false
Containers:
# troubleshooting purposes.
LogReuseDecisions: false
+ # PEM encoded SSH key (RSA, DSA, or ECDSA) used by the
+ # (experimental) cloud dispatcher for executing containers on
+ # worker VMs. Begins with "-----BEGIN RSA PRIVATE KEY-----\n"
+ # and ends with "\n-----END RSA PRIVATE KEY-----\n".
+ DispatchPrivateKey: none
+
+ # Maximum time to wait for workers to come up before abandoning
+ # stale locks from a previous dispatch process.
+ StaleLockTimeout: 1m
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
# original job reuse behavior, and is still the default).
ReuseJobIfOutputsDiffer: false
+ CloudVMs:
+ # Enable the cloud scheduler (experimental).
+ Enable: false
+
+ # Name/number of port where workers' SSH services listen.
+ SSHPort: "22"
+
+ # Interval between queue polls.
+ PollInterval: 10s
+
+ # Shell command to execute on each worker to determine whether
+ # the worker is booted and ready to run containers. It should
+ # exit zero if the worker is ready.
+ BootProbeCommand: "docker ps"
+
+ # Minimum interval between consecutive probes to a single
+ # worker.
+ ProbeInterval: 10s
+
+ # Maximum probes per second, across all workers in a pool.
+ MaxProbesPerSecond: 10
+
+ # Time before repeating SIGTERM when killing a container.
+ TimeoutSignal: 5s
+
+ # Time to give up on SIGTERM and write off the worker.
+ TimeoutTERM: 2m
+
+ # Maximum create/destroy-instance operations per second (0 =
+ # unlimited).
+ MaxCloudOpsPerSecond: 0
+
+ # Interval between cloud provider syncs/updates ("list all
+ # instances").
+ SyncInterval: 1m
+
+ # Time to leave an idle worker running (in case new containers
+ # appear in the queue that it can run) before shutting it
+ # down.
+ TimeoutIdle: 1m
+
+ # Time to wait for a new worker to boot (i.e., pass
+ # BootProbeCommand) before giving up and shutting it down.
+ TimeoutBooting: 10m
+
+ # Maximum time a worker can stay alive with no successful
+ # probes before being automatically shut down.
+ TimeoutProbe: 10m
+
+ # Time after shutting down a worker to retry the
+ # shutdown/destroy operation.
+ TimeoutShutdown: 10s
+
+ # Worker VM image ID.
+ ImageID: ami-01234567890abcdef
+
+ # Tags to add on all resources (VMs, NICs, disks) created by
+ # the container dispatcher. (Arvados's own tags --
+ # InstanceType, IdleBehavior, and InstanceSecret -- will also
+ # be added.)
+ ResourceTags:
+ SAMPLE: "tag value"
+
+ # Prefix for predefined tags used by Arvados (InstanceSetID,
+ # InstanceType, InstanceSecret, IdleBehavior). With the
+ # default value "Arvados", tags are "ArvadosInstanceSetID",
+ # "ArvadosInstanceSecret", etc.
+ #
+ # This should only be changed while no cloud resources are in
+ # use and the cloud dispatcher is not running. Otherwise,
+ # VMs/resources that were added using the old tag prefix will
+ # need to be detected and cleaned up manually.
+ TagKeyPrefix: Arvados
+
+ # Cloud driver: "azure" (Microsoft Azure) or "ec2" (Amazon AWS).
+ Driver: ec2
+
+ # Cloud-specific driver parameters.
+ DriverParameters:
+
+ # (ec2) Credentials.
+ AccessKeyID: ""
+ SecretAccessKey: ""
+
+ # (ec2) Instance configuration.
+ SecurityGroupIDs:
+ - ""
+ SubnetID: ""
+ Region: ""
+ EBSVolumeType: gp2
+ AdminUsername: debian
+
+ # (azure) Credentials.
+ SubscriptionID: ""
+ ClientID: ""
+ ClientSecret: ""
+ TenantID: ""
+
+ # (azure) Instance configuration.
+ CloudEnvironment: AzurePublicCloud
+ ResourceGroup: ""
+ Location: centralus
+ Network: ""
+ Subnet: ""
+ StorageAccount: ""
+ BlobContainer: ""
+ DeleteDanglingResourcesAfter: 20s
+ AdminUsername: arvados
+
+ InstanceTypes:
+
+ # Use the instance type name as the key (in place of "SAMPLE" in
+ # this sample entry).
+ SAMPLE:
+ # Cloud provider's instance type. Defaults to the configured type name.
+ ProviderType: ""
+ VCPUs: 1
+ RAM: 128MiB
+ IncludedScratch: 16GB
+ AddedScratch: 0
+ Price: 0.1
+ Preemptible: false
+
Mail:
MailchimpAPIKey: ""
MailchimpListID: ""
EmailFrom: ""
RemoteClusters:
"*":
+ Host: ""
+ Proxy: false
+ Scheme: https
+ Insecure: false
+ ActivateUsers: false
+ SAMPLE:
+ Host: sample.arvadosapi.com
Proxy: false
+ Scheme: https
+ Insecure: false
ActivateUsers: false
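
For reference, a trimmed, hypothetical sketch of the Go types this Services section unmarshals into; the real definitions in sdk/go/arvados have more fields and more services. The map-keyed-by-URL shape is taken from how findRailsAPI iterates InternalURLs later in this change.

package main

import (
	"fmt"
	"net/url"
)

// Hypothetical stand-ins for the sdk/go/arvados types.
type URL url.URL

type ServiceInstance struct{}

type Service struct {
	InternalURLs map[URL]ServiceInstance
	ExternalURL  URL
}

type Services struct {
	Controller Service
	RailsAPI   Service
	// ... DispatchCloud, Keepstore, etc.
}

func main() {
	// Equivalent of:
	//   Services:
	//     Controller:
	//       InternalURLs:
	//         "http://localhost:9004": {}
	u, _ := url.Parse("http://localhost:9004")
	svcs := Services{Controller: Service{
		InternalURLs: map[URL]ServiceInstance{URL(*u): {}},
	}}
	for target := range svcs.Controller.InternalURLs {
		target := url.URL(target)
		fmt.Println(target.String()) // http://localhost:9004
	}
}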
type deprCluster struct {
RequestLimits deprRequestLimits
- NodeProfiles map[string]arvados.NodeProfile
+ NodeProfiles map[string]nodeProfile
}
type deprecatedConfig struct {
Clusters map[string]deprCluster
}
+type nodeProfile struct {
+ Controller systemServiceInstance `json:"arvados-controller"`
+ Health systemServiceInstance `json:"arvados-health"`
+ Keepbalance systemServiceInstance `json:"keep-balance"`
+ Keepproxy systemServiceInstance `json:"keepproxy"`
+ Keepstore systemServiceInstance `json:"keepstore"`
+ Keepweb systemServiceInstance `json:"keep-web"`
+ Nodemanager systemServiceInstance `json:"arvados-node-manager"`
+ DispatchCloud systemServiceInstance `json:"arvados-dispatch-cloud"`
+ RailsAPI systemServiceInstance `json:"arvados-api-server"`
+ Websocket systemServiceInstance `json:"arvados-ws"`
+ Workbench1 systemServiceInstance `json:"arvados-workbench"`
+}
+
+type systemServiceInstance struct {
+ Listen string
+ TLS bool
+ Insecure bool
+}
+
func applyDeprecatedConfig(cfg *arvados.Config, configdata []byte, log logger) error {
var dc deprecatedConfig
err := yaml.Unmarshal(configdata, &dc)
return nil
}
-func applyDeprecatedNodeProfile(hostname string, ssi arvados.SystemServiceInstance, svc *arvados.Service) {
+func applyDeprecatedNodeProfile(hostname string, ssi systemServiceInstance, svc *arvados.Service) {
scheme := "https"
if !ssi.TLS {
scheme = "http"
Services:
RailsAPI:
InternalURLs: {}
- GitHTTP:
- InternalURLs: {}
- ExternalURL: ""
- Keepstore:
- InternalURLs: {}
+ ExternalURL: "-"
Controller:
InternalURLs: {}
ExternalURL: ""
ExternalURL: ""
Keepbalance:
InternalURLs: {}
+ ExternalURL: "-"
GitHTTP:
InternalURLs: {}
ExternalURL: ""
ExternalURL: ""
DispatchCloud:
InternalURLs: {}
+ ExternalURL: "-"
SSO:
ExternalURL: ""
Keepproxy:
ExternalURL: ""
Keepstore:
InternalURLs: {}
+ ExternalURL: "-"
Composer:
ExternalURL: ""
WebShell:
ExternalURL: ""
Workbench2:
ExternalURL: ""
+ Nodemanager:
+ InternalURLs: {}
+ ExternalURL: "-"
+ Health:
+ InternalURLs: {}
+ ExternalURL: "-"
+
PostgreSQL:
# max concurrent connections per arvados server daemon
ConnectionPool: 32
# site secret. It should be at least 50 characters.
RailsSessionSecretToken: ""
+ # Maximum wall clock time to spend handling an incoming request.
+ RequestTimeout: 5m
+
Users:
# Config parameters to automatically set up new users. If enabled,
# these users will be able to self-activate. Enable this if you want
UnloggedAttributes: []
SystemLogs:
+
+ # Logging threshold: panic, fatal, error, warn, info, debug, or
+ # trace
+ LogLevel: info
+
+ # Logging format: json or text
+ Format: json
+
# Maximum characters of (JSON-encoded) query parameters to include
# in each request log entry. When params exceed this size, they will
# be JSON-encoded, truncated to this size, and logged as
Repositories: /var/lib/arvados/git/repositories
TLS:
+ Certificate: ""
+ Key: ""
Insecure: false
Containers:
# troubleshooting purposes.
LogReuseDecisions: false
+ # PEM encoded SSH key (RSA, DSA, or ECDSA) used by the
+ # (experimental) cloud dispatcher for executing containers on
+ # worker VMs. Begins with "-----BEGIN RSA PRIVATE KEY-----\n"
+ # and ends with "\n-----END RSA PRIVATE KEY-----\n".
+ DispatchPrivateKey: none
+
+ # Maximum time to wait for workers to come up before abandoning
+ # stale locks from a previous dispatch process.
+ StaleLockTimeout: 1m
+
Logging:
# When you run the db:delete_old_container_logs task, it will find
# containers that have been finished for at least this many seconds,
# original job reuse behavior, and is still the default).
ReuseJobIfOutputsDiffer: false
+ CloudVMs:
+ # Enable the cloud scheduler (experimental).
+ Enable: false
+
+ # Name/number of port where workers' SSH services listen.
+ SSHPort: "22"
+
+ # Interval between queue polls.
+ PollInterval: 10s
+
+ # Shell command to execute on each worker to determine whether
+ # the worker is booted and ready to run containers. It should
+ # exit zero if the worker is ready.
+ BootProbeCommand: "docker ps"
+
+ # Minimum interval between consecutive probes to a single
+ # worker.
+ ProbeInterval: 10s
+
+ # Maximum probes per second, across all workers in a pool.
+ MaxProbesPerSecond: 10
+
+ # Time before repeating SIGTERM when killing a container.
+ TimeoutSignal: 5s
+
+ # Time to give up on SIGTERM and write off the worker.
+ TimeoutTERM: 2m
+
+ # Maximum create/destroy-instance operations per second (0 =
+ # unlimited).
+ MaxCloudOpsPerSecond: 0
+
+ # Interval between cloud provider syncs/updates ("list all
+ # instances").
+ SyncInterval: 1m
+
+ # Time to leave an idle worker running (in case new containers
+ # appear in the queue that it can run) before shutting it
+ # down.
+ TimeoutIdle: 1m
+
+ # Time to wait for a new worker to boot (i.e., pass
+ # BootProbeCommand) before giving up and shutting it down.
+ TimeoutBooting: 10m
+
+ # Maximum time a worker can stay alive with no successful
+ # probes before being automatically shut down.
+ TimeoutProbe: 10m
+
+ # Time after shutting down a worker to retry the
+ # shutdown/destroy operation.
+ TimeoutShutdown: 10s
+
+ # Worker VM image ID.
+ ImageID: ami-01234567890abcdef
+
+ # Tags to add on all resources (VMs, NICs, disks) created by
+ # the container dispatcher. (Arvados's own tags --
+ # InstanceType, IdleBehavior, and InstanceSecret -- will also
+ # be added.)
+ ResourceTags:
+ SAMPLE: "tag value"
+
+ # Prefix for predefined tags used by Arvados (InstanceSetID,
+ # InstanceType, InstanceSecret, IdleBehavior). With the
+ # default value "Arvados", tags are "ArvadosInstanceSetID",
+ # "ArvadosInstanceSecret", etc.
+ #
+ # This should only be changed while no cloud resources are in
+ # use and the cloud dispatcher is not running. Otherwise,
+ # VMs/resources that were added using the old tag prefix will
+ # need to be detected and cleaned up manually.
+ TagKeyPrefix: Arvados
+
+ # Cloud driver: "azure" (Microsoft Azure) or "ec2" (Amazon AWS).
+ Driver: ec2
+
+ # Cloud-specific driver parameters.
+ DriverParameters:
+
+ # (ec2) Credentials.
+ AccessKeyID: ""
+ SecretAccessKey: ""
+
+ # (ec2) Instance configuration.
+ SecurityGroupIDs:
+ - ""
+ SubnetID: ""
+ Region: ""
+ EBSVolumeType: gp2
+ AdminUsername: debian
+
+ # (azure) Credentials.
+ SubscriptionID: ""
+ ClientID: ""
+ ClientSecret: ""
+ TenantID: ""
+
+ # (azure) Instance configuration.
+ CloudEnvironment: AzurePublicCloud
+ ResourceGroup: ""
+ Location: centralus
+ Network: ""
+ Subnet: ""
+ StorageAccount: ""
+ BlobContainer: ""
+ DeleteDanglingResourcesAfter: 20s
+ AdminUsername: arvados
+
+ InstanceTypes:
+
+ # Use the instance type name as the key (in place of "SAMPLE" in
+ # this sample entry).
+ SAMPLE:
+ # Cloud provider's instance type. Defaults to the configured type name.
+ ProviderType: ""
+ VCPUs: 1
+ RAM: 128MiB
+ IncludedScratch: 16GB
+ AddedScratch: 0
+ Price: 0.1
+ Preemptible: false
+
Mail:
MailchimpAPIKey: ""
MailchimpListID: ""
EmailFrom: ""
RemoteClusters:
"*":
+ Host: ""
+ Proxy: false
+ Scheme: https
+ Insecure: false
+ ActivateUsers: false
+ SAMPLE:
+ Host: sample.arvadosapi.com
Proxy: false
+ Scheme: https
+ Insecure: false
ActivateUsers: false
`)
return nil, fmt.Errorf("loading config data: %s", err)
}
logExtraKeys(log, merged, src, "")
+ removeSampleKeys(merged)
err = mergo.Merge(&merged, src, mergo.WithOverride)
if err != nil {
return nil, fmt.Errorf("merging config data: %s", err)
return nil
}
+func removeSampleKeys(m map[string]interface{}) {
+ delete(m, "SAMPLE")
+ for _, v := range m {
+ if v, _ := v.(map[string]interface{}); v != nil {
+ removeSampleKeys(v)
+ }
+ }
+}
+
func logExtraKeys(log logger, expected, supplied map[string]interface{}, prefix string) {
if log == nil {
return
}
+ allowed := map[string]interface{}{}
+ for k, v := range expected {
+ allowed[strings.ToLower(k)] = v
+ }
for k, vsupp := range supplied {
- if vexp, ok := expected[k]; !ok {
+ vexp, ok := allowed[strings.ToLower(k)]
+ if !ok && expected["SAMPLE"] != nil {
+ vexp = expected["SAMPLE"]
+ } else if !ok {
log.Warnf("deprecated or unknown config entry: %s%s", prefix, k)
- } else if vsupp, ok := vsupp.(map[string]interface{}); !ok {
+ continue
+ }
+ if vsupp, ok := vsupp.(map[string]interface{}); !ok {
// if vsupp is a map but vexp isn't map, this
// will be caught elsewhere; see TestBadType.
continue
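
A standalone illustration of what removeSampleKeys accomplishes before the merge (the function body is copied from above; the input map is made up):

package main

import "fmt"

// removeSampleKeys strips the "SAMPLE" placeholder entries
// recursively so they never survive the merge of defaults into the
// user-supplied config.
func removeSampleKeys(m map[string]interface{}) {
	delete(m, "SAMPLE")
	for _, v := range m {
		if v, _ := v.(map[string]interface{}); v != nil {
			removeSampleKeys(v)
		}
	}
}

func main() {
	merged := map[string]interface{}{
		"InstanceTypes": map[string]interface{}{
			"SAMPLE": map[string]interface{}{"VCPUs": 1},
		},
	}
	removeSampleKeys(merged)
	// The SAMPLE placeholder is gone, so it never shadows or merges
	// with real user-supplied keys such as "Foo".
	fmt.Println(merged) // map[InstanceTypes:map[]]
}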
"io"
"os"
"os/exec"
+ "strings"
"testing"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/ghodss/yaml"
+ "github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
)
c.Check(cc.API.MaxItemsPerResponse, check.Equals, 1000)
}
+func (s *LoadSuite) TestSampleKeys(c *check.C) {
+ for _, yaml := range []string{
+ `{"Clusters":{"z1111":{}}}`,
+ `{"Clusters":{"z1111":{"InstanceTypes":{"Foo":{"RAM": "12345M"}}}}}`,
+ } {
+ cfg, err := Load(bytes.NewBufferString(yaml), ctxlog.TestLogger(c))
+ c.Assert(err, check.IsNil)
+ cc, err := cfg.GetCluster("z1111")
+ _, hasSample := cc.InstanceTypes["SAMPLE"]
+ c.Check(hasSample, check.Equals, false)
+ if strings.Contains(yaml, "Foo") {
+ c.Check(cc.InstanceTypes["Foo"].RAM, check.Equals, arvados.ByteSize(12345000000))
+ c.Check(cc.InstanceTypes["Foo"].Price, check.Equals, 0.0)
+ }
+ }
+}
+
func (s *LoadSuite) TestMultipleClusters(c *check.C) {
cfg, err := Load(bytes.NewBufferString(`{"Clusters":{"z1111":{},"z2222":{}}}`), ctxlog.TestLogger(c))
c.Assert(err, check.IsNil)
c.Check(c2.ClusterID, check.Equals, "z2222")
}
+func (s *LoadSuite) TestDeprecatedOrUnknownWarning(c *check.C) {
+ var logbuf bytes.Buffer
+ logger := logrus.New()
+ logger.Out = &logbuf
+ _, err := Load(bytes.NewBufferString(`
+Clusters:
+ zzzzz:
+ postgresql: {}
+ BadKey: {}
+ Containers: {}
+ RemoteClusters:
+ z2222:
+ Host: z2222.arvadosapi.com
+ Proxy: true
+ BadKey: badValue
+`), logger)
+ c.Assert(err, check.IsNil)
+ logs := strings.Split(strings.TrimSuffix(logbuf.String(), "\n"), "\n")
+ for _, log := range logs {
+ c.Check(log, check.Matches, `.*deprecated or unknown config entry:.*BadKey.*`)
+ }
+ c.Check(logs, check.HasLen, 2)
+}
+
+func (s *LoadSuite) TestNoWarningsForDumpedConfig(c *check.C) {
+ var logbuf bytes.Buffer
+ logger := logrus.New()
+ logger.Out = &logbuf
+ cfg, err := Load(bytes.NewBufferString(`{"Clusters":{"zzzzz":{}}}`), logger)
+ c.Assert(err, check.IsNil)
+ yaml, err := yaml.Marshal(cfg)
+ c.Assert(err, check.IsNil)
+ cfgDumped, err := Load(bytes.NewBuffer(yaml), logger)
+ c.Assert(err, check.IsNil)
+ c.Check(cfg, check.DeepEquals, cfgDumped)
+ c.Check(logbuf.String(), check.Equals, "")
+}
+
func (s *LoadSuite) TestPostgreSQLKeyConflict(c *check.C) {
_, err := Load(bytes.NewBufferString(`
Clusters:
var Command cmd.Handler = service.Command(arvados.ServiceNameController, newHandler)
-func newHandler(_ context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, _ string) service.Handler {
- return &Handler{Cluster: cluster, NodeProfile: np}
+func newHandler(_ context.Context, cluster *arvados.Cluster, _ string) service.Handler {
+ return &Handler{Cluster: cluster}
}
s.remoteMock.Server.Handler = http.HandlerFunc(s.remoteMockHandler)
c.Assert(s.remoteMock.Start(), check.IsNil)
- nodeProfile := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: ":1"}, // local reqs will error "connection refused"
- }
- s.testHandler = &Handler{Cluster: &arvados.Cluster{
+ cluster := &arvados.Cluster{
ClusterID: "zhome",
PostgreSQL: integrationTestCluster().PostgreSQL,
- NodeProfiles: map[string]arvados.NodeProfile{
- "*": nodeProfile,
- },
+ TLS: arvados.TLS{Insecure: true},
API: arvados.API{
MaxItemsPerResponse: 1000,
MaxRequestAmplification: 4,
},
- }, NodeProfile: &nodeProfile}
+ }
+ arvadostest.SetServiceURL(&cluster.Services.RailsAPI, "http://localhost:1/")
+ arvadostest.SetServiceURL(&cluster.Services.Controller, "http://localhost:/")
+ s.testHandler = &Handler{Cluster: cluster}
s.testServer = newServerFromIntegrationTestEnv(c)
s.testServer.Server.Handler = httpserver.AddRequestIDs(httpserver.LogRequests(s.log, s.testHandler))
- s.testHandler.Cluster.RemoteClusters = map[string]arvados.RemoteCluster{
+ cluster.RemoteClusters = map[string]arvados.RemoteCluster{
"zzzzz": {
Host: s.remoteServer.Addr,
Proxy: true,
Handler: h,
},
}
-
c.Assert(srv.Start(), check.IsNil)
-
- np := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: srv.Addr,
- TLS: false, Insecure: true}}
- s.testHandler.Cluster.NodeProfiles["*"] = np
- s.testHandler.NodeProfile = &np
-
+ arvadostest.SetServiceURL(&s.testHandler.Cluster.Services.RailsAPI, "http://"+srv.Addr)
return srv
}
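
The tests above use arvadostest.SetServiceURL to point a service at a single internal URL. Its implementation is not shown in this change; a plausible sketch, assuming Service.InternalURLs is a map[arvados.URL]arvados.ServiceInstance and that arvados.URL converts to/from url.URL (as findRailsAPI later in this diff suggests), would be:

package arvadostest

import (
	"net/url"

	"git.curoverse.com/arvados.git/sdk/go/arvados"
)

// SetServiceURL (hypothetical sketch): reset a service's
// InternalURLs to contain only the single given URL.
func SetServiceURL(service *arvados.Service, internalURL string) {
	u, err := url.Parse(internalURL)
	if err != nil {
		panic(err)
	}
	service.InternalURLs = map[arvados.URL]arvados.ServiceInstance{
		arvados.URL(*u): {},
	}
}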
}
func (s *FederationSuite) TestGetLocalCollection(c *check.C) {
- np := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
- TLS: true, Insecure: true}}
s.testHandler.Cluster.ClusterID = "zzzzz"
- s.testHandler.Cluster.NodeProfiles["*"] = np
- s.testHandler.NodeProfile = &np
+ arvadostest.SetServiceURL(&s.testHandler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
// HTTP GET
}
func (s *FederationSuite) TestGetLocalCollectionByPDH(c *check.C) {
- np := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
- TLS: true, Insecure: true}}
- s.testHandler.Cluster.NodeProfiles["*"] = np
- s.testHandler.NodeProfile = &np
+ arvadostest.SetServiceURL(&s.testHandler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveToken)
}
func (s *FederationSuite) TestSaltedTokenGetCollectionByPDH(c *check.C) {
- np := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
- TLS: true, Insecure: true}}
- s.testHandler.Cluster.NodeProfiles["*"] = np
- s.testHandler.NodeProfile = &np
+ arvadostest.SetServiceURL(&s.testHandler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
req := httptest.NewRequest("GET", "/arvados/v1/collections/"+arvadostest.UserAgreementPDH, nil)
req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
}
func (s *FederationSuite) TestSaltedTokenGetCollectionByPDHError(c *check.C) {
- np := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
- TLS: true, Insecure: true}}
- s.testHandler.Cluster.NodeProfiles["*"] = np
- s.testHandler.NodeProfile = &np
+ arvadostest.SetServiceURL(&s.testHandler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
req := httptest.NewRequest("GET", "/arvados/v1/collections/99999999999999999999999999999999+99", nil)
req.Header.Set("Authorization", "Bearer v2/zzzzz-gj3su-077z32aux8dg2s1/282d7d172b6cfdce364c5ed12ddf7417b2d00065")
req.Header.Set("Authorization", "Bearer "+arvadostest.ActiveTokenV2)
req.Header.Set("Content-type", "application/json")
- np := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"),
- TLS: true, Insecure: true}}
+ arvadostest.SetServiceURL(&s.testHandler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
s.testHandler.Cluster.ClusterID = "zzzzz"
- s.testHandler.Cluster.NodeProfiles["*"] = np
- s.testHandler.NodeProfile = &np
resp := s.testRequest(req)
c.Check(resp.StatusCode, check.Equals, http.StatusOK)
"context"
"database/sql"
"errors"
- "net"
+ "fmt"
"net/http"
"net/url"
"strings"
)
type Handler struct {
- Cluster *arvados.Cluster
- NodeProfile *arvados.NodeProfile
+ Cluster *arvados.Cluster
setupOnce sync.Once
handlerStack http.Handler
req.URL.Path = strings.Replace(req.URL.Path, "//", "/", -1)
}
}
- if h.Cluster.HTTPRequestTimeout > 0 {
- ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.HTTPRequestTimeout)))
+ if h.Cluster.API.RequestTimeout > 0 {
+ ctx, cancel := context.WithDeadline(req.Context(), time.Now().Add(time.Duration(h.Cluster.API.RequestTimeout)))
req = req.WithContext(ctx)
defer cancel()
}
func (h *Handler) CheckHealth() error {
h.setupOnce.Do(h.setup)
- _, _, err := findRailsAPI(h.Cluster, h.NodeProfile)
+ _, _, err := findRailsAPI(h.Cluster)
return err
}
}
func (h *Handler) localClusterRequest(req *http.Request) (*http.Response, error) {
- urlOut, insecure, err := findRailsAPI(h.Cluster, h.NodeProfile)
+ urlOut, insecure, err := findRailsAPI(h.Cluster)
if err != nil {
return nil, err
}
}
}
-// For now, findRailsAPI always uses the rails API running on this
-// node.
-func findRailsAPI(cluster *arvados.Cluster, np *arvados.NodeProfile) (*url.URL, bool, error) {
- hostport := np.RailsAPI.Listen
- if len(hostport) > 1 && hostport[0] == ':' && strings.TrimRight(hostport[1:], "0123456789") == "" {
- // ":12345" => connect to indicated port on localhost
- hostport = "localhost" + hostport
- } else if _, _, err := net.SplitHostPort(hostport); err == nil {
- // "[::1]:12345" => connect to indicated address & port
- } else {
- return nil, false, err
+// Use a localhost entry from Services.RailsAPI.InternalURLs if one is
+// present, otherwise choose an arbitrary entry.
+func findRailsAPI(cluster *arvados.Cluster) (*url.URL, bool, error) {
+ var best *url.URL
+ for target := range cluster.Services.RailsAPI.InternalURLs {
+ target := url.URL(target)
+ best = &target
+ if strings.HasPrefix(target.Host, "localhost:") || strings.HasPrefix(target.Host, "127.0.0.1:") || strings.HasPrefix(target.Host, "[::1]:") {
+ break
+ }
}
- proto := "http"
- if np.RailsAPI.TLS {
- proto = "https"
+ if best == nil {
+ return nil, false, fmt.Errorf("Services.RailsAPI.InternalURLs is empty")
}
- url, err := url.Parse(proto + "://" + hostport)
- return url, np.RailsAPI.Insecure, err
+ return best, cluster.TLS.Insecure, nil
}
s.cluster = &arvados.Cluster{
ClusterID: "zzzzz",
PostgreSQL: integrationTestCluster().PostgreSQL,
- NodeProfiles: map[string]arvados.NodeProfile{
- "*": {
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true, Insecure: true},
- },
- },
+ TLS: arvados.TLS{Insecure: true},
}
- node := s.cluster.NodeProfiles["*"]
- s.handler = newHandler(s.ctx, s.cluster, &node, "")
+ arvadostest.SetServiceURL(&s.cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
+ arvadostest.SetServiceURL(&s.cluster.Services.Controller, "http://localhost:/")
+ s.handler = newHandler(s.ctx, s.cluster, "")
}
func (s *HandlerSuite) TearDownTest(c *check.C) {
}
func (s *HandlerSuite) TestRequestTimeout(c *check.C) {
- s.cluster.HTTPRequestTimeout = arvados.Duration(time.Nanosecond)
+ s.cluster.API.RequestTimeout = arvados.Duration(time.Nanosecond)
req := httptest.NewRequest("GET", "/discovery/v1/apis/arvados/v1/rest", nil)
resp := httptest.NewRecorder()
s.handler.ServeHTTP(resp, req)
"path/filepath"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/ctxlog"
"git.curoverse.com/arvados.git/sdk/go/httpserver"
check "gopkg.in/check.v1"
func newServerFromIntegrationTestEnv(c *check.C) *httpserver.Server {
log := ctxlog.TestLogger(c)
- nodeProfile := arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: ":"},
- RailsAPI: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_TEST_API_HOST"), TLS: true, Insecure: true},
- }
handler := &Handler{Cluster: &arvados.Cluster{
ClusterID: "zzzzz",
PostgreSQL: integrationTestCluster().PostgreSQL,
- NodeProfiles: map[string]arvados.NodeProfile{
- "*": nodeProfile,
- },
- }, NodeProfile: &nodeProfile}
+ TLS: arvados.TLS{Insecure: true},
+ }}
+ arvadostest.SetServiceURL(&handler.Cluster.Services.RailsAPI, "https://"+os.Getenv("ARVADOS_TEST_API_HOST"))
+ arvadostest.SetServiceURL(&handler.Cluster.Services.Controller, "http://localhost:/")
srv := &httpserver.Server{
Server: http.Server{
Handler: httpserver.AddRequestIDs(httpserver.LogRequests(log, handler)),
},
- Addr: nodeProfile.Controller.Listen,
+ Addr: ":",
}
return srv
}
var Command cmd.Handler = service.Command(arvados.ServiceNameDispatchCloud, newHandler)
-func newHandler(ctx context.Context, cluster *arvados.Cluster, np *arvados.NodeProfile, token string) service.Handler {
+func newHandler(ctx context.Context, cluster *arvados.Cluster, token string) service.Handler {
ac, err := arvados.NewClientFromConfig(cluster)
if err != nil {
- return service.ErrorHandler(ctx, cluster, np, fmt.Errorf("error initializing client from cluster config: %s", err))
+ return service.ErrorHandler(ctx, cluster, fmt.Errorf("error initializing client from cluster config: %s", err))
}
d := &dispatcher{
Cluster: cluster,
// Make a worker.Executor for the given instance.
func (disp *dispatcher) newExecutor(inst cloud.Instance) worker.Executor {
exr := ssh_executor.New(inst)
- exr.SetTargetPort(disp.Cluster.CloudVMs.SSHPort)
+ exr.SetTargetPort(disp.Cluster.Containers.CloudVMs.SSHPort)
exr.SetSigners(disp.sshKey)
return exr
}
disp.stop = make(chan struct{}, 1)
disp.stopped = make(chan struct{})
- if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
- disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
+ if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Containers.DispatchPrivateKey)); err != nil {
+ disp.logger.Fatalf("error parsing configured Containers.DispatchPrivateKey: %s", err)
} else {
disp.sshKey = key
}
}
disp.instanceSet = instanceSet
disp.reg = prometheus.NewRegistry()
- disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
+ disp.pool = worker.NewPool(disp.logger, disp.ArvClient, disp.reg, disp.InstanceSetID, disp.instanceSet, disp.newExecutor, disp.sshKey.PublicKey(), disp.Cluster)
disp.queue = container.NewQueue(disp.logger, disp.reg, disp.typeChooser, disp.ArvClient)
if disp.Cluster.ManagementToken == "" {
defer disp.instanceSet.Stop()
defer disp.pool.Stop()
- staleLockTimeout := time.Duration(disp.Cluster.Dispatch.StaleLockTimeout)
+ staleLockTimeout := time.Duration(disp.Cluster.Containers.StaleLockTimeout)
if staleLockTimeout == 0 {
staleLockTimeout = defaultStaleLockTimeout
}
- pollInterval := time.Duration(disp.Cluster.Dispatch.PollInterval)
+ pollInterval := time.Duration(disp.Cluster.Containers.CloudVMs.PollInterval)
if pollInterval <= 0 {
pollInterval = defaultPollInterval
}
}
s.cluster = &arvados.Cluster{
- CloudVMs: arvados.CloudVMs{
- Driver: "test",
- SyncInterval: arvados.Duration(10 * time.Millisecond),
- TimeoutIdle: arvados.Duration(150 * time.Millisecond),
- TimeoutBooting: arvados.Duration(150 * time.Millisecond),
- TimeoutProbe: arvados.Duration(15 * time.Millisecond),
- TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
- MaxCloudOpsPerSecond: 500,
- },
- Dispatch: arvados.Dispatch{
- PrivateKey: string(dispatchprivraw),
- PollInterval: arvados.Duration(5 * time.Millisecond),
- ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ Containers: arvados.ContainersConfig{
+ DispatchPrivateKey: string(dispatchprivraw),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
- MaxProbesPerSecond: 1000,
- TimeoutSignal: arvados.Duration(3 * time.Millisecond),
- TimeoutTERM: arvados.Duration(20 * time.Millisecond),
+ CloudVMs: arvados.CloudVMsConfig{
+ Driver: "test",
+ SyncInterval: arvados.Duration(10 * time.Millisecond),
+ TimeoutIdle: arvados.Duration(150 * time.Millisecond),
+ TimeoutBooting: arvados.Duration(150 * time.Millisecond),
+ TimeoutProbe: arvados.Duration(15 * time.Millisecond),
+ TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
+ MaxCloudOpsPerSecond: 500,
+ PollInterval: arvados.Duration(5 * time.Millisecond),
+ ProbeInterval: arvados.Duration(5 * time.Millisecond),
+ MaxProbesPerSecond: 1000,
+ TimeoutSignal: arvados.Duration(3 * time.Millisecond),
+ TimeoutTERM: arvados.Duration(20 * time.Millisecond),
+ ResourceTags: map[string]string{"testtag": "test value"},
+ TagKeyPrefix: "test:",
+ },
},
InstanceTypes: arvados.InstanceTypeMap{
test.InstanceType(1).Name: test.InstanceType(1),
test.InstanceType(8).Name: test.InstanceType(8),
test.InstanceType(16).Name: test.InstanceType(16),
},
- NodeProfiles: map[string]arvados.NodeProfile{
- "*": {
- Controller: arvados.SystemServiceInstance{Listen: os.Getenv("ARVADOS_API_HOST")},
- DispatchCloud: arvados.SystemServiceInstance{Listen: ":"},
- },
- },
- Services: arvados.Services{
- Controller: arvados.Service{ExternalURL: arvados.URL{Scheme: "https", Host: os.Getenv("ARVADOS_API_HOST")}},
- },
}
+ arvadostest.SetServiceURL(&s.cluster.Services.DispatchCloud, "http://localhost:/")
+ arvadostest.SetServiceURL(&s.cluster.Services.Controller, "https://"+os.Getenv("ARVADOS_API_HOST")+"/")
arvClient, err := arvados.NewClientFromConfig(s.cluster)
c.Check(err, check.IsNil)
func (s *DispatcherSuite) TestInstancesAPI(c *check.C) {
s.cluster.ManagementToken = "abcdefgh"
- s.cluster.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
+ s.cluster.Containers.CloudVMs.TimeoutBooting = arvados.Duration(time.Second)
drivers["test"] = s.stubDriver
s.disp.setupOnce.Do(s.disp.initialize)
s.disp.queue = &test.Queue{}
}
func newInstanceSet(cluster *arvados.Cluster, setID cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
- driver, ok := drivers[cluster.CloudVMs.Driver]
+ driver, ok := drivers[cluster.Containers.CloudVMs.Driver]
if !ok {
- return nil, fmt.Errorf("unsupported cloud driver %q", cluster.CloudVMs.Driver)
+ return nil, fmt.Errorf("unsupported cloud driver %q", cluster.Containers.CloudVMs.Driver)
}
- is, err := driver.InstanceSet(cluster.CloudVMs.DriverParameters, setID, logger)
- if maxops := cluster.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
- is = &rateLimitedInstanceSet{
+ sharedResourceTags := cloud.SharedResourceTags(cluster.Containers.CloudVMs.ResourceTags)
+ is, err := driver.InstanceSet(cluster.Containers.CloudVMs.DriverParameters, setID, sharedResourceTags, logger)
+ if maxops := cluster.Containers.CloudVMs.MaxCloudOpsPerSecond; maxops > 0 {
+ is = rateLimitedInstanceSet{
InstanceSet: is,
ticker: time.NewTicker(time.Second / time.Duration(maxops)),
}
}
+ is = defaultTaggingInstanceSet{
+ InstanceSet: is,
+ defaultTags: cloud.InstanceTags(cluster.Containers.CloudVMs.ResourceTags),
+ }
+ is = filteringInstanceSet{
+ InstanceSet: is,
+ logger: logger,
+ }
return is, err
}
<-inst.ticker.C
return inst.Instance.Destroy()
}
+
+// defaultTaggingInstanceSet wraps an InstanceSet, adding the specified defaultTags to every Create() call.
+type defaultTaggingInstanceSet struct {
+ cloud.InstanceSet
+ defaultTags cloud.InstanceTags
+}
+
+func (is defaultTaggingInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, init cloud.InitCommand, pk ssh.PublicKey) (cloud.Instance, error) {
+ allTags := cloud.InstanceTags{}
+ for k, v := range is.defaultTags {
+ allTags[k] = v
+ }
+ for k, v := range tags {
+ allTags[k] = v
+ }
+ return is.InstanceSet.Create(it, image, allTags, init, pk)
+}
+
+// filteringInstanceSet wraps an InstanceSet, filtering the instances
+// returned by its Instances() method (in case the wrapped InstanceSet
+// didn't do this itself).
+type filteringInstanceSet struct {
+ cloud.InstanceSet
+ logger logrus.FieldLogger
+}
+
+func (is filteringInstanceSet) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
+ instances, err := is.InstanceSet.Instances(tags)
+
+ skipped := 0
+ var returning []cloud.Instance
+nextInstance:
+ for _, inst := range instances {
+ instTags := inst.Tags()
+ for k, v := range tags {
+ if instTags[k] != v {
+ skipped++
+ continue nextInstance
+ }
+ }
+ returning = append(returning, inst)
+ }
+ is.logger.WithFields(logrus.Fields{
+ "returning": len(returning),
+ "skipped": skipped,
+ }).WithError(err).Debugf("filteringInstanceSet returning instances")
+ return returning, err
+}
}
// InstanceSet returns a new *StubInstanceSet.
-func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
+func (sd *StubDriver) InstanceSet(params json.RawMessage, id cloud.InstanceSetID, _ cloud.SharedResourceTags, logger logrus.FieldLogger) (cloud.InstanceSet, error) {
if sd.holdCloudOps == nil {
sd.holdCloudOps = make(chan bool)
}
tagKeyInstanceType = "InstanceType"
tagKeyIdleBehavior = "IdleBehavior"
tagKeyInstanceSecret = "InstanceSecret"
+ tagKeyInstanceSetID = "InstanceSetID"
)
// An InstanceView shows a worker's current state and recent activity.
//
// New instances are configured and set up according to the given
// cluster configuration.
-func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
+func NewPool(logger logrus.FieldLogger, arvClient *arvados.Client, reg *prometheus.Registry, instanceSetID cloud.InstanceSetID, instanceSet cloud.InstanceSet, newExecutor func(cloud.Instance) Executor, installPublicKey ssh.PublicKey, cluster *arvados.Cluster) *Pool {
wp := &Pool{
logger: logger,
arvClient: arvClient,
+ instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
- bootProbeCommand: cluster.CloudVMs.BootProbeCommand,
- imageID: cloud.ImageID(cluster.CloudVMs.ImageID),
+ bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
+ imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
instanceTypes: cluster.InstanceTypes,
- maxProbesPerSecond: cluster.Dispatch.MaxProbesPerSecond,
- probeInterval: duration(cluster.Dispatch.ProbeInterval, defaultProbeInterval),
- syncInterval: duration(cluster.CloudVMs.SyncInterval, defaultSyncInterval),
- timeoutIdle: duration(cluster.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
- timeoutBooting: duration(cluster.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
- timeoutProbe: duration(cluster.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
- timeoutShutdown: duration(cluster.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
- timeoutTERM: duration(cluster.Dispatch.TimeoutTERM, defaultTimeoutTERM),
- timeoutSignal: duration(cluster.Dispatch.TimeoutSignal, defaultTimeoutSignal),
+ maxProbesPerSecond: cluster.Containers.CloudVMs.MaxProbesPerSecond,
+ probeInterval: duration(cluster.Containers.CloudVMs.ProbeInterval, defaultProbeInterval),
+ syncInterval: duration(cluster.Containers.CloudVMs.SyncInterval, defaultSyncInterval),
+ timeoutIdle: duration(cluster.Containers.CloudVMs.TimeoutIdle, defaultTimeoutIdle),
+ timeoutBooting: duration(cluster.Containers.CloudVMs.TimeoutBooting, defaultTimeoutBooting),
+ timeoutProbe: duration(cluster.Containers.CloudVMs.TimeoutProbe, defaultTimeoutProbe),
+ timeoutShutdown: duration(cluster.Containers.CloudVMs.TimeoutShutdown, defaultTimeoutShutdown),
+ timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
+ timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
installPublicKey: installPublicKey,
+ tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
stop: make(chan bool),
}
wp.registerMetrics(reg)
// configuration
logger logrus.FieldLogger
arvClient *arvados.Client
+ instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
bootProbeCommand string
timeoutTERM time.Duration
timeoutSignal time.Duration
installPublicKey ssh.PublicKey
+ tagKeyPrefix string
// private state
subscribers map[<-chan struct{}]chan<- struct{}
go func() {
defer wp.notify()
tags := cloud.InstanceTags{
- tagKeyInstanceType: it.Name,
- tagKeyIdleBehavior: string(IdleBehaviorRun),
- tagKeyInstanceSecret: secret,
+ wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID),
+ wp.tagKeyPrefix + tagKeyInstanceType: it.Name,
+ wp.tagKeyPrefix + tagKeyIdleBehavior: string(IdleBehaviorRun),
+ wp.tagKeyPrefix + tagKeyInstanceSecret: secret,
}
initCmd := cloud.InitCommand(fmt.Sprintf("umask 0177 && echo -n %q >%s", secret, instanceSecretFilename))
inst, err := wp.instanceSet.Create(it, wp.imageID, tags, initCmd, wp.installPublicKey)
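With a hypothetical TagKeyPrefix of "arv:", each instance created by the pool ends up tagged along these lines (values illustrative):

arv:InstanceSetID: zzzzz-dispatch-cloud
arv:InstanceType: m4.large
arv:IdleBehavior: run
arv:InstanceSecret: (random secret written to the instance at boot)

The prefix keeps the dispatcher's bookkeeping tags from colliding with site-assigned ResourceTags.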
//
// Caller must have lock.
func (wp *Pool) updateWorker(inst cloud.Instance, it arvados.InstanceType) (*worker, bool) {
- inst = tagVerifier{inst}
+ secret := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceSecret]
+ inst = tagVerifier{inst, secret}
id := inst.ID()
if wkr := wp.workers[id]; wkr != nil {
wkr.executor.SetTarget(inst)
}
state := StateUnknown
- if _, ok := wp.creating[inst.Tags()[tagKeyInstanceSecret]]; ok {
+ if _, ok := wp.creating[secret]; ok {
state = StateBooting
}
// process); otherwise, default to "run". After this,
// wkr.idleBehavior is the source of truth, and will only be
// changed via SetIdleBehavior().
- idleBehavior := IdleBehavior(inst.Tags()[tagKeyIdleBehavior])
+ idleBehavior := IdleBehavior(inst.Tags()[wp.tagKeyPrefix+tagKeyIdleBehavior])
if !validIdleBehavior[idleBehavior] {
idleBehavior = IdleBehaviorRun
}
}
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{wp.tagKeyPrefix + tagKeyInstanceSetID: string(wp.instanceSetID)})
if err != nil {
wp.instanceSet.throttleInstances.CheckRateLimitError(err, wp.logger, "list instances", wp.notify)
return err
notify := false
for _, inst := range instances {
- itTag := inst.Tags()[tagKeyInstanceType]
+ itTag := inst.Tags()[wp.tagKeyPrefix+tagKeyInstanceType]
it, ok := wp.instanceTypes[itTag]
if !ok {
wp.logger.WithField("Instance", inst).Errorf("unknown InstanceType tag %q --- ignoring", itTag)
logger := ctxlog.TestLogger(c)
driver := &test.StubDriver{}
- is, err := driver.InstanceSet(nil, "", logger)
+ instanceSetID := cloud.InstanceSetID("test-instance-set-id")
+ is, err := driver.InstanceSet(nil, instanceSetID, nil, logger)
c.Assert(err, check.IsNil)
newExecutor := func(cloud.Instance) Executor {
}
cluster := &arvados.Cluster{
- Dispatch: arvados.Dispatch{
- MaxProbesPerSecond: 1000,
- ProbeInterval: arvados.Duration(time.Millisecond * 10),
- },
- CloudVMs: arvados.CloudVMs{
- BootProbeCommand: "true",
- SyncInterval: arvados.Duration(time.Millisecond * 10),
+ Containers: arvados.ContainersConfig{
+ CloudVMs: arvados.CloudVMsConfig{
+ BootProbeCommand: "true",
+ MaxProbesPerSecond: 1000,
+ ProbeInterval: arvados.Duration(time.Millisecond * 10),
+ SyncInterval: arvados.Duration(time.Millisecond * 10),
+ TagKeyPrefix: "testprefix:",
+ },
},
InstanceTypes: arvados.InstanceTypeMap{
type1.Name: type1,
},
}
- pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
+ pool := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
notify := pool.Subscribe()
defer pool.Unsubscribe(notify)
pool.Create(type1)
}
}
// Wait for the tags to save to the cloud provider
+ tagKey := cluster.Containers.CloudVMs.TagKeyPrefix + tagKeyIdleBehavior
deadline := time.Now().Add(time.Second)
for !func() bool {
pool.mtx.RLock()
defer pool.mtx.RUnlock()
for _, wkr := range pool.workers {
if wkr.instType == type2 {
- return wkr.instance.Tags()[tagKeyIdleBehavior] == string(IdleBehaviorHold)
+ return wkr.instance.Tags()[tagKey] == string(IdleBehaviorHold)
}
}
return false
c.Log("------- starting new pool, waiting to recover state")
- pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), is, newExecutor, nil, cluster)
+ pool2 := NewPool(logger, arvados.NewClientFromEnv(), prometheus.NewRegistry(), instanceSetID, is, newExecutor, nil, cluster)
notify2 := pool2.Subscribe()
defer pool2.Unsubscribe(notify2)
waitForIdle(pool2, notify2)
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
logger := ctxlog.TestLogger(c)
driver := test.StubDriver{HoldCloudOps: true}
- instanceSet, err := driver.InstanceSet(nil, "", logger)
+ instanceSet, err := driver.InstanceSet(nil, "test-instance-set-id", nil, logger)
c.Assert(err, check.IsNil)
type1 := arvados.InstanceType{Name: "a1s", ProviderType: "a1.small", VCPUs: 1, RAM: 1 * GiB, Price: .01}
type tagVerifier struct {
cloud.Instance
+ secret string
}
func (tv tagVerifier) VerifyHostKey(pubKey ssh.PublicKey, client *ssh.Client) error {
- expectSecret := tv.Instance.Tags()[tagKeyInstanceSecret]
- if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || expectSecret == "" {
+ if err := tv.Instance.VerifyHostKey(pubKey, client); err != cloud.ErrNotImplemented || tv.secret == "" {
// If the wrapped instance indicates it has a way to
// verify the key, return that decision.
return err
if err != nil {
return err
}
- if stdout.String() != expectSecret {
+ if stdout.String() != tv.secret {
return errBadInstanceSecret
}
return nil
instance := wkr.instance
tags := instance.Tags()
update := cloud.InstanceTags{
- tagKeyInstanceType: wkr.instType.Name,
- tagKeyIdleBehavior: string(wkr.idleBehavior),
+ wkr.wp.tagKeyPrefix + tagKeyInstanceType: wkr.instType.Name,
+ wkr.wp.tagKeyPrefix + tagKeyIdleBehavior: string(wkr.idleBehavior),
}
save := false
for k, v := range update {
bootTimeout := time.Minute
probeTimeout := time.Second
- is, err := (&test.StubDriver{}).InstanceSet(nil, "", logger)
+ is, err := (&test.StubDriver{}).InstanceSet(nil, "test-instance-set-id", nil, logger)
c.Assert(err, check.IsNil)
inst, err := is.Create(arvados.InstanceType{}, "", nil, "echo InitCommand", nil)
c.Assert(err, check.IsNil)
"fmt"
"io"
"io/ioutil"
+ "net"
"net/http"
"net/url"
"os"
+ "strings"
"git.curoverse.com/arvados.git/lib/cmd"
"git.curoverse.com/arvados.git/lib/config"
CheckHealth() error
}
-type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler
+type NewHandlerFunc func(_ context.Context, _ *arvados.Cluster, token string) Handler
type command struct {
newHandler NewHandlerFunc
flags := flag.NewFlagSet("", flag.ContinueOnError)
flags.SetOutput(stderr)
configFile := flags.String("config", arvados.DefaultConfigFile, "Site configuration `file`")
- nodeProfile := flags.String("node-profile", "", "`Name` of NodeProfiles config entry to use (if blank, use $ARVADOS_NODE_PROFILE or hostname reported by OS)")
err = flags.Parse(args)
if err == flag.ErrHelp {
err = nil
if err != nil {
return 1
}
- log = ctxlog.New(stderr, cluster.Logging.Format, cluster.Logging.Level).WithFields(logrus.Fields{
+ log = ctxlog.New(stderr, cluster.SystemLogs.Format, cluster.SystemLogs.LogLevel).WithFields(logrus.Fields{
"PID": os.Getpid(),
})
ctx := ctxlog.Context(c.ctx, log)
- profileName := *nodeProfile
- if profileName == "" {
- profileName = os.Getenv("ARVADOS_NODE_PROFILE")
- }
- profile, err := cluster.GetNodeProfile(profileName)
+ listen, err := getListenAddr(cluster.Services, c.svcName)
if err != nil {
return 1
}
- listen := profile.ServicePorts()[c.svcName]
- if listen == "" {
- err = fmt.Errorf("configuration does not enable the %s service on this host", c.svcName)
- return 1
- }
if cluster.SystemRootToken == "" {
log.Warn("SystemRootToken missing from cluster config, falling back to ARVADOS_API_TOKEN environment variable")
}
}
- handler := c.newHandler(ctx, cluster, profile, cluster.SystemRootToken)
+ handler := c.newHandler(ctx, cluster, cluster.SystemRootToken)
if err = handler.CheckHealth(); err != nil {
return 1
}
}
const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00"
+
+func getListenAddr(svcs arvados.Services, prog arvados.ServiceName) (string, error) {
+ svc, ok := svcs.Map()[prog]
+ if !ok {
+ return "", fmt.Errorf("unknown service name %q", prog)
+ }
+ for url := range svc.InternalURLs {
+ if strings.HasPrefix(url.Host, "localhost:") {
+ return url.Host, nil
+ }
+ listener, err := net.Listen("tcp", url.Host)
+ if err == nil {
+ listener.Close()
+ return url.Host, nil
+ }
+ }
+ return "", fmt.Errorf("configuration does not enable the %s service on this host", prog)
+}
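As a sketch, getListenAddr resolves the listen address from a Services entry like the following (host and port illustrative); any InternalURLs key whose host is on localhost, or that this host can successfully bind, is accepted:

Clusters:
  zzzzz:
    Services:
      Controller:
        InternalURLs:
          "http://localhost:8003": {}
        ExternalURL: "https://zzzzz.example.com"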
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, token string) Handler {
+ cmd := Command(arvados.ServiceNameController, func(ctx context.Context, _ *arvados.Cluster, token string) Handler {
c.Check(ctx.Value("foo"), check.Equals, "bar")
c.Check(token, check.Equals, "abcde")
return &testHandler{ctx: ctx, healthCheck: healthCheck}
// responds 500 to all requests. ErrorHandler itself logs the given
// error once, and the handler logs it again for each incoming
// request.
-func ErrorHandler(ctx context.Context, _ *arvados.Cluster, _ *arvados.NodeProfile, err error) Handler {
+func ErrorHandler(ctx context.Context, _ *arvados.Cluster, err error) Handler {
logger := ctxlog.FromContext(ctx)
logger.WithError(err).Error("unhealthy service")
return errorHandler{err, logger}
Log (undef, "collate");
my ($child_out, $child_in);
- my $pid = open2($child_out, $child_in, 'python', '-c', q{
+ # This depends on the python-arvados-python-client package, which needs to be installed
+ # on the machine running crunch-dispatch (typically, the API server).
+ my $pid = open2($child_out, $child_in, '/usr/share/python2.7/dist/python-arvados-python-client/bin/python', '-c', q{
import arvados
import sys
print (arvados.api("v1").collections().
end
def test_output_collection_owner_uuid
+ skip "Depends on a post 1.3 python-arvados-python-client package being installed"
+
j = jobspec :grep_local
out, err = capture_subprocess_io do
tryjobrecord j, binstubs: ['arv-mount', 'output_coll_owner']
def add_arv_hints():
cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
- res = pkg_resources.resource_stream(__name__, 'arv-cwl-schema.yml')
- use_custom_schema("v1.0", "http://arvados.org/cwl", res.read())
- res.close()
+ res10 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.0.yml')
+ res11 = pkg_resources.resource_stream(__name__, 'arv-cwl-schema-v1.1.yml')
+ customschema10 = res10.read()
+ customschema11 = res11.read()
+ use_custom_schema("v1.0", "http://arvados.org/cwl", customschema10)
+ use_custom_schema("v1.1.0-dev1", "http://arvados.org/cwl", customschema11)
+ use_custom_schema("v1.1", "http://arvados.org/cwl", customschema11)
+ res10.close()
+ res11.close()
cwltool.process.supportedProcessRequirements.extend([
"http://arvados.org/cwl#RunInSingleContainer",
"http://arvados.org/cwl#OutputDirType",
else:
arvados.log_handler.setFormatter(logging.Formatter('%(name)s %(levelname)s: %(message)s'))
+ if stdout is sys.stdout:
+ # cwltool.main has code to work around encoding issues with
+ # sys.stdout and unix pipes (they default to ASCII encoding,
+ # but we want utf-8), so when stdout is sys.stdout we set it to None
+ # to take advantage of that. Don't override it for all cases
+ # since we still want to be able to capture stdout for the
+ # unit tests.
+ stdout = None
+
return cwltool.main.main(args=arvargs,
stdout=stdout,
stderr=stderr,
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: Apache-2.0
+
+$base: "http://arvados.org/cwl#"
+$namespaces:
+ cwl: "https://w3id.org/cwl/cwl#"
+ cwltool: "http://commonwl.org/cwltool#"
+$graph:
+- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
+
+- name: cwltool:Secrets
+ type: record
+ inVocab: false
+ extends: cwl:ProcessRequirement
+ fields:
+ class:
+ type: string
+ doc: "Always 'Secrets'"
+ jsonldPredicate:
+ "_id": "@type"
+ "_type": "@vocab"
+ secrets:
+ type: string[]
+ doc: |
+ List one or more input parameters that are sensitive (such as passwords)
+ which will be deliberately obscured from logging.
+ jsonldPredicate:
+ "_type": "@id"
+ refScope: 0
+
+- name: RunInSingleContainer
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Indicates that a subworkflow should run in a single container
+ and not be scheduled as separate steps.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:RunInSingleContainer'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+
+- name: OutputDirType
+ type: enum
+ symbols:
+ - local_output_dir
+ - keep_output_dir
+ doc:
+ - |
+ local_output_dir: Use regular file system local to the compute node.
+ There must be sufficient local scratch space to store the entire output;
+ specify this with `outdirMin` of `ResourceRequirement`. Files are
+ batch uploaded to Keep when the process completes. Most compatible, but
+ the upload step can be time-consuming for very large files.
+ - |
+ keep_output_dir: Use writable Keep mount. Files are streamed to Keep as
+ they are written. Does not consume local scratch space, but does consume
+ RAM for output buffers (up to 192 MiB per file simultaneously open for
+ writing). Best suited to processes which produce sequential output of
+ large files (non-sequential writes may produce fragmented file
+ manifests). Supports regular files and directories, but does not support
+ special files such as symlinks, hard links, named pipes, named sockets,
+ or device nodes.
+
+
+- name: RuntimeConstraints
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Set Arvados-specific runtime hints.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:RuntimeConstraints'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ - name: keep_cache
+ type: int?
+ doc: |
+ Size of file data buffer for Keep mount in MiB. Default is 256
+ MiB. Increase this to reduce cache thrashing in situations such as
+ accessing multiple large (64+ MiB) files at the same time, or
+ performing random access on a large file.
+ - name: outputDirType
+ type: OutputDirType?
+ doc: |
+ Preferred backing store for output staging. If not specified, the
+ system may choose which one to use.
+
+- name: PartitionRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Select preferred compute partitions on which to run jobs.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:PartitionRequirement'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ - name: partition
+ type:
+ - string
+ - string[]
+
+- name: APIRequirement
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Indicates that the process wants access to the Arvados API. It will be granted
+ limited network access and have ARVADOS_API_HOST and ARVADOS_API_TOKEN set
+ in the environment.
+ fields:
+ - name: class
+ type: string
+ doc: "Always 'arv:APIRequirement'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+
+- name: IntermediateOutput
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify desired handling of intermediate output collections.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:IntermediateOutput'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ outputTTL:
+ type: int
+ doc: |
+ If the value is greater than zero, intermediate output
+ collections are considered temporary and will be automatically
+ trashed. Temporary collections are trashed `outputTTL` seconds
+ after creation. A value of zero means intermediate output should be
+ retained indefinitely (this is the default behavior).
+
+ Note: arvados-cwl-runner currently does not take workflow dependencies
+ into account when setting the TTL on an intermediate output
+ collection. If the TTL is too short, it is possible for a collection to
+ be trashed before downstream steps that consume it are started. The
+ recommended minimum value for TTL is the expected duration of the
+ entire workflow.
+
+- name: WorkflowRunnerResources
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify memory or cores resource request for the CWL runner process itself.
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:WorkflowRunnerResources'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ ramMin:
+ type: int?
+ doc: Minimum RAM, in mebibytes (2**20)
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/ramMin"
+ coresMin:
+ type: int?
+ doc: Minimum cores allocated to cwl-runner
+ jsonldPredicate: "https://w3id.org/cwl/cwl#ResourceRequirement/coresMin"
+ keep_cache:
+ type: int?
+ doc: |
+ Size of collection metadata cache for the workflow runner, in
+ MiB. Default 256 MiB. Will be added to the RAM request
+ when determining the node size to request.
+ jsonldPredicate: "http://arvados.org/cwl#RuntimeConstraints/keep_cache"
+
+- name: ClusterTarget
+ type: record
+ extends: cwl:ProcessRequirement
+ inVocab: false
+ doc: |
+ Specify where a workflow step should run
+ fields:
+ class:
+ type: string
+ doc: "Always 'arv:ClusterTarget'"
+ jsonldPredicate:
+ _id: "@type"
+ _type: "@vocab"
+ cluster_id:
+ type: string?
+ doc: The cluster on which to run the container
+ project_uuid:
+ type: string?
+ doc: The project that will own the container requests and intermediate collections
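Taken together, a workflow can opt into several of these extensions with process-level hints; a minimal sketch (the arv: namespace declaration is required, and all values are illustrative):

cwlVersion: v1.1
class: Workflow
$namespaces:
  arv: "http://arvados.org/cwl#"
hints:
  arv:RuntimeConstraints:
    keep_cache: 512
    outputDirType: keep_output_dir
  arv:IntermediateOutput:
    outputTTL: 86400
  arv:WorkflowRunnerResources:
    ramMin: 2048
    coresMin: 2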
runtimeContext.pull_image,
runtimeContext.project_uuid)
+ network_req, _ = self.get_requirement("NetworkAccess")
+ if network_req:
+ runtime_constraints["API"] = network_req["networkAccess"]
+
api_req, _ = self.get_requirement("http://arvados.org/cwl#APIRequirement")
if api_req:
runtime_constraints["API"] = True
if self.output_ttl < 0:
raise WorkflowException("Invalid value %d for output_ttl, cannot be less than zero" % container_request["output_ttl"])
- if self.timelimit is not None:
+ if self.timelimit is not None and self.timelimit > 0:
scheduling_parameters["max_run_time"] = self.timelimit
extra_submit_params = {}
enable_reuse = runtimeContext.enable_reuse
if enable_reuse:
+ reuse_req, _ = self.get_requirement("WorkReuse")
+ if reuse_req:
+ enable_reuse = reuse_req["enableReuse"]
reuse_req, _ = self.get_requirement("http://arvados.org/cwl#ReuseRequirement")
if reuse_req:
enable_reuse = reuse_req["enableReuse"]
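For example, a CWL v1.1 document can now drive both knobs with the standard v1.1 requirement classes instead of the Arvados-specific hints (a sketch):

requirements:
  NetworkAccess:
    networkAccess: true     # maps to runtime_constraints["API"]
  WorkReuse:
    enableReuse: false      # overrides --enable-reuse for this step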
if self.arvrunner.project_uuid:
command.append("--project-uuid="+self.arvrunner.project_uuid)
+ if self.enable_dev:
+ command.append("--enable-dev")
+
command.extend([workflowpath, "/var/lib/cwl/cwl.input.json"])
container_req["command"] = command
try:
if record["state"] == "Complete":
processStatus = "success"
+ # We don't have the real exit code, so fake it.
+ record["exit_code"] = 0
else:
processStatus = "permanentFail"
+ record["exit_code"] = 1
outputs = {}
try:
outputs = done.done(self, record, dirs["tmpdir"],
dirs["outdir"], dirs["keep"])
except WorkflowException as e:
- # Only include a stack trace if in debug mode.
- # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
+ # Only include a stack trace if in debug mode.
+ # This is most likely a user workflow error and a stack trace may obfuscate more useful output.
logger.error("%s unable to collect output from %s:\n%s",
self.arvrunner.label(self), record["output"], e, exc_info=(e if self.arvrunner.debug else False))
processStatus = "permanentFail"
# SPDX-License-Identifier: Apache-2.0
from cwltool.command_line_tool import CommandLineTool, ExpressionTool
-from cwltool.builder import Builder
from .arvjob import ArvadosJob
from .arvcontainer import ArvadosContainer
from .pathmapper import ArvPathMapper
+from .runner import make_builder
from functools import partial
from schema_salad.sourceline import SourceLine
from cwltool.errors import WorkflowException
return runtimeContext
-def make_builder(joborder, hints, requirements, runtimeContext):
- return Builder(
- job=joborder,
- files=[], # type: List[Dict[Text, Text]]
- bindings=[], # type: List[Dict[Text, Any]]
- schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
- names=None, # type: Names
- requirements=requirements, # type: List[Dict[Text, Any]]
- hints=hints, # type: List[Dict[Text, Any]]
- resources={}, # type: Dict[str, int]
- mutation_manager=None, # type: Optional[MutationManager]
- formatgraph=None, # type: Optional[Graph]
- make_fs_access=None, # type: Type[StdFsAccess]
- fs_access=None, # type: StdFsAccess
- job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
- timeout=runtimeContext.eval_timeout, # type: float
- debug=runtimeContext.debug, # type: bool
- js_console=runtimeContext.js_console, # type: bool
- force_docker_pull=runtimeContext.force_docker_pull, # type: bool
- loadListing="", # type: Text
- outdir="", # type: Text
- tmpdir="", # type: Text
- stagedir="", # type: Text
- )
class ArvadosCommandTool(CommandLineTool):
"""Wrap cwltool CommandLineTool to override selected methods."""
import ruamel.yaml as yaml
from .runner import (upload_dependencies, packed_workflow, upload_workflow_collection,
- trim_anonymous_location, remove_redundant_fields, discover_secondary_files)
+ trim_anonymous_location, remove_redundant_fields, discover_secondary_files,
+ make_builder)
from .pathmapper import ArvPathMapper, trim_listing
-from .arvtool import ArvadosCommandTool, set_cluster_target, make_builder
+from .arvtool import ArvadosCommandTool, set_cluster_target
+
from .perf import Perf
logger = logging.getLogger('arvados.cwl-runner')
raise WorkflowException("%s object must have 'id'" % (self.tool["class"]))
document_loader, workflowobj, uri = (self.doc_loader, self.doc_loader.fetch(self.tool["id"]), self.tool["id"])
- discover_secondary_files(self.tool["inputs"], joborder)
+ discover_secondary_files(self.arvrunner.fs_access, builder,
+ self.tool["inputs"], joborder)
with Perf(metrics, "subworkflow upload_deps"):
upload_dependencies(self.arvrunner,
adjustDirObjs(packed, keepmount)
self.wf_pdh = upload_workflow_collection(self.arvrunner, shortname(self.tool["id"]), packed)
+ self.loadingContext = self.loadingContext.copy()
+ self.loadingContext.metadata = self.loadingContext.metadata.copy()
+ self.loadingContext.metadata["http://commonwl.org/cwltool#original_cwlVersion"] = "v1.0"
+
wf_runner = cmap({
"class": "CommandLineTool",
"baseCommand": "cwltool",
def done_outputs(self, record, tmpdir, outdir, keepdir):
self.builder.outdir = outdir
self.builder.pathmapper.keepdir = keepdir
- return self.collect_outputs("keep:" + record["output"])
+ return self.collect_outputs("keep:" + record["output"], record["exit_code"])
crunchstat_re = re.compile(r"^\d{4}-\d\d-\d\d_\d\d:\d\d:\d\d [a-z0-9]{5}-8i9sb-[a-z0-9]{15} \d+ \d+ stderr crunchstat:")
timestamp_re = re.compile(r"^(\d{4}-\d\d-\d\dT\d\d:\d\d:\d\d\.\d+Z) (.*)")
from builtins import next
from builtins import object
from builtins import str
-from future.utils import viewvalues
+from future.utils import viewvalues, viewitems
import argparse
import logging
from cwltool.process import shortname, UnsupportedRequirement, use_custom_schema
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, get_listing, visit_class
from cwltool.command_line_tool import compute_checksums
+from cwltool.load_tool import load_tool
logger = logging.getLogger('arvados.cwl-runner')
metrics = logging.getLogger('arvados.cwl-runner.metrics')
self.poll_interval = 12
self.loadingContext = None
self.should_estimate_cache_size = True
+ self.fs_access = None
+ self.secret_store = None
if keep_client is not None:
self.keep_client = keep_client
num_retries=self.num_retries)
self.work_api = None
- expected_api = ["jobs", "containers"]
+ expected_api = ["containers", "jobs"]
for api in expected_api:
try:
methods = self.api._rootDesc.get('resources')[api]['methods']
while keys:
page = keys[:pageSize]
- keys = keys[pageSize:]
try:
proc_states = table.list(filters=[["uuid", "in", page]]).execute(num_retries=self.num_retries)
except Exception:
"new_attributes": p
}
})
+ keys = keys[pageSize:]
+
finish_poll = time.time()
remain_wait = self.poll_interval - (finish_poll - begin_poll)
except:
except (KeyboardInterrupt, SystemExit):
break
- def check_features(self, obj):
+ def check_features(self, obj, parentfield=""):
if isinstance(obj, dict):
if obj.get("writable") and self.work_api != "containers":
raise SourceLine(obj, "writable", UnsupportedRequirement).makeError("InitialWorkDir feature 'writable: true' not supported with --api=jobs")
"Option 'dockerOutputDirectory' must be an absolute path.")
if obj.get("class") == "http://commonwl.org/cwltool#Secrets" and self.work_api != "containers":
raise SourceLine(obj, "class", UnsupportedRequirement).makeError("Secrets not supported with --api=jobs")
- for v in viewvalues(obj):
- self.check_features(v)
+ if obj.get("class") == "InplaceUpdateRequirement":
+ if obj["inplaceUpdate"] and parentfield == "requirements":
+ raise SourceLine(obj, "class", UnsupportedRequirement).makeError("InplaceUpdateRequirement not supported for keep collections.")
+ for k,v in viewitems(obj):
+ self.check_features(v, parentfield=k)
elif isinstance(obj, list):
for i,v in enumerate(obj):
with SourceLine(obj, i, UnsupportedRequirement, logger.isEnabledFor(logging.DEBUG)):
- self.check_features(v)
+ self.check_features(v, parentfield=parentfield)
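A document that trips the new check looks roughly like this sketch; the same class listed under hints (rather than requirements) is still tolerated:

requirements:
  InplaceUpdateRequirement:
    inplaceUpdate: true     # rejected: keep collections do not support in-place update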
def make_output_collection(self, name, storage_classes, tagsString, outputObj):
outputObj = copy.deepcopy(outputObj)
'progress':1.0
}).execute(num_retries=self.num_retries)
+ def apply_reqs(self, job_order_object, tool):
+ if "https://w3id.org/cwl/cwl#requirements" in job_order_object:
+ if tool.metadata.get("http://commonwl.org/cwltool#original_cwlVersion") == 'v1.0':
+ raise WorkflowException(
+ "`cwl:requirements` in the input object is not part of CWL "
+ "v1.0. You can adjust to use `cwltool:overrides` instead; or you "
+ "can set the cwlVersion to v1.1 or greater and re-run with "
+ "--enable-dev.")
+ job_reqs = job_order_object["https://w3id.org/cwl/cwl#requirements"]
+ for req in job_reqs:
+ tool.requirements.append(req)
+
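With cwlVersion v1.1 and --enable-dev, the input object itself can inject requirements. A hypothetical job order (the input name, env var, and file path are made up):

cwl:requirements:
  - class: EnvVarRequirement
    envDef:
      LOCALE: en_US.UTF-8
inp:
  class: File
  location: input.txt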
def arv_executor(self, tool, job_order, runtimeContext, logger=None):
self.debug = runtimeContext.debug
if not runtimeContext.name:
runtimeContext.name = self.name = tool.tool.get("label") or tool.metadata.get("label") or os.path.basename(tool.tool["id"])
+ # Upload local file references in the job order.
+ job_order = upload_job_order(self, "%s input" % runtimeContext.name,
+ tool, job_order)
+
+ submitting = (runtimeContext.update_workflow or
+ runtimeContext.create_workflow or
+ (runtimeContext.submit and not
+ (tool.tool["class"] == "CommandLineTool" and
+ runtimeContext.wait and
+ not runtimeContext.always_submit_runner)))
+
+ loadingContext = self.loadingContext.copy()
+ loadingContext.do_validate = False
+ loadingContext.do_update = False
+ if submitting:
+ # Document may have been auto-updated. Reload the original
+ # document with updating disabled because we want to
+ # submit the original document, not the auto-updated one.
+ tool = load_tool(tool.tool["id"], loadingContext)
+
# Upload direct dependencies of workflow steps, get back mapping of files to keep references.
# Also uploads docker images.
merged_map = upload_workflow_deps(self, tool)
- # Reload tool object which may have been updated by
- # upload_workflow_deps
- # Don't validate this time because it will just print redundant errors.
- loadingContext = self.loadingContext.copy()
+ # Recreate process object (ArvadosWorkflow or
+ # ArvadosCommandTool) because the tool document may have been
+ # updated by upload_workflow_deps in ways that modify
+ # inheritance of hints or requirements.
loadingContext.loader = tool.doc_loader
loadingContext.avsc_names = tool.doc_schema
loadingContext.metadata = tool.metadata
- loadingContext.do_validate = False
-
- tool = self.arv_make_tool(tool.doc_loader.idx[tool.tool["id"]],
- loadingContext)
-
- # Upload local file references in the job order.
- job_order = upload_job_order(self, "%s input" % runtimeContext.name,
- tool, job_order)
+ tool = load_tool(tool.tool, loadingContext)
existing_uuid = runtimeContext.update_workflow
if existing_uuid or runtimeContext.create_workflow:
merged_map=merged_map),
"success")
+ self.apply_reqs(job_order, tool)
+
self.ignore_docker_for_reuse = runtimeContext.ignore_docker_for_reuse
self.eval_timeout = runtimeContext.eval_timeout
from future import standard_library
standard_library.install_aliases()
from future.utils import viewvalues, viewitems
+from past.builtins import basestring
import os
import sys
from functools import partial
import logging
import json
+import copy
from collections import namedtuple
from io import StringIO
+from typing import Mapping, Sequence
if os.name == "posix" and sys.version_info[0] < 3:
import subprocess32 as subprocess
from cwltool.command_line_tool import CommandLineTool
import cwltool.workflow
-from cwltool.process import scandeps, UnsupportedRequirement, normalizeFilesDirs, shortname, Process
+from cwltool.process import (scandeps, UnsupportedRequirement, normalizeFilesDirs,
+ shortname, Process, fill_in_defaults)
from cwltool.load_tool import fetch_document
from cwltool.pathmapper import adjustFileObjs, adjustDirObjs, visit_class
from cwltool.utils import aslist
from cwltool.builder import substitute
from cwltool.pack import pack
+from cwltool.update import INTERNAL_VERSION
+from cwltool.builder import Builder
import schema_salad.validate as validate
import arvados.collection
from .pathmapper import ArvPathMapper, trim_listing
from ._version import __version__
from . import done
+from . context import ArvRuntimeContext
logger = logging.getLogger('arvados.cwl-runner')
for i in viewvalues(d):
find_defaults(i, op)
-def setSecondary(t, fileobj, discovered):
- if isinstance(fileobj, dict) and fileobj.get("class") == "File":
- if "secondaryFiles" not in fileobj:
- fileobj["secondaryFiles"] = cmap([{"location": substitute(fileobj["location"], sf), "class": "File"} for sf in t["secondaryFiles"]])
- if discovered is not None:
- discovered[fileobj["location"]] = fileobj["secondaryFiles"]
- elif isinstance(fileobj, list):
- for e in fileobj:
- setSecondary(t, e, discovered)
-
-def discover_secondary_files(inputs, job_order, discovered=None):
- for t in inputs:
- if shortname(t["id"]) in job_order and t.get("secondaryFiles"):
- setSecondary(t, job_order[shortname(t["id"])], discovered)
+def make_builder(joborder, hints, requirements, runtimeContext):
+ return Builder(
+ job=joborder,
+ files=[], # type: List[Dict[Text, Text]]
+ bindings=[], # type: List[Dict[Text, Any]]
+ schemaDefs={}, # type: Dict[Text, Dict[Text, Any]]
+ names=None, # type: Names
+ requirements=requirements, # type: List[Dict[Text, Any]]
+ hints=hints, # type: List[Dict[Text, Any]]
+ resources={}, # type: Dict[str, int]
+ mutation_manager=None, # type: Optional[MutationManager]
+ formatgraph=None, # type: Optional[Graph]
+ make_fs_access=None, # type: Type[StdFsAccess]
+ fs_access=None, # type: StdFsAccess
+ job_script_provider=runtimeContext.job_script_provider, # type: Optional[Any]
+ timeout=runtimeContext.eval_timeout, # type: float
+ debug=runtimeContext.debug, # type: bool
+ js_console=runtimeContext.js_console, # type: bool
+ force_docker_pull=runtimeContext.force_docker_pull, # type: bool
+ loadListing="", # type: Text
+ outdir="", # type: Text
+ tmpdir="", # type: Text
+ stagedir="", # type: Text
+ )
+
+def search_schemadef(name, reqs):
+ for r in reqs:
+ if r["class"] == "SchemaDefRequirement":
+ for sd in r["types"]:
+ if sd["name"] == name:
+ return sd
+ return None
+
+primitive_types_set = frozenset(("null", "boolean", "int", "long",
+ "float", "double", "string", "record",
+ "array", "enum"))
+
+def set_secondary(fsaccess, builder, inputschema, secondaryspec, primary, discovered):
+ if isinstance(inputschema, Sequence) and not isinstance(inputschema, basestring):
+ # union type, collect all possible secondaryFiles
+ for i in inputschema:
+ set_secondary(fsaccess, builder, i, secondaryspec, primary, discovered)
+ return
+
+ if isinstance(inputschema, basestring):
+ sd = search_schemadef(inputschema, reversed(builder.hints+builder.requirements))
+ if sd:
+ inputschema = sd
+ else:
+ return
+
+ if "secondaryFiles" in inputschema:
+ # set secondaryFiles, may be inherited by compound types.
+ secondaryspec = inputschema["secondaryFiles"]
+
+ if (isinstance(inputschema["type"], (Mapping, Sequence)) and
+ not isinstance(inputschema["type"], basestring)):
+ # compound type (union, array, record)
+ set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+ elif (inputschema["type"] == "record" and
+ isinstance(primary, Mapping)):
+ #
+ # record type, find secondary files associated with fields.
+ #
+ for f in inputschema["fields"]:
+ p = primary.get(shortname(f["name"]))
+ if p:
+ set_secondary(fsaccess, builder, f, secondaryspec, p, discovered)
+
+ elif (inputschema["type"] == "array" and
+ isinstance(primary, Sequence)):
+ #
+ # array type, find secondary files of elements
+ #
+ for p in primary:
+ set_secondary(fsaccess, builder, {"type": inputschema["items"]}, secondaryspec, p, discovered)
+
+ elif (inputschema["type"] == "File" and
+ secondaryspec and
+ isinstance(primary, Mapping) and
+ primary.get("class") == "File" and
+ "secondaryFiles" not in primary):
+ #
+ # Found a file, check for secondaryFiles
+ #
+ primary["secondaryFiles"] = []
+ for i, sf in enumerate(aslist(secondaryspec)):
+ pattern = builder.do_eval(sf["pattern"], context=primary)
+ if pattern is None:
+ continue
+ sfpath = substitute(primary["location"], pattern)
+ required = builder.do_eval(sf["required"], context=primary)
+
+ if fsaccess.exists(sfpath):
+ primary["secondaryFiles"].append({"location": sfpath, "class": "File"})
+ elif required:
+ raise SourceLine(primary["secondaryFiles"], i, validate.ValidationException).makeError(
+ "Required secondary file '%s' does not exist" % sfpath)
+
+ primary["secondaryFiles"] = cmap(primary["secondaryFiles"])
+ if discovered is not None:
+ discovered[primary["location"]] = primary["secondaryFiles"]
+ elif inputschema["type"] not in primitive_types_set:
+ set_secondary(fsaccess, builder, inputschema["type"], secondaryspec, primary, discovered)
+
+def discover_secondary_files(fsaccess, builder, inputs, job_order, discovered=None):
+ for inputschema in inputs:
+ primary = job_order.get(shortname(inputschema["id"]))
+ if isinstance(primary, (Mapping, Sequence)):
+ set_secondary(fsaccess, builder, inputschema, None, primary, discovered)
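Because each pattern/required pair is now evaluated with a builder, v1.1-style secondaryFiles schemas work as input descriptions; a sketch with illustrative names:

inputs:
  - id: reference
    type: File
    secondaryFiles:
      - pattern: .fai        # appended to the primary file's location
        required: true       # a missing file raises a validation error
      - pattern: ^.dict      # ^ strips the primary file's extension first
        required: false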
collection_uuid_pattern = re.compile(r'^keep:([a-z0-9]{5}-4zz18-[a-z0-9]{15})(/.*)?$')
collection_pdh_pattern = re.compile(r'^keep:([0-9a-f]{32}\+\d+)(/.*)?')
loadref_fields = set(("$import",))
scanobj = workflowobj
- if "id" in workflowobj:
+ if "id" in workflowobj and not workflowobj["id"].startswith("_:"):
# Need raw file content (before preprocessing) to ensure
# that external references in $include and $mixin are captured.
scanobj = loadref("", workflowobj["id"])
discovered = {}
def discover_default_secondary_files(obj):
- discover_secondary_files(obj["inputs"],
- {shortname(t["id"]): t["default"] for t in obj["inputs"] if "default" in t},
+ builder_job_order = {}
+ for t in obj["inputs"]:
+ builder_job_order[shortname(t["id"])] = t["default"] if "default" in t else None
+ # Need to create a builder object to evaluate expressions.
+ builder = make_builder(builder_job_order,
+ obj.get("hints", []),
+ obj.get("requirements", []),
+ ArvRuntimeContext())
+ discover_secondary_files(arvrunner.fs_access,
+ builder,
+ obj["inputs"],
+ builder_job_order,
discovered)
visit_class(workflowobj, ("CommandLineTool", "Workflow"), discover_default_secondary_files)
object with 'location' updated to the proper keep references.
"""
- discover_secondary_files(tool.tool["inputs"], job_order)
+ # Make a copy of the job order and set defaults.
+ builder_job_order = copy.copy(job_order)
+
+ # fill_in_defaults throws an error if there are any
+ # missing required parameters; we don't want it to do that,
+ # so make them all optional.
+ inputs_copy = copy.deepcopy(tool.tool["inputs"])
+ for i in inputs_copy:
+ if "null" not in i["type"]:
+ i["type"] = ["null"] + aslist(i["type"])
+
+ fill_in_defaults(inputs_copy,
+ builder_job_order,
+ arvrunner.fs_access)
+ # Need to create a builder object to evaluate expressions.
+ builder = make_builder(builder_job_order,
+ tool.hints,
+ tool.requirements,
+ ArvRuntimeContext())
+ # Now update job_order with secondaryFiles
+ discover_secondary_files(arvrunner.fs_access,
+ builder,
+ tool.tool["inputs"],
+ job_order)
jobmapper = upload_dependencies(arvrunner,
name,
collection_cache_size=256,
collection_cache_is_default=True):
+ loadingContext = loadingContext.copy()
+ loadingContext.metadata = loadingContext.metadata.copy()
+ loadingContext.metadata["cwlVersion"] = INTERNAL_VERSION
+
super(Runner, self).__init__(tool.tool, loadingContext)
self.arvrunner = runner
self.intermediate_output_ttl = intermediate_output_ttl
self.priority = priority
self.secret_store = secret_store
+ self.enable_dev = loadingContext.enable_dev
self.submit_runner_cores = 1
self.submit_runner_ram = 1024 # default 1 GiB
download_url="https://github.com/curoverse/arvados.git",
license='Apache 2.0',
packages=find_packages(),
- package_data={'arvados_cwl': ['arv-cwl-schema.yml']},
+ package_data={'arvados_cwl': ['arv-cwl-schema-v1.0.yml', 'arv-cwl-schema-v1.1.yml']},
scripts=[
'bin/cwl-runner',
'bin/arvados-cwl-runner',
# Note that arvados/build/run-build-packages.sh looks at this
# file to determine what version of cwltool and schema-salad to build.
install_requires=[
- 'cwltool==1.0.20181217162649',
- 'schema-salad==3.0.20181129082112',
+ 'cwltool==1.0.20190603140227',
+ 'schema-salad==4.2.20190417121603',
'typing >= 3.6.4',
'ruamel.yaml >=0.15.54, <=0.15.77',
'arvados-python-client>=1.3.0.20190205182514',
shift ; shift
;;
-h|--help)
- echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo] [--tag docker_tag] [--build] [--pythoncmd python(2|3)] [--suite (integration|conformance)]"
+ echo "$0 [--no-reset-container] [--leave-running] [--config dev|localdemo] [--tag docker_tag] [--build] [--pythoncmd python(2|3)] [--suite (integration|conformance-v1.0|conformance-v1.1)]"
exit
;;
*)
export ARVBOX_CONTAINER=cwltest
fi
+if test "$suite" = "conformance" ; then
+ suite=conformance-v1.0
+fi
+
if test $reset_container = 1 ; then
arvbox stop
docker rm $ARVBOX_CONTAINER
mkdir -p /tmp/cwltest
cd /tmp/cwltest
-if ! test -d common-workflow-language ; then
- git clone https://github.com/common-workflow-language/common-workflow-language.git
+
+if [[ "$suite" = "conformance-v1.0" ]] ; then
+ if ! test -d common-workflow-language ; then
+ git clone https://github.com/common-workflow-language/common-workflow-language.git
+ fi
+ cd common-workflow-language
+elif [[ "$suite" = "conformance-v1.1" ]] ; then
+ if ! test -d cwl-v1.1 ; then
+ git clone https://github.com/common-workflow-language/cwl-v1.1.git
+ fi
+ cd cwl-v1.1
+fi
+
+if [[ "$suite" != "integration" ]] ; then
+ git pull
fi
-cd common-workflow-language
-git pull
+
export ARVADOS_API_HOST=localhost:8000
export ARVADOS_API_HOST_INSECURE=1
export ARVADOS_API_TOKEN=\$(cat /var/lib/arvados/superuser_token)
-
if test -n "$build" ; then
- /usr/src/arvados/build/build-dev-docker-jobs-image.sh
+ /usr/src/arvados/build/build-dev-docker-jobs-image.sh
elif test "$tag" = "latest" ; then
arv-keepdocker --pull arvados/jobs $tag
else
chmod +x /tmp/cwltest/arv-cwl-containers
env
-if [[ "$suite" = "conformance" ]] ; then
- exec ./run_test.sh RUNNER=/tmp/cwltest/arv-cwl-${runapi} EXTRA=--compute-checksum $@
-elif [[ "$suite" = "integration" ]] ; then
+if [[ "$suite" = "integration" ]] ; then
cd /usr/src/arvados/sdk/cwl/tests
exec ./arvados-tests.sh $@
+else
+ exec ./run_test.sh RUNNER=/tmp/cwltest/arv-cwl-${runapi} EXTRA=--compute-checksum $@
fi
EOF
from schema_salad.ref_resolver import Loader
from schema_salad.sourceline import cmap
-from .matcher import JsonDiffMatcher
+from .matcher import JsonDiffMatcher, StripYAMLComments
from .mock_discovery import get_rootDesc
if not os.getenv('ARVADOS_DEBUG'):
class TestContainer(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"basedir": "",
"make_fs_access": make_fs_access,
"loader": Loader({}),
- "metadata": {"cwlVersion": "v1.0"}})
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"}})
runtimeContext = arvados_cwl.context.ArvRuntimeContext(
{"work_api": "containers",
"basedir": "",
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({
"inputs": [],
self.assertFalse(api.collections().create.called)
self.assertFalse(runner.runtime_status_error.called)
- arvjob.collect_outputs.assert_called_with("keep:abc+123")
+ arvjob.collect_outputs.assert_called_with("keep:abc+123", 0)
arvjob.output_callback.assert_called_with({"out": "stuff"}, "success")
runner.add_intermediate_output.assert_called_with("zzzzz-4zz18-zzzzzzzzzzzzzz2")
"portable_data_hash": "99999999999999999999999999999994+99",
"manifest_text": ". 99999999999999999999999999999994+99 0:0:file1 0:0:file2"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({
"inputs": [
runner.api.collections().get().execute.return_value = {
"portable_data_hash": "99999999999999999999999999999993+99"}
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
tool = cmap({"arguments": ["md5sum", "example.conf"],
"class": "CommandLineTool",
"class": "CommandLineTool",
"hints": [
{
- "class": "http://commonwl.org/cwltool#TimeLimit",
+ "class": "ToolTimeLimit",
"timelimit": 42
}
]
_, kwargs = runner.api.container_requests().create.call_args
self.assertEqual(42, kwargs['body']['scheduling_parameters'].get('max_run_time'))
+
+
+class TestWorkflow(unittest.TestCase):
+ def helper(self, runner, enable_reuse=True):
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
+
+ make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
+ collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
+
+ document_loader.fetcher_constructor = functools.partial(arvados_cwl.CollectionFetcher, api_client=runner.api, fs_access=make_fs_access(""))
+ document_loader.fetcher = document_loader.fetcher_constructor(document_loader.cache, document_loader.session)
+ document_loader.fetch_text = document_loader.fetcher.fetch_text
+ document_loader.check_exists = document_loader.fetcher.check_exists
+
+ loadingContext = arvados_cwl.context.ArvLoadingContext(
+ {"avsc_names": avsc_names,
+ "basedir": "",
+ "make_fs_access": make_fs_access,
+ "loader": document_loader,
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
+ "construct_tool_object": runner.arv_make_tool})
+ runtimeContext = arvados_cwl.context.ArvRuntimeContext(
+ {"work_api": "containers",
+ "basedir": "",
+ "name": "test_run_wf_"+str(enable_reuse),
+ "make_fs_access": make_fs_access,
+ "tmpdir": "/tmp",
+ "enable_reuse": enable_reuse,
+ "priority": 500})
+
+ return loadingContext, runtimeContext
+
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("arvados.collection.Collection")
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+ def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = get_rootDesc()
+
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
+
+ list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
+
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
+ metadata["cwlVersion"] = tool["cwlVersion"]
+
+ mockc = mock.MagicMock()
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mockc, *args, **kwargs)
+ mockcollectionreader().find.return_value = arvados.arvfile.ArvadosFile(mock.MagicMock(), "token.txt")
+
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
+ arvtool.formatgraph = None
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ next(it).run(runtimeContext)
+ next(it).run(runtimeContext)
+
+ with open("tests/wf/scatter2_subwf.cwl") as f:
+ subwf = StripYAMLComments(f.read()).rstrip()
+
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ "command": [
+ "cwltool",
+ "--no-container",
+ "--move-outputs",
+ "--preserve-entire-environment",
+ "workflow.cwl#main",
+ "cwl.input.yml"
+ ],
+ "container_image": "99999999999999999999999999999993+99",
+ "cwd": "/var/spool/cwl",
+ "environment": {
+ "HOME": "/var/spool/cwl",
+ "TMPDIR": "/tmp"
+ },
+ "mounts": {
+ "/keep/99999999999999999999999999999999+118": {
+ "kind": "collection",
+ "portable_data_hash": "99999999999999999999999999999999+118"
+ },
+ "/tmp": {
+ "capacity": 1073741824,
+ "kind": "tmp"
+ },
+ "/var/spool/cwl": {
+ "capacity": 1073741824,
+ "kind": "tmp"
+ },
+ "/var/spool/cwl/cwl.input.yml": {
+ "kind": "collection",
+ "path": "cwl.input.yml",
+ "portable_data_hash": "99999999999999999999999999999996+99"
+ },
+ "/var/spool/cwl/workflow.cwl": {
+ "kind": "collection",
+ "path": "workflow.cwl",
+ "portable_data_hash": "99999999999999999999999999999996+99"
+ },
+ "stdout": {
+ "kind": "file",
+ "path": "/var/spool/cwl/cwl.output.json"
+ }
+ },
+ "name": "scatterstep",
+ "output_name": "Output for step scatterstep",
+ "output_path": "/var/spool/cwl",
+ "output_ttl": 0,
+ "priority": 500,
+ "properties": {},
+ "runtime_constraints": {
+ "ram": 1073741824,
+ "vcpus": 1
+ },
+ "scheduling_parameters": {},
+ "secret_mounts": {},
+ "state": "Committed",
+ "use_existing": True
+ }))
+ mockc.open().__enter__().write.assert_has_calls([mock.call(subwf)])
+ mockc.open().__enter__().write.assert_has_calls([mock.call(
+'''{
+ "fileblub": {
+ "basename": "token.txt",
+ "class": "File",
+ "location": "/keep/99999999999999999999999999999999+118/token.txt",
+ "size": 0
+ },
+ "sleeptime": 5
+}''')])
+
+ # The test passes no builder.resources
+ # Hence the default resources will apply: {'cores': 1, 'ram': 1024, 'outdirSize': 1024, 'tmpdirSize': 1024}
+ @mock.patch("arvados.collection.CollectionReader")
+ @mock.patch("arvados.collection.Collection")
+ @mock.patch('arvados.commands.keepdocker.list_images_in_arv')
+ def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = get_rootDesc()
+
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
+
+ list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
+ runner.api.collections().get().execute.return_value = {"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{"uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
+
+ runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
+ runner.ignore_docker_for_reuse = False
+ runner.num_retries = 0
+ runner.secret_store = cwltool.secrets.SecretStore()
+
+ loadingContext, runtimeContext = self.helper(runner)
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
+ loadingContext.do_update = True
+ tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
+
+ mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
+
+ arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
+ arvtool.formatgraph = None
+ it = arvtool.job({}, mock.MagicMock(), runtimeContext)
+
+ next(it).run(runtimeContext)
+ next(it).run(runtimeContext)
+
+ with open("tests/wf/echo-subwf.cwl") as f:
+ subwf = StripYAMLComments(f.read())
+
+ runner.api.container_requests().create.assert_called_with(
+ body=JsonDiffMatcher({
+ 'output_ttl': 0,
+ 'environment': {'HOME': '/var/spool/cwl', 'TMPDIR': '/tmp'},
+ 'scheduling_parameters': {},
+ 'name': u'echo-subwf',
+ 'secret_mounts': {},
+ 'runtime_constraints': {'API': True, 'vcpus': 3, 'ram': 1073741824},
+ 'properties': {},
+ 'priority': 500,
+ 'mounts': {
+ '/var/spool/cwl/cwl.input.yml': {
+ 'portable_data_hash': '99999999999999999999999999999996+99',
+ 'kind': 'collection',
+ 'path': 'cwl.input.yml'
+ },
+ '/var/spool/cwl/workflow.cwl': {
+ 'portable_data_hash': '99999999999999999999999999999996+99',
+ 'kind': 'collection',
+ 'path': 'workflow.cwl'
+ },
+ 'stdout': {
+ 'path': '/var/spool/cwl/cwl.output.json',
+ 'kind': 'file'
+ },
+ '/tmp': {
+ 'kind': 'tmp',
+ 'capacity': 1073741824
+ }, '/var/spool/cwl': {
+ 'kind': 'tmp',
+ 'capacity': 3221225472
+ }
+ },
+ 'state': 'Committed',
+ 'output_path': '/var/spool/cwl',
+ 'container_image': '99999999999999999999999999999993+99',
+ 'command': [
+ u'cwltool',
+ u'--no-container',
+ u'--move-outputs',
+ u'--preserve-entire-environment',
+ u'workflow.cwl#main',
+ u'cwl.input.yml'
+ ],
+ 'use_existing': True,
+ 'output_name': u'Output for step echo-subwf',
+ 'cwd': '/var/spool/cwl'
+ }))
+
+ def test_default_work_api(self):
+ arvados_cwl.add_arv_hints()
+
+ api = mock.MagicMock()
+ api._rootDesc = copy.deepcopy(get_rootDesc())
+ del api._rootDesc.get('resources')['jobs']['methods']['create']
+ runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ self.assertEqual(runner.work_api, 'containers')
import unittest
import copy
import io
+import argparse
import arvados
import arvados_cwl
from .mock_discovery import get_rootDesc
from .matcher import JsonDiffMatcher, StripYAMLComments
from .test_container import CollectionMock
+from arvados_cwl.arvdocker import arv_docker_clear_cache
if not os.getenv('ARVADOS_DEBUG'):
logging.getLogger('arvados.cwl-runner').setLevel(logging.WARN)
class TestJob(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"basedir": "",
"make_fs_access": make_fs_access,
"loader": Loader({}),
- "metadata": {"cwlVersion": "v1.0"},
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
"makeTool": runner.arv_make_tool})
runtimeContext = arvados_cwl.context.ArvRuntimeContext(
{"work_api": "jobs",
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_run(self, list_images_in_arv):
for enable_reuse in (True, False):
+ arv_docker_clear_cache()
runner = mock.MagicMock()
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
arvados_cwl.add_arv_hints()
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
tool = {
"inputs": [],
class TestWorkflow(unittest.TestCase):
def helper(self, runner, enable_reuse=True):
- document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.0")
+ document_loader, avsc_names, schema_metadata, metaschema_loader = cwltool.process.get_schema("v1.1")
make_fs_access=functools.partial(arvados_cwl.CollectionFsAccess,
collection_cache=arvados_cwl.CollectionCache(runner.api, None, 0))
"basedir": "",
"make_fs_access": make_fs_access,
"loader": document_loader,
- "metadata": {"cwlVersion": "v1.0"},
+ "metadata": {"cwlVersion": "v1.1", "http://commonwl.org/cwltool#original_cwlVersion": "v1.0"},
"construct_tool_object": runner.arv_make_tool})
runtimeContext = arvados_cwl.context.ArvRuntimeContext(
{"work_api": "jobs",
@mock.patch("arvados.collection.Collection")
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_run(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
arvados_cwl.add_arv_hints()
api = mock.MagicMock()
api._rootDesc = get_rootDesc()
- runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
+ output_name=None,
+ output_tags=None,
+ thread_count=1,
+ collection_cache_size=None))
self.assertEqual(runner.work_api, 'jobs')
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
loadingContext, runtimeContext = self.helper(runner)
-
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
tool, metadata = loadingContext.loader.resolve_ref("tests/wf/scatter2.cwl")
metadata["cwlVersion"] = tool["cwlVersion"]
next(it).run(runtimeContext)
with open("tests/wf/scatter2_subwf.cwl") as f:
- subwf = StripYAMLComments(f.read())
+ subwf = StripYAMLComments(f.read().rstrip())
runner.api.jobs().create.assert_called_with(
body=JsonDiffMatcher({
@mock.patch("arvados.collection.Collection")
@mock.patch('arvados.commands.keepdocker.list_images_in_arv')
def test_overall_resource_singlecontainer(self, list_images_in_arv, mockcollection, mockcollectionreader):
+ arv_docker_clear_cache()
arvados_cwl.add_arv_hints()
api = mock.MagicMock()
api._rootDesc = get_rootDesc()
- runner = arvados_cwl.executor.ArvCwlExecutor(api)
+ runner = arvados_cwl.executor.ArvCwlExecutor(api, argparse.Namespace(work_api="jobs",
+ output_name=None,
+ output_tags=None,
+ thread_count=1,
+ collection_cache_size=None))
self.assertEqual(runner.work_api, 'jobs')
list_images_in_arv.return_value = [["zzzzz-4zz18-zzzzzzzzzzzzzzz"]]
- runner.api.collections().get().execute.return_vaulue = {"portable_data_hash": "99999999999999999999999999999993+99"}
- runner.api.collections().list().execute.return_vaulue = {"items": [{"portable_data_hash": "99999999999999999999999999999993+99"}]}
+ runner.api.collections().get().execute.return_value = {"portable_data_hash": "99999999999999999999999999999993+99"}
+ runner.api.collections().list().execute.return_value = {"items": [{
+ "uuid": "zzzzz-4zz18-zzzzzzzzzzzzzzz",
+ "portable_data_hash": "99999999999999999999999999999993+99"}]}
runner.project_uuid = "zzzzz-8i9sb-zzzzzzzzzzzzzzz"
runner.ignore_docker_for_reuse = False
runner.num_retries = 0
loadingContext, runtimeContext = self.helper(runner)
-
+ loadingContext.do_update = True
+ runner.fs_access = runtimeContext.make_fs_access(runtimeContext.basedir)
tool, metadata = loadingContext.loader.resolve_ref("tests/wf/echo-wf.cwl")
- metadata["cwlVersion"] = tool["cwlVersion"]
mockcollection.side_effect = lambda *args, **kwargs: CollectionMock(mock.MagicMock(), *args, **kwargs)
arvtool = arvados_cwl.ArvadosWorkflow(runner, tool, loadingContext)
arvtool.formatgraph = None
it = arvtool.job({}, mock.MagicMock(), runtimeContext)
-
+
next(it).run(runtimeContext)
next(it).run(runtimeContext)
stubs.api = mock.MagicMock()
stubs.api._rootDesc = get_rootDesc()
+ stubs.api._rootDesc["uuidPrefix"] = "zzzzz"
stubs.api.users().current().execute.return_value = {
"uuid": stubs.fake_user_uuid,
stubs.api.collections().create.assert_has_calls([
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
+ '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
def test_submit_runner_ram(self, stubs, tm):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--submit-runner-ram=2048",
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--output-name", output_name,
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
def test_submit_pipeline_name(self, stubs, tm):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--name=hello job 123",
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug", "--output-tags", output_tags,
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
exited = arvados_cwl.main(
["--submit", "--no-wait", "--debug",
"--project-uuid", project_uuid,
+ "--api=jobs",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
sys.stdout, sys.stderr, api_client=stubs.api)
self.assertEqual(exited, 0)
stubs.api.collections().create.assert_has_calls([
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
+ '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
'replication_desired': None,
- 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
}), ensure_unique_name=False),
mock.call(body=JsonDiffMatcher({
'manifest_text':
- '. 979af1245a12a1fed634d4222473bfdc+16 0:16:blorp.txt\n',
+ '. 5bcc9fe8f8d5992e6cf418dc7ce4dbb3+16 0:16:blub.txt\n',
'replication_desired': None,
- 'name': 'submit_wf.cwl input (169f39d466a5438ac4a90e779bf750c7+53)',
- }), ensure_unique_name=False)])
+ 'name': 'submit_tool.cwl dependencies (5d373e7629203ce39e7c22af98a0f881+52)',
+ }), ensure_unique_name=False),
+ ])
expect_container = copy.deepcopy(stubs.expect_container_spec)
stubs.api.container_requests().create.assert_called_with(
class TestCreateTemplate(unittest.TestCase):
- existing_template_uuid = "zzzzz-d1hrv-validworkfloyml"
+ existing_template_uuid = "zzzzz-p5p6p-validworkfloyml"
def _adjust_script_params(self, expect_component):
expect_component['script_parameters']['x'] = {
@stubs
def test_inputs_empty(self, stubs):
exited = arvados_cwl.main(
- ["--create-template",
+ ["--debug", "--api=jobs", "--create-template",
"tests/wf/inputs_test.cwl", "tests/order/empty_order.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
@stubs
def test_inputs(self, stubs):
exited = arvados_cwl.main(
- ["--create-template",
+ ["--api=jobs", "--create-template",
"tests/wf/inputs_test.cwl", "tests/order/inputs_test_order.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api)
"$graph": [
{
"class": "Workflow",
- "cwlVersion": "v1.0",
+ "cwlVersion": "v1.1",
"hints": [],
"id": "#main",
"inputs": [
"run": {
"baseCommand": "sleep",
"class": "CommandLineTool",
- "id": "#main/sleep1/subtool",
+ "id": "#main/sleep1/run/subtool",
"inputs": [
{
- "id": "#main/sleep1/subtool/sleeptime",
+ "id": "#main/sleep1/run/subtool/sleeptime",
"inputBinding": {
"position": 1
},
],
"outputs": [
{
- "id": "#main/sleep1/subtool/out",
+ "id": "#main/sleep1/run/subtool/out",
"outputBinding": {
"outputEval": "out"
},
]
}
],
- "cwlVersion": "v1.0"
-}
\ No newline at end of file
+ "cwlVersion": "v1.1"
+}
# (This Dockerfile must be located in the arvados/sdk/ directory because
# of the docker build root.)
-FROM debian:jessie
+FROM debian:9
MAINTAINER Ward Vandewege <ward@curoverse.com>
ENV DEBIAN_FRONTEND noninteractive
ARG pythoncmd=python
+ARG pipcmd=pip
RUN apt-get update -q && apt-get install -qy --no-install-recommends \
git ${pythoncmd}-pip ${pythoncmd}-virtualenv ${pythoncmd}-dev libcurl4-gnutls-dev \
libgnutls28-dev nodejs ${pythoncmd}-pyasn1-modules build-essential
-RUN if [ "$pythoncmd" = "python3" ]; then \
- pip3 install -U setuptools six requests ; \
- else \
- pip install -U setuptools six requests ; \
- fi
+RUN $pipcmd install -U setuptools six requests
ARG sdk
ARG runner
ADD cwl/cwltool_dist/$cwltool /tmp/
ADD cwl/dist/$runner /tmp/
-RUN cd /tmp/arvados-python-client-* && $pythoncmd setup.py install
-RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && $pythoncmd setup.py install ; fi
-RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && $pythoncmd setup.py install ; fi
-RUN cd /tmp/arvados-cwl-runner-* && $pythoncmd setup.py install
+RUN cd /tmp/arvados-python-client-* && $pipcmd install .
+RUN if test -d /tmp/schema-salad-* ; then cd /tmp/schema-salad-* && $pipcmd install . ; fi
+RUN if test -d /tmp/cwltool-* ; then cd /tmp/cwltool-* && $pipcmd install networkx==2.2 && $pipcmd install . ; fi
+RUN cd /tmp/arvados-cwl-runner-* && $pipcmd install .
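+# ("$pipcmd install ." replaces "setup.py install" so pip records the
+# installed files; networkx is pinned above, presumably to a version still
+# compatible with this cwltool.)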
# Install dependencies and set up system.
RUN /usr/sbin/adduser --disabled-password \
"errors"
"fmt"
"net/url"
- "os"
"git.curoverse.com/arvados.git/sdk/go/config"
)
type API struct {
MaxItemsPerResponse int
MaxRequestAmplification int
+ RequestTimeout Duration
}
type Cluster struct {
- ClusterID string `json:"-"`
- ManagementToken string
- SystemRootToken string
- Services Services
- NodeProfiles map[string]NodeProfile
- InstanceTypes InstanceTypeMap
- CloudVMs CloudVMs
- Dispatch Dispatch
- HTTPRequestTimeout Duration
- RemoteClusters map[string]RemoteCluster
- PostgreSQL PostgreSQL
- API API
- Logging Logging
- TLS TLS
+ ClusterID string `json:"-"`
+ ManagementToken string
+ SystemRootToken string
+ Services Services
+ InstanceTypes InstanceTypeMap
+ Containers ContainersConfig
+ RemoteClusters map[string]RemoteCluster
+ PostgreSQL PostgreSQL
+ API API
+ SystemLogs SystemLogs
+ TLS TLS
}
type Services struct {
}
type Service struct {
- InternalURLs map[URL]ServiceInstance
+ InternalURLs map[URL]ServiceInstance `json:",omitempty"`
ExternalURL URL
}
type ServiceInstance struct{}
-type Logging struct {
- Level string
- Format string
+type SystemLogs struct {
+ LogLevel string
+ Format string
+ MaxRequestLogParamsSize int
}
type PostgreSQL struct {
Preemptible bool
}
-type Dispatch struct {
- // PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to
- // cloud VMs.
- PrivateKey string
-
- // Max time for workers to come up before abandoning stale
- // locks from previous run
- StaleLockTimeout Duration
-
- // Interval between queue polls
- PollInterval Duration
-
- // Interval between probes to each worker
- ProbeInterval Duration
-
- // Maximum total worker probes per second
- MaxProbesPerSecond int
-
- // Time before repeating SIGTERM when killing a container
- TimeoutSignal Duration
-
- // Time to give up on SIGTERM and write off the worker
- TimeoutTERM Duration
+type ContainersConfig struct {
+ CloudVMs CloudVMsConfig
+ DispatchPrivateKey string
+ StaleLockTimeout Duration
}
-type CloudVMs struct {
- // Shell command that exits zero IFF the VM is fully booted
- // and ready to run containers, e.g., "mount | grep
- // /encrypted-tmp"
- BootProbeCommand string
-
- // Listening port (name or number) of SSH servers on worker
- // VMs
- SSHPort string
+type CloudVMsConfig struct {
+ Enable bool
- SyncInterval Duration
-
- // Maximum idle time before automatic shutdown
- TimeoutIdle Duration
-
- // Maximum booting time before automatic shutdown
- TimeoutBooting Duration
-
- // Maximum time with no successful probes before automatic shutdown
- TimeoutProbe Duration
-
- // Time after shutdown to retry shutdown
- TimeoutShutdown Duration
-
- // Maximum create/destroy-instance operations per second
+ BootProbeCommand string
+ ImageID string
MaxCloudOpsPerSecond int
-
- ImageID string
+ MaxProbesPerSecond int
+ PollInterval Duration
+ ProbeInterval Duration
+ SSHPort string
+ SyncInterval Duration
+ TimeoutBooting Duration
+ TimeoutIdle Duration
+ TimeoutProbe Duration
+ TimeoutShutdown Duration
+ TimeoutSignal Duration
+ TimeoutTERM Duration
+ ResourceTags map[string]string
+ TagKeyPrefix string
Driver string
DriverParameters json.RawMessage
return nil
}
-// GetNodeProfile returns a NodeProfile for the given hostname. An
-// error is returned if the appropriate configuration can't be
-// determined (e.g., this does not appear to be a system node). If
-// node is empty, use the OS-reported hostname.
-func (cc *Cluster) GetNodeProfile(node string) (*NodeProfile, error) {
- if node == "" {
- hostname, err := os.Hostname()
- if err != nil {
- return nil, err
- }
- node = hostname
- }
- if cfg, ok := cc.NodeProfiles[node]; ok {
- return &cfg, nil
- }
- // If node is not listed, but "*" gives a default system node
- // config, use the default config.
- if cfg, ok := cc.NodeProfiles["*"]; ok {
- return &cfg, nil
- }
- return nil, fmt.Errorf("config does not provision host %q as a system node", node)
-}
-
-type NodeProfile struct {
- Controller SystemServiceInstance `json:"arvados-controller"`
- Health SystemServiceInstance `json:"arvados-health"`
- Keepbalance SystemServiceInstance `json:"keep-balance"`
- Keepproxy SystemServiceInstance `json:"keepproxy"`
- Keepstore SystemServiceInstance `json:"keepstore"`
- Keepweb SystemServiceInstance `json:"keep-web"`
- Nodemanager SystemServiceInstance `json:"arvados-node-manager"`
- DispatchCloud SystemServiceInstance `json:"arvados-dispatch-cloud"`
- RailsAPI SystemServiceInstance `json:"arvados-api-server"`
- Websocket SystemServiceInstance `json:"arvados-ws"`
- Workbench SystemServiceInstance `json:"arvados-workbench"`
-}
-
type ServiceName string
const (
ServiceNameRailsAPI ServiceName = "arvados-api-server"
ServiceNameController ServiceName = "arvados-controller"
ServiceNameDispatchCloud ServiceName = "arvados-dispatch-cloud"
+ ServiceNameHealth ServiceName = "arvados-health"
ServiceNameNodemanager ServiceName = "arvados-node-manager"
- ServiceNameWorkbench ServiceName = "arvados-workbench"
+ ServiceNameWorkbench1 ServiceName = "arvados-workbench1"
+ ServiceNameWorkbench2 ServiceName = "arvados-workbench2"
ServiceNameWebsocket ServiceName = "arvados-ws"
ServiceNameKeepbalance ServiceName = "keep-balance"
ServiceNameKeepweb ServiceName = "keep-web"
ServiceNameKeepstore ServiceName = "keepstore"
)
-// ServicePorts returns the configured listening address (or "" if
-// disabled) for each service on the node.
-func (np *NodeProfile) ServicePorts() map[ServiceName]string {
- return map[ServiceName]string{
- ServiceNameRailsAPI: np.RailsAPI.Listen,
- ServiceNameController: np.Controller.Listen,
- ServiceNameDispatchCloud: np.DispatchCloud.Listen,
- ServiceNameNodemanager: np.Nodemanager.Listen,
- ServiceNameWorkbench: np.Workbench.Listen,
- ServiceNameWebsocket: np.Websocket.Listen,
- ServiceNameKeepbalance: np.Keepbalance.Listen,
- ServiceNameKeepweb: np.Keepweb.Listen,
- ServiceNameKeepproxy: np.Keepproxy.Listen,
- ServiceNameKeepstore: np.Keepstore.Listen,
+// Map returns all services as a map, suitable for iterating over all
+// services or looking up a service by name.
+func (svcs Services) Map() map[ServiceName]Service {
+ return map[ServiceName]Service{
+ ServiceNameRailsAPI: svcs.RailsAPI,
+ ServiceNameController: svcs.Controller,
+ ServiceNameDispatchCloud: svcs.DispatchCloud,
+ ServiceNameHealth: svcs.Health,
+ ServiceNameNodemanager: svcs.Nodemanager,
+ ServiceNameWorkbench1: svcs.Workbench1,
+ ServiceNameWorkbench2: svcs.Workbench2,
+ ServiceNameWebsocket: svcs.Websocket,
+ ServiceNameKeepbalance: svcs.Keepbalance,
+ ServiceNameKeepweb: svcs.WebDAV,
+ ServiceNameKeepproxy: svcs.Keepproxy,
+ ServiceNameKeepstore: svcs.Keepstore,
}
}
-type SystemServiceInstance struct {
- Listen string
- TLS bool
- Insecure bool
-}
-
type TLS struct {
Certificate string
Key string
import (
"encoding/json"
"fmt"
+ "strings"
"time"
)
}
// MarshalJSON implements json.Marshaler.
-func (d *Duration) MarshalJSON() ([]byte, error) {
+func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.String())
}
-// String implements fmt.Stringer.
+// String returns a format similar to (time.Duration).String() but with
+// "0m" and "0s" removed: e.g., "1h" instead of "1h0m0s".
func (d Duration) String() string {
- return time.Duration(d).String()
+ s := time.Duration(d).String()
+ s = strings.Replace(s, "m0s", "m", 1)
+ s = strings.Replace(s, "h0m", "h", 1)
+ return s
}
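+
+// Illustrative values, given the replacements above:
+//
+//	Duration(2 * time.Hour).String()     // "2h" (time.Duration renders "2h0m0s")
+//	Duration(150 * time.Second).String() // "2m30s" (no zero component to trim)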
// Duration returns a time.Duration.
--- /dev/null
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+package arvados
+
+import (
+ "encoding/json"
+ "time"
+
+ check "gopkg.in/check.v1"
+)
+
+var _ = check.Suite(&DurationSuite{})
+
+type DurationSuite struct{}
+
+func (s *DurationSuite) TestMarshalJSON(c *check.C) {
+ var d struct {
+ D Duration
+ }
+ err := json.Unmarshal([]byte(`{"D":"1.234s"}`), &d)
+ c.Check(err, check.IsNil)
+ c.Check(d.D, check.Equals, Duration(time.Second+234*time.Millisecond))
+ buf, err := json.Marshal(d)
+	c.Check(err, check.IsNil)
+	c.Check(string(buf), check.Equals, `{"D":"1.234s"}`)
+
+ for _, trial := range []struct {
+ seconds int
+ out string
+ }{
+ {30, "30s"},
+ {60, "1m"},
+ {120, "2m"},
+ {150, "2m30s"},
+ {3600, "1h"},
+ {7201, "2h1s"},
+ {360600, "100h10m"},
+ {360610, "100h10m10s"},
+ } {
+ buf, err := json.Marshal(Duration(time.Duration(trial.seconds) * time.Second))
+ c.Check(err, check.IsNil)
+ c.Check(string(buf), check.Equals, `"`+trial.out+`"`)
+ }
+}
"testing"
"time"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
func (s *CollectionFSSuite) SetUpTest(c *check.C) {
s.client = NewClientFromEnv()
- err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+arvadostest.FooAndBarFilesInDirUUID, nil, nil)
+ err := s.client.RequestAndDecode(&s.coll, "GET", "arvados/v1/collections/"+fixtureFooAndBarFilesInDirUUID, nil, nil)
c.Assert(err, check.IsNil)
s.kc = &keepClientStub{
blocks: map[string][]byte{
"path/filepath"
"strings"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
func (s *SiteFSSuite) TestSlashInName(c *check.C) {
badCollection := Collection{
Name: "bad/collection",
- OwnerUUID: arvadostest.AProjectUUID,
+ OwnerUUID: fixtureAProjectUUID,
}
err := s.client.RequestAndDecode(&badCollection, "POST", "arvados/v1/collections", s.client.UpdateBody(&badCollection), nil)
c.Assert(err, check.IsNil)
badProject := Group{
Name: "bad/project",
GroupClass: "project",
- OwnerUUID: arvadostest.AProjectUUID,
+ OwnerUUID: fixtureAProjectUUID,
}
err = s.client.RequestAndDecode(&badProject, "POST", "arvados/v1/groups", s.client.UpdateBody(&badProject), nil)
c.Assert(err, check.IsNil)
oob := Collection{
Name: "oob",
- OwnerUUID: arvadostest.AProjectUUID,
+ OwnerUUID: fixtureAProjectUUID,
}
err = s.client.RequestAndDecode(&oob, "POST", "arvados/v1/collections", s.client.UpdateBody(&oob), nil)
c.Assert(err, check.IsNil)
"net/http"
"os"
- "git.curoverse.com/arvados.git/sdk/go/arvadostest"
check "gopkg.in/check.v1"
)
+const (
+ // Importing arvadostest would be an import cycle, so these
+ // fixtures are duplicated here [until fs moves to a separate
+ // package].
+ fixtureActiveToken = "3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"
+ fixtureAProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
+ fixtureFooAndBarFilesInDirUUID = "zzzzz-4zz18-foonbarfilesdir"
+ fixtureFooCollectionName = "zzzzz-4zz18-fy296fx3hot09f7 added sometime"
+ fixtureFooCollectionPDH = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
+ fixtureFooCollection = "zzzzz-4zz18-fy296fx3hot09f7"
+ fixtureNonexistentCollection = "zzzzz-4zz18-totallynotexist"
+)
+
var _ = check.Suite(&SiteFSSuite{})
type SiteFSSuite struct {
func (s *SiteFSSuite) SetUpTest(c *check.C) {
s.client = &Client{
APIHost: os.Getenv("ARVADOS_API_HOST"),
- AuthToken: arvadostest.ActiveToken,
+ AuthToken: fixtureActiveToken,
Insecure: true,
}
s.kc = &keepClientStub{
c.Check(err, check.IsNil)
c.Check(len(fis), check.Equals, 0)
- err = s.fs.Mkdir("/by_id/"+arvadostest.FooCollection, 0755)
+ err = s.fs.Mkdir("/by_id/"+fixtureFooCollection, 0755)
c.Check(err, check.Equals, os.ErrExist)
- f, err = s.fs.Open("/by_id/" + arvadostest.NonexistentCollection)
+ f, err = s.fs.Open("/by_id/" + fixtureNonexistentCollection)
c.Assert(err, check.Equals, os.ErrNotExist)
for _, path := range []string{
- arvadostest.FooCollection,
- arvadostest.FooPdh,
- arvadostest.AProjectUUID + "/" + arvadostest.FooCollectionName,
+ fixtureFooCollection,
+ fixtureFooCollectionPDH,
+ fixtureAProjectUUID + "/" + fixtureFooCollectionName,
} {
f, err = s.fs.Open("/by_id/" + path)
c.Assert(err, check.IsNil)
c.Check(names, check.DeepEquals, []string{"foo"})
}
- f, err = s.fs.Open("/by_id/" + arvadostest.AProjectUUID + "/A Subproject/baz_file")
+ f, err = s.fs.Open("/by_id/" + fixtureAProjectUUID + "/A Subproject/baz_file")
c.Assert(err, check.IsNil)
fis, err = f.Readdir(-1)
var names []string
}
c.Check(names, check.DeepEquals, []string{"baz"})
- _, err = s.fs.OpenFile("/by_id/"+arvadostest.NonexistentCollection, os.O_RDWR|os.O_CREATE, 0755)
+ _, err = s.fs.OpenFile("/by_id/"+fixtureNonexistentCollection, os.O_RDWR|os.O_CREATE, 0755)
c.Check(err, check.Equals, ErrInvalidOperation)
- err = s.fs.Rename("/by_id/"+arvadostest.FooCollection, "/by_id/beep")
+ err = s.fs.Rename("/by_id/"+fixtureFooCollection, "/by_id/beep")
c.Check(err, check.Equals, ErrInvalidArgument)
- err = s.fs.Rename("/by_id/"+arvadostest.FooCollection+"/foo", "/by_id/beep")
+ err = s.fs.Rename("/by_id/"+fixtureFooCollection+"/foo", "/by_id/beep")
c.Check(err, check.Equals, ErrInvalidArgument)
_, err = s.fs.Stat("/by_id/beep")
c.Check(err, check.Equals, os.ErrNotExist)
- err = s.fs.Rename("/by_id/"+arvadostest.FooCollection+"/foo", "/by_id/"+arvadostest.FooCollection+"/bar")
+ err = s.fs.Rename("/by_id/"+fixtureFooCollection+"/foo", "/by_id/"+fixtureFooCollection+"/bar")
c.Check(err, check.IsNil)
err = s.fs.Rename("/by_id", "/beep")
FooBarDirCollection = "zzzzz-4zz18-foonbarfilesdir"
WazVersion1Collection = "zzzzz-4zz18-25k12570yk1ver1"
UserAgreementPDH = "b519d9cb706a29fc7ea24dbea2f05851+93"
- FooPdh = "1f4b0bc7583c2a7f9102c395f4ffc5e3+45"
HelloWorldPdh = "55713e6a34081eb03609e7ad5fcad129+62"
AProjectUUID = "zzzzz-j7d0g-v955i6s2oi1cbso"
import (
"net/http"
+ "net/url"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
)
// StubResponse struct with response status and body
resp.Write([]byte(``))
}
}
+
+// SetServiceURL overrides the given service config/discovery with the
+// given internalURLs.
+//
+// ExternalURL is set to the last of the given internalURLs; this is
+// only meaningful in the common case where exactly one is given.
+//
+// SetServiceURL panics on errors.
+func SetServiceURL(service *arvados.Service, internalURLs ...string) {
+ service.InternalURLs = map[arvados.URL]arvados.ServiceInstance{}
+ for _, u := range internalURLs {
+ u, err := url.Parse(u)
+ if err != nil {
+ panic(err)
+ }
+ service.InternalURLs[arvados.URL(*u)] = arvados.ServiceInstance{}
+ service.ExternalURL = arvados.URL(*u)
+ }
+}
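+
+// A minimal usage sketch (illustrative; the address is an assumption):
+//
+//	var cluster arvados.Cluster
+//	SetServiceURL(&cluster.Services.Keepstore, "http://localhost:25107/")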
"encoding/json"
"errors"
"fmt"
- "net"
"net/http"
+ "net/url"
"sync"
"time"
httpClient *http.Client
timeout arvados.Duration
- Config *arvados.Config
+ Cluster *arvados.Cluster
// If non-nil, Log is called after handling each request.
Log func(*http.Request, error)
}
}
+func (agg *Aggregator) CheckHealth() error {
+ return nil
+}
+
func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
agg.setupOnce.Do(agg.setup)
sendErr := func(statusCode int, err error) {
resp.Header().Set("Content-Type", "application/json")
- cluster, err := agg.Config.GetCluster("")
- if err != nil {
- err = fmt.Errorf("arvados.GetCluster(): %s", err)
- sendErr(http.StatusInternalServerError, err)
- return
- }
- if !agg.checkAuth(req, cluster) {
+ if !agg.checkAuth(req) {
sendErr(http.StatusUnauthorized, errUnauthorized)
return
}
sendErr(http.StatusNotFound, errNotFound)
return
}
- json.NewEncoder(resp).Encode(agg.ClusterHealth(cluster))
+ json.NewEncoder(resp).Encode(agg.ClusterHealth())
if agg.Log != nil {
agg.Log(req, nil)
}
N int `json:"n"`
}
-func (agg *Aggregator) ClusterHealth(cluster *arvados.Cluster) ClusterHealthResponse {
+func (agg *Aggregator) ClusterHealth() ClusterHealthResponse {
resp := ClusterHealthResponse{
Health: "OK",
Checks: make(map[string]CheckResult),
mtx := sync.Mutex{}
wg := sync.WaitGroup{}
- for profileName, profile := range cluster.NodeProfiles {
- for svc, addr := range profile.ServicePorts() {
- // Ensure svc is listed in resp.Services.
- mtx.Lock()
- if _, ok := resp.Services[svc]; !ok {
- resp.Services[svc] = ServiceHealth{Health: "ERROR"}
- }
- mtx.Unlock()
-
- if addr == "" {
- // svc is not expected on this node.
- continue
- }
+ for svcName, svc := range agg.Cluster.Services.Map() {
+ // Ensure svc is listed in resp.Services.
+ mtx.Lock()
+ if _, ok := resp.Services[svcName]; !ok {
+ resp.Services[svcName] = ServiceHealth{Health: "ERROR"}
+ }
+ mtx.Unlock()
+ for addr := range svc.InternalURLs {
wg.Add(1)
- go func(profileName string, svc arvados.ServiceName, addr string) {
+ go func(svcName arvados.ServiceName, addr arvados.URL) {
defer wg.Done()
var result CheckResult
- url, err := agg.pingURL(profileName, addr)
+ pingURL, err := agg.pingURL(addr)
if err != nil {
result = CheckResult{
Health: "ERROR",
Error: err.Error(),
}
} else {
- result = agg.ping(url, cluster)
+ result = agg.ping(pingURL)
}
mtx.Lock()
defer mtx.Unlock()
- resp.Checks[fmt.Sprintf("%s+%s", svc, url)] = result
+ resp.Checks[fmt.Sprintf("%s+%s", svcName, pingURL)] = result
if result.Health == "OK" {
- h := resp.Services[svc]
+ h := resp.Services[svcName]
h.N++
h.Health = "OK"
- resp.Services[svc] = h
+ resp.Services[svcName] = h
} else {
resp.Health = "ERROR"
}
- }(profileName, svc, addr)
+ }(svcName, addr)
}
}
wg.Wait()
return resp
}
-func (agg *Aggregator) pingURL(node, addr string) (string, error) {
- _, port, err := net.SplitHostPort(addr)
- return "http://" + node + ":" + port + "/_health/ping", err
+func (agg *Aggregator) pingURL(svcURL arvados.URL) (*url.URL, error) {
+ base := url.URL(svcURL)
+ return base.Parse("/_health/ping")
}
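+
+// (Because "/_health/ping" is an absolute path, base.Parse keeps the service
+// URL's scheme and host and replaces its path: for example, an assumed
+// "http://localhost:25107/" resolves to "http://localhost:25107/_health/ping".)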
-func (agg *Aggregator) ping(url string, cluster *arvados.Cluster) (result CheckResult) {
+func (agg *Aggregator) ping(target *url.URL) (result CheckResult) {
t0 := time.Now()
var err error
}
}()
- req, err := http.NewRequest("GET", url, nil)
+ req, err := http.NewRequest("GET", target.String(), nil)
if err != nil {
return
}
- req.Header.Set("Authorization", "Bearer "+cluster.ManagementToken)
+ req.Header.Set("Authorization", "Bearer "+agg.Cluster.ManagementToken)
ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
defer cancel()
return
}
-func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
+func (agg *Aggregator) checkAuth(req *http.Request) bool {
creds := auth.CredentialsFromRequest(req)
for _, token := range creds.Tokens {
- if token != "" && token == cluster.ManagementToken {
+ if token != "" && token == agg.Cluster.ManagementToken {
return true
}
}
}
func (s *AggregatorSuite) SetUpTest(c *check.C) {
- s.handler = &Aggregator{Config: &arvados.Config{
- Clusters: map[string]arvados.Cluster{
- "zzzzz": {
- ManagementToken: arvadostest.ManagementToken,
- NodeProfiles: map[string]arvados.NodeProfile{},
- },
- },
+ s.handler = &Aggregator{Cluster: &arvados.Cluster{
+ ManagementToken: arvadostest.ManagementToken,
}}
s.req = httptest.NewRequest("GET", "/_health/all", nil)
s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
}
-func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
+func (s *AggregatorSuite) TestNoServicesConfigured(c *check.C) {
s.handler.ServeHTTP(s.resp, s.req)
- s.checkOK(c)
+ s.checkUnhealthy(c)
}
func (s *AggregatorSuite) stubServer(handler http.Handler) (*httptest.Server, string) {
return srv, ":" + port
}
-type unhealthyHandler struct{}
-
-func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if req.URL.Path == "/_health/ping" {
- resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
- } else {
- http.Error(resp, "not found", http.StatusNotFound)
- }
-}
-
func (s *AggregatorSuite) TestUnhealthy(c *check.C) {
srv, listen := s.stubServer(&unhealthyHandler{})
defer srv.Close()
- s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
- Keepstore: arvados.SystemServiceInstance{Listen: listen},
- }
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
s.handler.ServeHTTP(s.resp, s.req)
s.checkUnhealthy(c)
}
-type healthyHandler struct{}
-
-func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
- if req.URL.Path == "/_health/ping" {
- resp.Write([]byte(`{"health":"OK"}`))
- } else {
- http.Error(resp, "not found", http.StatusNotFound)
- }
-}
-
func (s *AggregatorSuite) TestHealthy(c *check.C) {
srv, listen := s.stubServer(&healthyHandler{})
defer srv.Close()
- s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: listen},
- DispatchCloud: arvados.SystemServiceInstance{Listen: listen},
- Keepbalance: arvados.SystemServiceInstance{Listen: listen},
- Keepproxy: arvados.SystemServiceInstance{Listen: listen},
- Keepstore: arvados.SystemServiceInstance{Listen: listen},
- Keepweb: arvados.SystemServiceInstance{Listen: listen},
- Nodemanager: arvados.SystemServiceInstance{Listen: listen},
- RailsAPI: arvados.SystemServiceInstance{Listen: listen},
- Websocket: arvados.SystemServiceInstance{Listen: listen},
- Workbench: arvados.SystemServiceInstance{Listen: listen},
- }
+ s.setAllServiceURLs(listen)
s.handler.ServeHTTP(s.resp, s.req)
resp := s.checkOK(c)
svc := "keepstore+http://localhost" + listen + "/_health/ping"
defer srvH.Close()
srvU, listenU := s.stubServer(&unhealthyHandler{})
defer srvU.Close()
- s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: listenH},
- DispatchCloud: arvados.SystemServiceInstance{Listen: listenH},
- Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
- Keepproxy: arvados.SystemServiceInstance{Listen: listenH},
- Keepstore: arvados.SystemServiceInstance{Listen: listenH},
- Keepweb: arvados.SystemServiceInstance{Listen: listenH},
- Nodemanager: arvados.SystemServiceInstance{Listen: listenH},
- RailsAPI: arvados.SystemServiceInstance{Listen: listenH},
- Websocket: arvados.SystemServiceInstance{Listen: listenH},
- Workbench: arvados.SystemServiceInstance{Listen: listenH},
- }
- s.handler.Config.Clusters["zzzzz"].NodeProfiles["127.0.0.1"] = arvados.NodeProfile{
- Keepstore: arvados.SystemServiceInstance{Listen: listenU},
- }
+ s.setAllServiceURLs(listenH)
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listenH+"/", "http://127.0.0.1"+listenU+"/")
s.handler.ServeHTTP(s.resp, s.req)
resp := s.checkUnhealthy(c)
ep := resp.Checks["keepstore+http://localhost"+listenH+"/_health/ping"]
c.Logf("%#v", ep)
}
+func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
+ s.handler.timeout = arvados.Duration(100 * time.Millisecond)
+ srv, listen := s.stubServer(&slowHandler{})
+ defer srv.Close()
+ arvadostest.SetServiceURL(&s.handler.Cluster.Services.Keepstore, "http://localhost"+listen+"/")
+ s.handler.ServeHTTP(s.resp, s.req)
+ resp := s.checkUnhealthy(c)
+ ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
+ c.Check(ep.Health, check.Equals, "ERROR")
+ c.Check(ep.HTTPStatusCode, check.Equals, 0)
+ rt, err := ep.ResponseTime.Float64()
+ c.Check(err, check.IsNil)
+ c.Check(rt > 0.005, check.Equals, true)
+}
+
func (s *AggregatorSuite) checkError(c *check.C) {
c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
var resp ClusterHealthResponse
- err := json.NewDecoder(s.resp.Body).Decode(&resp)
+ err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
c.Check(err, check.IsNil)
c.Check(resp.Health, check.Not(check.Equals), "OK")
}
func (s *AggregatorSuite) checkResult(c *check.C, health string) ClusterHealthResponse {
c.Check(s.resp.Code, check.Equals, http.StatusOK)
var resp ClusterHealthResponse
- err := json.NewDecoder(s.resp.Body).Decode(&resp)
+ c.Log(s.resp.Body.String())
+ err := json.Unmarshal(s.resp.Body.Bytes(), &resp)
c.Check(err, check.IsNil)
c.Check(resp.Health, check.Equals, health)
return resp
}
-type slowHandler struct{}
+func (s *AggregatorSuite) setAllServiceURLs(listen string) {
+ svcs := &s.handler.Cluster.Services
+ for _, svc := range []*arvados.Service{
+ &svcs.Controller,
+ &svcs.DispatchCloud,
+ &svcs.Keepbalance,
+ &svcs.Keepproxy,
+ &svcs.Keepstore,
+ &svcs.Health,
+ &svcs.Nodemanager,
+ &svcs.RailsAPI,
+ &svcs.WebDAV,
+ &svcs.Websocket,
+ &svcs.Workbench1,
+ &svcs.Workbench2,
+ } {
+ arvadostest.SetServiceURL(svc, "http://localhost"+listen+"/")
+ }
+}
-func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+type unhealthyHandler struct{}
+
+func (*unhealthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" {
+ resp.Write([]byte(`{"health":"ERROR","error":"the bends"}`))
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
+ }
+}
+
+type healthyHandler struct{}
+
+func (*healthyHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/_health/ping" {
- time.Sleep(3 * time.Second)
resp.Write([]byte(`{"health":"OK"}`))
} else {
http.Error(resp, "not found", http.StatusNotFound)
}
}
-func (s *AggregatorSuite) TestPingTimeout(c *check.C) {
- s.handler.timeout = arvados.Duration(100 * time.Millisecond)
- srv, listen := s.stubServer(&slowHandler{})
- defer srv.Close()
- s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
- Keepstore: arvados.SystemServiceInstance{Listen: listen},
+type slowHandler struct{}
+
+func (*slowHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ if req.URL.Path == "/_health/ping" {
+ time.Sleep(3 * time.Second)
+ resp.Write([]byte(`{"health":"OK"}`))
+ } else {
+ http.Error(resp, "not found", http.StatusNotFound)
}
- s.handler.ServeHTTP(s.resp, s.req)
- resp := s.checkUnhealthy(c)
- ep := resp.Checks["keepstore+http://localhost"+listen+"/_health/ping"]
- c.Check(ep.Health, check.Equals, "ERROR")
- c.Check(ep.HTTPStatusCode, check.Equals, 0)
- rt, err := ep.ResponseTime.Float64()
- c.Check(err, check.IsNil)
- c.Check(rt > 0.005, check.Equals, true)
}
try:
username = pamh.get_user(None)
- except pamh.exception, e:
+ except pamh.exception as e:
return e.pam_result
if not username:
Clusters:
zzzzz:
ManagementToken: e687950a23c3a9bceec28c6223a06c79
- HTTPRequestTimeout: 30s
+ API:
+ RequestTimeout: 30s
PostgreSQL:
ConnectionPool: 32
Connection:
- host: {}
- dbname: {}
- user: {}
- password: {}
- NodeProfiles:
- "*":
- "arvados-controller":
- Listen: ":{}"
- "arvados-api-server":
- Listen: ":{}"
- TLS: true
- Insecure: true
+ host: {dbhost}
+ dbname: {dbname}
+ user: {dbuser}
+ password: {dbpass}
+ TLS:
+ Insecure: true
+ Services:
+ Controller:
+ InternalURLs:
+ "http://localhost:{controllerport}": {{}}
+ RailsAPI:
+ InternalURLs:
+ "https://localhost:{railsport}": {{}}
""".format(
- _dbconfig('host'),
- _dbconfig('database'),
- _dbconfig('username'),
- _dbconfig('password'),
- port,
- rails_api_port,
+ dbhost=_dbconfig('host'),
+ dbname=_dbconfig('database'),
+ dbuser=_dbconfig('username'),
+ dbpass=_dbconfig('password'),
+ controllerport=port,
+ railsport=rails_api_port,
))
logf = open(_logfilename('controller'), 'a')
controller = subprocess.Popen(
before_action :require_auth_scope, except: ERROR_ACTIONS
before_action :catch_redirect_hint
+ before_action :load_required_parameters
before_action(:find_object_by_uuid,
except: [:index, :create] + ERROR_ACTIONS)
- before_action :load_required_parameters
before_action :load_limit_offset_order_params, only: [:index, :contents]
before_action :load_where_param, only: [:index, :contents]
before_action :load_filters_param, only: [:index, :contents]
protected
+ def bool_param(pname)
+ if params.include?(pname)
+ if params[pname].is_a?(Boolean)
+ return params[pname]
+ else
+ logger.warn "Warning: received non-boolean parameter '#{pname}' on #{self.class.inspect}."
+ end
+ end
+ false
+ end
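+
+  # bool_param illustration: a JSON request body of {"include_trash": true}
+  # yields bool_param(:include_trash) == true, while a value that reaches
+  # here un-coerced (e.g. the string "true") logs a warning and returns
+  # false. The load_required_parameters filter, moved earlier above, is
+  # expected to coerce declared boolean parameters before this point.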
+
def send_error(*args)
if args.last.is_a? Hash
err = args.pop
def find_objects_for_index
@objects ||= model_class.readable_by(*@read_users, {
- :include_trash => (params[:include_trash] || 'untrash' == action_name),
- :include_old_versions => params[:include_old_versions]
+ :include_trash => (bool_param(:include_trash) || 'untrash' == action_name),
+ :include_old_versions => bool_param(:include_old_versions)
})
apply_where_limit_order_params
end
})
end
+ def self._show_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Show collection even if its is_trashed attribute is true."
+ },
+ include_old_versions: {
+ type: 'boolean', required: false, description: "Include past collection versions."
+ },
+ })
+ end
+
def create
if resource_attrs[:uuid] and (loc = Keep::Locator.parse(resource_attrs[:uuid]))
resource_attrs[:portable_data_hash] = loc.to_s
accept_attribute_as_json :filters, Array
accept_attribute_as_json :scheduling_parameters, Hash
accept_attribute_as_json :secret_mounts, Hash
+
+ def self._index_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Include container requests whose owner project is trashed."
+ },
+ })
+ end
+
+ def self._show_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Show container request even if its owner project is trashed."
+ },
+ })
+ end
end
})
end
+ def self._show_requires_parameters
+ (super rescue {}).
+ merge({
+ include_trash: {
+ type: 'boolean', required: false, description: "Show group/project even if its is_trashed attribute is true."
+ },
+ })
+ end
+
def self._contents_requires_parameters
params = _index_requires_parameters.
merge({
after_create :log_create
after_update :log_update
after_destroy :log_destroy
- after_find :convert_serialized_symbols_to_strings
before_validation :normalize_collection_uuids
before_validation :set_default_owner
validate :ensure_valid_uuids
false
end
- def self.has_symbols? x
- if x.is_a? Hash
- x.each do |k,v|
- return true if has_symbols?(k) or has_symbols?(v)
- end
- elsif x.is_a? Array
- x.each do |k|
- return true if has_symbols?(k)
- end
- elsif x.is_a? Symbol
- return true
- elsif x.is_a? String
- return true if x.start_with?(':') && !x.start_with?('::')
- end
- false
- end
-
- def self.recursive_stringify x
- if x.is_a? Hash
- Hash[x.collect do |k,v|
- [recursive_stringify(k), recursive_stringify(v)]
- end]
- elsif x.is_a? Array
- x.collect do |k|
- recursive_stringify k
- end
- elsif x.is_a? Symbol
- x.to_s
- elsif x.is_a? String and x.start_with?(':') and !x.start_with?('::')
- x[1..-1]
- else
- x
- end
- end
-
def self.where_serialized(colname, value, md5: false)
colsql = colname.to_s
if md5
self.class.serialized_attributes
end
- def convert_serialized_symbols_to_strings
- # ensure_serialized_attribute_type should prevent symbols from
- # getting into the database in the first place. If someone managed
- # to get them into the database (perhaps using an older version)
- # we'll convert symbols to strings when loading from the
- # database. (Otherwise, loading and saving an object with existing
- # symbols in a serialized field will crash.)
- jsonb_cols = self.class.columns.select{|c| c.type == :jsonb}.collect{|j| j.name}
- (jsonb_cols + self.class.serialized_attributes.keys).uniq.each do |colname|
- if self.class.has_symbols? attributes[colname]
- attributes[colname] = self.class.recursive_stringify attributes[colname]
- send(colname + '=',
- self.class.recursive_stringify(attributes[colname]))
- end
- end
- end
-
def foreign_key_attributes
attributes.keys.select { |a| a.match(/_uuid$/) }
end
if self.prefs_changed?
if self.prefs_was.andand.empty? || !self.prefs_was.andand['profile']
profile_notification_address = Rails.configuration.Users.UserProfileNotificationAddress
- ProfileNotifier.profile_created(self, profile_notification_address).deliver_now if profile_notification_address
+ ProfileNotifier.profile_created(self, profile_notification_address).deliver_now if profile_notification_address and !profile_notification_address.empty?
end
end
end
raise "Missing #{::Rails.root.to_s}/config/config.default.yml"
end
+def remove_sample_entries(h)
+ return unless h.is_a? Hash
+ h.delete("SAMPLE")
+ h.each { |k, v| remove_sample_entries(v) }
+end
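+# Illustrative effect: {"InstanceTypes" => {"SAMPLE" => {...}, "m1" => {...}}}
+# becomes {"InstanceTypes" => {"m1" => {...}}}; "SAMPLE" placeholder entries
+# are removed at every nesting level ("m1" is a made-up key).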
+remove_sample_entries($arvados_config_defaults)
+
clusterID, clusterConfig = $arvados_config_defaults["Clusters"].first
$arvados_config_defaults = clusterConfig
$arvados_config_defaults["ClusterID"] = clusterID
#
# SPDX-License-Identifier: AGPL-3.0
-require "arvados/keep"
-require "group_pdhs"
-
class AddFileInfoToCollection < ActiveRecord::Migration[4.2]
- def do_batch(pdhs)
- pdhs_str = ''
- pdhs.each do |pdh|
- pdhs_str << "'" << pdh << "'" << ","
- end
-
- collections = ActiveRecord::Base.connection.exec_query(
- "SELECT DISTINCT portable_data_hash, manifest_text FROM collections "\
- "WHERE portable_data_hash IN (#{pdhs_str[0..-2]}) "
- )
-
- collections.rows.each do |row|
- manifest = Keep::Manifest.new(row[1])
- ActiveRecord::Base.connection.exec_query("BEGIN")
- ActiveRecord::Base.connection.exec_query("UPDATE collections SET file_count=#{manifest.files_count}, "\
- "file_size_total=#{manifest.files_size} "\
- "WHERE portable_data_hash='#{row[0]}'")
- ActiveRecord::Base.connection.exec_query("COMMIT")
- end
- end
-
def up
add_column :collections, :file_count, :integer, default: 0, null: false
add_column :collections, :file_size_total, :integer, limit: 8, default: 0, null: false
- distinct_pdh_count = ActiveRecord::Base.connection.exec_query(
- "SELECT DISTINCT portable_data_hash FROM collections"
- ).rows.count
-
- # Generator that queries for all the distinct pdhs greater than last_pdh
- ordered_pdh_query = lambda { |last_pdh, &block|
- pdhs = ActiveRecord::Base.connection.exec_query(
- "SELECT DISTINCT portable_data_hash FROM collections "\
- "WHERE portable_data_hash > '#{last_pdh}' "\
- "ORDER BY portable_data_hash LIMIT 1000"
- )
- pdhs.rows.each do |row|
- block.call(row[0])
- end
- }
-
- batch_size_max = 1 << 28 # 256 MiB
- GroupPdhs.group_pdhs_for_multiple_transactions(ordered_pdh_query,
- distinct_pdh_count,
- batch_size_max,
- "AddFileInfoToCollection") do |pdhs|
- do_batch(pdhs)
- end
+ puts "Collections now have two new columns, file_count and file_size_total."
+ puts "They were initialized with a zero value. If you are upgrading an Arvados"
+ puts "installation, please run the populate-file-info-columns-in-collections.rb"
+ puts "script to populate the columns. If this is a new installation, that is not"
+ puts "necessary."
end
def down
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'current_api_client'
+
+include CurrentApiClient
+
+def has_symbols? x
+ if x.is_a? Hash
+ x.each do |k,v|
+ return true if has_symbols?(k) or has_symbols?(v)
+ end
+ elsif x.is_a? Array
+ x.each do |k|
+ return true if has_symbols?(k)
+ end
+ elsif x.is_a? Symbol
+ return true
+ elsif x.is_a? String
+ return true if x.start_with?(':') && !x.start_with?('::')
+ end
+ false
+end
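+# Illustrative cases: has_symbols?({foo: 1}) and has_symbols?({":bar" => 1})
+# both return true; has_symbols?({"::baz" => 1}) returns false, because a
+# "::" prefix marks an intentional leading colon rather than a Symbol.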
+
+def check_for_serialized_symbols rec
+ jsonb_cols = rec.class.columns.select{|c| c.type == :jsonb}.collect{|j| j.name}
+ (jsonb_cols + rec.class.serialized_attributes.keys).uniq.each do |colname|
+ if has_symbols? rec.attributes[colname]
+ st = recursive_stringify rec.attributes[colname]
+ puts "Found value potentially containing Ruby symbols in #{colname} attribute of #{rec.uuid}, current value is\n#{rec.attributes[colname].to_s[0..1024]}\nrake symbols:stringify will update it to:\n#{st.to_s[0..1024]}\n\n"
+ end
+ end
+end
+
+def recursive_stringify x
+ if x.is_a? Hash
+ Hash[x.collect do |k,v|
+ [recursive_stringify(k), recursive_stringify(v)]
+ end]
+ elsif x.is_a? Array
+ x.collect do |k|
+ recursive_stringify k
+ end
+ elsif x.is_a? Symbol
+ x.to_s
+ elsif x.is_a? String and x.start_with?(':') and !x.start_with?('::')
+ x[1..-1]
+ else
+ x
+ end
+end
+
+def stringify_serialized_symbols rec
+  # ensure_serialized_attribute_type should prevent symbols from
+  # getting into the database in the first place. If someone managed
+  # to get them into the database anyway (perhaps using an older
+  # version), this task converts the symbols to strings and saves the
+  # record back. (Otherwise, loading and saving an object with
+  # existing symbols in a serialized field will crash.)
+ jsonb_cols = rec.class.columns.select{|c| c.type == :jsonb}.collect{|j| j.name}
+ (jsonb_cols + rec.class.serialized_attributes.keys).uniq.each do |colname|
+ if has_symbols? rec.attributes[colname]
+ begin
+ st = recursive_stringify rec.attributes[colname]
+ puts "Updating #{colname} attribute of #{rec.uuid} from\n#{rec.attributes[colname].to_s[0..1024]}\nto\n#{st.to_s[0..1024]}\n\n"
+ rec.write_attribute(colname, st)
+ rec.save!
+ rescue => e
+ puts "Failed to update #{rec.uuid}: #{e}"
+ end
+ end
+ end
+end
+
+namespace :symbols do
+ desc 'Warn about serialized values starting with ":" that may be symbols'
+ task check: :environment do
+ [ApiClientAuthorization, ApiClient,
+ AuthorizedKey, Collection,
+ Container, ContainerRequest, Group,
+ Human, Job, JobTask, KeepDisk, KeepService, Link,
+ Node, PipelineInstance, PipelineTemplate,
+ Repository, Specimen, Trait, User, VirtualMachine,
+ Workflow].each do |klass|
+ act_as_system_user do
+ klass.all.each do |c|
+ check_for_serialized_symbols c
+ end
+ end
+ end
+ end
+
+ task stringify: :environment do
+ [ApiClientAuthorization, ApiClient,
+ AuthorizedKey, Collection,
+ Container, ContainerRequest, Group,
+ Human, Job, JobTask, KeepDisk, KeepService, Link,
+ Node, PipelineInstance, PipelineTemplate,
+ Repository, Specimen, Trait, User, VirtualMachine,
+ Workflow].each do |klass|
+ act_as_system_user do
+ klass.all.each do |c|
+ stringify_serialized_symbols c
+ end
+ end
+ end
+ end
+end
--- /dev/null
+#!/usr/bin/env ruby
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+# Arvados version 1.4.0 introduces two new columns on the collections table named
+# file_count
+# file_size_total
+#
+# The database migration that adds these columns does not populate them with
+# data; it initializes them to zero.
+#
+# This script populates the columns for collections whose file_count is zero.
+# It skips collections that have invalid manifests, but prints details for
+# those collections.
+#
+# Run the script as
+#
+# cd scripts
+# RAILS_ENV=production bundle exec populate-file-info-columns-in-collections.rb
+#
+
+ENV["RAILS_ENV"] = ARGV[0] || ENV["RAILS_ENV"] || "development"
+require File.dirname(__FILE__) + '/../config/boot'
+require File.dirname(__FILE__) + '/../config/environment'
+
+require "arvados/keep"
+require "group_pdhs"
+
+ def do_batch(pdhs)
+ pdhs_str = ''
+ pdhs.each do |pdh|
+ pdhs_str << "'" << pdh << "'" << ","
+ end
+
+ collections = ActiveRecord::Base.connection.exec_query(
+ "SELECT DISTINCT portable_data_hash, manifest_text FROM collections "\
+ "WHERE portable_data_hash IN (#{pdhs_str[0..-2]}) "
+ )
+ collections.rows.each do |row|
+ begin
+ manifest = Keep::Manifest.new(row[1])
+ ActiveRecord::Base.connection.exec_query("BEGIN")
+ ActiveRecord::Base.connection.exec_query("UPDATE collections SET file_count=#{manifest.files_count}, "\
+ "file_size_total=#{manifest.files_size} "\
+ "WHERE portable_data_hash='#{row[0]}'")
+ ActiveRecord::Base.connection.exec_query("COMMIT")
+ rescue ArgumentError => detail
+ require 'pp'
+ puts
+ puts "*************** Row detail ***************"
+ puts
+ pp row
+ puts
+ puts "************ Collection detail ***********"
+ puts
+ pp Collection.find_by_portable_data_hash(row[0])
+ puts
+ puts "************** Error detail **************"
+ puts
+ pp detail
+ puts
+ puts "Skipping this collection, continuing!"
+ next
+ end
+ end
+ end
+
+
+def main
+
+ distinct_pdh_count = ActiveRecord::Base.connection.exec_query(
+    "SELECT DISTINCT portable_data_hash FROM collections WHERE file_count=0"
+ ).rows.count
+
+ # Generator that queries for all the distinct pdhs greater than last_pdh
+ ordered_pdh_query = lambda { |last_pdh, &block|
+ pdhs = ActiveRecord::Base.connection.exec_query(
+ "SELECT DISTINCT portable_data_hash FROM collections "\
+      "WHERE file_count=0 AND portable_data_hash > '#{last_pdh}' "\
+ "ORDER BY portable_data_hash LIMIT 1000"
+ )
+ pdhs.rows.each do |row|
+ block.call(row[0])
+ end
+ }
+
+ batch_size_max = 1 << 28 # 256 MiB
+ GroupPdhs.group_pdhs_for_multiple_transactions(ordered_pdh_query,
+ distinct_pdh_count,
+ batch_size_max,
+ "AddFileInfoToCollection") do |pdhs|
+ do_batch(pdhs)
+ end
+end
+
+main
head_uuid: zzzzz-4zz18-pyw8yp9g3pr7irn
properties: {}
+#
+# This fixture was used in the test "Stringify symbols coming from
+# serialized attribute in database" which tested the hook
+# "convert_serialized_symbols_to_strings". That hook (and the
+# corresponding test) was removed in #15311. This fixture remains to
+# facilitate manual testing of the "rake symbols:check" and "rake
+# symbols:stringify" tasks that we added to assist with database
+# fixup.
+#
has_symbol_keys_in_database_somehow:
uuid: zzzzz-o0j2j-enl1wg58310loc6
owner_uuid: zzzzz-tpzed-000000000000000
end
end
- test 'get trashed collection with include_trash' do
- uuid = 'zzzzz-4zz18-mto52zx1s7sn3ih' # expired_collection
- authorize_with :active
- get :show, params: {
- id: uuid,
- include_trash: true,
- }
- assert_response 200
+ [true, false].each do |include_trash|
+ test "get trashed collection with include_trash=#{include_trash}" do
+ uuid = 'zzzzz-4zz18-mto52zx1s7sn3ih' # expired_collection
+ authorize_with :active
+ get :show, params: {
+ id: uuid,
+ include_trash: include_trash,
+ }
+ if include_trash
+ assert_response 200
+ else
+ assert_response 404
+ end
+ end
end
[:admin, :active].each do |user|
end
end
+ test "show include_trash=false #{project} #{untrash} as #{auth}" do
+ authorize_with auth
+ untrash.each do |pr|
+ Group.find_by_uuid(groups(pr).uuid).update! is_trashed: false
+ end
+ get :show, params: {
+ id: groups(project).uuid,
+ format: :json,
+ include_trash: false
+ }
+ if visible
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+
test "show include_trash #{project} #{untrash} as #{auth}" do
authorize_with auth
untrash.each do |pr|
end
end
+ [
+ ["false", false],
+ ["0", false],
+ ["true", true],
+ ["1", true]
+ ].each do |param, truthiness|
+    test "JSON-encoded include_trash=#{param.inspect} param should be interpreted as include_trash=#{truthiness}" do
+ expired_col = collections(:expired_collection)
+ assert expired_col.is_trashed
+ # Try #index first
+ post "/arvados/v1/collections",
+ params: {
+ :_method => 'GET',
+ :include_trash => param,
+ :filters => [['uuid', '=', expired_col.uuid]].to_json
+ },
+ headers: auth(:active)
+ assert_response :success
+ assert_not_nil json_response['items']
+ assert_equal truthiness, json_response['items'].collect {|c| c['uuid']}.include?(expired_col.uuid)
+ # Try #show next
+ post "/arvados/v1/collections/#{expired_col.uuid}",
+ params: {
+ :_method => 'GET',
+ :include_trash => param,
+ },
+ headers: auth(:active)
+ if truthiness
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+ end
+
+ [
+ ["false", false],
+ ["0", false],
+ ["true", true],
+ ["1", true]
+ ].each do |param, truthiness|
+    test "include_trash=#{param.inspect} param in the query string should be interpreted as include_trash=#{truthiness}" do
+ expired_col = collections(:expired_collection)
+ assert expired_col.is_trashed
+ # Try #index first
+ get("/arvados/v1/collections?include_trash=#{param}&filters=#{[['uuid','=',expired_col.uuid]].to_json}",
+ headers: auth(:active))
+ assert_response :success
+ assert_not_nil json_response['items']
+ assert_equal truthiness, json_response['items'].collect {|c| c['uuid']}.include?(expired_col.uuid)
+ # Try #show next
+ get("/arvados/v1/collections/#{expired_col.uuid}?include_trash=#{param}",
+ headers: auth(:active))
+ if truthiness
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+ end
+
+ [
+ ["false", false],
+ ["0", false],
+ ["true", true],
+ ["1", true]
+ ].each do |param, truthiness|
+ test "include_trash=#{param.inspect} form-encoded param should be interpreted as include_trash=#{truthiness}" do
+ expired_col = collections(:expired_collection)
+ assert expired_col.is_trashed
+ params = [
+ ['_method', 'GET'],
+ ['include_trash', param],
+ ['filters', [['uuid','=',expired_col.uuid]].to_json],
+ ]
+ # Try #index first
+ post "/arvados/v1/collections",
+ params: URI.encode_www_form(params),
+ headers: {
+ "Content-type" => "application/x-www-form-urlencoded"
+ }.update(auth(:active))
+ assert_response :success
+ assert_not_nil json_response['items']
+ assert_equal truthiness, json_response['items'].collect {|c| c['uuid']}.include?(expired_col.uuid)
+ # Try #show next
+ post "/arvados/v1/collections/#{expired_col.uuid}",
+ params: URI.encode_www_form([['_method', 'GET'],['include_trash', param]]),
+ headers: {
+ "Content-type" => "application/x-www-form-urlencoded"
+ }.update(auth(:active))
+ if truthiness
+ assert_response :success
+ else
+ assert_response 404
+ end
+ end
+ end
+
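All three encodings above pin down the same behavior: the server must coerce "true"/"1" to true and "false"/"0" to false regardless of whether include_trash arrived in a JSON body, the query string, or a form-encoded body. A minimal sketch of that coercion (an assumed helper, not the actual Arvados implementation):

# Map the string forms these tests exercise onto real booleans.
def coerce_boolean(param)
  case param.to_s
  when "true", "1" then true
  when "false", "0", "" then false
  else
    raise ArgumentError, "invalid boolean: #{param.inspect}"
  end
end

coerce_boolean("1")     # => true
coerce_boolean("false") # => false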
test "search collection using full text search" do
# create collection to be searched for
signed_manifest = Collection.sign_manifest(". 85877ca2d7e05498dd3d109baf2df106+95+A3a4e26a366ee7e4ed3e476ccf05354761be2e4ae@545a9920 0:95:file_in_subdir1\n./subdir2/subdir3 2bbc341c702df4d8f42ec31f16c10120+64+A315d7e7bad2ce937e711fc454fae2d1194d14d64@545a9920 0:32:file1_in_subdir3.txt 32:32:file2_in_subdir3.txt\n./subdir2/subdir3/subdir4 2bbc341c702df4d8f42ec31f16c10120+64+A315d7e7bad2ce937e711fc454fae2d1194d14d64@545a9920 0:32:file3_in_subdir4.txt 32:32:file4_in_subdir4.txt\n", api_token(:active))
--- /dev/null
+# Copyright (C) The Arvados Authors. All rights reserved.
+#
+# SPDX-License-Identifier: AGPL-3.0
+
+require 'test_helper'
+
+class ContainerRequestIntegrationTest < ActionDispatch::IntegrationTest
+
+ test "test colon in input" do
+    # Regression test for bug #15311: string values with a leading
+    # colon were corrupted when the ":" was stripped on load.
+ val = {"itemSeparator" => ":"}
+ post "/arvados/v1/container_requests",
+ params: {
+ :container_request => {
+ :name => "workflow",
+ :state => "Committed",
+ :command => ["echo"],
+ :container_image => "fa3c1a9cb6783f85f2ecda037e07b8c3+167",
+ :output_path => "/",
+ :priority => 1,
+ :runtime_constraints => {"vcpus" => 1, "ram" => 1},
+ :mounts => {
+ :foo => {
+ :kind => "json",
+ :content => JSON.parse(SafeJSON.dump(val)),
+ }
+ }
+ }
+ }.to_json,
+ headers: {
+ 'HTTP_AUTHORIZATION' => "OAuth2 #{api_client_authorizations(:active).api_token}",
+ 'CONTENT_TYPE' => 'application/json'
+ }
+ assert_response :success
+ assert_equal "arvados#containerRequest", json_response['kind']
+ assert_equal val, json_response['mounts']['foo']['content']
+ end
+end
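The payload {"itemSeparator" => ":"} is chosen because a bare ":" is exactly the value the removed load hook destroyed. A standalone illustration of the failure mode in plain Ruby (not Arvados code):

require 'yaml'

# Legacy rows stored Ruby symbols in serialized columns as strings with
# a leading ":", so the old load hook stripped one leading ":" from
# every string it saw. A genuine ":" value therefore came back empty.
stored = YAML.dump({"itemSeparator" => ":"})
loaded = YAML.safe_load(stored)

strip_colon = ->(v) { v.is_a?(String) ? v.sub(/\A:/, "") : v }
corrupted = loaded.transform_values(&strip_colon)

p loaded     # => {"itemSeparator"=>":"}
p corrupted  # => {"itemSeparator"=>""}   (the bug #15311 behavior)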
end
end
- test "Stringify symbols coming from serialized attribute in database" do
- set_user_from_auth :admin_trustedclient
- fixed = Link.find_by_uuid(links(:has_symbol_keys_in_database_somehow).uuid)
- assert_equal(["baz", "foo"], fixed.properties.keys.sort,
- "Hash symbol keys from DB did not get stringified.")
- assert_equal(['waz', 'waz', 'waz', 1, nil, false, true],
- fixed.properties['baz'],
- "Array symbol values from DB did not get stringified.")
- assert_equal true, fixed.save, "Failed to save fixed model back to db."
- end
-
test "No HashWithIndifferentAccess in database" do
set_user_from_auth :admin_trustedclient
link = Link.create!(link_class: 'test',
// simulate mounted read-only collection
s.cp.mounts["/mnt"] = arvados.Mount{
Kind: "collection",
- PortableDataHash: arvadostest.FooPdh,
+ PortableDataHash: arvadostest.FooCollectionPDH,
}
// simulate mounted writable collection
c.Assert(f.Close(), check.IsNil)
s.cp.mounts["/mnt-w"] = arvados.Mount{
Kind: "collection",
- PortableDataHash: arvadostest.FooPdh,
+ PortableDataHash: arvadostest.FooCollectionPDH,
Writable: true,
}
s.cp.binds = append(s.cp.binds, bindtmp+":/mnt-w")
func (s *copierSuite) TestWritableMountBelow(c *check.C) {
s.cp.mounts["/ctr/outdir/mount"] = arvados.Mount{
Kind: "collection",
- PortableDataHash: arvadostest.FooPdh,
+ PortableDataHash: arvadostest.FooCollectionPDH,
Writable: true,
}
c.Assert(os.MkdirAll(s.cp.hostOutputDir+"/mount", 0755), check.IsNil)
package main
import (
- "flag"
- "fmt"
- "net/http"
+ "context"
+ "os"
+ "git.curoverse.com/arvados.git/lib/cmd"
+ "git.curoverse.com/arvados.git/lib/service"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/health"
- "git.curoverse.com/arvados.git/sdk/go/httpserver"
- log "github.com/sirupsen/logrus"
)
-var version = "dev"
-
-func main() {
- configFile := flag.String("config", arvados.DefaultConfigFile, "`path` to arvados configuration file")
- getVersion := flag.Bool("version", false, "Print version information and exit.")
- flag.Parse()
-
- // Print version information if requested
- if *getVersion {
- fmt.Printf("arvados-health %s\n", version)
- return
- }
-
- log.SetFormatter(&log.JSONFormatter{
- TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
- })
- log.Printf("arvados-health %s started", version)
+var (
+ version = "dev"
+	command cmd.Handler = service.Command(arvados.ServiceNameHealth, newHandler)
+)
- cfg, err := arvados.GetConfig(*configFile)
- if err != nil {
- log.Fatal(err)
- }
- clusterCfg, err := cfg.GetCluster("")
- if err != nil {
- log.Fatal(err)
- }
- nodeCfg, err := clusterCfg.GetNodeProfile("")
- if err != nil {
- log.Fatal(err)
- }
+func newHandler(ctx context.Context, cluster *arvados.Cluster, _ string) service.Handler {
+ return &health.Aggregator{Cluster: cluster}
+}
- log := log.WithField("Service", "Health")
- srv := &httpserver.Server{
- Addr: nodeCfg.Health.Listen,
- Server: http.Server{
- Handler: &health.Aggregator{
- Config: cfg,
- Log: func(req *http.Request, err error) {
- log.WithField("RemoteAddr", req.RemoteAddr).
- WithField("Path", req.URL.Path).
- WithError(err).
- Info("HTTP request")
- },
- },
- },
- }
- if err := srv.Start(); err != nil {
- log.Fatal(err)
- }
- log.WithField("Listen", srv.Addr).Info("listening")
- if err := srv.Wait(); err != nil {
- log.Fatal(err)
- }
+func main() {
+ os.Exit(command.RunCommand(os.Args[0], os.Args[1:], os.Stdin, os.Stdout, os.Stderr))
}
coll, err = cache.Get(arv, arvadostest.FooCollection, false)
c.Check(err, check.Equals, nil)
c.Assert(coll, check.NotNil)
- c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooPdh)
+ c.Check(coll.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
c.Check(coll.ManifestText[:2], check.Equals, ". ")
}
s.checkCacheMetrics(c, cache.registry,
// lookup.
arv.ApiToken = arvadostest.ActiveToken
- coll2, err := cache.Get(arv, arvadostest.FooPdh, false)
+ coll2, err := cache.Get(arv, arvadostest.FooCollectionPDH, false)
c.Check(err, check.Equals, nil)
c.Assert(coll2, check.NotNil)
- c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooPdh)
+ c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
c.Check(coll2.ManifestText[:2], check.Equals, ". ")
c.Check(coll2.ManifestText, check.Not(check.Equals), coll.ManifestText)
"pdh_hits 4",
"api_calls 2")
- coll2, err = cache.Get(arv, arvadostest.FooPdh, false)
+ coll2, err = cache.Get(arv, arvadostest.FooCollectionPDH, false)
c.Check(err, check.Equals, nil)
c.Assert(coll2, check.NotNil)
- c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooPdh)
+ c.Check(coll2.PortableDataHash, check.Equals, arvadostest.FooCollectionPDH)
c.Check(coll2.ManifestText[:2], check.Equals, ". ")
s.checkCacheMetrics(c, cache.registry,
cache.registry = prometheus.NewRegistry()
for _, forceReload := range []bool{false, true, false, true} {
- _, err := cache.Get(arv, arvadostest.FooPdh, forceReload)
+ _, err := cache.Get(arv, arvadostest.FooCollectionPDH, forceReload)
c.Check(err, check.Equals, nil)
}
c.Check(stdout, check.Matches, `(?ms).*collection is empty.*`)
}
for _, path := range []string{
- "/by_id/" + arvadostest.FooPdh,
- "/by_id/" + arvadostest.FooPdh + "/",
+ "/by_id/" + arvadostest.FooCollectionPDH,
+ "/by_id/" + arvadostest.FooCollectionPDH + "/",
"/by_id/" + arvadostest.FooCollection,
"/by_id/" + arvadostest.FooCollection + "/",
} {
}
func (s *UnitSuite) TestInvalidUUID(c *check.C) {
- bogusID := strings.Replace(arvadostest.FooPdh, "+", "-", 1) + "-"
+ bogusID := strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + "-"
token := arvadostest.ActiveToken
for _, trial := range []string{
"http://keep-web/c=" + bogusID + "/foo",
arvadostest.FooCollection + ".example.com/foo",
arvadostest.FooCollection + "--collections.example.com/foo",
arvadostest.FooCollection + "--collections.example.com/_/foo",
- arvadostest.FooPdh + ".example.com/foo",
- strings.Replace(arvadostest.FooPdh, "+", "-", -1) + "--collections.example.com/foo",
+ arvadostest.FooCollectionPDH + ".example.com/foo",
+ strings.Replace(arvadostest.FooCollectionPDH, "+", "-", -1) + "--collections.example.com/foo",
arvadostest.FooBarDirCollection + ".example.com/dir1/foo",
} {
c.Log("doRequests: ", hostPath)
dataMD5: "acbd18db4cc2f85cedef654fccc4a4d8",
},
{
- host: strings.Replace(arvadostest.FooPdh, "+", "-", 1) + ".collections.example.com",
+ host: strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + ".collections.example.com",
path: "/t=" + arvadostest.ActiveToken + "/foo",
dataMD5: "acbd18db4cc2f85cedef654fccc4a4d8",
},
{
- path: "/c=" + arvadostest.FooPdh + "/t=" + arvadostest.ActiveToken + "/foo",
+ path: "/c=" + arvadostest.FooCollectionPDH + "/t=" + arvadostest.ActiveToken + "/foo",
dataMD5: "acbd18db4cc2f85cedef654fccc4a4d8",
},
{
- path: "/c=" + strings.Replace(arvadostest.FooPdh, "+", "-", 1) + "/t=" + arvadostest.ActiveToken + "/_/foo",
+ path: "/c=" + strings.Replace(arvadostest.FooCollectionPDH, "+", "-", 1) + "/t=" + arvadostest.ActiveToken + "/_/foo",
dataMD5: "acbd18db4cc2f85cedef654fccc4a4d8",
},
{
-set_serial $RANDOM$RANDOM \
-extfile <(cat /etc/ssl/openssl.cnf \
<(printf "\n[x509_ext]\nkeyUsage=critical,digitalSignature,keyEncipherment\nsubjectAltName=DNS:localhost,$san")) \
- -extensions x509_ext
+ -extensions x509_ext \
+ -days 365
chown arvbox:arvbox /var/lib/arvados/server-cert-${localip}.*
fi