Rust multi module microservices Part 7 - Books Analytics
We are so close to the end and just one more step to configure a service that consumes the created book. Update the Cargo.toml of books_analytics crate to have the below contents.
[package]
name = "books_analytics"
version = "0.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
kafka = {path = "../kafka"}
tokio = { workspace = true }
common = {path = "../common"}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
opentelemetry = {workspace = true}
axum-tracing-opentelemetry = {workspace = true}
opentelemetry-zipkin = {workspace = true}
tracing-opentelemetry = {workspace = true}
Update the main.rs to have the below code
use common::events::{constants::Topics, dto::CreatedBook};
use kafka::consumer::KafkaConsumer;
use tokio::sync::mpsc;
use tracing::info;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
opentelemetry::global::set_text_map_propagator(opentelemetry_zipkin::Propagator::new());
let tracer = opentelemetry_zipkin::new_pipeline()
.with_service_name("books_analytics".to_owned())
.with_service_address("127.0.0.1:8080".parse().unwrap())
.with_collector_endpoint("http://localhost:9411/api/v2/spans")
.install_batch(opentelemetry::runtime::Tokio)
.expect("unable to install zipkin tracer");
let tracer = tracing_opentelemetry::layer().with_tracer(tracer.clone());
let subscriber = tracing_subscriber::fmt::layer().json();
let level = EnvFilter::new("debug".to_owned());
tracing_subscriber::registry()
.with(subscriber)
.with(level)
.with(tracer)
.init();
let kakfa_consumer = KafkaConsumer::new(
"localhost:9092".to_string(),
"http://localhost:8081".to_string(),
"books-created-consumer".to_string(),
Topics::BookCreated.to_string(),
);
let (sender, mut receiver) = mpsc::unbounded_channel::<CreatedBook>();
tokio::spawn(async move {
info!("Strarting book created consumer");
kakfa_consumer.consume(sender.clone()).await;
});
while let Some(message) = receiver.recv().await {
info!("Consumed messaged {:?}", message)
}
opentelemetry::global::shutdown_tracer_provider();
Ok(())
}
The code starts by importing various modules and dependencies.
The
main
function is defined as an asynchronous function and is marked with the#[tokio::main]
attribute, indicating that it should be executed as the entry point of the program.The first line in the
main
function sets the text map propagator for OpenTelemetry to the Zipkin propagator.The code then initializes a Zipkin tracer pipeline by configuring the service name, service address, and collector endpoint. The pipeline is installed and assigned to the
tracer
variable.Next, the
tracer
is used to create a tracing layer (tracing_opentelemetry::layer()
) that is added to the subscriber.The subscriber is configured with a JSON formatter and an environment filter to determine the log level.
The subscriber is then initialized using
tracing_subscriber::registry().init()
.An instance of
KafkaConsumer
is created, passing the Kafka bootstrap servers, schema registry URL, consumer group ID, and the topic name as arguments.An unbounded channel is created using
tokio::sync::mpsc::unbounded_channel
, which returns a sender and a receiver.A background task is spawned using
tokio::spawn
, which will consume messages from Kafka using theconsume
method ofKafkaConsumer
. The sender is cloned and passed as an argument to theconsume
method.Inside the spawned task, a log message is printed indicating that the consumer is starting.
The while loop iterates over messages received from the receiver channel using
receiver.recv().await
. When a message is received, a log message is printed indicating that the message has been consumed.After the loop exits, the global tracer provider is shut down.
Finally, the
Ok(())
expression is returned to indicate a successful execution of themain
function.
In summary, the code sets up the necessary components for consuming messages from a Kafka topic. It initializes tracing, creates a Kafka consumer, spawns a task to consume messages, and prints log messages for each consumed message. The program runs indefinitely until it is terminated.
We are done with the coding and now let's run the services to see what all this is about in the next article.
Subscribe to my newsletter
Read articles from Omprakash Sridharan directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by