# Source code for malib.backend.datapool.data_array

import abc
import math
import sys
from typing import Union, Iterable

import numpy as np

from malib import settings
from malib.utils.typing import DataTransferType


class DataArray(abc.ABC):
    """Abstract base class for a named, capacity-bounded column of data.

    Subclasses choose the physical storage layout. This base class only
    resolves the capacity bookkeeping that all implementations share.
    """

    def __init__(
        self,
        name: str,
        capacity: Union[int, None] = settings.DEFAULT_EPISODE_CAPACITY,
        init_capacity: int = settings.DEFAULT_EPISODE_INIT_CAPACITY,
    ):
        """
        :param str name: Identifier of the data array.
        :param int capacity: Maximum number of entries the array may hold.
            ``-1`` maps to ``sys.maxsize`` (unbounded). ``None`` falls back to
            ``settings.DEFAULT_EPISODE_CAPACITY``.
        :param int init_capacity: Initial storage size, clipped to the
            resolved maximum capacity.
        """
        self._name = name
        self._length = 0
        if capacity == -1:
            self._max_capacity = sys.maxsize
        elif capacity is None:
            self._max_capacity = settings.DEFAULT_EPISODE_CAPACITY
        else:
            self._max_capacity = capacity
        # Clip against the *resolved* maximum. Using the raw ``capacity``
        # gives -1 in the unbounded case and a TypeError for ``None``.
        self._capacity = min(init_capacity, self._max_capacity)
        self._offset = 0
        self._data = None

    @abc.abstractmethod
    def __getitem__(self, item):
        raise NotImplementedError

    @abc.abstractmethod
    def __len__(self):
        raise NotImplementedError

    @abc.abstractmethod
    def capacity(self):
        """Current size of the allocated storage."""
        raise NotImplementedError

    @abc.abstractmethod
    def size(self):
        """Number of entries currently stored."""
        raise NotImplementedError

    @abc.abstractmethod
    def fill(self, data):
        """Replace the whole content of the array with ``data``."""
        raise NotImplementedError

    @abc.abstractmethod
    def insert(self, data):
        """Append ``data`` at the back of the array."""
        raise NotImplementedError

    @abc.abstractmethod
    def get_data(self):
        """Return the stored data."""
        raise NotImplementedError
class NumpyDataArray(DataArray):
    def __init__(
        self,
        name: str,
        capacity: Union[int, None] = settings.DEFAULT_EPISODE_CAPACITY,
        init_capacity: int = settings.DEFAULT_EPISODE_INIT_CAPACITY,
    ):
        """Data storage in the form of a numpy array.

        The physical storage is created by the first insert/fill call. When
        an insertion hits a full buffer, the buffer grows (to twice the
        required length) until it reaches the maximum capacity. After that it
        behaves as a ring buffer that overwrites the oldest entries.

        :param str name: A parameter that helps users identify the data
            array. By default it matches the episode column name.
        :param int capacity: The maximum capacity the data array can span.
            The default value malib.settings.DEFAULT_EPISODE_CAPACITY applies
            when ***capacity*** is None.
        :param int init_capacity: The initial size of the internal data array
            created by the first insert/fill call.
        """
        super().__init__(name=name, capacity=capacity, init_capacity=init_capacity)

    def __getitem__(self, item: Union[int, Iterable, slice]) -> np.ndarray:
        """
        :param Union[int,Iterable,slice] item: Logical indices of the desired
            entries. Index 0 is the oldest stored entry.
        :return: Data in the form of a numpy ndarray.
        :raises IndexError: If no data is stored, an int index is out of
            range, or the index type is unsupported.
        """
        if self._data is None:
            raise IndexError("Index on column data out of range")
        if isinstance(item, int) or isinstance(item, np.integer):
            idx = item
            # check valid index
            if -self._length <= idx < self._length:
                # Map the logical index to a physical slot. ``_offset`` is the
                # slot of the oldest entry once the ring buffer has wrapped.
                if idx >= 0:
                    idx = (self._offset + idx) % self._max_capacity
                else:
                    idx = (self._offset + self._length + idx) % self._max_capacity
                return self._data[idx]
            else:
                raise IndexError(
                    f"Index on column data out of range, length{self._length}, index{idx}"
                )
        elif isinstance(item, Iterable):
            indices = np.asarray(item)
            if indices.size == 0:
                # ``np.asarray([])`` / ``np.asarray(range(0))`` are float
                # arrays, which numpy refuses to use as indices.
                indices = indices.astype(np.intp)
            indices = (
                (indices < 0) * self._length + indices + self._offset
            ) % self._max_capacity
            return self._data[indices]
        elif isinstance(item, slice):
            return self[range(*item.indices(self._length))]
        else:
            raise IndexError(
                f"Indices must be int, numpy.integer, slices, not {item.__class__}"
            )

    def __len__(self) -> int:
        return self._length

    @property
    def capacity(self) -> int:
        return self._capacity

    @property
    def size(self) -> int:
        return self._length

    def roll(self, shift: int, axis: int = 0):
        """Roll the first ``size`` slots of the physical storage in place.

        NOTE(review): this works on the physical layout and does not consult
        ``_offset``. After the ring buffer has wrapped, the result is not a
        roll of the logical (oldest-first) order.
        """
        self._data[: self._length] = np.roll(
            self._data[: self._length], shift, axis=axis
        )

    def fill(self, data: DataTransferType, capacity: int = None) -> "NumpyDataArray":
        """Flush-fill the array with the input data.

        Note: for performance, the internal storage keeps a shallow
        reference to the input data after filling. If the input may be
        modified afterwards, pass a deep copy instead.

        Only the most recent ``max capacity`` rows are kept.

        :param DataTransferType data: Array-like data input.
        :param int capacity: Override the pre-set max capacity if provided.
            ``-1`` means unbounded, as in the constructor.
        :return: A reference to the caller.
        """
        if capacity is not None:
            self._max_capacity = sys.maxsize if capacity == -1 else capacity
        # Truncate in both cases. Otherwise the stored length could exceed
        # ``_max_capacity``, which breaks the modulo indexing in
        # ``__getitem__`` and the invariant asserted by ``insert``.
        if data.shape[0] >= self._max_capacity:
            data = data[-self._max_capacity :]
        self._data = data
        self._capacity = data.shape[0]
        self._offset = 0
        self._length = data.shape[0]
        return self

    def insert(self, data: DataTransferType) -> None:
        """Insert data at the back of the array.

        Some time cost is expected because:
        (1) The first insert/fill call creates the physical storage.
        (2) Inserting into a full buffer may allocate memory and move data.

        :param DataTransferType data: Data block to insert at the back of the
            array. Lists are converted with ``np.array``. Other non-ndarray
            values are wrapped as a single entry.
        :return: None
        """
        if isinstance(data, list):
            data = np.array(data)
        if not isinstance(data, np.ndarray):
            data = np.array([data])
        if self._data is None:
            data_shape = list(data.shape)
            data_shape[0] = max(self._capacity, data.shape[0])
            self._data = np.empty_like(data, shape=data_shape)

        length = data.shape[0]
        assert 0 <= self._length <= self._capacity <= self._max_capacity
        if self._length == self._max_capacity:
            # Full: ring-buffer mode, overwrite the oldest entries.
            if length < self._capacity:
                indices = (self._offset + np.arange(length)) % self._capacity
                self._data[indices] = data
                # Computed from ``length`` rather than ``indices[-1]``, so an
                # empty batch leaves the offset unchanged instead of raising.
                self._offset = (self._offset + length) % self._capacity
            else:
                self._data = data[-self._capacity :]
                self._offset = 0
        else:
            # Not full yet, so no wrap has happened and entries are contiguous.
            assert self._offset == 0
            target_length = self._length + length
            if target_length <= self._capacity:
                self._data[self._length : target_length] = data
                self._length = target_length
            else:
                new_capacity = min(2 * target_length, self._max_capacity)
                data_shape = list(self._data.shape)
                data_shape[0] = new_capacity
                _data = np.empty_like(self._data, shape=data_shape)
                _data[: self._length] = self._data[: self._length]
                inserted_length = min(new_capacity - self._length, length)
                _data[self._length : self._length + inserted_length] = data[
                    :inserted_length
                ]
                data = data[inserted_length:]
                self._data = _data
                self._length += inserted_length
                self._capacity = new_capacity
                # Any leftover rows go through the (now full) ring-buffer path.
                if data.size > 0:
                    self.insert(data)

    def get_data(self) -> DataTransferType:
        """Return the stored data in insertion order (oldest first).

        :return DataTransferType
        """
        # ``_offset`` is the physical slot of the oldest entry, so rotate
        # *left* by it. This matches the logical order used by ``__getitem__``.
        indices = np.roll(np.arange(self._length), -self._offset)
        return self._data[indices]

    @property
    def nbytes(self) -> int:
        """Return the memory size (in bytes) of the array data.

        :return: int, 0 when no storage has been allocated yet.
        """
        if self._data is None:
            return 0
        return self._data.nbytes
class LinkDataArray(DataArray):
    """
    Segmented data structure for efficient rollout data storage/inference.

    Data is kept as a ``deque`` of numpy blocks of ``block_size`` rows each.
    The deque is bounded to ``ceil(capacity / block_size)`` blocks, so the
    oldest blocks are dropped once the limit is reached.
    """

    from collections import deque

    def __init__(
        self,
        name: str,
        capacity: Union[int, None] = settings.DEFAULT_EPISODE_CAPACITY,
        init_capacity: Union[int, None] = settings.DEFAULT_EPISODE_INIT_CAPACITY,
        block_size: Union[int, None] = settings.DEFAULT_CONFIG["rollout"][
            "fragment_length"
        ],
    ):
        """
        :param str name: Identifier of the data array.
        :param int capacity: Maximum number of rows. It is rounded up to a
            whole number of blocks. ``None`` uses
            ``settings.DEFAULT_EPISODE_CAPACITY``; ``-1`` means unbounded.
        :param int init_capacity: Unused; storage is sized by ``capacity``.
        :param int block_size: Number of rows per block. ``None`` uses
            ``settings.DEFAULT_CONFIG["rollout"]["fragment_length"]``.
        """
        if block_size is None:
            block_size = settings.DEFAULT_CONFIG["rollout"]["fragment_length"]
        if capacity is None:
            capacity = settings.DEFAULT_EPISODE_CAPACITY
        elif capacity == -1:
            capacity = sys.maxsize
        self._max_blocks = math.ceil(capacity / block_size)
        super().__init__(
            name=name,
            capacity=self._max_blocks * block_size,
            init_capacity=self._max_blocks * block_size,
        )
        self._blocks = self.deque(maxlen=self._max_blocks)
        self._block_lengths = 0
        self._length = 0
        # Previously hard-coded to 75, which ignored the argument and made the
        # block math disagree with ``_max_blocks``.
        self._block_size = block_size

    def __setitem__(self, key, value):
        raise NotImplementedError

    def __getitem__(self, item: int) -> np.ndarray:
        """Return rows by logical index (int, iterable of ints, or slice).

        Indexing assumes every block except possibly the last has exactly
        ``block_size`` rows.

        :raises IndexError: If an int index is out of range or the index type
            is unsupported.
        """
        if isinstance(item, int) or isinstance(item, np.integer):
            # check valid index
            if not -self._length <= item < self._length:
                raise IndexError("Index on column data out of range")
            if item < 0:
                # Normalize first, so a short trailing block is handled.
                item += self._length
            block_index, inblock_index = divmod(int(item), self._block_size)
            return self._blocks[block_index][inblock_index]
        elif isinstance(item, Iterable):
            indices = np.asarray(item)
            indices = np.where(indices < 0, indices + self._length, indices)
            block_indices = np.floor_divide(indices, self._block_size)
            inblock_indices = np.mod(indices, self._block_size)
            res = [
                self._blocks[int(x)][int(y)]
                for x, y in zip(block_indices, inblock_indices)
            ]
            return np.array(res)
        elif isinstance(item, slice):
            return self[range(*item.indices(self._length))]
        else:
            raise IndexError(
                f"Indices must be int, numpy.integer, slices, not {item.__class__}"
            )

    def __len__(self) -> int:
        return self._length

    @property
    def capacity(self) -> int:
        return self._capacity

    @property
    def size(self) -> int:
        return self._length

    def fill(
        self, data: np.ndarray, capacity: Union[None, int] = None
    ) -> "LinkDataArray":
        """Replace the content with ``data``, split into blocks.

        :param np.ndarray data: Rows to store. Only the most recent
            ``capacity`` rows are kept.
        :param int capacity: Optional new maximum, rounded up to a whole
            number of blocks. ``-1`` means unbounded.
        :return: A reference to the caller.
        """
        if capacity is not None:
            if capacity == -1:
                capacity = sys.maxsize
            # Resize regardless of divisibility. Before, a capacity that was
            # an exact multiple of the block size was silently ignored.
            self._max_blocks = math.ceil(capacity / self._block_size)
            self._capacity = self._max_blocks * self._block_size
            self._max_capacity = self._capacity
        self._blocks = self.deque(maxlen=self._max_blocks)
        # Keep only the newest rows that fit. The old comparison was inverted
        # and never truncated.
        if data.shape[0] > self._capacity:
            data = data[-self._capacity :]
        while data.shape[0] > 0:
            self._blocks.append(data[: self._block_size])
            data = data[self._block_size :]
        # Count real rows; the last block may be shorter than block_size.
        self._length = sum(block.shape[0] for block in self._blocks)
        return self

    def insert(self, data: Union[np.ndarray, Iterable[np.ndarray]]):
        """Append whole blocks at the back of the array.

        :param data: Either an ndarray whose row count is a multiple of
            ``block_size`` (it is split into blocks), or any iterable of
            ndarray blocks with exactly ``block_size`` rows each.

        NOTE(review): inserting after a ``fill`` that left a short trailing
        block makes the blocks non-uniform. ``__getitem__`` does not support
        that layout.
        """
        if isinstance(data, np.ndarray):
            assert data.shape[0] % self._block_size == 0
            while data.shape[0] > 0:
                self._blocks.append(data[: self._block_size])
                data = data[self._block_size :]
        else:
            # Any iterable of blocks (deque, list, generator, ...). Before,
            # only ``deque`` was accepted and other inputs were silently
            # ignored.
            for block in data:
                assert isinstance(block, np.ndarray)
                assert block.shape[0] == self._block_size
                self._blocks.append(block)
        self._length = sum(block.shape[0] for block in self._blocks)

    def get_data(self) -> Iterable[np.ndarray]:
        """Return the underlying deque of blocks (not a copy)."""
        return self._blocks