#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module implements a codec to encode and decode expressive performances to a set of
expressive parameters.
"""
import numpy as np
import numpy.lib.recfunctions as rfn
try:
import torch
except ImportError:
# Dummy module to avoid ImportErrors
class DummyTorch(object):
Tensor = np.ndarray
def __init__(self):
pass
torch = DummyTorch()
from partitura.score import Part
from partitura.performance import PerformedPart
from scipy.interpolate import interp1d
__all__ = ["encode_performance", "decode_performance"]
def decode_time(score_onsets, score_durations, parameters, *args, **kwargs):
"""
Decode a performance into onsets and durations in seconds
for each note in the score.
"""
score_onsets = score_onsets.astype(float, copy=False)
score_durations = score_durations.astype(float, copy=False)
score_info = get_unique_seq(
onsets=score_onsets,
offsets=score_onsets + score_durations,
unique_onset_idxs=None,
return_diff=True,
)
unique_onset_idxs = score_info["unique_onset_idxs"]
diff_u_onset_score = score_info["diff_u_onset"]
time_param = np.array(
[tuple([np.mean(parameters["beat_period"][uix])]) for uix in unique_onset_idxs],
dtype=[("beat_period", "f4")],
)
beat_period = time_param["beat_period"]
ioi_perf = diff_u_onset_score * beat_period
eq_onset = np.cumsum(np.r_[0, ioi_perf])
performance = np.zeros((len(score_onsets), 2))
for i, jj in enumerate(unique_onset_idxs):
# decode onset
performance[jj, 0] = eq_onset[i] - parameters["timing"][jj]
# decode duration
performance[jj, 1] = decode_articulation(
score_durations=score_durations[jj],
articulation_parameter=parameters["articulation_log"][jj],
beat_period=beat_period[i],
)
performance[:, 0] -= np.min(performance[:, 0])
return performance
def decode_articulation(score_durations, articulation_parameter, beat_period):
"""
Decode articulation
"""
art_ratio = 2 ** articulation_parameter
dur = art_ratio * score_durations * beat_period
return dur
def encode_tempo(
score_onsets: np.ndarray,
performed_onsets: np.ndarray,
score_durations,
performed_durations,
return_u_onset_idx: bool = False,
) -> np.ndarray:
"""
Compute time-related performance parameters from a performance
"""
if score_onsets.shape != performed_onsets.shape:
raise ValueError("The performance and the score should be of " "the same size")
# use float64, float32 led to problems that x == x + eps evaluated to True
# Maybe replace by np.isclose
score_onsets = score_onsets.astype(float, copy=False)
performed_onsets = performed_onsets.astype(float, copy=False)
score_durations = score_durations.astype(float, copy=False)
performed_durations = performed_durations.astype(float, copy=False)
score = np.column_stack((score_onsets, score_durations))
performance = np.column_stack((performed_onsets, performed_durations))
# Compute beat period
beat_period, s_onsets, unique_onset_idxs = tempo_by_average(
score_onsets=score[:, 0],
performed_onsets=performance[:, 0],
score_durations=score[:, 1],
performed_durations=performance[:, 1],
return_onset_idxs=True,
)
# Compute equivalent onsets
eq_onsets = (
np.cumsum(np.r_[0, beat_period[:-1] * np.diff(s_onsets)])
+ performance[unique_onset_idxs[0], 0].mean()
)
# Compute tempo parameter
# TODO fix normalization
tempo_params = [beat_period]
# Compute articulation parameter
articulation_param = encode_articulation(
score_durations=score[:, 1],
performed_durations=performance[:, 1],
unique_onset_idxs=unique_onset_idxs,
beat_period=beat_period,
)
# Initialize array of parameters
parameter_names = ["beat_period", "velocity", "timing", "articulation_log"]
parameters = np.zeros(len(score), dtype=[(pn, "f4") for pn in parameter_names])
parameters["articulation_log"] = articulation_param
for i, jj in enumerate(unique_onset_idxs):
parameters["beat_period"][jj] = tempo_params[0][i]
# Defined as in Eq. (3.9) in Thesis (pp. 34)
parameters["timing"][jj] = eq_onsets[i] - performance[jj, 0]
if return_u_onset_idx:
return parameters, unique_onset_idxs
else:
return parameters
def tempo_by_average(
score_onsets,
performed_onsets,
score_durations,
performed_durations,
unique_onset_idxs=None,
input_onsets=None,
return_onset_idxs=False,
):
"""
Computes a tempo curve using the average of the onset times of all
notes belonging to the same score onset.
Parameters
----------
score_onsets : np.ndarray
Onset in beats of each note in the score.
performed_onsets : np.ndarray
Performed onsets in seconds of each note in the score.
score_durations : np.ndarray
Duration in beats of each note in the score.
performed_durations : np.ndarray
Performed duration in seconds of each note in the score.
unique_onset_idxs : np.ndarray or None (optional)
Indices of the notes with the same score onset. (By default is None,
and is therefore, inferred from `score_onsets`).
input_onsets : np.ndarray or None
Input onset times in beats at which the tempo curve is to be
sampled (by default is None, which means that the tempo curve
is returned for each unique score onset)
return_onset_idxs : bool
Return the indices of the unique score onsets (Default is False)
Returns
-------
tempo_curve : np.ndarray
Tempo curve in seconds per beat (spb). If `input_onsets` was provided,
this array contains the value of the tempo in spb for each onset
in `input_onsets`. Otherwise, this array contains the value of the
tempo in spb for each unique score onset.
input_onsets : np.ndarray
The score onsets corresponding to each value of the tempo curve.
unique_onset_idxs: list
Each element of the list is an array of the indices of the score
corresponding to the elements in `tempo_curve`. Only returned if
`return_onset_idxs` is True.
"""
# use float64, float32 led to problems that x == x + eps evaluated
# to True
score_onsets = np.array(score_onsets).astype(float, copy=False)
performed_onsets = np.array(performed_onsets).astype(float, copy=False)
score_durations = np.array(score_durations).astype(float, copy=False)
performed_durations = np.array(performed_durations).astype(float, copy=False)
# Get unique onsets if no provided
if unique_onset_idxs is None:
# Get indices of the unique onsets (quantize score onsets)
unique_onset_idxs = get_unique_onset_idxs((1e4 * score_onsets).astype(int))
# Get score information
score_info = get_unique_seq(
onsets=score_onsets,
offsets=score_onsets + score_durations,
unique_onset_idxs=unique_onset_idxs,
)
# Get performance information
perf_info = get_unique_seq(
onsets=performed_onsets,
offsets=performed_onsets + performed_durations,
unique_onset_idxs=unique_onset_idxs,
)
# unique score onsets
unique_s_onsets = score_info["u_onset"]
# equivalent onsets
eq_onsets = perf_info["u_onset"]
# Monotonize times
eq_onset_mt, unique_s_onsets_mt = monotonize_times(
eq_onsets, deltas=unique_s_onsets
)
# Estimate Beat Period
perf_iois = np.diff(eq_onset_mt)
s_iois = np.diff(unique_s_onsets_mt)
beat_period = perf_iois / s_iois
tempo_fun = interp1d(
unique_s_onsets_mt[:-1],
beat_period,
kind="zero",
bounds_error=False,
fill_value=(beat_period[0], beat_period[-1]),
)
if input_onsets is None:
input_onsets = unique_s_onsets[:-1]
tempo_curve = tempo_fun(input_onsets)
if return_onset_idxs:
return tempo_curve, input_onsets, unique_onset_idxs
else:
return tempo_curve, input_onsets
def get_unique_seq(onsets, offsets, unique_onset_idxs=None, return_diff=False):
"""
Get unique onsets of a sequence of notes
"""
eps = np.finfo(float).eps
first_time = np.min(onsets)
# ensure last score time is later than last onset
last_time = max(np.max(onsets) + eps, np.max(offsets))
total_dur = last_time - first_time
if unique_onset_idxs is None:
# unique_onset_idxs = unique_onset_idx(score[:, 0])
unique_onset_idxs = get_unique_onset_idxs(onsets)
u_onset = np.array([np.mean(onsets[uix]) for uix in unique_onset_idxs])
# add last offset, so we have as many IOIs as notes
u_onset = np.r_[u_onset, last_time]
output_dict = dict(
u_onset=u_onset, total_dur=total_dur, unique_onset_idxs=unique_onset_idxs
)
if return_diff:
output_dict["diff_u_onset"] = np.diff(u_onset)
return output_dict
def get_unique_onset_idxs(
onsets, eps: float = 1e-6, return_unique_onsets: bool = False
):
"""
Get unique onsets and their indices.
Parameters
----------
onsets : np.ndarray
Score onsets in beats.
eps : float
Small epsilon (for dealing with quantization in symbolic scores).
This is particularly useful for dealing with triplets and other
similar rhytmical structures that do not have a finite decimal
representation.
return_unique_onsets : bool (optional)
If `True`, returns the unique score onsets.
Returns
-------
unique_onset_idxs : np.ndarray
Indices of the unique onsets in the score.
unique_onsets : np.ndarray
Unique score onsets
"""
# Do not assume that the onsets are sorted
# (use a stable sorting algorithm for preserving the order
# of elements with the same onset, which is useful e.g. if the
# notes within a same onset are ordered by pitch)
sort_idx = np.argsort(onsets, kind="mergesort")
split_idx = np.where(np.diff(onsets[sort_idx]) > eps)[0] + 1
unique_onset_idxs = np.split(sort_idx, split_idx)
if return_unique_onsets:
# Instead of np.unique(onsets)
unique_onsets = np.array([onsets[uix].mean() for uix in unique_onset_idxs])
return unique_onset_idxs, unique_onsets
else:
return unique_onset_idxs
def to_matched_score(part, ppart, alignment):
"""
Returns a mixed score-performance note array
consisting of matched notes in the alignment.
Args:
part (partitura.Part): a part object
ppart (partitura.PerformedPart): a performedpart object
alignment (List(Dict)): an alignment
Returns:
np.ndarray: a minimal, aligned
score-performance note array
"""
# remove repetitions from aligment note ids
for a in alignment:
if a["label"] == "match":
a["score_id"] = str(a["score_id"])
part_by_id = dict((n.id, n) for n in part.notes_tied)
ppart_by_id = dict((n["id"], n) for n in ppart.notes)
# pair matched score and performance notes
note_pairs = [
(part_by_id[a["score_id"]], ppart_by_id[a["performance_id"]])
for a in alignment
if (a["label"] == "match" and a["score_id"] in part_by_id)
]
ms = []
# sort according to onset (primary) and pitch (secondary)
pitch_onset = [(sn.midi_pitch, sn.start.t) for sn, _ in note_pairs]
sort_order = np.lexsort(list(zip(*pitch_onset)))
beat_map = part.beat_map
snote_ids = []
for i in sort_order:
sn, n = note_pairs[i]
sn_on, sn_off = beat_map([sn.start.t, sn.start.t + sn.duration_tied])
sn_dur = sn_off - sn_on
# hack for notes with negative durations
n_dur = max(n["sound_off"] - n["note_on"], 60 / 200 * 0.25)
ms.append((sn_on, sn_dur, sn.midi_pitch, n["note_on"], n_dur, n["velocity"]))
snote_ids.append(sn.id)
fields = [
("onset", "f4"),
("duration", "f4"),
("pitch", "i4"),
("p_onset", "f4"),
("p_duration", "f4"),
("velocity", "i4"),
]
return np.array(ms, dtype=fields), snote_ids
def encode_articulation(
score_durations, performed_durations, unique_onset_idxs, beat_period
):
"""
Encode articulation
"""
articulation = np.zeros_like(score_durations)
for idx, bp in zip(unique_onset_idxs, beat_period):
sd = score_durations[idx]
pd = performed_durations[idx]
# indices of notes with duration 0 (grace notes)
grace_mask = sd <= 0
# Grace notes have an articulation ratio of 1
sd[grace_mask] = 1
pd[grace_mask] = bp
# Compute log articulation ratio
articulation[idx] = np.log2(pd / (bp * sd))
return articulation
def monotonize_times(s, deltas=None):
"""Interpolate linearly over as many points in `s` as necessary to
obtain a monotonic sequence. The minimum and maximum of `s` are
prepended and appended, respectively, to ensure monotonicity at
the bounds of `s`.
Parameters
----------
s : ndarray
a sequence of numbers
strict : bool
when True, return a strictly monotonic sequence (default: True)
Returns
-------
ndarray
a monotonic sequence that has been linearly interpolated using a subset of s
"""
eps = np.finfo(float).eps
_s = np.r_[np.min(s) - eps, s, np.max(s) + eps]
if deltas is not None:
_deltas = np.r_[np.min(deltas) - eps, deltas, np.max(deltas) + eps]
else:
_deltas = None
mask = np.ones(_s.shape[0], dtype=bool)
mask[0] = mask[-1] = False
idx = np.arange(_s.shape[0])
s_mono = interp1d(idx[mask], _s[mask])(idx[1:-1])
return _s[mask], _deltas[mask]
def notewise_to_onsetwise(notewise_inputs, unique_onset_idxs):
"""Agregate basis functions per onset"""
if isinstance(notewise_inputs, np.ndarray):
if notewise_inputs.ndim == 1:
shape = len(unique_onset_idxs)
else:
shape = (len(unique_onset_idxs),) + notewise_inputs.shape[1:]
onsetwise_inputs = np.zeros(shape, dtype=notewise_inputs.dtype)
elif isinstance(notewise_inputs, torch.Tensor):
onsetwise_inputs = torch.zeros(
(len(unique_onset_idxs), notewise_inputs.shape[1]),
dtype=notewise_inputs.dtype,
)
for i, uix in enumerate(unique_onset_idxs):
try:
onsetwise_inputs[i] = notewise_inputs[uix].mean(0)
except TypeError:
for tn in notewise_inputs.dtype.names:
onsetwise_inputs[i][tn] = notewise_inputs[uix][tn].mean()
return onsetwise_inputs
def onsetwise_to_notewise(onsetwise_input, unique_onset_idxs):
"""Expand onsetwise predictions for each note"""
n_notes = sum([len(uix) for uix in unique_onset_idxs])
if isinstance(onsetwise_input, np.ndarray):
if onsetwise_input.ndim == 1:
shape = n_notes
else:
shape = (n_notes,) + onsetwise_input.shape[1:]
notewise_inputs = np.zeros(shape, dtype=onsetwise_input.dtype)
elif isinstance(onsetwise_input, torch.Tensor):
notewise_inputs = torch.zeros(n_notes, dtype=onsetwise_input.dtype)
for i, uix in enumerate(unique_onset_idxs):
notewise_inputs[uix] = onsetwise_input[[i]]
return notewise_inputs