diff --git a/diff_diff/estimators.py b/diff_diff/estimators.py
index cf1099fa..18d4b201 100644
--- a/diff_diff/estimators.py
+++ b/diff_diff/estimators.py
@@ -169,6 +169,10 @@ def fit(
         # Validate inputs
         self._validate_data(data, outcome, treatment, time, covariates)

+        # Validate binary variables BEFORE any transformations
+        validate_binary(data[treatment].values, "treatment")
+        validate_binary(data[time].values, "time")
+
         # Validate fixed effects and absorb columns
         if fixed_effects:
             for fe in fixed_effects:
@@ -186,6 +190,9 @@ def fit(

         if absorb:
             # Apply within-transformation for each absorbed variable
+            # Only demean outcome and covariates, NOT treatment/time indicators
+            # Treatment is typically time-invariant (within unit), and time is
+            # unit-invariant, so demeaning them would create multicollinearity
             vars_to_demean = [outcome] + (covariates or [])
             for ab_var in absorb:
                 n_absorbed_effects += working_data[ab_var].nunique() - 1
@@ -194,15 +201,11 @@ def fit(
                     working_data[var] = working_data[var] - group_means
                 absorbed_vars.append(ab_var)

-        # Extract variables
+        # Extract variables (may be demeaned if absorb was used)
         y = working_data[outcome].values.astype(float)
         d = working_data[treatment].values.astype(float)
         t = working_data[time].values.astype(float)

-        # Validate binary variables
-        validate_binary(d, "treatment")
-        validate_binary(t, "time")
-
         # Create interaction term
         dt = d * t

@@ -220,7 +223,8 @@ def fit(
         if fixed_effects:
             for fe in fixed_effects:
                 # Create dummies, drop first category to avoid multicollinearity
-                dummies = pd.get_dummies(data[fe], prefix=fe, drop_first=True)
+                # Use working_data to be consistent with absorbed FE if both are used
+                dummies = pd.get_dummies(working_data[fe], prefix=fe, drop_first=True)
                 for col in dummies.columns:
                     X = np.column_stack([X, dummies[col].values.astype(float)])
                     var_names.append(col)
@@ -298,7 +302,21 @@ def _fit_ols(self, X: np.ndarray, y: np.ndarray) -> tuple:
         -------
         tuple
             (coefficients, residuals, fitted_values, r_squared)
+
+        Raises
+        ------
+        ValueError
+            If design matrix is rank-deficient (perfect multicollinearity).
         """
+        # Check for rank deficiency (perfect multicollinearity)
+        rank = np.linalg.matrix_rank(X)
+        if rank < X.shape[1]:
+            raise ValueError(
+                f"Design matrix is rank-deficient (rank {rank} < {X.shape[1]} columns). "
+                "This indicates perfect multicollinearity. Check your fixed effects "
+                "and covariates for linear dependencies."
+            )
+
         # Solve normal equations: β = (X'X)^(-1) X'y
         coefficients = np.linalg.lstsq(X, y, rcond=None)[0]

diff --git a/diff_diff/utils.py b/diff_diff/utils.py
index e11327c2..60037818 100644
--- a/diff_diff/utils.py
+++ b/diff_diff/utils.py
@@ -218,13 +218,18 @@ def compute_trend(group_data):
         mean_t = np.mean(time_norm)
         mean_y = np.mean(outcome_values)

-        slope = np.sum((time_norm - mean_t) * (outcome_values - mean_y)) / np.sum((time_norm - mean_t) ** 2)
+        # Check for zero variance in time (all same time period)
+        time_var = np.sum((time_norm - mean_t) ** 2)
+        if time_var == 0:
+            return np.nan, np.nan
+
+        slope = np.sum((time_norm - mean_t) * (outcome_values - mean_y)) / time_var

         # Compute standard error of slope
         y_hat = mean_y + slope * (time_norm - mean_t)
         residuals = outcome_values - y_hat
         mse = np.sum(residuals ** 2) / (n - 2)
-        se_slope = np.sqrt(mse / np.sum((time_norm - mean_t) ** 2))
+        se_slope = np.sqrt(mse / time_var)

         return slope, se_slope

@@ -258,7 +263,8 @@ def check_parallel_trends_robust(
     unit: str = None,
     pre_periods: list = None,
     n_permutations: int = 1000,
-    seed: int = None
+    seed: int = None,
+    wasserstein_threshold: float = 0.2
 ) -> dict:
     """
     Perform robust parallel trends testing using distributional comparisons.
@@ -286,6 +292,9 @@ def check_parallel_trends_robust(
         Number of permutations for computing p-value.
     seed : int, optional
         Random seed for reproducibility.
+    wasserstein_threshold : float, default=0.2
+        Threshold for normalized Wasserstein distance. Values below this
+        threshold (combined with p > 0.05) suggest parallel trends are plausible.

     Returns
     -------
     dict
@@ -321,8 +330,8 @@ def check_parallel_trends_robust(
     of pre-treatment changes are similar, supporting the parallel trends
     assumption.
     """
-    if seed is not None:
-        np.random.seed(seed)
+    # Use local RNG to avoid affecting global random state
+    rng = np.random.default_rng(seed)

     # Identify pre-treatment periods
     if pre_periods is None:
@@ -361,7 +370,7 @@ def check_parallel_trends_robust(

     permuted_distances = np.zeros(n_permutations)
     for i in range(n_permutations):
-        perm_idx = np.random.permutation(n_total)
+        perm_idx = rng.permutation(n_total)
         perm_treated = all_changes[perm_idx[:n_treated]]
         perm_control = all_changes[perm_idx[n_treated:]]
         permuted_distances[i] = stats.wasserstein_distance(perm_treated, perm_control)
@@ -383,10 +392,10 @@ def check_parallel_trends_robust(
     wasserstein_normalized = wasserstein_dist / pooled_std if pooled_std > 0 else np.nan

     # Assessment: parallel trends plausible if p-value > 0.05
-    # and normalized Wasserstein is small (< 0.2 as rule of thumb)
+    # and normalized Wasserstein is small (below threshold)
     plausible = bool(
         wasserstein_p > 0.05 and
-        (wasserstein_normalized < 0.2 if not np.isnan(wasserstein_normalized) else True)
+        (wasserstein_normalized < wasserstein_threshold if not np.isnan(wasserstein_normalized) else True)
     )

     return {
@@ -523,23 +532,64 @@ def equivalence_test_trends(
         pre_data, outcome, time, treatment_group, unit
     )

+    # Need at least 2 observations per group to compute variance
+    # and at least 3 total for meaningful df calculation
     if len(treated_changes) < 2 or len(control_changes) < 2:
         return {
             "mean_difference": np.nan,
+            "se_difference": np.nan,
             "equivalence_margin": np.nan,
+            "lower_t_stat": np.nan,
+            "upper_t_stat": np.nan,
             "lower_p_value": np.nan,
             "upper_p_value": np.nan,
             "tost_p_value": np.nan,
+            "degrees_of_freedom": np.nan,
             "equivalent": None,
-            "error": "Insufficient data",
+            "error": "Insufficient data (need at least 2 observations per group)",
         }

     # Compute statistics
+    var_t = np.var(treated_changes, ddof=1)
+    var_c = np.var(control_changes, ddof=1)
+    n_t = len(treated_changes)
+    n_c = len(control_changes)
+
     mean_diff = np.mean(treated_changes) - np.mean(control_changes)
-    se_diff = np.sqrt(
-        np.var(treated_changes, ddof=1) / len(treated_changes) +
-        np.var(control_changes, ddof=1) / len(control_changes)
-    )
+
+    # Handle zero variance case
+    if var_t == 0 and var_c == 0:
+        return {
+            "mean_difference": mean_diff,
+            "se_difference": 0.0,
+            "equivalence_margin": np.nan,
+            "lower_t_stat": np.nan,
+            "upper_t_stat": np.nan,
+            "lower_p_value": np.nan,
+            "upper_p_value": np.nan,
+            "tost_p_value": np.nan,
+            "degrees_of_freedom": np.nan,
+            "equivalent": None,
+            "error": "Zero variance in both groups - cannot perform t-test",
+        }
+
+    se_diff = np.sqrt(var_t / n_t + var_c / n_c)
+
+    # Handle zero SE case (cannot divide by zero in t-stat calculation)
+    if se_diff == 0:
+        return {
+            "mean_difference": mean_diff,
+            "se_difference": 0.0,
+            "equivalence_margin": np.nan,
+            "lower_t_stat": np.nan,
+            "upper_t_stat": np.nan,
+            "lower_p_value": np.nan,
+            "upper_p_value": np.nan,
+            "tost_p_value": np.nan,
+            "degrees_of_freedom": np.nan,
+            "equivalent": None,
+            "error": "Zero standard error - cannot perform t-test",
+        }

     # Set equivalence margin if not provided
     if equivalence_margin is None:
         equivalence_margin = 0.5 * np.std(pooled_changes, ddof=1)

@@ -547,13 +597,17 @@ def equivalence_test_trends(
     # Degrees of freedom (Welch-Satterthwaite approximation)
-    var_t = np.var(treated_changes, ddof=1)
-    var_c = np.var(control_changes, ddof=1)
-    n_t = len(treated_changes)
-    n_c = len(control_changes)
-
-    df = ((var_t/n_t + var_c/n_c)**2 /
-          ((var_t/n_t)**2/(n_t-1) + (var_c/n_c)**2/(n_c-1)))
+    # Guard against division by zero when one group has zero variance
+    numerator = (var_t/n_t + var_c/n_c)**2
+    denom_t = (var_t/n_t)**2/(n_t-1) if var_t > 0 else 0
+    denom_c = (var_c/n_c)**2/(n_c-1) if var_c > 0 else 0
+    denominator = denom_t + denom_c
+
+    if denominator == 0:
+        # Defensive: both variances zero (already handled above); use a conservative df
+        df = min(n_t - 1, n_c - 1)
+    else:
+        df = numerator / denominator

     # TOST: Two one-sided tests
     # Test 1: H0: diff <= -margin vs H1: diff > -margin
diff --git a/tests/test_estimators.py b/tests/test_estimators.py
index f8b70e49..62fdbda0 100644
--- a/tests/test_estimators.py
+++ b/tests/test_estimators.py
@@ -688,3 +688,273 @@ def test_variance_ratio(self, parallel_trends_data):

         assert "variance_ratio" in results
         assert results["variance_ratio"] > 0
+
+
+class TestEdgeCases:
+    """Tests for edge cases and robustness."""
+
+    def test_multicollinearity_detection(self):
+        """Test that perfect multicollinearity is detected."""
+        # Create data where a covariate is perfectly correlated with treatment
+        data = pd.DataFrame({
+            "outcome": [10, 11, 15, 18, 9, 10, 12, 13],
+            "treated": [1, 1, 1, 1, 0, 0, 0, 0],
+            "post": [0, 0, 1, 1, 0, 0, 1, 1],
+            "duplicate_treated": [1, 1, 1, 1, 0, 0, 0, 0],  # Same as treated
+        })
+
+        did = DifferenceInDifferences()
+        with pytest.raises(ValueError, match="rank-deficient"):
+            did.fit(
+                data,
+                outcome="outcome",
+                treatment="treated",
+                time="post",
+                covariates=["duplicate_treated"]
+            )
+
+    def test_wasserstein_custom_threshold(self):
+        """Test that custom Wasserstein threshold is respected."""
+        from diff_diff.utils import check_parallel_trends_robust
+
+        np.random.seed(42)
+        n_units = 50
+        n_periods = 4
+
+        data = []
+        for unit in range(n_units):
+            is_treated = unit < n_units // 2
+            for period in range(n_periods):
+                y = 10.0 + period * 1.5 + np.random.normal(0, 0.5)
+                data.append({
+                    "unit": unit,
+                    "period": period,
+                    "treated": int(is_treated),
+                    "outcome": y,
+                })
+
+        df = pd.DataFrame(data)
+
+        # Test with very low threshold (more strict)
+        results_strict = check_parallel_trends_robust(
+            df,
+            outcome="outcome",
+            time="period",
+            treatment_group="treated",
+            unit="unit",
+            pre_periods=[0, 1],
+            seed=42,
+            wasserstein_threshold=0.01  # Very strict
+        )
+
+        # Test with high threshold (more lenient)
+        results_lenient = check_parallel_trends_robust(
+            df,
+            outcome="outcome",
+            time="period",
+            treatment_group="treated",
+            unit="unit",
+            pre_periods=[0, 1],
+            seed=42,
+            wasserstein_threshold=1.0  # Very lenient
+        )
+
+        # Both should return valid results
+        assert "wasserstein_distance" in results_strict
+        assert "wasserstein_distance" in results_lenient
+
+    def test_equivalence_test_insufficient_data(self):
+        """Test equivalence test handles insufficient data gracefully."""
+        from diff_diff.utils import equivalence_test_trends
+
+        # Create minimal data with only 1 observation per group
+        data = pd.DataFrame({
+            "outcome": [10, 15],
+            "period": [0, 1],
+            "treated": [1, 0],
+            "unit": [0, 1],
+        })
+
+        results = equivalence_test_trends(
+            data,
+            outcome="outcome",
+            time="period",
+            treatment_group="treated",
+            unit="unit",
+            pre_periods=[0]
+        )
+
+        # Should return NaN values with error message
+        assert np.isnan(results["tost_p_value"])
+        assert results["equivalent"] is None
+        assert "error" in results
+
+    def test_parallel_trends_single_period(self):
+        """Test that single pre-period returns NaN values."""
+        from diff_diff.utils import check_parallel_trends
+
+        data = pd.DataFrame({
+            "outcome": [10, 11, 12, 13],
+            "time": [0, 0, 0, 0],  # All same period
+            "treated": [1, 1, 0, 0],
+        })
+
+        results = check_parallel_trends(
+            data,
+            outcome="outcome",
+            time="time",
+            treatment_group="treated",
+            pre_periods=[0]
+        )
+
+        # Should handle gracefully with NaN
+        assert np.isnan(results["treated_trend"]) or results["treated_trend"] is None
+
+
+class TestTwoWayFixedEffects:
+    """Tests for TwoWayFixedEffects estimator."""
+
+    @pytest.fixture
+    def twfe_panel_data(self):
+        """Create panel data for TWFE testing."""
+        np.random.seed(42)
+        n_units = 20
+        n_periods = 4
+
+        data = []
+        for unit in range(n_units):
+            is_treated = unit < n_units // 2
+            unit_effect = np.random.normal(0, 2)
+
+            for period in range(n_periods):
+                time_effect = period * 1.0
+                post = 1 if period >= 2 else 0
+
+                y = 10.0 + unit_effect + time_effect
+                if is_treated and post:
+                    y += 3.0  # True ATT
+
+                y += np.random.normal(0, 0.5)
+
+                data.append({
+                    "unit": unit,
+                    "period": period,
+                    "treated": int(is_treated),
+                    "post": post,
+                    "outcome": y,
+                })
+
+        return pd.DataFrame(data)
+
+    def test_twfe_basic_fit(self, twfe_panel_data):
+        """Test basic TWFE model fitting."""
+        from diff_diff.estimators import TwoWayFixedEffects
+
+        twfe = TwoWayFixedEffects()
+        results = twfe.fit(
+            twfe_panel_data,
+            outcome="outcome",
+            treatment="treated",
+            time="post",
+            unit="unit"
+        )
+
+        assert results is not None
+        assert twfe.is_fitted_
+        # ATT should be positive (true effect is 3.0)
+        # Note: TWFE with within-transformation may give different estimates
+        # due to the mechanics of two-way demeaning
+        assert results.att > 0
+        assert results.se > 0
+
+    def test_twfe_with_covariates(self, twfe_panel_data):
+        """Test TWFE with covariates."""
+        from diff_diff.estimators import TwoWayFixedEffects
+
+        # Add a covariate
+        twfe_panel_data["size"] = np.random.normal(100, 10, len(twfe_panel_data))
+
+        twfe = TwoWayFixedEffects()
+        results = twfe.fit(
+            twfe_panel_data,
+            outcome="outcome",
+            treatment="treated",
+            time="post",
+            unit="unit",
+            covariates=["size"]
+        )
+
+        assert results is not None
+        assert twfe.is_fitted_
+
+    def test_twfe_invalid_unit_column(self, twfe_panel_data):
+        """Test error when unit column doesn't exist."""
+        from diff_diff.estimators import TwoWayFixedEffects
+
+        twfe = TwoWayFixedEffects()
+        with pytest.raises(ValueError, match="not found"):
+            twfe.fit(
+                twfe_panel_data,
+                outcome="outcome",
+                treatment="treated",
+                time="post",
+                unit="nonexistent_unit"
+            )
+
+    def test_twfe_clusters_at_unit_level(self, twfe_panel_data):
+        """Test that TWFE defaults to clustering at unit level."""
+        from diff_diff.estimators import TwoWayFixedEffects
+
+        twfe = TwoWayFixedEffects()
+        twfe.fit(
+            twfe_panel_data,
+            outcome="outcome",
+            treatment="treated",
+            time="post",
+            unit="unit"
+        )
+
+        # Cluster should be set to unit
+        assert twfe.cluster == "unit"
+
+
+class TestClusterRobustSE:
+    """Tests for cluster-robust standard errors."""
+
+    def test_cluster_robust_se(self):
+        """Test cluster-robust standard errors in base DiD."""
+        np.random.seed(42)
+
+        # Create clustered data
+        data = []
+        for cluster in range(10):
+            for obs in range(10):
+                treated = cluster < 5
+                post = obs >= 5
+                y = 10 + (3.0 if treated and post else 0) + np.random.normal(0, 1)
+                data.append({
+                    "cluster": cluster,
+                    "outcome": y,
+                    "treated": int(treated),
+                    "post": int(post),
+                })
+
+        df = pd.DataFrame(data)
+
+        # With clustering
+        did_cluster = DifferenceInDifferences(cluster="cluster")
+        results_cluster = did_cluster.fit(
+            df, outcome="outcome", treatment="treated", time="post"
+        )
+
+        # Without clustering
+        did_no_cluster = DifferenceInDifferences(robust=True)
+        results_no_cluster = did_no_cluster.fit(
+            df, outcome="outcome", treatment="treated", time="post"
+        )
+
+        # ATT should be similar
+        assert abs(results_cluster.att - results_no_cluster.att) < 0.01
+
+        # SEs should be different (cluster-robust typically larger)
+        assert results_cluster.se != results_no_cluster.se
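Not part of the patch: a minimal usage sketch of the two user-facing changes, assuming only the call signatures exercised by the new tests (`DifferenceInDifferences.fit` raising the rank-deficiency `ValueError`, and the new `wasserstein_threshold` keyword on `check_parallel_trends_robust`). The synthetic panel below is illustrative, not from the library.

```python
import numpy as np
import pandas as pd

from diff_diff.estimators import DifferenceInDifferences
from diff_diff.utils import check_parallel_trends_robust

rng = np.random.default_rng(0)

# Small synthetic panel: 20 units x 4 periods, treatment switches on at period 2.
rows = []
for unit in range(20):
    treated = int(unit < 10)
    for period in range(4):
        post = int(period >= 2)
        y = 10.0 + period + 3.0 * treated * post + rng.normal(0, 0.5)
        rows.append({"unit": unit, "period": period, "treated": treated,
                     "post": post, "outcome": y,
                     "dup": treated})  # 'dup' duplicates 'treated' on purpose
df = pd.DataFrame(rows)

# 1) Perfect multicollinearity now fails fast with a rank-deficiency error
#    instead of producing silently unstable OLS estimates.
try:
    DifferenceInDifferences().fit(
        df, outcome="outcome", treatment="treated", time="post",
        covariates=["dup"],
    )
except ValueError as err:
    print(err)  # "Design matrix is rank-deficient ..."

# 2) The Wasserstein rule of thumb is now tunable, and seeding no longer
#    touches NumPy's global random state.
diag = check_parallel_trends_robust(
    df, outcome="outcome", time="period", treatment_group="treated",
    unit="unit", pre_periods=[0, 1], seed=42, wasserstein_threshold=0.1,
)
print(diag["wasserstein_distance"])
```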