How to Track Twitter Account Activity in Real Time

Real-time Twitter account tracking has become essential for crypto traders, researchers, social media managers, and security professionals. Understanding when accounts post, how their engagement patterns change, and what sudden behavioral shifts might signal requires sophisticated monitoring that goes beyond basic tweet notifications.

This comprehensive guide covers the complete spectrum of Twitter account tracking, from basic tweet monitoring to advanced behavioral analysis. We'll explore the tools, techniques, and strategies needed to build professional-grade tracking systems that provide actionable insights rather than just raw data.

Why Track Twitter Account Activity?

Twitter account tracking serves multiple strategic purposes across different domains: crypto traders watch influential accounts for market-moving posts, security teams monitor for signs of account compromise, researchers study how information spreads, and social media managers benchmark competitor activity.

Types of Account Activity to Monitor

Basic Activity Metrics

A weekly activity dashboard for a tracked account typically surfaces metrics such as:

  - Tweet frequency: 12.3 tweets/day (+15% vs last week)
  - Avg. engagement: 2.1K (-8% vs last week)
  - Follower growth: +142 (daily average)
  - Response time: 23m (-12m vs average)
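As a sketch of how week-over-week deltas like these can be derived, the following computes tweet frequency and average engagement from two weekly batches of tweet records. The record fields (`likes`, `retweets`, `replies`) are illustrative assumptions, not a real API schema.

```python
from statistics import mean

def engagement(tweet: dict) -> int:
    """Total engagement for one tweet (likes + retweets + replies)."""
    return tweet.get("likes", 0) + tweet.get("retweets", 0) + tweet.get("replies", 0)

def pct_change(now: float, prev: float):
    """Percentage change vs the previous period; None if no baseline."""
    return round((now - prev) / prev * 100, 1) if prev else None

def weekly_summary(this_week: list, last_week: list) -> dict:
    """Tweet frequency and average engagement, with week-over-week change."""
    freq_now = len(this_week) / 7
    freq_prev = len(last_week) / 7
    eng_now = mean(engagement(t) for t in this_week) if this_week else 0
    eng_prev = mean(engagement(t) for t in last_week) if last_week else 0
    return {
        "tweets_per_day": round(freq_now, 1),
        "tweet_freq_change_pct": pct_change(freq_now, freq_prev),
        "avg_engagement": round(eng_now, 1),
        "engagement_change_pct": pct_change(eng_now, eng_prev),
    }
```

The same shape extends naturally to follower growth and response time once those series are collected.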

Advanced Behavioral Indicators

Building a Real-Time Tracking System

Data Collection Infrastructure

Effective account tracking requires robust data collection that captures both explicit events (tweets, follows) and implicit signals (engagement patterns, timing changes).

import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List

import redis.asyncio as redis

class TwitterAccountTracker:
    def __init__(self, redis_url: str = "redis://localhost"):
        self.redis_url = redis_url
        self.redis_pool = None
        self.tracked_accounts = set()
        self.activity_buffer = {}
        self.logger = logging.getLogger(__name__)
    
    async def start(self):
        """Initialize tracking infrastructure"""
        # redis-py's asyncio client; decode_responses returns str keys/values
        self.redis_pool = redis.from_url(self.redis_url, decode_responses=True)
        await self.load_tracked_accounts()
    
    async def load_tracked_accounts(self):
        """Restore the set of tracked accounts persisted in Redis"""
        self.tracked_accounts = set(
            await self.redis_pool.smembers("tracked_accounts")
        )
    
    async def track_account(self, username: str, tracking_config: Dict = None):
        """Add account to tracking with configuration"""
        config = tracking_config or {
            'monitor_tweets': True,
            'monitor_engagement': True,
            'monitor_followers': True,
            'monitor_posting_patterns': True,
            'alert_thresholds': {
                'unusual_posting_time': True,
                'engagement_spike': 2.0,  # 2x normal engagement
                'follower_spike': 1000,   # 1k followers in short period
                'topic_shift': 0.7        # 70% topic similarity threshold
            }
        }
        
        # Store tracking configuration
        await self.redis_pool.hset(
            f"tracking:{username}",
            mapping={
                'config': json.dumps(config),
                'added_at': datetime.utcnow().isoformat(),
                'last_update': datetime.utcnow().isoformat()
            }
        )
        
        await self.redis_pool.sadd("tracked_accounts", username)
        self.tracked_accounts.add(username)
        self.logger.info(f"Started tracking account: {username}")
    
    async def process_tweet_event(self, tweet_data: Dict[str, Any]):
        """Process incoming tweet for tracked account"""
        username = tweet_data['account']['username']
        
        if username not in self.tracked_accounts:
            return
        
        # Store tweet data
        await self._store_tweet_data(username, tweet_data)
        
        # Update activity patterns
        await self._update_activity_patterns(username, tweet_data)
        
        # Check for anomalies
        anomalies = await self._detect_anomalies(username, tweet_data)
        
        if anomalies:
            await self._trigger_alerts(username, anomalies, tweet_data)
    
    async def _store_tweet_data(self, username: str, tweet_data: Dict):
        """Store tweet with timestamp indexing"""
        tweet_key = f"tweets:{username}:{tweet_data['tweet']['id']}"
        
        # Store complete tweet data
        await self.redis_pool.hset(tweet_key, mapping={
            'data': json.dumps(tweet_data),
            'timestamp': tweet_data['timestamp']
        })
        
        # Add to timeline sorted set (score = timestamp)
        timestamp = datetime.fromisoformat(
            tweet_data['timestamp'].replace('Z', '+00:00')
        ).timestamp()
        
        await self.redis_pool.zadd(
            f"timeline:{username}",
            {tweet_data['tweet']['id']: timestamp}
        )
        
        # Maintain timeline size (keep last 1000 tweets)
        await self.redis_pool.zremrangebyrank(
            f"timeline:{username}", 0, -1001
        )
    
    async def _update_activity_patterns(self, username: str, tweet_data: Dict):
        """Update activity pattern analysis"""
        # Use the tweet's own timestamp, not the processing time
        now = datetime.fromisoformat(
            tweet_data['timestamp'].replace('Z', '+00:00')
        )
        
        # Update posting time patterns
        hour_of_day = now.hour
        day_of_week = now.weekday()
        
        # Increment hourly posting counter
        await self.redis_pool.hincrby(
            f"patterns:{username}:hourly",
            str(hour_of_day), 1
        )
        
        # Increment daily posting counter
        await self.redis_pool.hincrby(
            f"patterns:{username}:daily",
            str(day_of_week), 1
        )
        
        # Update posting velocity (tweets per hour)
        hour_key = f"velocity:{username}:{now.strftime('%Y%m%d%H')}"
        await self.redis_pool.incr(hour_key)
        await self.redis_pool.expire(hour_key, 86400 * 7)  # Keep 7 days
    
    async def _detect_anomalies(self, username: str, tweet_data: Dict) -> List[Dict]:
        """Detect unusual patterns in account activity"""
        anomalies = []
        
        # Check posting time anomaly
        if await self._is_unusual_posting_time(username):
            anomalies.append({
                'type': 'unusual_posting_time',
                'severity': 'medium',
                'description': 'Account posted at unusual time'
            })
        
        # Check engagement spike
        engagement_ratio = await self._calculate_engagement_ratio(username, tweet_data)
        if engagement_ratio > 2.0:
            anomalies.append({
                'type': 'engagement_spike',
                'severity': 'high',
                'description': f'Engagement {engagement_ratio:.1f}x higher than normal',
                'ratio': engagement_ratio
            })
        
        # Check posting velocity spike
        velocity_ratio = await self._calculate_velocity_ratio(username)
        if velocity_ratio > 3.0:
            anomalies.append({
                'type': 'velocity_spike',
                'severity': 'high',
                'description': f'Posting rate {velocity_ratio:.1f}x higher than normal',
                'ratio': velocity_ratio
            })
        
        return anomalies
    
    async def _is_unusual_posting_time(self, username: str) -> bool:
        """Check if current posting time is unusual for this account"""
        current_hour = datetime.utcnow().hour
        
        # Get historical posting pattern
        hourly_counts = await self.redis_pool.hgetall(f"patterns:{username}:hourly")
        
        if not hourly_counts:
            return False  # No historical data
        
        # Calculate total posts and current hour percentage
        total_posts = sum(int(count) for count in hourly_counts.values())
        current_hour_posts = int(hourly_counts.get(str(current_hour), 0))
        
        # If less than 5% of posts happen at this hour, it's unusual
        if total_posts > 50 and current_hour_posts / total_posts < 0.05:
            return True
        
        return False
    
    async def _calculate_engagement_ratio(self, username: str, tweet_data: Dict) -> float:
        """Ratio of this tweet's engagement to the account's recent average.
        Minimal implementation; production code would cache the baseline."""
        metrics = tweet_data['tweet'].get('metrics', {})
        current = (
            metrics.get('like_count', 0) +
            metrics.get('retweet_count', 0) +
            metrics.get('reply_count', 0)
        )
        summary = await self.get_account_activity_summary(username, days=7)
        baseline = summary['activity_metrics']['avg_engagement']
        return current / baseline if baseline else 0.0
    
    async def _calculate_velocity_ratio(self, username: str) -> float:
        """Ratio of this hour's posting rate to the trailing 24-hour average"""
        now = datetime.utcnow()
        current = int(await self.redis_pool.get(
            f"velocity:{username}:{now.strftime('%Y%m%d%H')}"
        ) or 0)
        past = []
        for hours_back in range(1, 25):
            key = f"velocity:{username}:{(now - timedelta(hours=hours_back)).strftime('%Y%m%d%H')}"
            past.append(int(await self.redis_pool.get(key) or 0))
        baseline = sum(past) / len(past)
        return current / baseline if baseline else 0.0
    
    async def _trigger_alerts(self, username: str, anomalies: List[Dict], tweet_data: Dict):
        """Log detected anomalies; wire real delivery (Telegram, webhook) in here"""
        for anomaly in anomalies:
            self.logger.warning(
                f"Anomaly for @{username}: {anomaly['type']} - {anomaly['description']}"
            )
    
    async def get_account_activity_summary(self, username: str, days: int = 7) -> Dict:
        """Generate comprehensive activity summary"""
        cutoff_timestamp = (datetime.utcnow() - timedelta(days=days)).timestamp()
        
        # Get recent tweets
        recent_tweet_ids = await self.redis_pool.zrangebyscore(
            f"timeline:{username}",
            cutoff_timestamp,
            '+inf'
        )
        
        # Calculate metrics
        total_tweets = len(recent_tweet_ids)
        tweets_per_day = total_tweets / days if days > 0 else 0
        
        # Get engagement data for recent tweets
        total_engagement = 0
        engagement_samples = []
        
        for tweet_id in recent_tweet_ids[-10:]:  # Sample last 10 tweets
            tweet_key = f"tweets:{username}:{tweet_id}"
            tweet_data_json = await self.redis_pool.hget(tweet_key, 'data')
            
            if tweet_data_json:
                tweet_data = json.loads(tweet_data_json)
                metrics = tweet_data['tweet'].get('metrics', {})
                engagement = (
                    metrics.get('like_count', 0) +
                    metrics.get('retweet_count', 0) +
                    metrics.get('reply_count', 0)
                )
                total_engagement += engagement
                engagement_samples.append(engagement)
        
        avg_engagement = (
            total_engagement / len(engagement_samples)
            if engagement_samples else 0
        )
        
        # Get posting patterns
        hourly_pattern = await self.redis_pool.hgetall(f"patterns:{username}:hourly")
        most_active_hour = max(
            hourly_pattern.items(),
            key=lambda x: int(x[1]),
            default=(None, 0)
        )[0] if hourly_pattern else None
        
        return {
            'username': username,
            'analysis_period_days': days,
            'activity_metrics': {
                'total_tweets': total_tweets,
                'tweets_per_day': round(tweets_per_day, 2),
                'avg_engagement': round(avg_engagement, 1),
                'most_active_hour': int(most_active_hour) if most_active_hour else None
            },
            'engagement_distribution': engagement_samples,
            'posting_pattern': {
                hour: int(count) for hour, count in hourly_pattern.items()
            } if hourly_pattern else {}
        }

# Usage example
async def main():
    tracker = TwitterAccountTracker()
    await tracker.start()
    
    # Track multiple accounts with different configurations
    await tracker.track_account('elonmusk', {
        'alert_thresholds': {
            'engagement_spike': 1.5,  # Lower threshold for high-impact account
            'unusual_posting_time': True
        }
    })
    
    await tracker.track_account('VitalikButerin', {
        'alert_thresholds': {
            'topic_shift': 0.8,  # Monitor for Ethereum-related topic changes
            'engagement_spike': 2.0
        }
    })
    
    # Get activity summary
    summary = await tracker.get_account_activity_summary('elonmusk', days=7)
    print(f"Weekly summary for @elonmusk: {summary}")

if __name__ == "__main__":
    asyncio.run(main())

Advanced Analytics and Pattern Detection

Sentiment Analysis Over Time

Tracking sentiment changes over time provides insight into the account holder's mindset and potential market implications:

import numpy as np
from datetime import datetime, timedelta
from typing import Dict, List
from textblob import TextBlob
from transformers import pipeline

class SentimentTracker:
    def __init__(self):
        # Initialize transformer model for crypto-specific sentiment
        self.sentiment_analyzer = pipeline(
            "sentiment-analysis",
            model="cardiffnlp/twitter-roberta-base-sentiment-latest"
        )
        self.sentiment_history = {}
    
    async def analyze_tweet_sentiment(self, tweet_data: Dict) -> Dict:
        """Analyze sentiment of individual tweet"""
        text = tweet_data['tweet']['text']
        username = tweet_data['account']['username']
        
        # Multiple sentiment analysis approaches
        sentiments = {}
        
        # 1. TextBlob (traditional approach)
        blob = TextBlob(text)
        sentiments['textblob'] = {
            'polarity': blob.sentiment.polarity,
            'subjectivity': blob.sentiment.subjectivity
        }
        
        # 2. Transformer model (more accurate for social media)
        transformer_result = self.sentiment_analyzer(text)[0]
        sentiments['transformer'] = {
            'label': transformer_result['label'],
            'score': transformer_result['score']
        }
        
        # 3. Crypto-specific keyword analysis
        sentiments['crypto_sentiment'] = self._analyze_crypto_sentiment(text)
        
        # Store sentiment history
        # Parse and normalize to naive UTC so later comparisons with
        # datetime.utcnow() don't mix aware and naive datetimes
        timestamp = datetime.fromisoformat(
            tweet_data['timestamp'].replace('Z', '+00:00')
        ).replace(tzinfo=None)
        
        if username not in self.sentiment_history:
            self.sentiment_history[username] = []
        
        self.sentiment_history[username].append({
            'timestamp': timestamp,
            'tweet_id': tweet_data['tweet']['id'],
            'sentiments': sentiments,
            'engagement': tweet_data['tweet'].get('metrics', {})
        })
        
        return sentiments
    
    def _analyze_crypto_sentiment(self, text: str) -> Dict:
        """Crypto-specific sentiment analysis"""
        text_lower = text.lower()
        
        # Crypto-positive indicators
        positive_indicators = [
            'bull', 'bullish', 'moon', 'rocket', 'pump', 'hodl',
            'diamond hands', 'to the moon', 'break out', 'surge',
            'rally', 'breakthrough', 'adoption', 'mainstream'
        ]
        
        # Crypto-negative indicators
        negative_indicators = [
            'bear', 'bearish', 'dump', 'crash', 'dip', 'correction',
            'paper hands', 'sell off', 'resistance', 'fud',
            'scam', 'rug pull', 'liquidated', 'rekt'
        ]
        
        positive_score = sum(1 for term in positive_indicators if term in text_lower)
        negative_score = sum(1 for term in negative_indicators if term in text_lower)
        
        # Calculate overall crypto sentiment
        if positive_score > negative_score:
            sentiment = 'bullish'
            confidence = min(0.9, 0.5 + (positive_score - negative_score) * 0.1)
        elif negative_score > positive_score:
            sentiment = 'bearish'
            confidence = min(0.9, 0.5 + (negative_score - positive_score) * 0.1)
        else:
            sentiment = 'neutral'
            confidence = 0.5
        
        return {
            'label': sentiment,
            'confidence': confidence,
            'positive_indicators': positive_score,
            'negative_indicators': negative_score
        }
    
    def calculate_sentiment_trend(self, username: str, days: int = 7) -> Dict:
        """Calculate sentiment trend over time period"""
        if username not in self.sentiment_history:
            return {'error': 'No sentiment data available'}
        
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        recent_sentiments = [
            s for s in self.sentiment_history[username]
            if s['timestamp'] > cutoff_date
        ]
        
        if not recent_sentiments:
            return {'error': 'No recent sentiment data'}
        
        # Calculate average sentiments
        textblob_polarities = [
            s['sentiments']['textblob']['polarity']
            for s in recent_sentiments
        ]
        
        crypto_sentiments = [
            1 if s['sentiments']['crypto_sentiment']['label'] == 'bullish' else
            -1 if s['sentiments']['crypto_sentiment']['label'] == 'bearish' else 0
            for s in recent_sentiments
        ]
        
        # Calculate trend
        trend_data = {
            'period_days': days,
            'total_tweets': len(recent_sentiments),
            'avg_polarity': np.mean(textblob_polarities),
            'polarity_std': np.std(textblob_polarities),
            'crypto_sentiment_avg': np.mean(crypto_sentiments),
            'sentiment_volatility': np.std(crypto_sentiments),
            'most_positive_tweet': max(
                recent_sentiments,
                key=lambda x: x['sentiments']['textblob']['polarity']
            )['tweet_id'],
            'most_negative_tweet': min(
                recent_sentiments,
                key=lambda x: x['sentiments']['textblob']['polarity']
            )['tweet_id']
        }
        
        return trend_data
    
    def detect_sentiment_anomalies(self, username: str) -> List[Dict]:
        """Detect unusual sentiment patterns"""
        anomalies = []
        
        if username not in self.sentiment_history:
            return anomalies
        
        recent_data = self.sentiment_history[username][-50:]  # Last 50 tweets
        
        if len(recent_data) < 10:
            return anomalies
        
        # Calculate baseline sentiment
        baseline_polarity = np.mean([
            s['sentiments']['textblob']['polarity'] for s in recent_data[:-5]
        ])
        
        # Check recent tweets for anomalies
        recent_polarities = [
            s['sentiments']['textblob']['polarity'] for s in recent_data[-5:]
        ]
        recent_avg = np.mean(recent_polarities)
        
        # Significant sentiment shift
        if abs(recent_avg - baseline_polarity) > 0.5:
            anomalies.append({
                'type': 'sentiment_shift',
                'severity': 'high' if abs(recent_avg - baseline_polarity) > 0.7 else 'medium',
                'description': f'Sentiment shifted from {baseline_polarity:.2f} to {recent_avg:.2f}',
                'baseline_sentiment': baseline_polarity,
                'recent_sentiment': recent_avg
            })
        
        return anomalies

Network Analysis and Relationship Tracking

Understanding how account interactions change over time reveals shifting alliances and influence patterns:

from collections import Counter, defaultdict
from datetime import datetime, timedelta
from typing import Dict, List

import networkx as nx

class TwitterNetworkTracker:
    def __init__(self):
        self.interaction_graph = nx.DiGraph()
        self.interaction_history = defaultdict(list)
        self.mention_patterns = defaultdict(Counter)
    
    async def process_interaction(self, tweet_data: Dict):
        """Process tweet for network interactions"""
        username = tweet_data['account']['username']
        tweet = tweet_data['tweet']
        # Naive UTC, so timestamps stay comparable with datetime.utcnow()
        timestamp = datetime.fromisoformat(
            tweet_data['timestamp'].replace('Z', '+00:00')
        ).replace(tzinfo=None)
        
        # Extract mentions
        mentions = self._extract_mentions(tweet['text'])
        
        # Extract reply-to information
        reply_to = tweet.get('reply_to_username')
        
        # Extract quote tweet information
        quoted_user = tweet.get('quoted_tweet', {}).get('username')
        
        # Process different interaction types
        interactions = []
        
        # Mentions
        for mentioned_user in mentions:
            interactions.append({
                'type': 'mention',
                'from': username,
                'to': mentioned_user,
                'timestamp': timestamp,
                'tweet_id': tweet['id']
            })
        
        # Replies
        if reply_to:
            interactions.append({
                'type': 'reply',
                'from': username,
                'to': reply_to,
                'timestamp': timestamp,
                'tweet_id': tweet['id']
            })
        
        # Quote tweets
        if quoted_user:
            interactions.append({
                'type': 'quote',
                'from': username,
                'to': quoted_user,
                'timestamp': timestamp,
                'tweet_id': tweet['id']
            })
        
        # Update network graph and history
        for interaction in interactions:
            await self._update_network(interaction)
        
        return interactions
    
    def _extract_mentions(self, text: str) -> List[str]:
        """Extract mentioned usernames from tweet text"""
        import re
        
        mention_pattern = r'@([a-zA-Z0-9_]+)'
        mentions = re.findall(mention_pattern, text)
        
        # Filter out common false positives
        filtered_mentions = [
            m for m in mentions
            if len(m) > 1 and not m.isdigit()
        ]
        
        return filtered_mentions
    
    async def _update_network(self, interaction: Dict):
        """Update network graph with new interaction"""
        from_user = interaction['from']
        to_user = interaction['to']
        interaction_type = interaction['type']
        timestamp = interaction['timestamp']
        
        # Add nodes if they don't exist
        if not self.interaction_graph.has_node(from_user):
            self.interaction_graph.add_node(from_user, first_seen=timestamp)
        if not self.interaction_graph.has_node(to_user):
            self.interaction_graph.add_node(to_user, first_seen=timestamp)
        
        # Add or update edge
        
        if self.interaction_graph.has_edge(from_user, to_user):
            # Update existing edge
            edge_data = self.interaction_graph[from_user][to_user]
            edge_data['weight'] += 1
            edge_data['last_interaction'] = timestamp
            edge_data[f'{interaction_type}_count'] = edge_data.get(f'{interaction_type}_count', 0) + 1
        else:
            # Create new edge
            edge_data = {
                'weight': 1,
                'first_interaction': timestamp,
                'last_interaction': timestamp,
                'mention_count': 1 if interaction_type == 'mention' else 0,
                'reply_count': 1 if interaction_type == 'reply' else 0,
                'quote_count': 1 if interaction_type == 'quote' else 0
            }
            self.interaction_graph.add_edge(from_user, to_user, **edge_data)
        
        # Store in interaction history
        self.interaction_history[from_user].append(interaction)
        
        # Update mention patterns
        if interaction_type == 'mention':
            self.mention_patterns[from_user][to_user] += 1
    
    def analyze_network_changes(self, username: str, days: int = 30) -> Dict:
        """Analyze how user's network interactions have changed"""
        cutoff_date = datetime.utcnow() - timedelta(days=days)
        
        # Get historical interactions for this user
        user_interactions = [
            i for i in self.interaction_history[username]
            if i['timestamp'] > cutoff_date
        ]
        
        if not user_interactions:
            return {'error': 'No recent interaction data'}
        
        # Analyze interaction patterns
        interaction_targets = Counter([i['to'] for i in user_interactions])
        interaction_types = Counter([i['type'] for i in user_interactions])
        
        # Calculate network metrics
        if self.interaction_graph.has_node(username):
            # Centrality measures (may fail on small or disconnected graphs)
            try:
                betweenness = nx.betweenness_centrality(self.interaction_graph)[username]
                closeness = nx.closeness_centrality(self.interaction_graph)[username]
                pagerank = nx.pagerank(self.interaction_graph)[username]
            except Exception:
                betweenness = closeness = pagerank = 0
            
            # Direct network measures
            out_degree = self.interaction_graph.out_degree(username)
            in_degree = self.interaction_graph.in_degree(username)
            
        else:
            betweenness = closeness = pagerank = out_degree = in_degree = 0
        
        # Identify new and frequent interaction partners
        top_interactions = interaction_targets.most_common(10)
        new_interactions = [
            user for user, count in interaction_targets.items()
            if not any(
                i['to'] == user and i['timestamp'] < cutoff_date
                for i in self.interaction_history[username]
            )
        ]
        
        return {
            'username': username,
            'analysis_period_days': days,
            'total_interactions': len(user_interactions),
            'interaction_breakdown': dict(interaction_types),
            'top_interaction_targets': top_interactions,
            'new_interaction_partners': new_interactions[:5],
            'network_metrics': {
                'betweenness_centrality': round(betweenness, 4),
                'closeness_centrality': round(closeness, 4),
                'pagerank': round(pagerank, 6),
                'out_degree': out_degree,
                'in_degree': in_degree
            }
        }
    
    def detect_network_anomalies(self, username: str) -> List[Dict]:
        """Detect unusual network behavior patterns"""
        anomalies = []
        
        # Recent vs historical interaction analysis
        recent_interactions = [
            i for i in self.interaction_history[username]
            if i['timestamp'] > datetime.utcnow() - timedelta(days=7)
        ]
        
        historical_interactions = [
            i for i in self.interaction_history[username]
            if i['timestamp'] < datetime.utcnow() - timedelta(days=7)
        ]
        
        if not recent_interactions or not historical_interactions:
            return anomalies
        
        # Calculate interaction frequency changes
        recent_targets = Counter([i['to'] for i in recent_interactions])
        historical_targets = Counter([i['to'] for i in historical_interactions])
        
        # Normalize by time period (recent window is the last 7 days;
        # historical window spans back to the oldest recorded interaction)
        recent_daily_avg = len(recent_interactions) / 7
        historical_days = max(
            (datetime.utcnow() - timedelta(days=7)
             - min(i['timestamp'] for i in historical_interactions)).days,
            1
        )
        historical_daily_avg = len(historical_interactions) / historical_days
        
        # Check for interaction frequency anomalies
        if recent_daily_avg > historical_daily_avg * 3:
            anomalies.append({
                'type': 'interaction_spike',
                'severity': 'high',
                'description': f'Interaction frequency increased {recent_daily_avg/historical_daily_avg:.1f}x',
                'recent_daily_avg': recent_daily_avg,
                'historical_daily_avg': historical_daily_avg
            })
        
        # Check for new interaction partners
        new_partners = set(recent_targets.keys()) - set(historical_targets.keys())
        if len(new_partners) > 10:
            anomalies.append({
                'type': 'new_interaction_partners',
                'severity': 'medium',
                'description': f'{len(new_partners)} new interaction partners',
                'new_partners_count': len(new_partners)
            })
        
        return anomalies

Real-Time Alert Systems

Effective account tracking requires intelligent alerting that provides actionable insights without overwhelming users:

import json
import logging
from collections import defaultdict
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List

class AlertSeverity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class AlertRule:
    def __init__(self, name: str, severity: AlertSeverity, condition_func: Callable, 
                 description: str, cooldown_minutes: int = 60):
        self.name = name
        self.severity = severity
        self.condition_func = condition_func
        self.description = description
        self.cooldown_minutes = cooldown_minutes
        self.last_triggered = {}
    
    async def check_condition(self, username: str, data: Dict) -> bool:
        """Check if alert condition is met"""
        # Check cooldown
        now = datetime.utcnow()
        if username in self.last_triggered:
            time_since_last = now - self.last_triggered[username]
            if time_since_last.total_seconds() < self.cooldown_minutes * 60:
                return False
        
        # Check condition
        if await self.condition_func(username, data):
            self.last_triggered[username] = now
            return True
        
        return False

class TwitterAlertSystem:
    def __init__(self):
        self.alert_rules = []
        self.alert_handlers = []
        self.alert_history = defaultdict(list)
        self.user_preferences = {}
    
    def add_alert_rule(self, rule: AlertRule):
        """Add new alert rule to the system"""
        self.alert_rules.append(rule)
    
    def add_alert_handler(self, handler: Callable):
        """Add alert handler (e.g., Telegram, email, webhook)"""
        self.alert_handlers.append(handler)
    
    async def process_account_data(self, username: str, activity_data: Dict):
        """Process account data and check for alerts"""
        triggered_alerts = []
        
        for rule in self.alert_rules:
            if await rule.check_condition(username, activity_data):
                alert = {
                    'rule_name': rule.name,
                    'severity': rule.severity.value,
                    'username': username,
                    'description': rule.description,
                    'timestamp': datetime.utcnow().isoformat(),
                    'data': activity_data
                }
                
                triggered_alerts.append(alert)
                self.alert_history[username].append(alert)
        
        # Send alerts through all handlers
        for alert in triggered_alerts:
            for handler in self.alert_handlers:
                try:
                    await handler(alert)
                except Exception as e:
                    logging.error(f"Alert handler failed: {e}")
        
        return triggered_alerts
    
    def set_user_preferences(self, username: str, preferences: Dict):
        """Set alerting preferences for specific user"""
        self.user_preferences[username] = preferences

# Define specific alert conditions
async def unusual_posting_volume(username: str, data: Dict) -> bool:
    """Alert on unusual posting volume"""
    recent_count = data.get('tweets_in_last_hour', 0)
    normal_hourly_rate = data.get('normal_hourly_rate', 1)
    
    return recent_count > normal_hourly_rate * 5

async def engagement_spike(username: str, data: Dict) -> bool:
    """Alert on engagement spike"""
    current_engagement = data.get('current_avg_engagement', 0)
    baseline_engagement = data.get('baseline_avg_engagement', 1)
    
    return current_engagement > baseline_engagement * 3

async def sentiment_flip(username: str, data: Dict) -> bool:
    """Alert on dramatic sentiment change"""
    recent_sentiment = data.get('recent_sentiment_avg', 0)
    baseline_sentiment = data.get('baseline_sentiment_avg', 0)
    
    return abs(recent_sentiment - baseline_sentiment) > 1.0

async def account_compromise_indicators(username: str, data: Dict) -> bool:
    """Alert on potential account compromise"""
    indicators = 0
    
    # Unusual posting time
    if data.get('unusual_posting_time', False):
        indicators += 1
    
    # Language change
    if data.get('language_change_detected', False):
        indicators += 2
    
    # Unusual interaction patterns
    if data.get('unusual_interactions', False):
        indicators += 1
    
    # High posting volume
    if data.get('posting_volume_spike', False):
        indicators += 1
    
    return indicators >= 3

# Alert handlers
async def telegram_alert_handler(alert: Dict):
    """Send alert via Telegram"""
    severity_emoji = {
        'low': '💬',
        'medium': '⚠️',
        'high': '🚨',
        'critical': '🔥'
    }
    
    message = f"""
{severity_emoji.get(alert['severity'], '📢')} **ALERT: {alert['rule_name']}**

**Account:** @{alert['username']}
**Severity:** {alert['severity'].upper()}
**Description:** {alert['description']}

**Time:** {alert['timestamp']}
"""
    
    # Implementation would send via Telegram API
    print(f"Telegram Alert: {message}")

async def webhook_alert_handler(alert: Dict):
    """Send alert via webhook"""
    webhook_payload = {
        'type': 'twitter_account_alert',
        'alert': alert,
        'timestamp': alert['timestamp']
    }
    
    # Implementation would POST to webhook URL
    print(f"Webhook Alert: {json.dumps(webhook_payload, indent=2)}")

# Setup alert system
async def setup_alert_system():
    alert_system = TwitterAlertSystem()
    
    # Add alert rules
    alert_system.add_alert_rule(AlertRule(
        name="Unusual Posting Volume",
        severity=AlertSeverity.MEDIUM,
        condition_func=unusual_posting_volume,
        description="Account posting much more frequently than normal",
        cooldown_minutes=30
    ))
    
    alert_system.add_alert_rule(AlertRule(
        name="Engagement Spike",
        severity=AlertSeverity.HIGH,
        condition_func=engagement_spike,
        description="Tweet engagement significantly higher than baseline",
        cooldown_minutes=15
    ))
    
    alert_system.add_alert_rule(AlertRule(
        name="Sentiment Flip",
        severity=AlertSeverity.HIGH,
        condition_func=sentiment_flip,
        description="Dramatic change in posting sentiment detected",
        cooldown_minutes=60
    ))
    
    alert_system.add_alert_rule(AlertRule(
        name="Potential Account Compromise",
        severity=AlertSeverity.CRITICAL,
        condition_func=account_compromise_indicators,
        description="Multiple indicators suggest possible account compromise",
        cooldown_minutes=120
    ))
    
    # Add alert handlers
    alert_system.add_alert_handler(telegram_alert_handler)
    alert_system.add_alert_handler(webhook_alert_handler)
    
    return alert_system

Implementation Strategy

Start with basic activity tracking (tweets, engagement) before implementing advanced features like sentiment analysis and network mapping. Use cloud infrastructure for scalability and implement comprehensive logging for debugging and analysis. Consider using managed services like Redis Cloud for data storage and real-time processing.

Tracking Multiple Accounts at Scale

Monitoring hundreds or thousands of accounts requires sophisticated infrastructure and intelligent resource management:

  1. Implement account prioritization. Not all accounts need the same monitoring intensity. Create tiers based on importance and activity levels.
  2. Use efficient data structures. Time-series databases and in-memory caches optimize performance for high-frequency updates.
  3. Batch processing where possible. Group similar operations to reduce overhead and improve throughput.
  4. Implement circuit breakers. Prevent system overload when external APIs fail or become slow.
  5. Monitor system performance. Track processing latency, memory usage, and API quota consumption.
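The prioritization and batching ideas above can be sketched as a tier map that assigns each account a refresh interval and groups refreshes into fixed-size batches. The tier names, intervals, and batch size here are illustrative assumptions, not recommended values.

```python
from dataclasses import dataclass

# Illustrative tiers: how often to refresh each account, in seconds
POLL_INTERVALS = {"critical": 15, "high": 60, "standard": 300, "low": 1800}

@dataclass
class TrackedAccount:
    username: str
    tier: str = "standard"

def schedule_batches(accounts, batch_size=50):
    """Group accounts by tier, then split each tier into fixed-size batches
    so one API call or pipeline can refresh many accounts at once.
    Returns a list of (poll_interval_seconds, accounts_in_batch) pairs."""
    by_tier = {}
    for acct in accounts:
        by_tier.setdefault(acct.tier, []).append(acct)
    plan = []
    for tier, members in by_tier.items():
        interval = POLL_INTERVALS[tier]
        for i in range(0, len(members), batch_size):
            plan.append((interval, members[i:i + batch_size]))
    return plan
```

A scheduler loop can then sleep per batch according to its interval, which keeps high-value accounts fresh without polling every account at the same rate.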

Privacy and Ethical Considerations

Responsible account tracking requires careful attention to privacy and ethical boundaries: monitor only publicly available activity, comply with platform terms of service and applicable data-protection law, and avoid retaining more personal data than the analysis requires.

Conclusion

Real-time Twitter account tracking provides powerful insights for trading, research, security, and business intelligence. The key to successful implementation lies in balancing comprehensive monitoring with system performance, combining multiple analysis techniques for robust insights, and maintaining ethical standards throughout the process.

For teams seeking immediate results without extensive development work, Xanguard provides enterprise-grade account tracking with built-in analytics, anomaly detection, and real-time alerting. The platform handles the technical complexity while providing the insights needed for informed decision-making.

Whether building custom solutions or using existing platforms, effective Twitter account tracking transforms raw social media activity into actionable intelligence that drives better outcomes across crypto trading, security monitoring, and strategic business decisions.

Start Advanced Twitter Tracking

Get comprehensive account monitoring with sentiment analysis, network tracking, and intelligent alerts.