import (
"bufio"
+ "bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
+ "net/http"
"net/url"
"os"
"regexp"
}()
neednewline := ""
+ logTell := map[string]int64{}
lastState := cr.State
refreshCR := func() {
log.Printf("subscribe container UUID: %s", cr.ContainerUUID)
client.Subscribe(logch, cr.ContainerUUID)
subscribedUUID = cr.ContainerUUID
+ logTell = map[string]int64{}
}
}
+ var logWaitMax = time.Second * 10
+ var logWaitMin = time.Second
+ var logWait = logWaitMin
+ var logWaitDone = time.After(logWait)
var reCrunchstat = regexp.MustCompile(`mem .* (\d+) rss`)
waitctr:
for cr.State != arvados.ContainerRequestStateFinal {
case <-refreshTicker.C:
refreshCR()
case msg := <-logch:
- switch msg.EventType {
- case "update":
+ if msg.EventType == "update" {
refreshCR()
- case "stderr":
- for _, line := range strings.Split(msg.Properties.Text, "\n") {
- if line != "" {
+ }
+ case <-logWaitDone:
+ any := false
+ for _, fnm := range []string{"stderr.txt", "crunchstat.txt"} {
+ req, err := http.NewRequest("GET", "https://"+runner.Client.APIHost+"/arvados/v1/container_requests/"+cr.UUID+"/log/"+cr.ContainerUUID+"/"+fnm, nil)
+ if err != nil {
+ log.Errorf("error preparing log request: %s", err)
+ continue
+ }
+ req.Header.Set("Range", fmt.Sprintf("bytes=%d-", logTell[fnm]))
+ resp, err := runner.Client.Do(req)
+ if err != nil {
+ log.Errorf("error getting log data: %s", err)
+ continue
+ } else if (resp.StatusCode == http.StatusNotFound && logTell[fnm] == 0) ||
+ (resp.StatusCode == http.StatusRequestedRangeNotSatisfiable && logTell[fnm] > 0) {
+ continue
+ } else if resp.StatusCode >= 300 {
+ log.Errorf("error getting log data: %s", resp.Status)
+ continue
+ }
+ logdata, err := io.ReadAll(resp.Body)
+ if err != nil {
+ log.Errorf("error reading log data: %s", err)
+ continue
+ }
+ if len(logdata) == 0 {
+ continue
+ }
+ for {
+ eol := bytes.Index(logdata, []byte{'\n'})
+ if eol < 0 {
+ break
+ }
+ line := string(logdata[:eol])
+ logdata = logdata[eol+1:]
+ logTell[fnm] += int64(eol + 1)
+ if len(line) == 0 {
+ continue
+ }
+ any = true
+ if fnm == "stderr.txt" {
fmt.Fprint(os.Stderr, neednewline)
neednewline = ""
log.Print(line)
+ } else if fnm == "crunchstat.txt" {
+ m := reCrunchstat.FindStringSubmatch(line)
+ if m != nil {
+ rss, _ := strconv.ParseInt(m[1], 10, 64)
+ fmt.Fprintf(os.Stderr, "%s rss %.3f GB \r", cr.UUID, float64(rss)/1e9)
+ neednewline = "\n"
+ }
}
}
- case "crunchstat":
- for _, line := range strings.Split(msg.Properties.Text, "\n") {
- m := reCrunchstat.FindStringSubmatch(line)
- if m != nil {
- rss, _ := strconv.ParseInt(m[1], 10, 64)
- fmt.Fprintf(os.Stderr, "%s rss %.3f GB \r", cr.UUID, float64(rss)/1e9)
- neednewline = "\n"
- }
+ }
+ if any {
+ logWait = logWaitMin
+ } else {
+ logWait = logWait * 2
+ if logWait > logWaitMax {
+ logWait = logWaitMax
}
}
+ logWaitDone = time.After(logWait)
}
}
fmt.Fprint(os.Stderr, neednewline)