efficient circular buffer?

PythonCircular Buffer

Python Problem Overview

I want to create an efficient circular buffer in python (with the goal of taking averages of the integer values in the buffer).

Is this an efficient way to use a list to collect values?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

What would be more efficient (and why)?

Python Solutions

Solution 1 - Python

I would use collections.deque with a maxlen arg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

There is a recipe in the docs for deque that is similar to what you want. My assertion that it's the most efficient rests entirely on the fact that it's implemented in C by an incredibly skilled crew that is in the habit of cranking out top notch code.

Solution 2 - Python

Although there are already a great number of great answers here, I could not find any direct comparison of timings for the options mentioned. Therefore, please find my humble attempt at a comparison below.

For testing purposes only, the class can switch between a list-based buffer, a collections.deque-based buffer, and a Numpy.roll-based buffer.

Note that the update method adds only one value at a time, to keep it simple.

import numpy
import timeit
import collections

class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

On my system this yields:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)

Solution 3 - Python

popping from the head of a list causes the whole list to be copied, so is inefficient

You should instead use a list/array of fixed size and an index which moves through the buffer as you add/remove items

Solution 4 - Python

Based on MoonCactus's answer, here is a circularlist class. The difference with his version is that here c[0] will always give the oldest-appended element, c[-1] the latest-appended element, c[-2] the penultimate... This is more natural for applications.

c = circularlist(4)
c.append(1); print(c, c[0], c[-1])    #[1] (1/4 items)              1  1
c.append(2); print(c, c[0], c[-1])    #[1, 2] (2/4 items)           1  2
c.append(3); print(c, c[0], c[-1])    #[1, 2, 3] (3/4 items)        1  3
c.append(8); print(c, c[0], c[-1])    #[1, 2, 3, 8] (4/4 items)     1  8
c.append(10); print(c, c[0], c[-1])   #[2, 3, 8, 10] (4/4 items)    2  10
c.append(11); print(c, c[0], c[-1])   #[3, 8, 10, 11] (4/4 items)   3  11
d = circularlist(4, [1, 2, 3, 4, 5])  #[2, 3, 4, 5]


class circularlist(object):
    def __init__(self, size, data = []):
        self.index = 0
        self.size = size
        self._data = list(data)[-size:]

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])

    def __repr__(self):
        """Return string representation"""
        return (self._data[self.index:] + self._data[:self.index]).__repr__() + ' (' + str(len(self._data))+'/{} items)'.format(self.size)

Solution 5 - Python

ok with the use of deque class, but for the requeriments of the question (average) this is my solution:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14

Solution 6 - Python

Solution 7 - Python

You can also see this quite old Python recipe.

Here is my own version with NumPy array:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)

        self.size = 0
    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1
        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
    def get_partial(self):

    def __getitem__(self, key):
        """get element"""

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value

Solution 8 - Python

How about the solution from the Python Cookbook, including a reclassification of the ring buffer instance when it becomes full?

class RingBuffer:
    """ class that implements a not-yet-full buffer """
    def __init__(self,size_max):
        self.max = size_max
        self.data = []

    class __Full:
        """ class that implements a full buffer """
        def append(self, x):
            """ Append an element overwriting the oldest one. """
            self.data[self.cur] = x
            self.cur = (self.cur+1) % self.max
        def get(self):
            """ return list of elements in correct order """
            return self.data[self.cur:]+self.data[:self.cur]

    def append(self,x):
        """append an element at the end of the buffer"""
        if len(self.data) == self.max:
            self.cur = 0
            # Permanently change self's class from non-full to full
            self.__class__ = self.__Full

    def get(self):
        """ Return a list of elements from the oldest to the newest. """
        return self.data

# sample usage
if __name__=='__main__':
    x.append(1); x.append(2); x.append(3); x.append(4)
    print(x.__class__, x.get())
    print(x.__class__, x.get())
    print(x.data, x.get())
    x.append(7); x.append(8); x.append(9); x.append(10)
    print(x.data, x.get())

> The notable design choice in the implementation is that, since these > objects undergo a nonreversible state transition at some point in > their lifetimes—from non-full buffer to full-buffer (and behavior > changes at that point)—I modeled that by changing self.__class__. > This works even in Python 2.2, as long as both classes have the same > slots (for example, it works fine for two classic classes, such as > RingBuffer and __Full in this recipe). > > Changing the class of an instance may be strange in many languages, > but it is a Pythonic alternative to other ways of representing > occasional, massive, irreversible, and discrete changes of state that > vastly affect behavior, as in this recipe. Good thing that Python > supports it for all kinds of classes.

Credit: Sébastien Keim

Solution 9 - Python

From Github:

class CircularBuffer:
    def __init__(self, size):
        """Store buffer in given storage."""
        self.buffer = [None]*size
        self.low = 0
        self.high = 0
        self.size = size
        self.count = 0

    def isEmpty(self):
        """Determines if buffer is empty."""
        return self.count == 0

    def isFull(self):
        """Determines if buffer is full."""
        return self.count == self.size
    def __len__(self):
        """Returns number of elements in buffer."""
        return self.count
    def add(self, value):
        """Adds value to buffer, overwrite as needed."""
        if self.isFull():
            self.low = (self.low+1) % self.size
            self.count += 1
        self.buffer[self.high] = value
        self.high = (self.high + 1) % self.size
    def remove(self):
        """Removes oldest value from non-empty buffer."""
        if self.count == 0:
            raise Exception ("Circular Buffer is empty");
        value = self.buffer[self.low]
        self.low = (self.low + 1) % self.size
        self.count -= 1
        return value
    def __iter__(self):
        """Return elements in the circular buffer in order using iterator."""
        idx = self.low
        num = self.count
        while num > 0:
            yield self.buffer[idx]
            idx = (idx + 1) % self.size
            num -= 1

    def __repr__(self):
        """String representation of circular buffer."""
        if self.isEmpty():
            return 'cb:[]'

        return 'cb:[' + ','.join(map(str,self)) + ']'


Solution 10 - Python

This one does not require any library. It grows a list and then cycle within by index.

The footprint is very small (no library), and it runs twice as fast as dequeue at least. This is good to compute moving averages indeed, but be aware that the items are not kept sorted by age as above.

class CircularBuffer(object):
	def __init__(self, size):
		self.index= 0
		self.size= size
		self._data = []

	def record(self, value):
		"""append an element"""
		if len(self._data) == self.size:
			self._data[self.index]= value
		self.index= (self.index + 1) % self.size

	def __getitem__(self, key):
		"""get element by index like a regular array"""
	def __repr__(self):
		"""return string representation"""
		return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

	def get_all(self):
		"""return a list of all the elements"""

To get the average value, e.g.:

q= CircularBuffer(1000000);
for i in range(40000):
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

Results in:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

This is about 1/3 the time of the equivalent with dequeue.

Solution 11 - Python

I've had this problem before doing serial programming. At the time just over a year ago, I couldn't find any efficient implementations either, so I ended up writing one as a C extension and it's also available on pypi under an MIT license. It's super basic, only handles buffers of 8-bit signed chars, but is of flexible length, so you can use Struct or something on top of it if you need something other than chars. I see now with a google search that there are several options these days though, so you might want to look at those too.

Solution 12 - Python

The original question was: "efficient" circular buffer. According to this efficiency asked for, the answer from aaronasterling seems to be definitively correct. Using a dedicated class programmed in Python and comparing time processing with collections.deque shows a x5.2 times acceleration with deque! Here is very simple code to test this:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
# called 200 times, this lasts 0.211 second on my laptop

To transform a deque into a list, just use:

my_list = [v for v in my_deque]

You will then get O(1) random access to the deque items. Of course, this is only valuable if you need to do many random accesses to the deque after having set it once.

Solution 13 - Python

This is applying the same principal to some buffers intended to hold the most recent text messages.

import time
import datetime
import sys, getopt

class textbffr(object):
	def __init__(self, size_max):
		self.posn_max = size_max-1
		self._data = [""]*(size_max)
		self.posn = self.posn_max

	def append(self, value):
		#append an element
		if self.posn == self.posn_max:
			self.posn = 0
			self._data[self.posn] = value	
			self.posn += 1
			self._data[self.posn] = value

	def __getitem__(self, key):
		#return stored element
		if (key + self.posn+1) > self.posn_max:
			return(self._data[key - (self.posn_max-self.posn)])
			return(self._data[key + self.posn+1])

def print_bffr(bffr,bffer_max): 
	for ind in range(0,bffer_max):
		stored = bffr[ind]
		if stored != "":
	print ( '\n' )
def make_time_text(time_value):
	return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
	  + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
	  + str(time_value.second).zfill(2))

def main(argv):
	#Set things up 
	starttime = datetime.datetime.now()
	log_max = 5
	status_max = 7
	log_bffr = textbffr(log_max)
	status_bffr = textbffr(status_max)
	scan_count = 1
	#Main Loop
	# every 10 secounds write a line with the time and the scan count.
	while True:	

		time_text = make_time_text(datetime.datetime.now())
		#create next messages and store in buffers
		status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
		log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")
		#print whole buffers so far
		scan_count += 1	
if __name__ == '__main__':

Solution 14 - Python

I don't get the answers here. Obviously if you're working within NumPy, you'd want to subclass either array or ndarray (usually), that way (at least once your cyclic array is full) you can still use the NumPy array arithmetic operations on the cyclical array. The only thing you have to be careful of is that for operations that span multiple components (such as a moving average), you don't have your window be larger than what has accumulated in the buffer.

Also, as all the commenters mentioned, don't use rolling as that defeats the purpose of efficiency. If you need a growing array, you simply double its size each time a resize is required (this is different from a cyclical array implementation).


All content for this solution is sourced from the original question on Stackoverflow.

The content on this page is licensed under the Attribution-ShareAlike 4.0 International (CC BY-SA 4.0) license.

Content TypeOriginal AuthorOriginal Content on Stackoverflow
QuestionjedierikbView Question on Stackoverflow
Solution 1 - PythonaaronasterlingView Answer on Stackoverflow
Solution 2 - PythondjvgView Answer on Stackoverflow
Solution 3 - PythonJohn La RooyView Answer on Stackoverflow
Solution 4 - PythonBasjView Answer on Stackoverflow
Solution 5 - PythonSmartElectronView Answer on Stackoverflow
Solution 6 - PythonOrvar KorvarView Answer on Stackoverflow
Solution 7 - PythonsclsView Answer on Stackoverflow
Solution 8 - Pythond8aninjaView Answer on Stackoverflow
Solution 9 - PythonIjaz AhmadView Answer on Stackoverflow
Solution 10 - PythonMoonCactusView Answer on Stackoverflow
Solution 11 - PythonsirlarkView Answer on Stackoverflow
Solution 12 - PythonSchmoukView Answer on Stackoverflow
Solution 13 - PythonDavid TorrensView Answer on Stackoverflow
Solution 14 - PythonDiagramChasingBluesView Answer on Stackoverflow