Example middleware v2.0.0+
The following examples show how you might use middleware in some real-world scenarios.
- Common actions for every function
- E2E Encryption
- Logging
- Prisma in function context
- Sentry error reporting and tracing
Common actions for every function
You likely reuse the same steps across many functions - whether it be fetching user data or sending an email, your app is hopefully full of reusable blocks of code.
We could add some middleware to pass these into any Inngest function, automatically wrapping them in step.run()
and allowing the code inside our function to feel a little bit cleaner.
/**
* Pass to a client to provide a set of actions as steps to all functions, or to
* a function to provide a set of actions as steps only to that function.
*/
const inngest = new Inngest({
id: "my-app",
middleware: [
createActionsMiddleware({
getUser(id: string) {
return db.user.get(id);
},
}),
],
});
inngest.createFunction(
{ id: "user-data-dump" },
{ event: "app/data.requested" },
async ({ event, action: { getUser } }) => {
// The first parameter is the step's options or ID
const user = await getUser("get-user-details", event.data.userId);
}
);
import { InngestMiddleware, StepOptionsOrId } from "inngest";
/**
* Create a middleware that wraps a set of functions in step tooling, allowing
* them to be invoked directly instead of using `step.run()`.
*
* This is useful for providing a set of common actions to a particular function
* or to all functions created by a client.
*/
export const createActionsMiddleware = <T extends Actions>(rawActions: T) => {
return new InngestMiddleware({
name: "Inngest: Actions",
init: () => {
return {
onFunctionRun: () => {
return {
transformInput: ({ ctx: { step } }) => {
const action: FilterActions<T> = Object.entries(
rawActions
).reduce((acc, [key, value]) => {
if (typeof value !== "function") {
return acc;
}
const action = (
idOrOptions: StepOptionsOrId,
...args: unknown[]
) => {
return step.run(idOrOptions, () => value(...args));
};
return {
...acc,
[key]: action,
};
}, {} as FilterActions<T>);
return {
ctx: { action },
};
},
};
},
};
},
});
};
type Actions = Record<string, unknown>;
/**
* Filter out all keys from `T` where the associated value does not match type
* `U`.
*/
type KeysNotOfType<T, U> = {
[P in keyof T]: T[P] extends U ? never : P;
}[keyof T];
/**
* Given a set of generic objects, extract any top-level functions and
* appropriately shim their types.
*
* We use this type to allow users to spread a set of functions into the
* middleware without having to worry about non-function properties.
*/
type FilterActions<Fns extends Record<string, any>> = {
[K in keyof Omit<Fns, KeysNotOfType<Fns, (...args: any[]) => any>>]: (
idOrOptions: StepOptionsOrId,
...args: Parameters<Fns[K]>
) => Promise<Awaited<ReturnType<Fns[K]>>>;
};
E2E Encryption
Inngest helps memoize state between steps within a function, but you may want to encrypt this, ensuring plaintext data never leaves your server.
⚠️ If you encrypt your step data and lose your encryption key, you'll lose access to all encrypted state. Be careful!
const inngest = new Inngest({
id: "my-app",
middleware: [stepEncryptionMiddleware()],
});
inngest.createFunction(
{ id: "example-function" },
{ event: "app/user.created" },
async ({ event, step }) => {
/**
* The return value of `db.get()` - and therefore the value of `user` is now
* silently encrypted and decrypted by the middleware; no plain-text step
* data leaves your server or is stored in Inngest Cloud.
*/
const user = await step.run("get-user", () =>
db.get("user", event.data.userId)
);
}
);
This example's "encryption" is just stringifying and reversing the value - in practice you'll want to replace this with your own method using something like node:crypto
.
const encryptionMarker = "__ENCRYPTED__";
type EncryptedValue = { [encryptionMarker]: true; data: string };
export const encryptionMiddleware = (
key: string = process.env.INNGEST_ENCRYPTION_KEY as string
) => {
if (!key) {
throw new Error("Missing INNGEST_ENCRYPTION_KEY environment variable");
}
// Some internal functions that we'll use to encrypt and decrypt values.
// In practice, you'll want to use the `key` passed in to handle encryption
// properly.
const isEncryptedValue = (value: unknown): value is EncryptedValue => {
return (
typeof value === "object" &&
value !== null &&
encryptionMarker in value &&
value[encryptionMarker] === true &&
"data" in value &&
typeof value["data"] === "string"
);
};
const encrypt = (value: unknown): EncryptedValue => {
return {
[encryptionMarker]: true,
data: JSON.stringify(value).split("").reverse().join(""),
};
};
const decrypt = <T>(value: T): T => {
if (isEncryptedValue(value)) {
return JSON.parse(value.data.split("").reverse().join("")) as T;
}
return value;
};
return new InngestMiddleware({
name: "Step Encryption Middleware",
init: () => ({
onSendEvent: () => ({
transformInput: ({ payloads }) => ({
payloads: payloads.map((payload) => ({
...payload,
data: payload.data && encrypt(payload.data),
})),
}),
}),
onFunctionRun: () => ({
transformInput: ({ ctx, steps }) => ({
steps: steps.map((step) => ({
...step,
data: step.data && decrypt(step.data),
})),
}),
transformOutput: (ctx) => {
if (!ctx.step) {
return;
}
return {
result: {
data: ctx.result.data && encrypt(ctx.result.data),
},
};
},
}),
}),
});
};
We could expand this middleware to transform event data entering and leaving the SDK. Be aware that, unlike step data, event data is much more commonly shared between systems; think about if you need to also encrypt your event data before doing so.
new InngestMiddleware({
name: "Full Encryption Middleware",
init: () => ({
onSendEvent: () => ({
transformInput: ({ payloads }) => ({
payloads: payloads.map((payload) => ({
...payload,
data: payload.data && encrypt(payload.data),
})),
}),
}),
onFunctionRun: () => ({
transformInput: ({ ctx, steps }) => ({
steps: steps.map((step) => ({
...step,
data: step.data && decrypt(step.data),
})),
ctx: {
event: ctx.event && {
...ctx.event,
data: ctx.event.data && decrypt(ctx.event.data),
},
events:
ctx.events &&
ctx.events?.map((event) => ({
...event,
data: event.data && decrypt(event.data),
})),
} as {},
}),
transformOutput: (ctx) => {
if (!ctx.step) {
return;
}
return {
result: {
data: ctx.result.data && encrypt(ctx.result.data),
},
};
},
}),
}),
});
Logging
The following shows you how you can create a logger middleware and customize it to your needs.
It is based on the built-in logger middleware in the SDK, and hope it gives you an idea of what you can do if the built-in logger doesn't meet your needs.
new InngestMiddleware({
name: "Inngest: Logger",
init({ client }) {
return {
onFunctionRun(arg) {
const { ctx } = arg;
const metadata = {
runID: ctx.runId,
eventName: ctx.event.name,
functionName: arg.fn.name,
};
let providedLogger: Logger = client["logger"];
// create a child logger if the provided logger has child logger implementation
try {
if ("child" in providedLogger) {
type ChildLoggerFn = (
metadata: Record<string, unknown>
) => Logger;
providedLogger = (providedLogger.child as ChildLoggerFn)(metadata)
}
} catch (err) {
console.error('failed to create "childLogger" with error: ', err);
// no-op
}
const logger = new ProxyLogger(providedLogger);
return {
transformInput() {
return {
ctx: {
/**
* The passed in logger from the user.
* Defaults to a console logger if not provided.
*/
logger,
},
};
},
beforeExecution() {
logger.enable();
},
transformOutput({ result: { error } }) {
if (error) {
logger.error(error);
}
},
async beforeResponse() {
await logger.flush();
},
};
},
};
},
})
Prisma in function context
The following is an example of adding a Prisma client to all Inngest functions, allowing them immediate access without needing to create the client themselves.
While this example uses Prisma, it serves as a good example of using the onFunctionRun -> input hook to mutate function input to perform crucial setup for your functions and keep them to just business logic.
💡 Types are inferred from middleware outputs, so your Inngest functions will see an appropriately-typed prisma
property in their input.
inngest.createFunction(
{ name: "Example" },
{ event: "app/user.loggedin" },
async ({ prisma }) => {
await prisma.auditTrail.create(/* ... */);
}
);
import { PrismaClient } from "@prisma/client";
const prismaMiddleware = new InngestMiddleware({
name: "Prisma Middleware",
init() {
const prisma = new PrismaClient();
return {
onFunctionRun(ctx) {
return {
transformInput(ctx) {
return {
// Anything passed via `ctx` will be merged with the function's arguments
ctx: {
prisma,
},
};
},
};
},
};
},
});
Check out Common actions for every function to see how this technique can be used to create steps for all of your unique logic.
Sentry error reporting and tracing
This example uses Sentry to:
- Capture exceptions for reporting
- Add tracing to each function run
- Include useful context for each exception and trace like function ID and event names
import * as Sentry from "@sentry/node";
const sentryMiddleware = new InngestMiddleware({
name: "Sentry Middleware",
init({ client }) {
// Initialize Sentry as soon as possible, creating a hub
Sentry.init({ dsn: "..." });
// Set up some tags that will be applied to all events
Sentry.setTag("inngest.client.name", client.name);
return {
onFunctionRun({ ctx, fn }) {
// Add specific context for the given function run
Sentry.setTags({
"inngest.function.id": fn.id(client.name),
"inngest.function.name": fn.name,
"inngest.event": ctx.event.name,
"inngest.run.id": ctx.runId,
});
// Start a transaction for this run
const transaction = Sentry.startTransaction({
name: "Inngest Function Run",
op: "run",
data: ctx.event,
});
let memoSpan: Sentry.Span;
let execSpan: Sentry.Span;
return {
transformInput() {
return {
ctx: {
// Add the Sentry client to the input arg so our
// functions can use it directly too
sentry: Sentry.getCurrentHub(),
},
};
},
beforeMemoization() {
// Track different spans for memoization and execution
memoSpan = transaction.startChild({ op: "memoization" });
},
afterMemoization() {
memoSpan.finish();
},
beforeExecution() {
execSpan = transaction.startChild({ op: "execution" });
},
afterExecution() {
execSpan.finish();
},
transformOutput({ result, step }) {
// Capture step output and log errors
if (step) {
Sentry.setTags({
"inngest.step.name": step.name,
"inngest.step.op": step.op,
});
if (result.error) {
Sentry.captureException(result.error);
}
}
},
async beforeResponse() {
// Finish the transaction and flush data to Sentry before the
// request closes
transaction.finish();
await Sentry.flush();
},
};
},
};
},
});