diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj index 36e5f3f..7498e5f 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj @@ -14,6 +14,7 @@ + diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs new file mode 100644 index 0000000..0f129ba --- /dev/null +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -0,0 +1,317 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.Interfaces; +using Microsoft.Extensions.Configuration; + +namespace Cosmos.DataTransfer.SqlServerExtension.UnitTests; + +[TestClass] +public class SqlServerSinkSettingsTests +{ + [TestMethod] + public void TestSinkSettings_DefaultWriteMode_IsInsert() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + Assert.AreEqual(SqlWriteMode.Insert, settings.WriteMode, "WriteMode should default to Insert"); + } + + [TestMethod] + public void TestSinkSettings_WriteMode_CanBeSetToUpsert() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + Assert.AreEqual(SqlWriteMode.Upsert, settings.WriteMode, "WriteMode should be settable to Upsert"); + } + + [TestMethod] + public void TestSinkSettings_UpsertMode_RequiresPrimaryKeyColumns() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should fail when PrimaryKeyColumns is empty and WriteMode is Upsert"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("PrimaryKeyColumns must be specified")), + "Validation error should mention PrimaryKeyColumns requirement"); + } + + [TestMethod] + public void TestSinkSettings_UpsertMode_WithPrimaryKeyColumns_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should pass when PrimaryKeyColumns is provided with Upsert mode"); + } + + [TestMethod] + public void TestSinkSettings_InsertMode_DoesNotRequirePrimaryKeyColumns() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Insert, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should not require PrimaryKeyColumns when WriteMode is Insert"); + } + + [TestMethod] + public void TestSinkSettings_WriteMode_DeserializesFromJson() + { + // Test JSON to enum conversion for Insert + var jsonInsert = """{"WriteMode": "Insert"}"""; + var configInsert = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonInsert))) + .Build(); + var settingsInsert = configInsert.Get(); + Assert.AreEqual(SqlWriteMode.Insert, settingsInsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Insert'"); + + // Test JSON to enum conversion for Upsert + var jsonUpsert = """{"WriteMode": "Upsert"}"""; + var configUpsert = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonUpsert))) + .Build(); + var settingsUpsert = configUpsert.Get(); + Assert.AreEqual(SqlWriteMode.Upsert, settingsUpsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Upsert'"); + } + + [TestMethod] + public void TestSinkSettings_PrimaryKeyColumns_DeserializesFromJson() + { + var json = """ + { + "ConnectionString": "Server=.;Database=Test;", + "TableName": "TestTable", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id", "TenantId"], + "ColumnMappings": [ + {"ColumnName": "Id"}, + {"ColumnName": "TenantId"} + ] + } + """; + + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.IsNotNull(settings, "Settings should be deserialized"); + Assert.AreEqual(SqlWriteMode.Upsert, settings!.WriteMode, "WriteMode should be Upsert"); + Assert.IsNotNull(settings.PrimaryKeyColumns, "PrimaryKeyColumns should not be null"); + Assert.AreEqual(2, settings.PrimaryKeyColumns.Count, "Should have 2 primary key columns"); + CollectionAssert.AreEqual(new[] { "Id", "TenantId" }, settings.PrimaryKeyColumns, "Primary key columns should match"); + } + + [TestMethod] + public void TestSinkSettings_CompositePrimaryKey_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "TenantId", "UserId" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "TenantId" }, + new ColumnMapping { ColumnName = "UserId" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should pass with composite primary key"); + } + + [TestMethod] + public void TestSinkSettings_AllColumnsArePrimaryKeys_FailsValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id", "Name" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), + "Validation should fail when all columns are primary keys without DeleteNotMatchedBySource"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("non-primary key column")), + "Validation error should mention non-primary key column requirement"); + } + + [TestMethod] + public void TestSinkSettings_AllColumnsArePrimaryKeys_WithDelete_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id", "Name" }, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), + "Validation should pass when all columns are primary keys but DeleteNotMatchedBySource is true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_DefaultsToFalse() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + Assert.IsFalse(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should default to false"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_CanBeSetToTrue() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + Assert.IsTrue(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be settable to true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_DeserializesFromJson() + { + var json = """ + { + "ConnectionString": "Server=.;Database=Test;", + "TableName": "TestTable", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id"], + "DeleteNotMatchedBySource": true, + "ColumnMappings": [ + {"ColumnName": "Id"}, + {"ColumnName": "Name"} + ] + } + """; + + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.IsNotNull(settings, "Settings should be deserialized"); + Assert.IsTrue(settings!.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_WithInsertMode_FailsValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Insert, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.DeleteNotMatchedBySource))), + "Validation should fail when DeleteNotMatchedBySource is true with Insert mode"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("can only be used when WriteMode is Upsert")), + "Validation error should mention Upsert mode requirement"); + } +} diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs index 61804f4..1549abd 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs @@ -18,16 +18,151 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati var settings = config.Get(); settings.Validate(); + if (settings!.WriteMode == SqlWriteMode.Upsert) + { + await WriteUpsertAsync(dataItems, settings, logger, cancellationToken); + } + else + { + await WriteInsertAsync(dataItems, settings, logger, cancellationToken); + } + } + + private async Task WriteInsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) + { string tableName = settings!.TableName!; + + // Validate table name to prevent SQL injection + ValidateSqlIdentifier(tableName, nameof(settings.TableName)); + + // Validate column names to prevent SQL injection + foreach (var column in settings.ColumnMappings) + { + ValidateSqlIdentifier(column.ColumnName!, nameof(column.ColumnName)); + } await using var connection = new SqlConnection(settings.ConnectionString); await connection.OpenAsync(cancellationToken); - await using (var transaction = connection.BeginTransaction()) + await using var transaction = connection.BeginTransaction(); + + try { + using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, transaction); + bulkCopy.DestinationTableName = tableName; + + var dataColumns = new Dictionary(); + foreach (ColumnMapping columnMapping in settings.ColumnMappings) + { + Type type = Type.GetType(columnMapping.DataType ?? "System.String")!; + DataColumn dbColumn = new DataColumn(columnMapping.ColumnName, type); + dataColumns.Add(columnMapping, dbColumn); + bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); + } + + + var dataTable = new DataTable(); + dataTable.Columns.AddRange(dataColumns.Values.ToArray()); + + var batches = dataItems.Buffer(settings.BatchSize); + await foreach (var batch in batches.WithCancellation(cancellationToken)) + { + foreach (var item in batch) + { + var fieldNames = item.GetFieldNames().ToList(); + DataRow row = dataTable.NewRow(); + foreach (var columnMapping in dataColumns) + { + DataColumn column = columnMapping.Value; + ColumnMapping mapping = columnMapping.Key; + + string? fieldName = mapping.GetFieldName(); + if (fieldName != null) + { + object? value = null; + var sourceField = fieldNames.FirstOrDefault(n => n.Equals(fieldName, StringComparison.CurrentCultureIgnoreCase)); + if (sourceField != null) + { + value = item.GetValue(sourceField); + } + + if (value != null || mapping.AllowNull) + { + if (value is IDataItem child) + { + value = child.AsJsonString(false, false); + } + row[column.ColumnName] = value; + } + else + { + row[column.ColumnName] = mapping.DefaultValue; + } + } + } + dataTable.Rows.Add(row); + } + await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); + dataTable.Clear(); + } + + await transaction.CommitAsync(cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Error copying data to table {TableName}", tableName); try { - using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, transaction); - bulkCopy.DestinationTableName = tableName; + await transaction.RollbackAsync(cancellationToken); + } + catch (Exception rollbackEx) + { + logger.LogError(rollbackEx, "Error rolling back transaction for table {TableName}", tableName); + } + throw; + } + + await connection.CloseAsync(); + } + + private async Task WriteUpsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) + { + string tableName = settings!.TableName!; + + // Validate table name to prevent SQL injection + ValidateSqlIdentifier(tableName, nameof(settings.TableName)); + + // Validate column names to prevent SQL injection + foreach (var column in settings.ColumnMappings) + { + ValidateSqlIdentifier(column.ColumnName!, nameof(column.ColumnName)); + } + foreach (var pkColumn in settings.PrimaryKeyColumns) + { + ValidateSqlIdentifier(pkColumn, nameof(settings.PrimaryKeyColumns)); + } + + string stagingTableName = $"#Staging_{Guid.NewGuid():N}"; + + await using var connection = new SqlConnection(settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + try + { + // Create staging table with same structure as target table + var createStagingTableSql = $@" + SELECT TOP 0 * + INTO {stagingTableName} + FROM {tableName}"; + + await using (var createCommand = new SqlCommand(createStagingTableSql, connection)) + { + await createCommand.ExecuteNonQueryAsync(cancellationToken); + } + + // Bulk insert into staging table + using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, null)) + { + bulkCopy.DestinationTableName = stagingTableName; var dataColumns = new Dictionary(); foreach (ColumnMapping columnMapping in settings.ColumnMappings) @@ -38,7 +173,6 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); } - var dataTable = new DataTable(); dataTable.Columns.AddRange(dataColumns.Values.ToArray()); @@ -83,17 +217,98 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); dataTable.Clear(); } - - await transaction.CommitAsync(cancellationToken); } - catch (Exception ex) + + // Build and execute MERGE statement + var mergeStatement = BuildMergeStatement(tableName, stagingTableName, settings); + logger.LogInformation("Executing MERGE statement for upsert operation"); + + await using (var mergeCommand = new SqlCommand(mergeStatement, connection)) { - Console.WriteLine($"Error copying data to table {tableName}: {ex.Message}"); - await transaction.RollbackAsync(cancellationToken); + mergeCommand.CommandTimeout = 300; // 5 minutes timeout for large merges + var rowsAffected = await mergeCommand.ExecuteNonQueryAsync(cancellationToken); + logger.LogInformation("MERGE completed. Rows affected: {RowsAffected}", rowsAffected); } } + catch (Exception ex) + { + logger.LogError(ex, "Error during upsert operation to table {TableName}", tableName); + throw; + } + finally + { + // Clean up staging table (temp tables are automatically dropped on connection close) + await connection.CloseAsync(); + } + } - await connection.CloseAsync(); + /// + /// Validates that a SQL identifier contains only allowed characters to prevent SQL injection. + /// Allows alphanumeric characters, underscores, dots (for schema.table), and spaces (for quoted identifiers). + /// + private static void ValidateSqlIdentifier(string identifier, string parameterName) + { + if (string.IsNullOrWhiteSpace(identifier)) + { + throw new ArgumentException("SQL identifier cannot be null or empty.", parameterName); + } + + // Allow alphanumeric, underscore, dot (for schema.table), space (for quoted identifiers), and brackets + if (!System.Text.RegularExpressions.Regex.IsMatch(identifier, @"^[\w\.\s\[\]]+$")) + { + throw new ArgumentException( + $"Invalid SQL identifier '{identifier}'. Identifiers can only contain alphanumeric characters, underscores, dots, spaces, and brackets.", + parameterName); + } + } + + private string BuildMergeStatement(string targetTable, string stagingTable, SqlServerSinkSettings settings) + { + var allColumns = settings.ColumnMappings.Select(m => m.ColumnName).ToList(); + var primaryKeys = settings.PrimaryKeyColumns; + var nonKeyColumns = allColumns.Except(primaryKeys).ToList(); + + // Build ON clause for matching + var onClause = string.Join(" AND ", + primaryKeys.Select(pk => $"target.[{pk}] = source.[{pk}]")); + + // Build INSERT columns and values + var insertColumns = string.Join(", ", allColumns.Select(col => $"[{col}]")); + var insertValues = string.Join(", ", allColumns.Select(col => $"source.[{col}]")); + + // Build the MERGE statement + var mergeStatement = $@" + MERGE {targetTable} AS target + USING {stagingTable} AS source + ON ({onClause})"; + + // Only add UPDATE clause if there are non-key columns to update + if (nonKeyColumns.Count > 0) + { + var updateSet = string.Join(", ", + nonKeyColumns.Select(col => $"target.[{col}] = source.[{col}]")); + + mergeStatement += $@" + WHEN MATCHED THEN + UPDATE SET {updateSet}"; + } + + mergeStatement += $@" + WHEN NOT MATCHED BY TARGET THEN + INSERT ({insertColumns}) + VALUES ({insertValues})"; + + // Add DELETE clause if requested for full table synchronization + if (settings.DeleteNotMatchedBySource) + { + mergeStatement += $@" + WHEN NOT MATCHED BY SOURCE THEN + DELETE"; + } + + mergeStatement += ";"; + + return mergeStatement; } public IEnumerable GetSettings() diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index 01f390f..038c6ac 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -4,7 +4,7 @@ namespace Cosmos.DataTransfer.SqlServerExtension { - public class SqlServerSinkSettings : IDataExtensionSettings + public class SqlServerSinkSettings : IDataExtensionSettings, IValidatableObject { [Required] [SensitiveValue] @@ -18,5 +18,62 @@ public class SqlServerSinkSettings : IDataExtensionSettings [MinLength(1)] public List ColumnMappings { get; set; } = new List(); + /// + /// Specifies the behavior when writing data to SQL Server. + /// Insert: Inserts new records only (default). + /// Upsert: Uses SQL MERGE to insert or update based on primary key columns. + /// + public SqlWriteMode WriteMode { get; set; } = SqlWriteMode.Insert; + + /// + /// List of column names that form the primary key for the table. + /// Required when WriteMode is Upsert. These columns are used in the MERGE ON clause. + /// + public List PrimaryKeyColumns { get; set; } = new List(); + + /// + /// When true and WriteMode is Upsert, records in the destination that do not exist in the source will be deleted. + /// This enables full table synchronization. Use with caution as this can result in data loss. + /// Default is false. + /// + public bool DeleteNotMatchedBySource { get; set; } = false; + + public IEnumerable Validate(ValidationContext validationContext) + { + var results = new List(); + + // Custom validation for Upsert mode + if (WriteMode == SqlWriteMode.Upsert) + { + if (PrimaryKeyColumns == null || PrimaryKeyColumns.Count == 0) + { + results.Add(new ValidationResult( + "PrimaryKeyColumns must be specified when WriteMode is Upsert.", + new[] { nameof(PrimaryKeyColumns) })); + } + else + { + // Ensure at least one non-key column exists for updates, unless we're only doing DELETE sync + var allColumns = ColumnMappings.Select(m => m.ColumnName).ToList(); + var nonKeyColumns = allColumns.Except(PrimaryKeyColumns).ToList(); + + if (nonKeyColumns.Count == 0 && !DeleteNotMatchedBySource) + { + results.Add(new ValidationResult( + "At least one non-primary key column must be specified in ColumnMappings for Upsert mode, or set DeleteNotMatchedBySource to true.", + new[] { nameof(ColumnMappings) })); + } + } + } + else if (WriteMode == SqlWriteMode.Insert && DeleteNotMatchedBySource) + { + // DeleteNotMatchedBySource only works with Upsert mode + results.Add(new ValidationResult( + "DeleteNotMatchedBySource can only be used when WriteMode is Upsert.", + new[] { nameof(DeleteNotMatchedBySource) })); + } + + return results; + } } } \ No newline at end of file diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs new file mode 100644 index 0000000..dd2d526 --- /dev/null +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs @@ -0,0 +1,21 @@ +namespace Cosmos.DataTransfer.SqlServerExtension +{ + /// + /// Defines the behavior when writing data to SQL Server. + /// + public enum SqlWriteMode + { + /// + /// Inserts new records only using bulk insert. This is the default behavior. + /// + Insert, + + /// + /// Uses SQL MERGE to insert new records or update existing ones based on primary key columns. + /// When matched: updates all non-key columns with source values. + /// When not matched: inserts new records. + /// Requires PrimaryKeyColumns to be specified. + /// + Upsert + } +} diff --git a/Extensions/SqlServer/README.md b/Extensions/SqlServer/README.md index 0c232c5..4716f2e 100644 --- a/Extensions/SqlServer/README.md +++ b/Extensions/SqlServer/README.md @@ -89,6 +89,26 @@ Sink settings require a `TableName` to define where to insert data and an array Sink settings also include an optional `BatchSize` parameter to specify the count of records to accumulate before bulk inserting, default value is 1000. +#### WriteMode + +The `WriteMode` parameter specifies how data should be written to the target table: + +- `Insert` (default): Inserts new records only using SQL bulk insert. This is the fastest mode but will fail if duplicate keys exist. +- `Upsert`: Uses SQL MERGE to insert new records or update existing ones based on primary key columns. When records match on the primary key(s), all non-key columns are updated with source values. When records don't match, new records are inserted. + +When using `Upsert` mode, you must also specify the `PrimaryKeyColumns` parameter with a list of column names that form the primary key. This can be a single column or a composite key. + +**Important Notes on Upsert Mode:** +- Upsert mode uses a temporary staging table and SQL MERGE statement +- All columns in `ColumnMappings` must exist in the target table +- Primary key columns must be included in `ColumnMappings` +- The operation syncs the source data with the destination (INSERT when not matched, UPDATE when matched) +- By default, DELETE operations are not performed - records only in the destination remain unchanged +- To enable full table synchronization including deletions, set `DeleteNotMatchedBySource` to `true` (use with caution as this can result in data loss) +- Performance may be slower than Insert mode due to the MERGE operation overhead + +#### Basic Insert Example + ```json { "ConnectionString": "", @@ -110,10 +130,112 @@ Sink settings also include an optional `BatchSize` parameter to specify the coun { "ColumnName": "IsSet", "AllowNull": false, - "DefaultValue": false + "DefaultValue": false, "DataType": "System.Boolean" } ], "BatchSize": 1000 } ``` + +#### Upsert Example + +```json +{ + "ConnectionString": "Server=.;Database=PaymentService;Trusted_Connection=True;", + "TableName": "AccountTransactions", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id"], + "ColumnMappings": [ + { + "ColumnName": "Id" + }, + { + "ColumnName": "AccountNumber" + }, + { + "ColumnName": "TransactionDate", + "DataType": "System.DateTime" + }, + { + "ColumnName": "Amount", + "DataType": "System.Decimal" + }, + { + "ColumnName": "Status" + } + ], + "BatchSize": 1000 +} +``` + +#### Upsert with Composite Key Example + +```json +{ + "ConnectionString": "Server=.;Database=SalesDB;Trusted_Connection=True;", + "TableName": "OrderLineItems", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["OrderId", "LineItemId"], + "ColumnMappings": [ + { + "ColumnName": "OrderId", + "DataType": "System.Int32" + }, + { + "ColumnName": "LineItemId", + "DataType": "System.Int32" + }, + { + "ColumnName": "ProductName" + }, + { + "ColumnName": "Quantity", + "DataType": "System.Int32" + }, + { + "ColumnName": "UnitPrice", + "DataType": "System.Decimal" + } + ], + "BatchSize": 500 +} +``` + +#### Full Table Sync with DELETE Example + +This example demonstrates full table synchronization where records that exist in the destination but not in the source will be deleted. **Use this with caution as it can result in data loss.** + +```json +{ + "ConnectionString": "Server=.;Database=InventoryDB;Trusted_Connection=True;", + "TableName": "Products", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["ProductId"], + "DeleteNotMatchedBySource": true, + "ColumnMappings": [ + { + "ColumnName": "ProductId", + "DataType": "System.Int32" + }, + { + "ColumnName": "ProductName" + }, + { + "ColumnName": "Price", + "DataType": "System.Decimal" + }, + { + "ColumnName": "StockQuantity", + "DataType": "System.Int32" + }, + { + "ColumnName": "LastUpdated", + "DataType": "System.DateTime" + } + ], + "BatchSize": 1000 +} +``` + +**Warning:** When `DeleteNotMatchedBySource` is set to `true`, any records in the destination table that do not have a matching primary key in the source data will be permanently deleted. Ensure you have proper backups and understand the implications before enabling this option.