As a developer, I've worked extensively with fullstack frameworks like Laravel and Phoenix Framework, both of which offer robust queue worker systems—Laravel Queue in Laravel and Oban in Phoenix. However, when building applications with Next.js or other JavaScript frameworks, I often found myself missing this essential feature.
Thankfully, Supabase recently introduced pgmq (a PostgreSQL-based message queue) and pg_cron integration into their platform. This release makes it possible to implement queue workers seamlessly.
In this post, I'll walk you through how to use Supabase as a queue worker backend by leveraging pgmq, Supabase Cron, and Edge Functions.
On many apps, there are situations where we need to send onboarding emails to users who have just signed up. (For email verification, Supabase Auth already handles this seamlessly.) For example, if a user signed up last week but has not yet created a product on our app, we might want to send a follow-up email with step-by-step guidance to help them get started.
Email Queue Worker
The main reason for putting emails on a queue is that some email providers enforce rate limits, and certain hosting providers (like Vercel) have execution timeouts (with the free tier only allowing a 10-second timeout). By putting email tasks in a queue, we can process them in the background without impacting the app's performance or user experience.
We will create an email queue worker using pgmq, pg_cron, and Supabase Edge Functions, which have a longer timeout limit (150 seconds on the free plan). You can find more details about the limits here.
Project Setup
- Enable Queue and Cron Integration Go to the Supabase dashboard and navigate to the integrations page. Enable the Queue and Cron integrations for your project.
If you are enabling this on an older Supabase project, you may need to upgrade your Supabase instance.
This integration requires the pgmq extension, which is only available on PostgreSQL version 15.6.1.143 and higher.
- Upgrade Your Supabase Instance If your project is running on an older PostgreSQL version, you will see a message prompting you to upgrade:
This integration requires the pgmq extension which is not available on this version of Postgres. The extension is available on version 15.6.1.143 and higher.
Simply click Upgrade and wait a few minutes. Once the upgrade is complete, you will be able to enable the Queue integration.
With the setup complete, we are now ready to implement our email queue worker.
Worker Function
Create a Supabase Edge Function using the Supabase CLI:
npx supabase functions new mail-worker
This command will generate a new file at ./supabase/functions/mail-worker/index.ts
in your project directory. This file will serve as the foundation for our email queue worker.
Here is an example of the mail-worker
function:
import { createClient } from "jsr:@supabase/supabase-js@2";
Deno.serve(async (req) => {
const supabase = createClient(
Deno.env.get("SUPABASE_URL") ?? "",
Deno.env.get("SUPABASE_SERVICE_ROLE_KEY") ?? "",
{ db: { schema: "pgmq_public" } },
);
const queue_name = "emails";
const { data, error } = await supabase.rpc("read", {
queue_name,
sleep_seconds: 60, // mark message invisible for 60 seconds
n: 1, // number of messages to read
});
for (const q of data) {
// Example processing queue message data
await fetch("https://sandbox.api.mailtrap.io/api/send/22222", {
method: "POST",
headers: {
Authorization: `Bearer YOUR_MAILTRAP_API_KEY`,
"Content-Type": "application/json",
},
body: JSON.stringify({
from: { name: "Supabase Worker", email: "system@yourapp.com" },
to: [{ email: q.message.to }],
subject: q.message.subject,
text: q.message.content,
}),
})
.then((r) => r.text())
.then((r) => console.log("send result", r));
const del = await supabase.rpc("delete", {
queue_name,
message_id: q.msg_id,
});
}
return new Response("ok", {
headers: { "Content-Type": "application/json" },
});
});
Deploying Your Function
To deploy your function, use the Supabase CLI with the following command:
npx supabase functions deploy mail-worker --project-ref <your-supabase-project-ref>
Once the deployment is successful, your function will be listed on the Supabase dashboard. You can view and manage your deployed functions at:Supabase Functions Dashboard
Disabling JWT Verification
To allow the function to be triggered by pg_cron
without requiring user authentication, you’ll need to disable JWT verification:
- Select your deployed function from the dashboard.
- Scroll down to the "Security" section and toggle off "Enforce JWT Verification".
This step ensures that the function can be triggered by pg_cron
without needing user information.
Setup Queue
The next step is to create a mail queue:
- Go to Queue Settings and enable "Expose Queues via PostgREST" so that we can interact with it using the Supabase client SDK.
- Navigate to the Queue Dashboard.
- Click "Create a Queue" and provide a name, such as "emails".
- Select "Basic" for the queue type, then click "Create Queue".
Setup Cron
Next, set up a cron job to trigger the function:
- Go to the Cron Jobs Dashboard.
- Click "Create Job".
- Set the schedule (e.g., "30 seconds") or use the
cron
format. - For Type, select "Edge Function".
- Choose POST as the method.
- Select the Edge Function: "mail-worker" or your deployed function name.
- Set the timeout to "5000" (in milliseconds), or adjust to a higher value if your job requires more time.
- Click "Create Cron Job".
Put Message to Queue
To put message into queue from your app you can initialize supabase client with custom db schema:
const queue = createClient(
Deno.env.get("SUPABASE_URL") ?? "",
Deno.env.get("SUPABASE_SERVICE_ROLE_KEY") ?? "",
{ db: { schema: "pgmq_public" } },
);
And then call send
rpc call with queue name and message payload
await queue.rpc("send", {
queue_name: "emails",
message: {
to: "testing@local.dev",
subject: "Hello World",
content: "Hello World",
},
});
Function Breakdown
The main idea behind this function is to handle queue processing in a way that ensures each message is only processed once at a time. Here's how it works:
Read from the Queue: The function starts by reading a message from the queue. It retrieves the next available item for processing.
Mark as Invisible: Once the message is fetched, the function marks it as "invisible" for a certain period. This ensures that no other worker can process the same message while it's being worked on. This temporary invisibility prevents duplicate processing and ensures that only one worker handles each message at a time.
Process the Queue Message: The main task of the function is to process the message data. In this case, the function sends an email using a mail provider. For this demo, Mailtrap is used as the email provider. The email content and recipient details are extracted from the message data, and the function sends the email.
Success Case: If the email is sent successfully, the function proceeds to delete the message from the queue. This marks the message as fully processed, and it is removed from the list of pending tasks.
Failure Case: If an error occurs while sending the email (for example, if the email provider fails), the function throws an error. In this case, the message is not deleted from the queue. Instead, it becomes available again for other workers to retry, ensuring that failed messages can be retried without being lost.
This approach ensures reliable and fault-tolerant processing of queue messages, where each task is only handled once, and failed tasks can be retried until successful.
Top comments (0)