Add caching for chatbot experiment data #2473
base: main
Conversation
- Proposes SQL-based caching tables for experiment and session statistics
- Three update strategies: scheduled refresh, incremental, and hybrid
- Includes data compression and archival strategies
- Provides detailed implementation plans, migration phases, and a testing approach
- Addresses performance, scalability, and maintainability concerns
- Add Option B: time-bucketed statistics with hourly/daily/weekly/monthly granularity
- Add Option C: hybrid approach combining buckets with denormalized totals
- Include a comprehensive comparison table between single-row and time-bucketed approaches
- Add bucket compression policies for automatic data lifecycle management
- Show how time buckets enable trend analysis and efficient incremental updates
- Provide a clear decision matrix for choosing between options
- Recommend starting with simple Option A, migrating to Option C when needed
- Add code examples for bucket creation, compression, and trend queries
Complete plan for Option B (time-bucketed) with a simplified approach:
- No denormalized totals tables (query by summing buckets)
- Static compression policy: 1 day hourly, 30 days daily, monthly after that (sketched below)
- Never delete data, only compress into larger buckets
- Two models: ExperimentStatisticsBucket, SessionStatisticsBucket
- Manager methods for efficient batch queries
- Celery tasks for updating and compressing buckets
- Management commands for manual control and monitoring
- Comprehensive testing strategy
- 6-phase migration plan with clear deliverables
- Admin interface for monitoring
- Rollback plan and success metrics
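A minimal sketch of the static compression policy described above, expressed as constants a Celery compression task could read. The constant and tuple names here are illustrative assumptions, not taken from the plan itself.

```python
from datetime import timedelta

# Static compression policy (illustrative): keep hourly buckets for 1 day and
# daily buckets for 30 days, then roll older data into monthly buckets.
# Data is never deleted, only compressed into larger buckets.
HOURLY_RETENTION = timedelta(days=1)
DAILY_RETENTION = timedelta(days=30)

COMPRESSION_STEPS = [
    ("hour", "day", HOURLY_RETENTION),   # hourly -> daily after 1 day
    ("day", "month", DAILY_RETENTION),   # daily -> monthly after 30 days
]
```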
Key improvements per feedback:
- Use SQL aggregation (TruncHour, GROUP BY) instead of Python loops in _update_experiment_buckets
- Fix new_participant_count to count participants by first session created_at in the bucket
- Remove the is_finalized field entirely; it is not needed for a static compression policy
- Simplify indexes and remove redundant ones (unique_together already creates an index)
- Clean up admin actions, tests, and management commands accordingly
Key improvements based on feedback:
- Manager methods now sum new_participant_count from buckets instead of querying sessions
- Update tasks use SQL aggregation keyed by truncated date (no Python loops)
- Participant counting uses the ParticipantData model for accuracy
- Compression tasks filter old buckets directly in SQL
- Use dateutil.relativedelta for date calculations
- Bulk update operations to minimize database round-trips
Store experiment versions directly in SessionStatisticsBucket using ArrayField:
- Add experiment_versions ArrayField to the SessionStatisticsBucket model
- Update _update_session_buckets to populate versions from CustomTaggedItem
- Update SessionStatisticsBucketManager methods to aggregate versions from buckets
- Update compression to combine versions from hourly buckets into daily buckets
- Update view integration to include experiment_versions in cached data
- Eliminates the separate query to CustomTaggedItem when displaying session stats

This optimization caches experiment version data alongside other statistics, further reducing database queries when loading session tables.
📝 Walkthrough

Two design and implementation plan documents have been added to guide the development of a statistics caching system for experiments and sessions. The first document outlines problem statements related to expensive queries, design goals, and three architectural options (Single-Row Aggregates, Time-Bucketed Data, and Hybrid) with schema proposals, update strategies, and graceful degradation logic. The second document provides a detailed, production-ready implementation plan focused on time-bucketed statistics with SQL-centric aggregation, automatic bucket compression, Celery-based tasks, and a phased rollout strategy. Both include Django model sketches, integration points with existing views, migration guidance, monitoring considerations, and testing strategies. No code changes or API alterations are present.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~35–40 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 3
🧹 Nitpick comments (10)
docs/experiment-statistics-cache-design.md (5)
522-545: Use proper heading levels instead of bold emphasis. Lines 522, 533, 546, 579, 1375, and 1390 use bold emphasis (`**text**`) where Markdown headings (`### text`) would be more appropriate for section organization. This improves document structure and navigation.

Example fix (line 522): `**1. Natural Incremental Updates**` → `### 1. Natural Incremental Updates`
1338-1338: Add language identifiers to all fenced code blocks. Lines 1338, 1486, and 1501 open fenced code blocks without a language after the backticks, which affects syntax highlighting and documentation rendering.

Examples:
- Line 1338: the bare fence should specify `text` or `python`
- Lines 1486-1497: the bare fence should specify `sql`
- Lines 1501-1508: the bare fence should specify `sql`

Also applies to: 1486-1486, 1501-1501
1010-1016: Implement fallbackcalculate_experiment_statistics_live()function.Line 1010 references
calculate_experiment_statistics_live(experiment)but it's only shown as a stub (passon line 1016). This function is critical for graceful degradation.Provide a complete implementation or clearly document:
- What queries it should run (likely the original subqueries from
ChatbotExperimentTableView)- The object interface it should return (must match cached statistics interface)
- Whether it should be async-safe for concurrent requests
Would you like me to provide a reference implementation of the fallback calculation function?
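A minimal sketch of what such a fallback might look like, assuming the experiment exposes a `sessions` reverse relation, each session links to a `chat` with `messages`, and human messages are identified by `ChatMessageType.HUMAN`; the `ExperimentStatistics` namedtuple, its field names, and the import path are hypothetical and only illustrate the shape the cached interface would need to match.

```python
from collections import namedtuple

from django.db.models import Count, Max, Q

from apps.chat.models import ChatMessageType  # assumed import path

# Hypothetical return type; field names must mirror the cached statistics interface.
ExperimentStatistics = namedtuple(
    "ExperimentStatistics",
    ["session_count", "participant_count", "human_message_count", "last_activity_at"],
)


def calculate_experiment_statistics_live(experiment):
    """Fallback: compute statistics directly from source tables when no cache exists."""
    aggregates = experiment.sessions.aggregate(
        session_count=Count("id", distinct=True),
        participant_count=Count("participant", distinct=True),
        human_message_count=Count(
            "chat__messages",
            filter=Q(chat__messages__message_type=ChatMessageType.HUMAN),
        ),
        last_activity_at=Max("chat__messages__created_at"),
    )
    return ExperimentStatistics(**aggregates)
```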
1437-1437: Fix hyphenation in text. Line 1437: "low risk, incremental delivery" should use a hyphen for compound adjectives before a noun: "low-risk, incremental delivery".
55-55: Replace weak intensifiers with stronger alternatives. Static analysis flags the use of "very" in several places (lines 55, 870, 902, 1415) as an over-used intensifier. Consider:
- Line 55: "Very simple" → "Simple" or "Straightforward"
- Line 870: "very old, completed sessions" → "old completed sessions" or "historic completed sessions"
- Line 902: "very old experiments" → "historic experiments" or "aged experiments"
- Line 1415: "very large data volumes" → "large data volumes" or "high data volumes"
Also applies to: 870-870, 902-902, 1415-1415
docs/time-bucketed-statistics-implementation-plan.md (5)
1110-1145: Use database-level sorting in view queries. Lines 1140-1143 sort experiments in Python after fetching, which only sorts the current page's result set. If users paginate, the sort order may vary between pages, and the sort is inefficient.

Preferred approach: use Django QuerySet annotation with `F()` expressions to sort at the database level:

```python
from django.db.models import F, Case, When

def get_table_data(self):
    queryset = super().get_table_data()

    # Fetch stats in one batch
    experiment_ids = list(queryset.values_list('id', flat=True))
    stats_map = ExperimentStatisticsBucket.objects.get_totals_for_experiments(experiment_ids)

    # Annotate with stats and preserve database-level sorting
    # (or use prefetch_related with a custom query)
    return queryset  # Let the database handle ordering via default queryset ordering
```

If stats must be fetched post-query, consider a `preserve_order_with_stats()` helper to maintain a consistent sort across pages.
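One concrete way to keep the ordering in the database, sketched under the assumption that `ExperimentStatisticsBucket` has an `experiment` foreign key and a `last_activity_at` field (this is the direction a later revision of the plan takes, per the commit notes below):

```python
from django.db.models import Max, OuterRef, Subquery

def get_table_data(self):
    queryset = super().get_table_data()

    # Latest cached activity per experiment, computed in a correlated subquery
    last_activity = (
        ExperimentStatisticsBucket.objects
        .filter(experiment=OuterRef('pk'))
        .values('experiment')
        .annotate(last=Max('last_activity_at'))
        .values('last')[:1]
    )

    # Annotate and order in SQL so pagination stays consistent across pages
    return queryset.annotate(
        cached_last_activity=Subquery(last_activity)
    ).order_by('-cached_last_activity')
```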
505-505: Remove unused imports. Line 505 imports `OuterRef` and `Subquery`, but they are not used in `_update_experiment_buckets()`. Remove them to clean up the code:

```diff
- from django.db.models import Count, Max, OuterRef, Subquery
+ from django.db.models import Count, Max
```
217-217: Use idiomatic Django pattern for model managers. Lines 217 and 391 use `add_to_class()` to attach managers to models:

```python
ExperimentStatisticsBucket.add_to_class('objects', ExperimentStatisticsBucketManager())
SessionStatisticsBucket.add_to_class('objects', SessionStatisticsBucketManager())
```

While this works, the idiomatic Django pattern is to define the manager on the model class directly:

```python
class ExperimentStatisticsBucket(BaseTeamModel):
    objects = ExperimentStatisticsBucketManager()
    ...
```

`add_to_class()` is typically used for dynamic additions in edge cases. For model definitions, class-level assignment is clearer and more maintainable. Check whether this pattern (`add_to_class()` for managers) is used elsewhere in the codebase; if not, prefer the standard approach.

Also applies to: 391-391
874-874: Clarify datetime import in compression function. Line 874 imports `datetime` inside `_compress_session_hourly_to_daily()`:

```python
def _compress_session_hourly_to_daily(session_id, date):
    ...
    from datetime import datetime
    ...
    day_start = timezone.make_aware(datetime.combine(date, datetime.min.time()))
```

While this works, importing at function scope is unconventional. Move it to the top-level imports alongside the other datetime imports for consistency:

```python
from datetime import datetime, timedelta
```
1-10: Provide a clear assumptions and prerequisites section. The document jumps into "Design Decisions" without explicitly stating prerequisites or assumptions. Add an "Assumptions & Prerequisites" section early (after the Overview) that covers:

- Database: PostgreSQL required (due to ArrayField)
- Models: assumes `ParticipantData`, `CustomTaggedItem`, `Chat`, and `ChatMessage` models exist with specific relationships
- Django version: minimum version required for the features used
- Celery: assumes Celery is already set up with periodic tasks
- Compatibility: list which other existing models/views this depends on

This helps implementers verify they're ready before starting.
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `docs/experiment-statistics-cache-design.md` (1 hunks)
- `docs/time-bucketed-statistics-implementation-plan.md` (1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/experiment-statistics-cache-design.md
[style] ~55-~55: As an alternative to the over-used intensifier ‘very’, consider replacing this phrase.
Context: ...-----------| | Query Simplicity | ✅ Very simple (single row) |
(EN_WEAK_ADJECTIVE)
[style] ~870-~870: As an alternative to the over-used intensifier ‘very’, consider replacing this phrase.
Context: ...elete or archive session statistics for very old, completed sessions. *Implementation...
(EN_WEAK_ADJECTIVE)
[style] ~902-~902: As an alternative to the over-used intensifier ‘very’, consider replacing this phrase.
Context: ...regation Compression Approach: For very old experiments, aggregate statistics by ti...
(EN_WEAK_ADJECTIVE)
[style] ~1415-~1415: As an alternative to the over-used intensifier ‘very’, consider replacing this phrase.
Context: ...analysis / historical data - ✅ You have very large data volumes (millions of messages) - ✅...
(EN_WEAK_ADJECTIVE)
[grammar] ~1437-~1437: Use a hyphen to join words.
Context: ...ple, maintainable code initially - ✅ Low risk, incremental delivery - ✅ Clear mig...
(QB_NEW_EN_HYPHEN)
🪛 markdownlint-cli2 (0.18.1)
docs/experiment-statistics-cache-design.md
522-522: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
533-533: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
546-546: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
579-579: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
1338-1338: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1375-1375: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
1390-1390: Emphasis used instead of a heading
(MD036, no-emphasis-as-heading)
1486-1486: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
1501-1501: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (3)
docs/experiment-statistics-cache-design.md (1)
687-687: Verify PostgreSQL-specific dependencies. Line 698 imports `StringAgg` from `django.contrib.postgres`, which is PostgreSQL-specific. The document should clarify whether this codebase:
- Supports PostgreSQL exclusively, or
- Needs to support multiple databases (MySQL, SQLite for dev, etc.)
If multi-database support is needed, this aggregation function will need a conditional import or alternative implementation.
Also applies to: 698-698
docs/time-bucketed-statistics-implementation-plan.md (2)
225-225: Clarify PostgreSQL dependency for ArrayField. Line 225 imports `ArrayField` from `django.contrib.postgres`, and line 262 uses it in `SessionStatisticsBucket.experiment_versions`. This creates a hard PostgreSQL dependency.

Add a prominent note in the document (near the top or in an assumptions section):

Assumption: this implementation assumes PostgreSQL is the primary database. The `ArrayField` field type is PostgreSQL-specific. If multi-database support is required, consider using a `CharField` with comma-separated values or a separate `ExperimentVersionEntry` table instead.

Alternatively, make this conditional:

```python
if DATABASE_ENGINE == 'postgresql':
    experiment_versions = ArrayField(...)
else:
    experiment_versions = CharField(max_length=500)  # fallback
```

Also applies to: 262-267
533-540: Verify the ParticipantData model exists and is appropriate. Line 533 references a `ParticipantData` model:

```python
participant_stats = ParticipantData.objects.filter(
    experiment=experiment,
    created_at__gte=cutoff,
).annotate(
    hour=TruncHour('created_at')
).values('hour').annotate(
    participant_count=Count('id'),
)
```

This model is not shown in the codebase context. Verify:

- Does `ParticipantData` exist in `apps/participants/models.py`?
- Is it the right model to count unique participants per experiment per hour?
- Does it have a `created_at` field and a relationship to `experiment`?
- Is this the intended way to count participants (vs. counting distinct `ExperimentSession.participant`)?

The design doc (doc 1) uses `ExperimentSession.objects.values('participant').distinct().count()` for participant counting, which differs from this approach.
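For comparison, an hourly participant count derived from sessions rather than `ParticipantData` might look like the sketch below; it assumes `ExperimentSession` has `experiment`, `participant`, and `created_at` fields, as the design doc's counting query implies:

```python
from django.db.models import Count
from django.db.models.functions import TruncHour

# Distinct participants per hour, counted from sessions instead of ParticipantData
session_participant_stats = (
    ExperimentSession.objects
    .filter(experiment=experiment, created_at__gte=cutoff)
    .annotate(hour=TruncHour('created_at'))
    .values('hour')
    .annotate(participant_count=Count('participant', distinct=True))
)
```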
```python
        self.total_session_count = bucket_aggregates['total_sessions'] or 0
        self.total_participant_count = participant_count
        self.total_human_message_count = bucket_aggregates['total_messages'] or 0
        self.last_activity_at = bucket_aggregates['last_activity']
        self.oldest_bucket_start = bucket_aggregates['oldest_bucket']
        self.newest_bucket_end = bucket_aggregates['newest_bucket']
        self.save()


class SessionStatisticsTotals(BaseTeamModel):
    """
    Denormalized session totals for fast access.
    """

    session = models.OneToOneField(
        ExperimentSession,
        on_delete=models.CASCADE,
        related_name='statistics_totals',
        primary_key=True
    )

    total_human_message_count = models.IntegerField(default=0)
    last_activity_at = models.DateTimeField(null=True, blank=True)
    experiment_versions = models.CharField(max_length=500, blank=True)

    last_updated_at = models.DateTimeField(auto_now=True)
    is_complete = models.BooleanField(default=False)

    class Meta:
        db_table = 'experiments_session_statistics_totals'

    def refresh_from_buckets(self):
        """Recalculate totals from bucket data."""
        from django.db.models import Sum, Max

        bucket_aggregates = self.session.statistics_buckets.aggregate(
            total_messages=Sum('human_message_count'),
            last_activity=Max('last_activity_at'),
        )

        # Get experiment versions (stored separately)
        from django.contrib.contenttypes.models import ContentType
        from apps.annotations.models import CustomTaggedItem
        from apps.chat.models import Chat, ChatMessage

        message_ct = ContentType.objects.get_for_model(ChatMessage)
        versions = CustomTaggedItem.objects.filter(
            content_type=message_ct,
            object_id__in=ChatMessage.objects.filter(
                chat=self.session.chat
            ).values('id'),
            tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION,
        ).values_list('tag__name', flat=True).distinct().order_by('tag__name')
```
🛠️ Refactor suggestion | 🟠 Major
Simplify SessionStatisticsTotals.refresh_from_buckets() or document external dependencies.
The method at lines 444-471 imports and queries multiple internal models (ContentType, CustomTaggedItem, Chat). While functional, this creates tight coupling and maintenance burden. Consider:
- Option A: Move this logic to a manager method or utility function for better encapsulation
- Option B: Document the external dependencies clearly and ensure they're stable APIs
- Option C: Cache experiment versions in the bucket itself (similar to the time-bucketed doc's approach)
The time-bucketed implementation document (doc 2) handles this more elegantly by storing versions directly in buckets.
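For Option A, a rough sketch of pulling the tag lookup into a manager method; the manager class and the `versions_for_session` method name are hypothetical, and the query mirrors the one in `refresh_from_buckets()` above:

```python
from django.db import models


class CustomTaggedItemManager(models.Manager):
    def versions_for_session(self, session):
        """Return distinct experiment-version tag names for a session's messages."""
        from django.contrib.contenttypes.models import ContentType
        from apps.chat.models import Chat, ChatMessage

        message_ct = ContentType.objects.get_for_model(ChatMessage)
        return (
            self.filter(
                content_type=message_ct,
                object_id__in=ChatMessage.objects.filter(chat=session.chat).values('id'),
                tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION,
            )
            .values_list('tag__name', flat=True)
            .distinct()
            .order_by('tag__name')
        )
```

`refresh_from_buckets()` would then simply call `CustomTaggedItem.objects.versions_for_session(self.session)`, keeping the content-type and tag details out of the totals model.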
```python
def _update_session_buckets(session, cutoff):
    """Update buckets for a single session using SQL aggregation."""
    from django.db.models.functions import TruncHour
    from django.contrib.contenttypes.models import ContentType
    from apps.annotations.models import CustomTaggedItem
    from apps.chat.models import Chat

    # Aggregate message statistics keyed by hour in single SQL query
    message_stats = ChatMessage.objects.filter(
        chat=session.chat,
        message_type=ChatMessageType.HUMAN,
        created_at__gte=cutoff,
    ).annotate(
        hour=TruncHour('created_at')
    ).values('hour').annotate(
        message_count=Count('id'),
        last_activity=Max('created_at'),
    )

    # Get experiment versions per hour
    message_ct = ContentType.objects.get_for_model(ChatMessage)

    # Get all messages with their versions, grouped by hour
    messages_with_versions = ChatMessage.objects.filter(
        chat=session.chat,
        message_type=ChatMessageType.HUMAN,
        created_at__gte=cutoff,
    ).annotate(
        hour=TruncHour('created_at')
    ).values('id', 'hour')

    # Build a mapping of hour -> message IDs
    hour_message_ids = {}
    for msg in messages_with_versions:
        hour = msg['hour']
        if hour not in hour_message_ids:
            hour_message_ids[hour] = []
        hour_message_ids[hour].append(msg['id'])

    # Get versions for all messages
    hour_versions = {}
    for hour, message_ids in hour_message_ids.items():
        versions = CustomTaggedItem.objects.filter(
            content_type=message_ct,
            object_id__in=message_ids,
            tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION,
        ).values_list('tag__name', flat=True).distinct()
        hour_versions[hour] = sorted(set(versions))

    # Bulk create/update buckets
    buckets_to_update = []
    for stat in message_stats:
        hour_start = stat['hour']

        bucket, created = SessionStatisticsBucket.get_or_create_bucket(
            session,
            hour_start,
            SessionStatisticsBucket.BucketSize.HOUR
        )

        bucket.human_message_count = stat['message_count']
        bucket.last_activity_at = stat['last_activity']
        bucket.experiment_versions = hour_versions.get(hour_start, [])
        buckets_to_update.append(bucket)

    # Bulk update all buckets
    if buckets_to_update:
        SessionStatisticsBucket.objects.bulk_update(
            buckets_to_update,
            ['human_message_count', 'last_activity_at', 'experiment_versions']
        )

    logger.info(
        f"Updated {len(buckets_to_update)} hourly buckets for session {session.id}"
    )
```
🛠️ Refactor suggestion | 🟠 Major
Optimize session version aggregation to avoid N+1 queries.
Lines 647-654 have a potential N+1 query issue:
```python
for hour, message_ids in hour_message_ids.items():
    versions = CustomTaggedItem.objects.filter(
        content_type=message_ct,
        object_id__in=message_ids,
        tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION,
    ).values_list('tag__name', flat=True).distinct()
```

This executes one query per hour. For a session with messages spread across 24 hours, this is 24 queries. Consider optimizing:
Option 1: Fetch all versions at once and group in Python:
```python
from collections import defaultdict

all_versions = CustomTaggedItem.objects.filter(
    content_type=message_ct,
    object_id__in=[msg_id for ids in hour_message_ids.values() for msg_id in ids],
    tag__category=Chat.MetadataKeys.EXPERIMENT_VERSION,
).values_list('object_id', 'tag__name')

# Map each message ID back to its hour (from the earlier messages_with_versions query)
hour_message_ids_lookup = {msg['id']: msg['hour'] for msg in messages_with_versions}

# Build dict: {hour: {versions}}
hour_versions = defaultdict(set)
for msg_id, version in all_versions:
    hour = hour_message_ids_lookup[msg_id]
    hour_versions[hour].add(version)
```

This reduces N queries to 2 (one for messages, one for versions).
🤖 Prompt for AI Agents
In docs/time-bucketed-statistics-implementation-plan.md around lines 607 to 681,
the current loop queries CustomTaggedItem once per hour (N+1) which causes many
DB round-trips; instead fetch all version tags in a single query for all message
IDs, retrieve pairs of (object_id, tag_name), build a lookup mapping from
message_id to hour (from the earlier messages_with_versions results) and then
group versions per hour in Python to populate hour_versions, finally proceed to
assign sorted unique versions to each bucket — this reduces the queries to one
(or two including initial messages fetch) and avoids the per-hour queries.
Add array_concat custom aggregate function to improve query efficiency:
- Create a migration for the array_concat PostgreSQL aggregate function
- Add an ArrayConcat Django aggregate class wrapping the custom function
- Update SessionStatisticsBucketManager.get_totals_for_sessions() to use a single query
- Concatenate version arrays in SQL instead of a separate query plus a Python loop
- Deduplicate and sort versions in Python (lightweight operation)
- Update the migration plan to include aggregate function creation
- Update the performance optimizations documentation

Benefits:
- Reduces from 2 queries to 1 for fetching session statistics
- Offloads array concatenation to PostgreSQL
- Maintains a clean separation: SQL for aggregation, Python for deduplication/sorting
Fixes based on PR review feedback:
1. Remove unused imports (OuterRef, Subquery) from _update_experiment_buckets; these imports were not being used in the function.
2. Fix the N+1 query problem in version aggregation: fetch all versions for all messages at once instead of looping per hour, reducing ~24 queries (one per hour) to 2 queries total, and group versions by hour in Python after fetching.
3. Fix database-level sorting for experiments: use a QuerySet annotation with Subquery to get last activity from buckets, order by cached_last_activity at the database level, and remove the Python-based sorting that broke pagination.
4. Improve Markdown formatting: convert bold text (**Benefits**, **Tasks**, etc.) to proper headings and use #### subheadings for better document structure and readability.

These changes optimize query performance and improve documentation quality.
Store experiment versions directly on ExperimentSession instead of in time buckets.

Design improvements:
- Add cached_experiment_versions ArrayField to the ExperimentSession model
- Remove experiment_versions from the SessionStatisticsBucket model
- Versions don't need temporal granularity, only an aggregate per session
- Eliminates high redundancy (most buckets would hold the same versions)

Implementation changes:
- Remove the ArrayConcat aggregate class (no longer needed)
- Remove the custom PostgreSQL array_concat aggregate function migration
- Update SessionStatisticsBucketManager to read from session.cached_experiment_versions
- Update _update_session_buckets to update the session field directly
- Update compression to not handle version aggregation
- Simplify get_totals_for_sessions to query the session field

Performance benefits:
- Reduced storage (one array per session vs. one array per bucket)
- Simpler queries (direct field access vs. aggregation)
- Easier to update incrementally (add new versions as they appear)
- No need for a custom aggregate function

Code reduction: -125 lines, +67 lines (net -58 lines)
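A rough sketch of this design, assuming the existing `ExperimentSession` model and `BaseTeamModel` stay as-is; the `add_cached_versions` helper is hypothetical and only illustrates how incremental updates could be applied:

```python
from django.contrib.postgres.fields import ArrayField
from django.db import models


class ExperimentSession(BaseTeamModel):
    # ... existing fields ...
    cached_experiment_versions = ArrayField(
        models.CharField(max_length=255),
        default=list,
        blank=True,
        help_text="Distinct experiment versions seen in this session's messages.",
    )

    def add_cached_versions(self, new_versions):
        """Merge newly observed versions into the cached list (hypothetical helper)."""
        merged = sorted(set(self.cached_experiment_versions) | set(new_versions))
        if merged != self.cached_experiment_versions:
            self.cached_experiment_versions = merged
            self.save(update_fields=["cached_experiment_versions"])
```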
Technical Description
Demo
Docs and Changelog