## What You’ll Build

A custom text embedding plugin that:

- Generates 128-dimensional embeddings from text
- Processes documents via batch pipeline
- Supports search via a retriever
## Prerequisites

- A Mixpeek API key
- A namespace (create one via the API if needed)
- `curl` for API calls
## Step 1: Create Plugin Files

Create a directory `text_embed/` with three files:

### manifest.py

```python
feature_extractor_name = "text_embed"
version = "1.0.0"
description = "Text embedding plugin"
dependencies = []

features = [
    {
        "feature_type": "embedding",
        "feature_name": "text_embed_v1_embedding",
        "embedding_dim": 128,
        "distance_metric": "cosine",
    }
]

output_schema = {
    "text_embed_v1_embedding": {
        "type": "array",
        "items": {"type": "number"},
        "description": "128-dim text embedding",
    },
}

input_mappings = {"text": "text"}
tier = 1
tier_label = "SIMPLE"

# Skip GPU — this plugin is CPU-only
compute_profile = {"resource_type": "cpu"}
```
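Since wrong key names fail silently (see the note below), it can be worth catching them locally before zipping. A minimal sketch; the `check_features` helper is hypothetical, not part of any Mixpeek SDK:

```python
# Verify that each entry in `features` uses the exact key names the
# platform expects; wrong names (e.g. name/dimensions) fail silently.
REQUIRED_FEATURE_KEYS = {"feature_type", "feature_name", "embedding_dim", "distance_metric"}

def check_features(features):
    """Return (index, missing_keys) pairs for malformed feature entries."""
    problems = []
    for i, feat in enumerate(features):
        missing = sorted(REQUIRED_FEATURE_KEYS - set(feat))
        if missing:
            problems.append((i, missing))
    return problems

good = [{"feature_type": "embedding", "feature_name": "text_embed_v1_embedding",
         "embedding_dim": 128, "distance_metric": "cosine"}]
bad = [{"name": "text_embed_v1_embedding", "dimensions": 128}]
print(check_features(good))  # []
print(check_features(bad))   # [(0, ['distance_metric', 'embedding_dim', 'feature_name', 'feature_type'])]
```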
Use the exact key names `feature_type`, `feature_name`, `embedding_dim`, and `distance_metric`. Using `name`/`type`/`dimensions`/`distance` will silently fail.

### pipeline.py
```python
import hashlib
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd


def text_to_embedding(text: str, dim: int = 128) -> List[float]:
    """Generate deterministic embedding from text."""
    hash_bytes = hashlib.sha256(text.encode("utf-8")).digest()
    seed = int.from_bytes(hash_bytes[:4], byteorder="big")
    rng = np.random.default_rng(seed)
    embedding = rng.standard_normal(dim).astype(np.float32)
    norm = np.linalg.norm(embedding)
    if norm > 0:
        embedding = embedding / norm
    return embedding.tolist()


class TextEmbedBatchProcessor:
    def __init__(self, config=None, **kwargs):
        config = config or {}
        # IMPORTANT: Custom plugins receive data in the 'data' column
        self.text_column = "data"
        self.output_column = "text_embed_v1_embedding"
        self.embedding_dim = 128

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        if batch.empty:
            return batch
        batch = batch.reset_index(drop=True)
        texts = []
        valid_indices = []
        for idx, v in enumerate(batch.get(self.text_column, [])):
            text = "" if v is None else str(v)
            if text.strip():
                texts.append(text)
                valid_indices.append(idx)
        batch[self.output_column] = None
        if texts:
            embeddings = [text_to_embedding(t, self.embedding_dim) for t in texts]
            for i, orig_idx in enumerate(valid_indices):
                batch.at[orig_idx, self.output_column] = embeddings[i]
        return batch


def build_steps(extractor_request=None, container=None,
                base_steps=None, dataset_size=None, content_flags=None):
    processor = TextEmbedBatchProcessor()
    steps = list(base_steps or [])
    steps.append(processor)
    return {"steps": steps, "prepare": lambda ds: ds}


def extract(extractor_request=None, base_steps=None,
            dataset_size=None, content_flags=None):
    result = build_steps(extractor_request=extractor_request,
                         base_steps=base_steps, dataset_size=dataset_size,
                         content_flags=content_flags)

    class PipelineResult:
        def __init__(self, steps, prepare):
            self.steps = steps
            self.prepare = prepare

    return PipelineResult(result["steps"], result["prepare"])
```
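The embedding function is easy to verify locally before packaging. A quick check of the properties the manifest promises — 128 dimensions, unit-norm vectors suitable for cosine distance, deterministic output — with the hash-seeding logic inlined from `pipeline.py`:

```python
import hashlib

import numpy as np


def text_to_embedding(text: str, dim: int = 128) -> list:
    # Same logic as pipeline.py: hash-seeded, L2-normalized Gaussian vector
    seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
    v = np.random.default_rng(seed).standard_normal(dim).astype(np.float32)
    n = np.linalg.norm(v)
    return (v / n if n > 0 else v).tolist()


e1 = text_to_embedding("hello world")
e2 = text_to_embedding("hello world")
print(len(e1))                                       # 128
print(e1 == e2)                                      # True — deterministic
print(abs(float(np.linalg.norm(e1)) - 1.0) < 1e-5)   # True — unit norm
```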
Always read from `batch["data"]` — NOT `batch["text"]` or any other column name. The `data` column contains raw text for text blobs and S3 URLs for binary blobs.

## Step 2: Package and Upload
```shell
# Package
cd /path/to/parent && zip -r text_embed.zip text_embed/

# Request presigned upload URL
UPLOAD=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "text_embed", "version": "1.0.0", "file_size_bytes": 5000}')
UPLOAD_ID=$(echo "$UPLOAD" | jq -r '.upload_id')
PRESIGNED_URL=$(echo "$UPLOAD" | jq -r '.presigned_url')

# Upload archive
curl -s -X PUT "$PRESIGNED_URL" \
  -H "Content-Type: application/zip" \
  --data-binary @text_embed.zip

# Confirm upload
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/uploads/$UPLOAD_ID/confirm" \
  -H "Authorization: Bearer $API_KEY"
```
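The request above hardcodes `file_size_bytes: 5000`. If the API validates the declared size against the uploaded archive (an assumption worth checking), compute it from the real file instead. A sketch that builds a zip in memory and measures it:

```python
import io
import zipfile


def zip_and_size(files: dict) -> tuple:
    """Zip {archive_path: source_text} in memory; return (bytes, size)."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        for name, content in files.items():
            zf.writestr(name, content)
    data = buf.getvalue()
    return data, len(data)


archive, size = zip_and_size({"text_embed/manifest.py": 'version = "1.0.0"\n'})
print(size > 0)  # True — use this value for file_size_bytes
```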
## Step 3: Deploy

```shell
# Deploy for batch processing (works on all tiers)
PLUGIN_ID="text_embed_1_0_0"
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/plugins/$PLUGIN_ID/deploy?deployment_type=batch_only" \
  -H "Authorization: Bearer $API_KEY"
```
Use `?deployment_type=batch_only` unless you have Enterprise tier. Realtime endpoints require dedicated infrastructure.

## Step 4: Create Bucket and Upload Data
```shell
# Create a bucket with text schema
BUCKET=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"name": "articles", "bucket_schema": {"properties": {"text": {"type": "text", "required": true}}}}')
BUCKET_ID=$(echo "$BUCKET" | jq -r '.bucket_id')

# Upload objects
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/objects" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "blobs": [{"property": "text", "type": "text", "data": "Quantum computing uses qubits to perform calculations exponentially faster than classical computers."}]
  }'
```
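For more than a handful of documents, building the payload in Python avoids shell-quoting mistakes. A sketch that produces the same blob shape as the `curl` call — a list of `property`/`type`/`data` dicts:

```python
import json


def build_objects_payload(texts):
    """Blobs must be a list; each entry needs property, type, and data."""
    return {"blobs": [{"property": "text", "type": "text", "data": t} for t in texts]}


payload = build_objects_payload([
    "Quantum computing uses qubits.",
    "Classical computers use bits.",
])
body = json.dumps(payload)  # POST this string as the request body
print(json.loads(body)["blobs"][0]["property"])  # text
```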
Blobs must be a list, not a dict. Each blob needs `property`, `type`, and `data` fields.

## Step 5: Create Collection and Process
```shell
# Create collection with your plugin
COLLECTION=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/collections" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"text_embed_articles\",
    \"feature_extractors\": [{\"feature_extractor_name\": \"text_embed\", \"version\": \"1.0.0\"}],
    \"source\": {\"type\": \"bucket\", \"bucket_ids\": [\"$BUCKET_ID\"]}
  }")
COLLECTION_ID=$(echo "$COLLECTION" | jq -r '.collection_id')

# Trigger batch processing
BATCH=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/batches/trigger" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{\"collection_ids\": [\"$COLLECTION_ID\"]}")
BATCH_ID=$(echo "$BATCH" | jq -r '.batch_id')

# Poll for completion
while true; do
  STATUS=$(curl -s "https://api.mixpeek.com/v1/namespaces/$NS_ID/buckets/$BUCKET_ID/batches/$BATCH_ID" \
    -H "Authorization: Bearer $API_KEY" | jq -r '.status')
  echo "Batch status: $STATUS"
  { [ "$STATUS" = "COMPLETED" ] || [ "$STATUS" = "FAILED" ]; } && break
  sleep 10
done
```
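The shell loop above runs forever if a batch stalls. A Python equivalent with a timeout — the status-fetching callable is injected, so this sketch makes no assumptions about the API client:

```python
import time


def poll_until(fetch_status, terminal=("COMPLETED", "FAILED"),
               interval=10.0, timeout=600.0):
    """Call fetch_status() until it returns a terminal state or timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch still {status!r} after {timeout}s")
        time.sleep(interval)


# Example with a stubbed status sequence:
statuses = iter(["PENDING", "PROCESSING", "COMPLETED"])
print(poll_until(lambda: next(statuses), interval=0))  # COMPLETED
```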
## Step 6: Create Retriever and Search

```shell
# Create retriever
RETRIEVER=$(curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/retrievers" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d "{
    \"name\": \"text_search\",
    \"stages\": [{
      \"stage_name\": \"vector_search\",
      \"stage_type\": \"filter\",
      \"config\": {
        \"stage_id\": \"feature_search\",
        \"parameters\": {
          \"searches\": [{
            \"feature_uri\": \"mixpeek://text_embed@1.0.0/text_embed_v1_embedding\",
            \"query\": \"{{INPUT.query}}\"
          }]
        }
      }
    }]
  }")
RETRIEVER_ID=$(echo "$RETRIEVER" | jq -r '.retriever_id')

# Search
curl -s -X POST "https://api.mixpeek.com/v1/namespaces/$NS_ID/retrievers/$RETRIEVER_ID/execute" \
  -H "Authorization: Bearer $API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"input": {"query": "quantum computing"}}' | jq '.results[:3]'
```
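One caveat when interpreting results: because the embeddings are seeded from a hash of the exact text, they are not semantic. Identical strings map to identical vectors, but a query and a document that merely share words get unrelated vectors — so treat this search as a plumbing check rather than a relevance test. A demonstration using the same hash-seeded scheme as `pipeline.py`:

```python
import hashlib

import numpy as np


def embed(text, dim=128):
    # Same hash-seeded scheme as pipeline.py
    seed = int.from_bytes(hashlib.sha256(text.encode()).digest()[:4], "big")
    v = np.random.default_rng(seed).standard_normal(dim).astype(np.float32)
    return v / np.linalg.norm(v)


q = embed("quantum computing")
same = embed("quantum computing")
doc = embed("Quantum computing uses qubits to perform calculations.")
print(float(q @ same) > 0.999)    # True — exact text match scores ~1.0
print(abs(float(q @ doc)) < 0.9)  # True — related text, unrelated vector
```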
The execute format is `{"input": {"query": "..."}}` — not `{"query": {"input": {...}}}`.

## Next Steps
- Read the full Plugin documentation for advanced features
- Add a `realtime.py` for query-time inference (Enterprise)
- Configure `compute_profile` in your manifest to optimize resource allocation
- Explore retriever stages for advanced search

