How to Integrate Your Services Using GCP Pub-Sub

jorzel
Sep 23, 2024

Pub-Sub (Publish-subscribe) is a great pattern to integrate components in a distributed system. It provides decoupling, allowing services to communicate without depending on each other directly. It enhances resiliency by ensuring reliable message delivery, even in the event of service disruptions or failures.

In this post, I will show you how to use Google Cloud (GCP) Pub-Sub to integrate two services written in Python. The post covers the GCP cloud configuration and the deployment of the services using a simple Docker Compose setup.

About GCP Pub-Sub

Google Cloud Pub-Sub is a managed messaging service for implementing event-driven architectures in distributed systems. It is highly scalable and lets components communicate asynchronously without depending on each other directly.

How does it work?

  1. A component (service or application) can publish messages to a specific topic (a channel where messages are sent).

  2. These messages can then be consumed by other services (subscribers) that are listening to that topic.

However, there’s an important distinction between subscriptions and subscribers that makes this solution highly scalable. A subscription is a named resource representing the stream of messages from a topic to be consumed by subscribers. A subscriber is a client (like a service instance) that pulls or receives messages from a subscription. If you have multiple instances of the same service (for instance, running as replicas), each instance acts as a subscriber.

Suppose Service P publishes messages to Topic X and you want Service A (which runs 2 replicas) to subscribe to those messages. In that case, you create Subscription A on Topic X, and each replica of Service A acts as a subscriber to Subscription A. Pub/Sub distributes the messages of a subscription across its subscribers, so each message is normally handled by only one replica (delivery is at-least-once by default, so occasional duplicates are possible). If you need another Service B to receive Topic X messages, you must create a new subscription.
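
The relationship between topics, subscriptions, and subscribers can be sketched with a toy in-memory model. This is not the Pub/Sub client: the Topic class, the round-robin rotation, and the subscription names are assumptions made only for illustration, and real Pub/Sub does not promise a strict round-robin order.

```python
from collections import defaultdict
from itertools import cycle


class Topic:
    """Toy in-memory model of a topic; NOT the real Pub/Sub client."""

    def __init__(self):
        # subscription name -> list of subscriber callbacks
        self._subscriptions = defaultdict(list)
        # subscription name -> endless rotation over its subscribers
        self._rotation = {}

    def subscribe(self, subscription, callback):
        self._subscriptions[subscription].append(callback)
        # Rebuild the rotation so the new subscriber takes part in delivery.
        self._rotation[subscription] = cycle(self._subscriptions[subscription])

    def publish(self, message):
        # Every subscription sees the message,
        # but only one subscriber per subscription handles it.
        for subscription in self._subscriptions:
            next(self._rotation[subscription])(message)


received = defaultdict(list)
topic = Topic()
topic.subscribe("subscription-a", lambda m: received["a-replica-1"].append(m))
topic.subscribe("subscription-a", lambda m: received["a-replica-2"].append(m))
topic.subscribe("subscription-b", lambda m: received["b-replica-1"].append(m))

topic.publish("reservation-1")
topic.publish("reservation-2")
```

In this model the two replicas of Service A split the messages between them, while the single subscriber of Service B sees both.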

Solution Architecture

Let's consider a reservation system to show how this solution works with GCP Pub-Sub. The Reservation Service publishes a message when a new reservation is made. The Reporting Service subscribes to the topic and consumes the reservation messages to generate reports or update analytics. It can run as several replicas to handle increased load or to provide redundancy.

GCP project

To create a pub-sub instance, we need a project in GCP. We can create a project in GCP by following the steps below (I assume that you have a Google Account associated with your @gmail.com mail address).

  1. Go to: https://console.cloud.google.com/.

  2. Click the project selector in the top-left corner.

  3. Choose New project.

  4. Specify the project name and location and click Create.

We have our project created!

If you are worried about the cost of cloud services, you can use the GCP free tier or the free trial period. I am currently on a 90-day free trial.

Messages Dispatching

Our project is ready. The next thing we need to do is to create a pub-sub topic. We will use it to send messages to the cloud.

  1. Using the console search bar, navigate to the Pub/Sub section.

  2. Click the Create topic button.

  3. Define the topic name. The name should be meaningful and properly represent the messages published on the topic. I chose reservation-created. We also need a subscription related to the topic, but we will tackle it in the Messages Consumption section. For our simple example, the other options (retention, schema, etc.) can be omitted.
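
The same topic could also be created from code instead of the console. The following is a minimal sketch, assuming the google-cloud-pubsub package is installed and the credentials in use are allowed to create topics (the publisher role alone is not enough). The function names create_topic and topic_resource_name are hypothetical helpers, not part of the library.

```python
def topic_resource_name(project_id: str, topic_id: str) -> str:
    """Build the fully qualified topic name used by the Pub/Sub API."""
    return f"projects/{project_id}/topics/{topic_id}"


def create_topic(project_id: str, topic_id: str) -> str:
    """Create a topic and return its resource name (hypothetical helper)."""
    # Imported lazily so this module loads even without the package installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic = publisher.create_topic(
        request={"name": topic_resource_name(project_id, topic_id)}
    )
    return topic.name


# Example call (needs valid credentials, so it is left commented out):
# create_topic("gcp-pubsub-guide", "reservation-created")
```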

Now we need the publisher code. We will use the google-cloud-pubsub package to create a GCPPublisherClient.

# reservations-service/app.py
import json
from datetime import datetime

import structlog
from google.cloud import pubsub_v1


class ReservationCreatedEvent:
    def __init__(self, reservation_id):
        self.reservation_id = reservation_id

    def to_dict(self):
        return {
            "time": datetime.now().isoformat(),
            "type": "reservation_created",
            "data": {
                "reservation_id": self.reservation_id
            }
        }

class GCPPublisherClient:
    def __init__(self, google_cloud_project, topic_id):
        """
            :param google_cloud_project: Google project ID
            :param topic_id: Topic ID
        """
        self._pub_client = pubsub_v1.PublisherClient()
        self._pub_topic = self._pub_client.topic_path(google_cloud_project, topic_id)
        self._logger = structlog.get_logger()

    def publish(self, event):
        event_payload = json.dumps(event.to_dict()).encode("utf-8")
        self._logger.info("Publishing event", data=event_payload)
        future = self._pub_client.publish(
            self._pub_topic, data=event_payload,
        )
        # Block until Pub/Sub confirms the publish; result() returns the message ID
        r = future.result()
        self._logger.info("Message published successfully", r=r)

The ReservationCreatedEvent class represents the event we want to send. It has a reservation_id attribute and, when serialized, a timestamp of when the event was created.

In the publisher code, we used future.result() to block until Pub/Sub confirms that the message was published. The call returns the ID of the published message, which we log.
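
Waiting on the future without a limit can stall a request handler if Pub/Sub is unreachable. Below is a small sketch of a bounded wait. It assumes the future returned by publish behaves like a concurrent.futures.Future, which is my understanding of recent google-cloud-pubsub releases. The publish_and_wait helper and the FakePublisher stand-in are hypothetical and exist only so the snippet runs locally.

```python
from concurrent.futures import Future, TimeoutError as FutureTimeoutError


def publish_and_wait(publisher, topic_path, payload: bytes, timeout: float = 10.0):
    """Publish payload and return the message ID, or None if the wait timed out."""
    future = publisher.publish(topic_path, data=payload)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        return None


class FakePublisher:
    """Local stand-in for pubsub_v1.PublisherClient (assumption, for the demo only)."""

    def publish(self, topic_path, data):
        future = Future()
        future.set_result("fake-message-id")  # pretend the publish succeeded
        return future


message_id = publish_and_wait(FakePublisher(), "projects/p/topics/t", b"{}")
```

With the real client, the first argument would be the pubsub_v1.PublisherClient instance, and a None result could be mapped to an error response in the endpoint.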

# reservations-service/app.py

import json
import os
import uuid

import structlog
from flask import Flask

PROJECT_ID = os.environ.get('PROJECT_ID')
TOPIC_ID = os.getenv('TOPIC_ID')

app = Flask(__name__)

@app.route('/reservations', methods=['POST'])
def create_reservation():
    publisher_client = GCPPublisherClient(PROJECT_ID, TOPIC_ID)

    # some logic checking if reservation can be created

    event = ReservationCreatedEvent(str(uuid.uuid4()))
    publisher_client.publish(event)
    return "Reservation created", 201

We exposed an API to create reservations using the Flask framework. The reservation-created event is sent at the end of the POST /reservations handler. The endpoint is simplified to focus only on the pub-sub integration part.

The docker-compose.yml file is used to define the service configuration and deployment.

# docker-compose.yml

version: "3.7"

services:
  reservations-service:
    image: reservations-service:v0
    ports:
      - "8080:8080"
    environment:
      - TOPIC_ID=reservation-created
      - PROJECT_ID=gcp-pubsub-guide

We exposed port 8080 to make HTTP requests to the service.

The reservations-service image is defined in a Dockerfile:

FROM python:3.11-slim-bookworm
COPY app.py requirements.txt /
RUN pip install -r requirements.txt
EXPOSE 8080
CMD ["flask", "run", "-h", "0.0.0.0", "-p", "8080"]

It is built with the following command (run within the reservations-service directory):

$ docker build -t reservations-service:v0 ./

The service can be started by running the docker compose up command in the terminal:

$ docker compose up
[+] Running 1/1
 ✔ Container gcp-pubsub-reservations-service-1  Recreated                                                                                                                            10.3s
Attaching to reservations-service-1
reservations-service-1  |  * Debug mode: off
reservations-service-1  | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
reservations-service-1  |  * Running on all addresses (0.0.0.0)
reservations-service-1  |  * Running on http://127.0.0.1:8080
reservations-service-1  |  * Running on http://172.27.0.2:8080
reservations-service-1  | Press CTRL+C to quit

To test the integration, we can call the http://localhost:8080/reservations endpoint:

$ curl -X POST http://127.0.0.1:8080/reservations

Instead of a success message, we should receive an error:

google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found. 
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.

At this stage, our service has no credentials to publish messages to the GCP topic. We need to provide them.

Service Authentication

The easiest way to authenticate services in GCP is to use a Service Account. Service accounts provide a secure and manageable way for services or applications to authenticate with GCP and access other resources.

A service account is a special kind of account that belongs to your GCP project, not a specific user. It allows services, like virtual machines (VMs), applications, or containers, to authenticate and interact with GCP resources (e.g., Pub-Sub, Cloud Storage, Cloud Functions).

  1. We need to navigate to the service accounts section in Google Console.

  2. Click Create Service Account.

  3. Define the Service Account name.

  4. Specify the Service Account roles (Pub/Sub Publisher and Pub/Sub Viewer in our case).

  5. Select the newly created Service Account and open the Keys tab.

  6. Create and save the credentials JSON file for the Service Account. We stored it in the reservations-service directory on our disk.

Now we need to pass the credentials to our reservations-service app. We can use the GOOGLE_APPLICATION_CREDENTIALS environment variable to provide the location of the credentials JSON file (see docs). Google client libraries, including pubsub_v1.PublisherClient, check this variable when looking for Application Default Credentials.
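
A misconfigured path only surfaces on the first request, so it can help to check the variable at startup. The sketch below assumes the variable points to a service account key file as in this post (other credential types are also allowed by Google, and this check would reject them). The check_credentials_file helper is hypothetical.

```python
import json
import os


def check_credentials_file(environ=None) -> str:
    """Validate the key file referenced by GOOGLE_APPLICATION_CREDENTIALS.

    Returns the service account e-mail, or raises RuntimeError with a hint.
    """
    environ = os.environ if environ is None else environ
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    if not os.path.isfile(path):
        raise RuntimeError(f"Credentials file not found: {path}")
    with open(path) as f:
        info = json.load(f)
    # Service account key files carry these two fields.
    if info.get("type") != "service_account" or "client_email" not in info:
        raise RuntimeError(f"{path} is not a service account key file")
    return info["client_email"]
```

Calling check_credentials_file() before creating the Flask app would turn a missing volume mount into an immediate, readable startup error.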

We inject the GOOGLE_APPLICATION_CREDENTIALS environment variable into reservations-service and mount the credentials.json file into the container:

# docker-compose.yml

services:
  reservations-service:
    image: reservations-service:v0
    ports:
      - "8080:8080"
    environment:
      - TOPIC_ID=reservation-created
      - PROJECT_ID=gcp-pubsub-guide
      - GOOGLE_APPLICATION_CREDENTIALS=/credentials.json
    volumes:
      - ./reservations-service/credentials.json:/credentials.json

It is also possible to pass credentials explicitly to GCP clients. We will present this option in the Messages Consumption section, where we need to authenticate GCP subscribers.

Now, if we repeat the test, we should get the following logs:

reservations-service-1  | 2024-09-23 11:14:33 [info     ] Publishing event               data=b'{"time": "2024-09-23T11:14:33.723921", "type": "reservation_created", "data": {"reservation_id": "1336dff2-2ba4-4fae-9980-008aaaea31b1"}}'
reservations-service-1  | 2024-09-23 11:14:34 [info     ] Message published successfully r=11831900815790074
reservations-service-1  | 192.168.65.1 - - [23/Sep/2024 11:14:34] "POST /reservations HTTP/1.1" 201 -

Messages Consumption

We need another service that can consume the messages published to the reservation-created topic. Before building reporting-service, we have to create a subscription that will pull messages from that topic.

  1. Navigate to the Subscriptions section.

  2. Click Create Subscription.

  3. Define the Subscription name and connect it to the reservation-created topic. The remaining configuration options can be left at their defaults.
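
The console steps above could also be scripted. This is a rough sketch, assuming google-cloud-pubsub is installed and the credentials in use are permitted to create subscriptions. The helper names are hypothetical, and the subscription ID in the commented call is the one used later in this post.

```python
def subscription_resource_name(project_id: str, subscription_id: str) -> str:
    """Build the fully qualified subscription name used by the Pub/Sub API."""
    return f"projects/{project_id}/subscriptions/{subscription_id}"


def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> str:
    """Attach a new pull subscription to an existing topic (hypothetical helper)."""
    # Imported lazily so this module loads even without the package installed.
    from google.cloud import pubsub_v1

    with pubsub_v1.SubscriberClient() as subscriber:
        subscription = subscriber.create_subscription(
            request={
                "name": subscription_resource_name(project_id, subscription_id),
                "topic": f"projects/{project_id}/topics/{topic_id}",
            }
        )
    return subscription.name


# Example call (needs valid credentials, so it is left commented out):
# create_subscription(
#     "gcp-pubsub-guide", "reservation-created", "reservation-created-subscription"
# )
```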

When the subscription is ready, we can prepare the reporting-service responsible for consuming reservation-created events.

# reporting-service/app.py

import json
import os

import structlog
from google.auth import jwt
from google.cloud import pubsub_v1

CREDENTIALS_FILE = "credentials.json"
AUDIENCE = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
PROJECT_ID = os.environ.get('PROJECT_ID')
SUBSCRIPTION_ID = os.getenv('SUBSCRIPTION_ID')

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=PROJECT_ID,
    sub=SUBSCRIPTION_ID
)

logger = structlog.get_logger()

# Load the service account key and build JWT credentials for the Subscriber API
with open(CREDENTIALS_FILE) as f:
    service_account_info = json.load(f)
credentials_sub = jwt.Credentials.from_service_account_info(
    service_account_info, audience=AUDIENCE,
)

def event_handler(message):
    data = json.loads(message.data)
    logger.info("Received event", message=data)
    # Do something with the data, e.g. store in the reports database
    # Acknowledge only after processing, so a failure leads to redelivery
    message.ack()

with pubsub_v1.SubscriberClient(credentials=credentials_sub) as subscriber:
    logger.info("Subscribing to topic", topic=subscription_name)
    future = subscriber.subscribe(subscription_name, event_handler)
    try:
        future.result()
    except Exception as e:
        future.cancel()
        logger.info("Cancelled subscription", error=e)

We need to create a Service Account for reporting-service in the same way we did for reservations-service (we only have to remember to add the Pub/Sub Subscriber role instead of Pub/Sub Publisher). The generated credentials.json was saved in the reporting-service directory. As mentioned before, this time we explicitly authenticate pubsub_v1.SubscriberClient using the jwt.Credentials.from_service_account_info method.

The subscriber.subscribe call starts listening on the subscription in the background, and future.result() keeps the process alive while it does. When a message is received, the callback function is called. Here, the event_handler function is responsible for processing the message.
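
When the callback acknowledges a message decides what happens on failure. One common arrangement, sketched below, is to ack only after processing succeeds and to nack otherwise so that Pub/Sub redelivers the message. The make_handler wrapper and the FakeMessage stand-in are hypothetical. The real message object exposes data, ack(), and nack() in the same way.

```python
import json


def make_handler(process):
    """Wrap `process` so a message is acked only after it succeeds."""

    def handler(message):
        try:
            process(json.loads(message.data))
        except Exception:
            # Negative-acknowledge: ask Pub/Sub to redeliver the message.
            message.nack()
        else:
            message.ack()

    return handler


class FakeMessage:
    """Local stand-in for a received Pub/Sub message (assumption, demo only)."""

    def __init__(self, data: bytes):
        self.data = data
        self.outcome = None

    def ack(self):
        self.outcome = "ack"

    def nack(self):
        self.outcome = "nack"


stored = []
handler = make_handler(stored.append)

good = FakeMessage(b'{"type": "reservation_created"}')
bad = FakeMessage(b"not json")
handler(good)
handler(bad)
```

A message that can never be processed would be redelivered repeatedly under this scheme, which is one reason production setups usually pair it with a dead-letter topic.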

We can copy the Dockerfile from reservations-service and use it for reporting-service. To build the reporting-service:v0 image, we run the following in the reporting-service directory:

$ docker build -t reporting-service:v0 ./

The reporting-service has to be added to the existing docker-compose.yml file:

# docker-compose.yml
...
  reporting-service:
    image: reporting-service:v0
    environment:
      - SUBSCRIPTION_ID=reservation-created-subscription
      - PROJECT_ID=gcp-pubsub-guide
    volumes:
      - ./reporting-service/credentials.json:/credentials.json

We pass both PROJECT_ID and SUBSCRIPTION_ID as environment variables and mount the credentials.json file into the container.

Now we can test our setup end-to-end by triggering the /reservations endpoint.
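
Both os.environ.get and os.getenv return None when a variable is missing, which would silently produce a subscription name like projects/None/subscriptions/None. A small guard can fail fast instead. This is a sketch, and require_env is a hypothetical helper rather than something from the original code.

```python
import os


def require_env(name: str, environ=None) -> str:
    """Return the value of an environment variable or raise if it is unset or empty."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Demo with an explicit mapping instead of the real process environment.
config = {
    "PROJECT_ID": "gcp-pubsub-guide",
    "SUBSCRIPTION_ID": "reservation-created-subscription",
}
subscription_name = "projects/{}/subscriptions/{}".format(
    require_env("PROJECT_ID", config), require_env("SUBSCRIPTION_ID", config)
)
```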

$ docker compose up
 ✔ Container gcp-pubsub-reporting-service-2     Created                                                                                                                               0.0s
 ✔ Container gcp-pubsub-reservations-service-1  Created                                                                                                                               0.0s
Attaching to reporting-service-2, reservations-service-1
reporting-service-2     | 2024-09-23 11:44:12 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reservations-service-1  |  * Debug mode: off
reservations-service-1  | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
reservations-service-1  |  * Running on all addresses (0.0.0.0)
reservations-service-1  |  * Running on http://127.0.0.1:8080
reservations-service-1  |  * Running on http://172.27.0.4:8080
reservations-service-1  | Press CTRL+C to quit
reservations-service-1  | 2024-09-23 11:44:16 [info     ] Publishing event               data=b'{"time": "2024-09-23T11:44:16.246260", "type": "reservation_created", "data": {"reservation_id": "7f91070b-c759-473d-93c2-ae73124f7db6"}}'
reservations-service-1  | 2024-09-23 11:44:16 [info     ] Message published successfully r=11832290255551516
reservations-service-1  | 192.168.65.1 - - [23/Sep/2024 11:44:16] "POST /reservations HTTP/1.1" 201 -
reporting-service-2     | 2024-09-23 11:44:18 [info     ] Received event                 message={'time': '2024-09-23T11:44:16.246260', 'type': 'reservation_created', 'data': {'reservation_id': '7f91070b-c759-473d-93c2-ae73124f7db6'}}

The log line reporting-service-2 | 2024-09-23 11:44:18 [info ] Received event shows that the message was received and handled by reporting-service.

Scaling

As we mentioned earlier, GCP Pub-Sub scales well thanks to the split between subscriptions and subscribers. Even if you increase the number of subscribers (such as scaling your service to multiple instances or replicas), Pub/Sub delivers each message to only one subscriber per subscription in the normal case. With the default at-least-once delivery, a message can occasionally be redelivered, so handlers should tolerate duplicates.
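
Because at-least-once delivery allows an occasional duplicate, a consumer can be made idempotent by remembering which events it has already processed. The sketch below keys on the reservation_id from the event payload used in this post. The IdempotentHandler class is hypothetical, and the in-memory set is an assumption that only works within one process (replicas would need a shared store such as a database).

```python
import json


class IdempotentHandler:
    """Process each reservation event at most once per process (sketch)."""

    def __init__(self, process):
        self._process = process
        self._seen = set()  # reservation IDs already handled by this process

    def __call__(self, payload: bytes) -> bool:
        event = json.loads(payload)
        key = event["data"]["reservation_id"]
        if key in self._seen:
            return False  # duplicate delivery, skip it
        self._process(event)
        self._seen.add(key)
        return True


reports = []
handle = IdempotentHandler(reports.append)
payload = json.dumps(
    {"type": "reservation_created", "data": {"reservation_id": "abc-123"}}
).encode("utf-8")

first = handle(payload)
second = handle(payload)  # simulated redelivery of the same message
```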

To demonstrate how replicas share the messages of one subscription, we can use the replicated mode in Docker Compose. We will increase the number of replicas and check that only one instance receives a message.

# docker-compose.yml
...
  reporting-service:
    image: reporting-service:v0
    deploy:
      mode: replicated
      replicas: 3
    environment:
      - SUBSCRIPTION_ID=reservation-created-subscription
      - PROJECT_ID=gcp-pubsub-guide
    volumes:
      - ./reporting-service/credentials.json:/credentials.json

We restart Docker Compose:

 ✔ Container gcp-pubsub-reservations-service-1  Created                                                                                                                               0.0s
 ✔ Container gcp-pubsub-reporting-service-4     Created                                                                                                                               0.1s
 ✔ Container gcp-pubsub-reporting-service-2     Recreated                                                                                                                             0.1s
 ✔ Container gcp-pubsub-reporting-service-3     Created                                                                                                                               0.1s
Attaching to reporting-service-2, reporting-service-3, reporting-service-4, reservations-service-1
reporting-service-2     | 2024-09-23 11:50:39 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reservations-service-1  |  * Debug mode: off
reservations-service-1  | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
reservations-service-1  |  * Running on all addresses (0.0.0.0)
reservations-service-1  |  * Running on http://127.0.0.1:8080
reservations-service-1  |  * Running on http://172.27.0.4:8080
reservations-service-1  | Press CTRL+C to quit
reporting-service-3     | 2024-09-23 11:50:39 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reporting-service-4     | 2024-09-23 11:50:39 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription

In the logs, we can see that 3 instances of reporting-service were created:

  • gcp-pubsub-reporting-service-4

  • gcp-pubsub-reporting-service-2

  • gcp-pubsub-reporting-service-3

However, when we hit the /reservations endpoint, only one replica received the message (in this case gcp-pubsub-reporting-service-3):

reservations-service-1  | 2024-09-23 11:52:54 [info     ] Publishing event               data=b'{"time": "2024-09-23T11:52:54.415216", "type": "reservation_created", "data": {"reservation_id": "2bf10a7a-986b-49a6-ad7a-c4e2f648ee18"}}'
reservations-service-1  | 2024-09-23 11:52:54 [info     ] Message published successfully r=11832626224360213
reservations-service-1  | 192.168.65.1 - - [23/Sep/2024 11:52:54] "POST /reservations HTTP/1.1" 201 -
reporting-service-3     | 2024-09-23 11:52:55 [info     ] Received event                 message={'time': '2024-09-23T11:52:54.415216', 'type': 'reservation_created', 'data': {'reservation_id': '2bf10a7a-986b-49a6-ad7a-c4e2f648ee18'}}

If we want another service to receive the same message, we need to create a new subscription.

To avoid overcomplicating this example, we reuse the reporting-service:v0 image for a new service, insights-service, which subscribes to the reservation-created-subscription2 subscription.

# docker-compose.yml

  insigths-service:
    image: reporting-service:v0
    deploy:
      mode: replicated
      replicas: 1
    environment:
      - SUBSCRIPTION_ID=reservation-created-subscription2
      - PROJECT_ID=gcp-pubsub-guide
    volumes:
      - ./reporting-service/credentials.json:/credentials.json

Now, if we run the services and make a reservation request, both reporting-service (but only one replica) and insights-service will receive the message.

reporting-service-2     | 2024-09-23 12:35:05 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
insigths-service-1      | 2024-09-23 12:35:05 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription2
reporting-service-3     | 2024-09-23 12:35:05 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reporting-service-4     | 2024-09-23 12:35:05 [info     ] Subscribing to topic           topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reservations-service-1  | 2024-09-23 12:35:33 [info     ] Publishing event               data=b'{"time": "2024-09-23T12:35:33.359304", "type": "reservation_created", "data": {"reservation_id": "1b289bae-232c-4a06-8a6e-f60db7d6e097"}}'
reservations-service-1  | 2024-09-23 12:35:33 [info     ] Message published successfully r=12238859038022867
reservations-service-1  | 192.168.65.1 - - [23/Sep/2024 12:35:33] "POST /reservations HTTP/1.1" 201 -
reporting-service-3     | 2024-09-23 12:35:34 [info     ] Received event                 message={'time': '2024-09-23T12:35:33.359304', 'type': 'reservation_created', 'data': {'reservation_id': '1b289bae-232c-4a06-8a6e-f60db7d6e097'}}
insigths-service-1      | 2024-09-23 12:35:34 [info     ] Received event                 message={'time': '2024-09-23T12:35:33.359304', 'type': 'reservation_created', 'data': {'reservation_id': '1b289bae-232c-4a06-8a6e-f60db7d6e097'}}

Conclusion

In this post, we demonstrated how to integrate two services using GCP Pub-Sub, covering the authentication process and the scaling of the solution. There are additional concepts, such as message retention, at-least-once and at-most-once delivery guarantees, and the Dead Letter Queue (DLQ), that were not covered here and could be good material for a separate post.

The complete codebase for this example can be found here.
