from math import * import numpy as np from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import subprocess import wave as wv import struct import librosa import heapq import scipy import os import random from pathlib import Path from time import sleep from datetime import timedelta import debug print("Starting...\n") def filter_n_percent_serial(song_name, offset, n_iter, step, threshold): """ song_name : string offset : int n_iter : int (number of turns) step : int (length of each small segment) threshold : int (is in ]0, 100]) filter data associated with song_name to keep only the highest threshold% values """ subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(offset+step*n_iter), "-i", song_name, "crop.wav"]) sample_rate, global_data = wavfile.read('crop.wav') subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) for i in range(n_iter): print(i, "/", n_iter) #print(i * step) song_data = global_data[int(i*step*sample_rate):int((i+1)*step*sample_rate)] if(len(song_data) != 0): mx = max(song_data) is_locked = [False for i in range(len(song_data))] x = int((len(song_data)*threshold)//100) #print("X = ", x) #print("Retreiving the", int(x), "/", len(song_data), "highest values") elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1]) #print("Done") for idx in range(len(elements)): is_locked[elements[idx][0]] = True for r in range(len(song_data)): if(is_locked[r] == False): global_data[r+int(i*step*sample_rate)] = 0 return global_data def write_to_file_thr(sample_rate, song_data, offset, threshold, filename): # write data to output file file = open(filename, 'w') file.writelines('time,amplitude\n') mx = max(song_data) print("writing to output...") for i in range(len(song_data)): if(i%(len(song_data)//50) == 0): print(i, "/", len(song_data)) if(song_data[i]/mx > threshold): file.writelines(str(np.round(offset + i/sample_rate, 3))) file.writelines(',') file.writelines(str(np.round(song_data[i], 0))) file.writelines('\n') def round_t(id, sample_rate, bpm, div, offset, k0): k = k0 t = offset + k/(bpm*div) while(t < id/sample_rate): t = offset + k/(bpm*div) k += 1 if(np.abs(t - id/sample_rate) < np.abs((t - 1/(bpm*div)) - id/sample_rate)): return t return (t - 1/(bpm*div), 0) def compress(Zxx): res = [] def get_freq(song_name, times, width=1000, display=False): """ for a given list of times (in seconds), returns the corresponding peak frequencies """ subprocess.run(["ffmpeg", "-ss", str(0), "-t", str(max(np.array(times))), "-i", song_name, "crop.wav"]) sample_rate, global_data = wavfile.read(song_name) #blit = int(sample_rate*step) subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) pfreq = scipy.fft.rfftfreq(2*width, 1/sample_rate) frequencies = [0 for s in range(len(times))] print(len(pfreq)) for s in range(len(times)): left = max(0, int(times[s]*44100)-width) right = min(len(global_data), int(times[s]*44100)+width) pff = scipy.fft.rfft(global_data[left:right]) #print(len(pff), len(pfreq)) mx = max(np.abs(pff)) for id in range(len(pff)): if frequencies[s] == 0 and np.abs(pff[id]) == mx: frequencies[s] = pfreq[id] if(display): plt.plot(times, frequencies) plt.grid() plt.xlabel("Time (s)") plt.ylabel("Dominant frequency (Hz)") plt.title("Dominant frequencies at peaks") plt.show() return frequencies def is_data_stereo(raw_global_data:list) -> bool: """ raw_global_data : list """ try: assert(raw_global_data[0][0]) except IndexError: return False except AssertionError: return True return True def void_freq(song_name, offset, songlen, increment, minfreq, maxfreq, upperthr, ampthr, ampfreq, ampval, leniency, write, linear, output_file="trimmed.wav"): """ song_name : string offset : int songlen : int (length of the part that will be filtered, starting from offset) increment : float (technical parameter) minfreq and maxfreq : every frequency in [minfreq, maxfreq] will be voided upperthr : every frequency above upperthr will be voided ampthr : every frequency with amplitude under MAX/ampthr (aka amplitudes under (100/ampthr)% of the max will be voided ampfreq, leniency (if linear is false), linear : technical parameters ampval : int - if linear is false, then this willbe the maximum amplification possible - if linear is true, this is the multiplier (Amp <- Amp * (ampval * frequency + leniency)) write : bool (should be set to True) output_file : technical """ fft_list = [] times = [] current_time = offset k = 0 subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen+offset), "-i", song_name, "crop.wav"]) sample_rate, raw_global_data = wavfile.read("crop.wav") blit = int(sample_rate*increment) global_data = [0 for i in range(len(raw_global_data))] subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) a = 0 if(is_data_stereo(raw_global_data)): print("Converting to mono...") for x in range(len(raw_global_data)): global_data[x] = raw_global_data[x][0]/2 + raw_global_data[x][1]/2 if(x % (int(len(raw_global_data)/100)) == 0): print(a, "/ 100") a += 1 else: global_data = raw_global_data #print("Blit :", blit) pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate) #print(len(pfreq)) while(current_time <= songlen): pff = scipy.fft.rfft(global_data[k*blit:(k+1)*blit]) fft_list.append(pff) times.append(k*increment) k += 1 current_time = offset + k*increment print("FFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq)) print("Finding global max...") if(linear == False): for i in range(len(fft_list)): for j in range(len(fft_list[i])): fft_list[i][j] *= (1 + ampval/max(1, np.abs(pfreq[j] - ampfreq))) else: for i in range(len(fft_list)): for j in range(len(fft_list[i])): fft_list[i][j] *= (ampval*pfreq[j] + leniency) print("Trimming...") for i in range(len(fft_list)): lmax = 0 for j in range(len(fft_list[i])): if(np.abs(fft_list[i][j]) > lmax): lmax = np.abs(fft_list[i][j]) for j in range(len(fft_list[i])): if((pfreq[j] >= minfreq and pfreq[j] < maxfreq) or pfreq[j] > upperthr): fft_list[i][j] = 0+0j if(np.abs(fft_list[i][j]) < lmax/ampthr): fft_list[i][j] = 0+0j if(write): res = [] print("Converting...") for i in range(len(fft_list)): ift = scipy.fft.irfft(fft_list[i], n=blit) for k in ift: res.append(k) #print(type(res[0])) mx = 0 for j in range(len(res)): if(res[j] > mx): mx = res[j] for i in range(len(res)): res[i] = np.int16(32767*res[i]/mx) res = np.array(res) wavfile.write(output_file, 44100, res) #plt.plot(np.abs(pfreq[:len(fft_list[0])]), np.abs(fft_list[0])) #plt.grid() #plt.show() print("Done") def convert_tuple(data, times): """ Takes data and converts it to a list of tuples (amplitude, datetimes) """ return [(times[i], data[i]) for i in range(len(data))] def get_songlen(filename): """ retrieves the length of the song in seconds """ sample_rate, global_data = wavfile.read(filename) print("LEN :", len(global_data)/sample_rate) return (len(global_data)/sample_rate) def convert_to_wav(song_name:str, output_file="audio.wav") -> str: """ Converts the song to .wav, only if it's not already in wave format. Currently relies on file extension. Returns: the song_name that should be used afterwards. """ extension = Path(song_name).suffix match extension: case ".mp3" | ".ogg": print("Converting to .wav...") subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file]) return output_file return song_name def process_song(filename, bpm, offset0=0, div_len_factor=1, n_iter_2=-1, threshold=0.5, divisor=4): """ filename : string (name of the song) offset : int [+] (song mapping will start from this time in seconds, default is 0) bpm : int [+] div_len_factor : float [+] (the length multiplier of each segment, default is 1) n_iter : int [+*] (the number of iterations, default is -1 (maps the whole music)) threshold : int [0, 100] (used by the filter function to only keep the largest threshold% of timing points, default is 0.5) divisor : int [+] (beat divisor used to snap the notes, default is 4) """ filename = convert_to_wav(filename) offset = offset0/1000 div_len = div_len_factor*60/bpm-0.01 n_iter = n_iter_2 song_len = get_songlen(filename) if(n_iter == -1): n_iter = int((song_len-offset/1000)/div_len)-1 filtered_name = f"{filename}_trimmed.wav" void_freq(filename, offset, min(song_len, offset+div_len*(n_iter+1)+0.01), 4*60/bpm, minfreq=0, maxfreq=220, upperthr=5000, ampthr=60, ampfreq = 1200, ampval = 5.0, leniency = 0.005, write=True, linear=False, output_file=filtered_name) datares = filter_n_percent_serial(filtered_name, offset, n_iter, div_len, threshold) #snapped_data = amplitude #times in ms (snapped_data, times) = debug.snap3(datares, mintime=50, initial_plot=True, after_plot=True) #frequencies=get_freq(filtered_name, offset, div_len, div_len*n_iter, snapped_data, True) frequencies = get_freq(filtered_name, times, display=True) Path(f"{filename}_trimmed.wav").unlink() return snapped_data, times, frequencies ''' datares = debug.snap2(datares, 44100, bpm, first_offset=offset, div=divisor, show=True, adjust=True) frequencies = get_freq(filtered_name, offset, div_len, div_len*n_iter, datares, True) Path(f"{filename}_trimmed.wav").unlink() return convert_tuple(datares, frequencies) ''' def main(): aa, bb, cc = process_song("tetris_4.wav", 160, n_iter_2=48) #print(data) print("Program finished with return 0") if __name__ == "__main__": main() ''' -------------------------------------------------------------------- ''' ''' -----------------------| Feuilles mortes |-------------------------- ''' ''' -------------------------------------------------------------------- ''' ''' def smooth(data, thr, mergeThr, show): mx = max(data) for i in range(len(data)-mergeThr): if(data[i]/mx > thr): for k in range(1, mergeThr): data[i+k] = 0 if(show): t = [j/1000 for j in range(len(data))] plt.plot(t, data) plt.xlabel("Time (not scaled to origin)") plt.ylabel("Amplitude") plt.grid() plt.show() return data if(False): #t, f, Zxx = fct("no.wav", 0, 0.032, 10, 5000, False) #t, f, Zxx = fct("worlds_end_3.wav", 150.889, 0.032, 170.889, 3000, False) #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 114.318, 3000, False) #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 20, 3000, False) #t, f, Zxx = fct("da^9.wav", 8.463, 0.032, 20, 5000, False) t, f, Zxx = fct("13. Cosmic Mind.wav", 0, 0.032, 20, 5000, False) #t, f, Zxx = fct("Furioso Melodia 44100.wav", 4, 0.032, 8, 3000, False) #t, f, Zxx = fct("changing.wav", 0, 0.05, 3.9, 5000, False) #fct("worlds_end_3.wav", 75, (60/178)/4, 75+2, 2500) plot_max(t, f, Zxx, True) if(False): #(t, data) = peaks("worlds_end_3.wav", 0, 300, False, 0.92) (t, data) = peaks("worlds_end_3.wav", 74.582, 6, False, 0.9) #(t, data) = peaks("da^9.wav", 8.463, 301.924 - 8.463, False, 0.95) #(t, data) = peaks("deltamax.wav", 8.463, 30101.924 - 8.463, False, 0.92) da = find_bpm(t, 44100, data, 100, 200, 1, 10) print("BPM data is", da)''' #data = [-1 for i in range(int(x))] #ids = [-1 for i in range(int(x))] ''' data = [] ids = [] for k in range(int(x)): data.append(int(7*mx/10)) ids.append(-1) # structure there is [[index, value]...] i = 0 calc = 0 while(i < len(song_data)): if(i%10 == 0): print(i, "/", len(song_data)) if(data[int(x)-1] < song_data[i]): calc += 1 #print("\n \n \n \n \n") data[int(x)-1] = song_data[i] ids[int(x)-1] = i k = int(x)-1 #while(k < int(x) & data[0] > data[k]): while(k > 0 and data[k-1] <= data[k]): data[k], data[k-1] = data[k-1], data[k] ids[k], ids[k-1] = ids[k-1], ids[k] k -= 1 #print(data[int(x)-1], calc, "/", x) i += skip i += 1 for s in range(int(x)-1): if(data[s] < data[s+1]): print("Nope", s) assert(0) ''' ''' def fct(song_name, offset, increment, songlen, maxfreq, display): to_cut = 20000//maxfreq global_Zxx = np.array([]) global_f = np.array([]) global_t = np.array([]) current_time = offset k = 0 while(current_time <= songlen): subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"]) sample_rate, audio_data = wavfile.read('crop.wav') size = audio_data.size #subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) # do stuff here #f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000) f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size) leng = len(f) f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut] #print(len(Zxx)) #print(len(Zxx[0])) for i in range(len(Zxx)): for j in range(len(Zxx[i])): Zxx[i][j] *= 1127*np.log(1+f[i]/700) t = np.array([current_time + x for x in t]) if(k == 0): global_f = f global_t = t global_Zxx = Zxx else: global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1) global_t = np.concatenate((global_t, t)) #print(len(global_t)) k += 1 current_time = offset + k*increment print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%") if(display): plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud') # print(len(global_Zxx), len(global_Zxx[0])) # 88 192 = 2500 # 70 192 = 2000 plt.title('STFT Magnitude') plt.ylabel('Frequency [Hz]') plt.xlabel('Time [sec]') plt.show() return global_t, global_f, np.abs(global_Zxx) def write_to_file(t, flist, maxlist, filename): file = open(filename, 'w') file.writelines('time,frequency,maxvalue\n') for i in range(len(t)): file.writelines(str(np.round(t[i], 3))) file.writelines(',') file.writelines(str(np.round(flist[i], 1))) file.writelines(',') file.writelines(str(np.round(maxlist[i], 0))) file.writelines('\n') #close(file) def plot_max(time, freq, Zxx, save): fres = [0 for x in range(len(time))] maxres = [0 for x in range(len(time))] for t in range(len(time)): #subprocess.run(["clear"]) print(t, "/", len(time)) for f in range(len(Zxx)): if(maxres[t] < Zxx[f][t]): maxres[t] = Zxx[f][t] fres[t] = freq[f] if(save): write_to_file(time, fres, maxres, 'output.csv') '''''' plt.plot(time, fres, 'r') plt.grid() plt.xlabel("Time") plt.ylabel("Maximum frequencies") plt.plot(time, maxres, 'g') plt.grid() plt.xlabel("Time") plt.ylabel("Maximun values") plt.show()'''''' fig, (ax1, ax2) = plt.subplots(2) fig.suptitle('Top : time and frequencies\nBottom : time and max values') ax1.plot(time, fres) ax2.plot(time, maxres) plt.show() def extract_peaks(song_data, sample_rate, offset, display, threshold): mx = max(song_data) for i in range(len(song_data)): #subprocess.run(["clear"]) print(i, "/", len(song_data)) if(song_data[i]/mx < threshold): song_data[i] = 0 t = [offset + i/sample_rate for i in range(len(song_data))] if(display): plt.plot(t, song_data, 'b+') plt.grid() plt.xlabel("t") plt.ylabel("amp") plt.show() return (t, song_data) def get_local_max(song_data, center, width): mx = 0 for o in range(-width, width+1): togo = min(len(song_data)-1, center+o) togo = max(0, togo) if(mx < song_data[togo]): mx = song_data[togo] return mx def extract_peaks_v2(song_data, sample_rate, offset, display, threshold, seglen): mx = 0 for i in range(len(song_data)): if (i%seglen == 0): print("----") mx = get_local_max(song_data, i+seglen//2, seglen//2) #subprocess.run(["clear"]) print(i, "/", len(song_data)) if(song_data[i]/mx < threshold): song_data[i] = 0 t = [offset + i/sample_rate for i in range(len(song_data))] if(display): plt.plot(t, song_data, 'b+') plt.grid() plt.xlabel("t") plt.ylabel("amp") plt.show() return (t, song_data) def peaks(song_name, offset, length, display, thr): subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"]) sample_rate, audio_data = wavfile.read('crop.wav') subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) #return extract_peaks(audio_data, sample_rate, offset, display, thr) return extract_peaks_v2(audio_data, sample_rate, offset, display, thr, 44100*2) def find_bpm(sample_rate, data, minbpm, maxbpm, step, width): optimal = minbpm optimal_acc = 0 accuracy = 0 bpmlst = [] scores = [] for beat in range(minbpm, maxbpm+step, step): loopturn = 0 print("testing", beat) accuracy = 0 current = 0 while(current+width < len(data)): loopturn += 1 for o in range(-width, width+1): accuracy += data[current + o] #current = (loopturn*sample_rate)//beat current += (sample_rate)//beat #accuracy = accuracy/loopturn #accuracy *= (1+(maxbpm-beat)/minbpm) if optimal_acc < accuracy: optimal_acc = accuracy optimal = beat bpmlst.append(beat) scores.append(accuracy) if(False): plt.plot(bpmlst, scores) plt.xlabel("BPM") plt.ylabel("Score") plt.grid() plt.show() return (optimal, optimal_acc) ''' ''' def void_freq(song_name, offset, songlen, increment, lthr, gthr): to_cut = 20000//2500 global_Zxx = np.array([]) global_f = np.array([]) global_t = np.array([]) current_time = offset k = 0 sample_rate, global_data = wavfile.read(song_name) blit = int(sample_rate*increment) print("Blit :", blit) while(current_time <= songlen): #subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"]) #sample_rate, audio_data = wavfile.read('crop.wav') audio_data = global_data[int(k*blit):int((k+1)*blit)] size = audio_data.size #subprocess.run(["clear"]) #subprocess.run(["rm", "crop.wav"]) # do stuff here #f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000) f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size) leng = len(f) f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut] for i in range(len(Zxx)): for j in range(len(Zxx[i])): #Zxx[i][j] *= 1127*np.log(1+f[i]/700) Zxx[i][j] *= 1000 t = np.array([current_time + x for x in t]) if(k == 0): global_f = f global_t = t global_Zxx = Zxx else: global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1) global_t = np.concatenate((global_t, t)) #print(len(global_t)) k += 1 current_time = offset + k*increment print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%") print("Finding global max...") gmax = 0 for i in range(len(global_Zxx)): for j in range(len(global_Zxx[i])): if(global_Zxx[i][j] > gmax): gmax = global_Zxx[i][j] print("Trimming...") for j in range(len(global_Zxx[0])): lmax = 0 for i in range(len(global_Zxx)): if(global_Zxx[i][j] > lmax): lmax = global_Zxx[i][j] for i in range(len(global_Zxx)): val = global_Zxx[i][j] if(val/lmax <= lthr/100): global_Zxx[i][j] = 0 elif(val/gmax <= gthr/100): global_Zxx[i][j] = 0 if(False): print("Plotting...") plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud') # print(len(global_Zxx), len(global_Zxx[0])) print("XLEN :", len(global_Zxx), "\nYLEN :", len(global_Zxx[0])) plt.title('STFT Magnitude') plt.ylabel('Frequency [Hz]') plt.xlabel('Time [sec]') plt.show() if(True): print("Converting...") audio_signal = librosa.griffinlim(global_Zxx) #scipy.io.wavfile.write('trimmed.wav', sample_rate, np.array(audio_signal, dtype=np.int16)) wavfile.write('test.wav', sample_rate, np.array(audio_signal, dtype=np.int16)) print("Done") def find_bpm_2(sample_rate, data, threshold, maxbpm, show): mx = np.max(data) min_spacing = (60*sample_rate)/maxbpm k = 0 while(k < len(data) and data[k]/mx < threshold): k += 1 k += 1 spacing = [] current = 1 progress = 0 while(k < len(data)): if(k%(len(data)/100) == 0): print(progress, "%") progress += 1 if(data[k]/mx >= threshold and current > min_spacing): spacing.append(current) current = 0 else: current += 1 k += 1 for x in range(len(spacing)): spacing[x] = 60/(spacing[x]/sample_rate) digits = [i for i in range(len(spacing))] if(show): plt.plot(digits, spacing) plt.xlabel("N") plt.ylabel("BPM") plt.grid() plt.show() beat = np.mean(spacing) error = np.std(spacing) return (np.round(beat, 3), np.round(error, 3)) def to_ms(song_data, sample_rate, offset): # converts audio data to have exactly 1 sample per millisecond (aka set sample_rate to 1000) new_data = [] spacing = int(sample_rate * 0.001) mx = max(song_data) i = 0 while(i < len(song_data)): avg = 0 for k in range(spacing): if(i+spacing < len(song_data)): avg += song_data[i+spacing] avg = avg / spacing new_data.append(avg) i += spacing if(False): # pls dont kill me thx t = [offset + j/1000 for j in range(len(new_data))] plt.plot(t, new_data) plt.xlabel("Time") plt.ylabel("Amplitude") plt.grid() plt.show() return (new_data, len(new_data)) def filter_n_percent(song_name, offset, length, threshold, reduce, show): # threshold is in ]0, 100] # filter data associated with song_name to keep only the highest threshold% values subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"]) sample_rate, song_data = wavfile.read('crop.wav') subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) if(reduce): (song_data,e) = to_ms(song_data, 44100, 1) sample_rate = 1000 mx = max(song_data) is_locked = [False for i in range(len(song_data))] x = int((len(song_data)*threshold)//100) #print("X = ", x) print("Retreiving the", int(x), "/", len(song_data), "highest values") elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1]) print("Done") for idx in range(len(elements)): is_locked[elements[idx][0]] = True for r in range(len(song_data)): if(is_locked[r] == False): song_data[r] = 0 if(show): #print("EEEEE") t = [offset + j/sample_rate for j in range(len(song_data))] plt.plot(t, song_data) plt.xlabel("Time") plt.ylabel("Amplitude") plt.grid() plt.show() return song_data def get_tpts(data, sample_rate, thr): res = [] for i in range(len(data)): if(data[i] > thr): res.append(i/sample_rate) for i in res: print(i) return res def test_sample(timelist): for i in range(1,len(timelist)): #os.system('play -n synth %s sin %s' % (0.05, 440)) for k in range(random.randint(1, 10)): print("E", end="") print("F") sleep(timelist[i]-timelist[i-1]) '''