Skip to content

Commit 2b73fd9

Browse files
authored
Merge pull request #228 from AzureCosmosDB/copilot/add-sql-server-write-mode
Add WriteMode with SQL MERGE upsert support including optional DELETE to SQL Server extension
2 parents f28c644 + 095de3d commit 2b73fd9

File tree

6 files changed

+745
-12
lines changed

6 files changed

+745
-12
lines changed

Extensions/SqlServer/Cosmos.DataTransfer.SqlServerExtension.UnitTests/Cosmos.DataTransfer.SqlServerExtension.UnitTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
<ItemGroup>
1515
<PackageReference Include="Microsoft.Data.Sqlite" />
1616
<PackageReference Include="Microsoft.Extensions.Configuration" />
17+
<PackageReference Include="Microsoft.Extensions.Configuration.Json" />
1718
<PackageReference Include="Microsoft.NET.Test.Sdk" />
1819
<PackageReference Include="MSTest.TestAdapter" />
1920
<PackageReference Include="MSTest.TestFramework" />
Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
using System.ComponentModel.DataAnnotations;
2+
using Cosmos.DataTransfer.Interfaces;
3+
using Microsoft.Extensions.Configuration;
4+
5+
namespace Cosmos.DataTransfer.SqlServerExtension.UnitTests;
6+
7+
[TestClass]
8+
public class SqlServerSinkSettingsTests
9+
{
10+
[TestMethod]
11+
public void TestSinkSettings_DefaultWriteMode_IsInsert()
12+
{
13+
var settings = new SqlServerSinkSettings
14+
{
15+
ConnectionString = "Server=.;Database=Test;",
16+
TableName = "TestTable",
17+
ColumnMappings = new List<ColumnMapping>
18+
{
19+
new ColumnMapping { ColumnName = "Id" }
20+
}
21+
};
22+
23+
Assert.AreEqual(SqlWriteMode.Insert, settings.WriteMode, "WriteMode should default to Insert");
24+
}
25+
26+
[TestMethod]
27+
public void TestSinkSettings_WriteMode_CanBeSetToUpsert()
28+
{
29+
var settings = new SqlServerSinkSettings
30+
{
31+
ConnectionString = "Server=.;Database=Test;",
32+
TableName = "TestTable",
33+
WriteMode = SqlWriteMode.Upsert,
34+
PrimaryKeyColumns = new List<string> { "Id" },
35+
ColumnMappings = new List<ColumnMapping>
36+
{
37+
new ColumnMapping { ColumnName = "Id" }
38+
}
39+
};
40+
41+
Assert.AreEqual(SqlWriteMode.Upsert, settings.WriteMode, "WriteMode should be settable to Upsert");
42+
}
43+
44+
[TestMethod]
45+
public void TestSinkSettings_UpsertMode_RequiresPrimaryKeyColumns()
46+
{
47+
var settings = new SqlServerSinkSettings
48+
{
49+
ConnectionString = "Server=.;Database=Test;",
50+
TableName = "TestTable",
51+
WriteMode = SqlWriteMode.Upsert,
52+
ColumnMappings = new List<ColumnMapping>
53+
{
54+
new ColumnMapping { ColumnName = "Id" }
55+
}
56+
};
57+
58+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
59+
60+
Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))),
61+
"Validation should fail when PrimaryKeyColumns is empty and WriteMode is Upsert");
62+
63+
Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("PrimaryKeyColumns must be specified")),
64+
"Validation error should mention PrimaryKeyColumns requirement");
65+
}
66+
67+
[TestMethod]
68+
public void TestSinkSettings_UpsertMode_WithPrimaryKeyColumns_PassesValidation()
69+
{
70+
var settings = new SqlServerSinkSettings
71+
{
72+
ConnectionString = "Server=.;Database=Test;",
73+
TableName = "TestTable",
74+
WriteMode = SqlWriteMode.Upsert,
75+
PrimaryKeyColumns = new List<string> { "Id" },
76+
ColumnMappings = new List<ColumnMapping>
77+
{
78+
new ColumnMapping { ColumnName = "Id" },
79+
new ColumnMapping { ColumnName = "Name" }
80+
}
81+
};
82+
83+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
84+
85+
Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))),
86+
"Validation should pass when PrimaryKeyColumns is provided with Upsert mode");
87+
}
88+
89+
[TestMethod]
90+
public void TestSinkSettings_InsertMode_DoesNotRequirePrimaryKeyColumns()
91+
{
92+
var settings = new SqlServerSinkSettings
93+
{
94+
ConnectionString = "Server=.;Database=Test;",
95+
TableName = "TestTable",
96+
WriteMode = SqlWriteMode.Insert,
97+
ColumnMappings = new List<ColumnMapping>
98+
{
99+
new ColumnMapping { ColumnName = "Id" }
100+
}
101+
};
102+
103+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
104+
105+
Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))),
106+
"Validation should not require PrimaryKeyColumns when WriteMode is Insert");
107+
}
108+
109+
[TestMethod]
110+
public void TestSinkSettings_WriteMode_DeserializesFromJson()
111+
{
112+
// Test JSON to enum conversion for Insert
113+
var jsonInsert = """{"WriteMode": "Insert"}""";
114+
var configInsert = new ConfigurationBuilder()
115+
.AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonInsert)))
116+
.Build();
117+
var settingsInsert = configInsert.Get<SqlServerSinkSettings>();
118+
Assert.AreEqual(SqlWriteMode.Insert, settingsInsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Insert'");
119+
120+
// Test JSON to enum conversion for Upsert
121+
var jsonUpsert = """{"WriteMode": "Upsert"}""";
122+
var configUpsert = new ConfigurationBuilder()
123+
.AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(jsonUpsert)))
124+
.Build();
125+
var settingsUpsert = configUpsert.Get<SqlServerSinkSettings>();
126+
Assert.AreEqual(SqlWriteMode.Upsert, settingsUpsert?.WriteMode, "WriteMode should be deserialized from JSON string 'Upsert'");
127+
}
128+
129+
[TestMethod]
130+
public void TestSinkSettings_PrimaryKeyColumns_DeserializesFromJson()
131+
{
132+
var json = """
133+
{
134+
"ConnectionString": "Server=.;Database=Test;",
135+
"TableName": "TestTable",
136+
"WriteMode": "Upsert",
137+
"PrimaryKeyColumns": ["Id", "TenantId"],
138+
"ColumnMappings": [
139+
{"ColumnName": "Id"},
140+
{"ColumnName": "TenantId"}
141+
]
142+
}
143+
""";
144+
145+
var config = new ConfigurationBuilder()
146+
.AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json)))
147+
.Build();
148+
var settings = config.Get<SqlServerSinkSettings>();
149+
150+
Assert.IsNotNull(settings, "Settings should be deserialized");
151+
Assert.AreEqual(SqlWriteMode.Upsert, settings!.WriteMode, "WriteMode should be Upsert");
152+
Assert.IsNotNull(settings.PrimaryKeyColumns, "PrimaryKeyColumns should not be null");
153+
Assert.AreEqual(2, settings.PrimaryKeyColumns.Count, "Should have 2 primary key columns");
154+
CollectionAssert.AreEqual(new[] { "Id", "TenantId" }, settings.PrimaryKeyColumns, "Primary key columns should match");
155+
}
156+
157+
[TestMethod]
158+
public void TestSinkSettings_CompositePrimaryKey_PassesValidation()
159+
{
160+
var settings = new SqlServerSinkSettings
161+
{
162+
ConnectionString = "Server=.;Database=Test;",
163+
TableName = "TestTable",
164+
WriteMode = SqlWriteMode.Upsert,
165+
PrimaryKeyColumns = new List<string> { "TenantId", "UserId" },
166+
ColumnMappings = new List<ColumnMapping>
167+
{
168+
new ColumnMapping { ColumnName = "TenantId" },
169+
new ColumnMapping { ColumnName = "UserId" },
170+
new ColumnMapping { ColumnName = "Name" }
171+
}
172+
};
173+
174+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
175+
176+
Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.PrimaryKeyColumns))),
177+
"Validation should pass with composite primary key");
178+
}
179+
180+
[TestMethod]
181+
public void TestSinkSettings_AllColumnsArePrimaryKeys_FailsValidation()
182+
{
183+
var settings = new SqlServerSinkSettings
184+
{
185+
ConnectionString = "Server=.;Database=Test;",
186+
TableName = "TestTable",
187+
WriteMode = SqlWriteMode.Upsert,
188+
PrimaryKeyColumns = new List<string> { "Id", "Name" },
189+
ColumnMappings = new List<ColumnMapping>
190+
{
191+
new ColumnMapping { ColumnName = "Id" },
192+
new ColumnMapping { ColumnName = "Name" }
193+
}
194+
};
195+
196+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
197+
198+
Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))),
199+
"Validation should fail when all columns are primary keys without DeleteNotMatchedBySource");
200+
201+
Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("non-primary key column")),
202+
"Validation error should mention non-primary key column requirement");
203+
}
204+
205+
[TestMethod]
206+
public void TestSinkSettings_AllColumnsArePrimaryKeys_WithDelete_PassesValidation()
207+
{
208+
var settings = new SqlServerSinkSettings
209+
{
210+
ConnectionString = "Server=.;Database=Test;",
211+
TableName = "TestTable",
212+
WriteMode = SqlWriteMode.Upsert,
213+
PrimaryKeyColumns = new List<string> { "Id", "Name" },
214+
DeleteNotMatchedBySource = true,
215+
ColumnMappings = new List<ColumnMapping>
216+
{
217+
new ColumnMapping { ColumnName = "Id" },
218+
new ColumnMapping { ColumnName = "Name" }
219+
}
220+
};
221+
222+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
223+
224+
Assert.IsFalse(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.ColumnMappings))),
225+
"Validation should pass when all columns are primary keys but DeleteNotMatchedBySource is true");
226+
}
227+
228+
[TestMethod]
229+
public void TestSinkSettings_DeleteNotMatchedBySource_DefaultsToFalse()
230+
{
231+
var settings = new SqlServerSinkSettings
232+
{
233+
ConnectionString = "Server=.;Database=Test;",
234+
TableName = "TestTable",
235+
WriteMode = SqlWriteMode.Upsert,
236+
PrimaryKeyColumns = new List<string> { "Id" },
237+
ColumnMappings = new List<ColumnMapping>
238+
{
239+
new ColumnMapping { ColumnName = "Id" },
240+
new ColumnMapping { ColumnName = "Name" }
241+
}
242+
};
243+
244+
Assert.IsFalse(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should default to false");
245+
}
246+
247+
[TestMethod]
248+
public void TestSinkSettings_DeleteNotMatchedBySource_CanBeSetToTrue()
249+
{
250+
var settings = new SqlServerSinkSettings
251+
{
252+
ConnectionString = "Server=.;Database=Test;",
253+
TableName = "TestTable",
254+
WriteMode = SqlWriteMode.Upsert,
255+
PrimaryKeyColumns = new List<string> { "Id" },
256+
DeleteNotMatchedBySource = true,
257+
ColumnMappings = new List<ColumnMapping>
258+
{
259+
new ColumnMapping { ColumnName = "Id" },
260+
new ColumnMapping { ColumnName = "Name" }
261+
}
262+
};
263+
264+
Assert.IsTrue(settings.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be settable to true");
265+
}
266+
267+
[TestMethod]
268+
public void TestSinkSettings_DeleteNotMatchedBySource_DeserializesFromJson()
269+
{
270+
var json = """
271+
{
272+
"ConnectionString": "Server=.;Database=Test;",
273+
"TableName": "TestTable",
274+
"WriteMode": "Upsert",
275+
"PrimaryKeyColumns": ["Id"],
276+
"DeleteNotMatchedBySource": true,
277+
"ColumnMappings": [
278+
{"ColumnName": "Id"},
279+
{"ColumnName": "Name"}
280+
]
281+
}
282+
""";
283+
284+
var config = new ConfigurationBuilder()
285+
.AddJsonStream(new MemoryStream(System.Text.Encoding.UTF8.GetBytes(json)))
286+
.Build();
287+
var settings = config.Get<SqlServerSinkSettings>();
288+
289+
Assert.IsNotNull(settings, "Settings should be deserialized");
290+
Assert.IsTrue(settings!.DeleteNotMatchedBySource, "DeleteNotMatchedBySource should be true");
291+
}
292+
293+
[TestMethod]
294+
public void TestSinkSettings_DeleteNotMatchedBySource_WithInsertMode_FailsValidation()
295+
{
296+
var settings = new SqlServerSinkSettings
297+
{
298+
ConnectionString = "Server=.;Database=Test;",
299+
TableName = "TestTable",
300+
WriteMode = SqlWriteMode.Insert,
301+
DeleteNotMatchedBySource = true,
302+
ColumnMappings = new List<ColumnMapping>
303+
{
304+
new ColumnMapping { ColumnName = "Id" },
305+
new ColumnMapping { ColumnName = "Name" }
306+
}
307+
};
308+
309+
var validationResults = settings.Validate(new ValidationContext(settings)).ToList();
310+
311+
Assert.IsTrue(validationResults.Any(v => v.MemberNames.Contains(nameof(SqlServerSinkSettings.DeleteNotMatchedBySource))),
312+
"Validation should fail when DeleteNotMatchedBySource is true with Insert mode");
313+
314+
Assert.IsTrue(validationResults.Any(v => v.ErrorMessage!.Contains("can only be used when WriteMode is Upsert")),
315+
"Validation error should mention Upsert mode requirement");
316+
}
317+
}

0 commit comments

Comments
 (0)