"""Detect notes (circles and sliders) in a song excerpt from its waveform."""

# NOTE: the star import is kept only for backward compatibility with code
# that may rely on the names it exposes; nothing in this file needs it.
from math import *  # noqa: F401,F403
import heapq
import subprocess
from pathlib import Path
from time import sleep

import numpy as np
import scipy as scp
import scipy.fft  # makes sure ``scp.fft`` resolves on SciPy versions without lazy loading
from scipy.io import wavfile

try:
    import matplotlib.pyplot as plt
except ImportError:  # plotting is optional: the analysis itself only needs numpy/scipy
    plt = None


# Two frequencies whose ratio is at most NOTE_DIST are treated as the same tone.
NOTE_DIST = (2**(1/4))
# Tolerance on the distance between a frequency ratio and the nearest integer.
OCTAVE_DIST = 0.05


def is_data_stereo(raw_global_data: list) -> bool:
    """Tell whether the samples are multi-channel.

    Args:
        raw_global_data: sequence of samples, each sample being either a
            number (mono) or an indexable group of per-channel values.

    Returns:
        True when the first sample can itself be indexed, False when it is a
        scalar or when the data is empty.
    """
    try:
        # Only the ability to index matters, not the value that is read.
        raw_global_data[0][0]
    except (IndexError, TypeError):
        # IndexError: empty data or numpy scalar; TypeError: plain Python number.
        return False
    return True


def dist_to_integer(x):
    """Return the distance between ``x`` and the nearest integer (in [0, 0.5])."""
    frac = x - np.floor(x)
    if frac < 0.5:
        return frac
    return 1 - frac


def is_note_within(fr1, fr2):
    """Tell whether two frequencies are the same tone or an integer multiple.

    Args:
        fr1: first frequency.
        fr2: second frequency.

    Returns:
        True when the ratio of the larger frequency to the smaller one is at
        most ``NOTE_DIST``, or lies within ``OCTAVE_DIST`` of an integer.
        False when either frequency is not strictly positive, since no ratio
        can be formed.
    """
    low, high = sorted((fr1, fr2))
    if low <= 0:
        return False
    ratio = high / low
    return bool(ratio <= NOTE_DIST or dist_to_integer(ratio) <= OCTAVE_DIST)


def _load_mono(song_name, offset, songlen):
    """Read a WAV file and return ``(sample_rate, samples)`` as float64 mono."""
    sample_rate, raw = wavfile.read(song_name)
    raw = np.asarray(raw)
    if not is_data_stereo(raw):
        # Float samples avoid integer overflow in the later scaling steps.
        return sample_rate, raw.astype(np.float64)

    print("Converting to mono...")
    first = max(0, int(offset * sample_rate))
    last = min(len(raw), int((offset + songlen) * sample_rate))
    mono = np.zeros(len(raw), dtype=np.float64)
    # Only the analysed excerpt is converted; everything else stays at zero.
    channels = raw[first:last].astype(np.float64)
    mono[first:last] = channels[:, 0] / 2 + channels[:, 1] / 2
    return sample_rate, mono


def _amplitude_envelope(song_data, sample_rate, offset, songlen, segsize):
    """Keep the strongest samples of each window and rescale the peak to 1000."""
    n_out = int(sample_rate * songlen)
    envelope = np.zeros(n_out, dtype=np.float64)
    origin = int(sample_rate * offset)
    window = segsize * 4
    # One sample out of a thousand is kept in every window.
    keep = int(sample_rate * window) // 1000

    step = 0
    # The window start is recomputed from the step index so that rounding
    # errors do not accumulate over the excerpt.
    while step * window < songlen - segsize:
        start = step * window
        step += 1
        left = int(sample_rate * (start + offset))
        right = int(sample_rate * (start + offset + window))
        # The last window may run past the excerpt or past the file.
        right = min(right, len(song_data), origin + n_out)
        if keep <= 0 or right <= left:
            continue
        chunk = song_data[left:right]
        # Stable sort on the negated values: largest first, earliest on ties.
        strongest = np.argsort(-chunk, kind="stable")[:keep]
        envelope[left - origin + strongest] = chunk[strongest]

    peak = envelope.max() if n_out else 0
    if peak > 0:  # silent excerpt: nothing to rescale
        envelope = envelope * 1000 / peak
    return envelope


def _locate_notes(envelope, sample_rate, offset, segsize):
    """Group loud envelope samples into notes.

    Returns ``(timings, amplitudes)``; a timing is a float for a circle and a
    ``[start, end]`` list for a slider.
    """
    loud_threshold = 100   # envelope level above which a sample is loud
    release = 0.035        # seconds of silence that end a note
    min_spacing = 0.01     # notes closer than this to the previous one are dropped

    timings = []
    amplitudes = []
    loud = np.flatnonzero(envelope > loud_threshold)
    if loud.size == 0:
        return timings, amplitudes

    # A note ends wherever enough silent samples separate two loud samples.
    silent_between = np.diff(loud) - 1
    breaks = np.flatnonzero(silent_between / sample_rate >= release)
    starts = np.concatenate(([loud[0]], loud[breaks + 1]))
    ends = np.concatenate((loud[breaks], [loud[-1]]))

    # A trailing note is only reported once its release has been observed.
    trailing_silence = len(envelope) - 1 - int(loud[-1])
    if trailing_silence / sample_rate < release:
        starts, ends = starts[:-1], ends[:-1]

    last_end = -10.0
    for start, end in zip(starts, ends):
        start, end = int(start), int(end)
        if abs(start / sample_rate - last_end) < min_spacing:
            continue
        last_end = end / sample_rate
        amplitudes.append(float(envelope[start:end + 1].max()))
        if (end - start) / sample_rate < segsize * 1.1:
            # Short enough to be a circle, placed at the middle of the burst.
            timings.append(float((start + end) / (2 * sample_rate) + offset))
        else:
            timings.append([start / sample_rate + offset,
                            end / sample_rate + offset])
    return timings, amplitudes


def _note_bounds(timing):
    """Return ``(start, end)`` of a timing; both are equal for a circle."""
    if isinstance(timing, (list, tuple)):
        return timing[0], timing[1]
    return timing, timing


def _dominant_frequency(song_data, sample_rate, center, width, minfreq, maxfreq):
    """Return the strongest frequency strictly inside (minfreq, maxfreq).

    The spectrum is taken over ``width`` seconds centred on ``center``.
    Returns 0.0 when the band is empty or carries no energy.
    """
    left = max(0, int((center - width / 2) * sample_rate))
    right = min(int((center + width / 2) * sample_rate), len(song_data))
    segment = song_data[left:right]
    if len(segment) < 2:
        return 0.0
    spectrum = np.abs(scp.fft.rfft(segment))
    # Bin frequencies depend on the segment length, so they are derived from
    # this very segment rather than shared between notes.
    bin_freqs = scp.fft.rfftfreq(len(segment), 1 / sample_rate)
    in_band = (bin_freqs > minfreq) & (bin_freqs < maxfreq)
    in_band[0] = False  # the DC bin is never a candidate
    candidates = np.flatnonzero(in_band)
    if candidates.size == 0:
        return 0.0
    best = candidates[np.argmax(spectrum[candidates])]
    if spectrum[best] <= 0:
        return 0.0
    return float(bin_freqs[best])


def _merge_notes(timings, amplitudes, freqs, segsize):
    """Fuse consecutive notes that are close in time, amplitude and pitch."""
    max_amp_gap = 50  # largest amplitude difference allowed inside one note
    merged_t = []
    merged_a = []
    merged_f = []
    for timing, amplitude, freq in zip(timings, amplitudes, freqs):
        if merged_t:
            prev_start, prev_end = _note_bounds(merged_t[-1])
            cur_start, cur_end = _note_bounds(timing)
            gap = (cur_start + cur_end) / 2 - (prev_start + prev_end) / 2
            if (abs(gap) < segsize
                    and abs(amplitude - merged_a[-1]) < max_amp_gap
                    and is_note_within(freq, merged_f[-1])):
                # The fused note is a slider spanning both; the frequency of
                # the earlier note is kept.
                merged_t[-1] = [prev_start, cur_end]
                merged_a[-1] = (merged_a[-1] + amplitude) / 2
                continue
        merged_t.append(timing)
        merged_a.append(amplitude)
        merged_f.append(freq)
    return merged_t, merged_a, merged_f


def _plot_notes(envelope_t, envelope, timings, amplitudes, freqs):
    """Plot the envelope with the detected circles, sliders and frequencies."""
    if plt is None:
        print("matplotlib is not installed, skipping the plot")
        return
    circle_t = []
    circle_a = []
    slider_t = []
    slider_a = []
    freq_t = []
    for timing, amplitude in zip(timings, amplitudes):
        start, end = _note_bounds(timing)
        freq_t.append(start)
        if isinstance(timing, (list, tuple)):
            # Both ends of a slider are drawn at the same amplitude.
            slider_t.extend((start, end))
            slider_a.extend((amplitude, amplitude))
        else:
            circle_t.append(timing)
            circle_a.append(amplitude)

    plt.plot(envelope_t, envelope, "y-", label="amplitude (ua)")
    plt.plot(circle_t, circle_a, "ro", label="circles")
    plt.plot(slider_t, slider_a, "go", label="sliders")
    plt.plot(freq_t, freqs, "mo", label="frequencies (Hz)")
    plt.legend(loc="upper left")
    plt.grid()
    plt.show()


def keep_highest(song_name, offset, songlen, segsize, count, output_name, minfreq=110, maxfreq=5000, ampthr=250):
    """Detect the notes of a song excerpt and plot them.

    Args:
        song_name: path of the WAV file to analyse.
        offset: start of the excerpt, in seconds.
        songlen: length of the excerpt, in seconds.
        segsize: base segment duration, in seconds; must be positive.
        count: unused, kept so existing callers keep working.
        output_name: unused, kept so existing callers keep working.
        minfreq: lower bound (exclusive) of the frequency search, in Hz.
        maxfreq: upper bound (exclusive) of the frequency search, in Hz.
        ampthr: unused, kept so existing callers keep working.

    Returns:
        A pair ``(timings, amplitudes)`` of equally long lists. A timing is a
        float for a circle and a ``[start, end]`` list for a slider, in
        seconds from the beginning of the file. Amplitudes are on a scale
        where the strongest sample of the excerpt is 1000.

    Raises:
        ValueError: if ``segsize`` is not strictly positive.
    """
    if segsize <= 0:
        raise ValueError("segsize must be strictly positive")

    sample_rate, song_data = _load_mono(song_name, offset, songlen)
    print("\nSampleRate : ", sample_rate)
    print("SegSize : ", int(sample_rate*segsize))

    envelope = _amplitude_envelope(song_data, sample_rate, offset, songlen, segsize)
    envelope_t = offset + np.arange(len(envelope)) / sample_rate

    loct, locs = _locate_notes(envelope, sample_rate, offset, segsize)

    # Circles are analysed over a third of a segment; sliders over half of
    # their own length when that is longer.
    base_width = segsize / 3
    locf = []
    for timing in loct:
        start, end = _note_bounds(timing)
        width = max((end - start) / 2, base_width)
        locf.append(_dominant_frequency(song_data, sample_rate, (start + end) / 2,
                                        width, minfreq, maxfreq))

    loct, locs, locf = _merge_notes(loct, locs, locf, segsize)

    _plot_notes(envelope_t, envelope, loct, locs, locf)

    return (loct, locs)


def convert_to_wav(song_name: str, output_file="audio.wav") -> str:
    """Convert the song to .wav unless it is already in wave format.

    The decision relies on the file extension only: ``.mp3`` and ``.ogg``
    (in any letter case) are converted with ffmpeg.

    Args:
        song_name: path of the input song.
        output_file: path the converted file is written to.

    Returns:
        The path that should be used afterwards: ``output_file`` when a
        conversion took place, ``song_name`` otherwise.

    Raises:
        subprocess.CalledProcessError: if ffmpeg exits with an error, so that
            a stale ``output_file`` is never mistaken for a fresh conversion.
    """
    extension = Path(song_name).suffix.lower()
    if extension in (".mp3", ".ogg"):
        print("Converting to .wav...")
        subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file],
                       shell=False, check=True)
        return output_file
    return song_name


def retrieve_all_from_song(filename, t0, t1, bpm, dta=0.001, dtf=0.01, threshold=0.06, show=True):
    """Retrieve and optionally plot dominant frequencies and amplitudes.

    Args:
        filename: path of the song; converted to .wav when needed.
        t0: first bound passed to the retrieval helpers.
        t1: second bound passed to the retrieval helpers; must exceed ``t0``.
        bpm: tempo of the song, in beats per minute.
        dta: sample interval of the amplitude series.
        dtf: sample interval of the frequency series.
        threshold: threshold forwarded to the amplitude retrieval.
        show: when true, plot both series.

    Raises:
        SystemExit: with status 1 when ``t1 <= t0``.
    """
    if t1 <= t0:
        print("ERROR : t1 <= t0\n")
        raise SystemExit(1)

    # converts format to .wav
    new_fn = convert_to_wav(filename)

    print("Filtering song...")

    # NOTE(review): retrieve_dominant_freqs and retrieve_dominant_amps are not
    # defined in this file - confirm where they are expected to come from.
    print("Now retrieving the frequencies")
    (maxlist, maxamps) = retrieve_dominant_freqs(new_fn, t0, t1, dtf)

    print("Now retrieving the amplitudes")
    amps = retrieve_dominant_amps(new_fn, t0, t1, dta, threshold, (4/(bpm/60))/4)

    print("Len of freqs : ", len(maxlist), "|", len(maxamps))
    print("Len of amps : ", len(maxlist), "|", len(amps))

    # Rescale in place so that the largest amplitude becomes 2000.
    if len(amps) > 0:
        maxa = max(amps)
        if maxa != 0:
            for i in range(len(amps)):
                amps[i] = (amps[i] * 2000) / maxa

    if show:
        if plt is None:
            print("matplotlib is not installed, skipping the plot")
            return
        timesF = [t0 + dtf*k for k in range(len(maxlist))]
        timesA = [t0 + dta*k for k in range(len(amps))]
        plt.plot(timesA, amps)
        plt.plot(timesF, maxlist)
        plt.show()


def _main():
    """Analyse one of the known song excerpts."""
    # name -> (length in s, offset in s, BPM, audio file)
    presets = {
        "c-type": (7, 0.042, 149.3, "songs/ctype.mp3"),
        "tetris_2": (10, 0, 157, "songs/tetris_2.wav"),
        # "test" (length 1, offset 0, BPM 240) has no audio file attached.
        "gmtn": (5, 1.652, 155, "songs/furioso melodia.mp3"),
        "E": (15, 2.641, 155, "songs/rushe.mp3"),
        "Tsubaki": (10, 35.659, 199, "songs/TSUBAKI.mp3"),
        "death": (8, 21.750, 180, "songs/Night of Knights.mp3"),
        # unfiltered source: "songs/Bad apple (138-152).mp3"
        "Bad apple": (8, 0.152, 138, "songs/Bad apple (138-152)[filtered].wav"),
        # unfiltered source: "songs/Freedom Dive (222.22-1058).mp3"
        "Freedom dive": (7, 1.058, 222.22, "songs/Freedom Dive (222.22-1058)[filtered].wav"),
        # unfiltered source: "songs/Megalovania(240-7984).mp3"
        "Megalovania": (7, 0, 240, "songs/Megalovania(240-7984)[filtered].wav"),
    }

    song_len, offset, bpm, song_path = presets["Megalovania"]
    segsize = 1/(bpm/60)  # duration of one beat, in seconds
    wavved_song = convert_to_wav(song_path)

    keep_highest(wavved_song, offset, song_len, segsize/4, 1, "Zblit.wav", minfreq=300, maxfreq=3000, ampthr=500)
    print("yipee")


if __name__ == '__main__':
    _main()