"""Dominant-frequency and peak-amplitude extraction for short audio excerpts.

The public functions take a path to an audio file, cut the excerpt
``[offset, offset + songlen)`` (seconds), split it into consecutive windows of
``segsize`` seconds and compute one value per window.  Window ``i`` always
describes the song at time ``offset + i * segsize``.

WAV input is read and cropped in memory; any other format is first decoded
with the ``ffmpeg`` command-line tool (which must then be on ``PATH``).
"""
# NOTE(review): nothing in this module uses the math star-import or ``sleep``;
# they are kept only so that code importing those names from here keeps working.
from math import *  # noqa: F401,F403
import heapq
import subprocess
import tempfile
from pathlib import Path
from time import sleep  # noqa: F401

import numpy as np
import scipy as scp
import scipy.fft  # noqa: F401  (guarantees ``scp.fft`` is loaded on every SciPy version)
from scipy.io import wavfile

try:
    import matplotlib.pyplot as plt
except ImportError:
    # Plotting is only needed by retrieve_all_from_song(); the analysis
    # functions stay usable on machines without matplotlib.
    plt = None

# Frequency band (Hz) searched by retrieve_dominant_freqs(): lower bound is
# inclusive, upper bound exclusive.
DEFAULT_MIN_FREQ = 110
DEFAULT_MAX_FREQ = 440 * 8


def is_data_stereo(raw_global_data: "list | np.ndarray") -> bool:
    """Return True when the samples have more than one channel.

    Multi-channel data is laid out as one row per sample (what
    ``scipy.io.wavfile.read`` returns), i.e. it has more than one dimension.
    Mono or empty data returns False.
    """
    return np.ndim(raw_global_data) > 1


def _to_mono(raw) -> np.ndarray:
    """Return the samples as a 1-D float64 array, averaging the channels."""
    samples = np.asarray(raw, dtype=np.float64)
    if is_data_stereo(samples):
        print("Converting to mono...")
        samples = samples.mean(axis=1)
    return samples


def _decode_with_ffmpeg(path: Path):
    """Decode *path* to a temporary WAV with ffmpeg; return (sample_rate, data)."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        wav_path = Path(tmp_dir) / "decoded.wav"
        # -y: never block on an interactive "overwrite?" prompt.
        subprocess.run(
            ["ffmpeg", "-y", "-i", str(path), str(wav_path)],
            shell=False,
            check=True,
        )
        return wavfile.read(str(wav_path))


def _load_segment(song_name, offset, songlen):
    """Return (sample_rate, mono samples) for [offset, offset + songlen) seconds.

    The excerpt is cut by slicing the decoded samples, so no ``crop.wav`` is
    left behind and index 0 of the result is the song at time ``offset``.
    """
    path = Path(song_name)
    if path.suffix.lower() == ".wav":
        try:
            sample_rate, raw = wavfile.read(str(path))
        except ValueError:
            # WAV flavour SciPy cannot parse: let ffmpeg re-encode it.
            sample_rate, raw = _decode_with_ffmpeg(path)
    else:
        sample_rate, raw = _decode_with_ffmpeg(path)

    first = max(int(offset * sample_rate), 0)
    last = max(int((offset + songlen) * sample_rate), first)
    return sample_rate, _to_mono(raw[first:last])


def _window_size(sample_rate, segsize) -> int:
    """Return the number of samples per window, rejecting empty windows."""
    blit = int(sample_rate * segsize)
    if blit < 1:
        raise ValueError(
            f"segsize={segsize!r} is shorter than one sample at {sample_rate} Hz"
        )
    return blit


def _window_bounds(num_samples, sample_rate, songlen, segsize):
    """Return the (start, stop) sample indices of every analysis window.

    Window ``k`` starts ``k * segsize`` seconds into the excerpt; windows are
    produced while ``k * segsize < songlen - segsize`` and while they still
    start inside the available data.
    """
    bounds = []
    k = 0
    while k * segsize < songlen - segsize:
        # Recompute the time from k instead of accumulating segsize, so
        # floating-point error does not build up over thousands of windows.
        left_time = k * segsize
        start = int(left_time * sample_rate)
        if start >= num_samples:
            break
        stop = max(int((left_time + segsize) * sample_rate), start + 1)
        bounds.append((start, stop))
        k += 1
    return bounds


def _windowed_spectra(samples, sample_rate, songlen, segsize):
    """Return (blit, pfreq, spectra) for the excerpt.

    ``spectra`` has one row per window; every row is an rFFT forced to length
    ``blit`` (``n=blit``) so that it always lines up with ``pfreq``.
    """
    blit = _window_size(sample_rate, segsize)
    pfreq = scp.fft.rfftfreq(blit, 1 / sample_rate)
    bounds = _window_bounds(len(samples), sample_rate, songlen, segsize)
    if bounds:
        spectra = np.stack(
            [scp.fft.rfft(samples[start:stop], n=blit) for start, stop in bounds]
        )
    else:
        spectra = np.zeros((0, len(pfreq)), dtype=complex)
    print("\n\nSegSize :", segsize, "\nFFT :", spectra.shape[0],
          "\nFFT[0] :", spectra.shape[1], "\npfreq :", len(pfreq), "\n\n")
    return blit, pfreq, spectra


def retrieve_dominant_freqs(song_name, offset, songlen, segsize):
    """Return the strongest frequency of every window of the excerpt.

    Args:
        song_name: path of the audio file (stereo input is averaged to mono).
        offset: start of the excerpt, in seconds.
        songlen: duration of the excerpt, in seconds.
        segsize: window length, in seconds.  The spacing between FFT bins is
            ``sample_rate / int(sample_rate * segsize)`` Hz, so very short
            windows give a very coarse frequency estimate.

    Returns:
        ``(maxlist, maxamps)``: for window ``i`` (time ``offset + i*segsize``)
        ``maxlist[i]`` is the frequency in
        ``[DEFAULT_MIN_FREQ, DEFAULT_MAX_FREQ)`` with the largest magnitude
        and ``maxamps[i]`` that magnitude.  When no FFT bin falls inside the
        band the pair is ``(0, -1)``.
    """
    minfreq = DEFAULT_MIN_FREQ
    maxfreq = DEFAULT_MAX_FREQ

    sample_rate, samples = _load_segment(song_name, offset, songlen)

    print("Retrieving freqs from", offset, "to", songlen + offset, "...")
    print("amplitudes are from", minfreq, "to", maxfreq)
    _, pfreq, spectra = _windowed_spectra(samples, sample_rate, songlen, segsize)

    window_count = spectra.shape[0]
    in_band = (pfreq >= minfreq) & (pfreq < maxfreq)
    if window_count == 0:
        return ([], [])
    if not in_band.any():
        return ([0] * window_count, [-1] * window_count)

    magnitudes = np.abs(spectra[:, in_band])
    # argmax returns the first maximum, i.e. the lowest frequency on a tie.
    best = magnitudes.argmax(axis=1)
    maxlist = pfreq[in_band][best].tolist()
    maxamps = magnitudes[np.arange(window_count), best].tolist()
    return (maxlist, maxamps)


def void_freq_clean(song_name, offset, songlen, segsize, minfreq, maxfreq,
                    ampthr, output_name):
    """Write a band-limited, de-noised copy of the excerpt to *output_name*.

    In every window, FFT bins are zeroed when their frequency is outside the
    open interval ``(minfreq, maxfreq)`` or when their magnitude is at most
    ``ampthr`` times the largest magnitude of that window.  The windows are
    then transformed back, concatenated, scaled so that the loudest sample is
    32767 and written as a 16-bit mono WAV.

    Args:
        song_name: path of the audio file.
        offset: start of the excerpt, in seconds.
        songlen: duration of the excerpt, in seconds.
        segsize: window length, in seconds.
        minfreq: lower cut-off in Hz (exclusive).
        maxfreq: upper cut-off in Hz (exclusive).
        ampthr: relative magnitude threshold, in [0, 1].
        output_name: path of the WAV file to write.
    """
    sample_rate, samples = _load_segment(song_name, offset, songlen)

    print("Retrieving freqs from", offset, "to", songlen + offset, "...")
    print("amplitudes are from", minfreq, "to", maxfreq)
    blit, pfreq, spectra = _windowed_spectra(samples, sample_rate, songlen, segsize)

    print("Converting...")
    if spectra.shape[0]:
        magnitudes = np.abs(spectra)
        # The reference magnitude is taken over ALL bins of the window,
        # including the ones the band filter is about to remove.
        local_max = magnitudes.max(axis=1, keepdims=True)
        keep = (pfreq > minfreq) & (pfreq < maxfreq) & (magnitudes > local_max * ampthr)
        cleaned = scp.fft.irfft(np.where(keep, spectra, 0), n=blit, axis=1).ravel()
    else:
        cleaned = np.zeros(0)

    # Normalise on the absolute peak: a dominant negative peak must not
    # overflow int16, and silence must not divide by zero.
    peak = float(np.abs(cleaned).max()) if cleaned.size else 0.0
    if peak > 0:
        pcm = np.rint(cleaned * (32767 / peak)).astype(np.int16)
    else:
        pcm = np.zeros(cleaned.size, dtype=np.int16)
    wavfile.write(output_name, sample_rate, pcm)


def retrieve_dominant_amps(song_name, offset, songlen, segsize, percent):
    """Return, per window, the largest of the excerpt's top-*percent*% samples.

    Only the ``percent`` % largest sample values (signed, so positive peaks)
    of the whole excerpt are kept; every other sample is treated as 0.  Each
    window then reports its largest kept sample, or 0 when it has none.

    Args:
        song_name: path of the audio file (stereo input is averaged to mono).
        offset: start of the excerpt, in seconds.
        songlen: duration of the excerpt, in seconds.
        segsize: window length, in seconds.
        percent: share of samples to keep, expressed in percent (0-100).

    Returns:
        A list whose item ``i`` describes time ``offset + i * segsize``.  The
        windows are the ones used by retrieve_dominant_freqs(), so both
        results have the same length for the same arguments.
    """
    sample_rate, samples = _load_segment(song_name, offset, songlen)
    _window_size(sample_rate, segsize)  # validates segsize

    keep_count = int((len(samples) * percent) // 100)
    print("Retreiving the", keep_count, "/", len(samples), "highest values")

    values = samples.tolist()
    kept_ids = np.asarray(
        heapq.nlargest(keep_count, range(len(values)), key=values.__getitem__),
        dtype=np.intp,
    )
    kept = np.zeros_like(samples)
    kept[kept_ids] = samples[kept_ids]

    res = []
    for start, stop in _window_bounds(len(samples), sample_rate, songlen, segsize):
        window = kept[start:stop]
        res.append(max(0.0, float(window.max())) if window.size else 0.0)
    return res


def convert_to_wav(song_name: str, output_file="audio.wav") -> str:
    """Convert the song to WAV unless it already has a ``.wav`` extension.

    The decision relies on the file extension only (case-insensitive).

    Returns:
        The file name that should be used afterwards: *song_name* itself when
        no conversion was needed, *output_file* otherwise.

    Raises:
        subprocess.CalledProcessError: if ffmpeg fails to convert the file.
    """
    if Path(song_name).suffix.lower() == ".wav":
        return song_name

    print("Converting to .wav...")
    subprocess.run(
        ["ffmpeg", "-y", "-i", song_name, output_file],
        shell=False,
        check=True,
    )
    return output_file


def retrieve_all_from_song(filename, t0, t1, dt=0.001, threshold=0.1):
    """Analyse ``[t0, t1)`` of *filename* and plot the per-window results.

    Writes a filtered copy of the excerpt to ``crop1.wav``, then shows two
    matplotlib figures: dominant frequency over time and peak amplitude over
    time.

    Args:
        filename: path of the audio file; converted to WAV when needed.
        t0: start of the excerpt, in seconds.
        t1: end of the excerpt, in seconds; must be greater than *t0*.
        dt: window length, in seconds.
        threshold: share of samples kept by the amplitude analysis, in percent.

    Raises:
        SystemExit: if ``t1 <= t0``.
        RuntimeError: if matplotlib is not installed.
    """
    if t1 <= t0:
        raise SystemExit("ERROR : t1 <= t0")
    if plt is None:
        raise RuntimeError("matplotlib is required to plot the results")

    new_fn = convert_to_wav(filename)
    duration = t1 - t0

    print("Filtering song...")
    # NOTE(review): crop1.wav is written but the analyses below still read the
    # unfiltered file; confirm whether they were meant to use the filtered one.
    void_freq_clean(new_fn, t0, duration, dt, 200, 2500, 0.05, "crop1.wav")

    print("Now retrieving the frequencies")
    (maxlist, maxamps) = retrieve_dominant_freqs(new_fn, t0, duration, dt)

    print("Now retrieving the amplitudes")
    amps = retrieve_dominant_amps(new_fn, t0, duration, dt, threshold)

    print("Len of freqs : ", len(maxlist), "|", len(maxamps))
    print("Len of amps : ", len(maxlist), "|", len(amps))

    timesF = [t0 + dt * k for k in range(len(maxlist))]
    timesA = [t0 + dt * k for k in range(len(amps))]

    plt.plot(timesF, maxlist)
    plt.show()

    plt.plot(timesA, amps)
    plt.show()


if __name__ == "__main__":
    # Guarded so that importing the module does not start an analysis.
    retrieve_all_from_song("tetris_4.wav", 0, 5)
    print("yipee")
a/crop.wav b/crop.wav new file mode 100644 index 0000000..417d66b Binary files /dev/null and b/crop.wav differ diff --git a/crop1.wav b/crop1.wav new file mode 100644 index 0000000..31148cb Binary files /dev/null and b/crop1.wav differ diff --git a/script (1).py b/script (1).py deleted file mode 100644 index 486fe63..0000000 --- a/script (1).py +++ /dev/null @@ -1,145 +0,0 @@ -import numpy as np -import scipy as scp -import heapq - -def retrieve_dominant_freqs(song_name, offset, songlen, segsize): - # returns a list with peak frequencies alongside the sample rate - # /!\ song_name is specified to be a list, NOT a list of couples (aka song is mono) - # segsize is in seconds - - # remove high_pitched/low-pitched frequencies - minfreq = 110 - maxfreq = 440*8 - - # cutting the song to only keep the one we're interested in - subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen+offset), "-i", song_name, "crop.wav"], shell=False) - - # extracting data from cropped song - sample_rate, song_data = wavfile.read("crop.wav") - blit = int(sample_rate*segsize) # Te - - # remove the copy of the song - subprocess.run(["rm", "crop.wav"], shell=False) - - # calculate the frequencies associated to the FFTs - pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate) - - # left boundary of segment to crop - current_time = offset - - # list of FFTs - fft_list = [] - - # number of samples - k = 0 - - while(current_time <= songlen+offset): - # index corresponding to left boundary - left_id = int(current_time*sample_rate) - - # index corresponding to right boundary - right_id = int((current_time+segsize)*sample_rate) - - # calculate the fft, append it to fft_list - pff = scp.fft.rfft(global_data[left:right]) - fft_list.append(pff) - - # just to avoid what causes 0.1 + 0.1 == 0.2 to be False - k += 1 - current_time = offset + k*segsize - - # spacing between samples (time) - fe = segsize/sample_rate - - # list that will contain the maximum frequencies/amplitudes for all FFTs - maxlist = [] 
- maxamps = [] - - # find all maximums - for i in range(len(fft_list)): - current_max = -1 - current_fmax = 0 - - for j in range(len(fft_list[i])): - if(pfreq[j] < maxfreq & pfreq[j] >= minfreq & np.abs(fft_list[i][j]) > current_max): - current_max = np.abs(fft_list[i][j]) - current_fmax = pfreq[j] - - maxlist.append(current_fmax) - maxamps.append(current_max) - - # gg - # maxlist[i] corresponds to time (offset + i*segsize) - return (maxlist, maxamps, segsize) - -def retrieve_dominant_amps(song_name, offset, songlen, segsize, percent): - # returns a list with the percent% peak amplitudes alongside the sample rate - # /!\ song_name is specified to be a list, NOT a list of couples (aka song is mono) - # segsize is in seconds - - # cutting the song to only keep the one we're interested in - subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen+offset), "-i", song_name, "crop.wav"], shell=False) - - # extracting data from cropped song - sample_rate, song_data = wavfile.read("crop.wav") - blit = int(sample_rate*segsize) # Te - - # remove the copy of the song - subprocess.run(["rm", "crop.wav"], shell=False) - - # which notes will be voided - is_locked = [False for i in range(len(song_data))] - x = int((len(song_data)*threshold)//100) - - print("Retreiving the", int(x), "/", len(song_data), "highest values") - elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1]) - #returns a list of couples [id, value] - - for idx in range(len(elements)): - is_locked[elements[idx][0]] = True - - for r in range(len(song_data)): - if(is_locked[r] == False): - song_data[r] = 0 - - # now we need to reduce song_data so that it matches the length of the previous function's return - res = [] - k = 0 - current_time = offset - - while(current_time <= songlen+offset): - # index corresponding to left boundary - left_id = int(current_time*sample_rate) - - # index corresponding to right boundary - right_id = int((current_time+segsize)*sample_rate) - - # merge the 
segment into one value - cmax = 0 - for i in range(left_id, right_id): - if(i < len(song_data) & cmax < song_data[i]): - cmax = song_data[i] - - res.append(cmax) - - k += 1 - current_time = current_time + k*segsize - - # gg - # res[i] corresponds to time (offset + i*segsize) - return res - -print("done") - - - - - - - - - - - - - diff --git a/sound_process.py b/sound_process.py index cbc23fe..5c84f86 100755 --- a/sound_process.py +++ b/sound_process.py @@ -341,532 +341,3 @@ def main(): if __name__ == "__main__": main() - - - - - - - - - - - - - - - - - - - - - -''' -------------------------------------------------------------------- ''' -''' -----------------------| Feuilles mortes |-------------------------- ''' -''' -------------------------------------------------------------------- ''' - - -''' -def smooth(data, thr, mergeThr, show): - mx = max(data) - for i in range(len(data)-mergeThr): - if(data[i]/mx > thr): - for k in range(1, mergeThr): - data[i+k] = 0 - if(show): - t = [j/1000 for j in range(len(data))] - plt.plot(t, data) - plt.xlabel("Time (not scaled to origin)") - plt.ylabel("Amplitude") - plt.grid() - plt.show() - - return data -if(False): - #t, f, Zxx = fct("no.wav", 0, 0.032, 10, 5000, False) - #t, f, Zxx = fct("worlds_end_3.wav", 150.889, 0.032, 170.889, 3000, False) - #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 114.318, 3000, False) - #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 20, 3000, False) - #t, f, Zxx = fct("da^9.wav", 8.463, 0.032, 20, 5000, False) - t, f, Zxx = fct("13. 
Cosmic Mind.wav", 0, 0.032, 20, 5000, False) - #t, f, Zxx = fct("Furioso Melodia 44100.wav", 4, 0.032, 8, 3000, False) - #t, f, Zxx = fct("changing.wav", 0, 0.05, 3.9, 5000, False) - #fct("worlds_end_3.wav", 75, (60/178)/4, 75+2, 2500) - - plot_max(t, f, Zxx, True) - -if(False): - #(t, data) = peaks("worlds_end_3.wav", 0, 300, False, 0.92) - (t, data) = peaks("worlds_end_3.wav", 74.582, 6, False, 0.9) - #(t, data) = peaks("da^9.wav", 8.463, 301.924 - 8.463, False, 0.95) - #(t, data) = peaks("deltamax.wav", 8.463, 30101.924 - 8.463, False, 0.92) - da = find_bpm(t, 44100, data, 100, 200, 1, 10) - print("BPM data is", da)''' - - #data = [-1 for i in range(int(x))] - #ids = [-1 for i in range(int(x))] -''' - data = [] - ids = [] - for k in range(int(x)): - data.append(int(7*mx/10)) - ids.append(-1) - # structure there is [[index, value]...] - - i = 0 - calc = 0 - while(i < len(song_data)): - if(i%10 == 0): - print(i, "/", len(song_data)) - if(data[int(x)-1] < song_data[i]): - calc += 1 - #print("\n \n \n \n \n") - data[int(x)-1] = song_data[i] - ids[int(x)-1] = i - - k = int(x)-1 - #while(k < int(x) & data[0] > data[k]): - while(k > 0 and data[k-1] <= data[k]): - data[k], data[k-1] = data[k-1], data[k] - ids[k], ids[k-1] = ids[k-1], ids[k] - k -= 1 - - #print(data[int(x)-1], calc, "/", x) - - i += skip - i += 1 - - - for s in range(int(x)-1): - if(data[s] < data[s+1]): - print("Nope", s) - assert(0) -''' - - -''' -def fct(song_name, offset, increment, songlen, maxfreq, display): - to_cut = 20000//maxfreq - global_Zxx = np.array([]) - global_f = np.array([]) - global_t = np.array([]) - current_time = offset - k = 0 - while(current_time <= songlen): - subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"], shell=False) - - sample_rate, audio_data = wavfile.read('crop.wav') - size = audio_data.size - - #subprocess.run(["clear"]) - subprocess.run(["rm", "crop.wav"], shell=False) - - # do stuff here - #f, t, Zxx = 
signal.stft(audio_data, sample_rate, nperseg=1000) - f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size) - leng = len(f) - - f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut] - - #print(len(Zxx)) - #print(len(Zxx[0])) - - - for i in range(len(Zxx)): - for j in range(len(Zxx[i])): - Zxx[i][j] *= 1127*np.log(1+f[i]/700) - - - t = np.array([current_time + x for x in t]) - - if(k == 0): - global_f = f - global_t = t - global_Zxx = Zxx - else: - global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1) - global_t = np.concatenate((global_t, t)) - - #print(len(global_t)) - - k += 1 - current_time = offset + k*increment - - print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%") - - if(display): - plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud') - # print(len(global_Zxx), len(global_Zxx[0])) - # 88 192 = 2500 - # 70 192 = 2000 - plt.title('STFT Magnitude') - plt.ylabel('Frequency [Hz]') - plt.xlabel('Time [sec]') - plt.show() - - return global_t, global_f, np.abs(global_Zxx) - -def write_to_file(t, flist, maxlist, filename): - file = open(filename, 'w') - file.writelines('time,frequency,maxvalue\n') - for i in range(len(t)): - file.writelines(str(np.round(t[i], 3))) - file.writelines(',') - file.writelines(str(np.round(flist[i], 1))) - file.writelines(',') - file.writelines(str(np.round(maxlist[i], 0))) - file.writelines('\n') - #close(file) - -def plot_max(time, freq, Zxx, save): - fres = [0 for x in range(len(time))] - maxres = [0 for x in range(len(time))] - for t in range(len(time)): - #subprocess.run(["clear"]) - print(t, "/", len(time)) - for f in range(len(Zxx)): - if(maxres[t] < Zxx[f][t]): - maxres[t] = Zxx[f][t] - fres[t] = freq[f] - - if(save): - write_to_file(time, fres, maxres, 'output.csv') - - '''''' - plt.plot(time, fres, 'r') - plt.grid() - plt.xlabel("Time") - plt.ylabel("Maximum frequencies") - - plt.plot(time, maxres, 'g') - plt.grid() - plt.xlabel("Time") - plt.ylabel("Maximun 
values") - - plt.show()'''''' - - fig, (ax1, ax2) = plt.subplots(2) - fig.suptitle('Top : time and frequencies\nBottom : time and max values') - ax1.plot(time, fres) - ax2.plot(time, maxres) - - plt.show() - -def extract_peaks(song_data, sample_rate, offset, display, threshold): - mx = max(song_data) - for i in range(len(song_data)): - #subprocess.run(["clear"]) - print(i, "/", len(song_data)) - if(song_data[i]/mx < threshold): - song_data[i] = 0 - t = [offset + i/sample_rate for i in range(len(song_data))] - - if(display): - plt.plot(t, song_data, 'b+') - plt.grid() - plt.xlabel("t") - plt.ylabel("amp") - plt.show() - - return (t, song_data) - -def get_local_max(song_data, center, width): - mx = 0 - for o in range(-width, width+1): - togo = min(len(song_data)-1, center+o) - togo = max(0, togo) - if(mx < song_data[togo]): - mx = song_data[togo] - return mx - -def extract_peaks_v2(song_data, sample_rate, offset, display, threshold, seglen): - mx = 0 - for i in range(len(song_data)): - if (i%seglen == 0): - print("----") - mx = get_local_max(song_data, i+seglen//2, seglen//2) - #subprocess.run(["clear"]) - print(i, "/", len(song_data)) - if(song_data[i]/mx < threshold): - song_data[i] = 0 - - t = [offset + i/sample_rate for i in range(len(song_data))] - - if(display): - plt.plot(t, song_data, 'b+') - plt.grid() - plt.xlabel("t") - plt.ylabel("amp") - plt.show() - - return (t, song_data) - -def peaks(song_name, offset, length, display, thr): - subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"], shell=False) - - sample_rate, audio_data = wavfile.read('crop.wav') - - #subprocess.run(["clear"]) - subprocess.run(["rm", "crop.wav"], shell=False) - - #return extract_peaks(audio_data, sample_rate, offset, display, thr) - return extract_peaks_v2(audio_data, sample_rate, offset, display, thr, 44100*2) - -def find_bpm(sample_rate, data, minbpm, maxbpm, step, width): - optimal = minbpm - optimal_acc = 0 - accuracy = 0 - - bpmlst = [] - 
scores = [] - - for beat in range(minbpm, maxbpm+step, step): - loopturn = 0 - print("testing", beat) - accuracy = 0 - current = 0 - - while(current+width < len(data)): - loopturn += 1 - for o in range(-width, width+1): - accuracy += data[current + o] - #current = (loopturn*sample_rate)//beat - current += (sample_rate)//beat - - #accuracy = accuracy/loopturn - - #accuracy *= (1+(maxbpm-beat)/minbpm) - if optimal_acc < accuracy: - optimal_acc = accuracy - optimal = beat - bpmlst.append(beat) - scores.append(accuracy) - - if(False): - plt.plot(bpmlst, scores) - plt.xlabel("BPM") - plt.ylabel("Score") - plt.grid() - plt.show() - - return (optimal, optimal_acc) -''' - - - -''' -def void_freq(song_name, offset, songlen, increment, lthr, gthr): - to_cut = 20000//2500 - global_Zxx = np.array([]) - global_f = np.array([]) - global_t = np.array([]) - current_time = offset - k = 0 - sample_rate, global_data = wavfile.read(song_name) - blit = int(sample_rate*increment) - print("Blit :", blit) - while(current_time <= songlen): - #subprocess.run(["ffmpeg", "-ss", str(current_time), "-t", str(increment), "-i", song_name, "crop.wav"]) - - #sample_rate, audio_data = wavfile.read('crop.wav') - audio_data = global_data[int(k*blit):int((k+1)*blit)] - size = audio_data.size - - #subprocess.run(["clear"]) - #subprocess.run(["rm", "crop.wav"]) - - # do stuff here - #f, t, Zxx = signal.stft(audio_data, sample_rate, nperseg=1000) - f, t, Zxx = signal.spectrogram(audio_data, fs=sample_rate, nfft=size) - leng = len(f) - - f, Zxx = f[:leng//to_cut], Zxx[:leng//to_cut] - - for i in range(len(Zxx)): - for j in range(len(Zxx[i])): - #Zxx[i][j] *= 1127*np.log(1+f[i]/700) - Zxx[i][j] *= 1000 - - t = np.array([current_time + x for x in t]) - - if(k == 0): - global_f = f - global_t = t - global_Zxx = Zxx - else: - global_Zxx = np.concatenate((global_Zxx, Zxx), axis=1) - global_t = np.concatenate((global_t, t)) - - #print(len(global_t)) - - k += 1 - current_time = offset + k*increment - - 
print("Completion rate : ", np.round(100*(current_time-offset)/(songlen-offset), 4), "%") - - print("Finding global max...") - gmax = 0 - for i in range(len(global_Zxx)): - for j in range(len(global_Zxx[i])): - if(global_Zxx[i][j] > gmax): - gmax = global_Zxx[i][j] - - print("Trimming...") - for j in range(len(global_Zxx[0])): - lmax = 0 - for i in range(len(global_Zxx)): - if(global_Zxx[i][j] > lmax): - lmax = global_Zxx[i][j] - - for i in range(len(global_Zxx)): - val = global_Zxx[i][j] - if(val/lmax <= lthr/100): - global_Zxx[i][j] = 0 - elif(val/gmax <= gthr/100): - global_Zxx[i][j] = 0 - - if(False): - print("Plotting...") - plt.pcolormesh(global_t, global_f, np.abs(global_Zxx), shading='gouraud') - # print(len(global_Zxx), len(global_Zxx[0])) - print("XLEN :", len(global_Zxx), "\nYLEN :", len(global_Zxx[0])) - plt.title('STFT Magnitude') - plt.ylabel('Frequency [Hz]') - plt.xlabel('Time [sec]') - plt.show() - - if(True): - print("Converting...") - audio_signal = librosa.griffinlim(global_Zxx) - #scipy.io.wavfile.write('trimmed.wav', sample_rate, np.array(audio_signal, dtype=np.int16)) - wavfile.write('test.wav', sample_rate, np.array(audio_signal, dtype=np.int16)) - - print("Done") - -def find_bpm_2(sample_rate, data, threshold, maxbpm, show): - mx = np.max(data) - min_spacing = (60*sample_rate)/maxbpm - k = 0 - while(k < len(data) and data[k]/mx < threshold): - k += 1 - - k += 1 - spacing = [] - current = 1 - progress = 0 - - while(k < len(data)): - if(k%(len(data)/100) == 0): - print(progress, "%") - progress += 1 - if(data[k]/mx >= threshold and current > min_spacing): - spacing.append(current) - current = 0 - else: - current += 1 - k += 1 - - - for x in range(len(spacing)): - spacing[x] = 60/(spacing[x]/sample_rate) - - digits = [i for i in range(len(spacing))] - if(show): - plt.plot(digits, spacing) - plt.xlabel("N") - plt.ylabel("BPM") - plt.grid() - plt.show() - - beat = np.mean(spacing) - error = np.std(spacing) - - return (np.round(beat, 3), 
np.round(error, 3)) - -def to_ms(song_data, sample_rate, offset): - # converts audio data to have exactly 1 sample per millisecond (aka set sample_rate to 1000) - new_data = [] - spacing = int(sample_rate * 0.001) - mx = max(song_data) - i = 0 - while(i < len(song_data)): - avg = 0 - for k in range(spacing): - if(i+spacing < len(song_data)): - avg += song_data[i+spacing] - avg = avg / spacing - new_data.append(avg) - i += spacing - - if(False): # pls dont kill me thx - t = [offset + j/1000 for j in range(len(new_data))] - plt.plot(t, new_data) - plt.xlabel("Time") - plt.ylabel("Amplitude") - plt.grid() - plt.show() - - return (new_data, len(new_data)) - -def filter_n_percent(song_name, offset, length, threshold, reduce, show): - # threshold is in ]0, 100] - # filter data associated with song_name to keep only the highest threshold% values - - subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"], shell=False) - - sample_rate, song_data = wavfile.read('crop.wav') - - subprocess.run(["clear"], shell=False) - subprocess.run(["rm", "crop.wav"], shell=False) - - if(reduce): - (song_data,e) = to_ms(song_data, 44100, 1) - sample_rate = 1000 - - mx = max(song_data) - - is_locked = [False for i in range(len(song_data))] - x = int((len(song_data)*threshold)//100) - #print("X = ", x) - - print("Retreiving the", int(x), "/", len(song_data), "highest values") - elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1]) - print("Done") - - for idx in range(len(elements)): - is_locked[elements[idx][0]] = True - - for r in range(len(song_data)): - if(is_locked[r] == False): - song_data[r] = 0 - - if(show): - #print("EEEEE") - t = [offset + j/sample_rate for j in range(len(song_data))] - plt.plot(t, song_data) - plt.xlabel("Time") - plt.ylabel("Amplitude") - plt.grid() - plt.show() - - return song_data - -def get_tpts(data, sample_rate, thr): - res = [] - for i in range(len(data)): - if(data[i] > thr): - 
res.append(i/sample_rate) - - for i in res: - print(i) - return res - -def test_sample(timelist): - for i in range(1,len(timelist)): - #os.system('play -n synth %s sin %s' % (0.05, 440)) - for k in range(random.randint(1, 10)): - print("E", end="") - print("F") - sleep(timelist[i]-timelist[i-1]) -'''