
Commit d4b6bf8

feature: get started with advanced worker

1 parent 244012d

File tree: 14 files changed, +705 −30 lines

.data/.gitignore

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,2 +1,2 @@
 *
-!.gitignore
+!.datashare
```

.data/datashare/.gitignore

Lines changed: 2 additions & 0 deletions

```diff
@@ -0,0 +1,2 @@
+*
+!.gitignore
```

docker-compose.yml

Lines changed: 11 additions & 7 deletions

```diff
@@ -5,18 +5,22 @@ services:
     build:
       context: .
       target: worker
-    # deploy:
-    #   mode: replicated
-    #   replicas: 2
-    # depends_on:
-    #   datashare_web:
-    #     condition: service_started
+    deploy:
+      mode: replicated
+      replicas: 2
+    depends_on:
+      datashare_web:
+        condition: service_started
     environment:
       DS_DOCKER_ML_LOG_LEVEL: DEBUG
       DS_DOCKER_ES_ADDRESS: http://elasticsearch:9200
       ICIJ_WORKER_TYPE: amqp
       ICIJ_WORKER_RABBITMQ_HOST: rabbitmq
       ICIJ_WORKER_RABBITMQ_PORT: 5672
+    volumes:
+      - type: bind
+        source: ${PWD}/.data
+        target: /home/user/src/app/.data
 
   # Adding rabbitmq to distribute Datashare tasks
   rabbitmq:
@@ -50,7 +54,7 @@ services:
       - DS_DOCKER_MOUNTED_DATA_DIR=${PWD}/data
     volumes:
       - type: bind
-        source: ${PWD}/.data
+        source: ${PWD}/.data/datashare
         target: /home/datashare/Datashare
     depends_on:
       redis:
```

docs/get-started/implement/index.md

Lines changed: 1 addition & 1 deletion

```diff
@@ -1,4 +1,4 @@
-# How to use the worker template ?
+# How to use the worker template repo?
 
 The [datashare-python](https://github.com/ICIJ/datashare-python) repository is meant to be used as a template to implement your own Datashare worker.
```

Lines changed: 213 additions & 6 deletions

# Advanced Datashare worker

In this section, we'll augment the worker template app (translation and classification) with a
[vector store](https://en.wikipedia.org/wiki/Vector_database), allowing us to perform semantic similarity searches
between queries and Datashare docs.

Make sure you've followed the [basic worker example](worker-basic.md) to understand the basics!

## Clone the template repository

Start over and clone the [template repository](https://github.com/ICIJ/datashare-python) once again:

<!-- termynal -->
```console
$ git clone git@github.com:ICIJ/datashare-python.git
---> 100%
```

## Install extra dependencies

We'll use [LanceDB](https://lancedb.github.io/lancedb/) to implement our vector store, so we need to add it, as well as
[sentence-transformers](https://github.com/UKPLab/sentence-transformers), to our dependencies:

<!-- termynal -->
```console
$ uv add lancedb sentence-transformers
---> 100%
```

!!! note
    In a production setup, since Elasticsearch implements its [own vector database](https://www.elastic.co/elasticsearch/vector-database),
    it might have been convenient to use it. For this example, we're using LanceDB as it's embedded and doesn't require
    any deployment update.

## Embedding Datashare documents

For demo purposes, we'll split the work of embedding docs into two tasks:

- `create_vectorization_tasks` scans the index, gets the IDs of Datashare docs, batches them, and creates `vectorize_docs` tasks
- `vectorize_docs` tasks (triggered by the `create_vectorization_tasks` task) receive doc IDs,
fetch the doc contents from the index, and add them to the vector database

!!! note
    We could have performed vectorization in a single task, but having a first task split a large workload into batches/chunks
    is a commonly used pattern to distribute heavy workloads across workers (learn more in the
    [task workflow guide](../../guides/task-workflows.md)).


### The `create_vectorization_tasks` task

The `create_vectorization_tasks` task is defined in the `tasks/vectorize.py` file as follows:
```python title="tasks/vectorize.py"
--8<--
vectorize.py:create_vectorization_tasks
--8<--
```

The function starts by creating a schema for our vector DB table using the convenient
[LanceDB embedding function](https://lancedb.github.io/lancedb/embeddings/embedding_functions/) feature,
which will automatically create the record's `vector` field from the provided source field (`content` in our case) using
our HuggingFace embedding model:
```python title="tasks/vectorize.py" hl_lines="2 6 7"
--8<--
vectorize.py:embedding-schema
--8<--
```
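
As a rough illustration, here's a minimal sketch of what such a schema can look like with LanceDB's embedding registry (the `make_record_schema` name and the `doc_id`/`content` fields come from the test file below; the actual implementation in `vectorize.py` may differ):

```python
from lancedb.embeddings import get_registry
from lancedb.pydantic import LanceModel, Vector


def make_record_schema(model: str) -> type[LanceModel]:
    # Sketch: build a sentence-transformers embedding function for the given model
    embedder = get_registry().get("sentence-transformers").create(name=model)

    class Record(LanceModel):
        doc_id: str
        # Source field: the text the vector is computed from
        content: str = embedder.SourceField()
        # Vector field: filled automatically from `content` on insert
        vector: Vector(embedder.ndims()) = embedder.VectorField()

    return Record
```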

We then (re)create a vector table using the **DB connection provided by dependency injection** (see the next section to learn more):
```python title="tasks/vectorize.py" hl_lines="4"
--8<--
vectorize.py:create-table
--8<--
```
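
Assuming LanceDB's async API, the (re)creation itself can be a one-liner; a sketch of the `recreate_vector_table` helper exercised in the tests (the `ds_docs` table name also comes from the test file):

```python
async def recreate_vector_table(vector_db, schema):
    # mode="overwrite" drops any existing table with the same name
    return await vector_db.create_table("ds_docs", schema=schema, mode="overwrite")
```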

Next, `create_vectorization_tasks` queries the index, matching all documents:
```python title="tasks/vectorize.py"
--8<--
vectorize.py:query-docs
--8<--
```
and scrolls through the result pages, creating batches of `batch_size` documents:
```python title="tasks/vectorize.py"
--8<--
vectorize.py:retrieve-docs
--8<--
```
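
The batching itself is plain Python; here's a minimal sketch of the pattern, independent of the ES client actually used:

```python
from typing import Iterable, Iterator, List


def iter_batches(doc_ids: Iterable[str], batch_size: int) -> Iterator[List[str]]:
    """Group scrolled doc IDs into fixed-size batches."""
    batch: List[str] = []
    for doc_id in doc_ids:
        batch.append(doc_id)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # don't drop the trailing partial batch
        yield batch
```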

Finally, for each batch, it spawns a vectorization task using the Datashare task client and returns the list of created tasks:
```python title="tasks/vectorize.py" hl_lines="5 6 7 8 10"
--8<--
vectorize.py:batch-vectorization
--8<--
```

### The `lifespan_vector_db` dependency injection

To avoid re-creating a DB connection each time the worker processes a task, we leverage
[dependency injection](../../guides/dependency-injection.md): the connection is created at startup and
retrieved inside our function.

This pattern is already used for the Elasticsearch client and the Datashare task client. To use it for the vector DB
connection, we'll need to update the
[dependencies.py](https://github.com/ICIJ/datashare-python/blob/main/ml_worker/tasks/dependencies.py) file.

First, we need to implement the dependency setup function:
```python title="dependencies.py" hl_lines="10 11"
--8<--
vector_db_dependencies.py:setup
--8<--
```

The function creates a connection to the vector DB located on the filesystem and stores the connection in a
global variable.

We then have to implement a function that makes this global available to the rest of the codebase:
```python title="dependencies.py" hl_lines="4"
--8<--
vector_db_dependencies.py:provide
--8<--
```
We also need to make sure the connection is properly closed when the worker stops, by implementing the dependency teardown.
We just call the `:::python AsyncConnection.__aexit__` method:
```python title="dependencies.py" hl_lines="2"
--8<--
vector_db_dependencies.py:teardown
--8<--
```
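
Put together, the three pieces might look roughly like this (a sketch assuming `lancedb.connect_async` and a module-level global; the DB path and function names are illustrative, see the linked `dependencies.py` for the real code):

```python
import lancedb
from lancedb import AsyncConnection

_VECTOR_DB: AsyncConnection | None = None


async def vector_db_setup(**_) -> None:
    # Setup: open the connection once, at worker startup
    global _VECTOR_DB
    _VECTOR_DB = await lancedb.connect_async(".data/vector.db")  # illustrative path


def lifespan_vector_db() -> AsyncConnection:
    # Provide: hand the live connection to task functions
    if _VECTOR_DB is None:
        raise RuntimeError("vector DB dependency was not set up")
    return _VECTOR_DB


async def vector_db_teardown(exc_type, exc_val, exc_tb) -> None:
    # Teardown: properly exit the connection when the worker stops
    await lifespan_vector_db().__aexit__(exc_type, exc_val, exc_tb)
```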

Read the [dependency injection guide](../../guides/dependency-injection.md) to learn more!

### The `vectorize_docs` task

Next, we implement the `vectorize_docs` task as follows:

```python title="tasks/vectorize.py"
--8<--
vectorize.py:vectorize_docs
--8<--
```

The task function starts by retrieving the batch's document contents, querying the index by doc ID:
```python title="tasks/vectorize.py" hl_lines="1-4"
--8<--
vectorize.py:retrieve-doc-content
--8<--
```

Finally, we add each doc's content to the vector DB table. Because we created the table using a schema and the
[embedding function](https://lancedb.github.io/lancedb/embeddings/embedding_functions/) feature, the embedding vector
will be automatically created from the `content` source field:
```python title="tasks/vectorize.py" hl_lines="5-7"
--8<--
vectorize.py:vectorization
--8<--
```
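
Since vectors are computed on insert, the addition itself can be as simple as this sketch (the `doc_id`/`content` record keys and the `ds_docs` table name come from the test file; the helper name is hypothetical):

```python
async def add_docs(vector_db, doc_ids, contents):
    # Pair each doc ID with its content; the "vector" column is
    # computed automatically by the schema's embedding function
    records = [
        {"doc_id": doc_id, "content": content}
        for doc_id, content in zip(doc_ids, contents)
    ]
    table = await vector_db.open_table("ds_docs")
    await table.add(records)
    return len(records)
```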

## Semantic similarity search

Now that we've built a vector store from Datashare's docs, we need to query it. Let's create a `find_most_similar`
task which finds the most similar docs for a provided set of queries.

The task function starts by loading the embedding model and vectorizing the input queries:

```python title="tasks/vectorize.py" hl_lines="13-14"
--8<--
vectorize.py:find_most_similar
--8<--
```
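
Query vectorization with sentence-transformers boils down to something like this sketch (model name as in the tests):

```python
from sentence_transformers import SentenceTransformer

# Load the same model that was used to embed the docs
embedder = SentenceTransformer("BAAI/bge-small-en-v1.5")
queries = ["doc about books", "doc speaking about animal"]
query_vectors = embedder.encode(queries)  # one vector per query
```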

It then performs a [hybrid search](https://lancedb.github.io/lancedb/hybrid_search/hybrid_search/), using both the
input query vector and its text:

```python title="tasks/vectorize.py" hl_lines="4-11"
--8<--
vectorize.py:hybrid-search
--8<--
```
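
With the async API, such a hybrid query might look like this sketch (assuming the query builder's `nearest_to`/`nearest_to_text` methods; the exact calls in `vectorize.py` may differ):

```python
async def search_one(table, query: str, query_vector, n_similar: int):
    # Combine a vector (semantic) search with a full-text search on the query
    return await (
        table.query()
        .nearest_to(query_vector)  # vector side of the hybrid search
        .nearest_to_text(query)    # text side of the hybrid search
        .limit(n_similar)
        .to_list()
    )
```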

## Registering the new tasks

In order to turn our functions into Datashare [tasks](../../learn/concepts-basic.md#tasks), we have to register them into the
`:::python app` [async app](../../learn/concepts-basic.md#app) variable of the
[app.py](https://github.com/ICIJ/datashare-python/blob/main/ml_worker/app.py) file, using the `:::python @task` decorator:

```python title="app.py" hl_lines="16 17 18 19 20 25 32 37"
--8<--
vectorize_app.py:vectorize-app
--8<--
```
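
Schematically, registration is just decoration (a sketch; the app construction and the decorator's exact signature are assumptions, check `app.py` for the real code):

```python
from icij_worker import AsyncApp  # assumption: the app is an icij-worker AsyncApp

app = AsyncApp("ml")  # illustrative name


@app.task(name="find_most_similar")  # assumed decorator signature
async def find_most_similar(queries, model, n_similar=1, **deps):
    ...
```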

## Testing

Finally, we implement some tests in the `tests/tasks/test_vectorize.py` file:

```python title="tests/tasks/test_vectorize.py"
--8<--
test_vectorize.py:test-vectorize
--8<--
```

We can then run the tests, after starting the test services using the `ml-worker` Docker Compose wrapper:
<!-- termynal -->
```console
$ ./ml-worker up -d postgresql redis elasticsearch rabbitmq datashare_web
$ uv run --frozen pytest ml_worker/tests/tasks/test_vectorize.py
===== test session starts =====
collected 3 items

ml_worker/tests/tasks/test_vectorize.py ... [100%]

====== 3 passed in 6.87s ======
```

## Summary

We've successfully added a vector store to Datashare!

Rather than copy-pasting the above code blocks, you can replace/update your codebase with the following files:

- [ml_worker/tasks/vectorize.py](https://github.com/ICIJ/datashare-python/blob/main/docs/src/vectorize.py)
- [ml_worker/tasks/dependencies.py](https://github.com/ICIJ/datashare-python/blob/main/docs/src/vector_db_dependencies.py)
- [ml_worker/app.py](https://github.com/ICIJ/datashare-python/blob/main/docs/src/vectorize_app.py)
- [ml_worker/tests/tasks/test_vectorize.py](https://github.com/ICIJ/datashare-python/blob/main/docs/src/test_vectorize.py)

docs/get-started/implement/worker-basic.md

Lines changed: 2 additions & 1 deletion

```diff
@@ -25,7 +25,8 @@ basic_app.py:hello_user_fn
 ## Register the `hello_user` task
 
 In order to turn our function into a Datashare [task](../../learn/concepts-basic.md#tasks), we have to register it into the
-`:::python app` [async app](../../learn/concepts-basic.md#app) variable of the [app.py](../../../ml_worker/app.py) file, using the `:::python @task` decorator.
+`:::python app` [async app](../../learn/concepts-basic.md#app) variable of the
+[app.py](https://github.com/ICIJ/datashare-python/blob/main/ml_worker/app.py) file, using the `:::python @task` decorator.
 
 Since we won't use existing tasks, we can also perform some cleaning and get rid of them.
 The `app.py` file should hence look like this:
```

docs/src/test_vectorize.py

Lines changed: 101 additions & 0 deletions

```python
# pylint: disable=redefined-outer-name
# --8<-- [start:test-vectorize]
from pathlib import Path
from typing import List

import pytest
from icij_common.es import ESClient
from lancedb import AsyncConnection as LanceDBConnection, connect_async

from ml_worker.objects import Document
from ml_worker.tasks.vectorize import (
    create_vectorization_tasks,
    find_most_similar,
    make_record_schema,
    recreate_vector_table,
    vectorize_docs,
)
from ml_worker.tests.conftest import TEST_PROJECT
from ml_worker.utils import DSTaskClient


@pytest.fixture
async def test_vector_db(tmpdir) -> LanceDBConnection:
    db = await connect_async(Path(tmpdir) / "test_vectors.db")
    return db


@pytest.mark.integration
async def test_create_vectorization_tasks(
    populate_es: List[Document],  # pylint: disable=unused-argument
    test_es_client: ESClient,
    test_task_client: DSTaskClient,
    test_vector_db: LanceDBConnection,
):
    # When
    task_ids = await create_vectorization_tasks(
        project=TEST_PROJECT,
        es_client=test_es_client,
        task_client=test_task_client,
        vector_db=test_vector_db,
        batch_size=2,
    )
    # Then
    assert len(task_ids) == 2


@pytest.mark.integration
async def test_vectorize_docs(
    populate_es: List[Document],  # pylint: disable=unused-argument
    test_es_client: ESClient,
    test_vector_db: LanceDBConnection,
):
    # Given
    model = "BAAI/bge-small-en-v1.5"
    docs = ["doc-0", "doc-3"]
    schema = make_record_schema(model)
    await recreate_vector_table(test_vector_db, schema)

    # When
    n_vectorized = await vectorize_docs(
        docs,
        TEST_PROJECT,
        es_client=test_es_client,
        vector_db=test_vector_db,
    )
    # Then
    assert n_vectorized == 2
    table = await test_vector_db.open_table("ds_docs")
    records = await table.query().to_list()
    assert len(records) == 2
    doc_ids = sorted(d["doc_id"] for d in records)
    assert doc_ids == ["doc-0", "doc-3"]
    assert all("vector" in r for r in records)


@pytest.mark.integration
async def test_find_most_similar(test_vector_db: LanceDBConnection):
    # Given
    model = "BAAI/bge-small-en-v1.5"
    schema = make_record_schema(model)
    table = await recreate_vector_table(test_vector_db, schema)
    docs = [
        {"doc_id": "novel", "content": "I'm a doc about novels"},
        {"doc_id": "monkey", "content": "I'm speaking about monkeys"},
    ]
    await table.add(docs)
    queries = ["doc about books", "doc speaking about animal"]

    # When
    n_similar = 1
    most_similar = await find_most_similar(
        queries, model, vector_db=test_vector_db, n_similar=n_similar
    )
    # Then
    assert len(most_similar) == 2
    similar_ids = [s["doc_id"] for s in most_similar]
    assert similar_ids == ["novel", "monkey"]
    assert all("distance" in s for s in most_similar)


# --8<-- [end:test-vectorize]
```
