# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2020, 2021, 2022 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""REANA Workflow Controller status REST API."""
import json
from flask import Blueprint, jsonify, request
from reana_commons.config import WORKFLOW_TIME_FORMAT
from reana_commons.errors import REANASecretDoesNotExist
from reana_db.utils import _get_workflow_with_uuid_or_name
from reana_workflow_controller.errors import (
REANAExternalCallError,
REANAWorkflowControllerError,
REANAWorkflowStatusError,
)
from reana_workflow_controller.rest.utils import (
build_workflow_logs,
delete_workflow,
get_workflow_name,
get_workflow_progress,
start_workflow,
stop_workflow,
use_paginate_args,
)
# Values accepted for the ``status`` query argument of ``set_workflow_status``.
START = "start"
STOP = "stop"
DELETED = "deleted"
# Set used to validate the requested status before acting on it.
STATUSES = {START, STOP, DELETED}
# Flask blueprint on which the workflow logs/status routes below are registered.
blueprint = Blueprint("statuses", __name__)
@blueprint.route("/workflows/<workflow_id_or_name>/logs", methods=["GET"])
@use_paginate_args()
def get_workflow_logs(workflow_id_or_name, paginate=None, **kwargs):  # noqa
    r"""Get workflow logs from a workflow engine.

    The workflow is looked up for the user given in the ``user`` query
    argument. When the request carries a non-empty JSON body, it is passed
    to ``build_workflow_logs`` as the steps to report and only job logs are
    returned; otherwise the workflow logs, all job logs and the
    engine-specific data are returned. The logs are serialised to a JSON
    string inside the response.

    ---
    get:
      summary: Returns logs of a specific workflow from a workflow engine.
      description: >-
        This resource is expecting a workflow UUID and a filename to return
        its outputs.
      operationId: get_workflow_logs
      produces:
        - application/json
      parameters:
        - name: user
          in: query
          description: Required. UUID of workflow owner.
          required: true
          type: string
        - name: workflow_id_or_name
          in: path
          description: Required. Workflow UUID or name.
          required: true
          type: string
        - name: steps
          in: body
          description: Steps of a workflow.
          required: false
          schema:
            type: array
            description: List of step names to get logs for.
            items:
              type: string
              description: Step name.
        - name: page
          in: query
          description: Results page number (pagination).
          required: false
          type: integer
        - name: size
          in: query
          description: Number of results per page (pagination).
          required: false
          type: integer
      responses:
        200:
          description: >-
            Request succeeded. Info about workflow, including the status is
            returned.
          schema:
            type: object
            properties:
              workflow_id:
                type: string
              workflow_name:
                type: string
              logs:
                type: string
              user:
                type: string
          examples:
            application/json:
              {
                "workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
                "workflow_name": "mytest-1",
                "logs": "{'workflow_logs': string,
                          'job_logs': {
                            '256b25f4-4cfb-4684-b7a8-73872ef455a2': string,
                            '256b25f4-4cfb-4684-b7a8-73872ef455a3': string,
                          },
                          'engine_specific': object,
                         }",
                "user": "00000000-0000-0000-0000-000000000000"
              }
        400:
          description: >-
            Request failed. The incoming data specification seems malformed.
        404:
          description: >-
            Request failed. User does not exist.
          examples:
            application/json:
              {
                "message": "User 00000000-0000-0000-0000-000000000000 does not
                            exist"
              }
        500:
          description: >-
            Request failed. Internal controller error.
          examples:
            application/json:
              {
                "message": "Internal workflow controller error."
              }
    """
    try:
        # A missing ``user`` argument raises KeyError, reported as 400 below.
        user_uuid = request.args["user"]
        workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)

        # The optional JSON body lists the steps whose logs are wanted.
        requested_steps = request.json if request.is_json else None

        if not requested_steps:
            # No step filter: report everything known about the workflow.
            logs_payload = {
                "workflow_logs": workflow.logs,
                "job_logs": build_workflow_logs(workflow, paginate=paginate),
                "engine_specific": workflow.engine_specific,
            }
        else:
            # Step filter given: only the job logs of those steps are reported.
            logs_payload = {
                "workflow_logs": None,
                "job_logs": build_workflow_logs(
                    workflow, requested_steps, paginate=paginate
                ),
                "engine_specific": None,
            }

        response_body = {
            "workflow_id": workflow.id_,
            "workflow_name": get_workflow_name(workflow),
            "logs": json.dumps(logs_payload),
            "user": user_uuid,
        }
        return jsonify(response_body), 200
    except ValueError:
        # Presumably raised by the workflow lookup when nothing matches.
        not_found_message = (
            "REANA_WORKON is set to {0}, but "
            "that workflow does not exist. "
            "Please set your REANA_WORKON environment "
            "variable appropriately."
        ).format(workflow_id_or_name)
        return jsonify({"message": not_found_message}), 404
    except KeyError as error:
        return jsonify({"message": str(error)}), 400
    except Exception as error:
        return jsonify({"message": str(error)}), 500
@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["GET"])
def get_workflow_status(workflow_id_or_name):  # noqa
    r"""Get workflow status.

    The workflow is looked up for the user given in the ``user`` query
    argument; the response reports its identifier, name, creation time,
    status name, progress and logs (the latter serialised to a JSON string).

    ---
    get:
      summary: Get workflow status.
      description: >-
        This resource reports the status of workflow.
      operationId: get_workflow_status
      produces:
        - application/json
      parameters:
        - name: user
          in: query
          description: Required. UUID of workflow owner.
          required: true
          type: string
        - name: workflow_id_or_name
          in: path
          description: Required. Workflow UUID or name.
          required: true
          type: string
      responses:
        200:
          description: >-
            Request succeeded. Info about workflow, including the status is
            returned.
          schema:
            type: object
            properties:
              id:
                type: string
              name:
                type: string
              created:
                type: string
              status:
                type: string
              user:
                type: string
              logs:
                type: string
              progress:
                type: object
          examples:
            application/json:
              {
                "id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
                "name": "mytest-1",
                "created": "2018-06-13T09:47:35.66097",
                "status": "running",
                "user": "00000000-0000-0000-0000-000000000000"
              }
        400:
          description: >-
            Request failed. The incoming data specification seems malformed.
          examples:
            application/json:
              {
                "message": "Malformed request."
              }
        403:
          description: >-
            Request failed. User is not allowed to access workflow.
          examples:
            application/json:
              {
                "message": "User 00000000-0000-0000-0000-000000000000
                            is not allowed to access workflow
                            256b25f4-4cfb-4684-b7a8-73872ef455a1"
              }
        404:
          description: >-
            Request failed. Either User or Workflow does not exist.
          examples:
            application/json:
              {
                "message": "User 00000000-0000-0000-0000-000000000000 does not
                            exist"
              }
            application/json:
              {
                "message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1
                            does not exist"
              }
        500:
          description: >-
            Request failed. Internal controller error.
    """
    try:
        # A missing ``user`` argument raises KeyError, reported as 400 below.
        user_uuid = request.args["user"]
        workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)
        job_logs = build_workflow_logs(workflow)

        status_report = {
            "id": workflow.id_,
            "name": get_workflow_name(workflow),
            "created": workflow.created.strftime(WORKFLOW_TIME_FORMAT),
            "status": workflow.status.name,
            "progress": get_workflow_progress(workflow, include_progress=True),
            "user": user_uuid,
            "logs": json.dumps(job_logs),
        }
        return jsonify(status_report), 200
    except ValueError:
        # Presumably raised by the workflow lookup when nothing matches.
        not_found_message = (
            "REANA_WORKON is set to {0}, but "
            "that workflow does not exist. "
            "Please set your REANA_WORKON environment "
            "variable appropriately."
        ).format(workflow_id_or_name)
        return jsonify({"message": not_found_message}), 404
    except KeyError as error:
        return jsonify({"message": str(error)}), 400
    except Exception as error:
        return jsonify({"message": str(error)}), 500
@blueprint.route("/workflows/<workflow_id_or_name>/status", methods=["PUT"])
def set_workflow_status(workflow_id_or_name):  # noqa
    r"""Set workflow status.

    The workflow is looked up for the user given in the ``user`` query
    argument and the action named by the ``status`` query argument
    (``start``, ``stop`` or ``deleted``) is applied to it. An optional JSON
    object in the request body carries extra parameters: it is forwarded to
    ``start_workflow`` on start, and its ``all_runs`` / ``workspace`` keys
    are read on delete. A request without a JSON body is treated as having
    no parameters; a JSON body that is not an object is rejected with 400.

    ---
    put:
      summary: Set workflow status.
      description: >-
        This resource sets the status of workflow.
      operationId: set_workflow_status
      produces:
        - application/json
      parameters:
        - name: user
          in: query
          description: Required. UUID of workflow owner.
          required: true
          type: string
        - name: workflow_id_or_name
          in: path
          description: Required. Workflow UUID or name.
          required: true
          type: string
        - name: status
          in: query
          description: Required. New status.
          required: true
          type: string
          enum:
            - start
            - stop
            - deleted
        - name: parameters
          in: body
          description: >-
            Optional. Additional input parameters and operational options for
            workflow execution. Possible parameters are `CACHE=on/off`, passed
            to disable caching of results in serial workflows,
            `all_runs=True/False` deletes all runs of a given workflow
            if status is set to deleted and `workspace=True/False` which deletes
            the workspace of a workflow.
          required: false
          schema:
            type: object
            properties:
              CACHE:
                type: string
              all_runs:
                type: boolean
              workspace:
                type: boolean
      responses:
        200:
          description: >-
            Request succeeded. Info about workflow, including the status is
            returned.
          schema:
            type: object
            properties:
              message:
                type: string
              workflow_id:
                type: string
              workflow_name:
                type: string
              status:
                type: string
              user:
                type: string
          examples:
            application/json:
              {
                "message": "Workflow successfully launched",
                "workflow_id": "256b25f4-4cfb-4684-b7a8-73872ef455a1",
                "workflow_name": "mytest-1",
                "status": "running",
                "user": "00000000-0000-0000-0000-000000000000"
              }
        400:
          description: >-
            Request failed. The incoming data specification seems malformed.
          examples:
            application/json:
              {
                "message": "Malformed request."
              }
        403:
          description: >-
            Request failed. User is not allowed to access workflow.
          examples:
            application/json:
              {
                "message": "User 00000000-0000-0000-0000-000000000000
                            is not allowed to access workflow
                            256b25f4-4cfb-4684-b7a8-73872ef455a1"
              }
        404:
          description: >-
            Request failed. Either User or Workflow does not exist.
          examples:
            application/json:
              {
                "message": "User 00000000-0000-0000-0000-000000000000 does not
                            exist"
              }
            application/json:
              {
                "message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1
                            does not exist"
              }
        409:
          description: >-
            Request failed. The workflow could not be started due to a
            conflict.
          examples:
            application/json:
              {
                "message": "Workflow 256b25f4-4cfb-4684-b7a8-73872ef455a1
                            could not be started because it is already
                            running."
              }
        500:
          description: >-
            Request failed. Internal controller error.
        501:
          description: >-
            Request failed. The specified status change is not implemented.
          examples:
            application/json:
              {
                "message": "Status resume is not supported yet."
              }
        502:
          description: >-
            Request failed. Connection to a third party system has failed.
          examples:
            application/json:
              {
                "message": "Connection to database timed out, please retry."
              }
    """
    try:
        # A missing ``user`` argument raises KeyError, reported as 400 below.
        user_uuid = request.args["user"]
        workflow = _get_workflow_with_uuid_or_name(workflow_id_or_name, user_uuid)

        status = request.args.get("status")
        if status not in STATUSES:
            # Sort the allowed values so the message is the same on every run
            # (iteration order of a set of strings is not stable).
            allowed_statuses = ", ".join(sorted(STATUSES))
            return (
                jsonify(
                    {
                        "message": "Status {0} is not one of: {1}".format(
                            status, allowed_statuses
                        )
                    }
                ),
                400,
            )

        # Optional JSON body; an absent body or a JSON ``null`` means
        # "no parameters".
        parameters = {}
        if request.is_json:
            parameters = request.json or {}
        if not isinstance(parameters, dict):
            # The body is documented as an object; anything else used to
            # surface as an internal error further down.
            return (
                jsonify({"message": "Request body must be a JSON object."}),
                400,
            )

        if status == START:
            start_workflow(workflow, parameters)
            return (
                jsonify(
                    {
                        "message": "Workflow successfully launched",
                        "workflow_id": str(workflow.id_),
                        "workflow_name": get_workflow_name(workflow),
                        "status": workflow.status.name,
                        "user": str(workflow.owner_id),
                    }
                ),
                200,
            )

        if status == DELETED:
            # Read the options from ``parameters`` rather than from
            # ``request.json`` so that a delete request without a JSON body
            # falls back to the defaults instead of failing.
            all_runs = bool(parameters.get("all_runs"))
            workspace = bool(parameters.get("workspace", True))
            if not workspace:
                return (
                    jsonify(
                        {
                            "message": "Workspace must always be deleted when deleting a workflow.",
                        }
                    ),
                    400,
                )
            return delete_workflow(workflow, all_runs, workspace)

        if status == STOP:
            stop_workflow(workflow)
            return (
                jsonify(
                    {
                        "message": "Workflow successfully stopped",
                        "workflow_id": str(workflow.id_),
                        "workflow_name": get_workflow_name(workflow),
                        "status": workflow.status.name,
                        "user": str(workflow.owner_id),
                    }
                ),
                200,
            )

        # Defensive guard: only reachable if a value is added to STATUSES
        # without a matching branch above.
        raise NotImplementedError("Status {} is not supported yet".format(status))
    except ValueError:
        # Presumably raised by the workflow lookup when nothing matches.
        return (
            jsonify(
                {
                    "message": "REANA_WORKON is set to {0}, but "
                    "that workflow does not exist. "
                    "Please set your REANA_WORKON environment "
                    "variable appropriately.".format(workflow_id_or_name)
                }
            ),
            404,
        )
    except REANAWorkflowControllerError as e:
        return jsonify({"message": str(e)}), 409
    except REANAWorkflowStatusError as e:
        return jsonify({"message": str(e)}), 404
    except (REANASecretDoesNotExist, KeyError) as e:
        return jsonify({"message": str(e)}), 400
    except NotImplementedError as e:
        return jsonify({"message": str(e)}), 501
    except REANAExternalCallError as e:
        return jsonify({"message": str(e)}), 502
    except Exception as e:
        return jsonify({"message": str(e)}), 500