The Mixpeek CLI (mixpeek) is a command-line tool for developing custom feature extractors that run on Mixpeek’s infrastructure. Build your own processing logic and deploy it alongside Mixpeek’s built-in extractors.
Custom plugins are an Enterprise feature that requires dedicated infrastructure. Contact us to enable custom plugins for your organization.

Features

  • Plugin Templates - Generate complete plugin scaffolds with schemas, pipelines, and tests
  • Local Testing - Validate structure, run tests, and test with sample data
  • Security Scanning - Automatic security validation before deployment
  • Easy Publishing - Upload plugins to your namespace with a single command

Installation

Install the Mixpeek SDK which includes the CLI:
pip install mixpeek
Verify installation:
mixpeek --version

Quick Start

Get a custom plugin running in 5 minutes:
# 1. Create a new plugin
mixpeek plugin init my_extractor --category text

# 2. Navigate to plugin directory
cd my_extractor

# 3. Edit your processing logic
# Open processors/core.py and add your code

# 4. Test locally
mixpeek plugin test

# 5. Publish to your namespace
mixpeek plugin publish --namespace ns_your_namespace

Configuration

Environment Variables

Set these environment variables to avoid passing them with every command:
export MIXPEEK_API_KEY="sk_your_api_key"
export MIXPEEK_NAMESPACE="ns_your_namespace"
export MIXPEEK_BASE_URL="https://api.mixpeek.com"  # Optional, defaults to production

Command-Line Options

Global options available for all commands:
Option      Environment Variable  Description
--api-key   MIXPEEK_API_KEY       Your Mixpeek API key
--base-url  MIXPEEK_BASE_URL      API base URL (default: https://api.mixpeek.com)
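For example (a sketch; these flags are accepted alongside any plugin command, though exact placement may vary by CLI version):
# Pass credentials explicitly instead of relying on environment variables
mixpeek plugin list --api-key sk_your_api_key --base-url https://api.mixpeek.com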

Commands

mixpeek plugin init

Create a new plugin from a template with all required files and structure.
mixpeek plugin init <name> [options]
Arguments:
  • <name> - Plugin name (letters, numbers, underscores, hyphens)
Options:
Option         Short  Default                       Description
--description  -d     "A custom Mixpeek extractor"  Plugin description
--category     -c     text                          Plugin category
--author       -a     Git user.name                 Author name
--output       -o     Current directory             Output directory
Categories:
  • text - Text processing (NLP, classification, embeddings)
  • image - Image analysis (detection, recognition)
  • video - Video processing (scene detection, analysis)
  • audio - Audio processing (transcription, analysis)
  • document - Document processing (PDF, OCR)
  • multimodal - Multiple content types
Examples:
# Basic plugin
mixpeek plugin init sentiment_analyzer

# With options
mixpeek plugin init product_classifier \
  --category text \
  --description "Classify products into categories" \
  --author "Your Name"

# Image processing plugin
mixpeek plugin init face_detector --category image

# Specify output directory
mixpeek plugin init my_plugin --output ./plugins/

mixpeek plugin test

Validate plugin structure, run tests, and optionally test with sample data.
mixpeek plugin test [options]
Options:
Option         Short  Default  Description
--path         -p     .        Plugin directory path
--sample-data  -s     None     Sample data file (JSON/CSV)
--verbose      -v     False    Show detailed output
What it validates:
  1. Structure - Required files exist (manifest.py, pipeline.py)
  2. Schemas - Input/output schemas are valid Pydantic models
  3. Pipeline - build_steps() function is defined and callable
  4. Tests - Runs pytest if tests exist in tests/ directory
  5. Sample Data - Processes sample data through pipeline (if provided)
Examples:
# Test current directory
mixpeek plugin test

# Test specific plugin
mixpeek plugin test --path ./my_extractor

# Test with sample data
mixpeek plugin test --sample-data samples.json --verbose

# CSV sample data
mixpeek plugin test -s test_data.csv -v
Sample Data Format:

JSON:
[
  {"text": "Sample text 1", "metadata": {"source": "test"}},
  {"text": "Sample text 2", "metadata": {"source": "test"}}
]
CSV:
text,metadata.source
"Sample text 1","test"
"Sample text 2","test"

mixpeek plugin validate

Validate plugin structure without running full tests. Alias for test without sample data.
mixpeek plugin validate [options]
Options:
Option  Short  Default  Description
--path  -p     .        Plugin directory path

mixpeek plugin publish

Upload and deploy your plugin to Mixpeek.
mixpeek plugin publish [options]
Options:
Option       Short  Default            Description
--path       -p     .                  Plugin directory path
--namespace  -n     MIXPEEK_NAMESPACE  Target namespace ID
--dry-run           False              Validate without uploading
What happens during publish:
  1. Validates plugin structure and schemas
  2. Runs security scan for forbidden patterns
  3. Creates compressed archive (.tar.gz)
  4. Requests presigned upload URL from API
  5. Uploads archive directly to S3
  6. Confirms upload and triggers validation
  7. Plugin becomes available in your namespace
Examples:
# Publish to default namespace
mixpeek plugin publish

# Specify namespace
mixpeek plugin publish --namespace ns_abc123

# Dry run (validate without uploading)
mixpeek plugin publish --dry-run

# Publish specific plugin
mixpeek plugin publish --path ./plugins/my_extractor

mixpeek plugin list

List all plugins available in your namespace.
mixpeek plugin list [options]
Options:
Option       Short  Default            Description
--namespace  -n     MIXPEEK_NAMESPACE  Namespace ID
--source     -s     all                Filter by source type
Source Types:
  • all - All plugins
  • builtin - Mixpeek’s built-in extractors
  • custom - Your custom plugins
  • community - Community-contributed plugins
Examples:
# List all plugins
mixpeek plugin list

# List only custom plugins
mixpeek plugin list --source custom

# List plugins in specific namespace
mixpeek plugin list --namespace ns_abc123

Plugin Structure

Every plugin has the following structure:
my_extractor/
├── __init__.py          # Package exports
├── manifest.py          # Metadata and schemas
├── pipeline.py          # Pipeline definition
├── realtime.py          # HTTP endpoint (optional)
├── processors/
│   ├── __init__.py
│   └── core.py          # Your processing logic
├── tests/
│   └── test_plugin.py   # Unit tests
├── README.md            # Documentation
└── pyproject.toml       # Package config

manifest.py

Defines plugin metadata and Pydantic schemas:
from pydantic import BaseModel, Field

class MyExtractorInput(BaseModel):
    """Input schema - what your plugin accepts."""
    text: str = Field(..., description="Input text to process")

class MyExtractorOutput(BaseModel):
    """Output schema - what your plugin produces."""
    result: str = Field(..., description="Processed result")
    confidence: float = Field(..., ge=0.0, le=1.0)

class MyExtractorParams(BaseModel):
    """Parameters users can configure when creating collections."""
    threshold: float = Field(default=0.5, ge=0.0, le=1.0)
    text_column: str = Field(default="text")

# Plugin metadata
metadata = {
    "name": "my_extractor",
    "version": "v1",
    "description": "My custom extractor",
    "category": "text",
    "author": "Your Name",
}

input_schema = MyExtractorInput
output_schema = MyExtractorOutput
parameter_schema = MyExtractorParams
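
As a quick local sanity check (optional; a sketch, not something the CLI requires), you can exercise these Pydantic schemas directly before wiring them into a pipeline:
# Validate sample records against the schemas defined above
sample_in = MyExtractorInput(text="The quick brown fox")
sample_out = MyExtractorOutput(result="Processed: The quick brown fox", confidence=0.95)

# Parameters fall back to their declared defaults when omitted
params = MyExtractorParams()
assert params.threshold == 0.5 and params.text_column == "text"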

pipeline.py

Defines how your plugin processes data using the declarative pipeline system:
from typing import Any, Dict, Optional
from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    RowCondition,
    StepDefinition,
    build_pipeline_steps,
)
from shared.collection.features.extractors.models import ExtractorDefinition
from .manifest import MyExtractorParams, metadata
from .processors.core import MyExtractorConfig, MyExtractorProcessor

def build_steps(
    extractor_request: Any,
    container: Optional[Any] = None,
    base_steps: Optional[list] = None,
    dataset_size: Optional[int] = None,
    content_flags: Optional[dict] = None,
) -> Dict[str, Any]:
    """Build the extraction pipeline."""

    # Get user-configured parameters
    params = MyExtractorParams(**(extractor_request.extractor_config.parameters or {}))

    # Define pipeline steps
    steps = [
        StepDefinition(
            service_class=MyExtractorProcessor,
            resource_type=ResourceType.CPU,  # CPU, GPU, or API
            config=MyExtractorConfig(
                threshold=params.threshold,
                text_column=params.text_column,
            ),
            condition=RowCondition.IS_TEXT,  # Only process text rows
        ),
    ]

    pipeline = PipelineDefinition(name="my_extractor", version="v1", steps=steps)
    pipeline_steps = build_pipeline_steps(pipeline, dataset_size=dataset_size)

    return {
        "steps": (base_steps or []) + pipeline_steps,
        "prepare": lambda ds: ds,
    }

extractor_definition = ExtractorDefinition(
    name=metadata["name"],
    version=metadata["version"],
    extractor_function_path=build_steps,
    requires_file_resolution=False,
)

processors/core.py

Your actual processing logic:
from dataclasses import dataclass
from typing import Tuple
import pandas as pd

@dataclass
class MyExtractorConfig:
    """Configuration passed from pipeline."""
    threshold: float = 0.5
    text_column: str = "text"

class MyExtractorProcessor:
    """Batch processor - instantiated once, called repeatedly."""

    def __init__(self, config: MyExtractorConfig, progress_actor=None):
        self.config = config
        self._model = None  # Lazy load expensive resources

    def _load_model(self):
        """Load model on first use."""
        if self._model is None:
            # Load your model here
            # self._model = transformers.AutoModel.from_pretrained("...")
            self._model = "loaded"

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Process a batch of data."""
        self._load_model()

        results = []
        for text in batch[self.config.text_column].fillna(""):
            result, confidence = self._process_single(text)
            results.append({"result": result, "confidence": confidence})

        batch["output"] = results
        return batch

    def _process_single(self, text: str) -> Tuple[str, float]:
        """Process single item."""
        # Your processing logic here
        return f"Processed: {text[:50]}", 0.95

realtime.py (Optional)

HTTP endpoint for real-time inference:
from typing import Any, Dict
from shared.plugins.inference.serve import BaseInferenceService, ServeDeploymentConfig
from .processors.core import MyExtractorConfig, MyExtractorProcessor

class MyExtractorRealtimeService(BaseInferenceService):
    """Real-time HTTP endpoint deployed via Ray Serve."""

    serve_config = ServeDeploymentConfig(enabled=True, num_replicas=1)

    def __init__(self):
        super().__init__()
        self._processor = None

    async def _process_single(
        self, inputs: Dict[str, Any], parameters: Dict[str, Any]
    ) -> Dict[str, Any]:
        if self._processor is None:
            self._processor = MyExtractorProcessor(MyExtractorConfig())

        text = inputs.get("text", "")
        result, confidence = self._processor._process_single(text)
        return {"result": result, "confidence": confidence}

__all__ = ["MyExtractorRealtimeService"]
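
tests/test_plugin.py

mixpeek plugin test runs pytest against anything in tests/. The generated test file may differ, but a minimal test along these lines (a sketch, run from the plugin's parent directory) exercises the processor directly:
import pandas as pd

from my_extractor.processors.core import MyExtractorConfig, MyExtractorProcessor

def test_processor_adds_output_column():
    """The processor should return the batch with an output column of result dicts."""
    processor = MyExtractorProcessor(MyExtractorConfig())
    batch = pd.DataFrame({"text": ["sample text"]})

    result = processor(batch)

    assert "output" in result.columns
    first = result["output"].iloc[0]
    assert 0.0 <= first["confidence"] <= 1.0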

Resource Types

Tell the framework what resources your processor needs:
Resource Type  Use For                 Example Use Cases
CPU            CPU-bound processing    Text embeddings (E5, MiniLM), classification, NLP
GPU            GPU-accelerated models  Local Whisper, CLIP, custom PyTorch models
API            External API calls      OpenAI, Anthropic, Google Vertex AI
The framework automatically configures:
  • Actor resources (num_cpus, num_gpus)
  • Batch sizes based on hardware
  • Concurrency scaling based on dataset size
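
For instance, declaring a step GPU-bound only changes the resource_type field; a sketch reusing the step from pipeline.py:
# Same step shape as in pipeline.py, but GPU-bound: the framework requests GPU
# actor resources and sizes batches for accelerator hardware.
gpu_step = StepDefinition(
    service_class=MyExtractorProcessor,
    resource_type=ResourceType.GPU,
    config=MyExtractorConfig(threshold=0.5, text_column="text"),
    condition=RowCondition.IS_TEXT,
)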

Row Conditions

Filter which rows your processor handles:
from engine.plugins.extractors.pipeline import RowCondition

# Available conditions
RowCondition.IS_TEXT       # Text content only
RowCondition.IS_IMAGE      # Image content only
RowCondition.IS_VIDEO      # Video content only
RowCondition.IS_AUDIO      # Audio content only
RowCondition.IS_PDF        # PDF documents
RowCondition.IS_VIDEO_OR_AUDIO  # Video or audio
RowCondition.ALWAYS        # All rows (default)
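
Conditions are set per step, as in pipeline.py above; for example, to route only PDF rows to a processor (a sketch reusing the same step shape):
# Only rows matching IS_PDF reach this step
pdf_step = StepDefinition(
    service_class=MyExtractorProcessor,
    resource_type=ResourceType.CPU,
    config=MyExtractorConfig(text_column="text"),
    condition=RowCondition.IS_PDF,
)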

Using Built-in Services

Compose existing Mixpeek services into your pipeline:
from engine.inference.openai.whisper_api.services import WhisperTranscriptionBatch
from engine.inference.intfloat.multilingual_e5_large_instruct.services import E5TextEmbeddingBatch
from shared.inference.openai.whisper_api.models import InferenceConfigs as WhisperConfig
from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs as E5Config

def build_steps(extractor_request, **kwargs):
    steps = [
        # Step 1: Transcribe with Whisper
        StepDefinition(
            service_class=WhisperTranscriptionBatch,
            resource_type=ResourceType.API,
            config=WhisperConfig(
                bytes_column="audio_bytes",
                output_column_name="transcription"
            ),
            condition=RowCondition.IS_VIDEO_OR_AUDIO,
        ),
        # Step 2: Embed with E5
        StepDefinition(
            service_class=E5TextEmbeddingBatch,
            resource_type=ResourceType.CPU,
            config=E5Config(
                text_column="transcription",
                output_column_name="embedding"
            ),
        ),
    ]

    pipeline = PipelineDefinition(name="transcribe_and_embed", version="v1", steps=steps)
    return {"steps": build_pipeline_steps(pipeline), "prepare": lambda ds: ds}

Available Services

Category       Service                    Description
Embeddings     E5TextEmbeddingBatch       E5 text embeddings (1024-dim)
Embeddings     MiniLMEmbeddingBatch       MiniLM embeddings (384-dim)
Embeddings     CLIPEmbeddingBatch         CLIP image/text embeddings
Embeddings     SigLIPEmbeddingBatch       SigLIP embeddings
Transcription  WhisperTranscriptionBatch  Whisper API transcription
Video          FFmpegParallelChunking     Video segmentation
Face           FaceDetectionBatch         Face detection
Face           FaceEmbeddingBatch         Face embeddings
Reranking      BGERerankerBatch           BGE reranking
LLM            VertexAIBatch              Google Vertex AI

Security

Plugins are security-scanned before deployment. The following are not allowed:
Forbidden              Reason
subprocess, os.system  System command execution
eval, exec             Dynamic code execution
__import__, importlib  Dynamic imports
socket                 Direct network access
ctypes                 Low-level memory access
Plugins that contain forbidden patterns will fail validation and cannot be deployed.
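
As an illustration, a module like the first snippet below would be rejected, while the second stays in-process and passes (a hedged sketch of the patterns above):
# Fails the scan: subprocess and eval are forbidden patterns
import subprocess
subprocess.run(["ffprobe", "-i", "clip.mp4"])   # system command execution
eval("1 + 1")                                   # dynamic code execution

# Passes: ordinary in-process Python
def word_count(text: str) -> int:
    return len(text.split())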

Using Your Plugin

After publishing, use your plugin when creating collections:
from mixpeek import Mixpeek

client = Mixpeek(api_key="sk_...")

# Create collection with your custom extractor
collection = client.collections.create(
    collection_name="my_collection",
    source={"type": "bucket", "bucket_ids": ["bkt_..."]},
    feature_extractor={
        "feature_extractor_name": "my_extractor",  # Your plugin name
        "version": "v1",
        "parameters": {
            "threshold": 0.7,  # Your custom parameters
            "text_column": "content"
        }
    }
)

Examples

Sentiment Classifier

mixpeek plugin init sentiment_classifier --category text \
  --description "Classify text sentiment as positive, negative, or neutral"
Edit processors/core.py:
from transformers import pipeline

class SentimentClassifierProcessor:
    def __init__(self, config, progress_actor=None):
        self.config = config
        self._model = None

    def _load_model(self):
        if self._model is None:
            self._model = pipeline("sentiment-analysis")

    def __call__(self, batch):
        self._load_model()

        texts = batch[self.config.text_column].fillna("").tolist()
        results = self._model(texts)

        batch["sentiment"] = [r["label"] for r in results]
        batch["confidence"] = [r["score"] for r in results]
        return batch

Image Embedder

mixpeek plugin init image_embedder --category image \
  --description "Generate embeddings for images using CLIP"
Edit processors/core.py:
import io

import torch
from PIL import Image
from transformers import CLIPProcessor, CLIPModel

class ImageEmbedderProcessor:
    def __init__(self, config, progress_actor=None):
        self.config = config
        self._model = None
        self._processor = None

    def _load_model(self):
        if self._model is None:
            self._model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
            self._processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    def __call__(self, batch):
        self._load_model()

        embeddings = []
        for image_bytes in batch["image_bytes"]:
            image = Image.open(io.BytesIO(image_bytes))
            inputs = self._processor(images=image, return_tensors="pt")
            with torch.no_grad():
                embedding = self._model.get_image_features(**inputs)
            embeddings.append(embedding.numpy().tolist()[0])

        batch["embedding"] = embeddings
        return batch

Troubleshooting

Common Issues

Plugin not found after publishing
  • Verify you’re using the correct namespace
  • Check that the plugin passed validation (no security violations)
  • Wait a few seconds for deployment to complete
Import errors during testing
  • Ensure you’re in the plugin’s parent directory
  • Check that all dependencies are installed
  • Verify the plugin has __init__.py files
Security scan failures
  • Remove any forbidden imports (subprocess, eval, etc.)
  • Use approved libraries for network requests
  • Avoid dynamic code execution patterns
Validation errors
  • Ensure manifest.py exports metadata, input_schema, output_schema
  • Verify pipeline.py has a build_steps function
  • Check that all required files exist

Debug Mode

Enable verbose output for debugging:
mixpeek plugin test --verbose
mixpeek plugin publish --dry-run

API Reference

Plugin Management Endpoints

Endpoint                                          Method  Description
/v1/namespaces/{id}/plugins/uploads               POST    Get presigned upload URL
/v1/namespaces/{id}/plugins/uploads/{id}/confirm  POST    Confirm upload
/v1/namespaces/{id}/plugins                       GET     List plugins
/v1/namespaces/{id}/plugins/{id}                  GET     Get plugin details
/v1/namespaces/{id}/plugins/{id}                  DELETE  Delete plugin
/v1/namespaces/{id}/plugins/{id}/deploy           POST    Deploy plugin

Questions? Contact support at [email protected] or visit mixpeek.com/support.