-
Notifications
You must be signed in to change notification settings - Fork 220
/
Copy pathstream.go
108 lines (93 loc) · 4.02 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package stream
import (
"bufio"
"container/list"
"errors"
"io"
"github.com/johnkerl/miller/v6/pkg/cli"
"github.com/johnkerl/miller/v6/pkg/input"
"github.com/johnkerl/miller/v6/pkg/output"
"github.com/johnkerl/miller/v6/pkg/transformers"
"github.com/johnkerl/miller/v6/pkg/types"
)
// Since Go is concurrent, the context struct (AWK-like variables such as
// FILENAME, NR, NF, FNR, etc.) needs to be duplicated and passed through the
// channels along with each record.
//
// * Record-readers update FILENAME, FILENUM, NF, NR, FNR within context structs.
//
// * Record-transformers can read these from the context structs.
//
// * Record-writers don't need them (OPS et al. are already in the
// writer-options struct). However, we have chained transformers using the
// 'then' command-line syntax. This means a given transformer might be piping
// its output to a record-writer, or another transformer. So, the
// record-and-context pair goes to the record-writers even though they don't
// need the contexts.
// Stream is the high-level sketch of Miller. It coordinates instantiating
// format-specific record-reader and record-writer objects, using flags from
// the command line; setting up I/O channels; running the record stream from
// the record-reader object, through the specified chain of transformers
// (verbs), to the record-writer object.
//
// Parameters:
//   - fileNames: the inputs to read. This is separate from options.FileNames
//     for in-place mode, which sends along only one file name per call.
//   - options: reader and writer options; ReaderOptions.RecordsPerBatch is
//     passed to the reader as its batch size.
//   - recordTransformers: the verb chain handed to the transformer stage.
//   - outputStream: destination for formatted output. It is wrapped in a
//     bufio.Writer that is flushed before Stream returns; Stream does not
//     close it.
//   - outputIsStdout: passed through unchanged to the channel writer.
//
// Stream returns an error if the reader or writer cannot be created, if the
// reader reports an input error, if the writer reports a data-processing
// error, or if flushing the buffered output fails. An error reported by the
// pipeline takes precedence over a flush error.
func Stream(
	// fileNames argument is separate from options.FileNames for in-place mode,
	// which sends along only one file name per call to Stream():
	fileNames []string,
	options *cli.TOptions,
	recordTransformers []transformers.IRecordTransformer,
	outputStream io.WriteCloser,
	outputIsStdout bool,
) error {
	// Since Go is concurrent, the context struct needs to be duplicated and
	// passed through the channels along with each record.
	initialContext := types.NewContext()

	// Instantiate the record-reader.
	// RecordsPerBatch is tracked separately from ReaderOptions since join/repl
	// may use batch size of 1.
	recordReader, err := input.Create(&options.ReaderOptions, options.ReaderOptions.RecordsPerBatch)
	if err != nil {
		return err
	}

	// Instantiate the record-writer.
	recordWriter, err := output.Create(&options.WriterOptions)
	if err != nil {
		return err
	}

	// Set up the reader-to-transformer and transformer-to-writer channels.
	readerChannel := make(chan *list.List, 2) // list of *types.RecordAndContext
	writerChannel := make(chan *list.List, 1) // list of *types.RecordAndContext

	// The reader reports fatal input errors (file not found, etc.) on
	// inputErrorChannel; the writer reports data errors on
	// dataProcessingErrorChannel and signals that all output has been written
	// on doneWritingChannel. Each has a one-slot buffer so a single send
	// succeeds even when Stream is not receiving at that moment.
	inputErrorChannel := make(chan error, 1)
	doneWritingChannel := make(chan bool, 1)
	dataProcessingErrorChannel := make(chan bool, 1)

	// For mlr head, so a transformer can communicate it will disregard all
	// further input. It writes this back upstream, and that is passed back to
	// the record-reader which then stops reading input. This is necessary to
	// get quick response from, for example, mlr head -n 10 on input files with
	// millions or billions of records.
	readerDownstreamDoneChannel := make(chan bool, 1)

	// Start the reader, transformer, and writer. The writer goroutine is given
	// bufferedOutputStream and writes its output through it.
	bufferedOutputStream := bufio.NewWriter(outputStream)
	go recordReader.Read(fileNames, *initialContext, readerChannel, inputErrorChannel, readerDownstreamDoneChannel)
	go transformers.ChainTransformer(readerChannel, readerDownstreamDoneChannel, recordTransformers,
		writerChannel, options)
	go output.ChannelWriter(writerChannel, recordWriter, &options.WriterOptions, doneWritingChannel,
		dataProcessingErrorChannel, bufferedOutputStream, outputIsStdout)

	errDataProcessing := errors.New("exiting due to data error") // details already printed

	// Errors are recorded but do not end the wait: we leave the loop only once
	// the writer signals completion, so bufferedOutputStream is never flushed
	// here while ChannelWriter might still be writing to it.
	// NOTE(review): this relies on the pipeline still reaching end of stream
	// after the reader reports an input error; confirm against the readers.
	var retval error
	for done := false; !done; {
		select {
		case ierr := <-inputErrorChannel:
			retval = ierr
		case <-dataProcessingErrorChannel:
			retval = errDataProcessing
		case <-doneWritingChannel:
			done = true
		}
	}

	// select chooses at random among ready cases, so an error sent before the
	// done signal may still be sitting in its buffered channel when the loop
	// exits. Check once more, without blocking, so it is not silently lost.
	if retval == nil {
		select {
		case ierr := <-inputErrorChannel:
			retval = ierr
		case <-dataProcessingErrorChannel:
			retval = errDataProcessing
		default:
		}
	}

	// bufio.Writer errors are sticky: a write failure that happened inside the
	// writer goroutine is reported again by Flush. Surface it rather than
	// discarding it, so truncated output is not reported as success.
	if flushErr := bufferedOutputStream.Flush(); flushErr != nil && retval == nil {
		retval = flushErr
	}
	return retval
}