OSGi PushStreams – online processing for live data


Introduction

I recently gave a talk at the OSGi Community Event about a new RFC being discussed at the OSGi Alliance called PushStreams. A video of the talk is available here, and the latest version of the RFC is here. The talk generated a lot of interest (a full room at the conference) and even a blog analysis of the RFC.

Given the amount of interest there has been I thought I should give a bit more detail about what’s actually going on, and some of the current intentions for the PushStream API.

 

So what is a PushStream?

An OSGi PushStream is basically a Java 8 stream, but one that’s been upgraded to support “pushing” data, hence the imaginative name! The OSGi Alliance certainly isn’t the first place to talk about push-based streams of data (see Reactive Streams, Reactive Extensions and Akka) but a lot of people are still new to the concept, so we’ll start with some background.

Pull based iteration

Thanks to smartphone mail apps we’re all familiar with the difference between pull and push as a delivery mechanism, but you probably don’t think of the difference when using Java 8 streams. The simple reason for this is the elegance of the API! Finding the numbers bigger than 20 in a collection in Java 8 is as simple as:

List<Integer> results =  list.stream()
                             .filter(n -> n > 20)
                             .collect(Collectors.toList());

The thing is, despite all the cool optimisations under the hood, the model isn’t actually that much different from doing this:

List<Integer> results = new ArrayList<>();
Iterator<Integer> it = list.iterator();
while(it.hasNext()) {
    Integer i = it.next();
    if(i > 20) {
        results.add(i);
    }
}

As you can see, the it.next() operation is pulling the next value from the collection. In fact, due to the contract of the iterator, the actual pull occurs earlier, in the call to it.hasNext(). It is at this check that the iterator must a) determine whether there is a “next” value and b) store it locally so that it can be accessed.

For the types of collections that we’re normally used to dealing with in Java this doesn’t look like an issue. The call to hasNext() will simply check the next holder in memory to see if there is an Object there. The problem comes when the collection isn’t all in memory, and potentially isn’t all available locally. In this case the call to hasNext() may block for a long time while I/O occurs (imagine waiting for a robot to load a tape!). Even worse, what if the data that we want to process comes from a “live” stream? Our only option is to block the local thread, preventing any more work from being done.

Iterating over live data streams

A good example of this problem is setting a watch to notify a user of the next email that arrives from one of their grandparents. From a Java perspective the collection of “future emails starting from now” doesn’t exist, but you can imagine an iterator or cursor that you can use to check the server. This iterator may do cool stuff like batching emails if several have arrived since the last check, but what it cannot ever do is return false from hasNext(), as you will always be able to receive more email (except maybe if your account gets closed down…).

In the general case, this pseudocode:

Iterator<Email> emailIterator = …;
while(emailIterator.hasNext()) {
    Email email = emailIterator.next();
    if(email.getSender().isMyGrandParent()) {
        alertUser();
        break;
    }
}

will spend a huge amount of time blocked in the call to emailIterator.hasNext(). In fact, unless you receive a truly prodigious amount of spam, it is highly unlikely that emails arrive fast enough to keep this loop busy. It should also be noted that your smartphone will use a lot of battery polling your email server!

Something else to think about – how tech savvy are your grandparents? My niece and nephew get lots of emails from their grandparents, but not all grandparents use the internet. How does this loop terminate if your grandparents never send you email?

Using asynchronous events in iteration

The big problem in the previous example is that we’re dealing with a potentially infinite stream of data that arrives intermittently and asynchronously. There is nothing we can do to make the next email arrive sooner so that it can be checked. To fix this model we need to react to the arrival of new email, rather than proactively looking for it (this is where Reactive Streams get their name).

The pseudocode for a “reactive” processor is therefore something like:

public Continuation next(Email email) {
    if(email.getSender().isMyGrandParent()) {
        alertUser();
        return STOP;
    }
    return CONTINUE;
}

In this case our processor gets a callback for each email, and can decide whether to continue processing more data, or to stop with the current event. There is no need to block a thread, and the processor can be written in a simple, functional style.
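To make the pseudocode above concrete, here is a minimal, self-contained sketch of the same callback model. The Email record, the Continuation enum, and the delivery loop are illustrative stand-ins, not part of the RFC’s API:

```java
import java.util.ArrayList;
import java.util.List;

// A minimal sketch of the "reactive" processor described above.
// All names here are illustrative, not OSGi API.
public class ReactiveEmailCheck {

    enum Continuation { CONTINUE, STOP }

    record Email(String sender, boolean fromGrandParent) { }

    static final List<String> alerts = new ArrayList<>();

    // Called once per email as it arrives; no thread blocks in hasNext().
    static Continuation next(Email email) {
        if (email.fromGrandParent()) {
            alerts.add("Email from " + email.sender());
            return Continuation.STOP;   // short-circuit: stop receiving callbacks
        }
        return Continuation.CONTINUE;   // keep listening for the next event
    }

    public static void main(String[] args) {
        // Simulate three pushed events; processing stops at the second one.
        List<Email> pushed = List.of(
            new Email("spam@example.com", false),
            new Email("grandma@example.com", true),
            new Email("work@example.com", false));
        for (Email e : pushed) {
            if (next(e) == Continuation.STOP) break;
        }
        System.out.println(alerts);
    }
}
```

Note that the processor itself holds no loop and blocks no thread; the decision to continue or stop is returned to whoever delivers the events.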

 

The OSGi PushStream API

OSGi PushStreams are intended to do several things:

  1. Allow processing of both finite and infinite streams of data
  2. Use simple and familiar API wherever possible
  3. Support event streams where data arrives asynchronously with respect to the consumer
  4. Provide support for back-pressure within the processing pipeline

OSGi PushStreams do not, however, intend to cover every single stream processing use case. The reason for this is simple: explicitly covering some of the more exotic processing use cases would make the API necessarily much more involved. Instead, OSGi PushStreams will focus on covering the majority of use cases as simply as possible. For other use cases a little more work may be required from the user, or there are always other API options that trade simplicity for built-in functionality.

Defining a good stream processing API is a challenge, and the team who worked on Java 8’s Stream API did a great job. Rather than trying to completely rebuild an API, the OSGi Alliance therefore took the Java 8 Stream API as a starting point. It is not enough, however, to take an existing pull-based API; it must be enhanced to make it push-based. In particular, the stream API methods must all be non-blocking in their operation. Thankfully OSGi already has a primitive for dealing with asynchronous, non-blocking return values, called the Promise. For those of you who don’t know about OSGi Promises, you can look at some of my previous discussions of the subject from OSGi DevCon and an InfoQ interview.

Translating the return values of a Java 8 Stream to use Promises therefore gives us a stream that can be used to process pushed events. It also allows us to keep some of the useful properties of Java 8 streams. For example, OSGi PushStreams are lazy: they do not start processing events until a terminal operation is invoked. OSGi PushStreams also support short-circuiting: a value can be returned, closing the stream, without processing every item in the stream.

The resulting API can therefore do things like:

PushStream<Email> emailStream = …;
emailStream.map(Email::getSender)
           .filter(Sender::isMyGrandparent)
           .findFirst()
           .then(p -> alertUser());

All of this happens in a fully composable, asynchronous fashion.

 

Backpressure in push-based systems

One problem that push-based systems can suffer from is that there can be a disconnect between the consumer and the producer. In a pull-based system the consumer controls how fast the data is consumed. This happens either directly, because the consumer explicitly waits, or indirectly, because the consumer is busy processing the last thing that it pulled from the stream. In either case the system is self-throttling. For a push-based model this is not always the case. If items are pushed asynchronously then they may arrive faster than they can be processed. This causes the items to queue up, either in extra threads, an explicit work queue, a temporary collection, or even a network data buffer. If, on average, items arrive faster than they can be processed, then there is a problem. Sooner or later the consuming system will run out of resources!

The solution to this is to provide the system with backpressure, i.e. a mechanism to feed back that data is queuing up in the system. There are numerous examples of backpressure; in TCP, for example, backpressure is applied when the sender fails to receive an ACK for a packet. In Reactive Streams and RxJava the backpressure is applied by the consumer explicitly requesting that more data items be pushed; once the requested number of items have been sent, no more may be pushed until the consumer requests more. OSGi PushStreams use a time-based block, indicating that no more events should be pushed until the prescribed timeout has passed.
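The time-based model can be sketched with a consumer that returns a number of milliseconds as its backpressure signal. The EventConsumer interface and the deliver loop below are illustrative stand-ins modelled loosely on the RFC’s ideas, not the actual OSGi API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of time-based backpressure. The interface and
// method names here are assumptions for the example, not OSGi API.
public class TimeBasedBackpressure {

    interface EventConsumer<T> {
        // Return 0 to continue immediately, a positive number of milliseconds
        // to ask the producer to pause, or a negative value to close the stream.
        long accept(T event);
    }

    static long deliver(EventConsumer<Integer> consumer, int... events) {
        long totalRequestedDelay = 0;
        for (int e : events) {
            long bp = consumer.accept(e);
            if (bp < 0) break;              // consumer asked to close
            totalRequestedDelay += bp;      // a real producer would pause here
        }
        return totalRequestedDelay;
    }

    public static void main(String[] args) {
        AtomicLong processed = new AtomicLong();
        // Ask for a 10ms pause after every second event.
        long delay = deliver(e -> {
            long n = processed.incrementAndGet();
            return (n % 2 == 0) ? 10 : 0;
        }, 1, 2, 3, 4, 5);
        System.out.println(processed.get() + " events, " + delay + "ms requested");
    }
}
```

The key point is that the consumer, not the producer, decides how long the pause should be, and it does so without blocking any thread itself.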

It is important to remember that in the case of a “live” stream *any* backpressure mechanism will result in queueing at the source. This applies equally to all of the technologies listed above.

 

Further comments about the PushStream

The OSGi PushStream RFC is at a very early stage in the specification lifecycle, and none of the pieces are final. This is a great opportunity for people to provide feedback, which is why we really appreciate Dávid Karnok’s blog post sharing his comparison of PushStream and Reactive Streams.

In this post Dávid shows that it is relatively simple to adapt a Reactive Stream producer into a PushEventSource. There is a natural synergy between the Reactive Stream API and the PushStream API, which both offer simple interfaces for producers and consumers. The PushStream API goes further, however, by including the Stream pipeline itself, and a number of useful processing functions.

A) It is non-trivial to adapt a PushEventSource to act as a Reactive Streams publisher

This is an interesting observation, because it hits at the heart of what backpressure is intended to do. The problem Dávid is hitting is that a Reactive Streams Subscriber may wait an infinite amount of time before requesting more events from the Publisher. This does not match well with the PushEventSource, which uses time-based delays. Does this mean that the PushEventSource is the wrong abstraction?

Looking at the examples used in Reactive Streams projects, the event producers are typically “on demand” producers which generate data on the fly. This makes for nice examples, but isn’t typically how the real world works. Real event sources are things like website hits, button presses, or periodic temperature readings. These live data streams aren’t generated on demand, but occur whether we like it or not. As I described in the backpressure section, implementing backpressure in these situations requires some sort of queueing.

As an abstraction for a live data source a PushEventSource does not allow for indefinite delays of delivery, but adding a buffer between a PushEventSource and a Reactive Streams Consumer does make it very easy to translate between the two. The buffer also makes explicit the assumption that data can be queued up to a point, but eventually must block or fail. This buffering is glossed over in most Reactive examples, but in fact is necessary in almost all push-based systems. In fact buffering is so important that the ability to insert buffering into the pipeline is a built-in feature of the PushStream.
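The bounded buffer described above can be sketched with a blocking queue standing between a live, push-driven source and a request-driven consumer. The class and method names are illustrative only; this is not how the RFC spells the bridge:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// A minimal sketch of a bounded buffer bridging a live source and a
// request-driven consumer. All names here are illustrative assumptions.
public class BoundedBufferBridge {

    static final BlockingQueue<String> buffer = new ArrayBlockingQueue<>(4);

    // Called by the live source; returns false when the buffer is full,
    // making the "queue up to a point, then block or fail" trade-off explicit.
    static boolean push(String event) {
        return buffer.offer(event);
    }

    // Called by the consumer when it is ready for more (cf. request(n)).
    static String pull() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            System.out.println("push " + i + ": " + push("event-" + i));
        }
        // The first four events were buffered; the last two were rejected.
        System.out.println("drained: " + pull());
    }
}
```

Notice that the capacity limit is where the policy decision lives: a real pipeline must choose whether a full buffer drops events, blocks the source, or fails.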

If we look at the email example I used when describing the need for push-based streams, we can see where the Reactive Streams model falls down in the absence of buffering. If I use a Reactive Streams Consumer to consume “email arrival events” then the email server is only able to inform me of an email when I allow it to. In this case the email server itself is acting as the buffer, persisting the email until it can be retrieved, and allowing me to catch up if 20 emails arrive at once.

What if we didn’t have a buffer in the email server, and could only receive email when we were prepared to accept email notifications? In this case we would have a serious problem: if Grandma sent me an email with a picture of her cat, I would only be able to receive it if I had said I was ready. If instead I were busy receiving spam then I might never get Grandma’s email! Obviously this is a slightly contrived example, but it is interesting to note that SMTP does allow email servers to temporarily apply backpressure. They do this by returning a transient 4xx response indicating that the server is currently busy. On receiving such a response the sender should wait and try again later, which is, in effect, the same as the backpressure used in PushStreams!

B) It is not possible for a consumer to asynchronously close the stream

In the Reactive Streams model the Subscriber is initialised with a Subscription representing the connection between it and the Publisher. This Subscription can be used to close the connection between the Publisher and Subscriber. PushStreams do have an equivalent of the Subscription, namely the Closeable returned by the open() method of the PushEventSource. This Closeable, however, is not passed to the PushEventConsumer. There are two reasons for this. Firstly, it allows the PushEventConsumer to be a Single Abstract Method interface type, which is lambda-friendly. Secondly, it allows the PushEventConsumer to be stateless, which is a much safer processing model.

The result of this restriction is that the consuming function is not able to close the stream, except as the result of consuming an event. This may be a problem in some cases, however:

  • Closing the pipe between source and consumer almost always happens as a result of receiving a data event
  • The PushEventConsumer will typically be part of the same class that called PushEventSource#open(), and can use the Closeable there if needed
  • In OSGi the PushEventSource/Consumer will typically be used in a whiteboard service pattern. Unregistering the service will have the effect of closing the connection
  • The Consumer can always ignore the next event and close the connection at that point.

Therefore, whilst it is a valid point that the consumer cannot eagerly close the pipeline in every case, it can always close the pipeline eventually, and can usually close it eagerly.
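The last bullet above, closing on the next event, can be sketched as follows. The nested Consumer interface and its constants are illustrative stand-ins for the RFC’s consumer type, not the real OSGi API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of a consumer that closes the stream when the
// next event arrives. All names here are assumptions for the example.
public class CloseOnNextEvent {

    interface Consumer<T> {
        long CLOSE = -1;
        long CONTINUE = 0;
        long accept(T event);
    }

    static final List<String> received = new ArrayList<>();
    static volatile boolean wantToStop = false;

    static final Consumer<String> consumer = event -> {
        if (wantToStop) {
            return Consumer.CLOSE;   // ignore the event and close the stream
        }
        received.add(event);
        return Consumer.CONTINUE;
    };

    public static void main(String[] args) {
        consumer.accept("first");
        wantToStop = true;                        // decision made asynchronously
        long result = consumer.accept("second");  // stream closes on this event
        System.out.println(received + " closed=" + (result < 0));
    }
}
```

The consumer stays a stateless-looking, lambda-friendly function; the asynchronous decision to stop simply takes effect at the next delivery.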

C) How do I know how much backpressure to apply?

This is a good question, and the answer is, of course, that it depends on the rest of the system. The right amount of backpressure is enough that the consumer does not become overloaded, but not so much that the producer and intermediate buffers fill up.

There are actually several built-in backpressure policies as part of the PushStream API that can be applied to buffering stages. These policies are easily configurable and, if you are in an OSGi environment, they are easily reconfigurable using OSGi’s ConfigurationAdmin service.

From this starting point, testing and experience with the runtime will allow the sensible defaults to be tuned for the particular application in which they are running. This is exactly the same position that people find themselves in when using buffers in other push-based systems.

 

Summary

OSGi PushStreams are a new and exciting RFC at the OSGi Alliance. They make some different optimisation choices from other popular push-based event pipelines; however, they are easy to integrate with other popular standards. Over the next weeks and months the API will be further refined and prototyping will begin. I’ll be regularly feeding back about what’s going on, so watch the RFC and watch this space!
