log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
}
}
+
+ log.Printf("Finished container run for %v", uuid)
}
package main
import (
+ "encoding/json"
"errors"
"flag"
+ "fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"github.com/curoverse/dockerclient"
"io"
+ "io/ioutil"
"log"
"os"
+ "os/exec"
"os/signal"
"strings"
"sync"
"syscall"
+ "time"
)
// IArvadosClient is the minimal Arvados API methods used by crunch-run.
}
// Mount describes the mount points to create inside the container,
// as given in the "mounts" field of the container record.
type Mount struct {
	// Kind is the mount type, e.g. "collection" or "tmp".
	Kind string `json:"kind"`
	// Writable indicates the container may write to this mount point.
	Writable bool `json:"writable"`
	// PortableDataHash names a collection by content address.
	PortableDataHash string `json:"portable_data_hash"`
	// UUID names a collection by record UUID.
	UUID string `json:"uuid"`
	// DeviceType is the device type for device mounts.
	DeviceType string `json:"device_type"`
}
// CollectionRecord is the subset of a collection record returned by the
// API server that crunch-run uses.
type CollectionRecord struct {
	// ManifestText is the collection manifest in Keep manifest format.
	ManifestText string `json:"manifest_text"`
	// PortableDataHash is the content address of the collection.
	PortableDataHash string `json:"portable_data_hash"`
}
// ContainerRecord is the container record returned by the API server.
Priority int `json:"priority"`
RuntimeConstraints map[string]interface{} `json:"runtime_constraints"`
State string `json:"state"`
+ Output string `json:"output"`
}
// NewLogWriter is a factory function to create a new log writer.
type NewLogWriter func(name string) io.WriteCloser
+type RunArvMount func([]string) (*exec.Cmd, error)
+
+type MkTempDir func(string, string) (string, error)
+
// ThinDockerClient is the minimal Docker client interface used by crunch-run.
type ThinDockerClient interface {
StopContainer(id string, timeout int) error
LoadImage(reader io.Reader) error
CreateContainer(config *dockerclient.ContainerConfig, name string, authConfig *dockerclient.AuthConfig) (string, error)
StartContainer(id string, config *dockerclient.HostConfig) error
- ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error)
+ AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error)
Wait(id string) <-chan dockerclient.WaitResult
RemoveImage(name string, force bool) ([]*dockerclient.ImageDelete, error)
}
Stderr *ThrottledLogger
LogCollection *CollectionWriter
LogsPDH *string
- CancelLock sync.Mutex
- Cancelled bool
- SigChan chan os.Signal
- finalState string
+ RunArvMount
+ MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ CleanupTempDir []string
+ Binds []string
+ OutputPDH *string
+ CancelLock sync.Mutex
+ Cancelled bool
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ finalState string
}
// SetupSignals sets up signal handling to gracefully terminate the underlying
runner.CrunchLog.Printf("Fetching Docker image from collection '%s'", runner.ContainerRecord.ContainerImage)
- var collection Collection
+ var collection CollectionRecord
err = runner.ArvClient.Get("collections", runner.ContainerRecord.ContainerImage, nil, &collection)
if err != nil {
- return err
+ return fmt.Errorf("While getting container image collection: %v", err)
}
manifest := manifest.Manifest{Text: collection.ManifestText}
var img, imageID string
for ms := range manifest.StreamIter() {
img = ms.FileStreamSegments[0].Name
if !strings.HasSuffix(img, ".tar") {
- return errors.New("First file in the collection does not end in .tar")
+ return fmt.Errorf("First file in the container image collection does not end in .tar")
}
imageID = img[:len(img)-4]
}
var readCloser io.ReadCloser
readCloser, err = runner.Kc.ManifestFileReader(manifest, img)
if err != nil {
- return err
+ return fmt.Errorf("While creating ManifestFileReader for container image: %v", err)
}
err = runner.Docker.LoadImage(readCloser)
if err != nil {
- return err
+ return fmt.Errorf("While loading container image into Docker: %v", err)
}
} else {
runner.CrunchLog.Print("Docker image is available")
return nil
}
+func (runner *ContainerRunner) ArvMountCmd(arvMountCmd []string) (c *exec.Cmd, err error) {
+ c = exec.Command("arv-mount", arvMountCmd...)
+ nt := NewThrottledLogger(runner.NewLogWriter("arv-mount"))
+ c.Stdout = nt
+ c.Stderr = nt
+
+ err = c.Start()
+ if err != nil {
+ return nil, err
+ }
+
+ statReadme := make(chan bool)
+ runner.ArvMountExit = make(chan error)
+
+ keepStatting := true
+ go func() {
+ for keepStatting {
+ time.Sleep(100 * time.Millisecond)
+ _, err = os.Stat(fmt.Sprintf("%s/by_id/README", runner.ArvMountPoint))
+ if err == nil {
+ keepStatting = false
+ statReadme <- true
+ }
+ }
+ close(statReadme)
+ }()
+
+ go func() {
+ runner.ArvMountExit <- c.Wait()
+ close(runner.ArvMountExit)
+ }()
+
+ select {
+ case <-statReadme:
+ break
+ case err := <-runner.ArvMountExit:
+ runner.ArvMount = nil
+ keepStatting = false
+ return nil, err
+ }
+
+ return c, nil
+}
+
+func (runner *ContainerRunner) SetupMounts() (err error) {
+ runner.ArvMountPoint, err = runner.MkTempDir("", "keep")
+ if err != nil {
+ return fmt.Errorf("While creating keep mount temp dir: %v", err)
+ }
+
+ runner.CleanupTempDir = append(runner.CleanupTempDir, runner.ArvMountPoint)
+
+ pdhOnly := true
+ tmpcount := 0
+ arvMountCmd := []string{"--foreground", "--allow-other", "--read-write"}
+ collectionPaths := []string{}
+ runner.Binds = nil
+
+ for bind, mnt := range runner.ContainerRecord.Mounts {
+ if mnt.Kind == "collection" {
+ var src string
+ if mnt.UUID != "" && mnt.PortableDataHash != "" {
+ return fmt.Errorf("Cannot specify both 'uuid' and 'portable_data_hash' for a collection mount")
+ }
+ if mnt.UUID != "" {
+ if mnt.Writable {
+ return fmt.Errorf("Writing to existing collections currently not permitted.")
+ }
+ pdhOnly = false
+ src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.UUID)
+ } else if mnt.PortableDataHash != "" {
+ if mnt.Writable {
+ return fmt.Errorf("Can never write to a collection specified by portable data hash")
+ }
+ src = fmt.Sprintf("%s/by_id/%s", runner.ArvMountPoint, mnt.PortableDataHash)
+ } else {
+ src = fmt.Sprintf("%s/tmp%d", runner.ArvMountPoint, tmpcount)
+ arvMountCmd = append(arvMountCmd, "--mount-tmp")
+ arvMountCmd = append(arvMountCmd, fmt.Sprintf("tmp%d", tmpcount))
+ tmpcount += 1
+ }
+ if mnt.Writable {
+ if bind == runner.ContainerRecord.OutputPath {
+ runner.HostOutputDir = src
+ }
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", src, bind))
+ } else {
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s:ro", src, bind))
+ }
+ collectionPaths = append(collectionPaths, src)
+ } else if mnt.Kind == "tmp" {
+ if bind == runner.ContainerRecord.OutputPath {
+ runner.HostOutputDir, err = runner.MkTempDir("", "")
+ if err != nil {
+ return fmt.Errorf("While creating mount temp dir: %v", err)
+ }
+ st, staterr := os.Stat(runner.HostOutputDir)
+ if staterr != nil {
+ return fmt.Errorf("While Stat on temp dir: %v", staterr)
+ }
+ err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+ if staterr != nil {
+ return fmt.Errorf("While Chmod temp dir: %v", err)
+ }
+ runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
+ runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
+ } else {
+ runner.Binds = append(runner.Binds, bind)
+ }
+ } else {
+ return fmt.Errorf("Unknown mount kind '%s'", mnt.Kind)
+ }
+ }
+
+ if runner.HostOutputDir == "" {
+ return fmt.Errorf("Output path does not correspond to a writable mount point")
+ }
+
+ if pdhOnly {
+ arvMountCmd = append(arvMountCmd, "--mount-by-pdh", "by_id")
+ } else {
+ arvMountCmd = append(arvMountCmd, "--mount-by-id", "by_id")
+ }
+ arvMountCmd = append(arvMountCmd, runner.ArvMountPoint)
+
+ runner.ArvMount, err = runner.RunArvMount(arvMountCmd)
+ if err != nil {
+ return fmt.Errorf("While trying to start arv-mount: %v", err)
+ }
+
+ for _, p := range collectionPaths {
+ _, err = os.Stat(p)
+ if err != nil {
+ return fmt.Errorf("While checking that input files exist: %v", err)
+ }
+ }
+
+ return nil
+}
+
+func (runner *ContainerRunner) ProcessDockerAttach(containerReader io.Reader) {
+ // Handle docker log protocol
+ // https://docs.docker.com/engine/reference/api/docker_remote_api_v1.15/#attach-to-a-container
+
+ header := make([]byte, 8)
+ for {
+ _, readerr := io.ReadAtLeast(containerReader, header, 8)
+
+ if readerr == nil {
+ readsize := int64(header[7]) | (int64(header[6]) << 8) | (int64(header[5]) << 16) | (int64(header[4]) << 24)
+ if header[0] == 1 {
+ // stdout
+ _, readerr = io.CopyN(runner.Stdout, containerReader, readsize)
+ } else {
+ // stderr
+ _, readerr = io.CopyN(runner.Stderr, containerReader, readsize)
+ }
+ }
+
+ if readerr != nil {
+ if readerr != io.EOF {
+ runner.CrunchLog.Printf("While reading docker logs: %v", readerr)
+ }
+
+ closeerr := runner.Stdout.Close()
+ if closeerr != nil {
+ runner.CrunchLog.Printf("While closing stdout logs: %v", readerr)
+ }
+
+ closeerr = runner.Stderr.Close()
+ if closeerr != nil {
+ runner.CrunchLog.Printf("While closing stderr logs: %v", readerr)
+ }
+
+ runner.loggingDone <- true
+ close(runner.loggingDone)
+ return
+ }
+ }
+}
+
+// AttachLogs connects the docker container stdout and stderr logs to the
+// Arvados logger which logs to Keep and the API server logs table.
+func (runner *ContainerRunner) AttachStreams() (err error) {
+
+ runner.CrunchLog.Print("Attaching container streams")
+
+ var containerReader io.Reader
+ containerReader, err = runner.Docker.AttachContainer(runner.ContainerID,
+ &dockerclient.AttachOptions{Stream: true, Stdout: true, Stderr: true})
+ if err != nil {
+ return fmt.Errorf("While attaching container logs: %v", err)
+ }
+
+ runner.loggingDone = make(chan bool)
+
+ runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
+ runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
+
+ go runner.ProcessDockerAttach(containerReader)
+
+ return nil
+}
+
// StartContainer creates the container and runs it.
func (runner *ContainerRunner) StartContainer() (err error) {
runner.CrunchLog.Print("Creating Docker container")
for k, v := range runner.ContainerRecord.Environment {
runner.ContainerConfig.Env = append(runner.ContainerConfig.Env, k+"="+v)
}
+ runner.ContainerConfig.NetworkDisabled = true
runner.ContainerID, err = runner.Docker.CreateContainer(&runner.ContainerConfig, "", nil)
if err != nil {
- return
+ return fmt.Errorf("While creating container: %v", err)
}
- hostConfig := &dockerclient.HostConfig{}
+ hostConfig := &dockerclient.HostConfig{Binds: runner.Binds,
+ LogConfig: dockerclient.LogConfig{Type: "none"}}
- runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
- err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
+ runner.AttachStreams()
if err != nil {
- return
+ return fmt.Errorf("While attaching streams: %v", err)
+ return err
}
- return nil
-}
-
-// AttachLogs connects the docker container stdout and stderr logs to the
-// Arvados logger which logs to Keep and the API server logs table.
-func (runner *ContainerRunner) AttachLogs() (err error) {
-
- runner.CrunchLog.Print("Attaching container logs")
-
- var stderrReader, stdoutReader io.Reader
- stderrReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stderr: true})
- if err != nil {
- return
- }
- stdoutReader, err = runner.Docker.ContainerLogs(runner.ContainerID, &dockerclient.LogOptions{Follow: true, Stdout: true})
+ runner.CrunchLog.Printf("Starting Docker container id '%s'", runner.ContainerID)
+ err = runner.Docker.StartContainer(runner.ContainerID, hostConfig)
if err != nil {
- return
+ return fmt.Errorf("While starting container: %v", err)
}
- runner.loggingDone = make(chan bool)
-
- runner.Stdout = NewThrottledLogger(runner.NewLogWriter("stdout"))
- runner.Stderr = NewThrottledLogger(runner.NewLogWriter("stderr"))
- go ReadWriteLines(stdoutReader, runner.Stdout, runner.loggingDone)
- go ReadWriteLines(stderrReader, runner.Stderr, runner.loggingDone)
-
return nil
}
// WaitFinish waits for the container to terminate, capture the exit code, and
// close the stdout/stderr logging.
func (runner *ContainerRunner) WaitFinish() error {
+ runner.CrunchLog.Print("Waiting for container to finish")
+
result := runner.Docker.Wait(runner.ContainerID)
wr := <-result
if wr.Error != nil {
- return wr.Error
+ return fmt.Errorf("While waiting for container to finish: %v", wr.Error)
}
runner.ExitCode = &wr.ExitCode
- // drain stdout/stderr
- <-runner.loggingDone
+ // wait for stdout/stderr to complete
<-runner.loggingDone
- runner.Stdout.Close()
- runner.Stderr.Close()
+ return nil
+}
+
+// HandleOutput sets the output, unmounts the FUSE mount, and deletes temporary directories
+func (runner *ContainerRunner) CaptureOutput() error {
+ if runner.finalState != "Complete" {
+ return nil
+ }
+
+ if runner.HostOutputDir == "" {
+ return nil
+ }
+
+ _, err := os.Stat(runner.HostOutputDir)
+ if err != nil {
+ return fmt.Errorf("While checking host output path: %v", err)
+ }
+
+ var manifestText string
+
+ collectionMetafile := fmt.Sprintf("%s/.arvados#collection", runner.HostOutputDir)
+ _, err = os.Stat(collectionMetafile)
+ if err != nil {
+ // Regular directory
+ cw := CollectionWriter{runner.Kc, nil, sync.Mutex{}}
+ manifestText, err = cw.WriteTree(runner.HostOutputDir, runner.CrunchLog.Logger)
+ if err != nil {
+ return fmt.Errorf("While uploading output files: %v", err)
+ }
+ } else {
+ // FUSE mount directory
+ file, openerr := os.Open(collectionMetafile)
+ if openerr != nil {
+ return fmt.Errorf("While opening FUSE metafile: %v", err)
+ }
+ defer file.Close()
+
+ rec := CollectionRecord{}
+ err = json.NewDecoder(file).Decode(&rec)
+ if err != nil {
+ return fmt.Errorf("While reading FUSE metafile: %v", err)
+ }
+ manifestText = rec.ManifestText
+ }
+
+ var response CollectionRecord
+ err = runner.ArvClient.Create("collections",
+ arvadosclient.Dict{
+ "collection": arvadosclient.Dict{
+ "manifest_text": manifestText}},
+ &response)
+ if err != nil {
+ return fmt.Errorf("While creating output collection: %v", err)
+ }
+
+ runner.OutputPDH = new(string)
+ *runner.OutputPDH = response.PortableDataHash
return nil
}
+func (runner *ContainerRunner) CleanupDirs() {
+ if runner.ArvMount != nil {
+ umount := exec.Command("fusermount", "-z", "-u", runner.ArvMountPoint)
+ umnterr := umount.Run()
+ if umnterr != nil {
+ runner.CrunchLog.Printf("While running fusermount: %v", umnterr)
+ }
+
+ mnterr := <-runner.ArvMountExit
+ if mnterr != nil {
+ runner.CrunchLog.Printf("Arv-mount exit error: %v", mnterr)
+ }
+ }
+
+ for _, tmpdir := range runner.CleanupTempDir {
+ rmerr := os.RemoveAll(tmpdir)
+ if rmerr != nil {
+ runner.CrunchLog.Printf("While cleaning up temporary directory %s: %v", tmpdir, rmerr)
+ }
+ }
+}
+
// CommitLogs posts the collection containing the final container logs.
func (runner *ContainerRunner) CommitLogs() error {
runner.CrunchLog.Print(runner.finalState)
mt, err := runner.LogCollection.ManifestText()
if err != nil {
- return err
+ return fmt.Errorf("While creating log manifest: %v", err)
}
- response := make(map[string]string)
+ var response CollectionRecord
err = runner.ArvClient.Create("collections",
- arvadosclient.Dict{"name": "logs for " + runner.ContainerRecord.UUID,
- "manifest_text": mt},
- response)
+ arvadosclient.Dict{
+ "collection": arvadosclient.Dict{
+ "name": "logs for " + runner.ContainerRecord.UUID,
+ "manifest_text": mt}},
+ &response)
if err != nil {
- return err
+ return fmt.Errorf("While creating log collection: %v", err)
}
runner.LogsPDH = new(string)
- *runner.LogsPDH = response["portable_data_hash"]
+ *runner.LogsPDH = response.PortableDataHash
return nil
}
// UpdateContainerRecordRunning updates the container state to "Running"
func (runner *ContainerRunner) UpdateContainerRecordRunning() error {
- update := arvadosclient.Dict{"state": "Running"}
- return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
+ return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID,
+ arvadosclient.Dict{"container": arvadosclient.Dict{"state": "Running"}}, nil)
}
// UpdateContainerRecordComplete updates the container record state on API
if runner.ExitCode != nil {
update["exit_code"] = *runner.ExitCode
}
+ if runner.OutputPDH != nil {
+ update["output"] = runner.OutputPDH
+ }
update["state"] = runner.finalState
- return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, update, nil)
+ return runner.ArvClient.Update("containers", runner.ContainerRecord.UUID, arvadosclient.Dict{"container": update}, nil)
}
// NewArvLogWriter creates an ArvLogWriter
}
// Run the full container lifecycle.
-func (runner *ContainerRunner) Run(containerUUID string) (err error) {
- runner.CrunchLog.Printf("Executing container '%s'", containerUUID)
+func (runner *ContainerRunner) Run() (err error) {
+ runner.CrunchLog.Printf("Executing container '%s'", runner.ContainerRecord.UUID)
+
+ hostname, hosterr := os.Hostname()
+ if hosterr != nil {
+ runner.CrunchLog.Printf("Error getting hostname '%v'", hosterr)
+ } else {
+ runner.CrunchLog.Printf("Executing on host '%s'", runner.ContainerRecord.UUID, hostname)
+ }
var runerr, waiterr error
runner.finalState = "Complete"
}
- // (6) write logs
+ // (6) capture output
+ outputerr := runner.CaptureOutput()
+ if outputerr != nil {
+ runner.CrunchLog.Print(outputerr)
+ }
+
+ // (7) clean up temporary directories
+ runner.CleanupDirs()
+
+ // (8) write logs
logerr := runner.CommitLogs()
if logerr != nil {
runner.CrunchLog.Print(logerr)
}
- // (7) update container record with results
+ // (9) update container record with results
updateerr := runner.UpdateContainerRecordComplete()
if updateerr != nil {
runner.CrunchLog.Print(updateerr)
}
}()
- err = runner.ArvClient.Get("containers", containerUUID, nil, &runner.ContainerRecord)
+ err = runner.ArvClient.Get("containers", runner.ContainerRecord.UUID, nil, &runner.ContainerRecord)
if err != nil {
- return
+ return fmt.Errorf("While getting container record: %v", err)
}
- // (0) setup signal handling
+ // (1) setup signal handling
err = runner.SetupSignals()
if err != nil {
- return
+ return fmt.Errorf("While setting up signal handling: %v", err)
}
- // (1) check for and/or load image
+ // (2) check for and/or load image
err = runner.LoadImage()
if err != nil {
- return
+ return fmt.Errorf("While loading container image: %v", err)
}
- // (2) start container
+ // (3) set up FUSE mount and binds
+ err = runner.SetupMounts()
+ if err != nil {
+ return fmt.Errorf("While setting up mounts: %v", err)
+ }
+
+ // (3) create and start container
err = runner.StartContainer()
if err != nil {
if err == ErrCancelled {
return
}
- // (3) update container record state
+ // (4) update container record state
err = runner.UpdateContainerRecordRunning()
if err != nil {
runner.CrunchLog.Print(err)
}
- // (4) attach container logs
- runerr = runner.AttachLogs()
- if runerr != nil {
- runner.CrunchLog.Print(runerr)
- }
-
// (5) wait for container to finish
waiterr = runner.WaitFinish()
// NewContainerRunner creates a new container runner.
func NewContainerRunner(api IArvadosClient,
kc IKeepClient,
- docker ThinDockerClient) *ContainerRunner {
+ docker ThinDockerClient,
+ containerUUID string) *ContainerRunner {
cr := &ContainerRunner{ArvClient: api, Kc: kc, Docker: docker}
cr.NewLogWriter = cr.NewArvLogWriter
- cr.LogCollection = &CollectionWriter{kc, nil}
+ cr.RunArvMount = cr.ArvMountCmd
+ cr.MkTempDir = ioutil.TempDir
+ cr.LogCollection = &CollectionWriter{kc, nil, sync.Mutex{}}
+ cr.ContainerRecord.UUID = containerUUID
cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
+ cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
return cr
}
func main() {
flag.Parse()
+ containerId := flag.Arg(0)
+
api, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatal(err)
+ log.Fatalf("%s: %v", containerId, err)
}
api.Retries = 8
var kc *keepclient.KeepClient
kc, err = keepclient.MakeKeepClient(&api)
if err != nil {
- log.Fatal(err)
+ log.Fatalf("%s: %v", containerId, err)
}
kc.Retries = 4
var docker *dockerclient.DockerClient
docker, err = dockerclient.NewDockerClient("unix:///var/run/docker.sock", nil)
if err != nil {
- log.Fatal(err)
+ log.Fatalf("%s: %v", containerId, err)
}
- cr := NewContainerRunner(api, kc, docker)
+ cr := NewContainerRunner(api, kc, docker, containerId)
- err = cr.Run(flag.Arg(0))
+ err = cr.Run()
if err != nil {
- log.Fatal(err)
+ log.Fatalf("%s: %v", containerId, err)
}
}
. "gopkg.in/check.v1"
"io"
"io/ioutil"
+ "log"
+ "os"
+ "os/exec"
+ "sort"
"strings"
"syscall"
"testing"
var otherPDH = "a3e8f74c6f101eae01fa08bfb4e49b3a+54"
type TestDockerClient struct {
- imageLoaded string
- stdoutReader io.ReadCloser
- stderrReader io.ReadCloser
- stdoutWriter io.WriteCloser
- stderrWriter io.WriteCloser
- fn func(t *TestDockerClient)
- finish chan dockerclient.WaitResult
- stop chan bool
- cwd string
- env []string
+ imageLoaded string
+ logReader io.ReadCloser
+ logWriter io.WriteCloser
+ fn func(t *TestDockerClient)
+ finish chan dockerclient.WaitResult
+ stop chan bool
+ cwd string
+ env []string
}
func NewTestDockerClient() *TestDockerClient {
t := &TestDockerClient{}
- t.stdoutReader, t.stdoutWriter = io.Pipe()
- t.stderrReader, t.stderrWriter = io.Pipe()
+ t.logReader, t.logWriter = io.Pipe()
t.finish = make(chan dockerclient.WaitResult)
t.stop = make(chan bool)
t.cwd = "/"
}
}
-func (t *TestDockerClient) ContainerLogs(id string, options *dockerclient.LogOptions) (io.ReadCloser, error) {
- if options.Stdout {
- return t.stdoutReader, nil
- }
- if options.Stderr {
- return t.stderrReader, nil
- }
- return nil, nil
+func (t *TestDockerClient) AttachContainer(id string, options *dockerclient.AttachOptions) (io.ReadCloser, error) {
+ return t.logReader, nil
}
func (t *TestDockerClient) Wait(id string) <-chan dockerclient.WaitResult {
this.Content = parameters
if resourceType == "logs" {
- et := parameters["event_type"].(string)
+ et := parameters["log"].(arvadosclient.Dict)["event_type"].(string)
if this.Logs == nil {
this.Logs = make(map[string]*bytes.Buffer)
}
if this.Logs[et] == nil {
this.Logs[et] = &bytes.Buffer{}
}
- this.Logs[et].Write([]byte(parameters["properties"].(map[string]string)["text"]))
+ this.Logs[et].Write([]byte(parameters["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"]))
}
if resourceType == "collections" && output != nil {
- mt := parameters["manifest_text"].(string)
- outmap := output.(map[string]string)
- outmap["portable_data_hash"] = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+ mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+ outmap := output.(*CollectionRecord)
+ outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
}
return nil
func (this *ArvTestClient) Get(resourceType string, uuid string, parameters arvadosclient.Dict, output interface{}) error {
if resourceType == "collections" {
if uuid == hwPDH {
- output.(*Collection).ManifestText = hwManifest
+ output.(*CollectionRecord).ManifestText = hwManifest
} else if uuid == otherPDH {
- output.(*Collection).ManifestText = otherManifest
+ output.(*CollectionRecord).ManifestText = otherManifest
}
}
if resourceType == "containers" {
this.Content = parameters
if resourceType == "containers" {
- if parameters["state"] == "Running" {
+ if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
this.WasSetRunning = true
}
func (s *TestSuite) TestLoadImage(c *C) {
kc := &KeepTestClient{}
docker := NewTestDockerClient()
- cr := NewContainerRunner(&ArvTestClient{}, kc, docker)
+ cr := NewContainerRunner(&ArvTestClient{}, kc, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
_, err := cr.Docker.RemoveImage(hwImageId, true)
}
func (this KeepErrorTestClient) PutHB(hash string, buf []byte) (string, int, error) {
- return "", 0, nil
+ return "", 0, errors.New("KeepError")
}
func (this KeepErrorTestClient) ManifestFileReader(m manifest.Manifest, filename string) (keepclient.ReadCloserWithLen, error) {
func (s *TestSuite) TestLoadImageArvError(c *C) {
// (1) Arvados error
- cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil)
+ cr := NewContainerRunner(ArvErrorTestClient{}, &KeepTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.ContainerRecord.ContainerImage = hwPDH
err := cr.LoadImage()
- c.Check(err.Error(), Equals, "ArvError")
+ c.Check(err.Error(), Equals, "While getting container image collection: ArvError")
}
func (s *TestSuite) TestLoadImageKeepError(c *C) {
// (2) Keep error
docker := NewTestDockerClient()
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker)
+ cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.ContainerRecord.ContainerImage = hwPDH
err := cr.LoadImage()
- c.Check(err.Error(), Equals, "KeepError")
+ c.Check(err.Error(), Equals, "While creating ManifestFileReader for container image: KeepError")
}
func (s *TestSuite) TestLoadImageCollectionError(c *C) {
// (3) Collection doesn't contain image
- cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil)
+ cr := NewContainerRunner(&ArvTestClient{}, KeepErrorTestClient{}, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.ContainerRecord.ContainerImage = otherPDH
err := cr.LoadImage()
- c.Check(err.Error(), Equals, "First file in the collection does not end in .tar")
+ c.Check(err.Error(), Equals, "First file in the container image collection does not end in .tar")
}
func (s *TestSuite) TestLoadImageKeepReadError(c *C) {
// (4) Collection doesn't contain image
docker := NewTestDockerClient()
- cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker)
+ cr := NewContainerRunner(&ArvTestClient{}, KeepReadErrorTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.ContainerRecord.ContainerImage = hwPDH
err := cr.LoadImage()
return nil
}
// dockerLog builds a docker attach-protocol frame: an 8-byte header
// (stream fd byte, 3 reserved bytes, big-endian uint32 payload length)
// followed by the message payload.
//
// Fix over the previous version: only header[7] (the low byte of the
// length) was set, so any message longer than 255 bytes produced a
// corrupt frame.  The full 32-bit big-endian length is now encoded,
// matching the parser in ProcessDockerAttach.
func dockerLog(fd byte, msg string) []byte {
	by := []byte(msg)
	header := make([]byte, 8+len(by))
	header[0] = fd
	header[4] = byte(len(by) >> 24)
	header[5] = byte(len(by) >> 16)
	header[6] = byte(len(by) >> 8)
	header[7] = byte(len(by))
	copy(header[8:], by)
	return header
}
+
func (s *TestSuite) TestRunContainer(c *C) {
docker := NewTestDockerClient()
docker.fn = func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte("Hello world\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "Hello world\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{}
}
- cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker)
+ cr := NewContainerRunner(&ArvTestClient{}, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
var logs TestLogs
cr.NewLogWriter = logs.NewTestLoggingWriter
err = cr.StartContainer()
c.Check(err, IsNil)
- err = cr.AttachLogs()
- c.Check(err, IsNil)
-
err = cr.WaitFinish()
c.Check(err, IsNil)
func (s *TestSuite) TestCommitLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
err := cr.CommitLogs()
c.Check(err, IsNil)
- c.Check(api.Content["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- c.Check(api.Content["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
+ c.Check(api.Content["collection"].(arvadosclient.Dict)["name"], Equals, "logs for zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ c.Check(api.Content["collection"].(arvadosclient.Dict)["manifest_text"], Equals, ". 744b2e4553123b02fa7b452ec5c18993+123 0:123:crunch-run.txt\n")
c.Check(*cr.LogsPDH, Equals, "63da7bdacf08c40f604daad80c261e9a+60")
}
func (s *TestSuite) TestUpdateContainerRecordRunning(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
err := cr.UpdateContainerRecordRunning()
c.Check(err, IsNil)
- c.Check(api.Content["state"], Equals, "Running")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Running")
}
func (s *TestSuite) TestUpdateContainerRecordComplete(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.LogsPDH = new(string)
*cr.LogsPDH = "d3a229d2fe3690c2c3e75a71a153c6a3+60"
err := cr.UpdateContainerRecordComplete()
c.Check(err, IsNil)
- c.Check(api.Content["log"], Equals, *cr.LogsPDH)
- c.Check(api.Content["exit_code"], Equals, *cr.ExitCode)
- c.Check(api.Content["state"], Equals, "Complete")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["log"], Equals, *cr.LogsPDH)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, *cr.ExitCode)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
}
func (s *TestSuite) TestUpdateContainerRecordCancelled(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
- cr.ContainerRecord.UUID = "zzzzz-zzzzz-zzzzzzzzzzzzzzz"
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
cr.Cancelled = true
cr.finalState = "Cancelled"
err := cr.UpdateContainerRecordComplete()
c.Check(err, IsNil)
- c.Check(api.Content["log"], IsNil)
- c.Check(api.Content["exit_code"], IsNil)
- c.Check(api.Content["state"], Equals, "Cancelled")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["log"], IsNil)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], IsNil)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
}
// Used by the TestFullRun*() test below to DRY up boilerplate setup to do full
docker.RemoveImage(hwImageId, true)
api = &ArvTestClient{ContainerRecord: rec}
- cr = NewContainerRunner(api, &KeepTestClient{}, docker)
+ cr = NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
- err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ err = cr.Run()
c.Check(err, IsNil)
c.Check(api.WasSetRunning, Equals, true)
- c.Check(api.Content["log"], NotNil)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
if err != nil {
for k, v := range api.Logs {
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
"environment": {},
- "mounts": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte("hello world\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "hello world\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{}
})
- c.Check(api.Content["exit_code"], Equals, 0)
- c.Check(api.Content["state"], Equals, "Complete")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello world\n"), Equals, true)
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
"environment": {},
- "mounts": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte("hello\n"))
- t.stderrWriter.Write([]byte("world\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "hello\n"))
+ t.logWriter.Write(dockerLog(2, "world\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 1}
})
- c.Check(api.Content["log"], NotNil)
- c.Check(api.Content["exit_code"], Equals, 1)
- c.Check(api.Content["state"], Equals, "Complete")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 1)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "hello\n"), Equals, true)
c.Check(strings.HasSuffix(api.Logs["stderr"].String(), "world\n"), Equals, true)
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
"environment": {},
- "mounts": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte(t.cwd + "\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Content["exit_code"], Equals, 0)
- c.Check(api.Content["state"], Equals, "Complete")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
+
+ log.Print(api.Logs["stdout"].String())
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/\n"), Equals, true)
}
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
"environment": {},
- "mounts": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte(t.cwd + "\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, t.cwd+"\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Content["exit_code"], Equals, 0)
- c.Check(api.Content["state"], Equals, "Complete")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "/bin\n"), Equals, true)
}
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": ".",
"environment": {},
- "mounts": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
docker := NewTestDockerClient()
docker.fn = func(t *TestDockerClient) {
<-t.stop
- t.stdoutWriter.Write([]byte("foo\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, "foo\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
}
docker.RemoveImage(hwImageId, true)
api := &ArvTestClient{ContainerRecord: rec}
- cr := NewContainerRunner(api, &KeepTestClient{}, docker)
+ cr := NewContainerRunner(api, &KeepTestClient{}, docker, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
go func() {
for cr.ContainerID == "" {
cr.SigChan <- syscall.SIGINT
}()
- err = cr.Run("zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ err = cr.Run()
c.Check(err, IsNil)
- c.Check(api.Content["log"], NotNil)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["log"], NotNil)
if err != nil {
for k, v := range api.Logs {
}
}
- c.Check(api.Content["state"], Equals, "Cancelled")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Cancelled")
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "foo\n"), Equals, true)
"container_image": "d4ab34d3d4f8a72f5c4973051ae69fab+122",
"cwd": "/bin",
"environment": {"FROBIZ": "bilbo"},
- "mounts": {},
+ "mounts": {"/tmp": {"kind": "tmp"} },
"output_path": "/tmp",
"priority": 1,
"runtime_constraints": {}
}`, func(t *TestDockerClient) {
- t.stdoutWriter.Write([]byte(t.env[0][7:] + "\n"))
- t.stdoutWriter.Close()
- t.stderrWriter.Close()
+ t.logWriter.Write(dockerLog(1, t.env[0][7:]+"\n"))
+ t.logWriter.Close()
t.finish <- dockerclient.WaitResult{ExitCode: 0}
})
- c.Check(api.Content["exit_code"], Equals, 0)
- c.Check(api.Content["state"], Equals, "Complete")
+ c.Check(api.Content["container"].(arvadosclient.Dict)["exit_code"], Equals, 0)
+ c.Check(api.Content["container"].(arvadosclient.Dict)["state"], Equals, "Complete")
c.Check(strings.HasSuffix(api.Logs["stdout"].String(), "bilbo\n"), Equals, true)
}
+
+// ArvMountCmdLine is a test double for the RunArvMount hook: it records
+// the command line it was invoked with instead of starting arv-mount.
+type ArvMountCmdLine struct {
+ Cmd []string
+}
+
+// ArvMountTest captures c for later inspection and returns a nil *exec.Cmd,
+// so no external process is started during tests.
+func (am *ArvMountCmdLine) ArvMountTest(c []string) (*exec.Cmd, error) {
+ am.Cmd = c
+ return nil, nil
+}
+
+// TestSetupMounts exercises SetupMounts with a stubbed arv-mount runner and
+// temp-dir factory, checking the generated arv-mount command line and the
+// docker bind list for three mount configurations.
+func (s *TestSuite) TestSetupMounts(c *C) {
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzz-zzzzzzzzzzzzzzz")
+ am := &ArvMountCmdLine{}
+ cr.RunArvMount = am.ArvMountTest
+
+ // Deterministic fake temp dirs: /tmp/mktmpdir1, /tmp/mktmpdir2, ...
+ // NOTE(review): hard-coded /tmp paths could collide between concurrent
+ // test runs — consider ioutil.TempDir; confirm this is intentional.
+ i := 0
+ cr.MkTempDir = func(string, string) (string, error) {
+ i += 1
+ d := fmt.Sprintf("/tmp/mktmpdir%d", i)
+ os.Mkdir(d, os.ModePerm)
+ return d, nil
+ }
+
+ // Case 1: plain tmp mount at the output path.
+ {
+ cr.ContainerRecord.Mounts = make(map[string]Mount)
+ cr.ContainerRecord.Mounts["/tmp"] = Mount{Kind: "tmp"}
+ cr.OutputPath = "/tmp"
+
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir2:/tmp"})
+ cr.CleanupDirs()
+ }
+
+ // Case 2: writable collection mount used as the output path.
+ {
+ i = 0
+ cr.ContainerRecord.Mounts = make(map[string]Mount)
+ cr.ContainerRecord.Mounts["/keeptmp"] = Mount{Kind: "collection", Writable: true}
+ cr.OutputPath = "/keeptmp"
+
+ os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/tmp0:/keeptmp"})
+ cr.CleanupDirs()
+ }
+
+ // Case 3: a read-only collection mounted by portable data hash plus a
+ // writable output collection; bind order is normalized by sorting.
+ {
+ i = 0
+ cr.ContainerRecord.Mounts = make(map[string]Mount)
+ cr.ContainerRecord.Mounts["/keepinp"] = Mount{Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"}
+ cr.ContainerRecord.Mounts["/keepout"] = Mount{Kind: "collection", Writable: true}
+ cr.OutputPath = "/keepout"
+
+ os.MkdirAll("/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
+ os.MkdirAll("/tmp/mktmpdir1/tmp0", os.ModePerm)
+
+ err := cr.SetupMounts()
+ c.Check(err, IsNil)
+ c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-tmp", "tmp0", "--mount-by-pdh", "by_id", "/tmp/mktmpdir1"})
+ // NOTE(review): sort.StringSlice requires the "sort" import — confirm
+ // it is present in this file's import block.
+ var ss sort.StringSlice = cr.Binds
+ ss.Sort()
+ c.Check(cr.Binds, DeepEquals, []string{"/tmp/mktmpdir1/by_id/59389a8f9ee9d399be35462a0f92541c+53:/keepinp:ro",
+ "/tmp/mktmpdir1/tmp0:/keepout"})
+ cr.CleanupDirs()
+ }
+}
stop bool
flusherDone chan bool
Timestamper
+ Immediate *log.Logger
}
// RFC3339Fixed is a fixed-width version of RFC3339 with microsecond precision,
sc := bufio.NewScanner(bytes.NewBuffer(p))
for sc.Scan() {
_, err = fmt.Fprintf(tl.buf, "%s %s\n", now, sc.Text())
+ if tl.Immediate != nil {
+ tl.Immediate.Printf("%s %s\n", now, sc.Text())
+ }
}
return len(p), err
}
}
// write to API
- lr := arvadosclient.Dict{"object_uuid": arvlog.UUID,
- "event_type": arvlog.loggingStream,
- "properties": map[string]string{"text": string(p)}}
+ lr := arvadosclient.Dict{"log": arvadosclient.Dict{
+ "object_uuid": arvlog.UUID,
+ "event_type": arvlog.loggingStream,
+ "properties": map[string]string{"text": string(p)}}}
err2 := arvlog.ArvClient.Create("logs", lr, nil)
if err1 != nil || err2 != nil {
import (
"fmt"
+ "git.curoverse.com/arvados.git/sdk/go/arvadosclient"
. "gopkg.in/check.v1"
"time"
)
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
+ // Verify log lines are timestamped (via the fixed TestTimestamper),
+ // posted to the logs API nested under the "log" key, and mirrored to Keep.
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
cr.CrunchLog.Print("Hello world!")
logtext := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
"2015-12-29T15:51:45.000000002Z Goodbye\n"
- c.Check(api.Content["event_type"], Equals, "crunch-run")
- c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext)
+ c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+ c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext)
c.Check(string(kc.Content), Equals, logtext)
}
func (s *LoggingTestSuite) TestWriteLogsLarge(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
cr.CrunchLog.Timestamper = (&TestTimestamper{}).Timestamp
+ cr.CrunchLog.Immediate = nil
for i := 0; i < 2000000; i += 1 {
cr.CrunchLog.Printf("Hello %d", i)
func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
api := &ArvTestClient{}
kc := &KeepTestClient{}
- cr := NewContainerRunner(api, kc, nil)
+ cr := NewContainerRunner(api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
ts := &TestTimestamper{}
cr.CrunchLog.Timestamper = ts.Timestamp
stdout := NewThrottledLogger(cr.NewLogWriter("stdout"))
cr.CrunchLog.Close()
logtext1 := "2015-12-29T15:51:45.000000001Z Hello world!\n" +
"2015-12-29T15:51:45.000000003Z Goodbye\n"
- c.Check(api.Content["event_type"], Equals, "crunch-run")
- c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext1)
+ c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "crunch-run")
+ c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext1)
stdout.Close()
logtext2 := "2015-12-29T15:51:45.000000002Z Doing stuff\n" +
"2015-12-29T15:51:45.000000004Z Blurb\n"
- c.Check(api.Content["event_type"], Equals, "stdout")
- c.Check(api.Content["properties"].(map[string]string)["text"], Equals, logtext2)
+ c.Check(api.Content["log"].(arvadosclient.Dict)["event_type"], Equals, "stdout")
+ c.Check(api.Content["log"].(arvadosclient.Dict)["properties"].(map[string]string)["text"], Equals, logtext2)
mt, err := cr.LogCollection.ManifestText()
c.Check(err, IsNil)
"git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
+ "log"
+ "os"
+ "path/filepath"
"strings"
+ "sync"
)
// Block is a data block in a manifest stream
return nil
}
+// NewFile switches the writer to a new file within the current stream:
+// the finished file's extent is folded into offset, the running length is
+// reset, and subsequent writes are attributed to fn.
+func (m *CollectionFileWriter) NewFile(fn string) {
+ m.offset += m.length
+ m.length = 0
+ m.fn = fn
+}
+
func (m *CollectionFileWriter) goUpload() {
var errors []error
uploader := m.uploader
type CollectionWriter struct {
IKeepClient
Streams []*CollectionFileWriter
+ // mtx serializes access to Streams, which is appended to and read by
+ // methods that may run on different goroutines.
+ mtx sync.Mutex
}
// Open a new file for writing in the Keep collection.
fn}
go fw.goUpload()
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
m.Streams = append(m.Streams, fw)
return fw
// Finish writing the collection, wait for all blocks to complete uploading.
func (m *CollectionWriter) Finish() error {
var errstring string
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
+
for _, stream := range m.Streams {
if stream.uploader == nil {
continue
var buf bytes.Buffer
+ m.mtx.Lock()
+ defer m.mtx.Unlock()
for _, v := range m.Streams {
+ if len(v.FileStreamSegments) == 0 {
+ continue
+ }
k := v.StreamName
if k == "." {
buf.WriteString(".")
k = strings.Replace(k, "\n", "", -1)
buf.WriteString("./" + k)
}
- for _, b := range v.Blocks {
- buf.WriteString(" ")
- buf.WriteString(b)
+ if len(v.Blocks) > 0 {
+ for _, b := range v.Blocks {
+ buf.WriteString(" ")
+ buf.WriteString(b)
+ }
+ } else {
+ buf.WriteString(" d41d8cd98f00b204e9800998ecf8427e+0")
}
for _, f := range v.FileStreamSegments {
buf.WriteString(" ")
}
return buf.String(), nil
}
+
+// WalkUpload carries the state shared by WalkFunc invocations while a
+// directory tree is uploaded: the Keep client, the path prefix to strip
+// from walked paths, one CollectionFileWriter per directory ("stream"),
+// and a logger for progress messages.
+type WalkUpload struct {
+ kc IKeepClient
+ stripPrefix string
+ streamMap map[string]*CollectionFileWriter
+ status *log.Logger
+}
+
+// WalkFunc is the filepath.Walk callback used by WriteTree. Each regular
+// file encountered is streamed to Keep through the CollectionFileWriter
+// for its directory ("stream"), creating that writer on first use.
+func (m *WalkUpload) WalkFunc(path string, info os.FileInfo, err error) error {
+
+ // Fix: the walk error was previously ignored. filepath.Walk passes a
+ // non-nil err (with a possibly nil info) when a path cannot be visited;
+ // proceeding would panic on info.IsDir(). Propagate it instead.
+ if err != nil {
+ return err
+ }
+
+ if info.IsDir() {
+ return nil
+ }
+
+ // Derive the stream name: the walked path relative to stripPrefix with
+ // the file name removed; "." for files directly under the root.
+ var dir string
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ }
+ if dir == "" {
+ dir = "."
+ }
+
+ fn := path[(len(path) - len(info.Name())):]
+
+ if m.streamMap[dir] == nil {
+ m.streamMap[dir] = &CollectionFileWriter{
+ m.kc,
+ &manifest.ManifestStream{StreamName: dir},
+ 0,
+ 0,
+ nil,
+ make(chan *Block),
+ make(chan []error),
+ ""}
+ // Each stream uploads its blocks on its own goroutine.
+ go m.streamMap[dir].goUpload()
+ }
+
+ fileWriter := m.streamMap[dir]
+
+ // Reset the CollectionFileWriter for a new file
+ fileWriter.NewFile(fn)
+
+ file, err := os.Open(path)
+ if err != nil {
+ return err
+ }
+ defer file.Close()
+
+ m.status.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
+ _, err = io.Copy(fileWriter, file)
+ if err != nil {
+ return err
+ }
+
+ // Commits the current file. Legal to call this repeatedly.
+ fileWriter.Close()
+
+ return nil
+}
+
+// WriteTree uploads every file under root to Keep and returns the
+// resulting collection manifest text. Progress messages are written to
+// status while individual files are uploaded.
+func (cw *CollectionWriter) WriteTree(root string, status *log.Logger) (manifest string, err error) {
+ uploader := &WalkUpload{
+ kc: cw.IKeepClient,
+ stripPrefix: root,
+ streamMap: make(map[string]*CollectionFileWriter),
+ status: status,
+ }
+ if err = filepath.Walk(root, uploader.WalkFunc); err != nil {
+ return "", err
+ }
+
+ // Adopt the per-directory streams created during the walk, then render
+ // the combined manifest.
+ cw.mtx.Lock()
+ for _, stream := range uploader.streamMap {
+ cw.Streams = append(cw.Streams, stream)
+ }
+ cw.mtx.Unlock()
+ return cw.ManifestText()
+}
--- /dev/null
+package main
+
+import (
+ . "gopkg.in/check.v1"
+ "io/ioutil"
+ "log"
+ "os"
+ "sync"
+)
+
+// UploadTestSuite groups the collection-upload (WriteTree) tests in this file.
+type UploadTestSuite struct{}
+
+// Gocheck boilerplate
+var _ = Suite(&UploadTestSuite{})
+
+// TestSimpleUpload checks that a directory holding one small file produces
+// the expected single-stream manifest.
+// Fix: the receiver was *TestSuite, so this test did not belong to the
+// UploadTestSuite registered in this file; attach it to *UploadTestSuite.
+func (s *UploadTestSuite) TestSimpleUpload(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt\n")
+}
+
+// TestSimpleUploadTwofiles checks that two files in one directory share a
+// single stream with correctly concatenated extents.
+// Fix: receiver moved from *TestSuite to *UploadTestSuite so the test runs
+// under the suite registered in this file.
+func (s *UploadTestSuite) TestSimpleUploadTwofiles(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 3858f62230ac3c915f300c664312c63f+6 0:3:file1.txt 3:3:file2.txt\n")
+}
+
+// TestSimpleUploadSubdir checks that a file in a subdirectory gets its own
+// "./subdir" stream, accepting either stream order in the manifest.
+// Fix: receiver moved from *TestSuite to *UploadTestSuite so the test runs
+// under the suite registered in this file.
+func (s *UploadTestSuite) TestSimpleUploadSubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+ ioutil.WriteFile(tmpdir+"/subdir/file2.txt", []byte("bar"), 0600)
+
+ cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+ c.Check(err, IsNil)
+
+ // streams can get added in either order because of scheduling
+ // of goroutines.
+ if str != `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+` && str != `./subdir 37b51d194a7513e45b56f6524f2d51f2+3 0:3:file2.txt
+. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+` {
+ c.Error("Did not get expected manifest text")
+ }
+}
+
+// TestSimpleUploadLarge checks that a file larger than one Keep block is
+// split across multiple blocks with the expected extents.
+// Fix: receiver moved from *TestSuite to *UploadTestSuite so the test runs
+// under the suite registered in this file.
+func (s *UploadTestSuite) TestSimpleUploadLarge(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ file, _ := os.Create(tmpdir + "/" + "file1.txt")
+ data := make([]byte, 1024*1024-1)
+ for i := range data {
+ data[i] = byte(i % 10)
+ }
+ for i := 0; i < 65; i++ {
+ file.Write(data)
+ }
+ file.Close()
+
+ ioutil.WriteFile(tmpdir+"/"+"file2.txt", []byte("bar"), 0600)
+
+ cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+ c.Check(err, IsNil)
+ c.Check(str, Equals, ". 00ecf01e0d93385115c9f8bed757425d+67108864 485cd630387b6b1846fe429f261ea05f+1048514 0:68157375:file1.txt 68157375:3:file2.txt\n")
+}
+
+// TestUploadEmptySubdir checks that an empty subdirectory contributes no
+// stream to the manifest.
+// Fix: receiver moved from *TestSuite to *UploadTestSuite so the test runs
+// under the suite registered in this file.
+func (s *UploadTestSuite) TestUploadEmptySubdir(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ os.Mkdir(tmpdir+"/subdir", 0700)
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. acbd18db4cc2f85cedef654fccc4a4d8+3 0:3:file1.txt
+`)
+}
+
+// TestUploadEmptyFile checks that a zero-length file is represented with
+// the empty-block locator and a 0:0 extent.
+// Fix: receiver moved from *TestSuite to *UploadTestSuite so the test runs
+// under the suite registered in this file.
+func (s *UploadTestSuite) TestUploadEmptyFile(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte(""), 0600)
+
+ cw := CollectionWriter{&KeepTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+ c.Check(err, IsNil)
+ c.Check(str, Equals, `. d41d8cd98f00b204e9800998ecf8427e+0 0:0:file1.txt
+`)
+}
+
+// TestUploadError checks that a failing Keep client surfaces an error from
+// WriteTree and yields an empty manifest.
+// Fix: receiver moved from *TestSuite to *UploadTestSuite so the test runs
+// under the suite registered in this file.
+func (s *UploadTestSuite) TestUploadError(c *C) {
+ tmpdir, _ := ioutil.TempDir("", "")
+ defer func() {
+ os.RemoveAll(tmpdir)
+ }()
+
+ ioutil.WriteFile(tmpdir+"/"+"file1.txt", []byte("foo"), 0600)
+
+ cw := CollectionWriter{&KeepErrorTestClient{}, nil, sync.Mutex{}}
+ str, err := cw.WriteTree(tmpdir, log.New(os.Stdout, "", 0))
+
+ c.Check(err, NotNil)
+ c.Check(str, Equals, "")
+}