styx

A simple event sourcing library written in Scala.

View on GitHub

styx

A simple event sourcing library written in Scala.

What is Event Sourcing?

Event sourcing (E.S.) is the idea of persisting the immutable events that happen to a domain object instead of persisting its current state:

Although you can still save the latest state to a database and read it back, you can also easily replay the events of the stream to rebuild the state of your domain object.

Example

E.S. comes with a few common problems, such as handling concurrency and avoiding side effects while replaying your events. Styx helps you build your application with E.S. while avoiding these problems. ;)

Overview

Example

The following example is based on: https://ookami86.github.io/event-sourcing-in-practice/

The state

// Aggregate state for the bank-account example. Fields such as balance, owner and status
// are not declared here; the events below assign them through dynamic member access.
// NOTE(review): the other snippets construct this as BankAccount(revision, aggregationId) and
// read state.lastEventVersion — this one-argument signature looks out of date; confirm.
case class BankAccount(aggregationId: AggregationId) extends DynamicData with State

The event handler

// Holds the implicit event handler/fetcher used for BankAccount events.
// NOTE(review): the object name is spelled "EventHander"; renaming it would break importers.
object BankAccountEventHander {
  // You can always swap the event store implementation, for instance:
  // implicit val eventHandler: EventHandler[BankAccount] = new CassandraEventHandlerFetcher[BankAccount]
  implicit val eventHandler: EventHandler[BankAccount] with EventFetcher[BankAccount] = MongoDBEventHandlerFetcher(MongoD.collection, mapper, converter)
}

The events

/**
 * Event that opens a bank account.
 *
 * The `id` and `owner` read from this event are dynamic fields; in this example they are
 * assigned by `CreateAccountCommand` before the event is returned.
 */
case class BankAccountCreated(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /** Builds the initial state: balance 0, status "ACTIVE", plus the id and owner carried by this event. */
  def applyTo(account: BankAccount): BankAccount = {
    val created = BankAccount(revision, account.aggregationId)
    created.balance = 0
    created.id = this.id
    created.status = "ACTIVE"
    created.owner = this.owner
    created
  }

  /**
   * Valid only while the state has no status yet.
   *
   * The return type is written as `Event.Valid` so it matches the `canApply`
   * overrides of the other events.
   */
  override def canApply(state: BankAccount): Event.Valid =
    validation(state.status == null, "this account is already created")
}
/** Event recording a deposit; the deposited `amount` is a dynamic field set on the event. */
case class DepositPerformed(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /**
   * Creates a new state at this event's revision, runs `copyTo` from the given account
   * into it, and sets its balance to the old balance plus `amount`.
   */
  def applyTo(account: BankAccount): BankAccount = {
    val credited = BankAccount(revision, account.aggregationId)
    account.copyTo(credited)
    credited.balance = account.balance[Int] + this.amount[Int]
    credited
  }
}
/** Event recording a change of owner; the `newOwner` is a dynamic field set on the event. */
case class OwnerChanged(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /**
   * Creates a new state at this event's revision, runs `copyTo` from the given account
   * into it, and replaces its owner with `newOwner`.
   */
  def applyTo(account: BankAccount): BankAccount = {
    val reassigned = BankAccount(revision, account.aggregationId)
    account.copyTo(reassigned)
    reassigned.owner = this.newOwner
    reassigned
  }
}
/** Event recording a withdrawal; the withdrawn `amount` is a dynamic field set on the event. */
case class WithdrawalPerformed(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /**
   * Creates a new state at this event's revision, runs `copyTo` from the given account
   * into it, and sets its balance to the old balance minus `amount`.
   */
  def applyTo(account: BankAccount): BankAccount = {
    val debited = BankAccount(revision, account.aggregationId)
    account.copyTo(debited)
    debited.balance = account.balance[Int] - this.amount[Int]
    debited
  }

  /** Valid only when subtracting `amount` from the balance leaves a result of zero or more. */
  override def canApply(state: BankAccount): Event.Valid =
    validation(
      (state.balance[Int] - this.amount[Int]) >= 0,
      s"the account cannot have a balance lower than zero. current balance: ${state.balance[Int]}, withdrawal amount: ${this.amount[Int]}"
    )
}
/** Event that closes an account; the `closeReason` is a dynamic field set on the event. */
case class BankAccountClosed(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /**
   * Creates a new state at this event's revision, runs `copyTo` from the given account
   * into it, records the close reason and sets the status to "CLOSED".
   */
  def applyTo(account: BankAccount): BankAccount = {
    val closed = BankAccount(revision, account.aggregationId)
    account.copyTo(closed)
    closed.closeReason = this.closeReason
    closed.status = "CLOSED"
    closed
  }

  /**
   * Valid only when the current status is not "CLOSED".
   *
   * NOTE(review): `state.status.equals(...)` may throw if the status is null
   * (e.g. an account that was never created) — confirm whether that can happen.
   */
  override def canApply(state: BankAccount): Event.Valid =
    validation(!state.status.equals("CLOSED"), "this account is already closed")
}

The commands

/**
 * Command that opens an account: it produces a `BankAccountCreated` event carrying the
 * request's `id` and `owner`.
 */
class CreateAccountCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the event asynchronously, at the revision following the state's last event version. */
  override def event: EventProduce = request => state =>
    Future {
      val created = BankAccountCreated(state.lastEventVersion + 1)
      created.id = request.id
      created.owner = request.owner
      created
    }

  /**
   * Nothing extra to execute for this command; completes immediately.
   *
   * The unit value is passed explicitly: `Future.successful()` only compiled because the
   * compiler adapted the empty argument list by inserting `()`, which is deprecated.
   */
  override def execute: ExecutionProduce = _ => _ => Future.successful(())
}
/** Command that deposits money: it produces a `DepositPerformed` event carrying the request's `amount`. */
class DepositCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the event asynchronously, at the revision following the state's last event version. */
  override def event: EventProduce = request => state =>
    Future {
      val deposited = DepositPerformed(state.lastEventVersion + 1)
      deposited.amount = request.amount
      deposited
    }

  /**
   * Nothing extra to execute for this command; completes immediately.
   *
   * The unit value is passed explicitly: `Future.successful()` only compiled because the
   * compiler adapted the empty argument list by inserting `()`, which is deprecated.
   */
  override def execute: ExecutionProduce = _ => _ => Future.successful(())
}
/** Command that changes the owner: it produces an `OwnerChanged` event carrying the request's `newOwner`. */
class ChangeOwnerCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the event asynchronously, at the revision following the state's last event version. */
  override def event: EventProduce = request => state =>
    Future {
      val changed = OwnerChanged(state.lastEventVersion + 1)
      changed.newOwner = request.newOwner
      changed
    }

  /**
   * Nothing extra to execute for this command; completes immediately.
   *
   * The unit value is passed explicitly: `Future.successful()` only compiled because the
   * compiler adapted the empty argument list by inserting `()`, which is deprecated.
   */
  override def execute: ExecutionProduce = _ => _ => Future.successful(())
}
/** Command that withdraws money: it produces a `WithdrawalPerformed` event carrying the request's `amount`. */
class WithdrawalCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the event asynchronously, at the revision following the state's last event version. */
  override def event: EventProduce = request => state =>
    Future {
      val withdrawn = WithdrawalPerformed(state.lastEventVersion + 1)
      withdrawn.amount = request.amount
      withdrawn
    }

  /**
   * Nothing extra to execute for this command; completes immediately.
   *
   * The unit value is passed explicitly: `Future.successful()` only compiled because the
   * compiler adapted the empty argument list by inserting `()`, which is deprecated.
   */
  override def execute: ExecutionProduce = _ => _ => Future.successful(())
}
/** Command that closes an account: it produces a `BankAccountClosed` event carrying the request's `reason`. */
class CloseCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the event asynchronously, at the revision following the state's last event version. */
  override def event: EventProduce = request => state =>
    Future {
      val closed = BankAccountClosed(state.lastEventVersion + 1)
      closed.closeReason = request.reason
      closed
    }

  /**
   * Nothing extra to execute for this command; completes immediately.
   *
   * The unit value is passed explicitly: `Future.successful()` only compiled because the
   * compiler adapted the empty argument list by inserting `()`, which is deprecated.
   */
  override def execute: ExecutionProduce = _ => _ => Future.successful(())
}

Commands as functions

/**
 * Exposes each bank-account command as a value typed `ExecutionRequest[Request, BankAccount]`,
 * so callers can apply them like functions: `deposit(request)(account)`.
 */
object BankAccountCommands {
  // Execution context handed implicitly to every command constructed below.
  // The type is spelled out (it is what `fromExecutor` returns) so this public implicit
  // does not depend on inference.
  implicit val ec: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val createAccount: ExecutionRequest[Request, BankAccount] = new CreateAccountCommand
  val withdrawal: ExecutionRequest[Request, BankAccount] = new WithdrawalCommand
  val deposit: ExecutionRequest[Request, BankAccount] = new DepositCommand
  val changeOwner: ExecutionRequest[Request, BankAccount] = new ChangeOwnerCommand
  val close: ExecutionRequest[Request, BankAccount] = new CloseCommand
}

Running

/**
 * Scenarios for the bank-account example: one checks that an overdraft is rejected,
 * the other that replaying the stored events yields the same state the commands returned.
 */
class EventSourcingTest extends FeatureSpec with Matchers {
  feature("Creating an account") {
    scenario("withdrawing more money than the balance has should throw an exception") {
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3))

      // One aggregation id is shared by all iterations of this scenario.
      val aggregationId = UUID.randomUUID().toString

      (0 until 500).foreach { _ =>
        // Deposit 20, then attempt three withdrawals of 10.
        val eventualBankAccount = for {
          account <- createAccount(Request("owner" -> "John Doe", "id" -> 123))(BankAccount(0, aggregationId))
          account <- deposit(Request("amount" -> 20))(account)
          account <- changeOwner(Request("newOwner" -> "Jane Doe"))(account)
          account <- withdrawal(Request("amount" -> 10))(account)
          account <- withdrawal(Request("amount" -> 10))(account)
          account <- withdrawal(Request("amount" -> 10))(account)
          account <- close(Request("reason" -> "Unavailable address"))(account)
        } yield account

        an[InvalidExecutionException] should be thrownBy Await.result(eventualBankAccount, 1000.millis)

        // Replay what was actually stored and check the failed chain left no trace.
        val storedEvents = Await.result(eventHandler.get(aggregationId), 1.minute)
        val state = storedEvents.play(BankAccount(0, aggregationId))

        state.balance[Int] shouldBe 0
        state.status[String] shouldNot be("CLOSED")
      }
    }

    scenario("assert that replaying restores the actual state of the BankAccount object") {
      implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))

      val runs = List.range(0, 20).map { _ =>
        val aggregationId = UUID.randomUUID().toString

        val eventualActualState = for {
          account <- createAccount(Request("owner" -> "John Doe", "id" -> 123))(BankAccount(0, aggregationId))
          account <- deposit(Request("amount" -> 20))(account)
          account <- changeOwner(Request("newOwner" -> "Jane Doe"))(account)
          account <- withdrawal(Request("amount" -> 10))(account)
          account <- close(Request("reason" -> "Unavailable address"))(account)
        } yield account

        // flatMap rather than andThen: andThen returns the original future's value and
        // throws away what its callback computes, so the replayed state was never compared.
        val eventualPlayedState: Future[BankAccount] = eventualActualState.flatMap { _ =>
          eventHandler.get(aggregationId).map(_.play(BankAccount(0, aggregationId)))
        }

        eventualActualState -> eventualPlayedState
      }

      runs.foreach { case (eventualActualState, eventualPlayedState) =>
        val playedState = Await.result(eventualPlayedState, 60.minutes)
        val actualState = Await.result(eventualActualState, 60.minutes)

        playedState shouldBe actualState
      }
    }
  }
}