Producers
Create type-safe Kafka producers using AsyncAPI-generated types.
Installation
# Core + KafkaJS binding
pnpm add @alt-stack/kafka-client-core @alt-stack/kafka-client-kafkajs kafkajs zod
# Or for WarpStream
pnpm add @alt-stack/kafka-client-core @alt-stack/kafka-client-warpstream kafkajs zod
Basic Setup
Use types generated from your AsyncAPI spec with zod-asyncapi:
// 1. Generate types from AsyncAPI spec
// npx zod-asyncapi asyncapi.json -o ./generated-types.ts
// 2. Import generated types
import { Topics } from "./generated-types";
import { createKafkaClient } from "@alt-stack/kafka-client-kafkajs";
// 3. Create type-safe client
const client = await createKafkaClient({
kafka: { brokers: ["localhost:9092"], clientId: "my-producer" },
topics: Topics,
});
// 4. Send messages with full type safety
await client.send("user-events", {
userId: "user-123",
eventType: "created",
timestamp: Date.now(),
});
KafkaJS Client
import { Topics } from "./generated-types";
import { createKafkaClient } from "@alt-stack/kafka-client-kafkajs";
const client = await createKafkaClient({
kafka: {
brokers: ["localhost:9092"],
clientId: "my-app",
ssl: true,
sasl: { mechanism: "plain", username: "user", password: "pass" },
},
topics: Topics,
producerConfig: {
allowAutoTopicCreation: false,
},
onError: (error) => console.error("Producer error:", error),
});
WarpStream Client
Optimized for WarpStream with recommended defaults (LZ4 compression, extended timeouts):
import { Topics } from "./generated-types";
import { createWarpStreamClient } from "@alt-stack/kafka-client-warpstream";
const client = await createWarpStreamClient({
bootstrapServer: "my-cluster.warpstream.com:9092",
topics: Topics,
clientId: "my-producer",
});
Type-Safe Sending
// TypeScript enforces valid topics and message shapes
await client.send("user-events", {
userId: "user-123",
eventType: "created",
timestamp: Date.now(),
});
// Type error: "invalid-topic" doesn't exist
await client.send("invalid-topic", { data: "test" });
// Type error: missing required field
await client.send("user-events", { userId: "123" });