arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
+
+ gateway Gateway
}
// setupSignals sets up signal handling to gracefully terminate the underlying
src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
arvMountCmd = append(arvMountCmd, "--mount-tmp")
arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
- tmpcount += 1
+ tmpcount++
}
if mnt.Writable {
if bind == runner.Container.OutputPath {
return fmt.Errorf("output path does not correspond to a writable mount point")
}
- if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
+ if needCertMount && runner.Container.RuntimeConstraints.API {
for _, certfile := range arvadosclient.CertFiles {
_, err := os.Stat(certfile)
if err == nil {
// If stdin mount is provided, attach it to the docker container
var stdinRdr arvados.File
- var stdinJson []byte
+ var stdinJSON []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
if stdinMnt.Kind == "collection" {
var stdinColl arvados.Collection
- collId := stdinMnt.UUID
- if collId == "" {
- collId = stdinMnt.PortableDataHash
+ collID := stdinMnt.UUID
+ if collID == "" {
+ collID = stdinMnt.PortableDataHash
}
- err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
+ err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
if err != nil {
return fmt.Errorf("While getting stdin collection: %v", err)
}
return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
}
} else if stdinMnt.Kind == "json" {
- stdinJson, err = json.Marshal(stdinMnt.Content)
+ stdinJSON, err = json.Marshal(stdinMnt.Content)
if err != nil {
return fmt.Errorf("While encoding stdin json data: %v", err)
}
}
}
- stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+ stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
stdinRdr.Close()
response.CloseWrite()
}()
- } else if len(stdinJson) != 0 {
+ } else if len(stdinJSON) != 0 {
go func() {
- _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+ _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
if err != nil {
runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
runner.stop(nil)
},
}
- if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+ if runner.Container.RuntimeConstraints.API {
tok, err := runner.ContainerToken()
if err != nil {
return err
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput() error {
- if wantAPI := runner.Container.RuntimeConstraints.API; wantAPI != nil && *wantAPI {
+ if runner.Container.RuntimeConstraints.API {
// Output may have been set directly by the container, so
// refresh the container record to check.
err := runner.DispatcherArvClient.Get("containers", runner.Container.UUID,
// Already finalized.
return
}
- mt, err := runner.LogCollection.MarshalManifest(".")
- if err != nil {
- err = fmt.Errorf("error creating log manifest: %v", err)
- return
- }
updates := arvadosclient.Dict{
- "name": "logs for " + runner.Container.UUID,
- "manifest_text": mt,
+ "name": "logs for " + runner.Container.UUID,
}
+ mt, err1 := runner.LogCollection.MarshalManifest(".")
+ if err1 == nil {
+ // Only send updated manifest text if there was no
+ // error.
+ updates["manifest_text"] = mt
+ }
+
+ // Even if flushing the manifest had an error, we still want
+ // to update the log record, if possible, to push the trash_at
+ // and delete_at times into the future. Details on bug
+ // #17293.
if final {
updates["is_trashed"] = true
} else {
updates["delete_at"] = exp
}
reqBody := arvadosclient.Dict{"collection": updates}
+ var err2 error
if runner.logUUID == "" {
reqBody["ensure_unique_name"] = true
- err = runner.DispatcherArvClient.Create("collections", reqBody, &response)
+ err2 = runner.DispatcherArvClient.Create("collections", reqBody, &response)
} else {
- err = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
+ err2 = runner.DispatcherArvClient.Update("collections", runner.logUUID, reqBody, &response)
}
- if err != nil {
- return
+ if err2 == nil {
+ runner.logUUID = response.UUID
+ }
+
+ if err1 != nil || err2 != nil {
+ err = fmt.Errorf("error recording logs: %q, %q", err1, err2)
}
- runner.logUUID = response.UUID
return
}
return ErrCancelled
}
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
+ arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gateway.Address}}, nil)
}
// ContainerToken returns the api_token the container (and any
}
}
- containerId := flags.Arg(0)
+ containerID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerId, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerID, prog, args, os.Stdout, os.Stderr)
case *kill >= 0:
- return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
+ return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
return ListProcesses(os.Stdout, os.Stderr)
}
- if containerId == "" {
+ if containerID == "" {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Printf("%s: %v", containerId, err)
+ log.Printf("%s: %v", containerID, err)
return 1
}
api.Retries = 8
kc, kcerr := keepclient.MakeKeepClient(api)
if kcerr != nil {
- log.Printf("%s: %v", containerId, kcerr)
+ log.Printf("%s: %v", containerID, kcerr)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+ cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
if err != nil {
log.Print(err)
return 1
}
if dockererr != nil {
- cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
cr.checkBrokenNode(dockererr)
cr.CrunchLog.Close()
return 1
}
- parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+ cr.gateway = Gateway{
+ Address: os.Getenv("GatewayAddress"),
+ AuthSecret: os.Getenv("GatewayAuthSecret"),
+ ContainerUUID: containerID,
+ DockerContainerID: &cr.ContainerID,
+ Log: cr.CrunchLog,
+ }
+ os.Unsetenv("GatewayAuthSecret")
+ err = cr.gateway.Start()
+ if err != nil {
+ log.Printf("error starting gateway server: %s", err)
+ return 1
+ }
+
+ parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
if tmperr != nil {
- log.Printf("%s: %v", containerId, tmperr)
+ log.Printf("%s: %v", containerID, tmperr)
return 1
}
}
if runerr != nil {
- log.Printf("%s: %v", containerId, runerr)
+ log.Printf("%s: %v", containerID, runerr)
return 1
}
return 0