package worker
import (
+ "crypto/hmac"
"crypto/md5"
"crypto/rand"
+ "crypto/sha256"
"errors"
"fmt"
"io"
"io/ioutil"
+ mathrand "math/rand"
"sort"
"strings"
"sync"
// duration converts a configured arvados.Duration to a time.Duration.
// It returns conf when conf is greater than zero, and def otherwise
// (i.e., when conf is zero or negative).
func duration(conf arvados.Duration, def time.Duration) time.Duration {
if conf > 0 {
return time.Duration(conf)
- } else {
- return def
}
// conf is not positive: fall back to the caller-supplied default.
+ return def
}
// NewPool creates a Pool of workers backed by instanceSet.
instanceSetID: instanceSetID,
instanceSet: &throttledInstanceSet{InstanceSet: instanceSet},
newExecutor: newExecutor,
+ cluster: cluster,
bootProbeCommand: cluster.Containers.CloudVMs.BootProbeCommand,
runnerSource: cluster.Containers.CloudVMs.DeployRunnerBinary,
imageID: cloud.ImageID(cluster.Containers.CloudVMs.ImageID),
timeoutTERM: duration(cluster.Containers.CloudVMs.TimeoutTERM, defaultTimeoutTERM),
timeoutSignal: duration(cluster.Containers.CloudVMs.TimeoutSignal, defaultTimeoutSignal),
timeoutStaleRunLock: duration(cluster.Containers.CloudVMs.TimeoutStaleRunLock, defaultTimeoutStaleRunLock),
+ systemRootToken: cluster.SystemRootToken,
installPublicKey: installPublicKey,
tagKeyPrefix: cluster.Containers.CloudVMs.TagKeyPrefix,
+ runnerCmdDefault: cluster.Containers.CrunchRunCommand,
+ runnerArgs: append([]string{"--runtime-engine=" + cluster.Containers.RuntimeEngine}, cluster.Containers.CrunchRunArgumentsList...),
stop: make(chan bool),
}
wp.registerMetrics(reg)
instanceSetID cloud.InstanceSetID
instanceSet *throttledInstanceSet
newExecutor func(cloud.Instance) Executor
+ cluster *arvados.Cluster
bootProbeCommand string
runnerSource string
imageID cloud.ImageID
timeoutTERM time.Duration
timeoutSignal time.Duration
timeoutStaleRunLock time.Duration
+ systemRootToken string
installPublicKey ssh.PublicKey
tagKeyPrefix string
+ runnerCmdDefault string // crunch-run command to use if not deploying a binary
+ runnerArgs []string // extra args passed to crunch-run
// private state
subscribers map[<-chan struct{}]chan<- struct{}
Help: "Number of seconds per runProbe call.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001},
}, []string{"outcome"})
- for _, v := range []string{"success", "fail"} {
- wp.mRunProbeDuration.WithLabelValues(v).Observe(0)
- }
reg.MustRegister(wp.mRunProbeDuration)
}
workers := []cloud.InstanceID{}
for range probeticker.C {
+ // Add some jitter. Without this, if probeInterval is
+ // a multiple of syncInterval and sync is
+ // instantaneous (as with the loopback driver), the
+ // first few probes race with sync operations and
+ // don't update the workers.
+ time.Sleep(time.Duration(mathrand.Int63n(int64(wp.probeInterval) / 23)))
+
workers = workers[:0]
wp.mtx.Lock()
for id, wkr := range wp.workers {
if wp.runnerData != nil {
return nil
} else if wp.runnerSource == "" {
- wp.runnerCmd = "crunch-run"
+ wp.runnerCmd = wp.runnerCmdDefault
wp.runnerData = []byte{}
return nil
}
}
wp.runnerData = buf
wp.runnerMD5 = md5.Sum(buf)
- wp.runnerCmd = fmt.Sprintf("/var/lib/arvados/crunch-run~%x", wp.runnerMD5)
+ wp.runnerCmd = fmt.Sprintf("/tmp/arvados-crunch-run/crunch-run~%x", wp.runnerMD5)
return nil
}
}
}
+// gatewayAuthSecret returns the HMAC-SHA256 of uuid, keyed with the
+// pool's systemRootToken, formatted as lowercase hexadecimal.
+func (wp *Pool) gatewayAuthSecret(uuid string) string {
+	mac := hmac.New(sha256.New, []byte(wp.systemRootToken))
+	// Writes to a hash.Hash never return an error, so the result
+	// is not checked.
+	mac.Write([]byte(uuid))
+	digest := mac.Sum(nil)
+	return fmt.Sprintf("%x", digest)
+}
+
// Return a random string of n hexadecimal digits (n*4 random bits). n
// must be even.
func randomHex(n int) string {