Operators

call>: Calls another workflow

call>: operator calls another workflow.

This operator embeds another workflow as a subtask.

# workflow1.dig
+step1:
  call>: another_workflow
# another_workflow.dig
+step2:
  sh>: tasks/step2.sh
call>: NAME

Name of a workflow.

Example: another_workflow

require>: Depends on another workflow

require>: operator runs another workflow. Unlike the call> operator, the workflow is skipped if it has already completed successfully for the same session time.

This operator submits a new session to digdag.

# workflow1.dig
+step1:
  require>: another_workflow
# another_workflow.dig
+step2:
  sh>: tasks/step2.sh
require>: NAME

Name of a workflow.

Example: another_workflow
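
For example, a minimal sketch (the workflow names monthly_job and daily_load are hypothetical) where one workflow depends on another; if daily_load has already completed successfully for the same session time, the +wait_for_daily task is skipped:

# monthly_job.dig
+wait_for_daily:
  require>: daily_load

+aggregate:
  sh>: tasks/aggregate.sh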

py>: Python scripts

py>: operator runs a Python script using the python command.

See the Python API documents for details, including how variables are mapped to keyword arguments.

+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2
py>: [PACKAGE.CLASS.]METHOD

Name of a method to run.

  • py>: tasks.MyWorkflow.my_task
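
As a minimal sketch (the exported key my_param is hypothetical), variables exported in the workflow become available to the called method as keyword arguments, as described in the Python API documents:

_export:
  my_param: hello

+step1:
  py>: tasks.MyWorkflow.my_task
  # my_task can accept my_param as a keyword argument and receives "hello"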

rb>: Ruby scripts

rb>: operator runs a Ruby script using the ruby command.

See the Ruby API documents for details, including best practices for configuring the workflow using require: under _export:.

_export:
  ruby:
    require: tasks/my_workflow

+step1:
  rb>: my_step1_method
+step2:
  rb>: Task::MyWorkflow.step2
rb>: [MODULE::CLASS.]METHOD

Name of a method to run.

  • rb>: Task::MyWorkflow.my_task
require: FILE

Name of a file to require.

  • require: tasks/my_workflow

sh>: Shell scripts

sh>: operator runs a shell script.

Run a shell command (/bin/sh)

+step1:
  sh>: echo "hello world"

Run a shell script

+step1:
  sh>: tasks/step1.sh
+step2:
  sh>: tasks/step2.sh
sh>: COMMAND [ARGS...]

Name of the command to run.

  • sh>: tasks/workflow.sh --task1

The shell defaults to /bin/sh. If an alternate shell such as zsh is desired, use the shell option in the _export section.

_export:
  sh:
    shell: [/usr/bin/zsh]

loop>: Repeat tasks

loop>: operator runs subtasks multiple times.

This operator exports the ${i} variable to the subtasks. Its value begins at 0. For example, if count is 3, tasks run with i=0, i=1, and i=2.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

+repeat:
  loop>: 7
  _do:
    +step1:
      sh>: echo ${new Date((session_unixtime + i * 60 * 60 * 24) * 1000).toLocaleDateString()} is ${i} days later than ${session_date}
    +step2:
      sh>: echo ${
            new Date((session_unixtime + i * 60 * 60) * 1000).toLocaleDateString()
            + " "
            + new Date((session_unixtime + i * 60 * 60) * 1000).toLocaleTimeString()
        } is ${i} hours later than ${session_local_time}
loop>: COUNT

Number of times to run the tasks.

  • loop>: 7
_parallel: BOOLEAN

Runs the repeating tasks in parallel.

  • _parallel: true
_do: TASKS
Tasks to run.
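
For example, a sketch combining these parameters; the three iterations run in parallel, each with its own value of ${i}:

+repeat_in_parallel:
  loop>: 3
  _parallel: true
  _do:
    +step:
      sh>: echo this is iteration ${i}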

for_each>: Repeat tasks

for_each>: operator runs subtasks multiple times using sets of variables.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

+repeat:
  for_each>:
    fruit: [apple, orange]
    verb: [eat, throw]
  _do:
    sh>: echo ${verb} ${fruit}
    # this will generate 4 tasks:
    #  +for-fruit=apple&verb=eat:
    #    sh>: echo eat apple
    #  +for-fruit=apple&verb=throw:
    #    sh>: echo throw apple
    #  +for-fruit=orange&verb=eat:
    #    sh>: echo eat orange
    #  +for-fruit=orange&verb=throw:
    #    sh>: echo throw orange
for_each>: VARIABLES

Variables used for the loop in key: [value, value, ...] syntax.

  • for_each>: {i: [1, 2, 3]}
_parallel: BOOLEAN

Runs the repeating tasks in parallel.

  • _parallel: true
_do: TASKS
Tasks to run.

if>: Conditional execution

if>: operator runs subtasks if true is given.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

+run_if_param_is_true:
  if>: ${param}
  _do:
    sh>: echo ${param} == true
if>: BOOLEAN
true or false.
_do: TASKS
Tasks to run if true is given.

fail>: Make the workflow fail

fail>: operator always fails and makes the workflow fail.

(This operator is EXPERIMENTAL. Parameters may change in a future release)

This operator is useful when used with the if> operator and a _check directive to validate the results of a previous task, so that the workflow fails when the validation doesn't pass.

+fail_if_too_few:
  if>: ${count < 10}
  _do:
    fail>: count is less than 10!
fail>: STRING
Message that the _error task can refer to using the ${error.message} syntax.
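
For example, a sketch that combines this pattern with the td> operator's store_last_results option (described below); the query file and the cnt column are hypothetical:

+count_records:
  td>: queries/count_records.sql
  store_last_results: true
  _check:
    +validate:
      if>: ${td.last_results.cnt < 10}
      _do:
        fail>: count is less than 10!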

td>: Treasure Data queries

td>: operator runs a Hive or Presto query on Treasure Data.

TODO: add more description here

_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+step1:
  td>: queries/step1.sql
+step2:
  td>: queries/step2.sql
  create_table: mytable_${session_date_compact}
+step3:
  td>: queries/step2.sql
  insert_into: mytable

Parameters

td>: FILE.sql

Path to a query template file. This file can contain ${...} syntax to embed variables.

  • td>: queries/step1.sql
create_table: NAME

Name of a table to create from the results. This option deletes the table if it already exists.

This option adds DROP TABLE IF EXISTS; CREATE TABLE AS (Presto) or INSERT OVERWRITE (Hive) commands before the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the commands are inserted there.

  • create_table: my_table
insert_into: NAME

Name of a table to append results into. The table is created if it does not already exist.

This option adds an INSERT INTO (Presto) or INSERT INTO TABLE (Hive) command at the beginning of the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the command is inserted there.

  • insert_into: my_table
download_file: NAME

Saves query result as a local CSV file.

  • download_file: output.csv
store_last_results: BOOLEAN

Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, use the ${td.last_results.my_count} syntax.

  • store_last_results: true
preview: BOOLEAN

Tries to show some of the query results so that you can confirm them.

  • preview: true
result_url: NAME

Outputs the query results to the given URL:

  • result_url: tableau://username:password@my.tableauserver.com/?mode=replace
database: NAME

Name of a database.

  • database: my_db
apikey: APIKEY

API key. You can set this on the command line using the -p td.apikey=$TD_APIKEY argument.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).
engine: presto

Query engine (presto or hive).

  • engine: hive
  • engine: presto
priority: 0
Set the job priority, from -2 (VERY LOW) to 2 (VERY HIGH) (default: 0 (NORMAL)).

Output parameters

td.last_job_id

The ID of the job that this task executed.

  • 52036074
td.last_results

The first row of the query results as a map. This is available only when store_last_results: true is set.

  • {"path":"/index.html","count":1}
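
For example, a sketch that refers to these output parameters in a following task (the query file is hypothetical):

+aggregate:
  td>: queries/aggregate.sql
  insert_into: summary_table

+notify:
  sh>: echo "query finished as Treasure Data job ${td.last_job_id}"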

td_run>: Treasure Data saved queries

td_run>: operator runs a query saved on Treasure Data.

TODO: add more description here

_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+step1:
  td_run>: myquery1
+step2:
  td_run>: myquery2
  session_time: 2016-01-01T01:01:01+0000

Parameters

td_run>: NAME

Name of a saved query.

  • td_run>: my_query
download_file: NAME

Saves query result as a local CSV file.

  • download_file: output.csv
store_last_results: BOOLEAN

Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, use the ${td.last_results.my_count} syntax.

  • store_last_results: true
preview: BOOLEAN

Tries to show some of the query results so that you can confirm them.

  • preview: true
apikey: APIKEY

API key. You can set this on the command line using the -p td.apikey=$TD_APIKEY argument.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).

Output parameters

td.last_job_id

The ID of the job that this task executed.

  • 52036074
td.last_results

The first row of the query results as a map. This is available only when store_last_results: true is set.

  • {"path":"/index.html","count":1}

td_load>: Treasure Data bulk loading

td_load>: operator loads data into Treasure Data from storages, databases, or services.

TODO: add more description here

_export:
  td:
    apikey: YOUR/API_KEY

+step1:
  td_load>: config/guessed.yml
  database: prod
  table: raw
td_load>: FILE.yml

Path to a YAML template file. This configuration needs to be guessed using the td command.

  • td_load>: imports/load.yml
database: NAME

Name of the database to load data into.

  • database: my_database
table: NAME

Name of the table to load data into.

  • table: my_table
apikey: APIKEY

API key. You can set this on the command line using the -p td.apikey=$TD_APIKEY argument.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).

Output parameters

td.last_job_id

The ID of the job that this task executed.

  • 52036074

td_ddl>: Treasure Data operations

The td_ddl operator, written as _type: td_ddl, runs an operational task on Treasure Data.

TODO: add more description here

_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+step1:
  _type: td_ddl
  create_tables: ["my_table_${session_date_compact}"]
+step2:
  _type: td_ddl
  drop_tables: ["my_table_${session_date_compact}"]
+step3:
  _type: td_ddl
  empty_tables: ["my_table_${session_date_compact}"]
create_tables: [ARRAY OF NAMES]

Create new tables if they do not already exist.

  • create_tables: [my_table1, my_table2]
empty_tables: [ARRAY OF NAMES]

Create new tables (dropping them first if they already exist).

  • empty_tables: [my_table1, my_table2]
drop_tables: [ARRAY OF NAMES]

Drop tables if they exist.

  • drop_tables: [my_table1, my_table2]
create_databases: [ARRAY OF NAMES]

Create new databases if they do not already exist.

  • create_databases: [my_database1, my_database2]
empty_databases: [ARRAY OF NAMES]

Create new databases (dropping them first if they already exist).

  • empty_databases: [my_database1, my_database2]
drop_databases: [ARRAY OF NAMES]

Drop databases if they exist.

  • drop_databases: [my_database1, my_database2]
apikey: APIKEY

API key. You can set this on the command line using the -p td.apikey=$TD_APIKEY argument.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).

td_table_export>: Treasure Data table export to S3

td_table_export>: operator exports data from a Treasure Data table to Amazon S3.

TODO: add more description here

_export:
  td:
    apikey: YOUR/API_KEY

+step1:
  _type: td_table_export
  database: mydb
  table: mytable
  file_format: jsonl.gz
  from: 2016-01-01 00:00:00 +0800
  to:   2016-02-01 00:00:00 +0800
  s3_bucket: my_backup_bucket
  s3_path_prefix: mydb/mytable
  s3_access_key_id: ABCDEFGHJKLMNOPQRSTU
  s3_secret_access_key: QUtJ/QUpJWTQ3UkhZTERNUExTUEEQUtJQUpJWTQ3
database: NAME

Name of the database.

  • database: my_database
table: NAME

Name of the table to export.

  • table: my_table
file_format: TYPE

Output file format. Available formats are tsv.gz, jsonl.gz, json.gz, json-line.gz.

  • file_format: jsonl.gz
from: yyyy-MM-dd HH:mm:ss[ Z]

Export records from this time (inclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.

  • from: 2016-01-01 00:00:00 +0800
to: yyyy-MM-dd HH:mm:ss[ Z]

Export records to this time (exclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.

  • to: 2016-02-01 00:00:00 +0800
s3_bucket: NAME

S3 bucket name to export records to.

  • s3_bucket: my_backup_bucket
s3_path_prefix: NAME

S3 file name prefix.

  • s3_path_prefix: mytable/mydb
s3_access_key_id: KEY

S3 access key id.

  • s3_access_key_id: ABCDEFGHJKLMNOPQRSTU
s3_secret_access_key: KEY

S3 secret access key.

  • s3_secret_access_key: QUtJ/QUpJWTQ3UkhZTERNUExTUEEQUtJQUpJWTQ3
apikey: APIKEY

API key. You can set this on the command line using the -p td.apikey=$TD_APIKEY argument.

  • apikey: 992314/abcdef0123456789abcdef0123456789
endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).

Output parameters

td.last_job_id

The ID of the job that this task executed.

  • 52036074

pg>: PostgreSQL operations

pg>: operator runs queries and/or DDLs on PostgreSQL.

_export:
  pg:
    host: 192.0.2.1
    port: 5430
    database: production_db
    user: app_user
    password: 1qazxsw23edcvfr4
    ssl: true

+replace_deduplicated_master_table:
  pg>: queries/dedup_master_table.sql
  create_table: dedup_master

+prepare_summary_table:
  pg>: queries/create_summary_table_ddl.sql

+insert_to_summary_table:
  pg>: queries/join_log_with_master.sql
  insert_into: summary_table
pg>: FILE.sql

Path of the query template file. This file can contain ${...} syntax to embed variables.

  • pg>: queries/complex_queries.sql
create_table: NAME

Table name to create from the results. This option deletes the table if it already exists.

This option adds DROP TABLE IF EXISTS; CREATE TABLE AS before the statements written in the query template file. Alternatively, a CREATE TABLE statement can be written in the query template file itself instead of using this option.

  • create_table: dest_table
insert_into: NAME

Table name to append results into.

This option adds INSERT INTO before the statements written in the query template file. Alternatively, an INSERT INTO statement can be written in the query template file itself instead of using this option.

  • insert_into: dest_table
download_file: NAME

Local CSV file name to save the results to. The file contains the result of the query.

  • download_file: output.csv
database: NAME

Database name.

  • database: my_db
host: NAME

Hostname or IP address of the database.

  • host: db.foobar.com
port: NUMBER

Port number to connect to the database (default: 5432).

  • port: 2345
user: NAME

User name to connect to the database.

  • user: app_user
password: NAME

User password to connect to the database (default: empty).

  • password: 12345678iuytrewq
ssl: BOOLEAN

Enable SSL to connect to the database (default: false).

  • ssl: true
schema: NAME

Default schema name (default: public).

  • schema: my_schema
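
For example, a sketch (the file names are hypothetical) that downloads query results to a local CSV file and processes it in a following task:

+export_summary:
  pg>: queries/summary.sql
  download_file: summary.csv

+process_summary:
  sh>: scripts/process_summary.sh summary.csv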

TODO: Add some other commands

mail>: Sending email

mail>: operator sends an email.

To use the Gmail SMTP server, you need to do one of the following:

  1. Generate a new app password at App passwords. This requires enabling 2-Step Verification first.
  2. Enable access for less secure apps at Less secure apps. This works even if 2-Step Verification is not enabled.
_export:
  mail:
    host: smtp.gmail.com
    port: 587
    from: "you@gmail.com"
    username: "you@gmail.com"
    password: "...password..."
    debug: true

+step1:
  mail>: body.txt
  subject: workflow started
  to: [me@example.com]

+step2:
  _type: mail
  body: this is email body in string
  subject: workflow started
  to: [me@example.com]

+step3:
  sh>: this_task_might_fail.sh
  _error:
    mail>: body.txt
    subject: this workflow failed
    to: [me@example.com]
mail>: FILE

Path to a mail body template file. This file can contain ${...} syntax to embed variables.

  • mail>: mail_body.txt
subject: SUBJECT

Subject of the email.

  • subject: Mail From Digdag
body: TEXT

Email body if the template file path is not set.

  • body: Hello, this is from Digdag
to: [ADDR1, ADDR2, ...]

To addresses.

  • to: [analyst@example.com]
from: ADDR

From address.

  • from: admin@example.com
host: NAME

SMTP host name.

  • host: smtp.gmail.com
port: NAME

SMTP port number.

  • port: 587
username: NAME

SMTP login username if authentication is required.

  • username: me
password: PASSWORD

SMTP login password.

  • password: MyPaSsWoRd
tls: BOOLEAN

Enables TLS handshake.

  • tls: true
ssl: BOOLEAN

Enables legacy SSL encryption.

  • ssl: false
html: BOOLEAN

Uses HTML mail (default: false).

  • html: true
debug: BOOLEAN

Shows debug logs (default: false).

  • debug: false
attach_files: ARRAY

Files to attach. Each element is an object with:

  • path: FILE: Path to a file to attach.
  • content_type: Content-Type of this file. Default is application/octet-stream.
  • filename: Name of this file. Default is base name of the path.

Example:

attach_files:
  - path: data.csv
  - path: output.dat
    filename: workflow_result_data.csv
  - path: images/image1.png
    content_type: image/png

embulk>: Embulk data transfer

embulk>: operator runs Embulk to transfer data across storages including local files.

+load:
  embulk>: data/load.yml
embulk>: FILE.yml

Path to a configuration template file.

  • embulk>: embulk/mysql_to_csv.yml
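
As a rough sketch (the file layout and plugin options are illustrative only; see the Embulk documentation for real configurations), the configuration template is a normal Embulk YAML file and, like other template files, can embed ${...} variables:

# data/load.yml
in:
  type: file
  path_prefix: data/${session_date}/sample_
  parser:
    type: csv
out:
  type: stdout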