The Mixpeek CLI (`mixpeek`) is a command-line tool for developing custom feature extractors that run on Mixpeek’s infrastructure. Build your own processing logic and deploy it alongside Mixpeek’s built-in extractors.
Custom plugins are an Enterprise feature that requires dedicated infrastructure. Contact us to enable custom plugins for your organization.
Features
- **Plugin Templates**: Generate complete plugin scaffolds with schemas, pipelines, and tests
- **Local Testing**: Validate structure, run tests, and test with sample data
- **Security Scanning**: Automatic security validation before deployment
- **Easy Publishing**: Upload plugins to your namespace with a single command
Installation
Install the Mixpeek SDK, which includes the CLI:
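```bash
# package name assumed to be the standard PyPI distribution
pip install mixpeek
```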
Verify installation:
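Any invocation that prints usage confirms the install; the standard `--help` flag is assumed here:

```bash
mixpeek --help
```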
Quick Start
Get a custom plugin running in 5 minutes:
```bash
# 1. Create a new plugin
mixpeek plugin init my_extractor --category text

# 2. Navigate to the plugin directory
cd my_extractor

# 3. Edit your processing logic
#    Open processors/core.py and add your code

# 4. Test locally
mixpeek plugin test

# 5. Publish to your namespace
mixpeek plugin publish --namespace ns_your_namespace
```
Configuration
Environment Variables
Set these environment variables to avoid passing them with every command:
```bash
export MIXPEEK_API_KEY="sk_your_api_key"
export MIXPEEK_NAMESPACE="ns_your_namespace"
export MIXPEEK_BASE_URL="https://api.mixpeek.com"  # Optional, defaults to production
```
Command-Line Options
Global options available for all commands:
| Option | Environment Variable | Description |
|---|---|---|
| `--api-key` | `MIXPEEK_API_KEY` | Your Mixpeek API key |
| `--base-url` | `MIXPEEK_BASE_URL` | API base URL (default: `https://api.mixpeek.com`) |
Commands
mixpeek plugin init
Create a new plugin from a template with all required files and structure.
```bash
mixpeek plugin init <name> [options]
```
Arguments:
`<name>` - Plugin name (letters, numbers, underscores, hyphens)
Options:
| Option | Short | Default | Description |
|---|---|---|---|
| `--description` | `-d` | "A custom Mixpeek extractor" | Plugin description |
| `--category` | `-c` | `text` | Plugin category |
| `--author` | `-a` | Git `user.name` | Author name |
| `--output` | `-o` | Current directory | Output directory |
Categories:
- `text` - Text processing (NLP, classification, embeddings)
- `image` - Image analysis (detection, recognition)
- `video` - Video processing (scene detection, analysis)
- `audio` - Audio processing (transcription, analysis)
- `document` - Document processing (PDF, OCR)
- `multimodal` - Multiple content types
Examples:
```bash
# Basic plugin
mixpeek plugin init sentiment_analyzer

# With options
mixpeek plugin init product_classifier \
  --category text \
  --description "Classify products into categories" \
  --author "Your Name"

# Image processing plugin
mixpeek plugin init face_detector --category image

# Specify output directory
mixpeek plugin init my_plugin --output ./plugins/
```
mixpeek plugin test
Validate plugin structure, run tests, and optionally test with sample data.
```bash
mixpeek plugin test [options]
```
Options:
| Option | Short | Default | Description |
|---|---|---|---|
| `--path` | `-p` | `.` | Plugin directory path |
| `--sample-data` | `-s` | None | Sample data file (JSON/CSV) |
| `--verbose` | `-v` | `False` | Show detailed output |
What it validates:
- **Structure**: Required files exist (`manifest.py`, `pipeline.py`)
- **Schemas**: Input/output schemas are valid Pydantic models
- **Pipeline**: A `build_steps()` function is defined and callable
- **Tests**: Runs pytest if tests exist in the `tests/` directory (see the sketch below)
- **Sample Data**: Processes sample data through the pipeline (if provided)
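For example, a minimal `tests/test_plugin.py` might look like this (a sketch; the class names assume the `my_extractor` template shown under Plugin Structure below):

```python
# tests/test_plugin.py - minimal sketch against the generated template's processor
import pandas as pd

from my_extractor.processors.core import MyExtractorConfig, MyExtractorProcessor


def test_processor_adds_output_column():
    processor = MyExtractorProcessor(MyExtractorConfig(threshold=0.5))
    batch = pd.DataFrame({"text": ["hello world", None]})  # None exercises fillna("")
    result = processor(batch)
    assert "output" in result.columns
    assert len(result["output"]) == 2
```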
Examples:
```bash
# Test current directory
mixpeek plugin test

# Test specific plugin
mixpeek plugin test --path ./my_extractor

# Test with sample data
mixpeek plugin test --sample-data samples.json --verbose

# CSV sample data
mixpeek plugin test -s test_data.csv -v
```
Sample Data Format:
JSON:
```json
[
  {"text": "Sample text 1", "metadata": {"source": "test"}},
  {"text": "Sample text 2", "metadata": {"source": "test"}}
]
```
CSV:
```csv
text,metadata.source
"Sample text 1","test"
"Sample text 2","test"
```
mixpeek plugin validate
Validate plugin structure without running full tests. Alias for test without sample data.
```bash
mixpeek plugin validate [options]
```
Options:
| Option | Short | Default | Description |
|---|---|---|---|
| `--path` | `-p` | `.` | Plugin directory path |
mixpeek plugin publish
Upload and deploy your plugin to Mixpeek.
```bash
mixpeek plugin publish [options]
```
Options:
| Option | Short | Default | Description |
|---|---|---|---|
| `--path` | `-p` | `.` | Plugin directory path |
| `--namespace` | `-n` | `MIXPEEK_NAMESPACE` | Target namespace ID |
| `--dry-run` | | `False` | Validate without uploading |
What happens during publish:
1. Validates plugin structure and schemas
2. Runs a security scan for forbidden patterns
3. Creates a compressed archive (`.tar.gz`)
4. Requests a presigned upload URL from the API
5. Uploads the archive directly to S3
6. Confirms the upload and triggers validation
7. The plugin becomes available in your namespace
Examples:
```bash
# Publish to default namespace
mixpeek plugin publish

# Specify namespace
mixpeek plugin publish --namespace ns_abc123

# Dry run (validate without uploading)
mixpeek plugin publish --dry-run

# Publish specific plugin
mixpeek plugin publish --path ./plugins/my_extractor
```
mixpeek plugin list
List all plugins available in your namespace.
```bash
mixpeek plugin list [options]
```
Options:
| Option | Short | Default | Description |
|---|---|---|---|
| `--namespace` | `-n` | `MIXPEEK_NAMESPACE` | Namespace ID |
| `--source` | `-s` | `all` | Filter by source type |
Source Types:
- `all` - All plugins
- `builtin` - Mixpeek’s built-in extractors
- `custom` - Your custom plugins
- `community` - Community-contributed plugins
Examples:
```bash
# List all plugins
mixpeek plugin list

# List only custom plugins
mixpeek plugin list --source custom

# List plugins in specific namespace
mixpeek plugin list --namespace ns_abc123
```
Plugin Structure
Every plugin has the following structure:
```text
my_extractor/
├── __init__.py           # Package exports
├── manifest.py           # Metadata and schemas
├── pipeline.py           # Pipeline definition
├── realtime.py           # HTTP endpoint (optional)
├── processors/
│   ├── __init__.py
│   └── core.py           # Your processing logic
├── tests/
│   └── test_plugin.py    # Unit tests
├── README.md             # Documentation
└── pyproject.toml        # Package config
```
manifest.py
Defines plugin metadata and Pydantic schemas:
```python
from pydantic import BaseModel, Field


class MyExtractorInput(BaseModel):
    """Input schema - what your plugin accepts."""
    text: str = Field(..., description="Input text to process")


class MyExtractorOutput(BaseModel):
    """Output schema - what your plugin produces."""
    result: str = Field(..., description="Processed result")
    confidence: float = Field(..., ge=0.0, le=1.0)


class MyExtractorParams(BaseModel):
    """Parameters users can configure when creating collections."""
    threshold: float = Field(default=0.5, ge=0.0, le=1.0)
    text_column: str = Field(default="text")


# Plugin metadata
metadata = {
    "name": "my_extractor",
    "version": "v1",
    "description": "My custom extractor",
    "category": "text",
    "author": "Your Name",
}

input_schema = MyExtractorInput
output_schema = MyExtractorOutput
parameter_schema = MyExtractorParams
```
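Since these are ordinary Pydantic models, you can sanity-check the parameter schema directly; a quick illustration, run from inside the plugin directory (`model_dump()` assumes Pydantic v2):

```python
from pydantic import ValidationError

from manifest import MyExtractorParams

params = MyExtractorParams(threshold=0.7)
print(params.model_dump())  # {'threshold': 0.7, 'text_column': 'text'}

try:
    MyExtractorParams(threshold=1.5)  # violates the le=1.0 constraint
except ValidationError as err:
    print(err)
```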
pipeline.py
Defines how your plugin processes data using the declarative pipeline system:
```python
from typing import Any, Dict, Optional

from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    RowCondition,
    StepDefinition,
    build_pipeline_steps,
)
from shared.collection.features.extractors.models import ExtractorDefinition

from .manifest import MyExtractorParams, metadata
from .processors.core import MyExtractorConfig, MyExtractorProcessor


def build_steps(
    extractor_request: Any,
    container: Optional[Any] = None,
    base_steps: Optional[list] = None,
    dataset_size: Optional[int] = None,
    content_flags: Optional[dict] = None,
) -> Dict[str, Any]:
    """Build the extraction pipeline."""
    # Get user-configured parameters
    params = MyExtractorParams(**(extractor_request.extractor_config.parameters or {}))

    # Define pipeline steps
    steps = [
        StepDefinition(
            service_class=MyExtractorProcessor,
            resource_type=ResourceType.CPU,  # CPU, GPU, or API
            config=MyExtractorConfig(
                threshold=params.threshold,
                text_column=params.text_column,
            ),
            condition=RowCondition.IS_TEXT,  # Only process text rows
        ),
    ]

    pipeline = PipelineDefinition(name="my_extractor", version="v1", steps=steps)
    pipeline_steps = build_pipeline_steps(pipeline, dataset_size=dataset_size)

    return {
        "steps": (base_steps or []) + pipeline_steps,
        "prepare": lambda ds: ds,
    }


extractor_definition = ExtractorDefinition(
    name=metadata["name"],
    version=metadata["version"],
    extractor_function_path=build_steps,
    requires_file_resolution=False,
)
```
processors/core.py
Your actual processing logic:
```python
from dataclasses import dataclass
from typing import Tuple

import pandas as pd


@dataclass
class MyExtractorConfig:
    """Configuration passed from the pipeline."""
    threshold: float = 0.5
    text_column: str = "text"


class MyExtractorProcessor:
    """Batch processor - instantiated once, called repeatedly."""

    def __init__(self, config: MyExtractorConfig, progress_actor=None):
        self.config = config
        self._model = None  # Lazy-load expensive resources

    def _load_model(self):
        """Load the model on first use."""
        if self._model is None:
            # Load your model here, e.g.:
            # self._model = transformers.AutoModel.from_pretrained("...")
            self._model = "loaded"

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        """Process a batch of data."""
        self._load_model()
        results = []
        for text in batch[self.config.text_column].fillna(""):
            result, confidence = self._process_single(text)
            results.append({"result": result, "confidence": confidence})
        batch["output"] = results
        return batch

    def _process_single(self, text: str) -> Tuple[str, float]:
        """Process a single item."""
        # Your processing logic here
        return f"Processed: {text[:50]}", 0.95
```
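Because the processor is just a callable over a pandas DataFrame, you can smoke-test it from a REPL before running `mixpeek plugin test`. A sketch, run from inside the plugin directory:

```python
import pandas as pd

from processors.core import MyExtractorConfig, MyExtractorProcessor

proc = MyExtractorProcessor(MyExtractorConfig(threshold=0.7))
df = pd.DataFrame({"text": ["First sample", "Second sample"]})
print(proc(df)["output"].tolist())
# [{'result': 'Processed: First sample', 'confidence': 0.95}, ...]
```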
realtime.py (Optional)
HTTP endpoint for real-time inference:
```python
from typing import Any, Dict

from shared.plugins.inference.serve import BaseInferenceService, ServeDeploymentConfig

from .processors.core import MyExtractorConfig, MyExtractorProcessor


class MyExtractorRealtimeService(BaseInferenceService):
    """Real-time HTTP endpoint deployed via Ray Serve."""

    serve_config = ServeDeploymentConfig(enabled=True, num_replicas=1)

    def __init__(self):
        super().__init__()
        self._processor = None

    async def _process_single(
        self, inputs: Dict[str, Any], parameters: Dict[str, Any]
    ) -> Dict[str, Any]:
        if self._processor is None:
            self._processor = MyExtractorProcessor(MyExtractorConfig())
        text = inputs.get("text", "")
        result, confidence = self._processor._process_single(text)
        return {"result": result, "confidence": confidence}


__all__ = ["MyExtractorRealtimeService"]
```
Resource Types
Tell the framework what resources your processor needs:
| ResourceType | Use For | Example Use Cases |
|---|---|---|
| `CPU` | CPU-bound processing | Text embeddings (E5, MiniLM), classification, NLP |
| `GPU` | GPU-accelerated models | Local Whisper, CLIP, custom PyTorch models |
| `API` | External API calls | OpenAI, Anthropic, Google Vertex AI |
The framework automatically configures:
- Actor resources (`num_cpus`, `num_gpus`)
- Batch sizes based on hardware
- Concurrency scaling based on dataset size
Row Conditions
Filter which rows your processor handles:
```python
from engine.plugins.extractors.pipeline import RowCondition

# Available conditions
RowCondition.IS_TEXT            # Text content only
RowCondition.IS_IMAGE           # Image content only
RowCondition.IS_VIDEO           # Video content only
RowCondition.IS_AUDIO           # Audio content only
RowCondition.IS_PDF             # PDF documents
RowCondition.IS_VIDEO_OR_AUDIO  # Video or audio
RowCondition.ALWAYS             # All rows (default)
```
Using Built-in Services
Compose existing Mixpeek services into your pipeline:
```python
from engine.inference.openai.whisper_api.services import WhisperTranscriptionBatch
from engine.inference.intfloat.multilingual_e5_large_instruct.services import E5TextEmbeddingBatch
from engine.plugins.extractors.pipeline import (
    PipelineDefinition,
    ResourceType,
    RowCondition,
    StepDefinition,
    build_pipeline_steps,
)
from shared.inference.openai.whisper_api.models import InferenceConfigs as WhisperConfig
from shared.inference.intfloat.multilingual_e5_large_instruct.models import InferenceConfigs as E5Config


def build_steps(extractor_request, **kwargs):
    steps = [
        # Step 1: Transcribe with Whisper
        StepDefinition(
            service_class=WhisperTranscriptionBatch,
            resource_type=ResourceType.API,
            config=WhisperConfig(
                bytes_column="audio_bytes",
                output_column_name="transcription",
            ),
            condition=RowCondition.IS_VIDEO_OR_AUDIO,
        ),
        # Step 2: Embed with E5
        StepDefinition(
            service_class=E5TextEmbeddingBatch,
            resource_type=ResourceType.CPU,
            config=E5Config(
                text_column="transcription",
                output_column_name="embedding",
            ),
        ),
    ]
    pipeline = PipelineDefinition(name="transcribe_and_embed", version="v1", steps=steps)
    return {"steps": build_pipeline_steps(pipeline), "prepare": lambda ds: ds}
```
Available Services
| Category | Service | Description |
|---|---|---|
| Embeddings | `E5TextEmbeddingBatch` | E5 text embeddings (1024-dim) |
| Embeddings | `MiniLMEmbeddingBatch` | MiniLM embeddings (384-dim) |
| Embeddings | `CLIPEmbeddingBatch` | CLIP image/text embeddings |
| Embeddings | `SigLIPEmbeddingBatch` | SigLIP embeddings |
| Transcription | `WhisperTranscriptionBatch` | Whisper API transcription |
| Video | `FFmpegParallelChunking` | Video segmentation |
| Face | `FaceDetectionBatch` | Face detection |
| Face | `FaceEmbeddingBatch` | Face embeddings |
| Reranking | `BGERerankerBatch` | BGE reranking |
| LLM | `VertexAIBatch` | Google Vertex AI |
Security
Plugins are security-scanned before deployment. The following are **not allowed**:

| Forbidden | Reason |
|---|---|
| `subprocess`, `os.system` | System command execution |
| `eval`, `exec` | Dynamic code execution |
| `__import__`, `importlib` | Dynamic imports |
| `socket` | Direct network access |
| `ctypes` | Low-level memory access |
Plugins that contain forbidden patterns will fail validation and cannot be deployed.
Using Your Plugin
After publishing, use your plugin when creating collections:
```python
from mixpeek import Mixpeek

client = Mixpeek(api_key="sk_...")

# Create a collection with your custom extractor
collection = client.collections.create(
    collection_name="my_collection",
    source={"type": "bucket", "bucket_ids": ["bkt_..."]},
    feature_extractor={
        "feature_extractor_name": "my_extractor",  # Your plugin name
        "version": "v1",
        "parameters": {
            "threshold": 0.7,  # Your custom parameters
            "text_column": "content",
        },
    },
)
```
Examples
Sentiment Classifier
```bash
mixpeek plugin init sentiment_classifier --category text \
  --description "Classify text sentiment as positive, negative, or neutral"
```

Edit `processors/core.py`:
```python
from transformers import pipeline


class SentimentClassifierProcessor:
    def __init__(self, config, progress_actor=None):
        self.config = config
        self._model = None

    def _load_model(self):
        if self._model is None:
            self._model = pipeline("sentiment-analysis")

    def __call__(self, batch):
        self._load_model()
        texts = batch[self.config.text_column].fillna("").tolist()
        results = self._model(texts)
        batch["sentiment"] = [r["label"] for r in results]
        batch["confidence"] = [r["score"] for r in results]
        return batch
```
Image Embedder
```bash
mixpeek plugin init image_embedder --category image \
  --description "Generate embeddings for images using CLIP"
```

Edit `processors/core.py`:
```python
import io

import torch
from PIL import Image
from transformers import CLIPModel, CLIPProcessor


class ImageEmbedderProcessor:
    def __init__(self, config, progress_actor=None):
        self.config = config
        self._model = None
        self._processor = None

    def _load_model(self):
        if self._model is None:
            self._model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
            self._processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

    def __call__(self, batch):
        self._load_model()
        embeddings = []
        for image_bytes in batch["image_bytes"]:
            image = Image.open(io.BytesIO(image_bytes))
            inputs = self._processor(images=image, return_tensors="pt")
            with torch.no_grad():
                embedding = self._model.get_image_features(**inputs)
            embeddings.append(embedding.numpy().tolist()[0])
        batch["embedding"] = embeddings
        return batch
```
Troubleshooting
Common Issues
**Plugin not found after publishing**
- Verify you’re using the correct namespace
- Check that the plugin passed validation (no security violations)
- Wait a few seconds for deployment to complete
**Import errors during testing**
- Ensure you’re in the plugin’s parent directory
- Check that all dependencies are installed
- Verify the plugin has `__init__.py` files
**Security scan failures**
- Remove any forbidden imports (`subprocess`, `eval`, etc.)
- Use approved libraries for network requests
- Avoid dynamic code execution patterns
**Validation errors**
- Ensure `manifest.py` exports `metadata`, `input_schema`, `output_schema`
- Verify `pipeline.py` has a `build_steps` function
- Check that all required files exist
Debug Mode
Enable verbose output for debugging:
```bash
mixpeek plugin test --verbose
mixpeek plugin publish --dry-run
```
API Reference
Plugin Management Endpoints
| Endpoint | Method | Description |
|---|---|---|
| `/v1/namespaces/{id}/plugins/uploads` | POST | Get presigned upload URL |
| `/v1/namespaces/{id}/plugins/uploads/{id}/confirm` | POST | Confirm upload |
| `/v1/namespaces/{id}/plugins` | GET | List plugins |
| `/v1/namespaces/{id}/plugins/{id}` | GET | Get plugin details |
| `/v1/namespaces/{id}/plugins/{id}` | DELETE | Delete plugin |
| `/v1/namespaces/{id}/plugins/{id}/deploy` | POST | Deploy plugin |
Next Steps
Questions? Contact support at [email protected] or visit mixpeek.com/support.