diff --git a/notebooks/OECM-2023-ingest.ipynb b/notebooks/OECM-2023-ingest.ipynb index 3f3f8f6..136e263 100644 --- a/notebooks/OECM-2023-ingest.ipynb +++ b/notebooks/OECM-2023-ingest.ipynb @@ -31,12 +31,9 @@ "outputs": [], "source": [ "import os\n", - "import pathlib\n", "import warnings\n", "import numpy as np\n", "import pandas as pd\n", - "import trino\n", - "from sqlalchemy.engine import create_engine\n", "import osc_ingest_trino as osc\n", "\n", "# import python_pachyderm" @@ -50,10 +47,8 @@ "outputs": [], "source": [ "import re\n", - "import io\n", "import json\n", - "from math import log10\n", - "import itertools" + "from math import log10" ] }, { @@ -84,10 +79,8 @@ "source": [ "# See data-platform-demo/pint-demo.ipynb for quantify/dequantify functions\n", "\n", - "import warnings # needed until quantile behaves better with Pint quantities in arrays\n", - "from pint import set_application_registry, Context, Quantity\n", - "from pint_pandas import PintArray, PintType\n", - "from openscm_units import unit_registry\n", + "from pint import Context\n", + "from pint_pandas import PintArray\n", "from common_units import ureg\n", "\n", "Q_ = ureg.Quantity\n", @@ -249,7 +242,9 @@ "metadata": {}, "outputs": [], "source": [ - "def egen_insert_var(df: pd.DataFrame, var_name: str, var_col: str, var_value: float) -> pd.DataFrame:\n", + "def egen_insert_var(\n", + " df: pd.DataFrame, var_name: str, var_col: str, var_value: float\n", + ") -> pd.DataFrame:\n", " df = df.infer_objects(copy=False).replace(r\"[ ]+- \", \"\", regex=True)\n", " df.insert(0, \"variable\", var_name)\n", " if var_col != \"fuel type\":\n", @@ -266,16 +261,30 @@ " egen_df.insert(1, \"fuel category\", pd.NA)\n", " egen_df.insert(2, \"producer type\", pd.NA)\n", " powerplants_df = egen_df.iloc[:16].copy()\n", - " powerplants_df = egen_insert_var(powerplants_df, \"Power plants\", \"fuel type\", \"total\")\n", - " powerplants_df.loc[powerplants_df[\"fuel type\"] == \"of which wind offshore\", \"fuel type\"] = \"Wind Offshore\"\n", + " powerplants_df = egen_insert_var(\n", + " powerplants_df, \"Power plants\", \"fuel type\", \"total\"\n", + " )\n", + " powerplants_df.loc[\n", + " powerplants_df[\"fuel type\"] == \"of which wind offshore\", \"fuel type\"\n", + " ] = \"Wind Offshore\"\n", " chp_and_power_df = egen_df.iloc[17:26].copy()\n", - " chp_and_power_df = egen_insert_var(chp_and_power_df, \"Combined heat and power plants\", \"fuel type\", \"total\")\n", + " chp_and_power_df = egen_insert_var(\n", + " chp_and_power_df, \"Combined heat and power plants\", \"fuel type\", \"total\"\n", + " )\n", " chp_by_producer_df = egen_df.iloc[27:30].copy()\n", - " chp_by_producer_df = egen_insert_var(chp_by_producer_df, \"CHP by producer\", \"producer type\", \"total\")\n", + " chp_by_producer_df = egen_insert_var(\n", + " chp_by_producer_df, \"CHP by producer\", \"producer type\", \"total\"\n", + " )\n", " total_pp_gen_df = egen_df.iloc[31:49].copy()\n", - " total_pp_gen_df = egen_insert_var(total_pp_gen_df, \"Generation\", \"Total generation\", \"total\")\n", - " total_pp_gen_df.loc[total_pp_gen_df[\"fuel type\"] == \"total\", \"fuel category\"] = \"All fuels\"\n", - " total_pp_gen_df.loc[total_pp_gen_df[\"fuel type\"].isin(fossil_fuel_types).values, \"fuel category\"] = \"Fossil\"\n", + " total_pp_gen_df = egen_insert_var(\n", + " total_pp_gen_df, \"Generation\", \"Total generation\", \"total\"\n", + " )\n", + " total_pp_gen_df.loc[total_pp_gen_df[\"fuel type\"] == \"total\", \"fuel category\"] = (\n", + " \"All fuels\"\n", + " )\n", + " 
total_pp_gen_df.loc[\n", + " total_pp_gen_df[\"fuel type\"].isin(fossil_fuel_types).values, \"fuel category\"\n", + " ] = \"Fossil\"\n", " fuel_type_idx = (\n", " total_pp_gen_df[\"fuel type\"]\n", " .isin(\n", @@ -288,12 +297,25 @@ " )\n", " .values\n", " )\n", - " total_pp_gen_df.loc[fuel_type_idx, \"fuel category\"] = total_pp_gen_df.loc[fuel_type_idx, \"fuel type\"]\n", - " total_pp_gen_df.loc[total_pp_gen_df[\"fuel type\"] == \"of which renewable H2\", \"fuel category\"] = \"Renewable H2\"\n", - " total_pp_gen_df.loc[total_pp_gen_df[\"fuel type\"].isin(renewable_fuel_types).values, \"fuel category\"] = (\n", - " \"Renewables (w/o renewable hydrogen)\"\n", - " )\n", - " return pd.concat([powerplants_df, chp_and_power_df, chp_by_producer_df, total_pp_gen_df, stats_df, res_df])" + " total_pp_gen_df.loc[fuel_type_idx, \"fuel category\"] = total_pp_gen_df.loc[\n", + " fuel_type_idx, \"fuel type\"\n", + " ]\n", + " total_pp_gen_df.loc[\n", + " total_pp_gen_df[\"fuel type\"] == \"of which renewable H2\", \"fuel category\"\n", + " ] = \"Renewable H2\"\n", + " total_pp_gen_df.loc[\n", + " total_pp_gen_df[\"fuel type\"].isin(renewable_fuel_types).values, \"fuel category\"\n", + " ] = \"Renewables (w/o renewable hydrogen)\"\n", + " return pd.concat(\n", + " [\n", + " powerplants_df,\n", + " chp_and_power_df,\n", + " chp_by_producer_df,\n", + " total_pp_gen_df,\n", + " stats_df,\n", + " res_df,\n", + " ]\n", + " )" ] }, { @@ -318,7 +340,9 @@ "source": [ "def process_transport(transport_df: pd.DataFrame) -> pd.DataFrame:\n", " res_df = transport_df.iloc[-2:]\n", - " transport_df = transport_df.iloc[:-3].copy().rename(columns={\"variable\": \"fuel type\"})\n", + " transport_df = (\n", + " transport_df.iloc[:-3].copy().rename(columns={\"variable\": \"fuel type\"})\n", + " )\n", " transport_df.insert(1, \"fuel category\", pd.NA)\n", " transport_df.insert(2, \"producer type\", pd.NA)\n", " road_df = transport_df.iloc[:7].copy()\n", @@ -343,7 +367,9 @@ "source": [ "def process_heat_cool(heat_cool_df: pd.DataFrame) -> pd.DataFrame:\n", " res_df = heat_cool_df.iloc[-2:]\n", - " heat_cool_df = heat_cool_df.iloc[:-3].copy().rename(columns={\"variable\": \"fuel type\"})\n", + " heat_cool_df = (\n", + " heat_cool_df.iloc[:-3].copy().rename(columns={\"variable\": \"fuel type\"})\n", + " )\n", " heat_cool_df.insert(1, \"fuel category\", pd.NA)\n", " heat_cool_df.insert(2, \"producer type\", pd.NA)\n", " district_df = heat_cool_df.iloc[:5].copy()\n", @@ -370,10 +396,16 @@ " capacity_df.insert(1, \"fuel category\", pd.NA)\n", " capacity_df.insert(2, \"producer type\", pd.NA)\n", " capacity_df.insert(0, \"variable\", \"Capacity\")\n", - " capacity_df = capacity_df.infer_objects(copy=False).replace(r\"[ ]+- \", \"\", regex=True)\n", - " capacity_df.loc[capacity_df[\"fuel type\"] == \"Total generation\", \"fuel type\"] = \"total\"\n", + " capacity_df = capacity_df.infer_objects(copy=False).replace(\n", + " r\"[ ]+- \", \"\", regex=True\n", + " )\n", + " capacity_df.loc[capacity_df[\"fuel type\"] == \"Total generation\", \"fuel type\"] = (\n", + " \"total\"\n", + " )\n", " capacity_df.loc[capacity_df[\"fuel type\"] == \"total\", \"fuel category\"] = \"All fuels\"\n", - " capacity_df.loc[capacity_df[\"fuel type\"].isin(fossil_fuel_types).values, \"fuel category\"] = \"Fossil\"\n", + " capacity_df.loc[\n", + " capacity_df[\"fuel type\"].isin(fossil_fuel_types).values, \"fuel category\"\n", + " ] = \"Fossil\"\n", " fuel_type_idx = (\n", " capacity_df[\"fuel type\"]\n", " .isin(\n", @@ -386,10 +418,18 @@ " )\n", " 
.values\n", " )\n", - " capacity_df.loc[fuel_type_idx, \"fuel category\"] = capacity_df.loc[fuel_type_idx, \"fuel type\"]\n", - " capacity_df.loc[capacity_df[\"fuel type\"] == \"of which wind offshore\", \"fuel category\"] = \"Wind\"\n", - " capacity_df.loc[capacity_df[\"fuel type\"] == \"of which wind offshore\", \"fuel type\"] = \"Wind Offshore\"\n", - " capacity_df.loc[capacity_df[\"fuel type\"].isin(renewable_fuel_types).values, \"fuel category\"] = \"Renewables\"\n", + " capacity_df.loc[fuel_type_idx, \"fuel category\"] = capacity_df.loc[\n", + " fuel_type_idx, \"fuel type\"\n", + " ]\n", + " capacity_df.loc[\n", + " capacity_df[\"fuel type\"] == \"of which wind offshore\", \"fuel category\"\n", + " ] = \"Wind\"\n", + " capacity_df.loc[\n", + " capacity_df[\"fuel type\"] == \"of which wind offshore\", \"fuel type\"\n", + " ] = \"Wind Offshore\"\n", + " capacity_df.loc[\n", + " capacity_df[\"fuel type\"].isin(renewable_fuel_types).values, \"fuel category\"\n", + " ] = \"Renewables\"\n", " return pd.concat([capacity_df, res_df])" ] }, @@ -414,7 +454,9 @@ " generic_insert_var(other_df, \"Other sectors\")\n", " non_energy_df = demand_df.iloc[-4:].copy()\n", " generic_insert_var(non_energy_df, \"Other sectors\")\n", - " return pd.concat([total_df, transport_df, industry_df, other_df, res_df, non_energy_df])" + " return pd.concat(\n", + " [total_df, transport_df, industry_df, other_df, res_df, non_energy_df]\n", + " )" ] }, { @@ -426,7 +468,9 @@ "source": [ "def process_emissions(emissions_df: pd.DataFrame) -> pd.DataFrame:\n", " population_df = emissions_df.iloc[-2:]\n", - " emissions_df = emissions_df.iloc[:-8].copy().rename(columns={\"variable\": \"fuel type\"})\n", + " emissions_df = (\n", + " emissions_df.iloc[:-8].copy().rename(columns={\"variable\": \"fuel type\"})\n", + " )\n", " emissions_df.insert(1, \"fuel category\", pd.NA)\n", " emissions_df.insert(2, \"producer type\", pd.NA)\n", " condensation_df = emissions_df.iloc[:5].copy()\n", @@ -439,7 +483,9 @@ " generic_insert_var(CO2e_EI_df, \"CO2 intensity (g/kWh)\")\n", " CO2e_sector_df = pd.concat([emissions_df.iloc[24:25], emissions_df.iloc[27:32]])\n", " generic_insert_var(CO2e_sector_df, \"CO2 emissions by sector\")\n", - " return pd.concat([condensation_df, chp_df, CO2e_df, CO2e_EI_df, CO2e_sector_df, population_df])" + " return pd.concat(\n", + " [condensation_df, chp_df, CO2e_df, CO2e_EI_df, CO2e_sector_df, population_df]\n", + " )" ] }, { @@ -456,9 +502,13 @@ " primary_df.insert(2, \"producer type\", pd.NA)\n", " primary_df.insert(0, \"variable\", \"Primary Energy Demand\")\n", " primary_df = primary_df.infer_objects(copy=False).replace(r\"[ ]+- \", \"\", regex=True)\n", - " primary_df.loc[primary_df[\"fuel type\"] == \"Total (incl. non-energy-use)\", \"fuel type\"] = \"total\"\n", + " primary_df.loc[\n", + " primary_df[\"fuel type\"] == \"Total (incl. 
non-energy-use)\", \"fuel type\"\n", + " ] = \"total\"\n", " primary_df.loc[primary_df[\"fuel type\"] == \"total\", \"fuel category\"] = \"All fuels\"\n", - " primary_df.loc[primary_df[\"fuel type\"].isin(fossil_fuel_types).values, \"fuel category\"] = \"Fossil\"\n", + " primary_df.loc[\n", + " primary_df[\"fuel type\"].isin(fossil_fuel_types).values, \"fuel category\"\n", + " ] = \"Fossil\"\n", " fuel_type_idx = (\n", " primary_df[\"fuel type\"]\n", " .isin(\n", @@ -469,8 +519,12 @@ " )\n", " .values\n", " )\n", - " primary_df.loc[fuel_type_idx, \"fuel category\"] = primary_df.loc[fuel_type_idx, \"fuel type\"]\n", - " primary_df.loc[primary_df[\"fuel type\"].isin(renewable_fuel_types).values, \"fuel category\"] = \"Renewables\"\n", + " primary_df.loc[fuel_type_idx, \"fuel category\"] = primary_df.loc[\n", + " fuel_type_idx, \"fuel type\"\n", + " ]\n", + " primary_df.loc[\n", + " primary_df[\"fuel type\"].isin(renewable_fuel_types).values, \"fuel category\"\n", + " ] = \"Renewables\"\n", " return pd.concat([primary_df, res_df])" ] }, @@ -492,7 +546,9 @@ "\n", "def process_annex_country(oecm_dir):\n", " data_dir = os.path.join(oecm_dir, \"Annex_Country\")\n", - " for file in [f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))]:\n", + " for file in [\n", + " f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))\n", + " ]:\n", " if file.startswith(\"~\") or not file.endswith(\".xlsx\"):\n", " continue\n", " try:\n", @@ -512,13 +568,17 @@ " m = p.match(raw_df.iloc[66, 1])\n", " transport_title, transport_units = m.groups()\n", " transport_df = raw_df.iloc[67:103, 1:15]\n", - " transport_df = set_oecm_indexes(transport_df, region, temperature, transport_title)\n", + " transport_df = set_oecm_indexes(\n", + " transport_df, region, temperature, transport_title\n", + " )\n", " transport_df = process_transport(transport_df)\n", " transport_df_list.append(transport_df)\n", " m = p.match(raw_df.iloc[104, 1])\n", " heat_cool_title, heat_cool_units = m.groups()\n", " heat_cool_df = raw_df.iloc[106:140, 1:15]\n", - " heat_cool_df = set_oecm_indexes(heat_cool_df, region, temperature, heat_cool_title)\n", + " heat_cool_df = set_oecm_indexes(\n", + " heat_cool_df, region, temperature, heat_cool_title\n", + " )\n", " heat_cool_df = process_heat_cool(heat_cool_df)\n", " heat_cool_df_list.append(heat_cool_df)\n", " m = p.match(raw_df.iloc[0, 16])\n", @@ -536,7 +596,9 @@ " m = p.match(raw_df.iloc[78, 16])\n", " emissions_title, emissions_units = m.groups()\n", " emissions_df = raw_df.iloc[79:116, 16:30]\n", - " emissions_df = set_oecm_indexes(emissions_df, region, temperature, emissions_title)\n", + " emissions_df = set_oecm_indexes(\n", + " emissions_df, region, temperature, emissions_title\n", + " )\n", " emissions_df = process_emissions(emissions_df)\n", " emissions_df_list.append(emissions_df)\n", " m = p.match(raw_df.iloc[117, 16])\n", @@ -634,7 +696,16 @@ " df.Unit = df.Unit.replace(r\"[\\[\\]]\", \"\", regex=True)\n", " df.Unit = df.Unit.replace(r\"(.*) */ *([^ ]+ .+)\", r\"\\1/(\\2)\", regex=True)\n", " df.Unit = df.Unit.replace(\n", - " [r\"t ?CO2 (equivalent|equiv\\.)\", r\"p(erson)? km\", r\"steel\", r\"cement\", r\"clinker\", r\"alu\", r\" GDP\", r\"bn \\$$\"],\n", + " [\n", + " r\"t ?CO2 (equivalent|equiv\\.)\",\n", + " r\"p(erson)? 
km\",\n", + " r\"steel\",\n", + " r\"cement\",\n", + " r\"clinker\",\n", + " r\"alu\",\n", + " r\" GDP\",\n", + " r\"bn \\$$\",\n", + " ],\n", " [\"tCO2e\", \"pkm\", \"Steel\", \"Cement\", \"Clinker\", \"Aluminium\", \"GDP\", \"bn $GDP\"],\n", " regex=True,\n", " )" @@ -655,7 +726,10 @@ " data_dir = os.path.join(oecm_dir, \"Sector_CSV\")\n", " sector_df_list = []\n", "\n", - " for file in sorted([f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))], reverse=True):\n", + " for file in sorted(\n", + " [f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))],\n", + " reverse=True,\n", + " ):\n", " if file.startswith(\"~\") or not file.endswith(\".csv\"):\n", " continue\n", " try:\n", @@ -669,11 +743,14 @@ " if region == \"USA\":\n", " S_USA_values = raw_df.Sector.values\n", " elif region == \"Australia\":\n", - " raw_df.loc[raw_df.Subsector.eq(\"Cement - production volume\"), \"Subsector\"] = (\n", - " \"Cement - production volume in mega tonnes per year\"\n", - " )\n", " raw_df.loc[\n", - " raw_df.Subsector.eq(\"Clinker - production volume (based on clinker to cement ratio)\"), \"Subsector\"\n", + " raw_df.Subsector.eq(\"Cement - production volume\"), \"Subsector\"\n", + " ] = \"Cement - production volume in mega tonnes per year\"\n", + " raw_df.loc[\n", + " raw_df.Subsector.eq(\n", + " \"Clinker - production volume (based on clinker to cement ratio)\"\n", + " ),\n", + " \"Subsector\",\n", " ] = \"Clinker - production volume in mega tonnes per year\"\n", " raw_df = raw_df[raw_df.Subsector.notna()]\n", " if len(raw_df.Sector.unique()) == 1:\n", @@ -684,12 +761,18 @@ " raw_df = pd.concat([raw_df.iloc[:first], raw_df.iloc[last + 1 :]])\n", " raw_df = raw_df[raw_df[\"2017\"] != 2017]\n", " raw_df.Sector = S_USA_values\n", - " raw_df.loc[(raw_df.Sector == \"Power Generation\") & (raw_df.Data == \"Energy Intensity\"), \"Data\"] = \"Production\"\n", + " raw_df.loc[\n", + " (raw_df.Sector == \"Power Generation\") & (raw_df.Data == \"Energy Intensity\"),\n", + " \"Data\",\n", + " ] = \"Production\"\n", " # raw_df.loc[raw_df.Sector.isin([\"Aviation\", \"Shipping\", \"Road\"]) & (raw_df.Data==\"Quantity of Service\"), \"Data\"] = \"Production\"\n", " # raw_df.loc[(raw_df.Sector==\"Chemical Industries\") & (raw_df.Data==\"Market\"), \"Data\"] = \"Production\"\n", " # raw_df.loc[(raw_df.Sector==\"Residential & Commercial Buildings and Construction\") & (raw_df.Data==\"Stock\"), \"Data\"] = \"Production\"\n", " # raw_df.loc[raw_df.Data==\"Market Development\", \"Data\"] = \"Production\"\n", - " raw_df.loc[raw_df.Data.isin([\"Quantity of Service\", \"Market\", \"Market Development\"]), \"Data\"] = \"Production\"\n", + " raw_df.loc[\n", + " raw_df.Data.isin([\"Quantity of Service\", \"Market\", \"Market Development\"]),\n", + " \"Data\",\n", + " ] = \"Production\"\n", " raw_df.set_index([\"Sector\", \"Region\", \"Data\", \"Subsector\"], inplace=True)\n", " raw_df.sort_index(inplace=True)\n", " try:\n", @@ -714,12 +797,23 @@ " except KeyError:\n", " print(file)\n", " raw_df.loc[\n", - " (\"Aluminium\", slice(None), \"Production\", \"Minning bauxite - production volume in mega tonnes per year\"),\n", + " (\n", + " \"Aluminium\",\n", + " slice(None),\n", + " \"Production\",\n", + " \"Minning bauxite - production volume in mega tonnes per year\",\n", + " ),\n", " \"Unit\",\n", " ] = \"Mt Bauxite/a\"\n", - " raw_df.loc[(\"Aluminium\", slice(None), \"Production\", \"Annual production volume- aluminium Industry\"), \"Unit\"] = (\n", - " \"Mt Aluminum/a\"\n", - " )\n", 
+ " raw_df.loc[\n", + " (\n", + " \"Aluminium\",\n", + " slice(None),\n", + " \"Production\",\n", + " \"Annual production volume- aluminium Industry\",\n", + " ),\n", + " \"Unit\",\n", + " ] = \"Mt Aluminum/a\"\n", " raw_df.loc[\n", " (\n", " \"Total Materials / Cement\",\n", @@ -744,7 +838,9 @@ " with warnings.catch_warnings():\n", " warnings.filterwarnings(\"error\")\n", " try:\n", - " raw_df = raw_df.apply(lambda x: x.reindex(new_index).interpolate(), axis=1)\n", + " raw_df = raw_df.apply(\n", + " lambda x: x.reindex(new_index).interpolate(), axis=1\n", + " )\n", " except FutureWarning:\n", " print(raw_df.droplevel(level=[1, 2, 3, 4]))\n", " print(raw_df.dtypes)\n", @@ -777,11 +873,15 @@ " data_dir = os.path.join(oecm_dir, \"Scope_CSV\")\n", " scope_df_list = []\n", "\n", - " for file in [f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))]:\n", + " for file in [\n", + " f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))\n", + " ]:\n", " if file.startswith(\"~\") or not file.endswith(\".csv\"):\n", " continue\n", " try:\n", - " raw_df = pd.read_csv(os.path.join(data_dir, file), dtype={\"Remarks\": \"string\"}).iloc[:, 1:]\n", + " raw_df = pd.read_csv(\n", + " os.path.join(data_dir, file), dtype={\"Remarks\": \"string\"}\n", + " ).iloc[:, 1:]\n", " except ValueError:\n", " print(file)\n", " raise\n", @@ -789,15 +889,31 @@ " raw_df.Unit = raw_df.Unit.replace(r\"[\\[\\]]\", \"\", regex=True)\n", " raw_df.Unit = raw_df.Unit.replace(r\"CO2 equiv.\", \"CO2e\", regex=True)\n", " # Fix errant \"0\" and other errors in Transport Units\n", - " raw_df.loc[raw_df.Label.str.contains(\"GHG (CO2 only)\", regex=False), \"Unit\"] = \"Mt CO2/a\"\n", + " raw_df.loc[raw_df.Label.str.contains(\"GHG (CO2 only)\", regex=False), \"Unit\"] = (\n", + " \"Mt CO2/a\"\n", + " )\n", " raw_df.Sector = raw_df.Sector.replace(r\" +- *$\", \"\", regex=True)\n", " raw_df.Subsector = raw_df.Subsector.replace(r\" +- *$\", \"\", regex=True)\n", " raw_df.Subsector = raw_df.Subsector.replace(r\" +$\", \"\", regex=True)\n", - " raw_df.Subsector = raw_df.Subsector.replace(r\"&Tobacco\", \"& Tobacco\", regex=True)\n", - " raw_df.Description = raw_df.Description.replace(r\"&Tobacco\", \"& Tobacco\", regex=True)\n", + " raw_df.Subsector = raw_df.Subsector.replace(\n", + " r\"&Tobacco\", \"& Tobacco\", regex=True\n", + " )\n", + " raw_df.Description = raw_df.Description.replace(\n", + " r\"&Tobacco\", \"& Tobacco\", regex=True\n", + " )\n", " raw_df[\"Energy only\"] = raw_df.Description.map(lambda x: \"(energy-only)\" in x)\n", " raw_df.set_index(\n", - " [\"Sector\", \"Subsector\", \"Scope\", \"Label\", \"Energy only\", \"Description\", \"Unit\", \"Remarks\"], inplace=True\n", + " [\n", + " \"Sector\",\n", + " \"Subsector\",\n", + " \"Scope\",\n", + " \"Label\",\n", + " \"Energy only\",\n", + " \"Description\",\n", + " \"Unit\",\n", + " \"Remarks\",\n", + " ],\n", + " inplace=True,\n", " )\n", " raw_df.columns = raw_df.columns.astype(\"Int64\")\n", " new_index = range(2017, 2051)\n", @@ -809,7 +925,16 @@ " scope_df.reset_index([\"Description\"], inplace=True)\n", " scope_df.set_index(\"Region\", append=True, inplace=True)\n", " scope_df = scope_df.reorder_levels(\n", - " [\"Sector\", \"Subsector\", \"Region\", \"Scope\", \"Label\", \"Energy only\", \"Unit\", \"Remarks\"]\n", + " [\n", + " \"Sector\",\n", + " \"Subsector\",\n", + " \"Region\",\n", + " \"Scope\",\n", + " \"Label\",\n", + " \"Energy only\",\n", + " \"Unit\",\n", + " \"Remarks\",\n", + " ]\n", " )\n", " return 
scope_df" ] @@ -861,7 +986,9 @@ " energy_sum_df_list = []\n", " energy_share_df_list = []\n", "\n", - " for file in [f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))]:\n", + " for file in [\n", + " f for f in os.listdir(data_dir) if os.path.isfile(os.path.join(data_dir, f))\n", + " ]:\n", " if file.startswith(\"~\") or not file.endswith(\".xlsx\"):\n", " continue\n", " try:\n", @@ -881,9 +1008,9 @@ " raw_df.iloc[:, 2] = raw_df.iloc[:, 2].replace(r\"[\\[\\]]\", \"\", regex=True)\n", " sector_df = raw_df.iloc[2:19, :47]\n", " try:\n", - " sector_df.columns = pd.Index([\"Sector\", \"Industry\", \"Units\", \"Delay\"]).append(\n", - " pd.Index(raw_df.iloc[1, 4:47].astype(\"Int64\"))\n", - " )\n", + " sector_df.columns = pd.Index(\n", + " [\"Sector\", \"Industry\", \"Units\", \"Delay\"]\n", + " ).append(pd.Index(raw_df.iloc[1, 4:47].astype(\"Int64\")))\n", " except (ValueError, TypeError):\n", " print(file)\n", " break\n", @@ -898,9 +1025,9 @@ " sector_share_df.columns = raw_df.iloc[0, 51:52].values\n", " energy_df = raw_df.iloc[energy_start_row : energy_start_row + 8, :47]\n", " try:\n", - " energy_df.columns = pd.Index([\"Energy services\", \"Fossil Fuels\", \"Units\", \"Delay\"]).append(\n", - " pd.Index(raw_df.iloc[energy_start_row, 4:47].astype(\"Int64\"))\n", - " )\n", + " energy_df.columns = pd.Index(\n", + " [\"Energy services\", \"Fossil Fuels\", \"Units\", \"Delay\"]\n", + " ).append(pd.Index(raw_df.iloc[energy_start_row, 4:47].astype(\"Int64\")))\n", " except TypeError:\n", " print(file)\n", " break\n", @@ -938,9 +1065,14 @@ "metadata": {}, "outputs": [], "source": [ - "T_sector_df, T_sector_sum_df, T_sector_share_df, energy_df, energy_sum_df, energy_share_df = process_T_country(\n", - " benchmark_OECM_dir\n", - ")" + "(\n", + " T_sector_df,\n", + " T_sector_sum_df,\n", + " T_sector_share_df,\n", + " energy_df,\n", + " energy_sum_df,\n", + " energy_share_df,\n", + ") = process_T_country(benchmark_OECM_dir)" ] }, { @@ -958,7 +1090,12 @@ "metadata": {}, "outputs": [], "source": [ - "transport_elements = [\"Subsector\", \"Total CO2 Emissions\", \"Emission Intensity\", \"Energy Intensity\"]\n", + "transport_elements = [\n", + " \"Subsector\",\n", + " \"Total CO2 Emissions\",\n", + " \"Emission Intensity\",\n", + " \"Energy Intensity\",\n", + "]\n", "bldgs_elements = [\n", " \"Parameter\",\n", " \"Residential Buildings\",\n", @@ -966,7 +1103,9 @@ " \"Construction: Residential and Commercial Building - Economic value\",\n", "]\n", "\n", - "benchmark_years = pd.Series(name=\"Production\", index=pd.Index(list(range(2017, 2051))), dtype=\"float64\")\n", + "benchmark_years = pd.Series(\n", + " name=\"Production\", index=pd.Index(list(range(2017, 2051))), dtype=\"float64\"\n", + ")\n", "benchmark_years.index.name = \"Year\"\n", "\n", "# Maps Sector (really Sub-Sector) to Sheet data\n", @@ -1257,12 +1396,16 @@ " ei_unit = sector_elements[6]\n", " continue\n", " for region, filename in region_dict.items():\n", - " df = pd.read_excel(f\"{benchmark_OECM_dir}/{filename}.xlsx\", sheet_name=sheet, dtype=str)\n", + " df = pd.read_excel(\n", + " f\"{benchmark_OECM_dir}/{filename}.xlsx\", sheet_name=sheet, dtype=str\n", + " )\n", " orig_df = df.applymap(lambda x: x.rstrip(), na_action=\"ignore\")\n", " print(f\"Sector {subsector} Region {region}\")\n", "\n", " for production_centric in [True, False]:\n", - " df = process_sector_benchmark(orig_df, subsector, region, sector_elements, production_centric)\n", + " df = process_sector_benchmark(\n", + " orig_df, subsector, region, 
sector_elements, production_centric\n", + " )\n", " if subsector in [\"Oil\", \"Gas\"]:\n", " oil_and_gas_dict[(subsector, region, production_centric)] = df\n", " # It's tempting to concatenate these DataFrames, but doing so wrecks the nice PintArrays created for Production and EI\n", @@ -1279,7 +1422,8 @@ " \"scenario name\": \"OECM 1.5 Degrees\",\n", " \"release date\": \"2022\",\n", " \"projections_nounits\": [\n", - " {\"year\": year, \"value\": value.m} for year, value in zip(df.index, df[f\"EI_{scope}\"])\n", + " {\"year\": year, \"value\": value.m}\n", + " for year, value in zip(df.index, df[f\"EI_{scope}\"])\n", " ],\n", " }\n", " ],\n", @@ -1299,7 +1443,8 @@ " \"scenario name\": \"OECM 1.5 Degrees\",\n", " \"release date\": \"2022\",\n", " \"projections_nounits\": [\n", - " {\"year\": year, \"value\": value.m} for year, value in zip(df.index, df.EI_S3)\n", + " {\"year\": year, \"value\": value.m}\n", + " for year, value in zip(df.index, df.EI_S3)\n", " ],\n", " }\n", " ],\n", @@ -1319,7 +1464,8 @@ " \"release date\": \"2022\",\n", " \"base_year_production\": str(df.Production.values[0]),\n", " \"projections_nounits\": [\n", - " {\"year\": year, \"value\": value} for year, value in zip(df.index, df.d_Production)\n", + " {\"year\": year, \"value\": value}\n", + " for year, value in zip(df.index, df.d_Production)\n", " ],\n", " }\n", " ]\n", @@ -1386,10 +1532,14 @@ "ng.add_transformation(\"[volume] CH4 \", \"[energy]\", lambda ureg, x: x * NG_DENS * NG_SE)\n", "ng.add_transformation(\"[energy]\", \"[volume] CH4\", lambda ureg, x: x / (NG_DENS * NG_SE))\n", "ng.add_transformation(\n", - " \"[carbon] * [length] * [methane] * [time] ** 2\", \"[carbon] * [mass]\", lambda ureg, x: x * NG_DENS * NG_SE\n", + " \"[carbon] * [length] * [methane] * [time] ** 2\",\n", + " \"[carbon] * [mass]\",\n", + " lambda ureg, x: x * NG_DENS * NG_SE,\n", ")\n", "ng.add_transformation(\n", - " \"[carbon] * [mass] / [volume] / [methane]\", \"[carbon] * [mass] / [energy]\", lambda ureg, x: x / (NG_DENS * NG_SE)\n", + " \"[carbon] * [mass] / [volume] / [methane]\",\n", + " \"[carbon] * [mass] / [energy]\",\n", + " lambda ureg, x: x / (NG_DENS * NG_SE),\n", ")\n", "ng.add_transformation(\n", " \"[carbon] * [time] ** 2 / [length] ** 2\",\n", @@ -1400,8 +1550,12 @@ "# Cannot convert from 'megawatt_hour / CH4 / mmscf' ([mass] / [length] / [methane] / [time] ** 2) to 'dimensionless' (dimensionless)\n", "# conversion to dimensionless throws key error on '' in ureg\n", "\n", - "ng.add_transformation(\"Mscf CH4\", \"kg CO2e\", lambda ureg, x: x * ureg(\"54.87 kg CO2e / (Mscf CH4)\"))\n", - "ng.add_transformation(\"g CH4\", \"g CO2e\", lambda ureg, x: x * ureg(\"44 g CO2e / (16 g CH4)\"))\n", + "ng.add_transformation(\n", + " \"Mscf CH4\", \"kg CO2e\", lambda ureg, x: x * ureg(\"54.87 kg CO2e / (Mscf CH4)\")\n", + ")\n", + "ng.add_transformation(\n", + " \"g CH4\", \"g CO2e\", lambda ureg, x: x * ureg(\"44 g CO2e / (16 g CH4)\")\n", + ")\n", "ureg.add_context(ng)\n", "\n", "ureg.enable_contexts(\"oil\", \"ngas\")" @@ -1416,13 +1570,19 @@ "source": [ "for production_centric in [True, False]:\n", " break\n", - " df_all = pd.concat([v for k, v in oil_and_gas_dict.items() if k[2] is production_centric])\n", + " df_all = pd.concat(\n", + " [v for k, v in oil_and_gas_dict.items() if k[2] is production_centric]\n", + " )\n", " for region in df_all.Region.unique():\n", " df = df_all[df_all.Region == region]\n", " df_oil = df[df.Sector == \"Oil\"].copy()\n", - " df_oil.Production = 
df_oil.Production.astype(f\"pint[{str(df_oil.Production.values[0].u)}]\")\n", + " df_oil.Production = df_oil.Production.astype(\n", + " f\"pint[{str(df_oil.Production.values[0].u)}]\"\n", + " )\n", " df_gas = df[df.Sector == \"Gas\"].copy()\n", - " df_gas.Production = df_gas.Production.astype(f\"pint[{str(df_gas.Production.values[0].u)}]\")\n", + " df_gas.Production = df_gas.Production.astype(\n", + " f\"pint[{str(df_gas.Production.values[0].u)}]\"\n", + " )\n", " o_and_g_em_unit = \"t CO2e\"\n", " o_and_g_prod_unit = \"TJ\"\n", " o_and_g_bm_unit = f\"{o_and_g_em_unit}/{o_and_g_prod_unit}\"\n", @@ -1439,11 +1599,21 @@ " \"projections_nounits\": [\n", " {\n", " \"year\": year,\n", - " \"value\": (oil_em.m_as(o_and_g_em_unit) + gas_em.m_as(o_and_g_em_unit))\n", - " / (oil_prod.m_as(o_and_g_prod_unit) + gas_prod.m_as(o_and_g_prod_unit)),\n", + " \"value\": (\n", + " oil_em.m_as(o_and_g_em_unit)\n", + " + gas_em.m_as(o_and_g_em_unit)\n", + " )\n", + " / (\n", + " oil_prod.m_as(o_and_g_prod_unit)\n", + " + gas_prod.m_as(o_and_g_prod_unit)\n", + " ),\n", " }\n", " for year, oil_em, oil_prod, gas_em, gas_prod in zip(\n", - " df_oil.index, df_oil[scope], df_oil.Production, df_gas[scope], df_gas.Production\n", + " df_oil.index,\n", + " df_oil[scope],\n", + " df_oil.Production,\n", + " df_gas[scope],\n", + " df_gas.Production,\n", " )\n", " ],\n", " }\n", @@ -1466,11 +1636,21 @@ " \"projections_nounits\": [\n", " {\n", " \"year\": year,\n", - " \"value\": (oil_em.m_as(o_and_g_em_unit) + gas_em.m_as(o_and_g_em_unit))\n", - " / (oil_prod.m_as(o_and_g_prod_unit) + gas_prod.m_as(o_and_g_prod_unit)),\n", + " \"value\": (\n", + " oil_em.m_as(o_and_g_em_unit)\n", + " + gas_em.m_as(o_and_g_em_unit)\n", + " )\n", + " / (\n", + " oil_prod.m_as(o_and_g_prod_unit)\n", + " + gas_prod.m_as(o_and_g_prod_unit)\n", + " ),\n", " }\n", " for year, oil_em, oil_prod, gas_em, gas_prod in zip(\n", - " df_oil.index, df_oil[\"S3\"], df_oil.Production, df_gas[\"S3\"], df_gas.Production\n", + " df_oil.index,\n", + " df_oil[\"S3\"],\n", + " df_oil.Production,\n", + " df_gas[\"S3\"],\n", + " df_gas.Production,\n", " )\n", " ],\n", " }\n", @@ -1483,8 +1663,13 @@ " # Alas, we have to re-synthesize the year-over-year growth rate based on the sum PJ of the two components\n", "\n", " if not production_centric:\n", - " base_prod = df_oil.Production.values[0].m_as(\"PJ\") + df_gas.Production.values[0].m_as(\"PJ\")\n", - " prod_series = df_oil.Production.pint.m_as(\"PJ\").add(df_gas.Production.pint.m_as(\"PJ\")) / base_prod\n", + " base_prod = df_oil.Production.values[0].m_as(\n", + " \"PJ\"\n", + " ) + df_gas.Production.values[0].m_as(\"PJ\")\n", + " prod_series = (\n", + " df_oil.Production.pint.m_as(\"PJ\").add(df_gas.Production.pint.m_as(\"PJ\"))\n", + " / base_prod\n", + " )\n", " prod_delta = prod_series.div(prod_series.shift(1))\n", " prod_delta.iloc[0] = 1.0\n", " prod_delta = prod_delta.sub(1.0)\n", @@ -1500,7 +1685,9 @@ " \"base_year_production\": f\"{base_prod} PJ\",\n", " \"projections_nounits\": [\n", " {\"year\": year, \"value\": value}\n", - " for year, value in zip(prod_delta.index, prod_delta.values)\n", + " for year, value in zip(\n", + " prod_delta.index, prod_delta.values\n", + " )\n", " ],\n", " }\n", " ]\n", @@ -1586,7 +1773,9 @@ " {\n", " (idx[0], idx[1], idx[2], idx[3]): {\n", " projection[\"year\"]: projection[\"value\"]\n", - " for projection in production_bm[idx[3]][\"benchmarks\"][idx[4]][\"projections_nounits\"]\n", + " for projection in production_bm[idx[3]][\"benchmarks\"][idx[4]][\n", + " 
\"projections_nounits\"\n", + " ]\n", " }\n", " for idx in production_index\n", " },\n", @@ -1605,7 +1794,10 @@ "source": [ "benchmark_scopes = [[\"S1\", \"S2\", \"S1S2\", \"S3\", \"S1S2S3\"], [\"S1\", \"S2\", \"S1S2\"]]\n", "\n", - "for wb_filename, production_centric in [(\"benchmark_OECM_S3\", False), (\"benchmark_OECM_PC\", True)]:\n", + "for wb_filename, production_centric in [\n", + " (\"benchmark_OECM_S3\", False),\n", + " (\"benchmark_OECM_PC\", True),\n", + "]:\n", " continue\n", " ei_index = pd.MultiIndex.from_tuples(\n", " [\n", @@ -1625,7 +1817,9 @@ " {\n", " (idx[0], idx[1], idx[2], idx[3]): {\n", " projection[\"year\"]: projection[\"value\"]\n", - " for projection in ei_bms[production_centric][idx[3]][\"benchmarks\"][idx[4]][\"projections_nounits\"]\n", + " for projection in ei_bms[production_centric][idx[3]][\"benchmarks\"][\n", + " idx[4]\n", + " ][\"projections_nounits\"]\n", " }\n", " for idx in ei_index\n", " },\n", diff --git a/notebooks/OECM-benchmark-ingest.ipynb b/notebooks/OECM-benchmark-ingest.ipynb index 80b9ac5..e311f08 100644 --- a/notebooks/OECM-benchmark-ingest.ipynb +++ b/notebooks/OECM-benchmark-ingest.ipynb @@ -40,10 +40,7 @@ "source": [ "import os\n", "import pathlib\n", - "from pathlib import Path\n", - "import io\n", "import json\n", - "import itertools\n", "\n", "import numpy as np\n", "import pandas as pd\n", @@ -57,9 +54,8 @@ "\n", "# See data-platform-demo/pint-demo.ipynb for quantify/dequantify functions\n", "import warnings # needed until quantile behaves better with Pint quantities in arrays\n", - "from pint import set_application_registry, Context, Quantity\n", - "from pint_pandas import PintArray, PintType\n", - "from openscm_units import unit_registry\n", + "from pint import Context\n", + "from pint_pandas import PintArray\n", "from common_units import ureg\n", "\n", "Q_ = ureg.Quantity\n", @@ -136,7 +132,12 @@ "metadata": {}, "outputs": [], "source": [ - "transport_elements = [\"Subsector\", \"Total CO2 Emissions\", \"Emission Intensity\", \"Energy Intensity\"]\n", + "transport_elements = [\n", + " \"Subsector\",\n", + " \"Total CO2 Emissions\",\n", + " \"Emission Intensity\",\n", + " \"Energy Intensity\",\n", + "]\n", "bldgs_elements = [\n", " \"Parameter\",\n", " \"Residential Buildings\",\n", @@ -144,7 +145,9 @@ " \"Construction: Residential and Commercial Building - Economic value\",\n", "]\n", "\n", - "benchmark_years = pd.Series(name=\"Production\", index=pd.Index(list(range(2019, 2051))), dtype=\"float64\")\n", + "benchmark_years = pd.Series(\n", + " name=\"Production\", index=pd.Index(list(range(2019, 2051))), dtype=\"float64\"\n", + ")\n", "benchmark_years.index.name = \"Year\"\n", "\n", "# Maps Sector (really Sub-Sector) to Sheet data\n", @@ -445,7 +448,10 @@ "\n", " # By convention, the d_ column is zero at the start of the series.\n", " # Subsequent values multiply the previous quantity by the present d_ number to get the present quanity\n", - " df[\"d_Production\"] = [0] + [yoy.m - 1 for yoy in (df.Production.values[1:] / df.Production.values[:-1]).tolist()]\n", + " df[\"d_Production\"] = [0] + [\n", + " yoy.m - 1\n", + " for yoy in (df.Production.values[1:] / df.Production.values[:-1]).tolist()\n", + " ]\n", "\n", " # When production goes to zero (a net-zero goal!) 
treat 0/0 as 0, not np.inf\n", " df_normalized = df.apply(\n", @@ -509,7 +515,9 @@ "]\n", "\n", "\n", - "def process_sector_benchmark(sector_benchmark, subsector, region, sector_elements, production_centric=True):\n", + "def process_sector_benchmark(\n", + " sector_benchmark, subsector, region, sector_elements, production_centric=True\n", + "):\n", " s = sector_benchmark.iloc[:, 1]\n", " sheet = sector_elements[1]\n", " sector = sector_elements[2]\n", @@ -537,7 +545,9 @@ " sector_elements[0],\n", " (\n", " \"Specialties, Inorganic Chemicals, Consumer Products - Economic value\"\n", - " if sector_elements[3].startswith(\"Inorganic Chemicals and Consumer Products\")\n", + " if sector_elements[3].startswith(\n", + " \"Inorganic Chemicals and Consumer Products\"\n", + " )\n", " else sector_elements[3]\n", " ),\n", " f'{subsector} - Scope 1\"',\n", @@ -545,8 +555,12 @@ " f'{subsector} - Scope 3\"', # Alas we presently have no per-subsector Scope 3 data, so this is always NULL\n", " ]\n", "\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:11]\n", - " ghg_s3 = sector_benchmark.iloc[s.loc[s.eq(\"Chemical Industry total non-energy GHG\")].index, 1:11].squeeze()\n", + " df = sector_benchmark.iloc[\n", + " s.loc[s.isin(df_elements).fillna(False)].index, 1:11\n", + " ]\n", + " ghg_s3 = sector_benchmark.iloc[\n", + " s.loc[s.eq(\"Chemical Industry total non-energy GHG\")].index, 1:11\n", + " ].squeeze()\n", " # Evenly distribute refrigeration among all four sub-sectors\n", " df.iloc[4, 3:] = ghg_s3.iloc[3:].astype(\"float\") / 4.0\n", " if production_centric:\n", @@ -554,17 +568,27 @@ " df.iloc[4, 3:] = 0.0\n", " df.iloc[:, 1] = df.iloc[:, 1].replace(\"million t \", \"Mt \", regex=True)\n", " else:\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:14][\n", - " [True] * 2 + [False] * 3 + [not production_centric] * 3 + [production_centric] * 3\n", + " df = sector_benchmark.iloc[\n", + " s.loc[s.isin(df_elements).fillna(False)].index, 1:14\n", + " ][\n", + " [True] * 2\n", + " + [False] * 3\n", + " + [not production_centric] * 3\n", + " + [production_centric] * 3\n", " ]\n", " elif sheet == \"Utilities\" and subsector == \"Power Utilities\":\n", " # In both S3 and Production-Centric cases, we use Production-Centric data for Power Utilities\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:14][\n", - " [True] * 2 + [False] * 3 + [True] * 3\n", - " ]\n", + " df = sector_benchmark.iloc[\n", + " s.loc[s.isin(df_elements).fillna(False)].index, 1:14\n", + " ][[True] * 2 + [False] * 3 + [True] * 3]\n", " elif sheet == \"Tex & Lea\":\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:14][\n", - " [True] * 2 + [False] + [not production_centric] * 3 + [production_centric] * 3\n", + " df = sector_benchmark.iloc[\n", + " s.loc[s.isin(df_elements).fillna(False)].index, 1:14\n", + " ][\n", + " [True] * 2\n", + " + [False]\n", + " + [not production_centric] * 3\n", + " + [production_centric] * 3\n", " ]\n", " elif sheet == \"Buildings\":\n", " # Note this is built from `subsector` not `sector`\n", @@ -576,15 +600,16 @@ " \" \".join([f\"{subsector} - Scope 3:\", sector_elements[4]]).rstrip(),\n", " ]\n", " # We create our own benchmark data from piece-parts\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:14][\n", - " [True] * (1 + (\"Construction\" not in subsector)) + [True] * 3\n", - " ]\n", + " df = sector_benchmark.iloc[\n", + " 
s.loc[s.isin(df_elements).fillna(False)].index, 1:14\n", + " ][[True] * (1 + (\"Construction\" not in subsector)) + [True] * 3]\n", " # Need to create Scope 3 for Building Construction\n", " if \"Construction\" in subsector:\n", " scope2_label = df.iloc[-1, 0]\n", " scope3_label = scope2_label.replace(\"Scope 2\", \"Scope 3\")\n", " scope3_row = pd.Series(\n", - " [scope3_label, df.iloc[-1, 1], df.iloc[-1, 2]] + [0.0] * len(df.iloc[-1, 3:]),\n", + " [scope3_label, df.iloc[-1, 1], df.iloc[-1, 2]]\n", + " + [0.0] * len(df.iloc[-1, 3:]),\n", " index=df.columns,\n", " name=str(int(df.iloc[-1].name) + 2),\n", " )\n", @@ -597,13 +622,19 @@ " f\"{sector} Scope 2: Electricity - own sector use\",\n", " f\"{sector} Scope 3: Total CO2 equivalent\",\n", " ]\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:14][\n", - " [True] * 2 + [not production_centric] * 3 + [False] * 3 + [production_centric] * 3 + [False] * 3\n", + " df = sector_benchmark.iloc[\n", + " s.loc[s.isin(df_elements).fillna(False)].index, 1:14\n", + " ][\n", + " [True] * 2\n", + " + [not production_centric] * 3\n", + " + [False] * 3\n", + " + [production_centric] * 3\n", + " + [False] * 3\n", " ]\n", " else:\n", - " df = sector_benchmark.iloc[s.loc[s.isin(df_elements).fillna(False)].index, 1:14][\n", - " [True] * 2 + [not production_centric] * 3 + [production_centric] * 3\n", - " ]\n", + " df = sector_benchmark.iloc[\n", + " s.loc[s.isin(df_elements).fillna(False)].index, 1:14\n", + " ][[True] * 2 + [not production_centric] * 3 + [production_centric] * 3]\n", " while df.iloc[0, -1] != \"2050\":\n", " df = df.drop(columns=df.columns[-1])\n", "\n", @@ -615,12 +646,23 @@ " # Now ready to build the DataFrame...\n", " df.columns = [\"Year\", \"Production\", \"S1\", \"S2\", \"S3\"]\n", " df.S3 = df.S3.fillna(0)\n", - " units = df.iloc[1, 1:].map(lambda x: x[1:-1].split(\"/\")[0].replace(\"Mt CO2 equiv.\", \"Mt CO2e\"), na_action=\"ignore\")\n", + " units = df.iloc[1, 1:].map(\n", + " lambda x: x[1:-1].split(\"/\")[0].replace(\"Mt CO2 equiv.\", \"Mt CO2e\"),\n", + " na_action=\"ignore\",\n", + " )\n", " units.replace(\"bn $ GDP\", \"billion USD\")\n", " units.Production = sector_elements[5]\n", " df = (\n", " df.iloc[2:]\n", - " .astype({\"Year\": \"int\", \"Production\": \"float\", \"S1\": \"float\", \"S2\": \"float\", \"S3\": \"float\"})\n", + " .astype(\n", + " {\n", + " \"Year\": \"int\",\n", + " \"Production\": \"float\",\n", + " \"S1\": \"float\",\n", + " \"S2\": \"float\",\n", + " \"S3\": \"float\",\n", + " }\n", + " )\n", " .set_index(\"Year\")\n", " )\n", "\n", @@ -632,7 +674,9 @@ " units.iloc[-1] = units.iloc[-2]\n", " # Need to proportionalize total sector emissions vs. 
passenger-only and then feed back into total\n", " s = pd.concat([sector_benchmark.iloc[:8, 1], sector_benchmark.iloc[87:, 1]])\n", - " road = sector_benchmark.iloc[s.loc[s.isin(transport_elements).fillna(False)].index, 1:14]\n", + " road = sector_benchmark.iloc[\n", + " s.loc[s.isin(transport_elements).fillna(False)].index, 1:14\n", + " ]\n", " while road.iloc[0, -1] != \"2050\":\n", " road = road.drop(columns=road.columns[-1])\n", " if subsector == \"Road: LDV / Passenger Transport\":\n", @@ -641,14 +685,20 @@ " road = road.dropna(how=\"all\", axis=1)[4:7].T\n", " road.columns = road.iloc[0]\n", " road_units = road.iloc[1].map(\n", - " lambda x: x[1:-1].split(\"/\")[0].replace(\"Mt CO2 equiv.\", \"Mt CO2e\"), na_action=\"ignore\"\n", + " lambda x: x[1:-1].split(\"/\")[0].replace(\"Mt CO2 equiv.\", \"Mt CO2e\"),\n", + " na_action=\"ignore\",\n", " )\n", " road_km = \"pkm\" if subsector == \"Road: LDV / Passenger Transport\" else \"tkm\"\n", " for unit in road_units.index:\n", " if \"Intensity\" in unit:\n", " road_units[unit] = f\"{road_units[unit]} / {road_km}\"\n", " units.Production = (\n", - " (ureg(road_units[\"Total CO2 Emissions\"]) / ureg(road_units[\"Emission Intensity\"])).to(f\"giga{road_km}\").u\n", + " (\n", + " ureg(road_units[\"Total CO2 Emissions\"])\n", + " / ureg(road_units[\"Emission Intensity\"])\n", + " )\n", + " .to(f\"giga{road_km}\")\n", + " .u\n", " )\n", " road = road.iloc[2:].astype(\"float64\")\n", " road.index = df.index\n", @@ -671,7 +721,9 @@ " scopes = [\"S1\", \"S2\", \"S3\"]\n", " total_co2 = df[scopes].sum(axis=1)\n", " for scope in scopes:\n", - " df[scope] = (df[scope] * df[\"Total CO2 Emissions\"] / total_co2).replace(np.nan, 0)\n", + " df[scope] = (df[scope] * df[\"Total CO2 Emissions\"] / total_co2).replace(\n", + " np.nan, 0\n", + " )\n", " df = df.drop(columns=transport_elements[1:])\n", " elif sheet == \"Buildings\":\n", " # Here we get to construct our very own benchmark data!\n", @@ -837,19 +889,29 @@ " sheet = sector_elements[1]\n", " ei_unit = sector_elements[6]\n", " for region, filename in region_dict.items():\n", - " df = pd.read_excel(pathlib.Path(benchmark_OECM_dir, f\"{filename}.xlsx\"), sheet_name=sheet, dtype=str)\n", + " df = pd.read_excel(\n", + " pathlib.Path(benchmark_OECM_dir, f\"{filename}.xlsx\"),\n", + " sheet_name=sheet,\n", + " dtype=str,\n", + " )\n", " if sheet == \"Energy\" and subsector == \"Gas\" and region == \"North America\":\n", " print(\"Correcting...\")\n", " # Correct a typo in input data (North American production-centric data miscalculated for Energy:Gas)\n", - " print(f\"df.iloc[121, 9] = {df.iloc[121, 9]}; df.iloc[127, 9] = {df.iloc[127, 9]}\")\n", + " print(\n", + " f\"df.iloc[121, 9] = {df.iloc[121, 9]}; df.iloc[127, 9] = {df.iloc[127, 9]}\"\n", + " )\n", " df.iloc[121, 9] = str(float(df.iloc[108, 9]) + float(df.iloc[112, 9]))\n", " df.iloc[127, 9] = str(float(df.iloc[114, 9]) + float(df.iloc[118, 9]))\n", - " print(f\"df.iloc[121, 9] = {df.iloc[121, 9]}; df.iloc[127, 9] = {df.iloc[127, 9]}\")\n", + " print(\n", + " f\"df.iloc[121, 9] = {df.iloc[121, 9]}; df.iloc[127, 9] = {df.iloc[127, 9]}\"\n", + " )\n", " orig_df = df.map(lambda x: x.rstrip(), na_action=\"ignore\")\n", " print(f\"Sector {subsector} Region {region}\")\n", "\n", " for production_centric in [True, False]:\n", - " df = process_sector_benchmark(orig_df, subsector, region, sector_elements, production_centric)\n", + " df = process_sector_benchmark(\n", + " orig_df, subsector, region, sector_elements, production_centric\n", + " )\n", " if 
subsector in [\"Oil\", \"Gas\"]:\n", " oil_and_gas_dict[(subsector, region, production_centric)] = df\n", " # It's tempting to concatenate these DataFrames, but doing so wrecks the nice PintArrays created for Production and EI\n", @@ -866,7 +928,8 @@ " \"scenario name\": \"OECM 1.5 Degrees\",\n", " \"release date\": \"2022\",\n", " \"projections_nounits\": [\n", - " {\"year\": year, \"value\": value.m} for year, value in zip(df.index, df[f\"EI_{scope}\"])\n", + " {\"year\": year, \"value\": value.m}\n", + " for year, value in zip(df.index, df[f\"EI_{scope}\"])\n", " ],\n", " }\n", " ],\n", @@ -886,7 +949,8 @@ " \"scenario name\": \"OECM 1.5 Degrees\",\n", " \"release date\": \"2022\",\n", " \"projections_nounits\": [\n", - " {\"year\": year, \"value\": value.m} for year, value in zip(df.index, df.EI_S3)\n", + " {\"year\": year, \"value\": value.m}\n", + " for year, value in zip(df.index, df.EI_S3)\n", " ],\n", " }\n", " ],\n", @@ -906,7 +970,8 @@ " \"release date\": \"2022\",\n", " \"base_year_production\": str(df.Production.values[0]),\n", " \"projections_nounits\": [\n", - " {\"year\": year, \"value\": value} for year, value in zip(df.index, df.d_Production)\n", + " {\"year\": year, \"value\": value}\n", + " for year, value in zip(df.index, df.d_Production)\n", " ],\n", " }\n", " ]\n", @@ -973,10 +1038,14 @@ "ng.add_transformation(\"[volume] CH4 \", \"[energy]\", lambda ureg, x: x * NG_DENS * NG_SE)\n", "ng.add_transformation(\"[energy]\", \"[volume] CH4\", lambda ureg, x: x / (NG_DENS * NG_SE))\n", "ng.add_transformation(\n", - " \"[carbon] * [length] * [methane] * [time] ** 2\", \"[carbon] * [mass]\", lambda ureg, x: x * NG_DENS * NG_SE\n", + " \"[carbon] * [length] * [methane] * [time] ** 2\",\n", + " \"[carbon] * [mass]\",\n", + " lambda ureg, x: x * NG_DENS * NG_SE,\n", ")\n", "ng.add_transformation(\n", - " \"[carbon] * [mass] / [volume] / [methane]\", \"[carbon] * [mass] / [energy]\", lambda ureg, x: x / (NG_DENS * NG_SE)\n", + " \"[carbon] * [mass] / [volume] / [methane]\",\n", + " \"[carbon] * [mass] / [energy]\",\n", + " lambda ureg, x: x / (NG_DENS * NG_SE),\n", ")\n", "ng.add_transformation(\n", " \"[carbon] * [time] ** 2 / [length] ** 2\",\n", @@ -987,8 +1056,12 @@ "# Cannot convert from 'megawatt_hour / CH4 / mmscf' ([mass] / [length] / [methane] / [time] ** 2) to 'dimensionless' (dimensionless)\n", "# conversion to dimensionless throws key error on '' in ureg\n", "\n", - "ng.add_transformation(\"Mscf CH4\", \"kg CO2e\", lambda ureg, x: x * ureg(\"54.87 kg CO2e / (Mscf CH4)\"))\n", - "ng.add_transformation(\"g CH4\", \"g CO2e\", lambda ureg, x: x * ureg(\"44 g CO2e / (16 g CH4)\"))\n", + "ng.add_transformation(\n", + " \"Mscf CH4\", \"kg CO2e\", lambda ureg, x: x * ureg(\"54.87 kg CO2e / (Mscf CH4)\")\n", + ")\n", + "ng.add_transformation(\n", + " \"g CH4\", \"g CO2e\", lambda ureg, x: x * ureg(\"44 g CO2e / (16 g CH4)\")\n", + ")\n", "ureg.add_context(ng)\n", "\n", "ureg.enable_contexts(\"oil\", \"ngas\")" @@ -1002,13 +1075,19 @@ "outputs": [], "source": [ "for production_centric in [True, False]:\n", - " df_all = pd.concat([v for k, v in oil_and_gas_dict.items() if k[2] is production_centric])\n", + " df_all = pd.concat(\n", + " [v for k, v in oil_and_gas_dict.items() if k[2] is production_centric]\n", + " )\n", " for region in df_all.Region.unique():\n", " df = df_all[df_all.Region == region]\n", " df_oil = df[df.Sector == \"Oil\"].copy()\n", - " df_oil.Production = df_oil.Production.astype(f\"pint[{str(df_oil.Production.values[0].u)}]\")\n", + " df_oil.Production = 
df_oil.Production.astype(\n", + " f\"pint[{str(df_oil.Production.values[0].u)}]\"\n", + " )\n", " df_gas = df[df.Sector == \"Gas\"].copy()\n", - " df_gas.Production = df_gas.Production.astype(f\"pint[{str(df_gas.Production.values[0].u)}]\")\n", + " df_gas.Production = df_gas.Production.astype(\n", + " f\"pint[{str(df_gas.Production.values[0].u)}]\"\n", + " )\n", " o_and_g_em_unit = \"t CO2e\"\n", " o_and_g_prod_unit = \"TJ\"\n", " o_and_g_bm_unit = f\"{o_and_g_em_unit}/{o_and_g_prod_unit}\"\n", @@ -1025,11 +1104,21 @@ " \"projections_nounits\": [\n", " {\n", " \"year\": year,\n", - " \"value\": (oil_em.m_as(o_and_g_em_unit) + gas_em.m_as(o_and_g_em_unit))\n", - " / (oil_prod.m_as(o_and_g_prod_unit) + gas_prod.m_as(o_and_g_prod_unit)),\n", + " \"value\": (\n", + " oil_em.m_as(o_and_g_em_unit)\n", + " + gas_em.m_as(o_and_g_em_unit)\n", + " )\n", + " / (\n", + " oil_prod.m_as(o_and_g_prod_unit)\n", + " + gas_prod.m_as(o_and_g_prod_unit)\n", + " ),\n", " }\n", " for year, oil_em, oil_prod, gas_em, gas_prod in zip(\n", - " df_oil.index, df_oil[scope], df_oil.Production, df_gas[scope], df_gas.Production\n", + " df_oil.index,\n", + " df_oil[scope],\n", + " df_oil.Production,\n", + " df_gas[scope],\n", + " df_gas.Production,\n", " )\n", " ],\n", " }\n", @@ -1052,11 +1141,21 @@ " \"projections_nounits\": [\n", " {\n", " \"year\": year,\n", - " \"value\": (oil_em.m_as(o_and_g_em_unit) + gas_em.m_as(o_and_g_em_unit))\n", - " / (oil_prod.m_as(o_and_g_prod_unit) + gas_prod.m_as(o_and_g_prod_unit)),\n", + " \"value\": (\n", + " oil_em.m_as(o_and_g_em_unit)\n", + " + gas_em.m_as(o_and_g_em_unit)\n", + " )\n", + " / (\n", + " oil_prod.m_as(o_and_g_prod_unit)\n", + " + gas_prod.m_as(o_and_g_prod_unit)\n", + " ),\n", " }\n", " for year, oil_em, oil_prod, gas_em, gas_prod in zip(\n", - " df_oil.index, df_oil[\"S3\"], df_oil.Production, df_gas[\"S3\"], df_gas.Production\n", + " df_oil.index,\n", + " df_oil[\"S3\"],\n", + " df_oil.Production,\n", + " df_gas[\"S3\"],\n", + " df_gas.Production,\n", " )\n", " ],\n", " }\n", @@ -1069,8 +1168,13 @@ " # Alas, we have to re-synthesize the year-over-year growth rate based on the sum PJ of the two components\n", "\n", " if not production_centric:\n", - " base_prod = df_oil.Production.values[0].m_as(\"PJ\") + df_gas.Production.values[0].m_as(\"PJ\")\n", - " prod_series = df_oil.Production.pint.m_as(\"PJ\").add(df_gas.Production.pint.m_as(\"PJ\")) / base_prod\n", + " base_prod = df_oil.Production.values[0].m_as(\n", + " \"PJ\"\n", + " ) + df_gas.Production.values[0].m_as(\"PJ\")\n", + " prod_series = (\n", + " df_oil.Production.pint.m_as(\"PJ\").add(df_gas.Production.pint.m_as(\"PJ\"))\n", + " / base_prod\n", + " )\n", " prod_delta = prod_series.div(prod_series.shift(1))\n", " prod_delta.iloc[0] = 1.0\n", " prod_delta = prod_delta.sub(1.0)\n", @@ -1086,7 +1190,9 @@ " \"base_year_production\": f\"{base_prod} PJ\",\n", " \"projections_nounits\": [\n", " {\"year\": year, \"value\": value}\n", - " for year, value in zip(prod_delta.index, prod_delta.values)\n", + " for year, value in zip(\n", + " prod_delta.index, prod_delta.values\n", + " )\n", " ],\n", " }\n", " ]\n", @@ -1185,7 +1291,9 @@ " {\n", " (idx[0], idx[1], idx[2], idx[3]): {\n", " projection[\"year\"]: projection[\"value\"]\n", - " for projection in production_bm[idx[3]][\"benchmarks\"][idx[4]][\"projections_nounits\"]\n", + " for projection in production_bm[idx[3]][\"benchmarks\"][idx[4]][\n", + " \"projections_nounits\"\n", + " ]\n", " }\n", " for idx in production_index\n", " },\n", @@ -1204,7 +1312,10 @@ 
"source": [ "benchmark_scopes = [[\"S1\", \"S2\", \"S1S2\", \"S3\", \"S1S2S3\"], [\"S1\", \"S2\", \"S1S2\"]]\n", "\n", - "for wb_filename, production_centric in [(\"benchmark_OECM_S3\", False), (\"benchmark_OECM_PC\", True)]:\n", + "for wb_filename, production_centric in [\n", + " (\"benchmark_OECM_S3\", False),\n", + " (\"benchmark_OECM_PC\", True),\n", + "]:\n", " ei_index = pd.MultiIndex.from_tuples(\n", " [\n", " (\n", @@ -1223,7 +1334,9 @@ " {\n", " (idx[0], idx[1], idx[2], idx[3]): {\n", " projection[\"year\"]: projection[\"value\"]\n", - " for projection in ei_bms[production_centric][idx[3]][\"benchmarks\"][idx[4]][\"projections_nounits\"]\n", + " for projection in ei_bms[production_centric][idx[3]][\"benchmarks\"][\n", + " idx[4]\n", + " ][\"projections_nounits\"]\n", " }\n", " for idx in ei_index\n", " },\n", diff --git a/notebooks/TPI-benchmark-ingest.ipynb b/notebooks/TPI-benchmark-ingest.ipynb index e72f501..90724d2 100644 --- a/notebooks/TPI-benchmark-ingest.ipynb +++ b/notebooks/TPI-benchmark-ingest.ipynb @@ -34,9 +34,6 @@ "import pathlib\n", "import numpy as np\n", "import pandas as pd\n", - "import trino\n", - "from sqlalchemy.engine import create_engine\n", - "import osc_ingest_trino as osc\n", "\n", "# import python_pachyderm" ] @@ -48,10 +45,8 @@ "metadata": {}, "outputs": [], "source": [ - "import io\n", "import json\n", - "from math import log10\n", - "import itertools" + "from math import log10" ] }, { @@ -71,10 +66,7 @@ "source": [ "# See data-platform-demo/pint-demo.ipynb for quantify/dequantify functions\n", "\n", - "import warnings # needed until quantile behaves better with Pint quantities in arrays\n", - "from pint import set_application_registry, Quantity\n", - "from pint_pandas import PintArray, PintType\n", - "from openscm_units import unit_registry\n", + "from pint_pandas import PintArray\n", "from common_units import ureg\n", "\n", "Q_ = ureg.Quantity\n", @@ -483,9 +475,14 @@ " benchmark_global_budget = 646\n", " else:\n", " benchmark_temperature = 2.0\n", - " benchmark_global_budget = 1229 # starting from 1.5 @ 66% prob, plus 0.5C at 0.0006 tcre\n", + " benchmark_global_budget = (\n", + " 1229 # starting from 1.5 @ 66% prob, plus 0.5C at 0.0006 tcre\n", + " )\n", " df = csv_df[csv_df[\"Scenario name\"].eq(scenario_name)]\n", - " idx = df.groupby([\"Sector name\", \"Region\"])[\"Release date\"].transform(\"max\") == df[\"Release date\"]\n", + " idx = (\n", + " df.groupby([\"Sector name\", \"Region\"])[\"Release date\"].transform(\"max\")\n", + " == df[\"Release date\"]\n", + " )\n", " df = df.loc[idx].copy()\n", " df[\"benchmark_temperature\"] = benchmark_temperature\n", " df[\"benchmark_global_budget\"] = benchmark_global_budget\n", @@ -797,7 +794,10 @@ " \"benchmark_metric\": row[\"Unit\"],\n", " \"scenario name\": f\"TPI {scenario_name}\",\n", " \"release date\": str(row[\"Release date\"]).split(\" \")[0],\n", - " \"projections_nounits\": [{\"year\": year, \"value\": row[str(year)]} for year in range(2019, 2051)],\n", + " \"projections_nounits\": [\n", + " {\"year\": year, \"value\": row[str(year)]}\n", + " for year in range(2019, 2051)\n", + " ],\n", " }\n", " for index, row in df.iterrows()\n", " if ei_sectors_scope[row[\"Sector name\"]] == scope\n", @@ -837,7 +837,9 @@ "source": [ "for scenario_name, bm in ei_bms.items():\n", " path_name = scenario_name.translate(str.maketrans(\" .-\", \"___\", \"()\")).lower()\n", - " with open(pathlib.Path(output_datadir, f\"benchmark_EI_TPI_{path_name}.json\"), \"w\") as f:\n", + " with open(\n", + " 
pathlib.Path(output_datadir, f\"benchmark_EI_TPI_{path_name}.json\"), \"w\"\n", + " ) as f:\n", " json.dump(round_floats(bm), sort_keys=False, indent=2, fp=f)\n", " print(\"\", file=f)" ] diff --git a/notebooks/osc-ingest-rmi_utility_transition_hub.ipynb b/notebooks/osc-ingest-rmi_utility_transition_hub.ipynb index 96991b4..0e49b0f 100644 --- a/notebooks/osc-ingest-rmi_utility_transition_hub.ipynb +++ b/notebooks/osc-ingest-rmi_utility_transition_hub.ipynb @@ -40,26 +40,19 @@ "# From the AWS Account page, copy the export scripts from the appropriate role using the \"Command Line or Programmatic Access\" link\n", "# Paste the copied text into ~/credentials.env\n", "\n", - "import sys\n", "import os\n", "import io\n", - "import pathlib\n", "from pathlib import Path\n", "import shutil\n", - "import math\n", "\n", "import osc_ingest_trino as osc\n", "\n", - "import trino\n", - "from sqlalchemy.engine import create_engine\n", - "from sqlalchemy import text\n", "import boto3\n", "import zipfile\n", "\n", "# import pyarrow as pa\n", "# import pyarrow.parquet as pq\n", "import json\n", - "from io import BytesIO\n", "import datetime\n", "\n", "import numpy as np\n", @@ -88,7 +81,9 @@ "iceberg_schema = \"rmi\"\n", "rmi_table_prefix = \"\"\n", "\n", - "engine = osc.attach_trino_engine(verbose=True, catalog=iceberg_catalog, schema=iceberg_schema)\n", + "engine = osc.attach_trino_engine(\n", + " verbose=True, catalog=iceberg_catalog, schema=iceberg_schema\n", + ")\n", "cxn = engine.connect()" ] }, @@ -225,7 +220,12 @@ " \"earnings_value\": \"float64\",\n", " \"investment_value\": \"float64\",\n", " },\n", - " \"customers_sales\": {\"respondent_id\": \"int32\", \"customers\": \"Int32\", \"sales\": \"float64\", \"revenues\": \"float64\"},\n", + " \"customers_sales\": {\n", + " \"respondent_id\": \"int32\",\n", + " \"customers\": \"Int32\",\n", + " \"sales\": \"float64\",\n", + " \"revenues\": \"float64\",\n", + " },\n", " \"debt_equity_returns\": {\n", " \"respondent_id\": \"int32\",\n", " \"rate_base_actual\": \"float64\",\n", @@ -273,7 +273,11 @@ " \"burden\": \"float64\",\n", " },\n", " \"expenditure_bills_burden_detail\": \"string\",\n", - " \"housing_units_income\": {\"respondent_id\": \"int32\", \"housing_units\": \"float64\", \"income\": \"float64\"},\n", + " \"housing_units_income\": {\n", + " \"respondent_id\": \"int32\",\n", + " \"housing_units\": \"float64\",\n", + " \"income\": \"float64\",\n", + " },\n", " \"net_plant_balance\": {\n", " \"respondent_id\": \"int32\",\n", " \"original_cost\": \"float64\",\n", @@ -317,10 +321,18 @@ " \"balancing_authority_name_eia\": \"object\",\n", " \"iso_rto_code\": \"object\",\n", " },\n", - " \"revenue_by_tech\": {\"respondent_id\": \"int32\", \"revenue_total\": \"float64\", \"revenue_residential\": \"float64\"},\n", + " \"revenue_by_tech\": {\n", + " \"respondent_id\": \"int32\",\n", + " \"revenue_total\": \"float64\",\n", + " \"revenue_residential\": \"float64\",\n", + " },\n", " \"state_policies\": \"string\",\n", " \"state_targets\": \"string\",\n", - " \"utility_information\": {\"respondent_id\": \"int32\", \"utility_id_eia\": \"Int32\", \"duplicate_utility_id_eia\": \"boolean\"},\n", + " \"utility_information\": {\n", + " \"respondent_id\": \"int32\",\n", + " \"utility_id_eia\": \"Int32\",\n", + " \"duplicate_utility_id_eia\": \"boolean\",\n", + " },\n", " \"utility_information_2023\": {\n", " \"respondent_id\": \"int32\",\n", " \"utility_id_ferc1\": \"Int32\",\n", @@ -344,7 +356,11 @@ "}\n", "\n", "fillna_dict = {\n", - " 
\"assets_earnings_investments\": {\"asset_value\": 0, \"earnings_value\": 0, \"investment_value\": 0},\n", + " \"assets_earnings_investments\": {\n", + " \"asset_value\": 0,\n", + " \"earnings_value\": 0,\n", + " \"investment_value\": 0,\n", + " },\n", " \"customer_sales\": {\"customers\": 0, \"sales\": 0, \"revenues\": 0},\n", "}\n", "\n", @@ -371,16 +387,22 @@ "outputs": [], "source": [ "if False:\n", - " rmi_20210929_b = io.BytesIO(source_bucket.Object(\"RMI/RMI-20210929.zip\").get()[\"Body\"].read())\n", + " rmi_20210929_b = io.BytesIO(\n", + " source_bucket.Object(\"RMI/RMI-20210929.zip\").get()[\"Body\"].read()\n", + " )\n", " rmi_20210929_zip = zipfile.ZipFile(rmi_20210929_b, mode=\"r\")\n", " del rmi_20210929_b\n", " # display(zipfile.ZipFile(rmi_20210929_zip, mode='r').filelist)\n", - " rmi_dd = rmi_20210929_zip.read(\"data_download/RMI Utility Transition Hub Data Dictionary.xlsx\")\n", + " rmi_dd = rmi_20210929_zip.read(\n", + " \"data_download/RMI Utility Transition Hub Data Dictionary.xlsx\"\n", + " )\n", "\n", " # Read all the sheets. rmi_excel['sheet_name'] gives a specific sheet\n", " rmi_20210929_xls = pd.read_excel(rmi_dd, sheet_name=None, dtype=str)\n", "\n", - " rmi_20211120_b = io.BytesIO(source_bucket.Object(\"RMI/RMI-20211120.zip\").get()[\"Body\"].read())\n", + " rmi_20211120_b = io.BytesIO(\n", + " source_bucket.Object(\"RMI/RMI-20211120.zip\").get()[\"Body\"].read()\n", + " )\n", " rmi_20211120_zip = zipfile.ZipFile(rmi_20211120_b, mode=\"r\")\n", " del rmi_20211120_b\n", " # display(zipfile.ZipFile(rmi_20211120_zip, mode='r').filelist)\n", @@ -390,7 +412,9 @@ " rmi_20211120_xls = pd.read_excel(rmi_dd, sheet_name=None, dtype=str)\n", "\n", "# When updated, we'll get the data dictionary from 2023 zipfile.\n", - "rmi_20220307_b = io.BytesIO(source_bucket.Object(\"RMI/RMI-20220307.zip\").get()[\"Body\"].read())\n", + "rmi_20220307_b = io.BytesIO(\n", + " source_bucket.Object(\"RMI/RMI-20220307.zip\").get()[\"Body\"].read()\n", + ")\n", "rmi_20220307_zip = zipfile.ZipFile(rmi_20220307_b, mode=\"r\")\n", "del rmi_20220307_b\n", "# display(zipfile.ZipFile(rmi_20220307_zip, mode='r').filelist)\n", @@ -401,7 +425,9 @@ "\n", "del rmi_dd\n", "\n", - "rmi_20230202_b = io.BytesIO(source_bucket.Object(\"RMI/RMI-20230202.zip\").get()[\"Body\"].read())\n", + "rmi_20230202_b = io.BytesIO(\n", + " source_bucket.Object(\"RMI/RMI-20230202.zip\").get()[\"Body\"].read()\n", + ")\n", "rmi_20230202_zip = zipfile.ZipFile(rmi_20230202_b, mode=\"r\")\n", "del rmi_20230202_b" ] @@ -420,7 +446,9 @@ "\n", "\n", "def drop_cols_by_index(df, index):\n", - " column_numbers = [x - 1 for x in range(df.shape[1], 0, -1)] # reversed list of columns' integer indices\n", + " column_numbers = [\n", + " x - 1 for x in range(df.shape[1], 0, -1)\n", + " ] # reversed list of columns' integer indices\n", " if type(index) == list:\n", " index.sort()\n", " else:\n", @@ -457,7 +485,9 @@ "\n", "\n", "def get_meta_fields_from_dd(dd):\n", - " meta_fields = {k: {\"description\": v} for k, v in list(zip(dd[\"Data field\"], dd[\"Definition\"]))}\n", + " meta_fields = {\n", + " k: {\"description\": v} for k, v in list(zip(dd[\"Data field\"], dd[\"Definition\"]))\n", + " }\n", " for field, dim in list(zip(dd[\"Data field\"], dd[\"Units\"])):\n", " if dim == \"\":\n", " continue\n", @@ -526,7 +556,12 @@ " }\n", " # Temporary fix for bad November 2020 data\n", " if sheet == \"revenue_by_tech\":\n", - " dd = dd.rename(columns={\" revenue_total \": \"revenue_total\", \" revenue_residential \": \"revenue_residential\"})\n", + 
" dd = dd.rename(\n", + " columns={\n", + " \" revenue_total \": \"revenue_total\",\n", + " \" revenue_residential \": \"revenue_residential\",\n", + " }\n", + " )\n", " if dd.iloc[-1][\"Data field\"] == \"Additional notes\":\n", " meta_content[\"Additional_notes\"] = \"null\"\n", " dd = dd.drop(dd.index[-1:])\n", @@ -577,7 +612,9 @@ " meta_fields = get_meta_fields_from_dd(dd)\n", " meta_fields[\"utility_id_eia\"] = {\"description\": \"Utility Code from EIA\"}\n", " del meta_fields[\"respondent_id\"]\n", - " meta_fields[\"utility_type_rmi\"] = {\"description\": \"Type of utility as classified by RMI\"}\n", + " meta_fields[\"utility_type_rmi\"] = {\n", + " \"description\": \"Type of utility as classified by RMI\"\n", + " }\n", " meta_fields[\"operational_status_code\"] = meta_fields[\"status\"]\n", " del meta_fields[\"status\"]\n", " if \"fuel_type_code\" in meta_fields:\n", @@ -703,7 +740,9 @@ " return generate_assets_meta(sheet, dd)\n", " if sheet == \"emissions_targets\":\n", " meta_fields, meta_content = generate_generic_meta(sheet, dd)\n", - " meta_fields[\"target_scope\"] = {\"description\": \"Scope 1 (own generation) or Scope 3 (purchased generation)\"}\n", + " meta_fields[\"target_scope\"] = {\n", + " \"description\": \"Scope 1 (own generation) or Scope 3 (purchased generation)\"\n", + " }\n", " return meta_fields, meta_content\n", " if sheet == \"utility_information\":\n", " meta_fields, meta_content = generate_generic_meta(sheet, dd)\n", @@ -725,7 +764,12 @@ " meta_fields[\"utility_id_eia\"] = meta_fields[\"respondent_id\"]\n", " del meta_fields[\"respondent_id\"]\n", " return meta_fields, meta_content\n", - " if sheet in [\"employees\", \"expenditure_bills_burden\", \"expenditure_bills_burden_detail\", \"revenue_by_tech\"]:\n", + " if sheet in [\n", + " \"employees\",\n", + " \"expenditure_bills_burden\",\n", + " \"expenditure_bills_burden_detail\",\n", + " \"revenue_by_tech\",\n", + " ]:\n", " # Both tables have the same essential shape\n", " return generate_bills_meta(sheet, dd)\n", " if sheet in [\"operations_emissions_by_fuel\", \"operations_emissions_by_tech\"]:\n", @@ -834,7 +878,9 @@ " operations_emissions_content = None\n", " operations_emissions_fields = None\n", "\n", - " overview_meta_fields, overview_meta_content = generate_sheet_meta(workbook, \"Overview\", release_date, None)\n", + " overview_meta_fields, overview_meta_content = generate_sheet_meta(\n", + " workbook, \"Overview\", release_date, None\n", + " )\n", "\n", " for zipinfo in zipfile.infolist():\n", " fname = zipinfo.filename\n", @@ -855,47 +901,93 @@ " with zipfile.open(fname) as zf:\n", " if tablename == \"state_utility_policies\":\n", " df = pd.read_csv(\n", - " zf, dtype={\"respondent_id\": \"int32\"}, parse_dates=[\"date_updated\"], dayfirst=True, engine=\"c\"\n", + " zf,\n", + " dtype={\"respondent_id\": \"int32\"},\n", + " parse_dates=[\"date_updated\"],\n", + " dayfirst=True,\n", + " engine=\"c\",\n", " )\n", " elif tablename.startswith(\"utility\"):\n", " df = pd.read_csv(zf, dtype=dtype_dict[tablename], engine=\"c\")\n", " if release_date[-4:] <= \"2021\" and tablename == \"utility_information\":\n", " # Correct information for 'American Transmission Co LLC', which is owned by 'WEC Energy Group'\n", - " df.loc[df.respondent_id == 275, [\"parent_name\", \"parent_ticker\", \"parent_ISIN\", \"parent_LEI\"]] = (\n", - " df.loc[\n", - " df.respondent_id == 519, [\"parent_name\", \"parent_ticker\", \"parent_ISIN\", \"parent_LEI\"]\n", - " ].values\n", + " df.loc[\n", + " df.respondent_id == 275,\n", + 
" [\"parent_name\", \"parent_ticker\", \"parent_ISIN\", \"parent_LEI\"],\n", + " ] = df.loc[\n", + " df.respondent_id == 519,\n", + " [\"parent_name\", \"parent_ticker\", \"parent_ISIN\", \"parent_LEI\"],\n", + " ].values\n", + " df = df.rename(\n", + " columns={\n", + " \"parent_ISIN\": \"parent_isin\",\n", + " \"parent_LEI\": \"parent_lei\",\n", + " }\n", " )\n", - " df = df.rename(columns={\"parent_ISIN\": \"parent_isin\", \"parent_LEI\": \"parent_lei\"})\n", " if release_date[-4:] < \"2023\" and tablename == \"utility_information\":\n", " # Correct several LEI errors and omissions\n", - " df.loc[df.parent_name == \"American Electric Power Co., Inc.\", \"parent_lei\"] = \"1B4S6S7G0TW5EE83BO58\"\n", - " df.loc[df.parent_name == \"American States Water Co.\", \"parent_isin\"] = first_valid_col_value(\n", + " df.loc[\n", + " df.parent_name == \"American Electric Power Co., Inc.\",\n", + " \"parent_lei\",\n", + " ] = \"1B4S6S7G0TW5EE83BO58\"\n", + " df.loc[\n", + " df.parent_name == \"American States Water Co.\", \"parent_isin\"\n", + " ] = first_valid_col_value(\n", " df[df.parent_name == \"American States Water Co.\"], \"parent_isin\"\n", " )\n", - " df.loc[df.parent_name == \"American States Water Co.\", \"parent_lei\"] = \"529900L26LIS2V8PWM23\"\n", + " df.loc[\n", + " df.parent_name == \"American States Water Co.\", \"parent_lei\"\n", + " ] = \"529900L26LIS2V8PWM23\"\n", " # Berkshire Hathaway is 5493000C01ZX7D35SD85 but we're really dealing with the energy company,\n", " # which has the LEI 549300JD0S5IZJE9LY15. But while the energy company has a detailed revenue breakdown,\n", " # it doesn't have a proper market cap.\n", - " df.loc[df.parent_name == \"Berkshire Hathaway, Inc.\", \"parent_lei\"] = \"549300JD0S5IZJE9LY15\"\n", - " df.loc[df.parent_name == \"Citizens Energy Corp.\", \"parent_lei\"] = \"5493008ORX814MK1WM19\"\n", - " df.loc[df.parent_name == \"FirstEnergy Corp.\", \"parent_lei\"] = \"549300SVYJS666PQJH88\"\n", - " df.loc[df.parent_name == \"LS Power\", \"parent_lei\"] = \"549300Z88AAE0R1YHI77\"\n", + " df.loc[\n", + " df.parent_name == \"Berkshire Hathaway, Inc.\", \"parent_lei\"\n", + " ] = \"549300JD0S5IZJE9LY15\"\n", + " df.loc[df.parent_name == \"Citizens Energy Corp.\", \"parent_lei\"] = (\n", + " \"5493008ORX814MK1WM19\"\n", + " )\n", + " df.loc[df.parent_name == \"FirstEnergy Corp.\", \"parent_lei\"] = (\n", + " \"549300SVYJS666PQJH88\"\n", + " )\n", + " df.loc[df.parent_name == \"LS Power\", \"parent_lei\"] = (\n", + " \"549300Z88AAE0R1YHI77\"\n", + " )\n", " # df.loc[df.parent_name=='National Grid PLC', 'parent_lei'] = 'MOM4570XTJ5YYX7JKH83'\n", " # df.loc[df.parent_name=='National Grid PLC', 'parent_isin'] = 'US6362744095'\n", - " df.loc[df.parent_name == \"NextEra Energy, Inc.\", \"parent_lei\"] = \"UMI46YPGBLUE4VGNNT48\"\n", + " df.loc[df.parent_name == \"NextEra Energy, Inc.\", \"parent_lei\"] = (\n", + " \"UMI46YPGBLUE4VGNNT48\"\n", + " )\n", " # AEP owns OVEC\n", - " df.loc[df.parent_name == \"Ohio Valley Electric Corp.\", \"parent_lei\"] = \"1B4S6S7G0TW5EE83BO58\"\n", - " df.loc[df.parent_name == \"Ohio Valley Electric Corp.\", \"parent_ticker\"] = \"AEP\"\n", - " df.loc[df.parent_name == \"Ohio Valley Electric Corp.\", \"parent_isin\"] = \"US0255371017\"\n", - " df.loc[df.parent_name == \"PG&E Corp.\", \"parent_lei\"] = \"8YQ2GSDWYZXO2EDN3511\"\n", - " df.loc[df.parent_name == \"Sempra\", \"parent_isin\"] = first_valid_col_value(\n", - " df[df.parent_name == \"Sempra\"], \"parent_isin\"\n", + " df.loc[\n", + " df.parent_name == \"Ohio Valley Electric 
Corp.\", \"parent_lei\"\n", + " ] = \"1B4S6S7G0TW5EE83BO58\"\n", + " df.loc[\n", + " df.parent_name == \"Ohio Valley Electric Corp.\", \"parent_ticker\"\n", + " ] = \"AEP\"\n", + " df.loc[\n", + " df.parent_name == \"Ohio Valley Electric Corp.\", \"parent_isin\"\n", + " ] = \"US0255371017\"\n", + " df.loc[df.parent_name == \"PG&E Corp.\", \"parent_lei\"] = (\n", + " \"8YQ2GSDWYZXO2EDN3511\"\n", + " )\n", + " df.loc[df.parent_name == \"Sempra\", \"parent_isin\"] = (\n", + " first_valid_col_value(\n", + " df[df.parent_name == \"Sempra\"], \"parent_isin\"\n", + " )\n", + " )\n", + " df.loc[df.parent_name == \"Sempra\", \"parent_lei\"] = (\n", + " \"PBBKGKLRK5S5C0Y4T545\"\n", + " )\n", + " df.loc[df.parent_name == \"Unitil Corp.\", \"parent_lei\"] = (\n", + " \"549300EYGHO5EZE7RL80\"\n", + " )\n", + " df.loc[df.parent_name == \"Verso Corp.\", \"parent_lei\"] = (\n", + " \"549300FODXCTQ8DGT594\"\n", + " )\n", + " df.loc[df.parent_name == \"Verso Corp.\", \"parent_isin\"] = (\n", + " \"US92531L2079\"\n", " )\n", - " df.loc[df.parent_name == \"Sempra\", \"parent_lei\"] = \"PBBKGKLRK5S5C0Y4T545\"\n", - " df.loc[df.parent_name == \"Unitil Corp.\", \"parent_lei\"] = \"549300EYGHO5EZE7RL80\"\n", - " df.loc[df.parent_name == \"Verso Corp.\", \"parent_lei\"] = \"549300FODXCTQ8DGT594\"\n", - " df.loc[df.parent_name == \"Verso Corp.\", \"parent_isin\"] = \"US92531L2079\"\n", " if tablename.startswith(\"utility_information\"):\n", " if tablename.endswith(\"2023\"):\n", " cols = (\"parent_lei\", \"isin\")\n", @@ -905,14 +997,29 @@ " df.loc[idx, \"parent_name\"] = \"Constellation Energy Commodities\"\n", " df.loc[idx, \"utility_name\"] = \"Constellation Energy Commodities\"\n", " df.loc[idx, cols] = [\"549300F8Y20RYGNGV346\", \"US21037T1097\"]\n", - " df.loc[df.parent_name == \"E.ON\", cols] = [\"Q9MAIUP40P25UFBFG033\", \"DE000ENAG999\"]\n", + " df.loc[df.parent_name == \"E.ON\", cols] = [\n", + " \"Q9MAIUP40P25UFBFG033\",\n", + " \"DE000ENAG999\",\n", + " ]\n", " # Exelon Corp\n", - " df.loc[df.parent_name == \"Exelon Energy Co\", cols] = [\"5493006XZOVPY83VHP23\", \"US30161N1019\"]\n", + " df.loc[df.parent_name == \"Exelon Energy Co\", cols] = [\n", + " \"5493006XZOVPY83VHP23\",\n", + " \"US30161N1019\",\n", + " ]\n", " # NextEra Corp\n", - " df.loc[df.parent_name == \"FPL Energy\", cols] = [\"549300B8P43NZZ2P4080\", \"US65339F1012\"]\n", + " df.loc[df.parent_name == \"FPL Energy\", cols] = [\n", + " \"549300B8P43NZZ2P4080\",\n", + " \"US65339F1012\",\n", + " ]\n", " # Ørsted A/S\n", - " df.loc[df.parent_name == \"Orsted\", cols] = [\"W9NG6WMZIYEU8VEDOG48\", \"DK0060094928\"]\n", - " df.loc[df.parent_name == \"Reliant Energy Inc\", cols] = [\"21TPXMRRHFKOBHDC8J74\", \"US6293775085\"]\n", + " df.loc[df.parent_name == \"Orsted\", cols] = [\n", + " \"W9NG6WMZIYEU8VEDOG48\",\n", + " \"DK0060094928\",\n", + " ]\n", + " df.loc[df.parent_name == \"Reliant Energy Inc\", cols] = [\n", + " \"21TPXMRRHFKOBHDC8J74\",\n", + " \"US6293775085\",\n", + " ]\n", " elif tablename == \"state_targets\":\n", " df = pd.read_csv(zf, dtype=dtype_dict[tablename], engine=\"c\")\n", " df.year = df.year.fillna(\"-1\").astype(\"string\")\n", @@ -920,7 +1027,9 @@ " df.loc[df.year.isin([\"Annual\", \"2005/1990\"]), \"year\"] = \"-1\"\n", " df.year = df.year.astype(\"int32\")\n", " else:\n", - " df = pd.read_csv(zf, dtype=dtype_dict[tablename], thousands=\",\", engine=\"c\")\n", + " df = pd.read_csv(\n", + " zf, dtype=dtype_dict[tablename], thousands=\",\", engine=\"c\"\n", + " )\n", " if \"year\" in df.columns:\n", " df.year = 
df.year.fillna(\"-1\").astype(\"int32\")\n", " if tablename == \"revenue_by_tech\":\n", @@ -938,12 +1047,19 @@ " return s\n", "\n", " df.rename(\n", - " columns={\" revenue_total \": \"revenue_total\", \" revenue_residential \": \"revenue_residential\"},\n", + " columns={\n", + " \" revenue_total \": \"revenue_total\",\n", + " \" revenue_residential \": \"revenue_residential\",\n", + " },\n", " inplace=True,\n", " )\n", " if release_date[-4:] < \"2022\":\n", - " df.revenue_total = df.revenue_total.map(cleanup_2020_numbers).astype(\"float64\")\n", - " df.revenue_residential = df.revenue_residential.map(cleanup_2020_numbers).astype(\"float64\")\n", + " df.revenue_total = df.revenue_total.map(\n", + " cleanup_2020_numbers\n", + " ).astype(\"float64\")\n", + " df.revenue_residential = df.revenue_residential.map(\n", + " cleanup_2020_numbers\n", + " ).astype(\"float64\")\n", " elif tablename == \"debt_equity_returns\":\n", " df.loc[~df.ROR_actual.map(np.isfinite), \"ROR_actual\"] = 0.0\n", " df.loc[~df.ROE_actual.map(np.isfinite), \"ROE_actual\"] = 0.0\n", @@ -962,8 +1078,13 @@ " if tablename in fillna_dict:\n", " df.fillna(value=fillna_dict[tablename], inplace=True)\n", "\n", - " custom_meta_fields, custom_meta_content = generate_sheet_meta(workbook, tablename, release_date, df)\n", - " if release_date[-4:] < \"2023\" and tablename in [\"operations_emissions_by_fuel\", \"operations_emissions_by_tech\"]:\n", + " custom_meta_fields, custom_meta_content = generate_sheet_meta(\n", + " workbook, tablename, release_date, df\n", + " )\n", + " if release_date[-4:] < \"2023\" and tablename in [\n", + " \"operations_emissions_by_fuel\",\n", + " \"operations_emissions_by_tech\",\n", + " ]:\n", " # Both tables duplicate the 'Purchased Power' and 'EE & DR' data.\n", " # We only need one copy, which we create as 'other_generation'\n", " if df_nn is None:\n", @@ -971,21 +1092,32 @@ " # Drop many NULL columns we don't need\n", " df_anon.dropna(axis=1, how=\"all\", inplace=True)\n", " other_generation_df = df_anon\n", - " custom_gen_meta_fields, custom_gen_meta_content = generate_other_generation_meta(df_anon)\n", + " custom_gen_meta_fields, custom_gen_meta_content = (\n", + " generate_other_generation_meta(df_anon)\n", + " )\n", "\n", " iceberg_table = f\"{rmi_table_prefix}other_generation\"\n", - " dbt_models[iceberg_table] = dbt_table = {\"description\": custom_meta_content[\"description\"]}\n", + " dbt_models[iceberg_table] = dbt_table = {\n", + " \"description\": custom_meta_content[\"description\"]\n", + " }\n", " if custom_meta_fields:\n", " dbt_table[\"columns\"] = dbt_columns = {\n", - " name: {\"description\": custom_meta_fields[name][\"description\"]} for name in custom_meta_fields\n", + " name: {\"description\": custom_meta_fields[name][\"description\"]}\n", + " for name in custom_meta_fields\n", " }\n", " for name in custom_meta_fields:\n", " if \"tags\" in custom_meta_fields[name]:\n", " dbt_columns[name][\"tags\"] = custom_meta_fields[name][\"tags\"]\n", "\n", - " drop_table = osc._do_sql(f\"drop view if exists {iceberg_schema}.{iceberg_table}\", engine, verbose=False)\n", " drop_table = osc._do_sql(\n", - " f\"drop table if exists {iceberg_schema}.{iceberg_table}_source\", engine, verbose=False\n", + " f\"drop view if exists {iceberg_schema}.{iceberg_table}\",\n", + " engine,\n", + " verbose=False,\n", + " )\n", + " drop_table = osc._do_sql(\n", + " f\"drop table if exists {iceberg_schema}.{iceberg_table}_source\",\n", + " engine,\n", + " verbose=False,\n", " )\n", " 
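
The paired drop statements above make each ingest idempotent: the dbt view is dropped before the backing _source table it selects from, so a changed DataFrame schema never collides with a stale definition. A condensed sketch of that flow using the osc helpers exactly as this notebook calls them (the catalog and table names are placeholders, and Trino connection settings are assumed to be in the environment):

import osc_ingest_trino as osc

engine = osc.attach_trino_engine(verbose=False, catalog="my_catalog", schema="rmi")

iceberg_schema, iceberg_table = "rmi", "employees"  # placeholder table name
# Drop the view first (it depends on the table), then the backing table.
osc._do_sql(f"drop view if exists {iceberg_schema}.{iceberg_table}", engine, verbose=False)
osc._do_sql(f"drop table if exists {iceberg_schema}.{iceberg_table}_source", engine, verbose=False)
# ...then re-ingest the DataFrame into {iceberg_table}_source and re-emit the dbt view.
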
osc.fast_pandas_ingest_via_hive(\n",
 " df_anon,\n",
@@ -1000,7 +1132,9 @@
 " overwrite=True,\n",
 " verbose=False,\n",
 " )\n",
- " with open(models_dir.joinpath(f\"{iceberg_table}.sql\"), \"w\", encoding=\"utf-8\") as f:\n",
+ " with open(\n",
+ " models_dir.joinpath(f\"{iceberg_table}.sql\"), \"w\", encoding=\"utf-8\"\n",
+ " ) as f:\n",
 " print(\n",
 " \"{{ config(materialized='view', view_security='invoker') }}\"\n",
 " + f\"\"\"\n",
@@ -1017,7 +1151,8 @@
 " for index, row in df[df[\"operating_month\"].isna()].iterrows():\n",
 " # df_nn is only computed once, from either 'operations_emissions_by_fuel' or 'operations_emissions_by_tech'\n",
 " df0 = df_nn.loc[\n",
- " (df_nn[\"respondent_id\"] == row[\"respondent_id\"]) & (df_nn[\"generator_id\"] == row[\"generator_id\"]),\n",
+ " (df_nn[\"respondent_id\"] == row[\"respondent_id\"])\n",
+ " & (df_nn[\"generator_id\"] == row[\"generator_id\"]),\n",
 " [\"operating_month\", \"operating_year\"],\n",
 " ]\n",
 " if len(df0) == 0:\n",
@@ -1031,19 +1166,36 @@
 " # pass\n",
 " elif tablename == \"emissions_targets\":\n",
 " # Needed because respondent_id 191 data is duplicated (and 121 is partially duplicated, too).\n",
- " df.drop_duplicates(subset=None, keep=\"first\", inplace=True, ignore_index=True)\n",
- " df.loc[df[\"CO2_intensity_historical\"] == np.inf, [\"CO2_historical\", \"CO2_intensity_historical\"]] = [0, 0]\n",
- " df[\"CO2_intensity_historical\"] = df[\"CO2_intensity_historical\"].astype(\"float64\")\n",
+ " df.drop_duplicates(\n",
+ " subset=None, keep=\"first\", inplace=True, ignore_index=True\n",
+ " )\n",
+ " df.loc[\n",
+ " df[\"CO2_intensity_historical\"] == np.inf,\n",
+ " [\"CO2_historical\", \"CO2_intensity_historical\"],\n",
+ " ] = [0, 0]\n",
+ " df[\"CO2_intensity_historical\"] = df[\"CO2_intensity_historical\"].astype(\n",
+ " \"float64\"\n",
+ " )\n",
 "\n",
 " # Fix discrepancies between emissions_targets and the other files that define/reference parent_name\n",
 " if release_date[-4:] < \"2023\":\n",
- " df.loc[df.parent_name == \"American States Water\", \"parent_name\"] = \"American States Water Co.\"\n",
- " df.loc[df.parent_name == \"CMS Energy\", \"parent_name\"] = \"CMS Energy Corp.\"\n",
+ " df.loc[df.parent_name == \"American States Water\", \"parent_name\"] = (\n",
+ " \"American States Water Co.\"\n",
+ " )\n",
+ " df.loc[df.parent_name == \"CMS Energy\", \"parent_name\"] = (\n",
+ " \"CMS Energy Corp.\"\n",
+ " )\n",
 " df.loc[df.parent_name == \"Emera Inc.\", \"parent_name\"] = \"Versant Power\"\n",
 " df.loc[df.parent_name == \"Fortis, Inc\", \"parent_name\"] = \"Fortis, Inc.\"\n",
- " df.loc[df.parent_name == \"National Grid plc\", \"parent_name\"] = \"National Grid PLC\"\n",
- " df.loc[df.parent_name == \"NorthWestern Corp.\", \"parent_name\"] = \"Northwestern Corp.\"\n",
- " df.loc[df.parent_name == \"OG&E Energy\", \"parent_name\"] = \"OG&E Energy Corp.\"\n",
+ " df.loc[df.parent_name == \"National Grid plc\", \"parent_name\"] = (\n",
+ " \"National Grid PLC\"\n",
+ " )\n",
+ " df.loc[df.parent_name == \"NorthWestern Corp.\", \"parent_name\"] = (\n",
+ " \"Northwestern Corp.\"\n",
+ " )\n",
+ " df.loc[df.parent_name == \"OG&E Energy\", \"parent_name\"] = (\n",
+ " \"OG&E Energy Corp.\"\n",
+ " )\n",
 " df.loc[df.parent_name == \"PPL\", \"parent_name\"] = \"PPL Corp.\"\n",
 " df.loc[df.parent_name == \"Sempra Energy\", \"parent_name\"] = \"Sempra\"\n",
 " df.loc[df.parent_name == \"Verso\", \"parent_name\"] = \"Verso Corp.\"\n",
@@ -1055,12 +1207,15 @@
 "\n",
 " iceberg_table = f\"{rmi_table_prefix}{tablename}\"\n",
 " if 
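
The emissions_targets branch above zeroes infinite intensities and then casts the column to float64; the two-column .loc assignment is what zeroes the historical emissions and the ratio together in a single statement. A toy sketch of that step:

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {"CO2_historical": [10.0, 5.0], "CO2_intensity_historical": [np.inf, 0.7]}
)

# Rows whose intensity blew up get both columns zeroed in one assignment.
df.loc[
    df["CO2_intensity_historical"] == np.inf,
    ["CO2_historical", "CO2_intensity_historical"],
] = [0, 0]
df["CO2_intensity_historical"] = df["CO2_intensity_historical"].astype("float64")
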
custom_meta_content:\n", - " dbt_models[iceberg_table] = dbt_table = {\"description\": custom_meta_content[\"description\"]}\n", + " dbt_models[iceberg_table] = dbt_table = {\n", + " \"description\": custom_meta_content[\"description\"]\n", + " }\n", " dbt_table[\"meta\"] = custom_meta_content.copy()\n", " del dbt_table[\"meta\"][\"description\"]\n", " if custom_meta_fields:\n", " dbt_table[\"columns\"] = dbt_columns = {\n", - " name: {\"description\": custom_meta_fields[name][\"description\"]} for name in custom_meta_fields\n", + " name: {\"description\": custom_meta_fields[name][\"description\"]}\n", + " for name in custom_meta_fields\n", " }\n", " for name in custom_meta_fields:\n", " if \"tags\" in custom_meta_fields[name]:\n", @@ -1078,8 +1233,16 @@ " partition = \"utility_type_rmi\"\n", " else:\n", " raise KeyError\n", - " drop_table = osc._do_sql(f\"drop view if exists {iceberg_schema}.{iceberg_table}\", engine, verbose=False)\n", - " drop_table = osc._do_sql(f\"drop table if exists {iceberg_schema}.{iceberg_table}_source\", engine, verbose=False)\n", + " drop_table = osc._do_sql(\n", + " f\"drop view if exists {iceberg_schema}.{iceberg_table}\",\n", + " engine,\n", + " verbose=False,\n", + " )\n", + " drop_table = osc._do_sql(\n", + " f\"drop table if exists {iceberg_schema}.{iceberg_table}_source\",\n", + " engine,\n", + " verbose=False,\n", + " )\n", " if len(df) > 2000:\n", " print(\"hive->iceberg path...\")\n", " osc.fast_pandas_ingest_via_hive(\n", @@ -1115,7 +1278,9 @@ " index=False,\n", " method=osc.TrinoBatchInsert(batch_size=5000, verbose=False),\n", " )\n", - " with open(models_dir.joinpath(f\"{iceberg_table}.sql\"), \"w\", encoding=\"utf-8\") as f:\n", + " with open(\n", + " models_dir.joinpath(f\"{iceberg_table}.sql\"), \"w\", encoding=\"utf-8\"\n", + " ) as f:\n", " print(\n", " \"{{ config(materialized='view', view_security='invoker') }}\"\n", " + f\"\"\"\n", @@ -1198,7 +1363,11 @@ "metadata": {}, "outputs": [], "source": [ - "dbt_yml = open(repo_root.joinpath(\"dbt\", \"rmi_transform\", \"rmi_base_schema.yml\"), \"w\", encoding=\"utf-8\")" + "dbt_yml = open(\n", + " repo_root.joinpath(\"dbt\", \"rmi_transform\", \"rmi_base_schema.yml\"),\n", + " \"w\",\n", + " encoding=\"utf-8\",\n", + ")" ] }, { @@ -1237,7 +1406,11 @@ " indent = indent + 2\n", " for meta_key in meta:\n", " prefix = f\"{' '*indent}{meta_key}:\"\n", - " print(prefix, json_dumps_no_lint(meta[meta_key], len(prefix), comma=True), file=dbt_yml)\n", + " print(\n", + " prefix,\n", + " json_dumps_no_lint(meta[meta_key], len(prefix), comma=True),\n", + " file=dbt_yml,\n", + " )\n", " indent = indent - 4\n", " print(f\"{' '*indent} }}\", end=\"\", file=dbt_yml)\n", " print(f\"\\n{' '*indent}columns:\", file=dbt_yml)\n", @@ -1248,7 +1421,11 @@ " indent = indent + 2\n", " for col_meta in columns[col]:\n", " prefix = f\"{' '*indent}{col_meta}:\"\n", - " print(prefix, json_dumps_no_lint(columns[col][col_meta], len(prefix)), file=dbt_yml)\n", + " print(\n", + " prefix,\n", + " json_dumps_no_lint(columns[col][col_meta], len(prefix)),\n", + " file=dbt_yml,\n", + " )\n", " indent = indent - 2\n", " indent = indent - 4\n", "indent = indent - 2\n",
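
The cell above writes rmi_base_schema.yml by hand, with explicit indent bookkeeping, to keep exact control over layout; where formatting matters less, the same dbt_models structure can be emitted with PyYAML instead. A sketch of that alternative (not what the notebook does; the model and column descriptions are toy values):

import yaml  # PyYAML

dbt_models = {  # toy stand-in for the dbt_models dict built during ingest
    "employees": {
        "description": "Example table description.",
        "columns": {"respondent_id": {"description": "Example column description."}},
    }
}

schema = {
    "version": 2,
    "models": [
        {
            "name": name,
            "description": model["description"],
            "columns": [
                {"name": col, **meta} for col, meta in model.get("columns", {}).items()
            ],
        }
        for name, model in dbt_models.items()
    ],
}
with open("rmi_base_schema.yml", "w", encoding="utf-8") as f:
    yaml.safe_dump(schema, f, sort_keys=False)
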