X-Git-Url: https://git.arvados.org/arvados.git/blobdiff_plain/0d3dd79bff8665c36e2442a76a9f7bb700702101..2333472a4f517a227278f028bbbc4e72687c0e71:/sdk/go/crunchrunner/crunchrunner.go diff --git a/sdk/go/crunchrunner/crunchrunner.go b/sdk/go/crunchrunner/crunchrunner.go index 114c49c349..999a19371d 100644 --- a/sdk/go/crunchrunner/crunchrunner.go +++ b/sdk/go/crunchrunner/crunchrunner.go @@ -1,21 +1,20 @@ package main import ( - "crypto/x509" "encoding/json" "fmt" - "git.curoverse.com/arvados.git/sdk/go/arvados" - "git.curoverse.com/arvados.git/sdk/go/arvadosclient" - "git.curoverse.com/arvados.git/sdk/go/keepclient" "io" "io/ioutil" "log" - "net/http" "os" "os/exec" "os/signal" "strings" "syscall" + + "git.curoverse.com/arvados.git/sdk/go/arvados" + "git.curoverse.com/arvados.git/sdk/go/arvadosclient" + "git.curoverse.com/arvados.git/sdk/go/keepclient" ) type TaskDef struct { @@ -28,7 +27,7 @@ type TaskDef struct { SuccessCodes []int `json:"task.successCodes"` PermanentFailCodes []int `json:"task.permanentFailCodes"` TemporaryFailCodes []int `json:"task.temporaryFailCodes"` - keepTmpOutput bool `json:"task.keepTmpOutput"` + KeepTmpOutput bool `json:"task.keepTmpOutput"` } type Tasks struct { @@ -36,17 +35,17 @@ type Tasks struct { } type Job struct { - Script_parameters Tasks `json:"script_parameters"` + ScriptParameters Tasks `json:"script_parameters"` } type Task struct { - Job_uuid string `json:"job_uuid"` - Created_by_job_task_uuid string `json:"created_by_job_task_uuid"` - Parameters TaskDef `json:"parameters"` - Sequence int `json:"sequence"` - Output string `json:"output"` - Success bool `json:"success"` - Progress float32 `json:"sequence"` + JobUUID string `json:"job_uuid"` + CreatedByJobTaskUUID string `json:"created_by_job_task_uuid"` + Parameters TaskDef `json:"parameters"` + Sequence int `json:"sequence"` + Output string `json:"output"` + Success bool `json:"success"` + Progress float32 `json:"sequence"` } type IArvadosClient interface { @@ -54,7 +53,7 @@ type IArvadosClient interface { Update(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) (err error) } -func setupDirectories(crunchtmpdir, taskUuid string, keepTmp bool) (tmpdir, outdir string, err error) { +func setupDirectories(crunchtmpdir, taskUUID string, keepTmp bool) (tmpdir, outdir string, err error) { tmpdir = crunchtmpdir + "/tmpdir" err = os.Mkdir(tmpdir, 0700) if err != nil { @@ -114,11 +113,13 @@ func setupCommand(cmd *exec.Cmd, taskp TaskDef, outdir string, replacements map[ if err != nil { return "", "", "", err } - if taskp.keepTmpOutput { - // Is there an os.Copy? - copyFile(v, outdir+"/"+k) + if taskp.KeepTmpOutput { + err = copyFile(v, outdir+"/"+k) } else { - os.Symlink(v, outdir+"/"+k) + err = os.Symlink(v, outdir+"/"+k) + } + if err != nil { + return "", "", "", err } } } @@ -222,13 +223,13 @@ func getKeepTmp(outdir string) (manifest string, err error) { return "", err } collection := arvados.Collection{} - json.Unmarshal(buf, &collection) - return collection.ManifestText, nil + err = json.Unmarshal(buf, &collection) + return collection.ManifestText, err } func runner(api IArvadosClient, kc IKeepClient, - jobUuid, taskUuid, crunchtmpdir, keepmount string, + jobUUID, taskUUID, crunchtmpdir, keepmount string, jobStruct Job, taskStruct Task) error { var err error @@ -237,34 +238,35 @@ func runner(api IArvadosClient, // If this is task 0 and there are multiple tasks, dispatch subtasks // and exit. if taskStruct.Sequence == 0 { - if len(jobStruct.Script_parameters.Tasks) == 1 { - taskp = jobStruct.Script_parameters.Tasks[0] + if len(jobStruct.ScriptParameters.Tasks) == 1 { + taskp = jobStruct.ScriptParameters.Tasks[0] } else { - for _, task := range jobStruct.Script_parameters.Tasks { + for _, task := range jobStruct.ScriptParameters.Tasks { err := api.Create("job_tasks", map[string]interface{}{ - "job_task": Task{Job_uuid: jobUuid, - Created_by_job_task_uuid: taskUuid, - Sequence: 1, - Parameters: task}}, + "job_task": Task{ + JobUUID: jobUUID, + CreatedByJobTaskUUID: taskUUID, + Sequence: 1, + Parameters: task}}, nil) if err != nil { return TempFail{err} } } - err = api.Update("job_tasks", taskUuid, + err = api.Update("job_tasks", taskUUID, map[string]interface{}{ - "job_task": Task{ - Output: "", - Success: true, - Progress: 1.0}}, + "job_task": map[string]interface{}{ + "output": "", + "success": true, + "progress": 1.0}}, nil) return nil } } var tmpdir, outdir string - tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUuid, taskp.keepTmpOutput) + tmpdir, outdir, err = setupDirectories(crunchtmpdir, taskUUID, taskp.KeepTmpOutput) if err != nil { return TempFail{err} } @@ -360,17 +362,17 @@ func runner(api IArvadosClient, // Upload output directory var manifest string - if taskp.keepTmpOutput { + if taskp.KeepTmpOutput { manifest, err = getKeepTmp(outdir) } else { manifest, err = WriteTree(kc, outdir) - if err != nil { - return TempFail{err} - } + } + if err != nil { + return TempFail{err} } // Set status - err = api.Update("job_tasks", taskUuid, + err = api.Update("job_tasks", taskUUID, map[string]interface{}{ "job_task": Task{ Output: manifest, @@ -394,37 +396,19 @@ func main() { log.Fatal(err) } - // Container may not have certificates installed, so need to look for - // /etc/arvados/ca-certificates.crt in addition to normal system certs. - var certFiles = []string{ - "/etc/ssl/certs/ca-certificates.crt", // Debian - "/etc/pki/tls/certs/ca-bundle.crt", // Red Hat - "/etc/arvados/ca-certificates.crt", - } - - certs := x509.NewCertPool() - for _, file := range certFiles { - data, err := ioutil.ReadFile(file) - if err == nil { - log.Printf("Using TLS certificates at %v", file) - certs.AppendCertsFromPEM(data) - } - } - api.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs = certs - - jobUuid := os.Getenv("JOB_UUID") - taskUuid := os.Getenv("TASK_UUID") + jobUUID := os.Getenv("JOB_UUID") + taskUUID := os.Getenv("TASK_UUID") tmpdir := os.Getenv("TASK_WORK") keepmount := os.Getenv("TASK_KEEPMOUNT") var jobStruct Job var taskStruct Task - err = api.Get("jobs", jobUuid, nil, &jobStruct) + err = api.Get("jobs", jobUUID, nil, &jobStruct) if err != nil { log.Fatal(err) } - err = api.Get("job_tasks", taskUuid, nil, &taskStruct) + err = api.Get("job_tasks", taskUUID, nil, &taskStruct) if err != nil { log.Fatal(err) } @@ -436,7 +420,7 @@ func main() { } syscall.Umask(0022) - err = runner(api, kc, jobUuid, taskUuid, tmpdir, keepmount, jobStruct, taskStruct) + err = runner(api, kc, jobUUID, taskUUID, tmpdir, keepmount, jobStruct, taskStruct) if err == nil { os.Exit(0)