"os/signal"
"path"
"path/filepath"
+ "sort"
"strings"
"sync"
"syscall"
// IKeepClient is the minimal Keep API methods used by crunch-run.
type IKeepClient interface {
PutHB(hash string, buf []byte) (string, int, error)
- ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error)
+ ManifestFileReader(m manifest.Manifest, filename string) (keepclient.Reader, error)
}
// NewLogWriter is a factory function to create a new log writer.
return c, nil
}
+// SetupArvMountPoint makes sure runner.ArvMountPoint is populated.
+// When it is still empty, a new temporary directory is requested from
+// runner.MkTempDir using the given name prefix and the resulting path is
+// stored in runner.ArvMountPoint. When it is already set, nothing is done
+// and nil is returned.
+func (runner *ContainerRunner) SetupArvMountPoint(prefix string) (err error) {
+	if runner.ArvMountPoint != "" {
+		// A mount point was already chosen; leave it untouched.
+		return nil
+	}
+	runner.ArvMountPoint, err = runner.MkTempDir("", prefix)
+	return err
+}
+
func (runner *ContainerRunner) SetupMounts() (err error) {
- runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+ err = runner.SetupArvMountPoint("keep")
if err != nil {
return fmt.Errorf("While creating keep mount temp dir: %v", err)
}
runner.Binds = nil
needCertMount := true
- for bind, mnt := range runner.Container.Mounts {
+ var binds []string
+ for bind, _ := range runner.Container.Mounts {
+ binds = append(binds, bind)
+ }
+ sort.Strings(binds)
+
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
if bind == "stdout" {
// Is it a "file" mount kind?
if mnt.Kind != "file" {
return fmt.Errorf("Stdout path does not start with OutputPath: %s, %s", mnt.Path, prefix)
}
}
+
if bind == "/etc/arvados/ca-certificates.crt" {
needCertMount = false
}
+ if strings.HasPrefix(bind, runner.Container.OutputPath+"/") && bind != runner.Container.OutputPath+"/" {
+ if mnt.Kind != "collection" {
+ return fmt.Errorf("Only mount points of kind 'collection' are supported underneath the output_path: %v", bind)
+ }
+ }
+
switch {
case mnt.Kind == "collection":
var src string
if mnt.Writable {
if bind == runner.Container.OutputPath {
runner.HostOutputDir = src
+ } else if strings.HasPrefix(bind, runner.Container.OutputPath+"/") {
+ return fmt.Errorf("Writable mount points are not permitted underneath the output_path: %v", bind)
}
runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
} else {
manifestText = rec.ManifestText
}
+ // Pre-populate output from the configured mount points
+ var binds []string
+ for bind, _ := range runner.Container.Mounts {
+ binds = append(binds, bind)
+ }
+ sort.Strings(binds)
+
+ for _, bind := range binds {
+ mnt := runner.Container.Mounts[bind]
+
+ bindSuffix := strings.TrimPrefix(bind, runner.Container.OutputPath)
+
+ if bindSuffix == bind || len(bindSuffix) <= 0 {
+ // either does not start with OutputPath or is OutputPath itself
+ continue
+ }
+
+ if strings.HasPrefix(bindSuffix, "/") == false {
+ bindSuffix = "/" + bindSuffix
+ }
+
+ if mnt.ExcludeFromOutput == true {
+ continue
+ }
+
+ idx := strings.Index(mnt.PortableDataHash, "/")
+ if idx > 0 {
+ mnt.Path = mnt.PortableDataHash[idx:]
+ mnt.PortableDataHash = mnt.PortableDataHash[0:idx]
+ }
+
+ // append to manifest_text
+ m, err := runner.getCollectionManifestForPath(mnt, bindSuffix)
+ if err != nil {
+ return err
+ }
+
+ manifestText = manifestText + m
+ }
+
+ // Save output
var response arvados.Collection
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
- "expires_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+ "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
"name": "output for " + runner.Container.UUID,
"manifest_text": manifestText}},
&response)
return nil
}
+var outputCollections = make(map[string]arvados.Collection)
+
+// Fetch the collection for the mnt.PortableDataHash
+// Return the manifest_text fragment corresponding to the specified mnt.Path
+// after making any required updates.
+// Ex:
+// If mnt.Path is not specified,
+// return the entire manifest_text after replacing any "." with bindSuffix
+// If mnt.Path corresponds to one stream,
+// return the manifest_text for that stream after replacing that stream name with bindSuffix
+// Otherwise, check if a filename in any one stream is being sought. Return the manifest_text
+// for that stream after replacing stream name with bindSuffix minus the last word
+// and the file name with last word of the bindSuffix
+// Allowed path examples:
+// "path":"/"
+// "path":"/subdir1"
+// "path":"/subdir1/subdir2"
+// "path":"/subdir/filename" etc
+func (runner *ContainerRunner) getCollectionManifestForPath(mnt arvados.Mount, bindSuffix string) (string, error) {
+ collection := outputCollections[mnt.PortableDataHash]
+ if collection.PortableDataHash == "" {
+ err := runner.ArvClient.Get("collections", mnt.PortableDataHash, nil, &collection)
+ if err != nil {
+ return "", fmt.Errorf("While getting collection for %v: %v", mnt.PortableDataHash, err)
+ }
+ outputCollections[mnt.PortableDataHash] = collection
+ }
+
+ manifest := manifest.Manifest{Text: collection.ManifestText}
+
+ manifestText := ""
+ if mnt.Path == "" || mnt.Path == "/" {
+ // no path specified; return the entire manifest text after making adjustments
+ manifestText = collection.ManifestText
+ manifestText = strings.Replace(manifestText, "./", "."+bindSuffix+"/", -1)
+ manifestText = strings.Replace(manifestText, ". ", "."+bindSuffix+" ", -1)
+ } else {
+ // either a single stream or file from a stream is being sought
+ bindIdx := strings.LastIndex(bindSuffix, "/")
+ var bindSubdir, bindFileName string
+ if bindIdx >= 0 {
+ bindSubdir = "." + bindSuffix[0:bindIdx]
+ bindFileName = bindSuffix[bindIdx+1:]
+ }
+ mntPath := mnt.Path
+ if strings.HasSuffix(mntPath, "/") {
+ mntPath = mntPath[0 : len(mntPath)-1]
+ }
+ pathIdx := strings.LastIndex(mntPath, "/")
+ var pathSubdir, pathFileName string
+ if pathIdx >= 0 {
+ pathSubdir = "." + mntPath[0:pathIdx]
+ pathFileName = mntPath[pathIdx+1:]
+ }
+ streams := strings.Split(collection.ManifestText, "\n")
+ for _, stream := range streams {
+ tokens := strings.Split(stream, " ")
+ if tokens[0] == "."+mntPath {
+ // path refers to this complete stream
+ adjustedStream := strings.Replace(stream, "."+mntPath, "."+bindSuffix, -1)
+ manifestText = adjustedStream + "\n"
+ break
+ } else {
+ // look for a matching file in this stream
+ fs := manifest.FileSegmentForPath(mntPath[1:])
+ if fs != "" {
+ manifestText = strings.Replace(fs, ":"+pathFileName, ":"+bindFileName, -1)
+ manifestText = strings.Replace(manifestText, pathSubdir, bindSubdir, -1)
+ manifestText += "\n"
+ break
+ }
+ }
+ }
+ }
+
+ if manifestText == "" {
+ runner.CrunchLog.Printf("No manifest segment found for bind '%v' with path '%v'", bindSuffix, mnt.Path)
+ }
+
+ return manifestText, nil
+}
+
func (runner *ContainerRunner) loadDiscoveryVars() {
tl, err := runner.ArvClient.Discovery("defaultTrashLifetime")
if err != nil {
err = runner.ArvClient.Create("collections",
arvadosclient.Dict{
"collection": arvadosclient.Dict{
- "expires_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
+ "trash_at": time.Now().Add(runner.trashLifetime).Format(time.RFC3339),
"name": "logs for " + runner.Container.UUID,
"manifest_text": mt}},
&response)
func (runner *ContainerRunner) UpdateContainerFinal() error {
update := arvadosclient.Dict{}
update["state"] = runner.finalState
+ if runner.LogsPDH != nil {
+ update["log"] = *runner.LogsPDH
+ }
if runner.finalState == "Complete" {
- if runner.LogsPDH != nil {
- update["log"] = *runner.LogsPDH
- }
if runner.ExitCode != nil {
update["exit_code"] = *runner.ExitCode
}