Brewing Coffee : Setting up remaining Activities

If you haven’t read the previous post about setting up the Payment Activity, I highly recommend checking that out first.
In that post, we covered Temporal Activities in detail.
Here, we’ll follow a similar structure to create three new services:
inventory-service
brewing-service
counter-service
Step 1 : inventory-service
Supplies the ingredients and materials (cups, coffee beans, sugar) needed for a coffee order.
Setup : Create the folder and files:
best-coffee-shop> mkdir inventory-service
best-coffee-shop> touch inventory-service/activity.py inventory-service/main.py inventory-service/temporal_config.py
file inventory-service/temporal_config.py
TEMPORAL_ADDRESS = 'temporal:7233'
TASK_QUEUE = 'inventory-service'
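If you later want to run a worker outside Docker, where the hostname temporal will not resolve, one option is to let environment variables override these values. The sketch below is only an illustration: the load_config helper and the environment variable names are assumptions, not part of the original code, and the defaults are the values used above.

```python
import os
from typing import Mapping


def load_config(env: Mapping[str, str] = os.environ) -> tuple[str, str]:
    """Return (address, task_queue), falling back to the values from this post.

    The variable names TEMPORAL_ADDRESS and TASK_QUEUE are hypothetical;
    nothing in the original setup defines them.
    """
    address = env.get("TEMPORAL_ADDRESS", "temporal:7233")
    task_queue = env.get("TASK_QUEUE", "inventory-service")
    return address, task_queue


# Same two names that main.py imports from temporal_config.
TEMPORAL_ADDRESS, TASK_QUEUE = load_config()
```

Because the names stay the same, main.py would not need to change.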
file inventory-service/activity.py
from temporalio import activity
from dataclasses import dataclass

@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1

@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int

CUP_FOR_COFFEE = 1
COFFEE_BEANS_PER_COFFEE = 25
SUGER_PER_COFFEE = 1

@activity.defn(name='GetSuppliesActivity')
async def get_supplies(order: CoffeeOrder) -> Supplies:
    no_of_coffee = order.quantity
    return Supplies(
        order.order_number,
        CUP_FOR_COFFEE * no_of_coffee,
        COFFEE_BEANS_PER_COFFEE * no_of_coffee,
        SUGER_PER_COFFEE * no_of_coffee
    )
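To see what the activity returns for a concrete order, here is the same arithmetic as a plain function, without the Temporal decorator, so it can be run on its own. The calculate_supplies name is just for this illustration; the constants and dataclasses are copied from the file above.

```python
from dataclasses import dataclass


@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1


@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int  # spelling kept so the field matches the services


CUP_FOR_COFFEE = 1
COFFEE_BEANS_PER_COFFEE = 25
SUGER_PER_COFFEE = 1


def calculate_supplies(order: CoffeeOrder) -> Supplies:
    """Same per-coffee multiplication as get_supplies (illustrative helper)."""
    n = order.quantity
    return Supplies(
        order.order_number,
        CUP_FOR_COFFEE * n,
        COFFEE_BEANS_PER_COFFEE * n,
        SUGER_PER_COFFEE * n,
    )


# An order for three coffees needs 3 cups, 75 coffee beans and 3 sugar.
supplies = calculate_supplies(CoffeeOrder("Aniruddha", "best-coffee-3", quantity=3))
print(supplies)
```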
file : inventory-service/main.py
import asyncio
from temporalio.worker import Worker
from temporalio.client import Client
from activity import get_supplies
from temporal_config import TEMPORAL_ADDRESS, TASK_QUEUE

async def main():
    print(TEMPORAL_ADDRESS)
    client = await Client.connect(TEMPORAL_ADDRESS)
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        activities=[get_supplies],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Step 2 : brewing-service
Takes the supplies and “brews” the coffee (virtually).
Setup : Create the folder and files:
best-coffee-shop> mkdir brewing-service
best-coffee-shop> touch brewing-service/activity.py brewing-service/main.py brewing-service/temporal_config.py
file brewing-service/temporal_config.py
TEMPORAL_ADDRESS = 'temporal:7233'
TASK_QUEUE = 'brewing-service'
file brewing-service/activity.py
from temporalio import activity
from dataclasses import dataclass

@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int

@dataclass
class ReadyCoffeeOrder:
    order_number: str
    num_of_coffees: int

@activity.defn(name='BrewCoffeeActivity')
async def brew_coffee(supplies: Supplies) -> ReadyCoffeeOrder:
    activity.logger.info("Brewing coffee with CoffeeBeans = {0} & sugar = {1}".format(supplies.coffee_beans, supplies.suger))
    activity.logger.info("Now filling coffee in {0} cups".format(supplies.cups))
    return ReadyCoffeeOrder(
        supplies.order_number,
        supplies.cups
    )
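Notice that brewing-service declares its own copy of Supplies instead of importing it from inventory-service. That works because the services only exchange serialized data, so each side needs a class with matching field names. The sketch below imitates that hand-off with the standard json module. It is a simplified illustration of the idea, not Temporal's actual data converter.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int


# inventory-service side: the activity result is turned into JSON.
sent = Supplies("best-coffee-3", cups=2, coffee_beans=50, suger=2)
wire_payload = json.dumps(asdict(sent))

# brewing-service side: its own Supplies class is rebuilt from the JSON fields.
received = Supplies(**json.loads(wire_payload))

print(wire_payload)
print(received)
```

If a field were renamed in one service only, the reconstruction on the other side would fail, so the copies have to be kept in sync by hand.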
file : brewing-service/main.py
import asyncio
from temporalio.worker import Worker
from temporalio.client import Client
from activity import brew_coffee
from temporal_config import TEMPORAL_ADDRESS, TASK_QUEUE

async def main():
    print(TEMPORAL_ADDRESS)
    client = await Client.connect(TEMPORAL_ADDRESS)
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        activities=[brew_coffee],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
Step 3 : counter-service
Manages customer interaction: requesting payment and serving coffee.
Please Note : In the next post, we will add REST interfaces so services can talk to customers over HTTP.
Setup : Create the folder and files:
best-coffee-shop> mkdir counter-service
best-coffee-shop> touch counter-service/main.py counter-service/activity.py counter-service/temporal_config.py
file counter-service/temporal_config.py
TEMPORAL_ADDRESS = 'temporal:7233'
TASK_QUEUE = 'counter-service'
file counter-service/activity.py
from temporalio import activity
from dataclasses import dataclass

@dataclass
class OrderBill:
    order_number: str
    amount: int

@dataclass
class BrewedCoffee:
    order_number: str
    num_of_coffees: int

@activity.defn(name='RequestPaymentActivity')
async def request_payment(bill: OrderBill) -> None:
    activity.logger.info("Hi please pay {0} to proceed".format(bill.amount))

@activity.defn(name='ServeCoffeeActivity')
async def serve_coffee(coffee: BrewedCoffee) -> None:
    activity.logger.info("Thank you, enjoy your coffee {0} ".format(coffee))
file counter-service/main.py
import asyncio
from temporalio.worker import Worker
from temporalio.client import Client
from activity import request_payment, serve_coffee
from temporal_config import TEMPORAL_ADDRESS, TASK_QUEUE

async def main():
    print(TEMPORAL_ADDRESS)
    client = await Client.connect(TEMPORAL_ADDRESS)
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        activities=[request_payment, serve_coffee],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
In the counter service we defined two activities: one to request payment and one to serve the coffee. We haven’t yet implemented an interface for customers to interact with; we will do that in the next post. For now, let's assume that invoking the activity is enough for the workflow to proceed.
Step 4 : update docker-compose.yml
Append the following to docker-compose.yml
.....
  inventory-service:
    container_name: inventory-service
    image: python:3
    depends_on:
      - temporal
    volumes:
      - ./inventory-service:/app
      - ./requirements.txt:/requirements.txt
      - ./init.sh:/init.sh
    entrypoint: sh -c "chmod -R 755 /app && chmod -R 755 /init.sh && sh /init.sh"
    networks:
      - best-coffee-network
  brewing-service:
    container_name: brewing-service
    image: python:3
    depends_on:
      - temporal
    volumes:
      - ./brewing-service:/app
      - ./requirements.txt:/requirements.txt
      - ./init.sh:/init.sh
    entrypoint: sh -c "chmod -R 755 /app && chmod -R 755 /init.sh && sh /init.sh"
    networks:
      - best-coffee-network
  counter-service:
    container_name: counter-service
    image: python:3
    depends_on:
      - temporal
    volumes:
      - ./counter-service:/app
      - ./requirements.txt:/requirements.txt
      - ./init.sh:/init.sh
    entrypoint: sh -c "chmod -R 755 /app && chmod -R 755 /init.sh && sh /init.sh"
    networks:
      - best-coffee-network
Step 5 : Updated Orchestrator Workflow
Now it's time to put the pieces together. Update the workflow to the following:
file orchestrator-service/workflow.py
from temporalio import workflow
from datetime import timedelta
from dataclasses import dataclass
from temporal_config import TASK_QUEUE

@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1

@dataclass
class OrderBill:
    order_number: str
    amount: int

@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int

@dataclass
class ReadyCoffeeOrder:
    order_number: str
    num_of_coffees: int

@workflow.defn(name="CoffeeOrderWorkflow")
class CoffeeOrderWorkflow:
    @workflow.run
    async def run(self, order: CoffeeOrder):
        # Calculate Payment
        payment_info: OrderBill = await workflow.execute_activity(
            "BillCalculationActivity",
            order,
            task_queue="payment-service",
            start_to_close_timeout=timedelta(seconds=15),
        )
        # Request Payment
        await workflow.execute_activity(
            "RequestPaymentActivity",
            payment_info,
            task_queue="counter-service",
            start_to_close_timeout=timedelta(seconds=15),
        )
        # Collect Supplies
        coffee_supplies: Supplies = await workflow.execute_activity(
            "GetSuppliesActivity",
            order,
            task_queue="inventory-service",
            start_to_close_timeout=timedelta(seconds=15),
        )
        # Brew Coffee
        ready_order: ReadyCoffeeOrder = await workflow.execute_activity(
            "BrewCoffeeActivity",
            coffee_supplies,
            task_queue="brewing-service",
            start_to_close_timeout=timedelta(seconds=15),
        )
        # Serve Coffee
        await workflow.execute_activity(
            "ServeCoffeeActivity",
            ready_order,
            task_queue="counter-service",
            start_to_close_timeout=timedelta(seconds=15),
        )
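To make the data flow through the five steps easier to follow, here is a local dry run that chains plain async functions in the same order as the workflow, with no Temporal server involved. Treat it as a sketch: PRICE_PER_COFFEE and the body of calculate_bill are made-up stand-ins, since the real bill calculation lives in payment-service from the previous post, and the two counter-service steps are reduced to print calls.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1


@dataclass
class OrderBill:
    order_number: str
    amount: int


@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int


@dataclass
class ReadyCoffeeOrder:
    order_number: str
    num_of_coffees: int


PRICE_PER_COFFEE = 5  # hypothetical price, not taken from payment-service


async def calculate_bill(order: CoffeeOrder) -> OrderBill:
    # Stand-in for BillCalculationActivity.
    return OrderBill(order.order_number, PRICE_PER_COFFEE * order.quantity)


async def get_supplies(order: CoffeeOrder) -> Supplies:
    # Same quantities per coffee as inventory-service: 1 cup, 25 beans, 1 sugar.
    n = order.quantity
    return Supplies(order.order_number, 1 * n, 25 * n, 1 * n)


async def brew_coffee(supplies: Supplies) -> ReadyCoffeeOrder:
    # One brewed coffee per cup, as in brewing-service.
    return ReadyCoffeeOrder(supplies.order_number, supplies.cups)


async def dry_run(order: CoffeeOrder) -> ReadyCoffeeOrder:
    bill = await calculate_bill(order)          # Calculate Payment
    print(f"Hi please pay {bill.amount} to proceed")  # Request Payment
    supplies = await get_supplies(order)        # Collect Supplies
    ready = await brew_coffee(supplies)         # Brew Coffee
    print(f"Thank you, enjoy your coffee {ready}")    # Serve Coffee
    return ready


ready_order = asyncio.run(dry_run(CoffeeOrder("Aniruddha", "best-coffee-3", quantity=2)))
```

In the real workflow each await is a call to workflow.execute_activity, which is what lets every step run in a different service and be retried independently.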
Step 6 : Try-it Out
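Clean up the old containers, then start everything again. Be aware that docker system prune -a -f removes all stopped containers, unused networks, unused images, and build cache on your machine, not only the ones from this project.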
best-coffee-shop> docker-compose down
best-coffee-shop> docker system prune -a -f
best-coffee-shop> docker-compose up
Visit http://localhost:8080 (Temporal UI), click Start Workflow, and provide the following values:
Workflow ID = best-coffee-3
Task Queue = best-coffee-orders
Workflow Type = CoffeeOrderWorkflow
Data = {"customer_name": "Aniruddha", "order_number": "best-coffee-3", "quantity": 1}
Encoding = json/plain
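If you would rather start the workflow from a script than from the UI, the same values map onto the Python SDK's Client.start_workflow call. This is a sketch under one important assumption: that the Temporal frontend port is reachable from your machine at localhost:7233, which depends on how the temporal service is published in your docker-compose.yml. The build_order and start_order helpers are names made up for this example.

```python
import asyncio


def build_order(customer_name: str, order_number: str, quantity: int = 1) -> dict:
    """Build the same JSON-shaped payload that is typed into the UI's Data field."""
    return {
        "customer_name": customer_name,
        "order_number": order_number,
        "quantity": quantity,
    }


async def start_order(order: dict, address: str = "localhost:7233") -> None:
    # Imported here so build_order can be tried without the SDK installed.
    from temporalio.client import Client

    # Assumption: the address above is where the Temporal server is exposed.
    client = await Client.connect(address)
    handle = await client.start_workflow(
        "CoffeeOrderWorkflow",           # Workflow Type
        order,                           # Data
        id=order["order_number"],        # Workflow ID
        task_queue="best-coffee-orders"  # Task Queue
    )
    await handle.result()  # wait until the coffee has been served
    print(f"Workflow {handle.id} completed")


order = build_order("Aniruddha", "best-coffee-3")
print(order)
```

With the containers running, calling asyncio.run(start_order(order)) should start the same workflow that the UI form does.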
Code for this post
You can find the code up to this point here 👉: Code repo
