873 lines
26 KiB
Python
Executable File
873 lines
26 KiB
Python
Executable File
from math import *
|
|
import numpy as np
|
|
from scipy.io import wavfile
|
|
from scipy import signal
|
|
import matplotlib.pyplot as plt
|
|
import subprocess
|
|
import wave as wv
|
|
import struct
|
|
import librosa
|
|
import heapq
|
|
import scipy
|
|
import os
|
|
import random
|
|
from pathlib import Path
|
|
from time import sleep
|
|
from datetime import timedelta
|
|
|
|
import debug
|
|
|
|
print("Starting...\n")
|
|
|
|
def filter_n_percent_serial(song_name, offset, n_iter, step, threshold):
|
|
"""
|
|
song_name : string
|
|
offset : int
|
|
n_iter : int (number of turns)
|
|
step : int (length of each small segment)
|
|
threshold : int (is in ]0, 100])
|
|
|
|
filter data associated with song_name to keep only the highest threshold% values
|
|
"""
|
|
|
|
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(offset+step*n_iter), "-i", song_name, "crop.wav"], shell=False)
|
|
|
|
sample_rate, global_data = wavfile.read('crop.wav')
|
|
|
|
subprocess.run(["clear"], shell=False)
|
|
subprocess.run(["rm", "crop.wav"], shell=False)
|
|
|
|
for i in range(n_iter):
|
|
print(i, "/", n_iter)
|
|
#print(i * step)
|
|
song_data = global_data[int(i*step*sample_rate):int((i+1)*step*sample_rate)]
|
|
|
|
if(len(song_data) != 0):
|
|
mx = max(song_data)
|
|
|
|
is_locked = [False for i in range(len(song_data))]
|
|
x = int((len(song_data)*threshold)//100)
|
|
#print("X = ", x)
|
|
|
|
#print("Retreiving the", int(x), "/", len(song_data), "highest values")
|
|
elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1])
|
|
#print("Done")
|
|
|
|
for idx in range(len(elements)):
|
|
is_locked[elements[idx][0]] = True
|
|
|
|
for r in range(len(song_data)):
|
|
if(is_locked[r] == False):
|
|
global_data[r+int(i*step*sample_rate)] = 0
|
|
|
|
return global_data
|
|
|
|
|
|
def write_to_file_thr(sample_rate, song_data, offset, threshold, filename):
|
|
# write data to output file
|
|
file = open(filename, 'w')
|
|
file.writelines('time,amplitude\n')
|
|
mx = max(song_data)
|
|
print("writing to output...")
|
|
for i in range(len(song_data)):
|
|
if(i%(len(song_data)//50) == 0):
|
|
print(i, "/", len(song_data))
|
|
if(song_data[i]/mx > threshold):
|
|
file.writelines(str(np.round(offset + i/sample_rate, 3)))
|
|
file.writelines(',')
|
|
file.writelines(str(np.round(song_data[i], 0)))
|
|
file.writelines('\n')
|
|
|
|
def round_t(id, sample_rate, bpm, div, offset, k0):
|
|
k = k0
|
|
t = offset + k/(bpm*div)
|
|
while(t < id/sample_rate):
|
|
t = offset + k/(bpm*div)
|
|
k += 1
|
|
|
|
if(np.abs(t - id/sample_rate) < np.abs((t - 1/(bpm*div)) - id/sample_rate)):
|
|
return t
|
|
return (t - 1/(bpm*div), 0)
|
|
|
|
def compress(Zxx):
|
|
res = []
|
|
|
|
def get_freq(song_name, times, width=1000, display=False):
|
|
"""
|
|
for a given list of times (in seconds), returns the corresponding peak frequencies
|
|
"""
|
|
|
|
subprocess.run(["ffmpeg", "-ss", str(0), "-t", str(max(np.array(times))), "-i", song_name, "crop.wav"], shell=False)
|
|
|
|
sample_rate, global_data = wavfile.read(song_name)
|
|
#blit = int(sample_rate*step)
|
|
|
|
subprocess.run(["clear"], shell=False)
|
|
subprocess.run(["rm", "crop.wav"], shell=False)
|
|
|
|
pfreq = scipy.fft.rfftfreq(2*width, 1/sample_rate)
|
|
|
|
frequencies = [0 for s in range(len(times))]
|
|
print(len(pfreq))
|
|
|
|
for s in range(len(times)):
|
|
left = max(0, int(times[s]*44100)-width)
|
|
right = min(len(global_data), int(times[s]*44100)+width)
|
|
pff = scipy.fft.rfft(global_data[left:right])
|
|
|
|
#print(len(pff), len(pfreq))
|
|
|
|
mx = max(np.abs(pff))
|
|
for id in range(len(pff)):
|
|
if frequencies[s] == 0 and np.abs(pff[id]) == mx:
|
|
frequencies[s] = pfreq[id]
|
|
|
|
if(display):
|
|
plt.plot(times, frequencies)
|
|
plt.grid()
|
|
plt.xlabel("Time (s)")
|
|
plt.ylabel("Dominant frequency (Hz)")
|
|
plt.title("Dominant frequencies at peaks")
|
|
plt.show()
|
|
|
|
return frequencies
|
|
|
|
def is_data_stereo(raw_global_data:list) -> bool:
|
|
"""
|
|
raw_global_data : list
|
|
"""
|
|
try:
|
|
assert(raw_global_data[0][0])
|
|
except IndexError:
|
|
return False
|
|
except AssertionError:
|
|
return True
|
|
return True
|
|
|
|
|
|
def void_freq(song_name, offset, songlen, increment, minfreq, maxfreq, upperthr, ampthr, ampfreq, ampval, leniency, write, linear, output_file="trimmed.wav"):
|
|
"""
|
|
song_name : string
|
|
offset : int
|
|
songlen : int (length of the part that will be filtered, starting from offset)
|
|
increment : float (technical parameter)
|
|
minfreq and maxfreq : every frequency in [minfreq, maxfreq] will be voided
|
|
upperthr : every frequency above upperthr will be voided
|
|
ampthr : every frequency with amplitude under MAX/ampthr (aka amplitudes under (100/ampthr)% of the max will be voided
|
|
ampfreq, leniency (if linear is false), linear : technical parameters
|
|
ampval : int
|
|
- if linear is false, then this willbe the maximum amplification possible
|
|
- if linear is true, this is the multiplier (Amp <- Amp * (ampval * frequency + leniency))
|
|
write : bool (should be set to True)
|
|
output_file : technical
|
|
"""
|
|
fft_list = []
|
|
times = []
|
|
current_time = offset
|
|
k = 0
|
|
|
|
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen+offset), "-i", song_name, "crop.wav"], shell=False)
|
|
|
|
sample_rate, raw_global_data = wavfile.read("crop.wav")
|
|
blit = int(sample_rate*increment)
|
|
|
|
global_data = [0 for i in range(len(raw_global_data))]
|
|
|
|
#subprocess.run(["clear"])
|
|
subprocess.run(["rm", "crop.wav"], shell=False)
|
|
|
|
a = 0
|
|
|
|
if(is_data_stereo(raw_global_data)):
|
|
print("Converting to mono...")
|
|
for x in range(len(raw_global_data)):
|
|
global_data[x] = raw_global_data[x][0]/2 + raw_global_data[x][1]/2
|
|
|
|
if(x % (int(len(raw_global_data)/100)) == 0):
|
|
print(a, "/ 100")
|
|
a += 1
|
|
|
|
else:
|
|
global_data = raw_global_data
|
|
|
|
#print("Blit :", blit)
|
|
|
|
pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate)
|
|
|
|
#print(len(pfreq))
|
|
|
|
while(current_time <= songlen):
|
|
pff = scipy.fft.rfft(global_data[k*blit:(k+1)*blit])
|
|
fft_list.append(pff)
|
|
times.append(k*increment)
|
|
|
|
k += 1
|
|
current_time = offset + k*increment
|
|
|
|
print("FFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq))
|
|
|
|
|
|
print("Finding global max...")
|
|
|
|
if(linear == False):
|
|
for i in range(len(fft_list)):
|
|
for j in range(len(fft_list[i])):
|
|
fft_list[i][j] *= (1 + ampval/max(1, np.abs(pfreq[j] - ampfreq)))
|
|
else:
|
|
for i in range(len(fft_list)):
|
|
for j in range(len(fft_list[i])):
|
|
fft_list[i][j] *= (ampval*pfreq[j] + leniency)
|
|
|
|
print("Trimming...")
|
|
|
|
for i in range(len(fft_list)):
|
|
lmax = 0
|
|
for j in range(len(fft_list[i])):
|
|
if(np.abs(fft_list[i][j]) > lmax):
|
|
lmax = np.abs(fft_list[i][j])
|
|
|
|
for j in range(len(fft_list[i])):
|
|
if((pfreq[j] >= minfreq and pfreq[j] < maxfreq) or pfreq[j] > upperthr):
|
|
fft_list[i][j] = 0+0j
|
|
|
|
if(np.abs(fft_list[i][j]) < lmax/ampthr):
|
|
fft_list[i][j] = 0+0j
|
|
|
|
|
|
if(write):
|
|
res = []
|
|
print("Converting...")
|
|
for i in range(len(fft_list)):
|
|
ift = scipy.fft.irfft(fft_list[i], n=blit)
|
|
for k in ift:
|
|
res.append(k)
|
|
#print(type(res[0]))
|
|
mx = 0
|
|
for j in range(len(res)):
|
|
if(res[j] > mx):
|
|
mx = res[j]
|
|
|
|
for i in range(len(res)):
|
|
res[i] = np.int16(32767*res[i]/mx)
|
|
|
|
res = np.array(res)
|
|
wavfile.write(output_file, 44100, res)
|
|
|
|
#plt.plot(np.abs(pfreq[:len(fft_list[0])]), np.abs(fft_list[0]))
|
|
#plt.grid()
|
|
#plt.show()
|
|
|
|
print("Done")
|
|
|
|
def convert_tuple(data, times):
|
|
"""
|
|
Takes data and converts it to a list of tuples (amplitude, datetimes)
|
|
"""
|
|
return [(times[i], data[i]) for i in range(len(data))]
|
|
|
|
def get_songlen(filename):
|
|
"""
|
|
retrieves the length of the song in seconds
|
|
"""
|
|
sample_rate, global_data = wavfile.read(filename)
|
|
print("LEN :", len(global_data)/sample_rate)
|
|
|
|
return (len(global_data)/sample_rate)
|
|
|
|
def convert_to_wav(song_name:str, output_file="audio.wav") -> str:
|
|
"""
|
|
Converts the song to .wav, only if it's not already in wave format.
|
|
Currently relies on file extension.
|
|
Returns: the song_name that should be used afterwards.
|
|
"""
|
|
extension = Path(song_name).suffix
|
|
match extension:
|
|
case ".mp3" | ".ogg":
|
|
print("Converting to .wav...")
|
|
subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file], shell=False)
|
|
return output_file
|
|
return song_name
|
|
|
|
def process_song(filename, bpm, offset0=0, div_len_factor=1, n_iter_2=-1, threshold=0.5, divisor=4):
|
|
"""
|
|
filename : string (name of the song)
|
|
offset : int [+] (song mapping will start from this time in seconds, default is 0)
|
|
bpm : int [+]
|
|
div_len_factor : float [+] (the length multiplier of each segment, default is 1)
|
|
n_iter : int [+*] (the number of iterations, default is -1 (maps the whole music))
|
|
threshold : int [0, 100] (used by the filter function to only keep the largest threshold% of timing points, default is 0.5)
|
|
divisor : int [+] (beat divisor used to snap the notes, default is 4)
|
|
"""
|
|
|
|
filename = convert_to_wav(filename)
|
|
|
|
offset = offset0/1000
|
|
|
|
div_len = div_len_factor*60/bpm-0.01
|
|
|
|
n_iter = n_iter_2
|
|
song_len = get_songlen(filename)
|
|
|
|
if(n_iter == -1):
|
|
n_iter = int((song_len-offset/1000)/div_len)-1
|
|
|
|
filtered_name = f"{filename}_trimmed.wav"
|
|
|
|
void_freq(filename, offset, min(song_len, offset+div_len*(n_iter+1)+0.01), 4*60/bpm, minfreq=0, maxfreq=220, upperthr=5000, ampthr=60, ampfreq = 1200, ampval = 5.0, leniency = 0.005, write=True, linear=False, output_file=filtered_name)
|
|
|
|
datares = filter_n_percent_serial(filtered_name, offset, n_iter, div_len, threshold)
|
|
|
|
#snapped_data = amplitude
|
|
#times in ms
|
|
(snapped_data, times) = debug.snap3(datares, mintime=50, initial_plot=True, after_plot=True)
|
|
|
|
#frequencies=get_freq(filtered_name, offset, div_len, div_len*n_iter, snapped_data, True)
|
|
frequencies = get_freq(filtered_name, times, display=True)
|
|
|
|
Path(f"{filename}_trimmed.wav").unlink()
|
|
return snapped_data, times, frequencies
|
|
|
|
'''
|
|
datares = debug.snap2(datares, 44100, bpm, first_offset=offset, div=divisor, show=True, adjust=True)
|
|
frequencies = get_freq(filtered_name, offset, div_len, div_len*n_iter, datares, True)
|
|
Path(f"{filename}_trimmed.wav").unlink()
|
|
return convert_tuple(datares, frequencies)
|
|
'''
|
|
|
|
def main():
|
|
aa, bb, cc = process_song("tetris_4.wav", 160, n_iter_2=48)
|
|
#print(data)
|
|
print("Program finished with return 0")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
''' -------------------------------------------------------------------- '''
|
|
''' -----------------------| Feuilles mortes |-------------------------- '''
|
|
''' -------------------------------------------------------------------- '''
|
|
|
|
|
|
'''
|
|
def smooth(data, thr, mergeThr, show):
|
|
mx = max(data)
|
|
for i in range(len(data)-mergeThr):
|
|
if(data[i]/mx > thr):
|
|
for k in range(1, mergeThr):
|
|
data[i+k] = 0
|
|
if(show):
|
|
t = [j/1000 for j in range(len(data))]
|
|
plt.plot(t, data)
|
|
plt.xlabel("Time (not scaled to origin)")
|
|
plt.ylabel("Amplitude")
|
|
plt.grid()
|
|
plt.show()
|
|
|
|
return data
|
|
if(False):
|
|
#t, f, Zxx = fct("no.wav", 0, 0.032, 10, 5000, False)
|
|
#t, f, Zxx = fct("worlds_end_3.wav", 150.889, 0.032, 170.889, 3000, False)
|
|
#t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 114.318, 3000, False)
|
|
#t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 20, 3000, False)
|
|
#t, f, Zxx = fct("da^9.wav", 8.463, 0.032, 20, 5000, False)
|
|
t, f, Zxx = fct("13. Cosmic Mind.wav", 0, 0.032, 20, 5000, False)
|
|
#t, f, Zxx = fct("Furioso Melodia 44100.wav", 4, 0.032, 8, 3000, False)
|
|
#t, f, Zxx = fct("changing.wav", 0, 0.05, 3.9, 5000, False)
|
|
#fct("worlds_end_3.wav", 75, (60/178)/4, 75+2, 2500)
|
|
|
|
plot_max(t, f, Zxx, True)
|
|
|
|
if(False):
|
|
#(t, data) = peaks("worlds_end_3.wav", 0, 300, False, 0.92)
|
|
(t, data) = peaks("worlds_end_3.wav", 74.582, 6, False, 0.9)
|
|
#(t, data) = peaks("da^9.wav", 8.463, 301.924 - 8.463, False, 0.95)
|
|
#(t, data) = peaks("deltamax.wav", 8.463, 30101.924 - 8.463, False, 0.92)
|
|
da = find_bpm(t, 44100, data, 100, 200, 1, 10)
|
|
print("BPM data is", da)'''
|
|
|
|
#data = [-1 for i in range(int(x))]
|
|
#ids = [-1 for i in range(int(x))]
|
|
'''
|
|
data = []
|
|
ids = []
|
|
for k in range(int(x)):
|
|
data.append(int(7*mx/10))
|
|
ids.append(-1)
|
|
# structure there is [[index, value]...]
|
|
|
|
i = 0
|
|
calc = 0
|
|
while(i < len(song_data)):
|
|
if(i%10 == 0):
|
|
print(i, "/", len(song_data))
|
|
if(data[int(x)-1] < song_data[i]):
|
|
calc += 1
|
|
#print("\n \n \n \n \n")
|
|
data[int(x)-1] = song_data[i]
|
|
ids[int(x)-1] = i
|
|
|
|
k = int(x)-1
|
|
#while(k < int(x) & data[0] > data[k]):
|
|
while(k > 0 and data[k-1] <= data[k]):
|
|
data[k], data[k-1] = data[k-1], data[k]
|
|
ids[k], ids[k-1] = ids[k-1], ids[k]
|
|
k -= 1
|
|
|
|
#print(data[int(x)-1], calc, "/", x)
|
|
|
|
i += skip
|
|
i += 1
|
|
|
|
|
|
for s in range(int(x)-1):
|
|
if(data[s] < data[s+1]):
|
|
print("Nope", s)
|
|
assert(0)
|
|
'''
|
|
|
|
|
|
'''
|
|
def fct(song_name, offset, increment, songlen, maxfreq, display):
|
|
to_cut = 20000//maxfreq
|
|
global_Zxx = np.array([])
|
|
global_f = np.array([])
|
|
global_t = np.array([])
|
|
current_time = offset
|
|
k = 0
|
|
while(current_time <= songlen):
|
|
subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"], shell=False)
|
|
|
|
sample_rate, audio_data = wavfile.read('crop.wav')
|
|
size = audio_data.size
|
|
|
|
#subprocess.run(["clear"])
|
|
subprocess.run(["rm", "crop.wav"], shell=False)
|
|
|
|
# do stuff here
|
|
#f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000)
|
|
f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size)
|
|
leng = len(f)
|
|
|
|
f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut]
|
|
|
|
#print(len(Zxx))
|
|
#print(len(Zxx[0]))
|
|
|
|
|
|
for i in range(len(Zxx)):
|
|
for j in range(len(Zxx[i])):
|
|
Zxx[i][j] *= 1127*np.log(1+f[i]/700)
|
|
|
|
|
|
t = np.array([current_time + x for x in t])
|
|
|
|
if(k == 0):
|
|
global_f = f
|
|
global_t = t
|
|
global_Zxx = Zxx
|
|
else:
|
|
global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1)
|
|
global_t = np.concatenate((global_t, t))
|
|
|
|
#print(len(global_t))
|
|
|
|
k += 1
|
|
current_time = offset + k*increment
|
|
|
|
print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%")
|
|
|
|
if(display):
|
|
plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud')
|
|
# print(len(global_Zxx), len(global_Zxx[0]))
|
|
# 88 192 = 2500
|
|
# 70 192 = 2000
|
|
plt.title('STFT Magnitude')
|
|
plt.ylabel('Frequency [Hz]')
|
|
plt.xlabel('Time [sec]')
|
|
plt.show()
|
|
|
|
return global_t, global_f, np.abs(global_Zxx)
|
|
|
|
def write_to_file(t, flist, maxlist, filename):
|
|
file = open(filename, 'w')
|
|
file.writelines('time,frequency,maxvalue\n')
|
|
for i in range(len(t)):
|
|
file.writelines(str(np.round(t[i], 3)))
|
|
file.writelines(',')
|
|
file.writelines(str(np.round(flist[i], 1)))
|
|
file.writelines(',')
|
|
file.writelines(str(np.round(maxlist[i], 0)))
|
|
file.writelines('\n')
|
|
#close(file)
|
|
|
|
def plot_max(time, freq, Zxx, save):
|
|
fres = [0 for x in range(len(time))]
|
|
maxres = [0 for x in range(len(time))]
|
|
for t in range(len(time)):
|
|
#subprocess.run(["clear"])
|
|
print(t, "/", len(time))
|
|
for f in range(len(Zxx)):
|
|
if(maxres[t] < Zxx[f][t]):
|
|
maxres[t] = Zxx[f][t]
|
|
fres[t] = freq[f]
|
|
|
|
if(save):
|
|
write_to_file(time, fres, maxres, 'output.csv')
|
|
|
|
''''''
|
|
plt.plot(time, fres, 'r')
|
|
plt.grid()
|
|
plt.xlabel("Time")
|
|
plt.ylabel("Maximum frequencies")
|
|
|
|
plt.plot(time, maxres, 'g')
|
|
plt.grid()
|
|
plt.xlabel("Time")
|
|
plt.ylabel("Maximun values")
|
|
|
|
plt.show()''''''
|
|
|
|
fig, (ax1, ax2) = plt.subplots(2)
|
|
fig.suptitle('Top : time and frequencies\nBottom : time and max values')
|
|
ax1.plot(time, fres)
|
|
ax2.plot(time, maxres)
|
|
|
|
plt.show()
|
|
|
|
def extract_peaks(song_data, sample_rate, offset, display, threshold):
|
|
mx = max(song_data)
|
|
for i in range(len(song_data)):
|
|
#subprocess.run(["clear"])
|
|
print(i, "/", len(song_data))
|
|
if(song_data[i]/mx < threshold):
|
|
song_data[i] = 0
|
|
t = [offset + i/sample_rate for i in range(len(song_data))]
|
|
|
|
if(display):
|
|
plt.plot(t, song_data, 'b+')
|
|
plt.grid()
|
|
plt.xlabel("t")
|
|
plt.ylabel("amp")
|
|
plt.show()
|
|
|
|
return (t, song_data)
|
|
|
|
def get_local_max(song_data, center, width):
|
|
mx = 0
|
|
for o in range(-width, width+1):
|
|
togo = min(len(song_data)-1, center+o)
|
|
togo = max(0, togo)
|
|
if(mx < song_data[togo]):
|
|
mx = song_data[togo]
|
|
return mx
|
|
|
|
def extract_peaks_v2(song_data, sample_rate, offset, display, threshold, seglen):
|
|
mx = 0
|
|
for i in range(len(song_data)):
|
|
if (i%seglen == 0):
|
|
print("----")
|
|
mx = get_local_max(song_data, i+seglen//2, seglen//2)
|
|
#subprocess.run(["clear"])
|
|
print(i, "/", len(song_data))
|
|
if(song_data[i]/mx < threshold):
|
|
song_data[i] = 0
|
|
|
|
t = [offset + i/sample_rate for i in range(len(song_data))]
|
|
|
|
if(display):
|
|
plt.plot(t, song_data, 'b+')
|
|
plt.grid()
|
|
plt.xlabel("t")
|
|
plt.ylabel("amp")
|
|
plt.show()
|
|
|
|
return (t, song_data)
|
|
|
|
def peaks(song_name, offset, length, display, thr):
|
|
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"], shell=False)
|
|
|
|
sample_rate, audio_data = wavfile.read('crop.wav')
|
|
|
|
#subprocess.run(["clear"])
|
|
subprocess.run(["rm", "crop.wav"], shell=False)
|
|
|
|
#return extract_peaks(audio_data, sample_rate, offset, display, thr)
|
|
return extract_peaks_v2(audio_data, sample_rate, offset, display, thr, 44100*2)
|
|
|
|
def find_bpm(sample_rate, data, minbpm, maxbpm, step, width):
|
|
optimal = minbpm
|
|
optimal_acc = 0
|
|
accuracy = 0
|
|
|
|
bpmlst = []
|
|
scores = []
|
|
|
|
for beat in range(minbpm, maxbpm+step, step):
|
|
loopturn = 0
|
|
print("testing", beat)
|
|
accuracy = 0
|
|
current = 0
|
|
|
|
while(current+width < len(data)):
|
|
loopturn += 1
|
|
for o in range(-width, width+1):
|
|
accuracy += data[current + o]
|
|
#current = (loopturn*sample_rate)//beat
|
|
current += (sample_rate)//beat
|
|
|
|
#accuracy = accuracy/loopturn
|
|
|
|
#accuracy *= (1+(maxbpm-beat)/minbpm)
|
|
if optimal_acc < accuracy:
|
|
optimal_acc = accuracy
|
|
optimal = beat
|
|
bpmlst.append(beat)
|
|
scores.append(accuracy)
|
|
|
|
if(False):
|
|
plt.plot(bpmlst, scores)
|
|
plt.xlabel("BPM")
|
|
plt.ylabel("Score")
|
|
plt.grid()
|
|
plt.show()
|
|
|
|
return (optimal, optimal_acc)
|
|
'''
|
|
|
|
|
|
|
|
'''
|
|
def void_freq(song_name, offset, songlen, increment, lthr, gthr):
|
|
to_cut = 20000//2500
|
|
global_Zxx = np.array([])
|
|
global_f = np.array([])
|
|
global_t = np.array([])
|
|
current_time = offset
|
|
k = 0
|
|
sample_rate, global_data = wavfile.read(song_name)
|
|
blit = int(sample_rate*increment)
|
|
print("Blit :", blit)
|
|
while(current_time <= songlen):
|
|
#subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"])
|
|
|
|
#sample_rate, audio_data = wavfile.read('crop.wav')
|
|
audio_data = global_data[int(k*blit):int((k+1)*blit)]
|
|
size = audio_data.size
|
|
|
|
#subprocess.run(["clear"])
|
|
#subprocess.run(["rm", "crop.wav"])
|
|
|
|
# do stuff here
|
|
#f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000)
|
|
f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size)
|
|
leng = len(f)
|
|
|
|
f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut]
|
|
|
|
for i in range(len(Zxx)):
|
|
for j in range(len(Zxx[i])):
|
|
#Zxx[i][j] *= 1127*np.log(1+f[i]/700)
|
|
Zxx[i][j] *= 1000
|
|
|
|
t = np.array([current_time + x for x in t])
|
|
|
|
if(k == 0):
|
|
global_f = f
|
|
global_t = t
|
|
global_Zxx = Zxx
|
|
else:
|
|
global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1)
|
|
global_t = np.concatenate((global_t, t))
|
|
|
|
#print(len(global_t))
|
|
|
|
k += 1
|
|
current_time = offset + k*increment
|
|
|
|
print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%")
|
|
|
|
print("Finding global max...")
|
|
gmax = 0
|
|
for i in range(len(global_Zxx)):
|
|
for j in range(len(global_Zxx[i])):
|
|
if(global_Zxx[i][j] > gmax):
|
|
gmax = global_Zxx[i][j]
|
|
|
|
print("Trimming...")
|
|
for j in range(len(global_Zxx[0])):
|
|
lmax = 0
|
|
for i in range(len(global_Zxx)):
|
|
if(global_Zxx[i][j] > lmax):
|
|
lmax = global_Zxx[i][j]
|
|
|
|
for i in range(len(global_Zxx)):
|
|
val = global_Zxx[i][j]
|
|
if(val/lmax <= lthr/100):
|
|
global_Zxx[i][j] = 0
|
|
elif(val/gmax <= gthr/100):
|
|
global_Zxx[i][j] = 0
|
|
|
|
if(False):
|
|
print("Plotting...")
|
|
plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud')
|
|
# print(len(global_Zxx), len(global_Zxx[0]))
|
|
print("XLEN :", len(global_Zxx), "\nYLEN :", len(global_Zxx[0]))
|
|
plt.title('STFT Magnitude')
|
|
plt.ylabel('Frequency [Hz]')
|
|
plt.xlabel('Time [sec]')
|
|
plt.show()
|
|
|
|
if(True):
|
|
print("Converting...")
|
|
audio_signal = librosa.griffinlim(global_Zxx)
|
|
#scipy.io.wavfile.write('trimmed.wav', sample_rate, np.array(audio_signal, dtype=np.int16))
|
|
wavfile.write('test.wav', sample_rate, np.array(audio_signal, dtype=np.int16))
|
|
|
|
print("Done")
|
|
|
|
def find_bpm_2(sample_rate, data, threshold, maxbpm, show):
|
|
mx = np.max(data)
|
|
min_spacing = (60*sample_rate)/maxbpm
|
|
k = 0
|
|
while(k < len(data) and data[k]/mx < threshold):
|
|
k += 1
|
|
|
|
k += 1
|
|
spacing = []
|
|
current = 1
|
|
progress = 0
|
|
|
|
while(k < len(data)):
|
|
if(k%(len(data)/100) == 0):
|
|
print(progress, "%")
|
|
progress += 1
|
|
if(data[k]/mx >= threshold and current > min_spacing):
|
|
spacing.append(current)
|
|
current = 0
|
|
else:
|
|
current += 1
|
|
k += 1
|
|
|
|
|
|
for x in range(len(spacing)):
|
|
spacing[x] = 60/(spacing[x]/sample_rate)
|
|
|
|
digits = [i for i in range(len(spacing))]
|
|
if(show):
|
|
plt.plot(digits, spacing)
|
|
plt.xlabel("N")
|
|
plt.ylabel("BPM")
|
|
plt.grid()
|
|
plt.show()
|
|
|
|
beat = np.mean(spacing)
|
|
error = np.std(spacing)
|
|
|
|
return (np.round(beat, 3), np.round(error, 3))
|
|
|
|
def to_ms(song_data, sample_rate, offset):
|
|
# converts audio data to have exactly 1 sample per millisecond (aka set sample_rate to 1000)
|
|
new_data = []
|
|
spacing = int(sample_rate * 0.001)
|
|
mx = max(song_data)
|
|
i = 0
|
|
while(i < len(song_data)):
|
|
avg = 0
|
|
for k in range(spacing):
|
|
if(i+spacing < len(song_data)):
|
|
avg += song_data[i+spacing]
|
|
avg = avg / spacing
|
|
new_data.append(avg)
|
|
i += spacing
|
|
|
|
if(False): # pls dont kill me thx
|
|
t = [offset + j/1000 for j in range(len(new_data))]
|
|
plt.plot(t, new_data)
|
|
plt.xlabel("Time")
|
|
plt.ylabel("Amplitude")
|
|
plt.grid()
|
|
plt.show()
|
|
|
|
return (new_data, len(new_data))
|
|
|
|
def filter_n_percent(song_name, offset, length, threshold, reduce, show):
|
|
# threshold is in ]0, 100]
|
|
# filter data associated with song_name to keep only the highest threshold% values
|
|
|
|
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"], shell=False)
|
|
|
|
sample_rate, song_data = wavfile.read('crop.wav')
|
|
|
|
subprocess.run(["clear"], shell=False)
|
|
subprocess.run(["rm", "crop.wav"], shell=False)
|
|
|
|
if(reduce):
|
|
(song_data,e) = to_ms(song_data, 44100, 1)
|
|
sample_rate = 1000
|
|
|
|
mx = max(song_data)
|
|
|
|
is_locked = [False for i in range(len(song_data))]
|
|
x = int((len(song_data)*threshold)//100)
|
|
#print("X = ", x)
|
|
|
|
print("Retreiving the", int(x), "/", len(song_data), "highest values")
|
|
elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1])
|
|
print("Done")
|
|
|
|
for idx in range(len(elements)):
|
|
is_locked[elements[idx][0]] = True
|
|
|
|
for r in range(len(song_data)):
|
|
if(is_locked[r] == False):
|
|
song_data[r] = 0
|
|
|
|
if(show):
|
|
#print("EEEEE")
|
|
t = [offset + j/sample_rate for j in range(len(song_data))]
|
|
plt.plot(t, song_data)
|
|
plt.xlabel("Time")
|
|
plt.ylabel("Amplitude")
|
|
plt.grid()
|
|
plt.show()
|
|
|
|
return song_data
|
|
|
|
def get_tpts(data, sample_rate, thr):
|
|
res = []
|
|
for i in range(len(data)):
|
|
if(data[i] > thr):
|
|
res.append(i/sample_rate)
|
|
|
|
for i in res:
|
|
print(i)
|
|
return res
|
|
|
|
def test_sample(timelist):
|
|
for i in range(1,len(timelist)):
|
|
#os.system('play -n synth %s sin %s' % (0.05, 440))
|
|
for k in range(random.randint(1, 10)):
|
|
print("E", end="")
|
|
print("F")
|
|
sleep(timelist[i]-timelist[i-1])
|
|
'''
|