We have not used stream processors extensively in the examples presented so far, but plain stream processors are interesting for at least these reasons:
We use the following informal definitions:
Figure 26. A general stream processor and a stream processor with a single input stream and a single output stream.
The restriction may seem severe, but the chosen set of combinators allows streams to be merged and split, so a stream processor with many input/output streams can be represented as one with a single input stream and a single output stream. The advantage is that we can take a combinator-based approach to building networks of communicating stream processors. The combinators are discussed further in Chapter 17. Below we discuss how to write atomic stream processors, that is, stream processors that do not consist of several concurrently running stream processors. Their behaviour is defined by a linear sequence of I/O actions.
The library provides a type for stream processors,

    data SP input output

where input and output are the types of the elements in the input and output streams, respectively (Figure 27). (The implementation of stream processors in a lazy functional language is discussed in Chapter 20.)
Figure 27. A stream processor of type SP i o.
The library also provides the function

    runSP :: SP i o -> [i] -> [o]

which can be used on the top level of a program built with stream processors (see Chapter 19). The function absF discussed in Chapter 12 can be used to combine stream processors with fudgets.
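One way to picture what runSP does is to model a stream processor as a small data type and let runSP interpret it against a list of inputs. The sketch below is a toy model written for illustration: the constructors PutSP, GetSP and NullSP and the example demoSP are assumptions, and the library's actual representation (Chapter 20) may differ.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o
  = PutSP o (SP i o)      -- emit one output element, then continue
  | GetSP (i -> SP i o)   -- wait for one input element
  | NullSP                -- terminated

-- Interpret a stream processor as a lazy function from input list to output list.
runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []   -- no more input: nothing more can happen
runSP NullSP       _        = []   -- terminated: remaining input is ignored

-- Hypothetical processor: emit 1, read one input, emit its successor, stop.
demoSP :: SP Int Int
demoSP = PutSP 1 (GetSP (\x -> PutSP (x + 1) NullSP))

demoOutput :: [Int]
demoOutput = runSP demoSP [10, 20]   -- [1,11]
```

In this model the second input, 20, is never consumed, because demoSP has already terminated when it arrives.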
The following operations are used to write atomic stream processors:

    putSP :: output -> SP input output -> SP input output
    getSP :: (input -> SP input output) -> SP input output
    nullSP :: SP input output

As an example of how to use these in recursive definitions of stream processors, consider the identity stream processor

    -- The identity stream processor
    idSP :: SP a a
    idSP = getSP $ \ x -> putSP x idSP

the busy stream processor

    -- A stream processor that is forever busy computing.
    busySP :: SP a b
    busySP = busySP

and the following stream-processor equivalents of the well-known list functions:

    mapSP :: (a -> b) -> SP a b
    mapSP f = getSP $ \ x -> putSP (f x) $ mapSP f

    filterSP :: (a -> Bool) -> SP a a
    filterSP p = getSP $ \ x -> if p x
                                then putSP x $ filterSP p
                                else filterSP p

The stream processor nullSP need not actually be considered a primitive. It can be defined as

    nullSP = getSP $ \ x -> nullSP

i.e., it is a stream processor that ignores all input and never produces any output. A practical advantage of an explicitly represented nullSP is that it allows stream processors
that terminate to be ``garbage collected''.

As a further example, consider the stream processor

    concatMapSP :: (i->[o]) -> SP i o

To define it, we first define putListSP, which outputs the elements of a list, one at a time:

    putListSP :: [o] -> SP i o -> SP i o
    putListSP [] = id
    putListSP (x:xs) = putSP x . putListSP xs

And concatMapSP itself:

    concatMapSP f = getSP $ \ x -> putListSP (f x) $ concatMapSP f
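To see how these definitions behave on concrete input, they can be run against a toy model of SP. In the sketch below, the data type and its constructors are assumptions made for illustration (not necessarily the library's representation); mapSP, filterSP, putListSP and concatMapSP are taken from the text, and the example values at the end are hypothetical.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

putSP :: o -> SP i o -> SP i o
putSP = PutSP

getSP :: (i -> SP i o) -> SP i o
getSP = GetSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

-- Definitions from the text.
mapSP :: (a -> b) -> SP a b
mapSP f = getSP $ \x -> putSP (f x) $ mapSP f

filterSP :: (a -> Bool) -> SP a a
filterSP p = getSP $ \x -> if p x then putSP x $ filterSP p else filterSP p

putListSP :: [o] -> SP i o -> SP i o
putListSP []       = id
putListSP (x : xs) = putSP x . putListSP xs

concatMapSP :: (i -> [o]) -> SP i o
concatMapSP f = getSP $ \x -> putListSP (f x) $ concatMapSP f

-- Hypothetical example runs.
doubled, evens, repeated :: [Int]
doubled  = runSP (mapSP (* 2)) [1, 2, 3]                       -- [2,4,6]
evens    = runSP (filterSP even) [1 .. 6]                      -- [2,4,6]
repeated = runSP (concatMapSP (\n -> replicate n n)) [1, 2, 3] -- [1,2,2,3,3,3]
```

Each processor mirrors its list counterpart (map, filter, concatMap), but consumes and produces its elements one at a time.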
The function

    mapFilterSP :: (i->Maybe o) -> SP i o

can be defined in the same way:

    mapFilterSP f = getSP $ \ x ->
        case f x of
          Nothing -> mapFilterSP f
          Just y -> putSP y $ mapFilterSP f
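A typical use of mapFilterSP is to parse a stream and silently drop the elements that fail to parse. The sketch below assumes a toy model of SP (the data type is an illustration, not necessarily the library's representation) and uses readMaybe from Text.Read; the name parsedInts is hypothetical.

```haskell
import Text.Read (readMaybe)

-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

putSP :: o -> SP i o -> SP i o
putSP = PutSP

getSP :: (i -> SP i o) -> SP i o
getSP = GetSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

-- Definition from the text.
mapFilterSP :: (i -> Maybe o) -> SP i o
mapFilterSP f = getSP $ \x ->
  case f x of
    Nothing -> mapFilterSP f
    Just y  -> putSP y $ mapFilterSP f

-- Hypothetical example: keep only the strings that parse as Int.
parsedInts :: [Int]
parsedInts = runSP (mapFilterSP readMaybe) ["1", "x", "22"]   -- [1,22]
```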
Consider sumSP, a stream processor that computes the accumulated sum of its input stream:

    sumSP :: Int -> SP Int Int
    sumSP acc = getSP $ \ n -> putSP (acc+n) $ sumSP (acc+n)

In this case, the internal state is a value of the type Int, which also happens to be the type of the input and output streams. In general, the type of the input and output streams can be different from the type of the internal state, which can then be completely hidden.
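The following sketch runs sumSP on a short list and adds a second processor whose state type differs from its stream types and is hidden from its users. The SP data type is a toy model assumed for illustration, and numberSP is a hypothetical example that does not appear in the text.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

putSP :: o -> SP i o -> SP i o
putSP = PutSP

getSP :: (i -> SP i o) -> SP i o
getSP = GetSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

-- Definition from the text: the state is the accumulating argument.
sumSP :: Int -> SP Int Int
sumSP acc = getSP $ \n -> putSP (acc + n) $ sumSP (acc + n)

-- Hypothetical example: the Int counter is internal state, invisible in the
-- type SP String String.
numberSP :: SP String String
numberSP = go 1
  where
    go :: Int -> SP String String
    go n = getSP $ \line -> putSP (show n ++ ": " ++ line) $ go (n + 1)

runningSums :: [Int]
runningSums = runSP (sumSP 0) [1, 2, 3, 4]     -- [1,3,6,10]

numbered :: [String]
numbered = runSP numberSP ["a", "b"]           -- ["1: a","2: b"]
```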
The Fudget library provides two general functions for construction of stream processors with internal state:
    mapAccumlSP :: (s -> i -> (s, o)) -> s -> SP i o
    concatMapAccumlSP :: (s -> i -> (s, [o])) -> s -> SP i o

(concatMapAccumlSP is also known as mapstateSP.)
The first argument to these functions is a state transition function which, given the current state and an input message, should produce a new state and an output message (zero or more outputs in the case of concatMapAccumlSP). Using mapAccumlSP we can define sumSP without using explicit recursion:

    sumSP :: Int -> SP Int Int
    sumSP = mapAccumlSP (\ acc n -> (acc+n,acc+n))

Representing state information as one or more accumulating arguments is useful when the behaviour of the stream processor is uniform with respect to the state. If a stream processor reacts differently to input depending on its current state, it can be more convenient to use a set of mutually recursive stream processors where each stream processor corresponds to a state in a finite state automaton. As a simple example, consider a stream processor that outputs every other element in its input stream:

    passOnSP = getSP $ \ x -> putSP x $ skipSP
    skipSP = getSP $ \ x -> passOnSP

It has two states: the ``pass on'' state, where the next input is passed on to the output; and the ``skip'' state, where the next input is skipped.
The two ways of representing state illustrated above can of course be combined.
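As a sketch of such a combination, the two states of the every-other-element processor can each carry an accumulating argument. The SP data type below is a toy model assumed for illustration; passOnSP and skipSP follow the text, while passCountSP and skipCountSP are hypothetical names introduced here.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

putSP :: o -> SP i o -> SP i o
putSP = PutSP

getSP :: (i -> SP i o) -> SP i o
getSP = GetSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

-- From the text: one stream processor per automaton state.
passOnSP, skipSP :: SP a a
passOnSP = getSP $ \x -> putSP x skipSP
skipSP   = getSP $ \_ -> passOnSP

-- Hypothetical combination: the same two states, each also carrying a counter
-- that numbers the elements passed on.
passCountSP, skipCountSP :: Int -> SP a (Int, a)
passCountSP n = getSP $ \x -> putSP (n, x) $ skipCountSP (n + 1)
skipCountSP n = getSP $ \_ -> passCountSP n

everyOther :: String
everyOther = runSP passOnSP "abcdef"           -- "ace"

everyOtherNumbered :: [(Int, Char)]
everyOtherNumbered = runSP (passCountSP 1) "abcde"
-- [(1,'a'),(2,'c'),(3,'e')]
```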
mapAccumlSP and concatMapAccumlSP can be defined using putSP and getSP:

    concatMapAccumlSP :: (s -> i -> (s, [o])) -> s -> SP i o
    concatMapAccumlSP f s0 =
        getSP $ \x ->
        let (s, ys) = f s0 x
        in putListSP ys $
           concatMapAccumlSP f s

    mapAccumlSP :: (s -> i -> (s, o)) -> s -> SP i o
    mapAccumlSP f s0 =
        getSP $ \x ->
        let (s, y) = f s0 x
        in putSP y $
           mapAccumlSP f s
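The sketch below exercises both functions against a toy model of SP (the data type is an assumption for illustration, not necessarily the library's representation). sumSP is the non-recursive version from the text; pairSP is a hypothetical example showing why concatMapAccumlSP may emit zero outputs for some inputs.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

putSP :: o -> SP i o -> SP i o
putSP = PutSP

getSP :: (i -> SP i o) -> SP i o
getSP = GetSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

putListSP :: [o] -> SP i o -> SP i o
putListSP []       = id
putListSP (x : xs) = putSP x . putListSP xs

-- Definitions from the text.
concatMapAccumlSP :: (s -> i -> (s, [o])) -> s -> SP i o
concatMapAccumlSP f s0 = getSP $ \x ->
  let (s, ys) = f s0 x
  in putListSP ys $ concatMapAccumlSP f s

mapAccumlSP :: (s -> i -> (s, o)) -> s -> SP i o
mapAccumlSP f s0 = getSP $ \x ->
  let (s, y) = f s0 x
  in putSP y $ mapAccumlSP f s

sumSP :: Int -> SP Int Int
sumSP = mapAccumlSP (\acc n -> (acc + n, acc + n))

-- Hypothetical example: group the input into pairs. The state remembers an
-- unpaired element; nothing is output until its partner arrives.
pairSP :: SP a (a, a)
pairSP = concatMapAccumlSP step Nothing
  where
    step Nothing  x = (Just x, [])
    step (Just p) x = (Nothing, [(p, x)])

sums :: [Int]
sums = runSP (sumSP 0) [1, 2, 3]               -- [1,3,6]

pairs :: [(Int, Int)]
pairs = runSP pairSP [1, 2, 3, 4, 5]           -- [(1,2),(3,4)]
```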
Consider the sequential composition operator

    seqSP :: SP a b -> SP a b -> SP a b

The stream processor sp1 `seqSP` sp2 behaves like sp1 until sp1 becomes nullSP, and then behaves like sp2. However, the same can also be achieved by making all procedures end with a call to a continuation stream processor instead of nullSP, so seqSP does not add any new power.
We should also note that, for this to work properly, the operation nullSP must be explicitly represented, and not just defined as a stream processor that ignores all input and never produces any output, contrary to what was suggested in Section 16.2.
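The need for an explicit nullSP can be illustrated with a toy model in which seqSP is defined by case analysis. Everything below is a sketch under assumptions: the data type and this definition of seqSP are not taken from the library, and takeSP and ignoreSP are hypothetical helpers.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

-- One possible seqSP: follow the first processor until it is NullSP.
seqSP :: SP a b -> SP a b -> SP a b
seqSP (PutSP y sp) next = PutSP y (seqSP sp next)
seqSP (GetSP f)    next = GetSP (\x -> seqSP (f x) next)
seqSP NullSP       next = next

idSP :: SP a a
idSP = GetSP $ \x -> PutSP x idSP

mapSP :: (a -> b) -> SP a b
mapSP f = GetSP $ \x -> PutSP (f x) $ mapSP f

-- Hypothetical helper: pass on n elements, then terminate explicitly.
takeSP :: Int -> SP a a
takeSP n
  | n <= 0    = NullSP
  | otherwise = GetSP $ \x -> PutSP x $ takeSP (n - 1)

-- The "defined" nullSP of Section 16.2: ignores input but never terminates.
ignoreSP :: SP a b
ignoreSP = GetSP $ \_ -> ignoreSP

reached :: [Int]
reached = runSP (takeSP 2 `seqSP` mapSP negate) [1, 2, 3, 4]   -- [1,2,-3,-4]

neverReached :: [Int]
neverReached = runSP (ignoreSP `seqSP` idSP) [1, 2, 3]         -- []
```

In this model, seqSP can only hand over to its second argument when it sees the NullSP constructor. With ignoreSP in place of an explicit nullSP, the hand-over never happens and idSP never receives any input.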
    -- The type:
    type SPm input output answer

    -- Standard monad operations:
    unitSPm :: a -> SPm i o a
    bindSPm :: SPm i o a -> (a -> SPm i o b) -> SPm i o b

    -- Monadic versions of nullSP, putSP and getSP:
    nullSPm :: SPm i o ()
    putSPm :: o -> SPm i o ()
    getSPm :: SPm i o i

    -- A glue function:
    runSPm :: SPm i o () -> SP i o

Figure 28. The stream-processor monad.
Thanks to runSPm
you can use the combinators for
``plain'' stream processors to construct networks of
stream-processor monads.
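One way to realise the interface of Figure 28 is as a continuation monad over plain stream processors. The sketch below is an assumption made for illustration and is not claimed to be the library's definition of SPm; it also relies on a toy model of SP. The example addPairsSPm is hypothetical.

```haskell
-- Toy model of stream processors (an assumption for illustration only).
data SP i o = PutSP o (SP i o) | GetSP (i -> SP i o) | NullSP

runSP :: SP i o -> [i] -> [o]
runSP (PutSP y sp) xs       = y : runSP sp xs
runSP (GetSP f)    (x : xs) = runSP (f x) xs
runSP (GetSP _)    []       = []
runSP NullSP       _        = []

-- Assumed representation: a computation takes a continuation that says what
-- stream processor to become once the answer is available.
newtype SPm i o a = SPm { unSPm :: (a -> SP i o) -> SP i o }

instance Functor (SPm i o) where
  fmap f (SPm m) = SPm (\k -> m (k . f))

instance Applicative (SPm i o) where
  pure x = SPm (\k -> k x)
  SPm mf <*> SPm mx = SPm (\k -> mf (\f -> mx (k . f)))

instance Monad (SPm i o) where
  SPm m >>= f = SPm (\k -> m (\a -> unSPm (f a) k))

-- The operations of Figure 28, expressed in this model.
unitSPm :: a -> SPm i o a
unitSPm = pure

bindSPm :: SPm i o a -> (a -> SPm i o b) -> SPm i o b
bindSPm = (>>=)

nullSPm :: SPm i o ()
nullSPm = SPm (\_ -> NullSP)        -- discards the continuation

putSPm :: o -> SPm i o ()
putSPm y = SPm (\k -> PutSP y (k ()))

getSPm :: SPm i o i
getSPm = SPm GetSP

runSPm :: SPm i o () -> SP i o
runSPm (SPm m) = m (\() -> NullSP)  -- terminate when the computation ends

-- Hypothetical example: read two numbers, output their sum, repeat.
addPairsSPm :: SPm Int Int ()
addPairsSPm = do
  x <- getSPm
  y <- getSPm
  putSPm (x + y)
  addPairsSPm

pairSums :: [Int]
pairSums = runSP (runSPm addPairsSPm) [1, 2, 3, 4, 5]   -- [3,7]
```

Because runSPm yields an ordinary SP, the result can be passed to any combinator for plain stream processors, here simply runSP.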
For writing complex stream processors, it is of course possible to combine the stream-processor monad with other monads, e.g., a state monad. The Fudget library defines the type SPms for stream-processor monads with state. A more detailed presentation and an example of its use can be found in Chapter 31.