"Arvados server daemons"
package_go_binary cmd/arvados-server arvados-controller \
"Arvados cluster controller daemon"
-package_go_binary cmd/arvados-server crunch-dispatch-cloud \
+package_go_binary cmd/arvados-server arvados-dispatch-cloud \
"Arvados cluster cloud dispatch"
package_go_binary sdk/go/crunchrunner crunchrunner \
"Crunchrunner executes a command inside a container and uploads the output"
[Service]
Type=notify
EnvironmentFile=-/etc/arvados/environment
-ExecStart=/usr/bin/crunch-dispatch-cloud
+ExecStart=/usr/bin/arvados-dispatch-cloud
Restart=always
RestartSec=1
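A side note on Type=notify in the unit above: systemd keeps the service in "activating" until the daemon itself reports readiness. The diff doesn't show how arvados-dispatch-cloud signals this; the sketch below is one plausible way a Go daemon does it, using the coreos/go-systemd package (an assumption, not taken from this change):

```go
package main

import (
	"log"

	"github.com/coreos/go-systemd/daemon"
)

func main() {
	// ... bind listeners, read config from /etc/arvados/environment, etc. ...

	// Under Type=notify, systemd waits for READY=1 before considering
	// the unit started (and before starting units ordered after it).
	if ok, err := daemon.SdNotify(false, daemon.SdNotifyReady); err != nil {
		log.Printf("sd_notify failed: %s", err)
	} else if !ok {
		log.Print("NOTIFY_SOCKET unset; not running under systemd")
	}

	select {} // stand-in for the daemon's real event loop
}
```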
// error: it wouldn't help to try again, or to leave
// it for a different dispatcher process to attempt.
errorString := err.Error()
- cq.logger.WithField("ContainerUUID", ctr.UUID).Warn("cancel container with no suitable instance type")
+ logger := cq.logger.WithField("ContainerUUID", ctr.UUID)
+ logger.WithError(err).Warn("cancel container with no suitable instance type")
go func() {
+ if ctr.State == arvados.ContainerStateQueued {
+ // Can't set runtime error without
+ // locking first. If Lock() is
+ // successful, it will call addEnt()
+ // again itself, and we'll fall
+ // through to the
+ // setRuntimeError/Cancel code below.
+ err := cq.Lock(ctr.UUID)
+ if err != nil {
+ logger.WithError(err).Warn("lock failed")
+ // ...and try again on the
+ // next Update, if the problem
+ // still exists.
+ }
+ return
+ }
var err error
defer func() {
if err == nil {
if latest.State == arvados.ContainerStateCancelled {
return
}
- cq.logger.WithField("ContainerUUID", ctr.UUID).WithError(err).Warn("error while trying to cancel unsatisfiable container")
+ logger.WithError(err).Warn("error while trying to cancel unsatisfiable container")
}()
- if ctr.State == arvados.ContainerStateQueued {
- err = cq.Lock(ctr.UUID)
- if err != nil {
- return
- }
- }
err = cq.setRuntimeError(ctr.UUID, errorString)
if err != nil {
return
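The sequencing in the new comments is worth making explicit: a runtime error can only be recorded on a Locked container, so a Queued container takes one extra pass through Lock(), and the queue update that Lock() triggers re-enters this code with the container Locked. A minimal sketch of that flow, with a hypothetical queue interface standing in for the real dispatchcloud types:

```go
package sketch

import "log"

// queue is an illustrative stand-in, not the actual dispatchcloud API.
type queue interface {
	State(uuid string) string
	Lock(uuid string) error
	SetRuntimeError(uuid, errorString string) error
	Cancel(uuid string) error
}

// cancelUnsatisfiable mirrors the flow above: lock a Queued container
// first (the resulting queue update calls back in with state Locked),
// then record the runtime error and cancel.
func cancelUnsatisfiable(q queue, uuid, reason string) {
	if q.State(uuid) == "Queued" {
		if err := q.Lock(uuid); err != nil {
			log.Printf("lock %s failed: %s", uuid, err) // retried on the next update
		}
		return
	}
	if err := q.SetRuntimeError(uuid, reason); err != nil {
		return
	}
	if err := q.Cancel(uuid); err != nil {
		log.Printf("cancel %s failed: %s", uuid, err)
	}
}
```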
disp.stopped = make(chan struct{})
disp.logger = logrus.StandardLogger()
- if key, err := ssh.ParsePrivateKey(disp.Cluster.Dispatch.PrivateKey); err != nil {
+ if key, err := ssh.ParsePrivateKey([]byte(disp.Cluster.Dispatch.PrivateKey)); err != nil {
disp.logger.Fatalf("error parsing configured Dispatch.PrivateKey: %s", err)
} else {
disp.sshKey = key
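The []byte(...) conversion here is the flip side of the config change further down in this diff (Dispatch.PrivateKey goes from []byte to string): ssh.ParsePrivateKey in golang.org/x/crypto/ssh still takes raw PEM bytes, so the string field has to be converted at the call site. A minimal sketch:

```go
package sketch

import "golang.org/x/crypto/ssh"

// signerFromConfig turns the PEM text stored in the (now string-typed)
// config field into the []byte that ssh.ParsePrivateKey expects.
func signerFromConfig(pemText string) (ssh.Signer, error) {
	return ssh.ParsePrivateKey([]byte(pemText))
}
```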
TimeoutShutdown: arvados.Duration(5 * time.Millisecond),
},
Dispatch: arvados.Dispatch{
- PrivateKey: dispatchprivraw,
+ PrivateKey: string(dispatchprivraw),
PollInterval: arvados.Duration(5 * time.Millisecond),
ProbeInterval: arvados.Duration(5 * time.Millisecond),
StaleLockTimeout: arvados.Duration(5 * time.Millisecond),
c.Fatalf("timed out; still waiting for %d containers: %q", len(waiting), waiting)
}
- deadline := time.Now().Add(time.Second)
+ deadline := time.Now().Add(5 * time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
func (sch *Scheduler) fixStaleLocks() {
wp := sch.pool.Subscribe()
defer sch.pool.Unsubscribe(wp)
+
+ var stale []string
timeout := time.NewTimer(sch.staleLockTimeout)
waiting:
for {
- unlock := false
- select {
- case <-wp:
- // If all workers have been contacted, unlock
- // containers that aren't claimed by any
- // worker.
- unlock = sch.pool.CountWorkers()[worker.StateUnknown] == 0
- case <-timeout.C:
- // Give up and unlock the containers, even
- // though they might be working.
- unlock = true
- }
-
running := sch.pool.Running()
qEntries, _ := sch.queue.Entries()
+
+ stale = nil
for uuid, ent := range qEntries {
if ent.Container.State != arvados.ContainerStateLocked {
continue
if _, running := running[uuid]; running {
continue
}
- if !unlock {
- continue waiting
- }
- err := sch.queue.Unlock(uuid)
- if err != nil {
- sch.logger.Warnf("Unlock %s: %s", uuid, err)
+ stale = append(stale, uuid)
+ }
+ if len(stale) == 0 {
+ return
+ }
+
+ select {
+ case <-wp:
+ // Stop waiting if all workers have been
+ // contacted.
+ if sch.pool.CountWorkers()[worker.StateUnknown] == 0 {
+ break waiting
}
+ case <-timeout.C:
+ // Give up.
+ break waiting
+ }
+ }
+
+ for _, uuid := range stale {
+ err := sch.queue.Unlock(uuid)
+ if err != nil {
+ sch.logger.Warnf("Unlock %s: %s", uuid, err)
}
- return
}
}
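The restructured loop separates deciding from acting: every wakeup recomputes the stale list (so a lock that a late-reporting worker turns out to hold drops back out), and nothing is unlocked until either all workers have been contacted or the timeout fires. Reduced to its control-flow skeleton (names below are illustrative, not the real scheduler API):

```go
package sketch

import "time"

// waitThenAct re-checks a condition on every notification, gives up
// after a timeout, and defers the destructive work until the wait
// loop has exited, the same shape as fixStaleLocks above.
func waitThenAct(notify <-chan struct{}, allWorkersKnown func() bool,
	findStale func() []string, unlock func(string), timeout time.Duration) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	var stale []string
waiting:
	for {
		stale = findStale()
		if len(stale) == 0 {
			return
		}
		select {
		case <-notify:
			if allWorkersKnown() {
				break waiting
			}
		case <-timer.C:
			break waiting
		}
	}
	for _, uuid := range stale {
		unlock(uuid)
	}
}
```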
s.executables << "arv-crunch-job"
s.executables << "arv-tag"
s.required_ruby_version = '>= 2.1.0'
- s.add_runtime_dependency 'arvados', '~> 1.2.0', '>= 1.2.0'
+ s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
# Our google-api-client dependency used to be < 0.9, but that could be
# satisfied by the buggy 0.9.pre*. https://dev.arvados.org/issues/9213
s.add_runtime_dependency 'cure-google-api-client', '~> 0.6', '>= 0.6.3', '<0.8.9'
extra_submit_params["cluster_id"] = runtimeContext.submit_runner_cluster
if runtimeContext.submit_request_uuid:
+ if "cluster_id" in extra_submit_params:
+ # cluster_id doesn't apply to "update" requests and actually makes them fail
+ del extra_submit_params["cluster_id"]
response = self.arvrunner.api.container_requests().update(
uuid=runtimeContext.submit_request_uuid,
body=job_spec,
count = 0
start = time.time()
checkpoint = start
- with c.open(name, "w") as f:
+ with c.open(name, "wb") as f:
for chunk in req.iter_content(chunk_size=1024):
count += len(chunk)
f.write(chunk)
'schema-salad==3.0.20181129082112',
'typing >= 3.6.4',
'ruamel.yaml >=0.15.54, <=0.15.77',
- 'arvados-python-client>=1.2.1.20181130020805',
+ 'arvados-python-client>=1.3.0.20190205182514',
'setuptools',
'ciso8601 >=1.0.6, <2.0.0',
'subprocess32>=3.5.1',
getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
- cm.open.assert_called_with("file1.txt", "w")
+ cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
owner_uuid=None, ensure_unique_name=True)
getmock.assert_called_with("http://example.com/file1.txt", stream=True, allow_redirects=True)
- cm.open.assert_called_with("file1.txt", "w")
+ cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http://example.com/file1.txt",
owner_uuid=None, ensure_unique_name=True)
getmock.assert_called_with("http://example.com/download?fn=/file1.txt", stream=True, allow_redirects=True)
- cm.open.assert_called_with("file1.txt", "w")
+ cm.open.assert_called_with("file1.txt", "wb")
cm.save_new.assert_called_with(name="Downloaded from http://example.com/download?fn=/file1.txt",
owner_uuid=None, ensure_unique_name=True)
body=JsonDiffMatcher(expect_pipeline))
@stubs
def test_submit_container(self, stubs):
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.api.container_requests().create.assert_called_with(
body=JsonDiffMatcher(expect_container))
self.assertEqual(stubs.capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
@stubs
["--submit", "--no-wait", "--api=containers", "--debug", "--on-error=stop",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
'--no-log-timestamps', '--disable-validate',
@stubs
def test_submit_container_output_name(self, stubs):
output_name = "test_output_name"
exited = arvados_cwl.main(
["--submit", "--no-wait", "--api=containers", "--debug", "--output-name", output_name,
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
self.assertEqual(exited, 0)
@stubs
def test_submit_storage_classes(self, stubs):
exited = arvados_cwl.main(
["--debug", "--submit", "--no-wait", "--api=containers", "--storage-classes=foo",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
["--submit", "--no-wait", "--api=containers", "--debug", "--trash-intermediate",
"tests/wf/submit_wf.cwl", "tests/submit_test_job.json"],
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
expect_container = copy.deepcopy(stubs.expect_container_spec)
expect_container["command"] = ['arvados-cwl-runner', '--local', '--api=containers',
stubs.capture_stdout, sys.stderr, api_client=stubs.api, keep_client=stubs.keep_client)
stubs.api.container_requests().update.assert_called_with(
- uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec), cluster_id="zzzzz")
+ uuid="zzzzz-xvhdp-yyyyyyyyyyyyyyy", body=JsonDiffMatcher(stubs.expect_container_spec))
self.assertEqual(stubs.capture_stdout.getvalue(),
stubs.expect_container_request_uuid + '\n')
self.assertEqual(exited, 0)
self.assertEqual(stubs.capture_stdout.getvalue(),
self.existing_template_uuid + '\n')
self.assertEqual(exited, 0)
class TestCreateWorkflow(unittest.TestCase):
existing_workflow_uuid = "zzzzz-7fd4e-validworkfloyml"
type Dispatch struct {
// PEM encoded SSH key (RSA, DSA, or ECDSA) able to log in to
// cloud VMs.
- PrivateKey []byte
+ PrivateKey string
// Max time for workers to come up before abandoning stale
// locks from previous run
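Besides simplifying the call sites above, the string type also round-trips better through config files: encoding/json (and YAML loaders that follow its conventions) represents []byte as base64, so readable PEM text couldn't live in a []byte field. A quick sketch using an assumed, trimmed-down copy of the struct:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// dispatch is a trimmed-down stand-in for arvados.Dispatch.
type dispatch struct {
	PrivateKey string
}

func main() {
	// With a []byte field this value would have to be base64; as a
	// string, the PEM text appears in the config verbatim.
	cfg := []byte(`{"PrivateKey": "-----BEGIN RSA PRIVATE KEY-----\n..."}`)
	var d dispatch
	if err := json.Unmarshal(cfg, &d); err != nil {
		panic(err)
	}
	fmt.Print(d.PrivateKey)
}
```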
clnt = HTTPClient.new
if Rails.configuration.sso_insecure
clnt.ssl_config.verify_mode = OpenSSL::SSL::VERIFY_NONE
+ else
+ # Use system CA certificates
+ ["/etc/ssl/certs/ca-certificates.crt",
+ "/etc/pki/tls/certs/ca-bundle.crt"]
+ .select { |ca_path| File.readable?(ca_path) }
+ .each { |ca_path| clnt.ssl_config.add_trust_ca(ca_path) }
end
remote_user = SafeJSON.load(
clnt.get_content('https://' + host + '/arvados/v1/users/current',
s.files = ["bin/arvados-login-sync", "agpl-3.0.txt"]
s.executables << "arvados-login-sync"
s.required_ruby_version = '>= 2.1.0'
- s.add_runtime_dependency 'arvados', '~> 1.2.0', '>= 1.2.0'
+ s.add_runtime_dependency 'arvados', '~> 1.3.0', '>= 1.3.0'
s.homepage =
'https://arvados.org'
end