"""Source code for dpsql.dp_params."""

import logging
import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING

from scipy import stats

from .aggregation import Aggregation, AggregationColumn
from .errors import (
    AggregationError,
    InvalidPrivacyParametersError,
    UnsupportedQueryError,
)
from .utils import calibrate_analytic_gaussian_mechanism
from .validator.utils import get_privacy_definition

if TYPE_CHECKING:
    from .accountant import Accountant

logger = logging.getLogger(__name__)


@dataclass
class DPParams:
    """
    Parameters for differential privacy.

    Either of (epsilon, delta) or (sigmas, tau, sigma_for_thresholding)
    must be provided.

    Args:
        contribution_bound (int): The maximum number of records that can be
            contributed by a single privacy unit.
        clipping_thresholds (list[list[tuple[float, float]] | None]): The list
            of clipping parameters for input data, used to clip the input
            within the range [L, U] where each tuple is (L, U). Each element is
            either None (for COUNT/COUNT_DISTINCT) or a list of clipping
            thresholds for each column in a multi-column aggregation.
        min_frequency (int): The threshold for first thresholding before adding
            noise. It satisfies minimum frequency rule. (Default: 1)
        epsilon (float): The privacy budget for the query.
        delta (float): The probability of failure for the privacy guarantee.
        sigmas (list[float]): The standard deviations for the Gaussian
            mechanism. Deprecated: Use epsilon and delta instead.
        tau (float): The threshold for second thresholding after adding noise.
            It is expected to be greater than or equal to min_frequency.
            Deprecated: Use epsilon and delta instead.
        sigma_for_thresholding (float): The standard deviation for the Gaussian
            mechanism before the second thresholding. Deprecated: Use epsilon
            and delta instead.
        accountant_class (type[Accountant]): The privacy accountant class to
            use for calculating tight privacy budget allocation.
    """

    contribution_bound: int
    clipping_thresholds: list[list[tuple[float, float]] | None]
    min_frequency: int = 1
    epsilon: float | None = None
    delta: float | None = None
    sigmas: list[float] | None = None
    tau: float | None = None
    sigma_for_thresholding: float | None = None
    accountant_class: type["Accountant"] | None = None

    def __post_init__(self):
        """Validate the supplied privacy parameters.

        Raises:
            InvalidPrivacyParametersError: If any parameter is out of range or
                if neither (epsilon, delta) nor the full
                (sigmas, tau, sigma_for_thresholding) triple is provided.
        """
        logger.info("Validating DPParams")
        logger.debug(
            "Init values: contribution_bound=%s min_frequency=%s epsilon=%s delta=%s "
            "sigmas=%s tau=%s sigma_for_thresholding=%s",
            self.contribution_bound,
            self.min_frequency,
            self.epsilon,
            self.delta,
            self.sigmas,
            self.tau,
            self.sigma_for_thresholding,
        )
        # Show deprecation warning if sigmas is used
        if self.sigmas is not None:
            warnings.warn(
                # BUG FIX: the two adjacent literals previously had no
                # separating space, yielding "...deprecated.Use 'epsilon'...".
                "The 'sigmas' parameter is deprecated. "
                "Use 'epsilon' and 'delta' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
        # contribution_bound must be at least 1
        if self.contribution_bound < 1:
            raise InvalidPrivacyParametersError(
                "Invalid `contribution_bound` (must be >= 1)",
                context={"contribution_bound": self.contribution_bound},
                hint="Set contribution_bound to an integer >= 1",
            )
        # min_frequency must be at least 1
        if self.min_frequency < 1:
            raise InvalidPrivacyParametersError(
                "Invalid `min_frequency` (must be >= 1)",
                context={"min_frequency": self.min_frequency},
                hint="Set min_frequency to an integer >= 1",
            )
        if self.epsilon is None or self.delta is None:
            # If epsilon and delta are not set, sigmas, tau,
            # and sigma_for_thresholding must be set
            if (
                self.sigmas is None
                or self.tau is None
                or self.sigma_for_thresholding is None
            ):
                raise InvalidPrivacyParametersError(
                    "Missing required privacy parameters "
                    "(provide either (epsilon, delta) "
                    "or (sigmas, tau, sigma_for_thresholding))",
                    context={
                        "epsilon": self.epsilon,
                        "delta": self.delta,
                        "sigmas": self.sigmas,
                        "tau": self.tau,
                        "sigma_for_thresholding": self.sigma_for_thresholding,
                    },
                    hint="Supply epsilon & delta OR sigmas, tau, "
                    "sigma_for_thresholding",
                )
            logger.debug("Using (sigmas, tau, sigma_for_thresholding) mode")
            # tau must be greater than or equal to min_frequency
            if self.tau < self.min_frequency:
                raise InvalidPrivacyParametersError(
                    "Invalid `tau` (must be >= min_frequency)",
                    context={"tau": self.tau, "min_frequency": self.min_frequency},
                    hint="Increase tau or decrease min_frequency",
                )
            # sigmas and sigma_for_thresholding must be non-negative
            if any(s < 0 for s in self.sigmas) or self.sigma_for_thresholding < 0:
                raise InvalidPrivacyParametersError(
                    "Invalid noise scales "
                    "(sigmas and sigma_for_thresholding must be >= 0)",
                    context={
                        "sigmas": self.sigmas,
                        "sigma_for_thresholding": self.sigma_for_thresholding,
                    },
                    hint="Ensure all noise scale parameters are >= 0",
                )
        else:
            # epsilon and delta must be positive and delta must be <= 1
            if self.epsilon <= 0 or self.delta <= 0 or self.delta > 1:
                raise InvalidPrivacyParametersError(
                    "Invalid (epsilon, delta) "
                    "(epsilon > 0 and 0 < delta <= 1 required)",
                    context={"epsilon": self.epsilon, "delta": self.delta},
                    hint="Use epsilon > 0 and 0 < delta <= 1",
                )
        logger.info("DPParams validation completed")

    def get_noise_parameters(
        self, sensitivities: list[float]
    ) -> tuple[list[float], float, float]:
        """
        Get the noise parameters (sigmas, tau, sigma_for_thresholding).

        Args:
            sensitivities (list[float]): The sensitivities of the aggregation
                functions.

        Returns:
            tuple[list[float], float, float]: The noise parameters
                (sigmas, tau, sigma_for_thresholding).

        Raises:
            InvalidPrivacyParametersError: If (epsilon, delta) are missing when
                the noise parameters are not preset, or if the number of
                sensitivities does not match the clipping thresholds.
        """
        logger.info("Computing noise parameters")
        logger.debug("Sensitivities: %s", sensitivities)
        # If all noise parameters are already set, return them
        if (
            self.tau is not None
            and self.sigma_for_thresholding is not None
            and self.sigmas is not None
        ):
            return (self.sigmas, self.tau, self.sigma_for_thresholding)
        # Error handling
        if self.epsilon is None or self.delta is None:
            raise InvalidPrivacyParametersError(
                "Missing (epsilon, delta) to compute tau and sigma_for_thresholding",
                context={"epsilon": self.epsilon, "delta": self.delta},
                hint="Initialize DPParams with epsilon "
                "and delta or preset tau & sigma_for_thresholding",
            )
        if len(sensitivities) != len(self.clipping_thresholds):
            raise InvalidPrivacyParametersError(
                "Mismatch between sensitivities and clipping thresholds count",
                context={
                    "num_sensitivities": len(sensitivities),
                    "num_clipping_thresholds": len(self.clipping_thresholds),
                },
                hint="Provide one clipping threshold per aggregation column",
            )
        if self.accountant_class is None:
            # Directly use epsilon and delta without binary search
            logger.debug("No accountant_class; computing directly with epsilon/delta")
            sigmas, tau, sigma_for_thresholding = self._compute_noise_parameters(
                self.epsilon, self.delta, sensitivities
            )
        else:
            # For other accountants, use binary search
            # to find optimal epsilon allocation
            logger.debug(
                "Using accountant_class=%s for epsilon allocation",
                self.accountant_class,
            )
            accountant = self.accountant_class(self.epsilon, self.delta)
            epsilon_ok = self._binary_search_epsilon(
                self.epsilon, self.delta, sensitivities, accountant
            )
            sigmas, tau, sigma_for_thresholding = self._compute_noise_parameters(
                epsilon_ok, self.delta, sensitivities
            )
        logger.debug(
            "Noise parameters computed: sigmas=%s tau=%s sigma_for_thresholding=%s",
            sigmas,
            tau,
            sigma_for_thresholding,
        )
        return sigmas, tau, sigma_for_thresholding

    def _binary_search_epsilon(
        self,
        epsilon: float,
        delta: float,
        sensitivities: list[float],
        accountant: "Accountant",
        relative_max_epsilon: float = 2**5,
        relative_tol: float = 1 / 2**5,
    ) -> float:
        """Binary search to find the maximum epsilon that yields noisy
        parameters satisfying the budget.

        Args:
            epsilon (float): The total privacy budget epsilon.
            delta (float): The total privacy budget delta.
            sensitivities (list[float]): The sensitivities of the aggregation
                functions.
            accountant (Accountant): The privacy accountant to use.
            relative_max_epsilon (float): The relative maximum epsilon for the
                search.
            relative_tol (float): The relative tolerance for convergence.

        Returns:
            float: The maximum epsilon satisfying the budget.
        """
        logger.debug("Starting binary search for epsilon")
        logger.debug(
            "Initial: epsilon=%s delta=%s rel_max=%s rel_tol=%s",
            epsilon,
            delta,
            relative_max_epsilon,
            relative_tol,
        )
        epsilon_ok = epsilon
        epsilon_ng = epsilon * relative_max_epsilon  # initial upper bound
        while epsilon_ng - epsilon_ok > epsilon * relative_tol:
            epsilon_mid = (epsilon_ng + epsilon_ok) / 2
            sigmas, tau, sigma_for_thresholding = self._compute_noise_parameters(
                epsilon_mid, delta, sensitivities
            )
            # NOTE(review): constructing DPParams with `sigmas` set triggers the
            # deprecation warning in __post_init__ (once per warning registry
            # location) even though this is an internal probe — consider
            # suppressing it here; confirm intended behavior first.
            ok = accountant._check_budget(
                sensitivities=sensitivities,
                params=DPParams(
                    contribution_bound=self.contribution_bound,
                    clipping_thresholds=self.clipping_thresholds,
                    min_frequency=self.min_frequency,
                    sigmas=sigmas,
                    tau=tau,
                    sigma_for_thresholding=sigma_for_thresholding,
                ),
            )
            if ok:
                epsilon_ok = epsilon_mid
            else:
                epsilon_ng = epsilon_mid
            logger.debug(
                "Mid epsilon=%s -> ok=%s (epsilon_ok=%s epsilon_ng=%s)",
                epsilon_mid,
                ok,
                epsilon_ok,
                epsilon_ng,
            )
        # epsilon_ok is guaranteed to be greater than or equal to epsilon
        # because the initial lower bound is set to epsilon
        logger.debug("Binary search completed: epsilon_ok=%s", epsilon_ok)
        return epsilon_ok

    def _compute_noise_parameters(
        self, epsilon: float, delta: float, sensitivities: list[float]
    ) -> tuple[list[float], float, float]:
        """Split (epsilon, delta) across aggregations and derive noise scales.

        The budget is divided evenly over len(sensitivities) + 1 slots (one per
        aggregation plus one for thresholding).

        Args:
            epsilon (float): The total privacy budget epsilon.
            delta (float): The total privacy budget delta.
            sensitivities (list[float]): The sensitivities of the aggregation
                functions.

        Returns:
            tuple[list[float], float, float]: The computed
                (sigmas, tau, sigma_for_thresholding).
        """
        logger.debug(
            "Compute noise params: epsilon=%s delta=%s sensitivities=%s",
            epsilon,
            delta,
            sensitivities,
        )
        epsilon_per_agg = epsilon / (len(sensitivities) + 1)
        delta_per_agg = delta / (len(sensitivities) + 1)
        # calculate variance for each aggregation function
        sigmas = [
            calibrate_analytic_gaussian_mechanism(
                epsilon_per_agg, delta_per_agg, sensitivity
            )
            for sensitivity in sensitivities
        ]
        # Compute tau and sigma_for_thresholding
        # Split delta_for_thresholding into two parts
        delta_gaussian = delta_per_agg / 2
        delta_infinite = delta_per_agg / 2
        # Calculate sigma_for_thresholding using the analytic Gaussian mechanism
        sigma_for_thresholding = calibrate_analytic_gaussian_mechanism(
            epsilon_per_agg,
            delta_gaussian,
            self.contribution_bound,
        )
        # Calculate tau by inverting the formula in Theorem 4.1
        # of [Wilkins et al., TPDP'22]
        tau = self.min_frequency + sigma_for_thresholding * stats.norm.ppf(
            (1 - delta_infinite) ** (1 / self.contribution_bound)
        )
        logger.debug(
            "Computed: sigmas=%s tau=%s sigma_for_thresholding=%s",
            sigmas,
            tau,
            sigma_for_thresholding,
        )
        return sigmas, tau, sigma_for_thresholding
# Aggregation kinds that clip a single column with one (lower, upper) pair.
_SINGLE_COLUMN_AGGS = (
    Aggregation.SUM,
    Aggregation.AVG,
    Aggregation.STDDEV_SAMP,
    Aggregation.STDDEV_POP,
    Aggregation.VAR_SAMP,
    Aggregation.VAR_POP,
)


def generate_dpparams(
    privacy_params: dict[str, float | int],
    agg_columns: list[AggregationColumn],
) -> DPParams:
    """
    Generate dpparams from privacy_params, agg_func and parameters_list.

    Args:
        privacy_params (dict): The privacy parameters.
            {"EPSILON": float, "DELTA": float,
             "CONTRIBUTION_BOUND": int, "MIN_FREQUENCY": int}
        agg_columns (list[AggregationColumn]): The list of aggregation columns.

    Returns:
        DPParams: The dpparams.

    Raises:
        UnsupportedQueryError: If the privacy definition is not DP or
            DP_MIN_FREQUENCY.
        AggregationError: If an aggregation has the wrong number of
            parameters or is of an unsupported type.
    """
    clipping_thresholds: list[list[tuple[float, float]] | None] = []

    # Guard clause: only DP-style privacy definitions are supported.
    if get_privacy_definition(privacy_params) not in ("DP", "DP_MIN_FREQUENCY"):
        raise UnsupportedQueryError(
            "Unsupported privacy definition (expected DP or DP_MIN_FREQUENCY)",
            context={"privacy_definition": get_privacy_definition(privacy_params)},
            hint="Use DP or DP_MIN_FREQUENCY",
        )

    epsilon = float(privacy_params["EPSILON"])
    delta = float(privacy_params["DELTA"])
    contribution_bound = int(privacy_params["CONTRIBUTION_BOUND"])
    min_frequency = int(privacy_params.get("MIN_FREQUENCY", 1))

    logger.info("Generating DPParams from privacy_params")
    logger.debug("privacy_params=%s", privacy_params)
    logger.debug("agg_columns count=%s", len(agg_columns))

    for agg_column in agg_columns:
        agg = agg_column.aggregation_type
        parameters = agg_column.parameters

        if agg in (Aggregation.COUNT, Aggregation.COUNT_DISTINCT):
            # Counting aggregations take no clipping parameters.
            if len(parameters) != 0:
                raise AggregationError(
                    f"Invalid parameter count for {agg.name} (expected 0)",
                    context={"aggregation": agg.name, "parameters": parameters},
                    hint="Remove parameters for COUNT or COUNT_DISTINCT",
                )
            clipping_thresholds.append(None)
            logger.debug("Aggregation %s: no clipping params", agg.name)
        elif agg in _SINGLE_COLUMN_AGGS:
            # One column: exactly one (lower, upper) clipping pair.
            if len(parameters) != 2:
                raise AggregationError(
                    f"Invalid parameter count for {agg.name} (expected 2)",
                    context={"aggregation": agg.name, "parameters": parameters},
                    hint="Provide (lower_bound, upper_bound)",
                )
            lower, upper = parameters[0], parameters[1]
            clipping_thresholds.append([(lower, upper)])
            logger.debug(
                "Aggregation %s: clipping=(%s, %s)",
                agg.name,
                lower,
                upper,
            )
        elif agg in (Aggregation.COVAR_SAMP, Aggregation.COVAR_POP):
            # Two columns: one clipping pair per column.
            if len(parameters) != 4:
                raise AggregationError(
                    f"Invalid parameter count for {agg.name} (expected 4)",
                    context={"aggregation": agg.name, "parameters": parameters},
                    hint="Provide (lower_bound_1, upper_bound_1, "
                    "lower_bound_2, upper_bound_2)",
                )
            clipping_thresholds.append(
                [
                    (parameters[0], parameters[1]),
                    (parameters[2], parameters[3]),
                ]
            )
            logger.debug(
                "Aggregation %s: clipping1=(%s, %s) clipping2=(%s, %s)",
                agg.name,
                parameters[0],
                parameters[1],
                parameters[2],
                parameters[3],
            )
        else:
            raise AggregationError(
                "Unsupported aggregation type in parameter generation",
                context={"aggregation": getattr(agg, "name", str(agg))},
                hint="Extend generate_dpparams to handle this aggregation",
            )

    logger.debug(
        "Constructing DPParams with epsilon=%s delta=%s "
        "min_frequency=%s contribution_bound=%s",
        epsilon,
        delta,
        min_frequency,
        contribution_bound,
    )
    return DPParams(
        contribution_bound=contribution_bound,
        clipping_thresholds=clipping_thresholds,
        min_frequency=min_frequency,
        epsilon=epsilon,
        delta=delta,
    )