[Figure: Code Execution stage showing custom code transformations]
The Code Execution stage lets you run custom Python code to transform, filter, or enrich documents. Use it for complex logic that the other stages cannot express.
Stage Category: APPLY (transforms documents)
Transformation: N documents → M documents, where M ≤ N (documents for which transform returns None are dropped)

When to Use

| Use Case | Description |
|---|---|
| Custom transformations | Complex field calculations |
| Business logic | Domain-specific rules |
| Data normalization | Custom parsing/formatting |
| Advanced filtering | Logic beyond structured_filter |

When NOT to Use

| Scenario | Recommended Alternative |
|---|---|
| Simple field transforms | json_transform |
| LLM-based enrichment | llm_enrichment |
| Standard filtering | structured_filter |
| External API calls | api_call |

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| code | string | Required | Python code to execute |
| runtime | string | python3.11 | Python runtime version |
| timeout | integer | 5000 | Execution timeout (ms) |
| memory | integer | 128 | Memory limit (MB) |
| packages | array | [] | Additional pip packages |
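The code parameter is a single JSON string, so every newline and quote in your Python must be escaped. Instead of escaping by hand, here is a minimal sketch that writes the transform as ordinary Python and lets json.dumps build the stage config. The optional parameters restate the documented defaults. How you submit the payload to your pipeline is outside the scope of this sketch.

```python
import json
import textwrap

# The transform source, written as normal multi-line Python.
TRANSFORM_SRC = textwrap.dedent("""
    def transform(doc):
        doc['word_count'] = len(doc.get('content', '').split())
        return doc
""").strip()

stage = {
    "stage_type": "apply",
    "stage_id": "code_execution",
    "parameters": {
        "code": TRANSFORM_SRC,
        "runtime": "python3.11",
        "timeout": 5000,  # milliseconds
        "memory": 128,    # megabytes
        "packages": [],
    },
}

# json.dumps escapes the newlines and quotes inside the code string.
payload = json.dumps(stage, indent=2)
print(payload)
```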

Configuration Examples

{
  "stage_type": "apply",
  "stage_id": "code_execution",
  "parameters": {
    "code": "def transform(doc):\n    doc['word_count'] = len(doc.get('content', '').split())\n    return doc"
  }
}
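Before deploying a config, you can check that the code string compiles and behaves as expected by running it locally. The sketch below assumes a hypothetical helper, load_transform. It decodes the JSON, compiles the code, and pulls out transform. This is plain exec on your machine, not the stage's sandbox, so it does not enforce the timeout, memory, network, or import restrictions described on this page.

```python
import json

# The stage config from the example above, as a raw JSON string.
CONFIG = r'''
{
  "stage_type": "apply",
  "stage_id": "code_execution",
  "parameters": {
    "code": "def transform(doc):\n    doc['word_count'] = len(doc.get('content', '').split())\n    return doc"
  }
}
'''

def load_transform(config_json):
    """Compile the stage's code string locally and return its transform function."""
    code = json.loads(config_json)["parameters"]["code"]
    namespace = {}
    exec(compile(code, "<code_execution>", "exec"), namespace)
    return namespace["transform"]

transform = load_transform(CONFIG)
print(transform({"content": "The quick brown fox"}))
# -> {'content': 'The quick brown fox', 'word_count': 4}
```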

Code Structure

Your code must define a transform function:
def transform(doc):
    """
    Transform a single document.

    Args:
        doc: Dictionary containing document fields

    Returns:
        - Modified doc dict to keep document
        - None to filter out document
    """
    # Your logic here
    return doc
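The return contract is what makes the stage N → M: each document is either kept (possibly modified) or dropped. The sketch below models that contract sequentially with a hypothetical apply_stage helper. The real stage also runs documents in parallel and applies the error rules described under Error Handling, which this sketch does not reproduce.

```python
from typing import Callable, Optional

Doc = dict

def apply_stage(docs: list[Doc], transform: Callable[[Doc], Optional[Doc]]) -> list[Doc]:
    """Apply transform to each document; drop any document for which it returns None."""
    results = []
    for doc in docs:
        out = transform(doc)
        if out is not None:
            results.append(out)
    return results

def transform(doc):
    # Drop documents without content; tag the rest.
    if not doc.get("content"):
        return None
    doc["has_content"] = True
    return doc

docs = [{"document_id": "a", "content": "hello"}, {"document_id": "b", "content": ""}]
kept = apply_stage(docs, transform)
print([d["document_id"] for d in kept])  # -> ['a']
```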

Available in Scope

| Variable | Type | Description |
|---|---|---|
| doc | dict | Current document |
| INPUT | dict | Pipeline input parameters |
| CONTEXT | dict | Pipeline context |
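INPUT is available as a global inside your code, so transform can read pipeline parameters without them being passed as arguments. In the sketch below, INPUT is a local stand-in dict. The query key mirrors the {{INPUT.query}} templates used elsewhere on this page, while min_score is a hypothetical parameter chosen for illustration. CONTEXT is left out because this page does not document its keys.

```python
# Local stand-in for the INPUT global the stage provides.
# "min_score" is a hypothetical pipeline parameter, not a documented one.
INPUT = {"query": "refund policy", "min_score": 0.5}

def transform(doc):
    # Read a threshold from the pipeline input, falling back to 0 if absent.
    min_score = INPUT.get("min_score", 0)
    if doc.get("score", 0) < min_score:
        return None  # filter out low-scoring documents
    # Record which query produced this result.
    doc["matched_query"] = INPUT.get("query")
    return doc

print(transform({"document_id": "doc_1", "score": 0.85}))
print(transform({"document_id": "doc_2", "score": 0.2}))  # -> None
```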

Input Document Structure

doc = {
    "document_id": "doc_123",
    "content": "Document text content...",
    "score": 0.85,
    "metadata": {
        "title": "Document Title",
        "author": "John Doe",
        "date": "2024-01-15"
    }
}
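Nested fields live under metadata, and any of them may be missing. This sketch reads them defensively and derives a numeric year from metadata.date. That is useful because the Custom Scoring example below reads metadata.year, which the sample document does not contain. It assumes dates are ISO 8601 strings such as "2024-01-15", as in the sample. The display_title field is a hypothetical addition.

```python
from datetime import date

def transform(doc):
    meta = doc.get("metadata") or {}
    # metadata.date is assumed to be ISO 8601 (YYYY-MM-DD); derive a numeric year.
    raw_date = meta.get("date")
    if raw_date:
        try:
            meta["year"] = date.fromisoformat(raw_date).year
        except ValueError:
            pass  # leave year unset if the date is malformed
    doc["metadata"] = meta
    # Fall back to the document ID when there is no title.
    doc["display_title"] = meta.get("title") or doc.get("document_id", "")
    return doc
```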

Output Options

| Return Value | Effect |
|---|---|
| doc (modified) | Keep document with changes |
| doc (unmodified) | Keep document as-is |
| None | Filter out document |

Performance

| Metric | Value |
|---|---|
| Latency | 5-50 ms per document |
| Timeout | Configurable (default 5000 ms) |
| Memory | Configurable (default 128 MB) |
| Concurrency | Parallel execution |
Code execution adds latency. Keep transformations simple and avoid heavy computation. For complex processing, consider pre-computing during ingestion.
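To get a rough sense of whether a transform is heavy, you can time it on representative documents before deploying it. The sketch below uses a hypothetical time_transform helper that reports the slowest single run in milliseconds. Local timings will not match the stage's sandbox overhead, so treat the result only as a coarse check against the latency figures and timeout above.

```python
import copy
import time

def time_transform(transform, docs, repeats=3):
    """Return the slowest observed single-document run time, in milliseconds."""
    worst_ms = 0.0
    for doc in docs:
        for _ in range(repeats):
            sample = copy.deepcopy(doc)  # fresh copy so each run sees the original input
            start = time.perf_counter()
            transform(sample)
            worst_ms = max(worst_ms, (time.perf_counter() - start) * 1000)
    return worst_ms

def transform(doc):
    doc["word_count"] = len(doc.get("content", "").split())
    return doc

worst = time_transform(transform, [{"content": "word " * 10_000}])
print(f"slowest run: {worst:.2f} ms (stage default timeout: 5000 ms)")
```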

Common Pipeline Patterns

Custom Scoring Pipeline

[
  {
    "stage_type": "filter",
    "stage_id": "semantic_search",
    "parameters": {
      "query": "{{INPUT.query}}",
      "vector_index": "text_extractor_v1_embedding",
      "top_k": 50
    }
  },
  {
    "stage_type": "apply",
    "stage_id": "code_execution",
    "parameters": {
      "code": "def transform(doc):\n    score = doc.get('score', 0)\n    # Boost verified sources\n    if doc.get('metadata', {}).get('verified'):\n        score *= 1.2\n    # Penalize old content\n    if doc.get('metadata', {}).get('year', 2024) < 2020:\n        score *= 0.8\n    doc['custom_score'] = score\n    return doc"
    }
  },
  {
    "stage_type": "sort",
    "stage_id": "sort_relevance",
    "parameters": {
      "score_field": "custom_score"
    }
  }
]
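Here is the scoring logic from the pipeline above, written as readable Python, along with a local imitation of the sort step so you can check the resulting order. The sketch assumes sort_relevance orders by custom_score from highest to lowest; this page does not state the direction. Note that a missing year falls back to 2024, so undated documents are never penalized.

```python
def transform(doc):
    meta = doc.get("metadata", {})
    score = doc.get("score", 0)
    if meta.get("verified"):
        score *= 1.2  # boost verified sources
    if meta.get("year", 2024) < 2020:
        score *= 0.8  # penalize content older than 2020
    doc["custom_score"] = score
    return doc

results = [
    {"document_id": "old_verified", "score": 0.80, "metadata": {"verified": True, "year": 2018}},
    {"document_id": "new_unverified", "score": 0.85, "metadata": {"year": 2023}},
    {"document_id": "new_verified", "score": 0.75, "metadata": {"verified": True, "year": 2022}},
]
# Assumed descending sort, imitating the sort stage locally.
ranked = sorted((transform(d) for d in results), key=lambda d: d["custom_score"], reverse=True)
print([(d["document_id"], round(d["custom_score"], 3)) for d in ranked])
```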

Data Normalization Pipeline

[
  {
    "stage_type": "filter",
    "stage_id": "hybrid_search",
    "parameters": {
      "query": "{{INPUT.query}}",
      "vector_index": "text_extractor_v1_embedding",
      "top_k": 30
    }
  },
  {
    "stage_type": "apply",
    "stage_id": "code_execution",
    "parameters": {
      "code": "def transform(doc):\n    meta = doc.setdefault('metadata', {})\n    # Normalize price to USD\n    price = meta.get('price', 0)\n    currency = meta.get('currency', 'USD')\n    rates = {'EUR': 1.1, 'GBP': 1.27, 'USD': 1.0}\n    meta['price_usd'] = price * rates.get(currency, 1.0)\n    return doc"
    }
  }
]
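Here is a readable variant of the normalization transform. Writing to doc['metadata'] directly raises KeyError when a document has no metadata field, so this sketch uses setdefault. It also departs from the one-line config in two labeled ways: unknown currencies are left without a price_usd instead of being treated as USD, and results are rounded to cents. The rates are the static example values from the config. Live rates cannot be fetched inside the stage, because it has no network access.

```python
def transform(doc):
    # Static example rates from the config; not live exchange rates.
    rates_to_usd = {"EUR": 1.1, "GBP": 1.27, "USD": 1.0}
    meta = doc.setdefault("metadata", {})
    price = meta.get("price", 0)
    currency = meta.get("currency", "USD")
    rate = rates_to_usd.get(currency)
    if rate is None:
        # Variant behavior: skip unknown currencies instead of assuming USD.
        return doc
    meta["price_usd"] = round(price * rate, 2)  # variant behavior: round to cents
    return doc
```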

Advanced Filtering

[
  {
    "stage_type": "filter",
    "stage_id": "semantic_search",
    "parameters": {
      "query": "{{INPUT.query}}",
      "vector_index": "text_extractor_v1_embedding",
      "top_k": 100
    }
  },
  {
    "stage_type": "apply",
    "stage_id": "code_execution",
    "parameters": {
      "code": "def transform(doc):\n    content = doc.get('content', '')\n    # Complex filtering logic\n    word_count = len(content.split())\n    if word_count < 50:\n        return None\n    # Check for required sections\n    required = ['introduction', 'conclusion']\n    content_lower = content.lower()\n    if not all(section in content_lower for section in required):\n        return None\n    doc.setdefault('metadata', {})['word_count'] = word_count\n    return doc"
    }
  }
]
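A plain substring check such as 'introduction' in content also matches words like "reintroduction", and 'conclusion' matches "conclusions". If you need whole-word matches, the re module (on the built-in list below) can enforce word boundaries. This sketch is a variant of the filter above, not a replacement the stage requires.

```python
import re

def transform(doc):
    min_words = 50
    required = ("introduction", "conclusion")
    content = doc.get("content") or ""
    words = content.split()
    if len(words) < min_words:
        return None
    # \b...\b matches whole words only, case-insensitively.
    for section in required:
        if not re.search(rf"\b{section}\b", content, flags=re.IGNORECASE):
            return None
    doc.setdefault("metadata", {})["word_count"] = len(words)
    return doc
```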

Security

| Restriction | Description |
|---|---|
| Network | No outbound network access |
| Filesystem | No file system access |
| Imports | Limited to approved packages |
| Resources | Memory and CPU limits enforced |
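Because imports are limited, it can help to lint your code for unapproved modules before submitting it. The sketch below uses a hypothetical disallowed_imports helper built on Python's ast module. It is a local pre-flight check, not the platform's enforcement mechanism. It sees only static import statements, so dynamic imports through __import__ or importlib are not detected.

```python
import ast

# Modules from the built-in list on this page; add any you request via `packages`.
ALLOWED = {"json", "re", "math", "datetime", "collections", "itertools", "functools", "operator"}

def disallowed_imports(code: str, allowed=ALLOWED) -> set[str]:
    """Return top-level module names imported by `code` that are not in `allowed`."""
    found = set()
    for node in ast.walk(ast.parse(code)):
        if isinstance(node, ast.Import):
            found.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            found.add(node.module.split(".")[0])
    return found - set(allowed)

code = "import os\nimport re\nfrom urllib import request\ndef transform(doc):\n    return doc"
print(sorted(disallowed_imports(code)))  # -> ['os', 'urllib']
```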

Allowed Packages

Built-in modules available (Python standard library):
  • json, re, math, datetime, collections
  • itertools, functools, operator
Additional packages can be requested via the packages parameter.
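The built-in modules already cover many enrichment tasks. For example, re and collections together can tag each document with its most frequent terms. The top_terms field name, the four-letter minimum, and the top-three cutoff are arbitrary choices for this sketch; this is crude frequency counting, not real keyword extraction.

```python
import re
from collections import Counter

def transform(doc):
    content = doc.get("content") or ""
    # Lowercase alphabetic tokens of 4+ letters, which skips short words like "the".
    tokens = re.findall(r"[a-z]{4,}", content.lower())
    doc["top_terms"] = [term for term, _ in Counter(tokens).most_common(3)]
    return doc
```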

Error Handling

| Error | Behavior |
|---|---|
| Syntax error | Stage fails |
| Runtime exception | Document skipped |
| Timeout | Document skipped |
| Memory exceeded | Stage fails |
Always handle missing fields gracefully using .get() with defaults to avoid runtime errors.
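One gap in the .get() pattern: doc.get('content', '') still returns None when the key exists with a None value, and calling .split() on it raises. Per the table above, that exception skips the document. The sketch below combines `or` fallbacks with a narrow try/except for type conversion. The rating field and the derived field names are hypothetical.

```python
def transform(doc):
    meta = doc.get("metadata") or {}
    content = doc.get("content") or ""  # also covers content explicitly set to None
    try:
        rating = float(meta.get("rating", 0))  # "rating" is a hypothetical field
    except (TypeError, ValueError):
        rating = 0.0  # malformed value: fall back instead of raising
    doc["is_long"] = len(content.split()) > 200
    doc["rating_normalized"] = min(max(rating / 5.0, 0.0), 1.0)
    return doc
```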

Debugging

Set debug to true in the stage parameters to see execution details:
{
  "stage_type": "apply",
  "stage_id": "code_execution",
  "parameters": {
    "code": "def transform(doc):\n    print(f'Processing: {doc.get(\"document_id\")}')\n    return doc",
    "debug": true
  }
}
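This page does not say where debug output is displayed. When iterating locally, you can capture what your print calls produce with contextlib.redirect_stdout. The run_with_captured_output helper below is hypothetical and only imitates debug output on your own machine.

```python
import contextlib
import io

def transform(doc):
    print(f"Processing: {doc.get('document_id')}")
    return doc

def run_with_captured_output(transform, docs):
    """Run transform over docs and return (results, captured stdout lines)."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        results = [transform(d) for d in docs]
    return results, buffer.getvalue().splitlines()

results, logs = run_with_captured_output(transform, [{"document_id": "doc_123"}])
print(logs)  # -> ['Processing: doc_123']
```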