Operators¶
- call>: Calls another workflow
- require>: Depends on another workflow
- py>: Python scripts
- rb>: Ruby scripts
- sh>: Shell scripts
- loop>: Repeat tasks
- for_each>: Repeat tasks
- if>: Conditional execution
- fail>: Make the workflow fail
- td>: Treasure Data queries
- td_run>: Treasure Data saved queries
- td_load>: Treasure Data bulk loading
- td_ddl>: Treasure Data operations
- td_table_export>: Treasure Data table export to S3
- pg>: PostgreSQL operations
- mail>: Sending email
- embulk>: Embulk data transfer
call>: Calls another workflow¶
call>: operator calls another workflow.
This operator embeds another workflow as a subtask.
# workflow1.dig
+step1:
  call>: another_workflow

# another_workflow.dig
+step2:
  sh>: tasks/step2.sh
- call>: NAME
Name of a workflow.
Example: another_workflow
require>: Depends on another workflow¶
require>: operator runs another workflow. Unlike the call> operator, the workflow is skipped if it has already finished successfully for the same session time.
This operator submits a new session to digdag.
# workflow1.dig
+step1:
  require>: another_workflow

# another_workflow.dig
+step2:
  sh>: tasks/step2.sh
- require>: NAME
Name of a workflow.
Example: another_workflow
py>: Python scripts¶
py>: operator runs a Python script using the python command.
See Python API documents for details including variable mappings to keyword arguments.
+step1:
  py>: my_step1_method
+step2:
  py>: tasks.MyWorkflow.step2
- py>: [PACKAGE.CLASS.]METHOD
Name of a method to run.
- py>: tasks.MyWorkflow.my_task
rb>: Ruby scripts¶
rb>: operator runs a Ruby script using the ruby command.
See Ruby API documents for details, including best practices for configuring the workflow using _export: require:.
_export:
  ruby:
    require: tasks/my_workflow

+step1:
  rb>: my_step1_method
+step2:
  rb>: Task::MyWorkflow.step2
- rb>: [MODULE::CLASS.]METHOD
Name of a method to run.
- rb>: Task::MyWorkflow.my_task
- require: FILE
Name of a file to require.
- require: task/my_workflow
sh>: Shell scripts¶
sh>: operator runs a shell script.
Run a shell command (/bin/sh)
+step1:
  sh>: echo "hello world"
Run a shell script
+step1:
  sh>: tasks/step1.sh
+step2:
  sh>: tasks/step2.sh
- sh>: COMMAND [ARGS...]
Name of the command to run.
- sh>: tasks/workflow.sh --task1
The shell defaults to /bin/sh. If an alternate shell such as zsh is desired, use the shell option in the _export section.
_export:
  sh:
    shell: [/usr/bin/zsh]
loop>: Repeat tasks¶
loop>: operator runs subtasks multiple times.
This operator exports ${i} variable for the subtasks. Its value begins from 0. For example, if count is 3, a task runs with i=0, i=1, and i=2.
(This operator is EXPERIMENTAL. Parameters may change in a future release)
+repeat:
  loop>: 7
  _do:
    +step1:
      sh>: echo ${new Date((session_unixtime + i * 60 * 60 * 24) * 1000).toLocaleDateString()} is ${i} days later than ${session_date}
    +step2:
      sh>: echo ${
             new Date((session_unixtime + i * 60 * 60) * 1000).toLocaleDateString()
             + " "
             + new Date((session_unixtime + i * 60 * 60) * 1000).toLocaleTimeString()
           } is ${i} hours later than ${session_local_time}
- loop>: COUNT
Number of times to run the tasks.
- loop>: 7
- _parallel: BOOLEAN
Runs the repeating tasks in parallel (see the sketch after this list).
- _parallel: true
- _do: TASKS
Tasks to run.
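For instance, a minimal sketch of running loop iterations in parallel (the task name and echo command are illustrative):

+repeat_in_parallel:
  loop>: 3
  _parallel: true
  _do:
    +step1:
      # each iteration receives its own ${i} value (0, 1, 2)
      sh>: echo this is iteration ${i}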
for_each>: Repeat tasks¶
for_each>: operator runs subtasks multiple times using sets of variables.
(This operator is EXPERIMENTAL. Parameters may change in a future release)
+repeat:
  for_each>:
    fruit: [apple, orange]
    verb: [eat, throw]
  _do:
    sh>: echo ${verb} ${fruit}
# this will generate 4 tasks:
# +for-fruit=apple&verb=eat:
# sh>: echo eat apple
# +for-fruit=apple&verb=throw:
# sh>: echo throw apple
# +for-fruit=orange&verb=eat:
# sh>: echo eat orange
# +for-fruit=orange&verb=throw:
# sh>: echo throw orange
- for_each>: VARIABLES
Variables used for the loop in key: [value, value, ...] syntax.
- for_each>: {i: [1, 2, 3]}
- _parallel: BOOLEAN
Runs the repeating tasks in parallel.
- _parallel: true
- _do: TASKS
Tasks to run.
if>: Conditional execution¶
if>: operator runs subtasks if true is given.
(This operator is EXPERIMENTAL. Parameters may change in a future release)
+run_if_param_is_true:
  if>: ${param}
  _do:
    sh>: echo ${param} == true
- if>: BOOLEAN
true or false.
- _do: TASKS
Tasks to run if true is given.
fail>: Make the workflow fail¶
fail>: operator always fails and makes the workflow fail.
(This operator is EXPERIMENTAL. Parameters may change in a future release)
This operator is useful in combination with the if> operator to validate the results of a previous task with the _check directive, so that the workflow fails when the validation does not pass (a complete sketch appears at the end of this section).
+fail_if_too_few:
  if>: ${count < 10}
  _do:
    fail>: count is less than 10!
- fail>: STRING
Message so that the _error task can refer to the message using the ${error.message} syntax.
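Putting the pieces together, here is a minimal sketch of the _check pattern described above. The query file queries/count_rows.sql and its cnt column are hypothetical; the query is assumed to return a single row whose cnt value is stored via store_last_results:

+count_rows:
  td>: queries/count_rows.sql
  store_last_results: true
  _check:
    # runs after +count_rows succeeds; fails the workflow if the count is too small
    +validate_count:
      if>: ${td.last_results.cnt < 10}
      _do:
        fail>: count is less than 10!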
td>: Treasure Data queries¶
td>: operator runs a Hive or Presto query on Treasure Data.
TODO: add more description here
_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+step1:
  td>: queries/step1.sql
+step2:
  td>: queries/step2.sql
  create_table: mytable_${session_date_compact}
+step3:
  td>: queries/step2.sql
  insert_into: mytable
Parameters¶
- td>: FILE.sql
Path to a query template file. This file can contain ${...} syntax to embed variables.
- td>: queries/step1.sql
- create_table: NAME
Name of a table to create from the results. This option deletes the table if it already exists.
This option adds DROP TABLE IF EXISTS; CREATE TABLE AS (Presto) or INSERT OVERWRITE (Hive) commands before the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the commands are inserted there.
- create_table: my_table
- insert_into: NAME
Name of a table to append results into. The table is created if it does not already exist.
This option adds an INSERT INTO (Presto) or INSERT INTO TABLE (Hive) command at the beginning of the SELECT statement. If the query includes a -- DIGDAG_INSERT_LINE line, the command is inserted at that line.
- insert_into: my_table
- download_file: NAME
Saves query result as a local CSV file.
- download_file: output.csv
- store_last_results: BOOLEAN
Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, you can use the ${td.last_results.my_count} syntax.
- store_last_results: true
- preview: BOOLEAN
Tries to show some query results to confirm the results of a query.
- preview: true
- result_url: NAME
Output the query results to the URL:
- result_url: tableau://username:password@my.tableauserver.com/?mode=replace
- database: NAME
Name of a database.
- database: my_db
- apikey: APIKEY
API key. You can set this at the command line using the -p td.apikey=$TD_APIKEY argument.
- apikey: 992314/abcdef0123456789abcdef0123456789
- endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
- use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).
- engine: presto
Query engine (presto or hive).
- engine: hive
- engine: presto
- priority: 0
Set priority (from -2 (VERY LOW) to 2 (VERY HIGH); default: 0 (NORMAL)).
Output parameters¶
- td.last_job_id
The job id this task executed.
- 52036074
- td.last_results
The first row of the query results as a map. This is available only when store_last_results: true is set (see the sketch below).
- {"path":"/index.html","count":1}
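As a sketch of how these output parameters can be used, a later task can read them through ${...} syntax. The query file queries/count_access.sql and the count column are hypothetical and must match your actual query result:

+aggregate:
  td>: queries/count_access.sql
  store_last_results: true

+show_results:
  # td.last_job_id and td.last_results are exported by the previous td> task
  sh>: echo "job ${td.last_job_id} counted ${td.last_results.count} rows"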
td_run>: Treasure Data saved queries¶
td_run>: operator runs a query saved on Treasure Data.
TODO: add more description here
_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+step1:
  td_run>: myquery1
+step2:
  td_run>: myquery2
  session_time: 2016-01-01T01:01:01+0000
Parameters¶
- td_run>: NAME
Name of a saved query.
- td_run>: my_query
- download_file: NAME
Saves query result as a local CSV file.
- download_file: output.csv
- store_last_results: BOOLEAN
Stores the first row of the query results in the ${td.last_results} variable (default: false). td.last_results is a map of column names to values. To access a single value, you can use the ${td.last_results.my_count} syntax.
- store_last_results: true
- preview: BOOLEAN
Tries to show some query results to confirm the results of a query.
- preview: true
- apikey: APIKEY
API key. You can set this at the command line using the -p td.apikey=$TD_APIKEY argument.
- apikey: 992314/abcdef0123456789abcdef0123456789
- endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
- use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).
Output parameters¶
- td.last_job_id
The job id this task executed.
- 52036074
- td.last_results
The first row of the query results as a map. This is available only when store_last_results: true is set.
- {"path":"/index.html","count":1}
td_load>: Treasure Data bulk loading¶
td_load>: operator loads data from storages, databases, or services.
TODO: add more description here
_export:
  td:
    apikey: YOUR/API_KEY

+step1:
  td_load>: config/guessed.dig
  database: prod
  table: raw
- td_load>: FILE.yml
Path to a YAML template file. This configuration needs to be guessed using td command.
- td_load>: imports/load.yml
- database: NAME
Name of the database to load data into.
- database: my_database
- table: NAME
Name of the table to load data into.
- table: my_table
- apikey: APIKEY
API key. You can set this at the command line using the -p td.apikey=$TD_APIKEY argument.
- apikey: 992314/abcdef0123456789abcdef0123456789
- endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
- use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).
Output parameters¶
- td.last_job_id
The job id this task executed.
- 52036074
td_ddl>: Treasure Data operations¶
_type: td_ddl operator runs an operational task on Treasure Data.
TODO: add more description here
_export:
  td:
    apikey: YOUR/API_KEY
    database: www_access

+step1:
  _type: td_ddl
  create_tables: ["my_table_${session_date_compact}"]
+step2:
  _type: td_ddl
  drop_tables: ["my_table_${session_date_compact}"]
+step3:
  _type: td_ddl
  empty_tables: ["my_table_${session_date_compact}"]
- create_tables: [ARRAY OF NAMES]
Create new tables if they do not exist.
- create_tables: [my_table1, my_table2]
- empty_tables: [ARRAY OF NAMES]
Create new tables (dropping them first if they already exist).
- empty_tables: [my_table1, my_table2]
- drop_tables: [ARRAY OF NAMES]
Drop tables if they exist.
- drop_tables: [my_table1, my_table2]
- create_databases: [ARRAY OF NAMES]
Create new databases if they do not exist (see the sketch after this list).
- create_databases: [my_database1, my_database2]
- empty_databases: [ARRAY OF NAMES]
Create new databases (dropping them first if they already exist).
- empty_databases: [my_database1, my_database2]
- drop_databases: [ARRAY OF NAMES]
Drop databases if they exist.
- drop_databases: [my_database1, my_database2]
- apikey: APIKEY
API key. You can set this at the command line using the -p td.apikey=$TD_APIKEY argument.
- apikey: 992314/abcdef0123456789abcdef0123456789
- endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
- use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).
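A small sketch of the database operations, mirroring the table example above (the database name here is illustrative):

+create_staging_db:
  _type: td_ddl
  create_databases: ["staging_${session_date_compact}"]

+drop_staging_db:
  _type: td_ddl
  drop_databases: ["staging_${session_date_compact}"]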
td_table_export>: Treasure Data table export to S3¶
td_table_export>: operator exports a Treasure Data table to Amazon S3.
TODO: add more description here
_export:
  td:
    apikey: YOUR/API_KEY

+step1:
  _type: td_table_export
  database: mydb
  table: mytable
  file_format: jsonl.gz
  from: 2016-01-01 00:00:00 +0800
  to: 2016-02-01 00:00:00 +0800
  s3_bucket: my_backup_bucket
  s3_path_prefix: mydb/mytable
  s3_access_key_id: ABCDEFGHJKLMNOPQRSTU
  s3_secret_access_key: QUtJ/QUpJWTQ3UkhZTERNUExTUEEQUtJQUpJWTQ3
- database: NAME
Name of the database.
- database: my_database
- table: NAME
Name of the table to export.
- table: my_table
- file_format: TYPE
Output file format. Available formats are tsv.gz, jsonl.gz, json.gz, and json-line.gz.
- file_format: jsonl.gz
- from: yyyy-MM-dd HH:mm:ss[ Z]
Export records from this time (inclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.
- from: 2016-01-01 00:00:00 +0800
- to: yyyy-MM-dd HH:mm:ss[ Z]
Export records to this time (exclusive). Actual time range is [from, to). Value should be a UNIX timestamp integer (seconds) or string in yyyy-MM-dd HH:mm:ss[ Z] format.
- to: 2016-02-01 00:00:00 +0800
- s3_bucket: NAME
S3 bucket name to export records to.
- s3_bucket: my_backup_bucket
- s3_path_prefix: NAME
S3 file name prefix.
- s3_path_prefix: mydb/mytable
- s3_access_key_id: KEY
S3 access key id.
- s3_access_key_id: ABCDEFGHJKLMNOPQRSTU
- s3_secret_access_key: KEY
S3 secret access key.
- s3_secret_access_key: QUtJ/QUpJWTQ3UkhZTERNUExTUEEQUtJQUpJWTQ3
- apikey: APIKEY
API key. You can set this at the command line using the -p td.apikey=$TD_APIKEY argument.
- apikey: 992314/abcdef0123456789abcdef0123456789
- endpoint: ADDRESS
API endpoint (default: api.treasuredata.com).
- use_ssl: BOOLEAN
Enable SSL (https) to access the endpoint (default: true).
Output parameters¶
- td.last_job_id
The job id this task executed.
- 52036074
pg>: PostgreSQL operations¶
pg>: operator runs queries and/or DDLs on PostgreSQL.
_export:
  pg:
    host: 192.0.2.1
    port: 5430
    database: production_db
    user: app_user
    password: 1qazxsw23edcvfr4
    ssl: true

+replace_deduplicated_master_table:
  pg>: queries/dedup_master_table.sql
  create_table: dedup_master

+prepare_summary_table:
  pg>: queries/create_summary_table_ddl.sql

+insert_to_summary_table:
  pg>: queries/join_log_with_master.sql
  insert_into: summary_table
- pg>: FILE.sql
Path of the query template file. This file can contain ${...} syntax to embed variables.
- pg>: queries/complex_queries.sql
- create_table: NAME
Table name to create from the results. This option deletes the table if it already exists.
This option adds DROP TABLE IF EXISTS; CREATE TABLE AS before the statements written in the query template file. Alternatively, a CREATE TABLE statement can be written in the query template file itself instead of using this option.
- create_table: dest_table
- insert_into: NAME
Table name to append results into.
This option adds INSERT INTO before the statements written in the query template file. Alternatively, an INSERT INTO statement can be written in the query template file itself instead of using this option.
- insert_into: dest_table
- download_file: NAME
Local CSV file name to be downloaded. The file will contain the query result (see the sketch at the end of this section).
- download_file: output.csv
- database: NAME
Database name.
- database: my_db
- host: NAME
Hostname or IP address of the database.
- host: db.foobar.com
- port: NUMBER
Port number to connect to the database (default: 5432).
- port: 2345
- user: NAME
User to connect to the database.
- user: app_user
- password: NAME
User password to connect to the database (default: empty).
- password: 12345678iuytrewq
- ssl: BOOLEAN
Enable SSL to connect to the database (default: false).
- ssl: true
- schema: NAME
Default schema name (default: public).
- schema: my_schema
TODO: Add some other commands
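A minimal sketch of saving a query result to a local CSV file via download_file (queries/daily_report.sql is a hypothetical query file):

+export_report:
  pg>: queries/daily_report.sql
  # writes the query result to daily_report.csv in the project directory
  download_file: daily_report.csv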
mail>: Sending email¶
mail>: operator sends an email.
To use the Gmail SMTP server, you need to do one of the following:
- Generate a new app password at App passwords. This requires 2-Step Verification to be enabled first.
- Enable access for less secure apps at Less secure apps. This works even if 2-Step Verification is not enabled.
_export:
  mail:
    host: smtp.gmail.com
    port: 587
    from: "you@gmail.com"
    username: "you@gmail.com"
    password: "...password..."
    debug: true

+step1:
  mail>: body.txt
  subject: workflow started
  to: [me@example.com]

+step2:
  _type: mail
  body: this is email body in string
  subject: workflow started
  to: [me@example.com]

+step3:
  sh>: this_task_might_fail.sh

_error:
  mail>: body.txt
  subject: this workflow failed
  to: [me@example.com]
- mail>: FILE
Path to a mail body template file. This file can contain ${...} syntax to embed variables.
- mail>: mail_body.txt
- subject: SUBJECT
Subject of the email.
- subject: Mail From Digdag
- body: TEXT
Email body if the template file path is not set.
- body: Hello, this is from Digdag
- to: [ADDR1, ADDR2, ...]
To addresses.
- to: [analyst@example.com]
- from: ADDR
From address.
- from: admin@example.com
- host: NAME
SMTP host name.
- host: smtp.gmail.com
- port: NUMBER
SMTP port number.
- port: 587
- username: NAME
SMTP login username, if authentication is required.
- username: me
- password: PASSWORD
SMTP login password.
- password: MyPaSsWoRd
- tls: BOOLEAN
Enables TLS handshake.
- tls: true
- ssl: BOOLEAN
Enables legacy SSL encryption.
- ssl: false
- html: BOOLEAN
Uses HTML mail (default: false).
- html: true
- debug: BOOLEAN
Shows debug logs (default: false).
- debug: false
- attach_files: ARRAY
Attach files. Each element is an object of:
- path: FILE: Path to a file to attach.
- content_type: Content-Type of this file. Default is application/octet-stream.
- filename: Name of this file. Default is base name of the path.
Example:
attach_files:
  - path: data.csv
  - path: output.dat
    filename: workflow_result_data.csv
  - path: images/image1.png
    content_type: image/png
embulk>: Embulk data transfer¶
embulk>: operator runs Embulk to transfer data across storages including local files.
+load:
  embulk>: data/load.yml
- embulk>: FILE.yml
Path to a configuration template file (a sketch of such a file is shown below).
- embulk>: embulk/mysql_to_csv.yml
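For reference, a minimal sketch of what a configuration template such as data/load.yml could contain, assuming Embulk's built-in file input with the CSV parser and the stdout output plugin (replace these with the plugins you actually use):

in:
  type: file
  path_prefix: data/sample_
  parser:
    type: csv
    charset: UTF-8
    skip_header_lines: 1
    columns:
      - {name: id, type: long}
      - {name: name, type: string}
      - {name: created_at, type: timestamp, format: "%Y-%m-%d %H:%M:%S"}
out:
  # prints parsed records to stdout; swap for a real output plugin in practice
  type: stdout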