Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/Neumenon/cowrie/llms.txt

Use this file to discover all available pages before exploring further.

Cowrie supports two streaming protocols for efficient record-by-record processing: Gen1 (simple length-prefixed) and Gen2 (master stream with metadata and checksums).

Gen1 Stream (Legacy)

Simple length-prefixed Cowrie records, similar to JSON Lines (JSONL).

Wire Format

[u32 length] [cowrie bytes]
[u32 length] [cowrie bytes]
...
[0x00 0x00 0x00 0x00]  // optional end marker
  • Length: 4-byte little-endian uint32
  • Payload: Complete Cowrie v2 document (header + data)
  • End Marker: Zero length signals end of stream (optional)

StreamWriter

import (
    "os"
    "github.com/Neumenon/cowrie/codec"
)

// Create writer
f, _ := os.Create("data.cowrie")
defer f.Close()

writer := codec.NewStreamWriter(f)

// Write records
for _, item := range items {
    err := writer.Write(item)
    if err != nil {
        return err
    }
}

// Flush if needed
writer.Sync()

StreamReader

// Read from file
data, _ := os.ReadFile("data.cowrie")
reader := codec.NewStreamReader(data)

// Read all records
for {
    var item MyStruct
    err := reader.Next(&item)
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }
    process(item)
}

// Or read all at once
items, err := codec.ReadAllStream[MyStruct](data)

Use Cases

  • Log Shipping: Append-only event logs
  • Message Queues: Kafka-like record streams
  • Database Exports: Streaming table dumps
  • API Pagination: Cursor-based result sets

Example: Event Log

type Event struct {
    Timestamp int64
    UserID    string
    Action    string
    Metadata  map[string]any
}

// Write events
writer := codec.NewStreamWriter(logFile)
for _, event := range events {
    writer.Write(event)
}

// Read events
reader := codec.NewStreamReader(logData)
for {
    var event Event
    if err := reader.Next(&event); err == io.EOF {
        break
    }
    processEvent(event)
}

Gen2 Master Stream

Advanced streaming protocol with metadata, compression, checksums, and type routing.

Wire Format

Magic:       'S' 'J' 'S' 'T' (4 bytes)
Version:     0x02 (1 byte)
Flags:       u8 (compression, CRC, deterministic, metadata)
HeaderLen:   u16 LE (24 bytes for v2)
TypeID:      u32 LE (schema fingerprint)
PayloadLen:  u32 LE (compressed length)
RawLen:      u32 LE (original length, 0 if not compressed)
MetaLen:     u32 LE (metadata length, 0 if none)
[Metadata:   cowrie bytes (if MetaLen > 0)]
[Payload:    cowrie bytes or compressed]
[CRC32:      u32 LE (if FlagMasterCRC set)]

Flags

| Bit | Flag             | Meaning                               |
| --- | ---------------- | ------------------------------------- |
| 0   | Compressed       | Payload is compressed                 |
| 1-2 | Compression type | 0=none, 1=gzip, 2=zstd                |
| 3   | Deterministic    | Keys sorted for reproducible encoding |
| 4   | CRC              | CRC32 checksum follows payload        |
| 5   | Meta             | Metadata present before payload       |

MasterWriter

import "github.com/Neumenon/cowrie/codec"

// Configure writer
opts := codec.DefaultMasterWriterOptions()
// opts.Deterministic = true  (default)
// opts.Compression = cowrie.CompressionZstd  (default)
// opts.EnableCRC = true  (default)

writer := codec.NewMasterWriter(file, opts)

// Write with metadata
meta := cowrie.Object(
    cowrie.Member{Key: "version", Value: cowrie.Int64(1)},
    cowrie.Member{Key: "source", Value: cowrie.String("api")},
)

err := writer.WriteWithMeta(data, meta)

MasterReader

// Configure reader
opts := codec.DefaultMasterReaderOptions()
// opts.MaxDecompressedSize = 100 * 1024 * 1024  // 100MB default
// opts.AllowLegacy = true  // Auto-detect Gen1 streams

reader := codec.NewMasterReader(data, opts)

// Read frames
for {
    frame, err := reader.Next()
    if err == io.EOF {
        break
    }
    if err != nil {
        return err
    }
    
    // Access frame data
    fmt.Println("TypeID:", frame.TypeID)
    fmt.Println("Compressed:", frame.Header.Compression)
    
    // Access metadata (if present)
    if frame.Meta != nil {
        version := frame.Meta.Get("version").Int64()
    }
    
    // Access payload
    payload := frame.Payload
    processData(payload)
}

Example: Type Routing

// Define type handlers
handlers := map[uint32]func(*cowrie.Value) error{
    12345: handleUserEvent,
    67890: handleSystemEvent,
}

reader := codec.NewMasterReader(data, codec.MasterReaderOptions{
    TypeHandlers: handlers,
})

for {
    frame, err := reader.Next()
    if err == io.EOF {
        break
    }
    
    // Automatic dispatch based on TypeID
    if handler, ok := handlers[frame.TypeID]; ok {
        handler(frame.Payload)
    }
}

Schema Fingerprinting

Type IDs are computed using FNV-1a hash of the value’s type structure:
value := cowrie.Object(
    cowrie.Member{Key: "name", Value: cowrie.String("Alice")},
    cowrie.Member{Key: "age", Value: cowrie.Int64(30)},
)

typeID := cowrie.SchemaFingerprint32(value)  // Stable hash
Use cases:
  • Type Routing: Dispatch to correct handler
  • Schema Validation: Detect schema changes
  • Multiplexing: Route records by type
  • Schema Registry: Track schema evolution

Compression

Master stream automatically compresses payloads ≥ 256 bytes when enabled:
opts := codec.MasterWriterOptions{
    Compression: cowrie.CompressionZstd,  // or CompressionGzip
}
Compression is applied when:
  • Payload ≥ 256 bytes
  • Compressed size < original size
Security: Decompression is size-limited to prevent bombs:
opts := codec.MasterReaderOptions{
    MaxDecompressedSize: 100 * 1024 * 1024,  // 100MB default
}

CRC32 Checksums

Enable data integrity verification:
opts := codec.MasterWriterOptions{
    EnableCRC: true,  // default
}
CRC32 (IEEE polynomial) covers the entire frame from magic to payload. Reader automatically validates and returns ErrMasterCRCMismatch if corrupted.

Deterministic Encoding

Ensures reproducible output for content addressing:
opts := codec.MasterWriterOptions{
    Deterministic: true,  // default
}
  • Object keys sorted lexicographically
  • Same input → same output bytes
  • Essential for caching, deduplication, Merkle trees

Comparison

| Feature        | Gen1 Stream | Gen2 Master Stream           |
| -------------- | ----------- | ---------------------------- |
| Overhead       | 4 bytes     | 24 bytes + optional metadata |
| Compression    | No          | Gzip, Zstd                   |
| Checksums      | No          | CRC32                        |
| Metadata       | No          | Per-frame metadata           |
| Type Routing   | No          | Schema fingerprinting        |
| Legacy Support | N/A         | Auto-detect Gen1             |

Column Readers (Advanced)

Gen2 supports columnar reading with column hints for efficient partial decoding.

Column Hints Format

HintCount: varint
For each hint:
  Field:     [len:varint][utf8 bytes]
  Type:      u8
  ShapeLen:  varint
  ShapeDims: ShapeLen * varint
  Flags:     u8
Hints appear after header flags, before dictionary (only if FlagHasColumnHints set).

Use Cases

  • Parquet-like Access: Read specific columns without full decode
  • Query Push-down: Filter on indexed columns
  • Partial Hydration: Load only needed fields
  • Wide Tables: Skip unused columns in large records

Security Limits

opts := cowrie.DecodeOptions{
    MaxHintCount: 10_000,  // Max column hints
}
See Security Limits for details.

Performance Tips

Gen1 Stream

  1. Batch Writes: Group small records to reduce syscalls
var buffer bytes.Buffer
writer := codec.NewStreamWriter(&buffer)
for _, item := range batch {
    writer.Write(item)
}
file.Write(buffer.Bytes())  // Single write
  2. Streaming Read: Process records without loading entire file
// Good: Streaming
reader := codec.NewStreamReader(data)
for {
    var item Item
    if err := reader.Next(&item); err == io.EOF {
        break
    }
    process(item)  // Constant memory
}

// Avoid: Load all
items, _ := codec.ReadAllStream[Item](data)  // Allocates array
  3. Zero-Copy Decoding: Reuse structs
var item Item
reader := codec.NewStreamReader(data)
for {
    err := reader.Next(&item)
    if err == io.EOF {
        break
    }
    process(item)
    // 'item' reused on next iteration
}

Gen2 Master Stream

  1. Enable Compression: 30-70% size reduction for mixed data
opts := codec.MasterWriterOptions{
    Compression: cowrie.CompressionZstd,
}
  2. Deterministic for Dedup: Cache compressed frames by hash
opts := codec.MasterWriterOptions{
    Deterministic: true,
    Compression:   cowrie.CompressionZstd,
}
frame, _ := encodeFrame(data, opts)
hash := sha256.Sum256(frame)
cache[hash] = frame  // Deduplication
  3. Type-Based Routing: Avoid decoding unneeded frames
frame, _ := reader.Next()
if frame.TypeID == expectedTypeID {
    processPayload(frame.Payload)
} else {
    skipPayload(frame)
}
  4. Metadata for Filtering: Skip frames without full decode
for {
    frame, _ := reader.Next()
    
    // Check metadata first
    if frame.Meta != nil {
        partition := frame.Meta.Get("partition").Int64()
        if partition != targetPartition {
            continue  // Skip without decoding payload
        }
    }
    
    // Only decode relevant frames
    process(frame.Payload)
}

Real-World Examples

Event Streaming (Gen1)

// Producer
func streamEvents(events []Event, w io.Writer) error {
    writer := codec.NewStreamWriter(w)
    for _, event := range events {
        if err := writer.Write(event); err != nil {
            return err
        }
    }
    return writer.Sync()
}

// Consumer
func consumeEvents(r io.Reader) error {
    data, _ := io.ReadAll(r)
    reader := codec.NewStreamReader(data)
    
    for {
        var event Event
        err := reader.Next(&event)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        handleEvent(event)
    }
    return nil
}

Multi-Tenant Data Stream (Gen2)

// Write with tenant metadata
opts := codec.DefaultMasterWriterOptions()
writer := codec.NewMasterWriter(stream, opts)

for _, record := range records {
    meta := cowrie.Object(
        cowrie.Member{Key: "tenant", Value: cowrie.String(record.TenantID)},
        cowrie.Member{Key: "timestamp", Value: cowrie.Int64(time.Now().Unix())},
    )
    writer.WriteWithMeta(record.Data, meta)
}

// Read filtered by tenant
reader := codec.NewMasterReader(data, codec.DefaultMasterReaderOptions())
targetTenant := "acme_corp"

for {
    frame, err := reader.Next()
    if err == io.EOF {
        break
    }
    
    // Filter by metadata (no payload decode!)
    if frame.Meta != nil {
        tenant := frame.Meta.Get("tenant").String()
        if tenant != targetTenant {
            continue
        }
    }
    
    // Process only relevant tenant's data
    process(frame.Payload)
}

Error Handling

Gen1 Stream Errors

reader := codec.NewStreamReader(data)
for {
    var item Item
    err := reader.Next(&item)
    if err == io.EOF {
        break  // Normal end
    }
    if err != nil {
        // Truncated frame, decode error, etc.
        log.Printf("Stream error at position %d: %v", reader.Position(), err)
        return err
    }
}

Gen2 Master Stream Errors

reader := codec.NewMasterReader(data, opts)
for {
    frame, err := reader.Next()
    switch err {
    case nil:
        // Success
    case io.EOF:
        // Normal end
    case codec.ErrMasterCRCMismatch:
        // Corrupted frame
        log.Printf("CRC mismatch at offset %d", reader.Position())
    case codec.ErrMasterTruncated:
        // Incomplete frame
        log.Printf("Truncated frame at offset %d", reader.Position())
    case cowrie.ErrDecompressedTooLarge:
        // Decompression bomb detected
        log.Printf("Suspicious payload at offset %d", reader.Position())
    default:
        // Other errors
        return err
    }
}