From 22da38e328f5cd54c42cb2dc753ecd60bb2ee586 Mon Sep 17 00:00:00 2001 From: "sriram.puttagunta" Date: Wed, 7 Jan 2026 11:42:57 +0530 Subject: [PATCH 1/3] Added Semantic Dedup Graph Post Processor --- sygra/core/dataset/dataset_processor.py | 25 ++- sygra/core/graph/graph_postprocessor.py | 167 ++++++++++++++++++ .../examples/semantic_dedup/graph_config.yaml | 46 +++++ tasks/examples/semantic_dedup/input.json | 18 ++ .../semantic_dedup_no_seed/graph_config.yaml | 82 +++++++++ .../semantic_dedup_no_seed/task_executor.py | 39 ++++ 6 files changed, 373 insertions(+), 4 deletions(-) create mode 100644 tasks/examples/semantic_dedup/graph_config.yaml create mode 100644 tasks/examples/semantic_dedup/input.json create mode 100644 tasks/examples/semantic_dedup_no_seed/graph_config.yaml create mode 100644 tasks/examples/semantic_dedup_no_seed/task_executor.py diff --git a/sygra/core/dataset/dataset_processor.py b/sygra/core/dataset/dataset_processor.py index c1615f2a..5990267a 100644 --- a/sygra/core/dataset/dataset_processor.py +++ b/sygra/core/dataset/dataset_processor.py @@ -438,10 +438,27 @@ async def _process_and_store_results(self): with open(output_file, "r") as f: output_data = json.load(f) for post_processor in post_processors: - metadata = {"output_file": output_file} - processor = utils.get_func_from_str(post_processor) - processor_name = post_processor.split(".")[-1] - processed_output_data = processor().process(output_data, metadata) + processor_path = None + processor_params: dict[str, Any] = {} + if isinstance(post_processor, str): + processor_path = post_processor + elif isinstance(post_processor, dict): + processor_path = post_processor.get("processor") + processor_params = post_processor.get("params", {}) or {} + else: + raise ValueError( + "Invalid graph_post_process entry. Must be a string or dict with 'processor'." 
+ ) + + if not processor_path: + raise ValueError("graph_post_process processor path is missing") + + metadata = {"output_file": output_file, "params": processor_params} + processor_cls = utils.get_func_from_str(processor_path) + processor_name = processor_path.split(".")[-1] + processed_output_data = processor_cls(**processor_params).process( + output_data, metadata + ) new_output_file = output_file[: output_file.rfind("/") + 1] + output_file[ output_file.rfind("/") + 1 : ].replace("output_", processor_name + "_", 1) diff --git a/sygra/core/graph/graph_postprocessor.py b/sygra/core/graph/graph_postprocessor.py index 772a85f4..65950eed 100644 --- a/sygra/core/graph/graph_postprocessor.py +++ b/sygra/core/graph/graph_postprocessor.py @@ -1,4 +1,14 @@ +import json +import os from abc import ABC, abstractmethod +from typing import TYPE_CHECKING, Any, Optional, cast + +import numpy as np + +from sygra.logger.logger_config import logger + +if TYPE_CHECKING: + from sentence_transformers import SentenceTransformer class GraphPostProcessor(ABC): @@ -11,3 +21,160 @@ class GraphPostProcessor(ABC): def process(self, data: list, metadata: dict) -> list: # implement post processing logic with whole data, return the final data list pass + + +class SemanticDedupPostProcessor(GraphPostProcessor): + def __init__( + self, + field: str = "text", + similarity_threshold: float = 0.9, + id_field: str = "id", + embedding_backend: str = "sentence_transformers", + embedding_model: str = "all-MiniLM-L6-v2", + report_filename: Optional[str] = None, + keep: str = "first", + max_pairs_in_report: int = 2000, + ): + self.field = field + self.similarity_threshold = float(similarity_threshold) + self.id_field = id_field + self.embedding_backend = embedding_backend + self.embedding_model = embedding_model + self.report_filename = report_filename + self.keep = keep + self.max_pairs_in_report = int(max_pairs_in_report) + + self._embedder: Optional["SentenceTransformer"] = None + + def _get_embedder(self) -> "SentenceTransformer": + if self.embedding_backend == "sentence_transformers": + if self._embedder is None: + from sentence_transformers import SentenceTransformer + + self._embedder = SentenceTransformer(self.embedding_model) + return self._embedder + + raise ValueError(f"Unsupported embedding_backend: {self.embedding_backend}") + + @staticmethod + def _cosine_sim_matrix(a: np.ndarray) -> np.ndarray: + a = a / (np.linalg.norm(a, axis=1, keepdims=True) + 1e-12) + return cast(np.ndarray, a @ a.T) + + def _get_text(self, item: Any) -> str: + if not isinstance(item, dict): + return str(item) + v = item.get(self.field, "") + if v is None: + return "" + if isinstance(v, (list, tuple)): + return "\n".join("" if x is None else str(x) for x in v) + return str(v) + + def _get_id(self, item: Any, fallback: str) -> str: + if isinstance(item, dict) and self.id_field in item and item[self.id_field] is not None: + return str(item[self.id_field]) + return fallback + + def _resolve_report_path(self, output_file: str) -> str: + if self.report_filename: + if os.path.isabs(self.report_filename): + return self.report_filename + return os.path.join(os.path.dirname(output_file), self.report_filename) + + base = os.path.basename(output_file) + # output_*.json -> semantic_dedup_report_*.json + report_base = base.replace("output_", "semantic_dedup_report_", 1) + return os.path.join(os.path.dirname(output_file), report_base) + + def process(self, data: list, metadata: dict) -> list: + if not data: + return data + + output_file = 
str(metadata.get("output_file", "")) + logger.info( + "SemanticDedupPostProcessor: field=%s threshold=%s n=%s", + self.field, + self.similarity_threshold, + len(data), + ) + + texts = [self._get_text(item) for item in data] + embedder = self._get_embedder() + embs = embedder.encode(texts, normalize_embeddings=True) + embs = np.asarray(embs, dtype=np.float32) + + sims = self._cosine_sim_matrix(embs) + + kept: list[int] = [] + dropped: set[int] = set() + duplicate_pairs: list[dict[str, Any]] = [] + + def should_keep(i: int, j: int) -> bool: + # i and j are indices where sim(i, j) is above threshold and j is candidate duplicate. + # keep='first' means keep earlier index, drop later one. + if self.keep == "first": + return i < j + if self.keep == "last": + return i > j + return i < j + + # Greedy pass: keep an item if it isn't too-similar to a previously kept one + for i in range(len(data)): + if i in dropped: + continue + + kept.append(i) + # mark duplicates of i + row = sims[i] + dup_indices = np.where(row >= self.similarity_threshold)[0] + for j in dup_indices: + if j == i: + continue + if j in dropped: + continue + if j in kept: + continue + + # Decide drop based on keep strategy + if should_keep(i, int(j)): + dropped.add(int(j)) + if len(duplicate_pairs) < self.max_pairs_in_report: + duplicate_pairs.append( + { + "kept_index": i, + "dropped_index": int(j), + "kept_id": self._get_id(data[i], str(i)), + "dropped_id": self._get_id(data[int(j)], str(j)), + "similarity": float(row[int(j)]), + } + ) + + deduped = [data[i] for i in kept] + + report = { + "processor": "SemanticDedupPostProcessor", + "field": self.field, + "id_field": self.id_field, + "embedding_backend": self.embedding_backend, + "embedding_model": self.embedding_model, + "similarity_threshold": self.similarity_threshold, + "keep": self.keep, + "input_count": len(data), + "output_count": len(deduped), + "dropped_count": len(data) - len(deduped), + "pairs_reported": len(duplicate_pairs), + "max_pairs_in_report": self.max_pairs_in_report, + "duplicates": duplicate_pairs, + } + + if output_file: + try: + report_path = self._resolve_report_path(output_file) + with open(report_path, "w") as f: + json.dump(report, f, indent=4) + logger.info("Semantic dedup report written to %s", report_path) + except Exception as e: + logger.error("Failed to write semantic dedup report: %s", e) + + return deduped diff --git a/tasks/examples/semantic_dedup/graph_config.yaml b/tasks/examples/semantic_dedup/graph_config.yaml new file mode 100644 index 00000000..0af1888a --- /dev/null +++ b/tasks/examples/semantic_dedup/graph_config.yaml @@ -0,0 +1,46 @@ +data_config: + source: + type: "disk" + file_path: "tasks/examples/semantic_dedup_demo/input.json" + file_format: "json" + +graph_config: + nodes: + respond: + node_type: llm + output_keys: answer + prompt: + - system: | + You are a helpful assistant. 
+ - user: | + {prompt} + model: + name: gpt-4o-mini + parameters: + temperature: 0.7 + + edges: + - from: START + to: respond + - from: respond + to: END + +graph_post_process: + - processor: sygra.core.graph.graph_postprocessor.SemanticDedupPostProcessor + params: + field: answer + similarity_threshold: 0.92 + id_field: id + embedding_backend: sentence_transformers + embedding_model: all-MiniLM-L6-v2 + keep: first + max_pairs_in_report: 1000 + +output_config: + output_map: + id: + from: id + prompt: + from: prompt + answer: + from: answer diff --git a/tasks/examples/semantic_dedup/input.json b/tasks/examples/semantic_dedup/input.json new file mode 100644 index 00000000..2a5d0633 --- /dev/null +++ b/tasks/examples/semantic_dedup/input.json @@ -0,0 +1,18 @@ +[ + { + "id": "1", + "prompt": "Write a 2-sentence summary of why unit tests are useful." + }, + { + "id": "2", + "prompt": "In two sentences, explain the benefits of unit testing." + }, + { + "id": "3", + "prompt": "Give a brief description of what unit tests are and why they matter." + }, + { + "id": "4", + "prompt": "Explain what a database index is." + } +] diff --git a/tasks/examples/semantic_dedup_no_seed/graph_config.yaml b/tasks/examples/semantic_dedup_no_seed/graph_config.yaml new file mode 100644 index 00000000..ebd4d7ad --- /dev/null +++ b/tasks/examples/semantic_dedup_no_seed/graph_config.yaml @@ -0,0 +1,82 @@ +graph_config: + nodes: + generate_attributes: + node_type: weighted_sampler + attributes: + category: + values: ["Email issues", "Network Issue", "Printer issues"] + domain: + values: ["ITSM (Information Technology Service Management) - A business function that involves managing IT services and process to meet business needs effectively."] + data_definition: + values: ["An incident is unplanned interruption or reduction in the quality or service, ie, something is broken or not working as expected. Incident management covers the entire lifecycle of an incident form its detection until its resolution and closure."] + metadata: + values: ["{{\"short_description\":\"Provide a brief text to capture the essence of the request. Keep it within 100 characters. Type = String\",\"description\":\"A detailed account of the incident, including comprehensive information about the issue, steps to reproduce, and any relevant context. Type = String\"}}"] + generate_incident_data: + node_type: llm + post_process: tasks.examples.semantic_dedup_no_seed.task_executor.GenerateIncidentDataPostProcessor + pre_process: tasks.examples.semantic_dedup_no_seed.task_executor.GenerateIncidentDataPreProcessor + output_keys: + - short_description + - description + prompt: + - user: | + Unique ID: {unique_id} + + Follow these steps: + - Understand the Data Definition: Review the detailed description of the data to ensure it is realistic and meaningful. + - Understand the category: Review the category related to domain provided and generate meaningful data related to the category. + - Review the Metadata: Check the metadata carefully, including column names, data types, and descriptions. This will guide the generation of the data for each column. + - Generate the Data: Create values for each column as specified in the metadata. Ensure that the generated data is complete, realistic, consistent + - Maintain Column Integrity: Ensure that column names and types exactly match those specified in the metadata + Generate data for the following columns as described in the metadata specific to the domain mentioned. 
+ + Domain: {domain} + Category: {category} + Data Definition: {data_definition} + Metadata: {metadata} + + Ensure that the output is complete and not truncated in JSON data. + {{ \"short_description\": \"<>\", \"description\": \"<>\" }} + + model: + name: gpt-4o-mini + parameters: + temperature: 0.8 + structured_output: + schema: + fields: + short_description: + type: str + description: "Brief text to capture the essence of the request" + description: + type: str + description: "Detailed account of the incident, including comprehensive information about the issue, steps to reproduce, and any relevant context." + edges: + - from: START + to: generate_attributes + - from: generate_attributes + to: generate_incident_data + - from: generate_incident_data + to: END + +graph_post_process: + - processor: sygra.core.graph.graph_postprocessor.SemanticDedupPostProcessor + params: + field: description + similarity_threshold: 0.85 + id_field: id + embedding_backend: sentence_transformers + embedding_model: all-MiniLM-L6-v2 + keep: first + max_pairs_in_report: 1000 + +output_config: + output_map: + short_description: + from: "short_description" + "description": + from: "description" + "category": + from: "category" + "domain": + from: "domain" diff --git a/tasks/examples/semantic_dedup_no_seed/task_executor.py b/tasks/examples/semantic_dedup_no_seed/task_executor.py new file mode 100644 index 00000000..520ae36e --- /dev/null +++ b/tasks/examples/semantic_dedup_no_seed/task_executor.py @@ -0,0 +1,39 @@ +import json + +import regex +from sygra.logger.logger_config import logger +from sygra.core.graph.functions.node_processor import NodePostProcessorWithState, NodePreProcessor +from sygra.core.graph.sygra_message import SygraMessage +from sygra.core.graph.sygra_state import SygraState + +import uuid + + +def parse_response_as_json(s): + JSON_REGEX_PATTERN = regex.compile(r"\{(?:[^{}]|(?R))*\}") + try: + return json.loads(s) + except json.decoder.JSONDecodeError as e: + p = JSON_REGEX_PATTERN.search(s) + if not p: + logger.error("No json string found: " + e.msg) + logger.error(s) + try: + return json.loads(p[0]) + except json.decoder.JSONDecodeError as e: + logger.error("Unable to parse json string: " + e.msg) + logger.error(s) + + +class GenerateIncidentDataPostProcessor(NodePostProcessorWithState): + def apply(self, resp: SygraMessage, state: SygraState) -> SygraState: + incident = parse_response_as_json(resp.message.content) + state["short_description"] = incident.get("short_description", "") + state["description"] = incident.get("description", "") + return state + + +class GenerateIncidentDataPreProcessor(NodePreProcessor): + def apply(self, state: SygraState) -> SygraState: + state["unique_id"] = str(uuid.uuid4()) + return state From 744b9185314c6e02512aa0e01e7d78534688bafa Mon Sep 17 00:00:00 2001 From: "sriram.puttagunta" Date: Mon, 12 Jan 2026 16:42:15 +0530 Subject: [PATCH 2/3] Added ANN dedup, test cases and documentation --- docs/features/semantic_deduplication.md | 132 ++++++++++++ docs/getting_started/graph_config_guide.md | 112 +++++----- sygra/core/graph/graph_postprocessor.py | 204 ++++++++++++++---- .../examples/semantic_dedup/graph_config.yaml | 3 +- .../semantic_dedup_no_seed/task_executor.py | 2 + tests/core/graph/test_graph_postprocessor.py | 154 +++++++++++++ 6 files changed, 518 insertions(+), 89 deletions(-) create mode 100644 docs/features/semantic_deduplication.md create mode 100644 tests/core/graph/test_graph_postprocessor.py diff --git a/docs/features/semantic_deduplication.md 
b/docs/features/semantic_deduplication.md new file mode 100644 index 00000000..d998fc52 --- /dev/null +++ b/docs/features/semantic_deduplication.md @@ -0,0 +1,132 @@ +# Semantic Deduplication + +> **Remove near-duplicate generated records using embedding-based similarity as a graph post-processor** + +## Overview + +SyGra supports semantic deduplication as a **graph post-processing** step via: + +`sygra.core.graph.graph_postprocessor.SemanticDedupPostProcessor` + +It embeds a configured output field (e.g., `answer`, `description`) and removes items whose **cosine similarity** is above a configurable threshold. + +This is useful when: + +- Your generation workflow tends to repeat the same/very similar answers. +- You are generating multiple records and want to reduce redundant samples. +- You want a report of duplicate pairs to inspect or tune dedup behavior. + +## Quick Start + +Add the post processor under `graph_post_process` in your task `graph_config.yaml`. + +Example (dedup over `answer`, see `tasks/examples/semantic_dedup/graph_config.yaml`): + +```yaml +graph_post_process: + - processor: sygra.core.graph.graph_postprocessor.SemanticDedupPostProcessor + params: + field: answer + similarity_threshold: 0.92 + id_field: id + embedding_backend: sentence_transformers + embedding_model: all-MiniLM-L6-v2 + dedup_mode: nearest_neighbor + vectorstore_k: 20 + keep: first + max_pairs_in_report: 1000 +``` + +Example (dedup over `description`, see `tasks/examples/semantic_dedup_no_seed/graph_config.yaml`): + +```yaml +graph_post_process: + - processor: sygra.core.graph.graph_postprocessor.SemanticDedupPostProcessor + params: + field: description + similarity_threshold: 0.85 + id_field: id + embedding_backend: sentence_transformers + embedding_model: all-MiniLM-L6-v2 + keep: first + max_pairs_in_report: 1000 +``` + +## Configuration Reference + +### Parameters + +All parameters are provided under `params:`. + +| Parameter | Type | Description | Default | +|-----------|------|-------------|---------| +| `field` | string | Field to embed and compare for similarity. If the field value is a list/tuple, values are joined with newlines. | `text` | +| `similarity_threshold` | float | Cosine similarity threshold. Higher values drop fewer items. | `0.9` | +| `id_field` | string | Optional ID field used in the report for readability. If missing, indices are used. | `id` | +| `embedding_backend` | string | Embedding backend. Currently only `sentence_transformers` is supported. | `sentence_transformers` | +| `embedding_model` | string | SentenceTransformers model name to use for embeddings. | `all-MiniLM-L6-v2` | +| `report_filename` | string | Optional report JSON filename. If relative, it is written next to the graph output file. If omitted, the report name is derived from the output file name. | (derived) | +| `keep` | string | Which item to keep when duplicates are found: `first` or `last`. | `first` | +| `max_pairs_in_report` | int | Max number of duplicate pairs written to the report. | `2000` | +| `dedup_mode` | string | Dedup implementation to use: `nearest_neighbor` (default) or `all_pairs`. Any other value is unsupported and will raise an error. `nearest_neighbor` avoids building a full similarity matrix by only comparing against nearest neighbors / kept items. `all_pairs` computes a full similarity matrix (exact, but O(n^2)). | `nearest_neighbor` | +| `vectorstore_k` | int | Number of nearest neighbors to retrieve/consider when `dedup_mode: nearest_neighbor`. 
| `20` | + +### How dedup is applied + +- A greedy pass keeps an item if it is not too similar to a previously kept one. +- Similarity is computed via cosine similarity over normalized embeddings. +- `keep: first` keeps the earlier item, `keep: last` prefers the later item. + +## Output report + +If SyGra provides `metadata["output_file"]` at runtime, the post processor writes a JSON report next to the output file. + +### Report naming + +- If `report_filename` is provided: + - absolute paths are used as-is + - relative paths are resolved relative to the output directory +- Otherwise, the report filename is derived from the output filename: + - `output_*.json` -> `semantic_dedup_report_*.json` + +### Report format (high level) + +The report includes: + +- `input_count`, `output_count`, `dropped_count` +- configuration (`field`, `similarity_threshold`, `embedding_model`, etc.) +- a bounded list of duplicate pairs under `duplicates` + +Each entry in `duplicates` contains: + +- `kept_index`, `dropped_index` +- `kept_id`, `dropped_id` +- `similarity` + +## Dependencies + +When using `embedding_backend: sentence_transformers`, this feature requires the `sentence-transformers` package to be available in your environment. + +## Performance considerations + +When `dedup_mode: nearest_neighbor` (default), dedup runs incrementally and does not build a full similarity matrix. This is typically faster and uses less memory for larger outputs. + +When `dedup_mode: all_pairs`, the implementation computes a full similarity matrix (**O(n^2)** time/memory), so it is intended for **relatively small** output lists. + +If you plan to deduplicate very large outputs, consider: + +- generating in smaller batches +- using a higher threshold to reduce comparisons +- implementing an approximate/streaming dedup strategy + +## Troubleshooting + +### Unsupported embedding backend + +If you set `embedding_backend` to anything other than `sentence_transformers`, SyGra will raise: + +`ValueError: Unsupported embedding_backend: ...` + +### No report is written + +A report is only written if `metadata["output_file"]` is present. If you are running in a context where SyGra does not set it, the post processor will still deduplicate in-memory but will not persist the report. diff --git a/docs/getting_started/graph_config_guide.md b/docs/getting_started/graph_config_guide.md index e275ff9c..06c82cfb 100644 --- a/docs/getting_started/graph_config_guide.md +++ b/docs/getting_started/graph_config_guide.md @@ -31,7 +31,7 @@ output_config: # Output generation configuration schema_config: - # Output schema validation + # Output schema validation graph_post_process: # Graph post processing @@ -69,7 +69,7 @@ data_config: mapping: task_id: id # Rename 'task_id' field to 'id' overwrite: false # Don't overwrite existing fields - + # Optional sink configuration for where to store output data sink: # Example 1: HuggingFace dataset sink @@ -77,9 +77,9 @@ data_config: repo_id: "output-dataset/synthetic-mbpp" # Where to upload the data split: "train" # Split to write to private: true # Create a private dataset - + # OR - + # Example 2: Local file sink type: "json" # File format (json, jsonl, csv, parquet) file_path: "/path/to/output/file.json" # Path to save the file @@ -120,7 +120,7 @@ Transformations allow you to modify the input data before processing. 
| `transform` | string | Fully qualified path to a transformation class | | `params` | object | Parameters for the transformation | -#### Some of the available transformations are: +#### Some of the available transformations are: #### RenameFieldsTransform It renames the fields in the dataset, so the prompt variables used are meaningful and reusable. The Below example shows how the `page` is renamed to `id`, `llm_extract` is renamed to `text` and `type` is renamed to `text_format`. @@ -135,8 +135,8 @@ The Below example shows how the `page` is renamed to `id`, `llm_extract` is rena #### CombineRecords When you want to combine records to form a new dataset, you can use this transformation. Below example shows how we can skip 10 records from beginning and from end, and combine 2 records by shifting 1. -For example record `11` and `12` will be combined to form `page`=`11-12`, in this example, `pdf_reader` and `llm_extract` columns are combined with two new lines. -And `type`, `model`, `metadata` is just picking data from first record. `$1` denotes first record, `$2` denotes second record and so on. +For example record `11` and `12` will be combined to form `page`=`11-12`, in this example, `pdf_reader` and `llm_extract` columns are combined with two new lines. +And `type`, `model`, `metadata` is just picking data from first record. `$1` denotes first record, `$2` denotes second record and so on. Once `11` and `12` is combined to form `11-12`, it shift by 1 and combines `12` with `13` to form `12-13`. ```yaml - transform: sygra.processors.data_transform.CombineRecords @@ -209,7 +209,7 @@ To generate data without a source: ```yaml data_config: # No source configuration - + # Only sink configuration sink: type: "json" @@ -319,7 +319,7 @@ class ShouldContinueCondition(EdgeCondition): | Parameter | Type | Description | |-----------|------|-------------| -| `from` | string | Source node name (can be a regular node or START) | +| `from` | string | Source node name (can be a regular node or START) | | `to` | string | Target node name (can be a regular node or END) | | `condition` | string | Fully qualified path to a condition class or function (for conditional edges) | | `path_map` | object | Map of condition results to target node names (for conditional edges) | @@ -433,7 +433,7 @@ However, datasource is already mentioned in the current YAML file. output_map va \$ variables are only supported under `value` key. Below example shows how a dictionary value can have \$ variables as dictionary values, -list values and direct string value. +list values and direct string value. It can read the path with dot format, also supports list with subscript operator. 
```yaml @@ -467,27 +467,27 @@ class CodeGenOutputGenerator(BaseOutputGenerator): def build_conversation(self, data: Any, state: dict[str, Any]) -> Any: """ Transform messages into a conversation format - + Args: data: The value from the state (from the 'from' field) state: The entire graph state - + Returns: The transformed value """ chat_format_messages = utils.convert_messages_from_langchain_to_chat_format(data) - + # Example transformation logic: if chat_format_messages and "no more feedback" in chat_format_messages[-1]["content"].lower(): # Remove the last message with "no more feedback" chat_format_messages = chat_format_messages[:-1] - + # Add additional messages or modify existing ones if "rephrased_text" in state and state["rephrased_text"]: # output keys can be directly accessed from state question = state["rephrased_text"].replace("PARAPHRASED QUESTION: ", "") chat_format_messages.insert(0, {"role": "user", "content": question}) - + return chat_format_messages ``` @@ -500,17 +500,17 @@ class CustomOutputGenerator(BaseOutputGenerator): def generate(self, state: SygraState) -> dict[str, Any]: """ Create a custom output record from the graph state - + Args: state: The final graph state - + Returns: The output record as a dictionary """ # Custom logic to build the output record if "messages" not in state: return None # Skip records that don't have messages - + # Build your output record with custom logic record = { "id": state.get("id", ""), @@ -518,13 +518,13 @@ class CustomOutputGenerator(BaseOutputGenerator): "metadata": self._build_metadata(state), # Other fields... } - + return record - + def _process_conversation(self, messages): # Helper method for processing messages # ... - + def _build_metadata(self, state): # Helper method for building metadata # ... @@ -564,44 +564,44 @@ graph_config: values: [high school teacher, college professor, software engineer] paraphrase_question: - node_type: llm + node_type: llm output_keys: rephrased_text prompt: - system: | Assume you are {persona1} persona. You are an assistant tasked with paraphrasing a user question. - user: | - QUESTION: {prompt}. Write the program in python. - model: - name: mistralai - parameters: + QUESTION: {prompt}. Write the program in python. + model: + name: mistralai + parameters: temperature: 1.0 generate_answer: - node_type: llm + node_type: llm prompt: - system: | You are an assistant tasked with solving python coding problems. - user: | - {prompt} - model: + {prompt} + model: name: gpt-4o # Must match a model defined in config/models.yaml parameters: # Override default parameters from models.yaml temperature: 0.1 - + critique_answer: pre_process: tasks.mbpp.code_generation_with_graph_builder.task_executor.CritiqueAnsNodePreProcessor - node_type: llm - output_role: user - prompt: - - system: | + node_type: llm + output_role: user + prompt: + - system: | You are a teacher grading a solution to a python coding problem. - - QUESTION: {prompt} + + QUESTION: {prompt} TEST CASES: {test_list} - model: + model: name: gpt-4o - parameters: + parameters: temperature: 1.0 edges: @@ -647,17 +647,17 @@ output_config: Schema validator enables users to ensure correctness of generated data before uploading to HF or File System. -Key features supported for schema validation are as follows: +Key features supported for schema validation are as follows: 1. 
**YAML based schema check:** Users can define their schema using YAML config files in the following ways:- - Define a custom schema class inside `custom_schemas.py` and add it's path in `schema` key inside `schema_config`. - Add expected schema config in a list of dict format inside `fields` key inside `schema_config`. - + 2. **Rule based validation support:** Aside from adding validator rules inside custom class, users can choose from validation methods supported(details in additional validation rules section) and add it as a key for a particular field's dict. - + ### Usage Illustration -Let's assume we have the following record generated which we want to validate: +Let's assume we have the following record generated which we want to validate: ```json { @@ -691,8 +691,8 @@ Let's assume we have the following record generated which we want to validate: ] } ``` -For the above record, user can have the following class defined inside `custom_schemas.py` defining the -expected keys and values along with additional validation rules if any. +For the above record, user can have the following class defined inside `custom_schemas.py` defining the +expected keys and values along with additional validation rules if any. ```python class CustomUserSchema(BaseModel): @@ -719,7 +719,7 @@ class CustomUserSchema(BaseModel): schema_config: schema: sygra.validators.custom_schemas.CustomUserSchema ``` -#### Sample YAML configuration to define schema in YAML: +#### Sample YAML configuration to define schema in YAML: ```yaml schema_config: @@ -739,12 +739,12 @@ schema_config: type: list[str] ``` Note that `fields` is expected to be a list of dicts with `name` and `type` present in each dict with additional option -of providing validation key. In the above example `is_greater_than` is a validation key shown for demonstration purpose -to ensure `id` key in each record has a value with 6 digits or more. +of providing validation key. In the above example `is_greater_than` is a validation key shown for demonstration purpose +to ensure `id` key in each record has a value with 6 digits or more. ## Post Generation Tasks -Post generation tasks are tasks that are executed after the graph has been executed. These tasks can be used to perform additional processing on the generated data, such as **OASST Mapper** and **Data Quality** tagging. +Post generation tasks are tasks that are executed after the graph has been executed. These tasks can be used to perform additional processing on the generated data, such as **OASST Mapper** and **Data Quality** tagging. ### `Data mapper` or `oasst_mapper` @@ -799,3 +799,19 @@ class StatsCollatorPostProcessor(GraphPostProcessor): Each post processor persists the the processed data into file with name prefixed with the name of the post processor. For example Stats file name for `StatsCollatorPostProcessor` mentioned above will be prefixed with `StatsCollatorPostProcessor_.*` +## Semantic Deduplication + +Semantic deduplication removes near-duplicate generated outputs by comparing embeddings of a configured field and dropping items whose cosine similarity is above a threshold. 
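+
+For intuition, the underlying check is plain cosine similarity over normalized embedding vectors. A minimal, illustrative sketch (with made-up vectors, not the processor's actual code path):
+
+```python
+import numpy as np
+
+# Hypothetical L2-normalized embeddings for two generated answers
+a = np.array([0.6, 0.8], dtype=np.float32)
+b = np.array([0.8, 0.6], dtype=np.float32)
+
+similarity = float(a @ b)  # cosine similarity equals the dot product for unit vectors
+is_duplicate = similarity >= 0.92  # plays the role of `similarity_threshold`
+```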
+ +Feature documentation: +[Semantic Deduplication](../features/semantic_deduplication.md) + +Minimal configuration: + +```yaml +graph_post_process: + - processor: sygra.core.graph.graph_postprocessor.SemanticDedupPostProcessor + params: + field: answer + similarity_threshold: 0.92 +``` diff --git a/sygra/core/graph/graph_postprocessor.py b/sygra/core/graph/graph_postprocessor.py index 65950eed..f34b6b3b 100644 --- a/sygra/core/graph/graph_postprocessor.py +++ b/sygra/core/graph/graph_postprocessor.py @@ -4,6 +4,8 @@ from typing import TYPE_CHECKING, Any, Optional, cast import numpy as np +from langchain_core.embeddings import Embeddings +from langchain_core.vectorstores import InMemoryVectorStore from sygra.logger.logger_config import logger @@ -24,6 +26,35 @@ def process(self, data: list, metadata: dict) -> list: class SemanticDedupPostProcessor(GraphPostProcessor): + """Semantic deduplication over a graph's output list. + + This post-processor removes near-duplicate items by embedding the configured + `field` and dropping items whose cosine similarity is above + `similarity_threshold`. + + Configuration is done via `graph_post_process` in the task `graph_config.yaml`. + Two working examples are in: + - `tasks/examples/semantic_dedup/graph_config.yaml` (dedup over `answer`) + - `tasks/examples/semantic_dedup_no_seed/graph_config.yaml` (dedup over `description`) + + Parameters (YAML `params` -> constructor args): + - `field`: Name of the key in each output item to embed and compare. + If the value is a list/tuple, values are joined with newlines. + - `similarity_threshold`: Cosine similarity threshold (higher => fewer drops). + - `id_field`: Optional ID key used in the report for readability. + - `embedding_backend`: Currently only `sentence_transformers` is supported. + - `embedding_model`: SentenceTransformers model name (default: `all-MiniLM-L6-v2`). + - `report_filename`: Optional report JSON filename. If relative, it is written + next to the graph output file. If omitted, the report name is derived from + the output file name. + - `keep`: Which item to keep when duplicates are found: `first` or `last`. + - `max_pairs_in_report`: Max number of duplicate pairs written to the report. + + Output report: + - If `metadata["output_file"]` is set, a JSON report is written containing + counts and sampled duplicate pairs. + """ + def __init__( self, field: str = "text", @@ -34,6 +65,8 @@ def __init__( report_filename: Optional[str] = None, keep: str = "first", max_pairs_in_report: int = 2000, + dedup_mode: str = "nearest_neighbor", + vectorstore_k: int = 20, ): self.field = field self.similarity_threshold = float(similarity_threshold) @@ -43,6 +76,8 @@ def __init__( self.report_filename = report_filename self.keep = keep self.max_pairs_in_report = int(max_pairs_in_report) + self.dedup_mode = dedup_mode + self.vectorstore_k = int(vectorstore_k) self._embedder: Optional["SentenceTransformer"] = None @@ -87,6 +122,78 @@ def _resolve_report_path(self, output_file: str) -> str: report_base = base.replace("output_", "semantic_dedup_report_", 1) return os.path.join(os.path.dirname(output_file), report_base) + def _dedup_via_langchain_vectorstore( + self, + data: list, + texts: list[str], + embs: np.ndarray, + ) -> tuple[list, list[int], list[dict[str, Any]]]: + + if self.keep == "last": + iter_indices = range(len(data) - 1, -1, -1) + else: + iter_indices = range(len(data)) + + # Use unique keys as "texts" so the embedding cache is unambiguous even if + # multiple records share identical content. 
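+        # `vec_map` lets the store "embed" by dictionary lookup, so each text is
+        # encoded exactly once up front and never re-encoded during search.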
+ keys = [f"__sygra_semantic_dedup__{i}__" for i in range(len(texts))] + vec_map: dict[str, np.ndarray] = {keys[i]: embs[i] for i in range(len(keys))} + + class _CachedEmbeddings(Embeddings): + def embed_documents(self, texts: list[str]) -> list[list[float]]: + return [[float(x) for x in vec_map[t]] for t in texts] + + def embed_query(self, text: str) -> list[float]: + return [float(x) for x in vec_map[text]] + + store = InMemoryVectorStore(embedding=_CachedEmbeddings()) + + kept: list[int] = [] + duplicate_pairs: list[dict[str, Any]] = [] + + # Incrementally add only kept items to the store. + for i in iter_indices: + key = keys[i] + if kept: + try: + docs = store.similarity_search(key, k=max(1, self.vectorstore_k)) + except Exception: + docs = [] + + best_sim = -1.0 + best_kept_idx: Optional[int] = None + for d in docs: + idx = d.metadata.get("idx") + if idx is None: + continue + sim = float(embs[int(idx)] @ embs[i]) + if sim > best_sim: + best_sim = sim + best_kept_idx = int(idx) + + if best_kept_idx is not None and best_sim >= self.similarity_threshold: + if len(duplicate_pairs) < self.max_pairs_in_report: + duplicate_pairs.append( + { + "kept_index": int(best_kept_idx), + "dropped_index": int(i), + "kept_id": self._get_id( + data[int(best_kept_idx)], str(best_kept_idx) + ), + "dropped_id": self._get_id(data[int(i)], str(i)), + "similarity": float(best_sim), + } + ) + continue + + store.add_texts([key], metadatas=[{"idx": int(i)}]) + + kept.append(i) + + kept_sorted = sorted(kept) if self.keep == "last" else kept + deduped = [data[i] for i in kept_sorted] + return deduped, kept_sorted, duplicate_pairs + def process(self, data: list, metadata: dict) -> list: if not data: return data @@ -104,53 +211,68 @@ def process(self, data: list, metadata: dict) -> list: embs = embedder.encode(texts, normalize_embeddings=True) embs = np.asarray(embs, dtype=np.float32) - sims = self._cosine_sim_matrix(embs) - - kept: list[int] = [] - dropped: set[int] = set() duplicate_pairs: list[dict[str, Any]] = [] - def should_keep(i: int, j: int) -> bool: - # i and j are indices where sim(i, j) is above threshold and j is candidate duplicate. - # keep='first' means keep earlier index, drop later one. 
- if self.keep == "first": - return i < j - if self.keep == "last": - return i > j - return i < j + if self.dedup_mode == "nearest_neighbor": + deduped, kept_sorted, duplicate_pairs = self._dedup_via_langchain_vectorstore( + data=data, + texts=texts, + embs=embs, + ) + elif self.dedup_mode == "all_pairs": + sims = self._cosine_sim_matrix(embs) - # Greedy pass: keep an item if it isn't too-similar to a previously kept one - for i in range(len(data)): - if i in dropped: - continue + kept: list[int] = [] + dropped: set[int] = set() - kept.append(i) - # mark duplicates of i - row = sims[i] - dup_indices = np.where(row >= self.similarity_threshold)[0] - for j in dup_indices: - if j == i: - continue - if j in dropped: - continue - if j in kept: + # Greedy pass: keep an item if it isn't too-similar to a previously kept one + if self.keep == "last": + iter_indices = range(len(data) - 1, -1, -1) + else: + iter_indices = range(len(data)) + + for i in iter_indices: + if i in dropped: continue - # Decide drop based on keep strategy - if should_keep(i, int(j)): - dropped.add(int(j)) - if len(duplicate_pairs) < self.max_pairs_in_report: - duplicate_pairs.append( - { - "kept_index": i, - "dropped_index": int(j), - "kept_id": self._get_id(data[i], str(i)), - "dropped_id": self._get_id(data[int(j)], str(j)), - "similarity": float(row[int(j)]), - } - ) + kept.append(i) + # mark duplicates of i + row = sims[i] + dup_indices = np.where(row >= self.similarity_threshold)[0] + for j in dup_indices: + if j == i: + continue + if j in dropped: + continue + if j in kept: + continue + + should_drop = False + if self.keep == "first": + should_drop = int(j) > i + elif self.keep == "last": + should_drop = int(j) < i + else: + should_drop = int(j) > i + + if should_drop: + dropped.add(int(j)) + if len(duplicate_pairs) < self.max_pairs_in_report: + duplicate_pairs.append( + { + "kept_index": i, + "dropped_index": int(j), + "kept_id": self._get_id(data[i], str(i)), + "dropped_id": self._get_id(data[int(j)], str(j)), + "similarity": float(row[int(j)]), + } + ) + + kept_sorted = sorted(kept) if self.keep == "last" else kept + deduped = [data[i] for i in kept_sorted] - deduped = [data[i] for i in kept] + else: + raise ValueError(f"Unsupported dedup_mode: {self.dedup_mode}") report = { "processor": "SemanticDedupPostProcessor", @@ -160,6 +282,8 @@ def should_keep(i: int, j: int) -> bool: "embedding_model": self.embedding_model, "similarity_threshold": self.similarity_threshold, "keep": self.keep, + "dedup_mode": self.dedup_mode, + "vectorstore_k": self.vectorstore_k, "input_count": len(data), "output_count": len(deduped), "dropped_count": len(data) - len(deduped), diff --git a/tasks/examples/semantic_dedup/graph_config.yaml b/tasks/examples/semantic_dedup/graph_config.yaml index 0af1888a..93024181 100644 --- a/tasks/examples/semantic_dedup/graph_config.yaml +++ b/tasks/examples/semantic_dedup/graph_config.yaml @@ -1,7 +1,7 @@ data_config: source: type: "disk" - file_path: "tasks/examples/semantic_dedup_demo/input.json" + file_path: "tasks/examples/semantic_dedup/input.json" file_format: "json" graph_config: @@ -30,6 +30,7 @@ graph_post_process: params: field: answer similarity_threshold: 0.92 + dedup_mode: all_pairs id_field: id embedding_backend: sentence_transformers embedding_model: all-MiniLM-L6-v2 diff --git a/tasks/examples/semantic_dedup_no_seed/task_executor.py b/tasks/examples/semantic_dedup_no_seed/task_executor.py index 520ae36e..feccadad 100644 --- a/tasks/examples/semantic_dedup_no_seed/task_executor.py +++ 
b/tasks/examples/semantic_dedup_no_seed/task_executor.py @@ -18,11 +18,13 @@ def parse_response_as_json(s): if not p: logger.error("No json string found: " + e.msg) logger.error(s) + return {} try: return json.loads(p[0]) except json.decoder.JSONDecodeError as e: logger.error("Unable to parse json string: " + e.msg) logger.error(s) + return {} class GenerateIncidentDataPostProcessor(NodePostProcessorWithState): diff --git a/tests/core/graph/test_graph_postprocessor.py b/tests/core/graph/test_graph_postprocessor.py new file mode 100644 index 00000000..803dba01 --- /dev/null +++ b/tests/core/graph/test_graph_postprocessor.py @@ -0,0 +1,154 @@ +import json +import sys +from pathlib import Path + +import numpy as np +import pytest + +sys.path.append(str(Path(__file__).parent.parent.parent.parent)) + +from sygra.core.graph.graph_postprocessor import SemanticDedupPostProcessor + + +class DummyEmbedder: + def __init__(self, vectors: list[list[float]]): + self._vectors = np.asarray(vectors, dtype=np.float32) + self.last_texts: list[str] | None = None + + def encode(self, texts: list[str], normalize_embeddings: bool = True): + self.last_texts = list(texts) + return self._vectors + + +class TestSemanticDedupPostProcessor: + def test_keep_first_drops_later_duplicate(self, monkeypatch): + processor = SemanticDedupPostProcessor(field="text", similarity_threshold=0.9, keep="first") + embedder = DummyEmbedder([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) + monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + + data = [ + {"id": "a", "text": "dup"}, + {"id": "b", "text": "dup"}, + {"id": "c", "text": "unique"}, + ] + + out = processor.process(data, metadata={}) + assert [x["id"] for x in out] == ["a", "c"] + + def test_keep_last_drops_earlier_duplicate(self, monkeypatch): + processor = SemanticDedupPostProcessor(field="text", similarity_threshold=0.9, keep="last") + embedder = DummyEmbedder([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) + monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + + data = [ + {"id": "a", "text": "dup"}, + {"id": "b", "text": "dup"}, + {"id": "c", "text": "unique"}, + ] + + out = processor.process(data, metadata={}) + assert [x["id"] for x in out] == ["b", "c"] + + def test_matrix_mode_works(self, monkeypatch): + processor = SemanticDedupPostProcessor( + field="text", + similarity_threshold=0.9, + keep="first", + dedup_mode="all_pairs", + ) + embedder = DummyEmbedder([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) + monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + + data = [ + {"id": "a", "text": "dup"}, + {"id": "b", "text": "dup"}, + {"id": "c", "text": "unique"}, + ] + + out = processor.process(data, metadata={}) + assert [x["id"] for x in out] == ["a", "c"] + + def test_invalid_dedup_mode_raises(self, monkeypatch): + embedder = DummyEmbedder([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) + + data = [ + {"id": "a", "text": "dup"}, + {"id": "b", "text": "dup"}, + {"id": "c", "text": "unique"}, + ] + + for mode in ["not_a_real_mode", "matrix", "vectorstore"]: + processor = SemanticDedupPostProcessor( + field="text", + similarity_threshold=0.9, + dedup_mode=mode, + ) + monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + with pytest.raises(ValueError, match="Unsupported dedup_mode"): + _ = processor.process(data, metadata={}) + + def test_field_list_is_joined_before_embedding(self, monkeypatch): + processor = SemanticDedupPostProcessor(field="parts", similarity_threshold=0.9) + embedder = DummyEmbedder([[1.0, 0.0], [0.0, 1.0]]) + 
monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + + data = [ + {"id": "a", "parts": ["hello", "world"]}, + {"id": "b", "parts": ("x", None)}, + ] + + _ = processor.process(data, metadata={}) + assert embedder.last_texts == ["hello\nworld", "x\n"] + + def test_writes_default_report_next_to_output_file(self, monkeypatch, tmp_path): + processor = SemanticDedupPostProcessor( + field="text", + similarity_threshold=0.9, + max_pairs_in_report=10, + ) + embedder = DummyEmbedder([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) + monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + + data = [ + {"id": "a", "text": "dup"}, + {"id": "b", "text": "dup"}, + {"id": "c", "text": "unique"}, + ] + + output_file = tmp_path / "output_2026-01-12_11-15-00.json" + _ = processor.process(data, metadata={"output_file": str(output_file)}) + + report_file = tmp_path / "semantic_dedup_report_2026-01-12_11-15-00.json" + assert report_file.exists() + + report = json.loads(report_file.read_text()) + assert report["input_count"] == 3 + assert report["output_count"] == 2 + assert report["dropped_count"] == 1 + assert report["pairs_reported"] == 1 + assert len(report["duplicates"]) == 1 + + def test_respects_custom_report_filename(self, monkeypatch, tmp_path): + processor = SemanticDedupPostProcessor( + field="text", + similarity_threshold=0.9, + report_filename="my_report.json", + max_pairs_in_report=10, + ) + embedder = DummyEmbedder([[1.0, 0.0], [1.0, 0.0], [0.0, 1.0]]) + monkeypatch.setattr(processor, "_get_embedder", lambda: embedder) + + data = [ + {"id": "a", "text": "dup"}, + {"id": "b", "text": "dup"}, + {"id": "c", "text": "unique"}, + ] + + output_file = tmp_path / "output_any.json" + _ = processor.process(data, metadata={"output_file": str(output_file)}) + + report_file = tmp_path / "my_report.json" + assert report_file.exists() + + report = json.loads(report_file.read_text()) + assert report["processor"] == "SemanticDedupPostProcessor" From 91111737d267ac6c07bedec5be44732f4d938289 Mon Sep 17 00:00:00 2001 From: "sriram.puttagunta" Date: Wed, 14 Jan 2026 13:03:58 +0530 Subject: [PATCH 3/3] Added Documentation page to mkdocs --- mkdocs.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/mkdocs.yml b/mkdocs.yml index a65ce6a0..1dab76da 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -83,6 +83,7 @@ nav: - Structured Output: concepts/structured_output/README.md - Features: - Metadata Tracking: features/metadata_tracking.md + - Semantic Deduplication: features/semantic_deduplication.md - Tutorials: - Agent Simulation: tutorials/agent_simulation_tutorial.md - Agent Simulation with Tools: tutorials/agent_tool_simulation_tutorial.md