Skip to content

Commit 70f5085

Browse files
authored
Merge pull request #119 from Police-Data-Accessibility-Project/mc_118_collector_manager_prototype
Create Collector Manager Prototype
2 parents 19173f1 + 5cd1d43 commit 70f5085

File tree

9 files changed

+248
-0
lines changed

9 files changed

+248
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ identification_pipeline.py | The core python script uniting this modular pipelin
1313
openai-playground | Scripts for accessing the openai API on PDAP's shared account
1414
source_collectors| Tools for extracting metadata from different sources, including CKAN data portals and Common Crawler
1515
collector_db | Database for storing data from source collectors
16+
collector_manager | A module which provides a unified interface for interacting with source collectors and relevant data
1617

1718
## How to use
1819

collector_manager/CollectorBase.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
"""
2+
Base class for all collectors
3+
"""
4+
import abc
5+
import threading
6+
from abc import ABC
7+
8+
from collector_manager.enums import Status
9+
10+
11+
class CollectorBase(ABC):
12+
def __init__(self, name: str, config: dict) -> None:
13+
self.name = name
14+
self.config = config
15+
self.data = {}
16+
self.logs = []
17+
self.status = Status.RUNNING
18+
# # TODO: Determine how to update this in some of the other collectors
19+
self._stop_event = threading.Event()
20+
21+
@abc.abstractmethod
22+
def run(self) -> None:
23+
raise NotImplementedError
24+
25+
def log(self, message: str) -> None:
26+
self.logs.append(message)
27+
28+
def stop(self) -> None:
29+
self._stop_event.set()
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""
2+
Manager for all collectors
3+
Can start, stop, and get info on running collectors
4+
And manages the retrieval of collector info
5+
"""
6+
7+
import threading
8+
import uuid
9+
from typing import Dict, List, Optional
10+
11+
from collector_manager.ExampleCollector import ExampleCollector
12+
from collector_manager.enums import Status
13+
14+
15+
# Collector Manager Class
16+
class CollectorManager:
17+
def __init__(self):
18+
self.collectors: Dict[str, ExampleCollector] = {}
19+
20+
def list_collectors(self) -> List[str]:
21+
return ["example_collector"]
22+
23+
def start_collector(
24+
self,
25+
name: str,
26+
config: Optional[dict] = None
27+
) -> str:
28+
cid = str(uuid.uuid4())
29+
# The below would need to become more sophisticated
30+
# As we may load different collectors depending on the name
31+
collector = ExampleCollector(name, config)
32+
self.collectors[cid] = collector
33+
thread = threading.Thread(target=collector.run, daemon=True)
34+
thread.start()
35+
return cid
36+
37+
def get_status(self, cid: Optional[str] = None) -> str | List[str]:
38+
if cid:
39+
collector = self.collectors.get(cid)
40+
if not collector:
41+
return f"Collector with CID {cid} not found."
42+
return f"{cid} ({collector.name}) - {collector.status}"
43+
else:
44+
return [
45+
f"{cid} ({collector.name}) - {collector.status}"
46+
for cid, collector in self.collectors.items()
47+
]
48+
49+
def get_info(self, cid: str) -> str:
50+
collector = self.collectors.get(cid)
51+
if not collector:
52+
return f"Collector with CID {cid} not found."
53+
logs = "\n".join(collector.logs[-3:]) # Show the last 3 logs
54+
return f"{cid} ({collector.name}) - {collector.status}\nLogs:\n{logs}"
55+
56+
def close_collector(self, cid: str) -> str:
57+
collector = self.collectors.get(cid)
58+
if not collector:
59+
return f"Collector with CID {cid} not found."
60+
match collector.status:
61+
case Status.RUNNING:
62+
collector.stop()
63+
return f"Collector {cid} stopped."
64+
case Status.COMPLETED:
65+
data = collector.data
66+
del self.collectors[cid]
67+
return f"Collector {cid} harvested. Data: {data}"
68+
case _:
69+
return f"Cannot close collector {cid} with status {collector.status}."
70+
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""
2+
Command Handler
3+
4+
This module provides a command handler for the Collector Manager CLI.
5+
"""
6+
7+
from typing import List
8+
9+
from collector_manager.CollectorManager import CollectorManager
10+
11+
12+
class CommandHandler:
13+
def __init__(self, cm: CollectorManager):
14+
self.cm = cm
15+
self.commands = {
16+
"list": self.list_collectors,
17+
"start": self.start_collector,
18+
"status": self.get_status,
19+
"info": self.get_info,
20+
"close": self.close_collector,
21+
"exit": self.exit_manager,
22+
}
23+
self.running = True
24+
25+
def handle_command(self, command: str):
26+
parts = command.split()
27+
if not parts:
28+
return
29+
30+
cmd = parts[0]
31+
func = self.commands.get(cmd, self.unknown_command)
32+
func(parts)
33+
34+
def list_collectors(self, args: List[str]):
35+
print("\n".join(self.cm.list_collectors()))
36+
37+
def start_collector(self, args: List[str]):
38+
if len(args) < 2:
39+
print("Usage: start {collector_name}")
40+
return
41+
collector_name = args[1]
42+
config = None
43+
if len(args) > 3 and args[2] == "--config":
44+
config = args[3]
45+
cid = self.cm.start_collector(collector_name, config)
46+
print(f"Started collector with CID: {cid}")
47+
48+
def get_status(self, args: List[str]):
49+
if len(args) > 1:
50+
cid = args[1]
51+
print(self.cm.get_status(cid))
52+
else:
53+
print("\n".join(self.cm.get_status()))
54+
55+
def get_info(self, args: List[str]):
56+
if len(args) < 2:
57+
print("Usage: info {cid}")
58+
return
59+
cid = args[1]
60+
print(self.cm.get_info(cid))
61+
62+
def close_collector(self, args: List[str]):
63+
if len(args) < 2:
64+
print("Usage: close {cid}")
65+
return
66+
cid = args[1]
67+
print(self.cm.close_collector(cid))
68+
69+
def exit_manager(self, args: List[str]):
70+
print("Exiting Collector Manager.")
71+
self.running = False
72+
73+
def unknown_command(self, args: List[str]):
74+
print("Unknown command.")
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""
2+
Example collector
3+
Exists as a proof of concept for collector functionality
4+
5+
"""
6+
import time
7+
8+
from collector_manager.CollectorBase import CollectorBase
9+
from collector_manager.enums import Status
10+
11+
12+
class ExampleCollector(CollectorBase):
13+
14+
def run(self):
15+
try:
16+
for i in range(10): # Simulate a task
17+
if self._stop_event.is_set():
18+
self.log("Collector stopped.")
19+
self.status = Status.ERRORED
20+
return
21+
self.log(f"Step {i+1}/10")
22+
time.sleep(1) # Simulate work
23+
self.data = {"message": f"Data collected by {self.name}"}
24+
self.status = Status.COMPLETED
25+
self.log("Collector completed successfully.")
26+
except Exception as e:
27+
self.status = Status.ERRORED
28+
self.log(f"Error: {e}")

collector_manager/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
The Collector Manager is a class used to manage collectors. It can start, stop, and get info on running collectors.
2+
3+
The following commands are available:
4+
5+
| Command | Description |
6+
|------------------------------------------|----------------------------------------------------------------|
7+
| list | List running collectors |
8+
| start {collector_name} --config {config} | Start a collector, optionally with a given configuration |
9+
| status {collector_id} | Get status of a collector, or all collectors if no id is given |
10+
| info {collector_id} | Get info on a collector, including recent log updates |
11+
| close {collector_id} | Close a collector |
12+
| exit | Exit the collector manager |
13+
14+
This directory consists of the following files:
15+
16+
| File | Description |
17+
|-------------------|----------------------------------------------------------------|
18+
| CollectorManager.py | Main collector manager class |
19+
| CommandHandler.py | Class used to handle commands from the command line interface |
20+
| CollectorBase.py | Base class for collectors |
21+
|enums.py | Enumerations used in the collector manager |
22+
| ExampleCollector.py | Example collector |
23+
| main.py | Main function for the collector manager |

collector_manager/__init__.py

Whitespace-only changes.

collector_manager/enums.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
class Status:
2+
RUNNING = "RUNNING"
3+
COMPLETED = "COMPLETED"
4+
ERRORED = "ERRORED"

collector_manager/main.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""
2+
This starts up the collector manager Command Line Interface (CLI)
3+
"""
4+
5+
from collector_manager.CollectorManager import CollectorManager
6+
from collector_manager.CommandHandler import CommandHandler
7+
8+
9+
def main():
10+
cm = CollectorManager()
11+
handler = CommandHandler(cm)
12+
print("Collector Manager CLI")
13+
while handler.running:
14+
command = input("Enter command: ")
15+
handler.handle_command(command)
16+
17+
18+
if __name__ == "__main__":
19+
main()

0 commit comments

Comments
 (0)