From f27954673577c056b2c3567141ff278e60b27650 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 18:57:58 +0000 Subject: [PATCH 1/7] Initial plan From 06e19856dbfe34f353b97b84c3798488059fc149 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 19:06:43 +0000 Subject: [PATCH 2/7] Add WriteMode feature with Upsert support for SQL Server extension - Create SqlWriteMode enum with Insert and Upsert modes - Update SqlServerSinkSettings with WriteMode and PrimaryKeyColumns - Implement WriteInsertAsync and WriteUpsertAsync methods - Add SQL MERGE logic using staging table approach - Create comprehensive unit tests for WriteMode settings - Update README with detailed WriteMode documentation and examples Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- ...ansfer.SqlServerExtension.UnitTests.csproj | 1 + .../SqlServerSinkSettingsTests.cs | 179 ++++++++++++++++++ .../SqlServerDataSinkExtension.cs | 148 ++++++++++++++- .../SqlServerSinkSettings.cs | 30 +++ .../SqlWriteMode.cs | 21 ++ Extensions/SqlServer/README.md | 85 ++++++++- 6 files changed, 462 insertions(+), 2 deletions(-) create mode 100644 Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs create mode 100644 Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj index 36e5f3fc..7498e5f7 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj @@ -14,6 +14,7 @@ + diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs new file mode 100644 index 00000000..92f00a00 --- /dev/null +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -0,0 +1,179 @@ +using System.ComponentModel.DataAnnotations; +using Cosmos.DataTransfer.Interfaces; +using Microsoft.Extensions.Configuration; + +namespace Cosmos.DataTransfer.SqlServerExtension.UnitTests; + +[TestClass] +public class SqlServerSinkSettingsTests +{ + [TestMethod] + public void TestSinkSettings_DefaultWriteMode_IsInsert() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + Assert.AreEqual(SqlWriteMode.Insert, settings.WriteMode, "WriteMode should default to Insert"); + } + + [TestMethod] + public void TestSinkSettings_WriteMode_CanBeSetToUpsert() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + Assert.AreEqual(SqlWriteMode.Upsert, settings.WriteMode, "WriteMode should be settable to Upsert"); + } + + [TestMethod] + public void TestSinkSettings_UpsertMode_RequiresPrimaryKeyColumns() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should fail when PrimaryKeyColumns is empty and WriteMode is Upsert"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("PrimaryKeyColumns must be specified")), + "Validation error should mention PrimaryKeyColumns requirement"); + } + + [TestMethod] + public void TestSinkSettings_UpsertMode_WithPrimaryKeyColumns_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should pass when PrimaryKeyColumns is provided with Upsert mode"); + } + + [TestMethod] + public void TestSinkSettings_InsertMode_DoesNotRequirePrimaryKeyColumns() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Insert, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should not require PrimaryKeyColumns when WriteMode is Insert"); + } + + [TestMethod] + public void TestSinkSettings_WriteMode_DeserializesFromJson() + { + // Test JSON to enum conversion for Insert + var jsonInsert = """{"WriteMode": "Insert"}"""; + var configInsert = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonInsert))) + .Build(); + var settingsInsert = configInsert.Get(); + Assert.AreEqual(SqlWriteMode.Insert, settingsInsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Insert'"); + + // Test JSON to enum conversion for Upsert + var jsonUpsert = """{"WriteMode": "Upsert"}"""; + var configUpsert = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonUpsert))) + .Build(); + var settingsUpsert = configUpsert.Get(); + Assert.AreEqual(SqlWriteMode.Upsert, settingsUpsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Upsert'"); + } + + [TestMethod] + public void TestSinkSettings_PrimaryKeyColumns_DeserializesFromJson() + { + var json = """ + { + "ConnectionString": "Server=.;Database=Test;", + "TableName": "TestTable", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id", "TenantId"], + "ColumnMappings": [ + {"ColumnName": "Id"}, + {"ColumnName": "TenantId"} + ] + } + """; + + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.IsNotNull(settings, "Settings should be deserialized"); + Assert.AreEqual(SqlWriteMode.Upsert, settings!.WriteMode, "WriteMode should be Upsert"); + Assert.IsNotNull(settings.PrimaryKeyColumns, "PrimaryKeyColumns should not be null"); + Assert.AreEqual(2, settings.PrimaryKeyColumns.Count, "Should have 2 primary key columns"); + CollectionAssert.AreEqual(new[] { "Id", "TenantId" }, settings.PrimaryKeyColumns, "Primary key columns should match"); + } + + [TestMethod] + public void TestSinkSettings_CompositePrimaryKey_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "TenantId", "UserId" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "TenantId" }, + new ColumnMapping { ColumnName = "UserId" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), + "Validation should pass with composite primary key"); + } +} diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs index 61804f45..ad6f63f6 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs @@ -18,6 +18,18 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati var settings = config.Get(); settings.Validate(); + if (settings!.WriteMode == SqlWriteMode.Upsert) + { + await WriteUpsertAsync(dataItems, settings, logger, cancellationToken); + } + else + { + await WriteInsertAsync(dataItems, settings, logger, cancellationToken); + } + } + + private async Task WriteInsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) + { string tableName = settings!.TableName!; await using var connection = new SqlConnection(settings.ConnectionString); @@ -88,14 +100,148 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati } catch (Exception ex) { - Console.WriteLine($"Error copying data to table {tableName}: {ex.Message}"); + logger.LogError(ex, "Error copying data to table {TableName}", tableName); await transaction.RollbackAsync(cancellationToken); + throw; } } await connection.CloseAsync(); } + private async Task WriteUpsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) + { + string tableName = settings!.TableName!; + string stagingTableName = $"#Staging_{Guid.NewGuid():N}"; + + await using var connection = new SqlConnection(settings.ConnectionString); + await connection.OpenAsync(cancellationToken); + + try + { + // Create staging table with same structure as target table + var createStagingTableSql = $@" + SELECT TOP 0 * + INTO {stagingTableName} + FROM {tableName}"; + + await using (var createCommand = new SqlCommand(createStagingTableSql, connection)) + { + await createCommand.ExecuteNonQueryAsync(cancellationToken); + } + + // Bulk insert into staging table + using (var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, null)) + { + bulkCopy.DestinationTableName = stagingTableName; + + var dataColumns = new Dictionary(); + foreach (ColumnMapping columnMapping in settings.ColumnMappings) + { + Type type = Type.GetType(columnMapping.DataType ?? "System.String")!; + DataColumn dbColumn = new DataColumn(columnMapping.ColumnName, type); + dataColumns.Add(columnMapping, dbColumn); + bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); + } + + var dataTable = new DataTable(); + dataTable.Columns.AddRange(dataColumns.Values.ToArray()); + + var batches = dataItems.Buffer(settings.BatchSize); + await foreach (var batch in batches.WithCancellation(cancellationToken)) + { + foreach (var item in batch) + { + var fieldNames = item.GetFieldNames().ToList(); + DataRow row = dataTable.NewRow(); + foreach (var columnMapping in dataColumns) + { + DataColumn column = columnMapping.Value; + ColumnMapping mapping = columnMapping.Key; + + string? fieldName = mapping.GetFieldName(); + if (fieldName != null) + { + object? value = null; + var sourceField = fieldNames.FirstOrDefault(n => n.Equals(fieldName, StringComparison.CurrentCultureIgnoreCase)); + if (sourceField != null) + { + value = item.GetValue(sourceField); + } + + if (value != null || mapping.AllowNull) + { + if (value is IDataItem child) + { + value = child.AsJsonString(false, false); + } + row[column.ColumnName] = value; + } + else + { + row[column.ColumnName] = mapping.DefaultValue; + } + } + } + dataTable.Rows.Add(row); + } + await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); + dataTable.Clear(); + } + } + + // Build and execute MERGE statement + var mergeStatement = BuildMergeStatement(tableName, stagingTableName, settings); + logger.LogInformation("Executing MERGE statement for upsert operation"); + + await using (var mergeCommand = new SqlCommand(mergeStatement, connection)) + { + mergeCommand.CommandTimeout = 300; // 5 minutes timeout for large merges + var rowsAffected = await mergeCommand.ExecuteNonQueryAsync(cancellationToken); + logger.LogInformation("MERGE completed. Rows affected: {RowsAffected}", rowsAffected); + } + } + catch (Exception ex) + { + logger.LogError(ex, "Error during upsert operation to table {TableName}", tableName); + throw; + } + finally + { + // Clean up staging table (temp tables are automatically dropped on connection close) + await connection.CloseAsync(); + } + } + + private string BuildMergeStatement(string targetTable, string stagingTable, SqlServerSinkSettings settings) + { + var allColumns = settings.ColumnMappings.Select(m => m.ColumnName).ToList(); + var primaryKeys = settings.PrimaryKeyColumns; + var nonKeyColumns = allColumns.Except(primaryKeys).ToList(); + + // Build ON clause for matching + var onClause = string.Join(" AND ", + primaryKeys.Select(pk => $"target.[{pk}] = source.[{pk}]")); + + // Build UPDATE SET clause + var updateSet = string.Join(", ", + nonKeyColumns.Select(col => $"target.[{col}] = source.[{col}]")); + + // Build INSERT columns and values + var insertColumns = string.Join(", ", allColumns.Select(col => $"[{col}]")); + var insertValues = string.Join(", ", allColumns.Select(col => $"source.[{col}]")); + + return $@" + MERGE {targetTable} AS target + USING {stagingTable} AS source + ON ({onClause}) + WHEN MATCHED THEN + UPDATE SET {updateSet} + WHEN NOT MATCHED BY TARGET THEN + INSERT ({insertColumns}) + VALUES ({insertValues});"; + } + public IEnumerable GetSettings() { yield return new SqlServerSinkSettings(); diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index 01f390f6..e9d7aea9 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -18,5 +18,35 @@ public class SqlServerSinkSettings : IDataExtensionSettings [MinLength(1)] public List ColumnMappings { get; set; } = new List(); + /// + /// Specifies the behavior when writing data to SQL Server. + /// Insert: Inserts new records only (default). + /// Upsert: Uses SQL MERGE to insert or update based on primary key columns. + /// + public SqlWriteMode WriteMode { get; set; } = SqlWriteMode.Insert; + + /// + /// List of column names that form the primary key for the table. + /// Required when WriteMode is Upsert. These columns are used in the MERGE ON clause. + /// + public List PrimaryKeyColumns { get; set; } = new List(); + + public IEnumerable Validate(ValidationContext validationContext) + { + var results = new List(); + + // Standard validation + Validator.TryValidateObject(this, validationContext, results, true); + + // Custom validation for Upsert mode + if (WriteMode == SqlWriteMode.Upsert && (PrimaryKeyColumns == null || PrimaryKeyColumns.Count == 0)) + { + results.Add(new ValidationResult( + "PrimaryKeyColumns must be specified when WriteMode is Upsert.", + new[] { nameof(PrimaryKeyColumns) })); + } + + return results; + } } } \ No newline at end of file diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs new file mode 100644 index 00000000..dd2d5267 --- /dev/null +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlWriteMode.cs @@ -0,0 +1,21 @@ +namespace Cosmos.DataTransfer.SqlServerExtension +{ + /// + /// Defines the behavior when writing data to SQL Server. + /// + public enum SqlWriteMode + { + /// + /// Inserts new records only using bulk insert. This is the default behavior. + /// + Insert, + + /// + /// Uses SQL MERGE to insert new records or update existing ones based on primary key columns. + /// When matched: updates all non-key columns with source values. + /// When not matched: inserts new records. + /// Requires PrimaryKeyColumns to be specified. + /// + Upsert + } +} diff --git a/Extensions/SqlServer/README.md b/Extensions/SqlServer/README.md index 0c232c5b..e1ae7c94 100644 --- a/Extensions/SqlServer/README.md +++ b/Extensions/SqlServer/README.md @@ -89,6 +89,25 @@ Sink settings require a `TableName` to define where to insert data and an array Sink settings also include an optional `BatchSize` parameter to specify the count of records to accumulate before bulk inserting, default value is 1000. +#### WriteMode + +The `WriteMode` parameter specifies how data should be written to the target table: + +- `Insert` (default): Inserts new records only using SQL bulk insert. This is the fastest mode but will fail if duplicate keys exist. +- `Upsert`: Uses SQL MERGE to insert new records or update existing ones based on primary key columns. When records match on the primary key(s), all non-key columns are updated with source values. When records don't match, new records are inserted. + +When using `Upsert` mode, you must also specify the `PrimaryKeyColumns` parameter with a list of column names that form the primary key. This can be a single column or a composite key. + +**Important Notes on Upsert Mode:** +- Upsert mode uses a temporary staging table and SQL MERGE statement +- All columns in `ColumnMappings` must exist in the target table +- Primary key columns must be included in `ColumnMappings` +- The operation syncs the source data with the destination (INSERT when not matched, UPDATE when matched) +- DELETE operations are not performed - records only in the destination remain unchanged +- Performance may be slower than Insert mode due to the MERGE operation overhead + +#### Basic Insert Example + ```json { "ConnectionString": "", @@ -110,10 +129,74 @@ Sink settings also include an optional `BatchSize` parameter to specify the coun { "ColumnName": "IsSet", "AllowNull": false, - "DefaultValue": false + "DefaultValue": false, "DataType": "System.Boolean" } ], "BatchSize": 1000 } ``` + +#### Upsert Example + +```json +{ + "ConnectionString": "Server=.;Database=PaymentService;Trusted_Connection=True;", + "TableName": "AccountTransactions", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id"], + "ColumnMappings": [ + { + "ColumnName": "Id" + }, + { + "ColumnName": "AccountNumber" + }, + { + "ColumnName": "TransactionDate", + "DataType": "System.DateTime" + }, + { + "ColumnName": "Amount", + "DataType": "System.Decimal" + }, + { + "ColumnName": "Status" + } + ], + "BatchSize": 1000 +} +``` + +#### Upsert with Composite Key Example + +```json +{ + "ConnectionString": "Server=.;Database=SalesDB;Trusted_Connection=True;", + "TableName": "OrderLineItems", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["OrderId", "LineItemId"], + "ColumnMappings": [ + { + "ColumnName": "OrderId", + "DataType": "System.Int32" + }, + { + "ColumnName": "LineItemId", + "DataType": "System.Int32" + }, + { + "ColumnName": "ProductName" + }, + { + "ColumnName": "Quantity", + "DataType": "System.Int32" + }, + { + "ColumnName": "UnitPrice", + "DataType": "System.Decimal" + } + ], + "BatchSize": 500 +} +``` From ce0a4b999efdd11600738d5d5b403da79e30576c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 19:10:15 +0000 Subject: [PATCH 3/7] Fix code review issues for WriteMode feature - Add IValidatableObject interface implementation - Add validation to ensure at least one non-key column exists for Upsert mode - Improve error handling with proper transaction rollback - Fix BuildMergeStatement to handle edge cases gracefully - Add test for all-columns-are-primary-keys validation - Fix stack overflow by removing recursive validation call Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../SqlServerSinkSettingsTests.cs | 25 ++++ .../SqlServerDataSinkExtension.cs | 132 ++++++++++-------- .../SqlServerSinkSettings.cs | 29 ++-- 3 files changed, 120 insertions(+), 66 deletions(-) diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs index 92f00a00..a9e387c4 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -176,4 +176,29 @@ public void TestSinkSettings_CompositePrimaryKey_PassesValidation() Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))), "Validation should pass with composite primary key"); } + + [TestMethod] + public void TestSinkSettings_AllColumnsArePrimaryKeys_FailsValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id", "Name" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), + "Validation should fail when all columns are primary keys"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("non-primary key column")), + "Validation error should mention non-primary key column requirement"); + } } diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs index ad6f63f6..9bcdf282 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs @@ -34,76 +34,82 @@ private async Task WriteInsertAsync(IAsyncEnumerable dataItems, SqlSe await using var connection = new SqlConnection(settings.ConnectionString); await connection.OpenAsync(cancellationToken); - await using (var transaction = connection.BeginTransaction()) + await using var transaction = connection.BeginTransaction(); + + try { - try - { - using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, transaction); - bulkCopy.DestinationTableName = tableName; + using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.CheckConstraints | SqlBulkCopyOptions.KeepIdentity, transaction); + bulkCopy.DestinationTableName = tableName; - var dataColumns = new Dictionary(); - foreach (ColumnMapping columnMapping in settings.ColumnMappings) - { - Type type = Type.GetType(columnMapping.DataType ?? "System.String")!; - DataColumn dbColumn = new DataColumn(columnMapping.ColumnName, type); - dataColumns.Add(columnMapping, dbColumn); - bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); - } + var dataColumns = new Dictionary(); + foreach (ColumnMapping columnMapping in settings.ColumnMappings) + { + Type type = Type.GetType(columnMapping.DataType ?? "System.String")!; + DataColumn dbColumn = new DataColumn(columnMapping.ColumnName, type); + dataColumns.Add(columnMapping, dbColumn); + bulkCopy.ColumnMappings.Add(new SqlBulkCopyColumnMapping(dbColumn.ColumnName, dbColumn.ColumnName)); + } - - var dataTable = new DataTable(); - dataTable.Columns.AddRange(dataColumns.Values.ToArray()); + + var dataTable = new DataTable(); + dataTable.Columns.AddRange(dataColumns.Values.ToArray()); - var batches = dataItems.Buffer(settings.BatchSize); - await foreach (var batch in batches.WithCancellation(cancellationToken)) + var batches = dataItems.Buffer(settings.BatchSize); + await foreach (var batch in batches.WithCancellation(cancellationToken)) + { + foreach (var item in batch) { - foreach (var item in batch) + var fieldNames = item.GetFieldNames().ToList(); + DataRow row = dataTable.NewRow(); + foreach (var columnMapping in dataColumns) { - var fieldNames = item.GetFieldNames().ToList(); - DataRow row = dataTable.NewRow(); - foreach (var columnMapping in dataColumns) - { - DataColumn column = columnMapping.Value; - ColumnMapping mapping = columnMapping.Key; + DataColumn column = columnMapping.Value; + ColumnMapping mapping = columnMapping.Key; - string? fieldName = mapping.GetFieldName(); - if (fieldName != null) + string? fieldName = mapping.GetFieldName(); + if (fieldName != null) + { + object? value = null; + var sourceField = fieldNames.FirstOrDefault(n => n.Equals(fieldName, StringComparison.CurrentCultureIgnoreCase)); + if (sourceField != null) { - object? value = null; - var sourceField = fieldNames.FirstOrDefault(n => n.Equals(fieldName, StringComparison.CurrentCultureIgnoreCase)); - if (sourceField != null) - { - value = item.GetValue(sourceField); - } + value = item.GetValue(sourceField); + } - if (value != null || mapping.AllowNull) - { - if (value is IDataItem child) - { - value = child.AsJsonString(false, false); - } - row[column.ColumnName] = value; - } - else + if (value != null || mapping.AllowNull) + { + if (value is IDataItem child) { - row[column.ColumnName] = mapping.DefaultValue; + value = child.AsJsonString(false, false); } + row[column.ColumnName] = value; + } + else + { + row[column.ColumnName] = mapping.DefaultValue; } } - dataTable.Rows.Add(row); } - await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); - dataTable.Clear(); + dataTable.Rows.Add(row); } - - await transaction.CommitAsync(cancellationToken); + await bulkCopy.WriteToServerAsync(dataTable, cancellationToken); + dataTable.Clear(); } - catch (Exception ex) + + await transaction.CommitAsync(cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Error copying data to table {TableName}", tableName); + try { - logger.LogError(ex, "Error copying data to table {TableName}", tableName); await transaction.RollbackAsync(cancellationToken); - throw; } + catch (Exception rollbackEx) + { + logger.LogError(rollbackEx, "Error rolling back transaction for table {TableName}", tableName); + } + throw; } await connection.CloseAsync(); @@ -223,23 +229,33 @@ private string BuildMergeStatement(string targetTable, string stagingTable, SqlS var onClause = string.Join(" AND ", primaryKeys.Select(pk => $"target.[{pk}] = source.[{pk}]")); - // Build UPDATE SET clause - var updateSet = string.Join(", ", - nonKeyColumns.Select(col => $"target.[{col}] = source.[{col}]")); - // Build INSERT columns and values var insertColumns = string.Join(", ", allColumns.Select(col => $"[{col}]")); var insertValues = string.Join(", ", allColumns.Select(col => $"source.[{col}]")); - return $@" + // Build the MERGE statement + var mergeStatement = $@" MERGE {targetTable} AS target USING {stagingTable} AS source - ON ({onClause}) + ON ({onClause})"; + + // Only add UPDATE clause if there are non-key columns to update + if (nonKeyColumns.Count > 0) + { + var updateSet = string.Join(", ", + nonKeyColumns.Select(col => $"target.[{col}] = source.[{col}]")); + + mergeStatement += $@" WHEN MATCHED THEN - UPDATE SET {updateSet} + UPDATE SET {updateSet}"; + } + + mergeStatement += $@" WHEN NOT MATCHED BY TARGET THEN INSERT ({insertColumns}) VALUES ({insertValues});"; + + return mergeStatement; } public IEnumerable GetSettings() diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index e9d7aea9..1ea77807 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -4,7 +4,7 @@ namespace Cosmos.DataTransfer.SqlServerExtension { - public class SqlServerSinkSettings : IDataExtensionSettings + public class SqlServerSinkSettings : IDataExtensionSettings, IValidatableObject { [Required] [SensitiveValue] @@ -34,16 +34,29 @@ public class SqlServerSinkSettings : IDataExtensionSettings public IEnumerable Validate(ValidationContext validationContext) { var results = new List(); - - // Standard validation - Validator.TryValidateObject(this, validationContext, results, true); // Custom validation for Upsert mode - if (WriteMode == SqlWriteMode.Upsert && (PrimaryKeyColumns == null || PrimaryKeyColumns.Count == 0)) + if (WriteMode == SqlWriteMode.Upsert) { - results.Add(new ValidationResult( - "PrimaryKeyColumns must be specified when WriteMode is Upsert.", - new[] { nameof(PrimaryKeyColumns) })); + if (PrimaryKeyColumns == null || PrimaryKeyColumns.Count == 0) + { + results.Add(new ValidationResult( + "PrimaryKeyColumns must be specified when WriteMode is Upsert.", + new[] { nameof(PrimaryKeyColumns) })); + } + else + { + // Ensure at least one non-key column exists for updates + var allColumns = ColumnMappings.Select(m => m.ColumnName).ToList(); + var nonKeyColumns = allColumns.Except(PrimaryKeyColumns).ToList(); + + if (nonKeyColumns.Count == 0) + { + results.Add(new ValidationResult( + "At least one non-primary key column must be specified in ColumnMappings for Upsert mode.", + new[] { nameof(ColumnMappings) })); + } + } } return results; From 77fd58639cf4c36e39da69c727b901b714324f11 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 19:12:57 +0000 Subject: [PATCH 4/7] Add SQL injection protection for table and column names - Add ValidateSqlIdentifier method to validate SQL identifiers - Apply validation in both WriteInsertAsync and WriteUpsertAsync - Prevent SQL injection by ensuring identifiers only contain safe characters - Allows alphanumeric, underscores, dots, spaces, and brackets Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../SqlServerDataSinkExtension.cs | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs index 9bcdf282..8dd320ec 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs @@ -31,6 +31,15 @@ public async Task WriteAsync(IAsyncEnumerable dataItems, IConfigurati private async Task WriteInsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) { string tableName = settings!.TableName!; + + // Validate table name to prevent SQL injection + ValidateSqlIdentifier(tableName, nameof(settings.TableName)); + + // Validate column names to prevent SQL injection + foreach (var column in settings.ColumnMappings) + { + ValidateSqlIdentifier(column.ColumnName!, nameof(column.ColumnName)); + } await using var connection = new SqlConnection(settings.ConnectionString); await connection.OpenAsync(cancellationToken); @@ -118,6 +127,20 @@ private async Task WriteInsertAsync(IAsyncEnumerable dataItems, SqlSe private async Task WriteUpsertAsync(IAsyncEnumerable dataItems, SqlServerSinkSettings settings, ILogger logger, CancellationToken cancellationToken) { string tableName = settings!.TableName!; + + // Validate table name to prevent SQL injection + ValidateSqlIdentifier(tableName, nameof(settings.TableName)); + + // Validate column names to prevent SQL injection + foreach (var column in settings.ColumnMappings) + { + ValidateSqlIdentifier(column.ColumnName!, nameof(column.ColumnName)); + } + foreach (var pkColumn in settings.PrimaryKeyColumns) + { + ValidateSqlIdentifier(pkColumn, nameof(settings.PrimaryKeyColumns)); + } + string stagingTableName = $"#Staging_{Guid.NewGuid():N}"; await using var connection = new SqlConnection(settings.ConnectionString); @@ -219,6 +242,26 @@ SELECT TOP 0 * } } + /// + /// Validates that a SQL identifier contains only allowed characters to prevent SQL injection. + /// Allows alphanumeric characters, underscores, dots (for schema.table), and spaces (for quoted identifiers). + /// + private static void ValidateSqlIdentifier(string identifier, string parameterName) + { + if (string.IsNullOrWhiteSpace(identifier)) + { + throw new ArgumentException("SQL identifier cannot be null or empty.", parameterName); + } + + // Allow alphanumeric, underscore, dot (for schema.table), space (for quoted identifiers), and brackets + if (!System.Text.RegularExpressions.Regex.IsMatch(identifier, @"^[\w\.\s\[\]]+$")) + { + throw new ArgumentException( + $"Invalid SQL identifier '{identifier}'. Identifiers can only contain alphanumeric characters, underscores, dots, spaces, and brackets.", + parameterName); + } + } + private string BuildMergeStatement(string targetTable, string stagingTable, SqlServerSinkSettings settings) { var allColumns = settings.ColumnMappings.Select(m => m.ColumnName).ToList(); From 4913b0f3af948c0aa749dbf0098fde1055ad2d99 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 19:38:43 +0000 Subject: [PATCH 5/7] Add DeleteNotMatchedBySource option for full table sync with DELETE support - Add DeleteNotMatchedBySource property to SqlServerSinkSettings (defaults to false) - Update BuildMergeStatement to include WHEN NOT MATCHED BY SOURCE THEN DELETE when enabled - Add 3 new unit tests for DeleteNotMatchedBySource functionality - Update README with documentation and warning about data loss - Add full table sync example configuration with DELETE enabled Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../SqlServerSinkSettingsTests.cs | 65 +++++++++++++++++++ .../SqlServerDataSinkExtension.cs | 12 +++- .../SqlServerSinkSettings.cs | 7 ++ Extensions/SqlServer/README.md | 41 +++++++++++- 4 files changed, 123 insertions(+), 2 deletions(-) diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs index a9e387c4..a90f1400 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -201,4 +201,69 @@ public void TestSinkSettings_AllColumnsArePrimaryKeys_FailsValidation() Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("non-primary key column")), "Validation error should mention non-primary key column requirement"); } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_DefaultsToFalse() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + Assert.IsFalse(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should default to false"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_CanBeSetToTrue() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id" }, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + Assert.IsTrue(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be settable to true"); + } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_DeserializesFromJson() + { + var json = """ + { + "ConnectionString": "Server=.;Database=Test;", + "TableName": "TestTable", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["Id"], + "DeleteNotMatchedBySource": true, + "ColumnMappings": [ + {"ColumnName": "Id"}, + {"ColumnName": "Name"} + ] + } + """; + + var config = new ConfigurationBuilder() + .AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json))) + .Build(); + var settings = config.Get(); + + Assert.IsNotNull(settings, "Settings should be deserialized"); + Assert.IsTrue(settings!.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be true"); + } } diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs index 8dd320ec..1549abd8 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerDataSinkExtension.cs @@ -296,7 +296,17 @@ WHEN MATCHED THEN mergeStatement += $@" WHEN NOT MATCHED BY TARGET THEN INSERT ({insertColumns}) - VALUES ({insertValues});"; + VALUES ({insertValues})"; + + // Add DELETE clause if requested for full table synchronization + if (settings.DeleteNotMatchedBySource) + { + mergeStatement += $@" + WHEN NOT MATCHED BY SOURCE THEN + DELETE"; + } + + mergeStatement += ";"; return mergeStatement; } diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index 1ea77807..ae76fe73 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -31,6 +31,13 @@ public class SqlServerSinkSettings : IDataExtensionSettings, IValidatableObject /// public List PrimaryKeyColumns { get; set; } = new List(); + /// + /// When true and WriteMode is Upsert, records in the destination that do not exist in the source will be deleted. + /// This enables full table synchronization. Use with caution as this can result in data loss. + /// Default is false. + /// + public bool DeleteNotMatchedBySource { get; set; } = false; + public IEnumerable Validate(ValidationContext validationContext) { var results = new List(); diff --git a/Extensions/SqlServer/README.md b/Extensions/SqlServer/README.md index e1ae7c94..4716f2ef 100644 --- a/Extensions/SqlServer/README.md +++ b/Extensions/SqlServer/README.md @@ -103,7 +103,8 @@ When using `Upsert` mode, you must also specify the `PrimaryKeyColumns` paramete - All columns in `ColumnMappings` must exist in the target table - Primary key columns must be included in `ColumnMappings` - The operation syncs the source data with the destination (INSERT when not matched, UPDATE when matched) -- DELETE operations are not performed - records only in the destination remain unchanged +- By default, DELETE operations are not performed - records only in the destination remain unchanged +- To enable full table synchronization including deletions, set `DeleteNotMatchedBySource` to `true` (use with caution as this can result in data loss) - Performance may be slower than Insert mode due to the MERGE operation overhead #### Basic Insert Example @@ -200,3 +201,41 @@ When using `Upsert` mode, you must also specify the `PrimaryKeyColumns` paramete "BatchSize": 500 } ``` + +#### Full Table Sync with DELETE Example + +This example demonstrates full table synchronization where records that exist in the destination but not in the source will be deleted. **Use this with caution as it can result in data loss.** + +```json +{ + "ConnectionString": "Server=.;Database=InventoryDB;Trusted_Connection=True;", + "TableName": "Products", + "WriteMode": "Upsert", + "PrimaryKeyColumns": ["ProductId"], + "DeleteNotMatchedBySource": true, + "ColumnMappings": [ + { + "ColumnName": "ProductId", + "DataType": "System.Int32" + }, + { + "ColumnName": "ProductName" + }, + { + "ColumnName": "Price", + "DataType": "System.Decimal" + }, + { + "ColumnName": "StockQuantity", + "DataType": "System.Int32" + }, + { + "ColumnName": "LastUpdated", + "DataType": "System.DateTime" + } + ], + "BatchSize": 1000 +} +``` + +**Warning:** When `DeleteNotMatchedBySource` is set to `true`, any records in the destination table that do not have a matching primary key in the source data will be permanently deleted. Ensure you have proper backups and understand the implications before enabling this option. From ac59bb15ea8da8d79ae76923d715ad4811f36192 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 19:41:11 +0000 Subject: [PATCH 6/7] Add validation to prevent DeleteNotMatchedBySource with Insert mode - Add validation to ensure DeleteNotMatchedBySource is only used with Upsert mode - Add unit test to verify the validation logic - Prevents invalid configuration that would have no effect Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../SqlServerSinkSettingsTests.cs | 25 +++++++++++++++++++ .../SqlServerSinkSettings.cs | 7 ++++++ 2 files changed, 32 insertions(+) diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs index a90f1400..b29d0ba5 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -266,4 +266,29 @@ public void TestSinkSettings_DeleteNotMatchedBySource_DeserializesFromJson() Assert.IsNotNull(settings, "Settings should be deserialized"); Assert.IsTrue(settings!.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be true"); } + + [TestMethod] + public void TestSinkSettings_DeleteNotMatchedBySource_WithInsertMode_FailsValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Insert, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.DeleteNotMatchedBySource))), + "Validation should fail when DeleteNotMatchedBySource is true with Insert mode"); + + Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("can only be used when WriteMode is Upsert")), + "Validation error should mention Upsert mode requirement"); + } } diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index ae76fe73..0bbf4933 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -65,6 +65,13 @@ public IEnumerable Validate(ValidationContext validationContex } } } + else if (WriteMode == SqlWriteMode.Insert && DeleteNotMatchedBySource) + { + // DeleteNotMatchedBySource only works with Upsert mode + results.Add(new ValidationResult( + "DeleteNotMatchedBySource can only be used when WriteMode is Upsert.", + new[] { nameof(DeleteNotMatchedBySource) })); + } return results; } From 7403a1e3581f832ca3b6234a93ffc3e95d9c4cfe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 30 Dec 2025 19:43:29 +0000 Subject: [PATCH 7/7] Refine validation to allow all-primary-key columns with DeleteNotMatchedBySource - Update validation to allow Upsert mode with only primary key columns when DeleteNotMatchedBySource is true - This enables valid use case of syncing which records exist without updating other columns - Add test case to verify this scenario passes validation Co-authored-by: philnach <19275540+philnach@users.noreply.github.com> --- .../SqlServerSinkSettingsTests.cs | 25 ++++++++++++++++++- .../SqlServerSinkSettings.cs | 6 ++--- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs index b29d0ba5..0f129ba6 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/SqlServerSinkSettingsTests.cs @@ -196,12 +196,35 @@ public void TestSinkSettings_AllColumnsArePrimaryKeys_FailsValidation() var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), - "Validation should fail when all columns are primary keys"); + "Validation should fail when all columns are primary keys without DeleteNotMatchedBySource"); Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("non-primary key column")), "Validation error should mention non-primary key column requirement"); } + [TestMethod] + public void TestSinkSettings_AllColumnsArePrimaryKeys_WithDelete_PassesValidation() + { + var settings = new SqlServerSinkSettings + { + ConnectionString = "Server=.;Database=Test;", + TableName = "TestTable", + WriteMode = SqlWriteMode.Upsert, + PrimaryKeyColumns = new List { "Id", "Name" }, + DeleteNotMatchedBySource = true, + ColumnMappings = new List + { + new ColumnMapping { ColumnName = "Id" }, + new ColumnMapping { ColumnName = "Name" } + } + }; + + var validationResults = settings.Validate(new ValidationContext(settings)).ToList(); + + Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))), + "Validation should pass when all columns are primary keys but DeleteNotMatchedBySource is true"); + } + [TestMethod] public void TestSinkSettings_DeleteNotMatchedBySource_DefaultsToFalse() { diff --git a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs index 0bbf4933..038c6ac4 100644 --- a/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs +++ b/Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension/SqlServerSinkSettings.cs @@ -53,14 +53,14 @@ public IEnumerable Validate(ValidationContext validationContex } else { - // Ensure at least one non-key column exists for updates + // Ensure at least one non-key column exists for updates, unless we're only doing DELETE sync var allColumns = ColumnMappings.Select(m => m.ColumnName).ToList(); var nonKeyColumns = allColumns.Except(PrimaryKeyColumns).ToList(); - if (nonKeyColumns.Count == 0) + if (nonKeyColumns.Count == 0 && !DeleteNotMatchedBySource) { results.Add(new ValidationResult( - "At least one non-primary key column must be specified in ColumnMappings for Upsert mode.", + "At least one non-primary key column must be specified in ColumnMappings for Upsert mode, or set DeleteNotMatchedBySource to true.", new[] { nameof(ColumnMappings) })); } }