projects
/
arvados.git
/ blobdiff
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge branch '8008-package-testing' refs #8008
[arvados.git]
/
sdk
/
go
/
manifest
/
manifest.go
diff --git
a/sdk/go/manifest/manifest.go
b/sdk/go/manifest/manifest.go
index 49faa01b4d56b2eb32ab0c60d7e68a0c4966df87..cf0ae85c5782a8848d234de10d3fce81b6e661cd 100644
(file)
--- a/
sdk/go/manifest/manifest.go
+++ b/
sdk/go/manifest/manifest.go
@@
-20,6
+20,7
@@
var LocatorPattern = regexp.MustCompile(
type Manifest struct {
Text string
type Manifest struct {
Text string
+ Err error
}
type BlockLocator struct {
}
type BlockLocator struct {
@@
-43,12
+44,19
@@
type FileSegment struct {
Len int
}
Len int
}
+// FileStreamSegment is a portion of a file described as a segment of a stream.
+type FileStreamSegment struct {
+ SegPos uint64
+ SegLen uint64
+ Name string
+}
+
// Represents a single line from a manifest.
type ManifestStream struct {
// Represents a single line from a manifest.
type ManifestStream struct {
- StreamName string
- Blocks []string
- File
Tokens []string
- Err error
+ StreamName
string
+ Blocks
[]string
+ File
StreamSegments []FileStreamSegment
+ Err
error
}
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
}
var escapeSeq = regexp.MustCompile(`\\([0-9]{3}|\\)`)
@@
-96,21
+104,21
@@
func ParseBlockLocator(s string) (b BlockLocator, err error) {
return
}
return
}
-func parseFile
Token(tok string) (segPos, segLen uint64, name string
, err error) {
+func parseFile
StreamSegment(tok string) (ft FileStreamSegment
, err error) {
parts := strings.SplitN(tok, ":", 3)
if len(parts) != 3 {
err = ErrInvalidToken
return
}
parts := strings.SplitN(tok, ":", 3)
if len(parts) != 3 {
err = ErrInvalidToken
return
}
-
s
egPos, err = strconv.ParseUint(parts[0], 10, 64)
+
ft.S
egPos, err = strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return
}
if err != nil {
return
}
-
s
egLen, err = strconv.ParseUint(parts[1], 10, 64)
+
ft.S
egLen, err = strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return
}
if err != nil {
return
}
-
n
ame = UnescapeName(parts[2])
+
ft.N
ame = UnescapeName(parts[2])
return
}
return
}
@@
-127,12
+135,11
@@
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
target := "./" + filepath
blockLens := make([]int, 0, len(s.Blocks))
// This is what streamName+"/"+fileName will look like:
target := "./" + filepath
- for _, fTok := range s.FileTokens {
- wantPos, wantLen, name, err := parseFileToken(fTok)
- if err != nil {
- // Skip (!) invalid file tokens.
- continue
- }
+ for _, fTok := range s.FileStreamSegments {
+ wantPos := fTok.SegPos
+ wantLen := fTok.SegLen
+ name := fTok.Name
+
if s.StreamName+"/"+name != target {
continue
}
if s.StreamName+"/"+name != target {
continue
}
@@
-183,28
+190,40
@@
func (s *ManifestStream) sendFileSegmentIterByName(filepath string, ch chan<- *F
func parseManifestStream(s string) (m ManifestStream) {
tokens := strings.Split(s, " ")
func parseManifestStream(s string) (m ManifestStream) {
tokens := strings.Split(s, " ")
+
m.StreamName = UnescapeName(tokens[0])
m.StreamName = UnescapeName(tokens[0])
+ if m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./") {
+ m.Err = fmt.Errorf("Invalid stream name: %s", m.StreamName)
+ return
+ }
+
tokens = tokens[1:]
var i int
tokens = tokens[1:]
var i int
- for i =
range tokens
{
+ for i =
0; i < len(tokens); i++
{
if !blockdigest.IsBlockLocator(tokens[i]) {
break
}
}
m.Blocks = tokens[:i]
if !blockdigest.IsBlockLocator(tokens[i]) {
break
}
}
m.Blocks = tokens[:i]
-
m.FileTokens
= tokens[i:]
+
fileTokens :
= tokens[i:]
- if
m.StreamName != "." && !strings.HasPrefix(m.StreamName, "./")
{
- m.Err = fmt.Errorf("
Invalid stream name: %s", m.StreamName
)
+ if
len(m.Blocks) == 0
{
+ m.Err = fmt.Errorf("
No block locators found"
)
return
}
return
}
- for _, ft := range m.FileTokens {
- _, _, _, err := parseFileToken(ft)
+ if len(fileTokens) == 0 {
+ m.Err = fmt.Errorf("No file tokens found")
+ return
+ }
+
+ for _, ft := range fileTokens {
+ pft, err := parseFileStreamSegment(ft)
if err != nil {
m.Err = fmt.Errorf("Invalid file token: %s", ft)
break
}
if err != nil {
m.Err = fmt.Errorf("Invalid file token: %s", ft)
break
}
+ m.FileStreamSegments = append(m.FileStreamSegments, pft)
}
return
}
return
@@
-246,28
+265,24
@@
func (m *Manifest) FileSegmentIterByName(filepath string) <-chan *FileSegment {
return ch
}
return ch
}
-type ManifestBlockLocator struct {
- Locator blockdigest.BlockLocator
- Err error
-}
-
// Blocks may appear mulitple times within the same manifest if they
// are used by multiple files. In that case this Iterator will output
// the same block multiple times.
// Blocks may appear mulitple times within the same manifest if they
// are used by multiple files. In that case this Iterator will output
// the same block multiple times.
-func (m *Manifest) BlockIterWithDuplicates() <-chan ManifestBlockLocator {
- blockChannel := make(chan ManifestBlockLocator)
+//
+// In order to detect parse errors, caller must check m.Err after the returned channel closes.
+func (m *Manifest) BlockIterWithDuplicates() <-chan blockdigest.BlockLocator {
+ blockChannel := make(chan blockdigest.BlockLocator)
go func(streamChannel <-chan ManifestStream) {
for ms := range streamChannel {
if ms.Err != nil {
go func(streamChannel <-chan ManifestStream) {
for ms := range streamChannel {
if ms.Err != nil {
- blockChannel <- ManifestBlockLocator{Locator: blockdigest.BlockLocator{}, Err: ms.Err}
+ m.Err = ms.Err
continue
}
for _, block := range ms.Blocks {
continue
}
for _, block := range ms.Blocks {
- b, err := blockdigest.ParseBlockLocator(block)
- if err == nil {
- blockChannel <- ManifestBlockLocator{b, nil}
+ if b, err := blockdigest.ParseBlockLocator(block); err == nil {
+ blockChannel <- b
} else {
} else {
- blockChannel <- ManifestBlockLocator{Locator: blockdigest.BlockLocator{}, Err: err}
+ m.Err = err
}
}
}
}
}
}