malib.agent package

Submodules

malib.agent.agent_interface module

class malib.agent.agent_interface.AgentInterface(experiment_tag: str, runtime_id: str, log_dir: str, env_desc: Dict[str, Any], algorithms: Dict[str, Tuple[Type, Type, Dict, Dict]], agent_mapping_func: Callable[[str], str], governed_agents: Tuple[str], trainer_config: Dict[str, Any], custom_config: Optional[Dict[str, Any]] = None, local_buffer_config: Optional[Dict] = None, verbose: bool = True)[source]

Bases: RemoteInterface, ABC

Base class of agent interface, for training

Construct agent interface for training.

Parameters:
  • experiment_tag (str) – Experiment tag.

  • runtime_id (str) – Assigned runtime id, should be an element of the agent mapping results.

  • log_dir (str) – The directory for logging.

  • env_desc (Dict[str, Any]) – A dict that describes the environment property.

  • algorithms (Dict[str, Tuple[Type, Type, Dict]]) – A dict that describes the algorithm candidates. Each is a tuple of policy_cls, trainer_cls, model_config and custom_config.

  • agent_mapping_func (Callable[[AgentID], str]) – A function that defines the rule of agent groupping.

  • governed_agents (Tuple[AgentID]) – A tuple that records which agents is related to this training procedures. Note that it should be a subset of the original set of environment agents.

  • trainer_config (Dict[str, Any]) – Trainer configuration.

  • custom_config (Dict[str, Any], optional) – A dict of custom configuration. Defaults to None.

  • local_buffer_config (Dict, optional) – A dict for local buffer configuration. Defaults to None.

  • verbose (bool, True) – Enable logging or not. Defaults to True.

add_policies(n: int) StrategySpec[source]

Construct n new policies and return the latest strategy spec.

Parameters:

n (int) – Indicates how many new policies will be added.

Returns:

The latest strategy spec instance.

Return type:

StrategySpec

connect(max_tries: int = 10, dataset_server_ref: Optional[str] = None, parameter_server_ref: Optional[str] = None)[source]

Try to connect with backend, i.e., parameter server and offline dataset server. If the reference of dataset server or parameter server is not been given, then the agent will use default settings.

Parameters:
  • max_tries (int, optional) – Maximum of trails. Defaults to 10.

  • dataset_server_ref (str, optional) – Name of ray-based dataset server. Defaults to None.

  • parameter_server_ref (str, optional) – Name of ray-based parameter server. Defaults to None.

property device: Union[str, DeviceObjType]

Retrive device name.

Returns:

Device name.

Return type:

Union[str, torch.DeviceObjType]

get_algorithm(key: str) Any[source]

Return a copy of algorithm configuration with given key, if not exist, raise KeyError.

Parameters:

key (str) – Algorithm configuration reference key.

Raises:

KeyError – No such an algorithm configuration relates to the give key.

Returns:

Algorithm configuration, mabe a dict.

Return type:

Any

get_algorthms() Dict[str, Any][source]

Return a copy of full algorithm configurations.

Returns:

Full algorithm configurations.

Return type:

Dict[str, Any]

get_interface_state() Dict[str, Any][source]

Return a dict that describes the current learning state.

Returns:

A dict of learning state.

Return type:

Dict[str, Any]

property governed_agents: Tuple[str]

Return a tuple of governed environment agents.

Returns:

A tuple of agent ids.

Return type:

Tuple[str]

abstract multiagent_post_process(batch_info: Union[Dict[str, Tuple[Batch, List[int]]], Tuple[Batch, List[int]]]) Dict[str, Any][source]

Merge agent buffer here and return the merged buffer.

Parameters:

batch_info (Union[Dict[AgentID, Tuple[Batch, List[int]]], Tuple[Batch, List[int]]]) – Batch info, could be a dict of agent batch info or a tuple.

Returns:

A merged buffer dict.

Return type:

Dict[str, Any]

pull()[source]

Pull remote weights to update local version.

push()[source]

Push local weights to remote server

reset()[source]

Reset training state.

sync_remote_parameters()[source]

Push latest network parameters of active policies to remote parameter server.

train(data_request_identifier: str, reset_state: bool = True) Dict[str, Any][source]

Executes training task and returns the final interface state.

Parameters:
  • stopping_conditions (Dict[str, Any]) – Control the training stepping.

  • reset_tate (bool, optional) – Reset interface state or not. Default is True.

Returns:

A dict that describes the final state.

Return type:

Dict[str, Any]

malib.agent.async_agent module

class malib.agent.async_agent.AsyncAgent(experiment_tag: str, runtime_id: str, log_dir: str, env_desc: Dict[str, Any], algorithms: Dict[str, Tuple[Type, Type, Dict, Dict]], agent_mapping_func: Callable[[str], str], governed_agents: Tuple[str], trainer_config: Dict[str, Any], custom_config: Optional[Dict[str, Any]] = None, local_buffer_config: Optional[Dict] = None, verbose: bool = True)[source]

Bases: AgentInterface

Construct agent interface for training.

Parameters:
  • experiment_tag (str) – Experiment tag.

  • runtime_id (str) – Assigned runtime id, should be an element of the agent mapping results.

  • log_dir (str) – The directory for logging.

  • env_desc (Dict[str, Any]) – A dict that describes the environment property.

  • algorithms (Dict[str, Tuple[Type, Type, Dict]]) – A dict that describes the algorithm candidates. Each is a tuple of policy_cls, trainer_cls, model_config and custom_config.

  • agent_mapping_func (Callable[[AgentID], str]) – A function that defines the rule of agent groupping.

  • governed_agents (Tuple[AgentID]) – A tuple that records which agents is related to this training procedures. Note that it should be a subset of the original set of environment agents.

  • trainer_config (Dict[str, Any]) – Trainer configuration.

  • custom_config (Dict[str, Any], optional) – A dict of custom configuration. Defaults to None.

  • local_buffer_config (Dict, optional) – A dict for local buffer configuration. Defaults to None.

  • verbose (bool, True) – Enable logging or not. Defaults to True.

malib.agent.indepdent_agent module

class malib.agent.indepdent_agent.IndependentAgent(experiment_tag: str, runtime_id: str, log_dir: str, env_desc: Dict[str, Any], algorithms: Dict[str, Tuple[Dict, Dict, Dict]], agent_mapping_func: Callable[[str], str], governed_agents: Tuple[str], trainer_config: Dict[str, Any], custom_config: Optional[Dict[str, Any]] = None, local_buffer_config: Optional[Dict] = None, verbose: bool = True)[source]

Bases: AgentInterface

Construct agent interface for training.

Parameters:
  • experiment_tag (str) – Experiment tag.

  • runtime_id (str) – Assigned runtime id, should be an element of the agent mapping results.

  • log_dir (str) – The directory for logging.

  • env_desc (Dict[str, Any]) – A dict that describes the environment property.

  • algorithms (Dict[str, Tuple[Type, Type, Dict]]) – A dict that describes the algorithm candidates. Each is a tuple of policy_cls, trainer_cls, model_config and custom_config.

  • agent_mapping_func (Callable[[AgentID], str]) – A function that defines the rule of agent groupping.

  • governed_agents (Tuple[AgentID]) – A tuple that records which agents is related to this training procedures. Note that it should be a subset of the original set of environment agents.

  • trainer_config (Dict[str, Any]) – Trainer configuration.

  • custom_config (Dict[str, Any], optional) – A dict of custom configuration. Defaults to None.

  • local_buffer_config (Dict, optional) – A dict for local buffer configuration. Defaults to None.

  • verbose (bool, True) – Enable logging or not. Defaults to True.

multiagent_post_process(batch_info: Union[Dict[str, Tuple[Batch, List[int]]], Tuple[Batch, List[int]]]) Dict[str, Any][source]

Merge agent buffer here and return the merged buffer.

Parameters:

batch_info (Union[Dict[AgentID, Tuple[Batch, List[int]]], Tuple[Batch, List[int]]]) – Batch info, could be a dict of agent batch info or a tuple.

Returns:

A merged buffer dict.

Return type:

Dict[str, Any]

malib.agent.manager module

class malib.agent.manager.TrainingManager(experiment_tag: str, stopping_conditions: Dict[str, Any], algorithms: Dict[str, Any], env_desc: Dict[str, Any], agent_mapping_func: Callable[[str], str], training_config: Dict[str, Any], log_dir: str, remote_mode: bool = True, resource_config: Optional[Dict[str, Any]] = None, verbose: bool = True)[source]

Bases: Manager

Create an TrainingManager instance which is responsible for the multi agent training tasks execution and rollout task requests sending.

Parameters:
  • experiment_tag (str) – Experiment identifier, for data tracking.

  • algorithms (Dict[str, Any]) – The algorithms configuration candidates.

  • env_desc (Dict[str, Any]) – The description for environment generation.

  • interface_config (Dict[str, Any]) – Configuration for agent training inferece construction, keys include type and custom_config, a dict.

  • agent_mapping_func (Callable[[AgentID], str]) – The mapping function maps agent id to training interface id.

  • training_config (Dict[str, Any]) – Training configuration, for agent interface, keys include type, trainer_config and custom_config.

  • log_dir (str) – Directory for logging.

  • remote_mode (bool, Optional) – Init agent interfaces as remote actor or not. Default is True.

add_policies(interface_ids: Optional[Sequence[str]] = None, n: Union[int, Dict[str, int]] = 1) Dict[str, Type[StrategySpec]][source]

Notify interface interface_id add n policies and return the newest strategy spec.

Parameters:
  • interface_ids (Sequence[str]) – Registered agent interface id.

  • n (int, optional) – Indicates how many policies will be added.

Returns:

A dict of strategy specs, maps from runtime ids to strategy specs.

Return type:

Dict[str, Type[StrategySpec]]

property agent_groups: Dict[str, Set[str]]

A dict describes the agent grouping, maps from runtime ids to agent sets.

Returns:

A dict of agent set.

Return type:

Dict[str, Set[AgentID]]

get_exp(policy_distribution)[source]

Compute exploitability

retrive_results()[source]
run(data_request_identifiers: Dict[str, str])[source]

Start training thread without blocking

property runtime_ids: Tuple[str]
terminate() None[source]

Terminate all training actors.

property workers: List[RemoteInterface]

malib.agent.team_agent module

class malib.agent.team_agent.TeamAgent(experiment_tag: str, runtime_id: str, log_dir: str, env_desc: Dict[str, Any], algorithms: Dict[str, Tuple[Type, Type, Dict, Dict]], agent_mapping_func: Callable[[str], str], governed_agents: Tuple[str], trainer_config: Dict[str, Any], custom_config: Optional[Dict[str, Any]] = None, local_buffer_config: Optional[Dict] = None, verbose: bool = True)[source]

Bases: AgentInterface

Construct agent interface for training.

Parameters:
  • experiment_tag (str) – Experiment tag.

  • runtime_id (str) – Assigned runtime id, should be an element of the agent mapping results.

  • log_dir (str) – The directory for logging.

  • env_desc (Dict[str, Any]) – A dict that describes the environment property.

  • algorithms (Dict[str, Tuple[Type, Type, Dict]]) – A dict that describes the algorithm candidates. Each is a tuple of policy_cls, trainer_cls, model_config and custom_config.

  • agent_mapping_func (Callable[[AgentID], str]) – A function that defines the rule of agent groupping.

  • governed_agents (Tuple[AgentID]) – A tuple that records which agents is related to this training procedures. Note that it should be a subset of the original set of environment agents.

  • trainer_config (Dict[str, Any]) – Trainer configuration.

  • custom_config (Dict[str, Any], optional) – A dict of custom configuration. Defaults to None.

  • local_buffer_config (Dict, optional) – A dict for local buffer configuration. Defaults to None.

  • verbose (bool, True) – Enable logging or not. Defaults to True.

multiagent_post_process(batch_info: Union[Dict[str, Tuple[Batch, List[int]]], Tuple[Batch, List[int]]]) Dict[str, Batch][source]

Merge agent buffer here and return the merged buffer.

Parameters:

batch_info (Union[Dict[AgentID, Tuple[Batch, List[int]]], Tuple[Batch, List[int]]]) – Batch info, could be a dict of agent batch info or a tuple.

Returns:

A merged buffer dict.

Return type:

Dict[str, Any]