Topics and Procedures

Define Kafka topics with type-safe message schemas using kafkaRouter.

Basic Topic Definition

import { init, kafkaRouter } from "@alt-stack/kafka-core";
import { z } from "zod";

const { procedure } = init();

const router = kafkaRouter({
  "user-events": procedure
    .input({
      message: z.object({
        userId: z.string(),
        eventType: z.string(),
      }),
    })
    .subscribe(({ input }) => {
      // input is the validated message
      console.log(input.userId);
    }),
});

Message Validation

Messages are validated before the handler runs:

const router = kafkaRouter({
  orders: procedure
    .input({
      message: z.object({
        orderId: z.string().uuid(),
        amount: z.number().positive(),
        currency: z.string().length(3),
      }),
    })
    .subscribe(({ input }) => {
      // Only called if message passes validation
      processOrder(input);
    }),
});
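
To make the ordering concrete, here is a dependency-free sketch of the validate-then-dispatch pattern. It is not how @alt-stack/kafka-core is implemented: `Result`, `Validator`, `validateOrder`, and `dispatch` are hypothetical names, and the hand-written checks only approximate the Zod schema above (the UUID check in particular is simplified).

```typescript
// Hypothetical stand-in for a schema: returns the typed value or an error message.
type Result<T> = { ok: true; value: T } | { ok: false; error: string };
type Validator<T> = (raw: unknown) => Result<T>;

interface Order {
  orderId: string;
  amount: number;
  currency: string;
}

// Simplified UUID pattern; an assumption for illustration, not Zod's exact rule.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

// Hand-written approximation of the `orders` message schema.
const validateOrder: Validator<Order> = (raw) => {
  if (typeof raw !== "object" || raw === null) {
    return { ok: false, error: "message must be an object" };
  }
  const { orderId, amount, currency } = raw as Record<string, unknown>;
  if (typeof orderId !== "string" || !UUID_RE.test(orderId)) {
    return { ok: false, error: "orderId must be a UUID" };
  }
  if (typeof amount !== "number" || !(amount > 0)) {
    return { ok: false, error: "amount must be a positive number" };
  }
  if (typeof currency !== "string" || currency.length !== 3) {
    return { ok: false, error: "currency must be 3 characters" };
  }
  return { ok: true, value: { orderId, amount, currency } };
};

// Validate first; call the handler only when validation succeeds.
function dispatch<T>(
  raw: unknown,
  validate: Validator<T>,
  handler: (input: T) => void,
): Result<T> {
  const result = validate(raw);
  if (result.ok) {
    handler(result.value);
  }
  return result;
}

const processed: Order[] = [];

// Passes every check, so the handler runs.
const good = dispatch(
  { orderId: "123e4567-e89b-12d3-a456-426614174000", amount: 19.99, currency: "USD" },
  validateOrder,
  (order) => processed.push(order),
);

// Fails on the first check, so the handler is never called.
const bad = dispatch(
  { orderId: "not-a-uuid", amount: -5, currency: "US" },
  validateOrder,
  (order) => processed.push(order),
);
```

What the router does with a rejected message (skip, log, or throw) is not shown here; the sketch only illustrates that the handler never sees invalid input.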

Multiple Topics

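// UserEventSchema and OrderEventSchema are Zod schemas defined elsewhere;
// handleUserEvent and handleOrderEvent are your own handler functions.
const router = kafkaRouter({
  "user-events": procedure
    .input({ message: UserEventSchema })
    .subscribe(({ input }) => handleUserEvent(input)),

  "order-events": procedure
    .input({ message: OrderEventSchema })
    .subscribe(({ input }) => handleOrderEvent(input)),
});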

Output Validation

Optionally validate handler return values:

const router = kafkaRouter({
  "process-data": procedure
    .input({
      message: z.object({ data: z.string() }),
    })
    .output(
      z.object({
        processed: z.boolean(),
        result: z.string(),
      })
    )
    .subscribe(({ input }) => ({
      processed: true,
      result: input.data.toUpperCase(),
    })),
});
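
As a rough illustration of what checking a return value involves, the sketch below wraps a handler so its result is tested against the expected shape before being passed on. It does not use the library: `ProcessResult`, `isProcessResult`, and `withOutputCheck` are hypothetical names, and throwing on a mismatch is an assumption, not documented library behavior.

```typescript
// Hypothetical shape matching the output schema above.
interface ProcessResult {
  processed: boolean;
  result: string;
}

// Type guard approximating the output schema.
function isProcessResult(value: unknown): value is ProcessResult {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return typeof v["processed"] === "boolean" && typeof v["result"] === "string";
}

// Wrap a handler so its return value is checked before it is returned.
function withOutputCheck<I>(
  handler: (input: I) => unknown,
): (input: I) => ProcessResult {
  return (input) => {
    const output = handler(input);
    if (!isProcessResult(output)) {
      // Assumption: a mismatch is reported by throwing.
      throw new Error("handler returned a value that does not match the output schema");
    }
    return output;
  };
}

// A handler whose return value matches the expected shape.
const upper = withOutputCheck((input: { data: string }) => ({
  processed: true,
  result: input.data.toUpperCase(),
}));

// A handler whose return value does not match (wrong type, missing field).
const broken = withOutputCheck((_input: { data: string }) => ({ processed: "yes" }));

const okOutput = upper({ data: "hello" });

let brokenError = "";
try {
  broken({ data: "hello" });
} catch (err) {
  brokenError = err instanceof Error ? err.message : String(err);
}
```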

subscribe vs handler

  • .subscribe() - Use inside the kafkaRouter({}) config object; the topic name is the object key
  • .handler() - Creates a pending procedure with no topic attached, for manual registration later

// Using subscribe (recommended)
const router = kafkaRouter({
  "my-topic": procedure.input({ message: Schema }).subscribe(({ input }) => {}),
});

// Using handler for manual registration
const pendingProc = procedure.input({ message: Schema }).handler(({ input }) => {});
const router = createKafkaRouter();
router.registerPendingProcedure("my-topic", pendingProc);
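
The difference between the two styles is where the topic name comes from. The toy registry below sketches that idea without the library: `ToyRouter`, `PendingProcedure`, `fromConfig`, and `register` are hypothetical names, not the @alt-stack/kafka-core API.

```typescript
// Hypothetical types: a pending procedure carries a handler but no topic yet.
type Handler = (message: unknown) => void;

interface PendingProcedure {
  handler: Handler;
}

class ToyRouter {
  private readonly topics = new Map<string, Handler>();

  // Config style: each object key becomes the topic name.
  static fromConfig(config: Record<string, PendingProcedure>): ToyRouter {
    const router = new ToyRouter();
    for (const topic of Object.keys(config)) {
      router.register(topic, config[topic]);
    }
    return router;
  }

  // Manual style: the caller supplies the topic explicitly.
  register(topic: string, proc: PendingProcedure): void {
    if (this.topics.has(topic)) {
      throw new Error(`topic "${topic}" is already registered`);
    }
    this.topics.set(topic, proc.handler);
  }

  topicNames(): string[] {
    return Array.from(this.topics.keys());
  }

  // Returns false when no handler is registered for the topic.
  deliver(topic: string, message: unknown): boolean {
    const handler = this.topics.get(topic);
    if (!handler) return false;
    handler(message);
    return true;
  }
}

const seen: string[] = [];

// Topic taken from the config key.
const fromKeys = ToyRouter.fromConfig({
  "my-topic": { handler: () => seen.push("config") },
});

// Topic supplied at registration time.
const manual = new ToyRouter();
manual.register("my-topic", { handler: () => seen.push("manual") });

fromKeys.deliver("my-topic", {});
manual.deliver("my-topic", {});
```

Both routers end up with the same topic table; manual registration is mainly useful when the topic name is only known at runtime or procedures are defined in separate modules.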