Reactive DDD with Akka – lesson 3 (Projections)
Paweł Kaczor - 17 czerwca 2014
Let’s continue our adventure with Akka and DDD (previous lesson). So far we have been concentrated on write/command side of the system. We have covered topics like command processing, event storage, sharding etc. but haven’t touched the other side of the system: the query side. Our goal of this lesson is to implement necessary infrastructure supporting creation of components responsible for feeding query-side database: projections. For query side we will choose sql database although there are other technologies worth considering (document or graph databases might be a better choice depending on requirements).
Our first task is to check if Akka (akka-persistence in current, still experimental version to be precise) supports concept of projections. And as it turns out, Akka provides View
trait that defines an actor capable of receiving events from journal of particular processor (aggregate root – AR) instance. This sounds good but unfortunately the concept of View
is insufficient for building projections that typically listen to aggregated stream of events rather then events of particular AR instance. A workaround would be to register a view for a processor that receives events from all ARs of given type, but in such scenario, events would be persisted twice and reliable event delivery (more on that later) would had to be ensured between ARs and aggregating processor.
What we should require from „ideal” event store implementation is a dsl for event streams transformation and aggregation.
EventStore should be mentioned here as it provides impressive set of methods for handling streams of events: Event Store as a Read Model including stream aggregation (per category/AR type) . You can configure EventStore Journal for Akka Persistence and try experiment with projections (be prepared to write some javascript and json).
But what if we don’t want to depend on any particular journal provider? Well, composing streams of events would be possible if journals were exposed through some functional (composable) interface rather than View
. And this is one of the goals already established by akka team with introduction of akka-stream module. Akka-stream is implementation of recently announced standard for asynchronous, composable and non-blocking stream processing on JVM: http://www.reactive-streams.org that models stream publishers and subscribers (as defined by the specification) as akka actors. It also already provides simple dsl (PersistentFlow
) for creating event stream publishers that are backed by Views
. As you can see here event streams can be aggregated by merging stream publishers (although aggregation by category is not yet supported). Once the akka-streams is released we should definitely make use of it and see how to implement projections as reactive components (subscribers of event streams). For now we need to find a different solution. The only possibility left is to introduce third-party broker and forward events to topics or queues configured within the broker. As we will see in following section, integrating akka with external broker is much simpler that one could imagine. This approach will require enriching AR with Publisher
component (trait) responsible for sending persisted events to configured target actor. This is a perfect opportunity to learn more about message delivery in distributed systems.
Reliable event delivery
Sending events generated by AR to any other actor (being a broker gateway or aggregating processor (already mentioned before)) should be straightforward to implement, right? Yes, it is, if you accept that in case of node crash some messages might be lost (despite the fact that corresponding commands had been acknowledged to the client!). Akka does not guarantee that message will be delivered (this is so called at-most-once delivery semantics) because reliable delivery does not come for free in distributed systems. To obtain at-least-once delivery semantics (message is guaranteed to be delivered, but duplicates may occur) acknowledgment protocol must be used. Sender must store the message before sending and receiver must acknowledge the message once it is received. If acknowledgment is received, sender must mark previously stored message as confirmed. If acknowledgment is not received within configured time period or sender is restarted (for example as the result of crash) sender retries delivery. Because acknowledgment message might be lost as well, sender might resend message already delivered. Akka-persistence provides at-least-once delivery semantics in its core. Journal plugin api defines methods for storing confirmation entries for messaged that have been acknowledged. Acknowledgment and retry mechanism is encapsulated within Channel
component that must be used to send events from within processor (AR) to destination actor. Events must be wrapped in Persistent
envelop so that receiver can acknowledge the message by simply calling Persistent.acknowledge
method.
ReliablePublisher
Thanks to traits composability we can put all the behavior required by reliable AR (sender) to separate component: ReliablePublisher. It extends from abstract EventPublisher that in turns extends from abstract EventHandler. We mix in AggregateRoot
with EventHandler
making event handling in context of AR configurable as much as possible (useful for testing purposes). Please notice that it is possible to stop redelivery when configured number of retries is exceeded. For that purpose RedeliverFailureListener
has been registered on the channel to be notified if redelivery fails. The listener actor throws RedeliveryFailedException
exception that results in restart of parent actor (AR) (to make it work supervisorStrategy
of listener actor had to be adjusted). Inside preRestart
method (that RedeliveryPublisher
overrides) we can trigger compensation action and/or mark failing event as deleted (to prevent continuation of redelivery after AR is restarted).
To provide destination for ReliablePublisher
its abstract member target
must be defined on creation time (AggregateRootActorFactory
should take care of this as shown here: ProductPublishingSpec. Finally we should verify if ReliablePublisher
works as expected and is really reliable. You can find test here: ReliablePublisherSpec.
Message exchange over queue
Now we will configure infrastructure for events transmission over durable queue. We will use Apache Camel to arrange in-only message exchange between a component writing to the queue (producer) and projection component reading from the queue (consumer). Thanks to akka-camel both components (producer and consumer) can be represented as … (surprise! 😉 actors. EventMessageConfirmableProducer is the producer that receives event messages (notice that events are published inside (EventMessage envelope) coming from ReliablePublisher
(AR) and forwards them to the configured camel endpoint (for example jms queue or topic) (see transformOutgoingMessage
method). Once event message is accepted by the queue (and persisted if the queue is durable) producer acknowledges event reception to the publisher (ReliablePublisher
) (see routeResponse
method). Please notice that we choosed to unwrap EventMessage
from ConfirmablePersistent
envelope before putting it into the queue (so that consumer does not have to do unwrapping itself). ConfirmablePersistent
still needs to be attached to the EventMessage
so we convert it to meta attribute.
Finally we can implement projection as Consumer
actor, an actor that consumes messages from camel endpoint. Projection actor simply applies provided projection specification (ProjectionSpec) to received event and finalizes message exchange by either sending acknowledgment or failure. To prevent processing of duplicated events concrete implementation of ProjectionSpec
must return sequence number of last processed event for given aggregareId
(see currentVersion
method):
Before pulling all the pieces together we need to register concrete broker as component of Camel. We will use ActiveMQ. Configuration of the ActiveMQ component is straightforward (see ActiveMQMessaging trait) And of course we need a runner class (EmbeddedActiveMQRunner) that starts the broker as embedded service.
Implementing projection
Now we can implement concrete projection specification. Let’s model ProductCatalog service inside Sales module (context) that maintains list of products with price. Whenever new Product is added to inventory (within Inventory module/context) it should also be added to product catalog in Sales module. Thus we need projection that listens to InventoryQueue and updates product catalog. We will skip implementation details of ProductCatalog
as accessing relational db is rather not very interesting topic (we use Slick for that purpose). InventoryProjection just calls insert
method of ProductCatalog
providing Product
data copied from the ProductAdded
event and empty price.
An integration test is available here: EventsPublishingClusterSpec. From first node we send two AddProduct
commands. Commands are handled by Product AR within inventory context and generated events are published (reliably) to InventoryQueue
that is available from all nodes in the cluster. On the second node we register InvetoryProjection
and wait for 1 second (so that published events have time to reach the queue) and then send GetProduct
query message to ProductFinder to check if expected product has been added to product catalog.
And surprisingly test succeeds 😉 !
Last but not least, I added experimental feature to our application: If command sender wants to be notified once the view has been updated he can request special delivery receipt!
Please see ProductAcknowledgedPublicationSpec:
// when
import DeliveryContext.Adjust._
office[Product] ! AddProduct("product-1", "product 1", Standard).requestDLR[ViewUpdated]
// then
expectReply(Acknowledged)
expectReply(ViewUpdated(ProductAdded("product 1", Standard)))
As homework you can find out how the feature is implemented and let me know your opinion about the idea 🙂
http://pkaczor.blogspot.com/2014/06/reactive-ddd-with-akka-projections.html
Paweł Kaczor
Software Developer, IT enthusiast.
Founder of Akka-DDD framework for scalable systems in DDD/CQRS/ES architecture. Mad about functional programming. Loves swimming in his free time, amateur chess player.
@PavelKaczor
pkaczor.blogspot.com
github.com/pawelkaczor
Comments