Mastering Laravel job batching Pt. 4: Refine our custom batch system

Introduction

This article is the last one in the series Mastering Laravel batches.

In the previous article, we built a custom system designed to run a progressive batch safely and easily. It works fine, but what we're missing now is an abstraction that lets developers use it easily, without copy-pasting a lot of code.
There is also an issue with the code, and we'll start by tackling it.

In order to focus on problem-solving, this article will be less guided than the previous ones. Don't expect sections on how to test the code: that part is up to you. I strongly encourage you to do it, as it is the best way to learn new things: if you're able to test the code on your own, it means you understand it and you have the tools to do it.

The null problem

From time to time, null causes problems because it's not a value. Here, we have the opposite problem: null can be a valid value, and it is not processed as such. Look at the way a new job is added to the batch:

# app/Jobs/SimpleJob.php

class SimpleJob implements ShouldQueue
{
    public function handle(): void
    {
        // ...
        $batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
        $next_item  = $batch_queue->pop();
        if ($next_item) {
            $this->batch()->add(new static($next_item));
        }

        //...
    }
}

We see that we check that $next_item is truthy before adding a new job to the batch (so a falsy item such as 0, an empty string or false would stop the batch too). But if we inspect the pop() method, we see this:

# app/Models/BatchQueue.php
class BatchQueue
{
    // ...
    public function pop(): mixed
    {
        return $this->repository->getFirstItem($this->key);
    }
    // ...
}

The mixed in the signature means that null is considered valid data, which is right, but it is a first clue. How can we know whether the null received from this method is a null that should be treated as a value, or a null indicating that there are no items left in the queue? We don't know and we can't know. And with the SimpleJob code, receiving null means the end of the queue, so the batch will stop.
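
To make the ambiguity concrete, here is a minimal sketch that runs outside Laravel. NaiveQueue is a hypothetical in-memory stand-in for BatchQueue, assuming an empty queue is signalled by null; even a strict null check cannot tell the two cases apart:

```php
<?php

// Hypothetical in-memory stand-in for the batch queue: like the original
// pop(), it returns null once no item is left.
final class NaiveQueue
{
    /** @param list<mixed> $items */
    public function __construct(private array $items)
    {
    }

    public function pop(): mixed
    {
        // array_shift() returns null on an empty array
        return array_shift($this->items);
    }
}

// null is a legitimate job argument in this made-up data set.
$queue = new NaiveQueue(['first', null, 'last']);

$processed = [];
while (($item = $queue->pop()) !== null) {
    $processed[] = $item;
}

// The loop stopped at the null *value*: 'last' was never processed.
var_dump($processed);
```

Only 'first' ends up in $processed, although two more items were waiting in the queue.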
How can we solve this problem? First, the mixed return type is part of the problem. It doesn't express anything about what we should expect from pop(). Yes, it can return any type, because any type of value can be a valid parameter of a job, but then how can we know whether the queue is exhausted? We have two solutions here:

  • don't touch pop() but check that the queue has any item left before retrieving it

  • update the pop() method to have it return a more explicit type.

The first solution is the most commonly used, so we're going to go for the second one!
Having an explicit type when every type of value can be valid means that we need to wrap our value in an object responsible for telling us whether there is a value or not.
For this purpose, we'll use an Option type.

Install the package:

sail composer require phpoption/phpoption

Let's see how this works by using it. We'll start by updating the typing in the queue interface:

# app/Contracts/BatchQueueRepository.php
interface BatchQueueRepository
{
    //...
    public function getFirstItem(string $key): Option;
    //...
}

And review this method in the concrete class:

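# app/Repositories/RedisBatchQueueRepository.php

use Illuminate\Support\Facades\Redis;
use PhpOption\None;
use PhpOption\Option;
use PhpOption\Some;

class RedisBatchQueueRepository implements BatchQueueRepository
{
    public function getFirstItem(string $key): Option
    {
        // false means the list is empty: stored items are always serialized strings
        $serialized_item = Redis::lpop($key);
        $item = None::create();
        if ($serialized_item !== false) {
            $item = Some::create(unserialize($serialized_item));
        }
        return $item;
    }
}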

Redis::lpop will return false if nothing's left in the queue (this is the behaviour of the phpredis client; Predis returns null instead, so adapt the check if you use it). Because all items are serialized, there is no ambiguity between a false value stored in the queue and a false meaning that the queue is empty.
Now, using Option is simple: if there is no value, it will be a None object; otherwise, the value found will be wrapped in a Some object.
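
To see why the wrapper removes the ambiguity, here is a minimal sketch that runs without Redis or the package. MiniOption is a hypothetical, stripped-down stand-in exposing only the two methods used in this article (isDefined() and get()); the real PhpOption classes offer much more. The queue is a plain array of serialized items, which is an assumption made for the demo:

```php
<?php

// Hypothetical, stripped-down stand-in for PhpOption's Some/None.
final class MiniOption
{
    private function __construct(private bool $defined, private mixed $value)
    {
    }

    public static function some(mixed $value): self
    {
        return new self(true, $value);
    }

    public static function none(): self
    {
        return new self(false, null);
    }

    public function isDefined(): bool
    {
        return $this->defined;
    }

    public function get(): mixed
    {
        if (!$this->defined) {
            throw new RuntimeException('No value available.');
        }

        return $this->value;
    }
}

// In-memory queue storing serialized items, as the Redis repository does.
$stored = array_map('serialize', ['first', null, false, 'last']);

$pop = function () use (&$stored): MiniOption {
    $serialized = array_shift($stored); // null once the list is empty

    return $serialized === null
        ? MiniOption::none()
        : MiniOption::some(unserialize($serialized));
};

$processed = [];
for ($item = $pop(); $item->isDefined(); $item = $pop()) {
    $processed[] = $item->get();
}

var_dump($processed);
```

This time null and false travel through the queue like any other value, and the loop only stops when the queue is really empty.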
Let's update the typing of the pop() method:

# app/Models/BatchQueue.php
class BatchQueue
{
    // ...
    public function pop(): Option
    {
        return $this->repository->getFirstItem($this->key);
    }
    // ...
}

The final usage in the job is as simple as this:

# app/Jobs/SimpleJob.php

class SimpleJob implements ShouldQueue
{
    public function handle(): void
    {
        // ...
        $batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
        $next_item  = $batch_queue->pop();
        if ($next_item->isDefined()) {
            $this->batch()->add(new static($next_item->get()));
        }

        //...
    }
}

$next_item->isDefined() tells us whether a value is available, and $next_item->get() retrieves it.

Our problem is solved!
Is it really? Well, more or less. We have replaced a non-explicit type with an explicit one, but it's still a type, and an Option can itself be a valid value for a job... unless Option usage is restricted throughout the code to specific use cases or, even better, a dedicated type is created, like this:

class QueueItem extends Option{}

This is only a sketch: Option is an abstract class, so a real QueueItem has to implement its abstract methods (or come with its own Some/None pair), and the repository has to return it. It could lead to clearer code without ambiguity, such as:

# app/Models/BatchQueue.php
class BatchQueue
{
    // ...
    public function pop(): QueueItem
    {
        return $this->repository->getFirstItem($this->key);
    }
    // ...
}

But this is a lot of boilerplate for a simple task, and the solution of just checking the queue is simpler.
Either you have an obscure mixed type and something like this:

$batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
if ($batch_queue->isNotEmpty()) {
    $this->batch()->add(new static($batch_queue->pop()));
}

Or you have an explicit QueueItem type and this:

$batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
$next_item  = $batch_queue->pop();
if ($next_item->isDefined()) {
    $this->batch()->add(new static($next_item->get()));
}

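Or a solution between the two.
The choice is yours. You'll have to consider development speed, readability, maintainability and the onboarding of new developers to know which solution fits you best. Also keep in mind that, with several workers, checking the queue and then popping from it are two separate operations, so another worker may take the last item in between, whereas a single pop() returning an Option has no such window.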

An abstraction for a progressive batch job

Now, we're going to work on our SimpleJob class. The goal is to remove from it everything that is not related to its real work. This will lead to a job that is limited to its own work, and thus easy to write for any developer without thinking about the way it will be used by the application. In a few words, we need an abstraction that allows the developer to focus on the WHAT and not on the HOW.
Let's review our current SimpleJob:

# app/Jobs/SimpleJob.php
class SimpleJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * Create a new job instance.
     */
    public function __construct(public string $random_string)
    {
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        try {
            if (strlen($this->random_string) != 30) {
                throw new \Exception('Random string must be 30 characters long');
            }

            Log::info(sprintf('%s [%s] RAN', class_basename($this), $this->random_string));
        } catch (Throwable $e) {
            if ($this->batch()) {
                $args = [];
                $job_reflection = new ReflectionClass($this);
                $constructor = $job_reflection->getConstructor();
                foreach ($constructor->getParameters() as $param) {
                    $param_name = $param->getName();
                    $args[$param->getName()] = $this->$param_name;
                }

                event(new BatchJobFailed($this->batch(), [
                    'uuid' => $this->job->uuid(),
                    'error' => $e->getMessage(),
                    'class' =>  get_class($this),
                    'args' => $args,
                ]));
            }
            throw $e;
        } finally {
            if ($this->batch()) {
                $batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
                $next_item  = $batch_queue->pop();
                if ($next_item->isDefined()) {
                    $this->batch()->add(new static($next_item->get()));
                }
            }
        }
    }
}

We can easily isolate the WHAT in this class:

if (strlen($this->random_string) != 30) {
    throw new \Exception('Random string must be 30 characters long');
}
Log::info(sprintf('%s [%s] RAN', class_basename($this), $this->random_string));

This means that the vast majority of the code is only about how to run the job. It is useless from a business logic perspective and, worse, it needs to be duplicated each time a progressive batch job is required.
Let's remove that code and just keep what's needed:

# app/Jobs/SimpleJob.php

class SimpleJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(public string $random_string)
    {
    }

    public function handle(): void
    {
        if (strlen($this->random_string) != 30) {
            throw new \Exception('Random string must be 30 characters long');
        }

        Log::info(sprintf('%s [%s] RAN', class_basename($this), $this->random_string));
    }
}

Still, the use statement (and the ShouldQueue interface) will be duplicated, as it is the minimum required for every job, so let's get rid of it too:

# app/Jobs/SimpleJob.php

class SimpleJob
{
    public function __construct(public string $random_string)
    {
    }

    public function handle(): void
    {
        if (strlen($this->random_string) != 30) {
            throw new \Exception('Random string must be 30 characters long');
        }

        Log::info(sprintf('%s [%s] RAN', class_basename($this), $this->random_string));
    }
}

Now, we have a problem: what about the handle() method? How should we call the HOW?
First, we need to understand what the handle() method is. When a job is pulled from the queue to be processed, Laravel calls either its handle() or its __invoke() method to run it. This means that this method is mandatory if we want our job to run. Our desired process is too complex to simply run the job's work after a call to parent::handle(), so the abstraction will keep handle() for itself and call another method, implemented by the job:

# app/Jobs/SimpleJob.php

class SimpleJob
{
    //...

    protected function run(): void
    {
        // ...
    }
}

We made the method protected to prevent it from being called directly: doing so would run the job without most of the required code, so it would not behave as expected. Now, we have a clean and straight-to-the-point job: anybody reading the code is focused on WHAT the job is doing, not on HOW it's doing it.

# app/Jobs/SimpleJob.php

namespace App\Jobs;

use Illuminate\Support\Facades\Log;

class SimpleJob
{
    /**
     * Create a new job instance.
     */
    public function __construct(public string $random_string)
    {
        //
    }

    /**
     * Execute the job.
     */
    protected function run(): void
    {
        if (strlen($this->random_string) != 30) {
            throw new \Exception('Random string must be 30 characters long');
        }

        Log::info(sprintf('%s [%s] RAN', class_basename($this), $this->random_string));
    }
}

Now, let's build the skeleton of our abstraction: we know that we will need a handle() method and that we want to enforce the implementation of the run() method. This leads us to an abstract class:

# app/Core/ProgressiveBatchJob.php

abstract class ProgressiveBatchJob implements ShouldQueue
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    abstract protected function run(): void;

    public function handle(): void
    {
    }
}
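
The effect of combining an abstract protected run() with a public handle() can be checked in plain PHP. In this sketch, BaseJob and GreetingJob are hypothetical stand-ins without any Laravel dependency; it only illustrates the visibility rule, not the final abstraction:

```php
<?php

// Hypothetical stand-in for the abstraction, without any Laravel dependency.
abstract class BaseJob
{
    // Each concrete job must provide its own work.
    abstract protected function run(): void;

    // Entry point called by the worker.
    public function handle(): void
    {
        $this->run();
    }
}

final class GreetingJob extends BaseJob
{
    /** @var list<string> */
    public array $log = [];

    protected function run(): void
    {
        $this->log[] = 'ran';
    }
}

$job = new GreetingJob();
$job->handle(); // allowed: goes through the abstraction

$direct_call_blocked = false;
try {
    $job->run(); // protected: not callable from outside the class hierarchy
} catch (Error $e) {
    $direct_call_blocked = true;
}

var_dump($job->log, $direct_call_blocked);
```

The job's work ran exactly once, through handle(), and the direct call was rejected by PHP itself.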

To know what we need to implement, let's inspect the old handle():

public function handle(): void
{
    try {
        if (strlen($this->random_string) != 30) {
            throw new \Exception('Random string must be 30 characters long');
        }

        Log::info(sprintf('%s [%s] RAN', class_basename($this), $this->random_string));
    } catch (Throwable $e) {
        if ($this->batch()) {
            $args = [];
            $job_reflection = new ReflectionClass($this);
            $constructor = $job_reflection->getConstructor();
            foreach ($constructor->getParameters() as $param) {
                $param_name = $param->getName();
                $args[$param->getName()] = $this->$param_name;
            }

            event(new BatchJobFailed($this->batch(), [
                'uuid' => $this->job->uuid(),
                'error' => $e->getMessage(),
                'class' =>  get_class($this),
                'args' => $args,
            ]));
        }
        throw $e;
    } finally {
        if ($this->batch()) {
            $batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
            $next_item  = $batch_queue->pop();
            if ($next_item->isDefined()) {
                $this->batch()->add(new static($next_item->get()));
            }
        }
    }
}

We see that:

  • the code in the try part is the job's work and is held in the run() method of each job

  • catch and finally hold code needed exclusively when the job is running in a batch.

We effectively have two ways of processing the job: one as a classic job, another as part of a progressive batch. Let's write this down in the code:

# app/Core/ProgressiveBatchJob.php

abstract class ProgressiveBatchJob implements ShouldQueue
{
    // ...

    public function handle(): void
    {
        if ($this->batch()) {
            $this->runAsProgressiveBatchJob();
        } else {
            $this->runAsJob();
        }
    }

    protected function runAsJob(): void
    {
    }

    protected function runAsProgressiveBatchJob(): void
    {
    }
}

Clear and simple.
As a classic job, all that is required is to run the job:

# app/Core/ProgressiveBatchJob.php

abstract class ProgressiveBatchJob implements ShouldQueue
{
    // ...

    protected function runAsJob(): void
    {
        $this->run();
    }

    // ...
}

As a progressive batch job, we need to:

  • run the job

  • if job failed, send an event with the job data

  • add the next job to the batch if needed

And we'll write just that, nothing more, nothing less. This will be a clear implementation, which is still open to overriding if needed for specific cases:

# app/Core/ProgressiveBatchJob.php

abstract class ProgressiveBatchJob implements ShouldQueue
{
    // ...
    protected function runAsProgressiveBatchJob(): void
    {
        try {
            $this->run();
        } catch (Throwable $e) {
            $this->progressiveBatchJobFailed($e);
            throw $e;
        } finally {
            $this->addNextJobToBatch();
        }
    }

    protected function progressiveBatchJobFailed(Throwable $e): void
    {
        $args = [];
        $job_reflection = new ReflectionClass($this);
        $constructor = $job_reflection->getConstructor();
        foreach ($constructor->getParameters() as $param) {
            $param_name = $param->getName();
            $args[$param->getName()] = $this->$param_name;
        }

        event(new BatchJobFailed($this->batch(), [
            'uuid' => $this->job->uuid(),
            'error' => $e->getMessage(),
            'class' =>  get_class($this),
            'args' => $args,
        ]));
    }

    protected function addNextJobToBatch(): void
    {
        $batch_queue = new BatchQueue($this->batch(), new RedisBatchQueueRepository());
        $next_item  = $batch_queue->pop();
        if ($next_item->isDefined()) {
            $this->batch()->add(new static($next_item->get()));
        }
    }
}
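
The key property of runAsProgressiveBatchJob() is that finally runs whether run() succeeds or throws, so the next job is added either way, while the exception still reaches the worker. A minimal sketch of that control flow in plain PHP, where runStep() and the trace labels are hypothetical:

```php
<?php

// Hypothetical helper mirroring the control flow of runAsProgressiveBatchJob().
function runStep(callable $work, array &$trace): void
{
    try {
        $work();
        $trace[] = 'ran';
    } catch (Throwable $e) {
        $trace[] = 'failure reported';
        throw $e; // let the queue worker mark the job as failed
    } finally {
        $trace[] = 'next job added'; // runs on success and on failure
    }
}

$trace = [];

// A job that succeeds
runStep(fn () => null, $trace);

// A job that fails
try {
    runStep(fn () => throw new RuntimeException('boom'), $trace);
} catch (RuntimeException $e) {
    $trace[] = 'exception reached the worker';
}

print_r($trace);
```

For the failing job, the finally block runs after the catch block and before the exception leaves the function, which is why a failed job does not stop the batch from progressing.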

And that's it, our abstraction for the job is done! The only thing left for SimpleJob is to extend ProgressiveBatchJob, which brings back the ShouldQueue interface and the traits we removed earlier.
Besides the code cleaning that abstraction brings, what's important is the way we were able to express what we want through sentences and words which are not code. This always leads to clear and readable code with "motivated" parts: parts which make sense from a user's or a reader's perspective, not only the writer's. Anybody can understand what addNextJobToBatch means, and anybody can understand that there are two ways of processing the job when reading the handle() method, without having to deal with the implementation details. Just compare this new code with the old one: how much time does it take you to grasp what's going on in each?
Abstraction is not only about avoiding duplicated code and improving code reusability. It's also about making the code more readable and understandable. And readable and understandable code is easier to maintain.

An abstraction for a progressive batch

For the progressive batch abstraction, the goal is to extract most of the code we wrote in the command and make it available in a class that other developers can use easily.

Create a batch

Let's start with the batch creation. Inspecting the current dispatch code in the command will give us clues:

Bus::batch([new SimpleJob($first_data)])
    ->before(function (Batch $batch) use ($data) {
        event(new BatchStarted($batch));

        $batch_queue = new BatchQueue($batch, new RedisBatchQueueRepository());
        $batch_queue->data = $data;
        $batch_queue->create();
    })
    ->progress(function (Batch $batch) use ($total_jobs) {
        $nb_processed_jobs = $batch->processedJobs() + $batch->failedJobs;
        $previous_progress = $total_jobs > 0 ? round((($nb_processed_jobs - 1) / $total_jobs) * 100) : 0;
        $progress =  $total_jobs > 0 ? round(($nb_processed_jobs / $total_jobs) * 100) : 0;

        if (floor($previous_progress / 10) != floor($progress / 10)) {
            event(new BatchProgressed($batch, [
                'progress' => (int)(floor($progress / 10) * 10),
                'nb_jobs_processed' => $nb_processed_jobs,
                'nb_jobs_failed' => $batch->failedJobs,
                'nb_jobs_total' =>  $total_jobs,
            ]));
        }
    })
    ->finally(function (Batch $batch) {
        event(new BatchEnded($batch));
    })
    ->allowFailures()
    ->dispatch();
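
The arithmetic in the progress() callback is easier to follow in isolation. The sketch below extracts it into a hypothetical progressStep() function (not part of the article's code base) which returns the 10% step to announce, or null when no boundary was crossed:

```php
<?php

// Hypothetical helper isolating the arithmetic of the progress() callback:
// returns the 10% step to announce, or null when no boundary was crossed.
function progressStep(int $nb_processed_jobs, int $total_jobs): ?int
{
    if ($total_jobs <= 0) {
        return null; // progress stays at 0, nothing to announce
    }

    $previous_progress = round((($nb_processed_jobs - 1) / $total_jobs) * 100);
    $progress = round(($nb_processed_jobs / $total_jobs) * 100);

    // Same tens digit before and after this job: no new step reached
    if (floor($previous_progress / 10) == floor($progress / 10)) {
        return null;
    }

    return (int) (floor($progress / 10) * 10);
}

// With 4 jobs, every processed job crosses at least one boundary.
$steps = [];
foreach (range(1, 4) as $nb_processed_jobs) {
    $steps[] = progressStep($nb_processed_jobs, 4);
}

print_r($steps);
```

With 4 jobs the announced steps are 20, 50, 70 and 100. With 20 jobs, the first job moves the progress from 0% to 5% and announces nothing, while the second one reaches 10% and triggers an event.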

What the dispatch code shows is that a batch requires a job class and some data to be dispatched. progress() and finally() work on the current batch and don't need external input. Fine, our create() signature could be:

public function create(string $job_class, array $data): string

We provide a job class and an array of data. The batch's queue will be created from the array of data, the first job will be instantiated with the first item, then the other callbacks are defined, the batch is dispatched, its id is returned and voilà.

But this implementation is closed and doesn't allow any tweaking. In some use cases, tweaking is mandatory in order to have full control over what's happening and how it's happening. It could be the case here: overriding the before() callback will break the queue creation, but at the same time, leaving it open allows progressive batches to be backed by another queue system, provided the progressive batch job is updated accordingly, which its open design makes possible. Then there's the question of the $job_class parameter. Considering the implementation, a class name is the simplest argument we can have, but we have to validate it. We could have a typed parameter such as ProgressiveBatchJob $job, which would avoid the need for validation but push boilerplate onto the user side: they would have to take the first item from the data and build the job with it, which is always what should be done anyway. With all these considerations in mind, let's return the pending batch instead of dispatching it, and use this signature:

public function create(string $job_class, array $data): PendingBatch

and thus the implementation will be as follows:

# app/Core/ProgressiveBatch.php

class ProgressiveBatch
{
    public function create(string $job_class, array $data): PendingBatch
    {
        $this->validateJob($job_class);

        $total_jobs = count($data);
        $first_data = array_shift($data);

        return Bus::batch([new $job_class($first_data)])
            ->before(function (Batch $batch) use ($data) {
                event(new BatchStarted($batch));

                $batch_queue = new BatchQueue($batch, new RedisBatchQueueRepository());
                $batch_queue->data = $data;
                $batch_queue->create();
            })
            ->progress(function (Batch $batch) use ($total_jobs) {
                $nb_processed_jobs = $batch->processedJobs() + $batch->failedJobs;
                $previous_progress = $total_jobs > 0 ? round((($nb_processed_jobs - 1) / $total_jobs) * 100) : 0;
                $progress =  $total_jobs > 0 ? round(($nb_processed_jobs / $total_jobs) * 100) : 0;

                if (floor($previous_progress / 10) != floor($progress / 10)) {
                    event(new BatchProgressed($batch, [
                        'progress' => (int)(floor($progress / 10) * 10),
                        'nb_jobs_processed' => $nb_processed_jobs,
                        'nb_jobs_failed' => $batch->failedJobs,
                        'nb_jobs_total' =>  $total_jobs,
                    ]));
                }
            })
            ->finally(function (Batch $batch) {
                event(new BatchEnded($batch));
            })
            ->allowFailures();
    }

    private function validateJob(string $job_class): void
    {
        try {
            $job = new ReflectionClass($job_class);
        } catch (Throwable $e) {
            throw new InvalidArgumentException(sprintf('Job class [%s] does not exist.', $job_class));
        }
        throw_unless($job->isSubclassOf(ProgressiveBatchJob::class), new InvalidArgumentException(
            sprintf('Job class must extend %s. ', ProgressiveBatchJob::class)
        ));
    }
}

Note that calling dispatch() is left to the user.
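
The class validation can be tried outside Laravel. The sketch below is a variant, not the method above: it swaps ReflectionClass and throw_unless() for class_exists() and is_subclass_of(), and the three classes are hypothetical stand-ins declared only for the demo:

```php
<?php

// Hypothetical stand-ins so that the check can run outside Laravel.
abstract class ProgressiveBatchJob
{
}

final class ValidJob extends ProgressiveBatchJob
{
}

final class UnrelatedJob
{
}

function validateJob(string $job_class): void
{
    // class_exists() triggers autoloading before giving up on the class
    if (!class_exists($job_class)) {
        throw new InvalidArgumentException(sprintf('Job class [%s] does not exist.', $job_class));
    }

    // is_subclass_of() accepts a class name as its first argument
    if (!is_subclass_of($job_class, ProgressiveBatchJob::class)) {
        throw new InvalidArgumentException(sprintf('Job class must extend %s.', ProgressiveBatchJob::class));
    }
}

$results = [];
foreach ([ValidJob::class, UnrelatedJob::class, 'Missing\Job'] as $job_class) {
    try {
        validateJob($job_class);
        $results[$job_class] = 'ok';
    } catch (InvalidArgumentException $e) {
        $results[$job_class] = $e->getMessage();
    }
}

print_r($results);
```

Both versions reject a missing class and a class that does not extend the abstraction before anything is sent to the queue.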

Cancel a batch

Cancelling a batch is straightforward, except for the point where we fetch it:

$batch = Bus::findBatch($this->current_batch_id);

findBatch can return null, forcing us to check whether the retrieved batch is valid. Because we'll have this pattern multiple times throughout our ProgressiveBatch, we'll add some helpers to take care of it. This leaves us with this implementation:

# app/Core/ProgressiveBatch.php

class ProgressiveBatch
{
    // ...
    public function cancel(string $batch_id): void
    {
        $batch = $this->findOrFail($batch_id);
        $batch->cancel();

        // Clear the queue
        $batch_queue = new BatchQueue($batch, new RedisBatchQueueRepository());
        $batch_queue->delete();

        event(new BatchCancelled($batch));
    }

    public function find(string $batch_id): ?Batch
    {
        return Bus::findBatch($batch_id);
    }

    public function findOrFail(string $batch_id): Batch
    {
        $batch = Bus::findBatch($batch_id);
        throw_unless($batch, new InvalidArgumentException(sprintf('Batch [%s] does not exist.', $batch_id)));

        return $batch;
    }
    // ...
}

The find() method seems strange as it just calls Bus::findBatch, but its presence here will become clearer later.

Retry a batch

The implementation of a batch retry follows the same path as the batch cancellation, so there's not much to discuss:

# app/Core/ProgressiveBatch.php

class ProgressiveBatch
{
    // ...
    public function retry(string $batch_id): void
    {
        $batch = $this->findOrFail($batch_id);
        $command_result = Artisan::call('queue:retry-batch', ['id' => $batch->id]);
        throw_unless($command_result === 0, new Exception(sprintf(
            'Failed to retry batch [%s].',
            $batch->id
        )));
    }
    // ...
}

I'm not fond of calling an Artisan command from code that is not a command itself, but it's still better than copying Laravel code and having to maintain it.

Retry a specific batch job

This feature will be in two parts, just like we did in the BatchManager:

  • list the failed jobs of a batch

  • retry one or more of these failed jobs

Listing failed jobs is almost a strict copy-paste from our BatchManager:

# app/Core/ProgressiveBatch.php

class ProgressiveBatch
{
    // ...
    public function listFailedJobs(string $batch_id): array
    {
        $batch = $this->findOrFail($batch_id);

        $queue_failer = app('queue.failer');
        return collect($batch->failedJobIds)
            ->map(function ($failed_job_id) use ($queue_failer) {
                // Get job data
                $job_data = $queue_failer->find($failed_job_id);
                $payload = json_decode($job_data->payload, true);
                $job_class = unserialize($payload['data']['command']);

                // Extract arg names from class constructor signature and their data from unserialized class
                $args = [];
                $job_reflection = new ReflectionClass($job_class);
                $constructor = $job_reflection->getConstructor();
                foreach ($constructor->getParameters() as $param) {
                    $param_name = $param->getName();
                    $args[$param->getName()] = $job_class->$param_name;
                }

                return [
                    'uuid' => $payload['uuid'],
                    'class' => $payload['data']['commandName'],
                    'args' => json_encode($args, JSON_PRETTY_PRINT),
                    'failed_at' => $job_data->failed_at,
                ];
            })
            ->toArray();
    }
    // ...
}
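
The reflection part of listFailedJobs() works on any object whose constructor parameters are stored in public properties of the same name, which is what constructor property promotion gives us. A minimal sketch in plain PHP, where ExampleJob and constructorArgs() are hypothetical names:

```php
<?php

// Hypothetical job using constructor property promotion, like SimpleJob.
final class ExampleJob
{
    public function __construct(public string $random_string, public int $attempts_left = 3)
    {
    }
}

/**
 * Maps each constructor parameter name to the value of the property
 * bearing the same name on the given object.
 *
 * @return array<string, mixed>
 */
function constructorArgs(object $job): array
{
    $constructor = (new ReflectionClass($job))->getConstructor();
    if ($constructor === null) {
        return []; // no constructor, no arguments
    }

    $args = [];
    foreach ($constructor->getParameters() as $param) {
        $name = $param->getName();
        $args[$name] = $job->$name;
    }

    return $args;
}

$args = constructorArgs(new ExampleJob('abc'));
print_r($args);
```

Keep in mind that this relies on a convention: a job whose constructor parameters are not exposed as accessible properties with the same names cannot be described this way.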

For the job retry method itself, we'll again call an artisan command but it's the safest way to retry jobs:

# app/Core/ProgressiveBatch.php

class ProgressiveBatch
{
    // ...
    public function retryJobs(string $batch_id, array $job_ids): void
    {
        $batch = $this->findOrFail($batch_id);
        $batch_failed_job_ids = collect($batch->failedJobIds);

        $invalid_job_ids = collect($job_ids)
            ->reject(fn ($job_id) => $batch_failed_job_ids->contains($job_id));

        throw_unless($invalid_job_ids->isEmpty(), new InvalidArgumentException(sprintf(
            'Job ids [%s] do not exist or are not linked to batch [%s].',
            implode(', ', $invalid_job_ids->toArray()),
            $batch->id
        )));

        $command_errors = collect($job_ids)
            ->map(
                fn ($job_id) => [
                    'job_id' => $job_id,
                    'result' => Artisan::call('queue:retry', ['id' => $job_id])
                ]
            )
            ->filter(fn ($command_result) => $command_result['result'] !== 0)
            ->pluck('job_id');

        throw_if(!$command_errors->isEmpty(), new Exception(sprintf(
            'Failed to retry jobs [%s].',
            implode(', ', $command_errors->toArray())
        )));
    }
    // ...
}
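
The first half of retryJobs() is a set difference: every requested id that is not among the failed job ids of the batch is invalid. Without collections, the same check can be sketched with array_diff(); invalidJobIds() and the ids below are made up for the example:

```php
<?php

/**
 * Returns the requested ids that are not failed jobs of the batch.
 *
 * @param list<string> $requested_ids
 * @param list<string> $batch_failed_job_ids
 * @return list<string>
 */
function invalidJobIds(array $requested_ids, array $batch_failed_job_ids): array
{
    // array_diff() keeps the original keys, array_values() reindexes them
    return array_values(array_diff($requested_ids, $batch_failed_job_ids));
}

$failed = ['job-1', 'job-2', 'job-3']; // made-up ids

$invalid = invalidJobIds(['job-2', 'job-9'], $failed);
$none_invalid = invalidJobIds(['job-1'], $failed);

print_r($invalid);
```

Rejecting the whole request when a single id is invalid, as retryJobs() does, avoids retrying only part of what the caller asked for.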

And our ProgressiveBatch abstraction is done...

A Facade for user-friendly usage

Or almost. There is one thing that could be done better: when we use our abstraction, we have to write (new ProgressiveBatch())-> each time. It's not user-friendly, is it?
Let's implement a facade to tackle this issue:

# app/Facades/ProgressiveBatch.php

namespace App\Facades;

use Illuminate\Support\Facades\Facade;

/**
 * @method static \Illuminate\Bus\PendingBatch create(string $job_class, array $data)
 * @method static void cancel(string $batch_id)
 * @method static void retry(string $batch_id)
 * @method static void retryJobs(string $batch_id, array $job_ids)
 * @method static array listFailedJobs(string $batch_id)
 * @method static ?\Illuminate\Bus\Batch find(string $batch_id)
 * @method static \Illuminate\Bus\Batch findOrFail(string $batch_id)
 *
 * @see \App\Core\ProgressiveBatch
 */
class ProgressiveBatch extends Facade
{
    protected static function getFacadeAccessor()
    {
        return 'progressive_batch';
    }
}
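
If facades feel like magic, the idea behind them fits in a few lines: a static call on the facade is forwarded to an instance looked up by its accessor name. The sketch below is a heavily simplified illustration in plain PHP, not Laravel's actual implementation; MiniFacade, BatchService and the $bindings registry are hypothetical:

```php
<?php

// Hypothetical service, standing in for App\Core\ProgressiveBatch.
final class BatchService
{
    public function find(string $batch_id): ?string
    {
        return $batch_id === 'known-id' ? 'batch known-id' : null;
    }
}

// Hypothetical, heavily simplified facade: static calls are forwarded to
// the instance registered under the accessor name.
abstract class MiniFacade
{
    /** @var array<string, callable> factories keyed by accessor */
    public static array $bindings = [];

    /** @var array<string, object> instances already built (singletons) */
    private static array $resolved = [];

    abstract protected static function getFacadeAccessor(): string;

    public static function __callStatic(string $method, array $args): mixed
    {
        $accessor = static::getFacadeAccessor();
        // Build the instance on first use, then reuse it
        self::$resolved[$accessor] ??= (self::$bindings[$accessor])();

        return self::$resolved[$accessor]->$method(...$args);
    }
}

final class ProgressiveBatchFacade extends MiniFacade
{
    protected static function getFacadeAccessor(): string
    {
        return 'progressive_batch';
    }
}

// Plays the role of the service provider binding
MiniFacade::$bindings['progressive_batch'] = fn () => new BatchService();

$found = ProgressiveBatchFacade::find('known-id');
$missing = ProgressiveBatchFacade::find('other-id');

var_dump($found, $missing);
```

This is why the accessor string returned by the facade has to match the name under which the service is registered in the container.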

Now, the find() method makes more sense, as we can work with ProgressiveBatch without having to rely on Bus to find a batch.
Last but not least, we have to bootstrap our service:

# app/Providers/AppServiceProvider.php

use App\Core\ProgressiveBatch;

class AppServiceProvider extends ServiceProvider
{
    // ...
    public function register(): void
    {
        // Bind the service class (not the facade) under the accessor name
        $this->app->singleton('progressive_batch', function () {
            return new ProgressiveBatch();
        });
    }
    // ...
}

Conclusion

Coding the abstraction was pretty quick and straightforward. This is because we decided not to think about it in the first place: we made the code work first and only then refactored it. This eliminates the tendency towards premature optimization, complexity where there should be none, etc. Making code work can be tedious, and focusing only on that helps to achieve this goal faster. Developing tests alongside is also a good thing to do, as it makes the refactoring phase even smoother. Refactoring working, tested code is easy and safe.

This concludes this series of articles. While there was also technical stuff, the most important point was: know your use cases (happy paths and edge cases) and do things step by step. Regularly take a step back and challenge your code. This way, most of the problems are avoided.

I hope you enjoyed this series.
See you and happy coding!

Written by

Dominique VASSARD