Skip to main content

Overview

Plugins are the core processing units in Mixpeek. They define how your data is transformed, embedded, and indexed. Mixpeek provides two types of plugins:
  • Builtin plugins: Pre-built extractors maintained by Mixpeek (text, image, video, document, etc.)
  • Custom plugins: Your own extraction logic running on Mixpeek infrastructure
Deployment modes:
ModeDescriptionUse Case
Batch processingHigh-throughput Ray Data pipelinesProcessing collections, indexing documents
Real-time inferenceRay Serve HTTP endpointsLive API requests, synchronous embedding

Architecture

┌─────────────────────────────────────────────────────────────────┐
│                        Plugin System                            │
├──────────────────────────┬──────────────────────────────────────┤
│   Builtin Plugins        │   Custom Plugins                     │
│   (engine/plugins/)      │   (S3 uploaded archives)             │
├──────────────────────────┴──────────────────────────────────────┤
│                     Pipeline Builder                            │
│   - Declarative step definitions                                │
│   - Resource allocation (CPU/GPU/API)                           │
│   - Content-type filtering                                      │
├─────────────────────────────────────────────────────────────────┤
│              Ray Data (Batch)    │    Ray Serve (Real-time)     │
│   - map_batches() processing     │    - HTTP deployment         │
│   - DataFrame input/output       │    - Auto-scaling            │
│   - Parallel execution           │    - Load balancing          │
├─────────────────────────────────────────────────────────────────┤
│                      Model Registry                             │
│   - HuggingFace models (cluster-cached)                         │
│   - Custom S3 models (namespace-scoped)                         │
│   - Lazy loading (on-demand)                                    │
└─────────────────────────────────────────────────────────────────┘

Quick Start: Custom Plugin

1. Create Plugin Structure

my_extractor/
├── manifest.py      # Schemas + metadata
├── pipeline.py      # Batch processing pipeline
├── realtime.py      # HTTP endpoint (optional, Enterprise)
└── processors/
    └── core.py      # Your processing logic

2. Define Your Plugin (manifest.py)

from pydantic import BaseModel, Field
from typing import List, Optional

# Input schema - what your plugin accepts
class MyInput(BaseModel):
    text: str = Field(..., description="Input text to process")

# Output schema - what your plugin produces
class MyOutput(BaseModel):
    embedding: List[float] = Field(..., description="384-dim embedding vector")
    sentiment: str = Field(..., description="positive/negative/neutral")

# Parameters - user-configurable options
class MyParams(BaseModel):
    threshold: float = Field(default=0.5, ge=0, le=1, description="Confidence threshold")
    model_size: str = Field(default="base", description="Model size: base or large")

# Plugin metadata
metadata = {
    "feature_extractor_name": "my_extractor",
    "version": "1.0.0",
    "description": "Custom text embedding with sentiment analysis",
    "category": "text",
}

input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]

# Vector index definitions for Qdrant
features = [
    {
        "feature_name": "my_embedding",
        "feature_type": "embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]

3. Implement Batch Processing (processors/core.py)

from dataclasses import dataclass
import pandas as pd

@dataclass
class MyConfig:
    threshold: float = 0.5
    model_size: str = "base"

class MyProcessor:
    """Batch processor for Ray Data pipelines."""

    def __init__(self, config: MyConfig, progress_actor=None):
        self.config = config
        self._model = None  # Lazy loading

    def _ensure_model_loaded(self):
        """Load model on first batch (lazy loading)."""
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            model_name = "all-MiniLM-L6-v2" if self.config.model_size == "base" else "all-mpnet-base-v2"
            self._model = SentenceTransformer(model_name)

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Process a batch of rows."""
        self._ensure_model_loaded()

        # Get text from batch
        texts = batch["text"].fillna("").tolist()

        # Generate embeddings
        embeddings = self._model.encode(texts).tolist()
        batch["my_embedding"] = embeddings

        # Simple sentiment (replace with your logic)
        batch["sentiment"] = ["positive" if len(t) > 50 else "neutral" for t in texts]

        return batch

4. Wire Into Pipeline (pipeline.py)

from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    StepDefinition,
    build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyConfig, MyProcessor

def build_steps(extractor_request, container=None, base_steps=None, **kwargs):
    """Build the extraction pipeline."""

    # Parse parameters from request
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    # Create processor config
    config = MyConfig(
        threshold=params.threshold,
        model_size=params.model_size,
    )

    # Define pipeline steps
    pipeline = PipelineDefinition(
        name=metadata["feature_extractor_name"],
        version=metadata["version"],
        steps=[
            StepDefinition(
                service_class=MyProcessor,
                resource_type=ResourceType.CPU,  # or GPU, API
                config=config,
            ),
        ]
    )

    # Build Ray Data steps
    steps = build_pipeline_steps(pipeline)

    return {"steps": steps}

5. Add Real-time Endpoint (realtime.py) - Enterprise Only

from shared.plugins.inference.serve import BaseInferenceService

class InferenceService(BaseInferenceService):
    """Real-time HTTP inference endpoint."""

    def __init__(self):
        super().__init__()
        self._model = None

    async def __call__(self, inputs: dict, parameters: dict) -> dict:
        """Handle inference request."""
        # Lazy load model
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

        text = inputs.get("text", "")
        embedding = self._model.encode([text])[0].tolist()

        return {
            "embedding": embedding,
            "sentiment": "positive" if len(text) > 50 else "neutral"
        }

6. Upload and Deploy

# Package your plugin
zip -r my_extractor.zip my_extractor/

# Get upload URL
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -d '{"name": "my_extractor", "version": "1.0.0"}'

# Upload
curl -X PUT "$PRESIGNED_URL" --data-binary @my_extractor.zip

# Confirm
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $API_KEY"

# Deploy real-time endpoint (Enterprise)
curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy" \
  -H "Authorization: Bearer $API_KEY"

Using Built-in Services

Instead of implementing models from scratch, compose existing Mixpeek services:

Option 1: Import Batch Services Directly

from shared.inference.registry import get_batch_service

# Get the service class
E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")
SigLIPBatch = get_batch_service("google/siglip-base-patch16-224")
WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")

# Use in your pipeline
def build_steps(extractor_request, **kwargs):
    from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs

    config = InferenceConfigs(
        text_column="text",
        output_column_name="embedding",
    )

    pipeline = PipelineDefinition(
        name="my_extractor",
        version="v1",
        steps=[
            StepDefinition(
                service_class=E5Batch,
                resource_type=ResourceType.CPU,
                config=config,
            ),
        ]
    )
    return {"steps": build_pipeline_steps(pipeline)}

Option 2: Call Real-time Services via HTTP

import httpx

async def embed_text(text: str) -> list[float]:
    """Call the E5 embedding service."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8001/multilingual_e5_large_instruct/v1",
            json={"inputs": {"texts": [text]}, "parameters": {}}
        )
        return response.json()["embeddings"][0]

Available Services

Service IDTypeDimensionsUse Case
intfloat/multilingual-e5-large-instructEmbedding1024Multilingual text embeddings
google/siglip-base-patch16-224Embedding512Image embeddings
jinaai/jina-embeddings-v2-base-codeEmbedding768Code embeddings
BAAI/bge-reranker-v2-m3Reranker-Search result reordering
openai/whisper-large-v3-turboTranscription-Audio to text

Loading Models

Mixpeek provides three ways to load models in your plugins:

1. HuggingFace Models (Cluster-Cached)

Models are downloaded once and cached across all workers:
from engine.models import load_hf_model
from transformers import AutoConfig

class MyProcessor:
    def __init__(self, config):
        self.config = config
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            # Load from cluster cache (zero-copy sharing)
            cached = load_hf_model(
                hf_model_id="intfloat/multilingual-e5-large-instruct",
                model_class="AutoModel",
                tokenizer_class="AutoTokenizer",
                torch_dtype="float16",
            )

            # Instantiate model from cached state_dict
            from transformers import AutoModel, AutoTokenizer
            config = AutoConfig.from_dict(cached["config"])
            self._model = AutoModel(config)
            self._model.load_state_dict(cached["state_dict"])
            self._tokenizer = AutoTokenizer.from_pretrained(
                cached["tokenizer_config"]["tokenizer_dir"]
            )

2. Custom S3 Models (Namespace-Scoped)

Use your own uploaded model weights:
from engine.models import load_namespace_model
import torch

class MyProcessor:
    def __init__(self, config):
        self._model = None

    def _ensure_model_loaded(self):
        if self._model is None:
            # Load pre-uploaded weights from S3
            weights = load_namespace_model("my-fine-tuned-bert_1_0_0")

            # Initialize your model architecture
            self._model = torch.nn.Sequential(
                torch.nn.Linear(768, 256),
                torch.nn.ReLU(),
                torch.nn.Linear(256, 128),
            )
            self._model.load_state_dict(weights)
            self._model.eval()
Automatic lazy loading with cluster-wide caching:
from engine.models import LazyModelMixin
from shared.plugins.inference.batch import BaseBatchInferenceService

class MyEmbedder(LazyModelMixin, BaseBatchInferenceService):
    """Batch embedder with automatic lazy loading."""

    # Configure model (class attributes)
    model_id = "intfloat/multilingual-e5-large-instruct"
    model_class = "AutoModel"
    tokenizer_class = "AutoTokenizer"
    torch_dtype = "float16"

    def __init__(self, config, **kwargs):
        super().__init__(**kwargs)
        self.config = config
        # Model NOT loaded here - loaded on first batch

    def _process_batch(self, batch):
        # Model automatically loaded on first call
        model, tokenizer = self.get_model()

        texts = batch["text"].tolist()
        inputs = tokenizer(texts, return_tensors="pt", padding=True, truncation=True)
        outputs = model(**inputs)

        batch["embedding"] = outputs.last_hidden_state.mean(dim=1).tolist()
        return batch

4. @lazy_model Decorator (Quick Pattern)

For simpler cases:
from engine.models import lazy_model

class MyProcessor:
    def __init__(self, config):
        self.config = config
        self._model = None
        self._tokenizer = None

    @lazy_model(
        model_id="intfloat/multilingual-e5-large-instruct",
        model_class="AutoModel",
        tokenizer_class="AutoTokenizer",
    )
    def __call__(self, batch):
        # self._model and self._tokenizer are automatically available
        texts = batch["text"].tolist()
        inputs = self._tokenizer(texts, return_tensors="pt", padding=True)
        outputs = self._model(**inputs)
        batch["embedding"] = outputs.last_hidden_state.mean(dim=1).tolist()
        return batch

Pipeline Step Configuration

Resource Types

TypeDescriptionUse Case
ResourceType.CPUCPU-only workersText processing, lightweight models
ResourceType.GPUGPU-allocated workersLarge models, image/video
ResourceType.APIExternal API callsOpenAI, Vertex AI, Anthropic

Row Conditions

Filter which rows your step processes:
from engine.plugins.extractors.pipeline import RowCondition

StepDefinition(
    service_class=ImageProcessor,
    resource_type=ResourceType.GPU,
    condition=RowCondition.IS_IMAGE,  # Only process images
)
ConditionMatches
RowCondition.IS_TEXTtext/* MIME types
RowCondition.IS_IMAGEimage/* MIME types
RowCondition.IS_VIDEOvideo/* MIME types
RowCondition.IS_AUDIOaudio/* MIME types
RowCondition.IS_PDFapplication/pdf
RowCondition.ALWAYSAll rows (default)

Conditional Steps

Enable/disable steps based on parameters:
pipeline = PipelineDefinition(
    name="my_extractor",
    version="v1",
    steps=[
        StepDefinition(
            service_class=TextChunker,
            resource_type=ResourceType.CPU,
            enabled=params.enable_chunking,  # Conditional
            config=chunker_config,
        ),
        StepDefinition(
            service_class=E5Batch,
            resource_type=ResourceType.CPU,
            config=embedding_config,
        ),
    ]
)

Builtin Plugin Development

This section is for Mixpeek internal developers creating new builtin plugins.

Directory Structure

engine/plugins/builtin/my_extractor/v1/
├── __init__.py
├── definition.py     # Single source of truth
├── pipeline.py       # Batch processing pipeline
├── realtime.py       # Optional: Ray Serve endpoint
└── processors/
    ├── __init__.py
    └── core.py       # Processing logic

definition.py (Single Source of Truth)

"""My extractor plugin definition."""

from enum import IntEnum
from typing import List, Literal
from pydantic import BaseModel, Field

from shared.billing.models import CostRate, CostUnit
from shared.collection.features.extractors.models import (
    CostsInfo,
    FeatureExtractorModel,
)
from shared.collection.features.vectors.models import (
    VectorIndex,
    VectorIndexDefinition,
    VectorType,
)

# =============================================================================
# COST CONFIGURATION
# =============================================================================

class MyExtractorCosts(IntEnum):
    PER_1K_TOKENS = 1

TIER = 1
TIER_LABEL = "SIMPLE"

RATES: List[CostRate] = [
    CostRate(
        unit=CostUnit.TOKENS_1K,
        credits_per_unit=MyExtractorCosts.PER_1K_TOKENS,
        description="Per 1K tokens processed",
    ),
]

# =============================================================================
# PARAMETER SCHEMA
# =============================================================================

class MyExtractorParams(BaseModel):
    extractor_type: Literal["my_extractor"] = "my_extractor"
    threshold: float = Field(default=0.5, ge=0, le=1)
    enable_feature_x: bool = Field(default=True)

# =============================================================================
# INPUT/OUTPUT SCHEMAS
# =============================================================================

class MyExtractorInput(BaseModel):
    text: str = Field(..., min_length=1)

class MyExtractorOutput(BaseModel):
    embedding: List[float] = Field(..., min_length=1024, max_length=1024)

# =============================================================================
# VECTOR INDEX CONFIGURATION
# =============================================================================

VECTOR_INDEXES = [
    VectorIndexDefinition(
        name="my_embedding",
        description="Dense embedding vector",
        type="single",
        index=VectorIndex(
            name="my_extractor_v1_embedding",
            description="Dense vector embedding",
            dimensions=1024,
            type=VectorType.DENSE,
            distance="Cosine",
            inference_service_id="intfloat/multilingual-e5-large-instruct",
        ),
    ),
]

# =============================================================================
# PLUGIN DEFINITION
# =============================================================================

definition = FeatureExtractorModel(
    feature_extractor_name="my_extractor",
    version="v1",
    description="My custom extractor for specialized processing",
    icon="wand-2",
    input_schema=MyExtractorInput,
    output_schema=MyExtractorOutput,
    parameter_schema=MyExtractorParams,
    required_vector_indexes=VECTOR_INDEXES,
    costs=CostsInfo(tier=TIER, tier_label=TIER_LABEL, rates=RATES),
)

metadata = {
    "name": "my_extractor",
    "version": "v1",
    "description": definition.description,
}

Register in Plugin Registry

Add to engine/plugins/registry.py:
BUILTIN_PLUGINS = {
    # ... existing plugins
    "my_extractor_v1": {
        "name": "my_extractor",
        "version": "v1",
        "module_path": "engine.plugins.builtin.my_extractor.v1",
    },
}

Security Requirements

Custom plugins are scanned before deployment. Code violating these rules is rejected.

Allowed

CategoryLibraries
Datanumpy, pandas, polars, pyarrow
ML/AItorch, transformers, sentence_transformers, onnxruntime
ImagePIL, cv2, imageio
Audiolibrosa, soundfile, ffmpeg-python
HTTPrequests, httpx, aiohttp
Utilsjson, re, typing, dataclasses, pydantic, logging

Forbidden

CategoryBlockedReason
Executionsubprocess, os.system, eval, execShell/code execution
Systemctypes, socket, multiprocessingLow-level access
Builtinsopen, setattr, getattr, __import__File/attribute manipulation

Deployment Lifecycle

Deploy Real-time Endpoint

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy" \
  -H "Authorization: Bearer $API_KEY"

Check Status

curl "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/status"
StatusDescription
QUEUEDWaiting in deployment queue
PENDINGDeployment triggered
IN_PROGRESSBlue-green rollout in progress
DEPLOYEDReady for real-time inference
FAILEDCheck error field
NOT_DEPLOYEDBatch-only mode

Undeploy

curl -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/undeploy"

Delete

curl -X DELETE "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID"

Example: Multi-Stage Pipeline

Here’s a complete example combining transcription and embedding:
# pipeline.py
from shared.inference.registry import get_batch_service
from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    RowCondition,
    StepDefinition,
    build_pipeline_steps,
)

def build_steps(extractor_request, **kwargs):
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    # Get built-in services
    WhisperBatch = get_batch_service("openai/whisper-large-v3-turbo")
    E5Batch = get_batch_service("intfloat/multilingual-e5-large-instruct")

    pipeline = PipelineDefinition(
        name="audio_to_embedding",
        version="v1",
        steps=[
            # Step 1: Transcribe audio
            StepDefinition(
                service_class=WhisperBatch,
                resource_type=ResourceType.API,
                condition=RowCondition.IS_AUDIO,
                config=WhisperConfig(output_column="transcription"),
            ),
            # Step 2: Embed transcription
            StepDefinition(
                service_class=E5Batch,
                resource_type=ResourceType.CPU,
                config=E5Config(
                    text_column="transcription",
                    output_column_name="embedding",
                ),
            ),
        ]
    )

    return {"steps": build_pipeline_steps(pipeline)}

Troubleshooting

Plugin validation failed

Check the validation_errors array. Common issues:
  • Using setattr() or getattr() - use class attributes instead
  • Importing subprocess - use requests or httpx for HTTP
  • Using open() - use provided data loading APIs

Model loading is slow

  • Use LazyModelMixin for automatic cluster caching
  • Pre-deploy models via the Model Registry API
  • Check cached: true in deployment response

Batch processing fails

  1. Ensure _process_batch() returns a DataFrame
  2. Reset DataFrame index: batch = batch.reset_index(drop=True)
  3. Handle None/empty values: batch["col"].fillna("")

Real-time endpoint not responding

  1. Check deployment status via /status endpoint
  2. Verify plugin is DEPLOYED not NOT_DEPLOYED
  3. Check Ray Serve logs for errors