# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import ast
import codecs
import copy
import datetime as dt
import itertools
import json
import logging
import math
import os
import pickle
import traceback
from collections import defaultdict
from datetime import timedelta
from functools import wraps
from textwrap import dedent
import markdown
import sqlalchemy as sqla
import pendulum
from flask import (
abort, jsonify, redirect, url_for, request, Markup, Response,
current_app, render_template, make_response)
from flask import flash
from flask_admin import BaseView, expose, AdminIndexView
from flask_admin.actions import action
from flask_admin.babel import lazy_gettext
from flask_admin.contrib.sqla import ModelView
from flask_admin.form.fields import DateTimeField
from flask_admin.tools import iterdecode
import lazy_object_proxy
from jinja2 import escape
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.utils import pformat
from past.builtins import basestring
from pygments import highlight, lexers
import six
from pygments.formatters.html import HtmlFormatter
from six.moves.urllib.parse import quote, unquote
from sqlalchemy import or_, desc, and_, union_all
from wtforms import (
Form, SelectField, TextAreaField, PasswordField,
StringField, IntegerField, validators)
import nvd3
import airflow
from airflow import configuration
from airflow.configuration import conf
from airflow import jobs, models, settings
from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_running,
set_dag_run_state_to_success,
set_dag_run_state_to_failed)
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, Connection, DagRun, errors, XCom
from airflow.models.dagcode import DagCode
from airflow.settings import STATE_COLORS, STORE_SERIALIZED_DAGS
from airflow.operators.subdag_operator import SubDagOperator
from airflow.ti_deps.dep_context import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS, DepContext
from airflow.utils import timezone
from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date
from airflow.utils.db import create_session, provide_session
from airflow.utils.helpers import alchemy_to_dict, render_log_filename
from airflow.utils.net import get_hostname
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.www import utils as wwwutils
from airflow.www.forms import (DateTimeForm, DateTimeWithNumRunsForm,
DateTimeWithNumRunsWithDagRunsForm)
from airflow.www.validators import GreaterEqualThan
QUERY_LIMIT = 100000
CHART_LIMIT = 200000
UTF8_READER = codecs.getreader('utf-8')
# Maps user-facing chart types to nvd3 chart class names; referenced by the
# chart_data and chart views below.
chart_mapping = {
    'line': 'lineChart',
    'spline': 'lineChart',
    'bar': 'multiBarChart',
    'column': 'multiBarChart',
    'area': 'stackedAreaChart',
    'stacked_area': 'stackedAreaChart',
    'percent_area': 'stackedAreaChart',
    'datatable': 'datatable',
}
dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
login_required = airflow.login.login_required
current_user = airflow.login.current_user
logout_user = airflow.login.logout_user
FILTER_BY_OWNER = False
PAGE_SIZE = conf.getint('webserver', 'page_size')
log = logging.getLogger(__name__)
if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
# filter_by_owner if authentication is enabled and filter_by_owner is true
FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']
def dag_link(v, c, m, p):
if m.dag_id is None:
return Markup()
kwargs = {'dag_id': m.dag_id}
# This is called with various objects, TIs, (ORM) DAG - some have this,
# some don't
if hasattr(m, 'execution_date'):
kwargs['execution_date'] = m.execution_date
url = url_for('airflow.graph', **kwargs)
    return Markup(
        '<a href="{}">{}</a>').format(url, m.dag_id)
def log_url_formatter(v, c, m, p):
url = url_for(
'airflow.log',
dag_id=m.dag_id,
task_id=m.task_id,
execution_date=m.execution_date.isoformat())
    return Markup(
        '<a href="{log_url}">'
        '    <span class="glyphicon glyphicon-book" aria-hidden="true">'
        '</span></a>').format(log_url=url)
def dag_run_link(v, c, m, p):
url = url_for(
'airflow.graph',
dag_id=m.dag_id,
run_id=m.run_id,
execution_date=m.execution_date)
title = m.run_id
    return Markup('<a href="{url}">{title}</a>').format(**locals())
def task_instance_link(v, c, m, p):
url = url_for(
'airflow.task',
dag_id=m.dag_id,
task_id=m.task_id,
execution_date=m.execution_date.isoformat())
url_root = url_for(
'airflow.graph',
dag_id=m.dag_id,
root=m.task_id,
execution_date=m.execution_date.isoformat())
    return Markup(
        """
        <span style="white-space: nowrap;">
        <a href="{url}">{m.task_id}</a>
        <a href="{url_root}" title="Filter on this task and upstream">
            <span class="glyphicon glyphicon-filter" aria-hidden="true"></span>
        </a>
        </span>
        """).format(**locals())
def state_token(state):
color = State.color(state)
    return Markup(
        '<span class="label" style="background-color:{color};">'
        '{state}</span>').format(**locals())
def parse_datetime_f(value):
if not isinstance(value, dt.datetime):
return value
return timezone.make_aware(value)
def state_f(v, c, m, p):
return state_token(m.state)
def duration_f(v, c, m, p):
if m.end_date and m.duration:
return timedelta(seconds=m.duration)
def datetime_f(v, c, m, p):
attr = getattr(m, p)
dttm = attr.isoformat() if attr else ''
if timezone.utcnow().isoformat()[:4] == dttm[:4]:
dttm = dttm[5:]
return Markup("{}").format(dttm)
def nobr_f(v, c, m, p):
return Markup("{}").format(getattr(m, p))
def label_link(v, c, m, p):
try:
default_params = ast.literal_eval(m.default_params)
except Exception:
default_params = {}
url = url_for(
'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no,
**default_params)
title = m.label
return Markup("{title}").format(**locals())
def pool_link(v, c, m, p):
title = m.pool
url = url_for('taskinstance.index_view', flt1_pool_equals=m.pool)
return Markup("{title}").format(**locals())
def pygment_html_render(s, lexer=lexers.TextLexer):
return highlight(
s,
lexer(),
HtmlFormatter(linenos=True),
)
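# Illustrative usage sketch: the lexer argument is a Pygments lexer *class*
# (instantiated inside), and with linenos=True the output is an HTML table:
#   >>> html = pygment_html_render("SELECT 1", lexers.SqlLexer)
#   >>> 'highlighttable' in html
#   True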
def render(obj, lexer):
out = ""
if isinstance(obj, basestring):
out += Markup(pygment_html_render(obj, lexer))
elif isinstance(obj, (tuple, list)):
for i, s in enumerate(obj):
out += Markup("
List item #{}
".format(i))
out += Markup("" + pygment_html_render(s, lexer) + "
")
elif isinstance(obj, dict):
for k, v in obj.items():
            out += Markup('<div>Dict item "{}"</div>').format(k)
            out += Markup("<div>" + pygment_html_render(v, lexer) + "</div>")
return out
def wrapped_markdown(s, css_class=None):
if s is None:
return None
    return Markup(
        '<div class="rich_doc {css_class}">{md}</div>'
    ).format(css_class=css_class, md=Markup(markdown.markdown(s)))
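# Illustrative sketch, assuming the markup built above:
#   >>> wrapped_markdown("# Docs", css_class="dag-doc")
#   Markup('<div class="rich_doc dag-doc"><h1>Docs</h1></div>')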
attr_renderer = {
'bash_command': lambda x: render(x, lexers.BashLexer),
'hql': lambda x: render(x, lexers.SqlLexer),
'sql': lambda x: render(x, lexers.SqlLexer),
'doc': lambda x: render(x, lexers.TextLexer),
'doc_json': lambda x: render(x, lexers.JsonLexer),
'doc_rst': lambda x: render(x, lexers.RstLexer),
'doc_yaml': lambda x: render(x, lexers.YamlLexer),
'doc_md': wrapped_markdown,
'python_callable': lambda x: render(
wwwutils.get_python_source(x),
lexers.PythonLexer,
),
}
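# Illustrative lookup sketch: the task and rendered views below use this
# table to colorize well-known operator attributes, falling back to the
# plain-text lexer for anything else:
#   renderer = attr_renderer.get('sql', lambda x: render(x, lexers.TextLexer))
#   html = renderer("SELECT * FROM task_instance")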
def data_profiling_required(f):
"""Decorator for views requiring data profiling access"""
@wraps(f)
def decorated_function(*args, **kwargs):
if (
current_app.config['LOGIN_DISABLED'] or
(not current_user.is_anonymous and current_user.data_profiling())
):
return f(*args, **kwargs)
else:
flash("This page requires data profiling privileges", "error")
return redirect(url_for('admin.index'))
return decorated_function
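# Illustrative usage sketch on a hypothetical view method:
#   @expose('/my_profiling_view')
#   @data_profiling_required
#   def my_profiling_view(self):
#       return self.render('airflow/my_profiling_view.html')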
def fused_slots(v, c, m, p):
url = url_for(
'taskinstance.index_view',
flt1_pool_equals=m.pool,
flt2_state_equals='running',
)
return Markup("{1}").format(url, m.used_slots())
def fqueued_slots(v, c, m, p):
url = url_for(
'taskinstance.index_view',
flt1_pool_equals=m.pool,
flt2_state_equals='queued',
sort='1',
desc='1'
)
return Markup("{1}").format(url, m.queued_slots())
def recurse_tasks(tasks, task_ids, dag_ids, task_id_to_dag):
if isinstance(tasks, list):
for task in tasks:
recurse_tasks(task, task_ids, dag_ids, task_id_to_dag)
return
if isinstance(tasks, SubDagOperator):
subtasks = tasks.subdag.tasks
dag_ids.append(tasks.subdag.dag_id)
for subtask in subtasks:
if subtask.task_id not in task_ids:
task_ids.append(subtask.task_id)
task_id_to_dag[subtask.task_id] = tasks.subdag
recurse_tasks(subtasks, task_ids, dag_ids, task_id_to_dag)
if isinstance(tasks, BaseOperator):
task_id_to_dag[tasks.task_id] = tasks.dag
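# Illustrative usage sketch: flatten a DAG into id lists, descending into
# SubDagOperators; the accumulator arguments are mutated in place.
#   task_ids, dag_ids, task_id_to_dag = [], [], {}
#   recurse_tasks(dag.tasks, task_ids, dag_ids, task_id_to_dag)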
def get_chart_height(dag):
"""
    TODO(aoen): See [AIRFLOW-1263]. We use the number of tasks in the DAG as a
    heuristic to approximate the size of the generated chart (otherwise the
    charts are tiny and unreadable when DAGs have a large number of tasks).
    Ideally nvd3 would allow dynamic-height charts, i.e. charts that take up
    space based on the size of the components within.
"""
return 600 + len(dag.tasks) * 10
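# Illustrative sizing: a DAG with 40 tasks yields 600 + 40 * 10 == 1000 px.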
def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
dttm = request.args.get('execution_date')
if dttm:
dttm = pendulum.parse(dttm)
else:
dttm = dag.latest_execution_date or timezone.utcnow()
base_date = request.args.get('base_date')
if base_date:
base_date = timezone.parse(base_date)
else:
        # The DateTimeField widget truncates milliseconds and would lose
        # the first dag run. Round up to the next second.
base_date = (dttm + timedelta(seconds=1)).replace(microsecond=0)
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
DR = models.DagRun
drs = (
session.query(DR)
.filter(
DR.dag_id == dag.dag_id,
DR.execution_date <= base_date)
.order_by(desc(DR.execution_date))
.limit(num_runs)
.all()
)
dr_choices = []
dr_state = None
for dr in drs:
dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
if dttm == dr.execution_date:
dr_state = dr.state
# Happens if base_date was changed and the selected dag run is not in result
if not dr_state and drs:
dr = drs[0]
dttm = dr.execution_date
dr_state = dr.state
return {
'dttm': dttm,
'base_date': base_date,
'num_runs': num_runs,
'execution_date': dttm.isoformat(),
'dr_choices': dr_choices,
'dr_state': dr_state,
}
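# Illustrative usage sketch: the graph and gantt views seed their
# date/num-runs/dag-run form from this dict:
#   data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
#   form = DateTimeWithNumRunsWithDagRunsForm(data=data)
#   form.execution_date.choices = data['dr_choices']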
class AirflowViewMixin(object):
def render(self, template, **kwargs):
kwargs['scheduler_job'] = lazy_object_proxy.Proxy(jobs.SchedulerJob.most_recent_job)
kwargs['macros'] = airflow.macros
return super(AirflowViewMixin, self).render(template, **kwargs)
class Airflow(AirflowViewMixin, BaseView):
def is_visible(self):
return False
@expose('/')
@login_required
def index(self):
return self.render('airflow/dags.html')
@expose('/chart_data')
@data_profiling_required
@wwwutils.gzipped
def chart_data(self):
from airflow import macros
import pandas as pd
if conf.getboolean('core', 'secure_mode'):
abort(404)
with create_session() as session:
chart_id = request.args.get('chart_id')
csv = request.args.get('csv') == "true"
chart = session.query(models.Chart).filter_by(id=chart_id).first()
db = session.query(
Connection).filter_by(conn_id=chart.conn_id).first()
payload = {
"state": "ERROR",
"error": ""
}
# Processing templated fields
try:
args = ast.literal_eval(chart.default_params)
if not isinstance(args, dict):
raise AirflowException('Not a dict')
except Exception:
args = {}
                payload['error'] += (
                    "Default params is not valid; the string has to "
                    "evaluate to a Python dictionary. ")
request_dict = {k: request.args.get(k) for k in request.args}
args.update(request_dict)
args['macros'] = macros
sandbox = ImmutableSandboxedEnvironment()
sql = sandbox.from_string(chart.sql).render(**args)
label = sandbox.from_string(chart.label).render(**args)
payload['sql_html'] = Markup(highlight(
sql,
lexers.SqlLexer(), # Lexer call
HtmlFormatter(noclasses=True))
)
payload['label'] = label
pd.set_option('display.max_colwidth', 100)
try:
hook = db.get_hook()
df = hook.get_pandas_df(
wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
df = df.fillna(0)
except Exception as e:
payload['error'] += "SQL execution failed. Details: " + str(e)
if csv:
return Response(
response=df.to_csv(index=False),
status=200,
mimetype="application/text")
if not payload['error'] and len(df) == CHART_LIMIT:
payload['warning'] = (
"Data has been truncated to {0}"
" rows. Expect incomplete results.").format(CHART_LIMIT)
if not payload['error'] and len(df) == 0:
payload['error'] += "Empty result set. "
elif (
not payload['error'] and
chart.sql_layout == 'series' and
chart.chart_type != "datatable" and
len(df.columns) < 3):
payload['error'] += "SQL needs to return at least 3 columns. "
elif (
not payload['error'] and
chart.sql_layout == 'columns' and
len(df.columns) < 2):
payload['error'] += "SQL needs to return at least 2 columns. "
elif not payload['error']:
import numpy as np
chart_type = chart.chart_type
data = None
if chart.show_datatable or chart_type == "datatable":
data = df.to_dict(orient="split")
data['columns'] = [{'title': c} for c in data['columns']]
payload['data'] = data
# Trying to convert time to something Highcharts likes
x_col = 1 if chart.sql_layout == 'series' else 0
if chart.x_is_date:
try:
# From string to datetime
df[df.columns[x_col]] = pd.to_datetime(
df[df.columns[x_col]])
df[df.columns[x_col]] = df[df.columns[x_col]].apply(
lambda x: int(x.strftime("%s")) * 1000)
except Exception:
payload['error'] = "Time conversion failed"
if chart_type == 'datatable':
payload['state'] = 'SUCCESS'
return wwwutils.json_response(payload)
else:
if chart.sql_layout == 'series':
# User provides columns (series, x, y)
df[df.columns[2]] = df[df.columns[2]].astype(np.float)
df = df.pivot_table(
index=df.columns[1],
columns=df.columns[0],
values=df.columns[2], aggfunc=np.sum)
else:
# User provides columns (x, y, metric1, metric2, ...)
df.index = df[df.columns[0]]
df = df.sort_values(by=df.columns[0])
del df[df.columns[0]]
for col in df.columns:
df[col] = df[col].astype(np.float)
df = df.fillna(0)
NVd3ChartClass = chart_mapping.get(chart.chart_type)
NVd3ChartClass = getattr(nvd3, NVd3ChartClass)
nvd3_chart = NVd3ChartClass(x_is_date=chart.x_is_date)
for col in df.columns:
nvd3_chart.add_serie(name=col, y=df[col].tolist(), x=df[col].index.tolist())
try:
nvd3_chart.buildcontent()
payload['chart_type'] = nvd3_chart.__class__.__name__
payload['htmlcontent'] = nvd3_chart.htmlcontent
except Exception as e:
payload['error'] = str(e)
payload['state'] = 'SUCCESS'
payload['request_dict'] = request_dict
return wwwutils.json_response(payload)
@expose('/chart')
@data_profiling_required
def chart(self):
if conf.getboolean('core', 'secure_mode'):
abort(404)
with create_session() as session:
chart_id = request.args.get('chart_id')
embed = request.args.get('embed')
chart = session.query(models.Chart).filter_by(id=chart_id).first()
NVd3ChartClass = chart_mapping.get(chart.chart_type)
if not NVd3ChartClass:
flash(
"Not supported anymore as the license was incompatible, "
"sorry",
"danger")
                return redirect('/admin/chart/')
sql = ""
if chart.show_sql:
sql = Markup(highlight(
chart.sql,
lexers.SqlLexer(), # Lexer call
HtmlFormatter(noclasses=True))
)
return self.render(
'airflow/nvd3.html',
chart=chart,
title="Airflow - Chart",
sql=sql,
label=chart.label,
embed=embed)
@expose('/dag_stats')
@login_required
@provide_session
def dag_stats(self, session=None):
dr = models.DagRun
dm = models.DagModel
dag_ids = session.query(dm.dag_id)
dag_state_stats = (
session.query(dr.dag_id, dr.state, sqla.func.count(dr.state)).group_by(dr.dag_id, dr.state)
)
data = {}
for (dag_id, ) in dag_ids:
data[dag_id] = {}
for dag_id, state, count in dag_state_stats:
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count
payload = {}
for dag_id, d in data.items():
payload[dag_id] = []
for state in State.dag_states:
count = d.get(state, 0)
payload[dag_id].append({
'state': state,
'count': count
})
return wwwutils.json_response(payload)
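    # Illustrative response shape: one entry per dag_id with a count for
    # every DAG-run state, e.g.
    #   {"example_dag": [{"state": "running", "count": 1},
    #                    {"state": "success", "count": 7}, ...]}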
@expose('/task_stats')
@login_required
@provide_session
def task_stats(self, session=None):
TI = models.TaskInstance
DagRun = models.DagRun
Dag = models.DagModel
# Filter by get parameters
selected_dag_ids = {
unquote(dag_id) for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
}
LastDagRun = (
session.query(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label('execution_date'))
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state != State.RUNNING)
.filter(Dag.is_active == True) # noqa: E712
.filter(Dag.is_subdag == False) # noqa: E712
.group_by(DagRun.dag_id)
)
RunningDagRun = (
session.query(DagRun.dag_id, DagRun.execution_date)
.join(Dag, Dag.dag_id == DagRun.dag_id)
.filter(DagRun.state == State.RUNNING)
.filter(Dag.is_active == True) # noqa: E712
.filter(Dag.is_subdag == False) # noqa: E712
)
if selected_dag_ids:
LastDagRun = LastDagRun.filter(DagRun.dag_id.in_(selected_dag_ids))
RunningDagRun = RunningDagRun.filter(DagRun.dag_id.in_(selected_dag_ids))
LastDagRun = LastDagRun.subquery('last_dag_run')
RunningDagRun = RunningDagRun.subquery('running_dag_run')
# Select all task_instances from active dag_runs.
# If no dag_run is active, return task instances from most recent dag_run.
LastTI = (
session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
.join(LastDagRun, and_(
LastDagRun.c.dag_id == TI.dag_id,
LastDagRun.c.execution_date == TI.execution_date))
)
RunningTI = (
session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
.join(RunningDagRun, and_(
RunningDagRun.c.dag_id == TI.dag_id,
RunningDagRun.c.execution_date == TI.execution_date))
)
if selected_dag_ids:
LastTI = LastTI.filter(TI.dag_id.in_(selected_dag_ids))
RunningTI = RunningTI.filter(TI.dag_id.in_(selected_dag_ids))
UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
qry = (
session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
.group_by(UnionTI.c.dag_id, UnionTI.c.state)
)
data = {}
for dag_id, state, count in qry:
if dag_id not in data:
data[dag_id] = {}
data[dag_id][state] = count
payload = {}
dag_ids = selected_dag_ids or {dag_id for (dag_id,) in session.query(Dag.dag_id)}
for dag_id in dag_ids:
payload[dag_id] = []
for state in State.task_states:
count = data.get(dag_id, {}).get(state, 0)
payload[dag_id].append({
'state': state,
'count': count
})
return wwwutils.json_response(payload)
@expose('/code')
@login_required
@provide_session
def code(self, session=None):
all_errors = ""
try:
dag_id = request.args.get('dag_id')
dag_orm = models.DagModel.get_dagmodel(dag_id, session=session)
code = DagCode.get_code_by_fileloc(dag_orm.fileloc)
html_code = Markup(highlight(
code, lexers.PythonLexer(), HtmlFormatter(linenos=True)))
except Exception as e:
all_errors += (
"Exception encountered during " +
"dag_id retrieval/dag retrieval fallback/code highlighting:\n\n{}\n".format(e)
)
            html_code = Markup(
                '<p>Failed to load file.</p><p>Details: {}</p>').format(
                escape(all_errors))
return self.render(
'airflow/dag_code.html', html_code=html_code, dag=dag_orm, title=dag_id,
root=request.args.get('root'),
demo_mode=conf.getboolean('webserver', 'demo_mode'),
wrapped=conf.getboolean('webserver', 'default_wrap'))
@expose('/dag_details')
@login_required
@provide_session
def dag_details(self, session=None):
dag_id = request.args.get('dag_id')
dag = dagbag.get_dag(dag_id)
title = "DAG details"
root = request.args.get('root', '')
TI = models.TaskInstance
states = session\
.query(TI.state, sqla.func.count(TI.dag_id))\
.filter(TI.dag_id == dag_id)\
.group_by(TI.state)\
.all()
active_runs = models.DagRun.find(
dag_id=dag.dag_id,
state=State.RUNNING,
external_trigger=False,
session=session
)
return self.render(
'airflow/dag_details.html',
dag=dag, title=title, root=root, states=states, State=State,
active_runs=active_runs)
@current_app.errorhandler(404)
def circles(self):
return render_template(
'airflow/circles.html', hostname=get_hostname() if conf.getboolean(
'webserver',
'EXPOSE_HOSTNAME',
fallback=True) else 'redact'), 404
@current_app.errorhandler(500)
def show_traceback(self):
from airflow.utils import asciiart as ascii_
return render_template(
'airflow/traceback.html',
hostname=get_hostname() if conf.getboolean(
'webserver',
'EXPOSE_HOSTNAME',
fallback=True) else 'redact',
nukular=ascii_.nukular,
info=traceback.format_exc() if conf.getboolean(
'webserver',
'EXPOSE_STACKTRACE',
fallback=True) else 'Error! Please contact server admin'), 500
@expose('/noaccess')
def noaccess(self):
return self.render('airflow/noaccess.html')
@expose('/pickle_info')
@login_required
def pickle_info(self):
d = {}
dag_id = request.args.get('dag_id')
dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
for dag in dags:
if not dag.is_subdag:
d[dag.dag_id] = dag.pickle_info()
return wwwutils.json_response(d)
@expose('/login', methods=['GET', 'POST'])
def login(self):
return airflow.login.login(self, request)
@expose('/logout')
def logout(self):
logout_user()
flash('You have been logged out.')
return redirect(url_for('admin.index'))
@expose('/rendered')
@login_required
@wwwutils.action_logging
@provide_session
def rendered(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
root = request.args.get('root', '')
logging.info("Retrieving rendered templates.")
dag = dagbag.get_dag(dag_id)
task = copy.copy(dag.get_task(task_id))
ti = models.TaskInstance(task=task, execution_date=dttm)
try:
ti.get_rendered_template_fields()
except Exception as e:
msg = "Error rendering template: " + escape(e)
if six.PY3:
if e.__cause__:
msg += Markup("
OriginalError: ") + escape(e.__cause__)
flash(msg, "error")
title = "Rendered Template"
html_dict = {}
for template_field in task.template_fields:
content = getattr(task, template_field)
if template_field in attr_renderer:
html_dict[template_field] = attr_renderer[template_field](content)
else:
                html_dict[template_field] = Markup(
                    "<pre><code>{}</code></pre>").format(pformat(content))
return self.render(
'airflow/ti_code.html',
html_dict=html_dict,
dag=dag,
task_id=task_id,
execution_date=execution_date,
form=form,
root=root,
title=title)
@expose('/get_logs_with_metadata')
@login_required
@wwwutils.action_logging
@provide_session
def get_logs_with_metadata(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
if request.args.get('try_number') is not None:
try_number = int(request.args.get('try_number'))
else:
try_number = None
response_format = request.args.get('format', 'json')
metadata = request.args.get('metadata')
metadata = json.loads(metadata)
# metadata may be null
if not metadata:
metadata = {}
# Convert string datetime into actual datetime
try:
execution_date = timezone.parse(execution_date)
except ValueError:
error_message = (
'Given execution date, {}, could not be identified '
'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
execution_date))
response = jsonify({'error': error_message})
response.status_code = 400
return response
logger = logging.getLogger('airflow.task')
task_log_reader = conf.get('core', 'task_log_reader')
handler = next((handler for handler in logger.handlers
if handler.name == task_log_reader), None)
ti = session.query(models.TaskInstance).filter(
models.TaskInstance.dag_id == dag_id,
models.TaskInstance.task_id == task_id,
models.TaskInstance.execution_date == dttm).first()
def _get_logs_with_metadata(try_number, metadata):
if ti is None:
logs = ["*** Task instance did not exist in the DB\n"]
metadata['end_of_log'] = True
else:
logs, metadatas = handler.read(ti, try_number, metadata=metadata)
metadata = metadatas[0]
return logs, metadata
try:
if ti is not None:
dag = dagbag.get_dag(dag_id)
ti.task = dag.get_task(ti.task_id)
if response_format == 'json':
logs, metadata = _get_logs_with_metadata(try_number, metadata)
message = logs[0] if try_number is not None else logs
return jsonify(message=message, metadata=metadata)
filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
attachment_filename = render_log_filename(
ti=ti,
try_number="all" if try_number is None else try_number,
filename_template=filename_template)
metadata['download_logs'] = True
def _generate_log_stream(try_number, metadata):
if try_number is None and ti is not None:
next_try = ti.next_try_number
try_numbers = list(range(1, next_try))
else:
try_numbers = [try_number]
for try_number in try_numbers:
metadata.pop('end_of_log', None)
metadata.pop('max_offset', None)
metadata.pop('offset', None)
while 'end_of_log' not in metadata or not metadata['end_of_log']:
logs, metadata = _get_logs_with_metadata(try_number, metadata)
yield "\n".join(logs) + "\n"
return Response(_generate_log_stream(try_number, metadata),
mimetype="text/plain",
headers={"Content-Disposition": "attachment; filename={}".format(
attachment_filename)})
except AttributeError as e:
error_message = ["Task log handler {} does not support read logs.\n{}\n"
.format(task_log_reader, str(e))]
metadata['end_of_log'] = True
return jsonify(message=error_message, error=True, metadata=metadata)
@expose('/log')
@login_required
@wwwutils.action_logging
@provide_session
def log(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
dag = dagbag.get_dag(dag_id)
ti = session.query(models.TaskInstance).filter(
models.TaskInstance.dag_id == dag_id,
models.TaskInstance.task_id == task_id,
models.TaskInstance.execution_date == dttm).first()
num_logs = 0
if ti is not None:
num_logs = ti.next_try_number - 1
if ti.state == State.UP_FOR_RESCHEDULE:
# Tasks in reschedule state decremented the try number
num_logs += 1
logs = [''] * num_logs
root = request.args.get('root', '')
return self.render(
'airflow/ti_log.html',
logs=logs, dag=dag, title="Log by attempts",
dag_id=dag.dag_id, task_id=task_id,
execution_date=execution_date, form=form,
root=root, wrapped=conf.getboolean('webserver', 'default_wrap'))
@expose('/elasticsearch')
@login_required
@wwwutils.action_logging
@provide_session
def elasticsearch(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
execution_date = request.args.get('execution_date')
try_number = request.args.get('try_number', 1)
elasticsearch_frontend = conf.get('elasticsearch', 'frontend')
log_id_template = conf.get('elasticsearch', 'log_id_template')
log_id = log_id_template.format(
dag_id=dag_id, task_id=task_id,
execution_date=execution_date, try_number=try_number)
url = 'https://' + elasticsearch_frontend.format(log_id=quote(log_id))
return redirect(url)
@expose('/task')
@login_required
@wwwutils.action_logging
def task(self):
TI = models.TaskInstance
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
# Carrying execution_date through, even though it's irrelevant for
# this context
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
root = request.args.get('root', '')
dag = dagbag.get_dag(dag_id)
if not dag or task_id not in dag.task_ids:
flash(
"Task [{}.{}] doesn't seem to exist"
" at the moment".format(dag_id, task_id),
"error")
return redirect('/admin/')
task = copy.copy(dag.get_task(task_id))
task.resolve_template_files()
ti = TI(task=task, execution_date=dttm)
ti.refresh_from_db()
ti_attrs = []
        for attr_name in dir(ti):
            if not attr_name.startswith('_'):
                attr = getattr(ti, attr_name)
                # Comparing against the type of this view's bound "task"
                # method filters out methods, keeping only data attributes.
                if type(attr) != type(self.task):  # noqa: E721
                    ti_attrs.append((attr_name, str(attr)))
task_attrs = []
for attr_name in dir(task):
if not attr_name.startswith('_'):
attr = getattr(task, attr_name)
if type(attr) != type(self.task) and \
attr_name not in attr_renderer: # noqa: E721
task_attrs.append((attr_name, str(attr)))
# Color coding the special attributes that are code
special_attrs_rendered = {}
for attr_name in attr_renderer:
if hasattr(task, attr_name):
source = getattr(task, attr_name)
special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)
        no_failed_deps_result = [(
            "Unknown",
            dedent("""\
            All dependencies are met but the task instance is not running.
            In most cases this just means that the task will probably
            be scheduled soon unless:<br/>
            - The scheduler is down or under heavy load<br/>
            - The following configuration values may be limiting the number
              of queueable processes:
              <code>parallelism</code>,
              <code>dag_concurrency</code>,
              <code>max_active_runs_per_dag</code>,
              <code>non_pooled_task_slot_count</code><br/>
            {}
            <br/>
            If this task instance does not start soon please contact your Airflow """
            """administrator for assistance."""
            .format(
                "- This task instance already ran and had its state changed "
                "manually (e.g. cleared in the UI)<br/>"
                if ti.state == State.NONE else "")))]
# Use the scheduler's context to figure out which dependencies are not met
dep_context = DepContext(SCHEDULER_QUEUED_DEPS)
failed_dep_reasons = [(dep.dep_name, dep.reason) for dep in
ti.get_failed_dep_statuses(
dep_context=dep_context)]
title = "Task Instance Details"
return self.render(
'airflow/task.html',
task_attrs=task_attrs,
ti_attrs=ti_attrs,
failed_dep_reasons=failed_dep_reasons or no_failed_deps_result,
task_id=task_id,
execution_date=execution_date,
special_attrs_rendered=special_attrs_rendered,
form=form,
root=root,
dag=dag, title=title)
@expose('/xcom')
@login_required
@wwwutils.action_logging
@provide_session
def xcom(self, session=None):
dag_id = request.args.get('dag_id')
task_id = request.args.get('task_id')
# Carrying execution_date through, even though it's irrelevant for
# this context
execution_date = request.args.get('execution_date')
dttm = pendulum.parse(execution_date)
form = DateTimeForm(data={'execution_date': dttm})
root = request.args.get('root', '')
dm_db = models.DagModel
ti_db = models.TaskInstance
dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
        # Pass both criteria as separate filter arguments; chaining them with
        # Python's "and" would silently drop the dag_id clause.
        ti = session.query(ti_db).filter(
            ti_db.dag_id == dag_id, ti_db.task_id == task_id).first()
if not ti:
flash(
"Task [{}.{}] doesn't seem to exist"
" at the moment".format(dag_id, task_id),
"error")
return redirect('/admin/')
xcomlist = session.query(XCom).filter(
XCom.dag_id == dag_id, XCom.task_id == task_id,
XCom.execution_date == dttm).all()
attributes = []
for xcom in xcomlist:
if not xcom.key.startswith('_'):
attributes.append((xcom.key, xcom.value))
title = "XCom"
return self.render(
'airflow/xcom.html',
attributes=attributes,
task_id=task_id,
execution_date=execution_date,
form=form,
root=root,
dag=dag, title=title)
@expose('/run', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def run(self):
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = request.form.get('origin')
dag = dagbag.get_dag(dag_id)
task = dag.get_task(task_id)
execution_date = request.form.get('execution_date')
execution_date = pendulum.parse(execution_date)
ignore_all_deps = request.form.get('ignore_all_deps') == "true"
ignore_task_deps = request.form.get('ignore_task_deps') == "true"
ignore_ti_state = request.form.get('ignore_ti_state') == "true"
from airflow.executors import get_default_executor
executor = get_default_executor()
valid_celery_config = False
valid_kubernetes_config = False
try:
from airflow.executors.celery_executor import CeleryExecutor
valid_celery_config = isinstance(executor, CeleryExecutor)
except ImportError:
pass
try:
from airflow.contrib.executors.kubernetes_executor import KubernetesExecutor
valid_kubernetes_config = isinstance(executor, KubernetesExecutor)
except ImportError:
pass
if not valid_celery_config and not valid_kubernetes_config:
flash("Only works with the Celery or Kubernetes executors, sorry", "error")
return redirect(origin)
ti = models.TaskInstance(task=task, execution_date=execution_date)
ti.refresh_from_db()
# Make sure the task instance can be run
dep_context = DepContext(
deps=RUNNING_DEPS,
ignore_all_deps=ignore_all_deps,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state)
failed_deps = list(ti.get_failed_dep_statuses(dep_context=dep_context))
if failed_deps:
failed_deps_str = ", ".join(
["{}: {}".format(dep.dep_name, dep.reason) for dep in failed_deps])
flash("Could not queue task instance for execution, dependencies not met: "
"{}".format(failed_deps_str),
"error")
return redirect(origin)
executor.start()
executor.queue_task_instance(
ti,
ignore_all_deps=ignore_all_deps,
ignore_task_deps=ignore_task_deps,
ignore_ti_state=ignore_ti_state)
executor.heartbeat()
flash(
"Sent {} to the message queue, "
"it should start any moment now.".format(ti))
return redirect(origin)
@expose('/delete', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def delete(self):
from airflow.api.common.experimental import delete_dag
from airflow.exceptions import DagNotFound, DagFileExists
dag_id = request.values.get('dag_id')
origin = request.values.get('origin') or "/admin/"
try:
delete_dag.delete_dag(dag_id)
except DagNotFound:
flash("DAG with id {} not found. Cannot delete".format(dag_id))
return redirect(request.referrer)
except DagFileExists:
flash("Dag id {} is still in DagBag. "
"Remove the DAG file first.".format(dag_id))
return redirect(request.referrer)
flash("Deleting DAG with id {}. May take a couple minutes to fully"
" disappear.".format(dag_id))
# Upon successful delete return to origin
return redirect(origin)
@expose('/trigger', methods=['POST', 'GET'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
@provide_session
def trigger(self, session=None):
dag_id = request.values.get('dag_id')
origin = request.values.get('origin') or "/admin/"
if request.method == 'GET':
return self.render(
'airflow/trigger.html',
dag_id=dag_id,
origin=origin,
conf=''
)
dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first()
if not dag:
flash("Cannot find dag {}".format(dag_id))
return redirect(origin)
execution_date = timezone.utcnow()
run_id = "manual__{0}".format(execution_date.isoformat())
dr = DagRun.find(dag_id=dag_id, run_id=run_id)
if dr:
flash("This run_id {} already exists".format(run_id))
return redirect(origin)
run_conf = {}
conf = request.values.get('conf')
if conf:
try:
run_conf = json.loads(conf)
except ValueError:
flash("Invalid JSON configuration", "error")
return self.render(
'airflow/trigger.html',
dag_id=dag_id,
origin=origin,
conf=conf,
)
dag = dagbag.get_dag(dag_id)
dag.create_dagrun(
run_id=run_id,
execution_date=execution_date,
state=State.RUNNING,
conf=run_conf,
external_trigger=True
)
flash(
"Triggered {}, "
"it should start any moment now.".format(dag_id))
return redirect(origin)
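    # Illustrative client sketch (hypothetical host and route prefix):
    #   import requests
    #   requests.post("http://localhost:8080/admin/airflow/trigger",
    #                 data={"dag_id": "example_dag", "conf": '{"key": "value"}'})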
def _clear_dag_tis(self, dag, start_date, end_date, origin,
recursive=False, confirmed=False, only_failed=False):
from airflow.exceptions import AirflowException
if confirmed:
count = dag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=recursive,
include_parentdag=recursive,
only_failed=only_failed,
)
flash("{0} task instances have been cleared".format(count))
return redirect(origin)
try:
tis = dag.clear(
start_date=start_date,
end_date=end_date,
include_subdags=recursive,
include_parentdag=recursive,
only_failed=only_failed,
dry_run=True,
)
except AirflowException as ex:
flash(str(ex), 'error')
return redirect(origin)
if not tis:
flash("No task instances to clear", 'error')
response = redirect(origin)
else:
details = "\n".join([str(t) for t in tis])
response = self.render(
'airflow/confirm.html',
message=("Here's the list of task instances you are about "
"to clear:"),
details=details)
return response
@expose('/clear', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def clear(self):
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = request.form.get('origin')
dag = dagbag.get_dag(dag_id)
execution_date = request.form.get('execution_date')
execution_date = pendulum.parse(execution_date)
confirmed = request.form.get('confirmed') == "true"
upstream = request.form.get('upstream') == "true"
downstream = request.form.get('downstream') == "true"
future = request.form.get('future') == "true"
past = request.form.get('past') == "true"
recursive = request.form.get('recursive') == "true"
only_failed = request.form.get('only_failed') == "true"
dag = dag.sub_dag(
task_regex=r"^{0}$".format(task_id),
include_downstream=downstream,
include_upstream=upstream)
end_date = execution_date if not future else None
start_date = execution_date if not past else None
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=recursive, confirmed=confirmed, only_failed=only_failed)
@expose('/dagrun_clear', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def dagrun_clear(self):
dag_id = request.form.get('dag_id')
origin = request.form.get('origin')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == "true"
dag = dagbag.get_dag(dag_id)
execution_date = pendulum.parse(execution_date)
start_date = execution_date
end_date = execution_date
return self._clear_dag_tis(dag, start_date, end_date, origin,
recursive=True, confirmed=confirmed)
@expose('/blocked')
@login_required
@provide_session
def blocked(self, session=None):
DR = models.DagRun
dags = session\
.query(DR.dag_id, sqla.func.count(DR.id))\
.filter(DR.state == State.RUNNING)\
.group_by(DR.dag_id)\
.all()
payload = []
for dag_id, active_dag_runs in dags:
max_active_runs = 0
dag = dagbag.get_dag(dag_id)
if dag:
# TODO: Make max_active_runs a column so we can query for it directly
max_active_runs = dag.max_active_runs
payload.append({
'dag_id': dag_id,
'active_dag_run': active_dag_runs,
'max_active_runs': max_active_runs,
})
return wwwutils.json_response(payload)
def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin):
if not execution_date:
flash('Invalid execution date', 'error')
return redirect(origin)
execution_date = pendulum.parse(execution_date)
dag = dagbag.get_dag(dag_id)
if not dag:
flash('Cannot find DAG: {}'.format(dag_id), 'error')
return redirect(origin)
new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed)
if confirmed:
flash('Marked failed on {} task instances'.format(len(new_dag_state)))
return redirect(origin)
else:
details = '\n'.join([str(t) for t in new_dag_state])
response = self.render('airflow/confirm.html',
message=("Here's the list of task instances you are "
"about to mark as failed"),
details=details)
return response
def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origin):
if not execution_date:
flash('Invalid execution date', 'error')
return redirect(origin)
execution_date = pendulum.parse(execution_date)
dag = dagbag.get_dag(dag_id)
if not dag:
flash('Cannot find DAG: {}'.format(dag_id), 'error')
return redirect(origin)
new_dag_state = set_dag_run_state_to_success(dag, execution_date,
commit=confirmed)
if confirmed:
flash('Marked success on {} task instances'.format(len(new_dag_state)))
return redirect(origin)
else:
details = '\n'.join([str(t) for t in new_dag_state])
response = self.render('airflow/confirm.html',
message=("Here's the list of task instances you are "
"about to mark as success"),
details=details)
return response
@expose('/dagrun_failed', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def dagrun_failed(self):
dag_id = request.form.get('dag_id')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == 'true'
origin = request.form.get('origin')
return self._mark_dagrun_state_as_failed(dag_id, execution_date,
confirmed, origin)
@expose('/dagrun_success', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def dagrun_success(self):
dag_id = request.form.get('dag_id')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == 'true'
origin = request.form.get('origin')
return self._mark_dagrun_state_as_success(dag_id, execution_date,
confirmed, origin)
def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date,
confirmed, upstream, downstream,
future, past, state):
dag = dagbag.get_dag(dag_id)
task = dag.get_task(task_id)
task.dag = dag
execution_date = pendulum.parse(execution_date)
if not dag:
flash("Cannot find DAG: {}".format(dag_id))
return redirect(origin)
if not task:
flash("Cannot find task {} in DAG {}".format(task_id, dag.dag_id))
return redirect(origin)
from airflow.api.common.experimental.mark_tasks import set_state
if confirmed:
altered = set_state(tasks=[task], execution_date=execution_date,
upstream=upstream, downstream=downstream,
future=future, past=past, state=state,
commit=True)
flash("Marked {} on {} task instances".format(state, len(altered)))
return redirect(origin)
to_be_altered = set_state(tasks=[task], execution_date=execution_date,
upstream=upstream, downstream=downstream,
future=future, past=past, state=state,
commit=False)
details = "\n".join([str(t) for t in to_be_altered])
response = self.render("airflow/confirm.html",
message=("Here's the list of task instances you are "
"about to mark as {}:".format(state)),
details=details)
return response
@expose('/failed', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def failed(self):
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = request.form.get('origin')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == "true"
upstream = request.form.get('failed_upstream') == "true"
downstream = request.form.get('failed_downstream') == "true"
future = request.form.get('failed_future') == "true"
past = request.form.get('failed_past') == "true"
return self._mark_task_instance_state(dag_id, task_id, origin, execution_date,
confirmed, upstream, downstream,
future, past, State.FAILED)
@expose('/success', methods=['POST'])
@login_required
@wwwutils.action_logging
@wwwutils.notify_owner
def success(self):
dag_id = request.form.get('dag_id')
task_id = request.form.get('task_id')
origin = request.form.get('origin')
execution_date = request.form.get('execution_date')
confirmed = request.form.get('confirmed') == "true"
upstream = request.form.get('success_upstream') == "true"
downstream = request.form.get('success_downstream') == "true"
future = request.form.get('success_future') == "true"
past = request.form.get('success_past') == "true"
return self._mark_task_instance_state(dag_id, task_id, origin, execution_date,
confirmed, upstream, downstream,
future, past, State.SUCCESS)
@expose('/tree')
@login_required
@wwwutils.gzipped
@wwwutils.action_logging
@provide_session
def tree(self, session=None):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = dagbag.get_dag(dag_id)
if not dag:
flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
return redirect('/admin/')
root = request.args.get('root')
if root:
dag = dag.sub_dag(
task_regex=root,
include_downstream=False,
include_upstream=True)
base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
base_date = timezone.parse(base_date)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
DR = models.DagRun
dag_runs = (
session.query(DR)
.filter(
DR.dag_id == dag.dag_id,
DR.execution_date <= base_date)
.order_by(DR.execution_date.desc())
.limit(num_runs)
.all()
)
dag_runs = {
dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}
dates = sorted(list(dag_runs.keys()))
max_date = max(dates) if dates else None
min_date = min(dates) if dates else None
tis = dag.get_task_instances(
start_date=min_date, end_date=base_date, session=session)
task_instances = {}
for ti in tis:
tid = alchemy_to_dict(ti)
dr = dag_runs.get(ti.execution_date)
tid['external_trigger'] = dr['external_trigger'] if dr else False
task_instances[(ti.task_id, ti.execution_date)] = tid
expanded = []
# The default recursion traces every path so that tree view has full
# expand/collapse functionality. After 5,000 nodes we stop and fall
# back on a quick DFS search for performance. See PR #320.
node_count = [0]
node_limit = 5000 / max(1, len(dag.leaves))
def recurse_nodes(task, visited):
visited.add(task)
node_count[0] += 1
children = [
recurse_nodes(t, visited) for t in task.downstream_list
if node_count[0] < node_limit or t not in visited]
# D3 tree uses children vs _children to define what is
# expanded or not. The following block makes it such that
# repeated nodes are collapsed by default.
children_key = 'children'
if task.task_id not in expanded:
expanded.append(task.task_id)
elif children:
children_key = "_children"
def set_duration(tid):
if isinstance(tid, dict) and tid.get("state") == State.RUNNING \
and tid["start_date"] is not None:
d = timezone.utcnow() - pendulum.parse(tid["start_date"])
tid["duration"] = d.total_seconds()
return tid
return {
'name': task.task_id,
'instances': [
set_duration(task_instances.get((task.task_id, d))) or {
'execution_date': d.isoformat(),
'task_id': task.task_id
}
for d in dates],
children_key: children,
'num_dep': len(task.downstream_list),
'operator': task.task_type,
'retries': task.retries,
'owner': task.owner,
'start_date': task.start_date,
'end_date': task.end_date,
'depends_on_past': task.depends_on_past,
'ui_color': task.ui_color,
}
data = {
'name': '[DAG]',
'children': [recurse_nodes(t, set()) for t in dag.roots],
'instances': [dag_runs.get(d) or {'execution_date': d.isoformat()} for d in dates],
}
session.commit()
form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
external_logs = conf.get('elasticsearch', 'frontend')
doc_md = wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc')
return self.render(
'airflow/tree.html',
operators=sorted({op.task_type: op for op in dag.tasks}.values(),
key=lambda x: x.task_type),
root=root,
form=form,
dag=dag, data=data, blur=blur, num_runs=num_runs,
doc_md=doc_md,
show_external_logs=bool(external_logs))
@expose('/graph')
@login_required
@wwwutils.gzipped
@wwwutils.action_logging
@provide_session
def graph(self, session=None):
dag_id = request.args.get('dag_id')
blur = conf.getboolean('webserver', 'demo_mode')
dag = dagbag.get_dag(dag_id)
if not dag:
flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
return redirect('/admin/')
root = request.args.get('root')
if root:
dag = dag.sub_dag(
task_regex=root,
include_upstream=True,
include_downstream=False)
arrange = request.args.get('arrange', dag.orientation)
nodes = []
edges = []
for task in dag.tasks:
nodes.append({
'id': task.task_id,
'value': {
'label': task.task_id,
'labelStyle': "fill:{0};".format(task.ui_fgcolor),
'style': "fill:{0};".format(task.ui_color),
}
})
def get_downstream(task):
for t in task.downstream_list:
edge = {
'u': task.task_id,
'v': t.task_id,
}
if edge not in edges:
edges.append(edge)
get_downstream(t)
for t in dag.roots:
get_downstream(t)
dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dt_nr_dr_data['arrange'] = arrange
dttm = dt_nr_dr_data['dttm']
class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
arrange = SelectField("Layout", choices=(
('LR', "Left->Right"),
('RL', "Right->Left"),
('TB', "Top->Bottom"),
('BT', "Bottom->Top"),
))
form = GraphForm(data=dt_nr_dr_data)
form.execution_date.choices = dt_nr_dr_data['dr_choices']
task_instances = {
ti.task_id: alchemy_to_dict(ti)
for ti in dag.get_task_instances(dttm, dttm, session=session)}
tasks = {
t.task_id: {
'dag_id': t.dag_id,
'task_type': t.task_type,
}
for t in dag.tasks}
if not tasks:
flash("No tasks found", "error")
session.commit()
doc_md = wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc')
external_logs = conf.get('elasticsearch', 'frontend')
return self.render(
'airflow/graph.html',
dag=dag,
form=form,
width=request.args.get('width', "100%"),
height=request.args.get('height', "800"),
execution_date=dttm.isoformat(),
state_token=state_token(dt_nr_dr_data['dr_state']),
doc_md=doc_md,
arrange=arrange,
operators=sorted({op.task_type: op for op in dag.tasks}.values(),
key=lambda x: x.task_type),
blur=blur,
root=root or '',
task_instances=task_instances,
tasks=tasks,
nodes=nodes,
edges=edges,
show_external_logs=bool(external_logs))
@expose('/duration')
@login_required
@wwwutils.action_logging
@provide_session
def duration(self, session=None):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = dagbag.get_dag(dag_id)
base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
if dag is None:
flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
return redirect('/admin/')
if base_date:
base_date = pendulum.parse(base_date)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
min_date = dates[0] if dates else datetime(2000, 1, 1)
root = request.args.get('root')
if root:
dag = dag.sub_dag(
task_regex=root,
include_upstream=True,
include_downstream=False)
chart_height = get_chart_height(dag)
chart = nvd3.lineChart(
name="lineChart", x_is_date=True, height=chart_height, width="1200")
cum_chart = nvd3.lineChart(
name="cumLineChart", x_is_date=True, height=chart_height, width="1200")
y = defaultdict(list)
x = defaultdict(list)
cum_y = defaultdict(list)
tis = dag.get_task_instances(
start_date=min_date, end_date=base_date, session=session)
TF = models.TaskFail
ti_fails = (
session
.query(TF)
.filter(
TF.dag_id == dag.dag_id,
TF.execution_date >= min_date,
TF.execution_date <= base_date,
TF.task_id.in_([t.task_id for t in dag.tasks]))
.all()
)
fails_totals = defaultdict(int)
for tf in ti_fails:
dict_key = (tf.dag_id, tf.task_id, tf.execution_date)
if tf.duration:
fails_totals[dict_key] += tf.duration
for ti in tis:
if ti.duration:
dttm = wwwutils.epoch(ti.execution_date)
x[ti.task_id].append(dttm)
y[ti.task_id].append(float(ti.duration))
fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date)
fails_total = fails_totals[fails_dict_key]
cum_y[ti.task_id].append(float(ti.duration + fails_total))
# determine the most relevant time unit for the set of task instance
# durations for the DAG
y_unit = infer_time_unit([d for t in y.values() for d in t])
cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t])
# update the y Axis on both charts to have the correct time units
chart.create_y_axis('yAxis', format='.02f', custom_format=False,
label='Duration ({})'.format(y_unit))
chart.axislist['yAxis']['axisLabelDistance'] = '40'
cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False,
label='Duration ({})'.format(cum_y_unit))
cum_chart.axislist['yAxis']['axisLabelDistance'] = '40'
for task in dag.tasks:
if x[task.task_id]:
chart.add_serie(name=task.task_id, x=x[task.task_id],
y=scale_time_units(y[task.task_id], y_unit))
cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
y=scale_time_units(cum_y[task.task_id],
cum_y_unit))
dates = sorted(list({ti.execution_date for ti in tis}))
max_date = max([ti.execution_date for ti in tis]) if dates else None
session.commit()
form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
chart.buildcontent()
cum_chart.buildcontent()
s_index = cum_chart.htmlcontent.rfind('});')
cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] +
"$(function() {$( document ).trigger('chartload') })" +
cum_chart.htmlcontent[s_index:])
return self.render(
'airflow/duration_chart.html',
dag=dag,
demo_mode=conf.getboolean('webserver', 'demo_mode'),
root=root,
form=form,
chart=Markup(chart.htmlcontent),
cum_chart=Markup(cum_chart.htmlcontent)
)
@expose('/tries')
@login_required
@wwwutils.action_logging
@provide_session
def tries(self, session=None):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = dagbag.get_dag(dag_id)
base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
base_date = pendulum.parse(base_date)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
min_date = dates[0] if dates else datetime(2000, 1, 1)
root = request.args.get('root')
if root:
dag = dag.sub_dag(
task_regex=root,
include_upstream=True,
include_downstream=False)
chart_height = get_chart_height(dag)
chart = nvd3.lineChart(
name="lineChart", x_is_date=True, y_axis_format='d', height=chart_height,
width="1200")
for task in dag.tasks:
y = []
x = []
for ti in task.get_task_instances(start_date=min_date,
end_date=base_date,
session=session):
dttm = wwwutils.epoch(ti.execution_date)
x.append(dttm)
# y value should reflect completed tries to have a 0 baseline.
y.append(ti.prev_attempted_tries)
if x:
chart.add_serie(name=task.task_id, x=x, y=y)
tis = dag.get_task_instances(
start_date=min_date, end_date=base_date, session=session)
tries = sorted(list({ti.try_number for ti in tis}))
max_date = max([ti.execution_date for ti in tis]) if tries else None
session.commit()
form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
chart.buildcontent()
return self.render(
'airflow/chart.html',
dag=dag,
demo_mode=conf.getboolean('webserver', 'demo_mode'),
root=root,
form=form,
chart=Markup(chart.htmlcontent)
)
@expose('/landing_times')
@login_required
@wwwutils.action_logging
@provide_session
def landing_times(self, session=None):
default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
dag_id = request.args.get('dag_id')
dag = dagbag.get_dag(dag_id)
base_date = request.args.get('base_date')
num_runs = request.args.get('num_runs')
num_runs = int(num_runs) if num_runs else default_dag_run
if base_date:
base_date = pendulum.parse(base_date)
else:
base_date = dag.latest_execution_date or timezone.utcnow()
dates = dag.date_range(base_date, num=-abs(num_runs))
min_date = dates[0] if dates else datetime(2000, 1, 1)
root = request.args.get('root')
if root:
dag = dag.sub_dag(
task_regex=root,
include_upstream=True,
include_downstream=False)
chart_height = get_chart_height(dag)
chart = nvd3.lineChart(
name="lineChart", x_is_date=True, height=chart_height, width="1200")
y = {}
x = {}
for task in dag.tasks:
task_id = task.task_id
y[task_id] = []
x[task_id] = []
for ti in task.get_task_instances(start_date=min_date,
end_date=base_date,
session=session):
if ti.end_date:
ts = ti.execution_date
following_schedule = dag.following_schedule(ts)
if dag.schedule_interval and following_schedule:
ts = following_schedule
dttm = wwwutils.epoch(ti.execution_date)
secs = (ti.end_date - ts).total_seconds()
x[task_id].append(dttm)
y[task_id].append(secs)
# determine the most relevant time unit for the set of landing times
# for the DAG
y_unit = infer_time_unit([d for t in y.values() for d in t])
# update the y Axis to have the correct time units
chart.create_y_axis('yAxis', format='.02f', custom_format=False,
label='Landing Time ({})'.format(y_unit))
chart.axislist['yAxis']['axisLabelDistance'] = '40'
for task in dag.tasks:
if x[task.task_id]:
chart.add_serie(name=task.task_id, x=x[task.task_id],
y=scale_time_units(y[task.task_id], y_unit))
tis = dag.get_task_instances(
start_date=min_date, end_date=base_date, session=session)
dates = sorted(list({ti.execution_date for ti in tis}))
max_date = max([ti.execution_date for ti in tis]) if dates else None
form = DateTimeWithNumRunsForm(data={'base_date': max_date,
'num_runs': num_runs})
chart.buildcontent()
return self.render(
'airflow/chart.html',
dag=dag,
chart=Markup(chart.htmlcontent),
height=str(chart_height + 100) + "px",
demo_mode=conf.getboolean('webserver', 'demo_mode'),
root=root,
form=form,
)
@expose('/paused', methods=['POST'])
@login_required
@wwwutils.action_logging
@provide_session
def paused(self, session=None):
dag_id = request.values.get('dag_id')
        # The request carries the *current* UI state, so 'false' means "pause".
        is_paused = request.args.get('is_paused') == 'false'
models.DagModel.get_dagmodel(dag_id).set_is_paused(
is_paused=is_paused,
store_serialized_dags=STORE_SERIALIZED_DAGS)
return "OK"
@expose('/refresh', methods=['POST'])
@login_required
@wwwutils.action_logging
@provide_session
def refresh(self, session=None):
# TODO: Is this method still needed after AIRFLOW-3561?
dm = models.DagModel
dag_id = request.values.get('dag_id')
orm_dag = session.query(dm).filter(dm.dag_id == dag_id).first()
if orm_dag:
orm_dag.last_expired = timezone.utcnow()
session.merge(orm_dag)
session.commit()
flash("DAG [{}] is now fresh as a daisy".format(dag_id))
return redirect(request.referrer)
@expose('/gantt')
@login_required
@wwwutils.action_logging
@provide_session
def gantt(self, session=None):
dag_id = request.args.get('dag_id')
dag = dagbag.get_dag(dag_id)
demo_mode = conf.getboolean('webserver', 'demo_mode')
root = request.args.get('root')
if root:
dag = dag.sub_dag(
task_regex=root,
include_upstream=True,
include_downstream=False)
dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
dttm = dt_nr_dr_data['dttm']
form = DateTimeWithNumRunsWithDagRunsForm(data=dt_nr_dr_data)
form.execution_date.choices = dt_nr_dr_data['dr_choices']
tis = [
ti for ti in dag.get_task_instances(dttm, dttm, session=session)
if ti.start_date and ti.state]
tis = sorted(tis, key=lambda ti: ti.start_date)
TF = models.TaskFail
ti_fails = list(itertools.chain(*[(
session
.query(TF)
.filter(TF.dag_id == ti.dag_id,
TF.task_id == ti.task_id,
TF.execution_date == ti.execution_date)
.all()
) for ti in tis]))
# determine bars to show in the gantt chart
gantt_bar_items = []
for ti in tis:
end_date = ti.end_date or timezone.utcnow()
# prev_attempted_tries will reflect the currently running try_number
# or the try_number of the last complete run
# https://issues.apache.org/jira/browse/AIRFLOW-2143
try_count = ti.prev_attempted_tries
gantt_bar_items.append((ti.task_id, ti.start_date, end_date, ti.state, try_count))
tf_count = 0
try_count = 1
prev_task_id = ""
for tf in ti_fails:
end_date = tf.end_date or timezone.utcnow()
start_date = tf.start_date or end_date
if tf_count != 0 and tf.task_id == prev_task_id:
try_count = try_count + 1
else:
try_count = 1
prev_task_id = tf.task_id
gantt_bar_items.append((tf.task_id, start_date, end_date, State.FAILED, try_count))
tf_count = tf_count + 1
tasks = []
for gantt_bar_item in gantt_bar_items:
task_id = gantt_bar_item[0]
start_date = gantt_bar_item[1]
end_date = gantt_bar_item[2]
state = gantt_bar_item[3]
try_count = gantt_bar_item[4]
tasks.append({
'startDate': wwwutils.epoch(start_date),
'endDate': wwwutils.epoch(end_date),
'isoStart': start_date.isoformat()[:-4],
'isoEnd': end_date.isoformat()[:-4],
'taskName': task_id,
'duration': (end_date - start_date).total_seconds(),
'status': state,
'executionDate': dttm.isoformat(),
'try_number': try_count,
})
states = {task['status']: task['status'] for task in tasks}
data = {
'taskNames': [ti.task_id for ti in tis],
'tasks': tasks,
'taskStatus': states,
'height': len(tis) * 25 + 25,
}
session.commit()
return self.render(
'airflow/gantt.html',
dag=dag,
execution_date=dttm.isoformat(),
form=form,
data=data,
base_date='',
demo_mode=demo_mode,
root=root,
)
@expose('/object/task_instances')
@login_required
@wwwutils.action_logging
@provide_session
def task_instances(self, session=None):
dag_id = request.args.get('dag_id')
dag = dagbag.get_dag(dag_id)
dttm = request.args.get('execution_date')
if dttm:
dttm = pendulum.parse(dttm)
else:
return "Error: Invalid execution_date"
task_instances = {
ti.task_id: alchemy_to_dict(ti)
for ti in dag.get_task_instances(dttm, dttm, session=session)}
return json.dumps(task_instances)
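    # Illustrative response shape: a JSON object keyed by task_id, each value
    # the TaskInstance's column dict, e.g.
    #   {"my_task": {"dag_id": "example_dag", "state": "success", ...}}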
@expose('/variables/