import (
"errors"
"fmt"
+ "time"
"git.curoverse.com/arvados.git/lib/dispatchcloud/container"
"git.curoverse.com/arvados.git/lib/dispatchcloud/test"
ents map[string]container.QueueEnt
}
-func (q *stubQueue) Entries() map[string]container.QueueEnt {
- return q.ents
+// Entries returns the stub's preset entries together with time.Now()
+// as the snapshot time.
+func (q *stubQueue) Entries() (map[string]container.QueueEnt, time.Time) {
+ return q.ents, time.Now()
}
func (q *stubQueue) Lock(uuid string) error {
return q.setState(uuid, arvados.ContainerStateLocked)
+// Unlock sets the container's state to Queued via setState.
func (q *stubQueue) Unlock(uuid string) error {
return q.setState(uuid, arvados.ContainerStateQueued)
}
+// Cancel sets the container's state to Cancelled via setState.
+func (q *stubQueue) Cancel(uuid string) error {
+ return q.setState(uuid, arvados.ContainerStateCancelled)
+}
+// Forget is a no-op in this stub.
+func (q *stubQueue) Forget(uuid string) {
+}
func (q *stubQueue) Get(uuid string) (arvados.Container, bool) {
ent, ok := q.ents[uuid]
return ent.Container, ok
notify <-chan struct{}
unalloc map[arvados.InstanceType]int // idle+booting+unknown
idle map[arvados.InstanceType]int
- running map[string]bool
+ running map[string]time.Time
atQuota bool
canCreate int
creates []arvados.InstanceType
shutdowns int
}
-func (p *stubPool) AtQuota() bool { return p.atQuota }
-func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
-func (p *stubPool) Unsubscribe(<-chan struct{}) {}
-func (p *stubPool) Running() map[string]bool { return p.running }
+// Trivial stub accessors: each returns the corresponding preset field
+// of stubPool; Unsubscribe is a no-op.
+func (p *stubPool) AtQuota() bool { return p.atQuota }
+func (p *stubPool) Subscribe() <-chan struct{} { return p.notify }
+func (p *stubPool) Unsubscribe(<-chan struct{}) {}
+func (p *stubPool) Running() map[string]time.Time { return p.running }
func (p *stubPool) Unallocated() map[arvados.InstanceType]int {
r := map[arvados.InstanceType]int{}
for it, n := range p.unalloc {
p.unalloc[it]++
return nil
}
+// KillContainer records the kill request by setting the container's
+// entry in p.running to the current time. (A zero time.Time marks a
+// container started but not killed.)
+func (p *stubPool) KillContainer(uuid string) {
+ p.running[uuid] = time.Now()
+}
+// Shutdown counts the call in p.shutdowns and always returns false.
func (p *stubPool) Shutdown(arvados.InstanceType) bool {
p.shutdowns++
return false
}
p.idle[it]--
p.unalloc[it]--
- p.running[ctr.UUID] = true
+ p.running[ctr.UUID] = time.Time{}
return true
}
types[1]: 1,
types[2]: 2,
},
- running: map[string]bool{},
+ running: map[string]time.Time{},
canCreate: 1,
}
Map(logger, &queue, &pool)
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[1]})
c.Check(pool.starts, check.DeepEquals, []string{uuids[4], uuids[3]})
- c.Check(pool.running, check.DeepEquals, map[string]bool{uuids[4]: true})
+ c.Check(pool.running, check.HasLen, 1)
+ for uuid := range pool.running {
+ c.Check(uuid, check.Equals, uuids[4])
+ }
}
// Shutdown some nodes if Create() fails -- and without even calling
idle: map[arvados.InstanceType]int{
types[2]: 2,
},
- running: map[string]bool{},
+ running: map[string]time.Time{},
creates: []arvados.InstanceType{},
starts: []string{},
canCreate: 0,
types[1]: 1,
types[2]: 1,
},
- running: map[string]bool{},
+ running: map[string]time.Time{},
canCreate: 2,
}
queue := stubQueue{
Map(logger, &queue, &pool)
c.Check(pool.creates, check.DeepEquals, []arvados.InstanceType{types[2], types[1]})
c.Check(pool.starts, check.DeepEquals, []string{uuids[6], uuids[5], uuids[3], uuids[2]})
- c.Check(pool.running, check.DeepEquals, map[string]bool{uuids[3]: true, uuids[6]: true})
+ running := map[string]bool{}
+ for uuid, t := range pool.running {
+ if t.IsZero() {
+ running[uuid] = false
+ } else {
+ running[uuid] = true
+ }
+ }
+ c.Check(running, check.DeepEquals, map[string]bool{uuids[3]: false, uuids[6]: false})
}
package worker
import (
+ "io"
"time"
"git.curoverse.com/arvados.git/lib/cloud"
func (suite *PoolSuite) TestStartContainer(c *check.C) {
// TODO: use an instanceSet stub with an SSH server
- c.Fail()
+ // Report as skipped rather than silently passing with an empty body.
+ c.Skip("TODO: use an instanceSet stub with an SSH server")
}
func (suite *PoolSuite) TestVerifyHostKey(c *check.C) {
// TODO: use an instanceSet stub with an SSH server
- c.Fail()
+ // Report as skipped rather than silently passing with an empty body.
+ c.Skip("TODO: use an instanceSet stub with an SSH server")
}
func (suite *PoolSuite) TestCreateUnallocShutdown(c *check.C) {
go lameInstanceSet.Release(3) // unblock Destroy calls
}
-func (suite *PoolSuite) wait(c *check.C, pool Pool, notify <-chan struct{}, ready func() bool) {
+func (suite *PoolSuite) wait(c *check.C, pool *Pool, notify <-chan struct{}, ready func() bool) {
timeout := time.NewTimer(time.Second).C
for !ready() {
select {
+// stubExecutor is an executor stub whose methods do nothing.
type stubExecutor struct{}
-func (*stubExecutor) SetInstance(cloud.Instance) {}
+// SetTarget is a no-op; the target is ignored.
+func (*stubExecutor) SetTarget(cloud.ExecutorTarget) {}
-func (*stubExecutor) Execute(cmd string, stdin []byte) ([]byte, []byte, error) { return nil, nil, nil }
+// Execute ignores cmd and stdin and returns nil stdout, nil stderr
+// and a nil error.
+// NOTE(review): stdin is never read; confirm no caller relies on it
+// being drained.
+func (*stubExecutor) Execute(cmd string, stdin io.Reader) ([]byte, []byte, error) {
+ return nil, nil, nil
+}
srv, listen := s.stubServer(&healthyHandler{})
defer srv.Close()
s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: listen},
- Keepbalance: arvados.SystemServiceInstance{Listen: listen},
- Keepproxy: arvados.SystemServiceInstance{Listen: listen},
- Keepstore: arvados.SystemServiceInstance{Listen: listen},
- Keepweb: arvados.SystemServiceInstance{Listen: listen},
- Nodemanager: arvados.SystemServiceInstance{Listen: listen},
- RailsAPI: arvados.SystemServiceInstance{Listen: listen},
- Websocket: arvados.SystemServiceInstance{Listen: listen},
- Workbench: arvados.SystemServiceInstance{Listen: listen},
+ Controller: arvados.SystemServiceInstance{Listen: listen},
+ DispatchCloud: arvados.SystemServiceInstance{Listen: listen},
+ Keepbalance: arvados.SystemServiceInstance{Listen: listen},
+ Keepproxy: arvados.SystemServiceInstance{Listen: listen},
+ Keepstore: arvados.SystemServiceInstance{Listen: listen},
+ Keepweb: arvados.SystemServiceInstance{Listen: listen},
+ Nodemanager: arvados.SystemServiceInstance{Listen: listen},
+ RailsAPI: arvados.SystemServiceInstance{Listen: listen},
+ Websocket: arvados.SystemServiceInstance{Listen: listen},
+ Workbench: arvados.SystemServiceInstance{Listen: listen},
}
s.handler.ServeHTTP(s.resp, s.req)
resp := s.checkOK(c)
srvU, listenU := s.stubServer(&unhealthyHandler{})
defer srvU.Close()
s.handler.Config.Clusters["zzzzz"].NodeProfiles["localhost"] = arvados.NodeProfile{
- Controller: arvados.SystemServiceInstance{Listen: listenH},
- Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
- Keepproxy: arvados.SystemServiceInstance{Listen: listenH},
- Keepstore: arvados.SystemServiceInstance{Listen: listenH},
- Keepweb: arvados.SystemServiceInstance{Listen: listenH},
- Nodemanager: arvados.SystemServiceInstance{Listen: listenH},
- RailsAPI: arvados.SystemServiceInstance{Listen: listenH},
- Websocket: arvados.SystemServiceInstance{Listen: listenH},
- Workbench: arvados.SystemServiceInstance{Listen: listenH},
+ Controller: arvados.SystemServiceInstance{Listen: listenH},
+ DispatchCloud: arvados.SystemServiceInstance{Listen: listenH},
+ Keepbalance: arvados.SystemServiceInstance{Listen: listenH},
+ Keepproxy: arvados.SystemServiceInstance{Listen: listenH},
+ Keepstore: arvados.SystemServiceInstance{Listen: listenH},
+ Keepweb: arvados.SystemServiceInstance{Listen: listenH},
+ Nodemanager: arvados.SystemServiceInstance{Listen: listenH},
+ RailsAPI: arvados.SystemServiceInstance{Listen: listenH},
+ Websocket: arvados.SystemServiceInstance{Listen: listenH},
+ Workbench: arvados.SystemServiceInstance{Listen: listenH},
}
s.handler.Config.Clusters["zzzzz"].NodeProfiles["127.0.0.1"] = arvados.NodeProfile{
Keepstore: arvados.SystemServiceInstance{Listen: listenU},
return nil
}
-// Kill finds the crunch-run process corresponding to the given uuid,
-// and sends the given signal to it. It then waits up to 1 second for
-// the process to die. It returns 0 if the process is successfully
-// killed or didn't exist in the first place.
-func Kill(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
+// KillProcess finds the crunch-run process corresponding to the given
+// uuid, and sends the given signal to it. It then waits up to 1
+// second for the process to die. It returns 0 if the process is
+// successfully killed or didn't exist in the first place.
+//
+// The return value is computed by exitcode from stderr and the error
+// returned by kill.
+func KillProcess(uuid string, signal syscall.Signal, stdout, stderr io.Writer) int {
return exitcode(stderr, kill(uuid, signal, stdout, stderr))
}
if err == nil {
return fmt.Errorf("pid %d: sent signal %d (%s) but process is still alive", pi.PID, signal, signal)
}
- fmt.Fprintln(stderr, "pid %d: %s", pi.PID, err)
+ fmt.Fprintf(stderr, "pid %d: %s\n", pi.PID, err)
return nil
}
// List UUIDs of active crunch-run processes.
-func List(stdout, stderr io.Writer) int {
+func ListProcesses(stdout, stderr io.Writer) int {
return exitcode(stderr, filepath.Walk(lockdir, func(path string, info os.FileInfo, err error) error {
if info.IsDir() {
return filepath.SkipDir
case *detach && !detached:
os.Exit(Detach(flag.Arg(0), os.Args, os.Stdout, os.Stderr))
case *kill >= 0:
- os.Exit(Kill(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
+ os.Exit(KillProcess(flag.Arg(0), syscall.Signal(*kill), os.Stdout, os.Stderr))
case *list:
- os.Exit(List(os.Stdout, os.Stderr))
+ os.Exit(ListProcesses(os.Stdout, os.Stderr))
}
// Print version information if requested