What are pipelines?

Mixpeek pipelines are serverless functions that execute when criteria are met in an upstream source connection, such as S3.

They are integrated with your private GitHub repository, so you can tie pipeline changes to downstream data changes via version control.

How to set them up via GitHub

This feature is currently in beta and may be subject to changes or potential issues. Please use it with caution and report any problems you encounter.

To set up a pipeline, create a GitHub repository that matches this structure:

my-repo/
├── .github/
│   └── workflows/
│       └── build-and-push-docker.yml
├── pipeline1/
│   ├── handler.py
│   └── config.json
├── pipeline2/
│   ├── handler.py
│   └── config.json
└── pipeline3/
    ├── handler.py
    └── config.json

You can also fork this repository: https://github.com/mixpeek/example-pipelines

Workflow

build-and-push-docker.yml contains the logic that is triggered via GitHub Actions. It bundles all the changes and loads them into your pipelines.

Each folder contains a pipeline handler.py file as well as a config.json file.
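To illustrate the convention, here is a minimal sketch (purely illustrative, not part of the Mixpeek tooling) of how the layout can be enumerated: every top-level folder containing both a handler.py and a config.json counts as one pipeline.

from pathlib import Path

def discover_pipelines(repo_root: str) -> list[Path]:
    """Return every top-level folder that contains both handler.py and config.json."""
    pipelines = []
    for folder in Path(repo_root).iterdir():
        if not folder.is_dir():
            continue
        if (folder / "handler.py").exists() and (folder / "config.json").exists():
            pipelines.append(folder)
    return pipelines

# e.g. [my-repo/pipeline1, my-repo/pipeline2, my-repo/pipeline3]
print(discover_pipelines("my-repo"))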

Example

handler.py

The handler.py file (which must be named exactly that) contains your pipeline's transformation logic. It must follow a few conventions in order to work. Here's an example:

import os
from mixpeek import Mixpeek, FileTools

def handler(event):
    # The API key is injected as an environment variable by the GitHub workflow
    mixpeek = Mixpeek(os.getenv('API_KEY'))

    # Resolve the file's URL from the upstream event's bucket and key
    file_url = mixpeek.file_url(event['bucket'], event['key'])

    # Split the video into chunks at the configured frame interval
    video_chunks = FileTools.split_video(file_url, frame_interval=10)

    full_video = []

    # Embed each chunk and collect the results
    for chunk in video_chunks:
        obj = {
            "embedding": mixpeek.embed.video(chunk, "mixpeek/vuse-generic-v1"),
        }
        full_video.append(obj)

    return full_video

Note that API_KEY is loaded as an environment variable. We automatically load your API key (from the GitHub workflow) into the handler.py environment.

The event parameter is passed from the upstream change.

Use the mixpeek.file_url helper to build the event's file_url; we'll automatically validate and discover it.
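For local testing, you can call the handler directly with a sample event. This is a minimal sketch: the event shape is an assumption based on the bucket and key fields the example handler reads, and the payload delivered by your source connection may carry additional fields.

import os
from handler import handler

# Hypothetical event payload mimicking an upstream S3 change; bucket and key are
# the only fields the example handler uses.
sample_event = {
    "bucket": "my-source-bucket",
    "key": "videos/demo.mp4",
}

# Normally injected by the GitHub workflow; set manually for a local run.
os.environ.setdefault("API_KEY", "sk-test-placeholder")

chunks = handler(sample_event)
print(f"Embedded {len(chunks)} video chunks")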

config.json

This defines the configuration rules for your pipeline:

{
  "alias": "video-processor",
  "destination": {
    "connection_id": "conn_123",
    "collection": "data_bucket"
  },
  "source": {
    "connection_id": "conn_321",
    "filters": {
      "foo": "bar"
    }
  }
}

What do these keys mean?

  • alias: Your unique name for the pipeline, used when invoking it.

  • source: This section specifies the origin of the data that the pipeline will process.

    • connection_id: This is the identifier for the upstream data source connection. You set this up in the Create Connections section. It tells the pipeline from which connection to fetch the data.
    • filters: These are conditions used to select specific data from the source connection. For example, {"foo": "bar"} means the pipeline will only process data where the attribute foo equals bar. This keeps the pipeline focused on relevant data and avoids unnecessary computation.
  • destination: This section specifies where the output of the pipeline will be sent or stored.

    • connection_id: This is the identifier for the downstream data destination connection. You set this up in the Create Connections section. It tells the pipeline where to send or store the processed data.
    • collection: This is the specific location or container within the destination where data will be stored. For example, data_bucket could be a specific database or storage bucket.
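As a sanity check before committing, you could validate each config.json against the keys documented above. This is a hypothetical helper, not part of the Mixpeek SDK, and it assumes only the fields shown in the example (alias, source.connection_id, destination.connection_id, destination.collection) are required.

import json
from pathlib import Path

REQUIRED_TOP_LEVEL = {"alias", "source", "destination"}

def validate_config(path: str) -> None:
    """Raise ValueError if a pipeline config is missing the documented keys."""
    config = json.loads(Path(path).read_text())

    missing = REQUIRED_TOP_LEVEL - config.keys()
    if missing:
        raise ValueError(f"{path}: missing keys {sorted(missing)}")

    if "connection_id" not in config["source"]:
        raise ValueError(f"{path}: source.connection_id is required")
    if "connection_id" not in config["destination"] or "collection" not in config["destination"]:
        raise ValueError(f"{path}: destination needs connection_id and collection")

validate_config("pipeline1/config.json")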