Source code for metobs_toolkit.qc_collection.whitelist

import logging

import pandas as pd
from typing import List, Union

from metobs_toolkit.settings_collection import Settings
import metobs_toolkit.backend_collection.printing_collection as printing
from metobs_toolkit.backend_collection.datetime_collection import (
    timestamps_to_datetimeindex,
    convert_timezone,
)
from metobs_toolkit.backend_collection.errorclasses import (
    MetObsArgumentError,
)
from metobs_toolkit.backend_collection.decorators import log_entry

logger = logging.getLogger("<metobs_toolkit>")


class SensorWhiteSet:
    """Whitelist container for a single sensor (station-obstype combination).

    This class manages whitelisted timestamps for a specific sensor, allowing
    certain observations to be excluded from outlier detection in QC checks.

    Parameters
    ----------
    white_timestamps : list, optional
        List of datetime objects to whitelist. ``None`` (the default) is
        treated as an empty list.
    all_timestamps : bool, optional
        If True, all timestamps are whitelisted. Default is False.

    Raises
    ------
    MetObsArgumentError
        If ``all_timestamps`` is True while ``white_timestamps`` is not empty.
    """

    # Class variable for internal timezone storage
    _target_tz: str = Settings.get("store_tz")

    def __init__(
        self, white_timestamps: Union[None, List] = None, all_timestamps: bool = False
    ) -> None:
        """Store the whitelist definition after checking it is consistent."""
        if white_timestamps is None:
            # None as a default is more convenient
            white_timestamps = []

        # NOTE: keep the explicit len() test. `white_timestamps` may be a
        # pandas index (see WhiteSet.create_sensorwhitelist), and the
        # truthiness of a pandas index is ambiguous.
        if all_timestamps and len(white_timestamps) != 0:
            raise MetObsArgumentError(
                "If all_timestamps is True, white_timestamps must be empty."
            )

        self.white_timestamps = white_timestamps
        self.all_timestamps = all_timestamps

        logger.debug(
            "Initialized SensorWhiteSet: all_timestamps=%s, n_timestamps=%s",
            all_timestamps,
            len(white_timestamps),
        )

    def __repr__(self) -> str:
        """Return a string representation for debugging.

        Returns
        -------
        str
            String representation showing whitelist status.
        """
        if self.all_timestamps:
            return f"{type(self).__name__}(all_timestamps=True)"
        return f"{type(self).__name__}(n_timestamps={len(self.white_timestamps)})"

    def __str__(self) -> str:
        """Return a string representation for printing.

        Returns
        -------
        str
            String representation of the SensorWhiteSet.
        """
        return self.__repr__()

    def _fmt_for_xr_attr(self) -> str:
        """Format whitelist information for inclusion in xarray attributes.

        Returns
        -------
        str
            Formatted string summarizing the whitelist status.
        """
        # The "empty" message applies only when nothing is whitelisted.
        if not self._flag_has_whites():
            return "Empty SensorWhiteSet (no whitelisted timestamps)"
        if self.all_timestamps:
            return "All timestamps whitelisted"
        return f"{len(self.white_timestamps)} whitelisted timestamps"

    def _flag_has_whites(self) -> bool:
        """Check if any timestamps are whitelisted.

        Returns
        -------
        bool
            True if any timestamps are whitelisted, False otherwise.
        """
        return bool(self.all_timestamps) or len(self.white_timestamps) > 0

    def _flag_all_timestamps_are_whites(self) -> bool:
        """Check if all timestamps are whitelisted.

        Returns
        -------
        bool
            True if all timestamps are whitelisted, False otherwise.
        """
        return self.all_timestamps

    def _get_white_timestamps(self) -> pd.DatetimeIndex:
        """Get whitelisted timestamps as a DatetimeIndex.

        Returns
        -------
        pd.DatetimeIndex
            DatetimeIndex (named 'datetime') containing all whitelisted
            timestamps, converted to the internal storage timezone.
        """
        dtindex = timestamps_to_datetimeindex(
            timestamps=self.white_timestamps,
            current_tz=None,  # self.white_timestamps is already tz aware
            name="datetime",
        )
        return convert_timezone(dtindex, target_tz=SensorWhiteSet._target_tz)

    def catch_white_records(self, outliers_idx: pd.DatetimeIndex) -> pd.DatetimeIndex:
        """Remove whitelisted timestamps from outliers index.

        Filters the provided outliers index by removing any timestamps that
        are whitelisted in this SensorWhiteSet.

        Parameters
        ----------
        outliers_idx : pd.DatetimeIndex
            Index of outlier timestamps to filter.

        Returns
        -------
        pd.DatetimeIndex
            Filtered outliers index with whitelisted records removed.
        """
        has_whites = self._flag_has_whites()
        logger.debug(
            "Filtering %s outliers with SensorWhiteSet (has_whites=%s)",
            len(outliers_idx),
            has_whites,
        )

        if not has_whites:
            # no whites: nothing to filter, hand the input back untouched
            logger.debug(
                "No whitelist applied, all %s outliers retained", len(outliers_idx)
            )
            return outliers_idx

        if self._flag_all_timestamps_are_whites():
            # all timestamps are white, return empty index
            logger.debug(
                "All timestamps whitelisted, removing all %s outliers",
                len(outliers_idx),
            )
            return timestamps_to_datetimeindex(
                [], name="datetime", current_tz=SensorWhiteSet._target_tz
            )

        # Remove the explicitly whitelisted timestamps from the outliers
        white_records = self._get_white_timestamps()
        outliers = outliers_idx.difference(white_records)
        logger.debug(
            "Filtered out %s whitelisted outliers, %s outliers remain",
            len(outliers_idx) - len(outliers),
            len(outliers),
        )
        return outliers
class WhiteSet:
    """Whitelist container for multiple stations and observation types.

    A WhiteSet holds a pandas Index or MultiIndex whose levels may be 'name'
    (station), 'obstype' and/or 'datetime'. The records it describes are
    meant to be excluded from outlier detection in QC checks.

    Parameters
    ----------
    white_records : pd.Index, optional
        Index with levels 'name', 'obstype', and/or 'datetime' defining
        whitelisted records. Default is an empty Index.

    Notes
    -----
    * A non-empty white_records index must contain at least one of 'name',
      'obstype' or 'datetime' as level names, and no other level names.
      If 'datetime' is not present, all timestamps for matching
      station/obstype combinations are whitelisted.
    * **Timezone handling**: if a 'datetime' level is present, its values are
      passed through ``timestamps_to_datetimeindex`` with ``current_tz="UTC"``
      and then converted to the storage timezone (the ``store_tz`` setting).
      It is strongly recommended to provide timezone-aware timestamps to
      avoid ambiguity.
    """

    # Class variable for internal timezone storage
    _target_tz: str = Settings.get("store_tz")

    def __init__(self, white_records: pd.Index = pd.Index([])) -> None:
        self.white_records = white_records

        # First make sure the index levels are acceptable ...
        self._self_test_white_records()
        # ... then normalise the timezone of the 'datetime' level (if any)
        self._fmt_datetimes()

    def __repr__(self) -> str:
        """Return a string representation for debugging.

        Returns
        -------
        str
            The class name followed by '(empty)' when there are no records,
            otherwise by the number of records and the index level names.
        """
        if self._flag_is_empty():
            return f"{type(self).__name__}(empty)"

        record_count = len(self.white_records)
        level_names = list(self.white_records.names)
        return f"{type(self).__name__}(n_records={record_count}, levels={level_names})"

    def __str__(self) -> str:
        """Return a string representation for printing.

        Returns
        -------
        str
            Same text as ``__repr__``.
        """
        return self.__repr__()

    def _fmt_for_xr_attr(self) -> str:
        """Format whitelist information for inclusion in xarray attributes.

        Returns
        -------
        str
            A fixed message for an empty WhiteSet, else the ``__repr__`` text.
        """
        return (
            "Empty WhiteSet (no whitelisted records)"
            if self._flag_is_empty()
            else self.__repr__()
        )

    def _fmt_datetimes(self) -> None:
        """Normalise the timezone of the 'datetime' index level.

        Does nothing when the WhiteSet is empty or has no 'datetime' level.
        Otherwise the datetime values are rebuilt as a timezone-aware index
        in the storage timezone and ``self.white_records`` is replaced by an
        index carrying the converted values.
        """
        if self._flag_is_empty() or "datetime" not in self.white_records.names:
            return None

        raw_datetimes = self.white_records.get_level_values("datetime")

        # Convert to tz-aware Datetimeindex (tz is localized to UTC if naive,
        # else it is converted)
        aware_datetimes = timestamps_to_datetimeindex(
            timestamps=raw_datetimes, current_tz="UTC", name="datetime"
        )
        aware_datetimes = convert_timezone(
            aware_datetimes, target_tz=WhiteSet._target_tz
        )

        if not isinstance(self.white_records, pd.MultiIndex):
            # A plain index can only be the datetime level itself
            self.white_records = aware_datetimes
        else:
            # Strip the old 'datetime' level and append the converted one
            other_levels = self.white_records.droplevel("datetime")
            frame = other_levels.to_frame()
            frame = frame.assign(datetime=aware_datetimes)
            self.white_records = frame.set_index("datetime", append=True).index

        logger.debug("Datetime formatting completed")

    def _self_test_white_records(self) -> None:
        """Validate the level names of the white_records index.

        An empty index is always accepted. A non-empty index must use at
        least one of the level names 'name', 'obstype' or 'datetime', and
        must not use any other level name.

        Raises
        ------
        ValueError
            If none of 'name', 'obstype' or 'datetime' is an index level.
        ValueError
            If an index level other than those three is present.
        """
        if self._flag_is_empty():
            return None

        allowed_levels = ["name", "obstype", "datetime"]
        present_levels = self.white_records.names

        logger.debug("Validating WhiteSet structure with levels: %s", present_levels)

        # None of the expected levels is present
        if all(level not in present_levels for level in allowed_levels):
            logger.debug("Validation failed: missing required index levels")
            raise ValueError(
                "white_records must contain at least one of the following index levels: 'name', 'obstype', 'datetime'"
            )

        # At least one level is not among the expected ones
        if any(level not in allowed_levels for level in present_levels):
            logger.debug("Validation failed: unexpected index levels found")
            raise ValueError(
                "white_records contains unexpected index levels. Only 'name', 'obstype', and 'datetime' are allowed."
            )

        logger.debug("WhiteSet validation passed")

    def _flag_is_empty(self) -> bool:
        """Check if white_records is empty.

        Returns
        -------
        bool
            True if white_records is empty, False otherwise.
        """
        return self.white_records.empty

    @log_entry
    def get_info(self, printout: bool = True) -> Union[str, None]:
        """Retrieve and optionally print detailed information about the WhiteSet.

        Parameters
        ----------
        printout : bool, optional
            If True, prints the information to the console. If False, returns
            the information as a string. Default is True.

        Returns
        -------
        str or None
            The information string if `printout` is False, otherwise None.
        """
        parts = [
            printing.print_fmt_title("General info of WhiteSet"),
            printing.print_fmt_section("Whitelist details"),
        ]

        if self._flag_is_empty():
            parts.append(
                printing.print_fmt_line("Empty WhiteSet (no whitelisted records)")
            )
        else:
            # Basic information
            n_records = len(self.white_records)
            parts.append(printing.print_fmt_line(f"Total records: {n_records}"))

            # Index levels information
            levels = [lvl for lvl in self.white_records.names if lvl is not None]
            parts.append(
                printing.print_fmt_line(f"Index levels: {', '.join(levels)}")
            )

            # Unique values per level
            if "name" in levels:
                name_values = self.white_records.get_level_values("name")
                n_stations = name_values.nunique()
                stations = sorted(name_values.unique())
                parts.append(
                    printing.print_fmt_line(
                        f"Stations ({n_stations}): {', '.join(map(str, stations))}"
                    )
                )

            if "obstype" in levels:
                obstype_values = self.white_records.get_level_values("obstype")
                n_obstypes = obstype_values.nunique()
                obstypes = sorted(obstype_values.unique())
                parts.append(
                    printing.print_fmt_line(
                        f"Observation types ({n_obstypes}): {', '.join(map(str, obstypes))}"
                    )
                )

            if "datetime" in levels:
                datetimes = self.white_records.get_level_values("datetime")
                n_times = datetimes.nunique()
                parts.append(printing.print_fmt_line(f"Unique timestamps: {n_times}"))

                # Time range information
                min_time = datetimes.min()
                max_time = datetimes.max()
                parts.append(
                    printing.print_fmt_line(
                        f"Time range: {min_time} to {max_time}", 1
                    )
                )
            else:
                parts.append(
                    printing.print_fmt_line(
                        "All timestamps whitelisted (no 'datetime' level)"
                    )
                )

        infostr = "".join(parts)
        if not printout:
            return infostr
        print(infostr)
        return None

    @log_entry
    def create_sensorwhitelist(self, stationname: str, obstype: str) -> SensorWhiteSet:
        """Create a sensor-specific whitelist for a station and observation type.

        The white_records are narrowed down, level by level, to the entries
        matching the requested station and observation type. What is left
        determines the returned SensorWhiteSet.

        Parameters
        ----------
        stationname : str
            Target station name to filter for.
        obstype : str
            Target observation type to filter for.

        Returns
        -------
        SensorWhiteSet
            Whitelist for the requested station/obstype combination. It is
            empty when the WhiteSet is empty or when a present 'name' or
            'obstype' level has no matching entry.

        Notes
        -----
        If the white_records index does not contain a 'datetime' level but
        does match the station/obstype, all timestamps are whitelisted for
        that sensor.
        """
        logger.debug(
            "Creating SensorWhiteSet for station='%s', obstype='%s'",
            stationname,
            obstype,
        )

        if self._flag_is_empty():
            logger.debug("WhiteSet is empty, returning empty SensorWhiteSet")
            return SensorWhiteSet(white_timestamps=None, all_timestamps=False)

        subset = self.white_records

        # Narrow down on the station name, when that level exists
        if "name" in subset.names:
            name_values = subset.get_level_values("name")
            if stationname not in name_values:
                # name is specified, but no matches in whitelist
                logger.debug(
                    "Station '%s' not found in whitelist, returning empty SensorWhiteSet",
                    stationname,
                )
                return SensorWhiteSet(white_timestamps=None, all_timestamps=False)

            subset = subset[name_values == stationname]
            logger.debug(
                "Filtered whitelist by station name, %s records remain",
                len(subset),
            )

        # Narrow down on the obstype, when that level exists
        if "obstype" in subset.names:
            obstype_values = subset.get_level_values("obstype")
            if obstype not in obstype_values:
                # obstype is specified, but no matches in whitelist
                logger.debug(
                    "Obstype '%s' not found in whitelist, returning empty SensorWhiteSet",
                    obstype,
                )
                return SensorWhiteSet(white_timestamps=None, all_timestamps=False)

            subset = subset[obstype_values == obstype]
            logger.debug(
                "Filtered whitelist by obstype, %s records remain",
                len(subset),
            )

        if "datetime" not in subset.names:
            # if no datetime level is set, and name and/or obstype match,
            # all timestamps are white
            logger.debug(
                "No datetime level in whitelist, all timestamps whitelisted for this sensor"
            )
            return SensorWhiteSet(white_timestamps=None, all_timestamps=True)

        # Only specific timestamps are whitelisted for this sensor
        white_datetimes = pd.DatetimeIndex(
            subset.get_level_values("datetime").unique()
        )
        logger.debug(
            "Created SensorWhiteSet with %s unique timestamps", len(white_datetimes)
        )
        return SensorWhiteSet(white_timestamps=white_datetimes, all_timestamps=False)