# Threading

{% embed url="https://realpython.com/intro-to-python-threading" %}

### What Is a Thread?

A thread is a separate flow of execution. This means that your program will have two things happening at once. But for most Python 3 implementations the different threads do not actually execute at the same time: they merely appear to. In CPython, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time.

### Starting a Thread

Now that you’ve got an idea of what a thread is, let’s learn how to make one. The Python standard library provides [`threading`](https://docs.python.org/3/library/threading.html), which contains most of the primitives you’ll see in this article. `Thread`, in this module, nicely encapsulates threads, providing a clean interface to work with them.

To start a separate thread, you create a `Thread` instance and then tell it to `.start()`:

```python
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")
    
    
16:03:57: Main    : before creating thread
16:03:57: Main    : before running thread
16:03:57: Thread 1: starting
16:03:57: Main    : wait for the thread to finish
16:03:57: Main    : all done
16:03:59: Thread 1: finishing
```

When you create a `Thread`, you pass it a function and a list containing the arguments to that function. In this case, you’re telling the `Thread` to run `thread_function()` and to pass it `1` as an argument.

For this article, you’ll use sequential integers as names for your threads. There is `threading.get_ident()`, which returns a unique identifier for each thread, but these values are usually neither short nor easily readable.

`thread_function()` itself doesn’t do much. It simply logs some messages with a [`time.sleep()`](https://realpython.com/python-sleep/) in between them.
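If you want short, readable thread names without keeping your own counter, you can also pass `name=` when creating the `Thread` and read it back with `threading.current_thread().name`. A minimal sketch, separate from the example above:

```python
import threading

def worker():
    # The name you chose is readable; get_ident() returns the raw identifier.
    print(threading.current_thread().name, threading.get_ident())

t = threading.Thread(target=worker, name="worker-1")
t.start()
t.join()
```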

### Waiting for a Thread Using `join()`

```python
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))

    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    x.join()
    logging.info("Main    : all done")
    
16:16:10: Main    : before creating thread
16:16:10: Main    : before running thread
16:16:10: Thread 1: starting
16:16:10: Main    : wait for the thread to finish
16:16:12: Thread 1: finishing
16:16:12: Main    : all done
```

### Using a `ThreadPoolExecutor`

There’s an easier way to start up a group of threads than the one you saw above. It’s called a `ThreadPoolExecutor`, and it’s part of the standard library in [`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) (as of Python 3.2).

The easiest way to create it is as a context manager, using the [`with` statement](https://realpython.com/python-with-statement/) to manage the creation and destruction of the pool.

Here’s the `__main__` from the last example rewritten to use a `ThreadPoolExecutor`:

```python
import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
        
16:19:40: Thread 0: starting
16:19:40: Thread 1: starting
16:19:40: Thread 2: starting
16:19:42: Thread 0: finishing
16:19:42: Thread 1: finishing
16:19:42: Thread 2: finishing
```

The code creates a `ThreadPoolExecutor` as a context manager, telling it how many worker threads it wants in the pool. It then uses `.map()` to step through an iterable of things, in your case `range(3)`, passing each one to a thread in the pool.

The end of the `with` block causes the `ThreadPoolExecutor` to do a `.join()` on each of the threads in the pool. It is *strongly* recommended that you use `ThreadPoolExecutor` as a context manager when you can so that you never forget to `.join()` the threads.
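If you also need the return values of the worker function, `.submit()` gives you back `Future` objects that you can gather with `concurrent.futures.as_completed()`. Here is a small sketch; the `square()` task is just a placeholder, not part of the article's example:

```python
import concurrent.futures

def square(n):
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # .submit() returns a Future per call; as_completed() yields them
    # in completion order, not submission order.
    futures = [executor.submit(square, n) for n in range(3)]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
```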

{% hint style="info" %}
Using a `ThreadPoolExecutor` can cause some confusing errors.

For example, if you call a function that takes no parameters, but you pass it parameters in `.map()`, the thread will throw an exception.

Unfortunately, `ThreadPoolExecutor` will hide that exception, and (in the case above) the program terminates with no output. This can be quite confusing to debug at first.
{% endhint %}
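One way to surface those hidden exceptions, assuming you use `.submit()` and keep the returned `Future`, is to call `.result()` on it: the call re-raises anything the worker raised. A minimal sketch:

```python
import concurrent.futures

def no_args():
    """Takes no parameters, so passing one makes the worker raise TypeError."""
    return "ok"

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(no_args, 42)  # .submit() itself succeeds
    try:
        future.result()                    # re-raises the TypeError here
    except TypeError as exc:
        print(f"worker failed: {exc}")
```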

### Race Conditions

Before you move on to some of the other features tucked away in Python `threading`, let’s talk a bit about one of the more difficult issues you’ll run into when writing threaded programs: [race conditions](https://en.wikipedia.org/wiki/Race_condition).

Once you’ve seen what a race condition is and looked at one happening, you’ll move on to some of the primitives provided by the standard library to prevent race conditions from happening.

Race conditions can occur when two or more threads access a shared piece of data or resource. In this example, you’re going to create a large race condition that happens every time, but be aware that most race conditions are not this obvious. Frequently, they only occur rarely, and they can produce confusing results. As you can imagine, this makes them quite difficult to debug.

Fortunately, this race condition will happen every time, and you’ll walk through it in detail to explain what is happening.

For this example, you’re going to write a class that updates a database. Okay, you’re not really going to have a database: you’re just going to fake it, because that’s not the point of this article.

Your `FakeDatabase` will have `.__init__()` and `.update()` methods:

```python
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
        
        
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)
    
    

16:25:49: Testing update. Starting value is 0.
16:25:49: Thread 0: starting update
16:25:49: Thread 1: starting update
16:25:49: Thread 1: finishing update
16:25:49: Thread 0: finishing update
16:25:49: Testing update. Ending value is 1.

```

The program creates a `ThreadPoolExecutor` with two threads and then calls `.submit()` on each of them, telling them to run `database.update()`.

`.submit()` has a signature that allows both positional and named arguments to be passed to the function running in the thread:

{% hint style="info" %}
`.submit(function, *args, **kwargs)`
{% endhint %}

In the usage above, `index` is passed as the first and only positional argument to `database.update()`. You’ll see later in this article where you can pass multiple arguments in a similar manner.
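As a preview, here is a hedged sketch of passing several positional and keyword arguments through `.submit()`; the `update_many()` worker and its parameters are made up purely for illustration:

```python
import concurrent.futures

def update_many(name, amount, *, retries=1):
    # Hypothetical worker that accepts positional and keyword arguments.
    return f"{name} += {amount} (retries={retries})"

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # Everything after the callable is forwarded: .submit(function, *args, **kwargs)
    future = executor.submit(update_many, "db-1", 5, retries=3)
    print(future.result())
```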

Since each thread runs `.update()`, and `.update()` adds one to `.value`, you might expect `database.value` to be `2` when it’s printed out at the end. But you wouldn’t be looking at this example if that was the case.

When you tell your `ThreadPoolExecutor` to run each thread, you tell it which function to run and what parameters to pass to it: `executor.submit(database.update, index)`.

The result of this is that each of the threads in the pool will call `database.update(index)`. Note that `database` is a reference to the one `FakeDatabase` object created in `__main__`. Calling `.update()` on that object calls an [instance method](https://realpython.com/instance-class-and-static-methods-demystified/) on that object.

Each thread is going to have a reference to the same `FakeDatabase` object, `database`. Each thread will also have a unique value, `index`, to make the logging statements a bit easier to read:

[![Thread 1 and Thread 2 use the same shared database.](https://files.realpython.com/media/intro-threading-shared-database.267a5d8c6aa1.png)](https://files.realpython.com/media/intro-threading-shared-database.267a5d8c6aa1.png)

When the thread starts running `.update()`, it has its own version of all of the data **local** to the function. In the case of `.update()`, this is `local_copy`. This is definitely a good thing. Otherwise, two threads running the same function would always confuse each other. It means that all variables that are scoped (or local) to a function are **thread-safe**.

Now you can start walking through what happens if you run the program above with a single thread and a single call to `.update()`.

The image below steps through the execution of `.update()` if only a single thread is run. The statement is shown on the left followed by a diagram showing the values in the thread’s `local_copy` and the shared `database.value`:

[![Single thread modifying a shared database](https://files.realpython.com/media/intro-threading-single-thread.6a11288bc199.png)](https://files.realpython.com/media/intro-threading-single-thread.6a11288bc199.png)

The diagram is laid out so that time increases as you move from top to bottom. It begins when `Thread 1` is created and ends when it is terminated.

When `Thread 1` starts, `FakeDatabase.value` is zero. The first line of code in the method, `local_copy = self.value`, copies the value zero to the local variable. Next it increments the value of `local_copy` with the `local_copy += 1` statement. You can see `local_copy` in `Thread 1` getting set to one.

Next `time.sleep()` is called, which makes the current thread pause and allows other threads to run. Since there is only one thread in this example, this has no effect.

When `Thread 1` wakes up and continues, it copies the new value from `local_copy` to `FakeDatabase.value`, and then the thread is complete. You can see that `database.value` is set to one.

So far, so good. You ran `.update()` once and `FakeDatabase.value` was incremented to one.

Getting back to the race condition, the two threads will be running concurrently but not at the same time. They will each have their own version of `local_copy` and will each point to the same `database`. It is this shared `database` object that is going to cause the problems.

The program starts with `Thread 1` running `.update()`:

[![Thread 1 gets a copy of shared data and increments it.](https://files.realpython.com/media/intro-threading-two-threads-part1.c1c0e65a8481.png)](https://files.realpython.com/media/intro-threading-two-threads-part1.c1c0e65a8481.png)

When `Thread 1` calls `time.sleep()`, it allows the other thread to start running. This is where things get interesting.

`Thread 2` starts up and does the same operations. It’s also copying `database.value` into its private `local_copy`, and this shared `database.value` has not yet been updated:

[![Thread 2 gets a copy of shared data and increments it.](https://files.realpython.com/media/intro-threading-two-threads-part2.df42d4fbfe21.png)](https://files.realpython.com/media/intro-threading-two-threads-part2.df42d4fbfe21.png)

When `Thread 2` finally goes to sleep, the shared `database.value` is still unmodified at zero, and both private versions of `local_copy` have the value one.

`Thread 1` now wakes up and saves its version of `local_copy` and then terminates, giving `Thread 2` a final chance to run. `Thread 2` has no idea that `Thread 1` ran and updated `database.value` while it was sleeping. It stores *its* version of `local_copy` into `database.value`, also setting it to one:

[![Both threads write 1 to shared database.](https://files.realpython.com/media/intro-threading-two-threads-part3.18576920f88f.png)](https://files.realpython.com/media/intro-threading-two-threads-part3.18576920f88f.png)

The two threads have interleaving access to a single shared object, overwriting each other’s results. Similar race conditions can arise when one thread frees memory or closes a file handle before the other thread is finished accessing it.

### Basic Synchronization Using `Lock`

There are a number of ways to avoid or solve race conditions. You won’t look at all of them here, but there are a couple that are used frequently. Let’s start with `Lock`.

To solve your race condition above, you need to find a way to allow only one thread at a time into the read-modify-write section of your code. The most common way to do this is called `Lock` in Python. In some other languages this same idea is called a `mutex`. Mutex comes from MUTual EXclusion, which is exactly what a `Lock` does.

A `Lock` is an object that acts like a hall pass. Only one thread at a time can have the `Lock`. Any other thread that wants the `Lock` must wait until the owner of the `Lock` gives it up.

The basic functions to do this are `.acquire()` and `.release()`. A thread will call `my_lock.acquire()` to get the lock. If the lock is already held, the calling thread will wait until it is released. There’s an important point here. If one thread gets the lock but never gives it back, your program will be stuck. You’ll read more about this later.

Fortunately, Python’s `Lock` will also operate as a context manager, so you can use it in a `with` statement, and it gets released automatically when the `with` block exits for any reason.
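As a quick illustration of that point, the two functions below do the same thing; the `with` form is shorter and releases the lock even if the protected code raises. This is just a throwaway sketch, not part of the `FakeDatabase` example that follows:

```python
import threading

lock = threading.Lock()
counter = 0

def increment_with_statement():
    global counter
    with lock:              # acquired here, released when the block exits
        counter += 1

def increment_explicitly():
    global counter
    lock.acquire()
    try:
        counter += 1
    finally:
        lock.release()      # runs even if the protected code raises
```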

```python
class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        
18:11:45: Testing update. Starting value is 0.
18:11:45: Thread 0: starting update
18:11:45: Thread 1: starting update
18:11:45: Thread 0: finishing update
18:11:45: Thread 1: finishing update
18:11:45: Testing update. Ending value is 2.
```

Other than adding a bunch of debug logging so you can see the locking more clearly, the big change here is to add a member called `._lock`, which is a `threading.Lock()` object. This `._lock` is initialized in the unlocked state and then acquired and released by the `with` statement.

It’s worth noting here that the thread running this function will hold on to that `Lock` until it is completely finished updating the database. In this case, that means it will hold the `Lock` while it copies, updates, sleeps, and then writes the value back to the database.

You can turn on full logging by setting the level to `DEBUG` by adding this statement after you configure the logging output in `__main__`:

{% hint style="info" %}
`logging.getLogger().setLevel(logging.DEBUG)`
{% endhint %}

Many of the examples in the rest of this article will have `WARNING` and `DEBUG` level logging. We’ll generally only show the `WARNING` level output, as the `DEBUG` logs can be quite lengthy. Try out the programs with the logging turned up and see what they do.

### Deadlock

Before you move on, you should look at a common problem when using `Locks`. As you saw, if the `Lock` has already been acquired, a second call to `.acquire()` will wait until the thread that is holding the `Lock` calls `.release()`. What do you think happens when you run this code:

```python
import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
```

When the program calls `l.acquire()` the second time, it hangs waiting for the `Lock` to be released. In this example, you can fix the deadlock by removing the second call, but deadlocks usually happen from one of two subtle things:

1. An implementation bug where a `Lock` is not released properly
2. A design issue where a utility function needs to be called by functions that might or might not already have the `Lock`

The first situation happens sometimes, but using a `Lock` as a context manager greatly reduces how often. Whenever possible, write code that uses context managers, as they help avoid situations where an exception skips over the `.release()` call.

The design issue can be a bit trickier in some languages. Thankfully, Python threading has a second object, called `RLock`, that is designed for just this situation. It allows a thread to `.acquire()` an `RLock` multiple times before it calls `.release()`. That thread is still required to call `.release()` the same number of times it called `.acquire()`, but it should be doing that anyway.
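Here is a minimal sketch of that behavior: with a plain `Lock`, the outer method would deadlock when it calls a method that tries to acquire the same lock, but an `RLock` lets the same thread acquire it again.

```python
import threading

class Counter:
    def __init__(self):
        self._lock = threading.RLock()  # re-entrant: the owning thread may re-acquire
        self.value = 0

    def increment(self):
        with self._lock:
            self.value += 1

    def increment_twice(self):
        with self._lock:       # already holding the lock...
            self.increment()   # ...acquiring it again is fine with an RLock
            self.increment()

counter = Counter()
counter.increment_twice()
print(counter.value)  # 2
```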

`Lock` and `RLock` are two of the basic tools used in threaded programming to prevent race conditions. There are a few others that work in different ways. Before you look at them, let’s shift to a slightly different problem domain.

### Producer-Consumer Threading

The [Producer-Consumer Problem](https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem) is a standard computer science problem used to look at threading or process synchronization issues. You’re going to look at a variant of it to get some ideas of what primitives the Python `threading` module provides.

For this example, you’re going to imagine a program that needs to read messages from a network and write them to disk. The program does not request a message when it wants. It must be listening and accept messages as they come in. The messages will not come in at a regular pace, but will be coming in bursts. This part of the program is called the producer.

On the other side, once you have a message, you need to write it to a database. The database access is slow, but fast enough to keep up to the average pace of messages. It is *not* fast enough to keep up when a burst of messages comes in. This part is the consumer.

In between the producer and the consumer, you will create a `Pipeline` that will be the part that changes as you learn about different synchronization objects.

That’s the basic layout. Let’s look at a solution using `Lock`. It doesn’t work perfectly, but it uses tools you already know, so it’s a good place to start.

#### Producer-Consumer Using `Lock`

Since this is an article about Python `threading`, and since you just read about the `Lock` primitive, let’s try to solve this problem with two threads using a `Lock` or two.

The general design is that there is a `producer` thread that reads from the fake network and puts the message into a `Pipeline`:

```python
import random 

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")
```

To generate a fake message, the `producer` gets a random number from `random.randint(1, 101)`. It calls `.set_message()` on the `pipeline` to send it to the `consumer`.

The `producer` also uses a `SENTINEL` value to signal the consumer to stop after it has sent ten values. This is a little awkward, but don’t worry, you’ll see ways to get rid of this `SENTINEL` value after you work through this example.

On the other side of the `pipeline` is the consumer:

```python
def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)
```

The `consumer` reads a message from the `pipeline` and writes it to a fake database, which in this case is just printing it to the display. If it gets the `SENTINEL` value, it returns from the function, which will terminate the thread.

Before you look at the really interesting part, the `Pipeline`, here’s the `__main__` section, which spawns these threads:

```python
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)
```

Now let’s take a look at the `Pipeline` that passes messages from the `producer` to the `consumer`:

```python
class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
```

That seems a bit more manageable. The `Pipeline` in this version of your code has three members:

1. **`.message`** stores the message to pass.
2. **`.producer_lock`** is a `threading.Lock` object that restricts access to the message by the `producer` thread.
3. **`.consumer_lock`** is also a `threading.Lock` that restricts access to the message by the `consumer` thread.

`__init__()` initializes these three members and then calls `.acquire()` on the `.consumer_lock`. This is the state you want to start in. The `producer` is allowed to add a new message, but the `consumer` needs to wait until a message is present.

`.get_message()` and `.set_message()` are nearly opposites. `.get_message()` calls `.acquire()` on the `consumer_lock`. This is the call that will make the `consumer` wait until a message is ready.

Before you go on to `.set_message()`, there’s something subtle going on in `.get_message()` that’s pretty easy to miss. It might seem tempting to get rid of `message` and just have the function end with `return self.message`. See if you can figure out why you don’t want to do that before moving on.

Here’s the answer. As soon as the `consumer` calls `.producer_lock.release()`, it can be swapped out, and the `producer` can start running. That could happen before `.release()` returns! This means that there is a slight possibility that when the function returns `self.message`, that could actually be the *next* message generated, so you would lose the first message. This is another example of a race condition.

Moving on to `.set_message()`, you can see the opposite side of the transaction. The `producer` will call this with a message. It will acquire the `.producer_lock`, set the `.message`, and then call `.release()` on the `.consumer_lock`, which will allow the `consumer` to read that value.

Let’s run the code with `DEBUG` logging turned on (the `setLevel` line uncommented) and see what it looks like:

```python
import random 
SENTINEL = object()
def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(5):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")
    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)
            
            
class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)
        
        
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)
        
        
        
19:25:40: Producer got message: 45
19:25:40: Consumer:about to acquire getlock
19:25:40: Producer:about to acquire setlock
19:25:40: Producer:have setlock
19:25:40: Producer:about to release getlock
19:25:40: Producer:getlock released
19:25:40: Consumer:have getlock
19:25:40: Producer got message: 43
19:25:40: Consumer:about to release setlock
19:25:40: Producer:about to acquire setlock
19:25:40: Consumer:setlock released
19:25:40: Producer:have setlock
19:25:40: Consumer storing message: 45
19:25:40: Producer:about to release getlock
19:25:40: Consumer:about to acquire getlock
19:25:40: Producer:getlock released
19:25:40: Consumer:have getlock
19:25:40: Producer got message: 101
19:25:40: Consumer:about to release setlock
19:25:40: Producer:about to acquire setlock
19:25:40: Consumer:setlock released
19:25:40: Producer:have setlock
19:25:40: Consumer storing message: 43
19:25:40: Producer:about to release getlock
19:25:40: Consumer:about to acquire getlock
19:25:40: Producer:getlock released
19:25:40: Consumer:have getlock
19:25:40: Producer got message: 46
19:25:40: Consumer:about to release setlock
19:25:40: Producer:about to acquire setlock
19:25:40: Consumer:setlock released
19:25:40: Producer:have setlock
19:25:40: Consumer storing message: 101
19:25:40: Producer:about to release getlock
19:25:40: Consumer:about to acquire getlock
19:25:40: Producer:getlock released
19:25:40: Consumer:have getlock
19:25:40: Producer got message: 99
19:25:40: Consumer:about to release setlock
19:25:40: Producer:about to acquire setlock
19:25:40: Consumer:setlock released
19:25:40: Producer:have setlock
19:25:40: Consumer storing message: 46
19:25:40: Producer:about to release getlock
19:25:40: Consumer:about to acquire getlock
19:25:40: Producer:getlock released
19:25:40: Consumer:have getlock
19:25:40: Producer:about to acquire setlock
19:25:40: Consumer:about to release setlock
19:25:40: Consumer:setlock released
19:25:40: Producer:have setlock
19:25:40: Consumer storing message: 99
19:25:40: Producer:about to release getlock
19:25:40: Consumer:about to acquire getlock
19:25:40: Producer:getlock released
19:25:40: Consumer:have getlock
19:25:40: Consumer:about to release setlock
19:25:40: Consumer:setlock released
```

At first, you might find it odd that the producer gets two messages before the consumer even runs. If you look back at the `producer` and `.set_message()`, you will notice that the only place it will wait for a `Lock` is when it attempts to put the message into the pipeline. This is done after the `producer` gets the message and logs that it has it.

When the `producer` attempts to send this second message, it will call `.set_message()` the second time and it will block.

The operating system can swap threads at any time, but it generally lets each thread have a reasonable amount of time to run before swapping it out. That’s why the `producer` usually runs until it blocks in the second call to `.set_message()`.

Once a thread is blocked, however, the operating system will always swap it out and find a different thread to run. In this case, the only other thread with anything to do is the `consumer`.

The `consumer` calls `.get_message()`, which reads the message and calls `.release()` on the `.producer_lock`, thus allowing the `producer` to run again the next time threads are swapped.

Notice that the first message was `45`, and that is exactly what the `consumer` read, even though the `producer` had already generated the `43` message.

While it works for this limited test, it is not a great solution to the producer-consumer problem in general because it only allows a single value in the pipeline at a time. When the `producer` gets a burst of messages, it will have nowhere to put them.

Let’s move on to a better way to solve this problem, using a `Queue`.

### Producer-Consumer Using `Queue`

If you want to be able to handle more than one value in the pipeline at a time, you’ll need a data structure for the pipeline that allows the number to grow and shrink as data backs up from the `producer`.

Python’s standard library has a `queue` module which, in turn, has a `Queue` class. Let’s change the `Pipeline` to use a `Queue` instead of just a variable protected by a `Lock`. You’ll also use a different way to stop the worker threads by using a different primitive from Python `threading`, an `Event`.

Let’s start with the `Event`. The `threading.Event` object allows one thread to signal an `event` while many other threads can be waiting for that `event` to happen. The key usage in this code is that the threads that are waiting for the event do not necessarily need to stop what they are doing, they can just check the status of the `Event` every once in a while.
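Here is a stripped-down sketch of that pattern, separate from the pipeline example. The worker checks `event.is_set()` between chunks of work; `event.wait()` is also available when a thread would rather block until the signal arrives:

```python
import threading
import time

def worker(event):
    while not event.is_set():   # keep working until someone signals the event
        print("working...")
        time.sleep(0.2)
    print("event set, stopping")

event = threading.Event()
t = threading.Thread(target=worker, args=(event,))
t.start()
time.sleep(0.5)
event.set()                      # signal the worker to finish its loop
t.join()
```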

The triggering of the event can be many things. In this example, the main thread will simply sleep for a while and then `.set()` it:

```python
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
```

The only changes here are the creation of the `event` object, passing the `event` to `producer` and `consumer` as an extra argument, and the final section of `__main__`, which sleeps briefly, logs a message, and then calls `.set()` on the event.

The `producer` also did not have to change too much:

```python
def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")
```

It now loops until it sees that the event is set. It also no longer puts the `SENTINEL` value into the `pipeline`.

`consumer` had to change a little more:

```python
def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")
```

While you got to take out the code related to the `SENTINEL` value, you did have to do a slightly more complicated `while` condition. Not only does it loop until the `event` is set, but it also needs to keep looping until the `pipeline` has been emptied.

Making sure the queue is empty before the consumer finishes prevents another fun issue. If the `consumer` does exit while the `pipeline` has messages in it, there are two bad things that can happen. The first is that you lose those final messages, but the more serious one is that the `producer` can get caught attempting to add a message to a full queue and never return.

This happens if the `event` gets triggered after the `producer` has checked the `.is_set()` condition but before it calls `pipeline.set_message()`.

If that happens, it’s possible for the `consumer` to wake up and exit with the queue still completely full. The `producer` will then call `.set_message()`, which will wait until there is space on the queue for the new message. The `consumer` has already exited, so this will not happen and the `producer` will not exit.
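One possible mitigation, not used in this article's example, is to make the `producer` avoid blocking forever on a full pipeline: use `Queue.put()` with a timeout and recheck the event whenever the put times out. The sketch below assumes the queue-based `Pipeline` shown next:

```python
import queue
import random

def producer(pipeline, event):
    """Variant that never blocks forever on a full pipeline (illustrative only)."""
    while not event.is_set():
        message = random.randint(1, 101)
        try:
            # Wait a short while for space, then recheck the event and retry.
            pipeline.put(message, timeout=0.1)
        except queue.Full:
            continue
```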

The rest of the `consumer` should look familiar.

The `Pipeline` has changed dramatically, however:

```python
class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)
```

You can see that `Pipeline` is a subclass of `queue.Queue`. `Queue` has an optional parameter when initializing to specify a maximum size of the queue.

If you give a positive number for `maxsize`, it will limit the queue to that number of elements, causing `.put()` to block until there are fewer than `maxsize` elements. If you don’t specify `maxsize`, then the queue will grow to the limits of your computer’s memory.

`.get_message()` and `.set_message()` got much smaller. They basically wrap `.get()` and `.put()` on the `Queue`. You might be wondering where all of the locking code that prevents the threads from causing race conditions went.

The core devs who wrote the standard library knew that a `Queue` is frequently used in multi-threading environments and incorporated all of that locking code inside the `Queue` itself. `Queue` is thread-safe.
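A tiny sketch of that `maxsize` behavior, independent of the pipeline code: `.put_nowait()` raises `queue.Full` once the limit is reached, while a plain `.put()` would block instead.

```python
import queue

q = queue.Queue(maxsize=2)
q.put("a")
q.put("b")
try:
    q.put_nowait("c")    # the queue already holds maxsize items
except queue.Full:
    print("queue is full; a plain .put() would block here")

print(q.get())           # "a" -- FIFO order, and all of this is thread-safe
```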

Running this program looks like the following:

```python
import concurrent.futures
import logging
import queue
import random
import threading
import time

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            f"Consumer storing message: {message} (size={queue.qsize()})"
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()
        
19:54:14: Producer got message: 9
19:54:14: Producer got message: 77
19:54:14: Producer got message: 53
19:54:14: Consumer storing message: 9 (size=0)
19:54:14: Producer got message: 16
19:54:14: Consumer storing message: 77 (size=1)
19:54:14: Producer got message: 55
19:54:14: Consumer storing message: 53 (size=1)
19:54:14: Producer got message: 78
19:54:14: Consumer storing message: 16 (size=1)
19:54:14: Producer got message: 23
19:54:14: Consumer storing message: 55 (size=1)
19:54:14: Producer got message: 62
19:54:14: Consumer storing message: 78 (size=1)
19:54:14: Producer got message: 52
19:54:14: Consumer storing message: 23 (size=1)
19:54:14: Producer got message: 15
19:54:14: Consumer storing message: 62 (size=1)
19:54:14: Producer got message: 74
19:54:14: Consumer storing message: 52 (size=1)
19:54:14: Producer got message: 82
19:54:14: Consumer storing message: 15 (size=1)
19:54:14: Producer got message: 73
19:54:14: Consumer storing message: 74 (size=1)
19:54:14: Producer got message: 33
19:54:14: Consumer storing message: 82 (size=1)
19:54:14: Producer got message: 54
19:54:14: Consumer storing message: 73 (size=1)
19:54:14: Producer got message: 36
19:54:14: Consumer storing message: 33 (size=1)
19:54:14: Producer got message: 79
19:54:14: Consumer storing message: 54 (size=1)
19:54:14: Producer got message: 71
19:54:14: Consumer storing message: 36 (size=1)
19:54:14: Producer got message: 78
19:54:14: Consumer storing message: 79 (size=1)
19:54:14: Producer got message: 18
19:54:14: Consumer storing message: 71 (size=1)
19:54:14: Producer got message: 26
19:54:14: Consumer storing message: 78 (size=1)
19:54:14: Producer got message: 63
19:54:14: Consumer storing message: 18 (size=1)
19:54:14: Producer got message: 14
19:54:14: Consumer storing message: 26 (size=1)
19:54:14: Producer got message: 8
19:54:14: Consumer storing message: 63 (size=1)
19:54:14: Producer got message: 86
19:54:14: Consumer storing message: 14 (size=1)
19:54:14: Producer got message: 16
19:54:14: Consumer storing message: 8 (size=1)
19:54:14: Producer got message: 25
19:54:14: Consumer storing message: 86 (size=1)
19:54:14: Producer got message: 93
19:54:14: Consumer storing message: 16 (size=1)
19:54:14: Producer got message: 60
19:54:14: Consumer storing message: 25 (size=1)
19:54:14: Producer got message: 31
19:54:14: Consumer storing message: 93 (size=1)
19:54:14: Producer got message: 82
19:54:14: Consumer storing message: 60 (size=1)
19:54:14: Producer got message: 18
19:54:14: Consumer storing message: 31 (size=1)
19:54:14: Producer got message: 60
19:54:14: Consumer storing message: 82 (size=1)
19:54:14: Producer got message: 87
19:54:14: Consumer storing message: 18 (size=1)
19:54:14: Producer got message: 31
19:54:14: Consumer storing message: 60 (size=1)
19:54:14: Producer got message: 34
19:54:14: Consumer storing message: 87 (size=1)
19:54:14: Producer got message: 50
19:54:14: Consumer storing message: 31 (size=1)
19:54:14: Producer got message: 88
19:54:14: Consumer storing message: 34 (size=1)
19:54:15: Producer got message: 83
19:54:15: Consumer storing message: 50 (size=1)
19:54:15: Producer got message: 73
19:54:15: Consumer storing message: 88 (size=1)
19:54:15: Producer got message: 65
19:54:15: Consumer storing message: 83 (size=1)
19:54:15: Producer got message: 99
19:54:15: Consumer storing message: 73 (size=1)
19:54:15: Main: about to set event
19:54:15: Producer got message: 57
19:54:15: Consumer storing message: 65 (size=1)
19:54:15: Producer received event. Exiting
19:54:15: Consumer storing message: 99 (size=1)
19:54:15: Consumer storing message: 57 (size=0)
19:54:15: Consumer received event. Exiting      
```
