20 Implementing stream processors

In this chapter we present different implementations of stream processors, including an indeterministic solution that requires a language extension for parallel evaluation, and two purely functional ones: the first is based on streams as lazy lists, and the other uses a datatype with constructors corresponding to the operations of atomic stream processors. We also discuss how suitable different representations are for parallel and sequential implementations. We start by discussing some design goals that we had in mind.

20.1 Design goals for stream processors

From the start, the design of the stream processors was influenced by their intended application as building blocks in a GUI library. In this context, we found the following properties important:
Hierarchical structure.
The result of a composition of stream processors should also be a stream processor, thus allowing complex process networks to be built in a hierarchical structure. There should be no difference in principle between an atomic stream processor and one composed from several smaller stream processors.
Encapsulated state.
We should permit each stream processor to have an internal state which is invisible from the outside, and which does not interfere with the state of other stream processors.
I/O connectedness.
It should be possible to connect stream processors to the I/O system in an abstract way, so that the I/O effects can be hidden by an abstract type with associated combinators for their combination.
Reactive behaviour.
The intended use of stream processors is in the implementation of interactive (reactive) programs. This means that programs are dominated by communication rather than computation: a program waits idly for some input to arrive, computes and outputs a response to the input, and then goes back to the idle state.
Demand-driven evaluation.
The intention is to use stream processors in a lazy functional language, where expressions are evaluated on demand. Stream processors should also behave lazily--they should not do any work until a value is demanded from their output stream, and they should not demand anything from their input stream unless the input can be used to produce a demanded output.
Typed, and higher order.
There should be no restriction on the element type of streams. It should be possible to transfer anything, from numbers and booleans to functions and stream processors. Communication should be type safe.
Parallel and sequential implementations.
Stream processors should be implementable in a sequential language, but we still want to keep the definitions general enough to be able to take advantage of constructions for indeterministic choices and parallel evaluation.
Especially in the light of the last property, it seems desirable to have an abstract, formal semantics which can be used to reason about different implementations of stream processors, and programs using them. So far, we have not elaborated such a semantics, but instead we have concentrated on more practical work by developing the Fudget library and application programs. Moreover, the implementation of the basic stream-processor combinators is quite simple, and can therefore be viewed as being a semantics on its own--although not the most concise and abstract one could imagine. Nevertheless, we have outlined a simple stream-processor calculus with an accompanying operational semantics in the future work chapter (see Section 43.1).

20.2 Intuitive ideas--what is the problem?

In a lazy functional language, a natural choice is to represent streams as lists. Thanks to laziness, the elements of the list can be computed on demand, one element at a time. The elements can thus form a sequence in time rather than a sequence in space, which would be the case in a strict language.

So a stream with elements of type a can be represented as a list with elements of type a. A stream processor can be represented as a function from the input stream to the output stream:

type Stream a = [a]
type SP i o = Stream i -> Stream o
We call this the list-based representation. An obvious advantage with this approach is that the list type is a standard type and all operations provided for lists can be reused when defining stream processors.

Another advantage with this representation is that it clearly shows the close relationship between functions and stream processors. For example, serial composition is simply function composition:

(-==-) :: SP m o -> SP i m -> SP i o
sp1 -==- sp2 = sp1 . sp2
The basic stream processor actions also have very simple definitions:

nullSP        = \ xs -> []
x `putSP` sp  = \ xs -> x : sp xs
getSP isp     = \ xs -> case xs of
                           []     -> []
                           x:xs'  -> isp x xs'
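As a minimal sketch of how these definitions fit together, the block below repeats them with type signatures and adds one small stream processor. The names sumSP and example are hypothetical and introduced only for illustration; sumSP keeps a running sum as internal state, which is invisible from the outside.

```haskell
type Stream a = [a]
type SP i o = Stream i -> Stream o

-- The basic actions from the text, with explicit type signatures.
nullSP :: SP i o
nullSP = \_ -> []

putSP :: o -> SP i o -> SP i o
x `putSP` sp = \xs -> x : sp xs

getSP :: (i -> SP i o) -> SP i o
getSP isp = \xs -> case xs of
                     []      -> []
                     x : xs' -> isp x xs'

-- Serial composition is function composition.
(-==-) :: SP m o -> SP i m -> SP i o
sp1 -==- sp2 = sp1 . sp2

-- Hypothetical example: output the running sum of the inputs seen so far.
sumSP :: Int -> SP Int Int
sumSP acc = getSP (\x -> let acc' = acc + x
                         in acc' `putSP` sumSP acc')

-- An ordinary list function (map) composed with a hand-written one.
example :: [Int]
example = (map (* 2) -==- sumSP 0) [1, 2, 3]
```

Here sumSP 0 turns [1, 2, 3] into [1, 3, 6], and map (* 2) doubles each element, so example evaluates to [2, 6, 12].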
A problem with this representation, however, is that parallel composition is impossible to implement. A reasonable definition would have to look something like this:

sp1 -*- sp2 = \ xs -> merge (sp1 xs) (sp2 xs)
     where merge ys zs = ???
But what should ??? be replaced with, so that the first output from the composition is the first output to become available from one of the components? For example, suppose that

sp1 _|_ = _|_
sp2 _|_ = 1:_|_
that is, sp1 needs some input before it can produce some output, but sp2 can output 1 immediately. Then, the composition should immediately output 1,

(sp1 -*- sp2) _|_ = 1:_|_
But (sp2 -*- sp1) _|_ should also be 1:_|_, so ??? must be an expression that chooses the one of ys and zs which happens to be non-bottom. This can clearly not be done in an ordinary purely functional language.
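The asymmetry can be observed with a small experiment. The sketch below fills in ??? with one possible sequential choice, a merge that always inspects its first argument first, and models _|_ by undefined (an assumption: a real bottom could equally be a non-terminating computation, which cannot be caught). The names mergeL, parL and firstOutput are hypothetical.

```haskell
import Control.Exception (SomeException, evaluate, try)

type Stream a = [a]
type SP i o = Stream i -> Stream o

-- One sequential candidate for ???: always look at the first stream first.
mergeL :: [a] -> [a] -> [a]
mergeL (y : ys) zs = y : mergeL zs ys
mergeL []       zs = zs

parL :: SP i o -> SP i o -> SP i o
parL sp1 sp2 = \xs -> mergeL (sp1 xs) (sp2 xs)

-- sp1 needs input before it can output; sp2 can output 1 immediately.
sp1, sp2 :: SP Int Int
sp1 xs = case xs of
           []    -> []
           x : _ -> [x]
sp2 xs = 1 : xs

-- Try to observe the first output; Nothing stands for "hit bottom".
firstOutput :: Stream Int -> IO (Maybe Int)
firstOutput ys = do
  r <- try (evaluate (head ys)) :: IO (Either SomeException Int)
  return (either (const Nothing) Just r)
```

With this merge, firstOutput (parL sp2 sp1 undefined) yields Just 1, but firstOutput (parL sp1 sp2 undefined) yields Nothing. Swapping the order in which mergeL inspects its arguments only swaps which of the two compositions fails.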

As a more concrete example, consider what should happen if we apply the stream processor

map (*100) -*- filter even
to [1, 2, 3, 4, ...]. If the input elements appear at a slower rate than they can be processed by either map or filter, the desired output stream would be something like [100, 200, 2, 300, 4, 400, ...], i.e., in this particular case there should be two elements from the left stream processor for every element from the right stream processor.

The elements in the two output streams should be merged in the order they become computable as more elements of the input stream become available. However, there is no way of telling in a sequential language which of the two stream processors will be the first one to be able to produce an output. It seems that the two streams need to be evaluated in parallel, and then elements must be chosen in the order they become available.

The most natural and general solution to this problem is to use parallel evaluation, and we will take a look at this next. But by changing the representation of stream processors it is possible to obtain solutions that work in an ordinary sequential language. We will look at these solutions in Section 20.4.

20.3 Parallel implementations

As illustrated in the previous section, when representing stream processors as list functions, parallel evaluation is needed, not to gain speed, but because no sequential evaluation order can give the desired result. We need an operator that starts the evaluation of two subexpressions in parallel, and then tells which evaluation finished first. The result is thus not determined by the values of the expressions, but rather by their operational behaviour. Therefore, such an operator cannot be added to a purely functional language without problems.

The operator suggested above is a variant of amb, McCarthy's ambivalent operator [McC63]. But a programming language with such an operator is not purely functional, and thus makes ordinary equational reasoning unsound. Although such a language may still be useful [Mor94], there are solutions that allow you to make indeterministic choices in a purely functional way.

In the following section, we will introduce a variant of amb which is purely functional.

20.3.1 Oracles

To be purely functional, the result of an operator must depend entirely on the values of the arguments, and the same arguments should always give the same result. One way to make an operator for indeterministic choice purely functional is to introduce an extra argument and pretend that the result is determined solely by this argument, although operationally, something else happens. Such an extra argument is called an oracle [Bur88].

We call our operator for indeterministic choice choose:

choose :: Oracle -> a -> b -> Bool
Operationally, the expression choose o a b is evaluated by starting the evaluation of a and b in parallel and then returning True if a reaches head normal form first, and False if b does. Denotationally, choose o a b returns True or False depending only on the value of the oracle o (which magically happens to have the "right" value). An oracle should only be used once, since it must always give the same answer. We therefore distribute an infinite tree of oracles to all stream processors, as an additional argument:

data OracleTree = OracleNode OracleTree Oracle OracleTree
type SP i o = OracleTree -> Stream i -> Stream o
Using the oracle tree, we can now easily implement parallel composition of stream processors (see also Figure 42):

sp1 -*- sp2 =
        \(OracleNode (OracleNode ot _ ot1) _ ot2) xs ->
            merge ot (sp1 ot1 xs) (sp2 ot2 xs)
  where merge (OracleNode ot o _) ys zs =
          if choose o ys zs
          then merge' ot ys zs
          else merge' ot zs ys
        merge' ot (y:ys)  zs = y:merge ot ys zs
        merge' ot []      zs = zs
In this implementation, the oracle tree is split into three: two subtrees are fed to the composed stream processors, and one is given to the function merge, together with the output streams from the composed stream processors. The function merge extracts fresh oracles from the tree and uses choose to see which stream is first to reach head normal form. It then calls merge', with the second argument being the stream which has been evaluated.
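The plumbing of the oracle tree can be exercised without a parallel language extension if the oracles are mocked. The sketch below does exactly that, and it is only a model of the denotational reading: Oracle is assumed to be a precomputed Bool and choose simply returns it, so nothing is evaluated in parallel and the answers do not reflect which stream is actually ready. The helpers constTree, liftSP and demo are hypothetical.

```haskell
type Stream a = [a]

-- ASSUMPTION: an oracle is a precomputed answer. A real oracle would be
-- supplied by the run-time system of a parallel implementation.
type Oracle = Bool

data OracleTree = OracleNode OracleTree Oracle OracleTree
type SP i o = OracleTree -> Stream i -> Stream o

-- Mock of choose: the result depends only on the oracle.
choose :: Oracle -> a -> b -> Bool
choose o _ _ = o

-- Parallel composition, as in the text.
(-*-) :: SP i o -> SP i o -> SP i o
sp1 -*- sp2 =
    \(OracleNode (OracleNode ot _ ot1) _ ot2) xs ->
        merge ot (sp1 ot1 xs) (sp2 ot2 xs)
  where
    merge (OracleNode ot o _) ys zs =
      if choose o ys zs
        then merge' ot ys zs
        else merge' ot zs ys
    merge' ot (y : ys) zs = y : merge ot ys zs
    merge' _  []       zs = zs

-- Hypothetical helpers: an infinite tree of identical answers, and a
-- lifting of plain list functions that ignores the oracle tree.
constTree :: Bool -> OracleTree
constTree b = t where t = OracleNode t b t

liftSP :: (Stream i -> Stream o) -> SP i o
liftSP f = \_ xs -> f xs

demo :: Bool -> [Int]
demo b = (liftSP (map (* 100)) -*- liftSP (filter even)) (constTree b) [1, 2, 3, 4]
```

In this mock, demo True drains the left stream first and gives [100, 200, 300, 400, 2, 4], while demo False gives [2, 100, 4, 200, 300, 400], because merge swaps its stream arguments whenever the oracle answers False.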

Figure 42. Parallel composition of stream processors using oracles.

20.4 Sequential implementations

As we have seen above, the most natural representation of streams, i.e., as lists, requires parallel evaluation and indeterministic choices. But there are solutions that allow you to stay within a purely functional language, like Haskell. Although they do not provide the same degree of parallelism, they have proved to be adequate for practical use. The solutions below have been used in the implementation of the Fudget system.

20.4.1 Synthetic oracles

As seen in Section 20.2, the problem with the most natural representation of stream processors--representing streams as lazy lists and stream processors as functions on lazy lists--is the implementation of parallel composition. It is impossible to know in which order the output streams should be merged.

If we impose the restriction that sp1 and sp2 must produce output at the same rate, then sp1 -*- sp2 can be defined as:

(sp1 -*- sp2) xs = merge (sp1 xs) (sp2 xs)
   where merge (y:ys) (z:zs) = y:z:merge ys zs
However, it is awkward to impose such a constraint between the output streams of two different stream processors. Also, this solution does not work well for tagged parallel composition. A more useful constraint relates the input and output stream of a single stream processor: it must put exactly one element in the output stream for every element it consumes from the input stream (a 1-1 restriction). The function map is an example that satisfies this constraint, whereas filter is a function that does not.

With this restriction, tagged parallel composition can easily be implemented: the next element in the output stream should be taken from the stream processor that last received an element from the input stream. The following implementation of tagged parallel composition uses this fact by merging the output streams using a stream of synthetic oracles computed from the input stream (see also Figure 43):

(-+-) :: SP a1 b1 -> SP a2 b2 -> SP (Either a1 a2) (Either b1 b2)
(sp1 -+- sp2) xs = merge os (sp1 xs1) (sp2 xs2)
  where
    xs1 = [ x | Left x <- xs ]
    xs2 = [ y | Right y <- xs ]
    -- os : a synthetic oracle stream
    os  = map isLeft xs
    merge (True:os) (y:ys) zs =
           Left y:merge os ys zs
    merge (False:os) ys (z:zs) =
           Right z:merge os ys zs
    isLeft (Left   _) = True
    isLeft (Right  _) = False

Figure 43. Parallel composition of stream processors using synthetic oracles.
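A runnable variant of the definition in Figure 43 might look as follows. It is a sketch under two assumptions that are not in the figure: a final catch-all clause is added to merge so that finite input streams end cleanly, and the inner oracle variable is renamed to os' to avoid shadowing. The name example is hypothetical.

```haskell
type SP i o = [i] -> [o]

(-+-) :: SP a1 b1 -> SP a2 b2 -> SP (Either a1 a2) (Either b1 b2)
(sp1 -+- sp2) xs = merge os (sp1 xs1) (sp2 xs2)
  where
    xs1 = [x | Left x <- xs]
    xs2 = [y | Right y <- xs]
    -- os: the synthetic oracle stream, True where the input went left
    os = map isLeft xs
    merge (True  : os') (y : ys) zs = Left y  : merge os' ys zs
    merge (False : os') ys (z : zs) = Right z : merge os' ys zs
    merge _ _ _ = []  -- added for this sketch: stop when input runs out
    isLeft (Left  _) = True
    isLeft (Right _) = False

-- Both components obey the 1-1 restriction, so outputs line up with inputs.
example :: [Either Int Int]
example = (map (* 100) -+- map negate) [Left 1, Right 2, Left 3]
```

Each output appears in the position of the input that caused it, so example evaluates to [Left 100, Right (-2), Left 300].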

This solution has some practical problems, however. As it stands above, there is a potentially serious space-leak problem. Consider the evaluation of an expression like

(sp1 -+- sp2) [Left n | n<-[1..]]
Here, sp2 will never receive any input. This means that merge will never need to evaluate the argument (sp2 xs2), which holds a reference to the beginning of the input stream via xs2. This would cause all input consumed by the composition to be retained in the heap. However, provided that pattern bindings are implemented properly [Spa93], this problem can be solved by computing xs1 and xs2 with a single recursive definition that returns a pair of lists:

split :: [Either a b] -> ([a],[b])
split [] = ([],[])
split (x:xs) =
    case x of
      Left   x1  -> (x1:xs1,xs2)
      Right  x2  -> (xs1,x2:xs2)
  where
    (xs1,xs2) = split xs
Another problem is that the 1-1 restriction is rather severe. What should a stream processor do if it does not want to put a value in the output stream after consuming an input (like filter)? What if it wants to output more than one value? Obviously, if an implementation with this restriction is given to a programmer, he will invent various ways to get around it. It is better to provide a solution from the beginning.

One way to relieve the restriction is to change the representation of stream processors to

type SP a b = [a] -> [[b]]
thus allowing a stream processor to output a list of values to put in the output stream for every element in the input stream. Unfortunately, with this representation the standard list functions, like map and filter, can no longer be used in such a direct way. For example, instead of map f one must use map (\x->[f x]). Serial composition is no longer just function composition, rather it is something more complicated and less efficient. Also, it is still possible to write stream processors that do not obey the 1-1 restriction, leading to errors that cannot be detected by a compiler. Consequently, it is not a good idea to reveal this representation to the application programmer, but rather provide the stream-processor type as an abstract type. And while we are using an abstract type, we might as well use a better representation.
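To make the trade-off concrete, here is one way the combinators could be written for this representation. It is a sketch, not the library's code: mapSP, filterSP, runSP and regroup are names chosen for illustration, and the serial composition shown is just one possible definition (it forces the length of every chunk, which is part of why it is less efficient than plain function composition).

```haskell
type SP a b = [a] -> [[b]]

-- One output chunk per input element.
mapSP :: (a -> b) -> SP a b
mapSP f = map (\x -> [f x])

filterSP :: (a -> Bool) -> SP a a
filterSP p = map (\x -> [x | p x])

-- Tagged parallel composition: the input itself serves as the oracle stream.
(-+-) :: SP a1 b1 -> SP a2 b2 -> SP (Either a1 a2) (Either b1 b2)
(sp1 -+- sp2) xs = merge xs (sp1 xs1) (sp2 xs2)
  where
    xs1 = [x | Left x <- xs]
    xs2 = [y | Right y <- xs]
    merge (Left  _ : es) (ys : yss) zss = map Left ys  : merge es yss zss
    merge (Right _ : es) yss (zs : zss) = map Right zs : merge es yss zss
    merge _ _ _ = []

-- Serial composition: feed the flattened output of sp2 to sp1, then regroup
-- sp1's chunks so that there is again one chunk per original input element.
(-==-) :: SP m o -> SP i m -> SP i o
(sp1 -==- sp2) xs = regroup (map length mss) (sp1 (concat mss))
  where
    mss = sp2 xs
    regroup (n : ns) oss = concat (take n oss) : regroup ns (drop n oss)
    regroup []       _   = []

-- Flatten the chunks to recover an ordinary output stream.
runSP :: SP a b -> [a] -> [b]
runSP sp = concat . sp
```

For instance, runSP (mapSP (*100) -+- filterSP even) [Left 1, Right 1, Right 2, Left 3] gives [Left 100, Right 2, Left 300], and (mapSP (*100) -==- filterSP even) [1, 2, 3, 4] gives [[], [200], [], [400]].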

20.4.2 Continuation-based representation

Instead of using lists, the Fudget library uses a data type with constructors corresponding to the actions a stream processor can take (as described in Section 3.2):

data SP i o
  =  NullSP     
  |  PutSP o (SP i o)
  |  GetSP (i -> SP i o)
We call this the continuation-based representation of stream processors. The type has one constructor for each operation a stream processor can perform. The constructors have arguments that are part of the operations (the value to output in PutSP), and arguments that determine how the stream processor continues after the operation has been performed.

The continuation-based representation avoids the problem with parallel composition that we ran into when using the list-based representation, since it makes the consumption of the input stream observable. With list functions, a stream processor is applied to the entire input stream once and for all. The rate at which elements are consumed in this list is not observable from the outside. With the continuation-based representation, a stream processor must evaluate to GetSP sp each time it wants to read a value from the input stream. This is what we need to be able to merge the output stream in the right order in the definition of parallel composition.

An implementation of broadcasting parallel composition is shown in Figure 44. The implementation of tagged parallel composition is analogous. Note that we arbitrarily choose to inspect the left argument sp1 first. This means that even if sp2 could compute and output a value much faster than sp1, it will not get the chance to do so.

NullSP        -*- sp2           = sp2
sp1           -*- NullSP        = sp1
PutSP o sp1'  -*- sp2           = PutSP o (sp1' -*- sp2)
sp1           -*- PutSP o sp2'  = PutSP o (sp1 -*- sp2')
GetSP xsp1    -*- GetSP xsp2    = GetSP (\i -> xsp1 i -*- xsp2 i)

Figure 44. Implementation of parallel composition with the continuation-based representation.
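The definition in Figure 44 can be tried out on the earlier example, map (*100) in parallel with filter even. The sketch below repeats the data type and the figure, and adds continuation-based versions of map and filter (here called mapSP and filterSP, written for this illustration) together with a runSP function that feeds a list to a stream processor.

```haskell
data SP i o
  = NullSP
  | PutSP o (SP i o)
  | GetSP (i -> SP i o)

-- Broadcasting parallel composition, as in Figure 44.
(-*-) :: SP i o -> SP i o -> SP i o
NullSP       -*- sp2          = sp2
sp1          -*- NullSP       = sp1
PutSP o sp1' -*- sp2          = PutSP o (sp1' -*- sp2)
sp1          -*- PutSP o sp2' = PutSP o (sp1 -*- sp2')
GetSP xsp1   -*- GetSP xsp2   = GetSP (\i -> xsp1 i -*- xsp2 i)

-- Illustrative stream processors: each GetSP marks one read of the input.
mapSP :: (i -> o) -> SP i o
mapSP f = GetSP (\x -> PutSP (f x) (mapSP f))

filterSP :: (i -> Bool) -> SP i i
filterSP p = GetSP (\x -> if p x then PutSP x (filterSP p) else filterSP p)

-- Feed a list to a stream processor and collect its output.
runSP :: SP i o -> [i] -> [o]
runSP sp xs =
  case sp of
    PutSP y sp' -> y : runSP sp' xs
    GetSP xsp   -> case xs of
                     x : xs' -> runSP (xsp x) xs'
                     []      -> []
    NullSP      -> []

example :: [Int]
example = runSP (mapSP (* 100) -*- filterSP even) [1, 2, 3, 4]
```

Because the left argument is inspected first, example evaluates to [100, 200, 2, 300, 400, 4]: for every input, the output of the left stream processor precedes that of the right one.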

With the continuation-based representation, serial composition can be implemented as shown in Figure 45.

NullSP        -==- sp2           = NullSP
PutSP o sp1'  -==- sp2           = PutSP o (sp1' -==- sp2)
GetSP xsp1    -==- NullSP        = NullSP
GetSP xsp1    -==- PutSP m sp2'  = xsp1 m -==- sp2'
GetSP xsp1    -==- GetSP xsp2    = GetSP (\i -> GetSP xsp1 -==- xsp2 i)

Figure 45. Implementation of serial composition with the continuation-based representation.
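A small usage sketch of the definition in Figure 45 follows. As before, mapSP and filterSP are illustrative definitions written for this example, and runSP feeds a list to a stream processor; only the five clauses of -==- are taken from the figure.

```haskell
data SP i o
  = NullSP
  | PutSP o (SP i o)
  | GetSP (i -> SP i o)

-- Serial composition, as in Figure 45: sp2 runs first, sp1 consumes its output.
(-==-) :: SP m o -> SP i m -> SP i o
NullSP       -==- _            = NullSP
PutSP o sp1' -==- sp2          = PutSP o (sp1' -==- sp2)
GetSP _      -==- NullSP       = NullSP
GetSP xsp1   -==- PutSP m sp2' = xsp1 m -==- sp2'
GetSP xsp1   -==- GetSP xsp2   = GetSP (\i -> GetSP xsp1 -==- xsp2 i)

mapSP :: (i -> o) -> SP i o
mapSP f = GetSP (\x -> PutSP (f x) (mapSP f))

filterSP :: (i -> Bool) -> SP i i
filterSP p = GetSP (\x -> if p x then PutSP x (filterSP p) else filterSP p)

runSP :: SP i o -> [i] -> [o]
runSP sp xs =
  case sp of
    PutSP y sp' -> y : runSP sp' xs
    GetSP xsp   -> case xs of
                     x : xs' -> runSP (xsp x) xs'
                     []      -> []
    NullSP      -> []

-- Keep the even inputs, then multiply them by 100.
example :: [Int]
example = runSP (mapSP (* 100) -==- filterSP even) [1, 2, 3, 4]
```

The composition only asks for external input when the left stream processor wants a value that the right one cannot yet supply, and example evaluates to [200, 400].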

A definition of the loop combinator loopSP is shown in Figure 46.

loopSP sp = loopSP' empty sp
  where
    loopSP' q NullSP         = NullSP
    loopSP' q (PutSP o sp')  = PutSP o (loopSP' (enter q o) sp')
    loopSP' q (GetSP xsp)    = case qremove q of
                                 Just (i,q')  -> loopSP' q' (xsp i)
                                 Nothing      -> GetSP (loopSP . xsp)

-- Fifo queues
data Queue a
empty    :: Queue a
enter    :: Queue a -> a -> Queue a
qremove  :: Queue a -> Maybe (a,Queue a)

Figure 46. Implementation of loopSP with the continuation-based representation.
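To run the definition in Figure 46, the abstract queue needs some implementation. The sketch below assumes a deliberately naive one, a list with linear-time enter, which is not necessarily what the Fudget library uses. The stream processor countdownSP is a hypothetical example: it outputs n-1 for every positive input n, so that inside a loop each output is fed back until 0 is reached.

```haskell
data SP i o
  = NullSP
  | PutSP o (SP i o)
  | GetSP (i -> SP i o)

-- ASSUMPTION: a naive FIFO queue; enter is O(n), which is fine for a sketch.
newtype Queue a = Queue [a]

empty :: Queue a
empty = Queue []

enter :: Queue a -> a -> Queue a
enter (Queue xs) x = Queue (xs ++ [x])

qremove :: Queue a -> Maybe (a, Queue a)
qremove (Queue [])       = Nothing
qremove (Queue (x : xs)) = Just (x, Queue xs)

-- The loop combinator, as in Figure 46: every output is also queued as input.
loopSP :: SP a a -> SP a a
loopSP sp = loopSP' empty sp
  where
    loopSP' _ NullSP        = NullSP
    loopSP' q (PutSP o sp') = PutSP o (loopSP' (enter q o) sp')
    loopSP' q (GetSP xsp)   = case qremove q of
                                Just (i, q') -> loopSP' q' (xsp i)
                                Nothing      -> GetSP (loopSP . xsp)

runSP :: SP i o -> [i] -> [o]
runSP sp xs =
  case sp of
    PutSP y sp' -> y : runSP sp' xs
    GetSP xsp   -> case xs of
                     x : xs' -> runSP (xsp x) xs'
                     []      -> []
    NullSP      -> []

-- Hypothetical example: emit n-1 for positive n, nothing otherwise.
countdownSP :: SP Int Int
countdownSP =
  GetSP (\n -> if n > 0 then PutSP (n - 1) countdownSP else countdownSP)
```

Here runSP (loopSP countdownSP) [3] evaluates to [2, 1, 0]: external input is requested only when the queue of looped values is empty.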

Example:
Implement runSP :: SP a b -> [a] -> [b].
Solution:
runSP sp xs =
    case sp of
      PutSP y sp'  -> y : runSP sp' xs
      GetSP xsp    -> case xs of
                         x : xs'  -> runSP (xsp x) xs'
                         []       -> []
      NullSP       -> []

20.5 Continuations vs list functions

We have seen two representations of stream processors: one based on list functions and one based on continuations. Which one is better?

Using list functions works well for parallel implementations. Demand is propagated from the output to the input stream by the normal evaluation mechanism of the functional language. Since streams are represented as lists, the standard list functions can be used directly as stream processors.

For sequential implementations, we saw that representing stream processors as functions from streams to streams prevented us from implementing parallel composition. Here, the continuation-based representation seems more attractive, and is the one that we currently employ in the Fudget library. The continuation-based representation also allows stream processors to be detached, moved, and plugged in somewhere else in the program--something that is used in Chapter 25.

Since our implementation is based on a sequential programming language, we do not get true concurrency. As long as all stream processors quickly react to input to avoid blocking other stream processors in the program, this is acceptable in practice. The reactiveness property is not enforced by the compiler, however.

It would be nice to have a representation that works well for both parallel and sequential implementations. Is perhaps the continuation-based representation useful also for parallel implementations? Consider the composition

(sp1 -*- idSP) -==- sp2
The first output from the composition should be either the first output from sp1 or the first output from sp2, whichever happens to be ready first. But the parallel composition must evaluate to either PutSP ..., or GetSP .... In the first case we have prematurely committed ourselves to taking the output from sp1 first. In the second case we will not be able to deliver the first output until after sp2 has delivered its first output. It is not clear to us how the desired behaviour should be achieved.