The backpressure is an important property of reactive streams. In simple words, it ensures us that the publishers do not produce new elements faster than we can process them. In this way, we avoid overloading our system. Over years, I noticed that many newcomers to the reactive world have trouble with understanding how backpressure affects their reactive streams. In this article, we will explore this topic in greater detail, with some illustrations and code examples in Project Reactor library.

tl;dr;

Backpressure is a kind of feedback loop between publisher-subscriber pairs that makes sense in a concurrent environment. Various reactive operators can change our initial demand, for example by requesting more elements than we asked for.

Backpressure is not specific to reactive streams

Backpressure is a resistance or force opposing the desired flow of fluid through pipes.

Wikipedia

The term “backpressure” came to the software from the world of fluid dynamic. It also predates the appearance of reactive programming. For example, it is a part of TCP protocol, where it is called flow control. The client and server can negotiate optimal transfer rate, so that they do not overload each other. We can also find a primitive (although fully correct) form of backpressure in the solution to the classic producer-consumer problem:

  • producers add items to the common buffer,
  • consumers read items from the common buffer,
  • the buffer has limited capacity.

Through the use of locks and condition variables, producers go idle, when the buffer is full. It happens if consumers can’t consume incoming items fast enough.

In short…

Don’t be afraid of the backpressure. There is a great chance that you have already encountered it many times in different places.

Signalling demand

We can look at the backpressure in reactive streams as a kind of a feedback loop. Before the publisher emits anything, the subscriber needs to request some demand. After processing the element, it can ask for more, which causes next item to be emitted.

Illustration of a reactive stream with backpressure between the initial publisher, operator and final subscriber.
Backpressure feedback loop

In the image above, we have 3 actors. The middle one (“Subscriber/Publisher”) can be a bit confusing, but in fact, this is just a normal reactive operator. Operators are both publishers and subscribers at the same time. Therefore when we request some demand in the final subscriber, this request doesn’t go directly to the initial publisher, but is received by the last operator. We will get back to that shortly. Meanwhile, let’s take a look at the code snippet below. It shows the simplest form of signalling the demand: “I have processed one element, give me the next one”.

public class OneByOneSubscriber implements Subscriber<Integer> {
    private static final Logger logger =
        LoggerFactory.getLogger(OneByOneSubscriber.class);
    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // 1
    }

    @Override
    public void onNext(Integer value) {
        logger.info("Consumed " + value);
        subscription.request(1); // 2
    }

    @Override
    public void onError(Throwable error) {
        logger.error("An error has occurred", error);
    }

    @Override
    public void onComplete() {
        logger.info("Reading completed");
    }
}

Explanation:

  1. to put the machine in motion, we need to make some initial request. Without this line, we would do nothing.
  2. in response for the request, the publisher tries to send the requested number of items. It does not need to happen immediately. Once it has an item to send, it calls onNext() method. After processing the item, we request one more. Wihout this line, the publisher would never emit any further item.

Sample code

Find a complete example on Github: zone84-examples/backpressure-in-reactive-streams

Let’s use the subscriber

Let’s try to run our subscriber. The backpressure makes sense only in a concurrent environment. Why? On a single thread, it doesn’t matter who is faster (publisher or the subscriber). Only one of them can work at a time, so there is no way to overload the other side. Below, we use 4 threads in total, spread equally between both sides.

private static final int PREFETCH_SIZE = 1;

// ...

var first = Flux.fromIterable(publisher)
    .subscribeOn(Schedulers.parallel());
var second = Flux.fromIterable(publisher)
    .subscribeOn(Schedulers.parallel());
Flux.merge(PREFETCH_SIZE, first, second)
    .parallel(2)
    .runOn(Schedulers.parallel(), PREFETCH_SIZE)
    .subscribe(new OneByOneSubscriber(latch));

Output analysis

If we run the example, we’ll see the following output:

[parallel-4] - Produced value: 76
[parallel-2] - Consumed 74
[parallel-3] - Produced value: 77
[parallel-1] - Consumed 76
[parallel-4] - Produced value: 78
[parallel-3] - Produced value: 79
[parallel-1] - Consumed 77
[parallel-4] - Produced value: 80
[parallel-3] - Produced value: 81
[parallel-4] - Produced value: 82
[parallel-1] - Consumed 78
[parallel-1] - Consumed 79

The small lag between producing and consuming is expected, when we work on multiple threads. In fact, this is a proof that the backpressure works. Why? Because it doesn’t grow over time. Both publishers match their speed to the subscribers. We can try to put sleep() call into the subscriber code to see that the other side also slows down.

Consequences of backpressure for reactive streams

I have mentioned earier that reactive operators are both publishers and subscribers at the same time. Now it’s time to explore this in more detail, because it is crucial to understand what the backpressure can do. Let’s take a look at the illustration:

Illustration that operators are both publishers and subscribers at the same time.
Operators are also both publishers and subscribers

We do not subscribe directly to the initial publisher, but to the last operator. In fact, our subscriber is not aware who’s the initial publisher at all! The same happens with requesting demand. It is received by Operator B which can do absolutely anything with it. Propagate immediately. Delay. Request 100 items instead of 1. This is an important property of the backpressure in the reactive streams.

Operators can change demand

publishOn() is a great example, how operators can change our demand (observeOn() in RxJava). We use it for switching the execution to a different scheduler. However, it also keeps an internal buffer, so that the elements are immediately available, when the thread is ready to pick up the next one. Let’s take a look how it works in practice. We need another subscriber for that:

public class LimitedSubscriber implements Subscriber<Integer> {
    private static final Logger logger =
        LoggerFactory.getLogger(LimitedSubscriber.class);

    private final int limit;
    private final AtomicInteger received = new AtomicInteger(0);
    private Subscription subscription;

    public LimitedSubscriber(int limit) {
        this.limit = limit;
        this.latch = latch;
    }

    @Override
    public void onSubscribe(Subscription s) {
        logger.info("Requesting " + limit + " elements...");
        subscription = s;
        subscription.request(limit);
    }

    @Override
    public void onNext(Integer value) {
        var index = received.incrementAndGet();
        logger.info("Received element #" + index + ": " + value);

        if (index == limit) {
            subscription.cancel();
        }
    }

    @Override
    public void onError(Throwable error) {
        logger.error("Panic!", error);
    }

    @Override
    public void onComplete() {
        logger.info("Completed.");
    }
}

This time, we have only one request() call, in line 18. Once we receive everything we want, we simply cancel our subscription (lines 26-27). Now let’s use it in a very simple stream that just runs the publisher and subscriber on different threads and request just 1 element from the publisher:

var subscriber = new LimitedSubscriber(1);

Flux.fromIterable(publisher)
    .subscribeOn(Schedulers.boundedElastic())
    .publishOn(Schedulers.parallel())
    .subscribe(subscriber);

Let’s run the example:

[main] - Requesting 1 elements...
[boundedElastic-1] - Produced value: 0
[boundedElastic-1] - Produced value: 1
[boundedElastic-1] - Produced value: 2
 ... (a lot of logs here) ...
[boundedElastic-1] - Produced value: 147
[parallel-1] - Received element #1: 0
[boundedElastic-1] - Produced value: 148
[boundedElastic-1] - Produced value: 149

What has just happened? We requested only 1 element, but the publisher generated more than 100. This is correct. publishOn() operator received our demand for 1 element, but it also wants to fill in the internal buffer. Therefore it asks the initial publisher for 256 elements which is the default buffer size. Therefore, the publisher attempts to generate 256 elements, and they are still being created when we cancel our subscription.

In short…

Operators can change our demand, and ask for more elements than we need. Always read the documentation to learn, how the given operator handles the backpressure.

Demand is additive

One might ask, what happens if we call request() twice, with different values? The answer is simple: the publisher should sum the demand. If we duplicate the line 18 in the example subscriber code (and update the condition in line 26 to compare to twice the limit), we should receive 2 elements instead of one. It works like an ordinary counter. Calling request() increases the counter value, receiving the element with onNext() – decrements it. If the counter goes down to 0, we stop sending more elements. And basically that’s all.

In short…

Calling request() multiple times is safe. You don’t need to track how many elements you have requested.

Summary

At this point, we should have a good overview of what backpressure is and how it works. Under the hood, it’s like a simple counter and a couple of rules that control updating it. The key takeaways are:

  • backpressure makes sense only in a concurrent environment,
  • operators are also both publishers and subscribers at the same time,
  • we request the demand only to the nearest operator which can change it.

Sample code

Find a complete example on Github: zone84-examples/backpressure-in-reactive-streams

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments