Files
damus/devtools/visualize_stream_pipeline.py
Daniel D’Aquino 690f8b891e Implement timestamp-based network subscription optimization
Changelog-Changed: Optimized network bandwidth usage and improved timeline performance
Signed-off-by: Daniel D’Aquino <daniel@daquino.me>
2025-10-15 14:15:55 -07:00

475 lines
15 KiB
Python

#!/usr/bin/env python3
"""Generate interactive Sankey diagram from network CSV data using Plotly."""
from __future__ import annotations
import argparse
import csv
from datetime import datetime
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Tuple, Optional
import plotly.graph_objects as go
import plotly.express as px
def parse_timestamp(timestamp_str: str) -> float:
    """Parse a timestamp string and return milliseconds since the epoch.

    Accepted formats, tried in order:
      1. ISO-8601 (e.g. "2025-10-13T15:36:46.365").
      2. ISO-like with a space separator ("2025-10-13 15:36:46.365"),
         normalized by swapping the space for 'T'.
      3. A plain numeric string, treated as milliseconds already.

    A log prefix such as "STREAM_PIPELINE: " is stripped first.
    NOTE: naive datetimes are interpreted in the local timezone by
    datetime.timestamp().

    Args:
        timestamp_str: Raw timestamp text, possibly with a "LABEL: " prefix.

    Returns:
        Milliseconds since the Unix epoch as a float.

    Raises:
        ValueError: if none of the supported formats match.
    """
    timestamp_str = timestamp_str.strip()
    # Drop any "LABEL: " log prefix preceding the timestamp proper.
    if ": " in timestamp_str:
        timestamp_str = timestamp_str.split(": ", 1)[1]

    # Attempt 1: direct ISO-8601 parse.
    try:
        return datetime.fromisoformat(timestamp_str).timestamp() * 1000
    except ValueError:
        pass

    # Attempt 2: "YYYY-MM-DD HH:MM:SS[.ffff]" — normalize the space to 'T'.
    if " " in timestamp_str and "-" in timestamp_str:
        try:
            normalized = timestamp_str.replace(" ", "T")
            return datetime.fromisoformat(normalized).timestamp() * 1000
        except ValueError:
            pass

    # Attempt 3: already a numeric millisecond value.
    try:
        return float(timestamp_str)
    except ValueError:
        raise ValueError(f"Could not parse timestamp: {timestamp_str}")
def load_network_data(csv_file: str, start_time: Optional[str] = None,
                      end_time: Optional[str] = None) -> Dict[Tuple[str, str], int]:
    """
    Load network data from CSV and aggregate edge counts.

    Args:
        csv_file: Path to CSV file with rows of (timestamp, source, destination)
        start_time: Optional start time filter (ISO format)
        end_time: Optional end time filter (ISO format)

    Returns:
        Dictionary mapping (source, destination) tuples to counts
    """
    edge_counts: Dict[Tuple[str, str], int] = defaultdict(int)
    timestamps: List[float] = []

    # Parse time filters if provided (may legitimately be 0.0 == epoch).
    start_ts = parse_timestamp(start_time) if start_time else None
    end_ts = parse_timestamp(end_time) if end_time else None

    # newline='' is required by the csv module to handle quoted newlines
    # correctly across platforms.
    with open(csv_file, 'r', newline='') as f:
        reader = csv.reader(f)
        first_row = next(reader, None)
        if first_row is None:
            print("Empty CSV file")
            return edge_counts
        # If the first cell parses as a timestamp, the first row is data;
        # otherwise treat it as a header and skip it.
        try:
            parse_timestamp(first_row[0])
            rows = [first_row]
        except (ValueError, IndexError):
            rows = []
        rows.extend(reader)

    for row_idx, row in enumerate(rows):
        if len(row) < 3:
            print(f"Skipping invalid row {row_idx + 1}: {row}")
            continue
        try:
            timestamp_ms = parse_timestamp(row[0])
            source = row[1].strip()
            destination = row[2].strip()
            # Compare against None explicitly so a bound of exactly 0.0
            # (the epoch) still filters — truthiness would skip it.
            if start_ts is not None and timestamp_ms < start_ts:
                continue
            if end_ts is not None and timestamp_ms > end_ts:
                continue
            timestamps.append(timestamp_ms)
            edge_counts[(source, destination)] += 1
        except (ValueError, IndexError) as e:
            print(f"Error processing row {row_idx + 1}: {e}")
            continue

    if timestamps:
        start_dt = datetime.fromtimestamp(min(timestamps) / 1000.0)
        end_dt = datetime.fromtimestamp(max(timestamps) / 1000.0)
        print(f"\nLoaded {sum(edge_counts.values())} events")
        print(f"Time range: {start_dt} to {end_dt}")
        print(f"Unique edges: {len(edge_counts)}")
    return edge_counts
def filter_top_edges(edge_counts: Dict[Tuple[str, str], int],
                     top_n: Optional[int] = None) -> Dict[Tuple[str, str], int]:
    """Return only the top_n edges with the highest counts.

    With top_n of None or a non-positive value, the input mapping is
    returned unchanged (same object).
    """
    if top_n is None or top_n <= 0:
        return edge_counts
    # Rank edges by descending count; negating the key keeps the sort
    # stable in the same order as reverse=True on the raw count.
    ranked = sorted(edge_counts.items(), key=lambda kv: -kv[1])
    return dict(ranked[:top_n])
def filter_top_nodes(edge_counts: Dict[Tuple[str, str], int],
                     top_n: Optional[int] = None) -> Dict[Tuple[str, str], int]:
    """Return only the edges whose endpoints are both among the top_n
    busiest nodes.

    Node activity is the sum of edge counts where the node appears as
    either source or destination. With top_n of None or a non-positive
    value, the input mapping is returned unchanged (same object).
    """
    if top_n is None or top_n <= 0:
        return edge_counts

    # Tally each node's activity over both endpoints of every edge.
    activity: Dict[str, int] = defaultdict(int)
    for (src, dst), n in edge_counts.items():
        activity[src] += n
        activity[dst] += n

    # Keep the names of the top_n busiest nodes.
    ranked = sorted(activity, key=activity.get, reverse=True)
    keep = set(ranked[:top_n])

    # An edge survives only if both endpoints made the cut.
    return {edge: n for edge, n in edge_counts.items()
            if edge[0] in keep and edge[1] in keep}
def create_sankey_diagram(edge_counts: Dict[Tuple[str, str], int],
                          title: str = "Network Flow Sankey Diagram",
                          color_scheme: str = "Viridis",
                          show_values: bool = True) -> go.Figure:
    """
    Create an interactive Sankey diagram from edge counts.

    Args:
        edge_counts: Dictionary mapping (source, destination) to flow count
        title: Title for the diagram
        color_scheme: Plotly color scheme name
        show_values: Whether to show flow values on hover (currently unused;
            retained for backward compatibility with existing callers)

    Returns:
        Plotly Figure object (an empty Figure when edge_counts is empty)
    """
    if not edge_counts:
        print("No data to visualize")
        return go.Figure()

    # Collect unique node labels and give each a stable index.
    all_nodes = set()
    for source, dest in edge_counts.keys():
        all_nodes.add(source)
        all_nodes.add(dest)
    node_list = sorted(all_nodes)
    node_to_idx = {node: idx for idx, node in enumerate(node_list)}

    # Sankey link arrays: parallel lists of source idx, target idx, weight.
    sources = []
    targets = []
    values = []
    link_colors = []
    for (source, dest), count in edge_counts.items():
        sources.append(node_to_idx[source])
        targets.append(node_to_idx[dest])
        values.append(count)

    # Node color intensity scales with the node's total flow (in + out).
    node_flow = defaultdict(int)
    for (source, dest), count in edge_counts.items():
        node_flow[source] += count
        node_flow[dest] += count
    max_flow = max(node_flow.values()) if node_flow else 1
    colors = px.colors.sample_colorscale(
        color_scheme,
        [node_flow.get(node, 0) / max_flow for node in node_list]
    )

    # Links inherit a semi-transparent version of their source node's color.
    for source_idx in sources:
        color = colors[source_idx]
        if color.startswith('rgb'):
            link_colors.append(color.replace('rgb', 'rgba').replace(')', ', 0.4)'))
        else:
            link_colors.append(color)

    # Per-node hover text: total flow plus in/out breakdown.
    node_hover = []
    for node in node_list:
        total_flow = node_flow.get(node, 0)
        inflow = sum(count for (s, d), count in edge_counts.items() if d == node)
        outflow = sum(count for (s, d), count in edge_counts.items() if s == node)
        hover_text = f"<b>{node}</b><br>"
        hover_text += f"Total Flow: {total_flow}<br>"
        hover_text += f"Inflow: {inflow}<br>"
        hover_text += f"Outflow: {outflow}"
        node_hover.append(hover_text)

    # Per-link hover text; hoist the grand total out of the loop instead of
    # recomputing sum(values) per link.
    total_value = sum(values)
    link_hover = []
    for (source, dest), count in edge_counts.items():
        # " → " separates the endpoints (previously they were concatenated
        # with no separator, rendering e.g. "AB" instead of "A → B").
        hover_text = f"<b>{source} → {dest}</b><br>"
        hover_text += f"Flow: {count} events<br>"
        if total_value > 0:
            percentage = (count / total_value) * 100
            hover_text += f"Percentage: {percentage:.1f}%"
        link_hover.append(hover_text)

    # Assemble the Sankey trace; hovertemplate defers entirely to customdata.
    fig = go.Figure(data=[go.Sankey(
        node=dict(
            pad=15,
            thickness=20,
            line=dict(color="black", width=0.5),
            label=node_list,
            color=colors,
            customdata=node_hover,
            hovertemplate='%{customdata}<extra></extra>'
        ),
        link=dict(
            source=sources,
            target=targets,
            value=values,
            color=link_colors,
            customdata=link_hover,
            hovertemplate='%{customdata}<extra></extra>'
        )
    )])

    fig.update_layout(
        title=dict(
            text=title,
            font=dict(size=20, color='#333')
        ),
        font=dict(size=12),
        plot_bgcolor='white',
        paper_bgcolor='white',
        height=800,
        margin=dict(l=20, r=20, t=80, b=20)
    )
    return fig
def print_summary_statistics(edge_counts: Dict[Tuple[str, str], int]) -> None:
    """Print summary statistics about the network flows."""
    if not edge_counts:
        print("No data to summarize")
        return

    banner = "=" * 70
    print("\n" + banner)
    print("SANKEY DIAGRAM SUMMARY")
    print(banner)

    # Headline counts.
    total_events = sum(edge_counts.values())
    source_names = {edge[0] for edge in edge_counts}
    dest_names = {edge[1] for edge in edge_counts}
    every_node = source_names | dest_names

    print(f"\nTotal Events: {total_events}")
    print(f"Unique Edges: {len(edge_counts)}")
    print(f"Unique Nodes: {len(every_node)}")
    print(f" - Source nodes: {len(source_names)}")
    print(f" - Destination nodes: {len(dest_names)}")

    # Per-node in/out/total flow tallies.
    activity = defaultdict(lambda: {'in': 0, 'out': 0, 'total': 0})
    for (src, dst), n in edge_counts.items():
        activity[src]['out'] += n
        activity[src]['total'] += n
        activity[dst]['in'] += n
        activity[dst]['total'] += n

    print(f"\nTop 10 Most Active Edges:")
    ranked_edges = sorted(edge_counts.items(), key=lambda kv: kv[1], reverse=True)
    for rank, ((src, dst), n) in enumerate(ranked_edges[:10], 1):
        pct = (n / total_events) * 100
        print(f" {rank:2d}. {src:<25s}{dst:<25s} {n:>6d} ({pct:>5.1f}%)")

    print(f"\nTop 10 Most Active Nodes (by total flow):")
    ranked_nodes = sorted(activity.items(), key=lambda kv: kv[1]['total'], reverse=True)
    for rank, (node, flows) in enumerate(ranked_nodes[:10], 1):
        print(f" {rank:2d}. {node:<30s} Total: {flows['total']:>6d} "
              f"(In: {flows['in']:>5d}, Out: {flows['out']:>5d})")
    print("\n" + banner + "\n")
def main() -> None:
    """Command-line entry point.

    Parses arguments, loads and filters the CSV edge data, optionally prints
    summary statistics, and either writes the Sankey diagram to an HTML file
    or opens it in the default browser.
    """
    parser = argparse.ArgumentParser(
        description="Generate interactive Sankey diagram from network CSV data.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Generate basic Sankey diagram
%(prog)s data.csv
# Filter to top 20 edges and save to HTML
%(prog)s data.csv --top-edges 20 --output sankey.html
# Filter to top 15 nodes with custom title
%(prog)s data.csv --top-nodes 15 --title "My Network Flows"
# Filter by time range
%(prog)s data.csv --start-time "2025-01-13 10:00:00" --end-time "2025-01-13 12:00:00"
# Combine filters
%(prog)s data.csv --top-nodes 10 --color-scheme Plasma --output flows.html
"""
    )
    # Positional argument: the input CSV path.
    parser.add_argument(
        "csv_file",
        type=str,
        help="Path to CSV file with format: timestamp, source_node, destination_node"
    )
    parser.add_argument(
        "--output",
        type=str,
        default=None,
        help="Output HTML file path (if not specified, opens in browser)"
    )
    parser.add_argument(
        "--top-edges",
        type=int,
        default=None,
        help="Show only top N most active edges (default: all)"
    )
    parser.add_argument(
        "--top-nodes",
        type=int,
        default=None,
        help="Show only edges involving top N most active nodes (default: all)"
    )
    parser.add_argument(
        "--start-time",
        type=str,
        default=None,
        help="Start time filter (ISO format, e.g., '2025-01-13 10:00:00')"
    )
    parser.add_argument(
        "--end-time",
        type=str,
        default=None,
        help="End time filter (ISO format, e.g., '2025-01-13 12:00:00')"
    )
    parser.add_argument(
        "--title",
        type=str,
        default="Network Flow Sankey Diagram",
        help="Title for the diagram (default: 'Network Flow Sankey Diagram')"
    )
    # Choices restricted to colorscales accepted by plotly's sampler below.
    parser.add_argument(
        "--color-scheme",
        type=str,
        default="Viridis",
        choices=["Viridis", "Plasma", "Inferno", "Magma", "Cividis", "Turbo",
                 "Blues", "Greens", "Reds", "Purples", "Rainbow"],
        help="Color scheme for nodes (default: Viridis)"
    )
    parser.add_argument(
        "--no-summary",
        action="store_true",
        help="Skip printing summary statistics"
    )
    # NOTE(review): --auto-open only matters when --output is given; without
    # --output the figure is always shown in the browser via fig.show().
    parser.add_argument(
        "--auto-open",
        action="store_true",
        help="Automatically open in browser (default: True if no output file specified)"
    )
    args = parser.parse_args()

    # Check if CSV file exists before attempting to load it.
    csv_path = Path(args.csv_file)
    if not csv_path.exists():
        print(f"Error: CSV file not found: {args.csv_file}")
        return

    # Load data, applying any time-range filters during the read.
    print(f"Loading data from {args.csv_file}...")
    edge_counts = load_network_data(args.csv_file, args.start_time, args.end_time)
    if not edge_counts:
        print("No data to visualize!")
        return

    # Apply post-load filters (edge-level first, then node-level).
    if args.top_edges:
        print(f"Filtering to top {args.top_edges} edges...")
        edge_counts = filter_top_edges(edge_counts, args.top_edges)
    if args.top_nodes:
        print(f"Filtering to edges involving top {args.top_nodes} nodes...")
        edge_counts = filter_top_nodes(edge_counts, args.top_nodes)

    # Print summary statistics unless suppressed.
    if not args.no_summary:
        print_summary_statistics(edge_counts)

    # Create Sankey diagram from the (possibly filtered) edge counts.
    print("Generating Sankey diagram...")
    fig = create_sankey_diagram(
        edge_counts,
        title=args.title,
        color_scheme=args.color_scheme
    )

    # Save to HTML (creating parent directories as needed) or show inline.
    if args.output:
        output_path = Path(args.output)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        fig.write_html(str(output_path))
        print(f"\nSaved Sankey diagram to: {output_path}")
        print(f"Open the file in a web browser to view the interactive diagram.")
        if args.auto_open:
            # Imported lazily: only needed on this path.
            import webbrowser
            webbrowser.open(f"file://{output_path.absolute()}")
    else:
        print("\nOpening Sankey diagram in browser...")
        fig.show()
    print("\nDone!")


if __name__ == "__main__":
    main()