#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This module implements a series of mid-level descriptors of performance expressions.
Built upon the low-level basis functions from the performance codec.
"""
import sys
import types
from typing import Union, List
import warnings
import numpy as np
from scipy.signal import find_peaks
import numpy.lib.recfunctions as rfn
from partitura.score import ScoreLike
from partitura.performance import PerformanceLike, PerformedPart
from partitura.utils.generic import interp1d
from partitura.musicanalysis.performance_codec import (
to_matched_score,
onsetwise_to_notewise,
encode_tempo,
)
__all__ = [
"make_performance_features",
]
# Ordinal dynamics markings, ordered from softest to loudest.
OLS = ["ppp", "pp", "p", "mp", "mf", "f", "ff", "fff"]
class InvalidPerformanceFeatureException(Exception):
    """Error type for invalid performance-feature values."""
def compute_matched_score(
    score: ScoreLike,
    performance: PerformanceLike,
    alignment: list,
):
    """
    Compute the matched score-performance note array with score features.

    Matches score notes to performed notes through the alignment, then
    appends the tempo-encoding fields (``beat_period``, ``timing``,
    ``articulation_log``) and the score note ids to the matched array.

    Parameters
    ----------
    score : partitura.score.ScoreLike
        Score information, can be a part, score
    performance : partitura.performance.PerformanceLike
        Performance information, can be a ppart, performance
    alignment : list
        The score--performance alignment, a list of dictionaries

    Returns
    -------
    m_score : np.ndarray
        Structured array of matched notes (with score markings) plus the
        appended ``beat_period``, ``timing``, ``articulation_log`` and
        ``id`` fields.
    unique_onset_idxs : list
        A list of arrays with the note indexes that share the same score onset.
    snote_ids : list
        The score note ids, aligned row-by-row with ``m_score``.
    """
    m_score, snote_ids = to_matched_score(
        score, performance, alignment, include_score_markings=True
    )
    # Encode tempo parameters from the matched onsets/durations; the
    # "average" smoothing variant is used for the beat-period estimate.
    (time_params, unique_onset_idxs) = encode_tempo(
        score_onsets=m_score["onset"],
        performed_onsets=m_score["p_onset"],
        score_durations=m_score["duration"],
        performed_durations=m_score["p_duration"],
        return_u_onset_idx=True,
        tempo_smooth="average",
    )
    # Attach the note-level tempo fields and score ids to the matched array.
    m_score = rfn.append_fields(
        m_score,
        ["beat_period", "timing", "articulation_log", "id"],
        [
            time_params["beat_period"],
            time_params["timing"],
            time_params["articulation_log"],
            snote_ids,
        ],
        ["f4", "f4", "f4", "U256"],
        usemask=False,
    )
    return m_score, unique_onset_idxs, snote_ids
def list_performance_feats_functions():
    """Return a list of all feature function names defined in this module.

    The feature function names listed here can be specified by name in
    the `make_performance_features` function. For example:

    >>> feature, names = make_performance_features(score,
                                                   performance,
                                                   alignment,
                                                   ['asynchrony_feature'])

    Returns
    -------
    list
        A list of strings
    """
    module = sys.modules[__name__]
    excluded = {"make_feature"}
    # A feature function is any plain function in this module whose name
    # ends with "_feature" (and is not explicitly excluded).
    return [
        name
        for name in dir(module)
        if name not in excluded
        and name.endswith("_feature")
        and isinstance(getattr(module, name), types.FunctionType)
    ]
def print_performance_feats_functions():
    """Print a list of all feature function names defined in this module,
    with descriptions where available.
    """
    module = sys.modules[__name__]
    doc_indent = 4
    for name in list_performance_feats_functions():
        print("* {}".format(name))
        member = getattr(module, name)
        if member.__doc__:
            # Indent every docstring line by `doc_indent` spaces. The
            # previous code placed the indent before the newline, which
            # produced trailing spaces instead of indented lines.
            print(
                " " * doc_indent + member.__doc__.replace("\n", "\n" + " " * doc_indent)
            )
# Aliases: alternative public names for the two listing helpers above.
list_performance_feature_functions = list_performance_feats_functions
print_performance_feature_functions = print_performance_feats_functions
### Asynchrony
def asynchrony_feature(
    m_score: np.ndarray, unique_onset_idxs: list, performance: PerformanceLike
):
    """
    Compute the asynchrony attributes from the alignment.

    Parameters
    ----------
    m_score : np.ndarray
        correspondence between score and performance notes, with score markings.
    unique_onset_idxs : list
        a list of arrays with the note indexes that have the same onset
    performance : PerformanceLike
        The original performance object (unused here; kept for the common
        feature-function signature).

    Returns
    -------
    async_ : structured array
        structured array (broadcast to the note level) with the following fields
            delta [0, 1]: the largest time difference (in seconds) between onsets in this group
            pitch_cor [-1, 1]: correlation between timing and pitch, min-scaling
            vel_cor [-1, 1]: correlation between timing and velocity, min-scaling
            voice_std [0, 1]: std of the avg timing (in seconds) of each voice in this group

    Raises
    ------
    InvalidPerformanceFeatureException
        If a correlation evaluates to NaN. (Previously a bare ``assert``,
        which is stripped under ``python -O``.)
    """
    async_ = np.zeros(
        len(unique_onset_idxs),
        dtype=[
            ("delta", "f4"),
            ("pitch_cor", "f4"),
            ("vel_cor", "f4"),
            ("voice_std", "f4"),
        ],
    )
    for i, onset_idxs in enumerate(unique_onset_idxs):
        note_group = m_score[onset_idxs]

        # Spread of the performed onsets within the group, capped at 1 second.
        onset_times = note_group["p_onset"]
        async_[i]["delta"] = min(onset_times.max() - onset_times.min(), 1)

        # Min-scale pitches and onsets; a zero sum after scaling flags a
        # constant input, for which the correlation is undefined (NaN).
        midi_pitch = note_group["pitch"]
        midi_pitch = midi_pitch - midi_pitch.min()
        onset_times = onset_times - onset_times.min()
        cor = (
            (-1) * np.corrcoef(midi_pitch, onset_times)[0, 1]
            if (len(midi_pitch) > 1 and sum(midi_pitch) != 0 and sum(onset_times) != 0)
            else 0
        )
        if np.isnan(cor):
            raise InvalidPerformanceFeatureException(
                "NaN pitch-timing correlation in onset group {}".format(i)
            )
        async_[i]["pitch_cor"] = cor

        # Same treatment for velocity; the len > 1 guard mirrors the pitch
        # branch (a single-note group degenerates to 0 either way).
        midi_vel = note_group["velocity"].astype(float)
        midi_vel = midi_vel - midi_vel.min()
        cor = (
            (-1) * np.corrcoef(midi_vel, onset_times)[0, 1]
            if (len(midi_vel) > 1 and sum(midi_vel) != 0 and sum(onset_times) != 0)
            else 0
        )
        if np.isnan(cor):
            raise InvalidPerformanceFeatureException(
                "NaN velocity-timing correlation in onset group {}".format(i)
            )
        async_[i]["vel_cor"] = cor

        # Std of the per-voice mean onset times, capped at 1 second.
        voices_onsets = []
        for voice in np.unique(note_group["voice"]):
            note_in_voice = note_group[note_group["voice"] == voice]
            voices_onsets.append(note_in_voice["p_onset"].mean())
        async_[i]["voice_std"] = min(np.std(np.array(voices_onsets)), 1)

    # Broadcast the onset-level features back to the note level.
    return onsetwise_to_notewise(async_, unique_onset_idxs)
### Dynamics
### Articulation
def articulation_feature(
    m_score: np.ndarray,
    unique_onset_idxs: list,
    performance: PerformanceLike,
    return_mask=False,
):
    """
    Compute the articulation attributes (key overlap ratio) from the alignment.

    Key overlap ratio is the ratio between key overlap time (KOT) and IOI,
    resulting in a value in (-1, inf); -1 is the dummy value for notes
    without a usable transition. For normalization purposes we empirically
    cap the maximum to 5.

    References
    ----------
    .. [1] B.Repp: Acoustics, Perception, and Production of Legato Articulation on a Digital Piano

    Parameters
    ----------
    m_score : np.ndarray
        correspondance between score and performance notes, with score markings.
    unique_onset_idxs : list
        a list of arrays with the note indexes that have the same onset
    performance : PerformanceLike
        The original performance object (unused here; kept for the common
        feature-function signature).
    return_mask : bool
        if true, return a boolean mask of legato notes, staccato notes and repeated notes

    Returns
    -------
    kor_ : structured array (1, n_notes)
        structured array on the note level with fields kor (-1, 5]
    mask : structured array, optional
        legato/staccato/repeated flags; only returned when ``return_mask``.
    """
    # Score and performed offsets are needed for the KOT computation.
    m_score = rfn.append_fields(
        m_score, "offset", m_score["onset"] + m_score["duration"], usemask=False
    )
    m_score = rfn.append_fields(
        m_score, "p_offset", m_score["p_onset"] + m_score["p_duration"], usemask=False
    )
    kor_ = np.full(len(m_score), -1, dtype=[("kor", "f4")])
    if return_mask:
        mask = np.full(
            len(m_score),
            False,
            dtype=[("legato", "?"), ("staccato", "?"), ("repeated", "?")],
        )
    # consider the note transition by each voice
    for voice in np.unique(m_score["voice"]):
        match_voiced = m_score[m_score["voice"] == voice]
        for note_info in match_voiced:
            if note_info["onset"] == match_voiced["onset"].max():  # last beat
                break
            next_note_info = get_next_note(
                note_info, match_voiced
            )  # find most plausible transition
            # Explicit None-check: truth-testing a numpy structured scalar
            # is ambiguous, and get_next_note returns None when there is
            # no meaningful transition.
            if next_note_info is not None:
                # Position of this note in the original (un-filtered) array.
                # NOTE(review): assumes rows of m_score are unique;
                # duplicates would make .item() raise.
                j = np.where(m_score == note_info)[0].item()
                if note_info["offset"] == next_note_info["onset"]:
                    kor_[j]["kor"] = get_kor(note_info, next_note_info)
                if return_mask:
                    if (note_info["slur_feature.slur_incr"] > 0) or (
                        note_info["slur_feature.slur_decr"] > 0
                    ):
                        mask[j]["legato"] = True
                    if note_info["articulation"] == "staccato":
                        mask[j]["staccato"] = True
                    # KOR for repeated notes
                    if note_info["pitch"] == next_note_info["pitch"]:
                        mask[j]["repeated"] = True
    if return_mask:
        return kor_, mask
    else:
        return kor_
def get_kor(e1, e2):
    """
    Calculate the ratio between key overlap time and IOI.

    In the case of a non-positive IOI (the order of notes in the
    performance is reversed from the score, or the onsets coincide), the
    ratio is undefined and the default 0 is returned. The result is
    bounded within the interval [-1, 5].

    Parameters
    ----------
    e1 : np.ndarray
        the m_score row of first note
    e2 : np.ndarray
        the m_score row of second note

    Returns
    -------
    kor : float
        Key overlap ratio
    """
    kot = e1["p_offset"] - e2["p_onset"]
    ioi = e2["p_onset"] - e1["p_onset"]
    # Guard must return: the original fell through and overwrote the
    # default, so a zero IOI raised ZeroDivisionError and a negative IOI
    # produced a meaningless ratio.
    if ioi <= 0:
        return 0
    # cap at 5 for normalization purposes
    return min(kot / ioi, 5)
def get_next_note(note_info, match_voiced):
    """
    Get the next note in the same voice that's a reasonable transition.

    Parameters
    ----------
    note_info : np.ndarray
        the row of current note
    match_voiced : np.ndarray
        all notes in the same voice

    Returns
    -------
    np.ndarray or None
        the next note, or None when there is no meaningful transition.
    """
    # Earliest onset strictly after the current note. `default=None`
    # avoids the ValueError the original raised on an empty sequence
    # (i.e. when this is the last note of the voice).
    next_position = min(
        (o for o in match_voiced["onset"] if o > note_info["onset"]), default=None
    )
    # No later onset, or the next note is not the immediate successor
    # of the current one: no meaningful transition.
    if next_position is None or next_position != note_info["onset"] + note_info["duration"]:
        return None
    next_position_notes = match_voiced[match_voiced["onset"] == next_position]
    # from the notes in the next position, find the one that's closest pitch-wise.
    closest_idx = np.abs((next_position_notes["pitch"] - note_info["pitch"])).argmin()
    return next_position_notes[closest_idx]
### Pedals
def pedal_feature(m_score: np.ndarray, unique_onset_idxs: list, performance: PerformanceLike):
    """
    Compute the pedal features.

    Parameters
    ----------
    m_score : np.ndarray
        correspondance between score and performance notes, with score markings.
    unique_onset_idxs : list
        a list of arrays with the note indexes that have the same onset
    performance: PerformedPart
        The original PerformedPart object

    Returns
    -------
    pedal_ : structured array (4, n_notes) with fields
        onset_value [0, 127]: The interpolated pedal value at the onset
        offset_value [0, 127]: The interpolated pedal value at the key offset
        to_prev_release [0, 10]: delta time from note onset to the previous pedal release 'peak'
        to_next_release [0, 10]: delta time from note offset to the next pedal release 'peak'
        (think about something relates to the real duration)
    """
    # Interpolated pedal value at each note's onset/offset plus the
    # continuous ramp function over time.
    onset_offset_pedals, ramp_func = pedal_ramp(performance.performedparts[0], m_score)
    # Sample the ramp and detect pedal releases as local minima (peaks of
    # the negated curve) with prominence >= 10.
    # NOTE(review): the sampling grid is hard-coded to 0..100 (~0.5 step);
    # releases after t=100 would be missed for longer performances --
    # confirm the intended time range/units.
    x = np.linspace(0, 100, 200)
    y = ramp_func(x)
    peaks, _ = find_peaks(-y, prominence=10)
    peak_timepoints = x[peaks]
    release_times = np.zeros(
        len(m_score), dtype=[("to_prev_release", "f4"), ("to_next_release", "f4")]
    )
    for i, note in enumerate(m_score):
        # Releases at/before this note's onset and at/after its offset.
        peaks_before = peak_timepoints[note["p_onset"] >= peak_timepoints]
        peaks_after = peak_timepoints[
            (note["p_onset"] + note["p_duration"]) <= peak_timepoints
        ]
        # Distances to nearest release on each side, capped at 10 seconds;
        # default 0 when no release exists on that side.
        if len(peaks_before):
            release_times[i]["to_prev_release"] = min(
                note["p_onset"] - peaks_before.max(), 10
            )
        if len(peaks_after):
            release_times[i]["to_next_release"] = min(
                peaks_after.min() - (note["p_onset"] + note["p_duration"]), 10
            )
    return rfn.merge_arrays(
        [onset_offset_pedals, release_times], flatten=True, usemask=False
    )
def pedal_ramp(ppart: "PerformedPart", m_score: np.ndarray):
    """Pedal ramp sampled at the onsets and offsets of m_score.

    Parameters
    ----------
    ppart : PerformedPart
        The performed part whose ``controls`` (dicts with ``time`` and
        ``value`` entries) define the sustain-pedal ramp.
    m_score : np.ndarray
        Matched-score structured array with ``p_onset`` and ``p_duration``.

    Returns
    -------
    ramp_values : np.ndarray
        Structured array, one row per note, with fields ``onset_value``
        and ``offset_value``: the interpolated pedal value (0 to 127) at
        the note's performed onset and offset.
    ramp_func : callable
        The interpolating ramp function itself.
    """
    pedal_controls = ppart.controls
    timepoints = [control["time"] for control in pedal_controls]
    values = [control["value"] for control in pedal_controls]

    if len(timepoints) <= 1:
        # No (or a single) pedal event. The original interpolated over
        # duplicated dummy points ([0, 0], [0, 0]), which can evaluate to
        # NaN; an explicit constant-zero ramp is returned instead.
        def agg_ramp_func(t):
            return np.zeros_like(np.asarray(t, dtype=float))
    else:
        agg_ramp_func = interp1d(timepoints, values, bounds_error=False, fill_value=0)

    onset_timepoints = m_score["p_onset"]
    offset_timepoints = m_score["p_onset"] + m_score["p_duration"]
    W = np.column_stack((agg_ramp_func(onset_timepoints), agg_ramp_func(offset_timepoints)))
    # Filter out NaN values (e.g. interpolation artifacts)
    W[np.isnan(W)] = 0.0
    return (
        np.array(
            [tuple(row) for row in W],
            dtype=[("onset_value", "f4"), ("offset_value", "f4")],
        ),
        agg_ramp_func,
    )
### Phrasing
### Tempo