Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions compiler/parser/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,22 @@ func (e *Error) Error() string {
if e.sset == nil {
return e.Msg
}
src := e.sset.SourceOf(e.Pos)
start := src.Position(e.Pos)
end := src.Position(e.End)
var b strings.Builder
b.WriteString(e.Msg)
if src.Filename != "" {
fmt.Fprintf(&b, " in %s", src.Filename)
}
line := src.LineOfPos(e.sset.Text, e.Pos)
fmt.Fprintf(&b, " at line %d, column %d:\n%s\n", start.Line, start.Column, line)
if end.IsValid() {
formatSpanError(&b, line, start, end)
} else {
formatPointError(&b, start)
if e.Pos >= 0 {
src := e.sset.SourceOf(e.Pos)
start := src.Position(e.Pos)
end := src.Position(e.End)
if src.Filename != "" {
fmt.Fprintf(&b, " in %s", src.Filename)
}
line := src.LineOfPos(e.sset.Text, e.Pos)
fmt.Fprintf(&b, " at line %d, column %d:\n%s\n", start.Line, start.Column, line)
if end.IsValid() {
formatSpanError(&b, line, start, end)
} else {
formatPointError(&b, start)
}
}
return b.String()
}
Expand Down
100 changes: 61 additions & 39 deletions compiler/semantic/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
func Analyze(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (dag.Seq, error) {
a := newAnalyzer(ctx, source, head)
s := a.semSeq(seq)
s = a.checkOutputs(true, s)
s, _ = a.checkSeqOutputs(true, s)
if a.errors != nil {
return nil, a.errors
}
Expand All @@ -30,7 +30,7 @@ func Analyze(ctx context.Context, seq ast.Seq, source *data.Source, head *lakepa
func AnalyzeAddSource(ctx context.Context, seq ast.Seq, source *data.Source, head *lakeparse.Commitish) (dag.Seq, error) {
a := newAnalyzer(ctx, source, head)
s := a.semSeq(seq)
s = a.checkOutputs(true, s)
s, _ = a.checkSeqOutputs(true, s)
if a.errors != nil {
return nil, a.errors
}
Expand Down Expand Up @@ -151,54 +151,76 @@ func badOp() dag.Op {
}

func (a *analyzer) error(n ast.Node, err error) {
a.errors.Append(err.Error(), n.Pos(), n.End())
pos, end := -1, -1
if n != nil {
pos, end = n.Pos(), n.End()
}
a.errors.Append(err.Error(), pos, end)
}

func (a *analyzer) checkOutputs(isLeaf bool, seq dag.Seq) dag.Seq {
func (a *analyzer) checkSeqOutputs(isLeaf bool, seq dag.Seq) (dag.Seq, bool) {
if len(seq) == 0 {
return seq
return seq, false
}
// - Report an error in any outputs are not located in the leaves.
// - Add output operators to any leaves where they do not exist.
lastN := len(seq) - 1
last := len(seq) - 1
var blocked bool
for i, o := range seq {
isLast := lastN == i
switch o := o.(type) {
case *dag.Output:
if !isLast || !isLeaf {
n, ok := a.outputs[o]
if !ok {
panic("system error: untracked user output")
}
a.error(n, errors.New("output operator must be at flowgraph leaf"))
}
case *dag.Scope:
o.Body = a.checkOutputs(isLast && isLeaf, o.Body)
case *dag.Scatter:
for k := range o.Paths {
o.Paths[k] = a.checkOutputs(isLast && isLeaf, o.Paths[k])
}
case *dag.Over:
o.Body = a.checkOutputs(false, o.Body)
case *dag.Fork:
for k := range o.Paths {
o.Paths[k] = a.checkOutputs(isLast && isLeaf, o.Paths[k])
}
case *dag.Switch:
for k := range o.Cases {
o.Cases[k].Path = a.checkOutputs(isLast && isLeaf, o.Cases[k].Path)
}
case *dag.Mirror:
o.Main = a.checkOutputs(isLast && isLeaf, o.Main)
o.Mirror = a.checkOutputs(isLast && isLeaf, o.Mirror)
if blocked {
// XXX We need to map all DAG ops to their AST equivalent so we can
// get location specific errors.
a.error(nil, errors.New("unreachable operator"))
return seq, true
}
blocked = a.checkOpOutput(o, isLeaf, last == i)
}
switch seq[lastN].(type) {
case *dag.Scope, *dag.Output, *dag.Scatter, *dag.Fork, *dag.Switch, *dag.Mirror:
switch seq[last].(type) {
case *dag.Scope, *dag.Output, *dag.Scatter, *dag.Fork, *dag.Switch, *dag.Mirror, *dag.Over:
default:
if isLeaf {
return append(seq, &dag.Output{Kind: "Output", Name: "main"})
seq.Append(&dag.Output{Kind: "Output", Name: "main"})
blocked = true
}
}
return seq, blocked
}

func (a *analyzer) checkOpOutput(o dag.Op, isLeafSeq bool, isLast bool) bool {
var blocked bool
switch o := o.(type) {
case *dag.Output:
blocked = true
case *dag.Scope:
o.Body, blocked = a.checkSeqOutputs(isLast && isLeafSeq, o.Body)
case *dag.Scatter:
blocked = true
for k := range o.Paths {
var pathBlocked bool
o.Paths[k], pathBlocked = a.checkSeqOutputs(isLast && isLeafSeq, o.Paths[k])
blocked = blocked && pathBlocked
}
case *dag.Over:
o.Body, blocked = a.checkSeqOutputs(isLast && isLeafSeq, o.Body)
case *dag.Fork:
blocked = true
for k := range o.Paths {
var pathBlocked bool
o.Paths[k], pathBlocked = a.checkSeqOutputs(isLast && isLeafSeq, o.Paths[k])
blocked = blocked && pathBlocked
}
case *dag.Switch:
blocked = true
for k := range o.Cases {
var pathBlocked bool
o.Cases[k].Path, pathBlocked = a.checkSeqOutputs(isLast && isLeafSeq, o.Cases[k].Path)
blocked = blocked && pathBlocked
}
case *dag.Mirror:
var mainOut, mirrorOut bool
o.Main, mainOut = a.checkSeqOutputs(isLast && isLeafSeq, o.Main)
o.Mirror, mirrorOut = a.checkSeqOutputs(isLast && isLeafSeq, o.Mirror)
blocked = mainOut && mirrorOut
}
return seq
return blocked
}
26 changes: 24 additions & 2 deletions zfmt/ztests/output.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
script: |
zc -C -s 'fork (=> output foo => pass)'
echo '// ==='
zc -C -s 'switch x (case "foo" => output foo case "bar" => pass)'
zc -C -s 'switch x (case "foo" => output foo case "bar" => pass) | sort this'
echo '// ==='
zc -C -s 'over this => ( sum(this) | output bar )'
# error cases
! zc -C -s 'over this => ( sum(this) | output bar ) | x := "foo"'
! zc -C -s 'fork (=> output foo | "yield foo" => pass) | sort this'
! zc -C -s 'fork (=> output foo => output bar) | count()'
! zc -C -s 'switch x (case "foo" => output foo case "bar" => output bar) | count()'

outputs:
- name: stdout
Expand All @@ -21,5 +28,20 @@ outputs:
output foo
case "bar" =>
pass
| output main
)
| sort this
| output main
// ===
reader
| over this => (
summarize
sum:=sum(this)
| yield sum
| output bar
)
- name: stderr
data: |
unreachable operator
unreachable operator
unreachable operator
unreachable operator