As I've mentioned in previous posts about Akka.NET, everything an actor knows is stored in memory. That lets us skip database and IO bottlenecks, because memory is generally very fast, but we must remember that memory is not a place where data survives for long: a restart or a crash wipes it. So let's talk about persisting it.
The simplest approach is to manually write the data to some kind of storage every time the actor's state changes, and to fetch it during actor creation if anything was stored. It's simple and it does the job, but saving the entire state on every update can be quite a burden for our storage, especially when we deal with hundreds of thousands or millions of messages in a relatively short period of time, which is not unheard of with the actor model.
So we need to persist something smaller. Any idea what? If you thought about messages, you were really close. We could do that, of course, but messages contain some data that we don't need to persist. In my case that's, for example, UserName, which is only used to route a message to the proper user's actor, so we don't need to store it.
That's why we should introduce a new type of object in our actor model: an event. In other words, something that happened, e.g. content was added, a user subscribed to a tag, and so on. In the previous post I wrote about commands and queries. Events are follow-ups of commands: while a command tells an actor to do something and carries the data necessary for that, an event represents a change of our actor's state.
For example, I have a TagContentCommand class, which represents that content (what) was tagged (with what) by a user (by whom).
    using System;
    using System.Collections.Generic;

    // What was tagged (Uri), with what (ContentTags) and by whom (UserName).
    public class TagContentCommand : CommandBase, IHaveUserName, IHaveContentUri, IHaveContentTags
    {
        public Uri Uri { get; private set; }
        public IEnumerable<string> ContentTags { get; private set; }
        public string UserName { get; private set; }

        public TagContentCommand(Uri uri, string userName, IEnumerable<string> contentTags = null) : base()
        {
            Uri = uri;
            UserName = userName;
            ContentTags = contentTags;
        }
    }
This command will be sent to the content, tag and user actors. Each of them needs a different piece of information, because from the perspective of each actor a different thing happened.
The user actor has seen that IT tagged ITS content with some tags. It doesn't need to store information about who it is; it knows that at any time and it's not going to change. That's why we persist just this event:
    using System;
    using System.Collections.Generic;
    using System.Linq;

    // The user actor's view of the command: no UserName, the actor knows who it is.
    public class ContentTaggedByUserEvent
    {
        public Uri ContentUri { get; private set; }
        public string[] ContentTags { get; private set; }

        public ContentTaggedByUserEvent(Uri uri, IEnumerable<string> contentTags = null)
        {
            ContentUri = uri;
            ContentTags = contentTags?.ToArray();
        }
    }
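To make the command-to-event step concrete, here is a minimal sketch of how the user actor could derive its event from the command. The classes are simplified stand-ins for the ones above (CommandBase and the interfaces are left out so the snippet compiles on its own), and the UserEventMapper helper is hypothetical, it's not part of my code base.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified stand-in for the command (base class and interfaces omitted).
public class TagContentCommand
{
    public Uri Uri { get; }
    public string UserName { get; }
    public IEnumerable<string> ContentTags { get; }

    public TagContentCommand(Uri uri, string userName, IEnumerable<string> contentTags = null)
    {
        Uri = uri;
        UserName = userName;
        ContentTags = contentTags;
    }
}

public class ContentTaggedByUserEvent
{
    public Uri ContentUri { get; }
    public string[] ContentTags { get; }

    public ContentTaggedByUserEvent(Uri uri, IEnumerable<string> contentTags = null)
    {
        ContentUri = uri;
        ContentTags = contentTags?.ToArray();
    }
}

// Hypothetical helper, only here to illustrate the mapping.
public static class UserEventMapper
{
    // Drops UserName: the user actor already knows who it is.
    public static ContentTaggedByUserEvent ToUserEvent(TagContentCommand cmd)
    {
        return new ContentTaggedByUserEvent(cmd.Uri, cmd.ContentTags);
    }
}
```

The content and tag actors would build their own, different events from the same command in the same way.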
Akka.Persistence
Before we move further, let's talk about a great library called Akka.Persistence. Its role is to persist events and snapshots (we'll talk about them in a few days) in some kind of storage. There are many persistence providers available, e.g. MS SQL, MongoDB, DocumentDB, Azure Tables and others.
I'm using MS SQL for now because it's the easiest to maintain during development, but I'll probably move to something non-relational, because storing events doesn't require any relations. The db schema is just a flat table with a few columns: persistence-id (the actor's unique id), sequence number (first, second, third event persisted and so on), a few other columns and finally the payload, which is the serialized event.
Because of its flat structure, I want to try some kind of table storage, like Azure Table Storage or the table storage part of Azure Cosmos DB, which was introduced yesterday at the Build conference. I think performance will be great.
Once we have the Akka.Persistence library and a provider library installed, we need to configure them using HOCON (the configuration notation Akka.NET uses, which can be embedded e.g. in the web.config file; I haven't covered it yet, but I'll do it soon, so I won't explain it here). The MS SQL configuration in web.config looks like this:
    <akka>
      <hocon>
        <![CDATA[
          akka {
            persistence {
              journal {
                plugin = "akka.persistence.journal.sql-server"
                sql-server {
                  class = "Akka.Persistence.SqlServer.Journal.SqlServerJournal, Akka.Persistence.SqlServer"
                  plugin-dispatcher = "akka.actor.default-dispatcher"

                  # connection string used for database access
                  connection-string-name = "Me20DbConnectionString"
                  # can alternatively specify: connection-string

                  # default SQL timeout
                  connection-timeout = 60s

                  # SQL server schema name
                  schema-name = dbo

                  # persistent journal table name
                  table-name = EventJournal

                  # initialize journal table automatically
                  auto-initialize = on

                  timestamp-provider = "Akka.Persistence.Sql.Common.Journal.DefaultTimestampProvider, Akka.Persistence.Sql.Common"
                  metadata-table-name = Metadata
                }
              }

              snapshot-store {
                plugin = "akka.persistence.snapshot-store.sql-server"
                sql-server {
                  class = "Akka.Persistence.SqlServer.Snapshot.SqlServerSnapshotStore, Akka.Persistence.SqlServer"
                  plugin-dispatcher = "akka.actor.default-dispatcher"
                  table-name = SnapshotStore
                  schema-name = dbo
                  auto-initialize = on
                  connection-string-name = "Me20DbConnectionString"
                }
              }
            }
          }
        ]]>
      </hocon>
    </akka>
As you can see, the snapshot store, which stores our snapshots (more details about them in the next post), and the event journal (our events) are two separate storages. That means we can store our events e.g. in the table storage I've mentioned, and our snapshots (the entire actor's state) in e.g. DocumentDB.
Code changes
Introducing Akka.Persistence requires one thing from our actors: they have to derive from the ReceivePersistentActor (or UntypedPersistentActor) class and override the PersistenceId string property, which uniquely identifies the actor in the storage.
The main change is that the Receive method used to register handlers for incoming messages is replaced by the Command method. We've also got brand new Persist and Recover methods. So in my user actor, I'm registering handlers like this:
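As a rough illustration, a bare persistent actor could look like the sketch below. It assumes the Akka.Persistence NuGet package is referenced; the UserActor name and the "user-" prefix of the id are just assumptions for this example, the only hard requirement is that the id is unique and stays the same across restarts.

```csharp
using Akka.Persistence;

public class UserActor : ReceivePersistentActor
{
    // The journal uses this id to find the actor's events, so it has to be
    // unique per actor and stable across restarts.
    // The "user-" prefix is an assumption made for this sketch.
    public override string PersistenceId { get; }

    public UserActor(string userName)
    {
        PersistenceId = "user-" + userName;

        // Command<T>(...) and Recover<T>(...) handlers are registered here.
    }
}
```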
    private void RegisterCommands()
    {
        //...
        //Tag subscription
        Command<SubscribeToTagCommand>(cmd => HandleAddSubscribedTagCommand(cmd));
        Recover<TagSubscribedEvent>(ev => HandleTagSubscribedEvent(ev));
    }

    private void HandleAddSubscribedTagCommand(SubscribeToTagCommand cmd)
    {
        var @event = new TagSubscribedEvent(cmd.TagName);
        if (HandleTagSubscribedEvent(@event))
            Persist(@event, ev => HandleSnapshoting(ActorState));
    }

    private bool HandleTagSubscribedEvent(TagSubscribedEvent ev)
    {
        return ActorState.AddSubscribedTag(ev.TagName);
    }
All the magic happens in the HandleAddSubscribedTagCommand method. Before everything else, I'm creating an event from the data passed in my command. Normally I would invoke the Persist method right away: its first parameter is the event to persist, and its second is a lambda that receives the event we've just persisted and is executed only after persisting succeeded. That's where the event would normally be handled.
In my case I'm storing subscribed tags in a HashSet, which means I only want to store unique values. I really don't need to persist an event that doesn't change anything in my actor's state, so if HandleTagSubscribedEvent returns false, I just don't persist anything. After all, nothing changed.
Take note that if I do things this way and something goes wrong while persisting the event, the state has already been changed in memory but the event will not be replayed during recovery, so the actor's state could be different after restoring it. In my case that's acceptable. If it isn't in your app, you should handle the event inside the Persist callback, so that the state changes only after the event has been persisted.
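If you need the stricter, persist-first ordering, a minimal sketch could look like this. It assumes the Akka.Persistence NuGet package; the command and event classes are simplified stand-ins, and the _subscribedTags field, the Apply method and the "user-" id prefix are names made up for this example rather than my actual actor code.

```csharp
using System.Collections.Generic;
using Akka.Persistence;

public class SubscribeToTagCommand
{
    public string TagName { get; }
    public SubscribeToTagCommand(string tagName) { TagName = tagName; }
}

public class TagSubscribedEvent
{
    public string TagName { get; }
    public TagSubscribedEvent(string tagName) { TagName = tagName; }
}

public class UserActor : ReceivePersistentActor
{
    // Illustrative in-memory state: the set of tags the user subscribed to.
    private readonly HashSet<string> _subscribedTags = new HashSet<string>();

    public override string PersistenceId { get; }

    public UserActor(string userName)
    {
        PersistenceId = "user-" + userName;

        Command<SubscribeToTagCommand>(cmd =>
        {
            // Read-only check: skip events that would not change anything.
            if (_subscribedTags.Contains(cmd.TagName)) return;

            // Persist first. The callback runs only after the journal has
            // stored the event, so the state never gets ahead of the storage.
            Persist(new TagSubscribedEvent(cmd.TagName), ev => Apply(ev));
        });

        // The same Apply method rebuilds the state during recovery.
        Recover<TagSubscribedEvent>(ev => Apply(ev));
    }

    private void Apply(TagSubscribedEvent ev)
    {
        _subscribedTags.Add(ev.TagName);
    }
}
```

The trade-off compared to my version is that the uniqueness check and the state change are now two separate steps, but the in-memory state can no longer contain a change that was never written to the journal.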
Restoring events…
Imagine some kind of application crash, a shutdown or just a power failure of some kind. Our entire app goes down and we lose everything stored in memory. But don't worry, it'll restart in a moment and our actors will be restored. How?
During actor creation, the underlying storage is checked for stored events. If there are any, they are fetched in order and the handler registered with the Recover method is invoked for each of them. And because I've used the same handler for exactly the same event, my actor's state will be exactly the same: the actor just replayed every single change since it was first created.
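The idea of replaying can be shown without Akka at all. The sketch below is only a simulation: an in-memory list stands in for the journal, and the UserState class with its Apply and Replay methods is hypothetical. What it shows is that feeding the stored events, in order, through the same handler used for live updates rebuilds the same state.

```csharp
using System;
using System.Collections.Generic;

public class TagSubscribedEvent
{
    public string TagName { get; }
    public TagSubscribedEvent(string tagName) { TagName = tagName; }
}

// Hypothetical state class, used only for this simulation.
public class UserState
{
    private readonly HashSet<string> _subscribedTags = new HashSet<string>();

    public int Count => _subscribedTags.Count;

    public bool IsSubscribed(string tagName) => _subscribedTags.Contains(tagName);

    // The same method serves live updates and recovery.
    // Returns false when the tag was already present.
    public bool Apply(TagSubscribedEvent ev) => _subscribedTags.Add(ev.TagName);

    // Rebuilds the state by applying the stored events in sequence order.
    public static UserState Replay(IEnumerable<TagSubscribedEvent> journal)
    {
        var state = new UserState();
        foreach (var ev in journal)
        {
            state.Apply(ev);
        }
        return state;
    }
}
```

Calling UserState.Replay with the events "akka", "dotnet" and "akka" again gives a state with two subscribed tags, exactly what the live actor had before it went down.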
…and snapshots
But what if we stored tens of thousands of events for a single actor? Should we restore them all one by one? That would be a little stupid and time-consuming. That's why from time to time (e.g. every 100 or 200 persisted events) we should persist a snapshot of the entire actor's state.
Then, if we need to restore an actor to sequence number 11205 and we've stored a snapshot every 100 persisted events, we just need to restore the latest snapshot (for sequence number 11200) and replay the 5 newest events, which is much faster than replaying more than eleven thousand individual events.
I'll cover storing snapshots in the next post in a few days, and some advanced persisting scenarios in another one. See you then!
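The arithmetic behind that example is simple enough to write down. This is only a sketch of the calculation, assuming a snapshot is saved exactly every N events; the RecoveryPlan class and its method names are made up for illustration and have nothing to do with the Akka.Persistence API.

```csharp
using System;

// Hypothetical helper illustrating the snapshot arithmetic.
public static class RecoveryPlan
{
    // Sequence number of the newest snapshot at or below the target,
    // assuming one snapshot is saved every `interval` events.
    public static long LatestSnapshotSeqNr(long targetSeqNr, long interval)
    {
        if (targetSeqNr < 0) throw new ArgumentOutOfRangeException(nameof(targetSeqNr));
        if (interval <= 0) throw new ArgumentOutOfRangeException(nameof(interval));
        return targetSeqNr - (targetSeqNr % interval);
    }

    // Number of events that still have to be replayed on top of that snapshot.
    public static long EventsToReplay(long targetSeqNr, long interval)
    {
        return targetSeqNr - LatestSnapshotSeqNr(targetSeqNr, interval);
    }
}
```

With the numbers from above, RecoveryPlan.LatestSnapshotSeqNr(11205, 100) gives 11200 and RecoveryPlan.EventsToReplay(11205, 100) gives 5.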