Building an Event Source System - Part 2

January 01, 2018

Continuing from my previous post, we'll set up an event source system using NodeJS. Event source systems use a common log of events to manipulate system state in a repeatable and scalable way. By recording state changes in logs, the system doesn't need to rely on database requests to gather state information. This has a variety of benefits that I covered previously.
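To make the idea concrete, here is a minimal sketch of deriving state by replaying a log. The LogEntry and CounterState types, the applyLogEntry and replayLog functions, and the "REQUESTED" event name are all made up for this illustration; the actual code built later in this post is organised differently.

```typescript
// Hypothetical types for illustration only.
interface LogEntry {
  type: string
}

interface CounterState {
  requests: number
}

// Apply one logged event to the state and return the new state (no mutation).
function applyLogEntry(state: CounterState, entry: LogEntry): CounterState {
  return entry.type === "REQUESTED"
    ? { ...state, requests: state.requests + 1 }
    : state
}

// Rebuild state from scratch by folding over the whole log, in order.
function replayLog(entries: LogEntry[]): CounterState {
  return entries.reduce(applyLogEntry, { requests: 0 })
}

const sampleLog: LogEntry[] = [
  { type: "REQUESTED" },
  { type: "REQUESTED" },
  { type: "UNKNOWN" },
]

const rebuilt = replayLog(sampleLog) // { requests: 2 }
```

Because the state is a function of the log alone, replaying the same entries in the same order should always give the same result.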

I enjoy working with the JavaScript/TypeScript, React, and NodeJS stack and find these tools very expressive. I've chosen them for this exploration because of their readability and rapid prototyping abilities. These concepts can easily be applied to other languages (Ruby/Java/.NET/etc.) and I'll try not to make things terribly complicated for those less familiar with my stack of choice.

Basic server setup

To get things started, I'd like to make a folder and install the dependencies needed to set up a NodeJS server using Express:

mkdir ness && cd ness
npm init -y
npm install --save body-parser express
npm install --save-dev @types/node ts-node typescript

I'd like to note that I'm using Node v8.6.0.

Next, I'll export the base server configuration in a new file /src/app.ts:

import * as express from "express"
import * as bodyParser from "body-parser"

const app = express()

// Support JSON encoded bodies
app.use(bodyParser.json())
app.use(bodyParser.urlencoded({ extended: true }))
app.set("json spaces", 2)

// TODO: register endpoints

export default app

The app is initialized using the Express library which instantiates a new application to provide REST endpoints. bodyParser provides some great middleware to automatically parse JSON payloads within requests.

This configuration can now be used to initialize the server in /index.ts:

import app from "./src/app"

app.listen(3000, () => console.log("Example app listening on port 3000!"))

The server can now be tested with the ts-node command imported earlier:

> ts-node index.ts

However, there aren't any routes, so the server is just sitting there with no way to make requests against it. As the first route, I'll make a basic endpoint that simply outputs the number of requests the server has received. Since I'm likely to have more routes in the future, I'll define this route in its own file /src/root.ts:

import * as express from 'express'

export default class RootRoute {
  private route: express.Router
  private requests: number

  constructor() {
    this.route = express.Router()
    this.requests = 0

    this.route.get('/', (req, res) => {
      res.send(`Requests:${JSON.stringify(this.requests)}`)
      this.requests++
    })
  }

  getRouter(): express.Router {
    return this.route
  }
}

Note that requests here is a class member variable and always initializes to zero when the route is constructed. Then, after each request, I increment the value so that the next request will have the correct number to display.

Then it's simple to register this route for the app configuration in /src/app.ts:

import RootRoute from './root'
...

app.use(new RootRoute().getRouter())

export default app

Now that there's something to test, restart the server: > ts-node index.ts

Running this code will display an incrementing number of requests as users hit the root endpoint at localhost:3000/. The code thus far creates a system that stores a variable in local memory which is updated at each request. The update itself is more or less a pure function, but we still have a ways to go before this is a true event source system.

Adding Redux

First, I want to add a Redux layer to the application that will help define events, a store, and reducers that we'll use to manipulate state. Since Redux already has a great workflow for managing state, it makes sense to just build on top of that when defining the system.

If you're unfamiliar with functional programming using Redux, the Redux docs are a good place to get familiar with the basics.

> npm install --save redux

I'll start building out the implementation by defining an interface for the store in /src/core/interfaces/appState.ts:

export default interface AppState {
  requests: number
}

And another interface for event actions at /src/core/interfaces/appAction.ts

interface AppAction {
  type: string;
  payload?: any;
}
export default AppAction

Now that I've defined what attributes the store will contain, I can implement a reducer to set up the initial state in /src/core/reducers/reducer.ts:

import AppState from "../interfaces/appState"
import AppAction from "../interfaces/appAction"

const initialState: AppState = { requests: 0 }

export const REQUESTED_ACTION = "@@ness/REQUESTED"

export const reducer = (state: AppState = initialState, action: AppAction) => {
  switch (action.type) {
    case REQUESTED_ACTION:
      return {
        ...state,
        requests: state.requests + 1,
      }
    default:
      return state
  }
}

This reducer processes requests similarly to the previous endpoint. Whenever the reducer receives an action of the REQUESTED_ACTION type, it will increment the request count and return the new state object. This state is initialized from the initialState variable which is used when there is no state defined (like when the application starts).
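A quick way to check this behaviour is to call the reducer directly, without Redux. The sketch below repeats the interfaces and reducer from above so that it is self-contained; the "@@ness/UNKNOWN" action type is invented for the example.

```typescript
interface AppState { requests: number }
interface AppAction { type: string; payload?: any }

const initialState: AppState = { requests: 0 }
const REQUESTED_ACTION = "@@ness/REQUESTED"

const reducer = (state: AppState = initialState, action: AppAction): AppState => {
  switch (action.type) {
    case REQUESTED_ACTION:
      return { ...state, requests: state.requests + 1 }
    default:
      return state
  }
}

// No state yet (undefined): the default parameter supplies initialState.
const first = reducer(undefined, { type: REQUESTED_ACTION }) // { requests: 1 }

// A second matching action produces a new object; `first` is left untouched.
const second = reducer(first, { type: REQUESTED_ACTION }) // { requests: 2 }

// Unrecognised action types return the same state object unchanged.
const unchanged = reducer(second, { type: "@@ness/UNKNOWN" }) // same reference as `second`
```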

With a basic reducer and interface for the store, it's time to initialize Redux. I'll need to define the store above the route level to be shared between all routes. I'll pass the store down into the route in /src/app.ts:

import { createStore } from 'redux'
import { reducer } from './core/reducers/reducer'
...

const store = createStore(reducer)

app.use(new RootRoute(store).getRouter())
...

This requires updating the RootRoute to make calls against the Redux store in /src/root.ts:

import { Store } from 'redux'
import AppAction from './core/interfaces/appAction'
import { REQUESTED_ACTION } from './core/reducers/reducer'
...

  constructor(store: Store<any>) {
    this.route = express.Router()

    this.route.get('/', (req, res) => {
      let action: AppAction = { type: REQUESTED_ACTION }
      res.send(`Requests:${JSON.stringify(store.getState().requests)}`)
      store.dispatch(action)
    })
  }
...

Testing this now should have the same result as the previous example but now utilizes Redux to provide a great framework for managing events.
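For readers new to Redux, the store used above boils down to roughly three operations: reading the current state, dispatching an action through the reducer, and notifying subscribers. The following is a simplified stand-in written for this post, not Redux's actual implementation; createMiniStore, the "@@mini/INIT" action, and the other names are hypothetical.

```typescript
type Reducer<S, A> = (state: S | undefined, action: A) => S
type Listener = () => void

function createMiniStore<S, A>(reducer: Reducer<S, A>, initAction: A) {
  // Run the reducer once with undefined state so it can supply its initial state.
  let state: S = reducer(undefined, initAction)
  const listeners: Listener[] = []

  return {
    getState: (): S => state,
    dispatch: (action: A): void => {
      // The reducer computes the next state; then every subscriber is told.
      state = reducer(state, action)
      listeners.forEach(listener => listener())
    },
    subscribe: (listener: Listener): void => {
      listeners.push(listener)
    },
  }
}

interface CountState { requests: number }
interface CountAction { type: string }

const countReducer = (state: CountState = { requests: 0 }, action: CountAction): CountState =>
  action.type === "@@ness/REQUESTED" ? { ...state, requests: state.requests + 1 } : state

const miniStore = createMiniStore(countReducer, { type: "@@mini/INIT" })
let notifications = 0
miniStore.subscribe(() => { notifications++ })

miniStore.dispatch({ type: "@@ness/REQUESTED" })
miniStore.dispatch({ type: "@@ness/REQUESTED" })
// miniStore.getState() is now { requests: 2 } and notifications is 2
```

The real Redux store does more than this (for example, it lets listeners unsubscribe), so treat the sketch as a mental model only.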

Persisting Events to Files

Last but not least, it's time to persist these events to a place where they won't be lost when the server restarts. Ideally this place is a server or service with a data backup, but it's easier to set up this configuration locally on our machine first. Having a local copy will also help validate and debug how events are fired and processed.

The goal is, on every change, to append the event to the end of a file that serves as the application history log, and then to update the server's state only when the file changes. This enables servers to run consistently in parallel. To find the latest change to the file easily, I'll use another file as a buffer that simply stores the last change.
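One way to picture the log format is as newline-delimited JSON, with one action per line. The helpers below (appendToLogText and parseLogText, both hypothetical names) work on plain strings so the format can be tried without touching the filesystem.

```typescript
interface LoggedAction { type: string; payload?: any }

// Append one action to the log text, one JSON document per line.
// JSON.stringify escapes newlines inside strings, so each action stays on one line.
function appendToLogText(logText: string, action: LoggedAction): string {
  return logText + "\n" + JSON.stringify(action)
}

// Parse the log text back into actions, skipping blank lines.
function parseLogText(logText: string): LoggedAction[] {
  return logText
    .split("\n")
    .filter(line => line.trim().length > 0)
    .map(line => JSON.parse(line) as LoggedAction)
}

let logText = ""
logText = appendToLogText(logText, { type: "@@ness/REQUESTED" })
logText = appendToLogText(logText, { type: "@@ness/REQUESTED" })

const parsedActions = parseLogText(logText)
// parsedActions.length is 2; the leading blank line is skipped
```

Note that JSON.parse throws on a malformed line, so a log that has been edited by hand or only partially written would need extra handling.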

First, create those two files:

touch last.txt
touch log.txt

Next, it's time to create a file system logging service to use throughout the application at /src/logging/filesystem.ts:

import { readFile, appendFile, watchFile, writeFile } from 'fs'
import { Store } from 'redux'
import AppAction from '../core/interfaces/appAction'

const LOG_FILE = 'log.txt'
const LAST_FILE = 'last.txt'

export default class FileSystemLogger {
  constructor(private store: Store<any>) {}

  // Update state from log history
  restoreLogs() {
    readFile(LOG_FILE, (err, data) => {
      if (err) return console.log(err)
      let logs = data.toString().split('\n')
      logs.forEach(log => {
        if (log.length > 0)
          this.store.dispatch(JSON.parse(log))
      })
    })
    this.watchLog()
  }

  // Save a new action in the log
  saveLog(action: AppAction) {
    this.saveLast(action, () => {
      appendFile(LOG_FILE, '\n' + JSON.stringify(action), (err) => {
        if (err) return console.log(err)
        console.log('data appended to log')
      })
    })
  }

  // Save an action in the last buffer
  private saveLast(action: AppAction, cb: Function) {
    writeFile(LAST_FILE, JSON.stringify(action), (err) => {
      if (err) return console.log(err)
      cb()
    })
  }

  // Watch the log for changes to trigger a read of the last buffer
  private watchLog() {
    watchFile(LOG_FILE, (curr, prev) => {
      readFile(LAST_FILE, (err, data) => {
        if (err) return console.log(err);
        this.store.dispatch(JSON.parse(data.toString()))
      })
      console.log('file changed')
    })
  }
}

Now that there's a class built out to handle the logging, it's time to set up /src/app.ts to use it:

import FileSystemLogger from './logging/filesystem'
...

const store = createStore(reducer)
const logger = new FileSystemLogger(store)

logger.restoreLogs()
...

app.use(new RootRoute(store, logger).getRouter())
...

Constructing the FileSystemLogger with a reference to the store allows the logger to interface directly with the Redux store. The logger is initialized, logs are restored on startup, and a reference is passed to the RootRoute. Time to update /src/root.ts:

import FileSystemLogger from './logging/filesystem'
import { reducer, REQUESTED_ACTION } from './core/reducers/reducer'
...

constructor(store: Store<any>, logger: FileSystemLogger) {
  ...

  this.route.get('/', (req, res) => {
    let action: AppAction = {type: REQUESTED_ACTION}
    // run the reducer against the api action to render it
    let storePreview = reducer(store.getState(), action)
    res.send(`Hello World!\n${JSON.stringify(storePreview.requests)}`)

    logger.saveLog(action)
  })
...

Now that everything is in place, it's finally time to test the system. As new requests are made, they populate into last.txt and log.txt. When log.txt is updated, the Node server looks at the last.txt file and pulls the event to update the store. Then Redux runs its reducers and updates the local state. The requesting user doesn't have to wait for all of this to occur and receives a result instantly, because the server first runs the reducer directly to compute the response.
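The preview step relies on the reducer being pure: calling it directly computes the next state without changing what the store holds. Here is a small sketch of that property, with the store replaced by a plain constant (committedState, an assumption made for this example) and a simplified copy of the reducer.

```typescript
interface PreviewState { requests: number }
interface PreviewAction { type: string }

const previewReducer = (state: PreviewState, action: PreviewAction): PreviewState =>
  action.type === "@@ness/REQUESTED" ? { ...state, requests: state.requests + 1 } : state

// Stand-in for store.getState(): the state as of the last event read from the log.
const committedState: PreviewState = { requests: 4 }

// Compute what the state will look like once this action has gone through the log.
const previewState = previewReducer(committedState, { type: "@@ness/REQUESTED" })

// A second request arriving before the store catches up starts from the same
// committed state, so it sees the same preview.
const secondPreview = previewReducer(committedState, { type: "@@ness/REQUESTED" })

// previewState.requests and secondPreview.requests are both 5,
// while committedState.requests is still 4.
```

The trade-off is visible in the second preview: responses are fast, but two requests that arrive close together may be shown the same count until the logged actions are dispatched.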

What's Next?

This is a great place to start exploring the concept of event source systems. Since there's a simple example provided here, it's easy to start inventing new endpoints and testing out new ideas for how to use these systems. One test I encourage trying is to run two of these systems side by side in two terminals on two different ports. Note how they accurately update together and also notice the latency that shows up when reading from the filesystem.
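Running two instances requires each to listen on its own port, and the index.ts above hard-codes 3000. One possible tweak, assuming a PORT environment variable (something the original code does not read), is sketched below; resolvePort is a hypothetical helper.

```typescript
// Pick a port from an environment-like object, falling back to 3000
// when PORT is missing, not a whole number, or out of range.
function resolvePort(env: { [key: string]: string | undefined }): number {
  const parsed = Number(env.PORT)
  const isValid = Math.floor(parsed) === parsed && parsed > 0 && parsed < 65536
  return isValid ? parsed : 3000
}

// In index.ts this could be used as:
//   const port = resolvePort(process.env)
//   app.listen(port, () => console.log(`Example app listening on port ${port}!`))
```

With that change, a second instance could be started from the same folder with something like PORT=3001 ts-node index.ts. Both instances would then share log.txt and last.txt, since those paths are relative to the working directory.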

I'm excited to keep exploring the architecture and will post again when I have something else cool to share. Next, I'll look at ways to replace the filesystem with a real service that can run on a separate machine. Also, Happy New Year!