378 lines
11 KiB
Python
378 lines
11 KiB
Python
def is_data_stereo(raw_global_data:list) -> bool:
    """
    Tell whether the sample container holds multi-channel (2-D) data.

    The check probes ``raw_global_data[0][0]``: if the first sample can
    itself be indexed, every sample is a sequence of channel values.

    Returns False for an empty container and for a flat (1-D) signal,
    True otherwise. The value of the first sample is irrelevant; a silent
    (zero) first sample still counts as stereo.
    """
    try:
        # Plain indexing rather than `assert`, which `python -O` removes
        # together with the probe it contains.
        raw_global_data[0][0]
    except (IndexError, TypeError):
        # IndexError: empty container, or a NumPy scalar that cannot be indexed.
        # TypeError: a plain Python number that cannot be indexed.
        return False
    return True
|
|
|
|
def dist_to_integer(x):
    """
    Return the absolute distance between x and its nearest integer.

    The nearest integer is computed as floor(x + 0.5). x may be a scalar,
    a NumPy array or any array-like (list, tuple); array inputs are
    processed element-wise and give an array of the same shape.
    """
    # asarray lets plain lists/tuples through; scalars still yield a scalar.
    values = np.asarray(x)
    return np.abs(values - np.floor(values + 0.5))
|
|
|
|
def is_note_within(fr1, fr2):
    """
    Compare two frequencies through the ratio larger / smaller.

    Returns a truthy value when that ratio is at most NOTE_DIST and lies at
    least OCTAVE_DIST away from the nearest integer (see dist_to_integer).
    """
    # Always divide the larger frequency by the smaller one, so ratio >= 1.
    ratio = fr1 / fr2 if fr1 > fr2 else fr2 / fr1
    return (ratio <= NOTE_DIST and dist_to_integer(ratio) >= OCTAVE_DIST)
|
|
|
|
def _mix_to_mono(raw_song_data, id_start, id_end):
    """Return a 1-D signal; stereo data is mixed over [id_start, id_end) and left at zero elsewhere."""
    if not is_data_stereo(raw_song_data):
        return raw_song_data

    print("Converting to mono...")
    raw = np.asarray(raw_song_data)
    mono = np.zeros(len(raw))
    # Only the first two channels are used; true division promotes integer
    # samples to float before the two halves are summed.
    mono[id_start:id_end] = raw[id_start:id_end, 0] / 2 + raw[id_start:id_end, 1] / 2
    return mono


def _segment_spectra(song_data, sample_rate, offset, songlen, segsize, blit):
    """Return a 2-D array holding one rFFT (of exactly `blit` samples) per consecutive segment."""
    spectra = []
    k = 0
    current_time = offset
    while current_time < songlen + offset - segsize:
        left = int(current_time * sample_rate)
        segment = song_data[left:left + blit]
        if len(segment) == 0:
            # The requested range runs past the end of the file.
            break
        # n=blit zero-pads a short final segment so every spectrum has
        # the same number of bins as the frequency axis.
        spectra.append(scp.fft.rfft(segment, n=blit))

        # Recompute from k instead of accumulating, to avoid what causes
        # 0.1 + 0.1 + 0.1 == 0.3 to be False.
        k += 1
        current_time = offset + k * segsize
    return np.array(spectra)


def _amplitude_envelope(song_data, sample_rate, offset, songlen, segsize):
    """Return a sparse envelope of the range: the largest samples of each 4-segment window, scaled to a peak of 1000."""
    envelope = np.zeros(max(int(sample_rate * songlen), 0))

    incr_a = segsize * 4
    count_a = int(sample_rate * incr_a) // 1250
    left_0 = int(sample_rate * offset)
    # Never read past the file nor write past the envelope buffer.
    hard_end = min(len(song_data), left_0 + len(envelope))

    amp_ct = 0
    while amp_ct < songlen - segsize:
        left = int(sample_rate * (amp_ct + offset))
        right = min(int(sample_rate * (amp_ct + offset + incr_a)), hard_end)
        window = np.asarray(song_data[left:right]).tolist()

        # Indices (relative to `left`) of the count_a largest signed samples.
        for rel in heapq.nlargest(count_a, range(len(window)), key=window.__getitem__):
            envelope[rel + left - left_0] = window[rel]

        amp_ct += incr_a

    peak = envelope.max() if envelope.size else 0
    if peak > 0:
        envelope = envelope * 1000 / peak
    return envelope


def _locate_peaks(envelope, sample_rate, offset, level=100, time_d=0.035):
    """Group envelope samples above `level` into bursts closed by `time_d` seconds of quiet; return (times, amplitudes)."""
    loct = []
    locs = []
    in_seg = False
    left_id = 0
    right_id = 0
    a_ampl = 0
    cur_t = 0

    for i, value in enumerate(envelope.tolist()):
        if value > level:
            if not in_seg:
                in_seg = True
                left_id = i
            right_id = i
            a_ampl = max(a_ampl, value)
            cur_t = 0
        else:
            cur_t += 1 / sample_rate
            if in_seg and cur_t >= time_d:
                # Quiet for long enough: close the burst at its midpoint.
                in_seg = False
                locs.append(a_ampl)
                loct.append((left_id + right_id) / (2 * sample_rate) + offset)
                a_ampl = 0
    return loct, locs


def _find_sliders(loct, locs, slider_dtct):
    """Return (times, amplitudes) of peaks whose gaps to both neighbours are below `slider_dtct`."""
    sl_t = []
    sl_a = []
    in_slider = False
    for i in range(1, len(loct) - 1):
        delta = loct[i] - loct[i - 1]
        delta2 = loct[i + 1] - loct[i]
        if delta < slider_dtct and delta2 < slider_dtct:
            if not in_slider:
                # A run starts: also record the peak that precedes it.
                in_slider = True
                sl_t.append(loct[i - 1])
                sl_a.append(locs[i - 1])
            sl_t.append(loct[i])
            sl_a.append(locs[i])
        else:
            in_slider = False
    return sl_t, sl_a


def _render_marked_audio(untouched, filtered, marked_segments, pfreq, blit, write_freq=880):
    """Inject a tone just above `write_freq` into the marked segments and return the signal as int16 PCM."""
    n_bins = len(pfreq)
    # First bin whose frequency is strictly above write_freq, clamped to the axis.
    write_id = min(int(np.searchsorted(pfreq, write_freq, side="right")), n_bins - 1)
    bins = [b for b in (write_id - 1, write_id, write_id + 1) if 0 <= b < n_bins]

    for seg in marked_segments:
        maxamp = np.abs(filtered[seg]).max()
        untouched[seg, bins] = max(maxamp * 2, 32767)

    res = scp.fft.irfft(untouched, n=blit, axis=-1).ravel()

    # Scale by the largest magnitude (not the largest positive value) so
    # negative excursions cannot overflow int16.
    peak = np.abs(res).max() if res.size else 0
    if peak == 0:
        return np.zeros(len(res), dtype=np.int16)
    return (32767 * res / peak).astype(np.int16)


def keep_highest(song_name, offset, songlen, segsize, count, output_name, minfreq=110, maxfreq=5000, ampthr=250):
    """
    Detect note onsets in a .wav excerpt and write a copy with marker tones.

    The excerpt [offset, offset + songlen) is cut into segments of `segsize`
    seconds. Two analyses are run and plotted (two blocking plt.show() calls):
    an amplitude analysis that locates loud bursts and runs of closely spaced
    bursts ("sliders"), and a frequency analysis that keeps the `count`
    strongest bins of each segment inside [minfreq, maxfreq].

    Parameters:
        song_name   : path of the .wav file to read.
        offset      : start of the excerpt, in seconds.
        songlen     : length of the excerpt, in seconds.
        segsize     : length of one FFT segment, in seconds.
        count       : number of strongest bins kept per segment.
        output_name : path of the .wav file to write.
        minfreq, maxfreq : bins outside this range are zeroed before ranking.
        ampthr      : minimum bin magnitude for a bin to become a timing point.

    Returns:
        (loct, sl_t, timing_points): burst times from the amplitude analysis,
        slider times, and timing points from the frequency analysis. For
        backward compatibility timing_points still ends with the sentinel
        value 999999.

    Raises:
        ValueError: if segsize is shorter than one sample or no segment fits
        in the requested range.
    """
    # extracting data from cropped song
    sample_rate, raw_song_data = wavfile.read(song_name)
    blit = int(sample_rate * segsize)  # samples per FFT segment
    if blit < 1:
        raise ValueError("segsize is shorter than one sample")

    id_start = int(offset * sample_rate)
    id_end = min(len(raw_song_data), int((offset + songlen) * sample_rate))
    song_data = _mix_to_mono(raw_song_data, id_start, id_end)

    print("\nSampleRate : ", sample_rate)
    print("SegSize : ", blit)

    # frequencies associated to the FFT bins
    pfreq = scp.fft.rfftfreq(blit, 1 / sample_rate)

    print("Retrieving freqs from", offset, "to", songlen+offset, "...")
    fft_list_untouched = _segment_spectra(song_data, sample_rate, offset, songlen, segsize, blit)
    if len(fft_list_untouched) == 0:
        raise ValueError("no segment fits in the requested range")
    # Independent copy: the filtered spectra are edited in place below.
    fft_list = fft_list_untouched.copy()

    print("\n\nSegSize :", segsize, "\nFFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq), "\n\n")

    # -------------------------------------------- Clean song -------------------------------------------- #
    fft_list[:, (pfreq < minfreq) | (pfreq > maxfreq)] = 0

    new_times = []
    new_freqs = []
    new_ampls = []   # bin magnitudes
    new_segs = []    # segment index of each entry

    for seg_idx, spectrum in enumerate(fft_list):
        # Rank bins by magnitude; complex values have no meaningful order.
        mags = np.abs(spectrum).tolist()
        for bin_idx in heapq.nlargest(count, range(len(mags)), key=mags.__getitem__):
            new_times.append(offset + seg_idx * segsize)
            new_freqs.append(pfreq[bin_idx])
            new_ampls.append(mags[bin_idx])
            new_segs.append(seg_idx)

    # -------------------------------------------- Get amp distribution -------------------------------------------- #
    new_new_amps = _amplitude_envelope(song_data, sample_rate, offset, songlen, segsize)
    new_new_t = offset + np.arange(len(new_new_amps)) / sample_rate

    # localize peaks, then detect sliders
    loct, locs = _locate_peaks(new_new_amps, sample_rate, offset)
    sl_t, sl_a = _find_sliders(loct, locs, segsize)

    plt.plot(new_new_t, new_new_amps, "b-")
    plt.plot(loct, locs, "ro")
    plt.plot(sl_t, sl_a, "go")
    plt.grid()
    plt.show()

    # -------------------------------------------- Localize -------------------------------------------- #
    timing_points = []
    marked_segments = []
    new_kept = []
    last_freq = new_freqs[0] if new_freqs else 0
    for i, (freq, ampl) in enumerate(zip(new_freqs, new_ampls)):
        # Keep a bin when it is loud enough and is_note_within() does not
        # match it against the last kept frequency.
        if ampl > ampthr and (i == 0 or not is_note_within(freq, last_freq)):
            last_freq = freq
            timing_points.append(new_times[i])
            marked_segments.append(new_segs[i])
            new_kept.append(freq)
        else:
            new_kept.append(0)

    mx = max(new_ampls, default=0)
    if mx > 0:
        scaled_ampls = [elt * 1000 / mx for elt in new_ampls]
    else:
        scaled_ampls = [0] * len(new_ampls)

    plt.plot(new_times, new_freqs)
    plt.plot(new_times, scaled_ampls)
    plt.plot(new_times, new_kept, "ro")
    plt.grid()
    plt.show()

    # -------------------------------------------- Write -------------------------------------------- #
    # NOTE(review): the tone is injected only into segments that hold a
    # timing point; this assumes the injection was nested under the
    # timing-point test in the original layout. Confirm.
    print("Converting...")
    res = _render_marked_audio(fft_list_untouched, fft_list, marked_segments, pfreq, blit)
    wavfile.write(output_name, sample_rate, res)

    # Sentinel kept so the returned list has the same content as before.
    timing_points.append(999999)

    return (loct, sl_t, timing_points) # amplitude result, sliders and frequency result
|
|
|
|
def convert_to_wav(song_name:str, output_file="audio.wav") -> str:
    """
    Converts the song to .wav, only if it's not already in wave format.
    Currently relies on file extension (.mp3 / .ogg, compared
    case-insensitively); any other extension is passed through untouched.

    The conversion runs `ffmpeg -y -i song_name output_file`, so an
    existing output_file is overwritten.

    Returns: the song_name that should be used afterwards.
    Raises: subprocess.CalledProcessError if ffmpeg exits with an error,
    FileNotFoundError if the ffmpeg executable cannot be found.
    """
    extension = Path(song_name).suffix.lower()
    if extension not in (".mp3", ".ogg"):
        return song_name

    print("Converting to .wav...")
    # check=True: a failed conversion must not return a path to a missing
    # or stale output file.
    subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file], shell=False, check=True)
    return output_file
|
|
|
|
def retrieve_all_from_song(filename, t0, t1, bpm, dta=0.001, dtf=0.01, threshold=0.06, show=True):
    """
    Extract dominant frequencies and amplitudes of a song between t0 and t1.

    The file is first passed through convert_to_wav, then handed to
    retrieve_dominant_freqs (step dtf) and retrieve_dominant_amps (step dta).
    Amplitudes are rescaled in place so the largest becomes 2000. When `show`
    is true both series are plotted against time.

    Per the original notes, dta/dtf are sample intervals and threshold is in
    percent. Exits the interpreter with status 1 when t1 <= t0.
    Returns nothing.
    """
    if t1 <= t0:
        print("ERROR : t1 <= t0\n")
        exit(1)

    # converts format to .wav
    wav_path = convert_to_wav(filename)

    # The void_freq_clean(new_fn, t0, t1, dtf, 20, 20000, 0.05, "crop1.wav")
    # pre-filter is currently disabled; only its banner is printed.
    print("Filtering song...")

    print("Now retrieving the frequencies")
    maxlist, maxamps = retrieve_dominant_freqs(wav_path, t0, t1, dtf)

    print("Now retrieving the amplitudes")
    last_arg = (4/(bpm/60))/4
    amps = retrieve_dominant_amps(wav_path, t0, t1, dta, threshold, last_arg)

    freq_count = len(maxlist)
    print("Len of freqs : ", freq_count, "|", len(maxamps))
    print("Len of amps : ", freq_count, "|", len(amps))

    # Largest amplitude, used as the normalisation reference.
    loudest = amps[0]
    for candidate in amps:
        loudest = candidate if candidate > loudest else loudest

    for idx, value in enumerate(amps):
        amps[idx] = (value * 2000) / loudest

    if not show:
        return

    timesF = [t0 + dtf*k for k in range(freq_count)]
    timesA = [t0 + dta*k for k in range(len(amps))]

    plt.plot(timesA, amps)
    plt.plot(timesF, maxlist)
    plt.show()

    # free()
|
|
|
|
# ---------------------------------------------------------------- #
# Active preset: c-type
# SEGSIZE is 1/(BPM/60), i.e. the duration of one beat in seconds
# if BPM is beats per minute.
# ---------------------------------------------------------------- #
SONG_LEN = 7
OFFSET = 0.042
BPM = 149.3
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("ctype.mp3")

# ---------------------------------------------------------------- #
# Disabled presets (kept for reference, never executed)
# ---------------------------------------------------------------- #
# tetris_2
#   SONG_LEN = 10
#   OFFSET = 0
#   BPM = 157
#   SEGSIZE = 1/(BPM/60)
#   wavved_song = convert_to_wav("tetris_2.wav")
#
# test
#   SONG_LEN = 1
#   OFFSET = 0
#   BPM = 240
#   SEGSIZE = 1/(BPM/60)
#
# gmtn
#   SONG_LEN = 5
#   OFFSET = 1.652
#   BPM = 155
#   SEGSIZE = 1/(BPM/60)
#   wavved_song = convert_to_wav("furioso melodia.mp3")
#
# E
#   SONG_LEN = 30
#   OFFSET = 2.641
#   BPM = 155
#   SEGSIZE = 1/(BPM/60)
#   wavved_song = convert_to_wav("rushe.mp3")
#
# Tsubaki
#   SONG_LEN = 10
#   OFFSET = 35.659
#   BPM = 199
#   SEGSIZE = 1/(BPM/60)
#   wavved_song = convert_to_wav("TSUBAKI.mp3")
#
# death
#   SONG_LEN = 8
#   OFFSET = 21.750
#   BPM = 180
#   SEGSIZE = 1/(BPM/60)
#   wavved_song = convert_to_wav("songs/Night of Knights.mp3")

#wavved_song = convert_to_wav("tetris_2.wav")

# Thresholds read by is_note_within(): the largest accepted frequency
# ratio and the minimum distance of that ratio from an integer.
NOTE_DIST = 2 ** (1/4)
OCTAVE_DIST = 0.05

keep_highest(
    song_name=wavved_song,
    offset=OFFSET,
    songlen=SONG_LEN,
    segsize=SEGSIZE/4,
    count=1,
    output_name="Zblit.wav",
    minfreq=300,
    maxfreq=3000,
    ampthr=500,
)

print("yipee")
|