diff --git a/corehq/ex-submodules/pillowtop/processors/elastic.py b/corehq/ex-submodules/pillowtop/processors/elastic.py index 3e5d34c7f427..feef87ef233a 100644 --- a/corehq/ex-submodules/pillowtop/processors/elastic.py +++ b/corehq/ex-submodules/pillowtop/processors/elastic.py @@ -67,7 +67,7 @@ def process_change(self, change): get_doc_meta_object_from_document, ) - if self.change_filter_fn and self.change_filter_fn(change): + if self.change_filter_fn(change): return if change.deleted and change.id: @@ -101,7 +101,7 @@ def process_change(self, change): ensure_matched_revisions(change, doc) with self._datadog_timing('transform'): - if doc is None or (self.doc_filter_fn and self.doc_filter_fn(doc)): + if doc is None or self.doc_filter_fn(doc): return if doc.get('doc_type') is not None and doc['doc_type'].endswith("-Deleted"): diff --git a/corehq/pillows/case_search.py b/corehq/pillows/case_search.py index e21b2137173a..cbfc64d5f093 100644 --- a/corehq/pillows/case_search.py +++ b/corehq/pillows/case_search.py @@ -20,7 +20,6 @@ from corehq.apps.es.client import manager from corehq.apps.geospatial.utils import get_geo_case_property from corehq.form_processor.backends.sql.dbaccessors import CaseReindexAccessor -from corehq.pillows.base import is_couch_change_for_sql_domain from corehq.util.doc_processor.sql import SqlDocumentProvider from corehq.util.log import get_traceback_string from corehq.util.quickcache import quickcache @@ -31,9 +30,8 @@ get_checkpoint_for_elasticsearch_pillow, ) from pillowtop.es_utils import initialize_index_and_mapping -from pillowtop.feed.interface import Change from pillowtop.pillow.interface import ConstructedPillow -from pillowtop.processors.elastic import ElasticProcessor +from pillowtop.processors.elastic import BulkElasticProcessor from pillowtop.reindexer.change_providers.case import ( get_domain_case_change_provider, ) @@ -107,22 +105,14 @@ def _add_gps_smart_types(dynamic_properties, gps_props): prop[GEOPOINT_VALUE] = None -class CaseSearchPillowProcessor(ElasticProcessor): +class CaseSearchPillowProcessor(BulkElasticProcessor): - def process_change(self, change): - assert isinstance(change, Change) - if self.change_filter_fn and self.change_filter_fn(change): - return - - if change.metadata is not None: - # Comes from KafkaChangeFeed (i.e. running pillowtop) + def __init__(self, adapter): + def change_filter_fn(change): domain = change.metadata.domain - else: - # comes from ChangeProvider (i.e reindexing) - domain = change.get_document()['domain'] + return not (domain and domain_needs_search_index(domain)) - if domain and domain_needs_search_index(domain): - super(CaseSearchPillowProcessor, self).process_change(change) + super().__init__(adapter, change_filter_fn=change_filter_fn) def get_case_search_processor(): @@ -134,10 +124,7 @@ def get_case_search_processor(): Writes to: - Case Search ES index """ - return CaseSearchPillowProcessor( - adapter=case_search_adapter, - change_filter_fn=is_couch_change_for_sql_domain - ) + return CaseSearchPillowProcessor(adapter=case_search_adapter) def _fail_gracefully_and_tell_admins(): diff --git a/corehq/pillows/tests/__init__.py b/corehq/pillows/tests/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/corehq/pillows/tests/test_case_search.py b/corehq/pillows/tests/test_case_search.py new file mode 100644 index 000000000000..c07a4258adc4 --- /dev/null +++ b/corehq/pillows/tests/test_case_search.py @@ -0,0 +1,57 @@ +from unittest.mock import patch + +from attrs import define, field + +from pillowtop.processors import elastic + +from .. import case_search as mod + + +class TestCaseSearchPillowProcessor: + + def test_process_changes_chunk_excludes_irrelevant_changes(self): + changes = [FakeChange(0), FakeChange(1), FakeChange(2), FakeChange(3)] + adapter = FakeAdapter() + proc = mod.CaseSearchPillowProcessor(adapter) + + def needs_search_index(domain_is_fake_change_id): + return domain_is_fake_change_id < 3 + + with ( + patch.object(mod, 'domain_needs_search_index', needs_search_index), + patch.object(elastic, 'bulk_fetch_changes_docs', lambda chs: ([], [])), + patch.object(elastic, 'build_bulk_payload', lambda chs, *args: chs), + ): + retries, errors = proc.process_changes_chunk(changes) + + # Parts of change_filter_fn tested: + # excluded: FakeChange(0).metadata.domain is False-ish + # excluded: domain_needs_search_index(FakeChange(3).metadata.domain) is False + assert adapter.bulk_calls == [[FakeChange(1), FakeChange(2)]] + assert not retries, 'excluded changes should not be retried' + assert not errors + + +@define +class FakeChange: + id = field() + document = True # processed by doc_filter_fn, which is noop_filter + + @property + def metadata(self): + return FakeMetadata(self.id) + + +@define +class FakeMetadata: + domain = field() + + +@define +class FakeAdapter: + index_name = "fake" + bulk_calls = field(factory=list) + + def bulk(self, actions, raise_errors): + self.bulk_calls.append(actions) + return len(actions), []