Overview

Custom plugins let you run your own feature extraction logic on Mixpeek’s infrastructure. The framework handles batching, scaling, and GPU allocation; you write the processing code. There are two deployment modes:
  • Batch processing (all accounts): Process files in collections
  • Realtime inference (Enterprise): HTTP endpoint for live requests

Quick Start

# 1. Create plugin
mixpeek plugin init my_extractor --category text

# 2. Edit processors/core.py with your logic

# 3. Test locally
cd my_extractor && mixpeek plugin test

# 4. Upload to namespace
mixpeek plugin publish --namespace ns_xxx
After upload, use it in collections:
client.collections.create(
    collection_name="my_collection",
    source={"type": "bucket", "bucket_ids": ["bkt_..."]},
    feature_extractor={
        "feature_extractor_name": "my_extractor",
        "version": "1.0.0",
        "parameters": {"threshold": 0.7}
    }
)

Plugin Structure

my_extractor/
├── manifest.py      # Schemas + metadata
├── pipeline.py      # Batch processing pipeline
├── realtime.py      # HTTP endpoint (optional, Enterprise)
└── processors/
    └── core.py      # Your logic

manifest.py

Defines what your plugin accepts and produces:
from pydantic import BaseModel, Field
from typing import List

class MyInput(BaseModel):
    text: str = Field(..., description="Input text")

class MyOutput(BaseModel):
    embedding: List[float] = Field(..., description="384-dim embedding")
    label: str

class MyParams(BaseModel):
    """User-configurable parameters for collections."""
    threshold: float = Field(default=0.5, ge=0, le=1)

metadata = {
    "feature_extractor_name": "my_extractor",
    "version": "1.0.0",
    "description": "My custom extractor",
    "category": "text",
}

input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]

# Define output features for vector indexing
features = [
    {
        "feature_name": "my_extractor_embedding",
        "feature_type": "embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]
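Because `parameter_schema` is a plain Pydantic model, the defaults and bounds declared on `MyParams` are enforced before your processor ever runs. A minimal standalone sketch of that validation behavior, mirroring the schema above:

```python
from pydantic import BaseModel, Field, ValidationError

class MyParams(BaseModel):
    """User-configurable parameters, mirroring the manifest schema."""
    threshold: float = Field(default=0.5, ge=0, le=1)

print(MyParams().threshold)               # 0.5 (default applies)
print(MyParams(threshold=0.7).threshold)  # 0.7 (explicit value accepted)

try:
    MyParams(threshold=1.5)               # violates the le=1 bound
except ValidationError:
    print("out-of-range threshold rejected")
```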

processors/core.py

Your processing logic:
from dataclasses import dataclass
import pandas as pd

@dataclass
class MyConfig:
    threshold: float = 0.5

class MyProcessor:
    def __init__(self, config: MyConfig, progress_actor=None):
        self.config = config
        self._model = None

    def _load_model(self):
        if self._model is None:
            # Load once, reuse for all batches
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Process batch. Framework handles batch sizing."""
        self._load_model()

        texts = batch["text"].fillna("").tolist()
        embeddings = self._model.encode(texts).tolist()

        batch["my_extractor_embedding"] = embeddings
        batch["label"] = ["positive" if e[0] > self.config.threshold else "negative" for e in embeddings]
        return batch
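Before wiring the processor into a pipeline, it can be smoke-tested on a small DataFrame. The sketch below swaps in a hypothetical stub encoder so nothing is downloaded; the stub and its fake 3-dim embeddings are illustrative only, while the batch-in/batch-out contract matches the processor above:

```python
import pandas as pd

class StubEncoder:
    """Hypothetical stand-in for SentenceTransformer: returns one
    fake 3-dim embedding per input, keyed off text length."""
    def encode(self, texts):
        return [[len(t) / 10.0, 0.0, 0.0] for t in texts]

class MyProcessor:
    def __init__(self, threshold: float = 0.5):
        self.threshold = threshold
        self._model = StubEncoder()  # the real processor lazy-loads the model here

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        texts = batch["text"].fillna("").tolist()
        embeddings = self._model.encode(texts)
        batch["my_extractor_embedding"] = embeddings
        batch["label"] = [
            "positive" if e[0] > self.threshold else "negative" for e in embeddings
        ]
        return batch

batch = pd.DataFrame({"text": ["short", "a much longer input text"]})
out = MyProcessor(threshold=0.5)(batch)
print(out["label"].tolist())  # ['negative', 'positive']
```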

pipeline.py

Wire processor into the pipeline:
from typing import Any, Dict, Optional
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, RowCondition, StepDefinition, build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyConfig, MyProcessor

def build_steps(
    extractor_request: Any,
    container: Optional[Any] = None,
    base_steps: Optional[list] = None,
    **kwargs
) -> Dict[str, Any]:
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    steps = [
        StepDefinition(
            service_class=MyProcessor,
            resource_type=ResourceType.CPU,  # or GPU, API
            config=MyConfig(threshold=params.threshold),
            condition=RowCondition.IS_TEXT,
        ),
    ]

    pipeline = PipelineDefinition(name=metadata["feature_extractor_name"], version=metadata["version"], steps=steps)
    return {"steps": (base_steps or []) + build_pipeline_steps(pipeline), "prepare": lambda ds: ds}

realtime.py (Enterprise)

For HTTP inference endpoint:
from typing import Any, Dict

class RealtimeHandler:
    def __init__(self):
        self._model = None

    def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

        text = request.get("text", "")
        embedding = self._model.encode([text])[0].tolist()
        return {"embedding": embedding, "label": "positive" if embedding[0] > 0.5 else "negative"}
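The handler is a plain dict-in/dict-out class, so its request/response contract can also be exercised locally. This sketch substitutes a hypothetical stub model; the stub's fixed 2-dim output is invented for illustration:

```python
from typing import Any, Dict

class StubModel:
    """Hypothetical stand-in for SentenceTransformer."""
    def encode(self, texts):
        return [[0.9, 0.1] for _ in texts]

class RealtimeHandler:
    def __init__(self):
        self._model = None

    def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
        if self._model is None:
            self._model = StubModel()  # real handler loads the model here, once
        text = request.get("text", "")
        embedding = list(self._model.encode([text])[0])
        return {"embedding": embedding,
                "label": "positive" if embedding[0] > 0.5 else "negative"}

resp = RealtimeHandler().predict({"text": "hello"})
print(resp)  # {'embedding': [0.9, 0.1], 'label': 'positive'}
```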

API Reference

Upload Plugin (Presigned URL)

Step 1: Get upload URL
curl -X POST "https://api.mixpeek.com/v1/namespaces/ns_xxx/plugins/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "my_extractor", "version": "1.0.0"}'
Response:
{
  "upload_id": "plu_abc123",
  "presigned_url": "https://s3.amazonaws.com/...",
  "expires_at": "2024-01-15T12:00:00Z"
}
Step 2: Upload archive
curl -X PUT "$PRESIGNED_URL" \
  -H "Content-Type: application/zip" \
  --data-binary @my_extractor.zip
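The `my_extractor.zip` body for the PUT above can be built with a short helper; `archive_plugin` is an illustrative name, not part of the Mixpeek CLI:

```python
import io
import pathlib
import zipfile

def archive_plugin(plugin_dir: str) -> bytes:
    """Zip a plugin directory for the presigned PUT. Entries are stored
    relative to the plugin root, so manifest.py sits at the top level."""
    buf = io.BytesIO()
    root = pathlib.Path(plugin_dir)
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for path in sorted(root.rglob("*")):
            if path.is_file():
                zf.write(path, arcname=path.relative_to(root))
    return buf.getvalue()
```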
Step 3: Confirm upload
curl -X POST "https://api.mixpeek.com/v1/namespaces/ns_xxx/plugins/uploads/plu_abc123/confirm" \
  -H "Authorization: Bearer $API_KEY"
Response:
{
  "success": true,
  "plugin_id": "my_extractor_1_0_0",
  "validation_status": "passed",
  "feature_uri": "mixpeek://my_extractor@1.0.0"
}

Deploy Plugin (Enterprise)

curl -X POST "https://api.mixpeek.com/v1/namespaces/ns_xxx/plugins/my_extractor_1_0_0/deploy" \
  -H "Authorization: Bearer $API_KEY"
Response:
{
  "status": "queued",
  "message": "Plugin queued for deployment. Will start within 30 seconds.",
  "route_prefix": "/inference/custom_plugin_ns_xxx_my_extractor_1_0_0",
  "estimated_completion_seconds": 210
}

Check Status

curl "https://api.mixpeek.com/v1/namespaces/ns_xxx/plugins/my_extractor_1_0_0/status" \
  -H "Authorization: Bearer $API_KEY"
Status values:
| Status | Meaning |
|---|---|
| QUEUED | Waiting in deployment queue (max 30s) |
| PENDING | Deployment triggered, waiting for Anyscale |
| IN_PROGRESS | Blue-green rollout happening |
| DEPLOYED | Ready for realtime inference |
| FAILED | Check error field |
| NOT_DEPLOYED | Batch-only, no realtime endpoint |

Delete Plugin

curl -X DELETE "https://api.mixpeek.com/v1/namespaces/ns_xxx/plugins/my_extractor_1_0_0" \
  -H "Authorization: Bearer $API_KEY"
The response includes deployment_removed: true if a Ray Serve endpoint was queued for removal.

Resource Types

| Type | Use For | Auto-config |
|---|---|---|
| ResourceType.CPU | Text embeddings, classification | Batch size ~64 |
| ResourceType.GPU | Local models (Whisper, CLIP) | Batch size ~8, GPU allocation |
| ResourceType.API | External APIs (OpenAI, Vertex) | Concurrent requests |

Row Conditions

Filter which rows your processor handles:
RowCondition.IS_TEXT       # text/* MIME types
RowCondition.IS_IMAGE      # image/* MIME types
RowCondition.IS_VIDEO      # video/* MIME types
RowCondition.IS_AUDIO      # audio/* MIME types
RowCondition.IS_PDF        # application/pdf
RowCondition.ALWAYS        # All rows (default)

Using Built-in Services

Compose existing services instead of writing from scratch:
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, RowCondition, StepDefinition, build_pipeline_steps
)
# Config classes are assumed to be importable alongside their services
from engine.inference.openai.whisper_api.services import WhisperConfig, WhisperTranscriptionBatch
from engine.inference.intfloat.multilingual_e5_large_instruct.services import E5Config, E5TextEmbeddingBatch

def build_steps(extractor_request, **kwargs):
    steps = [
        StepDefinition(
            service_class=WhisperTranscriptionBatch,
            resource_type=ResourceType.API,
            config=WhisperConfig(output_column_name="transcription"),
            condition=RowCondition.IS_AUDIO,
        ),
        StepDefinition(
            service_class=E5TextEmbeddingBatch,
            resource_type=ResourceType.CPU,
            config=E5Config(text_column="transcription", output_column_name="embedding"),
        ),
    ]
    return {"steps": build_pipeline_steps(PipelineDefinition(name="transcribe_embed", version="v1", steps=steps))}

Security Requirements

Plugins are scanned using AST analysis before upload confirmation. Code that violates these rules will be rejected.
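For intuition, this kind of forbidden-import check can be sketched with a few lines of `ast` walking. The snippet below is an illustrative approximation with a reduced block-list, not Mixpeek's actual scanner:

```python
import ast

FORBIDDEN_IMPORTS = {"subprocess", "socket", "ctypes"}  # subset, for illustration

def scan(source: str, filename: str = "<plugin>"):
    """Flag forbidden top-level module imports, reported in the same
    shape as the validation-error payload shown below."""
    violations = []
    for node in ast.walk(ast.parse(source, filename)):
        if isinstance(node, ast.Import):
            names = [alias.name.split(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom):
            names = [(node.module or "").split(".")[0]]
        else:
            continue
        for name in names:
            if name in FORBIDDEN_IMPORTS:
                violations.append({
                    "file": filename, "line": node.lineno, "column": node.col_offset,
                    "message": f"Forbidden import: {name}", "severity": "error",
                })
    return violations

print(scan("import subprocess\nimport json\n"))
# [{'file': '<plugin>', 'line': 1, 'column': 0,
#   'message': 'Forbidden import: subprocess', 'severity': 'error'}]
```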

Allowed Imports

These libraries are safe to use in your plugins:
| Category | Libraries |
|---|---|
| Data Processing | numpy, pandas, polars, pyarrow |
| ML/AI | torch, transformers, sentence_transformers, onnxruntime, safetensors |
| Image Processing | PIL/pillow, cv2/opencv-python, imageio |
| Audio/Video | librosa, soundfile, ffmpeg-python |
| HTTP Clients | requests, httpx, aiohttp |
| Utilities | json, re, typing, dataclasses, pydantic, logging |
| Mixpeek SDK | shared.models.loader.load_namespace_model |

Forbidden Imports

These imports are blocked for security reasons:
| Category | Blocked | Reason |
|---|---|---|
| Process Execution | subprocess, os.system, os.popen, os.spawn*, os.exec* | Shell execution |
| Low-level Access | ctypes, pty, fcntl, resource | Memory/system access |
| Concurrency | multiprocessing, threading | Use Ray instead |
| Network | socket | Use requests/httpx instead |
| Legacy | commands, popen2 | Deprecated Python 2 modules |

Forbidden Builtins

These built-in functions cannot be used:
| Function | Reason |
|---|---|
| `eval()`, `exec()`, `compile()` | Dynamic code execution |
| `open()` | Direct file access (use provided APIs) |
| `__import__()` | Dynamic imports |
| `globals()`, `locals()`, `vars()` | Namespace manipulation |
| `getattr()`, `setattr()`, `delattr()`, `hasattr()` | Attribute manipulation |

Forbidden Module Functions

| Module | Blocked Functions |
|---|---|
| os | system, popen, spawn, exec, fork, kill, killpg |
| importlib | import_module, `__import__` |
| pickle | loads, load (arbitrary code execution risk) |
| marshal | loads, load |

Warning Patterns

These patterns generate warnings and may fail in strict mode:
  • pickle, marshal, shelve usage
  • Dunder attribute access: `__builtins__`, `__class__`, `__bases__`, `__subclasses__`, `__mro__`, `__code__`, `__globals__`

Validation Errors

If your plugin fails validation, the API returns:
{
  "detail": "Security scan failed",
  "violations": [
    {
      "file": "processors/core.py",
      "line": 15,
      "column": 0,
      "message": "Forbidden import: subprocess",
      "severity": "error"
    }
  ]
}

Feature URI

After deployment, reference your plugin with its feature URI:

mixpeek://my_extractor@1.0.0

This URI is organization-scoped: two orgs can have the same plugin name without collision.

Namespace Isolation

Plugins are isolated per namespace:
  • MongoDB: internal_id scoping ensures org isolation
  • S3: Archives stored at org_id/ns_id/plugin_custom/...
  • Ray Serve: Deployment names include namespace prefix to prevent collisions

CLI Commands

| Command | Description |
|---|---|
| `mixpeek plugin init <name>` | Create plugin from template |
| `mixpeek plugin test` | Validate and test locally |
| `mixpeek plugin publish` | Upload to namespace |
| `mixpeek plugin list` | List plugins in namespace |
See Mixpeek CLI for full reference.