osutipe/cleaned_sp.py

472 lines
14 KiB
Python

from math import *
import numpy as np
import scipy as scp
from scipy.io import wavfile
import matplotlib.pyplot as plt
import subprocess
import heapq
from pathlib import Path
from time import sleep
import datetime
def is_data_stereo(raw_global_data: list) -> bool:
    """
    Tell whether the audio samples hold more than one channel.

    raw_global_data: sequence of samples. Multi-channel data has one
        sub-sequence per sample (e.g. ``[[left, right], ...]``), mono data
        has one plain number per sample.

    Returns True when the first sample can itself be indexed, False for mono
    or empty data.
    """
    try:
        # Only the ability to index matters, not the value: a silent first
        # sample (0) is still a stereo frame.  No `assert` here, because
        # asserts are removed when Python runs with -O.
        raw_global_data[0][0]
    except (IndexError, TypeError):
        # IndexError: empty data, or a NumPy scalar that cannot be indexed.
        # TypeError: a plain Python number that cannot be indexed.
        return False
    return True
def dist_to_integer(x):
    """
    Return the distance between x and the nearest integer (between 0 and 0.5).
    """
    # Work on the fractional part; the integer part itself says nothing about
    # how close x is to a whole number.
    frac = x - np.floor(x)
    return min(frac, 1 - frac)


def is_note_within(fr1, fr2):
    """
    Tell whether two frequencies count as the same note.

    The larger frequency is divided by the smaller one; the pair matches when
    that ratio is at most NOTE_DIST (same tone) or lies within OCTAVE_DIST of
    a whole number (octave / integer multiple).

    Returns False when the smaller frequency is not strictly positive, since
    no meaningful ratio exists then.
    """
    if fr1 > fr2:
        high, low = fr1, fr2
    else:
        high, low = fr2, fr1
    if low <= 0:
        # Avoids a division by zero when no dominant frequency was found.
        return False
    ratio = high / low
    # same tone or octave
    return bool(ratio <= NOTE_DIST or dist_to_integer(ratio) <= OCTAVE_DIST)
def _timing_bounds(timing):
    """Return (start, end) in seconds; both equal the hit time for a circle."""
    if isinstance(timing, (list, tuple)):
        return timing[0], timing[1]
    return timing, timing


def _load_mono(song_name, offset, songlen):
    """Read the wav file and return (sample_rate, single-channel samples)."""
    sample_rate, raw = wavfile.read(song_name)
    if not is_data_stereo(raw):
        return sample_rate, raw
    print("Converting to mono...")
    first = max(0, int(offset * sample_rate))
    last = min(len(raw), int((offset + songlen) * sample_rate))
    # Only the analysed excerpt is mixed down; everything else stays at 0.
    mono = np.zeros(len(raw))
    mono[first:last] = raw[first:last, 0] / 2 + raw[first:last, 1] / 2
    return sample_rate, mono


def _peak_envelope(song_data, sample_rate, offset, songlen, segsize):
    """Keep the loudest samples of each window, scaled so the maximum is 1000."""
    n_samples = int(sample_rate * songlen)
    envelope = np.zeros(n_samples)
    window = segsize * 4
    # One sample kept per 1000 samples of window.
    keep = int(sample_rate * window) // 1000
    origin = int(sample_rate * offset)
    elapsed = 0
    while elapsed < songlen - segsize:
        left = int(sample_rate * (elapsed + offset))
        right = int(sample_rate * (elapsed + offset + window))
        # Slicing (instead of indexing sample by sample) tolerates a window
        # that runs past the end of the file.
        loudest = heapq.nlargest(keep, enumerate(song_data[left:right]), key=lambda pair: pair[1])
        elapsed += window
        for pos, value in loudest:
            target = pos + left - origin
            # The last window may extend beyond the analysed excerpt.
            if target < n_samples:
                envelope[target] = value
    peak = envelope.max() if envelope.size else 0
    if peak > 0:
        envelope = envelope * 1000 / peak
    # With no positive sample the envelope stays at 0 and no hit is detected.
    return envelope


def _locate_hits(envelope, sample_rate, offset, segsize):
    """Group envelope samples above the threshold into (timings, amplitudes)."""
    threshold = 100     # on the envelope normalised to a maximum of 1000
    release = 0.035     # seconds of quiet that close a hit
    timings = []
    amplitudes = []
    active = False
    start = 0
    end = 0
    loudest = 0
    quiet = 0
    previous_end = -10.0
    for idx, level in enumerate(envelope.tolist()):
        if level > threshold:
            if not active:
                active = True
                start = idx
            end = idx
            loudest = max(loudest, level)
            quiet = 0
            continue
        quiet += 1 / sample_rate
        if not (active and quiet >= release):
            continue
        active = False
        # Hits starting less than 10 ms after the previous one are dropped.
        if abs(start / sample_rate - previous_end) >= 0.01:
            previous_end = end / sample_rate
            amplitudes.append(loudest)
            if (end - start) / sample_rate < segsize * 1.1:
                # Short hit: a circle, placed at the middle of the hit.
                timings.append((start + end) / (2 * sample_rate) + offset)
            else:
                # Long hit: a slider, stored as [start, end].
                timings.append([start / sample_rate + offset, end / sample_rate + offset])
        loudest = 0
    # A hit still open when the excerpt ends is not reported.
    return timings, amplitudes


def _dominant_frequency(song_data, sample_rate, timing, min_window, minfreq, maxfreq):
    """Return the strongest frequency strictly inside (minfreq, maxfreq) around a hit."""
    start, end = _timing_bounds(timing)
    center = (end + start) / 2
    span = max((end - start) / 2, min_window)
    left = max(0, int((center - span / 2) * sample_rate))
    right = min(int((center + span / 2) * sample_rate), len(song_data))
    window = song_data[left:right]
    # NumPy scalars are returned throughout so that a later division by a
    # "no frequency" value yields inf/nan rather than raising.
    if len(window) == 0:
        return np.float64(0.0)
    magnitudes = np.abs(scp.fft.rfft(window))
    # The frequency axis must be built for THIS window length: bin i of an
    # n-sample FFT is i*sample_rate/n, whatever the analysis segment size is.
    freqs = scp.fft.rfftfreq(len(window), 1 / sample_rate)
    in_band = (freqs > minfreq) & (freqs < maxfreq)
    in_band[0] = False  # never report the DC bin
    if not in_band.any():
        return np.float64(0.0)
    best = int(np.argmax(np.where(in_band, magnitudes, -1.0)))
    if magnitudes[best] > 0:
        return freqs[best]
    return np.float64(0.0)


def _merge_close_hits(timings, amplitudes, freqs, segsize):
    """Fuse each hit into the previous one when both are close in time, level and pitch."""
    out_t = []
    out_a = []
    out_f = []
    for timing, amplitude, freq in zip(timings, amplitudes, freqs):
        if out_t:
            prev_start, prev_end = _timing_bounds(out_t[-1])
            start, end = _timing_bounds(timing)
            gap = (start + end) / 2 - (prev_start + prev_end) / 2
            if (abs(gap) < segsize
                    and abs(amplitude - out_a[-1]) < 50
                    and is_note_within(freq, out_f[-1])):
                # Always a flat [start, end] pair, even when one side already
                # is a slider; the earlier hit's frequency is kept.
                out_t[-1] = [prev_start, end]
                out_a[-1] = (out_a[-1] + amplitude) / 2
                continue
        out_t.append(timing)
        out_a.append(amplitude)
        out_f.append(freq)
    return out_t, out_a, out_f


def _plot_hits(envelope, sample_rate, offset, timings, amplitudes, freqs):
    """Show the envelope with the detected circles, sliders and frequencies."""
    time_axis = offset + np.arange(len(envelope)) / sample_rate
    first_times = []
    circle_t = []
    circle_a = []
    slider_t = []
    slider_a = []
    for timing, amplitude in zip(timings, amplitudes):
        start, end = _timing_bounds(timing)
        first_times.append(start)
        if isinstance(timing, (list, tuple)):
            slider_t.extend([start, end])
            slider_a.extend([amplitude, amplitude])
        else:
            circle_t.append(timing)
            circle_a.append(amplitude)
    plt.plot(time_axis, envelope, "y-", label="amplitude (ua)")
    plt.plot(circle_t, circle_a, "ro", label="circles")
    plt.plot(slider_t, slider_a, "go", label="sliders")
    plt.plot(first_times, freqs, "mo", label="frequencies (Hz)")
    plt.legend(loc="upper left")
    plt.grid()
    plt.show()


def _write_hit_objects(song_name, offset, songlen, timings):
    """Write the hit objects to the result text file."""
    with open("result_bad_apple[90].txt", "w") as out:
        out.write(f"Song name : {song_name}\n")
        out.write(f"Start : {offset}\n")
        out.write(f"End : {offset + songlen}\n\n")
        out.write("Hit Objects : \n")
        for timing in timings:
            out.write(f"{timing}\n")


def keep_highest(song_name, offset, songlen, segsize, count, output_name, minfreq=110, maxfreq=5000, ampthr=250, canPlot=True, writeO=True):
    '''
    INPUT : data relative to music + config about the analysis
    * song_name : path of the .wav file to analyse
    * offset, songlen : start and length of the analysed excerpt (seconds)
    * segsize : analysis segment length (seconds), must be > 0
    * minfreq, maxfreq : only frequencies strictly between them are considered
    * canPlot : show a matplotlib figure of the result
    * writeO : write the hit objects to "result_bad_apple[90].txt"
    * count, output_name, ampthr : accepted for backward compatibility only.
      count used to drive a per-segment spectral pass whose results were never
      consumed (that pass is gone), output_name was never read, and the
      detection threshold is fixed at 100 on an envelope normalised to 1000.
    OUTPUT :
    * a list of timings : it contains floats (representing circles) and couple of floats (representing sliders) (e.g. [float, float])
    * a list of amplitudes relative to timings
    Raises ValueError when segsize is not strictly positive.
    '''
    if segsize <= 0:
        raise ValueError("segsize must be strictly positive")

    # extracting data from cropped song
    sample_rate, song_data = _load_mono(song_name, offset, songlen)
    print("\nSampleRate : ", sample_rate)
    print("SegSize : ", int(sample_rate * segsize))

    # amplitude distribution, then peak localisation
    envelope = _peak_envelope(song_data, sample_rate, offset, songlen, segsize)
    loct, locs = _locate_hits(envelope, sample_rate, offset, segsize)

    # dominant frequency of every hit
    print("Retrieving freqs from", offset, "to", songlen + offset, "...")
    locf = [
        _dominant_frequency(song_data, sample_rate, timing, segsize / 3, minfreq, maxfreq)
        for timing in loct
    ]

    loct, locs, locf = _merge_close_hits(loct, locs, locf, segsize)

    if canPlot:
        _plot_hits(envelope, sample_rate, offset, loct, locs, locf)
    if writeO:
        _write_hit_objects(song_name, offset, songlen, loct)
    return (loct, locs)
def convert_to_wav(song_name: str, output_file="audio.wav") -> str:
    """
    Converts the song to .wav, only if it's not already in wave format.
    Currently relies on file extension (compared case-insensitively).

    song_name: path of the input audio file.
    output_file: path written by ffmpeg when a conversion is needed.

    Returns: the song_name that should be used afterwards (output_file after
    a conversion, song_name unchanged otherwise).
    Raises: subprocess.CalledProcessError when ffmpeg exits with an error, so
    that a stale or missing output_file is never handed to the caller.
    """
    extension = Path(song_name).suffix.lower()
    if extension not in (".mp3", ".ogg"):
        return song_name
    print("Converting to .wav...")
    # "-y" lets ffmpeg overwrite a previous output_file without prompting.
    subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file], shell=False, check=True)
    return output_file
'''
# c-type
SONG_LEN = 7
OFFSET = 0.042
BPM = 149.3
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/ctype.mp3")
'''
'''
# tetris_2
SONG_LEN = 14
OFFSET = 0
BPM = 157
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/tetris_2.wav")
'''
'''
# test
SONG_LEN = 1
OFFSET = 0
BPM = 240
SEGSIZE = 1/(BPM/60)
'''
'''
# gmtn
SONG_LEN = 5
OFFSET = 1.652
BPM = 155
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/furioso melodia.mp3")
'''
'''
# E
SONG_LEN = 15
OFFSET = 2.641
BPM = 155
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/rushe.mp3")
'''
'''
# Tsubaki
SONG_LEN = 20
OFFSET = 35.659
BPM = 199
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/TSUBAKI.mp3")
'''
'''
# Owen 1/2
SONG_LEN = 20
OFFSET = 1.008
BPM = 157
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/owen(157.00024-1008).mp3")
'''
'''
# Owen 2/2
SONG_LEN = 7
OFFSET = 25.466
BPM = 157
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("songs/owen(157.00024-1008).mp3")
'''
# Active preset: "death". The other presets in this file are disabled by
# being wrapped in string literals.
SONG_LEN = 10  # length of the analysed excerpt, in seconds
OFFSET = 21.750  # time offset of the first note, in seconds
BPM = 180
# Seconds per beat. NOTE: SEGSIZE is assigned again, with the same formula,
# in the constants section after the presets.
SEGSIZE = 1/(BPM/60)
# Path handed to the analysis; .mp3/.ogg inputs go through convert_to_wav's
# ffmpeg conversion first.
wavved_song = convert_to_wav("songs/Night of Knights.mp3")
'''
# Bad apple
SONG_LEN = 15
OFFSET = 0.152
BPM = 138
SEGSIZE = 1/(BPM/60)
#wavved_song = convert_to_wav("songs/Bad apple (138-152).mp3")
wavved_song = convert_to_wav("songs/Bad apple (138-152)[filtered].wav")
'''
'''
# Freedom dive
SONG_LEN = 7
OFFSET = 1.058
BPM = 222.22
SEGSIZE = 1/(BPM/60)
#wavved_song = convert_to_wav("songs/Freedom Dive (222.22-1058).mp3")
wavved_song = convert_to_wav("songs/Freedom Dive (222.22-1058)[filtered].wav")
'''
'''
# Megalovania
SONG_LEN = 7
OFFSET = 7.984
BPM = 240
SEGSIZE = 1/(BPM/60)
#wavved_song = convert_to_wav("songs/Megalovania(240-7984).mp3")
wavved_song = convert_to_wav("songs/Megalovania(240-7984)[filtered].wav")
'''
'''
SONG_LEN = 0 # length of the song, in seconds
OFFSET = 0 # offset of the 1st note (aka time offset of the first red bar), in seconds
BPM = 0 # BPM
wavved_song = convert_to_wav("insert_song_name_here.wav")
'''
# Do not touch
DIVIDER = 4 # note divider: the segment size passed to keep_highest below is SEGSIZE/DIVIDER
# Seconds per beat for the BPM of the active preset.
SEGSIZE = 1/(BPM/60)
# Frequency-ratio bound read by is_note_within() for its "same tone" test
# (2**(1/4) corresponds to three equal-tempered semitones).
NOTE_DIST = (2**(1/4))
# Threshold that is_note_within() compares dist_to_integer(ratio) against.
OCTAVE_DIST = 0.05
# Signature reminder:
# keep_highest(song_name, offset, songlen, segsize, count, output_name, minfreq=110, maxfreq=5000, ampthr=250, canPlot=True, writeO=True)
# Runs the analysis as soon as the module is executed; loct receives the hit
# timings and locs the matching amplitudes.
(loct, locs) = keep_highest(wavved_song, OFFSET, SONG_LEN, SEGSIZE/DIVIDER, 1, "Zblit.wav", minfreq=220, maxfreq=3000, ampthr=800)
'''
minfreq and maxfreq are thresholds for frequency analysis (anything outside of [minfreq, maxfreq] will not be taken into account)
ampthr is a threshold for amplitude (arbitrary unit)
'''
''' you can deactivate this if you want (show timings points in terminal) '''
'''
import time
import random
loct2 = []
for k in loct:
if(type(k) == float):
loct2.append(k)
else:
loct2.append(k[0])
loct2.append(k[1])
for i in range(len(loct2)-1):
print("*"*(random.randint(10, 100)))
time.sleep(loct2[i+1]-loct2[i])
print("yipee")
'''
# complexity test
# The benchmark that used this handle is disabled (it sits in the string
# literal that follows), so nothing is ever written to it and its close()
# never runs. Keep creating/truncating the file as before, but release the
# handle right away instead of leaking it. `fl` stays bound afterwards, and
# closing an already-closed file is a no-op if the benchmark is re-enabled.
with open("complexity.txt", "w") as fl:
    pass
# f.write("Song name : " + song_name + "\n")
'''
deltat = []
compl = []
for end in range(2,120):
st = datetime.datetime.now()
(e, ee) = keep_highest(wavved_song, OFFSET, OFFSET+end/2, SEGSIZE/DIVIDER, 1, "Zblit.wav", minfreq=220, maxfreq=3000, ampthr=800, canPlot=False,writeO=False)
et = datetime.datetime.now()
dt = et.microsecond - st.microsecond + (et.second - st.second)*1000000 + (et.minute - st.minute)/60
if(dt>0):
deltat.append(end/2)
compl.append(dt)
plt.plot(deltat, compl, "y-")
plt.plot(deltat, compl, "ro")
plt.xlabel("size of the song")
plt.ylabel("time complexity (us)")
plt.grid()
plt.show()
fl.close()
'''