Backend
The DPSQL+ Backend handles the low-level operations required to execute differentially private SQL queries.
We provide backends for different data storage systems, including PySpark, SQLite, and DuckDB.
- class dpsql.backend.DuckDBBackend(conn: DuckDBPyConnection)
Backend for executing DuckDB queries with differential privacy mechanisms.
- Parameters:
conn (duckdb.DuckDBPyConnection) – A live DuckDB connection object.
- apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrameLike, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) -> Series
Apply noisy aggregation to the DataFrame.
- Parameters:
agg_type (Aggregation) – The type of aggregation to apply.
column_name (list[str]) – The column name(s) to aggregate.
df (DataFrameLike) – The DataFrame to aggregate.
group_by (list[str]) – The list of columns to group by.
clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.
- Returns:
The noisy aggregation result.
- Return type:
Series
- contribution_bound(inner_df: DataFrameLike, privacy_unit: str, params: DPParams) -> DataFrame
Apply contribution bounding to the intermediate DataFrame.
- Parameters:
inner_df (DataFrameLike) – The intermediate DataFrame.
privacy_unit (str) – The column name to use as the privacy unit.
params (DPParams) – The differential privacy parameters to use.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
- create_inner_df(inner_sql: str) -> DataFrame
Create the intermediate DataFrame.
- Parameters:
inner_sql (str) – The SQL query to execute.
- Returns:
The intermediate DataFrame.
- Return type:
DataFrameLike
- create_temporary_table(df: DataFrame, table_name: str, index: bool = True) -> None
Create a temporary table in the database.
- Parameters:
df (DataFrame) – The DataFrame to create the temporary table from.
table_name (str) – The name of the temporary table.
index (bool) – Whether to include the index as a column in the temporary table.
- filter_by_selected_keys(df: DataFrameLike, group_by: list[str], selected_keys: list[tuple[str, ...]]) -> DataFrame
Filter DataFrame to only include records with selected keys.
- Parameters:
df (DataFrameLike) – The DataFrame to filter.
group_by (list[str]) – The list of columns to group by.
selected_keys (list[tuple[str, ...]]) – List containing the selected keys.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
- get_column_name(table_name: str) -> list[str]
Get the list of columns in the table.
- Parameters:
table_name (str) – The name of the table.
- Returns:
The list of column names.
- Return type:
list[str]
- class dpsql.backend.SQLBackend
Abstract class for executing SQL queries with differential privacy mechanisms.
- abstractmethod apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrameLike, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) -> Series
Apply noisy aggregation to the DataFrame.
- Parameters:
agg_type (Aggregation) – The type of aggregation to apply.
column_name (list[str]) – The column name(s) to aggregate.
df (DataFrameLike) – The DataFrame to aggregate.
group_by (list[str]) – The list of columns to group by.
clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.
- Returns:
The noisy aggregation result.
- Return type:
Series
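To illustrate what a clipped, noisy aggregation can look like, here is a minimal standalone sketch of a per-group sum. It is not the DPSQL+ implementation: the helper name `noisy_clipped_sum`, the list-of-dicts row format, and the reading of each clipping threshold as a (lower, upper) pair are assumptions made for this example.

```python
import random
from collections import defaultdict


def noisy_clipped_sum(rows, value_col, group_by, clip, sigma, rng=None):
    """Hypothetical helper: clip each value to [lo, hi], sum per group, add Gaussian noise."""
    if rng is None:
        rng = random.Random()
    lo, hi = clip  # assumed to be (lower, upper)
    totals = defaultdict(float)
    for row in rows:
        key = tuple(row[c] for c in group_by)
        # Clipping limits how much a single row can move the sum.
        totals[key] += min(max(row[value_col], lo), hi)
    # One independent Gaussian draw per group.
    return {key: total + rng.gauss(0.0, sigma) for key, total in totals.items()}


rows = [
    {"city": "A", "spend": 5.0},
    {"city": "A", "spend": 250.0},  # clipped to 100.0
    {"city": "B", "spend": -3.0},   # clipped to 0.0
]
# sigma=0.0 disables the noise so the effect of clipping is visible.
exact = noisy_clipped_sum(rows, "spend", ["city"], clip=(0.0, 100.0), sigma=0.0)
```

With a positive `sigma`, each group total is perturbed independently, so repeated calls return different values.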
- abstractmethod contribution_bound(inner_df: DataFrameLike, privacy_unit: str, params: DPParams) -> DataFrameLike
Apply contribution bounding to the intermediate DataFrame.
- Parameters:
inner_df (DataFrameLike) – The intermediate DataFrame.
privacy_unit (str) – The column name to use as the privacy unit.
params (DPParams) – The differential privacy parameters to use.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
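Contribution bounding limits how many rows any single privacy unit can contribute. The sketch below shows one common way to do this, by sampling at most a fixed number of rows per unit. The helper name `bound_contributions` and the `max_rows` parameter are hypothetical; the fields of `DPParams` that control the bound are not documented here.

```python
import random
from collections import defaultdict


def bound_contributions(rows, privacy_unit, max_rows, rng=None):
    """Hypothetical helper: keep at most max_rows rows for each privacy unit."""
    if rng is None:
        rng = random.Random()
    by_unit = defaultdict(list)
    for row in rows:
        by_unit[row[privacy_unit]].append(row)
    bounded = []
    for unit_rows in by_unit.values():
        if len(unit_rows) > max_rows:
            # Sample without replacement so the kept rows are not biased by input order.
            unit_rows = rng.sample(unit_rows, max_rows)
        bounded.extend(unit_rows)
    return bounded


rows = [{"user_id": "u1", "item": i} for i in range(5)] + [{"user_id": "u2", "item": 9}]
bounded = bound_contributions(rows, "user_id", max_rows=2, rng=random.Random(0))
```

Here `u1` is reduced from five rows to two, while `u2` keeps its single row.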
- create_final_df(filtered_df: DataFrameLike, agg_columns: list[AggregationColumn], group_by: list[str], sigmas: list[float], clipping_thresholds: list[list[tuple[float, float]] | None]) -> DataFrame
Create the final DataFrame with noisy aggregation applied.
- Parameters:
filtered_df (DataFrameLike) – The filtered DataFrame.
agg_columns (list[AggregationColumn]) – The list of aggregation columns to use.
group_by (list[str]) – The list of columns to group by.
sigmas (list[float]) – The list of noise standard deviations for each aggregation.
clipping_thresholds (list[list[tuple[float, float]] | None]) – The list of clipping thresholds for each aggregation column.
- Returns:
The final DataFrame with DP mechanisms applied.
- Return type:
DataFrame
- abstractmethod create_inner_df(inner_sql: str) -> DataFrameLike
Create the intermediate DataFrame.
- Parameters:
inner_sql (str) – The SQL query to execute.
- Returns:
The intermediate DataFrame.
- Return type:
DataFrameLike
- abstractmethod create_temporary_table(df: DataFrame, table_name: str, index: bool = True) -> None
Create a temporary table in the database.
- Parameters:
df (DataFrame) – The DataFrame to create the temporary table from.
table_name (str) – The name of the temporary table.
index (bool) – Whether to include the index as a column in the temporary table.
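As a rough illustration of the `index` flag, the following sketch creates a temporary SQLite table from plain rows and optionally stores the row position as an extra column. It uses only the standard `sqlite3` module. The helper name `create_temp_table`, the tuple-based row format, and the column name `index` are assumptions for this example, not the library's behavior.

```python
import sqlite3


def create_temp_table(conn, rows, columns, table_name, index=True):
    """Hypothetical helper: load rows into a TEMP table, optionally with a row-position column."""
    cols = (["index"] if index else []) + list(columns)
    column_list = ", ".join('"' + c.replace('"', '""') + '"' for c in cols)
    quoted_table = '"' + table_name.replace('"', '""') + '"'
    conn.execute("CREATE TEMP TABLE " + quoted_table + " (" + column_list + ")")
    placeholders = ", ".join("?" for _ in cols)
    # Prepend the row position when index=True.
    data = [((i,) if index else ()) + tuple(row) for i, row in enumerate(rows)]
    conn.executemany("INSERT INTO " + quoted_table + " VALUES (" + placeholders + ")", data)


conn = sqlite3.connect(":memory:")
create_temp_table(conn, [("u1", 1.0), ("u2", 2.0)], ["user_id", "amount"], "tmp_orders")
stored = conn.execute('SELECT * FROM tmp_orders ORDER BY "index"').fetchall()
```

The temporary table exists only for the lifetime of the connection that created it.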
- execute_sql(privacy_unit: str, params: DPParams, inner_sql: str, agg_columns: list[AggregationColumn], group_by_columns: list[str], ordering_terms: list[dict[str, str | None]], limit: int | None = None, offset: int | None = None) -> DataFrame
Execute a SQL query with differential privacy mechanisms.
- Parameters:
privacy_unit (str) – The column name to use as the privacy unit.
params (DPParams) – The differential privacy parameters to use.
inner_sql (str) – The SQL query to create the intermediate table.
agg_columns (list[AggregationColumn]) – The list of aggregation columns to use.
group_by_columns (list[str]) – The list of columns to group by.
ordering_terms (list[dict[str, str | None]]) – The list of ordering terms for the final result.
limit (int | None) – The maximum number of rows to return.
offset (int | None) – The number of rows to skip before returning results.
- Returns:
The result of the SQL query with DP mechanisms applied.
- Return type:
DataFrame
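The `ordering_terms`, `limit`, and `offset` arguments describe post-processing of the final result. The sketch below shows one way such terms could be applied to a list of result rows. The dictionary keys `"column"` and `"direction"` are hypothetical, since the exact shape of each ordering term is not documented here.

```python
def order_and_page(rows, ordering_terms, limit=None, offset=None):
    """Hypothetical helper: apply ORDER BY terms, then OFFSET and LIMIT, to result rows."""
    ordered = list(rows)
    # Sort by the last term first; list.sort() is stable, so earlier terms take precedence.
    for term in reversed(ordering_terms):
        descending = (term.get("direction") or "ASC").upper() == "DESC"
        ordered.sort(key=lambda row: row[term["column"]], reverse=descending)
    start = offset or 0
    end = None if limit is None else start + limit
    return ordered[start:end]


result = [
    {"city": "A", "total": 3},
    {"city": "B", "total": 7},
    {"city": "C", "total": 7},
]
terms = [{"column": "total", "direction": "DESC"}, {"column": "city", "direction": None}]
page = order_and_page(result, terms, limit=2, offset=1)
```

Because sorting and paging only touch values that are already noisy, they act as post-processing and consume no additional privacy budget.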
- abstractmethod filter_by_selected_keys(df: DataFrameLike, group_by: list[str], selected_keys: list[tuple[str, ...]]) -> DataFrameLike
Filter DataFrame to only include records with selected keys.
- Parameters:
df (DataFrameLike) – The DataFrame to filter.
group_by (list[str]) – The list of columns to group by.
selected_keys (list[tuple[str, ...]]) – List containing the selected keys.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
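Filtering by selected keys amounts to a membership test on the group-by tuple of each record. A minimal sketch over plain dictionaries follows. The helper name `filter_rows_by_keys` and the conversion of key values to strings are assumptions based on the `list[tuple[str, ...]]` annotation.

```python
def filter_rows_by_keys(rows, group_by, selected_keys):
    """Hypothetical helper: keep only rows whose group-by values form a selected key."""
    allowed = set(selected_keys)  # set lookup keeps the filter linear in the number of rows
    return [
        row for row in rows
        if tuple(str(row[c]) for c in group_by) in allowed
    ]


rows = [
    {"city": "A", "year": 2023, "spend": 5.0},
    {"city": "B", "year": 2023, "spend": 7.0},
    {"city": "A", "year": 2024, "spend": 9.0},
]
kept = filter_rows_by_keys(rows, ["city", "year"], [("A", "2023"), ("A", "2024")])
```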
- abstractmethod get_column_name(table_name: str) -> list[str]
Get the list of columns in the table.
- Parameters:
table_name (str) – The name of the table.
- Returns:
The list of column names.
- Return type:
list[str]
- abstractmethod get_table_name() -> list[str]
Get the list of tables in the database.
- Returns:
The list of table names.
- Return type:
list[str]
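For a SQLite database, the table list can be read from the `sqlite_master` catalog. The following sketch uses only the standard `sqlite3` module and is not taken from the DPSQL+ source; the helper name `list_tables` is hypothetical.

```python
import sqlite3


def list_tables(conn):
    """Hypothetical helper: return the names of all regular tables, sorted by name."""
    cursor = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
    )
    return [row[0] for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (user_id TEXT, amount REAL)")
conn.execute("CREATE TABLE users (user_id TEXT)")
tables = list_tables(conn)
```

Temporary tables are recorded in `sqlite_temp_master` rather than `sqlite_master`, so this query does not list them.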
- key_selection(filtered_df: DataFrameLike, group_by: list[str], privacy_unit: str, min_frequency: int, sigma: float, tau: float) -> list[tuple[str, ...]]
Perform key selection (tau-thresholding) to determine which keys to keep.
- Parameters:
filtered_df (DataFrameLike) – The filtered DataFrame.
group_by (list[str]) – The list of columns to group by.
privacy_unit (str) – The column name to use as the privacy unit.
min_frequency (int) – The threshold for the first thresholding step, applied before noise is added. It enforces the minimum frequency rule.
sigma (float) – The standard deviation of the Gaussian noise added before the second thresholding step.
tau (float) – The threshold for the second thresholding step, applied after noise is added.
- Returns:
List containing the selected keys that pass the threshold.
- Return type:
list[tuple[str, ...]]
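The two thresholding steps described above can be sketched as follows. This is a minimal illustration, assuming that the thresholded quantity is the number of distinct privacy units per key and that both comparisons are inclusive. Neither detail is confirmed by this reference, and the helper name `select_keys` is hypothetical.

```python
import random
from collections import defaultdict


def select_keys(rows, group_by, privacy_unit, min_frequency, sigma, tau, rng=None):
    """Hypothetical helper: two-stage thresholding on per-key counts of privacy units."""
    if rng is None:
        rng = random.Random()
    units_per_key = defaultdict(set)
    for row in rows:
        key = tuple(str(row[c]) for c in group_by)
        units_per_key[key].add(row[privacy_unit])
    selected = []
    for key, units in units_per_key.items():
        count = len(units)
        if count < min_frequency:
            continue  # first threshold: exact count, before any noise
        if count + rng.gauss(0.0, sigma) >= tau:
            selected.append(key)  # second threshold: noisy count
    return selected


rows = [
    {"city": "A", "user_id": "u1"},
    {"city": "A", "user_id": "u2"},
    {"city": "A", "user_id": "u3"},
    {"city": "A", "user_id": "u1"},  # same user again, counted once
    {"city": "B", "user_id": "u1"},
]
# sigma=0.0 makes the outcome deterministic for this demonstration.
keys = select_keys(rows, ["city"], "user_id", min_frequency=2, sigma=0.0, tau=2.5)
```

With a positive `sigma`, keys whose counts lie close to `tau` may be kept or dropped at random, which is what hides the presence of any single privacy unit.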
- class dpsql.backend.SQLiteBackend(conn: Connection)
Backend for executing SQLite queries with differential privacy mechanisms.
- Parameters:
conn (sqlite3.Connection) – A live SQLite connection object.
- apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrameLike, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) -> Series
Apply noisy aggregation to the DataFrame.
- Parameters:
agg_type (Aggregation) – The type of aggregation to apply.
column_name (list[str]) – The column name(s) to aggregate.
df (DataFrameLike) – The DataFrame to aggregate.
group_by (list[str]) – The list of columns to group by.
clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.
- Returns:
The noisy aggregation result.
- Return type:
Series
- contribution_bound(inner_df: DataFrameLike, privacy_unit: str, params: DPParams) -> DataFrame
Apply contribution bounding to the intermediate DataFrame.
- Parameters:
inner_df (DataFrameLike) – The intermediate DataFrame.
privacy_unit (str) – The column name to use as the privacy unit.
params (DPParams) – The differential privacy parameters to use.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
- create_inner_df(inner_sql: str) -> DataFrame
Create the intermediate DataFrame.
- Parameters:
inner_sql (str) – The SQL query to execute.
- Returns:
The intermediate DataFrame.
- Return type:
DataFrameLike
- create_temporary_table(df: DataFrame, table_name: str, index: bool = True) -> None
Create a temporary table in the database.
- Parameters:
df (DataFrame) – The DataFrame to create the temporary table from.
table_name (str) – The name of the temporary table.
index (bool) – Whether to include the index as a column in the temporary table.
- filter_by_selected_keys(df: DataFrameLike, group_by: list[str], selected_keys: list[tuple[str, ...]]) -> DataFrame
Filter DataFrame to only include records with selected keys.
- Parameters:
df (DataFrameLike) – The DataFrame to filter.
group_by (list[str]) – The list of columns to group by.
selected_keys (list[tuple[str, ...]]) – List containing the selected keys.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
- get_column_name(table_name: str) -> list[str]
Get the list of columns in the table.
- Parameters:
table_name (str) – The name of the table.
- Returns:
The list of column names.
- Return type:
list[str]
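In SQLite, column names can be obtained with `PRAGMA table_info`. The sketch below shows that approach using only the standard `sqlite3` module. It is an illustration rather than the actual `SQLiteBackend` code, and the helper name `list_columns` is hypothetical.

```python
import sqlite3


def list_columns(conn, table_name):
    """Hypothetical helper: return the column names of table_name in declaration order."""
    # PRAGMA arguments cannot be bound as parameters, so quote the identifier instead.
    quoted = '"' + table_name.replace('"', '""') + '"'
    cursor = conn.execute("PRAGMA table_info(" + quoted + ")")
    # Each result row is (cid, name, type, notnull, dflt_value, pk).
    return [row[1] for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (user_id TEXT, amount REAL)")
columns = list_columns(conn, "orders")
```

For a table that does not exist, `PRAGMA table_info` returns no rows, so the helper returns an empty list rather than raising.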
- class dpsql.backend.SparkSQLBackend(spark_session: SparkSession)
Backend for executing SparkSQL queries with differential privacy mechanisms.
- Parameters:
spark_session (SparkSession) – A live Spark session object.
- apply_aggregation(agg_type: Aggregation, column_name: list[str], df: DataFrameLike, group_by: list[str], clipping_threshold: list[tuple[float, float]] | None = None) -> Series
Apply noisy aggregation to the DataFrame.
- Parameters:
agg_type (Aggregation) – The type of aggregation to apply.
column_name (list[str]) – The column name(s) to aggregate.
df (DataFrameLike) – The DataFrame to aggregate.
group_by (list[str]) – The list of columns to group by.
clipping_threshold (list[tuple[float, float]] | None) – The clipping thresholds for each column.
- Returns:
The noisy aggregation result.
- Return type:
Series
- contribution_bound(inner_df: DataFrameLike, privacy_unit: str, params: DPParams) -> DataFrameLike
Apply contribution bounding to the intermediate DataFrame.
- Parameters:
inner_df (DataFrameLike) – The intermediate DataFrame.
privacy_unit (str) – The column name to use as the privacy unit.
params (DPParams) – The differential privacy parameters to use.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
- create_inner_df(inner_sql: str) -> DataFrame
Create the intermediate DataFrame.
- Parameters:
inner_sql (str) – The SQL query to execute.
- Returns:
The intermediate DataFrame.
- Return type:
DataFrameLike
- create_temporary_table(df: DataFrame, table_name: str, index: bool = True) -> None
Create a temporary table in the database.
- Parameters:
df (DataFrame) – The DataFrame to create the temporary table from.
table_name (str) – The name of the temporary table.
index (bool) – Whether to include the index as a column in the temporary table.
- filter_by_selected_keys(df: DataFrameLike, group_by: list[str], selected_keys: list[tuple[str, ...]]) -> DataFrameLike
Filter DataFrame to only include records with selected keys.
- Parameters:
df (DataFrameLike) – The DataFrame to filter.
group_by (list[str]) – The list of columns to group by.
selected_keys (list[tuple[str, ...]]) – List containing the selected keys.
- Returns:
The filtered DataFrame.
- Return type:
DataFrameLike
- get_column_name(table_name: str) -> list[str]
Get the list of columns in the table.
- Parameters:
table_name (str) – The name of the table.
- Returns:
The list of column names.
- Return type:
list[str]