Faster queues with Symfony and Go: When PHP isn't Fast Enough

Faster queues with Symfony and Go: When PHP isn't Fast Enough

For most applications PHP is more than capable, and I’ve never run into a situation where it has been too slow, especially as I usually process long-running tasks in the background with Symfony Messenger. In a situation where this is still not sufficient, it is possible to process these queues in a faster compiled language GO. Achieving this is easier than you would imagine.

An Example Scenario

Imagine an application where a user inputs the length and width of a shape, and the system computes the area of this shape. While this might seem straightforward, letโ€™s assume the calculation is an expensive task due to additional complexity โ€” maybe it involves large datasets, complex algorithms, or heavy computation.

In such scenarios, instead of handling the task directly in PHP, we can offload the computation to a Go worker for faster processing.

Here’s a step-by-step guide on setting this up.

1. Create a symfony project

Create a new symfony project

symfony new messenger_tuts --version="7.2.x-dev" --webapp

2. Generate Shape Entity, CRUD controllers and Migration

Next, generate the Shape entity and CRUD controllers.

php bin/console doctrine:database:create
php bin/console make:entity Shape
php bin/console make:crud Shape

Create and the database migration

php bin/console make:migration
php bin/console doctrine:migration:migrate

3. Create Message for tasks

The next thing we’ll do is use Symfony Messenger to dispatch a message to a Go worker for processing. The message will carry the ID of the shape that needs to be computed. This message will not have a MessageHandler, as it will be handled in the GO Worker.

Create the ComputeAreaMessage class

<?php

namespace App\Message;

final class ComputeAreaMessage
{
    public function __construct(
        public readonly int $id,
    ) {
    }
}

4. Messenger Configuration

Now, let’s configure Symfony Messenger to use a Redis transport for the queue and a custom JSON serializer.

framework:
    messenger:
        failure_transport: failed

        transports:
            async_redis:
                dsn: '%env(MESSENGER_TRANSPORT_REDIS_DSN)%'
                serializer: App\Serializer\JsonMessageSerializer

        default_bus: messenger.bus.default

        buses:
            messenger.bus.default: []

        routing:
            App\Message\ComputeAreaMessage: async_redis

This configuration tells Symfony to route ComputeAreaMessage through the Redis queue (async_redis) and use a custom serializer. The JSON serializer.

5. Custom Serializer

<?php

namespace App\Serializer;

use App\Message\ComputeAreaMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

class JsonMessageSerializer implements SerializerInterface
{
    public function decode(array $encodedEnvelope): Envelope
    {
        return new Envelope(new ComputeAreaMessage(0));
    }

    public function encode(Envelope $envelope): array
    {
        $message = $envelope->getMessage();

        return [
            'body' => json_encode($message),
        ];
    }
}

The JsonMessageSerializer handles encoding and decoding of the message for Redis. For simplicity, the decode method uses a placeholder value (0) but you can expand it to handle incoming data if the symfony application needs to process it.

6. Event Listeners

Lastly, on the PHP side, we need an event listener that dispatches a message to the redis queue when a Shape is created or updated.

<?php

namespace App\EventListener;

use App\Entity\Shape;
use App\Message\ComputeAreaMessage;
use Doctrine\Bundle\DoctrineBundle\Attribute\AsEntityListener;
use Doctrine\ORM\Event\PostPersistEventArgs;
use Doctrine\ORM\Event\PostUpdateEventArgs;
use Doctrine\ORM\Events;
use Symfony\Component\Messenger\MessageBusInterface;

#[AsEntityListener(event: Events::postUpdate, method: 'postUpdate', entity: Shape::class)]
#[AsEntityListener(event: Events::postPersist, method: 'postPersist', entity: Shape::class)]
class ShapeListener
{
    public function __construct(
        private readonly MessageBusInterface $messageBusInterface
    ) {
    }

    public function postUpdate(Shape $shape, PostUpdateEventArgs $args): void
    {
        $this->sendMessageToQueue($shape->getId());
    }

    public function postPersist(Shape $shape, PostPersistEventArgs $args): void
    {
        $this->sendMessageToQueue($shape->getId());
    }

    private function sendMessageToQueue(int $shapeId): void
    {
        $message = new ComputeAreaMessage($shapeId);
        $this->messageBusInterface->dispatch($message);
    }
}

This is all that is needed on the PHP Side, we now only need to consume/compute the average in GO.

7. Processing messages in GO

With the PHP side set up, we can now create a Go worker to process the message from the queue. The Go worker will:

  1. Listen for incoming tasks on the redis queue.
  2. Get the database row and compute / update the area column.
  3. Remove the task from the redis queue

GO Script

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"ktarila/messenger_tuts/database"
	"log"
	"time"

	"github.com/elliotchance/phpserialize"
	"github.com/go-redis/redis/v8"
)

type RedisStream struct {
	Fields struct {
		Message string `json:"message"`
	} `json:"fields"`
	ID string `json:"id"`
}

// Outer structure for the JSON object
type Message struct {
	Body    string        `json:"body"`
	Headers []interface{} `json:"headers"` // Assuming headers is an array of unknown types
}

type Body struct {
	ID           int    `json:"id"`
	FunctionName string `json:"functionName"`
}

func main() {
	// Initialize the database
	database.Init("/home/ktarila/work/messenger_tuts/var/data.db")
	defer database.DB.Close() // Close the database connection when the program ends

	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr: "localhost:6379", // Redis server address
	})

	stream := "messages" // Your Symfony Redis queue name

	for {
		// Read messages from the stream
		streams, err := rdb.XRead(ctx, &redis.XReadArgs{
			Streams: []string{stream, "0"},
			Count:   1,
			Block:   50 * time.Millisecond,
		}).Result()

		if err != nil {
			if err == redis.Nil {
				// No new messages within the block timeout
				continue
			}
			log.Fatalf("Error reading stream: %v", err)
		}

		// Process each message
		for _, msgStream := range streams {
			for _, message := range msgStream.Messages {
				fmt.Printf("\nMessage ID: %s\n", message.ID)
				jsonMessage := map[string]interface{}{
					"id":     message.ID,
					"fields": message.Values,
				}

				// Serialize JSON
				jsonData, err := json.Marshal(jsonMessage)
				if err != nil {
					log.Printf("Error serializing message to JSON: %v", err)
					continue
				}

				var redisStream RedisStream
				if err := json.Unmarshal(jsonData, &redisStream); err != nil {
					fmt.Println("Error unmarshalling Redis stream:", err)
					return
				}

				// Parse the actual message body
				var msg string
				err = phpserialize.Unmarshal([]byte(redisStream.Fields.Message), &msg)
				if err != nil {
					log.Printf("Failed to unmarshal message body: %v", err)
					continue
				}

				var content Message
				err = json.Unmarshal([]byte(msg), &content)
				if err != nil {
					fmt.Println("Error unmarshaling JSON:", err)
					continue
				}

				var body Body
				err = json.Unmarshal([]byte(content.Body), &body)
				if err != nil {
					fmt.Println("Error unmarshaling JSON:", err)
					continue
				}

				database.UpdateShapeArea(body.ID)

				_, err = rdb.XDel(ctx, stream, message.ID).Result()
				if err != nil {
					log.Fatalf("Error deleting message: %v\n", err)
				}
			}
		}
	}
}

Notice that SQlite is used, this script is used to update and fetch database rows

package database

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/mattn/go-sqlite3" // SQLite driver
)

// DB is a global variable to hold the database connection
var DB *sql.DB

// User struct represents a user in the database
type Shape struct {
	ID     int
	Width  float32
	Height float32
	Area   *float32
}

// Init initializes the SQLite database
func Init(dbPath string) {
	var err error
	DB, err = sql.Open("sqlite3", dbPath)
	if err != nil {
		log.Fatalf("Error opening database: %v", err)
	}

	fmt.Println("Database initialized.")
}

// GetUsers retrieves all users from the database
func GetShapes() {
	query := `SELECT id, width, height, area FROM shape`
	rows, err := DB.Query(query)
	if err != nil {
		log.Fatalf("Error querying users: %v", err)
	}
	defer rows.Close()

	fmt.Println("Shapes in the database:")
	for rows.Next() {
		var id int
		var width float32
		var height float32
		var area *float32 = nil
		err := rows.Scan(&id, &width, &height, &area)
		if err != nil {
			log.Fatalf("Error scanning row: %v", err)
		}

		fmt.Printf("ID: %d, Width: %.2f, Height: %.2f\n", id, width, height)
	}
}

func UpdateShapeArea(id int) {
	updateQuery := `UPDATE shape SET area = width * height WHERE id = ?`
	_, err := DB.Exec(updateQuery, id)
	if err != nil {
		log.Fatalf("Error updating user: %v", err)
	}
	fmt.Printf("Updated area for shape with id  %d\n", id)
}

func GetShapeByID(id int) (*Shape, error) {
	query := "SELECT id, width, height, area FROM shape WHERE id = ?"

	var shape Shape

	// QueryRow for a single result
	err := DB.QueryRow(query, id).Scan(&shape.ID, &shape.Width, &shape.Height, &shape.Area)
	if err == sql.ErrNoRows {
		return nil, fmt.Errorf("no shape found with id %d", id)
	} else if err != nil {
		return nil, err
	}

	return &shape, nil
}

8. Run the Symfony Application and Go Worker

  1. Start the Symfony server:

    symfony serve
    
  2. Run the Go worker:

    go run main.go
    

Conclusion

You now have a Symfony CRUD application that offloads expensive computations to a Go worker via Symfony Messenger and Redis. This architecture improves performance and scalability, letting PHP handle web logic while Go takes care of resource-intensive tasks.

the symfony application that is accessible from the web browser on https://127.0.0.1:8000/. When a shape is updated or created, the average is computed/updated from the Go worker.

Here are the links to the complete code on GitHub:

Happy coding! ๐Ÿš€

Feel free to use this Markdown content for your blog or website. If you have any more requests or questions, let me know!

comments powered by Disqus

Related Posts

Starting a Blog with Hugo and Adding a Contact Form

Starting a Blog with Hugo and Adding a Contact Form

Embarking on the journey of creating a personal blog or website often begins with the quest for simplicity and speed.

Read More