An Introduction to Reactive Programming and Spring WebFlux.
What's the mess ?
Imagine you’re developing a financial application that provides real-time stock price updates to users. In this application:
Real-Time Data: Users need to see the latest stock prices as soon as they change.
High Concurrency: The application must handle thousands of concurrent users who all require live updates.
Low Latency: Users expect minimal delay in receiving updates.
The traditional MVC
In traditional spring MVC application, each request is handled by a seperate thread.
Request Handling: Each user request for stock prices is processed by a dedicated thread. While this model is manageable for low to moderate traffic, it struggles with high concurrency. The server can quickly become overwhelmed, leading to performance degradation and slow response times.
Polling: To provide real-time updates, the frontend may need to poll the server at regular intervals for new stock prices. This approach has several drawbacks:
Increased Latency: There’s a delay between stock price changes and user updates due to polling intervals.
High Resource Utilization: Polling consumes significant server resources, leading to higher loads and potential server crashes.
Scalability Issues: Handling each request with a separate thread and relying on polling leads to inefficient resource utilization, increased memory and CPU usage, and challenges in scaling.
Reactive Programming
Reactive programming offers a solution to these challenges by focusing on asynchronous data streams and non-blocking I/O. This paradigm enables developers to build systems that are more resilient, responsive, and elastic.
Blocking vs. Non-Blocking:
Blocking: Operations wait for previous ones to complete, leading to inefficiencies and scalability issues under high load.
Non-Blocking: Operations use callbacks or asynchronous methods to handle results as they become available, improving efficiency and scalability.
Spring WebFlux
Introduced in Spring 5, WebFlux supports reactive programming and offers a way to write asynchronous, non-blocking code.
Built on Project Reactor: WebFlux uses Project Reactor to implement the Reactive Streams specification, providing a standard for asynchronous stream processing with non-blocking backpressure.
Reactive Stack: Unlike traditional Spring MVC, which relies on the servlet API and synchronous processing, WebFlux uses a reactive stack, allowing efficient handling of many requests with fewer resources.
Flexibility: Supports both annotated controllers and functional endpoints.
Key Features
Mono and Flux
Mono
Represents a single asynchronous value or an empty value. It is typically used when you expect a single response, such as a single object or a status.
@RestController
@RequestMapping("/api/mono")
public class MonoController {
@GetMapping("/greeting")
public Mono<String> getGreeting() {
return Mono.just("Hello, WebFlux with Mono!");
}
@GetMapping("/empty")
public Mono<Void> getEmpty() {
return Mono.empty();
}
}
/api/mono/greeting
: Returns a single string"Hello, WebFlux with Mono!"
wrapped in aMono
./api/mono/empty
: Returns an empty response with a 204 No Content status using an emptyMono<Void>
.
Flux
Represents a stream of 0 to N asynchronous values. It is used when you expect multiple results, such as a list of objects or a data stream.
@RestController
@RequestMapping("/api/flux")
public class FluxController {
@GetMapping("/numbers")
public Flux<Integer> getNumbers() {
return Flux.just(1, 2, 3, 4, 5);
}
@GetMapping("/range")
public Flux<Integer> getRange() {
return Flux.range(1, 10);
}
}
/api/flux/numbers
: Returns aFlux
that emits a fixed sequence of integers (1 through 5)./api/flux/range
: Returns aFlux
that emits a range of integers from 1 to 10.
WebClient
WebClient is a non-blocking, reactive client for making HTTP requests offering more flexible and powerful alternative to RestTemplate
for performing asynchronous and reactive operations.
example usage:
public class WebClientExample {
private final WebClient webClient;
public WebClientExample() {
this.webClient = WebClient.create("http://localhost:8080");
}
public Mono<String> getExample() {
return webClient.get()
.uri("/api/hello")
.retrieve()
.bodyToMono(String.class);
}
}
WebClient Initialization:
- When an instance of
WebClientExample
is created, it initializes aWebClient
with the base URLhttp://localhost:8080
.
- When an instance of
Making Requests:
The
getExample
method makes an asynchronous GET request to the/api/hello
endpoint.It returns a
Mono<String>
, which represents a single asynchronous value. TheMono
will contain the response body as a string when the request completes.
WebSockets
WebSocket is a protocol that provides full-duplex communication channel over a single, long-lived connection between the client and server, allowing for real-time, bi-directional communication.
@Component
public class NewsWebSocketHandler extends TextWebSocketHandler {
private final CopyOnWriteArraySet<WebSocketSession> sessions = new CopyOnWriteArraySet<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessions.add(session);
}
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// Handle incoming messages if needed
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
sessions.remove(session);
}
public void broadcastNews(List<Article> articles) {
String newsJson;
try {
newsJson = objectMapper.writeValueAsString(articles);
} catch (IOException e) {
e.printStackTrace();
return;
}
for (WebSocketSession session : sessions) {
try {
session.sendMessage(new TextMessage(newsJson));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
@Configuration
@EnableWebSocket
@RequiredArgsConstructor
public class WebSocketConfig implements WebSocketConfigurer {
private final NewsWebSocketHandler newsWebSocketHandler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(newsWebSocketHandler, "/ws/news").setAllowedOrigins("*");
}
}
}
Client Connection:
- Clients connect to the WebSocket endpoint
ws://
localhost:8080/ws/news
.
- Clients connect to the WebSocket endpoint
Handling Connections:
When a client connects,
afterConnectionEstablished
is called, adding the session to thesessions
set.If a client disconnects,
afterConnectionClosed
is called, removing the session from thesessions
set.
Broadcasting News:
The
broadcastNews
method can be called to send a list of articles to all connected clients.Articles are converted to JSON strings and sent as
TextMessage
to each session.
Router Functions
Router functions provide a functional programming style for routing HTTP requests in Spring WebFlux. Instead of using the traditional @Controller
and @RequestMapping
annotations, you define routes using a functional approach. This can lead to more flexible and concise route definitions.
@Configuration
public class RouterConfig {
@Bean
public RouterFunction<ServerResponse> routerFunction(Handler handler) {
return route(GET("/hello"), handler::hello);
}
}
@Component
public class Handler {
public Mono<ServerResponse> hello(ServerRequest request) {
return ServerResponse.ok().bodyValue("Hello, WebFlux!");
}
}
RouterConfig: Defines a bean for the router function. The
route
method maps the GET request to/hello
to thehello
method in theHandler
class.route(GET("/hello"), handler::hello)
: This line routes GET requests to/hello
to thehello
method in theHandler
class.
Handler: Handles the incoming requests.
hello(ServerRequest request)
: This method handles the request and returns aMono<ServerResponse>
with a response body of "Hello, WebFlux!".
Back to the point
Using Spring WebFlux in a financial application that provides real-time stock price updates addresses several limitations of traditional Spring MVC:
Efficient Resource Utilization: WebFlux employs non-blocking I/O, allowing the system to handle multiple tasks concurrently without keeping threads waiting for responses. This improves resource management, especially when dealing with a high volume of users.
Reduced Latency: By supporting reactive streams, WebFlux enables the server to push updates to clients as soon as new data is available, eliminating the need for constant polling and reducing latency.
Handling High Concurrency: WebFlux manages numerous concurrent connections more effectively than traditional MVC by using fewer threads and lowering memory and CPU overhead, making it suitable for applications with many simultaneous users.
Bidirectional Communication: WebFlux supports WebSockets for real-time, two-way communication, allowing the server to send updates directly to clients without repeated requests, enhancing efficiency for real-time data delivery.
Conclusion
Embracing Spring WebFlux can transform how you handle real-time, high-concurrency applications like financial platforms needing instant stock updates. Unlike traditional Spring MVC, WebFlux’s non-blocking architecture and reactive approach let you build more scalable and responsive systems. By effectively managing resources and reducing latency, you can deliver a smoother, more efficient experience for users, even under heavy loads. WebFlux not only meets the modern demands of real-time data but also opens up new possibilities for creating dynamic, high-performance applications. Dive into WebFlux, and you'll find yourself equipped to tackle today’s complex challenges with a fresh, powerful toolkit.
Subscribe to my newsletter
Read articles from pawan kanishka directly inside your inbox. Subscribe to the newsletter, and don't miss out.
Written by