# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
import ast
import codecs
import copy
import datetime as dt
import itertools
import json
import logging
import math
import os
import pickle
import traceback
from collections import defaultdict
from datetime import timedelta
from functools import wraps
from textwrap import dedent

import markdown
import sqlalchemy as sqla
import pendulum
from flask import (
    abort, jsonify, redirect, url_for, request, Markup, Response,
    current_app, render_template, make_response)
from flask import flash
from flask_admin import BaseView, expose, AdminIndexView
from flask_admin.actions import action
from flask_admin.babel import lazy_gettext
from flask_admin.contrib.sqla import ModelView
from flask_admin.form.fields import DateTimeField
from flask_admin.tools import iterdecode
import lazy_object_proxy
from jinja2 import escape
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.utils import pformat
from past.builtins import basestring
from pygments import highlight, lexers
import six
from pygments.formatters.html import HtmlFormatter
from six.moves.urllib.parse import quote, unquote
from sqlalchemy import or_, desc, and_, union_all
from wtforms import (
    Form, SelectField, TextAreaField, PasswordField,
    StringField, IntegerField, validators)

import nvd3

import airflow
from airflow import configuration
from airflow.configuration import conf
from airflow import jobs, models, settings
from airflow.api.common.experimental.mark_tasks import (set_dag_run_state_to_running,
                                                        set_dag_run_state_to_success,
                                                        set_dag_run_state_to_failed)
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, Connection, DagRun, errors, XCom
from airflow.models.dagcode import DagCode
from airflow.settings import STATE_COLORS, STORE_SERIALIZED_DAGS
from airflow.operators.subdag_operator import SubDagOperator
from airflow.ti_deps.dep_context import RUNNING_DEPS, SCHEDULER_QUEUED_DEPS, DepContext
from airflow.utils import timezone
from airflow.utils.dates import infer_time_unit, scale_time_units, parse_execution_date
from airflow.utils.db import create_session, provide_session
from airflow.utils.helpers import alchemy_to_dict, render_log_filename
from airflow.utils.net import get_hostname
from airflow.utils.state import State
from airflow.utils.timezone import datetime
from airflow.www import utils as wwwutils
from airflow.www.forms import (DateTimeForm, DateTimeWithNumRunsForm,
                               DateTimeWithNumRunsWithDagRunsForm)
from airflow.www.validators import GreaterEqualThan

QUERY_LIMIT = 100000
CHART_LIMIT = 200000

UTF8_READER = codecs.getreader('utf-8')

dagbag = models.DagBag(settings.DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)

login_required = airflow.login.login_required
current_user = airflow.login.current_user
logout_user = airflow.login.logout_user

FILTER_BY_OWNER = False

PAGE_SIZE = conf.getint('webserver', 'page_size')

log = logging.getLogger(__name__)

if conf.getboolean('webserver', 'FILTER_BY_OWNER'):
    # filter_by_owner if authentication is enabled and filter_by_owner is true
    FILTER_BY_OWNER = not current_app.config['LOGIN_DISABLED']


def dag_link(v, c, m, p):
    if m.dag_id is None:
        return Markup()

    kwargs = {'dag_id': m.dag_id}
    # This is called with various objects, TIs, (ORM) DAG - some have this,
    # some don't
    if hasattr(m, 'execution_date'):
        kwargs['execution_date'] = m.execution_date
    url = url_for('airflow.graph', **kwargs)
    return Markup(
        '<a href="{}">{}</a>').format(url, m.dag_id)


def log_url_formatter(v, c, m, p):
    url = url_for(
        'airflow.log',
        dag_id=m.dag_id,
        task_id=m.task_id,
        execution_date=m.execution_date.isoformat())
    return Markup(
        '<a href="{log_url}">'
        '<span class="glyphicon glyphicon-book" aria-hidden="true">'
        '</span></a>').format(log_url=url)


def dag_run_link(v, c, m, p):
    url = url_for(
        'airflow.graph',
        dag_id=m.dag_id,
        run_id=m.run_id,
        execution_date=m.execution_date)
    title = m.run_id
    return Markup('<a href="{url}">{title}</a>').format(**locals())


def task_instance_link(v, c, m, p):
    url = url_for(
        'airflow.task',
        dag_id=m.dag_id,
        task_id=m.task_id,
        execution_date=m.execution_date.isoformat())
    url_root = url_for(
        'airflow.graph',
        dag_id=m.dag_id,
        root=m.task_id,
        execution_date=m.execution_date.isoformat())
    return Markup(
        """
        <span style="white-space: nowrap;">
        <a href="{url}">{m.task_id}</a>
        <a href="{url_root}" title="Filter on this task and upstream">
        <span class="glyphicon glyphicon-filter" aria-hidden="true"></span>
        </a>
        </span>
        """).format(**locals())


def state_token(state):
    color = State.color(state)
    return Markup(
        '<span class="label" style="background-color:{color};">'
        '{state}</span>').format(**locals())


def parse_datetime_f(value):
    if not isinstance(value, dt.datetime):
        return value

    return timezone.make_aware(value)


def state_f(v, c, m, p):
    return state_token(m.state)


def duration_f(v, c, m, p):
    if m.end_date and m.duration:
        return timedelta(seconds=m.duration)


def datetime_f(v, c, m, p):
    attr = getattr(m, p)
    dttm = attr.isoformat() if attr else ''
    if timezone.utcnow().isoformat()[:4] == dttm[:4]:
        dttm = dttm[5:]
    return Markup("<nobr>{}</nobr>").format(dttm)


def nobr_f(v, c, m, p):
    return Markup("<nobr>{}</nobr>").format(getattr(m, p))


def label_link(v, c, m, p):
    try:
        default_params = ast.literal_eval(m.default_params)
    except Exception:
        default_params = {}
    url = url_for(
        'airflow.chart', chart_id=m.id, iteration_no=m.iteration_no,
        **default_params)
    title = m.label
    return Markup("<a href='{url}'>{title}</a>").format(**locals())


def pool_link(v, c, m, p):
    title = m.pool
    url = url_for('taskinstance.index_view', flt1_pool_equals=m.pool)
    return Markup("<a href='{url}'>{title}</a>").format(**locals())


def pygment_html_render(s, lexer=lexers.TextLexer):
    return highlight(
        s,
        lexer(),
        HtmlFormatter(linenos=True),
    )
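# The small helpers above (dag_link, state_f, duration_f, datetime_f, ...)
# follow flask-admin's column formatter signature (view, context, model,
# name). A minimal sketch of how such formatters get wired into a ModelView;
# the class name here is hypothetical, the real wiring lives in the ModelView
# subclasses further down this file:
#
#     class TaskInstanceModelViewSketch(AirflowModelView):
#         column_formatters = dict(
#             dag_id=dag_link,
#             state=state_f,
#             duration=duration_f,
#             start_date=datetime_f,
#         )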
def render(obj, lexer):
    out = ""
    if isinstance(obj, basestring):
        out += Markup(pygment_html_render(obj, lexer))
    elif isinstance(obj, (tuple, list)):
        for i, s in enumerate(obj):
            out += Markup("<div>List item #{}</div>".format(i))
            out += Markup("<div>" + pygment_html_render(s, lexer) + "</div>")
    elif isinstance(obj, dict):
        for k, v in obj.items():
            out += Markup('<div>Dict item "{}"</div>'.format(k))
            out += Markup("<div>" + pygment_html_render(v, lexer) + "</div>")
    return out
def wrapped_markdown(s, css_class=None):
    if s is None:
        return None

    return Markup(
        '<div class="rich_doc {css_class}">' + markdown.markdown(s) + "</div>"
    ).format(css_class=css_class)


attr_renderer = {
    'bash_command': lambda x: render(x, lexers.BashLexer),
    'hql': lambda x: render(x, lexers.SqlLexer),
    'sql': lambda x: render(x, lexers.SqlLexer),
    'doc': lambda x: render(x, lexers.TextLexer),
    'doc_json': lambda x: render(x, lexers.JsonLexer),
    'doc_rst': lambda x: render(x, lexers.RstLexer),
    'doc_yaml': lambda x: render(x, lexers.YamlLexer),
    'doc_md': wrapped_markdown,
    'python_callable': lambda x: render(
        wwwutils.get_python_source(x),
        lexers.PythonLexer,
    ),
}


def data_profiling_required(f):
    """Decorator for views requiring data profiling access"""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if (
            current_app.config['LOGIN_DISABLED'] or
            (not current_user.is_anonymous and current_user.data_profiling())
        ):
            return f(*args, **kwargs)
        else:
            flash("This page requires data profiling privileges", "error")
            return redirect(url_for('admin.index'))
    return decorated_function


def fused_slots(v, c, m, p):
    url = url_for(
        'taskinstance.index_view',
        flt1_pool_equals=m.pool,
        flt2_state_equals='running',
    )
    return Markup("<a href='{0}'>{1}</a>").format(url, m.used_slots())


def fqueued_slots(v, c, m, p):
    url = url_for(
        'taskinstance.index_view',
        flt1_pool_equals=m.pool,
        flt2_state_equals='queued',
        sort='1',
        desc='1'
    )
    return Markup("<a href='{0}'>{1}</a>").format(url, m.queued_slots())


def recurse_tasks(tasks, task_ids, dag_ids, task_id_to_dag):
    if isinstance(tasks, list):
        for task in tasks:
            recurse_tasks(task, task_ids, dag_ids, task_id_to_dag)
        return
    if isinstance(tasks, SubDagOperator):
        subtasks = tasks.subdag.tasks
        dag_ids.append(tasks.subdag.dag_id)
        for subtask in subtasks:
            if subtask.task_id not in task_ids:
                task_ids.append(subtask.task_id)
                task_id_to_dag[subtask.task_id] = tasks.subdag
        recurse_tasks(subtasks, task_ids, dag_ids, task_id_to_dag)
    if isinstance(tasks, BaseOperator):
        task_id_to_dag[tasks.task_id] = tasks.dag


def get_chart_height(dag):
    """
    TODO(aoen): See [AIRFLOW-1263] We use the number of tasks in the DAG as a heuristic
    to approximate the size of the generated chart (otherwise the charts are tiny and
    unreadable when DAGs have a large number of tasks). Ideally nvd3 should allow for
    dynamic-height charts, that is charts that take up space based on the size of the
    components within.
    """
    return 600 + len(dag.tasks) * 10


def get_date_time_num_runs_dag_runs_form_data(request, session, dag):
    dttm = request.args.get('execution_date')
    if dttm:
        dttm = pendulum.parse(dttm)
    else:
        dttm = dag.latest_execution_date or timezone.utcnow()

    base_date = request.args.get('base_date')
    if base_date:
        base_date = timezone.parse(base_date)
    else:
        # The DateTimeField widget truncates milliseconds and would lose
        # the first dag run. Round to next second.
        base_date = (dttm + timedelta(seconds=1)).replace(microsecond=0)

    default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
    num_runs = request.args.get('num_runs')
    num_runs = int(num_runs) if num_runs else default_dag_run

    DR = models.DagRun
    drs = (
        session.query(DR)
        .filter(
            DR.dag_id == dag.dag_id,
            DR.execution_date <= base_date)
        .order_by(desc(DR.execution_date))
        .limit(num_runs)
        .all()
    )
    dr_choices = []
    dr_state = None
    for dr in drs:
        dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
        if dttm == dr.execution_date:
            dr_state = dr.state

    # Happens if base_date was changed and the selected dag run is not in result
    if not dr_state and drs:
        dr = drs[0]
        dttm = dr.execution_date
        dr_state = dr.state

    return {
        'dttm': dttm,
        'base_date': base_date,
        'num_runs': num_runs,
        'execution_date': dttm.isoformat(),
        'dr_choices': dr_choices,
        'dr_state': dr_state,
    }
# Maps Chart.chart_type values to nvd3 chart class names; chart_data() and
# chart() below look chart types up here.
chart_mapping = (
    ('line', 'lineChart'),
    ('spline', 'lineChart'),
    ('bar', 'multiBarChart'),
    ('column', 'multiBarChart'),
    ('area', 'stackedAreaChart'),
    ('stacked_area', 'stackedAreaChart'),
    ('percent_area', 'stackedAreaChart'),
    ('datatable', 'datatable'),
)
chart_mapping = dict(chart_mapping)


class AirflowViewMixin(object):
    def render(self, template, **kwargs):
        kwargs['scheduler_job'] = lazy_object_proxy.Proxy(jobs.SchedulerJob.most_recent_job)
        kwargs['macros'] = airflow.macros
        return super(AirflowViewMixin, self).render(template, **kwargs)


class Airflow(AirflowViewMixin, BaseView):
    def is_visible(self):
        return False

    @expose('/')
    @login_required
    def index(self):
        return self.render('airflow/dags.html')

    @expose('/chart_data')
    @data_profiling_required
    @wwwutils.gzipped
    def chart_data(self):
        from airflow import macros
        import pandas as pd

        if conf.getboolean('core', 'secure_mode'):
            abort(404)

        with create_session() as session:
            chart_id = request.args.get('chart_id')
            csv = request.args.get('csv') == "true"
            chart = session.query(models.Chart).filter_by(id=chart_id).first()
            db = session.query(
                Connection).filter_by(conn_id=chart.conn_id).first()

        payload = {
            "state": "ERROR",
            "error": ""
        }

        # Processing templated fields
        try:
            args = ast.literal_eval(chart.default_params)
            if not isinstance(args, dict):
                raise AirflowException('Not a dict')
        except Exception:
            args = {}
            payload['error'] += (
                "Default params is not valid, string has to evaluate as "
                "a Python dictionary. ")

        request_dict = {k: request.args.get(k) for k in request.args}
        args.update(request_dict)
        args['macros'] = macros
        sandbox = ImmutableSandboxedEnvironment()
        sql = sandbox.from_string(chart.sql).render(**args)
        label = sandbox.from_string(chart.label).render(**args)
        payload['sql_html'] = Markup(highlight(
            sql,
            lexers.SqlLexer(),  # Lexer call
            HtmlFormatter(noclasses=True))
        )
        payload['label'] = label

        pd.set_option('display.max_colwidth', 100)

        try:
            hook = db.get_hook()
            df = hook.get_pandas_df(
                wwwutils.limit_sql(sql, CHART_LIMIT, conn_type=db.conn_type))
            df = df.fillna(0)
        except Exception as e:
            payload['error'] += "SQL execution failed. Details: " + str(e)

        if csv:
            return Response(
                response=df.to_csv(index=False),
                status=200,
                mimetype="application/text")

        if not payload['error'] and len(df) == CHART_LIMIT:
            payload['warning'] = (
                "Data has been truncated to {0}"
                " rows. Expect incomplete results.").format(CHART_LIMIT)

        if not payload['error'] and len(df) == 0:
            payload['error'] += "Empty result set. "
        elif (
                not payload['error'] and
                chart.sql_layout == 'series' and
                chart.chart_type != "datatable" and
                len(df.columns) < 3):
            payload['error'] += "SQL needs to return at least 3 columns. "
        elif (
                not payload['error'] and
                chart.sql_layout == 'columns' and
                len(df.columns) < 2):
            payload['error'] += "SQL needs to return at least 2 columns. "
        elif not payload['error']:
            import numpy as np
            chart_type = chart.chart_type

            data = None
            if chart.show_datatable or chart_type == "datatable":
                data = df.to_dict(orient="split")
                data['columns'] = [{'title': c} for c in data['columns']]
                payload['data'] = data

            # Trying to convert time to something Highcharts likes
            x_col = 1 if chart.sql_layout == 'series' else 0
            if chart.x_is_date:
                try:
                    # From string to datetime
                    df[df.columns[x_col]] = pd.to_datetime(
                        df[df.columns[x_col]])
                    df[df.columns[x_col]] = df[df.columns[x_col]].apply(
                        lambda x: int(x.strftime("%s")) * 1000)
                except Exception:
                    payload['error'] = "Time conversion failed"

            if chart_type == 'datatable':
                payload['state'] = 'SUCCESS'
                return wwwutils.json_response(payload)
            else:
                if chart.sql_layout == 'series':
                    # User provides columns (series, x, y)
                    df[df.columns[2]] = df[df.columns[2]].astype(np.float)
                    df = df.pivot_table(
                        index=df.columns[1],
                        columns=df.columns[0],
                        values=df.columns[2], aggfunc=np.sum)
                else:
                    # User provides columns (x, y, metric1, metric2, ...)
                    df.index = df[df.columns[0]]
                    df = df.sort_values(by=df.columns[0])
                    del df[df.columns[0]]
                    for col in df.columns:
                        df[col] = df[col].astype(np.float)

                df = df.fillna(0)
                NVd3ChartClass = chart_mapping.get(chart.chart_type)
                NVd3ChartClass = getattr(nvd3, NVd3ChartClass)
                nvd3_chart = NVd3ChartClass(x_is_date=chart.x_is_date)

                for col in df.columns:
                    nvd3_chart.add_serie(name=col, y=df[col].tolist(),
                                         x=df[col].index.tolist())
                try:
                    nvd3_chart.buildcontent()
                    payload['chart_type'] = nvd3_chart.__class__.__name__
                    payload['htmlcontent'] = nvd3_chart.htmlcontent
                except Exception as e:
                    payload['error'] = str(e)

            payload['state'] = 'SUCCESS'
            payload['request_dict'] = request_dict
        return wwwutils.json_response(payload)

    @expose('/chart')
    @data_profiling_required
    def chart(self):
        if conf.getboolean('core', 'secure_mode'):
            abort(404)

        with create_session() as session:
            chart_id = request.args.get('chart_id')
            embed = request.args.get('embed')
            chart = session.query(models.Chart).filter_by(id=chart_id).first()

        NVd3ChartClass = chart_mapping.get(chart.chart_type)
        if not NVd3ChartClass:
            flash(
                "Not supported anymore as the license was incompatible, "
                "sorry",
                "danger")
            redirect('/admin/chart/')

        sql = ""
        if chart.show_sql:
            sql = Markup(highlight(
                chart.sql,
                lexers.SqlLexer(),  # Lexer call
                HtmlFormatter(noclasses=True))
            )
        return self.render(
            'airflow/nvd3.html',
            chart=chart,
            title="Airflow - Chart",
            sql=sql,
            label=chart.label,
            embed=embed)

    @expose('/dag_stats')
    @login_required
    @provide_session
    def dag_stats(self, session=None):
        dr = models.DagRun
        dm = models.DagModel

        dag_ids = session.query(dm.dag_id)

        dag_state_stats = (
            session.query(dr.dag_id, dr.state, sqla.func.count(dr.state))
            .group_by(dr.dag_id, dr.state)
        )

        data = {}
        for (dag_id, ) in dag_ids:
            data[dag_id] = {}
        for dag_id, state, count in dag_state_stats:
            if dag_id not in data:
                data[dag_id] = {}
            data[dag_id][state] = count

        payload = {}
        for dag_id, d in data.items():
            payload[dag_id] = []
            for state in State.dag_states:
                count = d.get(state, 0)
                payload[dag_id].append({
                    'state': state,
                    'count': count
                })
        return wwwutils.json_response(payload)
    @expose('/task_stats')
    @login_required
    @provide_session
    def task_stats(self, session=None):
        TI = models.TaskInstance
        DagRun = models.DagRun
        Dag = models.DagModel

        # Filter by get parameters
        selected_dag_ids = {
            unquote(dag_id)
            for dag_id in request.args.get('dag_ids', '').split(',') if dag_id
        }

        LastDagRun = (
            session.query(DagRun.dag_id,
                          sqla.func.max(DagRun.execution_date).label('execution_date'))
            .join(Dag, Dag.dag_id == DagRun.dag_id)
            .filter(DagRun.state != State.RUNNING)
            .filter(Dag.is_active == True)  # noqa: E712
            .filter(Dag.is_subdag == False)  # noqa: E712
            .group_by(DagRun.dag_id)
        )
        RunningDagRun = (
            session.query(DagRun.dag_id, DagRun.execution_date)
            .join(Dag, Dag.dag_id == DagRun.dag_id)
            .filter(DagRun.state == State.RUNNING)
            .filter(Dag.is_active == True)  # noqa: E712
            .filter(Dag.is_subdag == False)  # noqa: E712
        )

        if selected_dag_ids:
            LastDagRun = LastDagRun.filter(DagRun.dag_id.in_(selected_dag_ids))
            RunningDagRun = RunningDagRun.filter(DagRun.dag_id.in_(selected_dag_ids))

        LastDagRun = LastDagRun.subquery('last_dag_run')
        RunningDagRun = RunningDagRun.subquery('running_dag_run')

        # Select all task_instances from active dag_runs.
        # If no dag_run is active, return task instances from most recent dag_run.
        LastTI = (
            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
            .join(LastDagRun, and_(
                LastDagRun.c.dag_id == TI.dag_id,
                LastDagRun.c.execution_date == TI.execution_date))
        )
        RunningTI = (
            session.query(TI.dag_id.label('dag_id'), TI.state.label('state'))
            .join(RunningDagRun, and_(
                RunningDagRun.c.dag_id == TI.dag_id,
                RunningDagRun.c.execution_date == TI.execution_date))
        )

        if selected_dag_ids:
            LastTI = LastTI.filter(TI.dag_id.in_(selected_dag_ids))
            RunningTI = RunningTI.filter(TI.dag_id.in_(selected_dag_ids))

        UnionTI = union_all(LastTI, RunningTI).alias('union_ti')
        qry = (
            session.query(UnionTI.c.dag_id, UnionTI.c.state, sqla.func.count())
            .group_by(UnionTI.c.dag_id, UnionTI.c.state)
        )

        data = {}
        for dag_id, state, count in qry:
            if dag_id not in data:
                data[dag_id] = {}
            data[dag_id][state] = count

        payload = {}
        dag_ids = selected_dag_ids or {dag_id for (dag_id,) in session.query(Dag.dag_id)}
        for dag_id in dag_ids:
            payload[dag_id] = []
            for state in State.task_states:
                count = data.get(dag_id, {}).get(state, 0)
                payload[dag_id].append({
                    'state': state,
                    'count': count
                })
        return wwwutils.json_response(payload)

    @expose('/code')
    @login_required
    @provide_session
    def code(self, session=None):
        all_errors = ""
        try:
            dag_id = request.args.get('dag_id')
            dag_orm = models.DagModel.get_dagmodel(dag_id, session=session)
            code = DagCode.get_code_by_fileloc(dag_orm.fileloc)
            html_code = Markup(highlight(
                code, lexers.PythonLexer(), HtmlFormatter(linenos=True)))
        except Exception as e:
            all_errors += (
                "Exception encountered during " +
                "dag_id retrieval/dag retrieval fallback/code highlighting:\n\n{}\n".format(e)
            )
            html_code = Markup('<p>Failed to load file.</p><p>Details: {}</p>').format(
                escape(all_errors))

        return self.render(
            'airflow/dag_code.html', html_code=html_code, dag=dag_orm, title=dag_id,
            root=request.args.get('root'),
            demo_mode=conf.getboolean('webserver', 'demo_mode'),
            wrapped=conf.getboolean('webserver', 'default_wrap'))

    @expose('/dag_details')
    @login_required
    @provide_session
    def dag_details(self, session=None):
        dag_id = request.args.get('dag_id')
        dag = dagbag.get_dag(dag_id)
        title = "DAG details"
        root = request.args.get('root', '')

        TI = models.TaskInstance
        states = session\
            .query(TI.state, sqla.func.count(TI.dag_id))\
            .filter(TI.dag_id == dag_id)\
            .group_by(TI.state)\
            .all()

        active_runs = models.DagRun.find(
            dag_id=dag.dag_id,
            state=State.RUNNING,
            external_trigger=False,
            session=session
        )

        return self.render(
            'airflow/dag_details.html',
            dag=dag, title=title, root=root, states=states, State=State,
            active_runs=active_runs)

    @current_app.errorhandler(404)
    def circles(self):
        return render_template(
            'airflow/circles.html', hostname=get_hostname() if conf.getboolean(
                'webserver',
                'EXPOSE_HOSTNAME',
                fallback=True) else 'redact'), 404

    @current_app.errorhandler(500)
    def show_traceback(self):
        from airflow.utils import asciiart as ascii_
        return render_template(
            'airflow/traceback.html',
            hostname=get_hostname() if conf.getboolean(
                'webserver',
                'EXPOSE_HOSTNAME',
                fallback=True) else 'redact',
            nukular=ascii_.nukular,
            info=traceback.format_exc() if conf.getboolean(
                'webserver',
                'EXPOSE_STACKTRACE',
                fallback=True) else 'Error! Please contact server admin'), 500

    @expose('/noaccess')
    def noaccess(self):
        return self.render('airflow/noaccess.html')

    @expose('/pickle_info')
    @login_required
    def pickle_info(self):
        d = {}
        dag_id = request.args.get('dag_id')
        dags = [dagbag.dags.get(dag_id)] if dag_id else dagbag.dags.values()
        for dag in dags:
            if not dag.is_subdag:
                d[dag.dag_id] = dag.pickle_info()
        return wwwutils.json_response(d)

    @expose('/login', methods=['GET', 'POST'])
    def login(self):
        return airflow.login.login(self, request)

    @expose('/logout')
    def logout(self):
        logout_user()
        flash('You have been logged out.')
        return redirect(url_for('admin.index'))
    @expose('/rendered')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def rendered(self, session=None):
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dttm = pendulum.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        root = request.args.get('root', '')

        logging.info("Retrieving rendered templates.")
        dag = dagbag.get_dag(dag_id)

        task = copy.copy(dag.get_task(task_id))
        ti = models.TaskInstance(task=task, execution_date=dttm)
        try:
            ti.get_rendered_template_fields()
        except Exception as e:
            msg = "Error rendering template: " + escape(e)
            if six.PY3:
                if e.__cause__:
                    msg += Markup("<br/><br/>OriginalError: ") + escape(e.__cause__)
            flash(msg, "error")

        title = "Rendered Template"
        html_dict = {}
        for template_field in task.template_fields:
            content = getattr(task, template_field)
            if template_field in attr_renderer:
                html_dict[template_field] = attr_renderer[template_field](content)
            else:
                html_dict[template_field] = Markup(
                    "<pre><code>{}</code></pre>").format(pformat(content))

        return self.render(
            'airflow/ti_code.html',
            html_dict=html_dict,
            dag=dag,
            task_id=task_id,
            execution_date=execution_date,
            form=form,
            root=root,
            title=title)

    @expose('/get_logs_with_metadata')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def get_logs_with_metadata(self, session=None):
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dttm = pendulum.parse(execution_date)
        if request.args.get('try_number') is not None:
            try_number = int(request.args.get('try_number'))
        else:
            try_number = None
        response_format = request.args.get('format', 'json')
        metadata = request.args.get('metadata')
        metadata = json.loads(metadata)

        # metadata may be null
        if not metadata:
            metadata = {}

        # Convert string datetime into actual datetime
        try:
            execution_date = timezone.parse(execution_date)
        except ValueError:
            error_message = (
                'Given execution date, {}, could not be identified '
                'as a date. Example date format: 2015-11-16T14:34:15+00:00'.format(
                    execution_date))
            response = jsonify({'error': error_message})
            response.status_code = 400

            return response

        logger = logging.getLogger('airflow.task')
        task_log_reader = conf.get('core', 'task_log_reader')
        handler = next((handler for handler in logger.handlers
                        if handler.name == task_log_reader), None)

        ti = session.query(models.TaskInstance).filter(
            models.TaskInstance.dag_id == dag_id,
            models.TaskInstance.task_id == task_id,
            models.TaskInstance.execution_date == dttm).first()

        def _get_logs_with_metadata(try_number, metadata):
            if ti is None:
                logs = ["*** Task instance did not exist in the DB\n"]
                metadata['end_of_log'] = True
            else:
                logs, metadatas = handler.read(ti, try_number, metadata=metadata)
                metadata = metadatas[0]
            return logs, metadata

        try:
            if ti is not None:
                dag = dagbag.get_dag(dag_id)
                ti.task = dag.get_task(ti.task_id)
            if response_format == 'json':
                logs, metadata = _get_logs_with_metadata(try_number, metadata)
                message = logs[0] if try_number is not None else logs
                return jsonify(message=message, metadata=metadata)

            filename_template = conf.get('core', 'LOG_FILENAME_TEMPLATE')
            attachment_filename = render_log_filename(
                ti=ti,
                try_number="all" if try_number is None else try_number,
                filename_template=filename_template)
            metadata['download_logs'] = True

            def _generate_log_stream(try_number, metadata):
                if try_number is None and ti is not None:
                    next_try = ti.next_try_number
                    try_numbers = list(range(1, next_try))
                else:
                    try_numbers = [try_number]
                for try_number in try_numbers:
                    metadata.pop('end_of_log', None)
                    metadata.pop('max_offset', None)
                    metadata.pop('offset', None)
                    while 'end_of_log' not in metadata or not metadata['end_of_log']:
                        logs, metadata = _get_logs_with_metadata(try_number, metadata)
                        yield "\n".join(logs) + "\n"
            return Response(_generate_log_stream(try_number, metadata),
                            mimetype="text/plain",
                            headers={"Content-Disposition": "attachment; filename={}".format(
                                attachment_filename)})
        except AttributeError as e:
            error_message = ["Task log handler {} does not support read logs.\n{}\n"
                             .format(task_log_reader, str(e))]
            metadata['end_of_log'] = True
            return jsonify(message=error_message, error=True, metadata=metadata)
    @expose('/log')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def log(self, session=None):
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        dttm = pendulum.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        dag = dagbag.get_dag(dag_id)

        ti = session.query(models.TaskInstance).filter(
            models.TaskInstance.dag_id == dag_id,
            models.TaskInstance.task_id == task_id,
            models.TaskInstance.execution_date == dttm).first()

        num_logs = 0
        if ti is not None:
            num_logs = ti.next_try_number - 1
            if ti.state == State.UP_FOR_RESCHEDULE:
                # Tasks in reschedule state decremented the try number
                num_logs += 1
        logs = [''] * num_logs
        root = request.args.get('root', '')
        return self.render(
            'airflow/ti_log.html',
            logs=logs, dag=dag, title="Log by attempts",
            dag_id=dag.dag_id, task_id=task_id,
            execution_date=execution_date, form=form,
            root=root, wrapped=conf.getboolean('webserver', 'default_wrap'))

    @expose('/elasticsearch')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def elasticsearch(self, session=None):
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        execution_date = request.args.get('execution_date')
        try_number = request.args.get('try_number', 1)
        elasticsearch_frontend = conf.get('elasticsearch', 'frontend')
        log_id_template = conf.get('elasticsearch', 'log_id_template')
        log_id = log_id_template.format(
            dag_id=dag_id, task_id=task_id,
            execution_date=execution_date, try_number=try_number)
        url = 'https://' + elasticsearch_frontend.format(log_id=quote(log_id))
        return redirect(url)

    @expose('/task')
    @login_required
    @wwwutils.action_logging
    def task(self):
        TI = models.TaskInstance

        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        # Carrying execution_date through, even though it's irrelevant for
        # this context
        execution_date = request.args.get('execution_date')
        dttm = pendulum.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        root = request.args.get('root', '')
        dag = dagbag.get_dag(dag_id)

        if not dag or task_id not in dag.task_ids:
            flash(
                "Task [{}.{}] doesn't seem to exist"
                " at the moment".format(dag_id, task_id),
                "error")
            return redirect('/admin/')

        task = copy.copy(dag.get_task(task_id))
        task.resolve_template_files()
        ti = TI(task=task, execution_date=dttm)
        ti.refresh_from_db()

        ti_attrs = []
        for attr_name in dir(ti):
            if not attr_name.startswith('_'):
                attr = getattr(ti, attr_name)
                if type(attr) != type(self.task):  # noqa: E721
                    ti_attrs.append((attr_name, str(attr)))

        task_attrs = []
        for attr_name in dir(task):
            if not attr_name.startswith('_'):
                attr = getattr(task, attr_name)
                if type(attr) != type(self.task) and \
                        attr_name not in attr_renderer:  # noqa: E721
                    task_attrs.append((attr_name, str(attr)))

        # Color coding the special attributes that are code
        special_attrs_rendered = {}
        for attr_name in attr_renderer:
            if hasattr(task, attr_name):
                source = getattr(task, attr_name)
                special_attrs_rendered[attr_name] = attr_renderer[attr_name](source)

        no_failed_deps_result = [(
            "Unknown",
            dedent("""\
            All dependencies are met but the task instance is not running.
            In most cases this just means that the task will probably
            be scheduled soon unless:<br/>
            - The scheduler is down or under heavy load<br/>
            - The following configuration values may be limiting the number
            of queueable processes: <code>parallelism</code>,
            <code>dag_concurrency</code>, <code>max_active_runs_per_dag</code>,
            <code>non_pooled_task_slot_count</code><br/>
            {}
            <br/>
            If this task instance does not start soon please contact your Airflow """
                   """administrator for assistance."""
                   .format(
                       "- This task instance already ran and had its state changed "
                       "manually (e.g. cleared in the UI)<br/>"
                       if ti.state == State.NONE else "")))]

        # Use the scheduler's context to figure out which dependencies are not met
        dep_context = DepContext(SCHEDULER_QUEUED_DEPS)
        failed_dep_reasons = [(dep.dep_name, dep.reason) for dep in
                              ti.get_failed_dep_statuses(dep_context=dep_context)]

        title = "Task Instance Details"

        return self.render(
            'airflow/task.html',
            task_attrs=task_attrs,
            ti_attrs=ti_attrs,
            failed_dep_reasons=failed_dep_reasons or no_failed_deps_result,
            task_id=task_id,
            execution_date=execution_date,
            special_attrs_rendered=special_attrs_rendered,
            form=form,
            root=root,
            dag=dag, title=title)

    @expose('/xcom')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def xcom(self, session=None):
        dag_id = request.args.get('dag_id')
        task_id = request.args.get('task_id')
        # Carrying execution_date through, even though it's irrelevant for
        # this context
        execution_date = request.args.get('execution_date')
        dttm = pendulum.parse(execution_date)
        form = DateTimeForm(data={'execution_date': dttm})
        root = request.args.get('root', '')
        dm_db = models.DagModel
        ti_db = models.TaskInstance
        dag = session.query(dm_db).filter(dm_db.dag_id == dag_id).first()
        ti = session.query(ti_db).filter(ti_db.dag_id == dag_id,
                                         ti_db.task_id == task_id).first()

        if not ti:
            flash(
                "Task [{}.{}] doesn't seem to exist"
                " at the moment".format(dag_id, task_id),
                "error")
            return redirect('/admin/')

        xcomlist = session.query(XCom).filter(
            XCom.dag_id == dag_id, XCom.task_id == task_id,
            XCom.execution_date == dttm).all()

        attributes = []
        for xcom in xcomlist:
            if not xcom.key.startswith('_'):
                attributes.append((xcom.key, xcom.value))

        title = "XCom"
        return self.render(
            'airflow/xcom.html',
            attributes=attributes,
            task_id=task_id,
            execution_date=execution_date,
            form=form,
            root=root,
            dag=dag, title=title)
"error") return redirect(origin) executor.start() executor.queue_task_instance( ti, ignore_all_deps=ignore_all_deps, ignore_task_deps=ignore_task_deps, ignore_ti_state=ignore_ti_state) executor.heartbeat() flash( "Sent {} to the message queue, " "it should start any moment now.".format(ti)) return redirect(origin) @expose('/delete', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def delete(self): from airflow.api.common.experimental import delete_dag from airflow.exceptions import DagNotFound, DagFileExists dag_id = request.values.get('dag_id') origin = request.values.get('origin') or "/admin/" try: delete_dag.delete_dag(dag_id) except DagNotFound: flash("DAG with id {} not found. Cannot delete".format(dag_id)) return redirect(request.referrer) except DagFileExists: flash("Dag id {} is still in DagBag. " "Remove the DAG file first.".format(dag_id)) return redirect(request.referrer) flash("Deleting DAG with id {}. May take a couple minutes to fully" " disappear.".format(dag_id)) # Upon successful delete return to origin return redirect(origin) @expose('/trigger', methods=['POST', 'GET']) @login_required @wwwutils.action_logging @wwwutils.notify_owner @provide_session def trigger(self, session=None): dag_id = request.values.get('dag_id') origin = request.values.get('origin') or "/admin/" if request.method == 'GET': return self.render( 'airflow/trigger.html', dag_id=dag_id, origin=origin, conf='' ) dag = session.query(models.DagModel).filter(models.DagModel.dag_id == dag_id).first() if not dag: flash("Cannot find dag {}".format(dag_id)) return redirect(origin) execution_date = timezone.utcnow() run_id = "manual__{0}".format(execution_date.isoformat()) dr = DagRun.find(dag_id=dag_id, run_id=run_id) if dr: flash("This run_id {} already exists".format(run_id)) return redirect(origin) run_conf = {} conf = request.values.get('conf') if conf: try: run_conf = json.loads(conf) except ValueError: flash("Invalid JSON configuration", "error") return self.render( 'airflow/trigger.html', dag_id=dag_id, origin=origin, conf=conf, ) dag = dagbag.get_dag(dag_id) dag.create_dagrun( run_id=run_id, execution_date=execution_date, state=State.RUNNING, conf=run_conf, external_trigger=True ) flash( "Triggered {}, " "it should start any moment now.".format(dag_id)) return redirect(origin) def _clear_dag_tis(self, dag, start_date, end_date, origin, recursive=False, confirmed=False, only_failed=False): from airflow.exceptions import AirflowException if confirmed: count = dag.clear( start_date=start_date, end_date=end_date, include_subdags=recursive, include_parentdag=recursive, only_failed=only_failed, ) flash("{0} task instances have been cleared".format(count)) return redirect(origin) try: tis = dag.clear( start_date=start_date, end_date=end_date, include_subdags=recursive, include_parentdag=recursive, only_failed=only_failed, dry_run=True, ) except AirflowException as ex: flash(str(ex), 'error') return redirect(origin) if not tis: flash("No task instances to clear", 'error') response = redirect(origin) else: details = "\n".join([str(t) for t in tis]) response = self.render( 'airflow/confirm.html', message=("Here's the list of task instances you are about " "to clear:"), details=details) return response @expose('/clear', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def clear(self): dag_id = request.form.get('dag_id') task_id = request.form.get('task_id') origin = request.form.get('origin') dag = dagbag.get_dag(dag_id) execution_date = 
request.form.get('execution_date') execution_date = pendulum.parse(execution_date) confirmed = request.form.get('confirmed') == "true" upstream = request.form.get('upstream') == "true" downstream = request.form.get('downstream') == "true" future = request.form.get('future') == "true" past = request.form.get('past') == "true" recursive = request.form.get('recursive') == "true" only_failed = request.form.get('only_failed') == "true" dag = dag.sub_dag( task_regex=r"^{0}$".format(task_id), include_downstream=downstream, include_upstream=upstream) end_date = execution_date if not future else None start_date = execution_date if not past else None return self._clear_dag_tis(dag, start_date, end_date, origin, recursive=recursive, confirmed=confirmed, only_failed=only_failed) @expose('/dagrun_clear', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def dagrun_clear(self): dag_id = request.form.get('dag_id') origin = request.form.get('origin') execution_date = request.form.get('execution_date') confirmed = request.form.get('confirmed') == "true" dag = dagbag.get_dag(dag_id) execution_date = pendulum.parse(execution_date) start_date = execution_date end_date = execution_date return self._clear_dag_tis(dag, start_date, end_date, origin, recursive=True, confirmed=confirmed) @expose('/blocked') @login_required @provide_session def blocked(self, session=None): DR = models.DagRun dags = session\ .query(DR.dag_id, sqla.func.count(DR.id))\ .filter(DR.state == State.RUNNING)\ .group_by(DR.dag_id)\ .all() payload = [] for dag_id, active_dag_runs in dags: max_active_runs = 0 dag = dagbag.get_dag(dag_id) if dag: # TODO: Make max_active_runs a column so we can query for it directly max_active_runs = dag.max_active_runs payload.append({ 'dag_id': dag_id, 'active_dag_run': active_dag_runs, 'max_active_runs': max_active_runs, }) return wwwutils.json_response(payload) def _mark_dagrun_state_as_failed(self, dag_id, execution_date, confirmed, origin): if not execution_date: flash('Invalid execution date', 'error') return redirect(origin) execution_date = pendulum.parse(execution_date) dag = dagbag.get_dag(dag_id) if not dag: flash('Cannot find DAG: {}'.format(dag_id), 'error') return redirect(origin) new_dag_state = set_dag_run_state_to_failed(dag, execution_date, commit=confirmed) if confirmed: flash('Marked failed on {} task instances'.format(len(new_dag_state))) return redirect(origin) else: details = '\n'.join([str(t) for t in new_dag_state]) response = self.render('airflow/confirm.html', message=("Here's the list of task instances you are " "about to mark as failed"), details=details) return response def _mark_dagrun_state_as_success(self, dag_id, execution_date, confirmed, origin): if not execution_date: flash('Invalid execution date', 'error') return redirect(origin) execution_date = pendulum.parse(execution_date) dag = dagbag.get_dag(dag_id) if not dag: flash('Cannot find DAG: {}'.format(dag_id), 'error') return redirect(origin) new_dag_state = set_dag_run_state_to_success(dag, execution_date, commit=confirmed) if confirmed: flash('Marked success on {} task instances'.format(len(new_dag_state))) return redirect(origin) else: details = '\n'.join([str(t) for t in new_dag_state]) response = self.render('airflow/confirm.html', message=("Here's the list of task instances you are " "about to mark as success"), details=details) return response @expose('/dagrun_failed', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def dagrun_failed(self): dag_id = 
request.form.get('dag_id') execution_date = request.form.get('execution_date') confirmed = request.form.get('confirmed') == 'true' origin = request.form.get('origin') return self._mark_dagrun_state_as_failed(dag_id, execution_date, confirmed, origin) @expose('/dagrun_success', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def dagrun_success(self): dag_id = request.form.get('dag_id') execution_date = request.form.get('execution_date') confirmed = request.form.get('confirmed') == 'true' origin = request.form.get('origin') return self._mark_dagrun_state_as_success(dag_id, execution_date, confirmed, origin) def _mark_task_instance_state(self, dag_id, task_id, origin, execution_date, confirmed, upstream, downstream, future, past, state): dag = dagbag.get_dag(dag_id) task = dag.get_task(task_id) task.dag = dag execution_date = pendulum.parse(execution_date) if not dag: flash("Cannot find DAG: {}".format(dag_id)) return redirect(origin) if not task: flash("Cannot find task {} in DAG {}".format(task_id, dag.dag_id)) return redirect(origin) from airflow.api.common.experimental.mark_tasks import set_state if confirmed: altered = set_state(tasks=[task], execution_date=execution_date, upstream=upstream, downstream=downstream, future=future, past=past, state=state, commit=True) flash("Marked {} on {} task instances".format(state, len(altered))) return redirect(origin) to_be_altered = set_state(tasks=[task], execution_date=execution_date, upstream=upstream, downstream=downstream, future=future, past=past, state=state, commit=False) details = "\n".join([str(t) for t in to_be_altered]) response = self.render("airflow/confirm.html", message=("Here's the list of task instances you are " "about to mark as {}:".format(state)), details=details) return response @expose('/failed', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def failed(self): dag_id = request.form.get('dag_id') task_id = request.form.get('task_id') origin = request.form.get('origin') execution_date = request.form.get('execution_date') confirmed = request.form.get('confirmed') == "true" upstream = request.form.get('failed_upstream') == "true" downstream = request.form.get('failed_downstream') == "true" future = request.form.get('failed_future') == "true" past = request.form.get('failed_past') == "true" return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, confirmed, upstream, downstream, future, past, State.FAILED) @expose('/success', methods=['POST']) @login_required @wwwutils.action_logging @wwwutils.notify_owner def success(self): dag_id = request.form.get('dag_id') task_id = request.form.get('task_id') origin = request.form.get('origin') execution_date = request.form.get('execution_date') confirmed = request.form.get('confirmed') == "true" upstream = request.form.get('success_upstream') == "true" downstream = request.form.get('success_downstream') == "true" future = request.form.get('success_future') == "true" past = request.form.get('success_past') == "true" return self._mark_task_instance_state(dag_id, task_id, origin, execution_date, confirmed, upstream, downstream, future, past, State.SUCCESS) @expose('/tree') @login_required @wwwutils.gzipped @wwwutils.action_logging @provide_session def tree(self, session=None): default_dag_run = conf.getint('webserver', 'default_dag_run_display_number') dag_id = request.args.get('dag_id') blur = conf.getboolean('webserver', 'demo_mode') dag = dagbag.get_dag(dag_id) if not dag: flash('DAG "{0}" seems to be 
    @expose('/tree')
    @login_required
    @wwwutils.gzipped
    @wwwutils.action_logging
    @provide_session
    def tree(self, session=None):
        default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
        dag_id = request.args.get('dag_id')
        blur = conf.getboolean('webserver', 'demo_mode')
        dag = dagbag.get_dag(dag_id)
        if not dag:
            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
            return redirect('/admin/')

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_downstream=False,
                include_upstream=True)

        base_date = request.args.get('base_date')
        num_runs = request.args.get('num_runs')
        num_runs = int(num_runs) if num_runs else default_dag_run

        if base_date:
            base_date = timezone.parse(base_date)
        else:
            base_date = dag.latest_execution_date or timezone.utcnow()

        DR = models.DagRun
        dag_runs = (
            session.query(DR)
            .filter(
                DR.dag_id == dag.dag_id,
                DR.execution_date <= base_date)
            .order_by(DR.execution_date.desc())
            .limit(num_runs)
            .all()
        )
        dag_runs = {
            dr.execution_date: alchemy_to_dict(dr) for dr in dag_runs}

        dates = sorted(list(dag_runs.keys()))
        max_date = max(dates) if dates else None
        min_date = min(dates) if dates else None

        tis = dag.get_task_instances(
            start_date=min_date, end_date=base_date, session=session)
        task_instances = {}
        for ti in tis:
            tid = alchemy_to_dict(ti)
            dr = dag_runs.get(ti.execution_date)
            tid['external_trigger'] = dr['external_trigger'] if dr else False
            task_instances[(ti.task_id, ti.execution_date)] = tid

        expanded = []
        # The default recursion traces every path so that tree view has full
        # expand/collapse functionality. After 5,000 nodes we stop and fall
        # back on a quick DFS search for performance. See PR #320.
        node_count = [0]
        node_limit = 5000 / max(1, len(dag.leaves))

        def recurse_nodes(task, visited):
            visited.add(task)
            node_count[0] += 1

            children = [
                recurse_nodes(t, visited) for t in task.downstream_list
                if node_count[0] < node_limit or t not in visited]

            # D3 tree uses children vs _children to define what is
            # expanded or not. The following block makes it such that
            # repeated nodes are collapsed by default.
            children_key = 'children'
            if task.task_id not in expanded:
                expanded.append(task.task_id)
            elif children:
                children_key = "_children"

            def set_duration(tid):
                if isinstance(tid, dict) and tid.get("state") == State.RUNNING \
                        and tid["start_date"] is not None:
                    d = timezone.utcnow() - pendulum.parse(tid["start_date"])
                    tid["duration"] = d.total_seconds()
                return tid

            return {
                'name': task.task_id,
                'instances': [
                    set_duration(task_instances.get((task.task_id, d))) or {
                        'execution_date': d.isoformat(),
                        'task_id': task.task_id
                    }
                    for d in dates],
                children_key: children,
                'num_dep': len(task.downstream_list),
                'operator': task.task_type,
                'retries': task.retries,
                'owner': task.owner,
                'start_date': task.start_date,
                'end_date': task.end_date,
                'depends_on_past': task.depends_on_past,
                'ui_color': task.ui_color,
            }

        data = {
            'name': '[DAG]',
            'children': [recurse_nodes(t, set()) for t in dag.roots],
            'instances': [dag_runs.get(d) or {'execution_date': d.isoformat()}
                          for d in dates],
        }

        session.commit()

        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                             'num_runs': num_runs})
        external_logs = conf.get('elasticsearch', 'frontend')
        doc_md = wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc')

        return self.render(
            'airflow/tree.html',
            operators=sorted({op.task_type: op for op in dag.tasks}.values(),
                             key=lambda x: x.task_type),
            root=root,
            form=form,
            dag=dag,
            data=data,
            blur=blur,
            num_runs=num_runs,
            doc_md=doc_md,
            show_external_logs=bool(external_logs))

    @expose('/graph')
    @login_required
    @wwwutils.gzipped
    @wwwutils.action_logging
    @provide_session
    def graph(self, session=None):
        dag_id = request.args.get('dag_id')
        blur = conf.getboolean('webserver', 'demo_mode')
        dag = dagbag.get_dag(dag_id)
        if not dag:
            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
            return redirect('/admin/')

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_upstream=True,
                include_downstream=False)

        arrange = request.args.get('arrange', dag.orientation)

        nodes = []
        edges = []
        for task in dag.tasks:
            nodes.append({
                'id': task.task_id,
                'value': {
                    'label': task.task_id,
                    'labelStyle': "fill:{0};".format(task.ui_fgcolor),
                    'style': "fill:{0};".format(task.ui_color),
                }
            })

        def get_downstream(task):
            for t in task.downstream_list:
                edge = {
                    'u': task.task_id,
                    'v': t.task_id,
                }
                if edge not in edges:
                    edges.append(edge)
                    get_downstream(t)

        for t in dag.roots:
            get_downstream(t)

        dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
        dt_nr_dr_data['arrange'] = arrange
        dttm = dt_nr_dr_data['dttm']

        class GraphForm(DateTimeWithNumRunsWithDagRunsForm):
            arrange = SelectField("Layout", choices=(
                ('LR', "Left->Right"),
                ('RL', "Right->Left"),
                ('TB', "Top->Bottom"),
                ('BT', "Bottom->Top"),
            ))

        form = GraphForm(data=dt_nr_dr_data)
        form.execution_date.choices = dt_nr_dr_data['dr_choices']

        task_instances = {
            ti.task_id: alchemy_to_dict(ti)
            for ti in dag.get_task_instances(dttm, dttm, session=session)}
        tasks = {
            t.task_id: {
                'dag_id': t.dag_id,
                'task_type': t.task_type,
            }
            for t in dag.tasks}
        if not tasks:
            flash("No tasks found", "error")
        session.commit()
        doc_md = wrapped_markdown(getattr(dag, 'doc_md', None), css_class='dag-doc')
        external_logs = conf.get('elasticsearch', 'frontend')
        return self.render(
            'airflow/graph.html',
            dag=dag,
            form=form,
            width=request.args.get('width', "100%"),
            height=request.args.get('height', "800"),
            execution_date=dttm.isoformat(),
            state_token=state_token(dt_nr_dr_data['dr_state']),
            doc_md=doc_md,
            arrange=arrange,
            operators=sorted({op.task_type: op for op in dag.tasks}.values(),
                             key=lambda x: x.task_type),
            blur=blur,
            root=root or '',
            task_instances=task_instances,
            tasks=tasks,
            nodes=nodes,
            edges=edges,
            show_external_logs=bool(external_logs))

    @expose('/duration')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def duration(self, session=None):
        default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
        dag_id = request.args.get('dag_id')
        dag = dagbag.get_dag(dag_id)
        base_date = request.args.get('base_date')
        num_runs = request.args.get('num_runs')
        num_runs = int(num_runs) if num_runs else default_dag_run

        if dag is None:
            flash('DAG "{0}" seems to be missing.'.format(dag_id), "error")
            return redirect('/admin/')

        if base_date:
            base_date = pendulum.parse(base_date)
        else:
            base_date = dag.latest_execution_date or timezone.utcnow()

        dates = dag.date_range(base_date, num=-abs(num_runs))
        min_date = dates[0] if dates else datetime(2000, 1, 1)

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_upstream=True,
                include_downstream=False)

        chart_height = get_chart_height(dag)
        chart = nvd3.lineChart(
            name="lineChart", x_is_date=True, height=chart_height, width="1200")
        cum_chart = nvd3.lineChart(
            name="cumLineChart", x_is_date=True, height=chart_height, width="1200")

        y = defaultdict(list)
        x = defaultdict(list)
        cum_y = defaultdict(list)

        tis = dag.get_task_instances(
            start_date=min_date, end_date=base_date, session=session)
        TF = models.TaskFail
        ti_fails = (
            session
            .query(TF)
            .filter(
                TF.dag_id == dag.dag_id,
                TF.execution_date >= min_date,
                TF.execution_date <= base_date,
                TF.task_id.in_([t.task_id for t in dag.tasks]))
            .all()
        )

        fails_totals = defaultdict(int)
        for tf in ti_fails:
            dict_key = (tf.dag_id, tf.task_id, tf.execution_date)
            if tf.duration:
                fails_totals[dict_key] += tf.duration

        for ti in tis:
            if ti.duration:
                dttm = wwwutils.epoch(ti.execution_date)
                x[ti.task_id].append(dttm)
                y[ti.task_id].append(float(ti.duration))
                fails_dict_key = (ti.dag_id, ti.task_id, ti.execution_date)
                fails_total = fails_totals[fails_dict_key]
                cum_y[ti.task_id].append(float(ti.duration + fails_total))

        # determine the most relevant time unit for the set of task instance
        # durations for the DAG
        y_unit = infer_time_unit([d for t in y.values() for d in t])
        cum_y_unit = infer_time_unit([d for t in cum_y.values() for d in t])
        # update the y Axis on both charts to have the correct time units
        chart.create_y_axis('yAxis', format='.02f', custom_format=False,
                            label='Duration ({})'.format(y_unit))
        chart.axislist['yAxis']['axisLabelDistance'] = '40'
        cum_chart.create_y_axis('yAxis', format='.02f', custom_format=False,
                                label='Duration ({})'.format(cum_y_unit))
        cum_chart.axislist['yAxis']['axisLabelDistance'] = '40'

        for task in dag.tasks:
            if x[task.task_id]:
                chart.add_serie(name=task.task_id, x=x[task.task_id],
                                y=scale_time_units(y[task.task_id], y_unit))
                cum_chart.add_serie(name=task.task_id, x=x[task.task_id],
                                    y=scale_time_units(cum_y[task.task_id],
                                                       cum_y_unit))

        dates = sorted(list({ti.execution_date for ti in tis}))
        max_date = max([ti.execution_date for ti in tis]) if dates else None

        session.commit()

        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                             'num_runs': num_runs})
        chart.buildcontent()
        cum_chart.buildcontent()
        s_index = cum_chart.htmlcontent.rfind('});')
        cum_chart.htmlcontent = (cum_chart.htmlcontent[:s_index] +
                                 "$(function() {$( document ).trigger('chartload') })" +
                                 cum_chart.htmlcontent[s_index:])

        return self.render(
            'airflow/duration_chart.html',
            dag=dag,
            demo_mode=conf.getboolean('webserver', 'demo_mode'),
            root=root,
            form=form,
            chart=Markup(chart.htmlcontent),
            cum_chart=Markup(cum_chart.htmlcontent)
        )

    @expose('/tries')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def tries(self, session=None):
        default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
        dag_id = request.args.get('dag_id')
        dag = dagbag.get_dag(dag_id)
        base_date = request.args.get('base_date')
        num_runs = request.args.get('num_runs')
        num_runs = int(num_runs) if num_runs else default_dag_run

        if base_date:
            base_date = pendulum.parse(base_date)
        else:
            base_date = dag.latest_execution_date or timezone.utcnow()

        dates = dag.date_range(base_date, num=-abs(num_runs))
        min_date = dates[0] if dates else datetime(2000, 1, 1)

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_upstream=True,
                include_downstream=False)

        chart_height = get_chart_height(dag)
        chart = nvd3.lineChart(
            name="lineChart", x_is_date=True, y_axis_format='d',
            height=chart_height, width="1200")

        for task in dag.tasks:
            y = []
            x = []
            for ti in task.get_task_instances(start_date=min_date,
                                              end_date=base_date,
                                              session=session):
                dttm = wwwutils.epoch(ti.execution_date)
                x.append(dttm)
                # y value should reflect completed tries to have a 0 baseline.
                y.append(ti.prev_attempted_tries)
            if x:
                chart.add_serie(name=task.task_id, x=x, y=y)

        tis = dag.get_task_instances(
            start_date=min_date, end_date=base_date, session=session)
        tries = sorted(list({ti.try_number for ti in tis}))
        max_date = max([ti.execution_date for ti in tis]) if tries else None

        session.commit()

        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                             'num_runs': num_runs})

        chart.buildcontent()

        return self.render(
            'airflow/chart.html',
            dag=dag,
            demo_mode=conf.getboolean('webserver', 'demo_mode'),
            root=root,
            form=form,
            chart=Markup(chart.htmlcontent)
        )

    @expose('/landing_times')
    @login_required
    @wwwutils.action_logging
    @provide_session
    def landing_times(self, session=None):
        default_dag_run = conf.getint('webserver', 'default_dag_run_display_number')
        dag_id = request.args.get('dag_id')
        dag = dagbag.get_dag(dag_id)
        base_date = request.args.get('base_date')
        num_runs = request.args.get('num_runs')
        num_runs = int(num_runs) if num_runs else default_dag_run

        if base_date:
            base_date = pendulum.parse(base_date)
        else:
            base_date = dag.latest_execution_date or timezone.utcnow()

        dates = dag.date_range(base_date, num=-abs(num_runs))
        min_date = dates[0] if dates else datetime(2000, 1, 1)

        root = request.args.get('root')
        if root:
            dag = dag.sub_dag(
                task_regex=root,
                include_upstream=True,
                include_downstream=False)

        chart_height = get_chart_height(dag)
        chart = nvd3.lineChart(
            name="lineChart", x_is_date=True, height=chart_height, width="1200")
        y = {}
        x = {}
        for task in dag.tasks:
            task_id = task.task_id
            y[task_id] = []
            x[task_id] = []
            for ti in task.get_task_instances(start_date=min_date,
                                              end_date=base_date,
                                              session=session):
                if ti.end_date:
                    ts = ti.execution_date
                    following_schedule = dag.following_schedule(ts)
                    if dag.schedule_interval and following_schedule:
                        ts = following_schedule
                    dttm = wwwutils.epoch(ti.execution_date)
                    secs = (ti.end_date - ts).total_seconds()
                    x[task_id].append(dttm)
                    y[task_id].append(secs)

        # determine the most relevant time unit for the set of landing times
        # for the DAG
        y_unit = infer_time_unit([d for t in y.values() for d in t])
        # update the y Axis to have the correct time units
        chart.create_y_axis('yAxis', format='.02f', custom_format=False,
                            label='Landing Time ({})'.format(y_unit))
        chart.axislist['yAxis']['axisLabelDistance'] = '40'

        for task in dag.tasks:
            if x[task.task_id]:
                chart.add_serie(name=task.task_id, x=x[task.task_id],
                                y=scale_time_units(y[task.task_id], y_unit))

        tis = dag.get_task_instances(
            start_date=min_date, end_date=base_date, session=session)
        dates = sorted(list({ti.execution_date for ti in tis}))
        max_date = max([ti.execution_date for ti in tis]) if dates else None

        form = DateTimeWithNumRunsForm(data={'base_date': max_date,
                                             'num_runs': num_runs})
        chart.buildcontent()
        return self.render(
            'airflow/chart.html',
            dag=dag,
            chart=Markup(chart.htmlcontent),
            height=str(chart_height + 100) + "px",
            demo_mode=conf.getboolean('webserver', 'demo_mode'),
            root=root,
            form=form,
        )

    @expose('/paused', methods=['POST'])
    @login_required
    @wwwutils.action_logging
    @provide_session
    def paused(self, session=None):
        dag_id = request.values.get('dag_id')
        is_paused = True if request.args.get('is_paused') == 'false' else False
        models.DagModel.get_dagmodel(dag_id).set_is_paused(
            is_paused=is_paused,
            store_serialized_dags=STORE_SERIALIZED_DAGS)
        return "OK"
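    # Note the inversion in paused() above: the client submits the DAG's
    # *current* state, so is_paused=false means "currently unpaused, pause it
    # now". Illustrative request (the URL prefix depends on how the admin
    # views are mounted):
    #
    #     POST /admin/airflow/paused?dag_id=example_dag&is_paused=false
    #     -> "OK"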
dm = models.DagModel dag_id = request.values.get('dag_id') orm_dag = session.query(dm).filter(dm.dag_id == dag_id).first() if orm_dag: orm_dag.last_expired = timezone.utcnow() session.merge(orm_dag) session.commit() flash("DAG [{}] is now fresh as a daisy".format(dag_id)) return redirect(request.referrer) @expose('/gantt') @login_required @wwwutils.action_logging @provide_session def gantt(self, session=None): dag_id = request.args.get('dag_id') dag = dagbag.get_dag(dag_id) demo_mode = conf.getboolean('webserver', 'demo_mode') root = request.args.get('root') if root: dag = dag.sub_dag( task_regex=root, include_upstream=True, include_downstream=False) dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag) dttm = dt_nr_dr_data['dttm'] form = DateTimeWithNumRunsWithDagRunsForm(data=dt_nr_dr_data) form.execution_date.choices = dt_nr_dr_data['dr_choices'] tis = [ ti for ti in dag.get_task_instances(dttm, dttm, session=session) if ti.start_date and ti.state] tis = sorted(tis, key=lambda ti: ti.start_date) TF = models.TaskFail ti_fails = list(itertools.chain(*[( session .query(TF) .filter(TF.dag_id == ti.dag_id, TF.task_id == ti.task_id, TF.execution_date == ti.execution_date) .all() ) for ti in tis])) # determine bars to show in the gantt chart gantt_bar_items = [] for ti in tis: end_date = ti.end_date or timezone.utcnow() # prev_attempted_tries will reflect the currently running try_number # or the try_number of the last complete run # https://issues.apache.org/jira/browse/AIRFLOW-2143 try_count = ti.prev_attempted_tries gantt_bar_items.append((ti.task_id, ti.start_date, end_date, ti.state, try_count)) tf_count = 0 try_count = 1 prev_task_id = "" for tf in ti_fails: end_date = tf.end_date or timezone.utcnow() start_date = tf.start_date or end_date if tf_count != 0 and tf.task_id == prev_task_id: try_count = try_count + 1 else: try_count = 1 prev_task_id = tf.task_id gantt_bar_items.append((tf.task_id, start_date, end_date, State.FAILED, try_count)) tf_count = tf_count + 1 tasks = [] for gantt_bar_item in gantt_bar_items: task_id = gantt_bar_item[0] start_date = gantt_bar_item[1] end_date = gantt_bar_item[2] state = gantt_bar_item[3] try_count = gantt_bar_item[4] tasks.append({ 'startDate': wwwutils.epoch(start_date), 'endDate': wwwutils.epoch(end_date), 'isoStart': start_date.isoformat()[:-4], 'isoEnd': end_date.isoformat()[:-4], 'taskName': task_id, 'duration': (end_date - start_date).total_seconds(), 'status': state, 'executionDate': dttm.isoformat(), 'try_number': try_count, }) states = {task['status']: task['status'] for task in tasks} data = { 'taskNames': [ti.task_id for ti in tis], 'tasks': tasks, 'taskStatus': states, 'height': len(tis) * 25 + 25, } session.commit() return self.render( 'airflow/gantt.html', dag=dag, execution_date=dttm.isoformat(), form=form, data=data, base_date='', demo_mode=demo_mode, root=root, ) @expose('/object/task_instances') @login_required @wwwutils.action_logging @provide_session def task_instances(self, session=None): dag_id = request.args.get('dag_id') dag = dagbag.get_dag(dag_id) dttm = request.args.get('execution_date') if dttm: dttm = pendulum.parse(dttm) else: return "Error: Invalid execution_date" task_instances = { ti.task_id: alchemy_to_dict(ti) for ti in dag.get_task_instances(dttm, dttm, session=session)} return json.dumps(task_instances) @expose('/variables/
<form>', methods=["GET", "POST"]) @login_required @wwwutils.action_logging def variables(self, form): try: if request.method == 'POST': data = request.json if data: with create_session() as session: var = models.Variable(key=form, val=json.dumps(data)) session.add(var) session.commit() return "" else: return self.render( 'airflow/variables/{}.html'.format(form) ) except Exception: # prevent XSS form = escape(form) return ("Error: form airflow/variables/{}.html " "not found.").format(form), 404 @expose('/varimport', methods=['POST']) @login_required @wwwutils.action_logging def varimport(self): try: d = json.load(UTF8_READER(request.files['file'])) except Exception as e: flash("Missing file or syntax error: {}.".format(e)) else: suc_count = fail_count = 0 for k, v in d.items(): try: models.Variable.set(k, v, serialize_json=not isinstance(v, six.string_types)) except Exception as e: logging.info('Variable import failed: {}'.format(repr(e))) fail_count += 1 else: suc_count += 1 flash("{} variable(s) successfully updated.".format(suc_count), 'info') if fail_count: flash( "{} variable(s) failed to be updated.".format(fail_count), 'error') return redirect('/admin/variable') class HomeView(AirflowViewMixin, AdminIndexView): @expose("/") @login_required @provide_session def index(self, session=None): DM = models.DagModel # restrict the dags shown if filter_by_owner and current user is not superuser do_filter = FILTER_BY_OWNER and (not current_user.is_superuser()) owner_mode = conf.get('webserver', 'OWNER_MODE').strip().lower() hide_paused_dags_by_default = conf.getboolean('webserver', 'hide_paused_dags_by_default') show_paused_arg = request.args.get('showPaused', 'None') def get_int_arg(value, default=0): try: return int(value) except ValueError: return default arg_current_page = request.args.get('page', '0') arg_search_query = request.args.get('search', None) dags_per_page = PAGE_SIZE current_page = get_int_arg(arg_current_page, default=0) if show_paused_arg.strip().lower() == 'false': hide_paused = True elif show_paused_arg.strip().lower() == 'true': hide_paused = False else: hide_paused = hide_paused_dags_by_default # read orm_dags from the db query = session.query(DM) if do_filter and owner_mode == 'ldapgroup': query = query.filter( ~DM.is_subdag, DM.is_active, DM.owners.in_(current_user.ldap_groups) ) elif do_filter and owner_mode == 'user': query = query.filter( ~DM.is_subdag, DM.is_active, DM.owners == current_user.user.username ) else: query = query.filter( ~DM.is_subdag, DM.is_active ) # optionally filter out "paused" dags if hide_paused: query = query.filter(~DM.is_paused) if arg_search_query: query = query.filter( DM.dag_id.ilike('%' + arg_search_query + '%') | DM.owners.ilike('%' + arg_search_query + '%') ) query = query.order_by(DM.dag_id) start = current_page * dags_per_page end = start + dags_per_page dags = query.offset(start).limit(dags_per_page).all() import_errors = session.query(errors.ImportError).all() for ie in import_errors: flash( "Broken DAG: [{ie.filename}] {ie.stacktrace}".format(ie=ie), "error") from airflow.plugins_manager import import_errors as plugin_import_errors for filename, stacktrace in plugin_import_errors.items(): flash( "Broken plugin: [{filename}] {stacktrace}".format( stacktrace=stacktrace, filename=filename), "error") num_of_all_dags = query.count() num_of_pages = int(math.ceil(num_of_all_dags / float(dags_per_page))) auto_complete_data = set() for row in query.with_entities(DM.dag_id, DM.owners): auto_complete_data.add(row.dag_id)
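# DagModel.owners holds the DAG's task owners as a single comma-joined string,
# so each distinct owners value is offered to the search autocomplete as-is,
# alongside the dag ids collected above.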
auto_complete_data.add(row.owners) state_color_mapping = State.state_color.copy() state_color_mapping["null"] = state_color_mapping.pop(None) state_color_mapping.update(STATE_COLORS) return self.render( 'airflow/dags.html', dags=dags, hide_paused=hide_paused, current_page=current_page, state_color=state_color_mapping, search_query=arg_search_query if arg_search_query else '', page_size=dags_per_page, num_of_pages=num_of_pages, num_dag_from=min(start + 1, num_of_all_dags), num_dag_to=min(end, num_of_all_dags), num_of_all_dags=num_of_all_dags, paging=wwwutils.generate_pages(current_page, num_of_pages, search=arg_search_query, showPaused=not hide_paused), auto_complete_data=auto_complete_data) class QueryView(wwwutils.DataProfilingMixin, AirflowViewMixin, BaseView): @expose('/', methods=['POST', 'GET']) @wwwutils.gzipped @provide_session def query(self, session=None): dbs = session.query(Connection).order_by(Connection.conn_id).all() session.expunge_all() db_choices = [] for db in dbs: try: if db.get_hook(): db_choices.append((db.conn_id, db.conn_id)) except Exception: pass conn_id_str = request.form.get('conn_id') csv = request.form.get('csv') == "true" sql = request.form.get('sql') class QueryForm(Form): conn_id = SelectField("Layout", choices=db_choices) sql = TextAreaField("SQL", widget=wwwutils.AceEditorWidget()) data = { 'conn_id': conn_id_str, 'sql': sql, } results = None has_data = False error = False if conn_id_str and request.method == 'POST': db = [db for db in dbs if db.conn_id == conn_id_str][0] try: hook = db.get_hook() df = hook.get_pandas_df(wwwutils.limit_sql(sql, QUERY_LIMIT, conn_type=db.conn_type)) # df = hook.get_pandas_df(sql) has_data = len(df) > 0 df = df.fillna('') results = df.to_html( classes=[ 'table', 'table-bordered', 'table-striped', 'no-wrap'], index=False, na_rep='', ) if has_data else '' except Exception as e: flash(str(e), 'error') error = True if has_data and len(df) == QUERY_LIMIT: flash( "Query output truncated at " + str(QUERY_LIMIT) + " rows", 'info') if not has_data and error: flash('No data', 'error') if csv and not error: return Response( response=df.to_csv(index=False), status=200, mimetype="application/text") form = QueryForm(request.form, data=data) session.commit() return self.render( 'airflow/query.html', form=form, title="Ad Hoc Query", results=Markup(results or ''), has_data=has_data) class AirflowModelView(AirflowViewMixin, ModelView): list_template = 'airflow/model_list.html' edit_template = 'airflow/model_edit.html' create_template = 'airflow/model_create.html' column_display_actions = True page_size = PAGE_SIZE class ModelViewOnly(wwwutils.LoginMixin, AirflowModelView): """ Modifying the base ModelView class for non edit, browse only operations """ named_filter_urls = True can_create = False can_edit = False can_delete = False column_display_pk = True class PoolModelView(wwwutils.SuperUserMixin, AirflowModelView): column_list = ('pool', 'slots', 'used_slots', 'queued_slots') column_formatters = dict( pool=pool_link, used_slots=fused_slots, queued_slots=fqueued_slots) named_filter_urls = True validators_columns = { 'pool': [validators.DataRequired()], 'slots': [validators.NumberRange(min=-1)] } form_args = { 'pool': { 'validators': [ validators.DataRequired(), ] } } def delete_model(self, model): if model.pool == models.Pool.DEFAULT_POOL_NAME: flash("default_pool cannot be deleted", 'error') return False return super(PoolModelView, self).delete_model(model) class SlaMissModelView(wwwutils.SuperUserMixin, ModelViewOnly): verbose_name_plural 
= "SLA misses" verbose_name = "SLA miss" column_list = ( 'dag_id', 'task_id', 'execution_date', 'email_sent', 'timestamp') column_formatters = dict( task_id=task_instance_link, execution_date=datetime_f, timestamp=datetime_f, dag_id=dag_link) named_filter_urls = True column_searchable_list = ('dag_id', 'task_id',) column_filters = ( 'dag_id', 'task_id', 'email_sent', 'timestamp', 'execution_date') filter_converter = wwwutils.UtcFilterConverter() form_widget_args = { 'email_sent': {'disabled': True}, 'timestamp': {'disabled': True}, } @provide_session def _connection_ids(session=None): return [(c.conn_id, c.conn_id) for c in ( session .query(Connection.conn_id) .group_by(Connection.conn_id))] class ChartModelView(wwwutils.DataProfilingMixin, AirflowModelView): verbose_name = "chart" verbose_name_plural = "charts" form_columns = ( 'label', 'owner', 'conn_id', 'chart_type', 'show_datatable', 'x_is_date', 'y_log_scale', 'show_sql', 'height', 'sql_layout', 'sql', 'default_params', ) column_list = ( 'label', 'conn_id', 'chart_type', 'owner', 'last_modified', ) column_sortable_list = ( 'label', 'conn_id', 'chart_type', ('owner', 'owner.username'), 'last_modified', ) column_formatters = dict(label=label_link, last_modified=datetime_f) column_default_sort = ('last_modified', True) create_template = 'airflow/chart/create.html' edit_template = 'airflow/chart/edit.html' column_filters = ('label', 'owner.username', 'conn_id') column_searchable_list = ('owner.username', 'label', 'sql') column_descriptions = { 'label': "Can include {{ templated_fields }} and {{ macros }}", 'chart_type': "The type of chart to be displayed", 'sql': "Can include {{ templated_fields }} and {{ macros }}.", 'height': "Height of the chart, in pixels.", 'conn_id': "Source database to run the query against", 'x_is_date': ( "Whether the X axis should be casted as a date field. Expect most " "intelligible date formats to get casted properly." ), 'owner': ( "The chart's owner, mostly used for reference and filtering in " "the list view." ), 'show_datatable': "Whether to display an interactive data table under the chart.", 'default_params': ( 'A dictionary of {"key": "values",} that define what the ' 'templated fields (parameters) values should be by default. ' 'To be valid, it needs to "eval" as a Python dict. ' 'The key values will show up in the url\'s querystring ' 'and can be altered there.' ), 'show_sql': "Whether to display the SQL statement as a collapsible " "section in the chart page.", 'y_log_scale': "Whether to use a log scale for the Y axis.", 'sql_layout': ( "Defines the layout of the SQL that the application should " "expect. Depending on the tables you are sourcing from, it may " "make more sense to pivot / unpivot the metrics." ), } column_labels = { 'sql': "SQL", 'height': "Chart Height", 'sql_layout': "SQL Layout", 'show_sql': "Display the SQL Statement", 'default_params': "Default Parameters", } form_choices = { 'chart_type': [ ('line', 'Line Chart'), ('spline', 'Spline Chart'), ('bar', 'Bar Chart'), ('column', 'Column Chart'), ('area', 'Overlapping Area Chart'), ('stacked_area', 'Stacked Area Chart'), ('percent_area', 'Percent Area Chart'), ('datatable', 'No chart, data table only'), ], 'sql_layout': [ ('series', 'SELECT series, x, y FROM ...'), ('columns', 'SELECT x, y (series 1), y (series 2), ... 
FROM ...'), ], 'conn_id': _connection_ids() } def on_model_change(self, form, model, is_created=True): if model.iteration_no is None: model.iteration_no = 0 else: model.iteration_no += 1 if not model.user_id and current_user and hasattr(current_user, 'id'): model.user_id = current_user.id model.last_modified = timezone.utcnow() chart_mapping = dict(( ('line', 'lineChart'), ('spline', 'lineChart'), ('bar', 'multiBarChart'), ('column', 'multiBarChart'), ('area', 'stackedAreaChart'), ('stacked_area', 'stackedAreaChart'), ('percent_area', 'stackedAreaChart'), ('datatable', 'datatable'), )) class KnownEventView(wwwutils.DataProfilingMixin, AirflowModelView): verbose_name = "known event" verbose_name_plural = "known events" form_columns = ( 'label', 'event_type', 'start_date', 'end_date', 'reported_by', 'description', ) form_args = { 'label': { 'validators': [ validators.DataRequired(), ], }, 'event_type': { 'validators': [ validators.DataRequired(), ], }, 'start_date': { 'validators': [ validators.DataRequired(), ], 'filters': [ parse_datetime_f, ], }, 'end_date': { 'validators': [ validators.DataRequired(), GreaterEqualThan(fieldname='start_date'), ], 'filters': [ parse_datetime_f, ] }, 'reported_by': { 'validators': [ validators.DataRequired(), ], } } column_list = ( 'label', 'event_type', 'start_date', 'end_date', 'reported_by', ) column_default_sort = ("start_date", True) column_sortable_list = ( 'label', # todo: yes this has a spelling error ('event_type', 'event_type.know_event_type'), 'start_date', 'end_date', ('reported_by', 'reported_by.username'), ) filter_converter = wwwutils.UtcFilterConverter() form_overrides = dict(start_date=DateTimeField, end_date=DateTimeField) class KnownEventTypeView(wwwutils.DataProfilingMixin, AirflowModelView): pass # NOTE: For debugging / troubleshooting # mv = KnownEventTypeView( # models.KnownEventType, # Session, name="Known Event Types", category="Manage") # admin.add_view(mv) # class DagPickleView(SuperUserMixin, ModelView): # pass # mv = DagPickleView( # models.DagPickle, # Session, name="Pickles", category="Manage") # admin.add_view(mv) class VariableView(wwwutils.DataProfilingMixin, AirflowModelView): verbose_name = "Variable" verbose_name_plural = "Variables" list_template = 'airflow/variable_list.html' def hidden_field_formatter(view, context, model, name): if wwwutils.should_hide_value_for_key(model.key): return Markup('*' * 8) val = getattr(model, name) if val: return val else: return Markup('<span class="label label-danger">Invalid</span>') form_columns = ( 'key', 'val', ) column_list = ('key', 'val', 'is_encrypted',) column_filters = ('key', 'val') column_searchable_list = ('key', 'val', 'is_encrypted',) column_default_sort = ('key', False) form_widget_args = { 'is_encrypted': {'disabled': True}, 'val': { 'rows': 20, } } form_args = { 'key': { 'validators': [ validators.DataRequired(), ], }, } column_sortable_list = ( 'key', 'val', 'is_encrypted', ) column_formatters = { 'val': hidden_field_formatter, } # Default flask-admin export functionality doesn't handle serialized json @action('varexport', 'Export', None) @provide_session def action_varexport(self, ids, session=None): V = models.Variable qry = session.query(V).filter(V.id.in_(ids)).all() var_dict = {} d = json.JSONDecoder() for var in qry: val = None try: val = d.decode(var.val) except Exception: val = var.val var_dict[var.key] = val response = make_response(json.dumps(var_dict, sort_keys=True, indent=4)) response.headers["Content-Disposition"] = "attachment; filename=variables.json" response.headers["Content-Type"] =
"application/json; charset=utf-8" return response def on_form_prefill(self, form, id): if wwwutils.should_hide_value_for_key(form.key.data): form.val.data = '*' * 8 class XComView(wwwutils.SuperUserMixin, AirflowModelView): can_create = False can_edit = False verbose_name = "XCom" verbose_name_plural = "XComs" form_columns = ( 'key', 'value', 'execution_date', 'task_id', 'dag_id', ) form_extra_fields = { 'value': StringField('Value'), } form_args = { 'execution_date': { 'filters': [ parse_datetime_f, ] } } column_filters = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id') column_searchable_list = ('key', 'timestamp', 'execution_date', 'task_id', 'dag_id') filter_converter = wwwutils.UtcFilterConverter() form_overrides = dict(execution_date=DateTimeField) def on_model_change(self, form, model, is_created): enable_pickling = conf.getboolean('core', 'enable_xcom_pickling') if enable_pickling: model.value = pickle.dumps(model.value) else: try: model.value = json.dumps(model.value).encode('UTF-8') except ValueError: log.error("Could not serialize the XCOM value into JSON. " "If you are using pickles instead of JSON " "for XCOM, then you need to enable pickle " "support for XCOM in your airflow config.") raise class JobModelView(ModelViewOnly): verbose_name_plural = "jobs" verbose_name = "job" column_display_actions = False column_default_sort = ('start_date', True) column_filters = ( 'job_type', 'dag_id', 'state', 'unixname', 'hostname', 'start_date', 'end_date', 'latest_heartbeat') column_formatters = dict( start_date=datetime_f, end_date=datetime_f, hostname=nobr_f, state=state_f, latest_heartbeat=datetime_f) filter_converter = wwwutils.UtcFilterConverter() class DagRunModelView(ModelViewOnly): verbose_name_plural = "DAG Runs" can_edit = True can_create = True verbose_name = "dag run" column_default_sort = ('execution_date', True) form_choices = { '_state': [ ('success', 'success'), ('running', 'running'), ('failed', 'failed'), ], } form_args = { 'dag_id': { 'validators': [ validators.DataRequired(), ] }, 'execution_date': { 'filters': [ parse_datetime_f, ] } } column_list = ( 'state', 'dag_id', 'execution_date', 'run_id', 'external_trigger') column_filters = column_list filter_converter = wwwutils.UtcFilterConverter() column_searchable_list = ('dag_id', 'state', 'run_id') column_formatters = dict( execution_date=datetime_f, state=state_f, start_date=datetime_f, dag_id=dag_link, run_id=dag_run_link ) form_overrides = dict(execution_date=DateTimeField) @action('new_delete', "Delete", "Are you sure you want to delete selected records?") @provide_session def action_new_delete(self, ids, session=None): deleted = set(session.query(models.DagRun) .filter(models.DagRun.id.in_(ids)) .all()) session.query(models.DagRun) \ .filter(models.DagRun.id.in_(ids)) \ .delete(synchronize_session='fetch') session.commit() dirty_ids = [] for row in deleted: dirty_ids.append(row.dag_id) @action('set_running', "Set state to 'running'", None) @provide_session def action_set_running(self, ids, session=None): try: DR = models.DagRun count = 0 dirty_ids = [] for dr in session.query(DR).filter(DR.id.in_(ids)).all(): dirty_ids.append(dr.dag_id) count += 1 dr.state = State.RUNNING dr.start_date = timezone.utcnow() flash( "{count} dag runs were set to running".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error') @action('set_failed', "Set state to 'failed'", "All running task instances would also be marked as failed, are 
you sure?") @provide_session def action_set_failed(self, ids, session=None): try: DR = models.DagRun count = 0 dirty_ids = [] altered_tis = [] for dr in session.query(DR).filter(DR.id.in_(ids)).all(): dirty_ids.append(dr.dag_id) count += 1 altered_tis += \ set_dag_run_state_to_failed(dagbag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session) altered_ti_count = len(altered_tis) flash( "{count} dag runs and {altered_ti_count} task instances " "were set to failed".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error') @action('set_success', "Set state to 'success'", "All task instances would also be marked as success, are you sure?") @provide_session def action_set_success(self, ids, session=None): try: DR = models.DagRun count = 0 dirty_ids = [] altered_tis = [] for dr in session.query(DR).filter(DR.id.in_(ids)).all(): dirty_ids.append(dr.dag_id) count += 1 altered_tis += \ set_dag_run_state_to_success(dagbag.get_dag(dr.dag_id), dr.execution_date, commit=True, session=session) altered_ti_count = len(altered_tis) flash( "{count} dag runs and {altered_ti_count} task instances " "were set to success".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error') # Called after editing DagRun model in the UI. @provide_session def after_model_change(self, form, dagrun, is_created, session=None): altered_tis = [] if dagrun.state == State.SUCCESS: altered_tis = set_dag_run_state_to_success( dagbag.get_dag(dagrun.dag_id), dagrun.execution_date, commit=True, session=session) elif dagrun.state == State.FAILED: altered_tis = set_dag_run_state_to_failed( dagbag.get_dag(dagrun.dag_id), dagrun.execution_date, commit=True, session=session) elif dagrun.state == State.RUNNING: altered_tis = set_dag_run_state_to_running( dagbag.get_dag(dagrun.dag_id), dagrun.execution_date, commit=True, session=session) altered_ti_count = len(altered_tis) flash( "1 dag run and {altered_ti_count} task instances " "were set to '{dagrun.state}'".format(**locals())) class LogModelView(ModelViewOnly): verbose_name_plural = "logs" verbose_name = "log" column_display_actions = False column_default_sort = ('dttm', True) column_filters = ('dag_id', 'task_id', 'execution_date', 'extra') filter_converter = wwwutils.UtcFilterConverter() column_formatters = dict( dttm=datetime_f, execution_date=datetime_f, dag_id=dag_link) class TaskInstanceModelView(ModelViewOnly): verbose_name_plural = "task instances" verbose_name = "task instance" column_filters = ( 'state', 'dag_id', 'task_id', 'execution_date', 'hostname', 'queue', 'pool', 'operator', 'start_date', 'end_date') filter_converter = wwwutils.UtcFilterConverter() named_filter_urls = True column_formatters = dict( log_url=log_url_formatter, task_id=task_instance_link, hostname=nobr_f, state=state_f, execution_date=datetime_f, start_date=datetime_f, end_date=datetime_f, queued_dttm=datetime_f, dag_id=dag_link, run_id=dag_run_link, duration=duration_f) column_searchable_list = ('dag_id', 'task_id', 'state') column_default_sort = ('job_id', True) form_choices = { 'state': [ ('success', 'success'), ('running', 'running'), ('failed', 'failed'), ], } column_list = ( 'state', 'dag_id', 'task_id', 'execution_date', 'operator', 'start_date', 'end_date', 'duration', 'job_id', 'hostname', 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log_url') page_size = PAGE_SIZE 
@action('set_running', "Set state to 'running'", None) def action_set_running(self, ids): self.set_task_instance_state(ids, State.RUNNING) @action('set_failed', "Set state to 'failed'", None) def action_set_failed(self, ids): self.set_task_instance_state(ids, State.FAILED) @action('set_success', "Set state to 'success'", None) def action_set_success(self, ids): self.set_task_instance_state(ids, State.SUCCESS) @action('set_retry', "Set state to 'up_for_retry'", None) def action_set_retry(self, ids): self.set_task_instance_state(ids, State.UP_FOR_RETRY) @provide_session @action('clear', lazy_gettext('Clear'), lazy_gettext( 'Are you sure you want to clear the state of the selected task instance(s)' ' and set their dagruns to the running state?')) def action_clear(self, ids, session=None): try: TI = models.TaskInstance dag_to_task_details = {} dag_to_tis = {} # Collect dags upfront as dagbag.get_dag() will reset the session for id_str in ids: task_id, dag_id, execution_date = iterdecode(id_str) dag = dagbag.get_dag(dag_id) task_details = dag_to_task_details.setdefault(dag, []) task_details.append((task_id, execution_date)) for dag, task_details in dag_to_task_details.items(): for task_id, execution_date in task_details: execution_date = parse_execution_date(execution_date) ti = session.query(TI).filter(TI.task_id == task_id, TI.dag_id == dag.dag_id, TI.execution_date == execution_date).one() tis = dag_to_tis.setdefault(dag, []) tis.append(ti) for dag, tis in dag_to_tis.items(): models.clear_task_instances(tis, session=session, dag=dag) session.commit() flash("{0} task instances have been cleared".format(len(ids))) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to clear task instances', 'error') @provide_session def set_task_instance_state(self, ids, target_state, session=None): try: TI = models.TaskInstance count = len(ids) for id in ids: task_id, dag_id, execution_date = iterdecode(id) execution_date = parse_execution_date(execution_date) ti = session.query(TI).filter(TI.task_id == task_id, TI.dag_id == dag_id, TI.execution_date == execution_date).one() ti.state = target_state session.commit() flash( "{count} task instances were set to '{target_state}'".format(**locals())) except Exception as ex: if not self.handle_view_exception(ex): raise Exception("Ooops") flash('Failed to set state', 'error') def get_one(self, id): """ As a workaround for AIRFLOW-252, this method overrides Flask-Admin's ModelView.get_one(). TODO: this method should be removed once the below bug is fixed on Flask-Admin side. 
https://github.com/flask-admin/flask-admin/issues/1226 """ task_id, dag_id, execution_date = iterdecode(id) execution_date = pendulum.parse(execution_date) return self.session.query(self.model).get((task_id, dag_id, execution_date)) class ConnectionModelView(wwwutils.SuperUserMixin, AirflowModelView): create_template = 'airflow/conn_create.html' edit_template = 'airflow/conn_edit.html' list_template = 'airflow/conn_list.html' form_columns = ( 'conn_id', 'conn_type', 'host', 'schema', 'login', 'password', 'port', 'extra', 'extra__jdbc__drv_path', 'extra__jdbc__drv_clsname', 'extra__google_cloud_platform__project', 'extra__google_cloud_platform__key_path', 'extra__google_cloud_platform__keyfile_dict', 'extra__google_cloud_platform__scope', 'extra__google_cloud_platform__num_retries', 'extra__grpc__auth_type', 'extra__grpc__credentials_pem_file', 'extra__grpc__scopes', 'extra__oci__fingerprint', 'extra__oci__key_file', 'extra__oci__tenancy', 'extra__oci__region', 'extra__yandexcloud__service_account_json', 'extra__yandexcloud__service_account_json_path', 'extra__yandexcloud__oauth', 'extra__yandexcloud__public_ssh_key', 'extra__yandexcloud__folder_id', ) verbose_name = "Connection" verbose_name_plural = "Connections" column_default_sort = ('conn_id', False) column_list = ('conn_id', 'conn_type', 'host', 'port', 'is_encrypted', 'is_extra_encrypted',) form_overrides = dict(_password=PasswordField, _extra=TextAreaField) form_args = dict( conn_id=dict(validators=[validators.DataRequired()]) ) form_widget_args = { 'is_extra_encrypted': {'disabled': True}, 'is_encrypted': {'disabled': True}, } # Used to customize the form; the form's elements get rendered # and the results are stored in the extra field as json. All of these # need to be prefixed with extra__ and then the conn_type, as in # extra__{conn_type}__name.
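# For example, 'extra__jdbc__drv_path' below renders as a "Driver Path" input
# and is persisted under that full prefixed key inside the JSON-encoded extra
# blob (see on_model_change below).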
You can also hide form elements and rename # others from the connection_form.js file form_extra_fields = { 'extra__jdbc__drv_path': StringField('Driver Path'), 'extra__jdbc__drv_clsname': StringField('Driver Class'), 'extra__google_cloud_platform__project': StringField('Project Id'), 'extra__google_cloud_platform__key_path': StringField('Keyfile Path'), 'extra__google_cloud_platform__keyfile_dict': PasswordField('Keyfile JSON'), 'extra__google_cloud_platform__scope': StringField('Scopes (comma separated)'), 'extra__google_cloud_platform__num_retries': IntegerField( 'Number of Retries', validators=[ validators.Optional(strip_whitespace=True), validators.NumberRange(min=0), ], ), 'extra__grpc__auth_type': StringField('Grpc Auth Type'), 'extra__grpc__credentials_pem_file': StringField('Credential Keyfile Path'), 'extra__grpc__scopes': StringField('Scopes (comma separated)'), 'extra__oci__fingerprint': StringField('API Key Fingerprint'), 'extra__oci__key_file': StringField('API Key Path'), 'extra__oci__tenancy': StringField('Tenancy OCID'), 'extra__oci__region': StringField('OCI Region'), 'extra__yandexcloud__service_account_json': PasswordField('Service account auth JSON'), 'extra__yandexcloud__service_account_json_path': StringField('Service account auth JSON file path'), 'extra__yandexcloud__oauth': PasswordField('OAuth Token'), 'extra__yandexcloud__public_ssh_key': StringField('Public SSH key'), 'extra__yandexcloud__folder_id': StringField('Default folder ID'), } form_choices = { 'conn_type': Connection._types } def on_model_change(self, form, model, is_created): formdata = form.data if formdata['conn_type'] in ['jdbc', 'google_cloud_platform', 'grpc', 'oci', 'yandexcloud']: extra = { key: formdata[key] for key in self.form_extra_fields.keys() if key in formdata} model.extra = json.dumps(extra) @classmethod def alert_fernet_key(cls): fk = None try: fk = conf.get('core', 'fernet_key') except Exception: pass return fk is None @classmethod def is_secure(cls): """ Used to display a message in the Connection list view making it clear that the passwords and `extra` field can't be encrypted.
""" is_secure = False try: import cryptography # noqa F401 conf.get('core', 'fernet_key') is_secure = True except Exception: pass return is_secure def on_form_prefill(self, form, id): try: d = json.loads(form.data.get('extra', '{}')) except Exception: d = {} for field in list(self.form_extra_fields.keys()): value = d.get(field, '') if value: field = getattr(form, field) field.data = value class UserModelView(wwwutils.SuperUserMixin, AirflowModelView): verbose_name = "User" verbose_name_plural = "Users" column_default_sort = 'username' class VersionView(wwwutils.SuperUserMixin, AirflowViewMixin, BaseView): @expose('/') def version(self): # Look at the version from setup.py try: airflow_version = airflow.__version__ except Exception as e: airflow_version = None logging.error(e) # Get the Git repo and git hash git_version = None try: with open(os.path.join(*[settings.AIRFLOW_HOME, 'airflow', 'git_version'])) as f: git_version = f.readline() except Exception as e: logging.error(e) # Render information title = "Version Info" return self.render('airflow/version.html', title=title, airflow_version=airflow_version, git_version=git_version) class ConfigurationView(wwwutils.SuperUserMixin, AirflowViewMixin, BaseView): @expose('/') def conf(self): raw = request.args.get('raw') == "true" title = "Airflow Configuration" subtitle = configuration.AIRFLOW_CONFIG if conf.getboolean("webserver", "expose_config"): with open(configuration.AIRFLOW_CONFIG, 'r') as f: config = f.read() table = [(section, key, value, source) for section, parameters in conf.as_dict(True, True).items() for key, (value, source) in parameters.items()] else: config = ( "# Your Airflow administrator chose not to expose the " "configuration, most likely for security reasons.") table = None if raw: return Response( response=config, status=200, mimetype="application/text") else: code_html = Markup(highlight( config, lexers.IniLexer(), # Lexer call HtmlFormatter(noclasses=True)) ) return self.render( 'airflow/config.html', pre_subtitle=settings.HEADER + " v" + airflow.__version__, code_html=code_html, title=title, subtitle=subtitle, table=table) class DagModelView(wwwutils.SuperUserMixin, ModelView): column_list = ('dag_id', 'owners') column_editable_list = ('is_paused', 'description', 'default_view') form_excluded_columns = ('is_subdag', 'is_active') column_searchable_list = ('dag_id',) column_filters = ( 'dag_id', 'owners', 'is_paused', 'is_active', 'is_subdag', 'last_scheduler_run', 'last_expired') filter_converter = wwwutils.UtcFilterConverter() form_widget_args = { 'last_scheduler_run': {'disabled': True}, 'fileloc': {'disabled': True}, 'is_paused': {'disabled': True}, 'last_pickled': {'disabled': True}, 'pickle_id': {'disabled': True}, 'last_loaded': {'disabled': True}, 'last_expired': {'disabled': True}, 'pickle_size': {'disabled': True}, 'scheduler_lock': {'disabled': True}, 'owners': {'disabled': True}, } column_formatters = dict( dag_id=dag_link, ) can_delete = False can_create = False page_size = PAGE_SIZE list_template = 'airflow/list_dags.html' named_filter_urls = True def get_query(self): """ Default filters for model """ return super(DagModelView, self)\ .get_query()\ .filter(or_(models.DagModel.is_active, models.DagModel.is_paused))\ .filter(~models.DagModel.is_subdag) def get_count_query(self): """ Default filters for model """ return super(DagModelView, self)\ .get_count_query()\ .filter(models.DagModel.is_active)\ .filter(~models.DagModel.is_subdag) def edit_form(self, obj=None): # Ensure that disabled fields aren't 
overwritten form = super(DagModelView, self).edit_form(obj) if not obj: return form for fld in form: if self.form_widget_args.get(fld.name, {}).get('disabled'): fld.data = getattr(obj, fld.name) return form
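# Browsers do not submit disabled inputs, so without the re-copy above a POST
# from the edit form would blank out fields such as last_scheduler_run. A
# minimal sketch of the behaviour this guards (the view instance and ORM row
# names here are hypothetical):
#
#   form = dag_model_view.edit_form(obj=orm_dag)
#   assert form.last_scheduler_run.data == orm_dag.last_scheduler_run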