Skip to main content

Optimization Streaming

Get real-time updates on optimization progress using WebSocket streaming. Build responsive UIs and monitor long-running optimization jobs as they execute.

WebSocket Connection

Connect to the WebSocket endpoint and subscribe to job events:
ws://api.mutagent.io/ws/optimization

Protocol

  1. Client connects to the WebSocket endpoint
  2. Client sends a subscribe message with the job ID
  3. Server replays historical events (if afterEventId is provided)
  4. Server streams new events as they occur
  5. Client sends unsubscribe to stop receiving events
// Connect to WebSocket
const ws = new WebSocket('wss://api.mutagent.io/ws/optimization');

ws.onopen = () => {
  // Subscribe to a job
  ws.send(JSON.stringify({
    type: 'subscribe',
    jobId: 'job-uuid-here',
    afterEventId: 0,  // Optional: replay events after this ID
  }));
};

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  switch (message.type) {
    case 'subscribed':
      console.log('Subscribed to job:', message.jobId);
      break;

    case 'event':
      handleJobEvent(message.event);
      break;

    case 'replay':
      // Batch of historical events
      message.data.forEach(handleJobEvent);
      break;

    case 'error':
      console.error('Error:', message.error);
      break;
  }
};

function handleJobEvent(event) {
  console.log(`[${event.eventType}] Iteration ${event.iteration} - Stage: ${event.stage}`);
  if (event.data) {
    console.log('  Data:', event.data);
  }
}

// Unsubscribe when done
ws.send(JSON.stringify({ type: 'unsubscribe', jobId: 'job-uuid-here' }));

Message Types

Client Messages

MessageDescriptionFields
subscribeSubscribe to job eventsjobId, afterEventId?
unsubscribeStop receiving eventsjobId
replayRequest historical eventsjobId, afterEventId?
pingKeepalive

Server Messages

MessageDescriptionFields
subscribedSubscription confirmedjobId
unsubscribedUnsubscription confirmedjobId
eventReal-time job eventjobId, event
replayBatch of historical eventsdata[]
pongKeepalive response
errorError messageerror, jobId?

Event Types

Each event object within the event or replay messages contains:
FieldTypeDescription
idnumberEvent ID (for replay tracking)
eventTypestringType of optimization event
iterationnumberCurrent iteration number
stagestringCurrent optimization stage
dataobjectEvent-specific payload
timestampstringISO 8601 timestamp

Optimization Event Types

Event TypeDescriptionData Fields
job_startedJob has begunpromptId, datasetId, config
iteration_completedIteration finishedscore
job_pausedJob was pausedpausedAt, currentIteration
job_resumedJob was resumedresumedAt, resumeFromIteration
job_completedOptimization finishedresultPromptId, finalScore
job_failedOptimization failederror, errorStack, willRetry
job_cancelledJob was cancelledcancelledAt, finalIteration

UI Integration

Build a real-time progress UI using streaming events.

React Example

import { useState, useEffect, useCallback } from 'react';

interface IterationData {
  iteration: number;
  score: number | null;
  stage: string;
}

function OptimizationProgress({ jobId }: { jobId: string }) {
  const [status, setStatus] = useState<string>('connecting');
  const [iterations, setIterations] = useState<IterationData[]>([]);
  const [bestScore, setBestScore] = useState<number>(0);
  const [currentIteration, setCurrentIteration] = useState<number>(0);
  const [error, setError] = useState<string | null>(null);

  const handleEvent = useCallback((event: any) => {
    switch (event.eventType) {
      case 'job_started':
        setStatus('running');
        break;

      case 'iteration_completed':
        setCurrentIteration(event.iteration);
        setIterations(prev => [...prev, {
          iteration: event.iteration,
          score: event.data?.score ?? null,
          stage: event.stage,
        }]);
        if (event.data?.score && event.data.score > bestScore) {
          setBestScore(event.data.score);
        }
        break;

      case 'job_completed':
        setStatus('completed');
        if (event.data?.finalScore) {
          setBestScore(event.data.finalScore);
        }
        break;

      case 'job_failed':
        setStatus('failed');
        setError(event.data?.error || 'Unknown error');
        break;

      case 'job_paused':
        setStatus('paused');
        break;
    }
  }, [bestScore]);

  useEffect(() => {
    const ws = new WebSocket('wss://api.mutagent.io/ws/optimization');

    ws.onopen = () => {
      ws.send(JSON.stringify({ type: 'subscribe', jobId }));
    };

    ws.onmessage = (msg) => {
      const message = JSON.parse(msg.data);
      if (message.type === 'event') {
        handleEvent(message.event);
      } else if (message.type === 'replay') {
        message.data.forEach(handleEvent);
      }
    };

    ws.onerror = () => setStatus('error');
    ws.onclose = () => setStatus('disconnected');

    return () => ws.close();
  }, [jobId, handleEvent]);

  return (
    <div className="optimization-progress">
      <div className="header">
        <h2>Optimization Progress</h2>
        <span className={`status ${status}`}>{status}</span>
      </div>

      <div className="stats">
        <div className="stat">
          <label>Current Iteration</label>
          <span>{currentIteration}</span>
        </div>
        <div className="stat">
          <label>Best Score</label>
          <span>{bestScore.toFixed(2)}</span>
        </div>
      </div>

      {error && <div className="error">{error}</div>}

      <div className="chart">
        {iterations.map((it, i) => (
          <div
            key={i}
            className="bar"
            style={{ height: `${(it.score ?? 0) * 100}%` }}
            title={`Iteration ${it.iteration}: ${it.score?.toFixed(2) ?? 'N/A'}`}
          />
        ))}
      </div>
    </div>
  );
}

export default OptimizationProgress;

Vue Example

<template>
  <div class="optimization-progress">
    <h2>Optimization: {{ status }}</h2>

    <div class="progress-bar">
      <div
        class="fill"
        :style="{ width: `${(currentIteration / maxIterations) * 100}%` }"
      ></div>
    </div>

    <div class="scores">
      <p>Iteration: {{ currentIteration }} / {{ maxIterations }}</p>
      <p>Best Score: {{ bestScore.toFixed(2) }}</p>
    </div>

    <ul class="iterations">
      <li
        v-for="it in iterations"
        :key="it.iteration"
      >
        Iteration {{ it.iteration }}: {{ it.score?.toFixed(2) ?? 'N/A' }}
      </li>
    </ul>
  </div>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue';

const props = defineProps(['jobId']);

const status = ref('connecting');
const currentIteration = ref(0);
const maxIterations = ref(10);
const bestScore = ref(0);
const iterations = ref([]);

let ws;

function handleEvent(event) {
  if (event.eventType === 'iteration_completed') {
    currentIteration.value = event.iteration;
    iterations.value.push({
      iteration: event.iteration,
      score: event.data?.score ?? null,
    });
    if (event.data?.score > bestScore.value) {
      bestScore.value = event.data.score;
    }
  }
  if (event.eventType === 'job_completed') {
    status.value = 'completed';
  }
  if (event.eventType === 'job_failed') {
    status.value = 'failed';
  }
}

onMounted(() => {
  ws = new WebSocket('wss://api.mutagent.io/ws/optimization');

  ws.onopen = () => {
    ws.send(JSON.stringify({ type: 'subscribe', jobId: props.jobId }));
    status.value = 'running';
  };

  ws.onmessage = (msg) => {
    const message = JSON.parse(msg.data);
    if (message.type === 'event') handleEvent(message.event);
    if (message.type === 'replay') message.data.forEach(handleEvent);
  };

  ws.onerror = () => { status.value = 'error'; };
});

onUnmounted(() => {
  if (ws) ws.close();
});
</script>

Connection Management

Reconnection with Event Replay

Use afterEventId to resume from the last received event after reconnection:
let lastEventId = 0;

function connect(jobId: string) {
  const ws = new WebSocket('wss://api.mutagent.io/ws/optimization');

  ws.onopen = () => {
    ws.send(JSON.stringify({
      type: 'subscribe',
      jobId,
      afterEventId: lastEventId,  // Replay missed events
    }));
  };

  ws.onmessage = (msg) => {
    const message = JSON.parse(msg.data);
    if (message.type === 'event' && message.event.id) {
      lastEventId = message.event.id;
    }
    if (message.type === 'replay') {
      message.data.forEach(event => {
        if (event.id) lastEventId = event.id;
      });
    }
  };

  ws.onclose = () => {
    // Auto-reconnect after delay
    setTimeout(() => connect(jobId), 2000);
  };

  return ws;
}

Keepalive

Send periodic ping messages to prevent connection timeout:
const keepalive = setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping' }));
  }
}, 30000);

// Clean up
clearInterval(keepalive);

Error Handling

Handle various error scenarios:
ws.onmessage = (msg) => {
  const message = JSON.parse(msg.data);

  if (message.type === 'error') {
    switch (message.error) {
      case 'Job not found':
        console.error('Job does not exist:', message.jobId);
        break;
      case 'Not subscribed to job':
        console.error('Must subscribe before requesting replay');
        break;
      default:
        console.error('WebSocket error:', message.error);
    }
  }
};

Best Practices

Always close the WebSocket connection and clear keepalive intervals when the component unmounts or the job completes.
Store the last received event ID so you can resume from the correct position after reconnection without missing or duplicating events.
If receiving many events quickly (especially during replay), consider batching state updates to avoid excessive re-renders.
For long-running optimizations, consider persisting the best result locally in case of browser refresh.