The Code Execution stage allows you to run custom Python code to transform, filter, or enrich documents. This provides maximum flexibility for complex logic that can’t be expressed with other stages.
Stage Category: APPLY (transforms documents)
Transformation: N documents → M documents (custom logic)
When to Use
| Use Case | Description |
| --- | --- |
| Custom transformations | Complex field calculations |
| Business logic | Domain-specific rules |
| Data normalization | Custom parsing/formatting |
| Advanced filtering | Logic beyond structured_filter |
When NOT to Use
| Scenario | Recommended Alternative |
| --- | --- |
| Simple field transforms | json_transform |
| LLM-based enrichment | llm_enrichment |
| Standard filtering | structured_filter |
| External API calls | api_call |
Parameters
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| code | string | Required | Python code to execute |
| runtime | string | python3.11 | Python runtime version |
| timeout | integer | 5000 | Execution timeout (ms) |
| memory | integer | 128 | Memory limit (MB) |
| packages | array | [] | Additional pip packages |
Configuration Examples
Basic Transformation
{
"stage_type" : "apply" ,
"stage_id" : "code_execution" ,
"parameters" : {
"code" : "def transform(doc):\n    doc['word_count'] = len(doc.get('content', '').split())\n    return doc"
}
}
Code Structure
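Your code must define a function named transform that takes a single document dict and returns either the (possibly modified) document or None: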
def transform(doc):
    """
    Transform a single document.

    Args:
        doc: Dictionary containing document fields

    Returns:
        - Modified doc dict to keep the document
        - None to filter out the document
    """
    # Your logic here
    return doc
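To try a transform before deploying it, a small local harness can mimic the N → M behavior: run transform on each document and drop those that return None. This is a minimal sketch under that assumption only; run_stage is a hypothetical helper, not part of the platform, and it does not reproduce the sandbox's import, memory, or timeout restrictions.

```python
from typing import Any, Callable, Optional


def run_stage(code: str, docs: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Hypothetical local harness: apply the code's transform() to every doc."""
    namespace: dict[str, Any] = {}
    exec(code, namespace)  # executes the code string, which defines transform()
    transform: Callable[[dict[str, Any]], Optional[dict[str, Any]]] = namespace["transform"]
    results: list[dict[str, Any]] = []
    for doc in docs:
        out = transform(doc)
        if out is not None:  # None means "filter this document out"
            results.append(out)
    return results


# Same word-count logic as the basic example, plus a filter for empty documents.
code = (
    "def transform(doc):\n"
    "    words = len(doc.get('content', '').split())\n"
    "    if words == 0:\n"
    "        return None\n"
    "    doc['word_count'] = words\n"
    "    return doc\n"
)

docs = [
    {"document_id": "a", "content": "one two three"},
    {"document_id": "b", "content": ""},
]
result = run_stage(code, docs)
print(result)  # two documents in, one document out
```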
Available in Scope
| Variable | Type | Description |
| --- | --- | --- |
| doc | dict | Current document |
| INPUT | dict | Pipeline input parameters |
| CONTEXT | dict | Pipeline context |

A typical doc looks like this:
doc = {
"document_id" : "doc_123" ,
"content" : "Document text content..." ,
"score" : 0.85 ,
"metadata" : {
"title" : "Document Title" ,
"author" : "John Doe" ,
"date" : "2024-01-15"
}
}
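The table lists INPUT and CONTEXT as being in scope alongside doc. The sketch below assumes they are exposed as module-level globals (the page does not say exactly how) and injects them by hand when running the code locally. The min_score key and the passed_threshold field are hypothetical names used only for illustration.

```python
from typing import Any

# Code as it might appear in the stage's "code" parameter.
code = """
def transform(doc):
    threshold = INPUT.get('min_score', 0.0)   # hypothetical input parameter
    if doc.get('score', 0.0) < threshold:
        return None                            # filter out low scorers
    doc.setdefault('metadata', {})['passed_threshold'] = threshold
    return doc
"""

# Assumption: INPUT and CONTEXT behave like globals; inject them manually here.
namespace: dict[str, Any] = {"INPUT": {"min_score": 0.5}, "CONTEXT": {}}
exec(code, namespace)
transform = namespace["transform"]

sample = {
    "document_id": "doc_123",
    "content": "Document text content...",
    "score": 0.85,
    "metadata": {"title": "Document Title", "author": "John Doe", "date": "2024-01-15"},
}
kept = transform(sample)
dropped = transform({"document_id": "doc_456", "score": 0.2})
print(kept["metadata"]["passed_threshold"], dropped)  # 0.5 None
```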
Output Options
| Return Value | Effect |
| --- | --- |
| doc (modified) | Keep document with changes |
| doc (unmodified) | Keep document as-is |
| None | Filter out document |
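A single transform can use all three return options. In this sketch, the metadata status field and the is_draft flag are hypothetical; substitute fields from your own documents.

```python
from typing import Any, Optional


def transform(doc: dict[str, Any]) -> Optional[dict[str, Any]]:
    status = doc.get("metadata", {}).get("status")  # hypothetical field
    if status == "archived":
        return None              # filtered out
    if status == "draft":
        doc["is_draft"] = True   # kept, with changes
        return doc
    return doc                   # kept as-is


docs = [
    {"document_id": "d1", "metadata": {"status": "published"}},
    {"document_id": "d2", "metadata": {"status": "draft"}},
    {"document_id": "d3", "metadata": {"status": "archived"}},
]
kept = [out for out in (transform(d) for d in docs) if out is not None]
print([d["document_id"] for d in kept])  # ['d1', 'd2']
```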
Performance
| Metric | Value |
| --- | --- |
| Latency | 5-50 ms per document |
| Timeout | Configurable (default 5 s) |
| Memory | Configurable (default 128 MB) |
| Concurrency | Parallel execution |
Code execution adds latency. Keep transformations simple and avoid heavy computation. For complex processing, consider pre-computing during ingestion.
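One way to check that a transform stays well inside the timeout is to time it locally on representative documents. This sketch assumes the timeout applies per document (the Error Handling table reports a timed-out document as skipped) and treats local timings as only a rough proxy for the sandbox; max_latency_ms is a hypothetical helper.

```python
import time
from typing import Any, Callable, Optional

TIMEOUT_MS = 5000  # default timeout from the Parameters table


def max_latency_ms(transform: Callable[[dict[str, Any]], Optional[dict[str, Any]]],
                   docs: list[dict[str, Any]]) -> float:
    """Hypothetical helper: worst-case wall-clock time of transform over docs, in ms."""
    worst = 0.0
    for doc in docs:
        sample = dict(doc)  # shallow copy so the samples are not mutated
        start = time.perf_counter()
        transform(sample)
        worst = max(worst, (time.perf_counter() - start) * 1000)
    return worst


def word_count_transform(doc: dict[str, Any]) -> dict[str, Any]:
    doc["word_count"] = len(doc.get("content", "").split())
    return doc


samples = [{"content": "lorem ipsum " * 1000} for _ in range(20)]
worst = max_latency_ms(word_count_transform, samples)
print(f"worst case {worst:.3f} ms of a {TIMEOUT_MS} ms budget")
```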
Common Pipeline Patterns
Custom Scoring Pipeline
[
{
"stage_type" : "filter" ,
"stage_id" : "semantic_search" ,
"parameters" : {
"query" : "{{INPUT.query}}" ,
"vector_index" : "text_extractor_v1_embedding" ,
"top_k" : 50
}
},
{
"stage_type" : "apply" ,
"stage_id" : "code_execution" ,
"parameters" : {
"code" : "def transform(doc):\n    score = doc.get('score', 0)\n    # Boost verified sources\n    if doc.get('metadata', {}).get('verified'):\n        score *= 1.2\n    # Penalize old content\n    if doc.get('metadata', {}).get('year', 2024) < 2020:\n        score *= 0.8\n    doc['custom_score'] = score\n    return doc"
}
},
{
"stage_type" : "sort" ,
"stage_id" : "sort_relevance" ,
"parameters" : {
"score_field" : "custom_score"
}
}
]
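Written out as ordinary Python, the scoring code from the stage above is easier to read and test. The local sort stands in for the sort_relevance stage and assumes a descending order on custom_score; the sample documents are made up.

```python
from typing import Any


def transform(doc: dict[str, Any]) -> dict[str, Any]:
    score = doc.get("score", 0)
    meta = doc.get("metadata", {})
    if meta.get("verified"):           # boost verified sources
        score *= 1.2
    if meta.get("year", 2024) < 2020:  # penalize old content
        score *= 0.8
    doc["custom_score"] = score
    return doc


docs = [
    {"document_id": "old_verified", "score": 0.9, "metadata": {"verified": True, "year": 2018}},
    {"document_id": "new_unverified", "score": 0.85, "metadata": {"year": 2023}},
    {"document_id": "new_verified", "score": 0.8, "metadata": {"verified": True, "year": 2022}},
]
# Local stand-in for the sort stage (descending order is an assumption).
ranked = sorted((transform(d) for d in docs), key=lambda d: d["custom_score"], reverse=True)
for d in ranked:
    print(d["document_id"], round(d["custom_score"], 3))
```

Here the verified 2018 document (0.9 × 1.2 × 0.8 = 0.864) drops below the verified 2022 one (0.8 × 1.2 = 0.96), showing how the penalty can outweigh a higher base score.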
Data Normalization Pipeline
[
{
"stage_type" : "filter" ,
"stage_id" : "hybrid_search" ,
"parameters" : {
"query" : "{{INPUT.query}}" ,
"vector_index" : "text_extractor_v1_embedding" ,
"top_k" : 30
}
},
{
"stage_type" : "apply" ,
"stage_id" : "code_execution" ,
"parameters" : {
"code" : "def transform(doc):\n    meta = doc.setdefault('metadata', {})\n    # Normalize price to USD\n    price = meta.get('price', 0)\n    currency = meta.get('currency', 'USD')\n    rates = {'EUR': 1.1, 'GBP': 1.27, 'USD': 1.0}\n    meta['price_usd'] = price * rates.get(currency, 1.0)\n    return doc"
}
}
]
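Hand-writing \n escapes and indentation inside a JSON string is error-prone, and a lost indent makes the whole stage fail with a syntax error. A sketch of an alternative: keep the transform as normal indented Python and let json.dumps produce the escaped code value. Like the stage above, it uses setdefault so documents without a metadata field do not raise a KeyError; the exchange rates are illustrative static values.

```python
import json
import textwrap

# The transform as ordinary, indented Python.
source = textwrap.dedent("""\
    def transform(doc):
        meta = doc.setdefault('metadata', {})
        price = meta.get('price', 0)
        currency = meta.get('currency', 'USD')
        rates = {'EUR': 1.1, 'GBP': 1.27, 'USD': 1.0}  # illustrative static rates
        meta['price_usd'] = price * rates.get(currency, 1.0)
        return doc
""")

stage = {
    "stage_type": "apply",
    "stage_id": "code_execution",
    "parameters": {"code": source},
}
payload = json.dumps(stage, indent=2)  # newlines become \n escapes automatically
print(payload)

# Sanity check: the code survives the JSON round trip and still defines transform().
namespace: dict = {}
exec(json.loads(payload)["parameters"]["code"], namespace)
result = namespace["transform"]({"metadata": {"price": 100, "currency": "EUR"}})
```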
Advanced Filtering
[
{
"stage_type" : "filter" ,
"stage_id" : "semantic_search" ,
"parameters" : {
"query" : "{{INPUT.query}}" ,
"vector_index" : "text_extractor_v1_embedding" ,
"top_k" : 100
}
},
{
"stage_type" : "apply" ,
"stage_id" : "code_execution" ,
"parameters" : {
"code" : "def transform(doc):\n    content = doc.get('content', '')\n    # Complex filtering logic\n    word_count = len(content.split())\n    if word_count < 50:\n        return None\n    # Check for required sections\n    required = ['introduction', 'conclusion']\n    content_lower = content.lower()\n    if not all(section in content_lower for section in required):\n        return None\n    doc.setdefault('metadata', {})['word_count'] = word_count\n    return doc"
}
}
]
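Because this filter only ever returns the document or None, it is easy to check against a few made-up inputs before running it over 100 search results. Note that the section check is a plain substring test, so a word such as "reintroduction" would also satisfy it.

```python
from typing import Any, Optional


def transform(doc: dict[str, Any]) -> Optional[dict[str, Any]]:
    content = doc.get("content", "")
    word_count = len(content.split())
    if word_count < 50:
        return None  # too short
    content_lower = content.lower()
    if not all(section in content_lower for section in ("introduction", "conclusion")):
        return None  # missing a required section
    doc.setdefault("metadata", {})["word_count"] = word_count
    return doc


filler = " ".join(["word"] * 60)
cases = {
    "too_short": "Introduction ... Conclusion",
    "missing_conclusion": "Introduction " + filler,
    "complete": "Introduction " + filler + " Conclusion",
}
outcomes = {name: transform({"content": text}) is not None for name, text in cases.items()}
print(outcomes)
```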
Security
| Restriction | Description |
| --- | --- |
| Network | No outbound network access |
| Filesystem | No file system access |
| Imports | Limited to approved packages |
| Resources | Memory and CPU limits enforced |
Allowed Packages
Built-in packages available:
json, re, math, datetime, collections
itertools, functools, operator
Additional packages can be requested via the packages parameter.
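A text-processing transform can often be written with the built-in packages alone. This sketch uses re, collections.Counter, and datetime to derive a year from an ISO date and the three most frequent terms; the year and top_terms fields and the tiny stopword list are hypothetical, and it assumes module-level imports of approved packages are permitted.

```python
import re
from collections import Counter
from datetime import datetime

WORD_RE = re.compile(r"[a-z']+")
STOPWORDS = {"the", "a", "an", "and", "of", "to", "in", "is"}  # small illustrative list


def transform(doc):
    meta = doc.setdefault("metadata", {})
    # Derive a numeric year from an ISO date string such as "2024-01-15".
    date_str = meta.get("date")
    if date_str:
        try:
            meta["year"] = datetime.strptime(date_str, "%Y-%m-%d").year
        except ValueError:
            pass  # leave year unset for malformed dates
    words = [w for w in WORD_RE.findall(doc.get("content", "").lower()) if w not in STOPWORDS]
    meta["top_terms"] = [term for term, _ in Counter(words).most_common(3)]
    return doc


doc = transform({
    "content": "The cache and the index: cache hits beat index misses. Cache!",
    "metadata": {"date": "2024-01-15"},
})
print(doc["metadata"])
```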
Error Handling
| Error | Behavior |
| --- | --- |
| Syntax error | Stage fails |
| Runtime exception | Document skipped |
| Timeout | Document skipped |
| Memory exceeded | Stage fails |
Always handle missing fields gracefully using .get() with defaults to avoid runtime errors.
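Since a runtime exception causes the document to be skipped, wrapping fragile conversions in try/except lets you keep the document and record what went wrong instead. The price_value and transform_error fields are hypothetical; whether to keep or drop a document on error is a design choice for your pipeline.

```python
from typing import Any, Optional


def transform(doc: dict[str, Any]) -> Optional[dict[str, Any]]:
    meta = doc.get("metadata") or {}  # also tolerates metadata set to None
    try:
        # float() raises on values like "N/A" or None; uncaught, the document would be skipped.
        doc["price_value"] = float(meta.get("price", 0))
    except (TypeError, ValueError) as exc:
        doc["price_value"] = None
        doc["transform_error"] = f"{type(exc).__name__}: {exc}"
    return doc


print(transform({"metadata": {"price": "19.99"}}))
print(transform({"metadata": {"price": "N/A"}}))
```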
Debugging
Enable debug mode to see execution details:
{
"stage_type" : "apply" ,
"stage_id" : "code_execution" ,
"parameters" : {
"code" : "def transform(doc):\n    print(f'Processing: {doc.get(\"document_id\")}')\n    return doc" ,
"debug" : true
}
}
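Because a syntax error fails the entire stage rather than a single document, it can be worth checking the code string locally before deploying. This sketch uses Python's built-in compile() and then confirms that a callable named transform is defined; preflight is a hypothetical helper, and since it executes the module-level code with exec(), only use it on code you trust.

```python
import json
from typing import Any


def preflight(stage_json: str) -> list[str]:
    """Hypothetical helper: return problems found in a code_execution stage definition."""
    problems: list[str] = []
    stage = json.loads(stage_json)
    code = stage.get("parameters", {}).get("code", "")
    try:
        compiled = compile(code, "<code_execution>", "exec")
    except SyntaxError as exc:  # IndentationError is a subclass of SyntaxError
        return [f"SyntaxError on line {exc.lineno}: {exc.msg}"]
    namespace: dict[str, Any] = {}
    exec(compiled, namespace)
    if not callable(namespace.get("transform")):
        problems.append("code does not define a callable named 'transform'")
    return problems


# A body that lost its indentation, as often happens when escaping code by hand.
broken = json.dumps({"parameters": {"code": "def transform(doc):\nreturn doc"}})
print(preflight(broken))
```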