10666: Merge branch 'master' into 10666-report-version
author Lucas Di Pentima <ldipentima@veritasgenetics.com>
Mon, 4 Dec 2017 19:53:20 +0000 (16:53 -0300)
committer Lucas Di Pentima <ldipentima@veritasgenetics.com>
Mon, 4 Dec 2017 19:53:46 +0000 (16:53 -0300)
Arvados-DCO-1.1-Signed-off-by: Lucas Di Pentima <ldipentima@veritasgenetics.com>

1  2 
services/crunch-run/crunchrun.go
services/keep-web/handler.go
services/keepproxy/keepproxy.go
services/keepstore/keepstore.go

index 41027817accb94625175c2ce9f91d5ac44b113ae,fc0dda718ceda7fddd2e1f41c704fa434fdb0034..0980ba11503d0faffa4134db5022431448b561b6
@@@ -39,8 -39,6 +39,8 @@@ import 
        dockerclient "github.com/docker/docker/client"
  )
  
 +var version = "dev"
 +
  // IArvadosClient is the minimal Arvados API methods used by crunch-run.
  type IArvadosClient interface {
        Create(resourceType string, parameters arvadosclient.Dict, output interface{}) error
@@@ -230,6 -228,32 +230,32 @@@ func (runner *ContainerRunner) stopSign
        }
  }
  
+ // errorBlacklist lists substrings of error messages which suggest that
+ // the node is unable to run containers.
+ var errorBlacklist = []string{"Cannot connect to the Docker daemon"}
+ // brokenNodeHook is the value of the -broken-node-hook flag: a script to
+ // execute when such an error is seen. When empty, detection is only logged.
+ var brokenNodeHook = flag.String("broken-node-hook", "", "Script to run if node is detected to be broken (for example, Docker daemon is not running)")
+ // checkBrokenNode reports whether goterr's message contains any entry of
+ // errorBlacklist. On a match it logs the error to runner.CrunchLog and,
+ // if a broken node hook is configured, runs it with stdout and stderr
+ // sent to runner.CrunchLog. A failing hook is logged but does not change
+ // the result. A nil goterr never matches.
+ func (runner *ContainerRunner) checkBrokenNode(goterr error) bool {
+       if goterr == nil {
+               return false
+       }
+       // Render the message once rather than once per blacklist entry.
+       msg := goterr.Error()
+       for _, d := range errorBlacklist {
+               if !strings.Contains(msg, d) {
+                       continue
+               }
+               runner.CrunchLog.Printf("Error suggests node is unable to run containers: %v", goterr)
+               if *brokenNodeHook == "" {
+                       runner.CrunchLog.Printf("No broken node hook provided, cannot mark node as broken.")
+                       return true
+               }
+               runner.CrunchLog.Printf("Running broken node hook %q", *brokenNodeHook)
+               // The hook is invoked without arguments.
+               c := exec.Command(*brokenNodeHook)
+               c.Stdout = runner.CrunchLog
+               c.Stderr = runner.CrunchLog
+               if err := c.Run(); err != nil {
+                       runner.CrunchLog.Printf("Error running broken node hook: %v", err)
+               }
+               return true
+       }
+       return false
+ }
  // LoadImage determines the docker image id from the container record and
  // checks if it is available in the local Docker image store.  If not, it loads
  // the image from Keep.
@@@ -618,7 -642,7 +644,7 @@@ type infoCommand struct 
        cmd   []string
  }
  
 -// Gather node information and store it on the log for debugging
 +// LogNodeInfo gathers node information and stores it in the log for debugging
  // purposes.
  func (runner *ContainerRunner) LogNodeInfo() (err error) {
        w := runner.NewLogWriter("node-info")
        return nil
  }
  
 -// Get and save the raw JSON container record from the API server
 +// LogContainerRecord gets and saves the raw JSON container record from the API server
  func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
                ArvClient:     runner.ArvClient,
@@@ -1420,7 -1444,6 +1446,7 @@@ func (runner *ContainerRunner) NewArvLo
  
  // Run the full container lifecycle.
  func (runner *ContainerRunner) Run() (err error) {
 +      runner.CrunchLog.Printf("crunch-run %s started", version)
        runner.CrunchLog.Printf("Executing container '%s'", runner.Container.UUID)
  
        hostname, hosterr := os.Hostname()
        // check for and/or load image
        err = runner.LoadImage()
        if err != nil {
-               runner.finalState = "Cancelled"
+               if !runner.checkBrokenNode(err) {
+                       // Failed to load image but not due to a "broken node"
+                       // condition, probably user error.
+                       runner.finalState = "Cancelled"
+               }
                err = fmt.Errorf("While loading container image: %v", err)
                return
        }
                return
        }
  
-       runner.StartCrunchstat()
        if runner.IsCancelled() {
                return
        }
        }
        runner.finalState = "Cancelled"
  
+       runner.StartCrunchstat()
        err = runner.StartContainer()
        if err != nil {
+               runner.checkBrokenNode(err)
                return
        }
  
@@@ -1596,17 -1624,8 +1627,17 @@@ func main() 
                `Set networking mode for container.  Corresponds to Docker network mode (--net).
        `)
        memprofile := flag.String("memprofile", "", "write memory profile to `file` after running container")
 +      getVersion := flag.Bool("version", false, "Print version information and exit.")
        flag.Parse()
  
 +      // Print version information if requested
 +      if *getVersion {
 +              fmt.Printf("crunch-run %s\n", version)
 +              return
 +      }
 +
 +      log.Printf("crunch-run %s started", version)
 +
        containerId := flag.Arg(0)
  
        if *caCertsPath != "" {
        }
        api.Retries = 8
  
-       var kc *keepclient.KeepClient
-       kc, err = keepclient.MakeKeepClient(api)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
+       kc, kcerr := keepclient.MakeKeepClient(api)
+       if kcerr != nil {
+               log.Fatalf("%s: %v", containerId, kcerr)
        }
        kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
        kc.Retries = 4
  
-       var docker *dockerclient.Client
        // API version 1.21 corresponds to Docker 1.9, which is currently the
        // minimum version we want to support.
-       docker, err = dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
-       if err != nil {
-               log.Fatalf("%s: %v", containerId, err)
-       }
+       docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
        dockerClientProxy := ThinDockerClientProxy{Docker: docker}
  
        cr := NewContainerRunner(api, kc, dockerClientProxy, containerId)
+       if dockererr != nil {
+               cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+               cr.checkBrokenNode(dockererr)
+               cr.CrunchLog.Close()
+               os.Exit(1)
+       }
        cr.statInterval = *statInterval
        cr.cgroupRoot = *cgroupRoot
        cr.expectCgroupParent = *cgroupParent
index 7bdd521226a5ab8789862a2e5d3f3f274a059493,4222e3822e14d9d35fbccc84c58b5f8f0ca45182..a1476d3a8eb1b62fad8ea519702ec290e7f3472c
@@@ -10,6 -10,7 +10,7 @@@ import 
        "html"
        "html/template"
        "io"
+       "log"
        "net/http"
        "net/url"
        "os"
@@@ -90,18 -91,70 +91,72 @@@ func (h *handler) setup() 
  // serveStatus writes a JSON encoding of the cache statistics returned by
  // h.Config.Cache.Stats(), together with the version string, to w.
  // NOTE(review): the error returned by Encode is discarded.
  func (h *handler) serveStatus(w http.ResponseWriter, r *http.Request) {
        status := struct {
                cacheStats
 +              Version string
        }{
                cacheStats: h.Config.Cache.Stats(),
 +              Version:    version,
        }
        json.NewEncoder(w).Encode(status)
  }
  
+ // updateOnSuccess wraps httpserver.ResponseWriter. If the handler
+ // sends an HTTP header indicating success (2xx or 3xx), it first calls
+ // the provided update func. If update fails, an error response is sent
+ // instead (500, or the status code of an *arvados.TransactionError),
+ // and subsequent Write calls return the update error.
+ type updateOnSuccess struct {
+       httpserver.ResponseWriter
+       update     func() error // called by WriteHeader before the first success header is forwarded
+       sentHeader bool         // set on the first WriteHeader call
+       err        error        // result of update; once non-nil, Write returns it
+ }
+ func (uos *updateOnSuccess) Write(p []byte) (int, error) {
+       if uos.err != nil {
+               return 0, uos.err
+       }
+       if !uos.sentHeader {
+               uos.WriteHeader(http.StatusOK)
+       }
+       return uos.ResponseWriter.Write(p)
+ }
+ func (uos *updateOnSuccess) WriteHeader(code int) {
+       if !uos.sentHeader {
+               uos.sentHeader = true
+               if code >= 200 && code < 400 {
+                       if uos.err = uos.update(); uos.err != nil {
+                               code := http.StatusInternalServerError
+                               if err, ok := uos.err.(*arvados.TransactionError); ok {
+                                       code = err.StatusCode
+                               }
+                               log.Printf("update() changes response to HTTP %d: %T %q", code, uos.err, uos.err)
+                               http.Error(uos.ResponseWriter, uos.err.Error(), code)
+                               return
+                       }
+               }
+       }
+       uos.ResponseWriter.WriteHeader(code)
+ }
  var (
+       writeMethod = map[string]bool{
+               "COPY":   true,
+               "DELETE": true,
+               "MKCOL":  true,
+               "MOVE":   true,
+               "PUT":    true,
+               "RMCOL":  true,
+       }
        webdavMethod = map[string]bool{
+               "COPY":     true,
+               "DELETE":   true,
+               "MKCOL":    true,
+               "MOVE":     true,
                "OPTIONS":  true,
                "PROPFIND": true,
+               "PUT":      true,
+               "RMCOL":    true,
        }
        browserMethod = map[string]bool{
                "GET":  true,
@@@ -149,7 -202,7 +204,7 @@@ func (h *handler) ServeHTTP(wOrig http.
                        return
                }
                w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Range")
-               w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PROPFIND")
+               w.Header().Set("Access-Control-Allow-Methods", "COPY, DELETE, GET, MKCOL, MOVE, OPTIONS, POST, PROPFIND, PUT, RMCOL")
                w.Header().Set("Access-Control-Allow-Origin", "*")
                w.Header().Set("Access-Control-Max-Age", "86400")
                statusCode = http.StatusOK
        }
        applyContentDispositionHdr(w, r, basename, attachment)
  
-       fs := collection.FileSystem(&arvados.Client{
+       client := &arvados.Client{
                APIHost:   arv.ApiServer,
                AuthToken: arv.ApiToken,
                Insecure:  arv.ApiInsecure,
-       }, kc)
+       }
+       fs, err := collection.FileSystem(client, kc)
+       if err != nil {
+               statusCode, statusText = http.StatusInternalServerError, err.Error()
+               return
+       }
+       targetIsPDH := arvadosclient.PDHMatch(targetID)
+       if targetIsPDH && writeMethod[r.Method] {
+               statusCode, statusText = http.StatusMethodNotAllowed, errReadOnly.Error()
+               return
+       }
        if webdavMethod[r.Method] {
+               if writeMethod[r.Method] {
+                       // Save the collection only if/when all
+                       // webdav->filesystem operations succeed --
+                       // and send a 500 error if the modified
+                       // collection can't be saved.
+                       w = &updateOnSuccess{
+                               ResponseWriter: w,
+                               update: func() error {
+                                       return h.Config.Cache.Update(client, *collection, fs)
+                               }}
+               }
                h := webdav.Handler{
-                       Prefix:     "/" + strings.Join(pathParts[:stripParts], "/"),
-                       FileSystem: &webdavFS{collfs: fs},
+                       Prefix: "/" + strings.Join(pathParts[:stripParts], "/"),
+                       FileSystem: &webdavFS{
+                               collfs:  fs,
+                               writing: writeMethod[r.Method],
+                       },
                        LockSystem: h.webdavLS,
                        Logger: func(_ *http.Request, err error) {
-                               if os.IsNotExist(err) {
-                                       statusCode, statusText = http.StatusNotFound, err.Error()
-                               } else if err != nil {
-                                       statusCode, statusText = http.StatusInternalServerError, err.Error()
+                               if err != nil {
+                                       log.Printf("error from webdav handler: %q", err)
                                }
                        },
                }
index 23a22a86a328847d772daea8e44af90ef316f375,e2a6221f10e28981ea0c3f64fa5ce6d52ac718ea..145b39d4c3d1e643983c6f517eb31ff2c8d417fd
@@@ -10,7 -10,6 +10,6 @@@ import 
        "fmt"
        "io"
        "io/ioutil"
-       "log"
        "net"
        "net/http"
        "os"
        "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
        "git.curoverse.com/arvados.git/sdk/go/config"
        "git.curoverse.com/arvados.git/sdk/go/health"
+       "git.curoverse.com/arvados.git/sdk/go/httpserver"
        "git.curoverse.com/arvados.git/sdk/go/keepclient"
+       log "github.com/Sirupsen/logrus"
        "github.com/coreos/go-systemd/daemon"
        "github.com/ghodss/yaml"
        "github.com/gorilla/mux"
  )
  
 +var version = "dev"
 +
  type Config struct {
        Client          arvados.Client
        Listen          string
@@@ -57,7 -56,13 +58,13 @@@ var 
        router   http.Handler
  )
  
+ const rfc3339NanoFixed = "2006-01-02T15:04:05.000000000Z07:00" // log timestamp layout: RFC 3339 with a fixed nine-digit fractional second
  func main() {
+       log.SetFormatter(&log.JSONFormatter{
+               TimestampFormat: rfc3339NanoFixed,
+       })
        cfg := DefaultConfig()
  
        flagset := flag.NewFlagSet("keepproxy", flag.ExitOnError)
        const defaultCfgPath = "/etc/arvados/keepproxy/keepproxy.yml"
        flagset.StringVar(&cfgPath, "config", defaultCfgPath, "Configuration file `path`")
        dumpConfig := flagset.Bool("dump-config", false, "write current configuration to stdout and exit")
 +      getVersion := flagset.Bool("version", false, "Print version information and exit.")
        flagset.Parse(os.Args[1:])
  
 +      // Print version information if requested
 +      if *getVersion {
 +              fmt.Printf("keepproxy %s\n", version)
 +              return
 +      }
 +
        err := config.LoadFile(cfg, cfgPath)
        if err != nil {
                h := os.Getenv("ARVADOS_API_HOST")
                log.Fatal(config.DumpAndExit(cfg))
        }
  
 +      log.Printf("keepproxy %s started", version)
 +
        arv, err := arvadosclient.New(&cfg.Client)
        if err != nil {
                log.Fatalf("Error setting up arvados client %s", err.Error())
  
        // Start serving requests.
        router = MakeRESTRouter(!cfg.DisableGet, !cfg.DisablePut, kc, time.Duration(cfg.Timeout), cfg.ManagementToken)
-       http.Serve(listener, router)
+       http.Serve(listener, httpserver.AddRequestIDs(httpserver.LogRequests(router)))
  
        log.Println("shutting down")
  }
@@@ -607,7 -603,8 +614,8 @@@ func (h *proxyHandler) makeKeepClient(r
                        Timeout:   h.timeout,
                        Transport: h.transport,
                },
-               proto: req.Proto,
+               proto:     req.Proto,
+               requestID: req.Header.Get("X-Request-Id"),
        }
        return &kc
  }
index f3d30d9831a10c5aaf2f4c5d997e66c3577bb39f,e422179f643e9cad438742cd2aa450d52ae84fa4..b8a0ffb1cba46777ff1e2d1c745eb8102ea5fa61
@@@ -22,8 -22,6 +22,8 @@@ import 
        "github.com/coreos/go-systemd/daemon"
  )
  
 +var version = "dev"
 +
  // A Keep "block" is 64MB.
  const BlockSize = 64 * 1024 * 1024
  
@@@ -91,7 -89,6 +91,7 @@@ func main() 
        deprecated.beforeFlagParse(theConfig)
  
        dumpConfig := flag.Bool("dump-config", false, "write current configuration to stdout and exit (useful for migrating from command line flags to config file)")
 +      getVersion := flag.Bool("version", false, "Print version information and exit.")
  
        defaultConfigPath := "/etc/arvados/keepstore/keepstore.yml"
        var configPath string
        flag.Usage = usage
        flag.Parse()
  
 +      // Print version information if requested
 +      if *getVersion {
 +              fmt.Printf("keepstore %s\n", version)
 +              return
 +      }
 +
        deprecated.afterFlagParse(theConfig)
  
        err := config.LoadFile(theConfig, configPath)
                log.Fatal(config.DumpAndExit(theConfig))
        }
  
 +      log.Printf("keepstore %s started", version)
 +
        err = theConfig.Start()
        if err != nil {
                log.Fatal(err)
        // Start a round-robin VolumeManager with the volumes we have found.
        KeepVM = MakeRRVolumeManager(theConfig.Volumes)
  
-       // Middleware stack: logger, MaxRequests limiter, method handlers
+       // Middleware/handler stack
        router := MakeRESTRouter()
        limiter := httpserver.NewRequestLimiter(theConfig.MaxRequests, router)
        router.limiter = limiter
-       http.Handle("/", &LoggingRESTRouter{router: limiter})
+       http.Handle("/", httpserver.AddRequestIDs(httpserver.LogRequests(limiter)))
  
        // Set up a TCP listener.
        listener, err := net.Listen("tcp", theConfig.Listen)