Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions python/fate_flow/scheduler/federated_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,17 @@ def resource_for_job(cls, job, operation_type: ResourceOperation, specific_dest=
else:
schedule_logger(job.f_job_id).info(f"{operation_type} job resource failed")
return status_code, response


@classmethod
def resource_for_task(cls, job, task, operation_type: ResourceOperation, specific_dest=None):
    """Send a task-level resource command to the parties running a task.

    Args:
        job: Job the task belongs to; forwarded unchanged to ``task_command``.
        task: Task whose resource is being operated on. Its ``f_job_id``,
            ``f_task_id`` and ``f_task_version`` are used for logging.
        operation_type: Resource operation to perform; ``operation_type.value``
            becomes the ``resource/<value>`` command suffix.
        specific_dest: Optional ``{role: [party_id, ...]}`` mapping limiting
            which parties receive the command. When falsy, ``task_command``
            decides the destination set itself.

    Returns:
        The ``(status_code, response)`` pair produced by ``task_command``.
    """
    logger = schedule_logger(task.f_job_id)
    task_tag = f"task {task.f_task_id} {task.f_task_version} {operation_type} resource"
    logger.info(task_tag)
    status_code, response = cls.task_command(
        job=job,
        task=task,
        command=f"resource/{operation_type.value}",
        # Forward the caller's destination filter. Hard-coding None here made
        # every call hit all parties, so a partial rollback would also
        # "return" resources on parties that never acquired them.
        specific_dest=specific_dest,
    )
    if status_code == FederatedSchedulingStatusCode.SUCCESS:
        logger.info(f"{task_tag} successfully")
    else:
        logger.info(f"{task_tag} failed")
    return status_code, response

@classmethod
def check_component(cls, job, check_type, specific_dest=None):
schedule_logger(job.f_job_id).info(f"try to check component inheritance dependence")
Expand Down Expand Up @@ -234,13 +244,26 @@ def clean_task(cls, job, task, content_type: TaskCleanResourceType):
return status_code, response

@classmethod
def task_command(cls, job: Job, task: Task, command, command_body=None, parallel=False, need_user=False):
def task_command(cls, job: Job, task: Task, command, command_body=None, parallel=False, need_user=False,specific_dest=None):
msg = f"execute federated task {task.f_component_name} command({command})"
federated_response = {}
if specific_dest:
dest = specific_dest.items()
else:
dest = job.f_roles.items()

job_parameters = job.f_runtime_conf_on_party["job_parameters"]
tasks = JobSaver.query_task(task_id=task.f_task_id, only_latest=True)
threads = []

targets = {}
for dest_role, dest_party_ids in dest:
for dest_party_id in dest_party_ids:
targets[str(dest_party_id)] = dest_role

for task in tasks:
if task.f_party_id not in targets:
continue
dest_role, dest_party_id = task.f_role, task.f_party_id
federated_response[dest_role] = federated_response.get(dest_role, {})
endpoint = f"/party/{task.f_job_id}/{task.f_component_name}/{task.f_task_id}/{task.f_task_version}/{dest_role}/{dest_party_id}/{command}"
Expand Down
41 changes: 38 additions & 3 deletions python/fate_flow/scheduler/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from fate_flow.utils import job_utils
from fate_flow.scheduler.federated_scheduler import FederatedScheduler
from fate_flow.operation.job_saver import JobSaver
from fate_flow.entity.types import ResourceOperation
from fate_flow.utils.log_utils import schedule_logger
from fate_flow.manager.resource_manager import ResourceManager
from fate_flow.controller.job_controller import JobController
Expand Down Expand Up @@ -103,17 +104,50 @@ def schedule(cls, job, dsl_parser, canceled=False):
@classmethod
def start_task(cls, job, task):
schedule_logger(task.f_job_id).info("try to start task {} {} on {} {}".format(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id))
apply_status = ResourceManager.apply_for_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"]))
if not apply_status:

apply_status_code, federated_response = FederatedScheduler.resource_for_task(job=job,task=task, operation_type=ResourceOperation.APPLY)
if apply_status_code != FederatedSchedulingStatusCode.SUCCESS:
# rollback resource
job_id = job.f_job_id
rollback_party = {}
failed_party = {}
for dest_role in federated_response.keys():
for dest_party_id in federated_response[dest_role].keys():
retcode = federated_response[dest_role][dest_party_id]["retcode"]
if retcode == 0:
rollback_party[dest_role] = rollback_party.get(dest_role, [])
rollback_party[dest_role].append(dest_party_id)
else:
failed_party[dest_role] = failed_party.get(dest_role, [])
failed_party[dest_role].append(dest_party_id)
schedule_logger(job_id).info("task {} {} apply resource failed on {}, rollback {}".format(
task.f_task_id,
task.f_task_version,
",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in failed_party.items()]),
",".join([",".join([f"{_r}:{_p}" for _p in _ps]) for _r, _ps in rollback_party.items()]),
))
if rollback_party:
return_status_code, federated_response = FederatedScheduler.resource_for_task(job=job,task=task, operation_type=ResourceOperation.RETURN, specific_dest=rollback_party)
if return_status_code != FederatedSchedulingStatusCode.SUCCESS:
schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} return resource failed:\n{federated_response}")
else:
schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} no party should be rollback resource")
if apply_status_code == FederatedSchedulingStatusCode.ERROR:
FederatedScheduler.stop_task(job_id=job_id,task=task, stop_status=TaskStatus.FAILED)
schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} apply resource error, stop job")
return SchedulingStatusCode.NO_RESOURCE

task.f_status = TaskStatus.RUNNING
update_status = JobSaver.update_task_status(task_info=task.to_human_model_dict(only_primary_with=["status"]))
if not update_status:
# Another scheduler scheduling the task
schedule_logger(task.f_job_id).info("task {} {} start on another scheduler".format(task.f_task_id, task.f_task_version))
# Rollback
task.f_status = TaskStatus.WAITING
ResourceManager.return_task_resource(task_info=task.to_human_model_dict(only_primary_with=["status"]))
return_status_code, federated_response = FederatedScheduler.resource_for_task(job=job,task=task, operation_type=ResourceOperation.RETURN)
if return_status_code != FederatedSchedulingStatusCode.SUCCESS:
schedule_logger(job_id).info(f"task {task.f_task_id} {task.f_task_version} return resource failed:\n{federated_response}")
return FederatedSchedulingStatusCode.ERROR
return SchedulingStatusCode.PASS
schedule_logger(task.f_job_id).info("start task {} {} on {} {}".format(task.f_task_id, task.f_task_version, task.f_role, task.f_party_id))
FederatedScheduler.sync_task_status(job=job, task=task)
Expand All @@ -122,6 +156,7 @@ def start_task(cls, job, task):
return SchedulingStatusCode.SUCCESS
else:
return SchedulingStatusCode.FAILED


@classmethod
def prepare_rerun_task(cls, job: Job, task: Task, dsl_parser, auto=False, force=False):
Expand Down
33 changes: 33 additions & 0 deletions python/fate_flow/scheduling_apps/party_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,36 @@ def clean_task(job_id, component_name, task_id, task_version, role, party_id, co
return get_json_result(retcode=0, retmsg='success')



@manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/resource/apply', methods=['POST'])
def apply_task_resource(job_id, component_name, task_id, task_version, role, party_id):
    """Handle a POST asking this party to apply for a task's resource.

    The task is identified by the URL path values, which are passed to
    ``ResourceManager.apply_for_task_resource`` as a dict. ``component_name``
    is part of the route but is not forwarded.

    Returns:
        A JSON result with ``retcode=0`` when the resource manager reports a
        truthy status, otherwise one with ``RetCode.OPERATING_ERROR``.
    """
    task_info = dict(
        job_id=job_id,
        task_id=task_id,
        task_version=task_version,
        role=role,
        party_id=party_id,
    )
    # Guard clause: report the failure first, fall through to success.
    if not ResourceManager.apply_for_task_resource(task_info):
        return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=f"apply for task {task_id} resource failed")
    return get_json_result(retcode=0, retmsg='success')


@manager.route('/<job_id>/<component_name>/<task_id>/<task_version>/<role>/<party_id>/resource/return', methods=['POST'])
def return_task_resource(job_id, component_name, task_id, task_version, role, party_id):
    """Handle a POST asking this party to return a task's resource.

    The task is identified by the URL path values, which are passed to
    ``ResourceManager.return_task_resource`` as a dict. ``component_name``
    is part of the route but is not forwarded.

    Returns:
        A JSON result with ``retcode=0`` when the resource manager reports a
        truthy status, otherwise one with ``RetCode.OPERATING_ERROR``.
    """
    task_info = dict(
        job_id=job_id,
        task_id=task_id,
        task_version=task_version,
        role=role,
        party_id=party_id,
    )
    # Guard clause: report the failure first, fall through to success.
    if not ResourceManager.return_task_resource(task_info):
        return get_json_result(retcode=RetCode.OPERATING_ERROR, retmsg=f"return task {task_id} resource failed")
    return get_json_result(retcode=0, retmsg='success')