from math import * import numpy as np from scipy.io import wavfile from scipy import signal import matplotlib.pyplot as plt import subprocess import wave as wv import struct import librosa import heapq import scipy print("Starting...\n") def find_bpm_2(sample_rate, data, threshold, maxbpm, show): mx = np.max(data) min_spacing = (60*sample_rate)/maxbpm k = 0 while(k < len(data) and data[k]/mx < threshold): k += 1 k += 1 spacing = [] current = 1 progress = 0 while(k < len(data)): if(k%(len(data)/100) == 0): print(progress, "%") progress += 1 if(data[k]/mx >= threshold and current > min_spacing): spacing.append(current) current = 0 else: current += 1 k += 1 for x in range(len(spacing)): spacing[x] = 60/(spacing[x]/sample_rate) digits = [i for i in range(len(spacing))] if(show): plt.plot(digits, spacing) plt.xlabel("N") plt.ylabel("BPM") plt.grid() plt.show() beat = np.mean(spacing) error = np.std(spacing) return (np.round(beat, 3), np.round(error, 3)) def to_ms(song_data, sample_rate, offset): # converts audio data to have exactly 1 sample per millisecond (aka set sample_rate to 1000) new_data = [] spacing = int(sample_rate * 0.001) mx = max(song_data) i = 0 while(i < len(song_data)): avg = 0 for k in range(spacing): if(i+spacing < len(song_data)): avg += song_data[i+spacing] avg = avg / spacing new_data.append(avg) i += spacing if(False): # pls dont kill me thx t = [offset + j/1000 for j in range(len(new_data))] plt.plot(t, new_data) plt.xlabel("Time") plt.ylabel("Amplitude") plt.grid() plt.show() return (new_data, len(new_data)) def filter_n_percent(song_name, offset, length, threshold, reduce, show): # threshold is in ]0, 100] # filter data associated with song_name to keep only the highest threshold% values subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"]) sample_rate, song_data = wavfile.read('crop.wav') subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) if(reduce): (song_data,e) = to_ms(song_data, 44100, 1) sample_rate = 1000 mx = max(song_data) is_locked = [False for i in range(len(song_data))] x = int((len(song_data)*threshold)//100) #print("X = ", x) print("Retreiving the", int(x), "/", len(song_data), "highest values") elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1]) print("Done") for idx in range(len(elements)): is_locked[elements[idx][0]] = True for r in range(len(song_data)): if(is_locked[r] == False): song_data[r] = 0 if(show): #print("EEEEE") t = [offset + j/sample_rate for j in range(len(song_data))] plt.plot(t, song_data) plt.xlabel("Time") plt.ylabel("Amplitude") plt.grid() plt.show() return song_data def write_to_file_thr(sample_rate, song_data, offset, threshold, filename): # write data to output file file = open(filename, 'w') file.writelines('time,amplitude\n') mx = max(song_data) print("writing to output...") for i in range(len(song_data)): if(i%(len(song_data)//50) == 0): print(i, "/", len(song_data)) if(song_data[i]/mx > threshold): file.writelines(str(np.round(offset + i/sample_rate, 3))) file.writelines(',') file.writelines(str(np.round(song_data[i], 0))) file.writelines('\n') def round_t(id, sample_rate, bpm, div, offset, k0): k = k0 t = offset + k/(bpm*div) while(t < id/sample_rate): t = offset + k/(bpm*div) k += 1 if(np.abs(t - id/sample_rate) < np.abs((t - 1/(bpm*div)) - id/sample_rate)): return t return (t - 1/(bpm*div), 0) def snap(data, sample_rate, bpm, divisor, show): # adjust time amplitudes to match the given BPM new = [0 for x in range(int(1000*len(data)/sample_rate))] # 1pt per millisecond print("old =", len(data)) print("len =", 1000*len(data)/sample_rate) k = 0 t = 0 percent = 0 for i in range(len(data)): while(t < i/sample_rate): t = k/(bpm*divisor) k += 60 ''' if(np.abs(i/sample_rate - k/(bpm*divisor)) > np.abs(i/sample_rate - (k-60)/(bpm*divisor))): k -= 60 t = k/(bpm*divisor)''' if(i%(len(data)//100) == 0): print(percent, "%") percent += 1 if(int(t*1000) < len(new)): new[int(t*1000)] = max(data[i], new[int(t*1000)]) else: new[len(new)-1] = max(data[i], new[len(new)-1]) if(show): t = [j/1000 for j in range(len(new))] plt.plot(t, new) plt.xlabel("Time (e)") plt.ylabel("Amplitude") plt.grid() plt.show() return new def compress(Zxx): res = [] def void_freq(song_name, offset, songlen, increment, minfreq, maxfreq, upperthr, ampthr): fft_list = [] times = [] current_time = offset k = 0 subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen-offset), "-i", song_name, "crop.wav"]) sample_rate, global_data = wavfile.read("crop.wav") blit = int(sample_rate*increment) subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) #print("Blit :", blit) pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate) #print(len(pfreq)) while(current_time <= songlen): pff = scipy.fft.rfft(global_data[k*blit:(k+1)*blit]) fft_list.append(pff) times.append(k*increment) k += 1 current_time = offset + k*increment print("FFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq)) print("Finding global max...") gmax = 0 for i in range(len(fft_list)): #fft_list[i] = np.real(fft_list[i]) for j in range(len(fft_list[i])): if(np.real(fft_list[i][j]) > gmax): gmax = np.real(fft_list[i][j]) print("Trimming...") for i in range(len(fft_list)): lmax = 0 for j in range(len(fft_list[i])): if(np.abs(fft_list[i][j]) > lmax): lmax = np.abs(fft_list[i][j]) for j in range(len(fft_list[i])): if((pfreq[j] >= minfreq and pfreq[j] < maxfreq) or pfreq[j] > upperthr): fft_list[i][j] = 0+0j if(np.abs(fft_list[i][j]) < lmax/ampthr): fft_list[i][j] = 0+0j if(True): res = [] print("Converting...") for i in range(len(fft_list)): ift = scipy.fft.irfft(fft_list[i], n=blit) for k in ift: res.append(k) #print(type(res[0])) mx = 0 for j in range(len(res)): if(res[j] > mx): mx = res[j] for i in range(len(res)): res[i] = np.int16(32767*res[i]/mx) res = np.array(res) wavfile.write("trimmed.wav", 44100, res) #plt.plot(np.abs(pfreq[:len(fft_list[0])]), np.abs(fft_list[0])) #plt.grid() #plt.show() print("Done") if(True): void_freq("worlds_end_3.wav", 74.582, 84.582, 10.001, minfreq=0, maxfreq=440, upperthr=4500, ampthr=60) #void_freq("440.wav", 0, 3.9, 3.901, minfreq=0, maxfreq=0, upperthr=20000) if(True): #data2 = filter_n_percent("worlds_end_3.wav", 74.582, 15, 0.2, reduce=False, show=True) data2 = filter_n_percent("trimmed.wav", 0, 10, 0.1, reduce=False, show=False) data2 = snap(data2, 44100, 180, 4, show=True) #write_to_file_thr(1000, data2, 74.582, 0.02, "timing_points.csv") print("Program finished with return 0") ''' -------------------------------------------------------------------- ''' ''' -----------------------| Feuilles mortes |-------------------------- ''' ''' -------------------------------------------------------------------- ''' ''' def smooth(data, thr, mergeThr, show): mx = max(data) for i in range(len(data)-mergeThr): if(data[i]/mx > thr): for k in range(1, mergeThr): data[i+k] = 0 if(show): t = [j/1000 for j in range(len(data))] plt.plot(t, data) plt.xlabel("Time (not scaled to origin)") plt.ylabel("Amplitude") plt.grid() plt.show() return data if(False): #t, f, Zxx = fct("no.wav", 0, 0.032, 10, 5000, False) #t, f, Zxx = fct("worlds_end_3.wav", 150.889, 0.032, 170.889, 3000, False) #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 114.318, 3000, False) #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 20, 3000, False) #t, f, Zxx = fct("da^9.wav", 8.463, 0.032, 20, 5000, False) t, f, Zxx = fct("13. Cosmic Mind.wav", 0, 0.032, 20, 5000, False) #t, f, Zxx = fct("Furioso Melodia 44100.wav", 4, 0.032, 8, 3000, False) #t, f, Zxx = fct("changing.wav", 0, 0.05, 3.9, 5000, False) #fct("worlds_end_3.wav", 75, (60/178)/4, 75+2, 2500) plot_max(t, f, Zxx, True) if(False): #(t, data) = peaks("worlds_end_3.wav", 0, 300, False, 0.92) (t, data) = peaks("worlds_end_3.wav", 74.582, 6, False, 0.9) #(t, data) = peaks("da^9.wav", 8.463, 301.924 - 8.463, False, 0.95) #(t, data) = peaks("deltamax.wav", 8.463, 30101.924 - 8.463, False, 0.92) da = find_bpm(t, 44100, data, 100, 200, 1, 10) print("BPM data is", da)''' #data = [-1 for i in range(int(x))] #ids = [-1 for i in range(int(x))] ''' data = [] ids = [] for k in range(int(x)): data.append(int(7*mx/10)) ids.append(-1) # structure there is [[index, value]...] i = 0 calc = 0 while(i < len(song_data)): if(i%10 == 0): print(i, "/", len(song_data)) if(data[int(x)-1] < song_data[i]): calc += 1 #print("\n \n \n \n \n") data[int(x)-1] = song_data[i] ids[int(x)-1] = i k = int(x)-1 #while(k < int(x) & data[0] > data[k]): while(k > 0 and data[k-1] <= data[k]): data[k], data[k-1] = data[k-1], data[k] ids[k], ids[k-1] = ids[k-1], ids[k] k -= 1 #print(data[int(x)-1], calc, "/", x) i += skip i += 1 for s in range(int(x)-1): if(data[s] < data[s+1]): print("Nope", s) assert(0) ''' ''' def fct(song_name, offset, increment, songlen, maxfreq, display): to_cut = 20000//maxfreq global_Zxx = np.array([]) global_f = np.array([]) global_t = np.array([]) current_time = offset k = 0 while(current_time <= songlen): subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"]) sample_rate, audio_data = wavfile.read('crop.wav') size = audio_data.size #subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) # do stuff here #f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000) f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size) leng = len(f) f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut] #print(len(Zxx)) #print(len(Zxx[0])) for i in range(len(Zxx)): for j in range(len(Zxx[i])): Zxx[i][j] *= 1127*np.log(1+f[i]/700) t = np.array([current_time + x for x in t]) if(k == 0): global_f = f global_t = t global_Zxx = Zxx else: global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1) global_t = np.concatenate((global_t, t)) #print(len(global_t)) k += 1 current_time = offset + k*increment print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%") if(display): plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud') # print(len(global_Zxx), len(global_Zxx[0])) # 88 192 = 2500 # 70 192 = 2000 plt.title('STFT Magnitude') plt.ylabel('Frequency [Hz]') plt.xlabel('Time [sec]') plt.show() return global_t, global_f, np.abs(global_Zxx) def write_to_file(t, flist, maxlist, filename): file = open(filename, 'w') file.writelines('time,frequency,maxvalue\n') for i in range(len(t)): file.writelines(str(np.round(t[i], 3))) file.writelines(',') file.writelines(str(np.round(flist[i], 1))) file.writelines(',') file.writelines(str(np.round(maxlist[i], 0))) file.writelines('\n') #close(file) def plot_max(time, freq, Zxx, save): fres = [0 for x in range(len(time))] maxres = [0 for x in range(len(time))] for t in range(len(time)): #subprocess.run(["clear"]) print(t, "/", len(time)) for f in range(len(Zxx)): if(maxres[t] < Zxx[f][t]): maxres[t] = Zxx[f][t] fres[t] = freq[f] if(save): write_to_file(time, fres, maxres, 'output.csv') '''''' plt.plot(time, fres, 'r') plt.grid() plt.xlabel("Time") plt.ylabel("Maximum frequencies") plt.plot(time, maxres, 'g') plt.grid() plt.xlabel("Time") plt.ylabel("Maximun values") plt.show()'''''' fig, (ax1, ax2) = plt.subplots(2) fig.suptitle('Top : time and frequencies\nBottom : time and max values') ax1.plot(time, fres) ax2.plot(time, maxres) plt.show() def extract_peaks(song_data, sample_rate, offset, display, threshold): mx = max(song_data) for i in range(len(song_data)): #subprocess.run(["clear"]) print(i, "/", len(song_data)) if(song_data[i]/mx < threshold): song_data[i] = 0 t = [offset + i/sample_rate for i in range(len(song_data))] if(display): plt.plot(t, song_data, 'b+') plt.grid() plt.xlabel("t") plt.ylabel("amp") plt.show() return (t, song_data) def get_local_max(song_data, center, width): mx = 0 for o in range(-width, width+1): togo = min(len(song_data)-1, center+o) togo = max(0, togo) if(mx < song_data[togo]): mx = song_data[togo] return mx def extract_peaks_v2(song_data, sample_rate, offset, display, threshold, seglen): mx = 0 for i in range(len(song_data)): if (i%seglen == 0): print("----") mx = get_local_max(song_data, i+seglen//2, seglen//2) #subprocess.run(["clear"]) print(i, "/", len(song_data)) if(song_data[i]/mx < threshold): song_data[i] = 0 t = [offset + i/sample_rate for i in range(len(song_data))] if(display): plt.plot(t, song_data, 'b+') plt.grid() plt.xlabel("t") plt.ylabel("amp") plt.show() return (t, song_data) def peaks(song_name, offset, length, display, thr): subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"]) sample_rate, audio_data = wavfile.read('crop.wav') subprocess.run(["clear"]) subprocess.run(["rm", "crop.wav"]) #return extract_peaks(audio_data, sample_rate, offset, display, thr) return extract_peaks_v2(audio_data, sample_rate, offset, display, thr, 44100*2) def find_bpm(sample_rate, data, minbpm, maxbpm, step, width): optimal = minbpm optimal_acc = 0 accuracy = 0 bpmlst = [] scores = [] for beat in range(minbpm, maxbpm+step, step): loopturn = 0 print("testing", beat) accuracy = 0 current = 0 while(current+width < len(data)): loopturn += 1 for o in range(-width, width+1): accuracy += data[current + o] #current = (loopturn*sample_rate)//beat current += (sample_rate)//beat #accuracy = accuracy/loopturn #accuracy *= (1+(maxbpm-beat)/minbpm) if optimal_acc < accuracy: optimal_acc = accuracy optimal = beat bpmlst.append(beat) scores.append(accuracy) if(False): plt.plot(bpmlst, scores) plt.xlabel("BPM") plt.ylabel("Score") plt.grid() plt.show() return (optimal, optimal_acc) ''' ''' def void_freq(song_name, offset, songlen, increment, lthr, gthr): to_cut = 20000//2500 global_Zxx = np.array([]) global_f = np.array([]) global_t = np.array([]) current_time = offset k = 0 sample_rate, global_data = wavfile.read(song_name) blit = int(sample_rate*increment) print("Blit :", blit) while(current_time <= songlen): #subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"]) #sample_rate, audio_data = wavfile.read('crop.wav') audio_data = global_data[int(k*blit):int((k+1)*blit)] size = audio_data.size #subprocess.run(["clear"]) #subprocess.run(["rm", "crop.wav"]) # do stuff here #f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000) f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size) leng = len(f) f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut] for i in range(len(Zxx)): for j in range(len(Zxx[i])): #Zxx[i][j] *= 1127*np.log(1+f[i]/700) Zxx[i][j] *= 1000 t = np.array([current_time + x for x in t]) if(k == 0): global_f = f global_t = t global_Zxx = Zxx else: global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1) global_t = np.concatenate((global_t, t)) #print(len(global_t)) k += 1 current_time = offset + k*increment print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%") print("Finding global max...") gmax = 0 for i in range(len(global_Zxx)): for j in range(len(global_Zxx[i])): if(global_Zxx[i][j] > gmax): gmax = global_Zxx[i][j] print("Trimming...") for j in range(len(global_Zxx[0])): lmax = 0 for i in range(len(global_Zxx)): if(global_Zxx[i][j] > lmax): lmax = global_Zxx[i][j] for i in range(len(global_Zxx)): val = global_Zxx[i][j] if(val/lmax <= lthr/100): global_Zxx[i][j] = 0 elif(val/gmax <= gthr/100): global_Zxx[i][j] = 0 if(False): print("Plotting...") plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud') # print(len(global_Zxx), len(global_Zxx[0])) print("XLEN :", len(global_Zxx), "\nYLEN :", len(global_Zxx[0])) plt.title('STFT Magnitude') plt.ylabel('Frequency [Hz]') plt.xlabel('Time [sec]') plt.show() if(True): print("Converting...") audio_signal = librosa.griffinlim(global_Zxx) #scipy.io.wavfile.write('trimmed.wav', sample_rate, np.array(audio_signal, dtype=np.int16)) wavfile.write('test.wav', sample_rate, np.array(audio_signal, dtype=np.int16)) print("Done") '''