One of the features we built into Ambit is a powerful automation system that enables event-driven actions within the platform.
Automations are actions performed automatically when certain other actions take place (when A happens, then do B).
Here's how we built a simple, robust, and scalable automation system.
Requirements
First, let's identify the requirements.
The automation system should be:
- Resilient
We don't want an incomplete/half-finished automation state.
Automations can vary in length, duration, and complexity.
We need to guarantee that they complete, considering worst-case scenarios like the API crashing halfway through.
- Modular
Reduce redundant code and make it easy to add new step types and actions.
- Observable
The underlying state should be simple to understand and easily queried.
AWS
Starting with requirement #1, resilience: we'll achieve it by using serverless AWS components that scale on their own and require little to no maintenance.
The most important things to keep in mind:
- Automation execution should be asynchronous: it should happen in the background and shouldn't affect the request-response lifecycle of the initial request.
- If our API crashes right as an automation is kicked off, the automation should still run once the server is restored.
To achieve this, we'll build an event-driven system by combining three AWS products: SNS, SQS, and Lambda.
SNS
SNS is a fast and cost-effective notification or pub/sub system.
It lets us quickly and cheaply publish messages, and connect various subscribers.
It will have no problem processing large numbers of automations.
However, SNS on its own isn't enough to ensure resilience.
It doesn't retain messages after they're sent.
This means that if our server goes down in the middle of an automation run, the notification will try to reach the down server, fail, and be lost.
We can account for this by combining it with a queue.
SQS
SQS is a serverless queue.
It integrates nicely with SNS, storing notifications and retrying them in the event of failure.
It's a simple and cost-effective way of adding retry capability to our notifications.
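To make the difference concrete, here is a minimal in-memory model of queue-backed delivery. It is only a sketch: `RetryQueue` and its methods are hypothetical names and not the SQS API. It shows the one property we care about, which is that a message stays queued until a consumer handles it without throwing.

```typescript
// A toy model of queue-backed delivery. RetryQueue and its method names are
// hypothetical and only illustrate the retry behavior; this is not the SQS API.
type Handler<T> = (message: T) => void;

class RetryQueue<T> {
  private pending: T[] = [];

  enqueue(message: T): void {
    this.pending.push(message);
  }

  // Attempt delivery of every pending message once. Messages whose handler
  // throws stay in the queue so a later call can retry them.
  deliver(handler: Handler<T>): number {
    const failed: T[] = [];
    let delivered = 0;
    for (const message of this.pending) {
      try {
        handler(message);
        delivered += 1;
      } catch {
        failed.push(message);
      }
    }
    this.pending = failed;
    return delivered;
  }

  get size(): number {
    return this.pending.length;
  }
}

// Simulate an API that is down for the first delivery attempt.
let apiIsUp = false;
const received: string[] = [];
const retryQueue = new RetryQueue<string>();
retryQueue.enqueue("TAG_ASSIGN");

const callApi: Handler<string> = (message) => {
  if (!apiIsUp) throw new Error("API unavailable");
  received.push(message);
};

const firstAttempt = retryQueue.deliver(callApi); // API down: message is kept
apiIsUp = true;
const secondAttempt = retryQueue.deliver(callApi); // API restored: message is delivered
```

Without the queue, the first failed attempt would have been the end of the message.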
Lambda
Lambda is AWS's serverless function offering.
We'll create a Lambda function that subscribes to the queue and calls back to our API with notification data.
Setting up AWS
SNS
Connecting to SNS from the backend:
Wrapper around the AWS SDK:
import { ConfigService } from "@nestjs/config"; // assumes NestJS's ConfigService
import { PublishCommand, PublishCommandInput, SNSClient } from "@aws-sdk/client-sns";

class SnsService {
  client: SNSClient;

  constructor(private readonly configService: ConfigService) {
    this.client = new SNSClient({
      credentials: {
        accessKeyId: configService.get("AWS_ACCESS_KEY_ID"),
        secretAccessKey: configService.get("AWS_SECRET_ACCESS_KEY"),
      },
      region: configService.get("AWS_REGION"),
    });
  }

  // Publishes to the notifications topic; callers supply everything except the topic ARN.
  async publish(data: Omit<PublishCommandInput, "TopicArn">) {
    const response = await this.client.send(
      new PublishCommand({
        ...data,
        TopicArn: this.configService.get("AWS_SNS_NOTIFICATIONS_TOPIC_ARN"),
      })
    );
    return response;
  }
}
A nicer interface around the SNS client to publish notifications with typing support:
class NotificationsService {
  constructor(private readonly snsService: SnsService) {}

  async publish<T extends NotificationType>(
    workspaceId: string,
    type: T,
    data: NotificationPayloads[T]
  ) {
    // workspaceId and type travel as message attributes; the payload is the JSON body.
    return await this.snsService.publish({
      MessageAttributes: {
        workspaceId: {
          DataType: "String",
          StringValue: workspaceId,
        },
        type: {
          DataType: "String",
          StringValue: type,
        },
      },
      Message: JSON.stringify(data),
    });
  }
}
Now we can call it like this to publish to SNS.
await this.notificationsService.publish(
  workspaceId,
  "TAG_ASSIGN",
  {
    contactId: "ecf7f7fd-1de8-4b50-88ea-f688b300129a",
    tagId: "c1612cc1-dc1d-4d20-82e9-3079b1e5d61f"
  }
);
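The typing support comes from a lookup type that maps each notification type to its payload. The article doesn't show how `NotificationPayloads` is defined, so the shapes below are assumptions, and `buildMessage` is a hypothetical stand-in for `publish` that only builds the message so the typing can be tried without AWS.

```typescript
// Hypothetical payload shapes; the real definitions are not shown in this article.
interface TagAssignPayload {
  contactId: string;
  tagId: string;
}

interface AutomationProgressSuccessPayload {
  contactId: string;
  automationRunId: string;
}

// One entry per notification type; the key doubles as the type name.
interface NotificationPayloads {
  TAG_ASSIGN: TagAssignPayload;
  AUTOMATION_PROGRESS_SUCCESS: AutomationProgressSuccessPayload;
}

type NotificationType = keyof NotificationPayloads;

// Stand-in for NotificationsService.publish: same generic signature, no AWS call.
function buildMessage<T extends NotificationType>(
  workspaceId: string,
  type: T,
  data: NotificationPayloads[T],
): { attributes: { workspaceId: string; type: T }; body: string } {
  return { attributes: { workspaceId, type }, body: JSON.stringify(data) };
}

const tagMessage = buildMessage("workspace-1", "TAG_ASSIGN", {
  contactId: "ecf7f7fd-1de8-4b50-88ea-f688b300129a",
  tagId: "c1612cc1-dc1d-4d20-82e9-3079b1e5d61f",
});

// The compiler rejects a payload that does not match the type argument, e.g.:
// buildMessage("workspace-1", "TAG_ASSIGN", { automationRunId: "run-1" });
```

Because `T` is inferred from the `type` argument, the payload is checked against the matching entry at the call site.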
Lambda
The Lambda script is simple.
A batch of SQS messages is available at event.Records.
We'll just send them as-is to our backend and process them there.
export const handler = async (event) => {
  // Forward the whole SQS batch to the backend in a single request.
  const response = await fetch("https://myapp.example.com/notifications/track", {
    method: "POST",
    headers: {
      "Content-type": "application/json",
    },
    body: JSON.stringify(event.Records),
  });
  if (response.ok) {
    return response;
  } else {
    // Throwing marks the invocation as failed, so the batch is retried.
    throw new Error("An error occurred while processing batch.");
  }
};
- If the Lambda returns a value, the invocation is considered a success and the SQS batch will be deleted.
- If it throws an error, the invocation is considered a failure and the SQS batch will be retried.
We throw whenever the backend responds with an error code, and fetch itself rejects if the request fails at the network level, so in both cases the batch will be retried.
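On the backend, each forwarded record has to be unwrapped before it can be handled. The sketch below assumes the SQS subscription uses SNS's default (non-raw) delivery, in which case the record `body` is a JSON string containing the SNS envelope: the published message sits in `Message`, and attributes appear under `MessageAttributes` as `{ Type, Value }` pairs. `parseRecord` and the result shape are hypothetical names chosen to match the fields our notification handler accepts.

```typescript
// Minimal structural types covering only the fields used here.
interface SqsRecord {
  messageId: string;
  body: string; // JSON string holding the SNS envelope (non-raw delivery assumed)
}

interface SnsEnvelope {
  MessageId: string;
  Message: string; // the JSON string that was passed to PublishCommand
  MessageAttributes?: Record<string, { Type: string; Value: string }>;
}

interface ParsedNotification {
  workspaceId: string;
  type: string;
  payload: unknown;
  messageId: string;
}

// parseRecord is a hypothetical helper name.
function parseRecord(record: SqsRecord): ParsedNotification {
  const envelope = JSON.parse(record.body) as SnsEnvelope;
  const attributes = envelope.MessageAttributes ?? {};
  const workspaceId = attributes.workspaceId?.Value;
  const type = attributes.type?.Value;
  if (!workspaceId || !type) {
    throw new Error(`Record ${record.messageId} is missing required attributes.`);
  }
  return {
    workspaceId,
    type,
    payload: JSON.parse(envelope.Message),
    messageId: envelope.MessageId,
  };
}

// A hand-written record in the shape described above.
const sampleRecord: SqsRecord = {
  messageId: "sqs-message-1",
  body: JSON.stringify({
    Type: "Notification",
    MessageId: "sns-message-1",
    Message: JSON.stringify({ contactId: "contact-1", tagId: "tag-1" }),
    MessageAttributes: {
      workspaceId: { Type: "String", Value: "workspace-1" },
      type: { Type: "String", Value: "TAG_ASSIGN" },
    },
  }),
};

const parsedNotification = parseRecord(sampleRecord);
```

If raw message delivery is enabled on the subscription instead, the envelope is absent and the attributes arrive on the SQS record itself, so this helper would need to change.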
Putting it together
A high-level overview of how data will move through AWS.
Notification flow through AWS services
Automation flow
Now let's drive application actions via these cloud services.
An overview of the flow:
Automation logic flow
1. Business operations publish notifications to SNS.
2. SNS notifications are queued and re-emitted to the API.
3. When a notification is received, check for automations that should trigger.
4. If automation(s) are found, begin new automation runs, execute the first action, and emit an AUTOMATION_PROGRESS_SUCCESS notification.
5. When an AUTOMATION_PROGRESS_SUCCESS notification is received, execute the next action, and emit another AUTOMATION_PROGRESS_SUCCESS notification.
6. Repeat step 5 until the last action has been executed.
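The loop in steps 3 to 6 can be walked through with a small in-memory model. Everything here is illustrative: the `inbox` array stands in for SNS, SQS, and Lambda, and the names (`FlowNotification`, `executeNext`, `handleNotification`) are assumptions rather than the code used in Ambit.

```typescript
// In-memory walk-through of the notification loop. All names are illustrative.
type FlowNotification =
  | { type: "TAG_ASSIGN"; contactId: string }
  | { type: "AUTOMATION_PROGRESS_SUCCESS"; runId: number };

interface Run {
  id: number;
  contactId: string;
  nextActionIndex: number;
}

const actions = ["SEND_EMAIL", "TAG_REMOVE", "END"]; // actions of one automation
const inbox: FlowNotification[] = []; // stands in for SNS + SQS + Lambda
const runs: Run[] = [];
const executed: string[] = [];

function executeNext(run: Run): void {
  const action = actions[run.nextActionIndex];
  if (action === undefined) return; // nothing left: the run is complete
  executed.push(action);
  run.nextActionIndex += 1;
  // Each executed action emits a notification that drives the next one.
  inbox.push({ type: "AUTOMATION_PROGRESS_SUCCESS", runId: run.id });
}

function handleNotification(notification: FlowNotification): void {
  if (notification.type === "TAG_ASSIGN") {
    // A trigger matched: begin a run and execute its first action.
    const run: Run = {
      id: runs.length + 1,
      contactId: notification.contactId,
      nextActionIndex: 0,
    };
    runs.push(run);
    executeNext(run);
  } else {
    const run = runs.find((candidate) => candidate.id === notification.runId);
    if (run) executeNext(run);
  }
}

// One business event kicks off the whole chain.
inbox.push({ type: "TAG_ASSIGN", contactId: "contact-1" });
let processed = 0;
while (inbox.length > 0) {
  handleNotification(inbox.shift()!);
  processed += 1;
}
```

In this model the final `AUTOMATION_PROGRESS_SUCCESS` finds no remaining action and ends the chain, so one trigger plus three actions results in four handled notifications.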
Database schema
We'll define our business entities like this:
- Automations
Top-level organizational unit.
- Automation steps
Individual steps for each automation.
- Automation runs
Individual instances of when the automation was executed.
Automations
Table automations
Column name | Type | Description |
---|---|---|
id | uuid | Primary key. |
workspace_id | uuid | Foreign key on containing workspace. |
name | text | Descriptive front-end facing name. |
is_active | boolean | Toggle whether automation is able to be run. |
version | integer | Gets incremented when automation is re-built. |
created_at | datetime | Creation timestamp. |
updated_at | datetime | Last updated timestamp. |
Automation Steps
Table automation_steps
Stores linked steps within an automation.
Each automation has a single trigger, and one or more actions.
A step is a trigger if:
- parent_automation_step_id is null.
- trigger_type is defined.
A step is an action if:
- parent_automation_step_id is defined.
- trigger_type is null.
- action_type is defined.
Column name | Type | Description |
---|---|---|
id | uuid | Primary key. |
automation_id | uuid | Foreign key on containing automation. |
parent_automation_step_id | uuid | Recursive foreign key on the parent automation step. |
trigger_type | text | Defined if this step is the trigger, which kicks off the automation. Else null. |
action_type | text | Defined if this step is an action that performs work. Else null. |
inputs | jsonb | Parameters for the executed code. |
version | integer | At time of creation gets the value of the parent automation's version. |
index | integer | Tracks ordering of steps in chain. |
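The trigger and action rules above translate directly into predicates over a step row. The sketch below also shows one way to rebuild the step order by following parent links. It assumes a single linear chain (one child per step), and the function names are hypothetical.

```typescript
// Row shape mirroring the automation_steps columns used here.
interface AutomationStep {
  id: string;
  parent_automation_step_id: string | null;
  trigger_type: string | null;
  action_type: string | null;
}

function isTrigger(step: AutomationStep): boolean {
  return step.parent_automation_step_id === null && step.trigger_type !== null;
}

function isAction(step: AutomationStep): boolean {
  return (
    step.parent_automation_step_id !== null &&
    step.trigger_type === null &&
    step.action_type !== null
  );
}

// Walk the chain from the trigger by following parent links.
// Assumes a single linear chain; the length check guards against cycles.
function orderSteps(steps: AutomationStep[]): AutomationStep[] {
  const childByParent = new Map<string, AutomationStep>();
  for (const step of steps) {
    if (step.parent_automation_step_id !== null) {
      childByParent.set(step.parent_automation_step_id, step);
    }
  }
  const ordered: AutomationStep[] = [];
  let current = steps.find(isTrigger);
  while (current !== undefined && ordered.length < steps.length) {
    ordered.push(current);
    current = childByParent.get(current.id);
  }
  return ordered;
}

// Rows deliberately listed out of order.
const stepRows: AutomationStep[] = [
  { id: "email", parent_automation_step_id: "tag", trigger_type: null, action_type: "SEND_EMAIL" },
  { id: "trigger", parent_automation_step_id: null, trigger_type: "CONTACT_ADD", action_type: null },
  { id: "tag", parent_automation_step_id: "trigger", trigger_type: null, action_type: "TAG_ASSIGN" },
];

const orderedIds = orderSteps(stepRows).map((step) => step.id);
const actionCount = stepRows.filter(isAction).length;
```

The `index` column makes the same ordering available with a plain sort; the parent link is what lets a run find the step that follows the one it is currently on.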
Automation Runs
Table automation_runs
Column name | Type | Description |
---|---|---|
id | uuid | Primary key. |
contact_id | uuid | Foreign key on the contact that kicked off the automation. |
automation_id | uuid | Foreign key on the automation this run is for. |
automation_step_id | uuid | Foreign key for the automation step this run is currently on. |
status | text | Tracks run completion status. |
created_at | datetime | Creation timestamp. |
updated_at | datetime | Last updated timestamp. |
Writing some code
The backend logic is initiated when the Lambda callback endpoint is hit.
This is what the handler looks like:
async handle<T extends NotificationType>(data: {
  workspaceId: string;
  type: T;
  payload: NotificationPayloads[T];
  messageId: string;
}) {
  if (this.triggerExistsForNotificationType(data.type)) {
    // The notification can start automations: begin a run for each match.
    const triggerType = this.mapNotificationsToTriggers[data.type];
    const contactId = this.extractContactIdFromNotification[data.type](
      data.payload,
    );
    const openAutomationTriggers = await this.findAutomationsByTrigger(
      data.workspaceId,
      triggerType,
    );
    for (const automation of openAutomationTriggers) {
      await this.beginAutomationRun(
        data.workspaceId,
        contactId,
        automation.automation_id,
      );
    }
  } else {
    // Otherwise the notification may be driving a run that is already in progress.
    switch (data.type) {
      case "AUTOMATION_PROGRESS_SUCCESS":
        await this.executeAutomationRun(
          data.workspaceId,
          (data.payload as AutomationProgressSuccessPayload)
            .automationRunId,
        );
        break;
      default:
        break;
    }
  }
}
Notification handler
1. Map notification types to automation trigger types. Check if the incoming notification type maps to an automation trigger type.
2. If yes, retrieve the automation trigger type and extract the ID of the contact for whom the action was performed.
3. Find automations that are valid for starting a new run.
4. Begin a new automation run for each.
5. If the check in step 1 fails, check if the notification is driving an in-progress automation run.
6. If the notification is of type AUTOMATION_PROGRESS_SUCCESS, continue the in-progress automation run.
mapNotificationsToTriggers: Record<NotificationType, AutomationTriggerType> = {
  CONTACT_ADD: "CONTACT_ADD",
  TAG_ASSIGN: "TAG_ASSIGN",
  CONTACT_PIPELINE_MOVE: "STAGE_ENTER",
  CONTACT_PIPELINE_ENTER: "PIPELINE_ENTER",
  CONTACT_PIPELINE_EXIT: "PIPELINE_EXIT",
  CONTACT_PIPELINE_STAGE_EXIT: "STAGE_EXIT",
  FORM_SUBMISSION: "FORM_SUBMISSION",
};
Map notification types to trigger types
extractContactIdFromNotification: Record<
  NotificationType,
  (payload: NotificationPayloads[NotificationType]) => string
> = {
  TAG_ASSIGN: (payload: TagAssignPayload) => payload.contactId,
  CONTACT_ADD: (payload: ContactAddPayload) => payload.id,
  CONTACT_PIPELINE_ENTER: (payload: ContactPipelineEnterPayload) =>
    payload.contactId,
  CONTACT_PIPELINE_EXIT: (payload: ContactPipelineExitPayload) =>
    payload.contactId,
  CONTACT_PIPELINE_STAGE_EXIT: (payload: ContactPipelineStageExitPayload) =>
    payload.contactId,
  FORM_SUBMISSION: (payload: any) => payload.contactId,
  CONTACT_PIPELINE_STAGE_ENTER: (payload: ContactPipelineStageEnterPayload) =>
    payload.contactId,
  AUTOMATION_PROGRESS_SUCCESS: (payload: AutomationProgressSuccessPayload) =>
    payload.contactId,
};
Helper fns to extract contact ID from each notification payload type
Executing work
How do we actually execute code?
Let's take a look at the function executeAutomationRun.
But first...
There's one more database table I didn't mention.
Table automation_delays
Column name | Type | Description |
---|---|---|
id | uuid | Primary key. |
automation_run_id | uuid | Foreign key on automation run that created delay. |
automation_step_id | uuid | Foreign key on corresponding DELAY automation step. |
status | text | Delay status. |
triggers_at | datetime | When the delay should trigger. |
created_at | datetime | Creation timestamp. |
This table drives delay type automation steps.
These are automation steps that pause the automation for a period of time.
Simply set the triggers_at column for when the automation should be continued.
Use a cron job to periodically poll the database and resume the automation for delays after their trigger date.
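The polling step can be sketched as a pure function over delay rows. This is an in-memory illustration only: the status values `PENDING` and `TRIGGERED` and the function name `collectDueDelays` are assumptions, and against a real database the select and the status update would need to happen atomically so that two overlapping cron runs don't resume the same run twice.

```typescript
interface AutomationDelay {
  id: string;
  automation_run_id: string;
  status: "PENDING" | "TRIGGERED"; // hypothetical status values
  triggers_at: Date;
}

// Returns the IDs of the runs to resume and marks their delays as triggered,
// so the next poll does not pick them up again.
function collectDueDelays(delays: AutomationDelay[], now: Date): string[] {
  const runIds: string[] = [];
  for (const delay of delays) {
    const isDue = delay.triggers_at.getTime() <= now.getTime();
    if (delay.status === "PENDING" && isDue) {
      delay.status = "TRIGGERED";
      runIds.push(delay.automation_run_id);
    }
  }
  return runIds;
}

const delayRows: AutomationDelay[] = [
  { id: "d1", automation_run_id: "run-1", status: "PENDING", triggers_at: new Date("2024-01-01T00:00:00Z") },
  { id: "d2", automation_run_id: "run-2", status: "PENDING", triggers_at: new Date("2024-01-03T00:00:00Z") },
];

const pollTime = new Date("2024-01-02T00:00:00Z");
const firstPoll = collectDueDelays(delayRows, pollTime); // run-1 is due, run-2 is not
const secondPoll = collectDueDelays(delayRows, pollTime); // nothing new is due
```

Each returned run ID would then be resumed by advancing that run to the step after its DELAY step.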
Now let's take a look at the code for executeAutomationRun.
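async executeAutomationRun(workspaceId: string, automationRunId: string) {
  // findAutomationRun is a hypothetical helper standing in for the elided
  // lookup of the run (together with its current step) by ID.
  const automationRun = await this.findAutomationRun(
    workspaceId,
    automationRunId,
  );
  if (!automationRun) {
    throw new NotFoundException(
      "No in-progress automation instance was found with the supplied ID.",
    );
  }
  switch (automationRun.action_type) {
    case "DELAY":
      // The run pauses here and is resumed once the delay triggers.
      await this.createDelay(
        workspaceId,
        automationRun.automation_run_id,
        automationRun.inputs,
      );
      break;
    default:
      try {
        await this.executeAutomationStep(
          workspaceId,
          automationRun,
        );
      } catch (error) {
        // Errors are swallowed so a failed step does not block the run from advancing.
      }
      await this.advanceAutomationRun(
        workspaceId,
        automationRun,
      );
      break;
  }
}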
Executing an automation run
- Find the automation run with the supplied ID.
- Check its type.
- If it's a delay type automation, create a new automation delay to postpone the flow.
- Otherwise, execute the automation step's work and advance the automation run.
Command and Conquer
The last missing piece is a robust system for dynamically executing chunks of code.
The command pattern solves this for us.
It lets us store units of code execution as business entities.
Let's write the interface for a command.
interface ICommand<D, R> {
  execute(
    workspaceId: string,
    resourceId: string | undefined,
    data: D | undefined,
    metadata?: unknown
  ): Promise<R>;
  getType(): CommandType;
}
We'll implement this interface every time we create a new command.
Let's also create some command types:
const CommandTypeValues = [
  "CREATE_CONTACT",
  "ASSIGN_TAG",
  "UNASSIGN_TAG",
  "ADD_CONTACT_TO_PIPELINE",
  "MOVE_CONTACT_TO_STAGE",
  "REMOVE_CONTACT_FROM_PIPELINE",
  "SEND_EMAIL",
] as const;
type CommandType = (typeof CommandTypeValues)[number];
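One benefit of deriving the type from an `as const` array is that the list also exists at runtime, so values read from the database or a request can be validated against it. A minimal sketch, using a shortened list and a hypothetical `isCommandType` guard:

```typescript
// Shortened list for illustration; the pattern is the same for the full list.
const CommandTypeValues = ["CREATE_CONTACT", "ASSIGN_TAG", "SEND_EMAIL"] as const;
type CommandType = (typeof CommandTypeValues)[number];

// Type guard: narrows an arbitrary string to CommandType when it is in the list.
function isCommandType(value: string): value is CommandType {
  return (CommandTypeValues as readonly string[]).indexOf(value) !== -1;
}

const knownValue = isCommandType("SEND_EMAIL");
const unknownValue = isCommandType("DROP_TABLE");
```

A plain union type would give the same compile-time checking but nothing to iterate over at runtime.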
Up next is an example implementation for the CREATE_CONTACT command.
Notice we can perform side effects alongside the main business action.
This is how we'll publish the initial notifications that will kick off the automation process.
class CreateContactCommand implements ICommand<InsertContactData, Contact> {
  constructor(
    private readonly contactsService: ContactsService,
    // Injected so execute() can publish the CONTACT_ADD notification.
    private readonly notificationsService: NotificationsService,
  ) {}

  async execute(
    workspaceId: string,
    _resourceId: undefined,
    data: InsertContactData
  ) {
    const contact = await this.contactsService.create(workspaceId, data);
    try {
      await this.notificationsService.publish(
        workspaceId,
        "CONTACT_ADD",
        contact,
      );
      return contact;
    } catch (error) {
      // Roll back the contact if the notification could not be published.
      await this.contactsService.destroy(
        workspaceId,
        contact.id,
      );
      throw error;
    }
  }

  getType(): CommandType {
    return "CREATE_CONTACT";
  }
}
Command example implementation
To recap, the command pattern gives us several benefits:
- Standard interface for executing well-defined "actions" within our application.
- Can write complex logic, potentially combining methods from multiple services, into a single logical unit of work.
- Can add in additional side-effects when the command is run.
This is the service method we'll use to dispatch these commands:
async execute<D, R>(
  commandConstructor: Type<ICommand<D, R>>,
  workspaceId: string,
  data: ExecuteCommandData<D, R>,
): Promise<R> {
  const commandInstance: ICommand<D, R> =
    this.moduleRef.get(commandConstructor);
  if (!commandInstance)
    throw new Error(
      "Error: Requested command not found in module context.",
    );
  // Record the command before running it so failed attempts are also tracked.
  const commandRecord = await this.track(workspaceId, {
    commandType: commandInstance.getType(),
    issuerType: data.issuerType,
    issuerId: data.issuerId,
    resourceId: data.resourceId ?? null,
    data: data.data ?? null,
    metadata: data.metadata ?? null,
  });
  try {
    const commandResult = await commandInstance.execute(
      workspaceId,
      data.resourceId,
      data.data,
      data.metadata,
    );
    await this.update(workspaceId, commandRecord.id, {
      status: "COMPLETED",
    });
    return commandResult;
  } catch (error) {
    await this.update(workspaceId, commandRecord.id, {
      status: "FAILED",
      error: error,
    });
    throw error;
  }
}
CommandService::execute implementation
This function does four things:
- Grab an instance of the specified command.
- Create a new record in the commands table, pending execution.
- Attempt to run the command's work.
- Update the commands record to COMPLETED if successful, or FAILED if an error occurred (and re-throw the error).
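The same track, run, update sequence can be tried out without a framework or database. The sketch below is a simplified stand-in, not the service above: `InMemoryCommandService`, the reduced `Command` interface, and the `PENDING` status value are all assumptions made for illustration.

```typescript
type CommandStatus = "PENDING" | "COMPLETED" | "FAILED";

interface CommandRecord {
  id: number;
  commandType: string;
  status: CommandStatus;
  error?: string;
}

// Reduced command interface: no workspace, resource, or metadata arguments.
interface Command<D, R> {
  execute(data: D): Promise<R>;
  getType(): string;
}

class InMemoryCommandService {
  readonly records: CommandRecord[] = [];

  async execute<D, R>(command: Command<D, R>, data: D): Promise<R> {
    // Track the command first so that failures leave a record behind too.
    const record: CommandRecord = {
      id: this.records.length + 1,
      commandType: command.getType(),
      status: "PENDING",
    };
    this.records.push(record);
    try {
      const result = await command.execute(data);
      record.status = "COMPLETED";
      return result;
    } catch (error) {
      record.status = "FAILED";
      record.error = error instanceof Error ? error.message : String(error);
      throw error; // callers still see the original error
    }
  }
}

const echoCommand: Command<string, string> = {
  execute: async (data) => data.toUpperCase(),
  getType: () => "ECHO",
};

const failingCommand: Command<string, string> = {
  execute: async () => {
    throw new Error("boom");
  },
  getType: () => "FAIL",
};
```

Running `echoCommand` leaves a `COMPLETED` record and running `failingCommand` leaves a `FAILED` one, which is what makes the commands log usable as an audit trail.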
Finally, to execute a command, it looks like this:
await this.commandsService.execute(
  CreateContactCommand,
  workspaceId,
  {
    issuerType: "USER",
    issuerId: userId,
    data: createContactDto,
  },
);
This will run CreateContactCommand's execute function, return the value from it, and, as a side effect, log its execution.
Executing automation steps
We'll use commands to execute actions from automations.
This is the function executeAutomationStep that gets called from executeAutomationRun.
async executeAutomationStep(
  workspaceId: string,
  automationRun: AutomationRun & Automation & AutomationStep,
) {
  switch (automationRun.action_type) {
    case "TAG_ASSIGN":
      // Promise.all waits for every tag; Promise.any would resolve after the first success.
      return await Promise.all(
        automationRun.inputs.tagIds.map((tagId) =>
          this.commandsService.execute(
            AssignTagCommand,
            workspaceId,
            {
              issuerType: "AUTOMATION",
              issuerId: automationRun.automation_id,
              resourceId: automationRun.contact_id,
              data: { tagId },
            },
          ),
        ),
      );
    case "TAG_REMOVE":
      return await Promise.all(
        automationRun.inputs.tagIds.map((tagId) =>
          this.commandsService.execute(
            UnassignTagCommand,
            workspaceId,
            {
              issuerType: "AUTOMATION",
              issuerId: automationRun.automation_id,
              resourceId: automationRun.contact_id,
              data: { tagId },
            },
          ),
        ),
      );
    case "CONTACT_UPDATE":
      return await this.commandsService.execute(
        UpdateContactCommand,
        workspaceId,
        {
          issuerType: "AUTOMATION",
          issuerId: automationRun.automation_id,
          resourceId: automationRun.contact_id,
          data: automationRun.inputs,
        },
      );
    case "ADD_TO_PIPELINE_STAGE":
      return await this.commandsService.execute(
        AddContactToPipelineCommand,
        workspaceId,
        {
          issuerType: "AUTOMATION",
          issuerId: automationRun.automation_id,
          data: {
            contactId: automationRun.contact_id,
            pipelineStageId:
              automationRun.inputs.pipelineStageId,
          },
        },
      );
    case "REMOVE_FROM_PIPELINE":
      return await this.commandsService.execute(
        RemoveContactFromPipelineCommand,
        workspaceId,
        {
          issuerType: "AUTOMATION",
          issuerId: automationRun.automation_id,
          resourceId: automationRun.inputs.pipelineId,
          data: {
            contactId: automationRun.contact_id,
          },
        },
      );
    case "SEND_EMAIL":
      return await this.commandsService.execute(
        SendEmailCommand,
        workspaceId,
        {
          issuerType: "AUTOMATION",
          issuerId: automationRun.automation_id,
          resourceId: automationRun.inputs.emailId,
          data: { contactId: automationRun.contact_id },
        },
      );
    case "END":
      return;
    default:
      return;
  }
}
Executing automation steps
- Check the type of the current automation step.
- Execute a command for each type.
- Pass in the automation step's inputs as parameters to the command.
And we're done!
To add new step types, all we have to do is add cases to this switch statement.
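One optional refinement, which is a variation on the switch above and not what it currently does: replacing the silent `default: return;` with an exhaustiveness check makes the compiler flag any action type that has no case yet. A minimal sketch with a shortened, hypothetical `ActionType` union:

```typescript
// Shortened union for illustration.
type ActionType = "TAG_ASSIGN" | "SEND_EMAIL" | "END";

// Only reachable at runtime if a value slips past the type system.
function assertNever(value: never): never {
  throw new Error(`Unhandled action type: ${String(value)}`);
}

function describeAction(actionType: ActionType): string {
  switch (actionType) {
    case "TAG_ASSIGN":
      return "assign tags";
    case "SEND_EMAIL":
      return "send email";
    case "END":
      return "finish run";
    default:
      // If a member is added to ActionType without a matching case,
      // actionType is no longer `never` here and this line stops compiling.
      return assertNever(actionType);
  }
}

const endDescription = describeAction("END");
```

With this in place, adding a step type becomes a two-part change the compiler enforces: extend the union, then add the case.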
Querying
We've addressed requirements 1 and 2.
💡We solved the resiliency requirement by bringing in serverless AWS components that scale with usage and retry failed deliveries.
💡We solved the modularity requirement by using the command pattern and map structures to create a unified interface for executing automation work, with limited edge cases/conditionals.
💡The third requirement of observability is solved for us due to our database design!
The automation_runs table is useful for reporting.
Some example queries to run:
- Filter by contact ID to get a contact's history of automations.
- Filter by workspace ID and aggregate to summarize in-progress or completed automations.
- Additionally filter by status to find prior automation runs that were completed, or runs that are in progress.
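As a rough illustration of these reports, here are two of them expressed over rows already loaded from automation_runs. In practice they would be database queries; the function names and the status values `IN_PROGRESS` and `COMPLETED` are assumptions.

```typescript
// Subset of the automation_runs columns used by these reports.
interface AutomationRunRow {
  contact_id: string;
  automation_id: string;
  status: string;
}

// A contact's history of automation runs.
function runsForContact(rows: AutomationRunRow[], contactId: string): AutomationRunRow[] {
  return rows.filter((row) => row.contact_id === contactId);
}

// Number of runs per status, e.g. to summarize in-progress vs. completed runs.
function countByStatus(rows: AutomationRunRow[]): Record<string, number> {
  const counts: Record<string, number> = {};
  for (const row of rows) {
    counts[row.status] = (counts[row.status] ?? 0) + 1;
  }
  return counts;
}

const runRows: AutomationRunRow[] = [
  { contact_id: "contact-1", automation_id: "automation-1", status: "COMPLETED" },
  { contact_id: "contact-1", automation_id: "automation-2", status: "IN_PROGRESS" },
  { contact_id: "contact-2", automation_id: "automation-1", status: "COMPLETED" },
];

const contactHistory = runsForContact(runRows, "contact-1");
const statusSummary = countByStatus(runRows);
```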
Conclusion
Thank you for reading! I hope you enjoyed this deep dive.
In the interest of brevity, I only covered the core components in this article.
Some behavior I didn't mention is:
- Building automations from JSON data (to connect to a front-end builder, for example).
- Managing versioning, so that updates to automations don't interrupt existing runs.
- Aborting/purging in-progress automation runs.
Shoot me an email or DM me on LinkedIn if you have any questions or comments, or would just like to get in touch!