projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
15720: Batch user update API.
[arvados.git]
/
lib
/
dispatchcloud
/
worker
/
worker.go
diff --git
a/lib/dispatchcloud/worker/worker.go
b/lib/dispatchcloud/worker/worker.go
index 41117c1d4eafb5aa2a92c163d3f79d72ace443d3..03ab15176f5297b85182d3689b71f5a3f0195004 100644
(file)
--- a/
lib/dispatchcloud/worker/worker.go
+++ b/
lib/dispatchcloud/worker/worker.go
@@
-5,7
+5,6
@@
package worker
import (
package worker
import (
- "bytes"
"fmt"
"strings"
"sync"
"fmt"
"strings"
"sync"
@@
-215,11
+214,16
@@
func (wkr *worker) probeAndUpdate() {
logger.Info("instance booted; will try probeRunning")
}
}
logger.Info("instance booted; will try probeRunning")
}
}
+ reportedBroken := false
if booted || wkr.state == StateUnknown {
if booted || wkr.state == StateUnknown {
- ctrUUIDs, ok = wkr.probeRunning()
+ ctrUUIDs,
reportedBroken,
ok = wkr.probeRunning()
}
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
}
wkr.mtx.Lock()
defer wkr.mtx.Unlock()
+ if reportedBroken && wkr.idleBehavior == IdleBehaviorRun {
+ logger.Info("probe reported broken instance")
+ wkr.setIdleBehavior(IdleBehaviorDrain)
+ }
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
if !ok || (!booted && len(ctrUUIDs) == 0 && len(wkr.running) == 0) {
if wkr.state == StateShutdown && wkr.updated.After(updated) {
// Skip the logging noise if shutdown was
@@
-313,7
+317,7
@@
func (wkr *worker) probeAndUpdate() {
go wkr.wp.notify()
}
go wkr.wp.notify()
}
-func (wkr *worker) probeRunning() (running []string, ok bool) {
+func (wkr *worker) probeRunning() (running []string,
reportsBroken,
ok bool) {
cmd := "crunch-run --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
cmd := "crunch-run --list"
if u := wkr.instance.RemoteUser(); u != "root" {
cmd = "sudo " + cmd
@@
-325,13
+329,17
@@
func (wkr *worker) probeRunning() (running []string, ok bool) {
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
"stdout": string(stdout),
"stderr": string(stderr),
}).WithError(err).Warn("probe failed")
- return
nil, false
+ return
}
}
- stdout = bytes.TrimRight(stdout, "\n")
- if len(stdout) == 0 {
- return nil, true
+ ok = true
+ for _, s := range strings.Split(string(stdout), "\n") {
+ if s == "broken" {
+ reportsBroken = true
+ } else if s != "" {
+ running = append(running, s)
+ }
}
}
- return
strings.Split(string(stdout), "\n"), true
+ return
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
}
func (wkr *worker) probeBooted() (ok bool, stderr []byte) {
@@
-447,8
+455,8
@@
func (wkr *worker) saveTags() {
instance := wkr.instance
tags := instance.Tags()
update := cloud.InstanceTags{
instance := wkr.instance
tags := instance.Tags()
update := cloud.InstanceTags{
- tagKeyInstanceType: wkr.instType.Name,
- tagKeyIdleBehavior: string(wkr.idleBehavior),
+
wkr.wp.tagKeyPrefix +
tagKeyInstanceType: wkr.instType.Name,
+
wkr.wp.tagKeyPrefix +
tagKeyIdleBehavior: string(wkr.idleBehavior),
}
save := false
for k, v := range update {
}
save := false
for k, v := range update {