
API Reference

CLI

cal_disp.cli

cli_app

cli_app(ctx: Context, debug: bool) -> None

Run a DISP calibration workflow.

Source code in src/cal_disp/cli/__init__.py
@click.group(name="cal-disp")
@click.version_option()
@click.option("--debug", is_flag=True, help="Add debug messages to the log.")
@click.pass_context
def cli_app(ctx: click.Context, debug: bool) -> None:
    """Run a DISP calibration workflow."""
    # https://click.palletsprojects.com/en/8.1.x/commands/#nested-handling-and-contexts
    ctx.ensure_object(dict)
    ctx.obj["debug"] = debug
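
Subcommands can read the flag stored on the Click context object. A minimal sketch, using a hypothetical status subcommand that is not part of cal-disp and is shown only for illustration:

import click

from cal_disp.cli import cli_app


@cli_app.command(name="status")  # hypothetical subcommand, for illustration only
@click.pass_context
def status(ctx: click.Context) -> None:
    """Report whether --debug was passed to the cal-disp group."""
    debug: bool = ctx.obj.get("debug", False)
    click.echo(f"Debug logging enabled: {debug}")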

download

burst_bounds

burst_bounds(input_file: Path, output_dir: Path) -> None

Download S1 CSLC boundary tiles for a DISP-S1 file.

Extracts frame ID and sensing times from the DISP-S1 filename, then downloads corresponding Sentinel-1 burst boundary geometries. Output directory is created if it doesn't exist.

Only supports DISP-S1 products (sensor must be S1).

Examples:

Basic usage: $ cal-disp download burst-bounds -i disp_product.nc -o ./burst_data

Full path example:
    $ cal-disp download burst-bounds \
        -i OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0.nc \
        -o ./burst_bounds
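
The frame ID and sensor are derived from the filename itself before any download starts. A minimal standalone sketch of that parsing step (the filename is only an example):

from pathlib import Path

name = Path(
    "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0.nc"
).stem
parts = name.split("_")
sensor = parts[2].split("-")[1]  # "DISP-S1" -> "S1"
frame_id = int(parts[4].lstrip("F"))  # "F08882" -> 8882
print(sensor, frame_id)  # S1 8882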

Source code in src/cal_disp/cli/download.py
@download_group.command(name="burst-bounds")
@click.option(
    "--input-file",
    "-i",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Path to OPERA DISP-S1 product (.nc file).",
)
@click.option(
    "--output-dir",
    "-o",
    type=click.Path(file_okay=False, path_type=Path),
    required=True,
    help="Directory to save burst boundary tiles.",
)
def burst_bounds(
    input_file: Path,
    output_dir: Path,
) -> None:
    r"""Download S1 CSLC boundary tiles for a DISP-S1 file.

    Extracts frame ID and sensing times from the DISP-S1 filename,
    then downloads corresponding Sentinel-1 burst boundary geometries.
    Output directory is created if it doesn't exist.

    Only supports DISP-S1 products (sensor must be S1).

    Examples
    --------
    Basic usage:
        $ cal-disp download burst-bounds -i disp_product.nc -o ./burst_data

    Full path example:
        $ cal-disp download burst-bounds \\
        -i OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0.nc \\
        -o ./burst_bounds

    """
    from cal_disp.download import generate_s1_burst_tiles

    # Parse filename: OPERA_L3_DISP-S1_IW_F{frame}_VV_{dates}...
    parts = input_file.stem.split("_")
    sensor = parts[2].split("-")[1]  # DISP-S1 -> S1
    frame_id = int(parts[4].lstrip("F"))  # F08882 -> 8882

    if sensor != "S1":
        raise click.ClickException(f"Only DISP-S1 products supported, got: {sensor}")

    sensing_times = extract_sensing_times_from_file(input_file)
    output_dir.mkdir(exist_ok=True, parents=True)

    for sensing_time in sensing_times:
        click.echo(f"Downloading burst bounds for {sensing_time}")
        generate_s1_burst_tiles(
            frame_id=frame_id,
            sensing_time=sensing_time,
            output_dir=output_dir,
        )

    click.echo(f"Download complete: {output_dir}")

disp_s1

disp_s1(frame_id: int, output_dir: Path, start: datetime | None, end: datetime | None, num_workers: int) -> None

Download OPERA DISP-S1 products for a frame.

Downloads displacement products from the OPERA DISP-S1 archive for the specified frame and date range. Products are filtered based on the secondary date of each interferogram.

Examples:

Download all products for a frame: $ cal-disp download disp-s1 --frame-id 8882 -o ./disp_data

Download products for specific date range:
    $ cal-disp download disp-s1 --frame-id 8882 -o ./disp_data \
        --start 2022-07-01 --end 2022-07-31

Use more workers for faster downloads: $ cal-disp download disp-s1 --frame-id 8882 -o ./disp_data -n 8

Source code in src/cal_disp/cli/download.py
@download_group.command()
@click.option(
    "--frame-id",
    type=int,
    required=True,
    help="OPERA DISP-S1 frame ID.",
    show_default=True,
)
@click.option(
    "--output-dir",
    "-o",
    type=click.Path(file_okay=False, path_type=Path),
    required=True,
    help="Directory to save downloaded DISP products.",
)
@click.option(
    "--start",
    "-s",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=None,
    help="Start date (YYYY-MM-DD). Based on secondary date of DISP.",
)
@click.option(
    "--end",
    "-e",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=None,
    help="End date (YYYY-MM-DD). Based on secondary date of DISP.",
)
@click.option(
    "--num-workers",
    "-n",
    type=int,
    default=1,
    help="Number of parallel download workers.",
    show_default=True,
)
def disp_s1(
    frame_id: int,
    output_dir: Path,
    start: datetime | None,
    end: datetime | None,
    num_workers: int,
) -> None:
    r"""Download OPERA DISP-S1 products for a frame.

    Downloads displacement products from the OPERA DISP-S1 archive for
    the specified frame and date range. Products are filtered based on
    the secondary date of each interferogram.

    Examples
    --------
    Download all products for a frame:
        $ cal-disp download disp-s1 --frame-id 8882 -o ./disp_data

    Download products for specific date range:
        $ cal-disp download disp-s1 --frame-id 8882 -o ./disp_data \\
            --start 2022-07-01 --end 2022-07-31

    Use more workers for faster downloads:
        $ cal-disp download disp-s1 --frame-id 8882 -o ./disp_data -n 8

    """
    from cal_disp.download import download_disp

    download_disp(
        frame_id=frame_id,
        output_dir=output_dir,
        start=start,
        end=end,
        num_workers=num_workers,
    )
    click.echo(f"Download complete: {output_dir}")

download_group

download_group()

Sub-commands for downloading prerequisite data.

Source code in src/cal_disp/cli/download.py
@click.group(name="download")
def download_group():
    """Sub-commands for downloading prerequisite data."""
    from cal_disp._log import setup_logging

    setup_logging(logger_name="cal_disp")

tropo

tropo(input_file: Path, output_dir: Path, num_workers: int, interp: bool) -> None

Download OPERA TROPO for a DISP-S1 file.

Extracts sensing times from the input DISP-S1 product filename and downloads corresponding OPERA TROPO data. Output directory is created if it doesn't exist.

The input file must follow OPERA naming convention: OPERA_L3_DISP-S1_IW_F{frame}_VV_{ref_date}_{sec_date}_v1.0_{proc_date}.nc

Examples:

Basic usage: $ cal-disp download tropo -i disp_product.nc -o ./tropo_data

With temporal interpolation: $ cal-disp download tropo -i disp_product.nc -o ./tropo_data --interp

Using more workers: $ cal-disp download tropo -i disp_product.nc -o ./tropo_data -n 8

Source code in src/cal_disp/cli/download.py
@download_group.command()
@click.option(
    "--input-file",
    "-i",
    type=click.Path(exists=True, dir_okay=False, path_type=Path),
    required=True,
    help="Path to OPERA DISP-S1 product (.nc file).",
)
@click.option(
    "--output-dir",
    "-o",
    type=click.Path(file_okay=False, path_type=Path),
    required=True,
    help="Directory to save downloaded TROPO products.",
)
@click.option(
    "--num-workers",
    "-n",
    type=int,
    default=4,
    help="Number of parallel download workers.",
    show_default=True,
)
@click.option(
    "--interp",
    is_flag=True,
    help="Download 2 scenes per date for temporal interpolation.",
)
def tropo(
    input_file: Path,
    output_dir: Path,
    num_workers: int,
    interp: bool,
) -> None:
    """Download OPERA TROPO for a DISP-S1 file.

    Extracts sensing times from the input DISP-S1 product filename and
    downloads corresponding OPERA TROPO data. Output directory
    is created if it doesn't exist.

    The input file must follow OPERA naming convention:
    OPERA_L3_DISP-S1_IW_F{frame}_VV_{ref_date}_{sec_date}_v1.0_{proc_date}.nc

    Examples
    --------
    Basic usage:
        $ cal-disp download tropo -i disp_product.nc -o ./tropo_data

    With temporal interpolation:
        $ cal-disp download tropo -i disp_product.nc -o ./tropo_data --interp

    Using more workers:
        $ cal-disp download tropo -i disp_product.nc -o ./tropo_data -n 8

    """
    from cal_disp.download import download_tropo

    sensing_times = extract_sensing_times_from_file(input_file)
    output_dir.mkdir(exist_ok=True, parents=True)

    download_tropo(
        disp_times=sensing_times,
        output_dir=output_dir,
        num_workers=num_workers,
        interp=interp,
    )
    click.echo(f"Download complete: {output_dir}")

unr

unr(frame_id: int, output_dir: Path, start: datetime | None, end: datetime | None, margin: float) -> None

Download UNR GPS timeseries data for a DISP-S1 frame.

Downloads GPS timeseries grid from the Nevada Geodetic Laboratory (UNR) within the frame's bounding box (expanded by margin). Data is saved as a parquet file for efficient loading. Output directory is created if it doesn't exist.

Examples:

Download UNR data for a frame: $ cal-disp download unr --frame-id 8882 -o ./unr_data

With date range:
    $ cal-disp download unr --frame-id 8882 -o ./unr_data \
        --start 2022-01-01 --end 2023-12-31

Expand bounding box by 1 degree: $ cal-disp download unr --frame-id 8882 -o ./unr_data -m 1.0

Source code in src/cal_disp/cli/download.py
@download_group.command()
@click.option(
    "--frame-id",
    type=int,
    required=True,
    help="OPERA DISP-S1 frame ID.",
    show_default=True,
)
@click.option(
    "--output-dir",
    "-o",
    type=click.Path(file_okay=False, path_type=Path),
    required=True,
    help="Directory to save downloaded UNR parquet file.",
)
@click.option(
    "--start",
    "-s",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=None,
    help="Start date of timeseries (YYYY-MM-DD).",
)
@click.option(
    "--end",
    "-e",
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=None,
    help="End date of timeseries (YYYY-MM-DD).",
)
@click.option(
    "--margin",
    "-m",
    type=float,
    default=0.5,
    help="Margin in degrees to expand frame bounding box.",
    show_default=True,
)
def unr(
    frame_id: int,
    output_dir: Path,
    start: datetime | None,
    end: datetime | None,
    margin: float,
) -> None:
    r"""Download UNR GPS timeseries data for a DISP-S1 frame.

    Downloads GPS timeseries grid from the Nevada Geodetic Laboratory (UNR)
    within the frame's bounding box (expanded by margin). Data is saved
    as a parquet file for efficient loading. Output directory is created
    if it doesn't exist.

    Examples
    --------
    Download UNR data for a frame:
        $ cal-disp download unr --frame-id 8882 -o ./unr_data

    With date range:
        $ cal-disp download unr --frame-id 8882 -o ./unr_data \\
            --start 2022-01-01 --end 2023-12-31

    Expand bounding box by 1 degree:
        $ cal-disp download unr --frame-id 8882 -o ./unr_data -m 1.0

    """
    from cal_disp.download import download_unr_grid

    output_dir.mkdir(exist_ok=True, parents=True)

    download_unr_grid(
        frame_id=frame_id,
        output_dir=output_dir,
        start=start,
        end=end,
        margin_deg=margin,
    )
    click.echo(f"Download complete: {output_dir}")

Download

cal_disp.download

download_disp

download_disp(frame_id: int, output_dir: Path, start: datetime | None = None, end: datetime | None = None, num_workers: int = DEFAULT_NUM_WORKERS) -> None

Download DISP-S1 products for a frame.

Downloads displacement products from the OPERA DISP-S1 archive for the specified frame and date range. Products are filtered based on the secondary date of each interferogram.

Parameters:

Name Type Description Default
frame_id int

OPERA frame identifier.

required
output_dir Path

Directory where products will be saved.

required
start datetime or None

Start date for query (based on secondary date). Default is None (no start limit).

None
end datetime or None

End date for query (based on secondary date). Default is None (no end limit).

None
num_workers int

Number of parallel download workers. Default is 2.

DEFAULT_NUM_WORKERS

Raises:

Type Description
ValueError

If frame ID is not in the database or no products are found.

Notes

Date queries are based on the secondary (later) date of each interferometric pair. If start and end dates are identical, the range is automatically expanded by ±1 day and num_workers is set to 1 to ensure the specific product is captured.
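
The adjustment helper itself is not documented on this page; a minimal sketch of the behavior described above, assuming a plain ±1 day expansion around an identical start/end pair:

from datetime import datetime, timedelta


def adjust_single_date_range(
    start: datetime | None, end: datetime | None
) -> tuple[datetime | None, datetime | None]:
    """Expand identical start/end dates by one day on each side."""
    if start is not None and end is not None and start == end:
        return start - timedelta(days=1), end + timedelta(days=1)
    return start, end


print(adjust_single_date_range(datetime(2024, 1, 15), datetime(2024, 1, 15)))
# (datetime.datetime(2024, 1, 14, 0, 0), datetime.datetime(2024, 1, 16, 0, 0))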

Examples:

>>> download_disp(
...     frame_id=8887,
...     output_dir=Path("./data"),
...     start=datetime(2024, 1, 1),
...     end=datetime(2024, 12, 31)
... )
Source code in src/cal_disp/download/_stage_disp.py
def download_disp(
    frame_id: int,
    output_dir: Path,
    start: datetime | None = None,
    end: datetime | None = None,
    num_workers: int = DEFAULT_NUM_WORKERS,
) -> None:
    """Download DISP-S1 products for a frame.

    Downloads displacement products from the OPERA DISP-S1 archive for
    the specified frame and date range. Products are filtered based on
    the secondary date of each interferogram.

    Parameters
    ----------
    frame_id : int
        OPERA frame identifier.
    output_dir : Path
        Directory where products will be saved.
    start : datetime or None, optional
        Start date for query (based on secondary date).
        Default is None (no start limit).
    end : datetime or None, optional
        End date for query (based on secondary date).
        Default is None (no end limit).
    num_workers : int, optional
        Number of parallel download workers. Default is 2.

    Raises
    ------
    ValueError
        If frame ID is not in the database or no products are found.

    Notes
    -----
    Date queries are based on the secondary (later) date of each
    interferometric pair. If start and end dates are identical,
    the range is automatically expanded by ±1 day and num_workers
    is set to 1 to ensure the specific product is captured.

    Examples
    --------
    >>> download_disp(
    ...     frame_id=8887,
    ...     output_dir=Path("./data"),
    ...     start=datetime(2024, 1, 1),
    ...     end=datetime(2024, 12, 31)
    ... )

    """
    logger.info(f"Downloading DISP-S1 products for frame {frame_id}")

    start_adjusted, end_adjusted = _adjust_single_date_range(start, end)
    logger.info(f"Search window: {start_adjusted} - {end_adjusted}")

    workers = 1 if (start and end and start == end) else num_workers

    run_download(
        frame_id,
        start_datetime=start_adjusted,
        end_datetime=end_adjusted,
        output_dir=Path(output_dir),
        num_workers=workers,
    )

    logger.info(f"Download complete: files saved to {output_dir}")

download_tropo

download_tropo(disp_times: list[datetime | Timestamp], output_dir: Path | str, num_workers: int = 4, interp: bool = True) -> None

Download tropospheric correction data for displacement times.

Parameters:

Name Type Description Default
disp_times list of datetime or pd.Timestamp

Displacement measurement times

required
output_dir Path or str

Output directory for downloads

required
num_workers int

Parallel download workers

4
interp bool

If True, get 2 scenes per time (for interpolation). If False, get single nearest scene.

True
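
A short usage sketch for calling download_tropo directly from Python; the import path for extract_sensing_times_from_file is assumed from the utils section of this page, and the filename is illustrative:

from pathlib import Path

from cal_disp.download import download_tropo
from cal_disp.download.utils import extract_sensing_times_from_file

disp_file = Path(
    "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc"
)
times = extract_sensing_times_from_file(disp_file)  # reference and secondary datetimes
download_tropo(
    disp_times=times,
    output_dir=Path("./tropo_data"),
    num_workers=4,
    interp=True,  # two scenes per time, for temporal interpolation
)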
Source code in src/cal_disp/download/_stage_tropo.py
def download_tropo(
    disp_times: list[datetime | pd.Timestamp],
    output_dir: Path | str,
    num_workers: int = 4,
    interp: bool = True,
) -> None:
    """Download tropospheric correction data for displacement times.

    Parameters
    ----------
    disp_times : list of datetime or pd.Timestamp
        Displacement measurement times
    output_dir : Path or str
        Output directory for downloads
    num_workers : int
        Parallel download workers
    interp : bool
        If True, get 2 scenes per time (for interpolation).
        If False, get single nearest scene.

    """
    out = Path(output_dir)
    out.mkdir(exist_ok=True, parents=True)

    n_scenes = 2 if interp else 1
    all_scenes = []

    for t in disp_times:
        scenes = find_nearest_scenes(t, num_scenes=n_scenes)
        all_scenes.extend(scenes)

    combined = asf.ASFSearchResults(all_scenes)
    logger.info(f"Downloading {len(combined)} scenes to {out}")
    combined.download(path=out, processes=num_workers)

download_unr_grid

download_unr_grid(frame_id: int, output_dir: Path, start: datetime | None = None, end: datetime | None = None, margin_deg: float = 0.5, plate: Literal['NA', 'PA', 'IGS14', 'IGS20'] = 'IGS20', version: Literal['0.1', '0.2'] = '0.2') -> None

Download UNR gridded GNSS timeseries for a given frame.

Parameters:

Name Type Description Default
frame_id int

OPERA frame identifier.

required
output_dir Path

Output directory for downloaded data.

required
start datetime or None

Start date for timeseries. If None, downloads from beginning.

None
end datetime or None

End date for timeseries. If None, downloads until present.

None
margin_deg float

Margin in degrees to expand frame bounding box.

0.5
plate (NA, PA, IGS14, IGS20)

Reference plate for velocity computation.

"IGS20"
version (0.1, 0.2)

UNR grid version to download.

"0.2"
Source code in src/cal_disp/download/_stage_unr.py
def download_unr_grid(
    frame_id: int,
    output_dir: Path,
    start: datetime | None = None,
    end: datetime | None = None,
    margin_deg: float = 0.5,
    plate: Literal["NA", "PA", "IGS14", "IGS20"] = "IGS20",
    version: Literal["0.1", "0.2"] = "0.2",
) -> None:
    """Download UNR gridded GNSS timeseries for a given frame.

    Parameters
    ----------
    frame_id : int
        OPERA frame identifier.
    output_dir : Path
        Output directory for downloaded data.
    start : datetime or None, optional
        Start date for timeseries. If None, downloads from beginning.
    end : datetime or None, optional
        End date for timeseries. If None, downloads until present.
    margin_deg : float, default=0.5
        Margin in degrees to expand frame bounding box.
    plate : {"NA", "PA", "IGS14", "IGS20"}, default="IGS20"
        Reference plate for velocity computation.
    version : {"0.1", "0.2"}, default="0.2"
        UNR grid version to download.

    """
    logger.info(
        f"Downloading UNR gridded GNSS timeseries for frame {frame_id} "
        f"from {start or 'beginning'} to {end or 'present'}"
    )

    selected_frame = get_frame_geojson([frame_id], as_geodataframe=True)
    frame_bounds = tuple(selected_frame.bounds.values[0])

    extended_bbox = (
        frame_bounds[0] - margin_deg,
        frame_bounds[1] - margin_deg,
        frame_bounds[2] + margin_deg,
        frame_bounds[3] + margin_deg,
    )

    grid = UnrGridSource(version=version)
    ts_grid_df = grid.timeseries_many(bbox=extended_bbox)
    ts_grid_df["date"] = pd.to_datetime(ts_grid_df["date"])

    output_dir.mkdir(parents=True, exist_ok=True)

    _save_parquet(ts_grid_df, frame_id, plate, output_dir)
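
A minimal usage sketch of download_unr_grid, mirroring the unr CLI command shown earlier on this page:

from datetime import datetime
from pathlib import Path

from cal_disp.download import download_unr_grid

download_unr_grid(
    frame_id=8882,
    output_dir=Path("./unr_data"),
    start=datetime(2022, 1, 1),
    end=datetime(2023, 12, 31),
    margin_deg=1.0,  # expand the frame bounding box by 1 degree
)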

generate_s1_burst_tiles

generate_s1_burst_tiles(frame_id: int, sensing_time: datetime, output_dir: Path, time_window_hours: float = 2.0, n_download_processes: int = 5) -> Path

Generate non-overlapping burst tiles for a frame and sensing time.

Downloads CSLC data, processes bursts to create non-overlapping polygons, and saves to GeoJSON. Priority: IW1 > IW2 > IW3, lower burst_id first.

Parameters:

Name Type Description Default
frame_id int

OPERA frame identifier.

required
sensing_time datetime

Sensing time to search for CSLC products.

required
output_dir Path

Directory to save output GeoJSON and temporary files.

required
time_window_hours float

Time window in hours for searching CSLC products. Default is 2.0.

2.0
n_download_processes int

Number of parallel download processes. Default is 5.

5

Returns:

Type Description
Path

Path to the generated GeoJSON file.

Raises:

Type Description
ValueError

If no bursts are found or download fails.
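
A minimal usage sketch with an illustrative sensing time; the returned path follows the {date}_tiles.geojson pattern used in the source below:

from datetime import datetime
from pathlib import Path

from cal_disp.download import generate_s1_burst_tiles

tiles_file = generate_s1_burst_tiles(
    frame_id=8882,
    sensing_time=datetime(2022, 7, 22, 0, 26, 57),
    output_dir=Path("./burst_bounds"),
)
print(tiles_file)  # e.g. burst_bounds/2022-07-22_tiles.geojson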

Source code in src/cal_disp/download/_stage_burst_bounds.py
def generate_s1_burst_tiles(
    frame_id: int,
    sensing_time: datetime,
    output_dir: Path,
    time_window_hours: float = 2.0,
    n_download_processes: int = 5,
) -> Path:
    """Generate non-overlapping burst tiles for a frame and sensing time.

    Downloads CSLC data, processes bursts to create non-overlapping polygons,
    and saves to GeoJSON. Priority: IW1 > IW2 > IW3, lower burst_id first.

    Parameters
    ----------
    frame_id : int
        OPERA frame identifier.
    sensing_time : datetime
        Sensing time to search for CSLC products.
    output_dir : Path
        Directory to save output GeoJSON and temporary files.
    time_window_hours : float, optional
        Time window in hours for searching CSLC products. Default is 2.0.
    n_download_processes : int, optional
        Number of parallel download processes. Default is 5.

    Returns
    -------
    Path
        Path to the generated GeoJSON file.

    Raises
    ------
    ValueError
        If no bursts are found or download fails.

    """
    output_dir = Path(output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)

    logger.info(f"Starting burst tile generation for frame {frame_id}")

    # Get burst IDs and EPSG for frame
    burst_ids = opera_utils.get_burst_ids_for_frame(frame_id)
    burst_ids = [b.upper() for b in burst_ids]
    logger.info(f"Found {len(burst_ids)} burst IDs for frame {frame_id}")

    epsg = opera_utils.get_frame_bbox(frame_id)[0]
    logger.debug(f"Target CRS for frame: EPSG:{epsg}")

    # Search for CSLC products
    results = search_cslc_bursts(burst_ids, sensing_time, time_window_hours)
    logger.info(f"Found {len(results)} CSLC products")

    # Download files
    target_date = sensing_time.date()
    cslc_files = download_cslc_files(
        results, output_dir, target_date, n_download_processes
    )
    logger.info(f"Downloaded {len(cslc_files)} CSLC files")

    # Process bounds
    cslc_gdf = process_cslc_bounds(cslc_files, epsg=epsg)

    # Create non-overlapping tiles
    burst_gdf = create_nonoverlapping_tiles(cslc_gdf)

    # Log summary statistics
    swath_counts = burst_gdf.groupby("swath").size()
    logger.info(f"Generated {len(burst_gdf)} non-overlapping tiles")
    for swath_num, count in swath_counts.items():
        logger.info(f"  IW{swath_num}: {count} tiles")

    # Save to GeoJSON
    output_file = output_dir / f"{target_date}_tiles.geojson"
    burst_gdf.to_file(output_file)
    logger.info(f"Saved tiles to {output_file}")

    # Clean up temp directory
    temp_dir = output_dir / "tmp"
    if temp_dir.exists():
        shutil.rmtree(temp_dir)
        logger.debug(f"Cleaned up temporary directory: {temp_dir}")

    return output_file

utils

extract_sensing_times_from_file

extract_sensing_times_from_file(disp_file: Path) -> list[datetime]

Extract sensing times from DISP-S1 NetCDF filename.

Parses DISP-S1 filename to extract reference and secondary dates. Expected format: OPERA_L3_DISP-S1_IW_F{frame}_VV_{ref_date}_{sec_date}_v{version}_{prod_date}.nc

Parameters:

Name Type Description Default
disp_file Path

Path to DISP-S1 NetCDF file.

required

Returns:

Type Description
list[datetime]

List of unique sensing times from reference and secondary dates.

Raises:

Type Description
FileNotFoundError

If DISP file does not exist.

ValueError

If dates cannot be parsed from filename.

Examples:

>>> from pathlib import Path
>>> filename = Path("OPERA_L3_*.nc")
>>> times = extract_sensing_times_from_file(filename)
>>> len(times)
2
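
The datetime tokens embedded in the filename use the compact %Y%m%dT%H%M%SZ form; a small sketch of parsing one token with the standard library:

from datetime import datetime

token = "20220111T002651Z"  # example reference-date token from a DISP-S1 filename
parsed = datetime.strptime(token, "%Y%m%dT%H%M%SZ")
print(parsed.isoformat())  # 2022-01-11T00:26:51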
Source code in src/cal_disp/download/utils.py
def extract_sensing_times_from_file(disp_file: Path) -> list[datetime]:
    """Extract sensing times from DISP-S1 NetCDF filename.

    Parses DISP-S1 filename to extract reference and secondary dates.
    Expected format:
    OPERA_L3_DISP-S1_IW_F{frame}_VV_{ref_date}_{sec_date}_v{version}_{prod_date}.nc

    Parameters
    ----------
    disp_file : Path
        Path to DISP-S1 NetCDF file.

    Returns
    -------
    list[datetime]
        List of unique sensing times from reference and secondary dates.

    Raises
    ------
    FileNotFoundError
        If DISP file does not exist.
    ValueError
        If dates cannot be parsed from filename.

    Examples
    --------
    >>> from pathlib import Path
    >>> filename = Path("OPERA_L3_*.nc")
    >>> times = extract_sensing_times_from_file(filename)  # doctest: +SKIP
    >>> len(times)  # doctest: +SKIP
    2

    """
    if not disp_file.exists():
        msg = f"DISP file not found: {disp_file}"
        raise FileNotFoundError(msg)

    logger.info(f"Extracting sensing times from {disp_file.name}")

    filename = disp_file.name
    parts = filename.split("_")

    # Find parts matching datetime format (YYYYMMDDTHHMMSSZ)
    datetime_parts = [p for p in parts if len(p) == 16 and p.endswith("Z") and "T" in p]

    if len(datetime_parts) < 2:
        msg = (
            f"Cannot parse reference and secondary dates from filename: {filename}. "
            "Expected format: "
            "OPERA_L3_DISP-S1_IW_F{{frame}}_VV_{{ref_date}}_{{sec_date}}_"
            "v{{version}}_{{prod_date}}.nc"
        )
        raise ValueError(msg)

    ref_date_str = datetime_parts[0]
    sec_date_str = datetime_parts[1]

    try:
        ref_date = datetime.strptime(ref_date_str, "%Y%m%dT%H%M%SZ")
        sec_date = datetime.strptime(sec_date_str, "%Y%m%dT%H%M%SZ")
    except ValueError as e:
        msg = f"Failed to parse dates from filename {filename}: {e}"
        raise ValueError(msg) from e

    sensing_times = sorted({ref_date, sec_date})
    logger.info(
        f"Parsed sensing times: {ref_date.isoformat()} (ref), "
        f"{sec_date.isoformat()} (sec)"
    )

    return sensing_times

Product

cal_disp.product

CalProduct dataclass

Calibrated OPERA DISP displacement product.

Represents a calibration correction product that should be subtracted from OPERA DISP products. Main group contains calibration at full resolution. Optional model_3d group contains 3D displacement components at coarser resolution.

Groups

Main group:
- calibration: Correction to subtract from DISP (full resolution)
- calibration_std: Calibration uncertainty (full resolution)

model_3d group (optional):
- north_south: North-south displacement component (coarse resolution)
- east_west: East-west displacement component (coarse resolution)
- up_down: Up-down displacement component (coarse resolution)
- north_south_std: Uncertainty in north-south
- east_west_std: Uncertainty in east-west
- up_down_std: Uncertainty in up-down

Parameters:

Name Type Description Default
path Path

Path to the calibration product NetCDF file.

required
frame_id int

OPERA frame identifier.

required
primary_date datetime

Earlier acquisition date (reference).

required
secondary_date datetime

Later acquisition date.

required
polarization str

Radar polarization (e.g., "VV", "VH").

required
sensor str

Sensor type: "S1" (Sentinel-1) or "NI" (NISAR).

required
version str

Product version string.

required
production_date datetime

Date when product was generated.

required
mode str

Acquisition mode (e.g., "IW" for S1, "LSAR" for NI).

'IW'

Examples:

>>> # Create product with both groups
>>> cal = CalProduct.create(
...     calibration=cal_correction,
...     disp_product=disp,
...     output_dir="output/",
...     model_3d={"north_south": vel_ns, "east_west": vel_ew, "up_down": vel_ud},
... )
>>> # Access main calibration
>>> ds_main = cal.open_dataset()
>>> calibration = ds_main["calibration"]
>>> # Access 3D model (coarse resolution)
>>> ds_model = cal.open_model_3d()
>>> model_up = ds_model["up_down"]
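
A hedged sketch of working with an existing product file through the methods documented below; the filename is illustrative:

from cal_disp.product import CalProduct

cal = CalProduct.from_path(
    "OPERA_L4_CAL-DISP-S1_IW_F08882_VV_20220111T002651Z_"
    "20220722T002657Z_v1.0_20251227T123456Z.nc"
)
print(cal.baseline_days)  # days between primary and secondary acquisitions
cal.to_geotiff("calibration", "calibration.tif")
if cal.has_model_3d():
    cal.to_geotiff("up_down", "model_up.tif", group="model_3d")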
Source code in src/cal_disp/product/_cal.py
@dataclass
class CalProduct:
    """Calibrated OPERA DISP displacement product.

    Represents a calibration correction product that should be subtracted
    from OPERA DISP products. Main group contains calibration at full
    resolution. Optional model_3d group contains 3D displacement
    components at coarser resolution.

    Groups
    ------
    Main group:
        - calibration: Correction to subtract from DISP (full resolution)
        - calibration_std: Calibration uncertainty (full resolution)

    model_3d group (optional):
        - north_south: North-south displacement component (coarse resolution)
        - east_west: East-west displacement component (coarse resolution)
        - up_down: Up-down displacement component (coarse resolution)
        - north_south_std: Uncertainty in north-south
        - east_west_std: Uncertainty in east-west
        - up_down_std: Uncertainty in up-down

    Parameters
    ----------
    path : Path
        Path to the calibration product NetCDF file.
    frame_id : int
        OPERA frame identifier.
    primary_date : datetime
        Earlier acquisition date (reference).
    secondary_date : datetime
        Later acquisition date.
    polarization : str
        Radar polarization (e.g., "VV", "VH").
    sensor : str
        Sensor type: "S1" (Sentinel-1) or "NI" (NISAR).
    version : str
        Product version string.
    production_date : datetime
        Date when product was generated.
    mode : str
        Acquisition mode (e.g., "IW" for S1, "LSAR" for NI).

    Examples
    --------
    >>> # Create product with both groups
    >>> cal = CalProduct.create(
    ...     calibration=cal_correction,
    ...     disp_product=disp,
    ...     output_dir="output/",
    ...     model_3d={"north_south": vel_ns, "east_west": vel_ew, "up_down": vel_ud},
    ... )

    >>> # Access main calibration
    >>> ds_main = cal.open_dataset()
    >>> calibration = ds_main["calibration"]

    >>> # Access 3D model (coarse resolution)
    >>> ds_model = cal.open_model_3d()
    >>> model_up = ds_model["up_down"]

    """

    path: Path
    frame_id: int
    primary_date: datetime
    secondary_date: datetime
    polarization: str
    sensor: str
    version: str
    production_date: datetime
    mode: str = "IW"

    # Filename pattern supporting both S1 and NI sensors
    _PATTERN = re.compile(
        r"OPERA_L4_CAL-DISP-(?P<sensor>S1|NI)_"
        r"(?P<mode>\w+)_"
        r"F(?P<frame_id>\d+)_"
        r"(?P<pol>\w+)_"
        r"(?P<primary>\d{8}T\d{6}Z)_"
        r"(?P<secondary>\d{8}T\d{6}Z)_"
        r"v(?P<version>[\d.]+)_"
        r"(?P<production>\d{8}T\d{6}Z)"
        r"\.nc$"
    )

    # Main group layers (full resolution)
    CAL_LAYERS = [
        "calibration",  # Main correction (subtract from DISP)
        "calibration_std",  # Uncertainty
    ]

    # model_3d group layers (coarse resolution)
    MODEL_3D_LAYERS = [
        "north_south",  # North-south displacement component
        "east_west",  # East-west displacement component
        "up_down",  # Up-down displacement component
        "north_south_std",  # Uncertainty - north
        "east_west_std",  # Uncertainty - east
        "up_down_std",  # Uncertainty - up
    ]

    def __post_init__(self) -> None:
        """Validate product after construction."""
        self.path = Path(self.path)

        if self.frame_id <= 0:
            raise ValueError(f"frame_id must be positive, got {self.frame_id}")

        if self.secondary_date <= self.primary_date:
            raise ValueError(
                f"Secondary date ({self.secondary_date}) must be after "
                f"primary date ({self.primary_date})"
            )

        if self.polarization not in {"VV", "VH", "HH", "HV"}:
            raise ValueError(f"Invalid polarization: {self.polarization}")

        if self.sensor not in {"S1", "NI"}:
            raise ValueError(f"Invalid sensor: {self.sensor}. Must be 'S1' or 'NI'")

    @classmethod
    def from_path(cls, path: Path | str) -> "CalProduct":
        """Parse product metadata from filename.

        Parameters
        ----------
        path : Path or str
            Path to calibration product NetCDF file.

        Returns
        -------
        CalProduct
            Parsed calibration product instance.

        Raises
        ------
        ValueError
            If filename doesn't match OPERA CAL-DISP pattern.

        Examples
        --------
        >>> cal = CalProduct.from_path(
        ...     "OPERA_L4_CAL-DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251227T123456Z.nc")
        >>> cal.sensor
        'S1'

        """
        path = Path(path)
        match = cls._PATTERN.match(path.name)

        if not match:
            raise ValueError(
                f"Filename does not match OPERA CAL-DISP pattern: {path.name}"
            )

        return cls(
            path=path,
            frame_id=int(match.group("frame_id")),
            primary_date=datetime.strptime(match.group("primary"), "%Y%m%dT%H%M%SZ"),
            secondary_date=datetime.strptime(
                match.group("secondary"), "%Y%m%dT%H%M%SZ"
            ),
            polarization=match.group("pol"),
            sensor=match.group("sensor"),
            version=match.group("version"),
            production_date=datetime.strptime(
                match.group("production"), "%Y%m%dT%H%M%SZ"
            ),
            mode=match.group("mode"),
        )

    @classmethod
    def create(
        cls,
        calibration: xr.DataArray,
        disp_product: "DispProduct",
        output_dir: Path | str,
        sensor: str = "S1",
        calibration_std: xr.DataArray | None = None,
        model_3d: dict[str, xr.DataArray] | None = None,
        model_3d_std: dict[str, xr.DataArray] | None = None,
        metadata: dict[str, str] | None = None,
        version: str = "1.0",
    ) -> "CalProduct":
        """Create calibration product with optional model_3d group.

        Parameters
        ----------
        calibration : xr.DataArray
            Calibration correction at full DISP resolution.
        disp_product : DispProduct
            Original DISP product (for metadata).
        output_dir : Path or str
            Output directory for NetCDF file.
        sensor : str, optional
            Sensor type: "S1" or "NI". Default is "S1".
        calibration_std : xr.DataArray or None, optional
            Calibration uncertainty at full resolution. Default is None.
        model_3d : dict[str, xr.DataArray] or None, optional
            3D displacement components (coarse resolution) with keys:
            "north_south", "east_west", "up_down". Default is None.
        model_3d_std : dict[str, xr.DataArray] or None, optional
            3D displacement uncertainties (coarse resolution). Default is None.
        metadata : dict[str, str] or None, optional
            Additional metadata (e.g., GNSS reference epoch). Default is None.
        version : str, optional
            Product version. Default is "1.0".

        Returns
        -------
        CalProduct
            Created calibration product.

        Examples
        --------
        >>> from product import DispProduct, CalProduct
        >>>
        >>> disp = DispProduct.from_path(
        ...     "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc")
        >>>
        >>> # Full resolution calibration
        >>> cal_full = calibration_at_30m_resolution
        >>>
        >>> # Coarse resolution 3D model (e.g., 90m from GNSS interpolation)
        >>> model_coarse = {
        ...     "north_south": disp_ns_90m,
        ...     "east_west": disp_ew_90m,
        ...     "up_down": disp_ud_90m,
        ... }
        >>>
        >>> cal = CalProduct.create(
        ...     calibration=cal_full,
        ...     disp_product=disp,
        ...     output_dir="output/",
        ...     calibration_std=cal_std,
        ...     model_3d=model_coarse,
        ...     model_3d_std={
        ...         "north_south_std": disp_ns_std_90m,
        ...         "east_west_std": disp_ew_std_90m,
        ...         "up_down_std": disp_ud_std_90m,
        ...     },
        ...     metadata={
        ...         "gnss_reference_epoch": "2020-01-01T00:00:00Z",
        ...         "model_3d_resolution": "90m",
        ...     },
        ... )

        """
        if sensor not in {"S1", "NI"}:
            raise ValueError(f"Invalid sensor: {sensor}. Must be 'S1' or 'NI'")

        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)

        # Generate OPERA-compliant filename
        production_date = datetime.utcnow()
        filename = (
            f"OPERA_L4_CAL-DISP-{sensor}_"
            f"{disp_product.mode}_"
            f"F{disp_product.frame_id:05d}_"
            f"{disp_product.polarization}_"
            f"{disp_product.primary_date:%Y%m%dT%H%M%S}Z_"
            f"{disp_product.secondary_date:%Y%m%dT%H%M%S}Z_"
            f"v{version}_"
            f"{production_date:%Y%m%dT%H%M%S}Z"
            ".nc"
        )

        output_path = output_dir / filename

        # Build main group dataset (full resolution)
        data_vars = {"calibration": calibration}

        if calibration_std is not None:
            data_vars["calibration_std"] = calibration_std

        # Create main dataset
        ds = xr.Dataset(data_vars)

        # Add global attributes
        ds.attrs.update(
            {
                "product_type": f"OPERA_L4_CAL-DISP-{sensor}",
                "sensor": sensor,
                "frame_id": disp_product.frame_id,
                "mode": disp_product.mode,
                "polarization": disp_product.polarization,
                "primary_datetime": disp_product.primary_date.isoformat(),
                "secondary_datetime": disp_product.secondary_date.isoformat(),
                "production_datetime": production_date.isoformat(),
                "product_version": version,
                "description": (
                    f"Calibration correction for {sensor} InSAR displacement (subtract"
                    " from DISP)"
                ),
                "source_product": disp_product.filename,
                "usage": (
                    "Subtract calibration layer from DISP displacement to obtain"
                    " calibrated displacement"
                ),
            }
        )

        # Add custom metadata
        if metadata:
            ds.attrs.update(metadata)

        # Save main group
        ds.to_netcdf(output_path, engine="h5netcdf")

        # Create model_3d group if 3D components provided (coarse resolution)
        if model_3d or model_3d_std:
            model_data_vars = {}

            # Add 3D displacement components
            if model_3d:
                for comp in ["north_south", "east_west", "up_down"]:
                    if comp in model_3d:
                        model_data_vars[comp] = model_3d[comp]

            # Add 3D displacement uncertainties
            if model_3d_std:
                for comp in ["north_south_std", "east_west_std", "up_down_std"]:
                    if comp in model_3d_std:
                        model_data_vars[comp] = model_3d_std[comp]

            if model_data_vars:
                ds_model = xr.Dataset(model_data_vars)

                # Add model group attributes
                ds_model.attrs.update(
                    {
                        "description": (
                            "3D displacement model at coarse resolution (e.g., from"
                            " GNSS interpolation or deformation model)"
                        ),
                        "units": "meters",
                        "reference_frame": "ENU (East-North-Up)",
                    }
                )

                # Append to existing file as model_3d group
                ds_model.to_netcdf(
                    output_path,
                    mode="a",
                    group="model_3d",
                    engine="h5netcdf",
                )

        return cls(
            path=output_path,
            frame_id=disp_product.frame_id,
            primary_date=disp_product.primary_date,
            secondary_date=disp_product.secondary_date,
            polarization=disp_product.polarization,
            sensor=sensor,
            version=version,
            production_date=production_date,
            mode=disp_product.mode,
        )

    def open_dataset(self, group: str | None = None) -> xr.Dataset:
        """Open calibration dataset.

        Parameters
        ----------
        group : str or None, optional
            Group to open: None for main, "model_3d" for 3D model.
            Default is None (main group).

        Returns
        -------
        xr.Dataset
            Dataset containing requested group.

        Raises
        ------
        FileNotFoundError
            If product file does not exist.

        Examples
        --------
        >>> # Open main calibration (full resolution)
        >>> ds_main = cal.open_dataset()
        >>> calibration = ds_main["calibration"]

        >>> # Open model_3d group (coarse resolution)
        >>> ds_model = cal.open_dataset(group="model_3d")
        >>> model_up = ds_model["up_down"]

        """
        if not self.path.exists():
            raise FileNotFoundError(f"Product file not found: {self.path}")

        if group == "model_3d":
            return xr.open_dataset(self.path, group="model_3d", engine="h5netcdf")

        return xr.open_dataset(self.path, engine="h5netcdf")

    def open_model_3d(self) -> xr.Dataset:
        """Open model_3d group dataset.

        Returns
        -------
        xr.Dataset
            Dataset containing 3D displacement model at coarse resolution.

        Raises
        ------
        FileNotFoundError
            If product file does not exist.
        ValueError
            If model_3d group does not exist.

        Examples
        --------
        >>> ds_model = cal.open_model_3d()
        >>> disp_ns = ds_model["north_south"]
        >>> disp_ew = ds_model["east_west"]
        >>> disp_up = ds_model["up_down"]

        """
        if not self.path.exists():
            raise FileNotFoundError(f"Product file not found: {self.path}")

        try:
            return xr.open_dataset(self.path, group="model_3d", engine="h5netcdf")
        except (OSError, ValueError) as e:
            raise ValueError(
                f"model_3d group not found in {self.filename}. "
                "Product may not contain 3D displacement model."
            ) from e

    def has_model_3d(self) -> bool:
        """Check if product contains model_3d group.

        Returns
        -------
        bool
            True if model_3d group exists.

        """
        try:
            self.open_model_3d()
            return True
        except (FileNotFoundError, ValueError):
            return False

    def get_epsg(self) -> int | None:
        """Get EPSG code from spatial reference."""
        ds = self.open_dataset()

        if "spatial_ref" in ds:
            crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
            if crs_wkt:
                crs = CRS.from_wkt(crs_wkt)
                return crs.to_epsg()

        return None

    def get_bounds(self) -> dict[str, float]:
        """Get bounds in native projection."""
        ds = self.open_dataset()

        x = ds.x.values
        y = ds.y.values

        return {
            "left": float(x.min()),
            "bottom": float(y.min()),
            "right": float(x.max()),
            "top": float(y.max()),
        }

    def get_bounds_wgs84(self) -> dict[str, float]:
        """Get bounds transformed to WGS84."""
        ds = self.open_dataset()

        x = ds.x.values
        y = ds.y.values
        left = float(x.min())
        bottom = float(y.min())
        right = float(x.max())
        top = float(y.max())

        if "spatial_ref" not in ds:
            raise ValueError("Dataset missing spatial_ref")

        crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
        if not crs_wkt:
            raise ValueError("spatial_ref missing crs_wkt")

        src_crs = CRS.from_wkt(crs_wkt)

        west, south, east, north = transform_bounds(
            src_crs,
            CRS.from_epsg(4326),
            left,
            bottom,
            right,
            top,
        )

        return {
            "west": west,
            "south": south,
            "east": east,
            "north": north,
        }

    def to_geotiff(
        self,
        layer: str,
        output_path: Path | str,
        group: str | None = None,
        compress: str = "DEFLATE",
        **kwargs,
    ) -> Path:
        """Export layer to GeoTIFF.

        Parameters
        ----------
        layer : str
            Name of layer to export.
        output_path : Path or str
            Output GeoTIFF path.
        group : str or None, optional
            Group containing layer. Default is None (main group).
        compress : str, optional
            Compression method. Default is "DEFLATE".
        **kwargs
            Additional rasterio creation options.

        Returns
        -------
        Path
            Path to created GeoTIFF.

        Examples
        --------
        >>> # Export main calibration
        >>> cal.to_geotiff("calibration", "calibration.tif")

        >>> # Export 3D model component
        >>> cal.to_geotiff("up_down", "model_up.tif", group="model_3d")

        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        ds = self.open_dataset(group=group)

        if layer not in ds:
            available = list(ds.data_vars)
            group_str = f" in {group} group" if group else ""
            raise ValueError(
                f"Layer '{layer}' not found{group_str}. Available: {available}"
            )

        da = ds[layer]
        data = da.values

        # Extract spatial information
        if "spatial_ref" in ds:
            transform = self._get_transform(ds)
            crs = self._get_crs(ds)
        else:
            transform = Affine.translation(
                float(ds.x.values[0]),
                float(ds.y.values[0]),
            ) * Affine.scale(
                float(ds.x.values[1] - ds.x.values[0]),
                float(ds.y.values[1] - ds.y.values[0]),
            )
            crs = None

        # Write GeoTIFF
        profile = {
            "driver": "GTiff",
            "height": data.shape[0],
            "width": data.shape[1],
            "count": 1,
            "dtype": np.float32,
            "transform": transform,
            "compress": compress,
            "tiled": True,
            "blockxsize": 512,
            "blockysize": 512,
            **kwargs,
        }

        if crs:
            profile["crs"] = crs

        with rasterio.open(output_path, "w", **profile) as dst:
            dst.write(data.astype(np.float32), 1)
            dst.set_band_description(1, layer)

            # Add OPERA metadata tags
            dst.update_tags(
                product_type=f"OPERA_L4_CAL-DISP-{self.sensor}",
                sensor=self.sensor,
                frame_id=self.frame_id,
                polarization=self.polarization,
                primary_date=self.primary_date.isoformat(),
                secondary_date=self.secondary_date.isoformat(),
                layer=layer,
                group=group if group else "main",
            )

        return output_path

    def get_calibration_summary(self) -> dict[str, dict[str, float]]:
        """Get summary statistics of all layers.

        Returns
        -------
        dict[str, dict[str, float]]
            Statistics for main and model_3d groups.

        Examples
        --------
        >>> summary = cal.get_calibration_summary()
        >>> summary["main"]["calibration"]
        {'mean': 0.023, 'std': 0.015, 'min': -0.05, 'max': 0.08}
        >>> summary["model_3d"]["up_down"]
        {'mean': 0.001, 'std': 0.003, 'min': -0.01, 'max': 0.02}

        """
        summary: dict = {"main": {}}

        # Main group
        ds = self.open_dataset()
        for var in ds.data_vars:
            data = ds[var].values
            valid_data = data[~np.isnan(data)]

            if len(valid_data) > 0:
                summary["main"][var] = {
                    "mean": float(np.mean(valid_data)),
                    "std": float(np.std(valid_data)),
                    "min": float(np.min(valid_data)),
                    "max": float(np.max(valid_data)),
                }

        # model_3d group if exists
        if self.has_model_3d():
            summary["model_3d"] = {}
            ds_model = self.open_model_3d()

            for var in ds_model.data_vars:
                data = ds_model[var].values
                valid_data = data[~np.isnan(data)]

                if len(valid_data) > 0:
                    summary["model_3d"][var] = {
                        "mean": float(np.mean(valid_data)),
                        "std": float(np.std(valid_data)),
                        "min": float(np.min(valid_data)),
                        "max": float(np.max(valid_data)),
                    }

        return summary

    def _get_transform(self, ds: xr.Dataset) -> Affine:
        """Extract affine transform from dataset."""
        gt = ds.spatial_ref.attrs.get("GeoTransform")
        if gt is None:
            raise ValueError("No GeoTransform found in spatial_ref")

        vals = [float(x) for x in gt.split()]
        return Affine(vals[1], vals[2], vals[0], vals[4], vals[5], vals[3])

    def _get_crs(self, ds: xr.Dataset) -> str:
        """Extract CRS from dataset."""
        crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
        if crs_wkt is None:
            raise ValueError("No crs_wkt found in spatial_ref")
        return crs_wkt

    @property
    def baseline_days(self) -> int:
        """Temporal baseline in days."""
        return (self.secondary_date - self.primary_date).days

    @property
    def filename(self) -> str:
        """Product filename."""
        return self.path.name

    @property
    def exists(self) -> bool:
        """Check if product file exists."""
        return self.path.exists()

    def __repr__(self) -> str:
        """Return a string representation."""
        model_str = "+model_3d" if self.exists and self.has_model_3d() else ""
        return (
            f"CalProduct(sensor={self.sensor}, frame={self.frame_id}, "
            f"{self.primary_date.date()}{self.secondary_date.date()}, "
            f"{self.polarization}{model_str})"
        )

baseline_days property

baseline_days: int

Temporal baseline in days.

exists property

exists: bool

Check if product file exists.

filename property

filename: str

Product filename.

create classmethod

create(calibration: DataArray, disp_product: DispProduct, output_dir: Path | str, sensor: str = 'S1', calibration_std: DataArray | None = None, model_3d: dict[str, DataArray] | None = None, model_3d_std: dict[str, DataArray] | None = None, metadata: dict[str, str] | None = None, version: str = '1.0') -> CalProduct

Create calibration product with optional model_3d group.

Parameters:

    calibration : DataArray
        Calibration correction at full DISP resolution. Required.
    disp_product : DispProduct
        Original DISP product (for metadata). Required.
    output_dir : Path or str
        Output directory for NetCDF file. Required.
    sensor : str, optional
        Sensor type: "S1" or "NI". Default is "S1".
    calibration_std : DataArray or None, optional
        Calibration uncertainty at full resolution. Default is None.
    model_3d : dict[str, DataArray] or None, optional
        3D displacement components (coarse resolution) with keys:
        "north_south", "east_west", "up_down". Default is None.
    model_3d_std : dict[str, DataArray] or None, optional
        3D displacement uncertainties (coarse resolution). Default is None.
    metadata : dict[str, str] or None, optional
        Additional metadata (e.g., GNSS reference epoch). Default is None.
    version : str, optional
        Product version. Default is "1.0".

Returns:

    CalProduct
        Created calibration product.

Examples:

>>> from cal_disp.product import DispProduct, CalProduct
>>>
>>> disp = DispProduct.from_path(
"OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc")
>>>
>>> # Full resolution calibration
>>> cal_full = calibration_at_30m_resolution
>>>
>>> # Coarse resolution 3D model (e.g., 90m from GNSS interpolation)
>>> model_coarse = {
...     "north_south": disp_ns_90m,
...     "east_west": disp_ew_90m,
...     "up_down": disp_ud_90m,
... }
>>>
>>> cal = CalProduct.create(
...     calibration=cal_full,
...     disp_product=disp,
...     output_dir="output/",
...     calibration_std=cal_std,
...     model_3d=model_coarse,
...     model_3d_std={
...         "north_south_std": disp_ns_std_90m,
...         "east_west_std": disp_ew_std_90m,
...         "up_down_std": disp_ud_std_90m,
...     },
...     metadata={
...         "gnss_reference_epoch": "2020-01-01T00:00:00Z",
...         "model_3d_resolution": "90m",
...     },
... )
Source code in src/cal_disp/product/_cal.py
@classmethod
def create(
    cls,
    calibration: xr.DataArray,
    disp_product: "DispProduct",
    output_dir: Path | str,
    sensor: str = "S1",
    calibration_std: xr.DataArray | None = None,
    model_3d: dict[str, xr.DataArray] | None = None,
    model_3d_std: dict[str, xr.DataArray] | None = None,
    metadata: dict[str, str] | None = None,
    version: str = "1.0",
) -> "CalProduct":
    """Create calibration product with optional model_3d group.

    Parameters
    ----------
    calibration : xr.DataArray
        Calibration correction at full DISP resolution.
    disp_product : DispProduct
        Original DISP product (for metadata).
    output_dir : Path or str
        Output directory for NetCDF file.
    sensor : str, optional
        Sensor type: "S1" or "NI". Default is "S1".
    calibration_std : xr.DataArray or None, optional
        Calibration uncertainty at full resolution. Default is None.
    model_3d : dict[str, xr.DataArray] or None, optional
        3D displacement components (coarse resolution) with keys:
        "north_south", "east_west", "up_down". Default is None.
    model_3d_std : dict[str, xr.DataArray] or None, optional
        3D displacement uncertainties (coarse resolution). Default is None.
    metadata : dict[str, str] or None, optional
        Additional metadata (e.g., GNSS reference epoch). Default is None.
    version : str, optional
        Product version. Default is "1.0".

    Returns
    -------
    CalProduct
        Created calibration product.

    Examples
    --------
    >>> from cal_disp.product import DispProduct, CalProduct
    >>>
    >>> disp = DispProduct.from_path(
    "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc")
    >>>
    >>> # Full resolution calibration
    >>> cal_full = calibration_at_30m_resolution
    >>>
    >>> # Coarse resolution 3D model (e.g., 90m from GNSS interpolation)
    >>> model_coarse = {
    ...     "north_south": disp_ns_90m,
    ...     "east_west": disp_ew_90m,
    ...     "up_down": disp_ud_90m,
    ... }
    >>>
    >>> cal = CalProduct.create(
    ...     calibration=cal_full,
    ...     disp_product=disp,
    ...     output_dir="output/",
    ...     calibration_std=cal_std,
    ...     model_3d=model_coarse,
    ...     model_3d_std={
    ...         "north_south_std": disp_ns_std_90m,
    ...         "east_west_std": disp_ew_std_90m,
    ...         "up_down_std": disp_ud_std_90m,
    ...     },
    ...     metadata={
    ...         "gnss_reference_epoch": "2020-01-01T00:00:00Z",
    ...         "model_3d_resolution": "90m",
    ...     },
    ... )

    """
    if sensor not in {"S1", "NI"}:
        raise ValueError(f"Invalid sensor: {sensor}. Must be 'S1' or 'NI'")

    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate OPERA-compliant filename
    production_date = datetime.utcnow()
    filename = (
        f"OPERA_L4_CAL-DISP-{sensor}_"
        f"{disp_product.mode}_"
        f"F{disp_product.frame_id:05d}_"
        f"{disp_product.polarization}_"
        f"{disp_product.primary_date:%Y%m%dT%H%M%S}Z_"
        f"{disp_product.secondary_date:%Y%m%dT%H%M%S}Z_"
        f"v{version}_"
        f"{production_date:%Y%m%dT%H%M%S}Z"
        ".nc"
    )

    output_path = output_dir / filename

    # Build main group dataset (full resolution)
    data_vars = {"calibration": calibration}

    if calibration_std is not None:
        data_vars["calibration_std"] = calibration_std

    # Create main dataset
    ds = xr.Dataset(data_vars)

    # Add global attributes
    ds.attrs.update(
        {
            "product_type": f"OPERA_L4_CAL-DISP-{sensor}",
            "sensor": sensor,
            "frame_id": disp_product.frame_id,
            "mode": disp_product.mode,
            "polarization": disp_product.polarization,
            "primary_datetime": disp_product.primary_date.isoformat(),
            "secondary_datetime": disp_product.secondary_date.isoformat(),
            "production_datetime": production_date.isoformat(),
            "product_version": version,
            "description": (
                f"Calibration correction for {sensor} InSAR displacement (subtract"
                " from DISP)"
            ),
            "source_product": disp_product.filename,
            "usage": (
                "Subtract calibration layer from DISP displacement to obtain"
                " calibrated displacement"
            ),
        }
    )

    # Add custom metadata
    if metadata:
        ds.attrs.update(metadata)

    # Save main group
    ds.to_netcdf(output_path, engine="h5netcdf")

    # Create model_3d group if 3D components provided (coarse resolution)
    if model_3d or model_3d_std:
        model_data_vars = {}

        # Add 3D displacement components
        if model_3d:
            for comp in ["north_south", "east_west", "up_down"]:
                if comp in model_3d:
                    model_data_vars[comp] = model_3d[comp]

        # Add 3D displacement uncertainties
        if model_3d_std:
            for comp in ["north_south_std", "east_west_std", "up_down_std"]:
                if comp in model_3d_std:
                    model_data_vars[comp] = model_3d_std[comp]

        if model_data_vars:
            ds_model = xr.Dataset(model_data_vars)

            # Add model group attributes
            ds_model.attrs.update(
                {
                    "description": (
                        "3D displacement model at coarse resolution (e.g., from"
                        " GNSS interpolation or deformation model)"
                    ),
                    "units": "meters",
                    "reference_frame": "ENU (East-North-Up)",
                }
            )

            # Append to existing file as model_3d group
            ds_model.to_netcdf(
                output_path,
                mode="a",
                group="model_3d",
                engine="h5netcdf",
            )

    return cls(
        path=output_path,
        frame_id=disp_product.frame_id,
        primary_date=disp_product.primary_date,
        secondary_date=disp_product.secondary_date,
        polarization=disp_product.polarization,
        sensor=sensor,
        version=version,
        production_date=production_date,
        mode=disp_product.mode,
    )
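
The "usage" attribute written by create says to subtract the calibration layer from the DISP displacement. A minimal sketch of that step, reusing the disp and cal objects from the example above (variable names are illustrative, and "displacement" is the main DISP-S1 layer):

# Both main groups are at the same full DISP resolution, so the subtraction
# aligns pixel-for-pixel via the shared x/y coordinates.
disp_ds = disp.open_dataset()
cal_ds = cal.open_dataset()

calibrated = disp_ds["displacement"] - cal_ds["calibration"]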

from_path classmethod

from_path(path: Path | str) -> CalProduct

Parse product metadata from filename.

Parameters:

    path : Path or str
        Path to calibration product NetCDF file. Required.

Returns:

    CalProduct
        Parsed calibration product instance.

Raises:

    ValueError
        If filename doesn't match OPERA CAL-DISP pattern.

Examples:

>>> cal = CalProduct.from_path(
"OPERA_L4_CAL-DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251227T123456Z.nc")
>>> cal.sensor
'S1'
Source code in src/cal_disp/product/_cal.py
@classmethod
def from_path(cls, path: Path | str) -> "CalProduct":
    """Parse product metadata from filename.

    Parameters
    ----------
    path : Path or str
        Path to calibration product NetCDF file.

    Returns
    -------
    CalProduct
        Parsed calibration product instance.

    Raises
    ------
    ValueError
        If filename doesn't match OPERA CAL-DISP pattern.

    Examples
    --------
    >>> cal = CalProduct.from_path(
    "OPERA_L4_CAL-DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251227T123456Z.nc")
    >>> cal.sensor
    'S1'

    """
    path = Path(path)
    match = cls._PATTERN.match(path.name)

    if not match:
        raise ValueError(
            f"Filename does not match OPERA CAL-DISP pattern: {path.name}"
        )

    return cls(
        path=path,
        frame_id=int(match.group("frame_id")),
        primary_date=datetime.strptime(match.group("primary"), "%Y%m%dT%H%M%SZ"),
        secondary_date=datetime.strptime(
            match.group("secondary"), "%Y%m%dT%H%M%SZ"
        ),
        polarization=match.group("pol"),
        sensor=match.group("sensor"),
        version=match.group("version"),
        production_date=datetime.strptime(
            match.group("production"), "%Y%m%dT%H%M%SZ"
        ),
        mode=match.group("mode"),
    )

get_bounds

get_bounds() -> dict[str, float]

Get bounds in native projection.

Source code in src/cal_disp/product/_cal.py
def get_bounds(self) -> dict[str, float]:
    """Get bounds in native projection."""
    ds = self.open_dataset()

    x = ds.x.values
    y = ds.y.values

    return {
        "left": float(x.min()),
        "bottom": float(y.min()),
        "right": float(x.max()),
        "top": float(y.max()),
    }

get_bounds_wgs84

get_bounds_wgs84() -> dict[str, float]

Get bounds transformed to WGS84.

Source code in src/cal_disp/product/_cal.py
def get_bounds_wgs84(self) -> dict[str, float]:
    """Get bounds transformed to WGS84."""
    ds = self.open_dataset()

    x = ds.x.values
    y = ds.y.values
    left = float(x.min())
    bottom = float(y.min())
    right = float(x.max())
    top = float(y.max())

    if "spatial_ref" not in ds:
        raise ValueError("Dataset missing spatial_ref")

    crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
    if not crs_wkt:
        raise ValueError("spatial_ref missing crs_wkt")

    src_crs = CRS.from_wkt(crs_wkt)

    west, south, east, north = transform_bounds(
        src_crs,
        CRS.from_epsg(4326),
        left,
        bottom,
        right,
        top,
    )

    return {
        "west": west,
        "south": south,
        "east": east,
        "north": north,
    }

get_calibration_summary

get_calibration_summary() -> dict[str, dict[str, float]]

Get summary statistics of all layers.

Returns:

    dict[str, dict[str, float]]
        Statistics for main and model_3d groups.

Examples:

>>> summary = cal.get_calibration_summary()
>>> summary["main"]["calibration"]
{'mean': 0.023, 'std': 0.015, 'min': -0.05, 'max': 0.08}
>>> summary["model_3d"]["up_down"]
{'mean': 0.001, 'std': 0.003, 'min': -0.01, 'max': 0.02}
Source code in src/cal_disp/product/_cal.py
def get_calibration_summary(self) -> dict[str, dict[str, float]]:
    """Get summary statistics of all layers.

    Returns
    -------
    dict[str, dict[str, float]]
        Statistics for main and model_3d groups.

    Examples
    --------
    >>> summary = cal.get_calibration_summary()
    >>> summary["main"]["calibration"]
    {'mean': 0.023, 'std': 0.015, 'min': -0.05, 'max': 0.08}
    >>> summary["model_3d"]["up_down"]
    {'mean': 0.001, 'std': 0.003, 'min': -0.01, 'max': 0.02}

    """
    summary: dict = {"main": {}}

    # Main group
    ds = self.open_dataset()
    for var in ds.data_vars:
        data = ds[var].values
        valid_data = data[~np.isnan(data)]

        if len(valid_data) > 0:
            summary["main"][var] = {
                "mean": float(np.mean(valid_data)),
                "std": float(np.std(valid_data)),
                "min": float(np.min(valid_data)),
                "max": float(np.max(valid_data)),
            }

    # model_3d group if exists
    if self.has_model_3d():
        summary["model_3d"] = {}
        ds_model = self.open_model_3d()

        for var in ds_model.data_vars:
            data = ds_model[var].values
            valid_data = data[~np.isnan(data)]

            if len(valid_data) > 0:
                summary["model_3d"][var] = {
                    "mean": float(np.mean(valid_data)),
                    "std": float(np.std(valid_data)),
                    "min": float(np.min(valid_data)),
                    "max": float(np.max(valid_data)),
                }

    return summary
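
A short sketch of walking the returned dictionary (layer values are assumed to be in meters, matching the units attribute written for the model_3d group):

summary = cal.get_calibration_summary()
for group_name, layers in summary.items():
    for layer_name, stats in layers.items():
        print(f"{group_name}/{layer_name}: "
              f"mean={stats['mean']:.3f}, std={stats['std']:.3f}")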

get_epsg

get_epsg() -> int | None

Get EPSG code from spatial reference.

Source code in src/cal_disp/product/_cal.py
def get_epsg(self) -> int | None:
    """Get EPSG code from spatial reference."""
    ds = self.open_dataset()

    if "spatial_ref" in ds:
        crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
        if crs_wkt:
            crs = CRS.from_wkt(crs_wkt)
            return crs.to_epsg()

    return None

has_model_3d

has_model_3d() -> bool

Check if product contains model_3d group.

Returns:

    bool
        True if model_3d group exists.

Source code in src/cal_disp/product/_cal.py
def has_model_3d(self) -> bool:
    """Check if product contains model_3d group.

    Returns
    -------
    bool
        True if model_3d group exists.

    """
    try:
        self.open_model_3d()
        return True
    except (FileNotFoundError, ValueError):
        return False
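
Because the model_3d group is optional, a typical call site checks for it before opening. A minimal sketch (assuming cal is a CalProduct instance):

if cal.has_model_3d():
    ds_model = cal.open_model_3d()
    up = ds_model["up_down"]   # coarse-resolution vertical component
else:
    ds_model = None            # product was written without a 3D model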

open_dataset

open_dataset(group: str | None = None) -> xr.Dataset

Open calibration dataset.

Parameters:

    group : str or None, optional
        Group to open: None for main, "model_3d" for 3D model.
        Default is None (main group).

Returns:

    Dataset
        Dataset containing requested group.

Raises:

    FileNotFoundError
        If product file does not exist.

Examples:

>>> # Open main calibration (full resolution)
>>> ds_main = cal.open_dataset()
>>> calibration = ds_main["calibration"]
>>> # Open model_3d group (coarse resolution)
>>> ds_model = cal.open_dataset(group="model_3d")
>>> model_up = ds_model["up_down"]
Source code in src/cal_disp/product/_cal.py
def open_dataset(self, group: str | None = None) -> xr.Dataset:
    """Open calibration dataset.

    Parameters
    ----------
    group : str or None, optional
        Group to open: None for main, "model_3d" for 3D model.
        Default is None (main group).

    Returns
    -------
    xr.Dataset
        Dataset containing requested group.

    Raises
    ------
    FileNotFoundError
        If product file does not exist.

    Examples
    --------
    >>> # Open main calibration (full resolution)
    >>> ds_main = cal.open_dataset()
    >>> calibration = ds_main["calibration"]

    >>> # Open model_3d group (coarse resolution)
    >>> ds_model = cal.open_dataset(group="model_3d")
    >>> model_up = ds_model["up_down"]

    """
    if not self.path.exists():
        raise FileNotFoundError(f"Product file not found: {self.path}")

    if group == "model_3d":
        return xr.open_dataset(self.path, group="model_3d", engine="h5netcdf")

    return xr.open_dataset(self.path, engine="h5netcdf")

open_model_3d

open_model_3d() -> xr.Dataset

Open model_3d group dataset.

Returns:

    Dataset
        Dataset containing 3D displacement model at coarse resolution.

Raises:

    FileNotFoundError
        If product file does not exist.
    ValueError
        If model_3d group does not exist.

Examples:

>>> ds_model = cal.open_model_3d()
>>> disp_ns = ds_model["north_south"]
>>> disp_ew = ds_model["east_west"]
>>> disp_up = ds_model["up_down"]
Source code in src/cal_disp/product/_cal.py
def open_model_3d(self) -> xr.Dataset:
    """Open model_3d group dataset.

    Returns
    -------
    xr.Dataset
        Dataset containing 3D displacement model at coarse resolution.

    Raises
    ------
    FileNotFoundError
        If product file does not exist.
    ValueError
        If model_3d group does not exist.

    Examples
    --------
    >>> ds_model = cal.open_model_3d()
    >>> disp_ns = ds_model["north_south"]
    >>> disp_ew = ds_model["east_west"]
    >>> disp_up = ds_model["up_down"]

    """
    if not self.path.exists():
        raise FileNotFoundError(f"Product file not found: {self.path}")

    try:
        return xr.open_dataset(self.path, group="model_3d", engine="h5netcdf")
    except (OSError, ValueError) as e:
        raise ValueError(
            f"model_3d group not found in {self.filename}. "
            "Product may not contain 3D displacement model."
        ) from e

to_geotiff

to_geotiff(layer: str, output_path: Path | str, group: str | None = None, compress: str = 'DEFLATE', **kwargs) -> Path

Export layer to GeoTIFF.

Parameters:

    layer : str
        Name of layer to export. Required.
    output_path : Path or str
        Output GeoTIFF path. Required.
    group : str or None, optional
        Group containing layer. Default is None (main group).
    compress : str, optional
        Compression method. Default is "DEFLATE".
    **kwargs
        Additional rasterio creation options.

Returns:

    Path
        Path to created GeoTIFF.

Examples:

>>> # Export main calibration
>>> cal.to_geotiff("calibration", "calibration.tif")
>>> # Export 3D model component
>>> cal.to_geotiff("up_down", "model_up.tif", group="model_3d")
Source code in src/cal_disp/product/_cal.py
def to_geotiff(
    self,
    layer: str,
    output_path: Path | str,
    group: str | None = None,
    compress: str = "DEFLATE",
    **kwargs,
) -> Path:
    """Export layer to GeoTIFF.

    Parameters
    ----------
    layer : str
        Name of layer to export.
    output_path : Path or str
        Output GeoTIFF path.
    group : str or None, optional
        Group containing layer. Default is None (main group).
    compress : str, optional
        Compression method. Default is "DEFLATE".
    **kwargs
        Additional rasterio creation options.

    Returns
    -------
    Path
        Path to created GeoTIFF.

    Examples
    --------
    >>> # Export main calibration
    >>> cal.to_geotiff("calibration", "calibration.tif")

    >>> # Export 3D model component
    >>> cal.to_geotiff("up_down", "model_up.tif", group="model_3d")

    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    ds = self.open_dataset(group=group)

    if layer not in ds:
        available = list(ds.data_vars)
        group_str = f" in {group} group" if group else ""
        raise ValueError(
            f"Layer '{layer}' not found{group_str}. Available: {available}"
        )

    da = ds[layer]
    data = da.values

    # Extract spatial information
    if "spatial_ref" in ds:
        transform = self._get_transform(ds)
        crs = self._get_crs(ds)
    else:
        transform = Affine.translation(
            float(ds.x.values[0]),
            float(ds.y.values[0]),
        ) * Affine.scale(
            float(ds.x.values[1] - ds.x.values[0]),
            float(ds.y.values[1] - ds.y.values[0]),
        )
        crs = None

    # Write GeoTIFF
    profile = {
        "driver": "GTiff",
        "height": data.shape[0],
        "width": data.shape[1],
        "count": 1,
        "dtype": np.float32,
        "transform": transform,
        "compress": compress,
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        **kwargs,
    }

    if crs:
        profile["crs"] = crs

    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(data.astype(np.float32), 1)
        dst.set_band_description(1, layer)

        # Add OPERA metadata tags
        dst.update_tags(
            product_type=f"OPERA_L4_CAL-DISP-{self.sensor}",
            sensor=self.sensor,
            frame_id=self.frame_id,
            polarization=self.polarization,
            primary_date=self.primary_date.isoformat(),
            secondary_date=self.secondary_date.isoformat(),
            layer=layer,
            group=group if group else "main",
        )

    return output_path
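
Since extra keyword arguments are merged into the rasterio profile, standard creation options can be passed through **kwargs. A hedged sketch (the nodata value is illustrative, not mandated by the product spec):

import numpy as np

cal.to_geotiff(
    "calibration",
    "calibration.tif",
    compress="LZW",     # any compression rasterio/GDAL accepts
    nodata=np.nan,      # forwarded into the rasterio profile via **kwargs
)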

DispProduct dataclass

OPERA DISP-S1 displacement product.

Represents a Level-3 interferometric displacement product from the OPERA DISP-S1 archive. Products contain displacement measurements between two Sentinel-1 acquisition dates.

Parameters:

    path : Path
        Path to the NetCDF product file. Required.
    frame_id : int
        OPERA frame identifier (e.g., 8882). Required.
    primary_date : datetime
        Earlier acquisition date (reference). Required.
    secondary_date : datetime
        Later acquisition date. Required.
    polarization : str
        Radar polarization (e.g., "VV", "VH"). Required.
    version : str
        Product version string (e.g., "1.0"). Required.
    production_date : datetime
        Date when product was generated. Required.
    mode : str, optional
        Acquisition mode (e.g., "IW"). Default is "IW".

Examples:

>>> path = Path(
"OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc")
>>> product = DispProduct.from_path(path)
>>> product.frame_id
8882

Get reference point

>>> row, col = product.get_reference_point_index()
>>> lat, lon = product.get_reference_point_latlon()
>>> print(f"Reference at ({row}, {col}): {lat:.4f}°N, {lon:.4f}°E")
Source code in src/cal_disp/product/_disp.py
@dataclass
class DispProduct:
    """OPERA DISP-S1 displacement product.

    Represents a Level-3 interferometric displacement product from
    the OPERA DISP-S1 archive. Products contain displacement measurements
    between two Sentinel-1 acquisition dates.

    Parameters
    ----------
    path : Path
        Path to the NetCDF product file.
    frame_id : int
        OPERA frame identifier (e.g., 8882).
    primary_date : datetime
        Earlier acquisition date (reference).
    secondary_date : datetime
        Later acquisition date.
    polarization : str
        Radar polarization (e.g., "VV", "VH").
    version : str
        Product version string (e.g., "1.0").
    production_date : datetime
        Date when product was generated.
    mode : str, optional
        Acquisition mode (e.g., "IW"). Default is "IW".

    Examples
    --------
    >>> path = Path(
    "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc")
    >>> product = DispProduct.from_path(path)
    >>> product.frame_id
    8882

    # Get reference point
    >>> row, col = product.get_reference_point_index()
    >>> lat, lon = product.get_reference_point_latlon()
    >>> print(f"Reference at ({row}, {col}): {lat:.4f}°N, {lon:.4f}°E")

    """

    path: Path
    frame_id: int
    primary_date: datetime
    secondary_date: datetime
    polarization: str
    version: str
    production_date: datetime
    mode: str = "IW"

    # Filename pattern for OPERA DISP-S1 products
    _PATTERN = re.compile(
        r"OPERA_L3_DISP-S1_"
        r"(?P<mode>\w+)_"
        r"F(?P<frame_id>\d+)_"
        r"(?P<pol>\w+)_"
        r"(?P<primary>\d{8}T\d{6}Z)_"
        r"(?P<secondary>\d{8}T\d{6}Z)_"
        r"v(?P<version>[\d.]+)_"
        r"(?P<production>\d{8}T\d{6}Z)"
        r"\.nc$"
    )

    # Main dataset layers
    DISPLACEMENT_LAYERS = [
        "displacement",
        "short_wavelength_displacement",
        "recommended_mask",
        "connected_component_labels",
        "temporal_coherence",
        "estimated_phase_quality",
        "persistent_scatterer_mask",
        "shp_counts",
        "water_mask",
        "phase_similarity",
        "timeseries_inversion_residuals",
    ]

    # Corrections group layers
    CORRECTION_LAYERS = [
        "ionospheric_delay",
        "solid_earth_tide",
        "perpendicular_baseline",
    ]

    def __post_init__(self) -> None:
        """Validate product after construction."""
        self.path = Path(self.path)

        if self.frame_id <= 0:
            raise ValueError(f"frame_id must be positive, got {self.frame_id}")

        if self.secondary_date <= self.primary_date:
            raise ValueError(
                f"Secondary date ({self.secondary_date}) must be after "
                f"primary date ({self.primary_date})"
            )

        if self.polarization not in {"VV", "VH", "HH", "HV"}:
            raise ValueError(f"Invalid polarization: {self.polarization}")

    @classmethod
    def from_path(cls, path: Path | str) -> "DispProduct":
        """Parse product metadata from filename.

        Parameters
        ----------
        path : Path or str
            Path to OPERA DISP-S1 NetCDF file.

        Returns
        -------
        DispProduct
            Parsed product instance.

        Raises
        ------
        ValueError
            If filename doesn't match expected OPERA DISP-S1 format.

        """
        path = Path(path)
        match = cls._PATTERN.match(path.name)

        if not match:
            raise ValueError(
                f"Filename does not match OPERA DISP-S1 pattern: {path.name}"
            )

        return cls(
            path=path,
            frame_id=int(match.group("frame_id")),
            primary_date=datetime.strptime(match.group("primary"), "%Y%m%dT%H%M%SZ"),
            secondary_date=datetime.strptime(
                match.group("secondary"), "%Y%m%dT%H%M%SZ"
            ),
            polarization=match.group("pol"),
            version=match.group("version"),
            production_date=datetime.strptime(
                match.group("production"), "%Y%m%dT%H%M%SZ"
            ),
            mode=match.group("mode"),
        )

    def open_dataset(
        self, group: Literal["main", "corrections"] | None = None
    ) -> xr.Dataset:
        """Open dataset.

        Parameters
        ----------
        group : {"main", "corrections"} or None, optional
            Which group to open. If None, opens main group. Default is None.

        Returns
        -------
        xr.Dataset
            Dataset containing displacement and quality layers.

        Raises
        ------
        FileNotFoundError
            If product file does not exist.

        """
        if not self.path.exists():
            raise FileNotFoundError(f"Product file not found: {self.path}")

        if group == "corrections":
            return xr.open_dataset(self.path, group="corrections", engine="h5netcdf")

        return xr.open_dataset(self.path, engine="h5netcdf")

    def open_corrections(self) -> xr.Dataset:
        """Open corrections group dataset.

        Returns
        -------
        xr.Dataset
            Corrections dataset containing ionospheric delay, solid earth tide, etc.

        Raises
        ------
        FileNotFoundError
            If product file does not exist.

        """
        return self.open_dataset(group="corrections")

    def get_epsg(self) -> int | None:
        """Get EPSG code from spatial reference.

        Returns
        -------
        int or None
            EPSG code if found, None otherwise.

        Examples
        --------
        >>> product.get_epsg()
        32615

        """
        ds = self.open_dataset()
        crs = self._get_crs(ds)

        # Parse EPSG from CRS
        raster_crs = CRS.from_wkt(crs)
        return raster_crs.to_epsg()

    def get_bounds(self) -> dict[str, float]:
        """Get bounds in native projection coordinates.

        Returns
        -------
        dict[str, float]
            Dictionary with keys: left, bottom, right, top.

        Examples
        --------
        >>> bounds = product.get_bounds()
        >>> bounds
        {'left': 71970.0, 'bottom': 3153930.0, 'right': 355890.0, 'top': 3385920.0}

        """
        ds = self.open_dataset()

        x = ds.x.values
        y = ds.y.values

        return {
            "left": float(x.min()),
            "bottom": float(y.min()),
            "right": float(x.max()),
            "top": float(y.max()),
        }

    def get_bounds_wgs84(self) -> dict[str, float]:
        """Get bounds transformed to WGS84 (EPSG:4326).

        Returns
        -------
        dict[str, float]
            Dictionary with keys: west, south, east, north in decimal degrees.

        Examples
        --------
        >>> bounds = product.get_bounds_wgs84()
        >>> bounds
        {'west': -95.567, 'south': 28.486, 'east': -93.212, 'north': 30.845}

        """
        ds = self.open_dataset()

        # Get native bounds
        x = ds.x.values
        y = ds.y.values
        left = float(x.min())
        bottom = float(y.min())
        right = float(x.max())
        top = float(y.max())

        # Get native CRS
        crs_wkt = self._get_crs(ds)
        src_crs = CRS.from_wkt(crs_wkt)

        # Transform to WGS84
        west, south, east, north = transform_bounds(
            src_crs,
            CRS.from_epsg(4326),
            left,
            bottom,
            right,
            top,
        )

        return {
            "west": west,
            "south": south,
            "east": east,
            "north": north,
        }

    def get_reference_point_index(self) -> tuple[int, int]:
        """Get reference point pixel indices.

        The reference point is where the phase was set to zero during
        processing. This is stored in the corrections group.

        Returns
        -------
        tuple[int, int]
            Row and column indices (row, col) of reference point.

        Raises
        ------
        ValueError
            If reference_point variable not found or missing attributes.

        Examples
        --------
        >>> row, col = product.get_reference_point_index()
        >>> print(f"Reference point at pixel ({row}, {col})")

        """
        ds = self.open_corrections()

        if "reference_point" not in ds:
            raise ValueError("reference_point variable not found in corrections group")

        ref_attrs = ds.reference_point.attrs

        if "rows" not in ref_attrs or "cols" not in ref_attrs:
            raise ValueError("reference_point missing 'rows' or 'cols' attributes")

        row = int(ref_attrs["rows"])
        col = int(ref_attrs["cols"])

        return (row, col)

    def get_reference_point_latlon(self) -> tuple[float, float]:
        """Get reference point geographic coordinates.

        Returns latitude and longitude of the reference point in WGS84.

        Returns
        -------
        tuple[float, float]
            Latitude and longitude in decimal degrees (lat, lon).

        Raises
        ------
        ValueError
            If reference_point variable not found or missing attributes.

        Examples
        --------
        >>> lat, lon = product.get_reference_point_latlon()
        >>> print(f"Reference point: {lat:.6f}°N, {lon:.6f}°E")

        """
        ds = self.open_corrections()

        if "reference_point" not in ds:
            raise ValueError("reference_point variable not found in corrections group")

        ref_attrs = ds.reference_point.attrs

        if "latitudes" not in ref_attrs or "longitudes" not in ref_attrs:
            raise ValueError(
                "reference_point missing 'latitudes' or 'longitudes' attributes"
            )

        lat = float(ref_attrs["latitudes"])
        lon = float(ref_attrs["longitudes"])

        return (lat, lon)

    def to_geotiff(
        self,
        layer: str,
        output_path: Path | str,
        group: Literal["main", "corrections"] = "main",
        compress: str = "DEFLATE",
        **kwargs,
    ) -> Path:
        """Export layer to optimized GeoTIFF.

        Parameters
        ----------
        layer : str
            Name of layer to export (e.g., "displacement", "ionospheric_delay").
        output_path : Path or str
            Output GeoTIFF path.
        group : {"main", "corrections"}, optional
            Which group to read from. Default is "main".
        compress : str, optional
            Compression method. Default is "DEFLATE".
        **kwargs
            Additional rasterio creation options.

        Returns
        -------
        Path
            Path to created GeoTIFF.

        Raises
        ------
        ValueError
            If layer not found in specified group.

        Examples
        --------
        >>> product.to_geotiff("displacement", "disp.tif")
        >>> product.to_geotiff("ionospheric_delay", "iono.tif", group="corrections")

        """
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Open appropriate dataset
        ds = self.open_dataset(group=group if group == "corrections" else None)

        if layer not in ds:
            available = list(ds.data_vars)
            raise ValueError(
                f"Layer '{layer}' not found in {group} group. "
                f"Available layers: {available}"
            )

        # Get data array
        da = ds[layer]

        # Extract transform from spatial_ref
        transform = self._get_transform(ds)

        # Get CRS
        crs = self._get_crs(ds)

        # Prepare data - handle (y, x) or (time, y, x) shapes
        if da.ndim == 3:
            # Take first time slice if 3D
            data = da.isel(time=0).values
        else:
            data = da.values

        # Write GeoTIFF
        profile = {
            "driver": "GTiff",
            "height": data.shape[0],
            "width": data.shape[1],
            "count": 1,
            "dtype": data.dtype,
            "crs": crs,
            "transform": transform,
            "compress": compress,
            "tiled": True,
            "blockxsize": 512,
            "blockysize": 512,
            **kwargs,
        }

        with rasterio.open(output_path, "w", **profile) as dst:
            dst.write(data, 1)
            dst.set_band_description(1, layer)

        return output_path

    def _get_transform(self, ds: xr.Dataset) -> Affine:
        """Extract affine transform from dataset."""
        gt = ds.spatial_ref.attrs.get("GeoTransform")
        if gt is None:
            raise ValueError("No GeoTransform found in spatial_ref")

        # Parse string like "71970.0 30.0 0.0 3385920.0 0.0 -30.0"
        vals = [float(x) for x in gt.split()]
        return Affine(vals[1], vals[2], vals[0], vals[4], vals[5], vals[3])

    def _get_crs(self, ds: xr.Dataset) -> str:
        """Extract CRS from dataset."""
        crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
        if crs_wkt is None:
            raise ValueError("No crs_wkt found in spatial_ref")
        return crs_wkt

    @property
    def baseline_days(self) -> int:
        """Temporal baseline in days between acquisitions."""
        return (self.secondary_date - self.primary_date).days

    @property
    def filename(self) -> str:
        """Product filename."""
        return self.path.name

    @property
    def exists(self) -> bool:
        """Check if product file exists on disk."""
        return self.path.exists()

    def __repr__(self) -> str:
        """Concise string representation."""
        return (
            f"DispProduct(frame={self.frame_id}, "
            f"{self.primary_date.date()}{self.secondary_date.date()}, "
            f"{self.polarization})"
        )
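
Since the reference point is where the phase was set to zero during processing, the displacement there should be close to zero. A minimal sketch (assuming the displacement layer uses the same y/x dimensions used elsewhere in this class):

row, col = product.get_reference_point_index()
disp = product.open_dataset()["displacement"].squeeze()  # drop a size-1 time dim if present
value_at_ref = float(disp.isel(y=row, x=col))            # expected to be ~0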

baseline_days property

baseline_days: int

Temporal baseline in days between acquisitions.

exists property

exists: bool

Check if product file exists on disk.

filename property

filename: str

Product filename.

from_path classmethod

from_path(path: Path | str) -> DispProduct

Parse product metadata from filename.

Parameters:

    path : Path or str
        Path to OPERA DISP-S1 NetCDF file. Required.

Returns:

    DispProduct
        Parsed product instance.

Raises:

    ValueError
        If filename doesn't match expected OPERA DISP-S1 format.

Source code in src/cal_disp/product/_disp.py
@classmethod
def from_path(cls, path: Path | str) -> "DispProduct":
    """Parse product metadata from filename.

    Parameters
    ----------
    path : Path or str
        Path to OPERA DISP-S1 NetCDF file.

    Returns
    -------
    DispProduct
        Parsed product instance.

    Raises
    ------
    ValueError
        If filename doesn't match expected OPERA DISP-S1 format.

    """
    path = Path(path)
    match = cls._PATTERN.match(path.name)

    if not match:
        raise ValueError(
            f"Filename does not match OPERA DISP-S1 pattern: {path.name}"
        )

    return cls(
        path=path,
        frame_id=int(match.group("frame_id")),
        primary_date=datetime.strptime(match.group("primary"), "%Y%m%dT%H%M%SZ"),
        secondary_date=datetime.strptime(
            match.group("secondary"), "%Y%m%dT%H%M%SZ"
        ),
        polarization=match.group("pol"),
        version=match.group("version"),
        production_date=datetime.strptime(
            match.group("production"), "%Y%m%dT%H%M%SZ"
        ),
        mode=match.group("mode"),
    )
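
A short usage sketch showing the metadata parsed from a filename:

product = DispProduct.from_path(
    "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0_20251027T005420Z.nc"
)
product.frame_id       # 8882
product.polarization   # 'VV'
product.baseline_days  # 192 days between 2022-01-11 and 2022-07-22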

get_bounds

get_bounds() -> dict[str, float]

Get bounds in native projection coordinates.

Returns:

    dict[str, float]
        Dictionary with keys: left, bottom, right, top.

Examples:

>>> bounds = product.get_bounds()
>>> bounds
{'left': 71970.0, 'bottom': 3153930.0, 'right': 355890.0, 'top': 3385920.0}
Source code in src/cal_disp/product/_disp.py
def get_bounds(self) -> dict[str, float]:
    """Get bounds in native projection coordinates.

    Returns
    -------
    dict[str, float]
        Dictionary with keys: left, bottom, right, top.

    Examples
    --------
    >>> bounds = product.get_bounds()
    >>> bounds
    {'left': 71970.0, 'bottom': 3153930.0, 'right': 355890.0, 'top': 3385920.0}

    """
    ds = self.open_dataset()

    x = ds.x.values
    y = ds.y.values

    return {
        "left": float(x.min()),
        "bottom": float(y.min()),
        "right": float(x.max()),
        "top": float(y.max()),
    }

get_bounds_wgs84

get_bounds_wgs84() -> dict[str, float]

Get bounds transformed to WGS84 (EPSG:4326).

Returns:

    dict[str, float]
        Dictionary with keys: west, south, east, north in decimal degrees.

Examples:

>>> bounds = product.get_bounds_wgs84()
>>> bounds
{'west': -95.567, 'south': 28.486, 'east': -93.212, 'north': 30.845}
Source code in src/cal_disp/product/_disp.py
def get_bounds_wgs84(self) -> dict[str, float]:
    """Get bounds transformed to WGS84 (EPSG:4326).

    Returns
    -------
    dict[str, float]
        Dictionary with keys: west, south, east, north in decimal degrees.

    Examples
    --------
    >>> bounds = product.get_bounds_wgs84()
    >>> bounds
    {'west': -95.567, 'south': 28.486, 'east': -93.212, 'north': 30.845}

    """
    ds = self.open_dataset()

    # Get native bounds
    x = ds.x.values
    y = ds.y.values
    left = float(x.min())
    bottom = float(y.min())
    right = float(x.max())
    top = float(y.max())

    # Get native CRS
    crs_wkt = self._get_crs(ds)
    src_crs = CRS.from_wkt(crs_wkt)

    # Transform to WGS84
    west, south, east, north = transform_bounds(
        src_crs,
        CRS.from_epsg(4326),
        left,
        bottom,
        right,
        top,
    )

    return {
        "west": west,
        "south": south,
        "east": east,
        "north": north,
    }

get_epsg

get_epsg() -> int | None

Get EPSG code from spatial reference.

Returns:

    int or None
        EPSG code if found, None otherwise.

Examples:

>>> product.get_epsg()
32615
Source code in src/cal_disp/product/_disp.py
def get_epsg(self) -> int | None:
    """Get EPSG code from spatial reference.

    Returns
    -------
    int or None
        EPSG code if found, None otherwise.

    Examples
    --------
    >>> product.get_epsg()
    32615

    """
    ds = self.open_dataset()
    crs = self._get_crs(ds)

    # Parse EPSG from CRS
    raster_crs = CRS.from_wkt(crs)
    return raster_crs.to_epsg()

get_reference_point_index

get_reference_point_index() -> tuple[int, int]

Get reference point pixel indices.

The reference point is where the phase was set to zero during processing. This is stored in the corrections group.

Returns:

    tuple[int, int]
        Row and column indices (row, col) of reference point.

Raises:

    ValueError
        If reference_point variable not found or missing attributes.

Examples:

>>> row, col = product.get_reference_point_index()
>>> print(f"Reference point at pixel ({row}, {col})")
Source code in src/cal_disp/product/_disp.py
def get_reference_point_index(self) -> tuple[int, int]:
    """Get reference point pixel indices.

    The reference point is where the phase was set to zero during
    processing. This is stored in the corrections group.

    Returns
    -------
    tuple[int, int]
        Row and column indices (row, col) of reference point.

    Raises
    ------
    ValueError
        If reference_point variable not found or missing attributes.

    Examples
    --------
    >>> row, col = product.get_reference_point_index()
    >>> print(f"Reference point at pixel ({row}, {col})")

    """
    ds = self.open_corrections()

    if "reference_point" not in ds:
        raise ValueError("reference_point variable not found in corrections group")

    ref_attrs = ds.reference_point.attrs

    if "rows" not in ref_attrs or "cols" not in ref_attrs:
        raise ValueError("reference_point missing 'rows' or 'cols' attributes")

    row = int(ref_attrs["rows"])
    col = int(ref_attrs["cols"])

    return (row, col)

get_reference_point_latlon

get_reference_point_latlon() -> tuple[float, float]

Get reference point geographic coordinates.

Returns latitude and longitude of the reference point in WGS84.

Returns:

    tuple[float, float]
        Latitude and longitude in decimal degrees (lat, lon).

Raises:

    ValueError
        If reference_point variable not found or missing attributes.

Examples:

>>> lat, lon = product.get_reference_point_latlon()
>>> print(f"Reference point: {lat:.6f}°N, {lon:.6f}°E")
Source code in src/cal_disp/product/_disp.py
def get_reference_point_latlon(self) -> tuple[float, float]:
    """Get reference point geographic coordinates.

    Returns latitude and longitude of the reference point in WGS84.

    Returns
    -------
    tuple[float, float]
        Latitude and longitude in decimal degrees (lat, lon).

    Raises
    ------
    ValueError
        If reference_point variable not found or missing attributes.

    Examples
    --------
    >>> lat, lon = product.get_reference_point_latlon()
    >>> print(f"Reference point: {lat:.6f}°N, {lon:.6f}°E")

    """
    ds = self.open_corrections()

    if "reference_point" not in ds:
        raise ValueError("reference_point variable not found in corrections group")

    ref_attrs = ds.reference_point.attrs

    if "latitudes" not in ref_attrs or "longitudes" not in ref_attrs:
        raise ValueError(
            "reference_point missing 'latitudes' or 'longitudes' attributes"
        )

    lat = float(ref_attrs["latitudes"])
    lon = float(ref_attrs["longitudes"])

    return (lat, lon)

open_corrections

open_corrections() -> xr.Dataset

Open corrections group dataset.

Returns:

    Dataset
        Corrections dataset containing ionospheric delay, solid earth tide, etc.

Raises:

    FileNotFoundError
        If product file does not exist.

Source code in src/cal_disp/product/_disp.py
def open_corrections(self) -> xr.Dataset:
    """Open corrections group dataset.

    Returns
    -------
    xr.Dataset
        Corrections dataset containing ionospheric delay, solid earth tide, etc.

    Raises
    ------
    FileNotFoundError
        If product file does not exist.

    """
    return self.open_dataset(group="corrections")
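
A minimal sketch of reading the documented correction layers:

ds_corr = product.open_corrections()
iono = ds_corr["ionospheric_delay"]
tide = ds_corr["solid_earth_tide"]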

open_dataset

open_dataset(group: Literal['main', 'corrections'] | None = None) -> xr.Dataset

Open dataset.

Parameters:

    group : {"main", "corrections"} or None, optional
        Which group to open. If None, opens main group. Default is None.

Returns:

    Dataset
        Dataset containing displacement and quality layers.

Raises:

    FileNotFoundError
        If product file does not exist.

Source code in src/cal_disp/product/_disp.py
def open_dataset(
    self, group: Literal["main", "corrections"] | None = None
) -> xr.Dataset:
    """Open dataset.

    Parameters
    ----------
    group : {"main", "corrections"} or None, optional
        Which group to open. If None, opens main group. Default is None.

    Returns
    -------
    xr.Dataset
        Dataset containing displacement and quality layers.

    Raises
    ------
    FileNotFoundError
        If product file does not exist.

    """
    if not self.path.exists():
        raise FileNotFoundError(f"Product file not found: {self.path}")

    if group == "corrections":
        return xr.open_dataset(self.path, group="corrections", engine="h5netcdf")

    return xr.open_dataset(self.path, engine="h5netcdf")

to_geotiff

to_geotiff(layer: str, output_path: Path | str, group: Literal['main', 'corrections'] = 'main', compress: str = 'DEFLATE', **kwargs) -> Path

Export layer to optimized GeoTIFF.

Parameters:

    layer : str
        Name of layer to export (e.g., "displacement", "ionospheric_delay"). Required.
    output_path : Path or str
        Output GeoTIFF path. Required.
    group : {"main", "corrections"}, optional
        Which group to read from. Default is "main".
    compress : str, optional
        Compression method. Default is "DEFLATE".
    **kwargs
        Additional rasterio creation options.

Returns:

    Path
        Path to created GeoTIFF.

Raises:

    ValueError
        If layer not found in specified group.

Examples:

>>> product.to_geotiff("displacement", "disp.tif")
>>> product.to_geotiff("ionospheric_delay", "iono.tif", group="corrections")
Source code in src/cal_disp/product/_disp.py
def to_geotiff(
    self,
    layer: str,
    output_path: Path | str,
    group: Literal["main", "corrections"] = "main",
    compress: str = "DEFLATE",
    **kwargs,
) -> Path:
    """Export layer to optimized GeoTIFF.

    Parameters
    ----------
    layer : str
        Name of layer to export (e.g., "displacement", "ionospheric_delay").
    output_path : Path or str
        Output GeoTIFF path.
    group : {"main", "corrections"}, optional
        Which group to read from. Default is "main".
    compress : str, optional
        Compression method. Default is "DEFLATE".
    **kwargs
        Additional rasterio creation options.

    Returns
    -------
    Path
        Path to created GeoTIFF.

    Raises
    ------
    ValueError
        If layer not found in specified group.

    Examples
    --------
    >>> product.to_geotiff("displacement", "disp.tif")
    >>> product.to_geotiff("ionospheric_delay", "iono.tif", group="corrections")

    """
    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Open appropriate dataset
    ds = self.open_dataset(group=group if group == "corrections" else None)

    if layer not in ds:
        available = list(ds.data_vars)
        raise ValueError(
            f"Layer '{layer}' not found in {group} group. "
            f"Available layers: {available}"
        )

    # Get data array
    da = ds[layer]

    # Extract transform from spatial_ref
    transform = self._get_transform(ds)

    # Get CRS
    crs = self._get_crs(ds)

    # Prepare data - handle (y, x) or (time, y, x) shapes
    if da.ndim == 3:
        # Take first time slice if 3D
        data = da.isel(time=0).values
    else:
        data = da.values

    # Write GeoTIFF
    profile = {
        "driver": "GTiff",
        "height": data.shape[0],
        "width": data.shape[1],
        "count": 1,
        "dtype": data.dtype,
        "crs": crs,
        "transform": transform,
        "compress": compress,
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        **kwargs,
    }

    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(data, 1)
        dst.set_band_description(1, layer)

    return output_path

StaticLayer dataclass

OPERA DISP-S1-STATIC layer.

Represents a single static layer (DEM, incidence angle, LOS vectors, etc.) used as input for DISP-S1 processing. These are frame-specific GeoTIFF files that don't change over time.

Examples:

>>> path = Path("OPERA_L3_DISP-S1-STATIC_F08882_20140403_S1A_v1.0_dem.tif")
>>> layer = StaticLayer.from_path(path)
>>> layer.frame_id
8882

Read LOS components

>>> los_layer = StaticLayer.from_path("..._line_of_sight_enu.tif")
>>> bands = los_layer.read_bands()
>>> east, north, up = bands[0], bands[1], bands[2]
Source code in src/cal_disp/product/_static.py
@dataclass
class StaticLayer:
    """OPERA DISP-S1-STATIC layer.

    Represents a single static layer (DEM, incidence angle, LOS vectors, etc.)
    used as input for DISP-S1 processing. These are frame-specific GeoTIFF
    files that don't change over time.

    Examples
    --------
    >>> path = Path("OPERA_L3_DISP-S1-STATIC_F08882_20140403_S1A_v1.0_dem.tif")
    >>> layer = StaticLayer.from_path(path)
    >>> layer.frame_id
    8882

    # Read LOS components
    >>> los_layer = StaticLayer.from_path("..._line_of_sight_enu.tif")
    >>> bands = los_layer.read_bands()
    >>> east, north, up = bands[0], bands[1], bands[2]

    """

    path: Path
    frame_id: int
    reference_date: datetime
    satellite: str
    version: str
    layer_type: str

    _PATTERN = re.compile(
        r"OPERA_L3_DISP-S1-STATIC_"
        r"F(?P<frame_id>\d+)_"
        r"(?P<date>\d{8})_"
        r"(?P<satellite>S1[AB])_"
        r"v(?P<version>[\d.]+)_"
        r"(?P<layer>[\w_]+)"
        r"\.tif$"
    )

    LAYER_TYPES = [
        "dem",
        "line_of_sight_enu",
        "layover_shadow_mask",
    ]

    def __post_init__(self) -> None:
        """Validate layer after construction."""
        self.path = Path(self.path)

        if self.frame_id <= 0:
            raise ValueError(f"frame_id must be positive, got {self.frame_id}")

    @classmethod
    def from_path(cls, path: Path | str) -> "StaticLayer":
        """Parse layer metadata from filename."""
        path = Path(path)
        match = cls._PATTERN.match(path.name)

        if not match:
            raise ValueError(
                f"Filename does not match OPERA DISP-S1-STATIC pattern: {path.name}"
            )

        return cls(
            path=path,
            frame_id=int(match.group("frame_id")),
            reference_date=datetime.strptime(match.group("date"), "%Y%m%d"),
            satellite=match.group("satellite"),
            version=match.group("version"),
            layer_type=match.group("layer"),
        )

    @property
    def num_bands(self) -> int:
        """Get number of bands in layer."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            return src.count

    def read(self, band: int = 1, masked: bool = True) -> np.ndarray:
        """Read single band data."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            if band < 1 or band > src.count:
                raise ValueError(
                    f"Band {band} out of range. File has {src.count} bands."
                )

            data = src.read(band)

            if masked and src.nodata is not None:
                data = np.ma.masked_equal(data, src.nodata)

        return data

    def read_bands(self, masked: bool = True) -> list[np.ndarray]:
        """Read all bands.

        Parameters
        ----------
        masked : bool, optional
            If True, return masked arrays with nodata values masked.
            Default is True.

        Returns
        -------
        list[np.ndarray]
            List of arrays, one per band.

        Examples
        --------
        >>> # Read DEM (single band)
        >>> dem_layer = StaticLayer.from_path("..._dem.tif")
        >>> bands = dem_layer.read_bands()
        >>> dem = bands[0]

        >>> # Read LOS vectors (three bands)
        >>> los_layer = StaticLayer.from_path("..._line_of_sight_enu.tif")
        >>> bands = los_layer.read_bands()
        >>> east, north, up = bands[0], bands[1], bands[2]

        """
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            bands = []
            for band_idx in range(1, src.count + 1):
                data = src.read(band_idx)
                if masked and src.nodata is not None:
                    data = np.ma.masked_equal(data, src.nodata)
                bands.append(data)

        return bands

    def to_dataset(self) -> xr.Dataset:
        """Convert raster to xarray Dataset."""
        # Get shape and transform using existing methods
        height, width = self.get_shape()
        transform = self.get_transform()

        # Generate x and y coordinates from transform
        x_coords = np.arange(width) * transform[0] + transform[2] + transform[0] / 2
        y_coords = np.arange(height) * transform[4] + transform[5] + transform[4] / 2

        # Create coordinates dict
        coords = {
            "y": (["y"], y_coords),
            "x": (["x"], x_coords),
        }

        # Create data variables
        data_vars = {}
        nodata = self.get_nodata()

        if self.num_bands == 1:
            # Single band - use layer_type as variable name
            data = self.read(band=1, masked=False)
            if nodata is not None:
                data = np.where(data == nodata, np.nan, data)
            data_vars[self.layer_type] = (["y", "x"], data)

        elif self.layer_type == "line_of_sight_enu" and self.num_bands == 3:
            # Special case: LOS vectors - create three variables
            band_names = ["los_east", "los_north", "los_up"]
            bands = self.read_bands(masked=False)

            for name, data in zip(band_names, bands):
                if nodata is not None:
                    data = np.where(data == nodata, np.nan, data)
                data_vars[name] = (["y", "x"], data)

        else:
            # Generic multi-band: use band dimension
            bands = self.read_bands(masked=False)
            all_data = np.stack(bands)

            if nodata is not None:
                all_data = np.where(all_data == nodata, np.nan, all_data)

            coords["band"] = (["band"], np.arange(1, self.num_bands + 1))
            data_vars[self.layer_type] = (["band", "y", "x"], all_data)

        # Create dataset
        ds = xr.Dataset(data_vars=data_vars, coords=coords)

        # Add attributes
        ds.attrs["frame_id"] = self.frame_id
        ds.attrs["satellite"] = self.satellite
        ds.attrs["version"] = self.version
        ds.attrs["reference_date"] = self.reference_date.isoformat()
        ds.attrs["layer_type"] = self.layer_type

        # Add CRS information using existing method
        crs = self.get_crs()
        if crs:
            ds.attrs["crs_wkt"] = crs.to_wkt()
            epsg = self.get_epsg()
            if epsg:
                ds.attrs["epsg"] = epsg

        # Add transform
        ds.attrs["transform"] = list(transform)
        ds.attrs["nodata"] = nodata

        # Add coordinate attributes
        ds["x"].attrs["standard_name"] = "projection_x_coordinate"
        ds["x"].attrs["long_name"] = "x coordinate of projection"
        ds["x"].attrs["units"] = "m"

        ds["y"].attrs["standard_name"] = "projection_y_coordinate"
        ds["y"].attrs["long_name"] = "y coordinate of projection"
        ds["y"].attrs["units"] = "m"

        # Add variable-specific attributes
        if self.layer_type == "dem":
            ds["dem"].attrs["units"] = "m"
            ds["dem"].attrs["long_name"] = "Digital Elevation Model"
        elif self.layer_type == "line_of_sight_enu":
            ds["los_east"].attrs["long_name"] = "LOS unit vector - East component"
            ds["los_north"].attrs["long_name"] = "LOS unit vector - North component"
            ds["los_up"].attrs["long_name"] = "LOS unit vector - Up component"
        elif self.layer_type == "layover_shadow_mask":
            ds["layover_shadow_mask"].attrs["long_name"] = "Layover and Shadow Mask"
            ds["layover_shadow_mask"].attrs[
                "description"
            ] = "0=valid, 1=layover, 2=shadow"

        return ds

    def compute_incidence_angle(
        self,
        fill_value: float = 0.0,
        dtype: np.dtype = np.float32,
    ) -> np.ndarray:
        """Compute incidence angle from LOS up component.

        Only valid for line_of_sight_enu layers.

        Parameters
        ----------
        fill_value : float, optional
            Value to use for masked/invalid pixels. Default is 0.0.
        dtype : np.dtype, optional
            Output data type. Default is np.float32.

        Returns
        -------
        np.ndarray
            Incidence angle in degrees (0-90°).

        Raises
        ------
        ValueError
            If not a line_of_sight_enu layer or wrong number of bands.

        """
        if self.layer_type != "line_of_sight_enu":
            raise ValueError(
                "compute_incidence_angle() only valid for line_of_sight_enu layers, "
                f"got {self.layer_type}"
            )

        if self.num_bands != 3:
            raise ValueError(f"Expected 3 bands for LOS ENU, got {self.num_bands}")

        # Get LOS up component (band 3)
        bands = self.read_bands()
        los_up = bands[2]

        # Handle masked arrays
        if isinstance(los_up, np.ma.MaskedArray):
            los_up_data = los_up.data
            mask = los_up.mask
        else:
            los_up_data = los_up
            mask = None

        # Clip to valid range [-1, 1] to avoid arccos domain errors
        los_up_clipped = np.clip(los_up_data, -1.0, 1.0)

        # Compute incidence angle in degrees
        incidence_angle = np.rad2deg(np.arccos(los_up_clipped))

        # Apply mask if present
        if mask is not None:
            incidence_angle = np.where(mask, fill_value, incidence_angle)

        return incidence_angle.astype(dtype)

    def export_incidence_angle(
        self,
        output_path: Path | str,
        fill_value: float = 0.0,
        nodata: float | None = 0.0,
        compress: str = "DEFLATE",
        **kwargs,
    ) -> Path:
        """Compute and export incidence angle to GeoTIFF."""
        if self.layer_type != "line_of_sight_enu":
            raise ValueError(
                "export_incidence_angle() only valid for line_of_sight_enu layers, "
                f"got {self.layer_type}"
            )

        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Compute incidence angle
        incidence_angle = self.compute_incidence_angle(fill_value=fill_value)

        # Write GeoTIFF
        profile = {
            "driver": "GTiff",
            "height": incidence_angle.shape[0],
            "width": incidence_angle.shape[1],
            "count": 1,
            "dtype": incidence_angle.dtype,
            "crs": self.get_crs(),
            "transform": self.get_transform(),
            "nodata": nodata,
            "compress": compress,
            "tiled": True,
            "blockxsize": 512,
            "blockysize": 512,
            **kwargs,
        }

        with rasterio.open(output_path, "w", **profile) as dst:
            dst.write(incidence_angle, 1)
            dst.set_band_description(1, "Incidence angle (degrees)")
            dst.update_tags(
                1,
                units="degrees",
                description="Incidence angle computed from LOS up component",
            )

        return output_path

    def get_profile(self) -> dict:
        """Get rasterio profile."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            return src.profile

    def get_transform(self) -> Affine:
        """Get affine transform."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            return src.transform

    def get_crs(self) -> CRS:
        """Get coordinate reference system."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            return src.crs

    def get_epsg(self) -> int | None:
        """Get EPSG code."""
        crs = self.get_crs()
        return crs.to_epsg() if crs else None

    def get_bounds(self) -> dict[str, float]:
        """Get bounds in native projection."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            bounds = src.bounds
            return {
                "left": bounds.left,
                "bottom": bounds.bottom,
                "right": bounds.right,
                "top": bounds.top,
            }

    def get_bounds_wgs84(self) -> dict[str, float]:
        """Get bounds transformed to WGS84."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            bounds = src.bounds
            west, south, east, north = transform_bounds(
                src.crs,
                CRS.from_epsg(4326),
                bounds.left,
                bounds.bottom,
                bounds.right,
                bounds.top,
            )

        return {
            "west": west,
            "south": south,
            "east": east,
            "north": north,
        }

    def get_shape(self) -> tuple[int, int]:
        """Get array shape."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            return (src.height, src.width)

    def get_nodata(self) -> float | None:
        """Get nodata value."""
        if not self.path.exists():
            raise FileNotFoundError(f"Layer file not found: {self.path}")

        with rasterio.open(self.path) as src:
            return src.nodata

    @property
    def filename(self) -> str:
        """Layer filename."""
        return self.path.name

    @property
    def exists(self) -> bool:
        """Check if layer file exists."""
        return self.path.exists()

    def __repr__(self) -> str:
        """Concise string representation."""
        band_info = f", bands={self.num_bands}" if self.exists else ""
        return f"StaticLayer(frame={self.frame_id}, layer={self.layer_type}{band_info})"

exists property

exists: bool

Check if layer file exists.

filename property

filename: str

Layer filename.

num_bands property

num_bands: int

Get number of bands in layer.

compute_incidence_angle

compute_incidence_angle(fill_value: float = 0.0, dtype: dtype = np.float32) -> np.ndarray

Compute incidence angle from LOS up component.

Only valid for line_of_sight_enu layers.

Parameters:

fill_value : float, optional
    Value to use for masked/invalid pixels. Default is 0.0.
dtype : np.dtype, optional
    Output data type. Default is np.float32.

Returns:

np.ndarray
    Incidence angle in degrees (0-90°).

Raises:

ValueError
    If not a line_of_sight_enu layer or wrong number of bands.

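A minimal usage sketch (the LOS path is elided here, as in the class examples above; any line_of_sight_enu GeoTIFF on disk works the same way):

>>> los = StaticLayer.from_path("..._line_of_sight_enu.tif")
>>> inc = los.compute_incidence_angle(fill_value=np.nan)
>>> inc.dtype
dtype('float32')

Passing fill_value=np.nan keeps masked pixels distinguishable from valid angles.
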
Source code in src/cal_disp/product/_static.py
def compute_incidence_angle(
    self,
    fill_value: float = 0.0,
    dtype: np.dtype = np.float32,
) -> np.ndarray:
    """Compute incidence angle from LOS up component.

    Only valid for line_of_sight_enu layers.

    Parameters
    ----------
    fill_value : float, optional
        Value to use for masked/invalid pixels. Default is 0.0.
    dtype : np.dtype, optional
        Output data type. Default is np.float32.

    Returns
    -------
    np.ndarray
        Incidence angle in degrees (0-90°).

    Raises
    ------
    ValueError
        If not a line_of_sight_enu layer or wrong number of bands.

    """
    if self.layer_type != "line_of_sight_enu":
        raise ValueError(
            "compute_incidence_angle() only valid for line_of_sight_enu layers, "
            f"got {self.layer_type}"
        )

    if self.num_bands != 3:
        raise ValueError(f"Expected 3 bands for LOS ENU, got {self.num_bands}")

    # Get LOS up component (band 3)
    bands = self.read_bands()
    los_up = bands[2]

    # Handle masked arrays
    if isinstance(los_up, np.ma.MaskedArray):
        los_up_data = los_up.data
        mask = los_up.mask
    else:
        los_up_data = los_up
        mask = None

    # Clip to valid range [-1, 1] to avoid arccos domain errors
    los_up_clipped = np.clip(los_up_data, -1.0, 1.0)

    # Compute incidence angle in degrees
    incidence_angle = np.rad2deg(np.arccos(los_up_clipped))

    # Apply mask if present
    if mask is not None:
        incidence_angle = np.where(mask, fill_value, incidence_angle)

    return incidence_angle.astype(dtype)

export_incidence_angle

export_incidence_angle(output_path: Path | str, fill_value: float = 0.0, nodata: float | None = 0.0, compress: str = 'DEFLATE', **kwargs) -> Path

Compute and export incidence angle to GeoTIFF.

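A minimal sketch (hypothetical output path) writing a tiled, DEFLATE-compressed GeoTIFF that inherits the CRS and transform of the LOS layer:

>>> los = StaticLayer.from_path("..._line_of_sight_enu.tif")
>>> out = los.export_incidence_angle("./cal/incidence_angle.tif", nodata=0.0)

The parent directory is created if needed, and the returned Path points at the written file.
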
Source code in src/cal_disp/product/_static.py
def export_incidence_angle(
    self,
    output_path: Path | str,
    fill_value: float = 0.0,
    nodata: float | None = 0.0,
    compress: str = "DEFLATE",
    **kwargs,
) -> Path:
    """Compute and export incidence angle to GeoTIFF."""
    if self.layer_type != "line_of_sight_enu":
        raise ValueError(
            "export_incidence_angle() only valid for line_of_sight_enu layers, "
            f"got {self.layer_type}"
        )

    output_path = Path(output_path)
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Compute incidence angle
    incidence_angle = self.compute_incidence_angle(fill_value=fill_value)

    # Write GeoTIFF
    profile = {
        "driver": "GTiff",
        "height": incidence_angle.shape[0],
        "width": incidence_angle.shape[1],
        "count": 1,
        "dtype": incidence_angle.dtype,
        "crs": self.get_crs(),
        "transform": self.get_transform(),
        "nodata": nodata,
        "compress": compress,
        "tiled": True,
        "blockxsize": 512,
        "blockysize": 512,
        **kwargs,
    }

    with rasterio.open(output_path, "w", **profile) as dst:
        dst.write(incidence_angle, 1)
        dst.set_band_description(1, "Incidence angle (degrees)")
        dst.update_tags(
            1,
            units="degrees",
            description="Incidence angle computed from LOS up component",
        )

    return output_path

from_path classmethod

from_path(path: Path | str) -> StaticLayer

Parse layer metadata from filename.

Source code in src/cal_disp/product/_static.py
@classmethod
def from_path(cls, path: Path | str) -> "StaticLayer":
    """Parse layer metadata from filename."""
    path = Path(path)
    match = cls._PATTERN.match(path.name)

    if not match:
        raise ValueError(
            f"Filename does not match OPERA DISP-S1-STATIC pattern: {path.name}"
        )

    return cls(
        path=path,
        frame_id=int(match.group("frame_id")),
        reference_date=datetime.strptime(match.group("date"), "%Y%m%d"),
        satellite=match.group("satellite"),
        version=match.group("version"),
        layer_type=match.group("layer"),
    )

get_bounds

get_bounds() -> dict[str, float]

Get bounds in native projection.

Source code in src/cal_disp/product/_static.py
def get_bounds(self) -> dict[str, float]:
    """Get bounds in native projection."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        bounds = src.bounds
        return {
            "left": bounds.left,
            "bottom": bounds.bottom,
            "right": bounds.right,
            "top": bounds.top,
        }

get_bounds_wgs84

get_bounds_wgs84() -> dict[str, float]

Get bounds transformed to WGS84.

Source code in src/cal_disp/product/_static.py
def get_bounds_wgs84(self) -> dict[str, float]:
    """Get bounds transformed to WGS84."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        bounds = src.bounds
        west, south, east, north = transform_bounds(
            src.crs,
            CRS.from_epsg(4326),
            bounds.left,
            bounds.bottom,
            bounds.right,
            bounds.top,
        )

    return {
        "west": west,
        "south": south,
        "east": east,
        "north": north,
    }

get_crs

get_crs() -> CRS

Get coordinate reference system.

Source code in src/cal_disp/product/_static.py
def get_crs(self) -> CRS:
    """Get coordinate reference system."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        return src.crs

get_epsg

get_epsg() -> int | None

Get EPSG code.

Source code in src/cal_disp/product/_static.py
def get_epsg(self) -> int | None:
    """Get EPSG code."""
    crs = self.get_crs()
    return crs.to_epsg() if crs else None

get_nodata

get_nodata() -> float | None

Get nodata value.

Source code in src/cal_disp/product/_static.py
def get_nodata(self) -> float | None:
    """Get nodata value."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        return src.nodata

get_profile

get_profile() -> dict

Get rasterio profile.

Source code in src/cal_disp/product/_static.py
def get_profile(self) -> dict:
    """Get rasterio profile."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        return src.profile

get_shape

get_shape() -> tuple[int, int]

Get array shape.

Source code in src/cal_disp/product/_static.py
def get_shape(self) -> tuple[int, int]:
    """Get array shape."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        return (src.height, src.width)

get_transform

get_transform() -> Affine

Get affine transform.

Source code in src/cal_disp/product/_static.py
def get_transform(self) -> Affine:
    """Get affine transform."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        return src.transform

read

read(band: int = 1, masked: bool = True) -> np.ndarray

Read single band data.

Source code in src/cal_disp/product/_static.py
def read(self, band: int = 1, masked: bool = True) -> np.ndarray:
    """Read single band data."""
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        if band < 1 or band > src.count:
            raise ValueError(
                f"Band {band} out of range. File has {src.count} bands."
            )

        data = src.read(band)

        if masked and src.nodata is not None:
            data = np.ma.masked_equal(data, src.nodata)

    return data

read_bands

read_bands(masked: bool = True) -> list[np.ndarray]

Read all bands.

Parameters:

masked : bool, optional
    If True, return masked arrays with nodata values masked. Default is True.

Returns:

list[np.ndarray]
    List of arrays, one per band.

Examples:

>>> # Read DEM (single band)
>>> dem_layer = StaticLayer.from_path("..._dem.tif")
>>> bands = dem_layer.read_bands()
>>> dem = bands[0]
>>> # Read LOS vectors (three bands)
>>> los_layer = StaticLayer.from_path("..._line_of_sight_enu.tif")
>>> bands = los_layer.read_bands()
>>> east, north, up = bands[0], bands[1], bands[2]
Source code in src/cal_disp/product/_static.py
def read_bands(self, masked: bool = True) -> list[np.ndarray]:
    """Read all bands.

    Parameters
    ----------
    masked : bool, optional
        If True, return masked arrays with nodata values masked.
        Default is True.

    Returns
    -------
    list[np.ndarray]
        List of arrays, one per band.

    Examples
    --------
    >>> # Read DEM (single band)
    >>> dem_layer = StaticLayer.from_path("..._dem.tif")
    >>> bands = dem_layer.read_bands()
    >>> dem = bands[0]

    >>> # Read LOS vectors (three bands)
    >>> los_layer = StaticLayer.from_path("..._line_of_sight_enu.tif")
    >>> bands = los_layer.read_bands()
    >>> east, north, up = bands[0], bands[1], bands[2]

    """
    if not self.path.exists():
        raise FileNotFoundError(f"Layer file not found: {self.path}")

    with rasterio.open(self.path) as src:
        bands = []
        for band_idx in range(1, src.count + 1):
            data = src.read(band_idx)
            if masked and src.nodata is not None:
                data = np.ma.masked_equal(data, src.nodata)
            bands.append(data)

    return bands

to_dataset

to_dataset() -> xr.Dataset

Convert raster to xarray Dataset.

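A minimal sketch (elided path) showing that a single-band layer becomes one data variable named after layer_type, with nodata replaced by NaN:

>>> dem = StaticLayer.from_path("..._dem.tif")
>>> ds = dem.to_dataset()
>>> list(ds.data_vars)
['dem']
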
Source code in src/cal_disp/product/_static.py
def to_dataset(self) -> xr.Dataset:
    """Convert raster to xarray Dataset."""
    # Get shape and transform using existing methods
    height, width = self.get_shape()
    transform = self.get_transform()

    # Generate x and y coordinates from transform
    x_coords = np.arange(width) * transform[0] + transform[2] + transform[0] / 2
    y_coords = np.arange(height) * transform[4] + transform[5] + transform[4] / 2

    # Create coordinates dict
    coords = {
        "y": (["y"], y_coords),
        "x": (["x"], x_coords),
    }

    # Create data variables
    data_vars = {}
    nodata = self.get_nodata()

    if self.num_bands == 1:
        # Single band - use layer_type as variable name
        data = self.read(band=1, masked=False)
        if nodata is not None:
            data = np.where(data == nodata, np.nan, data)
        data_vars[self.layer_type] = (["y", "x"], data)

    elif self.layer_type == "line_of_sight_enu" and self.num_bands == 3:
        # Special case: LOS vectors - create three variables
        band_names = ["los_east", "los_north", "los_up"]
        bands = self.read_bands(masked=False)

        for name, data in zip(band_names, bands):
            if nodata is not None:
                data = np.where(data == nodata, np.nan, data)
            data_vars[name] = (["y", "x"], data)

    else:
        # Generic multi-band: use band dimension
        bands = self.read_bands(masked=False)
        all_data = np.stack(bands)

        if nodata is not None:
            all_data = np.where(all_data == nodata, np.nan, all_data)

        coords["band"] = (["band"], np.arange(1, self.num_bands + 1))
        data_vars[self.layer_type] = (["band", "y", "x"], all_data)

    # Create dataset
    ds = xr.Dataset(data_vars=data_vars, coords=coords)

    # Add attributes
    ds.attrs["frame_id"] = self.frame_id
    ds.attrs["satellite"] = self.satellite
    ds.attrs["version"] = self.version
    ds.attrs["reference_date"] = self.reference_date.isoformat()
    ds.attrs["layer_type"] = self.layer_type

    # Add CRS information using existing method
    crs = self.get_crs()
    if crs:
        ds.attrs["crs_wkt"] = crs.to_wkt()
        epsg = self.get_epsg()
        if epsg:
            ds.attrs["epsg"] = epsg

    # Add transform
    ds.attrs["transform"] = list(transform)
    ds.attrs["nodata"] = nodata

    # Add coordinate attributes
    ds["x"].attrs["standard_name"] = "projection_x_coordinate"
    ds["x"].attrs["long_name"] = "x coordinate of projection"
    ds["x"].attrs["units"] = "m"

    ds["y"].attrs["standard_name"] = "projection_y_coordinate"
    ds["y"].attrs["long_name"] = "y coordinate of projection"
    ds["y"].attrs["units"] = "m"

    # Add variable-specific attributes
    if self.layer_type == "dem":
        ds["dem"].attrs["units"] = "m"
        ds["dem"].attrs["long_name"] = "Digital Elevation Model"
    elif self.layer_type == "line_of_sight_enu":
        ds["los_east"].attrs["long_name"] = "LOS unit vector - East component"
        ds["los_north"].attrs["long_name"] = "LOS unit vector - North component"
        ds["los_up"].attrs["long_name"] = "LOS unit vector - Up component"
    elif self.layer_type == "layover_shadow_mask":
        ds["layover_shadow_mask"].attrs["long_name"] = "Layover and Shadow Mask"
        ds["layover_shadow_mask"].attrs[
            "description"
        ] = "0=valid, 1=layover, 2=shadow"

    return ds

TropoProduct dataclass

OPERA TROPO-ZENITH tropospheric delay product.

Minimal class for managing OPERA tropospheric products. Processing functions are standalone for composability.

Examples:

>>> product = TropoProduct.from_path("tropo.nc")
>>> ds = product.open_dataset()
>>> total = product.get_total_delay()
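
A minimal sketch (hypothetical filename, date, and bounds) of matching a product to a DISP acquisition time and opening a spatial subset:

>>> from datetime import datetime
>>> tropo = TropoProduct.from_path(
...     "OPERA_L4_TROPO-ZENITH_20220111T000000Z_20220115T120000Z_HRES_v1.0.nc"
... )
>>> tropo.matches_date(datetime(2022, 1, 11, 2, 30), hours=6.0)
True
>>> ds = tropo.open_dataset(bounds=(-96.0, 29.0, -94.0, 31.0), bounds_buffer=0.2)
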
Source code in src/cal_disp/product/_tropo.py
@dataclass
class TropoProduct:
    """OPERA TROPO-ZENITH tropospheric delay product.

    Minimal class for managing OPERA tropospheric products.
    Processing functions are standalone for composability.

    Examples
    --------
    >>> product = TropoProduct.from_path("tropo.nc")
    >>> ds = product.open_dataset()
    >>> total = product.get_total_delay()

    """

    path: Path
    date: datetime
    production_date: datetime
    model: str
    version: str

    _PATTERN = re.compile(
        r"OPERA_L4_TROPO-ZENITH_"
        r"(?P<date>\d{8}T\d{6}Z)_"
        r"(?P<production>\d{8}T\d{6}Z)_"
        r"(?P<model>\w+)_"
        r"v(?P<version>[\d.]+)"
        r"\.nc$"
    )

    TROPO_LAYERS = [
        "wet_delay",
        "hydrostatic_delay",
    ]

    def __post_init__(self) -> None:
        """Validate product after construction."""
        self.path = Path(self.path)

        if self.production_date < self.date:
            raise ValueError(
                f"Production date ({self.production_date}) cannot be before "
                f"model date ({self.date})"
            )

    @classmethod
    def from_path(cls, path: Path | str) -> "TropoProduct":
        """Parse product metadata from filename."""
        path = Path(path)
        match = cls._PATTERN.match(path.name)

        if not match:
            raise ValueError(
                f"Filename does not match OPERA TROPO-ZENITH pattern: {path.name}"
            )

        return cls(
            path=path,
            date=datetime.strptime(match.group("date"), "%Y%m%dT%H%M%SZ"),
            production_date=datetime.strptime(
                match.group("production"), "%Y%m%dT%H%M%SZ"
            ),
            model=match.group("model"),
            version=match.group("version"),
        )

    def matches_date(self, target_date: datetime, hours: float = 6.0) -> bool:
        """Check if product date is within time window of target date."""
        delta = abs(self.date - target_date)
        return delta <= timedelta(hours=hours)

    def open_dataset(
        self,
        bounds: tuple[float, float, float, float] | None = None,
        max_height: float | None = None,
        bounds_crs: str = "EPSG:4326",
        bounds_buffer: float = 0.0,
    ) -> xr.Dataset:
        """Open tropospheric delay dataset with optional subsetting.

        Parameters
        ----------
        bounds : tuple[float, float, float, float] or None, optional
            Spatial bounds as (west, south, east, north). Default is None.
        max_height : float or None, optional
            Maximum height in meters. Default is None.
        bounds_crs : str, optional
            CRS of bounds. Default is "EPSG:4326".
        bounds_buffer : float, optional
            Buffer to add to bounds in degrees (for lat/lon) or meters
            (for projected CRS). Default is 0.0. Useful value: 0.2 for lat/lon.

        Returns
        -------
        xr.Dataset
            Tropospheric delay dataset.

        """
        if not self.path.exists():
            raise FileNotFoundError(f"Product file not found: {self.path}")

        ds = xr.open_dataset(self.path, engine="h5netcdf")

        if bounds is not None:
            # Apply buffer if requested
            if bounds_buffer > 0:
                west, south, east, north = bounds
                bounds = (
                    west - bounds_buffer,
                    south - bounds_buffer,
                    east + bounds_buffer,
                    north + bounds_buffer,
                )
            ds = self._subset_spatial(ds, bounds, bounds_crs)

        if max_height is not None:
            ds = self._subset_height(ds, max_height)

        return ds

    def get_total_delay(
        self,
        time_idx: int = 0,
        bounds: tuple[float, float, float, float] | None = None,
        max_height: float | None = None,
        bounds_crs: str = "EPSG:4326",
        bounds_buffer: float = 0.0,
    ) -> xr.DataArray:
        """Get total zenith delay (wet + hydrostatic).

        Computes total delay as sum of wet and hydrostatic components.

        Parameters
        ----------
        time_idx : int, optional
            Time index to extract. Default is 0.
        bounds : tuple[float, float, float, float] or None, optional
            Spatial bounds for subsetting. Default is None.
        max_height : float or None, optional
            Maximum height for subsetting. Default is None.
        bounds_crs : str, optional
            CRS of bounds. Default is "EPSG:4326".
        bounds_buffer : float, optional
            Buffer to add to bounds. Default is 0.0.

        Returns
        -------
        xr.DataArray
            Total zenith delay with dimensions (height, latitude, longitude).

        """
        ds = self.open_dataset(
            bounds=bounds,
            max_height=max_height,
            bounds_crs=bounds_crs,
            bounds_buffer=bounds_buffer,
        )

        # Compute total delay from wet + hydrostatic
        if "wet_delay" not in ds:
            raise ValueError("wet_delay not found in dataset")
        if "hydrostatic_delay" not in ds:
            raise ValueError("hydrostatic_delay not found in dataset")

        da = ds["wet_delay"] + ds["hydrostatic_delay"]
        da.name = "zenith_total_delay"
        da.attrs.update(
            {
                "long_name": "Total zenith tropospheric delay",
                "units": "meters",
                "description": "Sum of wet and hydrostatic components",
            }
        )

        # Preserve spatial reference
        if "spatial_ref" in ds:
            da = da.assign_coords({"spatial_ref": ds["spatial_ref"]})

        # Handle time dimension
        if "time" in da.dims:
            n_times = len(da.time)
            if abs(time_idx) >= n_times:
                raise ValueError(
                    f"time_idx {time_idx} out of range for {n_times} timesteps"
                )
            da = da.isel(time=time_idx)

        return da

    def _subset_spatial(
        self,
        ds: xr.Dataset,
        bounds: tuple[float, float, float, float],
        bounds_crs: str,
    ) -> xr.Dataset:
        """Subset dataset to spatial bounds."""
        west, south, east, north = bounds

        ds_crs_wkt = ds.spatial_ref.attrs.get("crs_wkt")
        if ds_crs_wkt is None:
            raise ValueError("Dataset missing CRS information")

        ds_crs = CRS.from_wkt(ds_crs_wkt)

        # Transform bounds to dataset CRS if needed
        if bounds_crs != ds_crs.to_string():
            left, bottom, right, top = rio_transform_bounds(
                CRS.from_string(bounds_crs),
                ds_crs,
                west,
                south,
                east,
                north,
            )
        else:
            left, bottom, right, top = west, south, east, north

        # Use latitude/longitude for subsetting
        ds_subset = ds.sel(
            longitude=slice(left, right),
            latitude=slice(top, bottom),
        )

        return ds_subset

    def _subset_height(self, ds: xr.Dataset, max_height: float) -> xr.Dataset:
        """Subset dataset by maximum height."""
        if "height" not in ds.dims:
            return ds

        return ds.where(ds["height"] <= max_height, drop=True)

    @property
    def filename(self) -> str:
        """Product filename."""
        return self.path.name

    @property
    def exists(self) -> bool:
        """Check if product file exists."""
        return self.path.exists()

    def __repr__(self) -> str:
        """Return a string representation."""
        return f"TropoProduct(date={self.date.isoformat()}, model={self.model})"

exists property

exists: bool

Check if product file exists.

filename property

filename: str

Product filename.

from_path classmethod

from_path(path: Path | str) -> TropoProduct

Parse product metadata from filename.

Source code in src/cal_disp/product/_tropo.py
@classmethod
def from_path(cls, path: Path | str) -> "TropoProduct":
    """Parse product metadata from filename."""
    path = Path(path)
    match = cls._PATTERN.match(path.name)

    if not match:
        raise ValueError(
            f"Filename does not match OPERA TROPO-ZENITH pattern: {path.name}"
        )

    return cls(
        path=path,
        date=datetime.strptime(match.group("date"), "%Y%m%dT%H%M%SZ"),
        production_date=datetime.strptime(
            match.group("production"), "%Y%m%dT%H%M%SZ"
        ),
        model=match.group("model"),
        version=match.group("version"),
    )

get_total_delay

get_total_delay(time_idx: int = 0, bounds: tuple[float, float, float, float] | None = None, max_height: float | None = None, bounds_crs: str = 'EPSG:4326', bounds_buffer: float = 0.0) -> xr.DataArray

Get total zenith delay (wet + hydrostatic).

Computes total delay as sum of wet and hydrostatic components.

Parameters:

time_idx : int, optional
    Time index to extract. Default is 0.
bounds : tuple[float, float, float, float] or None, optional
    Spatial bounds for subsetting. Default is None.
max_height : float or None, optional
    Maximum height for subsetting. Default is None.
bounds_crs : str, optional
    CRS of bounds. Default is "EPSG:4326".
bounds_buffer : float, optional
    Buffer to add to bounds. Default is 0.0.

Returns:

xr.DataArray
    Total zenith delay with dimensions (height, latitude, longitude).

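A minimal sketch (hypothetical file and bounds): subset to a frame's WGS84 footprint, cap the height levels, and get the summed wet and hydrostatic delay:

>>> tropo = TropoProduct.from_path("tropo.nc")
>>> ztd = tropo.get_total_delay(
...     bounds=(-96.0, 29.0, -94.0, 31.0),
...     max_height=9000.0,
... )
>>> ztd.name
'zenith_total_delay'
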
Source code in src/cal_disp/product/_tropo.py
def get_total_delay(
    self,
    time_idx: int = 0,
    bounds: tuple[float, float, float, float] | None = None,
    max_height: float | None = None,
    bounds_crs: str = "EPSG:4326",
    bounds_buffer: float = 0.0,
) -> xr.DataArray:
    """Get total zenith delay (wet + hydrostatic).

    Computes total delay as sum of wet and hydrostatic components.

    Parameters
    ----------
    time_idx : int, optional
        Time index to extract. Default is 0.
    bounds : tuple[float, float, float, float] or None, optional
        Spatial bounds for subsetting. Default is None.
    max_height : float or None, optional
        Maximum height for subsetting. Default is None.
    bounds_crs : str, optional
        CRS of bounds. Default is "EPSG:4326".
    bounds_buffer : float, optional
        Buffer to add to bounds. Default is 0.0.

    Returns
    -------
    xr.DataArray
        Total zenith delay with dimensions (height, latitude, longitude).

    """
    ds = self.open_dataset(
        bounds=bounds,
        max_height=max_height,
        bounds_crs=bounds_crs,
        bounds_buffer=bounds_buffer,
    )

    # Compute total delay from wet + hydrostatic
    if "wet_delay" not in ds:
        raise ValueError("wet_delay not found in dataset")
    if "hydrostatic_delay" not in ds:
        raise ValueError("hydrostatic_delay not found in dataset")

    da = ds["wet_delay"] + ds["hydrostatic_delay"]
    da.name = "zenith_total_delay"
    da.attrs.update(
        {
            "long_name": "Total zenith tropospheric delay",
            "units": "meters",
            "description": "Sum of wet and hydrostatic components",
        }
    )

    # Preserve spatial reference
    if "spatial_ref" in ds:
        da = da.assign_coords({"spatial_ref": ds["spatial_ref"]})

    # Handle time dimension
    if "time" in da.dims:
        n_times = len(da.time)
        if abs(time_idx) >= n_times:
            raise ValueError(
                f"time_idx {time_idx} out of range for {n_times} timesteps"
            )
        da = da.isel(time=time_idx)

    return da

matches_date

matches_date(target_date: datetime, hours: float = 6.0) -> bool

Check if product date is within time window of target date.

Source code in src/cal_disp/product/_tropo.py
def matches_date(self, target_date: datetime, hours: float = 6.0) -> bool:
    """Check if product date is within time window of target date."""
    delta = abs(self.date - target_date)
    return delta <= timedelta(hours=hours)

open_dataset

open_dataset(bounds: tuple[float, float, float, float] | None = None, max_height: float | None = None, bounds_crs: str = 'EPSG:4326', bounds_buffer: float = 0.0) -> xr.Dataset

Open tropospheric delay dataset with optional subsetting.

Parameters:

bounds : tuple[float, float, float, float] or None, optional
    Spatial bounds as (west, south, east, north). Default is None.
max_height : float or None, optional
    Maximum height in meters. Default is None.
bounds_crs : str, optional
    CRS of bounds. Default is "EPSG:4326".
bounds_buffer : float, optional
    Buffer to add to bounds in degrees (for lat/lon) or meters (for projected CRS).
    Default is 0.0. Useful value: 0.2 for lat/lon.

Returns:

xr.Dataset
    Tropospheric delay dataset.

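A minimal sketch (hypothetical bounds in a projected CRS): bounds given in another CRS are transformed to the dataset's CRS before slicing, and the height dimension is trimmed separately:

>>> product = TropoProduct.from_path("tropo.nc")
>>> ds = product.open_dataset(
...     bounds=(205000.0, 3210000.0, 400000.0, 3430000.0),
...     bounds_crs="EPSG:32615",
...     max_height=12000.0,
... )
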
Source code in src/cal_disp/product/_tropo.py
def open_dataset(
    self,
    bounds: tuple[float, float, float, float] | None = None,
    max_height: float | None = None,
    bounds_crs: str = "EPSG:4326",
    bounds_buffer: float = 0.0,
) -> xr.Dataset:
    """Open tropospheric delay dataset with optional subsetting.

    Parameters
    ----------
    bounds : tuple[float, float, float, float] or None, optional
        Spatial bounds as (west, south, east, north). Default is None.
    max_height : float or None, optional
        Maximum height in meters. Default is None.
    bounds_crs : str, optional
        CRS of bounds. Default is "EPSG:4326".
    bounds_buffer : float, optional
        Buffer to add to bounds in degrees (for lat/lon) or meters
        (for projected CRS). Default is 0.0. Useful value: 0.2 for lat/lon.

    Returns
    -------
    xr.Dataset
        Tropospheric delay dataset.

    """
    if not self.path.exists():
        raise FileNotFoundError(f"Product file not found: {self.path}")

    ds = xr.open_dataset(self.path, engine="h5netcdf")

    if bounds is not None:
        # Apply buffer if requested
        if bounds_buffer > 0:
            west, south, east, north = bounds
            bounds = (
                west - bounds_buffer,
                south - bounds_buffer,
                east + bounds_buffer,
                north + bounds_buffer,
            )
        ds = self._subset_spatial(ds, bounds, bounds_crs)

    if max_height is not None:
        ds = self._subset_height(ds, max_height)

    return ds

UnrGrid dataclass

UNR GNSS grid data.

Represents gridded GNSS velocity data from University of Nevada Reno. Data is stored as parquet with point geometries and metadata.

Examples:

>>> # Load from path (frame_id parsed if in filename)
>>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
>>> grid.frame_id
8882
>>> # Load GeoDataFrame
>>> gdf = grid.load()
>>> gdf.columns
['lon', 'lat', 'east', 'north', 'up', 'geometry', ...]
>>> # Get metadata
>>> meta = grid.get_metadata()
>>> meta['source']
'UNR'
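
A minimal sketch (hypothetical files) of checking whether the GNSS grid fully covers a DISP frame, using bounds_contains documented further below:

>>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
>>> frame_bounds = {"west": -96.0, "south": 29.0, "east": -94.0, "north": 31.0}
>>> covers = bounds_contains(grid.get_bounds(), frame_bounds, buffer=0.1)
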
Source code in src/cal_disp/product/_unr.py
@dataclass
class UnrGrid:
    """UNR GNSS grid data.

    Represents gridded GNSS velocity data from University of Nevada Reno.
    Data is stored as parquet with point geometries and metadata.

    Examples
    --------
    >>> # Load from path (frame_id parsed if in filename)
    >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
    >>> grid.frame_id
    8882

    >>> # Load GeoDataFrame
    >>> gdf = grid.load()
    >>> gdf.columns
    ['lon', 'lat', 'east', 'north', 'up', 'geometry', ...]

    >>> # Get metadata
    >>> meta = grid.get_metadata()
    >>> meta['source']
    'UNR'

    """

    path: Path
    frame_id: int | None = None

    # Optional pattern to extract frame_id from filename
    _PATTERN = re.compile(r"frame[\s_-]?(\d+)", re.IGNORECASE)

    def __post_init__(self) -> None:
        """Validate grid after construction."""
        self.path = Path(self.path)

    @classmethod
    def from_path(cls, path: Path | str, frame_id: int | None = None) -> "UnrGrid":
        """Create UnrGrid from parquet file path.

        Parameters
        ----------
        path : Path or str
            Path to UNR parquet file.
        frame_id : int or None, optional
            Frame ID. If None, attempts to parse from filename.
            Default is None.

        Returns
        -------
        UnrGrid
            Grid instance.

        Examples
        --------
        >>> # Frame ID from filename
        >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
        >>> grid.frame_id
        8882

        >>> # Explicit frame ID
        >>> grid = UnrGrid.from_path("custom_unr_data.parquet", frame_id=8882)
        >>> grid.frame_id
        8882

        >>> # No frame ID
        >>> grid = UnrGrid.from_path("unr_data.parquet")
        >>> grid.frame_id is None
        True

        """
        path = Path(path)

        # Try to parse frame_id from filename if not provided
        if frame_id is None:
            match = cls._PATTERN.search(path.name)
            if match:
                frame_id = int(match.group(1))

        return cls(path=path, frame_id=frame_id)

    def load(self) -> gpd.GeoDataFrame:
        """Load UNR grid as GeoDataFrame.

        Returns
        -------
        gpd.GeoDataFrame
            GeoDataFrame with point geometries and velocity data.

        Raises
        ------
        FileNotFoundError
            If parquet file does not exist.

        Examples
        --------
        >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
        >>> gdf = grid.load()
        >>> gdf.crs
        'EPSG:4326'
        >>> gdf[['lon', 'lat', 'east', 'north', 'up']].head()

        """
        if not self.path.exists():
            raise FileNotFoundError(f"UNR grid file not found: {self.path}")

        # Load parquet as DataFrame
        df = pd.read_parquet(self.path)

        # Create GeoDataFrame with point geometries
        gdf = gpd.GeoDataFrame(
            df,
            geometry=gpd.points_from_xy(x=df.lon, y=df.lat),
            crs="EPSG:4326",
        )

        return gdf

    def get_metadata(self) -> dict[str, str]:
        """Extract metadata from parquet file.

        Returns
        -------
        dict[str, str]
            Metadata dictionary.

        Examples
        --------
        >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
        >>> meta = grid.get_metadata()
        >>> meta.keys()
        dict_keys(['source', 'date_created', 'frame_id', ...])

        """
        if not self.path.exists():
            raise FileNotFoundError(f"UNR grid file not found: {self.path}")

        meta = pq.read_metadata(self.path).metadata

        if meta is None:
            return {}

        metadata_dict = {k.decode(): v.decode() for k, v in meta.items()}

        return metadata_dict

    def to_dataframe(self) -> pd.DataFrame:
        """Load as regular DataFrame without geometry.

        Returns
        -------
        pd.DataFrame
            DataFrame with lon, lat, and velocity columns.

        """
        if not self.path.exists():
            raise FileNotFoundError(f"UNR grid file not found: {self.path}")

        return pd.read_parquet(self.path)

    def get_bounds(self) -> dict[str, float]:
        """Get spatial bounds of grid.

        Returns
        -------
        dict[str, float]
            Dictionary with keys: west, south, east, north.

        """
        gdf = self.load()
        bounds = gdf.total_bounds  # (minx, miny, maxx, maxy)

        return {
            "west": bounds[0],
            "south": bounds[1],
            "east": bounds[2],
            "north": bounds[3],
        }

    def get_grid_count(self) -> int:
        """Get number of GNSS points in grid.

        Returns
        -------
        int
            Number of stations.

        """
        df = self.to_dataframe()
        grid_points = df.groupby("id", as_index=False).first()
        return len(grid_points)

    @property
    def filename(self) -> str:
        """Grid filename."""
        return self.path.name

    @property
    def exists(self) -> bool:
        """Check if grid file exists."""
        return self.path.exists()

    def __repr__(self) -> str:
        """Return a string representation."""
        frame_str = f"frame={self.frame_id}" if self.frame_id else "frame=None"
        return (
            f"UnrGrid({frame_str},"
            f" points={self.get_grid_count() if self.exists else '?'})"
        )

exists property

exists: bool

Check if grid file exists.

filename property

filename: str

Grid filename.

from_path classmethod

from_path(path: Path | str, frame_id: int | None = None) -> UnrGrid

Create UnrGrid from parquet file path.

Parameters:

path : Path or str
    Path to UNR parquet file.
frame_id : int or None, optional
    Frame ID. If None, attempts to parse from filename. Default is None.

Returns:

UnrGrid
    Grid instance.

Examples:

>>> # Frame ID from filename
>>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
>>> grid.frame_id
8882
>>> # Explicit frame ID
>>> grid = UnrGrid.from_path("custom_unr_data.parquet", frame_id=8882)
>>> grid.frame_id
8882
>>> # No frame ID
>>> grid = UnrGrid.from_path("unr_data.parquet")
>>> grid.frame_id is None
True
Source code in src/cal_disp/product/_unr.py
@classmethod
def from_path(cls, path: Path | str, frame_id: int | None = None) -> "UnrGrid":
    """Create UnrGrid from parquet file path.

    Parameters
    ----------
    path : Path or str
        Path to UNR parquet file.
    frame_id : int or None, optional
        Frame ID. If None, attempts to parse from filename.
        Default is None.

    Returns
    -------
    UnrGrid
        Grid instance.

    Examples
    --------
    >>> # Frame ID from filename
    >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
    >>> grid.frame_id
    8882

    >>> # Explicit frame ID
    >>> grid = UnrGrid.from_path("custom_unr_data.parquet", frame_id=8882)
    >>> grid.frame_id
    8882

    >>> # No frame ID
    >>> grid = UnrGrid.from_path("unr_data.parquet")
    >>> grid.frame_id is None
    True

    """
    path = Path(path)

    # Try to parse frame_id from filename if not provided
    if frame_id is None:
        match = cls._PATTERN.search(path.name)
        if match:
            frame_id = int(match.group(1))

    return cls(path=path, frame_id=frame_id)

get_bounds

get_bounds() -> dict[str, float]

Get spatial bounds of grid.

Returns:

dict[str, float]
    Dictionary with keys: west, south, east, north.

Source code in src/cal_disp/product/_unr.py
def get_bounds(self) -> dict[str, float]:
    """Get spatial bounds of grid.

    Returns
    -------
    dict[str, float]
        Dictionary with keys: west, south, east, north.

    """
    gdf = self.load()
    bounds = gdf.total_bounds  # (minx, miny, maxx, maxy)

    return {
        "west": bounds[0],
        "south": bounds[1],
        "east": bounds[2],
        "north": bounds[3],
    }

get_grid_count

get_grid_count() -> int

Get number of GNSS points in grid.

Returns:

int
    Number of stations.

Source code in src/cal_disp/product/_unr.py
def get_grid_count(self) -> int:
    """Get number of GNSS points in grid.

    Returns
    -------
    int
        Number of stations.

    """
    df = self.to_dataframe()
    grid_points = df.groupby("id", as_index=False).first()
    return len(grid_points)

get_metadata

get_metadata() -> dict[str, str]

Extract metadata from parquet file.

Returns:

dict[str, str]
    Metadata dictionary.

Examples:

>>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
>>> meta = grid.get_metadata()
>>> meta.keys()
dict_keys(['source', 'date_created', 'frame_id', ...])
Source code in src/cal_disp/product/_unr.py
def get_metadata(self) -> dict[str, str]:
    """Extract metadata from parquet file.

    Returns
    -------
    dict[str, str]
        Metadata dictionary.

    Examples
    --------
    >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
    >>> meta = grid.get_metadata()
    >>> meta.keys()
    dict_keys(['source', 'date_created', 'frame_id', ...])

    """
    if not self.path.exists():
        raise FileNotFoundError(f"UNR grid file not found: {self.path}")

    meta = pq.read_metadata(self.path).metadata

    if meta is None:
        return {}

    metadata_dict = {k.decode(): v.decode() for k, v in meta.items()}

    return metadata_dict

load

load() -> gpd.GeoDataFrame

Load UNR grid as GeoDataFrame.

Returns:

gpd.GeoDataFrame
    GeoDataFrame with point geometries and velocity data.

Raises:

FileNotFoundError
    If parquet file does not exist.

Examples:

>>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
>>> gdf = grid.load()
>>> gdf.crs
'EPSG:4326'
>>> gdf[['lon', 'lat', 'east', 'north', 'up']].head()
Source code in src/cal_disp/product/_unr.py
def load(self) -> gpd.GeoDataFrame:
    """Load UNR grid as GeoDataFrame.

    Returns
    -------
    gpd.GeoDataFrame
        GeoDataFrame with point geometries and velocity data.

    Raises
    ------
    FileNotFoundError
        If parquet file does not exist.

    Examples
    --------
    >>> grid = UnrGrid.from_path("unr_grid_frame8882.parquet")
    >>> gdf = grid.load()
    >>> gdf.crs
    'EPSG:4326'
    >>> gdf[['lon', 'lat', 'east', 'north', 'up']].head()

    """
    if not self.path.exists():
        raise FileNotFoundError(f"UNR grid file not found: {self.path}")

    # Load parquet as DataFrame
    df = pd.read_parquet(self.path)

    # Create GeoDataFrame with point geometries
    gdf = gpd.GeoDataFrame(
        df,
        geometry=gpd.points_from_xy(x=df.lon, y=df.lat),
        crs="EPSG:4326",
    )

    return gdf

to_dataframe

to_dataframe() -> pd.DataFrame

Load as regular DataFrame without geometry.

Returns:

DataFrame
    DataFrame with lon, lat, and velocity columns.

Source code in src/cal_disp/product/_unr.py
def to_dataframe(self) -> pd.DataFrame:
    """Load as regular DataFrame without geometry.

    Returns
    -------
    pd.DataFrame
        DataFrame with lon, lat, and velocity columns.

    """
    if not self.path.exists():
        raise FileNotFoundError(f"UNR grid file not found: {self.path}")

    return pd.read_parquet(self.path)

bounds_contains

bounds_contains(outer_bounds: tuple[float, float, float, float] | dict[str, float], inner_bounds: tuple[float, float, float, float] | dict[str, float], buffer: float = 0.0) -> bool

Check if outer bounds completely contain inner bounds.

Parameters:

outer_bounds : tuple or dict (required)
    Outer bounds as (west, south, east, north) or dict with those keys.
inner_bounds : tuple or dict (required)
    Inner bounds as (west, south, east, north) or dict with those keys.
buffer : float, optional
    Buffer distance to require around inner bounds (in same units).
    Default is 0.0 (exact containment).

Returns:

bool
    True if outer bounds completely contain inner bounds (with buffer).

Examples:

>>> # Check if UNR grid covers DISP frame
>>> unr_bounds = (-97, 28, -93, 32)
>>> disp_bounds = (-96, 29, -94, 31)
>>> bounds_contains(unr_bounds, disp_bounds)
True
>>> # With buffer requirement
>>> bounds_contains(unr_bounds, disp_bounds, buffer=0.5)
True
>>> # Dict format
>>> unr_bounds = {"west": -97, "south": 28, "east": -93, "north": 32}
>>> disp_bounds = {"west": -96, "south": 29, "east": -94, "north": 31}
>>> bounds_contains(unr_bounds, disp_bounds)
True
>>> # Does not contain
>>> small_bounds = (-95, 29, -94, 30)
>>> large_bounds = (-96, 28, -93, 31)
>>> bounds_contains(small_bounds, large_bounds)
False
Source code in src/cal_disp/product/_utils.py
def bounds_contains(
    outer_bounds: tuple[float, float, float, float] | dict[str, float],
    inner_bounds: tuple[float, float, float, float] | dict[str, float],
    buffer: float = 0.0,
) -> bool:
    """Check if outer bounds completely contain inner bounds.

    Parameters
    ----------
    outer_bounds : tuple or dict
        Outer bounds as (west, south, east, north) or dict with those keys.
    inner_bounds : tuple or dict
        Inner bounds as (west, south, east, north) or dict with those keys.
    buffer : float, optional
        Buffer distance to require around inner bounds (in same units).
        Default is 0.0 (exact containment).

    Returns
    -------
    bool
        True if outer bounds completely contain inner bounds (with buffer).

    Examples
    --------
    >>> # Check if UNR grid covers DISP frame
    >>> unr_bounds = (-97, 28, -93, 32)
    >>> disp_bounds = (-96, 29, -94, 31)
    >>> bounds_contains(unr_bounds, disp_bounds)
    True

    >>> # With buffer requirement
    >>> bounds_contains(unr_bounds, disp_bounds, buffer=0.5)
    True

    >>> # Dict format
    >>> unr_bounds = {"west": -97, "south": 28, "east": -93, "north": 32}
    >>> disp_bounds = {"west": -96, "south": 29, "east": -94, "north": 31}
    >>> bounds_contains(unr_bounds, disp_bounds)
    True

    >>> # Does not contain
    >>> small_bounds = (-95, 29, -94, 30)
    >>> large_bounds = (-96, 28, -93, 31)
    >>> bounds_contains(small_bounds, large_bounds)
    False

    """
    # Parse outer bounds
    if isinstance(outer_bounds, dict):
        o_west = outer_bounds["west"]
        o_south = outer_bounds["south"]
        o_east = outer_bounds["east"]
        o_north = outer_bounds["north"]
    else:
        o_west, o_south, o_east, o_north = outer_bounds

    # Parse inner bounds
    if isinstance(inner_bounds, dict):
        i_west = inner_bounds["west"]
        i_south = inner_bounds["south"]
        i_east = inner_bounds["east"]
        i_north = inner_bounds["north"]
    else:
        i_west, i_south, i_east, i_north = inner_bounds

    # Check containment with buffer
    return (
        o_west <= i_west - buffer
        and o_south <= i_south - buffer
        and o_east >= i_east + buffer
        and o_north >= i_north + buffer
    )

check_bounds_coverage

check_bounds_coverage(outer_bounds: tuple[float, float, float, float] | dict[str, float], inner_bounds: tuple[float, float, float, float] | dict[str, float], buffer: float = 0.0) -> dict[str, bool | dict[str, float]]

Check bounds coverage with detailed gap information.

Parameters:

outer_bounds : tuple or dict (required)
    Outer bounds as (west, south, east, north) or dict.
inner_bounds : tuple or dict (required)
    Inner bounds as (west, south, east, north) or dict.
buffer : float, optional
    Buffer distance required. Default is 0.0.

Returns:

dict
    Dictionary with:
    - "contains": bool, True if outer contains inner (with buffer)
    - "gaps": dict, gap distances by direction (negative = covered)

Examples:

>>> unr_bounds = (-97, 28, -93, 32)
>>> disp_bounds = (-96, 29, -94, 31)
>>> result = check_bounds_coverage(unr_bounds, disp_bounds, buffer=0.5)
>>> result["contains"]
True
>>> result["gaps"]
{'west': -0.5, 'south': -0.5, 'east': -0.5, 'north': -0.5}
>>> # Insufficient coverage
>>> small_bounds = (-95.5, 29, -94, 30)
>>> result = check_bounds_coverage(small_bounds, disp_bounds)
>>> result["contains"]
False
>>> result["gaps"]
{'west': 0.5, 'south': 0.0, 'east': 0.0, 'north': 1.0}
Source code in src/cal_disp/product/_utils.py
def check_bounds_coverage(
    outer_bounds: tuple[float, float, float, float] | dict[str, float],
    inner_bounds: tuple[float, float, float, float] | dict[str, float],
    buffer: float = 0.0,
) -> dict[str, bool | dict[str, float]]:
    """Check bounds coverage with detailed gap information.

    Parameters
    ----------
    outer_bounds : tuple or dict
        Outer bounds as (west, south, east, north) or dict.
    inner_bounds : tuple or dict
        Inner bounds as (west, south, east, north) or dict.
    buffer : float, optional
        Buffer distance required. Default is 0.0.

    Returns
    -------
    dict
        Dictionary with:
        - "contains": bool, True if outer contains inner (with buffer)
        - "gaps": dict, Gap distances by direction (negative = covered)

    Examples
    --------
    >>> unr_bounds = (-97, 28, -93, 32)
    >>> disp_bounds = (-96, 29, -94, 31)
    >>> result = check_bounds_coverage(unr_bounds, disp_bounds, buffer=0.5)
    >>> result["contains"]
    True
    >>> result["gaps"]
    {'west': -0.5, 'south': -0.5, 'east': -0.5, 'north': -0.5}

    >>> # Insufficient coverage
    >>> small_bounds = (-95.5, 29, -94, 30)
    >>> result = check_bounds_coverage(small_bounds, disp_bounds)
    >>> result["contains"]
    False
    >>> result["gaps"]
    {'west': 0.5, 'south': 0.0, 'east': 0.0, 'north': 1.0}

    """
    # Parse bounds
    if isinstance(outer_bounds, dict):
        o_west = outer_bounds["west"]
        o_south = outer_bounds["south"]
        o_east = outer_bounds["east"]
        o_north = outer_bounds["north"]
    else:
        o_west, o_south, o_east, o_north = outer_bounds

    if isinstance(inner_bounds, dict):
        i_west = inner_bounds["west"]
        i_south = inner_bounds["south"]
        i_east = inner_bounds["east"]
        i_north = inner_bounds["north"]
    else:
        i_west, i_south, i_east, i_north = inner_bounds

    # Calculate gaps (positive = missing coverage, negative = extra coverage)
    gaps = {
        "west": (i_west - buffer) - o_west,
        "south": (i_south - buffer) - o_south,
        "east": o_east - (i_east + buffer),
        "north": o_north - (i_north + buffer),
    }

    # Check if all gaps are negative or zero (fully contained)
    contains = all(gap <= 0 for gap in gaps.values())

    return {
        "contains": contains,
        "gaps": gaps,
    }
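
As a sketch, the "gaps" dictionary can be used to report which directions lack coverage; the bounds below are hypothetical and the import path is assumed:

from cal_disp.product._utils import check_bounds_coverage  # assumed import path

unr_bounds = (-95.5, 29.0, -94.0, 30.0)
disp_bounds = (-96.0, 29.0, -94.0, 31.0)

result = check_bounds_coverage(unr_bounds, disp_bounds)
if not result["contains"]:
    # Keep only the directions with missing coverage (positive gaps)
    missing = {side: gap for side, gap in result["gaps"].items() if gap > 0}
    print(f"Insufficient coverage: {missing}")  # e.g. {'west': 0.5, 'north': 1.0}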

compute_los_correction

compute_los_correction(zenith_delay_2d: DataArray, los_up: DataArray, reference_correction: DataArray | None = None, target_crs: str | None = None, output_path: Path | str | None = None, output_format: str = 'geotiff') -> xr.DataArray

Convert zenith delay to line-of-sight correction.

Source code in src/cal_disp/product/_tropo.py
def compute_los_correction(
    zenith_delay_2d: xr.DataArray,
    los_up: xr.DataArray,
    reference_correction: xr.DataArray | None = None,
    target_crs: str | None = None,
    output_path: Path | str | None = None,
    output_format: str = "geotiff",
) -> xr.DataArray:
    """Convert zenith delay to line-of-sight correction."""
    # Ensure same grid
    if los_up.shape != zenith_delay_2d.shape:
        if hasattr(los_up, "rio") and hasattr(zenith_delay_2d, "rio"):
            los_up = los_up.rio.reproject_match(zenith_delay_2d)
        else:
            raise ValueError(
                f"Shape mismatch: los_up {los_up.shape} vs "
                f"zenith_delay {zenith_delay_2d.shape}"
            )

    # Convert zenith to LOS
    # Note: -1 matches DISP convention (positive = apparent uplift)
    los_correction = -1 * (zenith_delay_2d / los_up)

    # Subtract reference if provided
    if reference_correction is not None:
        if reference_correction.shape != los_correction.shape:
            if hasattr(reference_correction, "rio"):
                reference_correction = reference_correction.rio.reproject_match(
                    los_correction
                )

        los_correction = (los_correction - reference_correction).astype(np.float32)

    # Reproject to target CRS if requested
    if target_crs is not None:
        if hasattr(los_correction, "rio"):
            los_correction = los_correction.rio.reproject(target_crs)
        else:
            raise ValueError("Cannot reproject: DataArray missing CRS information")

    # Add metadata
    los_correction.name = "los_correction"
    los_correction.attrs.update(
        {
            "units": "meters",
            "long_name": "Line-of-sight atmospheric correction",
            "line_of_sight_convention": (
                "Positive means decrease in delay (apparent uplift towards satellite)"
            ),
        }
    )

    if reference_correction is not None:
        los_correction.attrs["reference_subtracted"] = "yes"

    if target_crs is not None:
        los_correction.attrs["target_crs"] = target_crs

    # Save if requested
    if output_path is not None:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if output_format == "netcdf":
            los_correction.to_netcdf(output_path, engine="h5netcdf")
        elif output_format == "geotiff":
            los_correction.rio.to_raster(
                output_path,
                compress="deflate",
                tiled=True,
                dtype="float32",
            )
        else:
            raise ValueError(f"Unknown format: {output_format}")

    return los_correction
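
A minimal usage sketch, assuming a 2D zenith delay already interpolated to the DEM surface (e.g. from interpolate_to_dem_surface below) and a LOS "up" component read from the static LOS layer; the file names are hypothetical and both arrays are assumed to carry CRS metadata:

import rioxarray  # noqa: F401  (registers the .rio accessor)
import xarray as xr

from cal_disp.product._tropo import compute_los_correction  # assumed import path

zenith_delay = xr.open_dataarray("zenith_delay_surface.nc")      # hypothetical file
los_up = rioxarray.open_rasterio("static_los_up.tif").squeeze()  # hypothetical file

los_corr = compute_los_correction(
    zenith_delay,
    los_up,
    output_path="los_correction.tif",
    output_format="geotiff",
)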

interpolate_in_time

interpolate_in_time(tropo_early: TropoProduct, tropo_late: TropoProduct, target_datetime: datetime, bounds: tuple[float, float, float, float] | None = None, max_height: float = 11000.0, bounds_buffer: float = 0.2, output_path: Path | str | None = None) -> xr.DataArray

Interpolate tropospheric delay between two products in time.

Parameters:

tropo_early : TropoProduct (required)
    Earlier tropospheric product.
tropo_late : TropoProduct (required)
    Later tropospheric product.
target_datetime : datetime (required)
    Target datetime for interpolation.
bounds : tuple[float, float, float, float] or None, optional
    Spatial bounds as (west, south, east, north). Default is None.
max_height : float, optional
    Maximum height in meters. Default is 11000 m.
bounds_buffer : float, optional
    Buffer to add to bounds in degrees. Default is 0.2.
output_path : Path, str, or None, optional
    If provided, save result to NetCDF. Default is None.

Returns:

DataArray
    Interpolated tropospheric delay at target datetime.

Examples:

>>> from datetime import datetime
>>> from product import TropoProduct
>>> from product._tropo import interpolate_in_time
>>>
>>> early = TropoProduct.from_path("tropo_00Z.nc")
>>> late = TropoProduct.from_path("tropo_06Z.nc")
>>> target = datetime(2022, 1, 11, 3, 0)
>>>
>>> # Basic interpolation
>>> delay = interpolate_in_time(early, late, target)
>>>
>>> # With spatial subsetting
>>> bounds = (-96, 29, -94, 31)
>>> delay = interpolate_in_time(
...     early, late, target,
...     bounds=bounds,
...     max_height=11000,
...     bounds_buffer=0.2,
... )
Source code in src/cal_disp/product/_tropo.py
def interpolate_in_time(
    tropo_early: TropoProduct,
    tropo_late: TropoProduct,
    target_datetime: datetime,
    bounds: tuple[float, float, float, float] | None = None,
    max_height: float = 11e3,
    bounds_buffer: float = 0.2,
    output_path: Path | str | None = None,
) -> xr.DataArray:
    """Interpolate tropospheric delay between two products in time.

    Parameters
    ----------
    tropo_early : TropoProduct
        Earlier tropospheric product.
    tropo_late : TropoProduct
        Later tropospheric product.
    target_datetime : datetime
        Target datetime for interpolation.
    bounds : tuple[float, float, float, float] or None, optional
        Spatial bounds as (west, south, east, north). Default is None.
    max_height : float, optional
        Maximum height in meters. Default is 11000 m.
    bounds_buffer : float, optional
        Buffer to add to bounds in degrees. Default is 0.2.
    output_path : Path, str, or None, optional
        If provided, save result to NetCDF. Default is None.

    Returns
    -------
    xr.DataArray
        Interpolated tropospheric delay at target datetime.

    Examples
    --------
    >>> from datetime import datetime
    >>> from product import TropoProduct
    >>> from product._tropo import interpolate_in_time
    >>>
    >>> early = TropoProduct.from_path("tropo_00Z.nc")
    >>> late = TropoProduct.from_path("tropo_06Z.nc")
    >>> target = datetime(2022, 1, 11, 3, 0)
    >>>
    >>> # Basic interpolation
    >>> delay = interpolate_in_time(early, late, target)
    >>>
    >>> # With spatial subsetting
    >>> bounds = (-96, 29, -94, 31)
    >>> delay = interpolate_in_time(
    ...     early, late, target,
    ...     bounds=bounds,
    ...     max_height=11000,
    ...     bounds_buffer=0.2,
    ... )

    """
    if tropo_early.date > tropo_late.date:
        raise ValueError(
            f"Early product date ({tropo_early.date}) must be before "
            f"late product date ({tropo_late.date})"
        )

    if target_datetime < tropo_early.date or target_datetime > tropo_late.date:
        raise ValueError(
            f"Target datetime ({target_datetime}) must be between "
            f"early ({tropo_early.date}) and late ({tropo_late.date}) dates"
        )

    # Get total delay with consistent kwargs
    da_early = tropo_early.get_total_delay(
        bounds=bounds,
        max_height=max_height,
        bounds_buffer=bounds_buffer,
    )

    da_late = tropo_late.get_total_delay(
        bounds=bounds,
        max_height=max_height,
        bounds_buffer=bounds_buffer,
    )

    # Compute interpolation weight
    delta_total = (tropo_late.date - tropo_early.date).total_seconds()
    delta_target = (target_datetime - tropo_early.date).total_seconds()
    weight = delta_target / delta_total

    # Linear interpolation
    da_interp = (1 - weight) * da_early + weight * da_late

    # Add metadata
    da_interp.name = "zenith_total_delay"
    da_interp.attrs.update(
        {
            "interpolation_method": "linear",
            "early_product": tropo_early.filename,
            "late_product": tropo_late.filename,
            "early_date": tropo_early.date.isoformat(),
            "late_date": tropo_late.date.isoformat(),
            "target_date": target_datetime.isoformat(),
            "interpolation_weight": float(weight),
            "long_name": "Total zenith tropospheric delay",
            "units": "meters",
        }
    )

    # Save if requested
    if output_path is not None:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        da_interp.to_netcdf(output_path, engine="h5netcdf")

    return da_interp

interpolate_to_dem_surface

interpolate_to_dem_surface(da_tropo_cube: DataArray, dem: DataArray, method: str = 'linear', output_path: Path | str | None = None, output_format: str = 'netcdf') -> xr.DataArray

Interpolate 3D tropospheric delay to DEM surface heights.

Parameters:

da_tropo_cube : DataArray (required)
    3D tropospheric delay with dims (height, y, x).
    Assumed to be in EPSG:4326 (WGS84) if CRS not specified.
dem : DataArray (required)
    DEM with surface heights in meters. Must have CRS information.
method : str, optional
    Interpolation method ("linear" or "nearest"). Default is "linear".
output_path : Path, str, or None, optional
    If provided, save result. Default is None.
output_format : str, optional
    Output format ("netcdf" or "geotiff"). Default is "netcdf".

Returns:

DataArray
    2D tropospheric delay at DEM surface.

Raises:

ValueError
    If DEM is missing CRS information.

Source code in src/cal_disp/product/_tropo.py
def interpolate_to_dem_surface(
    da_tropo_cube: xr.DataArray,
    dem: xr.DataArray,
    method: str = "linear",
    output_path: Path | str | None = None,
    output_format: str = "netcdf",
) -> xr.DataArray:
    """Interpolate 3D tropospheric delay to DEM surface heights.

    Parameters
    ----------
    da_tropo_cube : xr.DataArray
        3D tropospheric delay with dims (height, y, x).
        Assumed to be in EPSG:4326 (WGS84) if CRS not specified.
    dem : xr.DataArray
        DEM with surface heights in meters. Must have CRS information.
    method : str, optional
        Interpolation method ("linear" or "nearest"). Default is "linear".
    output_path : Path, str, or None, optional
        If provided, save result. Default is None.
    output_format : str, optional
        Output format ("netcdf" or "geotiff"). Default is "netcdf".

    Returns
    -------
    xr.DataArray
        2D tropospheric delay at DEM surface.

    Raises
    ------
    ValueError
        If DEM is missing CRS information.

    """
    # Ensure consistent coordinate naming
    if "latitude" in da_tropo_cube.dims:
        da_tropo_cube = da_tropo_cube.rename({"latitude": "y", "longitude": "x"})

    # Check DEM has CRS
    if not hasattr(dem, "rio") or dem.rio.crs is None:
        raise ValueError(
            "DEM is missing CRS information. "
            "Use dem.rio.write_crs() to set the CRS before calling this function."
        )

    dem_crs = dem.rio.crs

    # Write CRS to tropo if missing (assume EPSG:4326)
    if not hasattr(da_tropo_cube, "rio") or da_tropo_cube.rio.crs is None:
        da_tropo_cube = da_tropo_cube.rio.write_crs("EPSG:4326")

    # Reproject if different CRS
    if da_tropo_cube.rio.crs != dem_crs:
        td_utm = da_tropo_cube.rio.reproject(
            dem_crs,
            resampling=Resampling.cubic,
        )
    else:
        td_utm = da_tropo_cube

    # Find height dimension
    if "height" not in td_utm.dims:
        raise ValueError(
            f"No height dimension found. Available dims: {list(td_utm.dims)}"
        )

    # Build interpolator
    rgi = RegularGridInterpolator(
        (td_utm["height"].values, td_utm.y.values, td_utm.x.values),
        np.nan_to_num(td_utm.values),
        method=method,
        bounds_error=False,
        fill_value=np.nan,
    )

    # Create coordinate meshgrid
    yy, xx = np.meshgrid(dem.y.values, dem.x.values, indexing="ij")
    pts = np.column_stack([dem.values.ravel(), yy.ravel(), xx.ravel()])

    # Interpolate
    vals = rgi(pts)

    # Create output DataArray
    out = dem.copy()
    out.values[:] = vals.reshape(dem.shape).astype(np.float32)
    out.name = da_tropo_cube.name or "tropospheric_delay"

    # Update attributes
    out.attrs.update(
        {
            "interpolation_method": method,
            "interpolated_from": "3D tropospheric model",
            "units": "meters",
            "long_name": "Tropospheric delay at DEM surface",
        }
    )

    # Add time if present in input
    if "time" in td_utm.coords:
        out.attrs["time"] = str(td_utm.time.values)

    # Preserve any existing tropo metadata
    if "target_date" in da_tropo_cube.attrs:
        out.attrs["target_date"] = da_tropo_cube.attrs["target_date"]
    if "interpolation_weight" in da_tropo_cube.attrs:
        out.attrs["interpolation_weight"] = da_tropo_cube.attrs["interpolation_weight"]

    # Save if requested
    if output_path is not None:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        if output_format == "netcdf":
            out.to_netcdf(output_path, engine="h5netcdf")
        elif output_format == "geotiff":
            out.rio.to_raster(
                output_path,
                compress="deflate",
                tiled=True,
                dtype="float32",
            )
        else:
            raise ValueError(f"Unknown format: {output_format}")

    return out
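
A sketch of the typical chain, feeding the output of interpolate_in_time above into this function; the file paths are hypothetical and the import paths are assumptions:

from datetime import datetime

import rioxarray

from cal_disp.product import TropoProduct      # assumed import path
from cal_disp.product._tropo import (          # assumed import path
    interpolate_in_time,
    interpolate_to_dem_surface,
)

early = TropoProduct.from_path("tropo_00Z.nc")  # hypothetical files
late = TropoProduct.from_path("tropo_06Z.nc")
cube = interpolate_in_time(early, late, datetime(2022, 1, 11, 3, 0))

# The DEM must carry a CRS; set one with dem.rio.write_crs() if missing.
dem = rioxarray.open_rasterio("dem.tif").squeeze()

surface_delay = interpolate_to_dem_surface(cube, dem, method="linear")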

Config

cal_disp.config

DynamicAncillaryFileGroup

Bases: YamlModel

Dynamic ancillary files for the SAS.

Attributes:

algorithm_parameters_file : Path
    Path to file containing SAS algorithm parameters.
los_file : Path
    Path to the DISP static LOS layer file (line-of-sight unit vectors).
    Alias: static_los_file
dem_file : Path
    Path to the DISP static DEM layer file (digital elevation model).
    Alias: static_dem_file
mask_file : Path or None, optional
    Optional byte mask file to ignore low correlation/bad data (e.g., water mask).
    Convention: 0 = invalid/no data, 1 = good data. Dtype must be uint8.
    Default is None.
reference_tropo_files : list[Path] or None, optional
    Paths to TROPO files for the reference (primary) date. If not provided,
    tropospheric correction for reference is skipped. Alias: ref_tropo_files.
    Default is None.
secondary_tropo_files : list[Path] or None, optional
    Paths to TROPO files for the secondary date. If not provided, tropospheric
    correction for secondary is skipped. Alias: sec_tropo_files. Default is None.
iono_files : list[Path] or None, optional
    Paths to ionospheric correction files. If not provided, ionospheric
    correction is skipped. Default is None.
tiles_files : list[Path] or None, optional
    Paths to calibration tile bounds files (e.g., S1 burst bounds) covering the
    full frame. If not provided, per-tile calibration is skipped. Default is None.

Source code in src/cal_disp/config/_base.py
class DynamicAncillaryFileGroup(YamlModel):
    """Dynamic ancillary files for the SAS.

    Attributes
    ----------
    algorithm_parameters_file : Path
        Path to file containing SAS algorithm parameters.
    los_file : Path
        Path to the DISP static LOS layer file (line-of-sight unit vectors).
        Alias: static_los_file
    dem_file : Path
        Path to the DISP static DEM layer file (digital elevation model).
        Alias: static_dem_file
    mask_file : Path or None, optional
        Optional byte mask file to ignore low correlation/bad data (e.g., water mask).
        Convention: 0 = invalid/no data, 1 = good data. Dtype must be uint8.
        Default is None.
    reference_tropo_files : list[Path] or None, optional
        Paths to TROPO files for the reference (primary) date.
        If not provided, tropospheric correction for reference is skipped.
        Alias: ref_tropo_files. Default is None.
    secondary_tropo_files : list[Path] or None, optional
        Paths to TROPO files for the secondary date.
        If not provided, tropospheric correction for secondary is skipped.
        Alias: sec_tropo_files. Default is None.
    iono_files : list[Path] or None, optional
        Paths to ionospheric correction files.
        If not provided, ionospheric correction is skipped.
        Default is None.
    tiles_files : list[Path] or None, optional
        Paths to calibration tile bounds files (e.g., S1 burst bounds) covering
        the full frame. If not provided, per-tile calibration is skipped.
        Default is None.

    """

    algorithm_parameters_file: RequiredPath = Field(
        ...,
        description="Path to file containing SAS algorithm parameters.",
    )

    los_file: RequiredPath = Field(
        ...,
        alias="static_los_file",
        description=(
            "Path to the DISP static los layer file (1 per frame) with line-of-sight"
            " unit vectors."
        ),
    )

    dem_file: RequiredPath = Field(
        ...,
        alias="static_dem_file",
        description=(
            "Path to the DISP static dem layer file (1 per frame) with line-of-sight"
            " unit vectors."
        ),
    )
    # NOTE should I add also shadow_layover static file as input

    mask_file: OptionalPath = Field(
        default=None,
        description=(
            "Optional Byte mask file used to ignore low correlation/bad data (e.g water"
            " mask). Convention is 0 for no data/invalid, and 1 for good data. Dtype"
            " must be uint8."
        ),
    )

    reference_tropo_files: Optional[List[Path]] = Field(
        default=None,
        alias="ref_tropo_files",
        description=(
            "Path to the TROPO file for the reference date."
            " If not provided, tropospheric correction for reference is skipped."
        ),
    )

    secondary_tropo_files: Optional[List[Path]] = Field(
        default=None,
        alias="sec_tropo_files",
        description=(
            "Path to the TROPO file for the secondary date."
            " If not provided, tropospheric correction for secondary is skipped."
        ),
    )

    iono_files: Optional[List[Path]] = Field(
        default=None,
        alias="iono_files",
        description=(
            "Path to the IONO files"
            " If not provided, ionosphere correction for reference is skipped."
        ),
    )

    tiles_files: Optional[List[Path]] = Field(
        default=None,
        description=(
            "Paths to the calibration tile bounds files (e.g. S1 burst bounds) covering"
            " full frame. If none provided, calibration per tile is skipped."
        ),
    )

    @field_validator(
        "reference_tropo_files",
        "secondary_tropo_files",
        "iono_files",
        "tiles_files",
        mode="before",
    )
    @classmethod
    def _validate_file_lists(cls, v):
        """Validate and process file lists or glob patterns."""
        return _read_file_list_or_glob(cls, v)

    def get_all_files(self) -> Dict[str, Path | list[Path]]:
        return self.get_all_file_paths(flatten_lists=True)

    model_config = STRICT_CONFIG_WITH_ALIASES
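
A sketch of constructing the group programmatically using the YAML aliases; all paths below are hypothetical (and, depending on how RequiredPath is validated, may need to exist on disk):

from cal_disp.config import DynamicAncillaryFileGroup  # assumed import path

ancillary = DynamicAncillaryFileGroup(
    algorithm_parameters_file="algorithm_parameters.yaml",
    static_los_file="static_los_F08882.tif",   # alias for los_file
    static_dem_file="static_dem_F08882.tif",   # alias for dem_file
    ref_tropo_files=["tropo_20220111T00Z.nc", "tropo_20220111T06Z.nc"],
    sec_tropo_files=["tropo_20220722T00Z.nc", "tropo_20220722T06Z.nc"],
)
print(ancillary.get_all_files())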

InputFileGroup

Bases: YamlModel

Input file group for the SAS.

Attributes:

disp_file : Path
    Path to DISP file.
calibration_reference_grid : Path
    Path to UNR calibration reference file (parquet format).
frame_id : int
    Frame ID of the DISP frame.

Source code in src/cal_disp/config/_base.py
class InputFileGroup(YamlModel):
    """Input file group for the SAS.

    Attributes
    ----------
    disp_file : Path
        Path to DISP file.
    calibration_reference_grid : Path
        Path to UNR calibration reference file (parquet format).
    frame_id : int
        Frame ID of the DISP frame.

    """

    disp_file: RequiredPath = Field(
        ...,
        description="Path to DISP file.",
    )

    calibration_reference_grid: RequiredPath = Field(
        ...,
        description="Path to UNR calibration reference file [parquet].",
    )

    frame_id: int = Field(
        ...,
        description="Frame ID of the DISP frame.",
    )

    model_config = ConfigDict(
        extra="forbid",
    )
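
A corresponding sketch for the input group; the paths are hypothetical and the import path is assumed:

from cal_disp.config import InputFileGroup  # assumed import path

inputs = InputFileGroup(
    disp_file=(
        "OPERA_L3_DISP-S1_IW_F08882_VV_20220111T002651Z_20220722T002657Z_v1.0.nc"
    ),
    calibration_reference_grid="unr_grid_frame8882.parquet",
    frame_id=8882,
)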

StaticAncillaryFileGroup

Bases: YamlModel

Static ancillary files for the SAS.

These files contain configuration and reference data that don't change between processing runs for a given frame.

Attributes:

algorithm_parameters_overrides_json : Optional[Path]
    JSON file with frame-specific algorithm parameter overrides.
deformation_area_database_json : Optional[Path]
    GeoJSON file with deforming areas to exclude from calibration.
event_database_json : Optional[Path]
    GeoJSON file with earthquake/volcanic activity events for each frame.

Source code in src/cal_disp/config/_base.py
class StaticAncillaryFileGroup(YamlModel):
    """Static ancillary files for the SAS.

    These files contain configuration and reference data that don't change
    between processing runs for a given frame.

    Attributes
    ----------
    algorithm_parameters_overrides_json : Optional[Path]
        JSON file with frame-specific algorithm parameter overrides.
    deformation_area_database_json : Optional[Path]
        GeoJSON file with deforming areas to exclude from calibration.
    event_database_json : Optional[Path]
        GeoJSON file with earthquake/volcanic activity events for each frame.

    """

    algorithm_parameters_overrides_json: OptionalPath = Field(
        default=None,
        description=(
            "JSON file containing frame-specific algorithm parameters to override the"
            " defaults passed in the `algorithm_parameters.yaml`."
        ),
    )

    deformation_area_database_json: OptionalPath = Field(
        default=None,
        alias="defo_area_db_json",
        description=(
            "GeoJSON file containing list of deforming areas to exclude from"
            " calibration (e.g. Central Valley subsidence)."
        ),
    )

    event_database_json: OptionalPath = Field(
        default=None,
        alias="event_db_json",
        description=(
            "GeoJSON file containing list of events (earthquakes, volcanic activity)"
            " for each frame."
        ),
    )

    def has_algorithm_overrides(self) -> bool:
        """Check if algorithm parameter overrides are provided."""
        return self.algorithm_parameters_overrides_json is not None

    def has_deformation_database(self) -> bool:
        """Check if deformation area database is provided."""
        return self.deformation_area_database_json is not None

    def has_event_database(self) -> bool:
        """Check if event database is provided."""
        return self.event_database_json is not None

    def get_all_files(self) -> Dict[str, Path | list[Path]]:
        return self.get_all_file_paths(flatten_lists=True)

    model_config = STRICT_CONFIG_WITH_ALIASES
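
A short sketch of the has_* convenience checks; the path and import path are assumptions:

from cal_disp.config import StaticAncillaryFileGroup  # assumed import path

# event_db_json is the YAML alias for event_database_json
static = StaticAncillaryFileGroup(event_db_json="events_F08882.geojson")

if static.has_event_database():
    print(f"Using event database: {static.event_database_json}")
if not static.has_deformation_database():
    print("No deformation-area database provided; no areas will be excluded")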

has_algorithm_overrides

has_algorithm_overrides() -> bool

Check if algorithm parameter overrides are provided.

Source code in src/cal_disp/config/_base.py
def has_algorithm_overrides(self) -> bool:
    """Check if algorithm parameter overrides are provided."""
    return self.algorithm_parameters_overrides_json is not None

has_deformation_database

has_deformation_database() -> bool

Check if deformation area database is provided.

Source code in src/cal_disp/config/_base.py
def has_deformation_database(self) -> bool:
    """Check if deformation area database is provided."""
    return self.deformation_area_database_json is not None

has_event_database

has_event_database() -> bool

Check if event database is provided.

Source code in src/cal_disp/config/_base.py
def has_event_database(self) -> bool:
    """Check if event database is provided."""
    return self.event_database_json is not None

WorkerSettings

Bases: YamlModel

Settings for controlling CPU/threading and parallelism.

This class configures Dask distributed computing settings including worker count, threads per worker, and data block sizes for chunked processing.

Attributes:

n_workers : int
    Number of Dask workers to spawn. Default is 4.
threads_per_worker : int
    Number of threads per worker. Default is 2.
block_shape : Tuple[int, int]
    Block size (rows, columns) for chunked data loading.

Examples:

>>> # Default settings
>>> settings = WorkerSettings()
>>>
>>> # Custom configuration
>>> settings = WorkerSettings(
...     n_workers=8,
...     threads_per_worker=4,
...     block_shape=(256, 256)
... )
>>> print(f"Total threads: {settings.total_threads}")
Source code in src/cal_disp/config/_workers.py
class WorkerSettings(YamlModel):
    """Settings for controlling CPU/threading and parallelism.

    This class configures Dask distributed computing settings including
    worker count, threads per worker, and data block sizes for chunked
    processing.

    Attributes
    ----------
    n_workers : int
        Number of Dask workers to spawn. Default is 4.
    threads_per_worker : int
        Number of threads per worker. Default is 2.
    block_shape : Tuple[int, int]
        Block size (rows, columns) for chunked data loading.

    Examples
    --------
    >>> # Default settings
    >>> settings = WorkerSettings()
    >>>
    >>> # Custom configuration
    >>> settings = WorkerSettings(
    ...     n_workers=8,
    ...     threads_per_worker=4,
    ...     block_shape=(256, 256)
    ... )
    >>> print(f"Total threads: {settings.total_threads}")

    """

    n_workers: int = Field(
        default=4,
        ge=1,
        le=256,  # Reasonable upper limit
        description=(
            "Number of workers to use in dask.Client. Typically set to the number "
            "of physical CPU cores or less. Default: 4"
        ),
    )

    threads_per_worker: int = Field(
        default=2,
        ge=1,
        le=32,  # Reasonable upper limit
        description=(
            "Number of threads per worker in dask.Client. Controls parallelism "
            "within each worker. Default: 2"
        ),
    )

    block_shape: Tuple[int, int] = Field(
        default=(128, 128),
        description=(
            "Size (rows, columns) of blocks of data to load at a time. "
            "Larger blocks use more memory but may be more efficient. "
            "Must be positive integers. Default: (128, 128)"
        ),
    )

    @field_validator("block_shape", mode="before")
    @classmethod
    def _validate_block_shape(cls, v) -> Tuple[int, int]:
        """Validate block shape has positive dimensions.

        Parameters
        ----------
        v : tuple | list
            Block shape to validate.

        Returns
        -------
        Tuple[int, int]
            Validated block shape.

        Raises
        ------
        ValueError
            If block dimensions are not positive or not exactly 2 values.

        """
        # Convert to tuple if list
        if isinstance(v, list):
            v = tuple(v)

        # Check it's a tuple/list with 2 elements
        if not isinstance(v, tuple) or len(v) != 2:
            raise ValueError(
                f"block_shape must have exactly 2 dimensions (rows, cols), got {v}"
            )

        # Check both are integers
        if not all(isinstance(x, int) for x in v):
            raise ValueError(f"block_shape dimensions must be integers, got {v}")

        # Check both are positive
        if not all(x > 0 for x in v):
            raise ValueError(f"block_shape dimensions must be positive, got {v}")

        return v

    # CHANGED: Use @property instead of @computed_field to avoid mypy issues
    @property
    def total_threads(self) -> int:
        """Total number of threads across all workers.

        Returns
        -------
        int
            n_workers * threads_per_worker

        """
        return self.n_workers * self.threads_per_worker

    @property
    def block_size(self) -> int:
        """Total number of elements per block.

        Returns
        -------
        int
            Product of block_shape dimensions (rows * columns).

        """
        return self.block_shape[0] * self.block_shape[1]

    def estimate_memory_per_block(self, dtype_size: int = 8, n_bands: int = 1) -> float:
        """Estimate memory usage per block in MB.

        Parameters
        ----------
        dtype_size : int, default=8
            Size of data type in bytes (e.g., 8 for float64, 4 for float32).
        n_bands : int, default=1
            Number of bands/layers in the data.

        Returns
        -------
        float
            Estimated memory in megabytes.

        Examples
        --------
        >>> settings = WorkerSettings(block_shape=(256, 256))
        >>> mem_mb = settings.estimate_memory_per_block(dtype_size=8, n_bands=2)
        >>> print(f"Estimated memory: {mem_mb:.2f} MB")

        """
        bytes_per_block = self.block_size * dtype_size * n_bands
        return bytes_per_block / (1024 * 1024)  # Convert to MB

    def estimate_total_memory(
        self, dtype_size: int = 8, n_bands: int = 1, overhead_factor: float = 1.5
    ) -> float:
        """Estimate total memory usage across all workers in GB.

        Parameters
        ----------
        dtype_size : int, default=8
            Size of data type in bytes.
        n_bands : int, default=1
            Number of bands/layers in the data.
        overhead_factor : float, default=1.5
            Multiplier for overhead (copies, intermediate results).

        Returns
        -------
        float
            Estimated total memory in gigabytes.

        """
        mb_per_block = self.estimate_memory_per_block(dtype_size, n_bands)
        # Assume each worker might hold multiple blocks
        total_mb = mb_per_block * self.n_workers * overhead_factor
        return total_mb / 1024  # Convert to GB

    def summary(self) -> str:
        """Generate a human-readable summary of settings.

        Returns
        -------
        str
            Multi-line summary string.

        """
        lines = [
            "WorkerSettings Configuration:",
            "=" * 50,
            f"Workers:              {self.n_workers}",
            f"Threads per worker:   {self.threads_per_worker}",
            f"Total threads:        {self.total_threads}",
            f"Block shape:          {self.block_shape[0]} x {self.block_shape[1]}",
            f"Block size:           {self.block_size:,} elements",
            f"Estimated mem/block:  {self.estimate_memory_per_block():.2f} MB",
            f"Estimated total mem:  {self.estimate_total_memory():.2f} GB",
            "=" * 50,
        ]
        return "\n".join(lines)

    @classmethod
    def create_lightweight(cls) -> "WorkerSettings":
        """Create lightweight settings for small datasets or testing.

        Returns
        -------
        WorkerSettings
            Configuration with 2 workers, 1 thread each, small blocks.

        """
        return cls(n_workers=2, threads_per_worker=1, block_shape=(64, 64))

    @classmethod
    def create_standard(cls) -> "WorkerSettings":
        """Create standard settings for typical workloads.

        Returns
        -------
        WorkerSettings
            Configuration with 4 workers, 2 threads each, medium blocks.

        """
        return cls(n_workers=4, threads_per_worker=2, block_shape=(128, 128))

    @classmethod
    def create_heavy(cls) -> "WorkerSettings":
        """Create heavy-duty settings for large datasets.

        Returns
        -------
        WorkerSettings
            Configuration with 8 workers, 4 threads each, large blocks.

        """
        return cls(n_workers=8, threads_per_worker=4, block_shape=(256, 256))

    @classmethod
    def create_from_cpu_count(
        cls,
        use_fraction: float = 0.75,
        threads_per_worker: int = 2,
        block_shape: Tuple[int, int] = (128, 128),
    ) -> "WorkerSettings":
        """Create settings based on available CPU count.

        Parameters
        ----------
        use_fraction : float, default=0.75
            Fraction of available CPUs to use (0.0 to 1.0).
        threads_per_worker : int, default=2
            Threads per worker.
        block_shape : Tuple[int, int], default=(128, 128)
            Block shape for data loading.

        Returns
        -------
        WorkerSettings
            Configuration tuned to system CPU count.

        Examples
        --------
        >>> # Use 75% of available CPUs
        >>> settings = WorkerSettings.create_from_cpu_count(use_fraction=0.75)

        """
        cpu_count = multiprocessing.cpu_count()
        n_workers = max(1, int(cpu_count * use_fraction / threads_per_worker))

        return cls(
            n_workers=n_workers,
            threads_per_worker=threads_per_worker,
            block_shape=block_shape,
        )

    def validate_against_system(self) -> Dict[str, Any]:
        """Validate settings against system resources.

        Returns
        -------
        dict
            Dictionary with validation results and warnings.

        Examples
        --------
        >>> settings = WorkerSettings(n_workers=16, threads_per_worker=8)
        >>> validation = settings.validate_against_system()
        >>> if validation['warnings']:
        ...     for warning in validation['warnings']:
        ...         print(f"Warning: {warning}")

        """
        cpu_count = multiprocessing.cpu_count()
        warnings = []

        # Check if total threads exceed CPU count
        if self.total_threads > cpu_count:
            warnings.append(
                f"Total threads ({self.total_threads}) exceeds available "
                f"CPU cores ({cpu_count}). May cause oversubscription."
            )

        # Check if block size is very large
        if self.block_size > 1_000_000:
            warnings.append(
                f"Block size ({self.block_size:,}) is very large. "
                "May cause high memory usage."
            )

        # Check if block size is very small
        if self.block_size < 1_000:
            warnings.append(
                f"Block size ({self.block_size:,}) is very small. "
                "May cause poor performance due to overhead."
            )

        return {
            "valid": len(warnings) == 0,
            "warnings": warnings,
            "system_cpu_count": cpu_count,
            "configured_total_threads": self.total_threads,
            "cpu_utilization": self.total_threads / cpu_count if cpu_count > 0 else 0,
        }

    model_config = ConfigDict(
        extra="forbid",
        validate_assignment=True,
        json_schema_extra={
            "examples": [
                {"n_workers": 4, "threads_per_worker": 2, "block_shape": [128, 128]},
                {"n_workers": 8, "threads_per_worker": 4, "block_shape": [256, 256]},
            ]
        },
    )

block_size property

block_size: int

Total number of elements per block.

Returns:

int
    Product of block_shape dimensions (rows * columns).

total_threads property

total_threads: int

Total number of threads across all workers.

Returns:

int
    n_workers * threads_per_worker

create_from_cpu_count classmethod

create_from_cpu_count(use_fraction: float = 0.75, threads_per_worker: int = 2, block_shape: Tuple[int, int] = (128, 128)) -> WorkerSettings

Create settings based on available CPU count.

Parameters:

use_fraction : float, optional
    Fraction of available CPUs to use (0.0 to 1.0). Default is 0.75.
threads_per_worker : int, optional
    Threads per worker. Default is 2.
block_shape : Tuple[int, int], optional
    Block shape for data loading. Default is (128, 128).

Returns:

WorkerSettings
    Configuration tuned to system CPU count.

Examples:

>>> # Use 75% of available CPUs
>>> settings = WorkerSettings.create_from_cpu_count(use_fraction=0.75)
Source code in src/cal_disp/config/_workers.py
@classmethod
def create_from_cpu_count(
    cls,
    use_fraction: float = 0.75,
    threads_per_worker: int = 2,
    block_shape: Tuple[int, int] = (128, 128),
) -> "WorkerSettings":
    """Create settings based on available CPU count.

    Parameters
    ----------
    use_fraction : float, default=0.75
        Fraction of available CPUs to use (0.0 to 1.0).
    threads_per_worker : int, default=2
        Threads per worker.
    block_shape : Tuple[int, int], default=(128, 128)
        Block shape for data loading.

    Returns
    -------
    WorkerSettings
        Configuration tuned to system CPU count.

    Examples
    --------
    >>> # Use 75% of available CPUs
    >>> settings = WorkerSettings.create_from_cpu_count(use_fraction=0.75)

    """
    cpu_count = multiprocessing.cpu_count()
    n_workers = max(1, int(cpu_count * use_fraction / threads_per_worker))

    return cls(
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        block_shape=block_shape,
    )

create_heavy classmethod

create_heavy() -> WorkerSettings

Create heavy-duty settings for large datasets.

Returns:

WorkerSettings
    Configuration with 8 workers, 4 threads each, large blocks.

Source code in src/cal_disp/config/_workers.py
@classmethod
def create_heavy(cls) -> "WorkerSettings":
    """Create heavy-duty settings for large datasets.

    Returns
    -------
    WorkerSettings
        Configuration with 8 workers, 4 threads each, large blocks.

    """
    return cls(n_workers=8, threads_per_worker=4, block_shape=(256, 256))

create_lightweight classmethod

create_lightweight() -> WorkerSettings

Create lightweight settings for small datasets or testing.

Returns:

WorkerSettings
    Configuration with 2 workers, 1 thread each, small blocks.

Source code in src/cal_disp/config/_workers.py
@classmethod
def create_lightweight(cls) -> "WorkerSettings":
    """Create lightweight settings for small datasets or testing.

    Returns
    -------
    WorkerSettings
        Configuration with 2 workers, 1 thread each, small blocks.

    """
    return cls(n_workers=2, threads_per_worker=1, block_shape=(64, 64))

create_standard classmethod

create_standard() -> WorkerSettings

Create standard settings for typical workloads.

Returns:

WorkerSettings
    Configuration with 4 workers, 2 threads each, medium blocks.

Source code in src/cal_disp/config/_workers.py
@classmethod
def create_standard(cls) -> "WorkerSettings":
    """Create standard settings for typical workloads.

    Returns
    -------
    WorkerSettings
        Configuration with 4 workers, 2 threads each, medium blocks.

    """
    return cls(n_workers=4, threads_per_worker=2, block_shape=(128, 128))

estimate_memory_per_block

estimate_memory_per_block(dtype_size: int = 8, n_bands: int = 1) -> float

Estimate memory usage per block in MB.

Parameters:

dtype_size : int, optional
    Size of data type in bytes (e.g., 8 for float64, 4 for float32). Default is 8.
n_bands : int, optional
    Number of bands/layers in the data. Default is 1.

Returns:

float
    Estimated memory in megabytes.

Examples:

>>> settings = WorkerSettings(block_shape=(256, 256))
>>> mem_mb = settings.estimate_memory_per_block(dtype_size=8, n_bands=2)
>>> print(f"Estimated memory: {mem_mb:.2f} MB")
Source code in src/cal_disp/config/_workers.py
def estimate_memory_per_block(self, dtype_size: int = 8, n_bands: int = 1) -> float:
    """Estimate memory usage per block in MB.

    Parameters
    ----------
    dtype_size : int, default=8
        Size of data type in bytes (e.g., 8 for float64, 4 for float32).
    n_bands : int, default=1
        Number of bands/layers in the data.

    Returns
    -------
    float
        Estimated memory in megabytes.

    Examples
    --------
    >>> settings = WorkerSettings(block_shape=(256, 256))
    >>> mem_mb = settings.estimate_memory_per_block(dtype_size=8, n_bands=2)
    >>> print(f"Estimated memory: {mem_mb:.2f} MB")

    """
    bytes_per_block = self.block_size * dtype_size * n_bands
    return bytes_per_block / (1024 * 1024)  # Convert to MB

estimate_total_memory

estimate_total_memory(dtype_size: int = 8, n_bands: int = 1, overhead_factor: float = 1.5) -> float

Estimate total memory usage across all workers in GB.

Parameters:

dtype_size : int, optional
    Size of data type in bytes. Default is 8.
n_bands : int, optional
    Number of bands/layers in the data. Default is 1.
overhead_factor : float, optional
    Multiplier for overhead (copies, intermediate results). Default is 1.5.

Returns:

float
    Estimated total memory in gigabytes.

Source code in src/cal_disp/config/_workers.py
def estimate_total_memory(
    self, dtype_size: int = 8, n_bands: int = 1, overhead_factor: float = 1.5
) -> float:
    """Estimate total memory usage across all workers in GB.

    Parameters
    ----------
    dtype_size : int, default=8
        Size of data type in bytes.
    n_bands : int, default=1
        Number of bands/layers in the data.
    overhead_factor : float, default=1.5
        Multiplier for overhead (copies, intermediate results).

    Returns
    -------
    float
        Estimated total memory in gigabytes.

    """
    mb_per_block = self.estimate_memory_per_block(dtype_size, n_bands)
    # Assume each worker might hold multiple blocks
    total_mb = mb_per_block * self.n_workers * overhead_factor
    return total_mb / 1024  # Convert to GB
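
A quick check of the arithmetic with the default settings (4 workers, 128 x 128 blocks of float64); the import path is assumed:

from cal_disp.config import WorkerSettings  # assumed import path

settings = WorkerSettings()                       # n_workers=4, block_shape=(128, 128)
per_block = settings.estimate_memory_per_block()  # 16_384 elements * 8 B = 0.125 MB
total = settings.estimate_total_memory()          # 0.125 MB * 4 workers * 1.5 / 1024 ~= 0.0007 GB
print(per_block, total)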

summary

summary() -> str

Generate a human-readable summary of settings.

Returns:

str
    Multi-line summary string.

Source code in src/cal_disp/config/_workers.py
def summary(self) -> str:
    """Generate a human-readable summary of settings.

    Returns
    -------
    str
        Multi-line summary string.

    """
    lines = [
        "WorkerSettings Configuration:",
        "=" * 50,
        f"Workers:              {self.n_workers}",
        f"Threads per worker:   {self.threads_per_worker}",
        f"Total threads:        {self.total_threads}",
        f"Block shape:          {self.block_shape[0]} x {self.block_shape[1]}",
        f"Block size:           {self.block_size:,} elements",
        f"Estimated mem/block:  {self.estimate_memory_per_block():.2f} MB",
        f"Estimated total mem:  {self.estimate_total_memory():.2f} GB",
        "=" * 50,
    ]
    return "\n".join(lines)

validate_against_system

validate_against_system() -> Dict[str, Any]

Validate settings against system resources.

Returns:

dict
    Dictionary with validation results and warnings.

Examples:

>>> settings = WorkerSettings(n_workers=16, threads_per_worker=8)
>>> validation = settings.validate_against_system()
>>> if validation['warnings']:
...     for warning in validation['warnings']:
...         print(f"Warning: {warning}")
Source code in src/cal_disp/config/_workers.py
def validate_against_system(self) -> Dict[str, Any]:
    """Validate settings against system resources.

    Returns
    -------
    dict
        Dictionary with validation results and warnings.

    Examples
    --------
    >>> settings = WorkerSettings(n_workers=16, threads_per_worker=8)
    >>> validation = settings.validate_against_system()
    >>> if validation['warnings']:
    ...     for warning in validation['warnings']:
    ...         print(f"Warning: {warning}")

    """
    cpu_count = multiprocessing.cpu_count()
    warnings = []

    # Check if total threads exceed CPU count
    if self.total_threads > cpu_count:
        warnings.append(
            f"Total threads ({self.total_threads}) exceeds available "
            f"CPU cores ({cpu_count}). May cause oversubscription."
        )

    # Check if block size is very large
    if self.block_size > 1_000_000:
        warnings.append(
            f"Block size ({self.block_size:,}) is very large. "
            "May cause high memory usage."
        )

    # Check if block size is very small
    if self.block_size < 1_000:
        warnings.append(
            f"Block size ({self.block_size:,}) is very small. "
            "May cause poor performance due to overhead."
        )

    return {
        "valid": len(warnings) == 0,
        "warnings": warnings,
        "system_cpu_count": cpu_count,
        "configured_total_threads": self.total_threads,
        "cpu_utilization": self.total_threads / cpu_count if cpu_count > 0 else 0,
    }

YamlModel

Bases: BaseModel

Pydantic model that can be exported to yaml.

Source code in src/cal_disp/config/_yaml.py
class YamlModel(BaseModel):
    """Pydantic model that can be exported to yaml."""

    model_config = STRICT_CONFIG

    def to_yaml(
        self,
        output_path: Union[Filename, TextIO],
        with_comments: bool = True,
        by_alias: bool = True,
        indent_per_level: int = 2,
    ):
        """Save configuration as a yaml file.

        Used to record the default-filled version of a supplied yaml.

        Parameters
        ----------
        output_path : Pathlike
            Path to the yaml file to save.
        with_comments : bool, default = True
            Whether to add comments containing the type/descriptions to all fields.
        by_alias : bool, default = True
            Whether to use the alias names for the fields.
            Passed to pydantic's ``model_dump_json`` method.
            https://docs.pydantic.dev/usage/exporting_models/#modeljson
        indent_per_level : int, default = 2
            Number of spaces to indent per level.

        """
        yaml_obj = self._to_yaml_obj(by_alias=by_alias)

        if with_comments:
            _add_comments(
                yaml_obj,
                self.model_json_schema(by_alias=by_alias),
                indent_per_level=indent_per_level,
            )

        y = YAML()
        # https://yaml.readthedocs.io/en/latest/detail.html#indentation-of-block-sequences
        y.indent(
            offset=indent_per_level,
            mapping=indent_per_level,
            # It is best to always have sequence >= offset + 2; this is not enforced,
            # but not following this advice might lead to invalid output.
            sequence=indent_per_level + 2,
        )
        if hasattr(output_path, "write"):
            y.dump(yaml_obj, output_path)
        else:
            with open(output_path, "w") as f:
                y.dump(yaml_obj, f)

    @classmethod
    def from_yaml(cls, yaml_path: Filename):
        """Load a configuration from a yaml file.

        Parameters
        ----------
        yaml_path : Pathlike
            Path to the yaml file to load.

        Returns
        -------
        Config
            Workflow configuration

        """
        y = YAML(typ="safe")
        with open(yaml_path) as f:
            data = y.load(f)

        return cls(**data)

    @classmethod
    def print_yaml_schema(
        cls,
        output_path: Union[Filename, TextIO] = sys.stdout,
        indent_per_level: int = 2,
    ):
        """Print/save an empty configuration with defaults filled in.

        Ignores the required `input_file_list` input, so a user can
        inspect all fields.

        Parameters
        ----------
        output_path : Pathlike
            Path or stream to save the yaml file to.
            By default, prints to stdout.
        indent_per_level : int, default = 2
            Number of spaces to indent per level.

        """
        cls.model_construct().to_yaml(
            output_path, with_comments=True, indent_per_level=indent_per_level
        )

    def _to_yaml_obj(self, by_alias: bool = True) -> CommentedMap:
        # Make the YAML object to add comments to
        # We can't just do `dumps` for some reason, need a stream
        y = YAML()
        ss = StringIO()
        y.dump(json.loads(self.model_dump_json(by_alias=by_alias)), ss)
        return y.load(ss.getvalue())

    def get_all_file_paths(
        self, include_none: bool = False, flatten_lists: bool = True
    ) -> Dict[str, Union[Path, List[Path]]]:
        """Get all Path fields from the model.

        Parameters
        ----------
        include_none : bool, default=False
            Include fields with None values.
        flatten_lists : bool, default=True
            Flatten list fields to individual entries with indices.

        Returns
        -------
        dict
            Mapping of field names to Path objects.

        """
        files: Dict[str, Path | list[Path]] = {}

        for field_name, field_info in self.model_fields.items():
            value = getattr(self, field_name)

            # Skip None values if requested
            if value is None and not include_none:
                continue

            # Check if field is Path or Optional[Path]
            if self._is_path_field(field_info):
                if value is not None:
                    files[field_name] = value

            # Check if field is List[Path]
            elif self._is_path_list_field(field_info):
                if value:
                    if flatten_lists:
                        for i, path in enumerate(value):
                            files[f"{field_name}[{i}]"] = path
                    else:
                        files[field_name] = value

        return files

    @staticmethod
    def _is_path_field(field_info) -> bool:
        """Check if field is a Path type."""
        from pathlib import Path
        from typing import Union, get_args, get_origin

        annotation = field_info.annotation

        # Handle Annotated[Path, ...]
        if get_origin(annotation) is not None:
            # Check if it's Annotated
            if hasattr(annotation, "__metadata__"):
                # Get the actual type from Annotated
                args = get_args(annotation)
                if args:
                    annotation = args[0]

        # Direct Path type
        if annotation is Path:
            return True

        # Optional[Path] or Union[Path, None]
        origin = get_origin(annotation)
        if origin is Union:
            args = get_args(annotation)
            return Path in args or any(arg is Path for arg in args)

        return False

    @staticmethod
    def _is_path_list_field(field_info) -> bool:
        """Check if field is a List[Path] type."""
        from pathlib import Path
        from typing import Union, get_args, get_origin

        annotation = field_info.annotation
        origin = get_origin(annotation)

        # Check if it's Optional[List[Path]]
        if origin is Union:
            args = get_args(annotation)
            for arg in args:
                if arg is type(None):
                    continue
                if get_origin(arg) in (list, List):
                    list_args = get_args(arg)
                    if list_args:
                        first_arg = list_args[0]
                        if first_arg is Path:
                            return True

        # Check if it's List[Path]
        if origin in (list, List):
            args = get_args(annotation)
            if not args:
                return False
            first_arg = args[0]
            is_path_type: bool = first_arg is Path
            return is_path_type

        return False

    def validate_files_exist(
        self, raise_on_missing: bool = False
    ) -> Dict[str, Dict[str, Any]]:
        """Validate all file paths exist on disk.

        Parameters
        ----------
        raise_on_missing : bool, default=False
            If True, raise FileNotFoundError on first missing file.

        Returns
        -------
        dict
            Detailed status for each file including existence, size, etc.

        Raises
        ------
        FileNotFoundError
            If raise_on_missing=True and any file is missing.

        """
        results: Dict[str, Dict[str, Any]] = {}

        # flatten_lists=True guarantees Dict[str, Path]
        file_paths: Dict[str, Path] = cast(
            Dict[str, Path], self.get_all_file_paths(flatten_lists=True)
        )

        for field_name, path in file_paths.items():
            exists = path.exists()

            if not exists and raise_on_missing:
                raise FileNotFoundError(
                    f"Required file not found: {field_name} = {path}"
                )

            results[field_name] = {
                "exists": exists,
                "is_file": path.is_file() if exists else None,
                "is_dir": path.is_dir() if exists else None,
                "size_bytes": path.stat().st_size if exists else None,
                "absolute_path": str(path.absolute()),
            }

        return results

    def validate_ready_to_run(self) -> ValidationResult:
        """Check if configuration is ready to run."""
        return ValidationResult(ready=True, errors=[], warnings=[])

    def get_missing_files(self) -> List[str]:
        """Get list of missing file paths."""
        return [
            f"{name}: {info['absolute_path']}"
            for name, info in self.validate_files_exist().items()
            if not info["exists"]
        ]

    def all_files_exist(self) -> bool:
        """Check if all files exist."""
        return len(self.get_missing_files()) == 0

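As a quick illustration of the round trip (DemoConfig is a hypothetical subclass; YamlModel is defined in src/cal_disp/config/_yaml.py and may be re-exported elsewhere in the package):

from pathlib import Path
from typing import Optional

from pydantic import Field

from cal_disp.config._yaml import YamlModel


class DemoConfig(YamlModel):
    dem_file: Optional[Path] = Field(default=None, description="Input DEM raster.")
    smoothing: float = Field(default=0.5, description="Smoothing strength.")


cfg = DemoConfig(dem_file=Path("dem.tif"))
cfg.to_yaml("demo_config.yaml")                  # defaults filled in, comments added
print(DemoConfig.from_yaml("demo_config.yaml"))  # reconstructed DemoConfig
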
all_files_exist

all_files_exist() -> bool

Check if all files exist.

Source code in src/cal_disp/config/_yaml.py
def all_files_exist(self) -> bool:
    """Check if all files exist."""
    return len(self.get_missing_files()) == 0

from_yaml classmethod

from_yaml(yaml_path: Filename)

Load a configuration from a yaml file.

Parameters:

    yaml_path (Pathlike, required)
        Path to the yaml file to load.

Returns:

    Config
        Workflow configuration.

Source code in src/cal_disp/config/_yaml.py
@classmethod
def from_yaml(cls, yaml_path: Filename):
    """Load a configuration from a yaml file.

    Parameters
    ----------
    yaml_path : Pathlike
        Path to the yaml file to load.

    Returns
    -------
    Config
        Workflow configuration

    """
    y = YAML(typ="safe")
    with open(yaml_path) as f:
        data = y.load(f)

    return cls(**data)

get_all_file_paths

get_all_file_paths(include_none: bool = False, flatten_lists: bool = True) -> Dict[str, Union[Path, List[Path]]]

Get all Path fields from the model.

Parameters:

    include_none (bool, default=False)
        Include fields with None values.
    flatten_lists (bool, default=True)
        Flatten list fields to individual entries with indices.

Returns:

    dict
        Mapping of field names to Path objects.

Source code in src/cal_disp/config/_yaml.py
def get_all_file_paths(
    self, include_none: bool = False, flatten_lists: bool = True
) -> Dict[str, Union[Path, List[Path]]]:
    """Get all Path fields from the model.

    Parameters
    ----------
    include_none : bool, default=False
        Include fields with None values.
    flatten_lists : bool, default=True
        Flatten list fields to individual entries with indices.

    Returns
    -------
    dict
        Mapping of field names to Path objects.

    """
    files: Dict[str, Path | list[Path]] = {}

    for field_name, field_info in self.model_fields.items():
        value = getattr(self, field_name)

        # Skip None values if requested
        if value is None and not include_none:
            continue

        # Check if field is Path or Optional[Path]
        if self._is_path_field(field_info):
            if value is not None:
                files[field_name] = value

        # Check if field is List[Path]
        elif self._is_path_list_field(field_info):
            if value:
                if flatten_lists:
                    for i, path in enumerate(value):
                        files[f"{field_name}[{i}]"] = path
                else:
                    files[field_name] = value

    return files
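
A sketch of the flattening behaviour (FilesConfig is hypothetical; the import path mirrors src/cal_disp/config/_yaml.py):

from pathlib import Path
from typing import List, Optional

from cal_disp.config._yaml import YamlModel


class FilesConfig(YamlModel):
    dem_file: Optional[Path] = None
    mask_files: List[Path] = []


cfg = FilesConfig(dem_file=Path("dem.tif"), mask_files=[Path("a.tif"), Path("b.tif")])

# List fields are flattened to indexed keys: mask_files[0], mask_files[1]
for name, path in cfg.get_all_file_paths().items():
    print(f"{name:15s} -> {path}")

print(cfg.get_missing_files())   # entries whose paths do not exist on disk
print(cfg.all_files_exist())     # False until those files are created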

get_missing_files

get_missing_files() -> List[str]

Get list of missing file paths.

Source code in src/cal_disp/config/_yaml.py
def get_missing_files(self) -> List[str]:
    """Get list of missing file paths."""
    return [
        f"{name}: {info['absolute_path']}"
        for name, info in self.validate_files_exist().items()
        if not info["exists"]
    ]

print_yaml_schema classmethod

print_yaml_schema(output_path: Union[Filename, TextIO] = sys.stdout, indent_per_level: int = 2)

Print/save an empty configuration with defaults filled in.

Ignores the required input_file_list input, so a user can inspect all fields.

Parameters:

    output_path (Pathlike, default=sys.stdout)
        Path or stream to save the yaml file to. By default, prints to stdout.
    indent_per_level (int, default=2)
        Number of spaces to indent per level.
Source code in src/cal_disp/config/_yaml.py
@classmethod
def print_yaml_schema(
    cls,
    output_path: Union[Filename, TextIO] = sys.stdout,
    indent_per_level: int = 2,
):
    """Print/save an empty configuration with defaults filled in.

    Ignores the required `input_file_list` input, so a user can
    inspect all fields.

    Parameters
    ----------
    output_path : Pathlike
        Path or stream to save the yaml file to.
        By default, prints to stdout.
    indent_per_level : int, default = 2
        Number of spaces to indent per level.

    """
    cls.model_construct().to_yaml(
        output_path, with_comments=True, indent_per_level=indent_per_level
    )
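
For instance (illustrative; OutputOptions is one of the YamlModel subclasses documented below, defined in src/cal_disp/config/pge_runconfig.py):

from cal_disp.config.pge_runconfig import OutputOptions

OutputOptions.print_yaml_schema()               # print a commented template to stdout
with open("output_options.yaml", "w") as f:     # or write it to a file
    OutputOptions.print_yaml_schema(f)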

to_yaml

to_yaml(output_path: Union[Filename, TextIO], with_comments: bool = True, by_alias: bool = True, indent_per_level: int = 2)

Save configuration as a yaml file.

Used to record the default-filled version of a supplied yaml.

Parameters:

    output_path (Pathlike, required)
        Path to the yaml file to save.
    with_comments (bool, default=True)
        Whether to add comments containing the type/descriptions to all fields.
    by_alias (bool, default=True)
        Whether to use the alias names for the fields. Passed to pydantic's
        model_dump_json method.
        https://docs.pydantic.dev/usage/exporting_models/#modeljson
    indent_per_level (int, default=2)
        Number of spaces to indent per level.
Source code in src/cal_disp/config/_yaml.py
def to_yaml(
    self,
    output_path: Union[Filename, TextIO],
    with_comments: bool = True,
    by_alias: bool = True,
    indent_per_level: int = 2,
):
    """Save configuration as a yaml file.

    Used to record the default-filled version of a supplied yaml.

    Parameters
    ----------
    output_path : Pathlike
        Path to the yaml file to save.
    with_comments : bool, default = True
        Whether to add comments containing the type/descriptions to all fields.
    by_alias : bool, default = True
        Whether to use the alias names for the fields.
        Passed to pydantic's ``model_dump_json`` method.
        https://docs.pydantic.dev/usage/exporting_models/#modeljson
    indent_per_level : int, default = 2
        Number of spaces to indent per level.

    """
    yaml_obj = self._to_yaml_obj(by_alias=by_alias)

    if with_comments:
        _add_comments(
            yaml_obj,
            self.model_json_schema(by_alias=by_alias),
            indent_per_level=indent_per_level,
        )

    y = YAML()
    # https://yaml.readthedocs.io/en/latest/detail.html#indentation-of-block-sequences
    y.indent(
        offset=indent_per_level,
        mapping=indent_per_level,
        # It is best to always have sequence >= offset + 2; this is not enforced,
        # but not following this advice might lead to invalid output.
        sequence=indent_per_level + 2,
    )
    if hasattr(output_path, "write"):
        y.dump(yaml_obj, output_path)
    else:
        with open(output_path, "w") as f:
            y.dump(yaml_obj, f)

validate_files_exist

validate_files_exist(raise_on_missing: bool = False) -> Dict[str, Dict[str, Any]]

Validate all file paths exist on disk.

Parameters:

    raise_on_missing (bool, default=False)
        If True, raise FileNotFoundError on first missing file.

Returns:

    dict
        Detailed status for each file including existence, size, etc.

Raises:

    FileNotFoundError
        If raise_on_missing=True and any file is missing.

Source code in src/cal_disp/config/_yaml.py
def validate_files_exist(
    self, raise_on_missing: bool = False
) -> Dict[str, Dict[str, Any]]:
    """Validate all file paths exist on disk.

    Parameters
    ----------
    raise_on_missing : bool, default=False
        If True, raise FileNotFoundError on first missing file.

    Returns
    -------
    dict
        Detailed status for each file including existence, size, etc.

    Raises
    ------
    FileNotFoundError
        If raise_on_missing=True and any file is missing.

    """
    results: Dict[str, Dict[str, Any]] = {}

    # flatten_lists=True guarantees Dict[str, Path]
    file_paths: Dict[str, Path] = cast(
        Dict[str, Path], self.get_all_file_paths(flatten_lists=True)
    )

    for field_name, path in file_paths.items():
        exists = path.exists()

        if not exists and raise_on_missing:
            raise FileNotFoundError(
                f"Required file not found: {field_name} = {path}"
            )

        results[field_name] = {
            "exists": exists,
            "is_file": path.is_file() if exists else None,
            "is_dir": path.is_dir() if exists else None,
            "size_bytes": path.stat().st_size if exists else None,
            "absolute_path": str(path.absolute()),
        }

    return results

validate_ready_to_run

validate_ready_to_run() -> ValidationResult

Check if configuration is ready to run.

Source code in src/cal_disp/config/_yaml.py
def validate_ready_to_run(self) -> ValidationResult:
    """Check if configuration is ready to run."""
    return ValidationResult(ready=True, errors=[], warnings=[])

pge_runconfig

OutputOptions

Bases: YamlModel

Output configuration options.

Attributes:

    product_version (str)
        Version of the product in <major>.<minor> format.
    output_format (str)
        Format for output files (e.g., 'netcdf', 'hdf5').
    compression (bool)
        Whether to compress output files.

Source code in src/cal_disp/config/pge_runconfig.py
class OutputOptions(YamlModel):
    """Output configuration options.

    Attributes
    ----------
    product_version : str
        Version of the product in <major>.<minor> format.
    output_format : str
        Format for output files (e.g., 'netcdf', 'hdf5').
    compression : bool
        Whether to compress output files.

    """

    product_version: str = Field(
        default="1.0",
        description="Version of the product, in <major>.<minor> format.",
    )

    output_format: str = Field(
        default="netcdf",
        description="Output file format.",
    )

    compression: bool = Field(
        default=True,
        description="Whether to compress output files.",
    )

    model_config = ConfigDict(extra="forbid")
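
A minimal construction sketch (illustrative; the import path mirrors the source layout):

from cal_disp.config.pge_runconfig import OutputOptions

opts = OutputOptions(product_version="1.1", compression=False)
print(opts.output_format)   # "netcdf" (the default); unknown keys are rejected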

PrimaryExecutable

Bases: YamlModel

Group describing the primary executable.

Attributes:

    product_type (str)
        Product type identifier for the PGE.

Source code in src/cal_disp/config/pge_runconfig.py
class PrimaryExecutable(YamlModel):
    """Group describing the primary executable.

    Attributes
    ----------
    product_type : str
        Product type identifier for the PGE.

    """

    product_type: str = Field(
        default="CAL_DISP",
        description="Product type of the PGE.",
    )

    model_config = ConfigDict(extra="forbid")

ProductPathGroup

Bases: YamlModel

Group describing the product paths.

Attributes:

    product_path (Path)
        Directory where PGE will place results.
    scratch_path (Path)
        Path to the scratch directory for intermediate files.
    output_path (Path)
        Path to the SAS output directory.

Source code in src/cal_disp/config/pge_runconfig.py
class ProductPathGroup(YamlModel):
    """Group describing the product paths.

    Attributes
    ----------
    product_path : Path
        Directory where PGE will place results.
    scratch_path : Path
        Path to the scratch directory for intermediate files.
    output_path : Path
        Path to the SAS output directory.

    """

    product_path: DirectoryPath = Field(
        default=Path(),
        description="Directory where PGE will place results.",
    )

    scratch_path: DirectoryPath = Field(
        default=Path("./scratch"),
        description="Path to the scratch directory for intermediate files.",
    )

    output_path: DirectoryPath = Field(
        default=Path("./output"),
        description="Path to the SAS output directory.",
        alias="sas_output_path",
    )

    model_config = ConfigDict(extra="forbid", populate_by_name=True)
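
Note that the output path is also accepted under its YAML alias (a sketch; the "." placeholders satisfy the DirectoryPath existence check):

from cal_disp.config.pge_runconfig import ProductPathGroup

paths = ProductPathGroup.model_validate(
    {"product_path": ".", "scratch_path": ".", "sas_output_path": "."}
)
print(paths.output_path)   # the alias maps onto output_path; the field name works too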

RunConfig

Bases: YamlModel

A PGE (Product Generation Executive) run configuration.

This class represents the top-level configuration for running the calibration workflow as a PGE. It includes input files, output options, paths, and worker settings.

Attributes:

    input_file_group (InputFileGroup)
        Configuration for input files.
    dynamic_ancillary_file_group (Optional[DynamicAncillaryFileGroup])
        Dynamic ancillary files configuration.
    static_ancillary_file_group (Optional[StaticAncillaryFileGroup])
        Static ancillary files configuration.
    output_options (OutputOptions)
        Output configuration options.
    primary_executable (PrimaryExecutable)
        Primary executable configuration.
    product_path_group (ProductPathGroup)
        Product path configuration.
    worker_settings (WorkerSettings)
        Dask worker and parallelism configuration.
    log_file (Optional[Path])
        Path to the output log file.

Source code in src/cal_disp/config/pge_runconfig.py
class RunConfig(YamlModel):
    """A PGE (Product Generation Executive) run configuration.

    This class represents the top-level configuration for running the
    calibration workflow as a PGE. It includes input files, output options,
    paths, and worker settings.

    Attributes
    ----------
    input_file_group : InputFileGroup
        Configuration for input files.
    dynamic_ancillary_file_group : Optional[DynamicAncillaryFileGroup]
        Dynamic ancillary files configuration.
    static_ancillary_file_group : Optional[StaticAncillaryFileGroup]
        Static ancillary files configuration.
    output_options : OutputOptions
        Output configuration options.
    primary_executable : PrimaryExecutable
        Primary executable configuration.
    product_path_group : ProductPathGroup
        Product path configuration.
    worker_settings : WorkerSettings
        Dask worker and parallelism configuration.
    log_file : Optional[Path]
        Path to the output log file.

    """

    # Used for the top-level key in YAML
    name: ClassVar[str] = "cal_disp_workflow"

    # Input configuration
    input_file_group: InputFileGroup = Field(
        ...,
        description="Configuration for required input files.",
    )

    dynamic_ancillary_file_group: Optional[DynamicAncillaryFileGroup] = Field(
        default=None,
        description="Dynamic ancillary files configuration.",
    )

    static_ancillary_file_group: Optional[StaticAncillaryFileGroup] = Field(
        default=None,
        description="Static ancillary files configuration.",
    )

    # Output and execution configuration
    output_options: OutputOptions = Field(
        default_factory=OutputOptions,
        description="Output configuration options.",
    )

    primary_executable: PrimaryExecutable = Field(
        default_factory=PrimaryExecutable,
        description="Primary executable configuration.",
    )

    product_path_group: ProductPathGroup = Field(
        default_factory=ProductPathGroup,
        description="Product path configuration.",
    )

    # Worker settings
    worker_settings: WorkerSettings = Field(
        default_factory=WorkerSettings,
        description="Dask worker and parallelism configuration.",
    )

    # Logging
    log_file: Optional[Path] = Field(
        default=None,
        description=(
            "Path to the output log file in addition to logging to stderr. "
            "If None, will be set based on output path."
        ),
    )

    def to_workflow(self) -> CalibrationWorkflow:
        """Convert PGE RunConfig to a CalibrationWorkflow object.

        This method translates the PGE-style configuration into the format
        expected by CalibrationWorkflow.

        Returns
        -------
        CalibrationWorkflow
            Converted workflow configuration.

        Examples
        --------
        >>> run_config = RunConfig.from_yaml_file("pge_config.yaml")
        >>> workflow = run_config.to_workflow()
        >>> workflow.create_directories()
        >>> workflow.run()

        """
        # Set up directories
        scratch_directory = self.product_path_group.scratch_path
        output_directory = self.product_path_group.output_path

        # Set up log file
        log_file = self.log_file
        if log_file is None:
            log_file = output_directory / "cal_disp_workflow.log"

        # Create the workflow
        workflow = CalibrationWorkflow(
            # Input files
            input_options=self.input_file_group,
            # Ancillary files
            dynamic_ancillary_file_options=self.dynamic_ancillary_file_group,
            static_ancillary_file_options=self.static_ancillary_file_group,
            # Directories
            work_directory=scratch_directory,
            output_directory=output_directory,
            # Settings
            worker_settings=self.worker_settings,
            log_file=log_file,
            # Don't resolve paths yet - let the workflow handle it
            keep_paths_relative=False,
        )

        return workflow

    def create_directories(self, exist_ok: bool = True) -> None:
        """Create all necessary directories for the PGE run.

        Parameters
        ----------
        exist_ok : bool, default=True
            If True, don't raise error if directories already exist.

        """
        self.product_path_group.product_path.mkdir(parents=True, exist_ok=exist_ok)
        self.product_path_group.scratch_path.mkdir(parents=True, exist_ok=exist_ok)
        self.product_path_group.output_path.mkdir(parents=True, exist_ok=exist_ok)

        # Create tmp directory in scratch
        tmp_dir = self.product_path_group.scratch_path / "tmp"
        tmp_dir.mkdir(parents=True, exist_ok=exist_ok)

    def validate_ready_to_run(self) -> ValidationResult:
        """Check if run configuration is ready for execution."""
        errors: List[str] = []
        warnings: List[str] = []

        # Check that required inputs are provided
        if self.input_file_group.disp_file is None:
            errors.append("disp_file must be provided")

        if self.input_file_group.calibration_reference_grid is None:
            errors.append("calibration_reference_grid must be provided")

        # Check dynamic ancillary files if provided
        if self.dynamic_ancillary_file_group:
            if self.dynamic_ancillary_file_group.algorithm_parameters_file is None:
                warnings.append("Missing algorithm_parameters_file")
            if self.dynamic_ancillary_file_group.geometry_file is None:
                warnings.append("Missing geometry_file")

        return {
            "ready": len(errors) == 0,
            "errors": errors,
            "warnings": warnings,
        }

    def summary(self) -> str:
        """Generate a human-readable summary of the run configuration.

        Returns
        -------
        str
            Multi-line summary string.

        """
        lines = [
            "PGE Run Configuration",
            "=" * 70,
            "",
            "Product Information:",
            f"  Product type:     {self.primary_executable.product_type}",
            f"  Product version:  {self.output_options.product_version}",
            f"  Output format:    {self.output_options.output_format}",
            "",
            "Paths:",
            f"  Product path:     {self.product_path_group.product_path}",
            f"  Scratch path:     {self.product_path_group.scratch_path}",
            f"  Output path:      {self.product_path_group.output_path}",
            f"  Log file:         {self.log_file or 'Default'}",
            "",
            "Input Files:",
            f"  DISP file:        {self.input_file_group.disp_file}",
            f"  Calibration grid: {self.input_file_group.calibration_reference_grid}",
            "",
            "Worker Settings:",
            f"  Workers:          {self.worker_settings.n_workers}",
            f"  Threads/worker:   {self.worker_settings.threads_per_worker}",
            f"  Total threads:    {self.worker_settings.total_threads}",
        ]

        # Dynamic ancillary files
        if self.dynamic_ancillary_file_group:
            lines.extend(
                [
                    "",
                    "Dynamic Ancillary Files:",
                ]
            )
            dynamic_files = self.dynamic_ancillary_file_group.get_all_files()
            for name, path in list(dynamic_files.items())[:5]:
                lines.append(f"  {name}: {path}")
            if len(dynamic_files) > 5:
                lines.append(f"  ... and {len(dynamic_files) - 5} more")

        # Static ancillary files
        if self.static_ancillary_file_group:
            static_files = self.static_ancillary_file_group.get_all_files()
            if static_files:
                lines.extend(
                    [
                        "",
                        "Static Ancillary Files:",
                    ]
                )
                for name, path in static_files.items():
                    lines.append(f"  {name}: {path}")

        # Validation status
        status = self.validate_ready_to_run()
        lines.append("")
        if not status["ready"]:
            lines.append("⚠️  Status: NOT READY")
            lines.append("Errors:")
            for error in status["errors"]:
                lines.append(f"  - {error}")
        else:
            lines.append("✓ Status: READY")

        if status["warnings"]:
            lines.append("")
            lines.append("Warnings:")
            for warning in status["warnings"]:
                lines.append(f"  ⚠️  {warning}")

        lines.append("=" * 70)

        return "\n".join(lines)

    @classmethod
    def create_example(cls) -> "RunConfig":
        """Create an example PGE run configuration.

        Returns
        -------
        RunConfig
            Example configuration with placeholder values.

        """
        return cls(
            input_file_group=InputFileGroup(
                disp_file=Path("input/disp.h5"),
                calibration_reference_grid=Path("input/cal_grid.parquet"),
                frame_id=1,
            ),
            dynamic_ancillary_file_group=DynamicAncillaryFileGroup(
                algorithm_parameters_file=Path("config/algorithm_params.yaml"),
                static_layers_file=Path("input/geometry.h5"),
            ),
            product_path_group=ProductPathGroup(
                scratch_path=Path("./scratch"), output_path=Path("./output")
            ),
            worker_settings=WorkerSettings.create_standard(),
        )

    @classmethod
    def from_yaml_file(cls, yaml_path: Path) -> "RunConfig":
        """Load run configuration from YAML file.

        Handles optional name wrapper key.

        Parameters
        ----------
        yaml_path : Path
            Path to YAML configuration file.

        Returns
        -------
        RunConfig
            Loaded and validated run configuration.

        """
        data = cls._load_yaml_data(yaml_path)

        # Handle optional wrapper key
        if cls.name in data:
            data = data[cls.name]

        return cls.model_validate(data)

    @staticmethod
    def _load_yaml_data(yaml_path: Path) -> dict:
        """Load YAML data from file.

        Parameters
        ----------
        yaml_path : Path
            Path to YAML file.

        Returns
        -------
        dict
            Loaded YAML data.

        """
        from ruamel.yaml import YAML

        y = YAML(typ="safe")
        with open(yaml_path) as f:
            return y.load(f)

    def to_yaml(
        self,
        output_path: Union[str, PathLike, TextIO],
        with_comments: bool = True,  # noqa: ARG002
        by_alias: bool = True,
        indent_per_level: int = 2,
    ) -> None:  # Note: return type can be None or Any, both work
        """Save configuration to YAML file with name wrapper.

        Parameters
        ----------
        output_path : str | PathLike | TextIO
            Path where YAML should be saved.
        with_comments : bool, default=True
            Whether to include field descriptions as comments.
        by_alias : bool, default=True
            Whether to use field aliases in output.
        indent_per_level : int, default=2
            Indentation spacing.

        Notes
        -----
        This method always wraps output in {cal_disp_workflow: ...} structure.
        The with_comments parameter is accepted for signature compatibility but
        not currently used in the wrapper output.

        """
        from ruamel.yaml import YAML

        # Handle file-like objects
        if hasattr(output_path, "write"):
            raise ValueError(
                "RunConfig.to_yaml doesn't support file-like objects. "
                "Save to a file path instead."
            )

        # Convert to Path
        output_path_obj = Path(output_path)

        # Get dict and convert paths
        data = self.model_dump(mode="python", by_alias=by_alias)
        data = convert_paths_to_strings(data)
        wrapped = {self.name: data}

        # Write with proper formatting
        output_path_obj.parent.mkdir(parents=True, exist_ok=True)
        y = YAML()
        y.indent(
            mapping=indent_per_level,
            sequence=indent_per_level + 2,
            offset=indent_per_level,
        )
        with open(output_path_obj, "w") as f:
            y.dump(wrapped, f)

    model_config = STRICT_CONFIG_WITH_ALIASES
create_directories
create_directories(exist_ok: bool = True) -> None

Create all necessary directories for the PGE run.

Parameters:

    exist_ok (bool, default=True)
        If True, don't raise error if directories already exist.
Source code in src/cal_disp/config/pge_runconfig.py
def create_directories(self, exist_ok: bool = True) -> None:
    """Create all necessary directories for the PGE run.

    Parameters
    ----------
    exist_ok : bool, default=True
        If True, don't raise error if directories already exist.

    """
    self.product_path_group.product_path.mkdir(parents=True, exist_ok=exist_ok)
    self.product_path_group.scratch_path.mkdir(parents=True, exist_ok=exist_ok)
    self.product_path_group.output_path.mkdir(parents=True, exist_ok=exist_ok)

    # Create tmp directory in scratch
    tmp_dir = self.product_path_group.scratch_path / "tmp"
    tmp_dir.mkdir(parents=True, exist_ok=exist_ok)
create_example classmethod
create_example() -> 'RunConfig'

Create an example PGE run configuration.

Returns:

    RunConfig
        Example configuration with placeholder values.

Source code in src/cal_disp/config/pge_runconfig.py
@classmethod
def create_example(cls) -> "RunConfig":
    """Create an example PGE run configuration.

    Returns
    -------
    RunConfig
        Example configuration with placeholder values.

    """
    return cls(
        input_file_group=InputFileGroup(
            disp_file=Path("input/disp.h5"),
            calibration_reference_grid=Path("input/cal_grid.parquet"),
            frame_id=1,
        ),
        dynamic_ancillary_file_group=DynamicAncillaryFileGroup(
            algorithm_parameters_file=Path("config/algorithm_params.yaml"),
            static_layers_file=Path("input/geometry.h5"),
        ),
        product_path_group=ProductPathGroup(
            scratch_path=Path("./scratch"), output_path=Path("./output")
        ),
        worker_settings=WorkerSettings.create_standard(),
    )
from_yaml_file classmethod
from_yaml_file(yaml_path: Path) -> 'RunConfig'

Load run configuration from YAML file.

Handles optional name wrapper key.

Parameters:

    yaml_path (Path, required)
        Path to YAML configuration file.

Returns:

    RunConfig
        Loaded and validated run configuration.

Source code in src/cal_disp/config/pge_runconfig.py
@classmethod
def from_yaml_file(cls, yaml_path: Path) -> "RunConfig":
    """Load run configuration from YAML file.

    Handles optional name wrapper key.

    Parameters
    ----------
    yaml_path : Path
        Path to YAML configuration file.

    Returns
    -------
    RunConfig
        Loaded and validated run configuration.

    """
    data = cls._load_yaml_data(yaml_path)

    # Handle optional wrapper key
    if cls.name in data:
        data = data[cls.name]

    return cls.model_validate(data)
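
For example, a sketch of loading a wrapped runconfig (the YAML content below is illustrative; if the input-file fields validate existence, point them at real files):

from pathlib import Path

from cal_disp.config.pge_runconfig import RunConfig

Path("runconfig.yaml").write_text(
    """\
cal_disp_workflow:
  input_file_group:
    disp_file: input/disp.h5
    calibration_reference_grid: input/cal_grid.parquet
    frame_id: 8882
"""
)
rc = RunConfig.from_yaml_file(Path("runconfig.yaml"))
print(rc.primary_executable.product_type)   # "CAL_DISP" (default)
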
summary
summary() -> str

Generate a human-readable summary of the run configuration.

Returns:

    str
        Multi-line summary string.

Source code in src/cal_disp/config/pge_runconfig.py
def summary(self) -> str:
    """Generate a human-readable summary of the run configuration.

    Returns
    -------
    str
        Multi-line summary string.

    """
    lines = [
        "PGE Run Configuration",
        "=" * 70,
        "",
        "Product Information:",
        f"  Product type:     {self.primary_executable.product_type}",
        f"  Product version:  {self.output_options.product_version}",
        f"  Output format:    {self.output_options.output_format}",
        "",
        "Paths:",
        f"  Product path:     {self.product_path_group.product_path}",
        f"  Scratch path:     {self.product_path_group.scratch_path}",
        f"  Output path:      {self.product_path_group.output_path}",
        f"  Log file:         {self.log_file or 'Default'}",
        "",
        "Input Files:",
        f"  DISP file:        {self.input_file_group.disp_file}",
        f"  Calibration grid: {self.input_file_group.calibration_reference_grid}",
        "",
        "Worker Settings:",
        f"  Workers:          {self.worker_settings.n_workers}",
        f"  Threads/worker:   {self.worker_settings.threads_per_worker}",
        f"  Total threads:    {self.worker_settings.total_threads}",
    ]

    # Dynamic ancillary files
    if self.dynamic_ancillary_file_group:
        lines.extend(
            [
                "",
                "Dynamic Ancillary Files:",
            ]
        )
        dynamic_files = self.dynamic_ancillary_file_group.get_all_files()
        for name, path in list(dynamic_files.items())[:5]:
            lines.append(f"  {name}: {path}")
        if len(dynamic_files) > 5:
            lines.append(f"  ... and {len(dynamic_files) - 5} more")

    # Static ancillary files
    if self.static_ancillary_file_group:
        static_files = self.static_ancillary_file_group.get_all_files()
        if static_files:
            lines.extend(
                [
                    "",
                    "Static Ancillary Files:",
                ]
            )
            for name, path in static_files.items():
                lines.append(f"  {name}: {path}")

    # Validation status
    status = self.validate_ready_to_run()
    lines.append("")
    if not status["ready"]:
        lines.append("⚠️  Status: NOT READY")
        lines.append("Errors:")
        for error in status["errors"]:
            lines.append(f"  - {error}")
    else:
        lines.append("✓ Status: READY")

    if status["warnings"]:
        lines.append("")
        lines.append("Warnings:")
        for warning in status["warnings"]:
            lines.append(f"  ⚠️  {warning}")

    lines.append("=" * 70)

    return "\n".join(lines)
to_workflow
to_workflow() -> CalibrationWorkflow

Convert PGE RunConfig to a CalibrationWorkflow object.

This method translates the PGE-style configuration into the format expected by CalibrationWorkflow.

Returns:

    CalibrationWorkflow
        Converted workflow configuration.

Examples:

>>> run_config = RunConfig.from_yaml_file("pge_config.yaml")
>>> workflow = run_config.to_workflow()
>>> workflow.create_directories()
>>> workflow.run()
Source code in src/cal_disp/config/pge_runconfig.py
def to_workflow(self) -> CalibrationWorkflow:
    """Convert PGE RunConfig to a CalibrationWorkflow object.

    This method translates the PGE-style configuration into the format
    expected by CalibrationWorkflow.

    Returns
    -------
    CalibrationWorkflow
        Converted workflow configuration.

    Examples
    --------
    >>> run_config = RunConfig.from_yaml_file("pge_config.yaml")
    >>> workflow = run_config.to_workflow()
    >>> workflow.create_directories()
    >>> workflow.run()

    """
    # Set up directories
    scratch_directory = self.product_path_group.scratch_path
    output_directory = self.product_path_group.output_path

    # Set up log file
    log_file = self.log_file
    if log_file is None:
        log_file = output_directory / "cal_disp_workflow.log"

    # Create the workflow
    workflow = CalibrationWorkflow(
        # Input files
        input_options=self.input_file_group,
        # Ancillary files
        dynamic_ancillary_file_options=self.dynamic_ancillary_file_group,
        static_ancillary_file_options=self.static_ancillary_file_group,
        # Directories
        work_directory=scratch_directory,
        output_directory=output_directory,
        # Settings
        worker_settings=self.worker_settings,
        log_file=log_file,
        # Don't resolve paths yet - let the workflow handle it
        keep_paths_relative=False,
    )

    return workflow
to_yaml
to_yaml(output_path: Union[str, PathLike, TextIO], with_comments: bool = True, by_alias: bool = True, indent_per_level: int = 2) -> None

Save configuration to YAML file with name wrapper.

Parameters:

    output_path (str | PathLike | TextIO, required)
        Path where YAML should be saved.
    with_comments (bool, default=True)
        Whether to include field descriptions as comments.
    by_alias (bool, default=True)
        Whether to use field aliases in output.
    indent_per_level (int, default=2)
        Indentation spacing.

Notes:

    This method always wraps the output in the {cal_disp_workflow: ...} structure.
    The with_comments parameter is accepted for signature compatibility but is not
    currently used in the wrapper output.

Source code in src/cal_disp/config/pge_runconfig.py
def to_yaml(
    self,
    output_path: Union[str, PathLike, TextIO],
    with_comments: bool = True,  # noqa: ARG002
    by_alias: bool = True,
    indent_per_level: int = 2,
) -> None:  # Note: return type can be None or Any, both work
    """Save configuration to YAML file with name wrapper.

    Parameters
    ----------
    output_path : str | PathLike | TextIO
        Path where YAML should be saved.
    with_comments : bool, default=True
        Whether to include field descriptions as comments.
    by_alias : bool, default=True
        Whether to use field aliases in output.
    indent_per_level : int, default=2
        Indentation spacing.

    Notes
    -----
    This method always wraps output in {cal_disp_workflow: ...} structure.
    The with_comments parameter is accepted for signature compatibility but
    not currently used in the wrapper output.

    """
    from ruamel.yaml import YAML

    # Handle file-like objects
    if hasattr(output_path, "write"):
        raise ValueError(
            "RunConfig.to_yaml doesn't support file-like objects. "
            "Save to a file path instead."
        )

    # Convert to Path
    output_path_obj = Path(output_path)

    # Get dict and convert paths
    data = self.model_dump(mode="python", by_alias=by_alias)
    data = convert_paths_to_strings(data)
    wrapped = {self.name: data}

    # Write with proper formatting
    output_path_obj.parent.mkdir(parents=True, exist_ok=True)
    y = YAML()
    y.indent(
        mapping=indent_per_level,
        sequence=indent_per_level + 2,
        offset=indent_per_level,
    )
    with open(output_path_obj, "w") as f:
        y.dump(wrapped, f)
validate_ready_to_run
validate_ready_to_run() -> ValidationResult

Check if run configuration is ready for execution.

Source code in src/cal_disp/config/pge_runconfig.py
def validate_ready_to_run(self) -> ValidationResult:
    """Check if run configuration is ready for execution."""
    errors: List[str] = []
    warnings: List[str] = []

    # Check that required inputs are provided
    if self.input_file_group.disp_file is None:
        errors.append("disp_file must be provided")

    if self.input_file_group.calibration_reference_grid is None:
        errors.append("calibration_reference_grid must be provided")

    # Check dynamic ancillary files if provided
    if self.dynamic_ancillary_file_group:
        if self.dynamic_ancillary_file_group.algorithm_parameters_file is None:
            warnings.append("Missing algorithm_parameters_file")
        if self.dynamic_ancillary_file_group.geometry_file is None:
            warnings.append("Missing geometry_file")

    return {
        "ready": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
    }
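
A sketch of gating execution on the result (rc is assumed to be a RunConfig instance, e.g. loaded via RunConfig.from_yaml_file as shown above):

status = rc.validate_ready_to_run()
if not status["ready"]:
    raise SystemExit("Not ready:\n" + "\n".join(f"- {e}" for e in status["errors"]))
for warning in status["warnings"]:
    print(f"warning: {warning}")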

workflow

CalibrationWorkflow

Bases: YamlModel

Calibration workflow configuration.

This class manages the complete configuration for the displacement calibration workflow, including input files, output directories, worker settings, and logging configuration.

Attributes:

    input_options (Optional[InputFileGroup])
        Configuration for required input files.
    work_directory (Path)
        Directory for intermediate processing files.
    output_directory (Path)
        Directory for final output files.
    keep_paths_relative (bool)
        If False, resolve all paths to absolute paths.
    dynamic_ancillary_file_options (Optional[DynamicAncillaryFileGroup])
        Optional dynamic ancillary files for processing.
    static_ancillary_file_options (Optional[StaticAncillaryFileGroup])
        Optional static ancillary files for processing.
    worker_settings (WorkerSettings)
        Dask worker and threading configuration.
    log_file (Optional[Path])
        Custom log file path.
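
A quick construction sketch (illustrative; the import path mirrors src/cal_disp/config/workflow.py, and the "." placeholders satisfy the DirectoryPath checks):

from pathlib import Path

from cal_disp.config.workflow import CalibrationWorkflow

wf = CalibrationWorkflow(work_directory=Path("."), output_directory=Path("."))
print(wf.log_file)                   # defaults to <work_directory>/cal_disp.log
print(wf.validate_ready_to_run())    # not ready until input_options is provided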

Source code in src/cal_disp/config/workflow.py
class CalibrationWorkflow(YamlModel):
    """Calibration workflow configuration.

    This class manages the complete configuration for the displacement calibration
    workflow, including input files, output directories, worker settings, and
    logging configuration.

    Attributes
    ----------
    input_options : Optional[InputFileGroup]
        Configuration for required input files.
    work_directory : Path
        Directory for intermediate processing files.
    output_directory : Path
        Directory for final output files.
    keep_paths_relative : bool
        If False, resolve all paths to absolute paths.
    dynamic_ancillary_file_options : Optional[DynamicAncillaryFileGroup]
        Optional dynamic ancillary files for processing.
    static_ancillary_file_options : Optional[StaticAncillaryFileGroup]
        Optional static ancillary files for processing.
    worker_settings : WorkerSettings
        Dask worker and threading configuration.
    log_file : Optional[Path]
        Custom log file path.

    """

    # Input/output file configuration
    input_options: Optional[InputFileGroup] = Field(
        default=None,
        description=(
            "Configuration for required input files. Must be provided before running"
            " workflow."
        ),
    )

    work_directory: DirectoryPath = Field(
        default=Path(),
        description=(
            "Directory for intermediate processing files. Created if it doesn't exist."
        ),
    )

    output_directory: DirectoryPath = Field(
        default=Path(),
        description="Directory for final output files. Created if it doesn't exist.",
    )

    keep_paths_relative: bool = Field(
        default=False,
        description=(
            "If False, resolve all relative paths to absolute paths. "
            "If True, keep paths as provided."
        ),
    )

    # Optional ancillary file groups
    dynamic_ancillary_options: Optional[DynamicAncillaryFileGroup] = Field(
        default=None,
        description="Dynamic ancillary files (dem, los, masks, troposphere, etc.).",
    )

    static_ancillary_options: Optional[StaticAncillaryFileGroup] = Field(
        default=None,
        description="Static ancillary files (algorithm overrides, databases, etc.).",
    )

    # Worker and logging configuration
    worker_settings: WorkerSettings = Field(
        default_factory=WorkerSettings,
        description="Worker and parallelism configuration.",
    )

    log_file: Optional[Path] = Field(
        default=None,
        description=(
            "Path to output log file (in addition to logging to stderr). "
            "If None, defaults to 'cal_disp.log' in work_directory."
        ),
    )

    @model_validator(mode="after")
    def _resolve_paths(self) -> "CalibrationWorkflow":
        """Resolve all paths to absolute if keep_paths_relative is False.

        Uses object.__setattr__() to bypass validate_assignment and avoid recursion.
        """
        if not self.keep_paths_relative:
            # Use object.__setattr__() to bypass validation and avoid recursion
            object.__setattr__(self, "work_directory", self.work_directory.resolve())
            object.__setattr__(
                self, "output_directory", self.output_directory.resolve()
            )

            # Resolve log file path if provided
            if self.log_file is not None:
                object.__setattr__(self, "log_file", self.log_file.resolve())

        return self

    @model_validator(mode="after")
    def _set_default_log_file(self) -> "CalibrationWorkflow":
        """Set default log file path if not provided."""
        if self.log_file is None:
            # Use object.__setattr__() to bypass validation
            object.__setattr__(self, "log_file", self.work_directory / "cal_disp.log")

        return self

    def validate_ready_to_run(self) -> ValidationResult:
        """Check if workflow is ready to run."""
        errors = []
        warnings = []

        if self.input_options is None:
            errors.append("input_options must be provided")
        else:
            if self.input_options.disp_file is None:
                errors.append("disp_file must be provided in input_options")
            if self.input_options.calibration_reference_grid is None:
                errors.append(
                    "calibration_reference_grid must be provided in input_options"
                )

        # Check dynamic ancillaries
        if self.dynamic_ancillary_options is None:
            errors.append("dynamic_ancillary_options must be provided")

        # Check for missing files only if required options are set
        if self.input_options is not None:
            missing = self.get_missing_files()
            if missing:
                warnings.append(f"Missing files: {', '.join(missing)}")

        return {
            "ready": len(errors) == 0,
            "errors": errors,
            "warnings": warnings,
        }

    def validate_input_files_exist(self) -> Dict[str, Dict[str, Any]]:
        """Check if all input files exist."""
        results = {}

        if self.input_options:
            results.update(self.input_options.validate_files_exist())

        if self.dynamic_ancillary_options:
            results.update(self.dynamic_ancillary_options.validate_files_exist())

        if self.static_ancillary_options:
            results.update(self.static_ancillary_options.validate_files_exist())

        return results

    def get_missing_files(self) -> List[str]:
        """Get list of missing required input files."""
        status = self.validate_input_files_exist()
        return [name for name, info in status.items() if not info["exists"]]

    def create_directories(self, exist_ok: bool = True) -> None:
        """Create work and output directories if they don't exist.

        Parameters
        ----------
        exist_ok : bool, default=True
            If True, don't raise error if directories already exist.

        """
        self.work_directory.mkdir(parents=True, exist_ok=exist_ok)
        self.output_directory.mkdir(parents=True, exist_ok=exist_ok)

        # Also create parent directory for log file
        if self.log_file is not None:
            self.log_file.parent.mkdir(parents=True, exist_ok=exist_ok)

    def setup_logging(
        self, level: int = 20, format_string: Optional[str] = None  # logging.INFO
    ):
        """Set up logging configuration for the workflow.

        Parameters
        ----------
        level : int, default=20 (INFO)
            Logging level (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50).
        format_string : str, optional
            Custom format string for log messages.

        Returns
        -------
        logging.Logger
            Configured logger instance.

        """
        import logging

        if format_string is None:
            format_string = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

        logger = logging.getLogger("calibration_workflow")
        logger.setLevel(level)
        logger.handlers.clear()

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        console_handler.setFormatter(logging.Formatter(format_string))
        logger.addHandler(console_handler)

        # File handler
        if self.log_file is not None:
            self.log_file.parent.mkdir(parents=True, exist_ok=True)
            file_handler = logging.FileHandler(self.log_file)
            file_handler.setLevel(level)
            file_handler.setFormatter(logging.Formatter(format_string))
            logger.addHandler(file_handler)

        return logger

    def summary(self) -> str:
        """Generate a human-readable summary of the workflow configuration.

        Returns
        -------
        str
            Multi-line summary string.

        """
        lines = [
            "Calibration Workflow Configuration",
            "=" * 70,
            "",
            "Directories:",
            f"  Work directory:   {self.work_directory}",
            f"  Output directory: {self.output_directory}",
            f"  Log file:         {self.log_file}",
            f"  Keep relative:    {self.keep_paths_relative}",
            "",
        ]

        # Input files
        if self.input_options:
            lines.extend(
                [
                    "Input Files:",
                    f"  DISP file:        {self.input_options.disp_file}",
                    (
                        "  Calibration grid:"
                        f" {self.input_options.calibration_reference_grid}"
                    ),
                    "",
                ]
            )
        else:
            lines.extend(
                [
                    "Input Files:",
                    "   Not configured!",
                    "",
                ]
            )

        # Worker settings
        lines.extend(
            [
                "Worker Settings:",
                f"  Workers:          {self.worker_settings.n_workers}",
                f"  Threads/worker:   {self.worker_settings.threads_per_worker}",
                f"  Total threads:    {self.worker_settings.total_threads}",
                f"  Block shape:      {self.worker_settings.block_shape}",
                "",
            ]
        )

        # Dynamic ancillary files
        if self.dynamic_ancillary_options:
            dynamic_files = self.dynamic_ancillary_options.get_all_files()
            if dynamic_files:
                lines.extend(["Dynamic Ancillary Files:"])
                for name, path in list(dynamic_files.items())[:5]:  # Show first 5
                    lines.append(f"  {name}: {path}")
                if len(dynamic_files) > 5:
                    lines.append(f"  ... and {len(dynamic_files) - 5} more")
                lines.append("")

        # Static ancillary files
        if self.static_ancillary_options:
            static_files = self.static_ancillary_options.get_all_files()
            if static_files:
                lines.extend(["Static Ancillary Files:"])
                for name, path in static_files.items():
                    lines.append(f"  {name}: {path}")
                lines.append("")

        # Check readiness
        status = self.validate_ready_to_run()
        if not status["ready"]:
            lines.extend(
                [
                    "  Workflow Status: NOT READY",
                    "Errors:",
                ]
            )
            for error in status["errors"]:
                lines.append(f"  - {error}")
        else:
            lines.extend(
                [
                    " Workflow Status: READY",
                ]
            )

        if status["warnings"]:
            lines.extend(
                [
                    "",
                    "Warnings:",
                ]
            )
            for warning in status["warnings"]:
                lines.append(f"  ⚠️  {warning}")

        lines.append("=" * 70)

        return "\n".join(lines)

    @classmethod
    def create_example(cls) -> "CalibrationWorkflow":
        """Create an example workflow configuration.

        Returns
        -------
        CalibrationWorkflow
            Example configuration with placeholder values.

        """
        return cls(
            work_directory=Path("./work"),
            output_directory=Path("./output"),
            input_options=InputFileGroup(
                disp_file=Path("input/disp.nc"),
                calibration_reference_grid=Path("input/cal_grid.parquet"),
                frame_id=1,
            ),
            dynamic_ancillary_options=DynamicAncillaryFileGroup(
                algorithm_parameters_file="algorithm.yaml",
                static_los_file="line_of_sight_enu.tif",
                static_dem_file="dem.tif",
            ),
            worker_settings=WorkerSettings.create_standard(),
            keep_paths_relative=True,
        )

    @classmethod
    def create_minimal(cls) -> "CalibrationWorkflow":
        """Create a minimal workflow configuration without input files.

        Returns
        -------
        CalibrationWorkflow
            Minimal configuration. Input files must be added before running.

        """
        return cls(
            work_directory=Path("./work"),
            output_directory=Path("./output"),
        )

    model_config = STRICT_CONFIG_WITH_ALIASES
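
A short end-to-end sketch (illustrative, not part of the generated reference). It assumes CalibrationWorkflow and InputFileGroup are importable from cal_disp.config.workflow, as the source path suggests, and that the placeholder paths exist on disk, since work_directory and output_directory are DirectoryPath fields validated by pydantic:

from pathlib import Path

from cal_disp.config.workflow import CalibrationWorkflow, InputFileGroup

# Paths below are placeholders; the directories must exist before construction.
workflow = CalibrationWorkflow(
    work_directory=Path("./work"),
    output_directory=Path("./output"),
    input_options=InputFileGroup(
        disp_file=Path("input/disp.nc"),
        calibration_reference_grid=Path("input/cal_grid.parquet"),
        frame_id=8882,
    ),
)

workflow.create_directories()      # idempotent; also creates the log file's parent
logger = workflow.setup_logging()  # logs to stderr and to work/cal_disp.log
logger.info("\n%s", workflow.summary())
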
create_directories
create_directories(exist_ok: bool = True) -> None

Create work and output directories if they don't exist.

Parameters:

    exist_ok : bool, default=True
        If True, don't raise error if directories already exist.
Source code in src/cal_disp/config/workflow.py
def create_directories(self, exist_ok: bool = True) -> None:
    """Create work and output directories if they don't exist.

    Parameters
    ----------
    exist_ok : bool, default=True
        If True, don't raise error if directories already exist.

    """
    self.work_directory.mkdir(parents=True, exist_ok=exist_ok)
    self.output_directory.mkdir(parents=True, exist_ok=exist_ok)

    # Also create parent directory for log file
    if self.log_file is not None:
        self.log_file.parent.mkdir(parents=True, exist_ok=exist_ok)
create_example classmethod
create_example() -> 'CalibrationWorkflow'

Create an example workflow configuration.

Returns:

    CalibrationWorkflow
        Example configuration with placeholder values.

Source code in src/cal_disp/config/workflow.py
@classmethod
def create_example(cls) -> "CalibrationWorkflow":
    """Create an example workflow configuration.

    Returns
    -------
    CalibrationWorkflow
        Example configuration with placeholder values.

    """
    return cls(
        work_directory=Path("./work"),
        output_directory=Path("./output"),
        input_options=InputFileGroup(
            disp_file=Path("input/disp.nc"),
            calibration_reference_grid=Path("input/cal_grid.parquet"),
            frame_id=1,
        ),
        dynamic_ancillary_options=DynamicAncillaryFileGroup(
            algorithm_parameters_file="algorithm.yaml",
            static_los_file="line_of_sight_enu.tif",
            static_dem_file="dem.tif",
        ),
        worker_settings=WorkerSettings.create_standard(),
        keep_paths_relative=True,
    )
create_minimal classmethod
create_minimal() -> 'CalibrationWorkflow'

Create a minimal workflow configuration without input files.

Returns:

    CalibrationWorkflow
        Minimal configuration. Input files must be added before running.

Source code in src/cal_disp/config/workflow.py
@classmethod
def create_minimal(cls) -> "CalibrationWorkflow":
    """Create a minimal workflow configuration without input files.

    Returns
    -------
    CalibrationWorkflow
        Minimal configuration. Input files must be added before running.

    """
    return cls(
        work_directory=Path("./work"),
        output_directory=Path("./output"),
    )
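
As a sketch of that pattern (assuming validate_assignment is active through STRICT_CONFIG_WITH_ALIASES, so field assignment is re-validated, and that the placeholder paths satisfy any existence checks the model performs):

from pathlib import Path

from cal_disp.config.workflow import CalibrationWorkflow, InputFileGroup

wf = CalibrationWorkflow.create_minimal()
print(wf.validate_ready_to_run()["ready"])  # False: no input files configured yet

# Attach the required inputs afterwards; the paths here are placeholders.
wf.input_options = InputFileGroup(
    disp_file=Path("input/disp.nc"),
    calibration_reference_grid=Path("input/cal_grid.parquet"),
    frame_id=1,
)
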
get_missing_files
get_missing_files() -> List[str]

Get list of missing required input files.

Source code in src/cal_disp/config/workflow.py
def get_missing_files(self) -> List[str]:
    """Get list of missing required input files."""
    status = self.validate_input_files_exist()
    return [name for name, info in status.items() if not info["exists"]]
setup_logging
setup_logging(level: int = 20, format_string: Optional[str] = None)

Set up logging configuration for the workflow.

Parameters:

    level : int, default=20 (INFO)
        Logging level (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50).
    format_string : str, optional, default=None
        Custom format string for log messages.

Returns:

    logging.Logger
        Configured logger instance.

Source code in src/cal_disp/config/workflow.py
def setup_logging(
    self, level: int = 20, format_string: Optional[str] = None  # logging.INFO
):
    """Set up logging configuration for the workflow.

    Parameters
    ----------
    level : int, default=20 (INFO)
        Logging level (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50).
    format_string : str, optional
        Custom format string for log messages.

    Returns
    -------
    logging.Logger
        Configured logger instance.

    """
    import logging

    if format_string is None:
        format_string = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"

    logger = logging.getLogger("calibration_workflow")
    logger.setLevel(level)
    logger.handlers.clear()

    # Console handler
    console_handler = logging.StreamHandler()
    console_handler.setLevel(level)
    console_handler.setFormatter(logging.Formatter(format_string))
    logger.addHandler(console_handler)

    # File handler
    if self.log_file is not None:
        self.log_file.parent.mkdir(parents=True, exist_ok=True)
        file_handler = logging.FileHandler(self.log_file)
        file_handler.setLevel(level)
        file_handler.setFormatter(logging.Formatter(format_string))
        logger.addHandler(file_handler)

    return logger
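
For readability, the standard logging level constants can be passed instead of the raw integers (illustrative snippet; wf stands for an already-constructed workflow):

import logging

logger = wf.setup_logging(level=logging.DEBUG)
logger.debug("Messages go to stderr and to %s", wf.log_file)
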
summary
summary() -> str

Generate a human-readable summary of the workflow configuration.

Returns:

    str
        Multi-line summary string.

Source code in src/cal_disp/config/workflow.py
def summary(self) -> str:
    """Generate a human-readable summary of the workflow configuration.

    Returns
    -------
    str
        Multi-line summary string.

    """
    lines = [
        "Calibration Workflow Configuration",
        "=" * 70,
        "",
        "Directories:",
        f"  Work directory:   {self.work_directory}",
        f"  Output directory: {self.output_directory}",
        f"  Log file:         {self.log_file}",
        f"  Keep relative:    {self.keep_paths_relative}",
        "",
    ]

    # Input files
    if self.input_options:
        lines.extend(
            [
                "Input Files:",
                f"  DISP file:        {self.input_options.disp_file}",
                (
                    "  Calibration grid:"
                    f" {self.input_options.calibration_reference_grid}"
                ),
                "",
            ]
        )
    else:
        lines.extend(
            [
                "Input Files:",
                "   Not configured!",
                "",
            ]
        )

    # Worker settings
    lines.extend(
        [
            "Worker Settings:",
            f"  Workers:          {self.worker_settings.n_workers}",
            f"  Threads/worker:   {self.worker_settings.threads_per_worker}",
            f"  Total threads:    {self.worker_settings.total_threads}",
            f"  Block shape:      {self.worker_settings.block_shape}",
            "",
        ]
    )

    # Dynamic ancillary files
    if self.dynamic_ancillary_options:
        dynamic_files = self.dynamic_ancillary_options.get_all_files()
        if dynamic_files:
            lines.extend(["Dynamic Ancillary Files:"])
            for name, path in list(dynamic_files.items())[:5]:  # Show first 5
                lines.append(f"  {name}: {path}")
            if len(dynamic_files) > 5:
                lines.append(f"  ... and {len(dynamic_files) - 5} more")
            lines.append("")

    # Static ancillary files
    if self.static_ancillary_options:
        static_files = self.static_ancillary_options.get_all_files()
        if static_files:
            lines.extend(["Static Ancillary Files:"])
            for name, path in static_files.items():
                lines.append(f"  {name}: {path}")
            lines.append("")

    # Check readiness
    status = self.validate_ready_to_run()
    if not status["ready"]:
        lines.extend(
            [
                "  Workflow Status: NOT READY",
                "Errors:",
            ]
        )
        for error in status["errors"]:
            lines.append(f"  - {error}")
    else:
        lines.extend(
            [
                " Workflow Status: READY",
            ]
        )

    if status["warnings"]:
        lines.extend(
            [
                "",
                "Warnings:",
            ]
        )
        for warning in status["warnings"]:
            lines.append(f"  ⚠️  {warning}")

    lines.append("=" * 70)

    return "\n".join(lines)
validate_input_files_exist
validate_input_files_exist() -> Dict[str, Dict[str, Any]]

Check if all input files exist.

Source code in src/cal_disp/config/workflow.py
def validate_input_files_exist(self) -> Dict[str, Dict[str, Any]]:
    """Check if all input files exist."""
    results = {}

    if self.input_options:
        results.update(self.input_options.validate_files_exist())

    if self.dynamic_ancillary_options:
        results.update(self.dynamic_ancillary_options.validate_files_exist())

    if self.static_ancillary_options:
        results.update(self.static_ancillary_options.validate_files_exist())

    return results
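
An illustrative way to report the results (wf is an already-constructed workflow; only the "exists" key of each entry is assumed here):

for name, info in wf.validate_input_files_exist().items():
    marker = "ok     " if info["exists"] else "MISSING"
    print(f"{marker} {name}")
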
validate_ready_to_run
validate_ready_to_run() -> ValidationResult

Check if workflow is ready to run.

Source code in src/cal_disp/config/workflow.py
def validate_ready_to_run(self) -> ValidationResult:
    """Check if workflow is ready to run."""
    errors = []
    warnings = []

    if self.input_options is None:
        errors.append("input_options must be provided")
    else:
        if self.input_options.disp_file is None:
            errors.append("disp_file must be provided in input_options")
        if self.input_options.calibration_reference_grid is None:
            errors.append(
                "calibration_reference_grid must be provided in input_options"
            )

    # Check dynamic ancillaries
    if self.dynamic_ancillary_options is None:
        errors.append("dynamic_ancillary_options must be provided")

    # Check for missing files only if required options are set
    if self.input_options is not None:
        missing = self.get_missing_files()
        if missing:
            warnings.append(f"Missing files: {', '.join(missing)}")

    return {
        "ready": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
    }
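
Because the result is a plain dict with "ready", "errors", and "warnings" keys, a pre-flight check can be as simple as the following sketch (wf is an already-constructed workflow):

status = wf.validate_ready_to_run()

for warning in status["warnings"]:
    print(f"warning: {warning}")

if not status["ready"]:
    raise SystemExit("Workflow not ready:\n" + "\n".join(status["errors"]))
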

Browse Image

cal_disp.browse_image

Module for creating browse images for the output product.

make_browse_image_from_arr

make_browse_image_from_arr(output_filename: PathOrStr, arr: ArrayLike, mask: ArrayLike, max_dim_allowed: int = 2048, cmap: str = DEFAULT_CMAP, vmin: float = -0.1, vmax: float = 0.1) -> None

Create a PNG browse image for the output product from given array.

Source code in src/cal_disp/browse_image.py
def make_browse_image_from_arr(
    output_filename: PathOrStr,
    arr: ArrayLike,
    mask: ArrayLike,
    max_dim_allowed: int = 2048,
    cmap: str = DEFAULT_CMAP,
    vmin: float = -0.10,
    vmax: float = 0.10,
) -> None:
    """Create a PNG browse image for the output product from given array."""
    arr[mask == 0] = np.nan
    arr = _resize_to_max_pixel_dim(arr, max_dim_allowed)
    _save_to_disk_as_color(arr, output_filename, cmap, vmin, vmax)
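
A small synthetic example (not taken from the package); note that arr is modified in place, since masked pixels are set to NaN before resizing:

import numpy as np

from cal_disp.browse_image import make_browse_image_from_arr

rng = np.random.default_rng(0)
arr = rng.normal(scale=0.05, size=(1024, 1024)).astype("float32")
mask = np.ones(arr.shape, dtype="uint8")
mask[:128, :] = 0  # these rows are rendered as NaN (no data) in the browse image

make_browse_image_from_arr("browse.png", arr=arr, mask=mask, vmin=-0.1, vmax=0.1)
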

make_browse_image_from_nc

make_browse_image_from_nc(output_filename: PathOrStr, input_filename: PathOrStr, max_dim_allowed: int = 2048, cmap: str = DEFAULT_CMAP, vmin: float = -0.1, vmax: float = 0.1) -> None

Create a PNG browse image for the output product from product in NetCDF file.

Source code in src/cal_disp/browse_image.py
def make_browse_image_from_nc(
    output_filename: PathOrStr,
    input_filename: PathOrStr,
    max_dim_allowed: int = 2048,
    cmap: str = DEFAULT_CMAP,
    vmin: float = -0.10,
    vmax: float = 0.10,
) -> None:
    """Create a PNG browse image for the output product from product in NetCDF file."""
    # Placeholder: the displacement layer and its mask should be read from
    # `input_filename` here; that reader is not implemented yet.
    arr = np.full((1, 1), np.nan)  # placeholder array
    mask = np.zeros((1, 1))  # placeholder mask

    make_browse_image_from_arr(
        output_filename, arr, mask, max_dim_allowed, cmap, vmin, vmax
    )