Overview
NodeBatch and EdgeBatch types enable efficient encoding of multiple graph elements in a single Cowrie value. They are optimized for:
GNN Mini-Batches: Streaming training data for graph neural networks
Bulk Graph Loading: Efficiently load thousands of nodes/edges at once
Dictionary Sharing: All nodes/edges in a batch share the same property key dictionary
Compressed Transport: A single compression envelope for the entire batch
Batches reduce per-element overhead by amortizing dictionary and metadata costs across many elements.
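As a rough way to see this effect on your own data, the sketch below encodes a set of nodes one at a time and then as a single batch, and compares the totals. The helper name compareSizes is illustrative, and it assumes cowrie.Encode accepts an individual node value as well as a NodeBatch (the NodeBatch/Encode calls appear in the examples further down this page).

import (
    "fmt"

    "github.com/Neumenon/cowrie"
)

// compareSizes encodes each node on its own and then as one batch,
// to show how a batch amortizes dictionary/metadata overhead.
// Assumption: cowrie.Encode works on single node values as well as batches.
func compareSizes(nodes []*cowrie.Value) {
    individual := 0
    for _, n := range nodes {
        if b, err := cowrie.Encode(n); err == nil {
            individual += len(b)
        }
    }
    batched, _ := cowrie.Encode(cowrie.NodeBatch(nodes...))
    fmt.Printf("individual: %d bytes, batched: %d bytes\n", individual, len(batched))
}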
NodeBatch
Gen1 Format
Tag(0x13) | count:varint | Node[0] | Node[1] | ... | Node[count-1]
Encoding:
Count specifies the number of nodes in the batch
Each node is encoded as a Gen1 Node (tag 0x10)
No shared dictionary (each node has inline keys)
Gen2 Format
Tag(0x37) | count:varint | Node[0] | Node[1] | ... | Node[count-1]
Encoding:
Count specifies the number of nodes
Each node is encoded as a Gen2 Node (tag 0x35)
Shared dictionary: All nodes reference the same header dictionary
Significant size savings when nodes have similar schemas
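Both generations share the same outer framing: a one-byte tag, a varint element count, then the elements back to back. The sketch below illustrates only that framing; frameNodeBatch is not a library function, Go's standard unsigned varint is used for illustration, and the node payloads are assumed to be already-encoded Node values.

import (
    "bytes"
    "encoding/binary"
)

// frameNodeBatch shows the layout described above: tag byte, varint count,
// then each pre-encoded node payload back to back. Illustrative only --
// use cowrie.NodeBatch and cowrie.Encode for real encoding.
func frameNodeBatch(tag byte, encodedNodes [][]byte) []byte {
    var out bytes.Buffer
    out.WriteByte(tag) // 0x13 (Gen1) or 0x37 (Gen2)

    var tmp [binary.MaxVarintLen64]byte
    n := binary.PutUvarint(tmp[:], uint64(len(encodedNodes)))
    out.Write(tmp[:n]) // count:varint

    for _, enc := range encodedNodes {
        out.Write(enc) // Gen1 Node (0x10) or Gen2 Node (0x35) payloads
    }
    return out.Bytes()
}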
Usage Patterns
NodeBatch is ideal for:
GNN Training: Mini-batch sampling from large graphs
Graph Snapshots: Bulk export of subgraphs
Incremental Sync: Send batches of changed nodes
Parallel Processing: Distribute node batches across workers
Example: Creating Node Batches
Manual Construction
import " github.com/Neumenon/cowrie "
// Build array of Node values
nodes := make ([] * cowrie . Value , 0 , 100 )
for i := 0 ; i < 100 ; i ++ {
node := cowrie . Object (
cowrie . Member { Key : "id" , Value : cowrie . String ( fmt . Sprintf ( "node_ %d " , i ))},
cowrie . Member { Key : "labels" , Value : cowrie . Array ( cowrie . String ( "Node" ))},
cowrie . Member { Key : "props" , Value : cowrie . Object (
cowrie . Member { Key : "x" , Value : cowrie . Float64 ( rand . Float64 ())},
cowrie . Member { Key : "y" , Value : cowrie . Float64 ( rand . Float64 ())},
)},
)
nodes = append ( nodes , node )
}
// Encode as NodeBatch
batch := cowrie . NodeBatch ( nodes ... )
data , _ := cowrie . Encode ( batch )
EdgeBatch
Gen1 Format
Tag(0x14) | count:varint | Edge[0] | Edge[1] | ... | Edge[count-1]
Encoding:
Count specifies the number of edges
Each edge is encoded as a Gen1 Edge (tag 0x11)
COO (coordinate) format: explicit src/dst for each edge
Gen2 Format
Tag(0x38) | count:varint | Edge[0] | Edge[1] | ... | Edge[count-1]
Encoding:
Count specifies the number of edges
Each edge is encoded as a Gen2 Edge (tag 0x36)
Dictionary-coded edge properties
Still uses COO format (not CSR)
EdgeBatch uses COO format, not CSR (Compressed Sparse Row). For CSR representation, use the AdjList type (tag 0x30/0x12).
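To make the distinction concrete, here is a small sketch of the same four edges in both layouts. The slices are plain Go values for illustration, not library types.

import "fmt"

func cooVsCSR() {
    // COO: one explicit (src, dst) pair per edge -- the layout EdgeBatch stores.
    src := []int{0, 0, 1, 2}
    dst := []int{1, 2, 2, 3}
    for i := range src {
        fmt.Println("COO edge:", src[i], "->", dst[i])
    }

    // CSR: per-node offsets into a shared neighbor array -- the layout AdjList stores.
    // Node i's neighbors are csrDst[offsets[i]:offsets[i+1]].
    offsets := []int{0, 2, 3, 4, 4} // 4 nodes, 4 edges
    csrDst := []int{1, 2, 2, 3}
    for i := 0; i+1 < len(offsets); i++ {
        fmt.Println("CSR node", i, "neighbors:", csrDst[offsets[i]:offsets[i+1]])
    }
}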
Usage Patterns
EdgeBatch is ideal for:
Bulk Edge Loading: Import large edge lists (e.g., social graphs, knowledge graphs)
GNN Message Passing: Batch edges for neighborhood aggregation
Graph Streaming: Continuously append new edges
ETL Pipelines: Transform and load edge data
Example: Creating Edge Batches
Social Graph Edges
// Load a friendship network as a stream of FRIENDS edges
friends := []struct{ from, to string }{
    {"alice", "bob"},
    {"alice", "carol"},
    {"bob", "dave"},
    {"carol", "dave"},
    {"dave", "eve"},
}

var buf bytes.Buffer
sw := graph.NewStreamWriter(&buf)
sw.Header().AddLabel("FRIENDS")
sw.WriteHeader()

for _, pair := range friends {
    sw.WriteEdge(&graph.EdgeEvent{
        Op:     graph.OpUpsert,
        Label:  "FRIENDS",
        FromID: pair.from,
        ToID:   pair.to,
        Props: map[string]any{
            "since": time.Now().Unix(),
        },
    })
}
sw.Close()
GNN Training Example
Complete example of streaming mini-batches for GNN training:
package main

import (
    "bytes"
    "math/rand"

    "github.com/Neumenon/cowrie/graph"
)

// Graph, LoadGraph and trainOnBatch are application-side placeholders,
// not part of cowrie.

// MiniBatchIterator generates training mini-batches
type MiniBatchIterator struct {
    graph     *Graph
    batchSize int
    nodeIDs   []string
    pos       int
}

func NewMiniBatchIterator(g *Graph, batchSize int) *MiniBatchIterator {
    // Get all node IDs and shuffle them
    nodeIDs := g.GetAllNodeIDs()
    rand.Shuffle(len(nodeIDs), func(i, j int) {
        nodeIDs[i], nodeIDs[j] = nodeIDs[j], nodeIDs[i]
    })
    return &MiniBatchIterator{
        graph:     g,
        batchSize: batchSize,
        nodeIDs:   nodeIDs,
    }
}

func (it *MiniBatchIterator) Next() ([]byte, error) {
    if it.pos >= len(it.nodeIDs) {
        return nil, nil // End of epoch
    }

    // Get the next batch of node IDs
    end := it.pos + it.batchSize
    if end > len(it.nodeIDs) {
        end = len(it.nodeIDs)
    }
    batchIDs := it.nodeIDs[it.pos:end]
    it.pos = end

    // Create a graph stream with the nodes and their edges
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)

    // Write nodes
    sw.Header().AddLabel("Node")
    sw.Header().AddField("features")
    sw.Header().AddField("label")
    sw.WriteHeader()

    for _, id := range batchIDs {
        node := it.graph.GetNode(id)
        sw.WriteNode(&graph.NodeEvent{
            Op:     graph.OpUpsert,
            ID:     id,
            Labels: []string{"Node"},
            Props: map[string]any{
                "features": node.FeatureVector(),
                "label":    node.Label,
            },
        })
    }

    // Write edges (for message passing)
    for _, id := range batchIDs {
        for _, edge := range it.graph.GetOutgoingEdges(id) {
            sw.WriteEdge(&graph.EdgeEvent{
                Op:     graph.OpUpsert,
                Label:  "EDGE",
                FromID: edge.From,
                ToID:   edge.To,
                Props: map[string]any{
                    "weight": edge.Weight,
                },
            })
        }
    }
    sw.Close()

    return buf.Bytes(), nil
}

func main() {
    g := LoadGraph("graph.json")

    // Create a mini-batch iterator
    iter := NewMiniBatchIterator(g, 32)

    // Training loop
    for epoch := 0; epoch < 100; epoch++ {
        iter.pos = 0 // Reset for the new epoch
        for {
            batchData, err := iter.Next()
            if err != nil || batchData == nil {
                break
            }
            // Send to the GNN trainer
            trainOnBatch(batchData)
        }
    }
}
Streaming Large Batches
For very large batches, use streaming I/O:
import " io "
// Stream writer (e.g., to file or network)
f , _ := os . Create ( "graph_batch.cowrie" )
defer f . Close ()
sw := graph . NewStreamWriter ( f )
sw . WriteHeader ()
// Stream millions of nodes
for i := 0 ; i < 10_000_000 ; i ++ {
sw . WriteNode ( & graph . NodeEvent {
Op : graph . OpUpsert ,
ID : fmt . Sprintf ( "node_ %d " , i ),
Labels : [] string { "Node" },
Props : map [ string ] any {
"value" : rand . Float64 (),
},
})
// Flush periodically to control memory
if i % 10000 == 0 {
f . Sync ()
}
}
sw . Close ()
Dictionary Pre-Population
Always pre-populate dictionaries for best compression:
sw := graph.NewStreamWriter(&buf)

// Add all expected property keys upfront
for _, key := range []string{"name", "age", "score", "vector", "type"} {
    sw.Header().AddField(key)
}

// Add all expected labels
for _, label := range []string{"Person", "Document", "Entity"} {
    sw.Header().AddLabel(label)
}

sw.WriteHeader()
// Now all events use dictionary indices
Batch Size Guidelines
| Use Case     | Recommended Batch Size | Notes                  |
|--------------|------------------------|------------------------|
| GNN Training | 32-512 nodes           | Fits in GPU memory     |
| Bulk Loading | 10K-100K nodes         | Balance memory and I/O |
| Streaming    | 100-1000 nodes         | Low latency            |
| Analytics    | 1M+ nodes              | Maximize throughput    |
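A minimal sketch of applying these sizes, reusing the cowrie.NodeBatch and cowrie.Encode calls from the earlier example: split the node slice into fixed-size chunks and encode each chunk as its own batch. The helper name encodeInBatches is illustrative.

import "github.com/Neumenon/cowrie"

// encodeInBatches splits nodes into chunks of batchSize and encodes each
// chunk as a separate NodeBatch (e.g. 256 for GNN training, 50_000 for bulk loads).
func encodeInBatches(nodes []*cowrie.Value, batchSize int) ([][]byte, error) {
    var batches [][]byte
    for start := 0; start < len(nodes); start += batchSize {
        end := start + batchSize
        if end > len(nodes) {
            end = len(nodes)
        }
        data, err := cowrie.Encode(cowrie.NodeBatch(nodes[start:end]...))
        if err != nil {
            return nil, err
        }
        batches = append(batches, data)
    }
    return batches, nil
}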
Memory vs. I/O Trade-offs
// High memory, low I/O
sw := graph.NewStreamWriter(&buf)
for i := 0; i < 1_000_000; i++ {
    sw.WriteNode(node)
}
data := buf.Bytes() // All in memory

// Low memory, high I/O
sw := graph.NewStreamWriter(file)
for i := 0; i < 1_000_000; i++ {
    sw.WriteNode(node)
    if i%1000 == 0 {
        file.Sync() // Periodic flush
    }
}
Batch Statistics
Track batch loading performance:
import " github.com/Neumenon/cowrie/graph/loader "
// Wrap writer with stats collector
memGraph := loader . NewMemoryGraph ()
statsWriter := loader . NewStatsWriter ( memGraph )
// Load batch
loader . LoadFromStream ( batchData , statsWriter )
// Print stats
stats := statsWriter . Stats ()
fmt . Printf ( "Nodes: %d , Edges: %d \n " ,
stats . NodesWritten , stats . EdgesWritten )
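If you also want throughput numbers, one option is to time the load yourself around the same calls. The sketch below is an assumption-level extension of the example above: it reuses batchData, statsWriter, and the loader calls shown there, and adds only standard-library timing.

import "time"

// Time the load to derive throughput from the counters above
start := time.Now()
loader.LoadFromStream(batchData, statsWriter)
elapsed := time.Since(start).Seconds()

stats := statsWriter.Stats()
fmt.Printf("%.0f nodes/sec, %.0f edges/sec\n",
    float64(stats.NodesWritten)/elapsed,
    float64(stats.EdgesWritten)/elapsed)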
| Format | Tag  | Name      | Structure                        |
|--------|------|-----------|----------------------------------|
| Gen1   | 0x13 | NodeBatch | count + Node[count]              |
| Gen1   | 0x14 | EdgeBatch | count + Edge[count]              |
| Gen2   | 0x37 | NodeBatch | count + Node[count] (dict-coded) |
| Gen2   | 0x38 | EdgeBatch | count + Edge[count] (dict-coded) |
Use the Gen2 format for batches with repeated schemas. The dictionary overhead is amortized across all elements, resulting in a 70-80% size reduction.