
Installation

pip install mixpeek
mixpeek --version

Quick Start

# 1. Create plugin
mixpeek plugin init my_extractor --category text

# 2. Edit processors/core.py with your logic

# 3. Test locally
cd my_extractor && mixpeek plugin test

# 4. Publish
mixpeek plugin publish --namespace ns_xxx

See Custom Plugins for the full plugin development guide.

Configuration

export MIXPEEK_API_KEY="sk_your_api_key"
export MIXPEEK_NAMESPACE="ns_your_namespace"

Option      Environment Variable  Description
--api-key   MIXPEEK_API_KEY       Your Mixpeek API key
--base-url  MIXPEEK_BASE_URL      API base URL (default: https://api.mixpeek.com)
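
The option and environment-variable pairs above can be resolved in a few lines of Python. This is a minimal sketch, not the CLI's actual implementation: the helper name `resolve_settings` is hypothetical, and the precedence (explicit option first, then environment variable, then the documented default) is an assumption.

```python
import os

# Default taken from the table above.
DEFAULT_BASE_URL = "https://api.mixpeek.com"


def resolve_settings(api_key=None, base_url=None, env=None):
    """Hypothetical helper: explicit values win over environment variables (assumed precedence)."""
    env = os.environ if env is None else env
    resolved_key = api_key or env.get("MIXPEEK_API_KEY")
    if not resolved_key:
        raise ValueError("No API key: pass --api-key or set MIXPEEK_API_KEY")
    return {
        "api_key": resolved_key,
        "base_url": base_url or env.get("MIXPEEK_BASE_URL") or DEFAULT_BASE_URL,
    }
```

Passing `env` explicitly keeps the helper easy to test without touching the real process environment.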

Commands

mixpeek plugin init

Create a new plugin from a template.
mixpeek plugin init <name> [options]

Option         Description
--category     text, image, video, audio, document, multimodal
--description  Plugin description
--author       Author name
--output       Output directory

# Examples
mixpeek plugin init sentiment_analyzer --category text
mixpeek plugin init face_detector --category image --description "Detect faces"

mixpeek plugin test

Validate and test a plugin locally.
mixpeek plugin test [options]

Option         Description
--path         Plugin directory (default: .)
--sample-data  JSON/CSV file with test data
--verbose      Detailed output

Validates:
  • Structure (manifest.py, pipeline.py exist)
  • Schemas (valid Pydantic models)
  • Pipeline (build_steps() callable)
  • Tests (runs pytest if tests/ exists)
# Examples
mixpeek plugin test
mixpeek plugin test --path ./my_extractor --verbose
mixpeek plugin test --sample-data samples.json
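
The first item under "Validates" (the structure check) is easy to reproduce before running the CLI. The sketch below is an illustration only: `missing_plugin_files` is a hypothetical helper, and it covers just the two files that check names, not the schema, pipeline, or pytest steps.

```python
from pathlib import Path

# The two files the structure check names.
REQUIRED_FILES = ("manifest.py", "pipeline.py")


def missing_plugin_files(plugin_dir):
    """Return the required files that are absent from plugin_dir."""
    root = Path(plugin_dir)
    return [name for name in REQUIRED_FILES if not (root / name).is_file()]
```

An empty list means the directory passes this particular check.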

mixpeek plugin publish

Upload and deploy a plugin to Mixpeek.
mixpeek plugin publish [options]

Option       Description
--path       Plugin directory
--namespace  Target namespace ID
--dry-run    Validate without uploading

What happens:
  1. Validates structure and schemas
  2. Runs security scan
  3. Creates .tar.gz archive
  4. Uploads to S3 via presigned URL
  5. Confirms and triggers deployment
# Examples
mixpeek plugin publish
mixpeek plugin publish --namespace ns_abc123 --dry-run
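
Step 3 of "What happens" (creating the .tar.gz archive) can be approximated with the standard library to inspect what would be packaged. This is a sketch under assumptions: the function name `build_archive` is hypothetical, and the real CLI's archive layout and exclusion rules are not documented here. Keeping the plugin directory as the top-level entry and skipping `__pycache__` are choices made for this example.

```python
import tarfile
from pathlib import Path


def build_archive(plugin_dir, archive_path):
    """Pack every file under plugin_dir into a gzip-compressed tarball."""
    root = Path(plugin_dir).resolve()
    out = Path(archive_path).resolve()
    with tarfile.open(out, "w:gz") as tar:
        for path in sorted(root.rglob("*")):
            # Skip bytecode caches and the archive itself if it lives inside root.
            if "__pycache__" in path.parts or path.resolve() == out:
                continue
            if path.is_file():
                # Assumed layout: keep the plugin directory as the top-level entry.
                arcname = (Path(root.name) / path.relative_to(root)).as_posix()
                tar.add(path, arcname=arcname)
    return out
```

Listing the result with `tarfile.open(path).getnames()` shows exactly which files went in.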

mixpeek plugin list

List plugins in a namespace.
mixpeek plugin list [options]

Option       Description
--namespace  Namespace ID
--source     all, builtin, custom, community

mixpeek plugin list --source custom

Plugin Structure

my_extractor/
├── manifest.py      # Metadata + schemas
├── pipeline.py      # Batch processing
├── realtime.py      # HTTP endpoint (optional, Enterprise)
└── processors/
    └── core.py      # Your logic

manifest.py

from pydantic import BaseModel, Field
from typing import List

class MyInput(BaseModel):
    text: str

class MyOutput(BaseModel):
    embedding: List[float]

class MyParams(BaseModel):
    threshold: float = Field(default=0.5)

metadata = {
    "feature_extractor_name": "my_extractor",
    "version": "1.0.0",
    "description": "My extractor",
    "category": "text",
}

input_schema = MyInput
output_schema = MyOutput
parameter_schema = MyParams
supported_input_types = ["text"]

features = [
    {
        "feature_name": "my_embedding",
        "feature_type": "embedding",
        "embedding_dim": 384,
        "distance_metric": "cosine",
    },
]
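
Because `embedding_dim` in `features` must agree with what the processor actually emits (all-MiniLM-L6-v2, used later on this page, produces 384-dimensional vectors), a small consistency check can catch mismatches early. The sketch below is an illustration, not part of the Mixpeek tooling: `check_features` and the choice of required keys are assumptions.

```python
# Keys this sketch treats as mandatory (an assumption, not a documented rule).
REQUIRED_FEATURE_KEYS = {"feature_name", "feature_type"}


def check_features(features, sample_row):
    """Compare declared features against one output row; return a list of problems."""
    problems = []
    for feature in features:
        missing = REQUIRED_FEATURE_KEYS - feature.keys()
        if missing:
            problems.append(f"feature missing keys: {sorted(missing)}")
            continue
        name = feature["feature_name"]
        if feature["feature_type"] != "embedding":
            continue
        vector = sample_row.get(name)
        if vector is None:
            problems.append(f"{name}: no value in sample row")
        elif len(vector) != feature.get("embedding_dim"):
            problems.append(
                f"{name}: expected {feature.get('embedding_dim')} dims, got {len(vector)}"
            )
    return problems
```

Running it against one row of processor output before publishing gives a quick signal that the manifest and the model still match.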

processors/core.py

from dataclasses import dataclass
import pandas as pd

@dataclass
class MyConfig:
    threshold: float = 0.5

class MyProcessor:
    def __init__(self, config: MyConfig, progress_actor=None):
        self.config = config
        self._model = None

    def _load_model(self):
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        self._load_model()
        texts = batch["text"].fillna("").tolist()
        batch["my_embedding"] = self._model.encode(texts).tolist()
        return batch
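
`MyProcessor` defers the model import and load until the first batch arrives, so constructing the processor stays cheap. The same lazy-loading pattern can be shown in isolation without pandas or a real model. In the sketch below, `LazyResource` and `FakeEncoder` are hypothetical names introduced only for illustration.

```python
class LazyResource:
    """Build an expensive object on first use and reuse it afterwards."""

    def __init__(self, loader):
        self._loader = loader   # zero-argument callable that builds the object
        self._value = None
        self.load_count = 0     # exposed only so the behavior is observable

    def get(self):
        if self._value is None:
            self._value = self._loader()
            self.load_count += 1
        return self._value


class FakeEncoder:
    """Hypothetical stand-in for a real embedding model."""

    def encode(self, texts):
        # One-dimensional "embedding": the length of each text.
        return [[float(len(t))] for t in texts]


model = LazyResource(FakeEncoder)
first = model.get().encode(["hello", ""])
second = model.get().encode(["abc"])
```

The loader runs once, on the first `get()`; the second call reuses the same instance.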

pipeline.py

from typing import Any, Dict, Optional
from engine.plugins.extractors.pipeline import (
    PipelineDefinition, ResourceType, RowCondition, StepDefinition, build_pipeline_steps
)
from .manifest import MyParams, metadata
from .processors.core import MyConfig, MyProcessor

def build_steps(
    extractor_request: Any,
    container: Optional[Any] = None,
    base_steps: Optional[list] = None,
    **kwargs
) -> Dict[str, Any]:
    params = MyParams(**(extractor_request.extractor_config.parameters or {}))

    steps = [
        StepDefinition(
            service_class=MyProcessor,
            resource_type=ResourceType.CPU,
            config=MyConfig(threshold=params.threshold),
            condition=RowCondition.IS_TEXT,
        ),
    ]

    pipeline = PipelineDefinition(name=metadata["feature_extractor_name"], version=metadata["version"], steps=steps)
    return {"steps": (base_steps or []) + build_pipeline_steps(pipeline), "prepare": lambda ds: ds}

realtime.py (Enterprise)

from typing import Any, Dict

class RealtimeHandler:
    def __init__(self):
        self._model = None

    def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
        if self._model is None:
            from sentence_transformers import SentenceTransformer
            self._model = SentenceTransformer("all-MiniLM-L6-v2")

        text = request.get("text", "")
        embedding = self._model.encode([text])[0].tolist()
        return {"embedding": embedding}

Resource Types

Type              Use For
ResourceType.CPU  Text embeddings, classification
ResourceType.GPU  Local models (Whisper, CLIP)
ResourceType.API  External APIs (OpenAI, Vertex)

Row Conditions

RowCondition.IS_TEXT       # text/* MIME types
RowCondition.IS_IMAGE      # image/* MIME types
RowCondition.IS_VIDEO      # video/* MIME types
RowCondition.IS_AUDIO      # audio/* MIME types
RowCondition.IS_PDF        # application/pdf
RowCondition.ALWAYS        # All rows (default)
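
The comments above describe each condition as a MIME-type match. A rough model of that matching, written as plain Python, looks like the following. It is a sketch of the documented behavior, not the engine's code: `row_matches` and the string keys are hypothetical, and stripping MIME parameters such as `; charset=utf-8` is an assumption.

```python
# Prefix patterns end with "/"; anything else is matched exactly.
CONDITION_PATTERNS = {
    "IS_TEXT": "text/",
    "IS_IMAGE": "image/",
    "IS_VIDEO": "video/",
    "IS_AUDIO": "audio/",
    "IS_PDF": "application/pdf",
}


def row_matches(condition, mime_type):
    """Return True if a row with this MIME type would satisfy the condition."""
    if condition == "ALWAYS":
        return True
    pattern = CONDITION_PATTERNS[condition]
    # Drop parameters and normalize case (assumed behavior).
    mime = (mime_type or "").split(";")[0].strip().lower()
    if pattern.endswith("/"):
        return mime.startswith(pattern)
    return mime == pattern
```

Under this model, a step with `IS_TEXT` would skip a `image/png` row, while `ALWAYS` would process it.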

Security Constraints

Plugins are scanned before deployment. Forbidden:

Pattern                Reason
subprocess, os.system  Shell execution
eval, exec             Dynamic code
socket                 Direct network
ctypes                 Memory access
__import__             Dynamic imports
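
To catch these patterns before publishing, a local pre-check can walk the syntax tree of each source file. The sketch below uses Python's `ast` module and is only an approximation: how the real Mixpeek scanner works is not documented here, and `find_forbidden` is a hypothetical helper that covers direct imports and direct calls, not aliased or obfuscated uses.

```python
import ast

FORBIDDEN_MODULES = {"subprocess", "socket", "ctypes"}
FORBIDDEN_CALLS = {"eval", "exec", "__import__"}
FORBIDDEN_ATTRS = {("os", "system")}


def find_forbidden(source):
    """Return sorted (line, name) pairs for forbidden imports and calls in source."""
    findings = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in FORBIDDEN_MODULES:
                    findings.append((node.lineno, alias.name))
        elif isinstance(node, ast.ImportFrom):
            if (node.module or "").split(".")[0] in FORBIDDEN_MODULES:
                findings.append((node.lineno, node.module))
        elif isinstance(node, ast.Call):
            func = node.func
            if isinstance(func, ast.Name) and func.id in FORBIDDEN_CALLS:
                findings.append((node.lineno, func.id))
            elif (
                isinstance(func, ast.Attribute)
                and isinstance(func.value, ast.Name)
                and (func.value.id, func.attr) in FORBIDDEN_ATTRS
            ):
                findings.append((node.lineno, f"{func.value.id}.{func.attr}"))
    return sorted(findings)
```

The source is only parsed, never executed, so the check is safe to run on untrusted files.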

Using Your Plugin

After publishing:
# `client` is assumed to be an already-initialized Mixpeek SDK client.
client.collections.create(
    collection_name="my_collection",
    source={"type": "bucket", "bucket_ids": ["bkt_..."]},
    feature_extractor={
        "feature_extractor_name": "my_extractor",
        "version": "1.0.0",
        "parameters": {"threshold": 0.7}
    }
)

API Reference

Endpoint                                          Method  Description
/v1/namespaces/{id}/plugins/uploads               POST    Get presigned upload URL
/v1/namespaces/{id}/plugins/uploads/{id}/confirm  POST    Confirm upload
/v1/namespaces/{id}/plugins                       GET     List plugins
/v1/namespaces/{id}/plugins/{id}                  GET     Get plugin details
/v1/namespaces/{id}/plugins/{id}                  DELETE  Delete plugin
/v1/namespaces/{id}/plugins/{id}/deploy           POST    Deploy for realtime (Enterprise)
/v1/namespaces/{id}/plugins/{id}/status           GET     Check deployment status
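
When calling these routes directly, it helps to build the paths in one place. The helper below is a sketch covering the list, detail, deploy, and status routes from the table; the upload routes are left out. `plugin_url` is a hypothetical name, the IDs in the usage lines are placeholders, and authentication headers are deliberately omitted because their format is not specified on this page.

```python
from urllib.parse import quote

# Default base URL from the Configuration section.
BASE_URL = "https://api.mixpeek.com"


def plugin_url(namespace_id, plugin_id=None, action=None, base_url=BASE_URL):
    """Build a plugin endpoint URL; action is e.g. "deploy" or "status"."""
    parts = ["v1", "namespaces", namespace_id, "plugins"]
    if plugin_id is not None:
        parts.append(plugin_id)
    if action is not None:
        parts.append(action)
    # Percent-encode each segment so unusual IDs cannot alter the path.
    return base_url.rstrip("/") + "/" + "/".join(quote(p, safe="") for p in parts)


list_url = plugin_url("ns_abc123")
status_url = plugin_url("ns_abc123", "plugin_placeholder", "status")
```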

Troubleshooting

Issue                Solution
Plugin not found     Check namespace, wait for deployment
Import errors        Ensure __init__.py files exist
Security scan fails  Remove forbidden patterns
Validation errors    Check manifest.py exports metadata/schemas

Debug mode:
mixpeek plugin test --verbose
mixpeek plugin publish --dry-run
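
For the "Import errors" row, a short script can list the directories that contain Python files but no `__init__.py`. This is a sketch with a hypothetical helper name, `packages_missing_init`. Whether the plugin root itself needs an `__init__.py` is an assumption, based on the relative imports in pipeline.py.

```python
from pathlib import Path


def packages_missing_init(plugin_dir):
    """Return directories (relative to plugin_dir) holding .py files but no __init__.py."""
    root = Path(plugin_dir)
    candidates = [root, *sorted(p for p in root.rglob("*") if p.is_dir())]
    missing = []
    for directory in candidates:
        if directory.name == "__pycache__":
            continue
        has_python = any(directory.glob("*.py"))
        if has_python and not (directory / "__init__.py").is_file():
            # The root directory is reported as ".".
            missing.append(directory.relative_to(root).as_posix())
    return missing
```

An empty result rules out the most common cause of import failures before re-running `mixpeek plugin test --verbose`.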