Brewing Coffee : Setting up remaining Activities

Aniruddha

If you haven’t read the previous post about setting up the Payment Activity, I highly recommend checking that out first.

In that post, we covered Temporal Activities in detail.

Here, we’ll follow a similar structure to create three new services:

  • inventory-service

  • brewing-service

  • counter-service

Step 1 : inventory-service

Supplies the ingredients and materials (cups, coffee beans, sugar) needed for a coffee order.

Setup : Create the folder and files:

best-coffee-shop> mkdir inventory-service
best-coffee-shop> touch inventory-service/activity.py inventory-service/main.py inventory-service/temporal_config.py

file inventory-service/temporal_config.py

TEMPORAL_ADDRESS = 'temporal:7233'
TASK_QUEUE = 'inventory-service'
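
The hostname temporal only resolves inside the Docker network, so a worker started directly on your machine needs a different address. A minimal sketch, assuming you want to override these values through environment variables (the variable names TEMPORAL_ADDRESS and TASK_QUEUE and the helper load_config are hypothetical, not part of the original code), with the values above as defaults:

```python
import os


def load_config(env=os.environ):
    """Return (address, task_queue), preferring environment overrides."""
    # The environment variable names are assumptions made for this sketch.
    address = env.get("TEMPORAL_ADDRESS", "temporal:7233")
    task_queue = env.get("TASK_QUEUE", "inventory-service")
    return address, task_queue


# Same names as in temporal_config.py, so main.py can import them unchanged.
TEMPORAL_ADDRESS, TASK_QUEUE = load_config()
```

With no variables set, the worker behaves exactly as before.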

file inventory-service/activity.py

from temporalio import activity
from dataclasses import dataclass

@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1

@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int

CUP_FOR_COFFEE = 1
COFFEE_BEANS_PER_COFFEE = 25
SUGER_PER_COFFEE = 1

@activity.defn(name='GetSuppliesActivity')
async def get_supplies(order: CoffeeOrder) -> Supplies:
    no_of_coffee = order.quantity
    return Supplies(
        order.order_number,
        CUP_FOR_COFFEE * no_of_coffee,
        COFFEE_BEANS_PER_COFFEE * no_of_coffee,
        SUGER_PER_COFFEE * no_of_coffee
    )

file inventory-service/main.py

import asyncio
from temporalio.worker import Worker
from temporalio.client import Client
from activity import get_supplies
from temporal_config import TEMPORAL_ADDRESS, TASK_QUEUE

async def main():
    print(TEMPORAL_ADDRESS)
    client = await Client.connect(TEMPORAL_ADDRESS)
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        activities=[get_supplies],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Step 2 : brewing-service

Takes the supplies and “brews” the coffee (virtually).

Setup : Create the folder and files:

best-coffee-shop> mkdir brewing-service
best-coffee-shop> touch brewing-service/activity.py brewing-service/main.py brewing-service/temporal_config.py

file brewing-service/temporal_config.py

TEMPORAL_ADDRESS = 'temporal:7233'
TASK_QUEUE = 'brewing-service'

file brewing-service/activity.py

from temporalio import activity
from dataclasses import dataclass

@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int

@dataclass
class ReadyCoffeeOrder:
    order_number: str
    num_of_coffees: int


@activity.defn(name='BrewCoffeeActivity')
async def brew_coffee(supplies: Supplies) -> ReadyCoffeeOrder:
    activity.logger.info("Brewing coffee with CoffeeBeans = {0} & sugar = {1}".format(supplies.coffee_beans, supplies.suger))
    activity.logger.info("Now filling coffee in {0} cups".format(supplies.cups))

    return ReadyCoffeeOrder(
        supplies.order_number,
        supplies.cups
    )
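
Notice that brewing-service declares its own copy of Supplies instead of importing it from inventory-service. This works because the two services only exchange serialized data (JSON, with Temporal's default data converter), so what matters is that the field names match. The sketch below imitates that hand-off with the standard json module; it is not the SDK's actual converter, and the class names InventorySupplies and BrewingSupplies are hypothetical stand-ins for the two Supplies definitions.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class InventorySupplies:  # stands in for Supplies in inventory-service
    order_number: str
    cups: int
    coffee_beans: int
    suger: int


@dataclass
class BrewingSupplies:  # stands in for Supplies in brewing-service
    order_number: str
    cups: int
    coffee_beans: int
    suger: int


# inventory-service side: the result leaves the process as JSON text.
sent = InventorySupplies("best-coffee-3", 2, 50, 2)
payload = json.dumps(asdict(sent))

# brewing-service side: the JSON is rebuilt into the local dataclass.
received = BrewingSupplies(**json.loads(payload))
print(received.coffee_beans, received.suger)  # 50 2
```

If one service renamed a field (for example suger to sugar) and the other did not, the rebuild step would fail, which is why the misspelled field name has to stay consistent everywhere.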

file brewing-service/main.py

import asyncio
from temporalio.worker import Worker
from temporalio.client import Client
from activity import brew_coffee
from temporal_config import TEMPORAL_ADDRESS, TASK_QUEUE

async def main():
    print(TEMPORAL_ADDRESS)
    client = await Client.connect(TEMPORAL_ADDRESS)
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        activities=[brew_coffee],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Step 3 : counter-service

Manages customer interaction: requesting payment and serving coffee.

Note : In the next post, we will add REST interfaces so the services can talk to customers over HTTP.

Setup : Create the folder and files:

best-coffee-shop> mkdir counter-service
best-coffee-shop> touch counter-service/main.py counter-service/activity.py counter-service/temporal_config.py

file counter-service/temporal_config.py

TEMPORAL_ADDRESS = 'temporal:7233'
TASK_QUEUE = 'counter-service'

file counter-service/activity.py

from temporalio import activity
from dataclasses import dataclass

@dataclass
class OrderBill:
    order_number: str
    amount: int

@dataclass
class BrewedCoffee:
    order_number: str
    num_of_coffees: int

@activity.defn(name='RequestPaymentActivity')
async def request_payment(bill: OrderBill) -> None:
    # Placeholder for the real customer interaction: just log the amount due.
    activity.logger.info("Hi please pay {0} to proceed".format(bill.amount))

@activity.defn(name='ServeCoffeeActivity')
async def serve_coffee(coffee: BrewedCoffee) -> None:
    activity.logger.info("Thank you, enjoy your coffee {0}".format(coffee))

file counter-service/main.py

import asyncio
from temporalio.worker import Worker
from temporalio.client import Client
from activity import request_payment, serve_coffee
from temporal_config import TEMPORAL_ADDRESS, TASK_QUEUE

async def main():
    print(TEMPORAL_ADDRESS)
    client = await Client.connect(TEMPORAL_ADDRESS)
    worker = Worker(
        client,
        task_queue=TASK_QUEUE,
        activities=[request_payment, serve_coffee],
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

In counter-service we defined two activities: one to request payment and one to serve coffee. We haven’t implemented an interface for customers to interact with yet; we will do that in the next post. For now, let's assume that invoking the activity is enough for the workflow to proceed.

Step 4 : update docker-compose.yml

Append the following contents to docker-compose.yml:

  .....
  inventory-service:
    container_name: inventory-service
    image: python:3
    depends_on:
      - temporal
    volumes:
      - ./inventory-service:/app
      - ./requirements.txt:/requirements.txt
      - ./init.sh:/init.sh
    entrypoint: sh -c "chmod -R 755 /app && chmod -R 755 /init.sh && sh /init.sh"
    networks:
      - best-coffee-network
  brewing-service:
    container_name: brewing-service
    image: python:3
    depends_on:
      - temporal
    volumes:
      - ./brewing-service:/app
      - ./requirements.txt:/requirements.txt
      - ./init.sh:/init.sh
    entrypoint: sh -c "chmod -R 755 /app && chmod -R 755 /init.sh && sh /init.sh"
    networks:
      - best-coffee-network
  counter-service:
    container_name: counter-service
    image: python:3
    depends_on:
      - temporal
    volumes:
      - ./counter-service:/app
      - ./requirements.txt:/requirements.txt
      - ./init.sh:/init.sh
    entrypoint: sh -c "chmod -R 755 /app && chmod -R 755 /init.sh && sh /init.sh"
    networks:
      - best-coffee-network

Step 5 : Updated Orchestrator Workflow

Now it's time to put the pieces together. Update the workflow to the following:

file orchestrator-service/workflow.py

from temporalio import workflow
from datetime import timedelta
from dataclasses import dataclass
from temporal_config import TASK_QUEUE

@dataclass  
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1

@dataclass
class OrderBill:
    order_number: str
    amount: int

@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int

@dataclass
class ReadyCoffeeOrder:
    order_number: str
    num_of_coffees: int

@workflow.defn(name="CoffeeOrderWorkflow")
class CoffeeOrderWorkflow:
    @workflow.run
    async def run(self, order: CoffeeOrder):
        # Activities are invoked by name, so result_type tells Temporal which
        # dataclass to rebuild from the JSON result (otherwise it is a plain dict).
        timeout = timedelta(seconds=15)

        # Calculate Payment
        payment_info: OrderBill = await workflow.execute_activity(
            "BillCalculationActivity", order, task_queue="payment-service",
            result_type=OrderBill, start_to_close_timeout=timeout)

        # Request Payment
        await workflow.execute_activity(
            "RequestPaymentActivity", payment_info, task_queue="counter-service",
            start_to_close_timeout=timeout)

        # Collect Supplies
        coffee_supplies: Supplies = await workflow.execute_activity(
            "GetSuppliesActivity", order, task_queue="inventory-service",
            result_type=Supplies, start_to_close_timeout=timeout)

        # Brew Coffee
        ready_order: ReadyCoffeeOrder = await workflow.execute_activity(
            "BrewCoffeeActivity", coffee_supplies, task_queue="brewing-service",
            result_type=ReadyCoffeeOrder, start_to_close_timeout=timeout)

        # Serve Coffee
        await workflow.execute_activity(
            "ServeCoffeeActivity", ready_order, task_queue="counter-service",
            start_to_close_timeout=timeout)
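
To follow how data moves through these five steps without starting any containers, here is a rough local simulation. It is only a sketch: plain async functions stand in for the real activities, nothing is sent through Temporal, and PRICE_PER_COFFEE is a made-up value because the real billing logic lives in payment-service.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1


@dataclass
class OrderBill:
    order_number: str
    amount: int


@dataclass
class Supplies:
    order_number: str
    cups: int
    coffee_beans: int
    suger: int


@dataclass
class ReadyCoffeeOrder:
    order_number: str
    num_of_coffees: int


PRICE_PER_COFFEE = 5  # hypothetical price, only for this simulation
steps = []  # records the order in which the stand-in activities run


async def calculate_bill(order: CoffeeOrder) -> OrderBill:
    steps.append("BillCalculationActivity")
    return OrderBill(order.order_number, PRICE_PER_COFFEE * order.quantity)


async def request_payment(bill: OrderBill) -> None:
    steps.append("RequestPaymentActivity")


async def get_supplies(order: CoffeeOrder) -> Supplies:
    steps.append("GetSuppliesActivity")
    return Supplies(order.order_number, order.quantity, 25 * order.quantity, order.quantity)


async def brew_coffee(supplies: Supplies) -> ReadyCoffeeOrder:
    steps.append("BrewCoffeeActivity")
    return ReadyCoffeeOrder(supplies.order_number, supplies.cups)


async def serve_coffee(ready: ReadyCoffeeOrder) -> None:
    steps.append("ServeCoffeeActivity")


async def run_order(order: CoffeeOrder) -> ReadyCoffeeOrder:
    # Same sequence as CoffeeOrderWorkflow.run, minus Temporal.
    bill = await calculate_bill(order)
    await request_payment(bill)
    supplies = await get_supplies(order)
    ready = await brew_coffee(supplies)
    await serve_coffee(ready)
    return ready


result = asyncio.run(run_order(CoffeeOrder("Aniruddha", "best-coffee-3", 2)))
print(steps)
print(result)
```

The real workflow adds what this simulation lacks: each step runs on its own task queue in a separate service, with its own timeout and Temporal's retry handling.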

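Step 6 : Try it out

Clean up the old containers, then start everything again. Be aware that docker system prune -a -f removes all stopped containers, unused networks and unused images on your machine, not only the ones from this project.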

best-coffee-shop> docker-compose down
best-coffee-shop> docker system prune -a -f
best-coffee-shop> docker-compose up

Open the Temporal UI at http://localhost:8080, click Start Workflow, and provide the following:

Workflow ID     = best-coffee-3
Task Queue      = best-coffee-orders
Workflow Type   = CoffeeOrderWorkflow
Data            = {"customer_name": "Aniruddha", "order_number": "best-coffee-3", "quantity": 1}
Encoding        = json/plain
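
If you would rather start the workflow from code than from the UI, the sketch below sends the same values through the Temporal Python client. It assumes the Temporal frontend port 7233 is published to your host as localhost:7233 (adjust the address if your docker-compose.yml differs); start_request and start_order are hypothetical helper names introduced for this example.

```python
from dataclasses import asdict, dataclass


@dataclass
class CoffeeOrder:
    customer_name: str
    order_number: str
    quantity: int = 1


def start_request(order: CoffeeOrder) -> dict:
    """Collect the same fields the Start Workflow form asks for."""
    return {
        "workflow": "CoffeeOrderWorkflow",
        "arg": asdict(order),  # sent as JSON, like the Data field in the UI
        "id": order.order_number,
        "task_queue": "best-coffee-orders",
    }


async def start_order(order: CoffeeOrder, address: str = "localhost:7233") -> str:
    # Imported here so start_request can be used without the SDK installed.
    from temporalio.client import Client

    request = start_request(order)
    # The address is an assumption; inside the Docker network it is temporal:7233.
    client = await Client.connect(address)
    handle = await client.start_workflow(
        request["workflow"],
        request["arg"],
        id=request["id"],
        task_queue=request["task_queue"],
    )
    return handle.id
```

With the containers running, call it with asyncio.run(start_order(CoffeeOrder("Aniruddha", "best-coffee-3"))) and then look up the workflow in the Temporal UI.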

Code for this post

You can find the code up to this point here 👉: Code repo
