@@ -43,7 +43,7 @@ export const SqlExecuteTool = Tool.define("sql_execute", {
4343 // but does NOT block execution. Used to measure catch rate before deciding
4444 // whether to enable blocking in a future release. Fire-and-forget so it
4545 // doesn't add latency to the sql_execute hot path.
46- preValidateSql ( args . query , args . warehouse ) . catch ( ( ) => { } )
46+ preValidateSql ( args . query , args . warehouse , queryType ) . catch ( ( ) => { } )
4747 // altimate_change end
4848
4949 try {
@@ -115,22 +115,39 @@ interface PreValidationResult {
115115 error ?: string
116116}
117117
118- async function preValidateSql ( sql : string , warehouse ? : string ) : Promise < PreValidationResult > {
118+ async function preValidateSql ( sql : string , warehouse : string | undefined , queryType : string ) : Promise < PreValidationResult > {
119119 const startTime = Date . now ( )
120+ // Yield the event loop before heavy synchronous SQLite work so concurrent
121+ // tasks aren't blocked. Bun's sqlite API is sync and listColumns can touch
122+ // hundreds of thousands of rows for large warehouses.
123+ await new Promise < void > ( ( resolve ) => setImmediate ( resolve ) )
124+
125+ // Precompute correlation fields used in every telemetry event this function emits.
126+ const maskedSqlHash = Telemetry . hashError ( Telemetry . maskString ( sql ) )
127+
120128 try {
121129 // Resolve the warehouse the same way sql.execute's fallback path does:
122130 // when caller omits `warehouse`, sql.execute uses Registry.list()[0].
123131 // Matching that here keeps the shadow validation aligned with actual
124132 // execution (dbt-routed queries are a known gap — they short-circuit
125133 // before this fallback, so validation may use a different warehouse
126134 // than the one dbt selects).
135+ const registered = Registry . list ( ) . warehouses
127136 let warehouseName = warehouse
128137 if ( ! warehouseName ) {
129- const registered = Registry . list ( ) . warehouses
130138 warehouseName = registered [ 0 ] ?. name
131139 }
140+ const warehouseInfo = registered . find ( ( w ) => w . name === warehouseName )
141+ const warehouseType = warehouseInfo ?. type ?? "unknown"
142+
143+ const ctx : TrackCtx = {
144+ warehouse_type : warehouseType ,
145+ query_type : queryType ,
146+ masked_sql_hash : maskedSqlHash ,
147+ }
148+
132149 if ( ! warehouseName ) {
133- trackPreValidation ( "skipped" , "no_cache" , 0 , Date . now ( ) - startTime , false )
150+ trackPreValidation ( "skipped" , "no_cache" , 0 , Date . now ( ) - startTime , false , ctx )
134151 return { blocked : false }
135152 }
136153
@@ -139,31 +156,39 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid
139156
140157 const warehouseStatus = status . warehouses . find ( ( w ) => w . name === warehouseName )
141158 if ( ! warehouseStatus ?. last_indexed ) {
142- trackPreValidation ( "skipped" , "no_cache" , 0 , Date . now ( ) - startTime , false )
159+ trackPreValidation ( "skipped" , "no_cache" , 0 , Date . now ( ) - startTime , false , ctx )
143160 return { blocked : false }
144161 }
145162
146163 // Check cache freshness
147164 const cacheAge = Date . now ( ) - new Date ( warehouseStatus . last_indexed ) . getTime ( )
148165 if ( cacheAge > CACHE_TTL_MS ) {
149- trackPreValidation ( "skipped" , "stale_cache" , 0 , Date . now ( ) - startTime , false )
166+ trackPreValidation ( "skipped" , "stale_cache" , 0 , Date . now ( ) - startTime , false , ctx )
150167 return { blocked : false }
151168 }
152169
153170 // Build schema context from cached columns
154171 const columns = cache . listColumns ( warehouseName , COLUMN_SCAN_LIMIT )
155172 const schemaTruncated = columns . length >= COLUMN_SCAN_LIMIT
156173 if ( columns . length === 0 ) {
157- trackPreValidation ( "skipped" , "empty_cache" , 0 , Date . now ( ) - startTime , false )
174+ trackPreValidation ( "skipped" , "empty_cache" , 0 , Date . now ( ) - startTime , false , ctx )
158175 return { blocked : false }
159176 }
160177
161- const schemaContext : Record < string , any > = { }
178+ // Build schema context keyed by fully-qualified name (database.schema.table)
179+ // so multi-database warehouses don't collide on schema+table alone.
180+ // Dedupe columns per table to defend against residual collisions.
181+ const schemaContext : Record < string , { name : string ; type : string ; nullable : boolean } [ ] > = { }
182+ const seenColumns : Record < string , Set < string > > = { }
162183 for ( const col of columns ) {
163- const tableName = col . schema_name ? `${ col . schema_name } .${ col . table } ` : col . table
184+ const tableName = [ col . database , col . schema_name , col . table ] . filter ( Boolean ) . join ( "." )
185+ if ( ! tableName ) continue
164186 if ( ! schemaContext [ tableName ] ) {
165187 schemaContext [ tableName ] = [ ]
188+ seenColumns [ tableName ] = new Set ( )
166189 }
190+ if ( seenColumns [ tableName ] . has ( col . name ) ) continue
191+ seenColumns [ tableName ] . add ( col . name )
167192 schemaContext [ tableName ] . push ( {
168193 name : col . name ,
169194 type : col . data_type || "VARCHAR" ,
@@ -178,60 +203,61 @@ async function preValidateSql(sql: string, warehouse?: string): Promise<PreValid
178203 schema_context : schemaContext ,
179204 } )
180205
206+ // If the dispatcher itself failed, don't treat missing data as "valid".
207+ if ( ! validationResult . success ) {
208+ const errMsg = typeof validationResult . error === "string" ? validationResult . error : undefined
209+ trackPreValidation ( "error" , "dispatcher_failed" , 0 , Date . now ( ) - startTime , false , ctx , errMsg )
210+ return { blocked : false }
211+ }
212+
181213 const data = ( validationResult . data ?? { } ) as Record < string , any >
182214 const errors = Array . isArray ( data . errors ) ? data . errors : [ ]
183215 const isValid = data . valid !== false && errors . length === 0
184216
185217 if ( isValid ) {
186- trackPreValidation ( "passed" , "valid" , columns . length , Date . now ( ) - startTime , schemaTruncated )
218+ trackPreValidation ( "passed" , "valid" , columns . length , Date . now ( ) - startTime , schemaTruncated , ctx )
187219 return { blocked : false }
188220 }
189221
190222 // Only block on high-confidence structural errors
191223 const structuralErrors = errors . filter ( ( e : any ) => {
192224 const msg = ( e . message ?? "" ) . toLowerCase ( )
193- return msg . includes ( " column" ) || msg . includes ( " table" ) || msg . includes ( " not found" ) || msg . includes ( " does not exist" )
225+ return / \b ( c o l u m n | t a b l e | v i e w | r e l a t i o n | i d e n t i f i e r | n o t f o u n d | d o e s n o t e x i s t ) \b / . test ( msg )
194226 } )
195227
196228 if ( structuralErrors . length === 0 ) {
197229 // Non-structural errors (ambiguous cases) — let them through
198- trackPreValidation ( "passed" , "non_structural" , columns . length , Date . now ( ) - startTime , schemaTruncated )
230+ trackPreValidation ( "passed" , "non_structural" , columns . length , Date . now ( ) - startTime , schemaTruncated , ctx )
199231 return { blocked : false }
200232 }
201233
202- // Build helpful error with available columns
203234 const errorMsgs = structuralErrors . map ( ( e : any ) => e . message ) . join ( "\n" )
204- const referencedTables = Object . keys ( schemaContext ) . slice ( 0 , 10 )
205- const availableColumns = referencedTables
206- . map ( ( t ) => `${ t } : ${ schemaContext [ t ] . map ( ( c : any ) => c . name ) . join ( ", " ) } ` )
207- . join ( "\n" )
208-
209- const errorOutput = [
210- `Pre-execution validation failed (validated against cached schema):` ,
211- `` ,
212- errorMsgs ,
213- `` ,
214- `Available tables and columns:` ,
215- availableColumns ,
216- `` ,
217- `Fix the query and retry. If the schema cache is outdated, run schema_index to refresh it.` ,
218- ] . join ( "\n" )
219-
220- trackPreValidation ( "blocked" , "structural_error" , columns . length , Date . now ( ) - startTime , schemaTruncated , errorMsgs )
221- return { blocked : true , error : errorOutput }
235+ trackPreValidation ( "blocked" , "structural_error" , columns . length , Date . now ( ) - startTime , schemaTruncated , ctx , errorMsgs )
236+ // Shadow mode: caller discards the result. When blocking is enabled in the
237+ // future, build errorOutput here with the structural errors and
238+ // schemaContext keys for user-facing guidance.
239+ return { blocked : false }
222240 } catch {
223241 // Validation failure should never block execution
224- trackPreValidation ( "error" , "validation_exception" , 0 , Date . now ( ) - startTime , false )
242+ const ctx : TrackCtx = { warehouse_type : "unknown" , query_type : queryType , masked_sql_hash : maskedSqlHash }
243+ trackPreValidation ( "error" , "validation_exception" , 0 , Date . now ( ) - startTime , false , ctx )
225244 return { blocked : false }
226245 }
227246}
228247
248+ interface TrackCtx {
249+ warehouse_type : string
250+ query_type : string
251+ masked_sql_hash : string
252+ }
253+
229254function trackPreValidation (
230255 outcome : "skipped" | "passed" | "blocked" | "error" ,
231256 reason : string ,
232257 schema_columns : number ,
233258 duration_ms : number ,
234259 schema_truncated : boolean ,
260+ ctx : TrackCtx ,
235261 error_message ?: string ,
236262) {
237263 // Mask schema identifiers (table / column names, paths, user IDs) from the
@@ -244,6 +270,9 @@ function trackPreValidation(
244270 session_id : Telemetry . getContext ( ) . sessionId ,
245271 outcome,
246272 reason,
273+ warehouse_type : ctx . warehouse_type ,
274+ query_type : ctx . query_type ,
275+ masked_sql_hash : ctx . masked_sql_hash ,
247276 schema_columns,
248277 schema_truncated,
249278 duration_ms,
0 commit comments