diff --git a/docs/experiment-statistics-cache-design.md b/docs/experiment-statistics-cache-design.md new file mode 100644 index 0000000000..99d7ec979b --- /dev/null +++ b/docs/experiment-statistics-cache-design.md @@ -0,0 +1,1543 @@ +# Experiment Statistics Cache System - Design Document + +## Overview + +This document describes the design for a caching system to improve the performance of experiment and session statistics queries in Open Chat Studio. The current implementation uses expensive subqueries that cause slow page load times, particularly for the chatbot table view. + +## Problem Statement + +The `ChatbotExperimentTableView` in `apps/chatbots/views.py` (lines 163-238) performs expensive subqueries to calculate statistics for each experiment: + +- **Experiment Level**: + - Total session count + - Participant count (unique participants) + - Message count (human messages only) + - Last activity timestamp (last human message) + +- **Session Level** (used in `ChatbotSessionsTableView`): + - Message count (human messages) + - Last activity timestamp (last human message) + - List of experiment version numbers used + +These queries are executed on every page load and become increasingly slow as data volume grows. + +## Design Goals + +1. **Performance**: Reduce query time from seconds to milliseconds +2. **Accuracy**: Balance freshness with performance (near real-time for recent data) +3. **Scalability**: Handle growing data volume through periodic compression +4. **Maintainability**: Simple, understandable code following Django patterns +5. **Reliability**: Graceful degradation if cache is stale or missing + +## Proposed Solution + +### Architecture Overview + +A two-tier caching system with periodic aggregation: + +1. **SQL-based cache tables** storing pre-computed statistics +2. **Periodic background tasks** to update and compress statistics +3. **Hybrid update strategy** combining scheduled updates with live updates for recent data + +## Schema Design: Two Approaches + +### Approach Comparison + +There are two primary approaches for structuring the cache: + +1. **Single-Row Aggregates** (Simpler): One row per experiment/session with total counts +2. **Time-Bucketed Data** (More Powerful): Multiple rows per experiment with time-series data + +#### Quick Comparison Table + +| Aspect | Single-Row | Time-Bucketed | +|--------|-----------|---------------| +| **Query Simplicity** | ✅ Very simple (single row) | ⚠️ Requires SUM across buckets | +| **Storage Efficiency** | ✅ Minimal storage | ⚠️ More storage (multiple buckets) | +| **Compression** | ⚠️ Manual deletion only | ✅ Natural (merge old buckets) | +| **Trend Analysis** | ❌ No historical data | ✅ Built-in time-series | +| **Incremental Updates** | ⚠️ Need full recalc | ✅ Update current bucket only | +| **Fast Totals** | ✅ Single row lookup | ⚠️ Need to sum (or denormalize) | +| **Data Lifecycle** | ⚠️ Manual management | ✅ Automatic aging | +| **Initial Complexity** | ✅ Simple | ⚠️ More complex | + +#### Recommendation + +**Hybrid Approach**: Use time-bucketed storage with denormalized totals: + +- **Time buckets** for historical tracking, trends, and incremental updates +- **Denormalized totals** (materialized view or separate table) for fast lookups +- Best of both worlds: performance + flexibility + +--- + +## Database Schema + +### Option A: Single-Row Aggregates (Original Design) + +#### 1. Experiment Statistics Cache + +```python +# apps/experiments/models.py + +class ExperimentStatistics(BaseTeamModel): + """ + Cached statistics for experiments, updated periodically. + """ + + experiment = models.OneToOneField( + Experiment, + on_delete=models.CASCADE, + related_name='cached_statistics', + primary_key=True + ) + + # Aggregate statistics + total_session_count = models.IntegerField(default=0) + participant_count = models.IntegerField(default=0) + human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField(null=True, blank=True) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True, db_index=True) + last_full_refresh_at = models.DateTimeField(null=True, blank=True) + + class Meta: + db_table = 'experiments_experiment_statistics' + indexes = [ + models.Index(fields=['experiment', 'last_updated_at']), + models.Index(fields=['last_activity_at']), + ] + + def __str__(self): + return f"Stats for {self.experiment.name}" +``` + +### 2. Session Statistics Cache + +```python +# apps/experiments/models.py + +class SessionStatistics(BaseTeamModel): + """ + Cached statistics for experiment sessions, updated periodically. + """ + + session = models.OneToOneField( + ExperimentSession, + on_delete=models.CASCADE, + related_name='cached_statistics', + primary_key=True + ) + + # Aggregate statistics + human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField(null=True, blank=True) + experiment_versions = models.CharField(max_length=500, blank=True) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True, db_index=True) + is_complete = models.BooleanField( + default=False, + help_text="Whether this session is complete and won't receive more updates" + ) + + class Meta: + db_table = 'experiments_session_statistics' + indexes = [ + models.Index(fields=['session', 'last_updated_at']), + models.Index(fields=['last_activity_at']), + models.Index(fields=['is_complete']), + ] + + def __str__(self): + return f"Stats for Session {self.session.external_id}" +``` + +### 3. Statistics Update Log (Optional, for debugging) + +```python +# apps/experiments/models.py + +class StatisticsUpdateLog(BaseModel): + """ + Log of statistics update operations for monitoring and debugging. + Optional - can be added if needed for troubleshooting. + """ + + update_type = models.CharField( + max_length=50, + choices=[ + ('full_refresh', 'Full Refresh'), + ('incremental', 'Incremental Update'), + ('compression', 'Compression'), + ] + ) + scope = models.CharField( + max_length=50, + choices=[ + ('all_experiments', 'All Experiments'), + ('single_experiment', 'Single Experiment'), + ('all_sessions', 'All Sessions'), + ('single_session', 'Single Session'), + ] + ) + + experiments_updated = models.IntegerField(default=0) + sessions_updated = models.IntegerField(default=0) + duration_seconds = models.FloatField() + errors = models.JSONField(default=list, blank=True) + + class Meta: + db_table = 'experiments_statistics_update_log' + indexes = [ + models.Index(fields=['-created_at']), + ] +``` + +--- + +### Option B: Time-Bucketed Statistics (Recommended) + +This approach stores statistics in time buckets, enabling trend analysis, natural compression, and efficient incremental updates. + +#### 1. Experiment Statistics Buckets + +```python +# apps/experiments/models.py + +class ExperimentStatisticsBucket(BaseTeamModel): + """ + Time-bucketed statistics for experiments. + Enables historical tracking, trends, and efficient updates. + """ + + class BucketSize(models.TextChoices): + HOUR = 'hour', 'Hourly' + DAY = 'day', 'Daily' + WEEK = 'week', 'Weekly' + MONTH = 'month', 'Monthly' + + experiment = models.ForeignKey( + Experiment, + on_delete=models.CASCADE, + related_name='statistics_buckets' + ) + + # Time bucket definition + bucket_size = models.CharField( + max_length=10, + choices=BucketSize.choices, + default=BucketSize.HOUR + ) + bucket_start = models.DateTimeField(db_index=True) + bucket_end = models.DateTimeField(db_index=True) + + # Aggregate statistics for this time period + session_count = models.IntegerField(default=0) + new_participant_count = models.IntegerField( + default=0, + help_text="New participants in this bucket" + ) + human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField(null=True, blank=True) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True) + is_finalized = models.BooleanField( + default=False, + help_text="Whether this bucket is complete and won't be updated" + ) + + class Meta: + db_table = 'experiments_experiment_statistics_bucket' + unique_together = [('experiment', 'bucket_start', 'bucket_size')] + indexes = [ + models.Index(fields=['experiment', 'bucket_start', 'bucket_size']), + models.Index(fields=['bucket_start', 'bucket_end']), + models.Index(fields=['is_finalized']), + ] + ordering = ['-bucket_start'] + + def __str__(self): + return f"{self.experiment.name} - {self.bucket_size} ({self.bucket_start.date()})" + + @classmethod + def get_bucket_boundaries(cls, dt, bucket_size): + """Calculate bucket start/end for a given datetime.""" + from django.utils import timezone + + if bucket_size == cls.BucketSize.HOUR: + start = dt.replace(minute=0, second=0, microsecond=0) + end = start + timedelta(hours=1) + elif bucket_size == cls.BucketSize.DAY: + start = dt.replace(hour=0, minute=0, second=0, microsecond=0) + end = start + timedelta(days=1) + elif bucket_size == cls.BucketSize.WEEK: + start = dt - timedelta(days=dt.weekday()) + start = start.replace(hour=0, minute=0, second=0, microsecond=0) + end = start + timedelta(weeks=1) + elif bucket_size == cls.BucketSize.MONTH: + start = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + if start.month == 12: + end = start.replace(year=start.year + 1, month=1) + else: + end = start.replace(month=start.month + 1) + + return start, end + + @classmethod + def get_or_create_bucket(cls, experiment, dt, bucket_size): + """Get or create a bucket for the given experiment and datetime.""" + start, end = cls.get_bucket_boundaries(dt, bucket_size) + + bucket, created = cls.objects.get_or_create( + experiment=experiment, + bucket_size=bucket_size, + bucket_start=start, + defaults={ + 'bucket_end': end, + 'team': experiment.team, + } + ) + return bucket +``` + +#### 2. Session Statistics Buckets + +```python +# apps/experiments/models.py + +class SessionStatisticsBucket(BaseTeamModel): + """ + Time-bucketed statistics for experiment sessions. + """ + + class BucketSize(models.TextChoices): + HOUR = 'hour', 'Hourly' + DAY = 'day', 'Daily' + + session = models.ForeignKey( + ExperimentSession, + on_delete=models.CASCADE, + related_name='statistics_buckets' + ) + + # Time bucket + bucket_size = models.CharField( + max_length=10, + choices=BucketSize.choices, + default=BucketSize.HOUR + ) + bucket_start = models.DateTimeField(db_index=True) + bucket_end = models.DateTimeField(db_index=True) + + # Statistics for this period + human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField(null=True, blank=True) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True) + is_finalized = models.BooleanField(default=False) + + class Meta: + db_table = 'experiments_session_statistics_bucket' + unique_together = [('session', 'bucket_start', 'bucket_size')] + indexes = [ + models.Index(fields=['session', 'bucket_start']), + models.Index(fields=['is_finalized']), + ] + + @classmethod + def get_bucket_boundaries(cls, dt, bucket_size): + """Calculate bucket start/end for a given datetime.""" + if bucket_size == cls.BucketSize.HOUR: + start = dt.replace(minute=0, second=0, microsecond=0) + end = start + timedelta(hours=1) + elif bucket_size == cls.BucketSize.DAY: + start = dt.replace(hour=0, minute=0, second=0, microsecond=0) + end = start + timedelta(days=1) + + return start, end +``` + +#### 3. Denormalized Totals (For Fast Access) + +```python +# apps/experiments/models.py + +class ExperimentStatisticsTotals(BaseTeamModel): + """ + Denormalized totals for fast access. + Computed by summing buckets, refreshed periodically. + """ + + experiment = models.OneToOneField( + Experiment, + on_delete=models.CASCADE, + related_name='statistics_totals', + primary_key=True + ) + + # Aggregate totals (sum of all buckets) + total_session_count = models.IntegerField(default=0) + total_participant_count = models.IntegerField(default=0) + total_human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField(null=True, blank=True) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True) + oldest_bucket_start = models.DateTimeField(null=True, blank=True) + newest_bucket_end = models.DateTimeField(null=True, blank=True) + + class Meta: + db_table = 'experiments_experiment_statistics_totals' + + def refresh_from_buckets(self): + """Recalculate totals from bucket data.""" + from django.db.models import Sum, Max, Min + + bucket_aggregates = self.experiment.statistics_buckets.aggregate( + total_sessions=Sum('session_count'), + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + oldest_bucket=Min('bucket_start'), + newest_bucket=Max('bucket_end'), + ) + + # Get unique participant count (needs special handling) + from apps.experiments.models import ExperimentSession + participant_count = ExperimentSession.objects.filter( + experiment=self.experiment + ).values('participant').distinct().count() + + self.total_session_count = bucket_aggregates['total_sessions'] or 0 + self.total_participant_count = participant_count + self.total_human_message_count = bucket_aggregates['total_messages'] or 0 + self.last_activity_at = bucket_aggregates['last_activity'] + self.oldest_bucket_start = bucket_aggregates['oldest_bucket'] + self.newest_bucket_end = bucket_aggregates['newest_bucket'] + self.save() + +class SessionStatisticsTotals(BaseTeamModel): + """ + Denormalized session totals for fast access. + """ + + session = models.OneToOneField( + ExperimentSession, + on_delete=models.CASCADE, + related_name='statistics_totals', + primary_key=True + ) + + total_human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField(null=True, blank=True) + experiment_versions = models.CharField(max_length=500, blank=True) + + last_updated_at = models.DateTimeField(auto_now=True) + is_complete = models.BooleanField(default=False) + + class Meta: + db_table = 'experiments_session_statistics_totals' + + def refresh_from_buckets(self): + """Recalculate totals from bucket data.""" + from django.db.models import Sum, Max + + bucket_aggregates = self.session.statistics_buckets.aggregate( + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + # Get experiment versions (stored separately) + from django.contrib.contenttypes.models import ContentType + from apps.annotations.models import CustomTaggedItem + from apps.chat.models import Chat, ChatMessage + + message_ct = ContentType.objects.get_for_model(ChatMessage) + versions = CustomTaggedItem.objects.filter( + content_type=message_ct, + object_id__in=ChatMessage.objects.filter( + chat=self.session.chat + ).values('id'), + tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION, + ).values_list('tag__name', flat=True).distinct().order_by('tag__name') + + self.total_human_message_count = bucket_aggregates['total_messages'] or 0 + self.last_activity_at = bucket_aggregates['last_activity'] + self.experiment_versions = ', '.join(versions) if versions else '' + self.is_complete = self.session.status == SessionStatus.COMPLETE + self.save() +``` + +#### 4. Bucket Compression and Lifecycle + +```python +# apps/experiments/models.py + +class BucketCompressionPolicy(models.Model): + """ + Defines how buckets are compressed over time. + E.g., "After 7 days, compress hourly to daily" + """ + + name = models.CharField(max_length=100) + description = models.TextField(blank=True) + + # Age threshold + age_days = models.IntegerField( + help_text="Compress buckets older than this many days" + ) + + # Compression action + source_bucket_size = models.CharField( + max_length=10, + choices=ExperimentStatisticsBucket.BucketSize.choices + ) + target_bucket_size = models.CharField( + max_length=10, + choices=ExperimentStatisticsBucket.BucketSize.choices + ) + + # Whether to delete source buckets after compression + delete_source = models.BooleanField(default=True) + + # Active status + is_active = models.BooleanField(default=True) + + class Meta: + db_table = 'experiments_bucket_compression_policy' + ordering = ['age_days'] + +# Default policies could be: +# - 7 days: hour -> day +# - 30 days: day -> week +# - 90 days: week -> month +# - 365 days: delete monthly buckets (keep totals only) +``` + +#### Time-Bucketed Design Benefits + +**1. Natural Incremental Updates** +```python +# When a new message arrives, only update the current hour bucket +current_bucket = ExperimentStatisticsBucket.get_or_create_bucket( + experiment, timezone.now(), BucketSize.HOUR +) +current_bucket.human_message_count += 1 +current_bucket.last_activity_at = timezone.now() +current_bucket.save() +``` + +**2. Built-in Trend Analysis** +```python +# Get activity for last 7 days +seven_days_ago = timezone.now() - timedelta(days=7) +daily_buckets = ExperimentStatisticsBucket.objects.filter( + experiment=experiment, + bucket_size=BucketSize.DAY, + bucket_start__gte=seven_days_ago +).order_by('bucket_start') + +# Can easily plot trends, show sparklines, etc. +``` + +**3. Automatic Compression** +```python +# Compress old hourly buckets to daily +from django.db.models import Sum, Max + +hourly_buckets = ExperimentStatisticsBucket.objects.filter( + experiment=experiment, + bucket_size=BucketSize.HOUR, + bucket_start__date=target_date +) + +aggregates = hourly_buckets.aggregate( + total_sessions=Sum('session_count'), + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), +) + +# Create daily bucket +daily_bucket = ExperimentStatisticsBucket.objects.create( + experiment=experiment, + bucket_size=BucketSize.DAY, + bucket_start=target_date.replace(hour=0, minute=0), + bucket_end=target_date.replace(hour=0, minute=0) + timedelta(days=1), + session_count=aggregates['total_sessions'], + human_message_count=aggregates['total_messages'], + last_activity_at=aggregates['last_activity'], + is_finalized=True, +) + +# Delete hourly buckets +hourly_buckets.delete() +``` + +**4. Flexible Retention** +```python +# Keep different granularities for different time periods: +# - Last 48 hours: hourly buckets +# - Last 30 days: daily buckets +# - Last 12 months: weekly buckets +# - Older: monthly buckets or delete (keep totals only) +``` + +--- + +### Option C: Hybrid (Best of Both Worlds) + +Combine time buckets with denormalized totals: + +**Schema**: +- `ExperimentStatisticsBucket` - Time-series data +- `ExperimentStatisticsTotals` - Denormalized totals (materialized sum of buckets) +- Automatic refresh of totals from buckets + +**Advantages**: +- ✅ Fast total queries (single row lookup) +- ✅ Historical trend data available +- ✅ Efficient incremental updates (update current bucket) +- ✅ Natural compression (merge old buckets) +- ✅ Flexible data lifecycle + +**Complexity**: Moderate (two related tables to manage) + +**Recommendation**: **This is the best long-term solution** - start with Option A for simplicity, then migrate to Option C when trend analysis is needed. + +--- + +## Update Strategies + +### Strategy 1: Scheduled Full Refresh (Recommended Starting Point) + +**Approach**: Periodically recalculate all statistics from scratch. + +**Pros**: +- Simple to implement and understand +- Always accurate (no drift) +- Easy to debug and verify +- No complex incremental logic + +**Cons**: +- Less efficient for large datasets +- Statistics can be stale between refreshes +- May be heavy on database during refresh + +**Implementation**: +```python +# apps/experiments/tasks.py + +@shared_task +def refresh_experiment_statistics(experiment_id=None, full_refresh=True): + """ + Refresh statistics for experiments. + + Args: + experiment_id: If provided, refresh only this experiment. Otherwise, all. + full_refresh: If True, recalculate from scratch. If False, incremental. + """ + from django.db.models import Count, Max, Q + from apps.experiments.models import ( + Experiment, ExperimentStatistics, ExperimentSession + ) + from apps.chat.models import ChatMessage, ChatMessageType + + # Determine which experiments to update + if experiment_id: + experiments = Experiment.objects.filter(id=experiment_id) + else: + experiments = Experiment.objects.filter( + working_version__isnull=True, + is_archived=False + ) + + for experiment in experiments.iterator(chunk_size=100): + # Calculate statistics using efficient aggregation + session_stats = ExperimentSession.objects.filter( + experiment=experiment + ).aggregate( + total_sessions=Count('id'), + unique_participants=Count('participant', distinct=True) + ) + + # Get message statistics + message_stats = ChatMessage.objects.filter( + chat__experiment_session__experiment=experiment, + message_type=ChatMessageType.HUMAN + ).aggregate( + total_messages=Count('id'), + last_message=Max('created_at') + ) + + # Update or create cache entry + ExperimentStatistics.objects.update_or_create( + experiment=experiment, + defaults={ + 'total_session_count': session_stats['total_sessions'] or 0, + 'participant_count': session_stats['unique_participants'] or 0, + 'human_message_count': message_stats['total_messages'] or 0, + 'last_activity_at': message_stats['last_message'], + 'last_full_refresh_at': timezone.now(), + } + ) + +@shared_task +def refresh_session_statistics(session_id=None, mark_complete=False): + """ + Refresh statistics for experiment sessions. + + Args: + session_id: If provided, refresh only this session. Otherwise, all active. + mark_complete: Whether to mark the session as complete (no more updates). + """ + from django.db.models import Count, Max + from django.contrib.contenttypes.models import ContentType + from django.contrib.postgres.aggregates import StringAgg + from apps.experiments.models import ( + ExperimentSession, SessionStatistics + ) + from apps.chat.models import Chat, ChatMessage, ChatMessageType + from apps.annotations.models import CustomTaggedItem + + # Determine which sessions to update + if session_id: + sessions = ExperimentSession.objects.filter(id=session_id) + else: + # Update only active sessions (not complete) + sessions = ExperimentSession.objects.exclude( + cached_statistics__is_complete=True + ) + + message_ct = ContentType.objects.get_for_model(ChatMessage) + + for session in sessions.iterator(chunk_size=500): + # Get message statistics + message_stats = ChatMessage.objects.filter( + chat=session.chat, + message_type=ChatMessageType.HUMAN + ).aggregate( + total_messages=Count('id'), + last_message=Max('created_at') + ) + + # Get experiment versions used + versions = CustomTaggedItem.objects.filter( + content_type=message_ct, + object_id__in=ChatMessage.objects.filter( + chat=session.chat + ).values('id'), + tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION, + ).values_list('tag__name', flat=True).distinct().order_by('tag__name') + + versions_str = ', '.join(versions) if versions else '' + + # Determine if session is complete + is_complete = mark_complete or ( + session.status == SessionStatus.COMPLETE + ) + + # Update or create cache entry + SessionStatistics.objects.update_or_create( + session=session, + defaults={ + 'human_message_count': message_stats['total_messages'] or 0, + 'last_activity_at': message_stats['last_message'], + 'experiment_versions': versions_str, + 'is_complete': is_complete, + } + ) +``` + +**Schedule Configuration**: +```python +# config/settings.py + +SCHEDULED_TASKS = { + # ... existing tasks ... + + "experiments.tasks.refresh_all_experiment_statistics": { + "task": "apps.experiments.tasks.refresh_all_experiment_statistics", + "schedule": timedelta(minutes=5), # Every 5 minutes + }, + "experiments.tasks.refresh_active_session_statistics": { + "task": "apps.experiments.tasks.refresh_active_session_statistics", + "schedule": timedelta(minutes=2), # Every 2 minutes for active sessions + }, + "experiments.tasks.cleanup_old_statistics_logs": { + "task": "apps.experiments.tasks.cleanup_old_statistics_logs", + "schedule": timedelta(days=7), # Weekly cleanup + }, +} +``` + +### Strategy 2: Incremental Updates (Future Enhancement) + +**Approach**: Track changes and update only affected statistics. + +**Pros**: +- More efficient for large datasets +- Fresher data +- Lower database load per update + +**Cons**: +- More complex implementation +- Risk of drift over time +- Requires change tracking +- Harder to debug + +**Implementation Notes**: +- Use Django signals on `ChatMessage` creation to update session statistics +- Use Django signals on `ExperimentSession` creation to update experiment statistics +- Combine with periodic full refresh to prevent drift +- Add `last_full_refresh_at` timestamp to detect drift + +**Signal Example**: +```python +# apps/experiments/signals.py + +from django.db.models.signals import post_save +from django.dispatch import receiver +from apps.chat.models import ChatMessage, ChatMessageType + +@receiver(post_save, sender=ChatMessage) +def update_statistics_on_message(sender, instance, created, **kwargs): + """Update statistics when a new human message is created.""" + if not created or instance.message_type != ChatMessageType.HUMAN: + return + + # Queue a task to update session and experiment statistics + from apps.experiments.tasks import update_statistics_incremental + update_statistics_incremental.delay( + session_id=instance.chat.experiment_session_id + ) +``` + +### Strategy 3: Hybrid Approach (Recommended for Scale) + +**Approach**: Combine scheduled refresh for old data with live updates for recent data. + +**Pros**: +- Best balance of performance and freshness +- Scalable to large datasets +- Recent data always fresh +- Historical data updated periodically + +**Cons**: +- Most complex to implement +- Requires careful coordination + +**Implementation**: +```python +@shared_task +def hybrid_refresh_statistics(): + """ + Hybrid refresh strategy: + 1. Live update active sessions (last 24 hours) + 2. Periodic refresh for older sessions + 3. Full refresh weekly + """ + from datetime import timedelta + from django.utils import timezone + + cutoff = timezone.now() - timedelta(hours=24) + + # Update recent sessions immediately + recent_sessions = ExperimentSession.objects.filter( + created_at__gte=cutoff + ).values_list('id', flat=True) + + for session_id in recent_sessions: + refresh_session_statistics.delay(session_id) + + # Update experiments with recent activity + recent_experiments = Experiment.objects.filter( + sessions__created_at__gte=cutoff + ).distinct().values_list('id', flat=True) + + for exp_id in recent_experiments: + refresh_experiment_statistics.delay(exp_id) +``` + +## Data Compression Strategy + +As data volume grows, older statistics can be "compressed" to reduce storage: + +### Time-based Compression + +**Approach**: Delete or archive session statistics for very old, completed sessions. + +**Implementation**: +```python +@shared_task +def compress_old_statistics(): + """ + Archive or delete statistics for old completed sessions. + Keep experiment-level aggregates, remove session-level detail. + """ + from datetime import timedelta + from django.utils import timezone + from apps.experiments.models import SessionStatistics + + # Define retention policy: keep session details for 90 days + cutoff = timezone.now() - timedelta(days=90) + + # Delete session statistics older than cutoff + # Experiment statistics remain intact + old_stats = SessionStatistics.objects.filter( + is_complete=True, + session__ended_at__lt=cutoff + ) + + count = old_stats.count() + old_stats.delete() + + logger.info(f"Compressed {count} old session statistics") +``` + +### Aggregation Compression + +**Approach**: For very old experiments, aggregate statistics by time period. + +**Schema** (optional): +```python +class ExperimentStatisticsArchive(BaseTeamModel): + """ + Time-bucketed historical statistics for experiments. + Used for long-term trends without session-level detail. + """ + + experiment = models.ForeignKey(Experiment, on_delete=models.CASCADE) + period_start = models.DateField() + period_end = models.DateField() + period_type = models.CharField( + max_length=20, + choices=[('day', 'Daily'), ('week', 'Weekly'), ('month', 'Monthly')] + ) + + session_count = models.IntegerField() + participant_count = models.IntegerField() + message_count = models.IntegerField() + + class Meta: + unique_together = ('experiment', 'period_start', 'period_type') +``` + +## View Integration + +### Update `ChatbotExperimentTableView` + +**Before** (current slow query): +```python +def get_table_data(self): + queryset = super().get_table_data() + + # Expensive subqueries + queryset = queryset.annotate( + session_count=Subquery(session_count_subquery, output_field=IntegerField()), + participant_count=Subquery(participant_count_subquery, output_field=IntegerField()), + messages_count=Subquery(messages_count_subquery, output_field=IntegerField()), + last_message=Subquery(last_message_subquery, output_field=DateTimeField()), + ).order_by(F("last_message").desc(nulls_last=True)) + return queryset +``` + +**After** (using cache): +```python +def get_table_data(self): + queryset = super().get_table_data() + + # Use cached statistics + queryset = queryset.select_related('cached_statistics').annotate( + session_count=F('cached_statistics__total_session_count'), + participant_count=F('cached_statistics__participant_count'), + messages_count=F('cached_statistics__human_message_count'), + last_message=F('cached_statistics__last_activity_at'), + ).order_by(F("last_message").desc(nulls_last=True)) + return queryset +``` + +### Update `ChatbotSessionsTableView` + +**Before**: +```python +def get_table_data(self): + queryset = super().get_table_data() + return queryset.annotate_with_message_count().annotate_with_last_message_created_at() +``` + +**After**: +```python +def get_table_data(self): + queryset = super().get_table_data() + return queryset.select_related('cached_statistics').annotate( + message_count=F('cached_statistics__human_message_count'), + last_message_created_at=F('cached_statistics__last_activity_at'), + experiment_versions=F('cached_statistics__experiment_versions'), + ) +``` + +## Graceful Degradation + +Handle missing or stale cache entries gracefully: + +```python +# apps/experiments/utils.py + +def get_experiment_statistics(experiment): + """ + Get statistics for an experiment, using cache if available, + falling back to live calculation if not. + """ + try: + stats = experiment.cached_statistics + + # Check if cache is too old (> 1 hour) + if stats.last_updated_at < timezone.now() - timedelta(hours=1): + # Queue refresh but return cached data + from apps.experiments.tasks import refresh_experiment_statistics + refresh_experiment_statistics.delay(experiment.id) + + return stats + except ExperimentStatistics.DoesNotExist: + # Cache missing - calculate live and queue cache creation + from apps.experiments.tasks import refresh_experiment_statistics + refresh_experiment_statistics.delay(experiment.id) + + # Return live calculation + return calculate_experiment_statistics_live(experiment) + +def calculate_experiment_statistics_live(experiment): + """Fallback: calculate statistics without cache.""" + # Use the original subquery logic + # Return a dict or object with the same interface + pass +``` + +## Migration Plan + +### Phase 1: Foundation (Week 1) +1. Create new models (`ExperimentStatistics`, `SessionStatistics`) +2. Write and test migration +3. Create basic refresh tasks +4. Add management command for manual refresh + +### Phase 2: Integration (Week 2) +1. Update views to use cached statistics +2. Add graceful degradation logic +3. Set up periodic tasks +4. Monitor performance improvements + +### Phase 3: Optimization (Week 3) +1. Fine-tune refresh intervals +2. Implement compression for old data +3. Add monitoring/alerting for cache staleness +4. Optimize queries based on real usage + +### Phase 4: Enhancement (Future) +1. Implement incremental updates (signals) +2. Add hybrid refresh strategy +3. Implement statistics archive for historical trends +4. Add admin UI for cache management + +## Management Commands + +Provide CLI tools for managing the cache: + +```python +# apps/experiments/management/commands/refresh_statistics.py + +class Command(BaseCommand): + help = 'Refresh experiment and session statistics cache' + + def add_arguments(self, parser): + parser.add_argument( + '--experiment-id', + type=int, + help='Refresh specific experiment' + ) + parser.add_argument( + '--session-id', + type=int, + help='Refresh specific session' + ) + parser.add_argument( + '--full', + action='store_true', + help='Full refresh of all statistics' + ) + parser.add_argument( + '--async', + action='store_true', + help='Run refresh tasks asynchronously via Celery' + ) + + def handle(self, *args, **options): + if options['async']: + # Queue Celery tasks + if options['experiment_id']: + refresh_experiment_statistics.delay(options['experiment_id']) + else: + refresh_all_experiment_statistics.delay() + else: + # Run synchronously + if options['experiment_id']: + refresh_experiment_statistics(options['experiment_id']) + else: + refresh_all_experiment_statistics() +``` + +## Monitoring and Alerting + +Track cache health and performance: + +### Metrics to Monitor + +1. **Cache Hit Rate**: % of requests using cache vs. live calculation +2. **Cache Staleness**: Age of cached data +3. **Refresh Duration**: Time taken to refresh statistics +4. **Query Performance**: Compare cached vs. non-cached query times +5. **Error Rate**: Failed cache updates + +### Implementation + +```python +# apps/experiments/monitoring.py + +import logging +from django.utils import timezone + +logger = logging.getLogger('ocs.experiments.cache') + +def log_cache_performance(operation, duration, records_updated, errors=None): + """Log cache operation performance.""" + logger.info( + f"Cache {operation} completed", + extra={ + 'operation': operation, + 'duration_seconds': duration, + 'records_updated': records_updated, + 'errors': errors or [], + 'timestamp': timezone.now(), + } + ) +``` + +### Admin Dashboard (Future) + +```python +# apps/experiments/admin.py + +@admin.register(ExperimentStatistics) +class ExperimentStatisticsAdmin(admin.ModelAdmin): + list_display = [ + 'experiment', + 'total_session_count', + 'participant_count', + 'human_message_count', + 'last_activity_at', + 'last_updated_at', + 'cache_age', + ] + list_filter = ['last_updated_at', 'last_full_refresh_at'] + search_fields = ['experiment__name'] + readonly_fields = [ + 'last_updated_at', + 'last_full_refresh_at', + 'cache_age', + ] + + def cache_age(self, obj): + """Display age of cached data.""" + if obj.last_updated_at: + age = timezone.now() - obj.last_updated_at + return f"{age.total_seconds() / 60:.1f} minutes" + return "Never" + cache_age.short_description = "Cache Age" + + actions = ['refresh_statistics'] + + def refresh_statistics(self, request, queryset): + """Admin action to refresh selected statistics.""" + for stats in queryset: + refresh_experiment_statistics.delay(stats.experiment_id) + self.message_user( + request, + f"Queued refresh for {queryset.count()} experiments" + ) + refresh_statistics.short_description = "Refresh selected statistics" +``` + +## Testing Strategy + +### Unit Tests + +```python +# apps/experiments/tests/test_statistics_cache.py + +class TestExperimentStatistics(TestCase): + def setUp(self): + self.team = TeamFactory() + self.experiment = ExperimentFactory(team=self.team) + + def test_statistics_calculation(self): + """Test accurate statistics calculation.""" + # Create test data + session1 = ExperimentSessionFactory(experiment=self.experiment) + session2 = ExperimentSessionFactory(experiment=self.experiment) + + ChatMessageFactory.create_batch( + 5, + chat=session1.chat, + message_type=ChatMessageType.HUMAN + ) + ChatMessageFactory.create_batch( + 3, + chat=session2.chat, + message_type=ChatMessageType.HUMAN + ) + + # Refresh statistics + refresh_experiment_statistics(self.experiment.id) + + # Verify + stats = self.experiment.cached_statistics + assert stats.total_session_count == 2 + assert stats.human_message_count == 8 + + def test_statistics_update_on_new_message(self): + """Test incremental update when new message arrives.""" + # Test for Strategy 2/3 + pass + + def test_graceful_degradation(self): + """Test fallback when cache is missing.""" + # Delete cache + ExperimentStatistics.objects.all().delete() + + # Query should still work + stats = get_experiment_statistics(self.experiment) + assert stats is not None +``` + +### Performance Tests + +```python +class TestCachePerformance(TestCase): + def test_query_performance_improvement(self): + """Verify cache improves query performance.""" + # Create large dataset + experiments = ExperimentFactory.create_batch(100) + for exp in experiments: + sessions = ExperimentSessionFactory.create_batch(10, experiment=exp) + for session in sessions: + ChatMessageFactory.create_batch( + 20, + chat=session.chat, + message_type=ChatMessageType.HUMAN + ) + + # Refresh cache + refresh_all_experiment_statistics() + + # Measure uncached query + with self.assertNumQueries(100): # Many queries + list(Experiment.objects.all().annotate(...)) # Old way + + # Measure cached query + with self.assertNumQueries(1): # Single query + list(Experiment.objects.all().select_related('cached_statistics')) +``` + +## Configuration Options + +Allow customization through Django settings: + +```python +# config/settings.py + +# Statistics Cache Configuration +STATISTICS_CACHE = { + # Refresh intervals (in seconds) + 'EXPERIMENT_REFRESH_INTERVAL': 300, # 5 minutes + 'SESSION_REFRESH_INTERVAL': 120, # 2 minutes + + # Compression settings + 'SESSION_RETENTION_DAYS': 90, + 'ENABLE_COMPRESSION': True, + + # Update strategy + 'UPDATE_STRATEGY': 'scheduled', # 'scheduled', 'incremental', 'hybrid' + + # Performance + 'BATCH_SIZE': 100, + 'REFRESH_TIMEOUT': 300, # seconds + + # Monitoring + 'LOG_PERFORMANCE': True, + 'ALERT_ON_STALE_CACHE': True, + 'STALE_THRESHOLD_MINUTES': 60, +} +``` + +## Risks and Mitigations + +### Risk 1: Cache Drift +**Issue**: Statistics become inaccurate over time. + +**Mitigation**: +- Regular full refreshes (daily/weekly) +- Track `last_full_refresh_at` timestamp +- Monitoring and alerting for drift +- Validation tests comparing cache vs. live + +### Risk 2: Database Load During Refresh +**Issue**: Full refresh may spike database load. + +**Mitigation**: +- Use `.iterator()` for memory efficiency +- Process in batches +- Run during off-peak hours +- Add query timeouts +- Use database connection pooling + +### Risk 3: Complexity +**Issue**: More code to maintain. + +**Mitigation**: +- Start with simple Strategy 1 +- Comprehensive testing +- Good documentation +- Management commands for manual control +- Graceful degradation + +### Risk 4: Storage Growth +**Issue**: Cache tables grow over time. + +**Mitigation**: +- Implement compression strategy +- Archive old statistics +- Define retention policies +- Monitor table sizes + +## Performance Expectations + +### Current Performance (Estimated) +- Table load time: 5-30 seconds (with 1000+ experiments) +- Database queries per page: 100+ +- Memory usage: High (large result sets) + +### Expected Performance (With Cache) +- Table load time: 0.5-2 seconds +- Database queries per page: 2-5 +- Memory usage: Low (simple joins) + +### ROI Calculation +``` +Time saved per page load: ~10 seconds +Average page loads per day: 100 +Daily time saved: 1,000 seconds (~17 minutes) +Weekly time saved: ~2 hours +``` + +## Alternative Approaches Considered + +### 1. Redis Cache +**Pros**: Very fast, built-in TTL +**Cons**: +- Data not queryable +- Extra infrastructure +- Memory limits +- Doesn't solve filtering/sorting issues + +### 2. Materialized Views +**Pros**: Database-native, automatic updates (with triggers) +**Cons**: +- PostgreSQL-specific +- Less flexible +- Harder to customize refresh logic +- Can't easily add metadata (compression, update logs) + +### 3. Denormalized Columns on Experiment +**Pros**: Simpler schema +**Cons**: +- Clutters main model +- Harder to manage/update +- Can't track update metadata +- Limited flexibility for compression + +## Recommendation + +### Phased Implementation Approach + +**Phase 1: Start Simple (Option A - Single-Row Aggregates)** + +Implement the simple single-row cache first: +1. Create `ExperimentStatistics` and `SessionStatistics` tables +2. Implement scheduled full refresh (Strategy 1) +3. Update views to use cached data +4. Measure performance improvements + +**Timeline**: 1-2 weeks +**Benefits**: +- ✅ Immediate 10-50x performance improvement +- ✅ Low implementation risk +- ✅ Simple to understand and debug +- ✅ Quick to deploy + +**Phase 2: Add Intelligence (If Needed)** + +If requirements emerge for trend analysis or better incremental updates: +1. Migrate to time-bucketed design (Option B/C) +2. Backfill historical buckets from existing data +3. Implement bucket compression policies +4. Add trend visualization + +**Timeline**: 2-3 weeks +**Benefits**: +- ✅ Historical trend data available +- ✅ More efficient incremental updates +- ✅ Natural data compression +- ✅ Enables time-series analysis + +### Decision Matrix + +**Choose Option A (Single-Row)** if: +- ✅ You only need current totals (no trends) +- ✅ You want the simplest solution +- ✅ You can tolerate full recalculation +- ✅ Storage optimization isn't critical yet + +**Choose Option C (Time-Bucketed + Totals)** if: +- ✅ You need trend analysis / historical data +- ✅ You have very large data volumes (millions of messages) +- ✅ You want efficient incremental updates +- ✅ You need flexible data retention policies +- ✅ You already have requirements for activity charts/graphs + +### Final Recommendation + +**Start with Option A** because: +1. It solves the immediate performance problem +2. It's the simplest to implement and test +3. It can be migrated to Option C later if needed +4. The performance improvement alone justifies the effort + +**Migrate to Option C when**: +- You need to show activity trends over time +- Data volume requires more efficient compression +- You want to add activity charts/sparklines to the UI +- You need more granular activity tracking + +This approach provides: +- ✅ Immediate performance improvements (Option A) +- ✅ Simple, maintainable code initially +- ✅ Low risk, incremental delivery +- ✅ Clear migration path to advanced features (Option C) +- ✅ Foundation for future enhancements + +## Success Metrics + +1. **Performance**: Table load time < 2 seconds +2. **Accuracy**: Cache never more than 5 minutes stale +3. **Reliability**: 99.9% uptime for cache refresh tasks +4. **Scalability**: Handles 10,000+ experiments without degradation + +## References + +- Current slow query: `apps/chatbots/views.py:175-238` +- Session annotations: `apps/experiments/models.py:1662-1700` +- Existing cache pattern: `apps/dashboard/models.py:9-46` +- Celery periodic tasks: `config/settings.py:460-494` +- Team-based model pattern: `apps/teams/models.py` + +## Appendix: Sample Queries + +### Current Slow Query +```python +# Experiment-level subquery (current) +messages_count_subquery = ( + ChatMessage.objects.filter( + chat__experiment_session__experiment_id=OuterRef("pk") + ) + .values("chat__experiment_session__experiment_id") + .annotate(count=Count("id")) + .values("count") +) +``` + +### Cached Query +```python +# Experiment-level with cache (proposed) +experiments = Experiment.objects.filter( + team=team, + working_version__isnull=True +).select_related('cached_statistics').annotate( + session_count=F('cached_statistics__total_session_count'), + messages_count=F('cached_statistics__human_message_count'), +) +``` + +### EXPLAIN ANALYZE Comparison + +**Before** (estimated): +``` +Seq Scan on experiments_experiment (cost=0..5000.00 rows=1000) + SubPlan 1 + -> Aggregate (cost=100..120) + -> Seq Scan on experiments_session (cost=0..100) + SubPlan 2 + -> Aggregate (cost=500..550) + -> Nested Loop (cost=0..500) + -> Seq Scan on experiments_session + -> Seq Scan on chat_chatmessage +Planning time: 5 ms +Execution time: 15000 ms +``` + +**After** (estimated): +``` +Hash Join (cost=20..100 rows=1000) + -> Seq Scan on experiments_experiment (cost=0..50) + -> Hash (cost=20..20 rows=1000) + -> Seq Scan on experiments_statistics (cost=0..20) +Planning time: 1 ms +Execution time: 50 ms +``` + +## Conclusion + +This design document presents a comprehensive solution for caching experiment statistics with two complementary approaches: + +### Option A: Single-Row Aggregates +A simple, proven approach that: +- ✅ **Solves the immediate problem** with 10-50x performance improvement +- ✅ **Minimal complexity** - easy to implement, test, and maintain +- ✅ **Quick delivery** - can be implemented in 1-2 weeks +- ✅ **Low risk** - straightforward logic, easy to debug + +### Option C: Time-Bucketed + Totals +A sophisticated approach that: +- ✅ **Enables trend analysis** with built-in time-series data +- ✅ **Scales better** with natural compression and efficient incremental updates +- ✅ **Flexible data lifecycle** - automatic aging and retention policies +- ✅ **Future-proof** - supports advanced features like activity charts + +### Core Benefits (Both Options) + +1. **Dramatic performance improvements** through pre-computed SQL caches +2. **Maintains accuracy** through scheduled and/or incremental refreshes +3. **Scales efficiently** with smart compression and retention policies +4. **Follows Django patterns** and existing codebase conventions +5. **Graceful degradation** when cache is missing or stale +6. **Clear migration path** from simple to advanced + +### Recommended Path Forward + +1. **Start with Option A** for immediate wins (1-2 weeks) +2. **Measure and validate** performance improvements +3. **Migrate to Option C** when trend analysis or advanced features are needed (2-3 weeks) + +This phased approach delivers immediate value while building a foundation for future enhancements, all while minimizing risk and complexity. diff --git a/docs/time-bucketed-statistics-implementation-plan.md b/docs/time-bucketed-statistics-implementation-plan.md new file mode 100644 index 0000000000..e589e398e5 --- /dev/null +++ b/docs/time-bucketed-statistics-implementation-plan.md @@ -0,0 +1,2102 @@ +# Time-Bucketed Statistics Implementation Plan + +## Overview + +This document provides a complete implementation plan for caching experiment and session statistics using time-bucketed storage with automatic compression. This approach stores statistics in time buckets (hourly/daily/monthly) that are automatically compressed over time, providing both performance and historical data retention. + +## Design Decisions + +### Key Performance Optimizations + +This implementation uses SQL-based aggregation throughout to minimize Python loops and database queries: + +1. **Manager Methods**: Participant counts computed by summing `new_participant_count` from buckets (not querying sessions) +2. **Update Tasks**: All statistics aggregated in SQL keyed by truncated date (hourly/daily/monthly) +3. **Participant Counting**: Uses `ParticipantData` model for accurate unique participant tracking per experiment +4. **Compression Tasks**: Filter old buckets directly in SQL using date comparisons (no Python loops) +5. **Date Calculations**: Uses `dateutil.relativedelta` for accurate relative date calculations +6. **Bulk Operations**: Uses `bulk_update()` to minimize database round-trips +7. **Experiment Versions**: Cached directly on session model, avoiding temporal redundancy in time buckets + +### Key Simplifications + +1. **No Denormalized Totals Tables**: Query totals by summing across buckets (typically 1-30 buckets) +2. **Static Compression Policy**: Hardcoded policy instead of configurable database table +3. **No Deletion**: Compress old buckets into larger ones, never delete +4. **Simple Bucket Structure**: Just experiment and session buckets + +### Compression Policy (Static) + +```python +# Retention policy (hardcoded in settings or code): +# - 0-24 hours: Hourly buckets +# - 1-30 days: Daily buckets +# - 30+ days: Monthly buckets +# - Never delete, only compress +``` + +### Trade-offs + +#### Benefits + +- ✅ No sync issues between buckets and totals +- ✅ Simpler schema (2 tables instead of 4+) +- ✅ Historical data naturally preserved +- ✅ Easy to understand and debug +- ✅ Flexible querying (can sum any time range) + +#### Considerations + +- ⚠️ Totals require SUM() across buckets (typically 1-30 rows) +- ⚠️ Slightly slower than single-row lookup, but still fast enough + +--- + +## Database Schema + +### 1. Experiment Statistics Buckets + +```python +# apps/experiments/models.py + +from datetime import timedelta +from django.db import models +from django.utils import timezone +from apps.teams.models import BaseTeamModel + + +class ExperimentStatisticsBucket(BaseTeamModel): + """ + Time-bucketed statistics for experiments. + + Buckets are automatically compressed over time: + - Recent data (0-24h): hourly buckets + - Medium-term (1-30 days): daily buckets + - Long-term (30+ days): monthly buckets + """ + + class BucketSize(models.TextChoices): + HOUR = 'hour', 'Hourly' + DAY = 'day', 'Daily' + MONTH = 'month', 'Monthly' + + experiment = models.ForeignKey( + 'experiments.Experiment', + on_delete=models.CASCADE, + related_name='statistics_buckets' + ) + + # Time bucket definition + bucket_size = models.CharField( + max_length=10, + choices=BucketSize.choices, + default=BucketSize.HOUR + ) + bucket_start = models.DateTimeField(db_index=True) + bucket_end = models.DateTimeField(db_index=True) + + # Aggregate statistics for this time period + session_count = models.IntegerField(default=0) + new_participant_count = models.IntegerField( + default=0, + help_text="Participants whose first session was created in this bucket" + ) + human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField( + null=True, + blank=True, + help_text="Timestamp of last human message in this bucket" + ) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True) + + class Meta: + db_table = 'experiments_experiment_statistics_bucket' + unique_together = [('experiment', 'bucket_start', 'bucket_size')] + indexes = [ + # unique_together already creates an index on (experiment, bucket_start, bucket_size) + # Add index for time-range queries + models.Index(fields=['bucket_start']), + ] + ordering = ['-bucket_start'] + + def __str__(self): + return f"{self.experiment.name} - {self.bucket_size} ({self.bucket_start.date()})" + + @classmethod + def get_bucket_boundaries(cls, dt, bucket_size): + """Calculate bucket start/end for a given datetime.""" + if bucket_size == cls.BucketSize.HOUR: + start = dt.replace(minute=0, second=0, microsecond=0) + end = start + timedelta(hours=1) + elif bucket_size == cls.BucketSize.DAY: + start = dt.replace(hour=0, minute=0, second=0, microsecond=0) + end = start + timedelta(days=1) + elif bucket_size == cls.BucketSize.MONTH: + start = dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0) + if start.month == 12: + end = start.replace(year=start.year + 1, month=1) + else: + end = start.replace(month=start.month + 1) + else: + raise ValueError(f"Invalid bucket_size: {bucket_size}") + + return start, end + + @classmethod + def get_or_create_bucket(cls, experiment, dt, bucket_size): + """Get or create a bucket for the given experiment and datetime.""" + start, end = cls.get_bucket_boundaries(dt, bucket_size) + + bucket, created = cls.objects.get_or_create( + experiment=experiment, + bucket_size=bucket_size, + bucket_start=start, + defaults={ + 'bucket_end': end, + 'team': experiment.team, + } + ) + return bucket, created + + +class ExperimentStatisticsBucketManager(models.Manager): + """Manager with helper methods for querying statistics.""" + + def get_totals_for_experiment(self, experiment): + """ + Get aggregated totals for an experiment across all buckets. + Returns dict with total_sessions, total_participants, total_messages, last_activity. + """ + from django.db.models import Sum, Max + + aggregates = self.filter(experiment=experiment).aggregate( + total_sessions=Sum('session_count'), + total_participants=Sum('new_participant_count'), + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + return { + 'total_sessions': aggregates['total_sessions'] or 0, + 'total_participants': aggregates['total_participants'] or 0, + 'total_messages': aggregates['total_messages'] or 0, + 'last_activity': aggregates['last_activity'], + } + + def get_totals_for_experiments(self, experiment_ids): + """ + Get totals for multiple experiments efficiently. + Returns dict mapping experiment_id -> totals dict. + """ + from django.db.models import Sum, Max + + # Get bucket aggregates per experiment (including participant count from buckets) + bucket_data = self.filter( + experiment_id__in=experiment_ids + ).values('experiment_id').annotate( + total_sessions=Sum('session_count'), + total_participants=Sum('new_participant_count'), + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + # Build results dict + results = {} + for bucket in bucket_data: + results[bucket['experiment_id']] = { + 'total_sessions': bucket['total_sessions'] or 0, + 'total_participants': bucket['total_participants'] or 0, + 'total_messages': bucket['total_messages'] or 0, + 'last_activity': bucket['last_activity'], + } + + return results + + +# Add the manager to the model +ExperimentStatisticsBucket.add_to_class('objects', ExperimentStatisticsBucketManager()) +``` + +### 2. Cached Experiment Versions on Session + +Add a field to the `ExperimentSession` model to cache experiment versions: + +```python +# apps/experiments/models.py + +from django.contrib.postgres.fields import ArrayField + +class ExperimentSession(BaseTeamModel): + # ... existing fields ... + + # Cached statistics + cached_experiment_versions = ArrayField( + models.CharField(max_length=50), + default=list, + blank=True, + help_text="Cached list of unique experiment version numbers used in this session" + ) +``` + +This field stores all unique experiment versions seen in the session, updated periodically by the statistics tasks. + +### 3. Session Statistics Buckets + +```python +# apps/experiments/models.py + +class SessionStatisticsBucket(BaseTeamModel): + """ + Time-bucketed statistics for experiment sessions. + + Similar compression policy to experiments: + - Recent: hourly buckets + - Medium-term: daily buckets + """ + + class BucketSize(models.TextChoices): + HOUR = 'hour', 'Hourly' + DAY = 'day', 'Daily' + + session = models.ForeignKey( + 'experiments.ExperimentSession', + on_delete=models.CASCADE, + related_name='statistics_buckets' + ) + + # Time bucket + bucket_size = models.CharField( + max_length=10, + choices=BucketSize.choices, + default=BucketSize.HOUR + ) + bucket_start = models.DateTimeField(db_index=True) + bucket_end = models.DateTimeField(db_index=True) + + # Statistics for this period + human_message_count = models.IntegerField(default=0) + last_activity_at = models.DateTimeField( + null=True, + blank=True, + help_text="Timestamp of last human message in this bucket" + ) + + # Metadata + last_updated_at = models.DateTimeField(auto_now=True) + + class Meta: + db_table = 'experiments_session_statistics_bucket' + unique_together = [('session', 'bucket_start', 'bucket_size')] + indexes = [ + # unique_together already creates an index on (session, bucket_start, bucket_size) + models.Index(fields=['bucket_start']), + ] + ordering = ['-bucket_start'] + + def __str__(self): + return f"Session {self.session.external_id} - {self.bucket_size} ({self.bucket_start.date()})" + + @classmethod + def get_bucket_boundaries(cls, dt, bucket_size): + """Calculate bucket start/end for a given datetime.""" + if bucket_size == cls.BucketSize.HOUR: + start = dt.replace(minute=0, second=0, microsecond=0) + end = start + timedelta(hours=1) + elif bucket_size == cls.BucketSize.DAY: + start = dt.replace(hour=0, minute=0, second=0, microsecond=0) + end = start + timedelta(days=1) + else: + raise ValueError(f"Invalid bucket_size: {bucket_size}") + + return start, end + + @classmethod + def get_or_create_bucket(cls, session, dt, bucket_size): + """Get or create a bucket for the given session and datetime.""" + start, end = cls.get_bucket_boundaries(dt, bucket_size) + + bucket, created = cls.objects.get_or_create( + session=session, + bucket_size=bucket_size, + bucket_start=start, + defaults={ + 'bucket_end': end, + 'team': session.team, + } + ) + return bucket, created + + +class SessionStatisticsBucketManager(models.Manager): + """Manager with helper methods for querying session statistics.""" + + def get_totals_for_session(self, session): + """ + Get aggregated totals for a session across all buckets. + Returns dict with total_messages, last_activity, experiment_versions. + """ + from django.db.models import Sum, Max + + aggregates = self.filter(session=session).aggregate( + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + # Get experiment versions from session's cached field + experiment_versions = ', '.join(session.cached_experiment_versions) if session.cached_experiment_versions else '' + + return { + 'total_messages': aggregates['total_messages'] or 0, + 'last_activity': aggregates['last_activity'], + 'experiment_versions': experiment_versions, + } + + def get_totals_for_sessions(self, session_ids): + """ + Get totals for multiple sessions efficiently. + Returns dict mapping session_id -> totals dict. + """ + from django.db.models import Sum, Max + from apps.experiments.models import ExperimentSession + + # Get bucket aggregates per session + bucket_data = self.filter( + session_id__in=session_ids + ).values('session_id').annotate( + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + # Get cached experiment versions from sessions + sessions = ExperimentSession.objects.filter( + id__in=session_ids + ).values('id', 'cached_experiment_versions') + + # Build version mapping + version_map = { + s['id']: ', '.join(s['cached_experiment_versions']) if s['cached_experiment_versions'] else '' + for s in sessions + } + + # Combine results + results = {} + for bucket in bucket_data: + session_id = bucket['session_id'] + results[session_id] = { + 'total_messages': bucket['total_messages'] or 0, + 'last_activity': bucket['last_activity'], + 'experiment_versions': version_map.get(session_id, ''), + } + + return results + + +SessionStatisticsBucket.add_to_class('objects', SessionStatisticsBucketManager()) +``` + +--- + +## Compression Policy Configuration + +```python +# apps/experiments/statistics_config.py + +""" +Static configuration for statistics bucket compression. +""" + +from datetime import timedelta +from django.utils import timezone +from dateutil.relativedelta import relativedelta + +from apps.experiments.models import ( + ExperimentStatisticsBucket, + SessionStatisticsBucket, +) + + +class CompressionPolicy: + """ + Static compression policy for statistics buckets. + + Rules: + - 0-24 hours: Keep hourly buckets + - 1-30 days: Compress hourly -> daily + - 30+ days: Compress daily -> monthly + """ + + @classmethod + def get_hourly_to_daily_cutoff(cls): + """Get cutoff datetime for compressing hourly buckets to daily (1 day ago).""" + return timezone.now() - relativedelta(days=1) + + @classmethod + def get_daily_to_monthly_cutoff(cls): + """Get cutoff datetime for compressing daily buckets to monthly (30 days ago).""" + return timezone.now() - relativedelta(days=30) + + +# Configuration for scheduled tasks +STATISTICS_CONFIG = { + # How often to update buckets (seconds) + 'UPDATE_INTERVAL': 120, # 2 minutes + + # How often to run compression (seconds) + 'COMPRESSION_INTERVAL': 3600, # 1 hour + + # Batch size for processing + 'BATCH_SIZE': 100, +} +``` + +--- + +## Celery Tasks + +### 1. Update Current Buckets + +```python +# apps/experiments/tasks.py + +import logging +from datetime import timedelta +from celery import shared_task +from django.db.models import Count, Q, Max +from django.utils import timezone + +from apps.experiments.models import ( + Experiment, + ExperimentSession, + ExperimentStatisticsBucket, + SessionStatisticsBucket, +) +from apps.chat.models import ChatMessage, ChatMessageType + +logger = logging.getLogger('ocs.experiments.statistics') + + +@shared_task +def update_experiment_buckets(experiment_id=None, hours_back=24): + """ + Update experiment statistics buckets for recent activity. + + Args: + experiment_id: If provided, update only this experiment + hours_back: How many hours of data to process (default: 24) + """ + from apps.experiments.statistics_config import STATISTICS_CONFIG + + cutoff = timezone.now() - timedelta(hours=hours_back) + + # Get experiments with recent activity + if experiment_id: + experiments = Experiment.objects.filter(id=experiment_id) + else: + experiments = Experiment.objects.filter( + working_version__isnull=True, + is_archived=False, + sessions__chat__messages__created_at__gte=cutoff, + sessions__chat__messages__message_type=ChatMessageType.HUMAN, + ).distinct() + + for experiment in experiments.iterator(chunk_size=STATISTICS_CONFIG['BATCH_SIZE']): + _update_experiment_buckets(experiment, cutoff) + + +def _update_experiment_buckets(experiment, cutoff): + """Update buckets for a single experiment using SQL aggregation.""" + from django.db.models import Count, Max + from django.db.models.functions import TruncHour + from apps.participants.models import ParticipantData + + # Aggregate all statistics keyed by hour in single SQL query + # Get message counts and last activity per hour + message_stats = ChatMessage.objects.filter( + chat__experiment_session__experiment=experiment, + message_type=ChatMessageType.HUMAN, + created_at__gte=cutoff, + ).annotate( + hour=TruncHour('created_at') + ).values('hour').annotate( + message_count=Count('id'), + last_activity=Max('created_at'), + ) + + # Get session counts per hour + session_stats = ExperimentSession.objects.filter( + experiment=experiment, + created_at__gte=cutoff, + ).annotate( + hour=TruncHour('created_at') + ).values('hour').annotate( + session_count=Count('id'), + ) + + # Get participant counts per hour using ParticipantData + participant_stats = ParticipantData.objects.filter( + experiment=experiment, + created_at__gte=cutoff, + ).annotate( + hour=TruncHour('created_at') + ).values('hour').annotate( + participant_count=Count('id'), + ) + + # Build dictionaries for fast lookup + message_dict = {item['hour']: item for item in message_stats} + session_dict = {item['hour']: item for item in session_stats} + participant_dict = {item['hour']: item for item in participant_stats} + + # Get all unique hours from all queries + all_hours = set(message_dict.keys()) | set(session_dict.keys()) | set(participant_dict.keys()) + + # Bulk create/update buckets + buckets_to_update = [] + for hour_start in all_hours: + bucket, created = ExperimentStatisticsBucket.get_or_create_bucket( + experiment, + hour_start, + ExperimentStatisticsBucket.BucketSize.HOUR + ) + + # Update bucket with aggregated statistics + bucket.human_message_count = message_dict.get(hour_start, {}).get('message_count', 0) + bucket.last_activity_at = message_dict.get(hour_start, {}).get('last_activity') + bucket.session_count = session_dict.get(hour_start, {}).get('session_count', 0) + bucket.new_participant_count = participant_dict.get(hour_start, {}).get('participant_count', 0) + buckets_to_update.append(bucket) + + # Bulk update all buckets + if buckets_to_update: + ExperimentStatisticsBucket.objects.bulk_update( + buckets_to_update, + ['human_message_count', 'last_activity_at', 'session_count', 'new_participant_count'] + ) + + logger.info( + f"Updated {len(buckets_to_update)} hourly buckets for experiment {experiment.id}" + ) + + +@shared_task +def update_session_buckets(session_id=None, hours_back=24): + """ + Update session statistics buckets for recent activity. + + Args: + session_id: If provided, update only this session + hours_back: How many hours of data to process (default: 24) + """ + from apps.experiments.statistics_config import STATISTICS_CONFIG + + cutoff = timezone.now() - timedelta(hours=hours_back) + + # Get sessions with recent activity + if session_id: + sessions = ExperimentSession.objects.filter(id=session_id) + else: + # Update sessions that: + # 1. Have recent messages, OR + # 2. Are not yet complete (status != COMPLETE) + sessions = ExperimentSession.objects.filter( + Q(chat__messages__created_at__gte=cutoff) | + Q(status__ne='complete') + ).distinct() + + for session in sessions.iterator(chunk_size=STATISTICS_CONFIG['BATCH_SIZE']): + _update_session_buckets(session, cutoff) + + +def _update_session_buckets(session, cutoff): + """Update buckets for a single session using SQL aggregation.""" + from django.db.models.functions import TruncHour + from django.contrib.contenttypes.models import ContentType + from apps.annotations.models import CustomTaggedItem + from apps.chat.models import Chat + + # Aggregate message statistics keyed by hour in single SQL query + message_stats = ChatMessage.objects.filter( + chat=session.chat, + message_type=ChatMessageType.HUMAN, + created_at__gte=cutoff, + ).annotate( + hour=TruncHour('created_at') + ).values('hour').annotate( + message_count=Count('id'), + last_activity=Max('created_at'), + ) + + # Bulk create/update buckets + buckets_to_update = [] + for stat in message_stats: + hour_start = stat['hour'] + + bucket, created = SessionStatisticsBucket.get_or_create_bucket( + session, + hour_start, + SessionStatisticsBucket.BucketSize.HOUR + ) + + bucket.human_message_count = stat['message_count'] + bucket.last_activity_at = stat['last_activity'] + buckets_to_update.append(bucket) + + # Bulk update all buckets + if buckets_to_update: + SessionStatisticsBucket.objects.bulk_update( + buckets_to_update, + ['human_message_count', 'last_activity_at'] + ) + + # Update session's cached experiment versions + message_ct = ContentType.objects.get_for_model(ChatMessage) + all_versions = CustomTaggedItem.objects.filter( + content_type=message_ct, + object_id__in=ChatMessage.objects.filter( + chat=session.chat, + message_type=ChatMessageType.HUMAN, + ).values_list('id', flat=True), + tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION, + ).values_list('tag__name', flat=True).distinct() + + # Update session with sorted unique versions + session.cached_experiment_versions = sorted(set(all_versions)) + session.save(update_fields=['cached_experiment_versions']) + + logger.info( + f"Updated {len(buckets_to_update)} hourly buckets for session {session.id}" + ) +``` + +### 2. Compression Tasks + +```python +# apps/experiments/tasks.py (continued) + +@shared_task +def compress_experiment_buckets(): + """ + Compress old experiment statistics buckets. + - Hourly -> Daily (for buckets older than 1 day) + - Daily -> Monthly (for buckets older than 30 days) + """ + from apps.experiments.statistics_config import CompressionPolicy + from django.db.models import Sum, Max + from django.db.models.functions import TruncDate, TruncMonth + + # Get cutoff dates + hourly_cutoff = CompressionPolicy.get_hourly_to_daily_cutoff() + daily_cutoff = CompressionPolicy.get_daily_to_monthly_cutoff() + + # Compress hourly to daily - filter old buckets in SQL + old_hourly_buckets = ExperimentStatisticsBucket.objects.filter( + bucket_size=ExperimentStatisticsBucket.BucketSize.HOUR, + bucket_start__lt=hourly_cutoff, + ).annotate( + date=TruncDate('bucket_start') + ).values('experiment_id', 'date').distinct() + + days_to_compress = [(item['experiment_id'], item['date']) for item in old_hourly_buckets] + + logger.info(f"Compressing {len(days_to_compress)} days from hourly to daily") + + for experiment_id, date in days_to_compress: + _compress_hourly_to_daily(experiment_id, date) + + # Compress daily to monthly - filter old buckets in SQL + old_daily_buckets = ExperimentStatisticsBucket.objects.filter( + bucket_size=ExperimentStatisticsBucket.BucketSize.DAY, + bucket_start__lt=daily_cutoff, + ).annotate( + month=TruncMonth('bucket_start') + ).values('experiment_id', 'month').distinct() + + months_to_compress = [(item['experiment_id'], item['month']) for item in old_daily_buckets] + + logger.info(f"Compressing {len(months_to_compress)} months from daily to monthly") + + for experiment_id, month_start in months_to_compress: + _compress_daily_to_monthly(experiment_id, month_start) + + +def _compress_hourly_to_daily(experiment_id, date): + """Compress all hourly buckets for a given day into a single daily bucket.""" + from django.db import transaction + from django.db.models import Sum, Max + + day_start = timezone.make_aware(datetime.combine(date, datetime.min.time())) + day_end = day_start + timedelta(days=1) + + # Get all hourly buckets for this day + hourly_buckets = ExperimentStatisticsBucket.objects.filter( + experiment_id=experiment_id, + bucket_size=ExperimentStatisticsBucket.BucketSize.HOUR, + bucket_start__gte=day_start, + bucket_start__lt=day_end, + ) + + if not hourly_buckets.exists(): + return + + # Aggregate statistics + aggregates = hourly_buckets.aggregate( + total_sessions=Sum('session_count'), + total_participants=Sum('new_participant_count'), + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + with transaction.atomic(): + # Create or update daily bucket + experiment = Experiment.objects.get(id=experiment_id) + daily_bucket, created = ExperimentStatisticsBucket.objects.update_or_create( + experiment=experiment, + bucket_size=ExperimentStatisticsBucket.BucketSize.DAY, + bucket_start=day_start, + defaults={ + 'bucket_end': day_end, + 'session_count': aggregates['total_sessions'] or 0, + 'new_participant_count': aggregates['total_participants'] or 0, + 'human_message_count': aggregates['total_messages'] or 0, + 'last_activity_at': aggregates['last_activity'], + 'team_id': experiment.team_id, + } + ) + + # Delete hourly buckets + count = hourly_buckets.delete()[0] + logger.info( + f"Compressed {count} hourly buckets to daily bucket " + f"for experiment {experiment_id} on {date}" + ) + + +def _compress_daily_to_monthly(experiment_id, month_start): + """Compress all daily buckets for a given month into a single monthly bucket.""" + from django.db import transaction + from django.db.models import Sum, Max + + # Calculate month end + if month_start.month == 12: + month_end = month_start.replace(year=month_start.year + 1, month=1) + else: + month_end = month_start.replace(month=month_start.month + 1) + + # Get all daily buckets for this month + daily_buckets = ExperimentStatisticsBucket.objects.filter( + experiment_id=experiment_id, + bucket_size=ExperimentStatisticsBucket.BucketSize.DAY, + bucket_start__gte=month_start, + bucket_start__lt=month_end, + ) + + if not daily_buckets.exists(): + return + + # Aggregate statistics + aggregates = daily_buckets.aggregate( + total_sessions=Sum('session_count'), + total_participants=Sum('new_participant_count'), + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + with transaction.atomic(): + # Create or update monthly bucket + experiment = Experiment.objects.get(id=experiment_id) + monthly_bucket, created = ExperimentStatisticsBucket.objects.update_or_create( + experiment=experiment, + bucket_size=ExperimentStatisticsBucket.BucketSize.MONTH, + bucket_start=month_start, + defaults={ + 'bucket_end': month_end, + 'session_count': aggregates['total_sessions'] or 0, + 'new_participant_count': aggregates['total_participants'] or 0, + 'human_message_count': aggregates['total_messages'] or 0, + 'last_activity_at': aggregates['last_activity'], + 'team_id': experiment.team_id, + } + ) + + # Delete daily buckets + count = daily_buckets.delete()[0] + logger.info( + f"Compressed {count} daily buckets to monthly bucket " + f"for experiment {experiment_id} starting {month_start.date()}" + ) + + +@shared_task +def compress_session_buckets(): + """ + Compress old session statistics buckets. + - Hourly -> Daily (for buckets older than 1 day) + """ + from apps.experiments.statistics_config import CompressionPolicy + from django.db.models.functions import TruncDate + + # Get cutoff date + hourly_cutoff = CompressionPolicy.get_hourly_to_daily_cutoff() + + # Compress hourly to daily - filter old buckets in SQL + old_hourly_buckets = SessionStatisticsBucket.objects.filter( + bucket_size=SessionStatisticsBucket.BucketSize.HOUR, + bucket_start__lt=hourly_cutoff, + ).annotate( + date=TruncDate('bucket_start') + ).values('session_id', 'date').distinct() + + days_to_compress = [(item['session_id'], item['date']) for item in old_hourly_buckets] + + logger.info(f"Compressing {len(days_to_compress)} session days from hourly to daily") + + for session_id, date in days_to_compress: + _compress_session_hourly_to_daily(session_id, date) + + +def _compress_session_hourly_to_daily(session_id, date): + """Compress all hourly session buckets for a given day into a single daily bucket.""" + from django.db import transaction + from django.db.models import Sum, Max + from datetime import datetime + + day_start = timezone.make_aware(datetime.combine(date, datetime.min.time())) + day_end = day_start + timedelta(days=1) + + # Get all hourly buckets for this day + hourly_buckets = SessionStatisticsBucket.objects.filter( + session_id=session_id, + bucket_size=SessionStatisticsBucket.BucketSize.HOUR, + bucket_start__gte=day_start, + bucket_start__lt=day_end, + ) + + if not hourly_buckets.exists(): + return + + # Aggregate statistics + aggregates = hourly_buckets.aggregate( + total_messages=Sum('human_message_count'), + last_activity=Max('last_activity_at'), + ) + + with transaction.atomic(): + # Create or update daily bucket + session = ExperimentSession.objects.get(id=session_id) + daily_bucket, created = SessionStatisticsBucket.objects.update_or_create( + session=session, + bucket_size=SessionStatisticsBucket.BucketSize.DAY, + bucket_start=day_start, + defaults={ + 'bucket_end': day_end, + 'human_message_count': aggregates['total_messages'] or 0, + 'last_activity_at': aggregates['last_activity'], + 'team_id': session.team_id, + } + ) + + # Delete hourly buckets + count = hourly_buckets.delete()[0] + logger.info( + f"Compressed {count} hourly session buckets to daily " + f"for session {session_id} on {date}" + ) +``` + +### 3. Backfill Task + +```python +# apps/experiments/tasks.py (continued) + +@shared_task +def backfill_statistics(experiment_id=None, session_id=None, start_date=None): + """ + Backfill statistics buckets from existing data. + + Args: + experiment_id: Backfill specific experiment + session_id: Backfill specific session + start_date: Start date for backfill (default: beginning of data) + """ + if experiment_id: + _backfill_experiment(experiment_id, start_date) + elif session_id: + _backfill_session(session_id, start_date) + else: + # Backfill all + experiments = Experiment.objects.filter( + working_version__isnull=True, + is_archived=False, + ) + for exp in experiments.iterator(chunk_size=50): + _backfill_experiment(exp.id, start_date) + + +def _backfill_experiment(experiment_id, start_date=None): + """Backfill statistics for a single experiment.""" + from django.db.models import Min + + experiment = Experiment.objects.get(id=experiment_id) + + # Determine date range + if start_date is None: + first_message = ChatMessage.objects.filter( + chat__experiment_session__experiment=experiment, + message_type=ChatMessageType.HUMAN, + ).aggregate(first=Min('created_at'))['first'] + + if not first_message: + logger.info(f"No messages found for experiment {experiment_id}") + return + + start_date = first_message.date() + + # Delete existing buckets to avoid duplicates + ExperimentStatisticsBucket.objects.filter(experiment=experiment).delete() + + # Process in chunks by month to avoid memory issues + current_date = start_date + end_date = timezone.now().date() + + while current_date <= end_date: + month_end = min( + (current_date.replace(day=1) + timedelta(days=32)).replace(day=1), + end_date + ) + + logger.info( + f"Backfilling experiment {experiment_id} from {current_date} to {month_end}" + ) + + # Update buckets for this month + _update_experiment_buckets( + experiment, + cutoff=timezone.make_aware( + datetime.combine(current_date, datetime.min.time()) + ) + ) + + current_date = month_end + + logger.info(f"Completed backfill for experiment {experiment_id}") + + +def _backfill_session(session_id, start_date=None): + """Backfill statistics for a single session.""" + from django.db.models import Min + + session = ExperimentSession.objects.get(id=session_id) + + # Determine date range + if start_date is None: + first_message = ChatMessage.objects.filter( + chat=session.chat, + message_type=ChatMessageType.HUMAN, + ).aggregate(first=Min('created_at'))['first'] + + if not first_message: + return + + start_date = first_message.date() + + # Delete existing buckets + SessionStatisticsBucket.objects.filter(session=session).delete() + + # Update buckets + _update_session_buckets( + session, + cutoff=timezone.make_aware( + datetime.combine(start_date, datetime.min.time()) + ) + ) + + logger.info(f"Completed backfill for session {session_id}") +``` + +--- + +## Scheduled Tasks Configuration + +```python +# config/settings.py + +# Add to SCHEDULED_TASKS dictionary + +SCHEDULED_TASKS = { + # ... existing tasks ... + + "experiments.tasks.update_experiment_buckets": { + "task": "apps.experiments.tasks.update_experiment_buckets", + "schedule": 120, # Every 2 minutes + }, + "experiments.tasks.update_session_buckets": { + "task": "apps.experiments.tasks.update_session_buckets", + "schedule": 120, # Every 2 minutes + }, + "experiments.tasks.compress_experiment_buckets": { + "task": "apps.experiments.tasks.compress_experiment_buckets", + "schedule": 3600, # Every hour + }, + "experiments.tasks.compress_session_buckets": { + "task": "apps.experiments.tasks.compress_session_buckets", + "schedule": 3600, # Every hour + }, +} +``` + +--- + +## View Integration + +### Update ChatbotExperimentTableView + +```python +# apps/chatbots/views.py + +class ChatbotExperimentTableView(LoginAndTeamRequiredMixin, SingleTableView, PermissionRequiredMixin): + template_name = "table/single_table.html" + model = Experiment + table_class = ChatbotTable + permission_required = "experiments.view_experiment" + + def get_table(self, **kwargs): + table = super().get_table(**kwargs) + if not flag_is_active(self.request, "flag_tracing"): + table.exclude = ("trends",) + return table + + def get_queryset(self): + """Returns queryset with database-level ordering by last activity.""" + from django.db.models import OuterRef, Subquery, Max + from apps.experiments.models import ExperimentStatisticsBucket + + query_set = ( + self.model.objects.get_all() + .filter(team=self.request.team, working_version__isnull=True, pipeline__isnull=False) + .select_related("team", "owner") + ) + show_archived = self.request.GET.get("show_archived") == "on" + if not show_archived: + query_set = query_set.filter(is_archived=False) + + search = self.request.GET.get("search") + if search: + query_set = similarity_search( + query_set, + search_phase=search, + columns=["name", "description"], + extra_conditions=Q(owner__username__icontains=search), + score=0.1, + ) + + # Annotate with last activity from buckets for database-level ordering + last_activity_subquery = ExperimentStatisticsBucket.objects.filter( + experiment=OuterRef('pk') + ).values('experiment').annotate( + max_activity=Max('last_activity_at') + ).values('max_activity')[:1] + + query_set = query_set.annotate( + cached_last_activity=Subquery(last_activity_subquery) + ).order_by('-cached_last_activity', '-created_at') + + return query_set + + def get_table_data(self): + """Add statistics from buckets.""" + queryset = super().get_table_data() + + # Get all experiment IDs in this page + experiment_ids = list(queryset.values_list('id', flat=True)) + + # Fetch statistics for all experiments in batch + from apps.experiments.models import ExperimentStatisticsBucket + stats_map = ExperimentStatisticsBucket.objects.get_totals_for_experiments(experiment_ids) + + # Annotate queryset with statistics + experiments_with_stats = [] + for experiment in queryset: + stats = stats_map.get(experiment.id, { + 'total_sessions': 0, + 'total_participants': 0, + 'total_messages': 0, + 'last_activity': None, + }) + + # Add as properties for template access + experiment.session_count = stats['total_sessions'] + experiment.participant_count = stats['total_participants'] + experiment.messages_count = stats['total_messages'] + experiment.last_message = stats['last_activity'] + + experiments_with_stats.append(experiment) + + # Already sorted at database level, no need to sort in Python + return experiments_with_stats +``` + +### Update ChatbotSessionsTableView + +```python +# apps/chatbots/views.py + +class ChatbotSessionsTableView(ExperimentSessionsTableView): + table_class = ChatbotSessionsTable + + def get_table_data(self): + """Add statistics from buckets.""" + queryset = super().get_table_data() + + # Get session IDs + session_ids = list(queryset.values_list('id', flat=True)) + + # Fetch statistics for all sessions in batch + from apps.experiments.models import SessionStatisticsBucket + stats_map = SessionStatisticsBucket.objects.get_totals_for_sessions(session_ids) + + # Annotate queryset + sessions_with_stats = [] + for session in queryset: + stats = stats_map.get(session.id, { + 'total_messages': 0, + 'last_activity': None, + 'experiment_versions': '', + }) + + session.message_count = stats['total_messages'] + session.last_message_created_at = stats['last_activity'] + session.experiment_versions = stats['experiment_versions'] + + sessions_with_stats.append(session) + + return sessions_with_stats + + def get_table(self, **kwargs): + """When viewing sessions for a specific chatbot, hide the chatbot column.""" + table = super().get_table(**kwargs) + if self.kwargs.get("experiment_id"): + table.exclude = ("chatbot",) + return table +``` + +--- + +## Management Commands + +### 1. Refresh Statistics + +```python +# apps/experiments/management/commands/refresh_statistics.py + +from django.core.management.base import BaseCommand +from apps.experiments.tasks import ( + update_experiment_buckets, + update_session_buckets, + compress_experiment_buckets, + compress_session_buckets, +) + + +class Command(BaseCommand): + help = 'Refresh experiment and session statistics buckets' + + def add_arguments(self, parser): + parser.add_argument( + '--experiment-id', + type=int, + help='Refresh specific experiment' + ) + parser.add_argument( + '--session-id', + type=int, + help='Refresh specific session' + ) + parser.add_argument( + '--hours-back', + type=int, + default=24, + help='How many hours of data to process (default: 24)' + ) + parser.add_argument( + '--compress', + action='store_true', + help='Also run compression' + ) + parser.add_argument( + '--async', + action='store_true', + dest='use_async', + help='Run tasks asynchronously via Celery' + ) + + def handle(self, *args, **options): + experiment_id = options.get('experiment_id') + session_id = options.get('session_id') + hours_back = options['hours_back'] + use_async = options['use_async'] + + if experiment_id: + self.stdout.write(f"Refreshing experiment {experiment_id}...") + if use_async: + update_experiment_buckets.delay(experiment_id, hours_back) + self.stdout.write(self.style.SUCCESS("Task queued")) + else: + update_experiment_buckets(experiment_id, hours_back) + self.stdout.write(self.style.SUCCESS("Done")) + + elif session_id: + self.stdout.write(f"Refreshing session {session_id}...") + if use_async: + update_session_buckets.delay(session_id, hours_back) + self.stdout.write(self.style.SUCCESS("Task queued")) + else: + update_session_buckets(session_id, hours_back) + self.stdout.write(self.style.SUCCESS("Done")) + + else: + self.stdout.write("Refreshing all statistics...") + if use_async: + update_experiment_buckets.delay(hours_back=hours_back) + update_session_buckets.delay(hours_back=hours_back) + self.stdout.write(self.style.SUCCESS("Tasks queued")) + else: + update_experiment_buckets(hours_back=hours_back) + update_session_buckets(hours_back=hours_back) + self.stdout.write(self.style.SUCCESS("Done")) + + if options['compress']: + self.stdout.write("Running compression...") + if use_async: + compress_experiment_buckets.delay() + compress_session_buckets.delay() + self.stdout.write(self.style.SUCCESS("Compression tasks queued")) + else: + compress_experiment_buckets() + compress_session_buckets() + self.stdout.write(self.style.SUCCESS("Compression done")) +``` + +### 2. Backfill Statistics + +```python +# apps/experiments/management/commands/backfill_statistics.py + +from datetime import datetime +from django.core.management.base import BaseCommand +from django.utils import timezone +from apps.experiments.tasks import backfill_statistics + + +class Command(BaseCommand): + help = 'Backfill statistics buckets from existing data' + + def add_arguments(self, parser): + parser.add_argument( + '--experiment-id', + type=int, + help='Backfill specific experiment' + ) + parser.add_argument( + '--session-id', + type=int, + help='Backfill specific session' + ) + parser.add_argument( + '--start-date', + type=str, + help='Start date for backfill (YYYY-MM-DD)' + ) + parser.add_argument( + '--async', + action='store_true', + dest='use_async', + help='Run task asynchronously via Celery' + ) + + def handle(self, *args, **options): + experiment_id = options.get('experiment_id') + session_id = options.get('session_id') + start_date_str = options.get('start_date') + use_async = options['use_async'] + + start_date = None + if start_date_str: + start_date = datetime.strptime(start_date_str, '%Y-%m-%d').date() + + if use_async: + backfill_statistics.delay( + experiment_id=experiment_id, + session_id=session_id, + start_date=start_date + ) + self.stdout.write(self.style.SUCCESS("Backfill task queued")) + else: + self.stdout.write("Starting backfill...") + backfill_statistics( + experiment_id=experiment_id, + session_id=session_id, + start_date=start_date + ) + self.stdout.write(self.style.SUCCESS("Backfill complete")) +``` + +### 3. Show Statistics + +```python +# apps/experiments/management/commands/show_statistics.py + +from django.core.management.base import BaseCommand +from apps.experiments.models import ( + Experiment, + ExperimentSession, + ExperimentStatisticsBucket, + SessionStatisticsBucket, +) + + +class Command(BaseCommand): + help = 'Show statistics for an experiment or session' + + def add_arguments(self, parser): + parser.add_argument( + '--experiment-id', + type=int, + help='Show statistics for specific experiment' + ) + parser.add_argument( + '--session-id', + type=int, + help='Show statistics for specific session' + ) + parser.add_argument( + '--buckets', + action='store_true', + help='Show individual bucket details' + ) + + def handle(self, *args, **options): + experiment_id = options.get('experiment_id') + session_id = options.get('session_id') + show_buckets = options['buckets'] + + if experiment_id: + self._show_experiment_stats(experiment_id, show_buckets) + elif session_id: + self._show_session_stats(session_id, show_buckets) + else: + self.stdout.write(self.style.ERROR("Provide --experiment-id or --session-id")) + + def _show_experiment_stats(self, experiment_id, show_buckets): + try: + experiment = Experiment.objects.get(id=experiment_id) + except Experiment.DoesNotExist: + self.stdout.write(self.style.ERROR(f"Experiment {experiment_id} not found")) + return + + self.stdout.write(self.style.SUCCESS(f"\nExperiment: {experiment.name}")) + self.stdout.write("-" * 60) + + # Get totals + totals = ExperimentStatisticsBucket.objects.get_totals_for_experiment(experiment) + + self.stdout.write(f"Total Sessions: {totals['total_sessions']}") + self.stdout.write(f"Total Participants: {totals['total_participants']}") + self.stdout.write(f"Total Messages: {totals['total_messages']}") + self.stdout.write(f"Last Activity: {totals['last_activity']}") + + if show_buckets: + self.stdout.write(f"\nBuckets:") + buckets = ExperimentStatisticsBucket.objects.filter( + experiment=experiment + ).order_by('-bucket_start') + + for bucket in buckets: + self.stdout.write( + f" {bucket.bucket_size:6s} | {bucket.bucket_start} | " + f"Sessions: {bucket.session_count:4d} | " + f"Messages: {bucket.human_message_count:5d}" + ) + + def _show_session_stats(self, session_id, show_buckets): + try: + session = ExperimentSession.objects.get(id=session_id) + except ExperimentSession.DoesNotExist: + self.stdout.write(self.style.ERROR(f"Session {session_id} not found")) + return + + self.stdout.write(self.style.SUCCESS(f"\nSession: {session.external_id}")) + self.stdout.write("-" * 60) + + # Get totals + totals = SessionStatisticsBucket.objects.get_totals_for_session(session) + + self.stdout.write(f"Total Messages: {totals['total_messages']}") + self.stdout.write(f"Last Activity: {totals['last_activity']}") + self.stdout.write(f"Versions: {totals['experiment_versions']}") + + if show_buckets: + self.stdout.write(f"\nBuckets:") + buckets = SessionStatisticsBucket.objects.filter( + session=session + ).order_by('-bucket_start') + + for bucket in buckets: + self.stdout.write( + f" {bucket.bucket_size:6s} | {bucket.bucket_start} | " + f"Messages: {bucket.human_message_count:5d}" + ) +``` + +--- + +## Migration Plan + +### Phase 1: Foundation (Week 1) + +#### Tasks + +1. Add `cached_experiment_versions` field to `ExperimentSession` model +2. Create models (`ExperimentStatisticsBucket`, `SessionStatisticsBucket`) +3. Create migration files for models +4. Add `statistics_config.py` with compression policy +5. Create management commands + +#### Deliverables + +- Django migrations (including session field and bucket tables) +- Empty tables ready for data +- Management commands for manual control + +#### Testing +```bash +# Create migrations +python manage.py makemigrations experiments + +# Apply migrations +python manage.py migrate + +# Verify tables exist +python manage.py dbshell +\dt experiments_*_bucket +\d experiments_experimentsession +``` + +### Phase 2: Backfill (Week 1-2) + +#### Tasks + +1. Implement backfill task +2. Test on staging environment +3. Run backfill for production data (in batches) + +#### Deliverables + +- Historical data loaded into buckets +- Verified accuracy against live queries + +#### Testing +```bash +# Backfill single experiment (test) +python manage.py backfill_statistics --experiment-id=1 + +# Verify results +python manage.py show_statistics --experiment-id=1 --buckets + +# Backfill all (production) +python manage.py backfill_statistics --async +``` + +### Phase 3: Scheduled Updates (Week 2) + +#### Tasks +1. Implement update tasks (`update_experiment_buckets`, `update_session_buckets`) +2. Add to SCHEDULED_TASKS +3. Monitor task execution + +#### Deliverables +- Automated bucket updates every 2 minutes +- Monitoring/logging in place + +#### Testing +```bash +# Run manually first +python manage.py refresh_statistics --hours-back=1 + +# Enable scheduled tasks +python manage.py setup_periodic_tasks + +# Monitor Celery logs +inv celery +``` + +### Phase 4: Compression (Week 2) + +#### Tasks +1. Implement compression tasks +2. Test compression on old data +3. Add to SCHEDULED_TASKS (hourly) + +#### Deliverables +- Automatic compression of old buckets +- Reduced storage footprint + +#### Testing +```bash +# Run compression manually +python manage.py refresh_statistics --compress + +# Verify compression +python manage.py show_statistics --experiment-id=1 --buckets +# Should see daily/monthly buckets for old data +``` + +### Phase 5: View Integration (Week 3) + +#### Tasks +1. Update `ChatbotExperimentTableView` to use buckets +2. Update `ChatbotSessionsTableView` to use buckets +3. Add fallback logic for missing cache +4. Performance testing + +#### Deliverables +- Views using cached statistics +- Fast page loads (< 2 seconds) +- Graceful degradation + +#### Testing +```bash +# Load test page +# Measure query count and time +# Compare before/after performance +``` + +### Phase 6: Monitoring & Optimization (Week 3-4) + +#### Tasks +1. Add monitoring for cache freshness +2. Tune batch sizes and intervals +3. Add admin interface for cache management +4. Document for team + +#### Deliverables +- Production-ready system +- Monitoring dashboards +- Team documentation + +--- + +## Testing Strategy + +### Unit Tests + +```python +# apps/experiments/tests/test_statistics_buckets.py + +import pytest +from datetime import datetime, timedelta +from django.utils import timezone +from apps.experiments.models import ( + ExperimentStatisticsBucket, + SessionStatisticsBucket, +) +from apps.utils.factories.experiments import ( + ExperimentFactory, + ExperimentSessionFactory, +) +from apps.utils.factories.chat import ChatMessageFactory +from apps.chat.models import ChatMessageType + + +@pytest.mark.django_db +class TestExperimentStatisticsBucket: + def test_get_bucket_boundaries_hour(self): + """Test hourly bucket boundary calculation.""" + dt = datetime(2024, 1, 15, 14, 35, 22, tzinfo=timezone.utc) + start, end = ExperimentStatisticsBucket.get_bucket_boundaries( + dt, + ExperimentStatisticsBucket.BucketSize.HOUR + ) + + assert start == datetime(2024, 1, 15, 14, 0, 0, tzinfo=timezone.utc) + assert end == datetime(2024, 1, 15, 15, 0, 0, tzinfo=timezone.utc) + + def test_get_bucket_boundaries_day(self): + """Test daily bucket boundary calculation.""" + dt = datetime(2024, 1, 15, 14, 35, 22, tzinfo=timezone.utc) + start, end = ExperimentStatisticsBucket.get_bucket_boundaries( + dt, + ExperimentStatisticsBucket.BucketSize.DAY + ) + + assert start == datetime(2024, 1, 15, 0, 0, 0, tzinfo=timezone.utc) + assert end == datetime(2024, 1, 16, 0, 0, 0, tzinfo=timezone.utc) + + def test_get_bucket_boundaries_month(self): + """Test monthly bucket boundary calculation.""" + dt = datetime(2024, 1, 15, 14, 35, 22, tzinfo=timezone.utc) + start, end = ExperimentStatisticsBucket.get_bucket_boundaries( + dt, + ExperimentStatisticsBucket.BucketSize.MONTH + ) + + assert start == datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc) + assert end == datetime(2024, 2, 1, 0, 0, 0, tzinfo=timezone.utc) + + def test_get_or_create_bucket(self): + """Test bucket creation.""" + experiment = ExperimentFactory() + dt = timezone.now() + + bucket, created = ExperimentStatisticsBucket.get_or_create_bucket( + experiment, + dt, + ExperimentStatisticsBucket.BucketSize.HOUR + ) + + assert created is True + assert bucket.experiment == experiment + assert bucket.bucket_size == ExperimentStatisticsBucket.BucketSize.HOUR + + # Second call should return existing bucket + bucket2, created2 = ExperimentStatisticsBucket.get_or_create_bucket( + experiment, + dt, + ExperimentStatisticsBucket.BucketSize.HOUR + ) + + assert created2 is False + assert bucket2.id == bucket.id + + def test_get_totals_for_experiment(self): + """Test aggregating totals across buckets.""" + experiment = ExperimentFactory() + + # Create some buckets with data + now = timezone.now() + for i in range(3): + hour = now - timedelta(hours=i) + bucket, _ = ExperimentStatisticsBucket.get_or_create_bucket( + experiment, + hour, + ExperimentStatisticsBucket.BucketSize.HOUR + ) + bucket.session_count = 5 + bucket.human_message_count = 10 * (i + 1) + bucket.last_activity_at = hour + bucket.save() + + # Get totals + totals = ExperimentStatisticsBucket.objects.get_totals_for_experiment(experiment) + + assert totals['total_sessions'] == 15 # 5 + 5 + 5 + assert totals['total_messages'] == 60 # 10 + 20 + 30 + assert totals['last_activity'] is not None + + +@pytest.mark.django_db +class TestCompressionPolicy: + def test_hourly_to_daily_cutoff(self): + """Test hourly -> daily compression cutoff calculation.""" + from apps.experiments.statistics_config import CompressionPolicy + from dateutil.relativedelta import relativedelta + + cutoff = CompressionPolicy.get_hourly_to_daily_cutoff() + expected = timezone.now() - relativedelta(days=1) + + # Cutoff should be approximately 1 day ago (within 1 second) + assert abs((cutoff - expected).total_seconds()) < 1 + + def test_daily_to_monthly_cutoff(self): + """Test daily -> monthly compression cutoff calculation.""" + from apps.experiments.statistics_config import CompressionPolicy + from dateutil.relativedelta import relativedelta + + cutoff = CompressionPolicy.get_daily_to_monthly_cutoff() + expected = timezone.now() - relativedelta(days=30) + + # Cutoff should be approximately 30 days ago (within 1 second) + assert abs((cutoff - expected).total_seconds()) < 1 + + +@pytest.mark.django_db +class TestCompressionTasks: + def test_compress_hourly_to_daily(self): + """Test compressing multiple hourly buckets into daily.""" + from apps.experiments.tasks import _compress_hourly_to_daily + + experiment = ExperimentFactory() + target_date = (timezone.now() - timedelta(days=2)).date() + + # Create 24 hourly buckets for the target date + for hour in range(24): + dt = timezone.make_aware( + datetime.combine(target_date, datetime.min.time()) + ) + timedelta(hours=hour) + + bucket, _ = ExperimentStatisticsBucket.get_or_create_bucket( + experiment, + dt, + ExperimentStatisticsBucket.BucketSize.HOUR + ) + bucket.session_count = 1 + bucket.human_message_count = 10 + bucket.last_activity_at = dt + bucket.save() + + # Verify 24 hourly buckets exist + hourly_count = ExperimentStatisticsBucket.objects.filter( + experiment=experiment, + bucket_size=ExperimentStatisticsBucket.BucketSize.HOUR + ).count() + assert hourly_count == 24 + + # Compress + _compress_hourly_to_daily(experiment.id, target_date) + + # Verify compression + hourly_count = ExperimentStatisticsBucket.objects.filter( + experiment=experiment, + bucket_size=ExperimentStatisticsBucket.BucketSize.HOUR + ).count() + assert hourly_count == 0 + + daily_buckets = ExperimentStatisticsBucket.objects.filter( + experiment=experiment, + bucket_size=ExperimentStatisticsBucket.BucketSize.DAY + ) + assert daily_buckets.count() == 1 + + daily_bucket = daily_buckets.first() + assert daily_bucket.session_count == 24 + assert daily_bucket.human_message_count == 240 +``` + +### Integration Tests + +```python +# apps/experiments/tests/test_statistics_integration.py + +@pytest.mark.django_db +class TestStatisticsIntegration: + def test_end_to_end_workflow(self): + """Test complete workflow from messages to cached statistics.""" + from apps.experiments.tasks import ( + update_experiment_buckets, + compress_experiment_buckets, + ) + + # Create experiment with sessions and messages + experiment = ExperimentFactory() + session1 = ExperimentSessionFactory(experiment=experiment) + session2 = ExperimentSessionFactory(experiment=experiment) + + # Create messages at different times + now = timezone.now() + for i in range(5): + ChatMessageFactory( + chat=session1.chat, + message_type=ChatMessageType.HUMAN, + created_at=now - timedelta(hours=i) + ) + + for i in range(3): + ChatMessageFactory( + chat=session2.chat, + message_type=ChatMessageType.HUMAN, + created_at=now - timedelta(hours=i) + ) + + # Update buckets + update_experiment_buckets(experiment.id, hours_back=24) + + # Verify buckets created + buckets = ExperimentStatisticsBucket.objects.filter(experiment=experiment) + assert buckets.exists() + + # Get totals + totals = ExperimentStatisticsBucket.objects.get_totals_for_experiment(experiment) + assert totals['total_messages'] == 8 + assert totals['total_sessions'] == 2 +``` + +### Performance Tests + +```python +# apps/experiments/tests/test_statistics_performance.py + +@pytest.mark.django_db +class TestStatisticsPerformance: + def test_query_performance_with_buckets(self): + """Verify bucket-based queries are fast.""" + from django.test.utils import override_settings + from django.db import connection + from django.test import TestCase + + # Create 100 experiments with data + experiments = [] + for i in range(100): + exp = ExperimentFactory() + experiments.append(exp) + + # Create some buckets + for j in range(30): # 30 days of daily buckets + dt = timezone.now() - timedelta(days=j) + bucket, _ = ExperimentStatisticsBucket.get_or_create_bucket( + exp, + dt, + ExperimentStatisticsBucket.BucketSize.DAY + ) + bucket.session_count = 5 + bucket.human_message_count = 50 + bucket.save() + + # Measure query performance + experiment_ids = [e.id for e in experiments] + + # Reset query counter + from django.db import reset_queries + reset_queries() + + with override_settings(DEBUG=True): + # Get totals for all experiments + stats_map = ExperimentStatisticsBucket.objects.get_totals_for_experiments( + experiment_ids + ) + + # Should be fast (< 10 queries) + query_count = len(connection.queries) + assert query_count < 10 + + # Verify results + assert len(stats_map) == 100 + for exp_id, stats in stats_map.items(): + assert stats['total_sessions'] == 150 # 5 * 30 + assert stats['total_messages'] == 1500 # 50 * 30 +``` + +--- + +## Monitoring & Observability + +### Metrics to Track + +```python +# apps/experiments/monitoring.py + +import logging +from django.core.cache import cache +from django.utils import timezone + +logger = logging.getLogger('ocs.experiments.statistics') + + +def log_bucket_update(bucket_type, count, duration): + """Log bucket update performance.""" + logger.info( + f"Updated {count} {bucket_type} buckets in {duration:.2f}s", + extra={ + 'bucket_type': bucket_type, + 'count': count, + 'duration_seconds': duration, + } + ) + + +def log_compression(bucket_type, source_count, target_count, duration): + """Log compression performance.""" + logger.info( + f"Compressed {source_count} {bucket_type} buckets to {target_count} in {duration:.2f}s", + extra={ + 'bucket_type': bucket_type, + 'source_count': source_count, + 'target_count': target_count, + 'duration_seconds': duration, + 'compression_ratio': source_count / max(target_count, 1), + } + ) + + +def get_cache_health(): + """Get health metrics for statistics cache.""" + from apps.experiments.models import ( + ExperimentStatisticsBucket, + SessionStatisticsBucket, + ) + + now = timezone.now() + one_hour_ago = now - timedelta(hours=1) + + # Count recent bucket updates + recent_experiment_updates = ExperimentStatisticsBucket.objects.filter( + last_updated_at__gte=one_hour_ago + ).count() + + recent_session_updates = SessionStatisticsBucket.objects.filter( + last_updated_at__gte=one_hour_ago + ).count() + + # Count buckets by size + exp_bucket_counts = {} + for size in ExperimentStatisticsBucket.BucketSize: + exp_bucket_counts[size] = ExperimentStatisticsBucket.objects.filter( + bucket_size=size + ).count() + + return { + 'recent_experiment_updates': recent_experiment_updates, + 'recent_session_updates': recent_session_updates, + 'experiment_buckets_by_size': exp_bucket_counts, + 'timestamp': now, + } +``` + +### Admin Interface + +```python +# apps/experiments/admin.py + +from django.contrib import admin +from apps.experiments.models import ( + ExperimentStatisticsBucket, + SessionStatisticsBucket, +) + + +@admin.register(ExperimentStatisticsBucket) +class ExperimentStatisticsBucketAdmin(admin.ModelAdmin): + list_display = [ + 'experiment', + 'bucket_size', + 'bucket_start', + 'session_count', + 'human_message_count', + 'last_updated_at', + ] + list_filter = [ + 'bucket_size', + 'bucket_start', + ] + search_fields = ['experiment__name'] + readonly_fields = [ + 'last_updated_at', + 'bucket_start', + 'bucket_end', + ] + date_hierarchy = 'bucket_start' + + actions = ['refresh_buckets'] + + def refresh_buckets(self, request, queryset): + """Manually refresh selected buckets.""" + from apps.experiments.tasks import update_experiment_buckets + + experiment_ids = queryset.values_list('experiment_id', flat=True).distinct() + for exp_id in experiment_ids: + update_experiment_buckets.delay(exp_id) + + self.message_user( + request, + f"Queued refresh for {len(experiment_ids)} experiments" + ) + refresh_buckets.short_description = "Refresh selected buckets" + + +@admin.register(SessionStatisticsBucket) +class SessionStatisticsBucketAdmin(admin.ModelAdmin): + list_display = [ + 'session', + 'bucket_size', + 'bucket_start', + 'human_message_count', + 'last_updated_at', + ] + list_filter = [ + 'bucket_size', + 'bucket_start', + ] + readonly_fields = [ + 'last_updated_at', + 'bucket_start', + 'bucket_end', + ] + date_hierarchy = 'bucket_start' +``` + +--- + +## Rollback Plan + +If issues arise, the system can be safely rolled back: + +### Option 1: Disable Scheduled Tasks + +```bash +# Temporarily disable statistics tasks +# Comment out tasks in SCHEDULED_TASKS +python manage.py setup_periodic_tasks +``` + +### Option 2: Revert View Changes + +```python +# Revert ChatbotExperimentTableView.get_table_data() to use original subqueries +# Keep bucket tables but don't use them +``` + +### Option 3: Full Rollback + +```bash +# Drop bucket tables +python manage.py migrate experiments + +# Revert code changes +git revert +``` + +--- + +## Success Metrics + +1. **Performance**: + - Table load time < 2 seconds (target: < 1 second) + - Database queries per page < 10 (currently 100+) + +2. **Accuracy**: + - Statistics match live queries within acceptable margin + - Cache staleness < 2 minutes for active experiments + +3. **Storage**: + - Bucket count stays manageable (< 1M rows after compression) + - Storage growth controlled via compression policy + +4. **Reliability**: + - Scheduled tasks run without errors + - Compression completes within scheduled window + - No data loss during compression + +--- + +## Future Enhancements + +1. **Real-time Updates**: Add Django signals to update buckets on message creation +2. **Trend Visualization**: Add charts showing activity over time +3. **Custom Retention**: Per-experiment compression policies +4. **Query Optimization**: Add materialized views for frequently accessed aggregates +5. **Export**: Allow exporting bucket data for analysis + +--- + +## Conclusion + +This implementation plan provides a complete, production-ready solution for time-bucketed statistics caching. The approach: + +- ✅ **Solves the performance problem** with efficient bucket-based aggregation +- ✅ **Preserves historical data** with smart compression +- ✅ **Scales efficiently** as data grows +- ✅ **Requires no denormalization** (simpler than hybrid approach) +- ✅ **Provides clear migration path** with phased rollout +- ✅ **Includes comprehensive testing** and monitoring + +The static compression policy (hourly → daily → monthly) provides a good balance between query performance, storage efficiency, and implementation complexity.