From 23dbd3706ca2f4123d82e11718729a81b2c3db4f Mon Sep 17 00:00:00 2001 From: bplunkett-stripe Date: Fri, 20 Dec 2024 17:46:31 -0500 Subject: [PATCH 1/3] Generated columns --- internal/queries/queries.sql | 15 +++++++++++++-- internal/queries/queries.sql.go | 9 ++++++--- internal/schema/schema.go | 21 ++++++++++++++++----- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/internal/queries/queries.sql b/internal/queries/queries.sql index ca3ca0a..a3d72e9 100644 --- a/internal/queries/queries.sql +++ b/internal/queries/queries.sql @@ -76,16 +76,27 @@ WITH identity_col_seq AS ( depend.refobjid = owner_attr.attrelid AND depend.refobjsubid = owner_attr.attnum WHERE owner_attr.attidentity != '' +), dep_column AS ( + SELECT + a.attrelid as src_relid, + a.attnum as src_attnum, + dep_a.attname as tgt_name + FROM pg_catalog.pg_attribute AS a + JOIN pg_catalog.pg_depend dep ON dep.objid = a.attrelid AND dep.objsubid = a.attnum AND dep.classid = 'pg_class'::REGCLASS + JOIN pg_catalog.pg_attribute dep_a ON dep.refobjid = dep_a.attrelid AND dep.refobjsubid = dep_a.attnum ) - SELECT a.attname::TEXT AS column_name, COALESCE(coll.collname, '')::TEXT AS collation_name, COALESCE(collation_namespace.nspname, '')::TEXT AS collation_schema_name, COALESCE( pg_catalog.pg_get_expr(d.adbin, d.adrelid), '' - )::TEXT AS default_value, + )::TEXT AS attr_def, a.attnotnull AS is_not_null, + a.attgenerated = 's' AS is_generated, + ( + SELECT ARRAY_AGG(dep_column.tgt_name) FROM dep_column WHERE a.attrelid = dep_column.src_relid AND a.attnum = dep_column.src_attnum + )::TEXT [] AS dep_column_names, a.attlen AS column_size, a.attidentity::TEXT AS identity_type, identity_col_seq.seqstart AS start_value, diff --git a/internal/queries/queries.sql.go b/internal/queries/queries.sql.go index 1c36969..181d101 100644 --- a/internal/queries/queries.sql.go +++ b/internal/queries/queries.sql.go @@ -119,8 +119,9 @@ SELECT COALESCE(collation_namespace.nspname, '')::TEXT AS collation_schema_name, COALESCE( pg_catalog.pg_get_expr(d.adbin, d.adrelid), '' - )::TEXT AS default_value, + )::TEXT AS attr_def, a.attnotnull AS is_not_null, + a.attgenerated = 's' AS is_generated, a.attlen AS column_size, a.attidentity::TEXT AS identity_type, identity_col_seq.seqstart AS start_value, @@ -154,8 +155,9 @@ type GetColumnsForTableRow struct { ColumnName string CollationName string CollationSchemaName string - DefaultValue string + AttrDef string IsNotNull bool + IsGenerated bool ColumnSize int16 IdentityType string StartValue sql.NullInt64 @@ -180,8 +182,9 @@ func (q *Queries) GetColumnsForTable(ctx context.Context, attrelid interface{}) &i.ColumnName, &i.CollationName, &i.CollationSchemaName, - &i.DefaultValue, + &i.AttrDef, &i.IsNotNull, + &i.IsGenerated, &i.ColumnSize, &i.IdentityType, &i.StartValue, diff --git a/internal/schema/schema.go b/internal/schema/schema.go index e8c50e1..bedfc0a 100644 --- a/internal/schema/schema.go +++ b/internal/schema/schema.go @@ -228,8 +228,11 @@ type ( // ''::text // CURRENT_TIMESTAMP // If empty, indicates that there is no default value. - Default string - IsNullable bool + Default string + // If the column is generated, this will be a SQL string representing the generated expression. + // If empty, indicates that there is no default value. + GeneratedExpr string + IsNullable bool // Size is the number of bytes required to store the value. // It is used for data-packing purposes Size int @@ -878,6 +881,13 @@ func (s *schemaFetcher) buildTable( } } + defaultExpr := column.AttrDef + generatedExpr := "" + if column.IsGenerated { + defaultExpr = "" + generatedExpr = column.AttrDef + } + columns = append(columns, Column{ Name: column.ColumnName, Type: column.ColumnType, @@ -888,9 +898,10 @@ func (s *schemaFetcher) buildTable( // ''::text // CURRENT_TIMESTAMP // If empty, indicates that there is no default value. - Default: column.DefaultValue, - Size: int(column.ColumnSize), - Identity: identity, + Default: defaultExpr, + GeneratedExpr: generatedExpr, + Size: int(column.ColumnSize), + Identity: identity, }) } From 732cc6f9eacada291312491304880c4cd31b90d6 Mon Sep 17 00:00:00 2001 From: bplunkett-stripe Date: Fri, 20 Dec 2024 17:47:40 -0500 Subject: [PATCH 2/3] Update --- internal/queries/queries.sql | 30 ++++++++++++++++++++++-------- internal/queries/queries.sql.go | 32 +++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/internal/queries/queries.sql b/internal/queries/queries.sql index a3d72e9..3a61402 100644 --- a/internal/queries/queries.sql +++ b/internal/queries/queries.sql @@ -76,15 +76,25 @@ WITH identity_col_seq AS ( depend.refobjid = owner_attr.attrelid AND depend.refobjsubid = owner_attr.attnum WHERE owner_attr.attidentity != '' -), dep_column AS ( +), + +dep_column AS ( SELECT - a.attrelid as src_relid, - a.attnum as src_attnum, - dep_a.attname as tgt_name + a.attrelid AS src_relid, + a.attnum AS src_attnum, + dep_a.attname AS tgt_name FROM pg_catalog.pg_attribute AS a - JOIN pg_catalog.pg_depend dep ON dep.objid = a.attrelid AND dep.objsubid = a.attnum AND dep.classid = 'pg_class'::REGCLASS - JOIN pg_catalog.pg_attribute dep_a ON dep.refobjid = dep_a.attrelid AND dep.refobjsubid = dep_a.attnum + INNER JOIN + pg_catalog.pg_depend AS dep + ON + a.attrelid = dep.objid + AND a.attnum = dep.objsubid + AND dep.classid = 'pg_class'::REGCLASS + INNER JOIN + pg_catalog.pg_attribute AS dep_a + ON dep.refobjid = dep_a.attrelid AND dep.refobjsubid = dep_a.attnum ) + SELECT a.attname::TEXT AS column_name, COALESCE(coll.collname, '')::TEXT AS collation_name, @@ -93,9 +103,12 @@ SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid), '' )::TEXT AS attr_def, a.attnotnull AS is_not_null, - a.attgenerated = 's' AS is_generated, ( - SELECT ARRAY_AGG(dep_column.tgt_name) FROM dep_column WHERE a.attrelid = dep_column.src_relid AND a.attnum = dep_column.src_attnum + SELECT ARRAY_AGG(dep_column.tgt_name) + FROM dep_column + WHERE + a.attrelid = dep_column.src_relid + AND a.attnum = dep_column.src_attnum )::TEXT [] AS dep_column_names, a.attlen AS column_size, a.attidentity::TEXT AS identity_type, @@ -105,6 +118,7 @@ SELECT identity_col_seq.seqmin AS min_value, identity_col_seq.seqcache AS cache_size, identity_col_seq.seqcycle AS is_cycle, + a.attgenerated = 's' AS is_generated, pg_catalog.format_type(a.atttypid, a.atttypmod) AS column_type FROM pg_catalog.pg_attribute AS a LEFT JOIN diff --git a/internal/queries/queries.sql.go b/internal/queries/queries.sql.go index 181d101..251efdf 100644 --- a/internal/queries/queries.sql.go +++ b/internal/queries/queries.sql.go @@ -111,6 +111,23 @@ WITH identity_col_seq AS ( depend.refobjid = owner_attr.attrelid AND depend.refobjsubid = owner_attr.attnum WHERE owner_attr.attidentity != '' +), + +dep_column AS ( + SELECT + a.attrelid AS src_relid, + a.attnum AS src_attnum, + dep_a.attname AS tgt_name + FROM pg_catalog.pg_attribute AS a + INNER JOIN + pg_catalog.pg_depend AS dep + ON + a.attrelid = dep.objid + AND a.attnum = dep.objsubid + AND dep.classid = 'pg_class'::REGCLASS + INNER JOIN + pg_catalog.pg_attribute AS dep_a + ON dep.refobjid = dep_a.attrelid AND dep.refobjsubid = dep_a.attnum ) SELECT @@ -121,7 +138,13 @@ SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid), '' )::TEXT AS attr_def, a.attnotnull AS is_not_null, - a.attgenerated = 's' AS is_generated, + ( + SELECT ARRAY_AGG(dep_column.tgt_name) + FROM dep_column + WHERE + a.attrelid = dep_column.src_relid + AND a.attnum = dep_column.src_attnum + )::TEXT [] AS dep_column_names, a.attlen AS column_size, a.attidentity::TEXT AS identity_type, identity_col_seq.seqstart AS start_value, @@ -130,6 +153,7 @@ SELECT identity_col_seq.seqmin AS min_value, identity_col_seq.seqcache AS cache_size, identity_col_seq.seqcycle AS is_cycle, + a.attgenerated = 's' AS is_generated, pg_catalog.format_type(a.atttypid, a.atttypmod) AS column_type FROM pg_catalog.pg_attribute AS a LEFT JOIN @@ -157,7 +181,7 @@ type GetColumnsForTableRow struct { CollationSchemaName string AttrDef string IsNotNull bool - IsGenerated bool + DepColumnNames []string ColumnSize int16 IdentityType string StartValue sql.NullInt64 @@ -166,6 +190,7 @@ type GetColumnsForTableRow struct { MinValue sql.NullInt64 CacheSize sql.NullInt64 IsCycle sql.NullBool + IsGenerated bool ColumnType string } @@ -184,7 +209,7 @@ func (q *Queries) GetColumnsForTable(ctx context.Context, attrelid interface{}) &i.CollationSchemaName, &i.AttrDef, &i.IsNotNull, - &i.IsGenerated, + pq.Array(&i.DepColumnNames), &i.ColumnSize, &i.IdentityType, &i.StartValue, @@ -193,6 +218,7 @@ func (q *Queries) GetColumnsForTable(ctx context.Context, attrelid interface{}) &i.MinValue, &i.CacheSize, &i.IsCycle, + &i.IsGenerated, &i.ColumnType, ); err != nil { return nil, err From a5c1562888bee782cfe80604f05a809d620352d5 Mon Sep 17 00:00:00 2001 From: bplunkett-stripe Date: Tue, 4 Mar 2025 20:24:14 -0800 Subject: [PATCH 3/3] update --- internal/queries/queries.sql | 12 +- internal/queries/queries.sql.go | 16 +- internal/schema/schema.go | 21 +- pkg/diff/column_sql_vertex_generator.go | 280 ++++++++++++++++++++++++ pkg/diff/sql_generator.go | 271 ----------------------- 5 files changed, 311 insertions(+), 289 deletions(-) create mode 100644 pkg/diff/column_sql_vertex_generator.go diff --git a/internal/queries/queries.sql b/internal/queries/queries.sql index 3a61402..14f5761 100644 --- a/internal/queries/queries.sql +++ b/internal/queries/queries.sql @@ -78,7 +78,7 @@ WITH identity_col_seq AS ( WHERE owner_attr.attidentity != '' ), -dep_column AS ( +dep_on_column AS ( SELECT a.attrelid AS src_relid, a.attnum AS src_attnum, @@ -104,12 +104,12 @@ SELECT )::TEXT AS attr_def, a.attnotnull AS is_not_null, ( - SELECT ARRAY_AGG(dep_column.tgt_name) - FROM dep_column + SELECT ARRAY_AGG(dep_on_column.tgt_name) + FROM dep_on_column WHERE - a.attrelid = dep_column.src_relid - AND a.attnum = dep_column.src_attnum - )::TEXT [] AS dep_column_names, + a.attrelid = dep_on_column.src_relid + AND a.attnum = dep_on_column.src_attnum + )::TEXT [] AS dep_on_column_names, a.attlen AS column_size, a.attidentity::TEXT AS identity_type, identity_col_seq.seqstart AS start_value, diff --git a/internal/queries/queries.sql.go b/internal/queries/queries.sql.go index 251efdf..b060f1e 100644 --- a/internal/queries/queries.sql.go +++ b/internal/queries/queries.sql.go @@ -113,7 +113,7 @@ WITH identity_col_seq AS ( WHERE owner_attr.attidentity != '' ), -dep_column AS ( +dep_on_column AS ( SELECT a.attrelid AS src_relid, a.attnum AS src_attnum, @@ -139,12 +139,12 @@ SELECT )::TEXT AS attr_def, a.attnotnull AS is_not_null, ( - SELECT ARRAY_AGG(dep_column.tgt_name) - FROM dep_column + SELECT ARRAY_AGG(dep_on_column.tgt_name) + FROM dep_on_column WHERE - a.attrelid = dep_column.src_relid - AND a.attnum = dep_column.src_attnum - )::TEXT [] AS dep_column_names, + a.attrelid = dep_on_column.src_relid + AND a.attnum = dep_on_column.src_attnum + )::TEXT [] AS dep_on_column_names, a.attlen AS column_size, a.attidentity::TEXT AS identity_type, identity_col_seq.seqstart AS start_value, @@ -181,7 +181,7 @@ type GetColumnsForTableRow struct { CollationSchemaName string AttrDef string IsNotNull bool - DepColumnNames []string + DepOnColumnNames []string ColumnSize int16 IdentityType string StartValue sql.NullInt64 @@ -209,7 +209,7 @@ func (q *Queries) GetColumnsForTable(ctx context.Context, attrelid interface{}) &i.CollationSchemaName, &i.AttrDef, &i.IsNotNull, - pq.Array(&i.DepColumnNames), + pq.Array(&i.DepOnColumnNames), &i.ColumnSize, &i.IdentityType, &i.StartValue, diff --git a/internal/schema/schema.go b/internal/schema/schema.go index bedfc0a..f33874c 100644 --- a/internal/schema/schema.go +++ b/internal/schema/schema.go @@ -94,6 +94,15 @@ func (s Schema) Normalize() Schema { func normalizeTable(t Table) Table { // Don't normalize columns order. their order is derived from the postgres catalogs // (relevant to data packing) + var normColumns []Column + for _, c := range t.Columns { + c.DependsOnColumns = sortByKey(c.DependsOnColumns, func(s string) string { + return s + }) + normColumns = append(normColumns, c) + } + t.Columns = normColumns + var normCheckConstraints []CheckConstraint for _, checkConstraint := range sortSchemaObjectsByName(t.CheckConstraints) { checkConstraint.DependsOnFunctions = sortSchemaObjectsByName(checkConstraint.DependsOnFunctions) @@ -115,6 +124,7 @@ func normalizeTable(t Table) Table { normPolicies = append(normPolicies, p) } t.Policies = normPolicies + return t } @@ -233,6 +243,8 @@ type ( // If empty, indicates that there is no default value. GeneratedExpr string IsNullable bool + // DependsOnColumns is a list of (unescaped) column names that this column depends on. + DependsOnColumns []string // Size is the number of bytes required to store the value. // It is used for data-packing purposes Size int @@ -898,10 +910,11 @@ func (s *schemaFetcher) buildTable( // ''::text // CURRENT_TIMESTAMP // If empty, indicates that there is no default value. - Default: defaultExpr, - GeneratedExpr: generatedExpr, - Size: int(column.ColumnSize), - Identity: identity, + Default: defaultExpr, + GeneratedExpr: generatedExpr, + DependsOnColumns: column.DepOnColumnNames, + Size: int(column.ColumnSize), + Identity: identity, }) } diff --git a/pkg/diff/column_sql_vertex_generator.go b/pkg/diff/column_sql_vertex_generator.go new file mode 100644 index 0000000..a82f567 --- /dev/null +++ b/pkg/diff/column_sql_vertex_generator.go @@ -0,0 +1,280 @@ +package diff + +import ( + "fmt" + "strings" + + "github.com/google/go-cmp/cmp" + "github.com/stripe/pg-schema-diff/internal/schema" +) + +type columnSQLVertexGenerator struct { + tableName schema.SchemaQualifiedName +} + +func newColumnSQLVertexGenerator(tableName schema.SchemaQualifiedName) sqlVertexGenerator[schema.Column, columnDiff] { + return legacyToNewSqlVertexGenerator[schema.Column, columnDiff](&columnSQLVertexGenerator{tableName: tableName}) +} + +func (csg *columnSQLVertexGenerator) Add(column schema.Column) ([]Statement, error) { + columnDef, err := buildColumnDefinition(column) + if err != nil { + return nil, fmt.Errorf("building column definition: %w", err) + } + return []Statement{{ + DDL: fmt.Sprintf("%s ADD COLUMN %s", alterTablePrefix(csg.tableName), columnDef), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }}, nil +} + +func (csg *columnSQLVertexGenerator) Delete(column schema.Column) ([]Statement, error) { + return []Statement{{ + DDL: fmt.Sprintf("%s DROP COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(column.Name)), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + Hazards: []MigrationHazard{ + { + Type: MigrationHazardTypeDeletesData, + Message: "Deletes all values in the column", + }, + }, + }}, nil +} + +func (csg *columnSQLVertexGenerator) Alter(diff columnDiff) ([]Statement, error) { + if diff.oldOrdering != diff.newOrdering { + return nil, fmt.Errorf("old=%d; new=%d: %w", diff.oldOrdering, diff.newOrdering, ErrColumnOrderingChanged) + } + oldColumn, newColumn := diff.old, diff.new + var stmts []Statement + alterColumnPrefix := fmt.Sprintf("%s ALTER COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(newColumn.Name)) + + // Adding a "NOT NULL" constraint must come before updating a column to be an identity column, otherwise + // the add statement will fail because a column must be non-nullable to become an identity column. + if oldColumn.IsNullable != newColumn.IsNullable && !newColumn.IsNullable { + stmts = append(stmts, Statement{ + DDL: fmt.Sprintf("%s SET NOT NULL", alterColumnPrefix), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }) + } + + // Drop the default before type conversion. This will allow type conversions + // between incompatible types if the previous column has a default and the new column is dropping its default. + // It also must be dropped before an identity is added to the column, otherwise adding the identity errors + // with "a default already exists." + // + // To keep the code simpler, put dropping the default before updating the column identity AND dropping the default. + // There is an argument to drop the default after removing the not null constraint, since writes will continue to succeed + // on columns migrating from not-null and a default to nullable with no default; however, this would not work + // for the case where an identity is being added to the column. + if len(oldColumn.Default) > 0 && len(newColumn.Default) == 0 { + stmts = append(stmts, Statement{ + DDL: fmt.Sprintf("%s DROP DEFAULT", alterColumnPrefix), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }) + } + + updateIdentityStmts, err := csg.buildUpdateIdentityStatements(oldColumn, newColumn) + if err != nil { + return nil, fmt.Errorf("building update identity statements: %w", err) + } + stmts = append(stmts, updateIdentityStmts...) + + // Removing a "NOT NULL" constraint must come after updating a column to no longer be an identity column, otherwise + // the "DROP NOT NULL" statement will fail because the column will still be an identity column. + if oldColumn.IsNullable != newColumn.IsNullable && newColumn.IsNullable { + stmts = append(stmts, Statement{ + DDL: fmt.Sprintf("%s DROP NOT NULL", alterColumnPrefix), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }) + } + + if !strings.EqualFold(oldColumn.Type, newColumn.Type) || + !strings.EqualFold(oldColumn.Collation.GetFQEscapedName(), newColumn.Collation.GetFQEscapedName()) { + stmts = append(stmts, + []Statement{ + csg.generateTypeTransformationStatement( + diff.new, + oldColumn.Type, + newColumn.Type, + newColumn.Collation, + ), + // When "SET TYPE" is used to alter a column, that column's statistics are removed, which could + // affect query plans. In order to mitigate the effect on queries, re-generate the statistics for the + // column before continuing with the migration. + { + DDL: fmt.Sprintf("ANALYZE %s (%s)", csg.tableName.GetFQEscapedName(), schema.EscapeIdentifier(newColumn.Name)), + Timeout: statementTimeoutAnalyzeColumn, + LockTimeout: lockTimeoutDefault, + Hazards: []MigrationHazard{ + { + Type: MigrationHazardTypeImpactsDatabasePerformance, + Message: "Running analyze will read rows from the table, putting increased load " + + "on the database and consuming database resources. It won't prevent reads/writes to " + + "the table, but it could affect performance when executing queries.", + }, + }, + }, + }...) + } + + if oldColumn.Default != newColumn.Default && len(newColumn.Default) > 0 { + // Set the default after the type conversion. This will allow type conversions + // between incompatible types if the previous column has no default and the new column has a default + stmts = append(stmts, Statement{ + DDL: fmt.Sprintf("%s SET DEFAULT %s", alterColumnPrefix, newColumn.Default), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }) + } + + return stmts, nil +} + +func (csg *columnSQLVertexGenerator) generateTypeTransformationStatement( + col schema.Column, + oldType string, + newType string, + newTypeCollation schema.SchemaQualifiedName, +) Statement { + if strings.EqualFold(oldType, "bigint") && + strings.EqualFold(newType, "timestamp without time zone") { + return Statement{ + DDL: fmt.Sprintf("%s SET DATA TYPE %s using to_timestamp(%s / 1000)", + csg.alterColumnPrefix(col), + newType, + schema.EscapeIdentifier(col.Name), + ), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + Hazards: []MigrationHazard{{ + Type: MigrationHazardTypeAcquiresAccessExclusiveLock, + Message: "This will completely lock the table while the data is being " + + "re-written for a duration of time that scales with the size of your data. " + + "The values previously stored as BIGINT will be translated into a " + + "TIMESTAMP value via the PostgreSQL to_timestamp() function. This " + + "translation will assume that the values stored in BIGINT represent a " + + "millisecond epoch value.", + }}, + } + } + + collationModifier := "" + if !newTypeCollation.IsEmpty() { + collationModifier = fmt.Sprintf("COLLATE %s ", newTypeCollation.GetFQEscapedName()) + } + + return Statement{ + DDL: fmt.Sprintf("%s SET DATA TYPE %s %susing %s::%s", + csg.alterColumnPrefix(col), + newType, + collationModifier, + schema.EscapeIdentifier(col.Name), + newType, + ), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + Hazards: []MigrationHazard{{ + Type: MigrationHazardTypeAcquiresAccessExclusiveLock, + Message: "This will completely lock the table while the data is being re-written. " + + "The duration of this conversion depends on if the type conversion is trivial " + + "or not. A non-trivial conversion will require a table rewrite. A trivial " + + "conversion is one where the binary values are coercible and the column " + + "contents are not changing.", + }}, + } +} + +func (csg *columnSQLVertexGenerator) buildUpdateIdentityStatements(old, new schema.Column) ([]Statement, error) { + if cmp.Equal(old.Identity, new.Identity) { + return nil, nil + } + + // Drop the old identity + if new.Identity == nil { + // ALTER [ COLUMN ] column_name DROP IDENTITY [ IF EXISTS ] + return []Statement{{ + DDL: fmt.Sprintf("%s DROP IDENTITY", csg.alterColumnPrefix(old)), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }}, nil + } + + // Add the new identity + if old.Identity == nil { + def, err := buildColumnIdentityDefinition(*new.Identity) + if err != nil { + return nil, fmt.Errorf("building column identity definition: %w", err) + } + // ALTER [ COLUMN ] column_name ADD GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] + return []Statement{{ + DDL: fmt.Sprintf("%s ADD %s", csg.alterColumnPrefix(new), def), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }}, nil + } + + // Alter the existing identity + var modifications []string + if old.Identity.Type != new.Identity.Type { + typeModifier, err := columnIdentityTypeToModifier(new.Identity.Type) + if err != nil { + return nil, fmt.Errorf("column identity type modifier: %w", err) + } + modifications = append(modifications, fmt.Sprintf("\tSET GENERATED %s", typeModifier)) + } + if old.Identity.Increment != new.Identity.Increment { + modifications = append(modifications, fmt.Sprintf("\tSET INCREMENT BY %d", new.Identity.Increment)) + } + if old.Identity.MinValue != new.Identity.MinValue { + modifications = append(modifications, fmt.Sprintf("\tSET MINVALUE %d", new.Identity.MinValue)) + } + if old.Identity.MaxValue != new.Identity.MaxValue { + modifications = append(modifications, fmt.Sprintf("\tSET MAXVALUE %d", new.Identity.MaxValue)) + } + if old.Identity.StartValue != new.Identity.StartValue { + modifications = append(modifications, fmt.Sprintf("\tSET START %d", new.Identity.StartValue)) + } + if old.Identity.CacheSize != new.Identity.CacheSize { + modifications = append(modifications, fmt.Sprintf("\tSET CACHE %d", new.Identity.CacheSize)) + } + if old.Identity.Cycle != new.Identity.Cycle { + cycleModifier := "" + if !new.Identity.Cycle { + cycleModifier = "NO " + } + modifications = append(modifications, fmt.Sprintf("\tSET %sCYCLE", cycleModifier)) + } + // ALTER [ COLUMN ] column_name { SET GENERATED { ALWAYS | BY DEFAULT } | SET sequence_option | RESTART [ [ WITH ] restart ] } [...] + return []Statement{{ + DDL: fmt.Sprintf("%s\n%s", csg.alterColumnPrefix(new), strings.Join(modifications, "\n")), + Timeout: statementTimeoutDefault, + LockTimeout: lockTimeoutDefault, + }}, nil +} + +func (csg *columnSQLVertexGenerator) alterColumnPrefix(col schema.Column) string { + return fmt.Sprintf("%s ALTER COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(col.Name)) +} + +func (csg *columnSQLVertexGenerator) GetSQLVertexId(column schema.Column, diffType diffType) sqlVertexId { + return buildColumnVertexId(column.Name, diffType) +} + +func buildColumnVertexId(columnName string, diffType diffType) sqlVertexId { + return buildSchemaObjVertexId("column", columnName, diffType) +} + +func (csg *columnSQLVertexGenerator) GetAddAlterDependencies(col, _ schema.Column) ([]dependency, error) { + return []dependency{ + mustRun(csg.GetSQLVertexId(col, diffTypeDelete)).before(csg.GetSQLVertexId(col, diffTypeAddAlter)), + }, nil +} + +func (csg *columnSQLVertexGenerator) GetDeleteDependencies(_ schema.Column) ([]dependency, error) { + return nil, nil +} diff --git a/pkg/diff/sql_generator.go b/pkg/diff/sql_generator.go index 94f67ab..96d0d28 100644 --- a/pkg/diff/sql_generator.go +++ b/pkg/diff/sql_generator.go @@ -1145,277 +1145,6 @@ func (t *tableSQLVertexGenerator) GetDeleteDependencies(table schema.Table) ([]d return deps, nil } -type columnSQLVertexGenerator struct { - tableName schema.SchemaQualifiedName -} - -func newColumnSQLVertexGenerator(tableName schema.SchemaQualifiedName) sqlVertexGenerator[schema.Column, columnDiff] { - return legacyToNewSqlVertexGenerator[schema.Column, columnDiff](&columnSQLVertexGenerator{tableName: tableName}) -} - -func (csg *columnSQLVertexGenerator) Add(column schema.Column) ([]Statement, error) { - columnDef, err := buildColumnDefinition(column) - if err != nil { - return nil, fmt.Errorf("building column definition: %w", err) - } - return []Statement{{ - DDL: fmt.Sprintf("%s ADD COLUMN %s", alterTablePrefix(csg.tableName), columnDef), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }}, nil -} - -func (csg *columnSQLVertexGenerator) Delete(column schema.Column) ([]Statement, error) { - return []Statement{{ - DDL: fmt.Sprintf("%s DROP COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(column.Name)), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - Hazards: []MigrationHazard{ - { - Type: MigrationHazardTypeDeletesData, - Message: "Deletes all values in the column", - }, - }, - }}, nil -} - -func (csg *columnSQLVertexGenerator) Alter(diff columnDiff) ([]Statement, error) { - if diff.oldOrdering != diff.newOrdering { - return nil, fmt.Errorf("old=%d; new=%d: %w", diff.oldOrdering, diff.newOrdering, ErrColumnOrderingChanged) - } - oldColumn, newColumn := diff.old, diff.new - var stmts []Statement - alterColumnPrefix := fmt.Sprintf("%s ALTER COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(newColumn.Name)) - - // Adding a "NOT NULL" constraint must come before updating a column to be an identity column, otherwise - // the add statement will fail because a column must be non-nullable to become an identity column. - if oldColumn.IsNullable != newColumn.IsNullable && !newColumn.IsNullable { - stmts = append(stmts, Statement{ - DDL: fmt.Sprintf("%s SET NOT NULL", alterColumnPrefix), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }) - } - - // Drop the default before type conversion. This will allow type conversions - // between incompatible types if the previous column has a default and the new column is dropping its default. - // It also must be dropped before an identity is added to the column, otherwise adding the identity errors - // with "a default already exists." - // - // To keep the code simpler, put dropping the default before updating the column identity AND dropping the default. - // There is an argument to drop the default after removing the not null constraint, since writes will continue to succeed - // on columns migrating from not-null and a default to nullable with no default; however, this would not work - // for the case where an identity is being added to the column. - if len(oldColumn.Default) > 0 && len(newColumn.Default) == 0 { - stmts = append(stmts, Statement{ - DDL: fmt.Sprintf("%s DROP DEFAULT", alterColumnPrefix), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }) - } - - updateIdentityStmts, err := csg.buildUpdateIdentityStatements(oldColumn, newColumn) - if err != nil { - return nil, fmt.Errorf("building update identity statements: %w", err) - } - stmts = append(stmts, updateIdentityStmts...) - - // Removing a "NOT NULL" constraint must come after updating a column to no longer be an identity column, otherwise - // the "DROP NOT NULL" statement will fail because the column will still be an identity column. - if oldColumn.IsNullable != newColumn.IsNullable && newColumn.IsNullable { - stmts = append(stmts, Statement{ - DDL: fmt.Sprintf("%s DROP NOT NULL", alterColumnPrefix), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }) - } - - if !strings.EqualFold(oldColumn.Type, newColumn.Type) || - !strings.EqualFold(oldColumn.Collation.GetFQEscapedName(), newColumn.Collation.GetFQEscapedName()) { - stmts = append(stmts, - []Statement{ - csg.generateTypeTransformationStatement( - diff.new, - oldColumn.Type, - newColumn.Type, - newColumn.Collation, - ), - // When "SET TYPE" is used to alter a column, that column's statistics are removed, which could - // affect query plans. In order to mitigate the effect on queries, re-generate the statistics for the - // column before continuing with the migration. - { - DDL: fmt.Sprintf("ANALYZE %s (%s)", csg.tableName.GetFQEscapedName(), schema.EscapeIdentifier(newColumn.Name)), - Timeout: statementTimeoutAnalyzeColumn, - LockTimeout: lockTimeoutDefault, - Hazards: []MigrationHazard{ - { - Type: MigrationHazardTypeImpactsDatabasePerformance, - Message: "Running analyze will read rows from the table, putting increased load " + - "on the database and consuming database resources. It won't prevent reads/writes to " + - "the table, but it could affect performance when executing queries.", - }, - }, - }, - }...) - } - - if oldColumn.Default != newColumn.Default && len(newColumn.Default) > 0 { - // Set the default after the type conversion. This will allow type conversions - // between incompatible types if the previous column has no default and the new column has a default - stmts = append(stmts, Statement{ - DDL: fmt.Sprintf("%s SET DEFAULT %s", alterColumnPrefix, newColumn.Default), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }) - } - - return stmts, nil -} - -func (csg *columnSQLVertexGenerator) generateTypeTransformationStatement( - col schema.Column, - oldType string, - newType string, - newTypeCollation schema.SchemaQualifiedName, -) Statement { - if strings.EqualFold(oldType, "bigint") && - strings.EqualFold(newType, "timestamp without time zone") { - return Statement{ - DDL: fmt.Sprintf("%s SET DATA TYPE %s using to_timestamp(%s / 1000)", - csg.alterColumnPrefix(col), - newType, - schema.EscapeIdentifier(col.Name), - ), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - Hazards: []MigrationHazard{{ - Type: MigrationHazardTypeAcquiresAccessExclusiveLock, - Message: "This will completely lock the table while the data is being " + - "re-written for a duration of time that scales with the size of your data. " + - "The values previously stored as BIGINT will be translated into a " + - "TIMESTAMP value via the PostgreSQL to_timestamp() function. This " + - "translation will assume that the values stored in BIGINT represent a " + - "millisecond epoch value.", - }}, - } - } - - collationModifier := "" - if !newTypeCollation.IsEmpty() { - collationModifier = fmt.Sprintf("COLLATE %s ", newTypeCollation.GetFQEscapedName()) - } - - return Statement{ - DDL: fmt.Sprintf("%s SET DATA TYPE %s %susing %s::%s", - csg.alterColumnPrefix(col), - newType, - collationModifier, - schema.EscapeIdentifier(col.Name), - newType, - ), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - Hazards: []MigrationHazard{{ - Type: MigrationHazardTypeAcquiresAccessExclusiveLock, - Message: "This will completely lock the table while the data is being re-written. " + - "The duration of this conversion depends on if the type conversion is trivial " + - "or not. A non-trivial conversion will require a table rewrite. A trivial " + - "conversion is one where the binary values are coercible and the column " + - "contents are not changing.", - }}, - } -} - -func (csg *columnSQLVertexGenerator) buildUpdateIdentityStatements(old, new schema.Column) ([]Statement, error) { - if cmp.Equal(old.Identity, new.Identity) { - return nil, nil - } - - // Drop the old identity - if new.Identity == nil { - // ALTER [ COLUMN ] column_name DROP IDENTITY [ IF EXISTS ] - return []Statement{{ - DDL: fmt.Sprintf("%s DROP IDENTITY", csg.alterColumnPrefix(old)), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }}, nil - } - - // Add the new identity - if old.Identity == nil { - def, err := buildColumnIdentityDefinition(*new.Identity) - if err != nil { - return nil, fmt.Errorf("building column identity definition: %w", err) - } - // ALTER [ COLUMN ] column_name ADD GENERATED { ALWAYS | BY DEFAULT } AS IDENTITY [ ( sequence_options ) ] - return []Statement{{ - DDL: fmt.Sprintf("%s ADD %s", csg.alterColumnPrefix(new), def), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }}, nil - } - - // Alter the existing identity - var modifications []string - if old.Identity.Type != new.Identity.Type { - typeModifier, err := columnIdentityTypeToModifier(new.Identity.Type) - if err != nil { - return nil, fmt.Errorf("column identity type modifier: %w", err) - } - modifications = append(modifications, fmt.Sprintf("\tSET GENERATED %s", typeModifier)) - } - if old.Identity.Increment != new.Identity.Increment { - modifications = append(modifications, fmt.Sprintf("\tSET INCREMENT BY %d", new.Identity.Increment)) - } - if old.Identity.MinValue != new.Identity.MinValue { - modifications = append(modifications, fmt.Sprintf("\tSET MINVALUE %d", new.Identity.MinValue)) - } - if old.Identity.MaxValue != new.Identity.MaxValue { - modifications = append(modifications, fmt.Sprintf("\tSET MAXVALUE %d", new.Identity.MaxValue)) - } - if old.Identity.StartValue != new.Identity.StartValue { - modifications = append(modifications, fmt.Sprintf("\tSET START %d", new.Identity.StartValue)) - } - if old.Identity.CacheSize != new.Identity.CacheSize { - modifications = append(modifications, fmt.Sprintf("\tSET CACHE %d", new.Identity.CacheSize)) - } - if old.Identity.Cycle != new.Identity.Cycle { - cycleModifier := "" - if !new.Identity.Cycle { - cycleModifier = "NO " - } - modifications = append(modifications, fmt.Sprintf("\tSET %sCYCLE", cycleModifier)) - } - // ALTER [ COLUMN ] column_name { SET GENERATED { ALWAYS | BY DEFAULT } | SET sequence_option | RESTART [ [ WITH ] restart ] } [...] - return []Statement{{ - DDL: fmt.Sprintf("%s\n%s", csg.alterColumnPrefix(new), strings.Join(modifications, "\n")), - Timeout: statementTimeoutDefault, - LockTimeout: lockTimeoutDefault, - }}, nil -} - -func (csg *columnSQLVertexGenerator) alterColumnPrefix(col schema.Column) string { - return fmt.Sprintf("%s ALTER COLUMN %s", alterTablePrefix(csg.tableName), schema.EscapeIdentifier(col.Name)) -} - -func (csg *columnSQLVertexGenerator) GetSQLVertexId(column schema.Column, diffType diffType) sqlVertexId { - return buildColumnVertexId(column.Name, diffType) -} - -func buildColumnVertexId(columnName string, diffType diffType) sqlVertexId { - return buildSchemaObjVertexId("column", columnName, diffType) -} - -func (csg *columnSQLVertexGenerator) GetAddAlterDependencies(col, _ schema.Column) ([]dependency, error) { - return []dependency{ - mustRun(csg.GetSQLVertexId(col, diffTypeDelete)).before(csg.GetSQLVertexId(col, diffTypeAddAlter)), - }, nil -} - -func (csg *columnSQLVertexGenerator) GetDeleteDependencies(_ schema.Column) ([]dependency, error) { - return nil, nil -} - type renameConflictingIndexSQLVertexGenerator struct { // indexesInOldSchemaByName is a map of index name to the index in the old schema // It is used to identify if an index has been re-created