# Contents
## Modules
* [faktory](#module_faktory): creates faktory singletons
## Classes
* [Client](#Client): A client connection handle for interacting with the faktory server. Holds a pool of 1 or more underlying connections. Safe for concurrent use and tolerant of unexpected connection terminations. Use this object for all interactions with the faktory server.
* [Job](#Job): A class wrapping a JobPayload. Creating and pushing a job is typically accomplished by using a faktory client, which implements `.job` and automatically sets the client for the job when calling `.push` on the job later. You do not need to use this class directly.
* [Mutation](#Mutation): A wrapper for the Mutate API, a low-level data management API to script certain repairs or migrations. Please be warned: MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.
* [Worker](#Worker): Representation of a worker process with many concurrent job processors. Works at the concurrency set in options during construction. Will hold at most `concurrency` jobs in-memory while processing at any one time. Listens for signals to quiet or shutdown. Should not be started more than once per-process, nor should more than one worker be started per-process.
## External
* HI : object. An after-connect initial message from the server to handshake the connection
* HELLO : object. The client's response to the server's HI to initiate a connection
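## faktory
Creates faktory singletons.
**Returns**: FaktoryControl
* [faktory](#module_faktory) ⇒ FaktoryControl
* [.use(fn)](#module_faktory+use) ⇒ FaktoryControl
* [.register(name, fn)](#module_faktory+register) ⇒ FaktoryControl
* [.connect(...args)](#module_faktory+connect) ⇒ [Client](#Client)
* [.work(options)](#module_faktory+work) ⇒ Promise
* [.stop()](#module_faktory+stop) ⇒ promise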
### faktory.use(fn) ⇒ FaktoryControl
Adds a middleware function to the stack
**Kind**: instance method of [faktory](#module_faktory)
**Returns**: FaktoryControl - this
**See**: [koa middleware](https://github.com/koajs/koa/blob/master/docs/guide.md#writing-middleware)
| Param | Type | Description |
| --- | --- | --- |
| fn | function | koa-compose-style middleware function |
**Example**
```js
faktory.use(async (ctx, next) => {
  // a pool you created to hold database connections
  // await the pool so the middleware does not resolve before the job finishes
  await pool.use(async (conn) => {
    ctx.db = conn;
    await next();
  });
});
```
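
To illustrate how a koa-compose-style stack runs, here is a minimal sketch of a composer. The `compose` helper is hypothetical and is not the library's implementation; it only shows why each middleware should `await next()` so that downstream middleware (and eventually the job) completes before upstream code resumes.

```js
// Hypothetical helper: a minimal koa-compose-style runner, for illustration only.
function compose(middleware) {
  return function run(ctx) {
    // dispatch(i) invokes the i-th middleware and hands it a `next`
    // function that invokes middleware i + 1.
    function dispatch(i) {
      const fn = middleware[i];
      if (!fn) return Promise.resolve();
      try {
        return Promise.resolve(fn(ctx, () => dispatch(i + 1)));
      } catch (err) {
        return Promise.reject(err);
      }
    }
    return dispatch(0);
  };
}

// Record the order in which the middleware bodies execute.
const calls = [];
const stack = compose([
  async (ctx, next) => {
    calls.push('outer:before');
    await next();
    calls.push('outer:after');
  },
  async (ctx, next) => {
    calls.push('inner');
    await next();
  },
]);

// `done` resolves with the recorded call order once the stack unwinds.
const done = stack({}).then(() => calls);
```

Code placed after `await next()` in an outer middleware runs only once everything downstream has settled, which is what makes setup and teardown (such as returning a database connection) possible in a single function.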
### faktory.register(name, fn) ⇒ FaktoryControl
Adds a [JobFunction](external:JobFunction) to the [Registry](Registry)
**Kind**: instance method of [faktory](#module_faktory)
**Returns**: FaktoryControl - this
| Param | Type | Description |
| --- | --- | --- |
| name | external:Jobtype | string descriptor for the jobtype |
| fn | external:JobFunction | |
**Example**
```js
faktory.register('MyJob', (...args) => {
  // some work
});
```
### faktory.connect(...args) ⇒ [Client](#Client)
Creates a new [Client](#Client)
**Kind**: instance method of [faktory](#module_faktory)
| Param | Type | Description |
| --- | --- | --- |
| ...args | \* | args forwarded to [Client](#Client) |
**Example**
```js
const client = await faktory.connect();
await client.push(job);
```
### faktory.work(options) ⇒ Promise
Starts a worker. Resolves after the worker is started. Only call this
once per-process.
**Kind**: instance method of [faktory](#module_faktory)
**Returns**: Promise - the [Worker.work](Worker.work) promise
| Param | Type | Description |
| --- | --- | --- |
| options | object | options to [Worker](#Worker) |
**Example**
```js
// this keeps the process open and can be `await`ed
faktory.work();
```
### faktory.stop() ⇒ promise
Stops the worker previously started.
**Kind**: instance method of [faktory](#module_faktory)
**Returns**: promise - promise returned by [Worker.stop](Worker.stop)
**Example**
```js
// previously
faktory.work();
faktory.stop();
```
## Client
A client connection handle for interacting with the faktory server. Holds a pool of 1 or more
underlying connections. Safe for concurrent use and tolerant of unexpected
connection terminations. Use this object for all interactions with the faktory server.
**Kind**: global class
* [Client](#Client)
* [new Client([options])](#new_Client_new)
* [.connect()](#Client+connect) ⇒ [Promise.<Client>](#Client)
* [.close()](#Client+close) ⇒ Promise.<undefined>
* [.job(jobtype, ...args)](#Client+job) ⇒ [Job](#Job)
* [.send(...args)](#Client+send)
* [.fetch(...queues)](#Client+fetch) ⇒ Promise.<(object\|null)>
* [.beat()](#Client+beat) ⇒ Promise.<string>
* [.push(job)](#Client+push) ⇒ Promise.<string>
* [.flush()](#Client+flush) ⇒ Promise.<string>
* [.info()](#Client+info) ⇒ Promise.<object>
* [.ack(jid)](#Client+ack) ⇒ Promise.<string>
* [.fail(jid, e)](#Client+fail) ⇒ Promise.<string>
### new Client([options])
Creates a Client with a connection pool
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [options] | object | | |
| [options.url] | string | "tcp://localhost:7419" | connection string for the faktory server (checks for FAKTORY_PROVIDER and FAKTORY_URL) |
| [options.host] | string | "localhost" | host string to connect to |
| [options.port] | number \| string | 7419 | port to connect to faktory server on |
| [options.password] | string | | faktory server password to use during HELLO |
| [options.wid] | string | | optional wid that should be provided to the server (only necessary for a worker process consuming jobs) |
| [options.labels] | Array.<string> | [] | optional labels to provide the faktory server for this client |
| [options.poolSize] | number | 10 | the maximum size of the connection pool |
**Example**
```js
const client = new Client();
const job = await client.fetch('default');
```
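
The `options.url` default mentions `FAKTORY_PROVIDER` and `FAKTORY_URL`. The sketch below shows one plausible resolution order. It assumes the common Faktory client convention that `FAKTORY_PROVIDER` names the environment variable holding the connection string; the helpers `resolveFaktoryUrl` and `parseFaktoryUrl` are hypothetical and are not part of this library's API.

```js
// Hypothetical helper, not part of the library's API.
// Assumption: FAKTORY_PROVIDER names the environment variable that holds
// the connection string, and FAKTORY_URL is consulted when it is unset.
function resolveFaktoryUrl(env = process.env) {
  const provider = env.FAKTORY_PROVIDER || 'FAKTORY_URL';
  return env[provider] || 'tcp://localhost:7419';
}

// Hypothetical helper: split a connection string into the pieces that
// correspond to the host, port and password options.
function parseFaktoryUrl(connectionString) {
  const { hostname, port, password } = new URL(connectionString);
  return {
    host: hostname,
    port: port || '7419',
    password: decodeURIComponent(password) || undefined,
  };
}

const url = resolveFaktoryUrl({ FAKTORY_URL: 'tcp://:secret@faktory.example.com:7419' });
const parts = parseFaktoryUrl(url);
```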
### client.connect() ⇒ [Promise.<Client>](#Client)
Explicitly opens a connection and then closes it to test connectivity.
Under normal circumstances you don't need to call this method as all of the
communication methods will check out a connection before executing. If a connection is
not available, one will be created. This method exists to ensure connection is possible
if you need to do so. You can think of this like [sqlx#MustConnect](https://godoc.org/github.com/jmoiron/sqlx#MustConnect)
**Kind**: instance method of [Client](#Client)
**Returns**: [Promise.<Client>](#Client) - resolves when a connection is opened
### client.close() ⇒ Promise.<undefined>
Closes the connection to the server
**Kind**: instance method of [Client](#Client)
### client.job(jobtype, ...args) ⇒ [Job](#Job)
Creates a new Job object to build a job payload
**Kind**: instance method of [Client](#Client)
**Returns**: [Job](#Job) - a job builder with attached Client for PUSHing
**See**: Job
| Param | Type | Description |
| --- | --- | --- |
| jobtype | String | name of the job function |
| ...args | \* | arguments to the job function |
### client.send(...args)
Borrows a connection from the connection pool, forwards all arguments to
[Connection.send](Connection.send), and checks the connection back into the pool when
the promise returned by the wrapped function is resolved or rejected.
**Kind**: instance method of [Client](#Client)
**See**: Connection.send
| Param | Type | Description |
| --- | --- | --- |
| ...args | \* | arguments to [Connection.send](Connection.send) |
### client.fetch(...queues) ⇒ Promise.<(object\|null)>
Fetches a job payload from the server from one of ...queues
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<(object\|null)> - a job payload if one is available, otherwise null
| Param | Type | Description |
| --- | --- | --- |
| ...queues | String | list of queues to pull a job from |
### client.beat() ⇒ Promise.<string>
Sends a heartbeat for this.wid to the server
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<string> - string 'OK' when the heartbeat is accepted, otherwise
may return a state string when the server has a signal
to send this client (`quiet`, `terminate`)
### client.push(job) ⇒ Promise.<string>
Pushes a job payload to the server
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<string> - the jid for the pushed job
| Param | Type | Description |
| --- | --- | --- |
| job | [Job](#Job) \| Object | job payload to push |
### client.flush() ⇒ Promise.<string>
Sends a FLUSH to the server
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<string> - resolves with the server's response text
### client.info() ⇒ Promise.<object>
Sends an INFO command to the server
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<object> - the server's INFO response object
### client.ack(jid) ⇒ Promise.<string>
Sends an ACK to the server for a particular job ID
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<string> - the server's response text
| Param | Type | Description |
| --- | --- | --- |
| jid | String | the jid of the job to acknowledge |
### client.fail(jid, e) ⇒ Promise.<string>
Sends a FAIL command to the server for a particular job ID with error information
**Kind**: instance method of [Client](#Client)
**Returns**: Promise.<string> - the server's response text
| Param | Type | Description |
| --- | --- | --- |
| jid | String | the jid of the job to FAIL |
| e | Error | an error object that caused the job to fail |
## Job
A class wrapping a [JobPayload](external:JobPayload).
Creating and pushing a job is typically accomplished by using
a faktory client, which implements `.job` and automatically
sets the client for the job when calling `.push` on the job later.
You do not need to use this class directly.
**Kind**: global class
* [Job](#Job)
* [new Job(jobtype, [client])](#new_Job_new)
* _instance_
* [.jid](#Job+jid)
* [.queue](#Job+queue)
* [.args](#Job+args)
* [.priority](#Job+priority)
* [.retry](#Job+retry)
* [.at](#Job+at)
* [.reserveFor](#Job+reserveFor)
* [.custom](#Job+custom)
* [.toJSON()](#Job+toJSON) ⇒ object
* [.push()](#Job+push) ⇒ string
* _static_
* [.jid()](#Job.jid) ⇒ string
### new Job(jobtype, [client])
Creates a job
| Param | Type | Description |
| --- | --- | --- |
| jobtype | string | [Jobtype](external:Jobtype) string |
| [client] | [Client](#Client) | a client to use for communicating to the server (if calling push) |
**Example** *(with a faktory client)*
```js
// with a client
const client = await faktory.connect();
const job = client.job('SendWelcomeEmail', id);
```
### job.jid
sets the jid
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | string | the jid, at least 8 characters long |
### job.queue
sets the queue
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | string | queue name |
### job.args
sets the args
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | Array | array of positional arguments |
### job.priority
sets the priority of this job
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | number | 0-9 |
### job.retry
sets the retry count
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | number | the retry count, see [JobPayload](external:JobPayload) |
### job.at
sets the scheduled time
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | Date \| string | the date object or RFC3339 timestamp string |
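
As a small illustration of the accepted string form, `Date.prototype.toISOString()` yields a UTC timestamp that is valid RFC3339. The `minutesFromNow` helper is hypothetical and exists only for this example.

```js
// A fixed point in time, expressed in UTC.
const runAt = new Date(Date.UTC(2030, 0, 1, 12, 0, 0));

// toISOString() produces an RFC3339-compatible string such as
// "2030-01-01T12:00:00.000Z", suitable as a value for `at`.
const timestamp = runAt.toISOString();

// Hypothetical helper: a timestamp the given number of minutes after `now`.
function minutesFromNow(minutes, now = new Date()) {
  return new Date(now.getTime() + minutes * 60 * 1000).toISOString();
}

const inTenMinutes = minutesFromNow(10);
```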
### job.reserveFor
sets the reserveFor parameter
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type |
| --- | --- |
| value | number |
### job.custom
sets the custom object property
**Kind**: instance property of [Job](#Job)
**See**: external:JobPayload
| Param | Type | Description |
| --- | --- | --- |
| value | object | the custom data |
### job.toJSON() ⇒ object
Generates an object from this instance for transmission over the wire
**Kind**: instance method of [Job](#Job)
**Returns**: object - the job as a serializable javascript object
**See**: external:JobPayload
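
A rough sketch of what such a serializable object might look like, built from the properties documented above. The `toPayload` function is hypothetical, and the snake_case `reserve_for` key is an assumption based on the Faktory job payload format rather than something this document states.

```js
// Hypothetical sketch of a job-to-payload conversion, for illustration only.
function toPayload(job) {
  const payload = {
    jid: job.jid,
    jobtype: job.jobtype,
    queue: job.queue,
    args: job.args,
    priority: job.priority,
    retry: job.retry,
    // Dates are sent as RFC3339 strings; strings pass through unchanged.
    at: job.at instanceof Date ? job.at.toISOString() : job.at,
    // Assumption: the wire format uses snake_case for this field.
    reserve_for: job.reserveFor,
    custom: job.custom,
  };
  // A JSON round trip drops keys whose value is undefined, leaving only
  // the fields that were actually set.
  return JSON.parse(JSON.stringify(payload));
}

const payload = toPayload({
  jid: 'abcdefgh',
  jobtype: 'SendWelcomeEmail',
  args: [42],
  at: new Date(0),
});
```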
### job.push() ⇒ string
Pushes this job to the faktory server. Modifications made to the job after this
point are not persisted to the server.
**Kind**: instance method of [Job](#Job)
**Returns**: string - return of client.push(job)
### Job.jid() ⇒ string
generates a uuid
**Kind**: static method of [Job](#Job)
**Returns**: string - a uuid/v4 string
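
For reference, a uuid/v4 string can be produced with Node's built-in `crypto.randomUUID()`. This is only a sketch of the format; whether the library generates its ids this way is not stated here.

```js
const { randomUUID } = require('crypto');

// A random version 4 UUID: 36 characters including hyphens,
// comfortably longer than the minimum jid length.
const jid = randomUUID();
```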
## Mutation
A wrapper for the [Mutate API](https://github.com/contribsys/faktory/wiki/Mutate-API)
A low-level data management API to script certain repairs or migrations.
!!! Please be warned: MUTATE commands can be slow and/or resource intensive.
**They should not be used as part of your application logic.**
**Kind**: global class
* [Mutation](#Mutation)
* [new Mutation(client)](#new_Mutation_new)
* [.ofType(type)](#Mutation+ofType)
* [.withJids(...jids)](#Mutation+withJids)
* [.matching(pattern)](#Mutation+matching)
* [.clear()](#Mutation+clear)
* [.kill()](#Mutation+kill)
* [.discard()](#Mutation+discard)
* [.requeue()](#Mutation+requeue)
### new Mutation(client)
| Param | Type |
| --- | --- |
| client | [Client](#Client) |
### mutation.ofType(type)
Filters the affected jobs by a jobtype string.
Use this to ensure you're only affecting a single jobtype if applicable.
Can be chained.
Note: jobtype and other filters do not apply for the *clear* command.
**Kind**: instance method of [Mutation](#Mutation)
| Param | Type | Description |
| --- | --- | --- |
| type | string | jobtype filter for operation |
**Example**
```js
await client.dead.ofType('SendEmail').discard();
```
### mutation.withJids(...jids)
Filters the affected jobs by one or more job ids. This is much more
efficient when only one jid is provided. Can be chained.
Note: jobtype and other filters do not apply for the *clear* command.
**Kind**: instance method of [Mutation](#Mutation)
| Param | Type | Description |
| --- | --- | --- |
| ...jids | string | job ids to target for the operation |
**Example**
```js
await client.retries.withJids('1234').requeue();
```
### mutation.matching(pattern)
Filters the MUTATE selection to jobs matching a Redis SCAN pattern.
Can be chained.
Note: the regexp filter is matched against the entire job payload and can be
tricky to get right; for instance, you'll probably need `*` on both sides. The
regexp filter option is passed directly to Redis's SCAN command. Read the
[SCAN documentation](https://redis.io/commands/scan) for further details.
**Kind**: instance method of [Mutation](#Mutation)
| Param | Type | Description |
| --- | --- | --- |
| pattern | string | redis SCAN pattern to target jobs for the operation |
**Example**
```js
await client.retries.matching("*uid:12345*").kill();
```
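
The sketch below shows why `*` is usually needed on both sides: the pattern must match the whole payload, not a substring. `globToRegExp` is a hypothetical, simplified matcher that supports only `*` and `?`; Redis's own glob matching has more features and is what actually runs on the server.

```js
// Hypothetical, simplified glob matcher supporting only `*` and `?`.
function globToRegExp(pattern) {
  // Escape regular-expression metacharacters other than `*` and `?`.
  const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, '\\$&');
  // `*` matches any run of characters, `?` matches exactly one.
  const source = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
  // Anchored on both ends: the pattern has to cover the entire string.
  return new RegExp(`^${source}$`, 's');
}

// An example payload as it might be stored (field values are made up).
const stored = JSON.stringify({
  jid: 'abcdefgh',
  jobtype: 'SendEmail',
  args: ['uid:12345'],
});

const bare = globToRegExp('uid:12345').test(stored);
const wrapped = globToRegExp('*uid:12345*').test(stored);
```

Here `bare` is `false` because the pattern alone does not cover the surrounding JSON, while `wrapped` is `true`.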
### mutation.clear()
Executes a *clear* mutation. This clears the
set entirely **and any filtering added does not apply**.
**Kind**: instance method of [Mutation](#Mutation)
### mutation.kill()
Executes a *kill* mutation. Jobs that are killed are sent to the dead set.
**Kind**: instance method of [Mutation](#Mutation)
### mutation.discard()
Executes a *discard* mutation. Jobs that are discarded are permanently deleted.
**Kind**: instance method of [Mutation](#Mutation)
### mutation.requeue()
Executes a *requeue* mutation. Jobs that are requeued are sent back to their
original queue for processing.
**Kind**: instance method of [Mutation](#Mutation)
## Worker
Representation of a worker process with many concurrent job processors. Works at the
concurrency set in options during construction. Will hold at most `concurrency` jobs
in-memory while processing at any one time. Listens for signals to quiet or shutdown.
Should not be started more than once per-process, nor should more than one worker be
started per-process.
**Kind**: global class
* [Worker](#Worker)
* [new Worker([options])](#new_Worker_new)
* [.work()](#Worker+work) ⇒ Promise
* [.quiet()](#Worker+quiet)
* [.stop()](#Worker+stop) ⇒ promise
* [.beat()](#Worker+beat)
* [.use(fn)](#Worker+use) ⇒ FaktoryControl
* [.register(name, fn)](#Worker+register) ⇒ FaktoryControl
### new Worker([options])
| Param | Type | Default | Description |
| --- | --- | --- | --- |
| [options] | object | | |
| [options.wid] | String | uuid().slice(0, 8) | the wid the worker will use |
| [options.concurrency] | Number | 20 | how many jobs this worker can process at once |
| [options.shutdownTimeout] | Number | 8 | the amount of time in seconds that the worker may take to finish a job before exiting ungracefully |
| [options.beatInterval] | Number | 15 | the amount of time in seconds between each heartbeat |
| [options.queues] | Array.<string> | ['default'] | the queues this worker will fetch jobs from |
| [options.middleware] | Array.<function()> | [] | a set of koa.js-style middleware functions to run before performing each job |
| [options.registry] | Registry | Registry | the job registry to use when working |
| [options.poolSize] | Number | concurrency+2 | the client connection pool size for this worker |
**Example**
```js
const worker = new Worker({
  queues: ['critical', 'default', 'low'],
});
worker.work();
```
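
The `concurrency` option bounds how many jobs are held in memory at once. One way to picture that bound is a fixed number of runners draining a shared list. This is a sketch under that assumption, not the worker's actual fetch loop, and `processAll` is a hypothetical name.

```js
// Hypothetical sketch: run `handler` over `jobs` with at most
// `concurrency` handlers in flight. Resolves with the peak number
// of simultaneously running handlers, to make the bound observable.
async function processAll(jobs, handler, concurrency) {
  const pending = [...jobs];
  let inFlight = 0;
  let peak = 0;

  // Each runner takes the next job only after finishing its current one.
  async function runner() {
    while (pending.length > 0) {
      const job = pending.shift();
      inFlight += 1;
      peak = Math.max(peak, inFlight);
      try {
        await handler(job);
      } finally {
        inFlight -= 1;
      }
    }
  }

  await Promise.all(Array.from({ length: concurrency }, () => runner()));
  return peak;
}

// Simulated jobs that each take a few milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const peakPromise = processAll([1, 2, 3, 4, 5, 6, 7], () => sleep(5), 3);
```

In this sketch a rejected handler rejects the whole run; a real worker would instead report the failure for that job and keep going.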
### worker.work() ⇒ Promise
Starts the worker fetch loop and job processing
**Kind**: instance method of [Worker](#Worker)
**Returns**: Promise - resolves with the worker itself once working has been stopped by a signal or by a concurrent
call to stop or quiet
**See**
- Worker.quiet
- Worker.stop
### worker.quiet()
Signals to the worker to discontinue fetching new jobs and allows the worker
to continue processing any currently-running jobs
**Kind**: instance method of [Worker](#Worker)
### worker.stop() ⇒ promise
stops the worker
**Kind**: instance method of [Worker](#Worker)
**Returns**: promise - resolved when worker stops
### worker.beat()
Sends a heartbeat for this worker to the server and interprets the response state (if present)
to quiet or terminate the worker
**Kind**: instance method of [Worker](#Worker)
### worker.use(fn) ⇒ FaktoryControl
Adds a middleware function to the stack
**Kind**: instance method of [Worker](#Worker)
**Returns**: FaktoryControl - this
**See**: [koa middleware](https://github.com/koajs/koa/blob/master/docs/guide.md#writing-middleware)
| Param | Type | Description |
| --- | --- | --- |
| fn | function | koa-compose-style middleware function |
**Example**
```js
faktory.use(async (ctx, next) => {
  // a pool you created to hold database connections
  // await the pool so the middleware does not resolve before the job finishes
  await pool.use(async (conn) => {
    ctx.db = conn;
    await next();
  });
});
```
### worker.register(name, fn) ⇒ FaktoryControl
Adds a [JobFunction](external:JobFunction) to the [Registry](Registry)
**Kind**: instance method of [Worker](#Worker)
**Returns**: FaktoryControl - this
| Param | Type | Description |
| --- | --- | --- |
| name | external:Jobtype | string descriptor for the jobtype |
| fn | external:JobFunction | |
**Example**
```js
faktory.register('MyJob', (...args) => {
  // some work
});
```
## HI : object
An after-connect initial message from the server to handshake the connection
**Kind**: global external
**See**: external:HELLO
**Properties**
| Name | Type | Description |
| --- | --- | --- |
| v | number | faktory server protocol version number |
| i | number | only present when password is required. number of password hash iterations. see [HELLO](HELLO). |
| s | string | only present when password is required. salt for password hashing. see [HELLO](HELLO). |
## HELLO : object
The client's response to the server's [HI](HI) to initiate a connection
**Kind**: global external
**See**
- external:HI
- [Faktory Protocol Specification](https://github.com/contribsys/faktory/blob/master/docs/protocol-specification.md)
**Properties**
| Name | Type | Description |
| --- | --- | --- |
| v | string | the faktory client protocol version |
| hostname | string | name of the host that is running this worker |
| wid | string | globally unique identifier for this worker |
| pid | number | local process identifier for this worker on its host |
| labels | Array.<string> | labels that apply to this worker, to allow producers to target work units to worker types. |
| pwdhash | string | This field should be the hexadecimal representation of the ith SHA256 hash of the client password concatenated with the value in s. |
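
The `pwdhash` computation can be sketched with Node's built-in `crypto` module. This follows my reading of the Faktory protocol specification: the first round hashes the password concatenated with the salt `s`, and each later round hashes the raw bytes of the previous digest, for `i` rounds in total. The function name `pwdhash` and the sample values are assumptions for illustration.

```js
const { createHash } = require('crypto');

// Hypothetical helper: compute the HELLO pwdhash from the password and
// the `s` (salt) and `i` (iterations) values received in HI.
function pwdhash(password, salt, iterations) {
  // Round 1: SHA256 over password + salt.
  let digest = createHash('sha256').update(password + salt).digest();
  // Rounds 2..i: SHA256 over the raw bytes of the previous digest
  // (assumption: raw bytes, not the hex string, are re-hashed).
  for (let round = 1; round < iterations; round += 1) {
    digest = createHash('sha256').update(digest).digest();
  }
  // The server expects the hexadecimal representation.
  return digest.toString('hex');
}

// Sample values only; real values come from the server's HI message.
const hash = pwdhash('my-password', 'somesalt', 1000);
```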