Source code for swmm.pandas.output.structure

from __future__ import annotations
from collections.abc import Sequence

import numpy as np
from pandas.core.api import DataFrame
from pandas._libs.missing import NA

from swmm.pandas.output.tools import arrayish

volumeConstants = {
    "CFS": dict(multiplier=1 * (7.481 / 1e6), volumeUnits="MG"),
    "GPM": dict(multiplier=(1 / 60) * (1 / 1e6), volumeUnits="MG"),
    "MGD": dict(multiplier=(1 / 86400) * 1, volumeUnits="MG"),
    "CMS": dict(multiplier=1 * 1, volumeUnits="CM"),
    "LPS": dict(multiplier=1 * (1 / 1000), volumeUnits="CM"),
    "MLD": dict(multiplier=(1 / 86400) * 1000, volumeUnits="CM"),
}

hour_unit = np.timedelta64(1, "h")

# This class as it is could be DRYer, but it should also integrate with the
# inp file module and rpt file module when they are implemented. Plan to refactor
# and make more DRY when those are added


[docs] class Structure: """ A class that represents a particular system structure that may be represented by multiple model elements. The outputs from each element are combined into a single time series for analysis as if they are a single structure. The structure class can be used to summarize flow and flooding at one or more model elements from a particular simulation. Parse link flow time series into discrete events with the `flowEvents` method, or summarize flooding events with the `floodEvents` method. Parameters ---------- outfile: swmm.pandas.Output The swmm-pandas outfile object containing the model elements. link: Union[str, Sequence[str]] The list of links that belong to the structure. node: Union[str, Sequence[str]] The list of nodes that below to the structure. """
[docs] def __init__( self, outfile, link: str | Sequence[str], node: str | Sequence[str], ): self.out = outfile """The Output object from which this structure is derived""" self.link = link """A list of the link(s) that belong to this structure""" self.node = node """A list of the node(s) that belong to this structure"""
@property def floodFrame(self) -> DataFrame: """ Returns a pandas DataFrame with the flood rates of each node in the structure Returns ------- pd.DataFrame Time series of flooding for each node """ if hasattr(self, "_floodFrame"): pass else: self._floodFrame = self.out.node_series( self.node, "flooding_losses", columns="elem" ) return self._floodFrame @property def flowFrame(self) -> DataFrame: """ Returns a pandas DataFrame with the flow rates of each link in the structure Returns ------- pd.DataFrame Time series of flow for each link """ if hasattr(self, "_flowFrame"): pass else: self._flowFrame = self.out.link_series( self.link, "flow_rate", columns="elem" ) return self._flowFrame def _aggSeries( self, df: DataFrame, useNegative: bool | Sequence[bool] = False, reverse: bool | int | Sequence[bool | int] = False, aggFunc: str = "sum", ): """ Aggregate a multi element time series into a single element time series. This function is used to calculate combined flow rates and flooding rates for the structure. Parameters ---------- df: pd.DataFrame A DataFrame with a time series each column for each element in the structure. (e.g. output of self.flowFrame) useNegative: Union[bool, Sequence[bool]], optional If true, negative values will not be removed from the time series. Can either provide a boolean to apply to all columns or a list of booleans to apply to each column, by default False reverse: Union[bool, Sequence[bool]], optional If true, the timeseries will be multiplied by -1. Can either priovide a boolean to apply to all columns or a list of booleans to apply to each column, by default False aggFunc: str, optional The aggregation function to apply to all columns in the DataFrame. Should be compatible with `pd.DataFrame.agg`_., by default "sum" .. _pd.DataFrame.agg: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.agg.html Returns ------- pd.Series Single time series aggregated from input DataFrame. Raises ------ ValueError If useNegative or reverse arguments are not a compatible type. """ # reverse values if requested if isinstance(reverse, arrayish): reverse = [-1 if col else 1 for col in reverse] elif isinstance(reverse, bool): reverse = -1 if reverse else 1 else: raise ValueError( f"invert must be either bool or sequence of bool, given {type(reverse)}" ) df = df * reverse # screen out negative values if requested if isinstance(useNegative, arrayish): for i, col in enumerate(df.columns): if not useNegative[i]: df.loc[df[col] < 0, col] = 0 elif isinstance(useNegative, bool): if not useNegative: for i, col in enumerate(df.columns): df.loc[df[col] < 0, col] = 0 else: raise ValueError( f"useNegative must be either bool or sequence of bool, given {type(useNegative)}" ) # return aggregated df according to given aggFunc return df.agg(func=aggFunc, axis=1)
[docs] def flowEvents( self, inter_event_period: float = 6, thresholdFlow: float = 0.01, useNegativeFlow: bool | Sequence[bool] = False, reverseFlow: bool | Sequence[bool] = False, ): """ Bin flow data into discrete events based on an inter-event period and threshold flow rate. Maximum flowrates, total flow volumes, and duration of each event are returned in a DataFrame. Parameters ---------- inter_event_period: float, optional The period in hours of flow less than or equal to thresholdFlow that demarks flow events, default to 6 thresholdFlow: float, optional The flowrate in model flow units that dry or baseline condition that is not considered significant, default to 0.01 useNegativeFlow: bool, optional If true, the method will consider negative flows when calculating flow volumes, defaults False reverseFlow: bool, optional If true, the method will calculate the flow in the reverse direction by multiplying the timeseries by negative one, defaults to False Returns ------- pd.DataFrame DataFrame with statistics on each flow event """ # pull aggregated series series = self._aggSeries(self.flowFrame, useNegativeFlow, reverseFlow) # put series in DataFrame, and add event_num column q = DataFrame( series[series > thresholdFlow], columns=["flow_rate"] ).reset_index() q["event_num"] = NA # initialize first event q.loc[0, "event_num"] = 1 # calculate period between flows greater than threshold hours = q.datetime.diff(1) / hour_unit # slice out times demarking a new event # assign event numbers to those starting points slicer = hours > inter_event_period q.loc[slicer, "event_num"] = range(2, sum(slicer) + 2) q.event_num.fillna(method="ffill", inplace=True) # group by event_num gpd = q.groupby("event_num") # find indices of max flow timesteps in each event maxSer = gpd.flow_rate.idxmax() # find event start date start_date = gpd.datetime.min().rename("start_datetime") # calculate volume for each event vol = ( gpd.flow_rate.sum() * self.out.report * volumeConstants[self.out.units[1]]["multiplier"] ) # add unit name to column vol.name = f"totalVolume_{volumeConstants[self.out.units[1]]['volumeUnits']}" # calculate the duration of each event in hours durations = (gpd.datetime.count() * self.out.report) / 60 / 60 durations.name = "hours_duration" # join in event volumes and durations with event maxima return ( q.loc[maxSer] .join(start_date, on="event_num") .join(vol, on="event_num") .join(durations, on="event_num") .rename({"flow_rate": "maxFlow", "datetime": "time_of_maxFlow"}, axis=1) .set_index("event_num") )
[docs] def floodEvents( self, inter_event_period: float = 6, thresholdFood: float = 0.01, ): """ Bins flooding data into discrete events based on an inter-event period and threshold flooding rate. Maximum flooding rates and duration of each flooding event are returned in a DataFrame. TODO: add in ponded depth when inp file is integrated. Ponded volume from out file is tough to interpret alone. Parameters ---------- inter_event_period: float, optionalep The period in hours of flooding less than or equal to thresholdFlood that demarks flow events, default to 6 thresholdFood: float, optional The flooding rate in model flow units above which should be considered in calculations, default to 0.01 Returns ------- pd.DataFrame DataFrame with statistics on each flooding event """ series = self._aggSeries(self.floodFrame) # put series in DataFrame, and add event_num column q = DataFrame( series[series > thresholdFood], columns=["flooding_losses"], ).reset_index() q["event_num"] = NA # initialize first event q.loc[0, "event_num"] = 1 # calculate period between flows greater than threshold hours = q.datetime.diff(1) / hour_unit # slice out times demarking a new event # assign event numbers to those starting points slicer = hours > inter_event_period q.loc[slicer, "event_num"] = range(2, sum(slicer) + 2) q.event_num.fillna(method="ffill", inplace=True) # group by event_num gpd = q.groupby("event_num") # find indices of max flow timesteps in each event maxSer = gpd.flooding_losses.idxmax() # calculate the duration of each event in hours durations = (gpd.datetime.count() * self.out.report) / 60 / 60 durations.name = "hours_duration" # return event maxima joined with durations return ( q.loc[maxSer] .join(durations, on="event_num") .rename( {"flooding_losses": "maxFloodRate", "datetime": "time_of_maxFlow"}, axis=1, ) .set_index("event_num") )