"git.arvados.org/arvados.git/sdk/go/arvadosclient"
"git.arvados.org/arvados.git/sdk/go/keepclient"
"git.arvados.org/arvados.git/sdk/go/manifest"
+ "golang.org/x/crypto/ssh"
"golang.org/x/net/context"
dockertypes "github.com/docker/docker/api/types"
arvMountLog *ThrottledLogger
containerWatchdogInterval time.Duration
+
+ gatewayAddress string
+ gatewaySSHConfig *ssh.ServerConfig
+ gatewayAuthSecret string
}
// setupSignals sets up signal handling to gracefully terminate the underlying
}
for bind := range runner.SecretMounts {
if _, ok := runner.Container.Mounts[bind]; ok {
- return fmt.Errorf("Secret mount %q conflicts with regular mount", bind)
+ return fmt.Errorf("secret mount %q conflicts with regular mount", bind)
}
if runner.SecretMounts[bind].Kind != "json" &&
runner.SecretMounts[bind].Kind != "text" {
- return fmt.Errorf("Secret mount %q type is %q but only 'json' and 'text' are permitted.",
+ return fmt.Errorf("secret mount %q type is %q but only 'json' and 'text' are permitted",
bind, runner.SecretMounts[bind].Kind)
}
binds = append(binds, bind)
if bind == "stdout" || bind == "stderr" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
- return fmt.Errorf("Unsupported mount kind '%s' for %s. Only 'file' is supported.", mnt.Kind, bind)
+ return fmt.Errorf("unsupported mount kind '%s' for %s: only 'file' is supported", mnt.Kind, bind)
}
// Does path start with OutputPath?
if bind == "stdin" {
// Is it a "collection" mount kind?
if mnt.Kind != "collection" && mnt.Kind != "json" {
- return fmt.Errorf("Unsupported mount kind '%s' for stdin. Only 'collection' or 'json' are supported.", mnt.Kind)
+ return fmt.Errorf("unsupported mount kind '%s' for stdin: only 'collection' and 'json' are supported", mnt.Kind)
}
}
if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
if mnt.Kind != "collection" && mnt.Kind != "text" && mnt.Kind != "json" {
- return fmt.Errorf("Only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
+ return fmt.Errorf("only mount points of kind 'collection', 'text' or 'json' are supported underneath the output_path for %q, was %q", bind, mnt.Kind)
}
}
case mnt.Kind == "collection" && bind != "stdin":
var src string
if mnt.UUID != "" && mnt.PortableDataHash != "" {
- return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+ return fmt.Errorf("cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
}
if mnt.UUID != "" {
if mnt.Writable {
- return fmt.Errorf("Writing to existing collections currently not permitted.")
+ return fmt.Errorf("writing to existing collections currently not permitted")
}
pdhOnly = false
src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
} else if mnt.PortableDataHash != "" {
if mnt.Writable && !strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
- return fmt.Errorf("Can never write to a collection specified by portable data hash")
+ return fmt.Errorf("can never write to a collection specified by portable data hash")
}
idx := strings.Index(mnt.PortableDataHash, "/")
if idx > 0 {
src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
arvMountCmd = append(arvMountCmd, "--mount-tmp")
arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
- tmpcount += 1
+ tmpcount++
}
if mnt.Writable {
if bind == runner.Container.OutputPath {
var tmpdir string
tmpdir, err = runner.MkTempDir(runner.parentTemp, "tmp")
if err != nil {
- return fmt.Errorf("While creating mount temp dir: %v", err)
+ return fmt.Errorf("while creating mount temp dir: %v", err)
}
st, staterr := os.Stat(tmpdir)
if staterr != nil {
- return fmt.Errorf("While Stat on temp dir: %v", staterr)
+ return fmt.Errorf("while Stat on temp dir: %v", staterr)
}
err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
- if staterr != nil {
- return fmt.Errorf("While Chmod temp dir: %v", err)
+ // Test the Chmod result (err): staterr was already handled right after
+ // os.Stat, so checking it here would silently ignore a failed Chmod.
+ if err != nil {
+ return fmt.Errorf("while Chmod temp dir: %v", err)
}
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
if bind == runner.Container.OutputPath {
}
if runner.HostOutputDir == "" {
- return fmt.Errorf("Output path does not correspond to a writable mount point")
+ return fmt.Errorf("output path does not correspond to a writable mount point")
}
if wantAPI := runner.Container.RuntimeConstraints.API; needCertMount && wantAPI != nil && *wantAPI {
runner.ArvMount, err = runner.RunArvMount(arvMountCmd, token)
if err != nil {
- return fmt.Errorf("While trying to start arv-mount: %v", err)
+ return fmt.Errorf("while trying to start arv-mount: %v", err)
}
for _, p := range collectionPaths {
_, err = os.Stat(p)
if err != nil {
- return fmt.Errorf("While checking that input files exist: %v", err)
+ return fmt.Errorf("while checking that input files exist: %v", err)
}
}
for _, cp := range copyFiles {
st, err := os.Stat(cp.src)
if err != nil {
- return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
}
if st.IsDir() {
err = filepath.Walk(cp.src, func(walkpath string, walkinfo os.FileInfo, walkerr error) error {
}
return os.Chmod(target, walkinfo.Mode()|os.ModeSetgid|0777)
} else {
- return fmt.Errorf("Source %q is not a regular file or directory", cp.src)
+ return fmt.Errorf("source %q is not a regular file or directory", cp.src)
}
})
} else if st.Mode().IsRegular() {
}
}
if err != nil {
- return fmt.Errorf("While staging writable file from %q to %q: %v", cp.src, cp.bind, err)
+ return fmt.Errorf("while staging writable file from %q to %q: %v", cp.src, cp.bind, err)
}
}
// If stdin mount is provided, attach it to the docker container
var stdinRdr arvados.File
- var stdinJson []byte
+ var stdinJSON []byte
if stdinMnt, ok := runner.Container.Mounts["stdin"]; ok {
if stdinMnt.Kind == "collection" {
var stdinColl arvados.Collection
- collId := stdinMnt.UUID
- if collId == "" {
- collId = stdinMnt.PortableDataHash
+ collID := stdinMnt.UUID
+ if collID == "" {
+ collID = stdinMnt.PortableDataHash
}
- err = runner.ContainerArvClient.Get("collections", collId, nil, &stdinColl)
+ err = runner.ContainerArvClient.Get("collections", collID, nil, &stdinColl)
if err != nil {
return fmt.Errorf("While getting stdin collection: %v", err)
}
return fmt.Errorf("While getting stdin collection path %v: %v", stdinMnt.Path, err)
}
} else if stdinMnt.Kind == "json" {
- stdinJson, err = json.Marshal(stdinMnt.Content)
+ stdinJSON, err = json.Marshal(stdinMnt.Content)
if err != nil {
return fmt.Errorf("While encoding stdin json data: %v", err)
}
}
}
- stdinUsed := stdinRdr != nil || len(stdinJson) != 0
+ stdinUsed := stdinRdr != nil || len(stdinJSON) != 0
response, err := runner.Docker.ContainerAttach(context.TODO(), runner.ContainerID,
dockertypes.ContainerAttachOptions{Stream: true, Stdin: stdinUsed, Stdout: true, Stderr: true})
if err != nil {
stdinRdr.Close()
response.CloseWrite()
}()
- } else if len(stdinJson) != 0 {
+ } else if len(stdinJSON) != 0 {
go func() {
- _, err := io.Copy(response.Conn, bytes.NewReader(stdinJson))
+ _, err := io.Copy(response.Conn, bytes.NewReader(stdinJSON))
if err != nil {
runner.CrunchLog.Printf("While writing stdin json to docker container: %v", err)
runner.stop(nil)
return ErrCancelled
}
return runner.DispatcherArvClient.Update("containers", runner.Container.UUID,
- arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
+ arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running", "gateway_address": runner.gatewayAddress}}, nil)
}
// ContainerToken returns the api_token the container (and any
}
}
- containerId := flags.Arg(0)
+ containerID := flags.Arg(0)
switch {
case *detach && !ignoreDetachFlag:
- return Detach(containerId, prog, args, os.Stdout, os.Stderr)
+ return Detach(containerID, prog, args, os.Stdout, os.Stderr)
case *kill >= 0:
- return KillProcess(containerId, syscall.Signal(*kill), os.Stdout, os.Stderr)
+ return KillProcess(containerID, syscall.Signal(*kill), os.Stdout, os.Stderr)
case *list:
return ListProcesses(os.Stdout, os.Stderr)
}
- if containerId == "" {
+ if containerID == "" {
log.Printf("usage: %s [options] UUID", prog)
return 1
}
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Printf("%s: %v", containerId, err)
+ log.Printf("%s: %v", containerID, err)
return 1
}
api.Retries = 8
kc, kcerr := keepclient.MakeKeepClient(api)
if kcerr != nil {
- log.Printf("%s: %v", containerId, kcerr)
+ log.Printf("%s: %v", containerID, kcerr)
return 1
}
kc.BlockCache = &keepclient.BlockCache{MaxBlocks: 2}
// minimum version we want to support.
docker, dockererr := dockerclient.NewClient(dockerclient.DefaultDockerHost, "1.21", nil, nil)
- cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerId)
+ cr, err := NewContainerRunner(arvados.NewClientFromEnv(), api, kc, docker, containerID)
if err != nil {
log.Print(err)
return 1
}
if dockererr != nil {
- cr.CrunchLog.Printf("%s: %v", containerId, dockererr)
+ cr.CrunchLog.Printf("%s: %v", containerID, dockererr)
cr.checkBrokenNode(dockererr)
cr.CrunchLog.Close()
return 1
}
- parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerId+".")
+ cr.gatewayAuthSecret = os.Getenv("GatewayAuthSecret")
+ os.Unsetenv("GatewayAuthSecret")
+ err = cr.startGatewayServer()
+ if err != nil {
+ log.Printf("error starting gateway server: %s", err)
+ return 1
+ }
+
+ parentTemp, tmperr := cr.MkTempDir("", "crunch-run."+containerID+".")
if tmperr != nil {
- log.Printf("%s: %v", containerId, tmperr)
+ log.Printf("%s: %v", containerID, tmperr)
return 1
}
}
if runerr != nil {
- log.Printf("%s: %v", containerId, runerr)
+ log.Printf("%s: %v", containerID, runerr)
return 1
}
return 0