Celery Worker Execution From PHP
Use of Rhubarb is outlined as follows.
Send Task and Wait For Result
use \Rhubarb\Exception\TimeoutException;
$rhubarb = new \Rhubarb\Rhubarb($options);
try {
$task = $rhubarb->sendTask('task.add', array(2,2));
$task->delay();
$result = $task->get();
} catch (TimeoutException $e) {
$log->error('task failed to return in default timelimit [10] seconds');
}
Fire And Forget Task
use \Rhubarb\Exception\TimeoutException;
$rhubarb = new \Rhubarb\Rhubarb($options);
try {
$task = $rhubarb->sendTask('task.add', array(2,2));
$result = $task->delay();
} catch (TimeoutException $e) {
$log->error('task failed to return in default timelimit [10] seconds');
}
Getting Task Status
use \Rhubarb\Exception\TimeoutException;
$rhubarb = new \Rhubarb\Rhubarb($options);
$task = $rhubarb->sendTask('task.add', array(2,2));
$result = $task->delay();
while (!$task->successful()) {
echo $task->state(), PHP_EOL;
// You should have some time based break; statement here
}
var_dump($task->get());
KWARG Support
use \Rhubarb\Exception\TimeoutException;
$rhubarb = new \Rhubarb\Rhubarb($options);
try {
$task = $rhubarb->sendTask('task.add', array('arg1' => 2, 'arg2' => 2));
$result = $task->delay();
var_dump($task->get());
} catch (TimeoutException $e) {
$log->error('task failed to return in default timelimit [10] seconds');
}
Method Specific Queue and/or Exchange
use \Rhubarb\Exception\TimeoutException;
$rhubarb = new \Rhubarb\Rhubarb($options);
try {
$task = $rhubarb->sendTask('task.add', array('arg1' => 2, 'arg2' => 2));
$task->getMessage()
->setPropQueue('priority.high')
->setPropExchange('queue.other');
$result = $task->delay();
var_dump($task->get());
} catch (TimeoutException $e) {
$log->error('task failed to return in default timelimit [10] seconds');
}
At runtime it may become necessary to utilize a different queue, exchange or various runtime options. These options may be passed to the __delay__ method when called:
Supported Options are:
Example
$rhubarb = new \Rhubarb\Rhubarb($options);
$res = $rhubarb->sendTask('subtract', array(3, 2));
$res->delay(
array(
'queue' => 'priority.high',
'exchange' => 'subtract_queue'
)
);
$result = $res->get(2);
$this->assertEquals(1, $result);