Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/pythontest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
COVERALLS_FLAG_NAME: ${{ env.MATRIX_NAME }}
COVERALLS_PARALLEL: true
# this step can fail
continue-on-error: true

- name: Upload Test Results
if: always()
Expand All @@ -92,6 +94,8 @@ jobs:
coveralls --service=github --finish
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# this step can fail
continue-on-error: true

event_file:
name: "Event File"
Expand Down
162 changes: 148 additions & 14 deletions qupulse/program/values.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
"""Runtime variable value implementations."""

from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import cached_property
from numbers import Real
from typing import TypeVar, Generic, Mapping, Union
from typing import TypeVar, Generic, Mapping, Union, Tuple, Optional
from types import MappingProxyType

from qupulse.program.volatile import VolatileRepetitionCount
from qupulse.utils.types import TimeType
import numpy as np

from qupulse.program.volatile import VolatileRepetitionCount
from qupulse.utils.types import TimeType, frozendict
from qupulse.expressions import sympy as sym_expr
from qupulse.utils.sympy import _lambdify_modules


NumVal = TypeVar('NumVal', bound=Real)


@dataclass
@dataclass(
frozen=True,
repr=False, # dont leak frozendict implementation detail in repr
)
class DynamicLinearValue(Generic[NumVal]):
"""This is a potential runtime-evaluable expression of the form

Expand All @@ -34,8 +40,9 @@ class DynamicLinearValue(Generic[NumVal]):
factors: Mapping[str, NumVal]

def __post_init__(self):
assert isinstance(self.factors, Mapping)

immutable = frozendict(self.factors)
object.__setattr__(self, 'factors', immutable)

def value(self, scope: Mapping[str, NumVal]) -> NumVal:
"""Numeric value of the expression with the given scope.
Args:
Expand All @@ -44,19 +51,34 @@ def value(self, scope: Mapping[str, NumVal]) -> NumVal:
The numeric value.
"""
value = self.base
for name, factor in self.factors:
for name, factor in self.factors.items():
value += scope[name] * factor
return value


def __abs__(self):
# The deifnition of an absolute value is ambiguous, but there is a case
# to define it as sum_i abs(f_i) + abs(base) for certain conveniences.
# return abs(self.base)+sum([abs(o) for o in self.factors.values()])
raise NotImplementedError(f'abs({self.__class__.__name__}) is ambiguous')

def __eq__(self, other):
if isinstance(other, type(self)):
return self.base == other.base and self.factors == other.factors

if (base_eq := self.base.__eq__(other)) is NotImplemented:
return NotImplemented

return base_eq and not self.factors

def __add__(self, other):
if isinstance(other, (float, int, TimeType)):
return DynamicLinearValue(self.base + other, self.factors)

if type(other) == type(self):
offsets = dict(self.factors)
factors = dict(self.factors)
for name, value in other.factors.items():
offsets[name] = value + offsets.get(name, 0)
return DynamicLinearValue(self.base + other.base, offsets)
factors[name] = value + factors.get(name, 0)
return DynamicLinearValue(self.base + other.base, factors)

# this defers evaluation when other is still a symbolic expression
return NotImplemented
Expand Down Expand Up @@ -86,7 +108,7 @@ def __rmul__(self, other):
def __truediv__(self, other):
inv = 1 / other
return self.__mul__(inv)

@property
def free_symbols(self):
"""This is required for the :py:class:`sympy.expr.Expr` interface compliance. Since the keys of
Expand Down Expand Up @@ -114,12 +136,124 @@ def replace(self, r, s):
"""
return self

def __repr__(self):
return f"{type(self).__name__}(base={self.base!r}, factors={dict(self.factors)!r})"


# is there any way to cast the numpy cumprod to int?
int_type = Union[np.int64,np.int32,int]


def _to_resolution(x, resolution):
"""Function used by :py:class:`.ResolutionDependentValue` for rounding to resolution multiples."""
# to avoid conflicts between positive and negative vals from casting half to even, we only round positive numbers
if x < 0:
return -round(-x / resolution) * resolution
else:
return round(x / resolution) * resolution


@dataclass(frozen=True)
class ResolutionDependentValue(Generic[NumVal]):
"""This is a potential runtime-evaluable expression of the form

o + sum_i b_i*m_i

with (potential) float o, b_i and integers m_i. o and b_i are rounded(gridded)
to a resolution given upon __call__.

The main use case is the correct rounding of increments used in command-based
voltage scans on some hardware devices, where an imprecise numeric value is
looped over m_i times and could, if not rounded, accumulate a proportional
error leading to unintended drift in output voltages when jump-back commands
afterwards do not account for the deviations.
Rounding the value preemptively and supplying corrected values to jump-back
commands prevents this.
"""

bases: Tuple[NumVal, ...]
multiplicities: Tuple[int, ...]
offset: NumVal

@cached_property
def _is_time_or_int(self):
return all(isinstance(b,(TimeType,int_type)) for b in self.bases) and isinstance(self.offset,(TimeType,int_type))

def with_resolution(self, resolution: Optional[NumVal]) -> NumVal:
"""Get the numeric value rounding to the given resolution.

Args:
resolution: Resolution the bases and offset are rounded to. If none all values must be integers.

Returns:
The rounded numeric value.
"""
if resolution is None:
assert self._is_time_or_int
return sum(b * m for b, m in zip(self.bases, self.multiplicities)) + self.offset

offset = _to_resolution(self.offset, resolution)
base_sum = sum(_to_resolution(base, resolution) * multiplicity
for base, multiplicity in zip(self.bases, self.multiplicities))
return base_sum + offset

def __call__(self, resolution: Optional[float]) -> Union[NumVal,TimeType]:
"""Backward compatible alias of :py:meth:`~ResolutionDependentValue.with_resolution`."""
return self.with_resolution(resolution)

def __bool__(self):
#if any value is not zero - this helps for some checks
return any(bool(b) for b in self.bases) or bool(self.offset)

def __add__(self, other):
# this should happen in the context of an offset being added to it, not the bases being modified.
if isinstance(other, (float, int, TimeType)):
return ResolutionDependentValue(self.bases, self.multiplicities, self.offset+other)
return NotImplemented

def __radd__(self, other):
return self.__add__(other)

def __sub__(self, other):
return self.__add__(-other)

def __mul__(self, other):
# this should happen when the amplitude is being scaled
# multiplicities are not affected
if isinstance(other, (float, int, TimeType)):
return ResolutionDependentValue(tuple(b*other for b in self.bases),self.multiplicities,self.offset*other)
return NotImplemented

def __rmul__(self,other):
return self.__mul__(other)

def __truediv__(self,other):
return self.__mul__(1/other)

def __float__(self):
return float(self.with_resolution(resolution=None))

def __str__(self):
return f"RDP of {sum(b*m for b,m in zip(self.bases,self.multiplicities)) + self.offset}"



#This is a simple dervide class to allow better isinstance checks in the HDAWG driver
@dataclass(frozen=True)
class DynamicLinearValueStepped(DynamicLinearValue):
step_nesting_level: int
rng: range
reverse: int|bool


# TODO: hackedy, hackedy
sym_expr.ALLOWED_NUMERIC_SCALAR_TYPES = sym_expr.ALLOWED_NUMERIC_SCALAR_TYPES + (DynamicLinearValue,)

# this keeps the simple expression in lambdified results
_lambdify_modules.append({'DynamicLinearValue': DynamicLinearValue})
_lambdify_modules.append({
'DynamicLinearValue': DynamicLinearValue,
'DynamicLinearValueStepped': DynamicLinearValueStepped,
})

RepetitionCount = Union[int, VolatileRepetitionCount, DynamicLinearValue[int]]
HardwareTime = Union[TimeType, DynamicLinearValue[TimeType]]
Expand Down
Empty file added tests/program/__init__.py
Empty file.
139 changes: 139 additions & 0 deletions tests/program/values_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import copy
import unittest
from unittest import TestCase

import numpy as np

from qupulse.pulses import *
from qupulse.program.linspace import *
from qupulse.program.transformation import *
from qupulse.pulses.function_pulse_template import FunctionPulseTemplate
from qupulse.program.values import DynamicLinearValue, ResolutionDependentValue, DynamicLinearValueStepped
from qupulse.utils.types import TimeType


class DynamicLinearValueTests(TestCase):
def setUp(self):
self.d = DynamicLinearValue(-100,{'a':np.pi,'b':np.e})
self.d3 = DynamicLinearValue(-300,{'a':np.pi,'b':np.e})

def test_value(self):
dval = self.d.value({'a':12,'b':34})
np.testing.assert_allclose(dval, 12*np.pi+34*np.e-100)

# def test_abs(self):
# np.testing.assert_allclose(abs(self.d),100+np.pi+np.e)

def test_add_sub_neg(self):

self.assertEqual(self.d + 3,
DynamicLinearValue(-100+3,{'a':np.pi,'b':np.e}))
self.assertEqual(self.d + np.pi,
DynamicLinearValue(-100+np.pi,{'a':np.pi,'b':np.e}))
self.assertEqual(self.d + TimeType(12/5),
DynamicLinearValue(-100+TimeType(12/5),{'a':np.pi,'b':np.e}))
#sub
self.assertEqual(self.d - TimeType(12/5),
DynamicLinearValue(-100-TimeType(12/5),{'a':np.pi,'b':np.e}))

#this would raise because of TimeType conversion
# self.assertEqual(TimeType(12/5)-self.d,
# DynamicLinearValue(100+TimeType(12/5),{'a':-np.pi,'b':-np.e}))
#same type
self.assertEqual(self.d+DynamicLinearValue(0.1,{'b':1,'c':2}),
DynamicLinearValue(-99.9,{'a':np.pi,'b':np.e+1,'c':2}))

def test_mul(self):
self.assertEqual(self.d*3,
DynamicLinearValue(-3*100,{'a':3*np.pi,'b':3*np.e}))
self.assertEqual(3*self.d,
DynamicLinearValue(-3*100,{'a':3*np.pi,'b':3*np.e}))
#div
self.assertEqual(self.d3/3,
DynamicLinearValue(-100,{'a':np.pi/3,'b':np.e/3}))
#raise
self.assertRaises(TypeError,lambda: 3/self.d,)

def test_eq(self):

self.assertEqual(self.d==1,False)
self.assertEqual(self.d==1+1j,False)
# self.assertEqual(self.d>-101,True) #if one wants to allow these comparisons
# self.assertEqual(self.d<TimeType(24/5),True) #if one wants to allow these comparisons

self.assertEqual(self.d==self.d,True)
self.assertEqual(self.d+1==self.d,False)

# inoperative features.
# self.assertEqual(self.d+1>self.d,False)
# self.assertEqual(self.d+1<self.d,False)
# self.assertEqual(self.d+1>=self.d,True)
# self.assertEqual(self.d+1<=self.d,False)

# self.assertEqual(self.d>self.d/2-51,True)
# self.assertEqual(self.d<self.d*2+101,True)

def test_sympy(self):
self.assertEqual(self.d._sympy_(), self.d)
self.assertEqual(self.d.replace(1,1), self.d)

def test_hash(self):
self.assertEqual(hash(self.d), hash(self.d))
self.assertNotEqual(hash(self.d), hash(self.d3))



class ResolutionDependentValueTests(TestCase):
def setUp(self):
self.d = ResolutionDependentValue((np.pi*1e-5,np.e*1e-5),(10,20),0.1)
self.d2 = ResolutionDependentValue((np.e*1e-5,np.pi*1e-5),(10,20),-0.1)
self.dtt = ResolutionDependentValue((TimeType(12,5),TimeType(14,5)),(10,20),TimeType(12,5))
self.dint = ResolutionDependentValue((1,2),(10,20),1)

self._default_res = 2**-16

def test_call(self):
val = self.d(self._default_res)
#expected to round to 2*res each, so 60*2**16, offset also rounded
expected_val = 2**-16 * 60 + 0.100006103515625
self.assertAlmostEqual(val,expected_val,places=12)
self.assertEqual((val/self._default_res)%1,0)

#if no resolution must be tt or int
self.assertEqual(self.dtt(None),TimeType(412, 5))
self.assertEqual(self.dint(None),51)

def test_repr_round_trip(self):
eval_str = repr(self.dint)
evaluated = eval(eval_str)
self.assertEqual(self.dint, evaluated)
evaluated_repr = repr(evaluated)
self.assertEqual(eval_str, evaluated_repr)

def test_dunder(self):
self.assertEqual(bool(self.d), True)
self.assertRaises(TypeError, lambda: self.d+self.d2)
self.assertEqual(self.d-0.1,
ResolutionDependentValue((np.pi*1e-5,np.e*1e-5),(10,20),0.0))
self.assertEqual(self.d*2,
ResolutionDependentValue((np.pi*1e-5*2,np.e*1e-5*2),(10,20),0.2))
self.assertEqual(self.d/2,
ResolutionDependentValue((np.pi*1e-5/2,np.e*1e-5/2),(10,20),0.1/2))

self.assertRaises(AssertionError, lambda: float(self.d))

try: str(self.d)
except: self.fail()

self.assertEqual(hash(self.d), hash(self.d))
self.assertNotEqual(hash(self.d), hash(self.d2))

self.assertEqual(self.d==ResolutionDependentValue((np.pi*1e-5,np.e*1e-5),(10,20),0.1),True)


class DynamicLinearValueSteppedTests(TestCase):
def test_properties(self):
d = DynamicLinearValueStepped(0.,{'a':1},1,range(3),False)
self.assertEqual(d.rng, range(3))
self.assertEqual(d.step_nesting_level, 1)
self.assertEqual(d.reverse, False)
Loading