Continuing my previous post, weāll be working to setup an event source system using NodeJS. Event source systems utilize a common log of events to manipulate system state in a repeatable and scalable way. By recording state changes in logs, the system doesnāt need to rely on a database requests to gather state information. This has a variety of benefits that Iāve included previously.
I enjoy working with the JavaScript/TypeScript, React, and NodeJS stack and find these tools very expressive. Iāve chosen them for this exploration because of their readability and rapid prototyping abilities. These concepts can easily be applied to other languages (Ruby/Java/.NET/etc.) and Iāll try not to make things terribly complicated for those less familiar with my stack of choice.
Basic server setup
To get things started Iād like to make a folder and import the dependencies to setup a NodeJS server using Express:
mkdir ness & cd ness
npm init -y
npm install --save body-parser express
npm install --save-dev @types/node ts-node typescript
Iād like to note that Iām using Node
v8.6.0
Next Iāll export the base server configuration in a new file /src/app.ts
:
import * as express from "express"
import * as bodyParser from "body-parser"
const app = express()
// Support JSON encoded bodies
app.use(bodyParser.json())
app.use(bodyParser.urlencoded({ extended: true }))
app.set("json spaces", 2)
// TODO: register endpoints
export default app
The app
is initialized using the Express library which instantiates a new application to provide REST endpoints. bodyParser
provides some great middleware to automatically parse JSON payloads within requests.
This configuration can now be used to initialize the server in /index.ts
:
import app from "./src/app"
app.listen(3000, () => console.log("Example app listening on port 3000!"))
The server can now be tested with the ts-node
command imported earlier:
> ts-node index.ts
However, there arenāt any routes and so the server is just sitting there with no way to make requests against it. As the first route, Iāll make a basic endpoint that will simply output the number of requests the server has received. Since Iām likely to have more routes in the future, Iāll define this route in its own file /src/root.ts
:
import * as express from 'express'
export default class RootRoute {
private route: express.Router
private requests: number
constructor() {
this.route = express.Router()
this.requests = 0
this.route.get('/', (req, res) => {
res.send(`Requests:${JSON.stringify(this.requests)}`)
this.requests++
})
}
getRouter(): express.Router {
return this.route
}
}
Note that requests here is a class member variable and always initializes to zero when the route is constructed. Then, after each request, I increment the value so that the next request will have the correct number to display.
Then itās simple to register this route for the app configuration in /src/app.ts
:
import RootRoute from './root'
...
app.use(new RootRoute().getRouter())
export default app
Now that thereās something to test, restart the server: > ts-node index.ts
Running this code will display an incrementing number of requests as users hit the root endpoint at localhost:3000/
. The code thus far creates a system that stores a variable in local memory which is updated at each request. The function of the update is more or less a pure but we still have a ways to go before this is a true event source system.
Adding Redux
First, I want to add a Redux layer to the application that will help define events, a store, and reducers that weāll use to manipulate state. Since Redux already has a great workflow for managing state it makes sense to just build on top of that when defining the system.
If youāre unfamiliar with functional programming using Redux hereās a link to their docs to get familiar with the basics.
> npm install --save redux
Iāll start building out the implementation by defining an interface for the store in /src/core/interfaces/appState.ts
export default interface AppState {
requests: number
}
And another interface for event actions at /src/core/interfaces/appAction.ts
interface AppAction {
type: string;
payload?: any;
}
export default AppAction
Now that Iāve defined what attributes the store will contain, I can implement a reducer to setup the initial state in /src/core/reducers/reducer.ts
import AppState from "../interfaces/appState"
import AppAction from "../interfaces/appAction"
const initialState: AppState = { requests: 0 }
export const REQUESTED_ACTION = "@@ness/REQUESTED"
export const reducer = (state: AppState = initialState, action: AppAction) => {
switch (action.type) {
case REQUESTED_ACTION:
return {
...state,
requests: state.requests + 1,
}
default:
return state
}
}
This reducer processes requests similarly to the previous endpoint. Whenever the reducer receives an action of the REQUESTED_ACTION
type, it will increment the request count and return the new state object. This state is initialized from the initialState
variable which is used when there is no state defined (like when the application starts).
With a basic reducer and interface for the store, itās time to initialize Redux. Iāll need to define the store above the route level to be shared between all routes. Iāll pass the store down into the route in /src/app.ts
:
import { createStore } from 'redux'
import { reducer } from './core/reducers/reducer'
...
const store = createStore(reducer)
app.use(new RootRoute(store).getRouter())
...
This requires updating the RootRoute
to make calls against the Redux store in /src/root.ts
:
import { Store } from 'redux'
import AppAction from './core/interfaces/appAction'
import { REQUESTED_ACTION } from './core/reducers/reducer'
...
constructor(store: Store<any>) {
this.route = express.Router()
this.route.get('/', (req, res) => {
let action: AppAction = { type: REQUESTED_ACTION }
res.send(`Requests:${JSON.stringify(store.getState().requests)}`)
store.dispatch(action)
})
}
...
Testing this now should have the same result as the previous example but now utilizes Redux to provide a great framework for managing events.
Persisting Events to Files
Last but not least, itās time to persist these events to a place where they wonāt be lost when the server restarts. Ideally this place is a server or service with a data backup but itās easier to setup this configuration locally on our machine first. Having a local copy will also help validate and debug how events are fired and processed.
The goal is, on every change, to append the event to the end of a file that serves as the application history log. Then only update the serverās state when the file changes. This enables servers to run consistently in parallel. To find the latest change to the file easily, Iāll use another file as a buffer to just simply store the last change.
First, create those two files:
touch last.txt
touch log.txt
Next, itās time to create a file system logging service to use throughout the application at /src/logging/filesystem.ts
import { readFile, appendFile, watchFile, writeFile } from 'fs'
import { Store } from 'redux'
import AppAction from '../core/interfaces/appAction'
const LOG_FILE = 'log.txt'
const LAST_FILE = 'last.txt'
export default class FileSystemLogger {
constructor(private store: Store<any>) {}
// Update state from log history
restoreLogs() {
readFile(LOG_FILE, (err, data) => {
if (err) return console.log(err)
let logs = data.toString().split('\n')
logs.forEach(log => {
if (log.length > 0)
this.store.dispatch(JSON.parse(log))
})
})
this.watchLog()
}
// Save a new action in the log
saveLog(action: AppAction) {
this.saveLast(action, () => {
appendFile(LOG_FILE, '\n' + JSON.stringify(action), (err) => {
if (err) return console.log(err)
console.log('data appended to log')
})
})
}
// Save an action in the last buffer
private saveLast(action: AppAction, cb: Function) {
writeFile(LAST_FILE, JSON.stringify(action), (err) => {
if (err) return console.log(err)
cb()
})
}
// Watch the log for changes to trigger a read of the last buffer
private watchLog() {
watchFile(LOG_FILE, (curr, prev) => {
readFile(LAST_FILE, (err, data) => {
if (err) return console.log(err);
this.store.dispatch(JSON.parse(data.toString()))
})
console.log('file changed')
})
}
}
Now that thereās a good class built out to handle the logging, itās time to setup /src/app.ts
to utilize it:
import FileSystemLogger from './logging/filesystem'
...
const store = createStore(reducer)
const logger = new FileSystemLogger(store)
logger.restoreLogs()
...
app.use(new RootRoute(store, logger).getRouter())
...
Constructing the FileSystemLogger
with a reference to the store allows the logger to directly interface with the redux store. The logger is initialized, logs are restored on start up, and a reference is passed to the RootRoute
. Time to update /src/root.ts
:
import FileSystemLogger from './logging/filesystem'
import { reducer, REQUESTED_ACTION } from './core/reducers/meta.reducer'
...
constructor(store: Store<any>, logger: FileSystemLogger) {
...
this.route.get('/', (req, res) => {
let action: AppAction = {type: REQUESTED_ACTION}
// run the reducer against the api action to render it
let storePreview = reducer(store.getState(), action)
res.send(`Hello World!\n${JSON.stringify(storePreview.requests)}`)
logger.saveLog(action)
})
...
Now that everything is in place itās finally time to test the system. As new requests are made they populate into last.txt
and log.txt
. When log.txt
is updated the Node server looks at the last.txt
file and pulls the event to update the store. Then Redux runs its reducers and updates the local state. The requesting user doesnāt have to wait for all of this to occur and receives back a result instantly thanks to the server running the reducer first directly.
Whatās Next?
This is a great place to start exploring the concept of event source systems. Since thereās a simple example provided here, itās easy to start inventing new endpoints and testing out new ideas for how to use these systems. One test I encourage trying is to run two of these systems side by side in two terminals on two different ports. Note how they accurately update together and also notice the latency that shows up when reading from the filesystem.
Iām excited to keep exploring the architecture and will post again when I have something else cool to share. Next, Iāll look at ways to replace the filesystem with a real service that can run on a separate machine. Also, Happy New Year!