# osutipe/cleaned_sp.py
#
# 357 lines
# 11 KiB
# Python

from math import *
import numpy as np
import scipy as scp
from scipy.io import wavfile
import matplotlib.pyplot as plt
import subprocess
import heapq
from pathlib import Path
from time import sleep
def is_data_stereo(raw_global_data: list) -> bool:
    """Tell whether decoded audio holds several channel values per sample.

    The data counts as stereo when its first element can itself be indexed
    (a per-sample sequence of channel values) and as mono when the first
    element is a plain scalar.

    Args:
        raw_global_data: Decoded samples; either a flat sequence of numbers
            or a sequence of per-sample channel sequences.

    Returns:
        True if the first sample is indexable, False for flat or empty data.
    """
    try:
        # Probe without ``assert``: asserts are stripped under ``python -O``,
        # which would skip the indexing and report every input as stereo.
        raw_global_data[0][0]
    except (IndexError, TypeError):
        # IndexError: empty data, or a NumPy scalar that cannot be indexed.
        # TypeError: a plain Python number that cannot be subscripted.
        return False
    return True
def retrieve_dominant_freqs(song_name, offset, songlen, segsize):
    """Find the dominant frequency of each consecutive segment of a song.

    The part of ``song_name`` that starts at ``offset`` seconds and lasts
    ``songlen`` seconds is extracted with ffmpeg into a temporary
    ``crop.wav``, mixed down to mono when it has two channels, and cut into
    segments of ``segsize`` seconds. Each segment is Fourier-transformed and
    the frequency in [110 Hz, 2640 Hz) with the largest magnitude is kept.

    Args:
        song_name: Path of the audio file handed to ffmpeg.
        offset: Start of the analysed part, in seconds.
        songlen: Length of the analysed part, in seconds.
        segsize: Length of one segment, in seconds.

    Returns:
        A tuple ``(maxlist, maxamps)`` of two lists with one entry per
        segment; entry ``i`` describes the segment that starts at
        ``offset + i*segsize``. ``maxlist`` holds the dominant frequency in
        Hz and ``maxamps`` its magnitude. When no frequency bin lies inside
        the band the entries are ``0`` and ``-1``.

    Raises:
        ValueError: If ``segsize`` covers less than one sample.
    """
    # Band of interest: drop low-pitched and high-pitched content.
    minfreq = 110
    maxfreq = 440*6

    crop_name = "crop.wav"
    # Cut out the part to analyse. "-t" is a duration (not an end position),
    # so it receives songlen; "-y" overwrites a stale crop instead of
    # blocking on an interactive prompt.
    subprocess.run(["ffmpeg", "-y", "-ss", str(offset), "-t", str(songlen), "-i", song_name, crop_name], shell=False)
    subprocess.run(["clear"])
    try:
        sample_rate, raw_song_data = wavfile.read(crop_name)
    finally:
        # The crop is only a scratch file; remove it even if reading fails.
        Path(crop_name).unlink(missing_ok=True)

    blit = int(sample_rate*segsize)  # samples per segment
    if blit < 1:
        raise ValueError("segsize is shorter than one sample")

    if is_data_stereo(raw_song_data):
        print("Converting to mono...")
        # Average both channels in one vectorised step.
        song_data = raw_song_data[:, 0]/2 + raw_song_data[:, 1]/2
    else:
        song_data = raw_song_data

    print("\nSampleRate : ", sample_rate)
    print("SegSize : ", blit)

    # Frequencies associated with the bins of every segment's FFT.
    pfreq = scp.fft.rfftfreq(blit, 1/sample_rate)
    in_band = (pfreq >= minfreq) & (pfreq < maxfreq)
    band_freqs = pfreq[in_band]

    print("Retrieving freqs from", offset, "to", songlen+offset, "...")
    print("amplitudes are from", minfreq, "to", maxfreq)

    fft_list = []
    k = 0
    # The crop already starts at `offset`, so positions inside it are counted
    # from 0. k*segsize (rather than repeated addition) avoids accumulating
    # floating-point error.
    while k*segsize < songlen - segsize:
        left_id = int(k*segsize*sample_rate)
        segment = song_data[left_id:left_id + blit]
        if len(segment) == 0:
            # The crop turned out shorter than requested.
            break
        # n=blit keeps every spectrum aligned with pfreq, zero-padding a
        # short final segment.
        fft_list.append(scp.fft.rfft(segment, n=blit))
        k += 1

    first_len = len(fft_list[0]) if fft_list else 0
    print("\n\nSegSize :", segsize, "\nFFT :", len(fft_list), "\nFFT[0] :", first_len, "\npfreq :", len(pfreq), "\n\n")

    # Strongest in-band bin of every segment.
    maxlist = []
    maxamps = []
    for spectrum in fft_list:
        band_mags = np.abs(spectrum)[in_band]
        if band_mags.size == 0:
            maxlist.append(0)
            maxamps.append(-1)
            continue
        # argmax returns the first maximum, i.e. the lowest such frequency.
        peak = int(np.argmax(band_mags))
        maxlist.append(float(band_freqs[peak]))
        maxamps.append(float(band_mags[peak]))

    # maxlist[i] corresponds to time (offset + i*segsize)
    return (maxlist, maxamps)
def void_freq_clean(song_name, offset, songlen, segsize, minfreq, maxfreq, ampthr, output_name):
    """Write a filtered mono copy of part of a .wav file.

    The part of ``song_name`` that starts at ``offset`` seconds and lasts
    ``songlen`` seconds is cut into segments of ``segsize`` seconds. In each
    segment's spectrum, bins whose frequency is ``<= minfreq`` or
    ``>= maxfreq`` and bins whose magnitude is ``<= ampthr`` times the
    segment's largest magnitude are set to zero. The segments are then
    transformed back, concatenated, scaled so that the largest absolute
    sample becomes 32767, and written as 16-bit samples.

    Args:
        song_name: Path of the .wav file to read.
        offset: Start of the processed part, in seconds.
        songlen: Length of the processed part, in seconds.
        segsize: Length of one segment, in seconds.
        minfreq: Lower frequency bound in Hz (bins at or below are removed).
        maxfreq: Upper frequency bound in Hz (bins at or above are removed).
        ampthr: Relative magnitude threshold, in [0, 1].
        output_name: Path of the .wav file to write.

    Raises:
        ValueError: If ``segsize`` covers less than one sample.
    """
    sample_rate, raw_song_data = wavfile.read(song_name)
    blit = int(sample_rate*segsize)  # samples per segment
    if blit < 1:
        raise ValueError("segsize is shorter than one sample")

    if is_data_stereo(raw_song_data):
        print("Converting to mono...")
        # Average both channels in one vectorised step.
        song_data = raw_song_data[:, 0]/2 + raw_song_data[:, 1]/2
    else:
        song_data = raw_song_data

    # Frequencies associated with the bins of every segment's FFT.
    pfreq = scp.fft.rfftfreq(blit, 1/sample_rate)
    out_of_band = (pfreq <= minfreq) | (pfreq >= maxfreq)

    print("Retrieving freqs from", offset, "to", songlen+offset, "...")
    print("amplitudes are from", minfreq, "to", maxfreq)

    fft_list = []
    k = 0
    # songlen is a length, so the window ends at offset + songlen; counting
    # with k*segsize avoids accumulating floating-point error.
    while k*segsize < songlen - segsize:
        left_id = int((offset + k*segsize)*sample_rate)
        segment = song_data[left_id:left_id + blit]
        if len(segment) == 0:
            # The file is shorter than the requested window.
            break
        # n=blit keeps every spectrum aligned with pfreq, zero-padding a
        # short final segment.
        fft_list.append(scp.fft.rfft(segment, n=blit))
        k += 1

    first_len = len(fft_list[0]) if fft_list else 0
    print("\n\nSegSize :", segsize, "\nFFT :", len(fft_list), "\nFFT[0] :", first_len, "\npfreq :", len(pfreq), "\n\n")

    # Zero the unwanted bins. The threshold refers to the segment's maximum
    # magnitude taken before any bin is removed.
    for spectrum in fft_list:
        magnitudes = np.abs(spectrum)
        lmax = magnitudes.max()
        spectrum[out_of_band | (magnitudes <= lmax*ampthr)] = 0

    print("Converting...")
    if fft_list:
        res = np.concatenate([scp.fft.irfft(spectrum, n=blit) for spectrum in fft_list])
    else:
        res = np.zeros(0)

    # Normalise on the largest absolute sample so that negative peaks cannot
    # overflow int16; an all-zero (or empty) signal is written unscaled.
    peak = np.max(np.abs(res)) if res.size else 0
    if peak > 0:
        res = 32767*res/peak
    wavfile.write(output_name, sample_rate, res.astype(np.int16))
def retrieve_dominant_amps(song_name, offset, songlen, segsize, percent):
    """Return the peak of the highest samples for each segment of a song.

    The part of ``song_name`` that starts at ``offset`` seconds and lasts
    ``songlen`` seconds is extracted with ffmpeg into a temporary
    ``crop.wav`` and mixed down to mono when it has two channels. Only the
    ``percent`` % highest sample values are kept, every other sample is set
    to zero, and the maximum of each ``segsize``-second segment is reported.

    Args:
        song_name: Path of the audio file handed to ffmpeg.
        offset: Start of the analysed part, in seconds.
        songlen: Length of the analysed part, in seconds.
        segsize: Length of one segment, in seconds.
        percent: Share of the samples to keep, in percent.

    Returns:
        A list of floats with one entry per segment; entry ``i`` describes
        the segment that starts at ``offset + i*segsize``. An entry is never
        negative and is 0.0 when the segment holds no kept positive sample.
    """
    crop_name = "crop.wav"
    # Cut out the part to analyse. "-t" is a duration (not an end position),
    # so it receives songlen; "-y" overwrites a stale crop instead of
    # blocking on an interactive prompt.
    subprocess.run(["ffmpeg", "-y", "-ss", str(offset), "-t", str(songlen), "-i", song_name, crop_name], shell=False)
    subprocess.run(["clear"])
    try:
        sample_rate, raw_song_data = wavfile.read(crop_name)
    finally:
        # The crop is only a scratch file; remove it even if reading fails.
        Path(crop_name).unlink(missing_ok=True)

    # in case song has stereo format, conversion to mono
    if is_data_stereo(raw_song_data):
        print("Converting to mono...")
        song_data = raw_song_data[:, 0]/2 + raw_song_data[:, 1]/2
    else:
        song_data = np.asarray(raw_song_data)

    # Select the highest values; everything else is voided.
    x = int((len(song_data)*percent)//100)
    print("Retreiving the", int(x), "/", len(song_data), "highest values")
    elements = heapq.nlargest(x, enumerate(song_data), key=lambda pair: pair[1])
    kept_ids = np.fromiter((idx for idx, _ in elements), dtype=np.intp, count=len(elements))
    # Work in floats so that callers can rescale without integer overflow.
    kept = np.zeros(len(song_data), dtype=float)
    kept[kept_ids] = song_data[kept_ids]

    # Merge every segment into a single value.
    res = []
    k = 0
    # The crop already starts at `offset`, so positions inside it are counted
    # from 0. k*segsize avoids accumulating floating-point error.
    while k*segsize < songlen - segsize:
        start = k*segsize
        left_id = int(start*sample_rate)
        right_id = int((start + segsize)*sample_rate)
        segment = kept[left_id:right_id]
        # Slicing clips to the data length; an empty segment yields 0.
        res.append(max(0.0, float(segment.max())) if segment.size else 0.0)
        k += 1

    # res[i] corresponds to time (offset + i*segsize)
    return res
def convert_to_wav(song_name: str, output_file="audio.wav") -> str:
    """Convert a song to .wav with ffmpeg unless no conversion is needed.

    The decision relies on the file extension only: names ending in ``.mp3``
    or ``.ogg`` (in any letter case) are converted, every other name is
    returned untouched.

    Args:
        song_name: Path of the input audio file.
        output_file: Path of the .wav file written when converting; an
            existing file is overwritten.

    Returns:
        The file name that should be used afterwards: ``output_file`` when a
        conversion was run, ``song_name`` otherwise.
    """
    # Lower-case the suffix so that e.g. "SONG.MP3" is converted as well.
    extension = Path(song_name).suffix.lower()
    if extension in (".mp3", ".ogg"):
        print("Converting to .wav...")
        subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file], shell=False)
        return output_file
    return song_name
def retrieve_all_from_song(filename, t0, t1, dta=0.001, dtf=0.01, threshold=0.1, show=True):
    """Extract dominant frequencies and amplitudes of a song part and plot them.

    Args:
        filename: Path of the song; it is converted to .wav first if needed.
        t0: Start of the analysed part, in seconds.
        t1: End of the analysed part, in seconds; must be greater than t0.
        dta: Segment length used for the amplitudes, in seconds.
        dtf: Segment length used for the frequencies, in seconds.
        threshold: Share of the highest samples kept for the amplitudes,
            in percent.
        show: When true, plot amplitudes and frequencies against time.

    Raises:
        SystemExit: With exit code 1 when ``t1 <= t0``.
    """
    if t1 <= t0:
        print("ERROR : t1 <= t0\n")
        # Same effect as exit(1) without depending on the `site` helper.
        raise SystemExit(1)

    # converts format to .wav
    new_fn = convert_to_wav(filename)

    # No intermediate crop is made here: both retrieval helpers crop the
    # song themselves.
    print("Filtering song...")
    # NOTE: the filtering step is currently disabled; only the message remains.

    print("Now retrieving the frequencies")
    (maxlist, maxamps) = retrieve_dominant_freqs(new_fn, t0, t1-t0, dtf)
    print("Now retrieving the amplitudes")
    amps = retrieve_dominant_amps(new_fn, t0, t1-t0, dta, threshold)
    print("Len of freqs : ", len(maxlist), "|", len(maxamps))
    print("Len of amps : ", len(maxlist), "|", len(amps))

    # Rescale the amplitudes so that the largest one becomes 2000. Skipped
    # when there is nothing to scale (no segment, or silence only), which
    # would otherwise fail on amps[0] or divide by zero.
    maxa = max(amps, default=0)
    if maxa > 0:
        # float() avoids overflow of fixed-width integer samples.
        amps = [float(amp) * 2000 / float(maxa) for amp in amps]

    if show:
        times_f = [t0 + dtf*k for k in range(len(maxlist))]
        times_a = [t0 + dta*k for k in range(len(amps))]
        plt.plot(times_a, amps)
        plt.plot(times_f, maxlist)
        plt.show()
# Module-level script: these statements run whenever the file is executed
# or imported.
# Analyse seconds 0 to 5 of "tetris_4.wav"; `show` keeps its default (True),
# so the result is plotted.
# NOTE(review): 0.375 equals 60/160, so dtf looks like half a beat at
# 160 BPM - confirm.
retrieve_all_from_song("tetris_4.wav", 0, 5, dtf=0.375/2)
print("yipee")
# Prints 1/(160/60); presumably the length of one beat in seconds at
# 160 BPM - TODO confirm.
print(1/(160/60));