Starting with Sequence
Sequence is Kotlin’s lazy list implementation. For example, the following Fibonacci sequence is built with Sequence:
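A minimal sketch of what such a definition can look like. The name fibonacci and the choice of Int are assumptions made for illustration, not the original listing.

```kotlin
// An infinite Fibonacci sequence. Nothing is computed until a terminal
// operator starts pulling elements.
val fibonacci: Sequence<Int> = sequence {
    var a = 0
    var b = 1
    while (true) {
        yield(a)            // hand the current value to the consumer, then suspend
        val next = a + b
        a = b
        b = next
    }
}

fun main() {
    // take(10) bounds the infinite sequence; forEach is the terminal operator.
    fibonacci.take(10).forEach { print("$it ") }
}
```

Running main prints the first ten Fibonacci numbers, starting from 0.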
The terminal operator
We use an infinite loop in the sequence builder block to compute the elements one by one and hand each of them to the consumer via yield. The consumers of a Sequence are the so-called terminal operators, such as forEach, sum, etc.
In the case of forEach, whenever we yield a new element in the sequence block, that element is passed to the forEach block for processing, and control then returns to the sequence block to run the subsequent logic and compute the next value. A normal list, by contrast, would first compute all of its elements in memory. Because elements are computed on demand, a Sequence can represent an infinite series of elements. However, when consuming an infinite sequence, you must use an intermediate operator such as take to limit the number of elements consumed.
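The back-and-forth between the producer and the consumer can be made visible by logging on both sides. This is only an illustrative sketch; the function name trace and the log format are made up for this example.

```kotlin
// Records the order in which the builder and the consumer run.
fun trace(): List<String> {
    val log = mutableListOf<String>()
    val numbers = sequence {
        var n = 1
        while (true) {
            log += "yield $n"
            yield(n++)      // suspends here until the consumer asks for more
        }
    }
    // take(3) stops pulling after three elements, so the infinite loop
    // in the builder is never resumed a fourth time.
    numbers.take(3).forEach { log += "consume $it" }
    return log
}

fun main() {
    trace().forEach(::println)
}
```

The log alternates between "yield n" and "consume n", which shows that each element is produced only when the consumer is ready for it.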
The intermediate operator
Sequence has map and filter intermediate operators, just like List. Each call to an intermediate operator on a List returns a new List, whereas calling an operator on a Sequence merely adds a stage to a data pipeline. After an element is yielded, it travels through the stages of this pipeline to the terminal consumer.
If there is no terminal operator, we have only built the data pipeline, and the code in the sequence builder never runs.
The following small example outputs the string "AaBbCc":
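One possible version of such an example is sketched below, together with a List counterpart and a pipeline without a terminal operator for comparison. The function names are assumptions for illustration, and a StringBuilder stands in for printing so that the result can be inspected.

```kotlin
// Sequence: each element goes through map and then forEach before the next one starts.
fun viaSequence(): String {
    val out = StringBuilder()
    sequenceOf("a", "b", "c")
        .map { out.append(it.uppercase()); it }
        .forEach { out.append(it) }
    return out.toString()
}

// List: map processes every element and builds a new list before forEach runs.
fun viaList(): String {
    val out = StringBuilder()
    listOf("a", "b", "c")
        .map { out.append(it.uppercase()); it }
        .forEach { out.append(it) }
    return out.toString()
}

// No terminal operator: the pipeline is only defined, the map lambda never runs.
fun withoutTerminalOperator(): String {
    val out = StringBuilder()
    sequenceOf("a", "b", "c").map { out.append(it.uppercase()); it }
    return out.toString()
}

fun main() {
    println(viaSequence())              // AaBbCc
    println(viaList())                  // ABCabc
    println(withoutTerminalOperator())  // (empty line)
}
```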
Use Case: Find the First Non-Null Property
Suppose we want to find the first Feedback.message in the list of feedbacks that is not null, i.e. "Cool and clear function". There are several ways to do this.
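The alternatives can be sketched as follows. The shape of Feedback (a rating plus a nullable message) and the sample data other than "Cool and clear function" are assumptions made up for this illustration.

```kotlin
// Assumed shape of the Feedback class; only `message` is mentioned in the text.
data class Feedback(val rating: Int, val message: String?)

// Made-up sample data.
val feedbacks = listOf(
    Feedback(5, null),
    Feedback(4, null),
    Feedback(5, "Cool and clear function"),
    Feedback(3, "Needs more examples")
)

// Method 1: eager. map builds a complete intermediate list of messages first.
fun method1(): String? = feedbacks.map { it.message }.firstOrNull { it != null }

// Method 2: lazy. Elements flow through map one by one and the search
// stops at the first non-null message.
fun method2(): String? = feedbacks.asSequence().map { it.message }.firstOrNull { it != null }

// Method 3: a single standard library call (available since Kotlin 1.5).
fun method3(): String? = feedbacks.firstNotNullOfOrNull { it.message }

fun main() {
    println(method1())
    println(method2())
    println(method3())
}
```

All three return the same value; they differ only in how much intermediate work is done.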
If the collection has many elements or passes through many operators, Method 1 creates a new collection at every step to hold the intermediate results, which is wasteful. A Sequence is a better fit for such scenarios.
Use Case: Flat Nested List Iterator
Suppose there is a nested list of integers. Design an iterator that iterates over all the integers in this nested list. The nested list is represented by this interface:
Each item in the list is either an integer or another list, whose elements may in turn be integers or other lists. Example:
If all the elements may first be stored in a list, the problem is easily solved with recursion.
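A sketch of the interface and the eager recursive solution. The method names isInteger, getInteger and getList follow the usual formulation of this exercise and are an assumption here; IntItem, ListItem and the sample input [[1, 1], 2, [1, 1]] are hypothetical helpers for the demo.

```kotlin
// Assumed interface: an item is either a single integer or a nested list.
interface NestedInteger {
    fun isInteger(): Boolean
    fun getInteger(): Int?
    fun getList(): List<NestedInteger>?
}

// Hypothetical implementations used to build sample data.
class IntItem(private val value: Int) : NestedInteger {
    override fun isInteger() = true
    override fun getInteger(): Int? = value
    override fun getList(): List<NestedInteger>? = null
}

class ListItem(private val items: List<NestedInteger>) : NestedInteger {
    override fun isInteger() = false
    override fun getInteger(): Int? = null
    override fun getList(): List<NestedInteger>? = items
}

// Eager solution: recursively copy every integer into one result list.
fun flatten(list: List<NestedInteger>, out: MutableList<Int> = mutableListOf()): List<Int> {
    for (item in list) {
        if (item.isInteger()) {
            out += item.getInteger()!!
        } else {
            flatten(item.getList()!!, out)
        }
    }
    return out
}

fun main() {
    // Represents [[1, 1], 2, [1, 1]]
    val nested = listOf(
        ListItem(listOf(IntItem(1), IntItem(1))),
        IntItem(2),
        ListItem(listOf(IntItem(1), IntItem(1)))
    )
    println(flatten(nested))   // [1, 1, 2, 1, 1]
}
```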
After getting the list, it can be turned into an iterator. But the iterator should be “lazy”, meaning that the consumer consumes the data while the iterator traverses the data source. The advantages of this include:
- If the consumer only needs the first few pieces of data, and the data source is large, then pre-computing the entire List does a lot of useless work.
- Consumers do not have to wait for the entire List to be computed before proceeding to the next step. The whole data processing process can be sped up by concurrency.
However, laziness makes it inconvenient to recurse directly: we have to maintain a stack by hand, which makes the originally simple code much more complicated (the full code is available here).
With Sequence, we can implement the lazy iterator and still use the recursive algorithm.
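A sketch of the lazy version, under the same assumptions about the NestedInteger interface as before. The helpers int and list and the class name NestedIterator are hypothetical and exist only to keep the example self-contained.

```kotlin
// Assumed interface: an item is either a single integer or a nested list.
interface NestedInteger {
    fun isInteger(): Boolean
    fun getInteger(): Int?
    fun getList(): List<NestedInteger>?
}

// Hypothetical factory helpers for sample data.
fun int(value: Int): NestedInteger = object : NestedInteger {
    override fun isInteger() = true
    override fun getInteger(): Int? = value
    override fun getList(): List<NestedInteger>? = null
}

fun list(vararg items: NestedInteger): NestedInteger = object : NestedInteger {
    override fun isInteger() = false
    override fun getInteger(): Int? = null
    override fun getList(): List<NestedInteger>? = items.toList()
}

// Same recursion as the eager version, but each integer is yielded on demand.
fun flattenLazily(list: List<NestedInteger>): Sequence<Int> = sequence {
    for (item in list) {
        if (item.isInteger()) {
            yield(item.getInteger()!!)
        } else {
            yieldAll(flattenLazily(item.getList()!!))   // recurse lazily
        }
    }
}

// The iterator simply delegates to the sequence's iterator.
class NestedIterator(list: List<NestedInteger>) : Iterator<Int> by flattenLazily(list).iterator()

fun main() {
    val data = listOf(list(int(1), int(1)), int(2), list(int(1), int(1)))
    val iterator = NestedIterator(data)
    while (iterator.hasNext()) print("${iterator.next()} ")
}
```

No explicit stack is needed: the suspended sequence builders keep track of the position in each nesting level.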
Flow: Suspendable Sequence
The examples above have only pure computation in the data flow. In real scenarios we may need to perform time-consuming operations in the intermediate or terminal operators (assume they are wrapped in suspend functions), such as calling a network API or reading from and writing to a database.
Suppose we want to call the API to search for "foo" and "bar" in order, and store each search result in the database, i.e. in this order:
- yield "foo"
- search "foo"
- saveToDB "foo"
- yield "bar"
- search "bar"
- saveToDB "bar"
However, Sequence’s operators only accept regular (non-suspend) lambdas, so suspend functions cannot be called inside them.
So we have Flow.
Flow is designed to execute sequentially by default, just like ordinary code in a Kotlin coroutine, with no concurrency. In terms of execution order, the Flow version of the example above is equivalent to a plain loop over the same steps.
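The equivalence can be checked with a small experiment. This is a sketch assuming kotlinx.coroutines 1.6 or later (where collect accepts a lambda directly); search and saveToDB are hypothetical stand-ins that only wait briefly and write to a log, and search simply returns the query as its "result".

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Hypothetical suspend functions standing in for a network call and a database write.
suspend fun search(query: String, log: MutableList<String>): String {
    delay(10)
    log += "search $query"
    return query
}

suspend fun saveToDB(result: String, log: MutableList<String>) {
    delay(10)
    log += "saveToDB $result"
}

// Flow version: suspend functions can be called inside map and collect.
fun viaFlow(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow {
        for (query in listOf("foo", "bar")) {
            log += "yield $query"
            emit(query)
        }
    }
        .map { search(it, log) }
        .collect { saveToDB(it, log) }
    log
}

// Plain loop with the same steps.
fun viaLoop(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    for (query in listOf("foo", "bar")) {
        log += "yield $query"
        saveToDB(search(query, log), log)
    }
    log
}

fun main() {
    println(viaFlow())
    println(viaLoop())
}
```

Both functions produce the same log, in the order listed above.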
The terminal operator, represented by collect, is a suspend function. Consuming a Flow therefore has to happen inside a coroutine (or another suspend function), whereas the terminal operators of Sequence are ordinary functions.
In fact, as with Sequence, the terminal operators of Flow are the “driving force” of the entire data flow. If there is no terminal operator and only intermediate operators such as map and filter are applied, we have merely built a data pipeline: the code in the Flow builder does not run and no data flows. This kind of Flow is called a “cold flow”. A useful way to understand it is to compare the definition of a Flow to a function definition, and the terminal operator to a (suspend) function call:
- The code in the flow {} block and in map {} only runs, and data only flows, when a terminal operator is called on myFlow.
- If collect is called twice on myFlow, it is like calling the same function twice. myFlow contains a suspend block, so it should be treated like a suspend function itself and has to be called from a coroutine.
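The analogy can be observed directly by counting how often the builder block runs. A sketch assuming kotlinx.coroutines 1.6 or later; the counter builderRuns and the contents of myFlow are made up for illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.runBlocking

// Counts how many times the builder block has been executed.
var builderRuns = 0

// Defining the flow is like defining a function: nothing runs yet.
val myFlow: Flow<Int> = flow {
    builderRuns++
    emit(1)
    emit(2)
}.map { it * 10 }

// Each collect is like a separate call of that function.
fun collectTwice(): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    myFlow.collect { seen += it }
    myFlow.collect { seen += it }
    seen
}

fun main() {
    println(builderRuns)      // 0
    println(collectTwice())   // [10, 20, 10, 20]
    println(builderRuns)      // 2
}
```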
Flow implements asynchrony with the suspend infrastructure provided by the Kotlin language, whereas RxJava has to reflect asynchrony in the data types that travel through the stream. Take a network request with RxJava as an example: the return value of callSearchApi has to be wrapped in an Observable, and the transformation in the data stream has to use flatMap. With coroutines and Flow, asynchronous functions need no wrapper type, and the transformation in the data stream can use map directly, which is more natural.
A Toy Flow
The design and implementation of Kotlin Flow is very simple and elegant, so let’s try to implement a minimalist toy version.
The Flow interface has only a single collect method, which takes a FlowCollector argument. FlowCollector is a typical interface representing a consumer (like Comparator, it is declared with <in T>).
This collect method can be seen as the link between the upstream and downstream of the reactive data flow:
- The upstream data source, accessed via this.
- The downstream consumer, accessed via collector.
Implementing collect
When we collect, we need to send data to the FlowCollector for consumption, so where does this data come from? We can use the flow builder to create the Flow and call emit inside its block. Did you notice emit? It is none other than the FlowCollector.emit method. So the flow builder function takes a block with FlowCollector as its receiver. Where does this FlowCollector come from? It is passed in when Flow.collect is called.
This closes the loop: the emit we call in the suspend block of the flow builder is invoked on the downstream consumer that only appears later, when the Flow is collected. This reflects the “lazy” nature of Flow: creating a Flow computes nothing, it only stores a suspend block; when the Flow is collected, we obtain the consumer FlowCollector and invoke the stored suspend block with it as the receiver. This design makes particularly clever use of Kotlin’s lambda-with-receiver feature.
Putting the above analysis into code, we get our toy implementation.
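A minimal sketch of that toy core, written from the description above rather than copied from the library. It deliberately reuses the names Flow, FlowCollector and flow, so it should live in its own file without importing kotlinx.coroutines.flow.

```kotlin
// Downstream consumer.
interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

// A flow only knows how to push its values into a collector.
interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

// The builder stores the suspend block; nothing runs until collect is called.
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            // The downstream collector becomes the receiver of the stored block,
            // so every emit(...) in the block calls collector.emit(...).
            collector.block()
        }
    }

suspend fun main() {
    val numbers = flow<Int> {
        emit(1)
        emit(2)
    }
    numbers.collect(object : FlowCollector<Int> {
        override suspend fun emit(value: Int) = println(value)
    })
}
```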
To let callers consume the data with collect {...}, we can define an extension function that accepts a lambda.
Intermediate operators
An intermediate operator collects the upstream flow, transforms the upstream data, and emits the transformed data into a new flow. For example, map can be implemented this way.
Like those of Sequence and Iterable, Flow’s intermediate operators are all extension functions. This allows the Flow interface to have only one method, keeping it lean while making it easy for users to define their own operators. There is no difference between calling a custom operator and calling one of the standard library’s own, unlike RxJava, which requires additional APIs like compose or lift (see Implementing Custom Operators in RxJava).
The pattern “transform the upstream data and return a new Flow” is so common that the Kotlin library provides a transform operator, on top of which map, filter and many other operators are implemented. It is also the recommended building block when defining your own operators.
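As an illustration of a custom operator built on the real library’s transform (kotlinx.coroutines), here is a small sketch. The operator name parseInts is hypothetical and not part of any library.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical custom operator: keeps only the strings that parse as Int.
// It combines what would otherwise be a map step and a filter step.
fun Flow<String>.parseInts(): Flow<Int> = transform { text ->
    val number = text.toIntOrNull()
    if (number != null) emit(number)    // emit zero or one value per input
}

fun parseDemo(): List<Int> = runBlocking {
    flowOf("1", "x", "3").parseInts().toList()
}

fun main() {
    println(parseDemo())   // [1, 3]
}
```

The custom operator is called exactly like a built-in one, which is the point made above about extension functions.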
Full code
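What follows is a reconstruction of the toy implementation from the description in this article, not the original listing. It reuses the names Flow, FlowCollector, flow, collect, transform, map and filter on purpose, so it should be kept in its own file without importing kotlinx.coroutines.flow.

```kotlin
interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

interface Flow<out T> {
    suspend fun collect(collector: FlowCollector<T>)
}

// Builder: stores the block and runs it against the collector on collect.
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> =
    object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }

// Convenience terminal operator so callers can write collect { ... }.
suspend fun <T> Flow<T>.collect(action: suspend (T) -> Unit) {
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })
}

// Generic intermediate operator: collect upstream, emit into a new flow.
fun <T, R> Flow<T>.transform(transformer: suspend FlowCollector<R>.(T) -> Unit): Flow<R> {
    val upstream = this
    return flow {
        // Inside this block, `this` is the downstream FlowCollector<R>.
        upstream.collect { value -> transformer(value) }
    }
}

fun <T, R> Flow<T>.map(mapper: suspend (T) -> R): Flow<R> =
    transform { value -> emit(mapper(value)) }

fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T> =
    transform { value -> if (predicate(value)) emit(value) }

suspend fun main() {
    flow<Int> { emit(1); emit(2); emit(3) }
        .filter { it % 2 == 1 }
        .map { "#$it" }
        .collect { println(it) }    // prints #1, then #3
}
```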
As you can see, with Kotlin’s existing suspend infrastructure, implementing a reactive data flow with asynchronous support takes only a few lines. In fact, stripped down to its core, the Flow in the Kotlin coroutines library is not very different from our toy implementation, but it provides two additional guarantees: context preservation and exception transparency.
Additional guarantees for Flow
Context preservation
RxJava can switch threads with the observeOn and subscribeOn operators. But once we get hold of an Observable, we cannot tell from the function signature alone which thread the consumer will run on, so we usually call observeOn(mainThread) manually. Sometimes a project adds the switch to the main thread in a global place (e.g. Retrofit’s call adapter), but at the actual call site you may be unsure, or you may add observeOn(mainThread) out of habit, so the thread gets switched again and again.
Flow provides a guarantee of “context preservation”: Flow ensures at runtime that the upstream cannot change the downstream context. In other words, the thread that consumes a Flow is determined by the CoroutineContext of the collect call. What you see is what you get: where you collect is where the collector runs. Suppose we get a Flow from some API and call it magicFlow.
Part of this magicFlow may switch to some background thread, but that is an internal implementation detail the caller does not need to care about. The caller wants to consume the data on the main thread in order to update the UI. androidx provides lifecycleScope, which uses the main thread as its coroutine dispatcher. Calling the suspend collect method inside a coroutine launched from lifecycleScope ensures that the collect block runs on the main thread.
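Outside Android, the same guarantee can be observed with a JVM-only sketch in which runBlocking stands in for a coroutine launched from lifecycleScope. The body of magicFlow is an assumption: it uses flowOn to move its upstream work to a background dispatcher. Requires kotlinx.coroutines 1.6 or later.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// Assumed implementation detail: the upstream block runs on a background dispatcher.
fun magicFlow(): Flow<String> = flow {
    emit("produced on ${Thread.currentThread().name}")
}.flowOn(Dispatchers.Default)

// Returns true if every collected value was handled on the caller's thread.
fun collectedOnCallerThread(): Boolean = runBlocking {
    val caller = Thread.currentThread()
    var same = true
    magicFlow().collect { value ->
        println("$value, collected on ${Thread.currentThread().name}")
        same = same && Thread.currentThread() === caller
    }
    same
}

fun main() {
    println(collectedOnCallerThread())   // true
}
```

flowOn changes the context of the upstream only; the collect block still runs in the context of the coroutine that called collect.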
This design is in line with Kotlin coroutines in general. The context in which a suspend function runs is determined by the coroutine that calls it and is entirely under the caller’s control. A suspend function may switch to other threads internally (e.g. for IO, where switching threads is necessary to avoid blocking the main thread), but the caller does not need to care. In the same way, the details of thread switching are almost transparent to the caller of a Flow. When an Android client calls an API that exposes a Flow in order to update the UI, it can simply collect the Flow on the main thread, without switching threads manually.
Looking back, our toy implementation does not provide such a guarantee.
Suppose the upstream emits data inside a block that has switched the coroutine dispatcher. Recall that emit is actually a call into the lambda passed to collect, so the internal implementation of the upstream flow “secretly” switches the CoroutineContext of the downstream code, and the caller can no longer tell at a glance in which CoroutineContext the consuming code will run.
So the Kotlin library’s Flow implementation checks that emit is called in the same coroutine context as collect; otherwise it simply throws an exception.
Note that what Flow prohibits is emitting data from a different coroutine context; switching context inside a flow block is still allowed, as long as emit is called after the switched block has returned.
By the design of Kotlin coroutines, this is the natural way to write it. We can imagine extracting the withContext block into a suspend function that is called from within the flow builder; a context switch inside a suspend function is transparent to its caller.
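Both cases can be tried side by side. A sketch assuming kotlinx.coroutines 1.6 or later; badFlow, goodFlow and the computed value 21 * 2 are made up for illustration.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Not allowed: emit is called from inside withContext, i.e. from a different context.
val badFlow: Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

// Allowed: only the computation is moved; emit runs back in the collector's context.
val goodFlow: Flow<Int> = flow {
    val value = withContext(Dispatchers.Default) { 21 * 2 }
    emit(value)
}

// Returns the exception thrown when collecting badFlow, or null if none is thrown.
fun badFlowFailure(): Throwable? = runBlocking {
    runCatching { badFlow.toList() }.exceptionOrNull()
}

fun goodFlowValues(): List<Int> = runBlocking { goodFlow.toList() }

fun main() {
    println(badFlowFailure())   // an IllegalStateException reporting the violated flow invariant
    println(goodFlowValues())   // [42]
}
```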
Exception transparency
Another guarantee of Flow is exception transparency. However, in my current opinion, throwing exceptions is not recommended when using Kotlin coroutines. If you are interested in this part, see the official documentation: Asynchronous Flow - Exception Transparency.
Flow in Android client application
To summarize, the advantages of Flow over RxJava include:
- Relying on Kotlin’s suspend infrastructure, the design and implementation are simpler and more elegant, and the operators are more composable.
- Operators are defined as extension functions, so custom operators are called in exactly the same way as the coroutines library’s own Flow operators.
- Additional guarantees such as context preservation are provided, continuing the coroutine design idea of making the details of thread switching almost completely transparent.
However, on the Android client, Kotlin coroutines (suspend functions) alone are sufficient for most asynchronous scenarios. RxJava became popular in the Android community mainly because it solved the problem of cumbersome thread switching, a problem that Kotlin coroutines already solve very elegantly.
Currently, more and more APIs in Android Jetpack use Flow, such as DataStore, Room, Paging 3 and so on. Establishing a proper understanding of Flow will enable better use of these libraries.
In addition, RxBinding wraps common Android components as data sources, which is convenient for functional reactive programming and works well in some simple scenarios, such as debouncing the user’s input and then calling an asynchronous API. With Flow, ReactiveCircus/FlowBinding can be used for the same purpose.
Corresponding RxJava versions.
Notice that the Flow version uses collectLatest as its terminal operator. Since both the producer and the consumer of a Flow can suspend, when the producer emits a new value while the consumer is still suspended processing the previous one, the *Latest family of operators cancels the coroutine block that is processing the previous element. This behavior is similar to switchMap in RxJava. In client-side scenarios, collectLatest seems to be the right choice in most cases.
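Without Android or FlowBinding at hand, the behavior can be sketched with a plain flow that imitates typing. textChanges and callSearchApi here are hypothetical stand-ins, and the delays are arbitrary; debounce and collectLatest are the real kotlinx.coroutines operators (debounce is marked as a preview API, hence the opt-in).

```kotlin
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a text-change source: three quick keystrokes.
fun textChanges(): Flow<String> = flow {
    for (text in listOf("k", "ko", "kot")) {
        emit(text)
        delay(50)
    }
}

// Hypothetical stand-in for a slow search API.
suspend fun callSearchApi(query: String): String {
    delay(200)
    return "results for $query"
}

// Debounce the input, then search; only the settled input reaches the API.
@OptIn(FlowPreview::class)
fun searchWhileTyping(): List<String> = runBlocking {
    val shown = mutableListOf<String>()
    textChanges()
        .debounce(300)
        .collectLatest { query -> shown += callSearchApi(query) }
    shown
}

// collectLatest on its own: a new value cancels the block still handling the old one.
fun latestOnly(): List<Int> = runBlocking {
    val done = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { value ->
        delay(100)          // cancelled here when a newer value arrives
        done += value
    }
    done
}

fun main() {
    println(searchWhileTyping())   // [results for kot]
    println(latestOnly())          // [3]
}
```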
But doesn’t Flow execute sequentially, with the producer waiting for the consumer? It does by default; the *Latest family of operators differs because it has an additional channel inside, which keeps listening to the upstream and reacting to it while the downstream is suspended. Such a Flow always has an “active” part inside, which sets it apart from the “cold flow” described in this article, and is a topic for another article.