Skip to content

Commit c1ce61b

Browse files
committed
feat: add nasa earthdata api and processor
1 parent 2ab1a6d commit c1ce61b

File tree

9 files changed

+649
-17
lines changed

9 files changed

+649
-17
lines changed

main.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ def main():
8989
# 步驟:
9090
# 1. 前往src.config.settings中更改輸出路徑(硬碟路徑)
9191
# 2. 設定參數
92-
start, end = '2025-02-20', '2025-03-06'
93-
file_class: ClassInput = 'OFFL'
92+
start, end = '2025-03-01', '2025-03-13'
93+
file_class: ClassInput = 'NRTI'
9494
file_type: TypeInput = 'NO2___'
9595

9696
# 3. 設定輸入輸出配置

main_earthdata.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
"""主程式"""
2+
import logging
3+
import asyncio
4+
from datetime import datetime
5+
from pathlib import Path
6+
7+
from src.api.earthdata_api import EARTHDATAFetcher
8+
from src.processing.modis_processor import MODISProcessor
9+
10+
from src.config.richer import rich_print
11+
from src.config.catalog import ClassInput, TypeInput, PRODUCT_CONFIGS
12+
from src.config.setup import setup, setup_nasa
13+
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
async def fetch_data(file_type,
19+
start_date: str | datetime,
20+
end_date: str | datetime):
21+
"""下載數據的工作流程"""
22+
try:
23+
rich_print(
24+
f"Fetching Earthdata (MODIS) products from {start_date} to {end_date} ...")
25+
26+
fetcher = EARTHDATAFetcher()
27+
28+
products = await fetcher.fetch_data(
29+
file_type=file_type,
30+
start_date=start_date,
31+
end_date=end_date,
32+
)
33+
34+
if products:
35+
rich_print(f"Start download Earthdata (MODIS) products from {start_date} to {end_date} ...")
36+
fetcher.download(products)
37+
rich_print("Data download completed!")
38+
return True
39+
else:
40+
rich_print("No data matching the criteria was found")
41+
42+
except Exception as e:
43+
error_message = f"Failed to download data: {str(e)}"
44+
rich_print(error_message)
45+
logger.error(error_message)
46+
47+
48+
def process_data(file_type,
49+
start_date: str | datetime,
50+
end_date: str | datetime):
51+
"""處理數據的工作流程"""
52+
try:
53+
rich_print(
54+
f"Processing Earthdata (MODIS) products from {start_date} to {end_date} ...")
55+
56+
processor = MODISProcessor(file_type)
57+
58+
# 處理所有文件
59+
processor.process_all_files(
60+
pattern=f"{file_type}/**/*.hdf", # 從組織好的文件結構中尋找文件
61+
start_date=start_date,
62+
end_date=end_date
63+
)
64+
65+
rich_print("Data processing completed")
66+
67+
except Exception as e:
68+
error_message = f"Failed to process data: {str(e)}"
69+
rich_print(error_message)
70+
logger.error(error_message)
71+
72+
73+
def main():
74+
# 步驟:
75+
# 1. 前往src.config.settings中更改輸出路徑(硬碟路徑)
76+
# 2. 設定參數
77+
start, end = '2025-03-01', '2025-03-12'
78+
file_type = "MYD04"
79+
80+
# 3. 設定輸入輸出配置
81+
setup_nasa(file_type=file_type, start_date=start, end_date=end)
82+
83+
# 4. 下載數據 (需要有.env 內含 EARTHDATA_USERNAME and EARTHDATA_PASSWORD 才能用)
84+
asyncio.run(fetch_data(file_type=file_type, start_date=start, end_date=end))
85+
86+
# 5. 處理與繪製數據
87+
process_data(file_type=file_type, start_date=start, end_date=end)
88+
89+
90+
if __name__ == "__main__":
91+
main()

requirements.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,6 @@ pillow~=10.4.0
2121
pytest~=8.3.4
2222
tkcalendar~=1.6.1
2323
certifi~=2024.8.30
24-
schedule~=1.2.2
24+
schedule~=1.2.2
25+
earthaccess~=0.14.0
26+
pyhdf~=0.11.6

run_pipeline.py

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,17 @@
66
import logging
77
import asyncio
88
import time
9-
from datetime import datetime
9+
from datetime import datetime, timedelta
1010
import schedule
1111

1212
from src.api.sentinel_api import S5PFetcher
1313
from src.processing.data_processor import S5Processor
1414
from src.config.catalog import ClassInput, TypeInput, PRODUCT_CONFIGS
15-
from src.config.setup import setup
15+
from src.config.setup import setup, setup_nasa
1616
from src.config.settings import FILTER_BOUNDARY, DATA_RETENTION_DAYS, LOGS_DIR, BASE_DIR
1717

18+
from main_earthdata import fetch_data, process_data
19+
1820
# 導入檔案保留管理器
1921
from file_retention_manager import FileRetentionManager
2022

@@ -134,26 +136,51 @@ async def daily_task():
134136

135137
# 設定參數 - 只處理當天的數據
136138
today = datetime.now().strftime('%Y-%m-%d')
139+
two_days_ago = (datetime.now() - timedelta(days=2)).strftime('%Y-%m-%d')
140+
seven_days_ago = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
141+
142+
# 開始執行 Sentinel-5P
137143
file_class: ClassInput = 'NRTI'
138144
file_type: list[TypeInput] = ['NO2___', 'HCHO__', 'CO____']
139145

140146
for file_tp in file_type:
141147
# 設定輸入輸出配置
142-
setup(file_type=file_tp, start_date=today, end_date=today)
148+
setup(file_type=file_tp, start_date=two_days_ago, end_date=today)
149+
150+
# 檢查並下載當天的數據
151+
has_data = await fetch_data_auto(file_class=file_class, file_type=file_tp, start_date=two_days_ago, end_date=today)
152+
153+
# 如果有數據,則處理並繪製
154+
if has_data:
155+
success = process_data_auto(file_class=file_class, file_type=file_tp, start_date=two_days_ago, end_date=today)
156+
if success:
157+
logger.info(f"每日衛星數據處理pipeline執行完成 - {today}")
158+
else:
159+
logger.error(f"處理數據失敗 - {today}")
160+
else:
161+
logger.info(f"今日({today})無可用的衛星數據")
162+
163+
# 開始執行 MODIS
164+
file_type: list[str] = ['MYD04', 'MOD04']
165+
166+
for file_tp in file_type:
167+
# 設定輸入輸出配置
168+
setup_nasa(file_type=file_tp, start_date=seven_days_ago, end_date=today)
143169

144170
# 檢查並下載當天的數據
145-
has_data = await fetch_data_auto(file_class=file_class, file_type=file_tp, start_date=today, end_date=today)
171+
has_data = await fetch_data(file_type=file_tp, start_date=seven_days_ago, end_date=today)
146172

147173
# 如果有數據,則處理並繪製
148174
if has_data:
149-
success = process_data_auto(file_class=file_class, file_type=file_tp, start_date=today, end_date=today)
175+
success = process_data(file_type=file_tp, start_date=seven_days_ago, end_date=today)
150176
if success:
151177
logger.info(f"每日衛星數據處理pipeline執行完成 - {today}")
152178
else:
153179
logger.error(f"處理數據失敗 - {today}")
154180
else:
155181
logger.info(f"今日({today})無可用的衛星數據")
156182

183+
157184
# 執行舊檔案清理任務
158185
# logger.info("執行舊檔案清理任務")
159186
# clean_old_files()

src/api/earthdata_api.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
import re
2+
import shutil
3+
import earthaccess
4+
5+
from datetime import datetime
6+
from pathlib import Path
7+
from dotenv import load_dotenv
8+
from src.config.richer import console, rich_print, DisplayManager
9+
from src.config.settings import MODIS_RAW_DATA_DIR
10+
11+
12+
# 加載環境變數
13+
load_dotenv()
14+
15+
# 地理範圍設定
16+
SEARCH_BOUNDARY = (119.0, 21.0, 123.0, 26.0) # 搜索數據的邊界 (west_lon, south_lat, east_lon, north_lat)
17+
18+
19+
class EARTHDATAFetcher:
20+
def __init__(self):
21+
# 使用 earthaccess 登入
22+
self.auth = earthaccess.login(strategy="environment")
23+
24+
# 配置下載數據
25+
self.download_dir = MODIS_RAW_DATA_DIR
26+
27+
async def fetch_data(self, file_type, start_date: datetime, end_date: datetime, boundary: tuple = SEARCH_BOUNDARY):
28+
self.file_type = file_type
29+
30+
products = earthaccess.search_data(
31+
short_name=f"{file_type}_L2",
32+
temporal=(f'{start_date}T00:00:00.000Z', f'{end_date}T23:59:59.999Z'),
33+
bounding_box=boundary,
34+
)
35+
36+
# 進一步過濾結果,排除 NRT 文件
37+
filtered_products = []
38+
for product in products:
39+
# 檢查是否有下載連結
40+
if not hasattr(product, 'data_links') or not callable(getattr(product, 'data_links')):
41+
continue
42+
43+
links = product.data_links()
44+
if not links:
45+
continue
46+
47+
# 檢查文件名是否包含 .NRT.hdf
48+
filename = Path(links[0]).name
49+
if '.NRT.hdf' not in filename:
50+
filtered_products.append(product)
51+
52+
# Display product info
53+
if filtered_products:
54+
DisplayManager().display_products_nasa(filtered_products)
55+
return filtered_products
56+
else:
57+
print("No valid products found (NRT files excluded)")
58+
return []
59+
60+
def download(self, products):
61+
if not products:
62+
print("沒有找到符合條件的數據")
63+
return []
64+
65+
# 用於跟踪下載的文件
66+
downloaded_files = []
67+
68+
# 檢查哪些文件需要下載
69+
for result in products:
70+
try:
71+
# 獲取文件名和下載鏈接
72+
if not result.data_links():
73+
continue
74+
75+
file_url = result.data_links()[0]
76+
filename = Path(file_url).name
77+
78+
# 跳過 NRT 文件
79+
if '.NRT.hdf' in filename:
80+
print(f"跳過 NRT 文件: {filename}")
81+
continue
82+
83+
# 從檔案名稱提取日期信息
84+
date_match = re.search(r'\.A(\d{7})\.', filename)
85+
if not date_match:
86+
print(f"無法從文件名提取日期: {filename}")
87+
continue
88+
89+
date_str = date_match.group(1)
90+
year = date_str[:4]
91+
day_of_year = int(date_str[4:7])
92+
93+
# 將日期轉換為年月
94+
file_date = datetime.strptime(f"{year}-{day_of_year}", "%Y-%j")
95+
year_month_dir = file_date.strftime("%Y/%m")
96+
97+
# 創建目標目錄
98+
target_dir = self.download_dir / self.file_type / year_month_dir
99+
target_dir.mkdir(parents=True, exist_ok=True)
100+
101+
# 檢查文件是否已存在
102+
target_file = target_dir / filename
103+
if target_file.exists():
104+
# print(f"檔案已存在: {target_file}")
105+
downloaded_files.append(str(target_file))
106+
continue
107+
108+
# 下載單個文件
109+
# print(f"下載文件: {filename} 到 {target_dir}")
110+
111+
# 使用 earthaccess 下載到臨時位置
112+
temp_files = earthaccess.download([result], self.download_dir / "temp")
113+
114+
# 如果下載成功,移動到目標位置
115+
if temp_files and len(temp_files) > 0:
116+
temp_file = Path(temp_files[0])
117+
if temp_file.exists():
118+
# 確保目標目錄存在
119+
target_dir.mkdir(parents=True, exist_ok=True)
120+
121+
# 移動文件到正確的目錄
122+
temp_file.rename(target_file)
123+
# print(f"成功下載: {target_file}")
124+
downloaded_files.append(str(target_file))
125+
else:
126+
print(f"下載失敗: {filename}")
127+
else:
128+
print(f"下載失敗: {filename}")
129+
130+
except Exception as e:
131+
print(f"處理文件時發生錯誤: {str(e)}")
132+
133+
# 刪除臨時目錄
134+
temp_dir = self.download_dir / "temp"
135+
if temp_dir.exists():
136+
try:
137+
shutil.rmtree(temp_dir)
138+
except Exception:
139+
pass
140+
141+
if not downloaded_files:
142+
print("所有檔案已經存在,無需下載")
143+
else:
144+
print(f"成功下載 {len(downloaded_files)} 個檔案")
145+
146+
return downloaded_files

0 commit comments

Comments
 (0)