#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
This module contains the Dataset class and all its methods.
A Dataset holds all observations and is at the center of the
MetObs-toolkit.
"""
import os
import sys
import copy
from datetime import timedelta
import pytz
import logging
import pandas as pd
import numpy as np
import pickle
from metobs_toolkit.settings import Settings
from metobs_toolkit.data_import import import_data_from_csv, import_metadata_from_csv
from metobs_toolkit.template import Template
from metobs_toolkit.printing import print_dataset_info
from metobs_toolkit.landcover_functions import (
connect_to_gee,
lcz_extractor,
height_extractor,
lc_fractions_extractor,
_validate_metadf,
)
from metobs_toolkit.plotting_functions import (
geospatial_plot,
timeseries_plot,
qc_stats_pie,
folium_plot,
add_stations_to_folium_map,
make_folium_html_plot,
)
from metobs_toolkit.qc_checks import (
gross_value_check,
persistance_check,
repetitions_check,
duplicate_timestamp_check,
step_check,
window_variation_check,
invalid_input_check,
toolkit_buddy_check,
titan_buddy_check,
titan_sct_resistant_check,
)
from metobs_toolkit.qc_statistics import get_freq_statistics
from metobs_toolkit.writing_files import write_dataset_to_csv
from metobs_toolkit.missingobs import Missingob_collection
from metobs_toolkit.gap import (
Gap,
remove_gaps_from_obs,
remove_gaps_from_outliers,
missing_timestamp_and_gap_check,
get_gaps_indx_in_obs_space,
get_station_gaps,
apply_interpolate_gaps,
make_gapfill_df,
apply_debias_era5_gapfill,
gaps_to_df,
)
from metobs_toolkit.df_helpers import (
multiindexdf_datetime_subsetting,
fmt_datetime_argument,
init_multiindex,
init_multiindexdf,
init_triple_multiindexdf,
metadf_to_gdf,
conv_applied_qc_to_df,
get_freqency_series,
value_labeled_doubleidxdf_to_triple_idxdf,
xs_save,
concat_save,
)
from metobs_toolkit.obstypes import tlk_obstypes
from metobs_toolkit.obstypes import Obstype as Obstype_class
from metobs_toolkit.analysis import Analysis
from metobs_toolkit.modeldata import Modeldata
from metobs_toolkit.datasetbase import _DatasetBase
logger = logging.getLogger(__name__)
# =============================================================================
# Dataset class
# =============================================================================
class Dataset(_DatasetBase):
"""Objects holding observations and methods on observations."""
[docs]
def __init__(self):
"""Construct all the necessary attributes for Dataset object."""
logger.info("Initialise dataset")
_DatasetBase.__init__(self) # holds df, metadf, obstypes and settings
# Dataset with outlier observations
self.outliersdf = init_triple_multiindexdf()
self.missing_obs = None # becomes a Missingob_collection after import
self.gaps = None # becomes a list of gaps
self.gapfilldf = init_multiindexdf()
self.missing_fill_df = init_multiindexdf()
# Template for mapping data and metadata
self.template = Template()
self._istype = "Dataset"
self._freqs = pd.Series(dtype=object)
self._applied_qc = pd.DataFrame(columns=["obstype", "checkname"])
self._qc_checked_obstypes = [] # list with qc-checked obstypes
def __str__(self):
"""Represent as text."""
if self.df.empty:
if self._istype == "Dataset":
return "Empty instance of a Dataset."
elif self._istype == "Station":
return "Empty instance of a Station."
else:
return "Empty instance of a Analysis."
add_info = ""
n_stations = self.df.index.get_level_values("name").unique().shape[0]
n_obs_tot = self.df.shape[0]
n_outl = self.outliersdf.shape[0]
startdt = self.df.index.get_level_values("datetime").min()
enddt = self.df.index.get_level_values("datetime").max()
if (not self.metadf["lat"].isnull().all()) & (
not self.metadf["lon"].isnull().all()
):
add_info += " *Coordinates are available for all stations."
return (
f"{self._istype} instance containing: \n \
*{n_stations} stations \n \
*{self.df.columns.to_list()} observation types \n \
*{n_obs_tot} observation records \n \
*{n_outl} records labeled as outliers \n \
*{len(self.gaps)} gaps \n \
*{self.missing_obs.series.shape[0]} missing observations \n \
*records range: {startdt} --> {enddt} (total duration: {enddt - startdt}) \n \
*time zone of the records: {self.settings.time_settings['timezone']} \n "
+ add_info
)
def __repr__(self):
"""Info representation."""
return self.__str__()
def __add__(self, other, gapsize=None):
"""Addition of two Datasets."""
# important !!!!!
# the toolkit makes a new dataframe, and assumes the df from self and other
# to be the input data.
# This means that missing obs, gaps, invalid and duplicated records are
# being looked for in the concatenation of both dataset, using their current
# resolution !
new = Dataset()
self_obstypes = self.df.columns.to_list().copy()
# ---- df ----
# check if observation of self are also in other
assert all([(obs in other.df.columns) for obs in self_obstypes])
# subset obstype of other to self
other.df = other.df[self.df.columns.to_list()]
# remove duplicate rows
common_indexes = self.df.index.intersection(other.df.index)
other.df = other.df.drop(common_indexes)
# set new df
new.df = concat_save([self.df, other.df])
new.df = new.df.sort_index()
# ----- outliers df ---------
other_outliers = other.outliersdf.reset_index()
other_outliers = other_outliers[other_outliers["obstype"].isin(self_obstypes)]
other_outliers = other_outliers.set_index(["name", "datetime", "obstype"])
new.outliersdf = concat_save([self.outliersdf, other_outliers])
new.outliersdf = new.outliersdf.sort_index()
# ------- Gaps -------------
# Gaps have to be recaluculated using a frequency assumtion from the
# combination of self.df and other.df, thus NOT the native frequency if
# their is a coarsening allied on either of them.
new.gaps = []
# ---------- missing ---------
# Missing observations have to be recaluculated using a frequency assumtion from the
# combination of self.df and other.df, thus NOT the native frequency if
# their is a coarsening allied on either of them.
new.missing_obs = None
# ---------- metadf -----------
# Use the metadf from self and add new rows if they are present in other
new.metadf = concat_save([self.metadf, other.metadf])
new.metadf = new.metadf.drop_duplicates(keep="first")
new.metadf = new.metadf.sort_index()
# ------- specific attributes ----------
# Template (units and descritpions) are taken from self
new.template = self.template
# Inherit Settings from self
new.settings = copy.deepcopy(self.settings)
# Applied qc:
# TODO: is this oke to do?
new._applied_qc = pd.DataFrame(columns=["obstype", "checkname"])
new._qc_checked_obstypes = [] # list with qc-checked obstypes
# set init_dataframe to empty
# NOTE: this is not necesarry but users will use this method when they
# have a datafile that is to big. So storing and overloading a copy of
# the very big datafile is invalid for these cases.
new.input_df = pd.DataFrame()
# ----- Apply IO QC ---------
# Apply only checks that are relevant on records in between self and other
# OR
# that are dependand on the frequency (since the freq of the .df is used,
# which is not the naitive frequency if coarsening is applied on either. )
# missing and gap check
if gapsize is None:
gapsize = new.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"]
# note gapsize is now defined on the frequency of self
new.missing_obs, new.gaps = missing_timestamp_and_gap_check(
df=new.df,
gapsize_n=self.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"],
)
# duplicate check
new.df, dup_outl_df = duplicate_timestamp_check(
df=new.df,
checks_info=new.settings.qc["qc_checks_info"],
checks_settings=new.settings.qc["qc_check_settings"],
)
if not dup_outl_df.empty:
new.update_outliersdf(add_to_outliersdf=dup_outl_df)
# update the order and which qc is applied on which obstype
checked_obstypes = list(self.obstypes.keys())
checknames = ["duplicated_timestamp"] # KEEP order
new._applied_qc = concat_save(
[
new._applied_qc,
conv_applied_qc_to_df(
obstypes=checked_obstypes, ordered_checknames=checknames
),
],
ignore_index=True,
)
return new
[docs]
def show(self, show_all_settings=False, max_disp_n_gaps=5):
"""Show detailed information of the Dataset.
A function to print out a detailed overview information about the Dataset.
Parameters
----------
show_all_settings : bool, optional
If True all the settings are printed out. The default is False.
max_disp_n_gaps: int, optional
The maximum number of gaps to display detailed information of.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> #Create your Dataset
>>> dataset = metobs_toolkit.Dataset() #empty Dataset
>>>
>>> #Add observations to the Dataset
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>>
>>> # Print out details
>>> dataset.show()
-------- General ---------
...
"""
logger.info("Show basic info of dataset.")
print_dataset_info(self, show_all_settings)
[docs]
def get_info(self, show_all_settings=False, max_disp_n_gaps=5):
"""Alias of show().
A function to print out a detailed overview information about the Dataset.
Parameters
----------
show_all_settings : bool, optional
If True all the settings are printed out. The default is False.
max_disp_n_gaps: int, optional
The maximum number of gaps to display detailed information of.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> #Create your Dataset
>>> dataset = metobs_toolkit.Dataset() #empty Dataset
>>>
>>> #Add observations to the Dataset
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>>
>>> # Print out details
>>> dataset.get_info()
-------- General ---------
...
"""
self.show(show_all_settings, max_disp_n_gaps)
[docs]
def save_dataset(
self, outputfolder=None, filename="saved_dataset.pkl", overwrite=False
):
"""Save a Dataset instance to a (pickle) file.
Parameters
----------
outputfolder : str or None, optional
The path to the folder to save the file. If None, the outputfolder
from the Settings is used. The default is None.
filename : str, optional
The name of the output file. The default is 'saved_dataset.pkl'.
overwrite : bool, optional
If True, the target file will be overwritten if it exist. The
default is False.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>> import os
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template)
>>>
>>> dataset.import_data_from_file()
>>>
>>> # Save dataset to a .pkl file
>>> dataset.save_dataset(outputfolder=os.getcwd(),
... filename='your_saved_dataset.pkl',
... overwrite=True)
Dataset saved in ...
"""
# check if outputfolder is known and exists
if outputfolder is None:
outputfolder = self.settings.IO["output_folder"]
assert (
outputfolder is not None
), "No outputfolder is given, and no outputfolder is found in the settings."
assert os.path.isdir(outputfolder), f"{outputfolder} is not a directory!"
# check file extension in the filename:
if filename[-4:] != ".pkl":
filename += ".pkl"
full_path = os.path.join(outputfolder, filename)
if (os.path.isfile(full_path)) & overwrite:
logger.info(f"The file {full_path} will be overwritten!")
os.remove(full_path)
# check if file exists
assert not os.path.isfile(full_path), f"{full_path} is already a file!"
with open(full_path, "wb") as outp:
pickle.dump(self, outp, pickle.HIGHEST_PROTOCOL)
print(f"Dataset saved in {full_path}")
logger.info(f"Dataset saved in {full_path}")
[docs]
def import_dataset(self, folder_path=None, filename="saved_dataset.pkl"):
"""Import a Dataset instance from a (pickle) file.
Parameters
----------
folder_path : str or None, optional
The path to the folder to save the file. If None, the outputfolder
from the Settings is used. The default is None.
filename : str, optional
The name of the output file. The default is 'saved_dataset.pkl'.
Returns
-------
metobs_toolkit.Dataset
The Dataset instance.
Examples
--------
.. code-block:: python
import metobs_toolkit
import os
# Initialize an empty Dataset
empty_dataset = metobs_toolkit.Dataset()
# Import the dataset
dataset=empty_dataset.import_dataset(folder_path=os.getcwd(),
filename='your_saved_dataset.pkl')
"""
# check if folder_path is known and exists
if folder_path is None:
folder_path = self.settings.IO["output_folder"]
assert (
folder_path is not None
), "No folder_path is given, and no outputfolder is found in the settings."
assert os.path.isdir(folder_path), f"{folder_path} is not a directory!"
full_path = os.path.join(folder_path, filename)
# check if file exists
assert os.path.isfile(full_path), f"{full_path} does not exist."
with open(full_path, "rb") as inp:
dataset = pickle.load(inp)
# convert metadf to a geodataframe (if coordinates are available)
dataset.metadf = metadf_to_gdf(dataset.metadf)
return dataset
[docs]
def add_new_observationtype(self, Obstype):
"""Add a new observation type to the known observation types.
The observation can only be added if it is not already present in the
knonw observation types. If that is the case that you probably need to
use use the Dataset.add_new_unit() method.
Parameters
----------
The new Obstype to add.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>> co2_concentration = metobs_toolkit.Obstype(obsname='co2',
... std_unit='ppm')
>>> #add other units to it (if needed)
>>> co2_concentration.add_unit(unit_name='ppb',
... conversion=['x / 1000'], #1 ppb = 0.001 ppm
... )
>>> #Set a description
>>> co2_concentration.set_description(desc='The CO2 concentration measured at 2m above surface')
>>> #Add it to a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.add_new_observationtype(co2_concentration)
>>> dataset.obstypes
{'temp': Obstype instance of temp, 'humidity': Obstype instance of humidity, 'radiation_temp': Obstype instance of radiation_temp, 'pressure': Obstype instance of pressure, 'pressure_at_sea_level': Obstype instance of pressure_at_sea_level, 'precip': Obstype instance of precip, 'precip_sum': Obstype instance of precip_sum, 'wind_speed': Obstype instance of wind_speed, 'wind_gust': Obstype instance of wind_gust, 'wind_direction': Obstype instance of wind_direction, 'co2': Obstype instance of co2}
"""
# Test if the obstype is of the correct class.
if not isinstance(Obstype, Obstype_class):
sys.exit(
f"{Obstype} is not an instance of metobs_toolkit.obstypes.Obstype."
)
# Test if the obsname is already in use
if Obstype.name in self.obstypes.keys():
logger.warning(
f"{Obstype.name} is already a known observation type: {self.obstypes[Obstype.name]}"
)
return
# Update the known obstypes
logger.info(f"Adding {Obstype} to the list of knonw observation types.")
self.obstypes[Obstype.name] = Obstype
[docs]
def add_new_unit(self, obstype, new_unit, conversion_expression=[]):
"""Add a new unit to a known observation type.
Parameters
----------
obstype : str
The observation type to add the new unit to.
new_unit : str
The new unit name.
conversion_expression : list or str, optional
The conversion expression to the standard unit of the observation
type. The expression is a (list of) strings with simple algebraic
operations, where x represent the value in the new unit, and the
result is the value in the standard unit. Two examples for
temperature (with a standard unit in Celsius):
["x - 273.15"] #if the new_unit is Kelvin
["x-32.0", "x/1.8"] #if the new unit is Farenheit
The default is [].
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> #Create your Dataset
>>> dataset = metobs_toolkit.Dataset() #empty Dataset
>>>
>>> #Add new unit to a known obstype
>>> dataset.add_new_unit(obstype = 'temp',
... new_unit= 'your_new_unit',
... conversion_expression = ['x+3', 'x * 2'])
>>> # The conversion means: 1 [your_new_unit] = (1 + 3) * 2 [°C]
>>> dataset.obstypes['temp'].get_info()
temp observation with:
* standard unit: Celsius
* data column as None in None
* known units and aliases: {'Celsius': ['celsius', '°C', '°c', 'celcius', 'Celcius'], 'Kelvin': ['K', 'kelvin'], 'Farenheit': ['farenheit'], 'your_new_unit': []}
* description: 2m - temperature
* conversions to known units: {'Kelvin': ['x - 273.15'], 'Farenheit': ['x-32.0', 'x/1.8'], 'your_new_unit': ['x+3', 'x * 2']}
* originates from data column: None with None as native unit.
"""
# test if observation is present
if not obstype in self.obstypes.keys():
logger.warning(f"{obstype} is not a known obstype! No unit can be added.")
return
# check if the unit is already present
is_present = self.obstypes[obstype].test_if_unit_is_known(new_unit)
if is_present:
logger.info(
f"{new_unit} is already a known unit of {self.obstypes[obstype]}"
)
return
self.obstypes[obstype].add_unit(
unit_name=new_unit, conversion=conversion_expression
)
[docs]
def show_settings(self):
"""Show detailed information of the stored Settings.
A function that prints out all the settings, structured per thematic.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> #Create your Dataset
>>> dataset = metobs_toolkit.Dataset() #empty Dataset
>>>
>>> #Show default settings
>>> dataset.show_settings()
All settings:...
"""
self.settings.show()
[docs]
def get_station(self, stationname):
"""Filter out one station of the Dataset.
Extract a metobs_toolkit.Station object from the dataset by name.
Parameters
----------
stationname : string
The name of the station.
Returns
-------
metobs_toolkit.Station
The station object.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> #Create your Dataset
>>> dataset = metobs_toolkit.Dataset() #empty Dataset
>>>
>>> #Add observations to the Dataset
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>>
>>> dataset.get_station('vlinder12')
Station instance containing:
*1 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*4320 observation records
*0 records labeled as outliers
*0 gaps
*0 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:55:00+00:00 (total duration: 14 days 23:55:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
"""
from metobs_toolkit.station import Station
logger.info(f"Extract {stationname} from dataset.")
# important: make sure all station attributes are of the same time as dataset.
# so that all methods can be inherited.
try:
sta_df = self.df.xs(stationname, level="name", drop_level=False)
sta_metadf = self.metadf.loc[stationname].to_frame().transpose()
sta_metadf.index.name = "name"
except KeyError:
logger.warning(f"{stationname} not found in the dataset.")
return None
try:
sta_outliers = self.outliersdf.xs(
stationname, level="name", drop_level=False
)
except KeyError:
sta_outliers = init_triple_multiindexdf()
sta_gaps = get_station_gaps(self.gaps, stationname)
sta_missingobs = self.missing_obs.get_station_missingobs(stationname)
try:
sta_gapfill = self.gapfilldf.xs(stationname, level="name", drop_level=False)
except KeyError:
sta_gapfill = init_multiindexdf()
try:
sta_missingfill = self.missing_fill_df.xs(
stationname, level="name", drop_level=False
)
except KeyError:
sta_missingfill = init_multiindexdf()
return Station(
name=stationname,
df=sta_df,
outliersdf=sta_outliers,
gaps=sta_gaps,
missing_obs=sta_missingobs,
gapfilldf=sta_gapfill,
missing_fill_df=sta_missingfill,
metadf=sta_metadf,
obstypes=self.obstypes,
template=self.template,
settings=self.settings,
_qc_checked_obstypes=self._qc_checked_obstypes,
_applied_qc=self._applied_qc,
)
[docs]
def make_plot(
self,
stationnames=None,
obstype="temp",
colorby="name",
starttime=None,
endtime=None,
title=None,
y_label=None,
legend=True,
show_outliers=True,
show_filled=True,
_ax=None, # needed for GUI, not recommended use
):
"""
This function creates a timeseries plot for the dataset. The variable observation type
is plotted for all stationnames from a starttime to an endtime.
All styling attributes are extracted from the Settings.
Parameters
----------
stationnames : list, optional
A list with stationnames to include in the timeseries. If None is given, all the stations are used, defaults to None.
obstype : string, optional
Fieldname to visualise. This can be an observation or station
attribute. The default is 'temp'.
colorby : 'label' or 'name', optional
Indicate how colors should be assigned to the lines. 'label' will color the lines by their quality control label. 'name' will color by each station, defaults to 'name'.
starttime : datetime.datetime, optional
Specifiy the start datetime for the plot. If None is given it will use the start datetime of the dataset, defaults to None.
endtime : datetime.datetime, optional
Specifiy the end datetime for the plot. If None is given it will use the end datetime of the dataset, defaults to None.
title : string, optional
Title of the figure, if None a default title is generated. The default is None.
y_label : string, optional
y-axes label of the figure, if None a default label is generated. The default is None.
legend : bool, optional
If True, a legend is added to the plot. The default is True.
show_outliers : bool, optional
If true the observations labeld as outliers will be included in
the plot. This is only true when colorby == 'name'. The default
is True.
show_filled : bool, optional
If true the filled values for gaps and missing observations will
be included in the plot. This is only true when colorby == 'name'.
The default is True.
Returns
-------
axis : matplotlib.pyplot.axes
The timeseries axes of the plot is returned.
Note
--------
If a timezone unaware datetime is given as an argument, it is interpreted
as if it has the same timezone as the observations.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>>
>>> # Make plot
>>> dataset.make_plot(stationnames=['vlinder02', 'vlinder16'],
... obstype='temp',
... colorby='label')
<Axes: ...
"""
if stationnames is None:
logger.info(f"Make {obstype}-timeseries plot for all stations")
else:
logger.info(f"Make {obstype}-timeseries plot for {stationnames}")
# combine all dataframes
mergedf = self.combine_all_to_obsspace()
# subset to obstype
mergedf = xs_save(mergedf, obstype, level="obstype")
# Subset on stationnames
if stationnames is not None:
mergedf = mergedf[mergedf.index.get_level_values("name").isin(stationnames)]
# Subset on start and endtime
starttime = fmt_datetime_argument(
starttime, self.settings.time_settings["timezone"]
)
endtime = fmt_datetime_argument(
endtime, self.settings.time_settings["timezone"]
)
mergedf = multiindexdf_datetime_subsetting(mergedf, starttime, endtime)
# Get plot styling attributes
if title is None:
if stationnames is None:
if self._istype == "Dataset":
title = (
self.obstypes[obstype].get_orig_name() + " for all stations. "
)
elif self._istype == "Station":
title = self.obstypes[obstype].get_orig_name() + " of " + self.name
else:
title = (
self.obstypes[obstype].get_orig_name()
+ " for stations: "
+ str(stationnames)
)
# create y label
if y_label is None:
y_label = self.obstypes[obstype].get_plot_y_label()
# Make plot
ax, _colmap = timeseries_plot(
mergedf=mergedf,
title=title,
ylabel=y_label,
colorby=colorby,
show_legend=legend,
show_outliers=show_outliers,
show_filled=show_filled,
settings=self.settings,
_ax=_ax,
)
return ax
[docs]
def make_interactive_plot(
self,
obstype="temp",
save=True,
outputfile=None,
starttime=None,
endtime=None,
vmin=None,
vmax=None,
mpl_cmap_name="viridis",
radius=13,
fill_alpha=0.6,
max_fps=4,
outlier_col="red",
ok_col="black",
gap_col="orange",
fill_col="yellow",
):
"""Make interactive geospatial plot with time evolution.
This function uses the folium package to make an interactive geospatial
plot to illustrate the time evolution.
Parameters
----------
obstype : str or metobs_toolkit.Obstype, optional
The observation type to plot. The default is 'temp'.
save : bool, optional
If true, the figure will be saved as an html-file. The default is True.
outputfile : str, optional
The path of the output html-file. The figure will be saved here, if
save is True. If outputfile is not given, and save is True, than
the figure will be saved in the default outputfolder (if given).
The default is None.
starttime : datetime.datetime, optional
Specifiy the start datetime for the plot. If None is given it will
use the start datetime of the dataset, defaults to None.
endtime : datetime.datetime, optional
Specifiy the end datetime for the plot. If None is given it will
use the end datetime of the dataset, defaults to None.
vmin : numeric, optional
The value corresponding with the minimum color. If None, the
minimum of the presented observations is used. The default is None.
vmax : numeric, optional
The value corresponding with the maximum color. If None, the
maximum of the presented observations is used. The default is None.
mpl_cmap_name : str, optional
The name of the matplotlib colormap to use. The default is 'viridis'.
radius : int, optional
The radius (in pixels) of the scatters. The default is 13.
fill_alpha : float ([0;1]), optional
The alpha of the fill color for the scatters. The default is 0.6.
max_fps : int (>0), optional
The maximum allowd frames per second for the time evolution. The
default is 4.
outlier_col : str, optional
The edge color of the scatters to identify an outliers. The default is 'red'.
ok_col : str, optional
The edge color of the scatters to identify an ok observation. The default is 'black'.
gap_col : str, optional
The edge color of the scatters to identify an missing/gap
observation. The default is 'orange'.
fill_col : str, optional
The edge color of the scatters to identify a fillded observation.
The default is 'yellow'.
Returns
-------
m : folium.folium.map
The interactive folium map.
Note
-------
The figure will only appear when this is runned in notebooks. If you do
not run this in a notebook, make sure to save the html file, and open it
with a browser.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>>
>>> # Make default interactive geospatial plot
>>> dataset.make_interactive_plot()
"""
# Check if obstype is known
if isinstance(obstype, str):
if obstype not in self.obstypes.keys():
logger.error(
f"{obstype} is not found in the knonw observation types: {list(self.obstypes.keys())}"
)
return None
else:
obstype = self.obstypes[obstype]
if save:
if outputfile is None:
if self.settings.IO["output_folder"] is None:
logger.error(
"No outputfile is given, and there is no default outputfolder specified."
)
return None
else:
outputfile = os.path.join(
self.output_folder, "interactive_figure.html"
)
else:
# Check if outputfile has .html extension
if not outputfile.endswith(".html"):
outputfile = outputfile + ".html"
logger.warning(
f"The .hmtl extension is added to the outputfile: {outputfile}"
)
# Check if the obstype is present in the data
if obstype.name not in self.df.columns:
logger.error(f"{obstype.name} is not found in your the Dataset.")
return None
# Check if geospatial data is available
if self.metadf["lat"].isnull().any():
_sta = self.metadf[self.metadf["lat"].isnull()]["lat"]
logger.error(f"Stations without coordinates detected: {_sta}")
return None
if self.metadf["lon"].isnull().any():
_sta = self.metadf[self.metadf["lon"].isnull()]["lon"]
logger.error(f"Stations without coordinates detected: {_sta}")
return None
# Construct dataframe
combdf = self.combine_all_to_obsspace()
combdf = xs_save(combdf, obstype.name, level="obstype")
# Merge geospatial info
combgdf = combdf.merge(
self.metadf, how="left", left_on="name", right_index=True
)
# Subset on start and endtime
starttime = fmt_datetime_argument(
starttime, self.settings.time_settings["timezone"]
)
endtime = fmt_datetime_argument(
endtime, self.settings.time_settings["timezone"]
)
combgdf = multiindexdf_datetime_subsetting(combgdf, starttime, endtime)
combgdf = combgdf.reset_index()
# to gdf
combgdf = metadf_to_gdf(combgdf, crs=4326)
# Make label color mapper
label_col_map = {}
# Ok label
label_col_map["ok"] = ok_col
# outlier labels
for val in self.settings.qc["qc_checks_info"].values():
label_col_map[val["outlier_flag"]] = outlier_col
# missing labels (gaps and missing values)
for val in self.settings.gap["gaps_info"].values():
label_col_map[val["outlier_flag"]] = gap_col
# fill labels
for val in self.settings.missing_obs["missing_obs_fill_info"]["label"].values():
label_col_map[val] = fill_col
for val in self.settings.gap["gaps_fill_info"]["label"].values():
label_col_map[val] = fill_col
# make time estimation
est_seconds = combgdf.shape[0] / 2411.5 # normal laptop
logger.info(
f'The figure will take approximatly (laptop) {"{:.1f}".format(est_seconds)} seconds to make.'
)
# Making the figure
m = make_folium_html_plot(
gdf=combgdf,
variable_column="value",
var_display_name=obstype.name,
var_unit=obstype.get_standard_unit(),
label_column="label",
label_col_map=label_col_map,
vmin=vmin,
vmax=vmax,
radius=radius,
fill_alpha=fill_alpha,
mpl_cmap_name=mpl_cmap_name,
max_fps=int(max_fps),
)
if save:
logger.info(f"Saving the htlm figure at {outputfile}")
m.save(outputfile)
return m
[docs]
def make_geo_plot(
self,
variable="temp",
title=None,
timeinstance=None,
legend=True,
vmin=None,
vmax=None,
legend_title=None,
boundbox=[],
):
"""Make geospatial plot.
This functions creates a geospatial plot for a field
(observations or attributes) of all stations.
If the field is timedepending, than the timeinstance is used to plot
the field status at that datetime.
If the field is categorical than the leged will have categorical
values, else a colorbar is used.
All styling attributes are extracted from the Settings.
Parameters
----------
variable : string, optional
Fieldname to visualise. This can be an observation type or station
or 'lcz'. The default is 'temp'.
title : string, optional
Title of the figure, if None a default title is generated. The default is None.
timeinstance : datetime.datetime, optional
Datetime moment of the geospatial plot. If None, the first occuring (not Nan) record is used. The default is None.
legend : bool, optional
I True, a legend is added to the plot. The default is True.
vmin : numeric, optional
The value corresponding with the minimum color. If None, the minimum of the presented observations is used. The default is None.
vmax : numeric, optional
The value corresponding with the maximum color. If None, the maximum of the presented observations is used. The default is None.
legend_title : string, optional
Title of the legend, if None a default title is generated. The default is None.
boundbox : [lon-west, lat-south, lon-east, lat-north], optional
The boundbox to indicate the domain to plot. The elemenst are numeric.
If the list is empty, a boundbox is created automatically. The default
is [].
Returns
-------
axis : matplotlib.pyplot.geoaxes
The geoaxes of the plot is returned.
Note
--------
If a timezone unaware datetime is given as an argument, it is interpreted
as if it has the same timezone as the observations.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>>
>>> # Make default geospatial plot
>>> dataset.make_geo_plot()
<GeoAxes:...
"""
# Load default plot settings
# default_settings=Settings.plot_settings['spatial_geo']
# get first (Not Nan) timeinstance of the dataset if not given
timeinstance = fmt_datetime_argument(
timeinstance, self.settings.time_settings["timezone"]
)
if timeinstance is None:
timeinstance = self.df.dropna(subset=["temp"]).index[0][1]
logger.info(f"Make {variable}-geo plot at {timeinstance}")
# check coordinates if available
if self.metadf["lat"].isnull().any():
_sta = self.metadf[self.metadf["lat"].isnull()]["lat"]
logger.error(f"Stations without coordinates detected: {_sta}")
return None
if self.metadf["lon"].isnull().any():
_sta = self.metadf[self.metadf["lon"].isnull()]["lon"]
logger.error(f"Stations without coordinates detected: {_sta}")
return None
if bool(boundbox):
if len(boundbox) != 4:
logger.warning(
f"The boundbox ({boundbox}) does not contain 4 elements! The default boundbox is used!"
)
boundbox = []
# Check if LCZ if available
if variable == "lcz":
if self.metadf["lcz"].isnull().any():
_sta = self.metadf[self.metadf["lcz"].isnull()]["lcz"]
logger.warning(f"Stations without lcz detected: {_sta}")
return None
title = f"Local climate zones at {timeinstance}."
legend_title = ""
# subset to timeinstance
plotdf = xs_save(self.df, timeinstance, level="datetime")
# merge metadata
plotdf = plotdf.merge(
self.metadf, how="left", left_index=True, right_index=True
)
# titles
if title is None:
try:
title = f"{self.obstypes[variable].get_orig_name()} at {timeinstance}."
except KeyError:
title = f"{variable} at {timeinstance}."
if legend:
if legend_title is None:
legend_title = f"{self.obstypes[variable].get_standard_unit()}"
axis = geospatial_plot(
plotdf=plotdf,
variable=variable,
timeinstance=timeinstance,
title=title,
legend=legend,
legend_title=legend_title,
vmin=vmin,
vmax=vmax,
plotsettings=self.settings.app["plot_settings"],
categorical_fields=self.settings.app["categorical_fields"],
static_fields=self.settings.app["static_fields"],
display_name_mapper=self.settings.app["display_name_mapper"],
boundbox=boundbox,
)
return axis
# =============================================================================
# Gap Filling
# =============================================================================
[docs]
def get_modeldata(
self,
modelname="ERA5_hourly",
modeldata=None,
obstype="temp",
stations=None,
startdt=None,
enddt=None,
):
"""Make Modeldata for the Dataset.
Make a metobs_toolkit.Modeldata object with modeldata at the locations
of the stations present in the dataset. This Modeldata stores timeseries
of model data for each station.
Parameters
----------
modelname : str, optional
Which dataset to download timeseries from. This is only used when
no modeldata is provided. The default is 'ERA5_hourly'.
modeldata : metobs_toolkit.Modeldata, optional
Use the modelname attribute and the gee information stored in the
modeldata instance to extract timeseries.
obstype : String, optional
Name of the observationtype you want to apply gap filling on. The
modeldata must contain this observation type as well. The
default is 'temp'.
stations : string or list of strings, optional
Stationnames to subset the modeldata to. If None, all stations will be used. The default is None.
startdt : datetime.datetime, optional
Start datetime of the model timeseries. If None, the start datetime of the dataset is used. The default is None.
enddt : datetime.datetime, optional
End datetime of the model timeseries. If None, the last datetime of the dataset is used. The default is None.
Returns
-------
Modl : metobs_toolkit.Modeldata
The extracted modeldata for period and a set of stations.
Note
--------
If a timezone unaware datetime is given as an argument, it is interpreted
as if it has the same timezone as the observations.
Note
------
When extracting large amounts of data, the timeseries data will be
written to a file and saved on your google drive. In this case, you need
to provide the Modeldata with the data using the .set_model_from_csv()
method.
Examples
--------
.. code-block:: python
import metobs_toolkit
# Import data into a Dataset
dataset = metobs_toolkit.Dataset()
dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile,
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
dataset.import_data_from_file()
# To limit data transfer, we define a short period
import datetime
tstart = datetime.datetime(2022, 9, 5)
tend = datetime.datetime(2022, 9, 6)
# Collect ERA5 2mT timeseries at your stations
ERA5_data = dataset.get_modeldata(
modelname="ERA5_hourly",
modeldata=None,
obstype="temp",
stations=None,
startdt=tstart,
enddt=tend)
"""
if modeldata is None:
Modl = Modeldata(modelname)
else:
Modl = modeldata
modelname = Modl.modelname
# Filters
if startdt is None:
startdt = self.df.index.get_level_values("datetime").min()
else:
startdt = fmt_datetime_argument(
startdt, self.settings.time_settings["timezone"]
)
if enddt is None:
enddt = self.df.index.get_level_values("datetime").max()
else:
enddt = fmt_datetime_argument(
enddt, self.settings.time_settings["timezone"]
)
# make shure bounds include required range
Model_time_res = Modl.mapinfo[Modl.modelname]["time_res"]
startdt = startdt.floor(Model_time_res)
enddt = enddt.ceil(Model_time_res)
if stations is not None:
if isinstance(stations, str):
metadf = self.metadf.loc[[stations]]
if isinstance(stations, list):
metadf = self.metadf.iloc[self.metadf.index.isin(stations)]
else:
metadf = self.metadf
# Convert to UTC
startdt_utc = startdt.astimezone(pytz.utc)
enddt_utc = enddt.astimezone(pytz.utc)
# fill modell with data
if modelname == "ERA5_hourly":
Modl.get_ERA5_data(
metadf=metadf,
startdt_utc=startdt_utc,
enddt_utc=enddt_utc,
obstypes=obstype,
)
else:
Modl.get_gee_dataset_data(
mapname=modelname,
metadf=metadf,
startdt_utc=startdt_utc,
enddt_utc=enddt_utc,
obstypes=obstype,
)
print(
f"(When using the .set_model_from_csv() method, make sure the modelname of your Modeldata is {modelname})"
)
logger.info(
f"(When using the .set_model_from_csv() method, make sure the modelname of your Modeldata is {modelname})"
)
return Modl
[docs]
def update_gaps_and_missing_from_outliers(self, obstype="temp", n_gapsize=None):
"""Interpret the outliers as missing observations.
If there is a sequence
of these outliers for a station, larger than n_gapsize than this will
be interpreted as a gap.
The outliers are not removed.
Parameters
----------
obstype : str, optional
Use the outliers on this observation type to update the gaps and
missing timestamps. The default is 'temp'.
n_gapsize : int, optional
The minimum number of consecutive missing observations to define
as a gap. If None, n_gapsize is taken from the settings defenition
of gaps. The default is None.
Returns
-------
None.
Note
-------
Gaps and missing observations resulting from an outlier on a specific
obstype, are assumed to be gaps/missing observation for all obstypes.
Note
------
Be aware that n_gapsize is used for the current resolution of the Dataset,
this is different from the gap check applied on the inported data, if
the dataset is coarsend.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*1676 records labeled as outliers
*0 gaps
*3 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>> # Interpret the outliers as missing/gaps
>>> dataset.update_gaps_and_missing_from_outliers(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*0 records labeled as outliers
*2 gaps
*1473 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
"""
if n_gapsize is None:
n_gapsize = self.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"]
if (
not self.metadf["assumed_import_frequency"]
.eq(self.metadf["dataset_resolution"])
.all()
):
logger.info(
f"The defenition of the gapsize (n_gapsize = {n_gapsize}) \
will have another effect on the update of the gaps and missing \
timestamps because coarsening is applied and the defenition \
of the gapsize is not changed."
)
# combine to one dataframe
mergedf = self.combine_all_to_obsspace()
mergedf = xs_save(mergedf, obstype, level="obstype")
# ignore labels
possible_outlier_labels = [
vals["outlier_flag"] for vals in self.settings.qc["qc_checks_info"].values()
]
# create groups when the final label changes
persistance_filter = ((mergedf["label"].shift() != mergedf["label"])).cumsum()
grouped = mergedf.groupby(["name", persistance_filter])
# locate new gaps by size of consecutive the same final label per station
group_sizes = grouped.size()
large_groups = group_sizes[group_sizes > n_gapsize]
# find only groups with final label as an outlier
gaps = []
# new_gapsdf = pd.DataFrame()
new_gaps_idx = init_multiindex()
for group_idx in large_groups.index:
groupdf = grouped.get_group(group_idx)
group_final_label = groupdf["label"].iloc[0]
if group_final_label not in possible_outlier_labels:
# no gap candidates
continue
else:
gap = Gap(
name=groupdf.index.get_level_values("name")[0],
startdt=groupdf.index.get_level_values("datetime").min(),
enddt=groupdf.index.get_level_values("datetime").max(),
)
gaps.append(gap)
new_gaps_idx = new_gaps_idx.union(groupdf.index, sort=False)
# add all the outliers, that are not in the new gaps to the new missing obs
new_missing_obs = mergedf[mergedf["label"].isin(possible_outlier_labels)].index
new_missing_obs = new_missing_obs.drop(new_gaps_idx.to_numpy(), errors="ignore")
# to series
missing_obs_series = (
new_missing_obs.to_frame()
.reset_index(drop=True)
.set_index("name")["datetime"]
)
# Create missing obs
new_missing_collection = Missingob_collection(missing_obs_series)
# update self
self.gaps.extend(gaps)
self.missing_obs = self.missing_obs + new_missing_collection
# remove outliers that are converted to gaps
self.outliersdf = remove_gaps_from_outliers(
gaplist=gaps, outldf=self.outliersdf
)
# remove outliers that are converted to missing obs
self.outliersdf = self.missing_obs.remove_missing_from_outliers(self.outliersdf)
# =============================================================================
# Gap Filling
# =============================================================================
[docs]
def fill_gaps_automatic(
self,
modeldata,
obstype="temp",
max_interpolate_duration_str=None,
overwrite_fill=False,
):
"""Fill the gaps by using linear interpolation or debiased modeldata.
This method serves as a triage to select the gaps to be filled with
linear interpolation and those to be filled using a diurnal debias
gapfill. When the duration of a gap is smaller or equal than
max_interpolation_duration, the linear interpolation method is applied
else the debiased modeldata method.
For a detailed description of these methods, we refer to the
corresponding metobs_toolkit.Dataset.fill_gaps_linear() and
metobs_toolkit.Dataset.fill_gaps_era5().
Parameters
----------
modeldata : metobs_toolkit.Modeldata
The modeldata to use for the gapfill. This model data should the required
timeseries to fill all gaps present in the dataset.
obstype : String, optional
Name of the observationtype you want to apply gap filling on. The
modeldata must contain this observation type as well. The
default is 'temp'.
max_interpolate_duration_str : Timedelta or str, optional
Maximum duration to apply interpolation for gapfill when using the
automatic gapfill method. Gaps with longer durations will be filled
using debiased modeldata. The default is None.
overwrite_fill: bool, optional
If a gap has already filled values, the interpolation of this gap
is skipped if overwrite_fill is False. If set to True, the gapfill
values and info will be overwitten. The default is False.
Returns
-------
comb_df : pandas.DataFrame
A dataframe containing all the filled records.
Examples
--------
.. code-block:: python
import metobs_toolkit
your_dataset = metobs_toolkit.Dataset()
your_dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile, # path to the data file
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
# Specify the gap defenition
your_dataset.update_qc_settings(gapsize_in_records = 20)
#Update the gapsize BEFORE importing the data
your_dataset.import_data_from_file()
#Update the settings (definition of the period to calculate biases for)
your_dataset.update_gap_and_missing_fill_settings(
gap_debias_prefered_leading_period_hours=24,
gap_debias_prefered_trailing_period_hours=24,
gap_debias_minimum_leading_period_hours=6,
gap_debias_minimum_trailing_period_hours=6,
)
#(As a demonstration, we will fill the gaps of a single station. The following functions can also be
# directly applied to the dataset.)
your_station = your_dataset.get_station('vlinder05')
#Get ERA5 modeldata at the location of your stations and period.
ERA5_modeldata = your_station.get_modeldata(modelname='ERA5_hourly',
obstype='temp')
#Use the debias method to fill the gaps
gapfill_df = your_station.fill_gaps_automatic(modeldata=ERA5_modeldata,
max_interpolate_duration_str='6h', # <6 hours will be interpolated
obstype='temp')
"""
# ----------- Validate ----------------------------------------
# check if modeldata is available
if modeldata is None:
logger.warning(
"The dataset has no modeldate. Use the set_modeldata() function to add modeldata."
)
return None
# check if obstype is present in eramodel
assert (
obstype in modeldata.df.columns
), f"{obstype} is not present in the modeldate: {modeldata}"
# check if all station are present in eramodeldata
# stations = self.gaps.to_df().index.unique().to_list()
stations = list(set([gap.name for gap in self.gaps]))
assert all(
[sta in modeldata.df.index.get_level_values("name") for sta in stations]
), "Not all stations with gaps are in the modeldata!"
if max_interpolate_duration_str is None:
max_interpolate_duration_str = self.settings.gap["gaps_fill_settings"][
"automatic"
]["max_interpolation_duration_str"]
# ------------select the method to apply gapfill per gap ----------
interpolate_gaps = []
debias_gaps = []
for gap in self.gaps:
if gap.duration <= pd.to_timedelta(max_interpolate_duration_str):
interpolate_gaps.append(gap)
else:
debias_gaps.append(gap)
# 1 ---------------Fill by interpolation ---------------------
fill_settings_interp = self.settings.gap["gaps_fill_settings"]["linear"]
apply_interpolate_gaps(
gapslist=interpolate_gaps,
obsdf=self.df,
outliersdf=self.outliersdf,
dataset_res=self.metadf["dataset_resolution"],
gapfill_settings=self.settings.gap["gaps_fill_info"],
obstype=obstype,
method=fill_settings_interp["method"],
max_consec_fill=fill_settings_interp["max_consec_fill"],
overwrite_fill=overwrite_fill,
)
filldf_interp = make_gapfill_df(interpolate_gaps)
# 2 -------------- Fill by debias -----------------------------
fill_settings_debias = self.settings.gap["gaps_fill_settings"]["model_debias"]
apply_debias_era5_gapfill(
gapslist=debias_gaps,
dataset=self,
eraModelData=modeldata,
obstype=obstype,
debias_settings=fill_settings_debias,
overwrite_fill=overwrite_fill,
)
# add label column
filldf_debias = make_gapfill_df(debias_gaps)
# combine both fill df's
comb_df = concat_save([filldf_interp, filldf_debias])
# update attr
self.gapfilldf = comb_df
return comb_df
[docs]
def fill_gaps_linear(self, obstype="temp", overwrite_fill=False):
"""Fill the gaps using linear interpolation.
The gapsfilldf attribute of the Datasetinstance will be updated if
the gaps are not filled yet or if overwrite_fill is set to True.
Parameters
----------
obstype : string, optional
Fieldname to visualise. This can be an observation or station
attribute. The default is 'temp'.
overwrite_fill: bool, optional
If a gap has already filled values, the interpolation of this gap
is skipped if overwrite_fill is False. If set to True, the gapfill
values and info will be overwitten. The default is False.
Returns
-------
gapfilldf : pandas.DataFrame
A dataframe containing all the filled records.
Notes
-----
A schematic description of the linear gap fill:
1. Iterate over all gaps.
2. The gap is converted into a set of missing records (depending on the time resolution of the observations).
3. Find a leading (the last observations before the gap) record and a trailing record (the last observation after the gap).
4. By using the leading and trailing record an interpolation is applied to fill the missing records. A maximum consecutive fill threshold is applied, if exceeded the fill values are Nan's.
5. The gap is updated with the interpolated values (metobs_toolkit.Gap.gapfill_df)
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>>
>>> # Interpret the outliers as missing/gaps
>>> dataset.update_gaps_and_missing_from_outliers(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*0 records labeled as outliers
*2 gaps
*1473 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>>
>>> #Update the gapfill settings (else the defaults are used)
>>> dataset.update_gap_and_missing_fill_settings(gap_interpolation_max_consec_fill=35)
>>>
>>> # Fill the gaps
>>> dataset.fill_gaps_linear(obstype='temp')
temp temp_final_label
name datetime
vlinder05 2022-09-06 21:00:00+00:00 21.378710 gap_interpolation
2022-09-06 22:00:00+00:00 21.357419 gap_interpolation
2022-09-06 23:00:00+00:00 21.336129 gap_interpolation
2022-09-07 00:00:00+00:00 21.314839 gap_interpolation
2022-09-07 01:00:00+00:00 21.293548 gap_interpolation
2022-09-07 02:00:00+00:00 21.272258 gap_interpolation
2022-09-07 03:00:00+00:00 21.250968 gap_interpolation
2022-09-07 04:00:00+00:00 21.229677 gap_interpolation
2022-09-07 05:00:00+00:00 21.208387 gap_interpolation
2022-09-07 06:00:00+00:00 21.187097 gap_interpolation
2022-09-07 07:00:00+00:00 21.165806 gap_interpolation
2022-09-07 08:00:00+00:00 21.144516 gap_interpolation
2022-09-07 09:00:00+00:00 21.123226 gap_interpolation
2022-09-07 10:00:00+00:00 21.101935 gap_interpolation
2022-09-07 11:00:00+00:00 21.080645 gap_interpolation
2022-09-07 12:00:00+00:00 21.059355 gap_interpolation
2022-09-07 13:00:00+00:00 21.038065 gap_interpolation
2022-09-07 14:00:00+00:00 21.016774 gap_interpolation
2022-09-07 15:00:00+00:00 20.995484 gap_interpolation
2022-09-07 16:00:00+00:00 20.974194 gap_interpolation
2022-09-07 17:00:00+00:00 20.952903 gap_interpolation
2022-09-07 18:00:00+00:00 20.931613 gap_interpolation
2022-09-07 19:00:00+00:00 20.910323 gap_interpolation
2022-09-07 20:00:00+00:00 20.889032 gap_interpolation
2022-09-07 21:00:00+00:00 20.867742 gap_interpolation
2022-09-07 22:00:00+00:00 20.846452 gap_interpolation
2022-09-07 23:00:00+00:00 20.825161 gap_interpolation
2022-09-08 00:00:00+00:00 20.803871 gap_interpolation
2022-09-08 01:00:00+00:00 20.782581 gap_interpolation
2022-09-08 02:00:00+00:00 20.761290 gap_interpolation
2022-09-08 03:00:00+00:00 20.740000 gap_interpolation
2022-09-08 04:00:00+00:00 20.718710 gap_interpolation
2022-09-08 05:00:00+00:00 20.697419 gap_interpolation
2022-09-08 06:00:00+00:00 20.676129 gap_interpolation
2022-09-08 07:00:00+00:00 20.654839 gap_interpolation
>>> dataset.get_gaps_info()
Gap for vlinder05 with:...
"""
# TODO logging
fill_settings = self.settings.gap["gaps_fill_settings"]["linear"]
# fill gaps
apply_interpolate_gaps(
gapslist=self.gaps,
obsdf=self.df,
outliersdf=self.outliersdf,
dataset_res=self.metadf["dataset_resolution"],
gapfill_settings=self.settings.gap["gaps_fill_info"],
obstype=obstype,
method=fill_settings["method"],
max_consec_fill=fill_settings["max_consec_fill"],
overwrite_fill=overwrite_fill,
)
# get gapfilldf
gapfilldf = make_gapfill_df(self.gaps)
# update attr
self.gapfilldf = gapfilldf
return gapfilldf
[docs]
def fill_missing_obs_linear(self, obstype="temp"):
"""Interpolate missing observations.
Fill in the missing observation rectords using interpolation. The
missing_fill_df attribute of the Dataset will be updated.
Parameters
----------
obstype : string, optional
Fieldname to visualise. This can be an observation or station
attribute. The default is 'temp'.
Returns
-------
None.
Notes
-----
A schematic description of the linear fill of missing observations:
1. Iterate over all missing observations.
2. The missing observations are converted into a set of missing records (depending on the time resolution of the observations).
3. Find a leading (the last observations before the missing observation) record and a trailing record (the last observation after the missing observation).
4. By using the leading and trailing records, interpolation is applied to fill the missing records.
5. The missing record is updated with the interpolated values (metobs_toolkit.Gap.gapfill_df).
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>>
>>> # Interpret the outliers as missing/gaps
>>> dataset.update_gaps_and_missing_from_outliers(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*0 records labeled as outliers
*2 gaps
*1473 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>>
>>> # Fill the missing observations
>>> dataset.fill_missing_obs_linear(obstype='temp')
>>> dataset.missing_obs.get_info()
-------- Missing observations info --------
(Note: missing observations are defined on the frequency estimation of the native dataset.)
* 1473 missing observations
* For 28 stations
* Missing observations are filled with interpolate for:
temp:
temp
name datetime
vlinder01 2022-09-08 08:00:00+00:00 18.630303
2022-09-07 23:00:00+00:00 17.512121
2022-09-08 00:00:00+00:00 17.636364
2022-09-08 02:00:00+00:00 17.884848
2022-09-08 03:00:00+00:00 18.009091
...
"""
# TODO logging
fill_settings = self.settings.missing_obs["missing_obs_fill_settings"]["linear"]
fill_info = self.settings.missing_obs["missing_obs_fill_info"]
# fill missing obs
self.missing_obs.interpolate_missing(
obsdf=self.df,
resolutionseries=self.metadf["dataset_resolution"],
obstype=obstype,
method=fill_settings["method"],
)
missing_fill_df = self.missing_obs.fill_df
missing_fill_df[obstype + "_" + fill_info["label_columnname"]] = fill_info[
"label"
]["linear"]
# Update attribute
self.missing_fill_df = missing_fill_df
[docs]
def get_gaps_df(self):
"""
List all gaps into an overview dataframe.
Returns
-------
pandas.DataFrame
A DataFrame with stationnames as index, and the start, end and duretion
of the gaps as columns.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>>
>>> # Interpret the outliers as missing/gaps
>>> dataset.update_gaps_and_missing_from_outliers(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*0 records labeled as outliers
*2 gaps
*1473 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>> dataset.get_gaps_df()
start_gap end_gap duration
name
vlinder05 2022-09-06 21:00:00+00:00 2022-09-13 06:00:00+00:00 6 days 09:00:00
vlinder05 2022-09-13 20:00:00+00:00 2022-09-15 23:00:00+00:00 2 days 03:00:00
"""
return gaps_to_df(self.gaps)
[docs]
def get_gaps_info(self):
"""Print out detailed information of the gaps.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>>
>>> # Interpret the outliers as missing/gaps
>>> dataset.update_gaps_and_missing_from_outliers(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*0 records labeled as outliers
*2 gaps
*1473 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>> dataset.get_gaps_info()
Gap for vlinder05 with:
---- Gap info -----
(Note: gaps are defined on the frequency estimation of the native dataset.)
* Start gap: 2022-09-06 21:00:00+00:00
* End gap: 2022-09-13 06:00:00+00:00
* Duration gap: 6 days 09:00:00
---- Gap fill info -----
(No gapfill applied)
Gap for vlinder05 with:
---- Gap info -----
(Note: gaps are defined on the frequency estimation of the native dataset.)
* Start gap: 2022-09-13 20:00:00+00:00
* End gap: 2022-09-15 23:00:00+00:00
* Duration gap: 2 days 03:00:00
---- Gap fill info -----
(No gapfill applied)
"""
if bool(self.gaps):
# there are gaps
for gap in self.gaps:
gap.get_info()
else:
# no gaps
print("There are no gaps.")
[docs]
def get_missing_obs_info(self):
"""Print out detailed information of the missing observations.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>>
>>> # Interpret the outliers as missing/gaps
>>> dataset.update_gaps_and_missing_from_outliers(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*0 records labeled as outliers
*2 gaps
*1473 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>> dataset.get_missing_obs_info()
-------- Missing observations info --------
(Note: missing observations are defined on the frequency estimation of the native dataset.)
* 1473 missing observations
* For 28 stations
* The missing observations are not filled.
(More details on the missing observation can be found in the .series and .fill_df attributes.)
"""
# empty obs protector in the .get_info method.
self.missing_obs.get_info()
[docs]
def get_analysis(self, add_gapfilled_values=False):
"""Create an Analysis instance from the Dataframe.
Parameters
----------
add_gapfilled_values : bool, optional
If True, all filled values (from gapfill and missing observation fill),
are added to the analysis records aswell. The default is False.
Returns
-------
metobs_toolkit.Analysis
The Analysis instance of the Dataset.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Create an Analysis from the dataset
>>> analysis = dataset.get_analysis()
>>> analysis
Analysis instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*Coordinates are available for all stations.
<BLANKLINE>
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00) *Coordinates are available for all stations.
<BLANKLINE>
"""
# combine all to obsspace and include gapfill
if add_gapfilled_values:
mergedf = self.combine_all_to_obsspace()
# gapsfilled labels
gapfill_settings = self.settings.gap["gaps_fill_info"]
gapfilllabels = [val for val in gapfill_settings["label"].values()]
# missingfilled labels
missingfill_settings = self.settings.missing_obs["missing_obs_fill_info"]
missingfilllabels = [val for val in missingfill_settings["label"].values()]
# get all labels
fill_labels = gapfilllabels.copy()
fill_labels.extend(missingfilllabels)
fill_labels.append("ok")
df = mergedf[mergedf["label"].isin(fill_labels)]
df = df[["value"]]
df = df.unstack(level="obstype")
df = df.droplevel(level=0, axis=1)
else:
df = self.df
return Analysis(
obsdf=df,
metadf=self.metadf,
settings=self.settings,
obstypes=self.obstypes,
)
[docs]
def fill_gaps_era5(
self, modeldata, method="debias", obstype="temp", overwrite_fill=False
):
"""Fill the gaps using a diurnal debiased modeldata approach.
Parameters
----------
modeldata : metobs_toolkit.Modeldata
The modeldata to use for the gapfill. This model data should the required
timeseries to fill all gaps present in the dataset.
method : 'debias', optional
Specify which method to use. The default is 'debias'.
obstype : String, optional
Name of the observationtype you want to apply gap filling on. The
modeldata must contain this observation type as well. The
default is 'temp'.
overwrite_fill: bool, optional
If a gap has already filled values, the interpolation of this gap
is skipped if overwrite_fill is False. If set to True, the gapfill
values and info will be overwitten. The default is False.
Returns
-------
Gapfilldf : pandas.DataFrame
A dataframe containing all gap filled values and the use method.
Notes
-----
A schematic description of the fill_gaps_era5 method:
1. Modeldata is converted to the timezone of the observations.
2. Iterate over all gaps.
* The gap is converted into a set of missing records (depending on the time resolution of the observations).
* Find a leading and trailing period. These periods are a subset
of observations respectively before and after the gap. The size
of these subsets is set by a target size (in records) and a minimum
size (in records). If the subset of observations is smaller than
the corresponding minimum size, the gap cannot be filled.
* Modeldata, for the corresponding station and observation type, is extracted for the leading and trailing period.
* By comparing the model data with the observations of the
leading and trailing period, and grouping all records to their
timestamp (i.g. diurnal categories), biasses are computed.
* Modeldata for the missing records is extracted.
* Weights ([0;1]) are computed for each gap record, representing
the normalized distance (in time), to the beginning and end of
the gap.
* The modeldata at the missing records is then corrected by
a weighted sum of the leading and trailing biases at the
corresponding timestamp. In general, this means that the diurnal
trend of the observations is restored as well as possible.
3. The gap is updated with the interpolated values (metobs_toolkit.Gap.gapfill_df)
Note
-------
A scientific publication on the performance of this technique is expected.
Examples
--------
.. code-block:: python
import metobs_toolkit
your_dataset = metobs_toolkit.Dataset()
your_dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile, # path to the data file
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
# Specify the gap defenition
your_dataset.update_qc_settings(gapsize_in_records = 20)
#Update the gapsize BEFORE importing the data
your_dataset.import_data_from_file()
#Update the settings (definition of the period to calculate biases for)
your_dataset.update_gap_and_missing_fill_settings(
gap_debias_prefered_leading_period_hours=24,
gap_debias_prefered_trailing_period_hours=24,
gap_debias_minimum_leading_period_hours=6,
gap_debias_minimum_trailing_period_hours=6,
)
#(As a demonstration, we will fill the gaps of a single station. The following functions can also be
# directly applied to the dataset.)
your_station = your_dataset.get_station('vlinder05')
#Get ERA5 modeldata at the location of your stations and period.
ERA5_modeldata = your_station.get_modeldata(modelname='ERA5_hourly',
obstype='temp')
#Use the debias method to fill the gaps
gapfill_df = your_station.fill_gaps_era5(modeldata=ERA5_modeldata,
obstype='temp')
"""
# check if modeldata is available
if modeldata is None:
logger.warning(
"The dataset has no modeldate. Use the set_modeldata() function to add modeldata."
)
return None
# check if obstype is present in eramodel
assert (
obstype in modeldata.df.columns
), f"{obstype} is not present in the modeldate: {modeldata}"
# check if all station are present in eramodeldata
# stations = self.gaps.to_df().index.unique().to_list()
stations = list(set([gap.name for gap in self.gaps]))
assert all(
[sta in modeldata.df.index.get_level_values("name") for sta in stations]
), "Not all stations with gaps are in the modeldata!"
if method == "debias":
fill_settings_debias = self.settings.gap["gaps_fill_settings"][
"model_debias"
]
apply_debias_era5_gapfill(
gapslist=self.gaps,
dataset=self,
eraModelData=modeldata,
obstype=obstype,
debias_settings=fill_settings_debias,
overwrite_fill=overwrite_fill,
)
# get fill df
filldf = make_gapfill_df(self.gaps)
else:
sys.exit(f"{method} not implemented yet")
# update attribute
self.gapfilldf = filldf
return filldf
[docs]
def write_to_csv(
self,
obstype=None,
filename=None,
include_outliers=True,
include_fill_values=True,
add_final_labels=True,
use_tlk_obsnames=True,
overwrite_outliers_by_gaps_and_missing=True,
seperate_metadata_file=True,
):
"""Write Dataset to a csv file.
Write the dataset to a file where the observations, metadata and
(if available) the quality labels per observation type are merged
together.
A final qualty control label for each
quality-controlled-observation type can be added in the outputfile.
The file will be written to the outputfolder specified in the settings.
Parameters
----------
obstype : string, optional
Specify an observation type to subset all observations to. If None,
all available observation types are written to file. The default is
None.
filename : string, optional
The name of the output csv file. If none, a standard-filename
is generated based on the period of data. The default is None.
include_outliers : bool, optional
If True, the outliers will be present in the csv file. The default is True.
include_fill_values : bool, optional
If True, the filled gap and missing observation values will be
present in the csv file. The default is True.
add_final_labels : bool, optional
If True, a column is added containing the final label of an observation. The default is True.
use_tlk_obsnames : bool, optional
If True, the standard naming of the metobs_toolkit is used, else
the original names for obstypes is used. The default is True.
overwrite_outliers_by_gaps_and_missing : bool, optional
If the gaps and missing observations are updated using outliers,
interpret these records as gaps/missing outliers if True. Else these
will be interpreted as outliers. The default is True.
seperate_metadata_file : bool, optional
If true, the metadat is written to a seperate file, else the metadata
is merged to the observation in one file. The default is True.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>> import os
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template)
>>>
>>> dataset.import_data_from_file()
>>>
>>> # Save dataset to a .csv file
>>> dataset.update_settings(output_folder = os.getcwd())
>>> dataset.write_to_csv(filename='your_saved_table.csv')
write metadata to file: ...
write dataset to file: ...
"""
logger.info("Writing the dataset to a csv file")
assert (
not self.settings.IO["output_folder"] is None
), "Specify Settings.output_folder in order to export a csv."
assert os.path.isdir(
self.settings.IO["output_folder"]
), f'The outputfolder: \
{self.settings.IO["output_folder"]} is not found. '
# combine all dataframes
mergedf = self.combine_all_to_obsspace(
overwrite_outliers_by_gaps_and_missing=overwrite_outliers_by_gaps_and_missing
) # with outliers
# Unstack mergedf
# remove duplicates
mergedf = mergedf[~mergedf.index.duplicated(keep="first")]
# drop outliers if required
if not include_outliers:
outlier_labels = [
var["outlier_flag"] for var in self.settings.qc["qc_checks_info"]
]
mergedf = mergedf[~mergedf["label"].isin(outlier_labels)]
# drop fill values if required
if not include_fill_values:
fill_labels = [
"gap fill",
"missing observation fill",
] # toolkit representation labels
mergedf = mergedf[~mergedf["toolkit_representation"].isin(fill_labels)]
if obstype is not None:
mergedf = xs_save(mergedf, obstype, level="obstype", drop_level=False)
# Map obstypes columns
if not use_tlk_obsnames:
mapper = {
col: self.obstypes[col].get_orig_name() for col in self.obstypes.keys()
}
mergedf = mergedf.reset_index()
mergedf["new_names"] = mergedf["obstype"].map(mapper)
mergedf = mergedf.drop(columns=["obstype"])
mergedf = mergedf.rename(columns={"new_names": "obstype"})
mergedf = mergedf.set_index(["name", "datetime", "obstype"])
mergedf = mergedf.unstack("obstype")
# to one level for the columns
mergedf.columns = [" : ".join(col).strip() for col in mergedf.columns.values]
# columns to write
write_dataset_to_csv(
df=mergedf,
metadf=self.metadf,
filename=filename,
outputfolder=self.settings.IO["output_folder"],
seperate_metadata_file=seperate_metadata_file,
)
# =============================================================================
# Quality control
# =============================================================================
[docs]
def apply_quality_control(
self,
obstype="temp",
gross_value=True,
persistance=True,
repetitions=True,
step=True,
window_variation=True,
):
"""Apply quality control methods to the dataset.
The default settings are used, and can be changed in the
settings_files/qc_settings.py
The checks are performed in a sequence: gross_vallue -->
persistance --> ..., Outliers by a previous check are ignored in the
following checks!
The dataset is updated inline.
Parameters
----------
obstype : String, optional
Name of the observationtype you want to apply the checks on. The
default is 'temp'.
gross_value : Bool, optional
If True the gross_value check is applied if False not. The default
is True.
persistance : Bool, optional
If True the persistance check is applied if False not. The default
is True.. The default is True.
repetition : Bool, optional
If True the repetations check is applied if False not. The default
is True.
step : Bool, optional
If True the step check is applied if False not. The default is True.
window_variation : Bool, optional
If True the window_variation check is applied if False not. The
default is True.
Returns
---------
None.
Notes
-----
A schematic description of the quality control checks.
Gross value check
==================
This check looks for outliers based on unrealistic values
1. Find observations that exceed a minimum and maximum value threshold.
2. These observations are labeled as outliers.
Persistence check
=================
Test observations to change over a specific period.
1. Find the stations that have a maximum assumed observation frequency
that does not exceed the minimum number of records for moving window
size. The window size is defined by a duration.
2. Subset to those stations.
3. For each station, a moving window scan is applied that validates if
there is variation in the observations (NaN's are excluded). The
validation is only applied when a sufficient amount of records are
found in the window specified by a threshold.
4. After the scan, all records found in the windows without variation
are labeled as outliers.
Repetitions check
=================
Test if observation changes after a number of records.
1. For each station, make a group of consecutive records for which
the values do not change.
2. Filter those groups that have more records than the maximum valid
repetitions.
3. All the records in these groups are labeled as outliers
Note
-----
The repetitions check is similar to the persistence check, but not identical.
The persistence check uses thresholds that are meteorologically based (i.g. the moving window is defined by a duration),
in contrast to the repetitions check whose thresholds are instrumentally based (i.g. the "window" is defined by a number of records.)
Step check
============
Test if observations do not produce unphysical spikes in time series.
1. Iterate over all the stations.
2. Get the observations of the stations (i.g. drop the previously labeled outliers represented by NaN's).
3. Find the observations for which:
* The increase between two consecutive records is larger than the
threshold. This threshold is defined by a maximum increase per second
multiplied by the timedelta (in seconds) between the consecutive
records.
* Similar filter for a decrease.
4. The found observations are labeled as outliers.
Note
-----
In general, for temperatures, the decrease threshold is set less stringent than the increase
threshold. This is because a temperature drop is meteorologycally more
common than a sudden increase which is often the result of a radiation error.
Window Variation check
=======================
Test if the variation is found in a moving window.
1. Find the stations that have a maximum assumed observation frequency
that does not exceed the minimum number of records for moving window
size. The window size is defined by a duration.
2. Compute the maximum increase and decrease thresholds for a window.
This is done by multiplying the maximum increase per second by the
window size in seconds.
3. For each station, a moving window scan is applied that validates if
the maximum increase/decrease thresholds are exceeded. This is done
by comparison of the minimum and maximum values inside the window. The
validation is only applied when a sufficient amount of records are
found in the window specified by a threshold.
4. After the scan, *all* records found in the window that exceed one
of these thresholds are labeled as outliers.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> #Update some temperature QC settings
>>> dataset.update_qc_settings(obstype='temp',
... gross_value_max_value=42.,
... persis_time_win_to_check='4h',
... buddy_min_std = 1.5)
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp')
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*1676 records labeled as outliers
*0 gaps
*3 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
"""
if repetitions:
apliable = _can_qc_be_applied(self, obstype, "repetitions")
if apliable:
logger.info("Applying repetitions check.")
obsdf, outl_df = repetitions_check(
obsdf=self.df,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["qc_check_settings"],
)
# update the dataset and outliers
self.df = obsdf
if not outl_df.empty:
self.outliersdf = concat_save([self.outliersdf, outl_df])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames="repetitions"
),
],
ignore_index=True,
)
if gross_value:
apliable = _can_qc_be_applied(self, obstype, "gross_value")
if apliable:
logger.info("Applying gross value check.")
obsdf, outl_df = gross_value_check(
obsdf=self.df,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["qc_check_settings"],
)
# update the dataset and outliers
self.df = obsdf
if not outl_df.empty:
self.outliersdf = concat_save([self.outliersdf, outl_df])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames="gross_value"
),
],
ignore_index=True,
)
if persistance:
apliable = _can_qc_be_applied(self, obstype, "persistance")
if apliable:
logger.info("Applying persistance check.")
obsdf, outl_df = persistance_check(
station_frequencies=self.metadf["dataset_resolution"],
obsdf=self.df,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["qc_check_settings"],
)
# update the dataset and outliers
self.df = obsdf
if not outl_df.empty:
self.outliersdf = concat_save([self.outliersdf, outl_df])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames="persistance"
),
],
ignore_index=True,
)
if step:
apliable = _can_qc_be_applied(self, obstype, "step")
if apliable:
logger.info("Applying step-check.")
obsdf, outl_df = step_check(
obsdf=self.df,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["qc_check_settings"],
)
# update the dataset and outliers
self.df = obsdf
if not outl_df.empty:
self.outliersdf = concat_save([self.outliersdf, outl_df])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames="step"
),
],
ignore_index=True,
)
if window_variation:
apliable = _can_qc_be_applied(self, obstype, "window_variation")
if apliable:
logger.info("Applying window variation-check.")
obsdf, outl_df = window_variation_check(
station_frequencies=self.metadf["dataset_resolution"],
obsdf=self.df,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["qc_check_settings"],
)
# update the dataset and outliers
self.df = obsdf
if not outl_df.empty:
self.outliersdf = concat_save([self.outliersdf, outl_df])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype,
ordered_checknames="window_variation",
),
],
ignore_index=True,
)
self._qc_checked_obstypes.append(obstype)
self._qc_checked_obstypes = list(set(self._qc_checked_obstypes))
self.outliersdf = self.outliersdf.sort_index()
[docs]
def apply_buddy_check(
self,
obstype="temp",
use_constant_altitude=False,
haversine_approx=True,
metric_epsg="31370",
):
"""Apply the buddy check on the observations.
The buddy check compares an observation against its neighbours (i.e.
buddies). The check looks for buddies in a neighbourhood specified by
a certain radius. The buddy check flags observations if the
(absolute value of the) difference between the observations and the
average of the neighbours normalized by the standard deviation in the
circle is greater than a predefined threshold.
This check is based on the buddy check from titanlib. Documentation on
the titanlib buddy check can be found
`here <https://github.com/metno/titanlib/wiki/Buddy-check>`_.
The observation and outliers attributes will be updated accordingly.
Parameters
----------
obstype : String, optional
Name of the observationtype you want to apply the checks on. The
default is 'temp'.
use_constant_altitude : bool, optional
Use a constant altitude for all stations. The default is False.
haversine_approx : bool, optional
Use the haversine approximation (earth is a sphere) to calculate
distances between stations. The default is True.
metric_epsg : str, optional
EPSG code for the metric CRS to calculate distances in. Only used when
haversine approximation is set to False. Thus becoming a better
distance approximation but not global applicable The default is '31370'
(which is suitable for Belgium).
Returns
-------
None.
Notes
-----
A schematic step-by-step description of the buddy check:
1. A distance matrix is constructed for all inter distances between the stations. This is done using the haversine approximation, or by first converting the Coordinate Reference System (CRS) to a metric one, specified by an EPSG code.
2. A set of all (spatial) buddies per station is created by filtering out all stations that are too far.
3. The buddies are further filtered based on altitude differences with respect to the reference station.
4. For each station:
* Observations of buddies are extracted from all observations.
* These observations are corrected for altitude differences by assuming a constant lapse rate.
* For each reference record, the mean, standard deviation (std), and sample size of the corrected buddies’ observations are computed.
* If the std is lower than the minimum std, it is replaced by the minimum std.
* Chi values are calculated for all reference records.
* If the Chi value is larger than the std_threshold, the record is accepted, otherwise it is marked as an outlier.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> #Update some temperature QC settings
>>> dataset.update_qc_settings(obstype='temp',
... buddy_min_std=1.5,
... buddy_threshold=3.2)
>>> # Apply buddy check on the temperature observations
>>> dataset.apply_buddy_check(obstype='temp',
... use_constant_altitude=True)
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*69 records labeled as outliers
*0 gaps
*3 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
"""
logger.info("Applying the toolkit buddy check")
checkname = "buddy_check"
# 1. coordinates are available?
if self.metadf["lat"].isnull().any():
logger.warning(
f"Not all coordinates are available, the {checkname} cannot be executed!"
)
return
if self.metadf["lon"].isnull().any():
logger.warning(
f"Not all coordinates are available, the {checkname} cannot be executed!"
)
return
# set constant altitude if needed:
# if altitude is already available, save it to restore it after this check
restore_altitude = False
if use_constant_altitude:
if "altitulde" in self.metadf.columns:
self.metadf["altitude_backup"] = self.metadf["altitude"]
restore_altitude = True
self.metadf["altitude"] = 2.0 # absolut value does not matter
# 2. altitude available?
if (not use_constant_altitude) & ("altitude" not in self.metadf.columns):
logger.warning(
f"The altitude is not known for all stations. The {checkname} cannot be executed!"
)
logger.info(
'(To resolve this error you can: \n *Use the Dataset.get_altitude() method \n *Set use_constant_altitude to True \n update the "altitude" column in the metadf attribute of your Dataset.'
)
return
if (not use_constant_altitude) & (self.metadf["altitude"].isnull().any()):
logger.warning(
f"The altitude is not known for all stations. The {checkname} cannot be executed!"
)
logger.info(
'(To resolve this error you can: \n *Use the Dataset.get_altitude() method \n *Set use_constant_altitude to True \n *Update the "altitude" column in the metadf attribute of your Dataset.)'
)
return
apliable = _can_qc_be_applied(self, obstype, checkname)
if apliable:
buddy_set = self.settings.qc["qc_check_settings"][checkname][obstype]
outl_flag = self.settings.qc["qc_checks_info"][checkname]["outlier_flag"]
obsdf, outliersdf = toolkit_buddy_check(
obsdf=self.df,
metadf=self.metadf,
obstype=obstype,
buddy_radius=buddy_set["radius"],
min_sample_size=buddy_set["num_min"],
max_alt_diff=buddy_set["max_elev_diff"],
min_std=buddy_set["min_std"],
std_threshold=buddy_set["threshold"],
metric_epsg=metric_epsg,
lapserate=buddy_set["elev_gradient"],
outl_flag=outl_flag,
haversine_approx=haversine_approx,
)
# update the dataset and outliers
self.df = obsdf
if not outliersdf.empty:
self.outliersdf = concat_save([self.outliersdf, outliersdf])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames=checkname
),
],
ignore_index=True,
)
else:
logger.warning(
f"The {checkname} can NOT be applied on {obstype} because it was already applied on this observation type!"
)
# Revert artificial data that has been added if needed
if restore_altitude: # altitude was overwritten, thus revert it
self.metadf["altitude"] = self.metadf["altitude_backup"]
self.metadf = self.metadf.drop(columns=["altitude_backup"])
elif use_constant_altitude:
# when no alitude was available apriori, remove the fake constant altitude column
self.metadf = self.metadf.drop(columns=["altitude"])
[docs]
def apply_titan_buddy_check(self, obstype="temp", use_constant_altitude=False):
"""Apply the TITAN buddy check on the observations.
The buddy check compares an observation against its neighbours (i.e. buddies). The check looks for
buddies in a neighbourhood specified by a certain radius. The buddy check flags observations if the
(absolute value of the) difference between the observations and the average of the neighbours
normalized by the standard deviation in the circle is greater than a predefined threshold.
See the `titanlib documentation on the buddy check <https://github.com/metno/titanlib/wiki/Buddy-check>`_
for futher details.
The observation and outliers attributes will be updated accordingly.
Parameters
----------
obstype : String, optional
Name of the observationtype you want to apply the checks on. The
default is 'temp'.
use_constant_altitude : bool, optional
Use a constant altitude for all stations. The default is False.
Returns
-------
None.
Note
-------
To update the check settings, use the update_titan_qc_settings method
of the Dataset class.
Warning
--------
To use this method, you must install titanlib. Windows users must have
a c++ compiler installed. See the titanlib documentation: https://github.com/metno/titanlib/wiki/Installation.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> #Update some temperature QC settings
>>> dataset.update_titan_qc_settings(obstype='temp',
... buddy_min_std=1.5,
... buddy_threshold=3.2,
... buddy_num_min=5)
buddy num min for the TITAN buddy check updated: 2--> 5
buddy threshold for the TITAN buddy check updated: 1.5--> 3.2
buddy min std for the TITAN buddy check updated: 1.0--> 1.5
>>> # Apply buddy check on the temperature observations
>>> dataset.apply_titan_buddy_check(obstype='temp',
... use_constant_altitude=True)
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*35 records labeled as outliers
*0 gaps
*3 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
"""
logger.info("Applying the titan buddy check")
try:
import titanlib
# Add version restrictions??
except ModuleNotFoundError:
logger.warning(
"Titanlib is not installed, install it manually if you want to use this functionallity."
)
return
checkname = "titan_buddy_check"
# 1. coordinates are available?
if self.metadf["lat"].isnull().any():
logger.warning(
f"Not all coordinates are available, the {checkname} cannot be executed!"
)
return
if self.metadf["lon"].isnull().any():
logger.warning(
f"Not all coordinates are available, the {checkname} cannot be executed!"
)
return
# set constant altitude if needed:
# if altitude is already available, save it to restore it after this check
restore_altitude = False
if use_constant_altitude:
if "altitulde" in self.metadf.columns:
self.metadf["altitude_backup"] = self.metadf["altitude"]
restore_altitude = True
self.metadf["altitude"] = 2.0 # absolut value does not matter
# 2. altitude available?
if (not use_constant_altitude) & ("altitude" not in self.metadf.columns):
logger.warning(
f"The altitude is not known for all stations. The {checkname} cannot be executed!"
)
logger.info(
'(To resolve this error you can: \n *Use the Dataset.get_altitude() method \n *Set use_constant_altitude to True \n update the "altitude" column in the metadf attribute of your Dataset.'
)
return
if (not use_constant_altitude) & (self.metadf["altitude"].isnull().any()):
logger.warning(
f"The altitude is not known for all stations. The {checkname} cannot be executed!"
)
logger.info(
'(To resolve this error you can: \n *Use the Dataset.get_altitude() method \n *Set use_constant_altitude to True \n *Update the "altitude" column in the metadf attribute of your Dataset.)'
)
return
apliable = _can_qc_be_applied(self, obstype, checkname)
if apliable:
obsdf, outliersdf = titan_buddy_check(
obsdf=self.df,
metadf=self.metadf,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["titan_check_settings"][checkname][
obstype
],
titan_specific_labeler=self.settings.qc["titan_specific_labeler"][
checkname
],
)
# update the dataset and outliers
self.df = obsdf
if not outliersdf.empty:
self.outliersdf = concat_save([self.outliersdf, outliersdf])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames=checkname
),
],
ignore_index=True,
)
else:
logger.warning(
f"The {checkname} can NOT be applied on {obstype} because it was already applied on this observation type!"
)
# Revert artificial data that has been added if needed
if restore_altitude: # altitude was overwritten, thus revert it
self.metadf["altitude"] = self.metadf["altitude_backup"]
self.metadf = self.metadf.drop(columns=["altitude_backup"])
elif use_constant_altitude:
# when no alitude was available apriori, remove the fake constant altitude column
self.metadf = self.metadf.drop(columns=["altitude"])
[docs]
def apply_titan_sct_resistant_check(self, obstype="temp"):
"""Apply the TITAN spatial consistency test (resistant).
The SCT resistant check is a spatial consistency check which compares each observations to what is expected given the other observations in the
nearby area. If the deviation is large, the observation is removed. The SCT uses optimal interpolation
(OI) to compute an expected value for each observation. The background for the OI is computed from
a general vertical profile of observations in the area.
See the `titanlib documentation on the sct check <https://github.com/metno/titanlib/wiki/Spatial-consistency-test-resistant>`_
for futher details.
The observation and outliers attributes will be updated accordingly.
Parameters
----------
obstype : String, optional
Name of the observationtype you want to apply the checks on. The
default is 'temp'.
Returns
-------
None.
Note
-------
To update the check settings, use the update_titan_qc_settings method
of the Dataset class.
Warning
--------
To use this method, you must install titanlib. Windows users must have
a c++ compiler installed. See the titanlib documentation: https://github.com/metno/titanlib/wiki/Installation.
Warning
-------
This method is a python wrapper on titanlib c++ scripts, and it is prone
to segmentation faults. The perfomance of this check is thus not
guaranteed!
Examples
--------
.. code-block:: python
import metobs_toolkit
# Import data into a Dataset
dataset = metobs_toolkit.Dataset()
dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile,
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
dataset.import_data_from_file()
dataset.coarsen_time_resolution(freq='1h')
#Get altitude of all stations
dataset.get_altitude()
#Update some temperature QC settings
dataset.update_titan_qc_settings(obstype='temp',
sct_outer_radius=25000)
# Apply buddy check on the temperature observations
dataset.apply_titan_sct_resistant_check(obstype='temp')
"""
logger.info("Applying the titan SCT check")
try:
import titanlib
# Add version restrictions??
except ModuleNotFoundError:
logger.warning(
"Titanlib is not installed, install it manually if you want to use this functionallity."
)
return
checkname = "titan_sct_resistant_check"
# check if required metadata is available:
# 1. coordinates are available?
if self.metadf["lat"].isnull().any():
logger.warning(
f"Not all coordinates are available, the {checkname} cannot be executed!"
)
return
if self.metadf["lon"].isnull().any():
logger.warning(
f"Not all coordinates are available, the {checkname} cannot be executed!"
)
return
# 2. altitude available?
if "altitude" not in self.metadf.columns:
logger.warning(
f"The altitude is not known for all stations. The {checkname} cannot be executed!"
)
logger.info(
'(To resolve this error you can: \n *Use the Dataset.get_altitude() method \n *Set use_constant_altitude to True \n update the "altitude" column in the metadf attribute of your Dataset.'
)
return
if self.metadf["altitude"].isnull().any():
logger.warning(
f"The altitude is not known for all stations. The {checkname} cannot be executed!"
)
logger.info(
'(To resolve this error you can: \n *Use the Dataset.get_altitude() method \n *Set use_constant_altitude to True \n *Update the "altitude" column in the metadf attribute of your Dataset.)'
)
return
apliable = _can_qc_be_applied(self, obstype, checkname)
if apliable:
obsdf, outliersdf = titan_sct_resistant_check(
obsdf=self.df,
metadf=self.metadf,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["titan_check_settings"][checkname][
obstype
],
titan_specific_labeler=self.settings.qc["titan_specific_labeler"][
checkname
],
)
# update the dataset and outliers
self.df = obsdf
if not outliersdf.empty:
self.outliersdf = concat_save([self.outliersdf, outliersdf])
# add this check to the applied checks
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=obstype, ordered_checknames=checkname
),
],
ignore_index=True,
)
else:
logger.warning(
f"The {checkname} can NOT be applied on {obstype} because it was already applied on this observation type!"
)
[docs]
def combine_all_to_obsspace(
self,
repr_outl_as_nan=False,
overwrite_outliers_by_gaps_and_missing=True,
):
"""Make one dataframe with all observations and their labels.
Combine all observations, outliers, missing observations and gaps into
one Dataframe. All observation types are combined an a label is added
in a serperate column.
When gaps and missing records are updated from outliers one has to choice
to represent these records as outliers or gaps. There can not be duplicates
in the return dataframe.
By default the observation values of the outliers are saved, one can
choice to use these values or NaN's.
following checks!
Parameters
----------
repr_outl_as_nan : bool, optional
If True, Nan's are use for the values of the outliers. The
default is False.
overwrite_outliers_by_gaps_and_missing : Bool, optional
If True, records that are labeld as gap/missing and outlier are
labeled as gaps/missing. This has only effect when the gaps/missing
observations are updated from the outliers. The default is True.
Returns
---------
combdf : pandas.DataFrame()
A dataframe containing a continious time resolution of records, where each
record is labeld.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*1676 records labeled as outliers
*0 gaps
*3 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>>
>>> # Combine all records to one dataframe in Observation-resolution
>>> overview_df = dataset.combine_all_to_obsspace()
>>> overview_df.head(12)
value ... toolkit_representation
name datetime obstype ...
vlinder01 2022-09-01 00:00:00+00:00 humidity 65.000000 ... observation
temp 18.800000 ... observation
wind_direction 65.000000 ... observation
wind_speed 1.555556 ... observation
2022-09-01 01:00:00+00:00 humidity 65.000000 ... observation
temp 18.400000 ... observation
wind_direction 55.000000 ... observation
wind_speed 1.416667 ... observation
2022-09-01 02:00:00+00:00 humidity 68.000000 ... observation
temp 17.100000 ... observation
wind_direction 45.000000 ... observation
wind_speed 1.583333 ... observation
<BLANKLINE>
[12 rows x 3 columns]
"""
# TODO: label values from settings not hardcoding
# TODO: use the repr_outl_as_nan argumenten here
# =============================================================================
# Stack observations and outliers
# =============================================================================
df = self.df
# better save than sorry
present_obstypes = list(self.obstypes.keys())
df = df[present_obstypes]
# to tripple index
df = (
df.stack(future_stack=True)
.reset_index()
.rename(columns={"level_2": "obstype", 0: "value"})
.set_index(["name", "datetime", "obstype"])
)
df["label"] = "ok"
df["toolkit_representation"] = "observation"
# outliers
outliersdf = self.outliersdf.copy()
outliersdf["toolkit_representation"] = "outlier"
# Careful! Some outliers exist on inport frequency (duplicated, invalid)
# So only use the outliers for which station-datetime-obstype are present in the
# dataset.df
outliersdf = outliersdf[outliersdf.index.isin(df.index)]
# remove outliers from the observations
df = df[~df.index.isin(outliersdf.index)]
# =============================================================================
# Stack gaps
# =============================================================================
# add gapfill and remove the filled records from gaps
gapsfilldf = self.gapfilldf.copy()
# to triple index
gapsfilldf = value_labeled_doubleidxdf_to_triple_idxdf(
gapsfilldf, known_obstypes=list(self.obstypes.keys())
)
gapsfilldf["toolkit_representation"] = "gap fill"
gapsidx = get_gaps_indx_in_obs_space(
gapslist=self.gaps,
obsdf=self.df,
outliersdf=self.outliersdf,
resolutionseries=self.metadf["dataset_resolution"],
)
gapsdf = pd.DataFrame(index=gapsidx, columns=present_obstypes)
gapsdf = (
gapsdf.stack(future_stack=True)
.reset_index()
.rename(columns={"level_2": "obstype", 0: "value"})
.set_index(["name", "datetime", "obstype"])
)
gapsdf["label"] = self.settings.gap["gaps_info"]["gap"]["outlier_flag"]
gapsdf["toolkit_representation"] = "gap"
# Remove gaps from df
df = df[~df.index.isin(gapsdf.index)]
if overwrite_outliers_by_gaps_and_missing:
outliersdf = outliersdf.drop(index=gapsdf.index, errors="ignore")
# Remove gapfill values records from the gaps
gapsdf = gapsdf.drop(index=gapsfilldf.index)
# =============================================================================
# Stack missing
# =============================================================================
missingfilldf = self.missing_fill_df.copy()
missingfilldf = value_labeled_doubleidxdf_to_triple_idxdf(
missingfilldf, known_obstypes=list(self.obstypes.keys())
)
missingfilldf["toolkit_representation"] = "missing observation fill"
# add missing observations if they occure in observation space
missingidx = self.missing_obs.get_missing_indx_in_obs_space(
self.df, self.metadf["dataset_resolution"]
)
missingdf = pd.DataFrame(index=missingidx, columns=present_obstypes)
missingdf = (
missingdf.stack(future_stack=True)
.reset_index()
.rename(columns={"level_2": "obstype", 0: "value"})
.set_index(["name", "datetime", "obstype"])
)
missingdf["label"] = self.settings.gap["gaps_info"]["missing_timestamp"][
"outlier_flag"
]
missingdf["toolkit_representation"] = "missing observation"
# Remove missing from df
df = df[~df.index.isin(missingdf.index)]
if overwrite_outliers_by_gaps_and_missing:
outliersdf = outliersdf.drop(index=missingdf.index, errors="ignore")
# Remove missingfill values records from the missing
missingdf = missingdf.drop(index=missingfilldf.index)
# =============================================================================
# combine all
# =============================================================================
combdf = concat_save(
[df, outliersdf, gapsdf, gapsfilldf, missingdf, missingfilldf]
).sort_index()
combdf.index.names = ["name", "datetime", "obstype"]
# To be shure?
combdf = combdf[~combdf.index.duplicated(keep="first")]
return combdf
[docs]
def get_qc_stats(self, obstype="temp", stationname=None, make_plot=True):
"""Get quality control statistics.
Compute frequency statistics on the qc labels for an observationtype.
The output is a dataframe containing the frequency statistics presented
as percentages.
These frequencies can also be presented as a collection of piecharts
per check.
With stationnames you can subset the data to one ore multiple stations.
Parameters
-----------
obstype : str, optional
Observation type to analyse the QC labels on. The default is
'temp'.
stationname : str, optional
Stationname to subset the quality labels on. If None, all
stations are used. The default is None.
make_plot : Bool, optional
If True, a plot with piecharts is generated. The default is True.
Returns
---------
dataset_qc_stats : pandas.DataFrame
A table containing the label frequencies per check presented
as percentages.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='1h')
>>>
>>> # Apply quality control on the temperature observations
>>> dataset.apply_quality_control(obstype='temp') #Using the default QC settings
>>> dataset
Dataset instance containing:
*28 stations
*['temp', 'humidity', 'wind_speed', 'wind_direction'] observation types
*10080 observation records
*1676 records labeled as outliers
*0 gaps
*3 missing observations
*records range: 2022-09-01 00:00:00+00:00 --> 2022-09-15 23:00:00+00:00 (total duration: 14 days 23:00:00)
*time zone of the records: UTC
*Coordinates are available for all stations.
>>>
>>> #Get quality control statistics
>>> stats = dataset.get_qc_stats(make_plot=False)
>>> stats
({'ok': 83.37301587301587, 'QC outliers': 16.6269841269...
"""
# cobmine all and get final label
comb_df = self.combine_all_to_obsspace()
# subset to relevant columnt
comb_df = xs_save(comb_df, obstype, level="obstype")[["label"]]
# subset to stationnames
if stationname is not None:
assert stationname in comb_df.index.get_level_values(
"name"
), f" stationnames: {stationname} is not a list."
comb_df = comb_df.loc[stationname]
# compute freq statistics
final_freq, outl_freq, specific_freq = get_freq_statistics(
comb_df=comb_df,
obstype=obstype,
checks_info=self.settings.qc["qc_checks_info"],
gaps_info=self.settings.gap["gaps_info"],
applied_qc_order=self._applied_qc,
)
if any([stat is None for stat in [final_freq, outl_freq, specific_freq]]):
return None
# make title
orig_obstype = self.obstypes[obstype].get_orig_name()
if stationname is None:
title = f"Label frequency statistics on all stations for {orig_obstype}."
else:
title = f"Label frequency statistics for {stationname} for {orig_obstype}."
if make_plot:
# make pie plots
qc_stats_pie(
final_stats=final_freq,
outlier_stats=outl_freq,
specific_stats=specific_freq,
plot_settings=self.settings.app["plot_settings"],
qc_check_info=self.settings.qc["qc_checks_info"],
title=title,
)
return (final_freq, outl_freq, specific_freq)
def update_outliersdf(self, add_to_outliersdf):
"""Update the outliersdf attribute."""
self.outliersdf = concat_save([self.outliersdf, add_to_outliersdf])
[docs]
def coarsen_time_resolution(
self, origin=None, origin_tz=None, freq=None, method=None, limit=None
):
"""Resample the observations to coarser timeresolution.
The assumed dataset resolution (stored in the metadf attribute) will be
updated.
Parameters
----------
origin : datetime.datetime, optional
Define the origin (first timestamp) for the obervations. The origin
is timezone naive, and is assumed to have the same timezone as the
obervations. If None, the earliest occuring timestamp is used as
origin. The default is None.
origin_tz : str, optional
Timezone string of the input observations. Element of
pytz.all_timezones. If None, the timezone from the settings is
used. The default is None.
freq : DateOffset, Timedelta or str, optional
The offset string or object representing target conversion.
Ex: '15min' is 15 minutes, '1h', is one hour. If None, the target time
resolution of the dataset.settings is used. The default is None.
method : 'nearest' or 'bfill', optional
Method to apply for the resampling. If None, the resample method of
the dataset.settings is used. The default is None.
limit : int, optional
Limit of how many values to fill with one original observations. If
None, the target limit of the dataset.settings is used. The default
is None.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
>>> dataset.coarsen_time_resolution(freq='15min') #to 15 minutes resolution
>>> dataset.df[['temp', 'humidity']].head()
temp humidity
name datetime
vlinder01 2022-09-01 00:00:00+00:00 18.8 65
2022-09-01 00:15:00+00:00 18.7 65
2022-09-01 00:30:00+00:00 18.7 65
2022-09-01 00:45:00+00:00 18.6 65
2022-09-01 01:00:00+00:00 18.4 65
"""
if freq is None:
freq = self.settings.time_settings["target_time_res"]
if method is None:
method = self.settings.time_settings["resample_method"]
if limit is None:
limit = int(self.settings.time_settings["resample_limit"])
if origin_tz is None:
origin_tz = self.settings.time_settings["timezone"]
logger.info(
f"Coarsening the timeresolution to {freq} using \
the {method}-method (with limit={limit})."
)
# test if coarsening the resolution is valid for the dataset
# 1. If resolution-dep-qc is applied --> coarsening is not valid and will result in a broken dataset
if (
self._applied_qc[
~self._applied_qc["checkname"].isin(
["duplicated_timestamp", "invalid_input"]
)
].shape[0]
> 0
):
logger.warning(
"Coarsening time resolution is not possible because quality control checks that are resolution depening are already performed on the Dataset."
)
logger.info(
"(Apply coarsening_time_resolution BEFORE applying quality control.)"
)
return
# TODO: implement buffer method
df = self.df.reset_index()
if origin is None:
# find earlyest timestamp, if it is on the hour, use it else use the following hour
tstart = df["datetime"].min()
if tstart.minute != 0 or tstart.second != 0 or tstart.microsecond != 0:
# Round up to nearest hour
tstart = tstart.ceil(freq=freq)
else:
origin_tz_aware = pytz.timezone(origin_tz).localize(origin)
tstart = origin_tz_aware.astimezone(
pytz.timezone(self.settings.time_settings["timezone"])
)
# Coarsen timeresolution
if method == "nearest":
df = (
df.set_index("datetime")
.groupby("name")
.resample(freq, origin=tstart)
.nearest(limit=limit)
)
elif method == "bfill":
df = (
df.set_index("datetime")
.groupby("name")
.resample(freq, origin=tstart)
.bfill(limit=limit)
)
else:
logger.warning(f"The coarsening method: {method}, is not implemented yet.")
df = df.set_index(["name", "datetime"])
if "name" in df.columns:
df = df.drop(columns=["name"])
# Update resolution info in metadf
self.metadf["dataset_resolution"] = pd.to_timedelta(freq)
# update df
self.df = df
# Remove gaps and missing from the observatios
# most gaps and missing are already removed but when increasing timeres,
# some records should be removed as well.
self.df = remove_gaps_from_obs(gaplist=self.gaps, obsdf=self.df)
self.df = self.missing_obs.remove_missing_from_obs(obsdf=self.df)
[docs]
def sync_observations(
self,
tolerance,
verbose=True,
_force_resolution_minutes=None,
_drop_target_nan_dt=False,
):
"""Simplify and syncronize the observation timestamps.
To simplify the resolution (per station), a tolerance is use to shift timestamps. The tolerance indicates the
maximum translation in time that can be applied to an observation.
The sycronisation tries to group stations that have an equal simplified resolution, and syncronize them. The origin
of the sycronized timestamps will be set to round hours, round 10-minutes or round-5 minutes if possible given the tolerance.
The observations present in the input file are used.
After syncronization, the IO outliers, missing observations and gaps are recomputed.
Parameters
----------
tolerance : Timedelta or str
The tolerance string or object representing the maximum translation in time.
Ex: '5min' is 5 minutes, '1h', is one hour.
verbose : bool, optional
If True, a dataframe illustrating the mapping from original datetimes to simplified and syncronized is returned. The default is True.
_drop_target_nan_dt : bool, optional
If record has no target datetime, the datetimes will be listed as Nat. To remove them,
set this to True. Default is False.
_force_resolution_minutes : bool, optional
force the resolution estimate to this frequency in minutes. If None, the frequency is estimated. The default is None.
Note
--------
Keep in mind that this method will overwrite the df, outliersdf, missing timestamps and gaps.
Note
--------
Because the used observations are from the input file, previously coarsend timeresolutions are ignored.
Returns
-------
pandas.DataFrame (if verbose is True)
A dataframe containing the original observations with original timestamps and the corresponding target timestamps.
"""
# get columns pressent in metadf, because the input df can have columns
# that does not have to be mapped to the toolkit
assert (
not self.input_df.empty
), "To syncronize a dataset, the (pure) input dataframe cannot be empty."
init_meta_cols = self.metadf.columns.copy()
df = self.input_df
self.df = init_multiindexdf()
self.outliersdf = init_triple_multiindexdf()
self.gapfilldf = init_multiindexdf()
self.missing_obs = None
self.gaps = None
# find simplified resolution
if _force_resolution_minutes is None:
simplified_resolution = get_freqency_series(
df=df,
method="median",
simplify=True,
max_simplify_error=tolerance,
)
else:
if isinstance(_force_resolution_minutes, list):
# TODO
print(
"foce resolution minutes as a list is not implemented yet, sorry."
)
else:
stations = self.metadf.index
freq_series = pd.Series(
index=stations,
data=[timedelta(minutes=float(_force_resolution_minutes))]
* len(stations),
)
simplified_resolution = freq_series
logger.debug(f"Syncronizing to these resolutions: {simplified_resolution}")
occuring_resolutions = simplified_resolution.unique()
df = df.reset_index()
def find_simple_origin(tstart, tolerance):
if tstart.minute == 0 and tstart.second == 0 and tstart.microsecond == 0:
return tstart # already a round hour
# try converting to a round hour
tstart_round_hour = tstart.round("60min")
if abs(tstart - tstart_round_hour) <= pd.to_timedelta(tolerance):
return tstart_round_hour
# try converting to a tenfold in minutes
tstart_round_tenfold = tstart.round("10min")
if abs(tstart - tstart_round_tenfold) <= pd.to_timedelta(tolerance):
return tstart_round_tenfold
# try converting to a fivefold in minutes
tstart_round_fivefold = tstart.round("5min")
if abs(tstart - tstart_round_fivefold) <= pd.to_timedelta(tolerance):
return tstart_round_fivefold
# no suitable conversion found
return tstart
merged_df = pd.DataFrame()
_total_verbose_df = pd.DataFrame()
for occur_res in occuring_resolutions:
group_stations = simplified_resolution[
simplified_resolution == occur_res
].index.to_list()
logger.info(
f" Grouping stations with simplified resolution of {pd.to_timedelta(occur_res)}: {group_stations}"
)
groupdf = df[df["name"].isin(group_stations)]
tstart = groupdf["datetime"].min()
tend = groupdf["datetime"].max()
# find a good origin point
origin = find_simple_origin(tstart=tstart, tolerance=tolerance)
# Create records index
target_records = pd.date_range(
start=origin, end=tend, freq=pd.Timedelta(occur_res)
).to_series()
target_records.name = "target_datetime"
# convert records to new target records, station per station
for sta in group_stations:
stadf = groupdf[groupdf["name"] == sta]
# Drop all nan values! these will be added later from the outliersdf
stadf = stadf.set_index(["name", "datetime"])
# drop all records per statiotion for which there are no obsecvations
present_obs = list(self.obstypes.keys())
stadf = stadf.loc[stadf[present_obs].dropna(axis=0, how="all").index]
stadf = stadf.reset_index()
mergedstadf = pd.merge_asof(
left=stadf.sort_values("datetime"),
right=target_records.to_frame(),
right_on="target_datetime",
left_on="datetime",
direction="nearest",
tolerance=pd.Timedelta(tolerance),
)
if _drop_target_nan_dt:
mergedstadf = mergedstadf.dropna(subset="target_datetime")
# possibility 1: record is mapped crrectly
correct_mapped = mergedstadf[~mergedstadf["target_datetime"].isnull()]
# possibility2: records that ar not mapped to target
# not_mapped_records =mergedstadf[mergedstadf['target_datetime'].isnull()]
# possibilyt 3 : no suitable candidates found for the target
# these will be cached by the missing and gap check
# no_record_candidates = target_records[~target_records.isin(mergedstadf['target_datetime'])].values
merged_df = concat_save([merged_df, correct_mapped])
if verbose:
_total_verbose_df = concat_save([_total_verbose_df, mergedstadf])
# overwrite the df with the synced observations
merged_df = (
merged_df.rename(
columns={
"datetime": "original_datetime",
"target_datetime": "datetime",
}
)
.set_index(["name", "datetime"])
.drop(["original_datetime"], errors="ignore", axis=1)
.sort_index()
)
# self.df = merged_df
# Recompute the dataset attributes, apply qc, gap and missing searches, etc.
self._construct_dataset(
df=merged_df,
freq_estimation_method="highest",
freq_estimation_simplify=False,
freq_estimation_simplify_error=None,
fixed_freq_series=simplified_resolution,
update_full_metadf=False,
) # Do not overwrite full metadf, only the frequencies
self.metadf = self.metadf[
[col for col in self.metadf.columns if col in init_meta_cols]
]
if verbose:
_total_verbose_df = _total_verbose_df.rename(
columns={
"datetime": "original_datetime",
"target_datetime": "datetime",
}
).set_index(["name", "datetime"])
return _total_verbose_df
[docs]
def import_data_from_file(
self,
input_data_file=None,
input_metadata_file=None,
template_file=None,
freq_estimation_method=None,
freq_estimation_simplify=None,
freq_estimation_simplify_error=None,
kwargs_data_read={},
kwargs_metadata_read={},
):
"""Read observations from a csv file.
The paths (data, metadata and template) are stored in the settings if
Dataset.update_settings() is called on this object. These paths can be
updated by adding them as argument to this method.
The input data (and metadata) are interpreted by using a template
(json file).
An estimation of the observational frequency is made per station. This is used
to find missing observations and gaps.
The Dataset attributes are set and the following checks are executed:
* Duplicate check
* Invalid input check
* Find missing observations
* Find gaps
Parameters
----------
input_data_file : string, optional
Path to the input data file with observations. If None, the input
data path in the settings is used.
input_metadata_file : string, optional
Path to the input metadata file. If None, the input metadata path
in the settings is used.
template_file : string, optional
Path to the template (json) file to be used on the observations
and metadata. If None, the template path in the settings is used.
freq_estimation_method : 'highest' or 'median', optional
Select wich method to use for the frequency estimation. If
'highest', the highest apearing frequency is used. If 'median', the
median of the apearing frequencies is used. If None, the method
stored in the
Dataset.settings.time_settings['freq_estimation_method'] is used.
The default is None.
freq_estimation_simplify : bool, optional
If True, the likely frequency is converted to round hours, or round minutes.
The "freq_estimation_simplify_error' is used as a constrain. If the constrain is not met,
the simplification is not performed. If None, the method
stored in the
Dataset.settings.time_settings['freq_estimation_simplify'] is used.
The default is None.
freq_estimation_simplify_error : Timedelta or str, optional
The tolerance string or object representing the maximum translation in time to form a simplified frequency estimation.
Ex: '5min' is 5 minutes, '1h', is one hour. If None, the method
stored in the
Dataset.settings.time_settings['freq_estimation_simplify_error'] is
used. The default is None.
kwargs_data_read : dict, optional
Keyword arguments collected in a dictionary to pass to the
pandas.read_csv() function on the data file. The default is {}.
kwargs_metadata_read : dict, optional
Keyword arguments collected in a dictionary to pass to the
pandas.read_csv() function on the metadata file. The default is {}.
Note
--------
In pracktice, the default arguments will be sufficient for most applications.
Note
--------
If options are present in the template, these will have priority over the arguments of this function.
Warning
---------
All CSV data files must be in *UTF-8 encoding*. For most CSV files,
this condition is already met. To make sure, in Microsoft Excel (or
similar), you can specify to export as **`CSV UTF-8`**. If you
encounter an error, mentioning a `"/ueff..."` tag in a CSV file, it is
often solved by converting the CSV to UTF-8.
Returns
-------
None.
Examples
--------
.. code-block:: python
>>> import metobs_toolkit
>>>
>>> # Import data into a Dataset
>>> dataset = metobs_toolkit.Dataset()
>>> dataset.update_settings(
... input_data_file=metobs_toolkit.demo_datafile,
... input_metadata_file=metobs_toolkit.demo_metadatafile,
... template_file=metobs_toolkit.demo_template,
... )
>>> dataset.import_data_from_file()
"""
logger.info(f'Importing data from file: {self.settings.IO["input_data_file"]}')
# Update paths to the input files, if given.
if input_data_file is not None:
self.update_settings(input_data_file=input_data_file)
if input_metadata_file is not None:
self.update_settings(input_metadata_file=input_metadata_file)
if template_file is not None:
self.update_settings(template_file=template_file)
if freq_estimation_method is None:
freq_estimation_method = self.settings.time_settings[
"freq_estimation_method"
]
if freq_estimation_simplify is None:
freq_estimation_simplify = self.settings.time_settings[
"freq_estimation_simplify"
]
if freq_estimation_simplify_error is None:
freq_estimation_simplify_error = self.settings.time_settings[
"freq_estimation_simplify_error"
]
assert self.settings.templatefile is not None, "No templatefile is specified."
# Read template
self.template.read_template_from_file(jsonpath=self.settings.templatefile)
# overload the timezone from template to the settings
if not self.template._get_tz() is None:
self.update_timezone(self.template.get_tz())
logger.info(
f"Set timezone = {self.template.get_tz()} from options in template."
)
# Read observations into pandas dataframe
df = import_data_from_csv(
input_file=self.settings.IO["input_data_file"],
template=self.template,
known_obstypes=list(self.obstypes.keys()),
kwargs_data_read=kwargs_data_read,
)
# Set timezone information
df.index = df.index.tz_localize(
tz=self.settings.time_settings["timezone"],
ambiguous="infer",
nonexistent="shift_forward",
)
# drop Nat datetimes if present
df = df.loc[pd.notnull(df.index)]
logger.debug(
f'Data from {self.settings.IO["input_data_file"]} \
imported to dataframe {df.head()}.'
)
if self.settings.IO["input_metadata_file"] is None:
logger.warning(
"No metadata file is defined,\
no metadata attributes can be set!"
)
else:
logger.info(
f'Importing metadata from file: {self.settings.IO["input_metadata_file"]}'
)
meta_df = import_metadata_from_csv(
input_file=self.settings.IO["input_metadata_file"],
template=self.template,
kwargs_metadata_read=kwargs_metadata_read,
)
# in dataset of one station
if self.template._is_data_single_station():
# logger.warning("No station names find in the observations!")
# If there is ONE name in the metadf, than we use that name for
# the df, else we use the default name
if ("name" in meta_df.columns) & (meta_df.shape[0] == 1):
name = meta_df["name"].iloc[0]
df["name"] = name
logger.warning(
f"One stationname found in the metadata: {name}, this name is used for the data."
)
else:
df["name"] = str(self.settings.app["default_name"])
# for later merging, we add the name column with the default
# also in the metadf
meta_df["name"] = str(self.settings.app["default_name"])
logger.warning(
f'Assume the dataset is for ONE station with the \
default name: {self.settings.app["default_name"]}.'
)
# merge additional metadata to observations
logger.debug(f"Head of data file, before merge: {df.head()}")
logger.debug(f"Head of metadata file, before merge: {meta_df.head()}")
meta_cols = [
colname for colname in meta_df.columns if not colname.startswith("_")
]
additional_meta_cols = list(set(meta_cols).difference(df.columns))
if bool(additional_meta_cols):
logger.debug(
f"Merging metadata ({additional_meta_cols}) to dataset data by name."
)
additional_meta_cols.append("name") # merging on name
# merge deletes datetime index somehow? so add it back.
df_index = df.index
df = df.merge(
right=meta_df[additional_meta_cols], how="left", on="name"
)
df.index = df_index
# update dataset object
# Remove stations whith only one observation (no freq estimation)
station_counts = df["name"].value_counts()
issue_station = station_counts[station_counts < 2].index.to_list()
logger.warning(
f"These stations will be removed because of only having one record: {issue_station}"
)
df = df[~df["name"].isin(issue_station)]
# convert dataframe to multiindex (datetime - name)
df = df.set_index(["name", df.index])
# Sort by name and then by datetime (to avoid negative freq)
df = df.sort_index(level=["name", "datetime"])
# dataframe with all data of input file
self.input_df = df.sort_index(level=["name", "datetime"])
# Construct all attributes of the Dataset
self._construct_dataset(
df=df,
freq_estimation_method=freq_estimation_method,
freq_estimation_simplify=freq_estimation_simplify,
freq_estimation_simplify_error=freq_estimation_simplify_error,
)
def _construct_dataset(
self,
df,
freq_estimation_method,
freq_estimation_simplify,
freq_estimation_simplify_error,
fixed_freq_series=None,
update_full_metadf=True,
):
"""Construct the Dataset class from a IO dataframe.
The df, metadf, outliersdf, gaps, missing timestamps and observationtypes attributes are set.
The observations are converted to the toolkit standard units if possible.
Qc on IO is applied (duplicated check and invalid check) + gaps and missing
values are defined by assuming a frequency per station.
Parameters
----------
df : pandas.dataframe
The dataframe containing the input observations and metadata.
freq_estimation_method : 'highest' or 'median'
Select wich method to use for the frequency estimation. If
'highest', the highest apearing frequency is used. If 'median', the
median of the apearing frequencies is used.
freq_estimation_simplify : bool
If True, the likely frequency is converted to round hours, or round minutes.
The "freq_estimation_simplify_error' is used as a constrain. If the constrain is not met,
the simplification is not performed.
freq_estimation_simplify_error : Timedelta or str, optional
The tolerance string or object representing the maximum translation in time to form a simplified frequency estimation.
Ex: '5min' is 5 minutes, '1h', is one hour.
fixed_freq_series : pandas.series or None, optional
If you do not want the frequencies to be recalculated, one can pass the
frequency series to update the metadf["dataset_resolution"]. If None, the frequencies will be estimated. The default is None.
update_full_metadf : bool, optional
If True, the full Dataset.metadf will be updated. If False, only the frequency columns in the Dataset.metadf will be updated. The default is True.
Returns
-------
None.
"""
# Convert dataframe to dataset attributes
self._initiate_df_attribute(dataframe=df, update_metadf=update_full_metadf)
# Check observation types and convert units if needed.
self._setup_of_obstypes_and_units()
# Apply quality control on Import resolution
self._apply_qc_on_import()
if fixed_freq_series is None:
freq_series = get_freqency_series(
df=self.df,
method=freq_estimation_method,
simplify=freq_estimation_simplify,
max_simplify_error=freq_estimation_simplify_error,
)
freq_series_import = freq_series
else:
if "assumed_import_frequency" in self.metadf.columns:
freq_series_import = self.metadf[
"assumed_import_frequency"
] # No update
else:
freq_series_import = fixed_freq_series
freq_series = fixed_freq_series
# add import frequencies to metadf (after import qc!)
self.metadf["assumed_import_frequency"] = freq_series_import
self.metadf["dataset_resolution"] = freq_series
# Remove gaps and missing from the observations AFTER timecoarsening
self.df = remove_gaps_from_obs(gaplist=self.gaps, obsdf=self.df)
self.df = self.missing_obs.remove_missing_from_obs(obsdf=self.df)
def _initiate_df_attribute(self, dataframe, update_metadf=True):
"""Initialize dataframe attributes."""
logger.info(f"Updating dataset by dataframe with shape: {dataframe.shape}.")
# Create dataframe with fixed order of observational columns
obs_col_order = [
col for col in list(self.obstypes.keys()) if col in dataframe.columns
]
self.df = dataframe[obs_col_order].sort_index()
if update_metadf:
metadf = dataframe
# create metadataframe
metacolumns = list(self.template._get_metadata_column_map().values())
metacolumns.remove("name") # This is the index
metadf = metadf.reindex(columns=metacolumns)
metadf.index = metadf.index.droplevel("datetime") # drop datetimeindex
# drop dubplicates due to datetime
metadf = metadf[~metadf.index.duplicated(keep="first")]
# "lat' and 'lon' column are required, so add them as empty if not present
if "lat" not in metadf.columns:
metadf["lat"] = np.nan
if "lon" not in metadf.columns:
metadf["lon"] = np.nan
self.metadf = metadf_to_gdf(metadf)
def _apply_qc_on_import(self):
# if the name is Nan, remove these records from df, and metadf (before)
# they end up in the gaps and missing obs
if np.nan in self.df.index.get_level_values("name"):
logger.warning(
f'Following observations are not linked to a station name and will be removed: {xs_save(self.df, np.nan, "name")}'
)
self.df = self.df[~self.df.index.get_level_values("name").isna()]
if np.nan in self.metadf.index:
logger.warning(
f"Following station will be removed from the Dataset {self.metadf[self.metadf.index.isna()]}"
)
self.metadf = self.metadf[~self.metadf.index.isna()]
# find missing obs and gaps, and remove them from the df
self.missing_obs, self.gaps = missing_timestamp_and_gap_check(
df=self.df,
gapsize_n=self.settings.gap["gaps_settings"]["gaps_finder"]["gapsize_n"],
)
# Create gaps and missing obs objects
# self.gaps = gaps_list
# self.missing_obs = Missingob_collection(missing_obs)
# Perform QC checks on original observation frequencies
self.df, dup_outl_df = duplicate_timestamp_check(
df=self.df,
checks_info=self.settings.qc["qc_checks_info"],
checks_settings=self.settings.qc["qc_check_settings"],
)
if not dup_outl_df.empty:
self.update_outliersdf(add_to_outliersdf=dup_outl_df)
self.df, nan_outl_df = invalid_input_check(
self.df, checks_info=self.settings.qc["qc_checks_info"]
)
if not nan_outl_df.empty:
self.update_outliersdf(nan_outl_df)
self.outliersdf = self.outliersdf.sort_index()
# update the order and which qc is applied on which obstype
checked_obstypes = [
obs for obs in self.df.columns if obs in self.obstypes.keys()
]
checknames = ["duplicated_timestamp", "invalid_input"] # KEEP order
self._applied_qc = concat_save(
[
self._applied_qc,
conv_applied_qc_to_df(
obstypes=checked_obstypes, ordered_checknames=checknames
),
],
ignore_index=True,
)
def _setup_of_obstypes_and_units(self):
"""Function to setup all attributes related to observation types and
convert to standard units."""
# Check if all present observation types are known.
unknown_obs_cols = [
obs_col
for obs_col in self.df.columns
if obs_col not in self.obstypes.keys()
]
if len(unknown_obs_cols) > 0:
sys.exit(f"The following observation types are unknown: {unknown_obs_cols}")
for obs_col in self.df.columns:
# Convert the units to the toolkit standards (if unit is known)
self.df[obs_col] = self.obstypes[obs_col].convert_to_standard_units(
input_data=self.df[obs_col],
input_unit=self.template._get_input_unit_of_tlk_obstype(obs_col),
)
# Update the description of the obstype
description = self.template._get_description_of_tlk_obstype(obs_col)
if pd.isna(description):
description = None
self.obstypes[obs_col].set_description(desc=description)
# Update the original column name and original units
self.obstypes[obs_col].set_original_name(
self.template._get_original_obstype_columnname(obs_col)
)
self.obstypes[obs_col].set_original_unit(
self.template._get_input_unit_of_tlk_obstype(obs_col)
)
# subset the obstypes attribute
self.obstypes = {
name: obj for name, obj in self.obstypes.items() if name in self.df.columns
}
# =============================================================================
# Physiography extractions
# =============================================================================
[docs]
def get_lcz(self):
"""Extract local climate zones for all stations.
Function to extract the Local CLimate zones (LCZ) from the
wudapt global LCZ map on the Google engine for all stations.
A 'LCZ' column will be added to the metadf, and series is returned.
Returns
-------
lcz_series : pandas.Series()
A series with the stationnames as index and the LCZ as values.
Examples
--------
.. code-block:: python
import metobs_toolkit
# Import data into a Dataset
dataset = metobs_toolkit.Dataset()
dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile,
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
dataset.import_data_from_file()
# Get the local climate zones for all stations
lcz_series = dataset.get_lcz()
# in addition to the returned series, the metadf attribute is updated aswell
print(dataset.metadf)
"""
# connect to gee
connect_to_gee()
# Extract LCZ for all stations
lcz_series = lcz_extractor(
metadf=self.metadf,
mapinfo=self.settings.gee["gee_dataset_info"]["global_lcz_map"],
)
# drop column if it was already present
if "lcz" in self.metadf:
self.metadf = self.metadf.drop(columns=["lcz"])
# update metadata
self.metadf = self.metadf.merge(
lcz_series.to_frame(),
how="left",
left_index=True,
right_index=True,
)
return lcz_series
[docs]
def get_altitude(self):
"""Extract Altitudes for all stations.
Function to extract the Altitude from the SRTM Digital Elevation Data
global map on the Google engine for all stations.
A 'altitude' column will be added to the metadf, and series is returned.
Returns
-------
altitude_series : pandas.Series()
A series with the stationnames as index and the altitudes as values.
Examples
--------
.. code-block:: python
import metobs_toolkit
# Import data into a Dataset
dataset = metobs_toolkit.Dataset()
dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile,
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
dataset.import_data_from_file()
# Get the altitude for all stations
alt_series = dataset.get_altitude()
# in addition to the returned series, the metadf attribute is updated aswell
print(dataset.metadf)
"""
# connect to gee
connect_to_gee()
# Extract LCZ for all stations
altitude_series = height_extractor(
metadf=self.metadf,
mapinfo=self.settings.gee["gee_dataset_info"]["DEM"],
)
# drop column if it was already present
if "altitude" in self.metadf:
self.metadf = self.metadf.drop(columns=["altitude"])
# update metadata
self.metadf = self.metadf.merge(
altitude_series.to_frame(),
how="left",
left_index=True,
right_index=True,
)
return altitude_series
[docs]
def get_landcover(
self,
buffers=[100],
aggregate=True,
overwrite=True,
gee_map="worldcover",
):
"""Extract landcover for all stations.
Extract the landcover fractions in a buffer with a specific radius for
all stations. If an aggregation scheme is define, one can choose to
aggregate the landcoverclasses.
The landcover fractions will be added to the Dataset.metadf if overwrite
is True. Presented as seperate columns where each column represent the
landcovertype and corresponding buffer.
Parameters
----------
buffers : num, optional
The list of buffer radia in dataset units (meters for ESA worldcover) . The default is 100.
aggregate : bool, optional
If True, the classes will be aggregated with the corresponding
aggregation scheme. The default is True.
overwrite : bool, optional
If True, the Datset.metadf will be updated with the generated
landcoverfractions. The default is True.
gee_map : str, optional
The name of the dataset to use. This name should be present in the
settings.gee['gee_dataset_info']. If aggregat is True, an aggregation
scheme should included as well. The default is 'worldcover'
Returns
-------
frac_df : pandas.DataFrame
A Dataframe with index: name, buffer_radius and the columns are the
fractions.
Examples
--------
.. code-block:: python
import metobs_toolkit
# Import data into a Dataset
dataset = metobs_toolkit.Dataset()
dataset.update_settings(
input_data_file=metobs_toolkit.demo_datafile,
input_metadata_file=metobs_toolkit.demo_metadatafile,
template_file=metobs_toolkit.demo_template,
)
dataset.import_data_from_file()
# Get the landcover fractions for multiple buffers, for all stations
lc_frac_series = dataset.get_landcover(buffers=[50, 100, 250, 500],
aggregate=False)
# in addition to the returned dataframe, the metadf attribute is updated aswell
print(dataset.metadf)
"""
# connect to gee
connect_to_gee()
df_list = []
for buffer in buffers:
logger.info(
f"Extracting landcover from {gee_map} with buffer radius = {buffer}"
)
# Extract landcover fractions for all stations
lc_frac_df, buffer = lc_fractions_extractor(
metadf=self.metadf,
mapinfo=self.settings.gee["gee_dataset_info"][gee_map],
buffer=buffer,
agg=aggregate,
)
# add buffer to the index
lc_frac_df["buffer_radius"] = buffer
lc_frac_df = lc_frac_df.reset_index().set_index(["name", "buffer_radius"])
lc_frac_df = lc_frac_df.sort_index()
# add to the list
df_list.append(lc_frac_df)
# concat all df for different buffers to one
frac_df = concat_save(df_list)
frac_df = frac_df.sort_index()
if overwrite:
for buf in frac_df.index.get_level_values("buffer_radius").unique():
buf_df = xs_save(frac_df, buf, level="buffer_radius")
buf_df.columns = [col + f"_{int(buf)}m" for col in buf_df.columns]
# overwrite the columns or add them if they did not exist
self.metadf[buf_df.columns] = buf_df
return frac_df
[docs]
def make_gee_plot(self, gee_map, show_stations=True, save=False, outputfile=None):
"""Make an interactive plot of a google earth dataset.
The location of the stations can be plotted on top of it.
Parameters
----------
gee_map : str, optional
The name of the dataset to use. This name should be present in the
settings.gee['gee_dataset_info']. If aggregat is True, an aggregation
scheme should included as well. The default is 'worldcover'
show_stations : bool, optional
If True, the stations will be plotted as markers. The default is True.
save : bool, optional
If True, the map will be saved as an html file in the output_folder
as defined in the settings if the outputfile is not set. The
default is False.
outputfile : str, optional
Specify the path of the html file if save is True. If None, and save
is true, the html file will be saved in the output_folder. The
default is None.
Returns
-------
Map : geemap.foliumap.Map
The folium Map instance.
Warning
---------
To display the interactive map a graphical backend is required, which
is often missing on (free) cloud platforms. Therefore it is better to
set save=True, and open the .html in your browser
"""
# Connect to GEE
connect_to_gee()
# get the mapinfo
mapinfo = self.settings.gee["gee_dataset_info"][gee_map]
# Read in covers, numbers and labels
covernum = list(mapinfo["colorscheme"].keys())
colors = list(mapinfo["colorscheme"].values())
covername = [mapinfo["categorical_mapper"][covnum] for covnum in covernum]
# create visparams
vis_params = {
"min": min(covernum),
"max": max(covernum),
"palette": colors, # hex colors!
}
if "band_of_use" in mapinfo:
band = mapinfo["band_of_use"]
else:
band = None
Map = folium_plot(
mapinfo=mapinfo,
band=band,
vis_params=vis_params,
labelnames=covername,
layername=gee_map,
legendname=f"{gee_map} covers",
# showmap = show,
)
if show_stations:
if not _validate_metadf(self.metadf):
logger.warning(
"Not enough coordinates information is provided to plot the stations."
)
else:
Map = add_stations_to_folium_map(Map=Map, metadf=self.metadf)
# Save if needed
if save:
if outputfile is None:
# Try to save in the output folder
if self.settings.IO["output_folder"] is None:
logger.warning(
"The outputfolder is not set up, use the update_settings to specify the output_folder."
)
else:
filename = f"gee_{gee_map}_figure.html"
filepath = os.path.join(self.settings.IO["output_folder"], filename)
else:
# outputfile is specified
# 1. check extension
if not outputfile.endswith(".html"):
outputfile = outputfile + ".html"
filepath = outputfile
print(f"Gee Map will be save at {filepath}")
logger.info(f"Gee Map will be save at {filepath}")
Map.save(filepath)
return Map
def _can_qc_be_applied(dataset, obstype, checkname):
"""Test if a qc check can be applied."""
# test if check is already applied on the obstype
applied_df = dataset._applied_qc
can_be_applied = (
not applied_df[
(applied_df["obstype"] == obstype) & (applied_df["checkname"] == checkname)
].shape[0]
> 0
)
if not can_be_applied:
logger.warning(
f"The {checkname} check can NOT be applied on {obstype} because it was already applied on this observation type!"
)
return False
# test of all settings are present for the check on the obstype
if checkname not in [
"duplicated_timestamp",
"titan_buddy_check",
"titan_sct_resistant_check",
]:
# these checks are obstype depending,
required_keys = list(
dataset.settings.qc["qc_check_settings"][checkname]["temp"].keys()
) # use temp to find all required settings
if obstype not in dataset.settings.qc["qc_check_settings"][checkname].keys():
logger.warning(
f"The {checkname} check can NOT be applied on {obstype} because none of the required check settings are found. The following are missing: {required_keys}"
)
return False
if not all(
[
req_key
in dataset.settings.qc["qc_check_settings"][checkname][obstype].keys()
for req_key in required_keys
]
):
# not all required settings are available
missing_settings = [
req_key
for req_key in required_keys
if req_key
not in dataset.settings.qc["qc_check_settings"][checkname][
obstype
].keys()
]
logger.warning(
f"The {checkname} check can NOT be applied on {obstype} because not all required check settings ar found. The following are missing: {missing_settings}"
)
return False
return True