Backend

The DPSQL+ Backend handles the low-level operations required to execute differentially private SQL queries.

We provide backends for different data storage systems, including PySpark, SQLite, and DuckDB.

class dpsql.backend.DuckDBBackend(conn: DuckDBPyConnection)[source]

Backend for executing DuckDB queries with differential privacy mechanisms.

Parameters:

conn (duckdb.DuckDBPyConnection) – A live DuckDB connection object.

apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrame | DataFrame | DataFrame, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) Series[source]

Apply noisy aggregation to the DataFrame.

Parameters:
  • agg_type (Aggregation) – The type of aggregation to apply.

  • column_name (list[str]) – The column name(s) to aggregate.

  • df (DataFrame) – The DataFrame to aggregate.

  • group_by (list[str]) – The list of columns to group by.

  • clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.

Returns:

The DataFrame with aggregation applied.

Return type:

DataFrame

contribution_bound(inner_df: DataFrame | DataFrame | DataFrame, privacy_unit: str, params: DPParams) DataFrame[source]

Apply contribution bounding to the intermediate DataFrame.

Parameters:
  • inner_df (DataFrameLike) – The intermediate DataFrame.

  • privacy_unit (str) – The column name to use as the privacy unit.

  • params (DPParams) – The differential privacy parameters to use.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

create_inner_df(inner_sql: str) DataFrame[source]

Create the intermediate DataFrame.

Parameters:

sql (str) – The SQL query to execute.

Returns:

The intermediate DataFrame.

Return type:

DataFrameLike

create_temporary_table(df: DataFrame, table_name: str, index: bool = True) None[source]

Create a temporary table in the database.

Parameters:
  • df (DataFrame) – The DataFrame to create the temporary table from.

  • table_name (str) – The name of the temporary table.

  • index (bool) – Whether to include the index as a column in the temporary table.

filter_by_selected_keys(df: DataFrame | DataFrame | DataFrame, group_by: list[str], selected_keys: list[tuple[str, ...]]) DataFrame[source]

Filter DataFrame to only include records with selected keys.

Parameters:
  • df (DataFrameLike) – The DataFrame to filter.

  • group_by (list[str]) – The list of columns to group by.

  • selected_keys (list[tuple[str, ...]]) – List containing the selected keys.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

get_column_name(table_name: str) list[str][source]

Get the list of columns in the table.

Parameters:

table_name (str) – The name of the table.

Returns:

The list of column names.

Return type:

list[str]

get_table_name() list[str][source]

Get the list of tables in the database.

Returns:

The list of table names.

Return type:

list[str]

use_database(database_name: str | None) None[source]

Use a database.

Parameters:

database_name (str) – The name of the database to use.

class dpsql.backend.SQLBackend[source]

Abstract class for executing SQL queries with differential privacy mechanisms.

abstractmethod apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrame | DataFrame | DataFrame, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) Series[source]

Apply noisy aggregation to the DataFrame.

Parameters:
  • agg_type (Aggregation) – The type of aggregation to apply.

  • column_name (list[str]) – The column name(s) to aggregate.

  • df (DataFrame) – The DataFrame to aggregate.

  • group_by (list[str]) – The list of columns to group by.

  • clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.

Returns:

The DataFrame with aggregation applied.

Return type:

DataFrame

abstractmethod contribution_bound(inner_df: DataFrame | DataFrame | DataFrame, privacy_unit: str, params: DPParams) DataFrame | DataFrame | DataFrame[source]

Apply contribution bounding to the intermediate DataFrame.

Parameters:
  • inner_df (DataFrameLike) – The intermediate DataFrame.

  • privacy_unit (str) – The column name to use as the privacy unit.

  • params (DPParams) – The differential privacy parameters to use.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

create_final_df(filtered_df: DataFrame | DataFrame | DataFrame, agg_columns: list[AggregationColumn], group_by: list[str], sigmas: list[float], clipping_thresholds: list[list[tuple[float, float]] | None]) DataFrame[source]

Create the final DataFrame with noisy aggregation applied.

Parameters:
  • filtered_df (DataFrameLike) – The filtered DataFrame.

  • agg_columns (list[AggregationColumn]) – The list of aggregation columns to use.

  • group_by (list[str]) – The list of columns to group by.

  • sigmas (list[float]) – The list of noise standard deviations for each aggregation.

  • clipping_thresholds (list[list[tuple[float, float]] | None]) – The list of clipping thresholds for each aggregation column.

Returns:

The final DataFrame with DP mechanisms applied.

abstractmethod create_inner_df(inner_sql: str) DataFrame | DataFrame | DataFrame[source]

Create the intermediate DataFrame.

Parameters:

sql (str) – The SQL query to execute.

Returns:

The intermediate DataFrame.

Return type:

DataFrameLike

abstractmethod create_temporary_table(df: DataFrame, table_name: str, index: bool = True) None[source]

Create a temporary table in the database.

Parameters:
  • df (DataFrame) – The DataFrame to create the temporary table from.

  • table_name (str) – The name of the temporary table.

  • index (bool) – Whether to include the index as a column in the temporary table.

execute_sql(privacy_unit: str, params: DPParams, inner_sql: str, agg_columns: list[AggregationColumn], group_by_columns: list[str], ordering_terms: list[dict[str, str | None]], limit: int | None = None, offset: int | None = None) DataFrame[source]

Execute a SQL query with differential privacy mechanisms.

Parameters:
  • privacy_unit (str) – The column name to use as the privacy unit.

  • params (DPParams) – The differential privacy parameters to use.

  • inner_sql (str) – The SQL query to create the intermediate table.

  • agg_columns (list[AggregationColumn]) – The list of aggregation columns to use.

  • group_by_columns (list[str]) – The list of columns to group by.

  • ordering_terms (list[dict[str, str | None]]) – The list of ordering terms for the final result.

  • limit (int | None) – The maximum number of rows to return.

  • offset (int | None) – The number of rows to skip before returning results.

Returns:

The result of the SQL query with DP mechanisms applied.

Return type:

DataFrame

abstractmethod filter_by_selected_keys(df: DataFrame | DataFrame | DataFrame, group_by: list[str], selected_keys: list[tuple[str, ...]]) DataFrame | DataFrame | DataFrame[source]

Filter DataFrame to only include records with selected keys.

Parameters:
  • df (DataFrameLike) – The DataFrame to filter.

  • group_by (list[str]) – The list of columns to group by.

  • selected_keys (list[tuple[str, ...]]) – List containing the selected keys.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

abstractmethod get_column_name(table_name: str) list[str][source]

Get the list of columns in the table.

Parameters:

table_name (str) – The name of the table.

Returns:

The list of column names.

Return type:

list[str]

abstractmethod get_table_name() list[str][source]

Get the list of tables in the database.

Returns:

The list of table names.

Return type:

list[str]

key_selection(filtered_df: DataFrame | DataFrame | DataFrame, group_by: list[str], privacy_unit: str, min_frequency: int, sigma: float, tau: float) list[tuple[str, ...]][source]

Perform key selection (tau-thresholding) to determine which keys to keep.

Parameters:
  • filtered_df (DataFrameLike) – The filtered DataFrame.

  • group_by (list[str]) – The list of columns to group by.

  • privacy_unit (str) – The column name to use as the privacy unit.

  • min_frequency (int) – The threshold for first thresholding before adding noise. It satisfies minimum frequency rule.

  • sigma (float) – The standard deviation for the Gaussian mechanism before the second thresholding.

  • tau (float) – The threshold for second thresholding after adding noise.

Returns:

List containing the selected keys that pass the threshold.

Return type:

list[tuple[str, …]]

abstractmethod use_database(database_name: str | None) None[source]

Use a database.

Parameters:

database_name (str) – The name of the database to use.

class dpsql.backend.SQLiteBackend(conn: Connection)[source]

Backend for executing SQLite queries with differential privacy mechanisms.

Parameters:

conn (sqlite3.Connection) – A live SQLite connection object.

apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrame | DataFrame | DataFrame, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) Series[source]

Apply noisy aggregation to the DataFrame.

Parameters:
  • agg_type (Aggregation) – The type of aggregation to apply.

  • column_name (list[str]) – The column name(s) to aggregate.

  • df (DataFrame) – The DataFrame to aggregate.

  • group_by (list[str]) – The list of columns to group by.

  • clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.

Returns:

The DataFrame with aggregation applied.

Return type:

DataFrame

contribution_bound(inner_df: DataFrame | DataFrame | DataFrame, privacy_unit: str, params: DPParams) DataFrame[source]

Apply contribution bounding to the intermediate DataFrame.

Parameters:
  • inner_df (DataFrameLike) – The intermediate DataFrame.

  • privacy_unit (str) – The column name to use as the privacy unit.

  • params (DPParams) – The differential privacy parameters to use.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

create_inner_df(inner_sql: str) DataFrame[source]

Create the intermediate DataFrame.

Parameters:

sql (str) – The SQL query to execute.

Returns:

The intermediate DataFrame.

Return type:

DataFrameLike

create_temporary_table(df: DataFrame, table_name: str, index: bool = True) None[source]

Create a temporary table in the database.

Parameters:
  • df (DataFrame) – The DataFrame to create the temporary table from.

  • table_name (str) – The name of the temporary table.

  • index (bool) – Whether to include the index as a column in the temporary table.

filter_by_selected_keys(df: DataFrame | DataFrame | DataFrame, group_by: list[str], selected_keys: list[tuple[str, ...]]) DataFrame[source]

Filter DataFrame to only include records with selected keys.

Parameters:
  • df (DataFrameLike) – The DataFrame to filter.

  • group_by (list[str]) – The list of columns to group by.

  • selected_keys (list[tuple[str, ...]]) – List containing the selected keys.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

get_column_name(table_name: str) list[str][source]

Get the list of columns in the table.

Parameters:

table_name (str) – The name of the table.

Returns:

The list of column names.

Return type:

list[str]

get_table_name() list[str][source]

Get the list of tables in the database.

Returns:

The list of table names.

Return type:

list[str]

is_inmemory_db() bool[source]
use_database(database_name: str | None) None[source]

Use a database.

Parameters:

database_name (str) – The name of the database to use.

class dpsql.backend.SparkSQLBackend(spark_session: SparkSession)[source]

Backend for executing SparkSQL queries with differential privacy mechanisms.

Parameters:

spark_session (SparkSession) – The Spark session

apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrame | DataFrame | DataFrame, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) Series[source]

Apply noisy aggregation to the DataFrame.

Parameters:
  • agg_type (Aggregation) – The type of aggregation to apply.

  • column_name (list[str]) – The column name(s) to aggregate.

  • df (DataFrame) – The DataFrame to aggregate.

  • group_by (list[str]) – The list of columns to group by.

  • clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.

Returns:

The DataFrame with aggregation applied.

Return type:

DataFrame

contribution_bound(inner_df: DataFrame | DataFrame | DataFrame, privacy_unit: str, params: DPParams) DataFrame | DataFrame | DataFrame[source]

Apply contribution bounding to the intermediate DataFrame.

Parameters:
  • inner_df (DataFrameLike) – The intermediate DataFrame.

  • privacy_unit (str) – The column name to use as the privacy unit.

  • params (DPParams) – The differential privacy parameters to use.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

create_inner_df(inner_sql: str) DataFrame[source]

Create the intermediate DataFrame.

Parameters:

sql (str) – The SQL query to execute.

Returns:

The intermediate DataFrame.

Return type:

DataFrameLike

create_temporary_table(df: DataFrame, table_name: str, index: bool = True) None[source]

Create a temporary table in the database.

Parameters:
  • df (DataFrame) – The DataFrame to create the temporary table from.

  • table_name (str) – The name of the temporary table.

  • index (bool) – Whether to include the index as a column in the temporary table.

filter_by_selected_keys(df: DataFrame | DataFrame | DataFrame, group_by: list[str], selected_keys: list[tuple[str, ...]]) DataFrame | DataFrame | DataFrame[source]

Filter DataFrame to only include records with selected keys.

Parameters:
  • df (DataFrameLike) – The DataFrame to filter.

  • group_by (list[str]) – The list of columns to group by.

  • selected_keys (list[tuple[str, ...]]) – List containing the selected keys.

Returns:

The filtered DataFrame.

Return type:

DataFrameLike

get_column_name(table_name: str) list[str][source]

Get the list of columns in the table.

Parameters:

table_name (str) – The name of the table.

Returns:

The list of column names.

Return type:

list[str]

get_table_name() list[str][source]

Get the list of tables in the database.

Returns:

The list of table names.

Return type:

list[str]

use_database(database_name: str | None) None[source]

Use a database.

Parameters:

database_name (str) – The name of the database to use.