Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions engine/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,43 @@ func Connect(connectURL string) {
if err != nil {
log.Fatalln("Failed to auto migrate:", err)
}

// Create materialized views
createCumulativeScoresView()
}

// createCumulativeScoresView creates the materialized view for cumulative scores.
func createCumulativeScoresView() {
// Create the materialized view if it doesn't exist
// If it does exist, CREATE won't refresh it, so we do that separately
err := db.Exec(`
CREATE MATERIALIZED VIEW IF NOT EXISTS cumulative_scores AS
SELECT DISTINCT
round_id,
team_id,
SUM(CASE WHEN result = '1' THEN points ELSE 0 END)
OVER(PARTITION BY team_id ORDER BY round_id) as cumulative_points
FROM service_check_schemas
ORDER BY team_id, round_id
`).Error
if err != nil {
log.Fatalln("Failed to create cumulative_scores materialized view:", err)
}

// Unique index required to enable REFRESH CONCURRENTLY
err = db.Exec(`
CREATE UNIQUE INDEX IF NOT EXISTS idx_cumulative_scores_round_team
ON cumulative_scores (round_id, team_id)
`).Error
if err != nil {
log.Fatalln("Failed to create index on cumulative_scores:", err)
}

// Ensure view is populated/fresh on startup in case there was existing data
err = db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error
if err != nil {
log.Fatalln("Failed to refresh cumulative_scores materialized view:", err)
}
}

func AddTeams(conf *config.ConfigSettings) error {
Expand Down Expand Up @@ -115,5 +152,10 @@ func ResetScores() error {
return err
}

// Refresh the materialized view to clear it
if err := db.Exec("REFRESH MATERIALIZED VIEW cumulative_scores").Error; err != nil {
return err
}

return nil
}
5 changes: 5 additions & 0 deletions engine/db/rounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,8 @@ func GetLastRound() (RoundSchema, error) {
}
return round, nil
}

func RefreshScoresMaterializedView() error {
// Use concurrent refresh to avoid blocking reads
return db.Exec("REFRESH MATERIALIZED VIEW CONCURRENTLY cumulative_scores").Error
}
7 changes: 3 additions & 4 deletions engine/db/servicechecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,10 @@ func GetServiceCheckSumByRound() ([]map[uint]int, error) {
// creates array with size of num rounds
result := make([]map[uint]int, last.ID)

// Query from materialized view instead of running window function each time
rows, err := db.Raw(`
SELECT DISTINCT round_id, team_id,
SUM(CASE WHEN result = '1' THEN points ELSE 0 END)
OVER(PARTITION BY team_id ORDER BY round_id)
FROM service_check_schemas
SELECT round_id, team_id, cumulative_points
FROM cumulative_scores
ORDER BY team_id, round_id
`).Rows()
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"
"unicode/utf8"

Expand All @@ -31,6 +32,9 @@ type ScoringEngine struct {
CurrentRoundStartTime time.Time
RedisClient *redis.Client

// Concurrency control for materialized view refresh
Refreshing atomic.Bool

// Config update handling
configPath string
}
Expand Down Expand Up @@ -573,4 +577,17 @@ func (se *ScoringEngine) processCollectedResults(results []checks.Result) {
}

slog.Debug("Successfully processed results for round", "round", se.CurrentRound, "total", len(dbResults))

// Refresh materialized view asynchronously, but avoid concurrent refreshes
currentRound := se.CurrentRound
if se.Refreshing.CompareAndSwap(false, true) {
go func(round uint) {
defer se.Refreshing.Store(false)
if err := db.RefreshScoresMaterializedView(); err != nil {
slog.Error("failed to refresh materialized view", "round", round, "error", err)
}
}(currentRound)
} else {
slog.Debug("refresh already in progress, skipping refresh spawn", "round", currentRound)
}
}
69 changes: 69 additions & 0 deletions tests/integration/materialized_view_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package integration

import (
"quotient/engine/db"
"quotient/tests/testutil"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestMaterializedViewLifecycle(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test in short mode")
}

// Start PostgreSQL connection
pgContainer := testutil.StartPostgres(t)

// Initialize database connection
// This calls createCumulativeScoresView() internally, which now includes the initial REFRESH
db.Connect(pgContainer.ConnectionString())

// Data cleanup
err := db.ResetScores()
require.NoError(t, err, "ResetScores should succeed")

t.Run("refresh with zero rows", func(t *testing.T) {
// The view should operate correctly even with no data
err := db.RefreshScoresMaterializedView()
require.NoError(t, err, "RefreshScoresMaterializedView should succeed with 0 rows")
})

t.Run("refresh with data", func(t *testing.T) {
// Add a team
team := db.TeamSchema{
Name: "ViewTestTeam",
Active: true,
Identifier: "vt1",
}
teamCreated, err := db.CreateTeam(team)
require.NoError(t, err)

// Create a round with a result
check := db.ServiceCheckSchema{
TeamID: teamCreated.ID,
RoundID: 1,
ServiceName: "test-service",
Points: 10,
Result: true,
}

round := db.RoundSchema{
ID: 1,
StartTime: time.Now(),
Checks: []db.ServiceCheckSchema{check},
}

_, err = db.CreateRound(round)
require.NoError(t, err, "should save round to database")

// Refresh should succeed with data
err = db.RefreshScoresMaterializedView()
require.NoError(t, err, "RefreshScoresMaterializedView should succeed with data")

// Optional: We could verify data via db.GetServiceCheckSumByRound() if we wanted to be thorough
// but the main point here is that the REFRESH command doesn't throw an error.
})
}