Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions corehq/ex-submodules/pillowtop/processors/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def process_change(self, change):
get_doc_meta_object_from_document,
)

if self.change_filter_fn and self.change_filter_fn(change):
if self.change_filter_fn(change):
return

if change.deleted and change.id:
Expand Down Expand Up @@ -101,7 +101,7 @@ def process_change(self, change):
ensure_matched_revisions(change, doc)

with self._datadog_timing('transform'):
if doc is None or (self.doc_filter_fn and self.doc_filter_fn(doc)):
if doc is None or self.doc_filter_fn(doc):
return

if doc.get('doc_type') is not None and doc['doc_type'].endswith("-Deleted"):
Expand Down
27 changes: 7 additions & 20 deletions corehq/pillows/case_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from corehq.apps.es.client import manager
from corehq.apps.geospatial.utils import get_geo_case_property
from corehq.form_processor.backends.sql.dbaccessors import CaseReindexAccessor
from corehq.pillows.base import is_couch_change_for_sql_domain
from corehq.util.doc_processor.sql import SqlDocumentProvider
from corehq.util.log import get_traceback_string
from corehq.util.quickcache import quickcache
Expand All @@ -31,9 +30,8 @@
get_checkpoint_for_elasticsearch_pillow,
)
from pillowtop.es_utils import initialize_index_and_mapping
from pillowtop.feed.interface import Change
from pillowtop.pillow.interface import ConstructedPillow
from pillowtop.processors.elastic import ElasticProcessor
from pillowtop.processors.elastic import BulkElasticProcessor
from pillowtop.reindexer.change_providers.case import (
get_domain_case_change_provider,
)
Expand Down Expand Up @@ -107,22 +105,14 @@ def _add_gps_smart_types(dynamic_properties, gps_props):
prop[GEOPOINT_VALUE] = None


class CaseSearchPillowProcessor(ElasticProcessor):
class CaseSearchPillowProcessor(BulkElasticProcessor):

def process_change(self, change):
assert isinstance(change, Change)
if self.change_filter_fn and self.change_filter_fn(change):
return

if change.metadata is not None:
# Comes from KafkaChangeFeed (i.e. running pillowtop)
def __init__(self, adapter):
def change_filter_fn(change):
domain = change.metadata.domain
else:
# comes from ChangeProvider (i.e reindexing)
domain = change.get_document()['domain']
return not (domain and domain_needs_search_index(domain))

if domain and domain_needs_search_index(domain):
super(CaseSearchPillowProcessor, self).process_change(change)
super().__init__(adapter, change_filter_fn=change_filter_fn)


def get_case_search_processor():
Expand All @@ -134,10 +124,7 @@ def get_case_search_processor():
Writes to:
- Case Search ES index
"""
return CaseSearchPillowProcessor(
adapter=case_search_adapter,
change_filter_fn=is_couch_change_for_sql_domain
)
return CaseSearchPillowProcessor(adapter=case_search_adapter)


def _fail_gracefully_and_tell_admins():
Expand Down
Empty file.
57 changes: 57 additions & 0 deletions corehq/pillows/tests/test_case_search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from unittest.mock import patch

from attrs import define, field

from pillowtop.processors import elastic

from .. import case_search as mod


class TestCaseSearchPillowProcessor:

def test_process_changes_chunk_excludes_irrelevant_changes(self):
changes = [FakeChange(0), FakeChange(1), FakeChange(2), FakeChange(3)]
adapter = FakeAdapter()
proc = mod.CaseSearchPillowProcessor(adapter)

def needs_search_index(domain_is_fake_change_id):
return domain_is_fake_change_id < 3

with (
patch.object(mod, 'domain_needs_search_index', needs_search_index),
patch.object(elastic, 'bulk_fetch_changes_docs', lambda chs: ([], [])),
patch.object(elastic, 'build_bulk_payload', lambda chs, *args: chs),
):
retries, errors = proc.process_changes_chunk(changes)

# Parts of change_filter_fn tested:
# excluded: FakeChange(0).metadata.domain is False-ish
# excluded: domain_needs_search_index(FakeChange(3).metadata.domain) is False
assert adapter.bulk_calls == [[FakeChange(1), FakeChange(2)]]
assert not retries, 'excluded changes should not be retried'
assert not errors


@define
class FakeChange:
id = field()
document = True # processed by doc_filter_fn, which is noop_filter

@property
def metadata(self):
return FakeMetadata(self.id)


@define
class FakeMetadata:
domain = field()


@define
class FakeAdapter:
index_name = "fake"
bulk_calls = field(factory=list)

def bulk(self, actions, raise_errors):
self.bulk_calls.append(actions)
return len(actions), []
Loading