Source code for reana_workflow_controller.rest.workflows

# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANA Workflow Controller workflows REST API."""

import json
import logging
from typing import Optional
from uuid import uuid4

from flask import Blueprint, jsonify, request
from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_db.database import Session
from reana_db.utils import build_workspace_path
from reana_db.models import User, Workflow, RunStatus, WorkflowResource
from reana_db.utils import _get_workflow_with_uuid_or_name, get_default_quota_resource
from sqlalchemy import and_, nullslast
from webargs import fields
from webargs.flaskparser import use_args, use_kwargs


from reana_workflow_controller.config import DEFAULT_NAME_FOR_WORKFLOWS
from reana_workflow_controller.errors import (
    REANAWorkflowControllerError,
    REANAWorkflowNameError,
)
from reana_workflow_controller.rest.utils import (
    create_workflow_workspace,
    get_specification_diff,
    get_workflow_name,
    get_workflow_progress,
    get_workspace_diff,
    is_uuid_v4,
    use_paginate_args,
)


START = "start"
STOP = "stop"
DELETED = "deleted"
STATUSES = {START, STOP, DELETED}

blueprint = Blueprint("workflows", __name__)


[docs]@blueprint.route("/workflows", methods=["GET"]) @use_paginate_args() @use_args( { "include_progress": fields.Bool(), "include_workspace_size": fields.Bool(), "search": fields.String(missing=""), "sort": fields.String(missing="desc"), "status": fields.String(missing=""), "type": fields.String(required=True), "user": fields.String(required=True), "verbose": fields.Bool(missing=False), "workflow_id_or_name": fields.String(), }, location="query", ) def get_workflows(args, paginate=None): # noqa r"""Get all workflows. --- get: summary: Returns all workflows. description: >- This resource is expecting a user UUID. The information related to all workflows for a given user will be served as JSON operationId: get_workflows produces: - application/json parameters: - name: user in: query description: Required. UUID of workflow owner. required: true type: string - name: type in: query description: Required. Type of workflows. required: true type: string - name: verbose in: query description: Optional flag to show more information. required: false type: boolean - name: search in: query description: Filter workflows by name. required: false type: string - name: sort in: query description: Sort workflows by creation date (asc, desc). required: false type: string - name: status in: query description: Filter workflows by list of statuses. required: false type: array items: type: string - name: page in: query description: Results page number (pagination). required: false type: integer - name: size in: query description: Number of results per page (pagination). required: false type: integer - name: include_progress in: query description: Include progress information of the workflows. required: false type: boolean - name: include_workspace_size in: query description: Include size information of the workspace. required: false type: boolean - name: workflow_id_or_name in: query description: Optional analysis UUID or name to filter. required: false type: string responses: 200: description: >- Requests succeeded. The response contains the current workflows for a given user. schema: type: object properties: total: type: integer items: type: array items: type: object properties: id: type: string name: type: string status: type: string size: type: object properties: raw: type: number human_readable: type: string user: type: string created: type: string progress: type: object launcher_url: type: string x-nullable: true examples: application/json: [ { "id": "256b25f4-4cfb-4684-b7a8-73872ef455a1", "name": "mytest.1", "status": "running", "size":{ "raw": 10490000, "human_readable": "10 MB" }, "user": "00000000-0000-0000-0000-000000000000", "created": "2018-06-13T09:47:35.66097", "launcher_url": "https://github.com/reanahub/reana-demo-helloworld.git", }, { "id": "3c9b117c-d40a-49e3-a6de-5f89fcada5a3", "name": "mytest.2", "status": "finished", "size":{ "raw": 12580000, "human_readable": "12 MB" }, "user": "00000000-0000-0000-0000-000000000000", "created": "2018-06-13T09:47:35.66097", "launcher_url": "https://example.org/specs/reana-snakemake.yaml", }, { "id": "72e3ee4f-9cd3-4dc7-906c-24511d9f5ee3", "name": "mytest.3", "status": "created", "size":{ "raw": 184320, "human_readable": "180 KB" }, "user": "00000000-0000-0000-0000-000000000000", "created": "2018-06-13T09:47:35.66097", "launcher_url": "https://zenodo.org/record/1/reana.yaml", }, { "id": "c4c0a1a6-beef-46c7-be04-bf4b3beca5a1", "name": "mytest.4", "status": "created", "size": { "raw": 1074000000, "human_readable": "1 GB" }, "user": "00000000-0000-0000-0000-000000000000", "created": "2018-06-13T09:47:35.66097", "launcher_url": null, } ] 400: description: >- Request failed. The incoming data specification seems malformed. 404: description: >- Request failed. User does not exist. examples: application/json: { "message": "User 00000000-0000-0000-0000-000000000000 does not exist" } 500: description: >- Request failed. Internal controller error. examples: application/json: { "message": "Internal workflow controller error." } """ user_uuid: str = args["user"] type_: str = args["type"] verbose: bool = args["verbose"] sort: str = args["sort"] search: str = args["search"] status_list: str = args["status"] include_progress: bool = args.get("include_progress", verbose) include_workspace_size: bool = args.get("include_workspace_size", verbose) workflow_id_or_name: Optional[str] = args.get("workflow_id_or_name") try: user = User.query.filter(User.id_ == user_uuid).first() if not user: return jsonify({"message": "User {} does not exist".format(user_uuid)}), 404 workflows = [] query = user.workflows if search: search = json.loads(search) search_val = search.get("name")[0] query = query.filter(Workflow.name.ilike("%{}%".format(search_val))) if status_list: workflow_status = [RunStatus[status] for status in status_list.split(",")] query = query.filter(Workflow.status.in_(workflow_status)) if workflow_id_or_name: query = ( query.filter(Workflow.id_ == workflow_id_or_name) if is_uuid_v4(workflow_id_or_name) else query.filter(Workflow.name == workflow_id_or_name) ) column_sorted = Workflow.created.desc() if sort in ["disk-desc", "cpu-desc"]: resource_type = sort.split("-")[0] resource = get_default_quota_resource(resource_type) query = query.join( WorkflowResource, and_( Workflow.id_ == WorkflowResource.workflow_id, WorkflowResource.resource_id == resource.id_, ), isouter=True, ) column_sorted = nullslast(WorkflowResource.quota_used.desc()) elif sort in ["asc", "desc"]: column_sorted = getattr(Workflow.created, sort)() pagination_dict = paginate(query.order_by(column_sorted)) for workflow in pagination_dict["items"]: workflow_response = { "id": workflow.id_, "name": get_workflow_name(workflow), "status": workflow.status.name, "user": user_uuid, "launcher_url": workflow.launcher_url, "created": workflow.created.strftime(WORKFLOW_TIME_FORMAT), "progress": get_workflow_progress( workflow, include_progress=include_progress ), } if type_ == "interactive" or verbose: int_session = workflow.sessions.first() if int_session: workflow_response["session_type"] = int_session.type_.name workflow_response["session_uri"] = int_session.path workflow_response["session_status"] = int_session.status.name # Skip workflow if type is interactive and there is no session elif type_ == "interactive": continue empty_disk_usage = { "human_readable": "", "raw": -1, } if include_workspace_size: workflow_response["size"] = ( workflow.get_quota_usage() .get("disk", {}) .get("usage", empty_disk_usage) ) else: workflow_response["size"] = empty_disk_usage workflows.append(workflow_response) pagination_dict["items"] = workflows pagination_dict["user_has_workflows"] = user.workflows.first() is not None return jsonify(pagination_dict), 200 except (ValueError, KeyError): return jsonify({"message": "Malformed request."}), 400 except json.JSONDecodeError: return jsonify({"message": "Your request contains not valid JSON."}), 400 except Exception as e: return jsonify({"message": str(e)}), 500
[docs]@blueprint.route("/workflows", methods=["POST"]) def create_workflow(): # noqa r"""Create workflow and its workspace. --- post: summary: Create workflow and its workspace. description: >- This resource expects all necessary data to represent a workflow so it is stored in database and its workspace is created. operationId: create_workflow produces: - application/json parameters: - name: user in: query description: Required. UUID of workflow owner. required: true type: string - name: workspace_root_path in: query description: A root path under which the workflow workspaces are stored. required: false type: string - name: workflow in: body description: >- JSON object including workflow parameters and workflow specification in JSON format (`yadageschemas.load()` output) with necessary data to instantiate a yadage workflow. required: true schema: type: object properties: operational_options: type: object description: Operational options. reana_specification: type: object description: Workflow specification in JSON format. workflow_name: type: string description: Workflow name. If empty name will be generated. git_data: type: object description: GitLab data. launcher_url: type: string description: Launcher URL. retention_rules: type: array title: Retention rules list for the files in the workspace. items: title: Retention rule for the files in the workspace. type: object additionalProperties: false properties: workspace_files: type: string retention_days: type: integer required: [reana_specification, workflow_name, operational_options, retention_rules] responses: 201: description: >- Request succeeded. The workflow has been created along with its workspace schema: type: object properties: message: type: string workflow_id: type: string workflow_name: type: string examples: application/json: { "message": "Workflow workspace has been created.", "workflow_id": "cdcf48b1-c2f3-4693-8230-b066e088c6ac", "workflow_name": "mytest-1" } 400: description: >- Request failed. The incoming data specification seems malformed 404: description: >- Request failed. User does not exist. examples: application/json: { "message": "User 00000000-0000-0000-0000-000000000000 does not exist" } """ try: user_uuid = request.args["user"] user = User.query.filter(User.id_ == user_uuid).first() if not user: return ( jsonify( {"message": "User with id:{} does not exist".format(user_uuid)} ), 404, ) workflow_uuid = str(uuid4()) # Use name prefix user specified or use default name prefix # Actual name is prefix + autoincremented run_number. workflow_name = request.json.get("workflow_name", "") if workflow_name == "": workflow_name = DEFAULT_NAME_FOR_WORKFLOWS else: try: workflow_name.encode("ascii") except UnicodeEncodeError: # `workflow_name` contains something else than just ASCII. raise REANAWorkflowNameError( "Workflow name {} is not valid.".format(workflow_name) ) git_ref = "" git_repo = "" if "git_data" in request.json: git_data = request.json["git_data"] git_ref = git_data["git_commit_sha"] git_repo = git_data["git_url"] # add spec and params to DB as JSON workspace_root_path = request.args.get("workspace_root_path", None) reana_specification = request.json["reana_specification"] workflow = Workflow( id_=workflow_uuid, name=workflow_name, owner_id=request.args["user"], reana_specification=reana_specification, operational_options=request.json.get("operational_options", {}), type_=reana_specification["workflow"]["type"], logs="", git_ref=git_ref, git_repo=git_repo, workspace_path=build_workspace_path( request.args["user"], workflow_uuid, workspace_root_path ), launcher_url=request.json.get("launcher_url"), ) Session.add(workflow) Session.object_session(workflow).commit() retention_rules = request.json.get("retention_rules", []) if retention_rules: workflow.set_workspace_retention_rules(retention_rules) if git_ref: create_workflow_workspace( workflow.workspace_path, user_id=user.id_, git_url=git_data["git_url"], git_branch=git_data["git_branch"], git_ref=git_ref, ) else: create_workflow_workspace(workflow.workspace_path) return ( jsonify( { "message": "Workflow workspace created", "workflow_id": workflow.id_, "workflow_name": get_workflow_name(workflow), } ), 201, ) except (REANAWorkflowNameError, KeyError) as e: return jsonify({"message": str(e)}), 400 except Exception as e: return jsonify({"message": str(e)}), 500
[docs]@blueprint.route("/workflows/<workflow_id_or_name>/parameters", methods=["GET"]) def get_workflow_parameters(workflow_id_or_name): # noqa r"""Get workflow input parameters. --- get: summary: Get workflow parameters. description: >- This resource reports the input parameters of workflow. operationId: get_workflow_parameters produces: - application/json parameters: - name: user in: query description: Required. UUID of workflow owner. required: true type: string - name: workflow_id_or_name in: path description: Required. Workflow UUID or name. required: true type: string responses: 200: description: >- Request succeeded. Workflow input parameters, including the status are returned. schema: type: object properties: id: type: string name: type: string type: type: string parameters: type: object examples: application/json: { 'id': 'dd4e93cf-e6d0-4714-a601-301ed97eec60', 'name': 'workflow.24', 'type': 'serial', 'parameters': {'helloworld': 'code/helloworld.py', 'inputfile': 'data/names.txt', 'outputfile': 'results/greetings.txt', 'sleeptime': 2} } 400: description: >- Request failed. The incoming data specification seems malformed. examples: application/json: { "message": "Malformed request." } 403: description: >- Request failed. User is not allowed to access workflow. examples: application/json: { "message": "User 00000000-0000-0000-0000-000000000000 is not allowed to access workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1" } 404: description: >- Request failed. Either User or Workflow does not exist. examples: application/json: { "message": "User 00000000-0000-0000-0000-000000000000 does not exist" } application/json: { "message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1 does not exist" } 500: description: >- Request failed. Internal controller error. """ try: user_uuid = request.args["user"] workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid) workflow_parameters = workflow.get_input_parameters() return ( jsonify( { "id": workflow.id_, "name": get_workflow_name(workflow), "type": workflow.reana_specification["workflow"]["type"], "parameters": workflow_parameters, } ), 200, ) except ValueError: return ( jsonify( { "message": "REANA_WORKON is set to {0}, but " "that workflow does not exist. " "Please set your REANA_WORKON environment " "variable appropriately.".format(workflow_id_or_name) } ), 404, ) except KeyError as e: return jsonify({"message": str(e)}), 400 except Exception as e: return jsonify({"message": str(e)}), 500
[docs]@blueprint.route( "/workflows/<workflow_id_or_name_a>/diff/" "<workflow_id_or_name_b>", methods=["GET"], ) def get_workflow_diff(workflow_id_or_name_a, workflow_id_or_name_b): # noqa r"""Get differences between two workflows. --- get: summary: Get diff between two workflows. description: >- This resource shows the differences between the assets of two workflows. Resource is expecting two workflow UUIDs or names. operationId: get_workflow_diff produces: - application/json parameters: - name: user in: query description: Required. UUID of workflow owner. required: true type: string - name: workflow_id_or_name_a in: path description: Required. Analysis UUID or name of the first workflow. required: true type: string - name: workflow_id_or_name_b in: path description: Required. Analysis UUID or name of the second workflow. required: true type: string - name: brief in: query description: Optional flag. If set, file contents are examined. required: false type: boolean default: false - name: context_lines in: query description: Optional parameter. Sets number of context lines for workspace diff output. required: false type: string default: '5' responses: 200: description: >- Request succeeded. Info about a workflow, including the status is returned. schema: type: object properties: reana_specification: type: string workspace_listing: type: string examples: application/json: { "reana_specification": ["- nevents: 100000\n+ nevents: 200000"], "workspace_listing": {"Only in workspace a: code"} } 400: description: >- Request failed. The incoming payload seems malformed. examples: application/json: { "message": "Malformed request." } 403: description: >- Request failed. User is not allowed to access workflow. examples: application/json: { "message": "User 00000000-0000-0000-0000-000000000000 is not allowed to access workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1" } 404: description: >- Request failed. Either user or workflow does not exist. examples: application/json: { "message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1 does not exist." } 500: description: >- Request failed. Internal controller error. """ try: user_uuid = request.args["user"] brief = json.loads(request.args.get("brief", "false").lower()) context_lines = request.args.get("context_lines", 5) workflow_a_exists = False workflow_a = _get_workflow_with_uuid_or_name(workflow_id_or_name_a, user_uuid) workflow_a_exists = True workflow_b = _get_workflow_with_uuid_or_name(workflow_id_or_name_b, user_uuid) if not workflow_id_or_name_a or not workflow_id_or_name_b: raise ValueError("Workflow id or name is not supplied") specification_diff = get_specification_diff(workflow_a, workflow_b) try: workspace_diff = get_workspace_diff( workflow_a, workflow_b, brief, context_lines ) except ValueError as e: workspace_diff = str(e) response = { "reana_specification": json.dumps(specification_diff), "workspace_listing": json.dumps(workspace_diff), } return jsonify(response) except REANAWorkflowControllerError as e: return jsonify({"message": str(e)}), 409 except ValueError: wrong_workflow = ( workflow_id_or_name_b if workflow_a_exists else workflow_id_or_name_a ) return ( jsonify({"message": "Workflow {0} does not exist.".format(wrong_workflow)}), 404, ) except KeyError as e: return jsonify({"message": str(e)}), 400 except ValueError as e: return jsonify({"message": str(e)}), 400 except json.JSONDecodeError: return jsonify({"message": "Your request contains not valid JSON."}), 400 except Exception as e: return jsonify({"message": str(e)}), 500
[docs]@blueprint.route("/workflows/<workflow_id_or_name>/retention_rules") @use_kwargs({"user": fields.Str(required=True)}, location="query") def get_workflow_retention_rules(workflow_id_or_name: str, user: str): r"""Get the retention rules of a workflow. --- get: summary: Get the retention rules of a workflow. description: >- This resource returns all the retention rules of a given workflow. operationId: get_workflow_retention_rules produces: - application/json parameters: - name: user in: query description: Required. UUID of workflow owner. required: true type: string - name: workflow_id_or_name in: path description: Required. Analysis UUID or name. required: true type: string responses: 200: description: >- Request succeeded. The response contains the list of all the retention rules. schema: type: object properties: workflow_id: type: string workflow_name: type: string retention_rules: type: array items: type: object properties: id: type: string workspace_files: type: string retention_days: type: integer apply_on: type: string x-nullable: true status: type: string examples: application/json: { "workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1", "workflow_name": "mytest.1", "retention_rules": [ { "id": "851da5cf-0b26-40c5-97a1-9acdbb35aac7", "workspace_files": "**/*.tmp", "retention_days": 1, "apply_on": "2022-11-24T23:59:59", "status": "active" } ] } 404: description: >- Request failed. User or workflow do not exist. schema: type: object properties: message: type: string examples: application/json: { "message": "User 00000000-0000-0000-0000-000000000000 does not exist." } 500: description: >- Request failed. Internal server error. schema: type: object properties: message: type: string examples: application/json: { "message": "Something went wrong." } """ try: workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user) rules = workflow.retention_rules.all() response = { "workflow_id": workflow.id_, "workflow_name": workflow.get_full_workflow_name(), "retention_rules": [rule.serialize() for rule in rules], } return jsonify(response), 200 except ValueError as e: logging.exception(str(e)) return jsonify({"message": str(e)}), 404 except Exception as e: logging.exception(str(e)) return jsonify({"message": str(e)}), 500