The Gofer implementation of the process scheduler is implemented in C as part of Gofer's runtime system. A feature of the scheduler is that it attempts to keep the message queues short by giving higher priority to processes that read from channels with many waiting messages.
A limitation in the Gofer implementation of Gadgets resulted in that for each channel, at most one process can be waiting for arriving messages, and channels must be explicitly claimed by a process before trying to read from them.
The functional scheduler that we will describe is not as advanced as the original one, but it is simpler and does not have the above mentioned limitation. Before describing the functional scheduler, we give an overview of the process primitives as they appear in the original Gofer version.
Process s
represents processes which have an internal state of type s. Communication between processes is asynchronous, and
mediated by typed wires.The communication along wires is directed, one end is input only (type Wire a = (In a,Out a) data In a = In Int data Out a = Out Int
In a
), the other is output only ( Out a
). If a process only knows the input (output) end
of a wire, it can only read from (write to) it. Note that the
wire ends are merely represented by integer identifiers,
although the types carry extra information about the message
type.
Wires are created by the primitive primWire
.
(Just as with stream processors, the sequential behaviour of a process is programmed in a continuation passing style). To transmit something along a wire, one usesprimWire :: (Wire a -> Process s) -> Process s
primTx
.A process can wait for input from many wires simultaneously, by using guarded processes. A guarded process (which we denoteprimTx :: Out o -> o -> Process s -> Process s
AGuarded s
) is a process continuation that is
waiting for input from one wire, and is formed by primFrom
.Given a list of guarded processes, we can wait for input to any of them byprimFrom :: In m -> (m -> Process s) -> AGuarded s
primRx
.Now, why are there two primitives for receiving input, when there is only one for transmitting output? The reason is that although we could combinetype Guarded s = [AGuarded s] primRx :: Guarded s -> Process s
primFrom
and primRx
,the combination forces us to wait for messages of the same type. The introduction of guarded processes hides the message types and allows a process to select input from wires of different type.-- not general enough! primRxFrom :: [(In m, (m -> Process s))] -> Process s -> Process s primRxFrom = primRx . map (uncurry primFrom)
Processes need not live forever, they can die by calling primTerminate
.
Last but not least, a process can spawn a new process.primTerminate :: Process s
Thus,primSpawn :: Process s' -> s' -> Process s -> Process s
primSpawn p s0 c
will
spawn the new process p, giving it initial state s0, and continue with c.
Gadget Gofer also uses primitives for claiming and disowning wires, and requires that a wire should be claimed by a process before attempting to receive from it. Since the functional scheduler does not have this restriction, we ignore them in the following. The presentation will also ignore
primRx
actually takes an additional debugging
argument, and nci
and nco
, which are not connected to anything.The mouse and keyboard can be configured by transmitting mouse or keyboard commands, respectively, whereas the screen commands are used for drawing. The events report key presses, mouse clicks, mouse movements, and exposure events.keyboard :: In KeyboardCmnd -> Out KeyboardEvnt -> Process s mouse :: In MouseCmnd -> Out MouseEvnt -> Process s screen :: In [ScreenCmnd] -> Out ScreenEvnt -> Process s
These three primitives are started once inside the Gadget window system. For example, the keyboard process is started with
After this, the keyboard events are read fromwire $ \smk -> wire $ \ksm -> spawn (keyboard (ip smk) (op ksm))
op smk
,
and the keyboard is configured by writing to ip ksm
.
To execute a process with a given initial state, Gadget Gofer
provides the primitive primLaunch
.
primLaunch :: Process s -> s -> IO ()
readState
and setState
.In Gadget Gofer, the typereadState :: (s -> Process s) -> Process s setState :: s -> Process s -> Process s
Process s
is a synonym for a
function from s
to s
, that is, a state transformer.The implementation oftype Process s = s -> s
readState
and showState
is
then straightforward.readState c = \s -> c s s setState s c = \_ -> c s
In the functional version, processes cannot have the simple
function type s -> s
any more, since we
must be explicit about the effects that processes can
have. Instead, we will define the process type in steps, where
we start with a stream-processor type that handles messages
related to the keyboard, mouse and screen. On top of the
stream-processor type, we define a state monad (SPms)
with operations for manipulating a state in addition to the I/O
operations of the stream processor. The state is used by the
scheduler, and is used to define a simple process type Process0
, which amounts to the Gadget processes except that
they do not have any local state. Having done this, we define
the full Gadget processes on top. The steps are summarised in
the following table.
Process
Gadget processes with state Process0
Processes without state SPms
Stream-processor state monads SP
Plain stream processors
SPms
:A computation of typetype SPms i o s a = (a -> (s -> SP i o)) -> (s -> SP i o)
SPms i o s
a
can input messages of type i, output messages
of type o, manipulate a state of type s, and
return a value of type a through the following
operations:getSPms :: SPms i o s i putSPms :: o -> SPms i o s () loadSPms :: SPms i o s s storeSPms :: s -> SPms i o s () getSPms = \k s -> getSP $ \i -> k i s putSPms o = \k s -> putSP o $ k () s loadSPms = \k s -> k s s storeSPms s = \k _ -> k () s
Process0
. The state of the
stream processor is used by the scheduler for bookkeeping.Just as in the Gofer implementation, we use integers to identify wire ends, except that we call the integers wire numbers (type Process0 i o = SPms i o (SchedulerState i o) () data SchedulerState i o = SS{ freeWire :: Wno , messageQs :: MessageQueues , ready :: [Process0 i o] , guarded :: [Guarded0 i o] , input :: [i -> Process0 i o] }
Wno
).What follows are definitions of the primitives for creating wires and processes, and communication over wires. We suffix the primitives with anewtype Wno = Wno Int newtype In a = In Wno newtype Out a = Out Wno
0
to indicate that they operate
on processes without local state.
A new wire is allocated with primWire0
, which increments
the field freeWire
in the state, and hands a fresh wire
to the continuation.
The second component in the scheduler state (primWire0 :: (Wire a -> Process0 i o) -> Process0 i o primWire0 c = do ps@(SS{ freeWire = w@(Wno i) }) <- loadSPms storeSPms ps{ freeWire = Wno (i+1) } c (In w, Out w)
messageQs
)
is a mapping from wire numbers to queues of not yet delivered
messages.The typestype MessageQueues = IntMap (Queue Msg)
IntMap
and Queue
implement integer
maps and Okasaki's queues [Oka95] come from HBC's
library, and have the following signatures:module Queue where empty :: Queue a snoc :: a -> Queue a -> Queue a tail :: Queue a -> Queue a head :: Queue a -> a null :: Queue a -> Bool
The operations are standard, exceptmodule IntMap where empty :: IntMap a modify :: (a -> a) -> a -> Int -> IntMap a -> IntMap a delete :: Int -> IntMap a -> IntMap a lookup :: Int -> IntMap a -> Maybe a
modify
, which
deserves an explanation. The expression modify f
a i m
applies the function f
to the entry i in m if it exists. Otherwise, it
inserts the value a at i.
Each message is paired with the wire number. Since different wires can have different type, messages can also be of different type. We use an existential type (an extension to Haskell provided by HBC) to hide the message type when putting messages in the queue.
Constructing values of typedata Msg = Msg ?a
Msg
is easy, but when
de-constructing them, we cannot assume anything about the type of
the argument. We return to this problem later.
Sending a value on a wire amounts to queueing the wire number together with the value.
The fieldprimTx0 :: Out a -> a -> Process0 i o -> Process0 i o primTx0 (Out wno) msg p = if wno == ncWno then p else do ps@(SS{ messageQs, ready }) <- loadSPms storeSPms ps{ messageQs = addMsg wno (Msg msg) messageQs , ready = p:ready} scheduler addMsg :: Wno -> Msg -> MessageQueues -> MessageQueues addMsg wno m = modify (snoc m) (snoc m Queue.empty) wno
ready
holds a list of processes that are ready
to run. When spawning off a new process, we put it on the ready
list.There is also a list of processes waiting for messages, stored in the fieldprimSpawn0 :: Process0 i o -> Process0 i o -> Process0 i o primSpawn0 p' p = do ps@(SS{ ready }) <- loadSPms storeSPms ps{ ready = p':ready } p
guarded
. The elements are lists of stateless
guarded processes (AGuarded0 i o
).A guarded process is a wire number and a function which takes a message as a parameter. The actual type of the message is hidden indata AGuarded0 i o = AGuarded0 Wno (?a -> Process0 i o)
AGuarded0
, so that we can form a list of guarded
processes regardless of what message type they are waiting
for.A guarded stateless process is formed withtype Guarded0 i o = [AGuarded0 i o]
primFrom0
.The functionprimFrom0 :: In m -> (m -> Process0 i o) -> AGuarded0 i o primFrom0 (In wno) f = AGuarded0 wno f
primRx0
will wait for a message to arrive to
any of the guarded processes in the first parameter. It adds the
guarded processes to the state, and then jump to the scheduler to find another process to execute.The scheduler's (Figure 83) job is to apply guarded processes to matching messages, move them to the ready list, and pick one from the ready list to run. In case the ready list is empty, the input list is investigated. This list contains processes waiting for input from the outside of the stream processor. If this list is also empty, then the gadget program is finished. Otherwise, we do stream-processor input and give the message to all processes in the input list.primRx0 :: Guarded0 i o -> Process0 i o -> Process0 i o primRx0 g def = do ps@(SS{ guarded }) <- loadSPms storeSPms ps{ guarded = g:guarded } scheduler
scheduler :: Process0 i o scheduler = do ps@(SS{ freeWire, messageQs, ready, guarded, input }) <- loadSPms let (messageQs',guarded',moreReady) = match messageQs guarded let run p ready' input' = do storeSPms ps{ messageQs = messageQs' , ready = ready' , guarded = guarded' , input = input' } p case (moreReady++ready) of [] -> if null input then nullSPms else do i <- getSPms case [ih i | ih <- input] of p:ready' -> run p ready' [] p:ready' -> run p ready' input match :: MessageQueues -> [Guarded0 i o] -> (MessageQueues,[Guarded0 i o],[Process0 i o]) match m [] = (m,[],[]) match m (g:f) = case match1 m g of Nothing -> (m',g:f',r) where (m',f',r) = match m f Just (m1,p) -> (m2,f',p:r) where (m2,f',r) = match m1 f match1 :: MessageQueues -> Guarded0 i o -> Maybe (MessageQueues,Process0 i o) match1 m [] = Nothing match1 m ((AGuarded0 (Wno w) f):gs) = case IntMap.lookup w m of Nothing -> match1 m gs Just mq -> case Queue.head mq of Msg msg -> Just (m',cast f msg) -- ! type cast ! where mq' = Queue.tail mq m' = if Queue.null mq' then delete w m else modify Queue.tail undefined w m cast :: a -> b -- Not defined in Haskell.Figure 83. The scheduler.
The function match
applies all guarded processes for
which there are matching messages. It returns the remaining
unmatched messages and guarded processes, together with a list
of new ready processes.
Recall that each element in the field guarded
is itself
a list, which comes from a call to primRx
. The function
match1
looks for a matching message for one of the
elements in such a list, possibly returning a new message
queue and a ready process. A matching message must have the
same wire number as the guarded process. It seems like this
cannot be expressed in the type system, so we are forced to
use a type cast (see the function match1
in
Figure 83).
The stateless processes can do stream-processor input/output
by means of get0
and put0
. The output part is easy:
When it comes to input, the process does not directly callput0 :: o -> Process0 i o -> Process0 i o put0 o p = do putSPms (Right o) p
getSPms
, since that would block other threads as
well. Instead, the continuation is put on the input list in
the scheduler state, and jump to the scheduler. Note that more
than one process may call get0
. As we have already
seen, the scheduler will ensure that all of them will receive
the next message that the stream processor inputs.If a process terminates, we need to schedule some other process for execution if possible. Therefore,get0 :: (i -> Process0 i o) -> Process0 i o get0 i = do ps@(SS{ input }) <- loadSPms storeSPms ps{ input = i:input } scheduler
primTerminate0
simply jumps to the scheduler.To launch a process, the process state must be initialised. This is done inprimTerminate0 :: Process0 i o primTerminate0 = scheduler
primLaunch0
.So far, we have been quite general about the type of messages that our stateless processes will speak. To implement gadget processes, we will use the stream-processor I/O to simulate the keyboard, mouse and screen, as discussed in Section 31.1.1. We will call stateless gadget processesprimLaunch0 :: Process0 i o -> Process0 i o primLaunch0 p = do storeSPms SS{ freeWire = startWno , messageQs = IntMap.empty , ready = [] , guarded = [] , input = [] } p
GProcess0
.The typestype GProcess0 = Process0 GEvent GCommand
GEvent
and GCommand
will be defined in
Section 31.2.4.GProcess0
:A stateful process is a process-valued function which takes a stateless process continuation (parameterised over its input state), and an input state as parameters. It can modify the state before applying it to the continuation, and also use the stateless process primitives.newtype Process s = P ((s -> GProcess0) -> s -> GProcess0)
The state parameter is accessed by setState
and readState
.
We now need to lift the primitive operations of typeunp (P p) = p setState :: s -> Process s -> Process s setState a p = P $ \c s -> unp p c a readState :: (s -> Process s) -> Process s readState p = P $ \c s -> unp (p s) c s
GProcess0
to Process
. We use two auxiliary functions,
depending on whether the continuation takes an argument or
not. (This ``duplication of code'' is a price we pay for not
working with monads: in monadic style, all operations return a
value, which might be ()
if it is uninteresting. In
CPS, operations without a result take continuations without an
argument, which can be seen as a slight optimisation, but adds
to the complexity of CPS programming.)We also need to lift stateless processes into stateful ones:liftP0arg :: ((a -> GProcess0) -> GProcess0) -> (a -> Process s) -> Process s liftP0arg p0 p = P $ \c s -> p0 (\a->unp (p a) c s) liftP0c :: (GProcess0 -> GProcess0) -> Process s -> Process s liftP0c p0 p = P $ \c s -> p0 (unp p c s)
The operations for creating a wire and transmitting a message are straightforward to lift.liftP0 :: GProcess0 -> Process s liftP0 p0 = P $ \c s -> p0
We will also need an auxiliary function to ``downgrade'' a stateful process to a function from state to a stateless process.primWire :: (Wire a -> Process s) -> Process s primWire = liftP0arg primWire0 primTx :: Out o -> o -> Process s -> Process s primTx o m = liftP0c $ primTx0 o m
When liftingdown :: Process s -> (s -> GProcess0) down (P p) s = p (\s' -> primTerminate0) s
primFrom
, we must ensure that the
guarded processes get access to the state. Guarded stateful processes
are therefore guarded stateless processes parameterised over the state.Intype AGuarded s = s -> AGuarded0 GEvent GCommand type Guarded s = [AGuarded s] primFrom :: In m -> (m -> Process s) -> AGuarded s primFrom i p = \s -> primFrom0 i (\m -> down (p m) s)
primRx
, we apply the state to each guarded process,
revealing the stateless guarded processes that primRx0
accepts.The remaining primitive operations are straightforward to lift.primRx :: Guarded s -> Process s primRx gs = P $ \c s -> primRx0 [g s | g <- gs]
primTerminate :: Process s primTerminate = P $ \c s -> primTerminate0 primSpawn :: Process a -> a -> Process s -> Process s primSpawn p' s p = liftP0c (primSpawn0 (down p' s)) p primLaunch :: Process s -> s -> GProcess0 primLaunch p s = primLaunch0 (down p s)
mouse
, screen
and
keyboard
. We use the stream-processor input/output to
mediate the events and commands from/to the mouse, keyboard
and screen.Each device is controlled by two processes: the output handler, which injects commands received on a wire into the typedata GEvent = ME MouseEvnt | KE KeyboardEvnt | SE ScreenEvnt data GCommand = MC MouseCmnd | KC KeyboardCmnd | SC [ScreenCmnd]
GCommand
and outputs them, and the input
handler, which inputs events, extracts those specific for the
device and transmit them on a wire. These two handlers run in
parallel. This is captured with the deviceHandler
.We can now form our devices.deviceHandler :: (c -> GCommand) -> (GEvent -> Maybe e) -> In c -> Out e -> Process s deviceHandler inj extract cw ew = liftP0 $ primSpawn0 ohandler ihandler where ohandler = primRx0 [primFrom0 cw $ \cmd -> put0 (inj cmd) ohandler] ihandler = get0 $ \i -> case extract i of Just evt -> primTx0 ew evt $ ihandler Nothing -> ihandler
Outside the Gadget stream processor, the screen commands are transformed into corresponding Fudget drawing commands, whereas the keyboard and mouse control commands are ignored. Conversely, Fudget keyboard presses, mouse clicks and screen exposure events are transformed intokeyboard :: In KeyboardCmnd -> Out KeyboardEvnt -> Process s keyboard = deviceHandler KC (\i -> case i of KE e -> Just e _ -> Nothing) mouse :: In MouseCmnd -> Out MouseEvnt -> Process s mouse = deviceHandler MC (\i -> case i of ME e -> Just e _ -> Nothing) screen :: In [ScreenCmnd] -> Out ScreenEvnt -> Process s screen = deviceHandler SC (\i -> case i of SE e -> Just e _ -> Nothing)
GEvent
messages. This is done in the fudget gadgetF
, of type
Note that the high-level streams ofgadgetF :: Gadget -> F a b
gadgetF
are
unused. It would be nice to use them for communication between
gadget processes and the rest of the fudget program, but this
is not possible in a type-safe way. The reason is that such a
communication could be used to exchange wires between
different instances of gadgetF
. Each gadgetF
has
its own scheduler, and mixing wires between schedulers is not
type safe.
Figure 84. Example of wire queue length profiles, provided by the Gadgets-in-Fudgets implementation. Each profile represents one wire, its height is proportional to the length of the queue of messages waiting to be delivered. The picture is a snapshot of the computation; by pressing the button, a new snapshot is taken. The time axis is the one growing into the graph.
A disappointment is that we are not able to safely type check all parts of the scheduler. Nevertheless, we believe that the Haskell implementation is ``more'' type safe than the original scheduler, which was written in C.
The functional scheduler also has a serious performance problem for certain processes. If a process dynamically creates wires, sends messages to them, and then forgets them, the wire queues cannot be garbage collected. The functional scheduler can never know if a process drops its reference to a wire.
A remedy for these problems is to use lazy state threads [LJ94] and their imperative variables for representing the queues.