// ConfigData contains environment variables and (when needed) cluster
// configuration, passed from dispatchcloud to crunch-run on stdin.
type ConfigData struct {
- Env map[string]string
- KeepBuffers int
- Cluster *arvados.Cluster
+ Env map[string]string
+ KeepBuffers int
+ // EC2SpotCheck, when true, causes a background goroutine
+ // running checkSpotInterruptionNotices to be started before
+ // the container is run.
+ // NOTE(review): assumes the conf value tested just before
+ // cr.Run() is a ConfigData -- confirm.
+ EC2SpotCheck bool
+ Cluster *arvados.Cluster
}
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
"Block not found error",
"Unhandled exception during FUSE operation",
},
- ReportFunc: runner.reportArvMountWarning,
+ ReportFunc: func(pattern, text string) {
+ runner.updateRuntimeStatus(arvadosclient.Dict{
+ "warning": "arv-mount: " + pattern,
+ "warningDetail": text,
+ })
+ },
}
c.Stdout = runner.arvMountLog
c.Stderr = io.MultiWriter(runner.arvMountLog, os.Stderr, &scanner)
return nil
}
runner.hoststatReporter.Stop()
+ runner.hoststatReporter.LogProcessMemMax(runner.CrunchLog)
err := runner.hoststatLogger.Close()
if err != nil {
return fmt.Errorf("error closing hoststat logs: %v", err)
}
runner.statLogger = NewThrottledLogger(w)
runner.statReporter = &crunchstat.Reporter{
- CID: runner.executor.CgroupID(),
- Logger: log.New(runner.statLogger, "", 0),
CgroupParent: runner.expectCgroupParent,
CgroupRoot: runner.cgroupRoot,
- PollPeriod: runner.statInterval,
- TempDir: runner.parentTemp,
+ CID: runner.executor.CgroupID(),
+ Logger: log.New(runner.statLogger, "", 0),
+ MemThresholds: map[string][]crunchstat.Threshold{
+ "rss": crunchstat.NewThresholdsFromPercentages(runner.Container.RuntimeConstraints.RAM, []int64{90, 95, 99}),
+ },
+ PollPeriod: runner.statInterval,
+ TempDir: runner.parentTemp,
+ ThresholdLogger: runner.CrunchLog,
}
runner.statReporter.Start()
return nil
if runner.statReporter != nil {
runner.statReporter.Stop()
+ runner.statReporter.LogMaxima(runner.CrunchLog, map[string]int64{
+ "rss": runner.Container.RuntimeConstraints.RAM,
+ })
err = runner.statLogger.Close()
if err != nil {
runner.CrunchLog.Printf("error closing crunchstat logs: %v", err)
}
}
-func (runner *ContainerRunner) reportArvMountWarning(pattern, text string) {
- var updated arvados.Container
+// spotInterruptionCheckInterval is the delay between successive
+// polls made by checkSpotInterruptionNotices.
+var spotInterruptionCheckInterval = 5 * time.Second
+
+// ec2MetadataBaseURL is the scheme and host prepended to the
+// instance metadata paths requested by checkSpotInterruptionNotices.
+// NOTE(review): presumably a var rather than a const so tests can
+// point it at a stub server -- confirm.
+var ec2MetadataBaseURL = "http://169.254.169.254"
+
+// ec2TokenTTL is the lifetime requested for each metadata session
+// token (21600 seconds, i.e., 6 hours).
+const ec2TokenTTL = time.Second * 21600
+
+// checkSpotInterruptionNotices polls the instance metadata service
+// at ec2MetadataBaseURL every spotInterruptionCheckInterval, looking
+// for a spot "instance-action" notice.
+//
+// Whenever the reported action or time differs from the previously
+// seen one, it logs the notice to CrunchLog, records it in the
+// container's runtime_status ("warning", "warningDetail" and
+// "preemptionNotice" keys), and sends SIGUSR1 to the current process.
+//
+// It blocks until more than 5 consecutive checks have failed, so
+// callers are expected to run it in its own goroutine.
+func (runner *ContainerRunner) checkSpotInterruptionNotices() {
+	type ec2metadata struct {
+		Action string    `json:"action"`
+		Time   time.Time `json:"time"`
+	}
+	runner.CrunchLog.Printf("Checking for spot interruptions every %v using instance metadata at %s", spotInterruptionCheckInterval, ec2MetadataBaseURL)
+	var metadata ec2metadata
+	var token string
+	var tokenExp time.Time
+	// check performs one poll. On success it leaves the decoded
+	// notice in metadata, or the zero value if the service
+	// responded 404 (no notice pending).
+	check := func() error {
+		// A single one-minute deadline covers both the token
+		// request (if needed) and the instance-action request.
+		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+		defer cancel()
+		// Get a new session token if we have none, or the
+		// current one expires within the next minute.
+		if token == "" || time.Until(tokenExp) < time.Minute {
+			req, err := http.NewRequestWithContext(ctx, http.MethodPut, ec2MetadataBaseURL+"/latest/api/token", nil)
+			if err != nil {
+				return err
+			}
+			req.Header.Set("X-aws-ec2-metadata-token-ttl-seconds", fmt.Sprintf("%d", int(ec2TokenTTL/time.Second)))
+			resp, err := http.DefaultClient.Do(req)
+			if err != nil {
+				return err
+			}
+			defer resp.Body.Close()
+			if resp.StatusCode != http.StatusOK {
+				return fmt.Errorf("%s", resp.Status)
+			}
+			newtoken, err := ioutil.ReadAll(resp.Body)
+			if err != nil {
+				return err
+			}
+			token = strings.TrimSpace(string(newtoken))
+			tokenExp = time.Now().Add(ec2TokenTTL)
+		}
+		req, err := http.NewRequestWithContext(ctx, http.MethodGet, ec2MetadataBaseURL+"/latest/meta-data/spot/instance-action", nil)
+		if err != nil {
+			return err
+		}
+		req.Header.Set("X-aws-ec2-metadata-token", token)
+		resp, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return err
+		}
+		defer resp.Body.Close()
+		metadata = ec2metadata{}
+		switch resp.StatusCode {
+		case http.StatusOK:
+			// A notice is present; decode it below.
+		case http.StatusNotFound:
+			// "If Amazon EC2 is not preparing to stop or
+			// terminate the instance, or if you
+			// terminated the instance yourself,
+			// instance-action is not present in the
+			// instance metadata and you receive an HTTP
+			// 404 error when you try to retrieve it."
+			return nil
+		case http.StatusUnauthorized:
+			// Discard the token so the next check
+			// requests a fresh one.
+			token = ""
+			return fmt.Errorf("%s", resp.Status)
+		default:
+			return fmt.Errorf("%s", resp.Status)
+		}
+		return json.NewDecoder(resp.Body).Decode(&metadata)
+	}
+	// Stop the ticker when we give up, so it does not outlive
+	// this function.
+	ticker := time.NewTicker(spotInterruptionCheckInterval)
+	defer ticker.Stop()
+	failures := 0
+	var lastmetadata ec2metadata
+	for range ticker.C {
+		if err := check(); err != nil {
+			runner.CrunchLog.Printf("Error checking spot interruptions: %s", err)
+			failures++
+			if failures > 5 {
+				runner.CrunchLog.Printf("Giving up on checking spot interruptions after too many consecutive failures")
+				return
+			}
+			continue
+		}
+		failures = 0
+		// Compare times with Equal rather than ==, which would
+		// also compare the time.Time location pointers and
+		// could report an unchanged notice as new.
+		if metadata.Action == lastmetadata.Action && metadata.Time.Equal(lastmetadata.Time) {
+			continue
+		}
+		lastmetadata = metadata
+		text := fmt.Sprintf("Cloud provider scheduled instance %s at %s", metadata.Action, metadata.Time.UTC().Format(time.RFC3339))
+		runner.CrunchLog.Printf("%s", text)
+		runner.updateRuntimeStatus(arvadosclient.Dict{
+			"warning":          "preemption notice",
+			"warningDetail":    text,
+			"preemptionNotice": text,
+		})
+		if proc, err := os.FindProcess(os.Getpid()); err == nil {
+			// trigger updateLogs
+			proc.Signal(syscall.SIGUSR1)
+		}
+	}
+}
+
+func (runner *ContainerRunner) updateRuntimeStatus(status arvadosclient.Dict) {
err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
"container": arvadosclient.Dict{
- "runtime_status": arvadosclient.Dict{
- "warning": "arv-mount: " + pattern,
- "warningDetail": text,
- },
+ "runtime_status": status,
},
- }, &updated)
+ }, nil)
if err != nil {
runner.CrunchLog.Printf("error updating container runtime_status: %s", err)
}
if final {
updates["is_trashed"] = true
} else {
- exp := time.Now().Add(crunchLogUpdatePeriod * 24)
+ // We set trash_at so this collection gets
+ // automatically cleaned up eventually. It used to be
+ // 12 hours but we had a situation where the API
+ // server was down over a weekend but the containers
+ // kept running such that the log collection got
+ // trashed, so now we make it 2 weeks. refs #20378
+ exp := time.Now().Add(time.Duration(24*14) * time.Hour)
updates["trash_at"] = exp
updates["delete_at"] = exp
}
}
// UpdateContainerRunning updates the container state to "Running"
-func (runner *ContainerRunner) UpdateContainerRunning() error {
+ // and records the gateway address in the same API call. If logId
+ // is non-empty, it is also saved as the container's "log"
+ // attribute. If the container has already been cancelled, it
+ // returns ErrCancelled without making the API call.
+func (runner *ContainerRunner) UpdateContainerRunning(logId string) error {
runner.cStateLock.Lock()
defer runner.cStateLock.Unlock()
if runner.cCancelled {
return ErrCancelled
}
- return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
+ updates := arvadosclient.Dict{
+ "gateway_address": runner.gateway.Address,
+ "state": "Running",
+ }
+ if logId != "" {
+ updates["log"] = logId
+ }
+ return runner.DispatcherArvClient.Update(
+ "containers",
+ runner.Container.UUID,
+ arvadosclient.Dict{"container": updates},
+ nil,
+ )
}
// ContainerToken returns the api_token the container (and any
signal.Notify(sigusr2, syscall.SIGUSR2)
defer signal.Stop(sigusr2)
runner.loadPrices()
- go func() {
- for range sigusr2 {
- runner.loadPrices()
- }
- }()
+ go runner.handleSIGUSR2(sigusr2)
runner.finalState = "Queued"
return
}
- err = runner.UpdateContainerRunning()
+ logCollection, err := runner.saveLogCollection(false)
+ var logId string
+ if err == nil {
+ logId = logCollection.PortableDataHash
+ } else {
+ runner.CrunchLog.Printf("Error committing initial log collection: %v", err)
+ }
+ err = runner.UpdateContainerRunning(logId)
if err != nil {
return
}
ContainerUUID: containerUUID,
Target: cr.executor,
Log: cr.CrunchLog,
+ LogCollection: cr.LogCollection,
}
if gwListen == "" {
// Direct connection won't work, so we use the
cr.expectCgroupParent = p
}
+ if conf.EC2SpotCheck {
+ go cr.checkSpotInterruptionNotices()
+ }
+
runerr := cr.Run()
if *memprofile != "" {
return cost
}
+
+// handleSIGUSR2 reloads prices and saves the container's cost, as
+// calculated at the current time, each time a value is received on
+// sigchan. It returns when sigchan is closed. A failed update is
+// logged to CrunchLog and does not stop the loop.
+func (runner *ContainerRunner) handleSIGUSR2(sigchan chan os.Signal) {
+	for range sigchan {
+		runner.loadPrices()
+		update := arvadosclient.Dict{
+			"container": arvadosclient.Dict{
+				"cost": runner.calculateCost(time.Now()),
+			},
+		}
+		err := runner.DispatcherArvClient.Update("containers", runner.Container.UUID, update, nil)
+		if err != nil {
+			runner.CrunchLog.Printf("error updating container cost: %s", err)
+		}
+	}
+}