Overview
GraphShard is a self-contained subgraph type that packages nodes, edges, and metadata into a single Cowrie value. It is designed for:
- GNN Mini-Batch Checkpointing: Save/restore training state with full subgraph context
- Distributed Graph Processing: Partition large graphs across workers (e.g., GraphX, Pregel)
- Graph Database Snapshots: Export/import complete subgraphs with metadata
- Streaming Graph Partitions: Send graph fragments over network with provenance
GraphShard combines nodes, edges, and metadata in a single atomic unit, ensuring consistency during distributed operations.
Gen1 Encoding
Tag(0x15) | nodeCount:varint | Node* | edgeCount:varint | Edge* | metaCount:varint | (keyLen:varint | keyBytes | value)*
Structure:
- Node count followed by Gen1 Node elements (tag 0x10)
- Edge count followed by Gen1 Edge elements (tag 0x11)
- Metadata as inline key-value pairs (like Object)
Gen2 Encoding
Tag(0x39) | nodeCount:varint | Node* | edgeCount:varint | Edge* | metaCount:varint | (dictIdx:varint | value)*
Structure:
- Node count followed by Gen2 Node elements (tag 0x35)
- Edge count followed by Gen2 Edge elements (tag 0x36)
- Metadata with dictionary-coded keys (shared with nodes/edges)
All property keys (node props, edge props, metadata) share the same dictionary in Gen2, maximizing compression.
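To make the layout concrete, here is a minimal decoding sketch. It assumes the varints are the unsigned LEB128 encoding that Go's encoding/binary reads; peekShardCounts is a hypothetical helper, not part of the cowrie API, and it stops at the node count because the Node elements must be parsed before the edge and metadata counts can be reached.

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// peekShardCounts decodes only the leading tag byte and the nodeCount
// varint of a Gen2 GraphShard (tag 0x39). Parsing the Node (0x35) and
// Edge (0x36) elements that follow is elided.
func peekShardCounts(data []byte) (uint64, error) {
    r := bytes.NewReader(data)
    tag, err := r.ReadByte()
    if err != nil {
        return 0, err
    }
    if tag != 0x39 {
        return 0, fmt.Errorf("not a Gen2 GraphShard: tag 0x%02x", tag)
    }
    // nodeCount immediately follows the tag as a varint.
    return binary.ReadUvarint(r)
}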
Logical Structure
A GraphShard conceptually represents:
{
  "nodes": [
    {"id": "1", "labels": ["Node"], "props": {"x": 0.1}},
    {"id": "2", "labels": ["Node"], "props": {"x": 0.2}}
  ],
  "edges": [
    {"from": "1", "to": "2", "type": "EDGE", "props": {"weight": 0.85}}
  ],
  "metadata": {
    "version": 1,
    "partitionId": 42,
    "timestamp": 1677649200,
    "source": "graph_db_snapshot"
  }
}
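In Go terms, this logical structure maps naturally onto a plain struct. The types below are a hypothetical in-memory mirror for illustration, not types exported by the library; the cowrie/graph package works with streamed NodeEvent/EdgeEvent values rather than this materialized form.

// GraphShardDoc is a hypothetical materialized view of a GraphShard.
type GraphShardDoc struct {
    Nodes    []NodeDoc      `json:"nodes"`
    Edges    []EdgeDoc      `json:"edges"`
    Metadata map[string]any `json:"metadata"`
}

type NodeDoc struct {
    ID     string         `json:"id"`
    Labels []string       `json:"labels"`
    Props  map[string]any `json:"props"`
}

type EdgeDoc struct {
    From  string         `json:"from"`
    To    string         `json:"to"`
    Type  string         `json:"type"`
    Props map[string]any `json:"props"`
}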
Use Cases
1. GNN Mini-Batch Checkpointing
Save complete training batch with neighborhood context:
package main

import (
    "bytes"

    "github.com/Neumenon/cowrie/graph"
)

func createGNNCheckpoint(nodes []*graph.NodeEvent, edges []*graph.EdgeEvent, epoch int, batchID int) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)

    // Pre-populate common keys so they are dictionary-coded once
    sw.Header().AddLabel("Node")
    sw.Header().AddLabel("EDGE")
    sw.Header().AddField("features")
    sw.Header().AddField("label")
    sw.Header().AddField("weight")
    sw.Header().AddField("epoch")
    sw.Header().AddField("batchId")
    sw.Header().AddField("timestamp")
    sw.WriteHeader()

    // Write nodes (mini-batch)
    for _, node := range nodes {
        sw.WriteNode(node)
    }

    // Write edges (neighborhood)
    for _, edge := range edges {
        sw.WriteEdge(edge)
    }

    // Write checkpoint metadata.
    // Note: GraphShard metadata is conceptual; in the streaming format,
    // metadata is stored in the stream header or as special events.
    sw.Close()
    return buf.Bytes()
}

func restoreGNNCheckpoint(data []byte) ([]*graph.NodeEvent, []*graph.EdgeEvent, error) {
    sr, err := graph.NewStreamReader(data)
    if err != nil {
        return nil, nil, err
    }

    var nodes []*graph.NodeEvent
    var edges []*graph.EdgeEvent
    for {
        evt, err := sr.Next()
        if err != nil {
            return nil, nil, err // surface read errors instead of swallowing them
        }
        if evt == nil {
            break // end of stream
        }
        switch evt.Kind {
        case graph.EventNode:
            nodes = append(nodes, evt.Node)
        case graph.EventEdge:
            edges = append(edges, evt.Edge)
        }
    }
    return nodes, edges, nil
}
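A round trip then looks like this (a minimal sketch; batchNodes, batchEdges, epoch, and batchID are assumed to come from your training loop):

// Save the current mini-batch with its neighborhood context...
data := createGNNCheckpoint(batchNodes, batchEdges, epoch, batchID)

// ...and later restore it to resume training.
nodes, edges, err := restoreGNNCheckpoint(data)
if err != nil {
    log.Fatalf("restore checkpoint: %v", err)
}
fmt.Printf("restored %d nodes, %d edges\n", len(nodes), len(edges))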
2. Distributed Graph Partitioning
Partition large graph across workers:
type GraphPartition struct {
    ID       int
    NodeIDs  []string
    Boundary map[string][]string // Cross-partition edges
}

func partitionGraph(g *Graph, numPartitions int) []*GraphPartition {
    // Use METIS, spectral clustering, etc.
    return metisPartition(g, numPartitions)
}

func createShardForPartition(g *Graph, partition *GraphPartition) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)

    sw.Header().AddLabel("Node")
    sw.Header().AddLabel("EDGE")
    sw.Header().AddField("partitionId")
    sw.Header().AddField("nodeCount")
    sw.WriteHeader()

    // Write partition nodes
    for _, nodeID := range partition.NodeIDs {
        node := g.GetNode(nodeID)
        sw.WriteNode(&graph.NodeEvent{
            Op:     graph.OpUpsert,
            ID:     nodeID,
            Labels: node.Labels,
            Props:  node.Props,
        })
    }

    // Write internal edges
    for _, nodeID := range partition.NodeIDs {
        for _, edge := range g.GetOutgoingEdges(nodeID) {
            // Only include edges within the partition
            if contains(partition.NodeIDs, edge.To) {
                sw.WriteEdge(&graph.EdgeEvent{
                    Op:     graph.OpUpsert,
                    Label:  edge.Label,
                    FromID: edge.From,
                    ToID:   edge.To,
                    Props:  edge.Props,
                })
            }
        }
    }

    sw.Close()
    return buf.Bytes()
}

// Distribute shards to workers
func distributeGraph(g *Graph, workers []string) {
    partitions := partitionGraph(g, len(workers))
    for i, partition := range partitions {
        shardData := createShardForPartition(g, partition)
        sendToWorker(workers[i], shardData)
    }
}
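metisPartition above is a placeholder. If a METIS binding is not available, a simple hash-based partitioner is enough to get started; the sketch below is hypothetical and assumes a Graph.NodeIDs() accessor, at the cost of far more cross-partition edges than a quality-aware cut would produce.

// hashPartition is a hypothetical stand-in for metisPartition: it assigns
// each node to a partition by hashing its ID (requires "hash/fnv").
// Fast and balanced, but it ignores edge locality.
func hashPartition(g *Graph, numPartitions int) []*GraphPartition {
    partitions := make([]*GraphPartition, numPartitions)
    for i := range partitions {
        partitions[i] = &GraphPartition{ID: i}
    }
    for _, nodeID := range g.NodeIDs() { // assumes Graph can enumerate its node IDs
        h := fnv.New32a()
        h.Write([]byte(nodeID))
        p := int(h.Sum32() % uint32(numPartitions))
        partitions[p].NodeIDs = append(partitions[p].NodeIDs, nodeID)
    }
    return partitions
}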
3. Graph Database Snapshot
Export complete subgraph with provenance:
func snapshotSubgraph(db *GraphDB, rootID string, depth int) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)

    // Collect subgraph via BFS/DFS
    nodes, edges := traverseSubgraph(db, rootID, depth)

    sw.Header().AddField("snapshotTime")
    sw.Header().AddField("rootNode")
    sw.Header().AddField("depth")
    sw.Header().AddField("nodeCount")
    sw.Header().AddField("edgeCount")
    sw.WriteHeader()

    // Write nodes
    for _, node := range nodes {
        sw.WriteNode(node)
    }

    // Write edges
    for _, edge := range edges {
        sw.WriteEdge(edge)
    }

    sw.Close()
    return buf.Bytes()
}

func restoreSubgraph(db *GraphDB, shardData []byte) error {
    sr, err := graph.NewStreamReader(shardData)
    if err != nil {
        return err
    }

    tx := db.BeginTx()
    defer tx.Rollback()

    events, err := sr.ReadAll()
    if err != nil {
        return err
    }
    for _, evt := range events {
        switch evt.Kind {
        case graph.EventNode:
            if err := tx.CreateNode(evt.Node); err != nil {
                return err
            }
        case graph.EventEdge:
            if err := tx.CreateEdge(evt.Edge); err != nil {
                return err
            }
        }
    }
    return tx.Commit()
}
4. Streaming Graph Analytics
Process graph in partitioned chunks:
type GraphAnalyzer struct {
    workers   int
    batchSize int
}

func (a *GraphAnalyzer) PageRank(g *Graph) map[string]float64 {
    // Partition graph
    partitions := partitionGraph(g, a.workers)

    // Create shards
    shards := make([][]byte, len(partitions))
    for i, partition := range partitions {
        shards[i] = createShardForPartition(g, partition)
    }

    // Distribute to workers
    results := make(chan map[string]float64, a.workers)
    for _, shard := range shards {
        go func(data []byte) {
            results <- computeLocalPageRank(data)
        }(shard)
    }

    // Aggregate results
    globalScores := make(map[string]float64)
    for i := 0; i < a.workers; i++ {
        localScores := <-results
        for id, score := range localScores {
            globalScores[id] += score
        }
    }
    return globalScores
}

func computeLocalPageRank(shardData []byte) map[string]float64 {
    sr, err := graph.NewStreamReader(shardData)
    if err != nil {
        return nil // in production, surface the error instead of dropping it
    }

    // Load shard into memory
    nodes := make(map[string]*graph.NodeEvent)
    edges := make([]*graph.EdgeEvent, 0)
    for {
        evt, err := sr.Next()
        if err != nil || evt == nil {
            break
        }
        if evt.Kind == graph.EventNode {
            nodes[evt.Node.ID] = evt.Node
        } else if evt.Kind == graph.EventEdge {
            edges = append(edges, evt.Edge)
        }
    }

    // Run local PageRank iterations
    return runPageRank(nodes, edges)
}
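runPageRank is left abstract above. The sketch below is one minimal, assumed implementation: standard power-iteration PageRank over the in-memory shard, with a damping factor of 0.85 and a fixed iteration count. It is illustrative only, not part of the cowrie/graph API.

// runPageRank: a minimal power-iteration PageRank over one shard.
func runPageRank(nodes map[string]*graph.NodeEvent, edges []*graph.EdgeEvent) map[string]float64 {
    const damping = 0.85
    const iterations = 20

    n := float64(len(nodes))
    if n == 0 {
        return nil
    }

    // Build the adjacency list from the edge events.
    out := make(map[string][]string)
    for _, e := range edges {
        out[e.FromID] = append(out[e.FromID], e.ToID)
    }

    // Start from a uniform distribution.
    rank := make(map[string]float64, len(nodes))
    for id := range nodes {
        rank[id] = 1.0 / n
    }

    for i := 0; i < iterations; i++ {
        next := make(map[string]float64, len(nodes))
        for id := range nodes {
            next[id] = (1 - damping) / n // teleport term
        }
        for id, r := range rank {
            targets := out[id]
            if len(targets) == 0 {
                continue // dangling node; its mass is dropped in this simplified sketch
            }
            share := damping * r / float64(len(targets))
            for _, to := range targets {
                if _, ok := nodes[to]; ok { // ignore edges pointing outside the shard
                    next[to] += share
                }
            }
        }
        rank = next
    }
    return rank
}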
Common metadata fields for GraphShards:
// Partition metadata
metadata := map[string]any{
    "partitionId":     42,
    "totalPartitions": 100,
    "nodeCount":       1000,
    "edgeCount":       5000,
}

// Snapshot metadata
metadata := map[string]any{
    "version":      "v1.2.3",
    "snapshotTime": time.Now().Unix(),
    "rootNode":     "alice",
    "depth":        3,
    "source":       "neo4j",
}

// GNN checkpoint metadata
metadata := map[string]any{
    "epoch":        10,
    "batchId":      42,
    "loss":         0.123,
    "accuracy":     0.89,
    "timestamp":    time.Now().Unix(),
    "modelVersion": "gnn_v2",
}

// Streaming analytics metadata
metadata := map[string]any{
    "windowStart": startTime,
    "windowEnd":   endTime,
    "algorithm":   "pagerank",
    "iterations":  20,
    "convergence": 0.0001,
}
Complete Example: Graph Partitioning System
package main

import (
    "bytes"
    "fmt"
    "sync"

    "github.com/Neumenon/cowrie/graph"
)

// GraphShardManager manages distributed graph processing
type GraphShardManager struct {
    graph     *Graph
    numShards int
    shards    [][]byte
}

func NewGraphShardManager(g *Graph, numShards int) *GraphShardManager {
    return &GraphShardManager{
        graph:     g,
        numShards: numShards,
    }
}

func (m *GraphShardManager) CreateShards() error {
    // Partition graph
    partitions := partitionGraph(m.graph, m.numShards)
    m.shards = make([][]byte, len(partitions))

    // Create shard for each partition
    for i, partition := range partitions {
        var buf bytes.Buffer
        sw := graph.NewStreamWriter(&buf)

        // Add common labels/fields
        sw.Header().AddLabel("Node")
        sw.Header().AddLabel("EDGE")
        sw.Header().AddField("partitionId")
        sw.Header().AddField("nodeCount")
        sw.Header().AddField("edgeCount")
        sw.WriteHeader()

        nodeCount := 0
        edgeCount := 0

        // Write nodes in partition
        for _, nodeID := range partition.NodeIDs {
            node := m.graph.GetNode(nodeID)
            sw.WriteNode(&graph.NodeEvent{
                Op:     graph.OpUpsert,
                ID:     nodeID,
                Labels: node.Labels,
                Props:  node.Props,
            })
            nodeCount++
        }

        // Write internal edges
        for _, nodeID := range partition.NodeIDs {
            for _, edge := range m.graph.GetOutgoingEdges(nodeID) {
                if contains(partition.NodeIDs, edge.To) {
                    sw.WriteEdge(&graph.EdgeEvent{
                        Op:     graph.OpUpsert,
                        Label:  edge.Label,
                        FromID: edge.From,
                        ToID:   edge.To,
                        Props:  edge.Props,
                    })
                    edgeCount++
                }
            }
        }

        sw.Close()
        m.shards[i] = buf.Bytes()
        fmt.Printf("Shard %d: %d nodes, %d edges, %d bytes\n",
            i, nodeCount, edgeCount, buf.Len())
    }
    return nil
}

func (m *GraphShardManager) ProcessInParallel(fn func([]byte) map[string]float64) map[string]float64 {
    var wg sync.WaitGroup
    resultsChan := make(chan map[string]float64, m.numShards)

    // Process each shard in parallel
    for _, shard := range m.shards {
        wg.Add(1)
        go func(data []byte) {
            defer wg.Done()
            result := fn(data)
            resultsChan <- result
        }(shard)
    }

    // Wait and close channel
    go func() {
        wg.Wait()
        close(resultsChan)
    }()

    // Aggregate results
    globalResult := make(map[string]float64)
    for localResult := range resultsChan {
        for id, value := range localResult {
            globalResult[id] += value
        }
    }
    return globalResult
}

func main() {
    // Load large graph
    g := LoadGraph("large_graph.json")

    // Create shard manager
    manager := NewGraphShardManager(g, 8)
    if err := manager.CreateShards(); err != nil {
        panic(err)
    }

    // Run PageRank in parallel across shards
    scores := manager.ProcessInParallel(computeLocalPageRank)

    // Print top nodes
    topNodes := getTopK(scores, 10)
    for i, node := range topNodes {
        fmt.Printf("%d. %s: %.6f\n", i+1, node.ID, node.Score)
    }
}
Shard Size Guidelines
| Graph Size | Nodes per Shard | Edges per Shard | Memory per Shard |
|---|---|---|---|
| Small (1K-10K) | 100-1K | 1K-10K | 10-100 KB |
| Medium (10K-1M) | 1K-10K | 10K-100K | 100 KB-1 MB |
| Large (1M-100M) | 10K-100K | 100K-1M | 1-10 MB |
| Very Large (100M+) | 100K-1M | 1M-10M | 10-100 MB |
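For example, to hit a target from the table you can derive the shard count from the total node count. The helper below is a hypothetical sketch of that arithmetic, not a library function:

// numShardsFor picks a shard count so each shard holds roughly
// targetNodesPerShard nodes (e.g. 10_000-100_000 for large graphs,
// per the guideline table above).
func numShardsFor(totalNodes, targetNodesPerShard int) int {
    if targetNodesPerShard <= 0 {
        targetNodesPerShard = 10_000
    }
    n := (totalNodes + targetNodesPerShard - 1) / targetNodesPerShard // ceiling division
    if n < 1 {
        n = 1
    }
    return n
}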
Dictionary Compression
GraphShard benefits significantly from dictionary coding:
Without dictionary (Gen1):
- 10K nodes × 50 bytes/node = 500KB
- 50K edges × 40 bytes/edge = 2MB
- Total: 2.5MB
With dictionary (Gen2):
- Dictionary: 100 keys × 20 bytes = 2KB
- 10K nodes × 10 bytes/node = 100KB
- 50K edges × 8 bytes/edge = 400KB
- Total: 502KB (80% reduction!)
Parallelization Efficiency
// Measure speedup with parallel processing
func benchmarkParallelShards(g *Graph) {
    for numShards := 1; numShards <= 16; numShards *= 2 {
        start := time.Now()

        manager := NewGraphShardManager(g, numShards)
        manager.CreateShards()
        manager.ProcessInParallel(computeLocalPageRank)

        elapsed := time.Since(start)
        fmt.Printf("Shards: %d, Time: %v\n", numShards, elapsed)
    }
}
// Typical results:
// Shards: 1, Time: 10.5s (baseline)
// Shards: 2, Time: 5.8s (1.8× speedup)
// Shards: 4, Time: 3.2s (3.3× speedup)
// Shards: 8, Time: 1.9s (5.5× speedup)
// Shards: 16, Time: 1.4s (7.5× speedup)
| Format | Tag | Name | Structure |
|---|---|---|---|
| Gen1 | 0x15 | GraphShard | nodes + edges + metadata (inline keys) |
| Gen2 | 0x39 | GraphShard | nodes + edges + metadata (dict-coded) |
Boundary Edges
Handle cross-shard edges:
type ShardWithBoundary struct {
    InternalNodes []string
    InternalEdges []*graph.EdgeEvent
    BoundaryEdges []*graph.EdgeEvent // Edges to other shards
}

func createShardWithBoundary(g *Graph, partition *GraphPartition) []byte {
    var buf bytes.Buffer
    sw := graph.NewStreamWriter(&buf)
    sw.WriteHeader()

    // Write internal nodes
    for _, nodeID := range partition.NodeIDs {
        // ... write node
    }

    // Write internal edges
    for _, edge := range getInternalEdges(partition) {
        sw.WriteEdge(edge)
    }

    // Write boundary edges (marked with metadata)
    for _, edge := range getBoundaryEdges(partition) {
        if edge.Props == nil {
            edge.Props = map[string]any{} // guard against a nil props map
        }
        edge.Props["boundary"] = true
        edge.Props["targetShard"] = getTargetShard(edge.ToID)
        sw.WriteEdge(edge)
    }

    sw.Close()
    return buf.Bytes()
}
For distributed graph algorithms (PageRank, connected components), track boundary edges to enable cross-shard communication.
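One assumed way to use those markers on the receiving side: scan a shard, collect the edges flagged as boundary, and group them by target shard so each worker knows where to route messages. This is an illustrative sketch that relies on the "boundary"/"targetShard" props written above; note that depending on how the codec round-trips integers, the targetShard value may decode as int64 rather than int.

// collectBoundaryEdges groups a shard's boundary edges by target shard.
func collectBoundaryEdges(shardData []byte) (map[int][]*graph.EdgeEvent, error) {
    sr, err := graph.NewStreamReader(shardData)
    if err != nil {
        return nil, err
    }
    byShard := make(map[int][]*graph.EdgeEvent)
    for {
        evt, err := sr.Next()
        if err != nil {
            return nil, err
        }
        if evt == nil {
            break // end of stream
        }
        if evt.Kind != graph.EventEdge {
            continue
        }
        e := evt.Edge
        if isBoundary, _ := e.Props["boundary"].(bool); !isBoundary {
            continue // internal edge
        }
        if target, ok := e.Props["targetShard"].(int); ok {
            byShard[target] = append(byShard[target], e)
        }
    }
    return byShard, nil
}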
Error Handling
func loadShard(data []byte) error {
    sr, err := graph.NewStreamReader(data)
    if err != nil {
        return fmt.Errorf("failed to parse shard: %w", err)
    }

    nodeCount := 0
    edgeCount := 0
    for {
        evt, err := sr.Next()
        if err != nil {
            return fmt.Errorf("error reading event %d: %w",
                nodeCount+edgeCount, err)
        }
        if evt == nil {
            break
        }
        switch evt.Kind {
        case graph.EventNode:
            nodeCount++
        case graph.EventEdge:
            edgeCount++
        }
    }

    fmt.Printf("Loaded shard: %d nodes, %d edges\n", nodeCount, edgeCount)
    return nil
}
Always validate shard integrity after deserialization. Check that all edge endpoints exist in the shard’s node set for internal edges.
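A minimal integrity check along those lines (a sketch over the in-memory maps built earlier; it treats any edge not marked with the "boundary" prop as internal):

// validateShard checks that every internal edge's endpoints exist in the
// shard's node set. Boundary edges are exempt: they point outside the
// shard by design.
func validateShard(nodes map[string]*graph.NodeEvent, edges []*graph.EdgeEvent) error {
    for _, e := range edges {
        if isBoundary, _ := e.Props["boundary"].(bool); isBoundary {
            continue
        }
        if _, ok := nodes[e.FromID]; !ok {
            return fmt.Errorf("edge %s->%s: missing source node", e.FromID, e.ToID)
        }
        if _, ok := nodes[e.ToID]; !ok {
            return fmt.Errorf("edge %s->%s: missing target node", e.FromID, e.ToID)
        }
    }
    return nil
}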