Overview
Gen1’s StreamDecoder provides safe, efficient streaming decoding for any io.Reader. It properly handles record boundaries and unconsumed bytes, working correctly with network connections, compressed streams, and other non-seekable readers.
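For example, two records written back-to-back decode cleanly even though the reader itself carries no framing. A minimal sketch (it borrows gen1.EncodeTo, shown later in these docs, to produce the records):
var buf bytes.Buffer
// Write two records back-to-back into one byte stream.
if err := gen1.EncodeTo(&buf, map[string]any{"id": "a"}); err != nil {
    log.Fatal(err)
}
if err := gen1.EncodeTo(&buf, map[string]any{"id": "b"}); err != nil {
    log.Fatal(err)
}
dec := gen1.NewStreamDecoder(&buf)
for {
    value, err := dec.Decode()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(value) // map[id:a], then map[id:b]
}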
StreamDecoder
Streaming decoder that maintains an internal buffer to handle incomplete records.
type StreamDecoder struct {
// Internal fields (not exported)
}
NewStreamDecoder
Creates a new streaming decoder for the given reader.
func NewStreamDecoder(r io.Reader) *StreamDecoder
Accepts any reader providing Gen1 Cowrie binary data. Both seekable (*os.File, *bytes.Reader) and non-seekable (net.Conn, *gzip.Reader, http.Response.Body) readers are supported.
Returns a decoder instance ready to decode values.
Example
conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
dec := gen1.NewStreamDecoder(conn)
Decode
Reads and decodes one value from the stream.
func (d *StreamDecoder) Decode() (any, error)
Returns the decoded Go value, using the same types as gen1.Decode():
- Primitives: nil, bool, int64, float64, string, []byte
- Containers: map[string]any, []any
- Typed arrays: []int64, []float64, []string
- Graph types: Node, Edge, AdjList, NodeBatch, EdgeBatch, GraphShard
Possible errors:
- io.EOF: No more data available (clean end of stream)
- "unexpected EOF: incomplete record": Stream ended mid-record
- Other errors: Malformed data or a security limit exceeded
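Since Decode returns any, a type switch is the usual way to dispatch on the concrete type; the handling in each branch below is purely illustrative:
value, err := dec.Decode()
if err != nil {
    log.Fatal(err)
}
switch v := value.(type) {
case map[string]any:
    fmt.Println("object with", len(v), "keys")
case []any:
    fmt.Println("array of", len(v), "elements")
case string:
    fmt.Println("string:", v)
case []byte:
    fmt.Println("binary payload of", len(v), "bytes")
case int64, float64, bool, nil:
    fmt.Println("primitive:", v)
default:
    fmt.Printf("other type: %T\n", v) // e.g. graph types such as Node or Edge
}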
Example: Basic Streaming
f, err := os.Open("data.cowrie")
if err != nil {
log.Fatal(err)
}
defer f.Close()
dec := gen1.NewStreamDecoder(f)
for {
value, err := dec.Decode()
if err == io.EOF {
break // Clean end of stream
}
if err != nil {
log.Fatal(err) // Decode error
}
// Process value
obj, ok := value.(map[string]any)
if !ok {
log.Fatalf("unexpected type %T", value)
}
fmt.Printf("ID: %v\n", obj["id"])
}
Example: Network Stream
conn, err := net.Dial("tcp", "api.example.com:9000")
if err != nil {
log.Fatal(err)
}
defer conn.Close()
dec := gen1.NewStreamDecoder(conn)
for {
record, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Decode error: %v", err)
break
}
// Process streaming record
processRecord(record)
}
Example: Compressed Stream
f, err := os.Open("archive.cowrie.gz")
if err != nil {
log.Fatal(err)
}
defer f.Close()
gzr, err := gzip.NewReader(f)
if err != nil {
log.Fatal(err)
}
defer gzr.Close()
dec := gen1.NewStreamDecoder(gzr)
for {
value, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
processValue(value)
}
Example: HTTP Response
resp, err := http.Get("https://api.example.com/stream")
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
log.Fatalf("HTTP %d", resp.StatusCode)
}
dec := gen1.NewStreamDecoder(resp.Body)
for {
event, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
log.Printf("Stream error: %v", err)
break
}
handleEvent(event)
}
Error Handling
EOF Handling
The decoder distinguishes between clean stream end and truncated data:
for {
value, err := dec.Decode()
if err == io.EOF {
// Clean end: no more complete records
fmt.Println("Stream complete")
break
}
if err != nil {
// Decode error: malformed data or incomplete record
if strings.Contains(err.Error(), "unexpected EOF") {
log.Println("Stream truncated mid-record")
} else {
log.Printf("Decode error: %v", err)
}
break
}
process(value)
}
Incomplete Records
The decoder buffers incomplete data automatically:
// Suppose the connection delivers only the first 50 bytes of a 100-byte record.
// Decode() buffers the partial data and keeps reading from the underlying reader
// until the whole record has arrived, then returns the decoded value.
value, err := dec.Decode()
// err == nil: the remaining bytes were fetched transparently before returning
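One way to observe this is to force worst-case partial reads with testing/iotest's OneByteReader, which hands the decoder a single byte per Read call. A sketch, assuming gen1.EncodeTo (used elsewhere in these docs) to produce the record:
var buf bytes.Buffer
if err := gen1.EncodeTo(&buf, map[string]any{"id": "node-1"}); err != nil {
    log.Fatal(err)
}
// iotest.OneByteReader (from testing/iotest) returns one byte per Read call.
dec := gen1.NewStreamDecoder(iotest.OneByteReader(&buf))
value, err := dec.Decode()
if err != nil {
    log.Fatal(err)
}
fmt.Println(value) // map[id:node-1], assembled from many tiny reads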
Security Limits
Streaming decode respects the same security limits as Decode():
dec := gen1.NewStreamDecoder(untrustedConn)
for {
value, err := dec.Decode()
if err == gen1.ErrMaxArrayLen {
log.Println("Malicious array size detected")
untrustedConn.Close()
break
}
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
process(value)
}
Buffer Management
- Initial buffer: 4KB
- Grows automatically for larger records
- Efficiently handles unconsumed bytes between records
- No data loss on partial reads
Memory Efficiency
The decoder maintains a small buffer (typically 4-8KB) even when processing large datasets:
// Efficiently processes millions of records
for {
record, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
// Process and discard - only one record in memory
saveToDatabase(record)
}
Best Practices
1. Always Check io.EOF
// Good: Explicit EOF check
for {
value, err := dec.Decode()
if err == io.EOF {
break // Expected end
}
if err != nil {
return err // Unexpected error
}
process(value)
}
// Bad: Treating EOF as error
for {
value, err := dec.Decode()
if err != nil {
return err // Wrong: EOF is not an error
}
process(value)
}
2. Reuse Decoder for Multiple Records
// Good: One decoder for entire stream
dec := gen1.NewStreamDecoder(conn)
for {
value, err := dec.Decode()
// ...
}
// Bad: Creating new decoder per record (loses buffered data)
for {
dec := gen1.NewStreamDecoder(conn) // Wrong!
value, err := dec.Decode()
// ...
}
3. Handle Partial Reads Gracefully
The decoder handles partial reads automatically; no special handling is needed:
// Network with variable packet sizes
dec := gen1.NewStreamDecoder(conn)
// Decoder buffers partial data transparently
for {
value, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
process(value)
}
4. Combine with Context for Timeouts
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
type result struct {
value any
err error
}
ch := make(chan result, 1) // Buffered so the goroutine can send even after a timeout
go func() {
value, err := dec.Decode()
ch <- result{value, err}
}()
select {
case res := <-ch:
if res.err != nil {
log.Fatal(res.err)
}
process(res.value)
case <-ctx.Done():
log.Println("Decode timeout")
}
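When the underlying reader is a net.Conn, a read deadline is a lighter-weight alternative. This sketch assumes the decoder surfaces the connection's read error (if the library wraps it without %w, the errors.As check will not match and the timeout falls through to log.Fatal):
// Abort a blocking Decode if no data arrives within 30 seconds.
if err := conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
    log.Fatal(err)
}
value, err := dec.Decode()
if err != nil {
    var netErr net.Error
    if errors.As(err, &netErr) && netErr.Timeout() {
        log.Fatal("Decode timed out waiting for data")
    }
    log.Fatal(err)
}
process(value)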
Comparison with Non-Streaming Decode
Non-Streaming (Decode)
Requires entire message in memory:
// Must read the entire stream into memory first
data, err := io.ReadAll(conn)
if err != nil {
log.Fatal(err)
}
value, err := gen1.Decode(data)
if err != nil {
log.Fatal(err)
}
Use when:
- Data already in memory (buffers, HTTP responses)
- Small, fixed-size messages
- Random access needed
Streaming (StreamDecoder)
Processes data incrementally:
// Processes records as they arrive
dec := gen1.NewStreamDecoder(conn)
for {
value, err := dec.Decode()
if err == io.EOF {
break
}
if err != nil {
log.Fatal(err)
}
process(value)
}
Use when:
- Reading from network connections
- Processing large files
- Handling multiple records in sequence
- Working with compressed streams
- Memory efficiency is important
Common Patterns
Pattern: Streaming Pipeline
func processCowrieStream(r io.Reader) error {
dec := gen1.NewStreamDecoder(r)
for {
value, err := dec.Decode()
if err == io.EOF {
return nil // Success
}
if err != nil {
return fmt.Errorf("decode: %w", err)
}
if err := processRecord(value); err != nil {
return fmt.Errorf("process: %w", err)
}
}
}
Pattern: Count Records
func countRecords(r io.Reader) (int, error) {
dec := gen1.NewStreamDecoder(r)
count := 0
for {
_, err := dec.Decode()
if err == io.EOF {
return count, nil
}
if err != nil {
return 0, err
}
count++
}
}
Pattern: Batch Processing
func processBatches(r io.Reader, batchSize int) error {
dec := gen1.NewStreamDecoder(r)
batch := make([]any, 0, batchSize)
for {
value, err := dec.Decode()
if err == io.EOF {
if len(batch) > 0 {
if err := processBatch(batch); err != nil {
return err // Error from the final partial batch
}
}
return nil
}
if err != nil {
return err
}
batch = append(batch, value)
if len(batch) >= batchSize {
if err := processBatch(batch); err != nil {
return err
}
batch = batch[:0] // Reuse slice
}
}
}
Pattern: Filter Stream
func filterStream(r io.Reader, w io.Writer, predicate func(any) bool) error {
dec := gen1.NewStreamDecoder(r)
for {
value, err := dec.Decode()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
if predicate(value) {
if err := gen1.EncodeTo(w, value); err != nil {
return err
}
}
}
}