general refactoring of folder + rework of most functions

This commit is contained in:
Alexandre 2024-12-10 20:19:28 +01:00
parent a59c0c4e08
commit 421cddf267
22 changed files with 202 additions and 356 deletions

BIN
Zblit.wav Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
audio.wav

Binary file not shown.

View File

@ -324,6 +324,166 @@ def localize_frequencies(song_name, offset, songlen, segsize, output_name):
res = np.array(res)
wavfile.write(output_name, sample_rate, res)
NOTE_DIST = (2**(1/12))
def is_note_within(fr1, fr2):
if(fr1 > fr2):
return (fr1/fr2 <= NOTE_DIST)
else:
return (fr2/fr1 <= NOTE_DIST)
def keep_highest(song_name, offset, songlen, segsize, count, output_name, minfreq=110, maxfreq=5000):
# extracting data from cropped song
sample_rate, raw_song_data = wavfile.read(song_name)
blit = int(sample_rate*segsize) # Te
song_data = [0 for i in range(len(raw_song_data))]
id_start = int(offset*sample_rate)
id_end = min(len(raw_song_data), int((offset+songlen)*sample_rate))
a = 0
if(is_data_stereo(raw_song_data)):
print("Converting to mono...")
for x in range(id_start, id_end):
song_data[x] = raw_song_data[x][0]/2 + raw_song_data[x][1]/2
if(x % (int(len(raw_song_data)/100)) == 0):
print(a, "/ 100")
a += 1
else:
song_data = raw_song_data
print("\nSampleRate : ", sample_rate)
print("SegSize : ", blit)
# calculate the frequencies associated to the FFTs
pfreq = scp.fft.rfftfreq(blit, 1/sample_rate)
# left boundary of segment to crop
current_time = offset
# list of FFTs
fft_list = []
# number of samples
k = 0
print("Retrieving freqs from", offset, "to", songlen+offset, "...")
while(current_time < songlen+offset-segsize):
# index corresponding to left boundary
left_id = int(current_time*sample_rate)
# index corresponding to right boundary
right_id = int((current_time+segsize)*sample_rate)
# calculate the fft, append it to fft_list
pff = scp.fft.rfft(song_data[int(current_time*sample_rate):int(sample_rate*(current_time+segsize))])
fft_list.append(pff)
# just to avoid what causes 0.1 + 0.1 == 0.2 to be False
k += 1
current_time = offset + k*segsize
#print(current_time)
print("\n\nSegSize :", segsize, "\nFFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq), "\n\n")
# -------------------------------------------- Clean song -------------------------------------------- #
pfreq_minid = 0
pfreq_maxid = len(pfreq) -1
while(pfreq[pfreq_minid] < minfreq):
for t in range(len(fft_list)):
fft_list[t][pfreq_minid] = 0+0j
pfreq_minid += 1
while(pfreq[pfreq_maxid] > maxfreq):
for t in range(len(fft_list)):
fft_list[t][pfreq_maxid] = 0+0j
pfreq_maxid -= 1
new_times = []
new_freqs = []
new_ampls = []
new_kept = []
# i = time, j = freq
for i in range(len(fft_list)):
#returns a list of couples [id, value]
elements = heapq.nlargest(count, enumerate(fft_list[i]), key=lambda x: x[1])
for idx in range(len(elements)):
if(elements[idx][0] < len(pfreq)):
new_times.append(offset + i*segsize)
new_freqs.append(pfreq[elements[idx][0]])
new_ampls.append(fft_list[i][elements[idx][0]])
'''for i in range(len(new_freqs)):
while(new_freqs[i]>1000):
new_freqs[i] = new_freqs[i]/2'''
# -------------------------------------------- Localize -------------------------------------------- #
timing_points = []
for i in range(len(new_times)):
if(i == 0 or not is_note_within(new_freqs[i], new_freqs[i-1])):
timing_points.append(new_times[i])
new_kept.append(new_freqs[i])
else:
new_kept.append(0)
plt.plot(new_times, new_freqs)
plt.plot(new_times, new_kept, "ro")
plt.grid()
plt.show()
# -------------------------------------------- Write -------------------------------------------- #
i0 = 0
timing_points.append(999999)
write_freq = 880
write_cur = 0
write_id = -1
while(write_cur <= write_freq): # shouldnt seg fault
write_id += 1
write_cur = pfreq[write_id]
# remove
# i = time, j = freq
for i in range(len(fft_list)):
# retrieve dominant freq
if(segsize*i >= timing_points[i0]-offset):
i0 += 1
maxfreq = 0
maxfreqid = 0
maxamp = 0
for j in range(len(fft_list[0])):
if(np.abs(fft_list[i][j]) > maxamp):
maxamp = np.abs(fft_list[i][j])
maxfreq = pfreq[j]
maxfreqid = j
fft_list[i][write_id] = max(maxamp*2, 32767)
fft_list[i][write_id-1] = max(maxamp*2, 32767)
fft_list[i][write_id+1] = max(maxamp*2, 32767)
res = []
print("Converting...")
for i in range(len(fft_list)):
ift = scp.fft.irfft(fft_list[i], n=blit)
for k in ift:
res.append(k)
#print(type(res[0]))
mx = 0
for j in range(len(res)):
if(res[j] > mx):
mx = res[j]
for i in range(len(res)):
res[i] = np.int16(32767*res[i]/mx)
res = np.array(res)
wavfile.write(output_name, sample_rate, res)
def write_result(song_name, offset, songlen, segsize, timing_pts, output_name):
# removes unnecessary frequencies/amps from a song
# ampthr is in [0, 1]
@ -407,7 +567,9 @@ def write_result(song_name, offset, songlen, segsize, timing_pts, output_name):
maxfreq = pfreq[j]
maxfreqid = j
fft_list[i][write_id] = max(maxamp*2, 10000)
fft_list[i][write_id] = max(maxamp*2, 32767)
fft_list[i][write_id-1] = max(maxamp*2, 32767)
fft_list[i][write_id+1] = max(maxamp*2, 32767)
# writing new .wav
@ -645,27 +807,53 @@ def retrieve_all_from_song(filename, t0, t1, bpm, dta=0.001, dtf=0.01, threshold
plt.show()
# free()
'''
void_freq_clean(convert_to_wav("ctype.mp3"), 0.042, 5, 1/(149.3/60)/8, 100, 3000, 0.05, "ctype_void.mp3")
localize_frequencies(convert_to_wav("ctype_void.mp3"), 0, 5, 1/(149.3/60)/12, "ctype_filtered.mp3")
retrieve_all_from_song("ctype_filtered.mp3", 0, 5, 149.3, dta=1/(149.3/60)/128, dtf=1/(149.3/60)/8)
'''
#OFFSET = 0.042
#BPM = 149.3
# c-type
SONG_LEN = 5
OFFSET = 117.790
BPM = 150
OFFSET = 0.042
BPM = 149.3
SEGSIZE = 1/(BPM/60)
'''
'''
# tetris_2
SONG_LEN = 8
OFFSET = 0
BPM = 157
SEGSIZE = 1/(BPM/60)
'''
'''
# test
SONG_LEN = 1
OFFSET = 0
BPM = 240
SEGSIZE = 1/(BPM/60)
'''
'''
# gmtn
SONG_LEN = 3
OFFSET = 1.652
BPM = 155
SEGSIZE = 1/(BPM/60)
'''
# E
SONG_LEN = 10
OFFSET = 2.641
BPM = 155
SEGSIZE = 1/(BPM/60)
wavved_song = convert_to_wav("Galaxy Collapse.mp3")
#wavved_song = convert_to_wav("tetris_2.wav")
wavved_song = convert_to_wav("songs/rushe.mp3")
keep_highest(wavved_song, OFFSET, SONG_LEN, SEGSIZE/4, 1, "Zblit.wav", minfreq=300, maxfreq=3000)
'''
# remove high/low frequencies (often noise)
#void_freq_clean(wavved_song, OFFSET, SONG_LEN, SEGSIZE/8, 100, 3000, 0.05, "Zvoided_song.wav")
# crops any part with let ring
localize_frequencies(convert_to_wav("Zvoided_song.wav"), 0, SONG_LEN-0.1, SEGSIZE/8, "Zcleaned_song.wav")
localize_frequencies("Zblit.wav", 0, SONG_LEN-0.1, SEGSIZE/8, "Zcleaned_song.wav")
#localize_frequencies(wavved_song, OFFSET, SONG_LEN, SEGSIZE/8, "Zcleaned_song.wav")
# find timings
tp = parse_after_filter("Zcleaned_song.wav", 0, SONG_LEN-0.1, SEGSIZE/8, OFFSET)
@ -674,6 +862,7 @@ tp = parse_after_filter("Zcleaned_song.wav", 0, SONG_LEN-0.1, SEGSIZE/8, OFFSET)
write_result(wavved_song, OFFSET, SONG_LEN-0.1, SEGSIZE/8, tp, "Zoutput_song.wav")
#retrieve_all_from_song("Zcleaned_song.wav", 0, 5, 149.3, dtf=1/(149.3/60)/8)
'''
print("yipee")

BIN
songs/audio.wav Normal file

Binary file not shown.

BIN
tetris_4.wav → songs/furioso melodia.mp3 Executable file → Normal file

Binary file not shown.

BIN
songs/rushe.mp3 Normal file

Binary file not shown.

View File

@ -1,343 +0,0 @@
from math import *
import numpy as np
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import subprocess
import wave as wv
import struct
import librosa
import heapq
import scipy
import os
import random
from pathlib import Path
from time import sleep
from datetime import timedelta
import debug
print("Starting...\n")
def filter_n_percent_serial(song_name, offset, n_iter, step, threshold):
"""
song_name : string
offset : int
n_iter : int (number of turns)
step : int (length of each small segment)
threshold : int (is in ]0, 100])
filter data associated with song_name to keep only the highest threshold% values
"""
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(offset+step*n_iter), "-i", song_name, "crop.wav"], shell=False)
sample_rate, global_data = wavfile.read('crop.wav')
subprocess.run(["clear"], shell=False)
subprocess.run(["rm", "crop.wav"], shell=False)
for i in range(n_iter):
print(i, "/", n_iter)
#print(i * step)
song_data = global_data[int(i*step*sample_rate):int((i+1)*step*sample_rate)]
if(len(song_data) != 0):
mx = max(song_data)
is_locked = [False for i in range(len(song_data))]
x = int((len(song_data)*threshold)//100)
#print("X = ", x)
#print("Retreiving the", int(x), "/", len(song_data), "highest values")
elements = heapq.nlargest(int(x), enumerate(song_data), key=lambda x: x[1])
#print("Done")
for idx in range(len(elements)):
is_locked[elements[idx][0]] = True
for r in range(len(song_data)):
if(is_locked[r] == False):
global_data[r+int(i*step*sample_rate)] = 0
return global_data
def write_to_file_thr(sample_rate, song_data, offset, threshold, filename):
# write data to output file
file = open(filename, 'w')
file.writelines('time,amplitude\n')
mx = max(song_data)
print("writing to output...")
for i in range(len(song_data)):
if(i%(len(song_data)//50) == 0):
print(i, "/", len(song_data))
if(song_data[i]/mx > threshold):
file.writelines(str(np.round(offset + i/sample_rate, 3)))
file.writelines(',')
file.writelines(str(np.round(song_data[i], 0)))
file.writelines('\n')
def round_t(id, sample_rate, bpm, div, offset, k0):
k = k0
t = offset + k/(bpm*div)
while(t < id/sample_rate):
t = offset + k/(bpm*div)
k += 1
if(np.abs(t - id/sample_rate) < np.abs((t - 1/(bpm*div)) - id/sample_rate)):
return t
return (t - 1/(bpm*div), 0)
def compress(Zxx):
res = []
def get_freq(song_name, times, width=1000, display=False):
"""
for a given list of times (in seconds), returns the corresponding peak frequencies
"""
subprocess.run(["ffmpeg", "-ss", str(0), "-t", str(max(np.array(times))), "-i", song_name, "crop.wav"], shell=False)
sample_rate, global_data = wavfile.read(song_name)
#blit = int(sample_rate*step)
subprocess.run(["clear"], shell=False)
subprocess.run(["rm", "crop.wav"], shell=False)
pfreq = scipy.fft.rfftfreq(2*width, 1/sample_rate)
frequencies = [0 for s in range(len(times))]
print(len(pfreq))
for s in range(len(times)):
left = max(0, int(times[s]*44100)-width)
right = min(len(global_data), int(times[s]*44100)+width)
pff = scipy.fft.rfft(global_data[left:right])
#print(len(pff), len(pfreq))
mx = max(np.abs(pff))
for id in range(len(pff)):
if frequencies[s] == 0 and np.abs(pff[id]) == mx:
frequencies[s] = pfreq[id]
if(display):
plt.plot(times, frequencies)
plt.grid()
plt.xlabel("Time (s)")
plt.ylabel("Dominant frequency (Hz)")
plt.title("Dominant frequencies at peaks")
plt.show()
return frequencies
def is_data_stereo(raw_global_data:list) -> bool:
"""
raw_global_data : list
"""
try:
assert(raw_global_data[0][0])
except IndexError:
return False
except AssertionError:
return True
return True
def void_freq(song_name, offset, songlen, increment, minfreq, maxfreq, upperthr, ampthr, ampfreq, ampval, leniency, write, linear, output_file="trimmed.wav"):
"""
song_name : string
offset : int
songlen : int (length of the part that will be filtered, starting from offset)
increment : float (technical parameter)
minfreq and maxfreq : every frequency in [minfreq, maxfreq] will be voided
upperthr : every frequency above upperthr will be voided
ampthr : every frequency with amplitude under MAX/ampthr (aka amplitudes under (100/ampthr)% of the max will be voided
ampfreq, leniency (if linear is false), linear : technical parameters
ampval : int
- if linear is false, then this willbe the maximum amplification possible
- if linear is true, this is the multiplier (Amp <- Amp * (ampval * frequency + leniency))
write : bool (should be set to True)
output_file : technical
"""
fft_list = []
times = []
current_time = offset
k = 0
subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(songlen+offset), "-i", song_name, "crop.wav"], shell=False)
sample_rate, raw_global_data = wavfile.read("crop.wav")
blit = int(sample_rate*increment)
global_data = [0 for i in range(len(raw_global_data))]
#subprocess.run(["clear"])
subprocess.run(["rm", "crop.wav"], shell=False)
a = 0
if(is_data_stereo(raw_global_data)):
print("Converting to mono...")
for x in range(len(raw_global_data)):
global_data[x] = raw_global_data[x][0]/2 + raw_global_data[x][1]/2
if(x % (int(len(raw_global_data)/100)) == 0):
print(a, "/ 100")
a += 1
else:
global_data = raw_global_data
#print("Blit :", blit)
pfreq = scipy.fft.rfftfreq(blit, 1/sample_rate)
#print(len(pfreq))
while(current_time <= songlen):
pff = scipy.fft.rfft(global_data[k*blit:(k+1)*blit])
fft_list.append(pff)
times.append(k*increment)
k += 1
current_time = offset + k*increment
print("FFT :", len(fft_list), "\nFFT[0] :", len(fft_list[0]), "\npfreq :", len(pfreq))
print("Finding global max...")
if(linear == False):
for i in range(len(fft_list)):
for j in range(len(fft_list[i])):
fft_list[i][j] *= (1 + ampval/max(1, np.abs(pfreq[j] - ampfreq)))
else:
for i in range(len(fft_list)):
for j in range(len(fft_list[i])):
fft_list[i][j] *= (ampval*pfreq[j] + leniency)
print("Trimming...")
for i in range(len(fft_list)):
lmax = 0
for j in range(len(fft_list[i])):
if(np.abs(fft_list[i][j]) > lmax):
lmax = np.abs(fft_list[i][j])
for j in range(len(fft_list[i])):
if((pfreq[j] >= minfreq and pfreq[j] < maxfreq) or pfreq[j] > upperthr):
fft_list[i][j] = 0+0j
if(np.abs(fft_list[i][j]) < lmax/ampthr):
fft_list[i][j] = 0+0j
if(write):
res = []
print("Converting...")
for i in range(len(fft_list)):
ift = scipy.fft.irfft(fft_list[i], n=blit)
for k in ift:
res.append(k)
#print(type(res[0]))
mx = 0
for j in range(len(res)):
if(res[j] > mx):
mx = res[j]
for i in range(len(res)):
res[i] = np.int16(32767*res[i]/mx)
res = np.array(res)
wavfile.write(output_file, 44100, res)
#plt.plot(np.abs(pfreq[:len(fft_list[0])]), np.abs(fft_list[0]))
#plt.grid()
#plt.show()
print("Done")
def convert_tuple(data, times):
"""
Takes data and converts it to a list of tuples (amplitude, datetimes)
"""
return [(times[i], data[i]) for i in range(len(data))]
def get_songlen(filename):
"""
retrieves the length of the song in seconds
"""
sample_rate, global_data = wavfile.read(filename)
print("LEN :", len(global_data)/sample_rate)
return (len(global_data)/sample_rate)
def convert_to_wav(song_name:str, output_file="audio.wav") -> str:
"""
Converts the song to .wav, only if it's not already in wave format.
Currently relies on file extension.
Returns: the song_name that should be used afterwards.
"""
extension = Path(song_name).suffix
match extension:
case ".mp3" | ".ogg":
print("Converting to .wav...")
subprocess.run(["ffmpeg", "-y", "-i", song_name, output_file], shell=False)
return output_file
return song_name
def process_song(filename, bpm, offset0=0, div_len_factor=1, n_iter_2=-1, threshold=0.5, divisor=4):
"""
filename : string (name of the song)
offset : int [+] (song mapping will start from this time in seconds, default is 0)
bpm : int [+]
div_len_factor : float [+] (the length multiplier of each segment, default is 1)
n_iter : int [+*] (the number of iterations, default is -1 (maps the whole music))
threshold : int [0, 100] (used by the filter function to only keep the largest threshold% of timing points, default is 0.5)
divisor : int [+] (beat divisor used to snap the notes, default is 4)
"""
filename = convert_to_wav(filename)
offset = offset0/1000
div_len = div_len_factor*60/bpm-0.01
n_iter = n_iter_2
song_len = get_songlen(filename)
if(n_iter == -1):
n_iter = int((song_len-offset/1000)/div_len)-1
filtered_name = f"{filename}_trimmed.wav"
void_freq(filename, offset, min(song_len, offset+div_len*(n_iter+1)+0.01), 4*60/bpm, minfreq=0, maxfreq=220, upperthr=5000, ampthr=60, ampfreq = 1200, ampval = 5.0, leniency = 0.005, write=True, linear=False, output_file=filtered_name)
datares = filter_n_percent_serial(filtered_name, offset, n_iter, div_len, threshold)
#snapped_data = amplitude
#times in ms
(snapped_data, times) = debug.snap3(datares, mintime=50, initial_plot=True, after_plot=True)
#frequencies=get_freq(filtered_name, offset, div_len, div_len*n_iter, snapped_data, True)
frequencies = get_freq(filtered_name, times, display=True)
Path(f"{filename}_trimmed.wav").unlink()
return snapped_data, times, frequencies
'''
datares = debug.snap2(datares, 44100, bpm, first_offset=offset, div=divisor, show=True, adjust=True)
frequencies = get_freq(filtered_name, offset, div_len, div_len*n_iter, datares, True)
Path(f"{filename}_trimmed.wav").unlink()
return convert_tuple(datares, frequencies)
'''
def main():
aa, bb, cc = process_song("tetris_4.wav", 160, n_iter_2=48)
#print(data)
print("Program finished with return 0")
if __name__ == "__main__":
main()