Middleware
Use middleware to add cross-cutting concerns such as logging, metrics, or authentication to your procedures.
Basic Middleware
const { procedure } = init();

const loggedProcedure = procedure.use(async ({ ctx, next }) => {
  console.log(`Processing message from ${ctx.topic}`);
  const result = await next();
  console.log(`Completed processing`);
  return result;
});

const router = kafkaRouter({
  events: loggedProcedure
    .input({ message: EventSchema })
    .subscribe(({ input }) => {
      // Logging middleware runs before/after this
    }),
});
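The before/after behavior comes from where the code sits relative to `await next()`. The following is a minimal standalone sketch of that flow, not the library's implementation; `runWith`, `trace`, and the simplified `Middleware` type are hypothetical names introduced only for illustration.

```typescript
// Simplified model: a middleware receives a context and a next() callback.
type Next = () => Promise<unknown>;
type Middleware = (opts: { ctx: { topic: string }; next: Next }) => Promise<unknown>;

const trace: string[] = [];

const logged: Middleware = async ({ ctx, next }) => {
  trace.push(`before ${ctx.topic}`); // runs before the handler
  const result = await next();       // runs the handler (or the next middleware)
  trace.push(`after ${ctx.topic}`);  // runs after the handler has finished
  return result;
};

// Hypothetical runner: wires one middleware in front of a handler.
async function runWith(mw: Middleware, handler: Next, topic: string): Promise<unknown> {
  return mw({ ctx: { topic }, next: handler });
}

const outcome = runWith(
  logged,
  async () => {
    trace.push("handler");
    return "ok";
  },
  "events",
);

outcome.then(() => console.log(trace));
// prints [ 'before events', 'handler', 'after events' ]
```

Anything placed after `await next()` only runs once the handler has finished, which is why the same middleware can time or log the whole call.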
Context Extension
Middleware can add properties to the context by passing them to next():
const authMiddleware = procedure.use(async ({ ctx, next }) => {
  const user = await getUserFromMessage(ctx.message);
  return next({ ctx: { user } });
});

const router = kafkaRouter({
  "protected-events": authMiddleware
    .input({ message: EventSchema })
    .subscribe(({ input, ctx }) => {
      // ctx.user is available and typed
      console.log(`User: ${ctx.user.name}`);
    }),
});
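One way to picture why `ctx.user` ends up typed in the handler is to model the context as an intersection of the incoming context and the added properties. This is a standalone sketch under that assumption, not the library's actual typing; `runWithAuth`, `Message`, `User`, and the body of `getUserFromMessage` are hypothetical stand-ins.

```typescript
type Message = { userId: string };
type User = { name: string };

// Hypothetical lookup; a real one might query a database or verify a token.
async function getUserFromMessage(message: Message): Promise<User> {
  return { name: `user-${message.userId}` };
}

// Stand-in for a procedure with one auth middleware in front of the handler.
async function runWithAuth<C extends { message: Message }, R>(
  ctx: C,
  handler: (ctx: C & { user: User }) => Promise<R>,
): Promise<R> {
  // Models next({ ctx: { user } }): the extra properties are merged over the incoming context.
  const next = (opts: { ctx: { user: User } }) => handler({ ...ctx, ...opts.ctx });
  const user = await getUserFromMessage(ctx.message);
  return next({ ctx: { user } });
}

const greeting = runWithAuth(
  { message: { userId: "42" }, topic: "protected-events" },
  // ctx keeps its original properties (topic) and gains the typed user.
  async (ctx) => `User: ${ctx.user.name} on ${ctx.topic}`,
);
```

In this model the handler sees both the original properties and the added `user`, so a typo such as `ctx.usr` would be rejected at compile time.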
Reusable Middleware with createMiddleware
import { createMiddleware } from "@alt-stack/kafka-core";

interface AppContext {
  logger: Logger;
}

// `metrics` is assumed to be a metrics client provided by your application.
const metricsMiddleware = createMiddleware<AppContext>()(async ({ ctx, next }) => {
  const start = Date.now();
  const result = await next();
  const duration = Date.now() - start;
  metrics.recordDuration(ctx.topic, duration);
  return result;
});
Chaining Middleware
const { procedure } = init<AppContext>();

// loggingMiddleware and authMiddleware are assumed to be reusable middleware
// defined with createMiddleware, like metricsMiddleware.
const protectedProcedure = procedure
  .use(loggingMiddleware)
  .use(authMiddleware)
  .use(metricsMiddleware);

// All three middleware run in the order they were added
const router = kafkaRouter({
  events: protectedProcedure
    .input({ message: EventSchema })
    .subscribe(({ input, ctx }) => {}),
});
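The ordering of a chain can be sketched with a small standalone composer. It assumes the common "onion" model, where code before `next()` runs in registration order and code after `next()` runs in reverse; the library's exact behavior is not confirmed by this page, and `compose`, `tag`, and `order` are hypothetical helpers.

```typescript
type Ctx = Record<string, unknown>;
type Handler = (ctx: Ctx) => Promise<unknown>;
type Middleware = (opts: {
  ctx: Ctx;
  next: (extra?: { ctx: Ctx }) => Promise<unknown>;
}) => Promise<unknown>;

// Wraps the handler from the inside out, so the first middleware ends up outermost.
function compose(middlewares: Middleware[], handler: Handler): Handler {
  return middlewares.reduceRight<Handler>(
    (downstream, mw) => (ctx) =>
      mw({ ctx, next: (extra) => downstream({ ...ctx, ...extra?.ctx }) }),
    handler,
  );
}

const order: string[] = [];

// Builds a middleware that records when it runs before and after next().
const tag = (name: string): Middleware => async ({ next }) => {
  order.push(`${name}:before`);
  const result = await next();
  order.push(`${name}:after`);
  return result;
};

const chainDone = compose(
  [tag("logging"), tag("auth"), tag("metrics")],
  async () => {
    order.push("handler");
  },
)({});

chainDone.then(() => console.log(order.join(" > ")));
// prints logging:before > auth:before > metrics:before > handler > metrics:after > auth:after > logging:after
```

Under this model, a timing middleware placed last measures only the handler, while one placed first measures the handler plus every middleware after it.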
Piping Middleware Builders
// validateSession and loadUser are assumed to be middleware functions defined elsewhere.
const authChain = createMiddleware<AppContext>()
  .pipe(validateSession)
  .pipe(loadUser);

// Use the chain
const protectedProcedure = procedure.use(authChain);
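As a rough mental model, a piped chain can be thought of as a list of steps where each step sees the context produced by the previous ones. The sketch below is standalone and only assumes that behavior; it does not use or reproduce the library's `pipe`, and `createChain`, `Step`, and the bodies of `validateSession` and `loadUser` are hypothetical.

```typescript
type Ctx = Record<string, unknown>;
// A step returns the properties it wants to add to the context.
type Step = (ctx: Ctx) => Promise<Ctx>;

interface Chain {
  pipe(step: Step): Chain;
  run(ctx: Ctx): Promise<Ctx>;
}

// Immutable builder: pipe() returns a new chain with one more step.
function createChain(steps: Step[] = []): Chain {
  return {
    pipe: (step) => createChain([...steps, step]),
    async run(ctx) {
      let current = ctx;
      for (const step of steps) {
        current = { ...current, ...(await step(current)) };
      }
      return current;
    },
  };
}

const validateSession: Step = async (ctx) => {
  if (typeof ctx.sessionId !== "string") throw new Error("missing session");
  return { sessionValid: true };
};

// Runs second, so it can rely on validateSession having passed.
const loadUser: Step = async (ctx) => ({
  user: { id: `user-for-${String(ctx.sessionId)}` },
});

const authChain = createChain().pipe(validateSession).pipe(loadUser);
const piped = authChain.run({ sessionId: "abc" });
```

Because each `pipe` call returns a new chain, a partial chain can be shared and extended in different ways without the extensions affecting each other.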