|
| 1 | +import functools |
| 2 | +import json |
| 3 | +import logging |
| 4 | + |
| 5 | +import requests |
| 6 | +from datetime import date, datetime, timedelta |
| 7 | +from io import StringIO |
| 8 | +from typing import Callable |
| 9 | + |
| 10 | +import pandas as pd |
| 11 | + |
| 12 | +from skypro.common.cli_utils.cli_utils import get_user_ack_of_warning_or_exit, read_yaml_file |
| 13 | +from skypro.common.data.utility import prepare_data_dir |
| 14 | + |
| 15 | +from skypro.common.timeutils.month_str import get_first_and_last_date |
| 16 | +from skypro.commands.pull_elexon_imbalance.utils import daterange, with_retries |
| 17 | + |
| 18 | +ELEXON_API_MAX_RETRIES = 5 |
| 19 | +ELEXON_API_RETRY_DELAY = timedelta(seconds=1) |
| 20 | + |
| 21 | + |
| 22 | +def pull_elexon_imbalance(month_str: str, env_file_path: str): |
| 23 | + """ |
| 24 | + Pulls a months worth of half-hourly imbalance price and volume data from Elexon and saves it to disk. |
| 25 | + The data is saved in monthly CSV files in the directory defined by the MARKET_DATA_DIR in the environment configuration. |
| 26 | + """ |
| 27 | + |
| 28 | + start_date, end_date = get_first_and_last_date(month_str) |
| 29 | + |
| 30 | + today = datetime.now().date() |
| 31 | + if end_date > today: |
| 32 | + end_date = today |
| 33 | + get_user_ack_of_warning_or_exit(f"The month has not ended yet, so data will be incomplete after {end_date}") |
| 34 | + |
| 35 | + env_config = read_yaml_file(env_file_path) |
| 36 | + data_dir = env_config["vars"]["MARKET_DATA_DIR"] |
| 37 | + |
| 38 | + df = _fetch_multiple_days( |
| 39 | + start=start_date, |
| 40 | + end=end_date, |
| 41 | + fetch_func=_fetch_day, |
| 42 | + ) |
| 43 | + |
| 44 | + df["price"] = df["price"] / 10 # £/MW to p/kW |
| 45 | + |
| 46 | + prices_file_path = prepare_data_dir(data_dir, "elexon", "imbalance_price", start_date) |
| 47 | + volume_file_path = prepare_data_dir(data_dir, "elexon", "imbalance_volume", start_date) |
| 48 | + |
| 49 | + logging.info(f"Saving pricing data to '{prices_file_path}'") |
| 50 | + df[["spUTCTime", "spClockTime", "price"]].to_csv(prices_file_path, index=False) |
| 51 | + logging.info(f"Saving volume data to '{volume_file_path}'") |
| 52 | + df[["spUTCTime", "spClockTime", "volume"]].to_csv(volume_file_path, index=False) |
| 53 | + |
| 54 | + |
| 55 | +def _fetch_multiple_days(start: date, end: date, fetch_func: Callable) -> pd.DataFrame: |
| 56 | + """ |
| 57 | + The elexon API pulls for a single day, so this function calls the elexon API repeatedly for each day and stacks up |
| 58 | + the results. |
| 59 | + """ |
| 60 | + df = pd.DataFrame() |
| 61 | + for day in daterange(start, end): |
| 62 | + |
| 63 | + logging.info(f"Fetching imbalance data for '{str(day)}'...") |
| 64 | + day_df = with_retries( # The Elexon API can be busy/unreliable at times so use retries to get past temporary failures |
| 65 | + functools.partial(fetch_func, day), |
| 66 | + ELEXON_API_MAX_RETRIES, |
| 67 | + ELEXON_API_RETRY_DELAY |
| 68 | + ) |
| 69 | + |
| 70 | + df = pd.concat([df, day_df]) |
| 71 | + |
| 72 | + # The values come through in the opposite order to what you'd expect |
| 73 | + df = df.sort_values(by=["spUTCTime"], ignore_index=True) |
| 74 | + |
| 75 | + return df |
| 76 | + |
| 77 | + |
| 78 | +def _fetch_day(day: date) -> pd.DataFrame: |
| 79 | + """ |
| 80 | + Pulls a single days worth of imbalance data from Elexon. |
| 81 | + """ |
| 82 | + |
| 83 | + day_str = day.isoformat() |
| 84 | + # Send a GET request to the API |
| 85 | + response = requests.get( |
| 86 | + url=f"https://data.elexon.co.uk/bmrs/api/v1/balancing/settlement/system-prices/{day_str}", |
| 87 | + params={"format": "json"} |
| 88 | + ) |
| 89 | + response.raise_for_status() |
| 90 | + |
| 91 | + json_data = json.load(StringIO(response.text)) |
| 92 | + day_df = pd.DataFrame.from_dict(json_data["data"]) |
| 93 | + |
| 94 | + day_df["startTime"] = pd.to_datetime(day_df["startTime"], utc=True) |
| 95 | + day_df = day_df[["startTime", "systemSellPrice", "netImbalanceVolume"]] |
| 96 | + day_df = day_df.rename(columns={ |
| 97 | + "startTime": "spUTCTime", |
| 98 | + "systemSellPrice": "price", |
| 99 | + "netImbalanceVolume": "volume" |
| 100 | + }) |
| 101 | + |
| 102 | + day_df.insert(1, "spClockTime", day_df["spUTCTime"].dt.tz_convert("Europe/London")) |
| 103 | + |
| 104 | + return day_df |
0 commit comments