
Planet Python

Last update: April 21, 2025 04:43 PM UTC

April 21, 2025


death and gravity

Process​Thread​Pool​Executor: when I‍/‍O becomes CPU-bound

So, you're doing some I‍/‍O bound stuff, in parallel.

Maybe you're scraping some websites – a lot of websites.

Maybe you're updating or deleting millions of DynamoDB items.

You've got your ThreadPoolExecutor, you've increased the number of threads and tuned connection limits... but after some point, it's just not getting any faster. You look at your Python process, and you see CPU utilization hovers above 100%.

You could split the work into batches and have a ProcessPoolExecutor run your original code in separate processes. But that requires yet more code, and a bunch of changes, which is no fun. And maybe your input is not that easy to split into batches.

If only we had an executor that worked seamlessly across processes and threads.

Well, you're in luck, since that's exactly what we're building today!

And even better, in a couple years you won't even need it anymore.

Establishing a baseline #

To measure things, we'll use a mock that pretends to do mostly I‍/‍O, with a sprinkling of CPU-bound work thrown in – a stand-in for something like a database connection, a Requests session, or a DynamoDB client.

class Client:
    io_time = 0.02
    cpu_time = 0.0008

    def method(self, arg):
        # simulate I/O
        time.sleep(self.io_time)

        # simulate CPU-bound work
        start = time.perf_counter()
        while time.perf_counter() - start < self.cpu_time:
            for i in range(100): i ** i

        return arg

We sleep() for the I/O, and do some math in a loop for the CPU stuff; it doesn't matter exactly how long each takes, as long as I/O time dominates.

Real multi-threaded clients are usually backed by a connection pool; we could simulate one using a semaphore, but it's not relevant here – we're assuming the connection pool is effectively unbounded.
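
If we did want to simulate a bounded pool, a minimal sketch might look like this (PooledClient is hypothetical and isn't used in the benchmarks below):

import threading

class PooledClient(Client):
    """Client variant whose method() must first acquire a 'connection' from a bounded pool."""

    def __init__(self, pool_size=10):
        self._pool = threading.Semaphore(pool_size)

    def method(self, arg):
        # block until one of the pool_size "connections" is free
        with self._pool:
            return super().method(arg)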

Since we'll use our client from multiple processes, we set up a global instance and a function that uses it; we can then pass init_client() as an executor initializer, which also allows us to pass arguments to the client when creating it.

client = None

def init_client(*args):
    global client
    client = Client(*args)

def do_stuff(*args):
    return client.method(*args)

Finally, we make a simple timing context manager:

@contextmanager
def timer():
    start = time.perf_counter()
    yield
    end = time.perf_counter()
    print(f"elapsed: {end-start:1.3f}")

...and put everything together in a function that measures how long it takes to do a bunch of work using a concurrent.futures executor:

def benchmark(executor, n=10_000, timer=timer, chunksize=10):
    with executor:
        # make sure all the workers are started,
        # so we don't measure their startup time
        list(executor.map(time.sleep, [0] * 200))

        with timer():
            values = list(executor.map(do_stuff, range(n), chunksize=chunksize))

        assert values == list(range(n)), values

Threads #

So, a ThreadPoolExecutor should suffice here, since we're mostly doing I‍/‍O, right?

>>> from concurrent.futures import *
>>> from bench import *
>>> init_client()
>>> benchmark(ThreadPoolExecutor(10))
elapsed: 24.693

More threads!

>>> benchmark(ThreadPoolExecutor(20))
elapsed: 12.405

Twice the threads, twice as fast. More!

>>> benchmark(ThreadPoolExecutor(30))
elapsed: 8.718

Good, it's still scaling linearly. MORE!

>>> benchmark(ThreadPoolExecutor(40))
elapsed: 8.638

confused cat with question marks around its head

...more?

>>> benchmark(ThreadPoolExecutor(50))
elapsed: 8.458
>>> benchmark(ThreadPoolExecutor(60))
elapsed: 8.430
>>> benchmark(ThreadPoolExecutor(70))
elapsed: 8.428

squinting confused cat

Problem: CPU becomes a bottleneck #

It's time we take a closer look at what our process is doing.

I'd normally use the top command for this, but since the flags and output vary with the operating system, we'll implement our own using the excellent psutil library.

@contextmanager
def top():
    """Print information about current and child processes.

    RES is the resident set size. USS is the unique set size.
    %CPU is the CPU utilization. nTH is the number of threads.

    """
    process = psutil.Process()
    processes = [process] + process.children(True)
    for p in processes: p.cpu_percent()

    yield

    print(f"{'PID':>7} {'RES':>7} {'USS':>7} {'%CPU':>7} {'nTH':>7}")
    for p in processes:
        try:
            m = p.memory_full_info()
        except psutil.AccessDenied:
            m = p.memory_info()
        rss = m.rss / 2**20
        uss = getattr(m, 'uss', 0) / 2**20
        cpu = p.cpu_percent()
        nth = p.num_threads()
        print(f"{p.pid:>7} {rss:6.1f}m {uss:6.1f}m {cpu:7.1f} {nth:>7}")

And because it's a context manager, we can use it as a timer:

>>> init_client()
>>> benchmark(ThreadPoolExecutor(10), timer=top)
    PID     RES     USS    %CPU     nTH
  51395   35.2m   28.5m    38.7      11

So, what happens if we increase the number of threads?

>>> benchmark(ThreadPoolExecutor(20), timer=top)
    PID     RES     USS    %CPU     nTH
  13912   16.8m   13.2m    70.7      21
>>> benchmark(ThreadPoolExecutor(30), timer=top)
    PID     RES     USS    %CPU     nTH
  13912   17.0m   13.4m    99.1      31
>>> benchmark(ThreadPoolExecutor(40), timer=top)
    PID     RES     USS    %CPU     nTH
  13912   17.3m   13.7m   100.9      41

With more threads, the compute part of our I‍/‍O bound workload increases, eventually becoming high enough to saturate one CPU – and due to the global interpreter lock, one CPU is all we can use, regardless of the number of threads.1

Processes? #

I know, let's use a ProcessPoolExecutor instead!

>>> benchmark(ProcessPoolExecutor(20, initializer=init_client))
elapsed: 12.374
>>> benchmark(ProcessPoolExecutor(30, initializer=init_client))
elapsed: 8.330
>>> benchmark(ProcessPoolExecutor(40, initializer=init_client))
elapsed: 6.273

Hmmm... I guess it is a little bit better.

More? More!

>>> benchmark(ProcessPoolExecutor(60, initializer=init_client))
elapsed: 4.751
>>> benchmark(ProcessPoolExecutor(80, initializer=init_client))
elapsed: 3.785
>>> benchmark(ProcessPoolExecutor(100, initializer=init_client))
elapsed: 3.824

OK, it's better, but with diminishing returns – there's no improvement after 80 processes, and even then, it's only 2.2x faster than the best time with threads, when, in theory, it should be able to make full use of all 4 CPUs.

Also, we're not making best use of connection pooling (relevant if the client connects to many different hosts, since we now have 80 pools), nor multiplexing (relevant with protocols like HTTP/2 or newer, since we now have 80 connections).

Problem: more processes, more memory #

But it gets worse!

>>> benchmark(ProcessPoolExecutor(80, initializer=init_client), timer=top)
    PID     RES     USS    %CPU     nTH
   2479   21.2m   15.4m    15.0       3
   2480   11.2m    6.3m     0.0       1
   2481   13.8m    8.5m     3.4       1
  ... 78 more lines ...
   2560   13.8m    8.5m     4.4       1

13.8 MiB * 80 ~= 1 GiB ... that is a lot of memory.

Now, there's some nuance to be had here.

First, on most operating systems that have virtual memory, code segment pages are shared between processes – there's no point in having 80 copies of libc or the Python interpreter in memory.

The unique set size is probably a better measurement than the resident set size, since it excludes memory shared between processes.2 So, for the macOS output above,3 the actual usage is more like 8.5 MiB * 80 = 680 MiB.

Second, if you use the fork or forkserver start methods, processes also share memory allocated before the fork() via copy-on-write; for Python, this includes module code and variables. On Linux, the actual usage is 1.7 MiB * 80 = 136 MiB:

>>> benchmark(ProcessPoolExecutor(80, initializer=init_client), timer=top)
    PID     RES     USS    %CPU     nTH
 329801   17.0m    6.6m     5.1       3
 329802   13.3m    1.6m     2.1       1
  ... 78 more lines ...
 329881   13.3m    1.7m     2.0       1

However, it's important to note that's just a lower bound; memory allocated after fork() is not shared, and most real work will unavoidably allocate more memory.


Why not both? #

One reasonable way of dealing with this would be to split the input into batches, one per CPU, and pass them to a ProcessPoolExecutor, which in turn runs the batch items using a ThreadPoolExecutor.4
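
Roughly, that batched approach might look like the sketch below (run_batch() and batched_map() are made up for illustration; note that this simple round-robin batching doesn't preserve the original result order):

import concurrent.futures

def run_batch(batch):
    # each worker process runs its own thread pool over its share of the input
    with concurrent.futures.ThreadPoolExecutor(20) as threads:
        return list(threads.map(do_stuff, batch))

def batched_map(items, nprocs=4):
    # round-robin the items into one batch per process
    batches = [items[i::nprocs] for i in range(nprocs)]
    with concurrent.futures.ProcessPoolExecutor(nprocs, initializer=init_client) as procs:
        return [result for batch in procs.map(run_batch, batches) for result in batch]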

But that would mean we need to change our code, and that's no fun.

If only we had an executor that worked seamlessly across processes and threads.

A minimal plausible solution #

In keeping with what has become tradition by now, we'll take an iterative, problem-solution approach; since we're not sure what to do yet, we start with the simplest thing that could possibly work.

We know we want a process pool executor that starts one thread pool executor per process, so let's deal with that first.

class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        super().__init__(
            initializer=_init_process,
            initargs=(max_threads, initializer, initargs)
        )

By subclassing ProcessPoolExecutor, we get the map() implementation for free. By going with the default max_workers, we get one process per CPU (which is what we want); we can add more arguments later if needed.

In our custom process initializer, we set up a global thread pool executor, and then call the initializer provided by the user:

_executor = None

def _init_process(max_threads, initializer, initargs):
    global _executor

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    atexit.register(_executor.shutdown)

    if initializer:
        initializer(*initargs)

Likewise, submit() passes the work along to the thread pool executor:

class ProcessThreadPoolExecutor(concurrent.futures.ProcessPoolExecutor):
    # ...
    def submit(self, fn, *args, **kwargs):
        return super().submit(_submit, fn, *args, **kwargs)

def _submit(fn, *args, **kwargs):
    return _executor.submit(fn, *args, **kwargs).result()

OK, that looks good enough; let's use it and see if it works:

def _do_stuff(n):
    print(f"doing: {n}")
    return n ** 2

if __name__ == '__main__':
    with ProcessThreadPoolExecutor() as e:
        print(list(e.map(_do_stuff, [0, 1, 2])))

$ python ptpe.py
doing: 0
doing: 1
doing: 2
[0, 1, 4]

Wait, we got it on the first try?!

Let's measure that:

>>> from bench import *
>>> from ptpe import *
>>> benchmark(ProcessThreadPoolExecutor(30, initializer=init_client), n=1000)
elapsed: 6.161

Hmmm... that's unexpectedly slow... almost as if:

>>> multiprocessing.cpu_count()
4
>>> benchmark(ProcessPoolExecutor(4, initializer=init_client), n=1000)
elapsed: 6.067

Ah, because _submit() waits for the result() in the main thread of the worker process, this is just a ProcessPoolExecutor with extra steps.


But what if we send back the future instead?

    def submit(self, fn, *args, **kwargs):
        return super().submit(_submit, fn, *args, **kwargs).result()

def _submit(fn, *args, **kwargs):
    return _executor.submit(fn, *args, **kwargs)

Alas:

$ python ptpe.py
doing: 0
doing: 1
doing: 2
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
  File "concurrent/futures/process.py", line 210, in _sendback_result
    result_queue.put(_ResultItem(work_id, result=result,
  File "multiprocessing/queues.py", line 391, in put
    obj = _ForkingPickler.dumps(obj)
  File "multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.RLock' object
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "ptpe.py", line 42, in <module>
    print(list(e.map(_do_stuff, [0, 1, 2])))
  ...
TypeError: cannot pickle '_thread.RLock' object

It may not seem like it, but this is a partial success: the work happens, we just can't get anything back. Not surprising, to be honest, it couldn't have been that easy.

Getting results #

If you look carefully at the traceback, you'll find a hint of how ProcessPoolExecutor gets its own results back from workers – a queue; the module docstring even has a neat data-flow diagram:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Now, we could probably use the same queue somehow, but it would involve touching a lot of (private) internals.5 Instead, let's use a separate queue:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        self.__result_queue = multiprocessing.Queue()
        super().__init__(
            initializer=_init_process,
            initargs=(self.__result_queue, max_threads, initializer, initargs)
        )

On the worker side, we make it globally accessible:

_executor = None
_result_queue = None

def _init_process(queue, max_threads, initializer, initargs):
    global _executor, _result_queue

    _executor = concurrent.futures.ThreadPoolExecutor(max_threads)
    atexit.register(_executor.shutdown)

    _result_queue = queue
    atexit.register(_result_queue.close)

    if initializer:
        initializer(*initargs)

...so we can use it from a task callback registered by _submit():

def _submit(fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((False, exception))
    else:
        _result_queue.put((True, task.result()))

Back in the main process, we handle the results in a thread:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        # ...
        self.__result_handler = threading.Thread(target=self.__handle_results)
        self.__result_handler.start()

    def __handle_results(self):
        for ok, result in iter(self.__result_queue.get, None):
            print(f"{'ok' if ok else 'error'}: {result}")

Finally, to stop the handler, we use None as a sentinel on executor shutdown:

    def shutdown(self, wait=True):
        super().shutdown(wait=wait)
        if self.__result_queue:
            self.__result_queue.put(None)
            if wait:
                self.__result_handler.join()
            self.__result_queue.close()
            self.__result_queue = None

Let's see if it works:

$ python ptpe.py
doing: 0
ok: [0]
doing: 1
ok: [1]
doing: 2
ok: [4]
Traceback (most recent call last):
  File "concurrent/futures/_base.py", line 317, in _result_or_cancel
    return fut.result(timeout)
AttributeError: 'NoneType' object has no attribute 'result'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  ...
AttributeError: 'NoneType' object has no attribute 'cancel'

Yay, the results are making it to the handler!

The error happens because instead of returning a Future, our submit() returns the result of _submit(), which is always None.

Fine, we'll make our own futures #

But submit() must return a future, so we make our own:

    def __init__(self, max_threads=None, initializer=None, initargs=()):
        # ...
        self.__tasks = {}
        # ...

    def submit(self, fn, *args, **kwargs):
        outer = concurrent.futures.Future()
        task_id = id(outer)
        self.__tasks[task_id] = outer

        outer.set_running_or_notify_cancel()
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)

        return outer

In order to map results to their futures, we can use a unique identifier; the id() of the outer future should do, since it is unique for the object's lifetime.

We pass the id to _submit(), then to _put_result() as an attribute on the future, and finally back in the queue with the result:

def _submit(task_id, fn, *args, **kwargs):
    task = _executor.submit(fn, *args, **kwargs)
    task.task_id = task_id
    task.add_done_callback(_put_result)

def _put_result(task):
    if exception := task.exception():
        _result_queue.put((task.task_id, False, exception))
    else:
        _result_queue.put((task.task_id, True, task.result()))

Back in the result handler, we find the matching future, and set the result accordingly:

    def __handle_results(self):
        for task_id, ok, result in iter(self.__result_queue.get, None):
            outer = self.__tasks.pop(task_id)
            if ok:
                outer.set_result(result)
            else:
                outer.set_exception(result)

And it works:

$ python ptpe.py
doing: 0
doing: 1
doing: 2
[0, 1, 4]

I mean, it really works:

>>> benchmark(ProcessThreadPoolExecutor(10, initializer=init_client))
elapsed: 6.220
>>> benchmark(ProcessThreadPoolExecutor(20, initializer=init_client))
elapsed: 3.397
>>> benchmark(ProcessThreadPoolExecutor(30, initializer=init_client))
elapsed: 2.575
>>> benchmark(ProcessThreadPoolExecutor(40, initializer=init_client))
elapsed: 2.664

3.3x is not quite the 4 CPUs my laptop has, but it's pretty close, and much better than the 2.2x we got from processes alone.

Death becomes a problem #

I wonder what happens when a worker process dies.

For example, the initializer can fail:

>>> executor = ProcessPoolExecutor(initializer=divmod, initargs=(0, 0))
>>> executor.submit(int).result()
Exception in initializer:
Traceback (most recent call last):
  ...
ZeroDivisionError: integer division or modulo by zero
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

...or a worker can die some time later, which we can help along with a custom timer:6

@contextmanager
def terminate_child(interval=1):
    threading.Timer(interval, psutil.Process().children()[-1].terminate).start()
    yield

>>> executor = ProcessPoolExecutor(initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
[ one second later ]
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Now let's see our executor:

>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
[ one second later ]
[ ... ]
[ still waiting ]
[ ... ]
[ hello? ]

If the dead worker is not around to send back results, its futures never get completed, and map() keeps waiting until the end of time. The expected behavior is to detect when this happens and fail all pending tasks with BrokenProcessPool.


Before we do that, though, let's address a more specific issue.

If map() hasn't finished submitting tasks when the worker dies, inner fails with BrokenProcessPool, which right now we're ignoring entirely. Handling the general case below would cover this too, but we should still propagate all errors from the inner future to the outer one anyway.

    def submit(self, fn, *args, **kwargs):
        # ...
        inner = super().submit(_submit, task_id, fn, *args, **kwargs)
        inner.task_id = task_id
        inner.add_done_callback(self.__handle_inner)

        return outer

    def __handle_inner(self, inner):
        task_id = inner.task_id
        if exception := inner.exception():
            if outer := self.__tasks.pop(task_id, None):
                outer.set_exception(exception)

This fixes the case where a worker dies almost instantly:

>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=lambda: terminate_child(0))
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

For the general case, we need to check if the executor is broken – but how? We've already decided we don't want to depend on internals, so we can't use Process​Pool​Executor.​​_broken. Maybe we can submit a dummy task and see if it fails instead:

    def __check_broken(self):
        try:
            super().submit(int).cancel()
        except concurrent.futures.BrokenExecutor as e:
            return type(e)(str(e))
        except RuntimeError as e:
            if 'shutdown' not in str(e):
                raise
        return None

Using it is a bit involved, but not completely awful:

    def __handle_results(self):
        last_broken_check = time.monotonic()

        while True:
            now = time.monotonic()
            if now - last_broken_check >= .1:
                if exc := self.__check_broken():
                    break
                last_broken_check = now

            try:
                value = self.__result_queue.get(timeout=.1)
            except queue.Empty:
                continue

            if not value:
                return

            task_id, ok, result = value
            if outer := self.__tasks.pop(task_id, None):
                if ok:
                    outer.set_result(result)
                else:
                    outer.set_exception(result)

        while self.__tasks:
            try:
                _, outer = self.__tasks.popitem()
            except KeyError:
                break
            outer.set_exception(exc)

When there's a steady stream of results coming in, we don't want to check too often, so we enforce a minimum delay between checks. When there are no results coming in, we want to check regularly, so we use the Queue.get() timeout to avoid waiting forever. If the check fails, we break out of the loop and fail the pending tasks. Like so:

>>> executor = ProcessThreadPoolExecutor(30, initializer=init_client)
>>> benchmark(executor, timer=terminate_child)
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

cool smoking cat wearing denim jacket and sunglasses


So, yeah, I think we're done. Here's the final executor and benchmark code.

Some features are left as an exercise for the reader.


Bonus: free threading #

You may have heard people being excited about the experimental free threading support added in Python 3.13, which allows running Python code on multiple CPUs.

And for good reason:

$ python3.13t
Python 3.13.2 experimental free-threading build
>>> from concurrent.futures import *
>>> from bench import *
>>> init_client()
>>> benchmark(ThreadPoolExecutor(30))
elapsed: 8.224
>>> benchmark(ThreadPoolExecutor(40))
elapsed: 6.193
>>> benchmark(ThreadPoolExecutor(120))
elapsed: 2.323

3.6x over the GIL version, with none of the shenanigans in this article!

Alas, packages with extensions need to be updated to support it:

>>> import psutil
zsh: segmentation fault  python3.13t

...but the ecosystem is slowly catching up.

cat patiently waiting on balcony

  1. At least, all we can use for pure-Python code. I‍/‍O always releases the global interpreter lock, and so do some extension modules. [return]

  2. The psutil documentation for memory_full_info() explains the difference quite nicely and links to further resources, because good libraries educate. [return]

  3. You may have to run Python as root to get the USS of child processes. [return]

  4. And no, asyncio is not a solution, since the event loop runs in a single thread, so you'd still need to run one event loop per CPU in dedicated processes. [return]

  5. Check out nilp0inter/threadedprocess for an idea of what that looks like. [return]

  6. pkill -fn '[Pp]ython' would've done it too, but it gets tedious if you do it a lot, and it's a different command on Windows. [return]

April 21, 2025 04:43 PM UTC


Real Python

Shallow vs Deep Copying of Python Objects

Python’s assignment statements don’t copy objects as they do in some other programming languages. Instead, they create bindings between your variable names and objects. For immutable objects, this distinction usually doesn’t matter. However, when you work with mutable objects or containers of mutable items, you may need to create explicit copies or “clones” of these objects.

By the end of this tutorial, you’ll understand that:

  • Shallow copying creates a new object but references the same nested objects, leading to shared changes.
  • Deep copying recursively duplicates all objects, ensuring full independence from the original.
  • Python’s copy module provides the copy() function for shallow copies and deepcopy() for deep copies.
  • Custom classes can implement .__copy__() and .__deepcopy__() for specific copying behavior.
  • Assignment in Python binds variable names to objects without copying, unlike some lower-level languages.

Explore the nuances of copying objects in Python and learn how to apply these techniques to manage mutable data structures effectively.
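
For a quick taste of the difference, here's a minimal example using the copy module:

Python
>>> import copy
>>> matrix = [[1, 2], [3, 4]]
>>> shallow = copy.copy(matrix)   # new outer list, same nested lists
>>> deep = copy.deepcopy(matrix)  # fully independent clone
>>> matrix[0][0] = 99
>>> shallow[0][0]  # the shallow copy sees the change
99
>>> deep[0][0]  # the deep copy doesn't
1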



Getting the Big Picture of Object Copying

Copying an object means creating its exact duplicate in memory. While there are many good reasons for doing so, at the end of the day, it allows you to modify the cloned objects independently of each other.

For example, a getter method may return sensitive information like the balance of someone’s bank account. To prevent unauthorized modifications of the bank account’s state, whether accidental or intentional, you’ll typically return a copy of the original data as a defensive programming measure. That way, you’ll have two separate objects safely representing the same piece of information.

Sometimes, you may need to work with multiple snapshots of the same data. In 3D computer graphics, transformations like rotation and scaling rely on matrix multiplication to update a model’s vertices. Rather than permanently changing the original model, you can duplicate its vertices and apply transformations to the copy. This will allow you to animate the model in a non-destructive way.

The following section provides an overview of the fundamental concepts and challenges associated with object copying in general. If you’d like to jump straight to copying objects in Python, then feel free to skip ahead.

Scalar vs Composite Types

In programming, objects can be classified into two broad categories of data types:

  1. Scalar
  2. Composite

Scalar data types represent simple, indivisible values that can’t be decomposed into smaller parts, much like atoms were once thought to be. Examples of scalars in Python include numbers, dates, and UUID-type identifiers:

Python
>>> from datetime import date
>>> from uuid import uuid4
>>> numbers = 42, 3.14, 3 + 2j
>>> dates = date.today(), date(1991, 2, 20)
>>> ids = uuid4(), uuid4(), uuid4()

Each of these objects holds a single value representing a basic unit of data. By combining these fundamental building blocks, you can create more complex data structures.

Composite data types, on the other hand, are containers made up of other elements. Some of them are merely collections of scalar values, while others contain other composites or both, forming a complex hierarchy of objects:

Python
>>> import array
>>> audio_frames = array.array("h", [2644, 2814, 3001])
>>> audio_data = (
...     ("PCM", 2, 44100, 16),
...     [
...         (15975, 28928),
...         (-86, 15858),
...         (31999, -3),
...     ]
... )

In this case, the "h" argument in the array.array() call specifies that the array will store numbers as two-byte signed integers. As you can see, a Python array aggregates scalar numbers into a flat sequence, whereas a list and tuple can contain deeply nested structures arranged in a particular way.

Note: Python types can sometimes fall into a gray area. For example, strings have a dual nature, as they’re technically sequences of characters. At the same time, they behave like scalars in specific contexts because they don’t allow element-wise operations—you must treat them as a whole.

These two categories of data types are closely related to the concept of object mutability, which you’ll learn more about now.

Mutable vs Immutable Objects

In high-level programming languages like Java and JavaScript, scalar types typically represent read-only values that can’t change over time. Such objects don’t allow in-place state mutation during their lifetime. So, if you want to modify a scalar value, then your only option is to disregard it and create another instance with a different value. In contrast, composite types can be either mutable or immutable, depending on their implementation.
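
For example, a Python string must be replaced wholesale, while a list can be changed in place:

Python
>>> word = "python"
>>> word.upper()  # returns a new string; the original is untouched
'PYTHON'
>>> word
'python'
>>> items = [1, 2, 3]
>>> items.append(4)  # mutates the existing list in place
>>> items
[1, 2, 3, 4]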

Note: Immutable types have several advantages, including thread safety and improved memory efficiency, as they let you reuse objects without copying. On the other hand, when performance is vital, mutable types can reduce the overhead associated with object creation, especially when you tend to modify your objects frequently.

Read the full article at https://realpython.com/copying-python-objects/ »



April 21, 2025 02:00 PM UTC

Quiz: Shallow vs Deep Copying of Python Objects

In this quiz, you’ll test your understanding of Shallow vs Deep Copying of Python Objects.

By working through this quiz, you’ll revisit the concepts of shallow and deep copying, and how they affect mutable objects in Python.



April 21, 2025 12:00 PM UTC


Talk Python to Me

#502: Django Ledger: Accounting with Python

Do you or your company need accounting software? Well, there are plenty of SaaS products out there that you can give your data to. But maybe you also really like Django and would rather have a foundation to build your own accounting system exactly as you need for your company or your product. On this episode, we're diving into Django Ledger, created by Miguel Sanda, which can do just that.

Episode sponsors:
Okta: https://talkpython.fm/auth0
Talk Python Courses: https://talkpython.fm/training

Links from the show:
Miguel Sanda on Twitter: https://x.com/elarroba?featured_on=talkpython
Miguel on Mastodon: https://fosstodon.org/@elarroba
Miguel on GitHub: https://github.com/elarroba?featured_on=talkpython
Django Ledger on GitHub: https://github.com/arrobalytics/django-ledger?featured_on=talkpython
Django Ledger Discord: https://discord.gg/c7PZcbYgrc?featured_on=talkpython
Get Started with Django MongoDB Backend: https://www.mongodb.com/docs/languages/python/django-mongodb/current/get-started/?featured_on=talkpython
Wagtail CMS: https://wagtail.org/?featured_on=talkpython
Watch this episode on YouTube: https://www.youtube.com/watch?v=eM170jyjbu8
Episode transcripts: https://talkpython.fm/episodes/transcript/502/django-ledger-accounting-with-python

Stay in touch:
Subscribe to Talk Python on YouTube: https://talkpython.fm/youtube
Talk Python on Bluesky: https://bsky.app/profile/talkpython.fm
Talk Python on Mastodon: https://fosstodon.org/web/@talkpython
Michael on Bluesky: https://bsky.app/profile/mkennedy.codes?featured_on=talkpython
Michael on Mastodon: https://fosstodon.org/web/@mkennedy

April 21, 2025 08:00 AM UTC

April 20, 2025


ListenData

How to Use Gemini API in Python

Integrating Gemini API with Python

In this tutorial, you will learn how to use Google's Gemini AI model through its API in Python.

Update (April 21, 2025) : The tutorial has been updated for the latest Gemini model - Gemini 2.5 Flash and Gemini 2.5 Pro. It supports real-time search and multimodal generation.
Steps to Access Gemini API

Follow the steps below to access the Gemini API and then use it in python.

  1. Visit Google AI Studio website.
  2. Sign in using your Google account.
  3. Create an API key.
  4. Install the Google AI Python library for the Gemini API using the command below:
    pip install google-genai
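
Once you have a key, a minimal sketch of calling the model with the google-genai client might look like this (assuming the key is stored in a GEMINI_API_KEY environment variable):

import os
from google import genai

# assumes the API key is exported as GEMINI_API_KEY
client = genai.Client(api_key=os.environ["GEMINI_API_KEY"])

response = client.models.generate_content(
    model="gemini-2.5-flash",
    contents="Explain the Gemini API in one sentence.",
)
print(response.text)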

April 20, 2025 10:15 PM UTC


TechBeamers Python

Matplotlib Practice Online: Free Exercises

📝 Check out a comprehensive set of Matplotlib exercises and practice with our Online Matplotlib Compiler. This library is mainly used for data visualization in Python. From this tutorial, you will get some idea about – how to analyze trends, build machine learning models, and explore datasets. What is Matplotlib? Matplotlib is famous for its […]
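
For a quick taste of the kind of plotting the exercises cover, here is a minimal Matplotlib example:

import matplotlib.pyplot as plt

x = [1, 2, 3, 4]
y = [1, 4, 9, 16]

plt.plot(x, y, marker="o")  # line plot with point markers
plt.xlabel("x")
plt.ylabel("x squared")
plt.title("A first Matplotlib plot")
plt.show()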


April 20, 2025 07:13 PM UTC


Ed Crewe

Talk about Cloud Prices at PyConLT 2025


Introduction to Cloud Pricing

I am looking forward to speaking at PyConLT 2025

It's been a while (12 years!) since my last Python conference, EuroPython Florence 2012, when I spoke as a Django web developer, although I did give a Golang talk at Kubecon USA last year.

I work at EDB, the Postgres company, on our Postgres AI product. The cloud version of which runs across the main cloud providers, AWS, Azure and GCP.

The team I am in handles the identity management and billing components of the product. So whilst I am mainly a Golang micro-service developer, I have dipped my toe into Data Science, having rewritten our Cloud prices ETL using Python & Airflow. The subject of my talk in Lithuania.

Cloud pricing can be surprisingly complex ... and the price lists are not small.

The full price lists for the 3 CSPs together are almost 5 million prices - known as SKUs (Stock Keeping Unit prices)

csp x service x type x tier x region
3    x  200      x 50     x 3     x 50        = 4.5 million

csp = AWS, Azure and GCP

service = vms, k8s, network, load balancer, storage etc.

type = e.g. storage - general purpose E2, N1 ... accelerated A1, A2  multiplied by various property sizes

tier  = T-shirt size tiers of usage, ie more use = cheaper rate - small, medium, large

region = us-east-1, us-west-2, af-south-1, etc.

We need to gather all the latest service SKUs that our Postgres AI may use and total them up as a cost estimate for when customers are selecting the various options for creating or adding to their installation, applying the additional pricing for our product and any private offer discounts for it as part of this process.

Therefore we needed to build a data pipeline to gather the SKUs and keep them current.

Previously we used a 3rd party kubecost based provider's data, however our usage was not sufficient to justify paying for this particular cloud service when its free usage expired.

Hence we needed to rewrite our cloud pricing data pipeline. This pipeline is in Apache Airflow but it could equally be in Dagster or any other data pipeline framework.

My talk deals with the wider points around cloud pricing, refactoring a data pipeline and pipeline framework options. But here I want to provide more detail on the data pipeline's Python code, its use of Embedded Postgres and Click, and the benefits for development and testing.  Some things I didn't have room for in the talk.


Outline of our use of Data Pipelines

Airflow, Dagster, etc. provide many tools for pipeline development.
Notably a local development mode for running up the pipeline framework locally and doing test runs.
Even with some reloading on edit, it can still be a long process to run up a pipeline and then execute the full set of steps, known as a directed acyclic graph (DAG).

One way to improve the developer experience (DevX) is to encapsulate each DAG step's code as much as possible:
removing use of shared state where that is viable, and allowing individual steps to be separately and rapidly tested with fixture data, with fast stand up and tear down of temporary embedded storage.

To avoid shared state persistence across the whole pipeline we use extract transform load (ETL) within each step, rather than across the whole pipeline. This enables functional running and testing of individual steps outside the pipeline.


The Scraper Class

We need a standard scraper class to fetch the cloud prices from each CSP, so we use an abstract base class.


from abc import ABC


class BaseScraper(ABC):
    """Abstract base class for scrapers"""

    batch = 500
    conn = None
    unit_map = {"FAIL": ""}
    root_url = ""

    def map_units(self, entry, key):
        """To standardize naming of units between CSPs"""
        return self.unit_map.get(entry.get(key, "FAIL"), entry[key])

    def scrape_sku(self):
        """Scrapes prices from CSP bulk JSON API - uses CSP specific methods"""
        pass

    def bulk_insert_rows(self, rows):
        """Bulk insert batches of rows - note that Psycopg >= 3.1 uses pipeline mode"""
        query = """INSERT INTO api_price.infra_price VALUES
        (%(sku_id)s, %(cloud_provider)s, %(region)s, %(sku_name)s, %(end_usage_amount)s)"""
        with self.conn.cursor() as cur:
            cur.executemany(query, rows)

This has 3 common methods:

  1. mapping units to common ones across all CSPs
  2. a top-level scrape_sku method, with CSP-specific differences handled in sub-methods called from it
  3. bulk insert rows - the main concrete method used by all scrapers

To bulk insert 500 rows per query we use Psycopg 3 pipeline mode - so it can send batch updates again and again without waiting for a response.

The database update against local embedded Postgres is faster than the time to scrape the remote web site SKUs.


The largest part of the Extract is done at this point. Rather than loading all 5 million SKUs as we did with the kubecost data dump and then querying out the 120 thousand for our product, scraping the sources directly means we only need to ingest those 120k SKUs - which saves handling 97.6% of the data!


So the resultant speed is sufficient although not as performant as pg_dump loading which uses COPY.


Unfortunately, Python Psycopg is significantly slower when using cursor.copy, which argued against loading zipped-up Postgres dumps from Python. Hence all the data artefact creation and loading simply uses the pg_dump utility wrapped as a Python shell command.

There is no need to use Python here when there is the tried and tested C based pg_dump utility for it that ensures compatibility outside our pipeline. Later version pg_dump can always handle earlier Postgres dumps.
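
As a rough sketch of what that shell-command wrapping can look like (dump_schema() and the exact pg_dump arguments here are illustrative, not the project's actual code):

import subprocess

def dump_schema(port, outfile):
    """Dump the price schema from the local embedded Postgres as plain, uncompressed SQL."""
    subprocess.run(
        [
            "pg_dump",
            "--host", "localhost",
            "--port", str(port),
            "--schema", "api_price",
            "--file", outfile,
            "postgres",  # database name is an assumption
        ],
        check=True,
    )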


We don't need to retain a long history of artefacts, since it is public data and never needs to be reverted.

This allows us a low retention level, cleaning out most of the old dumps on creation of a new one. So any storage saving on compression is negligible.

Therefore we avoid pg_dump compression, since it can be significantly slower, especially if the data already contains compressed blobs. Plain SQL COPY also allows for data inspection if required - eg grep for a SKU, when debugging why a price may be missing.


Postgres Embedded wrapped with Go

Unlike MySQL, Postgres doesn't do in memory databases. The equivalent for temporary or test run database lifetime, is the embedded version of Postgres. Run from an auto-created temp folder of files. 
Python doesn't have a maintained wrapper for Embedded Postgres; sadly the project https://github.com/Simulmedia/pyembedpg is abandoned 😢

Hence use the most up to date wrapper from Go. Running the Go binary via a Python shell command.
It still lags behind by a version of Postgres, so it's on Postgres 16 rather than the latest 17.
But for the purposes of embedded use that is irrelevant.

By using separate temporary Postgres per step we can save a dumped SQL artefact at the end of a step and need no data dependency between steps, meaning individual step retry in parallel, just works.
The performance of localhost dump to socket is also superior.
By processing everything in the same (if embedded) version of our final target database as the Cloud Price, Go micro-service, we remove any SQL compatibility issues and ensure full Postgresql functionality is available.

The final data artefacts will be loaded to a Postgres cluster price schema micro-service running on CloudNativePG

Use a Click wrapper with Tests

The click package provides all the functionality for our pipeline.

> pscraper -h

Usage: pscraper [OPTIONS] COMMAND [ARGS]...

   price-scraper: python web scraping of CSP prices for api-price

Options:

  -h, --help  Show this message and exit.


Commands:

  awsscrape     Scrape prices from AWS

  azurescrape  Scrape prices from Azure

  delold            Delete old blob storage files, default all over 12 weeks old are deleted

  gcpscrape     Scrape prices from GCP - set env GCP_BILLING_KEY

  pgdump        Dump postgres file and upload to cloud storage - set env STORAGE_KEY
                      > pscraper pgdump --port 5377 --file price.sql 

  pgembed      Run up local embeddedPG on a random port for tests

> pscraper pgembed

  pgload           Load schema to local embedded postgres for testing

> pscraper pgload --port 5377 --file price.sql


This caters for developing the step code entirely outside the pipeline for development and debug.
We can run pgembed to create a local db, pgload to add the price schema. Then run individual scrapes from a pipenv pip install -e version of the price scraper package.
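
A minimal sketch of how such a Click command group can be wired up (the command body here is illustrative, not the real scraper code):

import click

@click.group(context_settings={"help_option_names": ["-h", "--help"]})
def cli():
    """price-scraper: python web scraping of CSP prices for api-price"""

@cli.command()
@click.option("--port", default=5377, help="Port of the local embedded Postgres")
def awsscrape(port):
    """Scrape prices from AWS"""
    # instantiate the AWS scraper here and bulk insert its SKUs into embedded Postgres
    ...

if __name__ == "__main__":
    cli()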


For unit testing we can create a mock response object for the data scrapers that returns different fixture payloads based on the query and monkeypatch it in. This allows us to functionally test the whole scrape and data artefact creation ETL cycle as unit functional tests.

Any issues with source data changes can be replicated via a fixture for regression tests.

from unittest import TestCase

import psycopg
import requests

# fixtures, ROOT and MockConn come from the test package's own fixture module (not shown here)


class MockResponse:
    """Fake to return fixture value of requests.get() for testing scrape parsing"""

    name = "Mock User"
    payload = {}
    content = ""
    status_code = 200
    url = "http://mock_url"

    def __init__(self, payload={}, url="http://mock_url"):
        self.url = url
        self.payload = payload
        self.content = str(payload)

    def json(self):
        return self.payload


def mock_aws_get(url, **kwargs):
    """Return the fixture JSON that matches the URL used"""
    for key, fix in fixtures.items():
        if key in url:
            return MockResponse(payload=fix, url=url)
    return MockResponse()


class TestAWSScrape(TestCase):
    """Tests for the 'pscraper awsscrape' command"""

    @classmethod
    def setUpClass(cls):
        """Simple monkeypatch in mock handlers for all tests in the class"""
        psycopg.connect = MockConn
        requests.get = mock_aws_get
        # confirm that requests is patched, hence returns a short fixture of JSON from the AWS URLs
        result = requests.get("{}/AmazonS3/current/index.json".format(ROOT))
        assert len(result.json().keys()) > 5 and len(result.content) < 2000

A simple DAG with Soda Data validation

The click commands for each DAG are imported at the top, one for the scrape and one for postgres embedded, the DAG just becomes a wrapper to run them, adding Soda data validation of the scraped data ...

def scrape_azure():
   """Scrape Azure via API public json web pages"""
   from price_scraper.commands import azurescrape, pgembed
   folder, port = setup_pg_db(PORT)
   error = azurescrape.run_azure_scrape(port, HOST)
   if not error:
       error = csp_dump(port, "azure")
   if error:
       pgembed.teardown_pg_embed(folder) 
       notify_slack("azure", error)
       raise AirflowFailException(error)
  
   data_test = SodaScanOperator(
       dag=dag,
       task_id="data_test",
       data_sources=[
           {
               "data_source_name": "embedpg",
               "soda_config_path": "price-scraper/soda/configuration_azure.yml",
           }
       ],
       soda_cl_path="price-scraper/soda/price_azure_checks.yml",
   )
   data_test.execute(dict())
   pgembed.teardown_pg_embed(folder)
 


We setup a new Embedded Postgres (takes a few seconds) and then scrape directly to it.


We then use the SodaScanOperator to check the data we have scraped. If there is no error, we dump to blob storage; otherwise we notify Slack with the error and raise it, ending the DAG.

Our Soda tests check that the number of prices and the prices themselves are in the ranges they should be for each service. We also check that we have the number of tiered rates we expect: over 10 starting usage rates and over 3000 specific tiered prices.

If the Soda tests pass, we dump to cloud storage and teardown temporary Postgres. A final step aggregates together each steps data. We save the money and maintenance of running a persistent database cluster in the cloud for our pipeline.


April 20, 2025 09:48 AM UTC


Python GUIs

Multithreading PyQt6 applications with QThreadPool — Run background tasks concurrently without impacting your UI

A common problem when building Python GUI applications is the interface "locking up" when attempting to perform long-running background tasks. In this tutorial, we'll cover quick ways to achieve concurrent execution in PyQt6.

If you'd like to run external programs (such as command-line utilities) from your applications, check out the Using QProcess to run external programs tutorial.

Background: The frozen GUI issue

Applications based on Qt (like most GUI applications) are based on events. This means that execution is driven in response to user interaction, signals, and timers. In an event-driven application, clicking a button creates an event that your application subsequently handles to produce some expected output. Events are pushed onto and taken off an event queue and processed sequentially.

In PyQt, we create an app with the following code:

python
app = QApplication([])
window = MainWindow()
app.exec()

The event loop starts when you call .exec() on the QApplication object and runs within the same thread as your Python code. The thread that runs this event loop — commonly referred to as the GUI thread — also handles all window communication with the host operating system.

By default, any execution triggered by the event loop will also run synchronously within this thread. In practice, this means that while your PyQt application is busy doing something, communication with the window and interaction with the GUI are frozen.

If what you're doing is simple, and it returns control to the GUI loop quickly, the GUI freeze will be imperceptible to the user. However, if you need to perform longer-running tasks, for example, opening and writing a large file, downloading some data, or rendering a high-resolution image, there are going to be problems.

To your user, the application will appear to be unresponsive (because it is). Because your app is no longer communicating with the OS, on macOS, if you click on your app, you will see the spinning wheel of death. And, nobody wants that.

The solution is to move your long-running tasks out of the GUI thread into another thread. PyQt provides a straightforward interface for this.

Preparation: A minimal stub app

To demonstrate multi-threaded execution, we need an application to work with. Below is a minimal stub application for PyQt that will allow us to demonstrate multithreading and see the outcome in action. Simply copy and paste this into a new file and save it with an appropriate filename, like multithread.py. The remainder of the code will be added to this file (there is also a complete working example at the bottom if you're impatient):

python
import time

from PyQt6.QtCore import (
    QTimer,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0

        layout = QVBoxLayout()

        self.label = QLabel("Start")
        button = QPushButton("DANGER!")
        button.pressed.connect(self.oh_no)

        layout.addWidget(self.label)
        layout.addWidget(button)

        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)

        self.show()

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def oh_no(self):
        time.sleep(5)

    def recurring_timer(self):
        self.counter += 1
        self.label.setText(f"Counter: {self.counter}")

app = QApplication([])
window = MainWindow()
app.exec()

Run the app as for any other Python application:

sh
$ python multithread.py

You will see a demonstration window with a number counting upwards. This count is generated by a simple recurring timer, firing once per second. Think of this as our event loop indicator (or GUI thread indicator), a simple way to let us know that our application is ticking over normally. There is also a button with the word "DANGER!". Push it.

You'll notice that each time you push the button, the counter stops ticking, and your application freezes entirely. On Windows, you may see the window turn pale, indicating it is not responding, while on macOS, you'll get the spinning wheel of death.

The wrong approach

Avoid doing this in your code.

What appears as a frozen interface is the main Qt event loop being blocked from processing (and responding to) window events. Your clicks on the window are still registered by the host OS and sent to your application, but because it's sat in your big ol' lump of code (calling time.sleep()), it can't accept or react to them. They have to wait until your code passes control back to Qt.

The quickest and perhaps most logical way to get around this issue is to accept events from within your code. This allows Qt to continue to respond to the host OS and your application will stay responsive. You can do this easily by using the static processEvents() method on the QApplication class.

For example, our long-running code time.sleep() could be broken down into five 1-second sleeps and insert the processEvents() in between. The code for this would be:

python
def oh_no(self):
    for n in range(5):
        QApplication.processEvents()
        time.sleep(1)

Now, when you push the DANGER! button, your app runs as before. However, now QApplication.processEvents() intermittently passes control back to Qt, and allows it to respond to events as normal. Qt will then accept events and handle them before returning to run the remainder of your code.

This approach works, but it's horrible for a few reasons, including the following:

  1. When you pass control back to Qt, your code is no longer running. This means that whatever long-running task you're trying to do will take longer. That is definitely not what you want.

  2. When you have multiple long-running tasks within your application, with each calling QApplication.processEvents() to keep things ticking, your application's behavior can be unpredictable.

  3. Processing events outside the main event loop (app.exec()) causes your application to branch off into handling code (e.g. for triggered slots or events) while within your loop. If your code depends on or responds to an external state, then this can cause undefined behavior.

The code below demonstrates the last point in action:

python
import time

from PyQt6.QtCore import (
    QTimer,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0

        layout = QVBoxLayout()

        self.label = QLabel("Start")
        button = QPushButton("DANGER!")
        button.pressed.connect(self.oh_no)

        c = QPushButton("?")
        c.pressed.connect(self.change_message)

        layout.addWidget(self.label)
        layout.addWidget(button)
        layout.addWidget(c)

        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)

        self.show()

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def change_message(self):
        self.message = "OH NO"

    def oh_no(self):
        self.message = "Pressed"

        for n in range(100):
            time.sleep(0.1)
            self.label.setText(self.message)
            QApplication.processEvents()

    def recurring_timer(self):
        self.counter += 1
        self.label.setText(f"Counter: {self.counter}")

app = QApplication([])
window = MainWindow()
app.exec()

If you run this code you'll see the counter as before. Pressing DANGER! will change the displayed text to "Pressed", as defined at the entry point to the oh_no() method. However, if you press the "?" button while oh_no() is still running, you'll see that the message changes. The state is being changed from outside your event loop.

Use threads and processes

If you take a step back and think about what you want to happen in your application, then you can probably sum it up with "stuff to happen at the same time as other stuff happens".

There are two main approaches to running independent tasks within a PyQt application:

  1. Threads
  2. Processes

Threads share the same memory space, so they are quick to start up and consume minimal resources. The shared memory makes it trivial to pass data between threads. However, reading or writing memory from different threads can lead to race conditions or segfaults.

In Python, there is the added issue that multiple threads are bound by the Global Interpreter Lock (GIL), meaning non-GIL-releasing Python code can only execute in one thread at a time. However, this is not a major issue with PyQt, where most of the time is spent outside of Python.

Processes use separate memory space and an entirely separate Python interpreter. They sidestep any potential problems with Python's GIL but at the cost of slower start-up times, larger memory overhead, and complexity in sending and receiving data.

Processes in Qt are well suited to running and communicating with external programs. However, for simplicity's sake, threads are usually the best choice unless you have a good reason to use processes (see caveats later).

There is nothing stopping you from using pure Python threading or process-based approaches within your PyQt application. In the following sections, though, you'll rely on Qt's threading classes.

QRunnable and the QThreadPool

Favor this approach in your code.

Qt provides a straightforward interface for running jobs or tasks in other threads, which is nicely supported in PyQt. This interface is built around two classes:

  1. QRunnable: The container for the work you want to perform.
  2. QThreadPool: The method by which you pass that work to alternate threads.

The neat thing about using QThreadPool is that it handles queuing and executing workers for you. Other than queuing up jobs and retrieving the results, there is not much to do.

To define a custom QRunnable, you can subclass the base QRunnable class. Then, place the code you wish to execute within the run() method. The following is an implementation of our long-running time.sleep() job as a QRunnable.

Go ahead and add the following code to multithread.py, above the MainWindow class definition, and don't forget to import QRunnable and pyqtSlot from PyQt6.QtCore:

python
class Worker(QRunnable):
    """Worker thread."""

    @pyqtSlot()
    def run(self):
        """Your long-running job goes in this method."""
        print("Thread start")
        time.sleep(5)
        print("Thread complete")

Executing our long-running job in another thread is simply a matter of creating an instance of the Worker and passing it to our QThreadPool instance. It will be executed automatically.

Next, import QThreadPool from PyQt6.QtCore and add the following code to the __init__() method to set up our thread pool:

python
self.threadpool = QThreadPool()
thread_count = self.threadpool.maxThreadCount()
print(f"Multithreading with maximum {thread_count} threads")

Finally, update the oh_no() method as follows:

python
def oh_no(self):
    worker = Worker()
    self.threadpool.start(worker)

Now, clicking the DANGER! button will create a worker to handle the (long-running) job and spin that off into another thread via the thread pool. If there are not enough threads available to process incoming workers, they'll be queued and executed in order at a later time.

Try it out, and you'll see that your application now handles you bashing the button with no problems.

Check what happens if you hit the button multiple times. You should see your threads executed immediately up to the number reported by maxThreadCount(). If you press the button again after there are already this number of active workers, then the subsequent workers will be queued until a thread becomes available.

Improved QRunnable

If you want to pass custom data into the runner function, you can do so via __init__(), and then have access to the data via self from within the run() slot:

python
class Worker(QRunnable):
    """Worker thread.

    :param args: Arguments to make available to the run code
    :param kwargs: Keywords arguments to make available to the run code
    """

    def __init__(self, *args, **kwargs):
        super().__init__()
        self.args = args
        self.kwargs = kwargs

    @pyqtSlot()
    def run(self):
        """Initialise the runner function with passed self.args, self.kwargs."""
        print(self.args, self.kwargs)

We can take advantage of the fact that Python functions are objects and pass in the function to execute rather than subclassing QRunnable for each runner function. In the following construction we only require a single Worker class to handle all of our jobs:

python
class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param callback: The function callback to run on this worker thread.
                     Supplied args and kwargs will be passed through to the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    @pyqtSlot()
    def run(self):
        """Initialise the runner function with passed args, kwargs."""
        self.fn(*self.args, **self.kwargs)

You can now pass in any Python function and have it executed in a separate thread. Go ahead and update MainWindow with the following code:

python
def execute_this_fn(self):
    print("Hello!")

def oh_no(self):
    # Pass the function to execute
    worker = Worker(
        self.execute_this_fn
    )  # Any other args, kwargs are passed to the run function
    # Execute
    self.threadpool.start(worker)

Now, when you click DANGER!, the app will print Hello! to your terminal without affecting the counter.

Thread Input/Output

Sometimes, it's helpful to be able to pass back state and data from running workers. This could include the outcome of calculations, raised exceptions, or ongoing progress (maybe for progress bars). Qt provides the signals and slots framework to allow you to do just that. Qt's signals and slots are thread-safe, allowing safe communication directly from running threads to your GUI thread.

Signals allow you to emit values, which are then picked up elsewhere in your code by slot functions that have been linked with the connect() method.

Below is a custom WorkerSignals class defined to contain a number of example signals. Note that custom signals can only be defined on objects derived from QObject. Since QRunnable is not derived from QObject we can't define the signals there directly. A custom QObject to hold the signals is a quick solution:

python
class WorkerSignals(QObject):
    """Signals from a running worker thread.

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc())

    result
        object data returned from processing, anything
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)

In this code, we've defined three custom signals:

  1. finished, which receives no data and is used to indicate when the task is complete.
  2. error, which receives a tuple of Exception type, Exception value, and formatted traceback.
  3. result, which receives any object type from the executed function.

You may not find a need for all of these signals, but they are included to give an indication of what is possible. In the following code, we're going to implement a long-running task that makes use of these signals to provide useful information to the user:

python
class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param callback: The function callback to run on this worker thread.
                     Supplied args and
                     kwargs will be passed through to the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()

    @pyqtSlot()
    def run(self):
        """Initialise the runner function with passed args, kwargs."""

        # Retrieve args/kwargs here; and fire processing using them
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)  # Return the result of the processing
        finally:
            self.signals.finished.emit()  # Done

You can connect your own handler functions to the signals to receive notification of completion (or the result) of threads:

python
def execute_this_fn(self):
    for n in range(0, 5):
        time.sleep(1)
    return "Done."

def print_output(self, s):
    print(s)

def thread_complete(self):
    print("THREAD COMPLETE!")

def oh_no(self):
    # Pass the function to execute
    worker = Worker(
        self.execute_this_fn
    ) # Any other args, kwargs are passed to the run function
    worker.signals.result.connect(self.print_output)
    worker.signals.finished.connect(self.thread_complete)
    # Execute
    self.threadpool.start(worker)

You also often want to receive status information from long-running threads. This can be done by passing in callbacks to which your running code can send the information. You have two options here:

  1. Define new signals, allowing the handling to be performed using the event loop
  2. Use a regular Python function

In both cases, you'll need to pass these callbacks into your target function to be able to use them. The signal-based approach is used in the completed code below, where we pass a float back as an indicator of the thread's % progress.

The complete code

A complete working example is given below, showcasing the custom QRunnable worker together with the worker & progress signals. You should be able to easily adapt this code to any multithreaded application you develop.

python
import sys
import time
import traceback

from PyQt6.QtCore import (
    QObject,
    QRunnable,
    QThreadPool,
    QTimer,
    pyqtSignal,
    pyqtSlot,
)
from PyQt6.QtWidgets import (
    QApplication,
    QLabel,
    QMainWindow,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

class WorkerSignals(QObject):
    """Signals from a running worker thread.

    finished
        No data

    error
        tuple (exctype, value, traceback.format_exc())

    result
        object data returned from processing, anything

    progress
        float indicating % progress
    """

    finished = pyqtSignal()
    error = pyqtSignal(tuple)
    result = pyqtSignal(object)
    progress = pyqtSignal(float)

class Worker(QRunnable):
    """Worker thread.

    Inherits from QRunnable to handle worker thread setup, signals and wrap-up.

    :param callback: The function callback to run on this worker thread.
                     Supplied args and
                     kwargs will be passed through to the runner.
    :type callback: function
    :param args: Arguments to pass to the callback function
    :param kwargs: Keywords to pass to the callback function
    """

    def __init__(self, fn, *args, **kwargs):
        super().__init__()
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.signals = WorkerSignals()
        # Add the callback to our kwargs
        self.kwargs["progress_callback"] = self.signals.progress

    @pyqtSlot()
    def run(self):
        try:
            result = self.fn(*self.args, **self.kwargs)
        except Exception:
            traceback.print_exc()
            exctype, value = sys.exc_info()[:2]
            self.signals.error.emit((exctype, value, traceback.format_exc()))
        else:
            self.signals.result.emit(result)
        finally:
            self.signals.finished.emit()

class MainWindow(QMainWindow):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.counter = 0

        layout = QVBoxLayout()

        self.label = QLabel("Start")
        button = QPushButton("DANGER!")
        button.pressed.connect(self.oh_no)

        layout.addWidget(self.label)
        layout.addWidget(button)

        w = QWidget()
        w.setLayout(layout)
        self.setCentralWidget(w)

        self.show()

        self.threadpool = QThreadPool()
        thread_count = self.threadpool.maxThreadCount()
        print(f"Multithreading with maximum {thread_count} threads")

        self.timer = QTimer()
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.recurring_timer)
        self.timer.start()

    def progress_fn(self, n):
        print(f"{n:.1f}% done")

    def execute_this_fn(self, progress_callback):
        for n in range(0, 5):
            time.sleep(1)
            progress_callback.emit(n * 100 / 4)

        return "Done."

    def print_output(self, s):
        print(s)

    def thread_complete(self):
        print("THREAD COMPLETE!")

    def oh_no(self):
        # Pass the function to execute
        worker = Worker(
            self.execute_this_fn
        )  # Any other args, kwargs are passed to the run function
        worker.signals.result.connect(self.print_output)
        worker.signals.finished.connect(self.thread_complete)
        worker.signals.progress.connect(self.progress_fn)
        # Execute
        self.threadpool.start(worker)

    def recurring_timer(self):
        self.counter += 1
        self.label.setText(f"Counter: {self.counter}")

app = QApplication([])
window = MainWindow()
app.exec()

Caveats

You may have spotted a slight flaw in this master plan—we are still using the event loop (and the GUI thread) to process our workers' output.

This isn't a problem when we're simply tracking progress, completion, or returning metadata. However, if you have workers that return large amounts of data — e.g. loading large files, performing complex analysis and needing (large) results, or querying databases — passing this data back through the GUI thread may cause performance problems and is best avoided.

Similarly, if your application uses a large number of threads and Python result handlers, you may come up against the limitations of the GIL. As mentioned previously, when using threads, execution of Python code is limited to a single thread at a time. The Python code that handles signals from your threads can be blocked by your workers, and vice versa. Since blocking your slot functions blocks the event loop, this can directly impact GUI responsiveness.

In these cases, it is often better to investigate using a pure Python thread pool (e.g. concurrent.futures) to keep your processing and thread-event handling further isolated from your GUI. However, note that any Python GUI code can block other Python code unless it's in a separate process.
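
For reference, here's a minimal sketch (mine, not from the tutorial) of one way to bridge a concurrent.futures pool back to the GUI thread with a Qt signal. The FutureBridge class and the submit() helper are names invented for illustration:

python
from concurrent.futures import ThreadPoolExecutor

from PyQt6.QtCore import QObject, pyqtSignal

class FutureBridge(QObject):
    """Hypothetical helper: forwards results from executor threads to the GUI thread."""
    result = pyqtSignal(object)

executor = ThreadPoolExecutor(max_workers=4)
bridge = FutureBridge()
bridge.result.connect(print)  # connect your own slot instead of print

def submit(fn, *args, **kwargs):
    future = executor.submit(fn, *args, **kwargs)
    # The done-callback runs in the worker thread; emitting a signal from there
    # is thread-safe, and the connected slot runs in the GUI thread via a
    # queued connection once the event loop processes it.
    future.add_done_callback(lambda f: bridge.result.emit(f.result()))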

April 20, 2025 06:00 AM UTC

April 18, 2025


Eli Bendersky

Sparsely-gated Mixture Of Experts (MoE)

In transformer models, the attention block is typically followed by a feed forward layer (FF), which is a simple fully-connected NN with a hidden layer and nonlinearity. Here's the code for such a block that uses ReLU:

def feed_forward_relu(x, W1, W2):
    """Feed-forward layer with ReLU activation.

    Args:
        x: Input tensor (B, N, D).
        W1: Weights for the hidden layer (D, DH).
        W2: Weights for the output layer (DH, D).

    Returns:
        Output tensor (B, N, D).
    """
    x = x @ W1  # hidden layer (B, N, DH)
    x = np.maximum(0, x)  # ReLU activation (B, N, DH)
    x = x @ W2  # output layer (B, N, D)
    return x

This layer typically holds most of the weights in the transformer, because the hidden dimension (DH in this post, hidden_dim in some papers) is large - 4x the embedding depth D is common. Intuitively, this makes sense because this layer does the majority of the heavy lifting; while the attention block mixes the embeddings of tokens together to express their relationships to one another, the FF block does the actual "reasoning" on these tokens.
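
As a rough back-of-the-envelope (my own numbers, not from the post): with D = 4096 and DH = 4 * D, the two weight matrices of a single FF block already hold about 134M parameters:

D = 4096
DH = 4 * D
ff_params = D * DH + DH * D   # W1 is (D, DH), W2 is (DH, D)
print(f"{ff_params:,}")       # 134,217,728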

Transformer blocks are repeated dozens of times in a model, so the total size of these layers becomes problematic. One approach for improving efficiency that became very popular is called sparsely-gated mixture of experts (paper).

Mixture of Experts architecture (MoE)

The basic idea of MoE is this:

  • The large FF layer is split into a number (NEXP) of blocks called "experts". Each expert is still a FF layer. It takes a vector of size D and transforms it to another vector of size D.
  • There's an additional piece called "router" or "gate". This is just a fully-connected layer (D, NEXP) that takes a token and produces a score for each expert. The router is learned by the model, along with the experts themselves.
  • K experts with the highest scores are selected for each token, and the token is only fed through these experts.
  • The scores are also used to calculate a weighted average from the experts' outputs, eventually producing an answer of size D.

Here's a diagram for a single token, assuming NEXP=8 and TOPK=2 (the two highest scoring experts are selected for each token, out of a total of eight):

mixture of experts diagram

Notes:

  • Experts #1 and #5 are selected because the router produced the highest scores for them among all the experts. The input token is routed to these experts, but not to the others.
  • The output of each expert is element-wise multiplied by a corresponding weight, calculated by applying a softmax to the scores of the selected experts (so that each token's weights sum to 1).
  • The weighted expert outputs are added up for the final output of the layer.

The key point to understand about this architecture is: the experts that were not among the top K for a token aren't used at all - the computation required to propagate the token through these experts is eschewed (both on the forward and backward passes).

This is the goal of the MoE architecture - we increase the overall model size, but keep the computational cost in check by only using a portion of the parameters for every single token. This is also reflected in the models' names; for example, the Mixtral model has size 8x7B; it has 8 experts, and it would be incorrect to just multiply the size of each expert by 8 because not all these parameters participate in the calculation of every token [1]. According to the Mixtral paper, the model only uses 13B active parameters for each token.

A summary of the idea behind MoE is:

MoE increases the model's capacity without proportionally increasing its computational cost.

Numpy implementation

Here's a well-commented implementation of the MoE layer using pure Numpy. First, some parameters:

# Parameters for a feed-forward layer with a fixed activation function.
@dataclass
class FFParams:
    Wh: np.ndarray
    Wo: np.ndarray


# Parameters for a Mixture of Experts (MoE) layer.
@dataclass
class MoEParams:
    # Embedding dimension of each token (a.k.a. model dimension, Dmodel)
    D: int

    # Hidden dimension in FF layers
    DH: int

    # Total number of experts
    NEXP: int

    # K in the top-k selection of top experts per token
    TOPK: int

    # List of experts: each expert is a forward layer with FFParams.
    ff_weights: List[FFParams]

    # Router weights: a linear layer (D, NEXP) that maps input to expert scores.
    router_weights: np.ndarray

And now the implementation. Note that it takes a general (B, N, D) input, assuming batch dimension B and sequence length N:

def moe(x: np.ndarray, params: MoEParams):
    """Mixture of Experts (MoE) layer.

    Args:
        x: Input tensor (B, N, D).
        params: MoEParams.

    Returns:
        Output tensor (B, N, D).
    """
    # Run input through router to get expert scores for each token.
    expert_scores = x @ params.router_weights  # (B, N, NEXP)

    # Select the top-k expert scores and their indices for each token.
    top_scores, top_experts = topk_lastdim(expert_scores, params.TOPK)  # (B, N, TOPK)

    # Apply softmax to the top scores to get weights that sum to 1.
    weights = softmax_lastdim(top_scores)  # (B, N, TOPK)

    out = np.zeros_like(x)
    for b in range(x.shape[0]):
        for n in range(x.shape[1]):
            # Unvectorized implementation: for each token in the batch and
            # sequence, select the top-k experts and apply them with the
            # calculated weights.
            for expert_idx, weight in zip(top_experts[b, n], weights[b, n]):
                expert = params.ff_weights[expert_idx]
                out[b, n] += weight * feed_forward_relu(x[b, n], expert.Wh, expert.Wo)

    return out

Calculating the experts themselves is not vectorized here - it is done token by token. MoE is inherently sparse: different tokens in the same sequence (and batch) may go through different sets of experts. Vectorizing this efficiently is tricky in general and depends on the HW we run the model on [2]. For a popular approach on GPUs, see the MegaBlocks paper from 2022. This remains an active area of research.

All that's left are some helper functions:

def topk_lastdim(x, k):
    """Get the top k elements and their indices.

    x is an arbitrary array with at least two dimensions. The returned
    arrays have the same shape as x except that the last dimension has
    size k; they hold the top k elements across the last dimension of x
    and their indices, respectively.
    """
    idx = np.argpartition(x, -k, axis=-1)[..., -k:]
    return np.take_along_axis(x, idx, axis=-1), idx

def softmax_lastdim(x):
    """Compute softmax across last dimension of x.

    x is an arbitrary array with at least two dimensions. The returned array has
    the same shape as x, but its elements sum up to 1 across the last dimension.
    """
    # Subtract the max for numerical stability
    ex = np.exp(x - np.max(x, axis=-1, keepdims=True))
    # Divide by sums across last dimension
    return ex / np.sum(ex, axis=-1, keepdims=True)
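
To sanity-check the layer end to end, here's a quick usage sketch (mine, not part of the original post) with small random weights:

import numpy as np

B, N, D, DH, NEXP, TOPK = 2, 4, 8, 32, 8, 2
rng = np.random.default_rng(0)

params = MoEParams(
    D=D,
    DH=DH,
    NEXP=NEXP,
    TOPK=TOPK,
    ff_weights=[
        FFParams(Wh=rng.standard_normal((D, DH)), Wo=rng.standard_normal((DH, D)))
        for _ in range(NEXP)
    ],
    router_weights=rng.standard_normal((D, NEXP)),
)

x = rng.standard_normal((B, N, D))
out = moe(x, params)
assert out.shape == (B, N, D)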

Additional considerations

A major area of focus with MoE architectures is load balancing among experts. Without special provisions, the model may learn to prefer certain experts over others and this leads to inefficient utilization of the model's weights. There are various approaches to tackle this, for example:

  • Adding noise to the top-k selection process to inject randomness
  • Defining a special loss function during training that encourages experts to receive a roughly equal number of training samples (sketched below)
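
As a rough sketch of the second idea (my own code, loosely in the spirit of the Switch Transformer auxiliary loss, not something from the post), reusing the softmax_lastdim helper from above and assuming numpy is imported as np:

def load_balancing_loss(expert_scores, top_experts, nexp):
    """Auxiliary loss that is smallest when tokens are spread evenly across experts.

    expert_scores: (B, N, NEXP) router outputs; top_experts: (B, N, TOPK) indices.
    """
    probs = softmax_lastdim(expert_scores)            # full routing probabilities
    mean_prob = probs.reshape(-1, nexp).mean(axis=0)  # average probability per expert
    counts = np.bincount(top_experts.ravel(), minlength=nexp)
    token_frac = counts / counts.sum()                # fraction of assignments per expert
    return nexp * np.sum(token_frac * mean_prob)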

Code

The full code for this post is available on GitHub.


  [1] Another way to think about MoE is that each "expert" specializes in a certain area of the model's capability. For example, one expert would be good at math, another at prose, etc. This is a very rough approximation, though, because transformer models consist of dozens of repeating blocks, and all these different experts end up thoroughly intermixed as tokens flow through the entire model.

  [2] In the sparsely-gated mixture of experts paper, this is referred to as The Shrinking Batch Problem:

"In modern CPUs and GPUs, large batch sizes are necessary for computational efficiency, so as to amortize the overhead of parameter loads and updates. If the gating network chooses k out of n experts for each example, then for a batch of b examples, each expert receives a much smaller batch of approximately kb/n << b examples. This causes a naive MoE implementation to become very inefficient as the number of experts increases"

April 18, 2025 11:33 PM UTC


Trey Hunner

Which social network are we using for PyCon US this year?

Last year I updated my having a great first PyCon post to note that Mastodon would likely be more popular than Twitter at PyCon.

My guess was correct. During PyCon US 2024, Mastodon overtook Twitter for the most posts on the #PyConUS hashtag.

In the fall of 2024, Bluesky really took off. It currently seems like Bluesky is now a bit more popular than Mastodon for Python posts in general… but that doesn’t necessarily mean it will be the most popular social media network during the conference.

This year I’m guessing that Mastodon will still be the most popular network for #PyConUS posts, though I wouldn’t be very surprised if Bluesky took the lead instead.

What do the social networks think?

I decided to pose this question to the various social networks via a poll. I made sure to share these polls using both the #Python and #PyConUS hashtags for visibility’s sake.

The results?

In other words, LinkedIn leans more toward Bluesky being the leader than Mastodon, Twitter leans toward Bluesky, but Bluesky and Mastodon both lean toward Mastodon being the leader during PyCon.

I didn’t mention Threads or other platforms because they didn’t seem like real contenders given where the most active PyCon-attending Python-posting folks seem to hang out in 2025.

The actual results

Here are the actual results of the polls.

LinkedIn

Twitter

Bluesky

Mastodon

We’re still fragmented

Unfortunately, regardless of which network is the leader this year during PyCon US, we’re still going to be much more fragmented than we used to be. Twitter was the clear leader years ago and it very clearly isn’t anymore… at least not for PyCon US conference chatter.

I’ll be checking Mastodon and Bluesky and will post on at least Mastodon and possibly also Bluesky. I hope that other folks will also use one of these 2 social networks to organize dinners and gatherings! 🤞

Feel free to follow me on Mastodon and Bluesky during the conference.

Also let me know if you’d like to join my PyCon US starter pack on Bluesky. Lists on Mastodon require following and I prefer not to follow everyone I meet at PyCon so, unfortunately, I probably won’t have a Mastodon equivalent of this. 😢

I recommend checking the #PyConUS hashtag on both networks as well. Note that you can subscribe to hashtags on Mastodon which is pretty neat!

See you at PyCon!

If this will be your first PyCon US, I recommend signing up for both Mastodon and Bluesky and checking the #PyConUS hashtag during the conference.

Also, be sure to see my post on having a great first PyCon and these additional tips.

April 18, 2025 06:45 PM UTC


Ned Batchelder

Regex affordances

Python regexes have a number of features that bring new power to text manipulation. I’m not talking about fancy matching features like negative look-behinds, but ways you can construct and use regexes. As a demonstration, I’ll show you some real code from a real project.

Coverage.py will expand environment variables in values read from its configuration files. It does this with a function called substitute_variables:

def substitute_variables(
    text: str,
    variables: dict[str, str],
) -> str:
    """
    Substitute ``${VAR}`` variables in `text`.

    Variables in the text can take a number of
    shell-inspired forms::

        $VAR
        ${VAR}
        ${VAR?}         strict: an error if no VAR.
        ${VAR-miss}     defaulted: "miss" if no VAR.
        $$              just a dollar sign.

    `variables` is a dictionary of variable values.

    Returns the resulting text with values substituted.

    """

Call it with a string and a dictionary, and it makes the substitutions:

>>> substitute_variables(
...     text="Look: $FOO ${BAR-default} $$",
...     variables={'FOO': 'Xyzzy'},
... )

'Look: Xyzzy default $'

We use a regex to pick apart the text:

dollar_pattern = r"""(?x)   # Verbose regex syntax
    \$                      # A dollar sign,
    (?:                     # then
        (?P<dollar> \$ ) |      # a dollar sign, or
        (?P<word1> \w+ ) |      # a plain word, or
        \{                      # a {-wrapped
            (?P<word2> \w+ )        # word,
            (?:                         # either
                (?P<strict> \? ) |      # strict or
                -(?P<defval> [^}]* )    # defaulted
            )?                      # maybe
        }
    )
    """

This isn’t a super-fancy regex: it doesn’t use advanced pattern matching. But there are some useful regex features at work here: verbose syntax ((?x)), named groups ((?P<name>...)), and non-capturing groups ((?:...)).

The verbose syntax in particular makes it easier to understand the regex. Compare to what it would look like in one line:

r"\$(?:(?P<dollar>\$)|(?P<word1>\w+)|\{(?P<word2>\w+)(?:(?P<strict>\?)|-(?P<defval>[^}]*))?})"

Once we have the regex, we can use re.sub() to replace the variables with their values:

re.sub(dollar_pattern, dollar_replace, text)

But we’re going to use another power feature of Python regexes: dollar_replace here isn’t a string, it’s a function! Each fragment the regex matches will be passed as a match object to our dollar_replace function. It returns a string which re.sub() uses as the replacement in the text:

def dollar_replace(match: re.Match[str]) -> str:
    """Called for each $replacement."""
    # Get the one group that matched.
    groups = match.group('dollar', 'word1', 'word2')
    word = next(g for g in groups if g)

    if word == "$":
        return "$"
    elif word in variables:
        return variables[word]
    elif match["strict"]:
        msg = f"Variable {word} is undefined: {text!r}"
        raise NameError(msg)
    else:
        return match["defval"]

First we use match.group(). Called with a number of names, it returns a tuple of what those named groups matched. They could be the matched text, or None if the group didn’t match anything.

The way our regex is written, only one of those three groups will match, so the tuple will have one string and two None’s. To get the matched string, we use next() to find it. If the built-in any() returned the first true thing it found, this code could be simpler, but it doesn’t, so we have to do it this way.

Now we can check the value to decide on the replacement, as the if/elif chain in dollar_replace() shows.

The final piece of the implementation is to use re.sub() and return the result:

return re.sub(dollar_pattern, dollar_replace, text)
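
For instance, the strict form should fail loudly when the variable is missing. A quick illustration of my own, following the docstring and dollar_replace() above:

>>> substitute_variables(text="path: ${HOME?}", variables={})
Traceback (most recent call last):
  ...
NameError: Variable HOME is undefined: 'path: ${HOME?}'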

Regexes are often criticized for being too opaque and esoteric. But done right, they can be very powerful and don’t have to be a burden. What we’ve done here is used simple pattern matching paired with useful API features to compactly write a useful transformation.

BTW, if you are interested, the real code is in coverage.py.

April 18, 2025 03:19 PM UTC


Django Weblog

Django Admin Theme Roundup 2025

One of Django’s most appreciated features is the built-in admin functionality. In fact, it was ranked as the most useful contrib app in the 2023 Django developer survey.

A screenshot of the 2023 developer survey results showing the most used contrib apps. The Admin is first (77%), followed by auth (74%), and then postgres quite a long way back (47%).

With a few lines of code, Django automatically generates an administrative interface to add, update, and edit objects in your database. While it's not meant to replace a full-featured frontend, the admin makes rapid prototyping possible and provides a lot of functionality out of the box.

However, the admin’s focus is not on a flashy user interface and some people have found it to be a little plain – some have even called it ugly! But fortunately, like all Django applications, the admin’s CSS and HTML templates can be overridden and tweaked. Here are a few projects which have done that, and are recently updated as of early 2025.

Chime in on the Django forum thread here with your favorite Django admin theme or if I missed any other options!

Note that these packages are listed in the order of the “easiest” integration to the hardest. However, the later libraries also tend to provide more features.

Dracula

A dark (and light) theme for the Django Admin based on the very popular Dracula which has themes for 400+ applications. This library is a quick win to give the admin a bit of pizazz without requiring much setup or changing the default admin functionality.

A screenshot of the Dracula admin theme.

Django Daisy

Django Daisy is a responsive admin built with DaisyUI and TailwindCSS. Application icons can be added by utilizing Font Awesome. Very minimal (and completely optional!) configuration.

A Daisy UI theme screenshot

django-jazzmin

A drop-in theme for the Django admin that utilises AdminLTE 3.2 & Bootstrap 5. All of the configuration is optional which means the installation is very straight-forward. However, it also includes the ability to create custom menus, convert all pop-ups to modals, and a slick UI customizer. django-jazzmin also includes a wide selection of built-in themes.

A Django Jazzmin screenshot

django-admin-kubi

Kubi applies a Bootstrap 5 facelift to the Django admin, but also adds Sass support for custom styling and Font Awesome icons. It includes a sidebar menu for easy navigation and support for some third-party packages, including django-modeltranslation, django-modeltrans, django-import-export, django-two-factor-auth, and django-colorfield.

A gif of the django-admin-kubi theme

django-jet-reboot

Modern template for the Django admin interface with improved functionality. It provides the ability to create a custom dashboard and modules. Also includes user-selectable themes through the UI.

A django-jet-reboot screenshot

django-semantic-admin

A responsive Django admin theme based on Semantic UI. Includes JavaScript datepicker and timepicker components. Includes support for django_filter and django-import-export.

A django-semantic-admin screenshot

Simple UI

A modern theme based on vue + element-ui which comes with 28 different themes. The documentation is originally in Chinese, but there is a translation in English.

A Simple UI admin theme screenshot

Grappelli

Grappelli is a grid-based alternative to the Django admin which provides a few nifty features such as a custom TinyMCE integration, customizable dashboard, and inline sortables which can be updated by drag and drop.

A Grappelli admin theme screenshot

django-admin-interface

A modern responsive flat admin interface which comes with optional themes that can be installed for Bootstrap, Foundation, and U.S. Web Design Standards, and customizable by the admin itself. Other features include replacing admin pop-ups with modals, accordions in the navigation bar to collapse applications, sticky filters and buttons to prevent them from scrolling off the screen, and a language switcher. Also includes support for django-ckeditor, django-dynamic-raw-id, django-json-widget, django-modeltranslation, django-rangefilter, django-streamfield, django-tabbed-admin, and sorl-thumbnail.

A gif of the django-admin-interface theme

Unfold

Unfold transforms the Django admin and is built with TailwindCSS. It includes custom widgets, pages, and admin sites. Also provides a language selector, conditional fields, custom filters, tabs, and additional features for actions. There are a lot of available settings and it is extremely customizable.

An Unfold admin theme screenshot

April 18, 2025 02:19 PM UTC


Real Python

The Real Python Podcast – Episode #247: Exploring DuckDB & Comparing Python Expressions vs Statements

Are you looking for a fast database that can handle large datasets in Python? What's the difference between a Python expression and a statement? Christopher Trudeau is back on the show this week, bringing another batch of PyCoder's Weekly articles and projects.



April 18, 2025 12:00 PM UTC


Django Weblog

See you at PyCon US in Pittsburgh!

Django pony holo sticker

We’ll be at PyCon US 2025, and hope to see the Django community and all our Python friends there ❤️! We have been granted a community booth at the conference – come say hi in the Expo Hall during open hours. There may be Django stickers available to pick up!

Represent Django

For our Individual Members – if you’d like to help us showcase Django, we’re looking for help staffing the booth (members-only forum)! This is a great opportunity to give back to support our project – consider it!


David, Kátia, and Paolo smiling at the Django table at EuroPython, with Django resources on the table, laptops, and a Django banner featuring a pony design behind them.
David, Kátia and Paolo representing the Foundation at EuroPython 2024.
Credit: Paolo Melchiorre (CC-BY-SA)

April 18, 2025 07:07 AM UTC


Daniel Roy Greenfeld

TIL: webbrowser library

Python comes with webbrowser, a library for opening web browsers to specific pages, such as this one.

>>> import webbrowser
>>> url = 'https://daniel.feldroy.com/'
>>> webbrowser.open(url)

April 18, 2025 02:17 AM UTC

Using pyinstrument to profile FastHTML apps

FastHTML is built on Starlette, so we use Starlette's middleware tooling and then pass in the result. Just make sure you install pyinstrument.

WARNING: NOT FOR PRODUCTION ENVIRONMENTS. Including a profiler like this in a production environment is dangerous. Because it exposes details of your infrastructure, it is highly risky to include in any location where end users can access it.

"""WARNING: NOT FOR PRODUCTION ENVIRONMENTS"""
from fasthtml.common import *
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.middleware import Middleware
try:
    from pyinstrument import Profiler
except ImportError:
    raise ImportError('Please install pyinstrument')

class ProfileMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        profiling = request.query_params.get("profile", False)
        if profiling:
            profiler = Profiler()
            profiler.start()
            response = await call_next(request)            
            profiler.stop()        
            return HTMLResponse(profiler.output_html())
        return await call_next(request)

app, rt = fast_app(middleware=(Middleware(ProfileMiddleware),))  # trailing comma: middleware must be a sequence

@rt("/")
def get():
    return Titled("FastHTML", P("Hello, world!"))

serve()

To invoke it, make any request to your application with the GET parameter profile=1, and it will return pyinstrument's HTML report instead of the normal response.

April 18, 2025 02:17 AM UTC

April 17, 2025


Glyph Lefkowitz

Stop Writing `__init__` Methods

The History

Before dataclasses were added to Python in version 3.7 — in June of 2018 — the __init__ special method had an important use. If you had a class representing a data structure — for example a 2DCoordinate, with x and y attributes — you would want to be able to construct it as 2DCoordinate(x=1, y=2), which would require you to add an __init__ method with x and y parameters.

The other options available at the time all had pretty bad problems:

  1. You could remove 2DCoordinate from your public API and instead expose a make_2d_coordinate function and make it non-importable, but then how would you document your return or parameter types?
  2. You could document the x and y attributes and make the user assign each one themselves, but then 2DCoordinate() would return an invalid object.
  3. You could default your coordinates to 0 with class attributes, and while that would fix the problem with option 2, this would now require all 2DCoordinate objects to be not just mutable, but mutated at every call site.
  4. You could fix the problems with option 1 by adding a new abstract class that you could expose in your public API, but this would explode the complexity of every new public class, no matter how simple. To make matters worse, typing.Protocol didn’t even arrive until Python 3.8, so, in the pre-3.7 world this would condemn you to using concrete inheritance and declaring multiple classes even for the most basic data structure imaginable.

Also, an __init__ method that does nothing but assign a few attributes doesn’t have any significant problems, so it is an obvious choice in this case. Given all the problems that I just described with the alternatives, it makes sense that it became the obvious default choice, in most cases.

However, by accepting “define a custom __init__” as the default way to allow users to create your objects, we make a habit of beginning every class with a pile of arbitrary code that gets executed every time it is instantiated.

Wherever there is arbitrary code, there are arbitrary problems.

The Problems

Let’s consider a data structure more complex than one that simply holds a couple of attributes. We will create one that represents a reference to some I/O in the external world: a FileReader.

Of course Python has its own open-file object abstraction, but I will be ignoring that for the purposes of the example.

Let’s assume a world where we have the following functions, in an imaginary fileio module:

Our hypothetical fileio.open returns an integer representing a file descriptor [1], fileio.read allows us to read length bytes from an open file descriptor, and fileio.close closes that file descriptor, invalidating it for future use.
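
Based on that description, the imaginary module's signatures would look something like this (my sketch of the assumed API, not the post's code):

# fileio.py (hypothetical module, reconstructed from the description above)
def open(path: str) -> int: ...
def read(fd: int, length: int) -> bytes: ...
def close(fd: int) -> None: ...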

With the habit that we have built from writing thousands of __init__ methods, we might want to write our FileReader class like this:

class FileReader:
    def __init__(self, path: str) -> None:
        self._fd = fileio.open(path)
    def read(self, length: int) -> bytes:
        return fileio.read(self._fd, length)
    def close(self) -> None:
        fileio.close(self._fd)

For our initial use-case, this is fine. Client code creates a FileReader by doing something like FileReader("./config.json"), which always creates a FileReader that maintains its file descriptor int internally as private state. This is as it should be; we don’t want user code to see or mess with _fd, as that might violate FileReader’s invariants. All the necessary work to construct a valid FileReader — i.e. the call to open — is always taken care of for you by FileReader.__init__.

However, additional requirements will creep in, and as they do, FileReader.__init__ becomes increasingly awkward.

Initially we only care about fileio.open, but later, we may have to deal with a library that has its own reasons for managing the call to fileio.open by itself, and wants to give us an int that we use as our _fd, we now have to resort to weird workarounds like:

def reader_from_fd(fd: int) -> FileReader:
    fr = object.__new__(FileReader)
    fr._fd = fd
    return fr

Now, all those nice properties that we got from trying to force object construction to give us a valid object are gone. reader_from_fd’s type signature, which takes a plain int, has no way of even suggesting to client code how to ensure that it has passed in the right kind of int.

Testing is much more of a hassle, because we have to patch in our own copy of fileio.open any time we want an instance of a FileReader in a test without doing any real-life file I/O, even if we could (for example) share a single file descriptor among many FileReader s for testing purposes.

All of this also assumes a fileio.open that is synchronous. Although for literal file I/O this is more of a hypothetical concern, there are many types of networked resource which are really only available via an asynchronous (and thus: potentially slow, potentially error-prone) API. If you’ve ever found yourself wanting to type async def __init__(self): ... then you have seen this limitation in practice.

Comprehensively describing all the possible problems with this approach would end up being a book-length treatise on a philosophy of object oriented design, so I will sum up by saying that the cause of all these problems is the same: we are inextricably linking the act of creating a data structure with whatever side-effects are most often associated with that data structure. If they are “often” associated with it, then by definition they are not “always” associated with it, and all the cases where they aren’t associated become unwieldy and potentially broken.

Defining an __init__ is an anti-pattern, and we need a replacement for it.

The Solutions

I believe this tripartite assemblage of design techniques will address the problems raised above:

Using dataclass attributes to create an __init__ for you

To begin, let’s refactor FileReader into a dataclass. This does get us an __init__ method, but it won’t be an arbitrary one we define ourselves; it will have the useful constraint enforced on it that it just assigns attributes.

@dataclass
class FileReader:
    _fd: int
    def read(self, length: int) -> bytes:
        return fileio.read(self._fd, length)
    def close(self) -> None:
        fileio.close(self._fd)

Except... oops. In fixing the problems that we created with our custom __init__ that calls fileio.open, we have re-introduced several problems that it solved:

  1. We have removed all the convenience of FileReader("path"). Now the user needs to import the low-level fileio.open again, making the most common type of construction both more verbose and less discoverable; if we want users to know how to build a FileReader in a practical scenario, we will have to add something in our documentation to point at a separate module entirely.
  2. There’s no enforcement of the validity of _fd as a file descriptor; it’s just some integer, which the user could easily pass an incorrect instance of, with no error.

In isolation, dataclass by itself can’t solve all our problems, so let’s add in the second technique.

Using classmethod factories to create objects

We don’t want to require any additional imports, or require users to go looking at any other modules — or indeed anything other than FileReader itself — to figure out how to create a FileReader for its intended usage.

Luckily we have a tool that can easily address all of these concerns at once: @classmethod. Let’s define a FileReader.open class method:

from dataclasses import dataclass
from typing import Self
@dataclass
class FileReader:
    _fd: int
    @classmethod
    def open(cls, path: str) -> Self:
        return cls(fileio.open(path))

Now, your callers can replace FileReader("path") with FileReader.open("path"), and get all the same benefits.

Additionally, if we needed to await fileio.open(...), and thus we needed its signature to be @classmethod async def open, we are freed from the constraint of __init__ as a special method. There is nothing that would prevent a @classmethod from being async, or indeed, from having any other modification to its return value, such as returning a tuple of related values rather than just the object being constructed.
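
For instance, here is a sketch (mine, assuming a hypothetical awaitable fileio.open_async) of what such a factory could look like:

from dataclasses import dataclass

@dataclass
class AsyncFileReader:
    _fd: int

    @classmethod
    async def open(cls, path: str) -> "AsyncFileReader":
        # fileio.open_async is a made-up awaitable counterpart to fileio.open
        return cls(await fileio.open_async(path))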

Using NewType to address object validity

Next, let’s address the slightly trickier issue of enforcing object validity.

Our type signature calls this thing an int, and indeed, that is unfortunately what the lower-level fileio.open gives us, and that’s beyond our control. But for our own purposes, we can be more precise in our definitions, using NewType:

from typing import NewType
FileDescriptor = NewType("FileDescriptor", int)

There are a few different ways to address the underlying library, but for the sake of brevity and to illustrate that this can be done with zero run-time overhead, let’s just insist to Mypy that we have versions of fileio.open, fileio.read, and fileio.close which actually already take and return FileDescriptor integers rather than regular ones.

from typing import Callable
_open: Callable[[str], FileDescriptor] = fileio.open  # type:ignore[assignment]
_read: Callable[[FileDescriptor, int], bytes] = fileio.read
_close: Callable[[FileDescriptor], None] = fileio.close
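
A quick illustration (mine): Mypy now rejects a plain int where a FileDescriptor is expected, while constructing one remains a zero-cost cast:

fd = _open("./config.json")   # fd is a FileDescriptor
_read(fd, 16)                 # OK
_read(3, 16)                  # mypy error: "int" is not "FileDescriptor"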

We do of course have to slightly adjust FileReader, too, but the changes are very small. Putting it all together, we get:

from dataclasses import dataclass
from typing import Self
@dataclass
class FileReader:
    _fd: FileDescriptor
    @classmethod
    def open(cls, path: str) -> Self:
        return cls(_open(path))
    def read(self, length: int) -> bytes:
        return _read(self._fd, length)
    def close(self) -> None:
        _close(self._fd)

Note that the main technique here is not necessarily using NewType specifically, but rather aligning an instance’s property of “has all attributes set” as closely as possible with an instance’s property of “fully valid instance of its class”; NewType is just a handy tool to enforce any necessary constraints on the places where you need to use a primitive type like int, str or bytes.

In Summary - The New Best Practice

From now on, when you’re defining a new Python class:

If you define all your classes this way, you will get all the benefits of a custom __init__ method:

Along with some nice new benefits:

Before dataclasses, it was always a bit weird that such a basic feature of the Python language — giving data to a data structure to make it valid — required overriding a method with 4 underscores in its name. __init__ stuck out like a sore thumb. Other such methods like __add__ or even __repr__ were inherently customizing esoteric attributes of classes.

For many years now, that historical language wart has been resolved. @dataclass, @classmethod, and NewType give you everything you need to build classes which are convenient, idiomatic, flexible, testable, and robust.


Acknowledgments

Thank you to my patrons who are supporting my writing on this blog. If you like what you’ve read here and you’d like to read more of it, or you’d like to support my various open-source endeavors, you can support my work as a sponsor! I am also available for consulting work if you think your organization could benefit from expertise on topics like “but what is a ‘class’, really?”.


  1. If you aren’t already familiar, a “file descriptor” is an integer which has meaning only within your program; you tell the operating system to open a file, it says “I have opened file 7 for you”, and then whenever you refer to “7” it is that file, until you close(7)

  2. Or an attrs class, if you’re nasty. 

  3. Unless you have a really good reason to, of course. Backwards compatibility, or compatibility with another library, might be good reasons to do that. Or certain types of data-consistency validation which cannot be expressed within the type system. The most common example of these would be a class that requires consistency between two different fields, such as a “range” object where start must always be less than end. There are always exceptions to these types of rules. Still, it’s pretty much never a good idea to do any I/O in __init__, and nearly all of the remaining stuff that may sometimes be a good idea in edge-cases can be achieved with a __post_init__ rather than writing a literal __init__

April 17, 2025 10:35 PM UTC


Everyday Superpowers

Preventing painful coupling

This is the second entry in a five-part series about event sourcing:

  1. Why I Finally Embraced Event Sourcing—And Why You Should Too
  2. What is event sourcing and why you should care
  3. Preventing painful coupling (this page)
  4. Event-driven microservice in a monolith
  5. Get started with event sourcing today

In the first article, I introduced this series with the spark that made me so happy to finally try event sourcing.

In the last article, I showed you what event sourcing is.

In this article, I will show you how and why I’ve completely changed the way I code.

Editing software is hard

Have you ever worked on a project where adding a simple feature felt like defusing a bomb—one wrong move, and the whole system might break? You’re not alone. In 1979, the industry was reeling from how hard it was to modify software.

I came across an article from the Association for Computing Machinery 1979 that talked about a crisis in the software community: how expensive it was to modify existing software:

quote
Another component of the software crises is less commonly recognized, but, in fact, is often more costly… namely, the extreme difficulty encountered in attempting to modify an existing program… The cost of such evolution is almost never measured, but, in at least one case, it exceeded the original development cost by a factor of 100.
attribution
Terry Winograd, Beyond Programming Languages, Communications of the ACM, July 1979 Volume 22 Number 7

Could you imagine asking a team of developers to adjust a program and having to pay 100x what you paid them to write it in the first place?

I’ve been on projects where I can start to see how this happened, but it’s crazy this is where our industry came from.

This was untenable, and many smart people discussed how to improve software writing. A lot of progress was made in the 1980s and 1990s.

One of the main discoveries was that unwanted coupling made software more complex and more challenging to refactor. This was reflected in several pieces of the time, including Martin Fowler’s curation of refactoring methods in his seminal book published in 1999 and Robert Martin's curation of the SOLID principles in the early 2000s.

Despite these discoveries, the challenges of modifying software haven’t disappeared. Instead, they’ve taken on new forms, often exacerbated by the industry's rapid growth.

In the 2000s, the number of developers grew explosively, and the people who knew how to teach these concepts became a tiny fraction of the population. As a result, most developers have only heard of these techniques and patterns, like SOLID, but few truly understand or practice them (for example, I've noted a trend of programming resumes announcing they are skilled in "OOPS").

Most developers are not familiar with or practiced in controlling complexity or refactoring to reduce unwanted coupling.

A real example

I imagine the 100x penalty for adding features to projects is a thing of the past, but I know of a project that was in trouble in a similar way.

This team struggled with slow development cycles because every feature required each developer to make changes across multiple layers, including database schema, API, UI state, and frontend components. Merge conflicts were constant, and progress slowed to a crawl. In this case, they ultimately found success by reducing complexity and shifting their React front end to HTMX.

But switching frameworks isn’t always an option. A deeper change is needed: a way to structure software that naturally reduces coupling.

One solution is to continue to educate people about refactoring, software patterns, and good habits—and define times to practice these new skills. I've been impressed with Emily Bache’s Samman Technical Coaching Society approach, which truly gives developers the space to learn and practice these skills on their codebase as a team.

I believe the Samman Society and other like-minded technical coaches are making an impact in the industry. But they can only work with so many teams. What if we could fundamentally change how we structure our applications to reduce this complexity at its source?

In the last few months, I’ve been convinced that there’s another option: changing how we write apps to reduce coupling.

What is coupling?

The excellent book Cosmic Python (aka Architecture Patterns in Python) explains coupling in this way:

quote
In a large-scale system, we become constrained by the decisions made elsewhere in the system. When we’re unable to change component A for fear of breaking component B, we say that the components have become coupled. Locally, coupling is a good thing: it’s a sign that our code is working together, each component supporting the others, all of them fitting in place like the gears of a watch. In jargon, we say this works when there is high _cohesion_ between the coupled elements. Globally, coupling is a nuisance: it increases the risk and the cost of changing our code, sometimes to the point where we feel unable to make any changes at all. This is the problem with the Ball of Mud pattern: as the application grows, if we’re unable to prevent coupling between elements that have no cohesion, that coupling increases superlinearly until we are no longer able to effectively change our systems.
attribution
Cosmic Python

Unwanted coupling is easy to accidentally introduce into your application. Most of the time, we couple things together because it seems like a good idea at the time. We don’t know if it’s unwanted until some time in the future.

One way to tell if your code has unwanted coupling is to test it. If the test case is easy to make, you probably have a healthy level of coupling. Testing is the canary in the coal mine for coupling.

Before I understood how to deal with coupling, I watched it take over on one project, where a function grew day after day, as we added new features to it, trying to meet a deadline. It started out processing some data. Then, we needed to add a feature to notify people when it finished. That was followed by some additional processing requirements and API calls to augment data. We also decided to add code to notify us if something failed in some of these steps.

What started out as a relatively simple function turned into over 100 lines of code with several different workflows braided together. I could tell something was off by the number of parameters we had and how hard it was to test, but I had no idea how to unwind it.

How to reduce coupling

So how do we reduce unwanted coupling? I mentioned above that one way is to learn how to make great software, how to refactor it, and practice it many times outside of normal project work.

In the last year, I’ve come across articles, podcasts, and videos that have convinced me to change the way I work to help keep coupling low.

There are three key practices that work together to keep complexity under control while enabling flexible and scalable solutions: event modeling, vertical slice architecture, and event sourcing.

Let’s look at these one at a time.

Event modeling

Event modeling is a documentation method that shows how data flows through a system and the domain constraints in play.

It might seem odd to start with a documentation method, but by getting the developers on the same page as the business stakeholders from the start, you eliminate many surprises and gain a few more benefits.

Event modeling grew from event storming, a documentation workshop technique that extracts complex business domains from stakeholders and transforms them into domain-driven design concepts that developers can use.

As a user of event storming, Adam Dymitruk noticed that business stakeholders were left behind during event storming sessions once the conversation became more about bounded contexts and other technical implementation details.

After many iterations, he unveiled event modeling in 2020. It’s a documentation method that removes much of the implementation detail. Instead, it highlights the most important thing: data, and how it enters, gets used by, and exits the system, as well as domain rules that affect it, all in a simple visual language with four patterns to master.

Additionally, the application’s lifecycle is segmented into individual slices, each representing one process step. Each slice maps to a slice in the vertical slice architecture that we’ll discuss below.

Below is an example event modeling diagram showing part of the lifecycle of a shopping cart application. The blue boxes are commands. They represent an intent to change the state of the system. This is where domain rules are enforced, and a command either generates an event or an exception.

An event modeling diagram showing a vertical slice of the shopping cart application.

Creating an event changes the state of the system. Other slices listen for events and populate views (the green boxes). These can be caches, database tables, CSV files, or anything else that formats data in a way that will be consumed.
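
To make the command-to-event flow concrete, here’s a minimal Python sketch of a single slice. The AddItem command, ItemAdded event, and the cart-size rule are hypothetical names I’m inventing for illustration; they aren’t part of the diagram above.

from dataclasses import dataclass

# Hypothetical command (a blue box): an intent to change the system.
@dataclass
class AddItem:
    cart_id: str
    sku: str
    quantity: int

# Hypothetical event: a record that the change actually happened.
@dataclass
class ItemAdded:
    cart_id: str
    sku: str
    quantity: int

MAX_CART_ITEMS = 20  # invented domain rule, purely for illustration

def handle_add_item(command: AddItem, current_item_count: int) -> ItemAdded:
    """Enforce the domain rule, then either produce an event or raise."""
    if current_item_count + command.quantity > MAX_CART_ITEMS:
        raise ValueError("cart is full")  # command rejected, no event emitted
    return ItemAdded(command.cart_id, command.sku, command.quantity)

A command handler like this is easy to test on its own, which is exactly the kind of low coupling we’re after.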

Having done a couple of event modeling diagrams, here’s what I like about them:

  • Being able to understand the domain more as the diagram comes together
  • Uncovering needs early in a project, instead of late in the process when you’re implementing the feature under time pressure
  • Making it easier for everyone—developers and stakeholders—to stay aligned

Vertical slice architecture

I’ve been conditioned by the majority of my projects and more senior teammates to expect a project to be organized with similar code grouped together: the views, services, and models sorted by type and placed into folders bearing those names.

Contrasting this practice, vertical slice architecture organizes code by feature. For example, if we’re working on a shopping cart application, a vertically sliced project would have folders with names like `add_item`, `checkout`, `remove_item`, and `list_items`.

In each of these folders, you would find the UI, data access code, and business logic related to those features.
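
As a rough sketch of what that can look like on disk (the folder and file names are invented for illustration, not a prescribed structure):

# Hypothetical layout for a vertically sliced shopping cart project:
#
#   add_item/      handler.py  views.py  tests.py
#   remove_item/   handler.py  views.py  tests.py
#   checkout/      handler.py  views.py  tests.py
#   list_items/    handler.py  views.py  tests.py

# checkout/handler.py -- everything the "checkout" feature needs, in one place.
def handle_checkout(cart_id):
    ...  # business logic, data access, and event publishing for this slice only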

I’ve found this to be a great way to work. Instead of hunting around the project or tracing through the code from the endpoint down into every layer that gets called, I can just open the folder for the feature and have a very good idea what file to open.

Adam Dymitruk’s group has seen great productivity increases from adopting vertical slice architecture. He’s mentioned having ten developers working independently on slices and not having a single merge conflict.

When all the code for a feature is in one folder, adding a feature to a “normal” project suddenly feels like performing shotgun surgery.

I like a lot about object-oriented programming, especially with an imperative shell and a functional core. With well-designed OOP software, you can trust that by editing one part of the code, you won’t introduce a bug in another part. It seems I’ve never worked on well-designed OOP code—or I’m just that good at finding the weak spots of our designs.

Putting it all together with event sourcing

With an event modeling diagram and the direction to implement a specific vertical slice, a developer can concentrate on what data is entering their slice (whether from the user, an external provider, a read model, or by subscribing to events), handle the business case, and then publish the events expected from their slice or populate a database table as needed.

In an event-sourced project with vertical slice architecture, each slice is isolated from the others, communicating through predefined events and views. It’s a really cool pattern.
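
Here’s a minimal sketch of a view-populating slice, reusing the hypothetical ItemAdded event from the earlier sketch; the in-memory dictionary stands in for whatever cache, table, or file the view actually uses:

# A read-model slice: it only listens for events and keeps its view current.
cart_items_view = {}  # stand-in for a cache, database table, CSV file, ...

def project_item_added(event):
    """Update the cart-items view whenever an ItemAdded event is published."""
    items = cart_items_view.setdefault(event.cart_id, [])
    items.append({"sku": event.sku, "quantity": event.quantity})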

Caveat

It's important to note that you can reduce coupling without event sourcing. By communicating well with event modeling diagrams and using an event-driven, vertically sliced architecture, you can get similar gains.

The three practices together have proved powerful for me. In the next article, I’ll show you in more detail how this works, while also getting the benefits of both an event-driven microservice and a monolith.


Read more...

April 17, 2025 04:38 PM UTC


PyCon

Community Organizer Summit and PSF & Meetups Discussion

Calling all community organizers! We want to sit down together and share what’s going well, what new tricks we’ve learned, and what’s been challenging in the area of organizing Python and Python adjacent communities. So if you’re attending PyCon US this year and you run a local Python meet-up, help organize a regional conference, or facilitate peer learning through workshops or hack nights, we hope you will join us at the Community Organizer Summit and the PSF & Meetups Discussion.

Community Organizer Summit: 10:30 am - 1:00 pm on Saturday, May 17th
PSF & Meetups Discussion: 1:00 pm - 2:00 pm on Sunday, May 18th

Saturday’s Community Organizer Summit kicks off with a keynote and then three short “Show & Tell” slots, for which the CFP is open now. After the more formal sessions, we’ll break into unconference discussions, so be sure to bring your ideas. If you’re not able to make it to Pittsburgh but you want to join the ongoing conversation about conference organizing, please consider joining the bi-monthly Conference Chats on Discord. Thanks so much to Mason Egger, Heather White and Kevin Horn for organizing the Summit!

Sunday’s PSF & Meetups Discussion is organized by the PSF board and is intended to kick off a more collaborative and purposeful era of Python community organizer knowledge sharing! Community organizer meetups have been happening casually at PyCon US and (more famously) at EuroPython for a few years now. We’re looking to make this year’s PyCon US session into a jumping off point for a global conversation that we hope to see happen regularly in lots of places around the world, as well as online. If you’ve got ideas or want to be involved in this conversation, please email community-organizing@pyfound.org

This is not just for experienced organizers! Looking to start a community and want to learn from a group of awesome people? Come join us!

April 17, 2025 10:13 AM UTC


Django Weblog

Run your tests against Django's main!

This is the blog version of a talk! If you prefer, watch the recording on YouTube:

Sage Abdullah presenting "Run your tests against Django’s main!" at the Django London Meetup, Thursday, 13 February 2025.


Django is known for its stability. The framework makes a strong commitment to API stability and forwards-compatibility, ensuring that developers can rely on it for building long-term, maintainable projects. A key aspect of this commitment involves extensive testing and structured releases—an area where testing by Django users can significantly enhance Django’s reliability. Here’s a closer look at how this works, and how you can contribute 🤝.

How Django stays stable

Django's stability is upheld through rigorous testing. As of Django 5.2, there are more than 18,000 tests run against all officially supported database backends, Python versions, and operating systems. Additionally, Django follows a well-structured deprecation policy, ensuring that public APIs are deprecated over at least two feature releases before being removed.

The feature release schedule is systematic and structured:

  1. Active development happens on the main branch.
  2. A stable branch (for example stable/5.2.x) is forked when an alpha release is made.
  3. After a month, the beta release follows, where only release-blocking bug fixes are allowed.
  4. A month later, a release candidate (RC) is published, marking the translation string freeze.
  5. If no critical bugs are found, the final release is published after a couple of weeks.

With this structured approach, Django ensures that releases are stable. However, bugs can and do occasionally slip through the cracks!

Catching issues early

The best time to catch issues is before they reach the final release. Ideally, potential bugs should be caught at the pull request stage, but keeping up with all changes is challenging. This is where the community can help—by running their tests with Django's main branch.

How you can help

You can set up your test suite to run with Django's main branch in your test pipeline. Here's an example using GitHub Actions, a popular Continuous Integration platform:

test:
    runs-on: ubuntu-latest
    continue-on-error: ${{ matrix.experimental }}
    strategy:
      matrix:
        include:
          - python: "3.13"
            django: "git+https://github.com/django/django.git@main#egg=Django"
            experimental: true
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python }}
      - run: pip install -r requirements.txt
      - if: ${{ matrix.experimental }}
        run: pip install "${{ matrix.django }}"
      - run: python -Wd manage.py test
  

If you maintain a Django package, you likely already test with multiple Django versions. Adding the main branch ensures that your project stays ahead of potential breaking changes.
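
If you use tox to test against multiple Django versions, a sketch along these lines adds a main-branch environment alongside a stable one (the environment names and version pin are illustrative, and the command assumes a manage.py-based test setup like the workflow above):

[tox]
envlist = py313-django{52,main}

[testenv]
deps =
    django52: Django>=5.2,<5.3
    djangomain: git+https://github.com/django/django.git@main#egg=Django
commands = python -Wd manage.py test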

Why this helps you

Running tests with Django main allows you to detect when changes in Django break your project. Sometimes, this happens due to the removal of internal APIs (that were never intended for reuse outside Django 🙈). If your tests fail, you can identify which commit caused the issue and adjust your code accordingly.

For example, the Wagtail CMS project recently caught an issue when an internal class, SubqueryConstraint, was removed from Django. This wasn't a bug in Django—it was the removal of an internal workaround that was no longer needed. If your project relies on internal APIs, testing against main is crucial to avoid surprises.

Why this helps Django

Testing with main doesn't just help your project—it helps Django too. Sometimes, your tests may fail due to legitimate regressions in Django that its test suite doesn't cover. Reporting these issues ensures they get fixed before the next release.

For example, just two days before Django 5.2 alpha was released, Wagtail tests on main helped detect a bug where calling .full_clean() on a child model in a multi-table inheritance setup triggered an unintended database query. This regression was promptly fixed, ensuring a smoother release for all users.

Take action: test against Django's main and report issues

By running your tests against Django's main branch and reporting any issues you find, you contribute to a more stable framework for everyone. It's a small step that makes a big impact.

So, take a few minutes today to update your automated test setup and help keep Django as reliable as ever!

April 17, 2025 09:00 AM UTC


EuroPython

Tickets, Sessions, & Call for 2026 Host Venues!

Hello, Pythonistas! 🐍

Tickets are on sale now! You can get your tickets at europython.eu/tickets

📣 Programme

Keynote Speakers

We are excited to announce three more keynote speakers!

Sebastián Ramírez 

Sebastián is the creator and maintainer of several widely used open-source Python libraries, including FastAPI, Typer, SQLModel, and Asyncer. His work focuses on tools for building web APIs, command-line interfaces, and working with data models in Python.

He has collaborated with teams and companies across Latin America, Europe, the Middle East, and the United States, working on systems involving APIs, distributed computing, and machine learning.

Currently, Sebastián works full-time on the development and maintenance of his open-source projects.


Brett Cannon 

Brett has been a Python core developer since 2003 and has contributed to just about every corner of the language. He’s one of the most prolific PEP authors, the creator of importlib, and played a major role in the Python 2 -> 3 transition and the creation of pyproject.toml.

He served on the inaugural Python Steering Council and remained on it for five years, guiding the language through some of its most critical transitions.

But beyond code, Brett is known for his love of the Python community — “I came for the language, but I stayed for the community”. He’s a former PSF board member and a recipient of the Frank Willison Award.


Petr Baudiš 

Petr Baudiš is a co-founder and CTO of Rossum, a SaaS deep tech scaleup that focuses on automating document-based B2B transactional communication. He co-invented Rossum’s unique AI engine and currently oversees Rossum’s R&D and product engineering. In the past, Petr led multiple AI research and deep learning projects in academia as well as in commercial environments. His work was used by companies like Google DeepMind, Novartis, Seznam.cz and countless others.

Petr’s past exploits include building one of the key predecessors of AlphaGo, working with Linus Torvalds as one of the founding developers of Git, and leading many other open-source projects.


Sessions

Our session list is live! 🎉 https://ep2025.europython.eu/sessions/ 

🎤 Speaker Mentorship

Accepted Proposals

We’re excited to share that 44 proposals were submitted by the mentees and 8 have been accepted! We’re incredibly proud of everyone who took the time to submit a proposal—your creativity, passion, and dedication are what make this community so special. Thank you for helping us shape an inspiring and diverse conference program. We can’t wait to see you in Prague! 🐍💛

First Time Speakers’ Workshop

Whether you’re preparing to present at EuroPython 2025 or any other conference, or your talk proposal wasn’t accepted this year, we warmly invite you to our First-Time Speakers’ Workshop. Join us online via Zoom on June 4, 2025, at 18:00 CEST to gain essential presentation skills, practical tips, and valuable insights from experienced speakers. This supportive session also aims to inspire you and strengthen your future conference proposals. You can find out more details here: https://ep2025.europython.eu/programme/mentorship/

Register now: https://forms.gle/T8rc73sbyu3KbLNKA

We look forward to supporting your speaking journey!

💰 Sponsorship

If you’re passionate about supporting EuroPython and helping us make the event accessible to all, consider becoming a sponsor or asking your employer to join us in this effort.

By sponsoring EuroPython, you’re not just backing an event – you’re gaining highly targeted visibility and the chance to present your company or personal brand to one of the largest and most diverse Python communities in Europe and beyond!

We offer a range of sponsorship tiers, some with limited slots available. Along with our main packages, there are optional add-ons and extras.

👉 More information at: https://ep2025.europython.eu/sponsorship/sponsor/ 

👉 Contact us at sponsors@europython.eu

EuroPython Society

🏰 Call for Venues - EuroPython 2026

Are you a community builder who dreams of bringing EuroPython to your city? The Call for Venues for EuroPython 2026 is now open!

If you want to propose a location on behalf of your community, please fill in the following form: https://forms.gle/ZGQA7WhTW4gc53MD6

📊 Board Report 

The EuroPython conference wouldn’t be what it is without the incredible volunteers who make it all happen. 💞 Behind the scenes, there’s also the EuroPython Society—a volunteer-led non-profit that manages the fiscal and legal aspects of running the conference, oversees its organization, and works on a few smaller projects like the grants programme. To keep everyone in the loop and promote transparency, the Board is sharing regular updates on what we’re working on.

The March board report is ready: https://www.europython-society.org/board-report-for-march/ 

💞 Community Outreach

Prague Python Meetup Pyvo 

A big thank you to Jakub Červinka for giving a lightning talk about EuroPython at the Prague Pyvo meetup!


PyCon US

We’re happy to share that EuroPython will have a booth at PyCon US this year! If you’re attending, come over to say hi—pick up some stickers, meet members of our community, and learn more about our initiatives and upcoming plans.


PyCon DE & PyData, DjangoCon Europe 

We’re proud to be supporting both events this year! You’ll find EuroPython stickers in the community area—stop by to pick some up and connect with fellow community members while you’re there.

Brno Python Pizza 

We recently supported Brno Python Pizza, which took place this February in Brno. The organizers shared a lovely write-up about the event, which you can read here: https://www.europython-society.org/brno-python-pizza-great-things-come-in-threes/ 

💞 Upcoming Events in the Python Community

🐣 See You All Next Month 

And in the meantime, follow us on social media: 

April 17, 2025 05:00 AM UTC

April 16, 2025


Real Python

How to Exit Loops Early With the Python Break Keyword

In Python, the break statement lets you exit a loop prematurely, transferring control to the code that follows the loop. This tutorial guides you through using break in both for and while loops. You’ll also briefly explore the continue keyword, which complements break by skipping the current loop iteration.

By the end of this tutorial, you’ll understand that:

  • A break in Python is a keyword that lets you exit a loop immediately, stopping further iterations.
  • Using break outside of loops doesn’t make sense because it’s specifically designed to exit loops early.
  • The break doesn’t exit all loops, only the innermost loop that contains it (see the quick sketch after this list).
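
As a quick sketch of that last point (not from the tutorial itself), the break below only leaves the inner loop, so the outer loop keeps going:

Python
>>> for row in range(3):
...     for col in range(3):
...         if col == 1:
...             break  # exits only the inner loop
...         print(row, col)
...
0 0
1 0
2 0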

To explore the use of break in Python, you’ll determine if a student needs tutoring based on the number of failed test scores. Then, you’ll print out a given number of test scores and calculate how many students failed at least one test.

You’ll also take a brief detour from this main scenario to examine how you can use break statements to accept and process user input, using a number-guessing game.

Get Your Code: Click here to download the free sample code that shows you how to exit loops early with the Python break keyword.

Take the Quiz: Test your knowledge with our interactive “How to Exit Loops Early With the Python Break Keyword” quiz. You’ll receive a score upon completion to help you track your learning progress:


Interactive Quiz

How to Exit Loops Early With the Python Break Keyword

In this quiz, you'll test your understanding of the Python break statement. This keyword allows you to exit a loop prematurely, transferring control to the code that follows the loop.

Introducing the break Statement

Before proceeding to the main examples, here’s a basic explanation of what the break statement is and what it does. It’s a Python keyword that, when used in a loop, immediately exits the loop and transfers control to the code that would normally run after the loop’s standard conclusion.

You can see the basics of the break statement in a simple example. The following code demonstrates a loop that prints numbers within a range until the next number is greater than 5:

Python
>>> for number in range(10):
...     if number > 5:
...         break
...     print(number)
... 
0
1
2
3
4
5

This short code example consists of a for loop that iterates through a range of numbers from 0 to 9. It prints out each number, but when the next number is greater than 5, a break statement terminates the loop early. So, this code will print the numbers from 0 to 5, and then the loop will end.

As break statements end loops early, it wouldn’t make sense for you to use them in any context that doesn’t involve a loop. In fact, Python will raise a SyntaxError if you try to use a break statement outside of a loop.

A key benefit of using break statements is that you can prevent unnecessary loop iterations by exiting early when appropriate. You’ll see this in action in the next section.

Breaking Out of a Loop With a Set Number of Iterations

Imagine you’re a teacher who evaluates the scores of your students. Based on the scores, you want to determine how many tests each student has failed. The following example demonstrates how you might accomplish this task using a for loop to iterate through the students’ test scores:

Python
>>> scores = [90, 30, 50, 70, 85, 35]

>>> num_failed_scores = 0
>>> failed_score = 60
>>> for score in scores:
...     if score < failed_score:
...         num_failed_scores += 1
...
>>> print(f"Number of failed tests: {num_failed_scores}")
Number of failed tests: 3

Read the full article at https://realpython.com/python-break/ »


[ Improve Your Python With 🐍 Python Tricks 💌 – Get a short & sweet Python Trick delivered to your inbox every couple of days. >> Click here to learn more and see examples ]

April 16, 2025 02:00 PM UTC


PyCharm

PyCharm 2025.1: Unified PyCharm, Free AI Tier, Junie Release, and More!

PyCharm 2025.1 brings major updates to improve your development experience.

PyCharm is now a unified product, combining PyCharm Professional and Community Edition. Version 2025.1 also brings a free AI tier, the public release of Junie, the launch of Cadence, significant Jupyter enhancements, support for Hatch, Data Wrangler, and many other improvements.

Get the latest version from our download page or update through our free Toolbox App.

Read this blog post to learn more about the updates. 

Prefer video? Get an overview of the major news and improvements in this video:

PyCharm is now one powerful, unified product!

PyCharm is now one powerful, unified product! Its core functionality, including Jupyter Notebook support, will be free, and a Pro subscription with additional features will be available. 

Starting with the 2025.1 release, every user will get instant access to a free one-month Pro trial, so you’ll be able to access all of PyCharm’s advanced features right away. After the trial, you can choose whether to continue with a Pro subscription or keep using the core features for free. Learn more about the change in this blog post.

Junie – your personal coding agent Pro

Junie, the coding agent by JetBrains, is now available in PyCharm via JetBrains AI. Junie autonomously plans, writes, refines, and tests code to make your development experience smooth, efficient, and enjoyable. It handles tedious tasks like restructuring code, creating tests, and implementing refinements, so you can focus on bigger challenges and innovation. 

PyCharm goes AI

JetBrains AI has received a major upgrade, bringing both AI Assistant and the coding agent Junie under a single subscription. With this release, all JetBrains AI features are accessible for free in PyCharm Pro, with unlimited use for some, such as code completion and local model support, and limited credit-based access to others. 


We’re also introducing a new subscription system that makes it easy to scale up as needed with the AI Pro and AI Ultimate tiers. Other highlights of this release include smarter completion, advanced context awareness, and support for Claude 3.7 Sonnet and Gemini 2.0 Flash. Head to the What’s New page to learn more about the latest AI features.

Cadence – effortless cloud execution for ML workflows Pro

We’re introducing Cadence. You can now run your machine learning code on powerful cloud hardware directly from PyCharm in minutes – no complex setup or cloud expertise is required. The Cadence plugin simplifies ML workflows, allowing you to focus on your code while leveraging scalable computing resources. 

Data Wrangler Pro

We’ve implemented Data Wrangler, a powerful tool to help Python data professionals streamline data manipulation and focus on higher-level analysis. Use the interactive UI to perform common dataframe transformations – like filtering, cleaning, handling outliers, and more – without writing repetitive code. 

You can view and explore column statistics, generate Python code for transformations automatically, track the history of changes, export data easily, and insert transformations as new cells in your notebook.

Data Wrangler in PyCharm

SQL cells in notebooks Pro

PyCharm 2025.1 introduces SQL cells. This new cell type allows you to query databases, dataframes, and attached CSV files in Jupyter notebooks and automatically save query results to pandas DataFrames.

SQL cells in PyCharm

We’ve also introduced many other improvements to enhance the Jupyter notebook experience. Learn more about them in the What’s New.

Support for Hatch

We’re introducing support for Hatch, a modern and extensible Python project manager from the Python Packaging Authority (PyPA). Hatch can automatically migrate setuptools configurations, create isolated environments, and run and publish builds, making Python package management more efficient. 

Hatch support in PyCharm

PyCharm also allows you to create new projects managed by Hatch. The IDE will automatically recognize Hatch projects when they are imported from a local machine or a remote source.

Looking for more?

We’d love to hear your feedback on PyCharm 2025.1 – leave your comments below or connect with us on X.

April 16, 2025 01:58 PM UTC

PyCharm, the Only Python IDE You Need

Estimated reading time: 3 minutes

One PyCharm for Everyone

PyCharm is now one powerful, unified product! Its core functionality, including Jupyter Notebook support, will be free, and a Pro subscription will be available with additional features. Starting with the 2025.1 release, every user will get instant access to a free one-month Pro trial, so you’ll be able to access all of PyCharm’s advanced features right away. After the trial, you can choose whether to continue with a Pro subscription or keep using the core features for free.

Previously, PyCharm was offered as two separate products: the free Community Edition and the Professional Edition with extended capabilities. Now, with a single streamlined product, you no longer need to choose. Everything is in one place, and you can seamlessly switch between core and advanced features within the same installation whenever you need to.

💡 What’s new?

✅ One product for all developers

You no longer need to worry about additional downloads or switching between editions. PyCharm is now a single product. Start with a month of full Pro access for free, and then keep using the core features at no cost. Upgrade to Pro anytime within the same installation.

🎓 Free Jupyter Notebook support

PyCharm now offers free Jupyter support, including running, debugging, output rendering, and intelligent code assistance in notebooks. It’s perfect for data workflows, no Pro subscription required. However, a Pro subscription does offer more advanced capabilities, including remote notebooks, dynamic tables, SQL cells, and others.

🚀 Seamless access to Pro

With every new major PyCharm release (currently three times a year), you will get instant access to a free one-month Pro trial. Once it ends, you can continue using the core features for free.

🛠️ One product, better quality

Focusing on a single PyCharm product will help us improve overall quality, streamline updates, and deliver new features faster.

What does it mean for me?

🐍 I’m a PyCharm Community Edition user

First of all, thank you for being part of our amazing community! Your feedback, passion, and contributions have helped shape PyCharm into the tool it is today.

Nothing is going to change for you right away – you can upgrade to PyCharm Community 2025.1 as usual. Alternatively, you may choose to manually switch to the new PyCharm immediately and keep using everything you have now for free, plus the support for Jupyter notebooks.

Starting with PyCharm 2025.2, we’ll offer a smooth migration path that preserves your current setup and preferences. PyCharm Community 2025.2 will be the final standalone version, and, from 2025.3 onward, all Community Edition users will transition to the unified PyCharm experience.

Rest assured – our commitment to open-source development remains as strong as ever. The Community Edition codebase will stay public on GitHub, and we’ll continue to maintain and update it. We’ll also provide an easy way to build PyCharm from source via GitHub Actions.

Have more questions about what’s next? Read our extended FAQ for more details.

👥 I’m a PyCharm Professional Edition user

Nothing changes! Your license will automatically work with the new single PyCharm product. Simply upgrade to PyCharm 2025.1 and continue enjoying everything Pro has to offer.

🆕 I’m new to PyCharm 

You can start right away with the new single PyCharm product. You’ll get a free one-month Pro trial with full functionality. After that, you can purchase a Pro subscription and keep using PyCharm with its full capabilities, or you can continue using just the core features – including Jupyter Notebook support – for free. Download PyCharm now.

April 16, 2025 12:11 PM UTC


Real Python

Quiz: How to Exit Loops Early With the Python Break Keyword

In this quiz, you’ll test your understanding of the Python break statement. This keyword allows you to exit a loop prematurely, transferring control to the code that follows the loop.


[ Improve Your Python With 🐍 Python Tricks 💌 – Get a short & sweet Python Trick delivered to your inbox every couple of days. >> Click here to learn more and see examples ]

April 16, 2025 12:00 PM UTC