What You’ll Build

A custom text embedding plugin that:
  1. Generates 128-dimensional embeddings from text
  2. Processes documents via batch pipeline
  3. Supports search via a retriever

Prerequisites

  • A Mixpeek API key
  • A namespace (create one via the API if needed)
  • curl for API calls

Step 1: Create Plugin Files

Create a directory text_embed/ with three files:

manifest.py

feature_extractor_name = "text_embed"
version = "1.0.0"
description = "Text embedding plugin"

dependencies = []

features = [
    {
        "feature_type": "embedding",
        "feature_name": "text_embed_v1_embedding",
        "embedding_dim": 128,
        "distance_metric": "cosine",
    }
]

output_schema = {
    "text_embed_v1_embedding": {
        "type": "array",
        "items": {"type": "number"},
        "description": "128-dim text embedding",
    },
}

input_mappings = {"text": "text"}
tier = 1
tier_label = "SIMPLE"

# Skip GPU — this plugin is CPU-only
compute_profile = {"resource_type": "cpu"}

Use the exact key names: feature_type, feature_name, embedding_dim, distance_metric. Writing name, type, dimensions, or distance instead will fail silently.
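
Those key-name mistakes are cheap to catch before upload. A minimal sketch of a pre-upload check; check_features is a hypothetical helper, not part of Mixpeek:

```python
# Hypothetical pre-upload check that flags wrong key names in `features`.
REQUIRED_FEATURE_KEYS = {"feature_type", "feature_name", "embedding_dim", "distance_metric"}

def check_features(features):
    """Return (index, missing_keys) pairs for entries missing required keys."""
    problems = []
    for i, feat in enumerate(features):
        missing = REQUIRED_FEATURE_KEYS - set(feat)
        if missing:
            problems.append((i, sorted(missing)))
    return problems

# The manifest entry above passes:
good = [{
    "feature_type": "embedding",
    "feature_name": "text_embed_v1_embedding",
    "embedding_dim": 128,
    "distance_metric": "cosine",
}]
print(check_features(good))  # []

# The wrong key names are all flagged:
bad = [{"name": "x", "type": "embedding", "dimensions": 128, "distance": "cosine"}]
print(check_features(bad))  # [(0, ['distance_metric', 'embedding_dim', 'feature_name', 'feature_type'])]
```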

pipeline.py

import hashlib
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd


def text_to_embedding(text: str, dim: int = 128) -> List[float]:
    """Generate deterministic embedding from text."""
    hash_bytes = hashlib.sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(hash_bytes[:4], byteorder="big")
    rng = np.random.default_rng(seed)
    embedding = rng.standard_normal(dim).astype(np.float32)
    norm = np.linalg.norm(embedding)
    if norm > 0:
        embedding = embedding / norm
    return embedding.tolist()


class TextEmbedBatchProcessor:
    def __init__(self, config=None, **kwargs):
        config = config or {}
        # IMPORTANT: Custom plugins receive data in the 'data' column
        self.text_column = "data"
        self.output_column = "text_embed_v1_embedding"
        self.embedding_dim = 128

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        if batch.empty:
            return batch
        batch = batch.reset_index(drop=True)

        texts = []
        valid_indices = []
        for idx, v in enumerate(batch.get(self.text_column, [])):
            text = "" if v is None else str(v)
            if text.strip():
                texts.append(text)
                valid_indices.append(idx)

        batch[self.output_column] = None
        if texts:
            embeddings = [text_to_embedding(t, self.embedding_dim) for t in texts]
            for i, orig_idx in enumerate(valid_indices):
                batch.at[orig_idx, self.output_column] = embeddings[i]
        return batch


def build_steps(extractor_request=None, container=None,
                base_steps=None, dataset_size=None, content_flags=None):
    processor = TextEmbedBatchProcessor()
    steps = list(base_steps or [])
    steps.append(processor)
    return {"steps": steps, "prepare": lambda ds: ds}


def extract(extractor_request=None, base_steps=None,
            dataset_size=None, content_flags=None):
    result = build_steps(extractor_request=extractor_request,
                         base_steps=base_steps, dataset_size=dataset_size,
                         content_flags=content_flags)
    class PipelineResult:
        def __init__(self, steps, prepare):
            self.steps = steps
            self.prepare = prepare
    return PipelineResult(result["steps"], result["prepare"])

Always read from batch["data"], not batch["text"] or any other column name. The data column contains raw text for text blobs and S3 URLs for binary blobs.
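
Before packaging, you can sanity-check the embedding logic locally. A minimal sketch that inlines text_to_embedding from pipeline.py and mimics how the runtime delivers a batch (raw text in the data column), which is what TextEmbedBatchProcessor.__call__ does row by row:

```python
import hashlib

import numpy as np
import pandas as pd

# Inlined from pipeline.py so the snippet runs standalone.
def text_to_embedding(text, dim=128):
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
    vec = np.random.default_rng(seed).standard_normal(dim).astype(np.float32)
    norm = np.linalg.norm(vec)
    return (vec / norm if norm > 0 else vec).tolist()

# The runtime hands the processor raw text in the 'data' column.
batch = pd.DataFrame({"data": ["quantum computing", "", "classical bits"]})
batch["text_embed_v1_embedding"] = None
for idx, value in enumerate(batch["data"]):
    if str(value).strip():
        batch.at[idx, "text_embed_v1_embedding"] = text_to_embedding(str(value))

emb = batch.at[0, "text_embed_v1_embedding"]
print(len(emb))  # 128
print(batch.at[1, "text_embed_v1_embedding"])  # None (blank rows are skipped)
print(text_to_embedding("quantum computing") == emb)  # True: embeddings are deterministic
```

Because the embedding is seeded from a SHA-256 hash of the text, the same input always yields the same vector, which makes results reproducible across runs.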

Step 2: Package and Upload

# Package
cd /path/to/parent && zip -r text_embed.zip text_embed/

# Request a presigned upload URL (use the real archive size;
# 'stat -c%s' is GNU/Linux, 'stat -f%z' is macOS/BSD)
FILE_SIZE=$(stat -c%s text_embed.zip 2>/dev/null || stat -f%z text_embed.zip)
UPLOAD=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{\"name\": \"text_embed\", \"version\": \"1.0.0\", \"file_size_bytes\": $FILE_SIZE}")

UPLOAD_ID=$(echo "$UPLOAD" | jq -r '.upload_id')
PRESIGNED_URL=$(echo "$UPLOAD" | jq -r '.presigned_url')

# Upload archive
curl -s -X PUT "$PRESIGNED_URL" \
  -H "Content-Type: application/zip" \
  --data-binary @text_embed.zip

# Confirm upload
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $API_KEY"

Step 3: Deploy

# Deploy for batch processing (works on all tiers)
PLUGIN_ID="text_embed_1_0_0"
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy?deployment_type=batch_only" \
  -H "Authorization: Bearer $API_KEY"

Use ?deployment_type=batch_only unless you are on the Enterprise tier; realtime endpoints require dedicated infrastructure.

Step 4: Create Bucket and Upload Data

# Create a bucket with text schema
BUCKET=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "articles", "bucket_schema": {"properties": {"text": {"type": "text", "required": true}}}}')
BUCKET_ID=$(echo "$BUCKET" | jq -r '.bucket_id')

# Upload objects
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/objects" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "blobs": [{"property": "text", "type": "text", "data": "Quantum computing uses qubits to perform calculations exponentially faster than classical computers."}]
  }'

Blobs must be a list, not a dict. Each blob needs property, type, and data fields.
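
If you build that payload in a script, the shape is easy to validate before sending. A small sketch; make_text_blob is a hypothetical helper, not part of any Mixpeek SDK:

```python
import json

def make_text_blob(prop, text):
    # Each blob carries exactly these three fields.
    return {"property": prop, "type": "text", "data": text}

# blobs must be a list, even when sending a single blob.
payload = {"blobs": [make_text_blob("text", "Qubits enable quantum speedups.")]}
print(json.dumps(payload, indent=2))
```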

Step 5: Create Collection and Process

# Create collection with your plugin
COLLECTION=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/collections" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"text_embed_articles\",
    \"feature_extractors\": [{\"feature_extractor_name\": \"text_embed\", \"version\": \"1.0.0\"}],
    \"source\": {\"type\": \"bucket\", \"bucket_ids\": [\"$BUCKET_ID\"]}
  }")
COLLECTION_ID=$(echo "$COLLECTION" | jq -r '.collection_id')

# Trigger batch processing
BATCH=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/batches/trigger" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{\"collection_ids\": [\"$COLLECTION_ID\"]}")
BATCH_ID=$(echo "$BATCH" | jq -r '.batch_id')

# Poll for completion
while true; do
  STATUS=$(curl -s "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/batches/$BATCH_ID" \
    -H "Authorization: Bearer $API_KEY" | jq -r '.status')
  echo "Batch status: $STATUS"
  if [ "$STATUS" = "COMPLETED" ] || [ "$STATUS" = "FAILED" ]; then break; fi
  sleep 10
done

Step 6: Create Retriever and Search

# Create retriever
RETRIEVER=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/retrievers" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"text_search\",
    \"stages\": [{
      \"stage_name\": \"vector_search\",
      \"stage_type\": \"filter\",
      \"config\": {
        \"stage_id\": \"feature_search\",
        \"parameters\": {
          \"searches\": [{
            \"feature_uri\": \"mixpeek://text_embed@1.0.0/text_embed_v1_embedding\",
            \"query\": \"{{INPUT.query}}\"
          }]
        }
      }
    }]
  }")
RETRIEVER_ID=$(echo "$RETRIEVER" | jq -r '.retriever_id')

# Search
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/retrievers/$RETRIEVER_ID/execute" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"input": {"query": "quantum computing"}}' | jq '.results[:3]'

The execute payload shape is {"input": {"query": "..."}}, not {"query": {"input": {...}}}.
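
The same call from Python, as a rough sketch using the requests library; build_execute_body and execute_retriever are illustrative names, not an official client:

```python
import requests  # third-party: pip install requests

def build_execute_body(query):
    # The endpoint expects {"input": {...}}, not {"query": {"input": {...}}}.
    return {"input": {"query": query}}

def execute_retriever(namespace_id, retriever_id, query, api_key):
    url = (f"https://api.mixpeek.com/v1/namespaces/{namespace_id}"
           f"/retrievers/{retriever_id}/execute")
    resp = requests.post(
        url,
        headers={"Authorization": f"Bearer {api_key}"},
        json=build_execute_body(query),
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json()

print(build_execute_body("quantum computing"))  # {'input': {'query': 'quantum computing'}}
```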

Next Steps

  • Read the full Plugin documentation for advanced features
  • Add a realtime.py for query-time inference (Enterprise)
  • Configure compute_profile in your manifest to optimize resource allocation
  • Explore retriever stages for advanced search