The meaningfulness of Events via standardization ( Part 2 )
In this part of the Event standardization series, I'm going to follow Part 1 ( here ) and show how we can use standards to guarantee the promises we established.
In Part 1 we talked about:
Event Envelope
Event Metadata
Event Data
Promises
These are simple steps that prepare the ground for better governance and inter-component communication at the enterprise level.
Asking why and how we can use them to communicate better and govern the software better brings us to this part ( Part 2 ).
Documentation:
Documentation for an event-driven design can be achieved via one or a mix of the following tools:
AsyncApi: to document the stream and messaging side of the contract: channels, protocols, message payloads, etc.
Event Catalog: to document visually the event flows and the communication between different components, within a single bounded context or across multiple bounded contexts.
Markdown Docs: simple, text-based docs that provide a good starting point when discovering new services.
Mermaid: a useful tool to schematize any kind of component relationship and flow. With a simple syntax you can draw collaboration, sequence, or flow diagrams.
AsyncApi:
An AsyncAPI document is a JSON/YAML standard that provides API documentation, like OpenAPI, but with a focus on asynchronous communication. An AsyncAPI document has the following parts:
Info
Servers
Channels
Components
Example:
---
asyncapi: 2.6.0
info:
  version: 1.0.0
  title: Event
servers:
  eventTopic:
    url: source-event-sns-topic
    protocol: HTTP
channels:
  notificationEvents:
    servers:
      - eventTopic
    subscribe:
      summary: Subscribe to receive notification events.
      message:
        $ref: '#/components/messages/NotificationMessage'
components:
  messages:
    NotificationMessage:
      name: notification event
      payload:
        $ref: '#/components/schemas/Event'
  schemas:
    Event:
      type: object
      required:
        - specVersion
        - source
        - type
        - time
        - idempotencyKey
        - id
      properties:
        specVersion:
          type: string
        source:
          type: string
        time:
          type: string
          format: date-time
        type:
          type: string
          enum:
            - order.created
            - order.updated
            - order.rejected
        idempotencyKey:
          type: string
        datacontentType:
          type: string
          enum:
            - application/json
            - application/*+avro
        data:
          type: object
          properties:
            orderId:
              type: string
            state:
              type: string
              enum:
                - OrderCreated
                - OrderCanceled
                - OrderRefused
                - OrderPaymentRejected
                - OrderConfirmed
You can use AsyncAPI Studio or the VS Code extension to render your definition and visually validate your contract as you create it.
Event Catalog:
Event Catalog is a great tool for generating multi-context communication visualizations. You can define the following parts just by creating the folder structure:
Domains
Services
Events
This is a sample from the article code: we define the different domains and, inside each one, the related services and events. Each domain, event, or service is represented by an index.md file.
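As an illustration, the folder structure could look something like this ( the domain, service, and event names here are hypothetical ):
domains/
  Order Management/
    index.md
    services/
      Order Service/
        index.md
    events/
      OrderCreated/
        index.md
  Payment/
    index.md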
An example of a domain:
---
name: Order Management
summary: |
  Domain for Order Management
owners:
  - omid.eidivandi
---
<Admonition>Domain for everything to do with Shopping at our business. Before adding any events or services to this domain make sure you contact the domain owners and verify it's the correct place.</Admonition>
### Details
This domain encapsulates everything in our business that has to do with shopping and users. This might be new items added to our online shop or online cart management.
<NodeGraph title="Domain Graph" />
The result can be generated and presented with a simple configuration file and the EventCatalog command-line interface ( find more here ).
Find out more by looking at the online demo: https://app.eventcatalog.dev/events/
Mermaid:
Mermaid is a great tool for covering basic diagrams as code at any level of a project, and the advantage is that it can be integrated easily into most tools.
sequenceDiagram
Order->>+Payment: NewOrderCreated!
Payment->>+Order: PaymentConfirmed!
Documentation represents a contract when designing distributed systems, and event-driven design is no exception. But this kind of documentation only accelerates the development phase and makes evolutions easier across the SDLC. What about run-time needs? What about promise protection?
To cover the run-time needs, contract validation is the first step.
Validation:
Validation helps to confirm that an incoming piece of data in transit respects the promises established by a given contract.
For a successful validation process, one way of guaranteeing the promise is versioning, and that is why we defined different versions for the different parts of an event.
Reminder: in Part 1 we introduced the following version items ( a minimal sketch of where each one lives follows the list ):
SpecVersion: The spec version represents the event standard at the enterprise level; it relates to the format of the top-level elements of an event.
MetadataVersion: The metadata version represents the version of the metadata item and its content at the service level, for all types of events.
DataVersion: The data version has a similar lifecycle to the metadata version, but its contract and promises are harder to define; it mostly relates to Domain, Aggregate, View, and Delta Events.
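As a minimal sketch, the three versions could surface in an event like this ( the placement and name of metadataVersion are assumptions; only specVersion and dataVersion appear in the samples later in this article ):
const event = {
  specVersion: "1",           // SpecVersion: enterprise-level envelope format
  type: "order.created",
  metadata: {
    metadataVersion: "1.0.0"  // MetadataVersion: service-level metadata contract
  },
  dataVersion: "1.0.0",       // DataVersion: version of the data contract
  data: { /* domain payload */ }
};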
Who is responsible for event validation? The producer must guarantee and respect the promised contract, but validation should preferably be done on both the producer side and the consumer side, particularly when using the Event-Carried State Transfer pattern or a data aggregation pattern.
Who must know the validation details? The validation details are best mastered by the producer, as the producer knows the state changes in the system and the proper validation for any state change.
The documentation must help us establish a validation process; in other words, the validation must be based on the documentation.
Validate the definition:
The first step in the validation process is to validate the AsyncAPI contract definition. Here we use the AsyncAPI command-line interface to validate it.
$ npm i -g @asyncapi/cli
$ asyncapi validate ./docs/streams/asyncapi.yaml
Validate Event Spec:
Till now, our main goal was to introduce a contract and validate it as a well-defined document. Now we will try to use that contract ( the promises ) to validate our events at run time.
In the validation section, we began with the question of who is responsible for event validation, and the response was:
The producer must guarantee and respect the promised contract, but validation should preferably be done on both the producer side and the consumer side, particularly when using the Event-Carried State Transfer pattern or a data aggregation pattern.
So here we are going to prepare and share an event validation process and let the consumers double-check the event as well.
To validate an event against the definition, we first need to access the contract definition:
const { Parser, fromFile } = require('@asyncapi/parser');
const validator = require('./ajv-validator');

class SchemaValidator {
  schema;
  asyncapiParser;
  defaultFile = 'asyncapi.yaml';
  defaultVersion = 'v1';
  ajvValidator = validator;

  constructor(version, path, file) {
    this.file = file ?? this.defaultFile;
    this.version = version ?? this.defaultVersion;
    this.path = path;
    this.asyncapiParser = new Parser();
  }

  // Parse the AsyncAPI definition from a file on disk.
  getParser = async (path) => {
    return await fromFile(this.asyncapiParser, path).parse();
  }

  // Lazily load the definition for the requested spec version and keep it as a JSON object.
  initSchema = async (specVersion) => {
    const ver = specVersion ?? this.version;
    if (!this.schema) {
      const data = await this.getParser(`${this.path}/${ver}/${this.file}`);
      this.schema = data.document.json();
    }
  }
}

module.exports = {
  SchemaValidator
}
In the above snippet of code, we extract the definition schema as a JSON object; this object represents the AsyncAPI definition shown above.
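As a minimal usage sketch, assuming the folder layout used later in this article ( shared/streams/order/v1/asyncapi.yaml ), the loader can be exercised like this:
const { SchemaValidator } = require('./schema-validator');

const validator = new SchemaValidator('v1', './shared/streams/order', 'asyncapi.yaml');

// After initSchema resolves, validator.schema holds the parsed definition as a plain object.
validator.initSchema()
  .then(() => console.log(Object.keys(validator.schema.components.schemas)))
  .catch(console.error);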
Now we need to introduce an example of event validation based on that schema. Referring to the Serverless Advocate article here, using Ajv seems like a nice start; there are some equivalents, but for simplicity we continue with Ajv.
The Ajv validator used by the SchemaValidator ( ajv-validator.js ):
const Ajv = require('ajv');
const addFormats = require('ajv-formats');

const ajvOptions = { allErrors: true };

function validate(obj, schema, key, ref) {
  const ajv = new Ajv(ajvOptions);
  addFormats(ajv);

  // Allow the AsyncAPI top-level keywords (and the parser extensions) so that the
  // whole definition can be registered as an Ajv schema without strict-mode errors.
  ajv.addVocabulary([
    'asyncapi',
    'info',
    'servers',
    'channels',
    'components',
    'x-parser-api-version',
    'x-parser-spec-parsed',
    'x-parser-schema-id'
  ]);

  // Register the full definition under `key`, then validate the object
  // against the JSON pointer referenced by `ref`.
  ajv.addSchema(schema, key);
  const valid = ajv.validate({ $ref: ref }, obj);

  if (!valid) {
    const errorMessage = JSON.stringify(ajv.errors);
    throw new Error(errorMessage);
  }
}

module.exports = { validate };
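To make the key/ref mechanics concrete, here is a standalone sketch that registers a trimmed-down, inline definition under a key and validates an object against one of its component schemas ( the inline document is only an illustration, not the full contract ):
const { validate } = require('./ajv-validator');

const definition = {
  asyncapi: '2.6.0',
  info: { title: 'Event', version: '1.0.0' },
  channels: {},
  components: {
    schemas: {
      Event: {
        type: 'object',
        required: ['id', 'type'],
        properties: { id: { type: 'string' }, type: { type: 'string' } }
      }
    }
  }
};

// The whole definition is registered under the key 'Event'; the ref then walks
// the JSON pointer '#/components/schemas/Event' inside that registered schema.
validate(
  { id: 'abc', type: 'order.created' },
  definition,
  'Event',
  'Event#/components/schemas/Event'
);
console.log('The object respects the Event schema');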
Now we need the complete SchemaValidator, as shown below. This is the event validator that uses the above Ajv validator as well as @asyncapi/parser to parse the definition:
const { Parser, fromFile } = require('@asyncapi/parser');
const validator = require('./ajv-validator');

class SchemaValidator {
  schema;
  asyncapiParser;
  defaultFile = 'asyncapi.yaml';
  defaultVersion = 'v1';
  ajvValidator = validator;

  constructor(version, path, file) {
    this.file = file ?? this.defaultFile;
    this.version = version ?? this.defaultVersion;
    this.path = path;
    this.asyncapiParser = new Parser();
  }

  getParser = async (path) => {
    return await fromFile(this.asyncapiParser, path).parse();
  }

  initSchema = async (specVersion) => {
    const ver = specVersion ?? this.version;
    if (!this.schema) {
      const data = await this.getParser(`${this.path}/${ver}/${this.file}`);
      this.schema = data.document.json();
    }
  }

  // Validate an event against the schema referenced by `schemaPath`,
  // e.g. 'OrderCreatedEvent#/components/schemas/OrderCreatedEvent'.
  validate = async (event, schemaPath) => {
    await this.initSchema(this.version);
    return this.ajvValidator.validate(event, this.schema, schemaPath.split('#')[0], schemaPath);
  }
}

module.exports = {
  SchemaValidator
}
The code introduces a simple validate method that checks the received event against the Event schema.
Let's use it.
Here the provided event will be validated against the definition via the validate method:
const { SchemaValidator } = require('../../../asyncapi/schema-validator');
const { Version1 } = require('../versions');

const validator = new SchemaValidator(Version1, './shared/streams/order', 'asyncapi.yaml');

const body = {
  specVersion: "1",
  id: "jkhqskdjhqskdhqskdh",
  idempotencyKey: 'AAAAAAAAAAAAAAAAAAAAAAAA',
  source: 'ordermanagement:order',
  time: "2023-01-01T12:54:00.000Z",
  type: 'order.created',
  datacontentType: 'application/json',
  dataVersion: "1.0.0",
  dataSchema: 'Event#/components/schemas/Event',
  data: {
  }
};

Promise.resolve(validator.validate(
  body,
  body.dataSchema
))
  .then(() =>
    console.log("Event respects the enterprise specification")
  ).catch(
    (error) => {
      const msg = { Errors: JSON.parse(error.message) };
      console.log(msg);
    }
  );
The validator throws an error carrying the Ajv error details if validation fails, or resolves successfully when the event passes.
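For reference, each entry in that error message is a standard Ajv error object; a failure on a missing required field would be logged roughly like this ( values are illustrative ):
{
  Errors: [
    {
      instancePath: '',
      schemaPath: '#/required',
      keyword: 'required',
      params: { missingProperty: 'idempotencyKey' },
      message: "must have required property 'idempotencyKey'"
    }
  ]
}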
Validate Data:
To validate the metadata, we need to separate the metadata schema from the event spec schema by changing the AsyncAPI definition.
Let's enrich the definition. First, let's remove the metadata details from the Event schema:
schemas:
  Event:
    type: object
    required:
      - specVersion
      - type
      - source
      - time
      - idempotencyKey
      - id
    properties:
      specVersion:
        type: string
      source:
        type: string
      type:
        type: string
        enum:
          - order.created
          - order.updated
          - order.rejected
      idempotencyKey:
        type: string
      datacontentType:
        type: string
        enum:
          - application/json
          - application/*+avro
      ref:
        type: string
      metadata:
        type: object
Now, we add the OrderEventMetadata and OrderCreatedEvent schemas below:
OrderEventMetadata:
  type: object
  properties:
    metadata:
      type: object
      required:
        - orderId
        - state
      properties:
        orderId:
          type: string
        state:
          type: string
          enum:
            - OrderCreated
            - OrderCanceled
            - OrderRefused
            - OrderPaymentRejected
            - OrderConfirmed
OrderCreatedEvent:
  allOf:
    - $ref: '#/components/schemas/Event'
    - $ref: '#/components/schemas/OrderEventMetadata'
Now let's validate an event against the OrderCreatedEvent schema:
const { SchemaValidator } = require('../../../asyncapi/schema-validator');
const { Version1 } = require('../versions');

const validator = new SchemaValidator(Version1, './shared/streams/order', 'asyncapi.yaml');

const bodyWithMetadata = {
  specVersion: "1",
  id: "jkhqskdjhqskdhqskdh",
  idempotencyKey: 'AAAAAAAAAAAAAAAAAAAAAAAA',
  source: 'ordermanagement:order',
  time: "2023-01-01T12:54:00.000Z",
  type: 'order.created',
  datacontentType: 'application/json',
  dataVersion: "1.0.0",
  dataSchema: 'OrderCreatedEvent#/components/schemas/OrderCreatedEvent',
  data: {
    orderId: 'lj0OVlHFdHz9hHfgXUSoW',
    state: 'OrderCreated'
  }
};

Promise.resolve(validator.validate(
  bodyWithMetadata,
  bodyWithMetadata.dataSchema
))
  .then(() =>
    console.log("Event respects the Order Notification Specifications")
  ).catch(
    (error) => {
      const msg = { Errors: JSON.parse(error.message) };
      console.log(msg);
    }
  );
Here is the output:
Let’s validate an invalid event
const body = {
  specVersion: "1",
  id: "jkhqskdjhqskdhqskdh",
  idempotencyKey: 'AAAAAAAAAAAAAAAAAAAAAAAA',
  source: 'ordermanagement:order',
  time: "2023-01-01T12:54:00.000Z",
  type: 'order.created',
  datacontentType: 'application/json',
  dataVersion: "1.0.0",
  dataSchema: 'OrderCreatedNotification#/components/schemas/OrderCreatedNotification',
  data: {
    orderId: 'lj0OVlHFdHz9hHfgXUSoW'
  }
};
And here is the output:
Conclusion:
Distributed communication is an important part of distributed design, and in EDA we need to guarantee the promises. This validation is best done on both the producer and consumer sides, but keep in mind that this double validation must not add extra overhead and effort; a better approach is for the producer to provide the promise as well as APIs in the programming languages used across the enterprise.
You can share the promises alongside ( a sketch of such a shared package follows the list ):
AsyncApi Definition
Validators
Typed models ( like TypeScript )
Schema path refs and automated discovery
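As a sketch of that idea ( the module layout and names below are hypothetical ), the producer could publish a small shared package that bundles the definition, the validator, and the schema path refs:
// index.js of a hypothetical shared package published by the producer team
const { SchemaValidator } = require('./asyncapi/schema-validator');

// Schema path refs, so consumers never hard-code JSON pointers.
const SchemaRefs = {
  Event: 'Event#/components/schemas/Event',
  OrderCreatedEvent: 'OrderCreatedEvent#/components/schemas/OrderCreatedEvent'
};

const Versions = { V1: 'v1' };

module.exports = { SchemaValidator, SchemaRefs, Versions };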