ZeroMQ

installation sous Linux Debian

Création de l'extension php-zmq pour php < 5.4

1/ aptitude install libzmq-dev
    - libpgm-5.1-0 amd64 5.1.118-1~dfsg-0.1 [181 kB]
    - libzmq1 amd64 2.2.0+dfsg-2 [243 kB]
    - libzmq-dev amd64 2.2.0+dfsg-2 [369 kB]

2/ Récupération du binding PHP de 0MQ sur le repo git
    git clone git://github.com/mkoppanen/php-zmq.git
    
3/ Compilation / Installation
cd php-zmq
phpize
./configure
make
sudo make install
mettre : extension=zmq.so dans php.ini

Utilisation

Protocole send/receive

CLIENT: zmq_client_hello.php

<?php
$context = new ZMQContext();

// Socket to talk to server
echo "Connecting to hello world server…\n";
$requester = new ZMQSocket($context, ZMQ::SOCKET_REQ);
$requester->connect("tcp://localhost:5555");

for ($request_nbr = 0; $request_nbr != 10; $request_nbr++) {
    printf ("Sending request %d…\n", $request_nbr);
    $requester->send("Hello");

    $reply = $requester->recv();
    printf ("Received reply %d: [%s]\n", $request_nbr, $reply);
}

SERVEUR: zmq_server_hello.php

<?php
$context = new ZMQContext(1);

// Socket to talk to clients
$responder = new ZMQSocket($context, ZMQ::SOCKET_REP);
$responder->bind("tcp://*:5555");

while (true) {

    // Wait for next request from client
    $request = $responder->recv();
    printf ("Received request: [%s]\n", $request);

    // Do some 'work'
    sleep (1);

    // Send reply back to client
    $responder->send("World");

}

 

Le server est d'abord activé: #> php zmq_serveur_hello.php &
Puis on lance le client : #> php zmq_client_hello.php
SORTIE:
Connecting to hello world server…
Sending request 0…
Received request: [Hello]
Received reply 0: [World]
…………………………….
Sending request 9…
Received request: [Hello]
Received reply 9: [World]

 

 

 

NB: Les entrées sorties sont bloquantes, le serveur n'envoie un nouveau message que lorsqu'il a reçu une réponse du client.

Protocole publisher/subscriber

PUBLISHER: zmq_publisher.php

<?php
$context = new ZMQContext();
$publisher = $context->getSocket(ZMQ::SOCKET_PUB);
$publisher->bind("tcp://*:5556");
usleep(250000);

$i = 10;
$N = 0;

while ($i--) {
    echo "\n";
    $N++;
    $application = 'SAGA';
    $filepath = '/storage/BUDI/'.md5(microtime(true)).'/import.sql';
    $update = sprintf ("%s %s %d", $application, $filepath, $N);
    $publisher->send($update);
    echo 'PUBLISH : '.$update."\n";

    // we are sleeping for 10ms
    usleep(10000);
}

echo "\n";

ATTENTION: Le usleep(250000) au début du code, force le publisher à attendre que la connexion s'établisse avant d'envoyer quoique ce soit. On ne doit pas utiliser cette méthode en production, mais utiliser la méthode synchronised pub/sub que l'on verra plus bas.

SUBSCRIBER: zmq_subscriber.php

<?php
$context = new ZMQContext();

// Socket to talk to server
echo "Waiting for a SAGA file to process...", PHP_EOL;
$subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://localhost:5556");

$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, 'SAGA');

while (TRUE) {
    echo "\n";
    $string = $subscriber->recv();
    //error_log('receive data');
    sscanf ($string, "%s %s %d", $application, $filepath, $requete);
    $update = 'Processing request '.$requete.' for '.$application.' file : '
     .$filepath."\n";
    echo $update."\n";

    // On fait semblant de travailler pendant 5 secondes
    sleep(5);

}

 

Sortie

#> php5 zmq_subscriber.php &
Waiting for a SAGA file to process...

#> php5 zmq_publisher.php
PUBLISH : SAGA /storage/BUDI/e8f32d7c1ed30eb4fd44810cf4005c51/import.sql 1
Processing request 1 for SAGA file : /storage/BUDI/e8f32d7c1ed30eb4fd44810cf4005c51/import.sql

PUBLISH : SAGA /storage/BUDI/400254c67f949f090f36df6fc2967e11/import.sql 2
PUBLISH : SAGA /storage/BUDI/a75de93d32e8ccda50668c5edc7e0db7/import.sql 3
PUBLISH : SAGA /storage/BUDI/0a64e203acbab90cf89d8c460f81a4ad/import.sql 4
PUBLISH : SAGA /storage/BUDI/934883e6e22ca9775b1c63f37449707d/import.sql 5
PUBLISH : SAGA /storage/BUDI/17c5c4eac5b3f54f1a6fbfdb8b18c392/import.sql 6
PUBLISH : SAGA /storage/BUDI/340eb5dab844900b4e2555375d38abf5/import.sql 7
PUBLISH : SAGA /storage/BUDI/def197e9634d313bd5a9b74fe42d8701/import.sql 8
PUBLISH : SAGA /storage/BUDI/ce73ffc45e72e8028b63809b4a084d4b/import.sql 9
PUBLISH : SAGA /storage/BUDI/893dc8ee14cd7945e97a5245ec1f317a/import.sql 10

Processing request 2 for SAGA file : /storage/BUDI/400254c67f949f090f36df6fc2967e11/import.sql
Processing request 3 for SAGA file : /storage/BUDI/a75de93d32e8ccda50668c5edc7e0db7/import.sql
Processing request 4 for SAGA file : /storage/BUDI/0a64e203acbab90cf89d8c460f81a4ad/import.sql
Processing request 5 for SAGA file : /storage/BUDI/934883e6e22ca9775b1c63f37449707d/import.sql
Processing request 6 for SAGA file : /storage/BUDI/17c5c4eac5b3f54f1a6fbfdb8b18c392/import.sql
Processing request 7 for SAGA file : /storage/BUDI/340eb5dab844900b4e2555375d38abf5/import.sql
Processing request 8 for SAGA file : /storage/BUDI/def197e9634d313bd5a9b74fe42d8701/import.sql
Processing request 9 for SAGA file : /storage/BUDI/ce73ffc45e72e8028b63809b4a084d4b/import.sql
Processing request 10 for SAGA file : /storage/BUDI/893dc8ee14cd7945e97a5245ec1f317a/import.sql

 

Protocole publish/subscribe synchronisé

Comme vu dans l'exemple précédent, il existe  une latence due au temps d'établissement de la connexion entre le publisher et le subscriber. Nous avions mis un délai de 250 ms avant d'envoyer la moindre donnée pour être sûr que le subscriber recoive la donnée. Ceci n'est pas applicable en production.

Nous allons utiliser un pattern différent:
1/ Le subscriber établit la connexion comme dans l'exemple send/receive, et attend que le publisher réponde.
2/ Le publisher répond (c'est la synchronisation), puis peu alors publier son message.

 

 

SYNCHRONIZED PUBLISHER

<?php
define("SUBSCRIBERS_EXPECTED", 1);

$context = new ZMQContext();

// Socket pub/sub
$publisher = new ZMQSocket($context, ZMQ::SOCKET_PUB);
$publisher->bind("tcp://*:5561");

// Socket send/receive
$syncservice = new ZMQSocket($context, ZMQ::SOCKET_REP);
$syncservice->bind("tcp://*:5562");

// ======= SYNCHRO SERVICE ===========================

$subscribers = 0;

while ($subscribers < SUBSCRIBERS_EXPECTED) {

    // - wait for synchronization request
    $string = $syncservice->recv();

    // - send synchronization reply
    $syncservice->send("READY");
    $subscribers++;
}

// ======= PUBLISH SERVICE ===========================

$i = 5;
$N = 0;

while ($i--) {
    echo "\n";
    $N++;
    $application = 'SAGA';
    $filepath = '/storage/BUDI/'.md5(microtime(true)).'/import.sql';       

    $update = sprintf ("%s %s %d", $application, $filepath, $N);
    $publisher->send($update);
    echo 'PUBLISH : '.$update."\n";
}
echo "\n";

sleep(1);

 

SYNCHRONIZED SUBSCRIBER

$context = new ZMQContext();

// First, connect our subscriber socket
$subscriber = $context->getSocket(ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://localhost:5561");
$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "SAGA");

// Socket pub/sub
$syncclient = $context->getSocket(ZMQ::SOCKET_REQ);
$syncclient->connect("tcp://localhost:5562");

// ======= SYNCHRO SERVICE ===========================
// - send a synchronization request
$syncclient->send("READY?");

// - wait for synchronization reply
$string = $syncclient->recv();

// ======= SUBSCRIBE SERVICE =========================
while (TRUE) {
    echo "\n";   

    $string = $subscriber->recv();
    //error_log('receive data');
    sscanf ($string, "%s %s %d", $application, $filepath, $requete);

    $update = 'Processing request '.$requete.' for '.$application.' file : '
     .$filepath."\n";
    echo $update."\n";

    // On fait semblant de travailler pendant 5 secondes
    sleep(5);

}

 

Sortie

#> php5 zmq_sync_subscriber.php &
#> php5 zmq_sync_publisher.php

PUBLISH : SAGA /storage/BUDI/04c0b888de1d8bb1a971241ba10c1a7b/import.sql 1
PUBLISH : SAGA /storage/BUDI/4198d71eba22ca52b734cc7b074feba0/import.sql 2
PUBLISH : SAGA /storage/BUDI/8538afdc5ab687daa8ed702d24ac41fd/import.sql 3

Processing request 1 for SAGA file : /storage/BUDI/04c0b888de1d8bb1a971241ba10c1a7b/import.sql

PUBLISH : SAGA /storage/BUDI/8538afdc5ab687daa8ed702d24ac41fd/import.sql 4
PUBLISH : SAGA /storage/BUDI/7b41c6e67d150be0dea150965f7c9382/import.sql 5

Processing request 2 for SAGA file : /storage/BUDI/4198d71eba22ca52b734cc7b074feba0/import.sql
Processing request 3 for SAGA file : /storage/BUDI/8538afdc5ab687daa8ed702d24ac41fd/import.sql
Processing request 4 for SAGA file : /storage/BUDI/8538afdc5ab687daa8ed702d24ac41fd/import.sql
Processing request 5 for SAGA file : /storage/BUDI/7b41c6e67d150be0dea150965f7c9382/import.sql

 

Utilisation pour les bulk-import