How to Integrate Your Services Using GCP Pub-Sub
Pub-Sub (Publish-subscribe) is a great pattern to integrate components in a distributed system. It provides decoupling, allowing services to communicate without depending on each other directly. It enhances resiliency by ensuring reliable message delivery, even in the event of service disruptions or failures.
In this post, I will show you how to use Google Cloud (GCP) Pub-Sub to integrate two services written in Python. The post will tackle GCP cloud configuration and deployment of services using a simple Docker Compose setting.
About GCP Pub-Sub
Google Cloud Pub-Sub is a popular and powerful tool for implementing event-driven architectures in distributed services. It provides a highly scalable and flexible way to decouple services, allowing components to communicate asynchronously without tight coupling.
How does it work?
A component (service or application) can publish messages to a specific topic (a channel where messages are sent).
These messages can then be consumed by other services (subscribers) that are listening to that topic.
However, there's an important distinction between subscriptions and subscribers that makes this solution highly scalable. A subscription is a named resource representing the stream of messages from a topic to be consumed by subscribers. A subscriber is a client (like a service instance) that pulls or receives messages from a subscription. If you have multiple instances of the same service (for instance, running as replicas), each instance acts as a subscriber.
Suppose Service P publishes messages to Topic X and you want Service A (which runs 2 replicas) to receive those messages. In that case, you create Subscription A on Topic X, and each replica of Service A acts as a subscriber to Subscription A. Pub/Sub distributes the messages of a subscription among its subscribers, so each message is normally handled by only one replica (delivery is at-least-once by default, so an occasional redelivery is possible). If you need another service B to receive Topic X messages as well, you must create a separate subscription for it.
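To make the topic, subscription and subscriber relationship concrete, here is a toy in-memory model in plain Python. It is not the Pub/Sub API: the Topic and Subscription classes are hypothetical, and the round-robin dispatch is an assumption made only for illustration (real Pub/Sub does not promise any particular distribution order).

```python
from itertools import cycle


class Subscription:
    """Toy subscription: hands each message to exactly one of its subscribers."""

    def __init__(self, name):
        self.name = name
        self._subscribers = []
        self._next = None

    def add_subscriber(self, callback):
        self._subscribers.append(callback)
        # Rebuild the round-robin iterator whenever the subscriber set changes.
        self._next = cycle(self._subscribers)

    def deliver(self, message):
        if self._next is not None:
            next(self._next)(message)


class Topic:
    """Toy topic: copies every published message to all attached subscriptions."""

    def __init__(self, name):
        self.name = name
        self._subscriptions = []

    def attach(self, subscription):
        self._subscriptions.append(subscription)

    def publish(self, message):
        for subscription in self._subscriptions:
            subscription.deliver(message)


received = {"A-1": [], "A-2": [], "B-1": []}

topic_x = Topic("topic-x")
subscription_a = Subscription("subscription-a")
subscription_b = Subscription("subscription-b")
topic_x.attach(subscription_a)
topic_x.attach(subscription_b)

# Service A runs two replicas sharing one subscription; Service B has its own.
subscription_a.add_subscriber(received["A-1"].append)
subscription_a.add_subscriber(received["A-2"].append)
subscription_b.add_subscriber(received["B-1"].append)

for i in range(4):
    topic_x.publish(f"message-{i}")

print(received)
```

In this model the two replicas of Service A split the four messages between them, while Service B, which has its own subscription, sees all four.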
Solution Architecture
Let's consider a reservation system to present how this solution works using GCP Pub-Sub. The Reservation Service publishes a message when a new reservation is made. The Reporting Service subscribes to the topic and consumes the reservation messages to generate reports or update analytics. It can run in several replicas to handle the increased load or for redundancy.
GCP project
To start using Pub-Sub, we need a project in GCP. We can create a project in GCP by following the steps below (I assume that you have a Google Account associated with your @gmail.com mail address).
- Click on the projects selection in the top-left corner.
- Choose New project.
- Specify the project name and location and click Create.
We have our project created!
If you are worried about the cost of cloud services, you can use the free tier of GCP or the free trial period. Currently, I am on a 90-day free trial.
Messages Dispatching
Our project is ready. The next thing we need to do is to create a pub-sub topic. We will use it to send messages to the cloud.
- Using the console search bar, we navigate to the Pub/Sub section.
- Click the Create topic button.
- Define the topic name. The name should be meaningful and properly represent the messages published on the topic. I chose reservation-created. We also need a subscription that is related to the topic, but we will tackle it in the Messages Consumption section. For our simple example, the other options (retention, schema, etc.) can be omitted.
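The same topic could also be created from code instead of the console. The sketch below is an assumption on my side, not part of the original setup: the helper name create_topic is hypothetical, and it expects to be handed a pubsub_v1.PublisherClient whose account is allowed to create topics (the Publisher role alone is probably not enough for that).

```python
def create_topic(publisher, project_id, topic_id):
    """Create a topic and return its full resource name.

    `publisher` is expected to be a google.cloud.pubsub_v1.PublisherClient.
    The call raises an error if the topic already exists.
    """
    topic_path = publisher.topic_path(project_id, topic_id)
    topic = publisher.create_topic(request={"name": topic_path})
    return topic.name


# Usage (needs the google-cloud-pubsub package and valid credentials):
#
#   from google.cloud import pubsub_v1
#   create_topic(pubsub_v1.PublisherClient(), "gcp-pubsub-guide", "reservation-created")
```

Passing the client in as an argument keeps the helper easy to exercise with a stand-in object before pointing it at a real project.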
Now, we need the publisher code. We will use the google-cloud-pubsub package to create GCPPublisherClient.
# reservations-service/app.py
import json
from datetime import datetime

import structlog
from google.cloud import pubsub_v1


class ReservationCreatedEvent:
    def __init__(self, reservation_id):
        self.reservation_id = reservation_id

    def to_dict(self):
        return {
            "time": datetime.now().isoformat(),
            "type": "reservation_created",
            "data": {
                "reservation_id": self.reservation_id
            }
        }


class GCPPublisherClient:
    def __init__(self, google_cloud_project, topic_id):
        """
        :param google_cloud_project: Google project ID
        :param topic_id: Topic ID
        """
        self._pub_client = pubsub_v1.PublisherClient()
        self._pub_topic = self._pub_client.topic_path(google_cloud_project, topic_id)
        self._logger = structlog.get_logger()

    def publish(self, event):
        # Pub/Sub message payloads are bytes, so the event is serialized to JSON first.
        event_payload = json.dumps(event.to_dict()).encode("utf-8")
        self._logger.info("Publishing event", data=event_payload)
        future = self._pub_client.publish(
            self._pub_topic, data=event_payload,
        )
        # Block until Pub/Sub confirms the message; the result is the message ID.
        r = future.result()
        self._logger.info("Message published successfully", r=r)
ReservationCreatedEvent is a class that represents the event we want to send. It carries a reservation_id attribute, and to_dict() adds a timestamp taken when the event is serialized.
In the publisher code, we call future.result() to block until the message has been sent.
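Calling result() without arguments waits for as long as it takes. A possible variation, sketched here under my own assumptions (the helper publish_and_wait and its 10-second default are hypothetical, not part of the original service), bounds the wait with a timeout:

```python
import json
from concurrent.futures import TimeoutError as FutureTimeoutError


def publish_and_wait(publisher, topic_path, payload, timeout=10.0):
    """Publish a dict as JSON and wait up to `timeout` seconds for the message ID.

    `publisher` is expected to behave like pubsub_v1.PublisherClient.
    Returns the message ID, or None if no confirmation arrived in time.
    A None result does not prove the message was dropped; it may still be sent later.
    """
    data = json.dumps(payload).encode("utf-8")
    future = publisher.publish(topic_path, data=data)
    try:
        return future.result(timeout=timeout)
    except FutureTimeoutError:
        return None
```

In an HTTP handler this keeps a slow or unreachable Pub/Sub endpoint from holding the request open indefinitely; what to do when None comes back (retry, return an error) is left to the caller.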
# reservations-service/app.py
import json
import os
import uuid

import structlog
from flask import Flask

PROJECT_ID = os.environ.get('PROJECT_ID')
TOPIC_ID = os.environ.get('TOPIC_ID')

app = Flask(__name__)


@app.route('/reservations', methods=['POST'])
def create_reservation():
    publisher_client = GCPPublisherClient(PROJECT_ID, TOPIC_ID)
    # some logic checking if reservation can be created
    event = ReservationCreatedEvent(str(uuid.uuid4()))
    publisher_client.publish(event)
    return "Reservation created", 201
We exposed the API to create reservations using the Flask framework. The reservation-created event is sent at the end of the /reservations POST endpoint handler. The endpoint is simplified to focus only on the Pub-Sub integration part.
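The handler reads PROJECT_ID and TOPIC_ID with os.environ.get, which silently returns None when a variable is missing. One way to surface that at startup is a small helper like the one below; require_env is a hypothetical name of mine, and the optional environ argument exists only to make the function easy to try out.

```python
import os


def require_env(name, environ=None):
    """Return the value of a required environment variable or raise a clear error."""
    environ = os.environ if environ is None else environ
    value = environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Possible usage at module level in the service:
#
#   PROJECT_ID = require_env("PROJECT_ID")
#   TOPIC_ID = require_env("TOPIC_ID")
```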
The docker-compose.yml file is used to define the service configuration and deployment.
# docker-compose.yml
version: "3.7"
services:
  reservations-service:
    image: reservations-service:v0
    ports:
      - "8080:8080"
    environment:
      - TOPIC_ID=reservation-created
      - PROJECT_ID=gcp-pubsub-guide
We exposed port 8080 to make HTTP requests to the service.
The reservations-service image was defined in the Dockerfile:
FROM python:3.11-slim-bookworm
COPY app.py requirements.txt /
RUN pip install -r requirements.txt
EXPOSE 8080
CMD ["flask", "run", "-h", "0.0.0.0", "-p", "8080"]
It is built using the following command (run within the reservations-service directory):
$ docker build -t reservations-service:v0 ./
The service can be run by executing the docker compose up command in the terminal:
$ docker compose up
[+] Running 1/1
✔ Container gcp-pubsub-reservations-service-1 Recreated 10.3s
Attaching to reservations-service-1
reservations-service-1 | * Debug mode: off
reservations-service-1 | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
reservations-service-1 | * Running on all addresses (0.0.0.0)
reservations-service-1 | * Running on http://127.0.0.1:8080
reservations-service-1 | * Running on http://172.27.0.2:8080
reservations-service-1 | Press CTRL+C to quit
To test the integration, we can hit the http://localhost:8080/reservations endpoint:
$ curl -X POST http://127.0.0.1:8080/reservations
Instead of a success message, we should receive an error:
google.auth.exceptions.DefaultCredentialsError: Your default credentials were not found.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc for more information.
At this stage, our service has no credentials to publish messages to the GCP topic. We need to provide them.
Service Authentication
The easiest way to authenticate services in GCP is to use a Service Account. Service accounts provide a secure and manageable way for services or applications to authenticate with GCP and access other resources.
A service account is a special kind of account that belongs to your GCP project, not a specific user. It allows services, like virtual machines (VMs), applications, or containers, to authenticate and interact with GCP resources (e.g., Pub-Sub, Cloud Storage, Cloud Functions).
We need to navigate to the Service Accounts section in the Google Cloud Console.
- Click Create Service Account.
- Define the Service Account name.
- Specify the Service Account roles (Pub/Sub Publisher and Pub/Sub Viewer in our case).
- Select the newly created Service Account and open the Keys tab.
- Create and save the credentials JSON file for the Service Account. We stored it in the reservations-service directory on our disk.
Now we need to pass the credentials to our reservations-service app. We can use the GOOGLE_APPLICATION_CREDENTIALS environment variable to provide the location of the credentials JSON file (see docs). This is where pubsub_v1.PublisherClient looks for credentials by default, through Application Default Credentials.
We inject the GOOGLE_APPLICATION_CREDENTIALS environment variable into reservations-service and mount the credentials.json file into the container:
# docker-compose.yml
services:
  reservations-service:
    image: reservations-service:v0
    ports:
      - "8080:8080"
    environment:
      - TOPIC_ID=reservation-created
      - PROJECT_ID=gcp-pubsub-guide
      - GOOGLE_APPLICATION_CREDENTIALS=/credentials.json
    volumes:
      - ./reservations-service/credentials.json:/credentials.json
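If the mount or the variable is wrong, the failure only shows up on the first publish. A small pre-flight check can catch the common mistakes earlier. This is a sketch under my own assumptions: check_credentials_file is a hypothetical helper, and the list of required keys covers only fields I expect in a service account key file, not a full validation.

```python
import json
import os

# Fields expected in a service account key file (not an exhaustive list).
REQUIRED_KEYS = ("type", "project_id", "private_key", "client_email")


def check_credentials_file(environ=None):
    """Return a list of problems with GOOGLE_APPLICATION_CREDENTIALS (empty if none)."""
    environ = os.environ if environ is None else environ
    path = environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not path:
        return ["GOOGLE_APPLICATION_CREDENTIALS is not set"]
    if not os.path.isfile(path):
        return [f"{path} does not exist or is not a file"]
    try:
        with open(path) as f:
            info = json.load(f)
    except ValueError:
        return [f"{path} is not valid JSON"]
    if not isinstance(info, dict):
        return [f"{path} does not contain a JSON object"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in info]
    if "type" in info and info["type"] != "service_account":
        problems.append("type is not 'service_account'")
    return problems
```

Calling check_credentials_file() at startup and logging the returned list gives a clearer message than the DefaultCredentialsError raised later by the client.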
There is also an option to pass credentials explicitly when creating GCP clients. We will present it in the Messages Consumption section, where we need to authenticate the GCP subscriber.
Now if we repeat a test, we should get the following logs:
reservations-service-1 | 2024-09-23 11:14:33 [info ] Publishing event data=b'{"time": "2024-09-23T11:14:33.723921", "type": "reservation_created", "data": {"reservation_id": "1336dff2-2ba4-4fae-9980-008aaaea31b1"}}'
reservations-service-1 | 2024-09-23 11:14:34 [info ] Message published successfully r=11831900815790074
reservations-service-1 | 192.168.65.1 - - [23/Sep/2024 11:14:34] "POST /reservations HTTP/1.1" 201 -
Messages Consumption
We need another service that can consume messages published to the reservation-created topic. But before building reporting-service, we have to create a subscription that will pull messages from the reservation-created topic.
- Navigate to the Subscriptions section.
- Click Create Subscription.
- Define the Subscription name and connect it to the reservation-created topic. The remaining configuration options can be omitted.
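The console steps above have a programmatic counterpart. The sketch below is my own assumption rather than part of the original setup: the helper name create_subscription is hypothetical, it expects a pubsub_v1.SubscriberClient, and the account behind that client must be allowed to create subscriptions.

```python
def create_subscription(subscriber, project_id, topic_id, subscription_id):
    """Create a pull subscription on an existing topic and return its resource name.

    `subscriber` is expected to be a google.cloud.pubsub_v1.SubscriberClient.
    """
    topic_path = f"projects/{project_id}/topics/{topic_id}"
    subscription_path = subscriber.subscription_path(project_id, subscription_id)
    subscription = subscriber.create_subscription(
        request={"name": subscription_path, "topic": topic_path}
    )
    return subscription.name


# Usage (needs the google-cloud-pubsub package and valid credentials):
#
#   from google.cloud import pubsub_v1
#   with pubsub_v1.SubscriberClient() as subscriber:
#       create_subscription(
#           subscriber, "gcp-pubsub-guide",
#           "reservation-created", "reservation-created-subscription",
#       )
```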
When the subscription is ready, we can prepare the reporting-service responsible for consuming reservation-created events.
# reporting-service/app.py
import json
import os

import structlog
from google.auth import jwt
from google.cloud import pubsub_v1

CREDENTIALS_FILE = "credentials.json"
AUDIENCE = "https://pubsub.googleapis.com/google.pubsub.v1.Subscriber"
PROJECT_ID = os.environ.get('PROJECT_ID')
SUBSCRIPTION_ID = os.environ.get('SUBSCRIPTION_ID')

subscription_name = 'projects/{project_id}/subscriptions/{sub}'.format(
    project_id=PROJECT_ID,
    sub=SUBSCRIPTION_ID
)
logger = structlog.get_logger()

# Build JWT credentials from the service account key file.
with open(CREDENTIALS_FILE) as f:
    service_account_info = json.load(f)
credentials = jwt.Credentials.from_service_account_info(
    service_account_info, audience=AUDIENCE,
)
credentials_sub = credentials.with_claims(audience=AUDIENCE)


def event_handler(message):
    # The message is acknowledged up front, so a failure below
    # will not trigger a redelivery.
    message.ack()
    data = json.loads(message.data)
    logger.info("Received event", message=data)
    # Do something with the data, e.g. store in the reports database


with pubsub_v1.SubscriberClient(credentials=credentials_sub) as subscriber:
    logger.info("Subscribing to topic", topic=subscription_name)
    future = subscriber.subscribe(subscription_name, event_handler)
    try:
        # Block the main thread while messages are handled in the background.
        future.result()
    except Exception as e:
        future.cancel()
        logger.info("Cancelled subscription", error=e)
We need to create a Service Account for reporting-service in a similar way as we did for reservations-service (we only have to remember to add the Pub/Sub Subscriber role instead of Pub/Sub Publisher). The generated credentials.json was saved in the reporting-service directory. As we said before, this time we explicitly authenticate pubsub_v1.SubscriberClient using the jwt.Credentials.from_service_account_info method.
The pubsub_v1.SubscriberClient instance actively waits for messages. When a message is received, the callback function is called. Here the callback is event_handler, which is responsible for processing the message.
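A common variation of the callback acknowledges a message only after it has been processed and sends a negative acknowledgement when processing fails, so that Pub/Sub redelivers it. The sketch below illustrates that idea; make_handler and process are hypothetical names, and it is not the handler used in this post.

```python
import json


def make_handler(process):
    """Wrap `process(dict)` in a Pub/Sub callback that acks only after success."""

    def handler(message):
        try:
            process(json.loads(message.data))
        except Exception:
            # Negative acknowledgement asks Pub/Sub to redeliver the message.
            message.nack()
        else:
            message.ack()

    return handler


# Usage with the subscriber client:
#
#   subscriber.subscribe(subscription_name, make_handler(store_in_reports_db))
```

Keep in mind that a message which always fails will keep coming back with this approach unless the subscription limits redelivery, for example with a dead-letter topic.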
We can copy the Dockerfile from reservations-service and use it for reporting-service. To build the reporting-service:v0 image, we run the following in the reporting-service directory:
$ docker build -t reporting-service:v0 ./
The reporting-service has to be added to the existing docker-compose.yml file:
# docker-compose.yml
...
  reporting-service:
    image: reporting-service:v0
    environment:
      - SUBSCRIPTION_ID=reservation-created-subscription
      - PROJECT_ID=gcp-pubsub-guide
    volumes:
      - ./reporting-service/credentials.json:/credentials.json
We pass both PROJECT_ID and SUBSCRIPTION_ID as environment variables and mount the credentials.json file into the container.
Now we can test our setup end-to-end by triggering the /reservations endpoint.
$ docker compose up
✔ Container gcp-pubsub-reporting-service-2 Created 0.0s
✔ Container gcp-pubsub-reservations-service-1 Created 0.0s
Attaching to reporting-service-2, reservations-service-1
reporting-service-2 | 2024-09-23 11:44:12 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reservations-service-1 | * Debug mode: off
reservations-service-1 | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
reservations-service-1 | * Running on all addresses (0.0.0.0)
reservations-service-1 | * Running on http://127.0.0.1:8080
reservations-service-1 | * Running on http://172.27.0.4:8080
reservations-service-1 | Press CTRL+C to quit
reservations-service-1 | 2024-09-23 11:44:16 [info ] Publishing event data=b'{"time": "2024-09-23T11:44:16.246260", "type": "reservation_created", "data": {"reservation_id": "7f91070b-c759-473d-93c2-ae73124f7db6"}}'
reservations-service-1 | 2024-09-23 11:44:16 [info ] Message published successfully r=11832290255551516
reservations-service-1 | 192.168.65.1 - - [23/Sep/2024 11:44:16] "POST /reservations HTTP/1.1" 201 -
reporting-service-2 | 2024-09-23 11:44:18 [info ] Received event message={'time': '2024-09-23T11:44:16.246260', 'type': 'reservation_created', 'data': {'reservation_id': '7f91070b-c759-473d-93c2-ae73124f7db6'}}
We can see the log line reporting-service-2 | 2024-09-23 11:44:18 [info ] Received event, which means the message was properly handled by reporting-service.
Scaling
As we mentioned earlier, GCP Pub-Sub scales well thanks to the split between subscriptions and subscribers. Even if you increase the number of subscribers (such as scaling your service to multiple instances or replicas), Pub/Sub hands each message of a subscription to only one of them. Delivery is at-least-once by default, though, so a message can occasionally be redelivered and handlers should tolerate duplicates.
To show it, we can use the replicated mode in Docker Compose. We will increase the number of replicas and check whether only one instance receives a message.
# docker-compose.yml
...
  reporting-service:
    image: reporting-service:v0
    deploy:
      mode: replicated
      replicas: 3
    environment:
      - SUBSCRIPTION_ID=reservation-created-subscription
      - PROJECT_ID=gcp-pubsub-guide
    volumes:
      - ./reporting-service/credentials.json:/credentials.json
We restart Docker Compose:
✔ Container gcp-pubsub-reservations-service-1 Created 0.0s
✔ Container gcp-pubsub-reporting-service-4 Created 0.1s
✔ Container gcp-pubsub-reporting-service-2 Recreated 0.1s
✔ Container gcp-pubsub-reporting-service-3 Created 0.1s
Attaching to reporting-service-2, reporting-service-3, reporting-service-4, reservations-service-1
reporting-service-2 | 2024-09-23 11:50:39 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reservations-service-1 | * Debug mode: off
reservations-service-1 | WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
reservations-service-1 | * Running on all addresses (0.0.0.0)
reservations-service-1 | * Running on http://127.0.0.1:8080
reservations-service-1 | * Running on http://172.27.0.4:8080
reservations-service-1 | Press CTRL+C to quit
reporting-service-3 | 2024-09-23 11:50:39 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reporting-service-4 | 2024-09-23 11:50:39 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
In the logs, we can see that 3 instances of reporting-service were created:
gcp-pubsub-reporting-service-4
gcp-pubsub-reporting-service-2
gcp-pubsub-reporting-service-3
However, when we hit the /reservations endpoint, only one replica received the message (in this case gcp-pubsub-reporting-service-3):
reservations-service-1 | 2024-09-23 11:52:54 [info ] Publishing event data=b'{"time": "2024-09-23T11:52:54.415216", "type": "reservation_created", "data": {"reservation_id": "2bf10a7a-986b-49a6-ad7a-c4e2f648ee18"}}'
reservations-service-1 | 2024-09-23 11:52:54 [info ] Message published successfully r=11832626224360213
reservations-service-1 | 192.168.65.1 - - [23/Sep/2024 11:52:54] "POST /reservations HTTP/1.1" 201 -
reporting-service-3 | 2024-09-23 11:52:55 [info ] Received event message={'time': '2024-09-23T11:52:54.415216', 'type': 'reservation_created', 'data': {'reservation_id': '2bf10a7a-986b-49a6-ad7a-c4e2f648ee18'}}
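The run above shows a single replica handling the message. With at-least-once delivery, the same message may still occasionally arrive a second time, so it can help to make the handler idempotent. The sketch below deduplicates on reservation_id; the DeduplicatingHandler class is hypothetical, and the in-memory set is an assumption that only works within one process (replicas would need a shared store such as a database).

```python
import threading


class DeduplicatingHandler:
    """Call `process(event)` at most once per reservation_id within this process."""

    def __init__(self, process):
        self._process = process
        self._seen = set()
        # Subscriber callbacks may run on several threads, so guard the set.
        self._lock = threading.Lock()

    def __call__(self, event):
        key = event["data"]["reservation_id"]
        with self._lock:
            if key in self._seen:
                return False  # duplicate, skipped
            self._seen.add(key)
        try:
            self._process(event)
        except Exception:
            # Forget the key so that a redelivery can retry the processing.
            with self._lock:
                self._seen.discard(key)
            raise
        return True
```

The handler takes the decoded event dictionary (the structure produced by to_dict()), so it would be called from the Pub/Sub callback after json.loads(message.data).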
If we want another service to receive the same message, we need to create a new subscription.
To keep this example simple, we reuse the reporting-service:v0 image for a new service, insights-service, that subscribes to the reservation-created-subscription2 subscription.
# docker-compose.yml
  insigths-service:
    image: reporting-service:v0
    deploy:
      mode: replicated
      replicas: 1
    environment:
      - SUBSCRIPTION_ID=reservation-created-subscription2
      - PROJECT_ID=gcp-pubsub-guide
    volumes:
      - ./reporting-service/credentials.json:/credentials.json
Now if we run the services and make a reservation request, both reporting-service (but only one replica) and insights-service will receive the message.
reporting-service-2 | 2024-09-23 12:35:05 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
insigths-service-1 | 2024-09-23 12:35:05 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription2
reporting-service-3 | 2024-09-23 12:35:05 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reporting-service-4 | 2024-09-23 12:35:05 [info ] Subscribing to topic topic=projects/gcp-pubsub-guide/subscriptions/reservation-created-subscription
reservations-service-1 | 2024-09-23 12:35:33 [info ] Publishing event data=b'{"time": "2024-09-23T12:35:33.359304", "type": "reservation_created", "data": {"reservation_id": "1b289bae-232c-4a06-8a6e-f60db7d6e097"}}'
reservations-service-1 | 2024-09-23 12:35:33 [info ] Message published successfully r=12238859038022867
reservations-service-1 | 192.168.65.1 - - [23/Sep/2024 12:35:33] "POST /reservations HTTP/1.1" 201 -
reporting-service-3 | 2024-09-23 12:35:34 [info ] Received event message={'time': '2024-09-23T12:35:33.359304', 'type': 'reservation_created', 'data': {'reservation_id': '1b289bae-232c-4a06-8a6e-f60db7d6e097'}}
insigths-service-1 | 2024-09-23 12:35:34 [info ] Received event message={'time': '2024-09-23T12:35:33.359304', 'type': 'reservation_created', 'data': {'reservation_id': '1b289bae-232c-4a06-8a6e-f60db7d6e097'}}
Conclusion
In this post, we demonstrated how to integrate two services using GCP Pub-Sub, covering the authentication process and effective scaling of the solution. However, there are additional concepts, such as message retention, at-least-once and at-most-once delivery guarantees, and the Dead Letter Queue (DLQ), that were not covered here and could be good material for a separate post.
The complete codebase for this example can be found here.
Written by
jorzel
Backend developer with special interest in software design, architecture and system modelling. Trying to stay in a continuous learning mindset. Enjoy refactoring, clean code, DDD philosophy and TDD approach.