AI@home: Classifying images with Ollama – part two.


In my previous blog post, I created a basic API that takes a file as input, analyzes it in Ollama with a vision-capable model, and returns the output. However, it had one major weakness: the HTTP call to the API would hang until the classification job was finished, and classifying videos in particular takes quite a while. Synchronous calls that take minutes to finish were never a good idea, so it became evident that I needed a classifier that runs independently of the API. The API then just submits jobs to a queue, and the client polls for status and fetches the result when the job is done.

Queueing mechanism

There is a plethora of queuing mechanisms. My needs were pretty simple, so I picked the first one that came up: RQ, which simply submits job specifications to Redis. I already have Valkey, a drop-in replacement for Redis, running in my Kubernetes cluster, so I didn’t need to set up more infrastructure to get it done.

I still kept my app.py from the last blog post, but it’s heavily modified. The classify endpoints now submit jobs to a queue, and I added another endpoint for getting the status of a job.

Defining queues

Defining a queue in RQ is pretty simple. Since I want to access the queues from multiple Python processes (the API and the worker), I put them in their own file, queues.py:

# queues.py
from redis import Redis
from rq import Queue
import os

VALKEY_PASSWORD = os.getenv("VALKEY_PASSWORD")
REDIS_HOST = os.getenv("REDIS_HOST", "valkey.valkey.svc.cluster.local")
REDIS_DB = int(os.getenv("REDIS_DB", "4"))

redis_conn = Redis(
    host=REDIS_HOST,
    port=6379,
    db=REDIS_DB,
    password=VALKEY_PASSWORD,
)

image_queue = Queue("image_classify", connection=redis_conn)
video_queue = Queue("video_classify", connection=redis_conn)
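
The worker.py further down repeats the same connection setup, and the Deployment sets a REDIS_PORT variable that queues.py never reads. As a minimal sketch of one way to keep this in a single place, the helper below builds the keyword arguments for Redis(...) from the environment. The function name redis_kwargs_from_env and the use of REDIS_PORT are my assumptions here, not part of the code above.

```python
import os
from typing import Any, Dict, Mapping


def redis_kwargs_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Build keyword arguments for redis.Redis(...) from environment variables.

    Hypothetical helper: the defaults mirror the ones used in queues.py.
    """
    return {
        "host": env.get("REDIS_HOST", "valkey.valkey.svc.cluster.local"),
        # Assumption: REDIS_PORT is honoured here; queues.py hard-codes 6379.
        "port": int(env.get("REDIS_PORT", "6379")),
        "db": int(env.get("REDIS_DB", "4")),
        # None means "no password", which is what Redis() expects as well.
        "password": env.get("VALKEY_PASSWORD"),
    }


if __name__ == "__main__":
    # Show what would be passed to Redis(...), without printing the password.
    kwargs = redis_kwargs_from_env()
    print({k: v for k, v in kwargs.items() if k != "password"})
```

With this in place, both files could create the connection as Redis(**redis_kwargs_from_env()).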

Then, I need to import that and the redis queuing mechanisms into app.py:

from redis import Redis
from rq import Queue
from rq.job import Job
from queues import redis_conn, image_queue, video_queue

Creating jobs to submit

After this, I need to define the tasks that should be queued, so I have created a tasks.py:

#!/usr/bin/env python3
from pathlib import Path
from typing import Any, Dict, Optional
from datetime import datetime

from classifier_core import (
    classify_image_with_model,
    process_video_path,
    append_feedback_record,
    VISION_MODEL,
)

# RQ will import and call these functions


def process_image_job(
    image_path: str,
    vision_model: Optional[str] = None,
    original_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    RQ job: classify a single image.
    """
    p = Path(image_path)
    result = classify_image_with_model(p, model=vision_model or VISION_MODEL)

    record = {
        "ts": datetime.utcnow().isoformat(),
        "type": "image",
        "path": str(p),
        "model": vision_model or VISION_MODEL,
        "image_result": result,
    }
    append_feedback_record(record)

    return {
        "type": "image",
        "path": str(p),
        "original_path": original_path,
        "result": result,
        "vision_model": vision_model or VISION_MODEL,
    }


def process_video_job(
    video_path: str,
    vision_model: Optional[str] = None,
    agg_model: Optional[str] = None,
    desired_fps: Optional[float] = None,
    original_path: Optional[str] = None,
) -> Dict[str, Any]:
    """
    RQ job: classify a video by sampling images and aggregating.
    """
    p = Path(video_path)
    return process_video_path(
        p,
        vision_model=vision_model,
        agg_model=agg_model,
        desired_fps=desired_fps,
        original_path=original_path,
    )

These basically wrap my old procedures, which I have moved into classifier_core.py.

Submitting jobs

Now, I am set to have my classification endpoints submit to the queues:

@app.post("/classify/image")
async def enqueue_image_classification(
    file: UploadFile = File(...),
    vision_model: Optional[str] = Query(
        default=None,
        description="Override vision model name for this job",
    ),
    original_path: Optional[str] = Query(default=None),
):
    # content_type can be missing, so fall back to an empty string
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="Uploaded file is not an image")

    img_path = _save_upload_to_path(file)

    job = image_queue.enqueue(
        process_image_job,
        str(img_path),
        vision_model,
        original_path,
        job_timeout=900,      # 15 minutes
        result_ttl=604800,    # keep the result for 7 days
    )

    logger.info("Enqueued image job id=%s path=%s model=%s",
                job.id, img_path, vision_model or DEFAULT_VISION_MODEL)

    return {
        "job_id": job.id,
        "status": "queued",
        "kind": "image",
    }


@app.post("/classify/video")
async def enqueue_video_classification(
    file: UploadFile = File(...),
    vision_model: Optional[str] = Query(
        default=None,
        description="Override vision model name for this job",
    ),
    agg_model: Optional[str] = Query(
        default=None,
        description="Override aggregation model name for this job",
    ),
    desired_fps: Optional[float] = Query(
        default=None,
        description="Frames per second",
    ),
    original_path: Optional[str] = Query(default=None),
):
    ct = file.content_type or ""
    if not (ct.startswith("video/") or ct == "application/octet-stream"):
        raise HTTPException(status_code=400, detail=f"Not a video (content_type={ct})")

    # The upload goes straight to the shared volume; no temp dir is needed here
    video_path = _save_upload_to_path(file)

    job = video_queue.enqueue(
        process_video_job,
        str(video_path),
        vision_model,
        agg_model,
        desired_fps,
        original_path,
        job_timeout=86400,    # 24 hours
    )

    logger.info("Enqueued video job id=%s path=%s vision_model=%s agg_model=%s",
                job.id, video_path, vision_model or DEFAULT_VISION_MODEL,
                agg_model or DEFAULT_AGG_MODEL)

    return {
        "job_id": job.id,
        "status": "queued",
        "kind": "video",
    }

And then I need to have an endpoint that checks the status of a submitted job. A program using this API would typically submit jobs, and then poll the status endpoint to check whether the job is finished.

@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except Exception:
        # Unknown or expired job ids end up here (as do Redis errors)
        logger.exception("Job.fetch failed")
        raise HTTPException(status_code=404, detail="job not found")

    status = job.get_status()
    logger.info("Job status query id=%s status=%s", job_id, status)

    return {
        "job_id": job_id,
        "status": status,
        "result": job.result if job.is_finished else None,
        "enqueued_at": job.enqueued_at,
        "started_at": job.started_at,
        "ended_at": job.ended_at,
    }

This tells you whether the job is queued, started, finished or failed, which your application has to handle.
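
To make the polling side concrete, here is a minimal client sketch using only the standard library. The names fetch_job, wait_for_job and TERMINAL_STATUSES are hypothetical, and the set of terminal statuses is my assumption based on RQ's job status names. The fetch function is passed in, so the loop can be tried without a running API.

```python
import json
import time
import urllib.request
from typing import Any, Callable, Dict

# Assumption: statuses after which polling no longer makes sense.
TERMINAL_STATUSES = {"finished", "failed", "stopped", "canceled"}


def fetch_job(base_url: str, job_id: str) -> Dict[str, Any]:
    """GET {base_url}/jobs/{job_id} and decode the JSON body."""
    with urllib.request.urlopen(f"{base_url}/jobs/{job_id}", timeout=30) as resp:
        return json.load(resp)


def wait_for_job(
    job_id: str,
    fetch: Callable[[str], Dict[str, Any]],
    interval: float = 5.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the job reaches a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        info = fetch(job_id)
        if info["status"] in TERMINAL_STATUSES:
            return info
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still {info['status']} after {timeout}s")
        sleep(interval)
```

A caller would wire the two together with something like wait_for_job(job_id, fetch=lambda j: fetch_job("https://mc.engen.priv.no", j)) and then check whether the returned status is "finished" or "failed".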

The complete app.py

The app.py is now a bit simpler, as the API no longer runs the classification jobs itself. It basically needs to save the files to a shared file system (more on that later) and queue them to be picked up by the worker.

#!/usr/bin/env python3
import logging
import os
import shutil
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from fastapi import FastAPI, UploadFile, File, HTTPException, Query
from fastapi.responses import JSONResponse
from redis import Redis
from rq import Queue
from rq.job import Job
from uuid import uuid4
from datetime import datetime
from queues import redis_conn, image_queue, video_queue

# -------------------------------------------------------------------
# Logging
# -------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("media-classifier-api")

# -------------------------------------------------------------------
# Config
# -------------------------------------------------------------------


DEFAULT_VISION_MODEL = "qwen2.5vl:7b"
DEFAULT_AGG_MODEL = "qwen2.5vl:32b"

VISION_MODEL = DEFAULT_VISION_MODEL
AGG_MODEL = DEFAULT_AGG_MODEL
# Shared data root (backed by RWX PVC in Kubernetes)
DATA_ROOT = Path(os.getenv("DATA_ROOT", "/data/media_classifier"))
UPLOAD_ROOT = DATA_ROOT / "uploads"
UPLOAD_ROOT.mkdir(parents=True, exist_ok=True)


# Import tasks AFTER Redis is configured, to avoid circular imports
from tasks import process_image_job, process_video_job # noqa: E402

# -------------------------------------------------------------------
# FastAPI app
# -------------------------------------------------------------------

app = FastAPI(
    title="Media Classifier (Queued)",
    description=(
        "Upload images or videos, enqueue classification jobs into Redis, "
        "and poll for results asynchronously."
    ),
)

# -------------------------------------------------------------------
# Helpers
# -------------------------------------------------------------------

def _save_upload_to_path(file: UploadFile) -> Path:
    # Store under a timestamp + short UUID name to avoid collisions;
    # only the extension of the original filename is kept.
    ts = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
    uid = uuid4().hex[:8]
    ext = Path(file.filename or "").suffix or ".bin"
    dest_path = UPLOAD_ROOT / f"{ts}_{uid}{ext}"
    logger.info("Saving upload to %s", dest_path)
    with dest_path.open("wb") as f:
        shutil.copyfileobj(file.file, f)
    return dest_path


# -------------------------------------------------------------------
# API Endpoints
# -------------------------------------------------------------------

@app.post("/classify/image")
async def enqueue_image_classification(
    file: UploadFile = File(...),
    vision_model: Optional[str] = Query(
        default=None,
        description="Override vision model name for this job",
    ),
    original_path: Optional[str] = Query(default=None),
):
    # content_type can be missing, so fall back to an empty string
    if not (file.content_type or "").startswith("image/"):
        raise HTTPException(status_code=400, detail="Uploaded file is not an image")

    img_path = _save_upload_to_path(file)

    job = image_queue.enqueue(
        process_image_job,
        str(img_path),
        vision_model,
        original_path,
        job_timeout=900,      # 15 minutes
        result_ttl=604800,    # keep the result for 7 days
    )

    logger.info("Enqueued image job id=%s path=%s model=%s",
                job.id, img_path, vision_model or DEFAULT_VISION_MODEL)

    return {
        "job_id": job.id,
        "status": "queued",
        "kind": "image",
    }


@app.post("/classify/video")
async def enqueue_video_classification(
    file: UploadFile = File(...),
    vision_model: Optional[str] = Query(
        default=None,
        description="Override vision model name for this job",
    ),
    agg_model: Optional[str] = Query(
        default=None,
        description="Override aggregation model name for this job",
    ),
    desired_fps: Optional[float] = Query(
        default=None,
        description="Frames per second",
    ),
    original_path: Optional[str] = Query(default=None),
):
    ct = file.content_type or ""
    if not (ct.startswith("video/") or ct == "application/octet-stream"):
        raise HTTPException(status_code=400, detail=f"Not a video (content_type={ct})")

    # The upload goes straight to the shared volume; no temp dir is needed here
    video_path = _save_upload_to_path(file)

    job = video_queue.enqueue(
        process_video_job,
        str(video_path),
        vision_model,
        agg_model,
        desired_fps,
        original_path,
        job_timeout=86400,    # 24 hours
    )

    logger.info("Enqueued video job id=%s path=%s vision_model=%s agg_model=%s",
                job.id, video_path, vision_model or DEFAULT_VISION_MODEL,
                agg_model or DEFAULT_AGG_MODEL)

    return {
        "job_id": job.id,
        "status": "queued",
        "kind": "video",
    }


@app.get("/jobs/{job_id}")
async def get_job_status(job_id: str):
    try:
        job = Job.fetch(job_id, connection=redis_conn)
    except Exception:
        # Unknown or expired job ids end up here (as do Redis errors)
        logger.exception("Job.fetch failed")
        raise HTTPException(status_code=404, detail="job not found")

    status = job.get_status()
    logger.info("Job status query id=%s status=%s", job_id, status)

    return {
        "job_id": job_id,
        "status": status,
        "result": job.result if job.is_finished else None,
        "enqueued_at": job.enqueued_at,
        "started_at": job.started_at,
        "ended_at": job.ended_at,
    }

The worker process

The worker process definition is pretty simple. It only takes jobs and runs them. The code for the jobs themselves is defined in tasks.py and classifier_core.py.

import os
import logging
from redis import Redis
from rq import Worker, Queue

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

VALKEY_PASSWORD = os.getenv("VALKEY_PASSWORD")
REDIS_HOST = os.getenv("REDIS_HOST", "valkey.valkey.svc.cluster.local")
REDIS_DB = int(os.getenv("REDIS_DB", "4"))

logger.info(f"Redis host={REDIS_HOST} db={REDIS_DB} pwd_set={bool(VALKEY_PASSWORD)}")

redis_conn = Redis(
    host=REDIS_HOST,
    port=6379,
    db=REDIS_DB,
    password=VALKEY_PASSWORD,
)

listen = ["image_classify", "video_classify"]

if __name__ == "__main__":
    # Listen on both queues with a single worker process
    queues = [Queue(name, connection=redis_conn) for name in listen]
    worker = Worker(queues, connection=redis_conn)
    worker.work()

The classifier_core.py simply contains all the previous logic to talk to Ollama etc. I include it here for completeness. It also contains some tried and tested prompts that give me good results with my current models.

import json
import logging
import os
import re
import shutil
import subprocess
import tempfile
import uuid
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import ollama

# -------------------------------------------------------------------
# Logging
# -------------------------------------------------------------------

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("media-classifier-core")

# -------------------------------------------------------------------
# Config
# -------------------------------------------------------------------

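OLLAMA_HOST = os.getenv("OLLAMA_HOST", "http://ollama.engen.priv.no:11434")
# NOTE: as far as I can tell, the module-level ollama functions pick up OLLAMA_HOST
# from the environment themselves, so this assignment is probably a no-op.
ollama.host = OLLAMA_HOST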

VISION_MODEL = os.getenv("VISION_MODEL", "qwen2.5vl:7b")
AGG_MODEL = os.getenv("AGG_MODEL", "qwen2.5vl:32b")

DATA_ROOT = Path(os.getenv("DATA_ROOT", "/data/media_classifier"))
FEEDBACK_FILE = DATA_ROOT / "feedback.jsonl"
DATA_ROOT.mkdir(parents=True, exist_ok=True)

# -------------------------------------------------------------------
# Helpers: ffmpeg / fps
# -------------------------------------------------------------------

def get_video_duration_seconds(video_path: Path) -> float:
    out = subprocess.check_output([
        "ffprobe", "-v", "quiet",
        "-show_format", "-print_format", "json",
        str(video_path),
    ])
    data = json.loads(out)
    return float(data["format"]["duration"])


def choose_fps_with_min_frames(video_path: Path,
                               desired_fps: float = 1.0,
                               min_frames: int = 15) -> float:
    # Raise the sampling rate for short clips so we still get min_frames frames
    duration = get_video_duration_seconds(video_path)
    if duration <= 0:
        return desired_fps
    est_frames = duration * desired_fps
    if est_frames >= min_frames:
        return desired_fps
    return min_frames / duration


def extract_frames(video_path: Path, out_dir: Path, fps: float) -> None:
    out_dir.mkdir(parents=True, exist_ok=True)
    # Global options go before the input; ffmpeg warns about trailing options
    cmd = [
        "ffmpeg", "-hide_banner", "-loglevel", "error",
        "-i", str(video_path),
        "-vf", f"fps={fps}",
        str(out_dir / "frame_%06d.jpg"),
    ]
    subprocess.run(cmd, check=True)

# -------------------------------------------------------------------
# Helpers: JSON cleaning
# -------------------------------------------------------------------

def _extract_json_block(text: str) -> str:
    # Strip a surrounding markdown code fence (```json ... ```) if present
    t = text.strip()
    if t.startswith("```"):
        t = re.sub(r"^```[a-zA-Z0-9]*\s*", "", t)
        t = re.sub(r"\s*```$", "", t)
    t = t.strip()
    return t


def _normalize_confidence(value: Any) -> float:
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, str):
        try:
            return float(value)
        except ValueError:
            return 1.0
    if isinstance(value, (list, tuple)) and value:
        # Use the first element; recursing on the list itself would never terminate
        return _normalize_confidence(value[0])
    return 1.0

def _parse_classifier_response(raw: str) -> Dict[str, Any]:
    raw = raw.strip()

    # 1) Try full JSON object
    try:
        cleaned = _extract_json_block(raw)
        obj = json.loads(cleaned)
    except Exception as e:
        logger.warning("Full JSON parse failed: %s; attempting markdown/fragment parse", e)
        obj: Dict[str, Any] = {}

        # 2a) Try standard JSON-like fields in text
        m = re.search(r'"primary_label"\s*:\s*"([^"]+)"', raw)
        if m:
            obj["primary_label"] = m.group(1).strip()

        m = re.search(r'"secondary_labels"\s*:\s*\[(.*?)\]', raw, re.S)
        if m:
            raw_list = m.group(1)
            labels = re.findall(r'"([^"]+)"', raw_list)
            obj["secondary_labels"] = labels

        m = re.search(r'"confidence"\s*:\s*([0-9.]+)', raw)
        if m:
            try:
                obj["confidence"] = float(m.group(1))
            except Exception:
                pass

        m = re.search(r'"short_description"\s*:\s*"([^"]*)"', raw, re.S)
        if m:
            obj["short_description"] = m.group(1).strip()

        # 2b) If that didn’t work, try markdown headings, which the model sometimes emits
        if "primary_label" not in obj:
            m = re.search(r"Primary Label:\s*([^\n]+)", raw, re.I)
            if m:
                obj["primary_label"] = m.group(1).strip()

        if "secondary_labels" not in obj:
            m = re.search(r"Secondary Labels:\s*([^\n]+)", raw, re.I)
            if m:
                labels = [s.strip(" *") for s in m.group(1).split(",") if s.strip()]
                # Also handle bullet list form:
                if not labels or len(labels) == 1:
                    bullets = re.findall(r"^\*\s*(.+)$", raw, re.M)
                    if bullets:
                        labels = [b.strip() for b in bullets]
                obj["secondary_labels"] = labels

        if "confidence" not in obj:
            m = re.search(r"Confidence:\s*([0-9.]+)", raw, re.I)
            if m:
                try:
                    obj["confidence"] = float(m.group(1))
                except Exception:
                    pass

        if "short_description" not in obj:
            m = re.search(r"Short Description:\s*(.+)", raw, re.I)
            if m:
                obj["short_description"] = m.group(1).strip()

        # 2c) If still nothing, final fallback
        if not obj:
            return {
                "primary_label": "unknown",
                "secondary_labels": [],
                "confidence": 0.0,
                "short_description": raw,
            }

    # 3) Normalize fields and defaults
    primary = obj.get("primary_label") or "unknown"
    secondary = obj.get("secondary_labels") or []
    if not isinstance(secondary, list):
        secondary = [str(secondary)]
    try:
        conf = float(obj.get("confidence", 0.0))
    except Exception:
        conf = 0.0
    short = obj.get("short_description") or ""

    return {
        "primary_label": primary,
        "secondary_labels": secondary,
        "confidence": conf,
        "short_description": short,
    }


# -------------------------------------------------------------------
# LLaVA / vision model helpers
# -------------------------------------------------------------------

IMAGE_PROMPT = """
You are an image classification assistant.

You must return a single JSON object with this exact structure:

{
"primary_label": "string",
"secondary_labels": ["string", "string", ...],
"confidence": 0.0,
"short_description": "string"
}



First, carefully look at the image and mentally describe everything you see: scene type, important objects, people, activities, setting (indoor/outdoor), time of day, style, and mood. While analyzing the image, explicitly check for any specific, recognizable entities:

* landmarks (for example: Eiffel Tower, Golden Gate Bridge, Times Square)
* famous buildings or monuments
* well‑known mountains, lakes, beaches, or other natural locations
* recognizable city skylines or neighborhoods
* logos or clearly visible brand names
* readable place names on signs

Second, from that mental description, choose a primary label and several secondary labels that best summarize the image.
Third, write a short_description and a confidence score that are consistent with those labels.

If the picture is from a specific city, place or pictures a specific person or landmark, put that as a label and include it in the short description.

Rules for labels:

- Use lowercase for all labels, unless the label is a proper name (for example: "Paris", "Eiffel Tower", "Times Square"; keep their normal capitalization in that case).
- Prefer singular nouns for labels (for example: "car" not "cars", "tree" not "trees", "person" not "people"), unless the concept is inherently plural (for example: "fireworks", "stairs").
- Use short, generic concept words as labels (for example: "city street", "shopping", "dining", "urban", "mountain", "beach").
- Do not invent multiple variants of the same concept (for example: do not mix "shop" and "shops"; pick one form and use it consistently).
- Every important visual concept mentioned in short_description that is clearly visible in the image should appear in primary_label or secondary_labels.
- Prefer single words, not multiple words as labels, unless the label is a concept that is inherently two words, like for example "water skiing" and "mountain top"

If you recognize any specific landmark, famous building, mountain, or location name with reasonable confidence, you must:
* include its name as a tag in primary_label or secondary_labels, and mention it explicitly in short_description.
* Prefer a specific primary_label such as "kiyomizu-dera temple" or "kyoto temple" over generic labels like "mountains" or "landscape" when the specific place is clearly visible.
# Do not guess specific names; if you are unsure, use generic labels like "temple", "shrine", "mountain", or "city skyline" instead.


Use the normal capitalization for proper names (for example: "Mount Fuji", "Eiffel Tower", "Times Square").

Output constraints:

- primary_label must be a single, concise label that best describes the main subject or overall scene, following the label rules above. Do not use "unknown" if you can recognize anything meaningful in the image.
- secondary_labels must be 3–6 labels when the image is understandable. Only leave secondary_labels empty if the image is blank, corrupted, or impossible to interpret. Do not include duplicates or near‑duplicates.
- Always try to include tags for:
- scene type (for example: "city street", "beach", "living room"),
- important objects or people (for example: "person", "car", "building", "dog"),
- activities when visible (for example: "walking", "shopping", "dining"),
- context like location style or time (for example: "urban", "rural", "night", "sunset").

- confidence is a float between 0 and 1 representing how confident you are in the primary_label and the set of tags as a whole. Use lower values (for example: 0.3–0.5) if you are unsure, instead of leaving labels empty.
- short_description must be at most 2 sentences, plain text, no JSON, no markdown. It must be consistent with primary_label and secondary_labels and should not introduce concepts that are not tagged.
- Do not include any extra keys.
- Do not output explanations outside the JSON. Respond with JSON only.

Example for style (do not copy this literally):

{
"primary_label": "city street at night",
"secondary_labels": ["shopping", "dining", "urban"],
"confidence": 0.8,
"short_description": "a bustling city street at night, lined with shops and restaurants"
}
"""

def classify_image_with_model(image_path: Path,
                              model: Optional[str] = None) -> Dict[str, Any]:
    use_model = model or VISION_MODEL
    logger.info("Classifying image with model=%s path=%s", use_model, image_path)

    with image_path.open("rb") as f:
        image_bytes = f.read()

    try:
        res = ollama.generate(
            model=use_model,
            prompt=IMAGE_PROMPT,
            images=[image_bytes],
            keep_alive=0,
            options={
                "num_predict": 2048,  # increase/decrease as needed
                "temperature": 0.2,
                "top_p": 0.9,
                "repeat_penalty": 1.1,
            },
        )
    except Exception as e:
        logger.exception("Ollama generate failed for %s: %s", image_path, e)
        return {
            "primary_label": "error",
            "secondary_labels": [],
            "confidence": 0.0,
            "short_description": f"Ollama error: {e}",
        }

    raw = res.get("response", "")
    logger.debug("Raw vision response for %s: %.200r", image_path, raw)

    data = _parse_classifier_response(raw)
    return data

# -------------------------------------------------------------------
# Aggregation helpers
# -------------------------------------------------------------------

def aggregate_labels(per_image_results: List[Dict[str, Any]]) -> List[Tuple[str, float]]:
    # Sum the per-frame confidence for every label, most frequent first
    counter: Counter[str] = Counter()
    for r in per_image_results:
        labels: List[str] = []
        if r.get("primary_label"):
            labels.append(r["primary_label"])
        labels.extend(r.get("secondary_labels") or [])
        conf = _normalize_confidence(r.get("confidence", 1.0)) or 1.0
        for label in labels:
            counter[label] += conf
    return counter.most_common()


def aggregate_with_llm(per_image_results: List[Dict[str, Any]],
                       model: Optional[str] = None) -> Dict[str, Any]:
    use_model = model or AGG_MODEL
    logger.info("Aggregating %d images with model=%s",
                len(per_image_results), use_model)

    # One summary line per frame, fed to the aggregation model
    lines = []
    for i, r in enumerate(per_image_results):
        labels = []
        if r.get("primary_label"):
            labels.append(r["primary_label"])
        labels.extend(r.get("secondary_labels") or [])
        labels_str = ", ".join(labels)
        conf = _normalize_confidence(r.get("confidence", 0))
        desc = r.get("short_description", "")
        lines.append(
            f"image={i} labels=[{labels_str}] confidence={conf:.2f} desc={desc}"
        )
    joined = "\n".join(lines)

    prompt = f"""
You are a video classification system.
You are given per-frame analyses from a video, one per line.
Each line contains a frame index, labels, confidence, and a short description.

Frame analyses:
{joined}

Return ONLY a single JSON object with this exact structure:

{{
"primary_label": "string",
"secondary_labels": ["string", "string"],
"confidence": 0.0,
"short_description": "string",
"video_labels": ["string", "string"],
"rationale": "string",
"notable_segments": [
{{
"frame_start": 0,
"frame_end": 0,
"description": "string",
"primary_label": "string",
"secondary_labels": ["string"],
"confidence": 0.0
}}
]
}}

Rules:
- primary_label, secondary_labels, confidence, short_description should summarize the entire video, in the same style as the image classifier.
- confidence is a float between 0 and 1.
- video_labels are 3–10 high-level tags for the video.
- rationale is a short explanation in 1–2 sentences.
- notable_segments can be empty [], or 1–5 segments covering interesting parts of the video.
- Do not output any text outside the JSON.
"""
    try:
        res = ollama.generate(
            model=use_model,
            prompt=prompt,
            keep_alive=0,
            options={"num_ctx": 4096},
        )
    except Exception as e:
        logger.exception("Ollama aggregate failed: %s", e)
        return {
            "video_labels": [],
            "primary_label": "error",
            "rationale": f"Ollama error: {e}",
            "notable_segments": [],
        }

    raw = res.get("response", "")
    logger.debug("Aggregation raw response: %.200r", raw)

    try:
        cleaned = _extract_json_block(raw)
        obj = json.loads(cleaned)
    except json.JSONDecodeError:
        logger.warning("Aggregation JSON parse failed, storing raw text")
        obj = {"raw": raw}
    return obj

# -------------------------------------------------------------------
# Feedback storage
# -------------------------------------------------------------------

def append_feedback_record(obj: dict) -> None:
    # Append one JSON document per line (JSON Lines)
    DATA_ROOT.mkdir(parents=True, exist_ok=True)
    with FEEDBACK_FILE.open("a", encoding="utf-8") as f:
        f.write(json.dumps(obj) + "\n")

# -------------------------------------------------------------------
# High-level video processing
# -------------------------------------------------------------------

def process_video_path(video_path: Path,
                       vision_model: Optional[str] = None,
                       agg_model: Optional[str] = None,
                       desired_fps: Optional[float] = None,
                       original_path: Optional[str] = None) -> Dict[str, Any]:
    media_id = str(uuid.uuid4())
    logger.info("Processing video media_id=%s path=%s", media_id, video_path)

    effective_fps = choose_fps_with_min_frames(video_path, desired_fps=desired_fps or 1.0, min_frames=15)
    logger.info("Using fps=%.3f for video %s", effective_fps, video_path)

    per_image_results: List[Dict[str, Any]] = []

    # Frames live in a temp dir, so they must be classified before the block exits
    with tempfile.TemporaryDirectory(prefix="frames_") as frames_tmp:
        frames_dir = Path(frames_tmp)
        extract_frames(video_path, frames_dir, fps=effective_fps)
        image_files = sorted(frames_dir.glob("frame_*.jpg"))
        logger.info("Extracted %d images from %s", len(image_files), video_path)

        for idx, image_path in enumerate(image_files):
            logger.info("Analyzing image %d/%d: %s",
                        idx + 1, len(image_files), image_path.name)
            r = classify_image_with_model(image_path, model=vision_model or VISION_MODEL)
            per_image_results.append(r)

    label_counts = aggregate_labels(per_image_results)
    logger.info("Label counts for media_id=%s: %s", media_id, label_counts)

    agg_obj = aggregate_with_llm(per_image_results, model=agg_model or AGG_MODEL)
    logger.info("Aggregation done for media_id=%s", media_id)

    record = {
        "ts": datetime.utcnow().isoformat(),
        "media_id": media_id,
        "type": "video",
        "path": str(video_path),
        "original_path": original_path,
        "frames": len(per_image_results),
        "fps": effective_fps,
        "label_counts": label_counts,
        "per_image_results": per_image_results,
        "video_summary": agg_obj,
    }
    append_feedback_record(record)

    return {
        "type": "video",
        "media_id": media_id,
        "original_path": original_path,
        "frames": len(per_image_results),
        "fps": effective_fps,
        "label_counts": label_counts,
        "result": agg_obj,
        "vision_model": vision_model or VISION_MODEL,
        "aggregation_model": agg_model or AGG_MODEL,
    }
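
The frame-rate selection in choose_fps_with_min_frames is easiest to see with a few numbers. The sketch below repeats the same arithmetic but takes the duration directly, so it runs without ffprobe. The name fps_for_duration is hypothetical and only used for this illustration.

```python
def fps_for_duration(duration_s: float,
                     desired_fps: float = 1.0,
                     min_frames: int = 15) -> float:
    """Same rule as choose_fps_with_min_frames, with the duration passed in."""
    if duration_s <= 0:
        return desired_fps
    if duration_s * desired_fps >= min_frames:
        return desired_fps
    # Too few frames at the desired rate: sample faster to reach min_frames
    return min_frames / duration_s


if __name__ == "__main__":
    for duration in (6.0, 15.0, 120.0):
        fps = fps_for_duration(duration)
        print(f"{duration:6.1f}s -> fps={fps:.2f}, about {duration * fps:.0f} frames")
```

A 6-second clip is sampled at 2.5 fps to reach 15 frames, while anything of 15 seconds or longer stays at the desired 1 fps, so the number of frames (and Ollama calls) grows linearly with the video length.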

Running the classifier in Kubernetes

We need to make a few changes to how it runs. First, we need shared storage. Luckily, Longhorn supports that nicely:

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: media-classifier-volume
  namespace: media-ml
spec:
  accessModes: [ "ReadWriteMany" ]
  storageClassName: longhorn-rwx-ssd
  resources:
    requests:
      storage: 200Gi

This can be mounted in multiple pods, which can all read from and write to it.

The API pod runs more or less as before, but with the new code:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: media-classifier
  namespace: media-ml
spec:
  replicas: 1
  selector:
    matchLabels:
      app: media-classifier
  template:
    metadata:
      labels:
        app: media-classifier
    spec:
      containers:
        - name: media-classifier
          image: registry.engen.priv.no/ollama-classifier:3.11-slim
          imagePullPolicy: Always
          ports:
            - containerPort: 8000
          env:
            - name: OLLAMA_HOST
              value: "http://ollama.engen.priv.no:11434"
            - name: VALKEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: valkey-password
                  key: password
            - name: REDIS_HOST
              value: "valkey.valkey.svc.cluster.local"
            - name: REDIS_PORT
              value: "6379"
            - name: REDIS_DB
              value: "4"
          volumeMounts:
            - name: data
              mountPath: /data/media_classifier
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: media-classifier-volume


There is an extra pod for the worker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: media-classifier-worker
  namespace: media-ml
spec:
  replicas: 1 # increase carefully if your box can handle more parallel jobs
  selector:
    matchLabels:
      app: media-classifier-worker
  template:
    metadata:
      labels:
        app: media-classifier-worker
    spec:
      containers:
        - name: media-classifier-worker
          image: registry.engen.priv.no/ollama-classifier:3.11-slim
          imagePullPolicy: Always
          # Run RQ worker instead of uvicorn
          command: ["python"]
          args: ["worker.py"]
          env:
            - name: OLLAMA_HOST
              value: "http://ollama.engen.priv.no:11434"
            - name: VALKEY_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: valkey-password
                  key: password
            - name: REDIS_HOST
              value: "valkey.valkey.svc.cluster.local"
            - name: REDIS_DB
              value: "4"
            - name: DATA_ROOT
              value: "/data/media_classifier"
          volumeMounts:
            - name: data
              mountPath: /data/media_classifier
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: media-classifier-volume

As you can see, they both mount the same volume under /data/media_classifier, and there are environment variables for defining the Redis connection.

Using the API

Using the API is pretty simple.

Submitting a job:

vegardengen@MacBookPro media-ml % curl -X POST  https://mc.engen.priv.no/classify/image -F "file=@/Users/vegardengen/Nextcloud/Direkteopplasting/20210529_143504.jpg;type=image/jpeg"
{"job_id":"dc824764-bc10-4db5-9658-16c5456c1400","status":"queued","kind":"image"}%

Checking the status:

vegardengen@MacBookPro media-ml % curl https://mc.engen.priv.no/jobs/dc824764-bc10-4db5-9658-16c5456c1400 
{"job_id":"dc824764-bc10-4db5-9658-16c5456c1400","status":"started","result":null,"enqueued_at":"2026-02-08T14:17:51.392467+00:00","started_at":"2026-02-08T14:18:07.038275+00:00","ended_at":null}%

This job is running. So, when it’s done:

vegardengen@MacBookPro media-ml % curl https://mc.engen.priv.no/jobs/dc824764-bc10-4db5-9658-16c5456c1400
{"job_id":"dc824764-bc10-4db5-9658-16c5456c1400","status":"finished","result":{"type":"image","path":"/data/media_classifier/uploads/20260208T141751_eda2d741.jpg","original_path":null,"result":{"primary_label":"boat ride","secondary_labels":["woman","ocean","navigation"],"confidence":0.9,"short_description":"a woman is driving a boat on a calm ocean, navigating with a GPS device"},"vision_model":"qwen2.5vl:7b"},"enqueued_at":"2026-02-08T14:17:51.392467+00:00","started_at":"2026-02-08T14:18:07.040444+00:00","ended_at":"2026-02-08T14:19:39.023664+00:00"}%
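
A client usually only needs the labels from that response. As a small sketch, assuming the response shape shown above (the outer "result" is the job's return value, the inner "result" the classification), a helper like the hypothetical labels_from_job below would flatten it into one list:

```python
from typing import Any, Dict, List


def labels_from_job(payload: Dict[str, Any]) -> List[str]:
    """Return the primary label followed by the secondary labels, without duplicates."""
    if payload.get("status") != "finished":
        return []
    # Outer "result" is the job return value, inner "result" the classification
    classification = (payload.get("result") or {}).get("result") or {}
    candidates = [classification.get("primary_label")]
    candidates.extend(classification.get("secondary_labels") or [])

    labels: List[str] = []
    for label in candidates:
        if label and label not in labels:
            labels.append(label)
    return labels


if __name__ == "__main__":
    example = {
        "status": "finished",
        "result": {
            "type": "image",
            "result": {
                "primary_label": "boat ride",
                "secondary_labels": ["woman", "ocean", "navigation"],
            },
        },
    }
    print(labels_from_job(example))
```

For the job above this gives boat ride, woman, ocean and navigation, and an unfinished job simply yields an empty list.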

Summary and future improvements

My pipeline has a few rough edges, and the file system fills up so I need to clean it manually.
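
Until that cleanup is automated, a small script could do it. This is a minimal sketch, not something that exists in my setup yet: the function name remove_old_uploads is hypothetical, and the 7-day default is an assumption that mirrors the result_ttl used for image jobs. It defaults to a dry run, so nothing is deleted by accident.

```python
import time
from pathlib import Path
from typing import List, Optional


def remove_old_uploads(upload_root: Path,
                       max_age_days: float = 7.0,
                       now: Optional[float] = None,
                       dry_run: bool = True) -> List[Path]:
    """Return (and, unless dry_run, delete) files older than max_age_days."""
    cutoff = (time.time() if now is None else now) - max_age_days * 86400
    old_files: List[Path] = []
    for path in sorted(upload_root.iterdir()):
        # Only plain files are considered; subdirectories are left alone
        if path.is_file() and path.stat().st_mtime < cutoff:
            if not dry_run:
                path.unlink()
            old_files.append(path)
    return old_files


if __name__ == "__main__":
    root = Path("/data/media_classifier/uploads")
    if root.is_dir():
        for p in remove_old_uploads(root, dry_run=True):
            print("would remove", p)
```

Something like this could run as a Kubernetes CronJob that mounts the same volume, as long as the age limit is longer than the longest job timeout.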

I also want to make the prompts customizable, as you’d classify west coast swing videos differently than random vacation videos, and you can have a multitude of types of images where you might emphasize different things.

In my next blog post, however, I’m creating a command line interface wrapping the API, complete with generating XMP files that digiKam, my currently preferred video organizing software, can read. The code is already done, so I just need to write it up, but I decided it belongs in another blog post.
