Framework for Kafka applications. It contains computation expressions that help with building this kind of application, with built-in metrics, logging, parsing, etc.
Add the following into `paket.references`:

```
Alma.KafkaApplication
```
```
Entry point (your application)
├─> Build (computation expression) builds the Application out of your configuration
└─> Run Application, which might be either Ok or with the Error
┌────────────────────────────────┘ └──────────────────────────────────────────<Ends with the Error>───────┐
├─> Before Start (Debug pattern specific configuration) │
├─> Start Custom Tasks │
├─> Initialize runtime parts (only if initialization is set) │
├─> Before Run (allow the pattern to map some runtime parts) │
└─> Run Kafka Application │
├─> Debug Configuration (only with debug verbosity) │
├─> Enable Context Metric │
├─> Mark instance as disabled (set service_status to 0) │
├─> Check additionally registered resources and start interval checking │
├─> Start Metrics on Route (only if route is set) │
├─> Connect Producers <─────────────────────────────────────────────────────────┐ │
│ └─<On Error>───> Producer Error Handler (Default - RetryIn 60 seconds) │ │
│ └─> One of Following: │ │
<All connected> └─> Retry ─────────────────────────────────────────┘ │
│ └─> RetryIn X seconds ─────────────────────────────┘ │
│ └─> Shutdown ──────────────────────────────────────<Ends with the RuntimeError>────┐ │
│ └─> ShutdownIn X seconds ──────────────────────────<Ends with the RuntimeError>────┐ │
├─> Produce instance_started event to the [Supervision stream] (only if supervision stream is registered) │ │
├─> Consume all registered Consume Handlers, one by one (set service_status to 1) <───┐ │ │
│ └─<On Error>───> Consume Error Handler (Default - RetryIn 60 seconds) │ │ │
│ │ └─> One of Following: │ │ │
<All consumed> │ └─> Continue (with next Consume Handler) ───────────────┘ │ │
│ │ └─> Retry ──────────────────────────────────────────────┘ │ │
│ │ └─> RetryIn X seconds ──────────────────────────────────┘ │ │
│ │ └─> Shutdown ─────────────────────────────────────<Ends with the RuntimeError>────┐ │
│ │ └─> ShutdownIn X seconds ─────────────────────────<Ends with the RuntimeError>────┐ │
│ └─<On Success>─> Flush all Producers │ │
├─> Close all Producers │ │
│ (Successfully end) ──────────┐ │ │
└─> Return ApplicationShutdown │ │ │
└─> One of Following: │ │ │
└─> Successfully <─────┘ │ │
└─> WithRuntimeError <────────────────────<Application RuntimeError or Unhandled Exception>───────────┘ │
└─> WithError <──────────────────────────────────────────────────────────────────────────────────────────┘
┌────<ApplicationShutdown>────┘
└─> ApplicationShutdown.withStatusCode ─┐
<────────<Status code - [0|1]>──────────────┘
```
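Which branch is taken on failure is decided by the registered error handlers. A minimal sketch of overriding the default handlers, assuming the policy case names match the diagram above and that `RetryIn` takes an `int<second>` value:

```fsharp
// A minimal sketch, assuming ConsumeErrorPolicy/ProducerErrorPolicy expose the
// cases shown in the diagram (Continue, Retry, RetryIn, Shutdown, ShutdownIn).
kafkaApplication {
    // ...

    // Instead of the default RetryIn 60 seconds, retry producer connections sooner.
    onProducerError (fun _logger _errorMessage -> ProducerErrorPolicy.RetryIn 10<second>)

    // Shut the application down on any consume error.
    onConsumeError (fun _logger _errorMessage -> ConsumeErrorPolicy.Shutdown)
}
```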
Definitions for patterns can be found in Confluence. You can simply use a predefined pattern for the application you want.
NOTE: Every function takes the state (`Configuration<'Event>`) as its first argument, but this is the current state of the application and it is passed implicitly in the background by the computation expression.
| Function | Arguments | Description |
|---|---|---|
| addHttpHandler | httpHandler: Giraffe.HttpHandler | It will register an additional HttpHandler to the WebServer. |
| checkKafkaWith | checker: Kafka.Checker | It will register a checker, which will be passed to the Consumer Configuration and used by the Kafka library to check resources. |
| checkResourceInInterval | checker: unit -> Metrics.ResourceStatus, ResourceAvailability, interval: int<second> | It will register a resource which will be checked at the given interval during the runtime of the application. |
| connect | Kafka.ConnectionConfiguration | It will register a default connection for Kafka. |
| connectManyToBroker | ManyTopicsConnectionConfiguration | It will register named connections for Kafka. Each connection name will be the same as its topic name. |
| connectTo | connectionName: string, Kafka.ConnectionConfiguration | It will register a named connection for Kafka. |
| consume | handler: ConsumeRuntimeParts -> seq<'Event> -> unit | It will register a handler, which will be called with events consumed from the default Kafka connection. |
| consumeFrom | connectionName: string, handler: ConsumeRuntimeParts -> seq<'Event> -> unit | It will register a handler, which will be called with events consumed from the given Kafka connection. |
| consumeLast | handler: ConsumeRuntimeParts -> 'Event -> unit | It will register a handler, which will be called with the last message (event) in the default connection, if there is one. |
| consumeLastFrom | connectionName: string, handler: ConsumeRuntimeParts -> 'Event -> unit | It will register a handler, which will be called with the last message (event) in the given connection, if there is one. |
| initialize | initialization: Initialization<'OutputEvent, 'Dependencies> | It allows updating the current ConsumeRuntimeParts before any consume handler can access them. It is the best place to initialize any application dependencies. |
| merge | configuration: Configuration<'Event> | It adds another configuration and merges it with the current one. New configuration values have higher priority: new values (only those with Some value) will replace already set configuration values (except the logger). |
| onConsumeError | ErrorHandler = Logger -> (errorMessage: string) -> ConsumeErrorPolicy | It will register an error handler, which will be called on error while consuming the default connection, and it determines what happens next. |
| onConsumeErrorFor | connectionName: string, ErrorHandler = Logger -> (errorMessage: string) -> ConsumeErrorPolicy | It will register an error handler, which will be called on error while consuming the given connection, and it determines what happens next. |
| onProducerError | ErrorHandler = Logger -> (errorMessage: string) -> ProducerErrorPolicy | It will register an error handler, which will be called on error while connecting producers, and it determines what happens next. |
| parseEventWith | ParseEvent<'InputEvent> | It will register a parser for input events. |
| parseEventAndUseApplicationWith | ConsumeRuntimeParts -> ParseEvent<'InputEvent> | It will register a parser for input events. The parser has access to the consume runtime parts. |
| produceTo | connectionName: string, FromDomain<'OutputEvent> | It will register both a Kafka Producer and a produce event function. |
| produceToMany | topics: string list, FromDomain<'OutputEvent> | It will register both a Kafka Producer and a produce event function for all topics with the one fromDomain function. |
| registerCustomMetric | CustomMetric | It will register a custom metric, which will be shown (if it has a value) amongst other metrics on the metrics route. (See also showMetrics, ConsumeRuntimeParts.IncrementMetric, etc.) |
| runCustomTask | TaskErrorPolicy, CustomTaskRuntimeParts -> Async<unit> | It will register a CustomTask, which will be started with the application. |
| showCustomMetric | name: string, MetricType, description: string | It will register a custom metric, which will be shown (if it has a value) amongst other metrics on the metrics route. (See also showMetrics, ConsumeRuntimeParts.IncrementMetric, etc.) |
| showInputEventsWith | createInputEventKeys: InputStreamName -> 'Event -> SimpleDataSetKey | If this function is set, all input events will be counted and the count will be shown on metrics. (Created keys will be added to the default ones, like Instance, etc.) |
| showMetrics | | It will asynchronously run a web server (http://127.0.0.1:8080) and show metrics (for Prometheus) on the route. The route must start with /. |
| showMetrics | port: int | It will asynchronously run a web server (http://127.0.0.1:{PORT}) and show metrics (for Prometheus) on the route. The route must start with /. |
| showOutputEventsWith | createOutputEventKeys: OutputStreamName -> 'Event -> SimpleDataSetKey | If this function is set, all output events will be counted and the count will be shown on metrics. (Created keys will be added to the default ones, like Instance, etc.) |
| showInternalState | path: string | It adds an HttpHandler for the given path, which will show the current internal state as text/plain. The route must start with /. |
| useDockerImageVersion | DockerImageVersion | |
| useGit | Git | |
| useGroupId | GroupId | It is optional, with the default GroupId.Random. |
| useGroupIdFor | connectionName: string, GroupId | It sets the group id for the given connection. |
| useCommitMessage | CommitMessage | It is optional, with the default CommitMessage.Automatically. |
| useCommitMessageFor | connectionName: string, CommitMessage | It sets the commit message mode for the given connection. |
| useCurrentEnvironment | Environment | |
| useInstance | Instance | |
| useLogger | logger: Logger | It is optional. |
| useSpot | Spot | It is optional, with the default Zone = "common"; Bucket = "all". |
| useSupervision | Kafka.ConnectionConfiguration | It will register a supervision connection for Kafka. This connection will be used to produce supervision events (like instance_started). |
`Initialize<'OutputEvent, 'Dependencies>` could return one of the following:
- `ConsumeRuntimeParts`
- `Result<ConsumeRuntimeParts, ErrorMessage>`
- `AsyncResult<ConsumeRuntimeParts, ErrorMessage>`

`FromDomain<'OutputEvent>` could return one of the following:
- `MessageToProduce`
- `Result<MessageToProduce, string>`
- `AsyncResult<MessageToProduce, string>`

`ParseEvent<'InputEvent>` could return one of the following:
- `'InputEvent`
- `Result<'InputEvent, string>`
- `AsyncResult<'InputEvent, string>`

`ConsumeHandler<'InputEvent, 'OutputEvent, 'Dependencies>` could return one of the following:
- `unit`
- `Result<unit, string>`
- `AsyncResult<unit, string>`
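For illustration, here is a minimal sketch of a `ParseEvent<'InputEvent>` using the `Result` variant. The `MyEvent` type, the raw `name|payload` format, and the assumption that the parser receives the raw message as a string are all hypothetical:

```fsharp
// A minimal sketch of a parser returning Result<'InputEvent, string>.
// MyEvent and the "name|payload" format are hypothetical.
type MyEvent = { Name: string; Payload: string }

let parseMyEvent (raw: string): Result<MyEvent, string> =
    match raw.Split('|') with
    | [| name; payload |] -> Ok { Name = name; Payload = payload }
    | _ -> Error (sprintf "Unexpected event format: %s" raw)

kafkaApplication {
    // ...
    parseEventWith parseMyEvent
    consume (fun _ events -> events |> Seq.iter (printfn "%A"))
}
```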
- `Instance` of the application is required.
- `CurrentEnvironment` of the application is required.
- You have to define at least one consume handler and its connection.
- You have to define a `ParseEvent<'InputEvent>` function.
- The default error handler for consuming is set to `RetryIn 60 seconds` on error.
- The default error handler for connecting producers is set to `RetryIn 60 seconds` on error.
- The default `GroupId` is `Random`. If you define a group id without a connection, it will be used for all connections unless you explicitly set another group id for them (see the sketch after this list).
- The default `Spot` is `Zone = "common"; Bucket = "all"`.
- The default `Checker` for Kafka is the default checker defined in the Kafka library.
- The default `GitCommit` is `unknown`.
- The default `DockerImageVersion` is `unknown`.
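A sketch of overriding the default group id and commit message mode explicitly, using the forms shown later in this document:

```fsharp
kafkaApplication {
    // ...
    useGroupId "my-consumer-group"            // instead of the default GroupId.Random
    useCommitMessage CommitMessage.Manually   // instead of the default CommitMessage.Automatically
}
```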
```fsharp
open Giraffe
open Alma.KafkaApplication
kafkaApplication {
addHttpHandler (
route "/my-new-route"
>=> warbler (fun _ -> text "OK")
)
}
```

In every Consume Handler, the first parameter you receive is the `ConsumeRuntimeParts`. It contains all the parts of the application you might need at runtime.

```fsharp
type ConsumeRuntimeParts<'OutputEvent, 'Dependencies> = {
LoggerFactory: ILoggerFactory
Box: Box
CurrentEnvironment: Alma.EnvironmentModel.Environment
ProcessedBy: Events.ProcessedBy
Environment: Map<string, string>
Connections: Connections
ConsumerConfigurations: Map<RuntimeConnectionName, ConsumerConfiguration>
ProduceTo: Map<RuntimeConnectionName, ProduceEvent<'OutputEvent>>
IncrementMetric: Metrics.MetricName -> SimpleDataSetKeys -> unit
IncrementMetricBy: Metrics.MetricName -> SimpleDataSetKeys -> Metrics.MetricValue -> unit
SetMetric: Metrics.MetricName -> SimpleDataSetKeys -> Metrics.MetricValue -> unit
EnableResource: ResourceAvailability -> unit
DisableResource: ResourceAvailability -> unit
Dependencies: 'Dependencies option
Cancellation: System.Threading.CancellationTokenSource
}
```

In every Custom Task, the first parameter you receive is the `CustomTaskRuntimeParts`. It contains all the parts of the application you might need at runtime.

```fsharp
type CustomTaskRuntimeParts = {
Logger: ApplicationLogger
Box: Box
Environment: Map<string, string>
IncrementMetric: MetricName -> SimpleDataSetKeys -> unit
IncrementMetricBy: MetricName -> SimpleDataSetKeys -> MetricValue -> unit
SetMetric: MetricName -> SimpleDataSetKeys -> MetricValue -> unit
EnableResource: ResourceAvailability -> unit
DisableResource: ResourceAvailability -> unit
ConsumerConfigurations: Map<RuntimeConnectionName, ConsumerConfiguration>
}
```
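For example, a custom task registered with `runCustomTask` might periodically expose a liveness metric. A minimal sketch - the `policy` parameter stands for whichever `TaskErrorPolicy` you choose, and the `heartbeat` metric name, the `MetricName` constructor, and the empty data-set keys are assumptions:

```fsharp
// A minimal sketch of a custom task using CustomTaskRuntimeParts.
let withHeartbeat (policy: TaskErrorPolicy) =
    kafkaApplication {
        // ...
        runCustomTask policy (fun task -> async {
            while true do
                // Bump a liveness metric once a minute.
                task.IncrementMetric (MetricName "heartbeat") []
                do! Async.Sleep 60_000
        })
    }
```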
The environment computation expression allows you to parse `.env` files and use other environment variables in your application workflow. It returns a `Configuration<'Event>`, so you can merge it into the Kafka Application (see the sketch after the following table).
| Function | Arguments | Description |
|---|---|---|
| check | variable name: string, checker: string -> 'a option | If the variable name is defined, it is forwarded to the checker, and it passes when Some is returned. |
| connect | connection configuration: EnvironmentConnectionConfiguration | It will register a default connection for Kafka. The Environment Connection Configuration looks the same as the Connection Configuration for Kafka, but it holds the variable names of the BrokerList and Topic. |
| connectManyToBroker | EnvironmentManyTopicsConnectionConfiguration | It will register named connections for Kafka. Each connection name will be the same as its topic name. |
| connectTo | connectionName: string, connection configuration: EnvironmentConnectionConfiguration | It will register a named connection for Kafka. |
| dockerImageVersion | variable name: string | It will parse the DockerImageVersion from the environment variable. |
| file | paths: string list | It will parse the first existing file and add its variables to the other defined environment variables. If no file is parsed, it will still read all other environment variables. |
| groupId | variable name: string | It will parse the GroupId from the environment variable. |
| ifSetDo | variable name: string, action: string -> unit | It will try to parse a variable, and if it is defined, the action is called with its value. |
| instance | variable name: string | It will parse the Instance from the environment variable. (The separator is -.) |
| currentEnvironment | variable name: string | It will parse the Environment alias from the environment variable. |
| require | variables: string list | It will check whether all required variables are already defined. |
| spot | variable name: string | It will parse the Spot from the environment variable. (The separator is -.) |
| supervision | connection configuration: EnvironmentConnectionConfiguration | It will register a supervision connection for Kafka. This connection will be used to produce supervision events (like instance_started). |
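Putting it together, an environment configuration might look like the following sketch, merged into the application with `merge`. The `environment` builder name and the variable names are assumptions based on the tables above:

```fsharp
// A minimal sketch, assuming the builder is named `environment` and that
// EnvironmentConnectionConfiguration holds the *variable names* of the
// broker list and topic, as described above.
let configuration =
    environment {
        file [ "./.env"; "./.dist.env" ]
        require [ "KAFKA_BROKER_LIST"; "KAFKA_INPUT_TOPIC" ]
        instance "INSTANCE"
        currentEnvironment "ENVIRONMENT"
        connect {
            BrokerList = "KAFKA_BROKER_LIST"
            Topic = "KAFKA_INPUT_TOPIC"
        }
    }

kafkaApplication {
    merge configuration
    // ...
}
```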
Connections are the most elemental part of this application framework, so there are simple ways for simple cases, and more complex ones for more complex applications where you need more connections.

It is also mandatory to define at least one connection.

The following example is the easiest setup you can get.
```fsharp
open Alma.Kafka
open Alma.ServiceIdentification
open Alma.KafkaApplication
[<EntryPoint>]
let main argv =
kafkaApplication {
useInstance { Domain = Domain "my"; Context = Context "simple"; Purpose = Purpose "example"; Version = Version "local" }
useCurrentEnvironment environment
useGit {
Commit = Some (GitCommit "abc123")
Branch = None
Repository = None
}
useDockerImageVersion (DockerImageVersion "42-2019")
connect {
BrokerList = BrokerList "127.0.0.1:9092"
Topic = StreamName "my-input-stream"
}
consume (fun _ events ->
events
|> Seq.iter (printfn "%A")
)
}
|> run
|> ApplicationShutdown.withStatusCode
```

The following example is the same easy setup, but it uses a logger factory to log internal events.
```fsharp
open Alma.Kafka
open Alma.ServiceIdentification
open Alma.KafkaApplication
open Feather.ErrorHandling
type Dependencies = {
ServiceApi: ServiceApi
}
and ServiceApi = ServiceApi of string
[<EntryPoint>]
let main argv =
let envFiles = [ "./.env"; "./.dist.env" ]
use loggerFactory =
envFiles
|> LoggerFactory.common {
LogTo = "LOG_TO"
Verbosity = "VERBOSITY"
LoggerTags = "LOGGER_TAGS"
EnableTraceProvider = true
}
|> Result.orFail
kafkaApplication {
useInstance { Domain = Domain "my"; Context = Context "simple"; Purpose = Purpose "example"; Version = Version "local" }
useCurrentEnvironment environment
useGit {
Commit = Some (GitCommit "abc123")
Branch = None
Repository = None
}
useDockerImageVersion (DockerImageVersion "42-2019")
useLoggerFactory loggerFactory
require [
"SERVICE_API"
]
initialize (fun app ->
{ app with Dependencies = Some { ServiceApi = app.Environment.["SERVICE_API"] |> ServiceApi }}
)
connect {
BrokerList = BrokerList "127.0.0.1:9092"
Topic = StreamName "my-input-stream"
}
consume (fun app ->
    let { ServiceApi = serviceApi } = app.Dependencies.Value

    fun events ->
        events
        |> Seq.iter (fun event ->
            let response = serviceApi |> ServiceApi.getSomething
            printfn "%A: %A" response event
        )
)
}
|> run
|> ApplicationShutdown.withStatusCodeAndLogResult loggerFactory
```

Unlike the previous example, where our application uses only one connection, there are cases where we need more than that.
```fsharp
open Alma.Kafka
open Alma.ServiceIdentification
open Alma.KafkaApplication
[<EntryPoint>]
let main argv =
let brokerList = BrokerList "127.0.0.1:9092"
kafkaApplication {
useInstance { Domain = Domain "my"; Context = Context "simple"; Purpose = Purpose "example"; Version = Version "local" }
useCurrentEnvironment environment
// this will create only the default connection, which can be consumed by the default `consume` function
connect {
BrokerList = brokerList
Topic = StreamName "my-input-stream"
}
// this will create a second connection - called "secondary" - which will be available in application parts under this key, and can be connected to
connectTo "secondary" {
BrokerList = brokerList
Topic = StreamName "my-secondary-stream"
}
// first we consume the first 10 events from the "secondary" stream
consumeFrom "secondary" (fun _ events ->
events
|> Seq.take 10
|> Seq.iter (printfn "%A")
)
// ONLY after the 10 events are consumed will this consumer become active - and it will remain active, since it waits indefinitely for the next events
consume (fun _ events ->
events
|> Seq.iter (printfn "%A")
)
}
|> run
|> ApplicationShutdown.withStatusCode
```

Notes:
- It does NOT matter in which order you create connections - they are just registered.
- It DOES matter in which order you consume events - consume handlers run in the order of registration, and the next one runs ONLY when the previous has ended. So make sure a handler is not infinite if you want to consume the other ones too.
- All connections are available in the `ConsumeRuntimeParts.Connections` of the consume handler function (see the sketch after this list).
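A minimal sketch of a consume handler touching several runtime parts; the `handled_events` metric name, the `Metrics.MetricName` constructor, and the empty data-set keys are assumptions:

```fsharp
kafkaApplication {
    // ...
    consume (fun app events ->
        // All registered connections are available by name.
        let secondary = app.Connections.["secondary"]
        printfn "Also registered: %A" secondary

        events
        |> Seq.iter (fun event ->
            printfn "%A" event
            // Count every handled event, so it shows up on the metrics route.
            app.IncrementMetric (Metrics.MetricName "handled_events") []
        )
    )
}
```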
Now we have even more connections and different relationships between them.

Keep in mind that this example is simplified and is missing the parsing logic (which is passed to the run function).
```fsharp
open Alma.Kafka
open Alma.ServiceIdentification
open Alma.KafkaApplication
[<EntryPoint>]
let main argv =
let brokerList = BrokerList "127.0.0.1:9092"
kafkaApplication {
useInstance { Domain = Domain "my"; Context = Context "simple"; Purpose = Purpose "example"; Version = Version "local" }
useCurrentEnvironment environment
connect { BrokerList = brokerList; Topic = StreamName "my-input-stream" } // default connection
connectTo "application" { BrokerList = brokerList; Topic = StreamName "my-application-stream" }
connectTo "output" { BrokerList = brokerList; Topic = StreamName "my-output-stream" }
// first we consume the last event from the "application" stream, then we consume the "output" stream up to that event (if there is no last event, the handler won't be called at all)
consumeLastFrom "application" (fun application lastEvent ->
let hasCorrelationId event =
event.CorrelationId = lastEvent.CorrelationId
Consumer.consume application.Connections.["output"] RawEvent.parse
|> Seq.map (fun event ->
printfn "%A" event // do something with the event (we just print it)
event
)
|> Seq.takeWhile (hasCorrelationId >> not) // this will consume until event with last correlation id occurs, then it will stop
|> Seq.iter ignore
)
// then we consume the default connection
consume (fun _ events ->
events
|> Seq.iter (printfn "%A")
)
}
|> run
|> ApplicationShutdown.withStatusCode
```

You can use a different group id for every connection, but you have to define it explicitly; otherwise `GroupId.Random` will be used.
If you just define `useGroupId "foo"`, it will be shared by all connections as the default.
You can define a specific group id for a connection with `useGroupIdFor "connection" "groupId-for-connection"`.

You can use a different commit message mode for every connection, but you have to define it explicitly; otherwise `CommitMessage.Automatically` will be used.
If you just define `useCommitMessage CommitMessage.Manually`, it will be shared by all connections as the default.
You can define a specific commit message mode for a connection with `useCommitMessageFor "connection" CommitMessage.Manually`.

NOTE: Even in the Manually mode, the Kafka application will commit the handled message for you - so there is nothing else to do than set the commit message mode.
- Increment the version in `KafkaApplication.fsproj`
- Update `CHANGELOG.md`
- Commit the new version and tag it

```sh
./build.sh build
./build.sh -t tests
```