import math
from typing import Union
import logging
import numpy as np
import pandas as pd
import pint
from metobs_toolkit.backend_collection.errorclasses import (
MetObsUnitsIncompatible,
MetObsUnitUnknown,
)
import metobs_toolkit.backend_collection.printing_collection as printing
# Use logger with name "<metobs_toolkit>"
logger = logging.getLogger("<metobs_toolkit>")
# ------------------------------------------
# Setup unit registry
# ------------------------------------------
ureg = pint.UnitRegistry(system="SI")
ureg.formatter.default_format = ".3f"
pint.set_application_registry(ureg)
def fmt_unit_to_str(unit) -> str:
"""
Format a pint unit or quantity to string.
Parameters
----------
unit : pint.Unit or pint.Quantity
The unit or quantity to format.
Returns
-------
str
The formatted unit as string.
"""
logger.debug(f"{fmt_unit_to_str.__name__} called")
if isinstance(unit, pint.Unit):
return str(unit)
if isinstance(unit, pint.Quantity):
if unit.magnitude == 1:
return str(unit.u)
else:
# a non-trivial quantity
return str(unit)
return str(unit)
[docs]
class Obstype:
"""
Class representing an observation type with standard unit and description.
Parameters
----------
obsname : str
Name of the observation type.
std_unit : str or pint.Unit
Standard unit for the observation type.
description : str
Description of the observation type.
"""
[docs]
def __init__(self, obsname: str, std_unit: Union[str, pint.Unit], description: str):
# set name
self._name = str(obsname)
# set standard unit
self._std_unit = _fmtunit(std_unit)
# set description
self._description = str(description)
# open slots
self._original_name = None
self._original_unit = None
def __eq__(self, other) -> bool:
"""Check equality with another Obstype object."""
if not isinstance(other, Obstype):
return False
return (
self._name == other._name
and self._std_unit == other._std_unit
and self._description == other._description
)
@property
def name(self) -> str:
"""Return the name of the observation type."""
return str(self._name)
@property
def std_unit(self) -> str:
"""Return the standard unit as string."""
return fmt_unit_to_str(self._std_unit)
@property
def description(self) -> str:
"""Return the description of the observation type."""
return str(self._description)
@description.setter
def description(self, value: str):
"""Set the description of the observation type."""
self._description = str(value)
@property
def orginal_name(self):
"""Return the original name of the observation type."""
return str(self._original_name)
@orginal_name.setter
def original_name(self, value):
"""Set the original name of the observation type."""
self._original_name = str(value)
[docs]
def get_compatible_units(self) -> list:
"""
Get all units compatible with the standard unit.
Returns
-------
list
List of compatible units as strings.
"""
logger.debug(
f"{self.__class__.__name__}.get_compatible_units called for {self}"
)
# Get all units related to the same dimension
compunits = list(ureg.get_compatible_units(self._std_unit.dimensionality))
# return the units as strings
return [fmt_unit_to_str(uni) for uni in compunits]
[docs]
def is_compatible_with(self, other: "Obstype") -> bool:
"""
Test if the other Obstype is compatible with this one.
Parameters
----------
other : Obstype
The other Obstype to test compatibility with.
Returns
-------
bool
True if compatible, False otherwise.
"""
logger.debug(f"{self.__class__.__name__}.is_compatible_with called for {self}")
return self._std_unit.is_compatible_with(other._std_unit)
@property
def original_unit(self) -> str:
"""Return the original unit as string."""
return fmt_unit_to_str(self._original_unit)
@original_unit.setter
def original_unit(self, value):
"""Set the original unit and check compatibility with standard unit."""
self._original_unit = _fmtunit(value)
# test if it is a compatible unit wrt the standard unit
if not self._original_unit.is_compatible_with(self._std_unit):
raise MetObsUnitsIncompatible(
f"{self._original_unit} is not compatible with the standard unit ({self.std_unit} of {self}) "
)
def __repr__(self):
"""Instance representation."""
return f"{type(self).__name__} instance of {self.name}"
def __str__(self):
"""Text representation."""
return f"{type(self).__name__} instance of {self.name}"
def _get_info_core(self) -> str:
infostr = ""
infostr += printing.print_fmt_line(f"{self.name} observation with:", 0)
infostr += printing.print_fmt_line(f"standard unit: {self.std_unit}")
infostr += printing.print_fmt_line(f"description: {self.description}")
return infostr
[docs]
def get_info(self, printout: bool = True) -> Union[None, str]:
"""
Print or return detailed information of the observation type.
Parameters
----------
printout : bool, optional
If True, print the information. If False, return as string. Default is True.
Returns
-------
None or str
None if printout is True, otherwise the information string.
"""
logger.debug(f"{self.__class__.__name__}.get_info called for {self}")
infostr = ""
infostr += printing.print_fmt_title("General info of Obstype")
infostr += self._get_info_core()
if printout:
print(infostr)
else:
return infostr
def _get_plot_y_label(self) -> str:
"""Return a string to represent the vertical axes of a plot."""
return f"{self.name} ({self.std_unit})"
[docs]
def convert_to_standard_units(self, input_data, input_unit):
"""
Convert input data to the standard unit.
Parameters
----------
input_data : array-like, pd.Series, int, or float
The data to convert.
input_unit : str or pint.Unit
The unit of the input data.
Returns
-------
array-like, pd.Series, int, or float
The converted data in standard units.
"""
logger.debug(
f"{self.__class__.__name__}.convert_to_standard_units called for {self}"
)
# format input unit
input_unit = _fmtunit(input_unit)
# Test if inputunit is compatible with std unit
if not input_unit.is_compatible_with(self._std_unit):
raise MetObsUnitUnknown(
f"{input_unit} is not compatible with the standard unit ({self.std_unit} of {self}) "
)
# convert data
return convert_units(
records=input_data, cur_unit=input_unit, trg_unit=self._std_unit
)
class ModelObstype(Obstype):
"""
Class representing a model observation type, extending Obstype.
Parameters
----------
obstype : Obstype
The base Obstype.
model_unit : str or pint.Unit
The model's unit.
model_band : str
The model's band name.
"""
def __init__(
self, obstype: Obstype, model_unit: Union[str, pint.Unit], model_band: str
):
# set regular obstype
super().__init__(
obsname=obstype.name,
std_unit=obstype.std_unit,
description=obstype.description,
)
# Set modelunit
self._model_unit = _fmtunit(model_unit)
# test if it is a compatible unit wrt the standard unit
if not self._model_unit.is_compatible_with(self._std_unit):
raise MetObsUnitUnknown(
f"{self._model_unit} is not compatible with the standard unit ({self.std_unit} of {self}) "
)
# Set modelband
self._model_band = str(model_band)
@property
def model_unit(self) -> str:
"""Return the model unit as string."""
return str(self._model_unit)
@property
def model_band(self) -> str:
"""Return the model band as string."""
return str(self._model_band)
def get_info(self, printout: bool = True) -> Union[None, str]:
"""
Print or return detailed information of the model observation type.
Parameters
----------
printout : bool, optional
If True, print the information. If False, return as string. Default is True.
Returns
-------
None or str
None if printout is True, otherwise the information string.
"""
logger.debug(f"{self.__class__.__name__}.get_info called for {self}")
infostr = ""
infostr += printing.print_fmt_title("General info of ModelObstype")
infostr += printing.print_fmt_section("Obstype info")
infostr += super()._get_info_core()
infostr += printing.print_fmt_section("Model related info")
infostr += printing.print_fmt_line(f"corresponding bandname: {self.model_band}")
infostr += printing.print_fmt_line(
f"original modeldata unit: {self.model_unit}"
)
if printout:
print(infostr)
else:
return infostr
class ModelObstype_Vectorfield(Obstype):
"""
Class representing a model observation type for vector fields.
Parameters
----------
obstype : Obstype
The base Obstype.
model_unit : str or pint.Unit
The model's unit.
model_band_u : str
The model's U-component band name.
model_band_v : str
The model's V-component band name.
amplitude_obstype_name : str
Name for the amplitude obstype.
direction_obstype_name : str
Name for the direction obstype.
"""
def __init__(
self,
obstype: Obstype,
model_unit: Union[str, pint.Unit],
model_band_u: str,
model_band_v: str,
amplitude_obstype_name: str,
direction_obstype_name: str,
):
# set regular obstype
super().__init__(
obsname=obstype.name,
std_unit=obstype.std_unit,
description=obstype.description,
)
# Set modelunit
self._model_unit = _fmtunit(model_unit)
# test if it is a compatible unit wrt the standard unit
if not self._model_unit.is_compatible_with(self._std_unit):
raise MetObsUnitUnknown(
f"{self._model_unit} is not compatible with the standard unit ({self.std_unit} of {self}) "
)
# Set bandnames
self._model_band_u = str(model_band_u)
self._model_band_v = str(model_band_v)
self._amp_obs_name = str(amplitude_obstype_name)
self._dir_obs_name = str(direction_obstype_name)
@property
def model_unit(self) -> str:
"""Return the model unit as string."""
return str(self._model_unit)
@property
def model_band_u(self) -> str:
"""Return the model U-component band as string."""
return str(self._model_band_u)
@property
def model_band_v(self) -> str:
"""Return the model V-component band as string."""
return str(self._model_band_v)
@property
def amplitude_obstype_name(self) -> str:
"""Return the amplitude obstype name as string."""
return str(self._amp_obs_name)
@property
def direction_obstype_name(self) -> str:
"""Return the direction obstype name as string."""
return str(self._dir_obs_name)
def get_info(self, printout: bool = True) -> Union[None, str]:
"""
Print or return detailed information of the vectorfield model observation type.
Parameters
----------
printout : bool, optional
If True, print the information. If False, return as string. Default is True.
Returns
-------
None or str
None if printout is True, otherwise the information string.
"""
logger.debug(f"{self.__class__.__name__}.get_info called for {self}")
infostr = ""
infostr += printing.print_fmt_title("General info of ModelObstype_Vectorfield")
infostr += printing.print_fmt_section("Obstype info")
infostr += super()._get_info_core()
infostr += printing.print_fmt_section("Model related info")
infostr += printing.print_fmt_line(f"U-component bandname: {self.model_band_u}")
infostr += printing.print_fmt_line(f"in {self.model_unit}", 2)
infostr += printing.print_fmt_line(f"V-component bandname: {self.model_band_v}")
infostr += printing.print_fmt_line(f"in {self.model_unit}", 2)
if printout:
print(infostr)
else:
return infostr
def _get_plot_y_label(self) -> str:
"""Return a string to represent the vertical axes of a plot."""
return f"{self.name} ({self.std_unit})\n originates from {self.original_name}"
def _compute_angle(self, df: pd.DataFrame) -> Union[pd.DataFrame, ModelObstype]:
"""
Compute vector direction of 2D vectorfield components.
The direction column is added to the dataframe and a new ModelObstype,
representing the angle is returned. The values represent the angles in
degrees, from north in clockwise rotation.
Parameters
----------
df : pandas.DataFrame
The dataframe with the vector components present as columns.
Returns
-------
data : pandas.DataFrame
The df with an extra column representing the directions.
amplitude_obstype : ModelObstype
The (scalar) ModelObstype representation of the angles.
"""
logger.debug(f"{self.__class__.__name__}._compute_angle called for {self}")
def unit_vector(vector):
"""Returns the unit vector of the vector."""
return vector / np.linalg.norm(vector)
def angle_between(u_comp, v_comp):
"""Returns the angle in ° from North (CW) from 2D Vector components."""
v2 = (u_comp, v_comp)
v1_u = unit_vector((0, 1)) # North unit arrow
v2_u = unit_vector(v2)
angle_rad = np.arccos(np.clip(np.dot(v1_u, v2_u), -1.0, 1.0))
angle_degrees = angle_rad * ((180.0 / math.pi))
# fix the quadrants
if (v2[0] >= 0) & (v2[1] >= 0):
# N-E quadrant
return angle_degrees
if (v2[0] >= 0) & (v2[1] < 0):
# S-E quadrant
return angle_degrees
if (v2[0] < 0) & (v2[1] < 0):
# S-W quadrant
return 180.0 + (180.0 - angle_degrees)
if (v2[0] < 0) & (v2[1] >= 0):
# N-W quadrant
return 360.0 - angle_degrees
u_column = self.model_band_u
v_column = self.model_band_v
data = df.apply(lambda x: angle_between(x[u_column], x[v_column]), axis=1)
# Create a new obstype for the direction
direction_obstype = Obstype(
obsname=self.direction_obstype_name,
std_unit=ureg.degree,
description=f"Direction of 2D-vector of {self.name} components.",
)
# convert to model obstype
direction_modelobstype = ModelObstype(
obstype=direction_obstype,
model_unit=ureg.degree, # indep of units
model_band=self.direction_obstype_name, # NOTE: this band does not exist, but column is created with this name by the toolkit
)
direction_modelobstype._originates_from_vectorfield = True
return data, direction_modelobstype
def compute_amplitude(self, df: pd.DataFrame) -> Union[pd.DataFrame, ModelObstype]:
"""
Compute amplitude of 2D vectorfield components.
The amplitude column is added to the dataframe and a new ModelObstype,
representing the amplitude is returned. All attributes with respect to the units are
inherited from the ModelObstype_Vectorfield.
Parameters
----------
df : pandas.DataFrame
The dataframe with the vector components present as columns.
Returns
-------
data : pandas.DataFrame
The df with an extra column representing the amplitudes.
amplitude_obstype : ModelObstype
The (scalar) ModelObstype representation of the amplitudes.
"""
logger.debug(f"{self.__class__.__name__}.compute_amplitude called for {self}")
# Compute the data
data = ((df[self.model_band_u].pow(2)) + (df[self.model_band_v].pow(2))).pow(
1.0 / 2
)
# Create a new Obstype for the amplitude
amplitude_obstype = Obstype(
obsname=self.amplitude_obstype_name,
std_unit=self.std_unit,
description=f"2D-vector amplitude of {self.name} components.",
)
# convert to model obstype
amplitude_modelobstype = ModelObstype(
obstype=amplitude_obstype,
model_unit=self.model_unit,
model_band=self.amplitude_obstype_name,
) # NOTE: this band does not exist, but column is created with this name by the toolkit
amplitude_modelobstype._originates_from_vectorfield = True
return data, amplitude_modelobstype
# ------------------------------------------
# Helpers
# ------------------------------------------
def is_known_unit(unit: str) -> bool:
"""
Check if a unit string is known to the unit registry.
Parameters
----------
unit : str
The unit string to check.
Returns
-------
bool
True if known, False otherwise.
"""
logger.debug("is_known_unit called")
try:
ureg.parse_expression(unit)
return True
except pint.errors.UndefinedUnitError:
return False
def _fmtunit(value) -> pint.Unit:
"""
Convert unit input to pint.Unit.
Parameters
----------
value : str, pint.Unit, or pint.Quantity
The value to convert.
Returns
-------
pint.Unit
The formatted unit.
Raises
------
MetObsUnitUnknown
If the value cannot be converted to a known unit.
"""
logger.debug("_fmtunit called")
if isinstance(value, pint.Unit):
return value
elif isinstance(value, pint.Quantity):
if value.magnitude == 1:
return value.u
else:
raise MetObsUnitUnknown(
f"{value} is a pint.Quantity with non-1 magnitude and cannot be converted to a unit."
)
elif isinstance(value, str):
if not is_known_unit(value):
raise MetObsUnitUnknown(
f"{value} is not a known unit. (See https://github.com/hgrecco/pint/blob/master/pint/default_en.txt for all known units.)"
)
else:
return ureg.parse_expression(value)
else:
raise MetObsUnitUnknown(f"{value} is not a string or pint.Unit.")
def convert_units(records, cur_unit, trg_unit):
"""
Convert records from one unit to another.
Parameters
----------
records : pd.Series, np.ndarray, int, or float
The data to convert.
cur_unit : str or pint.Unit
The current unit of the data.
trg_unit : str or pint.Unit
The target unit to convert to.
Returns
-------
pd.Series, np.ndarray, int, or float
The converted data.
"""
logger.debug(f"Converting data from {cur_unit} --> {trg_unit} ")
if isinstance(records, pd.Series):
trgvalues = ureg.Quantity(records.to_numpy(), cur_unit).to(trg_unit)
return pd.Series(
index=records.index, data=trgvalues.magnitude, name=records.name
)
elif isinstance(records, np.ndarray):
return ureg.Quantity(records, cur_unit).to(trg_unit).magnitude
elif isinstance(records, int):
return ureg.Quantity(records, cur_unit).to(trg_unit).magnitude
elif isinstance(records, float):
return ureg.Quantity(records, cur_unit).to(trg_unit).magnitude
else:
raise NotImplementedError(f"{records} is not a supported input type.")
# ------------------------------------------
# Default obstypes
# ------------------------------------------
temperature = Obstype(
obsname="temp", std_unit=ureg.degC, description="2m - temperature"
)
humidity = Obstype(
obsname="humidity",
std_unit=ureg.percent,
description="2m - relative humidity",
)
radiation_temp = Obstype(
obsname="radiation_temp",
std_unit=ureg.degC,
description="2m - Black globe",
)
pressure = Obstype(
obsname="pressure",
std_unit=ureg.hectopascal,
description="atmospheric pressure (at station)",
)
pressure_at_sea_level = Obstype(
obsname="pressure_at_sea_level",
std_unit=ureg.hectopascal,
description="atmospheric pressure (at sea level)",
)
precip = Obstype(
obsname="precip",
std_unit=ureg.mm / (ureg.meter * ureg.meter),
description="precipitation intensity",
)
precip_sum = Obstype(
obsname="precip_sum",
std_unit=ureg.mm / (ureg.meter * ureg.meter),
description="Cummulated precipitation",
)
wind_speed = Obstype(
obsname="wind_speed",
std_unit=ureg.meter / ureg.second,
description="wind speed",
)
windgust = Obstype(
obsname="wind_gust",
std_unit=ureg.meter / ureg.second,
description="wind gust",
)
wind_direction = Obstype(
obsname="wind_direction",
std_unit=ureg.degree,
description="wind direction",
)
tlk_obstypes = {
"temp": temperature,
"humidity": humidity,
"radiation_temp": radiation_temp,
"pressure": pressure,
"pressure_at_sea_level": pressure_at_sea_level,
"precip": precip,
"precip_sum": precip_sum,
"wind_speed": wind_speed,
"wind_gust": windgust,
"wind_direction": wind_direction,
}
temp_era5 = ModelObstype(
obstype=temperature, model_unit=ureg.degK, model_band="temperature_2m"
)
temp_era5.description = "Temperature of air at 2m above the surface of land, sea or in-land waters. 2m temperature is calculated by interpolating between the lowest model level and the Earth's surface, taking account of the atmospheric conditions."
pressure_era5 = ModelObstype(
obstype=pressure, model_unit="pascal", model_band="surface_pressure"
)
pressure_era5.description = (
"Pressure (force per unit area) of the atmosphere on the surface of land, sea and in-land water. It is a measure of the weight of all the air in a column vertically above the area of the Earth's surface represented at a fixed point. Surface pressure is often used in combination with temperature to calculate air density. The strong variation of pressure with altitude makes it difficult to see the low and high pressure systems over mountainous areas, so mean sea level pressure, rather than surface pressure, is normally used for this purpose. The units of this variable are Pascals (Pa). Surface pressure is often measured in hPa and sometimes is presented in the old units of millibars, mb (1 hPa = 1 mb = 100 Pa).",
)
# create a new obstype that represent the vectorfield of wind
wind_components = Obstype(
"wind",
std_unit=wind_speed.std_unit,
description=None,
)
wind_era5 = ModelObstype_Vectorfield(
obstype=wind_components,
model_unit=ureg.meter / ureg.second,
model_band_u="u_component_of_wind_10m",
model_band_v="v_component_of_wind_10m",
amplitude_obstype_name="wind_speed",
direction_obstype_name="wind_direction",
)
wind_era5.description = "2D-vector combined 10m windspeed. Care should be taken when comparing this variable with observations, because wind observations vary on small space and time scales and are affected by the local terrain, vegetation and buildings that are represented only on average in the ECMWF Integrated Forecasting System."
default_era5_obstypes = [temp_era5, pressure_era5, wind_era5]