Source code for eobox.raster.cube

import numpy as np
import pandas as pd
from pathlib import Path
import rasterio
import time
from tqdm import tqdm
import warnings

from .rasterprocessing import MultiRasterIO
from .gdalutils import buildvrt

from .utils import dtype_checker_df
from .utils import cleanup_df_values_for_given_dtype


[docs]class EOCubeAbstract(): def __init__(self, df_layers, chunksize=2**5, wdir=None): self._df_layers = df_layers self._chunksize = chunksize # In this whole class we could directly get the windows from the # eobox.raster.windows_from_blocksize(blocksize_xy, width, height) # then we do not need the more expensive initialization of MultiRasterIO self._mrio = None self._initialize_mrio_if_attr_is_none() if wdir: self._wdir = Path(wdir) else: self._wdir = wdir @property def df_layers(self): return self._df_layers @property def chunksize(self): return self._chunksize @chunksize.setter def chunksize(self, value): self._mrio = self._mrio.windows_from_blocksize(value) self._chunksize = value @property def n_chunks(self): return len(self._mrio.windows) @property def wdir(self): return self._wdir @wdir.setter def wdir(self, value): self._wdir = value def _initialize_mrio_if_attr_is_none(self): if not self._mrio: self._mrio = MultiRasterIO(layer_files=self.df_layers["path"].values) self._mrio = self._mrio.windows_from_blocksize(self.chunksize)
[docs] def get_chunk_path_from_layer_path(self, path_layer, ji, mkdir=True): path_layer = Path(path_layer) bdir_chunks = path_layer.parent / f"xchunks_cs{self.chunksize}" / path_layer.stem if mkdir: bdir_chunks.mkdir(exist_ok=True, parents=True) ndigits = len(str((self.n_chunks))) dst_path_chunk = bdir_chunks / (path_layer.stem + f"_ji-{ji:0{ndigits}d}.tif") return dst_path_chunk
[docs] def get_df_ilocs(self, band, date): """Get positions of rows matching specific band(s) and date(s). The method supports three typical queries: * one band and one date (both given as strings) * one band and of several dates (band given as strings, date as list of strings) * several band and of one date (date given as strings, band as list of strings) Parameters ---------- band : str or list Band(s) for which to derive the iloc index. date : str or list Date(s) for which to derive the iloc index. Returns ------- int or list Integer (if band and date are str) or list of iloc indices. """ df = self.df_layers.copy() df["index"] = range(df.shape[0]) idx_layers = [] if isinstance(band, str) and isinstance(date, str): idx_layers = df[(df["date"] == date) & (df["band"] == band)]["index"].values[0] if isinstance(band, list) and isinstance(date, str): for b in band: idx = df[(df["date"] == date) & (df["band"] == b)]["index"].values[0] idx_layers.append(idx) elif isinstance(band, str) and isinstance(date, list): for d in date: idx = df[(df["band"] == band) & (df["date"] == d)]["index"].values[0] idx_layers.append(idx) return idx_layers
[docs]class EOCube(EOCubeAbstract): def __init__(self, df_layers, chunksize=2**5, wdir=None): super().__init__(df_layers=df_layers, chunksize=chunksize, wdir=wdir)
[docs] def get_chunk(self, ji): """Get a EOCubeChunk""" return EOCubeChunk.from_eocube(self, ji)
[docs] def apply_and_write(self, fun, dst_paths, **kwargs): ji_process = [] for ji in range(self.n_chunks): dst_paths_ji_exist = [self.get_chunk_path_from_layer_path( dst, ji, mkdir=False).exists() for dst in dst_paths] if not all(dst_paths_ji_exist): ji_process.append(ji) if len(ji_process) != self.n_chunks: print(f"{self.n_chunks - len(ji_process)} chunks already processed and skipped.") results = [] for ji in tqdm(ji_process, total=len(ji_process)): eoc_chunk = self.get_chunk(ji) results.append(fun(eoc_chunk, dst_paths, **kwargs)) for pth in dst_paths: if not Path(pth).exists(): self.create_vrt_from_chunks(pth)
[docs] def create_vrt_from_chunks(self, dst_path): chunk_paths = [] for ji in range(self.n_chunks): pth = self.get_chunk_path_from_layer_path(dst_path, ji, mkdir=True).absolute() if not pth.exists(): raise FileNotFoundError(pth) else: chunk_paths.append(str(pth)) buildvrt(chunk_paths, str(dst_path), relative=True)
[docs]class EOCubeChunk(EOCubeAbstract): def __init__(self, ji, df_layers, chunksize=2**5, wdir=None): super().__init__(df_layers=df_layers, chunksize=chunksize, wdir=wdir) self._ji = ji self._data = None # set with self.read_data() self._data_structure = None # can be ndarray, dataframe self._window = self._mrio.windows[self.ji] self._width = self._window.width self._height = self._window.height self._n_layers = self.df_layers.shape[0] self.result = None self._spatial_bounds = self._get_spatial_bounds() @property def ji(self): return self._ji @property def data(self): return self._data @property def chunksize(self): return self._chunksize @property def spatial_bounds(self): return self._spatial_bounds @chunksize.setter def chunksize(self, value): raise NotImplementedError("It is not allowed to set the EOCubeChunk chunksize (but of a EOCube object).") def _get_spatial_bounds(self): """Get the spatial bounds of the chunk.""" # This should be a MultiRasterIO method with rasterio.open(self._mrio._get_template_for_given_resolution(self._mrio.dst_res, "path")) as src_layer: pass # later we need src_layer for src_layer.window_transform(win) win_transform = src_layer.window_transform(self._window) bounds = rasterio.windows.bounds(window=self._window, transform=win_transform, height=0, width=0) return bounds
[docs] def read_data(self): self._data = self._mrio.get_arrays(self.ji) if self.data.shape[0] * self.data.shape[1] != self._width * self._height: raise Exception(f"X/Y dimension size of extracted window (={'/'.join(self.data.shape)}) different from expected shape (={self._width}/{self._height}).") self._update_data_structure() return self
def _update_data_structure(self): self._data_structure = self.data.__class__.__name__
[docs] def convert_data_to_dataframe(self): if self._data_structure != "ndarray": raise Exception(f"Data is not an ndarray but {self._data_structure}.") if "uname" not in self.df_layers.columns: raise Exception("You need a column named uname with unique names for all layers.") if not self.df_layers["uname"].is_unique: raise Exception("You need a column named uname with unique names for all layers.") self._data = pd.DataFrame(self._reshape_3to2d(self._data)) self._update_data_structure() self._data.columns = self.df_layers["uname"] return self
[docs] def convert_data_to_ndarray(self): """Converts the data from dataframe to ndarray format. Assumption: df-columns are ndarray-layers (3rd dim.)""" if self._data_structure != "DataFrame": raise Exception(f"Data is not a DataFrame but {self._data_structure}.") self._data = self._convert_to_ndarray(self._data) self._update_data_structure() return self
def _convert_to_ndarray(self, data): """Converts data from dataframe to ndarray format. Assumption: df-columns are ndarray-layers (3rd dim.)""" if data.__class__.__name__ != "DataFrame": raise Exception(f"data is not a DataFrame but {data.__class__.__name__}.") shape_ndarray = (self._height, self._width, data.shape[1]) data_ndarray = data.values.reshape(shape_ndarray) return data_ndarray
[docs] def write_dataframe(self, result, dst_paths, nodata=None, compress='lzw'): """Write results (dataframe) to disc.""" result = self._convert_to_ndarray(result) self.write_ndarray(result, dst_paths, nodata=nodata, compress=compress)
[docs] def write_ndarray(self, result, dst_paths, nodata=None, compress='lzw'): """Write results (ndarray) to disc.""" assert len(dst_paths) == result.shape[2] assert result.shape[0] == self._height assert result.shape[1] == self._width assert result.shape[2] == len(dst_paths) with rasterio.open(self._mrio._get_template_for_given_resolution(self._mrio.dst_res, "path")) as src_layer: pass # later we need src_layer for src_layer.window_transform(win) for i, pth in enumerate(dst_paths): dst_path_chunk = self.get_chunk_path_from_layer_path(pth, self.ji) result_layer_i = result[:, :, [i]] assert result_layer_i.shape[2] == 1 kwargs = self._mrio._get_template_for_given_resolution( res=self._mrio.dst_res, return_="meta").copy() kwargs.update({"driver": "GTiff", "compress": compress, "nodata": nodata, "height": self._height, "width": self._width, "dtype": result_layer_i.dtype, "transform": src_layer.window_transform(self._window)}) with rasterio.open(dst_path_chunk, "w", **kwargs) as dst: dst.write(result_layer_i[:, :, 0], 1)
[docs] @staticmethod def robust_data_range(arr, robust=False, vmin=None, vmax=None): """Get a robust data range, i.e. 2nd and 98th percentile for vmin, vmax parameters.""" # from the seaborn code # https://github.com/mwaskom/seaborn/blob/3a3ec75befab52c02650c62772a90f8c23046038/seaborn/matrix.py#L201 def _get_vmin_vmax(arr2d, vmin=None, vmax=None): if vmin is None: vmin = np.percentile(arr2d, 2) if robust else arr2d.min() if vmax is None: vmax = np.percentile(arr2d, 98) if robust else arr2d.max() return vmin, vmax if len(arr.shape) == 3 and vmin is None and vmax is None: vmin = [] vmax = [] for i in range(arr.shape[2]): arr_i = arr[:, :, i] vmin_i, vmax_i = _get_vmin_vmax(arr_i, vmin=None, vmax=None) vmin.append(vmin_i) vmax.append(vmax_i) else: vmin, vmax = _get_vmin_vmax(arr, vmin=vmin, vmax=vmax) return vmin, vmax
[docs] def plot_raster(self, idx_layer, robust=False, vmin=None, vmax=None, spatial_bounds=False, figsize=None, ax=None): import matplotlib.pyplot as plt if self.data is None: self.read_data() arr = self.data[:, :, idx_layer] if ax is None: fig, ax = plt.subplots(figsize=figsize) vmin, vmax = self.robust_data_range(arr, robust=robust, vmin=vmin, vmax=vmax) ax = ax.imshow(arr, extent=self.spatial_bounds if spatial_bounds else None, vmin=vmin, vmax=vmax) return ax
[docs] def plot_raster_rgb(self, idx_layers, robust=False, vmin=None, vmax=None, spatial_bounds=False, figsize=None, ax=None): import matplotlib.pyplot as plt def _select_vmxx(vmxx, i): """If vmxx is a list, return i-th value of the list, else return vmxx.""" if isinstance(vmxx, list): return vmxx[i] else: return vmxx if self.data is None: self.read_data() arr = self.data[:, :, idx_layers].astype(float) for i in range(arr.shape[2]): arr_i = arr[:, :, i] vmin_i, vmax_i = self.robust_data_range(arr_i, robust=robust, vmin=_select_vmxx(vmin, i), vmax=_select_vmxx(vmax, i)) arr_i[arr_i < vmin_i] = vmin_i arr_i[arr_i > vmax_i] = vmax_i arr[:, :, i] = (arr_i - vmin_i) / (vmax_i - vmin_i) if ax is None: fig, ax = plt.subplots(figsize=figsize) ax = ax.imshow(arr, extent=self.spatial_bounds if spatial_bounds else None) return ax
@staticmethod def _reshape_3to2d(arr_3d): new_shape = (arr_3d.shape[0] * arr_3d.shape[1], arr_3d.shape[2],) return arr_3d.reshape(new_shape)
[docs] @staticmethod def from_eocube(eocube, ji): """Create a EOCubeChunk object from an EOCube object.""" eocubewin = EOCubeChunk(ji, eocube.df_layers, eocube.chunksize, eocube.wdir) return eocubewin
[docs]class EOCubeSceneCollectionAbstract(EOCubeAbstract): def __init__(self, df_layers, chunksize, variables, qa, qa_valid, # timeless=None, wdir=None): """Handling scene collections, i.e. a set of scenes each with the same layers. The class enables to perform chunkwise processing over a set of scenes having the same variables / bands. Therefore the ``df_layers`` dataframe requires information to be stored in the following columns: * *sceneid* (unique identifier of the scene), * *date* (the aquisition date of the scene as datetime type), * *band* (the layers / bands that exist for all scenes), * *uname* (unique identifier for all layers, i.e. scene + variable/qu-layer), * *path* (the path where the raster for that layer is located). Parameters ---------- df_layers : dataframe A dataframe, see description above. chunksize : int Size of the spatial window used as processing unit. variables : list of str Those values in ``df_layers['band']`` that are treated as variables. qa : str The value in ``df_layers['band']`` which is treated as quality assessment layer. qa_valid : list of int The values in the qualitiy assessment layer that identify pixels to be considered as valid in the variable rasters., by default None wdir : str, optional Working directory Raises ------ ValueError [description] """ # validation and formatting n_per_sceneid = len(variables) + 1 # i.e. qa layer scenes_complete = df_layers.groupby("sceneid").apply( lambda x: x["band"].isin(variables + [qa]).sum() == n_per_sceneid) if not all(scenes_complete): scenes_incomplete = scenes_complete.index[~scenes_complete].values raise ValueError(f"Variable or qa layers missing in the following scenes: {scenes_incomplete}") df_layers = df_layers.sort_values(["date", "sceneid", "band"]) EOCubeAbstract.__init__(self, df_layers=df_layers, chunksize=chunksize, wdir=wdir) self._variables = variables self._qa = qa self._qa_valid = qa_valid # self._timeless = timeless @property def variables(self): return self._variables @property def qa(self): return self._qa @property def qa_valid(self): return self._qa_valid
""" @property def timeless(self): return self._timeless """
[docs]class EOCubeSceneCollection(EOCubeSceneCollectionAbstract, EOCube):
[docs] def get_chunk(self, ji): """Get a EOCubeSceneCollectionChunk""" return EOCubeSceneCollectionChunk(ji=ji, df_layers=self.df_layers, chunksize=self.chunksize, variables=self.variables, qa=self.qa, qa_valid=self.qa_valid, wdir=self.wdir)
[docs] def apply_and_write_by_variable(self, fun, dst_paths, dtypes, compress, nodata, **kwargs): def check_variable_specific_args(arg, variables): if isinstance(arg, dict): if not set(arg.keys()) == set(self.variables): raise ValueError("'dtypes'-, 'nodata'-dicts keys must match (self.)variables.") else: return arg # TODO: better validate the args before else: # TODO: validate the args before return {var: arg for var in variables} dtypes = check_variable_specific_args(dtypes, self.variables) nodata = check_variable_specific_args(nodata, self.variables) compress = check_variable_specific_args(compress, self.variables) ji_process = {} for var in self.variables: ji_process[var] = [] counter = 0 for ji in range(self.n_chunks): dst_paths_ji_exist = [self.get_chunk_path_from_layer_path( dst, ji, mkdir=False).exists() for dst in dst_paths[var]] if all(dst_paths_ji_exist): counter += 1 else: ji_process[var].append(ji) if len(ji_process[var]) != self.n_chunks: print(f"{var}: {counter} / {self.n_chunks} chunks already processed and skipped.") # a sorted list of all chunks that we need to run somewhere ji_process_over_all_variables = [] for key, process in ji_process.items(): ji_process_over_all_variables += process ji_process_over_all_variables = np.unique(ji_process_over_all_variables) for ji in tqdm(ji_process_over_all_variables, total=len(ji_process)): eoc_chunk = self.get_chunk(ji) eoc_chunk = eoc_chunk.read_data_by_variable(mask=True) results = {} for var in self.variables: if ji in ji_process[var]: results[var] = fun(eoc_chunk.data[var], **kwargs) results[var] = cleanup_df_values_for_given_dtype(results[var], dtype=dtypes[var], lower_as=None, higher_as=None, nan_as=None) eoc_chunk.write_dataframe(results[var], dst_paths[var], nodata[var], compress[var]) for var in self.variables: for pth in dst_paths[var]: if not Path(pth).exists(): self.create_vrt_from_chunks(pth)
[docs] def create_virtual_time_series(self, idx_virtual, dst_pattern, #"./xxx_uncontrolled/ls2008_vts4w/ls2008_vts4w_{date}_{var}.vrt" dtypes, compress="lzw", nodata=None, num_workers=1): dst_paths = create_names_virtual_time_series(dst_pattern=dst_pattern, idx_virtual=idx_virtual, variables=self.variables) self.apply_and_write_by_variable(fun=create_virtual_time_series, dst_paths=dst_paths, dtypes=dtypes, compress=compress, nodata=nodata, idx_virtual=idx_virtual, num_workers=num_workers, verbosity=0)
[docs] def create_statistical_metrics(self, percentiles, iqr, diffs, dst_pattern, #"./xxx_uncontrolled/ls2008_vts4w/ls2008_vts4w_{date}_{var}.vrt" dtypes, compress="lzw", nodata=None, num_workers=1): dst_paths = create_names_statistical_metrics(dst_pattern=dst_pattern, variables=self.variables, percentiles=percentiles, iqr=iqr, diffs=diffs) self.apply_and_write_by_variable(fun=create_statistical_metrics, dst_paths=dst_paths, dtypes=dtypes, compress=compress, nodata=nodata, percentiles=percentiles, iqr=iqr, diffs=diffs, num_workers=num_workers)
[docs]class EOCubeSceneCollectionChunk(EOCubeSceneCollectionAbstract, EOCubeChunk): def __init__(self, ji, df_layers, chunksize, variables, qa, qa_valid, # timeless=None, wdir=None): EOCubeSceneCollectionAbstract.__init__(self, df_layers=df_layers, chunksize=chunksize, variables=variables, qa=qa, qa_valid=qa_valid, wdir=wdir) EOCubeChunk.__init__(self, ji=ji, df_layers=df_layers, chunksize=chunksize, wdir=wdir)
[docs] def read_data_by_variable(self, mask=True): """Reads and masks (if desired) the data and converts it in one dataframe per variable.""" def print_elapsed_time(start, last_stopped, prefix): # print(f"{prefix} - Elapsed time [s] since start / last stopped: \ # {(int(time.time() - start_time))} / {(int(time.time() - last_stopped))}") return time.time() start_time = time.time() last_stopped = time.time() last_stopped = print_elapsed_time(start_time, last_stopped, "Starting chunk function") verbose = False self.read_data() last_stopped = print_elapsed_time(start_time, last_stopped, "Data read") # 2. sc_chunk = self.convert_data_to_dataframe() last_stopped = print_elapsed_time(start_time, last_stopped, "Data converted to df") # 3.B. if mask: # 3.A. ilocs_qa = np.where((self.df_layers["band"] == self.qa).values)[0] df_qa = self.data.iloc[:, ilocs_qa] df_qa.columns = self.df_layers["date"].iloc[ilocs_qa] df_clearsky = df_qa.isin(self.qa_valid) last_stopped = print_elapsed_time(start_time, last_stopped, "Clearsky df created") return_bands = self.variables else: return_bands = self.variables + [self.qa] dfs_variables = {} for var in return_bands: if verbose: print("VARIABLE:", var) ilocs_var = np.where((self.df_layers["band"] == var).values)[0] df_var = self.data.iloc[:, ilocs_var] df_var.columns = self.df_layers["date"].iloc[ilocs_var] if mask: df_var = df_var.where(df_clearsky, other=np.nan) dfs_variables[var] = df_var last_stopped = print_elapsed_time(start_time, last_stopped, "Clearsky df created") self._data = dfs_variables return self
# def create_names_statistical_metrics(dst_pattern, percentiles=None, iqr=True, diffs=True, variables=None): # if isinstance(variables, str): # var_input_is_str = True # # in this case we will later reduce the dictionary to the valuee of its single element # variables = [variables] # else: # var_input_is_str = True # if percentiles is None: # percentiles = [.1, .25, .50, .75, .9] # metrics = ['mean', 'std', 'min'] # metrics += [f'p{int(p*100):02d}' for p in percentiles] # metrics += ['max'] # if iqr: # metrics += ['p75-p25'] # if diffs: # metrics += ['max-min'] # if 0.05 in percentiles and 0.95 in percentiles: # metrics += ['p95-p05'] # if 0.10 in percentiles and 0.90 in percentiles: # metrics += ['p90-p10'] # dst_names = {} # for var in variables: # dst_names[var] = [] # for metric in metrics: # dst_names[var].append(dst_pattern.format(**{"var": var, "metric": metric})) # assert (len(metrics) * len(variables)) == sum([len(dst_names[var]) for var in variables]) # if var_input_is_str: # dst_names = dst_names[variables[0]] # return dst_names def _create_names_virtual_time_series(dst_pattern, idx_virtual): dst_names = [] for date in idx_virtual: dst_names.append(dst_pattern.format(**{"date": date.strftime("%Y-%m-%d")})) assert len(idx_virtual) == len(dst_names) return dst_names
[docs]def create_names_virtual_time_series(dst_pattern, idx_virtual, variables=None): if variables is None: dst_names = _create_names_virtual_time_series(dst_pattern, idx_virtual) elif isinstance(variables, str): dst_names = _create_names_virtual_time_series(dst_pattern=dst_pattern.replace("{var}", variables), idx_virtual=idx_virtual) else: dst_names = {} for var in variables: dst_names[var] = _create_names_virtual_time_series(dst_pattern=dst_pattern.replace("{var}", var), idx_virtual=idx_virtual) return dst_names
def _create_names_statistical_metrics(dst_pattern, percentiles=None, iqr=True, diffs=True): if percentiles is None: percentiles = [.1, .25, .50, .75, .9] metrics = ['mean', 'std', 'min'] metrics += [f'p{int(p*100):02d}' for p in percentiles] metrics += ['max'] if iqr: metrics += ['p75-p25'] if diffs: metrics += ['max-min'] if 0.05 in percentiles and 0.95 in percentiles: metrics += ['p95-p05'] if 0.10 in percentiles and 0.90 in percentiles: metrics += ['p90-p10'] dst_names = [] for metric in metrics: dst_names.append(dst_pattern.format(**{"metric": metric})) assert len(metrics) == len(dst_names) return dst_names
[docs]def create_names_statistical_metrics(dst_pattern, percentiles=None, iqr=True, diffs=True, variables=None): if variables is None: dst_names = _create_names_statistical_metrics(dst_pattern, percentiles=percentiles, iqr=iqr, diffs=diffs) elif isinstance(variables, str): dst_names = _create_names_statistical_metrics(dst_pattern=dst_pattern.replace("{var}", variables), percentiles=percentiles, iqr=iqr, diffs=diffs) else: dst_names = {} for var in variables: dst_names[var] = _create_names_statistical_metrics(dst_pattern=dst_pattern.replace("{var}", var), percentiles=percentiles, iqr=iqr, diffs=diffs) return dst_names
[docs]def scoll_df_to_var_dfs(scoll_df, df_layers, qa=None, qa_valid=None, verbose=True): """Convert a dataframe with variable and qa columns into a dictionary with masked variable dataframes. Parameters ---------- scoll_df : [pd.DataFrame] dataframe as returned by `EOCubeSceneCollectionChunk(...).read_data().convert_data_to_dataframe()` df_layers : [pd.DataFrame] See EOCubeSceneCollectionAbstract. qa : [str] See EOCubeSceneCollectionAbstract. qa_valid : [list] See EOCubeSceneCollectionAbstract. verbose : [bool] ... """ if qa is not None: mask=True assert qa_valid is not None, "qa is not None therefore it is assumed that qa_valid should be not None." else: mask=False if mask: # 3.A. ilocs_qa = np.where((df_layers["band"] == qa).values)[0] df_qa = scoll_df.iloc[:, ilocs_qa] df_qa.columns = df_layers["date"].iloc[ilocs_qa] df_clearsky = df_qa.isin(qa_valid) # last_stopped = print_elapsed_time(start_time, last_stopped, "Clearsky df created") return_bands = list(df_layers.band.unique()) return_bands.remove(qa) else: return_bands = df_layers.band.unique() dfs_variables = {} for var in return_bands: if verbose: print("VARIABLE:", var) ilocs_var = np.where((df_layers["band"] == var).values)[0] df_var = scoll_df.iloc[:, ilocs_var] df_var.columns = df_layers["date"].iloc[ilocs_var] if mask: df_var = df_var.where(df_clearsky, other=np.nan) dfs_variables[var] = df_var # last_stopped = print_elapsed_time(start_time, last_stopped, "Clearsky df created") return dfs_variables
[docs]def create_virtual_time_series(df_var, idx_virtual, colname_pattern=None, num_workers=1, verbosity=0): """Create a virtual time series from a dataframe with pd.DateTimeIndex and a instance (e.g. pixels) dimension. Parameters ---------- df_var : [type] [description] idx_virtual : [type] [description] colname_pattern : str Pattern for the column name, e.g. 'ls2010_vts2w_{date}_ndvi', where the data will be inserted from idx_virtual. num_workers : int, optional [description], by default 1 verbosity : int, optional [description], by default 0 Returns ------- [type] [description] """ def _create_virtual_time_series_core(df, idx_virtual, verbosity=0): if verbosity: print("Shape of input dataframe:", df.shape) transpose_before_return = False if isinstance(df.columns, pd.DatetimeIndex): transpose_before_return = True df = df.transpose() # define the virtual points in the time series index # idx_virtual = df.resample(rule).asfreq().index if not df.index.is_unique: if verbosity: print(f"Aggregating (max) data with > observations per day: {df.index[df.index.duplicated()]}") df = df.groupby(level=0).max() assert df.index.is_unique if verbosity: print("Length of virtual time series:", len(idx_virtual)) # add the existing time series points to the virtual time series index idx_virtual_and_data = idx_virtual.append(df.index).unique() idx_virtual_and_data = idx_virtual_and_data.sort_values() if verbosity: print("Length of virtual and data time series:", len(idx_virtual_and_data)) # extend the time series data such that it contains all existing and virtual time series points df = df.reindex(index=idx_virtual_and_data) # interpolate between dates and forward/backward fill edges with closest values float_columns = df.dtypes.astype(str).str.contains("float") if not float_columns.all(): # since interpolate would not work otherwise warnings.warn(f"Converting the following columns to float for termporal interpolation: {df.columns[~float_columns]}") df.loc[:, ~float_columns] = df.loc[:, ~float_columns].astype(float) df = df.interpolate(method='time') df = df.bfill() df = df.ffill() df = df.loc[idx_virtual] if transpose_before_return: df = df.transpose() if verbosity: print("Shape of output dataframe:", df.shape) return df if (num_workers > 1) or (num_workers == -1): import dask.dataframe as dd df_var = dd.from_pandas(df_var, npartitions=num_workers) df_result = df_var.map_partitions(_create_virtual_time_series_core, idx_virtual=idx_virtual, verbosity=verbosity) df_result = df_result.compute(scheduler='processes', num_workers=num_workers) else: df_result = _create_virtual_time_series_core(df_var, idx_virtual, verbosity=verbosity) if colname_pattern is not None: colnames = create_names_virtual_time_series(dst_pattern=colname_pattern, idx_virtual=idx_virtual) for col, name in zip(df_result.columns.strftime("%Y-%m-%d"), colnames): if col not in name: raise Exception(f"Mismatch between column {col} and name {name}.") df_result.columns = colnames return df_result
[docs]def create_statistical_metrics(df_var, percentiles=None, iqr=True, diffs=True, colname_pattern=None, num_workers=1, rename2p=False, verbosity=0): """Calculate statistial metrics from a dataframe with pd.DateTimeIndex and a instance (e.g. pixels) dimension. Parameters ---------- df_var : [type] [description] percentiles : [type] [description] iqr : [type] [description] num_workers : int, optional [description], by default 1 rename2p : bool, optional Rename %-columns (e.g 90%) to p-columns (e.g. p90) verbosity : int, optional [description], by default 0 Returns ------- [type] [description] """ def _calc_statistical_metrics(df, percentiles=None, iqr=True, diffs=True): """Calculate statistical metrics and the count of valid observations.""" metrics_df = df.transpose().describe(percentiles=percentiles).transpose() if iqr and all(np.isin(["25%", "75%"], metrics_df.columns)): metrics_df["p75-p25"] = metrics_df["75%"] - metrics_df["25%"] # other differences if diffs and all(np.isin(["min", "max"], metrics_df.columns)): metrics_df["max-min"] = metrics_df["max"] - metrics_df["min"] if diffs and all(np.isin(["5%", "95%"], metrics_df.columns)): metrics_df["p95-p05"] = metrics_df["95%"] - metrics_df["5%"] if diffs and all(np.isin(["10%", "90%"], metrics_df.columns)): metrics_df["p90-p10"] = metrics_df["90%"] - metrics_df["10%"] metrics_df = metrics_df.drop(labels=["count"], axis=1) return metrics_df if percentiles is None: percentiles = [.1, .25, .50, .75, .9] if (num_workers > 1) or (num_workers == -1): import dask.dataframe as dd df_var = dd.from_pandas(df_var, npartitions=num_workers) df_result = df_var.map_partitions(_calc_statistical_metrics, percentiles=percentiles, iqr=iqr, diffs=diffs) df_result = df_result.compute(scheduler='processes', num_workers=num_workers) else: df_result = _calc_statistical_metrics(df_var, percentiles, iqr, diffs=diffs) if colname_pattern or rename2p: df_result.columns = [f"p{col.replace('%', '')}" if '%' in col else col for col in df_result.columns] if colname_pattern is not None: colnames = create_names_statistical_metrics(colname_pattern, percentiles=percentiles, iqr=iqr, diffs=diffs) for col, name in zip(df_result.columns, colnames): if col not in name: raise Exception(f"Mismatch between column {col} and name {name}.") df_result.columns = colnames return df_result