import (
"crypto/x509"
+ "encoding/json"
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
- "path"
"strings"
"syscall"
)
Env map[string]string `json:"task.env"`
Stdin string `json:"task.stdin"`
Stdout string `json:"task.stdout"`
+ Stderr string `json:"task.stderr"`
Vwd map[string]string `json:"task.vwd"`
SuccessCodes []int `json:"task.successCodes"`
PermanentFailCodes []int `json:"task.permanentFailCodes"`
TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
+ KeepTmpOutput bool `json:"task.keepTmpOutput"`
}
type Tasks struct {
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
-func setupDirectories(crunchtmpdir, taskUuid string) (tmpdir, outdir string, err error) {
+func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) {
tmpdir = crunchtmpdir + "/tmpdir"
err = os.Mkdir(tmpdir, 0700)
if err != nil {
return "", "", err
}
- outdir = crunchtmpdir + "/outdir"
- err = os.Mkdir(outdir, 0700)
- if err != nil {
- return "", "", err
+ if keepTmp {
+ outdir = os.Getenv("TASK_KEEPMOUNT_TMP")
+ } else {
+ outdir = crunchtmpdir + "/outdir"
+ err = os.Mkdir(outdir, 0700)
+ if err != nil {
+ return "", "", err
+ }
}
return tmpdir, outdir, nil
return nil
}
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
+// copyFile copies the contents of the file at src into the file at dst,
+// creating dst if it does not exist and truncating it if it does.
+//
+// It returns the first error hit while opening src, creating dst, copying
+// the data, or closing dst.
+func copyFile(dst, src string) error {
+	in, err := os.Open(src)
+	if err != nil {
+		return err
+	}
+	defer in.Close()
+
+	out, err := os.Create(dst)
+	if err != nil {
+		return err
+	}
+
+	if _, err = io.Copy(out, in); err != nil {
+		// Release the descriptor; the copy error is the one worth reporting.
+		out.Close()
+		return err
+	}
+	// Close explicitly rather than via defer so that an error reported when
+	// closing the written file is returned instead of silently dropped.
+	return out.Close()
+}
+
+// setupCommand configures cmd for the task described by taskp:
+//   - each taskp.Vwd entry k -> v is materialized as outdir/k, by copying v
+//     when taskp.KeepTmpOutput is set and by symlinking to v otherwise;
+//   - stdin is opened from taskp.Stdin after substitution;
+//   - stdout and stderr are redirected to files under outdir when
+//     taskp.Stdout / taskp.Stderr are non-empty, and otherwise inherit this
+//     process's os.Stdout / os.Stderr;
+//   - taskp.Env entries are appended to cmd.Env.
+//
+// It returns the stdin, stdout and stderr paths it set up (stdout and stderr
+// are empty when not redirected), or the first error encountered.
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
if taskp.Vwd != nil {
for k, v := range taskp.Vwd {
v = substitute(v, replacements)
err = checkOutputFilename(outdir, k)
if err != nil {
- return "", "", err
+ return "", "", "", err
+ }
+ if taskp.KeepTmpOutput {
+ // copyFile takes (dst, src): the vwd source v is copied to outdir/k,
+ // mirroring the symlink branch below where outdir/k points at v.
+ err = copyFile(outdir+"/"+k, v)
+ } else {
+ err = os.Symlink(v, outdir+"/"+k)
+ }
+ if err != nil {
+ return "", "", "", err
}
- os.Symlink(v, outdir+"/"+k)
}
}
stdin = substitute(taskp.Stdin, replacements)
cmd.Stdin, err = os.Open(stdin)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
}
if taskp.Stdout != "" {
err = checkOutputFilename(outdir, taskp.Stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
// Set up stdout redirection
stdout = outdir + "/" + taskp.Stdout
cmd.Stdout, err = os.Create(stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
} else {
cmd.Stdout = os.Stdout
}
- cmd.Stderr = os.Stderr
+ if taskp.Stderr != "" {
+ err = checkOutputFilename(outdir, taskp.Stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ // Set up stderr redirection
+ stderr = outdir + "/" + taskp.Stderr
+ cmd.Stderr, err = os.Create(stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ } else {
+ cmd.Stderr = os.Stderr
+ }
if taskp.Env != nil {
// Set up subprocess environment
cmd.Env = append(cmd.Env, k+"="+v)
}
}
- return stdin, stdout, nil
+ return stdin, stdout, stderr, nil
}
// Set up signal handlers. Go sends signal notifications to a "signal
return inp
}
+func getKeepTmp(outdir string) (manifest string, err error) {
+ fn, err := os.Open(outdir + "/" + ".arvados#collection")
+ if err != nil {
+ return "", err
+ }
+ defer fn.Close()
+
+ buf, err := ioutil.ReadAll(fn)
+ if err != nil {
+ return "", err
+ }
+ collection := arvados.Collection{}
+ err = json.Unmarshal(buf, &collection)
+ return collection.ManifestText, err
+}
+
func runner(api IArvadosClient,
kc IKeepClient,
jobUuid, taskUuid, crunchtmpdir, keepmount string,
}
var tmpdir, outdir string
- tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.KeepTmpOutput)
if err != nil {
return TempFail{err}
}
cmd.Dir = outdir
- var stdin, stdout string
- stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+ var stdin, stdout, stderr string
+ stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
if err != nil {
return err
}
if stdout != "" {
stdout = " > " + stdout
}
- log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+ if stderr != "" {
+ stderr = " 2> " + stderr
+ }
+ log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
var caughtSignal os.Signal
sigChan := setupSignals(cmd)
}
// Upload output directory
- manifest, err := WriteTree(kc, outdir)
+ var manifest string
+ if taskp.KeepTmpOutput {
+ manifest, err = getKeepTmp(outdir)
+ } else {
+ manifest, err = WriteTree(kc, outdir)
+ }
if err != nil {
return TempFail{err}
}
log.Fatal(err)
}
- certpath := path.Join(path.Dir(os.Args[0]), "ca-certificates.crt")
- certdata, err := ioutil.ReadFile(certpath)
- if err == nil {
- log.Printf("Using TLS certificates at %v", certpath)
- certs := x509.NewCertPool()
- certs.AppendCertsFromPEM(certdata)
- api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
+ // Container may not have certificates installed, so need to look for
+ // /etc/arvados/ca-certificates.crt in addition to normal system certs.
+ var certFiles = []string{
+ "/etc/ssl/certs/ca-certificates.crt", // Debian
+ "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat
+ "/etc/arvados/ca-certificates.crt",
+ }
+
+ certs := x509.NewCertPool()
+ for _, file := range certFiles {
+ data, err := ioutil.ReadFile(file)
+ if err == nil {
+ log.Printf("Using TLS certificates at %v", file)
+ certs.AppendCertsFromPEM(data)
+ }
}
+ api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
jobUuid := os.Getenv("JOB_UUID")
taskUuid := os.Getenv("TASK_UUID")
}
var kc IKeepClient
- kc, err = keepclient.MakeKeepClient(&api)
+ kc, err = keepclient.MakeKeepClient(api)
if err != nil {
log.Fatal(err)
}