+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: Apache-2.0
+
package main
import (
"bytes"
"crypto/md5"
+ "errors"
"fmt"
- "git.curoverse.com/arvados.git/sdk/go/keepclient"
- "git.curoverse.com/arvados.git/sdk/go/manifest"
"io"
"log"
"os"
"path/filepath"
+ "sort"
+ "strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/manifest"
)
// Block is a buffer that is filled and then handed to the uploader. offset
// counts the bytes of the block's data filled so far: ReadFrom reads into
// data[offset:] and goUpload uploads data[0:offset].
type Block struct {
offset int64
// NOTE(review): the fields below are accessed through *ManifestStreamWriter
// receivers elsewhere in this excerpt (m.Block, m.uploader, m.finish), so
// they presumably belong to that struct, whose header is not visible here.
// Confirm against the full file.
*Block
// uploader carries filled blocks to goUpload.
uploader chan *Block
// finish carries the slice of upload errors that goUpload sends after the
// uploader channel has been closed and drained.
+ finish chan []error
}
// IKeepClient is the single Keep operation this code depends on. WriteTree
// accepts any implementation of it.
type IKeepClient interface {
// PutHB stores buf under the given hash. goUpload passes the hex MD5 of
// the data as hash and records the returned string in the manifest as the
// block locator; the int result is not used in the visible code.
PutHB(hash string, buf []byte) (string, int, error)
}
-func (m *ManifestStreamWriter) Write(p []byte) (n int, err error) {
- // Needed to conform to Writer interface, but not implemented
- // because io.Copy will actually use ReadFrom instead.
- return 0, nil
// Write implements io.Writer by wrapping p in a bytes.Reader and delegating
// to ReadFrom. It returns the byte count (converted to int) and the error
// reported by ReadFrom.
+func (m *ManifestStreamWriter) Write(p []byte) (int, error) {
+ n, err := m.ReadFrom(bytes.NewReader(p))
+ return int(n), err
}
// ReadFrom reads from r into the unfilled part of the current block and
// advances the block offset by the number of bytes read. When the offset
// reaches keepclient.BLOCKSIZE the block is sent on the uploader channel and
// m.Block is reset to nil.
//
// It returns the total number of bytes read. An io.EOF from r is reported as
// a nil error; any other error is returned as is.
//
// NOTE(review): the loop header and the code that allocates a fresh block are
// not visible in this excerpt.
func (m *ManifestStreamWriter) ReadFrom(r io.Reader) (n int64, err error) {
count, err = r.Read(m.Block.data[m.Block.offset:])
total += int64(count)
m.Block.offset += int64(count)
- if count > 0 {
- if m.Block.offset == keepclient.BLOCKSIZE {
- m.uploader <- m.Block
- m.Block = nil
- }
+ if m.Block.offset == keepclient.BLOCKSIZE {
+ m.uploader <- m.Block
+ m.Block = nil
}
}
- return total, err
// io.EOF only marks the end of the input, so it is translated to nil here.
+ if err == io.EOF {
+ return total, nil
+ } else {
+ return total, err
+ }
+
}
// goUpload receives blocks from the uploader channel until that channel is
// closed. For each block it computes the hex MD5 of the filled portion
// (data[0:offset]) and stores that portion with IKeepClient.PutHB. On success
// the returned string is appended to the stream's Blocks; on failure the error
// is collected and the loop continues with the next block.
//
// Once the channel is closed, the collected errors (a nil slice if there were
// none) are sent on the finish channel. WalkFunc starts this method in its own
// goroutine, and Finish is the receiver of the final send.
func (m *ManifestStreamWriter) goUpload() {
- select {
- case block, valid := <-m.uploader:
- if !valid {
- return
- }
// Note: this local slice is named errors and shadows the errors package
// within this function.
+ var errors []error
// Local copies of the channels; presumably taken so this goroutine is
// unaffected when Finish sets the struct fields to nil. Confirm intent.
+ uploader := m.uploader
+ finish := m.finish
+ for block := range uploader {
hash := fmt.Sprintf("%x", md5.Sum(block.data[0:block.offset]))
- signedHash, _, _ := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
- m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ signedHash, _, err := m.ManifestWriter.IKeepClient.PutHB(hash, block.data[0:block.offset])
+ if err != nil {
+ errors = append(errors, err)
+ } else {
+ m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, signedHash)
+ }
}
-
+ finish <- errors
}
// ManifestWriter holds one ManifestStreamWriter per directory encountered
// while a tree is walked.
//
// NOTE(review): WriteTree builds this struct from three positional values
// (the Keep client, the root path and the Streams map), so it has fields
// besides Streams that are not visible in this excerpt.
type ManifestWriter struct {
// Streams maps a stream name (a directory path relative to the root, or "."
// for the root itself) to the writer for that stream.
Streams map[string]*ManifestStreamWriter
}
-type walker struct {
- currentDir string
- m *ManifestWriter
-}
-
-func (w walker) WalkFunc(path string, info os.FileInfo, err error) error {
- log.Print("path ", path, " ", info.Name(), " ", info.IsDir())
// WalkFunc is passed to filepath.Walk by WriteTree. For each regular file it
// copies the file's content into the stream for the file's directory and
// appends a file segment (start position, length, name) to that stream.
//
// An error handed in by the walker is returned unchanged. A symlink is
// resolved with filepath.EvalSymlinks and os.Stat, and the type check is made
// on the resolved target. Entries with any os.ModeType bit set (directories,
// pipes and other non-regular files) are skipped by returning nil.
//
// NOTE(review): parts of this function (construction of the stream writer,
// opening the file, the origin of fileStart) are not visible in this excerpt.
+func (m *ManifestWriter) WalkFunc(path string, info os.FileInfo, err error) error {
+ if err != nil {
+ return err
+ }
- if info.IsDir() {
- if path == w.currentDir {
- return nil
+ targetPath, targetInfo := path, info
+ if info.Mode()&os.ModeSymlink != 0 {
+ // Update targetpath/info to reflect the symlink
+ // target, not the symlink itself
+ targetPath, err = filepath.EvalSymlinks(path)
+ if err != nil {
+ return err
+ }
+ targetInfo, err = os.Stat(targetPath)
+ if err != nil {
+ return fmt.Errorf("stat symlink %q target %q: %s", path, targetPath, err)
}
- return filepath.Walk(path, walker{path, w.m}.WalkFunc)
}
- m := w.m
- dir := path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()))]
+ if targetInfo.Mode()&os.ModeType != 0 {
+ // Skip directories, pipes, other non-regular files
+ return nil
+ }
+
// dir is path with stripPrefix plus one separator byte removed from the
// front, and one separator byte plus the file name removed from the end.
// The length check keeps the slice bounds valid when the file sits directly
// under stripPrefix; in that case dir stays empty and becomes ".".
+ var dir string
+ if len(path) > (len(m.stripPrefix) + len(info.Name()) + 1) {
+ dir = path[len(m.stripPrefix)+1 : (len(path) - len(info.Name()) - 1)]
+ }
+ if dir == "" {
+ dir = "."
+ }
+
// fn is the trailing len(info.Name()) bytes of path, i.e. the file name.
fn := path[(len(path) - len(info.Name())):]
// The first file seen in a directory creates that directory's stream and
// starts its uploader goroutine.
if m.Streams[dir] == nil {
&manifest.ManifestStream{StreamName: dir},
0,
nil,
- make(chan *Block)}
+ make(chan *Block),
+ make(chan []error)}
go m.Streams[dir].goUpload()
}
return err
}
// NOTE(review): info describes path itself, so for a symlink the size logged
// here is presumably that of the link rather than of targetInfo. Confirm
// which is intended.
+ log.Printf("Uploading %v/%v (%v bytes)", dir, fn, info.Size())
+
var count int64
count, err = io.Copy(stream, file)
- if err != nil && err != io.EOF {
+ if err != nil {
return err
}
// Advance the stream position by the bytes copied and record the segment
// for this file.
stream.offset += count
- stream.ManifestStream.Files = append(stream.ManifestStream.Files,
- fmt.Sprintf("%v:%v:%v", fileStart, count, fn))
+ stream.ManifestStream.FileStreamSegments = append(stream.ManifestStream.FileStreamSegments,
+ manifest.FileStreamSegment{uint64(fileStart), uint64(count), fn})
return nil
}
-func (m *ManifestWriter) Finish() {
- for _, v := range m.Streams {
- if v.uploader != nil {
- if v.Block != nil {
- v.uploader <- v.Block
- }
- close(v.uploader)
- v.uploader = nil
// Finish shuts down the uploader of every stream. For each stream it sends any
// pending block, closes the uploader channel, and waits for the error slice
// that goUpload sends on the finish channel. Both channel fields are then set
// to nil. Streams whose uploader is already nil are skipped, so a second call
// does nothing for streams that were already finished.
//
// It returns nil when no upload reported an error. Otherwise it returns one
// error whose message is every collected error message, each followed by a
// newline.
+func (m *ManifestWriter) Finish() error {
+ var errstring string
+ for _, stream := range m.Streams {
+ if stream.uploader == nil {
+ continue
}
+ if stream.Block != nil {
+ stream.uploader <- stream.Block
+ }
+ close(stream.uploader)
+ stream.uploader = nil
+
// Blocks until goUpload has drained the uploader channel and reported.
// The local name errors shadows the errors package only inside this loop
// body; the errors.New call after the loop refers to the package.
+ errors := <-stream.finish
+ close(stream.finish)
+ stream.finish = nil
+
+ for _, r := range errors {
+ errstring = fmt.Sprintf("%v%v\n", errstring, r.Error())
+ }
+ }
+ if errstring != "" {
+ return errors.New(errstring)
+ } else {
+ return nil
}
}
// ManifestText builds the manifest text for all streams. It writes one line
// per stream, in sorted order of stream name. Each line holds the stream name
// ("." for the root, "./" + name otherwise), then each block string, then each
// file segment formatted as position:length:name, separated by single spaces
// and ended by a newline.
//
// It calls Finish first and discards the error Finish returns.
//
// NOTE(review): the end of this function is not visible in this excerpt.
func (m *ManifestWriter) ManifestText() string {
m.Finish()
var buf bytes.Buffer
- for k, v := range m.Streams {
- if k == "" {
+
// Map iteration order is not fixed, so the keys are collected and sorted to
// give the streams a stable order in the output.
+ dirs := make([]string, len(m.Streams))
+ i := 0
+ for k := range m.Streams {
+ dirs[i] = k
+ i++
+ }
+ sort.Strings(dirs)
+
+ for _, k := range dirs {
+ v := m.Streams[k]
+
+ if k == "." {
buf.WriteString(".")
} else {
// Spaces are escaped as \040. Newline characters are removed from the
// name rather than escaped.
+ k = strings.Replace(k, " ", "\\040", -1)
+ k = strings.Replace(k, "\n", "", -1)
buf.WriteString("./" + k)
}
for _, b := range v.Blocks {
buf.WriteString(" ")
buf.WriteString(b)
}
- for _, f := range v.Files {
+ for _, f := range v.FileStreamSegments {
buf.WriteString(" ")
- buf.WriteString(f)
// File names get the same treatment as stream names: spaces become \040
// and newline characters are dropped.
+ name := strings.Replace(f.Name, " ", "\\040", -1)
+ name = strings.Replace(name, "\n", "", -1)
+ buf.WriteString(fmt.Sprintf("%d:%d:%s", f.SegPos, f.SegLen, name))
}
buf.WriteString("\n")
}
// WriteTree walks the directory tree at root with a ManifestWriter that
// uploads through kc, then returns the manifest text for the tree.
//
// If the walk or the subsequent Finish returns an error, that error is
// returned together with an empty manifest string.
//
// NOTE(review): when the walk fails, this function returns without calling
// Finish. The uploader channels are closed only in Finish, so any goUpload
// goroutines already started by WalkFunc are never told to stop on that path.
// Confirm this is acceptable.
func WriteTree(kc IKeepClient, root string) (manifest string, err error) {
mw := ManifestWriter{kc, root, map[string]*ManifestStreamWriter{}}
- err = filepath.Walk(root, walker{root, &mw}.WalkFunc)
- mw.Finish()
+ err = filepath.Walk(root, mw.WalkFunc)
if err != nil {
return "", err
- } else {
- return mw.ManifestText(), nil
}
+ err = mw.Finish()
+ if err != nil {
+ return "", err
+ }
+
+ return mw.ManifestText(), nil
}