# Contents ## Modules
faktory

creates faktory singletons

## Classes
Client

A client connection handle for interacting with the faktory server. Holds a pool of 1 or more underlying connections. Safe for concurrent use and tolerant of unexpected connection terminations. Use this object for all interactions with the factory server.

Job

A class wrapping a JobPayload

Creating and pushing a job is typically accomplished by using a faktory client, which implements .job and automatically sets the client for the job when calling .push on the job later.

You do not need to use this class directly.`

Mutation

A wrapper for the Mutate API

A low-level data management API to script certain repairs or migrations.

!!! Please be warned: MUTATE commands can be slow and/or resource intensive. They should not be used as part of your application logic.

Worker

Representation of a worker process with many concurrent job processors. Works at the concurrency set in options during construction. Will hold at most concurrency jobs in-memory while processing at any one time. Listens for signals to quiet or shutdown. Should not be started more than once per-process, nor should more than one worker be started per-process.

## External
HI : object

An after-connect initial message from the server to handshake the connection

HELLO : object

The client's response to the server's HI to initiate a connection

# API ## faktory creates faktory singletons * [faktory](#module_faktory) * [.use(fn)](#module_faktory+use) ⇒ FaktoryControl * [.register(name, fn)](#module_faktory+register) ⇒ FaktoryControl * [.connect(...args)](#module_faktory+connect) ⇒ [Client](#Client) * [.work(options)](#module_faktory+work) ⇒ Promise * [.stop()](#module_faktory+stop) ⇒ promise ### faktory.use(fn) ⇒ FaktoryControl Adds a middleware function to the stack **Kind**: instance method of [faktory](#module_faktory) **Returns**: FaktoryControl - this **See**: [koa middleware](https://github.com/koajs/koa/blob/master/docs/guide.md#writing-middleware) | Param | Type | Description | | --- | --- | --- | | fn | function | koa-compose-style middleware function | **Example** ```js faktory.use(async (ctx, next) => { // a pool you created to hold database connections pool.use(async (conn) => { ctx.db = conn; await next(); }); }); ``` ### faktory.register(name, fn) ⇒ FaktoryControl Adds a [JobFunction](external:JobFunction) to the [Registry](Registry) **Kind**: instance method of [faktory](#module_faktory) **Returns**: FaktoryControl - this | Param | Type | Description | | --- | --- | --- | | name | external:Jobtype | string descriptor for the jobtype | | fn | external:JobFunction | | **Example** ```js faktory.register('MyJob', (...args) => { // some work }); ``` ### faktory.connect(...args) ⇒ [Client](#Client) Creates a new [Client](#Client) **Kind**: instance method of [faktory](#module_faktory) | Param | Type | Description | | --- | --- | --- | | ...args | \* | args forwarded to [Client](#Client) | **Example** ```js const client = await faktory.connect(); await client.push(job); ``` ### faktory.work(options) ⇒ Promise Starts a worker. Resolves after the worker is started. Only call this once per-process. **Kind**: instance method of [faktory](#module_faktory) **Returns**: Promise - the [Worker.work](Worker.work) promise | Param | Type | Description | | --- | --- | --- | | options | object | options to [Worker](#Worker) | **Example** ```js // this keeps the process open and can be `await`ed faktory.work(); ``` ### faktory.stop() ⇒ promise Stops the worker previously started. **Kind**: instance method of [faktory](#module_faktory) **Returns**: promise - promise returned by [Worker.stop](Worker.stop) **Example** ```js // previously faktory.work(); faktory.stop(); ``` ## Client A client connection handle for interacting with the faktory server. Holds a pool of 1 or more underlying connections. Safe for concurrent use and tolerant of unexpected connection terminations. Use this object for all interactions with the factory server. **Kind**: global class * [Client](#Client) * [new Client([options])](#new_Client_new) * [.connect()](#Client+connect) ⇒ [Promise.<Client>](#Client) * [.close()](#Client+close) ⇒ Promise.<undefined> * [.job(jobtype, ...args)](#Client+job) ⇒ [Job](#Job) * [.send(...args)](#Client+send) * [.fetch(...queues)](#Client+fetch) ⇒ Promise.<(object\|null)> * [.beat()](#Client+beat) ⇒ Promise.<string> * [.push(job)](#Client+push) ⇒ Promise.<string> * [.flush()](#Client+flush) ⇒ Promise.<string> * [.info()](#Client+info) ⇒ Promise.<object> * [.ack(jid)](#Client+ack) ⇒ Promise.<string> * [.fail(jid, e)](#Client+fail) ⇒ Promise.<string> ### new Client([options]) Creates a Client with a connection pool | Param | Type | Default | Description | | --- | --- | --- | --- | | [options] | object | | | | [options.url] | string | "tcp://localhost:7419" | connection string for the faktory server (checks for FAKTORY_PROVIDER and FAKTORY_URL) | | [options.host] | string | "localhost" | host string to connect to | | [options.port] | number \| string | 7419 | port to connect to faktory server on | | [options.password] | string | | faktory server password to use during HELLO | | [options.wid] | string | | optional wid that should be provided to the server (only necessary for a worker process consuming jobs) | | [options.labels] | Array.<string> | [] | optional labels to provide the faktory server for this client | | [options.poolSize] | number | 10 | the maxmimum size of the connection pool | **Example** ```js const client = new Client(); const job = await client.fetch('default'); ``` ### client.connect() ⇒ [Promise.<Client>](#Client) Explicitly opens a connection and then closes it to test connectivity. Under normal circumstances you don't need to call this method as all of the communication methods will check out a connection before executing. If a connection is not available, one will be created. This method exists to ensure connection is possible if you need to do so. You can think of this like [sqlx#MustConnect](https://godoc.org/github.com/jmoiron/sqlx#MustConnect) **Kind**: instance method of [Client](#Client) **Returns**: [Promise.<Client>](#Client) - resolves when a connection is opened ### client.close() ⇒ Promise.<undefined> Closes the connection to the server **Kind**: instance method of [Client](#Client) ### client.job(jobtype, ...args) ⇒ [Job](#Job) Creates a new Job object to build a job payload **Kind**: instance method of [Client](#Client) **Returns**: [Job](#Job) - a job builder with attached Client for PUSHing **See**: Job | Param | Type | Description | | --- | --- | --- | | jobtype | String | name of the job function | | ...args | \* | arguments to the job function | ### client.send(...args) Borrows a connection from the connection pool, forwards all arguments to [Connection.send](Connection.send), and checks the connection back into the pool when the promise returned by the wrapped function is resolved or rejected. **Kind**: instance method of [Client](#Client) **See**: Connection.send | Param | Type | Description | | --- | --- | --- | | ...args | \* | arguments to [Connection.send](Connection.send) | ### client.fetch(...queues) ⇒ Promise.<(object\|null)> Fetches a job payload from the server from one of ...queues **Kind**: instance method of [Client](#Client) **Returns**: Promise.<(object\|null)> - a job payload if one is available, otherwise null | Param | Type | Description | | --- | --- | --- | | ...queues | String | list of queues to pull a job from | ### client.beat() ⇒ Promise.<string> Sends a heartbeat for this.wid to the server **Kind**: instance method of [Client](#Client) **Returns**: Promise.<string> - string 'OK' when the heartbeat is accepted, otherwise may return a state string when the server has a signal to send this client (`quiet`, `terminate`) ### client.push(job) ⇒ Promise.<string> Pushes a job payload to the server **Kind**: instance method of [Client](#Client) **Returns**: Promise.<string> - the jid for the pushed job | Param | Type | Description | | --- | --- | --- | | job | [Job](#Job) \| Object | job payload to push | ### client.flush() ⇒ Promise.<string> Sends a FLUSH to the server **Kind**: instance method of [Client](#Client) **Returns**: Promise.<string> - resolves with the server's response text ### client.info() ⇒ Promise.<object> Sends an INFO command to the server **Kind**: instance method of [Client](#Client) **Returns**: Promise.<object> - the server's INFO response object ### client.ack(jid) ⇒ Promise.<string> Sends an ACK to the server for a particular job ID **Kind**: instance method of [Client](#Client) **Returns**: Promise.<string> - the server's response text | Param | Type | Description | | --- | --- | --- | | jid | String | the jid of the job to acknowledge | ### client.fail(jid, e) ⇒ Promise.<string> Sends a FAIL command to the server for a particular job ID with error information **Kind**: instance method of [Client](#Client) **Returns**: Promise.<string> - the server's response text | Param | Type | Description | | --- | --- | --- | | jid | String | the jid of the job to FAIL | | e | Error | an error object that caused the job to fail | ## Job A class wrapping a [JobPayload](external:JobPayload) Creating and pushing a job is typically accomplished by using a faktory client, which implements `.job` and automatically sets the client for the job when calling `.push` on the job later. You do not need to use this class directly.` **Kind**: global class * [Job](#Job) * [new Job(jobtype, [client])](#new_Job_new) * _instance_ * [.jid](#Job+jid) * [.queue](#Job+queue) * [.args](#Job+args) * [.priority](#Job+priority) * [.retry](#Job+retry) * [.at](#Job+at) * [.reserveFor](#Job+reserveFor) * [.custom](#Job+custom) * [.toJSON()](#Job+toJSON) ⇒ object * [.push()](#Job+push) ⇒ string * _static_ * [.jid()](#Job.jid) ⇒ string ### new Job(jobtype, [client]) Creates a job | Param | Type | Description | | --- | --- | --- | | jobtype | string | [Jobtype](external:Jobtype) string | | [client] | [Client](#Client) | a client to use for communicating to the server (if calling push) | **Example** *(with a faktory client)* ```js // with a client const client = await faktory.connect(); const job = client.job('SendWelcomeEmail', id); ``` ### job.jid sets the jid **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | string | the >8 length jid | ### job.queue sets the queue **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | string | queue name | ### job.args sets the args **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | Array | array of positional arguments | ### job.priority sets the priority of this job **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | number | 0-9 | ### job.retry sets the retry count **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | number | {@see external:JobPayload} | ### job.at sets the scheduled time **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | Date \| string | the date object or RFC3339 timestamp string | ### job.reserveFor sets the reserveFor parameter **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | | --- | --- | | value | number | ### job.custom sets the custom object property **Kind**: instance property of [Job](#Job) **See**: external:JobPayload | Param | Type | Description | | --- | --- | --- | | value | object | the custom data | ### job.toJSON() ⇒ object Generates an object from this instance for transmission over the wire **Kind**: instance method of [Job](#Job) **Returns**: object - the job as a serializable javascript object **Link**: external:JobPayload|JobPayload} **See**: external:JobPayload ### job.push() ⇒ string Pushes this job to the faktory server. Modifications after this point are not persistable to the server **Kind**: instance method of [Job](#Job) **Returns**: string - return of client.push(job) ### Job.jid() ⇒ string generates a uuid **Kind**: static method of [Job](#Job) **Returns**: string - a uuid/v4 string ## Mutation A wrapper for the [Mutate API](https://github.com/contribsys/faktory/wiki/Mutate-API) A low-level data management API to script certain repairs or migrations. !!! Please be warned: MUTATE commands can be slow and/or resource intensive. **They should not be used as part of your application logic.** **Kind**: global class * [Mutation](#Mutation) * [new Mutation(client)](#new_Mutation_new) * [.ofType(type)](#Mutation+ofType) * [.withJids(...jids)](#Mutation+withJids) * [.matching(pattern)](#Mutation+matching) * [.clear()](#Mutation+clear) * [.kill()](#Mutation+kill) * [.discard()](#Mutation+discard) * [.requeue()](#Mutation+requeue) ### new Mutation(client) | Param | Type | | --- | --- | | client | [Client](#Client) | ### mutation.ofType(type) Filters the affected jobs by a jobtype string. Use this to ensure you're only affecting a single jobtype if applicable. Can be chained. Note: jobtype and other filters do not apply for the *clear* command. **Kind**: instance method of [Mutation](#Mutation) | Param | Type | Description | | --- | --- | --- | | type | string | jobtype fiter for operation | **Example** ```js client.dead.ofType('SendEmail').discard(); ``` ### mutation.withJids(...jids) Filters the affected jobs by one or more job ids. This is much more efficient when only one jid is provided. Can be chained. Note: jobtype and other filters do not apply for the *clear* command. **Kind**: instance method of [Mutation](#Mutation) | Param | Type | Description | | --- | --- | --- | | ...jids | string | job ids to target for the operation | **Example** ```js await client.retries.withJids('1234').requeue(); ``` ### mutation.matching(pattern) Filters the MUTATE selection to jobs matching a Redis SCAN pattern. Can be chained. Note the regexp filter scans the entire job payload and can be tricky to get right, for instance you'll probably need * on both sides. The regexp filter option is passed to Redis's SCAN command directly, read the SCAN documentation for further details. https://redis.io/commands/scan **Kind**: instance method of [Mutation](#Mutation) | Param | Type | Description | | --- | --- | --- | | pattern | string | redis SCAN pattern to target jobs for the operation | **Example** ```js await client.retries.matching("*uid:12345*").kill(); ``` ### mutation.clear() Executes a *clear* mutation. This clears the set entirely **and any filtering added does not apply**. **Kind**: instance method of [Mutation](#Mutation) ### mutation.kill() Executes a *kill* mutation. Jobs that are killed are sent to the dead set. **Kind**: instance method of [Mutation](#Mutation) ### mutation.discard() Executes a *discard* mutation. Jobs that are discarded are permanently deleted. **Kind**: instance method of [Mutation](#Mutation) ### mutation.requeue() Executes a *requeue* mutation. Jobs that are requeued are sent back to their original queue for processing. **Kind**: instance method of [Mutation](#Mutation) ## Worker Representation of a worker process with many concurrent job processors. Works at the concurrency set in options during construction. Will hold at most `concurrency` jobs in-memory while processing at any one time. Listens for signals to quiet or shutdown. Should not be started more than once per-process, nor should more than one worker be started per-process. **Kind**: global class * [Worker](#Worker) * [new Worker([options])](#new_Worker_new) * [.work()](#Worker+work) ⇒ * [.quiet()](#Worker+quiet) * [.stop()](#Worker+stop) ⇒ promise * [.beat()](#Worker+beat) * [.use(fn)](#Worker+use) ⇒ FaktoryControl * [.register(name, fn)](#Worker+register) ⇒ FaktoryControl ### new Worker([options]) | Param | Type | Default | Description | | --- | --- | --- | --- | | [options] | object | | | | [options.wid] | String | uuid().slice(0, 8) | the wid the worker will use | | [options.concurrency] | Number | 20 | how many jobs this worker can process at once | | [options.shutdownTimeout] | Number | 8 | the amount of time in seconds that the worker may take to finish a job before exiting ungracefully | | [options.beatInterval] | Number | 15 | the amount of time in seconds between each heartbeat | | [options.queues] | Array.<string> | ['default'] | the queues this worker will fetch jobs from | | [options.middleware] | Array.<function()> | [] | a set of middleware to run before performing each job in koa.js-style middleware execution signature | | [options.registry] | Registry | Registry | the job registry to use when working | | [options.poolSize] | Number | concurrency+2 | the client connection pool size for this worker | **Example** ```js const worker = new Worker({ queues: ['critical', 'default', 'low'], }); worker.work(); ``` ### worker.work() ⇒ starts the worker fetch loop and job processing **Kind**: instance method of [Worker](#Worker) **Returns**: self, when working has been stopped by a signal or concurrent call to stop or quiet **See** - Worker.quiet - Worker.stop ### worker.quiet() Signals to the worker to discontinue fetching new jobs and allows the worker to continue processing any currently-running jobs **Kind**: instance method of [Worker](#Worker) ### worker.stop() ⇒ promise stops the worker **Kind**: instance method of [Worker](#Worker) **Returns**: promise - resolved when worker stops ### worker.beat() Sends a heartbeat for this server and interprets the response state (if present) to quiet or terminate the worker **Kind**: instance method of [Worker](#Worker) ### worker.use(fn) ⇒ FaktoryControl Adds a middleware function to the stack **Kind**: instance method of [Worker](#Worker) **Returns**: FaktoryControl - this **See**: [koa middleware](https://github.com/koajs/koa/blob/master/docs/guide.md#writing-middleware) | Param | Type | Description | | --- | --- | --- | | fn | function | koa-compose-style middleware function | **Example** ```js faktory.use(async (ctx, next) => { // a pool you created to hold database connections pool.use(async (conn) => { ctx.db = conn; await next(); }); }); ``` ### worker.register(name, fn) ⇒ FaktoryControl Adds a [JobFunction](external:JobFunction) to the [Registry](Registry) **Kind**: instance method of [Worker](#Worker) **Returns**: FaktoryControl - this | Param | Type | Description | | --- | --- | --- | | name | external:Jobtype | string descriptor for the jobtype | | fn | external:JobFunction | | **Example** ```js faktory.register('MyJob', (...args) => { // some work }); ``` ## HI : object An after-connect initial message from the server to handshake the connection **Kind**: global external **See**: external:HELLO **Properties** | Name | Type | Description | | --- | --- | --- | | v | number | faktory server protocol version number | | i | number | only present when password is required. number of password hash iterations. see [HELLO](HELLO). | | s | string | only present when password is required. salt for password hashing. see [HELLO](HELLO). | ## HELLO : object The client's response to the server's [HI](HI) to initiate a connection **Kind**: global external **See** - external:HI - [Faktory Protocol Specification](https://github.com/contribsys/faktory/blob/master/docs/protocol-specification.md) **Properties** | Name | Type | Description | | --- | --- | --- | | v | string | the faktory client protocol version | | hostname | string | name of the host that is running this worker | | wid | string | globally unique identifier for this worker | | pid | number | local process identifier for this worker on its host | | labels | Array.<string> | labels that apply to this worker, to allow producers to target work units to worker types. | | pwdhash | string | This field should be the hexadecimal representation of the ith SHA256 hash of the client password concatenated with the value in s. |