+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package main
import (
- "crypto/x509"
+ "encoding/json"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "io"
"io/ioutil"
"log"
- "net/http"
"os"
"os/exec"
"os/signal"
- "path"
"strings"
"syscall"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
)
// TaskDef describes how a single task's command is set up. It is decoded
// from JSON using the "task.*" keys shown in the field tags.
//
//   - Env: applied by setupCommand when non-nil; entries are appended to the
//     command's environment in KEY=VALUE form.
//   - Stdin: path opened as the command's standard input, after substitution.
//   - Stdout, Stderr: file names, relative to the output directory, that the
//     corresponding stream is redirected to. When empty the stream is
//     attached to this process's own os.Stdout / os.Stderr.
//   - Vwd: maps a name inside the output directory to a target path (after
//     substitution); setupCommand symlinks each entry, or calls copyFile
//     instead when KeepTmpOutput is set.
//   - SuccessCodes, PermanentFailCodes, TemporaryFailCodes: lists of ints;
//     their use is not visible in this excerpt (presumably exit codes used
//     to classify the command's result — TODO confirm).
//   - KeepTmpOutput: when true, the output directory is taken from the
//     TASK_KEEPMOUNT_TMP environment variable and the output manifest is
//     read back with getKeepTmp rather than produced by WriteTree.
type TaskDef struct {
Env map[string]string `json:"task.env"`
Stdin string `json:"task.stdin"`
Stdout string `json:"task.stdout"`
+ Stderr string `json:"task.stderr"`
Vwd map[string]string `json:"task.vwd"`
SuccessCodes []int `json:"task.successCodes"`
PermanentFailCodes []int `json:"task.permanentFailCodes"`
TemporaryFailCodes []int `json:"task.temporaryFailCodes"`
+ KeepTmpOutput bool `json:"task.keepTmpOutput"`
}
// Tasks is the type of Job.ScriptParameters, i.e. the decoded value of a
// job's "script_parameters" JSON key.
//
// NOTE(review): no fields are visible in this excerpt, yet runner takes
// len() of, indexes and ranges over a Tasks member of this type — the field
// list appears to be elided here; confirm against the full file.
type Tasks struct {
}
// Job is the portion of a job record this program decodes: only the
// "script_parameters" value, which carries the task definitions. main fills
// it with api.Get("jobs", ...) and passes it to runner.
type Job struct {
- Script_parameters Tasks `json:"script_parameters"`
+ ScriptParameters Tasks `json:"script_parameters"`
}
type Task struct {
- Job_uuid string `json:"job_uuid"`
- Created_by_job_task_uuid string `json:"created_by_job_task_uuid"`
- Parameters TaskDef `json:"parameters"`
- Sequence int `json:"sequence"`
- Output string `json:"output"`
- Success bool `json:"success"`
- Progress float32 `json:"sequence"`
+ JobUUID string `json:"job_uuid"`
+ CreatedByJobTaskUUID string `json:"created_by_job_task_uuid"`
+ Parameters TaskDef `json:"parameters"`
+ Sequence int `json:"sequence"`
+ Output string `json:"output"`
+ Success bool `json:"success"`
+ Progress float32 `json:"sequence"`
}
// IArvadosClient is the API-client surface that runner is written against;
// runner receives its client as this interface type.
//
// Update takes a resource type name, the uuid of the record to change, the
// request parameters, and a destination for the response (runner passes nil
// for the latter).
//
// NOTE(review): only Update is visible in this excerpt, but runner also
// calls api.Create — further methods are presumably declared on lines that
// are elided here; confirm against the full file.
type IArvadosClient interface {
Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error)
}
// setupDirectories creates the task's scratch directory and selects its
// output directory.
//
// A "tmpdir" directory is always created under crunchtmpdir with mode 0700.
// When keepTmp is false an "outdir" directory is created next to it and
// returned as outdir. When keepTmp is true no output directory is created;
// the path named by the TASK_KEEPMOUNT_TMP environment variable is returned
// instead.
//
// taskUUID is part of the signature but is not used by this function.
//
// On failure both returned paths are empty and err is non-nil. An unset or
// empty TASK_KEEPMOUNT_TMP with keepTmp enabled is rejected before anything
// is created: an empty outdir would otherwise make the caller run the
// command in the current directory and build output paths such as
// "/"+name.
func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) {
	const keepTmpEnv = "TASK_KEEPMOUNT_TMP"

	if keepTmp {
		outdir = os.Getenv(keepTmpEnv)
		if outdir == "" {
			return "", "", fmt.Errorf("environment variable %s is empty, cannot locate output directory", keepTmpEnv)
		}
	} else {
		outdir = crunchtmpdir + "/outdir"
	}

	tmpdir = crunchtmpdir + "/tmpdir"
	if err = os.Mkdir(tmpdir, 0700); err != nil {
		return "", "", err
	}

	// The keep-backed output directory already exists; only the local one
	// has to be created.
	if !keepTmp {
		if err = os.Mkdir(outdir, 0700); err != nil {
			return "", "", err
		}
	}

	return tmpdir, outdir, nil
}
return nil
}
-func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout string, err error) {
// copyFile copies the contents of the file at src into the file at dst.
//
// src is opened first, so a missing source leaves dst untouched. dst is then
// created, or truncated if it already exists, and the data is streamed
// across with io.Copy.
//
// The returned error is the first failure among opening src, creating dst,
// copying, and closing dst. The close error matters because a write that was
// accepted earlier can still fail when the file is closed; ignoring it would
// report a truncated copy as success.
func copyFile(dst, src string) (err error) {
	in, err := os.Open(src)
	if err != nil {
		return err
	}
	// Read-only handle: a close failure cannot lose data, so it is ignored.
	defer in.Close()

	out, err := os.Create(dst)
	if err != nil {
		return err
	}
	defer func() {
		// Always close, but never mask an earlier copy error.
		if cerr := out.Close(); err == nil {
			err = cerr
		}
	}()

	_, err = io.Copy(out, in)
	return err
}
+
// setupCommand prepares cmd for execution according to taskp.
//
// It populates the output directory from taskp.Vwd, attaches standard input,
// redirects standard output and standard error to files under outdir when
// taskp names them (otherwise the command gets this process's os.Stdout /
// os.Stderr), and extends the command's environment from taskp.Env.
//
// The returned stdin, stdout and stderr strings are the paths that were
// wired up (the stdin path after substitution; outdir + "/" + name for the
// output streams); stdout and stderr are empty when the stream was not
// redirected. On any failure all three are empty and err is non-nil.
//
// Every name placed under outdir is first validated with
// checkOutputFilename.
//
// NOTE(review): the statements guarding the stdin setup and iterating over
// taskp.Env are not visible in this excerpt.
+func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[string]string) (stdin, stdout, stderr string, err error) {
if taskp.Vwd != nil {
for k, v := range taskp.Vwd {
v = substitute(v, replacements)
err = checkOutputFilename(outdir, k)
if err != nil {
- return "", "", err
+ return "", "", "", err
+ }
// NOTE(review): copyFile is declared as copyFile(dst, src), so the call
// below copies FROM outdir+"/"+k INTO v, whereas the symlink branch creates
// outdir+"/"+k pointing at v. The copy arguments look swapped — confirm the
// intended direction.
+ if taskp.KeepTmpOutput {
+ err = copyFile(v, outdir+"/"+k)
+ } else {
+ err = os.Symlink(v, outdir+"/"+k)
+ }
+ if err != nil {
+ return "", "", "", err
}
- os.Symlink(v, outdir+"/"+k)
}
}
stdin = substitute(taskp.Stdin, replacements)
cmd.Stdin, err = os.Open(stdin)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
}
if taskp.Stdout != "" {
err = checkOutputFilename(outdir, taskp.Stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
// Set up stdout redirection
stdout = outdir + "/" + taskp.Stdout
cmd.Stdout, err = os.Create(stdout)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
} else {
cmd.Stdout = os.Stdout
}
- cmd.Stderr = os.Stderr
+ if taskp.Stderr != "" {
+ err = checkOutputFilename(outdir, taskp.Stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ // Set up stderr redirection
+ stderr = outdir + "/" + taskp.Stderr
+ cmd.Stderr, err = os.Create(stderr)
+ if err != nil {
+ return "", "", "", err
+ }
+ } else {
+ cmd.Stderr = os.Stderr
+ }
if taskp.Env != nil {
// Set up subprocess environment
cmd.Env = append(cmd.Env, k+"="+v)
}
}
- return stdin, stdout, nil
+ return stdin, stdout, stderr, nil
}
// Set up signal handlers. Go sends signal notifications to a "signal
return inp
}
+func getKeepTmp(outdir string) (manifest string, err error) {
+ fn, err := os.Open(outdir + "/" + ".arvados#collection")
+ if err != nil {
+ return "", err
+ }
+ defer fn.Close()
+
+ buf, err := ioutil.ReadAll(fn)
+ if err != nil {
+ return "", err
+ }
+ collection := arvados.Collection{}
+ err = json.Unmarshal(buf, &collection)
+ return collection.ManifestText, err
+}
+
// runner carries out one job task. As far as this excerpt shows:
//
//   - When taskStruct.Sequence is 0 and the job defines exactly one task,
//     that definition is used directly. For any other number of definitions
//     one "job_tasks" record is created per definition (Sequence 1, created
//     by taskUUID), the current task is updated with empty output, success
//     true and progress 1.0, and runner returns nil without running a
//     command.
//   - Otherwise the working directories are prepared with setupDirectories,
//     the command's streams are wired up with setupCommand, the command line
//     is logged, and signal handling is installed with setupSignals.
//   - Finally the output manifest is obtained — from getKeepTmp when the
//     task sets KeepTmpOutput, from WriteTree otherwise — and recorded on
//     the task with api.Update.
//
// Failures from record creation, directory setup and manifest collection
// are returned wrapped in TempFail; a setupCommand failure is returned
// unwrapped.
//
// NOTE(review): several stretches of this function are elided from this
// excerpt (including where taskp, cmd and replacements are declared and
// where the command is actually run), as is the end of the function.
func runner(api IArvadosClient,
kc IKeepClient,
- jobUuid, taskUuid, crunchtmpdir, keepmount string,
+ jobUUID, taskUUID, crunchtmpdir, keepmount string,
jobStruct Job, taskStruct Task) error {
var err error
// If this is task 0 and there are multiple tasks, dispatch subtasks
// and exit.
if taskStruct.Sequence == 0 {
- if len(jobStruct.Script_parameters.Tasks) == 1 {
- taskp = jobStruct.Script_parameters.Tasks[0]
+ if len(jobStruct.ScriptParameters.Tasks) == 1 {
+ taskp = jobStruct.ScriptParameters.Tasks[0]
} else {
- for _, task := range jobStruct.Script_parameters.Tasks {
+ for _, task := range jobStruct.ScriptParameters.Tasks {
err := api.Create("job_tasks",
map[string]interface{}{
- "job_task": Task{Job_uuid: jobUuid,
- Created_by_job_task_uuid: taskUuid,
- Sequence: 1,
- Parameters: task}},
+ "job_task": Task{
+ JobUUID: jobUUID,
+ CreatedByJobTaskUUID: taskUUID,
+ Sequence: 1,
+ Parameters: task}},
nil)
if err != nil {
return TempFail{err}
}
}
// NOTE(review): the error assigned from this api.Update call is never
// inspected — the branch returns nil unconditionally, so a failed update of
// the dispatching task goes unreported.
- err = api.Update("job_tasks", taskUuid,
+ err = api.Update("job_tasks", taskUUID,
map[string]interface{}{
- "job_task": Task{
- Output: "",
- Success: true,
- Progress: 1.0}},
+ "job_task": map[string]interface{}{
+ "output": "",
+ "success": true,
+ "progress": 1.0}},
nil)
return nil
}
}
var tmpdir, outdir string
- tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid)
+ tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput)
if err != nil {
return TempFail{err}
}
cmd.Dir = outdir
- var stdin, stdout string
- stdin, stdout, err = setupCommand(cmd, taskp, outdir, replacements)
+ var stdin, stdout, stderr string
+ stdin, stdout, stderr, err = setupCommand(cmd, taskp, outdir, replacements)
if err != nil {
return err
}
// The redirection targets are rewritten into shell-like suffixes purely for
// the log line below.
if stdout != "" {
stdout = " > " + stdout
}
- log.Printf("Running %v%v%v", cmd.Args, stdin, stdout)
+ if stderr != "" {
+ stderr = " 2> " + stderr
+ }
+ log.Printf("Running %v%v%v%v", cmd.Args, stdin, stdout, stderr)
var caughtSignal os.Signal
sigChan := setupSignals(cmd)
}
// Upload output directory
- manifest, err := WriteTree(kc, outdir)
+ var manifest string
+ if taskp.KeepTmpOutput {
+ manifest, err = getKeepTmp(outdir)
+ } else {
+ manifest, err = WriteTree(kc, outdir)
+ }
if err != nil {
return TempFail{err}
}
// Set status
- err = api.Update("job_tasks", taskUuid,
+ err = api.Update("job_tasks", taskUUID,
map[string]interface{}{
"job_task": Task{
Output: manifest,
log.Fatal(err)
}
- // Container may not have certificates installed, so need to look for
- // /etc/arvados/ca-certificates.crt in addition to normal system certs.
- var certFiles = []string{
- "/etc/ssl/certs/ca-certificates.crt", // Debian
- "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat
- "/etc/arvados/ca-certificates.crt",
- }
-
- certs := x509.NewCertPool()
- for _, file := range certFiles {
- data, err := ioutil.ReadFile(file)
- if err == nil {
- log.Printf("Using TLS certificates at %v", file)
- certs.AppendCertsFromPEM(data)
- }
- }
- api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs
-
- jobUuid := os.Getenv("JOB_UUID")
- taskUuid := os.Getenv("TASK_UUID")
+ jobUUID := os.Getenv("JOB_UUID")
+ taskUUID := os.Getenv("TASK_UUID")
tmpdir := os.Getenv("TASK_WORK")
keepmount := os.Getenv("TASK_KEEPMOUNT")
var jobStruct Job
var taskStruct Task
- err = api.Get("jobs", jobUuid, nil, &jobStruct)
+ err = api.Get("jobs", jobUUID, nil, &jobStruct)
if err != nil {
log.Fatal(err)
}
- err = api.Get("job_tasks", taskUuid, nil, &taskStruct)
+ err = api.Get("job_tasks", taskUUID, nil, &taskStruct)
if err != nil {
log.Fatal(err)
}
var kc IKeepClient
- kc, err = keepclient.MakeKeepClient(&api)
+ kc, err = keepclient.MakeKeepClient(api)
if err != nil {
log.Fatal(err)
}
syscall.Umask(0022)
- err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct)
+ err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct)
if err == nil {
os.Exit(0)