Home > memcached, perl, queue > Memcached based message queues

Memcached based message queues

February 26th, 2009

Distributed caching and message queues are basic building blocks of distributed computing.  Memcached is clearly the best of breed for caching.  For message queues however the choices are many and vary in features, reliability and ease of use.

I needed a simple and fast queue with only basic reliability (lost messages don’t cost much).   Memcached offers some special operations that make it possible to build queues… all you need is a couple counters. Here’s the basic idea:

Memcached queue design

First create head and tail keys to track the unread messages in the queue.   Then using the atomic incr() operation memcached provides,  increment the head and use that number as the key for a new message in the queue.  Increment the tail and use that the key of the message of the next message to read.  You can calculate the current queue size by subtracting the head and tail counters.

On the downside  you may get no messages when you call recv() if the cache is full.   Also, there is also no way of knowing what happens to a message once you recv() it. On the upside you can drop in a service like memcachedb for persistent storage.

There are similar queues built using memcached like sparrow.

Here is the perl implementation, should be easy to port to any language memcached provides clients for…

package MemQueue;

use strict;
use warnings;

use Cache::Memcached;
use constant PREFIX => "MEMQUEUE_";

sub new
{
    my $cn      = shift;
    my $name    = shift;
    die("queue name required") unless defined $name;

    my @servers   = @_;
    my $self      = {};
    $self->{name} = PREFIX().$name;
    $self->{head} = PREFIX().$name."_head";
    $self->{tail} = PREFIX().$name."_tail";
    $self->{memd} = new Cache::Memcached( servers => @servers );

    #create queue
    $self->{memd}->add( $self->{head}, 0 );
    $self->{memd}->add( $self->{tail}, 0 );

    return bless($self, $cn);
}

sub send
{
    my $self = shift;
    my $mess = shift;
    return unless defined $mess;

    #advance the head
    my $id = $self->{memd}->incr($self->{head});
    die("cache error") unless defined $id;

    $self->{memd}->set($self->{name}."$id", $mess);
}

sub recv
{
    my $self = shift;
    return "empty" unless $self->length() > 0;

    #advance the tail
    my $id = $self->{memd}->incr($self->{tail});

    die("cache error") unless defined $id;

    return $self->{memd}->get($self->{name}."$id");
}

sub length
{
    my $self = shift;

    my $v    = $self->{memd}->get_multi($self->{head},$self->{tail});

    return -1 unless defined $v->{$self->{head}} && defined $v->{$self->{tail}}
    && $v->{$self->{head}} >= $v->{$self->{tail}};

    return $v->{$self->{head}} - $v->{$self->{tail}};
}

1;

##############Example program####################
my $mq = new MemQueue("test2", "127.0.0.1:11211" );

my $c = 1000;
foreach my $i (0..$c){
    $mq->send("message $i"x100);
}

foreach my $i (0..$c){
    warn($mq->recv());
}

warn("len ".$mq->length());

jake memcached, perl, queue

Viewing 2 Comments

 
close Reblog this comment
blog comments powered by Disqus