Source code for malib.algorithm.common.misc

import torch
import numpy as np
import torch.nn.functional as F

from torch.autograd import Variable

from malib.utils.typing import Dict, List, DataTransferType, Any


def soft_update(target, source, tau):
    """Blend the parameters of ``source`` into ``target`` (DDPG soft update).

    Each target parameter becomes ``(1 - tau) * target + tau * source``.

    Reference:
    https://github.com/ikostrikov/pytorch-ddpg-naf/blob/master/ddpg.py#L11

    :param torch.nn.Module target: Net to copy parameters to
    :param torch.nn.Module source: Net whose parameters to copy
    :param float tau: Range from 0 to 1, weight factor for update
    """
    keep = 1.0 - tau
    for dst, src in zip(target.parameters(), source.parameters()):
        # Work on ``.data`` so the update is not recorded by autograd.
        blended = dst.data * keep + src.data * tau
        dst.data.copy_(blended)
def hard_update(target, source):
    """Overwrite every parameter of ``target`` with the matching one in ``source``.

    Reference:
    https://github.com/ikostrikov/pytorch-ddpg-naf/blob/master/ddpg.py#L15

    :param torch.nn.Module target: Net to copy parameters to.
    :param torch.nn.Module source: Net whose parameters to copy
    """
    param_pairs = zip(target.parameters(), source.parameters())
    for dst, src in param_pairs:
        # In-place copy keeps the target's own storage; values are duplicated.
        dst.data.copy_(src.data)
def onehot_from_logits(logits, eps=0.0):
    """Return one-hot actions chosen epsilon-greedily from a batch of logits.

    With ``eps == 0.0`` the greedy (argmax) action of every row is returned.
    Otherwise each row independently keeps its greedy action when a uniform
    draw exceeds ``eps`` and falls back to a uniformly random action if not.

    :param torch.Tensor logits: Scores whose last dimension indexes actions.
        The epsilon-greedy path (``eps != 0``) expects a 2-D
        ``[batch_size, n_actions]`` tensor.
    :param float eps: Probability of replacing the greedy action by a random one.
    :return: Float tensor shaped like ``logits`` with exactly one ``1.0`` per row.
    """
    num_actions = logits.shape[-1]

    # ``argmax`` selects a single index even when several entries share the
    # maximum, so every row is strictly one-hot (an equality test against the
    # row maximum would mark all tied entries).
    greedy_acs = F.one_hot(logits.argmax(dim=-1), num_classes=num_actions).float()
    if eps == 0.0:
        return greedy_acs

    batch_size = logits.shape[0]

    # Random actions are still drawn from NumPy's global RNG (same draw order
    # as before), then moved to the logits' device so both candidates can be
    # mixed without a device mismatch.
    rand_idx = torch.as_tensor(
        np.random.choice(num_actions, size=batch_size),
        dtype=torch.long,
        device=logits.device,
    )
    rand_acs = F.one_hot(rand_idx, num_classes=num_actions).float()

    # One uniform draw per row decides between the greedy and random action.
    keep_greedy = (torch.rand(batch_size) > eps).to(logits.device)
    return torch.where(keep_greedy.unsqueeze(-1), greedy_acs, rand_acs)
def sample_gumbel(shape, eps=1e-20, tens_type=torch.FloatTensor):
    """Draw Gumbel(0, 1) noise of the requested shape.

    Note: modified for PyTorch from
    https://github.com/ericjang/gumbel-softmax/blob/master/Categorical%20VAE.ipynb

    :param shape: Iterable of dimension sizes, unpacked into ``tens_type``.
    :param float eps: Small constant added inside both logarithms.
    :param tens_type: Tensor constructor used to allocate the uniform sample.
    :return: Tensor of Gumbel noise that does not require grad.
    """
    uniform = tens_type(*shape).uniform_()
    # Inverse-CDF transform: -log(-log(U)), with eps guarding both logs.
    neg_log_u = -torch.log(uniform + eps)
    return -torch.log(neg_log_u + eps)
def gumbel_softmax_sample(logits, temperature, explore: bool = True):
    """Draw a sample from the Gumbel-Softmax distribution.

    Note: modified for PyTorch from
    https://github.com/ericjang/gumbel-softmax/blob/master/Categorical%20VAE.ipynb

    The input ``logits`` tensor is never modified.

    :param torch.Tensor logits: Unnormalized log-probs; the softmax is taken
        over the last dimension.
    :param float temperature: Divisor applied before the softmax.
    :param bool explore: If True, perturb the logits with Gumbel(0, 1) noise;
        otherwise return the plain tempered softmax.
    :return: Tensor shaped like ``logits`` whose last dimension sums to 1.
    """
    if not explore:
        return F.softmax(logits / temperature, dim=-1)

    # Same transform as ``sample_gumbel`` (-log(-log(U + eps) + eps)), but the
    # uniform sample is drawn with ``rand_like`` so the noise always lives on
    # the logits' device with the logits' dtype.
    eps = 1e-20
    uniform = torch.rand_like(logits)
    gumbel_noise = -torch.log(-torch.log(uniform + eps) + eps)

    # Out-of-place add: an in-place ``+=`` would overwrite the caller's tensor
    # and is rejected by autograd for leaf tensors that require grad.
    perturbed = logits + gumbel_noise
    return F.softmax(perturbed / temperature, dim=-1)
def gumbel_softmax(logits: DataTransferType, temperature=1.0, hard=False, explore=True):
    """Sample from the Gumbel-Softmax distribution and optionally discretize.

    Note: modified for PyTorch from
    https://github.com/ericjang/gumbel-softmax/blob/master/Categorical%20VAE.ipynb

    :param DataTransferType logits: Unnormalized log-probs.
    :param float temperature: Non-negative scalar.
    :param bool hard: If true, take the argmax but differentiate w.r.t. the
        soft sample.
    :param bool explore: Forwarded to ``gumbel_softmax_sample``.
    :returns: [batch_size, n_class] sample from the Gumbel-Softmax
        distribution. If hard=True, the returned sample will be one-hot,
        otherwise it will be a probability distribution that sums to 1
        across classes.
    """
    soft_sample = gumbel_softmax_sample(logits, temperature, explore)
    if not hard:
        return soft_sample

    discrete = onehot_from_logits(soft_sample)
    # Straight-through trick: the forward value equals ``discrete`` while the
    # gradient flows only through ``soft_sample`` (the difference is detached).
    return (discrete - soft_sample).detach() + soft_sample
def masked_softmax(logits: torch.Tensor, mask: torch.Tensor):
    """Softmax over the last dimension restricted to entries allowed by ``mask``.

    The result is ``softmax(logits) * mask`` renormalized to sum to 1 along
    the last dimension. Rows whose mask sums to zero fall back to a uniform
    distribution.

    Entries with ``mask == 0`` are removed *before* the softmax. Masking only
    afterwards gives the same value algebraically, but produces ``0 / 0 = NaN``
    when every allowed entry underflows to zero next to a much larger masked
    logit.

    :param torch.Tensor logits: Unnormalized scores.
    :param torch.Tensor mask: Tensor broadcastable against ``logits``; zero
        marks a disallowed entry, non-zero values act as weights.
    :return: Tensor of probabilities summing to 1 over the last dimension.
    """
    # Rows without any allowed entry keep their logits untouched so the
    # uniform fallback below behaves exactly as before.
    no_legal = mask.sum(dim=-1, keepdim=True) == 0.0
    illegal = (mask == 0) & ~no_legal

    neg_inf = torch.full_like(logits, float("-inf"))
    restricted = torch.where(illegal, neg_inf, logits)

    probs = F.softmax(restricted, dim=-1) * mask
    # Adding 1 everywhere on fully-masked rows yields a uniform distribution
    # after normalization.
    probs = probs + no_legal.to(dtype=torch.float32)
    Z = probs.sum(dim=-1, keepdim=True)
    return probs / Z
# def non_centered_rmsprop(
#     gradient: Union[torch.Tensor, DataTransferType],
#     delta: Union[torch.Tensor, DataTransferType],
#     alpha: float,
#     eta: float,
#     eps: float,
# ):
#     """Implementation of non-centered RMSProp algorithm (# TODO(ming): add reference here)
#     :param gradient: Union[torch.Tensor, DataTransferType], bootstrapped gradient
#     :param delta: Union[torch.Tensor, DataTransferType]
#     :param alpha: float, moving factor
#     :param eta: float, learning step
#     :param eps: float, control exploration
#     :return:
#     """
#     gradient = alpha * gradient + (1.0 - alpha) * delta ** 2
#     delta = -eta * delta / np.sqrt(gradient + eps)
#     return delta
class GradientOps:
    """Static helpers for applying and aggregating (possibly nested) gradients.

    A "gradient structure" is either a single array/tensor or a dict (which
    may nest further dicts) whose leaves are arrays/tensors.
    """

    @staticmethod
    def _accumulate(param: Any, delta: Any) -> None:
        """Add ``delta`` to ``param`` in place through ``param.data``."""
        if isinstance(delta, np.ndarray):
            # Convert explicitly instead of relying on implicit
            # Tensor + ndarray arithmetic, and keep the operand on the
            # parameter's device.
            delta = torch.as_tensor(delta, device=param.data.device)
        else:
            delta = delta.data
        # ``copy_`` casts the sum back to the parameter's dtype.
        param.data.copy_(param.data + delta)

    @staticmethod
    def _reduce(gradients: List, np_reduce, torch_reduce):
        """Reduce a list of equally-structured gradients leaf by leaf.

        The keys of the first element define the structure that is traversed.
        """
        if len(gradients) < 1:
            return gradients
        head = gradients[0]
        if isinstance(head, dict):
            return {
                k: GradientOps._reduce(
                    [grad[k] for grad in gradients], np_reduce, torch_reduce
                )
                for k in head
            }
        if isinstance(head, np.ndarray):
            return np_reduce(gradients, axis=0)
        if isinstance(head, torch.Tensor):
            return torch_reduce(torch.stack(gradients), dim=0)
        raise TypeError("Illegal data type: {}".format(type(head)))

    @staticmethod
    def add(source: Any, delta: Any):
        """Apply gradients (delta) to parameters (source) in place.

        :param source: A torch tensor, or a (nested) dict of tensors.
        :param delta: A tensor / numpy array, or a (nested) dict of them whose
            keys are present in ``source``.
        :return: ``source`` (the same object, updated in place).
        :raises TypeError: If ``source`` or ``delta`` has an unsupported type.
        :raises AssertionError: If a dict leaf's shape differs from the
            parameter it is applied to.
        """
        if isinstance(source, dict):
            if not isinstance(delta, dict):
                raise TypeError("Unexpected delta type: {}".format(type(delta)))
            for k, v in delta.items():
                if isinstance(v, dict):
                    source[k] = GradientOps.add(source[k], v)
                    continue
                # Validate the type first so an unsupported leaf surfaces as
                # the intended TypeError rather than a failed ``.shape`` lookup.
                if not isinstance(v, (np.ndarray, torch.Tensor)):
                    raise TypeError(
                        "Inner type of delta should be numpy.ndarray or torch.Tensor, but `{}` detected".format(
                            type(v)
                        )
                    )
                # Kept as an assert so callers see the same exception type.
                assert source[k].data.shape == v.shape, (
                    source[k].data.shape,
                    v.shape,
                )
                GradientOps._accumulate(source[k], v)
        elif isinstance(source, torch.Tensor):
            if not isinstance(delta, (torch.Tensor, np.ndarray)):
                raise TypeError("Unexpected delta type: {}".format(type(delta)))
            GradientOps._accumulate(source, delta)
        else:
            raise TypeError(
                "Source data must be a dict or torch tensor but got: {}".format(
                    type(source)
                )
            )
        return source

    @staticmethod
    def mean(gradients: List):
        """Average gradients element-wise.

        :param List gradients: A list of equally-structured gradients (numpy
            arrays, torch tensors, or nested dicts of them).
        :return: The averaged gradient with the structure of the first
            element; an empty input is returned unchanged.
        :raises TypeError: If a leaf is neither a numpy array nor a tensor.
        """
        return GradientOps._reduce(gradients, np.mean, torch.mean)

    @staticmethod
    def sum(gradients: List):
        """Sum gradients element-wise.

        :param List gradients: A list of equally-structured gradients (numpy
            arrays, torch tensors, or nested dicts of them).
        :return: The summed gradient with the structure of the first element;
            an empty input is returned unchanged.
        :raises TypeError: If a leaf is neither a numpy array nor a tensor.
        """
        return GradientOps._reduce(gradients, np.sum, torch.sum)
class OUNoise:
    """Ornstein-Uhlenbeck noise process.

    Reference: https://github.com/songrotek/DDPG/blob/master/ou_noise.py
    """

    def __init__(self, action_dimension: int, scale=0.1, mu=0, theta=0.15, sigma=0.2):
        """Store the process coefficients and initialise the state.

        :param int action_dimension: Length of the noise vector.
        :param scale: Factor applied to the state when noise is returned.
        :param mu: Value the state is reset to and pulled toward.
        :param theta: Weight of the pull toward ``mu`` at each step.
        :param sigma: Weight of the Gaussian perturbation at each step.
        """
        self.action_dimension = action_dimension
        self.scale = scale
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        # ``reset`` installs the initial state (a vector filled with ``mu``).
        self.reset()

    def reset(self):
        """Set every component of the state back to ``mu``."""
        baseline = np.ones(self.action_dimension)
        self.state = baseline * self.mu

    def noise(self):
        """Advance the process one step and return the scaled state."""
        current = self.state
        pull = self.theta * (self.mu - current)
        kick = self.sigma * np.random.randn(len(current))
        self.state = current + (pull + kick)
        return self.state * self.scale
class EPSGreedy:
    """Holder for epsilon-greedy settings (action dimension and threshold)."""

    def __init__(self, action_dimension: int, threshold: float = 0.3):
        """Record the configuration on private attributes.

        :param int action_dimension: Stored as ``_action_dim``.
        :param float threshold: Stored as ``_threshold``; presumably the
            exploration probability -- confirm against the code that reads it.
        """
        self._threshold = threshold
        self._action_dim = action_dimension