
Resiliency in distributed messaging

A common unit of work in a distributed system involves at least two things:
1) Storing the result of the work in the DB
2) Notifying consumers about the changes

The problem is - they often can't be done in the same transaction:
Kafka, Redis, RabbitMQ, AlmostAnyCloud messaging system - none of them supports XA transactions. (And not because their authors are too lazy to implement them, but that deserves a separate post.)

Let's say we have a transaction scope opened (Tb), and we want to store some changes in the DB and dispatch an event:

Tb -> DB -> Message -> Tc 

Looks valid, right? If the DB changes fail, we roll back the DB transaction. If dispatching the event fails, we still roll back. Seems quite transactional, so where is the problem ¯\(°_o)/¯?

Problem 1: Leaked notifications on failure

The problem is with the last part: Tc, the transaction commit.
Why would it fail?
  • A network outage
  • Our DB transaction can time out while we are dispatching messages
  • SQL Server might run out of space and fail to write to the transaction log
  • Probably more, depending on the DB you are using
The evil of this failure is that it's quite hard to test, as you need to break the right thing at the right time - but once you get to a high enough load it will eventually hit you.
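To make the failure mode concrete, here is a minimal sketch of the naive Tb -> DB -> Message -> Tc flow with toy in-memory stand-ins (the `FlakyCommitDb` and `FakeBroker` classes are hypothetical, not a real driver or client):

```python
class FlakyCommitDb:
    """Toy DB whose commit can fail after the message is already sent."""
    def __init__(self, fail_commit=False):
        self.fail_commit = fail_commit
        self.committed = []
        self.pending = []

    def write(self, row):
        self.pending.append(row)

    def commit(self):
        if self.fail_commit:
            self.pending.clear()  # rollback
            raise RuntimeError("commit failed (network/timeout/out of space)")
        self.committed.extend(self.pending)
        self.pending.clear()

class FakeBroker:
    def __init__(self):
        self.published = []

    def publish(self, msg):
        self.published.append(msg)

def unit_of_work(db, broker):
    # Tb -> DB -> Message -> Tc
    db.write("order #42")
    broker.publish("order-created #42")  # consumers are notified here...
    db.commit()                          # ...but the commit may still fail

db, broker = FlakyCommitDb(fail_commit=True), FakeBroker()
try:
    unit_of_work(db, broker)
except RuntimeError:
    pass

print(broker.published)  # ['order-created #42']
print(db.committed)      # []
```

The commit failed and the DB rolled back, but the broker already delivered the event: a leaked notification about work that never happened.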

Problem 2: Notified before things have actually happened

That was the simple case. In real life, we might want to store a bulk of changes in one unit of work and send a notification for each change:

Tb -> DB1 -> M1 -> DB2 -> ... -> DB10 -> M10 -> Tc

This scenario is not just more complicated in terms of how many places can now break and leak messages; it also creates an extra problem: we are notifying consumers about events that haven't happened yet (the transaction hasn't committed).

So if you maintain a cache, you might receive a notification about the change, reset the cache, but then request and store stale data (the transaction hadn't committed at the time of the request). Or receive a notification about a new payment, try to fetch its details a couple of times, but give up and fail right before the transaction completes.
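The cache race can be shown as a deterministic timeline sketch (plain dicts stand in for the DB and the cache; all names here are made up for illustration):

```python
db = {"price": 100}    # committed state, as seen by readers
cache = {}

# Producer: writes a new value inside a still-open transaction,
# but dispatches the notification (M1) before the commit (Tc).
pending_write = ("price", 120)     # not committed yet
notifications = ["price-changed"]  # already visible to consumers

# Consumer: reacts to the notification before the commit lands.
for _ in notifications:
    cache.pop("price", None)       # invalidate
    cache["price"] = db["price"]   # re-read: still the OLD value

# Producer finally commits.
db[pending_write[0]] = pending_write[1]

print(cache["price"])  # 100 -- stale value is now cached
print(db["price"])     # 120
```

The consumer did everything "right" - invalidate, then re-read - and still ended up caching stale data, because the notification raced ahead of the commit.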

Problem 3: Duplicated messages

The other big part of the problem: we will probably retry the failed operation, and that will result in sending the messages again.

Let's say that the consumer of the event wants to send some notifications as well, so we have:
  • Get message (M_get)
  • Do some job (DB)
  • Dispatch new event message (M)
  • Acknowledge received message (M_ack)
We can wrap the processing part in a transaction:

M_get -> [Tb -> DB -> M-> Tc] -> M_ack

Any issues (besides the old one with the leaked message)?
It's the last step again: M_ack. Everything can succeed, but we might fail to acknowledge the message; it will go back to the queue and we will have to process it again and send the events one more time.
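Here is a sketch of how at-least-once redelivery produces the duplicate, assuming a toy in-memory queue that re-queues the message when the ack is lost (all names hypothetical):

```python
class ToyQueue:
    """In-memory queue with at-least-once semantics."""
    def __init__(self, msgs):
        self.msgs = list(msgs)
        self.in_flight = None

    def get(self):
        self.in_flight = self.msgs.pop(0)
        return self.in_flight

    def ack(self, fail=False):
        if fail:
            # Ack lost: the broker re-queues the in-flight message.
            self.msgs.insert(0, self.in_flight)
            raise RuntimeError("ack failed")
        self.in_flight = None

published = []
q = ToyQueue(["payment-received"])

def process(fail_ack):
    msg = q.get()                            # M_get
    published.append(f"receipt-for:{msg}")   # [Tb -> DB -> M -> Tc] succeeds
    q.ack(fail=fail_ack)                     # M_ack

try:
    process(fail_ack=True)   # work done, event sent, but the ack is lost
except RuntimeError:
    pass
process(fail_ack=False)      # redelivery: everything runs again

print(published)
# ['receipt-for:payment-received', 'receipt-for:payment-received']
```

The processing itself was perfectly transactional, yet the downstream event was still published twice.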

We can try to store the received message's hash to detect duplicates, but once the DB rolls back, we'll lose that data.

At this point, people usually try to juggle the order to find the magic combination. But the hard truth is: it doesn't exist. One thing I've seen too many times in the hunt for that combination is people putting M_ack inside the transaction scope:

M_get -> [Tb -> DB -> M-> M_ack -> Tc]

Remember what we started with? The transaction commit CAN fail. And if the commit fails, we lose EVERYTHING: the incoming message and the DB changes. That's the worst possible outcome: you notified consumers about something that will never happen and then lost all traces of the work.  (。_。)
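A sketch of that anti-pattern with toy objects (nothing here is a real client API): the ack and the business write both sit "inside" the transaction, so a failing commit wipes out both.

```python
class ToyQueue:
    def __init__(self, msgs):
        self.msgs = list(msgs)

    def get(self):
        return self.msgs.pop(0)

    def ack(self, msg):
        pass  # the ack itself succeeds; the failure comes later, at commit

published, committed = [], []
q = ToyQueue(["payment-received"])

msg = q.get()                          # M_get
pending = []
try:
    pending.append("payment row")      # DB (uncommitted)
    published.append("receipt-sent")   # M -- already visible to consumers
    q.ack(msg)                         # M_ack inside the "transaction"
    raise RuntimeError("Tc failed")    # the commit fails
    committed.extend(pending)          # never reached
except RuntimeError:
    pending.clear()                    # rollback

print(committed)  # [] -- the DB work is gone
print(q.msgs)     # [] -- the incoming message is gone too
print(published)  # ['receipt-sent'] -- but consumers were notified
```

Note that the DB rollback cannot "un-ack" the message: the queue is a separate system, so the message is lost the moment the ack goes out.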

So what do we have? 
  •  We can send notifications about things that haven't happened yet
  •  We can send notifications about things that never happened
  •  We can send duplicated notifications

What's the solution?

In general, you have two options for dealing with this:
  1. Event sourcing
  2. Outbox pattern (with polling or transaction log tailing)
Each has its own variations, and I'll try to describe them in the next posts.
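As a teaser, the core idea of the outbox pattern fits in a few lines: the business change and the outgoing event are written in the same DB transaction, and a separate relay publishes the committed outbox rows afterwards. A minimal sketch with toy in-memory objects (not a real implementation; a real relay would mark rows as published and retry instead of deleting):

```python
class ToyDb:
    """In-memory DB with a business table and an extra 'outbox' table."""
    def __init__(self):
        self.rows, self.outbox = [], []
        self._pending = []

    def write(self, table, row):
        self._pending.append((table, row))

    def commit(self):
        for table, row in self._pending:
            getattr(self, table).append(row)
        self._pending.clear()

class ToyBroker:
    def __init__(self):
        self.published = []

    def publish(self, msg):
        self.published.append(msg)

def unit_of_work(db):
    # The business change and its event are stored in ONE transaction:
    db.write("rows", "order #42")
    db.write("outbox", "order-created #42")
    db.commit()  # either both land, or neither does

def relay(db, broker):
    # Separate poller: runs strictly after commit, safe to retry.
    while db.outbox:
        broker.publish(db.outbox.pop(0))

db, broker = ToyDb(), ToyBroker()
unit_of_work(db)
relay(db, broker)

print(broker.published)  # ['order-created #42']
```

Because the event only exists in the outbox if the transaction committed, we can no longer notify anyone about work that never happened; the remaining duplicates from relay retries are handled by idempotent consumers.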
