TL;DR: Use a standard .NET 8+ host for your microservices (running, for example, in AKS) without any Azure WebJobs SDK syntactic sugar or similar fluff/magic¹.
Introduction
We originally started, now almost 5 years ago, with .NET/F# Azure Functions apps, and we had pretty bad experiences with that setup; the biggest issues were low node app density, high costs, and too much bloat from MS frameworks.
Soon after that we migrated to AKS (managed Kubernetes on Azure, similar to EKS and GKE), but to keep the migration effort reasonable we re-used the WebJobs SDK, which is a building block of the Azure Functions SDK. More than 2 years ago I wrote an article about the app stub we were using back then.
Everything was running relatively fine until recently, when we had to improve our local development experience and needed a way to change the low-level workings of some of our code related to Azure Event Hubs and similar. That was the trigger for revisiting the usage of the WebJobs SDK and actually migrating away from it, which is the topic of this article.
When it comes to the Azure WebJobs SDK triggers we were using, I believe we are nothing special; like many others, we had the following in place:
- Pub-sub message bus, in our case Azure Event Hubs, by means of EventHubTrigger
- Queues, in our case Azure Storage Queues, by means of QueueTrigger
- Time-triggered jobs, by means of TimerTrigger
- Websockets, in our case Azure SignalR, by means of SignalRTrigger
Additionally, we use various other IHostedService/BackgroundService processes, which get started upon app start and run in the background doing other work - e.g. listening to notifications from the database (change streams), aggregating something every x seconds, etc.
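For readers unfamiliar with the pattern: a BackgroundService in its rawest form is just a class whose ExecuteAsync loops until shutdown. A minimal sketch (the listener name and body are purely illustrative):
open System.Threading
open System.Threading.Tasks
open Microsoft.Extensions.Hosting

// Purely illustrative worker; our real services are wired via Framework.Hosting helpers
type ChangeStreamListener() =
    inherit BackgroundService()

    // Runs for the whole lifetime of the host; the token is cancelled on shutdown
    override _.ExecuteAsync(stoppingToken: CancellationToken) : Task =
        task {
            while not stoppingToken.IsCancellationRequested do
                // e.g. read the next batch of change-stream notifications here
                do! Task.Delay(5000, stoppingToken)
        }
        :> Task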
All of the above is using configuration based on simple environment variables, some of them pointing to a secrets store (in our case Azure Key Vault). There is of course some logging and telemetry sent to a cloud service (in our case Azure Application Insights), as well as some web/REST/HTTP API (in our case using barebone ASP.NET Core SDK).
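The Key Vault indirection is conceptually simple: at startup, every environment variable whose value looks like a secret reference is overwritten with the actual secret. A sketch of the idea - the @KeyVault(secretName) reference format and the function name are assumptions for illustration; the real helper (overwriteEnvironmentVariablesFromKVRef, shown below) may differ:
open System
open System.Collections
open Azure.Identity
open Azure.Security.KeyVault.Secrets

// Illustrative: replace every env var of the form "@KeyVault(secretName)" with the secret value
let resolveKeyVaultRefs (vaultUri: Uri) =
    async {
        let client = SecretClient(vaultUri, DefaultAzureCredential())

        for entry in Environment.GetEnvironmentVariables() |> Seq.cast<DictionaryEntry> do
            let name, value = string entry.Key, string entry.Value

            if value.StartsWith "@KeyVault(" && value.EndsWith ")" then
                let secretName = value.Substring(10, value.Length - 11)
                let! secret = client.GetSecretAsync secretName |> Async.AwaitTask
                Environment.SetEnvironmentVariable(name, secret.Value.Value)
    }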
All XyzTriggers above have been migrated away from the Azure WebJobs SDK to pretty small, sweet and (almost) fully-under-our-control implementations that use the Azure SDK directly, which will be explained in the sections below. But before we do that, let's look at what the app stub looks like now.
App Stub v2
// Program.fs
module TestService.XyzHandling.Program

open System.Threading
open Microsoft.Extensions.Hosting
open Framework.Hosting
open Framework.AzureKeyVault.Environment
open TestService.XyzHandling.Api.Wiring

Environment.overwriteEnvironmentVariablesFromKVRef () |> Async.RunSynchronously

[<EntryPoint>]
let main argv =
    let builder =
        HostBuilder.createDefaultBuilder argv BackgroundServiceExceptionBehavior.StopHost
        |> HostBuilder.configureLogging
        |> HostBuilder.configureAppInsights
        |> HostBuilder.configureEventHubProcessors
            EnvVars.appName
            [ EventHubProcessors.eventHubProcessor1; EventHubProcessors.eventHubProcessor2 ]
        |> HostBuilder.configureQueueProcessors
            EnvVars.appName
            ([ Some QueueProcessors.queueProcessor1 ] @ [ QueueProcessors.queueProcessor2 ]
             |> List.choose id)
        |> HostBuilder.configureBackgroundServices
            EnvVars.appName
            [ BackgroundServices.eventHubPublisherTest
              BackgroundServices.queuePublisherTest ]
        |> HostBuilder.configureTimers EnvVars.appName [ Timers.timerProcessor1; Timers.timerProcessor2 ]
        |> HostBuilder.configureStartup Startup.startupFunctions
        |> HostBuilder.configureWebHost
            EnvVars.appName
            [ WebApi.checkHealth; WebApi.checkReadiness; WebApi.Entity.getById ]

    use tokenSource = new CancellationTokenSource()
    use host = builder.Build()
    host.RunAsync(tokenSource.Token) |> Async.AwaitTask |> Async.RunSynchronously
    0 // return an integer exit code
Notes:
- The above code spawns a total of 9 IHostedService/BackgroundService instances (incl. the standard web host)
- Event Hub Processors listen to & process Azure Event Hub events
- Queue Processors listen to & process Azure Storage queue messages
- Timer Processors run stuff every x seconds, minutes, hours etc.
- There are even some generic Background Services started for writing to an event hub and a queue
- One can see at a glance all the running processes and even the web API endpoints from the Program.fs file
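As a side note on checkHealth/checkReadiness: since every processor below signals a started ManualResetEvent once it is up, a readiness probe can be as trivial as polling those events (a sketch; our actual WebApi implementation differs):
open System.Threading

// Ready when every processor has signalled its "started" event (zero-timeout poll)
let isReady (startedEvents: ManualResetEvent list) =
    startedEvents |> List.forall (fun event -> event.WaitOne 0)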
Trigger Implementations
The trigger implementations below are all based on BackgroundService/IHostedService, which means a background task is spawned and runs the whole time the host itself is running.
Compared to the original implementations in the Azure WebJobs SDK, the ones below may offer less functionality (no auto-scaling as with Azure Functions or similar), but at the same time they are very lean and much easier to understand and maintain.
The source code of a working sample application will eventually be made available here.
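To give an idea of how little hosting glue is needed: each processor definition essentially boils down to a start function that a thin BackgroundService wrapper executes. A rough sketch of that wrapper (illustrative - the real Framework.Hosting code additionally handles naming, logging, the started event and the stop functions):
open System.Threading
open System.Threading.Tasks
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Hosting

// Wraps a processor start function into a hosted service
type ProcessorService(start: CancellationToken -> Async<unit>) =
    inherit BackgroundService()

    override _.ExecuteAsync(stoppingToken) =
        Async.StartAsTask(start stoppingToken, cancellationToken = stoppingToken) :> Task

// Registers one hosted service per processor start function
let configureProcessors (starts: (CancellationToken -> Async<unit>) list) (builder: IHostBuilder) =
    builder.ConfigureServices(fun services ->
        starts
        |> List.iter (fun start ->
            services.AddSingleton<IHostedService>(ProcessorService(start) :> IHostedService)
            |> ignore))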
Event Hub Processor
Old code:
// Api.Wiring.fs, handle the event
type WebJobs(...) =
    [<FunctionName("HandleXyzEvent")>]
    member _.HandleXyzEvent
        (
            [<EventHubTrigger("",
                              Connection = EnvVars.EventHubs.Xyz.connectionStringKey,
                              ConsumerGroup = EnvVars.EventHubs.consumerGroup)>] msg: EventData,
            enqueuedTimeUtc: DateTime,
            sequenceNumber: Int64,
            offset: string,
            logger: ILogger
        ) =
        // handle the event

// Program.fs, configure the host with web jobs
let configureWebJobs (builder: IHostBuilder) =
    builder.ConfigureWebJobs(fun b ->
        b.AddAzureStorageCoreServices() |> ignore
        b.AddEventHubs() |> ignore)
Notes:
- There is some magic going on: in Program.fs you do not say what you actually want to listen to, you just say "I want to enable Event Hubs handling", and then you decorate some method of a class with attributes indicating that you want to listen to an event hub
- The EventHubTrigger insists on getting the key of a connection-string environment variable, and fetches the value whenever it decides to ...
- You also seem to need to specify a FunctionName in addition to the method name
- Any configuration settings for the EventHubTrigger are hidden away; you need to know which environment variables to configure, and they are picked up "automagically" by the framework ;)
New code:
// Api.Functions.fs, handle the event
let processEvent log partitionId (cancellationToken: CancellationToken) event =
    // handle the event
    () |> Async.retn

// Api.Wiring.fs, instantiate the processor
let eventHubProcessor1 =
    EventHubProcessorDef.create
        "EventHubProcessor1"
        EnvVars.appName
        EnvVars.EventHubs.checkpointStorageConnectionString
        EnvVars.EventHubs.eventHubConnectionString
        EnvVars.EventHubs.consumerGroup
        EnvVars.EventHubs.eventBatchMaximumCount
        EnvVars.EventHubs.assignedPartitionIds
        EnvVars.EventHubs.defaultStartingPosition
        EventHubHandlers.processEvent

// Program.fs, configure the host with the processors
|> HostBuilder.configureEventHubProcessors
    EnvVars.appName
    [ EventHubProcessors.eventHubProcessor1; EventHubProcessors.eventHubProcessor2 ]
Event Hub Processor Implementation:
module Framework.AzureEventHubs.EventProcessing

open System
open System.Threading.Tasks
open System.Collections.Generic
open System.Threading
open Azure.Messaging.EventHubs.Consumer
open Azure.Storage.Blobs
open Azure.Messaging.EventHubs
open Azure.Messaging.EventHubs.Primitives
open Framework
open Framework.AzureEventHubs.LogEvents
open Framework.Logging.StructuredLog

// https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/eventhub/Azure.Messaging.EventHubs/samples/Sample08_CustomEventProcessor.md
// Event Processor which considers assigned partitions
type AssignablePartitionProcessor
    (
        log: Log,
        name: string,
        storageClient: BlobContainerClient,
        assignedPartitions: string[] option,
        eventBatchMaximumCount: int,
        consumerGroup: string,
        connectionString: string,
        clientOptions: EventProcessorOptions,
        processEvent: Log -> string -> CancellationToken -> EventData -> Async<unit>,
        processError: Log -> string option -> string -> CancellationToken -> Exception -> Async<unit>
    ) =
    inherit
        PluggableCheckpointStoreEventProcessor<EventProcessorPartition>(
            BlobCheckpointStore(storageClient),
            eventBatchMaximumCount,
            consumerGroup,
            connectionString,
            clientOptions
        )
    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseListPartitionIdsAsync
        (connection: EventHubConnection, cancellationToken: CancellationToken)
        : Task<string[]> =
        base.ListPartitionIdsAsync(connection, cancellationToken)

    override this.ListPartitionIdsAsync
        (connection: EventHubConnection, cancellationToken: CancellationToken)
        : Task<string[]> =
        match assignedPartitions with
        | Some assignedPartitions -> assignedPartitions |> Task.FromResult
        | None -> this.BaseListPartitionIdsAsync(connection, cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseListOwnershipAsync
        (cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        base.ListOwnershipAsync(cancellationToken)

    override this.ListOwnershipAsync
        (cancellationToken: CancellationToken)
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        match assignedPartitions with
        | Some assignedPartitions ->
            assignedPartitions
            |> Seq.map (fun partition ->
                EventProcessorPartitionOwnership(
                    FullyQualifiedNamespace = this.FullyQualifiedNamespace,
                    EventHubName = this.EventHubName,
                    ConsumerGroup = this.ConsumerGroup,
                    PartitionId = partition,
                    OwnerIdentifier = this.Identifier,
                    LastModifiedTime = DateTimeOffset.UtcNow
                ))
            |> Task.FromResult
        | None -> this.BaseListOwnershipAsync(cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseClaimOwnershipAsync
        (
            desiredOwnership: IEnumerable<EventProcessorPartitionOwnership>,
            cancellationToken: CancellationToken
        )
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        base.ClaimOwnershipAsync(desiredOwnership, cancellationToken)

    override this.ClaimOwnershipAsync
        (
            desiredOwnership: IEnumerable<EventProcessorPartitionOwnership>,
            cancellationToken: CancellationToken
        )
        : Task<IEnumerable<EventProcessorPartitionOwnership>> =
        // Warning: if the match is removed, and only the code in the Some part is left => high CPU utilization if no assignedPartitions are defined!
        // For more info see https://github.com/Azure/azure-sdk-for-net/issues/39603
        match assignedPartitions with
        | Some _ ->
            desiredOwnership
            |> Seq.iter (fun ownership -> ownership.LastModifiedTime <- DateTimeOffset.UtcNow)

            desiredOwnership |> Task.FromResult
        | None -> this.BaseClaimOwnershipAsync(desiredOwnership, cancellationToken)

    // Workaround, see https://github.com/dotnet/fsharp/issues/12448 ...
    member this.BaseUpdateCheckpointAsync
        (
            partitionId: string,
            offset: int64,
            sequenceNumber: Nullable<int64>,
            cancellationToken: CancellationToken
        ) =
        base.UpdateCheckpointAsync(partitionId, offset, sequenceNumber, cancellationToken)

    // used in the OnProcessingEventBatchAsync member, calculate once
    member private this.EventHubFullName =
        Subscribing.createEventHubFullPath this.FullyQualifiedNamespace this.EventHubName this.ConsumerGroup
    // https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.onprocessingeventbatchasync?view=azure-dotnet#remarks
    override this.OnProcessingEventBatchAsync
        (
            events: IEnumerable<EventData>,
            partition: EventProcessorPartition,
            cancellationToken: CancellationToken
        )
        : Task =
        task {
            try
                if not (isNull events || events |> Seq.isEmpty) then
                    do!
                        events
                        |> Seq.map (fun event ->
                            async {
                                Subscribing.checkEventEnDequeueTime log this.EventHubFullName event
                                do! processEvent log partition.PartitionId cancellationToken event
                            })
                        |> Async.Sequential
                        |> Async.Ignore
                        |> Async.StartAsTask
                        :> Task

                    let lastEvent = events |> Seq.last

                    do!
                        this.BaseUpdateCheckpointAsync(
                            partition.PartitionId,
                            lastEvent.Offset,
                            lastEvent.SequenceNumber,
                            cancellationToken
                        )
            with ex ->
                // It is very important that you always guard against exceptions in
                // your handler code; the processor does not have enough
                // understanding of your code to determine the correct action to take.
                // Any exceptions from your handlers go uncaught by the processor and
                // will NOT be redirected to the error handler.
                //
                // In this case, the partition processing task will fault and be restarted
                // from the last recorded checkpoint.
                log.Exception
                    (int EventId.EventProcessorError, string EventId.EventProcessorError)
                    "OnProcessingEventBatchAsync: Exception while processing events: {ex}"
                    ex
                    [| ex |]
                // bubble up, which would kill the background service and result in a health check alert
                // alternative: log and "swallow" the exception here, but then the processor goes into an infinite loop ...
                // NOT a good idea: raising an exception here invokes OnProcessingErrorAsync, which causes the host to get restarted automatically, and the health check does not detect this ..
                // ex.Reraise()
        }
    // https://learn.microsoft.com/en-us/dotnet/api/azure.messaging.eventhubs.eventprocessorclient.onprocessingerrorasync?view=azure-dotnet#remarks
    override this.OnProcessingErrorAsync
        (
            ex: Exception,
            partition: EventProcessorPartition,
            operationDescription: string,
            cancellationToken: CancellationToken
        )
        : Task =
        task {
            try
                let partitionId = partition |> Option.ofObj |> Option.map _.PartitionId

                do!
                    processError log partitionId operationDescription cancellationToken ex
                    |> Async.StartAsTask
            with wex ->
                // It is very important that you always guard against exceptions
                // in your handler code; the processor does not have enough
                // understanding of your code to determine the correct action to
                // take. Any exceptions from your handlers go uncaught by the
                // processor and will NOT be handled in any way.
                //
                // In this case, unhandled exceptions will not impact the processor
                // operation but will go unobserved, hiding potential application problems.
                log.Exception
                    (int EventId.EventProcessorError, string EventId.EventProcessorError)
                    "OnProcessingErrorAsync: Exception occurred while processing events: {wex}. Original exception: {ex}."
                    wex
                    [| wex; ex |]
                // do! this.StopProcessingAsync(cancellationToken)
                // bubble up, which would kill the background service and result in a health check alert
                // alternative: log and "swallow" the exception here, but then the processor goes into an infinite loop ...
                // NOT a good idea: the host gets restarted automatically, and the health check does not detect this ..
                // ex.Reraise()
        }
/// Starts the Event Processor
let startConsumeEvents
    (name: string)
    (checkpointStorageConnectionString: string)
    (checkpointBlobContainerName: string)
    (eventHubConnectionString: string)
    (consumerGroup: string)
    (eventBatchMaximumCount: int)
    (assignedPartitionIds: string[] option)
    (defaultStartingPosition: EventPosition)
    (processEvent: Log -> string -> CancellationToken -> EventData -> Async<unit>)
    (processError: Log -> string option -> string -> CancellationToken -> Exception -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (IDictionary<string, obj> -> CancellationToken -> Async<unit>) =
    fun state cancellationToken ->
        async {
            let blobContainerClient =
                BlobContainerClient(checkpointStorageConnectionString, checkpointBlobContainerName)

            // automatically create the container if it does not exist
            do! blobContainerClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let options = EventProcessorOptions() // TODO: Customize some of them?
            options.DefaultStartingPosition <- defaultStartingPosition

            let processor =
                AssignablePartitionProcessor(
                    log,
                    name,
                    blobContainerClient,
                    assignedPartitionIds,
                    eventBatchMaximumCount,
                    consumerGroup,
                    eventHubConnectionString,
                    options,
                    processEvent,
                    processError
                )

            state.Add("processor", processor)

            log.Info
                (int EventId.EventProcessorStarted, string EventId.EventProcessorStarted)
                "Starting with config:\n\
                 \tEventHubNamespace/Name/ConsumerGroup = {eventHubNamespace}/{eventHubName}/{consumerGroup}\n\
                 \tBlobContainerClient.Uri = {blobContainerClientUri}\n\
                 \tAssignedPartitionIds = {assignedPartitionIds}\n\
                 \tEventBatchMaximumCount = {eventBatchMaximumCount}\n\
                 \tOptions.PrefetchCount = {prefetchCount}\n\
                 \tOptions.PrefetchSizeInBytes = {prefetchSizeInBytes}\n\
                 \tOptions.MaximumWaitTime = {maximumWaitTime}\n\
                 \tOptions.TrackLastEnqueuedEventProperties = {trackLastEnqueuedEventProperties}\n\
                 \tOptions.DefaultStartingPosition = {defaultStartingPosition}\n\
                 \tOptions.LoadBalancingStrategy = {loadBalancingStrategy}\n\
                 \tOptions.LoadBalancingUpdateInterval = {loadBalancingUpdateInterval}\n\
                 \tOptions.PartitionOwnershipExpirationInterval = {partitionOwnershipExpirationInterval}\n\
                 \tOptions.RetryOptions.Mode = {retryOptionsMode}\n\
                 \tOptions.RetryOptions.Delay = {retryOptionsDelay}\n\
                 \tOptions.RetryOptions.MaximumDelay = {retryOptionsMaximumDelay}\n\
                 \tOptions.RetryOptions.MaximumRetries = {retryOptionsMaximumRetries}\n\
                 \tOptions.RetryOptions.TryTimeout = {retryOptionsTryTimeout}\n\
                 \tOptions.RetryOptions.CustomRetryPolicy = {retryOptionsCustomRetryPolicy}\n\
                 "
                [|
                    (processor.FullyQualifiedNamespace |> String.replace ".servicebus.windows.net" "")
                    processor.EventHubName
                    processor.ConsumerGroup
                    blobContainerClient.Uri
                    $"%A{assignedPartitionIds}"
                    eventBatchMaximumCount
                    options.PrefetchCount
                    options.PrefetchSizeInBytes
                    options.MaximumWaitTime
                    options.TrackLastEnqueuedEventProperties
                    options.DefaultStartingPosition
                    options.LoadBalancingStrategy
                    options.LoadBalancingUpdateInterval
                    options.PartitionOwnershipExpirationInterval
                    options.RetryOptions.Mode
                    options.RetryOptions.Delay
                    options.RetryOptions.MaximumDelay
                    options.RetryOptions.MaximumRetries
                    options.RetryOptions.TryTimeout
                    options.RetryOptions.CustomRetryPolicy
                |]

            do! processor.StartProcessingAsync(cancellationToken) |> Async.AwaitTask
            started.Set() |> ignore
        }
/// Stops the Event Processor
let stopConsumeEvents
    name
    (started: ManualResetEvent)
    (log: Log)
    (state: IDictionary<string, obj>)
    : (CancellationToken -> Async<unit>) =
    fun cancellationToken ->
        async {
            let processor = state["processor"] :?> AssignablePartitionProcessor
            do! processor.StopProcessingAsync(cancellationToken) |> Async.AwaitTask

            log.Info
                (int EventId.EventProcessorStopped, string EventId.EventProcessorStopped)
                "Event Hub Processor was stopped"
                [||]

            started.Reset() |> ignore
        }
Notes:
- You define a function, then instantiate processor(s) with the function and a bunch of configuration values, which you can fetch yourself, and then you tell the HostBuilder to configure your processor(s) - pretty straightforward, no reflection, no magic
- The EventHubProcessor implementation is based on PluggableCheckpointStoreEventProcessor - the same primitive EventProcessorClient builds on - which does everything required, including the same checkpointing in blob storage as done by the WebJobs SDK. The whole implementation is less than 350 LOC ..
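For completeness: the EnvVars.* values passed to the Def.create functions are nothing magical either - just eagerly read environment variables, roughly like this (the variable names here are made up; the real module reads our actual settings):
module EnvVars =
    open System

    let private get name =
        match Environment.GetEnvironmentVariable name with
        | null | "" -> failwithf "Missing environment variable %s" name
        | value -> value

    let appName = get "APP_NAME"

    module EventHubs =
        let eventHubConnectionString = get "EVENTHUB_CONNECTION_STRING"
        let consumerGroup = get "EVENTHUB_CONSUMER_GROUP"
        let eventBatchMaximumCount = get "EVENTHUB_BATCH_MAX_COUNT" |> int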
Queue Processor
Old code:
type WebJobs(...) =
    [<FunctionName("RetryHandleXyzEvent")>]
    member _.RetryHandleXyzEvent
        ([<QueueTrigger("xyz-events-retry-queue", Connection = "StorageQueueConnectionStringKey")>] msg: string)
        (logger: ILogger) =
        // handle the message

// Program.fs, configure the host with web jobs
let configureWebJobs (builder: IHostBuilder) =
    builder.ConfigureWebJobs(fun b ->
        b.AddAzureStorageCoreServices() |> ignore
        b.AddAzureStorageQueues() |> ignore)
New code:
// Api.Functions.fs, handle the event
let processMessage log (cancellationToken: CancellationToken) (msg: QueueMessage) : Async<unit> =
    // handle the queue message
    () |> Async.retn

// Api.Wiring.fs, instantiate the processor
let queueProcessor1 =
    QueueProcessorDef.create
        "QueueProcessor1"
        EnvVars.appName
        EnvVars.Queues.queueStorageConnectionString
        EnvVars.Queues.queueName
        EnvVars.Queues.messageBatchMaximumCount
        EnvVars.Queues.visibilityTimeout
        EnvVars.Queues.maxPollingInterval
        EnvVars.Queues.maxDequeueCount
        EnvVars.Queues.defaultBackOffIntervalMs
        QueueHandlers.processMessage

// Program.fs, configure the host with the processors
|> HostBuilder.configureQueueProcessors
    EnvVars.appName
    [ QueueProcessors.queueProcessor1 ]
Queue Processor Implementation:
module Framework.AzureStorageQueues.QueueProcessing

open System
open System.Threading
open Azure.Storage.Queues
open Azure.Storage.Queues.Models
open Framework.AzureStorageQueues.BasicOperations
open Framework.AzureStorageQueues.LogEvents
open Framework.Logging.StructuredLog
open Framework.ExceptionHandling

let private doProcessMessage
    (log: Log)
    (name: string)
    (cancellationToken: CancellationToken)
    (maxDequeueCount: int)
    (queueClient: QueueClient)
    (poisonQueueClient: QueueClient)
    (processMessage: Log -> CancellationToken -> QueueMessage -> Async<unit>)
    (msg: QueueMessage) =
    async {
        if msg.DequeueCount > maxDequeueCount then
            // message has been retried too many times => move it to the poison queue
            do!
                poisonQueueClient.SendMessageAsync(msg.Body, cancellationToken = cancellationToken)
                |> Async.AwaitTask
                |> Async.Ignore

            log.Debug
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Message dequeue count = {messageDequeueCount} > maxDequeueCount = {maxDequeueCount} => message moved to poison queue {poisonQueueUri}"
                [| msg.DequeueCount; maxDequeueCount; poisonQueueClient.Uri |]

            do!
                queueClient.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, cancellationToken)
                |> Async.AwaitTask
                |> Async.Ignore

            log.Debug
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Message deleted from queue {queueUri}"
                [| queueClient.Uri |]
        else
            // normal processing
            // NOTE: try-with is used instead of Async.Catch to also catch cases when processMessage throws an exception outside of an Async block ...
            try
                do! processMessage log cancellationToken msg

                do!
                    queueClient.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, cancellationToken)
                    |> Async.AwaitTask
                    |> Async.Ignore

                log.Debug
                    (int EventId.QueueMessageBeingProcessed, string EventId.QueueMessageBeingProcessed)
                    "Deleted message with id = {messageId} and dequeue count = {messageDequeueCount} in queue {queueUri} after successful processing"
                    [| msg.MessageId; msg.DequeueCount; queueClient.Uri |]
            with ex ->
                // message remains in the queue and will become visible again after visibilityTimeout
                log.Exception
                    (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                    "Queue message processing failed. Retrying {remainingDequeueCount} more times, then the message will be moved to poison queue. Queue: {queueUri}; Message Body: {messageBody}"
                    ex
                    [|
                        (int64 maxDequeueCount - msg.DequeueCount)
                        queueClient.Uri
                        msg |> QueueMessage.toString
                    |]
    }
    |> Async.Catch
    |> Async.map (function
        | Choice1Of2 _ -> ()
        | Choice2Of2 ex ->
            // log the message body
            log.Exception
                (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                "Error occurred while performing auxiliary queue message processing (e.g. DeleteMessageAsync). Message Body: {messageBody}"
                ex
                [| msg |> QueueMessage.toString |]
            // and propagate the exception up
            ex.Reraise())
/// Starts an infinite loop for receiving queue messages
let consumeMessages
    (name: string)
    (storageConnectionString: string)
    (queueName: string)
    (poisonQueueName: string)
    (messageBatchMaximumCount: int)
    (visibilityTimeout: TimeSpan)
    (maxPollingInterval: TimeSpan)
    (maxDequeueCount: int)
    (defaultBackOffIntervalMs: int)
    (processMessage: Log -> CancellationToken -> QueueMessage -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (CancellationToken -> Async<unit>) =
    fun cancellationToken ->
        async {
            let options = QueueClientOptions() // TODO: Configure options?
            let queueClient = QueueClient(storageConnectionString, queueName, options)
            do! queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let poisonQueueClient =
                QueueClient(storageConnectionString, poisonQueueName, options)

            do! poisonQueueClient.CreateIfNotExistsAsync() |> Async.AwaitTask |> Async.Ignore

            let mutable backoffTimeMs = defaultBackOffIntervalMs
            started.Set() |> ignore

            log.Info
                (int EventId.QueueProcessorStarting, string EventId.QueueProcessorStarting)
                "Starting with config:\n\
                 \tUri = {queueClientUri}\n\
                 \tMessageBatchMaximumCount = {messageBatchMaximumCount}\n\
                 \tVisibilityTimeout = {visibilityTimeout}\n\
                 \tMaxPollingInterval = {maxPollingInterval}\n\
                 \tMaxDequeueCount = {maxDequeueCount}\n\
                 \tOptions.MessageEncoding = {messageEncoding}\n\
                 \tOptions.Retry.Mode = {retryMode}\n\
                 \tOptions.Retry.Delay = {retryDelay}\n\
                 \tOptions.Retry.MaxDelay = {retryMaxDelay}\n\
                 \tOptions.Retry.MaxRetries = {retryMaxRetries}\n\
                 \tOptions.Retry.NetworkTimeout = {retryNetworkTimeout}\n\
                 \tPoisonQueueName = {poisonQueueName}\n\
                 "
                [|
                    queueClient.Uri
                    messageBatchMaximumCount
                    visibilityTimeout
                    maxPollingInterval
                    maxDequeueCount
                    options.MessageEncoding
                    options.Retry.Mode
                    options.Retry.Delay
                    options.Retry.MaxDelay
                    options.Retry.MaxRetries
                    options.Retry.NetworkTimeout
                    poisonQueueName
                |]

            while not cancellationToken.IsCancellationRequested do
                try
                    // any exception in this block will be caught and logged,
                    // because if propagated up it would stop the queue processor/background service => health check alert,
                    // but no automatic restart is currently possible ...
                    let! response =
                        queueClient.ReceiveMessagesAsync(
                            maxMessages = messageBatchMaximumCount,
                            visibilityTimeout = visibilityTimeout,
                            cancellationToken = cancellationToken
                        )
                        |> Async.AwaitTask

                    if response.HasValue && response.Value.Length > 0 then
                        do!
                            response.Value
                            |> Seq.map (fun msg ->
                                async {
                                    // central check for enDequeueTime
                                    // TODO: Enable this once a proper config/solution is found for the invisibility period, which is *not* exposed as a QueueMessage property (only InsertedOn and NextVisibleOn, but VisibleOn is needed). For more info see https://github.com/Azure/azure-sdk-for-net/issues/40147
                                    // do checkMessageEnDequeueTime log queueClient.Name backoffTimeMs msg
                                    return!
                                        doProcessMessage
                                            log
                                            name
                                            cancellationToken
                                            maxDequeueCount
                                            queueClient
                                            poisonQueueClient
                                            processMessage
                                            msg
                                })
                            |> Async.Sequential
                            |> Async.Ignore

                        backoffTimeMs <- defaultBackOffIntervalMs

                        log.Debug
                            (int EventId.WaitingForQueueMessages, string EventId.WaitingForQueueMessages)
                            "{messageCount} messages successfully processed from queue {queueUri}. Waiting for {backoffTimeMs} milliseconds before checking again ..."
                            [| response.Value.Length; queueClient.Uri; backoffTimeMs |]
                    else
                        backoffTimeMs <- Math.Min(backoffTimeMs * 2, int maxPollingInterval.TotalMilliseconds)

                        log.Debug
                            (int EventId.WaitingForQueueMessages, string EventId.WaitingForQueueMessages)
                            "No messages found in queue {queueUri}. Waiting for {backoffTimeMs} milliseconds before checking again ..."
                            [| queueClient.Uri; backoffTimeMs |]

                    do! Async.Sleep(backoffTimeMs)
                with ex ->
                    log.Exception
                        (int EventId.QueueMessageProcessingError, string EventId.QueueMessageProcessingError)
                        "Exception while processing messages: {ex}"
                        ex
                        [| ex |]
        }
Notes:
- The implementation is based on the default QueueClient.ReceiveMessagesAsync approach of handling queue messages with the Azure SDK, invoked in an infinite loop with some Async.Sleep back-off sprinkled into it ... The whole implementation is about 200 LOC.
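The back-off logic distilled out of the loop above: the wait doubles on every empty receive, capped at maxPollingInterval, and resets to the default as soon as a batch was processed:
open System

// Distilled from the consumeMessages loop above
let nextBackoffMs (defaultBackOffIntervalMs: int) (maxPollingInterval: TimeSpan) currentMs gotMessages =
    if gotMessages then
        defaultBackOffIntervalMs // reset after a successful batch
    else
        Math.Min(currentMs * 2, int maxPollingInterval.TotalMilliseconds) // double, but cap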
Timer Processor
Old code:
type WebJobs(...) =
    [<FunctionName("DoSomethingRegularly")>]
    member this.ExpireCustomerDocuments
        (
            [<TimerTrigger("%DoSomethingRegularlyCrontab%")>] timer: TimerInfo,
            logger: ILogger
        ) =
        // do something

// Program.fs, configure the host with web jobs
let configureWebJobs (builder: IHostBuilder) =
    builder.ConfigureWebJobs(fun b ->
        b.AddAzureStorageCoreServices() |> ignore
        b.AddTimers() |> ignore)
Notes:
- A somewhat exotic placeholder format for the crontab: the %DoSomethingRegularlyCrontab% placeholder is resolved from an app setting/environment variable of that name by the binding machinery ..
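For reference, the resolved value is a six-field NCRONTAB expression (seconds included), e.g.:
// app setting / environment variable (illustrative value: every 5 minutes)
// DoSomethingRegularlyCrontab = "0 */5 * * * *"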
New code:
// Api.Functions.fs, handle the event
let processTimer1 log (cancellationToken: CancellationToken) (toProcessOn: DateTime) : Async<unit> =
    // do something
    () |> Async.retn

// Api.Wiring.fs, instantiate the processor
let timerProcessor1 =
    TimerProcessorDef.create
        "TimerProcessor1"
        EnvVars.appName
        EnvVars.Timers.timerProcessorQueueStorageConnectionString
        (TimeSpan.FromSeconds(10) |> Some)
        "* * * * *"
        TimerHandlers.processTimer1

// Program.fs, configure the host with the processors
|> HostBuilder.configureTimers EnvVars.appName [ Timers.timerProcessor1; Timers.timerProcessor2 ]
Timer Processor Implementation:
module Framework.AzureStorageQueues.TimerProcessing

open System
open System.Text
open System.Threading
open Azure.Storage.Queues
open NCrontab
open Framework
open Framework.AzureStorageQueues.LogEvents
open Framework.Logging.StructuredLog

type TimerMessage = { ToProcessOn: DateTime }

let private maxTimeoutPeriod = TimeSpan.FromDays(3) // could be up to 7 (messages are deleted from the queue after 7 days), but calculating in possible downtimes/system recovery

let private calculateNextCheckOn (toProcessOn: DateTime) (now: DateTime) =
    if toProcessOn - now < maxTimeoutPeriod then
        toProcessOn
    else
        now + maxTimeoutPeriod

let private createAndSendNextTimerMessage (queueClient: QueueClient) (crontab: CrontabSchedule) =
    async {
        let toProcessOn = crontab.GetNextOccurrence(DateTime.UtcNow)
        let msg = { ToProcessOn = toProcessOn }
        let encodedMessage = msg |> Json.serialize |> Encoding.base64Encode Encoding.UTF8
        let nextCheckOn = calculateNextCheckOn toProcessOn DateTime.UtcNow
        let visibilityTimeout = nextCheckOn - DateTime.UtcNow // the message should always become visible slightly after ToProcessOn, because sending the msg to the queue takes some ms

        do!
            queueClient.SendMessageAsync(encodedMessage, visibilityTimeout, TimeSpan.FromSeconds(-1)) // -1 second indicates "infinite" message TTL (i.e. 7 days)
            |> Async.AwaitTask
            |> Async.Ignore

        return msg
    }
/// Starts an infinite loop for receiving queue timer messages
let private consumeMessages
    (log: Log)
    (name: string)
    (queueClient: QueueClient)
    (cancellationToken: CancellationToken)
    (crontab: CrontabSchedule)
    processMessage
    (messageBatchMaximumCount: int)
    (visibilityTimeout: TimeSpan) =
    async {
        let mutable firstRun = true

        while not cancellationToken.IsCancellationRequested do
            try
                // any exception in this block will be caught and logged,
                // because if propagated up it would stop the timer processor/background service => health check alert,
                // but no automatic restart is currently possible ...
                let! response =
                    queueClient.ReceiveMessagesAsync(
                        maxMessages = messageBatchMaximumCount,
                        visibilityTimeout = visibilityTimeout,
                        cancellationToken = cancellationToken
                    )
                    |> Async.AwaitTask

                let! nextCheckOn =
                    if response.HasValue && (response.Value |> Seq.tryHead |> Option.isSome) then // the message became visible, so it needs to be processed or rescheduled
                        async {
                            let msg = response.Value |> Seq.head // we expect/process only 1 timer message per timer queue!

                            let decodedMessage =
                                msg.Body.ToString() |> Encoding.base64Decode Encoding.UTF8 |> Json.deserialize

                            log.Debug
                                (int EventId.TimerMessageBeingProcessed, string EventId.TimerMessageBeingProcessed)
                                "Timer message received in queue {queueUri} with ToProcessOn = {toProcessOn}."
                                [| queueClient.Uri; decodedMessage.ToProcessOn |> DateTime.toStringIso |]

                            let! toProcessOn =
                                // if ToProcessOn is in the past => ready to process!
                                if decodedMessage.ToProcessOn <= DateTime.UtcNow then
                                    async {
                                        do! processMessage log cancellationToken decodedMessage.ToProcessOn

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed,
                                             string EventId.TimerMessageBeingProcessed)
                                            "Timer message in queue {queueUri} successfully processed."
                                            [| queueClient.Uri |]

                                        // delete this (all) message(s)
                                        do! queueClient.ClearMessagesAsync() |> Async.AwaitTask |> Async.Ignore

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed,
                                             string EventId.TimerMessageBeingProcessed)
                                            "Deleted timer message in queue {queueUri}."
                                            [| queueClient.Uri |]

                                        // schedule the next execution in a new message
                                        let! newTimerMessage = createAndSendNextTimerMessage queueClient crontab

                                        log.Debug
                                            (int EventId.TimerMessageBeingProcessed,
                                             string EventId.TimerMessageBeingProcessed)
                                            "New timer message with ToProcessOn = {toProcessOn} sent to queue {queueUri}."
                                            [| newTimerMessage.ToProcessOn; queueClient.Uri |]

                                        return newTimerMessage.ToProcessOn
                                    }
                                else // message visible before ToProcessOn .. make it invisible again
                                    async {
                                        let nextCheckOn =
                                            calculateNextCheckOn decodedMessage.ToProcessOn DateTime.UtcNow // the message should always become visible slightly after ToProcessOn, because sending the msg to the queue takes some ms

                                        let visibilityTimeout = nextCheckOn - DateTime.UtcNow

                                        if visibilityTimeout > TimeSpan.Zero then
                                            do!
                                                queueClient.UpdateMessageAsync(
                                                    msg.MessageId,
                                                    msg.PopReceipt,
                                                    visibilityTimeout = visibilityTimeout
                                                )
                                                |> Async.AwaitTask
                                                |> Async.Ignore

                                            log.Debug
                                                (int EventId.TimerMessageBeingProcessed,
                                                 string EventId.TimerMessageBeingProcessed)
                                                "Timer message in queue {queueUri} with ToProcessOn = {toProcessOn} became visible before ToProcessOn. The message's invisibility was extended."
                                                [| queueClient.Uri; decodedMessage.ToProcessOn |> DateTime.toStringIso |]

                                        return decodedMessage.ToProcessOn
                                    }

                            return calculateNextCheckOn toProcessOn DateTime.UtcNow
                        }
                    elif firstRun then
                        log.Debug
                            (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                            "No visible message in queue {queueUri}, but first run, so await schedule ..."
                            [| queueClient.Uri |]

                        let toProcessOn = crontab.GetNextOccurrence(DateTime.UtcNow)
                        calculateNextCheckOn toProcessOn DateTime.UtcNow |> Async.retn
                    else
                        log.Debug
                            (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                            "No visible message in queue {queueUri} after sleep but there should have been one, maybe delayed, so check frequently ..."
                            [| queueClient.Uri |]

                        DateTime.UtcNow.AddMinutes(1) |> Async.retn

                let sleepTimeSpan = (nextCheckOn - DateTime.UtcNow) + TimeSpan.FromSeconds(1) // add 1 second to make 100% sure the sleep is over only after the message has already become visible

                log.Debug
                    (int EventId.WaitingForTimerMessage, string EventId.WaitingForTimerMessage)
                    "Sleeping {sleepTimeSpan} before checking again in queue {queueUri}"
                    [| sleepTimeSpan; queueClient.Uri |]

                if sleepTimeSpan > TimeSpan.Zero then
                    do! Async.Sleep(sleepTimeSpan)
            with ex ->
                log.Exception
                    (int EventId.TimerMessageProcessingError, string EventId.TimerMessageProcessingError)
                    "Exception while processing messages: {ex}"
                    ex
                    [| ex |]

            firstRun <- false
    }
/// Creates timer queue messages if missing and starts an infinite loop for receiving queue timer messages
let createAndConsumeMessages
    (name: string)
    (crontab: CrontabSchedule)
    (storageConnectionString: string)
    (queueName: string)
    (visibilityTimeout: TimeSpan)
    (processMessage: Log -> CancellationToken -> DateTime -> Async<unit>)
    (started: ManualResetEvent)
    (log: Log)
    : (CancellationToken -> Async<unit>) =
    fun cancellationToken ->
        async {
            let options = QueueClientOptions() // TODO: Configure options?
            let queueClient = QueueClient(storageConnectionString, queueName, options)
            let! response = queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask

            if not (isNull response) then // the queue was just created
                let! timerMessage = createAndSendNextTimerMessage queueClient crontab

                log.Info
                    (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                    "New queue {queueUri} was created and a new timer message with ToProcessOn = {toProcessOn} was sent to it."
                    [| queueClient.Uri; timerMessage.ToProcessOn |> DateTime.toStringIso |]
            else // the queue already existed
                let! response = queueClient.GetPropertiesAsync() |> Async.AwaitTask

                if response.HasValue && response.Value.ApproximateMessagesCount = 0 then // ApproximateMessagesCount, even though not exact, is guaranteed to have a value > 0 if there are messages. Additionally, "Approximate messages count will give you an approximate count of total messages in a queue and will include both visible and invisible messages."
                    let! timerMessage = createAndSendNextTimerMessage queueClient crontab

                    log.Info
                        (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                        "Queue {queueUri} exists, but no timer message found in it. Created timer message with ToProcessOn = {toProcessOn}."
                        [| queueClient.Uri; timerMessage.ToProcessOn |> DateTime.toStringIso |]
                else
                    log.Info
                        (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                        "Existing queue {queueUri} with timer message found."
                        [| queueClient.Uri |]

            let messageBatchMaximumCount = 1 // 1 queue per timer processor, with only 1 message per queue
            started.Set() |> ignore

            log.Info
                (int EventId.TimerProcessorStarting, string EventId.TimerProcessorStarting)
                "Starting with config:\n\
                 \tUri = {queueClientUri}\n\
                 \tMessageBatchMaximumCount = {messageBatchMaximumCount}\n\
                 \tVisibilityTimeout = {visibilityTimeout}\n\
                 \tOptions.MessageEncoding = {messageEncoding}\n\
                 \tOptions.Retry.Mode = {retryMode}\n\
                 \tOptions.Retry.Delay = {retryDelay}\n\
                 \tOptions.Retry.MaxDelay = {retryMaxDelay}\n\
                 \tOptions.Retry.MaxRetries = {retryMaxRetries}\n\
                 \tOptions.Retry.NetworkTimeout = {retryNetworkTimeout}\n\
                 "
                [|
                    queueClient.Uri
                    messageBatchMaximumCount
                    visibilityTimeout
                    options.MessageEncoding
                    options.Retry.Mode
                    options.Retry.Delay
                    options.Retry.MaxDelay
                    options.Retry.MaxRetries
                    options.Retry.NetworkTimeout
                |]

            do!
                consumeMessages
                    log
                    name
                    queueClient
                    cancellationToken
                    crontab
                    processMessage
                    messageBatchMaximumCount
                    visibilityTimeout
        }
Notes:
- The implementation is based on an Azure Storage Queue with a single message inside, which is made "invisible" for a certain period of time. This allows surviving a process crash, running multiple instances, etc. The whole implementation is about 250 LOC.
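To make the scheduling math concrete, a small worked example with NCrontab (values are illustrative):
open System
open NCrontab

let crontab = CrontabSchedule.Parse "0 3 * * *" // daily at 03:00 UTC
let now = DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc)

let toProcessOn = crontab.GetNextOccurrence now // 2024-01-02 03:00:00Z
// 15 hours away - below maxTimeoutPeriod (3 days) - so the timer message is sent
// with a ~15h visibility timeout and becomes visible right when it is due
let visibilityTimeout = toProcessOn - now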
SignalR
Old code:
type WebJobs(...) =
    [<FunctionName("HandleAndPushToClient")>]
    member _.HandleAndPushToClient
        (
            [<EventHubTrigger("",
                              Connection = EnvVars.EventHubs.Xyz.connectionStringKey,
                              ConsumerGroup = DependencyInjection.EventHubs.Xyz.consumerGroupForWebSocketNotification)>] msg: EventData,
            enqueuedTimeUtc: DateTime,
            sequenceNumber: Int64,
            offset: string,
            [<SignalR(HubName = DependencyInjection.SignalR.hubName,
                      ConnectionStringSetting = "AzureSignalRConnectionString")>] signalRMessages: IAsyncCollector<SignalRMessage>,
            logger: ILogger
        ) =
        // transform some internal event to external
        // publish to SignalR using signalRMessages.AddAsync
Notes:
- The output bindings are generally touted as a killer feature, but it is you they are really killing - instead of invoking a very simple Azure SDK client method, you deal with IAsyncCollector and SignalR binding magic, which is completely unnecessary. Go figure out how to send a message to a specific user or to all ...
New code (same as for the EventHubProcessor above):
let serviceManager =
    SignalRClient.getServiceManager EnvVars.SignalR.connectionString

let hub =
    SignalRClient.getHubContext serviceManager EnvVars.SignalR.azureSignalRHubName
    |> Async.RunSynchronously // TODO: Find a way to get rid of this

let sendToUser = SignalRClient.sendToUser hub

// Api.Functions.fs, handle the event
let processEvent log sendToUser partitionId (cancellationToken: CancellationToken) event =
    async {
        // handle the event
        do! sendToUser "SomeTarget" "Some Message" "SomeUserId"
    }
Notes:
- The Azure SDK for sending SignalR messages is very, very straightforward when you use it directly ...
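For reference, the SignalRClient helpers used above can be this small when built on the Microsoft.Azure.SignalR.Management package (a sketch matching the usage above; the bodies are an assumption, the actual helper module may differ slightly):
module SignalRClient =
    open System.Threading
    open Microsoft.Azure.SignalR.Management

    let getServiceManager (connectionString: string) =
        ServiceManagerBuilder()
            .WithOptions(fun options -> options.ConnectionString <- connectionString)
            .BuildServiceManager()

    let getHubContext (serviceManager: ServiceManager) (hubName: string) =
        serviceManager.CreateHubContextAsync(hubName, CancellationToken.None)
        |> Async.AwaitTask

    let sendToUser (hub: ServiceHubContext) (target: string) (message: string) (userId: string) =
        hub.Clients.User(userId).SendCoreAsync(target, [| box message |])
        |> Async.AwaitTask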
Conclusion
Removing a layer of indirection has always given me great satisfaction. Not only does it make the whole application easier to understand, you also gain a lot more control and get to know the inner workings of the technology, without someone deciding something for you or translating stuff like configuration for you.
MS always seems to try to make things easier for the developer (patronizing them?) by providing a magical and abstract framework, which achieves exactly the opposite. My recommendation to MS would be to make everything look like a plain console application instead, with full control for the client developer, who just uses a bunch of simple "helper" functions (or class methods in OOP) from MS, and nothing more.
Hopefully someone can save some time doing something similar based on the ideas and code in this article!
¹ Funnily enough, in the meantime MS seems to be trying to do something similar, by integrating the WebJobs SDK into .NET 9's HostBuilder. ↩