Source code for aiida_dataframe.data.dataframe

"""
This module defines an AiiDA data plugin for pandas DataFrames, stored
in the file repository as HDF5 files.
"""
from __future__ import annotations

import hashlib
from pathlib import Path
import shutil
import tempfile
from typing import Any

import pandas as pd
from pandas.util import hash_pandas_object

from aiida.common import exceptions
from aiida.orm import SinglefileData


class PandasFrameData(SinglefileData):
    """
    Data plugin for pandas DataFrame objects. DataFrames are serialized to
    HDF5 using the :py:meth:`~pandas.DataFrame.to_hdf` method, stored in the
    file repository, and deserialized using :py:func:`~pandas.read_hdf`.

    The whole DataFrame can be retrieved via the :py:meth:`df` property.
    The names of columns and indices are stored in attributes, so they are
    queryable through the database.

    :param df: pandas DataFrame
    """

    DEFAULT_FILENAME = "dataframe.h5"

    def __init__(
        self, df: pd.DataFrame, filename: str | None = None, **kwargs: Any
    ) -> None:
        if df is None:
            raise TypeError("the `df` argument cannot be `None`.")

        if not isinstance(df, pd.DataFrame):
            raise TypeError("the `df` argument is not a pandas DataFrame.")

        super().__init__(None, **kwargs)
        self._update_dataframe(df, filename=filename)
        self._df = df

    def _update_dataframe(
        self, df: pd.DataFrame, filename: str | None = None
    ) -> None:
        """
        Update the stored HDF5 file. Raises if the node is already stored.
        """
        if self.is_stored:
            raise exceptions.ModificationNotAllowed(
                "cannot update the DataFrame on a stored node"
            )

        if filename is None:
            try:
                filename = self.filename
            except AttributeError:
                filename = self.DEFAULT_FILENAME

        with tempfile.TemporaryDirectory() as td:
            # "w" is the HDF5 key (group name) under which the DataFrame is
            # stored, not the file mode
            df.to_hdf(Path(td) / self.DEFAULT_FILENAME, "w", format="table")
            with open(Path(td) / self.DEFAULT_FILENAME, "rb") as file:
                self.set_file(file, filename=filename)

        self.set_attribute("_pandas_data_hash", self._hash_dataframe(df))
        self.set_attribute("index", list(df.index))
        self.set_attribute("columns", list(df.columns.to_flat_index()))
        self._df = df

    @staticmethod
    def _hash_dataframe(df: pd.DataFrame) -> str:
        """
        Return a hash corresponding to the data inside the DataFrame
        (not the column names).
        """
        return hashlib.sha256(hash_pandas_object(df, index=True).values).hexdigest()

    def _get_dataframe_from_repo(self) -> pd.DataFrame:
        """
        Get the DataFrame associated with this node from the file repository.
        """
        with tempfile.TemporaryDirectory() as td:
            with open(Path(td) / self.filename, "wb") as temp_handle:
                with self.open(self.filename, mode="rb") as file:
                    # Copy the content of source to target in chunks
                    shutil.copyfileobj(file, temp_handle)  # type: ignore[arg-type]

            # Workaround for an empty DataFrame: it produces an HDF5 file
            # without groups, which `read_hdf` cannot handle
            with pd.HDFStore(
                Path(td) / self.filename, mode="r", errors="strict"
            ) as store:
                if len(store.groups()) == 0:
                    return pd.DataFrame([], columns=self.get_attribute("columns"))
                return pd.read_hdf(store)

    def _get_dataframe(self) -> pd.DataFrame:
        """
        Get the DataFrame associated with this node.
        """
        if not hasattr(self, "_df"):
            self._df = self._get_dataframe_from_repo()

        if self.is_stored:
            # Hand out a copy so that in-place mutations cannot bypass the
            # immutability of stored nodes
            return self._df.copy(deep=True)
        return self._df

    @property
    def df(self) -> pd.DataFrame:
        """
        Return the pandas DataFrame instance associated with this node.
        """
        return self._get_dataframe()

    @df.setter
    def df(self, df: pd.DataFrame) -> None:
        """
        Update the associated DataFrame.
        """
        self._update_dataframe(df)

    def store(self, *args: Any, **kwargs: Any) -> PandasFrameData:
        """
        Store the node. Before the node is stored, the HDF5 file in the
        repository is synchronized with the ``_df`` attribute on the node.
        This catches in-place modifications of the DataFrame, e.g.
        ``df["A"] = new_value``; the file is rewritten only if the hash of
        the data no longer matches the stored hash.
        """
        if not self.is_stored:
            # Check whether the DataFrame attached to the node has been
            # mutated in place before storing; if so, update the file
            current_hash = self._hash_dataframe(self._df)
            if current_hash != self.get_attribute("_pandas_data_hash"):
                self._update_dataframe(self._df)

        return super().store(*args, **kwargs)
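
A minimal usage sketch, not part of the module above: it assumes a configured AiiDA profile, and the example data and query filter are illustrative only. It shows the round trip through the ``df`` property and a query on the ``columns`` attribute mentioned in the class docstring.

if __name__ == "__main__":
    # Usage sketch (assumption: an AiiDA profile is configured and loadable)
    from aiida import load_profile
    from aiida.orm import QueryBuilder

    load_profile()

    frame = pd.DataFrame({"A": [1, 2, 3], "B": [4.0, 5.0, 6.0]})
    node = PandasFrameData(frame)
    node.store()

    # Once the node is stored, `df` returns a deep copy of the DataFrame
    print(node.df)

    # Column names are stored as node attributes and are hence queryable
    qb = QueryBuilder()
    qb.append(PandasFrameData, filters={"attributes.columns": {"contains": ["A"]}})
    print(qb.all())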