Custom plugins are an Enterprise feature that requires dedicated infrastructure. Contact us to enable custom plugins for your namespace.

Overview

Custom plugins let you create your own feature extractors that run on Mixpeek’s infrastructure. The framework handles all the operational complexity: batching, concurrency, GPU allocation, and scaling. You just write your processing logic.

Quick Start

# Install the SDK
pip install mixpeek

# Create a new plugin
mixpeek plugin init my_extractor --category text

# Test locally
cd my_extractor && mixpeek plugin test

# Publish to your namespace
mixpeek plugin publish --namespace ns_your_namespace
Plugins must be published to your namespace before they can be used in collections.
For comprehensive CLI documentation including all commands, options, and examples, see the Mixpeek CLI Reference.

Plugin Structure

Every plugin has four files:
my_extractor/
├── manifest.py      # Schemas and metadata
├── pipeline.py      # Processing pipeline (declarative)
├── realtime.py      # HTTP endpoint (optional)
└── processors/
    └── core.py      # Your processing logic

1. manifest.py - Define Your Schemas

from pydantic import BaseModel, Field

class MyExtractorInput(BaseModel):
    text: str = Field(..., description="Input text")

class MyExtractorOutput(BaseModel):
    result: str
    confidence: float = Field(..., ge=0.0, le=1.0)

class MyExtractorParams(BaseModel):
    """Parameters users can configure when creating a collection."""
    threshold: float = Field(default=0.5, ge=0.0, le=1.0)
    text_column: str = Field(default="text")

metadata = {
    "name": "my_extractor",
    "version": "v1",
    "description": "My custom extractor",
    "category": "text",
}

input_schema = MyExtractorInput
output_schema = MyExtractorOutput
parameter_schema = MyExtractorParams
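
The schemas are plain Pydantic models, so you can sanity-check defaults and constraints locally before publishing. A minimal sketch, assuming the plugin package is importable as my_extractor:

from pydantic import ValidationError

from my_extractor.manifest import MyExtractorParams

# Defaults apply when a user omits a parameter at collection-creation time.
params = MyExtractorParams(threshold=0.7)
print(params.text_column)          # "text"

# Field constraints reject out-of-range values.
try:
    MyExtractorParams(threshold=1.5)
except ValidationError as exc:
    print(exc.errors()[0]["loc"])  # ('threshold',)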

2. processors/core.py - Write Your Logic

from dataclasses import dataclass
from typing import Tuple
import pandas as pd

@dataclass
class MyExtractorConfig:
    threshold: float = 0.5
    text_column: str = "text"

class MyExtractorProcessor:
    """Called repeatedly with batches of data."""

    def __init__(self, config: MyExtractorConfig, progress_actor=None):
        self.config = config
        self._model = None  # Lazy load

    def _load_model(self):
        if self._model is None:
            # Load your model here (e.g., transformers, torch, etc.)
            self._model = "loaded"

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Process a batch. Framework handles batch sizing."""
        self._load_model()

        results = []
        for text in batch[self.config.text_column].fillna(""):
            result, confidence = self._process_single(text)
            results.append({"result": result, "confidence": confidence})

        batch["output"] = results
        return batch

    def _process_single(self, text: str) -> Tuple[str, float]:
        # Your processing logic here
        return f"Processed: {text[:50]}", 0.95

3. pipeline.py - Wire It Together

from typing import Any, Dict, Optional
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, RowCondition, StepDefinition, build_pipeline_steps,
)
from shared.collection.features.extractors.models import ExtractorDefinition
from .manifest import MyExtractorParams, metadata
from .processors.core import MyExtractorConfig, MyExtractorProcessor

def build_steps(
    extractor_request: Any,
    container: Optional[Any] = None,
    base_steps: Optional[list] = None,
    dataset_size: Optional[int] = None,
    content_flags: Optional[dict] = None,
) -> Dict[str, Any]:

    # Get user-configured parameters
    params = MyExtractorParams(**(extractor_request.extractor_config.parameters or {}))

    # Define your pipeline - framework handles batching, concurrency, scaling
    steps = [
        StepDefinition(
            service_class=MyExtractorProcessor,
            resource_type=ResourceType.CPU,  # or GPU, API
            config=MyExtractorConfig(threshold=params.threshold, text_column=params.text_column),
            condition=RowCondition.IS_TEXT,  # Only process text rows
        ),
    ]

    pipeline = PipelineDefinition(name="my_extractor", version="v1", steps=steps)
    pipeline_steps = build_pipeline_steps(pipeline, dataset_size=dataset_size)

    return {
        "steps": (base_steps or []) + pipeline_steps,
        "prepare": lambda ds: ds,
    }

extractor_definition = ExtractorDefinition(
    name=metadata["name"],
    version=metadata["version"],
    extractor_function_path=build_steps,
    requires_file_resolution=False,
)

4. realtime.py - HTTP Endpoint (Optional)

from typing import Any, Dict
from shared.plugins.inference.serve import BaseInferenceService, ServeDeploymentConfig
from .processors.core import MyExtractorConfig, MyExtractorProcessor

class MyExtractorRealtimeService(BaseInferenceService):
    serve_config = ServeDeploymentConfig(enabled=True, num_replicas=1)

    def __init__(self):
        super().__init__()
        self._processor = None

    async def _process_single(self, inputs: Dict[str, Any], parameters: Dict[str, Any]) -> Dict[str, Any]:
        if self._processor is None:
            self._processor = MyExtractorProcessor(MyExtractorConfig())

        text = inputs.get("text", "")
        result, confidence = self._processor._process_single(text)
        return {"result": result, "confidence": confidence}

__all__ = ["MyExtractorRealtimeService"]

Resource Types

Tell the framework what resources your processor needs:
Resource Type | Use For                                      | Example
CPU           | Embeddings, classification, text processing | E5, MiniLM, sentiment
GPU           | Local models requiring GPU                   | Whisper local, CLIP
API           | External API calls                           | OpenAI, Vertex AI
The framework automatically configures actors, batch sizes, and concurrency based on your choice.
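
The step definition keeps the same shape regardless of resource type. A brief sketch, reusing the processor from earlier purely for illustration (a real GPU or API step would typically wrap a GPU model or an external client):

from engine.plugins.extractors.pipeline import ResourceType, StepDefinition
from my_extractor.processors.core import MyExtractorConfig, MyExtractorProcessor

# Only resource_type changes how the framework allocates actors,
# batch sizes, and concurrency for the step.
cpu_step = StepDefinition(service_class=MyExtractorProcessor,
                          resource_type=ResourceType.CPU,
                          config=MyExtractorConfig())

gpu_step = StepDefinition(service_class=MyExtractorProcessor,  # e.g. a torch/CLIP-style model
                          resource_type=ResourceType.GPU,
                          config=MyExtractorConfig())

api_step = StepDefinition(service_class=MyExtractorProcessor,  # e.g. a processor calling OpenAI
                          resource_type=ResourceType.API,
                          config=MyExtractorConfig())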

Row Conditions

Filter which rows your processor handles:
RowCondition.IS_TEXT       # Text content only
RowCondition.IS_IMAGE      # Image content only
RowCondition.IS_VIDEO      # Video content only
RowCondition.IS_AUDIO      # Audio content only
RowCondition.IS_PDF        # PDF documents
RowCondition.IS_VIDEO_OR_AUDIO  # Video or audio
RowCondition.ALWAYS        # All rows (default)
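
Because the condition is part of each StepDefinition, one pipeline can route different content types to different processors. A minimal sketch, where MyImageProcessor and MyImageConfig are hypothetical image-capable counterparts to the text processor above:

from engine.plugins.extractors.pipeline import ResourceType, RowCondition, StepDefinition
from my_extractor.processors.core import MyExtractorConfig, MyExtractorProcessor

steps = [
    StepDefinition(
        service_class=MyExtractorProcessor,
        resource_type=ResourceType.CPU,
        config=MyExtractorConfig(text_column="text"),
        condition=RowCondition.IS_TEXT,   # runs only on text rows
    ),
    StepDefinition(
        service_class=MyImageProcessor,   # hypothetical image processor
        resource_type=ResourceType.GPU,
        config=MyImageConfig(),           # hypothetical config
        condition=RowCondition.IS_IMAGE,  # runs only on image rows
    ),
]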

Using Built-in Services

Compose existing services into your pipeline instead of writing from scratch:
from engine.inference.openai.whisper_api.services import WhisperTranscriptionBatch
from engine.inference.intfloat.multilingual_e5_large_instruct.services import E5TextEmbeddingBatch
from shared.inference.openai.whisper_api.models import InferenceConfigs as WhisperConfig
from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs as E5Config

def build_steps(extractor_request, **kwargs):
    steps = [
        # Step 1: Transcribe with Whisper
        StepDefinition(
            service_class=WhisperTranscriptionBatch,
            resource_type=ResourceType.API,
            config=WhisperConfig(bytes_column="audio_bytes", output_column_name="transcription"),
            condition=RowCondition.IS_VIDEO_OR_AUDIO,
        ),
        # Step 2: Embed with E5
        StepDefinition(
            service_class=E5TextEmbeddingBatch,
            resource_type=ResourceType.CPU,
            config=E5Config(text_column="transcription", output_column_name="embedding"),
        ),
    ]
    pipeline = PipelineDefinition(name="transcribe_and_embed", version="v1", steps=steps)
    return {"steps": build_pipeline_steps(pipeline), "prepare": lambda ds: ds}

Available Services

Category      | Service          | Import
Embeddings    | E5 (1024-dim)    | engine.inference.intfloat.multilingual_e5_large_instruct.services.E5TextEmbeddingBatch
Embeddings    | MiniLM (384-dim) | engine.inference.sentence_transformers.minilm.services
Embeddings    | CLIP             | engine.inference.laion.clip_vit_l_14.services
Embeddings    | SigLIP           | engine.inference.google.siglip.services
Transcription | Whisper API      | engine.inference.openai.whisper_api.services.WhisperTranscriptionBatch
Transcription | Whisper Local    | engine.inference.openai.whisper_large_v3_turbo.services
Video         | FFmpeg Chunking  | engine.inference.ffmpeg.services.FFmpegParallelChunking
Face          | Detection        | engine.inference.insightface.scrfd.services.FaceDetectionBatch
Face          | Embeddings       | engine.inference.insightface.arcface.services.FaceEmbeddingBatch
Reranking     | BGE Reranker     | engine.inference.baai.bge_reranker_v2_m3.services
LLM           | Vertex AI        | engine.inference.google.vertex.services.VertexAIBatch

Using Your Plugin

After publishing, use your plugin in collections:
from mixpeek import Mixpeek

client = Mixpeek(api_key="sk_...")

collection = client.collections.create(
    collection_name="my_collection",
    source={"type": "bucket", "bucket_ids": ["bkt_..."]},
    feature_extractor={
        "feature_extractor_name": "my_extractor",  # Your plugin name
        "version": "v1",
        "parameters": {
            "threshold": 0.7  # Your custom parameters
        }
    }
)

CLI Commands

Command                    | Description
mixpeek plugin init <name> | Create new plugin
mixpeek plugin test        | Validate and test locally
mixpeek plugin publish     | Upload to namespace
mixpeek plugin list        | List available plugins
See the Mixpeek CLI Reference for detailed command options, environment variables, and troubleshooting.

Security

Plugins are scanned before deployment. The following are not allowed:
  • subprocess, os.system - System calls
  • eval, exec - Dynamic code execution
  • socket - Direct network access
  • ctypes - Low-level memory access
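
For example, shelling out to a tool like ffmpeg via subprocess will fail the scan; the supported pattern is to compose the corresponding built-in service as a pipeline step instead. A sketch of the contrast (illustrative only; the config for the built-in service is omitted here):

# Fails the security scan: direct system calls from plugin code.
# import subprocess
# subprocess.run(["ffmpeg", "-i", "input.mp4", "frames_%04d.jpg"])

# Instead, declare the built-in FFmpeg chunking service as a pipeline step.
from engine.inference.ffmpeg.services import FFmpegParallelChunking
from engine.plugins.extractors.pipeline import ResourceType, RowCondition, StepDefinition

chunking_step = StepDefinition(
    service_class=FFmpegParallelChunking,
    resource_type=ResourceType.CPU,
    # config=...  (each built-in service ships its own config model)
    condition=RowCondition.IS_VIDEO,
)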

Next Steps