Skip to content

Workers

Jason M edited this page Jun 13, 2017 · 5 revisions

Synopsis

Workers are the work-horse of your Daemon. Workers are just like Tasks except they can return results to the parent process simply by using return from your worker. Workers can also stick around indefinitely to process more than 1 request, so you don't have extra overhead of having to re-fork Workers over and over (unless you want to depending on your use-case). Workers may also have multiple background processes waiting to process a request. This allows multiple-processing of worker requests, which can speed up your daemon tremendously.

Workers should be created in your Daemon::initialize() method just like Plugins. To create a Worker, you call Daemon::addWorker($worker, $alias = null, $ipc = null) with $worker as one of the types below.

  • A Closure: function(){...}
  • A callable: [$myObject, 'methodName']
  • An object instance that optionally implements WorkerInterface: new MyWorker(). A worker can always just be a plain old object. Implementing WorkerInterface allows your worker to have an initialize() method of it's own that will get called in the forked process once.
  • A FQCN string: 'Lifo\Daemon\Worker\SimpleWorker'. The class will be instantiated in the parent process since the parent needs a copy of the object to do it's magic.

The $alias will default to the snake_case of the $worker class name with any trailing '_worker' string stripped off. If your worker is a Closure then the alias will always literally be 'closure', so you'll probably want to provide an alias in those cases. All workers must have a unique alias. If you attempt to create two different workers with the same alias an exception will be thrown.

The $ipc will default to \Lifo\Daemon\IPC\SysV which will use Shared Memory and Message Queues for parent/worker communication.

Behind The Scenes

When you add a Worker to your daemon, what you actually get returned to you is a Mediator object. The mediator is responsible for all the magic that allows workers to communicate, replicate and return data to the parent process.

When a Worker exits (or killed by the OS), the ProcessManager Plugin will reap it and the Mediator will clean it up, as needed, sending any results (or exceptions) to the parent process. You never have to worry about a rogue Worker process (unless your code runs-amok and kills memory/cpu).

If your Worker is a Closure or callable, all you can do is call the Worker and get a result. If, however, your Worker is an object. Any public method defined in your worker will be available to be called and processed in the background. This way your worker can do all sorts of things, as needed by your business logic. Your Worker must not contain any overlapping public methods that the Mediator also contains. If this occurs an exception is thrown.

Example

An example of a very trivial worker.

class MyWorker {
  public function getUrls($endPoint) {
    sleep(1); // fake latency
    // pretend we fetched some urls from the remote REST $endPoint
    // maybe I'll update this with a real world example
    return [
      'https://google.com',
      'https://github.com',
      'http://washingpost.com',
    ];
  }

  // you would never actually do this ... 
  public function rand($min, $max) {
    static $seed = true;
    if ($seed) {
      mt_srand(); // reset seed, so we don't share what the parent had
      $seed = false;
    }
    sleep(1); // fake latency
    return mt_rand($min, $max);
  }
}
// elsewhere in our Daemon code...

class MyDaemon extends Daemon {
  public function initialize() {
    $this->addWorker(new MyWorker(), 'example');
  }

  public function execute() {
    if ($this->getLoopIterations() % 5 = 0) {
      // the worker method will return a Promise
      $this->worker('example')->getUrls()->then(function($urls) {
        $this->log("URLS received: %s", json_encode($urls));
      });
    }
  }
}

Return Value

The return value of any Worker method will be a standard Promise. Where you can react to the return value when it happens and not have to wait around for it. When calling addWorker(...) you can also setup an ON_RETURN callback and not use the Promise value.

A little more care has to go into making sure the data returned is the data you expected. It could also be a CallDiedException (if the worker died w/o processing the request). In which case you'll have to handle it appropriately.

  // ...
  public function initialize() {
    $this->addWorker(new MyWorker(), 'example')
      ->onReturn(function($data) {
          switch (true) {
            case $data instanceof CallDiedException:
              $call = $data->getCall();
              $this->error("Worker PID %d died while processing request %s(%s)", 
                $call->getPid(), 
                $call->getMethod(), 
                implode(', ', $call->getArguments())
              );
              break;
            default:
              $this->log("URLS received: %s", json_encode($data));
          }
        }
      });
  }

To be continued ...

Clone this wiki locally