diff --git a/bigfiles/README.md b/bigfiles/README.md
index e3bdd5a..fed554d 100644
--- a/bigfiles/README.md
+++ b/bigfiles/README.md
@@ -1,22 +1,67 @@
-# Scala CPS-Dataset-Comparison
+# Scala CPS-Dataset-Comparison

This is the Scala implementation of the project. It is used for comparing big files (files that cannot fit into RAM).

+- [Project Structure](#project-structure)
- [How to run](#how-to-run)
-  - [Requirements](#requirements)
-  - [Switch to specific SDK](#switch-to-specific-sdk)
+  - [Requirements](#requirements)
+  - [Switch to specific SDK](#switch-to-specific-sdk)
- [How to run tests](#how-to-run-tests)
+
+## Project Structure
+
+The project is split into two SBT submodules:
+
+| Module | Purpose |
+|--------|---------|
+| `api` | Pure comparison logic and data models: `Comparator`, `DatasetComparisonHelper`, `HashUtils`, all `analysis/*` classes. No CLI or I/O dependencies. |
+| `app` | CLI entry point, I/O, serialization: `DatasetComparison`, `MetricsSerializer`, `IOHandler`, `ArgsParser`, `Arguments`, `DiffComputeType`, `OutputFormatType`. Depends on `api`. |
+
+```
+bigfiles/
+├── api/
+│   ├── src/main/scala/za/co/absa/
+│   │   ├── analysis/            # AnalyseStat, AnalysisResult, ColumnsDiff, RowsDiff,
+│   │   │                        # ComparisonMetrics, ComparisonMetricsCalculator, RowByRowAnalysis
+│   │   ├── hash/HashUtils.scala
+│   │   ├── Comparator.scala
+│   │   └── DatasetComparisonHelper.scala
+│   └── src/test/scala/          # ComparatorTest, ComparisonMetricsCalculatorTest,
+│                                # DatasetComparisonHelperTest, HashTableTest,
+│                                # RowByRowAnalysesTest, AnalysisResultTest, SparkTestSession
+├── app/
+│   ├── src/main/scala/za/co/absa/
+│   │   ├── io/IOHandler.scala
+│   │   ├── parser/              # ArgsParser, Arguments, DiffComputeType, OutputFormatType
+│   │   ├── DatasetComparison.scala
+│   │   └── MetricsSerializer.scala
+│   ├── src/main/resources/application.conf
+│   └── src/test/scala/          # DatasetComparisonTest, ArgsParserTest, IOHandlerTest,
+│                                # MetricsSerializerTest, VersionTest, SparkTestSession
+├── testdata/                    # Shared test resources (symlinked into api & app test resources)
+│   ├── namesA.parquet
+│   ├── namesB.parquet
+│   ├── inputA.txt
+│   ├── inputB.txt
+│   └── out.txt
+└── project/
+    └── Dependencies.scala       # apiDependencies and appDependencies groups
+```
+
## How to run

-First run assembly: `sbt assembly`
+First run assembly from the `app` module:
+
+```bash
+sbt app/assembly
+```

Then run:

```bash
-spark-submit target/scala-2.12/dataset-comparison-assembly-1.0.jar -o --inputA --inputB
-
+spark-submit app/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o --inputA --inputB
```
+
### Parameters:
| Parameter | Description | Required |
|-----------|-------------|----------|
@@ -29,23 +74,27 @@ spark-submit target/scala-2.12/dataset-comparison-assembly-1.0.jar -o --inputA --inputB -d Row
+spark-submit --class za.co.absa.DatasetComparison \
+  --conf "spark.driver.extraJavaOptions=-Dconfig.file=/path/to/application.conf" \
+  app/target/scala-2.12/dataset-comparison-assembly-1.0.jar \
+  -o --inputA --inputB -d Row
```
+
+`-d Row` is an optional parameter for detailed analyses that specifies
which analyses to use. Currently, only `Row` is supported.
-It will compute detailed analyses if number of different columns is less than 200, you can change this threshold in `src/main/resources/application.conf`.
+It will compute detailed analyses if the number of different columns is less than 200; you can change this threshold in `app/src/main/resources/application.conf`.
+
### Spark configuration

You can set the Spark configuration in the `spark-defaults.conf` file stored in the `$SPARK_HOME/conf` directory. You will find a `spark-defaults.conf.template` there; remove `.template` from the file name and set your configuration in it.

@@ -54,7 +103,6 @@ It could look like this:
```bash
spark.hadoop.fs.default.name hdfs://localhost:9999/ # set your hdfs uri
spark.hadoop.fs.defaultFS hdfs://localhost:9999/ # set your hdfs uri
-
```

### Requirements
@@ -83,17 +131,23 @@ sdk env install

## How to run tests

+Tests are split between the two modules. Use the following commands:
+
-| sbt command | test type | info |
-| ----------- |-----------|----------------------------------------|
-| `sbt test` | ... | It will run tests in test/scala folder |
+| sbt command | Module | Test files |
+|------------------|--------|------------|
+| `sbt api/test` | api | ComparatorTest, ComparisonMetricsCalculatorTest, DatasetComparisonHelperTest, HashTableTest, RowByRowAnalysesTest, AnalysisResultTest |
+| `sbt app/test` | app | DatasetComparisonTest, ArgsParserTest, IOHandlerTest, MetricsSerializerTest, VersionTest |
+| `sbt test` | both | Runs all tests across both modules (root aggregate) |
+
+`SparkTestSession` is duplicated into both `api/src/test/scala/` and `app/src/test/scala/` as it is required by tests in both modules.
+
+Test resources (`namesA.parquet`, `namesB.parquet`, `inputA.txt`, `inputB.txt`, `out.txt`) live in `testdata/` at the root and are symlinked into `api/src/test/resources/` and `app/src/test/resources/`.
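For orientation, the core comparison strategy in the `api` module — `Comparator.compare` hashes every row via `HashUtils` and then takes the multiset difference of the hash columns in both directions with Spark's `exceptAll` — can be sketched in plain Scala. This is a simplified in-memory analogue for illustration only: `compareRows` and `multisetExcept` are hypothetical names invented for this sketch, and the real implementation operates on Spark DataFrames, not collections.

```scala
import scala.util.hashing.MurmurHash3
import scala.collection.mutable

// Multiset difference: each occurrence in `b` cancels one occurrence in `a`.
// This is the in-memory analogue of DataFrame.exceptAll.
def multisetExcept(a: Seq[Int], b: Seq[Int]): Seq[Int] = {
  val remaining = mutable.Map.empty[Int, Int].withDefaultValue(0)
  b.foreach(h => remaining(h) += 1)
  a.filter { h =>
    if (remaining(h) > 0) { remaining(h) -= 1; false } else true
  }
}

// Hash every row, diff the hashes in both directions, then map the surviving
// hashes back to rows (the real code joins the hash column back to full rows).
def compareRows(
    dataA: Seq[Seq[String]],
    dataB: Seq[Seq[String]]
): (Seq[Seq[String]], Seq[Seq[String]]) = {
  def hash(row: Seq[String]): Int = MurmurHash3.seqHash(row)
  def diffSide(left: Seq[Seq[String]], right: Seq[Seq[String]]): Seq[Seq[String]] = {
    val rowByHash = left.map(r => hash(r) -> r).toMap // one representative row per hash
    multisetExcept(left.map(hash), right.map(hash)).map(rowByHash)
  }
  (diffSide(dataA, dataB), diffSide(dataB, dataA))
}

// Mirrors the first case in ComparatorTest:
val a = Seq(Seq("1", "one"), Seq("2", "two"))
val b = Seq(Seq("1", "one"), Seq("2", "three"), Seq("3", "two"))
val (diffA, diffB) = compareRows(a, b)
// diffA contains 1 row (Seq("2", "two")); diffB contains 2 rows
```

Because the difference is taken per occurrence, duplicates behave as in `ComparatorTest`: three copies of a row on one side against one copy on the other yield two diff rows.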
---------

## Installing hadoop

-tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on-ubuntu-18-393h)
+tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on-ubuntu-18-393h)

1. sdk install hadoop
2. ``$ echo "export PATH=\$PATH:\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin" >> ~/.bashrc``
3. configure the files core-site.xml, hdfs-site.xml, mapred-site.xml, yarn-site.xml in /Users//.sdkman/candidates/hadoop/3.3.5/etc
@@ -129,22 +183,22 @@ tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on
   false
   ```
-   Add this into **core-site.xml** between the `<configuration>` tags
+   Add this into **core-site.xml** between the `<configuration>` tags
   ```xml
   <property>
     <name>fs.defaultFS</name>
     <value>hdfs://localhost:9999</value>
   </property>
   ```
-
-   Add this into **mapred-site.xml** between the `<configuration>` tags
+
+   Add this into **mapred-site.xml** between the `<configuration>` tags
   ```xml
   <property>
     <name>mapreduce.framework.name</name>
     <value>yarn</value>
   </property>
   ```
-   Add this into **yarn-site.xml** between the `<configuration>` tags
+   Add this into **yarn-site.xml** between the `<configuration>` tags
   ```xml
   <property>
     <name>yarn.nodemanager.aux-services</name>
     <value>mapreduce_shuffle</value>
   </property>
@@ -156,9 +210,9 @@ tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on
   ```
   Add this into **hadoop-env.sh**
-   ```export JAVA_HOME="/.../.sdkman/candidates/java/8.0.422-amzn"```
+   ```export JAVA_HOME="/.../.sdkman/candidates/java/11.0.x-amzn"```
-4. create directories by configuration for example:
+4. create the directories referenced by your configuration, for example:
   ```
   sudo mkdir -p /opt/hadoop_tmp/hdfs/datanode
   sudo mkdir -p /opt/hadoop_tmp/hdfs/namenode
@@ -168,28 +222,28 @@ tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on
6. copy keys to authorized_keys `cat ~/.ssh/id_ed25519.pub >> ~/.ssh/authorized_keys`
7. copy keys into ssh localhost `ssh-copy-id username@localhost`
8. test ssh `ssh @localhost`
-8. format namenode `hdfs namenode -format -force` (or `hadoop namenode -format`)
-9. start hadoop `start-dfs.sh && start-yarn.sh`
-10. add files to hdfs `hdfs dfs -put /path/to/file /path/to/hdfs`
-11. stop hdfs `stop-dfs.sh && stop-yarn.sh`
+9. 
format namenode `hdfs namenode -format -force` (or `hadoop namenode -format`)
+10. start hadoop `start-dfs.sh && start-yarn.sh`
+11. add files to hdfs `hdfs dfs -put /path/to/file /path/to/hdfs`
+12. stop hdfs `stop-dfs.sh && stop-yarn.sh`

If something goes wrong, check the logs in /Users//.sdkman/candidates/hadoop/3.3.5/logs

The ResourceManager web UI runs on http://localhost:8088/cluster
HDFS runs on port 9999
-NameNode web interface http://localhost:9870/
+NameNode web interface http://localhost:9870/

You also have to enable remote login:
![img.png](images/remote_login.png)

Running with hadoop:
```bash
-sbt assembly
-spark-submit target/scala-2.12/dataset-comparison-assembly-0.1.0.jar -o --inputA --inputB --fsURI http://localhost:9999/
+sbt app/assembly
+spark-submit app/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o --inputA --inputB --fsURI http://localhost:9999/
```

-` spark-submit target/scala-2.12/dataset-comparison-assembly-0.1.0.jar -o /test_files/output --inputA /test_files/RUN20_edit.parquet --inputB /test_files/RUN20.parquet --fsURI hdfs://localhost:9999/`
+`spark-submit app/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o /test_files/output --inputA /test_files/RUN20_edit.parquet --inputB /test_files/RUN20.parquet --fsURI hdfs://localhost:9999/`

Alternatively, you can uncomment the code at the start of the validate part in ArgsParserTest and change
-FS to HDFS_URI and then run `sbt test`
+FS to HDFS_URI and then run `sbt app/test`

### Setting up IntelliJ IDEA

diff --git a/bigfiles/api/src/main/scala/za/co/absa/Comparator.scala b/bigfiles/api/src/main/scala/za/co/absa/Comparator.scala
new file mode 100644
index 0000000..61431bd
--- /dev/null
+++ b/bigfiles/api/src/main/scala/za/co/absa/Comparator.scala
@@ -0,0 +1,71 @@
+/** Copyright 2020 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package za.co.absa + +import za.co.absa.hash.HashUtils.HASH_COLUMN_NAME +import hash.HashUtils +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.slf4j.{Logger, LoggerFactory} + +/** Comparator object that compares two parquet files and creates metrics + */ +object Comparator { + private val logger: Logger = LoggerFactory.getLogger(Comparator.getClass) + + /** Compare two DataFrames and return diff rows + * + * @param dataA + * A DataFrame whole data + * @param dataB + * B DataFrame whole data + * @param excludedColumns + * columns to exclude from comparison + * @param spark + * SparkSession + * @return + * Tuple of (diffA, diffB) DataFrames containing rows unique to each dataset + */ + def compare( + dataA: DataFrame, + dataB: DataFrame, + excludedColumns: Seq[String] = Seq.empty + )(implicit spark: SparkSession): (DataFrame, DataFrame) = { + // Apply column exclusion + val filteredDataA = DatasetComparisonHelper.exclude(dataA, excludedColumns, "A") + val filteredDataB = DatasetComparisonHelper.exclude(dataB, excludedColumns, "B") + + val dfWithHashA: DataFrame = HashUtils.createHashColumn(filteredDataA) + logger.info("Hash for A created") + + val dfWithHashB: DataFrame = HashUtils.createHashColumn(filteredDataB) + logger.info("Hash for B created") + + // select non matching hashs + logger.info("Getting diff hashes, A except B") + val diffHashA: DataFrame = dfWithHashA.select(HASH_COLUMN_NAME).exceptAll(dfWithHashB.select(HASH_COLUMN_NAME)) + logger.info("Getting diff hashes, B except A") + val diffHashB: DataFrame = 
dfWithHashB.select(HASH_COLUMN_NAME).exceptAll(dfWithHashA.select(HASH_COLUMN_NAME)) + + // join on hash column (get back whole rows) + logger.info("Getting diff rows for A") + val distinctDiffA: DataFrame = diffHashA.join(dfWithHashA, Seq(HASH_COLUMN_NAME)).distinct() + val diffA: DataFrame = diffHashA.join(distinctDiffA, Seq(HASH_COLUMN_NAME)) + + logger.info("Getting diff rows for B") + val distinctDiffB: DataFrame = diffHashB.join(dfWithHashB, Seq(HASH_COLUMN_NAME)).distinct() + val diffB: DataFrame = diffHashB.join(distinctDiffB, Seq(HASH_COLUMN_NAME)) + + (diffA, diffB) + } + +} diff --git a/bigfiles/src/main/scala/za/co/absa/DatasetComparisonHelper.scala b/bigfiles/api/src/main/scala/za/co/absa/DatasetComparisonHelper.scala similarity index 94% rename from bigfiles/src/main/scala/za/co/absa/DatasetComparisonHelper.scala rename to bigfiles/api/src/main/scala/za/co/absa/DatasetComparisonHelper.scala index 519c012..a1077e0 100644 --- a/bigfiles/src/main/scala/za/co/absa/DatasetComparisonHelper.scala +++ b/bigfiles/api/src/main/scala/za/co/absa/DatasetComparisonHelper.scala @@ -28,12 +28,10 @@ object DatasetComparisonHelper { * columns to exclude * @param dfName * name of the DataFrame - * @param spark - * SparkSession * @return * DataFrame with excluded columns */ - def exclude(df: DataFrame, toExclude: Seq[String], dfName: String)(implicit spark: SparkSession): DataFrame = { + def exclude(df: DataFrame, toExclude: Seq[String], dfName: String): DataFrame = { if (toExclude.isEmpty) df else { logger.info(s"Excluding columns from the $dfName DataFrame") diff --git a/bigfiles/src/main/scala/za/co/absa/analysis/AnalyseStat.scala b/bigfiles/api/src/main/scala/za/co/absa/analysis/AnalyseStat.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/analysis/AnalyseStat.scala rename to bigfiles/api/src/main/scala/za/co/absa/analysis/AnalyseStat.scala diff --git a/bigfiles/api/src/main/scala/za/co/absa/analysis/AnalysisResult.scala 
b/bigfiles/api/src/main/scala/za/co/absa/analysis/AnalysisResult.scala new file mode 100644 index 0000000..92267a3 --- /dev/null +++ b/bigfiles/api/src/main/scala/za/co/absa/analysis/AnalysisResult.scala @@ -0,0 +1,53 @@ +/** Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package za.co.absa.analysis + +/** Result of row-by-row analysis with explicit outcome types. + */ +sealed trait AnalysisResult + +object AnalysisResult { + + /** Analysis completed successfully with row-by-row differences. + * + * @param diffAToB + * Differences from dataset A to B + * @param diffBToA + * Differences from dataset B to A + */ + case class Success(diffAToB: Seq[RowsDiff], diffBToA: Seq[RowsDiff]) extends AnalysisResult + + /** Datasets are identical - no differences to analyze. + */ + case object DatasetsIdentical extends AnalysisResult + + /** One dataset has differences but the other doesn't - cannot perform row-by-row matching. + * + * @param diffCountA + * Number of differences in dataset A + * @param diffCountB + * Number of differences in dataset B + */ + case class OneSidedDifference(diffCountA: Long, diffCountB: Long) extends AnalysisResult + + /** Number of differences exceeds the threshold. 
+ * + * @param diffCountA + * Number of differences in dataset A + * @param diffCountB + * Number of differences in dataset B + * @param threshold + * The threshold that was exceeded + */ + case class ThresholdExceeded(diffCountA: Long, diffCountB: Long, threshold: Int) extends AnalysisResult +} diff --git a/bigfiles/src/main/scala/za/co/absa/analysis/ColumnsDiff.scala b/bigfiles/api/src/main/scala/za/co/absa/analysis/ColumnsDiff.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/analysis/ColumnsDiff.scala rename to bigfiles/api/src/main/scala/za/co/absa/analysis/ColumnsDiff.scala diff --git a/bigfiles/api/src/main/scala/za/co/absa/analysis/ComparisonMetrics.scala b/bigfiles/api/src/main/scala/za/co/absa/analysis/ComparisonMetrics.scala new file mode 100644 index 0000000..5ab617b --- /dev/null +++ b/bigfiles/api/src/main/scala/za/co/absa/analysis/ComparisonMetrics.scala @@ -0,0 +1,52 @@ +/** Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package za.co.absa.analysis + +/** Typed metrics representing the comparison statistics between two datasets. 
+ * + * @param rowCountA + * Total number of rows in dataset A + * @param rowCountB + * Total number of rows in dataset B + * @param columnCountA + * Number of columns in dataset A + * @param columnCountB + * Number of columns in dataset B + * @param diffCountA + * Number of rows present in A but not in B + * @param diffCountB + * Number of rows present in B but not in A + * @param uniqueRowCountA + * Number of unique rows in dataset A + * @param uniqueRowCountB + * Number of unique rows in dataset B + * @param sameRecordsCount + * Number of records that are the same in both datasets + * @param sameRecordsPercentToA + * Percentage of same records relative to dataset A + * @param excludedColumns + * Columns that were excluded from the comparison + */ +case class ComparisonMetrics( + rowCountA: Long, + rowCountB: Long, + columnCountA: Int, + columnCountB: Int, + diffCountA: Long, + diffCountB: Long, + uniqueRowCountA: Long, + uniqueRowCountB: Long, + sameRecordsCount: Long, + sameRecordsPercentToA: Double, + excludedColumns: Seq[String] +) diff --git a/bigfiles/api/src/main/scala/za/co/absa/analysis/ComparisonMetricsCalculator.scala b/bigfiles/api/src/main/scala/za/co/absa/analysis/ComparisonMetricsCalculator.scala new file mode 100644 index 0000000..c02248f --- /dev/null +++ b/bigfiles/api/src/main/scala/za/co/absa/analysis/ComparisonMetricsCalculator.scala @@ -0,0 +1,78 @@ +/** Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package za.co.absa.analysis + +import org.apache.spark.sql.DataFrame +import org.slf4j.{Logger, LoggerFactory} +import za.co.absa.DatasetComparisonHelper + +/** Default implementation of ComparisonMetricsCalculator. + * + * Always returns metrics, including for identical datasets. + */ +object ComparisonMetricsCalculator { + + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + def calculate( + originalA: DataFrame, + originalB: DataFrame, + diffA: DataFrame, + diffB: DataFrame, + excludedColumns: Seq[String] + ): Option[ComparisonMetrics] = { + + logger.info("Computing comparison metrics") + + val dataA = DatasetComparisonHelper.exclude(originalA, excludedColumns, "A") + val dataB = DatasetComparisonHelper.exclude(originalB, excludedColumns, "B") + + val diffCountA = diffA.count() + val diffCountB = diffB.count() + + logger.info(s"Diff counts - A: $diffCountA, B: $diffCountB") + + val rowCountA = dataA.count() + val rowCountB = dataB.count() + val uniqRowCountA = dataA.distinct().count() + val uniqRowCountB = dataB.distinct().count() + + val sameRecords = if (rowCountA - diffCountA == rowCountB - diffCountB) { + rowCountA - diffCountA + } else { + -1 + } + + val sameRecordsPercent = if (rowCountA > 0) { + (math.floor(sameRecords.toFloat / rowCountA * 10000)) / 100 + } else { + 0.0 + } + + Some( + ComparisonMetrics( + rowCountA = rowCountA, + rowCountB = rowCountB, + columnCountA = dataA.columns.length, + columnCountB = dataB.columns.length, + diffCountA = diffCountA, + diffCountB = diffCountB, + uniqueRowCountA = uniqRowCountA, + uniqueRowCountB = uniqRowCountB, + sameRecordsCount = sameRecords, + sameRecordsPercentToA = sameRecordsPercent, + excludedColumns = excludedColumns + ) + ) + } +} diff --git a/bigfiles/src/main/scala/za/co/absa/analysis/RowByRowAnalysis.scala b/bigfiles/api/src/main/scala/za/co/absa/analysis/RowByRowAnalysis.scala similarity index 78% rename from 
bigfiles/src/main/scala/za/co/absa/analysis/RowByRowAnalysis.scala rename to bigfiles/api/src/main/scala/za/co/absa/analysis/RowByRowAnalysis.scala index 4527a0f..937bf18 100644 --- a/bigfiles/src/main/scala/za/co/absa/analysis/RowByRowAnalysis.scala +++ b/bigfiles/api/src/main/scala/za/co/absa/analysis/RowByRowAnalysis.scala @@ -23,19 +23,47 @@ object RowByRowAnalysis { private val logger: Logger = LoggerFactory.getLogger(this.getClass) + /** Analyze comparison result and generate row-by-row differences if within threshold. + * + * @param diffA + * DataFrame containing rows unique to dataset A + * @param diffB + * DataFrame containing rows unique to dataset B + * @param threshold + * Maximum number of differences to analyze per dataset + * @return + * AnalysisResult indicating the outcome: Success with diffs, DatasetsIdentical, OneSidedDifference, or + * ThresholdExceeded + */ + def analyze(diffA: DataFrame, diffB: DataFrame, threshold: Int): AnalysisResult = { + val diffCountA = diffA.count() + val diffCountB = diffB.count() + + if (diffCountA == 0 && diffCountB == 0) { + AnalysisResult.DatasetsIdentical + } else if (diffCountA == 0 || diffCountB == 0) { + AnalysisResult.OneSidedDifference(diffCountA, diffCountB) + } else if (diffCountA > threshold || diffCountB > threshold) { + AnalysisResult.ThresholdExceeded(diffCountA, diffCountB, threshold) + } else { + val diffAResult = generateDiffJson(diffA, diffB, "A") + val diffBResult = generateDiffJson(diffB, diffA, "B") + AnalysisResult.Success(diffAResult, diffBResult) + } + } + /** Recursively finds the best matching row in diffRight for the given rowLeft based on the minimum difference score. * * @param rowLeft - * The row from DataFrame Left to compare. + * The row from the left DataFrame to compare * @param diffRight - * The DataFrame Right to compare against. 
+ * The right DataFrame to compare against * @param bestScore - * score of difference between two rows, starts on row length +1 + * The best score found so far (lower is better) * @param mask - * it is a sequence of 0 and 1, where 0 means no difference and 1 means difference. Then you can pick just - * different rows on behalf of this mask. + * The mask indicating which columns differ in the best match * @param bestRowRight - * represents the best row from Right DataFrame that is the closest to the row from Left DataFrame. + * The best matching row found so far * @return * best match statistics. This contains the best score, the best row from DataFrame Right and mask which has 0 and * 1, 1 means the column is different. Mask represents the comparison between two rows where 0 means they are the diff --git a/bigfiles/src/main/scala/za/co/absa/analysis/RowsDiff.scala b/bigfiles/api/src/main/scala/za/co/absa/analysis/RowsDiff.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/analysis/RowsDiff.scala rename to bigfiles/api/src/main/scala/za/co/absa/analysis/RowsDiff.scala diff --git a/bigfiles/src/main/scala/za/co/absa/hash/HashUtils.scala b/bigfiles/api/src/main/scala/za/co/absa/hash/HashUtils.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/hash/HashUtils.scala rename to bigfiles/api/src/main/scala/za/co/absa/hash/HashUtils.scala diff --git a/bigfiles/api/src/test/scala/ComparatorTest.scala b/bigfiles/api/src/test/scala/ComparatorTest.scala new file mode 100644 index 0000000..277c6eb --- /dev/null +++ b/bigfiles/api/src/test/scala/ComparatorTest.scala @@ -0,0 +1,117 @@ +/** + * Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +import za.co.absa.Comparator +import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} +import org.scalatest.funsuite.AnyFunSuite + +class ComparatorTest extends AnyFunSuite { + implicit val spark: SparkSession = SparkTestSession.spark + + import spark.implicits._ + + test("test that comparator returns diff rows"){ + val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "three"), (3, "two")).toDF("id", "value") + + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + assert(diffA.count() == 1) + assert(diffB.count() == 2) + } + + test("test that comparator returns all rows if dataframes are completely different"){ + val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq(("12af", 1003), ("12qw", 3004), ("123q", 3456)).toDF("id", "value") + + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + assert(diffA.count() == 2) + assert(diffB.count() == 3) + } + + test("test that comparator returns empty DataFrames if all rows are the same"){ + val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + assert(diffA.count() == 0) + assert(diffB.count() == 0) + } + + test("test that comparator returns empty DataFrames if all rows are the same but in different order"){ + val tmp1: DataFrame = Seq((2, "two"), (1, "one")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", 
"value") + + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + assert(diffA.count() == 0) + assert(diffB.count() == 0) + } + + test("test that comparator returns correct dataframes if one duplicate is present in one table"){ + val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + + assert(diffA.count() == 1) + assert(diffB.count() == 0) + } + + test("test that comparator returns correct dataframes if 2 duplicates are present in one table"){ + val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + + assert(diffA.count() == 2) + assert(diffB.count() == 0) + } + + + test("test that comparator returns correct dataframes if duplicates are present"){ + val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (1, "one"), (1, "one"), (2, "two")).toDF("id", "value") + val (diffA, diffB) = Comparator.compare(tmp1, tmp2) + + assert(diffA.count() == 0) + assert(diffB.count() == 1) + } + + test("test that comparator excludes columns correctly") { + val tmp1: DataFrame = Seq((1, "one", 100), (2, "two", 200)).toDF("id", "value", "extra") + val tmp2: DataFrame = Seq((1, "one", 999), (2, "two", 888)).toDF("id", "value", "extra") + + // Without exclusion, datasets differ + val (diffA1, diffB1) = Comparator.compare(tmp1, tmp2) + assert(diffA1.count() == 2) + assert(diffB1.count() == 2) + + // With exclusion of "extra" column, datasets are identical + val (diffA2, diffB2) = Comparator.compare(tmp1, tmp2, Seq("extra")) + assert(diffA2.count() == 0) + assert(diffB2.count() == 0) + } + + test("test that comparator excludes multiple columns correctly") { + val tmp1: DataFrame = Seq((1, "one", 100, 
"x"), (2, "two", 200, "y")).toDF("id", "value", "extra1", "extra2") + val tmp2: DataFrame = Seq((1, "one", 999, "z"), (2, "two", 888, "w")).toDF("id", "value", "extra1", "extra2") + + // Excluding both extra columns makes datasets identical + val (diffA, diffB) = Comparator.compare(tmp1, tmp2, Seq("extra1", "extra2")) + assert(diffA.count() == 0) + assert(diffB.count() == 0) + } + +} diff --git a/bigfiles/api/src/test/scala/ComparisonMetricsCalculatorTest.scala b/bigfiles/api/src/test/scala/ComparisonMetricsCalculatorTest.scala new file mode 100644 index 0000000..2bc14f7 --- /dev/null +++ b/bigfiles/api/src/test/scala/ComparisonMetricsCalculatorTest.scala @@ -0,0 +1,158 @@ +/** + * Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.scalatest.funsuite.AnyFunSuite +import za.co.absa.analysis.{ComparisonMetrics, ComparisonMetricsCalculator} + +class ComparisonMetricsCalculatorTest extends AnyFunSuite { + implicit val spark: SparkSession = SparkTestSession.spark + + import spark.implicits._ + + test("test that calculator returns metrics when datasets are identical") { + val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + + val diffA = Seq.empty[(Int, String)].toDF("id", "value") + val diffB = Seq.empty[(Int, String)].toDF("id", "value") + val metricsOpt = ComparisonMetricsCalculator.calculate(tmp1, tmp2, diffA, diffB, Seq()) + + assert(metricsOpt.isDefined) + val metrics = metricsOpt.get + + assert(metrics.rowCountA == 2) + assert(metrics.rowCountB == 2) + assert(metrics.columnCountA == 2) + assert(metrics.columnCountB == 2) + assert(metrics.diffCountA == 0) + assert(metrics.diffCountB == 0) + assert(metrics.uniqueRowCountA == 2) + assert(metrics.uniqueRowCountB == 2) + assert(metrics.sameRecordsCount == 2) + assert(metrics.sameRecordsPercentToA == 100.0) + assert(metrics.excludedColumns.isEmpty) + } + + test("test that calculator returns correct metrics when datasets differ") { + val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (2, "three"), (3, "two")).toDF("id", "value") + + val diffA = Seq((2, "two")).toDF("id", "value") + val diffB = Seq((2, "three"), (3, "two")).toDF("id", "value") + val metricsOpt = ComparisonMetricsCalculator.calculate(tmp1, tmp2, diffA, diffB, Seq()) + + assert(metricsOpt.isDefined) + val metrics = metricsOpt.get + + assert(metrics.rowCountA == 2) + assert(metrics.rowCountB == 3) + assert(metrics.columnCountA == 2) + assert(metrics.columnCountB == 2) + assert(metrics.diffCountA == 1) + assert(metrics.diffCountB == 2) + 
assert(metrics.uniqueRowCountA == 2) + assert(metrics.uniqueRowCountB == 3) + assert(metrics.sameRecordsCount == 1) + assert(metrics.sameRecordsPercentToA == 50.0) + assert(metrics.excludedColumns.isEmpty) + } + + test("test that calculator returns correct metrics with duplicates in data") { + val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq((1, "one"), (1, "one"), (1, "one"), (2, "two")).toDF("id", "value") + + val diffA = Seq.empty[(Int, String)].toDF("id", "value") + val diffB = Seq((1, "one")).toDF("id", "value") + val metricsOpt = ComparisonMetricsCalculator.calculate(tmp1, tmp2, diffA, diffB, Seq()) + + assert(metricsOpt.isDefined) + val metrics = metricsOpt.get + + assert(metrics.rowCountA == 3) + assert(metrics.rowCountB == 4) + assert(metrics.columnCountA == 2) + assert(metrics.columnCountB == 2) + assert(metrics.diffCountA == 0) + assert(metrics.diffCountB == 1) + assert(metrics.uniqueRowCountA == 2) + assert(metrics.uniqueRowCountB == 2) + assert(metrics.sameRecordsCount == 3) + assert(metrics.sameRecordsPercentToA == 100.0) + assert(metrics.excludedColumns.isEmpty) + } + + test("test that calculator returns correct metrics with excluded columns") { + val tmp1: DataFrame = Seq(("one"), ("two")).toDF("value") + val tmp2: DataFrame = Seq(("one"), ("three"), ("two")).toDF("value") + + val diffA = Seq.empty[String].toDF("value") + val diffB = Seq("three").toDF("value") + val metricsOpt = ComparisonMetricsCalculator.calculate(tmp1, tmp2, diffA, diffB, Seq("id")) + + assert(metricsOpt.isDefined) + val metrics = metricsOpt.get + + assert(metrics.rowCountA == 2) + assert(metrics.rowCountB == 3) + assert(metrics.columnCountA == 1) + assert(metrics.columnCountB == 1) + assert(metrics.diffCountA == 0) + assert(metrics.diffCountB == 1) + assert(metrics.uniqueRowCountA == 2) + assert(metrics.uniqueRowCountB == 3) + assert(metrics.sameRecordsCount == 2) + assert(metrics.sameRecordsPercentToA == 100.0) + 
assert(metrics.excludedColumns == Seq("id")) + } + + test("test that calculator returns metrics when datasets are completely different") { + val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value") + val tmp2: DataFrame = Seq(("12af", 1003), ("12qw", 3004), ("123q", 3456)).toDF("id", "value") + + val diffA = Seq((1, "one"), (2, "two")).toDF("id", "value") + val diffB = Seq(("12af", 1003), ("12qw", 3004), ("123q", 3456)).toDF("id", "value") + val metricsOpt = ComparisonMetricsCalculator.calculate(tmp1, tmp2, diffA, diffB, Seq()) + + assert(metricsOpt.isDefined) + val metrics = metricsOpt.get + + assert(metrics.rowCountA == 2) + assert(metrics.rowCountB == 3) + assert(metrics.diffCountA == 2) + assert(metrics.diffCountB == 3) + assert(metrics.sameRecordsCount == 0) + assert(metrics.sameRecordsPercentToA == 0.0) + } + + test("test that excluded columns are reflected correctly in result") { + val tmp1: DataFrame = Seq((1, "one", 100), (2, "two", 200)).toDF("id", "value", "extra") + val tmp2: DataFrame = Seq((1, "one", 999), (2, "two", 888)).toDF("id", "value", "extra") + + val diffA = Seq.empty[(Int, String, Int)].toDF("id", "value", "extra") + val diffB = Seq.empty[(Int, String, Int)].toDF("id", "value", "extra") + val metricsOpt = ComparisonMetricsCalculator.calculate(tmp1, tmp2, diffA, diffB, Seq("extra")) + + assert(metricsOpt.isDefined) // Should have metrics even when datasets are identical after exclusion + val metrics = metricsOpt.get + + assert(metrics.diffCountA == 0) + assert(metrics.diffCountB == 0) + assert(metrics.sameRecordsCount == 2) + assert(metrics.sameRecordsPercentToA == 100.0) + assert(metrics.excludedColumns == Seq("extra")) + } +} diff --git a/bigfiles/src/test/scala/DatasetComparisonHelperTest.scala b/bigfiles/api/src/test/scala/DatasetComparisonHelperTest.scala similarity index 100% rename from bigfiles/src/test/scala/DatasetComparisonHelperTest.scala rename to bigfiles/api/src/test/scala/DatasetComparisonHelperTest.scala diff 
--git a/bigfiles/src/test/scala/HashTableTest.scala b/bigfiles/api/src/test/scala/HashTableTest.scala similarity index 100% rename from bigfiles/src/test/scala/HashTableTest.scala rename to bigfiles/api/src/test/scala/HashTableTest.scala diff --git a/bigfiles/src/test/scala/RowByRowAnalysesTest.scala b/bigfiles/api/src/test/scala/RowByRowAnalysesTest.scala similarity index 53% rename from bigfiles/src/test/scala/RowByRowAnalysesTest.scala rename to bigfiles/api/src/test/scala/RowByRowAnalysesTest.scala index 2a1ac11..ed3632e 100644 --- a/bigfiles/src/test/scala/RowByRowAnalysesTest.scala +++ b/bigfiles/api/src/test/scala/RowByRowAnalysesTest.scala @@ -14,10 +14,10 @@ * limitations under the License. **/ -import za.co.absa.analysis.{ColumnsDiff, RowsDiff} +import za.co.absa.analysis.{AnalysisResult, ColumnsDiff, RowByRowAnalysis, RowsDiff} import za.co.absa.analysis.RowByRowAnalysis.generateDiffJson import za.co.absa.hash.HashUtils.HASH_COLUMN_NAME -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{DataFrame, SparkSession} import org.scalatest.funsuite.AnyFunSuite import upickle.default._ @@ -27,7 +27,6 @@ class RowByRowAnalysesTest extends AnyFunSuite{ import spark.implicits._ implicit val ColumnsDiffRw: ReadWriter[ColumnsDiff] = macroRW implicit val RowDiffRw: ReadWriter[RowsDiff] = macroRW -// spark.sparkContext.setLogLevel("DEBUG") test("test analyses multiple changes") { val dataA = Seq( @@ -193,6 +192,187 @@ class RowByRowAnalysesTest extends AnyFunSuite{ -} + // ============================================================================ + // Tests for analyze() method (AnalysisResult) + // ============================================================================ + + test("analyze returns DatasetsIdentical when datasets are identical") { + val diffA: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + + val result 
= RowByRowAnalysis.analyze(diffA, diffB, threshold = 100) + + result match { + case AnalysisResult.DatasetsIdentical => + succeed + case other => + fail(s"Expected DatasetsIdentical but got $other") + } + } + + test("analyze returns Success when differences are within threshold") { + val diffA: DataFrame = Seq((2, "two", 12345)).toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq((2, "three", 67890)).toDF("id", "value", HASH_COLUMN_NAME) + + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold = 10) + + result match { + case AnalysisResult.Success(diffAToB, diffBToA) => + assert(diffAToB.nonEmpty, "Should have differences from A to B") + assert(diffBToA.nonEmpty, "Should have differences from B to A") + assert(diffAToB.head.inputLeftHash.nonEmpty, "Should have hash for left row") + assert(diffAToB.head.inputRightHash.nonEmpty, "Should have hash for right row") + case other => + fail(s"Expected Success but got $other") + } + } + + test("analyze returns ThresholdExceeded when differences exceed threshold") { + val diffA: DataFrame = Seq( + (1, "one", 11111), + (2, "two", 22222), + (3, "three", 33333), + (4, "four", 44444), + (5, "five", 55555) + ).toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq( + (1, "ONE", 11112), + (2, "TWO", 22223), + (3, "THREE", 33334), + (4, "FOUR", 44445), + (5, "FIVE", 55556) + ).toDF("id", "value", HASH_COLUMN_NAME) + + val threshold = 2 + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold) + + result match { + case AnalysisResult.ThresholdExceeded(countA, countB, thresh) => + assert(countA == 5, s"Expected 5 differences in A but got $countA") + assert(countB == 5, s"Expected 5 differences in B but got $countB") + assert(thresh == threshold, s"Expected threshold $threshold but got $thresh") + case other => + fail(s"Expected ThresholdExceeded but got $other") + } + } + + test("analyze returns OneSidedDifference when only A has differences
even with low threshold") { + val diffA: DataFrame = Seq((3, "three", 33333)).toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + val threshold = 0 + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold) + + result match { + case AnalysisResult.OneSidedDifference(countA, countB) => + assert(countA == 1, s"Expected 1 difference in A but got $countA") + assert(countB == 0, s"Expected 0 differences in B but got $countB") + case other => + fail(s"Expected OneSidedDifference but got $other") + } + } + + test("analyze returns OneSidedDifference when only B has differences even with low threshold") { + val diffA: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq((2, "two", 22222), (3, "three", 33333)).toDF("id", "value", HASH_COLUMN_NAME) + + val threshold = 1 + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold) + + result match { + case AnalysisResult.OneSidedDifference(countA, countB) => + assert(countA == 0, s"Expected 0 differences in A but got $countA") + assert(countB == 2, s"Expected 2 differences in B but got $countB") + case other => + fail(s"Expected OneSidedDifference but got $other") + } + } + + test("analyze returns OneSidedDifference when only A has differences") { + val diffA: DataFrame = Seq((3, "three", 33333)).toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold = 10) + + result match { + case AnalysisResult.OneSidedDifference(countA, countB) => + assert(countA == 1, s"Expected 1 difference in A but got $countA") + assert(countB == 0, s"Expected 0 differences in B but got $countB") + case other => + fail(s"Expected OneSidedDifference but got $other") + } + } + test("analyze returns OneSidedDifference when only B has differences") { + val diffA: DataFrame 
= Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq((3, "three", 33333), (4, "four", 44444)).toDF("id", "value", HASH_COLUMN_NAME) + + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold = 10) + + result match { + case AnalysisResult.OneSidedDifference(countA, countB) => + assert(countA == 0, s"Expected 0 differences in A but got $countA") + assert(countB == 2, s"Expected 2 differences in B but got $countB") + case other => + fail(s"Expected OneSidedDifference but got $other") + } + } + + test("analyze returns Success with correct row diffs for small differences") { + val diffA: DataFrame = Seq((1, "one", 100, 11111), (2, "two", 200, 22222)).toDF("id", "name", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq((1, "one", 999, 11112), (2, "two", 888, 22223)).toDF("id", "name", "value", HASH_COLUMN_NAME) + + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold = 10) + + result match { + case AnalysisResult.Success(diffAToB, diffBToA) => + assert(diffAToB.length == 2, s"Expected 2 row diffs from A to B but got ${diffAToB.length}") + assert(diffBToA.length == 2, s"Expected 2 row diffs from B to A but got ${diffBToA.length}") + assert(diffAToB.forall(_.diffs.nonEmpty), "All row diffs should have column differences") + assert(diffBToA.forall(_.diffs.nonEmpty), "All row diffs should have column differences") + case other => + fail(s"Expected Success but got $other") + } + } + + test("analyze result is type-safe and can be pattern matched") { + val diffA: DataFrame = Seq((1, "a", 11111)).toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = Seq((1, "b", 11112)).toDF("id", "value", HASH_COLUMN_NAME) + + val result: AnalysisResult = RowByRowAnalysis.analyze(diffA, diffB, threshold = 10) + + val message = result match { + case AnalysisResult.Success(_, _) => "analysis complete" + case AnalysisResult.DatasetsIdentical => "identical" + case AnalysisResult.OneSidedDifference(_, _) => "one-sided" + 
case AnalysisResult.ThresholdExceeded(_, _, _) => "too many diffs" + } + + assert(message == "analysis complete") + } + + test("analyze returns DatasetsIdentical for empty diff DataFrames") { + val emptyDiffA: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + val emptyDiffB: DataFrame = Seq.empty[(Int, String, Int)].toDF("id", "value", HASH_COLUMN_NAME) + + val result = RowByRowAnalysis.analyze(emptyDiffA, emptyDiffB, threshold = 10) + + assert(result == AnalysisResult.DatasetsIdentical) + } + + test("analyze ThresholdExceeded contains exact counts") { + val diffA: DataFrame = (1 to 10).map(i => (i, s"value$i", 10000 + i)).toDF("id", "value", HASH_COLUMN_NAME) + val diffB: DataFrame = (11 to 25).map(i => (i, s"value$i", 20000 + i)).toDF("id", "value", HASH_COLUMN_NAME) + + val result = RowByRowAnalysis.analyze(diffA, diffB, threshold = 5) + + result match { + case AnalysisResult.ThresholdExceeded(countA, countB, thresh) => + assert(countA == 10, "Count A should be exactly 10") + assert(countB == 15, "Count B should be exactly 15") + assert(thresh == 5, "Threshold should be exactly 5") + case other => + fail(s"Expected ThresholdExceeded but got $other") + } + } +} diff --git a/bigfiles/src/test/scala/SparkTestSession.scala b/bigfiles/api/src/test/scala/SparkTestSession.scala similarity index 100% rename from bigfiles/src/test/scala/SparkTestSession.scala rename to bigfiles/api/src/test/scala/SparkTestSession.scala diff --git a/bigfiles/src/main/resources/application.conf b/bigfiles/app/src/main/resources/application.conf similarity index 100% rename from bigfiles/src/main/resources/application.conf rename to bigfiles/app/src/main/resources/application.conf diff --git a/bigfiles/app/src/main/scala/za/co/absa/DatasetComparison.scala b/bigfiles/app/src/main/scala/za/co/absa/DatasetComparison.scala new file mode 100644 index 0000000..260804c --- /dev/null +++ b/bigfiles/app/src/main/scala/za/co/absa/DatasetComparison.scala @@ -0,0 +1,93 
@@ +/** Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package za.co.absa + +import za.co.absa.analysis.{AnalysisResult, ComparisonMetricsCalculator, RowByRowAnalysis} +import za.co.absa.parser.{ArgsParser, DiffComputeType} +import za.co.absa.io.IOHandler +import org.apache.spark.sql.SparkSession +import com.typesafe.config.ConfigFactory +import org.slf4j.{Logger, LoggerFactory} + +import java.nio.file.Paths + +object DatasetComparison { + private val logger: Logger = LoggerFactory.getLogger(this.getClass) + + def main(args: Array[String]): Unit = { + val conf = ConfigFactory.load() + val threshold = conf.getInt("dataset-comparison.analysis.diff-threshold") + + val arguments = ArgsParser.getArgs(args) + + implicit val spark: SparkSession = SparkSession + .builder() + .appName("DatasetComparator") + .getOrCreate() + + // validate arguments + ArgsParser.validate(arguments) + + // read data + val rawDataA = IOHandler.sparkRead(arguments.inputA) + val rawDataB = IOHandler.sparkRead(arguments.inputB) + + val (diffA, diffB) = Comparator.compare(rawDataA, rawDataB, arguments.exclude) + + // write diff files + val out = arguments.out + IOHandler.dfWrite(Paths.get(out, "inputA_differences").toString, diffA, arguments.outFormat) + IOHandler.dfWrite(Paths.get(out, "inputB_differences").toString, diffB, arguments.outFormat) + + val metrics = ComparisonMetricsCalculator + .calculate(rawDataA, rawDataB, diffA, diffB, arguments.exclude) + 
.getOrElse(throw new RuntimeException("Failed to calculate metrics")) + val metricsJson = MetricsSerializer.serialize(metrics) + IOHandler.jsonWrite(Paths.get(out, "metrics.json").toString, metricsJson) + + arguments.diff match { + case DiffComputeType.Row => + logger.info("Starting row-by-row analysis") + RowByRowAnalysis.analyze(diffA, diffB, threshold) match { + case AnalysisResult.Success(diffAToB, diffBToA) => + logger.info("Computing row-by-row differences") + IOHandler.rowDiffWriteAsJson(Paths.get(out, "A_to_B_changes.json").toString, diffAToB) + IOHandler.rowDiffWriteAsJson(Paths.get(out, "B_to_A_changes.json").toString, diffBToA) + logger.info("Row-by-row analysis completed successfully") + + case AnalysisResult.DatasetsIdentical => + logger.info("Datasets are identical, no row-by-row analysis needed") + + case AnalysisResult.OneSidedDifference(countA, countB) => + logger.info( + s"""Detailed analysis will not be computed - one-sided difference: + |A: ${if (countA == 0) "All rows matched" else s"$countA differences (see inputA_differences)"} + |B: ${if (countB == 0) "All rows matched" else s"$countB differences (see inputB_differences)"} + |Row-by-row matching requires differences in both datasets + |""".stripMargin + ) + + case AnalysisResult.ThresholdExceeded(countA, countB, thresh) => + logger.warn( + s"""Row-by-row analysis skipped - threshold exceeded: + |A differences: $countA + |B differences: $countB + |Threshold: $thresh + |Details available in inputA_differences and inputB_differences files + |""".stripMargin + ) + } + case _ => logger.info("No detailed DiffComputeType selected") + } + } +} diff --git a/bigfiles/app/src/main/scala/za/co/absa/MetricsSerializer.scala b/bigfiles/app/src/main/scala/za/co/absa/MetricsSerializer.scala new file mode 100644 index 0000000..5b4a7eb --- /dev/null +++ b/bigfiles/app/src/main/scala/za/co/absa/MetricsSerializer.scala @@ -0,0 +1,58 @@ +/** Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License,
Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package za.co.absa + +import org.json4s.JsonAST +import org.json4s.JsonDSL._ +import org.json4s.native.JsonMethods.{compact, render} +import za.co.absa.analysis.ComparisonMetrics + +/** Serializes ComparisonMetrics to JSON format. + */ +object MetricsSerializer { + + /** Converts ComparisonMetrics to a JSON object. + * + * @param metrics + * The comparison metrics to serialize + * @return + * JSON object representation of the metrics + */ + def toJson(metrics: ComparisonMetrics): JsonAST.JObject = { + ("A" -> + ("row count" -> metrics.rowCountA) ~ + ("column count" -> metrics.columnCountA) ~ + ("rows not present in B" -> metrics.diffCountA) ~ + ("unique rows count" -> metrics.uniqueRowCountA)) ~ + ("B" -> + ("row count" -> metrics.rowCountB) ~ + ("column count" -> metrics.columnCountB) ~ + ("rows not present in A" -> metrics.diffCountB) ~ + ("unique rows count" -> metrics.uniqueRowCountB)) ~ + ("general" -> + ("same records count" -> metrics.sameRecordsCount) ~ + ("same records percent to A" -> metrics.sameRecordsPercentToA) ~ + ("excluded columns" -> metrics.excludedColumns.mkString(", "))) + } + + /** Serializes ComparisonMetrics to a compact JSON string. 
+ * + * @param metrics + * The comparison metrics to serialize + * @return + * Compact JSON string representation of the metrics + */ + def serialize(metrics: ComparisonMetrics): String = { + compact(render(toJson(metrics))) + } +} diff --git a/bigfiles/src/main/scala/za/co/absa/io/IOHandler.scala b/bigfiles/app/src/main/scala/za/co/absa/io/IOHandler.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/io/IOHandler.scala rename to bigfiles/app/src/main/scala/za/co/absa/io/IOHandler.scala diff --git a/bigfiles/src/main/scala/za/co/absa/parser/ArgsParser.scala b/bigfiles/app/src/main/scala/za/co/absa/parser/ArgsParser.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/parser/ArgsParser.scala rename to bigfiles/app/src/main/scala/za/co/absa/parser/ArgsParser.scala diff --git a/bigfiles/src/main/scala/za/co/absa/parser/Arguments.scala b/bigfiles/app/src/main/scala/za/co/absa/parser/Arguments.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/parser/Arguments.scala rename to bigfiles/app/src/main/scala/za/co/absa/parser/Arguments.scala diff --git a/bigfiles/src/main/scala/za/co/absa/parser/DiffComputeType.scala b/bigfiles/app/src/main/scala/za/co/absa/parser/DiffComputeType.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/parser/DiffComputeType.scala rename to bigfiles/app/src/main/scala/za/co/absa/parser/DiffComputeType.scala diff --git a/bigfiles/src/main/scala/za/co/absa/parser/OutputFormatType.scala b/bigfiles/app/src/main/scala/za/co/absa/parser/OutputFormatType.scala similarity index 100% rename from bigfiles/src/main/scala/za/co/absa/parser/OutputFormatType.scala rename to bigfiles/app/src/main/scala/za/co/absa/parser/OutputFormatType.scala diff --git a/bigfiles/src/test/resources/inputA.txt b/bigfiles/app/src/test/resources/inputA.txt similarity index 100% rename from bigfiles/src/test/resources/inputA.txt rename to bigfiles/app/src/test/resources/inputA.txt diff 
--git a/bigfiles/src/test/resources/inputB.txt b/bigfiles/app/src/test/resources/inputB.txt similarity index 100% rename from bigfiles/src/test/resources/inputB.txt rename to bigfiles/app/src/test/resources/inputB.txt diff --git a/bigfiles/src/test/resources/namesA.parquet b/bigfiles/app/src/test/resources/namesA.parquet similarity index 100% rename from bigfiles/src/test/resources/namesA.parquet rename to bigfiles/app/src/test/resources/namesA.parquet diff --git a/bigfiles/src/test/resources/namesB.parquet b/bigfiles/app/src/test/resources/namesB.parquet similarity index 100% rename from bigfiles/src/test/resources/namesB.parquet rename to bigfiles/app/src/test/resources/namesB.parquet diff --git a/bigfiles/src/test/resources/out.txt b/bigfiles/app/src/test/resources/out.txt similarity index 100% rename from bigfiles/src/test/resources/out.txt rename to bigfiles/app/src/test/resources/out.txt diff --git a/bigfiles/src/test/resources/out/file b/bigfiles/app/src/test/resources/out/file similarity index 100% rename from bigfiles/src/test/resources/out/file rename to bigfiles/app/src/test/resources/out/file diff --git a/bigfiles/src/test/scala/ArgsParserTest.scala b/bigfiles/app/src/test/scala/ArgsParserTest.scala similarity index 100% rename from bigfiles/src/test/scala/ArgsParserTest.scala rename to bigfiles/app/src/test/scala/ArgsParserTest.scala diff --git a/bigfiles/src/test/scala/DatasetComparisonTest.scala b/bigfiles/app/src/test/scala/DatasetComparisonTest.scala similarity index 100% rename from bigfiles/src/test/scala/DatasetComparisonTest.scala rename to bigfiles/app/src/test/scala/DatasetComparisonTest.scala diff --git a/bigfiles/src/test/scala/IOHandlerTest.scala b/bigfiles/app/src/test/scala/IOHandlerTest.scala similarity index 100% rename from bigfiles/src/test/scala/IOHandlerTest.scala rename to bigfiles/app/src/test/scala/IOHandlerTest.scala diff --git a/bigfiles/app/src/test/scala/MetricsSerializerTest.scala 
b/bigfiles/app/src/test/scala/MetricsSerializerTest.scala new file mode 100644 index 0000000..78f8792 --- /dev/null +++ b/bigfiles/app/src/test/scala/MetricsSerializerTest.scala @@ -0,0 +1,154 @@ +/** + * Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +import za.co.absa.MetricsSerializer +import org.scalatest.funsuite.AnyFunSuite +import org.json4s._ +import org.json4s.native.JsonMethods._ +import za.co.absa.analysis.ComparisonMetrics + +class MetricsSerializerTest extends AnyFunSuite { + + implicit val formats: DefaultFormats.type = DefaultFormats + + test("test that serialize returns correct JSON string for datasets with differences") { + val metrics = ComparisonMetrics( + rowCountA = 2, + rowCountB = 3, + columnCountA = 2, + columnCountB = 2, + diffCountA = 1, + diffCountB = 2, + uniqueRowCountA = 2, + uniqueRowCountB = 3, + sameRecordsCount = 1, + sameRecordsPercentToA = 50.0, + excludedColumns = Seq() + ) + + val result = MetricsSerializer.serialize(metrics) + val expected = "{\"A\":{\"row count\":2," + + "\"column count\":2," + + "\"rows not present in B\":1," + + "\"unique rows count\":2}," + + "\"B\":{\"row count\":3," + + "\"column count\":2," + + "\"rows not present in A\":2," + + "\"unique rows count\":3}," + + "\"general\":{\"same records count\":1,\"same records percent to A\":50.0,\"excluded columns\":\"\"}}" + + assert(result == expected) + } + + test("test that serialize returns correct JSON string 
with duplicates in data") { + val metrics = ComparisonMetrics( + rowCountA = 3, + rowCountB = 4, + columnCountA = 2, + columnCountB = 2, + diffCountA = 0, + diffCountB = 1, + uniqueRowCountA = 2, + uniqueRowCountB = 2, + sameRecordsCount = 3, + sameRecordsPercentToA = 100.0, + excludedColumns = Seq() + ) + + val result = MetricsSerializer.serialize(metrics) + val expected = "{\"A\":{\"row count\":3," + + "\"column count\":2," + + "\"rows not present in B\":0," + + "\"unique rows count\":2}," + + "\"B\":{\"row count\":4," + + "\"column count\":2," + + "\"rows not present in A\":1," + + "\"unique rows count\":2}," + + "\"general\":{\"same records count\":3,\"same records percent to A\":100.0,\"excluded columns\":\"\"}}" + + assert(result == expected) + } + + test("test that serialize returns correct JSON string with excluded columns") { + val metrics = ComparisonMetrics( + rowCountA = 2, + rowCountB = 3, + columnCountA = 1, + columnCountB = 1, + diffCountA = 0, + diffCountB = 1, + uniqueRowCountA = 2, + uniqueRowCountB = 3, + sameRecordsCount = 2, + sameRecordsPercentToA = 100.0, + excludedColumns = Seq("id") + ) + + val result = MetricsSerializer.serialize(metrics) + val expected = "{\"A\":{\"row count\":2," + + "\"column count\":1," + + "\"rows not present in B\":0," + + "\"unique rows count\":2}," + + "\"B\":{\"row count\":3," + + "\"column count\":1," + + "\"rows not present in A\":1," + + "\"unique rows count\":3}," + + "\"general\":{\"same records count\":2,\"same records percent to A\":100.0,\"excluded columns\":\"id\"}}" + + assert(result == expected) + } + + test("test that toJson creates parseable JSON object") { + val metrics = ComparisonMetrics( + rowCountA = 10, + rowCountB = 12, + columnCountA = 4, + columnCountB = 4, + diffCountA = 2, + diffCountB = 4, + uniqueRowCountA = 9, + uniqueRowCountB = 11, + sameRecordsCount = 8, + sameRecordsPercentToA = 80.0, + excludedColumns = Seq("timestamp", "id") + ) + + val json = MetricsSerializer.toJson(metrics) + + 
// Verify structure + assert(json \ "A" != JNothing) + assert(json \ "B" != JNothing) + assert(json \ "general" != JNothing) + + // Verify A values + assert((json \ "A" \ "row count").extract[Long] == 10) + assert((json \ "A" \ "column count").extract[Int] == 4) + assert((json \ "A" \ "rows not present in B").extract[Long] == 2) + assert((json \ "A" \ "unique rows count").extract[Long] == 9) + + // Verify B values + assert((json \ "B" \ "row count").extract[Long] == 12) + assert((json \ "B" \ "column count").extract[Int] == 4) + assert((json \ "B" \ "rows not present in A").extract[Long] == 4) + assert((json \ "B" \ "unique rows count").extract[Long] == 11) + + // Verify general values + assert((json \ "general" \ "same records count").extract[Long] == 8) + assert((json \ "general" \ "same records percent to A").extract[Double] == 80.0) + assert((json \ "general" \ "excluded columns").extract[String] == "timestamp, id") + } +} + diff --git a/bigfiles/app/src/test/scala/SparkTestSession.scala b/bigfiles/app/src/test/scala/SparkTestSession.scala new file mode 100644 index 0000000..bb13e95 --- /dev/null +++ b/bigfiles/app/src/test/scala/SparkTestSession.scala @@ -0,0 +1,30 @@ +/** + * Copyright 2020 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +import org.apache.spark.sql.SparkSession + +import java.io.File + +object SparkTestSession { + val FS_URI: String = new File("src/test/resources").getAbsoluteFile.toURI.toString + + lazy val spark: SparkSession = SparkSession.builder() + .appName("SparkTestSession") + .config("spark.hadoop.fs.default.name", FS_URI) + .config("spark.hadoop.fs.defaultFS", FS_URI) + .master("local[*]") + .getOrCreate() +} diff --git a/bigfiles/src/test/scala/VersionTest.scala b/bigfiles/app/src/test/scala/VersionTest.scala similarity index 100% rename from bigfiles/src/test/scala/VersionTest.scala rename to bigfiles/app/src/test/scala/VersionTest.scala diff --git a/bigfiles/build.sbt b/bigfiles/build.sbt index cb9ce33..9ade556 100644 --- a/bigfiles/build.sbt +++ b/bigfiles/build.sbt @@ -15,24 +15,43 @@ ThisBuild / version := "0.1.0" ThisBuild / scalaVersion := scala212 ThisBuild / organization := "za.co.absa" -lazy val root = (project in file(".")) +ThisBuild / assemblyMergeStrategy := { + case PathList("META-INF", xs @ _*) => MergeStrategy.discard + case x => MergeStrategy.first +} + +// JaCoCo code coverage +Test / jacocoReportSettings := JacocoReportSettings( + title = s"{project} Jacoco Report - scala:${scalaVersion.value}", + formats = Seq(JacocoReportFormats.HTML, JacocoReportFormats.XML) +) + +Test / jacocoExcludes := Seq("za.co.absa.DatasetComparison*") + +// ── api module ──────────────────────────────────────────────────────────────── +// Pure comparison logic and data models. No CLI or I/O concerns.
+lazy val api = (project in file("api"))
+  .settings(
+    name := "dataset-comparison-api",
+    crossScalaVersions := supportedScalaVersions,
+    libraryDependencies ++= apiDependencies(scalaVersion.value),
+    javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint"),
+    Test / fork := true,
+    Test / baseDirectory := (ThisBuild / baseDirectory).value / "api"
+  )
+
+// ── app module ────────────────────────────────────────────────────────────────
+// CLI entry point, argument parsing, I/O and serialization. Depends on api.
+lazy val app = (project in file("app"))
+  .dependsOn(api)
   .settings(
     name := "dataset-comparison",
     crossScalaVersions := supportedScalaVersions,
     assembly / mainClass := Some("za.co.absa.DatasetComparison"),
-    libraryDependencies ++= bigfilesDependencies ++ Seq(
-      "org.apache.spark" %% "spark-core" % sparkVersionForScala(scalaVersion.value) % Provided,
-      "org.apache.spark" %% "spark-sql" % sparkVersionForScala(scalaVersion.value) % Provided,
-      "org.json4s" %% "json4s-native" % jsonVersionForScala(scalaVersion.value),
-      "org.json4s" %% "json4s-jackson" % jsonVersionForScala(scalaVersion.value),
-      "org.apache.hadoop" % "hadoop-common" % hadoopVersionForScala(scalaVersion.value),
-      "org.apache.hadoop" % "hadoop-client" % hadoopVersionForScala(scalaVersion.value),
-      "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersionForScala(scalaVersion.value),
-      "com.lihaoyi" %% "upickle" % unpickleVersionForScala(scalaVersion.value)
-    ),
+    libraryDependencies ++= appDependencies(scalaVersion.value),
     javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint"),
     Test / fork := true,
-    Test / baseDirectory := (ThisBuild / baseDirectory).value,
+    Test / baseDirectory := (ThisBuild / baseDirectory).value / "app",
     packageOptions := Seq(
       ManifestAttributes(
         ("Built-By", System.getProperty("user.name")),
@@ -42,15 +61,11 @@ lazy val root = (project in file("."))
       )
     )
 
-// JaCoCo code coverage
-Test / jacocoReportSettings := JacocoReportSettings(
-  title = s"{project} Jacoco Report - scala:${scalaVersion.value}",
-  formats = Seq(JacocoReportFormats.HTML, JacocoReportFormats.XML)
-)
-
-Test / jacocoExcludes := Seq("za.co.absa.DatasetComparison*")
-
-ThisBuild / assemblyMergeStrategy := {
-  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
-  case x => MergeStrategy.first
-}
+// ── root aggregate ────────────────────────────────────────────────────────────
+lazy val root = (project in file("."))
+  .aggregate(api, app)
+  .settings(
+    name := "dataset-comparison-root",
+    // Prevent root from being published or assembled
+    publish / skip := true
+  )
diff --git a/bigfiles/project/Dependencies.scala b/bigfiles/project/Dependencies.scala
index 96806a9..f25062a 100644
--- a/bigfiles/project/Dependencies.scala
+++ b/bigfiles/project/Dependencies.scala
@@ -69,26 +69,54 @@ object Dependencies {
     }
   }
 
-  def bigfilesDependencies: Seq[ModuleID] = {
-    lazy val fansi = "com.lihaoyi" %% "fansi" % Versions.fansi
-    lazy val scopt = "com.github.scopt" %% "scopt" % Versions.scopt
-    lazy val slf4jApi = "org.slf4j" % "slf4j-api" % Versions.slf4jApi exclude ("log4j", "log4j")
-    lazy val config = "com.typesafe" % "config" % Versions.config
-
+  /** Dependencies for the api module — pure comparison logic, no CLI concerns. */
+  def apiDependencies(scalaVersion: String): Seq[ModuleID] = {
+    lazy val slf4jApi = "org.slf4j" % "slf4j-api" % Versions.slf4jApi exclude ("log4j", "log4j")
     lazy val scalatest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
-    // Required for scala 2.11 + spark 2.4.7
     lazy val snappy = "org.xerial.snappy" % "snappy-java" % "1.1.8.4"
     lazy val jackson = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson211_212 % Provided
 
     Seq(
+      "org.apache.spark" %% "spark-core" % sparkVersionForScala(scalaVersion) % Provided,
+      "org.apache.spark" %% "spark-sql" % sparkVersionForScala(scalaVersion) % Provided,
+      "org.apache.hadoop" % "hadoop-common" % hadoopVersionForScala(scalaVersion),
+      "org.apache.hadoop" % "hadoop-client" % hadoopVersionForScala(scalaVersion),
+      "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersionForScala(scalaVersion),
+      "com.lihaoyi" %% "upickle" % unpickleVersionForScala(scalaVersion),
+      slf4jApi,
       scalatest,
+      snappy,
+      jackson
+    )
+  }
+
+  /** Dependencies for the app module — CLI, I/O and serialization layer. */
+  def appDependencies(scalaVersion: String): Seq[ModuleID] = {
+    lazy val fansi = "com.lihaoyi" %% "fansi" % Versions.fansi
+    lazy val scopt = "com.github.scopt" %% "scopt" % Versions.scopt
+    lazy val slf4jApi = "org.slf4j" % "slf4j-api" % Versions.slf4jApi exclude ("log4j", "log4j")
+    lazy val config = "com.typesafe" % "config" % Versions.config
+    lazy val scalatest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
+    // Required for scala 2.11 + spark 2.4.7
+    lazy val snappy = "org.xerial.snappy" % "snappy-java" % "1.1.8.4"
+    lazy val jackson = "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.jackson211_212 % Provided
+
+    Seq(
+      "org.apache.spark" %% "spark-core" % sparkVersionForScala(scalaVersion) % Provided,
+      "org.apache.spark" %% "spark-sql" % sparkVersionForScala(scalaVersion) % Provided,
+      "org.apache.hadoop" % "hadoop-common" % hadoopVersionForScala(scalaVersion),
+      "org.apache.hadoop" % "hadoop-client" % hadoopVersionForScala(scalaVersion),
+      "org.apache.hadoop" % "hadoop-hdfs" % hadoopVersionForScala(scalaVersion),
+      "org.json4s" %% "json4s-native" % jsonVersionForScala(scalaVersion),
+      "org.json4s" %% "json4s-jackson" % jsonVersionForScala(scalaVersion),
       fansi,
       scopt,
       slf4jApi,
       config,
-      jackson,
-      snappy
+      scalatest,
+      snappy,
+      jackson
     )
   }
 }
diff --git a/bigfiles/project/plugins.sbt b/bigfiles/project/plugins.sbt
index 28851e0..3e7e57f 100644
--- a/bigfiles/project/plugins.sbt
+++ b/bigfiles/project/plugins.sbt
@@ -12,7 +12,6 @@
 addSbtPlugin("org.jetbrains.scala" % "sbt-ide-settings" % "1.1.2")
 
-// Plugins to build the server module as a jar file
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.2.0")
 
 // sbt-jacoco dependency downloading
diff --git a/bigfiles/src/main/scala/za/co/absa/Comparator.scala b/bigfiles/src/main/scala/za/co/absa/Comparator.scala
deleted file mode 100644
index 7cb0a20..0000000
--- a/bigfiles/src/main/scala/za/co/absa/Comparator.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/** Copyright 2020 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package za.co.absa
-
-import za.co.absa.hash.HashUtils.HASH_COLUMN_NAME
-import hash.HashUtils
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.json4s.JsonAST
-import org.json4s.native.JsonMethods.{compact, render}
-import org.json4s.JsonDSL._
-import org.slf4j.{Logger, LoggerFactory}
-
-/** Comparator object that compares two parquet files and creates metrics
- */
-object Comparator {
-
-  private val logger: Logger = LoggerFactory.getLogger(Comparator.getClass)
-
-  /** Create metrics
-   *
-   * @param dataA
-   *   A DataFrame whole data
-   * @param dataB
-   *   B DataFrame whole data
-   * @param diffA
-   *   only diff rows in A DataFrame
-   * @param diffB
-   *   only diff rows in B DataFrame
-   * @param excludedColumns
-   *   columns that were excluded from comparison
-   * @return
-   *   JSON string with metrics
-   */
-  def createMetrics(
-    dataA: DataFrame,
-    dataB: DataFrame,
-    diffA: DataFrame,
-    diffB: DataFrame,
-    excludedColumns: Seq[String]
-  ): String = {
-
-    logger.info("Computing metrics")
-
-    // compute metrics
-    val rowCountA = dataA.count()
-    val rowCountB = dataB.count()
-    val uniqRowCountA = dataA.distinct().count()
-    val uniqRowCountB = dataB.distinct().count()
-    val diffCountA = diffA.count()
-    val diffCountB = diffB.count()
-    val sameRecords = if (rowCountA - diffCountA == rowCountB - diffCountB) rowCountA - diffCountA else -1
-
-    val metricsJson: JsonAST.JObject =
-      ("A" ->
-        ("row count" -> rowCountA) ~
-        ("column count" -> dataA.columns.length) ~
-        ("rows not present in B" -> diffCountA) ~
-        ("unique rows count" -> uniqRowCountA)) ~
-      ("B" ->
-        ("row count" -> rowCountB) ~
-        ("column count" -> dataB.columns.length) ~
-        ("rows not present in A" -> diffCountB) ~
-        ("unique rows count" -> uniqRowCountB)) ~
-      ("general" ->
-        ("same records count" -> sameRecords) ~
-        ("same records percent to A" -> (math floor (sameRecords.toFloat / rowCountA) * 10000) / 100) ~
-        ("excluded columns" -> excludedColumns.mkString(", ")))
-
-    compact(render(metricsJson))
-  }
-
-  /** Compare two parquet files and return diff rows
-   *
-   * @param dataA
-   *   dataframe from input A
-   * @param dataB
-   *   dataframe from input B
-   * @param spark
-   *   SparkSession
-   * @return
-   *   two DataFrames with diff rows
-   */
-  def compare(dataA: DataFrame, dataB: DataFrame)(implicit spark: SparkSession): (DataFrame, DataFrame) = {
-
-    // preprocess data todo will be solved by issue #5
-
-    // compute hash rows
-    val dfWithHashA: DataFrame = HashUtils.createHashColumn(dataA)
-    logger.info("Hash for A created")
-    val dfWithHashB: DataFrame = HashUtils.createHashColumn(dataB)
-    logger.info("Hash for B created")
-
-    // select non matching hashs
-    logger.info("Getting diff hashes, A except B")
-    val diffHashA: DataFrame = dfWithHashA.select(HASH_COLUMN_NAME).exceptAll(dfWithHashB.select(HASH_COLUMN_NAME))
-    logger.info("Getting diff hashes, B except A")
-    val diffHashB: DataFrame = dfWithHashB.select(HASH_COLUMN_NAME).exceptAll(dfWithHashA.select(HASH_COLUMN_NAME))
-
-    // join on hash column (get back whole rows)
-    logger.info("Getting diff rows for A")
-    val distinctDiffA: DataFrame = diffHashA.join(dfWithHashA, Seq(HASH_COLUMN_NAME)).distinct()
-    val diffA: DataFrame = diffHashA.join(distinctDiffA, Seq(HASH_COLUMN_NAME))
-
-    logger.info("Getting diff rows for B")
-    val distinctDiffB: DataFrame = diffHashB.join(dfWithHashB, Seq(HASH_COLUMN_NAME)).distinct()
-    val diffB: DataFrame = diffHashB.join(distinctDiffB, Seq(HASH_COLUMN_NAME))
-
-    (diffA, diffB)
-  }
-
-}
diff --git a/bigfiles/src/main/scala/za/co/absa/DatasetComparison.scala b/bigfiles/src/main/scala/za/co/absa/DatasetComparison.scala
deleted file mode 100644
index e171277..0000000
--- a/bigfiles/src/main/scala/za/co/absa/DatasetComparison.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/** Copyright 2020 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package za.co.absa
-
-import za.co.absa.DatasetComparison.logger
-import za.co.absa.analysis.RowByRowAnalysis
-import za.co.absa.parser.{ArgsParser, DiffComputeType}
-import za.co.absa.io.IOHandler
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import com.typesafe.config.ConfigFactory
-import org.slf4j.{Logger, LoggerFactory}
-
-import java.nio.file.Paths
-
-object DatasetComparison {
-  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
-
-  def main(args: Array[String]): Unit = {
-    val conf = ConfigFactory.load()
-    val threshold = conf.getInt("dataset-comparison.analysis.diff-threshold")
-
-    val arguments = ArgsParser.getArgs(args)
-
-    implicit val spark: SparkSession = SparkSession
-      .builder()
-      .appName("DatasetComparator")
-      .getOrCreate()
-
-    // validate arguments
-    ArgsParser.validate(arguments)
-
-    // read data
-    val rawDataA = IOHandler.sparkRead(arguments.inputA)
-    val rawDataB = IOHandler.sparkRead(arguments.inputB)
-    val dataA: DataFrame = DatasetComparisonHelper.exclude(rawDataA, arguments.exclude, "A")
-    val dataB: DataFrame = DatasetComparisonHelper.exclude(rawDataB, arguments.exclude, "B")
-
-    val (uniqA, uniqB) = Comparator.compare(dataA, dataB)
-
-    val metrics: String = Comparator.createMetrics(dataA, dataB, uniqA, uniqB, arguments.exclude)
-
-    // write to files
-    val out = arguments.out
-    IOHandler.dfWrite(Paths.get(out, "inputA_differences").toString, uniqA, arguments.outFormat)
-    IOHandler.dfWrite(Paths.get(out, "inputB_differences").toString, uniqB, arguments.outFormat)
-    IOHandler.jsonWrite(Paths.get(out, "metrics.json").toString, metrics)
-
-    val uniqAEmpty = uniqA.isEmpty
-    val uniqBEmpty = uniqB.isEmpty
-    arguments.diff match {
-      case _ if uniqBEmpty || uniqAEmpty => logEitherUniqEmpty(uniqAEmpty, uniqBEmpty)
-      case DiffComputeType.Row => handleRowDiffType(uniqA, uniqB, out, threshold)
-      case _ => logger.info("None DiffComputeType selected")
-    }
-  }
-
-  private def handleRowDiffType(uniqA: DataFrame, uniqB: DataFrame, out: String, threshold: Int)(implicit
-    sparkSession: SparkSession
-  ): Unit = {
-    if (uniqA.count() <= threshold && uniqB.count() <= threshold) {
-      val diffA = RowByRowAnalysis.generateDiffJson(uniqA, uniqB, "A")
-      val diffB = RowByRowAnalysis.generateDiffJson(uniqB, uniqA, "B")
-
-      // write diff
-      IOHandler.rowDiffWriteAsJson(Paths.get(out, "A_to_B_changes.json").toString, diffA)
-      IOHandler.rowDiffWriteAsJson(Paths.get(out, "B_to_A_changes.json").toString, diffB)
-    } else {
-      logger.warn("The number of differences is too large to compute row by row differences.")
-    }
-  }
-
-  private def logEitherUniqEmpty(uniqAEmpty: Boolean, uniqBEmpty: Boolean): Unit = {
-    logger.info(
-      s"""Detailed analysis will not be computed:
-         |A: ${if (uniqAEmpty) "All rows matched" else "There is no other row in B look to inputA_differences"}
-         |B: ${if (uniqBEmpty) "All rows matched" else "There is no other row in A look to inputB_differences"}
-         |""".stripMargin
-    )
-  }
-}
diff --git a/bigfiles/src/test/scala/ComparatorTest.scala b/bigfiles/src/test/scala/ComparatorTest.scala
deleted file mode 100644
index 74b605b..0000000
--- a/bigfiles/src/test/scala/ComparatorTest.scala
+++ /dev/null
@@ -1,145 +0,0 @@
-/**
- * Copyright 2020 ABSA Group Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-import za.co.absa.Comparator
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-import org.scalatest.funsuite.AnyFunSuite
-
-class ComparatorTest extends AnyFunSuite {
-  implicit val spark: SparkSession = SparkTestSession.spark
-
-  import spark.implicits._
-
-  test("test that comparator returns diff rows"){
-    val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (2, "three"), (3, "two")).toDF("id", "value")
-
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    assert(diff1.count() == 1)
-    assert(diff2.count() == 2)
-  }
-
-  test("test that comparator returns all rows if dataframes are completely different"){
-    val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq(("12af", 1003), ("12qw", 3004), ("123q", 3456)).toDF("id", "value")
-
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    assert(diff1.count() == 2)
-    assert(diff2.count() == 3)
-  }
-
-  test("test that comparator returns empty DataFrames if all rows are the same"){
-    val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    assert(diff1.count() == 0)
-    assert(diff2.count() == 0)
-  }
-
-  test("test that comparator returns empty DataFrames if all rows are the same but in different order"){
-    val tmp1: DataFrame = Seq((2, "two"), (1, "one")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    assert(diff1.count() == 0)
-    assert(diff2.count() == 0)
-  }
-
-  test("test that comparator returns correct dataframes if one duplicate is present in one table"){
-    val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-
-    assert(diff1.count() == 1)
-    assert(diff2.count() == 0)
-  }
-
-  test("test that comparator returns correct dataframes if 2 duplicates are present in one table"){
-    val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-
-    assert(diff1.count() == 2)
-    assert(diff2.count() == 0)
-  }
-
-
-  test("test that comparator returns correct dataframes if duplicates are present"){
-    val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (1, "one"), (1, "one"), (2, "two")).toDF("id", "value")
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-
-    assert(diff1.count() == 0)
-    assert(diff2.count() == 1)
-  }
-
-
-  //////////////////////////// createMetrics /////////////////////////////////////
-
-  test("test that createMetrics returns correct JSON string"){
-    val tmp1: DataFrame = Seq((1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (2, "three"), (3, "two")).toDF("id", "value")
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    val metrics = Comparator.createMetrics(tmp1, tmp2, diff1, diff2, Seq())
-    val expected = "{\"A\":{\"row count\":2," +
-      "\"column count\":2," +
-      "\"rows not present in B\":1," +
-      "\"unique rows count\":2}," +
-      "\"B\":{\"row count\":3," +
-      "\"column count\":2," +
-      "\"rows not present in A\":2," +
-      "\"unique rows count\":3}," +
-      "\"general\":{\"same records count\":1,\"same records percent to A\":50.0,\"excluded columns\":\"\"}}"
-    assert(metrics == expected)
-  }
-
-  test("test that createMetrics returns correct JSON string, duplicates in data"){
-    val tmp1: DataFrame = Seq((1, "one"), (1, "one"), (2, "two")).toDF("id", "value")
-    val tmp2: DataFrame = Seq((1, "one"), (1, "one"), (1, "one"), (2, "two")).toDF("id", "value")
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    val metrics = Comparator.createMetrics(tmp1, tmp2, diff1, diff2, Seq())
-    val expected = "{\"A\":{\"row count\":3," +
-      "\"column count\":2," +
-      "\"rows not present in B\":0," +
-      "\"unique rows count\":2}," +
-      "\"B\":{\"row count\":4," +
-      "\"column count\":2," +
-      "\"rows not present in A\":1," +
-      "\"unique rows count\":2}," +
-      "\"general\":{\"same records count\":3,\"same records percent to A\":100.0,\"excluded columns\":\"\"}}"
-    assert(metrics == expected)
-  }
-
-  test("test that createMetrics returns correct JSON string with excluded columns"){
-    val tmp1: DataFrame = Seq(("one"), ("two")).toDF("value")
-    val tmp2: DataFrame = Seq(("one"), ("three"), ("two")).toDF("value")
-    val (diff1, diff2) = Comparator.compare(tmp1, tmp2)
-    val metrics = Comparator.createMetrics(tmp1, tmp2, diff1, diff2, Seq("id"))
-    val expected = "{\"A\":{\"row count\":2," +
-      "\"column count\":1," +
-      "\"rows not present in B\":0," +
-      "\"unique rows count\":2}," +
-      "\"B\":{\"row count\":3," +
-      "\"column count\":1," +
-      "\"rows not present in A\":1," +
-      "\"unique rows count\":3}," +
-      "\"general\":{\"same records count\":2,\"same records percent to A\":100.0,\"excluded columns\":\"id\"}}"
-    assert(metrics == expected)
-  }
-}
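
A note on the semantics the deleted `Comparator.compare` (moved into the `api` module) relies on: Spark's `DataFrame.exceptAll` is a *multiset* difference, so a row of A is removed only as many times as it occurs in B, and surplus duplicates are reported — which is exactly what the duplicate-row tests above assert. A minimal plain-Scala sketch of that behaviour (illustrative only: `ExceptAllSketch` is not part of this patch, and the real code operates on DataFrames of row hashes):

```scala
// Hypothetical sketch of exceptAll's multiset-difference semantics on plain
// collections (no Spark). Each element of `a` is kept only if it exceeds the
// number of matching occurrences remaining in `b`.
object ExceptAllSketch {
  def exceptAll[T](a: Seq[T], b: Seq[T]): Seq[T] = {
    // Count occurrences in b, then consume one count per matching element of a.
    val counts = scala.collection.mutable.Map.empty[T, Int].withDefaultValue(0)
    b.foreach(x => counts(x) += 1)
    a.flatMap { x =>
      if (counts(x) > 0) { counts(x) -= 1; None } // matched: drop this occurrence
      else Some(x)                                 // surplus: report as a diff
    }
  }

  def main(args: Array[String]): Unit = {
    val a = Seq("one", "one", "two") // A carries a duplicate "one"
    val b = Seq("one", "two")
    println(exceptAll(a, b)) // -> List(one): the surplus duplicate is a diff
    println(exceptAll(b, a)) // -> List(): every row of B is matched in A
  }
}
```

This mirrors the "one duplicate is present in one table" test above, where `diff1.count()` is 1 and `diff2.count()` is 0.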