Concurrency and Parallelism

In the concurrent model of computation, a program is organized as a composition of several autonomous activities or tasks. A web server that handle thousands of clients at once is an example of a concurrent program. Concurrency is also essential for programs that interact with their environment, e.g games and GUI programs.

Slogan support three styles of concurrent programming. The first is declarative concurrency, which enable us to apply simple functional reasoning techniques for verifying the correctness of concurrent programs. The second style of concurrent programming is based on the Actor model. Here the concurrent tasks work towards a common goal by doing independent computation and communicating with each other by asynchronous message passing. Slogan also support imperative multi-tasking, based on shared memory and locks. The first few sections of this chapter will cover the various forms of concurrent programming supported by Slogan.

Two concurrent tasks are executing parallely if they are scheduled to run on two separate hardware processors. Two separate tasks may also be made to share a single core, by time-sharing. Depending on the implementation, a Slogan task may share computing resources with other tasks or it may run on an independent processor. The task abstraction is high-level and do not give the programmer much control on how an individual task is scheduled.

Slogan has a parallel programming facility designed to help programmers take direct advantage of the multiple cores they may have on their computers. The parallel programming facility, which is also based on a simple model of message-passing, will be discussed in the last section of this chapter.

8.1 Tasks

Units of concurrent program execution are known as tasks. A new task is created and started by the task function. It takes a function as argument, which will be called from the newly launched task. The spawn operator (!) also starts a new task. The expression following the spawn operator will be evaluated in a new task. Both the task function and the spawn operator return an object that represents the new task. This object can be used to perform operations like sending asynchronous messages to the task, prematurely terminating the task etc.

The task will finish running and its resources will be released once the function or expression used to start it is fully evaluated. If an error is thrown by during the execution of the task and it is un-handled within the task's body, then the task will terminate immediately.

The next program defines a simple task that runs forever. It prints the message "hello" to the output stream, sleeps for 2 seconds and loops-back to print the message again. (If you are running this from the REPL, press Ctrl+C to terminate the process and get out of the loop).

task(^() letfn loop ()
         { showln("hello")
           loop() })
//> hello
    hello ...

The same program can be written using the spawn operator as shown below:

!letfn loop ()
 { showln("hello")
   loop() }
//> hello
    hello ...

8.1.1 Nondeterminism

The programs we wrote in the previous chapters were all deterministic. That means, for a given input the computation will always pass through the same steps and states to reach a final state that can be predicted before-hand. This property do not hold true once we have concurrently executing tasks in our program. In other words, execution of concurrent tasks are inherently nondeterministic. This is because, concurrency introduces a choice after each computation step, i.e. the choice of which task to run next.

The choice of which thread to execute next is done by a part of the system known as the scheduler. At each computation step, the scheduler picks one among all the ready tasks to execute next. We say a task is ready or runnable, if its has all the information it needs to execute at least one computation step. A task that do not have this information is said to be suspended.

To better understand nondeterminism, let us revisit the bank account example from the chapter about state and modularity. Basically, a bank account is a closure that responds to deposit and withdraw messages and update the balance accordingly. An example transaction on a bank account with opening balance 100 is shown below:

let account = make_bank_account(100)
account.balance // 100 + 150 - 200 + 300 + 400 - 200
// 550

Now let us see what will happen if we introduce concurrency into the program, where each update is performed in a separate task:

let account = make_bank_account(100)
{ !account.deposit(150)
  !account.withdraw(200) }
// ???

As we are running only a few tasks here, the account balance may still be 550, but there is no guarantee for that. What if the first task is not immediately scheduled for running? The second withdraw call may run instead, resulting in an error. Then the next two deposits may execute followed by the last withdraw. After this the first withdraw may be scheduled to run at last. What will be the balance now? It would be 750 if the scheduler ran the tasks exactly as we described above. But the scheduler is free to prioritize any task above others and the result may be something else. We can try to simulate the above scenario by asking the first task to suspend itself for a few seconds. This can be achieved by calling the task_sleep function.

let account = make_bank_account(100)
{ !{ task_sleep(3); account.deposit(150) }
  !account.withdraw(200) }
// 750???

What we have here is a race condition, a situation in which the program does not give the correct result for some interleaving of the operations of multiple tasks. Now this may seem rather harmless. If the first withdraw can be tried again, we will end-up with the expected balance of 550. But think of another situation where the first calls to deposit and withdraw was executed in order and the current balance is now 50. Next the task to deposit 300 is scheduled to run. It read the current balance into memory and just then it got suspended by the scheduler. The next task to deposit 400 executes now. It runs to completion and the balance is now 450. Now the previous deposit task is scheduled to run again. It proceeds to add 300 to the current balance (as it saw) and the final balance in the account is now 350. The deposit of 400 is lost forever! This is a specific instance of a race condition known as a data race where competing tasks end up with corrupt or invalid data.

To prevent race conditions, we need some mechanism for tasks to communicate with each other and synchronize their activities. The next few sections will explore how to do this and make concurrent programs more deterministic.

8.2 Declarative Concurrency

What does it mean for a concurrent program to be declarative? As we already saw, the result a concurrent program can produce is dependent on the order of task scheduling. We say a concurrent program is declarative, if for multiple executions with the same input, it either (1) do not terminate or (2) it eventually reaches a point where it produce results that are equivalent.

8.2.1 Reactive Variables

The simplest way to write concurrent declarative programs is by using reactive variables. A reactive variable is different from a normal variable in that it can be created in an unbound state and at a later point in time, bound to some value. A reactive variable can be bound only to a single value. Any further attempts to bind a different value to the reactive variable will fail. A task trying to get the value of a reactive variable will be suspended until the variable is bound.

The following program shows how a reactive variable is used to communicate a value between two tasks. Later we will see how reactive variables enable us to design concurrent programs that are declarative by the definitions we saw above.

let r = ? // an unbound reactive variable
!showln(?r) // the task will wait until the `r` is bound to a value

?r = 100
//> 100

The assignment done to the reactive variable is atomic. It means, while one task is updating the value of a reactive variable, the system will prevent other tasks from accessing it. Once a reactive variable is updated, it effectively becomes a read-only variable, no further updates are allowed on it.

Let us write a more complex concurrent program designed around reactive variables. Consider a program where we have a producer and a consumer. The producer creates a stream of values. The consumer will take each value from the stream and does some computation with it. We a have simple producer in the following program, where the stream consists of integers in a range. We have introduced a sleep before adding a new integer to the stream to simulate the effect of the producer doing some long computation. On the generated stream, we apply a function, print the result and also construct a new stream of all the results. This is achieved by calling the map function.

function producer(a, b)
  if (a < b)
  { task_sleep(2)
    a:producer(a+1, b) }
  else []

function consumer_callback(x)
  let (r = sqrt(x))
  { showln(r)
    r }

let xs = producer(1, 5)
let ys = map(consumer_callback, xs)
//> 1

// [1, 1.4142135623730951, 1.7320508075688772, 2]

As you run this program, you observe that map has to wait for 10 seconds before it could start processing any values. Is it possible to start any useful computation as soon as there are values available in the producer's stream? We update the producer as shown below.

function producer(a, b, r)
  if (a < b)
    let (r2 = ?)
    { task_sleep(2)
      ?r = a:r2
      producer(a+1, b, r2) }
  else ?r = []

function consumer(r)
  let (v = ?r)
    if (is_empty(v)) []
    else let (result = sqrt(head(v)))
    { showln(result)
      result:consumer(tail(v)) }

Instead of constructing the stream as a plain list of integers, the new producer will run in its own task and construct pairs of integers and reactive variables. The consumer can bind to the head of this stream, which itself is represented by a reactive variable. As soon as this variable is bound, the consumer can start doing its work.

We can see the consumer kicking-in as soon as a value is ready in the producer's stream:

let r = ?
!producer(1, 5, r)
//> 1
// [1, 1.4142135623730951, 1.7320508075688772, 2]

8.2.2 Event Notification

Reactive variables are a means to synchronize the activities of multiple tasks. A task may still execute their computations independently of others, but when it reaches a point where it expects some event to have already happened, it should suspend itself. How can it be notified when the event really happens? By waiting for a reactive variable to be bound! We shall apply this technique to the bank transactions example we saw earlier. We enforce the order in the transactions using reactive variables:

let r1, r2, r3, r4 = ?, ?, ?, ?
let account = make_bank_account(100)
{ !{ task_sleep(3); account.deposit(150); ?r1 = true }
  !when (?r1) { account.withdraw(200); ?r2 = true }
  !when (?r2) { account.deposit(300); ?r3 = true }
  !when (?r3) { account.deposit(400); ?r4 = true }
  !when (?r4) account.withdraw(200) }

// wait for 3 seconds...
// 550

In this example, the tasks have been effectively serialized, losing the benefit of concurrency. But in practice, this need not be the case. Each task may proceed to do other stuff concurrently after setting the reactive variable to true and unblocking the next task in queue.

8.2.3 Getting out of Time

Reactive variables can help us design concurrent programs that exist completely outside the dimension of time. In other words, a concurrent program can be written in such a way that it is not constrained by how time flows within or outside the system. To make this clear, let us consider the example of a solver for the simple equation z = x + y. Imagine the solver as a creature moving around in space with three sensors attached to it. Each of the sensors can detect the values for x, y and z transmitted by other creatures that float around the solver. Each creature exists in its own parallel world and the solver cannot predict in advance which sensor will be activated and in what order. But as soon as it has enough values, it should be able to solve the equation and bring itself into a consistent state.

We can express the solver as a function that tries to find the value of all variables in the equation with whatever information it has at any point in time. Once it has solved the equation for a given set of values, it displays its current state and goes back to solve for the next couple of values detected by the sensors. Each sensor is represented by a reactive variable.

function solver()
{ // Each variable is solved in its own task because the
  // solver do not know in which order the sensors gets activated.
  !?z = ?x + ?y
  !?x = ?z - ?y
  !?y = ?z - ?x

  solver() }

// Display the values of all variables and reset
// the sensors.    
function flush_sensors()
{ showln(#{'x:?x, 'y:?y, 'z:?z})
  x = ?
  y = ?
  z = ? }

// Initialize the sensors and start the solver.
let x, y, z = ?, ?, ?

// Now the sensors will start detecting values in random order and trigger the solver:
?x = 100
?y = 40
//> #{x:100, y:40, z:140}
?y = 50
?z = 60
//> #{x:10, y:50, z:60}

8.3 Message-Passing Concurrency

Message passing is a programming style in which a program consists of independent entities that interact by sending each other messages asynchronously, i.e., without waiting for a reply. This programming style was first studied by Carl Hewitt in the actor model. Message passing is important in artificial intelligence programming using multi-agent system frameworks. Each agent is an independent unit of communication working towards some local goal. If they can interact properly, the agents can also reach some global goal, working together. The message passing agents can be used to build highly reliable systems. Since the agents are independent, one can take over the work of another, if that one fails.

In Slogan, the message passing agents are represented by tasks. A task has a mailbox attached to it where it can receive messages from other tasks. The following program shows a simple agent which echoes back whatever message is sent to it:

let echo = !{let sender:message = task_receive(); task_send(sender, message)}
task_send(echo, self():'hello)
// hello

The echo task expects the message to be a pair of the sender task object and the message itself. The sender task object is required for sending back the reply. The sender calls the self function to get the handle to the current task object.

Slogan define two operators for receiving and sending messages. The !> operator is used to send a message to a task. Unlike the task_send function, the !> operator will return the message that was sent. The !< operator is used to retrieve the next message from the current task's mailbox. Utilizing these operators, we can rewrite the above example more compactly:

let echo = !{let sender:message = !<; sender !> message}
echo !> self():'hello
// <task>:hello
// hello

8.3.1 Designing Protocols

Tasks work together by exchanging messages in coordinated ways. It is interesting to study what kinds of coordination are important. This leads us to define a protocol as a sequence of messages between two or more parties that can be understood at a higher level of abstraction than just its individual messages. Let us take a closer look at two simple messaging protocols and see how to realize them with task objects. Request-Reply

The most basic protocol we can come up with is based on the request-reply messaging pattern. In this pattern, a client task will send a request to a server task and the server will respond with some result. If the computation on the server takes too long to perform, it can send back an acknowledgment token that the client can use to fetch the result later.

Here we will implement only the simple case. The server we are going to write can handle multiple clients simultaneously, by spawning a sub-task for each request. A client can also send multiple requests to the server because the message send operation is non-blocking. So here is the definition of the server and client functions:

function server()
  let (client:reqid:reqbody = !<)
  { !handle_client(client, reqid, reqbody)
    server() }

function handle_client(client, reqid, n)
  client !> reqid:sqrt(n)

function client()
 let (c = self())
 { s !> c:1:10
   s !> c:2:34
   s !> c:3:12
   list(!<, !<, !<) }

The server expects the client to send its request in the format: client_task_object:request_id:request_body. The request_body must be a number. Once it receives a request, the server will spawn a new task to handle it and goes back to accept the next request. The client handler function will find the square root of the number and send it back to the client along with the request_id. The sample client will send three requests in succession and return their responses in a single list.

Here is how we would use our client-server pair:

let s = !server()
// [1:3.1622776601683795, 2:5.830951894845301, 3:3.4641016151377544]

Can you figure out how the client will behave if the server crashes before it can serve all requests? To find out, let us rewrite the server in such a way that it will fail intermittently.

function server()
  let (client:reqid:reqbody = !<)
  { when (is_zero(random_integer(2)))
    { showln("server: i am dying!")
      raise("aaaah!") }
    !handle_client(client, reqid, reqbody)
    server() }

If we run the client again you may see this behavior if there are no other tasks running in the system:

let s = !server()
//> server: i am dying!
//> error: Deadlock detected

If there are other tasks running in the system, the call to client will just hang forever, waiting for a response to come.

We can make the client more robust by calling the task_receive function with a timeout and a default value. If a response is not received before the timeout expires, the client will assume the server is dead and return the default value. It will never crash or hang the task it is running in.

function client()
 let (c = self())
 { s !> c:1:10
   s !> c:2:34
   s !> c:3:12
   let r = ^() task_receive(.5, 0)
   list(r(), r(), r()) }

Now, if the server is down, it is dead forever, unless someone restarts it. It is possible to make it self-healing and more resilient to failures. In a future chapter we will see how to deal with failure conditions more gracefully. Publish-Subscribe

Let us look at another interesting pattern in which a server pushes updates to a set of clients. Data is pushed as a continuous stream and any client that connects to the server can read the stream, starting from that point in time.

The server we are going to write models an application that sends out stock market ticks to investors. Each tick has the format [symbolic_name_of_company, timestamp_of_tick, tick_value]. A new tick is generated every 2 seconds.

The code for the server is shown below.

function tick_server(clients)
  let (c = task_receive(2, false))
  { when (c) clients = c:clients
    for_each(^(c) c !> new_tick(), clients)
    tick_server(clients) }

let symbols = ['XYZ, 'ABC, 'HIJ]

function new_tick()
  [random_at(symbols), floor(now_seconds()), random_real()]

function random_at(xs)

Multiple clients can connect to a running instance of the tick server. Each of them will be served a separate stream of ticks.

function tick_client(server, name)
{ server !> self()
  letfn loop ()
  { showln(name, ": ", !<)
    loop() }}

let s = !tick_server([])
!tick_client(s, "1")
!tick_client(s, "2")

//> 1: [HIJ, 1494573730., .2876463473845367]
    1: [ABC, 1494663730., .013863292728449427]
    2: [HIJ, 1494663730., .8162050798467028]
    2: [HIJ, 1494663732., .11536616785362432]
    1: [XYZ, 1494663732., .7824845469456133]
    2: [HIJ, 1494663734., .15957494643321002]
    1: [HIJ, 1494663734., .209029373358495]

Exercise 8.1.   A client connecting to the server will receive all ticks that are published. What if the client is interested only in ticks associated with the symbol 'HIJ. What if another client wants to see only those ticks generated every 10 seconds? A task can sit between the client and server and filter the stream for the client. The client then talks only to the filter task and not directly to the server. Implement the pipeline just described.

8.4 Shared-Memory Concurrency

In the preceding sections we explored how to write concurrent programs that don't dabble with mutable objects shared between tasks. But some problems may require to maintain a shared state that will be accessed and modified by multiple tasks. In the bank account example at the beginning of this chapter, we learned that this will lead to a race condition, if the access to the shared state is not properly serialized. We can enforce serialization by making tasks wait on reactive variables, but this can lead to broken programs if clients of the bank account object fails to do the serialization properly. Another way to avoid the race condition is by allowing only one task at a time to access the state variable. This is achieved through mutual exclusion, which we discuss in this section.

A mutex is an object that can be owned only by a single task at a given time. A task claims ownership of mutex by locking it. The mutex is released when the task that owns it call unlock on it. The code between the calls to mutex_lock and mutex_unlock is called a critical region. Only the thread that has locked the mutex can execute the critical region. This means, if a mutation happens in a critical region, it is guaranteed that only one task will be attempting to do that at a time.

The next program rewrites the bank account in such a way that the changes to the balance happens in a critical region. This is achieved by packaging a mutex inside the closure and doing the assignment to balance after locking this mutex. This happens transparently to the users of a bank account object.

function make_bank_account(balance)
  where balance > 0
{ let m = mutex()

  function deposit(amount)
  { mutex_lock(m)    
    balance = balance + amount
    mutex_unlock(m) }

  function withdraw(amount)
    where balance - amount > 0
  { mutex_lock(m)
    balance = balance - amount
    mutex_unlock(m) }

  | 'balance -> balance
  | 'deposit -> deposit
  | 'withdraw -> withdraw }

With critical regions in place, concurrent tasks, updating the same bank account, will not be allowed to step-over the work another task has done. Each task will perform its update on a valid balance and other tasks wanting to make a deposit or withdraw will have to wait until the current ongoing update is finished. Note that the query for the current balance itself is not locked. This may cause a client to see a stale value, but this will never lead to a situation where the balance becomes invalid.

8.4.1 Memory model

Read and update operations on variables, arrays, strings and hash-tables are not atomic. An application has to avoid write/read and write/write races between tasks through appropriate use of synchronization primitives like the mutex.

The language runtime makes sure that concurrent reads and writes to input/output streams are properly serialized.

Slogan has built-in modules that implement purely functional data structures and software transactional memory. These facilities can greatly ease the development of concurrent programs that need to share state between tasks. You are encouraged to explores these modules.

8.5 A Concurrent Time-Server

Now it's time to revisit the time server from Chapter 3. There we had a server which can handle multiple clients, but only one at a time. A new client has to wait for the server to finish serving the currently connected client before it can get its response. With our new-found knowledge about tasks, we can fix the situation. The server sends each client to be handled by a dedicated task and immediately moves on to receive the next incoming connection. As Slogan can efficiently run millions of tasks, the server is limited only by the number of open network connections that the host operating system permits for a process.

Here is the code for our new concurrent time server:

let server = tcp_server_stream(2121)

function client_handler(client)
{ let request = read_line(client)
  if (request == "GET TIME")
    showln(stream = client, time_to_string(now()))
    showln(stream = client, "error: invalid request")
  close_stream(client) }

function time_server()
  let (client = read(server))
  { !client_handler(client)
    time_server() }

// start the server
// we will never reach here

8.6 Dynamic Variables

Variables generally have static scope. They can also have dynamic scope with task specific bindings. Dynamic bindings are established by the letdyn expression. Dynamic variable names must follow a certain naming convention – they must begin and end with the percentage character (%). The following program shows a function that changes its output based on the dynamic binding of the variable %abc%:

let %abc% = 20
function df() %abc% * 100

// 2000

letdyn (%abc% = 5) df()
// 500

// 2000

// a dynamic binding is visible only within the current task:
{ !letdyn (%abc% = 5) showln(df())
  !letdyn (%abc% = 10) showln(df()) }
//> 500
//> 1000

8.7 Parallel Programming

Slogan tasks are really light-weight because multiple tasks may share a single hardware processor. There is no way the user can make sure that a newly started task will execute on a dedicated processor. To execute some code on a separate processor, a new set of functions are required. These are the process, process_send and process_receive functions. Together these three functions implement a simple message passing model for multi-core programming.

The following program shows how to execute a computation parallely using the process abstraction:

function handler(msg)
| ['add, x, y] -> x + y
| _ -> 'invalid_message

let child = process(^(parent)
                    letfn loop (msg = process_receive(parent))
                      when (msg <> 'quit)
                      { process_send(parent, handler(msg))
                        loop(process_receive(parent)) })

process_send(child, ['add, 10, 20])
process_send(child, ['sub, 10, 20])
// 30
// invalid_message
process_send(child, 'quit)

Once all computations required from a new process is finished, the resources held by it must be released by calling the process_close function.

Exercise 8.2.   Extend the handler so that it can also respond to the 'sub request.

While designing parallel programs using the process abstraction, keep in mind that the messaging functions process_send and process_receive are not optimized to deal with large data. You can get the best out of these functions by using them to exchange short messages, ideally less than 1KB (1024 bytes) in size. If you want to send a large chunk of data to another process, do that by passing a "pointer" to that data as a message. This "pointer" should point to a file or a database where the real data is stored.

Next | Previous | Contents