Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for worker_threads #90

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: node_js
node_js:
- 6
- 8
- 9
- 10
- 12
branches:
Expand Down
129 changes: 126 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
[![NPM](https://nodei.co/npm/worker-farm.png?downloads=true&downloadRank=true&stars=true)](https://nodei.co/npm/worker-farm/)


Distribute processing tasks to child processes with an über-simple API and baked-in durability & custom concurrency options. *Available in npm as <strong>worker-farm</strong>*.
Distribute processing tasks to child processes or worker threads with an über-simple API and baked-in durability & custom concurrency options. *Available in npm as <strong>worker-farm</strong>*.

## Example

Expand Down Expand Up @@ -48,6 +48,58 @@ We'll get an output something like the following:

This example is contained in the *[examples/basic](https://github.com/rvagg/node-worker-farm/tree/master/examples/basic/)* directory.

### Using worker_threads

Since version 2.0.0 you can also make use of node's [worker_threads](https://nodejs.org/api/worker_threads.html) module.
This requires you to use the module with node 12.x or higher, or 10.5 or higher with the ```--experimental-worker``` flag.
If ```worker_threads``` is not available, the module will fall back to the default implementation using child_processes, but a warning will be emited.

Given a file, *child.js*:

```js
'use strict'
const {threadId} = require('worker_threads')

module.exports = function (inp, callback) {
callback(null, inp + ' BAR (' + [process.pid, threadId].join(':') + ')')
}
```

And a main file:

```js
'use strict'

let workerFarm = require('../../')
, workers = workerFarm.threaded(require.resolve('./child'))
, ret = 0

for (let i = 0; i < 10; i++) {
workers('#' + i + ' FOO', function(err, outp) {
console.log(outp)
if (++ret == 10)
workerFarm.end(workers)
})
}
```

We'll get an output something like the following:

```
#3 FOO BAR (22236:4)
#5 FOO BAR (22236:6)
#1 FOO BAR (22236:2)
#9 FOO BAR (22236:2)
#4 FOO BAR (22236:5)
#7 FOO BAR (22236:8)
#2 FOO BAR (22236:3)
#0 FOO BAR (22236:1)
#8 FOO BAR (22236:1)
#6 FOO BAR (22236:7)
```

This example is contained in the *[examples/threaded](https://github.com/rvagg/node-worker-farm/tree/master/examples/threaded/)* directory.

### Example #1: Estimating π using child workers

You will also find a more complex example in *[examples/pi](https://github.com/rvagg/node-worker-farm/tree/master/examples/pi/)* that estimates the value of **π** by using a Monte Carlo *area-under-the-curve* method and compares the speed of doing it all in-process vs using child workers to complete separate portions.
Expand All @@ -63,6 +115,52 @@ Doing it the fast (multi-process) way...
took 1985 milliseconds
```

### Example #2: Using transferLists with worker threads

The [benefit](https://nodejs.org/docs/latest-v10.x/api/worker_threads.html#worker_threads_worker_threads) of using worker threads compared to child processes is that we can make use of transferLists and SharedArrayBuffers to efficiently pass data around.

When passing data from a child to the main thread, you can specify a transferList as third argument to the callback like
```js
module.exports = function(callback) {
let result = getLargeDataStructure()
let transferList = getTransferListForResult(result)
callback(null, result, transferList)
}
```

When passing data to a child, you can specify a transferList after the callback like
```js
let workers = workerFarm(require.resolve('path/to/child'))
workers(all, your, args, function(err, result) {
// Do something with the result
}, transferList)
```

Beware that **after** transferring, the data is no longer available on the sending side.
However, because we're using a queue it might be possible that after calling a worker, data may still be available.
Consider
```js
// Let's run a task very often, much more than we have child threads.
for (let i = 0; i < 100; i++) {
let arr = new Uint8Array(1024);
workers(function(err, result) {

}, [arr.buffer])

// What will be the length of arr here? If the array was transferred
// immediately to the child, arr.length === 0, but if the call is still in
// the queue - arr will still be available because it hasn't been
// transferred yet! In that case arr.length === 1024!
let schrodingersCat = {
dead: arr.length === 0
}

}
```
This behavior can hence lead to subtle bugs and therefore **YOU SHOULD NEVER** use an ArrayBuffer anymore after you've specified it in a transferList.

Have a look at the *[examples/transfer](https://github.com/rvagg/node-worker-farm/tree/master/examples/transfer/)* directory for an example that shows that transferLists can pass data around more efficiently.

## Durability

An important feature of Worker Farm is **call durability**. If a child process dies for any reason during the execution of call(s), those calls will be re-queued and taken care of by other child processes. In this way, when you ask for something to be done, unless there is something *seriously* wrong with what you're doing, you should get a result on your callback function.
Expand All @@ -83,6 +181,12 @@ Worker Farm exports a main function and an `end()` method. The main function set

In its most basic form, you call `workerFarm()` with the path to a module file to be invoked by the child process. You should use an **absolute path** to the module file, the best way to obtain the path is with `require.resolve('./path/to/module')`, this function can be used in exactly the same way as `require('./path/to/module')` but it returns an absolute path.

### workerFarm.threaded([options, ]pathToModule[, exportedMethods])

Using this method will try to use node's `worker_threads` module if it's available.
If it's not available, this will be detected and the module will default to the default implementation with child processes, displaying a warning that the `worker_threads` module is not available.
The api of the method is exactly the same as the `workerFarm()` function.

#### `exportedMethods`

If your module exports a single function on `module.exports` then you should omit the final parameter. However, if you are exporting multiple functions on `module.exports` then you should list them in an Array of Strings:
Expand Down Expand Up @@ -115,7 +219,7 @@ If you don't provide an `options` object then the following defaults will be use
}
```

* **<code>workerOptions</code>** allows you to customize all the parameters passed to child nodes. This object supports [all possible options of `child_process.fork`](https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options). The default options passed are the parent `execArgv`, `cwd` and `env`. Any (or all) of them can be overridden, and others can be added as well.
* **<code>workerOptions</code>** allows you to customize all the parameters passed to child nodes. This object supports [all possible options of `child_process.fork`](https://nodejs.org/api/child_process.html#child_process_child_process_fork_modulepath_args_options) or [all possible options of `Worker`](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) when using the threaded implementation. The default options passed are the parent `execArgv`, `cwd` and `env`. Any (or all) of them can be overridden, and others can be added as well.

* **<code>maxCallsPerWorker</code>** allows you to control the lifespan of your child processes. A positive number will indicate that you only want each child to accept that many calls before it is terminated. This may be useful if you need to control memory leaks or similar in child processes.

Expand All @@ -131,7 +235,7 @@ If you don't provide an `options` object then the following defaults will be use

* **<code>autoStart</code>** when set to `true` will start the workers as early as possible. Use this when your workers have to do expensive initialization. That way they'll be ready when the first request comes through.

* **<code>onChild</code>** when new child process starts this callback will be called with subprocess object as an argument. Use this when you need to add some custom communication with child processes.
* **<code>onChild</code>** when new child process (or worker when using threads) starts this callback will be called with subprocess (or worker) object as an argument. Use this when you need to add some custom communication with children.

### workerFarm.end(farm)

Expand All @@ -141,6 +245,25 @@ Any calls that are queued and not yet being handled by a child process will be d

Once you end a farm, it won't handle any more calls, so don't even try!

## Breaking changes in v2

Although not explicitly documented, in v1 it was possible to pass multiple arguments to the callback
```js
// BEWARE! CODE BELOW NO LONGER WORKS IN v2!

// child.js
module.exports = function(callback) {
callback(null, 'result', 'another result')
}

// parent.js
workers(function(err, one, two) {
console.log(one, two) // Logs 'result' 'another result'
})
```
This is no longer possible in v2 because the third argument is considered to be a transferList.
If you're using the default implementation that uses child processes, the third (and fourth, and fifth, ...) argument specified to the callback will simply be ignored.

## Related

* [farm-cli](https://github.com/Kikobeats/farm-cli) – Launch a farm of workers from CLI.
Expand Down
6 changes: 6 additions & 0 deletions examples/threads/child.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
'use strict'
const {threadId} = require('worker_threads')

module.exports = function (inp, callback) {
callback(null, inp + ' BAR (' + [process.pid, threadId].join(':') + ')')
}
14 changes: 14 additions & 0 deletions examples/threads/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
'use strict'

// Beware! Run with node 12.x or higher, or 10.5 with --experimental-worker
let workerFarm = require('../../')
, workers = workerFarm.threaded(require.resolve('./child'))
, ret = 0

for (let i = 0; i < 10; i++) {
workers('#' + i + ' FOO', function(err, outp) {
console.log(outp)
if (++ret == 10)
workerFarm.end(workers)
})
}
9 changes: 9 additions & 0 deletions examples/transfer/child.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict';

module.exports = function (size, useList, callback) {
let arr = new Float64Array(size[0])
for (let i = 0; i < arr.length; i++) {
arr[i] = Math.random()
}
callback(null, arr, useList ? [arr.buffer] : null)
}
40 changes: 40 additions & 0 deletions examples/transfer/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'

// Beware! Run with node 12.x or higher, or 10.5 with --experimental-worker
let workerFarm = require('../../')
, workers = workerFarm.threaded(require.resolve('./child'))
, ret = 0
, n = 100
, size = 5*1024*1024

;(async function() {

console.time('With transferList')
await new Promise(function(resolve, reject) {
for (let i = 0; i < n; i++) {
let data = new Uint32Array([size])
workers(data, true, function(err, result) {
if (++ret == n)
resolve()
}, [data.buffer])
}
})
console.timeEnd('With transferList')

ret = 0

console.time('Without transferList')
await new Promise(function(resolve, reject) {
for (let i = 0; i < n; i++) {
let data = new Uint32Array([size])
workers(data, false, function(err, result) {
if (++ret == n)
resolve()
})
}
})
console.timeEnd('Without transferList')

workerFarm.end(workers)

})().catch(console.error)
46 changes: 46 additions & 0 deletions lib/child/handle.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
'use strict'
let $module
function handle (data, send) {
let idx = data.idx
, child = data.child
, method = data.method
, args = data.args
, callback = function () {
let _args = Array.prototype.slice.call(arguments)
if (_args[0] instanceof Error) {
let e = _args[0]
_args[0] = {
'$error' : '$error'
, 'type' : e.constructor.name
, 'message' : e.message
, 'stack' : e.stack
}
Object.keys(e).forEach(function(key) {
_args[0][key] = e[key]
})
}
send({ owner: 'farm', idx: idx, child: child, args: _args.slice(0,2) }, _args[2]);
}
, exec

if (method == null && typeof $module == 'function')
exec = $module
else if (typeof $module[method] == 'function')
exec = $module[method]

if (!exec)
return console.error('NO SUCH METHOD:', method)

exec.apply(null, args.concat([ callback ]))
}

module.exports = function(send) {
return function(data) {
if (data.owner !== 'farm') {
return;
}
if (!$module) return $module = require(data.module)
if (data.event == 'die') return process.exit(0)
handle(data, send)
}
}
54 changes: 6 additions & 48 deletions lib/child/index.js
Original file line number Diff line number Diff line change
@@ -1,56 +1,14 @@
'use strict'

let $module
const handle = require('./handle')

/*
let contextProto = this.context;
while (contextProto = Object.getPrototypeOf(contextProto)) {
completionGroups.push(Object.getOwnPropertyNames(contextProto));
}
*/


function handle (data) {
let idx = data.idx
, child = data.child
, method = data.method
, args = data.args
, callback = function () {
let _args = Array.prototype.slice.call(arguments)
if (_args[0] instanceof Error) {
let e = _args[0]
_args[0] = {
'$error' : '$error'
, 'type' : e.constructor.name
, 'message' : e.message
, 'stack' : e.stack
}
Object.keys(e).forEach(function(key) {
_args[0][key] = e[key]
})
}
process.send({ owner: 'farm', idx: idx, child: child, args: _args })
}
, exec

if (method == null && typeof $module == 'function')
exec = $module
else if (typeof $module[method] == 'function')
exec = $module[method]

if (!exec)
return console.error('NO SUCH METHOD:', method)

exec.apply(null, args.concat([ callback ]))
}


process.on('message', function (data) {
if (data.owner !== 'farm') {
return;
}

if (!$module) return $module = require(data.module)
if (data.event == 'die') return process.exit(0)
handle(data)
})
process.on('message', handle(function(data) {
// Ignore any additional arguments because process send only allows to
// send handles.
process.send(data)
}))
4 changes: 4 additions & 0 deletions lib/child/thread.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
'use strict'
const handle = require('./handle')
const {parentPort} = require('worker_threads')
parentPort.on('message', handle(parentPort.postMessage.bind(parentPort)))
Loading