The Problem
Pavel leads intelligence at a 20-person fintech. Their analysts monitor 50+ news sources manually for market-moving events — regulatory changes, company earnings, geopolitical developments. They miss critical news because it's buried in noise. Duplicate stories from different outlets waste time. There's no way to filter by topic relevance or sentiment. By the time an analyst reads and forwards a relevant article, the trading window is gone. They need automated aggregation: multi-source ingestion, AI categorization, deduplication, sentiment scoring, and real-time alerts.
Step 1: Build the News Aggregation Engine
typescript
// src/news/aggregator.ts — Multi-source news aggregation with AI categorization
import { pool } from "../db";
import { Redis } from "ioredis";
import { createHash, randomBytes } from "node:crypto";
import { parseStringPromise } from "xml2js";
const redis = new Redis(process.env.REDIS_URL!);
interface NewsSource {
id: string;
name: string;
url: string;
type: "rss" | "atom" | "json_feed";
category: string;
refreshIntervalMs: number;
lastFetchedAt: string | null;
status: "active" | "error" | "disabled";
}
interface Article {
id: string;
sourceId: string;
title: string;
summary: string;
content: string;
url: string;
author: string;
publishedAt: string;
categories: string[];
sentiment: { score: number; label: "positive" | "negative" | "neutral" };
relevanceScore: number;
contentHash: string;
isDuplicate: boolean;
duplicateOf: string | null;
metadata: Record<string, any>;
}
interface AlertRule {
id: string;
name: string;
conditions: {
keywords?: string[];
categories?: string[];
sentimentMin?: number;
sentimentMax?: number;
sources?: string[];
};
actions: Array<{ type: "webhook" | "email" | "push"; target: string }>;
userId: string;
}
// Fetch and process articles from a source
export async function fetchSource(source: NewsSource): Promise<Article[]> {
const response = await fetch(source.url, {
headers: { "User-Agent": "NewsAggregator/1.0" },
signal: AbortSignal.timeout(10000),
});
const text = await response.text();
const items = await parseRSS(text);
const articles: Article[] = [];
for (const item of items) {
const contentHash = createHash("sha256")
.update(item.title + item.link)
.digest("hex")
.slice(0, 16);
// Check for duplicates
const existing = await pool.query(
"SELECT id FROM articles WHERE content_hash = $1",
[contentHash]
);
if (existing.rows.length > 0) continue;
// Check cross-source duplicates by title similarity
const duplicate = await findDuplicate(item.title);
// AI categorization and sentiment
const categories = categorize(item.title + " " + (item.description || ""));
const sentiment = analyzeSentiment(item.title + " " + (item.description || ""));
const relevance = calculateRelevance(item, categories, sentiment);
const article: Article = {
id: `art-${randomBytes(6).toString("hex")}`,
sourceId: source.id,
title: item.title,
summary: (item.description || "").slice(0, 500),
content: item["content:encoded"] || item.description || "",
url: item.link,
author: item.author || item["dc:creator"] || source.name,
publishedAt: item.pubDate ? new Date(item.pubDate).toISOString() : new Date().toISOString(),
categories,
sentiment,
relevanceScore: relevance,
contentHash,
isDuplicate: !!duplicate,
duplicateOf: duplicate,
metadata: {},
};
await pool.query(
`INSERT INTO articles (id, source_id, title, summary, content, url, author, published_at, categories, sentiment, relevance_score, content_hash, is_duplicate, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, NOW())`,
[article.id, source.id, article.title, article.summary, article.content,
article.url, article.author, article.publishedAt,
JSON.stringify(categories), JSON.stringify(sentiment),
relevance, contentHash, article.isDuplicate]
);
// Publish for real-time dashboard
await redis.publish("news:new", JSON.stringify(article));
articles.push(article);
// Check alert rules
if (!article.isDuplicate) await checkAlerts(article);
}
// Update source status
await pool.query(
"UPDATE news_sources SET last_fetched_at = NOW(), status = 'active' WHERE id = $1",
[source.id]
);
return articles;
}
// Categorization using keyword matching (in production: LLM-based)
function categorize(text: string): string[] {
const lower = text.toLowerCase();
const categories: string[] = [];
const rules: Record<string, string[]> = {
"regulation": ["sec", "regulation", "compliance", "law", "legislation", "federal"],
"earnings": ["earnings", "revenue", "profit", "quarterly", "fiscal", "eps"],
"markets": ["stock", "market", "index", "trading", "nasdaq", "s&p"],
"technology": ["ai", "machine learning", "blockchain", "cloud", "saas"],
"geopolitics": ["sanctions", "tariff", "trade war", "diplomatic", "nato"],
"crypto": ["bitcoin", "ethereum", "crypto", "defi", "token"],
"macro": ["inflation", "fed", "interest rate", "gdp", "unemployment"],
};
for (const [category, keywords] of Object.entries(rules)) {
if (keywords.some((kw) => lower.includes(kw))) categories.push(category);
}
return categories.length > 0 ? categories : ["general"];
}
// Sentiment analysis (simplified — production would use LLM)
function analyzeSentiment(text: string): { score: number; label: "positive" | "negative" | "neutral" } {
const positive = ["growth", "surge", "gain", "profit", "breakthrough", "upgrade", "bull"];
const negative = ["crash", "loss", "decline", "warning", "risk", "crisis", "bearish"];
const lower = text.toLowerCase();
let score = 0;
positive.forEach((w) => { if (lower.includes(w)) score += 0.2; });
negative.forEach((w) => { if (lower.includes(w)) score -= 0.2; });
score = Math.max(-1, Math.min(1, score));
return { score, label: score > 0.1 ? "positive" : score < -0.1 ? "negative" : "neutral" };
}
function calculateRelevance(item: any, categories: string[], sentiment: any): number {
let score = 0.5;
if (categories.includes("regulation") || categories.includes("macro")) score += 0.2;
if (Math.abs(sentiment.score) > 0.3) score += 0.15; // strong sentiment = more relevant
return Math.min(1, score);
}
async function findDuplicate(title: string): Promise<string | null> {
// Simple trigram similarity check
const { rows } = await pool.query(
`SELECT id FROM articles WHERE similarity(title, $1) > 0.6 AND created_at > NOW() - INTERVAL '24 hours' LIMIT 1`,
[title]
);
return rows.length > 0 ? rows[0].id : null;
}
async function parseRSS(xml: string): Promise<any[]> {
const result = await parseStringPromise(xml, { explicitArray: false });
if (result.rss) return [].concat(result.rss.channel.item || []);
if (result.feed) return [].concat(result.feed.entry || []).map((e: any) => ({
title: e.title?.["_"] || e.title,
link: e.link?.$?.href || e.link,
description: e.summary?.["_"] || e.summary || e.content?.["_"] || "",
pubDate: e.published || e.updated,
}));
return [];
}
async function checkAlerts(article: Article): Promise<void> {
const { rows: rules } = await pool.query("SELECT * FROM alert_rules WHERE enabled = true");
for (const rule of rules) {
const conditions = JSON.parse(rule.conditions);
let matches = true;
if (conditions.keywords?.length) {
const lower = (article.title + " " + article.summary).toLowerCase();
matches = conditions.keywords.some((kw: string) => lower.includes(kw.toLowerCase()));
}
if (matches && conditions.categories?.length) {
matches = conditions.categories.some((c: string) => article.categories.includes(c));
}
if (matches) {
const actions = JSON.parse(rule.actions);
for (const action of actions) {
await redis.rpush("alerts:queue", JSON.stringify({ article, action }));
}
}
}
}
// Dashboard API
export async function getDashboard(options?: {
categories?: string[]; sentiment?: string; limit?: number; offset?: number;
}): Promise<{ articles: Article[]; total: number }> {
let sql = "SELECT * FROM articles WHERE is_duplicate = false";
const params: any[] = [];
let idx = 1;
if (options?.categories?.length) {
sql += ` AND categories::jsonb ?| $${idx}`;
params.push(options.categories);
idx++;
}
sql += " ORDER BY published_at DESC";
sql += ` LIMIT $${idx} OFFSET $${idx + 1}`;
params.push(options?.limit || 50, options?.offset || 0);
const { rows } = await pool.query(sql, params);
const { rows: [{ count }] } = await pool.query(
"SELECT COUNT(*) as count FROM articles WHERE is_duplicate = false"
);
return { articles: rows, total: parseInt(count) };
}
Results
- 50 sources monitored automatically — RSS feeds polled every 5 minutes; 500+ articles/day processed; analysts focus on reading, not searching
- Deduplication saves 40% time — same story from Reuters, Bloomberg, and AP rendered once with source links; no redundant reading
- AI categorization — "SEC announces new crypto regulation" auto-tagged [regulation, crypto]; analysts filter to their domain in one click
- Real-time alerts — "regulation" + negative sentiment triggers Slack alert in <30 seconds; trading team acts before manual review would even see it
- Sentiment trending — dashboard shows sentiment shift from neutral to negative across macro category over 48 hours; early warning for market moves