Contracts for Event Sourced Systems with FsCodec
FsCodec is an opinionated library for working with contracts in event sourced systems in F#. At its core, it’s a set of custom JSON serializers for two backends Newtonsoft.Json (NSJ) or System.Text.Json (STJ).
Normally a higher level system like Propulsion or Equinox will handle this for you - but if you just want contract encoding/decoding, we can directly use FsCodec
which is plenty high-level. This post details using it in your projects if you don’t want to take on additional dependencies.
Why
Why not just use NSJ or STJ instead? For starters, it doesn’t understand F# core types like options, lists, maps et al. If you chose to add custom serializers for those types and made some reasonable choices to get a consistent, version-able output, you’d likely end up with FsCodec.
Structuring events
This is where the opinionated part is - if you want to structure your events in a resilient fashion, one of the more maintenance-amenable ways is to structure it as a union contract. Concisely:
- a discriminated union for each event stream
- each event in the stream is a union case
- all details of an event is contained in a single record type
- the union is marked with a
IUnionContract
interface
Consider this toy domain model:
module ShoppingCart =
type CartAddItem = {
sku : string
quantity: int
}
type CartRemoveItem = {
sku : string
}
type CartUpdateQuantity = {
sku : string
newQuantity : int
}
type CartClear = {
automatic : bool
}
type CartCheckout = {
oneclick : bool
}
[<RequireQualifiedAccess>]
type ShoppingCartEvents =
| CartAddItem of CartAddItem
| CartRemoveItem of CartRemoveItem
| CartUpdateQuantity of CartUpdateQuantity
| CartClear of CartClear
| CartCheckout of CartCheckout
interface IUnionContract
Each event is a union case with detail contained in a record type.
Encoding/Decoding
With an event DU, you can create a codec to encode/decode it.
let codec = Codec.Create<ShoppingCart.ShoppingCartEvents>()
If you add more fields in the record in the future, existing applications will ignore those fields. If you add more events in the future, they won’t be decoded by the codec and just be ignored.
Now that we have a codec, we can decode events with TryDecode
and encoding events with Encode
.
FsCodec uses two core types as its event abstraction:
IEventData
represents a single raw event (and optional metadata) which hasn’t been persisted yetITimelineEvent
inherits fromIEventData
and represents a persisted single event (with optional metadata)
To encode, there’s codec.Encode
- which gets you an IEventData
.
let event = ShoppingCartEvents.CartAddItem { sku = "ABC10"; quantity = 1 }
let eventData = codec.Encode(None, event)
eventData.Data // {"sku":"ABC10","quantity":1}
eventData.EventType // CartAddItem
The event data is serialized separately from the union case. In datastores, you save the event type (union case) out-of-band from the event data. In EventStore, there’s EventType
and with Kafka, you generally have the event type as part of the headers. If you however need it in-band, you can use FsCodec’s Serdes
.
Serdes.Serialize event // {"Case":"CartAddItem","Fields":[{"sku":"ABC10","quantity":1}]}
To decode, FsCodec expects a ITimelineEvent
. For example, to decode an event from EventStoreDB, we can take the ResolvedEvent
type from the EventStore client and create a ITimelineEvent
:
let toTimelineEvent (resolvedEvent: ResolvedEvent) =
let evt = resolvedEvent.Event
TimelineEvent.Create(
index = evt.EventNumber,
eventType = evt.EventType,
data = evt.Data,
meta = evt.Metadata,
eventId = evt.EventId,
timestamp = DateTimeOffset evt.Created)
and
resolvedEvent |> toTimelineEvent |> codec.TryDecode // ShoppingCartEvents option
It’s up to your implementation to decide whether you ignore un-decodable events or do something else like log them.
Differing Contracts
While these are great for schemas we fully own, with external systems we may not be so lucky. Sometimes, the message contract may be completely different from the inputs to our ACL.
In a case where a single field in the message serves as the discriminator tag, we can use an UnionConverter
. It flattens the event data and label (this is customizable) into a single JSON object.
Serialize (Greet { message = "hi"}) // {"case":"Greet","message":"hi"}
However, when the message contract diverges too far from our internal representation (a level usually behind the domain model), we need an escape hatch.
That escape hatch presents itself in the form of JsonIsomorphism
.
An Isomorphism is defined as a reversible mapping between two types, f : T -> U
and g : U -> T
such that f.g
and g.f
is equal to id
.
type JsonIsomorphism<'T,'U> =
...
abstract Pickle: 'T -> 'U
abstract UnPickle: 'U -> 'T
Pickle
is converting to the contract type and Unpickle
is converting from the target type.
For example, consider the type:
type ConveyorAssociation =
| ConveyorId of string
| NoAssociation
which is used in a contract type
{ ...
association : ConveyorAssociation option }
but the actual JSON message contains one of:
"association" : "NONE"
which maps toSome ConveyorAssociation.NoAssociation
"association" : "ABC123"
which maps toSome (ConveyorAssociation.ConveyorId "ABC123")
"association" : null
which maps toNone
This type of contract is fairly arbitrary and messy, using the string "None"
as a sentinel value.
The solution is to introduce an isomorphism between ConveyorAssociation option
<=> string option
:
type ConveyorConverter() =
inherit JsonIsomorphism<ConveyorAssociation option, string option>()
override _.Pickle v =
v |> Option.map(function
| NoAssociation -> "NONE"
| ConveyorId id -> id)
override _.UnPickle s =
s |> Option.map(function
| "NONE" -> NoAssociation
| id -> ConveyorId id)
and annotating the record field with:
{ ...
[<JsonConverter(typeof<ConveyorConverter>)>]
association : ConveyorAssociation option }
which now works like we expect it to.
Enumerations
If a particular value is best represented by an enumeration, it’s better to use a DU instead of an .NET Enum. Enums in either serialization library can take more than the defined range of values.
However, a DU will be serialized with a case label. If the DU will never have any attached fields,
we can use a TypeSafeEnumConverter
.
[<JsonConverter(typeof<TypeSafeEnumConverter>)>]
type FileSource =
| HTTPS
| HTTP
| FTP
| SFTP
type FileUploaded = {
filename: string
size: int64
source: FileSource
}
let event =
{ filename = "test.xls"; size = 10000L; source = SFTP }
Serdes.Serialize event // {"filename":"test.xls","size":10000,"source":"SFTP"}
Deserializng parses it out to a DU case, however missing DU cases will thrown an exception, so it’s difficult to version. For more flexibility, we can use an isomorphism.
type TypeSafeEnumConverterWithFallback<'T>(fallback : string) =
inherit JsonIsomorphism<'T, string>()
let fallback = TypeSafeEnum.parse fallback
override _.Pickle v = TypeSafeEnum.toString v
override _.UnPickle s = s |> TypeSafeEnum.tryParse |> Option.defaultValue fallback
and
...
[<JsonConverter(typeof<TypeSafeEnumConverter>, (nameof HTTP))>]
source: FileSource
This gives us much more flexibility to add cases in the future and work safely with existing applications.
Further reading
Versioning events is a non-trivial problem, and is not under the scope of this post.
- A Contract Pattern for Schemaless DataStores by Eirik Tsarpalis.
- The FsCodec Readme
- Serialization strategy for Event Sourcing by Software Mill
Last modified on 2021-12-18
Comments Disabled.