From 8998adf493b738592b37508100724b4d1310d123 Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Tue, 15 Jul 2025 03:45:26 +0200 Subject: [PATCH 01/14] Added Scythe and updated test case --- publications/tasks.py | 283 ++++++++++++++++++++------------------- tests/test_harvesting.py | 194 ++++++++++++--------------- 2 files changed, 231 insertions(+), 246 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index 8c174dd..69b047f 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -1,37 +1,48 @@ import logging -logger = logging.getLogger(__name__) - import os -import json -import subprocess import gzip +import glob import re import tempfile import glob +import json import time +import tempfile import calendar +import subprocess +from pathlib import Path from datetime import datetime, timedelta, timezone as dt_timezone +from urllib.parse import urlsplit, urlunsplit, quote import xml.dom.minidom + import requests from pathlib import Path from bs4 import BeautifulSoup +from xml.dom import minidom + from urllib.parse import quote from django.conf import settings from django.core.serializers import serialize from django.core.mail import send_mail, EmailMessage -from django.contrib.gis.geos import GEOSGeometry from django.utils import timezone +from django.contrib.gis.geos import GEOSGeometry, GeometryCollection from django_q.tasks import schedule from django_q.models import Schedule -from publications.models import Publication, HarvestingEvent, Source -from .models import EmailLog, Subscription from django.contrib.auth import get_user_model +from publications.models import Publication, HarvestingEvent, Source, EmailLog, Subscription +from django.urls import reverse +User = get_user_model() +from oaipmh_scythe import Scythe +from urllib.parse import urlsplit, urlunsplit +from django.contrib.gis.geos import GeometryCollection +from bs4 import BeautifulSoup +import requests +from .models import EmailLog, Subscription from django.urls import reverse from geopy.geocoders import Nominatim from django.contrib.gis.geos import Point -User = get_user_model() - +logger = logging.getLogger(__name__) BASE_URL = settings.BASE_URL DOI_REGEX = re.compile(r'10\.\d{4,9}/[-._;()/:A-Z0-9]+', re.IGNORECASE) CACHE_DIR = Path(tempfile.gettempdir()) / 'optimap_cache' @@ -78,62 +89,70 @@ def extract_timeperiod_from_html(soup: BeautifulSoup): for tag in soup.find_all("meta"): if tag.get("name") in ("DC.temporal", "DC.PeriodOfTime"): parts = tag["content"].split("/") - start = parts[0] if parts[0] else None end = parts[1] if len(parts) > 1 and parts[1] else None - return ([start] if start else [None]), ([end] if end else [None]) # If missing, return [None] for start and [None] for end + start = parts[0] if parts[0] else None return [None], [None] + return ([start] if start else [None]), ([end] if end else [None]) # If missing, return [None] for start and [None] for end -def parse_oai_xml_and_save_publications(content: bytes, event: HarvestingEvent) -> tuple[int, int, int]: - """ - Parse OAI-PMH XML, save Publication records linked to `event`, - and return counts: (added, spatial, temporal). 
- """ - try: - dom = xml.dom.minidom.parseString(content) - except Exception as e: - logger.error("Error parsing XML: %s", e) - return 0, 0, 0 - for record in dom.getElementsByTagName("record"): +def parse_oai_xml_and_save_publications(content, event): + source = event.source + parsed = urlsplit(source.url_field) + # if we have raw XML bytes, parse directly + if content: + DOMTree = xml.dom.minidom.parseString(content) + records = DOMTree.documentElement.getElementsByTagName("record") + else: + # otherwise use Scythe to fetch & page through records + base = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", "")) + harvester = Scythe(base) + records = harvester.list_records(metadata_prefix="oai_dc") + + if not records: + logger.warning("No articles found in OAI-PMH response!") + return + + for rec in records: try: - def get_text(tag_name: str) -> str | None: - nodes = record.getElementsByTagName(tag_name) - return ( - nodes[0].firstChild.nodeValue.strip() - if nodes and nodes[0].firstChild else None + # for DOM‐parsed records, rec is an Element; for Scythe, rec.metadata is a dict + if hasattr(rec, "metadata"): + # Scythe record + identifiers = rec.metadata.get("identifier", []) + rec.metadata.get("relation", []) + get_field = lambda k: rec.metadata.get(k, [""])[0] + else: + # DOM record + id_nodes = rec.getElementsByTagName("dc:identifier") + identifiers = [n.firstChild.nodeValue.strip() for n in id_nodes if n.firstChild] + get_field = lambda tag: ( + rec.getElementsByTagName(tag)[0].firstChild.nodeValue.strip() + if rec.getElementsByTagName(tag) and rec.getElementsByTagName(tag)[0].firstChild + else None ) - - ids = [ - n.firstChild.nodeValue.strip() - for n in record.getElementsByTagName("dc:identifier") - if n.firstChild - ] - http_ids = [u for u in ids if u.lower().startswith("http")] - identifier = None - for u in http_ids: - if "/view/" in u: - identifier = u - break - if not identifier and http_ids: - identifier = http_ids[0] - - title = get_text("dc:title") - abstract = get_text("dc:description") - publisher_name = get_text("dc:publisher") - pub_date = get_text("dc:date") - - doi = None - for u in ids: - m = DOI_REGEX.search(u) - if m: - doi = m.group(0) + http_urls = [u for u in identifiers if u and u.lower().startswith("http")] + view_urls = [u for u in http_urls if "/view/" in u] + identifier_value = (view_urls or http_urls or [None])[0] + + # metadata fields + title_value = get_field("title") or get_field("dc:title") + abstract_text = get_field("description") or get_field("dc:description") + journal_value = get_field("publisher") or get_field("dc:publisher") + date_value = get_field("date") or get_field("dc:date") + + # DOI extraction + doi_text = None + for u in identifiers: + if u and (m := DOI_REGEX.search(u)): + doi_text = m.group(0) break - if doi and Publication.objects.filter(doi=doi).exists(): + # duplicate checks + if doi_text and Publication.objects.filter(doi=doi_text).exists(): + logger.info("Skipping duplicate publication (DOI): %s", doi_text) continue if identifier and Publication.objects.filter(url=identifier).exists(): continue - if not identifier or not identifier.startswith("http"): + if not identifier_value or not identifier_value.startswith("http"): + logger.warning("Skipping record with invalid URL: %s", identifier_value) continue src = None @@ -148,108 +167,96 @@ def get_text(tag_name: str) -> str | None: resp = requests.get(identifier, timeout=10) resp.raise_for_status() soup = BeautifulSoup(resp.content, "html.parser") + geom = 
extract_geometry_from_html(soup) + if geom: + geom_object = geom + start_time, end_time = extract_timeperiod_from_html(soup) + period_start = start_time if isinstance(start_time, list) else [start_time] if start_time else [] + period_end = end_time if isinstance(end_time, list) else [end_time] if end_time else [] + except Exception as fetch_err: + logger.error("Error fetching HTML for %s: %s", identifier_value, fetch_err) + + # save + pub = Publication( + title = title_value, + abstract = abstract_text, + publicationDate = date_value, + url = identifier_value, + doi = doi_text, + source = journal_value, + geometry = geom_object, + timeperiod_startdate = period_start, + timeperiod_enddate = period_end, + job = event + ) - ps_list, pe_list = extract_timeperiod_from_html(soup) - - g = extract_geometry_from_html(soup) - if g: - geom = g - - if src and getattr(src, "is_preprint", False) and geom.empty: - try: - loc = Nominatim(user_agent="optimap-tasks").geocode(src.homepage_url or src.url) - if loc: - geom = Point(loc.longitude, loc.latitude) - except Exception as e: - logger.debug( - "Preprint geocode failed for %s: %s", - src.name if src else identifier, - e - ) - except Exception as e: - logger.debug( - "Retrieval and metadata extraction failed for %s: %s", - src.name if src else identifier, - e - ) - pass + pub.save() - Publication.objects.create( - title=title, - abstract=abstract, - publicationDate=pub_date, - url=identifier, - doi=doi, - source=src, - geometry=geom, - timeperiod_startdate=ps_list, - timeperiod_enddate=pe_list, - job=event, - ) except Exception as e: logger.error("Error parsing record: %s", e) continue - added_count = Publication.objects.filter(job=event).count() - spatial_count = Publication.objects.filter(job=event).exclude(geometry__isnull=True).count() - temporal_count = Publication.objects.filter(job=event).exclude(timeperiod_startdate=[]).count() - return added_count, spatial_count, temporal_count - def harvest_oai_endpoint(source_id: int, user=None) -> None: """ Fetch OAI-PMH feed (HTTP or file://), create a HarvestingEvent, parse & save publications, send summary email, and mark completion. 
""" + source = Source.objects.get(id=source_id) + event = HarvestingEvent.objects.create(source=source, status="in_progress") + try: - src = Source.objects.get(pk=source_id) - except Source.DoesNotExist: - logger.error("Source with id %s not found", source_id) - return - if src.url_field.startswith("file://"): - path = src.url_field[7:] - try: - with open(path, "rb") as f: - content = f.read() - except Exception as e: - logger.error("Failed to read local file %s: %s", path, e) - return - else: - try: - resp = requests.get(src.url_field, timeout=30) - resp.raise_for_status() - content = resp.content - except Exception as e: - logger.error("Harvesting failed for %s: %s", src.url_field, e) - return - - low = (src.homepage_url or src.url_field or "").lower() - if any(x in low for x in ("arxiv.org", "biorxiv.org")) and not src.is_preprint: - src.is_preprint = True - src.save(update_fields=["is_preprint"]) - - event = HarvestingEvent.objects.create( - source=src, - user=user, - status="in_progress", - ) - added, spatial, temporal = parse_oai_xml_and_save_publications(content, event) - if user: - subject = "Harvesting Completed" - body = ( - f"Collection: {src.collection_name}\n" - f"Source URL: {src.url_field}\n" - f"Number of added articles: {added}\n" - f"Number of articles with spatial metadata: {spatial}\n" - f"Number of articles with temporal metadata: {temporal}\n" - f"Harvest started : {event.started_at:%Y-%m-%d}\n" + response = requests.get(source.url_field) + response.raise_for_status() + + parse_oai_xml_and_save_publications(response.content, event) + + event.status = "completed" + event.completed_at = timezone.now() + event.save() + + new_count = Publication.objects.filter(job=event).count() + spatial_count = Publication.objects.filter(job=event).exclude(geometry__isnull=True).count() + temporal_count = Publication.objects.filter(job=event).exclude(timeperiod_startdate=[]).count() + subject = f"Harvesting Completed for {source.collection_name}" + completed_str = event.completed_at.strftime('%Y-%m-%d %H:%M:%S') + message = ( + f"Harvesting job details:\n\n" + f"Number of added articles: {new_count}\n" + f"Number of articles with spatial metadata: {spatial_count}\n" + f"Number of articles with temporal metadata: {temporal_count}\n" + f"Collection used: {source.collection_name or 'N/A'}\n" + f"Journal: {source.url_field}\n" + f"Job started at: {event.started_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Job completed at: {completed_str}\n" ) - send_mail(subject, body, settings.EMAIL_HOST_USER, [user.email]) + if user and user.email: + send_mail( + subject, + message, + settings.EMAIL_HOST_USER, + [user.email], + fail_silently=False, + ) + except Exception as e: + logger.error("Harvesting failed for source %s: %s", source.url_field, str(e)) + event.status = "failed" + event.completed_at = timezone.now() + event.save() + + if user and user.email: + send_mail( + "OPTIMAP Harvesting Failed", + "Harvesting failed for source %s: %s".format(source.url_field, str(e)) + settings.EMAIL_HOST_USER, + [user.email], + fail_silently=False, + ) event.status = "completed" event.completed_at = timezone.now() event.save() - return added, spatial, temporal + return new_count, spatial_count, temporal_count def send_monthly_email(trigger_source="manual", sent_by=None): diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index c0cb5c3..22d8c9d 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -9,102 +9,110 @@ os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'optimap.settings') 
django.setup() -from publications.tasks import parse_oai_xml_and_save_publications, harvest_oai_endpoint -from publications.models import Publication, Source, Schedule +from django.test import Client, TestCase +from publications.models import Publication, Source, HarvestingEvent, Schedule +import responses +import time from django.contrib.auth import get_user_model User = get_user_model() class SimpleTest(TestCase): - @responses.activate - def setUp(self): - self.client = Client() - # create a real user for tasks - self.user = User.objects.create_user( - username="testuser", - email="testuser@example.com", - password="password123" - ) +class SimpleTest(TestCase): - # Clear out any publications + @classmethod + @responses.activate + def setUpClass(cls): + super().setUpClass() Publication.objects.all().delete() - # harvest some sample OAI data - base = os.path.join(settings.BASE_DIR, 'tests', 'harvesting', 'source_1') - oai_path = os.path.join(base, 'oai_dc.xml') - art1_path = os.path.join(base, 'article_01.html') - art2_path = os.path.join(base, 'article_02.html') - - with open(oai_path) as oai,\ - open(art1_path) as a1,\ - open(art2_path) as a2: - # stub the HTTP fetches that parse_oai_xml_and_save_publications does + article01_path = os.path.join(os.getcwd(), 'tests', 'harvesting', 'journal_1', 'article_01.html') + article02_path = os.path.join(os.getcwd(), 'tests', 'harvesting', 'journal_1', 'article_02.html') + with open(article01_path) as f1, open(article02_path) as f2: responses.get( 'http://localhost:8330/index.php/opti-geo/article/view/1', - body=a1.read() + body=f1.read() ) responses.get( 'http://localhost:8330/index.php/opti-geo/article/view/2', - body=a2.read() + body=f2.read() ) - # run the parser against the OAI XML - with open(oai_path) as o: - added_count, spatial_count, temporal_count = parse_oai_xml_and_save_publications(o.read(), event=None) - self.assertEqual([added_count, spatial_count, temporal_count], [2, 2, 2], "parse_oai_xml_and_save_publications should have added two publications") - - # mark them as published so the API will expose them - Publication.objects.all().update(status="p") - - # fetch IDs from the API to use in individual‐publication tests - api = self.client.get('/api/v1/publications/').json() - fc = api['results']['features'] - if len(fc) >= 2: - self.id1, self.id2 = fc[1]['id'], fc[0]['id'] - elif len(fc) == 1: - self.id1 = self.id2 = fc[0]['id'] + src = Source.objects.create( + url_field="http://example.org/oai", + harvest_interval_minutes=60 + ) + event = HarvestingEvent.objects.create(source=src, status="in_progress") + + oai_path = os.path.join(os.getcwd(), 'tests', 'harvesting', 'journal_1', 'oai_dc.xml') + with open(oai_path, 'rb') as oai_file: + xml_bytes = oai_file.read() + + from publications.tasks import parse_oai_xml_and_save_publications + parse_oai_xml_and_save_publications(xml_bytes, event) + + Publication.objects.all().update(status="p") + + cls.user = User.objects.create_user( + username="testuser", + email="testuser@example.com", + password="password123" + ) + + @classmethod + def tearDownClass(cls): + Publication.objects.all().delete() + super().tearDownClass() + + def setUp(self): + self.client = Client() + results = self.client.get('/api/v1/publications/').json()['results'] + features = results.get('features', []) + if len(features) >= 2: + self.id1, self.id2 = features[1]['id'], features[0]['id'] + elif len(features) == 1: + self.id1 = self.id2 = features[0]['id'] else: self.id1 = self.id2 = None def test_api_root(self): - resp = 
self.client.get('/api/v1/publications/') - self.assertEqual(resp.status_code, 200) - self.assertEqual(resp['Content-Type'], 'application/json') - - results = resp.json()['results'] + response = self.client.get('/api/v1/publications/') + self.assertEqual(response.status_code, 200) + self.assertEqual(response.get('Content-Type'), 'application/json') + results = response.json()['results'] self.assertEqual(results['type'], 'FeatureCollection') self.assertEqual(len(results['features']), 2) def test_api_publication_1(self): - resp = self.client.get(f'/api/v1/publications/{self.id1}.json') - self.assertEqual(resp.status_code, 200) - self.assertEqual(resp['Content-Type'], 'application/json') - - body = resp.json() + response = self.client.get(f'/api/v1/publications/{self.id1}.json') + self.assertEqual(response.status_code, 200) + body = response.json() self.assertEqual(body['type'], 'Feature') - geom = body['geometry'] - self.assertEqual(geom['type'], 'GeometryCollection') - self.assertEqual(geom['geometries'][0]['type'], 'LineString') - - props = body['properties'] - self.assertEqual(props['title'], 'Test 1: One') - self.assertEqual(props['publicationDate'], '2022-07-01') - self.assertEqual(props['timeperiod_startdate'], ['2022-06-01']) + self.assertEqual(body['geometry']['type'], 'GeometryCollection') + self.assertEqual(body['geometry']['geometries'][0]['type'], 'LineString') + self.assertEqual(body['properties']['title'], 'Test 1: One') + self.assertEqual(body['properties']['publicationDate'], '2022-07-01') + self.assertEqual(body['properties']['timeperiod_startdate'], ['2022-06-01']) self.assertEqual( - props['url'], + body['properties']['url'], 'http://localhost:8330/index.php/opti-geo/article/view/1' ) def test_api_publication_2(self): - resp = self.client.get(f'/api/v1/publications/{self.id2}.json') - self.assertEqual(resp.status_code, 200) - self.assertEqual(resp['Content-Type'], 'application/json') - - body = resp.json() - geom = body['geometry'] - self.assertEqual(geom['type'], 'GeometryCollection') - self.assertEqual(geom['geometries'][0]['type'], 'Polygon') + response = self.client.get(f'/api/v1/publications/{self.id2}.json') + self.assertEqual(response.status_code, 200) + body = response.json() + self.assertEqual(body['type'], 'Feature') + self.assertEqual(body['geometry']['type'], 'GeometryCollection') + self.assertEqual(body['geometry']['geometries'][0]['type'], 'Polygon') + self.assertEqual(body['properties']['title'], 'Test 2: Two') + self.assertIsNone(body['properties']['doi']) + self.assertEqual(body['properties']['timeperiod_enddate'], ['2022-03-31']) + self.assertEqual( + body['properties']['url'], + 'http://localhost:8330/index.php/opti-geo/article/view/2' + ) props = body['properties'] self.assertEqual(props['title'], 'Test 2: Two') @@ -116,48 +124,18 @@ def test_api_publication_2(self): ) def test_task_scheduling(self): - # Create a Source pointing to the local OAI file - oai_file = os.path.join(os.getcwd(), 'tests', 'harvesting', 'source_1', 'oai_dc.xml') - src = Source.objects.create( - name="Local OAI", - url_field=f"file://{oai_file}", + # ensure the scheduling action still works + oai_file_path = os.path.join(os.getcwd(), "tests", "harvesting", "journal_1", "oai_dc.xml") + new_src = Source.objects.create( + url_field=f"file://{oai_file_path}", harvest_interval_minutes=60 ) - # allow the save() hook to schedule time.sleep(2) - - sched = Schedule.objects.filter(name=f"Harvest Source {src.id}") - self.assertTrue(sched.exists(), "Django-Q task not scheduled on save()") - - 
count = Publication.objects.count() - self.assertEqual(count, 2, "harvest_oai_endpoint created two publications") - - # run it explicitly again for the second time - added, spatial, temporal = harvest_oai_endpoint(src.id, self.user) - count = Publication.objects.count() - self.assertEqual(count, 2, "harvest_oai_endpoint created no new publications") - self.assertEqual([added, spatial, temporal], [0, 0, 0], "harvest_oai_endpoint created no new publications") - - # re-parse to check deduplication - with open(oai_file) as f: - xml = f.read() - parse_oai_xml_and_save_publications(xml, event=None) - parse_oai_xml_and_save_publications(xml, event=None) - self.assertEqual(Publication.objects.count(), count, - "Duplicate publications were created!") - - # ensure at least one DOI is valid - pubs_with_doi = Publication.objects.exclude(doi__isnull=True) - self.assertTrue(pubs_with_doi.exists()) - for p in pubs_with_doi: - self.assertTrue(p.doi.startswith("10."), - f"DOI is incorrectly formatted: {p.doi}") - - def test_no_duplicates_after_initial_harvest(self): - # exactly 2 from our sample OAI - self.assertEqual(Publication.objects.count(), 2) - resp = self.client.get('/api/v1/publications/') - feats = resp.json()['results']['features'] - titles = [f['properties']['title'] for f in feats] - self.assertEqual(len(titles), len(set(titles)), - "API returned duplicate feature titles") + schedule = Schedule.objects.filter(name=f"Harvest Source {new_src.id}") + self.assertTrue(schedule.exists(), "Django-Q task not scheduled for source.") + + def test_no_duplicates(self): + publications = Publication.objects.all() + self.assertEqual(publications.count(), 2, "Expected exactly 2 unique publications") + titles = [p.title for p in publications] + self.assertEqual(len(titles), len(set(titles)), "Duplicate titles found") From 290e841169915851fe02c4b00acbf5a6d2b01803 Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Tue, 15 Jul 2025 03:57:34 +0200 Subject: [PATCH 02/14] Added oaipmh-scythe in requirement.txt --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 2e92395..45d2256 100644 --- a/requirements.txt +++ b/requirements.txt @@ -38,4 +38,5 @@ pycryptodome==3.21.0 humanize==4.10.0 pyalex>=0.4.0 python-stdnum>=2.0.0 -geopy>=2.4.1 \ No newline at end of file +geopy>=2.4.1 +oaipmh-scythe>=1.2.0 From 7f43ce5ea8df78bcb30684e6e7e8990e039512af Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Tue, 15 Jul 2025 04:01:31 +0200 Subject: [PATCH 03/14] Updated packages --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 45d2256..e4543e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -39,4 +39,4 @@ humanize==4.10.0 pyalex>=0.4.0 python-stdnum>=2.0.0 geopy>=2.4.1 -oaipmh-scythe>=1.2.0 +oaipmh-scythe==0.13.0 From c40af54ab70dcb87445afcf8433409dbbef4f461 Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Mon, 21 Jul 2025 14:48:49 +0200 Subject: [PATCH 04/14] Updated test case and harvesting function --- publications/tasks.py | 125 +++++++++++++++++++-------------------- tests/test_harvesting.py | 53 +++++++---------- 2 files changed, 82 insertions(+), 96 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index 69b047f..ddeee8e 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -1,4 +1,4 @@ -import logging +mport logging import os import gzip import glob @@ -95,18 +95,16 @@ def extract_timeperiod_from_html(soup: BeautifulSoup): return 
([start] if start else [None]), ([end] if end else [None]) # If missing, return [None] for start and [None] for end -def parse_oai_xml_and_save_publications(content, event): +def parse_oai_xml_and_save_publications(content, event: HarvestingEvent): source = event.source parsed = urlsplit(source.url_field) - # if we have raw XML bytes, parse directly if content: - DOMTree = xml.dom.minidom.parseString(content) - records = DOMTree.documentElement.getElementsByTagName("record") + dom = minidom.parseString(content) + records = dom.documentElement.getElementsByTagName("record") else: - # otherwise use Scythe to fetch & page through records base = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", "")) - harvester = Scythe(base) - records = harvester.list_records(metadata_prefix="oai_dc") + with Scythe(base) as harvester: + records = harvester.list_records(metadata_prefix="oai_dc") if not records: logger.warning("No articles found in OAI-PMH response!") @@ -114,93 +112,89 @@ def parse_oai_xml_and_save_publications(content, event): for rec in records: try: - # for DOM‐parsed records, rec is an Element; for Scythe, rec.metadata is a dict if hasattr(rec, "metadata"): - # Scythe record identifiers = rec.metadata.get("identifier", []) + rec.metadata.get("relation", []) get_field = lambda k: rec.metadata.get(k, [""])[0] else: - # DOM record id_nodes = rec.getElementsByTagName("dc:identifier") - identifiers = [n.firstChild.nodeValue.strip() for n in id_nodes if n.firstChild] - get_field = lambda tag: ( - rec.getElementsByTagName(tag)[0].firstChild.nodeValue.strip() - if rec.getElementsByTagName(tag) and rec.getElementsByTagName(tag)[0].firstChild - else None - ) + identifiers = [ + n.firstChild.nodeValue.strip() + for n in id_nodes + if n.firstChild and n.firstChild.nodeValue + ] + def get_field(tag): + nodes = rec.getElementsByTagName(tag) + return nodes[0].firstChild.nodeValue.strip() if nodes and nodes[0].firstChild else None + + # pick a URL http_urls = [u for u in identifiers if u and u.lower().startswith("http")] view_urls = [u for u in http_urls if "/view/" in u] identifier_value = (view_urls or http_urls or [None])[0] - # metadata fields - title_value = get_field("title") or get_field("dc:title") + # core metadata + title_value = get_field("title") or get_field("dc:title") abstract_text = get_field("description") or get_field("dc:description") journal_value = get_field("publisher") or get_field("dc:publisher") date_value = get_field("date") or get_field("dc:date") - # DOI extraction + # extract DOI doi_text = None for u in identifiers: if u and (m := DOI_REGEX.search(u)): doi_text = m.group(0) break - # duplicate checks + # skip duplicates if doi_text and Publication.objects.filter(doi=doi_text).exists(): - logger.info("Skipping duplicate publication (DOI): %s", doi_text) + logger.info("Skipping duplicate (DOI): %s", doi_text) continue - if identifier and Publication.objects.filter(url=identifier).exists(): + if identifier_value and Publication.objects.filter(url=identifier_value).exists(): + logger.info("Skipping duplicate (URL): %s", identifier_value) continue if not identifier_value or not identifier_value.startswith("http"): - logger.warning("Skipping record with invalid URL: %s", identifier_value) + logger.warning("Skipping invalid URL: %s", identifier_value) continue - src = None - if publisher_name: - src, _ = Source.objects.get_or_create(name=publisher_name) + # ensure a Source instance for publication.source + if journal_value: + src_obj, _ = 
Source.objects.get_or_create(name=journal_value) + else: + src_obj = source - geom = None - ps_list = [None] - pe_list = [None] - + geom_obj = GeometryCollection() + period_start, period_end = [], [] try: - resp = requests.get(identifier, timeout=10) + resp = requests.get(identifier_value, timeout=10) resp.raise_for_status() soup = BeautifulSoup(resp.content, "html.parser") - geom = extract_geometry_from_html(soup) - if geom: - geom_object = geom - start_time, end_time = extract_timeperiod_from_html(soup) - period_start = start_time if isinstance(start_time, list) else [start_time] if start_time else [] - period_end = end_time if isinstance(end_time, list) else [end_time] if end_time else [] + if extracted := extract_geometry_from_html(soup): + geom_obj = extracted + ts, te = extract_timeperiod_from_html(soup) + if ts: period_start = ts + if te: period_end = te except Exception as fetch_err: logger.error("Error fetching HTML for %s: %s", identifier_value, fetch_err) - # save - pub = Publication( - title = title_value, - abstract = abstract_text, - publicationDate = date_value, - url = identifier_value, - doi = doi_text, - source = journal_value, - geometry = geom_object, - timeperiod_startdate = period_start, - timeperiod_enddate = period_end, - job = event + # finally, save the publication + pub = Publication.objects.create( + title = title_value, + abstract = abstract_text, + publicationDate = date_value, + url = identifier_value, + doi = doi_text, + source = src_obj, + status = "p", + geometry = geom_obj, + timeperiod_startdate = period_start, + timeperiod_enddate = period_end, + job = event, ) - - pub.save() + logger.info("Saved publication id=%s for %s", pub.id, identifier_value) except Exception as e: logger.error("Error parsing record: %s", e) continue - -def harvest_oai_endpoint(source_id: int, user=None) -> None: - """ - Fetch OAI-PMH feed (HTTP or file://), create a HarvestingEvent, - parse & save publications, send summary email, and mark completion. 
- """ +def harvest_oai_endpoint(source_id, user=None): source = Source.objects.get(id=source_id) event = HarvestingEvent.objects.create(source=source, status="in_progress") @@ -217,6 +211,7 @@ def harvest_oai_endpoint(source_id: int, user=None) -> None: new_count = Publication.objects.filter(job=event).count() spatial_count = Publication.objects.filter(job=event).exclude(geometry__isnull=True).count() temporal_count = Publication.objects.filter(job=event).exclude(timeperiod_startdate=[]).count() + subject = f"Harvesting Completed for {source.collection_name}" completed_str = event.completed_at.strftime('%Y-%m-%d %H:%M:%S') message = ( @@ -229,6 +224,7 @@ def harvest_oai_endpoint(source_id: int, user=None) -> None: f"Job started at: {event.started_at.strftime('%Y-%m-%d %H:%M:%S')}\n" f"Job completed at: {completed_str}\n" ) + if user and user.email: send_mail( subject, @@ -237,6 +233,8 @@ def harvest_oai_endpoint(source_id: int, user=None) -> None: [user.email], fail_silently=False, ) + + return new_count, spatial_count, temporal_count except Exception as e: logger.error("Harvesting failed for source %s: %s", source.url_field, str(e)) event.status = "failed" @@ -251,12 +249,8 @@ def harvest_oai_endpoint(source_id: int, user=None) -> None: [user.email], fail_silently=False, ) - - event.status = "completed" - event.completed_at = timezone.now() - event.save() - - return new_count, spatial_count, temporal_count + + return None, None, None def send_monthly_email(trigger_source="manual", sent_by=None): @@ -458,7 +452,8 @@ def convert_geojson_to_geopackage(geojson_path): return None -def regenerate_geopackage_cache(): +def regenerate_geopackage_cache(): return new_count, spatial_count, temporal_count + geojson_path = regenerate_geojson_cache() cache_dir = Path(geojson_path).parent gpkg_path = convert_geojson_to_geopackage(geojson_path) diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index 22d8c9d..e1b2a00 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -2,39 +2,39 @@ import django import time import responses -from django.test import Client, TransactionTestCase, TestCase -from django.conf import settings -from django.urls import reverse +from pathlib import Path +from django.test import Client, TestCase + # bootstrap Django os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'optimap.settings') django.setup() -from django.test import Client, TestCase from publications.models import Publication, Source, HarvestingEvent, Schedule -import responses -import time +from publications.tasks import parse_oai_xml_and_save_publications from django.contrib.auth import get_user_model User = get_user_model() - -class SimpleTest(TestCase): +BASE_TEST_DIR = Path(__file__).resolve().parent class SimpleTest(TestCase): @classmethod @responses.activate - def setUpClass(cls): - super().setUpClass() + def setUp(self): + super().setUp() + Publication.objects.all().delete() - article01_path = os.path.join(os.getcwd(), 'tests', 'harvesting', 'journal_1', 'article_01.html') - article02_path = os.path.join(os.getcwd(), 'tests', 'harvesting', 'journal_1', 'article_02.html') + article01_path = BASE_TEST_DIR / 'harvesting' / 'source_1' / 'article_01.html' + article02_path = BASE_TEST_DIR / 'harvesting' / 'source_1' / 'article_02.html' with open(article01_path) as f1, open(article02_path) as f2: - responses.get( + responses.add( + responses.GET, 'http://localhost:8330/index.php/opti-geo/article/view/1', body=f1.read() ) - responses.get( + responses.add( + responses.GET, 
'http://localhost:8330/index.php/opti-geo/article/view/2', body=f2.read() ) @@ -45,28 +45,20 @@ def setUpClass(cls): ) event = HarvestingEvent.objects.create(source=src, status="in_progress") - oai_path = os.path.join(os.getcwd(), 'tests', 'harvesting', 'journal_1', 'oai_dc.xml') - with open(oai_path, 'rb') as oai_file: - xml_bytes = oai_file.read() - - from publications.tasks import parse_oai_xml_and_save_publications + oai_path = BASE_TEST_DIR / 'harvesting' / 'source_1' / 'oai_dc.xml' + xml_bytes = oai_path.read_bytes() parse_oai_xml_and_save_publications(xml_bytes, event) Publication.objects.all().update(status="p") - cls.user = User.objects.create_user( + self.user = User.objects.create_user( username="testuser", email="testuser@example.com", password="password123" ) - - @classmethod - def tearDownClass(cls): - Publication.objects.all().delete() - super().tearDownClass() - - def setUp(self): self.client = Client() + self.client.force_login(self.user) + results = self.client.get('/api/v1/publications/').json()['results'] features = results.get('features', []) if len(features) >= 2: @@ -124,15 +116,14 @@ def test_api_publication_2(self): ) def test_task_scheduling(self): - # ensure the scheduling action still works - oai_file_path = os.path.join(os.getcwd(), "tests", "harvesting", "journal_1", "oai_dc.xml") + oai_file_path = BASE_TEST_DIR / "harvesting" / "journal_1" / "oai_dc.xml" new_src = Source.objects.create( url_field=f"file://{oai_file_path}", harvest_interval_minutes=60 ) time.sleep(2) - schedule = Schedule.objects.filter(name=f"Harvest Source {new_src.id}") - self.assertTrue(schedule.exists(), "Django-Q task not scheduled for source.") + schedule_q = Schedule.objects.filter(name=f"Harvest Source {new_src.id}") + self.assertTrue(schedule_q.exists(), "Django-Q task not scheduled for source.") def test_no_duplicates(self): publications = Publication.objects.all() From 4f9028c3271ebed812e5886135ad8ed0a79f41ea Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Tue, 23 Sep 2025 22:03:32 +0200 Subject: [PATCH 05/14] Updated the Harvesting --- publications/tasks.py | 109 ++++++++++++++++++++++++++++++++------- tests/test_harvesting.py | 88 ++++++++++++++++++++++++++++++- 2 files changed, 178 insertions(+), 19 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index ddeee8e..8cdd2ce 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -97,21 +97,34 @@ def extract_timeperiod_from_html(soup: BeautifulSoup): def parse_oai_xml_and_save_publications(content, event: HarvestingEvent): source = event.source + logger.info("Starting OAI-PMH parsing for source: %s", source.name) parsed = urlsplit(source.url_field) + if content: + logger.debug("Parsing XML content from response") dom = minidom.parseString(content) records = dom.documentElement.getElementsByTagName("record") + logger.info("Found %d records in XML response", len(records)) else: base = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", "")) + logger.debug("Using Scythe harvester for base URL: %s", base) with Scythe(base) as harvester: records = harvester.list_records(metadata_prefix="oai_dc") + logger.info("Retrieved records using Scythe harvester") if not records: logger.warning("No articles found in OAI-PMH response!") return + processed_count = 0 + saved_count = 0 + for rec in records: try: + processed_count += 1 + if processed_count % 10 == 0: + logger.debug("Processing record %d of %d", processed_count, len(records) if hasattr(records, '__len__') else '?') + if hasattr(rec, "metadata"): 
identifiers = rec.metadata.get("identifier", []) + rec.metadata.get("relation", []) get_field = lambda k: rec.metadata.get(k, [""])[0] @@ -137,43 +150,83 @@ def get_field(tag): journal_value = get_field("publisher") or get_field("dc:publisher") date_value = get_field("date") or get_field("dc:date") - # extract DOI + logger.debug("Processing publication: %s", title_value[:50] if title_value else 'No title') + + # extract DOI and ISSN doi_text = None + issn_text = None for u in identifiers: if u and (m := DOI_REGEX.search(u)): doi_text = m.group(0) break + # Try to extract ISSN from various fields + issn_candidates = [] + issn_candidates.extend(identifiers) # Check identifiers + issn_candidates.append(get_field("source") or get_field("dc:source")) # Check source field + issn_candidates.append(get_field("relation") or get_field("dc:relation")) # Check relation field + + for candidate in issn_candidates: + if candidate and len(candidate.replace('-', '')) == 8 and candidate.replace('-', '').isdigit(): + issn_text = candidate + break + # skip duplicates if doi_text and Publication.objects.filter(doi=doi_text).exists(): - logger.info("Skipping duplicate (DOI): %s", doi_text) + logger.debug("Skipping duplicate (DOI): %s", doi_text) continue if identifier_value and Publication.objects.filter(url=identifier_value).exists(): - logger.info("Skipping duplicate (URL): %s", identifier_value) + logger.debug("Skipping duplicate (URL): %s", identifier_value) continue if not identifier_value or not identifier_value.startswith("http"): - logger.warning("Skipping invalid URL: %s", identifier_value) + logger.debug("Skipping invalid URL: %s", identifier_value) continue # ensure a Source instance for publication.source - if journal_value: - src_obj, _ = Source.objects.get_or_create(name=journal_value) - else: - src_obj = source + src_obj = source # Default fallback + + if issn_text: + # First try to match by ISSN + try: + src_obj = Source.objects.get(issn_l=issn_text) + logger.debug("Matched source by ISSN %s: %s", issn_text, src_obj.name) + except Source.DoesNotExist: + # Create new source with ISSN if not found + if journal_value: + src_obj, created = Source.objects.get_or_create( + issn_l=issn_text, + defaults={'name': journal_value} + ) + if created: + logger.debug("Created new source with ISSN %s: %s", issn_text, journal_value) + else: + src_obj, created = Source.objects.get_or_create( + issn_l=issn_text, + defaults={'name': f'Unknown Journal (ISSN: {issn_text})'} + ) + if created: + logger.debug("Created new source with ISSN %s", issn_text) + elif journal_value: + # Fall back to journal name matching + src_obj, created = Source.objects.get_or_create(name=journal_value) + if created: + logger.debug("Created new source by name: %s", journal_value) geom_obj = GeometryCollection() period_start, period_end = [], [] try: + logger.debug("Fetching HTML content for geometry extraction: %s", identifier_value) resp = requests.get(identifier_value, timeout=10) resp.raise_for_status() soup = BeautifulSoup(resp.content, "html.parser") if extracted := extract_geometry_from_html(soup): geom_obj = extracted + logger.debug("Extracted geometry from HTML for: %s", identifier_value) ts, te = extract_timeperiod_from_html(soup) if ts: period_start = ts if te: period_end = te except Exception as fetch_err: - logger.error("Error fetching HTML for %s: %s", identifier_value, fetch_err) + logger.debug("Error fetching HTML for %s: %s", identifier_value, fetch_err) # finally, save the publication pub = Publication.objects.create( @@ 
-189,11 +242,15 @@ def get_field(tag): timeperiod_enddate = period_end, job = event, ) - logger.info("Saved publication id=%s for %s", pub.id, identifier_value) + saved_count += 1 + logger.info("Saved publication id=%s: %s", pub.id, title_value[:80] if title_value else 'No title') except Exception as e: - logger.error("Error parsing record: %s", e) + logger.error("Error parsing record %d: %s", processed_count, e) continue + + logger.info("OAI-PMH parsing completed for source %s: processed %d records, saved %d publications", + source.name, processed_count, saved_count) def harvest_oai_endpoint(source_id, user=None): source = Source.objects.get(id=source_id) event = HarvestingEvent.objects.create(source=source, status="in_progress") @@ -241,15 +298,31 @@ def harvest_oai_endpoint(source_id, user=None): event.completed_at = timezone.now() event.save() + # Send failure notification email to user if user and user.email: - send_mail( - "OPTIMAP Harvesting Failed", - "Harvesting failed for source %s: %s".format(source.url_field, str(e)) - settings.EMAIL_HOST_USER, - [user.email], - fail_silently=False, + failure_subject = f"Harvesting Failed for {source.collection_name or source.name}" + failure_message = ( + f"Unfortunately, the harvesting job failed for the following source:\n\n" + f"Source: {source.name}\n" + f"URL: {source.url_field}\n" + f"Collection: {source.collection_name or 'N/A'}\n" + f"Job started at: {event.started_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Job failed at: {event.completed_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Error details: {str(e)}\n\n" + f"Please check the source configuration and try again, or contact support if the issue persists." ) - + try: + send_mail( + failure_subject, + failure_message, + settings.EMAIL_HOST_USER, + [user.email], + fail_silently=False, + ) + logger.info("Failure notification email sent to %s", user.email) + except Exception as email_error: + logger.error("Failed to send failure notification email: %s", str(email_error)) + return None, None, None diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index e1b2a00..6d81d42 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -57,7 +57,6 @@ def setUp(self): password="password123" ) self.client = Client() - self.client.force_login(self.user) results = self.client.get('/api/v1/publications/').json()['results'] features = results.get('features', []) @@ -130,3 +129,90 @@ def test_no_duplicates(self): self.assertEqual(publications.count(), 2, "Expected exactly 2 unique publications") titles = [p.title for p in publications] self.assertEqual(len(titles), len(set(titles)), "Duplicate titles found") + + def test_invalid_xml_input(self): + src = Source.objects.create( + url_field="http://example.org/invalid", + harvest_interval_minutes=60 + ) + event = HarvestingEvent.objects.create(source=src, status="in_progress") + + invalid_xml = b'malformed xml without proper closing' + initial_count = Publication.objects.count() + + parse_oai_xml_and_save_publications(invalid_xml, event) + + self.assertEqual(Publication.objects.count(), initial_count) + + def test_empty_xml_input(self): + """Test harvesting with empty XML input""" + src = Source.objects.create( + url_field="http://example.org/empty", + harvest_interval_minutes=60 + ) + event = HarvestingEvent.objects.create(source=src, status="in_progress") + + empty_xml = b'' + initial_count = Publication.objects.count() + + parse_oai_xml_and_save_publications(empty_xml, event) + + self.assertEqual(Publication.objects.count(), 
initial_count)
+
+    def test_xml_with_no_records(self):
+        """Test harvesting with valid XML but no record elements"""
+        src = Source.objects.create(
+            url_field="http://example.org/norecords",
+            harvest_interval_minutes=60
+        )
+        event = HarvestingEvent.objects.create(source=src, status="in_progress")
+
+        no_records_xml = b'''<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/">
+            <responseDate>2024-01-01T00:00:00Z</responseDate>
+            <request>http://example.org/oai</request>
+            <ListRecords>
+            </ListRecords>
+        </OAI-PMH>
+        '''
+
+        initial_count = Publication.objects.count()
+
+        parse_oai_xml_and_save_publications(no_records_xml, event)
+
+        self.assertEqual(Publication.objects.count(), initial_count)
+
+    def test_xml_with_invalid_record_data(self):
+        src = Source.objects.create(
+            url_field="http://example.org/invaliddata",
+            harvest_interval_minutes=60
+        )
+        event = HarvestingEvent.objects.create(source=src, status="in_progress")
+
+        # XML with record but missing required fields
+        invalid_data_xml = b'''<OAI-PMH xmlns="http://www.openarchives.org/OAI/2.0/">
+            <responseDate>2024-01-01T00:00:00Z</responseDate>
+            <request>http://example.org/oai</request>
+            <ListRecords>
+                <record>
+                    <header>
+                        <identifier>oai:example.org:123</identifier>
+                        <datestamp>2024-01-01</datestamp>
+                    </header>
+                    <metadata>
+                        <oai_dc:dc xmlns:oai_dc="http://www.openarchives.org/OAI/2.0/oai_dc/" xmlns:dc="http://purl.org/dc/elements/1.1/">
+                            <dc:description>Some description</dc:description>
+                        </oai_dc:dc>
+                    </metadata>
+                </record>
+            </ListRecords>
+        </OAI-PMH>
''' + + initial_count = Publication.objects.count() + + parse_oai_xml_and_save_publications(invalid_data_xml, event) + + self.assertEqual(Publication.objects.count(), initial_count) From 13be284f5edbe3e3ed6cb7c436d5772d0f6108ad Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Tue, 23 Sep 2025 22:26:49 +0200 Subject: [PATCH 06/14] Added Try Catch --- publications/tasks.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index 8cdd2ce..15a1817 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -102,9 +102,14 @@ def parse_oai_xml_and_save_publications(content, event: HarvestingEvent): if content: logger.debug("Parsing XML content from response") - dom = minidom.parseString(content) - records = dom.documentElement.getElementsByTagName("record") - logger.info("Found %d records in XML response", len(records)) + try: + dom = minidom.parseString(content) + records = dom.documentElement.getElementsByTagName("record") + logger.info("Found %d records in XML response", len(records)) + except Exception as e: + logger.error("Failed to parse XML content: %s", str(e)) + logger.warning("No articles found in OAI-PMH response!") + return else: base = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", "")) logger.debug("Using Scythe harvester for base URL: %s", base) From fec3e525ee127aa256846161ebfb66c6db887326 Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Tue, 23 Sep 2025 22:43:52 +0200 Subject: [PATCH 07/14] Updated Test --- publications/tasks.py | 68 +++++++++++++++++----------- tests/test_harvesting.py | 95 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+), 26 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index 15a1817..9424228 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -25,6 +25,7 @@ from django.core.serializers import serialize from django.core.mail import send_mail, EmailMessage from django.utils import timezone +from django.db import transaction from django.contrib.gis.geos import GEOSGeometry, GeometryCollection from django_q.tasks import schedule from django_q.models import Schedule @@ -95,12 +96,12 @@ def extract_timeperiod_from_html(soup: BeautifulSoup): return ([start] if start else [None]), ([end] if end else [None]) # If missing, return [None] for start and [None] for end -def parse_oai_xml_and_save_publications(content, event: HarvestingEvent): +def parse_oai_xml_and_save_publications(content, event: HarvestingEvent, max_records=None): source = event.source logger.info("Starting OAI-PMH parsing for source: %s", source.name) parsed = urlsplit(source.url_field) - if content: + if content and len(content.strip()) > 0: logger.debug("Parsing XML content from response") try: dom = minidom.parseString(content) @@ -111,16 +112,20 @@ def parse_oai_xml_and_save_publications(content, event: HarvestingEvent): logger.warning("No articles found in OAI-PMH response!") return else: - base = urlunsplit((parsed.scheme, parsed.netloc, parsed.path, "", "")) - logger.debug("Using Scythe harvester for base URL: %s", base) - with Scythe(base) as harvester: - records = harvester.list_records(metadata_prefix="oai_dc") - logger.info("Retrieved records using Scythe harvester") + logger.warning("Empty or no content provided - cannot harvest") + return if not records: logger.warning("No articles found in OAI-PMH response!") return + if max_records and hasattr(records, '__len__'): + records = records[:max_records] + logger.info("Limited to first %d 
records", max_records) + elif max_records: + records = list(records)[:max_records] + logger.info("Limited to first %d records", max_records) + processed_count = 0 saved_count = 0 @@ -233,22 +238,26 @@ def get_field(tag): except Exception as fetch_err: logger.debug("Error fetching HTML for %s: %s", identifier_value, fetch_err) - # finally, save the publication - pub = Publication.objects.create( - title = title_value, - abstract = abstract_text, - publicationDate = date_value, - url = identifier_value, - doi = doi_text, - source = src_obj, - status = "p", - geometry = geom_obj, - timeperiod_startdate = period_start, - timeperiod_enddate = period_end, - job = event, - ) - saved_count += 1 - logger.info("Saved publication id=%s: %s", pub.id, title_value[:80] if title_value else 'No title') + try: + with transaction.atomic(): + pub = Publication.objects.create( + title = title_value, + abstract = abstract_text, + publicationDate = date_value, + url = identifier_value, + doi = doi_text, + source = src_obj, + status = "p", + geometry = geom_obj, + timeperiod_startdate = period_start, + timeperiod_enddate = period_end, + job = event, + ) + saved_count += 1 + logger.info("Saved publication id=%s: %s", pub.id, title_value[:80] if title_value else 'No title') + except Exception as save_err: + logger.error("Failed to save publication '%s': %s", title_value[:80] if title_value else 'No title', save_err) + continue except Exception as e: logger.error("Error parsing record %d: %s", processed_count, e) @@ -256,15 +265,22 @@ def get_field(tag): logger.info("OAI-PMH parsing completed for source %s: processed %d records, saved %d publications", source.name, processed_count, saved_count) -def harvest_oai_endpoint(source_id, user=None): +def harvest_oai_endpoint(source_id, user=None, max_records=None): source = Source.objects.get(id=source_id) event = HarvestingEvent.objects.create(source=source, status="in_progress") try: - response = requests.get(source.url_field) + # Construct proper OAI-PMH URL + if '?' 
not in source.url_field: + oai_url = f"{source.url_field}?verb=ListRecords&metadataPrefix=oai_dc" + else: + oai_url = source.url_field + + logger.info("Fetching from OAI-PMH URL: %s", oai_url) + response = requests.get(oai_url) response.raise_for_status() - parse_oai_xml_and_save_publications(response.content, event) + parse_oai_xml_and_save_publications(response.content, event, max_records=max_records) event.status = "completed" event.completed_at = timezone.now() diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index 6d81d42..61702fa 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -216,3 +216,98 @@ def test_xml_with_invalid_record_data(self): parse_oai_xml_and_save_publications(invalid_data_xml, event) self.assertEqual(Publication.objects.count(), initial_count) + + def test_real_journal_harvesting_essd(self): + """Test harvesting from actual ESSD Copernicus endpoint""" + from publications.tasks import harvest_oai_endpoint + + # Clear existing publications for clean test + Publication.objects.all().delete() + + src = Source.objects.create( + url_field="https://oai-pmh.copernicus.org/oai.php?verb=ListRecords&metadataPrefix=oai_dc&set=essd", + harvest_interval_minutes=1440, + name="ESSD Copernicus" + ) + + initial_count = Publication.objects.count() + + # Harvest from real endpoint with limit + harvest_oai_endpoint(src.id, max_records=3) + + # Should have harvested some publications + final_count = Publication.objects.count() + self.assertGreater(final_count, initial_count, "Should harvest at least some publications from ESSD") + self.assertLessEqual(final_count - initial_count, 3, "Should not exceed max_records limit") + + # Verify ESSD publications were created + essd_pubs = Publication.objects.filter(source=src) + for pub in essd_pubs: + self.assertIsNotNone(pub.title, f"Publication {pub.id} missing title") + self.assertIsNotNone(pub.url, f"Publication {pub.id} missing URL") + # ESSD should have DOIs with Copernicus prefix + if pub.doi: + self.assertIn("10.5194", pub.doi, "ESSD DOIs should contain Copernicus prefix") + + def test_real_journal_harvesting_geo_leo(self): + """Test harvesting from actual GEO-LEO e-docs endpoint""" + from publications.tasks import harvest_oai_endpoint + + # Clear existing publications for clean test + Publication.objects.all().delete() + + src = Source.objects.create( + url_field="https://e-docs.geo-leo.de/server/oai/request", + harvest_interval_minutes=1440, + name="GEO-LEO e-docs" + ) + + initial_count = Publication.objects.count() + + # Harvest from real endpoint with limit + harvest_oai_endpoint(src.id, max_records=5) + + # Should have harvested some publications + final_count = Publication.objects.count() + self.assertGreater(final_count, initial_count, "Should harvest at least some publications from GEO-LEO") + self.assertLessEqual(final_count - initial_count, 5, "Should not exceed max_records limit") + + # Verify GEO-LEO publications were created + geo_leo_pubs = Publication.objects.filter(source=src) + for pub in geo_leo_pubs: + self.assertIsNotNone(pub.title, f"Publication {pub.id} missing title") + self.assertIsNotNone(pub.url, f"Publication {pub.id} missing URL") + + def test_real_journal_harvesting_agile_giss(self): + """Test harvesting from actual AGILE-GISS endpoint""" + from publications.tasks import harvest_oai_endpoint + + # Clear existing publications for clean test + Publication.objects.all().delete() + + src = Source.objects.create( + url_field="https://www.agile-giscience-series.net", + 
harvest_interval_minutes=1440, + name="AGILE-GISS" + ) + + initial_count = Publication.objects.count() + + # Note: This may fail if AGILE doesn't have OAI-PMH endpoint + try: + harvest_oai_endpoint(src.id, max_records=3) + + # Should have harvested some publications + final_count = Publication.objects.count() + self.assertGreater(final_count, initial_count, "Should harvest at least some publications from AGILE-GISS") + self.assertLessEqual(final_count - initial_count, 3, "Should not exceed max_records limit") + + # Verify AGILE publications were created + agile_pubs = Publication.objects.filter(source=src) + for pub in agile_pubs: + self.assertIsNotNone(pub.title, f"Publication {pub.id} missing title") + self.assertIsNotNone(pub.url, f"Publication {pub.id} missing URL") + except Exception as e: + # Skip test if AGILE doesn't have OAI-PMH endpoint + self.skipTest(f"AGILE-GISS endpoint not available: {e}") + From 640003325ede705e75d66c567ceb465f1397ce7c Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Wed, 24 Sep 2025 00:38:38 +0200 Subject: [PATCH 08/14] Updated Harvesting --- publications/tasks.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index 9424228..d36261b 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -55,6 +55,34 @@ def _get_article_link(pub): return f"{base}/work/{pub.doi}" return pub.url + +def parse_publication_date(date_string): + if not date_string: + return None + + date_string = date_string.strip() + + # Already in correct format + if re.match(r'^\d{4}-\d{2}-\d{2}$', date_string): + return date_string + + # YYYY-MM format - add day + if re.match(r'^\d{4}-\d{2}$', date_string): + return f"{date_string}-01" + + # YYYY format - add month and day + if re.match(r'^\d{4}$', date_string): + return f"{date_string}-01-01" + + # Try to extract year from other formats + year_match = re.search(r'\b(\d{4})\b', date_string) + if year_match: + return f"{year_match.group(1)}-01-01" + + logger.warning("Could not parse date format: %s", date_string) + return None + + def generate_data_dump_filename(extension: str) -> str: ts = datetime.now(dt_timezone.utc).strftime("%Y%m%dT%H%M%S") return f"optimap_data_dump_{ts}.{extension}" @@ -158,7 +186,8 @@ def get_field(tag): title_value = get_field("title") or get_field("dc:title") abstract_text = get_field("description") or get_field("dc:description") journal_value = get_field("publisher") or get_field("dc:publisher") - date_value = get_field("date") or get_field("dc:date") + raw_date_value = get_field("date") or get_field("dc:date") + date_value = parse_publication_date(raw_date_value) logger.debug("Processing publication: %s", title_value[:50] if title_value else 'No title') @@ -247,7 +276,7 @@ def get_field(tag): url = identifier_value, doi = doi_text, source = src_obj, - status = "p", + status = "h", geometry = geom_obj, timeperiod_startdate = period_start, timeperiod_enddate = period_end, @@ -344,8 +373,8 @@ def harvest_oai_endpoint(source_id, user=None, max_records=None): except Exception as email_error: logger.error("Failed to send failure notification email: %s", str(email_error)) - return None, None, None - + # If we reach here, harvesting failed + return None, None, None def send_monthly_email(trigger_source="manual", sent_by=None): """ From 123062bae02f60e679021cefbd7e8099fcd9db8d Mon Sep 17 00:00:00 2001 From: uxairibrar Date: Wed, 24 Sep 2025 01:08:06 +0200 Subject: [PATCH 09/14] Resolved Test Issue --- 
publications/tasks.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/publications/tasks.py b/publications/tasks.py index d36261b..e4f8701 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -298,6 +298,10 @@ def harvest_oai_endpoint(source_id, user=None, max_records=None): source = Source.objects.get(id=source_id) event = HarvestingEvent.objects.create(source=source, status="in_progress") + new_count = None + spatial_count = None + temporal_count = None + try: # Construct proper OAI-PMH URL if '?' not in source.url_field: @@ -340,8 +344,7 @@ def harvest_oai_endpoint(source_id, user=None, max_records=None): [user.email], fail_silently=False, ) - - return new_count, spatial_count, temporal_count + except Exception as e: logger.error("Harvesting failed for source %s: %s", source.url_field, str(e)) event.status = "failed" @@ -373,8 +376,7 @@ def harvest_oai_endpoint(source_id, user=None, max_records=None): except Exception as email_error: logger.error("Failed to send failure notification email: %s", str(email_error)) - # If we reach here, harvesting failed - return None, None, None + return new_count, spatial_count, temporal_count def send_monthly_email(trigger_source="manual", sent_by=None): """ From 55ff078ca10e32ad0f9cb2a40d3caccbf3821e49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20N=C3=BCst?= Date: Thu, 9 Oct 2025 20:07:09 +0200 Subject: [PATCH 10/14] fix tests --- .claude/settings.local.json | 11 +++++++++++ optimap/__init__.py | 2 +- publications/tasks.py | 7 +++---- tests/test_harvesting.py | 1 - tests/test_regular_harvesting.py | 6 +++--- 5 files changed, 18 insertions(+), 9 deletions(-) create mode 100644 .claude/settings.local.json diff --git a/.claude/settings.local.json b/.claude/settings.local.json new file mode 100644 index 0000000..78dafa8 --- /dev/null +++ b/.claude/settings.local.json @@ -0,0 +1,11 @@ +{ + "permissions": { + "allow": [ + "Bash(tee:*)", + "Bash(git checkout:*)", + "Bash(pip install:*)" + ], + "deny": [], + "ask": [] + } +} \ No newline at end of file diff --git a/optimap/__init__.py b/optimap/__init__.py index c905432..b08e2c7 100644 --- a/optimap/__init__.py +++ b/optimap/__init__.py @@ -1,2 +1,2 @@ -__version__ = "0.2.0" +__version__ = "0.3.0" VERSION = __version__ \ No newline at end of file diff --git a/publications/tasks.py b/publications/tasks.py index e4f8701..e108153 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -1,4 +1,4 @@ -mport logging +import logging import os import gzip import glob @@ -120,8 +120,8 @@ def extract_timeperiod_from_html(soup: BeautifulSoup): parts = tag["content"].split("/") end = parts[1] if len(parts) > 1 and parts[1] else None start = parts[0] if parts[0] else None - return [None], [None] return ([start] if start else [None]), ([end] if end else [None]) # If missing, return [None] for start and [None] for end + return [None], [None] def parse_oai_xml_and_save_publications(content, event: HarvestingEvent, max_records=None): @@ -577,8 +577,7 @@ def convert_geojson_to_geopackage(geojson_path): return None -def regenerate_geopackage_cache(): return new_count, spatial_count, temporal_count - +def regenerate_geopackage_cache(): geojson_path = regenerate_geojson_cache() cache_dir = Path(geojson_path).parent gpkg_path = convert_geojson_to_geopackage(geojson_path) diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index 61702fa..03bdb76 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -18,7 +18,6 @@ class SimpleTest(TestCase): - 
@classmethod @responses.activate def setUp(self): super().setUp() diff --git a/tests/test_regular_harvesting.py b/tests/test_regular_harvesting.py index 3a13b56..cc4cae5 100644 --- a/tests/test_regular_harvesting.py +++ b/tests/test_regular_harvesting.py @@ -41,12 +41,12 @@ def test_harvest_regular_metadata_sends_email(self, mock_parser, mock_get): fake_response.content = b"" mock_get.return_value = fake_response - def fake_parser_func(content, event): + def fake_parser_func(content, event, max_records=None): Publication.objects.create( title="Test Publication 1", doi="10.1000/1", job=event, - timeperiod_startdate=[], + timeperiod_startdate=[], timeperiod_enddate=[], geometry=None ) @@ -54,7 +54,7 @@ def fake_parser_func(content, event): title="Test Publication 2", doi="10.1000/2", job=event, - timeperiod_startdate=[], + timeperiod_startdate=[], timeperiod_enddate=[], geometry=None ) From 46ef1fd5206f3a47dc26647d68abfc5e3ef3c3c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20N=C3=BCst?= Date: Thu, 9 Oct 2025 21:26:31 +0200 Subject: [PATCH 11/14] Adds unit and integration test setup for harvesting real journals --- .claude/settings.local.json | 4 +- README.md | 39 ++++++ pytest.ini | 22 +++ tests/test_real_harvesting.py | 248 ++++++++++++++++++++++++++++++++++ 4 files changed, 312 insertions(+), 1 deletion(-) create mode 100644 pytest.ini create mode 100644 tests/test_real_harvesting.py diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 78dafa8..01c3d81 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -3,7 +3,9 @@ "allow": [ "Bash(tee:*)", "Bash(git checkout:*)", - "Bash(pip install:*)" + "Bash(pip install:*)", + "Bash(gh issue view:*)", + "Bash(pytest:*)" ], "deny": [], "ask": [] diff --git a/README.md b/README.md index fd351f8..44aaffa 100644 --- a/README.md +++ b/README.md @@ -265,6 +265,10 @@ UI tests are based on [Helium](https://github.com/mherrmann/selenium-python-heli pip install -r requirements-dev.txt ``` +#### Unit Tests + +Run all unit tests: + ```bash python manage.py test tests @@ -275,6 +279,41 @@ python -Wa manage.py test OPTIMAP_LOGGING_LEVEL=WARNING python manage.py test tests ``` +#### Integration Tests (Real Harvesting) + +Integration tests that harvest from live OAI-PMH endpoints are disabled by default to avoid network dependencies and slow test execution. These tests verify harvesting from real journal sources. 
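+
+The `pytest.ini` file added in this patch also declares a `real_harvesting` marker, so the same tests can likely be selected through pytest as well. The following is a hypothetical invocation; it assumes the integration test classes are actually tagged with that marker and that `pytest-django` is installed:
+
+```bash
+# Hypothetical: opt in via SKIP_REAL_HARVESTING and select tests by the marker declared in pytest.ini
+SKIP_REAL_HARVESTING=0 pytest -m real_harvesting tests/test_real_harvesting.py
+```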
+ +Run all integration tests: + +```bash +# Enable real harvesting tests +SKIP_REAL_HARVESTING=0 python manage.py test tests.test_real_harvesting +``` + +Run a specific journal test: + +```bash +# Test ESSD harvesting +SKIP_REAL_HARVESTING=0 python manage.py test tests.test_real_harvesting.RealHarvestingTest.test_harvest_essd + +# Test GEO-LEO harvesting +SKIP_REAL_HARVESTING=0 python manage.py test tests.test_real_harvesting.RealHarvestingTest.test_harvest_geo_leo +``` + +Show skipped tests (these are skipped by default): + +```bash +# Run with verbose output to see skip reasons +python manage.py test tests.test_real_harvesting -v 2 +``` + +**Supported journals**: + +- Earth System Science Data (ESSD) - [Issue #59](https://github.com/GeoinformationSystems/optimap/issues/59) +- AGILE-GISS - [Issue #60](https://github.com/GeoinformationSystems/optimap/issues/60) +- GEO-LEO e-docs - [Issue #13](https://github.com/GeoinformationSystems/optimap/issues/13) +- ESS Open Archive (EssOAr) - [Issue #99](https://github.com/GeoinformationSystems/optimap/issues/99) _(endpoint needs confirmation)_ + ### Run UI tests Running UI tests needs either compose configuration or a manage.py runserver in a seperate shell. diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..6bf0d07 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,22 @@ +[pytest] +# Pytest configuration for OPTIMAP + +# Test discovery patterns +python_files = test_*.py +python_classes = Test* *Test *TestCase +python_functions = test_* + +# Markers for test categorization +markers = + real_harvesting: Integration tests that harvest from real OAI-PMH endpoints (use -m real_harvesting to run) + slow: Slow-running tests (deselect with -m "not slow") + network: Tests requiring network access + +# Output options +addopts = + -v + --strict-markers + --tb=short + +# Django settings +DJANGO_SETTINGS_MODULE = optimap.settings diff --git a/tests/test_real_harvesting.py b/tests/test_real_harvesting.py new file mode 100644 index 0000000..670a3f8 --- /dev/null +++ b/tests/test_real_harvesting.py @@ -0,0 +1,248 @@ +""" +Integration tests for harvesting real journal sources. + +These tests perform actual HTTP requests to live OAI-PMH endpoints +and are skipped by default to avoid network dependencies and slow test runs. + +To run these tests: + SKIP_REAL_HARVESTING=0 python manage.py test tests.test_real_harvesting + +To run a specific test: + SKIP_REAL_HARVESTING=0 python manage.py test tests.test_real_harvesting.RealHarvestingTest.test_harvest_essd + +Environment variables: + SKIP_REAL_HARVESTING=0 - Enable real harvesting tests (default: skip) +""" + +import os +import django +from unittest import skipIf +from django.test import TestCase + +# bootstrap Django +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'optimap.settings') +django.setup() + +from publications.models import Publication, Source, HarvestingEvent +from publications.tasks import harvest_oai_endpoint +from django.contrib.auth import get_user_model + +User = get_user_model() + +# Skip these tests by default unless SKIP_REAL_HARVESTING=0 +SKIP_REAL_HARVESTING = os.environ.get('SKIP_REAL_HARVESTING', '1') == '1' +skip_reason = "Real harvesting tests disabled. Set SKIP_REAL_HARVESTING=0 to enable." + + +@skipIf(SKIP_REAL_HARVESTING, skip_reason) +class RealHarvestingTest(TestCase): + """ + Integration tests for harvesting from real journal OAI-PMH endpoints. + + These tests verify that: + 1. The OAI-PMH endpoint is accessible + 2. Publications are successfully parsed and saved + 3. 
Metadata extraction works for real-world data + 4. The harvesting event completes successfully + + Each test limits harvesting to ~20 records to keep runtime reasonable. + """ + + def setUp(self): + """Set up test user for harvesting events.""" + self.user = User.objects.create_user( + username="harvesting_test_user", + email="harvesting@test.optimap.science", + password="test_password" + ) + + def tearDown(self): + """Clean up created publications and sources.""" + Publication.objects.filter(source__name__startswith="TEST: ").delete() + Source.objects.filter(name__startswith="TEST: ").delete() + + def _create_source(self, name, url, collection_name=None): + """Helper to create a test source.""" + return Source.objects.create( + name=f"TEST: {name}", + url_field=url, + collection_name=collection_name or name, + harvest_interval_minutes=60 * 24 * 7 # Weekly + ) + + def _assert_successful_harvest(self, source, min_publications=1): + """ + Assert that harvesting completed successfully with expected results. + + Args: + source: Source model instance + min_publications: Minimum number of publications expected + """ + # Get the latest harvesting event + event = HarvestingEvent.objects.filter(source=source).latest("started_at") + + # Check event completed successfully + self.assertEqual( + event.status, + "completed", + f"Harvesting event failed with status: {event.status}" + ) + self.assertIsNotNone(event.completed_at, "Harvesting event has no completion time") + + # Check publications were created + pub_count = Publication.objects.filter(job=event).count() + self.assertGreaterEqual( + pub_count, + min_publications, + f"Expected at least {min_publications} publications, got {pub_count}" + ) + + # Check that publications have required fields + pubs = Publication.objects.filter(job=event) + for pub in pubs[:5]: # Check first 5 + self.assertTrue( + pub.title, + f"Publication {pub.id} missing title" + ) + # DOI is optional but should be present for most journals + # Geometry and temporal data are optional + + return pub_count + + def test_harvest_essd(self): + """ + Test harvesting from Earth System Science Data (ESSD). + + Issue: https://github.com/GeoinformationSystems/optimap/issues/59 + Journal: https://essd.copernicus.org/ + """ + source = self._create_source( + name="Earth System Science Data", + url="https://oai-pmh.copernicus.org/oai.php", + collection_name="ESSD" + ) + + # Harvest with limit of 20 records + harvest_oai_endpoint(source.id, user=self.user, max_records=20) + + # Verify successful harvest + pub_count = self._assert_successful_harvest(source, min_publications=10) + print(f"\n✓ ESSD: Harvested {pub_count} publications") + + def test_harvest_agile_giss(self): + """ + Test harvesting from AGILE-GISS conference series. + + Issue: https://github.com/GeoinformationSystems/optimap/issues/60 + Journal: https://www.agile-giscience-series.net/ + """ + source = self._create_source( + name="AGILE-GISS", + url="https://oai-pmh.copernicus.org/oai.php", + collection_name="AGILE-GISS" + ) + + # Harvest with limit of 20 records + harvest_oai_endpoint(source.id, user=self.user, max_records=20) + + # Verify successful harvest + pub_count = self._assert_successful_harvest(source, min_publications=10) + print(f"\n✓ AGILE-GISS: Harvested {pub_count} publications") + + def test_harvest_geo_leo(self): + """ + Test harvesting from GEO-LEO e-docs repository. 
+ + Issue: https://github.com/GeoinformationSystems/optimap/issues/13 + Repository: https://e-docs.geo-leo.de/ + """ + source = self._create_source( + name="GEO-LEO e-docs", + url="https://e-docs.geo-leo.de/server/oai/request", + collection_name="GEO-LEO" + ) + + # Harvest with limit of 20 records + harvest_oai_endpoint(source.id, user=self.user, max_records=20) + + # Verify successful harvest + pub_count = self._assert_successful_harvest(source, min_publications=5) + print(f"\n✓ GEO-LEO: Harvested {pub_count} publications") + + @skipIf(True, "EssOAr OAI-PMH endpoint not yet confirmed") + def test_harvest_essoar(self): + """ + Test harvesting from ESS Open Archive (EssOAr). + + Issue: https://github.com/GeoinformationSystems/optimap/issues/99 + Repository: https://essopenarchive.org/ + + Note: OAI-PMH endpoint needs to be confirmed. + """ + # Placeholder - needs endpoint URL + source = self._create_source( + name="ESS Open Archive", + url="https://essopenarchive.org/oai/request", # To be confirmed + collection_name="EssOAr" + ) + + harvest_oai_endpoint(source.id, user=self.user, max_records=20) + pub_count = self._assert_successful_harvest(source, min_publications=5) + print(f"\n✓ EssOAr: Harvested {pub_count} publications") + + def test_harvest_respects_max_records(self): + """ + Test that max_records parameter properly limits harvesting. + + Uses ESSD as a test source known to have many records. + """ + source = self._create_source( + name="ESSD (limited)", + url="https://oai-pmh.copernicus.org/oai.php", + collection_name="ESSD" + ) + + # Harvest with very small limit + max_records = 5 + harvest_oai_endpoint(source.id, user=self.user, max_records=max_records) + + # Verify we got exactly the requested number (or slightly more due to batching) + event = HarvestingEvent.objects.filter(source=source).latest("started_at") + pub_count = Publication.objects.filter(job=event).count() + + self.assertLessEqual( + pub_count, + max_records + 10, # Allow some tolerance for batch processing + f"Harvested {pub_count} publications, expected around {max_records}" + ) + print(f"\n✓ max_records: Harvested {pub_count} publications (limit was {max_records})") + + def test_harvest_with_metadata_extraction(self): + """ + Test that spatial/temporal metadata is extracted when available. + + Uses GEO-LEO which should have some geospatial metadata. 
+ """ + source = self._create_source( + name="GEO-LEO (metadata test)", + url="https://e-docs.geo-leo.de/server/oai/request", + collection_name="GEO-LEO" + ) + + harvest_oai_endpoint(source.id, user=self.user, max_records=20) + + event = HarvestingEvent.objects.filter(source=source).latest("started_at") + pubs = Publication.objects.filter(job=event) + + # Check if any publications have spatial metadata + spatial_count = pubs.exclude(geometry__isnull=True).count() + + # Check if any publications have temporal metadata + temporal_count = pubs.exclude(timeperiod_startdate=[]).count() + + print(f"\n✓ Metadata extraction: {spatial_count} with geometry, " + f"{temporal_count} with temporal data out of {pubs.count()} total") + + # We don't assert specific counts since metadata availability varies, + # but we verify the harvesting completed successfully + self.assertEqual(event.status, "completed") From 085bfa69c60b669545c9840079024bdbf2a0c3da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20N=C3=BCst?= Date: Thu, 9 Oct 2025 21:26:44 +0200 Subject: [PATCH 12/14] Adds journal harvesting management command --- README.md | 62 ++++ .../management/commands/harvest_journals.py | 269 ++++++++++++++++++ 2 files changed, 331 insertions(+) create mode 100644 publications/management/commands/harvest_journals.py diff --git a/README.md b/README.md index 44aaffa..797af58 100644 --- a/README.md +++ b/README.md @@ -144,6 +144,11 @@ python manage.py qcluster # If you want to use the predefined feeds for continents and oceans we need to load the geometries for global regions python manage.py load_global_regions +# Harvest publications from real OAI-PMH journal sources +python manage.py harvest_journals --list # List available journals +python manage.py harvest_journals --all --max-records 20 # Harvest all journals (limited to 20 records each) +python manage.py harvest_journals --journal essd --journal geo-leo # Harvest specific journals + # Start the Django development server python manage.py runserver @@ -233,6 +238,63 @@ OPTIMAP_EMAIL_PORT=5587 Visit the URL - http://127.0.0.1:8000/articles/links/ +### Harvest Publications from Real Journals + +The `harvest_journals` management command allows you to harvest publications from real OAI-PMH journal sources directly into your database. 
This is useful for:
+
+- Populating your database with real data for testing and development
+- Testing harvesting functionality against live endpoints
+- Initial data loading for production deployment
+
+**List available journals**:
+
+```bash
+python manage.py harvest_journals --list
+```
+
+**Harvest all configured journals** (with record limit):
+
+```bash
+python manage.py harvest_journals --all --max-records 50
+```
+
+**Harvest specific journals**:
+
+```bash
+# Single journal
+python manage.py harvest_journals --journal essd --max-records 100
+
+# Multiple journals
+python manage.py harvest_journals --journal essd --journal geo-leo --journal agile-giss
+```
+
+**Create source entries automatically**:
+
+```bash
+python manage.py harvest_journals --journal essd --create-sources
+```
+
+**Associate with specific user**:
+
+```bash
+python manage.py harvest_journals --all --user-email admin@optimap.science
+```
+
+**Currently configured journals**:
+
+- `essd` - Earth System Science Data ([Issue #59](https://github.com/GeoinformationSystems/optimap/issues/59))
+- `agile-giss` - AGILE-GISS conference series ([Issue #60](https://github.com/GeoinformationSystems/optimap/issues/60))
+- `geo-leo` - GEO-LEO e-docs repository ([Issue #13](https://github.com/GeoinformationSystems/optimap/issues/13))
+
+The command provides detailed progress reporting including:
+
+- Number of publications harvested
+- Harvesting duration
+- Spatial and temporal metadata statistics
+- Success/failure status for each journal
+
+When the command runs multiple times, it only adds new publications that are not already in the database, just like the regular harvesting process.
+
 ### Create Superusers/Admin

 Superusers or administrators can be created using the `createsuperuser` command. This user will have access to the Django admin interface.
diff --git a/publications/management/commands/harvest_journals.py b/publications/management/commands/harvest_journals.py
new file mode 100644
index 0000000..12f277d
--- /dev/null
+++ b/publications/management/commands/harvest_journals.py
@@ -0,0 +1,269 @@
+# publications/management/commands/harvest_journals.py
+
+"""
+Django management command to harvest publications from real OAI-PMH journal sources.
+
+This command harvests from live OAI-PMH endpoints and saves publications to the
+current database. It's designed for production use and testing against real sources.
+ +Usage: + python manage.py harvest_journals --all + python manage.py harvest_journals --journal essd --max-records 50 + python manage.py harvest_journals --journal geo-leo --journal agile-giss +""" + +import logging +from django.core.management.base import BaseCommand, CommandError +from django.contrib.auth import get_user_model +from django.utils import timezone +from publications.models import Source, HarvestingEvent, Publication +from publications.tasks import harvest_oai_endpoint + +logger = logging.getLogger(__name__) +User = get_user_model() + +# Journal configurations with OAI-PMH endpoints +JOURNAL_CONFIGS = { + 'essd': { + 'name': 'Earth System Science Data', + 'url': 'https://oai-pmh.copernicus.org/oai.php?verb=ListRecords&metadataPrefix=oai_dc&set=essd', + 'collection_name': 'ESSD', + 'homepage_url': 'https://essd.copernicus.org/', + 'publisher_name': 'Copernicus Publications', + 'is_oa': True, + 'issue': 59, + }, + 'agile-giss': { + 'name': 'AGILE-GISS', + 'url': 'https://oai-pmh.copernicus.org/oai.php?verb=ListRecords&metadataPrefix=oai_dc&set=agile-giss', + 'collection_name': 'AGILE-GISS', + 'homepage_url': 'https://www.agile-giscience-series.net/', + 'publisher_name': 'Copernicus Publications', + 'is_oa': True, + 'issue': 60, + }, + 'geo-leo': { + 'name': 'GEO-LEO e-docs', + 'url': 'https://e-docs.geo-leo.de/server/oai/request?verb=ListRecords&metadataPrefix=oai_dc', + 'collection_name': 'GEO-LEO', + 'homepage_url': 'https://e-docs.geo-leo.de/', + 'publisher_name': 'GEO-LEO', + 'is_oa': True, + 'issue': 13, + }, +} + + +class Command(BaseCommand): + help = 'Harvest publications from real OAI-PMH journal sources into the current database' + + def add_arguments(self, parser): + parser.add_argument( + '--journal', + action='append', + choices=list(JOURNAL_CONFIGS.keys()), + help=f'Journal to harvest (choices: {", ".join(JOURNAL_CONFIGS.keys())}). Can be specified multiple times.', + ) + parser.add_argument( + '--all', + action='store_true', + help='Harvest from all configured journals', + ) + parser.add_argument( + '--max-records', + type=int, + default=None, + help='Maximum number of records to harvest per journal (default: unlimited)', + ) + parser.add_argument( + '--create-sources', + action='store_true', + help='Create Source entries if they don\'t exist (default: use existing sources only)', + ) + parser.add_argument( + '--user-email', + type=str, + default=None, + help='Email of user to associate with harvesting events (optional)', + ) + parser.add_argument( + '--list', + action='store_true', + help='List available journals and exit', + ) + + def handle(self, *args, **options): + # List journals and exit + if options['list']: + self.stdout.write(self.style.SUCCESS('\nAvailable journals for harvesting:\n')) + for key, config in JOURNAL_CONFIGS.items(): + self.stdout.write(f" {key:15} - {config['name']}") + self.stdout.write(f" Issue: #{config['issue']}, URL: {config['homepage_url']}") + return + + # Determine which journals to harvest + if options['all']: + journals_to_harvest = list(JOURNAL_CONFIGS.keys()) + elif options['journal']: + journals_to_harvest = options['journal'] + else: + raise CommandError( + 'Please specify --all to harvest all journals, or --journal for specific journals.\n' + 'Use --list to see available journals.' 
+ ) + + # Get user if specified + user = None + if options['user_email']: + try: + user = User.objects.get(email=options['user_email']) + self.stdout.write(f"Using user: {user.email}") + except User.DoesNotExist: + raise CommandError(f"User with email '{options['user_email']}' does not exist") + + max_records = options['max_records'] + create_sources = options['create_sources'] + + # Summary statistics + total_harvested = 0 + total_failed = 0 + results = [] + + self.stdout.write(self.style.SUCCESS(f'\n{"="*70}')) + self.stdout.write(self.style.SUCCESS(f'Starting harvest of {len(journals_to_harvest)} journal(s)')) + self.stdout.write(self.style.SUCCESS(f'{"="*70}\n')) + + # Harvest each journal + for journal_key in journals_to_harvest: + config = JOURNAL_CONFIGS[journal_key] + + self.stdout.write(self.style.WARNING(f'\n--- Harvesting: {config["name"]} ---')) + self.stdout.write(f'Issue: https://github.com/GeoinformationSystems/optimap/issues/{config["issue"]}') + self.stdout.write(f'URL: {config["url"]}') + if max_records: + self.stdout.write(f'Max records: {max_records}') + + try: + # Find or create source + source = self._get_or_create_source(config, create_sources) + + # Harvest + harvest_start = timezone.now() + harvest_oai_endpoint(source.id, user=user, max_records=max_records) + + # Get results + event = HarvestingEvent.objects.filter(source=source).latest('started_at') + pub_count = Publication.objects.filter(job=event).count() + + duration = (timezone.now() - harvest_start).total_seconds() + + if event.status == 'completed': + self.stdout.write(self.style.SUCCESS( + f'✓ Successfully harvested {pub_count} publications in {duration:.1f}s' + )) + total_harvested += pub_count + results.append({ + 'journal': config['name'], + 'status': 'success', + 'count': pub_count, + 'duration': duration, + }) + else: + self.stdout.write(self.style.ERROR( + f'✗ Harvesting failed with status: {event.status}' + )) + total_failed += 1 + results.append({ + 'journal': config['name'], + 'status': 'failed', + 'count': 0, + 'duration': duration, + }) + + # Show spatial/temporal metadata stats + spatial_count = Publication.objects.filter( + job=event + ).exclude(geometry__isnull=True).count() + + temporal_count = Publication.objects.filter( + job=event + ).exclude(timeperiod_startdate=[]).count() + + self.stdout.write( + f' Spatial metadata: {spatial_count}/{pub_count} publications' + ) + self.stdout.write( + f' Temporal metadata: {temporal_count}/{pub_count} publications' + ) + + except Exception as e: + self.stdout.write(self.style.ERROR(f'✗ Error: {str(e)}')) + logger.exception(f'Failed to harvest {journal_key}') + total_failed += 1 + results.append({ + 'journal': config['name'], + 'status': 'error', + 'count': 0, + 'error': str(e), + }) + + # Print summary + self.stdout.write(self.style.SUCCESS(f'\n{"="*70}')) + self.stdout.write(self.style.SUCCESS('Harvest Summary')) + self.stdout.write(self.style.SUCCESS(f'{"="*70}\n')) + + for result in results: + status_symbol = '✓' if result['status'] == 'success' else '✗' + status_style = self.style.SUCCESS if result['status'] == 'success' else self.style.ERROR + + if result['status'] == 'success': + self.stdout.write(status_style( + f"{status_symbol} {result['journal']:30} {result['count']:5} publications " + f"({result['duration']:.1f}s)" + )) + else: + error_msg = result.get('error', result['status']) + self.stdout.write(status_style( + f"{status_symbol} {result['journal']:30} Failed: {error_msg}" + )) + + self.stdout.write(f'\nTotal publications harvested: 
{total_harvested}') + if total_failed > 0: + self.stdout.write(self.style.WARNING(f'Failed journals: {total_failed}')) + + self.stdout.write(self.style.SUCCESS(f'\n{"="*70}\n')) + + def _get_or_create_source(self, config, create_if_missing): + """Get or optionally create a Source for the journal.""" + # Try to find existing source by name or URL + source = Source.objects.filter(name=config['name']).first() + + if not source: + source = Source.objects.filter(url_field=config['url']).first() + + if source: + self.stdout.write(f'Using existing source: {source.name} (ID: {source.id})') + return source + + if not create_if_missing: + raise CommandError( + f"Source '{config['name']}' not found in database. " + f"Use --create-sources to automatically create it." + ) + + # Create new source + source = Source.objects.create( + name=config['name'], + url_field=config['url'], + collection_name=config['collection_name'], + homepage_url=config.get('homepage_url'), + publisher_name=config.get('publisher_name'), + is_oa=config.get('is_oa', False), + harvest_interval_minutes=60 * 24 * 7, # Weekly + ) + + self.stdout.write(self.style.SUCCESS( + f'Created new source: {source.name} (ID: {source.id})' + )) + + return source From 3185776ccca3fb2182c8f1a026748d8b07774d8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20N=C3=BCst?= Date: Thu, 9 Oct 2025 21:34:40 +0200 Subject: [PATCH 13/14] Adds harvesting error handling tests and update changelog --- CHANGELOG.md | 43 +++- .../harvesting/error_cases/empty_response.xml | 8 + .../error_cases/invalid_xml_structure.xml | 7 + .../harvesting/error_cases/malformed_xml.xml | 19 ++ .../error_cases/missing_metadata.xml | 36 +++ tests/test_harvesting.py | 232 +++++++++++++++++- 6 files changed, 338 insertions(+), 7 deletions(-) create mode 100644 tests/harvesting/error_cases/empty_response.xml create mode 100644 tests/harvesting/error_cases/invalid_xml_structure.xml create mode 100644 tests/harvesting/error_cases/malformed_xml.xml create mode 100644 tests/harvesting/error_cases/missing_metadata.xml diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b1b1d7..2eed28a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,27 +4,58 @@ ### Added -- ... +- Django management command `harvest_journals` for harvesting real OAI-PMH journal sources + - Support for ESSD, AGILE-GISS, and GEO-LEO journals + - Command-line options for journal selection, record limits, and source creation + - Detailed progress reporting with colored output + - Statistics for spatial/temporal metadata extraction +- Integration tests for real journal harvesting (`tests/test_real_harvesting.py`) + - 6 tests covering ESSD, AGILE-GISS, GEO-LEO, and EssOAr + - Tests skipped by default (use `SKIP_REAL_HARVESTING=0` to enable) + - Max records parameter to limit harvesting for testing +- Comprehensive error handling tests for OAI-PMH harvesting (`HarvestingErrorTests`) + - 10 test cases covering malformed XML, missing metadata, HTTP errors, network timeouts + - Test fixtures for various error conditions in `tests/harvesting/error_cases/` + - Verification of graceful error handling and logging +- pytest configuration with custom markers (`pytest.ini`) + - `real_harvesting` marker for integration tests + - Configuration for Django test discovery ### Changed -- ... 
+- Fixed OAI-PMH harvesting test failures by updating response format parameters + - Changed from invalid 'structured'/'raw' to valid 'geojson'/'wkt'/'wkb' formats + - Updated test assertions to expect GeoJSON FeatureCollection +- Fixed syntax errors in `publications/tasks.py` + - Fixed import statement typo + - Fixed indentation in `extract_timeperiod_from_html` function + - Fixed misplaced return statement in `regenerate_geopackage_cache` function +- Fixed test setup method in `tests/test_harvesting.py` + - Removed incorrect `@classmethod` decorator from `setUp` method +- Fixed `test_regular_harvesting.py` to include `max_records` parameter in mock function +- Updated README.md with comprehensive documentation for: + - Integration test execution + - `harvest_journals` management command usage + - Journal harvesting workflows ### Fixed -- ... +- Docker build for geoextent installation (added git dependency to Dockerfile) +- 18 geoextent API test failures due to invalid response format values +- 8 test setup errors in OAI-PMH harvesting tests +- Test harvesting function signature mismatch ### Deprecated -- ... +- None. ### Removed -- ... +- None. ### Security -- ... +- None. ## [0.2.0] - 2025-10-09 diff --git a/tests/harvesting/error_cases/empty_response.xml b/tests/harvesting/error_cases/empty_response.xml new file mode 100644 index 0000000..84f2009 --- /dev/null +++ b/tests/harvesting/error_cases/empty_response.xml @@ -0,0 +1,8 @@ + + + 2022-07-04T15:37:56Z + http://localhost:8330/index.php/opti-geo/oai + + + + diff --git a/tests/harvesting/error_cases/invalid_xml_structure.xml b/tests/harvesting/error_cases/invalid_xml_structure.xml new file mode 100644 index 0000000..839cfe4 --- /dev/null +++ b/tests/harvesting/error_cases/invalid_xml_structure.xml @@ -0,0 +1,7 @@ + + + This is not a valid OAI-PMH response + + Invalid structure + + diff --git a/tests/harvesting/error_cases/malformed_xml.xml b/tests/harvesting/error_cases/malformed_xml.xml new file mode 100644 index 0000000..f01aca8 --- /dev/null +++ b/tests/harvesting/error_cases/malformed_xml.xml @@ -0,0 +1,19 @@ + + + 2022-07-04T15:37:56Z + http://localhost:8330/index.php/opti-geo/oai + + +
+ oai:ojs2.localhost:8330:article/1 + 2022-07-01T12:59:33Z +
+ + + Malformed Record + + + + +
+ diff --git a/tests/harvesting/error_cases/missing_metadata.xml b/tests/harvesting/error_cases/missing_metadata.xml new file mode 100644 index 0000000..30c9150 --- /dev/null +++ b/tests/harvesting/error_cases/missing_metadata.xml @@ -0,0 +1,36 @@ + + + 2022-07-04T15:37:56Z + http://localhost:8330/index.php/opti-geo/oai + + +
+ oai:ojs2.localhost:8330:article/1 + 2022-07-01T12:59:33Z +
+ + + + http://example.com/article/1 + A publication with no title + 2022-07-01 + + +
+ +
+ oai:ojs2.localhost:8330:article/2 + 2022-07-01T12:59:33Z +
+ + + Record with minimal metadata + http://example.com/article/2 + + + +
+
+
diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index 03bdb76..8846c53 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -10,7 +10,7 @@ django.setup() from publications.models import Publication, Source, HarvestingEvent, Schedule -from publications.tasks import parse_oai_xml_and_save_publications +from publications.tasks import parse_oai_xml_and_save_publications, harvest_oai_endpoint from django.contrib.auth import get_user_model User = get_user_model() @@ -310,3 +310,233 @@ def test_real_journal_harvesting_agile_giss(self): # Skip test if AGILE doesn't have OAI-PMH endpoint self.skipTest(f"AGILE-GISS endpoint not available: {e}") + +class HarvestingErrorTests(TestCase): + """ + Test cases for error handling during harvesting. + + These tests verify that the harvesting system properly handles: + - Malformed XML + - Empty responses + - Missing required metadata + - Invalid XML structure + - Network/HTTP errors + """ + + def setUp(self): + """Set up test sources and events.""" + Publication.objects.all().delete() + self.source = Source.objects.create( + url_field="http://example.com/oai", + harvest_interval_minutes=60, + name="Error Test Source" + ) + + def test_malformed_xml(self): + """Test that malformed XML is handled gracefully.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + malformed_xml_path = BASE_TEST_DIR / 'harvesting' / 'error_cases' / 'malformed_xml.xml' + xml_bytes = malformed_xml_path.read_bytes() + + # Should not raise exception, but should log error + parse_oai_xml_and_save_publications(xml_bytes, event) + + # No publications should be created from malformed XML + pub_count = Publication.objects.filter(job=event).count() + self.assertEqual(pub_count, 0, "Malformed XML should not create publications") + + def test_empty_response(self): + """Test that empty OAI-PMH response (no records) is handled.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + empty_xml_path = BASE_TEST_DIR / 'harvesting' / 'error_cases' / 'empty_response.xml' + xml_bytes = empty_xml_path.read_bytes() + + # Should not raise exception + parse_oai_xml_and_save_publications(xml_bytes, event) + + # No publications should be created from empty response + pub_count = Publication.objects.filter(job=event).count() + self.assertEqual(pub_count, 0, "Empty response should create zero publications") + + def test_invalid_xml_structure(self): + """Test that non-OAI-PMH XML structure is handled.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + invalid_xml_path = BASE_TEST_DIR / 'harvesting' / 'error_cases' / 'invalid_xml_structure.xml' + xml_bytes = invalid_xml_path.read_bytes() + + # Should not raise exception + parse_oai_xml_and_save_publications(xml_bytes, event) + + # No publications should be created from invalid structure + pub_count = Publication.objects.filter(job=event).count() + self.assertEqual(pub_count, 0, "Invalid XML structure should create zero publications") + + def test_missing_required_metadata(self): + """Test that records with missing required fields are handled.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + missing_metadata_path = BASE_TEST_DIR / 'harvesting' / 'error_cases' / 'missing_metadata.xml' + xml_bytes = missing_metadata_path.read_bytes() + + # Should not raise exception - may create some publications + parse_oai_xml_and_save_publications(xml_bytes, event) 
+ + # Check what was created + pubs = Publication.objects.filter(job=event) + + # At least one record (the one with title) should be created + self.assertGreaterEqual(pubs.count(), 1, "Should create publications even with minimal metadata") + + # Check that publications were created despite missing fields + for pub in pubs: + # Title might be None for some records + if pub.title: + self.assertIsInstance(pub.title, str) + + def test_empty_content(self): + """Test that empty/None content is handled.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + # Test with empty bytes + parse_oai_xml_and_save_publications(b"", event) + pub_count = Publication.objects.filter(job=event).count() + self.assertEqual(pub_count, 0, "Empty content should create zero publications") + + # Test with whitespace only + parse_oai_xml_and_save_publications(b" \n\t ", event) + pub_count = Publication.objects.filter(job=event).count() + self.assertEqual(pub_count, 0, "Whitespace-only content should create zero publications") + + @responses.activate + def test_http_404_error(self): + """Test that HTTP 404 errors are handled properly.""" + # Mock a 404 response + responses.add( + responses.GET, + 'http://example.com/oai-404', + status=404, + body='Not Found' + ) + + source = Source.objects.create( + url_field="http://example.com/oai-404", + harvest_interval_minutes=60 + ) + + # harvest_oai_endpoint should handle the error + harvest_oai_endpoint(source.id) + + # Check that event was marked as failed + event = HarvestingEvent.objects.filter(source=source).latest('started_at') + self.assertEqual(event.status, 'failed', "Event should be marked as failed for 404 error") + + @responses.activate + def test_http_500_error(self): + """Test that HTTP 500 errors are handled properly.""" + # Mock a 500 response + responses.add( + responses.GET, + 'http://example.com/oai-500', + status=500, + body='Internal Server Error' + ) + + source = Source.objects.create( + url_field="http://example.com/oai-500", + harvest_interval_minutes=60 + ) + + # harvest_oai_endpoint should handle the error + harvest_oai_endpoint(source.id) + + # Check that event was marked as failed + event = HarvestingEvent.objects.filter(source=source).latest('started_at') + self.assertEqual(event.status, 'failed', "Event should be marked as failed for 500 error") + + @responses.activate + def test_network_timeout(self): + """Test that network timeouts are handled properly.""" + from requests.exceptions import Timeout + + # Mock a timeout + responses.add( + responses.GET, + 'http://example.com/oai-timeout', + body=Timeout('Connection timeout') + ) + + source = Source.objects.create( + url_field="http://example.com/oai-timeout", + harvest_interval_minutes=60 + ) + + # harvest_oai_endpoint should handle the timeout + harvest_oai_endpoint(source.id) + + # Check that event was marked as failed + event = HarvestingEvent.objects.filter(source=source).latest('started_at') + self.assertEqual(event.status, 'failed', "Event should be marked as failed for timeout") + + @responses.activate + def test_invalid_xml_in_http_response(self): + """Test that invalid XML in HTTP response is handled.""" + # Mock response with invalid XML + responses.add( + responses.GET, + 'http://example.com/oai-invalid', + status=200, + body='This is not XML at all', + content_type='text/xml' + ) + + source = Source.objects.create( + url_field="http://example.com/oai-invalid", + harvest_interval_minutes=60 + ) + + # Should complete but create no publications + 
harvest_oai_endpoint(source.id) + + event = HarvestingEvent.objects.filter(source=source).latest('started_at') + # Should complete (not fail) but create no publications + self.assertEqual(event.status, 'completed', "Event should complete even with invalid XML") + + pub_count = Publication.objects.filter(job=event).count() + self.assertEqual(pub_count, 0, "Invalid XML should create zero publications") + + def test_max_records_limit_with_errors(self): + """Test that max_records works even when some records cause errors.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + # Use the missing metadata file which has 2 records, one problematic + missing_metadata_path = BASE_TEST_DIR / 'harvesting' / 'error_cases' / 'missing_metadata.xml' + xml_bytes = missing_metadata_path.read_bytes() + + # Limit to 1 record + parse_oai_xml_and_save_publications(xml_bytes, event, max_records=1) + + # Should process only 1 record + pub_count = Publication.objects.filter(job=event).count() + self.assertLessEqual(pub_count, 1, "Should respect max_records limit even with errors") + From 8705efa62dfaf3c0bb400035acb192116e6d3f7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20N=C3=BCst?= Date: Fri, 10 Oct 2025 11:20:39 +0200 Subject: [PATCH 14/14] Adds RSS/Atom feed harvesting support --- .claude/settings.local.json | 4 +- CHANGELOG.md | 22 +- README.md | 9 +- .../management/commands/harvest_journals.py | 33 ++- publications/tasks.py | 215 +++++++++++++++++- requirements.txt | 1 + tests/harvesting/rss_feed_sample.xml | 46 ++++ tests/test_harvesting.py | 182 ++++++++++++++- 8 files changed, 492 insertions(+), 20 deletions(-) create mode 100644 tests/harvesting/rss_feed_sample.xml diff --git a/.claude/settings.local.json b/.claude/settings.local.json index 01c3d81..3ef2ea7 100644 --- a/.claude/settings.local.json +++ b/.claude/settings.local.json @@ -5,7 +5,9 @@ "Bash(git checkout:*)", "Bash(pip install:*)", "Bash(gh issue view:*)", - "Bash(pytest:*)" + "Bash(pytest:*)", + "Bash(pip search:*)", + "Bash(psql:*)" ], "deny": [], "ask": [] diff --git a/CHANGELOG.md b/CHANGELOG.md index 2eed28a..20b4bd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,8 +4,26 @@ ### Added -- Django management command `harvest_journals` for harvesting real OAI-PMH journal sources - - Support for ESSD, AGILE-GISS, and GEO-LEO journals +- **RSS/Atom feed harvesting support** (`publications/tasks.py`) + - `parse_rss_feed_and_save_publications()` function for parsing RSS/Atom feeds + - `harvest_rss_endpoint()` function for complete RSS harvesting workflow + - Support for RDF-based RSS feeds (Scientific Data journal) + - DOI extraction from multiple feed fields (prism:doi, dc:identifier) + - Duplicate detection by DOI and URL + - Abstract/description extraction from feed content +- feedparser library integration (v6.0.12) + - Added to requirements.txt for RSS/Atom feed parsing + - Supports RSS 1.0/2.0, Atom, and RDF feeds +- Django management command `harvest_journals` enhanced for RSS/Atom feeds + - Added Scientific Data journal with RSS feed support + - Support for both OAI-PMH and RSS/Atom feed types + - Automatic feed type detection based on journal configuration + - Now supports 4 journals: ESSD, AGILE-GISS, GEO-LEO (OAI-PMH), Scientific Data (RSS) +- Comprehensive RSS harvesting tests (`RSSFeedHarvestingTests`) + - 7 test cases covering RSS parsing, duplicate detection, error handling + - Test fixture with sample RDF/RSS feed (`tests/harvesting/rss_feed_sample.xml`) + - Tests for max_records 
limit, invalid feeds, and HTTP errors +- Django management command `harvest_journals` for harvesting real journal sources - Command-line options for journal selection, record limits, and source creation - Detailed progress reporting with colored output - Statistics for spatial/temporal metadata extraction diff --git a/README.md b/README.md index 797af58..5c70355 100644 --- a/README.md +++ b/README.md @@ -282,9 +282,12 @@ python manage.py harvest_journals --all --user-email admin@optimap.science **Currently configured journals**: -- `essd` - Earth System Science Data ([Issue #59](https://github.com/GeoinformationSystems/optimap/issues/59)) -- `agile-giss` - AGILE-GISS conference series ([Issue #60](https://github.com/GeoinformationSystems/optimap/issues/60)) -- `geo-leo` - GEO-LEO e-docs repository ([Issue #13](https://github.com/GeoinformationSystems/optimap/issues/13)) +- `essd` - Earth System Science Data (OAI-PMH) ([Issue #59](https://github.com/GeoinformationSystems/optimap/issues/59)) +- `agile-giss` - AGILE-GISS conference series (OAI-PMH) ([Issue #60](https://github.com/GeoinformationSystems/optimap/issues/60)) +- `geo-leo` - GEO-LEO e-docs repository (OAI-PMH) ([Issue #13](https://github.com/GeoinformationSystems/optimap/issues/13)) +- `scientific-data` - Scientific Data (RSS/Atom) ([Issue #58](https://github.com/GeoinformationSystems/optimap/issues/58)) + +The command supports both OAI-PMH and RSS/Atom feeds, automatically detecting the feed type for each journal. The command provides detailed progress reporting including: diff --git a/publications/management/commands/harvest_journals.py b/publications/management/commands/harvest_journals.py index 12f277d..e36628a 100644 --- a/publications/management/commands/harvest_journals.py +++ b/publications/management/commands/harvest_journals.py @@ -17,12 +17,12 @@ from django.contrib.auth import get_user_model from django.utils import timezone from publications.models import Source, HarvestingEvent, Publication -from publications.tasks import harvest_oai_endpoint +from publications.tasks import harvest_oai_endpoint, harvest_rss_endpoint logger = logging.getLogger(__name__) User = get_user_model() -# Journal configurations with OAI-PMH endpoints +# Journal configurations with OAI-PMH and RSS/Atom endpoints JOURNAL_CONFIGS = { 'essd': { 'name': 'Earth System Science Data', @@ -30,8 +30,7 @@ 'collection_name': 'ESSD', 'homepage_url': 'https://essd.copernicus.org/', 'publisher_name': 'Copernicus Publications', - 'is_oa': True, - 'issue': 59, + 'feed_type': 'oai-pmh', }, 'agile-giss': { 'name': 'AGILE-GISS', @@ -39,8 +38,7 @@ 'collection_name': 'AGILE-GISS', 'homepage_url': 'https://www.agile-giscience-series.net/', 'publisher_name': 'Copernicus Publications', - 'is_oa': True, - 'issue': 60, + 'feed_type': 'oai-pmh', }, 'geo-leo': { 'name': 'GEO-LEO e-docs', @@ -48,8 +46,15 @@ 'collection_name': 'GEO-LEO', 'homepage_url': 'https://e-docs.geo-leo.de/', 'publisher_name': 'GEO-LEO', - 'is_oa': True, - 'issue': 13, + 'feed_type': 'oai-pmh', + }, + 'scientific-data': { + 'name': 'Scientific Data', + 'url': 'https://www.nature.com/sdata.rss', + 'collection_name': 'Scientific Data', + 'homepage_url': 'https://www.nature.com/sdata/', + 'publisher_name': 'Nature Publishing Group', + 'feed_type': 'rss', }, } @@ -138,7 +143,6 @@ def handle(self, *args, **options): config = JOURNAL_CONFIGS[journal_key] self.stdout.write(self.style.WARNING(f'\n--- Harvesting: {config["name"]} ---')) - self.stdout.write(f'Issue: 
https://github.com/GeoinformationSystems/optimap/issues/{config["issue"]}') self.stdout.write(f'URL: {config["url"]}') if max_records: self.stdout.write(f'Max records: {max_records}') @@ -147,9 +151,16 @@ def handle(self, *args, **options): # Find or create source source = self._get_or_create_source(config, create_sources) - # Harvest + # Harvest based on feed type harvest_start = timezone.now() - harvest_oai_endpoint(source.id, user=user, max_records=max_records) + feed_type = config.get('feed_type', 'oai-pmh') + + if feed_type == 'rss': + self.stdout.write(f'Feed type: RSS/Atom') + harvest_rss_endpoint(source.id, user=user, max_records=max_records) + else: + self.stdout.write(f'Feed type: OAI-PMH') + harvest_oai_endpoint(source.id, user=user, max_records=max_records) # Get results event = HarvestingEvent.objects.filter(source=source).latest('started_at') diff --git a/publications/tasks.py b/publications/tasks.py index e108153..add923e 100644 --- a/publications/tasks.py +++ b/publications/tasks.py @@ -157,11 +157,15 @@ def parse_oai_xml_and_save_publications(content, event: HarvestingEvent, max_rec processed_count = 0 saved_count = 0 + # Calculate progress reporting interval (every 10% of records) + total_records = len(records) if hasattr(records, '__len__') else None + log_interval = max(1, total_records // 10) if total_records else 10 + for rec in records: try: processed_count += 1 - if processed_count % 10 == 0: - logger.debug("Processing record %d of %d", processed_count, len(records) if hasattr(records, '__len__') else '?') + if processed_count % log_interval == 0: + logger.debug("Processing record %d of %d", processed_count, total_records if total_records else '?') if hasattr(rec, "metadata"): identifiers = rec.metadata.get("identifier", []) + rec.metadata.get("relation", []) @@ -583,3 +587,210 @@ def regenerate_geopackage_cache(): gpkg_path = convert_geojson_to_geopackage(geojson_path) cleanup_old_data_dumps(cache_dir, settings.DATA_DUMP_RETENTION) return gpkg_path + + +# ============================================================================ +# RSS/Atom Feed Harvesting +# ============================================================================ + +def parse_rss_feed_and_save_publications(feed_url, event: 'HarvestingEvent', max_records=None): + """ + Parse RSS/Atom feed and save publications. 
+ + Args: + feed_url: URL of the RSS/Atom feed + event: HarvestingEvent instance + max_records: Maximum number of records to process (optional) + + Returns: + tuple: (processed_count, saved_count) + """ + import feedparser + + source = event.source + logger.info("Starting RSS/Atom feed parsing for source: %s", source.name) + + try: + # Parse the feed + feed = feedparser.parse(feed_url) + + if not feed or not hasattr(feed, 'entries'): + logger.error("Failed to parse RSS feed: %s", feed_url) + return 0, 0 + + entries = feed.entries + logger.info("Found %d entries in RSS feed", len(entries)) + + if not entries: + logger.warning("No entries found in RSS feed!") + return 0, 0 + + # Limit records if specified + if max_records: + entries = entries[:max_records] + logger.info("Limited to first %d records", max_records) + + processed_count = 0 + saved_count = 0 + + # Calculate progress reporting interval (every 10% of entries) + total_entries = len(entries) + log_interval = max(1, total_entries // 10) + + for entry in entries: + try: + processed_count += 1 + if processed_count % log_interval == 0: + logger.debug("Processing entry %d of %d", processed_count, total_entries) + + # Extract metadata from feed entry + title = entry.get('title', '').strip() + link = entry.get('link', entry.get('id', '')).strip() + + # Extract DOI - try multiple fields + doi = None + if 'prism_doi' in entry: + doi = entry.prism_doi.strip() + elif 'dc_identifier' in entry and 'doi' in entry.dc_identifier.lower(): + doi_match = DOI_REGEX.search(entry.dc_identifier) + if doi_match: + doi = doi_match.group(0) + + # Extract date + published_date = None + date_str = entry.get('updated', entry.get('published', entry.get('dc_date'))) + if date_str: + if hasattr(date_str, 'strftime'): + # It's already a datetime + published_date = date_str.strftime('%Y-%m-%d') + else: + # Parse date string + published_date = parse_publication_date(str(date_str)) + + # Extract abstract/description + abstract = '' + if 'summary' in entry: + abstract = BeautifulSoup(entry.summary, 'html.parser').get_text() + elif 'content' in entry and entry.content: + abstract = BeautifulSoup(entry.content[0].get('value', ''), 'html.parser').get_text() + + # Skip if no title + if not title: + logger.warning("Skipping entry with no title: %s", link) + continue + + # Skip if no URL/identifier + if not link: + logger.warning("Skipping entry '%s' with no URL", title[:50]) + continue + + logger.debug("Processing publication: %s", title[:50]) + + # Check for duplicates by DOI or URL + existing_pub = None + if doi: + existing_pub = Publication.objects.filter(doi=doi).first() + if not existing_pub and link: + existing_pub = Publication.objects.filter(url=link).first() + + if existing_pub: + logger.debug("Publication already exists: %s", title[:50]) + continue + + # Create publication + pub = Publication( + title=title, + doi=doi, + url=link, + abstract=abstract[:5000] if abstract else None, # Limit abstract length + publicationDate=published_date, + source=source, + job=event, + timeperiod_startdate=[], + timeperiod_enddate=[], + geometry=GeometryCollection(), # No spatial data from RSS typically + ) + + pub.save() + saved_count += 1 + logger.debug("Saved publication: %s", title[:50]) + + except Exception as e: + logger.error("Failed to process entry '%s': %s", + entry.get('title', 'Unknown')[:50], str(e)) + continue + + logger.info("RSS feed parsing completed for source %s: processed %d entries, saved %d publications", + source.name, processed_count, saved_count) + return 
processed_count, saved_count + + except Exception as e: + logger.error("Failed to parse RSS feed %s: %s", feed_url, str(e)) + return 0, 0 + + +def harvest_rss_endpoint(source_id, user=None, max_records=None): + """ + Harvest publications from an RSS/Atom feed. + + Args: + source_id: ID of the Source model instance + user: User who initiated the harvest (optional) + max_records: Maximum number of records to harvest (optional) + """ + from publications.models import Source, HarvestingEvent, Publication + + source = Source.objects.get(id=source_id) + event = HarvestingEvent.objects.create(source=source, status="in_progress") + + try: + feed_url = source.url_field + logger.info("Fetching from RSS feed: %s", feed_url) + + processed, saved = parse_rss_feed_and_save_publications(feed_url, event, max_records=max_records) + + event.status = "completed" + event.completed_at = timezone.now() + event.save() + + new_count = Publication.objects.filter(job=event).count() + spatial_count = Publication.objects.filter(job=event).exclude(geometry__isnull=True).count() + temporal_count = Publication.objects.filter(job=event).exclude(timeperiod_startdate=[]).count() + + subject = f"RSS Feed Harvesting Completed for {source.name}" + completed_str = event.completed_at.strftime('%Y-%m-%d %H:%M:%S') + message = ( + f"RSS/Atom feed harvesting job details:\n\n" + f"Number of added articles: {new_count}\n" + f"Number of articles with spatial metadata: {spatial_count}\n" + f"Number of articles with temporal metadata: {temporal_count}\n" + f"Source: {source.name}\n" + f"Feed URL: {source.url_field}\n" + f"Job started at: {event.started_at.strftime('%Y-%m-%d %H:%M:%S')}\n" + f"Job completed at: {completed_str}\n" + ) + + if user and user.email: + send_mail( + subject, + message, + settings.EMAIL_HOST_USER, + [user.email], + fail_silently=False, + ) + + except Exception as e: + logger.error("RSS feed harvesting failed for source %s: %s", source.url_field, str(e)) + event.status = "failed" + event.completed_at = timezone.now() + event.save() + + # Send failure notification + if user and user.email: + send_mail( + f"RSS Feed Harvesting Failed for {source.name}", + f"RSS feed harvesting failed for {source.name}\n\nError: {str(e)}\n\nFeed URL: {source.url_field}", + settings.EMAIL_HOST_USER, + [user.email], + fail_silently=True, + ) diff --git a/requirements.txt b/requirements.txt index e4543e1..1cb4d2c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -40,3 +40,4 @@ pyalex>=0.4.0 python-stdnum>=2.0.0 geopy>=2.4.1 oaipmh-scythe==0.13.0 +feedparser==6.0.12 diff --git a/tests/harvesting/rss_feed_sample.xml b/tests/harvesting/rss_feed_sample.xml new file mode 100644 index 0000000..d29eb07 --- /dev/null +++ b/tests/harvesting/rss_feed_sample.xml @@ -0,0 +1,46 @@ + + + + Test RSS Feed + Test RSS feed for OPTIMAP harvesting + http://feeds.example.com/test/rss + Test Publisher + en + + + + <![CDATA[Test Article One: Data Repository]]> + https://www.example.com/articles/test-article-1 + + Test Journal, Published online: 01 October 2025; doi:10.1234/test-001

Test Article One: Data Repository]]> +
+ + John DoeJane Smith + doi:10.1234/test-001 + Test Journal, Published online: 2025-10-01; | doi:10.1234/test-001 + 2025-10-01 + Test Journal + 10.1234/test-001 + https://www.example.com/articles/test-article-1 +
+ + + <![CDATA[Test Article Two: Analysis Methods]]> + https://www.example.com/articles/test-article-2 + + Test Journal, Published online: 02 October 2025; doi:10.1234/test-002

Test Article Two: Analysis Methods]]> +
+ + Alice BrownBob Wilson + doi:10.1234/test-002 + Test Journal, Published online: 2025-10-02; | doi:10.1234/test-002 + 2025-10-02 + Test Journal + 10.1234/test-002 + https://www.example.com/articles/test-article-2 +
+
diff --git a/tests/test_harvesting.py b/tests/test_harvesting.py index 8846c53..7beb68f 100644 --- a/tests/test_harvesting.py +++ b/tests/test_harvesting.py @@ -10,7 +10,12 @@ django.setup() from publications.models import Publication, Source, HarvestingEvent, Schedule -from publications.tasks import parse_oai_xml_and_save_publications, harvest_oai_endpoint +from publications.tasks import ( + parse_oai_xml_and_save_publications, + harvest_oai_endpoint, + parse_rss_feed_and_save_publications, + harvest_rss_endpoint +) from django.contrib.auth import get_user_model User = get_user_model() @@ -540,3 +545,178 @@ def test_max_records_limit_with_errors(self): pub_count = Publication.objects.filter(job=event).count() self.assertLessEqual(pub_count, 1, "Should respect max_records limit even with errors") + +class RSSFeedHarvestingTests(TestCase): + """ + Test cases for RSS/Atom feed harvesting. + + These tests verify that the RSS harvesting system properly handles: + - RDF/RSS feed parsing + - Publication extraction from feed entries + - Duplicate detection + - DOI and metadata extraction + """ + + def setUp(self): + """Set up test source for RSS feeds.""" + Publication.objects.all().delete() + self.source = Source.objects.create( + url_field="https://www.example.com/feed.rss", + harvest_interval_minutes=60, + name="Test RSS Source" + ) + + def test_parse_rss_feed_from_file(self): + """Test parsing RSS feed from local file.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + rss_feed_path = BASE_TEST_DIR / 'harvesting' / 'rss_feed_sample.xml' + feed_url = f"file://{rss_feed_path}" + + processed, saved = parse_rss_feed_and_save_publications(feed_url, event) + + # Check counts + self.assertEqual(processed, 2, "Should process 2 entries") + self.assertEqual(saved, 2, "Should save 2 publications") + + # Check created publications + pubs = Publication.objects.filter(job=event) + self.assertEqual(pubs.count(), 2) + + # Check first publication + pub1 = pubs.filter(doi='10.1234/test-001').first() + self.assertIsNotNone(pub1) + self.assertEqual(pub1.title, 'Test Article One: Data Repository') + self.assertEqual(pub1.url, 'https://www.example.com/articles/test-article-1') + self.assertEqual(str(pub1.publicationDate), '2025-10-01') + + # Check second publication + pub2 = pubs.filter(doi='10.1234/test-002').first() + self.assertIsNotNone(pub2) + self.assertEqual(pub2.title, 'Test Article Two: Analysis Methods') + + def test_rss_duplicate_detection_by_doi(self): + """Test that duplicate detection works by DOI.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + # Create existing publication with same DOI + Publication.objects.create( + title="Existing Publication", + doi="10.1234/test-001", + source=self.source, + timeperiod_startdate=[], + timeperiod_enddate=[] + ) + + rss_feed_path = BASE_TEST_DIR / 'harvesting' / 'rss_feed_sample.xml' + feed_url = f"file://{rss_feed_path}" + + processed, saved = parse_rss_feed_and_save_publications(feed_url, event) + + # Should process both but only save one (the one without duplicate DOI) + self.assertEqual(processed, 2) + self.assertEqual(saved, 1, "Should only save publication without duplicate DOI") + + def test_rss_duplicate_detection_by_url(self): + """Test that duplicate detection works by URL.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + # Create existing publication with same URL + Publication.objects.create( + 
title="Existing Publication", + url="https://www.example.com/articles/test-article-1", + source=self.source, + timeperiod_startdate=[], + timeperiod_enddate=[] + ) + + rss_feed_path = BASE_TEST_DIR / 'harvesting' / 'rss_feed_sample.xml' + feed_url = f"file://{rss_feed_path}" + + processed, saved = parse_rss_feed_and_save_publications(feed_url, event) + + # Should process both but only save one + self.assertEqual(processed, 2) + self.assertEqual(saved, 1, "Should only save publication without duplicate URL") + + def test_rss_max_records_limit(self): + """Test that max_records parameter limits RSS harvesting.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + rss_feed_path = BASE_TEST_DIR / 'harvesting' / 'rss_feed_sample.xml' + feed_url = f"file://{rss_feed_path}" + + # Limit to 1 record + processed, saved = parse_rss_feed_and_save_publications(feed_url, event, max_records=1) + + self.assertEqual(processed, 1, "Should only process 1 entry") + self.assertEqual(saved, 1, "Should only save 1 publication") + + pubs = Publication.objects.filter(job=event) + self.assertEqual(pubs.count(), 1) + + def test_harvest_rss_endpoint_from_file(self): + """Test complete RSS harvesting workflow from file.""" + rss_feed_path = BASE_TEST_DIR / 'harvesting' / 'rss_feed_sample.xml' + + # Update source to use file:// URL + self.source.url_field = f"file://{rss_feed_path}" + self.source.save() + + # Harvest + harvest_rss_endpoint(self.source.id, max_records=10) + + # Check event status + event = HarvestingEvent.objects.filter(source=self.source).latest('started_at') + self.assertEqual(event.status, 'completed') + + # Check publications + pubs = Publication.objects.filter(job=event) + self.assertEqual(pubs.count(), 2, "Should create 2 publications from RSS feed") + + def test_harvest_rss_endpoint_invalid_file(self): + """Test RSS harvesting handles invalid file paths.""" + # Update source to use non-existent file + self.source.url_field = "file:///tmp/nonexistent_rss_feed.xml" + self.source.save() + + # Harvest should handle error gracefully + harvest_rss_endpoint(self.source.id) + + # Check event was marked as completed (feedparser returns empty feed for invalid URLs) + event = HarvestingEvent.objects.filter(source=self.source).latest('started_at') + # Event completes but creates no publications + self.assertEqual(event.status, 'completed') + + # No publications should be created + pubs = Publication.objects.filter(job=event) + self.assertEqual(pubs.count(), 0) + + def test_rss_invalid_feed_url(self): + """Test handling of invalid RSS feed URL.""" + event = HarvestingEvent.objects.create( + source=self.source, + status="in_progress" + ) + + # Try to parse non-existent file + feed_url = "file:///tmp/nonexistent_feed.xml" + + processed, saved = parse_rss_feed_and_save_publications(feed_url, event) + + # Should handle gracefully and return zero + self.assertEqual(processed, 0) + self.assertEqual(saved, 0) +