Laravel Queues: Simplifying Queue Management with SingleStore

Guy Warner
8 min read

One of the reasons we decided to switch to SingleStore Cloud was that we could not only move off MySQL (AWS Aurora) but also retire the Redis cluster we were using for queues. Using SingleStore's rowstore table type, we can keep the jobs table in memory for faster reads.

We ran into a few issues using the out-of-the-box MySQL driver for the queues. Even though the jobs table is in memory, we were still experiencing table locks. This happens because the nodes lock different rows, but only one row is returned. To combat this, we first tried loading the jobs in random order, restricted to a created-time window of 5 minutes. This worked okay. Then Jack Ellis tweeted this:

Below is what we currently use in production to handle our queue. Most of it comes from Laravel's database queue driver; we pulled it all out in case we needed more modifications:

SingleStoreQueue.php

<?php

namespace App\Queue;

use App\Queue\Jobs\SingleStoreJob;
use App\Queue\Jobs\SingleStoreJobRecord;
use Illuminate\Contracts\Queue\ClearableQueue;
use Illuminate\Contracts\Queue\Queue as QueueContract;
use Illuminate\Queue\Queue;
use Illuminate\Support\Carbon;
use Illuminate\Support\Facades\Cache;

class SingleStoreQueue extends Queue implements QueueContract, ClearableQueue
{
    /**
     * The database connection instance.
     *
     * @var \Illuminate\Database\Connection
     */
    protected $database;

    /**
     * The database table that holds the jobs.
     *
     * @var string
     */
    protected $table;

    /**
     * The name of the default queue.
     *
     * @var string
     */
    protected $default;

    /**
     * The expiration time of a job.
     *
     * @var int|null
     */
    protected $retryAfter = 60;

    /**
     * Store the cache lock key.
     *
     * @var mixed
     */
    protected $lock;

    /**
     * Create a new database queue instance.
     *
     * @param  \Illuminate\Database\ConnectionInterface|\Illuminate\Database\Connection  $database
     * @param  string  $table
     * @param  string  $default
     * @param  int  $retryAfter
     * @param  bool  $dispatchAfterCommit
     *
     * @return void
     */
    public function __construct(
        $database,
        $table,
        $default = 'default',
        $retryAfter = 60,
        $dispatchAfterCommit = false
    ) {
        $this->table = $table;
        $this->default = $default;
        $this->database = $database;
        $this->retryAfter = $retryAfter;
        $this->dispatchAfterCommit = $dispatchAfterCommit;
    }

    /**
     * Get the size of the queue.
     *
     * @param  string|null  $queue
     *
     * @return int
     */
    public function size($queue = null)
    {
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->count();
    }

    /**
     * Push a new job onto the queue.
     *
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            null,
            function ($payload, $queue) {
                return $this->pushToDatabase($queue, $payload);
            }
        );
    }

    /**
     * Push a raw payload onto the queue.
     *
     * @param  string  $payload
     * @param  string|null  $queue
     * @param  array  $options
     *
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
    {
        return $this->pushToDatabase($queue, $payload);
    }

    /**
     * Push a new job onto the queue after (n) seconds.
     *
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed
     */
    public function later($delay, $job, $data = '', $queue = null)
    {
        return $this->enqueueUsing(
            $job,
            $this->createPayload($job, $this->getQueue($queue), $data),
            $queue,
            $delay,
            function ($payload, $queue, $delay) {
                return $this->pushToDatabase($queue, $payload, $delay);
            }
        );
    }

    /**
     * Push an array of jobs onto the queue.
     *
     * @param  array  $jobs
     * @param  mixed  $data
     * @param  string|null  $queue
     *
     * @return mixed
     */
    public function bulk($jobs, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);

        $now = $this->availableAt();

        return $this->database->table($this->table)->insert(collect((array) $jobs)->map(
            function ($job) use ($queue, $data, $now) {
                return $this->buildDatabaseRecord(
                    $queue,
                    $this->createPayload($job, $this->getQueue($queue), $data),
                    isset($job->delay) ? $this->availableAt($job->delay) : $now,
                );
            }
        )->all());
    }

    /**
     * Release a reserved job back onto the queue after (n) seconds.
     *
     * @param  string  $queue
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     * @param  int  $delay
     *
     * @return mixed
     */
    public function release($queue, $job, $delay)
    {
        return $this->pushToDatabase($queue, $job->payload, $delay, $job->attempts);
    }

    /**
     * Pop the next job off of the queue.
     *
     * @param  string|null  $queue
     *
     * @return \Illuminate\Contracts\Queue\Job|null
     *
     * @throws \Throwable
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        return $this->database->transaction(function () use ($queue) {
            if ($job = $this->getNextAvailableJob($queue)) {
                return $this->marshalJob($queue, $job);
            }
        });
    }

    /**
     * Delete a reserved job from the queue.
     *
     * @param  string  $queue
     * @param  string  $id
     *
     * @return void
     *
     * @throws \Throwable
     */
    public function deleteReserved($queue, $id)
    {
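        // Release the per-job cache lock taken in getNextAvailableJob(), if one is held.
        if ($this->lock) {
            $this->lock->release();
        }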
        $this->database->transaction(function () use ($id) {
            $this->database->table($this->table)->where('id', $id)->delete();
        });
    }

    /**
     * Delete a reserved job from the reserved queue and release it.
     *
     * @param  string  $queue
     * @param  \App\Queue\Jobs\SingleStoreJob  $job
     * @param  int  $delay
     *
     * @return void
     */
    public function deleteAndRelease($queue, $job, $delay)
    {
        // Release the per-job cache lock taken in getNextAvailableJob(), if one is held.
        if ($this->lock) {
            $this->lock->release();
        }
        $this->database->transaction(function () use ($queue, $job, $delay) {
            if ($this->database->table($this->table)->find($job->getJobId())) {
                $this->database->table($this->table)->where('id', $job->getJobId())->delete();
            }

            $this->release($queue, $job->getJobRecord(), $delay);
        });
    }

    /**
     * Delete all of the jobs from the queue.
     *
     * @param  string  $queue
     *
     * @return int
     */
    public function clear($queue)
    {
        return $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->delete();
    }

    /**
     * Get the queue or return the default.
     *
     * @param  string|null  $queue
     *
     * @return string
     */
    public function getQueue($queue)
    {
        return $queue ? $queue : $this->default;
    }

    /**
     * Get the underlying database instance.
     *
     * @return \Illuminate\Database\Connection
     */
    public function getDatabase()
    {
        return $this->database;
    }

    /**
     * Push a raw payload to the database with a given delay of (n) seconds.
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  \DateTimeInterface|\DateInterval|int  $delay
     * @param  int  $attempts
     *
     * @return mixed
     */
    protected function pushToDatabase($queue, $payload, $delay = 0, $attempts = 0)
    {
        return $this->database->table($this->table)->insertGetId($this->buildDatabaseRecord(
            $this->getQueue($queue),
            $payload,
            $this->availableAt($delay),
            $attempts
        ));
    }

    /**
     * Create an array to insert for the given job.
     *
     * @param  string|null  $queue
     * @param  string  $payload
     * @param  int  $availableAt
     * @param  int  $attempts
     *
     * @return array
     */
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
    {
        return [
            'queue' => $queue,
            'attempts' => $attempts,
            'reserved_at' => null,
            'available_at' => $availableAt,
            'created_at' => $this->currentTime(),
            'payload' => $payload,
        ];
    }

    /**
     * Get the next available job for the queue.
     *
     * @param  string|null  $queue
     *
     * @return  \App\Queue\Jobs\SingleStoreJobRecord|null
     */
    protected function getNextAvailableJob($queue)
    {
        $job = $this->database->table($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) {
                $this->isAvailable($query);
            })
            ->first();
        if (is_null($job)) {
            return null;
        }
        try {
            $this->lock = Cache::lock('queue:worker-'.$job->id, 10 * 60);
        } catch (\Throwable $th) {
            return null;
        }
        if ($this->lock->get()) {
            return new SingleStoreJobRecord((object) $job);
        }

        return null;
    }

    /**
     * Modify the query to check for available jobs.
     *
     * @param  \Illuminate\Database\Query\Builder  $query
     *
     * @return void
     */
    protected function isAvailable($query)
    {
        $query->where(function ($query) {
            $query->where('reserved', 0)
                ->where('available_at', '<=', $this->currentTime());
        });
    }

    /**
     * Modify the query to check for jobs that are reserved but have expired.
     *
     * Note: getNextAvailableJob() does not call this method, so it is unused as written.
     *
     * @param  \Illuminate\Database\Query\Builder  $query
     *
     * @return void
     */
    protected function isReservedButExpired($query)
    {
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

        $query->orWhere(function ($query) use ($expiration) {
            $query->where('reserved_at', '<=', $expiration);
        });
    }

    /**
     * Marshal the reserved job into a SingleStoreJob instance.
     *
     * @param  string  $queue
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     *
     * @return \App\Queue\Jobs\SingleStoreJob
     */
    protected function marshalJob($queue, $job)
    {
        $job = $this->markJobAsReserved($job);

        return new SingleStoreJob(
            $this->container,
            $this,
            $job,
            $this->connectionName,
            $queue
        );
    }

    /**
     * Mark the given job ID as reserved.
     *
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     *
     * @return \App\Queue\Jobs\SingleStoreJobRecord
     */
    protected function markJobAsReserved($job)
    {
        $this->database->table($this->table)->where('id', $job->id)->update([
            'reserved' => 1,
            'reserved_at' => now()->getTimestamp(),
            'attempts' => $job->increment(),
        ]);
        return $job;
    }
}

SingleStoreJobRecord.php

<?php

namespace App\Queue\Jobs;

use Illuminate\Support\InteractsWithTime;

class SingleStoreJobRecord
{
    use InteractsWithTime;

    /**
     * The underlying job record.
     *
     * @var \stdClass
     */
    protected $record;

    /**
     * Create a new job record instance.
     *
     * @param  \stdClass  $record
     *
     * @return void
     */
    public function __construct($record)
    {
        $this->record = $record;
    }

    /**
     * Dynamically access the underlying job information.
     *
     * @param  string  $key
     *
     * @return mixed
     */
    public function __get($key)
    {
        return $this->record->{$key};
    }

    /**
     * Increment the number of times the job has been attempted.
     *
     * @return int
     */
    public function increment()
    {
        $this->record->attempts++;

        return $this->record->attempts;
    }

    /**
     * Update the "reserved at" timestamp of the job.
     *
     * @return int
     */
    public function touch()
    {
        $this->record->reserved_at = $this->currentTime();

        return $this->record->reserved_at;
    }
}

SingleStoreJob.php

<?php

namespace App\Queue\Jobs;

use App\Queue\SingleStoreQueue;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Job as JobContract;
use Illuminate\Queue\Jobs\Job;

class SingleStoreJob extends Job implements JobContract
{
    /**
     * The database queue instance.
     *
     * @var \App\Queue\SingleStoreQueue
     */
    protected $database;

    /**
     * The database job record.
     *
     * @var \App\Queue\Jobs\SingleStoreJobRecord
     */
    protected $job;

    /**
     * Create a new job instance.
     *
     * @param  \Illuminate\Container\Container  $container
     * @param  \App\Queue\SingleStoreQueue  $database
     * @param  \App\Queue\Jobs\SingleStoreJobRecord  $job
     * @param  string  $connectionName
     * @param  string  $queue
     *
     * @return void
     */
    public function __construct(Container $container, SingleStoreQueue $database, $job, $connectionName, $queue)
    {
        $this->job = $job;
        $this->queue = $queue;
        $this->database = $database;
        $this->container = $container;
        $this->connectionName = $connectionName;
    }

    /**
     * Release the job back into the queue after (n) seconds.
     *
     * @param  int  $delay
     *
     * @return void
     */
    public function release($delay = 0)
    {
        parent::release($delay);

        $this->database->deleteAndRelease($this->queue, $this, $delay);
    }

    /**
     * Delete the job from the queue.
     *
     * @return void
     */
    public function delete()
    {
        parent::delete();

        $this->database->deleteReserved($this->queue, $this->job->id);
    }

    /**
     * Get the number of times the job has been attempted.
     *
     * @return int
     */
    public function attempts()
    {
        return (int) $this->job->attempts;
    }

    /**
     * Get the job identifier.
     *
     * @return string
     */
    public function getJobId()
    {
        return $this->job->id;
    }

    /**
     * Get the raw body string for the job.
     *
     * @return string
     */
    public function getRawBody()
    {
        return $this->job->payload;
    }

    /**
     * Get the database job record.
     *
     * @return \App\Queue\Jobs\SingleStoreJobRecord
     */
    public function getJobRecord()
    {
        return $this->job;
    }
}
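
The three classes above still have to be registered with Laravel's queue manager before a connection can use them, and the listings do not show that step. Below is a minimal sketch of the wiring, modeled on how Laravel's own database connector is built. The names `SingleStoreConnector` and `SingleStoreQueueServiceProvider`, and the driver name `singlestore`, are assumptions made for illustration and do not come from the original code.

```php
<?php

// In a real app these would be two separate files; bracketed namespaces keep the sketch in one block.

namespace App\Queue {

    use Illuminate\Database\ConnectionResolverInterface;
    use Illuminate\Queue\Connectors\ConnectorInterface;

    // Hypothetical connector: builds a SingleStoreQueue from a config/queue.php connection entry.
    class SingleStoreConnector implements ConnectorInterface
    {
        /** @var \Illuminate\Database\ConnectionResolverInterface */
        protected $connections;

        public function __construct(ConnectionResolverInterface $connections)
        {
            $this->connections = $connections;
        }

        public function connect(array $config)
        {
            return new SingleStoreQueue(
                $this->connections->connection($config['connection'] ?? null),
                $config['table'],
                $config['queue'],
                $config['retry_after'] ?? 60,
                $config['after_commit'] ?? false
            );
        }
    }
}

namespace App\Providers {

    use App\Queue\SingleStoreConnector;
    use Illuminate\Support\ServiceProvider;

    // Hypothetical provider: registers the assumed "singlestore" driver name with the queue manager.
    class SingleStoreQueueServiceProvider extends ServiceProvider
    {
        public function boot()
        {
            $this->app['queue']->addConnector('singlestore', function () {
                return new SingleStoreConnector($this->app['db']);
            });
        }
    }
}
```

With something like this in place, a connection in `config/queue.php` whose `driver` is `singlestore` and that defines `table`, `queue`, and `retry_after` would resolve to `SingleStoreQueue`.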

jobs table

CREATE ROWSTORE TABLE `jobs` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `queue` varchar(255) CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL DEFAULT '',
  `payload` longtext CHARACTER SET utf8 COLLATE utf8_unicode_ci NOT NULL DEFAULT '',
  `attempts` tinyint(3) unsigned NOT NULL,
  `reserved` tinyint(3) unsigned NOT NULL DEFAULT 0,
  `reserved_at` int(10) unsigned DEFAULT NULL,
  `available_at` int(10) unsigned NOT NULL,
  `created_at` int(10) unsigned NOT NULL,
  KEY `jobs_queue_reserved_reserved_at_index` (`queue`,`reserved`,`reserved_at`),
  SHARD KEY `id` (`id`)
) AUTO_INCREMENT=140000000 AUTOSTATS_CARDINALITY_MODE=PERIODIC AUTOSTATS_HISTOGRAM_MODE=CREATE SQL_MODE='STRICT_ALL_TABLES';

cache_locks table

CREATE TABLE `cache_locks` (
  `key` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
  `owner` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
  `expiration` int(11) NOT NULL,
  UNIQUE KEY `PRIMARY` (`key`) USING HASH,
  SHARD KEY `__SHARDKEY` (`key`),
  KEY `expiration` (`expiration`) USING CLUSTERED COLUMNSTORE
) AUTOSTATS_CARDINALITY_MODE=INCREMENTAL AUTOSTATS_HISTOGRAM_MODE=CREATE AUTOSTATS_SAMPLING=ON SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_ALL_TABLES,NO_AUTO_CREATE_USER';
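
To see why the per-job lock in `getNextAvailableJob()` keeps two workers from running the same job, here is a small framework-free simulation. `InMemoryLockStore` and `claimJob` are hypothetical stand-ins for `Cache::lock` backed by a table like `cache_locks`. They only mimic the key, owner, and expiration idea and are not Laravel's implementation.

```php
<?php

// Hypothetical in-memory stand-in for a cache_locks-style table (key, owner, expiration).
final class InMemoryLockStore
{
    /** @var array<string, array{owner: string, expiration: int}> */
    private array $locks = [];

    // Take the lock only if the key is free or the existing lock has expired.
    public function acquire(string $key, string $owner, int $ttl, int $now): bool
    {
        $held = $this->locks[$key] ?? null;

        if ($held !== null && $held['expiration'] > $now) {
            return false;
        }

        $this->locks[$key] = ['owner' => $owner, 'expiration' => $now + $ttl];

        return true;
    }

    // Release the lock only if the caller still owns it.
    public function release(string $key, string $owner): bool
    {
        if (($this->locks[$key]['owner'] ?? null) !== $owner) {
            return false;
        }

        unset($this->locks[$key]);

        return true;
    }
}

// Each worker reads the same "first available" row, then races for a lock named after the job id.
function claimJob(InMemoryLockStore $store, int $jobId, string $worker, int $now): bool
{
    return $store->acquire('queue:worker-' . $jobId, $worker, 10 * 60, $now);
}

$store = new InMemoryLockStore();
$now = 1700000000; // fixed timestamp so the example is deterministic

$first = claimJob($store, 42, 'worker-a', $now);             // worker-a wins the race
$second = claimJob($store, 42, 'worker-b', $now);            // worker-b sees the lock and backs off
$afterExpiry = claimJob($store, 42, 'worker-b', $now + 601); // the 10-minute lock has expired
```

Here `$first` is `true`, `$second` is `false`, and `$afterExpiry` is `true`. In the queue class, the losing worker gets `null` back from `getNextAvailableJob()`, so `pop()` returns nothing and the worker simply tries again later instead of waiting on a row lock.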