Faster queues with Symfony and Go: When PHP isn't Fast Enough
- Patrick Kenekayoro
- Php , Go , Symfony , Queues
- December 15, 2024
For most applications PHP is more than capable, and I’ve never run into a situation where it has been too slow, especially as I usually process long-running tasks in the background with Symfony Messenger. In a situation where this is still not sufficient, it is possible to process these queues in a faster compiled language GO. Achieving this is easier than you would imagine.
An Example Scenario
Imagine an application where a user inputs the length and width of a shape, and the system computes the area of this shape. While this might seem straightforward, letโs assume the calculation is an expensive task due to additional complexity โ maybe it involves large datasets, complex algorithms, or heavy computation.
In such scenarios, instead of handling the task directly in PHP, we can offload the computation to a Go worker for faster processing.
Here’s a step-by-step guide on setting this up.
1. Create a symfony project
Create a new symfony project
symfony new messenger_tuts --version="7.2.x-dev" --webapp
2. Generate Shape Entity, CRUD controllers and Migration
Next, generate the Shape
entity and CRUD controllers.
php bin/console doctrine:database:create
php bin/console make:entity Shape
php bin/console make:crud Shape
Create and the database migration
php bin/console make:migration
php bin/console doctrine:migration:migrate
3. Create Message for tasks
The next thing we’ll do is use Symfony Messenger to dispatch a message to a Go worker for processing. The message will carry the ID of the shape that needs to be computed. This message will not have a MessageHandler, as it will be handled in the GO Worker.
Create the ComputeAreaMessage
class
<?php
namespace App\Message;
final class ComputeAreaMessage
{
public function __construct(
public readonly int $id,
) {
}
}
4. Messenger Configuration
Now, let’s configure Symfony Messenger to use a Redis transport for the queue and a custom JSON serializer.
framework:
messenger:
failure_transport: failed
transports:
async_redis:
dsn: '%env(MESSENGER_TRANSPORT_REDIS_DSN)%'
serializer: App\Serializer\JsonMessageSerializer
default_bus: messenger.bus.default
buses:
messenger.bus.default: []
routing:
App\Message\ComputeAreaMessage: async_redis
This configuration tells Symfony to route ComputeAreaMessage through the Redis queue (async_redis) and use a custom serializer. The JSON serializer.
5. Custom Serializer
<?php
namespace App\Serializer;
use App\Message\ComputeAreaMessage;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
class JsonMessageSerializer implements SerializerInterface
{
public function decode(array $encodedEnvelope): Envelope
{
return new Envelope(new ComputeAreaMessage(0));
}
public function encode(Envelope $envelope): array
{
$message = $envelope->getMessage();
return [
'body' => json_encode($message),
];
}
}
The JsonMessageSerializer handles encoding and decoding of the message for Redis. For simplicity, the decode method uses a placeholder value (0) but you can expand it to handle incoming data if the symfony application needs to process it.
6. Event Listeners
Lastly, on the PHP side, we need an event listener that dispatches a message to the redis queue when a Shape
is created or updated.
<?php
namespace App\EventListener;
use App\Entity\Shape;
use App\Message\ComputeAreaMessage;
use Doctrine\Bundle\DoctrineBundle\Attribute\AsEntityListener;
use Doctrine\ORM\Event\PostPersistEventArgs;
use Doctrine\ORM\Event\PostUpdateEventArgs;
use Doctrine\ORM\Events;
use Symfony\Component\Messenger\MessageBusInterface;
#[AsEntityListener(event: Events::postUpdate, method: 'postUpdate', entity: Shape::class)]
#[AsEntityListener(event: Events::postPersist, method: 'postPersist', entity: Shape::class)]
class ShapeListener
{
public function __construct(
private readonly MessageBusInterface $messageBusInterface
) {
}
public function postUpdate(Shape $shape, PostUpdateEventArgs $args): void
{
$this->sendMessageToQueue($shape->getId());
}
public function postPersist(Shape $shape, PostPersistEventArgs $args): void
{
$this->sendMessageToQueue($shape->getId());
}
private function sendMessageToQueue(int $shapeId): void
{
$message = new ComputeAreaMessage($shapeId);
$this->messageBusInterface->dispatch($message);
}
}
This is all that is needed on the PHP Side, we now only need to consume/compute the average in GO.
7. Processing messages in GO
With the PHP side set up, we can now create a Go worker to process the message from the queue. The Go worker will:
- Listen for incoming tasks on the redis queue.
- Get the database row and compute / update the area column.
- Remove the task from the redis queue
GO Script
package main
import (
"context"
"encoding/json"
"fmt"
"ktarila/messenger_tuts/database"
"log"
"time"
"github.com/elliotchance/phpserialize"
"github.com/go-redis/redis/v8"
)
type RedisStream struct {
Fields struct {
Message string `json:"message"`
} `json:"fields"`
ID string `json:"id"`
}
// Outer structure for the JSON object
type Message struct {
Body string `json:"body"`
Headers []interface{} `json:"headers"` // Assuming headers is an array of unknown types
}
type Body struct {
ID int `json:"id"`
FunctionName string `json:"functionName"`
}
func main() {
// Initialize the database
database.Init("/home/ktarila/work/messenger_tuts/var/data.db")
defer database.DB.Close() // Close the database connection when the program ends
ctx := context.Background()
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379", // Redis server address
})
stream := "messages" // Your Symfony Redis queue name
for {
// Read messages from the stream
streams, err := rdb.XRead(ctx, &redis.XReadArgs{
Streams: []string{stream, "0"},
Count: 1,
Block: 50 * time.Millisecond,
}).Result()
if err != nil {
if err == redis.Nil {
// No new messages within the block timeout
continue
}
log.Fatalf("Error reading stream: %v", err)
}
// Process each message
for _, msgStream := range streams {
for _, message := range msgStream.Messages {
fmt.Printf("\nMessage ID: %s\n", message.ID)
jsonMessage := map[string]interface{}{
"id": message.ID,
"fields": message.Values,
}
// Serialize JSON
jsonData, err := json.Marshal(jsonMessage)
if err != nil {
log.Printf("Error serializing message to JSON: %v", err)
continue
}
var redisStream RedisStream
if err := json.Unmarshal(jsonData, &redisStream); err != nil {
fmt.Println("Error unmarshalling Redis stream:", err)
return
}
// Parse the actual message body
var msg string
err = phpserialize.Unmarshal([]byte(redisStream.Fields.Message), &msg)
if err != nil {
log.Printf("Failed to unmarshal message body: %v", err)
continue
}
var content Message
err = json.Unmarshal([]byte(msg), &content)
if err != nil {
fmt.Println("Error unmarshaling JSON:", err)
continue
}
var body Body
err = json.Unmarshal([]byte(content.Body), &body)
if err != nil {
fmt.Println("Error unmarshaling JSON:", err)
continue
}
database.UpdateShapeArea(body.ID)
_, err = rdb.XDel(ctx, stream, message.ID).Result()
if err != nil {
log.Fatalf("Error deleting message: %v\n", err)
}
}
}
}
}
Notice that SQlite is used, this script is used to update and fetch database rows
package database
import (
"database/sql"
"fmt"
"log"
_ "github.com/mattn/go-sqlite3" // SQLite driver
)
// DB is a global variable to hold the database connection
var DB *sql.DB
// User struct represents a user in the database
type Shape struct {
ID int
Width float32
Height float32
Area *float32
}
// Init initializes the SQLite database
func Init(dbPath string) {
var err error
DB, err = sql.Open("sqlite3", dbPath)
if err != nil {
log.Fatalf("Error opening database: %v", err)
}
fmt.Println("Database initialized.")
}
// GetUsers retrieves all users from the database
func GetShapes() {
query := `SELECT id, width, height, area FROM shape`
rows, err := DB.Query(query)
if err != nil {
log.Fatalf("Error querying users: %v", err)
}
defer rows.Close()
fmt.Println("Shapes in the database:")
for rows.Next() {
var id int
var width float32
var height float32
var area *float32 = nil
err := rows.Scan(&id, &width, &height, &area)
if err != nil {
log.Fatalf("Error scanning row: %v", err)
}
fmt.Printf("ID: %d, Width: %.2f, Height: %.2f\n", id, width, height)
}
}
func UpdateShapeArea(id int) {
updateQuery := `UPDATE shape SET area = width * height WHERE id = ?`
_, err := DB.Exec(updateQuery, id)
if err != nil {
log.Fatalf("Error updating user: %v", err)
}
fmt.Printf("Updated area for shape with id %d\n", id)
}
func GetShapeByID(id int) (*Shape, error) {
query := "SELECT id, width, height, area FROM shape WHERE id = ?"
var shape Shape
// QueryRow for a single result
err := DB.QueryRow(query, id).Scan(&shape.ID, &shape.Width, &shape.Height, &shape.Area)
if err == sql.ErrNoRows {
return nil, fmt.Errorf("no shape found with id %d", id)
} else if err != nil {
return nil, err
}
return &shape, nil
}
8. Run the Symfony Application and Go Worker
Start the Symfony server:
symfony serve
Run the Go worker:
go run main.go
Conclusion
You now have a Symfony CRUD application that offloads expensive computations to a Go worker via Symfony Messenger and Redis. This architecture improves performance and scalability, letting PHP handle web logic while Go takes care of resource-intensive tasks.
the symfony application that is accessible from the web browser on https://127.0.0.1:8000/. When a shape is updated or created, the average is computed/updated from the Go worker.
Here are the links to the complete code on GitHub:
- Go Worker: https://github.com/ktarila/consume_messenger_tuts
- Symfony Project: https://github.com/ktarila/messenger_tuts
Happy coding! ๐
Feel free to use this Markdown content for your blog or website. If you have any more requests or questions, let me know!