1056 lines
31 KiB
Python
Executable File
1056 lines
31 KiB
Python
Executable File
from math import *
|
||
import numpy as np
|
||
from scipy.io import wavfile
|
||
from scipy import signal
|
||
import matplotlib.pyplot as plt
|
||
import subprocess
|
||
import wave as wv
|
||
import struct
|
||
import librosa
|
||
import heapq
|
||
import scipy
|
||
import os
|
||
import random
|
||
from pathlib import Path
|
||
from time import sleep
|
||
from datetime import timedelta
|
||
|
||
WORKING_SAMPLE_RATE = 1000
|
||
|
||
print("Starting...\n")
|
||
|
||
def filter_n_percent_serial(song_name, offset, n_iter, step, threshold):
|
||
"""
|
||
song_name : string
|
||
offset : int
|
||
n_iter : int (number of turns)
|
||
step : int (length of each small segment)
|
||
threshold : int (is in ]0, 100])
|
||
|
||
filter data associated with song_name to keep only the highest threshold% values
|
||
"""
|
||
|
||
offset = offset.total_seconds()
|
||
|
||
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(offset+step*n_iter), "-i", song_name, "crop.wav"])
|
||
|
||
sample_rate, global_data = wavfile.read('crop.wav')
|
||
|
||
subprocess.run(["clear"])
|
||
subprocess.run(["rm", "crop.wav"])
|
||
|
||
for i in range(n_iter):
|
||
print(i, "/", n_iter)
|
||
#print(i * step)
|
||
song_data = global_data[int(i*step*sample_rate):int((i+1)*step*sample_rate)]
|
||
|
||
if(len(song_data) != 0):
|
||
mx = max(song_data)
|
||
|
||
is_locked = [False for i in range(len(song_data))]
|
||
x = int((len(song_data)*threshold)//100)
|
||
#print("X = ", x)
|
||
|
||
#print("Retreiving the", int(x), "/", len(song_data), "highest values")
|
||
elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1])
|
||
#print("Done")
|
||
|
||
for idx in range(len(elements)):
|
||
is_locked[elements[idx][0]] = True
|
||
|
||
for r in range(len(song_data)):
|
||
if(is_locked[r] == False):
|
||
global_data[r+int(i*step*sample_rate)] = 0
|
||
|
||
return global_data
|
||
|
||
|
||
def write_to_file_thr(sample_rate, song_data, offset, threshold, filename):
|
||
# write data to output file
|
||
file = open(filename, 'w')
|
||
file.writelines('time,amplitude\n')
|
||
mx = max(song_data)
|
||
print("writing to output...")
|
||
for i in range(len(song_data)):
|
||
if(i%(len(song_data)//50) == 0):
|
||
print(i, "/", len(song_data))
|
||
if(song_data[i]/mx > threshold):
|
||
file.writelines(str(np.round(offset + i/sample_rate, 3)))
|
||
file.writelines(',')
|
||
file.writelines(str(np.round(song_data[i], 0)))
|
||
file.writelines('\n')
|
||
|
||
def round_t(id, sample_rate, bpm, div, offset, k0):
|
||
k = k0
|
||
t = offset + k/(bpm*div)
|
||
while(t < id/sample_rate):
|
||
t = offset + k/(bpm*div)
|
||
k += 1
|
||
|
||
if(np.abs(t - id/sample_rate) < np.abs((t - 1/(bpm*div)) - id/sample_rate)):
|
||
return t
|
||
return (t - 1/(bpm*div), 0)
|
||
|
||
def compress(Zxx):
|
||
res = []
|
||
|
||
def get_freq(song_name, offset, step, songlen, data, display=False):
|
||
"""
|
||
for a given list of amplitudes, returns the corresponding peak frequencies
|
||
"""
|
||
offset = offset.total_seconds()
|
||
fft_list = []
|
||
times = []
|
||
current_time = offset
|
||
k = 0
|
||
|
||
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(offset+songlen), "-i", song_name, "crop.wav"])
|
||
|
||
sample_rate, global_data = wavfile.read("crop.wav")
|
||
#blit = int(len(global_data) / len(data))
|
||
blit = int(sample_rate*step)
|
||
|
||
subprocess.run(["clear"])
|
||
subprocess.run(["rm", "crop.wav"])
|
||
|
||
pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate)
|
||
|
||
print("len : ", len(global_data))
|
||
print("len : ", len(data))
|
||
|
||
frequencies = [0 for s in range(len(data))]
|
||
print(len(pfreq))
|
||
|
||
for s in range(len(data)):
|
||
if(data[s] != 0):
|
||
pff = scipy.fft.rfft(global_data[int(s*len(global_data)/len(data)):int(WORKING_SAMPLE_RATE*step+int(s*len(global_data)/len(data)))])
|
||
|
||
mx = max(np.abs(pff))
|
||
for id in range(len(pff)):
|
||
if frequencies[s] == 0 and np.abs(pff[id]) == mx:
|
||
frequencies[s] = pfreq[id]
|
||
|
||
elif s != 0:
|
||
frequencies[s] = 0
|
||
|
||
if(display):
|
||
plt.plot([offset+t/1000 for t in range(len(data))], frequencies)
|
||
plt.grid()
|
||
plt.xlabel("Time (s)")
|
||
plt.ylabel("Dominant frequency (Hz)")
|
||
plt.title("Dominant frequencies at peaks")
|
||
plt.show()
|
||
|
||
return frequencies
|
||
|
||
def is_data_stereo(raw_global_data:list) -> bool:
|
||
"""
|
||
raw_global_data : list
|
||
"""
|
||
try:
|
||
assert(raw_global_data[0][0])
|
||
except IndexError:
|
||
return False
|
||
except AssertionError:
|
||
return True
|
||
return True
|
||
|
||
|
||
def void_freq(song_name, offset, songlen, increment, minfreq, maxfreq, upperthr, ampthr, ampfreq, ampval, leniency, write, linear, output_file="trimmed.wav"):
|
||
"""
|
||
song_name : string
|
||
offset : int
|
||
songlen : int (length of the part that will be filtered, starting from offset)
|
||
increment : float (technical parameter)
|
||
minfreq and maxfreq : every frequency in [minfreq, maxfreq] will be voided
|
||
upperthr : every frequency above upperthr will be voided
|
||
ampthr : every frequency with amplitude under MAX/ampthr (aka amplitudes under (100/ampthr)% of the max will be voided
|
||
ampfreq, leniency (if linear is false), linear : technical parameters
|
||
ampval : int
|
||
- if linear is false, then this willbe the maximum amplification possible
|
||
- if linear is true, this is the multiplier (Amp <- Amp * (ampval * frequency + leniency))
|
||
write : bool (should be set to True)
|
||
output_file : technical
|
||
"""
|
||
offset = offset.total_seconds()
|
||
|
||
fft_list = []
|
||
times = []
|
||
current_time = offset
|
||
k = 0
|
||
|
||
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen+offset), "-i", song_name, "crop.wav"])
|
||
|
||
sample_rate, raw_global_data = wavfile.read("crop.wav")
|
||
blit = int(sample_rate*increment)
|
||
|
||
global_data = [0 for i in range(len(raw_global_data))]
|
||
|
||
subprocess.run(["clear"])
|
||
subprocess.run(["rm", "crop.wav"])
|
||
|
||
a = 0
|
||
|
||
if(is_data_stereo(raw_global_data)):
|
||
print("Converting to mono...")
|
||
for x in range(len(raw_global_data)):
|
||
global_data[x] = raw_global_data[x][0]/2 + raw_global_data[x][1]/2
|
||
|
||
if(x % (int(len(raw_global_data)/100)) == 0):
|
||
print(a, "/ 100")
|
||
a += 1
|
||
|
||
else:
|
||
global_data = raw_global_data
|
||
|
||
#print("Blit :", blit)
|
||
|
||
pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate)
|
||
|
||
#print(len(pfreq))
|
||
|
||
while(current_time <= songlen):
|
||
pff = scipy.fft.rfft(global_data[k*blit:(k+1)*blit])
|
||
fft_list.append(pff)
|
||
times.append(k*increment)
|
||
|
||
k += 1
|
||
current_time = offset + k*increment
|
||
|
||
print("FFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq))
|
||
|
||
|
||
print("Finding global max...")
|
||
|
||
if(linear == False):
|
||
for i in range(len(fft_list)):
|
||
for j in range(len(fft_list[i])):
|
||
fft_list[i][j] *= (1 + ampval/max(1, np.abs(pfreq[j] - ampfreq)))
|
||
else:
|
||
for i in range(len(fft_list)):
|
||
for j in range(len(fft_list[i])):
|
||
fft_list[i][j] *= (ampval*pfreq[j] + leniency)
|
||
|
||
print("Trimming...")
|
||
|
||
for i in range(len(fft_list)):
|
||
lmax = 0
|
||
for j in range(len(fft_list[i])):
|
||
if(np.abs(fft_list[i][j]) > lmax):
|
||
lmax = np.abs(fft_list[i][j])
|
||
|
||
for j in range(len(fft_list[i])):
|
||
if((pfreq[j] >= minfreq and pfreq[j] < maxfreq) or pfreq[j] > upperthr):
|
||
fft_list[i][j] = 0+0j
|
||
|
||
if(np.abs(fft_list[i][j]) < lmax/ampthr):
|
||
fft_list[i][j] = 0+0j
|
||
|
||
|
||
if(write):
|
||
res = []
|
||
print("Converting...")
|
||
for i in range(len(fft_list)):
|
||
ift = scipy.fft.irfft(fft_list[i], n=blit)
|
||
for k in ift:
|
||
res.append(k)
|
||
#print(type(res[0]))
|
||
mx = 0
|
||
for j in range(len(res)):
|
||
if(res[j] > mx):
|
||
mx = res[j]
|
||
|
||
for i in range(len(res)):
|
||
res[i] = np.int16(32767*res[i]/mx)
|
||
|
||
res = np.array(res)
|
||
wavfile.write(output_file, WORKING_SAMPLE_RATE, res)
|
||
|
||
#plt.plot(np.abs(pfreq[:len(fft_list[0])]), np.abs(fft_list[0]))
|
||
#plt.grid()
|
||
#plt.show()
|
||
|
||
print("Done")
|
||
|
||
def get_tpts(data, sample_rate, thr):
|
||
res = []
|
||
for i in range(len(data)):
|
||
if(data[i] > thr):
|
||
res.append(i/sample_rate)
|
||
|
||
for i in res:
|
||
print(i)
|
||
return res
|
||
|
||
def test_sample(timelist):
|
||
for i in range(1,len(timelist)):
|
||
#os.system('play -n synth %s sin %s' % (0.05, 440))
|
||
for k in range(random.randint(1, 10)):
|
||
print("E", end="")
|
||
print("F")
|
||
sleep(timelist[i]-timelist[i-1])
|
||
|
||
def convert_tuple(datares, freq):
|
||
"""
|
||
Takes datares and converts it to a list of tuples (amplitude, datetimes)
|
||
"""
|
||
return [(timedelta(milliseconds=i), datares[i], freq[i]) for i in range(len(datares)) if datares[i] > 0]
|
||
|
||
def get_songlen(filename):
|
||
"""
|
||
retrieves the length of the song in seconds
|
||
"""
|
||
sample_rate, global_data = wavfile.read(filename)
|
||
print("LEN :", len(global_data)/sample_rate)
|
||
|
||
return (len(global_data)/sample_rate)
|
||
|
||
def snap(data, sample_rate, bpm, divisor, show=False):
|
||
# adjust time amplitudes to match the given BPM
|
||
new = [0 for x in range(int(1000*len(data)/sample_rate))] # 1pt per millisecond
|
||
print("old =", len(data))
|
||
print("len =", 1000*len(data)/sample_rate)
|
||
k = 0
|
||
t = 0
|
||
percent = 0
|
||
for i in range(len(data)):
|
||
|
||
while(t < i/sample_rate):
|
||
t = k/(bpm*divisor)
|
||
k += 60
|
||
'''
|
||
if(np.abs(i/sample_rate - k/(bpm*divisor)) > np.abs(i/sample_rate - (k-60)/(bpm*divisor))):
|
||
k -= 60
|
||
t = k/(bpm*divisor)'''
|
||
|
||
if(i%(len(data)//100) == 0):
|
||
print(percent, "%")
|
||
percent += 1
|
||
|
||
if(int(t*1000) < len(new)):
|
||
new[int(t*1000)] = max(data[i], new[int(t*1000)])
|
||
else:
|
||
new[len(new)-1] = max(data[i], new[len(new)-1])
|
||
|
||
if(show):
|
||
t = [j/1000 for j in range(len(new))]
|
||
plt.plot(t, new)
|
||
plt.xlabel("Time (e)")
|
||
plt.ylabel("Amplitude")
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
return new
|
||
|
||
def adjust_timings(raw_data, snapped_data, indexes, thr=100):
|
||
"""
|
||
adjusts weirdly snapped notes
|
||
"""
|
||
current = 0
|
||
|
||
while(current < len(indexes)):
|
||
|
||
if(current < len(indexes) - 3 and current % 2 == 1): # on a 1/4 beat
|
||
if(snapped_data[indexes[current]] > thr and snapped_data[indexes[current+1]] > thr and snapped_data[indexes[current+2]] > thr and snapped_data[indexes[current+3]] <= thr):
|
||
# -XXX_
|
||
snapped_data[indexes[current+3]] = snapped_data[indexes[current+2]]
|
||
snapped_data[indexes[current+2]] = 0
|
||
|
||
if(current > 0 and current < len(indexes) - 1 and current % 2 == 1):
|
||
if(snapped_data[indexes[current]] > thr and (snapped_data[indexes[current+1]] < thr or snapped_data[indexes[current-1]] < thr)):
|
||
#_X_
|
||
'''if(snapped_data[indexes[current-1]] < thr and raw_data[indexes[current-1]] > raw_data[indexes[current+1]]):
|
||
snapped_data[indexes[current-1]] = snapped_data[indexes[current]]
|
||
else:
|
||
snapped_data[indexes[current+1]] = snapped_data[indexes[current]]'''
|
||
|
||
snapped_data[indexes[current]] = 0
|
||
|
||
current += 1
|
||
|
||
print("Resnap done")
|
||
return snapped_data
|
||
|
||
def snap2(data, sample_rate, bpm, first_offset=0, div=4, show=False, adjust=False):
|
||
"""
|
||
data : list(int)
|
||
sample_rate : int
|
||
bpm = float
|
||
"""
|
||
|
||
song_len = int(len(data)/sample_rate)
|
||
|
||
indexes = []
|
||
app = True
|
||
|
||
reduced = [0 for i in range(song_len*1000)]
|
||
new = [0 for i in range(song_len*1000)]
|
||
|
||
# build the reduced version
|
||
for i in range(len(data)):
|
||
x = int(i*1000/sample_rate)
|
||
if(x < len(reduced)):
|
||
reduced[x] = max(reduced[x], data[i])
|
||
|
||
|
||
print("Build done")
|
||
# snap
|
||
k = 0
|
||
current_t = first_offset
|
||
|
||
while(current_t < 0):
|
||
k += 1
|
||
current_t = first_offset + k*60/(bpm*div)
|
||
|
||
for j in range(len(new)):
|
||
if(j/1000 > current_t):
|
||
k += 1
|
||
current_t = first_offset + k*60/(bpm*div)
|
||
app = True
|
||
|
||
y = int(current_t*1000)
|
||
if(y < len(new)):
|
||
new[y] = max(new[y], reduced[j])
|
||
|
||
if(app):
|
||
indexes.append(y)
|
||
app = False
|
||
|
||
print("Snap done")
|
||
|
||
if(adjust):
|
||
print("Len :", len(indexes))
|
||
|
||
new = adjust_timings(reduced, new, indexes)
|
||
|
||
|
||
if(show):
|
||
|
||
t = [j/1000+first_offset for j in range(len(new))]
|
||
scatter_t = [t[i] for i in range(len(new)) if new[i] != 0]
|
||
scatter_chosen_rhythm = [0.9 for i in range(len(new)) if new[i] != 0 ]
|
||
beats_1 = [0 for j in range(len(new))]
|
||
beats_2 = [0 for k in range(len(new))]
|
||
beats_4 = [0 for l in range(len(new))]
|
||
|
||
k = 0
|
||
current_t = first_offset
|
||
|
||
while(current_t < 0):
|
||
k += 1
|
||
current_t = first_offset + k*60/(bpm*div)
|
||
|
||
while(1000*current_t < len(new)):
|
||
beats_4[int(1000*current_t)] = 0.9
|
||
|
||
if(k % 2 == 0):
|
||
beats_2[int(1000*current_t)] = 0.902
|
||
|
||
if(k % 4 == 0):
|
||
beats_1[int(1000*current_t)] = 0.91
|
||
|
||
if(k % 16 == 0):
|
||
beats_1[int(1000*current_t)] = 0.915
|
||
|
||
k += 1
|
||
current_t = first_offset + k*60/(bpm*div)
|
||
|
||
points = plt.scatter(scatter_t, scatter_chosen_rhythm, marker="o", label="Detected Rhythm")
|
||
div1_plot, = plt.plot(t, beats_4, "b-", label="1/4")
|
||
div2_plot, = plt.plot(t, beats_2, "r-", label="1/2")
|
||
div3_plot, = plt.plot(t, beats_1, "black", label="1/1")
|
||
plt.xlabel("Time (s)")
|
||
#plt.ylabel("Amplitude")
|
||
plt.legend(handles=[points, div1_plot, div2_plot, div3_plot])
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
return new
|
||
|
||
def convert_song(song_name:str, output_file="audio.wav") -> str:
|
||
"""
|
||
Converts the song to .wav AND lower its sample rate to 1000.
|
||
Returns: the song_name that should be used afterwards.
|
||
"""
|
||
subprocess.run(["ffmpeg", "-y", "-i", song_name, "-ar", "1000", output_file])
|
||
return output_file
|
||
|
||
def quantify(time: timedelta, bpm, offset, snapping):
|
||
"""
|
||
Input: timedelta, bpm, offset, and snapping divisor (2 for 1/2, etc...)
|
||
Returns a timedelta that is properly timed to the map.
|
||
"""
|
||
offset_ms = offset.total_seconds() / 1000
|
||
time_ms = time.total_seconds() * 1000
|
||
time_spacing = (60000/bpm)/snapping
|
||
beats_away = round((time_ms - offset_ms)/time_spacing)
|
||
new_time = timedelta(milliseconds=time_spacing*beats_away + offset_ms)
|
||
return new_time
|
||
|
||
def quantify_all(amplitudes_ugly, bpm, offset_ms, divisor):
|
||
n = len(amplitudes_ugly)
|
||
covered = [False] * n
|
||
times = []
|
||
amplitudes = []
|
||
for i in range(n):
|
||
if amplitudes_ugly[i] != 0 and not covered[i]:
|
||
times.append(quantify(timedelta(milliseconds=i), bpm, offset_ms, divisor))
|
||
amplitudes.append(amplitudes_ugly[i])
|
||
covered[i] = True
|
||
return times, amplitudes
|
||
|
||
def process_song(filename, bpm, offset=timedelta(milliseconds=0), div_len_factor=1, n_iter_2=-1, threshold=0.5, divisor=4):
|
||
"""
|
||
filename : string (name of the song)
|
||
offset : int [+] (song mapping will start from this time in seconds, default is 0)
|
||
bpm : int [+]
|
||
div_len_factor : float [+] (the length multiplier of each segment, default is 1)
|
||
n_iter : int [+*] (the number of iterations, default is -1 (maps the whole music))
|
||
threshold : int [0, 100] (used by the filter function to only keep the largest threshold% of timing points, default is 0.5)
|
||
divisor : int [+] (beat divisor used to snap the notes, default is 4)
|
||
"""
|
||
|
||
filename = convert_song(filename)
|
||
|
||
div_len = div_len_factor*60/bpm-0.01
|
||
|
||
n_iter = n_iter_2
|
||
song_len = get_songlen(filename)
|
||
|
||
if(n_iter == -1):
|
||
n_iter = floor((song_len-offset.total_seconds())/div_len)-1
|
||
|
||
filtered_name = f"{filename}_trimmed.wav"
|
||
|
||
void_freq(filename, offset, min(song_len, offset.total_seconds()+div_len*(n_iter+1)+0.01), 4*60/bpm, minfreq=0, maxfreq=220, upperthr=5000, ampthr=60, ampfreq = 1200, ampval = 5.0, leniency = 0.005, write=True, linear=False, output_file=filtered_name)
|
||
#void_freq(filename, offset, offset+div_len*(n_iter+1)+0.01, 4*60/bpm, minfreq=0, maxfreq=330, upperthr=2500, ampthr=60, ampfreq = 1200, ampval = 1/2000, leniency = 0.0, write=True, linear=True, output_file=filtered_name)
|
||
amplitudes_ugly = filter_n_percent_serial(filtered_name, offset, n_iter, div_len, threshold)
|
||
#datares = snap(datares, WORKING_SAMPLE_RATE, bpm, 4, True)
|
||
times, amplitudes = quantify_all(amplitudes_ugly, bpm, offset, divisor)
|
||
#frequencies = get_freq(filtered_name, offset, div_len, div_len*n_iter, datares, True)
|
||
Path(f"{filename}_trimmed.wav").unlink()
|
||
#return convert_tuple(datares, frequencies)
|
||
return times, amplitudes
|
||
|
||
def main():
|
||
data = process_song("tetris_4.wav", 160, n_iter_2=48, threshold=100)
|
||
#print(data)
|
||
print("Program finished with return 0")
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
|
||
''' -------------------------------------------------------------------- '''
|
||
''' -----------------------| Feuilles mortes |-------------------------- '''
|
||
''' -------------------------------------------------------------------- '''
|
||
|
||
|
||
'''
|
||
def smooth(data, thr, mergeThr, show):
|
||
mx = max(data)
|
||
for i in range(len(data)-mergeThr):
|
||
if(data[i]/mx > thr):
|
||
for k in range(1, mergeThr):
|
||
data[i+k] = 0
|
||
if(show):
|
||
t = [j/1000 for j in range(len(data))]
|
||
plt.plot(t, data)
|
||
plt.xlabel("Time (not scaled to origin)")
|
||
plt.ylabel("Amplitude")
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
return data
|
||
if(False):
|
||
#t, f, Zxx = fct("no.wav", 0, 0.032, 10, 5000, False)
|
||
#t, f, Zxx = fct("worlds_end_3.wav", 150.889, 0.032, 170.889, 3000, False)
|
||
#t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 114.318, 3000, False)
|
||
#t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 20, 3000, False)
|
||
#t, f, Zxx = fct("da^9.wav", 8.463, 0.032, 20, 5000, False)
|
||
t, f, Zxx = fct("13. Cosmic Mind.wav", 0, 0.032, 20, 5000, False)
|
||
#t, f, Zxx = fct("Furioso Melodia WORKING_SAMPLE_RATE.wav", 4, 0.032, 8, 3000, False)
|
||
#t, f, Zxx = fct("changing.wav", 0, 0.05, 3.9, 5000, False)
|
||
#fct("worlds_end_3.wav", 75, (60/178)/4, 75+2, 2500)
|
||
|
||
plot_max(t, f, Zxx, True)
|
||
|
||
if(False):
|
||
#(t, data) = peaks("worlds_end_3.wav", 0, 300, False, 0.92)
|
||
(t, data) = peaks("worlds_end_3.wav", 74.582, 6, False, 0.9)
|
||
#(t, data) = peaks("da^9.wav", 8.463, 301.924 - 8.463, False, 0.95)
|
||
#(t, data) = peaks("deltamax.wav", 8.463, 30101.924 - 8.463, False, 0.92)
|
||
da = find_bpm(t, WORKING_SAMPLE_RATE, data, 100, 200, 1, 10)
|
||
print("BPM data is", da)'''
|
||
|
||
#data = [-1 for i in range(int(x))]
|
||
#ids = [-1 for i in range(int(x))]
|
||
'''
|
||
data = []
|
||
ids = []
|
||
for k in range(int(x)):
|
||
data.append(int(7*mx/10))
|
||
ids.append(-1)
|
||
# structure there is [[index, value]...]
|
||
|
||
i = 0
|
||
calc = 0
|
||
while(i < len(song_data)):
|
||
if(i%10 == 0):
|
||
print(i, "/", len(song_data))
|
||
if(data[int(x)-1] < song_data[i]):
|
||
calc += 1
|
||
#print("\n \n \n \n \n")
|
||
data[int(x)-1] = song_data[i]
|
||
ids[int(x)-1] = i
|
||
|
||
k = int(x)-1
|
||
#while(k < int(x) & data[0] > data[k]):
|
||
while(k > 0 and data[k-1] <= data[k]):
|
||
data[k], data[k-1] = data[k-1], data[k]
|
||
ids[k], ids[k-1] = ids[k-1], ids[k]
|
||
k -= 1
|
||
|
||
#print(data[int(x)-1], calc, "/", x)
|
||
|
||
i += skip
|
||
i += 1
|
||
|
||
|
||
for s in range(int(x)-1):
|
||
if(data[s] < data[s+1]):
|
||
print("Nope", s)
|
||
assert(0)
|
||
'''
|
||
|
||
|
||
'''
|
||
def fct(song_name, offset, increment, songlen, maxfreq, display):
|
||
to_cut = 20000//maxfreq
|
||
global_Zxx = np.array([])
|
||
global_f = np.array([])
|
||
global_t = np.array([])
|
||
current_time = offset
|
||
k = 0
|
||
while(current_time <= songlen):
|
||
subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"])
|
||
|
||
sample_rate, audio_data = wavfile.read('crop.wav')
|
||
size = audio_data.size
|
||
|
||
#subprocess.run(["clear"])
|
||
subprocess.run(["rm", "crop.wav"])
|
||
|
||
# do stuff here
|
||
#f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000)
|
||
f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size)
|
||
leng = len(f)
|
||
|
||
f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut]
|
||
|
||
#print(len(Zxx))
|
||
#print(len(Zxx[0]))
|
||
|
||
|
||
for i in range(len(Zxx)):
|
||
for j in range(len(Zxx[i])):
|
||
Zxx[i][j] *= 1127*np.log(1+f[i]/700)
|
||
|
||
|
||
t = np.array([current_time + x for x in t])
|
||
|
||
if(k == 0):
|
||
global_f = f
|
||
global_t = t
|
||
global_Zxx = Zxx
|
||
else:
|
||
global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1)
|
||
global_t = np.concatenate((global_t, t))
|
||
|
||
#print(len(global_t))
|
||
|
||
k += 1
|
||
current_time = offset + k*increment
|
||
|
||
print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%")
|
||
|
||
if(display):
|
||
plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud')
|
||
# print(len(global_Zxx), len(global_Zxx[0]))
|
||
# 88 192 = 2500
|
||
# 70 192 = 2000
|
||
plt.title('STFT Magnitude')
|
||
plt.ylabel('Frequency [Hz]')
|
||
plt.xlabel('Time [sec]')
|
||
plt.show()
|
||
|
||
return global_t, global_f, np.abs(global_Zxx)
|
||
|
||
def write_to_file(t, flist, maxlist, filename):
|
||
file = open(filename, 'w')
|
||
file.writelines('time,frequency,maxvalue\n')
|
||
for i in range(len(t)):
|
||
file.writelines(str(np.round(t[i], 3)))
|
||
file.writelines(',')
|
||
file.writelines(str(np.round(flist[i], 1)))
|
||
file.writelines(',')
|
||
file.writelines(str(np.round(maxlist[i], 0)))
|
||
file.writelines('\n')
|
||
#close(file)
|
||
|
||
def plot_max(time, freq, Zxx, save):
|
||
fres = [0 for x in range(len(time))]
|
||
maxres = [0 for x in range(len(time))]
|
||
for t in range(len(time)):
|
||
#subprocess.run(["clear"])
|
||
print(t, "/", len(time))
|
||
for f in range(len(Zxx)):
|
||
if(maxres[t] < Zxx[f][t]):
|
||
maxres[t] = Zxx[f][t]
|
||
fres[t] = freq[f]
|
||
|
||
if(save):
|
||
write_to_file(time, fres, maxres, 'output.csv')
|
||
|
||
''''''
|
||
plt.plot(time, fres, 'r')
|
||
plt.grid()
|
||
plt.xlabel("Time")
|
||
plt.ylabel("Maximum frequencies")
|
||
|
||
plt.plot(time, maxres, 'g')
|
||
plt.grid()
|
||
plt.xlabel("Time")
|
||
plt.ylabel("Maximun values")
|
||
|
||
plt.show()''''''
|
||
|
||
fig, (ax1, ax2) = plt.subplots(2)
|
||
fig.suptitle('Top : time and frequencies\nBottom : time and max values')
|
||
ax1.plot(time, fres)
|
||
ax2.plot(time, maxres)
|
||
|
||
plt.show()
|
||
|
||
def extract_peaks(song_data, sample_rate, offset, display, threshold):
|
||
mx = max(song_data)
|
||
for i in range(len(song_data)):
|
||
#subprocess.run(["clear"])
|
||
print(i, "/", len(song_data))
|
||
if(song_data[i]/mx < threshold):
|
||
song_data[i] = 0
|
||
t = [offset + i/sample_rate for i in range(len(song_data))]
|
||
|
||
if(display):
|
||
plt.plot(t, song_data, 'b+')
|
||
plt.grid()
|
||
plt.xlabel("t")
|
||
plt.ylabel("amp")
|
||
plt.show()
|
||
|
||
return (t, song_data)
|
||
|
||
def get_local_max(song_data, center, width):
|
||
mx = 0
|
||
for o in range(-width, width+1):
|
||
togo = min(len(song_data)-1, center+o)
|
||
togo = max(0, togo)
|
||
if(mx < song_data[togo]):
|
||
mx = song_data[togo]
|
||
return mx
|
||
|
||
def extract_peaks_v2(song_data, sample_rate, offset, display, threshold, seglen):
|
||
mx = 0
|
||
for i in range(len(song_data)):
|
||
if (i%seglen == 0):
|
||
print("----")
|
||
mx = get_local_max(song_data, i+seglen//2, seglen//2)
|
||
#subprocess.run(["clear"])
|
||
print(i, "/", len(song_data))
|
||
if(song_data[i]/mx < threshold):
|
||
song_data[i] = 0
|
||
|
||
t = [offset + i/sample_rate for i in range(len(song_data))]
|
||
|
||
if(display):
|
||
plt.plot(t, song_data, 'b+')
|
||
plt.grid()
|
||
plt.xlabel("t")
|
||
plt.ylabel("amp")
|
||
plt.show()
|
||
|
||
return (t, song_data)
|
||
|
||
def peaks(song_name, offset, length, display, thr):
|
||
offset = offset.total_seconds()
|
||
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"])
|
||
|
||
sample_rate, audio_data = wavfile.read('crop.wav')
|
||
|
||
subprocess.run(["clear"])
|
||
subprocess.run(["rm", "crop.wav"])
|
||
|
||
#return extract_peaks(audio_data, sample_rate, offset, display, thr)
|
||
return extract_peaks_v2(audio_data, sample_rate, offset, display, thr, WORKING_SAMPLE_RATE*2)
|
||
|
||
def find_bpm(sample_rate, data, minbpm, maxbpm, step, width):
|
||
optimal = minbpm
|
||
optimal_acc = 0
|
||
accuracy = 0
|
||
|
||
bpmlst = []
|
||
scores = []
|
||
|
||
for beat in range(minbpm, maxbpm+step, step):
|
||
loopturn = 0
|
||
print("testing", beat)
|
||
accuracy = 0
|
||
current = 0
|
||
|
||
while(current+width < len(data)):
|
||
loopturn += 1
|
||
for o in range(-width, width+1):
|
||
accuracy += data[current + o]
|
||
#current = (loopturn*sample_rate)//beat
|
||
current += (sample_rate)//beat
|
||
|
||
#accuracy = accuracy/loopturn
|
||
|
||
#accuracy *= (1+(maxbpm-beat)/minbpm)
|
||
if optimal_acc < accuracy:
|
||
optimal_acc = accuracy
|
||
optimal = beat
|
||
bpmlst.append(beat)
|
||
scores.append(accuracy)
|
||
|
||
if(False):
|
||
plt.plot(bpmlst, scores)
|
||
plt.xlabel("BPM")
|
||
plt.ylabel("Score")
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
return (optimal, optimal_acc)
|
||
'''
|
||
|
||
|
||
|
||
'''
|
||
def void_freq(song_name, offset, songlen, increment, lthr, gthr):
|
||
to_cut = 20000//2500
|
||
global_Zxx = np.array([])
|
||
global_f = np.array([])
|
||
global_t = np.array([])
|
||
current_time = offset
|
||
k = 0
|
||
sample_rate, global_data = wavfile.read(song_name)
|
||
blit = int(sample_rate*increment)
|
||
print("Blit :", blit)
|
||
while(current_time <= songlen):
|
||
#subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"])
|
||
|
||
#sample_rate, audio_data = wavfile.read('crop.wav')
|
||
audio_data = global_data[int(k*blit):int((k+1)*blit)]
|
||
size = audio_data.size
|
||
|
||
#subprocess.run(["clear"])
|
||
#subprocess.run(["rm", "crop.wav"])
|
||
|
||
# do stuff here
|
||
#f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000)
|
||
f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size)
|
||
leng = len(f)
|
||
|
||
f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut]
|
||
|
||
for i in range(len(Zxx)):
|
||
for j in range(len(Zxx[i])):
|
||
#Zxx[i][j] *= 1127*np.log(1+f[i]/700)
|
||
Zxx[i][j] *= 1000
|
||
|
||
t = np.array([current_time + x for x in t])
|
||
|
||
if(k == 0):
|
||
global_f = f
|
||
global_t = t
|
||
global_Zxx = Zxx
|
||
else:
|
||
global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1)
|
||
global_t = np.concatenate((global_t, t))
|
||
|
||
#print(len(global_t))
|
||
|
||
k += 1
|
||
current_time = offset + k*increment
|
||
|
||
print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%")
|
||
|
||
print("Finding global max...")
|
||
gmax = 0
|
||
for i in range(len(global_Zxx)):
|
||
for j in range(len(global_Zxx[i])):
|
||
if(global_Zxx[i][j] > gmax):
|
||
gmax = global_Zxx[i][j]
|
||
|
||
print("Trimming...")
|
||
for j in range(len(global_Zxx[0])):
|
||
lmax = 0
|
||
for i in range(len(global_Zxx)):
|
||
if(global_Zxx[i][j] > lmax):
|
||
lmax = global_Zxx[i][j]
|
||
|
||
for i in range(len(global_Zxx)):
|
||
val = global_Zxx[i][j]
|
||
if(val/lmax <= lthr/100):
|
||
global_Zxx[i][j] = 0
|
||
elif(val/gmax <= gthr/100):
|
||
global_Zxx[i][j] = 0
|
||
|
||
if(False):
|
||
print("Plotting...")
|
||
plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud')
|
||
# print(len(global_Zxx), len(global_Zxx[0]))
|
||
print("XLEN :", len(global_Zxx), "\nYLEN :", len(global_Zxx[0]))
|
||
plt.title('STFT Magnitude')
|
||
plt.ylabel('Frequency [Hz]')
|
||
plt.xlabel('Time [sec]')
|
||
plt.show()
|
||
|
||
if(True):
|
||
print("Converting...")
|
||
audio_signal = librosa.griffinlim(global_Zxx)
|
||
#scipy.io.wavfile.write('trimmed.wav', sample_rate, np.array(audio_signal, dtype=np.int16))
|
||
wavfile.write('test.wav', sample_rate, np.array(audio_signal, dtype=np.int16))
|
||
|
||
print("Done")
|
||
|
||
def find_bpm_2(sample_rate, data, threshold, maxbpm, show):
|
||
mx = np.max(data)
|
||
min_spacing = (60*sample_rate)/maxbpm
|
||
k = 0
|
||
while(k < len(data) and data[k]/mx < threshold):
|
||
k += 1
|
||
|
||
k += 1
|
||
spacing = []
|
||
current = 1
|
||
progress = 0
|
||
|
||
while(k < len(data)):
|
||
if(k%(len(data)/100) == 0):
|
||
print(progress, "%")
|
||
progress += 1
|
||
if(data[k]/mx >= threshold and current > min_spacing):
|
||
spacing.append(current)
|
||
current = 0
|
||
else:
|
||
current += 1
|
||
k += 1
|
||
|
||
|
||
for x in range(len(spacing)):
|
||
spacing[x] = 60/(spacing[x]/sample_rate)
|
||
|
||
digits = [i for i in range(len(spacing))]
|
||
if(show):
|
||
plt.plot(digits, spacing)
|
||
plt.xlabel("N")
|
||
plt.ylabel("BPM")
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
beat = np.mean(spacing)
|
||
error = np.std(spacing)
|
||
|
||
return (np.round(beat, 3), np.round(error, 3))
|
||
|
||
def to_ms(song_data, sample_rate, offset):
|
||
# converts audio data to have exactly 1 sample per millisecond (aka set sample_rate to 1000)
|
||
new_data = []
|
||
spacing = int(sample_rate * 0.001)
|
||
mx = max(song_data)
|
||
i = 0
|
||
while(i < len(song_data)):
|
||
avg = 0
|
||
for k in range(spacing):
|
||
if(i+spacing < len(song_data)):
|
||
avg += song_data[i+spacing]
|
||
avg = avg / spacing
|
||
new_data.append(avg)
|
||
i += spacing
|
||
|
||
if(False): # pls dont kill me thx
|
||
t = [offset + j/1000 for j in range(len(new_data))]
|
||
plt.plot(t, new_data)
|
||
plt.xlabel("Time")
|
||
plt.ylabel("Amplitude")
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
return (new_data, len(new_data))
|
||
|
||
def filter_n_percent(song_name, offset, length, threshold, reduce, show):
|
||
# threshold is in ]0, 100]
|
||
# filter data associated with song_name to keep only the highest threshold% values
|
||
|
||
offset = offset.total_seconds()
|
||
|
||
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"])
|
||
|
||
sample_rate, song_data = wavfile.read('crop.wav')
|
||
|
||
subprocess.run(["clear"])
|
||
subprocess.run(["rm", "crop.wav"])
|
||
|
||
if(reduce):
|
||
(song_data,e) = to_ms(song_data, WORKING_SAMPLE_RATE, 1)
|
||
sample_rate = 1000
|
||
|
||
mx = max(song_data)
|
||
|
||
is_locked = [False for i in range(len(song_data))]
|
||
x = int((len(song_data)*threshold)//100)
|
||
#print("X = ", x)
|
||
|
||
print("Retreiving the", int(x), "/", len(song_data), "highest values")
|
||
elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1])
|
||
print("Done")
|
||
|
||
for idx in range(len(elements)):
|
||
is_locked[elements[idx][0]] = True
|
||
|
||
for r in range(len(song_data)):
|
||
if(is_locked[r] == False):
|
||
song_data[r] = 0
|
||
|
||
if(show):
|
||
#print("EEEEE")
|
||
t = [offset + j/sample_rate for j in range(len(song_data))]
|
||
plt.plot(t, song_data)
|
||
plt.xlabel("Time")
|
||
plt.ylabel("Amplitude")
|
||
plt.grid()
|
||
plt.show()
|
||
|
||
return song_data
|
||
'''
|