Overview
Build real-time intelligence dashboards that ingest data from multiple sources (RSS, news APIs, social media), use AI to categorize, summarize, deduplicate, and score severity, then push updates to a live dashboard. Think of it as your own AI-powered situation room.
Instructions
When a user asks to build a news monitoring system, intelligence dashboard, or event aggregation feed:
- Define scope — What topics/regions/industries to monitor?
- Select sources — RSS feeds, NewsAPI, social APIs, custom scrapers
- Set up pipeline — Ingest → Deduplicate → Classify → Summarize → Score
- Build output — API + WebSocket for real-time push, alert rules
Source Ingestion (Python)
python
"""Fetch from multiple source types in parallel."""
import asyncio, hashlib, feedparser, httpx
from datetime import datetime, timezone
class NewsItem:
def __init__(self, title: str, content: str, source: str, url: str, published: datetime):
self.title, self.content, self.source, self.url = title, content, source, url
self.published = published
self.id = hashlib.sha256(f"{title}{url}".encode()).hexdigest()[:16]
async def fetch_rss(feeds: list[str]) -> list[NewsItem]:
items = []
async with httpx.AsyncClient(timeout=15) as client:
responses = await asyncio.gather(*[client.get(url) for url in feeds], return_exceptions=True)
for resp in responses:
if isinstance(resp, Exception):
continue
feed = feedparser.parse(resp.text)
for entry in feed.entries[:20]:
items.append(NewsItem(
title=entry.get("title", ""), content=entry.get("summary", ""),
source=feed.feed.get("title", "RSS"), url=entry.get("link", ""),
published=datetime.now(timezone.utc),
))
return items
async def fetch_newsapi(query: str, api_key: str) -> list[NewsItem]:
async with httpx.AsyncClient() as client:
resp = await client.get("https://newsapi.org/v2/everything",
params={"q": query, "sortBy": "publishedAt", "pageSize": 50},
headers={"X-Api-Key": api_key})
data = resp.json()
return [
NewsItem(title=a["title"], content=a.get("description", ""),
source=a["source"]["name"], url=a["url"],
published=datetime.fromisoformat(a["publishedAt"].replace("Z", "+00:00")))
for a in data.get("articles", [])
]
Deduplication
python
from difflib import SequenceMatcher
def deduplicate(items: list[NewsItem], threshold: float = 0.75) -> list[NewsItem]:
unique, seen_titles = [], []
for item in sorted(items, key=lambda x: x.published, reverse=True):
is_dup = any(SequenceMatcher(None, item.title.lower(), s.lower()).ratio() > threshold for s in seen_titles)
if not is_dup:
unique.append(item)
seen_titles.append(item.title)
return unique
AI Classification & Summarization
python
import json
from openai import OpenAI
client = OpenAI()
CATEGORIES = ["geopolitics", "technology", "finance", "security", "climate", "health", "regulation", "market-move"]
def analyze_article(item: NewsItem) -> dict:
response = client.chat.completions.create(
model="gpt-4o-mini", response_format={"type": "json_object"},
messages=[
{"role": "system", "content": f"Analyze this news. Return JSON: {{category: one of {CATEGORIES}, severity: 1-10, summary: '2-3 sentences', entities: [], sentiment: 'positive|negative|neutral', actionable: bool}}"},
{"role": "user", "content": f"Title: {item.title}\n\nContent: {item.content[:2000]}"},
],
)
analysis = json.loads(response.choices[0].message.content)
return {**analysis, "id": item.id, "title": item.title, "url": item.url, "source": item.source}
Dashboard API (Node.js)
typescript
import express from "express";
import { WebSocketServer, WebSocket } from "ws";
import { createServer } from "http";
interface IntelItem {
id: string; title: string; summary: string; category: string;
severity: number; source: string; url: string; timestamp: string;
}
const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });
let feed: IntelItem[] = [];
const clients = new Set<WebSocket>();
wss.on("connection", (ws) => {
clients.add(ws);
ws.send(JSON.stringify({ type: "init", items: feed.slice(0, 50) }));
ws.on("close", () => clients.delete(ws));
});
function broadcast(item: IntelItem) {
const msg = JSON.stringify({ type: "new", item });
clients.forEach((ws) => { if (ws.readyState === WebSocket.OPEN) ws.send(msg); });
}
app.post("/api/ingest", express.json(), (req, res) => {
const item: IntelItem = { ...req.body, timestamp: new Date().toISOString() };
feed.unshift(item);
feed = feed.slice(0, 1000);
broadcast(item);
res.json({ ok: true });
});
app.get("/api/feed", (req, res) => {
let items = feed;
const { category, minSeverity, limit } = req.query;
if (category) items = items.filter((i) => i.category === category);
if (minSeverity) items = items.filter((i) => i.severity >= Number(minSeverity));
res.json(items.slice(0, Number(limit) || 50));
});
server.listen(3000, () => console.log("Intelligence dashboard on :3000"));
Alert Rules
python
import httpx
ALERT_RULES = [
{"category": "security", "min_severity": 7, "channel": "slack"},
{"category": "market-move", "min_severity": 8, "channel": "email"},
{"category": "*", "min_severity": 9, "channel": "all"},
]
async def check_alerts(item: dict):
for rule in ALERT_RULES:
cat_match = rule["category"] == "*" or rule["category"] == item["category"]
if cat_match and item["severity"] >= rule["min_severity"]:
await send_alert(rule["channel"], item)
async def send_alert(channel: str, item: dict):
msg = f"[{item['category'].upper()}] Severity {item['severity']}/10\n{item['title']}\n{item['summary']}"
if channel in ("slack", "all"):
await httpx.AsyncClient().post("https://hooks.slack.com/services/YOUR/WEBHOOK", json={"text": msg})
Examples
Example 1: Tech Industry News Monitor
python
RSS_FEEDS = [
"https://feeds.arstechnica.com/arstechnica/index",
"https://techcrunch.com/feed/",
"https://www.theverge.com/rss/index.xml",
]
async def run_tech_monitor():
items = await fetch_rss(RSS_FEEDS)
unique = deduplicate(items)
for item in unique[:20]:
analysis = analyze_article(item)
async with httpx.AsyncClient() as c:
await c.post("http://localhost:3000/api/ingest", json=analysis)
await check_alerts(analysis)
# Output: {"category": "technology", "severity": 4, "summary": "Apple announced..."}
Example 2: Geopolitical Event Monitoring with Scheduled Polling
python
async def monitor_loop(interval_minutes: int = 15):
"""Run the full pipeline on a schedule."""
while True:
items = await fetch_rss(RSS_FEEDS)
newsapi_items = await fetch_newsapi("geopolitics OR sanctions", NEWSAPI_KEY)
all_items = items + newsapi_items
unique = deduplicate(all_items)
for item in unique:
analysis = analyze_article(item)
async with httpx.AsyncClient() as c:
await c.post("http://localhost:3000/api/ingest", json=analysis)
await check_alerts(analysis)
await asyncio.sleep(interval_minutes * 60)
# Runs every 15 min, deduplicates across sources, alerts on severity >= 7
Guidelines
- Dedup aggressively — The same story appears across 20+ outlets. Dedup by title similarity
- Batch AI calls — Process 10 articles per LLM call instead of 1 to save ~90% on API costs
- Severity calibration — Periodically review severity scores. LLMs tend to over-rate severity
- Source diversity — Mix mainstream, niche, and social sources for balanced coverage
- Rate limit respect — Cache RSS feeds for 15-30 min. Don't hammer free APIs
- Historical storage — Keep analyzed articles in a DB for trend analysis over time
Dependencies
bash
pip install feedparser httpx openai # Python pipeline
npm install express ws # Node.js dashboard