@@ -2439,9 +2439,11 @@ def _generate_multistep_conversion_query(
24392439 ) -> str :
24402440 """Generate SQL for N-step conversion funnel metrics.
24412441
2442- Uses timestamp-aware chronological ordering: for each entity, tracks
2443- the earliest timestamp at which each step was satisfied (via MIN/CASE),
2444- then gates each step on occurring after all prior steps.
2442+ Uses a sequential CTE chain: each step CTE joins back to the previous
2443+ step and only considers events AFTER the prior step's timestamp. This
2444+ correctly handles entities with repeated actions (e.g., an entity that
2445+ does step 2 before step 1 is only counted if they also do step 2 after
2446+ step 1).
24452447
24462448 Args:
24472449 metric: The Metric object with steps defined
@@ -2509,10 +2511,10 @@ def _generate_multistep_conversion_query(
25092511 # Normalize filters: strip model name prefixes so they work inside CTEs
25102512 normalized_filters = self ._strip_model_prefixes (filters or [], model .name )
25112513
2512- # Build WHERE filter clause
2514+ # Build WHERE filter clause (appended with AND to each step CTE)
25132515 filter_clause = ""
25142516 if normalized_filters :
2515- filter_clause = "\n WHERE " + " AND " .join (normalized_filters )
2517+ filter_clause = " AND " + " AND " .join (normalized_filters )
25162518
25172519 # Resolve dimension columns for GROUP BY support
25182520 dim_entries : list [tuple [str , str ]] = []
@@ -2535,47 +2537,84 @@ def _generate_multistep_conversion_query(
25352537
25362538 dim_aliases = [alias for alias , _ in dim_entries ]
25372539
2538- # Build inner CTE: per-entity MIN timestamp for each step.
2539- # MIN(CASE WHEN step_expr THEN timestamp END) captures the earliest time
2540- # the step was satisfied, enabling chronological ordering in the outer query.
2541- step_cols = []
2542- for i , step_expr in enumerate (metric .steps , 1 ):
2543- step_cols .append (f"MIN(CASE WHEN { step_expr } THEN { ts_sql } END) AS step_{ i } _ts" )
2540+ # Build from_clause variant for non-first steps (aliased as 's' for joining)
2541+ if model .sql :
2542+ from_clause_s = f"({ model .sql } ) AS s"
2543+ else :
2544+ from_clause_s = f"{ model .table } s"
25442545
2545- inner_select_parts = [f"{ metric .entity } AS entity" ]
2546- if dim_entries :
2547- for alias , sql_col in dim_entries :
2548- inner_select_parts .append (f"{ sql_col } AS { alias } " )
2549- inner_select_parts .extend (step_cols )
2550- inner_select = ",\n " .join (inner_select_parts )
2546+ # Build sequential CTE chain: each step derives its timestamp from the
2547+ # prior step's completion, ensuring correct chronological ordering.
2548+ cte_parts = []
2549+ num_steps = len (metric .steps )
25512550
2552- group_by_inner_parts = [metric .entity ]
2553- if dim_entries :
2554- for alias , sql_col in dim_entries :
2555- group_by_inner_parts .append (sql_col )
2556- group_by_inner = ",\n " .join (group_by_inner_parts )
2551+ for i , step_expr in enumerate (metric .steps , 1 ):
2552+ if i == 1 :
2553+ # Step 1: find the earliest matching event per entity
2554+ select_parts = [f"{ metric .entity } AS entity" , f"MIN({ ts_sql } ) AS step_1_ts" ]
2555+ for alias , sql_col in dim_entries :
2556+ select_parts .append (f"{ sql_col } AS { alias } " )
2557+ select_str = ",\n " .join (select_parts )
2558+ group_parts = [metric .entity ]
2559+ for _alias , sql_col in dim_entries :
2560+ group_parts .append (sql_col )
2561+ group_str = ",\n " .join (group_parts )
2562+ cte_parts .append (
2563+ f"step_1 AS (\n "
2564+ f" SELECT\n "
2565+ f" { select_str } \n "
2566+ f" FROM { from_clause } \n "
2567+ f" WHERE { step_expr } { filter_clause } \n "
2568+ f" GROUP BY\n "
2569+ f" { group_str } \n "
2570+ f")"
2571+ )
2572+ else :
2573+ # Step N: join source to step N-1, only consider events at or after prior step
2574+ prev = f"step_{ i - 1 } "
2575+ select_parts = [f"s.{ metric .entity } AS entity" , f"MIN(s.{ ts_sql } ) AS step_{ i } _ts" ]
2576+ for alias , _sql_col in dim_entries :
2577+ select_parts .append (f"{ prev } .{ alias } " )
2578+ select_str = ",\n " .join (select_parts )
2579+ group_parts = [f"s.{ metric .entity } " ]
2580+ for alias , _sql_col in dim_entries :
2581+ group_parts .append (f"{ prev } .{ alias } " )
2582+ group_str = ",\n " .join (group_parts )
2583+ cte_parts .append (
2584+ f"step_{ i } AS (\n "
2585+ f" SELECT\n "
2586+ f" { select_str } \n "
2587+ f" FROM { from_clause_s } \n "
2588+ f" JOIN { prev } ON s.{ metric .entity } = { prev } .entity\n "
2589+ f" AND s.{ ts_sql } >= { prev } .step_{ i - 1 } _ts\n "
2590+ f" WHERE { step_expr } { filter_clause } \n "
2591+ f" GROUP BY\n "
2592+ f" { group_str } \n "
2593+ f")"
2594+ )
25572595
2558- # Build outer SELECT: total entities (only step 1 entrants) + sum of each step.
2559- # Gate each step on chronological order: step_N_ts must be >= step_(N-1)_ts.
2560- outer_select_parts = []
2561- if dim_aliases :
2596+ # Build final SELECT: count distinct entities from each step CTE
2597+ final_select_parts = []
2598+ for alias in dim_aliases :
2599+ final_select_parts .append (f"step_1.{ alias } " )
2600+ final_select_parts .append ("COUNT(DISTINCT step_1.entity) AS total_entities" )
2601+ for i in range (1 , num_steps + 1 ):
2602+ final_select_parts .append (f"COUNT(DISTINCT step_{ i } .entity) AS step_{ i } _count" )
2603+ final_select = ",\n " .join (final_select_parts )
2604+
2605+ # Build LEFT JOIN chain: step_1 LEFT JOIN step_2 LEFT JOIN step_3 ...
2606+ join_parts = []
2607+ for i in range (2 , num_steps + 1 ):
2608+ join_on = f"step_{ i - 1 } .entity = step_{ i } .entity"
25622609 for alias in dim_aliases :
2563- outer_select_parts .append (f"{ alias } " )
2564- outer_select_parts .append ("SUM(CASE WHEN step_1_ts IS NOT NULL THEN 1 ELSE 0 END) AS total_entities" )
2565- for i in range (1 , len (metric .steps ) + 1 ):
2566- # Each step requires all prior steps to have been completed in chronological order
2567- conditions = ["step_1_ts IS NOT NULL" ]
2568- for j in range (2 , i + 1 ):
2569- conditions .append (f"step_{ j } _ts IS NOT NULL" )
2570- conditions .append (f"step_{ j } _ts >= step_{ j - 1 } _ts" )
2571- gate_expr = " AND " .join (conditions )
2572- outer_select_parts .append (f"SUM(CASE WHEN { gate_expr } THEN 1 ELSE 0 END) AS step_{ i } _count" )
2573- outer_select = ",\n " .join (outer_select_parts )
2574-
2575- # Build GROUP BY, ORDER BY, LIMIT for outer query
2576- outer_group_by = ""
2610+ join_on += f"\n AND step_{ i - 1 } .{ alias } IS NOT DISTINCT FROM step_{ i } .{ alias } "
2611+ join_parts .append (f"LEFT JOIN step_{ i } ON { join_on } " )
2612+ join_str = "\n " .join (join_parts )
2613+
2614+ # Build GROUP BY, ORDER BY, LIMIT for final query
2615+ final_group_by = ""
25772616 if dim_aliases :
2578- outer_group_by = "\n GROUP BY\n " + ",\n " .join (str (i + 1 ) for i in range (len (dim_aliases )))
2617+ final_group_by = "\n GROUP BY\n " + ",\n " .join (str (i + 1 ) for i in range (len (dim_aliases )))
25792618
25802619 order_clause = ""
25812620 if order_by :
@@ -2589,17 +2628,14 @@ def _generate_multistep_conversion_query(
25892628 if limit is not None :
25902629 limit_clause = f"\n LIMIT { limit } "
25912630
2631+ # Assemble full query
2632+ cte_str = ",\n " .join (cte_parts )
2633+ join_section = f"\n { join_str } " if join_parts else ""
25922634 sql = f"""
2593- WITH { metric .name } _per_entity AS (
2594- SELECT
2595- { inner_select }
2596- FROM { from_clause } { filter_clause }
2597- GROUP BY
2598- { group_by_inner }
2599- )
2635+ WITH { cte_str }
26002636SELECT
2601- { outer_select }
2602- FROM { metric . name } _per_entity { outer_group_by } { order_clause } { limit_clause }
2637+ { final_select }
2638+ FROM step_1 { join_section } { final_group_by } { order_clause } { limit_clause }
26032639"""
26042640
26052641 return sql .strip ()
0 commit comments