From 96400d1b3823f81c9bd375285040a8674e00b0f3 Mon Sep 17 00:00:00 2001 From: Giovanni Paolo Gibilisco Date: Tue, 19 Jun 2018 09:14:02 +0200 Subject: [PATCH 1/2] Add endpoint to get a specific dag from the id and to PUT a paused state chhange --- blueprints/airflow_api.py | 62 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 60 insertions(+), 2 deletions(-) diff --git a/blueprints/airflow_api.py b/blueprints/airflow_api.py index 2d1f0ab..0a3fa3f 100644 --- a/blueprints/airflow_api.py +++ b/blueprints/airflow_api.py @@ -8,10 +8,11 @@ from sqlalchemy import or_ from airflow import settings from airflow.exceptions import AirflowException, AirflowConfigException -from airflow.models import DagBag, DagRun +from airflow.models import DagBag, DagRun, DagModel from airflow.utils.state import State from airflow.utils.dates import date_range as utils_date_range from airflow.www.app import csrf +from airflow.bin.cli import pause, unpause, get_dag as cli_get_dag airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api/v1') @@ -65,6 +66,13 @@ def unauthorized(error='Not authorized to access this resource'): def server_error(error='An unexpected problem occurred'): return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error) + +class CliArgs: + def __init__(self, dag_id, subdir): + self.subdir = subdir + self.dag_id = dag_id + + @airflow_api_blueprint.before_request def verify_authentication(): authorization = request.headers.get('authorization') @@ -120,6 +128,55 @@ def dags_index(): return ApiResponse.success({'dags': dags}) +@csrf.exempt +@airflow_api_blueprint.route('/dags/', methods=['PUT']) +def dag_update(dag_id): + args = CliArgs(dag_id, 'dags') + try: + dag = cli_get_dag(args) + except AirflowException: + return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id)) + + body = request.get_json() + if body is None or 'is_active' not in body: + return ApiResponse.bad_request("A Json body with 'is_active': True/False is expected") + + try: + if body['is_active']: + unpause(None, dag) + elif not body['is_active']: + pause(None, dag) + except AirflowException: + return ApiResponse.not_found('Could not pause/unpause dag with ID {}'.format(dag_id)) + + payload = { + 'dag_id': dag_id, + 'full_path': dag.full_filepath, + 'is_active': (not dag.is_paused), + 'last_execution': str(dag.latest_execution_date) + } + + return ApiResponse.success(payload) + + +@airflow_api_blueprint.route('/dags/', methods=['GET']) +def get_dag(dag_id): + args = CliArgs(dag_id, 'dags') + + try: + dag = cli_get_dag(args) + except AirflowException: + return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id)) + + payload = { + 'dag_id': dag_id, + 'full_path': dag.full_filepath, + 'is_active': (not dag.is_paused), + 'last_execution': str(dag.latest_execution_date) + } + + return ApiResponse.success(payload) + @airflow_api_blueprint.route('/dag_runs', methods=['GET']) def get_dag_runs(): @@ -148,6 +205,7 @@ def get_dag_runs(): return ApiResponse.success({'dag_runs': dag_runs}) + @csrf.exempt @airflow_api_blueprint.route('/dag_runs', methods=['POST']) def create_dag_run(): @@ -284,4 +342,4 @@ def get_dag_run(dag_run_id): session.close() - return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) \ No newline at end of file + return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) From b4807530a3831b15d9a8068c6c13433ecbee4c8f Mon Sep 17 00:00:00 2001 From: Giovanni Paolo Gibilisco Date: Wed, 11 Jul 2018 14:41:13 +0200 Subject: [PATCH 2/2] Add api to create and delete a connection --- blueprints/airflow_api.py | 81 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 76 insertions(+), 5 deletions(-) diff --git a/blueprints/airflow_api.py b/blueprints/airflow_api.py index 0a3fa3f..01bee25 100644 --- a/blueprints/airflow_api.py +++ b/blueprints/airflow_api.py @@ -3,6 +3,7 @@ import os import six import time +import logging from flask import Blueprint, request, Response from sqlalchemy import or_ @@ -12,7 +13,7 @@ from airflow.utils.state import State from airflow.utils.dates import date_range as utils_date_range from airflow.www.app import csrf -from airflow.bin.cli import pause, unpause, get_dag as cli_get_dag +from airflow.bin.cli import pause, unpause, get_dag as cli_get_dag, connections as cli_connections airflow_api_blueprint = Blueprint('airflow_api', __name__, url_prefix='/api/v1') @@ -67,10 +68,11 @@ def server_error(error='An unexpected problem occurred'): return ApiResponse.error(ApiResponse.STATUS_SERVER_ERROR, error) -class CliArgs: +class dagCliArgs: def __init__(self, dag_id, subdir): self.subdir = subdir self.dag_id = dag_id + @airflow_api_blueprint.before_request @@ -131,12 +133,12 @@ def dags_index(): @csrf.exempt @airflow_api_blueprint.route('/dags/', methods=['PUT']) def dag_update(dag_id): - args = CliArgs(dag_id, 'dags') + args = dagCliArgs(dag_id, 'dags') try: dag = cli_get_dag(args) except AirflowException: return ApiResponse.not_found('Could not find a dag with ID {}'.format(dag_id)) - + logging.info("Processing dag {} PUT body {}".format(dag_id, request.get_json())) body = request.get_json() if body is None or 'is_active' not in body: return ApiResponse.bad_request("A Json body with 'is_active': True/False is expected") @@ -161,7 +163,7 @@ def dag_update(dag_id): @airflow_api_blueprint.route('/dags/', methods=['GET']) def get_dag(dag_id): - args = CliArgs(dag_id, 'dags') + args = dagCliArgs(dag_id, 'dags') try: dag = cli_get_dag(args) @@ -343,3 +345,72 @@ def get_dag_run(dag_run_id): session.close() return ApiResponse.success({'dag_run': format_dag_run(dag_run)}) + +class connectionCliArgs: + def __init__(self, mode, conn_id=None, conn_uri=None ): + + self.list = False + self.delete = False + self.add = False + + if mode == 'list': + self.list = True + elif mode == 'delete': + self.delete = True + elif mode == 'add': + self.add = True + + self.conn_id = conn_id + self.conn_uri = conn_uri + self.conn_extra = None + self.conn_type = None + self.conn_host = None + self.conn_login = None + self.conn_password = None + self.conn_schema = None + self.conn_port = None + +@csrf.exempt +@airflow_api_blueprint.route('/connections/', methods=['DELETE']) +def delete_connections(conn_id): + args = connectionCliArgs('delete',conn_id=conn_id) + + try: + cli_connections(args) + except AirflowException: + return ApiResponse.error('Could not delete the connection') + + payload = { + 'status': 'deleted' + } + + return ApiResponse.success(payload) + +@csrf.exempt +@airflow_api_blueprint.route('/connections', methods=['POST']) +def add_connections(): + + # decode input + data = request.get_json(force=True) + # ensure there is a conn_id + if 'conn_id' not in data or data['conn_id'] is None: + return ApiResponse.bad_request('Must specify the connection id (conn_id) for the new connection') + conn_id = data['conn_id'] + + # ensure there is a dag id + if 'conn_uri' not in data or data['conn_uri'] is None: + return ApiResponse.bad_request('Must specify the connection uri (conn_uri) for the new connection') + conn_uri = data['conn_uri'] + + args = connectionCliArgs('add',conn_id=conn_id, conn_uri=conn_uri) + + try: + cli_connections(args) + except AirflowException: + return ApiResponse.error('Could not add the new connection ') + + payload = { + 'status': 'created' + } + + return ApiResponse.success(payload)