# MIT License
# Copyright (c) 2021 MARL @ SJTU
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import traceback
from typing import List, Generator, Any
from abc import abstractmethod, ABC
import ray
from malib.utils.logging import Logger
from malib.remote.interface import RemoteInterface
class Manager(ABC):
    """Abstract base class for managers that coordinate a set of remote workers.

    Subclasses must implement ``__init__`` and ``terminate`` (both are declared
    abstract), and are expected to override ``workers`` and
    ``retrive_results``, whose base implementations raise
    ``NotImplementedError``.
    """

    @abstractmethod
    def __init__(self, verbose: bool):
        """Initialise the bookkeeping state shared by all managers.

        Args:
            verbose (bool): Verbosity flag, stored as ``self.verbose``.
        """
        # Flag flipped by `force_stop`; this class only stores it.
        self._force_stop = False
        # Tasks that are still outstanding; emptiness drives `is_running`.
        self.pending_tasks = []
        self.verbose = verbose

    def is_running(self):
        """Tell whether there is at least one pending task.

        Returns:
            bool: True if ``pending_tasks`` is non-empty, False otherwise.
        """
        return len(self.pending_tasks) > 0

    def force_stop(self):
        """Set the internal force-stop flag to True."""
        self._force_stop = True

    @property
    def workers(self) -> List["RemoteInterface"]:
        """Workers handled by this manager.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def retrive_results(self):
        """Produce the results of the executed tasks.

        ``wait`` iterates over the returned value, so an override has to
        return an iterable (e.g. a generator).

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def wait(self) -> List[Any]:
        """Wait workers to be terminated, and retrieve the executed results.

        Returns:
            List[Any]: A list of results.
        """
        # Exhaust the iterable returned by `retrive_results` into a list.
        return list(self.retrive_results())

    def cancel_pending_tasks(self):
        """Cancel all running tasks.

        Calls ``stop_pending_tasks`` on every worker, then collects whatever
        results are available through ``wait``. Any ``Exception`` raised along
        the way is logged as a warning instead of being propagated.

        Returns:
            The list returned by ``wait``, or None if an exception was caught.
        """
        rets = None

        try:
            ray.get([w.stop_pending_tasks.remote() for w in self.workers])
            rets = self.wait()
        except Exception as e:
            Logger.warning(
                "tray to cancel pending tasks, but met some exception: {}".format(e)
            )
        finally:
            # Always forget the pending tasks, whether or not the stop succeeded.
            self.pending_tasks = []

        # Returned after (not inside) the `finally` clause so that a
        # BaseException such as KeyboardInterrupt is never swallowed.
        return rets

    @abstractmethod
    def terminate(self):
        """Resource recall.

        Abstract, but carries a default body that cancels the pending tasks;
        an override can reuse it through ``super().terminate()``.
        """
        self.cancel_pending_tasks()