diff --git a/.gitignore b/.gitignore
index 05a71a84..510fa63a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -9,3 +9,4 @@ dist
.mypy_cache
__pycache__/
node_modules/
+.hardpy
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
index fc1522a4..e5b35449 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -138,6 +138,17 @@
"examples/minute_parity"
]
},
+ {
+ "name": "Python: Example JSON storage",
+ "type": "debugpy",
+ "request": "launch",
+ "module": "hardpy.cli.cli",
+ "console": "integratedTerminal",
+ "args": [
+ "run",
+ "examples/json_storage"
+ ]
+ },
{
"name": "Python: Example Multiple configs",
"type": "debugpy",
diff --git a/README.md b/README.md
index 08a1c929..860d969b 100644
--- a/README.md
+++ b/README.md
@@ -13,7 +13,7 @@ HardPy is a python library for creating a test bench for devices.
[](https://docs.pytest.org/en/latest/)
[](https://everypinio.github.io/hardpy/)
[](https://www.reddit.com/r/HardPy)
-[](https://discord.com/invite/3kBG9CbS)
+[](https://discord.gg/98bWadmG8J)
[](https://t.me/everypin)
@@ -26,7 +26,7 @@ HardPy allows you to:
* Create test benches for devices using [pytest](https://docs.pytest.org/);
* Use a browser to view, start, stop, and interact with tests;
-* Store test results in the [CouchDB](https://couchdb.apache.org/) database;
+* Store test results in the [CouchDB](https://couchdb.apache.org/) database or to simple JSON files;
* Store test results on the [StandCloud](https://standcloud.io/) analytics platform.
@@ -41,6 +41,8 @@ pip install hardpy
## Getting Started
+### With CouchDB
+
1. Create your first test bench.
```bash
hardpy init
@@ -59,6 +61,18 @@ hardpy run
Login and password: **dev**, database - **runstore**.
+### Without a database
+
+1. Create your first test bench.
+```bash
+hardpy init --no-create-database --storage-type json
+```
+2. Launch HardPy operator panel.
+```bash
+hardpy run
+```
+3. View operator panel in browser: http://localhost:8000/
+
## Examples
For more examples of using **HardPy**, see the [examples](https://github.com/everypinio/hardpy/tree/main/examples) folder and the [documentation](https://everypinio.github.io/hardpy/examples/).
diff --git a/docs/changelog.md b/docs/changelog.md
index 4af0ead4..1d0c753d 100644
--- a/docs/changelog.md
+++ b/docs/changelog.md
@@ -2,6 +2,11 @@
Versions follow [Semantic Versioning](https://semver.org/): `..`.
+## 0.20.0
+
+* Add JSON file storage support as alternative to CouchDB.
+ [[PR-239](https://github.com/everypinio/hardpy/pull/239)]
+
## 0.19.1
* Add a `stop_time` validator function. If a case or module has a start time,
@@ -19,7 +24,7 @@ Versions follow [Semantic Versioning](https://semver.org/): `.. 1000, "Transfer rate too slow"
+
+@pytest.mark.case_name("Protocol Validation")
+@pytest.mark.module_name("Communication Tests")
+@pytest.mark.critical # Critical test - stops all if fails
+def test_protocol_validation():
+ """Test communication protocol validation."""
+ hardpy.set_message("Validating communication protocol...")
+
+ # Simulate protocol check
+ protocol_version = "v2.1"
+ expected_version = "v2.1"
+
+ hardpy.set_case_measurement(
+ hardpy.StringMeasurement(
+ name="Protocol Version",
+ value=protocol_version,
+ comparison_value=expected_version,
+ )
+ )
+
+ hardpy.set_message(f"Protocol version: {protocol_version}")
+
+ assert protocol_version == expected_version, \
+ f"Protocol mismatch: got {protocol_version}, expected {expected_version}"
+
+@pytest.mark.case_name("Error Handling Test")
+@pytest.mark.module_name("Communication Tests")
+@pytest.mark.dependency("test_communication::test_protocol_validation")
+def test_error_handling():
+ """Test error handling (depends on protocol validation)."""
+ hardpy.set_message("Testing error handling...")
+
+ # Simulate error injection and recovery
+ errors_injected = 5
+ errors_handled = 5
+
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Errors Handled",
+ value=errors_handled,
+ unit="count",
+ comparison_value=errors_injected,
+ )
+ )
+
+ hardpy.set_message(f"Handled {errors_handled}/{errors_injected} errors")
+
+ assert errors_handled == errors_injected, "Some errors not handled correctly"
+```
+
+### test_voltage.py
+
+```python
+import hardpy
+import pytest
+from time import sleep
+
+@pytest.mark.case_name("Check Power Supply Voltage")
+@pytest.mark.module_name("Power Supply Tests")
+def test_power_supply_voltage():
+ """Test that power supply outputs correct voltage."""
+ # Set test stand information
+ hardpy.set_stand_name("Test Bench #1")
+ hardpy.set_stand_location("Lab A")
+
+ # Set device under test information
+ hardpy.set_dut_serial_number("PSU-12345")
+ hardpy.set_dut_name("Power Supply Unit")
+ hardpy.set_dut_type("DC Power Supply")
+
+ # Simulate voltage measurement
+ expected_voltage = 5.0
+ measured_voltage = 5.02 # Simulated measurement
+ tolerance = 0.1
+
+ # Record measurement
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Output Voltage",
+ value=measured_voltage,
+ unit="V",
+ lower_limit=expected_voltage - tolerance,
+ upper_limit=expected_voltage + tolerance,
+ )
+ )
+
+ # Add message
+ hardpy.set_message(f"Measured voltage: {measured_voltage}V (expected: {expected_voltage}V)")
+
+ # Verify voltage is within tolerance
+ assert abs(measured_voltage - expected_voltage) <= tolerance, \
+ f"Voltage out of tolerance: {measured_voltage}V"
+
+@pytest.mark.case_name("Check Current Limit")
+@pytest.mark.module_name("Power Supply Tests")
+def test_current_limit():
+ """Test that power supply has correct current limit."""
+ # Simulate current limit test
+ expected_limit = 3.0
+ measured_limit = 3.05 # Simulated measurement
+ tolerance = 0.2
+
+ # Record measurement
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Current Limit",
+ value=measured_limit,
+ unit="A",
+ lower_limit=expected_limit - tolerance,
+ upper_limit=expected_limit + tolerance,
+ )
+ )
+
+ hardpy.set_message(f"Current limit: {measured_limit}A")
+
+ assert abs(measured_limit - expected_limit) <= tolerance
+
+@pytest.mark.case_name("Voltage Stability Test")
+@pytest.mark.module_name("Power Supply Tests")
+@pytest.mark.attempt(2) # Retry once if fails
+def test_voltage_stability():
+ """Test voltage stability over time."""
+ hardpy.set_message("Testing voltage stability over 5 seconds...")
+
+ voltage_readings = []
+ for i in range(5):
+ # Simulate reading voltage
+ voltage = 5.0 + (i * 0.01) # Slight increase
+ voltage_readings.append(voltage)
+ sleep(0.1) # Simulate measurement delay
+
+ max_variation = max(voltage_readings) - min(voltage_readings)
+
+ # Record measurement
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Voltage Variation",
+ value=max_variation,
+ unit="V",
+ upper_limit=0.1,
+ )
+ )
+
+ hardpy.set_message(f"Max voltage variation: {max_variation:.3f}V")
+
+ assert max_variation < 0.1, f"Voltage not stable: {max_variation}V variation"
+```
diff --git a/docs/features/features.md b/docs/features/features.md
index d3f9f936..9f17e76f 100644
--- a/docs/features/features.md
+++ b/docs/features/features.md
@@ -190,7 +190,7 @@ The user can check the status of tests using the [hardpy status](./../documentat
### Storing test result in database
-**HardPy** does not allow you to run tests without a running [CouchDB](https://couchdb.apache.org/) database.
+**HardPy** allows you to run tests with a running [CouchDB](https://couchdb.apache.org/) database.
This is a NoSQL database that ensures that the results of the current test run are committed,
even if the tests are aborted early.
@@ -206,6 +206,15 @@ An example of configuring **conftest.py** to store test run history can be found
including the [couchdb_load](./../examples/couchdb_load.md) and
[minute_parity](./../examples/minute_parity.md).
+### JSON
+
+With **HardPy**, you can run tests without a database and save the test data to local JSON documents.
+These documents have a structure similar to that of databases and documents in **CouchDB**.
+A .hardpy/storage folder is created by default in the project folder,
+where test reports can be found in the **runstore** folder.
+
+An example of its use can be found on page [JSON storage](./../examples/json_storage.md).
+
### Other databases
In order to save data to other databases, users will need to write their own adapter class to convert their
diff --git a/docs/index.md b/docs/index.md
index 8e084310..ebf9e2c4 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -14,7 +14,7 @@ HardPy allows you to:
* Create test benches for devices using [pytest](https://docs.pytest.org/);
* Use a browser to view, start, stop, and interact with tests;
-* Store test results in the [CouchDB](https://couchdb.apache.org/) database;
+* Store test results in the [CouchDB](https://couchdb.apache.org/) database or to simple JSON files;
* Store test results on the [StandCloud](https://standcloud.io/) analytics platform.
@@ -29,6 +29,8 @@ pip install hardpy
## Getting Started
+### With CouchDB
+
1. Create your first test bench.
```bash
@@ -62,6 +64,18 @@ hardpy run
alt="hardpy runstore" style="width:500px;">
+### Without a database
+
+1. Create your first test bench.
+```bash
+hardpy init --no-create-database --storage-type json
+```
+2. Launch HardPy operator panel.
+```bash
+hardpy run
+```
+3. View operator panel in browser: http://localhost:8000/
+
## Measurement instruments
**HardPy** does not contain any drivers for interacting with measuring equipment.
diff --git a/examples/attempts/hardpy.toml b/examples/attempts/hardpy.toml
index b94f1a38..f6247fcf 100644
--- a/examples/attempts/hardpy.toml
+++ b/examples/attempts/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "Attempts"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/couchdb_load/hardpy.toml b/examples/couchdb_load/hardpy.toml
index a8c4bab9..a7001d31 100644
--- a/examples/couchdb_load/hardpy.toml
+++ b/examples/couchdb_load/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "CouchDB Load"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/dialog_box/hardpy.toml b/examples/dialog_box/hardpy.toml
index 6d502185..d305d1c0 100644
--- a/examples/dialog_box/hardpy.toml
+++ b/examples/dialog_box/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "Dialog Box"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/hello_hardpy/hardpy.toml b/examples/hello_hardpy/hardpy.toml
index ce541134..bb7559c6 100644
--- a/examples/hello_hardpy/hardpy.toml
+++ b/examples/hello_hardpy/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "Hello HardPy"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/json_storage/.gitignore b/examples/json_storage/.gitignore
new file mode 100644
index 00000000..c4a847d9
--- /dev/null
+++ b/examples/json_storage/.gitignore
@@ -0,0 +1 @@
+/result
diff --git a/examples/json_storage/README.md b/examples/json_storage/README.md
new file mode 100644
index 00000000..dcc71eaf
--- /dev/null
+++ b/examples/json_storage/README.md
@@ -0,0 +1,3 @@
+# JSON storage
+
+Example documentation: https://everypinio.github.io/hardpy/examples/json_storage/
diff --git a/examples/json_storage/conftest.py b/examples/json_storage/conftest.py
new file mode 100644
index 00000000..dd169b15
--- /dev/null
+++ b/examples/json_storage/conftest.py
@@ -0,0 +1,58 @@
+from pathlib import Path
+import pytest
+
+from hardpy import JsonLoader, get_current_report
+
+
+@pytest.fixture(scope="session")
+def setup_test_environment():
+ """Set up test environment before all tests."""
+ print("\n=== Setting up test environment ===")
+ # Add any global setup here
+ yield
+ print("\n=== Tearing down test environment ===")
+ # Add any global cleanup here
+
+
+@pytest.fixture(scope="function")
+def test_device():
+ """Fixture providing simulated test device."""
+
+ class TestDevice:
+ def __init__(self):
+ self.connected = False
+ self.voltage = 5.0
+ self.current = 0.0
+
+ def connect(self):
+ self.connected = True
+ return True
+
+ def disconnect(self):
+ self.connected = False
+
+ def measure_voltage(self):
+ return self.voltage
+
+ def measure_current(self):
+ return self.current
+
+ device = TestDevice()
+ device.connect()
+
+ yield device
+
+ device.disconnect()
+
+
+def save_report_to_dir():
+ report = get_current_report()
+ if report:
+ loader = JsonLoader(Path.cwd() / "reports")
+ loader.load(report)
+
+
+@pytest.fixture(scope="session", autouse=True)
+def fill_actions_after_test(post_run_functions: list):
+ post_run_functions.append(save_report_to_dir)
+ yield
diff --git a/examples/json_storage/hardpy.toml b/examples/json_storage/hardpy.toml
new file mode 100644
index 00000000..24cd7165
--- /dev/null
+++ b/examples/json_storage/hardpy.toml
@@ -0,0 +1,30 @@
+# HardPy Configuration File
+# This demo uses JSON file storage (no CouchDB required!)
+
+title = "HardPy JSON Storage Demo"
+tests_name = "Device Test Suite"
+
+[database]
+# Storage type: "json"
+storage_type = "json"
+storage_path = "result"
+
+# CouchDB settings (only needed if storage_type = "couchdb")
+# user = "dev"
+# password = "dev"
+# host = "localhost"
+# port = 5984
+
+[frontend]
+host = "localhost"
+port = 8000
+language = "en"
+sound_on = false
+full_size_button = false
+manual_collect = false
+measurement_display = true
+
+[frontend.modal_result]
+enable = true
+auto_dismiss_pass = true
+auto_dismiss_timeout = 5
diff --git a/examples/json_storage/pytest.ini b/examples/json_storage/pytest.ini
new file mode 100644
index 00000000..4491d9d2
--- /dev/null
+++ b/examples/json_storage/pytest.ini
@@ -0,0 +1,6 @@
+[pytest]
+pythonpath = .
+testpaths = .
+python_files = test_*.py
+python_classes = Test*
+python_functions = test_*
diff --git a/examples/json_storage/test_chart_demo.py b/examples/json_storage/test_chart_demo.py
new file mode 100644
index 00000000..3c519036
--- /dev/null
+++ b/examples/json_storage/test_chart_demo.py
@@ -0,0 +1,95 @@
+import hardpy
+import pytest
+import math
+
+
+@pytest.mark.case_name("Sine Wave Analysis")
+@pytest.mark.module_name("Chart Demonstrations")
+def test_sine_wave():
+ """Test generating and analyzing a sine wave."""
+ hardpy.set_message("Generating sine wave data...")
+
+ # Generate sine wave data
+ x_data = []
+ y_data = []
+
+ for i in range(100):
+ x = i / 10.0 # 0 to 10
+ y = math.sin(x)
+ x_data.append(x)
+ y_data.append(y)
+
+ # Create chart
+ chart = hardpy.Chart(
+ title="Sine Wave",
+ x_label="Time",
+ y_label="Amplitude",
+ type=hardpy.ChartType.LINE,
+ )
+ chart.add_series(x_data, y_data, "Sine Wave")
+
+ hardpy.set_case_chart(chart)
+
+ # Verify amplitude
+ max_amplitude = max(y_data)
+ min_amplitude = min(y_data)
+ peak_to_peak = max_amplitude - min_amplitude
+
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Peak-to-Peak Amplitude",
+ value=peak_to_peak,
+ unit="V",
+ lower_limit=1.9,
+ upper_limit=2.1,
+ )
+ )
+
+ hardpy.set_message(f"Peak-to-peak amplitude: {peak_to_peak:.3f}V")
+
+ assert 1.9 <= peak_to_peak <= 2.1, "Amplitude out of range"
+
+
+@pytest.mark.case_name("Temperature Curve")
+@pytest.mark.module_name("Chart Demonstrations")
+def test_temperature_curve():
+ """Test temperature rise curve."""
+ hardpy.set_message("Recording temperature curve...")
+
+ # Simulate temperature rise
+ time_data = []
+ temp_data = []
+
+ for i in range(50):
+ time = i * 2 # seconds
+ # Exponential rise to 80°C
+ temp = 25 + 55 * (1 - math.exp(-i / 20))
+ time_data.append(time)
+ temp_data.append(temp)
+
+ # Create chart
+ chart = hardpy.Chart(
+ title="Temperature Rise Curve",
+ x_label="Time (seconds)",
+ y_label="Temperature (°C)",
+ type=hardpy.ChartType.LINE,
+ )
+ chart.add_series(time_data, temp_data, "Temperature")
+
+ hardpy.set_case_chart(chart)
+
+ # Check final temperature
+ final_temp = temp_data[-1]
+
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Final Temperature",
+ value=final_temp,
+ unit="°C",
+ upper_limit=85,
+ )
+ )
+
+ hardpy.set_message(f"Final temperature: {final_temp:.1f}°C")
+
+ assert final_temp < 85, f"Temperature too high: {final_temp}°C"
diff --git a/examples/json_storage/test_communication.py b/examples/json_storage/test_communication.py
new file mode 100644
index 00000000..97fb7a16
--- /dev/null
+++ b/examples/json_storage/test_communication.py
@@ -0,0 +1,108 @@
+import hardpy
+import pytest
+from time import sleep
+
+
+@pytest.mark.case_name("Serial Port Connection")
+@pytest.mark.module_name("Communication Tests")
+def test_serial_connection():
+ """Test serial port connection."""
+ hardpy.set_message("Testing serial port connection...")
+
+ # Simulate connection
+ port = "/dev/ttyUSB0"
+ baudrate = 115200
+
+ hardpy.set_instrument(
+ hardpy.Instrument(
+ name="Serial Port",
+ comment=f"{port} @ {baudrate} baud"
+ )
+ )
+
+ # Simulate successful connection
+ connection_ok = True
+
+ hardpy.set_message(f"Connected to {port} at {baudrate} baud")
+
+ assert connection_ok, "Failed to establish serial connection"
+
+
+@pytest.mark.case_name("Data Transfer Test")
+@pytest.mark.module_name("Communication Tests")
+@pytest.mark.attempt(3) # Allow 2 retries
+def test_data_transfer():
+ """Test data transfer over serial."""
+ hardpy.set_message("Testing data transfer...")
+
+ # Simulate sending and receiving data
+ sent_bytes = 1024
+ received_bytes = 1024
+ transfer_time = 0.5 # seconds
+
+ # Calculate transfer rate
+ transfer_rate = (sent_bytes + received_bytes) / transfer_time
+
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Transfer Rate",
+ value=transfer_rate,
+ unit="bytes/s",
+ lower_limit=1000,
+ )
+ )
+
+ hardpy.set_message(f"Transfer rate: {transfer_rate:.0f} bytes/s")
+
+ assert received_bytes == sent_bytes, "Data integrity error"
+ assert transfer_rate > 1000, "Transfer rate too slow"
+
+
+@pytest.mark.case_name("Protocol Validation")
+@pytest.mark.module_name("Communication Tests")
+@pytest.mark.critical # Critical test - stops all if fails
+def test_protocol_validation():
+ """Test communication protocol validation."""
+ hardpy.set_message("Validating communication protocol...")
+
+ # Simulate protocol check
+ protocol_version = "v2.1"
+ expected_version = "v2.1"
+
+ hardpy.set_case_measurement(
+ hardpy.StringMeasurement(
+ name="Protocol Version",
+ value=protocol_version,
+ comparison_value=expected_version,
+ )
+ )
+
+ hardpy.set_message(f"Protocol version: {protocol_version}")
+
+ assert protocol_version == expected_version, \
+ f"Protocol mismatch: got {protocol_version}, expected {expected_version}"
+
+
+@pytest.mark.case_name("Error Handling Test")
+@pytest.mark.module_name("Communication Tests")
+@pytest.mark.dependency("test_communication::test_protocol_validation")
+def test_error_handling():
+ """Test error handling (depends on protocol validation)."""
+ hardpy.set_message("Testing error handling...")
+
+ # Simulate error injection and recovery
+ errors_injected = 5
+ errors_handled = 5
+
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Errors Handled",
+ value=errors_handled,
+ unit="count",
+ comparison_value=errors_injected,
+ )
+ )
+
+ hardpy.set_message(f"Handled {errors_handled}/{errors_injected} errors")
+
+ assert errors_handled == errors_injected, "Some errors not handled correctly"
diff --git a/examples/json_storage/test_voltage.py b/examples/json_storage/test_voltage.py
new file mode 100644
index 00000000..2d73164a
--- /dev/null
+++ b/examples/json_storage/test_voltage.py
@@ -0,0 +1,96 @@
+import hardpy
+import pytest
+from time import sleep
+
+
+@pytest.mark.case_name("Check Power Supply Voltage")
+@pytest.mark.module_name("Power Supply Tests")
+def test_power_supply_voltage():
+ """Test that power supply outputs correct voltage."""
+ # Set test stand information
+ hardpy.set_stand_name("Test Bench #1")
+ hardpy.set_stand_location("Lab A")
+
+ # Set device under test information
+ hardpy.set_dut_serial_number("PSU-12345")
+ hardpy.set_dut_name("Power Supply Unit")
+ hardpy.set_dut_type("DC Power Supply")
+
+ # Simulate voltage measurement
+ expected_voltage = 5.0
+ measured_voltage = 5.02 # Simulated measurement
+ tolerance = 0.1
+
+ # Record measurement
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Output Voltage",
+ value=measured_voltage,
+ unit="V",
+ lower_limit=expected_voltage - tolerance,
+ upper_limit=expected_voltage + tolerance,
+ )
+ )
+
+ # Add message
+ hardpy.set_message(f"Measured voltage: {measured_voltage}V (expected: {expected_voltage}V)")
+
+ # Verify voltage is within tolerance
+ assert abs(measured_voltage - expected_voltage) <= tolerance, \
+ f"Voltage out of tolerance: {measured_voltage}V"
+
+
+@pytest.mark.case_name("Check Current Limit")
+@pytest.mark.module_name("Power Supply Tests")
+def test_current_limit():
+ """Test that power supply has correct current limit."""
+ # Simulate current limit test
+ expected_limit = 3.0
+ measured_limit = 3.05 # Simulated measurement
+ tolerance = 0.2
+
+ # Record measurement
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Current Limit",
+ value=measured_limit,
+ unit="A",
+ lower_limit=expected_limit - tolerance,
+ upper_limit=expected_limit + tolerance,
+ )
+ )
+
+ hardpy.set_message(f"Current limit: {measured_limit}A")
+
+ assert abs(measured_limit - expected_limit) <= tolerance
+
+
+@pytest.mark.case_name("Voltage Stability Test")
+@pytest.mark.module_name("Power Supply Tests")
+@pytest.mark.attempt(2) # Retry once if fails
+def test_voltage_stability():
+ """Test voltage stability over time."""
+ hardpy.set_message("Testing voltage stability over 5 seconds...")
+
+ voltage_readings = []
+ for i in range(5):
+ # Simulate reading voltage
+ voltage = 5.0 + (i * 0.01) # Slight increase
+ voltage_readings.append(voltage)
+ sleep(0.1) # Simulate measurement delay
+
+ max_variation = max(voltage_readings) - min(voltage_readings)
+
+ # Record measurement
+ hardpy.set_case_measurement(
+ hardpy.NumericMeasurement(
+ name="Voltage Variation",
+ value=max_variation,
+ unit="V",
+ upper_limit=0.1,
+ )
+ )
+
+ hardpy.set_message(f"Max voltage variation: {max_variation:.3f}V")
+
+ assert max_variation < 0.1, f"Voltage not stable: {max_variation}V variation"
diff --git a/examples/measurement/hardpy.toml b/examples/measurement/hardpy.toml
index 022960ae..0c6b4902 100644
--- a/examples/measurement/hardpy.toml
+++ b/examples/measurement/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "Measurement"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/minute_parity/hardpy.toml b/examples/minute_parity/hardpy.toml
index 0342d5cd..f6c0fd9f 100644
--- a/examples/minute_parity/hardpy.toml
+++ b/examples/minute_parity/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "Minute Parity"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/multiple_configs/hardpy.toml b/examples/multiple_configs/hardpy.toml
index 89fd9a12..124a807e 100644
--- a/examples/multiple_configs/hardpy.toml
+++ b/examples/multiple_configs/hardpy.toml
@@ -9,6 +9,7 @@ host = "localhost"
port = 5984
[frontend]
+storage_type = "couchdb"
host = "localhost"
port = 8000
language = "en"
diff --git a/examples/operator_msg/hardpy.toml b/examples/operator_msg/hardpy.toml
index 89b4d452..d6dad4d0 100644
--- a/examples/operator_msg/hardpy.toml
+++ b/examples/operator_msg/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "Operator Message"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/examples/stand_cloud/hardpy.toml b/examples/stand_cloud/hardpy.toml
index 3847a1aa..87f48d4a 100644
--- a/examples/stand_cloud/hardpy.toml
+++ b/examples/stand_cloud/hardpy.toml
@@ -3,6 +3,7 @@ tests_name = "StandCloud"
current_test_config = ""
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/hardpy/__init__.py b/hardpy/__init__.py
index ca4560b6..f11f2dda 100644
--- a/hardpy/__init__.py
+++ b/hardpy/__init__.py
@@ -45,6 +45,7 @@
)
from hardpy.pytest_hardpy.result import (
CouchdbLoader,
+ JsonLoader,
StandCloudLoader,
StandCloudReader,
)
@@ -82,6 +83,7 @@
"HTMLComponent",
"ImageComponent",
"Instrument",
+ "JsonLoader",
"MultistepWidget",
"NumericInputWidget",
"NumericMeasurement",
diff --git a/hardpy/cli/cli.py b/hardpy/cli/cli.py
index 1fc1f5f0..425ca090 100644
--- a/hardpy/cli/cli.py
+++ b/hardpy/cli/cli.py
@@ -82,6 +82,10 @@ def init( # noqa: PLR0913
default=default_config.stand_cloud.api_key,
help="Specify a StandCloud API key.",
),
+ storage_type: str = typer.Option(
+ default=default_config.database.storage_type.value,
+ help="Specify a storage type.",
+ ),
) -> None:
"""Initialize HardPy tests directory.
@@ -100,6 +104,7 @@ def init( # noqa: PLR0913
sc_connection_only (bool): Flag to check StandCloud service availability
sc_autosync (bool): Flag to enable StandCloud auto syncronization
sc_api_key (str | None): StandCloud API key
+ storage_type (str): Storage type, "json" or "couchdb", "couchdb" by default
"""
dir_path = Path(Path.cwd() / tests_dir if tests_dir else "tests")
config_manager = ConfigManager()
@@ -116,6 +121,7 @@ def init( # noqa: PLR0913
sc_connection_only=sc_connection_only,
sc_autosync=sc_autosync,
sc_api_key=sc_api_key,
+ storage_type=storage_type,
)
# create tests directory
Path.mkdir(dir_path, exist_ok=True, parents=True)
diff --git a/hardpy/common/config.py b/hardpy/common/config.py
index ddf2ea2f..86f218f2 100644
--- a/hardpy/common/config.py
+++ b/hardpy/common/config.py
@@ -2,6 +2,7 @@
# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import annotations
+from enum import Enum
from logging import getLogger
from pathlib import Path
@@ -14,17 +15,32 @@
logger = getLogger(__name__)
+class StorageType(str, Enum):
+ """Storage backend types for HardPy data persistence.
+
+ Attributes:
+ JSON: JSON file-based storage on local filesystem
+ COUCHDB: CouchDB database storage
+ """
+
+ JSON = "json"
+ COUCHDB = "couchdb"
+
+
class DatabaseConfig(BaseModel):
"""Database configuration."""
model_config = ConfigDict(extra="forbid")
+ storage_type: StorageType = StorageType.COUCHDB
user: str = "dev"
password: str = "dev"
host: str = "localhost"
port: int = 5984
doc_id: str = Field(exclude=True, default="")
url: str = Field(exclude=True, default="")
+ # This field is relevant only when storage_type is "json"
+ storage_path: str = Field(exclude=True, default=".hardpy")
def model_post_init(self, __context) -> None: # noqa: ANN001,PYI063
"""Get database connection url."""
@@ -157,6 +173,7 @@ def init_config( # noqa: PLR0913
sc_connection_only: bool,
sc_autosync: bool,
sc_api_key: str,
+ storage_type: str,
) -> None:
"""Initialize the HardPy configuration.
@@ -175,12 +192,14 @@ def init_config( # noqa: PLR0913
sc_connection_only (bool): StandCloud check availability.
sc_autosync (bool): StandCloud auto syncronization.
sc_api_key (str): StandCloud API key.
+ storage_type (str): Database storage type.
"""
self._config.tests_name = tests_name
self._config.frontend.host = frontend_host
self._config.frontend.port = frontend_port
self._config.frontend.language = frontend_language
self._config.database.user = database_user
+ self._config.database.storage_type = StorageType(storage_type)
self._config.database.password = database_password
self._config.database.host = database_host
self._config.database.port = database_port
diff --git a/hardpy/hardpy_panel/api.py b/hardpy/hardpy_panel/api.py
index 1e69149b..e59df105 100644
--- a/hardpy/hardpy_panel/api.py
+++ b/hardpy/hardpy_panel/api.py
@@ -17,7 +17,7 @@
from fastapi import FastAPI, Query, Request
from fastapi.staticfiles import StaticFiles
-from hardpy.common.config import ConfigManager
+from hardpy.common.config import ConfigManager, StorageType
from hardpy.pytest_hardpy.pytest_wrapper import PyTestWrapper
from hardpy.pytest_hardpy.result.report_synchronizer import StandCloudSynchronizer
@@ -361,6 +361,67 @@ def set_manual_collect_mode(mode_data: dict) -> dict:
return {"status": "success", "manual_collect_mode": enabled}
+@app.get("/api/storage_type")
+def get_storage_type() -> dict:
+ """Get the configured storage type.
+
+ Returns:
+ dict[str, str]: storage type ("json" or "couchdb")
+ """
+ config_manager = ConfigManager()
+ return {"storage_type": config_manager.config.database.storage_type}
+
+
+@app.get("/api/json_data")
+def get_json_data() -> dict:
+ """Get test run data from JSON storage.
+
+ Returns:
+ dict: Test run data from JSON files
+ """
+ config_manager = ConfigManager()
+ storage_type = config_manager.config.database.storage_type
+
+ if storage_type != StorageType.JSON:
+ return {"error": "JSON storage not configured"}
+
+ try:
+ config_storage_path = Path(config_manager.config.database.storage_path)
+ if config_storage_path.is_absolute():
+ storage_dir = config_storage_path / "storage" / "statestore"
+ else:
+ storage_dir = Path(
+ config_manager.tests_path
+ / config_manager.config.database.storage_path
+ / "storage"
+ / "statestore",
+ )
+ _doc_id = config_manager.config.database.doc_id
+ statestore_file = storage_dir / f"{_doc_id}.json"
+
+ if not statestore_file.exists():
+ return {"rows": [], "total_rows": 0}
+
+ with statestore_file.open("r") as f:
+ data = json.load(f)
+
+ # Format data to match CouchDB's _all_docs format
+ return {
+ "rows": [
+ {
+ "id": data.get("_id", ""),
+ "key": data.get("_id", ""),
+ "value": {"rev": data.get("_rev", "1-0")},
+ "doc": data,
+ },
+ ],
+ "total_rows": 1,
+ }
+ except Exception as exc:
+ logger.exception("Error reading JSON storage")
+ return {"error": str(exc), "rows": [], "total_rows": 0}
+
+
if "DEBUG_FRONTEND" not in os.environ:
app.mount(
"/",
diff --git a/hardpy/hardpy_panel/frontend/src/App.tsx b/hardpy/hardpy_panel/frontend/src/App.tsx
index ac1839ed..fc6c90fb 100644
--- a/hardpy/hardpy_panel/frontend/src/App.tsx
+++ b/hardpy/hardpy_panel/frontend/src/App.tsx
@@ -27,7 +27,7 @@ import PlaySound from "./hardpy_test_view/PlaySound";
import TestConfigOverlay from "./hardpy_test_view/TestConfigOverlay";
import TestCompletionModalResult from "./hardpy_test_view/TestCompletionModalResult";
-import { useAllDocs } from "use-pouchdb";
+import { useStorageData } from "./hooks/useStorageData";
import "./App.css";
@@ -479,9 +479,7 @@ function App({ syncDocumentId }: { syncDocumentId: string }): JSX.Element {
return -1;
}
- const { rows, state, loading, error } = useAllDocs({
- include_docs: true,
- });
+ const { rows, state, loading, error } = useStorageData();
/**
* Monitors database changes and updates application state accordingly
diff --git a/hardpy/hardpy_panel/frontend/src/hooks/useStorageData.ts b/hardpy/hardpy_panel/frontend/src/hooks/useStorageData.ts
new file mode 100644
index 00000000..ea6b342d
--- /dev/null
+++ b/hardpy/hardpy_panel/frontend/src/hooks/useStorageData.ts
@@ -0,0 +1,115 @@
+// Copyright (c) 2025 Everypin
+// GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
+
+import * as React from "react";
+import { useAllDocs } from "use-pouchdb";
+
+type StorageState = "loading" | "done" | "error";
+
+interface StorageRow {
+ id: string;
+ key: string;
+ value: { rev: string };
+ doc: any;
+}
+
+interface StorageData {
+ rows: StorageRow[];
+ state: StorageState;
+ loading: boolean;
+ error: Error | null;
+}
+
+interface StorageTypeResponse {
+ storage_type: string;
+}
+
+interface JsonDataResponse {
+ rows: StorageRow[];
+ total_rows: number;
+ error?: string;
+}
+
+/**
+ * Custom hook to fetch data from either JSON storage or CouchDB
+ * Automatically detects storage type and uses appropriate method
+ */
+export const useStorageData = (): StorageData => {
+ const [storageType, setStorageType] = React.useState(null);
+ const [jsonData, setJsonData] = React.useState([]);
+ const [jsonLoading, setJsonLoading] = React.useState(true);
+ const [jsonError, setJsonError] = React.useState(null);
+
+ // Fetch storage type on mount
+ React.useEffect(() => {
+ fetch("/api/storage_type")
+ .then((res) => res.json())
+ .then((data: StorageTypeResponse) => {
+ setStorageType(data.storage_type);
+ })
+ .catch((err) => {
+ console.error("Failed to fetch storage type:", err);
+ setStorageType("couchdb"); // Default to CouchDB
+ });
+ }, []);
+
+ // For JSON storage, poll the API endpoint
+ React.useEffect(() => {
+ if (storageType !== "json") return;
+
+ const fetchJsonData = () => {
+ fetch("/api/json_data")
+ .then((res) => res.json())
+ .then((data: JsonDataResponse) => {
+ if (data.error) {
+ setJsonError(new Error(data.error));
+ setJsonLoading(false);
+ } else {
+ setJsonData(data.rows);
+ setJsonLoading(false);
+ setJsonError(null);
+ }
+ })
+ .catch((err) => {
+ setJsonError(err);
+ setJsonLoading(false);
+ });
+ };
+
+ // Initial fetch
+ fetchJsonData();
+
+ // Poll every 500ms for updates
+ const interval = setInterval(fetchJsonData, 500);
+
+ return () => clearInterval(interval);
+ }, [storageType]);
+
+ // For CouchDB, use the existing PouchDB hook
+ const pouchDbData = useAllDocs({
+ include_docs: true,
+ });
+
+ // Return appropriate data based on storage type
+ if (storageType === null) {
+ // Still detecting storage type
+ return {
+ rows: [],
+ state: "loading",
+ loading: true,
+ error: null,
+ };
+ }
+
+ if (storageType === "json") {
+ return {
+ rows: jsonData,
+ state: jsonError ? "error" : jsonLoading ? "loading" : "done",
+ loading: jsonLoading,
+ error: jsonError,
+ };
+ }
+
+ // Default to CouchDB
+ return pouchDbData;
+};
diff --git a/hardpy/hardpy_panel/frontend/src/index.tsx b/hardpy/hardpy_panel/frontend/src/index.tsx
index 2660c807..6dec2c00 100644
--- a/hardpy/hardpy_panel/frontend/src/index.tsx
+++ b/hardpy/hardpy_panel/frontend/src/index.tsx
@@ -26,6 +26,21 @@ function ErrorMessage() {
);
}
+/**
+ * Gets the storage type from the backend API.
+ * @returns {Promise} A promise that resolves to the storage type ("json" or "couchdb").
+ */
+async function getStorageType(): Promise {
+ try {
+ const response = await fetch("/api/storage_type");
+ const data = await response.json();
+ return data.storage_type || "couchdb";
+ } catch (error) {
+ console.error(error);
+ return "couchdb";
+ }
+}
+
/**
* Fetches the synchronization URL for PouchDB from the backend API.
*
@@ -63,33 +78,49 @@ const root = ReactDOM.createRoot(
document.getElementById("root") as HTMLElement
);
-const syncURL = await getSyncURL();
-if (syncURL !== undefined) {
- const db = new PouchDB(syncURL);
+const storageType = await getStorageType();
+const syncDocumentId = await getDatabaseDocumentId();
- const syncDocumentId = await getDatabaseDocumentId();
+if (storageType === "json") {
+ // For JSON storage, create a dummy local PouchDB instance (not used but required by Provider)
+ // This creates an IndexedDB database that won't try to sync anywhere
+ const dummyDb = new PouchDB("hardpy-local-dummy");
- /**
- * Renders the main application wrapped in a PouchDB Provider and React StrictMode.
- *
- * @param {PouchDB.Database} db - The PouchDB database instance to be provided to the application.
- */
root.render(
-
+
);
} else {
- /**
- * Renders an error message if the PouchDB sync URL could not be retrieved.
- */
- root.render(
-
-
-
- );
+ // For CouchDB storage, connect to the actual database
+ const syncURL = await getSyncURL();
+ if (syncURL !== undefined) {
+ const db = new PouchDB(syncURL);
+
+ /**
+ * Renders the main application wrapped in a PouchDB Provider and React StrictMode.
+ *
+ * @param {PouchDB.Database} db - The PouchDB database instance to be provided to the application.
+ */
+ root.render(
+
+
+
+
+
+ );
+ } else {
+ /**
+ * Renders an error message if the PouchDB sync URL could not be retrieved.
+ */
+ root.render(
+
+
+
+ );
+ }
}
if (process.env.NODE_ENV !== "development") {
diff --git a/hardpy/pytest_hardpy/db/__init__.py b/hardpy/pytest_hardpy/db/__init__.py
index c267a673..5857f6c6 100644
--- a/hardpy/pytest_hardpy/db/__init__.py
+++ b/hardpy/pytest_hardpy/db/__init__.py
@@ -1,7 +1,6 @@
# Copyright (c) 2025 Everypin
# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
-from hardpy.pytest_hardpy.db.base_store import BaseStore
from hardpy.pytest_hardpy.db.const import DatabaseField
from hardpy.pytest_hardpy.db.runstore import RunStore
from hardpy.pytest_hardpy.db.schema import ResultRunStore, ResultStateStore
@@ -16,7 +15,6 @@
from hardpy.pytest_hardpy.db.tempstore import TempStore
__all__ = [
- "BaseStore",
"Chart",
"DatabaseField",
"Instrument",
diff --git a/hardpy/pytest_hardpy/db/base_store.py b/hardpy/pytest_hardpy/db/base_store.py
deleted file mode 100644
index 484b05a7..00000000
--- a/hardpy/pytest_hardpy/db/base_store.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Copyright (c) 2024 Everypin
-# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
-
-from json import dumps
-from logging import getLogger
-from typing import Any
-
-from glom import assign, glom
-from pycouchdb import Server as DbServer
-from pycouchdb.client import Database
-from pycouchdb.exceptions import Conflict, GenericError, NotFound
-from pydantic._internal._model_construction import ModelMetaclass
-from requests.exceptions import ConnectionError # noqa: A004
-
-from hardpy.common.config import ConfigManager
-from hardpy.pytest_hardpy.db.const import DatabaseField as DF # noqa: N817
-
-
-class BaseStore:
- """HardPy base storage interface for CouchDB."""
-
- def __init__(self, db_name: str) -> None:
- config_manager = ConfigManager()
- config = config_manager.config
- self._db_srv = DbServer(config.database.url)
- self._db_name = db_name
- self._db = self._init_db()
- self._doc_id = config.database.doc_id
- self._log = getLogger(__name__)
- self._doc: dict = self._init_doc()
- self._schema: ModelMetaclass
-
- def compact(self) -> None:
- """Compact database."""
- self._db.compact()
-
- def get_field(self, key: str) -> Any: # noqa: ANN401
- """Get field from the state store.
-
- Args:
- key (str): field name
-
- Returns:
- Any: field value
- """
- return glom(self._doc, key)
-
- def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
- """Update document value.
-
- HardPy collecting uses a simple key without dots.
- Assign is used to update a document.
- Assign is a longer function.
-
- Args:
- key (str): document key
- value: document value
- """
- try:
- dumps(value)
- except Exception: # noqa: BLE001
- # serialize non-serializable objects as string
- value = dumps(value, default=str)
- if "." in key:
- assign(self._doc, key, value)
- else:
- self._doc[key] = value
-
- def update_db(self) -> None:
- """Update database by current document."""
- try:
- self._doc = self._db.save(self._doc)
- except Conflict:
- self._doc["_rev"] = self._db.get(self._doc_id)["_rev"]
- self._doc = self._db.save(self._doc)
-
- def update_doc(self) -> None:
- """Update current document by database."""
- self._doc["_rev"] = self._db.get(self._doc_id)["_rev"]
- self._doc = self._db.get(self._doc_id)
-
- def get_document(self) -> ModelMetaclass:
- """Get document by schema.
-
- Returns:
- ModelMetaclass: document by schema
- """
- self._doc = self._db.get(self._doc_id)
- return self._schema(**self._doc)
-
- def clear(self) -> None:
- """Clear database."""
- try:
- # Clear statestore and runstore databases before each launch
- self._db.delete(self._doc_id)
- except (Conflict, NotFound):
- self._log.debug("Database will be created for the first time")
- self._doc: dict = self._init_doc()
-
- def _init_db(self) -> Database:
- try:
- return self._db_srv.create(self._db_name) # type: ignore
- except Conflict:
- # database is already created
- return self._db_srv.database(self._db_name)
- except GenericError as exc:
- msg = f"Error initializing database {exc}"
- raise RuntimeError(msg) from exc
- except ConnectionError as exc:
- msg = f"Error initializing database: {exc}"
- raise RuntimeError(msg) from exc
-
- def _init_doc(self) -> dict:
- try:
- doc = self._db.get(self._doc_id)
- except NotFound:
- return {
- "_id": self._doc_id,
- DF.MODULES: {},
- DF.DUT: {
- DF.TYPE: None,
- DF.NAME: None,
- DF.REVISION: None,
- DF.SERIAL_NUMBER: None,
- DF.PART_NUMBER: None,
- DF.SUB_UNITS: [],
- DF.INFO: {},
- },
- DF.TEST_STAND: {
- DF.HW_ID: None,
- DF.NAME: None,
- DF.REVISION: None,
- DF.TIMEZONE: None,
- DF.LOCATION: None,
- DF.NUMBER: None,
- DF.INSTRUMENTS: [],
- DF.DRIVERS: {},
- DF.INFO: {},
- },
- DF.PROCESS: {
- DF.NAME: None,
- DF.NUMBER: None,
- DF.INFO: {},
- },
- }
-
- # init document
- if DF.MODULES not in doc:
- doc[DF.MODULES] = {}
-
- doc[DF.DUT] = {
- DF.TYPE: None,
- DF.NAME: None,
- DF.REVISION: None,
- DF.SERIAL_NUMBER: None,
- DF.PART_NUMBER: None,
- DF.SUB_UNITS: [],
- DF.INFO: {},
- }
-
- doc[DF.TEST_STAND] = {
- DF.HW_ID: None,
- DF.NAME: None,
- DF.REVISION: None,
- DF.TIMEZONE: None,
- DF.LOCATION: None,
- DF.NUMBER: None,
- DF.INSTRUMENTS: [],
- DF.DRIVERS: {},
- DF.INFO: {},
- }
-
- doc[DF.PROCESS] = {
- DF.NAME: None,
- DF.NUMBER: None,
- DF.INFO: {},
- }
-
- return doc
diff --git a/hardpy/pytest_hardpy/db/runstore.py b/hardpy/pytest_hardpy/db/runstore.py
index 1ad27075..e8dfb5ad 100644
--- a/hardpy/pytest_hardpy/db/runstore.py
+++ b/hardpy/pytest_hardpy/db/runstore.py
@@ -1,28 +1,396 @@
# Copyright (c) 2024 Everypin
# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import annotations
+
+import json
+from abc import ABC, abstractmethod
+from json import dumps
from logging import getLogger
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
-from pycouchdb.exceptions import Conflict, NotFound
+from glom import PathAccessError, assign, glom
+from hardpy.common.config import ConfigManager, StorageType
from hardpy.common.singleton import SingletonMeta
-from hardpy.pytest_hardpy.db.base_store import BaseStore
+from hardpy.pytest_hardpy.db.const import DatabaseField as DF # noqa: N817
from hardpy.pytest_hardpy.db.schema import ResultRunStore
+if TYPE_CHECKING:
+ from pycouchdb.client import Database # type: ignore[import-untyped]
+ from pydantic import BaseModel
+
+
+def _create_default_doc_structure(doc_id: str, doc_id_for_rev: str) -> dict:
+ """Create default document structure with standard fields.
+
+ Args:
+ doc_id (str): Document ID to use
+ doc_id_for_rev (str): Document ID for _rev field (for JSON compatibility)
+
+ Returns:
+ dict: Default document structure
+ """
+ return {
+ "_id": doc_id,
+ "_rev": doc_id_for_rev,
+ DF.MODULES: {},
+ DF.DUT: {
+ DF.TYPE: None,
+ DF.NAME: None,
+ DF.REVISION: None,
+ DF.SERIAL_NUMBER: None,
+ DF.PART_NUMBER: None,
+ DF.SUB_UNITS: [],
+ DF.INFO: {},
+ },
+ DF.TEST_STAND: {
+ DF.HW_ID: None,
+ DF.NAME: None,
+ DF.REVISION: None,
+ DF.TIMEZONE: None,
+ DF.LOCATION: None,
+ DF.NUMBER: None,
+ DF.INSTRUMENTS: [],
+ DF.DRIVERS: {},
+ DF.INFO: {},
+ },
+ DF.PROCESS: {
+ DF.NAME: None,
+ DF.NUMBER: None,
+ DF.INFO: {},
+ },
+ }
+
+
+class RunStoreInterface(ABC):
+ """Interface for run storage implementations."""
+
+ @abstractmethod
+ def get_field(self, key: str) -> Any: # noqa: ANN401
+ """Get field from the run store.
+
+ Args:
+ key (str): Field key, supports nested access with dots
+
+ Returns:
+ Any: Field value
+ """
-class RunStore(BaseStore, metaclass=SingletonMeta):
- """HardPy run storage interface for CouchDB.
+ @abstractmethod
+ def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
+ """Update document value in memory (does not persist).
- Save state and case artifact.
+ Args:
+ key (str): Field key, supports nested access with dots
+ value (Any): Value to set
+ """
+
+ @abstractmethod
+ def update_db(self) -> None:
+ """Persist in-memory document to storage backend."""
+
+ @abstractmethod
+ def update_doc(self) -> None:
+ """Reload document from storage backend to memory."""
+
+ @abstractmethod
+ def get_document(self) -> BaseModel:
+ """Get full document with schema validation.
+
+ Returns:
+ BaseModel: Validated document model
+ """
+
+ @abstractmethod
+ def clear(self) -> None:
+ """Clear storage and reset to initial state."""
+
+ @abstractmethod
+ def compact(self) -> None:
+ """Optimize storage (implementation-specific, may be no-op)."""
+
+
+class JsonRunStore(RunStoreInterface):
+ """JSON file-based run storage implementation.
+
+ Stores test run data using JSON files.
"""
def __init__(self) -> None:
- super().__init__("runstore")
+ config_manager = ConfigManager()
+ self._store_name = "runstore"
+ config_storage_path = Path(config_manager.config.database.storage_path)
+ if config_storage_path.is_absolute():
+ self._storage_dir = config_storage_path / "storage" / self._store_name
+ else:
+ self._storage_dir = Path(
+ config_manager.tests_path
+ / config_manager.config.database.storage_path
+ / "storage"
+ / self._store_name,
+ )
+ self._storage_dir.mkdir(parents=True, exist_ok=True)
+ self._doc_id = config_manager.config.database.doc_id
+ self._file_path = self._storage_dir / f"{self._doc_id}.json"
self._log = getLogger(__name__)
+ self._schema: type[BaseModel] = ResultRunStore
+ self._doc: dict = self._init_doc()
+
+ def get_field(self, key: str) -> Any: # noqa: ANN401
+ """Get field value from document using dot notation.
+
+ Args:
+ key (str): Field key, supports nested access with dots
+
+ Returns:
+ Any: Field value, or None if path does not exist
+ """
+ try:
+ return glom(self._doc, key)
+ except PathAccessError:
+ return None
+
+ def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
+ """Update document value in memory (does not persist).
+
+ Args:
+ key (str): Field key, supports nested access with dots
+ value (Any): Value to set
+ """
+ try:
+ dumps(value)
+ except Exception: # noqa: BLE001
+ value = dumps(value, default=str)
+
+ if "." in key:
+ assign(self._doc, key, value, missing=dict)
+ else:
+ self._doc[key] = value
+
+ def update_db(self) -> None:
+ """Persist in-memory document to JSON file with atomic write."""
+ self._storage_dir.mkdir(parents=True, exist_ok=True)
+ temp_file = self._file_path.with_suffix(".tmp")
+
+ try:
+ with temp_file.open("w") as f:
+ json.dump(self._doc, f, indent=2, default=str)
+ temp_file.replace(self._file_path)
+ except Exception as exc:
+ self._log.error(f"Error writing to storage file: {exc}")
+ if temp_file.exists():
+ temp_file.unlink()
+ raise
+
+ def update_doc(self) -> None:
+ """Reload document from JSON file to memory."""
+ if self._file_path.exists():
+ try:
+ with self._file_path.open("r") as f:
+ self._doc = json.load(f)
+ except json.JSONDecodeError as exc:
+ self._log.error(f"Error reading storage file: {exc}")
+ except Exception as exc:
+ self._log.error(f"Error reading storage file: {exc}")
+ raise
+
+ def get_document(self) -> BaseModel:
+ """Get full document with schema validation.
+
+ Returns:
+ BaseModel: Validated document model
+ """
+ self.update_doc()
+ return self._schema(**self._doc)
+
+ def clear(self) -> None:
+ """Clear storage by resetting to initial state (in-memory only)."""
+ self._doc = _create_default_doc_structure(self._doc_id, self._doc_id)
+
+ def compact(self) -> None:
+ """Optimize storage (no-op for JSON file storage)."""
+
+ def _init_doc(self) -> dict:
+ """Initialize or load document structure."""
+ if self._file_path.exists():
+ try:
+ with self._file_path.open("r") as f:
+ doc = json.load(f)
+
+ if DF.MODULES not in doc:
+ doc[DF.MODULES] = {}
+
+ return doc
+ except json.JSONDecodeError:
+ self._log.warning(f"Corrupted storage file {self._file_path},"
+ f" creating new")
+ except Exception as exc: # noqa: BLE001
+ self._log.warning(f"Error loading storage file: {exc}, creating new")
+
+ return _create_default_doc_structure(self._doc_id, self._doc_id)
+
+
+class CouchDBRunStore(RunStoreInterface):
+ """CouchDB-based run storage implementation.
+
+ Stores test run data using CouchDB.
+ Clears the storage on initialization to start fresh.
+ """
+
+ def __init__(self) -> None:
+ from pycouchdb import Server as DbServer # type: ignore[import-untyped]
+ from pycouchdb.exceptions import ( # type: ignore[import-untyped]
+ Conflict,
+ GenericError,
+ )
+ from requests.exceptions import ConnectionError # noqa: A004
+
+ config_manager = ConfigManager()
+ config = config_manager.config
+ self._db_srv = DbServer(config.database.url)
+ self._db_name = "runstore"
+ self._doc_id = config.database.doc_id
+ self._log = getLogger(__name__)
+ self._schema: type[BaseModel] = ResultRunStore
+
+ # Initialize database
+ try:
+ self._db: Database = self._db_srv.create(self._db_name) # type: ignore[name-defined]
+ except Conflict:
+ self._db = self._db_srv.database(self._db_name)
+ except GenericError as exc:
+ msg = f"Error initializing database {exc}"
+ raise RuntimeError(msg) from exc
+ except ConnectionError as exc:
+ msg = f"Error initializing database: {exc}"
+ raise RuntimeError(msg) from exc
+
+ self._doc: dict = self._init_doc()
+
+ # Clear the runstore on initialization for CouchDB
+ try:
+ self.clear()
+ except Exception: # noqa: BLE001
+ self._log.debug("Runstore storage will be created for the first time")
+
+ def get_field(self, key: str) -> Any: # noqa: ANN401
+ """Get field from the run store.
+
+ Args:
+ key (str): Field key, supports nested access with dots
+
+ Returns:
+ Any: Field value, or None if path does not exist
+ """
+ try:
+ return glom(self._doc, key)
+ except PathAccessError:
+ return None
+
+ def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
+ """Update document value in memory (does not persist).
+
+ Args:
+ key (str): Field key, supports nested access with dots
+ value (Any): Value to set
+ """
+ try:
+ dumps(value)
+ except Exception: # noqa: BLE001
+ value = dumps(value, default=str)
+
+ if "." in key:
+ assign(self._doc, key, value, missing=dict)
+ else:
+ self._doc[key] = value
+
+ def update_db(self) -> None:
+ """Persist in-memory document to storage backend."""
+ from pycouchdb.exceptions import Conflict # type: ignore[import-untyped]
+
+ try:
+ self._doc = self._db.save(self._doc)
+ except Conflict:
+ self._doc["_rev"] = self._db.get(self._doc_id)["_rev"]
+ self._doc = self._db.save(self._doc)
+
+ def update_doc(self) -> None:
+ """Reload document from storage backend to memory."""
+ self._doc["_rev"] = self._db.get(self._doc_id)["_rev"]
+ self._doc = self._db.get(self._doc_id)
+
+ def get_document(self) -> BaseModel:
+ """Get full document with schema validation.
+
+ Returns:
+ BaseModel: Validated document model
+ """
+ self._doc = self._db.get(self._doc_id)
+ return self._schema(**self._doc)
+
+ def clear(self) -> None:
+ """Clear storage and reset to initial state."""
+ from pycouchdb.exceptions import ( # type: ignore[import-untyped]
+ Conflict,
+ NotFound,
+ )
+
try:
- # Clear the runstore database before each launch
self._db.delete(self._doc_id)
except (Conflict, NotFound):
- self._log.debug("Runstore database will be created for the first time")
- self._doc: dict = self._init_doc()
- self._schema = ResultRunStore
+ self._log.debug("Database will be created for the first time")
+ self._doc = self._init_doc()
+
+ def compact(self) -> None:
+ """Optimize storage (implementation-specific, may be no-op)."""
+ self._db.compact()
+
+ def _init_doc(self) -> dict:
+ """Initialize or load document structure."""
+ from pycouchdb.exceptions import NotFound # type: ignore[import-untyped]
+
+ try:
+ doc = self._db.get(self._doc_id)
+ except NotFound:
+ # CouchDB doesn't need _rev field in the default structure
+ default = _create_default_doc_structure(self._doc_id, self._doc_id)
+ del default["_rev"] # CouchDB manages _rev automatically
+ return default
+
+ if DF.MODULES not in doc:
+ doc[DF.MODULES] = {}
+
+ return doc
+
+
+class RunStore(metaclass=SingletonMeta):
+ """HardPy run storage factory for test run data.
+
+ Creates appropriate storage backend based on configuration:
+ - JSON file storage when storage_type is "json"
+ - CouchDB storage when storage_type is "couchdb"
+
+ Save state and case artifact. Supports multiple storage backends
+ through the factory pattern.
+
+ Note: This class acts as a factory. When instantiated, it returns
+ the appropriate concrete implementation (JsonRunStore or CouchDBRunStore).
+ """
+
+ def __new__(cls) -> RunStoreInterface: # type: ignore[misc]
+ """Create and return the appropriate storage implementation.
+
+ Returns:
+ RunStoreInterface: Concrete storage implementation based on config
+ """
+ config = ConfigManager()
+ storage_type = config.config.database.storage_type
+
+ if storage_type == StorageType.JSON:
+ return JsonRunStore()
+ if storage_type == StorageType.COUCHDB:
+ return CouchDBRunStore()
+ msg = f"Unknown storage type: {storage_type}"
+ raise ValueError(msg)
diff --git a/hardpy/pytest_hardpy/db/statestore.py b/hardpy/pytest_hardpy/db/statestore.py
index 5c8fa638..f07f8eff 100644
--- a/hardpy/pytest_hardpy/db/statestore.py
+++ b/hardpy/pytest_hardpy/db/statestore.py
@@ -1,17 +1,402 @@
# Copyright (c) 2024 Everypin
# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import annotations
+
+import json
+from abc import ABC, abstractmethod
+from json import dumps
from logging import getLogger
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from glom import PathAccessError, assign, glom
+from hardpy.common.config import ConfigManager, StorageType
from hardpy.common.singleton import SingletonMeta
-from hardpy.pytest_hardpy.db.base_store import BaseStore
+from hardpy.pytest_hardpy.db.const import DatabaseField as DF # noqa: N817
from hardpy.pytest_hardpy.db.schema import ResultStateStore
+if TYPE_CHECKING:
+ from pycouchdb.client import Database # type: ignore[import-untyped]
+ from pydantic import BaseModel
+
+
+def _create_default_doc_structure(doc_id: str, doc_id_for_rev: str) -> dict:
+ """Create default document structure with standard fields.
+
+ Args:
+ doc_id (str): Document ID to use
+ doc_id_for_rev (str): Document ID for _rev field (for JSON compatibility)
+
+ Returns:
+ dict: Default document structure
+ """
+ return {
+ "_id": doc_id,
+ "_rev": doc_id_for_rev,
+ DF.MODULES: {},
+ DF.DUT: {
+ DF.TYPE: None,
+ DF.NAME: None,
+ DF.REVISION: None,
+ DF.SERIAL_NUMBER: None,
+ DF.PART_NUMBER: None,
+ DF.SUB_UNITS: [],
+ DF.INFO: {},
+ },
+ DF.TEST_STAND: {
+ DF.HW_ID: None,
+ DF.NAME: None,
+ DF.REVISION: None,
+ DF.TIMEZONE: None,
+ DF.LOCATION: None,
+ DF.NUMBER: None,
+ DF.INSTRUMENTS: [],
+ DF.DRIVERS: {},
+ DF.INFO: {},
+ },
+ DF.PROCESS: {
+ DF.NAME: None,
+ DF.NUMBER: None,
+ DF.INFO: {},
+ },
+ }
+
+
+class StateStoreInterface(ABC):
+ """Interface for state storage implementations."""
+
+ @abstractmethod
+ def get_field(self, key: str) -> Any: # noqa: ANN401
+ """Get field from the state store.
+
+ Args:
+ key (str): Field key, supports nested access with dots
+
+ Returns:
+ Any: Field value
+ """
+
+ @abstractmethod
+ def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
+ """Update document value in memory (does not persist).
+
+ Args:
+ key (str): Field key, supports nested access with dots
+ value (Any): Value to set
+ """
+
+ @abstractmethod
+ def update_db(self) -> None:
+ """Persist in-memory document to storage backend."""
+
+ @abstractmethod
+ def update_doc(self) -> None:
+ """Reload document from storage backend to memory."""
+
+ @abstractmethod
+ def get_document(self) -> BaseModel:
+ """Get full document with schema validation.
+
+ Returns:
+ BaseModel: Validated document model
+ """
+
+ @abstractmethod
+ def clear(self) -> None:
+ """Clear storage and reset to initial state."""
+
+ @abstractmethod
+ def compact(self) -> None:
+ """Optimize storage (implementation-specific, may be no-op)."""
-class StateStore(BaseStore, metaclass=SingletonMeta):
- """HardPy state storage interface for CouchDB."""
+
+class JsonStateStore(StateStoreInterface):
+ """JSON file-based state storage implementation.
+
+ Stores test execution state using JSON files.
+ """
def __init__(self) -> None:
- super().__init__("statestore")
+ config_manager = ConfigManager()
+ self._store_name = "statestore"
+ config_storage_path = Path(config_manager.config.database.storage_path)
+ if config_storage_path.is_absolute():
+ self._storage_dir = config_storage_path / "storage" / self._store_name
+ else:
+ self._storage_dir = Path(
+ config_manager.tests_path
+ / config_manager.config.database.storage_path
+ / "storage"
+ / self._store_name,
+ )
+ self._storage_dir.mkdir(parents=True, exist_ok=True)
+ self._doc_id = config_manager.config.database.doc_id
+ self._file_path = self._storage_dir / f"{self._doc_id}.json"
self._log = getLogger(__name__)
- self._schema = ResultStateStore
+ self._schema: type[BaseModel] = ResultStateStore
+ self._doc: dict = self._init_doc()
+
+ def get_field(self, key: str) -> Any: # noqa: ANN401
+ """Get field value from document using dot notation.
+
+ Args:
+ key (str): Field key, supports nested access with dots
+
+ Returns:
+ Any: Field value, or None if path does not exist
+ """
+ try:
+ return glom(self._doc, key)
+ except PathAccessError:
+ return None
+
+ def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
+ """Update document value in memory (does not persist).
+
+ Args:
+ key (str): Field key, supports nested access with dots
+ value (Any): Value to set
+ """
+ try:
+ dumps(value)
+ except Exception: # noqa: BLE001
+ value = dumps(value, default=str)
+
+ if "." in key:
+ assign(self._doc, key, value, missing=dict)
+ else:
+ self._doc[key] = value
+
+ def update_db(self) -> None:
+ """Persist in-memory document to JSON file with atomic write."""
+ self._storage_dir.mkdir(parents=True, exist_ok=True)
+ temp_file = self._file_path.with_suffix(".tmp")
+
+ try:
+ with temp_file.open("w") as f:
+ json.dump(self._doc, f, indent=2, default=str)
+ temp_file.replace(self._file_path)
+ except Exception as exc:
+ self._log.error(f"Error writing to storage file: {exc}")
+ if temp_file.exists():
+ temp_file.unlink()
+ raise
+
+ def update_doc(self) -> None:
+ """Reload document from JSON file to memory."""
+ if self._file_path.exists():
+ try:
+ with self._file_path.open("r") as f:
+ self._doc = json.load(f)
+ except json.JSONDecodeError as exc:
+ self._log.error(f"Error reading storage file: {exc}")
+ except Exception as exc:
+ self._log.error(f"Error reading storage file: {exc}")
+ raise
+
+ def get_document(self) -> BaseModel:
+ """Get full document with schema validation.
+
+ Returns:
+ BaseModel: Validated document model
+ """
+ self.update_doc()
+ return self._schema(**self._doc)
+
+ def clear(self) -> None:
+ """Clear storage by resetting to initial state (in-memory only)."""
+ self._doc = _create_default_doc_structure(self._doc_id, self._doc_id)
+
+ def compact(self) -> None:
+ """Optimize storage (no-op for JSON file storage)."""
+
+ def _init_doc(self) -> dict:
+ """Initialize or load document structure."""
+ if self._file_path.exists():
+ try:
+ with self._file_path.open("r") as f:
+ doc = json.load(f)
+
+ if DF.MODULES not in doc:
+ doc[DF.MODULES] = {}
+
+ # Reset volatile fields for statestore
+ default_doc = _create_default_doc_structure(doc["_id"],
+ self._doc_id)
+ doc[DF.DUT] = default_doc[DF.DUT]
+ doc[DF.TEST_STAND] = default_doc[DF.TEST_STAND]
+ doc[DF.PROCESS] = default_doc[DF.PROCESS]
+
+ return doc
+ except json.JSONDecodeError:
+ self._log.warning(
+ f"Corrupted storage file {self._file_path}, creating new",
+ )
+ except Exception as exc: # noqa: BLE001
+ self._log.warning(f"Error loading storage file: {exc}, creating new")
+
+ return _create_default_doc_structure(self._doc_id, self._doc_id)
+
+
+class CouchDBStateStore(StateStoreInterface):
+ """CouchDB-based state storage implementation.
+
+ Stores test execution state using CouchDB.
+ """
+
+ def __init__(self) -> None:
+ from pycouchdb import Server as DbServer # type: ignore[import-untyped]
+ from pycouchdb.exceptions import ( # type: ignore[import-untyped]
+ Conflict,
+ GenericError,
+ )
+ from requests.exceptions import ConnectionError # noqa: A004
+
+ config_manager = ConfigManager()
+ config = config_manager.config
+ self._db_srv = DbServer(config.database.url)
+ self._db_name = "statestore"
+ self._doc_id = config.database.doc_id
+ self._log = getLogger(__name__)
+ self._schema: type[BaseModel] = ResultStateStore
+
+ # Initialize database
+ try:
+ self._db: Database = self._db_srv.create(self._db_name) # type: ignore[name-defined]
+ except Conflict:
+ self._db = self._db_srv.database(self._db_name)
+ except GenericError as exc:
+ msg = f"Error initializing database {exc}"
+ raise RuntimeError(msg) from exc
+ except ConnectionError as exc:
+ msg = f"Error initializing database: {exc}"
+ raise RuntimeError(msg) from exc
+
+ self._doc: dict = self._init_doc()
+
+ def get_field(self, key: str) -> Any: # noqa: ANN401
+ """Get field from the state store.
+
+ Args:
+ key (str): Field key, supports nested access with dots
+
+ Returns:
+ Any: Field value, or None if path does not exist
+ """
+ try:
+ return glom(self._doc, key)
+ except PathAccessError:
+ return None
+
+ def update_doc_value(self, key: str, value: Any) -> None: # noqa: ANN401
+ """Update document value in memory (does not persist).
+
+ Args:
+ key (str): Field key, supports nested access with dots
+ value (Any): Value to set
+ """
+ try:
+ dumps(value)
+ except Exception: # noqa: BLE001
+ value = dumps(value, default=str)
+
+ if "." in key:
+ assign(self._doc, key, value, missing=dict)
+ else:
+ self._doc[key] = value
+
+ def update_db(self) -> None:
+ """Persist in-memory document to storage backend."""
+ from pycouchdb.exceptions import Conflict # type: ignore[import-untyped]
+
+ try:
+ self._doc = self._db.save(self._doc)
+ except Conflict:
+ self._doc["_rev"] = self._db.get(self._doc_id)["_rev"]
+ self._doc = self._db.save(self._doc)
+
+ def update_doc(self) -> None:
+ """Reload document from storage backend to memory."""
+ self._doc["_rev"] = self._db.get(self._doc_id)["_rev"]
+ self._doc = self._db.get(self._doc_id)
+
+ def get_document(self) -> BaseModel:
+ """Get full document with schema validation.
+
+ Returns:
+ BaseModel: Validated document model
+ """
+ self._doc = self._db.get(self._doc_id)
+ return self._schema(**self._doc)
+
+ def clear(self) -> None:
+ """Clear storage and reset to initial state."""
+ from pycouchdb.exceptions import ( # type: ignore[import-untyped]
+ Conflict,
+ NotFound,
+ )
+
+ try:
+ self._db.delete(self._doc_id)
+ except (Conflict, NotFound):
+ self._log.debug("Database will be created for the first time")
+ self._doc = self._init_doc()
+
+ def compact(self) -> None:
+ """Optimize storage (implementation-specific, may be no-op)."""
+ self._db.compact()
+
+ def _init_doc(self) -> dict:
+ """Initialize or load document structure."""
+ from pycouchdb.exceptions import NotFound # type: ignore[import-untyped]
+
+ try:
+ doc = self._db.get(self._doc_id)
+ except NotFound:
+ # CouchDB doesn't need _rev field in the default structure
+ default = _create_default_doc_structure(self._doc_id, self._doc_id)
+ del default["_rev"] # CouchDB manages _rev automatically
+ return default
+
+ if DF.MODULES not in doc:
+ doc[DF.MODULES] = {}
+
+ # Reset volatile fields
+ default_doc = _create_default_doc_structure(doc["_id"], self._doc_id)
+ doc[DF.DUT] = default_doc[DF.DUT]
+ doc[DF.TEST_STAND] = default_doc[DF.TEST_STAND]
+ doc[DF.PROCESS] = default_doc[DF.PROCESS]
+
+ return doc
+
+
+class StateStore(metaclass=SingletonMeta):
+ """HardPy state storage factory for test execution state.
+
+ Creates appropriate storage backend based on configuration:
+ - JSON file storage when storage_type is "json"
+ - CouchDB storage when storage_type is "couchdb"
+
+ This ensures state data is stored in the same backend as the main data.
+
+ Note: This class acts as a factory. When instantiated, it returns
+ the appropriate concrete implementation (JsonStateStore or CouchDBStateStore).
+ """
+
+ def __new__(cls) -> StateStoreInterface: # type: ignore[misc]
+ """Create and return the appropriate storage implementation.
+
+ Returns:
+ StateStoreInterface: Concrete storage implementation based on config
+ """
+ config = ConfigManager()
+ storage_type = config.config.database.storage_type
+
+ if storage_type == StorageType.JSON:
+ return JsonStateStore()
+ if storage_type == StorageType.COUCHDB:
+ return CouchDBStateStore()
+ msg = f"Unknown storage type: {storage_type}"
+ raise ValueError(msg)
diff --git a/hardpy/pytest_hardpy/db/tempstore.py b/hardpy/pytest_hardpy/db/tempstore.py
index 5930fe80..235a08b7 100644
--- a/hardpy/pytest_hardpy/db/tempstore.py
+++ b/hardpy/pytest_hardpy/db/tempstore.py
@@ -3,52 +3,254 @@
from __future__ import annotations
+import json
+from abc import ABC, abstractmethod
from logging import getLogger
+from pathlib import Path
from typing import TYPE_CHECKING
-from pycouchdb.exceptions import Conflict, NotFound
+from uuid6 import uuid7
+from hardpy.common.config import ConfigManager, StorageType
from hardpy.common.singleton import SingletonMeta
-from hardpy.pytest_hardpy.db.base_store import BaseStore
from hardpy.pytest_hardpy.db.schema import ResultRunStore
if TYPE_CHECKING:
from collections.abc import Generator
-class TempStore(BaseStore, metaclass=SingletonMeta):
- """HardPy temporary storage for data syncronization."""
+class TempStoreInterface(ABC):
+ """Interface for temporary storage implementations."""
+
+ @abstractmethod
+ def push_report(self, report: ResultRunStore) -> bool:
+ """Push report to the temporary storage.
+
+ Args:
+ report (ResultRunStore): report to store
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+
+ @abstractmethod
+ def reports(self) -> Generator[dict]:
+ """Get all reports from the temporary storage.
+
+ Yields:
+ dict: report from temporary storage
+ """
+
+ @abstractmethod
+ def delete(self, report_id: str) -> bool:
+ """Delete report from the temporary storage.
+
+ Args:
+ report_id (str): report ID to delete
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+
+ def dict_to_schema(self, report: dict) -> ResultRunStore:
+ """Convert report dict to report schema.
+
+ Args:
+ report (dict): report dictionary
+
+ Returns:
+ ResultRunStore: validated report schema
+ """
+ return ResultRunStore(**report)
+
+
+class JsonTempStore(TempStoreInterface):
+ """JSON file-based temporary storage implementation.
+
+ Stores reports temporarily when StandCloud sync fails using JSON files.
+ """
def __init__(self) -> None:
- super().__init__("tempstore")
self._log = getLogger(__name__)
- self._doc: dict = self._init_doc()
+ config = ConfigManager()
+ config_storage_path = Path(config.config.database.storage_path)
+ if config_storage_path.is_absolute():
+ self._storage_dir = config_storage_path / "storage" / "tempstore"
+ else:
+ self._storage_dir = Path(
+ config.tests_path
+ / config.config.database.storage_path
+ / "storage"
+ / "tempstore",
+ )
+ self._storage_dir.mkdir(parents=True, exist_ok=True)
self._schema = ResultRunStore
def push_report(self, report: ResultRunStore) -> bool:
- """Push report to the report database."""
+ """Push report to the temporary storage.
+
+ Args:
+ report (ResultRunStore): report to store
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
report_dict = report.model_dump()
- report_id = report_dict.pop("id")
+ report_dict.pop("id", None)
+ report_id = str(uuid7())
+ report_dict["_id"] = report_id
+ report_dict["_rev"] = report_id
+ report_file = self._storage_dir / f"{report_id}.json"
+
+ try:
+ with report_file.open("w") as f:
+ json.dump(report_dict, f, indent=2, default=str)
+ except Exception as exc: # noqa: BLE001
+ self._log.error(f"Error while saving report {report_id}: {exc}")
+ return False
+ else:
+ self._log.debug(f"Report saved with id: {report_id}")
+ return True
+
+ def reports(self) -> Generator[dict]:
+ """Get all reports from the temporary storage.
+
+ Yields:
+ dict: report from temporary storage
+ """
+ for report_file in self._storage_dir.glob("*.json"):
+ try:
+ with report_file.open("r") as f:
+ report_dict = json.load(f)
+ yield report_dict
+ except Exception as exc: # noqa: BLE001, PERF203
+ self._log.error(f"Error loading report from {report_file}: {exc}")
+ continue
+
+ def delete(self, report_id: str) -> bool:
+ """Delete report from the temporary storage.
+
+ Args:
+ report_id (str): report ID to delete
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+ report_file = self._storage_dir / f"{report_id}.json"
+ try:
+ report_file.unlink()
+ except FileNotFoundError:
+ self._log.warning(f"Report {report_id} not found in temporary storage")
+ return False
+ except Exception as exc: # noqa: BLE001
+ self._log.error(f"Error deleting report {report_id}: {exc}")
+ return False
+ else:
+ return True
+
+
+class CouchDBTempStore(TempStoreInterface):
+ """CouchDB-based temporary storage implementation.
+
+ Stores reports temporarily when StandCloud sync fails using CouchDB.
+ """
+
+ def __init__(self) -> None:
+ from pycouchdb import Server as DbServer # type: ignore[import-untyped]
+ from pycouchdb.exceptions import Conflict # type: ignore[import-untyped]
+
+ self._log = getLogger(__name__)
+ config = ConfigManager()
+ self._db_srv = DbServer(config.config.database.url)
+ self._db_name = "tempstore"
+ self._schema = ResultRunStore
+
+ try:
+ self._db = self._db_srv.create(self._db_name)
+ except Conflict:
+ # database already exists
+ self._db = self._db_srv.database(self._db_name)
+
+ def push_report(self, report: ResultRunStore) -> bool:
+ """Push report to the temporary storage.
+
+ Args:
+ report (ResultRunStore): report to store
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+ from pycouchdb.exceptions import Conflict # type: ignore[import-untyped]
+
+ report_dict = report.model_dump()
+ report_id = report_dict.pop("id", None)
+ if not report_id:
+ self._log.error("Report missing required 'id' field")
+ return False
try:
self._db.save(report_dict)
except Conflict as exc:
self._log.error(f"Error while saving report {report_id}: {exc}")
return False
- self._log.debug(f"Report saved with id: {report_id}")
- return True
+ else:
+ self._log.debug(f"Report saved with id: {report_id}")
+ return True
+
+ def reports(self) -> Generator[dict]:
+ """Get all reports from the temporary storage.
- def reports(self) -> Generator[ResultRunStore]:
- """Get all reports from the report database."""
+ Yields:
+ dict: report from temporary storage
+ """
yield from self._db.all()
def delete(self, report_id: str) -> bool:
- """Delete report from the report database."""
+ """Delete report from the temporary storage.
+
+ Args:
+ report_id (str): report ID to delete
+
+ Returns:
+ bool: True if successful, False otherwise
+ """
+ from pycouchdb.exceptions import ( # type: ignore[import-untyped]
+ Conflict,
+ NotFound,
+ )
+
try:
self._db.delete(report_id)
except (NotFound, Conflict):
return False
- return True
+ else:
+ return True
- def dict_to_schema(self, report: dict) -> ResultRunStore:
- """Convert report dict to report schema."""
- return self._schema(**report)
+
+class TempStore(metaclass=SingletonMeta):
+ """HardPy temporary storage factory for data synchronization.
+
+ Creates appropriate storage backend based on configuration:
+ - JSON file storage when storage_type is "json"
+ - CouchDB storage when storage_type is "couchdb"
+
+ This ensures temporary reports are stored in the same backend as the main data.
+
+ Note: This class acts as a factory. When instantiated, it returns
+ the appropriate concrete implementation (JsonTempStore or CouchDBTempStore).
+ """
+
+ def __new__(cls) -> TempStoreInterface: # type: ignore[misc]
+ """Create and return the appropriate storage implementation.
+
+ Returns:
+ TempStoreInterface: Concrete storage implementation based on config
+ """
+ config = ConfigManager()
+ storage_type = config.config.database.storage_type
+
+ if storage_type == StorageType.JSON:
+ return JsonTempStore()
+ if storage_type == StorageType.COUCHDB:
+ return CouchDBTempStore()
+ msg = f"Unknown storage type: {storage_type}"
+ raise ValueError(msg)
diff --git a/hardpy/pytest_hardpy/plugin.py b/hardpy/pytest_hardpy/plugin.py
index 373f999f..d4c4ffeb 100644
--- a/hardpy/pytest_hardpy/plugin.py
+++ b/hardpy/pytest_hardpy/plugin.py
@@ -496,7 +496,7 @@ def _validate_stop_time(self) -> None:
module_start_time = self._reporter.get_module_start_time(module_id)
module_stop_time = self._reporter.get_module_stop_time(module_id)
if module_start_time and not module_stop_time:
- self._reporter.set_module_stop_time(module_start_time)
+ self._reporter.set_module_stop_time(module_id)
for module_data_key in module_data:
# skip module status
if module_data_key == "module_status":
@@ -505,7 +505,7 @@ def _validate_stop_time(self) -> None:
case_start_time = self._reporter.get_case_start_time(module_id, case_id)
case_stop_time = self._reporter.get_case_stop_time(module_id, case_id)
if case_start_time and not case_stop_time:
- self._reporter.set_case_stop_time(case_start_time)
+ self._reporter.set_case_stop_time(module_id, case_id)
def _stop_tests(self) -> None:
"""Update module and case statuses to stopped and skipped."""
diff --git a/hardpy/pytest_hardpy/result/__init__.py b/hardpy/pytest_hardpy/result/__init__.py
index a82d101b..204629db 100644
--- a/hardpy/pytest_hardpy/result/__init__.py
+++ b/hardpy/pytest_hardpy/result/__init__.py
@@ -2,6 +2,7 @@
# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
from hardpy.pytest_hardpy.result.report_loader.couchdb_loader import CouchdbLoader
+from hardpy.pytest_hardpy.result.report_loader.json_loader import JsonLoader
from hardpy.pytest_hardpy.result.report_loader.stand_cloud_loader import (
StandCloudLoader,
)
@@ -13,6 +14,7 @@
__all__ = [
"CouchdbLoader",
"CouchdbReader",
+ "JsonLoader",
"StandCloudLoader",
"StandCloudReader",
]
diff --git a/hardpy/pytest_hardpy/result/report_loader/json_loader.py b/hardpy/pytest_hardpy/result/report_loader/json_loader.py
new file mode 100644
index 00000000..c6e3d2eb
--- /dev/null
+++ b/hardpy/pytest_hardpy/result/report_loader/json_loader.py
@@ -0,0 +1,49 @@
+# Copyright (c) 2024 Everypin
+# GNU General Public License v3.0 (see LICENSE or https://www.gnu.org/licenses/gpl-3.0.txt)
+from __future__ import annotations
+
+import json
+from logging import getLogger
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from uuid6 import uuid7
+
+if TYPE_CHECKING:
+ from hardpy.pytest_hardpy.db.schema import ResultRunStore
+
+
+class JsonLoader:
+ """JSON report generator."""
+
+ def __init__(self, storage_dir: Path | None = None) -> None:
+ if not storage_dir:
+ storage_dir = Path.cwd() / "reports"
+ self._storage_dir = storage_dir
+ self._storage_dir.mkdir(parents=True, exist_ok=True)
+ self._log = getLogger(__name__)
+
+ def load(self, report: ResultRunStore, new_report_id: str | None = None) -> bool:
+ """Load report to the report database.
+
+ Args:
+ report (ResultRunStore): report
+ new_report_id (str | None, optional): user's report ID. Defaults to uuid7.
+
+ Returns:
+ bool: True if success, else False
+ """
+ report_dict = report.model_dump()
+ report_id = new_report_id if new_report_id else str(uuid7())
+ report_dict["id"] = report_id
+ report_file = self._storage_dir / f"{report_id}.json"
+
+ try:
+ with report_file.open("w") as f:
+ json.dump(report_dict, f, indent=2, default=str)
+ except Exception as exc: # noqa: BLE001
+ self._log.error(f"Error while saving report {report_id}: {exc}")
+ return False
+ else:
+ self._log.debug(f"Report saved with id: {report_id}")
+ return True
diff --git a/hardpy/pytest_hardpy/result/report_synchronizer/synchronizer.py b/hardpy/pytest_hardpy/result/report_synchronizer/synchronizer.py
index de3cea4a..4d1c9b50 100644
--- a/hardpy/pytest_hardpy/result/report_synchronizer/synchronizer.py
+++ b/hardpy/pytest_hardpy/result/report_synchronizer/synchronizer.py
@@ -8,7 +8,7 @@
from pydantic import ValidationError
from hardpy.common.stand_cloud.exception import StandCloudError
-from hardpy.pytest_hardpy.db.tempstore import TempStore
+from hardpy.pytest_hardpy.db.tempstore import CouchDBTempStore, TempStore
from hardpy.pytest_hardpy.result.report_loader.stand_cloud_loader import (
StandCloudLoader,
)
@@ -21,7 +21,18 @@ class StandCloudSynchronizer:
"""Synchronize reports with StandCloud."""
def __init__(self) -> None:
- self._tempstore = TempStore()
+ self._tempstore: TempStore | None = None
+
+ @property
+ def _get_tempstore(self) -> TempStore:
+ """Get TempStore instance lazily.
+
+ Returns:
+ TempStore: TempStore singleton instance
+ """
+ if self._tempstore is None:
+ self._tempstore = TempStore()
+ return self._tempstore
def sync(self) -> str:
"""Sync reports with StandCloud.
@@ -29,16 +40,21 @@ def sync(self) -> str:
Returns:
str: Synchronization message
"""
- if not self._tempstore.reports():
+ _tempstore = self._get_tempstore
+ if not self._get_tempstore.reports():
return "All reports are synchronized with StandCloud"
loader = self._create_sc_loader()
invalid_reports = []
success_report_counter = 0
- for _report in self._tempstore.reports():
+ for _report in self._get_tempstore.reports():
try:
- report_id = _report.get("id")
- document: dict = _report.get("doc")
+ if isinstance(_tempstore, CouchDBTempStore):
+ document: dict = _report.get("doc") # type: ignore[assignment]
+ report_id: str = _report.get("id") # type: ignore[assignment]
+ else:
+ document: dict = _report
+ report_id: str = _report.get("_id")
document.pop("rev")
except KeyError:
try:
@@ -50,7 +66,7 @@ def sync(self) -> str:
invalid_reports.append({report_id: reason})
continue
try:
- schema_report = self._tempstore.dict_to_schema(document)
+ schema_report = self._get_tempstore.dict_to_schema(document)
except ValidationError as exc:
reason = f"Report has invalid format: {exc}"
invalid_reports.append({report_id: reason})
@@ -65,7 +81,7 @@ def sync(self) -> str:
reason = f"Staus code: {response.status_code}, text: {response.text}"
invalid_reports.append({report_id: reason})
continue
- if not self._tempstore.delete(report_id):
+ if not self._get_tempstore.delete(report_id):
reason = f"Report {report_id} not deleted from the temporary storage"
invalid_reports.append({report_id: reason})
success_report_counter += 1
@@ -80,7 +96,7 @@ def push_to_tempstore(self, report: ResultRunStore) -> bool:
Returns:
bool: True if success, else False
"""
- return self._tempstore.push_report(report)
+ return self._get_tempstore.push_report(report)
def push_to_sc(self, report: ResultRunStore) -> bool:
"""Push report to the StandCloud.
diff --git a/mkdocs.yml b/mkdocs.yml
index 0d4d2c7b..cd720a89 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -70,6 +70,7 @@ nav:
- Dialog box: examples/dialog_box.md
- HardPy launch: examples/hardpy_launch.md
- Hello HardPy: examples/hello_hardpy.md
+ - JSON storage: examples/json_storage.md
- Launch arguments: examples/launch_arguments.md
- Pytest logging: examples/logging.md
- Measurement: examples/measurement.md
diff --git a/pyproject.toml b/pyproject.toml
index 1339636d..df36ea1b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,7 +3,7 @@
[project]
name = "hardpy"
- version = "0.19.1"
+ version = "0.20.0"
description = "HardPy library for device testing"
license = "GPL-3.0-or-later"
authors = [{ name = "Everypin", email = "info@everypin.io" }]
@@ -36,6 +36,7 @@
"tomli>=2.0.1, <3",
"py-machineid~=0.6.0",
"tzlocal~=5.2",
+ "uuid6",
# Frontend
"fastapi>=0.100.1",
diff --git a/tests/test_cli/test_hardpy_init.py b/tests/test_cli/test_hardpy_init.py
index beee5073..0f23878d 100644
--- a/tests/test_cli/test_hardpy_init.py
+++ b/tests/test_cli/test_hardpy_init.py
@@ -11,11 +11,13 @@
frontend_no_default_host = "localhost1"
frontend_no_default_port = "8001"
stand_cloud_no_default_addr = "everypin1.standcloud.localhost"
+storage_type_no_default = "json"
db_default_port = "5984"
frontend_default_host = "localhost"
frontend_default_port = "8000"
frontend_default_language = "en"
+storage_type_default = "couchdb"
def test_cli_init(tmp_path: Path):
@@ -112,6 +114,31 @@ def test_cli_init_db_port(tmp_path: Path):
";port = 5985" in content
), "couchdb.ini does not contain the expected port."
+def test_cli_init_storage_type_default(tmp_path: Path):
+ subprocess.run(
+ [*HARDPY_COMMAND, tmp_path],
+ check=True,
+ )
+ hardpy_toml_path = tmp_path / "hardpy.toml"
+ with Path.open(hardpy_toml_path) as f:
+ content = f.read()
+ storage_type_info = f"""storage_type = "{storage_type_default}"
+"""
+ assert_msg = "hardpy.toml does not contain the default storage type."
+ assert storage_type_info in content, assert_msg
+
+def test_cli_init_storage_type_no_default(tmp_path: Path):
+ subprocess.run(
+ [*HARDPY_COMMAND, tmp_path, "--storage-type", storage_type_no_default],
+ check=True,
+ )
+ hardpy_toml_path = tmp_path / "hardpy.toml"
+ with Path.open(hardpy_toml_path) as f:
+ content = f.read()
+ storage_type_info = f"""storage_type = "{storage_type_no_default}"
+"""
+ assert_msg = "hardpy.toml does not contain the valid storage type."
+ assert storage_type_info in content, assert_msg
def test_cli_init_frontend_host(tmp_path: Path):
subprocess.run(
diff --git a/tests/test_common/test_config.py b/tests/test_common/test_config.py
index 440d07c5..54a3e2bb 100644
--- a/tests/test_common/test_config.py
+++ b/tests/test_common/test_config.py
@@ -64,10 +64,12 @@ def test_config_manager_init():
sc_connection_only=stand_cloud_no_default_connection_only,
sc_autosync=stand_cloud_no_default_autosync,
sc_api_key=stand_cloud_no_default_api_key,
+ storage_type="json",
)
config = config_manager.config
assert isinstance(config, HardpyConfig)
assert config.tests_name == tests_no_default_name
+ assert config.database.storage_type == "json"
assert config.database.user == db_no_default_user
assert config.database.password == db_no_default_password
assert config.database.host == db_no_default_host
@@ -157,6 +159,7 @@ def test_config_manager_create_config(tmp_path: Path):
sc_connection_only=stand_cloud_dafault_connection_only,
sc_autosync=stand_cloud_default_autosync,
sc_api_key=stand_cloud_default_api_key,
+ storage_type="json",
)
config_manager.create_config(tests_dir)
diff --git a/tests/test_plugin/conftest.py b/tests/test_plugin/conftest.py
index b386beb8..1d9b5f7f 100644
--- a/tests/test_plugin/conftest.py
+++ b/tests/test_plugin/conftest.py
@@ -7,18 +7,26 @@
pytest_plugins = "pytester"
-@pytest.fixture
-def hardpy_opts():
+@pytest.fixture(params=["couchdb", "json"], autouse=True)
+def hardpy_opts(request): # noqa: ANN001
config_manager = ConfigManager()
- config_data = config_manager.read_config(
- Path(__file__).parent.resolve(),
- )
- if not config_data:
- msg = "Config not found"
- raise RuntimeError(msg)
- return [
- "--hardpy-clear-database",
- "--hardpy-db-url",
- config_data.database.url,
- "--hardpy-pt",
- ]
+ if request.param == "couchdb":
+ config_data = config_manager.read_config(
+ Path(__file__).parent.resolve(),
+ )
+ if not config_data:
+ msg = "Config not found"
+ raise RuntimeError(msg)
+
+ return [
+ "--hardpy-clear-database",
+ "--hardpy-db-url",
+ config_data.database.url,
+ "--hardpy-pt",
+ ]
+ if request.param == "json":
+ config_data = config_manager.read_config(
+ Path(__file__).parent / "json_toml",
+ )
+ return [ "--hardpy-clear-database", "--hardpy-pt"]
+ return None
diff --git a/tests/test_plugin/hardpy.toml b/tests/test_plugin/hardpy.toml
index c91e7d77..8f0c2ad0 100644
--- a/tests/test_plugin/hardpy.toml
+++ b/tests/test_plugin/hardpy.toml
@@ -1,6 +1,7 @@
title = "HardPy TOML config"
[database]
+storage_type = "couchdb"
user = "dev"
password = "dev"
host = "localhost"
diff --git a/tests/test_plugin/json_toml/hardpy.toml b/tests/test_plugin/json_toml/hardpy.toml
new file mode 100644
index 00000000..86d5114b
--- /dev/null
+++ b/tests/test_plugin/json_toml/hardpy.toml
@@ -0,0 +1,18 @@
+title = "HardPy TOML config"
+
+[database]
+storage_type = "json"
+
+[frontend]
+host = "localhost"
+port = 8000
+language = "en"
+full_size_button = false
+sound_on = false
+measurement_display = true
+manual_collect = false
+
+[frontend.modal_result]
+enable = false
+auto_dismiss_pass = true
+auto_dismiss_timeout = 5