Today we will see how to use the Zip operator in reactive programming. It is useful when writing microservices. Sometimes we need to fetch data from a group of services, wait for all the requests to complete, and then process the results. Making the requests one by one would be very inefficient. The Zip operator lets us make all of them concurrently. Let’s see how.

TL;DR

We can use the reactive Zip operator to make multiple asynchronous HTTP requests and wait for all results to arrive.

Simple feed aggregator

Efficient web crawlers are very easy to write with reactive programming. The final code is compact and easy to follow, which also makes them great for learning how reactive programming works. Here we are going to write a simple RSS feed aggregator in Kotlin. It will take feeds from several hardcoded blogs and build a single output feed from them. It is not a true crawler, because it does not discover new blogs, but it is sufficient for our purposes. Here is a summary of the tools we use:

  • Kotlin language
  • Micronaut Framework: good support for asynchronous programming
  • Project Reactor: reactive programming library

Note that you can also easily adapt the code to RxJava. It also offers Zip on its Single and Flowable types.

Sample code

Find a complete example on GitHub: zone84-examples/reactivezipoperator-demo

Prerequisite: non-blocking HTTP client

The first step is choosing the right HTTP client. There are many libraries available. Some of them, like OkHttp and Retrofit, use blocking I/O. This is not suitable for our goal, because a blocking call holds the thread until the response arrives. Although it’s possible to off-load the network call to a separate thread, that defeats the purpose of this exercise. Fortunately, Micronaut Framework provides its own HTTP client, which is non-blocking by default. The code below shows how to use it:

class BlogCollector(
    private val client: ReactorHttpClient,
    private val uri: String
) {

    fun collect(): Mono<List<SyndEntry>> {
        logger.info { "Will collect entries from '$uri'" }
        return Mono.from(client.exchange(HttpRequest.GET<String>(uri), String::class.java))
            .filter { it.status == HttpStatus.OK }
            .map { parse(it.body()) }
    }

    fun parse(feedBody: String): List<SyndEntry> {
        val feed = SyndFeedInput().build(StringReader(feedBody))
        logger.info { "Processing feed..." }
        return feed.entries
    }

    companion object {
        private val logger = KotlinLogging.logger { }
    }
}

Note that our collect() function does not make any call yet. It just returns a “recipe” that describes how to invoke the external service asynchronously and how to process the response. Nothing happens until something subscribes to the returned Mono. In the example, I’m using a simple library called Rome to parse the RSS feed content.
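The “recipe” idea can be seen in isolation with a minimal sketch. It assumes only reactor-core on the classpath; fetchGreeting() and the counter are hypothetical stand-ins for the HTTP call, not part of the example project.

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for an HTTP call. The lambda is executed only
// when somebody subscribes; the counter records each real execution.
fun fetchGreeting(counter: AtomicInteger): Mono<String> =
    Mono.fromCallable {
        counter.incrementAndGet()
        "hello"
    }

fun main() {
    val counter = AtomicInteger(0)

    // Building the pipeline does not run anything yet.
    val recipe = fetchGreeting(counter).map { it.uppercase() }
    println("Calls after building the recipe: ${counter.get()}")

    // block() subscribes and waits. It is fine in a demo, but should be
    // avoided inside non-blocking request handlers.
    val result = recipe.block()
    println("Result: $result, calls after subscribing: ${counter.get()}")
}
```

In the controller shown later, we never subscribe ourselves: we return the Mono, and the framework subscribes to it when it handles the request.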

Adding some feeds

The next step is rather technical. The BlogCollector class shown above fetches the feed from just a single website. For aggregation, however, we need more blogs. For this reason, we’ll write a Micronaut factory that creates a couple of BlogCollector instances, each calling a different blog:

@Factory
class BlogCollectorFactory {
    @Singleton
    @Named("1")
    fun createFirstCollector(@Client("https://zone84.tech/") client: ReactorHttpClient) =
        BlogCollector(client, "/feed")

    @Singleton
    @Named("2")
    fun createSecondCollector(@Client("https://micronaut.io/") client: ReactorHttpClient) =
        BlogCollector(client, "/blog/feed")
}

Aggregation with reactive Zip operator

Finally, we can make the actual aggregation. We write a small controller with a single GET endpoint. Upon calling, it contacts two blogs, fetches the feeds and aggregates them into one:

@Controller("/feed")
class AggregatorController(
    @Named("1")
    private val firstCollector: BlogCollector,
    @Named("2")
    private val secondCollector: BlogCollector,
    private val configuration: HttpServerConfiguration
) {
    @Get("/", produces = [MediaType.APPLICATION_XML])
    fun collectPosts() : Mono<String> {
        val first: Mono<List<SyndEntry>> = firstCollector.collect()
        val second: Mono<List<SyndEntry>> = secondCollector.collect()
        return Mono.zip(first, second)
            .map { (firstEntries, secondEntries) ->
                logger.info {
                    "Results have arrived! ${firstEntries.size} posts from blog 1, " +
                        "${secondEntries.size} posts from blog 2"
                }
                val feed: SyndFeed = SyndFeedImpl()
                feed.feedType = "rss_2.0"
                feed.title = "my favorite blogs"
                feed.description = "my favorite blogs"
                feed.link = "http://${configuration.host}:${configuration.port}/feed"
                feed.entries = firstEntries + secondEntries
                publishFeed(feed)
            }
    }

    private fun publishFeed(feed: SyndFeed): String {
        return StringWriter().use { writer ->
            val syndFeedOutput = SyndFeedOutput()
            syndFeedOutput.output(feed, writer)
            writer.toString()
        }
    }

    companion object {
        private val logger = KotlinLogging.logger { }
    }
}

In the body of collectPosts(), we see a typical usage of Mono.zip(). Note that it is not an instance method of Mono, but a static method that accepts several streams. We begin by creating the recipes for invoking each external service (the first and second values). Then we pass both monos into Mono.zip(). Once all results are available, the reactive Zip operator creates a tuple from them. Finally, we can process the tuple, for example with the .map() operator.

Important

zip() is a static method of the Mono class. You will not find it in the autocompletion suggestions when you type a dot after a Mono instance.
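As a small illustration of the difference, the sketch below calls the static Mono.zip() and, for comparison, the instance method zipWith(), which Reactor offers for combining exactly two monos. The values and function names are made up for the demo; tuple elements are read through t1 and t2 (the getT1()/getT2() accessors), so no extra Kotlin extensions are needed.

```kotlin
import reactor.core.publisher.Mono

// Static variant: called on the Mono class, not on an instance.
fun zipStatic(): Mono<String> {
    val title: Mono<String> = Mono.just("Zip operator")
    val views: Mono<Int> = Mono.just(42)
    return Mono.zip(title, views)
        .map { tuple -> "${tuple.t1}: ${tuple.t2} views" }
}

// Instance variant: zipWith() shows up in autocompletion,
// but it combines only two monos at a time.
fun zipInstance(): Mono<String> {
    val title: Mono<String> = Mono.just("Zip operator")
    val views: Mono<Int> = Mono.just(42)
    return title.zipWith(views)
        .map { tuple -> "${tuple.t1}: ${tuple.t2} views" }
}

fun main() {
    println(zipStatic().block())
    println(zipInstance().block())
}
```

Both functions produce the same result. The static form scales better once more than two sources are involved.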

More use cases for reactive Zip operator

Note that in our case, both external services return the same data type: List<SyndEntry>. However, this is specific to our example. Streams passed to the reactive Zip operator can return different data types. Imagine that you are running a video streaming website, and you want to show the user profile. You want to have a single BFF (backend-for-frontend) service that aggregates all the data from many sources:

  • UserProfileResponse from user-profiles service
  • RecentlyWatchedResponse from video-tracker service
  • SubscriptionStatusResponse from user-subscriptions service

This is how it would look in code:

val profileMono: Mono<UserProfileResponse> =
    fetchUserProfiles(query.userId)
val recentlyWatchedMono: Mono<RecentlyWatchedResponse> =
    fetchRecentlyWatched(query.userId)
val subscriptionStatusMono: Mono<SubscriptionStatusResponse> =
    fetchSubscriptionStatus(query.userId)

val bffResponse = Mono.zip(
    profileMono,
    recentlyWatchedMono,
    subscriptionStatusMono
)
    .map { (profile, recentlyWatched, subscriptionStatus) ->
        // do all the aggregation here
    }

In short…

Every stream passed to Mono.zip() may produce a different data type.

How reactive Zip operator works

It’s time to understand how the reactive Zip operator works. Let’s take a look at the marble diagram:

[Figure: Marble diagram of the Zip operator in reactive programming. Two mono streams each produce a single element, and zip returns a tuple with both.]

The reactive Zip operator takes two or more streams. This is the sequence of operations:

  1. subscribe to all streams,
  2. wait until all of them produce the result,
  3. wait until all of them complete,
  4. produce a tuple with all the results,
  5. send completion signal.
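The sequence above can be observed with a minimal sketch. It assumes reactor-core only; fakeCall() is a hypothetical stand-in for a remote call, and the delays of 300 ms and 100 ms are arbitrary values chosen so that the second source finishes first.

```kotlin
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical stand-in for a remote call that answers after delayMillis.
// It records when it is subscribed and when it produces its value.
fun fakeCall(name: String, delayMillis: Long, events: MutableList<String>): Mono<String> =
    Mono.delay(Duration.ofMillis(delayMillis))
        .map { "$name-result" }
        .doOnSubscribe { events.add("subscribed $name") }
        .doOnNext { events.add("value from $name") }

// Zips a slow and a fast source and returns the recorded event log.
fun runZip(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val slow = fakeCall("slow", 300, events)
    val fast = fakeCall("fast", 100, events)

    Mono.zip(slow, fast)
        .doOnNext { events.add("tuple (${it.t1}, ${it.t2})") }
        .block() // demo only: wait for the tuple
    return events
}

fun main() {
    runZip().forEach { println(it) }
}
```

Both sources are subscribed before either value arrives, the faster source delivers first, and the tuple still keeps the order of the arguments passed to Mono.zip().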

From the examples shown earlier we also know that we are not limited to just two input streams. In Project Reactor, there are variants that handle up to 8 typed arguments. If we need more, we can pass a collection of monos together with a combinator function. The combinator receives the results as an Array<Any> (Java notation: Object[]), so we must cast the individual array elements to the proper types on our own.
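Here is a minimal sketch of the collection-based variant, assuming reactor-core only. The zipMany() function is a made-up name; it uses the Mono.zip(Iterable, Function) overload, whose combinator receives the untyped array of results.

```kotlin
import reactor.core.publisher.Mono

// Zips any number of monos and sums their results.
// The combinator gets an untyped array, so each element is cast manually.
fun zipMany(sources: List<Mono<Int>>): Mono<Int> =
    Mono.zip(sources) { results: Array<Any> ->
        results.map { it as Int }.sum()
    }

fun main() {
    // Nine sources: one more than the typed variants can handle.
    val sources = (1..9).map { Mono.just(it) }
    println(zipMany(sources).block())
}
```

The casts are unchecked by the compiler, so this variant is best kept for cases where all sources share one type or where the order of the collection is well known.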

All-or-nothing

Careful readers may ask a couple of questions here. First of all, what happens if one of the streams does not produce a result? In Project Reactor, a Mono may complete without producing any result. In this case, the reactive Zip operator also produces an empty mono. This is an all-or-nothing operation. Similarly, if one of the streams fails with an exception, the output mono also emits an error signal. This behavior is easy to explain by looking at the API reference: the return type of the reactive Zip operator has no support for partial results. So what can we do if we need them? We can wrap the value in Optional<T> from Java, so that a missing result becomes an empty Optional instead of an empty mono:

val optionalMono: Mono<Optional<SomeResponse>> = fetchOptionalData()
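One possible way to build such a mono is sketched below, assuming reactor-core only. The asOptional() extension and the describe() function are hypothetical helpers, not part of Reactor. Turning errors into an empty Optional is a choice made for this demo; in real code you may prefer to let errors propagate.

```kotlin
import reactor.core.publisher.Mono
import java.util.Optional

// Hypothetical helper: a missing value or an error becomes Optional.empty(),
// so the mono always emits exactly one element.
fun <T : Any> Mono<T>.asOptional(): Mono<Optional<T>> =
    this.map { Optional.of(it) }
        .onErrorReturn(Optional.empty<T>())
        .defaultIfEmpty(Optional.empty<T>())

// The profile is required, the badge is optional.
fun describe(profile: Mono<String>, badge: Mono<String>): Mono<String> =
    Mono.zip(profile, badge.asOptional())
        .map { "${it.t1} / ${it.t2.orElse("no badge")}" }

fun main() {
    println(describe(Mono.just("alice"), Mono.just("gold")).block())
    println(describe(Mono.just("alice"), Mono.empty()).block())

    // Without the wrapper, one empty source makes the whole zip empty.
    println(Mono.zip(Mono.just("alice"), Mono.empty<String>()).block())
}
```

Only the sources that are allowed to be missing need the wrapper. A required source, such as the profile here, still makes the whole result empty or failed when it is absent.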

In short…

The reactive Zip operator does not support partial results. If you really need them, you have to implement them on your own on top of the operator.

Non-blocking?

Another question is about non-blocking code. Where’s any mention of non-blocking in the description of the reactive Zip operator? The answer is simple: nowhere. Technically speaking, you can use it with 100% synchronous code. You can also use it on actions off-loaded to a blocking I/O thread pool. It’s not the reactive Zip operator that makes your code non-blocking. As we said earlier, you need non-blocking I/O first. Zip just works properly with it.
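To illustrate this point, the sketch below zips two blocking tasks in two ways and reports which threads ran them. It assumes reactor-core only; blockingTask() is a hypothetical stand-in for blocking I/O, and the 50 ms sleep is arbitrary.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical blocking task: returns the name of the thread that ran it.
fun blockingTask(): Mono<String> =
    Mono.fromCallable {
        Thread.sleep(50) // simulates blocking I/O
        Thread.currentThread().name
    }

// Plain synchronous sources: zip subscribes to them one after another,
// and each task runs on the subscribing thread.
fun zipSynchronous(): List<String> =
    Mono.zip(blockingTask(), blockingTask())
        .map { listOf(it.t1, it.t2) }
        .block()!!

// Off-loaded sources: each task runs on a thread from the
// boundedElastic scheduler, which Reactor provides for blocking work.
fun zipOffloaded(): List<String> =
    Mono.zip(
        blockingTask().subscribeOn(Schedulers.boundedElastic()),
        blockingTask().subscribeOn(Schedulers.boundedElastic())
    )
        .map { listOf(it.t1, it.t2) }
        .block()!!

fun main() {
    println("Synchronous: ${zipSynchronous()}")
    println("Off-loaded:  ${zipOffloaded()}")
}
```

In both cases Zip behaves the same way: it subscribes and waits for the results. Whether the work runs concurrently depends entirely on the sources.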

Reactive Zip operator in Flux

In a typical HTTP service, we usually operate on Monos that can produce up to 1 element. This is reasonable. For a single HTTP request, we want to produce a single HTTP response. However, other use cases may require processing Flux streams that carry many elements (possibly an infinite number). Flux also supports the reactive Zip operator. It has a similar behavior:

  1. subscribe to all streams,
  2. wait until all of them produce one result,
  3. produce a tuple with all the results,
  4. repeat from step 2 until any of the streams completes.

Note that the subscribed streams may produce data at different speeds. In Flux, the reactive Zip operator does not continue until it has all the items necessary to produce a tuple. Therefore, it works with the speed of the slowest stream. You need to ensure that the subscribed streams handle backpressure properly.
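A minimal sketch of Flux.zip(), assuming reactor-core only, shows the pairing and the stop condition. The function name zipFluxes() and the input values are made up for the demo.

```kotlin
import reactor.core.publisher.Flux

// Pairs elements by position. The output ends as soon as
// the shorter stream completes, so the number 3 is never paired.
fun zipFluxes(): List<String> =
    Flux.zip(Flux.just(1, 2, 3), Flux.just("a", "b"))
        .map { "${it.t1}${it.t2}" }
        .collectList()
        .block()!!

fun main() {
    println(zipFluxes())
}
```

The two inputs here are finite and immediately available, so backpressure is not a concern in this sketch. With sources that emit at different speeds, the faster one has to respect the demand signalled by the operator.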

In short…

You can also use the Zip operator with Flux, but you need to keep backpressure in mind.

Conclusion

Reactive programming libraries come with a rich collection of operators. At first sight, it may be overwhelming. Studying practical examples is a very good way to learn them. Example code gives you not only the raw theory, but also clues about when and how to use the operators to build something useful. The reactive Zip operator is quite tricky. It’s a static method, so you won’t see it when you type a dot after your flux/mono instance. It also comes with many variants. However, in practice it turns out to be rather easy to use. Just create several monos that make requests to other services and use Zip to wait for all the responses. Simple, isn’t it?

