Skip to main content

Quickstart

Consumer

import { init, kafkaRouter, createConsumer } from "@alt-stack/kafka-core";
import { Kafka } from "kafkajs";
import { z } from "zod";

const UserEventSchema = z.object({
userId: z.string(),
eventType: z.enum(["created", "updated", "deleted"]),
timestamp: z.number(),
});

const { procedure } = init();

const router = kafkaRouter({
"user-events": procedure
.input({ message: UserEventSchema })
.subscribe(({ input, ctx }) => {
console.log(`Event: ${input.eventType} for user ${input.userId}`);
}),
});

const consumer = await createConsumer(router, {
kafka: new Kafka({
clientId: "my-consumer",
brokers: ["localhost:9092"],
}),
groupId: "my-consumer-group",
});

Producer (using Kafka Client)

For producers, use the Kafka client packages with AsyncAPI-generated types:

// 1. Generate types: npx zod-asyncapi asyncapi.json -o ./generated-types.ts
import { Topics } from "./generated-types";
import { createKafkaClient } from "@alt-stack/kafka-client-kafkajs";

const client = await createKafkaClient({
kafka: { brokers: ["localhost:9092"], clientId: "my-producer" },
topics: Topics,
});

// Type-safe sending
await client.send("user-events", {
userId: "user-123",
eventType: "created",
timestamp: Date.now(),
});

Or with WarpStream:

import { Topics } from "./generated-types";
import { createWarpStreamClient } from "@alt-stack/kafka-client-warpstream";

const client = await createWarpStreamClient({
bootstrapServer: "my-cluster.warpstream.com:9092",
topics: Topics,
});

await client.send("user-events", {
userId: "user-123",
eventType: "created",
timestamp: Date.now(),
});