OpenTelemetry Integration

Distributed tracing and timing metrics for WarpStream workers using OpenTelemetry.

Installation

pnpm add @opentelemetry/api @opentelemetry/sdk-node @opentelemetry/exporter-trace-otlp-http

Tracing

Quick Start

import { createWorker } from "@alt-stack/workers-warpstream";
import { jobRouter } from "./jobs";

const worker = await createWorker(jobRouter, {
  kafka: { brokers: ["warpstream.example.com:9092"] },
  groupId: "my-workers",
  telemetry: true,
});

Configuration

const worker = await createWorker(jobRouter, {
  kafka: { brokers: ["..."] },
  groupId: "my-workers",
  telemetry: {
    enabled: true,
    serviceName: "email-worker",
    ignoreJobs: ["health-check", "metrics-poll"],
  },
});
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | `boolean` | `false` | Enable/disable tracing |
| `serviceName` | `string` | `"altstack-worker"` | Service name for traces |
| `ignoreJobs` | `string[]` | `[]` | Job names to exclude from tracing |

Span Attributes

Each job creates a span with these attributes:

| Attribute | Example | Description |
| --- | --- | --- |
| `job.name` | `send-welcome-email` | Job name from router |
| `job.id` | `topic-0-12345` | Unique job identifier |
| `job.attempt` | `1` | Attempt number |
| `job.status` | `success` | Final status (`success`/`error`/`retry`) |

Custom Attributes

Access the span via ctx.span:

"process-order": procedure
  .input({ payload: z.object({ orderId: z.string() }) })
  .task(async ({ input, ctx }) => {
    ctx.span?.setAttribute("order.id", input.orderId);
    ctx.span?.addEvent("order.validated");

    // Process order...

    ctx.span?.addEvent("order.completed");
    return ok({ processed: true });
  }),

Metrics

Quick Start

const worker = await createWorker(jobRouter, {
  kafka: { brokers: ["warpstream.example.com:9092"] },
  groupId: "my-workers",
  metrics: true,
});

Configuration

const worker = await createWorker(jobRouter, {
  kafka: { brokers: ["..."] },
  groupId: "my-workers",
  metrics: {
    enabled: true,
    serviceName: "email-worker",
    ignoreJobs: ["health-check"],
    histogramBuckets: [10, 50, 100, 500, 1000, 5000],
  },
});
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `enabled` | `boolean` | `false` | Enable/disable metrics |
| `serviceName` | `string` | `"altstack-worker"` | Meter name |
| `ignoreJobs` | `string[]` | `[]` | Job names to exclude from metrics |
| `histogramBuckets` | `number[]` | `[10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000]` | Histogram bucket boundaries (ms) |

Metrics Recorded

| Metric | Type | Description |
| --- | --- | --- |
| `messaging.process.queue_time_ms` | Histogram | Time from job creation to processing start (ms) |
| `messaging.process.duration_ms` | Histogram | Job handler execution duration (ms) |
| `messaging.process.e2e_time_ms` | Histogram | End-to-end time from creation to completion (ms) |

All metrics include these attributes:

  • job.name: The job name
  • job.status: "success" or "error" (for duration and e2e_time)
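To illustrate the attribute shape, here is a hedged sketch of how a recorder could attach these labels. The `Histogram` interface and `recordDuration` function below are minimal illustrative stand-ins, not the worker's actual internals (which use `@opentelemetry/api`):

```typescript
// Minimal stand-in for an OTel histogram, used only to show the
// attribute shape; the real worker records via @opentelemetry/api.
interface Histogram {
  record(value: number, attributes: Record<string, string>): void;
}

// Hypothetical call the worker might make after a handler finishes.
function recordDuration(
  histogram: Histogram,
  jobName: string,
  status: "success" | "error",
  durationMs: number,
): void {
  histogram.record(durationMs, {
    "job.name": jobName,
    "job.status": status, // only present on duration and e2e_time
  });
}
```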

How Timing is Calculated

Timeline:

[Job Created] ---queue_time---> [Processing Starts] ---duration---> [Processing Ends]
|<-------------------------- e2e_time -------------------------->|

Queue Time - Recorded when message is received:

const createdAt = parseInt(String(message.headers["x-created-at"]), 10);
const queueTimeMs = Date.now() - createdAt;
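Depending on the Kafka client, header values may arrive as a `Buffer` rather than a string, and the header may be missing entirely. A sketch of a tolerant parser, assuming nothing beyond the header convention above (the helper names are illustrative, not part of the library):

```typescript
// Illustrative helper: tolerate Buffer, string, or missing header values.
// Returns undefined when the header is absent or not a valid timestamp.
function parseCreatedAt(header: unknown): number | undefined {
  if (header == null) return undefined;
  const raw = typeof header === "string" ? header : String(header);
  const createdAt = Number.parseInt(raw, 10);
  return Number.isNaN(createdAt) ? undefined : createdAt;
}

// Queue time can only be computed when the header is present.
function queueTimeMs(header: unknown, now: number): number | undefined {
  const createdAt = parseCreatedAt(header);
  return createdAt === undefined ? undefined : now - createdAt;
}
```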

Processing Time - Recorded when handler completes:

const startTime = Date.now();
await handler(job);
const durationMs = Date.now() - startTime;

E2E Time - Recorded when handler completes:

const createdAt = parseInt(String(message.headers["x-created-at"]), 10);
await handler(job);
const e2eTimeMs = Date.now() - createdAt;

Retries and Timing

Kafka/WarpStream handles retries at the consumer level. When a message fails:

| Scenario | Queue Time | Duration | E2E Time |
| --- | --- | --- | --- |
| First attempt fails | Recorded | Recorded with `status: "error"` | Recorded with `status: "error"` |
| Redelivered message | Includes original wait + retry delay | Fresh measurement | Includes all time since creation |

Important: The x-created-at header persists across redeliveries, so:

  • Queue time accumulates (original wait + time in DLQ/retry topic)
  • E2E time reflects total time from original job creation
  • Duration is always the current attempt's execution time
// On retry, queue_time grows:
// Attempt 1: queue_time = 100ms (original)
// Attempt 2: queue_time = 5100ms (100ms + 5s retry delay)

// Duration is per-attempt:
// Attempt 1: duration = 50ms (failed)
// Attempt 2: duration = 45ms (succeeded)

// E2E includes everything:
// Attempt 2: e2e_time = 5145ms (queue + all durations)
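The arithmetic above can be captured in a small sketch. The `Attempt` type and `attemptTimings` function are illustrative names, not library exports; they simply restate how the three metrics relate to the persistent `x-created-at` timestamp:

```typescript
interface Attempt {
  startedAt: number;  // when processing began (ms since epoch)
  finishedAt: number; // when the handler returned or threw
}

// Illustrative: derive the three timing metrics for one attempt,
// given the original x-created-at timestamp.
function attemptTimings(createdAt: number, attempt: Attempt) {
  return {
    queueTimeMs: attempt.startedAt - createdAt,          // grows across retries
    durationMs: attempt.finishedAt - attempt.startedAt,  // per-attempt only
    e2eTimeMs: attempt.finishedAt - createdAt,           // total since creation
  };
}
```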

How Queue Time Works

Queue time is measured using the x-created-at header automatically added by job clients. Both createJobClient and createWarpStreamClient add this header when enqueuing jobs.

If you're using a custom producer, add the header manually:

const message = {
  value: JSON.stringify(payload),
  headers: {
    "x-created-at": Date.now().toString(),
  },
};
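If you stamp the header in several places, a tiny wrapper keeps it consistent. This is a sketch under the header convention above; `OutgoingMessage` and `withCreatedAt` are illustrative names, and it preserves any headers already set on the message:

```typescript
// Illustrative message shape for a custom producer.
interface OutgoingMessage {
  value: string;
  headers?: Record<string, string>;
}

// Stamp x-created-at so downstream workers can compute queue time
// and e2e time; existing headers are preserved.
function withCreatedAt(
  message: OutgoingMessage,
  now: number = Date.now(),
): OutgoingMessage {
  return {
    ...message,
    headers: { ...message.headers, "x-created-at": now.toString() },
  };
}
```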

Using Both Tracing and Metrics

Enable both for comprehensive observability:

const worker = await createWorker(jobRouter, {
  kafka: { brokers: ["warpstream.example.com:9092"] },
  groupId: "my-workers",
  telemetry: true, // Spans
  metrics: true,   // Histograms
});

OTel SDK Setup

Create tracing.ts and import it first in your entry point:

// tracing.ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-http";
import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics";

const sdk = new NodeSDK({
  traceExporter: new OTLPTraceExporter({
    url: "http://localhost:4318/v1/traces",
  }),
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: "http://localhost:4318/v1/metrics",
    }),
  }),
});

sdk.start();

process.on("SIGTERM", () => sdk.shutdown().then(() => process.exit(0)));

// index.ts
import "./tracing.js"; // Must be first!
import { createWorker } from "@alt-stack/workers-warpstream";

Local Development with Jaeger

docker run -d --name jaeger \
  -p 16686:16686 \
  -p 4318:4318 \
  jaegertracing/all-in-one:latest

View traces at http://localhost:16686

Graceful Degradation

If @opentelemetry/api is not installed, telemetry is silently disabled. ctx.span will be undefined, so always use optional chaining:

ctx.span?.setAttribute("key", "value");  // Safe even without OTel
ctx.span?.addEvent("processing.started"); // No-op if undefined

Metrics recording functions are also safe to call: they become no-ops if the OTel API is not available.
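If you annotate spans in many handlers, the optional-chaining pattern can be wrapped in a small helper. This is an illustrative sketch: `SpanLike` is a minimal structural type matching only the methods used here (the real type comes from `@opentelemetry/api` when it is installed), and `annotate` is a hypothetical name:

```typescript
// Minimal structural type covering the span methods used in handlers;
// when @opentelemetry/api is installed, ctx.span satisfies it.
interface SpanLike {
  setAttribute(key: string, value: string | number | boolean): void;
  addEvent(name: string): void;
}

// Illustrative helper: annotate a possibly-undefined span without
// repeating optional chaining in every handler.
function annotate(
  span: SpanLike | undefined,
  attributes: Record<string, string | number | boolean>,
  event?: string,
): void {
  if (!span) return; // no-op when OTel is not installed
  for (const [key, value] of Object.entries(attributes)) {
    span.setAttribute(key, value);
  }
  if (event) span.addEvent(event);
}
```

Inside a handler this becomes `annotate(ctx.span, { "order.id": input.orderId }, "order.validated")`, which degrades to a no-op exactly like the optional-chaining form.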