Source code for radarx.io.imd

#!/usr/bin/env python
# Copyright (c) 2024, radarx developers.
# Distributed under the MIT License. See LICENSE for more info.

"""
IMD Reader
==========

This sub-module provides functionality to read and process single radar files
from the Indian Meteorological Department (IMD), returning a quasi-CF-Radial
xarray Dataset.

Example::

    import radarx as rx
    dtree = rx.io.read_sweep(filename)
    dtree = rx.io.read_volume(filename)

.. autosummary::
   :nosignatures:
   :toctree: generated/

   {}
"""

import logging
import itertools
import numpy as np
import xarray as xr
from xarray import DataTree
from xradar.io.backends.cfradial1 import (
    _get_radar_calibration,
    _get_required_root_dataset,
    _get_subgroup,
    _get_sweep_groups,
)

from xradar.model import (
    georeferencing_correction_subgroup,
    radar_parameters_subgroup,
)

logger = logging.getLogger(__name__)

# Public API of this module; the names below are also injected into the
# module docstring's autosummary placeholder (see __doc__ formatting below).
__all__ = [
    "read_sweep",
    "read_volume",
    "to_cfradial2",
    "to_cfradial2_volumes",
]

# Fill the "{}" placeholder in the module docstring with the public names,
# one per autosummary line, so Sphinx lists them automatically.
__doc__ = __doc__.format("\n   ".join(__all__))


def read_sweep(file):
    """Read a single IMD radar sweep file as a quasi-CF-Radial dataset.

    Opens one NetCDF sweep file from the Indian Meteorological Department,
    renames dimensions/variables to CF-Radial conventions, and attaches the
    derived coordinates and sweep metadata.

    Parameters
    ----------
    file : str or pathlib.Path
        Path to the radar sweep file.

    Returns
    -------
    xarray.Dataset
        Sweep data in quasi-CF-Radial form, with ``time`` as the ray
        dimension.

    Examples
    --------
    >>> import radarx as rx
    >>> ds = rx.io.read_sweep("path/to/radar/file")

    See Also
    --------
    read_volume : Read and combine multiple sweeps into a volume.
    """
    ds = xr.open_dataset(file, engine="netcdf4")

    # CF-Radial dimension names.
    ds = ds.rename_dims({"radial": "azimuth", "bin": "range"})

    # Core coordinate/variable renames (always expected in IMD files),
    # followed by promoting the pointing angles to coordinates.
    ds = ds.rename_vars(
        {
            "radialAzim": "azimuth",
            "radialElev": "elevation",
            "elevationAngle": "fixed_angle",
            "radialTime": "time",
            "nyquist": "nyquist_velocity",
            "unambigRange": "unambiguous_range",
        }
    ).set_coords(["azimuth", "elevation"])

    # Site-related renames, applied only for variables that are present.
    for src, dst in (
        ("siteLat", "latitude"),
        ("siteLon", "longitude"),
        ("siteAlt", "altitude"),
        ("groundHeight", "altitude_agl"),
    ):
        if src in ds:
            ds = ds.rename_vars({src: dst})

    # Per-ray gate spacing (constant gateSize repeated for every ray).
    ds["ray_gate_spacing"] = xr.DataArray(
        data=np.repeat(ds.gateSize.values, ds.azimuth.size),
        dims=("azimuth",),
        attrs={"long_name": "gate_spacing_for_ray", "units": "meters"},
    )

    # Moment renames — must happen before the processing pipeline below.
    for src, dst in (("T", "DBT"), ("Z", "DBZ"), ("V", "VEL"), ("W", "WIDTH")):
        if src in ds:
            ds = ds.rename_vars({src: dst})

    # Processing pipeline; the order of these steps matters.
    ds = (
        ds.pipe(_compute_range)          # range coordinate
        .pipe(_angle_resolution)         # per-sweep angular resolution
        .pipe(_scantype)                 # sweep_mode / scan_type
        .pipe(_time_coverage)            # time_coverage_start/_end
        .pipe(_sweep_number)             # sweep_number from elevationNumber
        .pipe(_assign_sweep_metadata)    # start/end ray indices
        .pipe(_assign_metadata)          # global attributes
    )

    return ds.swap_dims({"azimuth": "time"})
[docs] def read_volume(files): """ Read and process multiple radar files to create a volume scan dataset. This function reads a list of radar files (or a list of lists, in the case of multi-sweep data) and returns a `DataTree` object containing the processed volume scan data. Parameters ---------- files : list of str or pathlib.Path or list of list of str or pathlib.Path A list of radar file paths or a list of lists, where each sublist represents a separate sweep in the radar volume scan. Returns ------- DataTree A `DataTree` object containing the volume scan data, organized by sweep. Examples -------- >>> import radarx as rx >>> files = ["sweep1.nc", "sweep2.nc", "sweep3.nc"] >>> volume_data = rx.io.read_volume(files) See Also -------- read_sweep : Read and process a single radar sweep file. """ # Check if there are nested lists and merge them if needed if any(isinstance(i, list) for i in files): files = _merge_file_lists(files) # Determine volume groups from files grouped_dataset_list = _determine_volumes(files) # Create a list to store the volumes volume_datasets = [] # Iterate over each group of datasets (which represent a volume) for datasets in grouped_dataset_list: vol = create_volume(datasets) # Custom function to create the volume if isinstance(vol, xr.Dataset): volume_datasets.append(vol) # Only add if it's an xarray.Dataset del vol # Cleanup after each volume is processed # Construct the DataTree from the volume datasets # (ensure that we pass datasets, not a list) volumes = DataTree.from_dict( {f"volume_{i}": volume for i, volume in enumerate(volume_datasets)} ) return volumes
def _merge_file_lists(file_lists): """ Merge multiple lists of file paths into a single, flattened list. This function takes a list of lists, where each sublist contains file paths, and merges them into a single flat list. This is useful when dealing with grouped file paths that need to be processed collectively. Parameters ---------- file_lists : list of lists A list containing multiple sublists, each holding file paths as strings. Example: [['file1.txt', 'file2.txt'], ['file3.txt'], ['file4.txt', 'file5.txt']] Returns ------- merged_list : list A single flattened list containing all the file paths from the provided sublists. Example: ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt', 'file5.txt'] """ # Use itertools.chain to flatten the list of lists merged_list = list(itertools.chain(*file_lists)) return merged_list def _compute_range(ds): """ Compute the range to each radar gate and add it as a coordinate to the dataset. This function calculates the distance from the radar instrument to the center of each gate based on the dataset's `firstGateRange` and `gateSize` values. The computed range values are then added to the dataset as a new coordinate named "range". Parameters ---------- ds : xarray.Dataset Input dataset containing radar data. The dataset must include the variables 'firstGateRange' and 'gateSize', which define the starting range and the distance between consecutive gates, respectively. Returns ------- ds : xarray.Dataset The original dataset with an added "range" coordinate. This coordinate represents the computed distances to each gate and is defined with the following attributes: - "standard_name": Standardized name for the range. - "long_name": Descriptive name for the range. - "units": The units of the range, which are "meters". - "spacing_is_constant": Indicates that the spacing between gates is constant. - "meters_to_center_of_first_gate": Distance to the center of the first gate. - "meters_between_gates": Spacing between consecutive gates. 
""" # Using ds.sizes instead of ds.dims to avoid future warning gate_count = ds.sizes["range"] # Ensure proper conversion to avoid deprecation warning ranges = np.arange( ds["firstGateRange"].values, gate_count * ds["gateSize"].values + ds["firstGateRange"].values, ds["gateSize"].astype(int).values, # Ensure proper integer type conversion ) ds.coords["range"] = xr.DataArray( ranges, dims=("range",), attrs={ "standard_name": "range_to_center_of_measurement_volume", "long_name": "Range from instrument to center of gate", "units": "meters", "spacing_is_constant": "true", "meters_to_center_of_first_gate": ds["firstGateRange"].values, "meters_between_gates": ds["gateSize"].values, }, ) return ds def _angle_resolution(ds): """ Adds angular resolution for each ray to the dataset. Parameters ---------- ds : xarray.Dataset Input dataset. Returns ------- ds : xarray.Dataset Dataset with angular resolution variable. """ ds = ds.drop_dims("sweep", errors="ignore") ds["ray_angle_res"] = xr.DataArray( data=[ds["angleResolution"]], dims=("sweep",), attrs={"long_name": "angular_resolution_between_rays", "units": "degrees"}, ) ds = ds.drop_vars("angleResolution", errors="ignore") return ds def _scantype(ds): """ Determines the scan type based on the scanType variable. Parameters ---------- ds : xarray.Dataset Input dataset. Returns ------- ds : xarray.Dataset Dataset with scan type and sweep mode variables. 
""" scan_type_lookup = { 4: (b"azimuth_surveillance", b"ppi"), 7: (b"elevation_surveillance", b"rhi"), 1: (b"sector", b"ppi_sector"), 2: (b"manual_rhi", b"rhi_sector"), 0: (b"unknown", b"unknown"), } # Extract the scalar value from the array scan_type = int(ds["scanType"].astype(int).values) # Get sweep_mode and scan_type_name from lookup dictionary sweep_mode, scan_type_name = scan_type_lookup.get( scan_type, (b"unknown", b"unknown") ) ds["sweep_mode"] = xr.DataArray(data=[sweep_mode], dims=("sweep")) ds["scan_type"] = xr.DataArray(data=[scan_type_name], dims=("sweep")) # ds = ds.drop_vars("scanType", errors="ignore") ds["fixed_angle"] = xr.DataArray(data=[ds["fixed_angle"]], dims=("sweep",)) return ds def _time_coverage(ds): """ Processes time coverage start and end times. Parameters ---------- ds : xarray.Dataset Input dataset. Returns ------- ds : xarray.Dataset Dataset with time coverage start and end variables. """ ds = ds.rename_vars({"esStartTime": "time_coverage_start"}) ds["time_coverage_start"] = xr.DataArray( data=str( ds.time_coverage_start.dt.strftime("%Y-%m-%dT%H:%M:%SZ").values ).encode("utf-8"), attrs={"standard_name": "data_volume_start_time_utc"}, ) ds["time_coverage_end"] = xr.DataArray( data=str(ds.time.max().dt.strftime("%Y-%m-%dT%H:%M:%SZ").values).encode( "utf-8" ), attrs={"standard_name": "data_volume_end_time_utc"}, ) return ds def _sweep_number(ds): """ Adds sweep number information to the dataset. Parameters ---------- ds : xarray.Dataset Input dataset. Returns ------- ds : xarray.Dataset Dataset with sweep number variable. """ sweep_num = ds["elevationNumber"].astype(int).values ds["sweep_number"] = xr.DataArray( data=[sweep_num], dims="sweep", attrs={"long_name": "sweep_index_number_0_based", "units": ""}, ) ds = ds.drop_vars("elevationNumber") return ds def _assign_sweep_metadata(ds): """ Assigns and updates the sweep metadata after reading multiple files. 
Parameters ---------- ds : xarray.Dataset Combined dataset with multiple sweeps. Returns ------- ds : xarray.Dataset Dataset with updated sweep metadata. """ num_sweeps = ds.sizes["sweep"] sweep_start = np.arange(0, num_sweeps * ds.sizes["azimuth"], ds.sizes["azimuth"]) sweep_end = sweep_start + ds.sizes["azimuth"] - 1 ds["sweep_start_ray_index"] = xr.DataArray( sweep_start, dims="sweep", attrs={"long_name": "Index of the first ray in the sweep"}, ) ds["sweep_end_ray_index"] = xr.DataArray( sweep_end, dims="sweep", attrs={"long_name": "Index of the last ray in the sweep"}, ) # Update sweep number if not done already if "sweep_number" not in ds: ds["sweep_number"] = xr.DataArray( np.arange(num_sweeps), dims="sweep", attrs={"long_name": "Sweep number"} ) return ds def _find_variables_by_coords(ds, required_dims): """ Finds variables in the dataset that have the specified dimensions. Parameters ---------- ds : xarray.Dataset The input dataset. required_dims : set A set of required dimensions (e.g., {"range", "azimuth"}). Returns ------- matching_vars : list List of variable names that have the specified dimensions. 
Example ------- Assuming `ds` is your xarray.Dataset and you want to find variables with both "range" and "azimuth" dimensions vars_with_rng_and_az = find_variables_by_coords(ds, {"range", "azimuth"}) """ matching_vars = [] for var_name, var_data in ds.data_vars.items(): var_dims = set(var_data.dims) if required_dims.issubset(var_dims): matching_vars.append(var_name) return matching_vars def _assign_metadata(ds): field_names = _find_variables_by_coords(ds, {"azimuth", "range"}) field_names = ", ".join(field_names) ds.attrs = { "Conventions": "CF/Radial instrument_parameters", "version": "1.3", "title": "", "institution": "India Meteorological Department", "references": "", "source": "", "comment": "im/radarx", "instrument_name": "", "history": "", "field_names": field_names, } return ds def _determine_nsweeps(files): """ Determine the number of sweeps (nsweeps) and group files accordingly into sweep_groups. If the nsweeps cannot be determined from metadata, the function will detect where the sweep elevations start decreasing to identify the start of a new volume. Parameters ---------- files : list of str List of file paths corresponding to radar sweep files. Returns ------- sweep_groups : list of list of str List of sweep groups where each group contains radar sweep files corresponding to a complete volume. Notes ----- The function first attempts to retrieve the number of sweeps (nsweeps) from the metadata of the files. If it cannot be determined, it uses the elevation angles to detect when the sweep elevations decrease, which indicates the start of a new volume. 
""" import re def _natural_sort_key(s, _re=re.compile(r"(\d+)")): return [int(t) if i & 1 else t.lower() for i, t in enumerate(_re.split(s))] try: # Check if files exist and open the first file to determine nsweeps if len(files): file = files[0] else: logger.warning("No files provided.") return [] ds = xr.open_dataset(file, engine="netcdf4") # Try to retrieve nsweeps from the dataset try: nsweeps = ( ds.attrs.get("nsweeps") or ds.sizes.get("sweep") or ds["sweep"].size or ds["fixed_angle"].size ) except KeyError: logger.error("Could not determine nsweeps from dataset metadata.") raise ValueError("Could not determine nsweeps from dataset metadata.") # Sort the files naturally and group them by nsweeps files = sorted(files, key=_natural_sort_key) sweep_groups = [files[i : i + nsweeps] for i in range(0, len(files), nsweeps)] del ds logger.info( f"Successfully grouped files into {len(sweep_groups)} sweep groups." ) return sweep_groups except Exception as e: # Fallback method if nsweeps cannot be determined from metadata logger.warning(f"Failed to determine nsweeps from metadata: {e}") sweep_groups = [] sweep_times = [] sweep_elevs = [] files = sorted(files, key=_natural_sort_key) # Collect sweep times and elevations for file in files: ds = xr.open_dataset(file, engine="netcdf4") estime = ds.get("esStartTime", ds.get("time_coverage_start", None)) if estime is not None: estime = estime.dt.strftime("%Y-%m-%dT%H:%M:%S").values.item() ele_ang = ds.get("elevationAngle", ds.get("fixed_angle", None)) if ele_ang is not None: ele_ang = ele_ang.values.item() sweep_times.append(estime) sweep_elevs.append(ele_ang if ele_ang is not None else float("nan")) del ds # Determine sweep groups based on elevation increases/decreases current_group = [] for i in range(1, len(sweep_elevs)): current_group.append(files[i - 1]) # Check if the elevation decreases, indicating the start of a new volume if ( not (np.isnan(sweep_elevs[i]) or np.isnan(sweep_elevs[i - 1])) and sweep_elevs[i] < 
sweep_elevs[i - 1] ): logger.info(f"Detected volume change at file index {i}.") sweep_groups.append(current_group) current_group = [] # Append the last group current_group.append(files[-1]) sweep_groups.append(current_group) logger.info(f"Total number of sweep groups determined: {len(sweep_groups)}") return sweep_groups def _determine_volumes(files): """ Determine radar volumes from files. Parameters ---------- files : list of str List of file paths corresponding to radar sweep files. Returns ------- volumes : list of list of xarray.Dataset A list of radar volumes, where each volume is a list of xarray.Dataset objects. Each dataset corresponds to a radar sweep. Notes ----- This function attempts to read radar sweep files using the `read_sweep` function. If `read_sweep` raises an exception, the function falls back to `xarray.open_dataset` for opening the file. """ # Determine sweep lists based on the number of sweeps swp_lists = _determine_nsweeps(files) volumes = [] for swp_list in swp_lists: volume = [] for file in swp_list: try: # Attempt to read sweep using custom `read_sweep` ds = read_sweep(file) except (OSError, ValueError, KeyError) as e: # Log the specific exception with the filename and # fall back to `open_dataset` logger.warning( f"Failed to read sweep from {file} with read_sweep. " f"Error: {e}. Falling back to open_dataset." ) try: ds = xr.open_dataset(file) except Exception as open_error: logger.error( f"Failed to open {file} with open_dataset. Error: {open_error}" ) raise open_error # Raise the exception if fallback also fails volume.append(ds) volumes.append(volume) return volumes def create_volume(dataset_list): """ Concatenate a list of datasets along the 'time' and 'sweep' dimensions. Non-time-varying variables will be retained as single values. Also updates `time_coverage_end` based on the maximum time value. 
You have to give the list of sweeps which are already organized/sorted, for that you can either make it manually, or use `_determine_volumes` function Parameters ---------- dataset_list : list of xarray.Dataset List of datasets representing individual sweeps. Returns ------- xarray.Dataset Concatenated dataset with proper time and sweep dimensions. """ # Identify variables that should be concatenated along the sweep dimension sweep_vars = [ "fixed_angle", "ray_angle_res", "sweep_mode", "scan_type", "sweep_number", "sweep_start_ray_index", "sweep_end_ray_index", ] # Identify variables that are constant across sweeps and should not be concatenated constant_vars = [ "time_coverage_start", "latitude", "longitude", "altitude", "firstGateRange", "gateSize", "nyquist_velocity", "unambiguous_range", "calibConst", "radarConst", "beamWidthHori", "pulseWidth", "bandWidth", "filterDop", "azimuthSpeed", "highPRF", "lowPRF", "dwellTime", "waveLength", "calI0", "calNoise", "groundHeight", "meltHeight", "scanType", "logNoise", "linNoise", "inphaseOffset", "quadratureOffset", "logSlope", "logFilter", "filterPntClt", "filterThreshold", "sampleNum", "SQIThresh", "LOGThresh", "SIGThresh", "CSRThresh", "DBTThreshFlag", "DBZThreshFlag", "VELThreshFlag", "WIDThreshFlag", ] # Step 1: Concatenate the datasets along the time dimension time_concat = xr.concat( [ds.drop_vars(sweep_vars, errors="ignore") for ds in dataset_list], dim="time" ) # Step 2: Handle sweep-specific variables (metadata) sweep_datasets = [] for i, ds in enumerate(dataset_list): sweep_ds = {} for var in sweep_vars: if var in ds: # Expand dims to add a 'sweep' dimension if needed if "sweep" not in ds[var].dims: sweep_ds[var] = ds[var].expand_dims("sweep") else: sweep_ds[var] = ds[var] # Update sweep start and end ray indices sweep_start = i * ds.sizes["time"] sweep_end = sweep_start + ds.sizes["time"] - 1 sweep_ds["sweep_start_ray_index"] = xr.DataArray( np.array([sweep_start]), dims="sweep", attrs={"long_name": "Index of 
the first ray in the sweep"}, ) sweep_ds["sweep_end_ray_index"] = xr.DataArray( np.array([sweep_end]), dims="sweep", attrs={"long_name": "Index of the last ray in the sweep"}, ) # Add the updated sweep metadata dataset to the list sweep_datasets.append(xr.Dataset(sweep_ds)) # Step 3: Concatenate the sweep-specific datasets along the sweep dimension sweep_concat = xr.concat(sweep_datasets, dim="sweep") # Step 4: Merge the time-concatenated data, # sweep-concatenated metadata, and constant variables combined_ds = xr.merge([time_concat, sweep_concat]) # Step 5: Assign constant variables (they do not need to be concatenated) for var in constant_vars: if var in dataset_list[0]: combined_ds[var] = dataset_list[0][var] # Step 6: Update the 'time_coverage_end' variable with the maximum time value combined_ds["time_coverage_end"] = xr.DataArray( data=str( combined_ds.time.max().dt.strftime("%Y-%m-%dT%H:%M:%SZ").values ).encode("utf-8"), attrs={"standard_name": "data_volume_end_time_utc"}, ) return combined_ds # to_cfradial2 function implementation
def to_cfradial2(ds, **kwargs):
    """Convert a CfRadial1 dataset into a CfRadial2 hierarchical DataTree.

    Parameters
    ----------
    ds : xarray.Dataset
        Input dataset in CfRadial1 format.
    **kwargs : dict, optional
        Conversion options:

        - first_dim : str, default 'auto'
            First dimension of the sweeps.
        - optional : bool, default True
            Include optional fields.
        - site_coords : bool, default True
            Include site coordinates.
        - sweep : int or None, default None
            Extract only this sweep.

    Returns
    -------
    DataTree
        CfRadial2 tree with root metadata plus ``/radar_parameters``,
        ``/georeferencing_correction``, ``/radar_calibration``, and one
        ``/sweep_X`` group per sweep (missing groups are omitted).

    Examples
    --------
    >>> dtree = to_cfradial2(ds)

    See Also
    --------
    to_cfradial2_volumes : Convert cfradial1 volume to cfradial2 volume.
    """
    # Pull out the supported conversion options.
    first_dim = kwargs.pop("first_dim", "auto")
    optional = kwargs.pop("optional", True)
    site_coords = kwargs.pop("site_coords", True)
    sweep = kwargs.pop("sweep", None)

    # Root and fixed subgroups, built via the xradar CfRadial1 helpers.
    tree_dict: dict = {
        "/": _get_required_root_dataset(ds, optional=optional),
        "/radar_parameters": _get_subgroup(ds, radar_parameters_subgroup),
        "/georeferencing_correction": _get_subgroup(
            ds, georeferencing_correction_subgroup
        ),
        "/radar_calibration": _get_radar_calibration(ds),
    }

    # One group per sweep.
    sweep_groups = _get_sweep_groups(
        ds,
        sweep=sweep,
        first_dim=first_dim,
        optional=optional,
        site_coords=site_coords,
    )
    tree_dict.update({f"/{name}": data for name, data in sweep_groups.items()})

    return DataTree.from_dict(tree_dict)
[docs] def to_cfradial2_volumes(volumes): """ Convert multiple CfRadial1 volumes to a DataTree containing CfRadial2 structures. This function processes a collection of radar volumes, converting each volume from CfRadial1 format to CfRadial2 using `to_cfradial2`. It organizes these volumes into a single DataTree where each top-level group corresponds to a different radar volume. Parameters ---------- volumes : DataTree A DataTree containing multiple radar volumes. Each child node should represent a radar volume. Returns ------- DataTree A root DataTree object named 'volumes' containing each converted radar volume as a subgroup. Each subgroup (volume_{i}) is structured according to the CfRadial2 format, containing sweeps and other relevant metadata. Examples -------- >>> dtree_root = to_cfradial2_volumes(volumes) >>> print(dtree_root.groups) ['/volume_0', '/volume_1', '/volume_2', ...] >>> print(dtree_root['volume_0'].groups) ['/', '/radar_parameters', '/georeferencing_correction', '/sweep_0', '/sweep_1', ...] Notes ----- - Each radar volume is expected to be a CfRadial1 format dataset. - The resulting structure has a root named 'volumes' with child nodes 'volume_0', 'volume_1', etc., each containing its respective subgroups. See Also -------- to_cfradial2 : Convert a sweep to cfradial2 format """ volumes_list = [] for volume in volumes.children: if "volume_" in volume: vol = volumes[volume].to_dataset() dtree = to_cfradial2(vol) volumes_list.append(dtree) # Create the root DataTree to hold all volumes dtree_root = DataTree(name="volumes") # Iterate over the volumes_list to create the required structure for i, vol_dtree in enumerate(volumes_list): # Create a new DataTree for this volume volume_name = f"volume_{i}" # Attach the entire vol_dtree directly to volume_name dtree_root[volume_name] = vol_dtree return dtree_root