Prerequisites:
- Ruby installed on your system
- Postgres installed and running
- Redis server installed and running (default port: 6379)
- Bundler gem installed (gem install bundler)
Step 1: Set Up the Project
1.1 Create a new project directory and navigate to it:
mkdir outbox_sinatra_example
cd outbox_sinatra_example
1.2 Create a Gemfile to manage dependencies:
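source 'https://rubygems.org'
gem 'sinatra'
gem 'pg'
gem 'sidekiq'
gem 'puma'   # web server for rackup; Ruby 3.0+ no longer ships WEBrick by default
gem 'rackup' # provides the rackup command when running on Rack 3 (e.g. Sinatra 4)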
1.3 Install the required gems using Bundler:
bundle install
Step 2: Create the Outbox Table
2.1 Connect to your Postgres database:
Use psql or any other PostgreSQL client. The examples below assume a database named 'your_database'.
2.2 Create the Outbox table:
In the psql shell, run the following statement to create a table for outbox messages:
CREATE TABLE outbox (
id SERIAL PRIMARY KEY,
message_type VARCHAR(255),
message_body TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
Step 3: Implement the Sinatra Endpoint and OutboxWorker
3.1 Create a new file named app.rb in the project directory:
# app.rb
require 'sinatra'
require 'pg'
require 'sidekiq'
require 'json'
# Configure Sidekiq with the Redis URL for both the worker process (server)
# and the Sinatra process that enqueues jobs (client)
redis_config = { url: 'redis://localhost:6379/0' }
Sidekiq.configure_server { |config| config.redis = redis_config }
Sidekiq.configure_client { |config| config.redis = redis_config }

class OutboxWorker
  include Sidekiq::Worker
  sidekiq_options retry: 5 # Set the number of retries on failure

  def perform(message_id)
    conn = PG.connect(dbname: 'your_database')
    message = conn.exec_params('SELECT * FROM outbox WHERE id = $1', [message_id]).first
    return unless message
    # Ensure idempotency - check if the message has already been processed
    return if message['processed_at']

    # Process the message here (e.g., send the message to the appropriate destination)
    puts "Processing message: #{message['id']}, Type: #{message['message_type']}, Body: #{message['message_body']}"
    # Mark the message as processed and record the processing time
    conn.exec_params('UPDATE outbox SET processed_at = NOW() WHERE id = $1', [message_id])
  rescue StandardError => e
    # If an error occurs, log it and let Sidekiq retry the job
    puts "Error processing message (id=#{message_id}): #{e.message}"
    raise
  ensure
    # Close the connection so repeated jobs do not leak connections
    conn&.close
  end
end
post '/create-message' do
  content_type :json
  data = JSON.parse(request.body.read)
  message_type = data['message_type']
  message_body = data['message_body']
  conn = PG.connect(dbname: 'your_database')
  begin
    result = conn.exec_params(
      'INSERT INTO outbox (message_type, message_body) VALUES ($1, $2) RETURNING id',
      [message_type, message_body]
    ).first
  ensure
    # Close the connection so each request does not leak one
    conn.close
  end
  # Enqueue the message for background processing
  OutboxWorker.perform_async(result['id']) if result
  status 201
  { message: 'Message created and enqueued for processing' }.to_json
end
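The endpoint above passes the request body straight to JSON.parse, so a malformed body raises JSON::ParserError and the client gets a 500 response. Below is a minimal sketch of a validation step. It assumes a hypothetical helper named parse_message_payload (not part of the original app.rb) that returns nil for any payload it cannot use:

```ruby
require 'json'

# Hypothetical helper (not in the original app.rb): returns
# [message_type, message_body] for a valid payload, or nil otherwise.
def parse_message_payload(raw_body)
  data = JSON.parse(raw_body)
  return nil unless data.is_a?(Hash)

  message_type = data['message_type']
  message_body = data['message_body']
  # Both fields must be non-empty strings to be stored in the outbox.
  valid = [message_type, message_body].all? { |v| v.is_a?(String) && !v.empty? }
  return nil unless valid

  [message_type, message_body]
rescue JSON::ParserError
  # Malformed JSON is treated the same as a missing field.
  nil
end
```

In the route, one option is to call this helper on request.body.read and respond with halt 400 when it returns nil, so that nothing is written to the outbox for an invalid request.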
Step 4: Set Up the config.ru File
4.1 Create a config.ru file to define the application for use with Rack:
# config.ru
require './app'
run Sinatra::Application
Step 5: Start the Application
5.1 Start the Sidekiq worker in a separate terminal window:
bundle exec sidekiq -r ./app.rb
5.2 Start the Sinatra application using the Rack server:
bundle exec rackup
Step 6: Testing the Outbox Endpoint
Use a tool like curl or Postman to send a POST request to create a new message:
curl -X POST -H "Content-Type: application/json" -d '{"message_type":"notification","message_body":"Hello, World!"}' http://localhost:9292/create-message
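If you would rather send the request from Ruby, the same call can be sketched with the standard library's Net::HTTP. The helper name build_create_message_request is made up for this example. The snippet only builds the request, and the line that sends it is commented out because it needs the server from Step 5 to be running:

```ruby
require 'json'
require 'net/http'
require 'uri'

# Hypothetical helper: builds the same POST request as the curl command,
# without sending it.
def build_create_message_request(uri, message_type, message_body)
  request = Net::HTTP::Post.new(uri)
  request['Content-Type'] = 'application/json'
  request.body = { message_type: message_type, message_body: message_body }.to_json
  request
end

uri = URI('http://localhost:9292/create-message')
request = build_create_message_request(uri, 'notification', 'Hello, World!')

# To send it against a running server:
# response = Net::HTTP.start(uri.hostname, uri.port) { |http| http.request(request) }
# puts response.code
```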
Step 7: Verify Message Processing
Check the terminal running Sidekiq. The output should look similar to the following (timestamps and timings will differ):
[INFO] 2023-08-07T00:00:00.000Z: [OutboxWorker] start
Processing message: 1, Type: notification, Body: Hello, World!
[INFO] 2023-08-07T00:00:00.000Z: [OutboxWorker] done: 3.123 sec
With these changes, the OutboxWorker handles retries on failure using Sidekiq's retry mechanism. If a processing error occurs, the job is retried up to 5 times (or whatever number you set in sidekiq_options) before being moved to Sidekiq's Dead set.
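To get a feel for how long five retries take, here is a rough sketch of the delay schedule. It assumes the default backoff described in Sidekiq's error-handling documentation, roughly (count ** 4) + 15 seconds plus some random jitter. The exact formula and jitter differ between Sidekiq versions, so treat the numbers as approximate. The helper name is hypothetical.

```ruby
# Approximate minimum delay (in seconds) before each retry, ignoring jitter.
# Assumes a default backoff of roughly (count ** 4) + 15; check the
# documentation for the Sidekiq version you run.
def approximate_retry_delays(max_retries)
  (0...max_retries).map { |count| (count**4) + 15 }
end

delays = approximate_retry_delays(5)
puts "Approximate delays: #{delays.inspect}"
puts "Minimum time until the last retry: #{delays.sum} seconds"
```

Under that assumption, a job with retry: 5 that keeps failing ends up in the Dead set after a few minutes, which is worth keeping in mind when the downstream outage you want to ride out could last longer.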
Additionally, we've introduced idempotency in the worker by checking whether the message has already been processed before processing it. If the processed_at field in the outbox table is already set, the worker skips the message, so a message that has been completed isn't processed again.
Keep in mind that idempotency also depends on the processing logic of the actual message handler, so ensure that your message processing code is idempotent as well.
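As an illustration of what an idempotent handler can look like, the sketch below deduplicates on the message id, so handling the same message twice has the same effect as handling it once. The class name and the in-memory Set are assumptions made for this example. A real handler would need a durable store, for instance a unique constraint in the database or an idempotency key accepted by the receiving service.

```ruby
require 'set'

# Hypothetical handler: performs the side effect once per message id.
class IdempotentHandler
  attr_reader :delivered

  def initialize
    @seen_ids = Set.new
    @delivered = []
  end

  # Returns true if the message was delivered, false if it was a duplicate.
  def handle(message)
    id = message.fetch('id')
    # Set#add? returns nil when the id is already present.
    return false unless @seen_ids.add?(id)

    @delivered << message # stand-in for the real side effect
    true
  end
end

handler = IdempotentHandler.new
message = { 'id' => '1', 'message_type' => 'notification', 'message_body' => 'Hello, World!' }
first = handler.handle(message)
second = handler.handle(message) # simulates a retry of the same message
puts "first=#{first} second=#{second} delivered=#{handler.delivered.size}"
```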