diff --git a/python/fate_flow/scheduler/federated_scheduler.py b/python/fate_flow/scheduler/federated_scheduler.py index ca183bee7..8174b1643 100644 --- a/python/fate_flow/scheduler/federated_scheduler.py +++ b/python/fate_flow/scheduler/federated_scheduler.py @@ -56,7 +56,17 @@ def resource_for_job(cls, job, operation_type: ResourceOperation, specific_dest= else: schedule_logger(job.f_job_id).info(f"{operation_type} job resource failed") return status_code, response - + + @classmethod + def resource_for_task(cls, job, task, operation_type: ResourceOperation, specific_dest=None): + schedule_logger(task.f_job_id).info("task {} {} {} resource".format(task.f_task_id,task.f_task_version,operation_type )) + status_code, response = cls.task_command(job=job,task=task, command=f"resource/{operation_type.value}",specific_dest=None) + if status_code == FederatedSchedulingStatusCode.SUCCESS: + schedule_logger(task.f_job_id).info(f"task {task.f_task_id} {task.f_task_version} {operation_type} resource successfully") + else: + schedule_logger(task.f_job_id).info(f"task {task.f_task_id} {task.f_task_version} {operation_type} resource failed") + return status_code, response + @classmethod def check_component(cls, job, check_type, specific_dest=None): schedule_logger(job.f_job_id).info(f"try to check component inheritance dependence") @@ -234,13 +244,26 @@ def clean_task(cls, job, task, content_type: TaskCleanResourceType): return status_code, response @classmethod - def task_command(cls, job: Job, task: Task, command, command_body=None, parallel=False, need_user=False): + def task_command(cls, job: Job, task: Task, command, command_body=None, parallel=False, need_user=False,specific_dest=None): msg = f"execute federated task {task.f_component_name} command({command})" federated_response = {} + if specific_dest: + dest = specific_dest.items() + else: + dest = job.f_roles.items() + job_parameters = job.f_runtime_conf_on_party["job_parameters"] tasks = 
JobSaver.query_task(task_id=task.f_task_id, only_latest=True) threads = [] + + targets = {} + for dest_role, dest_party_ids in dest: + for dest_party_id in dest_party_ids: + targets[str(dest_party_id)] = dest_role + for task in tasks: + if task.f_party_id not in targets: + continue dest_role, dest_party_id = task.f_role, task.f_party_id federated_response[dest_role] = federated_response.get(dest_role, {}) endpoint = f"/party/{task.f_job_id}/{task.f_component_name}/{task.f_task_id}/{task.f_task_version}/{dest_role}/{dest_party_id}/{command}" diff --git a/python/fate_flow/scheduler/task_scheduler.py b/python/fate_flow/scheduler/task_scheduler.py index 2b68242b6..43126d6f3 100644 --- a/python/fate_flow/scheduler/task_scheduler.py +++ b/python/fate_flow/scheduler/task_scheduler.py @@ -22,6 +22,7 @@ from fate_flow.utils import job_utils from fate_flow.scheduler.federated_scheduler import FederatedScheduler from fate_flow.operation.job_saver import JobSaver +from fate_flow.entity.types import ResourceOperation from fate_flow.utils.log_utils import schedule_logger from fate_flow.manager.resource_manager import ResourceManager from fate_flow.controller.job_controller import JobController @@ -103,9 +104,39 @@ def schedule(cls, job, dsl_parser, canceled=False): @classmethod def start_task(cls, job, task): schedule_logger(task.f_job_id).info("try to start task {} {} on {} {}".format(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id)) - apply_status = ResourceManager.apply_for_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"])) - if not apply_status: + + apply_status_code, federated_response = FederatedScheduler.resource_for_task(job=job,task=task, operation_type=ResourceOperation.APPLY) + if apply_status_code != FederatedSchedulingStatusCode.SUCCESS: + # rollback resource + job_id = job.f_job_id + rollback_party = {} + failed_party = {} + for dest_role in federated_response.keys(): + for dest_party_id in 
federated_response[dest_role].keys(): + retcode = federated_response[dest_role][dest_party_id]["retcode"] + if retcode == 0: + rollback_party[dest_role] = rollback_party.get(dest_role, []) + rollback_party[dest_role].append(dest_party_id) + else: + failed_party[dest_role] = failed_party.get(dest_role, []) + failed_party[dest_role].append(dest_party_id) + schedule_logger(job_id).info("task {} {} apply resource failed on {}, rollback {}".format( + task.f_task_id, + task.f_task_version, + ",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in failed_party.items()]), + ",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in rollback_party.items()]), + )) + if rollback_party: + return_status_code, federated_response = FederatedScheduler.resource_for_task(job=job,task=task, operation_type=ResourceOperation.RETURN, specific_dest=rollback_party) + if return_status_code != FederatedSchedulingStatusCode.SUCCESS: + schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} return resource failed:\n{federated_response}") + else: + schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} no party should be rollback resource") + if apply_status_code == FederatedSchedulingStatusCode.ERROR: + FederatedScheduler.stop_task(job_id=job_id,task=task, stop_status=TaskStatus.FAILED) + schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} apply resource error, stop job") return SchedulingStatusCode.NO_RESOURCE + task.f_status = TaskStatus.RUNNING update_status = JobSaver.update_task_status(task_info=task.to_human_model_dict(only_primary_with=["status"])) if not update_status: @@ -113,7 +144,10 @@ def start_task(cls, job, task): schedule_logger(task.f_job_id).info("task {} {} start on another scheduler".format(task.f_task_id, task.f_task_version)) # Rollback task.f_status = TaskStatus.WAITING - ResourceManager.return_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"])) + return_status_code, 
def _task_resource_info(job_id, task_id, task_version, role, party_id):
    # Build the task_info dict shared by the apply/return resource endpoints.
    return {
        "job_id": job_id,
        "task_id": task_id,
        "task_version": task_version,
        "role": role,
        "party_id": party_id,
    }


# NOTE(review): the route placeholders were garbled in this paste
# ('///////resource/apply'); reconstructed from the endpoint format used by
# FederatedScheduler.task_command:
# /party/{job_id}/{component_name}/{task_id}/{task_version}/{role}/{party_id}/resource/apply
@manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/resource/apply',
               methods=['POST'])
def apply_task_resource(job_id, component_name, task_id, task_version, role, party_id):
    """Apply resource for one task on this party; component_name is part of the
    federated URL scheme but unused by the resource manager."""
    status = ResourceManager.apply_for_task_resource(
        _task_resource_info(job_id, task_id, task_version, role, party_id))
    if status:
        return get_json_result(retcode=0, retmsg='success')
    return get_json_result(retcode=RetCode.OPERATING_ERROR,
                           retmsg=f"apply for task {task_id} resource failed")


@manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/resource/return',
               methods=['POST'])
def return_task_resource(job_id, component_name, task_id, task_version, role, party_id):
    """Return (release) resource for one task on this party."""
    status = ResourceManager.return_task_resource(
        _task_resource_info(job_id, task_id, task_version, role, party_id))
    if status:
        return get_json_result(retcode=0, retmsg='success')
    return get_json_result(retcode=RetCode.OPERATING_ERROR,
                           retmsg=f"return task {task_id} resource failed")