csp(n) 0.1.0 "Concurrency"

Name

csp - Golang style concurrency library based on Communicating Sequential Processes

Description

The csp package provides two concurrency primitives, namely coroutines and channels, which allow concurrent programming in the style of Golang.

The concepts originate in Hoare's Communicating Sequential Processes while the syntax mimics the Golang implementation.

The CSP concurrency model may be visualized as a set of independent processes (coroutines) sending messages to and receiving messages from named channels. The control flow in the coroutines is coordinated at the points of sending and receiving messages, i.e. a coroutine may need to wait while trying to send or receive. Since the library must work in a single-threaded interpreter, waiting is non-blocking: instead of blocking, a waiting coroutine gives way to other coroutines.

This concurrency model may also be seen as a generalization of Unix named pipes where processes and pipes correspond to coroutines and channels.

Concepts

channel
There are two types of channels.
Unbuffered channels

The unbuffered channel is a single value container that can be imagined as a rendez-vous venue where the sender must wait for the receiver to collect the message. By default channels are unbuffered.

Buffered channels

The buffered channel behaves like a FIFO queue.

Whether a coroutine needs to wait while trying to receive from or send to a channel depends on the channel's internal state:
ready for receive

The buffered channel is ready for receive when it is not empty. The unbuffered channel is ready for receive if there exists a sender waiting with a message on this channel.

ready for send

The buffered channel is ready for send when it is not full. The unbuffered channel is ready for send if there is no other sender already waiting on this channel.
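A minimal sketch of these readiness rules, assuming the csp package is installed. It uses the select command with a default clause (described under COMMANDS) to probe an unbuffered channel without waiting. The procedure names sender, probe and main and the report channel are hypothetical; report is received with <-! in the main control flow, as in Example 16, so that the script does not exit early.

```tcl
package require csp
namespace import csp::*

# Sends one message; on an unbuffered channel this waits for a receiver
proc sender {ch} {
    $ch <- hello
}

# Receive only if the channel is ready, otherwise fall through to default
proc probe {ch} {
    select {
        <- $ch {
            set state "ready: [<- $ch]"
        }
        default {
            set state "not ready"
        }
    }
    return $state
}

proc main {report} {
    channel ch
    # no sender is waiting yet, so the unbuffered channel is not ready for receive
    set before [probe $ch]
    go sender $ch
    # give way for 50 ms so that the sender starts and waits on the channel
    <- [timer pause 50]
    set later [probe $ch]
    $report <- [list $before $later]
}

channel report
go main $report
set result [<-! $report]
puts $result
```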

A channel is created with:

::csp::channel channelVar ?size?

The optional parameter size specifies the maximum number of messages that can be stored in the channel. When the channel is full, a sender trying to send more messages to it must wait until a receiver takes a message off the channel. Waiting means that the sender gives way to other coroutines.

If size is zero (the default), the created channel is unbuffered, which means that the sender coroutine always waits for a receiver to collect the message.

A channel may be closed with:

channelObj close

A closed channel is destroyed automatically once all its messages have been received (the channel is drained).

Channel lifecycle is described by 3 possible states:
created

Once the channel is created you can send to and receive from the channel.

closed

When the channel is closed you can still receive from it, but you cannot send to it. Trying to send to a closed channel throws an error. It is the responsibility of the library user to close unused channels.

destroyed

The channel no longer exists. After all messages have been received from a closed channel, the channel is destroyed. Trying to send to or receive from a destroyed channel throws an error.

Note that creating a large number of channels that are properly closed but not emptied may result in a memory leak.
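The following sketch walks a buffered channel through the three states. It assumes the csp package is installed; lifecycle and report are hypothetical names. Only the fact that an error is thrown is checked, not the error message itself.

```tcl
package require csp
namespace import csp::*

# Walk one buffered channel through the created, closed and destroyed states
proc lifecycle {ch report} {
    # created: sending and receiving are both allowed
    $ch <- a
    $ch <- b
    $ch close
    # closed: sending throws an error, but buffered messages can still be received
    set sendFailed [catch {$ch <- c}]
    set first [<- $ch]
    set second [<- $ch]
    # drained: the channel has been destroyed, so receiving throws an error too
    set recvFailed [catch {<- $ch}]
    $report <- [list $sendFailed $first $second $recvFailed]
}

# buffer of 3, so the rejected send fails because the channel is closed, not full
channel ch 3
channel report
go lifecycle $ch $report
set result [<-! $report]
puts $result
```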

coroutine

A coroutine is a procedure executing concurrently with other coroutines. A coroutine may send messages to and receive messages from channels, and any coroutine may act as a sender or a receiver at different times. If a channel is not ready, the coroutine waits by giving way to other coroutines. This makes coroutine execution multiplexed at the points of sending to or receiving from channels.

A coroutine is created with:

::csp::go procName ?args?

where procName is the name of the existing Tcl procedure that will be run as a coroutine. You can create many coroutines from a single Tcl procedure, possibly called with different arguments.
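As a sketch of starting several coroutines from one procedure with different arguments, assuming the csp package is installed (square and results are hypothetical names). The results are sorted because the order in which the coroutines deliver them is not the point here.

```tcl
package require csp
namespace import csp::*

# One Tcl procedure used as the body of several coroutines
proc square {n results} {
    $results <- [expr {$n * $n}]
}

# buffer large enough that no square coroutine has to wait
channel results 3
foreach n {2 3 4} {
    go square $n $results
}

# collect the three results in the main control flow
set squares {}
foreach _ {1 2 3} {
    lappend squares [<-! $results]
}
puts [lsort -integer $squares]
```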

A coroutine is destroyed when its execution ends.

We reuse the term coroutine from Tcl coroutines (which are modelled on Lua's), but the two are not equivalent. csp coroutines are implemented in terms of Tcl coroutines, so it is better not to mix csp and Tcl coroutines in a single program.

COMMANDS

::csp::go procName ?args?

Create a coroutine by calling procName with arguments args. Returns the internal name of the coroutine.

::csp::channel channelVar ?size?

Create a channel object, referred to below as channelObj. The name of the object is stored in the variable channelVar.

var channelVar

Name of the variable that will be created to hold the channel object name.

number size

Size of the channel buffer understood as the maximum number of messages that can be buffered in the channel. If size is zero (default) the channel is unbuffered.

Returns channel object name.
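A small sketch showing that the returned name and the variable refer to the same channel object, so the result of channel can also be used inline. It assumes the csp package is installed; greeter is a hypothetical procedure.

```tcl
package require csp
namespace import csp::*

proc greeter {out} {
    $out <- "hello from a coroutine"
}

# channel stores the object name in ch and also returns it
set returned [channel ch]
go greeter $returned
set msg [<-! $ch]
puts "$returned $ch $msg"
```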

channelObj close

Close the channel channelObj. Returns empty string.

channelObj <- msg

Send msg to channel channelObj in a coroutine. Returns empty string.

channelObj <-! msg

Send msg to channel channelObj in a script (in the Tcl program main control flow). It is implemented using vwait and has many limitations. Use with care and only in simple scenarios. Returns empty string.

::csp::<- channelObj

Receive from channel channelObj in a coroutine. Returns the message received from the channel.

::csp::<-! channelObj

Receive from channel channelObj in a script (in the Tcl program main control flow). Returns the message received from the channel.
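A sketch of exchanging messages between the main control flow and a coroutine with the bang-terminated commands. It assumes the csp package is installed; doubler, in and out are hypothetical names. Buffered channels keep the scenario simple, which suits the limitations of the vwait-based commands.

```tcl
package require csp
namespace import csp::*

# Worker coroutine: double every number until the input channel is closed
proc doubler {in out} {
    range n $in {
        $out <- [expr {$n * 2}]
    }
    $out close
}

channel in 3
channel out 3
go doubler $in $out

# send from the main control flow, then close to end the worker's range loop
foreach n {1 2 3} {
    $in <-! $n
}
$in close

# receive the results in the main control flow
set results {}
foreach _ {1 2 3} {
    lappend results [<-! $out]
}
puts $results
```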

::csp::select operation body

Evaluate a set of channel operations to find which channels are ready and run the corresponding block of code. Returns the result of evaluating that block.

list operation

Each operation takes one of three forms:

<- channelObj

for evaluating whether the channelObj is ready for receive, or

channelObj <-

for evaluating whether the channelObj is ready for send, or

default

for the default case, evaluated if no channel is ready.

block body

Block of code to be evaluated.

The select command provides a way to handle multiple channels. It is a switch-like statement where channels are evaluated for readiness. The select command makes the coroutine wait until at least one channel is ready. If multiple channels can proceed, select chooses one pseudo-randomly. A default clause, if present, is executed immediately if no channel is ready.
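A sketch of the send form of an operation combined with a default clause. It assumes, as in the receive clauses of the EXAMPLES section, that the operation only tests readiness and the body performs the actual send. trySend, box and report are hypothetical names.

```tcl
package require csp
namespace import csp::*

# Try to send without waiting: the operation "$ch <-" only tests readiness,
# the body performs the actual send
proc trySend {ch msg} {
    select {
        $ch <- {
            $ch <- $msg
            set status sent
        }
        default {
            set status dropped
        }
    }
    return $status
}

proc main {report} {
    channel box 1
    # the buffer has room, so the send clause is chosen
    set first [trySend $box a]
    # the buffer is now full, so the default clause runs instead of waiting
    set second [trySend $box b]
    $report <- [list $first $second [<- $box]]
}

channel report
go main $report
set result [<-! $report]
puts $result
```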

::csp::range varName channelObj body

Receive from channelObj until it is closed, in a coroutine.

This is a foreach-like construct that iterates by receiving messages from the channel one by one until the channel is closed and all its messages have been received. If the channel is not ready for receive, range waits.

::csp::range! varName channelObj body

Receive from channel until closed in the main control flow.

A version of range command that can be used outside of a coroutine. It is implemented using vwait and has many limitations. Use with care and only in simple scenarios.

::csp::timer channelVar interval

Create a receive-only channel with a message scheduled interval milliseconds in the future. Trying to receive from the channel makes the coroutine wait until interval milliseconds have passed since the channel was created. The received message is the Unix epoch time in microseconds. After the message is received, the channel is closed and destroyed.

Returns the created channel.

::csp::ticker channelVar interval ?closeafter?

Create a receive-only channel with scheduled messages every interval milliseconds.

Returns the created channel. The optional closeafter argument determines when the channel is closed. It may take one of the 2 forms:

  • integerNumber that specifies the number of milliseconds after which the channel will be closed

  • #integerNumber that specifies number of messages after which the channel will be closed

If closeafter argument is not provided, the ticker channel emits messages endlessly.
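A sketch of the millisecond form of closeafter, assuming the csp package is installed (countTicks and report are hypothetical names). With a 100 ms interval and closing after 350 ms, roughly three ticks are expected, but the exact count depends on timer scheduling.

```tcl
package require csp
namespace import csp::*

# Count the ticks of a ticker that closes itself after 350 milliseconds
proc countTicks {report} {
    set n 0
    range _ [ticker t 100 350] {
        incr n
    }
    $report <- $n
}

channel report
go countTicks $report
set ticks [<-! $report]
puts "received $ticks ticks"
```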

::csp::-> channelVar

Creates a channel and returns a new coroutine that may be called with a single argument. The coroutine is meant for integration with callback-driven code, to be used in place of a one-time callback. The channel name is placed in channelVar and the channel is destroyed after a single message is received. The single argument passed to the callback is available to receive from the created channel.

Note that there is a limitation in replacing callbacks with the -> command: only single-argument or zero-argument callbacks can be replaced. In the case of zero-argument callbacks, an empty string is sent to the channel.
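A sketch of replacing a zero-argument callback, assuming the csp package is installed: after calls its script without extra arguments, so the -> coroutine sends an empty string. sleeper and report are hypothetical names.

```tcl
package require csp
namespace import csp::*

proc sleeper {report} {
    # after invokes its script with no extra arguments, so -> sends an empty string
    after 200 [-> wake]
    set msg [<- $wake]
    $report <- [list woke [string length $msg]]
}

channel report
go sleeper $report
set result [<-! $report]
puts $result
```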

::csp::->> channelVar ?size?

Creates a buffered channel of size size and returns a new coroutine that may be used in place of a callback. The coroutine may be called many times and the callback arguments are internally sent to the created channel.

Note that there is a limitation in replacing callbacks with the ->> command: only single-argument or zero-argument callbacks can be replaced. In the case of zero-argument callbacks, an empty string is sent to the channel.
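A sketch of a long-lived callback without Tk, assuming the csp package is installed. Three after timers call the same ->> coroutine, each with a single argument, and the messages are received in the order the timers fire. collect, events and report are hypothetical names.

```tcl
package require csp
namespace import csp::*

proc collect {report} {
    # one long-lived callback coroutine backed by a buffered channel of size 10
    set callback [->> events 10]
    foreach delay {50 100 150} {
        # each timer calls the same callback with a single argument
        after $delay [list $callback "tick at $delay"]
    }
    set got {}
    foreach _ {1 2 3} {
        lappend got [<- $events]
    }
    $report <- $got
}

channel report
go collect $report
set result [<-! $report]
puts $result
```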

::csp::forward fromChannel toChannel

Receive messages from fromChannel and send them to toChannel.
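A sketch of funnelling two one-shot callback channels into one channel, modelled on Example 14 but using after instead of HTTP requests so that it runs offline. It assumes the csp package is installed; main, funnel and report are hypothetical names.

```tcl
package require csp
namespace import csp::*

proc main {report} {
    # single funnel channel that collects messages from several sources
    channel funnel
    foreach {delay label} {100 first 200 second} {
        # -> creates a one-shot callback channel ch; forward moves its message into the funnel
        after $delay [list [-> ch] $label]
        forward $ch $funnel
    }
    set got [list [<- $funnel] [<- $funnel]]
    $funnel close
    $report <- $got
}

channel report
go main $report
set result [<-! $report]
puts $result
```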

EXAMPLES

Example 1

Simple message passing over an unbuffered channel

    package require csp
    namespace import csp::*
 
    proc sender1 {ch} {
        foreach i {1 2 3 4} {
            puts "Sending $i"
            $ch <- $i
        }
        puts "Closing channel"
        $ch close
    }
 
    proc receiver1 {ch} {
        while 1 {
            puts "Receiving [<- $ch]"
        }
    }
 
    # create unbuffered (rendez-vous) channel
    channel ch
    go sender1 $ch
    go receiver1 $ch
 
    vwait forever

Output:

Sending 1
Receiving 1
Sending 2
Receiving 2
Sending 3
Receiving 3
Sending 4
Receiving 4
Closing channel

The communication between the coroutines is coordinated because the channel is unbuffered. The sender waits for the receiver.

Example 2

Simple message passing over a buffered channel

    package require csp
    namespace import csp::*
 
    proc sender1 {ch} {
        foreach i {1 2 3 4} {
            puts "Sending $i"
            $ch <- $i
        }
        puts "Closing channel"
        $ch close
    }
 
    proc receiver1 {ch} {
        while 1 {
            puts "Receiving [<- $ch]"
        }
    }
 
    # create buffered channel of size 2
    channel ch 2
    go sender1 $ch
    go receiver1 $ch
 
    vwait forever

Output:

Sending 1
Sending 2
Sending 3
Receiving 1
Receiving 2
Sending 4
Closing channel
Receiving 3
Receiving 4
Error: Cannot receive from a drained (empty and closed) channel ::csp::Channel#1

Since the channel is buffered with size 2, the sender waits only on the third send.

Note that the channel was closed but we still receive messages. Only after the channel has been emptied does trying to receive from it throw an error.

Example 3

Using range for receiving from the channel until closed.

We can avoid the error in the previous example by using the range command instead of iterating blindly with while. Also, if the channel is buffered, we can send all messages first and then receive them with range in a single coroutine.

    package require csp
    namespace import csp::*
 
    proc senderreceiver {ch} {
        foreach i {1 2 3 4} {
            puts "Sending $i"
            $ch <- $i
        }
        puts "Closing channel"
        $ch close
        range msg $ch {
            puts "Message $msg"
        }
        puts "Received all"
    }
 
    channel ch 10
    go senderreceiver $ch
 
    vwait forever

Output:

Sending 1
Sending 2
Sending 3
Sending 4
Closing channel
Message 1
Message 2
Message 3
Message 4
Received all

Example 4

Channels can be used to coordinate future events. We use after to create a coroutine that will send to the channel.

Instead of using a direct callback, which cannot keep local state, we consume the events in adder, which keeps the running sum in a local variable.

    package require csp
    namespace import csp::*
 
    proc adder {ch} {
        set sum 0
        while 1 {
            set number [<- $ch]
            incr sum $number
            puts "adder received $number. The sum is $sum"
        }
    }
 
    proc trigger {ch number} {
        $ch <- $number
    }
 
    channel ch
    go adder $ch
    after 1000 go trigger $ch 1
    after 3000 go trigger $ch 3
    after 5000 go trigger $ch 5
    puts "Enter event loop"
 
    vwait forever

Output:

Enter event loop
adder received 1. The sum is 1
adder received 3. The sum is 4
adder received 5. The sum is 9

Example 5

Use timer to create a channel supplying a scheduled message in the future.

    package require csp
    namespace import csp::*
 
    proc future {ch} {
        try {
            puts "future happened at  [<- $ch]"
            puts "try to receive again:"
            puts "[<- $ch]"
        } on error {out err} {
            puts "error: $out"
        }
    }
 
    timer ch 2000
    go future $ch
    puts "Enter event loop at [clock microseconds]"
 
    vwait forever

Output:

Enter event loop at 1434472163190638
future happened at  1434472165189759
try to receive again:
error: Cannot receive from a drained (empty and closed) channel ::csp::Channel#1

Instead of scheduling events with after, we can use timer to create a special receive-only channel. Only one message is sent to this channel, after the specified time, so we can pass the channel to another coroutine that will wait for that message. The message from the timer channel is the Unix epoch time in microseconds. The timer channel is destroyed automatically after the first receive, so trying to receive again throws an error.

Example 6

Using ticker we can create a receive-only channel from which we can consume timestamp messages at regular intervals.

    package require csp
    namespace import csp::*
 
    proc future {ch} {
        set count 0
        while 1 {
            incr count
            puts "future $count received at [<- $ch]"
        }
    }
 
    ticker ch 1000
    go future $ch
    puts "Enter event loop at  [clock microseconds]"
  
    vwait forever

Output:

Enter event loop at  1434472822879684
future 1 received at 1434472823879110
future 2 received at 1434472824882163
future 3 received at 1434472825884246
...

Example 7

The ticker command returns the created channel, so we can use it inline in combination with range to simplify the example further.

    package require csp
    namespace import csp::*
 
    proc counter {} {
        range t [ticker ch 1000] {
            puts "received $t"
        }
    }
 
    go counter
 
    vwait forever

Output:

received 1434474325947677
received 1434474326950822
received 1434474327952904
...

Example 8

Another example of using ticker, implementing the canonical countdown counter from the Tcl wiki.

    package require Tk
    package require csp
    namespace import csp::*
 
    proc countdown {varName} {
        upvar $varName var
        range _ [ticker t 1000 #10] {
            incr var -1
        }
    }
 
    set count 10
    label .counter -font {Helvetica 72} -width 3 -textvariable count
    grid .counter -padx 100 -pady 100
    go countdown count

Example 9

Closing the channel from another scheduled event breaks the range loop.

    package require csp
    namespace import csp::*
 
    proc counter {ch} {
        range t $ch {
            puts "received $t"
        }
        puts "counter exit"
    }
 
    ticker ch 1000
    go counter $ch
    after 4500 $ch close
    puts "Enter event loop at [clock microseconds]"
 
    vwait forever

Output:

Enter event loop at 1434474384645704
received 1434474385644900
received 1434474386648105
received 1434474387650088
received 1434474388652345
counter exit

Example 10

Redirect a callback argument to a channel using the -> command.

    package require http
    package require csp
    namespace import csp::*
 
    proc main {} {
        http::geturl http://securitykiss.com/rest/now -command [-> ch]
        puts "fetched: [http::data [<- $ch]]"
    }
 
    go main
 
    vwait forever

Output:

fetched: 1434474568

The csp package makes it easy to integrate channels and coroutines with existing event-driven code. Using the -> utility command we can make channels work with callback-driven commands and at the same time avoid callback hell.

-> ch creates a channel ch and returns a new coroutine that may be used in place of a callback. The channel is destroyed after a single value is received. The single argument passed to the callback is available to receive from the created channel.

Such code organization promotes local reasoning: it helps in writing linear code with local state kept in proc variables. Otherwise the callback would require keeping state in global variables.

Note that there is a limitation in replacing callbacks with the -> command: only single-argument or zero-argument callbacks can be replaced. In the case of zero-argument callbacks, an empty string is sent to the channel.

Note that the <- <-! -> ->> commands are not symmetric. Each of them has a different purpose.

Example 11

Use select command to choose ready channels.

    package require http
    package require csp
    namespace import csp::*
 
    proc main {} {
        http::geturl http://securitykiss.com/rest/slow/now -command [-> ch1]
        http::geturl http://securitykiss.com/rest/slow/now -command [-> ch2]
        select {
            <- $ch1 {
                puts "from first request: [http::data [<- $ch1]]"
            }
            <- $ch2 {
                puts "from second request: [http::data [<- $ch2]]"
            }
        }
    }
 
    go main
 
    vwait forever

Output:

from first request: 1434483100

The previous example with callback channels does not extend to making parallel HTTP requests, because waiting on one channel would prevent receiving from the other. The select command chooses which of a set of possible send or receive operations will proceed. In this example select examines two callback channels and, depending on which one is ready for receive first, evaluates the corresponding body block.

Example 12

Combine a timer-created channel with select to enforce timeouts.

    package require http
    package require csp
    namespace import csp::*
 
    proc main {} {
        http::geturl http://securitykiss.com/rest/slow/now -command [-> ch1]
        http::geturl http://securitykiss.com/rest/slow/now -command [-> ch2]
        timer t1 400
        select {
            <- $ch1 {
                puts "from first request: [http::data [<- $ch1]]"
            }
            <- $ch2 {
                puts "from second request: [http::data [<- $ch2]]"
            }
            <- $t1 {
                puts "requests timed out at [<- $t1]"
            }
        }
    }
 
    go main
 
    vwait forever

Output:

requests timed out at 1434484003634953

Since select chooses whichever channel in the set is ready first, adding a timer-created channel to the set lets us implement a timeout, as in the example above.

Example 13

Use select with the default clause.

    package require http
    package require csp
    namespace import csp::*
 
    proc DisplayResult {ch1 ch2} {
        set result [select {
            <- $ch1 {
                http::data [<- $ch1]
            }
            <- $ch2 {
                http::data [<- $ch2]
            }
            default {
                subst "no response was ready"
            }
        }]
        puts "DisplayResult: $result"
    }
 
    proc main {} {
        http::geturl http://securitykiss.com/rest/slow/now -command [-> ch1]
        http::geturl http://securitykiss.com/rest/slow/now -command [-> ch2]
        after 400 go DisplayResult $ch1 $ch2
    }
 
    go main
 
    vwait forever

Output:

DisplayResult: no response was ready

The select command may wait if no channel is ready. Sometimes we need to proceed no matter what, so select returns without waiting if the default clause is provided. This example also shows that select has a return value: here it is either the HTTP response or the value produced by the default block if no channel is ready.

Example 14

Funnel multiple channels into a single channel using the forward command.

    package require http
    package require csp
    namespace import csp::*
 
    proc main {} {
        set urls {
            http://securitykiss.com
            http://meetup.com
            http://reddit.com
            http://google.com
            http://twitter.com
            http://bitcoin.org
        }
        channel f
        foreach url $urls {
            http::geturl $url -method HEAD -command [-> ch]
            forward $ch $f
        }
        after 200 $f close
        range token $f {
            upvar #0 $token state
            puts "$state(http)\t$state(url)"
        }
        puts "main exit"
    }
 
    go main
 
    vwait forever

Output:

HTTP/1.1 302 Found  http://google.com/
HTTP/1.1 301 Moved Permanently  http://reddit.com/
HTTP/1.1 301 Moved Permanently  http://securitykiss.com/
main exit

When we want to listen on many channels, especially when they are created dynamically (for example per URL, as in the above example), the select command becomes awkward because it requires specifying logic for every channel.

In the example above we spawn an HTTP request for every URL and forward messages from the individual "callback channels" into the single "funnel channel" f. This way the responses are available in a single channel, so we can apply common logic to the results. We also set a timeout for the requests by closing the "funnel channel" after some time. Responses that don't arrive within the timeout are ignored.

Example 15

Redirect the arguments of a repeatedly called callback to a long-lived channel using the ->> command.

    package require Tk
    package require csp
    namespace import csp::*
 
    proc main {} {
        set number 5
        frame .f
        button .f.left -text <<< -command [->> chleft]
        label .f.lbl -font {Helvetica 24} -text $number
        button .f.right -text >>> -command [->> chright]
        grid .f.left .f.lbl .f.right
        grid .f
        while 1 {
            select {
                <- $chleft {
                    <- $chleft
                    incr number -1
                }
                <- $chright {
                    <- $chright
                    incr number
                }
            }
            .f.lbl configure -text $number
        }
    }
 
    go main

In previous examples the -> command created short-lived, disposable callback channels that could be received from only once. Often an existing command requires a callback that will be called many times over a long period of time. In such cases ->> comes into play. It returns a coroutine that may be called many times in place of the callback. The callback argument is sent to the newly created buffered channel, from which the messages (callback arguments) can later be received.

In this example, similar functionality could be achieved more simply using -textvariable on the label, but it would require a global variable instead of the local number.

The same limitations regarding callback arguments arity apply as for the -> command.

Note that the <- <-! -> ->> commands are not symmetric. Each of them has a different purpose.

Example 16

Channel operations like <- and range can be used only in coroutines. Using coroutines for channel-driven coordination is the recommended way of using the csp package.

It may happen that we need to use channels outside of coroutines. This is possible with the corresponding <-! and range! commands, but there are caveats. The "bang"-terminated commands are implemented using nested vwait calls and have many limitations, so they should be used with extra care and only in simple scenarios. In particular, they are not guaranteed to work correctly if used inside callbacks.

In this example we show a simple scenario where receiving from the channel in the main script control flow makes sense as a way to synchronize coroutine termination.

    package require http
    package require csp
    namespace import csp::*
  
    proc worker {ch_quit} {
        http::geturl http://securitykiss.com/rest/now -command [-> ch]
        puts "fetched: [http::data [<- $ch]]"
        $ch_quit <- 1
    }
  
    # termination coordination channel
    channel ch_quit
  
    go worker $ch_quit
 
    <-! $ch_quit

Output:

fetched: 1434556219

Without the last line the script would exit immediately without giving the coroutine a chance to fetch the url.

Example 17

Following the "bang" terminated command trail, this example shows how range! command may further simplify the previous countdown counter example.

    package require Tk
    package require csp
    namespace import csp::*
 
    set count 5
    label .counter -font {Helvetica 72} -width 3 -textvariable count
    grid .counter -padx 100 -pady 100
    range! _ [ticker t 1000 #$count] {
        incr count -1
    }

Example 18

A more complex example using the already discussed constructs.

    # A simple web crawler/scraper demonstrating the csp style programming in Tcl
    # In this example we have 2 coroutines: a crawler and a parser communicating over 2 channels.
    # The crawler receives the url to process from the urls channel and spawns an http request.
    # It immediately sends the pair (url, callback channel from the http request)
    # into a pending requests channel for further processing by the parser.
    # The parser receives the http token from the received callback channel 
    # and fetches the page content from the url in order to extract more urls.
    # The new urls are sent to the urls channel where the crawler takes over again.
  
    package require http
    package require csp
    namespace import csp::*
 
    # The crawler coroutine is initialized with 3 channels:
    # urls - channel with urls waiting to process
    # requests - channel with pending http requests
    # quit - synchronization channel to communicate end of coroutine
    proc crawler {urls requests quit} {
        # list of visited urls
        set visited {}
        range url $urls {
            if {$url ni $visited} {
                http::geturl $url -command [-> req]
                lappend visited $url
                # note we are passing channel object over a channel
                # the parser may already have closed the requests channel, so ignore send errors
                catch {$requests <- [list $url $req]}
            }
        }
        $quit <- 1
    }
 
 
    # The parser coroutine is initialized with 3 channels:
    # urls - channel with urls waiting to process
    # requests - channel with pending http requests
    # quit - synchronization channel to communicate end of coroutine
    proc parser {urls requests quit} {
        set count 0
        range msg $requests {
            lassign $msg url req
            timer timeout 5000
            select {
                <- $req {
                    set token [<- $req]
                    set data [http::data $token]
                    puts "Fetched URL $url with content size [string length $data] bytes"
                    foreach {_ href} [regexp -nocase -all -inline {href="(.*?)"} $data] {
                        if {![string match http:* $href] && ![string match mailto:* $href]} {
                            # catch error if channel has been closed
                            catch {$urls <- [create_url $url $href]}
                        }
                    }
                }
                <- $timeout {
                    <- $timeout
                    puts "Request to $url timed out"
                }
            }
            # we stop after fetching 10 urls; close the channels only once
            if {[incr count] == 10} {
                $urls close
                $requests close
            }
        }
        $quit <- 1
    }
 
    # utility function to construct urls
    proc create_url {current href} {
        regexp {(http://[^/]*)(.*)} $current _ base path
        if {[string match /* $href]} {
            return $base$href
        } else {
            return $current$href
        }
    }
 
 
    # channel containing urls to process
    # this channel must have rather large buffer so that the urls to crawl can queue
    channel urls 10000
    # channel containing (url req) pairs representing pending http requests
    # size of this channel determines parallelism i.e. the maximum number of pending requests at the same time
    channel requests 3
    # coordination channels that make the main program wait until coroutines end
    channel crawler_quit
    channel parser_quit
    go crawler $urls $requests $crawler_quit
    go parser $urls $requests $parser_quit
 
    # send the seed url to initiate crawling
    $urls <-! "http://www.tcl.tk/man/tcl8.6/"
 
    # Gracefully exit - wait for coroutines to complete
    <-! $crawler_quit
    <-! $parser_quit

In particular it is worth noting:

  • it is possible to pass a channel object over another channel

  • use of quit synchronization channel to communicate end of coroutine

  • closing channels as a way to terminate range iteration

Keywords

actors, callback, channel, concurrency, csp, golang

Category

Concurrency