import numpy as np
from .base import SparseMatrix
try:
from .. import _core
except Exception: # pragma: no cover
_core = None
[docs]
class CSC(SparseMatrix):
"""Compressed Sparse Column (CSC) matrix.
Parameters
----------
indptr : array_like of int64, shape ``(ncols + 1,)``
Column pointer array.
indices : array_like of int64, shape ``(nnz,)``
Row indices of nonzero values.
data : array_like of float64, shape ``(nnz,)``
Nonzero values.
shape : tuple of int
Matrix shape ``(nrows, ncols)``.
dtype : numpy.dtype, optional
Value dtype, defaults to ``np.float64``.
check : bool, optional
If True, validate invariants in the native layer (may be slower).
Attributes
----------
indptr, indices, data : numpy.ndarray
Storage arrays for CSC structure and values.
shape : tuple[int, int]
Matrix dimensions.
dtype : numpy.dtype
Value dtype.
nnz : int
Number of stored elements.
Notes
-----
Backed by Rust kernels through ``lacuna._core.Csc64``; operations release the GIL.
Examples
--------
Construct a small CSC and run basic ops::
>>> import numpy as np
>>> from lacuna.sparse import CSC
>>> indptr = np.array([0, 1, 2, 3])
>>> indices = np.array([0, 1, 1])
>>> data = np.array([1.0, 2.0, 3.0])
>>> a = CSC(indptr, indices, data, shape=(2, 3))
>>> a.nnz
3
>>> (a @ np.array([1.0, 0.0, 1.0])).tolist() # SpMV
[1.0, 3.0]
>>> a.sum()
6.0
"""
def __init__(self, indptr, indices, data, shape, dtype=np.float64, check=True):
super().__init__(shape=shape, dtype=dtype)
self.indptr = np.asarray(indptr, dtype=np.int64)
self.indices = np.asarray(indices, dtype=np.int64)
self.data = np.asarray(data, dtype=np.float64)
if _core is not None:
try:
self._handle = _core.Csc64(
self.shape[0], self.shape[1], self.indptr, self.indices, self.data, check
)
except Exception:
self._handle = None
else:
self._handle = None
[docs]
@classmethod
def from_arrays(cls, indptr, indices, data, shape, check=True):
"""Construct from CSC arrays.
Parameters
----------
indptr, indices, data : array_like
CSC structure and values.
shape : tuple[int, int]
Matrix shape.
check : bool, optional
Validate invariants in the native layer.
"""
return cls(indptr, indices, data, shape, check=check)
@property
def nnz(self):
"""Number of stored values."""
return int(self.data.size)
def __matmul__(self, other):
"""Matrix product with a dense vector or dense 2D array.
- If ``other`` is 1D, returns ``(nrows,)``.
- If ``other`` is 2D of shape ``(ncols, k)``, returns ``(nrows, k)``.
"""
if _core is None:
raise RuntimeError("native core is not available")
arr = np.asarray(other, dtype=np.float64)
if arr.ndim == 1:
if arr.shape[0] != self.shape[1]:
raise ValueError("vector length must equal ncols")
h = getattr(self, "_handle", None)
if h is not None:
return h.spmv(arr)
raise RuntimeError("native handle is not available")
elif arr.ndim == 2:
if arr.shape[0] != self.shape[1]:
raise ValueError("matrix rows must equal ncols")
h = getattr(self, "_handle", None)
if h is not None:
return h.spmm(arr)
raise RuntimeError("native handle is not available")
else:
raise ValueError("right operand must be 1D or 2D")
[docs]
def sum(self, axis=None):
"""Sum of entries.
Parameters
----------
axis : {None, 0, 1}, optional
``None`` for global sum; ``0`` for column sums; ``1`` for row sums.
"""
if _core is None:
raise RuntimeError("native core is not available")
h = getattr(self, "_handle", None)
if h is None:
raise RuntimeError("native handle is not available")
if axis is None:
return float(h.sum())
if axis == 0:
return h.col_sums()
if axis == 1:
return h.row_sums()
raise ValueError("axis must be None, 0, or 1")
[docs]
def prune(self, eps):
"""Drop entries with ``abs(value) <= eps``. Returns a new :class:`CSC`."""
if _core is None:
raise RuntimeError("native core is not available")
h = getattr(self, "_handle", None)
if h is None:
raise RuntimeError("native handle is not available")
pi, pj, pv, pr, pc = h.prune(float(eps))
return CSC(pi, pj, pv, (pr, pc), check=False)
[docs]
def eliminate_zeros(self):
"""Remove explicit zeros. Returns a new :class:`CSC`."""
if _core is None:
raise RuntimeError("native core is not available")
h = getattr(self, "_handle", None)
if h is None:
raise RuntimeError("native handle is not available")
pi, pj, pv, pr, pc = h.eliminate_zeros()
return CSC(pi, pj, pv, (pr, pc), check=False)
def __mul__(self, alpha):
"""Scalar multiplication: returns ``alpha * self`` as :class:`CSC`."""
if _core is None:
raise RuntimeError("native core is not available")
h = getattr(self, "_handle", None)
if h is None:
raise RuntimeError("native handle is not available")
alpha = float(alpha)
oi, oj, ov, orr, occ = h.mul_scalar(alpha)
return CSC(oi, oj, ov, (orr, occ), check=False)
__rmul__ = __mul__
def __add__(self, other):
"""Elementwise addition with another :class:`CSC`."""
if isinstance(other, CSC):
if _core is None:
raise RuntimeError("native core is not available")
ha = getattr(self, "_handle", None)
hb = getattr(other, "_handle", None)
if ha is not None and hb is not None:
ci, cj, cv, cr, cc = ha.add(hb)
return CSC(ci, cj, cv, (cr, cc), check=False)
raise RuntimeError("native handle is not available")
return NotImplemented
def __sub__(self, other):
"""Elementwise subtraction with another :class:`CSC`."""
if isinstance(other, CSC):
if _core is None:
raise RuntimeError("native core is not available")
ha = getattr(self, "_handle", None)
hb = getattr(other, "_handle", None)
if ha is not None and hb is not None:
ci, cj, cv, cr, cc = ha.sub(hb)
return CSC(ci, cj, cv, (cr, cc), check=False)
raise RuntimeError("native handle is not available")
return NotImplemented
[docs]
def multiply(self, other):
"""Hadamard (elementwise) product with another :class:`CSC`."""
if isinstance(other, CSC):
if _core is None:
raise RuntimeError("native core is not available")
ha = getattr(self, "_handle", None)
hb = getattr(other, "_handle", None)
if ha is not None and hb is not None:
ci, cj, cv, cr, cc = ha.hadamard(hb)
return CSC(ci, cj, cv, (cr, cc), check=False)
raise RuntimeError("native handle is not available")
return NotImplemented
[docs]
def toarray(self):
"""Convert to a dense NumPy ``ndarray`` of shape ``(nrows, ncols)``."""
nrows, ncols = self.shape
out = np.zeros((nrows, ncols), dtype=self.data.dtype)
for j in range(ncols):
s = int(self.indptr[j])
e = int(self.indptr[j + 1])
if s < e:
out[self.indices[s:e], j] = self.data[s:e]
return out
@property
def T(self):
if _core is None:
raise RuntimeError("native core is not available")
h = getattr(self, "_handle", None)
if h is not None:
ti, tj, tv, tr, tc = h.transpose()
return CSC(ti, tj, tv, (tr, tc), check=False)
# fallback to from_parts path
ti, tj, tv, tr, tc = _core.transpose_csc_from_parts(
self.shape[0], self.shape[1], self.indptr, self.indices, self.data, False
)
return CSC(ti, tj, tv, (tr, tc), check=False)
def __repr__(self):
return f"CSC(shape={self.shape}, nnz={self.nnz}, dtype={self.data.dtype.name})"
def __str__(self):
return self.__repr__()