Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
Loading items

Target

Select target project
  • flow3r/flow3r-firmware
  • Vespasian/flow3r-firmware
  • alxndr42/flow3r-firmware
  • pl/flow3r-firmware
  • Kari/flow3r-firmware
  • raimue/flow3r-firmware
  • grandchild/flow3r-firmware
  • mu5tach3/flow3r-firmware
  • Nervengift/flow3r-firmware
  • arachnist/flow3r-firmware
  • TheNewCivilian/flow3r-firmware
  • alibi/flow3r-firmware
  • manuel_v/flow3r-firmware
  • xeniter/flow3r-firmware
  • maxbachmann/flow3r-firmware
  • yGifoom/flow3r-firmware
  • istobic/flow3r-firmware
  • EiNSTeiN_/flow3r-firmware
  • gnudalf/flow3r-firmware
  • 999eagle/flow3r-firmware
  • toerb/flow3r-firmware
  • pandark/flow3r-firmware
  • teal/flow3r-firmware
  • x42/flow3r-firmware
  • alufers/flow3r-firmware
  • dos/flow3r-firmware
  • yrlf/flow3r-firmware
  • LuKaRo/flow3r-firmware
  • ThomasElRubio/flow3r-firmware
  • ai/flow3r-firmware
  • T_X/flow3r-firmware
  • highTower/flow3r-firmware
  • beanieboi/flow3r-firmware
  • Woazboat/flow3r-firmware
  • gooniesbro/flow3r-firmware
  • marvino/flow3r-firmware
  • kressnerd/flow3r-firmware
  • quazgar/flow3r-firmware
  • aoid/flow3r-firmware
  • jkj/flow3r-firmware
  • naomi/flow3r-firmware
41 results
Select Git revision
Loading items
Show changes
Showing
with 3629 additions and 0 deletions
# SPDX-License-Identifier: CC0-1.0
from bl00mbox._user import *
import bl00mbox._patches as patches
import bl00mbox._helpers as helpers
from bl00mbox._plugins import plugins
# SPDX-License-Identifier: CC0-1.0
import time
def terminal_scope(
signal,
signal_min=-32767,
signal_max=32767,
delay_ms=20,
width=80,
fun=None,
fun_ms=None,
):
"""give it a signal and show it on terminal"""
if signal_max <= signal_min:
return
ms_counter = 0
fun_counter = 0
if fun != None:
fun()
while True:
if fun != None:
if fun_ms != None:
if fun_ms <= fun_counter:
fun()
fun_counter = fun_counter % fun_ms
raw_val = signal.value
ret = f"{ms_counter:06d}"
if raw_val == -32768:
ret += " [" + "?" * width + "] INVALID"
else:
val = int((width * (raw_val - signal_min)) / (signal_max - signal_min))
if val > width:
val = width
if val < 0:
val = 0
ret += " [" + "X" * val + "." * (width - val) + "]"
percent = int(100 * val / width)
ret += f" {percent:02d}%"
print(ret)
time.sleep_ms(delay_ms)
ms_counter += delay_ms
fun_counter += delay_ms
# terminal_scope(a.env.signals.output, 0, fun = a.start, fun_ms = 1000)
def sct_to_note_name(sct):
sct = sct - 18367 + 100
octave = ((sct + 9 * 200) // 2400) + 4
tones = ["A", "Bb", "B", "C", "Db", "D", "Eb", "E", "F", "Gb", "G", "Ab"]
tone = tones[(sct // 200) % 12]
return tone + str(octave)
def note_name_to_sct(name):
tones = ["A", "Bb", "B", "C", "Db", "D", "Eb", "E", "F", "Gb", "G", "Ab"]
semitones = tones.index(name[0])
if semitones > 2:
semitones -= 12
if name[1] == "b":
octave = int(name[2:])
semitones -= 1
elif name[1] == "#":
octave = int(name[2:])
semitones += 1
else:
octave = int(name[1:])
return 18367 + (octave - 4) * 2400 + (200 * semitones)
def sct_to_freq(sct):
return 440 * 2 ** ((sct - 18367) / 2400)
# SPDX-License-Identifier: CC0-1.0
import math
import os
import bl00mbox
import cpython.wave as wave
class _Patch:
def __init__(self, chan):
self.signals = _PatchSignalList()
self._channel = chan
# old style
self.plugins = _PatchPluginList()
# new style
self._plugins = []
def new(self, *args, **kwargs):
plugin = self._channel.new(*args, **kwargs)
self._plugins.append(plugin)
return plugin
def __repr__(self):
ret = "[patch] " + type(self).__name__
ret += "\n [signals:] " + "\n ".join(repr(self.signals).split("\n"))
# old style
plugin_repr = repr(self.plugins)
# new style
for plugin in self._plugins:
plugin_repr += "\n" + repr(plugin)
ret += "\n [plugins:] " + "\n ".join(plugin_repr.split("\n"))
return ret
def delete(self):
# new style
plugins = self._plugins[:]
# old style
for plugin_name in self.plugins.__dict__:
if not plugin_name.startswith("_"):
plugin_list = self.plugins.__dict__[plugin_name]
if (type(plugin_list)) != list:
plugin_list = [plugin_list]
plugins += plugin_list
for plugin in set(plugins):
plugin.delete()
class _PatchItemList:
def __init__(self):
self._items = []
# workaround
def __iter__(self):
return iter(self._items)
def __repr__(self):
ret = ""
for x in self._items:
a = ("\n" + repr(getattr(self, x))).split("]")
a[0] += ": " + x
ret += "]".join(a)
return ret
def __setattr__(self, key, value):
# old style
current_value = getattr(self, key, None)
if current_value is None and not key.startswith("_"):
self._items.append(key)
super().__setattr__(key, value)
class _PatchSignalList(_PatchItemList):
def __setattr__(self, key, value):
current_value = getattr(self, key, None)
if isinstance(current_value, bl00mbox.Signal):
current_value.value = value
else:
super().__setattr__(key, value)
class _PatchPluginList(_PatchItemList):
pass
class tinysynth(_Patch):
def __init__(self, chan):
super().__init__(chan)
self.plugins.osc = self._channel.new(bl00mbox.plugins.osc_fm)
self.plugins.env = self._channel.new(bl00mbox.plugins.env_adsr)
self.plugins.amp = self._channel.new(bl00mbox.plugins.ampliverter)
self.plugins.amp.signals.gain = self.plugins.env.signals.output
self.plugins.amp.signals.input = self.plugins.osc.signals.output
self.plugins.env.signals.decay = 500
self.signals.output = self.plugins.amp.signals.output
self.signals.pitch = self.plugins.osc.signals.pitch
self.signals.waveform = self.plugins.osc.signals.waveform
self.signals.trigger = self.plugins.env.signals.trigger
self.signals.attack = self.plugins.env.signals.attack
self.signals.sustain = self.plugins.env.signals.sustain
self.signals.decay = self.plugins.env.signals.decay
self.signals.release = self.plugins.env.signals.release
self.signals.volume = self.plugins.env.signals.input
self.signals.release = 100
class tinysynth_fm(tinysynth):
def __init__(self, chan):
super().__init__(chan)
self.plugins.mod_osc = self._channel.new(bl00mbox.plugins.osc_fm)
self.plugins.mult = self._channel.new(bl00mbox.plugins.multipitch, 1)
self.plugins.mod_osc.signals.output = self.plugins.osc.signals.lin_fm
self.plugins.mod_osc.signals.pitch = self.plugins.mult.signals.output0
self.plugins.osc.signals.pitch = self.plugins.mult.signals.thru
self.signals.fm_waveform = self.plugins.mod_osc.signals.waveform
self.signals.fm = self.plugins.mult.signals.shift0
self.signals.pitch = self.plugins.mult.signals.input
self.signals.decay = 1000
self.signals.attack = 20
self.signals.waveform = -1
self.signals.fm_waveform = 0
self.signals.fm.tone = 3173 / 200
class sampler(_Patch):
"""
requires a wave file (str) or max sample length in milliseconds (int). default path: /sys/samples/
"""
# DEPRECATED
_READ_HEAD_POS = 0 // 2
_WRITE_HEAD_POS = 2 // 2
_SAMPLE_START = 4 // 2
_SAMPLE_LEN = 6 // 2
_SAMPLE_RATE = 8 // 2
_STATUS = 10
_BUFFER = 11
def __init__(self, chan, init_var):
# init can be filename to load into ram
super().__init__(chan)
self.buffer_offset_i16 = 11
self._filename = ""
if type(init_var) == str:
self.plugins.sampler = chan.new(
bl00mbox.plugins.sampler, self._convert_filename(init_var)
)
else:
self.plugins.sampler = chan.new(bl00mbox.plugins.sampler, init_var)
self.signals.trigger = self.plugins.sampler.signals.playback_trigger
self.signals.output = self.plugins.sampler.signals.playback_output
self.signals.rec_in = self.plugins.sampler.signals.record_input
self.signals.rec_trigger = self.plugins.sampler.signals.record_trigger
# terrible backwards compatibility hack, never do that IRL
del self.plugins.sampler.signals._setattr_allowed
self.plugins.sampler.signals.pitch_shift = (
self.plugins.sampler.signals.playback_speed
)
self.plugins.sampler.signals._setattr_allowed = True
def _convert_filename(self, filename):
# TODO: waht if filename doesn't exist?
if filename.startswith("/flash/") or filename.startswith("/sd/"):
return filename
elif filename.startswith("/"):
return "/flash/" + filename
else:
return "/flash/sys/samples/" + filename
def load(self, filename):
filename = self._convert_filename(filename)
try:
self.plugins.sampler.load(filename)
except:
# no proper exception catching bc backwards compat
return False
return True
def save(self, filename, overwrite=False):
filename = self._convert_filename(filename)
try:
self.plugins.sampler.save(filename)
except:
# no proper exception catching bc backwards compat
return False
return True
def _offset_index(self, index):
index += self.sample_start
if index >= self.memory_len:
index -= self.memory_len
index += self.buffer_offset_i16
return index
@property
def memory_len(self):
return self.plugins.sampler.buffer_length
@property
def read_head_position(self):
table = self.plugins.sampler.table_uint32_array
return table[self._READ_HEAD_POS]
@read_head_position.setter
def read_head_position(self, val):
if val >= self.memory_len:
val = self.memory_len - 1
table = self.plugins.sampler.table_uint32_array
table[self._READ_HEAD_POS] = int(val)
@property
def sample_start(self):
table = self.plugins.sampler.table_uint32_array
return table[self._SAMPLE_START]
@sample_start.setter
def sample_start(self, val):
if val >= self.memory_len:
val = self.memory_len - 1
table = self.plugins.sampler.table_uint32_array
table[self._SAMPLE_START] = int(val)
@property
def sample_len(self):
table = self.plugins.sampler.table_uint32_array
return table[self._SAMPLE_LEN]
@sample_len.setter
def sample_len(self, val):
if val > self.memory_len:
val = self.memory_len
table = self.plugins.sampler.table_uint32_array
table[self._SAMPLE_LEN] = int(val)
@property
def sample_rate(self):
table = self.plugins.sampler.table_uint32_array
return table[self._SAMPLE_RATE]
@sample_rate.setter
def sample_rate(self, val):
table = self.plugins.sampler.table_uint32_array
if val < 0:
table[self._SAMPLE_RATE] = int(val)
@property
def filename(self):
return self._filename
@property
def rec_event_autoclear(self):
"""
returns true once after a record cycle has been completed. useful for checking whether a save is necessary if nothing else has modified the table.
"""
a = self.plugins.sampler_table[10]
if a & (1 << 4) & 0xFF:
a = a & ~(1 << 4) & 0xFF
self.plugins.sampler_table[10] = a
return True
return False
class sequencer(_Patch):
def __init__(self, chan, num_tracks, num_steps):
super().__init__(chan)
self.plugins.seq = chan.new(bl00mbox.plugins.sequencer, num_tracks, num_steps)
self.signals.bpm = self.plugins.seq.signals.bpm
self.signals.beat_div = self.plugins.seq.signals.beat_div
self.signals.step = self.plugins.seq.signals.step
self.signals.step_end = self.plugins.seq.signals.step_end
self.signals.step_start = self.plugins.seq.signals.step_start
self.signals.sync_in = self.plugins.seq.signals.sync_in
self.num_tracks = num_tracks
self.num_steps = num_steps
tracktable = [-32767] + ([0] * self.num_steps)
self.plugins.seq.table = tracktable * self.num_tracks
def __repr__(self):
ret = "[patch] step sequencer"
# ret += "\n " + "\n ".join(repr(self.seqs[0]).split("\n"))
ret += (
"\n bpm: "
+ str(self.signals.bpm.value)
+ " @ 1/"
+ str(self.signals.beat_div.value)
)
ret += (
" step: "
+ str(self.signals.step.value - self.signals.step_start.value)
+ "/"
+ str(self.signals.step_end.value - self.signals.step_start.value)
)
ret += "\n [tracks]"
"""
for x, seq in enumerate(self.seqs):
ret += (
"\n "
+ str(x)
+ " [ "
+ "".join(["X " if x > 0 else ". " for x in seq.table[1:]])
+ "]"
)
"""
ret += "\n" + "\n".join(super().__repr__().split("\n")[1:])
return ret
def _get_table_index(self, track, step):
return step + 1 + track * (self.num_steps + 1)
def trigger_start(self, track, step, val=32767):
a = self.plugins.seq.table
a[self._get_table_index(track, step)] = val
self.plugins.seq.table = a
def trigger_stop(self, track, step, val=32767):
a = self.plugins.seq.table
a[self._get_table_index(track, step)] = -1
self.plugins.seq.table = a
def trigger_clear(self, track, step):
a = self.plugins.seq.table
a[self._get_table_index(track, step)] = 0
self.plugins.seq.table = a
def trigger_state(self, track, step):
a = self.plugins.seq.table
return a[self._get_table_index(track, step)]
def trigger_toggle(self, track, step):
if self.trigger_state(track, step) == 0:
self.trigger_start(track, step)
else:
self.trigger_clear(track, step)
class fuzz(_Patch):
# DEPRECATED
def __init__(self, chan):
super().__init__(chan)
self.plugins.dist = chan.new(bl00mbox.plugins._distortion)
self.signals.input = self.plugins.dist.signals.input
self.signals.output = self.plugins.dist.signals.output
self._intensity = 2
self._volume = 32767
self._gate = 0
@property
def intensity(self):
return self._intensity
@intensity.setter
def intensity(self, val):
self._intensity = val
self._update_table()
@property
def volume(self):
return self._volume
@volume.setter
def volume(self, val):
self._volume = val
self._update_table()
@property
def gate(self):
return self._gate
@gate.setter
def gate(self, val):
self._gate = val
self._update_table()
def _update_table(self):
table = list(range(129))
for num in table:
if num < 64:
ret = num / 64 # scale to [0..1[ range
ret = ret**self._intensity
if ret > 1:
ret = 1
table[num] = int(self._volume * (ret - 1))
else:
ret = (128 - num) / 64 # scale to [0..1] range
ret = ret**self._intensity
table[num] = int(self._volume * (1 - ret))
gate = self.gate >> 9
for i in range(64 - gate, 64 + gate):
table[i] = 0
self.plugins.dist.table = table
class karplus_strong(_Patch):
def __init__(self, chan):
super().__init__(chan)
self.plugins.noise = chan._new_plugin(bl00mbox.plugins.noise_burst)
self.plugins.noise.signals.length = 25
self.plugins.flanger = chan._new_plugin(bl00mbox.plugins.flanger)
self.plugins.flanger.signals.resonance = 0
self.plugins.flanger.signals.decay = 1000
self.plugins.flanger.signals.manual.tone = "A2"
self.plugins.flanger.signals.input = self.plugins.noise.signals.output
self.signals.trigger = self.plugins.noise.signals.trigger
self.signals.trigger_length = self.plugins.noise.signals.length
self.signals.pitch = self.plugins.flanger.signals.manual
self.signals.output = self.plugins.flanger.signals.output
self.signals.level = self.plugins.flanger.signals.level
self.signals.decay = self.plugins.flanger.signals.decay
self.signals.resonance = self.plugins.flanger.signals.resonance
@property
def decay(self):
# deprecated
return self.plugins.flanger.signals.decay.value
@decay.setter
def decay(self, val):
# deprecated
self.plugins.flanger.signals.resonance = -2
self.plugins.flanger.signals.decay = int(val)
# SPDX-License-Identifier: CC0-1.0
import sys_bl00mbox
import bl00mbox
import uctypes
import os
import cpython.wave as wave
class _PluginDescriptor:
def __init__(self, index):
self.index = index
self.plugin_id = sys_bl00mbox.plugin_index_get_id(self.index)
self.name = sys_bl00mbox.plugin_index_get_name(self.index)
self.description = sys_bl00mbox.plugin_index_get_description(self.index)
def __repr__(self):
return (
"[plugin "
+ str(self.plugin_id)
+ "] "
+ self.name
+ ": "
+ self.description
)
class _PluginDescriptors:
pass
plugins = _PluginDescriptors()
def _fill():
plugins_list = {}
for i in range(sys_bl00mbox.plugin_registry_num_plugins()):
plugins_list[sys_bl00mbox.plugin_index_get_name(i).replace(" ", "_")] = i
for name, value in plugins_list.items():
setattr(plugins, name, _PluginDescriptor(value))
# legacy
if name == "sequencer" or name == "distortion":
setattr(plugins, "_" + name, _PluginDescriptor(value))
elif name == "sampler":
setattr(plugins, "_sampler_ram", _PluginDescriptor(value))
elif name == "delay_static":
setattr(plugins, "delay", _PluginDescriptor(value))
_fill()
class _Plugin:
_core_keys = (
"always_render",
"delete",
"init_var",
"table_len",
"name",
"plugin_id",
)
def __setattr__(self, key, value):
if key in self._core_keys:
setattr(self._core, key, value)
else:
super().__setattr__(key, value)
def __getattr__(self, key):
if key in self._core_keys:
return getattr(self._core, key)
else:
raise AttributeError(f"'{type(self)}' object has no attribute {key}")
def __init__(self, channel, core=None, plugin_id=None, init_var=0):
if core:
self._core = core
elif plugin_id is not None:
self._core = sys_bl00mbox.PluginCore(channel._core, plugin_id, init_var)
else:
raise ValueError("must supply core or plugin id")
self._signals = bl00mbox.SignalList(self)
def _repr_no_signals(self):
id_num = self._core.id
# id_num 0: special meaning, channel plugin, there can only be one
if id_num:
return f"[{self._core.name} (id={id_num})]"
else:
return f"[{self._core.name}]"
def __repr__(self):
ret = self._repr_no_signals()
for sig in self.signals._list:
ret += "\n " + "\n ".join(sig._no_desc().split("\n"))
return ret
def _check_existence(self):
# will fail if plugin was deleted
_ = self._core.plugin_id
@property
def signals(self):
return self._signals
@property
def table(self):
_table = self.table_int16_array
ret = [None] * self.table_len
for x in range(self.table_len):
ret[x] = _table[x]
return ret
@table.setter
def table(self, data):
_table = self.table_int16_array
for x in range(min(self.table_len, len(data))):
_table[x] = data[x]
@property
def table_pointer(self):
pointer = self._core.table_pointer
max_len = self._core.table_len
return (pointer, max_len)
@property
def table_bytearray(self):
pointer, max_len = self.table_pointer
bytes_len = max_len * 2
return uctypes.bytearray_at(pointer, bytes_len)
@property
def table_int8_array(self):
pointer, max_len = self.table_pointer
descriptor = {"table": (0 | uctypes.ARRAY, max_len * 2 | uctypes.INT8)}
struct = uctypes.struct(pointer, descriptor)
return struct.table
@property
def table_int16_array(self):
pointer, max_len = self.table_pointer
descriptor = {"table": (0 | uctypes.ARRAY, max_len | uctypes.INT16)}
struct = uctypes.struct(pointer, descriptor)
return struct.table
@property
def table_uint32_array(self):
pointer, max_len = self.table_pointer
descriptor = {"table": (0 | uctypes.ARRAY, (max_len // 2) | uctypes.UINT32)}
struct = uctypes.struct(pointer, descriptor)
return struct.table
_plugin_subclasses = {}
def _make_plugin(channel, plugin_id, *args, **kwargs):
if plugin_id in _plugin_subclasses:
return _plugin_subclasses[plugin_id](channel, None, plugin_id, *args, **kwargs)
else:
init_var = 0
_init_var = kwargs.get("init_var", None)
if _init_var is None:
if len(args) == 1:
init_var = int(args[0])
else:
init_var = _init_var
return _Plugin(channel, None, plugin_id, init_var)
def _get_plugin(core):
plugin_id = core.plugin_id
if plugin_id in _plugin_subclasses:
return _plugin_subclasses[plugin_id](None, core, plugin_id, core.init_var)
else:
return _Plugin(None, core, plugin_id, core.init_var)
def _plugin_set_subclass(plugin_id):
def decorator(cls):
_plugin_subclasses[plugin_id] = cls
return cls
return decorator
@_plugin_set_subclass(696969)
class _Sampler(_Plugin):
# divide by two if uint32_t
_READ_HEAD_POS = 0 // 2
_WRITE_HEAD_POS = 2 // 2
_SAMPLE_START = 4 // 2
_SAMPLE_LEN = 6 // 2
_SAMPLE_RATE = 8 // 2
_STATUS = 10
_BUFFER = 11
def __init__(self, channel, core, plugin_id, init_var=1000):
self._filename = ""
if core is not None:
super().__init__(channel, core, None)
self._memory_len = self.init_var
elif type(init_var) is str:
with wave.open(init_var, "r") as f:
self._memory_len = f.getnframes()
super().__init__(channel, None, plugin_id, init_var=self._memory_len)
self.load(init_var)
else:
self._memory_len = int(48 * init_var)
super().__init__(channel, None, plugin_id, init_var=self._memory_len)
def __repr__(self):
ret = super().__repr__()
ret += "\n playback "
if self.playback_loop:
ret += "(looped): "
else:
ret += "(single): "
if self.playback_progress is not None:
dots = int(self.playback_progress * 20)
ret += (
"["
+ "#" * dots
+ "-" * (20 - dots)
+ "] "
+ str(int(self.playback_progress * 100))
+ "%"
)
else:
ret += "idle"
ret += "\n record "
if self.record_overflow:
ret += "(overflow): "
else:
ret += "(autostop): "
if self.record_progress is not None:
dots = int(self.record_progress * 20)
ret += (
"["
+ "#" * dots
+ "-" * (20 - dots)
+ "] "
+ str(int(self.record_progress * 100))
+ "%"
)
else:
ret += "idle"
ret += "\n buffer: " + " " * 11
rel_fill = self.sample_length / self.buffer_length
dots = int(rel_fill * 20)
if dots > 19:
dots = 19
ret += (
"[" + "#" * dots + "-" * (19 - dots) + "] " + str(int(rel_fill * 100)) + "%"
)
if self.filename is not "":
ret += "\n file: " + self.filename
ret += "\n sample rate: " + " " * 6 + str(self.sample_rate)
return ret
def load(self, filename):
with wave.open(filename, "r") as f:
try:
assert f.getsampwidth() == 2
assert f.getnchannels() in (1, 2)
assert f.getcomptype() == "NONE"
except AssertionError:
raise Bl00mboxError("incompatible file format")
frames = f.getnframes()
if frames > self._memory_len:
frames = self._memory_len
self._sample_len_frames = frames
self.sample_rate = f.getframerate()
BUFFER_SIZE = int(48000 * 2.5)
if f.getnchannels() == 1:
# fast path for mono
table = self.table_bytearray
for i in range(
2 * self._BUFFER,
(self._sample_len_frames + self._BUFFER) * 2,
BUFFER_SIZE * 2,
):
table[i : i + BUFFER_SIZE * 2] = f.readframes(BUFFER_SIZE)
else:
# somewhat fast path for stereo
table = self.table_int16_array
for i in range(
self._BUFFER,
self._sample_len_frames + self._BUFFER,
BUFFER_SIZE,
):
frame = f.readframes(BUFFER_SIZE)
for j in range(0, len(frame) // 4):
value = int.from_bytes(frame[4 * j : 4 * j + 2], "little")
table[i + j] = value
self._filename = filename
self._set_status_bit(4, 0)
def save(self, filename):
with wave.open(filename, "w") as f:
f.setnchannels(1)
f.setsampwidth(2)
f.setframerate(self.sample_rate)
start = self._offset_index(0)
end = self._offset_index(self._sample_len_frames)
table = self.table_bytearray
if end > start:
f.writeframes(table[2 * start : 2 * end])
else:
f.writeframes(table[2 * start :])
f.writeframes(table[2 * self._BUFFER : 2 * end])
if self._filename:
self._filename = filename
self._set_status_bit(4, 0)
def _offset_index(self, index):
index += self._sample_start
if index >= self._memory_len:
index -= self._memory_len
index += self._BUFFER
return index
def _set_status_bit(self, bit, val):
table = self.table_int16_array
if val:
table[self._STATUS] = (table[self._STATUS] | (1 << bit)) & 0xFF
else:
table[self._STATUS] = (table[self._STATUS] & ~(1 << bit)) & 0xFF
@property
def filename(self):
if self._get_status_bit(4):
self._filename = ""
self._set_status_bit(4, 0)
return self._filename
def _get_status_bit(self, bit):
table = self.table_int16_array
return bool(table[self._STATUS] & (1 << bit))
@property
def playback_loop(self):
return self._get_status_bit(1)
@playback_loop.setter
def playback_loop(self, val):
self._set_status_bit(1, val)
@property
def record_overflow(self):
return self._get_status_bit(3)
@record_overflow.setter
def record_overflow(self, val):
self._set_status_bit(3, val)
@property
def record_progress(self):
if self._get_status_bit(2):
table = self.table_uint32_array
return table[self._WRITE_HEAD_POS] / self._memory_len
return None
@property
def playback_progress(self):
if self._get_status_bit(0):
table = self.table_uint32_array
return table[0] / table[3]
return None
@property
def _sample_start(self):
table = self.table_uint32_array
return table[2]
@_sample_start.setter
def _sample_start(self, val):
if val >= self._memory_len:
val = self._memory_len - 1
table = self.table_uint32_array
table[2] = int(val)
@property
def _sample_len_frames(self):
table = self.table_uint32_array
return table[self._SAMPLE_LEN]
@_sample_len_frames.setter
def _sample_len_frames(self, val):
val = int(val)
if val > 0:
table = self.table_uint32_array
table[self._SAMPLE_LEN] = val
@property
def sample_length(self):
return self._sample_len_frames
@property
def buffer_length(self):
return self._memory_len
@property
def sample_rate(self):
table = self.table_uint32_array
return table[self._SAMPLE_RATE]
@sample_rate.setter
def sample_rate(self, val):
table = self.table_uint32_array
if int(val) > 0:
table[self._SAMPLE_RATE] = int(val)
@_plugin_set_subclass(9000)
class _Distortion(_Plugin):
def curve_set_power(self, power=2, volume=32767, gate=0):
volume = min(max(volume, -32767), 32767)
table = [0] * 129
for num in range(len(table)):
if num < 64:
ret = num / 64 # scale to [0..1[ range
ret = ret**power
if ret > 1:
ret = 1
table[num] = int(volume * (ret - 1))
else:
ret = (128 - num) / 64 # scale to [0..1] range
ret = ret**power
table[num] = int(volume * (1 - ret))
gate = min(abs(int(gate)), 32767) >> 9
for i in range(64 - gate, 64 + gate):
table[i] = 0
self.table = table
@property
def _secret_sauce(self):
table = self.table_int16_array
return table[129]
@_secret_sauce.setter
def _secret_sauce(self, val):
val = min(max(int(val), 0), 7)
table = self.table_int16_array
table[129] = val
@property
def curve(self):
return self.table[:129]
@curve.setter
def curve(self, points):
# interpolation only implemented for len(points) <= 129,
# for longer lists data may be dropped.
points_size = len(points)
if not points_size:
return
table = [0] * 129
for x, num in enumerate(table):
position = x * (points_size - 1) / 129
lower = int(position)
lerp = position - lower
if position < points_size - 1:
table[x] = int((1 - lerp) * points[lower] + lerp * points[lower + 1])
else:
table[x] = int(points[points_size - 1])
self.table = table
def __repr__(self):
ret = super().__repr__()
wave = self.table[:129]
ret += "\n curve:\n"
ret += " " + "_" * 67 + "\n"
ret += " |" + " " * 67 + "|\n"
symbols = "UW"
symbol_counter = 0
for i in range(15, -1, -1):
line = " | "
for k in range(63):
vals = wave[2 * k : 2 * k + 4]
upper = ((max(vals) >> 8) + 128) >> 4
lower = ((min(vals) >> 8) + 128) >> 4
if (i >= lower) and (i <= upper):
line += symbols[symbol_counter]
symbol_counter = (symbol_counter + 1) % len(symbols)
else:
line += " "
line += " |\n"
ret += line
ret += " |" + "_" * 67 + "|"
return ret
@_plugin_set_subclass(172)
class _PolySqueeze(_Plugin):
def __init__(self, channel, core, plugin_id, num_outputs=3, num_inputs=10):
if core is None:
outs = max(min(num_outputs, 16), 1)
ins = max(min(num_inputs, 32), num_outputs)
init_var = outs + (ins * 256)
super().__init__(channel, core, plugin_id, init_var=init_var)
else:
super().__init__(channel, core, None)
@_plugin_set_subclass(0)
class _Noise(_Plugin):
@property
def speed(self):
if self.signals.speed.value < 0:
return "lfo"
else:
return "audio"
@speed.setter
def speed(self, val):
if val == "lfo":
self.signals.speed.switch.LFO = True
elif val == "audio":
self.signals.speed.switch.AUDIO = True
else:
raise ValueError('speed must be "lfo" or "audio"')
@_plugin_set_subclass(69)
class _RangeShifter(_Plugin):
@property
def speed(self):
val = self.signals.speed.value
if val <= -10922:
return "slow"
elif val < 10922:
return "slow_range"
else:
return "fast"
@speed.setter
def speed(self, val):
if val == "slow":
self.signals.speed.switch.SLOW = True
elif val == "slow_range":
self.signals.speed.switch.SLOW_RANGE = True
elif val == "fast":
self.signals.speed.switch.AUDIO = True
else:
raise ValueError('speed must be "slow", "slow_range" or "fast"')
@_plugin_set_subclass(21)
class _Mixer(_Plugin):
@property
def block_dc(self):
return self.signals.block_dc.value > 0
@block_dc.setter
def block_dc(self, val):
if val:
self.signals.block_dc.switch.ON = True
else:
self.signals.block_dc.switch.OFF = True
@_plugin_set_subclass(420)
class _Osc(_Plugin):
@property
def wave(self):
return tuple([self.table_int8_array[i] for i in range(64)])
@property
def speed(self):
val = self.signals.speed.value
if val < -10922:
return "lfo"
elif val < 10922:
return "auto"
else:
return "audio"
@speed.setter
def speed(self, val):
if val == "lfo":
self.signals.speed.switch.LFO = True
elif val == "auto":
self.signals.speed.switch.AUTO = True
elif val == "audio":
self.signals.speed.switch.AUDIO = True
else:
raise ValueError('speed must be "lfo", "auto" or "audio"')
@property
def antialiasing(self):
return bool(self.table_int8_array[64])
@antialiasing.setter
def antialiasing(self, val):
self.table_int8_array[64] = bool(val)
def __repr__(self):
ret = super().__repr__()
wave = self.wave
ret += "\n wave debug (lazy updates, may get stuck on old data):\n"
ret += " " + "_" * 68 + "\n"
ret += " |" + " " * 68 + "|\n"
symbols = "UW"
symbol_counter = 0
for i in range(15, -1, -1):
line = " | "
for j in range(64):
if j == 0:
upper = wave[63]
else:
upper = wave[j - 1]
upper = (upper + 128) >> 4
lower = (wave[j] + 128) >> 4
if lower > upper:
upper, lower = lower, upper
if (i >= lower) and (i <= upper):
line += symbols[symbol_counter]
symbol_counter = (symbol_counter + 1) % len(symbols)
else:
line += " "
line += " |\n"
ret += line
ret += " |" + "_" * 68 + "|"
return ret
@_plugin_set_subclass(56709)
class _Sequencer(_Plugin):
def __init__(self, channel, core, plugin_id, num_tracks=4, num_steps=16):
if core is None:
self.num_steps = num_steps % 256
self.num_tracks = num_tracks % 256
init_var = (self.num_steps * 256) + (self.num_tracks)
super().__init__(channel, core, plugin_id, init_var=init_var)
tracktable = [-32767] + ([0] * self.num_steps)
self.table = tracktable * self.num_tracks
else:
super().__init__(channel, core, None)
self.num_tracks = self.init_var % 256
self.num_steps = (self.init_var // 256) % 256
def __repr__(self):
ret = super().__repr__()
ret += (
"\n bpm: "
+ str(self.signals.bpm.value)
+ " @ 1/"
+ str(self.signals.beat_div.value)
)
ret += (
" step: "
+ str(self.signals.step.value - self.signals.step_start.value)
+ "/"
+ str(self.signals.step_end.value - self.signals.step_start.value)
)
ret += "\n [tracks]"
for track in range(self.num_tracks):
ret += (
"\n "
+ str(track)
+ " [ "
+ "".join(
[
"X " if self.trigger_state(track, x) > 0 else ". "
for x in range(self.num_steps)
]
)
+ "]"
)
return ret
def _get_table_index(self, track, step):
return step + 1 + track * (self.num_steps + 1)
def trigger_start(self, track, step, val=32767):
if val > 32767:
val = 32767
elif val < 1:
val = 1
table = self.table_int16_array
table[self._get_table_index(track, step)] = val
def trigger_stop(self, track, step):
table = self.table_int16_array
table[self._get_table_index(track, step)] = -1
def trigger_clear(self, track, step):
table = self.table_int16_array
table[self._get_table_index(track, step)] = 0
def trigger_state(self, track, step):
table = self.table_int16_array
return table[self._get_table_index(track, step)]
def trigger_toggle(self, track, step):
if self.trigger_state(track, step) == 0:
self.trigger_start(track, step)
else:
self.trigger_clear(track, step)
def save_track_pattern(self, track_index):
start = track_index * (self.num_steps + 1)
stop = start + self.num_steps + 1
track = {}
table = self.table
if self.table[start] == -32767:
track["type"] = "trigger"
else:
track["type"] = "value"
track["steps"] = list(table[start + 1 : stop])
return track
def load_track_pattern(self, track, track_index):
start = track_index * (self.num_steps + 1)
table = self.table_int16_array
stop = start + 1 + min(self.num_steps, len(track["steps"]))
for i in range(start + 1, stop):
x = track["steps"][i - start - 1]
assert (x < 32768) and (x > -32768)
table[i] = x
if track["type"] == "trigger":
table[start] = -32767
else:
table[start] = 32767
def save_pattern(self):
beat = {}
beat["tracks"] = [self.save_track_pattern(i) for i in range(self.num_tracks)]
return beat
def load_pattern(self, beat):
num_tracks = min(len(beat["tracks"]), self.num_tracks)
[self.load_track_pattern(beat["tracks"][i], i) for i in range(num_tracks)]
@_plugin_set_subclass(38)
class _TriggerMerge(_Plugin):
@property
def block_stop(self):
table = self.table_int16_array
return bool(table[0])
@block_stop.setter
def block_stop(self, val):
table = self.table_int16_array
table[0] = 1 if val else 0
# SPDX-License-Identifier: CC0-1.0
import sys_bl00mbox
# note: consider the 'sys_bl00mbox' api super unstable for now pls :3
import math
import uctypes
import bl00mbox
from bl00mbox import _helpers as helpers
from bl00mbox._patches import _Patch as Patch
class Bl00mboxError(Exception):
pass
def _make_signal(plugin, signal_num=0, core=None):
if core is None:
core = sys_bl00mbox.SignalCore(plugin._core, signal_num)
is_input = bool(core.hints & sys_bl00mbox.SIGNAL_HINT_INPUT)
is_output = bool(core.hints & sys_bl00mbox.SIGNAL_HINT_OUTPUT)
if is_output == is_input:
raise Bl00mboxError("signal must be either input or output")
if is_input:
signal = SignalInput(plugin, core)
else:
signal = SignalOutput(plugin, core)
return signal
class ChannelMixer:
def __init__(self, core):
self._core = core
def __repr__(self):
ret = f"[channel mixer] ({len(self.connections)} connections)"
for con in self.connections:
ret += f"\n {con.name} in {con._plugin._repr_no_signals()}"
return ret
@property
def connections(self):
cons = []
signal_cores = self._core.get_connected_mx()
for core in signal_cores:
plugin = bl00mbox._plugins._get_plugin(core.plugin_core)
cons.append(_make_signal(plugin, core=core))
return cons
class ValueSwitch:
def __init__(self, plugin, core):
self._plugin = plugin
self._core = core
def __setattr__(self, key, value):
if getattr(self, "_locked", False):
if not value:
return
val = getattr(self, key, None)
if val is None:
return
self._core.value = val
else:
super().__setattr__(key, value)
def get_value_name(self, val):
d = dict((k, v) for k, v in self.__dict__.items() if not k.startswith("_"))
for k, v in d.items():
if v == val:
return k
class Signal:
def __init__(self, plugin, core):
self._core = core
self._plugin = plugin
self._mpx = core.mpx
self._unit = core.unit
self._description = core.description
constants = {}
another_round = True
while another_round:
another_round = False
for k, char in enumerate(self._unit):
if char == "{":
for j, stopchar in enumerate(self._unit[k:]):
if stopchar == "}":
const = self._unit[k + 1 : k + j].split(":")
constants[const[0]] = int(const[1])
self._unit = self._unit[:k] + self._unit[k + j + 1 :]
break
another_round = True
break
if constants:
self._unit = self._unit.strip()
self.switch = ValueSwitch(plugin, self._core)
for key in constants:
setattr(self.switch, key, constants[key])
self.switch._locked = True
hint_list = []
if core.hints & sys_bl00mbox.SIGNAL_HINT_INPUT:
hint_list.append("input")
if core.hints & sys_bl00mbox.SIGNAL_HINT_OUTPUT:
hint_list.append("output")
if core.hints & sys_bl00mbox.SIGNAL_HINT_TRIGGER:
hint_list.append("trigger")
if core.hints & sys_bl00mbox.SIGNAL_HINT_SCT:
hint_list.append("pitch")
if core.hints & sys_bl00mbox.SIGNAL_HINT_GAIN:
hint_list.append("gain")
if core.hints & sys_bl00mbox.SIGNAL_HINT_DEPRECATED:
hint_list.append("deprecated")
self._hints = "/".join(hint_list)
@property
def connections(self):
cons = []
signal_cores = self._core.get_connected()
for core in signal_cores:
plugin = bl00mbox._plugins._get_plugin(core.plugin_core)
cons.append(_make_signal(plugin, core=core))
if self._core.connected_mx:
cons.append(ChannelMixer(None))
return cons
def __repr__(self):
ret = self._no_desc()
if len(self._description):
ret += "\n " + "\n ".join(self._description.split("\n"))
return ret
def _no_desc(self):
self._plugin._check_existence()
ret = self.name
if self._mpx != -1:
ret += "[" + str(self._mpx) + "]"
if len(self.unit):
ret += " [" + self.unit + "]"
ret += " [" + self.hints + "]: "
direction = " <?> "
val = self.value
if isinstance(self, SignalInput):
direction = " <= "
if len(self.connections):
val = self.connections[0].value
elif isinstance(self, SignalOutput):
direction = " => "
ret += str(val)
if getattr(self, "switch", None) is not None:
value_name = self.switch.get_value_name(val)
if value_name is not None:
ret += " (" + value_name.lower() + ")"
if self._core.hints & sys_bl00mbox.SIGNAL_HINT_SCT:
ret += " / " + str(self.tone) + " semitones / " + str(self.freq) + "Hz"
if self._core.hints & sys_bl00mbox.SIGNAL_HINT_GAIN:
if self.mult == 0:
ret += " / (mute)"
else:
ret += " / " + str(self.dB) + "dB / x" + str(self.mult)
conret = []
for con in self.connections:
if isinstance(con, Signal):
conret += [f"{direction}{con.name} in {con._plugin._repr_no_signals()}"]
if isinstance(con, ChannelMixer):
conret += [f"{direction}[channel mixer]"]
if len(conret) > 1:
nl = "\n "
ret += nl + nl.join(conret)
elif conret:
ret += conret[0]
return ret
@property
def name(self):
return self._core.name
@property
def description(self):
return self._core.description
@property
def unit(self):
return self._core.unit
@property
def hints(self):
return self._hints
@property
def value(self):
return self._core.value
@property
def tone(self):
return (self.value - (32767 - 2400 * 6)) / 200
@tone.setter
def tone(self, val):
if isinstance(self, SignalInput):
if (type(val) == int) or (type(val) == float):
self.value = (32767 - 2400 * 6) + 200 * val
if type(val) == str:
self.value = helpers.note_name_to_sct(val)
else:
raise AttributeError("can't set output signal")
@property
def freq(self):
tone = (self.value - (32767 - 2400 * 6)) / 200
return 440 * (2 ** (tone / 12))
@freq.setter
def freq(self, val):
if isinstance(self, SignalInput):
tone = 12 * math.log(val / 440, 2)
self.value = (32767 - 2400 * 6) + 200 * tone
else:
raise AttributeError("can't set output signal")
@property
def dB(self):
if self.value == 0:
return -9999
return 20 * math.log((abs(self.value) / 4096), 10)
@dB.setter
def dB(self, val):
if isinstance(self, SignalInput):
self.value = int(4096 * (10 ** (val / 20)))
else:
raise AttributeError("can't set output signal")
@property
def mult(self):
return self.value / 4096
@mult.setter
def mult(self, val):
if isinstance(self, SignalInput):
self.value = int(4096 * val)
else:
raise AttributeError("can't set output signal")
def start(self, velocity=32767):
if self.value > 0:
self.value = -abs(velocity)
else:
self.value = abs(velocity)
def stop(self):
self.value = 0
class SignalOutput(Signal):
@Signal.value.setter
def value(self, val):
if val is None:
self._core.disconnect()
elif isinstance(val, SignalInput):
val << self
elif isinstance(val, ChannelMixer):
self._core.connect_mx()
# fails silently bc of backwards compatibility :/
# we'll deprecate this setter entirely someday, use rshift instead
def __rshift__(self, other):
if not (
isinstance(other, SignalInput)
or isinstance(other, ChannelMixer)
or other is None
):
raise TypeError(f"can't connect SignalOutput to {type(other)}")
self.value = other
def __lshift__(self, other):
raise TypeError("output signals can't receive data from other signals")
class SignalInput(Signal):
@Signal.value.setter
def value(self, val):
if val is None:
self._core.disconnect()
elif isinstance(val, SignalOutput):
self._core.connect(val._core)
elif (type(val) == int) or (type(val) == float):
self._core.value = int(val)
# fails silently bc of backwards compatibility :/
def __lshift__(self, other):
if not (
isinstance(other, SignalOutput)
or type(other) == int
or type(other) == float
or other is None
):
raise TypeError(f"can't connect SignalInput to {type(other)}")
self.value = other
def __rshift__(self, other):
raise TypeError("input signals can't send data to other signals")
class SignalMpxList:
def __init__(self):
self._list = []
self._setattr_allowed = False
def __len__(self):
return len(self._list)
def __getitem__(self, index):
return self._list[index]
def __setitem__(self, index, value):
try:
current_value = self._list[index]
except:
raise AttributeError("index does not exist")
if isinstance(current_value, Signal):
current_value.value = value
else:
raise AttributeError("signal does not exist")
def __iter__(self):
self._iter_index = 0
return self
def __next__(self):
self._iter_index += 1
if self._iter_index >= len(self._list):
raise StopIteration
else:
return self._list[self._iter_index]
def add_new_signal(self, signal, index):
self._setattr_allowed = True
index = int(index)
if len(self._list) <= index:
self._list += [None] * (1 + index - len(self._list))
self._list[index] = signal
self._setattr_allowed = False
def __setattr__(self, key, value):
"""
current_value = getattr(self, key, None)
if isinstance(current_value, Signal):
current_value.value = value
return
"""
if key == "_setattr_allowed" or getattr(self, "_setattr_allowed", True):
super().__setattr__(key, value)
else:
raise AttributeError("signal does not exist")
class SignalList:
def __init__(self, plugin):
self._list = []
for signal_num in range(plugin._core.num_signals):
signal = _make_signal(plugin, signal_num=signal_num)
self._list.append(signal)
name = signal.name.split(" ")[0]
if signal._mpx == -1:
setattr(self, name, signal)
else:
# <LEGACY SUPPORT>
setattr(self, name + str(signal._mpx), signal)
# </LEGACY SUPPORT>
current_list = getattr(self, name, None)
if not isinstance(current_list, SignalMpxList):
setattr(self, name, SignalMpxList())
getattr(self, name, None).add_new_signal(signal, signal._mpx)
self._setattr_allowed = False
def __setattr__(self, key, value):
current_value = getattr(self, key, None)
if isinstance(current_value, Signal):
current_value.value = value
return
elif getattr(self, "_setattr_allowed", True):
super().__setattr__(key, value)
else:
raise AttributeError("signal does not exist")
_channel_init_callback = None
_rms_base = 3 - 10 * math.log(32767 * 32767, 10)
def set_channel_init_callback(callback):
global _channel_init_callback
_channel_init_callback = callback
class Channel:
_core_keys = (
"name",
"num_plugins",
"clear",
"volume",
"foreground",
"background_mute_override",
"callback",
"compute_rms",
"delete",
)
# we are passing through a lot of methods/properties to _core, so why not subclass?
# well, the core needs to run a destructor in the engine when it is garbage collected,
# and we're not so sure if it still does that when subclassed. we never actually tested
# it, but we made bad experiences. also this allows us to hide the sys_ methods from users.
#
# maybe the proper solution would be to move this entire thing to the backend, but if we
# ever wanna play around with fancy things like (de)serialization this might lead to a bigger
# code size. it's fine for now.
def __setattr__(self, key, value):
if key in self._core_keys:
setattr(self._core, key, value)
else:
super().__setattr__(key, value)
def __getattr__(self, key):
# __getattr__ is only fallback, but since we don't allow for keys in _core_keys
# to be stored within Channel this should suffice
if key in self._core_keys:
return getattr(self._core, key)
else:
raise AttributeError(f"'{type(self)}' object has no attribute {key}")
def __init__(self, name="repl"):
# never ever hand out the deep core to somebody else to make sure channel gets gc'd right
self._deep_core = sys_bl00mbox.ChannelCore(name)
self._core = self._deep_core.get_weak()
# also don't hand out self, use this one instead
self._weak_channel = WeakChannel(self._core)
if _channel_init_callback is not None:
_channel_init_callback(self)
self._channel_plugin = bl00mbox._plugins._get_plugin(self._core.channel_plugin)
@property
def signals(self):
return self._channel_plugin._signals
def __repr__(self):
ret = f'[channel "{self.name}"]'
if self.foreground:
ret += " (foreground)"
if self.background_mute_override:
ret += " (background mute override)"
ret += "\n gain: " + str(self.gain_dB) + "dB"
b = self.num_plugins
ret += "\n plugins: " + str(b)
ret += "\n " + "\n ".join(repr(self.mixer).split("\n"))
return ret
@property
def free(self):
# legacy oof
return self._core is None
@free.setter
def free(self, val):
# bigger legacy oof
if val:
self.clear()
self._core = None
def _new_plugin(self, thing, *args, **kwargs):
if isinstance(thing, bl00mbox._plugins._PluginDescriptor):
return bl00mbox._plugins._make_plugin(
self._weak_channel, thing.plugin_id, *args, **kwargs
)
else:
raise TypeError("not a plugin")
def new(self, thing, *args, **kwargs):
self.free = False
if type(thing) == type:
if issubclass(thing, bl00mbox.patches._Patch):
return thing(self._weak_channel, *args, **kwargs)
elif isinstance(thing, bl00mbox._plugins._PluginDescriptor) or (
type(thing) == int
):
return self._new_plugin(thing, *args, **kwargs)
else:
raise TypeError("invalid plugin/patch")
@property
def plugins(self):
for core in self._core.get_plugins():
yield bl00mbox._plugins._get_plugin(core)
@property
def gain_dB(self):
ret = self._core.volume
if ret == 0:
return -math.inf
else:
return 20 * math.log(ret / 32768, 10)
@gain_dB.setter
def gain_dB(self, value):
if value == -math.inf:
value = 0
else:
value = int(32768 * (10 ** (value / 20)))
value = min(32767, max(0, value))
self._core.volume = value
@property
def rms_dB(self):
if self.compute_rms:
ret = self._core.mean_square
if ret == 0:
return -math.inf
else:
# volume is applied together with sys_volume
# after rms is calculated. we do want the output
# before sys_volume but after regular volume,
# so we're compensating here
mult = abs(self._core.volume) / 32768
if mult == 0:
return -math.inf
# squaring because we're getting root later
ret *= mult * mult
return 10 * math.log(ret, 10) + _rms_base
else:
return None
@property
def mixer(self):
return ChannelMixer(self._core)
@mixer.setter
def mixer(self, val):
if isinstance(val, SignalOutput):
val.value = ChannelMixer(self._core)
elif val is not None:
# val is None: backwards compatibility, not allowed to throw exception ;w;
raise Bl00mboxError("can't connect this")
class WeakChannel(Channel):
def __init__(self, core):
self._core = core.get_weak()
self._channel_plugin = bl00mbox._plugins._get_plugin(self._core.channel_plugin)
self._weak_channel = self
class SysChannel(Channel):
def __init__(self, core):
self._deep_core = core
self._core = core.get_weak()
self._channel_plugin = bl00mbox._plugins._get_plugin(self._core.channel_plugin)
self._weak_channel = WeakChannel(self._core)
@property
def sys_gain_dB(self):
ret = self._core.sys_gain
if ret == 0:
return -math.inf
else:
try:
return 20 * math.log(ret / 4096, 10)
except:
return -math.inf
@sys_gain_dB.setter
def sys_gain_dB(self, value):
if value == -math.inf:
value = 0
else:
value = int(4096 * (10 ** (value / 20)))
value = min(32767, max(0, value))
self._core.sys_gain = value
@property
def sys_rms_dB(self):
if self.compute_rms:
ret = self._core.mean_square
if ret == 0:
return -math.inf
else:
mult = self._core.volume / 32768
mult *= self._core.sys_gain / 4096
if mult == 0:
return -math.inf
ret *= mult * mult
return 10 * math.log(ret, 10) + _rms_base
else:
return None
class Sys:
@staticmethod
def clear():
cores = sys_bl00mbox.collect_channels(False)
for core in cores:
core.delete()
@staticmethod
def foreground_clear():
cores = sys_bl00mbox.collect_channels(True)
for core in cores:
core.foreground = False
@staticmethod
def collect_channels(active, weak_ref=False):
cores = sys_bl00mbox.collect_channels(active)
for core in cores:
if weak_ref:
core = core.get_weak()
yield SysChannel(core)
@staticmethod
def collect_active_callbacks():
# Channel callbacks have a strange side effect on garbage collection:
#
# If a Channel has become unreachable but has not been collected yet, this will still fetch the
# ChannelCore object and the attached callback, meaning that from a gc perspective the channel
# "flickers" into reachability on the stack while the callback is fetched. if the callback
# contains the last reference to the channel (as it typically does), this flickering is extended
# for the duration of the callback execution.
#
# If garbage collection is triggered always in that exact moment, the channel will never
# be collected. In practice, this is very unlikely, so it should be fine.
#
# It is still good practice to call .clear() on the Channel before dropping the last
# reference as this will also remove the callback and set defined endpoint to Channel
# audio rendering.
#
# We are relying on implementation details of the garbage collector to ensure memory safety.
# micropython uses primitive conservative tracing gc triggered by some or the other threshold.
# It always sweeps everything it can before it returns control. See channel_core_obj_t for details.
#
# We never heard of this technique before, probably because it's risky. On the off chance we
# invented something here we'll dub this one "gc necromancy". Think of the flickering objects
# as "tiny little ghosts" that get summoned during the midnight hour and can only be gc'd at
# daytime :D. Necromancy is allowed to be risky business ^w^.
#
# hooray programming!
for core in sys_bl00mbox.collect_channels(True):
if core.callback and core.sys_gain:
yield core.callback
// SPDX-License-Identifier: CC0-1.0
#include <stdio.h>
#include "py/obj.h"
#include "py/runtime.h"
#include "bl00mbox.h"
#include "bl00mbox_os.h"
#include "bl00mbox_plugin_registry.h"
#include "bl00mbox_user.h"
#include "radspa.h"
#if !MICROPY_ENABLE_FINALISER
#error "bl00mbox needs finaliser"
#endif
#include "py/objexcept.h"
MP_DEFINE_EXCEPTION(ReferenceError, Exception)
typedef struct _channel_core_obj_t {
mp_obj_base_t base;
// will be NULLed on original if channel has been manually deleted
// both for weak and regular
bl00mbox_channel_t *chan;
// pointer to object that weakly references original channel object
// there is no more than one weak channel reference obj per channel
mp_obj_t weak;
// things we don't want garbage collected:
// - channel plugin
mp_obj_t channel_plugin;
// - list of all sources connect to the mixer
mp_obj_t sources_mx;
#ifdef BL00MBOX_CALLBACKS_FUNSAFE
// the channel may be become reachable after having been unreachable.
// if the callback was collected during this phase it results in a
// use-after-free, we're relying on the fact that if the callback
// was collected the channel must have been collected as well since
// the garbage collector sweeps everything directly after marking.
// conservative false positives for the channel are ok since this
// means the callback can't be collected either.
// see bl00mbox_config.h for more notes on this.
mp_obj_t callback;
#endif
} channel_core_obj_t;
STATIC const mp_obj_type_t channel_core_type;
typedef struct _plugin_core_obj_t {
mp_obj_base_t base;
// important note: the plugin may have been garbage collected along with
// its channel or it might have been deleted, in either case this pointer
// is set to NULL, so you need to check before using it. if you do anything
// that might trigger a gc (like creating objects) you need to check this
// pointer for NULL again.
bl00mbox_plugin_t *plugin;
// list of all source plugins to avoid gc
mp_obj_t sources;
} plugin_core_obj_t;
STATIC const mp_obj_type_t plugin_core_type;
typedef struct _signal_core_obj_t {
mp_obj_base_t base;
mp_obj_t plugin_core;
// same rules for garbage collection as for plugin_core_obj_t apply here
// too, call plugin_core_verify() on plugin_core after each potential
// garbage collection keeping a direct reference to keep the plugin alive as
// long as there's a signal in reach (unless the channel gets gc'd of
// course)
bl00mbox_signal_t signal;
} signal_core_obj_t;
STATIC const mp_obj_type_t signal_core_type;
STATIC mp_obj_t signal_core_make_new(const mp_obj_type_t *type, size_t n_args,
size_t n_kw, const mp_obj_t *args);
static inline void channel_core_verify(channel_core_obj_t *channel_core) {
if (!channel_core->chan)
mp_raise_msg(&mp_type_ReferenceError,
MP_ERROR_TEXT("channel was deleted"));
}
static inline void plugin_core_verify(plugin_core_obj_t *plugin_core) {
if (!plugin_core->plugin)
mp_raise_msg(&mp_type_ReferenceError,
MP_ERROR_TEXT("plugin was deleted"));
}
static void bl00mbox_error_unwrap(bl00mbox_error_t error) {
switch (error) {
case BL00MBOX_ERROR_OOM:
mp_raise_msg(&mp_type_OSError, MP_ERROR_TEXT("out of memory"));
case BL00MBOX_ERROR_INVALID_CONNECTION:
mp_raise_TypeError(MP_ERROR_TEXT("can't connect that"));
case BL00MBOX_ERROR_INVALID_IDENTIFIER:
mp_raise_TypeError(MP_ERROR_TEXT("bad identifier"));
default:;
}
}
void bl00mbox_disconnect_rx_callback(void *rx, uint16_t signal_index) {
plugin_core_obj_t *self = rx;
mp_obj_list_t *list = MP_OBJ_TO_PTR(self->sources);
if (signal_index < list->len) {
list->items[signal_index] = mp_const_none;
} else {
bl00mbox_log_error("plugin signal list too short");
}
}
void bl00mbox_connect_rx_callback(void *rx, void *tx, uint16_t signal_index) {
plugin_core_obj_t *self = rx;
mp_obj_list_t *list = MP_OBJ_TO_PTR(self->sources);
if (signal_index < list->len) {
list->items[signal_index] = MP_OBJ_FROM_PTR(tx);
} else {
bl00mbox_log_error("plugin signal list too short");
}
}
void bl00mbox_disconnect_mx_callback(void *chan, void *tx) {
channel_core_obj_t *self = chan;
// don't have to check for duplicates, bl00mbox_user does that for us
mp_obj_list_remove(self->sources_mx, MP_OBJ_FROM_PTR(tx));
}
void bl00mbox_connect_mx_callback(void *chan, void *tx) {
channel_core_obj_t *self = chan;
// don't have to check for duplicates, bl00mbox_user does that for us
mp_obj_list_append(self->sources_mx, MP_OBJ_FROM_PTR(tx));
}
static char *strdup_raise(char *str) {
char *ret = strdup(str);
if (!ret) bl00mbox_error_unwrap(BL00MBOX_ERROR_OOM);
return ret;
}
static mp_obj_t get_connections_from_array(bl00mbox_array_t *array) {
if (!array) bl00mbox_error_unwrap(BL00MBOX_ERROR_OOM);
if (array->len) {
int len = array->len;
bl00mbox_signal_t *signals[len];
memcpy(&signals, &array->elems, sizeof(void *) * len);
// need to free before we can longjump
free(array);
mp_obj_t elems[len];
int output_index = 0;
for (int i = 0; i < len; i++) {
plugin_core_obj_t *core = signals[i]->plugin->parent;
// gc can happen during this loop. the objects we created are safe
// as they are on the stack, but unprocessed signals might've been
// collected
if (!core->plugin) continue;
mp_obj_t args[2] = { MP_OBJ_FROM_PTR(core),
mp_obj_new_int(signals[i]->index) };
elems[output_index] =
signal_core_make_new(&signal_core_type, 2, 0, args);
output_index++;
}
return mp_obj_new_list(output_index, elems);
} else {
free(array);
return mp_obj_new_list(0, NULL);
}
}
static mp_obj_t get_plugins_from_array(bl00mbox_array_t *array) {
if (!array) bl00mbox_error_unwrap(BL00MBOX_ERROR_OOM);
if (array->len) {
int len = array->len;
bl00mbox_plugin_t *plugins[len];
memcpy(&plugins, &array->elems, sizeof(void *) * len);
// need to free before we can longjump
free(array);
mp_obj_t elems[len];
int output_index = 0;
for (int i = 0; i < len; i++) {
plugin_core_obj_t *core = plugins[i]->parent;
// gc shouldn't happen during this loop but we keep it in anyways :3
if (!core->plugin) continue;
elems[output_index] = MP_OBJ_FROM_PTR(core);
output_index++;
}
return mp_obj_new_list(output_index, elems);
} else {
free(array);
return mp_obj_new_list(0, NULL);
}
}
// ========================
// PLUGIN REGISTRY
// ========================
// TODO: clean this up
STATIC mp_obj_t mp_plugin_index_get_id(mp_obj_t index) {
radspa_descriptor_t *desc =
bl00mbox_plugin_registry_get_descriptor_from_index(
mp_obj_get_int(index));
if (desc == NULL) return mp_const_none;
return mp_obj_new_int(desc->id);
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(mp_plugin_index_get_id_obj,
mp_plugin_index_get_id);
STATIC mp_obj_t mp_plugin_index_get_name(mp_obj_t index) {
/// prints name
radspa_descriptor_t *desc =
bl00mbox_plugin_registry_get_descriptor_from_index(
mp_obj_get_int(index));
if (desc == NULL) return mp_const_none;
return mp_obj_new_str(desc->name, strlen(desc->name));
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(mp_plugin_index_get_name_obj,
mp_plugin_index_get_name);
STATIC mp_obj_t mp_plugin_index_get_description(mp_obj_t index) {
/// prints name
radspa_descriptor_t *desc =
bl00mbox_plugin_registry_get_descriptor_from_index(
mp_obj_get_int(index));
if (desc == NULL) return mp_const_none;
return mp_obj_new_str(desc->description, strlen(desc->description));
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(mp_plugin_index_get_description_obj,
mp_plugin_index_get_description);
STATIC mp_obj_t mp_plugin_registry_num_plugins(void) {
return mp_obj_new_int(bl00mbox_plugin_registry_get_plugin_num());
}
STATIC MP_DEFINE_CONST_FUN_OBJ_0(mp_plugin_registry_num_plugins_obj,
mp_plugin_registry_num_plugins);
// ========================
// CHANNELS
// ========================
static mp_obj_t create_channel_plugin(bl00mbox_channel_t *chan) {
plugin_core_obj_t *self = m_new_obj_with_finaliser(plugin_core_obj_t);
self->base.type = &plugin_core_type;
self->plugin = chan->channel_plugin;
self->plugin->parent = self;
self->plugin->parent_self_ref = &self->plugin;
size_t num_signals = self->plugin->rugin->len_signals;
mp_obj_t nones[num_signals];
for (size_t i = 0; i < num_signals; i++) nones[i] = mp_const_none;
self->sources = mp_obj_new_list(num_signals, nones);
return MP_OBJ_FROM_PTR(self);
}
static mp_obj_t create_weak_channel_core(channel_core_obj_t *self) {
channel_core_obj_t *weak = m_new_obj_with_finaliser(channel_core_obj_t);
weak->base.type = &channel_core_type;
weak->weak = MP_OBJ_FROM_PTR(weak);
weak->chan = self->chan;
weak->chan->weak_parent_self_ref = &weak->chan;
// only the real channel should prevent these from being gc'd
weak->sources_mx = MP_OBJ_NULL;
weak->channel_plugin = MP_OBJ_NULL;
#ifdef BL00MBOX_CALLBACKS_FUNSAFE
weak->callback = MP_OBJ_NULL;
#endif
return MP_OBJ_FROM_PTR(weak);
}
STATIC mp_obj_t channel_core_make_new(const mp_obj_type_t *type, size_t n_args,
size_t n_kw, const mp_obj_t *args) {
mp_arg_check_num(n_args, n_kw, 1, 1, false);
channel_core_obj_t *self = m_new_obj_with_finaliser(channel_core_obj_t);
self->chan = NULL; // must initialize if obj gets gc'd due to exception
self->base.type = &channel_core_type;
self->sources_mx = mp_obj_new_list(0, NULL);
const char *name = strdup_raise(mp_obj_str_get_str(args[0]));
bl00mbox_channel_t *chan = bl00mbox_channel_create();
if (!chan) bl00mbox_error_unwrap(BL00MBOX_ERROR_OOM);
chan->name = name;
chan->parent_self_ref = &self->chan;
chan->parent = self;
self->chan = chan;
self->weak = create_weak_channel_core(self);
self->channel_plugin = create_channel_plugin(chan);
#ifdef BL00MBOX_CALLBACKS_FUNSAFE
self->callback = mp_const_none;
#endif
bl00mbox_log_info("created channel %s", chan->name);
return MP_OBJ_FROM_PTR(self);
}
STATIC mp_obj_t mp_channel_core_get_weak(mp_obj_t self_in) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
channel_core_verify(self);
return self->weak;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_channel_core_get_weak_obj,
mp_channel_core_get_weak);
mp_obj_t mp_channel_core_del(mp_obj_t self_in) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
if (self->chan && (self->weak != self_in)) {
bl00mbox_log_info("destroyed channel %s", self->chan->name);
bl00mbox_channel_destroy(self->chan);
}
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_channel_core_del_obj, mp_channel_core_del);
mp_obj_t mp_channel_core_delete(mp_obj_t self_in) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
channel_core_verify(self);
bl00mbox_log_info("destroyed channel %s", self->chan->name);
bl00mbox_channel_destroy(self->chan);
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_channel_core_delete_obj, mp_channel_core_delete);
mp_obj_t mp_channel_core_clear(mp_obj_t self_in) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
channel_core_verify(self);
self->callback = mp_const_none;
bl00mbox_channel_clear(self->chan);
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_channel_core_clear_obj, mp_channel_core_clear);
STATIC mp_obj_t mp_channel_core_get_connected_mx(mp_obj_t self_in) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
channel_core_verify(self);
bl00mbox_array_t *array =
bl00mbox_channel_collect_connections_mx(self->chan);
return get_connections_from_array(array);
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_channel_core_get_connected_mx_obj,
mp_channel_core_get_connected_mx);
STATIC mp_obj_t mp_channel_core_get_plugins(mp_obj_t self_in) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
channel_core_verify(self);
bl00mbox_array_t *array = bl00mbox_channel_collect_plugins(self->chan);
return get_plugins_from_array(array);
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_channel_core_get_plugins_obj,
mp_channel_core_get_plugins);
STATIC void channel_core_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
channel_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
bl00mbox_channel_t *chan = self->chan;
// if gc_sweep tries to load __del__ we mustn't raise exceptions, setjmp
// isn't set up
if (attr != MP_QSTR___del__) channel_core_verify(self);
if (dest[0] != MP_OBJ_NULL) {
bool attr_exists = true;
if (attr == MP_QSTR_volume) {
int gain = abs(mp_obj_get_int(dest[1]));
chan->volume = gain > 32767 ? 32767 : gain;
} else if (attr == MP_QSTR_sys_gain) {
int gain = abs(mp_obj_get_int(dest[1]));
chan->sys_gain = gain > 32767 ? 32767 : gain;
} else if (attr == MP_QSTR_compute_rms) {
chan->compute_rms = mp_obj_is_true(dest[1]);
if (!chan->compute_rms) chan->mean_square = 0;
} else if (attr == MP_QSTR_background_mute_override) {
bl00mbox_channel_set_background_mute_override(
chan, mp_obj_is_true(dest[1]));
} else if (attr == MP_QSTR_foreground) {
bl00mbox_channel_set_foreground(chan, mp_obj_is_true(dest[1]));
#ifdef BL00MBOX_CALLBACKS_FUNSAFE
} else if (attr == MP_QSTR_callback) {
((channel_core_obj_t *)chan->parent)->callback = dest[1];
#endif
} else {
attr_exists = false;
}
if (attr_exists) dest[0] = MP_OBJ_NULL;
} else {
if (attr == MP_QSTR_volume) {
dest[0] = mp_obj_new_int(chan->volume);
} else if (attr == MP_QSTR_sys_gain) {
dest[0] = mp_obj_new_int(chan->sys_gain);
} else if (attr == MP_QSTR_mean_square) {
dest[0] = mp_obj_new_int(chan->mean_square);
} else if (attr == MP_QSTR_compute_rms) {
dest[0] = mp_obj_new_bool(chan->compute_rms);
} else if (attr == MP_QSTR_name) {
char *ret = strdup_raise(chan->name);
dest[0] = mp_obj_new_str(ret, strlen(ret));
free(ret);
} else if (attr == MP_QSTR_channel_plugin) {
dest[0] = ((channel_core_obj_t *)chan->parent)->channel_plugin;
} else if (attr == MP_QSTR_callback) {
#ifdef BL00MBOX_CALLBACKS_FUNSAFE
dest[0] = ((channel_core_obj_t *)chan->parent)->callback;
#else
dest[0] = mp_const_none;
#endif
} else if (attr == MP_QSTR_background_mute_override) {
dest[0] = mp_obj_new_bool(chan->background_mute_override);
} else if (attr == MP_QSTR_foreground) {
dest[0] = mp_obj_new_bool(bl00mbox_channel_get_foreground(chan));
} else if (attr == MP_QSTR_num_plugins) {
dest[0] = mp_obj_new_int(bl00mbox_channel_plugins_num(chan));
} else if (attr == MP_QSTR_num_conns) {
dest[0] = mp_obj_new_int(bl00mbox_channel_conns_num(chan));
} else if (attr == MP_QSTR_num_mixer) {
dest[0] = mp_obj_new_int(bl00mbox_channel_mixer_num(chan));
} else {
dest[1] = MP_OBJ_SENTINEL;
}
}
}
STATIC const mp_rom_map_elem_t channel_core_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&mp_channel_core_del_obj) },
{ MP_ROM_QSTR(MP_QSTR_delete), MP_ROM_PTR(&mp_channel_core_delete_obj) },
{ MP_ROM_QSTR(MP_QSTR_clear), MP_ROM_PTR(&mp_channel_core_clear_obj) },
{ MP_ROM_QSTR(MP_QSTR_get_weak),
MP_ROM_PTR(&mp_channel_core_get_weak_obj) },
{ MP_ROM_QSTR(MP_QSTR_get_connected_mx),
MP_ROM_PTR(&mp_channel_core_get_connected_mx_obj) },
{ MP_ROM_QSTR(MP_QSTR_get_plugins),
MP_ROM_PTR(&mp_channel_core_get_plugins_obj) },
};
STATIC MP_DEFINE_CONST_DICT(channel_core_locals_dict,
channel_core_locals_dict_table);
STATIC MP_DEFINE_CONST_OBJ_TYPE(channel_core_type, MP_QSTR_ChannelCore,
MP_TYPE_FLAG_NONE, make_new,
channel_core_make_new, locals_dict,
&channel_core_locals_dict, attr,
channel_core_attr);
static mp_obj_t mp_collect_channels(mp_obj_t active_in) {
// critical phase: make sure to not trigger any garbage collection until
// these are on the stack!!
bl00mbox_array_t *chans =
bl00mbox_collect_channels(mp_obj_is_true(active_in));
mp_obj_t ret = MP_OBJ_NULL;
if (chans && chans->len) {
mp_obj_t elems[chans->len];
size_t output_index = 0;
for (size_t input_index = 0; input_index < chans->len; input_index++) {
bl00mbox_channel_t *chan = chans->elems[input_index];
channel_core_obj_t *core = chan->parent;
if (core->base.type != &channel_core_type) {
bl00mbox_log_error("channel core corrupted");
continue;
}
elems[output_index] = MP_OBJ_FROM_PTR(core);
output_index++;
}
ret = mp_obj_new_list(output_index, elems);
} else {
ret = mp_obj_new_list(0, NULL);
}
free(chans);
return ret;
}
STATIC MP_DEFINE_CONST_FUN_OBJ_1(mp_collect_channels_obj, mp_collect_channels);
// ========================
// PLUGINS
// ========================
STATIC mp_obj_t plugin_core_make_new(const mp_obj_type_t *type, size_t n_args,
size_t n_kw, const mp_obj_t *args) {
mp_arg_check_num(n_args, n_kw, 3, 3, false);
plugin_core_obj_t *self = m_new_obj_with_finaliser(plugin_core_obj_t);
self->base.type = &plugin_core_type;
channel_core_obj_t *chan_core = MP_OBJ_TO_PTR(args[0]);
if (chan_core->base.type != &channel_core_type) {
mp_raise_TypeError(MP_ERROR_TEXT("first argument must be ChannelCore"));
}
channel_core_verify(chan_core);
bl00mbox_channel_t *chan = chan_core->chan;
int plugin_kind = mp_obj_get_int(args[1]);
int init_var = mp_obj_get_int(args[2]);
bl00mbox_plugin_t *plugin =
bl00mbox_plugin_create(chan, plugin_kind, init_var);
if (!plugin) bl00mbox_error_unwrap(BL00MBOX_ERROR_OOM);
self->plugin = plugin;
plugin->parent = self;
plugin->parent_self_ref = &self->plugin;
// create empty source list
size_t num_signals = plugin->rugin->len_signals;
mp_obj_t nones[num_signals];
for (size_t i = 0; i < num_signals; i++) nones[i] = mp_const_none;
self->sources = mp_obj_new_list(num_signals, nones);
bl00mbox_log_info("created plugin %s",
self->plugin->rugin->descriptor->name);
return MP_OBJ_FROM_PTR(self);
}
mp_obj_t mp_plugin_core_del(mp_obj_t self_in) {
plugin_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
// do not verify here as it will result in prints from the gc if the channel
// object was already collected. don't gc the channel plugin, it is deleted
// with the channel.
if (self->plugin &&
(self->plugin->rugin->descriptor->id != BL00MBOX_CHANNEL_PLUGIN_ID)) {
bl00mbox_log_info("deleted plugin %s",
self->plugin->rugin->descriptor->name);
bl00mbox_plugin_destroy(self->plugin);
}
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_plugin_core_del_obj, mp_plugin_core_del);
mp_obj_t mp_plugin_core_delete(mp_obj_t self_in) {
plugin_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
plugin_core_verify(self);
if (self->plugin->rugin->descriptor->id == BL00MBOX_CHANNEL_PLUGIN_ID) {
mp_raise_TypeError(
MP_ERROR_TEXT("cannot manually delete channel plugin"));
}
bl00mbox_log_info("deleted plugin");
bl00mbox_plugin_destroy(self->plugin);
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_plugin_core_delete_obj, mp_plugin_core_delete);
STATIC void plugin_core_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
plugin_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
bl00mbox_plugin_t *plugin = self->plugin;
// if gc_sweep tries to load __del__ we mustn't raise exceptions, setjmp
// isn't set up
if (attr != MP_QSTR___del__) plugin_core_verify(self);
if (dest[0] != MP_OBJ_NULL) {
if (attr == MP_QSTR_always_render) {
bl00mbox_plugin_set_always_render(plugin, mp_obj_is_true(dest[1]));
dest[0] = MP_OBJ_NULL;
}
} else {
if (attr == MP_QSTR_name) {
// no need to strdup here, descriptors don't die
char *ret = plugin->rugin->descriptor->name;
dest[0] = mp_obj_new_str(ret, strlen(ret));
} else if (attr == MP_QSTR_id) {
dest[0] = mp_obj_new_int(plugin->id);
} else if (attr == MP_QSTR_plugin_id) {
dest[0] = mp_obj_new_int(plugin->rugin->descriptor->id);
} else if (attr == MP_QSTR_init_var) {
dest[0] = mp_obj_new_int(plugin->init_var);
} else if (attr == MP_QSTR_num_signals) {
dest[0] = mp_obj_new_int(plugin->rugin->len_signals);
} else if (attr == MP_QSTR_always_render) {
dest[0] = mp_obj_new_bool(plugin->always_render);
} else if (attr == MP_QSTR_table_len) {
dest[0] = mp_obj_new_int(plugin->rugin->plugin_table_len);
} else if (attr == MP_QSTR_table_pointer) {
// if there's no table this returns 0, which is fine by us
int16_t *ret = plugin->rugin->plugin_table;
dest[0] = mp_obj_new_int_from_uint((uint32_t)ret);
} else {
dest[1] = MP_OBJ_SENTINEL;
}
}
}
STATIC const mp_rom_map_elem_t plugin_core_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR___del__), MP_ROM_PTR(&mp_plugin_core_del_obj) },
{ MP_ROM_QSTR(MP_QSTR_delete), MP_ROM_PTR(&mp_plugin_core_delete_obj) },
};
STATIC MP_DEFINE_CONST_DICT(plugin_core_locals_dict,
plugin_core_locals_dict_table);
STATIC MP_DEFINE_CONST_OBJ_TYPE(plugin_core_type, MP_QSTR_PluginCore,
MP_TYPE_FLAG_NONE, make_new,
plugin_core_make_new, locals_dict,
&plugin_core_locals_dict, attr,
plugin_core_attr);
// ========================
// SIGNALS
// ========================
STATIC mp_obj_t signal_core_make_new(const mp_obj_type_t *type, size_t n_args,
size_t n_kw, const mp_obj_t *args) {
mp_arg_check_num(n_args, n_kw, 2, 2, false);
plugin_core_obj_t *plugin_core = MP_OBJ_TO_PTR(args[0]);
if (plugin_core->base.type != &plugin_core_type) {
mp_raise_TypeError(MP_ERROR_TEXT("first argument must be PluginCore"));
}
// do this before verification
signal_core_obj_t *self = m_new_obj(signal_core_obj_t);
int index = mp_obj_get_int(args[1]);
plugin_core_verify(plugin_core);
radspa_t *rugin = plugin_core->plugin->rugin;
if (index >= rugin->len_signals) {
mp_raise_msg(&mp_type_IndexError,
MP_ERROR_TEXT("signal index out of range"));
}
self->base.type = &signal_core_type;
self->plugin_core = plugin_core;
self->signal.rignal = &rugin->signals[index];
self->signal.plugin = plugin_core->plugin;
self->signal.index = index;
return MP_OBJ_FROM_PTR(self);
}
STATIC mp_obj_t mp_signal_core_connect(mp_obj_t self_in, mp_obj_t other_in) {
signal_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
plugin_core_verify(MP_OBJ_TO_PTR(self->plugin_core));
signal_core_obj_t *other = MP_OBJ_TO_PTR(other_in);
plugin_core_verify(MP_OBJ_TO_PTR(other->plugin_core));
bl00mbox_error_unwrap(
bl00mbox_signal_connect(&self->signal, &other->signal));
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_2(mp_signal_core_connect_obj, mp_signal_core_connect);
// legacy support
STATIC mp_obj_t mp_signal_core_connect_mx(mp_obj_t self_in) {
signal_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
plugin_core_verify(MP_OBJ_TO_PTR(self->plugin_core));
bl00mbox_error_unwrap(bl00mbox_signal_connect_mx(&self->signal));
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_signal_core_connect_mx_obj,
mp_signal_core_connect_mx);
STATIC mp_obj_t mp_signal_core_disconnect(mp_obj_t self_in) {
signal_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
plugin_core_verify(MP_OBJ_TO_PTR(self->plugin_core));
bl00mbox_signal_disconnect(&self->signal);
return mp_const_none;
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_signal_core_disconnect_obj,
mp_signal_core_disconnect);
STATIC mp_obj_t mp_signal_core_get_connected(mp_obj_t self_in) {
signal_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
plugin_core_verify(MP_OBJ_TO_PTR(self->plugin_core));
bl00mbox_array_t *array =
bl00mbox_signal_collect_connections(&self->signal);
return get_connections_from_array(array);
}
MP_DEFINE_CONST_FUN_OBJ_1(mp_signal_core_get_connected_obj,
mp_signal_core_get_connected);
STATIC void signal_core_attr(mp_obj_t self_in, qstr attr, mp_obj_t *dest) {
signal_core_obj_t *self = MP_OBJ_TO_PTR(self_in);
plugin_core_obj_t *plugin_core = MP_OBJ_TO_PTR(self->plugin_core);
bl00mbox_signal_t *signal = &self->signal;
radspa_signal_t *rignal = signal->rignal;
// if gc_sweep tries to load __del__ we mustn't raise exceptions, setjmp
// isn't set up
if (attr != MP_QSTR___del__) plugin_core_verify(plugin_core);
if (dest[0] != MP_OBJ_NULL) {
if (attr == MP_QSTR_value) {
if (bl00mbox_signal_is_input(signal)) {
bl00mbox_error_unwrap(
bl00mbox_signal_set_value(signal, mp_obj_get_int(dest[1])));
dest[0] = MP_OBJ_NULL;
}
}
} else {
if (attr == MP_QSTR_index) {
dest[0] = mp_obj_new_int(signal->index);
} else if (attr == MP_QSTR_plugin_core) {
dest[0] = plugin_core;
} else if (attr == MP_QSTR_name) {
char *ret = strdup_raise(rignal->name);
dest[0] = mp_obj_new_str(ret, strlen(ret));
free(ret);
} else if (attr == MP_QSTR_mpx) {
dest[0] = mp_obj_new_int(rignal->name_multiplex);
} else if (attr == MP_QSTR_description) {
char *ret = strdup_raise(rignal->description);
dest[0] = mp_obj_new_str(ret, strlen(ret));
free(ret);
} else if (attr == MP_QSTR_unit) {
char *ret = strdup_raise(rignal->unit);
dest[0] = mp_obj_new_str(ret, strlen(ret));
free(ret);
} else if (attr == MP_QSTR_value) {
dest[0] = mp_obj_new_int(bl00mbox_signal_get_value(signal));
} else if (attr == MP_QSTR_connected_mx) {
bool ret = false;
if (bl00mbox_signal_is_output(signal)) {
bl00mbox_connection_t *conn =
bl00mbox_connection_from_signal(signal);
if (conn) ret = conn->connected_to_mixer;
}
dest[0] = mp_obj_new_bool(ret);
} else if (attr == MP_QSTR_hints) {
// this should maybe be uint someday :>
dest[0] = mp_obj_new_int(rignal->hints);
} else {
dest[1] = MP_OBJ_SENTINEL;
}
}
}
STATIC const mp_rom_map_elem_t signal_core_locals_dict_table[] = {
{ MP_ROM_QSTR(MP_QSTR_connect), MP_ROM_PTR(&mp_signal_core_connect_obj) },
{ MP_ROM_QSTR(MP_QSTR_connect_mx),
MP_ROM_PTR(&mp_signal_core_connect_mx_obj) },
{ MP_ROM_QSTR(MP_QSTR_disconnect),
MP_ROM_PTR(&mp_signal_core_disconnect_obj) },
{ MP_ROM_QSTR(MP_QSTR_get_connected),
MP_ROM_PTR(&mp_signal_core_get_connected_obj) },
};
STATIC MP_DEFINE_CONST_DICT(signal_core_locals_dict,
signal_core_locals_dict_table);
STATIC MP_DEFINE_CONST_OBJ_TYPE(signal_core_type, MP_QSTR_SignalCore,
MP_TYPE_FLAG_NONE, make_new,
signal_core_make_new, locals_dict,
&signal_core_locals_dict, attr,
signal_core_attr);
STATIC const mp_rom_map_elem_t bl00mbox_globals_table[] = {
{ MP_OBJ_NEW_QSTR(MP_QSTR___name__),
MP_OBJ_NEW_QSTR(MP_QSTR_sys_bl00mbox) },
// PLUGIN REGISTRY
{ MP_ROM_QSTR(MP_QSTR_plugin_registry_num_plugins),
MP_ROM_PTR(&mp_plugin_registry_num_plugins_obj) },
{ MP_ROM_QSTR(MP_QSTR_plugin_index_get_id),
MP_ROM_PTR(&mp_plugin_index_get_id_obj) },
{ MP_ROM_QSTR(MP_QSTR_plugin_index_get_name),
MP_ROM_PTR(&mp_plugin_index_get_name_obj) },
{ MP_ROM_QSTR(MP_QSTR_plugin_index_get_description),
MP_ROM_PTR(&mp_plugin_index_get_description_obj) },
// CHANNELS
{ MP_OBJ_NEW_QSTR(MP_QSTR_ChannelCore), (mp_obj_t)&channel_core_type },
{ MP_ROM_QSTR(MP_QSTR_collect_channels),
MP_ROM_PTR(&mp_collect_channels_obj) },
// PLUGINS
{ MP_OBJ_NEW_QSTR(MP_QSTR_PluginCore), (mp_obj_t)&plugin_core_type },
// SIGNALS
{ MP_OBJ_NEW_QSTR(MP_QSTR_SignalCore), (mp_obj_t)&signal_core_type },
// CONSTANTS
{ MP_ROM_QSTR(MP_QSTR_SIGNAL_HINT_OUTPUT),
MP_ROM_INT(RADSPA_SIGNAL_HINT_OUTPUT) },
{ MP_ROM_QSTR(MP_QSTR_SIGNAL_HINT_INPUT),
MP_ROM_INT(RADSPA_SIGNAL_HINT_INPUT) },
{ MP_ROM_QSTR(MP_QSTR_SIGNAL_HINT_SCT),
MP_ROM_INT(RADSPA_SIGNAL_HINT_SCT) },
{ MP_ROM_QSTR(MP_QSTR_SIGNAL_HINT_GAIN),
MP_ROM_INT(RADSPA_SIGNAL_HINT_GAIN) },
{ MP_ROM_QSTR(MP_QSTR_SIGNAL_HINT_TRIGGER),
MP_ROM_INT(RADSPA_SIGNAL_HINT_TRIGGER) },
{ MP_ROM_QSTR(MP_QSTR_SIGNAL_HINT_DEPRECATED),
MP_ROM_INT(RADSPA_SIGNAL_HINT_DEPRECATED) },
{ MP_ROM_QSTR(MP_QSTR_BL00MBOX_CHANNEL_PLUGIN_ID),
MP_ROM_INT(BL00MBOX_CHANNEL_PLUGIN_ID) },
{ MP_ROM_QSTR(MP_QSTR_ReferenceError),
MP_ROM_PTR(&mp_type_ReferenceError) },
};
STATIC MP_DEFINE_CONST_DICT(mp_module_bl00mbox_globals, bl00mbox_globals_table);
const mp_obj_module_t bl00mbox_user_cmodule = {
.base = { &mp_type_module },
.globals = (mp_obj_dict_t *)&mp_module_bl00mbox_globals,
};
MP_REGISTER_MODULE(MP_QSTR_sys_bl00mbox, bl00mbox_user_cmodule);
#include "bl00mbox_channel_plugin.h"
#include "bl00mbox_config.h"
radspa_descriptor_t bl00mbox_channel_plugin_desc = {
.name = "channel",
.id = BL00MBOX_CHANNEL_PLUGIN_ID,
.description = "bl00mbox channel signals",
.create_plugin_instance = bl00mbox_channel_plugin_create,
.destroy_plugin_instance = radspa_standard_plugin_destroy
};
// TODO: at this point we CANNOT just forward the same buffer pointer to every client because the buffer
// pointer serves as a channel-specific source plugin identifier, so we need to memcpy the line in.
// this could be faster at some point by adding a special case but we don't wanna think about it now.
typedef struct {
int16_t buffer[BL00MBOX_MAX_BUFFER_LEN];
uint32_t buffer_render_pass_id;
int16_t value;
int16_t value_render_pass_id;
} line_in_singleton_t;
static line_in_singleton_t line_in;
static inline void update_line_in_buffer(uint16_t num_samples, uint32_t render_pass_id){
if(line_in.buffer_render_pass_id == render_pass_id) return;
for(uint16_t i = 0; i < num_samples; i++){
int32_t val = bl00mbox_line_in_interlaced[2*i];
val += bl00mbox_line_in_interlaced[2*i+1];
val = radspa_clip(val>>1);
line_in.buffer[i] = val;
}
line_in.value = line_in.buffer[0];
line_in.value_render_pass_id = render_pass_id;
line_in.buffer_render_pass_id = render_pass_id;
}
static inline void update_line_in_value(uint32_t render_pass_id){
if(line_in.value_render_pass_id == render_pass_id) return;
int32_t val = bl00mbox_line_in_interlaced[0];
val += bl00mbox_line_in_interlaced[1];
val = radspa_clip(val>>1);
line_in.value = val;
line_in.value_render_pass_id = render_pass_id;
}
void bl00mbox_channel_plugin_run(radspa_t * channel_plugin, uint16_t num_samples, uint32_t render_pass_id){
if(!bl00mbox_line_in_interlaced) return;
// handle line in signal, only render full if requested
radspa_signal_t * input_sig = radspa_signal_get_by_index(channel_plugin, 0);
if(input_sig->buffer){
update_line_in_buffer(num_samples, render_pass_id);
memcpy(input_sig->buffer, line_in.buffer, sizeof(int16_t) * num_samples);
}
// do NOT render output here, if somebody has requested channel this would be too early
// we're accessing the source buffer directly if present to avoid needing memcpy
}
void bl00mbox_channel_plugin_update_values(radspa_t * channel_plugin, uint32_t render_pass_id){
update_line_in_value(render_pass_id);
radspa_signal_t * input_sig = radspa_signal_get_by_index(channel_plugin, 0);
input_sig->value = line_in.value;
}
radspa_t * bl00mbox_channel_plugin_create(uint32_t init_var){
radspa_t * channel_plugin = radspa_standard_plugin_create(&bl00mbox_channel_plugin_desc, 2, 0, 0);
if(!channel_plugin) return NULL;
channel_plugin->render = bl00mbox_channel_plugin_run;
radspa_signal_set(channel_plugin, 0, "line_in", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(channel_plugin, 1, "line_out", RADSPA_SIGNAL_HINT_INPUT, 0);
return channel_plugin;
}
#pragma once
#include "radspa.h"
#include "radspa_helpers.h"
// SPECIAL REQUIREMENTS
#include "bl00mbox_audio.h"
#define BL00MBOX_CHANNEL_PLUGIN_ID 4002
extern radspa_descriptor_t bl00mbox_channel_plugin_desc;
radspa_t * bl00mbox_channel_plugin_create(uint32_t init_var);
void bl00mbox_channel_plugin_run(radspa_t * channel_plugin, uint16_t num_samples, uint32_t render_pass_id);
void bl00mbox_channel_plugin_update_values(radspa_t * channel_plugin, uint32_t render_pass_id);
#include "bl00mbox_line_in.h"
radspa_descriptor_t bl00mbox_line_in_desc = {
.name = "bl00mbox_line_in",
.id = 4001,
.description = "connects to the line input of bl00mbox",
.create_plugin_instance = bl00mbox_line_in_create,
.destroy_plugin_instance = radspa_standard_plugin_destroy
};
// TODO: every channel does this conversion individually. this is okay for now, but we can safe cpu
// by doing this once globally per render pass id.
// at this point we CANNOT just forward the same buffer pointer to every client because the buffer
// pointer serves as a channel-specific source plugin identifier, but we could at least reduce
// the overhead to a memcpy.
void bl00mbox_line_in_run(radspa_t * line_in, uint16_t num_samples, uint32_t render_pass_id){
if(bl00mbox_line_in_interlaced == NULL) return;
radspa_signal_t * left_sig = radspa_signal_get_by_index(line_in, 0);
radspa_signal_t * right_sig = radspa_signal_get_by_index(line_in, 1);
radspa_signal_t * mid_sig = radspa_signal_get_by_index(line_in, 2);
radspa_signal_t * gain_sig = radspa_signal_get_by_index(line_in, 3);
for(uint16_t i = 0; i < num_samples; i++){
int32_t gain = radspa_signal_get_value(gain_sig, i, render_pass_id);
if(left_sig->buffer != NULL){
int16_t left = radspa_gain(bl00mbox_line_in_interlaced[2*i], gain);
radspa_signal_set_value(left_sig, i, left);
}
if(right_sig->buffer != NULL){
int16_t right = radspa_gain(bl00mbox_line_in_interlaced[2*i+1], gain);
radspa_signal_set_value(right_sig, i, right);
}
if(mid_sig->buffer != NULL){
int16_t mid = bl00mbox_line_in_interlaced[2*i]>>1;
mid += bl00mbox_line_in_interlaced[2*i+1]>>1;
mid = radspa_gain(mid, gain);
radspa_signal_set_value(mid_sig, i, mid);
}
}
}
radspa_t * bl00mbox_line_in_create(uint32_t init_var){
radspa_t * line_in = radspa_standard_plugin_create(&bl00mbox_line_in_desc, 4, 0, 0);
if(line_in == NULL) return NULL;
line_in->render = bl00mbox_line_in_run;
radspa_signal_set(line_in, 0, "left", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(line_in, 1, "right", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(line_in, 2, "mid", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(line_in, 3, "gain", RADSPA_SIGNAL_HINT_INPUT | RADSPA_SIGNAL_HINT_GAIN, RADSPA_SIGNAL_VAL_UNITY_GAIN);
return line_in;
}
#pragma once
#include "radspa.h"
#include "radspa_helpers.h"
// SPECIAL REQUIREMENTS
#include "bl00mbox_audio.h"
extern radspa_descriptor_t bl00mbox_line_in_desc;
radspa_t * bl00mbox_line_in_create(uint32_t init_var);
void bl00mbox_line_in_run(radspa_t * line_in, uint16_t num_samples, uint32_t render_pass_id);
Creative Commons Legal Code
CC0 1.0 Universal
CREATIVE COMMONS CORPORATION IS NOT A LAW FIRM AND DOES NOT PROVIDE
LEGAL SERVICES. DISTRIBUTION OF THIS DOCUMENT DOES NOT CREATE AN
ATTORNEY-CLIENT RELATIONSHIP. CREATIVE COMMONS PROVIDES THIS
INFORMATION ON AN "AS-IS" BASIS. CREATIVE COMMONS MAKES NO WARRANTIES
REGARDING THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS
PROVIDED HEREUNDER, AND DISCLAIMS LIABILITY FOR DAMAGES RESULTING FROM
THE USE OF THIS DOCUMENT OR THE INFORMATION OR WORKS PROVIDED
HEREUNDER.
Statement of Purpose
The laws of most jurisdictions throughout the world automatically confer
exclusive Copyright and Related Rights (defined below) upon the creator
and subsequent owner(s) (each and all, an "owner") of an original work of
authorship and/or a database (each, a "Work").
Certain owners wish to permanently relinquish those rights to a Work for
the purpose of contributing to a commons of creative, cultural and
scientific works ("Commons") that the public can reliably and without fear
of later claims of infringement build upon, modify, incorporate in other
works, reuse and redistribute as freely as possible in any form whatsoever
and for any purposes, including without limitation commercial purposes.
These owners may contribute to the Commons to promote the ideal of a free
culture and the further production of creative, cultural and scientific
works, or to gain reputation or greater distribution for their Work in
part through the use and efforts of others.
For these and/or other purposes and motivations, and without any
expectation of additional consideration or compensation, the person
associating CC0 with a Work (the "Affirmer"), to the extent that he or she
is an owner of Copyright and Related Rights in the Work, voluntarily
elects to apply CC0 to the Work and publicly distribute the Work under its
terms, with knowledge of his or her Copyright and Related Rights in the
Work and the meaning and intended legal effect of CC0 on those rights.
1. Copyright and Related Rights. A Work made available under CC0 may be
protected by copyright and related or neighboring rights ("Copyright and
Related Rights"). Copyright and Related Rights include, but are not
limited to, the following:
i. the right to reproduce, adapt, distribute, perform, display,
communicate, and translate a Work;
ii. moral rights retained by the original author(s) and/or performer(s);
iii. publicity and privacy rights pertaining to a person's image or
likeness depicted in a Work;
iv. rights protecting against unfair competition in regards to a Work,
subject to the limitations in paragraph 4(a), below;
v. rights protecting the extraction, dissemination, use and reuse of data
in a Work;
vi. database rights (such as those arising under Directive 96/9/EC of the
European Parliament and of the Council of 11 March 1996 on the legal
protection of databases, and under any national implementation
thereof, including any amended or successor version of such
directive); and
vii. other similar, equivalent or corresponding rights throughout the
world based on applicable law or treaty, and any national
implementations thereof.
2. Waiver. To the greatest extent permitted by, but not in contravention
of, applicable law, Affirmer hereby overtly, fully, permanently,
irrevocably and unconditionally waives, abandons, and surrenders all of
Affirmer's Copyright and Related Rights and associated claims and causes
of action, whether now known or unknown (including existing as well as
future claims and causes of action), in the Work (i) in all territories
worldwide, (ii) for the maximum duration provided by applicable law or
treaty (including future time extensions), (iii) in any current or future
medium and for any number of copies, and (iv) for any purpose whatsoever,
including without limitation commercial, advertising or promotional
purposes (the "Waiver"). Affirmer makes the Waiver for the benefit of each
member of the public at large and to the detriment of Affirmer's heirs and
successors, fully intending that such Waiver shall not be subject to
revocation, rescission, cancellation, termination, or any other legal or
equitable action to disrupt the quiet enjoyment of the Work by the public
as contemplated by Affirmer's express Statement of Purpose.
3. Public License Fallback. Should any part of the Waiver for any reason
be judged legally invalid or ineffective under applicable law, then the
Waiver shall be preserved to the maximum extent permitted taking into
account Affirmer's express Statement of Purpose. In addition, to the
extent the Waiver is so judged Affirmer hereby grants to each affected
person a royalty-free, non transferable, non sublicensable, non exclusive,
irrevocable and unconditional license to exercise Affirmer's Copyright and
Related Rights in the Work (i) in all territories worldwide, (ii) for the
maximum duration provided by applicable law or treaty (including future
time extensions), (iii) in any current or future medium and for any number
of copies, and (iv) for any purpose whatsoever, including without
limitation commercial, advertising or promotional purposes (the
"License"). The License shall be deemed effective as of the date CC0 was
applied by Affirmer to the Work. Should any part of the License for any
reason be judged legally invalid or ineffective under applicable law, such
partial invalidity or ineffectiveness shall not invalidate the remainder
of the License, and in such case Affirmer hereby affirms that he or she
will not (i) exercise any of his or her remaining Copyright and Related
Rights in the Work or (ii) assert any associated claims and causes of
action with respect to the Work, in either case contrary to Affirmer's
express Statement of Purpose.
4. Limitations and Disclaimers.
a. No trademark or patent rights held by Affirmer are waived, abandoned,
surrendered, licensed or otherwise affected by this document.
b. Affirmer offers the Work as-is and makes no representations or
warranties of any kind concerning the Work, express, implied,
statutory or otherwise, including without limitation warranties of
title, merchantability, fitness for a particular purpose, non
infringement, or the absence of latent or other defects, accuracy, or
the present or absence of errors, whether or not discoverable, all to
the greatest extent permissible under applicable law.
c. Affirmer disclaims responsibility for clearing rights of other persons
that may apply to the Work or any use thereof, including without
limitation any person's Copyright and Related Rights in the Work.
Further, Affirmer disclaims responsibility for obtaining any necessary
consents, permissions or other rights required for any use of the
Work.
d. Affirmer understands and acknowledges that Creative Commons is not a
party to this document and has no duty or obligation with respect to
this CC0 or use of the Work.
//SPDX-License-Identifier: CC0-1.0
// Please do not define a new version number. If want to distribute a modified version of
// this file, kindly append "-modified" to the version string below so it is not mistaken
// for an official release.
// Version 0.2.1
/* Realtime Audio Developer's Simple Plugin Api
*
* Written from scratch but largely inspired by faint memories of the excellent ladspa.h
*
* Plugins may only include this file and the corresponding "radspa_helpers.h" with
* the same version string. Specifically, do not include <math.h> no matter how tempting
* it may be - it's a notoriously slow library on most architectures and has no place
* in realtime audio synthesis.
*
* For a simple plugin implementation example check ampliverter.c/.h :D
*/
#pragma once
#include <string.h>
#include <stdbool.h>
#include <stdint.h>
/* CONVENTIONS
*
* All plugins communicate all signals in int16_t data.
*
* SCT (semi-cent, 2/ct) is a mapping between int16_t pitch values and frequency values comparable
* to 1V/oct. in analog music electronics. A cent is a common unit in music theory and describes
* 1/100 of a semitone or 1/1200 of an octave. Typically 1 cent is precise enough for synth
* applications, however 1/2 cent is better and allows for a 27 octave range. INT16_MAX
* represents a frequency of 28160Hz, 6 octaves above middle A (440Hz) which is represented by
* INT16_MAX - 6 * 2400.
*/
// signal hints
#define RADSPA_SIGNAL_HINT_INPUT (1<<0)
#define RADSPA_SIGNAL_HINT_OUTPUT (1<<1)
#define RADSPA_SIGNAL_HINT_TRIGGER (1<<2)
#define RADSPA_SIGNAL_HINT_GAIN (1<<3)
#define RADSPA_SIGNAL_HINT_DEPRECATED (1<<4)
#define RADSPA_SIGNAL_HINT_SCT (1<<5)
#define RADSPA_SIGNAL_VAL_SCT_A440 (INT16_MAX - 6*2400)
#define RADSPA_SIGNAL_VAL_UNITY_GAIN (1<<12)
struct _radspa_descriptor_t;
struct _radspa_signal_t;
struct _radspa_t;
typedef void (* radspa_render_t)(struct _radspa_t * plugin, uint16_t num_samples, uint32_t render_pass_id);
typedef struct _radspa_descriptor_t{
char * name;
uint32_t id; // unique id number
char * description;
struct _radspa_t * (* create_plugin_instance)(uint32_t init_var);
void (* destroy_plugin_instance)(struct _radspa_t * plugin); // point to radspa_t
} radspa_descriptor_t;
typedef struct _radspa_signal_t{
// this bitfield determines the type of the signal, see RADSPA_SIGNAL_HINTS_*
uint32_t hints;
// this is the name of the signal as shown to the user.
// allowed characters: lowercase, numbers, underscore, may not start with number
// if name_multplex >= 0: may not end with number
char * name;
// arbitrary formatted string to describe what the signal is for
char * description;
// unit that corresponds to value, may be empty. note: some RADSPA_SIGNAL_HINTS_*
// imply units. field may be empty.
char * unit;
// -1 to disable signal multiplexing
int8_t name_multiplex;
// buffer full of samples, may be NULL.
int16_t * buffer;
// static value to be used when buffer is NULL for input signals only
int16_t value;
// when the signal has last requested to render its source
uint32_t render_pass_id;
} radspa_signal_t;
typedef struct _radspa_t{
const radspa_descriptor_t * descriptor;
void * parent;
// renders all signal outputs for num_samples if render_pass_id has changed
// since the last call, else does nothing.
radspa_render_t render;
// stores id number of render pass.
uint32_t render_pass_id;
// init var that was used for creating the plugin. if the plugin needs to modify the value to
// a valid range it may do so at any point in time.
uint32_t init_var;
void * plugin_data; // internal data for the plugin to use. should not be accessed from outside.
uint32_t plugin_table_len;
int16_t * plugin_table;
uint8_t len_signals;
radspa_signal_t signals[];
} radspa_t;
/* REQUIREMENTS
* Hosts must provide implementations for the following functions:
*/
/* The return value should equal frequency(sct) * UINT32_MAX / (sample_rate>>undersample_pow) with:
* frequency(sct) = pow(2, (sct + 2708)/2400)
* /!\ Performance critical, might be called on a per-sample basis, do _not_ just use pow()!
*/
extern uint32_t radspa_sct_to_rel_freq(int16_t sct, int16_t undersample_pow);
// Return 1 if the buffer wasn't rendered already, 0 otherwise.
extern bool radspa_host_request_buffer_render(int16_t * buf);
extern int16_t radspa_random();
//SPDX-License-Identifier: CC0-1.0
#include "radspa_helpers.h"
extern inline int16_t radspa_signal_get_value(radspa_signal_t * sig, int16_t index, uint32_t render_pass_id);
extern inline int16_t radspa_signal_get_const_value(radspa_signal_t * sig, uint32_t render_pass_id);
extern inline void radspa_signal_set_value(radspa_signal_t * sig, int16_t index, int32_t value);
extern inline void radspa_signal_set_value_check_const(radspa_signal_t * sig, int16_t index, int32_t value);
extern inline void radspa_signal_set_const_value(radspa_signal_t * sig, int32_t value);
extern inline int16_t radspa_clip(int32_t a);
extern inline int16_t radspa_add_sat(int32_t a, int32_t b);
extern inline int32_t radspa_mult_shift(int32_t a, int32_t b);
extern inline int32_t radspa_gain(int32_t a, int32_t b);
extern inline int16_t radspa_trigger_start(int16_t velocity, int16_t * hist);
extern inline int16_t radspa_trigger_stop(int16_t * hist);
extern inline int16_t radspa_trigger_get(int16_t trigger_signal, int16_t * hist);
extern inline radspa_signal_t * radspa_signal_get_by_index(radspa_t * plugin, uint16_t signal_index);
radspa_signal_t * radspa_signal_set(radspa_t * plugin, uint8_t signal_index, char * name, uint32_t hints, int16_t value){
radspa_signal_t * sig = radspa_signal_get_by_index(plugin, signal_index);
if(sig == NULL) return NULL;
sig->name = name;
sig->hints = hints;
sig->value = value;
return sig;
}
void radspa_signal_set_description(radspa_t * plugin, uint8_t signal_index, char * description){
radspa_signal_t * sig = radspa_signal_get_by_index(plugin, signal_index);
if(sig == NULL) return;
sig->description = description;
}
radspa_signal_t * radspa_signal_set_group(radspa_t * plugin, uint8_t group_len, uint8_t step, uint8_t signal_index, char * name,
uint32_t hints, int16_t value){
for(uint8_t i = 0; i < group_len; i++){
radspa_signal_t * sig = radspa_signal_get_by_index(plugin, signal_index + i * step);
if(sig == NULL) return NULL;
sig->name = name;
sig->hints = hints;
sig->value = value;
sig->name_multiplex = i;
}
return radspa_signal_get_by_index(plugin, signal_index);
}
void radspa_signal_set_group_description(radspa_t * plugin, uint8_t group_len, uint8_t step, uint8_t signal_index,
char * description){
for(uint8_t i = 0; i < group_len; i++){
radspa_signal_t * sig = radspa_signal_get_by_index(plugin, signal_index + i * step);
if(sig == NULL) return;
sig->description = description;
}
}
static void radspa_signal_init(radspa_signal_t * sig){
sig->name = "UNINITIALIZED";
sig->hints = 0;
sig->unit = "";
sig->description = "";
sig->value = 0;
sig->name_multiplex = -1;
sig->buffer = NULL;
}
radspa_t * radspa_standard_plugin_create(radspa_descriptor_t * desc, uint8_t num_signals, size_t plugin_data_size, uint32_t plugin_table_size){
radspa_t * ret = calloc(1, sizeof(radspa_t) + num_signals * sizeof(radspa_signal_t));
if(ret == NULL) return NULL;
if(plugin_data_size){
ret->plugin_data = calloc(1,plugin_data_size);
if(ret->plugin_data == NULL){
free(ret);
return NULL;
}
}
ret->len_signals = num_signals;
ret->render = NULL;
ret->descriptor = desc;
ret->plugin_table_len = plugin_table_size;
for(uint8_t i = 0; i < num_signals; i++){
radspa_signal_init(&(ret->signals[i]));
}
bool init_failed = false;
if(ret->plugin_table_len){
ret->plugin_table = calloc(plugin_table_size, sizeof(int16_t));
if(ret->plugin_table == NULL) init_failed = true;
} else {
ret->plugin_table = NULL;
}
if(init_failed){
radspa_standard_plugin_destroy(ret);
return NULL;
}
return ret;
}
void radspa_standard_plugin_destroy(radspa_t * plugin){
if(plugin->plugin_table != NULL) free(plugin->plugin_table);
if(plugin->plugin_data != NULL) free(plugin->plugin_data);
free(plugin);
}
//SPDX-License-Identifier: CC0-1.0
#pragma once
#include "radspa.h"
#define RADSPA_SIGNAL_NONCONST (-32768)
#define RADSPA_EVENT_MASK (0b11111)
#define RADSPA_EVENT_POW (5)
// adds signal to plugin instance struct. typically used to initiate a plugin instance.
int16_t radspa_signal_add(radspa_t * plugin, char * name, uint32_t hints, int16_t value);
// as above, but sets parameters of an already existing signal with at list position signal_index
radspa_signal_t * radspa_signal_set(radspa_t * plugin, uint8_t signal_index, char * name, uint32_t hints, int16_t value);
radspa_signal_t * radspa_signal_set_group(radspa_t * plugin, uint8_t group_len, uint8_t step, uint8_t signal_index, char * name,
uint32_t hints, int16_t value);
void radspa_signal_set_description(radspa_t * plugin, uint8_t signal_index, char * description);
void radspa_signal_set_group_description(radspa_t * plugin, uint8_t group_len, uint8_t step, uint8_t signal_index,
char * description);
radspa_t * radspa_standard_plugin_create(radspa_descriptor_t * desc, uint8_t num_signals, size_t plugin_data_size,
uint32_t plugin_table_size);
void radspa_standard_plugin_destroy(radspa_t * plugin);
// frees all signal structs. typically used to destroy a plugin instance.
void radspa_signals_free(radspa_t * plugin);
inline int16_t radspa_clip(int32_t a){
if(a > 32767){
return 32767;
} else if(a < -32767){
return -32767;
}
return a;
}
inline int16_t radspa_add_sat(int32_t a, int32_t b){ return radspa_clip(a+b); }
inline int32_t radspa_mult_shift(int32_t a, int32_t b){ return radspa_clip((a*b)>>15); }
inline int32_t radspa_gain(int32_t a, int32_t b){ return radspa_clip((a*b)>>12); }
inline int16_t radspa_trigger_start(int16_t velocity, int16_t * hist){
if(!velocity) velocity = 1;
if(velocity == -32768) velocity = 1;
if(velocity < 0) velocity = -velocity;
(* hist) = ((* hist) > 0) ? -velocity : velocity;
return * hist;
}
inline int16_t radspa_trigger_stop(int16_t * hist){
(* hist) = 0;
return * hist;
}
inline int16_t radspa_trigger_get(int16_t trigger_signal, int16_t * hist){
// might wanna remove that safeguard soon
if(trigger_signal == RADSPA_SIGNAL_NONCONST) return 0;
if((* hist) == trigger_signal) return 0;
(* hist) = trigger_signal;
if(!trigger_signal){
return -1;
} else if(trigger_signal < 0 ){
return -trigger_signal;
} else {
return trigger_signal;
}
}
inline radspa_signal_t * radspa_signal_get_by_index(radspa_t * plugin, uint16_t signal_index){
return &(plugin->signals[signal_index]);
}
/* returns the value that a signal has at a given moment in time. time is
* represented as the buffer index. requests rendering from host and requires implementation
* of radspa_host_request_buffer_render.
*/
inline int16_t radspa_signal_get_value(radspa_signal_t * sig, int16_t index, uint32_t render_pass_id){
if(sig->buffer){
if(sig->render_pass_id != render_pass_id) radspa_host_request_buffer_render(sig->buffer);
if(sig->buffer[1] == -32768) return sig->buffer[0];
return sig->buffer[index];
}
return sig->value;
}
inline void radspa_signal_set_value_noclip(radspa_signal_t * sig, int16_t index, int16_t val){
if(sig->buffer != NULL){
sig->buffer[index] = val;
} else if(!index){
sig->value = val;
}
}
inline void radspa_signal_set_value(radspa_signal_t * sig, int16_t index, int32_t val){
if(sig->buffer != NULL){
sig->buffer[index] = radspa_clip(val);
} else if(!index){
sig->value = radspa_clip(val);
}
}
inline void radspa_signal_copy(radspa_signal_t * input, radspa_signal_t * output, uint32_t buffer_len, uint32_t render_pass_id){
if(!output->buffer){
output->value = radspa_signal_get_value(input, 0, render_pass_id);
} else if(!input->buffer){
output->buffer[0] = input->value;
output->buffer[1] = -32768;
} else {
if(input->render_pass_id != render_pass_id) radspa_host_request_buffer_render(input->buffer);
if(input->buffer[1] == -32768){
memcpy(output->buffer, input->buffer, sizeof(int16_t) * 2);
} else {
memcpy(output->buffer, input->buffer, sizeof(int16_t) * buffer_len);
}
}
}
inline void radspa_signal_set_value_check_const(radspa_signal_t * sig, int16_t index, int32_t val){
// disabled for now, causes issues somewhere, no time to track it down
radspa_signal_set_value(sig, index, val);
return;
if(sig->buffer == NULL){
if(!index) sig->value = radspa_clip(val);
return;
}
val = radspa_clip(val);
if(index == 0){
sig->buffer[0] = val;
} else if(index == 1){
if(val == sig->buffer[0]) sig->buffer[1] = -32768;
} else {
if((sig->buffer[1] == -32768) && (val != sig->buffer[0])) sig->buffer[1] = sig->buffer[0];
sig->buffer[index] = val;
}
}
inline int16_t radspa_signal_set_value_check_const_result(radspa_signal_t * sig){
if(sig->buffer != NULL){
if(sig->buffer[1] == -32768) return sig->buffer[0];
return RADSPA_SIGNAL_NONCONST;
}
return sig->value;
}
inline void radspa_signal_set_const_value(radspa_signal_t * sig, int32_t val){
if(sig->buffer == NULL){
sig->value = radspa_clip(val);
} else {
sig->buffer[0] = radspa_clip(val);
sig->buffer[1] = -32768;
}
}
inline void radspa_signal_set_values(radspa_signal_t * sig, uint16_t start, uint16_t stop, int32_t val){
if(sig->buffer == NULL){
if((!start) && stop) sig->value = radspa_clip(val);
} else {
val = radspa_clip(val);
for(uint16_t i = start; i < stop; i++){
sig->buffer[i] = val;
}
}
}
inline int16_t radspa_signal_get_const_value(radspa_signal_t * sig, uint32_t render_pass_id){
if(sig->buffer != NULL){
if(sig->render_pass_id != render_pass_id){
radspa_host_request_buffer_render(sig->buffer);
sig->render_pass_id = render_pass_id;
}
if(sig->buffer[1] == -32768) return sig->buffer[0];
return RADSPA_SIGNAL_NONCONST;
}
return sig->value;
}
inline int16_t radspa_trigger_get_const(radspa_signal_t * sig, int16_t * hist, uint16_t * index, uint16_t num_samples, uint32_t render_pass_id){
(* index) = 0;
int16_t ret_const = radspa_signal_get_const_value(sig, render_pass_id);
if(ret_const != RADSPA_SIGNAL_NONCONST) return radspa_trigger_get(ret_const, hist);
int16_t ret = 0;
for(uint16_t i = 0; i< num_samples; i++){
int16_t tmp = radspa_trigger_get(radspa_signal_get_value(sig, i, render_pass_id), hist);
if(tmp){
ret = tmp;
(* index) = i;
}
}
return ret;
}
#include "ampliverter.h"
// we're defining a prototype for the create function because it has a circular
// dependency with the descriptor
radspa_t * ampliverter_create(uint32_t init_var);
radspa_descriptor_t ampliverter_desc = {
.name = "ampliverter",
.id = 68,
.description = "[DEPRECATED, replaced by `mixer` or `range_shifter`] saturating multiplication and addition",
.create_plugin_instance = ampliverter_create,
// with this we can only use radspa_standard_plugin_create to allocate memory.
// this restricts data layout flexibility for large buffers, but in return it offers
// offers a bit of protection from double free/memory leak issues and makes plugins
// easier to write.
.destroy_plugin_instance = radspa_standard_plugin_destroy
};
#define AMPLIVERTER_NUM_SIGNALS 4
#define AMPLIVERTER_OUTPUT 0
#define AMPLIVERTER_INPUT 1
#define AMPLIVERTER_GAIN 2
#define AMPLIVERTER_BIAS 3
void ampliverter_run(radspa_t * ampliverter, uint16_t num_samples, uint32_t render_pass_id){
// step 1: get signal pointers. since these are stored in a linked list this is a rather
// slow operation and should ideally be only be done once per call.
// if no signals with output hint are being read the run function may exit early.
radspa_signal_t * output_sig = radspa_signal_get_by_index(ampliverter, AMPLIVERTER_OUTPUT);
if(output_sig->buffer == NULL) return;
radspa_signal_t * input_sig = radspa_signal_get_by_index(ampliverter, AMPLIVERTER_INPUT);
radspa_signal_t * gain_sig = radspa_signal_get_by_index(ampliverter, AMPLIVERTER_GAIN);
radspa_signal_t * bias_sig = radspa_signal_get_by_index(ampliverter, AMPLIVERTER_BIAS);
static int16_t ret = 0;
for(uint16_t i = 0; i < num_samples; i++){
// step 2: render the outputs. most of the time a simple for loop will be fine.
// using {*radspa_signal_t}->get_value is required to automatically switch between
// static values and streamed data at various sample rates, don't access the data directly
int16_t bias = radspa_signal_get_value(bias_sig, i, render_pass_id);
int16_t gain = radspa_signal_get_value(gain_sig, i, render_pass_id);
if(gain == 0){
ret = bias;
} else {
ret = radspa_signal_get_value(input_sig, i, render_pass_id);
ret = radspa_mult_shift(ret, gain);
ret = radspa_add_sat(ret, bias);
}
radspa_signal_set_value(output_sig, i, ret);
}
}
radspa_t * ampliverter_create(uint32_t init_var){
// init_var is not used in this example
// step 1: try to allocate enough memory for a standard plugin with the given amount of signals
// and plugin data. there is no plugin data in this case, but sizeof(void) is invalid sooo we're
// taking the next smallest thing (char) ig? we're not good at C.
// providing the descriptor address is required to make sure it is not forgotten.
radspa_t * ampliverter = radspa_standard_plugin_create(&ampliverter_desc, AMPLIVERTER_NUM_SIGNALS, sizeof(char), 0);
if(ampliverter == NULL) return NULL;
// step 2: define run function
ampliverter->render = ampliverter_run;
// step 3: standard_plugin_create has already created dummy signals for us, we just need to
// fill them
radspa_signal_set(ampliverter, AMPLIVERTER_OUTPUT, "output", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(ampliverter, AMPLIVERTER_INPUT, "input", RADSPA_SIGNAL_HINT_INPUT, 0);
radspa_signal_set(ampliverter, AMPLIVERTER_GAIN, "gain", RADSPA_SIGNAL_HINT_INPUT, 32767);
radspa_signal_set(ampliverter, AMPLIVERTER_BIAS, "bias", RADSPA_SIGNAL_HINT_INPUT, 0);
return ampliverter;
}
#pragma once
#include <radspa.h>
#include <radspa_helpers.h>
extern radspa_descriptor_t ampliverter_desc;
radspa_t * ampliverter_create(uint32_t init_var);
void ampliverter_run(radspa_t * osc, uint16_t num_samples, uint32_t render_pass_id);
#include "buffer.h"
radspa_descriptor_t buffer_desc = {
.name = "buffer",
.id = 22,
.description = "forwards input signal to output signal",
.create_plugin_instance = buffer_create,
.destroy_plugin_instance = radspa_standard_plugin_destroy
};
void buffer_run(radspa_t * buffer, uint16_t num_samples, uint32_t render_pass_id){
// note: this could be more lightweight by simply forwarding the buffer,
// however at this point the radspa protocol has no built-in flag for this.
// a host may still choose to simply do so to save CPU.
// for the future, since this is a common use case, we should add some sort of
// buffer forwarding flag. but it's okay. still lighter than the old approach
// (running a single channel mixer).
radspa_signal_t * output = radspa_signal_get_by_index(buffer, 0);
radspa_signal_t * input = radspa_signal_get_by_index(buffer, 1);
radspa_signal_copy(input, output, num_samples, render_pass_id);
}
radspa_t * buffer_create(uint32_t init_var){
radspa_t * buffer = radspa_standard_plugin_create(&buffer_desc, 2, 0, 0);
if(!buffer) return NULL;
buffer->render = buffer_run;
radspa_signal_set(buffer, 0, "output", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(buffer, 1, "input", RADSPA_SIGNAL_HINT_INPUT, 0);
return buffer;
}
#pragma once
#include <radspa.h>
#include <radspa_helpers.h>
extern radspa_descriptor_t buffer_desc;
radspa_t * buffer_create(uint32_t init_var);
void buffer_run(radspa_t * buffer, uint16_t num_samples, uint32_t render_pass_id);
#include "delay.h"
radspa_t * delay_create(uint32_t init_var);
radspa_descriptor_t delay_desc = {
.name = "delay_static",
.id = 42069,
.description = "simple delay with ms input and feedback\n"
"init_var: delay buffer length in ms, default 500",
.create_plugin_instance = delay_create,
.destroy_plugin_instance = radspa_standard_plugin_destroy
};
#define DELAY_NUM_SIGNALS 7
#define DELAY_OUTPUT 0
#define DELAY_INPUT 1
#define DELAY_TIME 2
#define DELAY_FEEDBACK 3
#define DELAY_LEVEL 4
#define DELAY_DRY_VOL 5
#define DELAY_REC_VOL 6
void delay_run(radspa_t * delay, uint16_t num_samples, uint32_t render_pass_id){
radspa_signal_t * output_sig = radspa_signal_get_by_index(delay, DELAY_OUTPUT);
delay_data_t * data = delay->plugin_data;
int16_t * buf = delay->plugin_table;
radspa_signal_t * input_sig = radspa_signal_get_by_index(delay, DELAY_INPUT);
radspa_signal_t * time_sig = radspa_signal_get_by_index(delay, DELAY_TIME);
radspa_signal_t * feedback_sig = radspa_signal_get_by_index(delay, DELAY_FEEDBACK);
radspa_signal_t * level_sig = radspa_signal_get_by_index(delay, DELAY_LEVEL);
radspa_signal_t * dry_vol_sig = radspa_signal_get_by_index(delay, DELAY_DRY_VOL);
radspa_signal_t * rec_vol_sig = radspa_signal_get_by_index(delay, DELAY_REC_VOL);
static int16_t ret = 0;
int32_t buffer_size = delay->plugin_table_len;
int32_t time = radspa_signal_get_value(time_sig, 0, render_pass_id);
if(time < 0) time = -time;
if(time > data->max_delay) time = data->max_delay;
if(time != data->time_prev){
data->read_head_position = data->write_head_position;
data->read_head_position -= time * (48000/1000);
if(data->read_head_position < 0) data->read_head_position += buffer_size;
data->time_prev = time;
}
int16_t fb = radspa_signal_get_value(feedback_sig, 0, render_pass_id);
int16_t level = radspa_signal_get_value(level_sig, 0, render_pass_id);
int16_t dry_vol = radspa_signal_get_value(dry_vol_sig, 0, render_pass_id);
int16_t rec_vol = radspa_signal_get_value(rec_vol_sig, 0, render_pass_id);
for(uint16_t i = 0; i < num_samples; i++){
data->write_head_position++;
while(data->write_head_position >= buffer_size) data->write_head_position -= buffer_size; // maybe faster than %
data->read_head_position++;
while(data->read_head_position >= buffer_size) data->read_head_position -= buffer_size;
int16_t dry = radspa_signal_get_value(input_sig, i, render_pass_id);
int16_t wet = buf[data->read_head_position];
if(rec_vol){
buf[data->write_head_position] = radspa_add_sat(radspa_mult_shift(rec_vol, dry), radspa_mult_shift(wet,fb));
}
ret = radspa_add_sat(radspa_mult_shift(dry_vol,dry), radspa_mult_shift(wet,level));
radspa_signal_set_value(output_sig, i, ret);
}
}
radspa_t * delay_create(uint32_t init_var){
if(init_var == 0) init_var = 500;
if(init_var > 10000) init_var = 10000;
uint32_t buffer_size = init_var*(48000/1000);
radspa_t * delay = radspa_standard_plugin_create(&delay_desc, DELAY_NUM_SIGNALS, sizeof(delay_data_t), buffer_size);
if(delay == NULL) return NULL;
delay_data_t * plugin_data = delay->plugin_data;
plugin_data->time_prev = -1;
plugin_data->max_delay = init_var;
delay->render = delay_run;
radspa_signal_set(delay, DELAY_OUTPUT, "output", RADSPA_SIGNAL_HINT_OUTPUT, 0);
radspa_signal_set(delay, DELAY_INPUT, "input", RADSPA_SIGNAL_HINT_INPUT, 0);
radspa_signal_set(delay, DELAY_TIME, "time", RADSPA_SIGNAL_HINT_INPUT, 200);
radspa_signal_set(delay, DELAY_FEEDBACK, "feedback", RADSPA_SIGNAL_HINT_INPUT, 16000);
radspa_signal_set(delay, DELAY_LEVEL, "level", RADSPA_SIGNAL_HINT_INPUT, 16000);
radspa_signal_set(delay, DELAY_DRY_VOL, "dry_vol", RADSPA_SIGNAL_HINT_INPUT, 32767);
radspa_signal_set(delay, DELAY_REC_VOL, "rec_vol", RADSPA_SIGNAL_HINT_INPUT, 32767);
return delay;
}
#pragma once
#include <radspa.h>
#include <radspa_helpers.h>
typedef struct {
int32_t read_head_position;
int32_t write_head_position;
int32_t max_delay;
int32_t time_prev;
} delay_data_t;
extern radspa_descriptor_t delay_desc;
radspa_t * delay_create(uint32_t init_var);
void delay_run(radspa_t * osc, uint16_t num_samples, uint32_t render_pass_id);