Custom plugins are an Enterprise feature that requires dedicated infrastructure. Contact us to enable custom plugins for your namespace.
Overview
Custom plugins let you create your own feature extractors that run on Mixpeek’s infrastructure. The framework handles the complexity of batching, concurrency, GPU allocation, and scaling; you write only your processing logic.
Quick Start
# Install the SDK
pip install mixpeek
# Create a new plugin
mixpeek plugin init my_extractor --category text
# Test locally
cd my_extractor && mixpeek plugin test
# Publish to your namespace
mixpeek plugin publish --namespace ns_your_namespace
Plugins must be published to your namespace before they can be used in collections.
For comprehensive CLI documentation including all commands, options, and examples, see the Mixpeek CLI Reference.
Plugin Structure
Every plugin has 4 files:
my_extractor/
├── manifest.py # Schemas and metadata
├── pipeline.py # Processing pipeline (declarative)
├── realtime.py # HTTP endpoint (optional)
└── processors/
└── core.py # Your processing logic
1. manifest.py - Define Your Schemas
from pydantic import BaseModel, Field
class MyExtractorInput(BaseModel):
text: str = Field(..., description="Input text")
class MyExtractorOutput(BaseModel):
result: str
confidence: float = Field(..., ge=0.0, le=1.0)
class MyExtractorParams(BaseModel):
"""Parameters users can configure when creating a collection."""
threshold: float = Field(default=0.5, ge=0.0, le=1.0)
text_column: str = Field(default="text")
metadata = {
"name": "my_extractor",
"version": "v1",
"description": "My custom extractor",
"category": "text",
}
input_schema = MyExtractorInput
output_schema = MyExtractorOutput
parameter_schema = MyExtractorParams
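Because these are plain Pydantic models, you can sanity-check them locally before publishing. A minimal sketch, assuming you run it from inside the my_extractor directory so manifest is importable:
from manifest import MyExtractorOutput, MyExtractorParams

# Defaults fill in anything the user omits.
params = MyExtractorParams(threshold=0.7)
print(params)  # threshold=0.7 with the default text_column='text'

# Out-of-range values are rejected by the ge/le constraints declared above.
try:
    MyExtractorParams(threshold=1.5)
except Exception as err:
    print("rejected:", err)

# The output schema can be exercised the same way.
print(MyExtractorOutput(result="ok", confidence=0.95))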
2. processors/core.py - Write Your Logic
from dataclasses import dataclass
from typing import Tuple
import pandas as pd
@dataclass
class MyExtractorConfig:
threshold: float = 0.5
text_column: str = "text"
class MyExtractorProcessor:
"""Called repeatedly with batches of data."""
def __init__(self, config: MyExtractorConfig, progress_actor=None):
self.config = config
self._model = None # Lazy load
def _load_model(self):
if self._model is None:
# Load your model here (e.g., transformers, torch, etc.)
self._model = "loaded"
def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
"""Process a batch. Framework handles batch sizing."""
self._load_model()
results = []
for text in batch[self.config.text_column].fillna(""):
result, confidence = self._process_single(text)
results.append({"result": result, "confidence": confidence})
batch["output"] = results
return batch
def _process_single(self, text: str) -> Tuple[str, float]:
# Your processing logic here
return f"Processed: {text[:50]}", 0.95
3. pipeline.py - Wire It Together
from typing import Any, Dict, Optional
from engine.plugins.extractors.pipeline import (
PipelineDefinition, ResourceType, RowCondition, StepDefinition, build_pipeline_steps,
)
from shared.collection.features.extractors.models import ExtractorDefinition
from .manifest import MyExtractorParams, metadata
from .processors.core import MyExtractorConfig, MyExtractorProcessor
def build_steps(
extractor_request: Any,
container: Optional[Any] = None,
base_steps: Optional[list] = None,
dataset_size: Optional[int] = None,
content_flags: Optional[dict] = None,
) -> Dict[str, Any]:
# Get user-configured parameters
params = MyExtractorParams(**(extractor_request.extractor_config.parameters or {}))
# Define your pipeline - framework handles batching, concurrency, scaling
steps = [
StepDefinition(
service_class=MyExtractorProcessor,
resource_type=ResourceType.CPU, # or GPU, API
config=MyExtractorConfig(threshold=params.threshold, text_column=params.text_column),
condition=RowCondition.IS_TEXT, # Only process text rows
),
]
pipeline = PipelineDefinition(name="my_extractor", version="v1", steps=steps)
pipeline_steps = build_pipeline_steps(pipeline, dataset_size=dataset_size)
return {
"steps": (base_steps or []) + pipeline_steps,
"prepare": lambda ds: ds,
}
extractor_definition = ExtractorDefinition(
name=metadata["name"],
version=metadata["version"],
extractor_function_path=build_steps,
requires_file_resolution=False,
)
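The parameters users set at collection-creation time arrive on extractor_request.extractor_config.parameters. A rough sketch of how build_steps sees them, using a stand-in request object rather than the real framework type; it only runs in an environment where the engine package is installed and the plugin directory is importable as a package:
from types import SimpleNamespace

from my_extractor.pipeline import build_steps  # assumes the plugin's parent directory is on the path

# Stand-in for the request the framework passes in; only the fields build_steps reads.
fake_request = SimpleNamespace(
    extractor_config=SimpleNamespace(parameters={"threshold": 0.7, "text_column": "body"})
)

result = build_steps(fake_request, dataset_size=1_000)
print(len(result["steps"]))  # steps generated from the single StepDefinition above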
4. realtime.py - HTTP Endpoint (Optional)
from typing import Any, Dict
from shared.plugins.inference.serve import BaseInferenceService, ServeDeploymentConfig
from .processors.core import MyExtractorConfig, MyExtractorProcessor
class MyExtractorRealtimeService(BaseInferenceService):
serve_config = ServeDeploymentConfig(enabled=True, num_replicas=1)
def __init__(self):
super().__init__()
self._processor = None
async def _process_single(self, inputs: Dict[str, Any], parameters: Dict[str, Any]) -> Dict[str, Any]:
if self._processor is None:
self._processor = MyExtractorProcessor(MyExtractorConfig())
text = inputs.get("text", "")
result, confidence = self._processor._process_single(text)
return {"result": result, "confidence": confidence}
__all__ = ["MyExtractorRealtimeService"]
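Because the realtime service wraps the same processor, you can smoke-test the single-item path with asyncio. A sketch under the assumption that BaseInferenceService can be constructed outside the serving runtime, which may not hold in every environment:
import asyncio

from my_extractor.realtime import MyExtractorRealtimeService

async def main():
    service = MyExtractorRealtimeService()
    response = await service._process_single({"text": "hello"}, parameters={})
    print(response)  # {'result': 'Processed: hello', 'confidence': 0.95}

asyncio.run(main())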
Resource Types
Tell the framework what resources your processor needs:
| ResourceType | Use For | Example |
|---|---|---|
| CPU | Embeddings, classification, text processing | E5, MiniLM, sentiment |
| GPU | Local models requiring GPU | Whisper local, CLIP |
| API | External API calls | OpenAI, Vertex AI |
The framework automatically configures actors, batch sizes, and concurrency based on your choice.
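For example, moving a step onto GPU actors is only a change of resource_type; the framework adjusts batching and concurrency for you. A sketch that would sit inside pipeline.py (reusing its imports), pretending MyExtractorProcessor loaded a GPU-backed model:
gpu_step = StepDefinition(
    service_class=MyExtractorProcessor,       # imagine _load_model pulls a torch model onto the GPU
    resource_type=ResourceType.GPU,           # request GPU actors instead of CPU
    config=MyExtractorConfig(threshold=0.5),
    # condition defaults to RowCondition.ALWAYS, so every row is processed
)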
Row Conditions
Filter which rows your processor handles:
RowCondition.IS_TEXT # Text content only
RowCondition.IS_IMAGE # Image content only
RowCondition.IS_VIDEO # Video content only
RowCondition.IS_AUDIO # Audio content only
RowCondition.IS_PDF # PDF documents
RowCondition.IS_VIDEO_OR_AUDIO # Video or audio
RowCondition.ALWAYS # All rows (default)
Using Built-in Services
Compose existing services into your pipeline instead of writing everything from scratch:
from engine.inference.openai.whisper_api.services import WhisperTranscriptionBatch
from engine.inference.intfloat.multilingual_e5_large_instruct.services import E5TextEmbeddingBatch
from shared.inference.openai.whisper_api.models import InferenceConfigs as WhisperConfig
from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs as E5Config
def build_steps(extractor_request, **kwargs):
steps = [
# Step 1: Transcribe with Whisper
StepDefinition(
service_class=WhisperTranscriptionBatch,
resource_type=ResourceType.API,
config=WhisperConfig(bytes_column="audio_bytes", output_column_name="transcription"),
condition=RowCondition.IS_VIDEO_OR_AUDIO,
),
# Step 2: Embed with E5
StepDefinition(
service_class=E5TextEmbeddingBatch,
resource_type=ResourceType.CPU,
config=E5Config(text_column="transcription", output_column_name="embedding"),
),
]
pipeline = PipelineDefinition(name="transcribe_and_embed", version="v1", steps=steps)
return {"steps": build_pipeline_steps(pipeline), "prepare": lambda ds: ds}
Available Services
| Category | Service | Import |
|---|---|---|
| Embeddings | E5 (1024-dim) | engine.inference.intfloat.multilingual_e5_large_instruct.services.E5TextEmbeddingBatch |
| Embeddings | MiniLM (384-dim) | engine.inference.sentence_transformers.minilm.services |
| Embeddings | CLIP | engine.inference.laion.clip_vit_l_14.services |
| Embeddings | SigLIP | engine.inference.google.siglip.services |
| Transcription | Whisper API | engine.inference.openai.whisper_api.services.WhisperTranscriptionBatch |
| Transcription | Whisper Local | engine.inference.openai.whisper_large_v3_turbo.services |
| Video | FFmpeg Chunking | engine.inference.ffmpeg.services.FFmpegParallelChunking |
| Face | Detection | engine.inference.insightface.scrfd.services.FaceDetectionBatch |
| Face | Embeddings | engine.inference.insightface.arcface.services.FaceEmbeddingBatch |
| Reranking | BGE Reranker | engine.inference.baai.bge_reranker_v2_m3.services |
| LLM | Vertex AI | engine.inference.google.vertex.services.VertexAIBatch |
Using Your Plugin
After publishing, use your plugin in collections:
from mixpeek import Mixpeek
client = Mixpeek(api_key="sk_...")
collection = client.collections.create(
collection_name="my_collection",
source={"type": "bucket", "bucket_ids": ["bkt_..."]},
feature_extractor={
"feature_extractor_name": "my_extractor", # Your plugin name
"version": "v1",
"parameters": {
"threshold": 0.7 # Your custom parameters
}
}
)
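Any field declared in MyExtractorParams can be passed under parameters; anything you omit falls back to the defaults from the manifest. For instance, to point the extractor at a different text column (values illustrative, reusing the client from above):
collection = client.collections.create(
    collection_name="my_collection",
    source={"type": "bucket", "bucket_ids": ["bkt_..."]},
    feature_extractor={
        "feature_extractor_name": "my_extractor",
        "version": "v1",
        "parameters": {
            "threshold": 0.7,
            "text_column": "body",  # must exist in your source data
        },
    },
)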
CLI Commands
| Command | Description |
|---|---|
| mixpeek plugin init <name> | Create new plugin |
| mixpeek plugin test | Validate and test locally |
| mixpeek plugin publish | Upload to namespace |
| mixpeek plugin list | List available plugins |
See the Mixpeek CLI Reference for detailed command options, environment variables, and troubleshooting.
Security
Plugins are scanned before deployment. The following are not allowed:
subprocess, os.system - System calls
eval, exec - Dynamic code execution
socket - Direct network access
ctypes - Low-level memory access
Next Steps