Recreating Laravel Forge atomic locks example to prevent overlaps for crucial tasks

November 5, 2020

Job middleware is a pretty cool newer(ish) feature of Laravel. It works just like regular HTTP middleware. Every job that has a registered middleware will run through each middleware in the list before it's handle method is called. Which means you can do all funky stuff with it.

Atomic locks have been in Laravel for a very long time. I always thought there are not enough articles on how to use atomic locks, so I decided to come up with one. Today I'll show you how you may combine atomic locks with job middleware to recreate the Laravel Forge example from the documentation:

For example, Laravel Forge uses atomic locks to ensure that only one remote task is being executed on a server at a time.

To get started, imagine you have a Server model that can execute shell scripts on the actual remote machine. What we want achieve is ensuring that only one shell script is running at a time, to prevent any overlaps or potential failures that may occur. For example, software updates and many other tasks can fail if there's an existing process running the same command. To give an illustratation how an overlap can harm Forge or an application similar to Forge, we'll use the example of syncing SSH keys on the server.

Example

Let's imagine the following idea. Once owner of the server grants access to new user, we'll create a file in the server's SSH keys directory with the contents of user's public SSH key. Then, we'll assemble a single file containing all SSH keys based on all specific files in the directory. Essentially, each time a key is added or removed, we'll create a single file that contains all current keys. Can you see how that may potentially harm us? If we performed multiple operations very quickly, like allowing and denying certain user access, we'd be overriding the same file in a nondeterministic matter and we got a race condition because multiple scripts will be racing for resources. In that case, we'd probably want to prevent any overlaps that may happen so we can be sure users' whose keys got deleted definitely won't have shell access, and those that have been added can access the server.

Syncing SSH keys takes a few seconds, so we want to offload that to a queue. Imagine that we have a job that looks kinda like this:

class SyncShellKey implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $server;
    protected $user;
    protected $revokeAccess;

    public function __construct(Server $server, User $user, $revokeAccess = false)
    {
        $this->server = $server;
        $this->user = $user;
        $this->revokeAccess = $revokeAccess;
    }

    public function handle()
    {
        if ($this->revokeAccess) {
            $this->server->execute("./revoke-access.sh {$this->user->key}");
        } else {
            $this->server->execute("./grant-access.sh {$this->user->key}");
        }
    }
}

When manipulating a user, we'd call:

SyncShellKey::dispatch($server, $userToAdd, $revoke = false);
SyncShellKey::dispatch($server, $userToRevoke, $revoke = true);

If executing shell scripts happens synchronously (meaning queued job will not complete until script ends) and if we have multiple workers, these jobs will run in parallel and result in potential mistake.

Implementation

We can leverage a thing called atomic locks as well as job middleware to prevent this from happening. Let's create a WithoutTaskOverlaps middleware class that may look something like this:

class WithoutTaskOverlaps
{
    public $server;

    public function __construct(Server $server)
    {
        $this->server = $server;
    }

    public function handle($job, $next)
    {
        $next($job);
    }
}

This thing currently does nothing... It just passes the job onto the next middleware in the stack and finally runs the job. We want to try to acquire an atomic lock before running a job. If the lock couldn't be obtained (another task is already running), we'll release the job back onto the queue to retry again after a few seconds. However, if the lock was obtained, we'll wait until job ends then release the lock so another tasks can run. Atomic locks require a "key". Since we have this concept of "tasks" on a server, we'll generate a key based on the server ID. Modified handle method may look like this:

public function handle($job, $next)
{
    $lock = Cache::lock('tasks:'.$this->server->id);

    if (! $lock->get()) {
        return $job->release(5);
    }

    try {
        $next($job);
    } finally {
        $lock->release();
    }
}

We wrap the job call into try-finally block so that if any exceptions happen, we'll still release the lock. Since we're releasing a job back to the queue, we should probably specify how long the job should be attempted before it's marked as failed. To do this, you can implement the retryUntil method. We also need to register a our new middleware as well.

class SyncShellKey implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $server;
    protected $user;
    protected $revokeAccess;

    public function __construct(Server $server, User $user, $revokeAccess = false)
    {
        $this->server = $server;
        $this->user = $user;
        $this->revokeAccess = $revokeAccess;
    }

    public function handle()
    {
        if ($this->revokeAccess) {
            $this->server->execute('./revoke-access.sh ' . $this->user->id);
        } else {
            $this->server->execute('./grant-access.sh ' . $this->user->id);
        }
    }

    public function middleware()
    {
        return [
            new WithoutTaskOverlaps($this->server),
        ];
    }

    public function retryUntil()
    {
        return Carbon::now()->addMinutes(5);
    }
}

Actually, recent versions of Laravel ship with the Illuminate\Queue\Middleware\WithoutOverlapping middleware that looks very similarly to the one we created, so you don't have to implement it yourself.

Scripts running in background?

Congrats, it now works just fine, right? And it's all done! Wrong! What if you have a long running task that you don't want to wait for to finish? For example, running a deployment script on Forge can take a long time, like 10 minutes. We don't want our queued job to run for 10 minutes and clog our queue. If we run our deployment asynchronously and ping back after it's completed, this middleware won't work. Lock will be released as soon as the task starts up on the server, but that task might be actually still running in the background. If we dispatch another job to the queue, we'll get an overlap.

We can solve this by only releasing a lock when the shell script actually ends and pings our backend back. To do this, we need to store the lock owner in the database so it can be released on completion. Modified job may look something like this:

public function handle()
{
    $this->server->executeAsync('./deploy.sh', function ($server) {
        Cache::restoreLock("tasks:{$server->id}", $server->lock_owner)->release();

        $server->fill([
            'lock_owner' => null,
        ])->save();
    });
}

Important thing to note is that callback is only called once the script actually ends and pings our backend. Hint: you'd also store the callback in the database so that it can be invoked once script ends ;)

Modified middleware

We also need to modify the middleware to not release the lock once queued job finishes since we only want the callback to release the lock. Modified middleware may look something like this:

class WithoutTaskOverlaps
{
    public $server;
    public $dontReleaseLock = false;

    public function __construct(Server $server)
    {
        $this->server = $server;
    }

    public function handle($job, $next)
    {
        $lock = Cache::lock('tasks:'.$this->server->id);

        if (! $lock->get()) {
            return $job->release(5);
        }

        $this->server->fill([
            'lock_owner' => $lock->owner(),
        ])->save();

        try {
            $next($job);
        } finally {
            if (! $this->dontReleaseLock) {
                $lock->release();

                $this->server->fill([
                    'lock_owner' => null,
                ])->save();
            }
        }
    }

    public function withoutReleasingLock()
    {
        $this->dontReleaseLock = true;

        return $this;
    }
}

Note that once lock is obtained, we store the owner in the database. Now, when the job ends, we only release lock and remove the lock owner if the script runs in the foreground. Registering the middleware is pretty much the same:

public function handle()
{
    $this->server->executeAsync('./deploy.sh', function ($server) {
        Cache::restoreLock("tasks:{$server->id}", $server->lock_owner)->release();

        $server->fill([
            'lock_owner' => null,
        ])->save();
    });
}

public function retryUntil()
{
    return Carbon::now()->addMinutes(10);
}

public function middleware()
{
    return [
        (new WithoutTaskOverlaps($this->server))->withoutReleasingLock(),
    ];
}

Cool thing about this is that this exact middleware can be reused on every single shell task (managing daemons, deployments, keys, etc) and all of them will run in sequence. :)

Sign up for my newsletter

You'll get notified as soon as I write something! I don't send spam!