Overview
Plugins are the core processing units in Mixpeek. They define how your data is transformed, embedded, and indexed. Mixpeek provides two types of plugins:
- Builtin plugins: Pre-built extractors maintained by Mixpeek (text, image, video, document, etc.)
- Custom plugins: Your own extraction logic running on Mixpeek infrastructure
Deployment modes:
| Mode | Description | Use Case |
|---|
| Batch processing | High-throughput Ray Data pipelines | Processing collections, indexing documents |
| Real-time inference | Ray Serve HTTP endpoints | Live API requests, synchronous embedding |
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Plugin System │
├──────────────────────────┬──────────────────────────────────────┤
│ Builtin Plugins │ Custom Plugins │
│ (engine/plugins/) │ (S3 uploaded archives) │
├──────────────────────────┴──────────────────────────────────────┤
│ Pipeline Builder │
│ - Declarative step definitions │
│ - Resource allocation (CPU/GPU/API) │
│ - Content-type filtering │
├─────────────────────────────────────────────────────────────────┤
│ Ray Data (Batch) │ Ray Serve (Real-time) │
│ - map_batches() processing │ - HTTP deployment │
│ - DataFrame input/output │ - Auto-scaling │
│ - Parallel execution │ - Load balancing │
├─────────────────────────────────────────────────────────────────┤
│ Model Registry │
│ - HuggingFace models (cluster-cached) │
│ - Custom S3 models (namespace-scoped) │
│ - Lazy loading (on-demand) │
└─────────────────────────────────────────────────────────────────┘
Quick Start: Custom Plugin
1. Create Plugin Structure
my_extractor/
├── manifest.py # Schemas + metadata
├── pipeline.py # Batch processing pipeline
├── realtime.py # HTTP endpoint (optional, Enterprise)
└── processors/
└── core.py # Your processing logic
2. Define Your Plugin (manifest.py)
from pydantic import BaseModel, Field
from typing import List, Optional
# Input schema - what your plugin accepts
class MyInput(BaseModel):
text: str = Field(..., description="Input text to process")
# Output schema - what your plugin produces
class MyOutput(BaseModel):
embedding: List[float] = Field(..., description="384-dim embedding vector")
sentiment: str = Field(..., description="positive/negative/neutral")
# Parameters - user-configurable options
class MyParams(BaseModel):
threshold: float = Field(default=0.5, ge=0, le=1, description="Confidence threshold")
model_size: str = Field(default="base", description="Model size: base or large")
# Plugin metadata
metadata = {
"feature_extractor_name": "my_extractor",
"version": "1.0.0",
"description": "Custom text embedding with sentiment analysis",
"category": "text",
}
input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]
# Vector index definitions for Qdrant
features = [
{
"feature_name": "my_embedding",
"feature_type": "embedding",
"embedding_dim": 384,
"distance_metric": "cosine",
},
]
3. Implement Batch Processing (processors/core.py)
from dataclasses import dataclass
import pandas as pd
@dataclass
class MyConfig:
threshold: float = 0.5
model_size: str = "base"
class MyProcessor:
"""Batch processor for Ray Data pipelines."""
def __init__(self, config: MyConfig, progress_actor=None):
self.config = config
self._model = None # Lazy loading
def _ensure_model_loaded(self):
"""Load model on first batch (lazy loading)."""
if self._model is None:
from sentence_transformers import SentenceTransformer
model_name = "all-MiniLM-L6-v2" if self.config.model_size == "base" else "all-mpnet-base-v2"
self._model = SentenceTransformer(model_name)
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
"""Process a batch of rows."""
self._ensure_model_loaded()
# Get text from batch
texts = batch["text"].fillna("").tolist()
# Generate embeddings
embeddings = self._model.encode(texts).tolist()
batch["my_embedding"] = embeddings
# Simple sentiment (replace with your logic)
batch["sentiment"] = ["positive" if len(t) > 50 else "neutral" for t in texts]
return batch
4. Wire Into Pipeline (pipeline.py)
from engine.plugins.extractors.pipeline import (
PipelineDefinition,
ResourceType,
StepDefinition,
build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyConfig, MyProcessor
def build_steps(extractor_request, container=None, base_steps=None, **kwargs):
"""Build the extraction pipeline."""
# Parse parameters from request
params = MyParams(**(extractor_request.extractor_config.parameters or {}))
# Create processor config
config = MyConfig(
threshold=params.threshold,
model_size=params.model_size,
)
# Define pipeline steps
pipeline = PipelineDefinition(
name=metadata["feature_extractor_name"],
version=metadata["version"],
steps=[
StepDefinition(
service_class=MyProcessor,
resource_type=ResourceType.CPU, # or GPU, API
config=config,
),
]
)
# Build Ray Data steps
steps = build_pipeline_steps(pipeline)
return {"steps": steps}
5. Add Real-time Endpoint (realtime.py) - Enterprise Only
from shared.plugins.inference.serve import BaseInferenceService
class InferenceService(BaseInferenceService):
"""Real-time HTTP inference endpoint."""
def __init__(self):
super().__init__()
self._model = None
async def __call__(self, inputs: dict, parameters: dict) -> dict:
"""Handle inference request."""
# Lazy load model
if self._model is None:
from sentence_transformers import SentenceTransformer
self._model = SentenceTransformer("all-MiniLM-L6-v2")
text = inputs.get("text", "")
embedding = self._model.encode([text])[0].tolist()
return {
"embedding": embedding,
"sentiment": "positive" if len(text) > 50 else "neutral"
}
6. Upload and Deploy
# Package your plugin
zip -r my_extractor.zip my_extractor/
# Get upload URL
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads" \
-H "Authorization: Bearer $API_KEY" \
-d '{"name": "my_extractor", "version": "1.0.0"}'
# Upload
curl -X PUT "$PRESIGNED_URL" --data-binary @my_extractor.zip
# Confirm
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads/$UPLOAD_ID/confirm" \
-H "Authorization: Bearer $API_KEY"
# Deploy real-time endpoint (Enterprise)
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy" \
-H "Authorization: Bearer $API_KEY"
The features list in manifest.py is how Mixpeek knows which Qdrant vector indexes to create for your plugin. Use the exact key names below — using the wrong names silently creates a collection with no vector indexes.
The wrong key names are a common pitfall. They produce no error at upload time but result in 0 documents being indexed.
# ✅ CORRECT — these exact key names are required
features = [
{
"feature_type": "embedding", # must be "embedding"
"feature_name": "my_embedding", # the vector index name
"embedding_dim": 768, # vector dimensionality
"distance_metric": "cosine", # "cosine", "euclid", or "dot"
},
]
# ❌ WRONG — intuitive but silently ignored (0 vectors created)
features = [
{
"type": "vector", # wrong key: use feature_type
"name": "my_embedding", # wrong key: use feature_name
"dimensions": 768, # wrong key: use embedding_dim
"distance": "cosine", # wrong key: use distance_metric
},
]
Multiple vectors are supported — add one entry per embedding your plugin produces:
features = [
{
"feature_type": "embedding",
"feature_name": "visual_embedding",
"embedding_dim": 768,
"distance_metric": "cosine",
},
{
"feature_type": "embedding",
"feature_name": "semantic_embedding",
"embedding_dim": 384,
"distance_metric": "cosine",
},
]
DataFrame Schema Reference
Your plugin’s __call__(batch: pd.DataFrame) receives a DataFrame with the following columns:
| Column | Type | Description |
|---|
data | str | For binary blobs: S3 URL (e.g. s3://bucket/org_.../file.jpg). For text blobs: raw string content. Never raw bytes. |
document_id | str | Unique document ID (e.g. doc_abc123) |
object_id | str | Source object ID in the bucket |
blob_id | str | Blob identifier within the object |
blob_property | str | Property name that produced this blob (matches your bucket schema key, e.g. "content") |
blob_type | str | Asset type: "image", "video", "audio", "text" |
collection_id | str | Collection that triggered processing |
feature_extractor_id | str | ID of the feature extractor |
mime_type | str | MIME type (e.g. image/jpeg, text/plain) |
size_bytes | int | File size in bytes (0 for inline text) |
input_mappings does NOT rename columns. The data column is always data regardless of any input_mappings configuration in manifest.py. input_mappings is metadata only — it does not transform the DataFrame. Always read from batch["data"].
Loading Assets from S3 URLs
The data column contains s3:// URLs, not raw bytes. Use the stable Plugin SDK to resolve them to local paths:
from shared.plugins import open_asset # stable public import
class MyProcessor:
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
for idx, row in batch.iterrows():
s3_url = row["data"] # e.g. "s3://mixpeek-server-dev/org_.../file.jpg"
# open_asset downloads and auto-cleans temp files
with open_asset(s3_url, suffix=".jpg") as local_path:
from PIL import Image
img = Image.open(local_path).convert("RGB")
# ... process img ...
return batch
For text blobs, the data column contains the raw string — no download needed:
class MyTextProcessor:
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
texts = batch["data"].fillna("").tolist()
batch["embedding"] = self._model.encode(texts).tolist()
return batch
If you need the raw (path, is_temp) tuple rather than a context manager, use download_asset:
from shared.plugins import download_asset
local_path, is_temp = download_asset(row["data"], suffix=".mp4")
try:
result = process(local_path)
finally:
if is_temp:
import os
os.unlink(local_path)
Backward compatibility: The internal path from shared.utilities.files import resolve_url_to_local_path still works but may change without notice. Prefer from shared.plugins import open_asset for future-proof plugins.
Using Built-in Services
Instead of implementing models from scratch, compose existing Mixpeek services:
Option 1: Import Batch Services Directly
from shared.inference.registry import get_batch_service
# Get the service class
E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")
SigLIPBatch = get_batch_service("google/siglip-base-patch16-224")
WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")
# Use in your pipeline
def build_steps(extractor_request, **kwargs):
from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs
config = InferenceConfigs(
text_column="text",
output_column_name="embedding",
)
pipeline = PipelineDefinition(
name="my_extractor",
version="v1",
steps=[
StepDefinition(
service_class=E5Batch,
resource_type=ResourceType.CPU,
config=config,
),
]
)
return {"steps": build_pipeline_steps(pipeline)}
Option 2: Call Real-time Services via HTTP
import httpx
async def embed_text(text: str) -> list[float]:
"""Call the E5 embedding service."""
async with httpx.AsyncClient() as client:
response = await client.post(
"http://localhost:8001/multilingual_e5_large_instruct/v1",
json={"inputs": {"texts": [text]}, "parameters": {}}
)
return response.json()["embeddings"][0]
Available Services
| Service ID | Type | Dimensions | Use Case |
|---|
intfloat/multilingual-e5-large-instruct | Embedding | 1024 | Multilingual text embeddings |
google/siglip-base-patch16-224 | Embedding | 512 | Image embeddings |
jinaai/jina-embeddings-v2-base-code | Embedding | 768 | Code embeddings |
BAAI/bge-reranker-v2-m3 | Reranker | - | Search result reordering |
openai/whisper-large-v3-turbo | Transcription | - | Audio to text |
Loading Models
Mixpeek provides three ways to load models in your plugins:
1. HuggingFace Models (Cluster-Cached)
Models are downloaded once and cached across all workers:
from engine.models import load_hf_model
from transformers import AutoConfig
class MyProcessor:
def __init__(self, config):
self.config = config
self._model = None
def _ensure_model_loaded(self):
if self._model is None:
# Load from cluster cache (zero-copy sharing)
cached = load_hf_model(
hf_model_id="intfloat/multilingual-e5-large-instruct",
model_class="AutoModel",
tokenizer_class="AutoTokenizer",
torch_dtype="float16",
)
# Instantiate model from cached state_dict
from transformers import AutoModel, AutoTokenizer
config = AutoConfig.from_dict(cached["config"])
self._model = AutoModel(config)
self._model.load_state_dict(cached["state_dict"])
self._tokenizer = AutoTokenizer.from_pretrained(
cached["tokenizer_config"]["tokenizer_dir"]
)
2. Custom S3 Models (Namespace-Scoped)
Use your own uploaded model weights:
from engine.models import load_namespace_model
import torch
class MyProcessor:
def __init__(self, config):
self._model = None
def _ensure_model_loaded(self):
if self._model is None:
# Load pre-uploaded weights from S3
weights = load_namespace_model("my-fine-tuned-bert_1_0_0")
# Initialize your model architecture
self._model = torch.nn.Sequential(
torch.nn.Linear(768, 256),
torch.nn.ReLU(),
torch.nn.Linear(256, 128),
)
self._model.load_state_dict(weights)
self._model.eval()
3. LazyModelMixin (Recommended for New Plugins)
Automatic lazy loading with cluster-wide caching:
from engine.models import LazyModelMixin
from shared.plugins.inference.batch import BaseBatchInferenceService
class MyEmbedder(LazyModelMixin, BaseBatchInferenceService):
"""Batch embedder with automatic lazy loading."""
# Configure model (class attributes)
model_id = "intfloat/multilingual-e5-large-instruct"
model_class = "AutoModel"
tokenizer_class = "AutoTokenizer"
torch_dtype = "float16"
def __init__(self, config, **kwargs):
super().__init__(**kwargs)
self.config = config
# Model NOT loaded here - loaded on first batch
def _process_batch(self, batch):
# Model automatically loaded on first call
model, tokenizer = self.get_model()
texts = batch["text"].tolist()
inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
outputs = model(**inputs)
batch["embedding"] = outputs.last_hidden_state.mean(dim=1).tolist()
return batch
4. @lazy_model Decorator (Quick Pattern)
For simpler cases:
from engine.models import lazy_model
class MyProcessor:
def __init__(self, config):
self.config = config
self._model = None
self._tokenizer = None
@lazy_model(
model_id="intfloat/multilingual-e5-large-instruct",
model_class="AutoModel",
tokenizer_class="AutoTokenizer",
)
def __call__(self, batch):
# self._model and self._tokenizer are automatically available
texts = batch["text"].tolist()
inputs = self._tokenizer(texts, return_tensors="pt", padding=True)
outputs = self._model(**inputs)
batch["embedding"] = outputs.last_hidden_state.mean(dim=1).tolist()
return batch
Pipeline Step Configuration
Resource Types
| Type | Description | Use Case |
|---|
ResourceType.CPU | CPU-only workers | Text processing, lightweight models |
ResourceType.GPU | GPU-allocated workers | Large models, image/video |
ResourceType.API | External API calls | OpenAI, Vertex AI, Anthropic |
Row Conditions
Filter which rows your step processes:
from engine.plugins.extractors.pipeline import RowCondition
StepDefinition(
service_class=ImageProcessor,
resource_type=ResourceType.GPU,
condition=RowCondition.IS_IMAGE, # Only process images
)
| Condition | Matches |
|---|
RowCondition.IS_TEXT | text/* MIME types |
RowCondition.IS_IMAGE | image/* MIME types |
RowCondition.IS_VIDEO | video/* MIME types |
RowCondition.IS_AUDIO | audio/* MIME types |
RowCondition.IS_PDF | application/pdf |
RowCondition.ALWAYS | All rows (default) |
Conditional Steps
Enable/disable steps based on parameters:
pipeline = PipelineDefinition(
name="my_extractor",
version="v1",
steps=[
StepDefinition(
service_class=TextChunker,
resource_type=ResourceType.CPU,
enabled=params.enable_chunking, # Conditional
config=chunker_config,
),
StepDefinition(
service_class=E5Batch,
resource_type=ResourceType.CPU,
config=embedding_config,
),
]
)
Builtin Plugin Development
This section is for Mixpeek internal developers creating new builtin plugins.
Directory Structure
engine/plugins/builtin/my_extractor/v1/
├── __init__.py
├── definition.py # Single source of truth
├── pipeline.py # Batch processing pipeline
├── realtime.py # Optional: Ray Serve endpoint
└── processors/
├── __init__.py
└── core.py # Processing logic
definition.py (Single Source of Truth)
"""My extractor plugin definition."""
from enum import IntEnum
from typing import List, Literal
from pydantic import BaseModel, Field
from shared.billing.models import CostRate, CostUnit
from shared.collection.features.extractors.models import (
CostsInfo,
FeatureExtractorModel,
)
from shared.collection.features.vectors.models import (
VectorIndex,
VectorIndexDefinition,
VectorType,
)
# =============================================================================
# COST CONFIGURATION
# =============================================================================
class MyExtractorCosts(IntEnum):
PER_1K_TOKENS = 1
TIER = 1
TIER_LABEL = "SIMPLE"
RATES: List[CostRate] = [
CostRate(
unit=CostUnit.TOKENS_1K,
credits_per_unit=MyExtractorCosts.PER_1K_TOKENS,
description="Per 1K tokens processed",
),
]
# =============================================================================
# PARAMETER SCHEMA
# =============================================================================
class MyExtractorParams(BaseModel):
extractor_type: Literal["my_extractor"] = "my_extractor"
threshold: float = Field(default=0.5, ge=0, le=1)
enable_feature_x: bool = Field(default=True)
# =============================================================================
# INPUT/OUTPUT SCHEMAS
# =============================================================================
class MyExtractorInput(BaseModel):
text: str = Field(..., min_length=1)
class MyExtractorOutput(BaseModel):
embedding: List[float] = Field(..., min_length=1024, max_length=1024)
# =============================================================================
# VECTOR INDEX CONFIGURATION
# =============================================================================
VECTOR_INDEXES = [
VectorIndexDefinition(
name="my_embedding",
description="Dense embedding vector",
type="single",
index=VectorIndex(
name="my_extractor_v1_embedding",
description="Dense vector embedding",
dimensions=1024,
type=VectorType.DENSE,
distance="Cosine",
inference_service_id="intfloat/multilingual-e5-large-instruct",
),
),
]
# =============================================================================
# PLUGIN DEFINITION
# =============================================================================
definition = FeatureExtractorModel(
feature_extractor_name="my_extractor",
version="v1",
description="My custom extractor for specialized processing",
icon="wand-2",
input_schema=MyExtractorInput,
output_schema=MyExtractorOutput,
parameter_schema=MyExtractorParams,
required_vector_indexes=VECTOR_INDEXES,
costs=CostsInfo(tier=TIER, tier_label=TIER_LABEL, rates=RATES),
)
metadata = {
"name": "my_extractor",
"version": "v1",
"description": definition.description,
}
Register in Plugin Registry
Add to engine/plugins/registry.py:
BUILTIN_PLUGINS = {
# ... existing plugins
"my_extractor_v1": {
"name": "my_extractor",
"version": "v1",
"module_path": "engine.plugins.builtin.my_extractor.v1",
},
}
Security Requirements
Custom plugins are scanned before deployment. Code violating these rules is rejected.
Allowed
| Category | Libraries |
|---|
| Data | numpy, pandas, polars, pyarrow |
| ML/AI | torch, transformers, sentence_transformers, onnxruntime |
| Image | PIL, cv2, imageio |
| Audio | librosa, soundfile, ffmpeg-python |
| HTTP | requests, httpx, aiohttp |
| Utils | os, json, re, typing, dataclasses, pydantic, logging |
Forbidden
| Category | Blocked | Reason |
|---|
| Execution | subprocess, os.system, os.popen, os.exec*, eval, exec | Shell/code execution |
| System | ctypes, socket, multiprocessing | Low-level access |
| Builtins | open, setattr, delattr, __import__ | File/attribute mutation |
import os is allowed. Only specific dangerous functions (os.system, os.popen, os.exec*, os.spawn*) are blocked. Safe uses like os.environ["KEY"] and os.path.join(...) work fine.
Allowed Read-Only Builtins
getattr(), hasattr(), and dir() are allowed in custom plugins. These are read-only operations commonly used by ML libraries and configuration patterns:
# All of these are fine in custom plugins
model_name = getattr(config, "model_name", "default")
if hasattr(self, "_model"):
self._model.eval()
# os.environ is allowed for device/runtime configuration
import os
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"
File I/O in Plugins
Direct open() calls in your plugin .py files are blocked by the security scanner.
However, library-internal file I/O is allowed. The scanner only analyzes your plugin’s Python source code via AST inspection — it does not scan compiled C extensions or library internals. This means:
| Pattern | Allowed? | Why |
|---|
onnxruntime.InferenceSession("model.onnx") | Yes | C++ internal file I/O |
transformers.AutoModel.from_pretrained(...) | Yes | Library-internal download + read |
torch.load("weights.pt") | Yes | Library-internal file I/O |
open("config.json", "r") in your .py | No | Direct open() in plugin code |
pd.read_csv("data.csv") | Yes | Library-internal file I/O |
For reading configuration files, embed your config as Python dicts/dataclasses in your plugin code, or use json.loads() on a string constant.
Custom plugins can access Mixpeek’s platform LLM services (Gemini, OpenAI, Anthropic) through the container.llm accessor. This gives you:
- Platform-managed API keys — no need to bring your own
- Response caching for repeated calls
- Cost tracking integrated with your org billing
- Error handling with typed failures
Quick Example
# pipeline.py
def build_steps(extractor_request, container=None, **kwargs):
config = {
"llm_service": container.llm, # Pass to your processor
"threshold": 0.8,
}
pipeline = PipelineDefinition(
name="brand_compliance",
version="v1",
steps=[
StepDefinition(
service_class=BrandComplianceProcessor,
resource_type=ResourceType.API,
config=config,
),
]
)
return {"steps": build_pipeline_steps(pipeline)}
# processors/compliance.py
import asyncio
class BrandComplianceProcessor:
def __init__(self, config, **kwargs):
self._llm = config.get("llm_service")
self.threshold = config.get("threshold", 0.8)
def __call__(self, batch):
loop = asyncio.new_event_loop()
for idx, row in batch.iterrows():
result = loop.run_until_complete(
self._llm.generate(
instruction="Evaluate this image for brand compliance",
images=[row["image_url"]],
provider="google",
model="gemini-2.5-flash",
schema={
"type": "object",
"properties": {
"compliant": {"type": "boolean"},
"violations": {"type": "array", "items": {"type": "string"}},
"score": {"type": "number"}
}
}
)
)
batch.at[idx, "compliance_result"] = result.data
return batch
Available LLM Methods
| Method | Description |
|---|
container.llm.generate(instruction, text, images, provider, model, schema) | Convenience method for text/image generation |
container.llm.process(LLMRequest(...)) | Full control with LLMRequest object |
container.llm.service | Direct access to CentralLLMService instance |
Supported Providers
| Provider | Models | Best For |
|---|
google | gemini-2.5-flash, gemini-2.5-pro | Fast multimodal, structured output |
openai | gpt-4o, gpt-4o-mini | Text generation, function calling |
anthropic | claude-sonnet-4-20250514 | Complex reasoning, long context |
Plugin Parameters and Secrets
How Parameters Flow
Plugin parameters are configured when enabling the plugin for a namespace. They flow through the pipeline as:
Enable Plugin API → MongoDB namespace.feature_extractors[].params → extractor_request.params → build_steps()
# When enabling a plugin via API:
curl -X POST "https://api.mixpeek.com/v1/organizations/$ORG_ID/plugins/$PLUGIN_ID/enable" \
-H "Authorization: Bearer $API_KEY" \
-d '{
"params": {
"model_name": "gemini-2.5-flash",
"confidence_threshold": 0.8,
"api_key": "your-external-api-key"
}
}'
# In your pipeline.py, read params:
def build_steps(extractor_request, container=None, **kwargs):
params = extractor_request.extractor_config.parameters or {}
model_name = params.get("model_name", "gemini-2.5-flash")
threshold = params.get("confidence_threshold", 0.8)
Managing Secrets
Custom plugins can access encrypted organization secrets via container.secrets. Secrets are stored encrypted at rest using the Organization Secrets API and decrypted on demand at plugin runtime.
# In your pipeline.py:
def build_steps(extractor_request, container=None, **kwargs):
processor = MyProcessor(
config={},
secrets=container.secrets, # Pass secrets accessor to your processor
)
return {"steps": [processor], "prepare": lambda ds: ds}
class MyProcessor:
def __init__(self, config, secrets=None):
self._secrets = secrets
async def process(self, data):
# Read a secret at runtime (decrypted automatically)
api_key = await self._secrets.get("EXTERNAL_API_KEY")
# Use for external service calls
| Method | Description |
|---|
await container.secrets.get("KEY") | Get a decrypted secret value by name |
await container.secrets.list() | List available secret names (not values) |
For platform LLM services (Gemini, OpenAI, Anthropic), use container.llm instead — it handles API keys automatically. Use container.secrets for your own external API keys, tokens, and credentials.
You can also pass non-secret configuration as plugin parameters:
# Plugin parameters (stored in MongoDB, not encrypted)
{
"params": {
"webhook_url": "https://your-service.com/callback",
"threshold": 0.8
}
}
Archive Size and Model Loading
Upload Limits
| Limit | Value |
|---|
| Archive upload size | 500 MB |
| Extracted archive size | 500 MB |
| Max files in archive | 1,000 |
Loading Large Models
Most ML models exceed 500 MB. The recommended pattern is lazy downloading from HuggingFace Hub at init time:
class MyProcessor:
def __init__(self, config, **kwargs):
self._model = None
def _ensure_model_loaded(self):
if self._model is None:
# Downloaded once, cached on disk for subsequent calls
from transformers import AutoModel
self._model = AutoModel.from_pretrained(
"google/siglip-base-patch16-224",
cache_dir="/tmp/hf_cache"
)
def __call__(self, batch):
self._ensure_model_loaded()
# ... process batch
For custom model weights, use the namespace model API:
from shared.models.loader import load_namespace_model
weights = load_namespace_model("my-fine-tuned-model_1_0_0")
First cold start will download models (~1-2 min depending on size). Subsequent starts use the HuggingFace cache. Use LazyModelMixin for automatic cluster-wide caching.
Plugins vs Retrievers: Architecture
Understanding the boundary between plugins and retrievers is key for building retrieval-augmented pipelines.
Plugin Responsibility
Plugins handle data transformation: embedding, classification, extraction.
Raw Data → Plugin (batch/realtime) → Embeddings + Structured Output → Qdrant
Retriever Responsibility
Retrievers handle search and retrieval context: query processing, vector search, reranking, enrichment.
Query → Retriever Stages → [semantic_search → rerank → agentic_enrich] → Results
How They Interact
┌───────────────────────────────────────────────────┐
│ Ingestion │
│ Object → Plugin pipeline.py → Embeddings → Qdrant│
└───────────────────────────────────────────────────┘
┌───────────────────────────────────────────────────┐
│ Retrieval │
│ Query → Plugin realtime.py (embed query) │
│ → Retriever semantic_search (Qdrant search) │
│ → Retriever rerank (sort results) │
│ → Retriever agentic_enrich (LLM analysis) │
│ → Results with scores │
└───────────────────────────────────────────────────┘
What realtime.py Receives
The run_inference(inputs, parameters) method receives:
inputs["texts"] or inputs["text"] — text to embed
inputs["vector_index"] — which vector index to use
parameters — inference options
It does not receive retrieval context or search results. If your pipeline needs retrieval context (e.g., “compare this image against 5 nearest references”), use retriever stages:
- Plugin
pipeline.py: Embed reference images during ingestion
- Plugin
realtime.py: Embed the query image at search time
- Retriever
semantic_search stage: Find nearest references in Qdrant
- Retriever
agentic_enrich or code_execution stage: Run VLM comparison with retrieved references
For retrieval-dependent analysis (like brand compliance with reference comparison), the VLM evaluation belongs in a retriever stage, not in the plugin’s realtime.py.
Example: Image Classification Plugin
Here’s a complete image processing plugin (addresses the common need for non-text templates):
# manifest.py
feature_extractor_name = "image_classifier"
version = "1.0.0"
description = "Image classification with structured labels"
dependencies = ["torch", "torchvision", "pillow"]
features = [
{
"feature_name": "image_embedding",
"feature_type": "embedding",
"embedding_dim": 512,
"distance_metric": "cosine",
}
]
feature_uri = "mixpeek://image_classifier@1.0.0/image_embedding"
output_schema = {
"image_embedding": {"type": "array", "items": {"type": "number"}},
"labels": {"type": "array", "items": {"type": "string"}},
"confidence": {"type": "number"},
}
# processors/classifier.py
import io
import pandas as pd
import requests
class ImageClassifierProcessor:
def __init__(self, config=None, **kwargs):
self._model = None
self._preprocess = None
self.config = config or {}
def _ensure_model_loaded(self):
if self._model is None:
import torch
from torchvision import models, transforms
self._model = models.mobilenet_v2(pretrained=True)
self._model.eval()
self._preprocess = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
),
])
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
self._ensure_model_loaded()
import torch
from PIL import Image
embeddings = []
labels_list = []
for _, row in batch.iterrows():
image_url = row.get("source_url") or row.get("image_url", "")
try:
resp = requests.get(image_url, timeout=30)
img = Image.open(io.BytesIO(resp.content)).convert("RGB")
tensor = self._preprocess(img).unsqueeze(0)
with torch.no_grad():
output = self._model(tensor)
# Use penultimate layer as embedding
embedding = output.squeeze().tolist()[:512]
top_classes = torch.topk(output, 5).indices[0].tolist()
embeddings.append(embedding)
labels_list.append([str(c) for c in top_classes])
except Exception:
embeddings.append([0.0] * 512)
labels_list.append([])
batch["image_embedding"] = embeddings
batch["labels"] = labels_list
return batch
# pipeline.py
from engine.plugins.extractors.pipeline import (
PipelineDefinition, ResourceType, RowCondition,
StepDefinition, build_pipeline_steps,
)
from .processors.classifier import ImageClassifierProcessor
def build_steps(extractor_request, container=None, **kwargs):
params = (extractor_request.extractor_config.parameters or {})
pipeline = PipelineDefinition(
name="image_classifier",
version="1.0.0",
steps=[
StepDefinition(
service_class=ImageClassifierProcessor,
resource_type=ResourceType.GPU,
condition=RowCondition.IS_IMAGE,
config={"threshold": params.get("threshold", 0.5)},
),
]
)
return {"steps": build_pipeline_steps(pipeline)}
Example: VLM Analyzer Plugin
A plugin that uses platform LLM services for vision-language analysis:
# manifest.py
feature_extractor_name = "vlm_analyzer"
version = "1.0.0"
description = "Vision-language analysis using Gemini"
dependencies = ["pillow", "httpx"]
features = [
{
"feature_name": "vlm_embedding",
"feature_type": "embedding",
"embedding_dim": 384,
"distance_metric": "cosine",
}
]
output_schema = {
"vlm_embedding": {"type": "array", "items": {"type": "number"}},
"analysis": {"type": "object"},
"tags": {"type": "array", "items": {"type": "string"}},
}
# processors/analyzer.py
import asyncio
import hashlib
import pandas as pd
class VLMAnalyzerProcessor:
def __init__(self, config=None, **kwargs):
self.config = config or {}
self._llm = config.get("llm_service")
self.analysis_prompt = config.get(
"prompt",
"Describe this image. Return tags and a detailed description."
)
def _text_to_embedding(self, text, dim=384):
"""Deterministic text-to-embedding for analysis results."""
h = hashlib.sha256(text.encode()).digest()
return [((b % 200) - 100) / 100.0 for b in (h * (dim // 32 + 1))[:dim]]
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
loop = asyncio.new_event_loop()
embeddings = []
analyses = []
tags_list = []
for _, row in batch.iterrows():
image_url = row.get("source_url", "")
try:
result = loop.run_until_complete(
self._llm.generate(
instruction=self.analysis_prompt,
images=[image_url],
provider="google",
model="gemini-2.5-flash",
schema={
"type": "object",
"properties": {
"description": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
}
}
)
)
analysis = result.data
analyses.append(analysis)
tags_list.append(analysis.get("tags", []))
embeddings.append(
self._text_to_embedding(analysis.get("description", ""))
)
except Exception:
analyses.append({})
tags_list.append([])
embeddings.append([0.0] * 384)
batch["vlm_embedding"] = embeddings
batch["analysis"] = analyses
batch["tags"] = tags_list
return batch
# pipeline.py
from engine.plugins.extractors.pipeline import (
PipelineDefinition, ResourceType, RowCondition,
StepDefinition, build_pipeline_steps,
)
from .processors.analyzer import VLMAnalyzerProcessor
def build_steps(extractor_request, container=None, **kwargs):
params = (extractor_request.extractor_config.parameters or {})
config = {
"llm_service": container.llm,
"prompt": params.get("prompt", "Describe this image in detail."),
}
pipeline = PipelineDefinition(
name="vlm_analyzer",
version="1.0.0",
steps=[
StepDefinition(
service_class=VLMAnalyzerProcessor,
resource_type=ResourceType.API,
condition=RowCondition.IS_IMAGE,
config=config,
),
]
)
return {"steps": build_pipeline_steps(pipeline)}
Local Testing
Unit Testing Your Plugin
Test your processors locally without the full Mixpeek stack:
# tests/test_my_processor.py
import pandas as pd
from my_plugin.processors.core import MyProcessor
def test_processor_basic():
processor = MyProcessor(config={"threshold": 0.5})
batch = pd.DataFrame({
"text": ["Hello world", "Test input"],
"document_id": ["doc_1", "doc_2"],
})
result = processor(batch)
assert "my_embedding" in result.columns
assert len(result) == 2
assert all(len(e) == 384 for e in result["my_embedding"])
Testing with Mock LLM Service
# tests/test_with_llm.py
from unittest.mock import AsyncMock, MagicMock
def test_vlm_processor():
mock_llm = MagicMock()
mock_llm.generate = AsyncMock(return_value=MagicMock(
data={"description": "A red car", "tags": ["car", "red", "vehicle"]}
))
processor = VLMAnalyzerProcessor(config={
"llm_service": mock_llm,
"prompt": "Describe this image",
})
batch = pd.DataFrame({
"source_url": ["https://example.com/car.jpg"],
})
result = processor(batch)
assert result["tags"].iloc[0] == ["car", "red", "vehicle"]
mock_llm.generate.assert_called_once()
Testing build_steps
# tests/test_pipeline.py
from unittest.mock import MagicMock
from my_plugin.pipeline import build_steps
def test_build_steps_returns_steps():
mock_request = MagicMock()
mock_request.extractor_config.parameters = {"threshold": 0.8}
mock_container = MagicMock()
result = build_steps(mock_request, container=mock_container)
assert "steps" in result
assert len(result["steps"]) > 0
Running the Security Scanner Locally
Before uploading, validate your plugin passes the security scan:
# test_security.py
from api.namespaces.plugins.security.scanner import PluginSecurityScanner
scanner = PluginSecurityScanner(strict_mode=False)
result = scanner.scan_directory("my_plugin/")
if not result.passed:
for v in result.violations:
print(f"{v.filename}:{v.line}: [{v.severity}] {v.message}")
else:
print("Security scan passed!")
Deployment Lifecycle
Deploy Real-time Endpoint
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy" \
-H "Authorization: Bearer $API_KEY"
Check Status
curl "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/status"
| Status | Description |
|---|
QUEUED | Waiting in deployment queue |
PENDING | Deployment triggered |
IN_PROGRESS | Blue-green rollout in progress |
DEPLOYED | Ready for real-time inference |
FAILED | Check error field |
NOT_DEPLOYED | Batch-only mode |
Undeploy
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/undeploy"
Delete
curl -X DELETE "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID"
Example: Multi-Stage Pipeline
Here’s a complete example combining transcription and embedding:
# pipeline.py
from shared.inference.registry import get_batch_service
from engine.plugins.extractors.pipeline import (
PipelineDefinition,
ResourceType,
RowCondition,
StepDefinition,
build_pipeline_steps,
)
def build_steps(extractor_request, **kwargs):
params = MyParams(**(extractor_request.extractor_config.parameters or {}))
# Get built-in services
WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")
E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")
pipeline = PipelineDefinition(
name="audio_to_embedding",
version="v1",
steps=[
# Step 1: Transcribe audio
StepDefinition(
service_class=WhisperBatch,
resource_type=ResourceType.API,
condition=RowCondition.IS_AUDIO,
config=WhisperConfig(output_column="transcription"),
),
# Step 2: Embed transcription
StepDefinition(
service_class=E5Batch,
resource_type=ResourceType.CPU,
config=E5Config(
text_column="transcription",
output_column_name="embedding",
),
),
]
)
return {"steps": build_pipeline_steps(pipeline)}
Troubleshooting
Plugin validation failed
Check the validation_errors array. Common issues:
- Using
setattr() or delattr() — use class attributes or constructor assignment instead
- Importing
subprocess — use requests or httpx for HTTP
- Using
open() directly — library-internal file I/O (torch.load, transformers, etc.) is allowed
- Using
eval() or exec() — use json.loads() for parsing, dataclasses for config
import os and getattr() / hasattr() are all allowed. Only the dangerous functions (os.system, os.popen, os.exec*) are blocked.
Task shows COMPLETED but 0 documents were written
This is the most confusing failure mode. The task API returns COMPLETED even when the Qdrant upsert fails for every row — for example, because your plugin declared new vector names that weren’t registered in the namespace’s Qdrant schema.
Symptoms:
Task status: COMPLETED
Documents found: 0
Most common cause — wrong features format in manifest.py:
If you used the wrong key names (type/name/dimensions instead of feature_type/feature_name/embedding_dim), the collection is created with vector_indexes: [] in MongoDB. The engine then tries to write embeddings to vector names that don’t exist in Qdrant and fails silently.
Fix: Correct the features format (see Manifest Features Format) and recreate the collection.
Diagnosis steps:
- Check that
vector_indexes is non-empty on your collection: GET /v1/collections/{collection_id}
- If
vector_indexes is empty, delete and recreate the collection with corrected manifest.py
- Check Ray worker logs for Qdrant upsert errors (
Not existing vector name)
Retriever creation fails with collection_identifiers error
The retriever create endpoint requires collection_identifiers, not collection_ids. The error response hints at this:
{"hint": "Add collection_identifiers to your retriever"}
Use:
{
"collection_identifiers": ["col_abc123"]
}
Not "collection_ids" (used by other endpoints).
Model loading is slow
- Use
LazyModelMixin for automatic cluster caching
- Pre-deploy models via the Model Registry API
- For HuggingFace models, set
cache_dir="/tmp/hf_cache" to reuse across restarts
- Check
cached: true in deployment response
Archive too large
The upload limit is 500 MB. For plugins with large models:
- Don’t bundle model weights in the archive
- Download from HuggingFace Hub at init time using lazy loading
- Use
load_namespace_model() for custom weights uploaded via the Models API
Batch processing fails
- Ensure
__call__() returns a DataFrame with the same number of rows
- Reset DataFrame index:
batch = batch.reset_index(drop=True)
- Handle None/empty values:
batch["col"].fillna("")
- Wrap async LLM calls with
asyncio.new_event_loop().run_until_complete()
Real-time endpoint not responding
- Check deployment status via
/status endpoint
- Verify plugin is
DEPLOYED not NOT_DEPLOYED
- Check Ray Serve logs for errors
Can’t access retrieval context in realtime.py
realtime.py handles embedding, not retrieval. If your pipeline needs to compare against stored references, configure retriever stages (semantic_search → rerank → agentic_enrich) to handle the retrieval and comparison logic. See Plugins vs Retrievers above.