from __future__ import annotations

import re
from io import StringIO

from pandas.core.api import DataFrame, Series, Timestamp, to_datetime, to_timedelta
from pandas.io.parsers import read_csv, read_fwf
class Report:
    _rptfile: str
    """path to swmm rpt file"""

    _rpt_text: str
    """text string of rpt file contents"""

    _sections: dict[str, str]
    """dictionary of SWMM report sections as {section name: section text}"""

    def __init__(self, rptfile: str):
        """Base class for a SWMM simulation report file.

        The report object provides an api for the tables in the SWMM
        simulation report file. Tables are accessed as properties of the
        object and returned as pandas DataFrames.

        Parameters
        ----------
        rptfile: str
            model report file path
        """
        self._rptfile = rptfile

        with open(rptfile) as file:
            self._rpt_text = file.read()

        # index every section by its title so the table properties can look
        # their text up by name
        self._sections = {
            self._find_title(section): section
            for section in self._find_sections(self._rpt_text)
        }

    @staticmethod
    def _find_sections(rpt_text: str) -> list[str]:
        r"""
        Function to split the report file text into separate sections using a regex
        pattern match:

        "^\s+$\s+(?=\*|A)": pattern matches blank lines followed by at least
        1 white space followed by a lookahead for an asterisk (demarks section headers)
        or the letter A (looks for the word Analysis at the end of the report file)

        Parameters
        ----------
        rpt_text: str
            Text content of the report file

        Returns
        -------
        list[str]
            A list of section texts
        """
        # pattern to match blank lines preceding a line of asterisks
        section_pattern = R"^\s+$\s+(?=\*|A)"
        section_comp = re.compile(section_pattern, re.MULTILINE)

        # The first two pieces and the last piece of the split are discarded;
        # only the pieces in between are returned as sections.
        pieces = section_comp.split(rpt_text)[2:-1]

        # Strip the two-space indent carried by every continuation line so that
        # titles and headers start at column zero (the title regex anchors on a
        # line that starts with asterisks and the property lookups use
        # un-indented titles).
        return [piece.replace("\n  ", "\n") for piece in pieces]

    @staticmethod
    def _find_title(section: str) -> str:
        r"""
        Function to extract the title of a section produced by _find_sections using
        regex to match lines between two lines of asterisks.

        "^\*+[\s\S]*?\n([\s\S]*?)\s*\*+": Pattern matches any number of white space or
        non-white space characters that are between:

        1. A line starting with a string of asterisks followed by any white space or
           non-whitespace character and ending with a new line break
        2. A line starting with a string of asterisks

        Parameters
        ----------
        section: str
            The section text produced by _find_sections

        Returns
        -------
        str
            Title of section

        Raises
        ------
        Exception
            If regex could not find a match
        """
        # pattern to match line between two lines of asterisks
        title_pattern = R"^\*+[\s\S]*?\n([\s\S]*?)\s*\*+"
        title_comp = re.compile(title_pattern, re.MULTILINE)
        s = title_comp.match(section)
        if s:
            # if string is found, split the line on two consecutive spaces and
            # pull the first token
            return s.group(1).split("  ")[0]
        else:
            raise Exception(f"Error finding title for section\n{section}")

    @staticmethod
    def _split_section(section: str) -> tuple[str, str]:
        """
        Function to split a report section into header and data elements. Relies on regex
        matching lines with consecutive dashes indicating header lines.

        Parameters
        ----------
        section: str
            The section text produced by _find_sections

        Returns
        -------
        tuple[str, str]
            header text and data text

        Raises
        ------
        Exception
            If the section title cannot be found or the section splits into
            more than four dash-delimited parts
        """
        title = Report._find_title(section)
        subsections = re.split(R"\s*-+\n", section)
        num_subsections = len(subsections)

        if num_subsections == 1:
            # no dashed rule at all: there is no header, everything after the
            # asterisk banner is data
            header = "Result"
            # split section on line of asterisks
            data = re.split(R"\*+", section)[-1]
        elif num_subsections == 2:
            header, data = subsections
        elif num_subsections == 3:
            # leading part (title / notes) is not needed
            _, header, data = subsections
        elif num_subsections == 4:
            # leading notes and the trailing part are not needed
            _, header, data, _ = subsections
        else:
            raise Exception(f"Error parsing table {title}")

        return header, data

    @staticmethod
    def _parse_header(header: str) -> list[str]:
        """
        Parse header line produced from _split_section into list of column headers. Uses pandas
        read_fwf to automatically parse multi line headers present in report file.

        Parameters
        ----------
        header: str
            Header text string produced from _split_section

        Returns
        -------
        list[str]
            List of column headers
        """
        # replace asterisks or dashes with spaces so they do not take part in
        # the fixed-width column detection
        blanked = re.sub(R"\*|-", " ", header)
        fields = read_fwf(StringIO(blanked), header=None)

        # each fixed-width column holds the stacked lines of one header cell:
        # join them with underscores, then substitute single spaces between
        # words with underscores
        names = [
            re.sub(R"(?<=\w)[^\S\r\n](?=\w)", "_", field[1].dropna().str.cat(sep="_"))
            for field in fields.items()
        ]

        # split day and time into separate fields to be recombined into a
        # timedelta when parsing the table
        if "Time_of_Max_Occurrence_days_hr:min" in names:
            max_idx = names.index("Time_of_Max_Occurrence_days_hr:min")
            names[max_idx] = "days"
            names.insert(max_idx + 1, "Time_of_Max")

        return names

    @staticmethod
    def _parse_table(
        header: list[str], data: str, sep: str = R"\s{2,}|\s:\s", index_col: int = 0
    ) -> DataFrame:
        r"""
        Function to parse data string produced from _split_section into pandas DataFrame

        Parameters
        ----------
        header: list[str]
            Sequence of column names to assign to DataFrame. Mostly can be produced from _parse_header.
        data: str
            Data string produced from _split_section
        sep: str, optional
            Delimiter to be fed into pandas read_csv function that operates on data string
            , by default R"\s{2,}|\s:\s"
        index_col: int, optional
            Column in data to be used as DataFrame index, by default 0

        Returns
        -------
        DataFrame
            Report data table
        """
        # remove leading spaces on each line and replace long runs of periods
        # with spaces
        # NOTE(review): the replacement width (two spaces) is reconstructed; it is
        # chosen so a run of periods always yields a break the default separator
        # can split on -- confirm against upstream.
        data = re.sub(R"^\s+", "", re.sub(R"\.{2,}", "  ", data), flags=re.MULTILINE)

        # by default read in data with minimum 2-spaces or a colon flanked by
        # spaces as delimiter; a regex separator requires the python engine
        df = read_csv(
            filepath_or_buffer=StringIO(data),
            header=None,
            engine="python",
            sep=sep,
            index_col=index_col,
            names=header,
        )

        # convert day and time columns into a single timedelta column
        if "Time_of_Max" in df.columns:
            # "days" is consumed (popped); the hr:min text gets ":00" appended
            # so it parses as hh:mm:ss
            df["Time_of_Max"] = to_timedelta(
                df.pop("days").astype(int), unit="D"
            ) + to_timedelta(
                df["Time_of_Max"] + ":00"
            )  # type: ignore
        return df

    def _continuity_table(self, section_name: str) -> DataFrame:
        """Parse one continuity section whose header words are separated by single spaces."""
        header, data = self._split_section(self._sections[section_name])
        # substitute spaces between words with underscore so read_fwf works
        # had to use some regex to not also match new lines
        columns = self._parse_header(re.sub(R"(?<=\w)[^\S\r\n](?=\w)", "_", header))
        return self._parse_table(columns, data)

    @property
    def analysis_options(self) -> Series:
        """
        Pandas series containing the analysis options listed in the
        report file including units, models, methods, dates, time steps, etc.

        Returns
        -------
        Series
            Series of options.
        """
        if not hasattr(self, "_analysis_options"):
            _, data = self._split_section(self._sections["Analysis Options"])
            df = self._parse_table(["Option", "Setting"], data)["Setting"]
            self._analysis_options = df.dropna()
        return self._analysis_options

    @property
    def runoff_quantity_continuity(self) -> DataFrame:
        """
        Runoff quantity continuity error table in volume and depth units.
        System wide error is shown in percent.

        Returns
        -------
        DataFrame
            DataFrame of runoff quantity continuity error table.
        """
        if not hasattr(self, "_runoff_quantity_continuity"):
            self._runoff_quantity_continuity = self._continuity_table(
                "Runoff Quantity Continuity"
            )
        return self._runoff_quantity_continuity

    @property
    def runoff_quality_continuity(self) -> DataFrame:
        """
        Runoff quality continuity error table in mass units for each pollutant.
        System wide error is shown in percent.

        Returns
        -------
        DataFrame
            DataFrame of runoff quality continuity error table
        """
        if not hasattr(self, "_runoff_quality_continuity"):
            self._runoff_quality_continuity = self._continuity_table(
                "Runoff Quality Continuity"
            )
        return self._runoff_quality_continuity

    @property
    def groundwater_continuity(self) -> DataFrame:
        """
        Groundwater quantity continuity error table in volume and depth units.
        System wide error is shown in percent.

        Returns
        -------
        DataFrame
            DataFrame of groundwater quantity continuity error table
        """
        if not hasattr(self, "_groundwater_continuity"):
            self._groundwater_continuity = self._continuity_table(
                "Groundwater Continuity"
            )
        return self._groundwater_continuity

    @property
    def flow_routing_continuity(self) -> DataFrame:
        """
        Flow routing continuity error table in volume units.
        System wide error is shown in percent.

        Returns
        -------
        DataFrame
            DataFrame of flow routing continuity error table
        """
        if not hasattr(self, "_flow_routing_continuity"):
            self._flow_routing_continuity = self._continuity_table(
                "Flow Routing Continuity"
            )
        return self._flow_routing_continuity

    @property
    def quality_routing_continuity(self) -> DataFrame:
        """
        Quality routing continuity error table in mass units.
        System wide error is shown in percent.

        Returns
        -------
        DataFrame
            DataFrame of quality routing continuity error table
        """
        if not hasattr(self, "_quality_routing_continuity"):
            self._quality_routing_continuity = self._continuity_table(
                "Quality Routing Continuity"
            )
        return self._quality_routing_continuity

    @property
    def highest_continuity_errors(self) -> DataFrame:
        """
        Highest continuity error table in percent.
        This table shows the model elements with the highest
        flow routing continuity error.

        Returns
        -------
        DataFrame
            DataFrame of highest continuity errors table
        """
        if not hasattr(self, "_highest_errors"):
            _, data = self._split_section(
                self._sections["Highest Continuity Errors"]
            )
            df = self._parse_table(
                ["object_type", "name", "percent_error"], data, sep=R"\s+", index_col=1
            )
            # values are wrapped in parentheses with a percent sign
            df["percent_error"] = df["percent_error"].str.strip("()%").astype(float)
            self._highest_errors = df
        return self._highest_errors

    @property
    def time_step_critical_elements(self) -> DataFrame:
        """
        Time-step critical elements table in percent.
        This table shows the model elements that were controlling
        the model time step if a variable one was used.

        Returns
        -------
        DataFrame
            DataFrame of time-step critical elements table
        """
        if not hasattr(self, "_ts_critical"):
            _, data = self._split_section(
                self._sections["Time-Step Critical Elements"]
            )
            df = self._parse_table(
                ["object_type", "name", "percent"], data, sep=R"\s+", index_col=1
            )
            df["percent"] = df["percent"].str.strip("()%").astype(float)
            self._ts_critical = df
        return self._ts_critical

    @property
    def highest_flow_instability_indexes(self) -> DataFrame:
        """
        Highest flow instability indexes.
        This table shows the model elements that have the highest
        flow instability.

        Returns
        -------
        DataFrame
            DataFrame of highest flow instability indexes table
        """
        if not hasattr(self, "_highest_flow_instability_indexes"):
            _, data = self._split_section(
                self._sections["Highest Flow Instability Indexes"]
            )
            # the section carries a message instead of rows when nothing is
            # unstable; parse it as an empty table
            if "All links are stable" in data:
                data = ""
            df = self._parse_table(
                ["object_type", "name", "index"], data, sep=R"\s+", index_col=1
            )
            df["index"] = df["index"].str.strip("()").astype(int)
            self._highest_flow_instability_indexes = df
        return self._highest_flow_instability_indexes

    @property
    def routing_time_step_summary(self) -> DataFrame:
        """
        Routing time step summary table that shows the average, minimum,
        and maximum time steps as well as convergence summary.

        Returns
        -------
        DataFrame
            DataFrame of routing time step summary table
        """
        if not hasattr(self, "_routing_time_step_summary"):
            header, data = self._split_section(
                self._sections["Routing Time Step Summary"]
            )
            self._routing_time_step_summary = self._parse_table(
                self._parse_header(header), data, sep=R"\s+:\s+"
            )
        return self._routing_time_step_summary

    @property
    def runoff_summary(self) -> DataFrame:
        """
        Runoff summary table for each subcatchment that details rainfall,
        runon, evap, infil, and runoff.

        Returns
        -------
        DataFrame
            DataFrame of subcatchment runoff summary table
        """
        if not hasattr(self, "_runoff_summary"):
            header, data = self._split_section(
                self._sections["Subcatchment Runoff Summary"]
            )
            self._runoff_summary = self._parse_table(self._parse_header(header), data)
        return self._runoff_summary

    @property
    def groundwater_summary(self) -> DataFrame:
        """
        Groundwater summary table for each subcatchment that details groundwater
        inflow, outflow, moisture, and water table.

        Returns
        -------
        DataFrame
            DataFrame of subcatchment groundwater summary table
        """
        if not hasattr(self, "_groundwater_summary"):
            header, data = self._split_section(self._sections["Groundwater Summary"])
            self._groundwater_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._groundwater_summary

    @property
    def washoff_summary(self) -> DataFrame:
        """
        Washoff summary table that details the total pollutant load
        that was washed off of each subcatchment.

        Returns
        -------
        DataFrame
            DataFrame of subcatchment washoff summary table
        """
        if not hasattr(self, "_washoff_summary"):
            header, data = self._split_section(
                self._sections["Subcatchment Washoff Summary"]
            )
            self._washoff_summary = self._parse_table(self._parse_header(header), data)
        return self._washoff_summary

    @property
    def node_depth_summary(self) -> DataFrame:
        """
        Node depth summary table that details the average and maximum
        depth and HGL simulated for each node.

        Returns
        -------
        DataFrame
            DataFrame of node depth summary table
        """
        if not hasattr(self, "_node_depth_summary"):
            header, data = self._split_section(self._sections["Node Depth Summary"])
            # single-space separator so the "days hr:min" pair splits into the
            # two columns produced by _parse_header
            self._node_depth_summary = self._parse_table(
                self._parse_header(header), data, sep=R"\s{1,}|\s:\s"
            )
        return self._node_depth_summary

    @property
    def node_inflow_summary(self) -> DataFrame:
        """
        Node inflow summary table that details the maximum inflow rates, total
        inflow volumes, and flow balance error percent for each node.

        Returns
        -------
        DataFrame
            DataFrame of node inflow summary table
        """
        if not hasattr(self, "_node_inflow_summary"):
            header, data = self._split_section(self._sections["Node Inflow Summary"])
            self._node_inflow_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._node_inflow_summary

    @property
    def node_surchage_summary(self) -> DataFrame:
        """
        Node surcharge summary that details the maximum surcharge level and duration
        of surcharge for each node.

        The property name keeps its historical spelling for backward
        compatibility; ``node_surcharge_summary`` is an equivalent alias.

        Returns
        -------
        DataFrame
            DataFrame of node surcharge summary table
        """
        if not hasattr(self, "_node_surcharge_summary"):
            header, data = self._split_section(self._sections["Node Surcharge Summary"])
            self._node_surcharge_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._node_surcharge_summary

    @property
    def node_surcharge_summary(self) -> DataFrame:
        """Correctly spelled alias of ``node_surchage_summary``."""
        return self.node_surchage_summary

    @property
    def node_flooding_summary(self) -> DataFrame:
        """
        Node flood summary that details the maximum ponded depth, peak flooding rate, total flood volume,
        total flood duration for each node.

        Returns
        -------
        DataFrame
            DataFrame of node flooding summary table
        """
        if not hasattr(self, "_node_flooding_summary"):
            header, data = self._split_section(self._sections["Node Flooding Summary"])
            self._node_flooding_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._node_flooding_summary

    @property
    def storage_volume_summary(self) -> DataFrame:
        """
        Storage volume summary that details the frequency of filling, average and peak volumes,
        losses, and outfall rate for each storage unit.

        Returns
        -------
        DataFrame
            DataFrame of storage volume summary table
        """
        if not hasattr(self, "_storage_volume_summary"):
            header, data = self._split_section(self._sections["Storage Volume Summary"])
            # drop the second word of the two-word label; pad to the same width
            # so every other header column keeps its position for read_fwf
            label = "Storage Unit"
            header = header.replace(label, "Storage".ljust(len(label)))
            self._storage_volume_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._storage_volume_summary

    @property
    def outfall_loading_summary(self) -> DataFrame:
        """
        Outfall loading summary that details the flow frequency, average and peak flow rates,
        total outflow volume, and pollutant mass loads for each outfall.

        Returns
        -------
        DataFrame
            DataFrame of outfall loading summary table
        """
        if not hasattr(self, "_outfall_loading_summary"):
            header, data = self._split_section(
                self._sections["Outfall Loading Summary"]
            )
            # same-width replacement, see storage_volume_summary
            label = "Outfall Node"
            header = header.replace(label, "Outfall".ljust(len(label)))
            self._outfall_loading_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._outfall_loading_summary

    @property
    def link_flow_summary(self) -> DataFrame:
        """
        Link flow summary that details the peak flow, velocity, depth, and capacity for each link.

        Returns
        -------
        DataFrame
            DataFrame of link flow summary table
        """
        if not hasattr(self, "_link_flow_summary"):
            header, data = self._split_section(self._sections["Link Flow Summary"])
            # pipes in the header would otherwise be read as column text
            header = header.replace("|", " ")
            self._link_flow_summary = self._parse_table(
                self._parse_header(header), data, sep=R"\s{1,}|\s:\s"
            )
        return self._link_flow_summary

    @property
    def flow_classification_summary(self) -> DataFrame:
        """
        Flow classification summary that details the amount of conduit lengthening during
        the simulation and the fraction of simulation time that is dry, subcritical, supercritical,
        or critical flow for each conduit.

        Returns
        -------
        DataFrame
            DataFrame of flow classification summary table
        """
        if not hasattr(self, "_flow_classification_summary"):
            header, data = self._split_section(
                self._sections["Flow Classification Summary"]
            )
            # blank out the banner that spans several columns, keeping the
            # width so the columns beneath it stay aligned
            to_remove = "---------- Fraction of Time in Flow Class ----------"
            to_replace = " " * len(to_remove)
            header = header.replace(to_remove, to_replace)
            self._flow_classification_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._flow_classification_summary

    @property
    def conduit_surcharge_summary(self) -> DataFrame:
        """
        Conduit surcharge summary that details the hours of surcharging and
        capacity limited conditions.

        Returns
        -------
        DataFrame
            DataFrame of conduit surcharge summary table
        """
        if not hasattr(self, "_conduit_surcharge_summary"):
            header, data = self._split_section(
                self._sections["Conduit Surcharge Summary"]
            )
            # replace the banner that spans three columns with one prefix per
            # column, keeping the overall width unchanged
            # NOTE(review): prefix offsets (0, 11, 21) assume the three labels
            # beneath the banner start at those offsets -- confirm against an
            # actual report file.
            to_remove = "--------- Hours Full --------"
            to_replace = "HrsFull    HoursFull HrsFull".ljust(len(to_remove))
            header = header.replace(to_remove, to_replace)
            self._conduit_surcharge_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._conduit_surcharge_summary

    @property
    def pumping_summary(self) -> DataFrame:
        """
        Pumping summary that details the utilization, peak flow rates, total flow volume,
        power usage, and time off pump curve for each pump.

        Returns
        -------
        DataFrame
            DataFrame of pumping summary table
        """
        if not hasattr(self, "_pumping_summary"):
            header, data = self._split_section(self._sections["Pumping Summary"])
            columns = self._parse_header(header)
            # NOTE(review): only the last parsed name is overridden here; the
            # "_Low" suffix suggests a matching "_High" column may exist in the
            # data -- confirm against an actual report file.
            columns[-1] = "Percent_Time_Off_Pump_Curve_Low"
            self._pumping_summary = self._parse_table(columns, data)
        return self._pumping_summary

    @property
    def link_pollutant_load_summary(self) -> DataFrame:
        """
        Link pollutant load summary that details the total pollutant mass discharged
        from each link.

        Returns
        -------
        DataFrame
            DataFrame of link pollutant load summary table
        """
        if not hasattr(self, "_link_pollutant_load_summary"):
            header, data = self._split_section(
                self._sections["Link Pollutant Load Summary"]
            )
            self._link_pollutant_load_summary = self._parse_table(
                self._parse_header(header), data
            )
        return self._link_pollutant_load_summary

    @property
    def analysis_begun(self) -> Timestamp:
        """
        Date and time when the simulation was started

        Returns
        -------
        Timestamp
            Simulation start time

        Raises
        ------
        Exception
            if analysis begun text could not be found in the report file
        """
        if not hasattr(self, "_analysis_begun"):
            pattern = R"\s+Analysis begun on:\s+([^\n]+)$"
            s = re.search(pattern, self._rpt_text, flags=re.MULTILINE)
            if s:
                self._analysis_begun = to_datetime(s.group(1))
            else:
                raise Exception("Error finding analysis begun")
        return self._analysis_begun

    @property
    def analysis_end(self) -> Timestamp:
        """
        Date and time when the simulation ended

        Returns
        -------
        Timestamp
            Simulation end time

        Raises
        ------
        Exception
            if analysis ended text could not be found in the report file
        """
        if not hasattr(self, "_analysis_end"):
            pattern = R"\s+Analysis ended on:\s+([^\n]+)$"
            s = re.search(pattern, self._rpt_text, flags=re.MULTILINE)
            if s:
                self._analysis_end = to_datetime(s.group(1))
            else:
                raise Exception("Error finding analysis end")
        return self._analysis_end