
Disaggregators & Pipelines

Base Class

Disaggregator

Disaggregator(*, name: Optional[str] = None, debug: bool = False)

Bases: ABC

Abstract base class for all temporal disaggregation methods.

Disaggregators transform synthetic flows from one temporal resolution to a finer resolution (e.g., monthly to daily).

All disaggregator implementations should inherit from this class.

Initialize the disaggregator with algorithm configuration.

Subclasses add algorithm-specific keyword-only parameters before name and debug. Data is not passed to the constructor; supply it through fit(Q_obs) or preprocessing(Q_obs) instead.

Parameters:

Name Type Description Default
name str

Name identifier for this disaggregator instance.

None
debug bool

Enable debug logging.

False
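The keyword-only constructor convention can be sketched with plain `abc`. `BaseDisaggregatorSketch` and `MyDisaggregator` below are hypothetical stand-ins, not the library's classes. The sketch only shows that algorithm options come before `name` and `debug`, that no data is accepted, and that positional arguments are rejected.

```python
from abc import ABC
from typing import Optional


class BaseDisaggregatorSketch(ABC):
    """Hypothetical stand-in for Disaggregator: configuration only, no data."""

    def __init__(self, *, name: Optional[str] = None, debug: bool = False):
        self.name = name or type(self).__name__
        self.debug = debug
        # State flags start False; preprocessing()/fit() would set them later.
        self._is_preprocessed = False
        self._is_fitted = False


class MyDisaggregator(BaseDisaggregatorSketch):
    # Algorithm-specific options first, then the shared name/debug options.
    # The leading "*" makes every argument keyword-only.
    def __init__(self, *, n_neighbors: int = 5,
                 name: Optional[str] = None, debug: bool = False):
        super().__init__(name=name, debug=debug)
        self.n_neighbors = n_neighbors


d = MyDisaggregator(n_neighbors=3, name="demo")
```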

is_fitted property

is_fitted: bool

Check if disaggregator is fitted.

is_preprocessed property

is_preprocessed: bool

Check if preprocessing is complete.

n_sites property

n_sites: int

Number of sites in the disaggregator.

Returns:

Type Description
int

Number of sites.

Raises:

Type Description
ValueError

If preprocessing not yet run.

sites property

sites: List[str]

List of site names.

Returns:

Type Description
List[str]

Site identifiers.

Raises:

Type Description
ValueError

If preprocessing not yet run.

input_frequency abstractmethod property

input_frequency: str

Expected temporal frequency of input ensemble.

Returns:

Type Description
str

Pandas frequency string (e.g., 'MS' for monthly, 'W' for weekly).

output_frequency abstractmethod property

output_frequency: str

Temporal frequency of disaggregated output.

Returns:

Type Description
str

Pandas frequency string (e.g., 'D' for daily, 'H' for hourly).

validate_input_data

validate_input_data(data: Union[Series, DataFrame]) -> pd.DataFrame

Validate and standardize input data format.

Checks type, DatetimeIndex, NaN content, and negative values.

Parameters:

Name Type Description Default
data Series or DataFrame

Input time series data

required

Returns:

Type Description
DataFrame

Validated and standardized data

Raises:

Type Description
ValueError

If data format is invalid

TypeError

If data type is unsupported
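A hedged sketch of the four checks listed above (type, DatetimeIndex, NaN content, negative values), written as a free function. Promoting a Series to a one-column DataFrame and the fallback column name `site_1` are illustrative assumptions, not confirmed library behaviour.

```python
import pandas as pd


def validate_input_data_sketch(data):
    """Hypothetical re-implementation of the documented input checks."""
    # Type check: only pandas Series/DataFrame are accepted.
    if isinstance(data, pd.Series):
        # Promote a single series to a one-column DataFrame (assumed behaviour).
        name = data.name if data.name is not None else "site_1"
        data = data.to_frame(name=name)
    elif not isinstance(data, pd.DataFrame):
        raise TypeError(f"Expected Series or DataFrame, got {type(data).__name__}")
    # Index check: a DatetimeIndex is needed for date-based grouping.
    if not isinstance(data.index, pd.DatetimeIndex):
        raise ValueError("Input must have a DatetimeIndex")
    # Content checks: no missing values and no negative flows.
    if data.isna().any().any():
        raise ValueError("Input contains NaN values")
    if (data < 0).any().any():
        raise ValueError("Input contains negative flows")
    return data
```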

validate_preprocessing

validate_preprocessing() -> None

Check if preprocessing has been completed.

Raises:

Type Description
ValueError

If preprocessing() has not been run.

validate_fit

validate_fit() -> None

Check if disaggregator has been fitted.

Raises:

Type Description
ValueError

If fit() has not been run.

update_state

update_state(preprocessed: Optional[bool] = None, fitted: Optional[bool] = None) -> None

Update disaggregator state flags.

Parameters:

Name Type Description Default
preprocessed bool

Set preprocessing state.

None
fitted bool

Set fitted state.

None

validate_input_ensemble

validate_input_ensemble(ensemble: Ensemble) -> None

Validate that input ensemble is compatible with disaggregator.

Checks temporal frequency and site consistency.

Parameters:

Name Type Description Default
ensemble Ensemble

Input ensemble to validate

required

Raises:

Type Description
ValueError

If ensemble is incompatible with disaggregator
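A sketch of the frequency and site checks, assuming the ensemble can be viewed as a mapping from realization number to a DataFrame of sites. That mapping is a hypothetical stand-in; the real `Ensemble` interface is not described on this page.

```python
import pandas as pd


def validate_input_ensemble_sketch(realizations, expected_freq, fitted_sites):
    """Check frequency and sites; `realizations` is a hypothetical
    dict[int, DataFrame] stand-in for an Ensemble."""
    for key, df in realizations.items():
        # infer_freq raises ValueError itself when given fewer than 3 timestamps.
        freq = pd.infer_freq(df.index)
        if freq != expected_freq:
            raise ValueError(
                f"Realization {key}: frequency {freq!r}, expected {expected_freq!r}")
        # Every site seen during fitting must be present in the input.
        missing = set(fitted_sites) - set(df.columns)
        if missing:
            raise ValueError(f"Realization {key}: missing sites {sorted(missing)}")
```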

get_params

get_params(deep: bool = True) -> Dict[str, Any]

Get initialization parameters (scikit-learn style).

Returns only constructor/configuration parameters, not fitted values.

Parameters:

Name Type Description Default
deep bool

If True, return deep copy of parameters.

True

Returns:

Type Description
Dict[str, Any]

Dictionary of initialization parameters.
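Scikit-learn derives `get_params` from the constructor signature. The mixin below is a hypothetical sketch of that idea, assuming each keyword-only constructor argument is stored as an attribute of the same name; `deep=True` is read, as in the table above, as "return a deep copy".

```python
import copy
import inspect


class ParamsMixinSketch:
    """Hypothetical mixin that reads constructor arguments back from attributes."""

    def get_params(self, deep=True):
        # Collect keyword-only constructor parameters (the convention above).
        sig = inspect.signature(type(self).__init__)
        names = [p.name for p in sig.parameters.values()
                 if p.kind is inspect.Parameter.KEYWORD_ONLY]
        params = {n: getattr(self, n) for n in names}
        return copy.deepcopy(params) if deep else params


class Example(ParamsMixinSketch):
    def __init__(self, *, n_neighbors=5, name=None, debug=False):
        self.n_neighbors = n_neighbors
        self.name = name
        self.debug = debug
        self.weights_ = [0.5, 0.5]  # fitted value, not a constructor parameter
```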

get_fitted_params

get_fitted_params() -> Dict[str, Any]

Get parameters learned from data during fit().

Returns:

Type Description
Dict[str, Any]

Dictionary of fitted parameters (all keys end with underscore).

Raises:

Type Description
ValueError

If disaggregator has not been fitted yet.
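The trailing-underscore convention makes fitted values easy to collect. A minimal sketch, assuming a private `_is_fitted` flag (a hypothetical name) backs the `is_fitted` property:

```python
def get_fitted_params_sketch(obj):
    """Hypothetical: return public attributes whose names end with '_'."""
    if not getattr(obj, "_is_fitted", False):
        raise ValueError("Disaggregator has not been fitted yet.")
    return {k: v for k, v in vars(obj).items()
            if k.endswith("_") and not k.startswith("_")}


class Demo:
    def __init__(self):
        self.n_neighbors = 5                       # configuration: excluded
        self.monthly_profiles_ = {1: [0.2, 0.8]}   # learned from data: included
        self._is_fitted = True                     # private state flag: excluded
```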

summary

summary(show_fitted: bool = True) -> str

Generate comprehensive summary of disaggregator configuration and fit.

Parameters:

Name Type Description Default
show_fitted bool

Whether to include fitted parameters in summary.

True

Returns:

Type Description
str

Formatted summary string.

save

save(filepath: str) -> None

Save fitted disaggregator to file using pickle.

Parameters:

Name Type Description Default
filepath str

Path to save the disaggregator.

required

Raises:

Type Description
ValueError

If disaggregator is not fitted.

load classmethod

load(filepath: str) -> Disaggregator

Load fitted disaggregator from file.

Parameters:

Name Type Description Default
filepath str

Path to saved disaggregator file.

required

Returns:

Type Description
Disaggregator

Loaded disaggregator instance.
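Because `save` uses pickle, a round trip looks roughly like the sketch below. `PicklableSketch` is a hypothetical class standing in for a fitted disaggregator, and the `isinstance` check in `load` is an illustrative safeguard rather than documented behaviour. As with any pickle file, only load files from sources you trust.

```python
import os
import pickle
import tempfile


class PicklableSketch:
    """Hypothetical object with save/load mirroring the API above."""

    def __init__(self):
        self.is_fitted = False

    def save(self, filepath):
        if not self.is_fitted:
            raise ValueError("Cannot save an unfitted disaggregator.")
        with open(filepath, "wb") as fh:
            pickle.dump(self, fh)

    @classmethod
    def load(cls, filepath):
        # pickle can execute arbitrary code: never load untrusted files.
        with open(filepath, "rb") as fh:
            obj = pickle.load(fh)
        if not isinstance(obj, cls):
            raise TypeError(f"File does not contain a {cls.__name__}")
        return obj


model = PicklableSketch()
model.n_neighbors = 7
model.is_fitted = True
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "disaggregator.pkl")
    model.save(path)
    restored = PicklableSketch.load(path)
```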

preprocessing abstractmethod

preprocessing(Q_obs: Union[Series, DataFrame], *, sites: Optional[List[str]] = None, **kwargs: Any) -> None

Preprocess and validate observed flow data.

Parameters:

Name Type Description Default
Q_obs Series or DataFrame

Observed historical flow data at the output resolution.

required
sites list of str

Sites to use. If None, uses all columns.

None
**kwargs Any

Additional preprocessing parameters.

{}

fit abstractmethod

fit(Q_obs: Optional[Union[Series, DataFrame]] = None, *, sites: Optional[List[str]] = None, **kwargs: Any) -> None

Fit the disaggregator to observed flow data.

If Q_obs is provided, preprocessing() is called automatically. If omitted, a prior call to preprocessing() is required.

Parameters:

Name Type Description Default
Q_obs Series or DataFrame

Observed data. If provided, runs preprocessing automatically.

None
sites list of str

Sites to use (only when Q_obs is provided).

None
**kwargs Any

Additional fitting parameters.

{}
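The optional-`Q_obs` dispatch can be sketched as follows. `LifecycleSketch` is hypothetical and learns nothing; it only shows when preprocessing runs automatically and when `ValueError` is raised.

```python
class LifecycleSketch:
    """Hypothetical illustration of the preprocessing/fit contract."""

    def __init__(self):
        self.is_preprocessed = False
        self.is_fitted = False

    def preprocessing(self, Q_obs, *, sites=None):
        # Keep only the requested DataFrame columns (all when sites is None).
        self.Q_obs = Q_obs if sites is None else Q_obs[sites]
        self.is_preprocessed = True

    def fit(self, Q_obs=None, *, sites=None):
        if Q_obs is not None:
            self.preprocessing(Q_obs, sites=sites)  # automatic preprocessing
        elif not self.is_preprocessed:
            raise ValueError("Call preprocessing(Q_obs) first or pass Q_obs to fit().")
        # A real disaggregator would learn its parameters from self.Q_obs here.
        self.is_fitted = True
```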

disaggregate abstractmethod

disaggregate(ensemble: Ensemble, **kwargs: Any) -> Ensemble

Disaggregate synthetic flows from coarser to finer temporal resolution.

Implementations should:

1. Call validate_fit() at the start.
2. Call validate_input_ensemble() to check compatibility.
3. Disaggregate each realization in the ensemble.
4. Return a new Ensemble at the finer temporal resolution.
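The four implementation steps can be illustrated with a deliberately simple, hypothetical disaggregator that spreads each monthly total evenly over the days of that month. It is a sketch only: realizations are held in a plain dict of DataFrames instead of an `Ensemble`, and uniform splitting stands in for a real method such as Nowak's.

```python
import pandas as pd


class UniformDisaggregatorSketch:
    input_frequency = "MS"
    output_frequency = "D"

    def __init__(self):
        self.is_fitted = True  # nothing to learn in this toy example

    def validate_fit(self):
        if not self.is_fitted:
            raise ValueError("fit() has not been run.")

    def validate_input_ensemble(self, realizations):
        for key, df in realizations.items():
            if pd.infer_freq(df.index) != self.input_frequency:
                raise ValueError(f"Realization {key} is not {self.input_frequency!r}")

    @staticmethod
    def _split(monthly):
        pieces = []
        for start, row in monthly.iterrows():
            # Every day from the month start through the last day of that month.
            days = pd.date_range(start, start + pd.offsets.MonthEnd(0), freq="D")
            pieces.append(pd.DataFrame([row / len(days)] * len(days), index=days))
        return pd.concat(pieces)

    def disaggregate(self, realizations):
        self.validate_fit()                          # 1. check fitted state
        self.validate_input_ensemble(realizations)   # 2. check compatibility
        # 3. disaggregate every realization; 4. return a new container.
        return {key: self._split(df) for key, df in realizations.items()}


monthly = pd.DataFrame({"A": [31.0, 28.0, 31.0]},
                       index=pd.date_range("2001-01-01", periods=3, freq="MS"))
daily = UniformDisaggregatorSketch().disaggregate({0: monthly})
```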

Parameters:

Name Type Description Default
ensemble Ensemble

Input ensemble at coarser temporal resolution

required
**kwargs Any

Additional disaggregation parameters.

{}

Returns:

Type Description
Ensemble

Disaggregated ensemble at finer temporal resolution


NowakDisaggregator

NowakDisaggregator(*, n_neighbors: int = 5, max_month_shift: int = 7, blend_days: int = 2, name: str = None, debug: bool = False)

Bases: Disaggregator

Temporal disaggregation from monthly to daily as described in Nowak et al. (2010).

Supports both single-site and multisite disaggregation from monthly to daily streamflows.

For each month in the synthetic data, it finds the K (n_neighbors) historic monthly flow profiles whose total flow at the index gauge (the sum over all sites) is closest to the synthetic monthly total.

It then randomly selects one of these K profiles and uses the daily flow proportions from that historic month to disaggregate the synthetic monthly flow at all sites.

When disaggregating a month, only historic profiles from the same calendar month are considered, including windows shifted by up to ±max_month_shift days around that month.
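The procedure can be sketched for a single synthetic month with NumPy. This is an illustrative simplification, not the library's implementation: all candidate profiles are assumed to have the same number of days as the target month, one neighbour is drawn uniformly from the K candidates (the library defaults to `sample_method='distance_weighted'`), and a site with zero historic flow falls back to equal daily proportions.

```python
import numpy as np


def nowak_disaggregate_month(target_totals, profiles, k=5, rng=None):
    """Sketch of one Nowak-style step for one synthetic month.

    target_totals : (n_sites,) synthetic monthly totals for each site.
    profiles      : (n_candidates, n_days, n_sites) historic daily flows for
                    candidate windows of the same calendar month.
    """
    rng = np.random.default_rng(rng)
    # Index gauge = sum over all sites, for historic and synthetic months.
    hist_index = profiles.sum(axis=(1, 2))
    target_index = target_totals.sum()
    # K nearest historic months by absolute difference in index-gauge total.
    k = min(k, len(hist_index))
    nearest = np.argsort(np.abs(hist_index - target_index))[:k]
    # Pick one neighbour (uniformly here, for brevity).
    chosen = profiles[rng.choice(nearest)]
    # Daily proportions of each site's monthly total in the chosen month.
    site_totals = chosen.sum(axis=0)
    with np.errstate(divide="ignore", invalid="ignore"):
        props = np.where(site_totals > 0, chosen / site_totals, 1.0 / chosen.shape[0])
    # Scale so each site's daily flows sum to its synthetic monthly total.
    return props * target_totals
```

Because the daily values are proportions times the synthetic total, each site's daily flows always sum back to its monthly value, which is what preserves consistency with the generated monthly series.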

References

Nowak, K., Prairie, J., Rajagopalan, B., & Lall, U. (2010). A nonparametric stochastic approach for multisite disaggregation of annual to daily streamflow. Water Resources Research, 46(8).

Initialize the Nowak Disaggregator.

Supports both single-site (Series) and multisite (DataFrame) disaggregation.

Parameters:

Name Type Description Default
n_neighbors int

Number of K-nearest neighbors to consider for disaggregation.

5
max_month_shift int

Maximum number of days to shift around each month center when creating historic monthly flow profiles.

7
blend_days int or None

Number of days on each side of month boundaries to smooth. Reduces artificial discontinuities at month transitions. Set to None or 0 to disable boundary smoothing.

2
name str

Name for this disaggregator instance.

None
debug bool

Enable debug logging.

False
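One reading of `max_month_shift` is that each historic occurrence of the target calendar month contributes up to `2 * max_month_shift + 1` candidate windows, each as long as the month and shifted by a whole number of days. The helper below is a hypothetical sketch under that reading; how the library handles record edges and month-length differences is not specified on this page.

```python
import pandas as pd


def shifted_windows(daily_index, month, max_month_shift=7):
    """Hypothetical: candidate (start, end) windows for one calendar month."""
    windows = []
    month_starts = pd.date_range(daily_index[0], daily_index[-1], freq="MS")
    for start in month_starts[month_starts.month == month]:
        n_days = start.days_in_month
        for shift in range(-max_month_shift, max_month_shift + 1):
            w_start = start + pd.Timedelta(days=shift)
            w_end = w_start + pd.Timedelta(days=n_days - 1)
            # Keep only windows fully covered by the observed record.
            if w_start >= daily_index[0] and w_end <= daily_index[-1]:
                windows.append((w_start, w_end))
    return windows
```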

input_frequency property

input_frequency: str

Nowak disaggregator expects monthly input.

output_frequency property

output_frequency: str

Nowak disaggregator produces daily output.

preprocessing

preprocessing(Q_obs, *, sites=None, **kwargs)

Preprocess observed daily flow data.

Validates input data and detects single-site vs multisite configuration.

Parameters:

Name Type Description Default
Q_obs Series or DataFrame

Daily streamflow data for the historic period. Must have DatetimeIndex with daily frequency. If DataFrame, columns represent different sites.

required
sites list of str

Sites to use. If None, uses all columns.

None
**kwargs

Additional preprocessing parameters (currently unused).

{}

fit

fit(Q_obs=None, *, sites=None, **kwargs)

Fit the Nowak Disaggregator to the data.

Builds a set of candidate monthly flow profiles for each calendar month and trains a KNN model for each month.

If Q_obs is provided, preprocessing() is called automatically. If omitted, a prior call to preprocessing() is required.

Parameters:

Name Type Description Default
Q_obs Series or DataFrame

Observed data. If provided, runs preprocessing automatically.

None
sites list of str

Sites to use (only when Q_obs is provided).

None
**kwargs

Additional fitting parameters (currently unused).

{}

disaggregate

disaggregate(ensemble: Ensemble, n_neighbors=None, sample_method='distance_weighted', seed=None, **kwargs) -> Ensemble

Disaggregate monthly ensemble to daily flows using the Nowak method.

Parameters:

Name Type Description Default
ensemble Ensemble

Monthly streamflow ensemble to disaggregate. Must have frequency 'MS' (month start).

required
n_neighbors int

Number of neighbors to use for disaggregation. If None, uses the value from initialization.

None
sample_method str

Method to use for sampling the K nearest neighbors.

'distance_weighted'
seed int

Random seed for reproducibility.

None
**kwargs

Additional disaggregation parameters.

{}

Returns:

Type Description
Ensemble

Disaggregated daily streamflow ensemble.

find_knn_indices

find_knn_indices(Qs_monthly_array, month, n_neighbors=None)

Given cumulative monthly flow values, find the K nearest neighbors from the historic dataset.

Parameters:

Name Type Description Default
Qs_monthly_array array

The cumulative monthly flow values for the month to disaggregate.

required
month int

The calendar month which is being disaggregated (1-12).

required
n_neighbors int

The number of neighbors to find.

None

Returns:

Name Type Description
distances array

The distances to the K nearest neighbors.

indices array

The indices of the K nearest neighbors in the historic dataset.

sample_knn_monthly_flows

sample_knn_monthly_flows(Qs_monthly_array, month, n_neighbors=None, sample_method='distance_weighted', *, rng=None)

Given cumulative monthly flow values, sample K nearest neighbors from the historic dataset.

Parameters:

Name Type Description Default
Qs_monthly_array array

The cumulative monthly flow values for the month to disaggregate.

required
month int

The calendar month which is being disaggregated (1-12).

required
n_neighbors int

The number of neighbors to sample.

None
sample_method str

The sampling method to use.

'distance_weighted'

Returns:

Name Type Description
sampled_indices array

The sampled indices from the historic dataset.
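A hedged sketch of what distance-weighted sampling could look like, assuming selection probabilities proportional to inverse distance. This page does not state the library's exact weighting (inverse distance, a rank-based kernel, or something else), so treat the weights here as an assumption.

```python
import numpy as np


def sample_neighbor(distances, indices, rng=None, eps=1e-12):
    """Pick one neighbour index with probability proportional to 1/distance."""
    rng = np.random.default_rng(rng)
    distances = np.asarray(distances, dtype=float)
    # eps avoids division by zero when a neighbour matches exactly.
    weights = 1.0 / (distances + eps)
    return rng.choice(np.asarray(indices), p=weights / weights.sum())
```

Closer historic months are favoured, while farther ones keep a nonzero chance of selection, which preserves some variability across realizations.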

disaggregate_monthly_flows

disaggregate_monthly_flows(Qs_monthly, n_neighbors=None, sample_method='distance_weighted')

Disaggregate monthly to daily flows using the Nowak method.

Parameters:

Name Type Description Default
Qs_monthly Series or DataFrame

Monthly streamflow data for the synthetic period. The index should be a datetime index. For multisite, should be DataFrame with same columns as historic data.

required
n_neighbors int

The number of neighbors to use for disaggregation.

None
sample_method str

The method to use for sampling the K nearest neighbors.

'distance_weighted'

Returns:

Name Type Description
Qs_daily Series or DataFrame

Daily streamflow data for the synthetic period. The index will be a datetime index. For multisite, returns DataFrame with same columns as input.


Pipelines

GeneratorDisaggregatorPipeline

GeneratorDisaggregatorPipeline(generator: Generator, disaggregator: Disaggregator, name: Optional[str] = None, debug: bool = False)

Pipeline for composing a generator with a disaggregator.

This class orchestrates the flow from generation to disaggregation, ensuring compatibility between components and managing the complete workflow.

Data is not passed at construction time. Use preprocessing(Q_obs) or fit(Q_obs) to supply observed flow data.

Parameters:

Name Type Description Default
generator Generator

An unfitted generator instance (constructed without data).

required
disaggregator Disaggregator

An unfitted disaggregator instance (constructed without data).

required
name str

Name for this pipeline instance.

None
debug bool

Enable debug logging.

False

Examples:

>>> import pandas as pd
>>> from synhydro.methods.generation.nonparametric.kirsch import KirschGenerator
>>> from synhydro.methods.disaggregation.temporal.nowak import NowakDisaggregator
>>> from synhydro.core.pipeline import GeneratorDisaggregatorPipeline
>>>
>>> # Load daily historic flows
>>> Q_daily = pd.read_csv('daily_flows.csv', index_col=0, parse_dates=True)
>>>
>>> # Create components (no data)
>>> generator = KirschGenerator()
>>> disaggregator = NowakDisaggregator()
>>>
>>> # Create pipeline
>>> pipeline = GeneratorDisaggregatorPipeline(generator, disaggregator)
>>>
>>> # Fit and generate
>>> pipeline.preprocessing(Q_daily)
>>> pipeline.fit()
>>> daily_ensemble = pipeline.generate(n_realizations=10, n_years=50)

Initialize the pipeline with generator and disaggregator components.

Parameters:

Name Type Description Default
generator Generator

Generator instance for producing synthetic flows.

required
disaggregator Disaggregator

Disaggregator instance for temporal disaggregation.

required
name str

Name for this pipeline.

None
debug bool

Enable debug logging.

False

Raises:

Type Description
TypeError

If components are not proper Generator/Disaggregator instances.

is_preprocessed property

is_preprocessed: bool

Check if both components are preprocessed.

Returns:

Type Description
bool

True if both generator and disaggregator are preprocessed.

is_fitted property

is_fitted: bool

Check if both components are fitted.

Returns:

Type Description
bool

True if both generator and disaggregator are fitted.

output_frequency property

output_frequency: str

Get the final output frequency of the pipeline.

Returns:

Type Description
str

Pandas frequency string (e.g., 'D' for daily).

validate_compatibility

validate_compatibility() -> None

Validate that generator and disaggregator are compatible.

Checks that the generator's output frequency matches the disaggregator's input frequency.

Raises:

Type Description
ValueError

If frequencies are incompatible.
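A sketch of the frequency check using pandas offsets, so that equivalent aliases such as `'D'` and `'1D'` compare equal. The function names are hypothetical, and the pipeline's own check may simply compare the strings.

```python
from pandas.tseries.frequencies import to_offset


def frequencies_compatible(generator_output: str, disaggregator_input: str) -> bool:
    """True when both aliases denote the same pandas offset."""
    return to_offset(generator_output) == to_offset(disaggregator_input)


def validate_compatibility_sketch(generator_output: str, disaggregator_input: str) -> None:
    # Mirror the documented contract: raise ValueError on a mismatch.
    if not frequencies_compatible(generator_output, disaggregator_input):
        raise ValueError(
            f"Generator output frequency {generator_output!r} does not match "
            f"disaggregator input frequency {disaggregator_input!r}"
        )
```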

preprocessing

preprocessing(Q_obs: Union[Series, DataFrame], *, sites: Optional[List[str]] = None, **kwargs) -> None

Preprocess both generator and disaggregator.

Passes Q_obs to both components' preprocessing() methods in sequence.

Parameters:

Name Type Description Default
Q_obs Series or DataFrame

Observed historical flow data.

required
sites list of str

Sites to use. If None, uses all columns.

None
**kwargs

Additional preprocessing parameters passed to both components.

{}

fit

fit(Q_obs: Optional[Union[Series, DataFrame]] = None, *, sites: Optional[List[str]] = None, **kwargs) -> None

Fit both generator and disaggregator.

If Q_obs is provided, preprocessing() is called automatically before fitting. If omitted, a prior call to preprocessing() is required.

After preprocessing, validates frequency compatibility between generator and disaggregator before fitting.

Parameters:

Name Type Description Default
Q_obs Series or DataFrame

Observed data. If provided, runs preprocessing automatically.

None
sites list of str

Sites to use (only when Q_obs is provided).

None
**kwargs

Additional fitting parameters passed to both components.

{}

Raises:

Type Description
ValueError

If preprocessing has not been completed and Q_obs is not provided.

generate

generate(n_realizations: int = 1, n_years: Optional[int] = None, n_timesteps: Optional[int] = None, seed: Optional[int] = None, **kwargs) -> Ensemble

Generate and disaggregate synthetic flows through the pipeline.

This method orchestrates the complete workflow:

1. Generate monthly (or other coarse-resolution) synthetic flows using the generator.
2. Disaggregate them to a finer temporal resolution using the disaggregator.
3. Return the final ensemble.
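The three-step workflow can be sketched with duck-typed stand-ins. `pipeline_generate`, `StubGenerator`, and `StubDisaggregator` are hypothetical, realizations are plain dicts of lists instead of `Ensemble` objects, and forwarding the same seed to both stages is an assumption about how the pipeline handles randomness.

```python
def pipeline_generate(generator, disaggregator, n_realizations=1,
                      n_years=None, n_timesteps=None, seed=None):
    """Hypothetical orchestration of generate, then disaggregate."""
    if not (generator.is_fitted and disaggregator.is_fitted):
        raise ValueError("Pipeline has not been fitted.")
    # 1. Coarse-resolution synthetic flows.
    coarse = generator.generate(n_realizations=n_realizations, n_years=n_years,
                                n_timesteps=n_timesteps, seed=seed)
    # 2. Disaggregate to the finer resolution (seed forwarded: an assumption).
    fine = disaggregator.disaggregate(coarse, seed=seed)
    # 3. Return the final ensemble.
    return fine


class StubGenerator:
    """Hypothetical generator: constant 30.0 per month for every realization."""
    is_fitted = True

    def generate(self, n_realizations=1, n_years=None, n_timesteps=None, seed=None):
        n = n_timesteps if n_timesteps is not None else 12 * (n_years or 1)
        return {r: [30.0] * n for r in range(n_realizations)}


class StubDisaggregator:
    """Hypothetical disaggregator: splits each value into 30 equal parts."""
    is_fitted = True

    def disaggregate(self, ensemble, seed=None):
        return {r: [v / 30 for v in vals for _ in range(30)]
                for r, vals in ensemble.items()}


result = pipeline_generate(StubGenerator(), StubDisaggregator(),
                           n_realizations=2, n_years=1, seed=0)
```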

Parameters:

Name Type Description Default
n_realizations int

Number of synthetic realizations to generate.

1
n_years int

Number of years to generate.

None
n_timesteps int

Number of timesteps to generate explicitly.

None
seed int

Random seed for reproducibility.

None
**kwargs

Additional parameters passed to generator and disaggregator.

{}

Returns:

Type Description
Ensemble

Final disaggregated ensemble at the output frequency.

Raises:

Type Description
ValueError

If pipeline has not been fitted.

summary

summary() -> str

Generate a summary of the pipeline configuration and status.

Returns:

Type Description
str

Formatted summary string.

save

save(filepath: str) -> None

Save the entire pipeline to file.

Saves both the generator and disaggregator, preserving their fitted state.

Parameters:

Name Type Description Default
filepath str

Path to save the pipeline.

required

Raises:

Type Description
ValueError

If pipeline is not fitted.

load classmethod

load(filepath: str) -> GeneratorDisaggregatorPipeline

Load a pipeline from file.

Parameters:

Name Type Description Default
filepath str

Path to saved pipeline file.

required

Returns:

Type Description
GeneratorDisaggregatorPipeline

Loaded pipeline instance.


KirschNowakPipeline

KirschNowakPipeline(*, generate_using_log_flow: bool = True, matrix_repair_method: str = 'spectral', n_neighbors: int = 5, max_month_shift: int = 7, name: Optional[str] = None, debug: bool = False)

Bases: GeneratorDisaggregatorPipeline

Pre-configured pipeline combining Kirsch generator with Nowak disaggregator.

This pipeline generates monthly synthetic flows using the Kirsch nonparametric bootstrap method, then disaggregates them to daily flows using the Nowak KNN-based temporal disaggregation method.

Data is not passed at construction time. Use preprocessing(Q_obs) or fit(Q_obs) to supply observed flow data.

Parameters:

Name Type Description Default
generate_using_log_flow bool

Whether to generate in log-space (Kirsch parameter).

True
matrix_repair_method str

Method for repairing correlation matrices (Kirsch parameter).

'spectral'
n_neighbors int

Number of KNN neighbors for disaggregation (Nowak parameter).

5
max_month_shift int

Maximum day shift for monthly profiles (Nowak parameter).

7
name str

Name for this pipeline instance.

None
debug bool

Enable debug logging.

False
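The `'spectral'` repair option is not defined on this page. A common spectral approach, sketched below as an assumption rather than the Kirsch generator's actual code, clips negative eigenvalues of the correlation matrix to a small positive floor and rescales the result to a unit diagonal so that a Cholesky factorization succeeds.

```python
import numpy as np


def spectral_repair(corr, floor=1e-6):
    """Sketch: make a correlation matrix positive definite by eigenvalue clipping."""
    corr = np.asarray(corr, dtype=float)
    corr = (corr + corr.T) / 2.0                 # enforce exact symmetry
    vals, vecs = np.linalg.eigh(corr)
    vals = np.clip(vals, floor, None)            # raise negative eigenvalues to the floor
    repaired = (vecs * vals) @ vecs.T            # V diag(vals) V^T
    scale = np.sqrt(np.diag(repaired))
    return repaired / np.outer(scale, scale)     # restore unit diagonal


bad = np.array([[1.0, 0.9, -0.9],
                [0.9, 1.0, 0.9],
                [-0.9, 0.9, 1.0]])
fixed = spectral_repair(bad)
chol = np.linalg.cholesky(fixed)  # succeeds because fixed is positive definite
```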

Examples:

>>> import pandas as pd
>>> from synhydro.pipelines import KirschNowakPipeline
>>>
>>> # Load daily historic flows
>>> Q_daily = pd.read_csv('daily_flows.csv', index_col=0, parse_dates=True)
>>>
>>> # Create pipeline (no data)
>>> pipeline = KirschNowakPipeline()
>>>
>>> # Fit and generate
>>> pipeline.preprocessing(Q_daily)
>>> pipeline.fit()
>>> daily_ensemble = pipeline.generate(n_realizations=100, n_years=50)

Notes

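This pipeline is equivalent to the following composition, where Q_obs is the observed daily flow data:

from synhydro.methods.generation.nonparametric.kirsch import KirschGenerator
from synhydro.methods.disaggregation.temporal.nowak import NowakDisaggregator
from synhydro.core.pipeline import GeneratorDisaggregatorPipeline
generator = KirschGenerator(generate_using_log_flow=True, matrix_repair_method='spectral')
disaggregator = NowakDisaggregator(n_neighbors=5, max_month_shift=7)
pipeline = GeneratorDisaggregatorPipeline(generator, disaggregator)
pipeline.fit(Q_obs)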

References

Kirsch generator: Nonparametric bootstrap with correlation preservation.
Nowak disaggregator: KNN-based temporal disaggregation (Nowak et al., 2010).

Initialize the Kirsch-Nowak pipeline.

Parameters:

Name Type Description Default
generate_using_log_flow bool

Generate in log-space for Kirsch.

True
matrix_repair_method str

Correlation matrix repair method for Kirsch.

'spectral'
n_neighbors int

Number of KNN neighbors for Nowak.

5
max_month_shift int

Day shift for Nowak monthly profiles.

7
name str

Pipeline name.

None
debug bool

Enable debug logging.

False

ThomasFieringNowakPipeline

ThomasFieringNowakPipeline(*, n_neighbors: int = 5, max_month_shift: int = 7, name: Optional[str] = None, debug: bool = False)

Bases: GeneratorDisaggregatorPipeline

Pre-configured pipeline combining Thomas-Fiering generator with Nowak disaggregator.

This pipeline generates monthly synthetic flows using the Thomas-Fiering AR(1) parametric method with Stedinger-Taylor normalization, then disaggregates them to daily flows using the Nowak KNN-based temporal disaggregation method.

Note: Thomas-Fiering is a univariate method, so only single-site generation is supported. For multisite, use KirschNowakPipeline instead.
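The single-site recursion behind Thomas-Fiering can be sketched with NumPy. This is the classic month-to-month AR(1) form, not the library's code: it assumes monthly means, standard deviations, and lag-1 correlations are already estimated in the normalized space, and it omits the Stedinger-Taylor normalization and any back-transform.

```python
import numpy as np


def thomas_fiering(mu, sigma, rho, n_years, seed=None):
    """Sketch of the Thomas-Fiering monthly AR(1) recursion.

    mu, sigma : length-12 monthly means and standard deviations
                (already in the normalized space).
    rho       : length-12 lag-1 correlations, rho[j] linking month j to j + 1.
    """
    rng = np.random.default_rng(seed)
    mu, sigma, rho = (np.asarray(a, dtype=float) for a in (mu, sigma, rho))
    out = np.empty(12 * n_years)
    # Start from a random draw for the first month.
    out[0] = mu[0] + sigma[0] * rng.standard_normal()
    for t in range(1, out.size):
        j, k = (t - 1) % 12, t % 12            # previous and current month
        slope = rho[j] * sigma[k] / sigma[j]   # regression of month k on month j
        noise = sigma[k] * np.sqrt(1.0 - rho[j] ** 2) * rng.standard_normal()
        out[t] = mu[k] + slope * (out[t - 1] - mu[j]) + noise
    return out
```

The noise scale `sigma[k] * sqrt(1 - rho[j]**2)` is what keeps each month's variance at `sigma[k]**2`, so the generated series reproduces the monthly means, standard deviations, and lag-1 correlations.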

Data is not passed at construction time. Use preprocessing(Q_obs) or fit(Q_obs) to supply observed flow data.

Parameters:

Name Type Description Default
n_neighbors int

Number of KNN neighbors for disaggregation (Nowak parameter).

5
max_month_shift int

Maximum day shift for monthly profiles (Nowak parameter).

7
name str

Name for this pipeline instance.

None
debug bool

Enable debug logging.

False

Examples:

>>> import pandas as pd
>>> from synhydro.pipelines import ThomasFieringNowakPipeline
>>>
>>> # Load daily historic flows (single site)
>>> Q_daily = pd.read_csv('daily_flows.csv', index_col=0, parse_dates=True)
>>>
>>> # Create pipeline (no data)
>>> pipeline = ThomasFieringNowakPipeline()
>>>
>>> # Fit and generate
>>> pipeline.preprocessing(Q_daily['site_1'])
>>> pipeline.fit()
>>> daily_ensemble = pipeline.generate(n_realizations=100, n_years=50)

Notes

This pipeline is equivalent to creating:

generator = ThomasFieringGenerator()
disaggregator = NowakDisaggregator(n_neighbors=5)
pipeline = GeneratorDisaggregatorPipeline(generator, disaggregator)
pipeline.fit(Q_obs)

References

Thomas-Fiering: AR(1) with Stedinger-Taylor normalization.
Nowak disaggregator: KNN-based temporal disaggregation (Nowak et al., 2010).

Initialize the Thomas-Fiering-Nowak pipeline.

Parameters:

Name Type Description Default
n_neighbors int

Number of KNN neighbors for Nowak.

5
max_month_shift int

Day shift for Nowak monthly profiles.

7
name str

Pipeline name.

None
debug bool

Enable debug logging.

False