Disaggregators & Pipelines
Base Class
Disaggregator
Bases: ABC
Abstract base class for all temporal disaggregation methods.
Disaggregators transform synthetic flows from one temporal resolution to a finer resolution (e.g., monthly to daily).
All disaggregator implementations should inherit from this class.
Initialize the disaggregator with its algorithm configuration.
Subclasses add algorithm-specific keyword-only parameters before name and debug. Data is not passed here; use fit(Q_obs) or preprocessing(Q_obs) instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Name identifier for this disaggregator instance. | None |
| debug | bool | Enable debug logging. | False |
n_sites (property)
Number of sites in the disaggregator.
Returns:
| Type | Description |
|---|---|
| int | Number of sites. |
Raises:
| Type | Description |
|---|---|
| ValueError | If preprocessing has not been run yet. |
sites (property)
List of site names.
Returns:
| Type | Description |
|---|---|
| List[str] | Site identifiers. |
Raises:
| Type | Description |
|---|---|
| ValueError | If preprocessing has not been run yet. |
input_frequency (abstract property)
Expected temporal frequency of the input ensemble.
Returns:
| Type | Description |
|---|---|
| str | Pandas frequency string (e.g., 'MS' for monthly, 'W' for weekly). |
output_frequency (abstract property)
Temporal frequency of the disaggregated output.
Returns:
| Type | Description |
|---|---|
| str | Pandas frequency string (e.g., 'D' for daily, 'H' for hourly). |
validate_input_data
Validate and standardize the input data format.
Checks type, DatetimeIndex, NaN content, and negative values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data | Series or DataFrame | Input time series data. | required |
Returns:
| Type | Description |
|---|---|
| DataFrame | Validated and standardized data. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the data format is invalid. |
| TypeError | If the data type is unsupported. |
validate_preprocessing
Check whether preprocessing has been completed.
Raises:
| Type | Description |
|---|---|
| ValueError | If preprocessing() has not been run. |
validate_fit
Check whether the disaggregator has been fitted.
Raises:
| Type | Description |
|---|---|
| ValueError | If fit() has not been run. |
update_state
Update the disaggregator's state flags.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| preprocessed | bool | Set the preprocessing state. | None |
| fitted | bool | Set the fitted state. | None |
validate_input_ensemble
Validate that the input ensemble is compatible with the disaggregator.
Checks temporal frequency and site consistency.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ensemble | Ensemble | Input ensemble to validate. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If the ensemble is incompatible with the disaggregator. |
get_params
Get initialization parameters (scikit-learn style).
Returns only constructor/configuration parameters, not fitted values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deep | bool | If True, return a deep copy of the parameters. | True |
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary of initialization parameters. |
get_fitted_params
Get parameters learned from data during fit().
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary of fitted parameters (all keys end with an underscore). |
Raises:
| Type | Description |
|---|---|
| ValueError | If the disaggregator has not been fitted yet. |
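The contract described for get_params() and get_fitted_params() (constructor arguments only, versus learned attributes marked by a trailing underscore) can be sketched with the standard library. `ParamsMixin` and `ToyDisaggregator` are hypothetical names, and reading parameter names from the constructor signature is an assumption borrowed from scikit-learn, not necessarily how this library does it.

```python
import copy
import inspect
from typing import Any, Dict, Optional


class ParamsMixin:
    """Hypothetical mixin illustrating scikit-learn-style parameter access."""

    def get_params(self, deep: bool = True) -> Dict[str, Any]:
        # Collect named constructor arguments (skip self, *args, **kwargs).
        skip = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
        names = [
            p.name
            for p in inspect.signature(type(self).__init__).parameters.values()
            if p.name != "self" and p.kind not in skip
        ]
        params = {name: getattr(self, name) for name in names}
        # Following the docstring above, deep=True returns a deep copy.
        return copy.deepcopy(params) if deep else params

    def get_fitted_params(self) -> Dict[str, Any]:
        if not getattr(self, "is_fitted", False):
            raise ValueError("Disaggregator has not been fitted yet.")
        # Learned attributes follow the trailing-underscore convention.
        return {
            k: v for k, v in vars(self).items()
            if k.endswith("_") and not k.startswith("_")
        }


class ToyDisaggregator(ParamsMixin):
    """Hypothetical estimator used only to exercise the mixin."""

    def __init__(self, *, n_neighbors: int = 5, name: Optional[str] = None, debug: bool = False):
        self.n_neighbors = n_neighbors
        self.name = name
        self.debug = debug
        self.is_fitted = False

    def fit(self, monthly_totals):
        # Stand-in "learning": store summary statistics as fitted attributes.
        self.mean_flow_ = sum(monthly_totals) / len(monthly_totals)
        self.n_profiles_ = len(monthly_totals)
        self.is_fitted = True
```

Calling `get_params()` on a fresh instance returns only `n_neighbors`, `name`, and `debug`; after `fit()`, `get_fitted_params()` returns only the underscore-suffixed attributes.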
summary
Generate a summary of the disaggregator's configuration and fit.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| show_fitted | bool | Whether to include fitted parameters in the summary. | True |
Returns:
| Type | Description |
|---|---|
| str | Formatted summary string. |
save
Save a fitted disaggregator to file using pickle.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filepath | str | Path to save the disaggregator. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If the disaggregator is not fitted. |
load (classmethod)
Load a fitted disaggregator from file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filepath | str | Path to the saved disaggregator file. | required |
Returns:
| Type | Description |
|---|---|
| Disaggregator | Loaded disaggregator instance. |
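A minimal sketch of a pickle-based save/load round trip that enforces the "must be fitted" rule. `PicklableModel` is a hypothetical stand-in; this version pickles the instance's attribute dictionary and rebuilds the object inside the `load` classmethod, whereas pickling the whole object is an equally valid choice and which one the library uses is not stated here.

```python
import os
import pickle
import tempfile


class PicklableModel:
    """Hypothetical stand-in illustrating a pickle-based save/load round trip."""

    def __init__(self, name=None):
        self.name = name
        self.is_fitted = False

    def fit(self, values):
        self.mean_ = sum(values) / len(values)
        self.is_fitted = True
        return self

    def save(self, filepath: str) -> None:
        # Mirror the documented contract: refuse to save an unfitted model.
        if not self.is_fitted:
            raise ValueError("Model must be fitted before saving.")
        payload = {"class": type(self).__name__, "state": self.__dict__}
        with open(filepath, "wb") as fh:
            pickle.dump(payload, fh)

    @classmethod
    def load(cls, filepath: str) -> "PicklableModel":
        # pickle.load can run arbitrary code: only open trusted files.
        with open(filepath, "rb") as fh:
            payload = pickle.load(fh)
        if payload["class"] != cls.__name__:
            raise TypeError(f"File holds a {payload['class']}, not a {cls.__name__}.")
        obj = cls.__new__(cls)  # bypass __init__; state is restored below
        obj.__dict__.update(payload["state"])
        return obj
```

Because `pickle.load` can execute arbitrary code, saved files should only be loaded from trusted sources.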
preprocessing (abstract method)
preprocessing(Q_obs: Union[Series, DataFrame], *, sites: Optional[List[str]] = None, **kwargs: Any) -> None
Preprocess and validate observed flow data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Q_obs | Series or DataFrame | Observed historical flow data at the output resolution. | required |
| sites | list of str | Sites to use. If None, uses all columns. | None |
| **kwargs | Any | Additional preprocessing parameters. | {} |
fit (abstract method)
fit(Q_obs: Optional[Union[Series, DataFrame]] = None, *, sites: Optional[List[str]] = None, **kwargs: Any) -> None
Fit the disaggregator to observed flow data.
If Q_obs is provided, preprocessing() is called automatically.
If omitted, a prior call to preprocessing() is required.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Q_obs | Series or DataFrame | Observed data. If provided, runs preprocessing automatically. | None |
| sites | list of str | Sites to use (only when Q_obs is provided). | None |
| **kwargs | Any | Additional fitting parameters. | {} |
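The preprocessing/fit contract (automatic preprocessing when Q_obs is given, otherwise a prior preprocessing() call is required) and the state flags managed by update_state() can be sketched as a small state machine. `LifecycleSketch` is hypothetical, observed data is modelled as a plain `{site: [flows]}` dict instead of a pandas object, and treating `None` in update_state() as "leave unchanged" is an assumption.

```python
from typing import Dict, List, Optional

FlowTable = Dict[str, List[float]]  # hypothetical stand-in for a DataFrame: site -> flows


class LifecycleSketch:
    """Hypothetical sketch of the preprocessing -> fit state machine."""

    def __init__(self) -> None:
        self.is_preprocessed = False
        self.is_fitted = False

    def update_state(self, preprocessed: Optional[bool] = None, fitted: Optional[bool] = None) -> None:
        # Assumption: None leaves the corresponding flag unchanged.
        if preprocessed is not None:
            self.is_preprocessed = preprocessed
        if fitted is not None:
            self.is_fitted = fitted

    def validate_preprocessing(self) -> None:
        if not self.is_preprocessed:
            raise ValueError("preprocessing() has not been run.")

    def validate_fit(self) -> None:
        if not self.is_fitted:
            raise ValueError("fit() has not been run.")

    def preprocessing(self, Q_obs: FlowTable, *, sites: Optional[List[str]] = None) -> None:
        # Keep only the requested sites (all of them when sites is None).
        self.sites_ = list(sites) if sites is not None else list(Q_obs)
        self.Q_obs_ = {s: list(Q_obs[s]) for s in self.sites_}
        self.update_state(preprocessed=True)

    def fit(self, Q_obs: Optional[FlowTable] = None, *, sites: Optional[List[str]] = None) -> None:
        if Q_obs is not None:
            self.preprocessing(Q_obs, sites=sites)  # automatic preprocessing
        self.validate_preprocessing()  # otherwise a prior call is required
        # Stand-in for the real fitting step: per-site mean flow.
        self.site_means_ = {s: sum(q) / len(q) for s, q in self.Q_obs_.items()}
        self.update_state(fitted=True)
```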
disaggregate (abstract method)
Disaggregate synthetic flows from a coarser to a finer temporal resolution.
Implementations should:
1. Call validate_fit() at the start.
2. Call validate_input_ensemble() to check compatibility.
3. Disaggregate each realization in the ensemble.
4. Return a new Ensemble at the finer temporal resolution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ensemble | Ensemble | Input ensemble at the coarser temporal resolution. | required |
| **kwargs | Any | Additional disaggregation parameters. | {} |
Returns:
| Type | Description |
|---|---|
| Ensemble | Disaggregated ensemble at the finer temporal resolution. |
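A hypothetical minimal subclass that follows the four implementation steps listed for disaggregate(). To stay dependency-free it uses a stand-in abstract base class and represents an ensemble as `{realization: [(month_start_date, total), ...]}` rather than the library's Ensemble; the "spread evenly over the month" rule is a toy method chosen only to show the interface, not a recommended disaggregator.

```python
import calendar
import datetime as dt
from abc import ABC, abstractmethod
from typing import Dict, List, Tuple

# Hypothetical stand-ins: a series is a list of (date, value) pairs and an
# ensemble maps realization id -> series. The real Ensemble class differs.
SeriesLike = List[Tuple[dt.date, float]]
EnsembleLike = Dict[int, SeriesLike]


class DisaggregatorSketch(ABC):
    def __init__(self, *, name=None, debug=False):
        self.name = name
        self.debug = debug
        self.is_fitted = False

    @property
    @abstractmethod
    def input_frequency(self) -> str: ...

    @property
    @abstractmethod
    def output_frequency(self) -> str: ...

    @abstractmethod
    def fit(self, Q_obs=None, **kwargs) -> None: ...

    @abstractmethod
    def disaggregate(self, ensemble: EnsembleLike, **kwargs) -> EnsembleLike: ...

    def validate_fit(self) -> None:
        if not self.is_fitted:
            raise ValueError("fit() has not been run.")


class UniformDisaggregator(DisaggregatorSketch):
    """Toy method: spread each monthly total evenly over the days of that month."""

    @property
    def input_frequency(self) -> str:
        return "MS"

    @property
    def output_frequency(self) -> str:
        return "D"

    def fit(self, Q_obs=None, **kwargs) -> None:
        # Nothing to learn for the uniform method; just flip the state flag.
        self.is_fitted = True

    def disaggregate(self, ensemble: EnsembleLike, **kwargs) -> EnsembleLike:
        self.validate_fit()  # step 1
        out: EnsembleLike = {}
        for rid, series in ensemble.items():  # step 3: every realization
            daily: SeriesLike = []
            for month_start, total in series:
                if month_start.day != 1:  # step 2 stand-in: expect month-start dates
                    raise ValueError(f"{month_start} is not a month start")
                n_days = calendar.monthrange(month_start.year, month_start.month)[1]
                daily.extend(
                    (month_start + dt.timedelta(days=d), total / n_days)
                    for d in range(n_days)
                )
            out[rid] = daily
        return out  # step 4: new ensemble at the finer resolution
```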
NowakDisaggregator
NowakDisaggregator
NowakDisaggregator(*, n_neighbors: int = 5, max_month_shift: int = 7, blend_days: int = 2, name: str = None, debug: bool = False)
Bases: Disaggregator
Temporal disaggregation from monthly to daily as described in Nowak et al. (2010).
Supports both single-site and multisite disaggregation from monthly to daily streamflows.
For each month of synthetic data, it finds the K (n_neighbors) historic monthly flow profiles whose total flow at the index gauge (the sum of all sites) is closest to the synthetic total.
It then randomly selects one of these K profiles (see sample_method) and uses that month's daily flow proportions to disaggregate the synthetic monthly flow at all sites.
When disaggregating a month, only historic profiles from the same calendar month are considered, including windows shifted by up to ±max_month_shift days.
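The single-month step described above can be sketched as follows, assuming historic profiles are stored as `{site: [daily flows]}` dicts for the same calendar month. `nowak_month_sketch` is a hypothetical helper: it ranks neighbors by absolute difference in index-gauge totals and picks uniformly among the K nearest, whereas the library's default `sample_method='distance_weighted'` weights that choice.

```python
import random
from typing import Dict, List, Optional

Profile = Dict[str, List[float]]  # hypothetical: site -> daily flows for one historic month


def nowak_month_sketch(
    synthetic_totals: Dict[str, float],
    historic_profiles: List[Profile],
    n_neighbors: int = 5,
    rng: Optional[random.Random] = None,
) -> Profile:
    """Disaggregate one synthetic month by borrowing daily proportions from
    one of the K historic profiles nearest in index-gauge total (sketch)."""
    rng = rng or random.Random()
    sites = list(synthetic_totals)
    # Index gauge: sum of the monthly totals over all sites.
    target = sum(synthetic_totals.values())
    hist_index = [sum(sum(p[s]) for s in sites) for p in historic_profiles]
    # Rank historic months by closeness of their index-gauge total.
    k = min(n_neighbors, len(historic_profiles))
    ranked = sorted(range(len(historic_profiles)), key=lambda i: abs(hist_index[i] - target))
    chosen = historic_profiles[rng.choice(ranked[:k])]  # uniform pick among the K nearest

    daily: Profile = {}
    for s in sites:
        hist_total = sum(chosen[s])
        n_days = len(chosen[s])
        # Daily proportions of the chosen month (even split if it had zero flow).
        props = [q / hist_total for q in chosen[s]] if hist_total > 0 else [1.0 / n_days] * n_days
        daily[s] = [p * synthetic_totals[s] for p in props]
    return daily
```

Because each site's daily values are the chosen proportions times that site's synthetic total, the disaggregated days sum back to the synthetic monthly flow at every site.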
References
Nowak, K., Prairie, J., Rajagopalan, B., & Lall, U. (2010). A nonparametric stochastic approach for multisite disaggregation of annual to daily streamflow. Water Resources Research, 46(8).
Initialize the Nowak Disaggregator.
Supports both single site (Series) and multi-site (DataFrame) disaggregation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_neighbors | int | Number of nearest neighbors (K) to consider for disaggregation. | 5 |
| max_month_shift | int | Maximum number of days to shift around each month center when creating historic monthly flow profiles. | 7 |
| blend_days | int or None | Number of days on each side of month boundaries to smooth. Reduces artificial discontinuities at month transitions. Set to None or 0 to disable boundary smoothing. | 2 |
| name | str | Name for this disaggregator instance. | None |
| debug | bool | Enable debug logging. | False |
preprocessing
Preprocess observed daily flow data.
Validates input data and detects single-site vs. multisite configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Q_obs | Series or DataFrame | Daily streamflow data for the historic period. Must have a DatetimeIndex with daily frequency. If a DataFrame, columns represent different sites. | required |
| sites | list of str | Sites to use. If None, uses all columns. | None |
| **kwargs | | Additional preprocessing parameters (currently unused). | {} |
fit
Fit the Nowak disaggregator to the data.
Builds a dataset of candidate monthly flow profiles for each calendar month and trains a KNN model for each month.
If Q_obs is provided, preprocessing() is called automatically.
If omitted, a prior call to preprocessing() is required.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Q_obs | Series or DataFrame | Observed data. If provided, runs preprocessing automatically. | None |
| sites | list of str | Sites to use (only when Q_obs is provided). | None |
| **kwargs | | Additional fitting parameters (currently unused). | {} |
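How `max_month_shift` enlarges the pool of candidate profiles can be sketched for a single site: every month-long window whose start is shifted by up to ±max_month_shift days from the first of the target month becomes a candidate. `candidate_windows` is hypothetical, assumes a gap-free daily record stored as `{date: flow}`, and simply drops windows that would run off either end of the record; the library's exact window construction may differ.

```python
import datetime as dt
from typing import Dict, List, Tuple


def candidate_windows(
    daily: Dict[dt.date, float], month: int, max_month_shift: int = 7
) -> List[Tuple[dt.date, List[float]]]:
    """Collect month-long windows for one calendar month, shifted by up to
    +/- max_month_shift days (hypothetical helper, single site)."""
    dates = sorted(daily)
    pos = {d: i for i, d in enumerate(dates)}
    out = []
    for d in dates:
        if d.month != month or d.day != 1:
            continue
        # Month length: first day of the next month minus the first of this one.
        nxt = dt.date(d.year + (d.month == 12), d.month % 12 + 1, 1)
        n_days = (nxt - d).days
        for shift in range(-max_month_shift, max_month_shift + 1):
            start = pos[d] + shift
            stop = start + n_days
            # Skip windows that fall outside the observed record.
            if start < 0 or stop > len(dates):
                continue
            out.append((dates[start], [daily[x] for x in dates[start:stop]]))
    return out
```

With a one-year record, February yields 2 × max_month_shift + 1 windows, while January loses its negative shifts because they would start before the record begins.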
disaggregate
disaggregate(ensemble: Ensemble, n_neighbors=None, sample_method='distance_weighted', seed=None, **kwargs) -> Ensemble
Disaggregate a monthly ensemble to daily flows using the Nowak method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ensemble | Ensemble | Monthly streamflow ensemble to disaggregate. Must have frequency 'MS' (month start). | required |
| n_neighbors | int | Number of neighbors to use for disaggregation. If None, uses the value set at initialization. | None |
| sample_method | str | Method used to sample among the K nearest neighbors. | 'distance_weighted' |
| seed | int | Random seed for reproducibility. | None |
| **kwargs | | Additional disaggregation parameters. | {} |
Returns:
| Type | Description |
|---|---|
| Ensemble | Disaggregated daily streamflow ensemble. |
find_knn_indices
Given cumulative monthly flow values, find the K nearest neighbors in the historic dataset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Qs_monthly_array | array | The cumulative monthly flow values for the month to disaggregate. | required |
| month | int | The calendar month being disaggregated (1-12). | required |
| n_neighbors | int | The number of neighbors to find. | None |
Returns:
| Name | Type | Description |
|---|---|---|
| distances | array | The distances to the K nearest neighbors. |
| indices | array | The indices of the K nearest neighbors in the historic dataset. |
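A brute-force sketch of the neighbor search that find_knn_indices() performs, returning distances and indices in matching order. `find_knn_sketch` is hypothetical, and Euclidean distance over per-site monthly totals is an assumption; the library trains a KNN model per calendar month and may use a different metric or scaling.

```python
import math
from typing import List, Sequence, Tuple


def find_knn_sketch(
    target: Sequence[float],
    candidates: List[Sequence[float]],
    n_neighbors: int = 5,
) -> Tuple[List[float], List[int]]:
    """Return (distances, indices) of the K candidates closest to target."""
    # Euclidean distance from the synthetic month to every historic candidate.
    dists = [math.dist(target, c) for c in candidates]
    k = min(n_neighbors, len(candidates))
    # Sort by distance (ties broken by index) and keep the first k.
    idx = sorted(range(len(candidates)), key=lambda i: (dists[i], i))[:k]
    return [dists[i] for i in idx], idx
```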
sample_knn_monthly_flows
sample_knn_monthly_flows(Qs_monthly_array, month, n_neighbors=None, sample_method='distance_weighted', *, rng=None)
Given cumulative monthly flow values, sample from the K nearest neighbors in the historic dataset.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Qs_monthly_array | array | The cumulative monthly flow values for the month to disaggregate. | required |
| month | int | The calendar month being disaggregated (1-12). | required |
| n_neighbors | int | The number of neighbors to sample from. | None |
| sample_method | str | The sampling method to use. | 'distance_weighted' |
Returns:
| Name | Type | Description |
|---|---|---|
| sampled_indices | array | The sampled indices from the historic dataset. |
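One plausible reading of `sample_method='distance_weighted'` is inverse-distance weighting over the K neighbors. The sketch below assumes that scheme; the `sample_neighbor` helper and its `'uniform'` option are hypothetical and shown only for contrast.

```python
import random
from typing import Optional, Sequence


def sample_neighbor(
    distances: Sequence[float],
    indices: Sequence[int],
    sample_method: str = "distance_weighted",
    rng: Optional[random.Random] = None,
) -> int:
    """Draw one historic index from the K nearest neighbors (sketch)."""
    rng = rng or random.Random()
    if sample_method == "uniform":  # hypothetical option, shown for contrast
        return rng.choice(list(indices))
    if sample_method != "distance_weighted":
        raise ValueError(f"Unknown sample_method: {sample_method!r}")
    # An exact match would get infinite inverse-distance weight, so return it directly.
    exact = [i for d, i in zip(distances, indices) if d == 0]
    if exact:
        return rng.choice(exact)
    # Assumed scheme: selection probability proportional to 1 / distance.
    weights = [1.0 / d for d in distances]
    return rng.choices(list(indices), weights=weights, k=1)[0]
```

With distances 1 and 4, the closer neighbor is drawn with probability 1 / (1 + 0.25) = 0.8 under this assumed scheme.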
disaggregate_monthly_flows
Disaggregate monthly to daily flows using the Nowak method.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Qs_monthly | Series or DataFrame | Monthly streamflow data for the synthetic period. The index should be a DatetimeIndex. For multisite data, this should be a DataFrame with the same columns as the historic data. | required |
| n_neighbors | int | The number of neighbors to use for disaggregation. | None |
| sample_method | str | The method used to sample among the K nearest neighbors. | 'distance_weighted' |
Returns:
| Name | Type | Description |
|---|---|---|
| Qs_daily | Series or DataFrame | Daily streamflow data for the synthetic period, indexed by a DatetimeIndex. For multisite data, returns a DataFrame with the same columns as the input. |
Pipelines
GeneratorDisaggregatorPipeline
GeneratorDisaggregatorPipeline(generator: Generator, disaggregator: Disaggregator, name: Optional[str] = None, debug: bool = False)
Pipeline for composing a generator with a disaggregator.
This class runs generation followed by disaggregation, checks that the two components are compatible, and manages the complete workflow.
Data is not passed at construction time. Use preprocessing(Q_obs) or fit(Q_obs) to supply observed flow data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| generator | Generator | An unfitted generator instance (constructed without data). | required |
| disaggregator | Disaggregator | An unfitted disaggregator instance (constructed without data). | required |
| name | str | Name for this pipeline instance. | None |
| debug | bool | Enable debug logging. | False |
Examples:
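>>> import pandas as pd
>>> from synhydro.methods.generation.nonparametric.kirsch import KirschGenerator
>>> from synhydro.methods.disaggregation.temporal.nowak import NowakDisaggregator
>>> from synhydro.core.pipeline import GeneratorDisaggregatorPipeline
>>>
>>> # Load daily historic flows
>>> Q_daily = pd.read_csv('daily_flows.csv', index_col=0, parse_dates=True)
>>>
>>> # Create components (no data)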
>>> generator = KirschGenerator()
>>> disaggregator = NowakDisaggregator()
>>>
>>> # Create pipeline
>>> pipeline = GeneratorDisaggregatorPipeline(generator, disaggregator)
>>>
>>> # Fit and generate
>>> pipeline.preprocessing(Q_daily)
>>> pipeline.fit()
>>> daily_ensemble = pipeline.generate(n_realizations=10, n_years=50)
Initialize the pipeline with generator and disaggregator components.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| generator | Generator | Generator instance for producing synthetic flows. | required |
| disaggregator | Disaggregator | Disaggregator instance for temporal disaggregation. | required |
| name | str | Name for this pipeline. | None |
| debug | bool | Enable debug logging. | False |
Raises:
| Type | Description |
|---|---|
| TypeError | If the components are not Generator/Disaggregator instances. |
is_preprocessed (property)
Check whether both components are preprocessed.
Returns:
| Type | Description |
|---|---|
| bool | True if both the generator and the disaggregator are preprocessed. |
is_fitted (property)
Check whether both components are fitted.
Returns:
| Type | Description |
|---|---|
| bool | True if both the generator and the disaggregator are fitted. |
output_frequency (property)
Get the final output frequency of the pipeline.
Returns:
| Type | Description |
|---|---|
| str | Pandas frequency string (e.g., 'D' for daily). |
validate_compatibility
Validate that the generator and disaggregator are compatible.
Checks that the generator's output frequency matches the disaggregator's input frequency.
Raises:
| Type | Description |
|---|---|
| ValueError | If the frequencies are incompatible. |
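The compatibility rule amounts to comparing two frequency strings. The stub dataclasses and the free function below are hypothetical stand-ins that expose only the frequency properties the check needs.

```python
from dataclasses import dataclass


@dataclass
class StubGenerator:
    output_frequency: str = "MS"


@dataclass
class StubDisaggregator:
    input_frequency: str = "MS"
    output_frequency: str = "D"


def validate_compatibility(generator, disaggregator) -> None:
    """Raise ValueError when the generator's output cannot feed the disaggregator."""
    if generator.output_frequency != disaggregator.input_frequency:
        raise ValueError(
            f"Generator output frequency {generator.output_frequency!r} does not match "
            f"disaggregator input frequency {disaggregator.input_frequency!r}."
        )
```

A plain string comparison treats equivalent pandas aliases as different; normalizing both sides with `pandas.tseries.frequencies.to_offset` first would be one way to relax that.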
preprocessing
preprocessing(Q_obs: Union[Series, DataFrame], *, sites: Optional[List[str]] = None, **kwargs) -> None
Preprocess both the generator and the disaggregator.
Passes Q_obs to each component's preprocessing() method in sequence.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Q_obs | Series or DataFrame | Observed historical flow data. | required |
| sites | list of str | Sites to use. If None, uses all columns. | None |
| **kwargs | | Additional preprocessing parameters passed to both components. | {} |
fit
fit(Q_obs: Optional[Union[Series, DataFrame]] = None, *, sites: Optional[List[str]] = None, **kwargs) -> None
Fit both the generator and the disaggregator.
If Q_obs is provided, preprocessing() is called automatically before fitting. If omitted, a prior call to preprocessing() is required.
After preprocessing, validates frequency compatibility between the generator and the disaggregator before fitting.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| Q_obs | Series or DataFrame | Observed data. If provided, runs preprocessing automatically. | None |
| sites | list of str | Sites to use (only when Q_obs is provided). | None |
| **kwargs | | Additional fitting parameters passed to both components. | {} |
Raises:
| Type | Description |
|---|---|
| ValueError | If preprocessing has not been completed and Q_obs is not provided. |
generate
generate(n_realizations: int = 1, n_years: Optional[int] = None, n_timesteps: Optional[int] = None, seed: Optional[int] = None, **kwargs) -> Ensemble
Generate and disaggregate synthetic flows through the pipeline.
This method orchestrates the complete workflow:
1. Generate monthly (or other coarse-resolution) synthetic flows using the generator.
2. Disaggregate them to a finer temporal resolution using the disaggregator.
3. Return the final ensemble.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_realizations | int | Number of synthetic realizations to generate. | 1 |
| n_years | int | Number of years to generate. | None |
| n_timesteps | int | Explicit number of timesteps to generate. | None |
| seed | int | Random seed for reproducibility. | None |
| **kwargs | | Additional parameters passed to the generator and disaggregator. | {} |
Returns:
| Type | Description |
|---|---|
| Ensemble | Final disaggregated ensemble at the output frequency. |
Raises:
| Type | Description |
|---|---|
| ValueError | If the pipeline has not been fitted. |
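The generate() workflow (generate coarse flows, disaggregate them, return the result) can be sketched with toy components. All classes here are hypothetical; deriving n_timesteps as 12 × n_years assumes a monthly generator, and passing the same seed to both stages is an assumption about how seeding is shared.

```python
import random
from typing import Dict, List, Optional


class ToyGenerator:
    output_frequency = "MS"
    is_fitted = True

    def generate(self, n_realizations: int, n_timesteps: int, seed: Optional[int] = None) -> Dict[int, List[float]]:
        rng = random.Random(seed)
        return {r: [rng.uniform(50, 150) for _ in range(n_timesteps)] for r in range(n_realizations)}


class ToySplitter:
    input_frequency = "MS"
    output_frequency = "D"
    is_fitted = True

    def disaggregate(self, ensemble, seed=None):
        # Toy rule: split each monthly total into 30 equal "days".
        return {r: [v / 30 for v in vals for _ in range(30)] for r, vals in ensemble.items()}


class PipelineSketch:
    def __init__(self, generator, disaggregator):
        self.generator = generator
        self.disaggregator = disaggregator

    @property
    def is_fitted(self) -> bool:
        return self.generator.is_fitted and self.disaggregator.is_fitted

    def generate(self, n_realizations=1, n_years=None, n_timesteps=None, seed=None):
        if not self.is_fitted:
            raise ValueError("Pipeline has not been fitted.")
        if n_timesteps is None:
            # Assumption: monthly generator, 12 timesteps per year (1 year if unset).
            n_timesteps = 12 * (n_years or 1)
        coarse = self.generator.generate(n_realizations, n_timesteps, seed=seed)  # step 1
        return self.disaggregator.disaggregate(coarse, seed=seed)  # steps 2 and 3
```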
summary
Generate a summary of the pipeline configuration and status.
Returns:
| Type | Description |
|---|---|
| str | Formatted summary string. |
save
Save the entire pipeline to file.
Saves both the generator and the disaggregator, preserving their fitted state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filepath | str | Path to save the pipeline. | required |
Raises:
| Type | Description |
|---|---|
| ValueError | If the pipeline is not fitted. |
load (classmethod)
Load a pipeline from file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| filepath | str | Path to the saved pipeline file. | required |
Returns:
| Type | Description |
|---|---|
| GeneratorDisaggregatorPipeline | Loaded pipeline instance. |
KirschNowakPipeline
KirschNowakPipeline
KirschNowakPipeline(*, generate_using_log_flow: bool = True, matrix_repair_method: str = 'spectral', n_neighbors: int = 5, max_month_shift: int = 7, name: Optional[str] = None, debug: bool = False)
Bases: GeneratorDisaggregatorPipeline
Pre-configured pipeline combining the Kirsch generator with the Nowak disaggregator.
This pipeline generates monthly synthetic flows using the Kirsch nonparametric bootstrap method, then disaggregates them to daily flows using the Nowak KNN-based temporal disaggregation method.
Data is not passed at construction time. Use preprocessing(Q_obs) or fit(Q_obs) to supply observed flow data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| generate_using_log_flow | bool | Whether to generate in log-space (Kirsch parameter). | True |
| matrix_repair_method | str | Method for repairing correlation matrices (Kirsch parameter). | 'spectral' |
| n_neighbors | int | Number of KNN neighbors for disaggregation (Nowak parameter). | 5 |
| max_month_shift | int | Maximum day shift for monthly profiles (Nowak parameter). | 7 |
| name | str | Name for this pipeline instance. | None |
| debug | bool | Enable debug logging. | False |
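Cholesky-based generators such as Kirsch's need a positive-definite correlation matrix, and sample estimates can fail that requirement. A common spectral repair clips small or negative eigenvalues to a floor ε and rescales the result back to a unit diagonal; whether `matrix_repair_method='spectral'` uses exactly this form (including the choice of ε) is an assumption.

```latex
\begin{aligned}
R &= V \Lambda V^{\top}, \qquad \Lambda = \operatorname{diag}(\lambda_1, \ldots, \lambda_n) \\
\tilde{\Lambda} &= \operatorname{diag}\bigl(\max(\lambda_1, \varepsilon), \ldots, \max(\lambda_n, \varepsilon)\bigr), \qquad \tilde{R} = V \tilde{\Lambda} V^{\top} \\
R^{*} &= D^{-1/2}\, \tilde{R}\, D^{-1/2}, \qquad D = \operatorname{diag}(\tilde{r}_{11}, \ldots, \tilde{r}_{nn})
\end{aligned}
```

Here $V$ holds the eigenvectors of the sample correlation matrix $R$, and the final rescaling restores ones on the diagonal so that $R^{*}$ is again a valid correlation matrix.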
Examples:
>>> import pandas as pd
>>> from synhydro.pipelines import KirschNowakPipeline
>>>
>>> # Load daily historic flows
>>> Q_daily = pd.read_csv('daily_flows.csv', index_col=0, parse_dates=True)
>>>
>>> # Create pipeline (no data)
>>> pipeline = KirschNowakPipeline()
>>>
>>> # Fit and generate
>>> pipeline.preprocessing(Q_daily)
>>> pipeline.fit()
>>> daily_ensemble = pipeline.generate(n_realizations=100, n_years=50)
Notes
This pipeline is equivalent to creating:
References
Kirsch generator: nonparametric bootstrap with correlation preservation.
Nowak disaggregator: KNN-based temporal disaggregation (Nowak et al., 2010).
Initialize the Kirsch-Nowak pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| generate_using_log_flow | bool | Generate in log-space for Kirsch. | True |
| matrix_repair_method | str | Correlation matrix repair method for Kirsch. | 'spectral' |
| n_neighbors | int | Number of KNN neighbors for Nowak. | 5 |
| max_month_shift | int | Day shift for Nowak monthly profiles. | 7 |
| name | str | Pipeline name. | None |
| debug | bool | Enable debug logging. | False |
ThomasFieringNowakPipeline
ThomasFieringNowakPipeline
ThomasFieringNowakPipeline(*, n_neighbors: int = 5, max_month_shift: int = 7, name: Optional[str] = None, debug: bool = False)
Bases: GeneratorDisaggregatorPipeline
Pre-configured pipeline combining the Thomas-Fiering generator with the Nowak disaggregator.
This pipeline generates monthly synthetic flows using the Thomas-Fiering AR(1) parametric method with Stedinger-Taylor normalization, then disaggregates them to daily flows using the Nowak KNN-based temporal disaggregation method.
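For reference, the classical Thomas-Fiering monthly AR(1) recursion is shown below in a normalized space. Reading "Stedinger-Taylor normalization" as a shifted log transform with Stedinger's lower-bound estimator $\hat{\tau}_m$ is an assumption about this implementation, not something the docstring confirms.

```latex
\begin{aligned}
y_{t} &= \ln\left(Q_{t} - \hat{\tau}_{m}\right), \qquad
\hat{\tau}_{m} = \frac{Q_{\min,m}\, Q_{\max,m} - Q_{\mathrm{med},m}^{2}}{Q_{\min,m} + Q_{\max,m} - 2\, Q_{\mathrm{med},m}} \\
y_{t+1} &= \mu_{m+1} + b_{m}\left(y_{t} - \mu_{m}\right) + \varepsilon_{t+1}\, \sigma_{m+1} \sqrt{1 - \rho_{m}^{2}}, \qquad
b_{m} = \rho_{m}\, \frac{\sigma_{m+1}}{\sigma_{m}}
\end{aligned}
```

Here $t$ falls in calendar month $m$ (with December followed by January), $\mu_m$ and $\sigma_m$ are the mean and standard deviation of $y$ in month $m$, $\rho_m$ is the lag-one correlation between months $m$ and $m+1$, $\varepsilon_{t+1} \sim N(0, 1)$, and flows are recovered as $Q_{t} = \exp(y_{t}) + \hat{\tau}_{m}$.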
Note: Thomas-Fiering is a univariate method, so only single-site generation is supported. For multisite, use KirschNowakPipeline instead.
Data is not passed at construction time. Use preprocessing(Q_obs) or fit(Q_obs) to supply observed flow data.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_neighbors | int | Number of KNN neighbors for disaggregation (Nowak parameter). | 5 |
| max_month_shift | int | Maximum day shift for monthly profiles (Nowak parameter). | 7 |
| name | str | Name for this pipeline instance. | None |
| debug | bool | Enable debug logging. | False |
Examples:
>>> import pandas as pd
>>> from synhydro.pipelines import ThomasFieringNowakPipeline
>>>
>>> # Load daily historic flows (single site)
>>> Q_daily = pd.read_csv('daily_flows.csv', index_col=0, parse_dates=True)
>>>
>>> # Create pipeline (no data)
>>> pipeline = ThomasFieringNowakPipeline()
>>>
>>> # Fit and generate
>>> pipeline.preprocessing(Q_daily['site_1'])
>>> pipeline.fit()
>>> daily_ensemble = pipeline.generate(n_realizations=100, n_years=50)
Notes
This pipeline is equivalent to creating:
References
Thomas-Fiering: AR(1) with Stedinger-Taylor normalization.
Nowak disaggregator: KNN-based temporal disaggregation (Nowak et al., 2010).
Initialize the Thomas-Fiering-Nowak pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| n_neighbors | int | Number of KNN neighbors for Nowak. | 5 |
| max_month_shift | int | Day shift for Nowak monthly profiles. | 7 |
| name | str | Pipeline name. | None |
| debug | bool | Enable debug logging. | False |