Merge branch 'master' into 8019-crunchrun-log-throttle
author radhika <radhika@curoverse.com>
Fri, 5 May 2017 16:02:36 +0000 (12:02 -0400)
committer radhika <radhika@curoverse.com>
Fri, 5 May 2017 16:02:36 +0000 (12:02 -0400)
1  2 
services/crunch-run/crunchrun.go
services/crunch-run/crunchrun_test.go

index 812525db6904ba1201a54502c5fd781686b0188b,c6847bc19db3116d7286b98e1914f144c8d66a8b..07937291724c8cde9f6a938414924e205e63c0ef
@@@ -430,24 -430,25 +430,25 @@@ func (runner *ContainerRunner) SetupMou
                        }
                        collectionPaths = append(collectionPaths, src)
  
-               case mnt.Kind == "tmp" && bind == runner.Container.OutputPath:
-                       runner.HostOutputDir, err = runner.MkTempDir("", "")
+               case mnt.Kind == "tmp":
+                       var tmpdir string
+                       tmpdir, err = runner.MkTempDir("", "")
                        if err != nil {
                                return fmt.Errorf("While creating mount temp dir: %v", err)
                        }
-                       st, staterr := os.Stat(runner.HostOutputDir)
+                       st, staterr := os.Stat(tmpdir)
                        if staterr != nil {
                                return fmt.Errorf("While Stat on temp dir: %v", staterr)
                        }
-                       err = os.Chmod(runner.HostOutputDir, st.Mode()|os.ModeSetgid|0777)
+                       err = os.Chmod(tmpdir, st.Mode()|os.ModeSetgid|0777)
                        if staterr != nil {
                                return fmt.Errorf("While Chmod temp dir: %v", err)
                        }
-                       runner.CleanupTempDir = append(runner.CleanupTempDir, runner.HostOutputDir)
-                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", runner.HostOutputDir, bind))
-               case mnt.Kind == "tmp":
-                       runner.Volumes[bind] = struct{}{}
+                       runner.CleanupTempDir = append(runner.CleanupTempDir, tmpdir)
+                       runner.Binds = append(runner.Binds, fmt.Sprintf("%s:%s", tmpdir, bind))
+                       if bind == runner.Container.OutputPath {
+                               runner.HostOutputDir = tmpdir
+                       }
  
                case mnt.Kind == "json":
                        jsondata, err := json.Marshal(mnt.Content)
@@@ -633,12 -634,11 +634,12 @@@ func (runner *ContainerRunner) LogNodeI
  // Get and save the raw JSON container record from the API server
  func (runner *ContainerRunner) LogContainerRecord() (err error) {
        w := &ArvLogWriter{
 -              runner.ArvClient,
 -              runner.Container.UUID,
 -              "container",
 -              runner.LogCollection.Open("container.json"),
 +              ArvClient:     runner.ArvClient,
 +              UUID:          runner.Container.UUID,
 +              loggingStream: "container",
 +              writeCloser:   runner.LogCollection.Open("container.json"),
        }
 +
        // Get Container record JSON from the API Server
        reader, err := runner.ArvClient.CallRaw("GET", "containers", runner.Container.UUID, "", nil)
        if err != nil {
@@@ -1066,8 -1066,8 +1067,8 @@@ func (runner *ContainerRunner) CommitLo
        // point, but re-open crunch log with ArvClient in case there are any
        // other further errors (such as failing to write the log to Keep!) while
        // shutting down
 -      runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{runner.ArvClient, runner.Container.UUID,
 -              "crunch-run", nil})
 +      runner.CrunchLog = NewThrottledLogger(&ArvLogWriter{ArvClient: runner.ArvClient,
 +              UUID: runner.Container.UUID, loggingStream: "crunch-run", writeCloser: nil})
  
        if runner.LogsPDH != nil {
                // If we have already assigned something to LogsPDH,
@@@ -1154,8 -1154,7 +1155,8 @@@ func (runner *ContainerRunner) IsCancel
  
  // NewArvLogWriter creates an ArvLogWriter
  func (runner *ContainerRunner) NewArvLogWriter(name string) io.WriteCloser {
 -      return &ArvLogWriter{runner.ArvClient, runner.Container.UUID, name, runner.LogCollection.Open(name + ".txt")}
 +      return &ArvLogWriter{ArvClient: runner.ArvClient, UUID: runner.Container.UUID, loggingStream: name,
 +              writeCloser: runner.LogCollection.Open(name + ".txt")}
  }
  
  // Run the full container lifecycle.
@@@ -1295,9 -1294,6 +1296,9 @@@ func NewContainerRunner(api IArvadosCli
        cr.Container.UUID = containerUUID
        cr.CrunchLog = NewThrottledLogger(cr.NewLogWriter("crunch-run"))
        cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
 +
 +      loadLogThrottleParams(api)
 +
        return cr
  }
  
index 5bfbf796f045c2d90fe45495dc7c645151c190ca,183752b7f4010ce9f3cb75d94ae8a5bba1d0e03c..c8427563cb9447ceada0af2d44a3ef30586672ab
@@@ -249,16 -249,7 +249,16 @@@ func (client *ArvTestClient) Update(res
        return nil
  }
  
 -var discoveryMap = map[string]interface{}{"defaultTrashLifetime": float64(1209600)}
 +var discoveryMap = map[string]interface{}{
 +      "defaultTrashLifetime":               float64(1209600),
 +      "crunchLimitLogBytesPerJob":          float64(67108864),
 +      "crunchLogThrottleBytes":             float64(65536),
 +      "crunchLogThrottlePeriod":            float64(60),
 +      "crunchLogThrottleLines":             float64(1024),
 +      "crunchLogPartialLineThrottlePeriod": float64(5),
 +      "crunchLogBytesPerEvent":             float64(4096),
 +      "crunchLogSecondsBetweenEvents":      float64(1),
 +}
  
  func (client *ArvTestClient) Discovery(key string) (interface{}, error) {
        return discoveryMap[key], nil
@@@ -977,6 -968,22 +977,22 @@@ func (s *TestSuite) TestSetupMounts(c *
                checkEmpty()
        }
  
+       {
+               i = 0
+               cr.ArvMountPoint = ""
+               cr.Container.Mounts = make(map[string]arvados.Mount)
+               cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"}
+               cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
+               cr.OutputPath = "/out"
+               err := cr.SetupMounts()
+               c.Check(err, IsNil)
+               c.Check(am.Cmd, DeepEquals, []string{"--foreground", "--allow-other", "--read-write", "--mount-by-pdh", "by_id", realTemp + "/keep1"})
+               c.Check(cr.Binds, DeepEquals, []string{realTemp + "/2:/out", realTemp + "/3:/tmp"})
+               cr.CleanupDirs()
+               checkEmpty()
+       }
        {
                i = 0
                cr.ArvMountPoint = ""