Overview

The NodeBatch and EdgeBatch types encode multiple graph elements in a single Cowrie value. They are designed for:
  • GNN Mini-Batches: Streaming training data for graph neural networks
  • Bulk Graph Loading: Loading thousands of nodes/edges at once
  • Dictionary Sharing: All nodes/edges in a batch share the same property key dictionary (Gen2 only)
  • Compressed Transport: A single compression envelope for the entire batch
Batches reduce per-element overhead by amortizing dictionary and metadata costs across many elements.

NodeBatch

Gen1 Wire Format (Tag 0x13)

Tag(0x13) | count:varint | Node[0] | Node[1] | ... | Node[count-1]
Encoding:
  • Count specifies number of nodes in batch
  • Each node encoded as Gen1 Node (tag 0x10)
  • No shared dictionary (each node has inline keys)
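
The header framing above can be sketched in a few lines of Go. This is a minimal illustration and not the library's encoder: it assumes the count is an unsigned LEB128 varint (the encoding produced by Go's `encoding/binary`), which this page does not confirm, and `frameBatchHeader` is a hypothetical helper name. The node payloads that follow the header are omitted.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// frameBatchHeader returns the tag byte followed by the element count.
// Assumption: the count is an unsigned LEB128 varint, as written by
// encoding/binary. The Cowrie format may define its varint differently.
func frameBatchHeader(tag byte, count uint64) []byte {
	out := []byte{tag}
	return binary.AppendUvarint(out, count)
}

func main() {
	// 0x13 is the Gen1 NodeBatch tag from the wire format above.
	fmt.Printf("% x\n", frameBatchHeader(0x13, 300)) // prints: 13 ac 02
}
```

Under that assumption a batch of up to 127 elements spends one byte on the count, and a batch of 300 spends two.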

Gen2 Wire Format (Tag 0x37)

Tag(0x37) | count:varint | Node[0] | Node[1] | ... | Node[count-1]
Encoding:
  • Count specifies number of nodes
  • Each node encoded as Gen2 Node (tag 0x35)
  • Shared dictionary: All nodes reference same header dictionary
  • Significant size savings when nodes have similar schemas
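
To see why a shared dictionary helps, the following sketch compares the bytes spent on property keys under two simplified cost models. The models are assumptions made for illustration (one length byte plus the key bytes for an inline key, one byte for a dictionary index) and do not describe Cowrie's actual layout; `keyBytesInline` and `keyBytesShared` are hypothetical names.

```go
package main

import "fmt"

// keyBytesInline estimates bytes spent on property keys when every node
// repeats its keys inline (1 length byte + key bytes per occurrence).
func keyBytesInline(keys []string, nodes int) int {
	perNode := 0
	for _, k := range keys {
		perNode += 1 + len(k)
	}
	return perNode * nodes
}

// keyBytesShared estimates the same cost when keys are stored once in a
// shared dictionary and each node refers to them by a 1-byte index.
func keyBytesShared(keys []string, nodes int) int {
	dict := 0
	for _, k := range keys {
		dict += 1 + len(k)
	}
	return dict + len(keys)*nodes
}

func main() {
	keys := []string{"features", "label", "weight"}
	inline := keyBytesInline(keys, 1000)
	shared := keyBytesShared(keys, 1000)
	fmt.Printf("inline=%d shared=%d\n", inline, shared) // prints: inline=22000 shared=3022
}
```

The sketch counts key bytes only. Property values are encoded either way, so the overall saving depends on how large the values are relative to the keys.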

Usage Patterns

NodeBatch is ideal for:
  1. GNN Training: Mini-batch sampling from large graphs
  2. Graph Snapshots: Bulk export of subgraphs
  3. Incremental Sync: Send batches of changed nodes
  4. Parallel Processing: Distribute node batches across workers

Example: Creating Node Batches

import (
    "fmt"
    "math/rand"

    "github.com/Neumenon/cowrie"
)

// Build array of Node values
nodes := make([]*cowrie.Value, 0, 100)

for i := 0; i < 100; i++ {
    node := cowrie.Object(
        cowrie.Member{Key: "id", Value: cowrie.String(fmt.Sprintf("node_%d", i))},
        cowrie.Member{Key: "labels", Value: cowrie.Array(cowrie.String("Node"))},
        cowrie.Member{Key: "props", Value: cowrie.Object(
            cowrie.Member{Key: "x", Value: cowrie.Float64(rand.Float64())},
            cowrie.Member{Key: "y", Value: cowrie.Float64(rand.Float64())},
        )},
    )
    nodes = append(nodes, node)
}

// Encode as NodeBatch
batch := cowrie.NodeBatch(nodes...)
data, err := cowrie.Encode(batch)
if err != nil {
    panic(err) // handle the encoding error appropriately in real code
}

EdgeBatch

Gen1 Wire Format (Tag 0x14)

Tag(0x14) | count:varint | Edge[0] | Edge[1] | ... | Edge[count-1]
Encoding:
  • Count specifies number of edges
  • Each edge encoded as Gen1 Edge (tag 0x11)
  • COO (Coordinate) format: explicit src/dst for each edge

Gen2 Wire Format (Tag 0x38)

Tag(0x38) | count:varint | Edge[0] | Edge[1] | ... | Edge[count-1]
Encoding:
  • Count specifies number of edges
  • Each edge encoded as Gen2 Edge (tag 0x36)
  • Dictionary-coded edge properties
  • Still uses COO format (not CSR)
Note: EdgeBatch uses COO format, not CSR (Compressed Sparse Row). For a CSR representation, use the AdjList type (tag 0x12 in Gen1, 0x30 in Gen2).
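
For readers unfamiliar with the two layouts, here is a minimal sketch of converting a COO edge list into CSR arrays. It is independent of the Cowrie API: it assumes node IDs have already been mapped to integers in the range [0, numNodes) and does no bounds checking, and `cooToCSR` is a hypothetical helper name.

```go
package main

import "fmt"

// cooToCSR converts parallel src/dst arrays (COO) into CSR form.
// rowPtr has numNodes+1 entries; the neighbors of node v are
// colIdx[rowPtr[v]:rowPtr[v+1]].
func cooToCSR(numNodes int, src, dst []int) (rowPtr, colIdx []int) {
	rowPtr = make([]int, numNodes+1)
	for _, s := range src {
		rowPtr[s+1]++ // count the out-degree of each source node
	}
	for v := 0; v < numNodes; v++ {
		rowPtr[v+1] += rowPtr[v] // prefix sum turns counts into offsets
	}
	colIdx = make([]int, len(dst))
	next := make([]int, numNodes)
	copy(next, rowPtr[:numNodes]) // next free slot in each row
	for i, s := range src {
		colIdx[next[s]] = dst[i]
		next[s]++
	}
	return rowPtr, colIdx
}

func main() {
	// Five edges with names mapped to integers:
	// alice=0, bob=1, carol=2, dave=3, eve=4.
	src := []int{0, 0, 1, 2, 3}
	dst := []int{1, 2, 3, 3, 4}
	rowPtr, colIdx := cooToCSR(5, src, dst)
	fmt.Println(rowPtr, colIdx) // prints: [0 2 3 4 5 5] [1 2 3 3 4]
}
```

COO stores one (src, dst) pair per edge, which suits appending and streaming. CSR stores each source once through `rowPtr`, which suits neighbor lookups on a graph that no longer changes.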

Usage Patterns

EdgeBatch is ideal for:
  1. Bulk Edge Loading: Import large edge lists (e.g., social graphs, knowledge graphs)
  2. GNN Message Passing: Batch edges for neighborhood aggregation
  3. Graph Streaming: Continuously append new edges
  4. ETL Pipelines: Transform and load edge data

Example: Creating Edge Batches

// Load friendship network
friends := []struct{ from, to string }{
    {"alice", "bob"},
    {"alice", "carol"},
    {"bob", "dave"},
    {"carol", "dave"},
    {"dave", "eve"},
}

// Stream the edges through a StreamWriter backed by an in-memory buffer
var buf bytes.Buffer
sw := graph.NewStreamWriter(&buf)
sw.Header().AddLabel("FRIENDS")
sw.WriteHeader()

for _, pair := range friends {
    sw.WriteEdge(&graph.EdgeEvent{
        Op:     graph.OpUpsert,
        Label:  "FRIENDS",
        FromID: pair.from,
        ToID:   pair.to,
        Props: map[string]any{
            "since": time.Now().Unix(),
        },
    })
}

sw.Close()

GNN Training Example

Example of streaming mini-batches for GNN training. Graph, LoadGraph, and trainOnBatch stand in for application code and are not defined here:
package main

import (
    "bytes"
    "math/rand"

    "github.com/Neumenon/cowrie/graph"
)

// MiniBatchIterator generates training mini-batches
type MiniBatchIterator struct {
    graph     *Graph
    batchSize int
    nodeIDs   []string
    pos       int
}

func NewMiniBatchIterator(g *Graph, batchSize int) *MiniBatchIterator {
    // Get all node IDs and shuffle
    nodeIDs := g.GetAllNodeIDs()
    rand.Shuffle(len(nodeIDs), func(i, j int) {
        nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i]
    })
    
    return &MiniBatchIterator{
        graph:     g,
        batchSize: batchSize,
        nodeIDs:   nodeIDs,
    }
}

func (it *MiniBatchIterator) Next() ([]byte, error) {
    if it.pos >= len(it.nodeIDs) {
        return nil, nil // End of epoch
    }
    
    // Get next batch of node IDs
    end := it.pos + it.batchSize
    if end > len(it.nodeIDs) {
        end = len(it.nodeIDs)
    }
    batchIDs := it.nodeIDs[it.pos:end]
    it.pos = end
    
    // Create graph stream with nodes and their edges
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)
    
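    // Pre-populate the header with every label and field used below
    sw.Header().AddLabel("Node")
    sw.Header().AddLabel("EDGE")
    sw.Header().AddField("features")
    sw.Header().AddField("label")
    sw.Header().AddField("weight")
    sw.WriteHeader()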
    
    for _, id := range batchIDs {
        node := it.graph.GetNode(id)
        sw.WriteNode(&graph.NodeEvent{
            Op:     graph.OpUpsert,
            ID:     id,
            Labels: []string{"Node"},
            Props: map[string]any{
                "features": node.FeatureVector(),
                "label":    node.Label,
            },
        })
    }
    
    // Write edges (for message passing)
    for _, id := range batchIDs {
        for _, edge := range it.graph.GetOutgoingEdges(id) {
            sw.WriteEdge(&graph.EdgeEvent{
                Op:     graph.OpUpsert,
                Label:  "EDGE",
                FromID: edge.From,
                ToID:   edge.To,
                Props: map[string]any{
                    "weight": edge.Weight,
                },
            })
        }
    }
    
    sw.Close()
    return buf.Bytes(), nil
}

func main() {
    g := LoadGraph("graph.json")
    
    // Create mini-batch iterator
    iter := NewMiniBatchIterator(g, 32)
    
    // Training loop
    for epoch := 0; epoch < 100; epoch++ {
        iter.pos = 0 // Reset for new epoch
        
        for {
            batchData, err := iter.Next()
            if err != nil || batchData == nil {
                break
            }
            
            // Send to GNN trainer
            trainOnBatch(batchData)
        }
    }
}

Streaming Large Batches

For very large batches, use streaming I/O:
import (
    "fmt"
    "log"
    "math/rand"
    "os"

    "github.com/Neumenon/cowrie/graph"
)

// Stream writer (e.g., to file or network)
f, err := os.Create("graph_batch.cowrie")
if err != nil {
    log.Fatal(err)
}
defer f.Close()

sw := graph.NewStreamWriter(f)
sw.WriteHeader()

// Stream millions of nodes
for i := 0; i < 10_000_000; i++ {
    sw.WriteNode(&graph.NodeEvent{
        Op:     graph.OpUpsert,
        ID:     fmt.Sprintf("node_%d", i),
        Labels: []string{"Node"},
        Props: map[string]any{
            "value": rand.Float64(),
        },
    })
    
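    // Periodically commit written data to stable storage.
    // f.Sync calls fsync on the file; it does not flush any
    // buffering that happens inside the writer itself.
    if (i+1)%10000 == 0 {
        f.Sync()
    }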
}

sw.Close()

Performance Considerations

Dictionary Pre-Population

Always pre-populate dictionaries for best compression:
sw := graph.NewStreamWriter(&buf)

// Add all expected property keys upfront
for _, key := range []string{"name", "age", "score", "vector", "type"} {
    sw.Header().AddField(key)
}

// Add all expected labels
for _, label := range []string{"Person", "Document", "Entity"} {
    sw.Header().AddLabel(label)
}

sw.WriteHeader()

// Now all events use dictionary indices

Batch Size Guidelines

| Use Case | Recommended Batch Size | Notes |
|----------|------------------------|-------|
| GNN Training | 32-512 nodes | Fits in GPU memory |
| Bulk Loading | 10K-100K nodes | Balance memory and I/O |
| Streaming | 100-1000 nodes | Low latency |
| Analytics | 1M+ nodes | Maximize throughput |
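
Whatever size is chosen, splitting a list of node IDs into fixed-size batches looks much the same. The sketch below is a generic helper, not part of the Cowrie API; `chunkIDs` is a hypothetical name, and the final batch may be smaller than the others.

```go
package main

import "fmt"

// chunkIDs splits ids into consecutive batches of at most size elements.
// The returned slices share ids' backing array; no element is copied.
func chunkIDs(ids []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	batches := make([][]string, 0, (len(ids)+size-1)/size)
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		batches = append(batches, ids[start:end])
	}
	return batches
}

func main() {
	ids := make([]string, 100)
	for i := range ids {
		ids[i] = fmt.Sprintf("node_%d", i)
	}
	batches := chunkIDs(ids, 32)
	// 100 IDs in batches of 32 give three full batches and one of 4.
	fmt.Println(len(batches), len(batches[len(batches)-1])) // prints: 4 4
}
```

Each resulting batch can then be written and sent independently, for example one per worker or one per training step.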

Memory vs. I/O Trade-offs

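// High memory, low I/O: buffer the whole batch in memory
var buf bytes.Buffer
memWriter := graph.NewStreamWriter(&buf)
for i := 0; i < 1_000_000; i++ {
    memWriter.WriteNode(node)
}
memWriter.Close()
data := buf.Bytes() // Entire batch held in memory

// Low memory, high I/O: write straight to a file (file is an open *os.File)
fileWriter := graph.NewStreamWriter(file)
for i := 0; i < 1_000_000; i++ {
    fileWriter.WriteNode(node)
    if (i+1)%1000 == 0 {
        file.Sync() // Periodically commit to disk
    }
}
fileWriter.Close()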
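
A middle ground between the two extremes is a fixed-size buffer that is flushed every N records, which bounds memory while keeping the number of writes low. The sketch below uses only the Go standard library and writes plain text lines as stand-ins for encoded graph events. `writeRecords` and `countingWriter` are hypothetical names, and whether the Cowrie StreamWriter buffers internally is not stated on this page.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
)

// countingWriter records how many bytes reach the destination.
type countingWriter struct{ n int }

func (c *countingWriter) Write(p []byte) (int, error) {
	c.n += len(p)
	return len(p), nil
}

// writeRecords writes n placeholder records to w through a 64 KiB
// buffer, flushing every flushEvery records plus once at the end.
// It returns the total number of explicit flushes.
func writeRecords(w io.Writer, n, flushEvery int) (int, error) {
	if flushEvery <= 0 {
		flushEvery = 1
	}
	bw := bufio.NewWriterSize(w, 64*1024)
	flushes := 0
	for i := 0; i < n; i++ {
		if _, err := fmt.Fprintf(bw, "node_%d\n", i); err != nil {
			return flushes, err
		}
		if (i+1)%flushEvery == 0 {
			if err := bw.Flush(); err != nil {
				return flushes, err
			}
			flushes++
		}
	}
	// The final flush pushes any remaining buffered bytes to w.
	if err := bw.Flush(); err != nil {
		return flushes, err
	}
	return flushes + 1, nil
}

func main() {
	dst := &countingWriter{}
	flushes, err := writeRecords(dst, 10_000, 1_000)
	if err != nil {
		fmt.Println("write failed:", err)
		return
	}
	fmt.Println("flushes:", flushes) // prints: flushes: 11
}
```

Note that `bufio.Writer.Flush` only hands bytes to the underlying writer. When that writer is a file, a separate `Sync` call is still needed if the data must reach stable storage.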

Batch Statistics

Track batch loading performance:
import (
    "fmt"

    "github.com/Neumenon/cowrie/graph/loader"
)

// Wrap writer with stats collector
memGraph := loader.NewMemoryGraph()
statsWriter := loader.NewStatsWriter(memGraph)

// Load batch
loader.LoadFromStream(batchData, statsWriter)

// Print stats
stats := statsWriter.Stats()
fmt.Printf("Nodes: %d, Edges: %d\n", 
    stats.NodesWritten, stats.EdgesWritten)

Type Tags Reference

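| Format | Tag | Name | Structure |
|--------|-----|------|-----------|
| Gen1 | 0x13 | NodeBatch | count + Node[count] |
| Gen1 | 0x14 | EdgeBatch | count + Edge[count] |
| Gen2 | 0x37 | NodeBatch | count + Node[count] (dict-coded) |
| Gen2 | 0x38 | EdgeBatch | count + Edge[count] (dict-coded) |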
Use the Gen2 format for batches whose elements share a schema. The dictionary overhead is amortized across all elements, which can reduce encoded size by 70-80%.