osutipe/new-process.py

368 lines
10 KiB
Python

from math import *
import numpy as np
from scipy.io import wavfile
from scipy import signal
import matplotlib.pyplot as plt
import subprocess
import wave as wv
import struct
import librosa
import heapq
# Startup banner: printed once, as soon as the script starts executing.
print("Starting...\n")
def fct(song_name, offset, increment, songlen, maxfreq, display):
    """Build a spectrogram of a song by analysing it one short slice at a time.

    Starting at ``offset``, consecutive slices of ``increment`` seconds are
    cut out of ``song_name`` with ffmpeg (through a temporary ``crop.wav`` in
    the working directory) for as long as the slice start does not exceed
    ``songlen``.  Every slice goes through ``scipy.signal.spectrogram`` with
    ``nfft`` set to the slice's element count, and only the first
    ``len(f) // (20000 // maxfreq)`` frequency rows are kept.

    Args:
        song_name: Path of the audio file handed to ffmpeg.
        offset: Start time of the first slice, in seconds.
        increment: Duration of each slice, in seconds.
        songlen: Last admissible slice start time, in seconds.
        maxfreq: Divisor of 20000 that decides how many rows are kept.
        display: When true, show the assembled magnitudes with matplotlib.

    Returns:
        ``(times, freqs, magnitudes)``.  ``freqs`` comes from the first slice
        only; ``times`` and the columns of ``magnitudes`` are the per-slice
        results joined in order.  All three are empty arrays when no slice is
        processed.
    """
    keep_divisor = 20000 // maxfreq
    freqs = np.array([])
    times = np.array([])
    spectrum = np.array([])

    slice_index = 0
    slice_start = offset
    while slice_start <= songlen:
        crop_command = [
            "ffmpeg", "-ss", str(slice_start), "-t", str(increment),
            "-i", song_name, "crop.wav",
        ]
        subprocess.run(crop_command)
        sample_rate, audio_data = wavfile.read('crop.wav')
        subprocess.run(["rm", "crop.wav"])

        slice_freqs, slice_times, slice_spectrum = signal.spectrogram(
            audio_data, fs=sample_rate, nfft=audio_data.size
        )
        # Keep only the lowest rows of the spectrum.
        kept_rows = len(slice_freqs) // keep_divisor
        slice_freqs = slice_freqs[:kept_rows]
        slice_spectrum = slice_spectrum[:kept_rows]
        # NOTE: a mel-style weighting of each row, 1127*log(1 + f/700), was
        # sketched at this point in an earlier revision and is not applied.

        # Shift the slice-local time axis to absolute song time.
        slice_times = slice_times + slice_start

        if slice_index == 0:
            freqs, times, spectrum = slice_freqs, slice_times, slice_spectrum
        else:
            spectrum = np.concatenate((spectrum, slice_spectrum), axis=1)
            times = np.concatenate((times, slice_times))

        slice_index += 1
        # Recomputed from the index so rounding errors do not accumulate.
        slice_start = offset + slice_index * increment
        done = np.round(100 * (slice_start - offset) / (songlen - offset), 4)
        print("Completion rate : ", done, "%")

    magnitudes = np.abs(spectrum)
    if display:
        plt.pcolormesh(times, freqs, magnitudes, shading='gouraud')
        plt.title('STFT Magnitude')
        plt.ylabel('Frequency [Hz]')
        plt.xlabel('Time [sec]')
        plt.show()
    return times, freqs, magnitudes
def write_to_file(t, flist, maxlist, filename):
    """Write per-frame peak data to ``filename`` as CSV text.

    The file starts with the header ``time,frequency,maxvalue`` and then holds
    one row per entry of ``t``: the time rounded to 3 decimals, the frequency
    rounded to 1 decimal and the peak value rounded to 0 decimals.

    Args:
        t: Sequence of time stamps; its length decides the number of rows.
        flist: Sequence of frequencies, indexed in step with ``t``.
        maxlist: Sequence of peak values, indexed in step with ``t``.
        filename: Path of the file to create or overwrite.

    Raises:
        IndexError: If ``flist`` or ``maxlist`` is shorter than ``t``.
    """
    # The context manager guarantees the handle is flushed and closed, even
    # when a row cannot be formatted.
    with open(filename, 'w') as out:
        out.write('time,frequency,maxvalue\n')
        for i, instant in enumerate(t):
            fields = (
                np.round(instant, 3),
                np.round(flist[i], 1),
                np.round(maxlist[i], 0),
            )
            out.write(','.join(str(field) for field in fields) + '\n')
def plot_max(time, freq, Zxx, save):
    """Find the strongest frequency of every time frame and plot the result.

    For each column of ``Zxx`` the largest value above zero is located; that
    value and the matching entry of ``freq`` are recorded.  Frames without any
    value above zero keep 0 for both.  Two stacked plots are then shown:
    dominant frequency over time, and peak value over time.

    Args:
        time: Time stamps, one per column of ``Zxx``.
        freq: Frequencies, one per row of ``Zxx``.
        Zxx: Two-level indexable of magnitudes, addressed as ``Zxx[row][col]``.
        save: When true, the result is also written to ``output.csv``.
    """
    frame_count = len(time)
    fres = [0] * frame_count
    maxres = [0] * frame_count

    for col in range(frame_count):
        print(col, "/", frame_count)
        for row, magnitudes in enumerate(Zxx):
            candidate = magnitudes[col]
            # Strict comparison: the first row reaching the maximum wins.
            if maxres[col] < candidate:
                maxres[col] = candidate
                fres[col] = freq[row]

    if save:
        write_to_file(time, fres, maxres, 'output.csv')

    figure, (freq_axes, value_axes) = plt.subplots(2)
    figure.suptitle('Top : time and frequencies\nBottom : time and max values')
    freq_axes.plot(time, fres)
    value_axes.plot(time, maxres)
    plt.show()
def extract_peaks(song_data, sample_rate, offset, display, threshold):
    """Zero every sample that is too small compared with the global maximum.

    A sample is cleared when ``sample / max(song_data) < threshold``.  The
    input sequence is modified in place.

    Args:
        song_data: Mutable sequence of samples; edited in place.
        sample_rate: Samples per second, used to build the time axis.
        offset: Time of the first sample, in seconds.
        display: When true, plot the surviving samples with matplotlib.
        threshold: Minimum ratio to the maximum a sample needs to survive.

    Returns:
        ``(t, song_data)`` where ``t`` is the list of sample times and
        ``song_data`` is the same object that was passed in.  Empty input
        yields ``([], song_data)`` without plotting.
    """
    sample_count = len(song_data)
    if sample_count == 0:
        # max() has nothing to work with; there are no peaks to extract.
        return ([], song_data)

    mx = max(song_data)
    for i in range(sample_count):
        #subprocess.run(["clear"])
        print(i, "/", sample_count)
        sample = song_data[i]
        if mx == 0:
            # Every sample is <= 0 here and the ratio is undefined.  Negative
            # samples are cleared (their ratio would tend to -infinity) and
            # zeros are already zero, which avoids dividing by zero.
            too_small = sample < 0
        else:
            too_small = sample / mx < threshold
        if too_small:
            song_data[i] = 0

    t = [offset + i / sample_rate for i in range(sample_count)]
    if display:
        plt.plot(t, song_data, 'b+')
        plt.grid()
        plt.xlabel("t")
        plt.ylabel("amp")
        plt.show()
    return (t, song_data)
def get_local_max(song_data, center, width):
    """Return the largest value found within ``width`` samples of ``center``.

    Positions that fall outside ``song_data`` are clamped to its first or
    last index.  The search starts from 0, so the result is never negative;
    0 is returned when no visited sample is above zero.

    Args:
        song_data: Indexable sequence of samples.
        center: Index in the middle of the window.
        width: Number of samples inspected on each side of ``center``.
    """
    last_index = len(song_data) - 1
    best = 0
    for position in range(center - width, center + width + 1):
        clamped = max(0, min(last_index, position))
        candidate = song_data[clamped]
        if best < candidate:
            best = candidate
    return best
def extract_peaks_v2(song_data, sample_rate, offset, display, threshold, seglen):
    """Zero every sample that is too small compared with its segment maximum.

    The data is processed in segments of ``seglen`` samples.  At the start of
    each segment the reference maximum is refreshed with ``get_local_max``
    over a window centred on the middle of the segment; a sample is cleared
    when ``sample / reference < threshold``.  The input is modified in place.

    Args:
        song_data: Mutable sequence of samples; edited in place.
        sample_rate: Samples per second, used to build the time axis.
        offset: Time of the first sample, in seconds.
        display: When true, plot the surviving samples with matplotlib.
        threshold: Minimum ratio to the reference a sample needs to survive.
        seglen: Number of samples per segment.

    Returns:
        ``(t, song_data)`` where ``t`` is the list of sample times and
        ``song_data`` is the same object that was passed in.
    """
    sample_count = len(song_data)
    half_segment = seglen // 2
    mx = 0
    for i in range(sample_count):
        if i % seglen == 0:
            print("----")
            mx = get_local_max(song_data, i + half_segment, half_segment)
        #subprocess.run(["clear"])
        print(i, "/", sample_count)
        sample = song_data[i]
        if mx == 0:
            # get_local_max found nothing above zero (e.g. a silent
            # segment), so the ratio is undefined.  Negative samples are
            # cleared (their ratio would tend to -infinity) and zeros are
            # left alone, which avoids dividing by zero.
            too_small = sample < 0
        else:
            too_small = sample / mx < threshold
        if too_small:
            song_data[i] = 0

    t = [offset + i / sample_rate for i in range(sample_count)]
    if display:
        plt.plot(t, song_data, 'b+')
        plt.grid()
        plt.xlabel("t")
        plt.ylabel("amp")
        plt.show()
    return (t, song_data)
def peaks(song_name, offset, length, display, thr):
    """Crop a song with ffmpeg and run segment-wise peak extraction on it.

    ``length`` seconds starting at ``offset`` are written to a temporary
    ``crop.wav`` in the working directory, loaded, and passed to
    ``extract_peaks_v2`` with two-second segments.

    Args:
        song_name: Path of the audio file handed to ffmpeg.
        offset: Start of the excerpt, in seconds.
        length: Duration of the excerpt, in seconds.
        display: Forwarded to ``extract_peaks_v2``.
        thr: Threshold forwarded to ``extract_peaks_v2``.

    Returns:
        Whatever ``extract_peaks_v2`` returns: ``(t, song_data)``.
    """
    subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"])
    try:
        sample_rate, audio_data = wavfile.read('crop.wav')
    finally:
        # Always remove the temporary file: a leftover crop.wav would make
        # the next ffmpeg call stop and ask whether to overwrite it.
        subprocess.run(["rm", "-f", "crop.wav"])
    subprocess.run(["clear"])
    #return extract_peaks(audio_data, sample_rate, offset, display, thr)
    # Two seconds per segment, derived from the file's real sample rate
    # instead of assuming 44100 Hz.
    return extract_peaks_v2(audio_data, sample_rate, offset, display, thr, 2 * sample_rate)
def find_bpm(sample_rate, data, minbpm, maxbpm, step, width, display=True):
    """Score candidate tempos by summing the samples found on their beat grid.

    For every tempo in ``range(minbpm, maxbpm + step, step)`` a grid of beat
    positions is laid over ``data`` starting at sample 0, one beat every
    ``60 * sample_rate / tempo`` samples.  The score of a tempo is the sum of
    all samples within ``width`` samples of a grid position.  The tempo with
    the highest score wins; on ties the lowest tempo is kept.

    Beat positions are computed as ``n * 60 * sample_rate // tempo`` so that
    integer truncation does not accumulate, and windows are clipped at the
    start of the data instead of wrapping around to its end.

    Args:
        sample_rate: Samples per second of ``data``.
        data: Sequence of samples (list or numpy array).
        minbpm: Lowest tempo to test, in beats per minute; must be positive.
        maxbpm: Highest tempo to test, in beats per minute.
        step: Tempo increment between two candidates.
        width: Half-width of the window summed around each beat, in samples.
        display: When true (the default), plot score against tempo.

    Returns:
        ``(optimal, optimal_acc)``: the best tempo and its score.  When no
        candidate scores above 0 the result is ``(minbpm, 0)``.

    Raises:
        ValueError: If ``minbpm`` is not positive.
    """
    if minbpm <= 0:
        # A zero tempo divides by zero and a negative one never terminates.
        raise ValueError("minbpm must be positive")

    samples = np.asarray(data)
    if np.issubdtype(samples.dtype, np.integer):
        # Narrow integer samples (e.g. 16-bit audio) would overflow when
        # accumulated; widen them once up front.
        samples = samples.astype(np.int64)
    sample_count = len(samples)

    optimal = minbpm
    optimal_acc = 0
    bpmlst = []
    scores = []
    for beat in range(minbpm, maxbpm + step, step):
        print("testing", beat)
        accuracy = 0
        loopturn = 0
        current = 0
        while current + width < sample_count:
            window_start = max(0, current - width)
            accuracy += samples[window_start:current + width + 1].sum()
            loopturn += 1
            current = int((loopturn * 60 * sample_rate) // beat)
        if optimal_acc < accuracy:
            optimal_acc = accuracy
            optimal = beat
        bpmlst.append(beat)
        scores.append(accuracy)

    if display:
        plt.plot(bpmlst, scores)
        plt.xlabel("BPM")
        plt.ylabel("Score")
        plt.grid()
        plt.show()
    return (optimal, optimal_acc)
def find_bpm_2(sample_rate, data, threshold, maxbpm, display=True):
    """Estimate the tempo from the spacing between successive strong samples.

    A sample counts as a peak when ``sample / max(data) >= threshold``.  The
    first peak starts the measurement; each later peak lying more than
    ``60 * sample_rate / maxbpm`` samples after the previously accepted one
    is accepted in turn, and the distance between the two is converted to
    beats per minute.  Peaks closer than that are ignored.

    Args:
        sample_rate: Samples per second of ``data``.
        data: Sequence of samples.
        threshold: Minimum ratio to the maximum for a sample to be a peak.
        maxbpm: Fastest tempo considered; sets the minimum peak spacing.
        display: When true (the default), plot the tempo of every interval.

    Returns:
        ``(beat, error)``: mean and standard deviation of the per-interval
        tempos, rounded to 3 decimals.  Both are NaN when fewer than two
        peaks are accepted (including empty or all-zero-maximum input).
    """
    sample_count = len(data)
    mx = np.max(data) if sample_count else 0
    min_spacing = (60 * sample_rate) / maxbpm
    # Integer step so the progress line is printed for any data length.
    report_every = max(1, sample_count // 100)

    spacing = []
    last_peak = None
    if mx != 0:  # with a zero maximum the ratio is undefined: no peaks
        for k in range(sample_count):
            if k % report_every == 0:
                print(100 * k // sample_count, "%")
            if data[k] / mx < threshold:
                continue
            if last_peak is None:
                last_peak = k
            elif k - last_peak > min_spacing:
                # Exact index difference: no sample is lost after a reset.
                spacing.append(k - last_peak)
                last_peak = k

    # Convert sample distances to beats per minute.
    bpms = [60 / (gap / sample_rate) for gap in spacing]

    if display:
        plt.plot(list(range(len(bpms))), bpms)
        plt.xlabel("N")
        plt.ylabel("BPM")
        plt.grid()
        plt.show()

    if bpms:
        beat = np.mean(bpms)
        error = np.std(bpms)
    else:
        beat = error = float("nan")
    return (np.round(beat, 3), np.round(error, 3))
def filter_n_percent(song_name, offset, length, threshold, display=True):
    """Keep only the largest ``threshold`` percent of samples of an excerpt.

    ``length`` seconds starting at ``offset`` are cropped from ``song_name``
    with ffmpeg (through a temporary ``crop.wav`` in the working directory).
    The ``len(data) * threshold // 100`` largest samples are kept and every
    other sample is set to 0.

    Args:
        song_name: Path of the audio file handed to ffmpeg.
        offset: Start of the excerpt, in seconds.
        length: Duration of the excerpt, in seconds.
        threshold: Percentage of samples to keep, in ]0, 100].
        display: When true (the default), plot the filtered excerpt.

    Returns:
        The sample array read from the excerpt, filtered in place.
    """
    subprocess.run(["ffmpeg", "-ss", str(offset), "-t", str(length), "-i", song_name, "crop.wav"])
    try:
        sample_rate, song_data = wavfile.read('crop.wav')
    finally:
        # Always remove the temporary file: a leftover crop.wav would make
        # the next ffmpeg call stop and ask whether to overwrite it.
        subprocess.run(["rm", "-f", "crop.wav"])
    subprocess.run(["clear"])

    sample_count = len(song_data)
    keep_count = int((sample_count * threshold) // 100)
    print("X = ", keep_count)
    print("Retreiving the", keep_count, "/", sample_count, "highest values")
    largest = heapq.nlargest(keep_count, enumerate(song_data), key=lambda pair: pair[1])
    print("Done")

    kept_indices = {index for index, _ in largest}
    for position in range(sample_count):
        if position not in kept_indices:
            song_data[position] = 0

    if display:
        t = [offset + j / sample_rate for j in range(sample_count)]
        plt.plot(t, song_data)
        plt.xlabel("Time")
        plt.ylabel("Amplitude")
        plt.grid()
        plt.show()
    return song_data
# ---------------------------------------------------------------------------
# Script driver.  Each experiment is switched on or off by editing its
# condition; the commented calls are alternative inputs tried previously.
# ---------------------------------------------------------------------------

# Experiment 1 (disabled): spectrogram, then dominant frequency per frame.
if False:
    #t, f, Zxx = fct("no.wav", 0, 0.032, 10, 5000, False)
    #t, f, Zxx = fct("worlds_end_3.wav", 150.889, 0.032, 170.889, 3000, False)
    #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 114.318, 3000, False)
    #t, f, Zxx = fct("deltamax.wav", 9.992, 0.032, 20, 3000, False)
    #t, f, Zxx = fct("da^9.wav", 8.463, 0.032, 20, 5000, False)
    t, f, Zxx = fct("13. Cosmic Mind.wav", 0, 0.032, 20, 5000, False)
    #t, f, Zxx = fct("Furioso Melodia 44100.wav", 4, 0.032, 8, 3000, False)
    #t, f, Zxx = fct("changing.wav", 0, 0.05, 3.9, 5000, False)
    #fct("worlds_end_3.wav", 75, (60/178)/4, 75+2, 2500)
    plot_max(t, f, Zxx, True)

# Experiment 2 (disabled): peak extraction, then beat-grid tempo search.
if False:
    #(t, data) = peaks("worlds_end_3.wav", 0, 300, False, 0.92)
    (t, data) = peaks("worlds_end_3.wav", 74.582, 6, False, 0.9)
    #(t, data) = peaks("da^9.wav", 8.463, 301.924 - 8.463, False, 0.95)
    #(t, data) = peaks("deltamax.wav", 8.463, 30101.924 - 8.463, False, 0.92)
    # find_bpm takes (sample_rate, data, minbpm, maxbpm, step, width); the
    # time axis is not one of its parameters.
    da = find_bpm(44100, data, 100, 200, 1, 10)
    print("BPM data is", da)

# Experiment 3 (enabled): keep the loudest samples, then measure peak spacing.
if True:
    #data = filter_n_percent("worlds_end_3.wav", 74.582, 20, 0.1)
    data = filter_n_percent("no.wav", 1, 10, 0.1)
    #da = find_bpm(44100, data, 100, 200, 1, 0)
    da = find_bpm_2(44100, data, 0.97, 240)
    print("BPM is", da[0], "with std of", da[1])

print("Program finished with return 0")
# NOTE(review): nothing below this line executes.  The two commented
# assignments and the bare string literal hold what looks like a hand-rolled
# "keep the x largest samples" routine: it maintains `data`/`ids` in
# descending order by insertion and refers to `x`, `mx`, `song_data` and an
# undefined `skip`.  Presumably an earlier draft of the selection that
# filter_n_percent now does with heapq.nlargest - confirm before deleting.
#data = [-1 for i in range(int(x))]
#ids = [-1 for i in range(int(x))]
'''
data = []
ids = []
for k in range(int(x)):
data.append(int(7*mx/10))
ids.append(-1)
# structure there is [[index, value]...]
i = 0
calc = 0
while(i < len(song_data)):
if(i%10 == 0):
print(i, "/", len(song_data))
if(data[int(x)-1] < song_data[i]):
calc += 1
#print("\n \n \n \n \n")
data[int(x)-1] = song_data[i]
ids[int(x)-1] = i
k = int(x)-1
#while(k < int(x) & data[0] > data[k]):
while(k > 0 and data[k-1] <= data[k]):
data[k], data[k-1] = data[k-1], data[k]
ids[k], ids[k-1] = ids[k-1], ids[k]
k -= 1
#print(data[int(x)-1], calc, "/", x)
i += skip
i += 1
for s in range(int(x)-1):
if(data[s] < data[s+1]):
print("Nope", s)
assert(0)
'''