Merge pull request #418 from modeseven-os-climate/workflow-updates
Chore: Update workflows from devops upstream [skip-ci]
ModeSevenIndustrialSolutions authored Mar 28, 2024
2 parents 3e3affb + 27ac631 commit 7257cb0
Showing 5 changed files with 63 additions and 33 deletions.
20 changes: 14 additions & 6 deletions src/ITR/data/base_providers.py
@@ -913,9 +913,12 @@ def get_company_projected_targets(self, company_ids: List[str], year=None) -> pd
        ]
        if target_list:
            with warnings.catch_warnings():
-               # pd.DataFrame.__init__ (in pandas/core/frame.py) ignores the beautiful dtype information adorning the pd.Series list elements we are providing. Sad!
+               # pd.DataFrame.__init__ (in pandas/core/frame.py) ignores
+               # the beautiful dtype information adorning the pd.Series list
+               # elements we are providing. Sad!
                warnings.simplefilter("ignore")
-               # If target_list produces a ragged left edge, resort columns so that earliest year is leftmost
+               # If target_list produces a ragged left edge,
+               # resort columns so that earliest year is leftmost
                df = pd.DataFrame(target_list).sort_index(axis=1)
                df.index.set_names(["company_id", "scope"], inplace=True)
                if year is not None:
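(For readers unfamiliar with the pandas behaviour the comment above refers to, here is a minimal sketch with made-up series, not code from this commit: rows whose indexes start in different years give a ragged left edge, and sort_index(axis=1) puts the earliest year leftmost.)

import pandas as pd

s_a = pd.Series({2025: 1.0, 2030: 0.5})             # hypothetical row starting in 2025
s_b = pd.Series({2020: 2.0, 2025: 1.5, 2030: 1.0})  # hypothetical row starting in 2020

df = pd.DataFrame([s_a, s_b]).sort_index(axis=1)
# columns are now [2020, 2025, 2030]; the missing 2020 value in the first row is NaN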
@@ -1046,7 +1049,8 @@ def _allocate_emissions(
            for sector in sectors
        ]

-       # Having done all scopes and sectors for this company above, replace historic Em and EI data below
+       # Having done all scopes and sectors for this company above,
+       # replace historic Em and EI data below
        for sector_aligned in aligned_em:
            sector, scopes = sector_aligned
            historic_sector = historic_dict["+".join([orig_id, sector])]
@@ -1157,7 +1161,8 @@ def project_ei_trajectories(self, companies: List[ICompanyData], backfill_needed
            if ITR.HAS_UNCERTAINTIES:
                historic_ei_t = historic_ei_t.map(lambda x: np.nan if ITR.isna(x) else x)
            backfilled_t = historic_ei_t.bfill(axis=0)
-           # FIXME: this hack causes backfilling only on dates on or after the first year of the benchmark, which keeps it from disrupting current test cases
+           # FIXME: this hack causes backfilling only on dates on or after the
+           # first year of the benchmark, which keeps it from disrupting current test cases
            # while also working on real-world use cases. But we need to formalize this decision.
            backfilled_t = backfilled_t.reset_index()
            backfilled_t = backfilled_t.where(
@@ -1167,7 +1172,8 @@ def project_ei_trajectories(self, companies: List[ICompanyData], backfill_needed
            backfilled_t.set_index("year", inplace=True)
            if not historic_ei_t.compare(backfilled_t).empty:
                logger.warning(
-                   f"some data backfilled to {self.projection_controls.BASE_YEAR} for company_ids in list {historic_ei_t.compare(backfilled_t).columns.get_level_values('company_id').unique().tolist()}"
+                   f"some data backfilled to {self.projection_controls.BASE_YEAR} for company_ids in list \
+{historic_ei_t.compare(backfilled_t).columns.get_level_values('company_id').unique().tolist()}"
                )
            historic_ei_t = backfilled_t.sort_index(axis=1)
        for company in companies:
@@ -1803,7 +1809,9 @@ def project_ei_targets(
            if target_i.target_start_year >= target_a.target_start_year:
                if target_i.target_start_year == target_a.target_start_year:
                    warnings.warn(
-                       f"intensity target overrides absolute target for target_start_year={target_i.target_start_year} and target_end_year={target_i.target_end_year}"
+                       f"intensity target overrides absolute target for \
+target_start_year={target_i.target_start_year} and \
+target_end_year={target_i.target_end_year}"
                    )
                    scope_targets_absolute.pop(0)
                scope_targets = scope_targets_intensity
4 changes: 3 additions & 1 deletion src/ITR/data/osc_units.py
@@ -656,7 +656,9 @@ def requantify_df_from_columns(df: pd.DataFrame, inplace=False) -> pd.DataFrame:
    :param df: pd.DataFrame
    :param inplace: bool, default False. If True, perform operation in-place.
-   :return: A pd.DataFrame with columns originally matching the pattern COLUMN_NAME [UNITS] renamed to COLUMN_NAME and replaced with a PintArray with dtype=ureg(UNITS) (aka 'pint[UNITS]')
+   :return: A pd.DataFrame with columns originally matching the pattern
+       COLUMN_NAME [UNITS] renamed to COLUMN_NAME and replaced with a PintArray
+       with dtype=ureg(UNITS) (aka 'pint[UNITS]')
    """
    p = re.compile(r"^(.*)\s*\[(.*)\]\s*$")
    if not inplace:
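(A rough usage sketch of the renaming the docstring above describes, using the same regex; the column names and data are hypothetical, and this approximates the behaviour rather than reproducing the function itself.)

import re

import pandas as pd
import pint_pandas  # noqa: F401  -- registers the 'pint[...]' extension dtype

p = re.compile(r"^(.*)\s*\[(.*)\]\s*$")
df = pd.DataFrame({"capacity [MW]": [100.0, 250.0], "region": ["Europe", "Global"]})

for col in list(df.columns):
    m = p.match(col)
    if m:
        name, units = m.group(1).strip(), m.group(2)
        df[name] = df[col].astype(f"pint[{units}]")  # e.g. a PintArray of megawatts
        df = df.drop(columns=[col])

# 'capacity [MW]' has become a pint-typed 'capacity' column; 'region' is untouched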
35 changes: 23 additions & 12 deletions src/ITR/data/template.py
@@ -379,9 +379,12 @@ def _solve_intensities(self, df_fundamentals: pd.DataFrame, df_esg: pd.DataFrame
            )
            .loc[:, esg_year_columns]
        )
-       # Now we have to reconcile the fact that production submetrics/boundaries may or may not align with emissions(intensity) submetrics/boundaries
-       # There are two cases we handle: (1) unique production rows match all intensity rows, and (2) production rows with submetrics match intensity rows with same submetrics
-       # There may be intensities for multiple scopes, which all multiply against the same production number but which produce per-scope emissions values
+       # Now we have to reconcile the fact that production submetrics/boundaries may
+       # or may not align with emissions(intensity) submetrics/boundaries
+       # There are two cases we handle: (1) unique production rows match all intensity rows,
+       # and (2) production rows with submetrics match intensity rows with same submetrics
+       # There may be intensities for multiple scopes, which all multiply against the same
+       # production number but which produce per-scope emissions values
        df1_case1 = (
            df1[df1.sub_count == 1]
            .droplevel("submetric")
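(A simplified illustration, with made-up numbers, of the last point in the comment above: each scope's intensity multiplies the same production figure, giving per-scope emissions.)

production_mwh = 1_000_000.0                        # one production number for the company
ei_by_scope = {"S1": 0.40, "S2": 0.05, "S3": 0.10}  # hypothetical intensities, t CO2e / MWh

emissions_by_scope = {scope: ei * production_mwh for scope, ei in ei_by_scope.items()}
# roughly {'S1': 400000, 'S2': 50000, 'S3': 100000} t CO2e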
@@ -545,11 +548,13 @@ def _init_from_template_company_data(self, excel_path: str):

        if self.template_version == 2:
            # Ensure our df_esg rows connect back to fundamental data
-           # the one single advantage of template_version==1 is that fundamental data and esg data are all part of the same rows so no need to do this integrity check/correction
+           # the one single advantage of template_version==1 is that
+           # fundamental data and esg data are all part of the same rows
+           # so no need to do this integrity check/correction
            esg_missing_fundamentals = ~df_esg.company_id.isin(df_fundamentals.index)
            if esg_missing_fundamentals.any():
                logger.error(
-                   f"The following companies have ESG data defined but no fundamental data and will be removed from further analysis:\n{df_esg[esg_missing_fundamentals].company_id.unique()}"
+                   f"The following companies have ESG data defined but no fundamental data and will be removed from further analysis:\n{df_esg[esg_missing_fundamentals].company_id.unique()}" # noqa: E501
                )
                df_esg = df_esg[~esg_missing_fundamentals]

@@ -919,7 +924,7 @@ def _fixup_name(x):
        em_invalid_idx = em_invalid[em_invalid].index
        if len(em_invalid_idx) > 0:
            logger.error(
-               f"The following rows of data do not have proper emissions data (can be converted to t CO2e) and will be dropped from the analysis\n{df_esg.loc[em_invalid_idx]}"
+               f"The following rows of data do not have proper emissions data (can be converted to t CO2e) and will be dropped from the analysis\n{df_esg.loc[em_invalid_idx]}" # noqa: E501
            )
            df_esg = df_esg.loc[df_esg.index.difference(em_invalid_idx)]
            em_metrics = em_metrics.loc[em_metrics.index.difference(em_invalid_idx)]
@@ -1166,9 +1171,13 @@ def _fixup_name(x):
            # Shift out of general (case_1) and leave in specific (case_2)
            case_1 = case_1.loc[~case_1.index.isin(case_3.index)]

-           # Case 4: case_1 scopes containing case_2 scopes that need to be removed before remaining scopes can be allocated
-           # Example: We have S1 allocated to electricity and gas, but S2 and S3 are general. To allocate S1S2S3 we need to subtract out S1, allocate remaining to S2 and S3 across Electricity and Gas sectors
-           # Eni's Plenitude and power is an example where S1S2S3 > S1+S2+S3 (due to lifecycle emissions concept). FIXME: don't know how to deal with that!
+           # Case 4: case_1 scopes containing case_2 scopes that need to be removed before
+           # remaining scopes can be allocated
+           # Example: We have S1 allocated to electricity and gas, but S2 and S3 are general.
+           # To allocate S1S2S3 we need to subtract out S1, allocate remaining to S2 and S3
+           # across Electricity and Gas sectors
+           # Eni's Plenitude and power is an example where S1S2S3 > S1+S2+S3 (due to lifecycle emissions concept).
+           # FIXME: don't know how to deal with that!
            case_4_df = case_1.reset_index("metric").merge(
                case_2.reset_index("metric"),
                on=["sector", "company_id"],
@@ -1179,7 +1188,7 @@ def _fixup_name(x):
            )
            if not case_4.empty:
                logger.error(
-                   f"Dropping attempt to disentangle embedded submetrics found in sector/scope assignment dataframe:\n{best_esg_em.submetric[case_4.index]}"
+                   f"Dropping attempt to disentangle embedded submetrics found in sector/scope assignment dataframe:\n{best_esg_em.submetric[case_4.index]}" # noqa: E501
                )
                case_1 = case_1.loc[~case_1.index.isin(case_4.index)]
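(A back-of-the-envelope sketch of the Case 4 idea in the comment above, with hypothetical figures: subtract the sector-specific S1 amounts from the general S1S2S3 total before allocating the remainder to S2 and S3 across sectors.)

s1s2s3_total = 1_000.0  # general, unallocated S1S2S3 figure, t CO2e
s1_by_sector = {"Electricity Utilities": 300.0, "Gas Utilities": 100.0}

remaining_for_s2_s3 = s1s2s3_total - sum(s1_by_sector.values())  # 600.0 t CO2e left to allocate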

@@ -1400,7 +1409,9 @@ def fill_blank_or_missing_scopes(df, scope_a, scope_b, scope_ab, index_names, hi
        # And keep df_fundamentals in sync
        self.df_fundamentals = df_fundamentals

-       # company_id, netzero_year, target_type, target_scope, target_start_year, target_base_year, target_base_year_qty, target_base_year_unit, target_year, target_reduction_ambition
+       # company_id, netzero_year, target_type, target_scope, target_start_year,
+       # target_base_year, target_base_year_qty, target_base_year_unit, target_year,
+       # target_reduction_ambition
        return self._company_df_to_model(None, df_target_data, df_historic_data)

    def _validate_target_data(self, target_data: pd.DataFrame) -> pd.DataFrame:
@@ -1464,7 +1475,7 @@ def unique_ids(mask):
        mask = target_data["netzero_year"] > ProjectionControls.TARGET_YEAR
        if mask.any():
            c_ids_invalid_netzero_year = unique_ids(mask)
-           warning_message = f"Invalid net-zero target years (>{ProjectionControls.TARGET_YEAR}) are entered for companies with ID: {c_ids_invalid_netzero_year}"
+           warning_message = f"Invalid net-zero target years (>{ProjectionControls.TARGET_YEAR}) are entered for companies with ID: {c_ids_invalid_netzero_year}" # noqa: E501
            logger.warning(warning_message)
            target_data = target_data[~mask]

34 changes: 21 additions & 13 deletions src/ITR/data/vault_providers.py
@@ -69,7 +69,8 @@ def dequantify_df(df: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([dequantify_column(df[col]) for col in df.columns], axis=1)


-# Because this DF comes from reading a Trino table, and because columns must be unqiue, we don't have to enumerate to ensure we properly handle columns with duplicated names
+# Because this DF comes from reading a Trino table, and because columns must be unique,
+# we don't have to enumerate to ensure we properly handle columns with duplicated names
def requantify_df(df: pd.DataFrame, typemap={}) -> pd.DataFrame:
    units_col = None
    columns_not_found = [k for k in typemap.keys() if k not in df.columns]
@@ -125,10 +126,11 @@ def __init__(
        hive_schema: Optional[str] = None,
    ):
        """
-       As an alternative to using FastAPI interfaces, this creates an interface allowing access to Production benchmark data via the Data Vault.
-       :param engine: the Sqlalchemy connect to the Data Vault
+       As an alternative to using FastAPI interfaces, this creates an interface allowing access to Production
+       benchmark data via the Data Vault. :param engine: the Sqlalchemy connect to the Data Vault
        :param schema: The database schema where the Data Vault lives
-       :param hive_bucket, hive_catalog, hive_schema: Optional parameters to enable fast ingestion via Hive; otherwise uses Trino batch insertion (which is slow)
+       :param hive_bucket, hive_catalog, hive_schema: Optional parameters to enable fast ingestion via Hive;
+       otherwise uses Trino batch insertion (which is slow)
        """
        super().__init__()
        self.engine = engine
@@ -154,7 +156,9 @@ def create_vault_table_from_df(
    :param schemaname: The schema where the table should be written
    :param tablename: The name of the table in the Data Vault
    :param engine: The SqlAlchemy connection to the Data Vault
-   :param hive_bucket: :param hive_catalog: :param hive_schema: Optional paramters. If given we attempt to use a fast Hive ingestion process. Otherwise use default (and slow) Trino ingestion.
+   :param hive_bucket: :param hive_catalog: :param hive_schema: Optional paramters.
+   # If given we attempt to use a fast Hive ingestion process.
+   # Otherwise use default (and slow) Trino ingestion.
    :param verbose: If True, log information about actions of the Data Vault as they happen
    """
    drop_table = f"drop table if exists {vault.schema}.{tablename}"
@@ -197,7 +201,8 @@ def create_vault_table_from_df(
# When reading SQL tables to import into DataFrames, it is up to the user to preserve {COL}, {COL}_units pairings so they can be reconstructed.
# If the user does a naive "select * from ..." this happens naturally.
# We can give a warning when we see a resulting dataframe that could have, but does not have, unit information properly integrated. But
-# fixing the query on the fly becomes difficult when we consider the fully complexity of parsing and rewriting SQL queries to put the units columns in the correct locations.
+# fixing the query on the fly becomes difficult when we consider the fully complexity of parsing and
+# rewriting SQL queries to put the units columns in the correct locations.
# (i.e., properly in the principal SELECT clause (which can have arbitrarily complex terms), not confused by FROM, WHERE, GROUP BY, ORDER BY, etc.)


@@ -226,7 +231,7 @@ def read_quantified_sql(
        ]
        for col_tuple in extra_unit_columns_positions:
            logger.error(
-               f"Missing units column '{col_tuple[2]}' after original column '{sql_df.columns[col_tuple[1]]}' (should be column #{col_tuple[0]+col_tuple[1]+1} in new query)"
+               f"Missing units column '{col_tuple[2]}' after original column '{sql_df.columns[col_tuple[1]]}' (should be column #{col_tuple[0]+col_tuple[1]+1} in new query)" # noqa: E501
            )
            raise ValueError
        return requantify_df(sql_df).convert_dtypes()
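(For context, a hypothetical example of the {COL}, {COL}_units pairing that the comment above relies on; a naive select * keeps each value column next to its units column, so the pair can be folded back into a single quantity-valued column as described.)

import pandas as pd

sql_df = pd.DataFrame({
    "company_id": ["C1", "C2"],
    "co2_s1_by_year": [10.0, 20.0],
    "co2_s1_by_year_units": ["t CO2e", "t CO2e"],
})
# requantify_df (defined earlier in this file) would reassemble 'co2_s1_by_year'
# and 'co2_s1_by_year_units' into one quantity column.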
@@ -241,7 +246,8 @@ def read_quantified_sql(

# Basic Benchmark Data Assumptions
# EI for a given scope
-# Production defined in terms of growth (or negative growth) on a rolling basis (so 0.05, -0.04) would mean 5% growth followed by 4% negative growth for a total of 0.8%
+# Production defined in terms of growth (or negative growth) on a rolling basis
+# (so 0.05, -0.04) would mean 5% growth followed by 4% negative growth for a total of 0.8%
# Benchmarks are named (e.g., 'OECM')
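(The compounding in the comment above, written out as a quick check; not part of the commit.)

total_growth = (1 + 0.05) * (1 - 0.04) - 1
print(f"{total_growth:.1%}")  # 0.8%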


@@ -746,7 +752,7 @@ def compute_portfolio_weights(
            elif scope == EScope.S1S2:
                factor_sql = "select company_id, sum(co2_s1_by_year+if(is_nan(co2_s2_by_year),0.0,co2_s2_by_year))"
            elif scope == EScope.S1S2:
-               factor_sql = "select company_id, sum(co2_s1_by_year+if(is_nan(co2_s2_by_year),0.0,co2_s2_by_year)+if(is_nan(co2_s3_by_year),0.0,co2_s3_by_year))"
+               factor_sql = "select company_id, sum(co2_s1_by_year+if(is_nan(co2_s2_by_year),0.0,co2_s2_by_year)+if(is_nan(co2_s3_by_year),0.0,co2_s3_by_year))" # noqa: E501
            else:
                raise ValueError(f"scope {scope} not supported")
        else:
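(The if(is_nan(...), 0.0, ...) guards in the SQL above treat a missing scope value as zero when summing; the same idea in plain Python, with hypothetical numbers.)

import math

def nan_to_zero(x: float) -> float:
    return 0.0 if math.isnan(x) else x

co2_s1, co2_s2, co2_s3 = 100.0, float("nan"), 50.0
total_s1s2s3 = co2_s1 + nan_to_zero(co2_s2) + nan_to_zero(co2_s3)  # 150.0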
@@ -792,13 +798,15 @@ def __init__(
        -> { cumulative_budgets, cumulative_emissions }
        :param engine: The Sqlalchemy connector to the Data Vault
-       :param company_data: as a VaultCompanyDataProvider, this provides both a reference to a fundamental company data table and data structures containing historic ESG data. Trajectory and Target projections also get filled in here.
+       :param company_data: as a VaultCompanyDataProvider, this provides both a reference to a fundamental
+       company data table and data structures containing historic ESG data. Trajectory and Target projections also get filled in here.
        :param benchmark_projected_production: A reference to the benchmark production table as well as data structures used by the Data Vault for projections
        :param benchmark_projected_ei: A reference to the benchmark emissions intensity table as well as data structures used by the Data Vault for projections
        :param estimate_missing_data: If provided, a function that can fill in missing S3 data (possibly by aligning to benchmark statistics)
        :param ingest_schema: The database schema where the Data Vault lives
        :param itr_prefix: A prefix for all tables so that different users can use the same schema without conflicts
-       :param hive_bucket: :param hive_catalog: :param hive_schema: Optional paramters. If given we attempt to use a fast Hive ingestion process. Otherwise use default (and slow) Trino ingestion.
+       :param hive_bucket: :param hive_catalog: :param hive_schema: Optional paramters. If given we attempt to use a
+       fast Hive ingestion process. Otherwise use default (and slow) Trino ingestion.
        """
        # This initialization step adds trajectory and target projections to `company_data`
        super().__init__(
@@ -981,7 +989,7 @@ def __init__(
            from {self._v.schema}.{self._company_table} C
            join P_BY on P_BY.company_id=C.company_id
            join {self._v.schema}.{self._production_table} P on P.company_id=C.company_id
-           join {self._v.schema}.{self._benchmarks_ei_name} B on P.year=B.year and C.sector=B.sector and B.region=if(C.region in ('North America', 'Europe'), C.region, 'Global')
+           join {self._v.schema}.{self._benchmarks_ei_name} B on P.year=B.year and C.sector=B.sector and B.region=if(C.region in ('North America', 'Europe'), C.region, 'Global') # noqa: E501
            join {self._v.schema}.{self._emissions_table} CE on CE.company_id=C.company_id and B.scope=CE.scope and CE.year=P.year
            join {self._v.schema}.{self._emissions_table} CE_BY on CE_BY.company_id=C.company_id and CE_BY.scope=B.scope and CE_BY.year=P_BY.base_year
            join {self._v.schema}.{self._benchmarks_ei_name} B_BY on B.scope=B_BY.scope and B.region=B_BY.region and B.sector=B_BY.sector and B_BY.year=P_BY.base_year
@@ -1069,7 +1077,7 @@ def get_pa_temp_scores(
        if probability < 0 or probability > 1:
            raise ValueError(f"probability value {probability} outside range [0.0, 1.0]")
        temp_scores = read_quantified_sql(
-           "select company_id, scope, target_temperature_score, target_temperature_score_units, trajectory_temperature_score, trajectory_temperature_score_units, year"
+           "select company_id, scope, target_temperature_score, target_temperature_score_units, trajectory_temperature_score, trajectory_temperature_score_units, year" # noqa: E501
            f" from {self._tempscore_table} where scope='{scope.name}' and year={year}",
            None,
            self._v.engine,