@@ -2207,8 +2207,9 @@ def _generate_conversion_query(
22072207 ) -> str :
22082208 """Generate SQL for conversion funnel metrics.
22092209
2210- Uses self-join pattern to find entities that had both base and conversion events
2211- within the specified time window.
2210+ Supports two modes:
2211+ - Legacy 2-step: uses base_event/conversion_event with self-join pattern
2212+ - N-step funnel: uses steps list with per-entity BOOL_OR aggregation
22122213
22132214 Args:
22142215 metric_name: Name of the conversion metric (can be "metric" or "model.metric" format)
@@ -2237,7 +2238,14 @@ def _generate_conversion_query(
22372238 except KeyError :
22382239 pass
22392240
2240- if not metric or not metric .entity or not metric .base_event or not metric .conversion_event :
2241+ if not metric or not metric .entity :
2242+ raise ValueError (f"Conversion metric { metric_name } missing required fields" )
2243+
2244+ # Determine if this is a multi-step funnel or legacy 2-step
2245+ if metric .steps :
2246+ return self ._generate_multistep_conversion_query (metric , metric_name , dimensions , filters , order_by , limit )
2247+
2248+ if not metric .base_event or not metric .conversion_event :
22412249 raise ValueError (f"Conversion metric { metric_name } missing required fields" )
22422250
22432251 # Find the model that owns this metric if we haven't already
@@ -2390,6 +2398,150 @@ def _generate_conversion_query(
23902398{ dim_select } COUNT(DISTINCT conversions.entity)::FLOAT / NULLIF(COUNT(DISTINCT base_events.entity), 0) AS { metric .name }
23912399FROM base_events
23922400LEFT JOIN conversions ON { join_condition } { group_by } { order_clause } { limit_clause }
2401+ """
2402+
2403+ return sql .strip ()
2404+
2405+ def _generate_multistep_conversion_query (
2406+ self ,
2407+ metric ,
2408+ metric_name : str ,
2409+ dimensions : list [str ],
2410+ filters : list [str ] | None = None ,
2411+ order_by : list [str ] | None = None ,
2412+ limit : int | None = None ,
2413+ ) -> str :
2414+ """Generate SQL for N-step conversion funnel metrics.
2415+
2416+ Uses per-entity BOOL_OR aggregation for each step, then sums in the outer query.
2417+
2418+ Args:
2419+ metric: The Metric object with steps defined
2420+ metric_name: Name of the conversion metric
2421+ dimensions: List of dimension references
2422+ filters: List of filter expressions
2423+ order_by: List of fields to order by
2424+ limit: Maximum number of rows to return
2425+
2426+ Returns:
2427+ SQL query string
2428+ """
2429+ import re as _re
2430+
2431+ # Validate entity identifier
2432+ if not _re .match (r"^[a-zA-Z_][a-zA-Z0-9_.]*$" , metric .entity ):
2433+ raise ValueError (f"Invalid entity identifier: { metric .entity } " )
2434+
2435+ # Find the model that owns this metric
2436+ model = None
2437+ if "." in metric_name :
2438+ model_name , _ = metric_name .split ("." , 1 )
2439+ model = self .graph .get_model (model_name )
2440+ if not model :
2441+ for m_name , m in self .graph .models .items ():
2442+ if m .get_metric (metric_name .split ("." , 1 )[- 1 ] if "." in metric_name else metric_name ):
2443+ model = m
2444+ break
2445+ if not model :
2446+ for m_name , m in self .graph .models .items ():
2447+ for dim in m .dimensions :
2448+ if dim .name == metric .entity :
2449+ model = m
2450+ break
2451+ if model :
2452+ break
2453+
2454+ if not model :
2455+ raise ValueError (f"No model found for conversion metric { metric_name } " )
2456+
2457+ # Build FROM clause
2458+ if model .sql :
2459+ from_clause = f"({ model .sql } ) AS t"
2460+ else :
2461+ from_clause = model .table
2462+
2463+ # Build WHERE filter clause
2464+ filter_clause = ""
2465+ if filters :
2466+ filter_clause = "\n WHERE " + " AND " .join (filters )
2467+
2468+ # Resolve dimension columns for GROUP BY support
2469+ dim_entries : list [tuple [str , str ]] = []
2470+ for dim_ref in dimensions :
2471+ dim_name = dim_ref .split ("." , 1 )[1 ] if "." in dim_ref else dim_ref
2472+ if "__" in dim_name :
2473+ base_dim , gran = dim_name .rsplit ("__" , 1 )
2474+ else :
2475+ base_dim , gran = dim_name , None
2476+ dim_obj = model .get_dimension (base_dim )
2477+ if not dim_obj :
2478+ continue
2479+ sql_col = dim_obj .sql_expr
2480+ if gran and dim_obj .type == "time" :
2481+ sql_col = self ._date_trunc (gran , sql_col )
2482+ alias = f"{ base_dim } __{ gran } "
2483+ else :
2484+ alias = base_dim
2485+ dim_entries .append ((alias , sql_col ))
2486+
2487+ dim_aliases = [alias for alias , _ in dim_entries ]
2488+
2489+ # Build inner CTE: per-entity BOOL_OR for each step
2490+ step_cols = []
2491+ for i , step_expr in enumerate (metric .steps , 1 ):
2492+ step_cols .append (f"BOOL_OR({ step_expr } ) AS step_{ i } " )
2493+
2494+ inner_select_parts = [f"{ metric .entity } AS entity" ]
2495+ if dim_entries :
2496+ for alias , sql_col in dim_entries :
2497+ inner_select_parts .append (f"{ sql_col } AS { alias } " )
2498+ inner_select_parts .extend (step_cols )
2499+ inner_select = ",\n " .join (inner_select_parts )
2500+
2501+ group_by_inner_parts = [metric .entity ]
2502+ if dim_entries :
2503+ for alias , sql_col in dim_entries :
2504+ group_by_inner_parts .append (sql_col )
2505+ group_by_inner = ",\n " .join (group_by_inner_parts )
2506+
2507+ # Build outer SELECT: total entities + sum of each step
2508+ outer_select_parts = []
2509+ if dim_aliases :
2510+ for alias in dim_aliases :
2511+ outer_select_parts .append (f"{ alias } " )
2512+ outer_select_parts .append ("COUNT(*) AS total_entities" )
2513+ for i in range (1 , len (metric .steps ) + 1 ):
2514+ outer_select_parts .append (f"SUM(step_{ i } ::int) AS step_{ i } _count" )
2515+ outer_select = ",\n " .join (outer_select_parts )
2516+
2517+ # Build GROUP BY, ORDER BY, LIMIT for outer query
2518+ outer_group_by = ""
2519+ if dim_aliases :
2520+ outer_group_by = "\n GROUP BY\n " + ",\n " .join (str (i + 1 ) for i in range (len (dim_aliases )))
2521+
2522+ order_clause = ""
2523+ if order_by :
2524+ order_fields = []
2525+ for field in order_by :
2526+ field_name = field .split ("." , 1 )[1 ] if "." in field else field
2527+ order_fields .append (field_name )
2528+ order_clause = f"\n ORDER BY { ', ' .join (order_fields )} "
2529+
2530+ limit_clause = ""
2531+ if limit is not None :
2532+ limit_clause = f"\n LIMIT { limit } "
2533+
2534+ sql = f"""
2535+ WITH { metric .name } _per_entity AS (
2536+ SELECT
2537+ { inner_select }
2538+ FROM { from_clause } { filter_clause }
2539+ GROUP BY
2540+ { group_by_inner }
2541+ )
2542+ SELECT
2543+ { outer_select }
2544+ FROM { metric .name } _per_entity{ outer_group_by } { order_clause } { limit_clause }
23932545"""
23942546
23952547 return sql .strip ()
0 commit comments