
Container Package (swimrs.container)

Zarr-backed .swim storage with ingest/compute/export/query components, schema definitions, and storage providers.

Core entry points

open_container(uri: str | Path, mode: str = 'r', **storage_kwargs: Any) -> SwimContainer

Open an existing SWIM container.

Auto-detects the storage format:
  - If the path is a file → ZipStore
  - If the path is a directory → DirectoryStore
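A minimal sketch of that local auto-detection rule. `detect_local_store` is a hypothetical helper, not part of swimrs; the real function may perform additional checks before choosing a store.

```python
from __future__ import annotations

from pathlib import Path


def detect_local_store(path: str | Path) -> str:
    """Hypothetical helper: choose a store type for an existing local container.

    A regular file is treated as a zip archive and a directory as a
    directory store; anything else is reported as missing.
    """
    p = Path(path)
    if p.is_file():
        return "zip"
    if p.is_dir():
        return "directory"
    raise FileNotFoundError(f"No container found at {p}")
```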

Parameters:

Name Type Description Default
uri str | Path

Location of the container:
  - Local path: "project.swim", "/path/to/project.zarr"
  - S3 URI: "s3://bucket/key" (requires s3fs)
  - GCS URI: "gs://bucket/key" (requires gcsfs)

required
mode str

'r' for read-only, 'r+' for read-write

'r'
**storage_kwargs Any

Additional arguments for cloud storage providers

{}

Returns:

Type Description
SwimContainer

SwimContainer instance

Example

```python
from swimrs.container import open_container

# Local file (auto-detects zip or directory)
container = open_container("project.swim", mode="r+")

# S3 storage
container = open_container(
    "s3://bucket/project.zarr",
    mode="r",
    aws_access_key_id="...",
    aws_secret_access_key="...",
)
```

create_container(uri: str | Path, fields_shapefile: str | Path, uid_column: str, start_date: str | datetime, end_date: str | datetime, project_name: str = None, overwrite: bool = False, storage: str = 'auto', **storage_kwargs: Any) -> SwimContainer

Create a new SWIM container from a shapefile.

Parameters:

Name Type Description Default
uri str | Path

Location for the new container:
  - Local path: "project.swim" (the default .swim extension is added)
  - S3 URI: "s3://bucket/project.zarr"
  - GCS URI: "gs://bucket/project.zarr"

required
fields_shapefile str | Path

Path to shapefile with field geometries

required
uid_column str

Column name containing unique field identifiers

required
start_date str | datetime

Start of analysis period

required
end_date str | datetime

End of analysis period

required
project_name str

Optional project name

None
overwrite bool

If True, overwrite existing container

False
storage str

Storage backend for local paths:
  - "auto" (default): directory store (better for development)
  - "directory": explicit directory store
  - "zip": explicit zip store (better for sharing)

'auto'
**storage_kwargs Any

Additional arguments for cloud storage providers

{}

Returns:

Type Description
SwimContainer

New SwimContainer instance

Example

```python
from swimrs.container import create_container

# Local directory store (default)
container = create_container("project.swim", fields_shapefile="fields.shp", uid_column="FID",
                             start_date="2017-01-01", end_date="2023-12-31")

# Explicit zip store for sharing
container = create_container("project.swim", fields_shapefile="fields.shp", uid_column="FID",
                             start_date="2017-01-01", end_date="2023-12-31", storage="zip")

# S3 storage
container = create_container("s3://bucket/project.zarr", fields_shapefile="fields.shp",
                             uid_column="FID", start_date="2017-01-01", end_date="2023-12-31",
                             aws_access_key_id="...", aws_secret_access_key="...")
```

SwimContainer

Unified data container for SWIM-RS projects.

Stores all project data in a Zarr archive, including:
  - Field geometries (from shapefile)
  - Remote sensing data (NDVI, ETf from Landsat/Sentinel/ECOSTRESS)
  - Meteorology (GridMET, ERA5)
  - Static properties (soils, land cover, irrigation masks)
  - Snow data (SNODAS)
  - Derived products (dynamics, merged NDVI)

Provides full provenance tracking and observability into data completeness.

Storage backends are pluggable:
  - Local .swim zip files (ZipStoreProvider): better for sharing
  - Local directories (DirectoryStoreProvider): faster for development; the default for new local containers (storage="auto")
  - S3 buckets (S3StoreProvider): cloud storage
  - GCS buckets (GCSStoreProvider): cloud storage
  - Memory (MemoryStoreProvider): for testing

Component-based API:
  - container.ingest: data ingestion operations
  - container.compute: derived data computation
  - container.export: data export operations
  - container.query: data access and status queries

Example

```python
from swimrs.container import SwimContainer

# Create a new container
container = SwimContainer.create("project.swim", fields_shapefile="fields.shp", uid_column="FID",
                                 start_date="2016-01-01", end_date="2023-12-31",
                                 project_name="My Project")

# Ingest data via the component API
container.ingest.ndvi("path/to/csvs/", instrument="landsat", mask="irr")
container.ingest.gridmet("path/to/met/")

# Compute derived products
container.compute.dynamics(etf_model="ssebop")

# Save and close
container.save()
container.close()

# Open an existing container from a URI (auto-selects backend)
container = SwimContainer.open("s3://bucket/project.zarr", mode="r")
```

create(uri: str | Path, fields_shapefile: str | Path, uid_column: str, start_date: str | datetime, end_date: str | datetime, project_name: str = None, overwrite: bool = False, storage: str = 'auto', **storage_kwargs: Any) -> SwimContainer classmethod

Create a new SwimContainer from a shapefile.

Parameters:

Name Type Description Default
uri str | Path

Location for the new container:
  - Local path: "project.swim" (the default .swim extension is added)
  - S3 URI: "s3://bucket/project.zarr"
  - GCS URI: "gs://bucket/project.zarr"

required
fields_shapefile str | Path

Path to shapefile with field geometries

required
uid_column str

Column name containing unique field identifiers

required
start_date str | datetime

Start of analysis period

required
end_date str | datetime

End of analysis period

required
project_name str

Optional project name

None
overwrite bool

If True, overwrite existing storage

False
storage str

Storage backend for local paths:
  - "auto" (default): directory store (better for development)
  - "directory": explicit directory store
  - "zip": explicit zip store (better for sharing)

'auto'
**storage_kwargs Any

Additional arguments passed to storage provider

{}

Returns:

Type Description
SwimContainer

New SwimContainer instance

Example

```python
from swimrs.container import SwimContainer

# Local directory store (default)
container = SwimContainer.create("project.swim", fields_shapefile="fields.shp", uid_column="FID",
                                 start_date="2016-01-01", end_date="2023-12-31")

# Explicit zip store for sharing
container = SwimContainer.create("project.swim", fields_shapefile="fields.shp", uid_column="FID",
                                 start_date="2016-01-01", end_date="2023-12-31", storage="zip")

# S3
container = SwimContainer.create("s3://bucket/project.zarr", fields_shapefile="fields.shp",
                                 uid_column="FID", start_date="2016-01-01", end_date="2023-12-31",
                                 aws_access_key_id="...", aws_secret_access_key="...")
```

open(uri: str | Path, mode: str = 'r', **kwargs: Any) -> SwimContainer classmethod

Open an existing container from a URI.

This is the preferred way to open containers, especially for cloud storage. Automatically selects the appropriate storage backend based on the URI.
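The URI-based backend selection can be pictured as a scheme lookup. The helper `select_provider` below is hypothetical: it only returns provider class names taken from the SwimContainer docstring, and the actual dispatch logic (including how `**kwargs` are forwarded to each provider) may differ.

```python
from __future__ import annotations

from pathlib import Path
from urllib.parse import urlparse

# Illustrative mapping only; the provider names come from the class docstring.
_SCHEME_TO_PROVIDER = {
    "s3": "S3StoreProvider",
    "gs": "GCSStoreProvider",
}


def select_provider(uri: str | Path) -> str:
    """Return the provider name a URI would map to under this sketch."""
    uri_str = str(uri)
    parsed = urlparse(uri_str)
    if parsed.scheme in _SCHEME_TO_PROVIDER:
        return _SCHEME_TO_PROVIDER[parsed.scheme]
    # "file://" URIs and bare paths are both treated as local storage.
    local = Path(parsed.path) if parsed.scheme == "file" else Path(uri_str)
    return "ZipStoreProvider" if local.is_file() else "DirectoryStoreProvider"
```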

Parameters:

Name Type Description Default
uri str | Path

Location of the container:
  - Local path: "project.swim", "/path/to/project.zarr"
  - File URI: "file:///path/to/project.swim"
  - S3 URI: "s3://bucket/key" (requires s3fs)
  - GCS URI: "gs://bucket/key" (requires gcsfs)

required
mode str

Access mode ('r', 'r+', 'a')

'r'
**kwargs Any

Additional arguments passed to the storage provider:
  - S3: aws_access_key_id, aws_secret_access_key, endpoint_url, etc.
  - GCS: project, token, etc.

{}

Returns:

Name Type Description
SwimContainer SwimContainer

Opened container instance

Example

```python
from swimrs.container import SwimContainer

# Local file
container = SwimContainer.open("project.swim")

# S3 with IAM credentials
container = SwimContainer.open("s3://bucket/project.zarr")

# S3 with explicit credentials
container = SwimContainer.open("s3://bucket/project.zarr",
                               aws_access_key_id="AKIA...", aws_secret_access_key="...")
```

close()

Close the container and release resources.

Components

Ingestor

Bases: Component

Component for ingesting data into the container.

Provides methods for ingesting remote sensing data, meteorology, properties, and other data sources. All methods use bulk xarray operations for efficiency and record provenance for audit trails.

Example

```python
container.ingest.ndvi(source_dir, instrument="landsat", mask="irr")
container.ingest.gridmet(met_dir)
container.ingest.properties(lulc_csv="lulc.csv", soils_csv="soils.csv")
```

__init__(state: ContainerState, container=None)

Initialize the Ingestor.

Parameters:

Name Type Description Default
state ContainerState

ContainerState instance

required
container

Optional reference to parent SwimContainer

None

ndvi(source_dir: str | Path, uid_column: str = 'FID', instrument: str = 'landsat', mask: str = 'irr', fields: list[str] | None = None, overwrite: bool = False, min_ndvi: float = 0.05, apply_consecutive_filter: bool = True, workers: int = 1) -> ProvenanceEvent

Ingest NDVI data from Earth Engine CSV exports.

Parameters:

Name Type Description Default
source_dir str | Path

Directory containing CSV files

required
uid_column str

Column name for field UID in CSVs (default: "FID")

'FID'
instrument str

Source instrument ("landsat", "sentinel", "ecostress")

'landsat'
mask str

Mask type ("irr", "inv_irr", "no_mask")

'irr'
fields list[str] | None

Optional list of field UIDs to process (default: all)

None
overwrite bool

If True, replace existing data

False
min_ndvi float

Minimum valid NDVI value (default: 0.05)

0.05
apply_consecutive_filter bool

If True, remove the lower of two observations on consecutive days

True
workers int

Number of threads for parallel CSV reading (default: 1)

1

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

etf(source_dir: str | Path, uid_column: str = 'FID', model: str = 'ssebop', mask: str = 'irr', instrument: str = 'landsat', fields: list[str] | None = None, overwrite: bool = False, min_etf: float = 0.05, workers: int = 1) -> ProvenanceEvent

Ingest ET fraction data from Earth Engine CSV exports.

Parameters:

Name Type Description Default
source_dir str | Path

Directory containing CSV files

required
uid_column str

Column name for field UID in CSVs (default: "FID")

'FID'
model str

ET model ("ssebop", "ptjpl", "sims", "eemetric", etc.)

'ssebop'
mask str

Mask type ("irr", "inv_irr", "no_mask")

'irr'
instrument str

Source instrument ("landsat", "ecostress")

'landsat'
fields list[str] | None

Optional list of field UIDs to process

None
overwrite bool

If True, replace existing data

False
min_etf float

Minimum valid ETf value (default: 0.05). Values below this are treated as noise/artifacts and set to NaN.

0.05
workers int

Number of threads for parallel CSV reading (default: 1)

1

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

gridmet(source_dir: str | Path, grid_shapefile: str | Path | None = None, grid_mapping: str | Path | dict[str, int] | GridMapping | None = None, uid_column: str = 'FID', grid_column: str = 'GFID', variables: list[str] | None = None, include_corrected: bool = True, overwrite: bool = False) -> ProvenanceEvent

Ingest GridMET meteorology data from Parquet files.

GridMET data is downloaded at grid-cell resolution (4 km), so multiple fields may share the same grid cell. This method can operate in two modes:

  1. Mapped mode (grid_shapefile or grid_mapping provided): Uses a UID-to-GFID mapping to replicate grid cell data across fields that share the same cell. Files are named {gfid}.parquet.

  2. Direct mode (no mapping provided): Looks for files named {uid}.parquet directly. Use this when each field has its own unique parquet file (e.g., sparse flux stations).
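In mapped mode, each field simply receives a copy of its grid cell's series. The sketch below shows that replication with plain dictionaries; `replicate_grid_series` is a hypothetical name, the real method works on DataFrames loaded from Parquet, and skipping fields whose cell has no file is an assumption.

```python
from __future__ import annotations


def replicate_grid_series(grid_series: dict[int, list[float]],
                          uid_to_gfid: dict[str, int]) -> dict[str, list[float]]:
    """Copy each grid cell's time series to every field mapped to that cell.

    grid_series: {gfid: values}, e.g. read from {gfid}.parquet files.
    uid_to_gfid: {field_uid: gfid}. Fields whose cell has no data are skipped.
    """
    out: dict[str, list[float]] = {}
    for uid, gfid in uid_to_gfid.items():
        if gfid in grid_series:
            # list(...) gives each field its own copy of the cell's values.
            out[uid] = list(grid_series[gfid])
    return out
```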

Parameters:

Name Type Description Default
source_dir str | Path

Directory containing Parquet files

required
grid_shapefile str | Path | None

Shapefile with UID and GFID columns for mapping

None
grid_mapping str | Path | dict[str, int] | GridMapping | None

Alternative to grid_shapefile. Can be:
  - Path to a JSON file with a {uid: gfid, ...} mapping
  - Dict with a {uid: gfid, ...} mapping
  - GridMapping instance

None
uid_column str

Column name for field UID in shapefile (default: "FID")

'FID'
grid_column str

Column name for grid ID in shapefile (default: "GFID")

'GFID'
variables list[str] | None

Variables to ingest (default: all available)

None
include_corrected bool

Include bias-corrected ET variables (eto_corr, etr_corr)

True
overwrite bool

If True, replace existing data

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

era5(source_dir: str | Path, variables: list[str] | None = None, field_mapping: dict[str, str] | None = None, overwrite: bool = False) -> ProvenanceEvent

Ingest ERA5 meteorology data from monthly CSV exports.

Handles the column format: {param}_{YYYYMMDD} (e.g., eto_20170115)
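Splitting a {param}_{YYYYMMDD} column name is the core of this parsing step. The sketch below (hypothetical helper `parse_era5_column`) treats only the trailing 8-digit token as the date, so parameter names that themselves contain underscores still parse.

```python
from __future__ import annotations

import re
from datetime import date

# Greedy "param" group; only the final "_YYYYMMDD" token is taken as the date.
_COLUMN_RE = re.compile(r"^(?P<param>.+)_(?P<ymd>\d{8})$")


def parse_era5_column(column: str) -> tuple[str, date] | None:
    """Split a column such as 'eto_20170115' into ('eto', date(2017, 1, 15)).

    Returns None for columns that do not follow the pattern (e.g. an ID column).
    """
    m = _COLUMN_RE.match(column)
    if m is None:
        return None
    ymd = m.group("ymd")
    return m.group("param"), date(int(ymd[:4]), int(ymd[4:6]), int(ymd[6:]))
```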

Parameters:

Name Type Description Default
source_dir str | Path

Directory containing ERA5 CSV files

required
variables list[str] | None

Variables to ingest (default: swe, eto, tmean, tmin, tmax, prcp, srad)

None
field_mapping dict[str, str] | None

Optional UID to met-file mapping

None
overwrite bool

If True, replace existing data

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

snodas(source_dir: str | Path, uid_column: str = 'FID', fields: list[str] | None = None, overwrite: bool = False) -> ProvenanceEvent

Ingest SNODAS snow water equivalent data from Earth Engine CSV extracts.

Parameters:

Name Type Description Default
source_dir str | Path

Directory containing CSV files from Earth Engine export. CSV format: rows=fields, columns=dates (YYYYMMDD), values=SWE in meters.

required
uid_column str

Column name for field UID in CSVs (default: "FID")

'FID'
fields list[str] | None

Optional list of field UIDs to process

None
overwrite bool

If True, replace existing data

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

properties(lulc_csv: str | Path | None = None, soils_csv: str | Path | None = None, irr_csv: str | Path | None = None, location_csv: str | Path | None = None, uid_column: str = 'FID', lulc_column: str = 'modis_lc', extra_lulc_column: str | None = 'glc10_lc', overwrite: bool = False) -> ProvenanceEvent

Ingest static field properties from CSV files.

Applies LULC override logic:
  1. A GLCLand10 crop code (10) overrides non-crop MODIS codes to cropland (12)
  2. A mean irrigation fraction > 0.3 overrides to cropland (12)
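The two override rules can be expressed per field as below. This is a sketch, not the swimrs implementation: `override_lulc` is a hypothetical name, and which MODIS codes count as "crop" is an assumption exposed through the `crop_codes` parameter (only 12 by default).

```python
from __future__ import annotations

CROPLAND = 12    # MODIS cropland code used as the override target
GLC10_CROP = 10  # GLCLand10 crop code


def override_lulc(modis_code: int, glc10_code: int | None = None,
                  mean_irr: float | None = None,
                  crop_codes: frozenset[int] = frozenset({CROPLAND}),
                  irr_cutoff: float = 0.3) -> int:
    """Apply both LULC override rules to one field's MODIS code."""
    code = modis_code
    # Rule 1: a GLC10 crop pixel turns a non-crop MODIS code into cropland.
    # Assumption: "non-crop" means "not in crop_codes".
    if glc10_code == GLC10_CROP and code not in crop_codes:
        code = CROPLAND
    # Rule 2: a mean irrigation fraction above the cutoff also forces cropland.
    if mean_irr is not None and mean_irr > irr_cutoff:
        code = CROPLAND
    return code
```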

Parameters:

Name Type Description Default
lulc_csv str | Path | None

CSV with land use/land cover data

None
soils_csv str | Path | None

CSV with soil properties (AWC, clay, sand, ksat)

None
irr_csv str | Path | None

CSV with irrigation fraction data

None
location_csv str | Path | None

CSV with location data (lat, lon, elevation)

None
uid_column str

Column name for field UID in CSVs

'FID'
lulc_column str

Column name for LULC code (default: modis_lc)

'modis_lc'
extra_lulc_column str | None

Column for secondary LULC (default: glc10_lc)

'glc10_lc'
overwrite bool

If True, replace existing data

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

dynamics(dynamics_json: str | Path, overwrite: bool = False) -> ProvenanceEvent

Ingest pre-computed dynamics data from JSON file.

Parameters:

Name Type Description Default
dynamics_json str | Path

Path to JSON file with dynamics data

required
overwrite bool

If True, replace existing data

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

_parse_ee_remote_sensing_csvs(source_dir: Path, instrument: str, parameter: str, uid_column: str, fields: list[str] | None = None, mask: str | None = None, workers: int = 1) -> pd.DataFrame

Parse Earth Engine CSV exports into a unified DataFrame.

CSV format: rows = fields (identified by uid_column), columns = dates (YYYYMMDD). Dates are parsed from the column names:
  - Landsat: PARAM_YYYYMMDD (e.g., NDVI_20170115)
  - Sentinel: YYYYMMDD_... (e.g., 20170115_S2A)

File naming convention for mask filtering:
  - ndvi_{field_id}_{mask}_{year}.csv
  - {model}_etf_{field_id}_{mask}_{year}.csv
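Two details are easy to get wrong here: the date token sits at opposite ends of Landsat and Sentinel column names, and a naive substring test for "irr" also matches "inv_irr". The sketch below handles both. It assumes underscore-delimited file names in which the mask token sits immediately before the year (e.g. `ndvi_F1_inv_irr_2020.csv`), and treats every non-Sentinel instrument like Landsat; `column_date` and `mask_of` are hypothetical helpers.

```python
from __future__ import annotations

from datetime import date, datetime
from pathlib import Path

KNOWN_MASKS = ("irr", "inv_irr", "no_mask")


def column_date(column: str, instrument: str) -> date | None:
    """Parse the observation date from an Earth Engine column name."""
    if instrument == "sentinel":
        token = column.split("_", 1)[0]    # YYYYMMDD_...
    else:
        token = column.rsplit("_", 1)[-1]  # PARAM_YYYYMMDD (assumed for non-Sentinel)
    if len(token) != 8 or not token.isdigit():
        return None
    return datetime.strptime(token, "%Y%m%d").date()


def mask_of(filename: str) -> str | None:
    """Return the mask token just before the year, preferring the longest match."""
    head, _, year = Path(filename).stem.rpartition("_")
    if not year.isdigit():
        return None
    matches = [m for m in KNOWN_MASKS if head.endswith("_" + m)]
    # "..._inv_irr" ends with both "_irr" and "_inv_irr"; the longer one wins.
    return max(matches, key=len) if matches else None
```

Filtering then reduces to `[f for f in csv_files if mask_of(f.name) == mask]`.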

Parameters:

Name Type Description Default
source_dir Path

Directory containing CSV files

required
instrument str

Source instrument ("landsat", "sentinel", etc.)

required
parameter str

Data type ("ndvi" or "etf")

required
uid_column str

Column name for field UID in CSVs

required
fields list[str] | None

Optional list of field UIDs to process

None
mask str | None

Optional mask type to filter files ("irr", "inv_irr", "no_mask")

None
workers int

Number of threads for parallel CSV reading (default: 1)

1

Returns:

Type Description
DataFrame

DataFrame with DatetimeIndex and field UIDs as columns

_filter_files_by_mask(csv_files: list[Path], mask: str) -> list[Path] staticmethod

Filter CSV file list by mask pattern in filename.

_apply_ndvi_filters(df: pd.DataFrame, min_ndvi: float, apply_consecutive_filter: bool) -> pd.DataFrame

Apply quality filters to NDVI data.

  1. Replace values below min_ndvi with NaN
  2. Remove lower of two consecutive-day observations
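A minimal sketch of both filters on one field's observations, using a plain {date: value} dict instead of a DataFrame. `apply_ndvi_filters` here is a standalone illustration; processing consecutive-day pairs left to right, and dropping the later value on ties, are assumptions about how runs of three or more days are handled.

```python
from __future__ import annotations

import math
from datetime import date, timedelta


def apply_ndvi_filters(obs: dict[date, float], min_ndvi: float = 0.05,
                       apply_consecutive_filter: bool = True) -> dict[date, float]:
    """Return a filtered copy of one field's NDVI observations."""
    # 1. Values below min_ndvi (and existing NaNs) become NaN.
    out = {d: (v if v >= min_ndvi else math.nan) for d, v in obs.items()}
    if not apply_consecutive_filter:
        return out
    # 2. For each pair of valid observations on consecutive days, blank the lower one.
    for d in sorted(out):
        nxt = d + timedelta(days=1)
        a, b = out[d], out.get(nxt, math.nan)
        if math.isnan(a) or math.isnan(b):
            continue
        if a < b:
            out[d] = math.nan
        else:
            out[nxt] = math.nan
    return out
```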

_write_timeseries(path: str, data: pd.DataFrame, fields: list[str] | None, overwrite: bool = False) -> int

Write time series DataFrame to container Zarr array.

Parameters:

Name Type Description Default
path str

Target path in container

required
data DataFrame

DataFrame with DatetimeIndex and field columns

required
fields list[str] | None

Optional field filter

required
overwrite bool

If True, overwrite existing array

False

Returns:

Type Description
int

Number of non-NaN values written

_load_gridded_variable(source_dir: Path, variable: str, grid_mapping: GridMapping) -> pd.DataFrame

Load a variable from grid-cell-based parquet files.

Replicates timeseries across all fields mapped to each grid cell. This handles the case where multiple fields share the same GridMET cell (or other coarse-resolution grid).

Parameters:

Name Type Description Default
source_dir Path

Directory containing {grid_id}.parquet files

required
variable str

Variable name to extract (e.g., 'eto', 'tmax')

required
grid_mapping GridMapping

GridMapping with UID→grid_id relationships

required

Returns:

Type Description
DataFrame

DataFrame with columns=field_uids, index=dates

Raises:

Type Description
ValueError

If legacy MultiIndex format is detected

_load_uid_variable(source_dir: Path, variable: str) -> pd.DataFrame

Load a variable from UID-named parquet files (direct mode).

Looks for files named {uid}.parquet directly, without grid mapping. Use this for sparse field networks where each field has its own unique parquet file.

Parameters:

Name Type Description Default
source_dir Path

Directory containing {uid}.parquet files

required
variable str

Variable name to extract (e.g., 'eto', 'tmax')

required

Returns:

Type Description
DataFrame

DataFrame with columns=field_uids, index=dates

_parse_era5_csvs(source_dir: Path, param_mapping: dict[str, str]) -> dict[str, pd.DataFrame]

Parse ERA5 monthly CSV exports using vectorized operations.

Column format: {param}_{YYYYMMDD} (e.g., eto_20170115)

Returns:

Type Description
dict[str, DataFrame]

Dict mapping field_uid to DataFrame with parameter columns

_extract_variable_from_site_data(site_data: dict[str, pd.DataFrame], variable: str) -> pd.DataFrame

Extract a single variable from site-level DataFrames.

_load_snodas_extracts(source_dir: Path, uid_column: str, fields: list[str] | None) -> pd.DataFrame

Load SNODAS SWE data from Earth Engine CSV extracts.

CSV format: rows = fields, columns = dates (YYYYMMDD), values = SWE in meters. Values are converted to millimeters (multiplied by 1000). See src/swimrs/units.py (SNODAS_DAILY_UNITS).

Parameters:

Name Type Description Default
source_dir Path

Directory containing CSV files

required
uid_column str

Column name for field UID

required
fields list[str] | None

Optional list of field UIDs to filter

required

Returns:

Type Description
DataFrame

DataFrame with DatetimeIndex and field UIDs as columns, SWE in mm

_ingest_lulc(lulc_csv: Path, uid_column: str, lulc_column: str, extra_lulc_column: str | None, irrigation_csv: str | Path | None, overwrite: bool) -> None

Ingest LULC data with override logic.

_ingest_soils(soils_csv: Path, uid_column: str, overwrite: bool) -> None

Ingest soil properties.

Expected units (canonical SWIM-RS):
  - awc: meters of water per meter of soil (m/m) in the source CSV; stored as-is in the container and converted to mm/m when building SwimInput.
  - ksat: mm/day; converted internally to mm/hr for IER runoff.

See src/swimrs/units.py (PROCESS_CANONICAL_UNITS).
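Both downstream conversions are simple scalings: 1 m of water is 1000 mm, and a day has 24 hours. The helper names below are hypothetical and only illustrate the arithmetic.

```python
def awc_to_mm_per_m(awc_m_per_m: float) -> float:
    """Available water capacity: m/m (as stored) -> mm/m (as used in SwimInput)."""
    return awc_m_per_m * 1000.0


def ksat_to_mm_per_hr(ksat_mm_per_day: float) -> float:
    """Saturated conductivity: mm/day (as stored) -> mm/hr (as used for IER runoff)."""
    return ksat_mm_per_day / 24.0
```

For example, an AWC of 0.15 m/m becomes 150 mm/m, and a ksat of 48 mm/day becomes 2 mm/hr.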

_ingest_irrigation(irrigation_csv: Path, uid_column: str, overwrite: bool) -> None

Ingest irrigation fraction data (mean and per-year).

_ingest_location(location_csv: Path, uid_column: str, overwrite: bool) -> None

Ingest location data (lat, lon, elevation).

Calculator

Bases: Component

Component for computing derived data products.

Provides methods for computing dynamics (irrigation detection, groundwater subsidy, K-parameters) and merging multi-sensor NDVI.

Example

```python
container.compute.dynamics(etf_model="ssebop")
container.compute.merged_ndvi()
```

__init__(state: ContainerState, container=None)

Initialize the Calculator.

Parameters:

Name Type Description Default
state ContainerState

ContainerState instance

required
container

Optional reference to parent SwimContainer

None

merged_ndvi(masks: tuple[str, ...] = ('irr', 'inv_irr'), instruments: tuple[str, ...] = ('landsat', 'sentinel'), preference_order: tuple[str, ...] = ('landsat', 'sentinel'), overwrite: bool = False) -> ProvenanceEvent

Merge harmonized sensor time series into a unified NDVI dataset.

Systematic spectral differences between sensors are handled during GEE extraction via Spectral Bandpass Adjustment Factors (SBAF). This method performs a simple chronological merge, combining observations from multiple sensors into a denser time series.

References

Roy, D.P., et al. (2016). Characterization of Landsat-7 to Landsat-8 reflectance and NDVI differences. Remote Sensing of Environment.

Claverie, M., et al. (2018). The Harmonized Landsat and Sentinel-2 (HLS) product. Remote Sensing of Environment.

Parameters:

Name Type Description Default
masks tuple[str, ...]

Mask types to process (e.g., "irr", "inv_irr")

('irr', 'inv_irr')
instruments tuple[str, ...]

Instruments to merge (e.g., "landsat", "sentinel")

('landsat', 'sentinel')
preference_order tuple[str, ...]

When multiple sensors have data for the same date, prefer sensors in this order (default: Landsat > Sentinel)

('landsat', 'sentinel')
overwrite bool

If True, replace existing results

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

fused_ndvi(*args, **kwargs) -> ProvenanceEvent

Deprecated: Use merged_ndvi() instead.

dynamics(etf_model: str = 'ssebop', irr_threshold: float = 0.1, masks: tuple[str, ...] = ('irr', 'inv_irr'), instruments: tuple[str, ...] = ('landsat', 'sentinel'), use_mask: bool = False, use_lulc: bool = True, lookback: int = 10, ndvi_threshold: float = 0.3, min_pos_days: int = 10, met_source: str = 'gridmet', fields: list[str] | None = None, overwrite: bool = False) -> ProvenanceEvent

Compute field dynamics: irrigation detection, groundwater subsidy, K-parameters.

This is the main computation method. It:
  1. Detects irrigation events per year using ETf/NDVI patterns
  2. Calculates groundwater subsidy (ET/PPT ratio)
  3. Extracts Ke (evaporation) and Kc (crop) parameters

Two modes for determining irrigation status:

use_mask=True (CONUS, Examples 4 & 5):
  - Reads the per-year irrigation fraction from properties/irrigation/irr_yearly
  - A year is irrigated if f_irr > irr_threshold
  - Requires masks=("irr", "inv_irr")

use_lulc=True (International, Example 6):
  - Computes irrigation from the water balance (ET/PPT ratio)
  - A year is irrigated if subsidy_months >= 3 AND the field is cropped (LULC)
  - Works with masks=("no_mask",)
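The per-year decision in the two modes can be sketched as a single function. `is_irrigated_year` is hypothetical; giving use_mask precedence when both flags are True is an assumption, while the ValueError mirrors the documented behavior when neither flag is set.

```python
from __future__ import annotations


def is_irrigated_year(*, use_mask: bool, use_lulc: bool,
                      f_irr: float | None = None, irr_threshold: float = 0.1,
                      subsidy_months: int | None = None,
                      is_cropped: bool = False) -> bool:
    """Classify one field-year as irrigated or not."""
    if use_mask:
        # CONUS mode: per-year irrigated fraction from the container properties.
        return f_irr is not None and f_irr > irr_threshold
    if use_lulc:
        # International mode: water-balance subsidy plus a cropped LULC class.
        return subsidy_months is not None and subsidy_months >= 3 and is_cropped
    raise ValueError("Either use_mask or use_lulc must be True")
```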

Parameters:

Name Type Description Default
etf_model str

ET model to use ("ssebop", "ptjpl", etc.)

'ssebop'
irr_threshold float

Fraction threshold for classifying irrigated years

0.1
masks tuple[str, ...]

Mask types to process

('irr', 'inv_irr')
instruments tuple[str, ...]

Instruments for NDVI data

('landsat', 'sentinel')
use_mask bool

If True, use irrigation mask properties (CONUS mode)

False
use_lulc bool

If True, use water balance + LULC (International mode)

True
lookback int

Days of lookback for irrigation window extension

10
ndvi_threshold float

NDVI threshold for window extension

0.3
min_pos_days int

Minimum consecutive positive slope days

10
met_source str

Meteorology source ("gridmet", "era5")

'gridmet'
fields list[str] | None

Optional list of field UIDs to process

None
overwrite bool

If True, replace existing results

False

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

Raises:

Type Description
ValueError

If neither use_mask nor use_lulc is True

irrigation_windows(etf_model: str = 'ssebop', fields: list[str] | None = None) -> dict[str, dict]

Get irrigation windows for specific fields.

Returns per-year irrigation start/end dates and classifications.

Parameters:

Name Type Description Default
etf_model str

ET model to use

'ssebop'
fields list[str] | None

Fields to analyze

None

Returns:

Type Description
dict[str, dict]

Dict mapping field UID to yearly irrigation data

Note

This requires dynamics to have been computed first.

_merge_sensors(sensor_data: list[tuple[str, xr.DataArray]], preference_order: tuple[str, ...]) -> xr.DataArray

Merge multiple sensor DataArrays chronologically.

Since sensors are already harmonized via SBAF during extraction, this performs a simple merge. When multiple sensors have data for the same date, the preferred sensor's value is used.
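A preference-ordered chronological merge can be sketched with per-date dictionaries: visit sensors from least to most preferred so that preferred values overwrite the others on shared dates. `merge_sensors` is a hypothetical stand-alone version; the real method operates on xarray DataArrays, and ranking sensors missing from `preference_order` last is an assumption.

```python
from __future__ import annotations

import math
from datetime import date


def merge_sensors(sensor_data: list[tuple[str, dict[date, float]]],
                  preference_order: tuple[str, ...]) -> dict[date, float]:
    """Merge per-date values from several sensors, preferring earlier entries."""
    rank = {name: i for i, name in enumerate(preference_order)}
    # Least preferred first (unlisted sensors rank last), so preferred values win.
    ordered = sorted(sensor_data, key=lambda item: rank.get(item[0], len(rank)),
                     reverse=True)
    merged: dict[date, float] = {}
    for _, series in ordered:
        for d, v in series.items():
            if not math.isnan(v):
                merged[d] = v
    return dict(sorted(merged.items()))
```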

Parameters:

Name Type Description Default
sensor_data list[tuple[str, DataArray]]

List of (instrument_name, DataArray) tuples

required
preference_order tuple[str, ...]

Instruments in order of preference

required

Returns:

Type Description
DataArray

Merged DataArray with values from all sensors

_load_dynamics_dataset(fields: list[str], etf_model: str, masks: tuple[str, ...], instruments: tuple[str, ...], met_source: str) -> xr.Dataset | None

Load all data needed for dynamics computation.

Combines data from multiple masks - uses primary mask data where available, falls back to secondary mask for fields with no data in primary mask. This handles cases where some fields have only irr mask data and others have only inv_irr mask data.

_load_ensemble_etf(fields: list[str], masks: tuple[str, ...], instruments: tuple[str, ...]) -> xr.DataArray | None

Compute the mean ETf across all available models in the container.

Discovers every remote_sensing/etf/{instrument}/{model}/{mask} path, loads each model's data (combining masks via fill), then returns the per-date, per-field mean.
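The ensemble step can be sketched in two parts: fill one mask's gaps from another, then average across models while ignoring missing values. The helpers are hypothetical, keys stand in for (field, date) pairs, and the element-wise fill is one reading of "combining masks via fill" (the dynamics loader described above chooses the fallback mask per field instead).

```python
from __future__ import annotations

import math
from statistics import fmean

Key = tuple[str, str]  # (field_uid, ISO date); an illustrative stand-in for array coords


def fill_masks(primary: dict[Key, float], secondary: dict[Key, float]) -> dict[Key, float]:
    """Element-wise: use the primary mask's value where present, else the secondary's."""
    out: dict[Key, float] = {}
    for k in primary.keys() | secondary.keys():
        v = primary.get(k, math.nan)
        out[k] = v if not math.isnan(v) else secondary.get(k, math.nan)
    return out


def ensemble_mean(models: dict[str, dict[Key, float]]) -> dict[Key, float]:
    """Per-key mean across models, skipping NaN; NaN where no model has data."""
    keys = set().union(*(m.keys() for m in models.values()))
    out: dict[Key, float] = {}
    for k in keys:
        vals = [m[k] for m in models.values() if k in m and not math.isnan(m[k])]
        out[k] = fmean(vals) if vals else math.nan
    return out
```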

Returns:

Type Description
DataArray | None

DataArray of ensemble-mean ETf, or None if no models found.

_compute_k_parameters(ds: xr.Dataset) -> tuple[xr.DataArray, xr.DataArray]

Compute ke_max and kc_max from ETf and NDVI data.

  - ke_max: 90th percentile of ETf where NDVI < 0.3
  - kc_max: 90th percentile of all ETf values

Uses only raw observations where both ETf and NDVI are present.
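A sketch of the two K-parameters from paired same-date observations. The percentile uses linear interpolation between order statistics (NumPy's default convention); whether swimrs uses that exact convention is an assumption, and `k_parameters` is a hypothetical name.

```python
from __future__ import annotations

import math


def percentile(values: list[float], q: float) -> float:
    """Linear-interpolation percentile (same convention as NumPy's default)."""
    if not values:
        return math.nan
    s = sorted(values)
    pos = (len(s) - 1) * q / 100.0
    lo, hi = math.floor(pos), math.ceil(pos)
    return s[lo] + (s[hi] - s[lo]) * (pos - lo)


def k_parameters(etf: list[float], ndvi: list[float],
                 bare_ndvi: float = 0.3) -> tuple[float, float]:
    """Return (ke_max, kc_max) from ETf/NDVI values aligned by date."""
    # Keep only dates where both observations are present.
    pairs = [(e, n) for e, n in zip(etf, ndvi) if not (math.isnan(e) or math.isnan(n))]
    ke_max = percentile([e for e, n in pairs if n < bare_ndvi], 90)  # sparse canopy
    kc_max = percentile([e for e, _ in pairs], 90)                    # all dates
    return ke_max, kc_max
```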

_compute_groundwater_subsidy(ds: xr.Dataset, irr_threshold: float) -> dict[str, dict]

Compute groundwater subsidy for each field and year.

Subsidy is detected when ET exceeds precipitation (ratio = ET / PPT > 1). The subsidy fraction is f_sub = (ratio - 1) / ratio.

Matches the original SamplePlotDynamics._analyze_field_groundwater_subsidy().
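As a worked check of the formula: f_sub = (ET/PPT - 1) / (ET/PPT) = (ET - PPT) / ET, i.e. the share of ET that precipitation cannot supply. With ET = 500 mm and PPT = 400 mm, ratio = 1.25 and f_sub = 0.25 / 1.25 = 0.2. The helper below is a hypothetical sketch; rejecting non-positive precipitation is an assumption.

```python
def groundwater_subsidy(et: float, ppt: float) -> float:
    """Fraction of ET supplied by groundwater, from the ET/PPT ratio."""
    if ppt <= 0:
        raise ValueError("precipitation must be positive")
    ratio = et / ppt
    # No subsidy unless ET exceeds precipitation (ratio > 1).
    return (ratio - 1.0) / ratio if ratio > 1.0 else 0.0
```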

_impute_missing_gwsub(site_data: dict, etf_years: list[int], missing_years: list[int]) -> dict

Impute groundwater subsidy for years without ETf data.

If more than 50% of years with data show subsidy (f_sub > 0.1), fills missing years with the mean values from subsidized years. Matches the original SamplePlotDynamics behavior.
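The imputation rule, sketched for f_sub only (the real method may fill several per-year values). `impute_missing_gwsub` here is a stand-alone illustration over a {year: f_sub} dict.

```python
from __future__ import annotations

from statistics import fmean


def impute_missing_gwsub(f_sub_by_year: dict[int, float], missing_years: list[int],
                         sub_cutoff: float = 0.1) -> dict[int, float]:
    """Fill years without ETf data when most observed years show subsidy."""
    subsidized = [v for v in f_sub_by_year.values() if v > sub_cutoff]
    result = dict(f_sub_by_year)
    # Strictly more than half of the years with data must show subsidy.
    if f_sub_by_year and len(subsidized) / len(f_sub_by_year) > 0.5:
        fill = fmean(subsidized)
        for year in missing_years:
            result[year] = fill
    return result
```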

_compute_irrigation_data(ds: xr.Dataset, irr_threshold: float, lookback: int, ndvi_threshold: float, min_pos_days: int, use_mask: bool, use_lulc: bool, irr_props: dict[str, dict[str, float]] | None = None) -> dict[str, dict]

Compute irrigation windows for each field and year.

Two modes for determining irrigation status:

use_mask=True (CONUS): Reads f_irr from irr_props (per-year irrigation fraction from properties). Year is irrigated if f_irr > irr_threshold.

use_lulc=True (International): Computes irrigation from water balance (ET/PPT ratio). Year is irrigated if subsidy_months >= 3 AND field is cropped.

Uses NDVI slope analysis to detect irrigation periods. Matches the original SamplePlotDynamics behavior for exact parity.

_get_lulc_by_site(sites: np.ndarray) -> dict[str, int]

Get LULC code for each site from container properties.

_get_yearly_irrigation_properties() -> dict[str, dict[str, float]]

Get per-year irrigation fraction from container properties.

Returns:

Type Description
dict[str, dict[str, float]]

Dict mapping site_id -> {year_str: f_irr, ...}, e.g., {"US-FPe": {"2020": 0.0, "2021": 0.0}, "ALARC2_Smith6": {"2020": 1.0}}

_get_extended_year_ndvi(ndvi: xr.DataArray, site: str, year: int, years: list[int], time_index: pd.DatetimeIndex) -> pd.Series

Get NDVI with extended year context for boundary handling.

Uses ±1 year of data for smoother detection at year boundaries, matching the original SamplePlotDynamics._compose_ndvi() behavior.

_get_extended_year_ndvi_fast(site_ndvi_s: pd.Series, year: int, years: list[int]) -> pd.Series

Get NDVI with extended year context for boundary handling (pandas version).

Uses ±1 year of data for smoother detection at year boundaries. This is the fast pandas-based version of _get_extended_year_ndvi.

_backfill_irrigation_windows(irr_data: dict[str, dict], backfill_tracker: dict[str, list[int]]) -> dict[str, dict]

Backfill irrigation DOYs from nearest year with data.

For irrigated years with no detected irrigation windows, copies the windows from the nearest year that has data. Matches the original SamplePlotDynamics._backfill_irrigation_days().
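A nearest-year backfill can be sketched as follows. `backfill_windows` is hypothetical, and breaking distance ties in favor of the earlier year is an assumption the source does not specify.

```python
from __future__ import annotations


def backfill_windows(windows_by_year: dict[int, list[int]],
                     years_to_fill: list[int]) -> dict[int, list[int]]:
    """Copy irrigation DOYs from the nearest year that has detected windows."""
    donors = sorted(y for y, doys in windows_by_year.items() if doys)
    out = dict(windows_by_year)
    if not donors:
        return out
    for year in years_to_fill:
        # Nearest donor year; ties go to the earlier year (an assumption).
        nearest = min(donors, key=lambda d: (abs(d - year), d))
        out[year] = list(windows_by_year[nearest])
    return out
```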

_detect_irrigation_windows(ndvi_series: pd.Series, lookback: int, ndvi_threshold: float, min_pos_days: int, year: int) -> list[int]

Detect irrigation windows from NDVI time series.

The algorithm matches legacy SamplePlotDynamics._detect_irrigation_windows():
  1. Apply a 32-day rolling mean to smooth NDVI
  2. Compute the slope (day-to-day difference)
  3. Find consecutive positive-slope periods of at least min_pos_days
  4. Extend windows by lookback days and until NDVI drops below ndvi_threshold
  5. Extend to include the next group if its minimum NDVI exceeds ndvi_threshold
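Steps 1 to 3, plus the lookback part of step 4, can be sketched on a plain list of daily NDVI values. This is a simplified illustration, not the legacy algorithm: the trailing (rather than centered) rolling mean is an assumption, the threshold-based extension and step 5 are omitted, and the helpers return (start, end) index ranges instead of the method's list of DOYs.

```python
from __future__ import annotations


def rolling_mean(values: list[float], window: int) -> list[float | None]:
    """Trailing mean; None until a full window is available."""
    out: list[float | None] = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(sum(values[i + 1 - window:i + 1]) / window)
    return out


def positive_slope_runs(ndvi: list[float], window: int = 32,
                        min_pos_days: int = 10) -> list[tuple[int, int]]:
    """Smooth, difference, and keep rising runs of at least min_pos_days (inclusive ranges)."""
    smooth = rolling_mean(ndvi, window)
    runs: list[tuple[int, int]] = []
    start = None
    for i in range(1, len(smooth)):
        a, b = smooth[i - 1], smooth[i]
        rising = a is not None and b is not None and b - a > 0
        if rising and start is None:
            start = i
        elif not rising and start is not None:
            if i - start >= min_pos_days:
                runs.append((start, i - 1))
            start = None
    if start is not None and len(smooth) - start >= min_pos_days:
        runs.append((start, len(smooth) - 1))
    return runs


def extend_by_lookback(runs: list[tuple[int, int]], lookback: int = 10) -> list[tuple[int, int]]:
    """Pull each window's start back by `lookback` days (clipped at day 0)."""
    return [(max(0, s - lookback), e) for s, e in runs]
```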

_write_dynamics_results(ke_max: xr.DataArray, kc_max: xr.DataArray, irr_data: dict[str, dict], gwsub_data: dict[str, dict], fields: list[str], overwrite: bool) -> None

Write computed dynamics results to container.

Exporter

Bases: Component

Component for exporting container data.

Provides methods for exporting data to various formats including shapefiles, CSVs, and observation files for calibration.

Example

```python
container.export.shapefile("output/fields.shp")
container.export.csv("remote_sensing/ndvi/landsat/irr", "output/ndvi/")
container.export.observations("output/obs/", etf_model="ssebop")
```

__init__(state: ContainerState, container=None)

Initialize the Exporter.

Parameters:

Name Type Description Default
state ContainerState

ContainerState instance

required
container

Optional reference to parent SwimContainer

None

shapefile(output_path: str | Path, fields: list[str] | None = None) -> ProvenanceEvent

Export field geometries to shapefile.

Parameters:

Name Type Description Default
output_path str | Path

Output shapefile path (.shp)

required
fields list[str] | None

Optional list of field UIDs to export

None

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

csv(path: str, output_dir: str | Path, format: str = 'wide', fields: list[str] | None = None) -> ProvenanceEvent

Export data at a zarr path to CSV files.

Parameters:

Name Type Description Default
path str

Zarr path to export (e.g., "remote_sensing/ndvi/landsat/irr")

required
output_dir str | Path

Directory for output CSV files

required
format str

Output format ("wide" or "long")

'wide'
fields list[str] | None

Optional list of field UIDs

None

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation
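The `format` argument distinguishes "wide" from "long" output. Assuming "wide" means one row per date with one column per field UID (the layout `to_dataframe` describes) and "long" means one row per (date, field) pair, the reshape between them is a pandas `melt`. The column names `date`, `field`, and `value`, the helper name `wide_to_long`, and the sample values are illustrative assumptions, not the exporter's actual output schema.

```python
import pandas as pd


def wide_to_long(wide: pd.DataFrame, value_name: str = "value") -> pd.DataFrame:
    """Reshape a (date x field) table into one row per (date, field) pair."""
    return (
        wide.rename_axis("date")  # name the DatetimeIndex
        .reset_index()            # move dates into a regular column
        .melt(id_vars="date", var_name="field", value_name=value_name)
    )


# Wide layout: DatetimeIndex rows, one column per field UID (illustrative values).
wide = pd.DataFrame(
    {"US-Ne1": [0.31, 0.35], "US-Ne2": [0.28, 0.30]},
    index=pd.to_datetime(["2020-06-01", "2020-06-02"]),
)
long_df = wide_to_long(wide)
```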

model_inputs(output_dir: str | Path, etf_model: str = 'ssebop', met_source: str = 'gridmet', fields: list[str] | None = None) -> ProvenanceEvent

Export model inputs to directory structure.

Creates separate files for each data type in a directory structure suitable for batch processing.

Parameters:

Name Type Description Default
output_dir str | Path

Base directory for outputs

required
etf_model str

ET model (e.g., "ssebop", "ptjpl")

'ssebop'
met_source str

Meteorology source (gridmet, era5, nldas)

'gridmet'
fields list[str] | None

Optional list of field UIDs

None

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

to_xarray(output_path: str | Path, variables: list[str] | None = None, fields: list[str] | None = None) -> ProvenanceEvent

Export data as a NetCDF file via xarray.

Parameters:

Name Type Description Default
output_path str | Path

Output NetCDF path (.nc)

required
variables list[str] | None

Variables to include (default: all time series)

None
fields list[str] | None

Fields to include (default: all)

None

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

to_dataframe(path: str, fields: list[str] | None = None) -> pd.DataFrame

Export a single variable as a pandas DataFrame.

Parameters:

Name Type Description Default
path str

Zarr path to the variable

required
fields list[str] | None

Optional list of field UIDs

None

Returns:

Type Description
DataFrame

pd.DataFrame with DatetimeIndex and field columns

observations(output_dir: str | Path, etf_model: str = 'ssebop', masks: tuple[str, ...] = ('irr', 'inv_irr'), irr_threshold: float = 0.1, fields: list[str] | None = None, start_date: str | None = None, end_date: str | None = None) -> ProvenanceEvent

Export observation files for model calibration.

Creates per-field numpy files compatible with the SWIM-RS calibration workflow:

  • obs_etf_{fid}.np: ETf observations with mask switching
  • obs_swe_{fid}.np: SWE observations

The ETf mask switching logic matches prep_plots.preproc():

  • Default to the inv_irr (non-irrigated) mask.
  • Switch to the irr mask for years where f_irr >= irr_threshold.

Parameters:

Name Type Description Default
output_dir str | Path

Directory for output files

required
etf_model str

ET model to use (e.g., "ssebop", "ptjpl")

'ssebop'
masks tuple[str, ...]

Mask types for ETf switching

('irr', 'inv_irr')
irr_threshold float

Threshold for irrigated year classification

0.1
fields list[str] | None

Fields to export (default: all)

None
start_date str | None

Optional start date filter

None
end_date str | None

Optional end date filter

None

Returns:

Type Description
ProvenanceEvent

ProvenanceEvent recording the operation

_build_switched_etf(fid: str, etf_data: dict[str, xr.DataArray], irr_data: dict[str, dict], masks: tuple[str, ...], irr_threshold: float, time_index: pd.DatetimeIndex) -> np.ndarray | None

Build ETf array with mask switching based on irrigation status.

Logic matches prep_plots.preproc():

  • Start with the inv_irr (non-irrigated) mask as the base.
  • For years where f_irr >= irr_threshold, use the irr mask.
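A minimal sketch of this switching rule, assuming both masked ETf series are already aligned on one DatetimeIndex and that per-year irrigated fractions are available as a plain `{year: f_irr}` dict. The function name and input layout are hypothetical; the container's actual `irr_data` structure is not documented here.

```python
import numpy as np
import pandas as pd


def build_switched_etf(etf_irr: pd.Series, etf_inv_irr: pd.Series,
                       f_irr_by_year: dict[int, float],
                       irr_threshold: float = 0.1) -> np.ndarray:
    """Use inv_irr ETf by default; substitute irr ETf in irrigated years."""
    if not etf_irr.index.equals(etf_inv_irr.index):
        raise ValueError("both series must share the same DatetimeIndex")
    out = etf_inv_irr.to_numpy(dtype=float, copy=True)  # non-irrigated base
    irr = etf_irr.to_numpy(dtype=float)
    years = etf_inv_irr.index.year
    for year, f_irr in f_irr_by_year.items():
        if f_irr >= irr_threshold:  # year classified as irrigated
            sel = np.asarray(years == year)
            out[sel] = irr[sel]
    return out
```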

_get_properties_dict(fields: list[str]) -> dict[str, dict]

Get field properties as a dictionary.

Used by build_swim_input to extract properties for HDF5 construction.

Parameters:

Name Type Description Default
fields list[str]

List of field UIDs to get properties for

required

Returns:

Type Description
dict[str, dict]

Dict mapping field UIDs to their property dictionaries

_get_dynamics_dict(fields: list[str]) -> dict[str, dict]

Get dynamics data for all fields as a dictionary.

Query

Bases: Component

Component for querying container data and status.

Provides methods for accessing data in various formats (xarray, pandas, geopandas), checking container status, and validating data completeness.

Example

container.query.status()
df = container.query.dataframe("remote_sensing/ndvi/landsat/irr")
gdf = container.query.geodataframe()

_container = container instance-attribute

__init__(state: ContainerState, container=None)

Initialize the Query component.

Parameters:

Name Type Description Default
state ContainerState

ContainerState instance

required
container

Optional reference to parent SwimContainer

None

status(detailed: bool = False) -> str

Get formatted status report of container contents.

Shows data coverage, completeness, and readiness for operations.

Parameters:

Name Type Description Default
detailed bool

If True, show detailed per-field statistics

False

Returns:

Type Description
str

Formatted status string

validate(operation: str = 'calibration', model: str = 'ssebop', mask: str = 'irr', met_source: str = 'gridmet', snow_source: str = 'snodas', instrument: str = 'landsat') -> ValidationResult

Validate container readiness for an operation.

Parameters:

Name Type Description Default
operation str

"calibration" or "forward_run"

'calibration'
model str

ET model to check

'ssebop'
mask str

Mask type to check

'irr'
met_source str

Meteorology source to check

'gridmet'
snow_source str

Snow source to check

'snodas'
instrument str

NDVI instrument to check

'landsat'

Returns:

Type Description
ValidationResult

ValidationResult with readiness status and details

validate_fields(min_area_m2: float = 0, require_awc: bool = True, require_ksat: bool = False, require_lulc: bool = True, require_ndvi: bool = True, require_etf: bool = True, require_meteorology: bool = True, min_ndvi_obs: int = 10, min_etf_obs: int = 5, etf_model: str = 'ssebop', mask: str = 'irr', instrument: str = 'landsat', met_source: str = 'gridmet') -> FieldValidationReport

Validate individual fields against criteria.

Parameters:

Name Type Description Default
min_area_m2 float

Minimum field area in square meters

0
require_awc bool

Require available water capacity

True
require_ksat bool

Require saturated hydraulic conductivity

False
require_lulc bool

Require land use/land cover

True
require_ndvi bool

Require NDVI observations

True
require_etf bool

Require ETf observations

True
require_meteorology bool

Require meteorology data

True
min_ndvi_obs int

Minimum NDVI observation count

10
min_etf_obs int

Minimum ETf observation count

5
etf_model str

ET model for ETf validation

'ssebop'
mask str

Mask type for validation

'irr'
instrument str

NDVI instrument for validation

'landsat'
met_source str

Meteorology source for validation

'gridmet'

Returns:

Type Description
FieldValidationReport

FieldValidationReport with per-field validation results

valid_fields(**kwargs) -> list[str]

Get list of valid field UIDs based on validation criteria.

Parameters:

Name Type Description Default
**kwargs

Arguments passed to validate_fields()

{}

Returns:

Type Description
list[str]

List of valid field UIDs
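The count and area thresholds can be illustrated with a small pandas sketch. It covers only `min_area_m2`, `min_ndvi_obs`, and `min_etf_obs`, takes observations as (date x field) DataFrames, and treats a field missing from any input as invalid. These inputs and the helper names are assumptions; the real `validate_fields` checks more criteria (AWC, LULC, meteorology) and returns a full report.

```python
import pandas as pd


def fields_with_min_obs(obs: pd.DataFrame, min_obs: int) -> set[str]:
    """Field UIDs (columns) with at least `min_obs` non-missing values."""
    counts = obs.notna().sum(axis=0)
    return set(counts[counts >= min_obs].index)


def valid_field_uids(ndvi: pd.DataFrame, etf: pd.DataFrame,
                     area_m2: dict[str, float], min_area_m2: float = 0.0,
                     min_ndvi_obs: int = 10, min_etf_obs: int = 5) -> list[str]:
    """Fields passing every check; a field missing from any input fails."""
    ok_ndvi = fields_with_min_obs(ndvi, min_ndvi_obs)
    ok_etf = fields_with_min_obs(etf, min_etf_obs)
    ok_area = {uid for uid, area in area_m2.items() if area >= min_area_m2}
    return sorted(ok_ndvi & ok_etf & ok_area)
```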

xarray(path: str, fields: list[str] | None = None, start_date: str | pd.Timestamp | None = None, end_date: str | pd.Timestamp | None = None, name: str | None = None) -> xr.DataArray

Get data as labeled xarray DataArray.

Parameters:

Name Type Description Default
path str

Zarr path (e.g., "remote_sensing/ndvi/landsat/irr")

required
fields list[str] | None

Optional field UIDs to include

None
start_date str | Timestamp | None

Optional start date

None
end_date str | Timestamp | None

Optional end date

None
name str | None

Optional name for DataArray

None

Returns:

Type Description
DataArray

xr.DataArray with 'time' and 'site' coordinates

dataset(paths: dict[str, str] | None = None, fields: list[str] | None = None, start_date: str | pd.Timestamp | None = None, end_date: str | pd.Timestamp | None = None) -> xr.Dataset

Get multiple variables as xarray Dataset.

Parameters:

Name Type Description Default
paths dict[str, str] | None

Mapping of variable names to zarr paths

None
fields list[str] | None

Optional field UIDs to include

None
start_date str | Timestamp | None

Optional start date

None
end_date str | Timestamp | None

Optional end date

None

Returns:

Type Description
Dataset

xr.Dataset with requested variables

dataframe(path: str, fields: list[str] | None = None, start_date: str | pd.Timestamp | None = None, end_date: str | pd.Timestamp | None = None) -> pd.DataFrame

Get data as pandas DataFrame.

Parameters:

Name Type Description Default
path str

Zarr path

required
fields list[str] | None

Optional field UIDs

None
start_date str | Timestamp | None

Optional start date

None
end_date str | Timestamp | None

Optional end date

None

Returns:

Type Description
DataFrame

DataFrame with DatetimeIndex and field columns

geodataframe()

Get field geometries as GeoDataFrame.

Returns:

Type Description

gpd.GeoDataFrame with field geometries and properties

field_timeseries(uid: str, parameters: list[str] | None = None) -> pd.DataFrame

Get all time series for a single field.

Parameters:

Name Type Description Default
uid str

Field UID

required
parameters list[str] | None

Optional list of parameter paths to include. Can be zarr paths (e.g., "remote_sensing/ndvi/landsat/irr") or variable names (e.g., "ndvi_landsat_irr"). If None, includes all available time series.

None

Returns:

Type Description
DataFrame

DataFrame with DatetimeIndex and parameter columns, including:

  • All requested time series parameters
  • 'irr_doy': Binary flag (1 if the day is in a computed irrigation window, 0 otherwise)
Example

df = container.query.field_timeseries("US-Ne1")
df = container.query.field_timeseries(
    "US-Ne1",
    parameters=["meteorology/gridmet/eto", "remote_sensing/ndvi/landsat/irr"],
)
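The `irr_doy` column can be pictured as a per-day lookup against each year's irrigation window. In this sketch the windows are a hypothetical `{year: [day_of_year, ...]}` dict; the layout stored under the container's per-year `irr` dynamics may differ, and `irr_doy_flag` is an illustrative name.

```python
import pandas as pd


def irr_doy_flag(index: pd.DatetimeIndex,
                 irr_doys_by_year: dict[int, list[int]]) -> pd.Series:
    """Binary series: 1 where the date's day of year is an irrigation day."""
    lookup = {year: set(doys) for year, doys in irr_doys_by_year.items()}
    # Years absent from the mapping get no irrigation days (all zeros).
    flags = [int(ts.dayofyear in lookup.get(ts.year, set())) for ts in index]
    return pd.Series(flags, index=index, name="irr_doy")
```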

dynamics(uid: str) -> dict[str, Any]

Get computed dynamics for a field.

Parameters:

Name Type Description Default
uid str

Field UID

required

Returns:

Type Description
dict[str, Any]

Dict with ke_max, kc_max, irr (per-year), gwsub (per-year)

inventory()

Get the container inventory tracker.

Returns:

Type Description

Inventory instance for coverage analysis

Schema and enums

SwimSchema

Schema definition for SWIM data container.

Defines the complete structure of data that can be stored, including valid combinations of instruments, parameters, masks, etc.

VERSION = '1.0' class-attribute instance-attribute

PARAMETERS = {
    Parameter.NDVI: ParameterSpec(name='ndvi', dtype='float32', valid_range=(0.0, 1.0), units='', description='Normalized Difference Vegetation Index', required_for_model=True),
    Parameter.ETF: ParameterSpec(name='etf', dtype='float32', valid_range=(0.0, 2.0), units='', description='ET fraction (actual ET / reference ET)', required_for_model=True),
    Parameter.LST: ParameterSpec(name='lst', dtype='float32', valid_range=(200.0, 350.0), units='K', description='Land Surface Temperature'),
    Parameter.ETO: ParameterSpec(name='eto', dtype='float32', valid_range=(0.0, 20.0), units='mm/day', description='Reference evapotranspiration', required_for_model=True),
    Parameter.PRCP: ParameterSpec(name='prcp', dtype='float32', valid_range=(0.0, 500.0), units='mm/day', description='Precipitation', required_for_model=True),
    Parameter.TMIN: ParameterSpec(name='tmin', dtype='float32', valid_range=(-50.0, 50.0), units='C', description='Minimum temperature', required_for_model=True),
    Parameter.TMAX: ParameterSpec(name='tmax', dtype='float32', valid_range=(-50.0, 60.0), units='C', description='Maximum temperature', required_for_model=True),
    Parameter.SRAD: ParameterSpec(name='srad', dtype='float32', valid_range=(0.0, 40.0), units='MJ/m2/day', description='Solar radiation'),
    Parameter.SWE: ParameterSpec(name='swe', dtype='float32', valid_range=(0.0, 5000.0), units='mm', description='Snow water equivalent'),
    Parameter.U2: ParameterSpec(name='u2', dtype='float32', valid_range=(0.0, 50.0), units='m/s', description='Wind speed at 2m height'),
    Parameter.ELEV: ParameterSpec(name='elev', dtype='float32', valid_range=(-500.0, 9000.0), units='m', description='Elevation above sea level'),
    Parameter.ETO_CORR: ParameterSpec(name='eto_corr', dtype='float32', valid_range=(0.0, 20.0), units='mm/day', description='Bias-corrected reference evapotranspiration (grass)'),
    Parameter.ETR_CORR: ParameterSpec(name='etr_corr', dtype='float32', valid_range=(0.0, 25.0), units='mm/day', description='Bias-corrected reference evapotranspiration (alfalfa)')
} class-attribute instance-attribute
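Each `ParameterSpec` carries a `dtype` and a `valid_range`. The sketch below uses a stand-in dataclass with the same field names to show one way those two fields could be applied: cast to the declared dtype and mask out-of-range values as NaN. Whether the container masks, clips, or rejects out-of-range data is not stated here, so treat `mask_out_of_range` and the `required_for_model=False` default as assumptions.

```python
from dataclasses import dataclass

import numpy as np


@dataclass(frozen=True)
class ParameterSpec:
    """Stand-in mirroring the documented fields (not the package's class)."""
    name: str
    dtype: str
    valid_range: tuple[float, float]
    units: str
    description: str
    required_for_model: bool = False  # assumed default


def mask_out_of_range(values, spec: ParameterSpec) -> np.ndarray:
    """Cast to spec.dtype and replace values outside valid_range with NaN."""
    arr = np.asarray(values, dtype=spec.dtype)
    lo, hi = spec.valid_range
    return np.where((arr >= lo) & (arr <= hi), arr, np.nan).astype(spec.dtype)


ndvi_spec = ParameterSpec(name='ndvi', dtype='float32', valid_range=(0.0, 1.0),
                          units='', description='Normalized Difference Vegetation Index',
                          required_for_model=True)
```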

REMOTE_SENSING_STRUCTURE = {
    'ndvi': {'instruments': [Instrument.LANDSAT, Instrument.SENTINEL, Instrument.ECOSTRESS], 'masks': [MaskType.IRR, MaskType.INV_IRR, MaskType.NO_MASK], 'models': []},
    'etf': {'instruments': [Instrument.LANDSAT, Instrument.ECOSTRESS], 'masks': [MaskType.IRR, MaskType.INV_IRR, MaskType.NO_MASK], 'models': list(ETModel)},
    'lst': {'instruments': [Instrument.ECOSTRESS], 'masks': [MaskType.NO_MASK], 'models': []}
} class-attribute instance-attribute

METEOROLOGY_STRUCTURE = {
    'sources': [MetSource.GRIDMET, MetSource.ERA5, MetSource.NLDAS],
    'variables': [Parameter.ETO, Parameter.ETR, Parameter.PRCP, Parameter.TMIN, Parameter.TMAX, Parameter.TMEAN, Parameter.SRAD, Parameter.VPD, Parameter.EA, Parameter.U2, Parameter.ELEV, Parameter.ETO_CORR, Parameter.ETR_CORR]
} class-attribute instance-attribute

SNOW_STRUCTURE = {'sources': [SnowSource.SNODAS, SnowSource.ERA5], 'variables': [Parameter.SWE]} class-attribute instance-attribute

PROPERTIES_STRUCTURE = {
    'soils': ['awc', 'clay', 'sand', 'ksat', 'rock_ite', 'rew', 'source'],
    'land_cover': ['modis_lc', 'cdl', 'glc10', 'lulc_code'],
    'vegetation': ['rooting_depth'],
    'irrigation': ['lanid', 'irrmapper', 'irr'],
    'location': ['lat', 'lon', 'elevation', 'state', 'area_m2']
} class-attribute instance-attribute

DERIVED_STRUCTURE = {'dynamics': ['ke_max', 'kc_max', 'irr_data', 'gwsub_data'], 'merged_ndvi': ['ndvi']} class-attribute instance-attribute

get_zarr_path(category, parameter, instrument=None, mask=None, model=None, source=None) classmethod

Generate the Zarr array path for a given data specification.

Examples:

  • remote_sensing/ndvi/landsat/irr
  • remote_sensing/etf/landsat/ssebop/irr
  • meteorology/gridmet/eto
  • properties/soils/awc
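These four example paths imply a per-category ordering of components. The hypothetical `build_zarr_path` below reproduces them. Passing the properties subgroup (such as `soils`) through `source` is an assumption made only so one signature covers all four cases, and snow paths are left out because no example is given.

```python
def build_zarr_path(category, parameter, instrument=None, mask=None,
                    model=None, source=None):
    """Join non-empty components in the order the documented examples use."""
    if category == "remote_sensing":
        # e.g. remote_sensing/etf/landsat/ssebop/irr (model omitted for NDVI)
        parts = [category, parameter, instrument, model, mask]
    elif category in ("meteorology", "properties"):
        # e.g. meteorology/gridmet/eto or properties/soils/awc
        parts = [category, source, parameter]
    else:
        raise ValueError(f"unsupported category in this sketch: {category!r}")
    return "/".join(p for p in parts if p)
```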

validate_path(path) classmethod

Validate that a data path conforms to the schema.

list_all_paths() classmethod

List all valid data paths in the schema.
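For the remote-sensing branch, the valid paths are the cross product of each parameter's instruments, models, and masks in `REMOTE_SENSING_STRUCTURE`. This sketch rebuilds that enumeration with plain strings in place of the enums and checks membership. It assumes the path layout shown in the `get_zarr_path` examples, omits the meteorology, snow, and properties branches, and its function names are hypothetical.

```python
from itertools import product

ET_MODELS = ["ssebop", "ptjpl", "sims", "eemetric", "disalexi", "geesebal"]
MASKS = ["irr", "inv_irr", "no_mask"]

# String copy of REMOTE_SENSING_STRUCTURE.
REMOTE_SENSING = {
    "ndvi": {"instruments": ["landsat", "sentinel", "ecostress"], "masks": MASKS, "models": []},
    "etf": {"instruments": ["landsat", "ecostress"], "masks": MASKS, "models": ET_MODELS},
    "lst": {"instruments": ["ecostress"], "masks": ["no_mask"], "models": []},
}


def list_remote_sensing_paths(structure: dict) -> list[str]:
    """Enumerate remote_sensing/<param>/<instrument>[/<model>]/<mask> paths."""
    paths = []
    for param, spec in structure.items():
        models = spec["models"] or [None]  # no model level for ndvi/lst
        for inst, model, mask in product(spec["instruments"], models, spec["masks"]):
            parts = ["remote_sensing", param, inst, model, mask]
            paths.append("/".join(p for p in parts if p))
    return paths


def is_valid_path(path: str, valid_paths: list[str]) -> bool:
    return path in set(valid_paths)
```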

required_for_calibration(model='ssebop', mask='irr', met_source='gridmet', snow_source='snodas', instrument='landsat') classmethod

List data paths required to run calibration.

Parameters:

Name Type Description Default
model str

ET model to calibrate against (default: ssebop)

'ssebop'
mask str

Mask type to use (irr, inv_irr, or no_mask)

'irr'
met_source str

Meteorology source (gridmet, era5, nldas)

'gridmet'
snow_source str

Snow data source (snodas, era5)

'snodas'
instrument str

Remote sensing instrument (landsat, ecostress)

'landsat'

required_for_forward_run(model='ssebop', mask='irr', met_source='gridmet', instrument='landsat') classmethod

List data paths required for an uncalibrated forward model run.

Parameters:

Name Type Description Default
model str

ET model to use (default: ssebop)

'ssebop'
mask str

Mask type (irr, inv_irr, or no_mask)

'irr'
met_source str

Meteorology source (gridmet, era5, nldas)

'gridmet'
instrument str

Remote sensing instrument (landsat, ecostress)

'landsat'

Instrument

Bases: str, Enum

Remote sensing instruments.

LANDSAT = 'landsat' class-attribute instance-attribute

SENTINEL = 'sentinel' class-attribute instance-attribute

ECOSTRESS = 'ecostress' class-attribute instance-attribute

COMBINED = 'combined' class-attribute instance-attribute
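Because `Instrument` (like `MaskType` and `ETModel`) mixes in `str`, its members compare equal to their string values and can be looked up by value. The class below repeats the documented members so the example runs on its own.

```python
from enum import Enum


class Instrument(str, Enum):
    """Copy of the documented members, for a self-contained example."""
    LANDSAT = "landsat"
    SENTINEL = "sentinel"
    ECOSTRESS = "ecostress"
    COMBINED = "combined"


# Lookup by value returns the singleton member.
landsat = Instrument("landsat")
```

Formatting a mixed-in `str` enum with an f-string has changed across Python versions, so `.value` is the unambiguous way to get the raw string.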

MaskType

Bases: str, Enum

Irrigation mask types.

IRR = 'irr' class-attribute instance-attribute

INV_IRR = 'inv_irr' class-attribute instance-attribute

NO_MASK = 'no_mask' class-attribute instance-attribute

ETModel

Bases: str, Enum

ET fraction models.

SSEBOP = 'ssebop' class-attribute instance-attribute

PTJPL = 'ptjpl' class-attribute instance-attribute

SIMS = 'sims' class-attribute instance-attribute

EEMETRIC = 'eemetric' class-attribute instance-attribute

DISALEXI = 'disalexi' class-attribute instance-attribute

GEESEBAL = 'geesebal' class-attribute instance-attribute

StorageProvider

Bases: ABC

Abstract interface for container storage backends.

Implementations handle the specifics of opening, closing, and managing zarr groups for different storage types while presenting a unified interface.

Attributes:

Name Type Description
_mode

Current access mode ('r', 'r+', 'a', 'w')

_root Group | None

The opened zarr root group (None when closed)

_mode = mode instance-attribute

_root: zarr.Group | None = None instance-attribute

_store = None instance-attribute

mode: str property

Current access mode.

is_open: bool property

Whether the storage is currently open.

is_writable: bool property

Whether the storage is open in a writable mode.

root: zarr.Group | None property

The zarr root group, or None if not open.

uri: str abstractmethod property

Return a URI identifying this storage location.

Examples:

  • "file:///path/to/container.swim"
  • "s3://bucket/key/container.zarr"
  • "memory://container"

location: str | Path abstractmethod property

Return the location in the most appropriate native format.

For local storage, returns Path. For cloud storage, returns string URL.

__init__(mode: str = 'r')

Initialize the storage provider.

Parameters:

Name Type Description Default
mode str

Access mode:

  • 'r': Read-only (default)
  • 'r+': Read-write on existing
  • 'a': Append (create if doesn't exist)
  • 'w': Write (overwrite if exists)

'r'

open() -> zarr.Group abstractmethod

Open the storage and return the root zarr group.

Returns:

Type Description
Group

zarr.Group: The root group of the container

Raises:

Type Description
FileNotFoundError

If storage doesn't exist and mode is 'r' or 'r+'

FileExistsError

If storage exists and mode is 'x'

close() -> None abstractmethod

Close the storage connection.

Implementations should safely flush any pending writes and release resources.

exists() -> bool abstractmethod

Check if the storage location exists.

Returns:

Name Type Description
bool bool

True if storage exists and is accessible

delete() -> None abstractmethod

Delete the storage location.

Raises:

Type Description
PermissionError

If deletion is not permitted

__enter__() -> StorageProvider

Context manager entry - opens the storage.

__exit__(exc_type, exc_val, exc_tb) -> bool

Context manager exit - closes the storage.
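The abstract interface can be illustrated with a toy subclass. This is a sketch under assumptions: `StorageProviderSketch` and `MemoryProvider` are hypothetical names, a plain dict stands in for `zarr.Group`, and the mode handling ('r' and 'r+' require existing storage, 'a' creates it if missing, 'w' overwrites) is inferred from the `mode` parameter description. Read-only access is not enforced on the dict itself.

```python
from abc import ABC, abstractmethod
from typing import Optional


class StorageProviderSketch(ABC):
    """Simplified stand-in for the documented interface; root is a plain dict."""

    VALID_MODES = ("r", "r+", "a", "w")

    def __init__(self, mode: str = "r"):
        if mode not in self.VALID_MODES:
            raise ValueError(f"unsupported mode: {mode!r}")
        self._mode = mode
        self._root: Optional[dict] = None

    @property
    def mode(self) -> str:
        return self._mode

    @property
    def root(self) -> Optional[dict]:
        return self._root

    @property
    def is_open(self) -> bool:
        return self._root is not None

    @property
    def is_writable(self) -> bool:
        return self.is_open and self._mode != "r"

    @abstractmethod
    def open(self) -> dict: ...

    @abstractmethod
    def close(self) -> None: ...

    @abstractmethod
    def exists(self) -> bool: ...

    @abstractmethod
    def delete(self) -> None: ...

    def __enter__(self) -> "StorageProviderSketch":
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        self.close()
        return False  # never suppress exceptions raised inside the with-block


class MemoryProvider(StorageProviderSketch):
    """Hypothetical provider keeping named roots in a caller-supplied dict."""

    def __init__(self, backend: dict, name: str, mode: str = "r"):
        super().__init__(mode)
        self._backend = backend
        self._name = name

    def exists(self) -> bool:
        return self._name in self._backend

    def open(self) -> dict:
        if self._mode in ("r", "r+") and not self.exists():
            raise FileNotFoundError(self._name)
        if self._mode == "w" or not self.exists():
            self._backend[self._name] = {}  # 'w' overwrites; 'a' creates
        self._root = self._backend[self._name]
        return self._root

    def close(self) -> None:
        self._root = None  # nothing to flush for an in-memory dict

    def delete(self) -> None:
        self.close()
        self._backend.pop(self._name, None)
```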

__repr__() -> str