120 changes: 87 additions & 33 deletions bigfiles/README.md
# Scala CPS-Dataset-Comparison

This is the Scala implementation of the project. It is used for comparing big files (files that cannot fit into RAM).

- [Project Structure](#project-structure)
- [How to run](#how-to-run)
  - [Requirements](#requirements)
  - [Switch to specific SDK](#switch-to-specific-sdk)
- [How to run tests](#how-to-run-tests)

## Project Structure

The project is split into two SBT submodules:

| Module | Purpose |
|--------|---------|
| `api` | Pure comparison logic and data models: `Comparator`, `DatasetComparisonHelper`, `HashUtils`, all `analysis/*` classes. No CLI or I/O dependencies. |
| `app` | CLI entry point, I/O, serialization: `DatasetComparison`, `MetricsSerializer`, `IOHandler`, `ArgsParser`, `Arguments`, `DiffComputeType`, `OutputFormatType`. Depends on `api`. |

```
bigfiles/
├── api/
│ ├── src/main/scala/za/co/absa/
│ │ ├── analysis/ # AnalyseStat, AnalysisResult, ColumnsDiff, RowsDiff,
│ │ │ # ComparisonMetrics, ComparisonMetricsCalculator, RowByRowAnalysis
│ │ ├── hash/HashUtils.scala
│ │ ├── Comparator.scala
│ │ └── DatasetComparisonHelper.scala
│ └── src/test/scala/ # ComparatorTest, ComparisonMetricsCalculatorTest,
│ # DatasetComparisonHelperTest, HashTableTest,
│ # RowByRowAnalysesTest, AnalysisResultTest, SparkTestSession
├── app/
│ ├── src/main/scala/za/co/absa/
│ │ ├── io/IOHandler.scala
│ │ ├── parser/ # ArgsParser, Arguments, DiffComputeType, OutputFormatType
│ │ ├── DatasetComparison.scala
│ │ └── MetricsSerializer.scala
│ ├── src/main/resources/application.conf
│ └── src/test/scala/ # DatasetComparisonTest, ArgsParserTest, IOHandlerTest,
│ # MetricsSerializerTest, VersionTest, SparkTestSession
├── testdata/ # Shared test resources (symlinked into api & app test resources)
│ ├── namesA.parquet
│ ├── namesB.parquet
│ ├── inputA.txt
│ ├── inputB.txt
│ └── out.txt
└── project/
└── Dependencies.scala # apiDependencies and appDependencies groups
```

## How to run

First run assembly from the `app` module:

```bash
sbt app/assembly
```

Then run:

```bash
spark-submit app/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o <output-path> --inputA <A-file-path> --inputB <B-file-path>
```

### Parameters:
| Parameter | Description | Required |
|-----------|-------------|----------|

Example:
```bash
spark-submit --class za.co.absa.DatasetComparison \
--conf "spark.driver.extraJavaOptions=-Dconfig.file=/../bigfiles/app/src/main/resources/application.conf" \
app/target/scala-2.12/dataset-comparison-assembly-1.0.jar \
-o "/test_files/output_names$(date '+%Y-%m-%d_%H%M%S')" \
--inputA /test_files/namesA.parquet \
--inputB /test_files/namesB.parquet \
-d Row

```

### Run with specific config

```bash
spark-submit --class za.co.absa.DatasetComparison \
--conf "spark.driver.extraJavaOptions=-Dconfig.file=/path/to/application.conf" \
app/target/scala-2.12/dataset-comparison-assembly-1.0.jar \
-o <output-path> --inputA <A-file-path> --inputB <B-file-path> -d Row
```

`-d Row` is an optional parameter for detailed analysis that specifies which analysis to run. Currently only `Row` is supported.
Detailed analysis is computed only if the number of differing columns is less than 200; you can change this threshold in `app/src/main/resources/application.conf`.
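The threshold is read from the application config. A hypothetical HOCON sketch of what the setting could look like (the actual key names in `application.conf` may differ; check the file):

```hocon
# Illustrative only; check app/src/main/resources/application.conf
# for the actual key names.
dataset-comparison {
  # Detailed row-by-row analysis runs only when the number of
  # differing columns is below this threshold.
  column-diff-threshold = 200
}
```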

### Spark configuration
You can set Spark configuration in the `spark-defaults.conf` file stored in the `$SPARK_HOME/conf` directory.
You will find a `spark-defaults.conf.template` there; remove `.template` from the file name and set your configuration in the resulting file.
It could look like this:
```bash
spark.hadoop.fs.default.name hdfs://localhost:9999/ # set your hdfs uri
spark.hadoop.fs.defaultFS hdfs://localhost:9999/ # set your hdfs uri

```

### Requirements

## How to run tests

Tests are split between the two modules. Use the following commands:

| sbt command | Module | Test files |
|------------------|--------|------------|
| `sbt api/test` | api | ComparatorTest, ComparisonMetricsCalculatorTest, DatasetComparisonHelperTest, HashTableTest, RowByRowAnalysesTest, AnalysisResultTest |
| `sbt app/test` | app | DatasetComparisonTest, ArgsParserTest, IOHandlerTest, MetricsSerializerTest, VersionTest |
| `sbt test` | both | Runs all tests across both modules (root aggregate) |

`SparkTestSession` is duplicated into both `api/src/test/scala/` and `app/src/test/scala/` as it is required by tests in both modules.

Test resources (`namesA.parquet`, `namesB.parquet`, `inputA.txt`, `inputB.txt`, `out.txt`) live in `testdata/` at the root and are symlinked into `api/src/test/resources/` and `app/src/test/resources/`.

---------

## Installing hadoop

Tutorial [here](https://dev.to/awwsmm/installing-and-running-hadoop-and-spark-on-ubuntu-18-393h):
1. `sdk install hadoop`
2. ``$ echo "export PATH=\$PATH:\$HADOOP_HOME/bin:\$HADOOP_HOME/sbin" >> ~/.bashrc``
3. Configure the files `core-site.xml`, `hdfs-site.xml`, `mapred-site.xml`, and `yarn-site.xml` in `/Users/<username>/.sdkman/candidates/hadoop/3.3.5/etc`
<value>false</value>
</property>
```
Add this into **core-site.xml** between the `<configuration>` tags:
```xml
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9999</value>
</property>
```
Add this into **mapred-site.xml** between the `<configuration>` tags:
```xml
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
```
Add this into **yarn-site.xml** between the `<configuration>` tags:
```xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
```
Add this into **hadoop-env.sh**:
```bash
export JAVA_HOME="/.../.sdkman/candidates/java/11.0.x-amzn"
```

4. Create directories per your configuration, for example:
```
sudo mkdir -p /opt/hadoop_tmp/hdfs/datanode
sudo mkdir -p /opt/hadoop_tmp/hdfs/namenode
```
6. Copy your public key to `authorized_keys`: `cat ~/.ssh/id_ed25519.pub >> ~/.ssh/authorized_keys`
7. Copy the key into SSH for localhost: `ssh-copy-id username@localhost`
8. Test SSH: `ssh <username>@localhost`
9. Format the namenode: `hdfs namenode -format -force` (or `hadoop namenode -format`)
10. Start Hadoop: `start-dfs.sh && start-yarn.sh`
11. Add files to HDFS: `hdfs dfs -put /path/to/file /path/to/hdfs`
12. Stop HDFS: `stop-dfs.sh && stop-yarn.sh`

If something goes wrong, check the logs in `/Users/<username>/.sdkman/candidates/hadoop/3.3.5/logs`.

The ResourceManager web UI runs at http://localhost:8088/cluster, HDFS runs on port 9999, and the NameNode web interface is at http://localhost:9870/.

You also have to enable remote login:
![img.png](images/remote_login.png)

Running with Hadoop:
```bash
sbt app/assembly
spark-submit app/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o <output-path> --inputA <A-file-path> --inputB <B-file-path> --fsURI http://localhost:9999/
```
`spark-submit app/target/scala-2.12/dataset-comparison-assembly-1.0.jar -o /test_files/output --inputA /test_files/RUN20_edit.parquet --inputB /test_files/RUN20.parquet --fsURI hdfs://localhost:9999/`
Alternatively, you can uncomment the code at the start of the validate part in `ArgsParserTest`, change `FS` to `HDFS_URI`, and then run `sbt app/test`.

### Setting up IntelliJ IDEA

71 changes: 71 additions & 0 deletions bigfiles/api/src/main/scala/za/co/absa/Comparator.scala
/** Copyright 2020 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package za.co.absa
**@lsulak** commented (Mar 17, 2026):
These packages must be changed. If we release this into Maven Central, there won't be a single nice package containing the dataset comparison library; it will be scattered and living among other ABSA OSS stuff. I propose this, at a high level, for pretty much all of the files (I am not a big fan of `analysis` either; too broad a word):

`package za.co.absa.datasetcomparison` or `package za.co.absa.dataset.comparison`

Do this for the lib and for the CLI part. Then, under the lib part, it could be:

`package za.co.absa.datasetcomparison.core`

and under the CLI:

`package za.co.absa.datasetcomparison.cli`

Then, further (but this is really optional), it would make sense to split the interfaces and the overall structure into sub-packages: `model` or `api`, plus `diff` and `metrics`, splitting interfaces, metrics, and diff logic.

The CLI part is pretty OK in terms of its sub-packages.

**@lsulak** commented (Mar 17, 2026):
So at the end of the day, something like this:

```scala
package za.co.absa.datasetcomparison
package za.co.absa.datasetcomparison.model
// note that I excluded the `hash` sub-package because it's maybe unnecessary and can be placed under subpackage `diff`
package za.co.absa.datasetcomparison.diff
package za.co.absa.datasetcomparison.metrics

package za.co.absa.datasetcomparison.cli
package za.co.absa.datasetcomparison.cli.io
package za.co.absa.datasetcomparison.cli.parser
```

import za.co.absa.hash.HashUtils
import za.co.absa.hash.HashUtils.HASH_COLUMN_NAME
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.{Logger, LoggerFactory}

/** Comparator object that compares two DataFrames and returns their diff rows.
  */
object Comparator {
  private val logger: Logger = LoggerFactory.getLogger(Comparator.getClass)

  /** Compare two DataFrames and return diff rows
    *
    * @param dataA
    *   A DataFrame whole data
    * @param dataB
    *   B DataFrame whole data
    * @param excludedColumns
    *   columns to exclude from comparison
    * @param spark
    *   SparkSession
    * @return
    *   Tuple of (diffA, diffB) DataFrames containing rows unique to each dataset
    */
  def compare(
      dataA: DataFrame,
      dataB: DataFrame,
      excludedColumns: Seq[String] = Seq.empty
  )(implicit spark: SparkSession): (DataFrame, DataFrame) = {
    // Apply column exclusion
    val filteredDataA = DatasetComparisonHelper.exclude(dataA, excludedColumns, "A")
    val filteredDataB = DatasetComparisonHelper.exclude(dataB, excludedColumns, "B")

    val dfWithHashA: DataFrame = HashUtils.createHashColumn(filteredDataA)
    logger.info("Hash for A created")

    val dfWithHashB: DataFrame = HashUtils.createHashColumn(filteredDataB)
    logger.info("Hash for B created")

    // select non-matching hashes
    logger.info("Getting diff hashes, A except B")
    val diffHashA: DataFrame = dfWithHashA.select(HASH_COLUMN_NAME).exceptAll(dfWithHashB.select(HASH_COLUMN_NAME))
    logger.info("Getting diff hashes, B except A")
    val diffHashB: DataFrame = dfWithHashB.select(HASH_COLUMN_NAME).exceptAll(dfWithHashA.select(HASH_COLUMN_NAME))

    // join on hash column (get back whole rows)
    logger.info("Getting diff rows for A")
    val distinctDiffA: DataFrame = diffHashA.join(dfWithHashA, Seq(HASH_COLUMN_NAME)).distinct()
    val diffA: DataFrame = diffHashA.join(distinctDiffA, Seq(HASH_COLUMN_NAME))

    logger.info("Getting diff rows for B")
    val distinctDiffB: DataFrame = diffHashB.join(dfWithHashB, Seq(HASH_COLUMN_NAME)).distinct()
    val diffB: DataFrame = diffHashB.join(distinctDiffB, Seq(HASH_COLUMN_NAME))

    (diffA, diffB)
  }

}
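The hash-based diff in `Comparator.compare` can be sketched without Spark: hash each row, take the multiset difference of hashes in both directions (the role `exceptAll` plays on the hash column), and map the surviving hashes back to rows. A minimal illustration with plain collections; all names here are illustrative and not part of the project API:

```scala
object HashDiffSketch {

  // Stand-in for a per-row hash column: one hash per row.
  def rowHash(row: Seq[String]): Int = row.mkString("|").hashCode

  // Multiset difference, mirroring Spark's exceptAll: each occurrence
  // in `b` cancels at most one occurrence in `a`.
  def exceptAll(a: Seq[Int], b: Seq[Int]): Seq[Int] = {
    val counts = scala.collection.mutable.Map.empty[Int, Int].withDefaultValue(0)
    b.foreach(h => counts(h) += 1)
    a.filter { h =>
      if (counts(h) > 0) { counts(h) -= 1; false } else true
    }
  }

  // Rows unique to each side, analogous to compare's (diffA, diffB).
  def diff(
      dataA: Seq[Seq[String]],
      dataB: Seq[Seq[String]]
  ): (Seq[Seq[String]], Seq[Seq[String]]) = {
    val onlyA = exceptAll(dataA.map(rowHash), dataB.map(rowHash)).toSet
    val onlyB = exceptAll(dataB.map(rowHash), dataA.map(rowHash)).toSet
    (dataA.filter(r => onlyA(rowHash(r))), dataB.filter(r => onlyB(rowHash(r))))
  }

  def main(args: Array[String]): Unit = {
    val a = Seq(Seq("alice", "1"), Seq("bob", "2"))
    val b = Seq(Seq("alice", "1"), Seq("carol", "3"))
    println(diff(a, b)) // rows unique to A, then rows unique to B
  }
}
```

Hash collisions would make two different rows look identical here; the real implementation's choice of hash function governs how likely that is.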
object DatasetComparisonHelper {
* columns to exclude
* @param dfName
* name of the DataFrame
* @return
* DataFrame with excluded columns
*/
def exclude(df: DataFrame, toExclude: Seq[String], dfName: String): DataFrame = {
if (toExclude.isEmpty) df
else {
logger.info(s"Excluding columns from the $dfName DataFrame")
/** Copyright 2020 ABSA Group Limited
**@lsulak** commented (Mar 17, 2026):
I'm gonna write it here so that we can have a chat based on this:

I like the split and the overall structure, making it pretty obvious that one is the CLI tool and the other is supposed to be an importable Scala library. However, naming (one of the difficulties of SW engineering, especially in the pre-AI era) should be improved in my opinion. The module `api` implies interfaces, models, etc., with little or no actual implementation. But here it's actually the whole solution for the library. And `app` is a bit of a broad term.

What about this? Rename:

- `api` to `core`
- `app` to `cli`
So:

- `core` (published library; reusable comparison engine)
- `cli` (runnable wrapper around `core`; no publish)

*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package za.co.absa.analysis

/** Result of row-by-row analysis with explicit outcome types.
  */
sealed trait AnalysisResult

object AnalysisResult {

  /** Analysis completed successfully with row-by-row differences.
    *
    * @param diffAToB
    *   Differences from dataset A to B
    * @param diffBToA
    *   Differences from dataset B to A
    */
  case class Success(diffAToB: Seq[RowsDiff], diffBToA: Seq[RowsDiff]) extends AnalysisResult

  /** Datasets are identical; no differences to analyze.
    */
  case object DatasetsIdentical extends AnalysisResult

  /** One dataset has differences but the other does not; row-by-row matching cannot be performed.
    *
    * @param diffCountA
    *   Number of differences in dataset A
    * @param diffCountB
    *   Number of differences in dataset B
    */
  case class OneSidedDifference(diffCountA: Long, diffCountB: Long) extends AnalysisResult

  /** Number of differences exceeds the threshold.
    *
    * @param diffCountA
    *   Number of differences in dataset A
    * @param diffCountB
    *   Number of differences in dataset B
    * @param threshold
    *   The threshold that was exceeded
    */
  case class ThresholdExceeded(diffCountA: Long, diffCountB: Long, threshold: Int) extends AnalysisResult
}
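Callers can handle every outcome of such a sealed trait exhaustively with a pattern match. A sketch, using a minimal stand-in for `RowsDiff` (whose real fields live elsewhere in the `analysis` package) so the snippet is self-contained; the field names and messages are illustrative only:

```scala
// Stand-in for the real RowsDiff model; fields are illustrative only.
case class RowsDiff(column: String, valueA: String, valueB: String)

sealed trait AnalysisResult
object AnalysisResult {
  case class Success(diffAToB: Seq[RowsDiff], diffBToA: Seq[RowsDiff]) extends AnalysisResult
  case object DatasetsIdentical extends AnalysisResult
  case class OneSidedDifference(diffCountA: Long, diffCountB: Long) extends AnalysisResult
  case class ThresholdExceeded(diffCountA: Long, diffCountB: Long, threshold: Int) extends AnalysisResult
}

object AnalysisResultDemo {
  // Because the trait is sealed, the compiler can warn if a case is missed.
  def describe(result: AnalysisResult): String = result match {
    case AnalysisResult.Success(aToB, bToA) =>
      s"row-by-row diff computed: ${aToB.size} A->B, ${bToA.size} B->A"
    case AnalysisResult.DatasetsIdentical =>
      "datasets are identical"
    case AnalysisResult.OneSidedDifference(a, b) =>
      s"one-sided difference: $a in A vs $b in B"
    case AnalysisResult.ThresholdExceeded(a, b, t) =>
      s"too many differences ($a, $b) for threshold $t"
  }

  def main(args: Array[String]): Unit = {
    println(describe(AnalysisResult.DatasetsIdentical))
    println(describe(AnalysisResult.ThresholdExceeded(250L, 300L, 200)))
  }
}
```

Encoding the outcomes as data rather than as log messages lets the CLI layer decide separately how to render or serialize each case.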