184 lines
7.3 KiB
Python
184 lines
7.3 KiB
Python
# treat_as_circle
|
|
# -------------------
|
|
def treat_as_circle(ind, is_slider_head):
    """Append the feature row for hit object ``ind`` to the global ``stream``.

    This helper takes its working data from module-level names rather than
    from arguments: ``objects``, ``offset``, ``ms_per_beat``, ``stars``,
    ``stream`` and ``timeM1`` must all exist as module globals before it is
    called.  Local variables of the calling function are NOT visible here.

    Row layout (11 fields)::

        (start, delta_time, delta_beat, blue_tick, red_tick, white_tick,
         xcoord, ycoord, False, is_slider_head, stars)

    The ninth field is always ``False`` for rows produced by this function.

    Args:
        ind: Index into the global ``objects`` sequence.
        is_slider_head: Stored verbatim as the tenth field of the row.

    Side effects:
        Appends one tuple to the global ``stream`` and rebinds the global
        ``timeM1`` to the time of the object just processed.
    """
    global timeM1

    hit_object = objects[ind]
    start = (ind == 0)

    # Fractional position inside the current beat, rounded to 2 decimals.
    # ``% 1`` yields a value in [0, 1), so the rounded phase lies in [0.0, 1.0].
    elapsed_ms = hit_object.time.total_seconds()*1000 - offset*1000
    beat_phase = round(elapsed_ms/ms_per_beat % 1, 2)

    xcoord, ycoord = hit_object.position
    xcoord, ycoord = normalize(xcoord, ycoord)

    # The very first object has no predecessor, so its time gap is 0.
    delta_time = 0 if start else (hit_object.time - timeM1).total_seconds()
    # NOTE(review): the division by 3 looks like a scaling factor -- confirm.
    delta_beat = round((delta_time*1000)/ms_per_beat, 2) / 3

    red_tick = (beat_phase == 0.5)
    # NOTE(review): only phase 0.75 is flagged as blue, 0.25 is not -- confirm
    # that this is intended.
    blue_tick = (beat_phase == 0.75)
    # An object exactly on the beat has phase 0.0; float noise just below the
    # beat rounds up to 1.0.  Both are the same position, so accept both
    # (the previous ``== 1.0`` test never matched the exact on-beat case).
    white_tick = beat_phase in (0.0, 1.0)

    timeM1 = hit_object.time
    stream.append((start, delta_time, delta_beat, blue_tick, red_tick,
                   white_tick, xcoord, ycoord, False, is_slider_head, stars))
|
|
|
|
|
|
|
|
# Building the "stream"
|
|
#-------------------------------
|
|
def extract_stream(map:sl.beatmap.Beatmap, fix_stars=-1):
    """Flatten a beatmap into a list of 11-field feature rows.

    Every non-slider hit object yields one row.  A slider yields one row per
    entry of ``curve.points``: the first row is built from the slider's own
    ``position`` and is flagged as a slider head, the remaining rows are built
    from the curve points and are spaced ``duration / number_points`` apart.

    Row layout::

        (start, delta_time, delta_beat, blue_tick, red_tick, white_tick,
         xcoord, ycoord, is_curve_point, is_slider_head, stars)

    Rows are built by local helpers.  The module-level ``treat_as_circle`` is
    deliberately not called: it reads module globals (``objects``, ``stream``,
    ``offset``, ...) which this function only binds as locals, so its rows
    would never reach the list returned here.

    Args:
        map: Beatmap to read.  Only the first timing point is used for the
            offset and the beat length.
        fix_stars: Value stored (divided by 7) in every row.  The default
            ``-1`` is a sentinel meaning "use ``map.stars()``".

    Returns:
        list of tuples, in playback order.
    """
    stars = (map.stars() if fix_stars == -1 else fix_stars) / 7
    objects = map.hit_objects()
    offset_ms = map.timing_points[0].offset.total_seconds() * 1000
    ms_per_beat = map.timing_points[0].ms_per_beat

    stream = []
    previous_time = None  # time the next object's delta_time is measured from

    def rhythm_features(when, delta_time):
        """Return (delta_beat, blue_tick, red_tick, white_tick) for a point."""
        # ``% 1`` yields [0, 1); after rounding the phase lies in [0.0, 1.0].
        beat_phase = round((when.total_seconds()*1000 - offset_ms)/ms_per_beat % 1, 2)
        # NOTE(review): the division by 3 looks like a scaling factor -- confirm.
        delta_beat = round((delta_time*1000)/ms_per_beat, 2) / 3
        red_tick = (beat_phase == 0.5)
        # NOTE(review): phase 0.25 is not flagged as blue -- confirm intended.
        blue_tick = (beat_phase == 0.75)
        # Exactly on the beat gives 0.0; just below the beat rounds to 1.0.
        # Both are the same position (the old ``== 1.0`` test missed 0.0).
        white_tick = beat_phase in (0.0, 1.0)
        return delta_beat, blue_tick, red_tick, white_tick

    def append_circle(index, is_slider_head):
        """Append the row for the object's own position and advance the clock."""
        nonlocal previous_time
        hit_object = objects[index]
        start = (index == 0)
        xcoord, ycoord = hit_object.position
        xcoord, ycoord = normalize(xcoord, ycoord)
        delta_time = 0 if start else (hit_object.time - previous_time).total_seconds()
        delta_beat, blue_tick, red_tick, white_tick = rhythm_features(hit_object.time, delta_time)
        previous_time = hit_object.time
        stream.append((start, delta_time, delta_beat, blue_tick, red_tick,
                       white_tick, xcoord, ycoord, False, is_slider_head, stars))

    for i, hit_object in enumerate(objects):
        if not is_slider(hit_object):
            append_circle(i, False)
            continue

        curve = hit_object.curve.points
        number_points = len(curve)
        # Curve points are assumed evenly spaced over the slider's duration.
        delta = (hit_object.end_time - hit_object.time) / float(number_points)

        append_circle(i, True)  # slider head
        delta_time = delta.total_seconds()
        for j in range(1, number_points):
            xcoord, ycoord = normalize(curve[j][0], curve[j][1])
            delta_beat, blue_tick, red_tick, white_tick = rhythm_features(
                hit_object.time + j*delta, delta_time)
            stream.append((False, delta_time, delta_beat, blue_tick, red_tick,
                           white_tick, xcoord, ycoord, True, False, stars))

        # The object after a slider is timed from the slider's end.
        previous_time = hit_object.end_time

    return stream
|
|
|
|
|
|
|
|
# Building sequences (2)
|
|
# --------------------------------------
|
|
def mirror_stream(stream):
    """Return a copy of ``stream`` with field 6 of every row replaced by ``1 - value``.

    Each output row is a fresh 11-tuple; all other fields are copied as-is
    and the input rows are left untouched.
    """
    mirrored = []
    for row in stream:
        # Index 6 is the only field that changes.
        flipped = tuple(1 - row[k] if k == 6 else row[k] for k in range(11))
        mirrored.append(flipped)
    return mirrored
|
|
|
|
def build_sequences(maps:list):
    """Turn beatmaps into (window, next_row) training pairs.

    For every beatmap the feature stream and its mirrored copy are cut into
    sliding windows of ``seq_len`` consecutive rows (``seq_len`` is a
    module-level name).  Each window is paired with the row that immediately
    follows it.  Pairs from the original and the mirrored stream alternate in
    the output.

    Args:
        maps: Beatmaps accepted by ``extract_stream``.

    Returns:
        list of ``(window, target_row)`` tuples, where ``window`` is a list of
        ``seq_len`` rows and ``target_row`` is a single row.
    """
    sequences = []
    for mapp in maps:
        stream = extract_stream(mapp)
        mirror = mirror_stream(stream)
        # A window covers rows [i, i + seq_len); its label is the very next
        # row, i + seq_len.  (Previously the label was row i + seq_len + 1,
        # which skipped one row between the window and its target.)
        for i in range(len(stream) - seq_len):
            target = i + seq_len
            sequences.append((stream[i:target], stream[target]))
            sequences.append((mirror[i:target], mirror[target]))
    return sequences
|
|
|
|
|
|
# Map generation
|
|
# --------------------------------------
|
|
def reposition_objects(mapp: sl.Beatmap):
    """Overwrite the positions in ``mapp`` with coordinates predicted by ``model``.

    The beatmap is flattened with ``extract_stream`` (``fix_stars=5.5``) and
    walked row by row.  For each row the module-level ``model`` is fed the
    rows up to and including the current one, and the two returned values are
    written back both into the beatmap (scaled by 512 and 384) and into the
    stream, so later predictions see the already-replaced coordinates.

    Args:
        mapp: Beatmap to modify in place.

    Returns:
        The same ``mapp`` object.
    """
    stream = extract_stream(mapp, fix_stars=5.5)
    model.eval()
    objects = mapp.hit_objects()

    def predict(row_index, history):
        """Run the model on rows [row_index - history, row_index] and return (x, y)."""
        first = max(0, row_index - history)
        window = torch.tensor(stream[first:row_index + 1], dtype=torch.float32)
        batch = window.unsqueeze(0).to(cuda)  # add a leading batch dimension of 1
        output = model(batch).tolist()
        return output[0][0], output[0][1]

    def with_xy(row, x, y):
        """Return ``row`` as an 11-item list with fields 6 and 7 replaced."""
        fields = [row[k] for k in range(11)]
        fields[6], fields[7] = x, y
        return fields

    # The stream has one row per slider curve point, so two cursors are
    # needed: ``obj_index`` counts real hit objects, ``row_index`` counts rows.
    obj_index = 0
    row_index = 0
    with torch.no_grad():
        while row_index < len(stream):
            if is_slider(objects[obj_index]):
                # NOTE(review): slider points use 16 rows of history while
                # plain objects use 20 -- confirm the difference is intended.
                for point_index in range(len(objects[obj_index].curve.points)):
                    x, y = predict(row_index, 16)
                    if point_index == 0:
                        # The first curve point also moves the slider itself.
                        mapp.hit_objects()[obj_index].position = sl.Position(x*512, y*384)
                    mapp.hit_objects()[obj_index].curve.points[point_index] = sl.Position(x*512, y*384)
                    stream[row_index] = with_xy(stream[row_index], x, y)
                    row_index += 1
            else:
                x, y = predict(row_index, 20)
                mapp.hit_objects()[obj_index].position = sl.Position(x*512, y*384)
                stream[row_index] = with_xy(stream[row_index], x, y)
                row_index += 1
            obj_index += 1

    return mapp
|
|
# Reposition the objects of one beatmap and write the result to the working
# directory.
# NOTE(review): reposition_objects() reads the module-level `model`, which
# this file only assigns further down (training section); when the file is
# executed top to bottom this call runs first -- confirm the intended order.
# NOTE(review): the name `map` rebinds the builtin of the same name at module
# level.
path = Path("/kaggle/input/survey1/4.osu")
map = reposition_objects(sl.Beatmap.from_path(path))
map.write_path("/kaggle/working/4.osu")
|
|
|
|
|
|
#TRAINING LOOP
|
|
# --------------------------------------
|
|
# Model, optimiser and loss.  The model predicts 2 values per sequence and is
# trained with mean absolute error (L1) against the target coordinates.
model = LSTM(input_size=num_features, hidden_size=hidden_size, output_size=2).to(cuda)
optimizer = Adam(model.parameters(), lr=0.003)
criterion = nn.L1Loss()

# Training loop: 20 epochs, each followed by a validation pass.
for epoch in range(20):
    model.train()
    train_loss = 0
    for x, y in train_dataloader:
        # Gradients accumulate by default, so clear them before each batch.
        optimizer.zero_grad()
        preds = model(x)
        loss = criterion(preds, y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    # Mean of the per-batch losses (not weighted by batch size).
    avg_train_loss = train_loss / len(train_dataloader)

    # Validation: evaluation mode, no gradient tracking.
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for x_val, y_val in val_dataloader:
            preds_val = model(x_val)
            val_loss += criterion(preds_val, y_val).item()

    avg_val_loss = val_loss / len(val_dataloader)
    print(f'Epoch {epoch+1}: Train Loss = {avg_train_loss:.4f}, Val Loss = {avg_val_loss:.4f}')
|
|
|
|
#CREATION DATASET
|
|
# --------------------------------------
|
|
class SequenceDataset(torch.utils.data.Dataset):
    """Dataset over ``(window, target_row)`` pairs.

    Indexing returns two float32 tensors placed on the module-level ``cuda``
    device: the window converted as-is, and a 2-element tensor holding
    fields 6 and 7 of the target row.
    """

    def __init__(self, sequences):
        # List of (window, target_row) tuples; stored by reference, not copied.
        self.sequences = sequences

    def __len__(self):
        """Number of stored pairs."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return ``(input_tensor, target_tensor)`` for pair ``idx``."""
        window, target_row = self.sequences[idx]

        # Model input: the whole window.
        features = torch.tensor(window, dtype=torch.float32)
        features = features.to(cuda)

        # Model target: only the coordinate fields of the target row.
        target_xy = [target_row[6], target_row[7]]
        target = torch.tensor(target_xy, dtype=torch.float32)
        target = target.to(cuda)

        return features, target
|
|
# Directories whose entries are loaded as beatmaps.
path1 = Path("/kaggle/input/biggerset139")
path2 = Path("/kaggle/input/varietypack51")

# Path.iterdir() is the standard-library way to list a directory; pathlib's
# Path has no ls() method, so this no longer depends on a patched Path.
maps = [sl.Beatmap.from_path(m) for m in (list(path1.iterdir()) + list(path2.iterdir()))]
sequences = build_sequences(maps)

# 80 % of the pairs for training, 20 % for validation.
# NOTE(review): the pairs are overlapping windows from the same beatmaps and
# are split individually, so validation windows can share rows with training
# windows -- consider splitting by beatmap instead.
train_sequences, val_sequences = train_test_split(sequences, test_size=0.2)
train_dataset = SequenceDataset(train_sequences)
val_dataset = SequenceDataset(val_sequences)

# Only the training loader is shuffled.
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)
|
|
|
|
|
|
|
|
|
|
#DEFINITION LSTM/dataset
|
|
# --------------------------------------
|
|
# Model hyper-parameters: LSTM hidden width, number of fields per stream row
# (rows are 11-tuples), and number of predicted values.
# NOTE(review): num_outputs is not referenced in the visible code; the model
# is constructed with a literal output_size=2.
hidden_size,num_features,num_outputs = 128,11,2
|
|
class LSTM(nn.Module):
    """Single-layer LSTM followed by a linear head and a sigmoid.

    Input is batch-first, ``(batch, seq, input_size)``.  Only the LSTM output
    at the last time step is passed to the linear layer, so the result has
    shape ``(batch, output_size)`` with every value in (0, 1).
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the sigmoid-squashed prediction for each sequence in ``x``."""
        per_step, _state = self.lstm(x)
        last_step = per_step[:, -1, :]  # keep only the final time step
        return torch.sigmoid(self.fc(last_step))
|