From 16c16b6ff56dbebf4fd0ae0f170c9acc6df080dd Mon Sep 17 00:00:00 2001 From: Atsushi Sakai Date: Sat, 4 May 2024 23:08:14 +0900 Subject: [PATCH] clean up fast slam codes (#1012) * clean up fast slam codes * clean up fast slam codes * clean up fast slam codes --- SLAM/FastSLAM1/fast_slam1.py | 100 ++++++++++++++++--------------- SLAM/FastSLAM2/fast_slam2.py | 110 +++++++++++++++++------------------ 2 files changed, 104 insertions(+), 106 deletions(-) diff --git a/SLAM/FastSLAM1/fast_slam1.py b/SLAM/FastSLAM1/fast_slam1.py index 17f6d5e572c..98f8a664177 100644 --- a/SLAM/FastSLAM1/fast_slam1.py +++ b/SLAM/FastSLAM1/fast_slam1.py @@ -17,8 +17,8 @@ R = np.diag([1.0, np.deg2rad(20.0)]) ** 2 # Simulation parameter -Q_sim = np.diag([0.3, np.deg2rad(2.0)]) ** 2 -R_sim = np.diag([0.5, np.deg2rad(10.0)]) ** 2 +Q_SIM = np.diag([0.3, np.deg2rad(2.0)]) ** 2 +R_SIM = np.diag([0.5, np.deg2rad(10.0)]) ** 2 OFFSET_YAW_RATE_NOISE = 0.01 DT = 0.1 # time tick [s] @@ -72,19 +72,18 @@ def normalize_weight(particles): def calc_final_state(particles): - xEst = np.zeros((STATE_SIZE, 1)) + x_est = np.zeros((STATE_SIZE, 1)) particles = normalize_weight(particles) for i in range(N_PARTICLE): - xEst[0, 0] += particles[i].w * particles[i].x - xEst[1, 0] += particles[i].w * particles[i].y - xEst[2, 0] += particles[i].w * particles[i].yaw + x_est[0, 0] += particles[i].w * particles[i].x + x_est[1, 0] += particles[i].w * particles[i].y + x_est[2, 0] += particles[i].w * particles[i].yaw - xEst[2, 0] = pi_2_pi(xEst[2, 0]) - # print(xEst) + x_est[2, 0] = pi_2_pi(x_est[2, 0]) - return xEst + return x_est def predict_particles(particles, u): @@ -235,28 +234,27 @@ def resampling(particles): pw = np.array(pw) n_eff = 1.0 / (pw @ pw.T) # Effective particle number - # print(n_eff) if n_eff < NTH: # resampling w_cum = np.cumsum(pw) base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE resample_id = base + np.random.rand(base.shape[0]) / N_PARTICLE - inds = [] - ind = 0 + indexes = [] + index = 0 for ip in range(N_PARTICLE): - while (ind < w_cum.shape[0] - 1) \ - and (resample_id[ip] > w_cum[ind]): - ind += 1 - inds.append(ind) + while (index < w_cum.shape[0] - 1) \ + and (resample_id[ip] > w_cum[index]): + index += 1 + indexes.append(index) tmp_particles = particles[:] - for i in range(len(inds)): - particles[i].x = tmp_particles[inds[i]].x - particles[i].y = tmp_particles[inds[i]].y - particles[i].yaw = tmp_particles[inds[i]].yaw - particles[i].lm = tmp_particles[inds[i]].lm[:, :] - particles[i].lmP = tmp_particles[inds[i]].lmP[:, :] + for i in range(len(indexes)): + particles[i].x = tmp_particles[indexes[i]].x + particles[i].y = tmp_particles[indexes[i]].y + particles[i].yaw = tmp_particles[indexes[i]].yaw + particles[i].lm = tmp_particles[indexes[i]].lm[:, :] + particles[i].lmP = tmp_particles[indexes[i]].lmP[:, :] particles[i].w = 1.0 / N_PARTICLE return particles @@ -275,34 +273,34 @@ def calc_input(time): return u -def observation(xTrue, xd, u, rfid): +def observation(x_true, xd, u, rfid): # calc true state - xTrue = motion_model(xTrue, u) + x_true = motion_model(x_true, u) # add noise to range observation z = np.zeros((3, 0)) for i in range(len(rfid[:, 0])): - dx = rfid[i, 0] - xTrue[0, 0] - dy = rfid[i, 1] - xTrue[1, 0] + dx = rfid[i, 0] - x_true[0, 0] + dy = rfid[i, 1] - x_true[1, 0] d = math.hypot(dx, dy) - angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0]) + angle = pi_2_pi(math.atan2(dy, dx) - x_true[2, 0]) if d <= MAX_RANGE: - dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5 # add noise - angle_with_noize = angle + np.random.randn() * Q_sim[ + dn = d + np.random.randn() * Q_SIM[0, 0] ** 0.5 # add noise + angle_with_noize = angle + np.random.randn() * Q_SIM[ 1, 1] ** 0.5 # add noise zi = np.array([dn, pi_2_pi(angle_with_noize), i]).reshape(3, 1) z = np.hstack((z, zi)) # add noise to input - ud1 = u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5 - ud2 = u[1, 0] + np.random.randn() * R_sim[ + ud1 = u[0, 0] + np.random.randn() * R_SIM[0, 0] ** 0.5 + ud2 = u[1, 0] + np.random.randn() * R_SIM[ 1, 1] ** 0.5 + OFFSET_YAW_RATE_NOISE ud = np.array([ud1, ud2]).reshape(2, 1) xd = motion_model(xd, ud) - return xTrue, z, xd, ud + return x_true, z, xd, ud def motion_model(x, u): @@ -331,7 +329,7 @@ def main(): time = 0.0 # RFID positions [x, y] - RFID = np.array([[10.0, -2.0], + rfid = np.array([[10.0, -2.0], [15.0, 10.0], [15.0, 15.0], [10.0, 20.0], @@ -340,17 +338,17 @@ def main(): [-5.0, 5.0], [-10.0, 15.0] ]) - n_landmark = RFID.shape[0] + n_landmark = rfid.shape[0] # State Vector [x y yaw v]' - xEst = np.zeros((STATE_SIZE, 1)) # SLAM estimation - xTrue = np.zeros((STATE_SIZE, 1)) # True state - xDR = np.zeros((STATE_SIZE, 1)) # Dead reckoning + x_est = np.zeros((STATE_SIZE, 1)) # SLAM estimation + x_true = np.zeros((STATE_SIZE, 1)) # True state + x_dr = np.zeros((STATE_SIZE, 1)) # Dead reckoning # history - hxEst = xEst - hxTrue = xTrue - hxDR = xTrue + hist_x_est = x_est + hist_x_true = x_true + hist_x_dr = x_dr particles = [Particle(n_landmark) for _ in range(N_PARTICLE)] @@ -358,18 +356,18 @@ def main(): time += DT u = calc_input(time) - xTrue, z, xDR, ud = observation(xTrue, xDR, u, RFID) + x_true, z, x_dr, ud = observation(x_true, x_dr, u, rfid) particles = fast_slam1(particles, ud, z) - xEst = calc_final_state(particles) + x_est = calc_final_state(particles) - x_state = xEst[0: STATE_SIZE] + x_state = x_est[0: STATE_SIZE] # store data history - hxEst = np.hstack((hxEst, x_state)) - hxDR = np.hstack((hxDR, xDR)) - hxTrue = np.hstack((hxTrue, xTrue)) + hist_x_est = np.hstack((hist_x_est, x_state)) + hist_x_dr = np.hstack((hist_x_dr, x_dr)) + hist_x_true = np.hstack((hist_x_true, x_true)) if show_animation: # pragma: no cover plt.cla() @@ -377,16 +375,16 @@ def main(): plt.gcf().canvas.mpl_connect( 'key_release_event', lambda event: [exit(0) if event.key == 'escape' else None]) - plt.plot(RFID[:, 0], RFID[:, 1], "*k") + plt.plot(rfid[:, 0], rfid[:, 1], "*k") for i in range(N_PARTICLE): plt.plot(particles[i].x, particles[i].y, ".r") plt.plot(particles[i].lm[:, 0], particles[i].lm[:, 1], "xb") - plt.plot(hxTrue[0, :], hxTrue[1, :], "-b") - plt.plot(hxDR[0, :], hxDR[1, :], "-k") - plt.plot(hxEst[0, :], hxEst[1, :], "-r") - plt.plot(xEst[0], xEst[1], "xk") + plt.plot(hist_x_true[0, :], hist_x_true[1, :], "-b") + plt.plot(hist_x_dr[0, :], hist_x_dr[1, :], "-k") + plt.plot(hist_x_est[0, :], hist_x_est[1, :], "-r") + plt.plot(x_est[0], x_est[1], "xk") plt.axis("equal") plt.grid(True) plt.pause(0.001) diff --git a/SLAM/FastSLAM2/fast_slam2.py b/SLAM/FastSLAM2/fast_slam2.py index 2b57e3bcc39..d4cf0d84ddd 100644 --- a/SLAM/FastSLAM2/fast_slam2.py +++ b/SLAM/FastSLAM2/fast_slam2.py @@ -17,8 +17,8 @@ R = np.diag([1.0, np.deg2rad(20.0)]) ** 2 # Simulation parameter -Q_sim = np.diag([0.3, np.deg2rad(2.0)]) ** 2 -R_sim = np.diag([0.5, np.deg2rad(10.0)]) ** 2 +Q_SIM = np.diag([0.3, np.deg2rad(2.0)]) ** 2 +R_SIM = np.diag([0.5, np.deg2rad(10.0)]) ** 2 OFFSET_YAW_RATE_NOISE = 0.01 DT = 0.1 # time tick [s] @@ -35,16 +35,16 @@ class Particle: - def __init__(self, N_LM): + def __init__(self, n_landmark): self.w = 1.0 / N_PARTICLE self.x = 0.0 self.y = 0.0 self.yaw = 0.0 self.P = np.eye(3) # landmark x-y positions - self.lm = np.zeros((N_LM, LM_SIZE)) + self.lm = np.zeros((n_landmark, LM_SIZE)) # landmark position covariance - self.lmP = np.zeros((N_LM * LM_SIZE, LM_SIZE)) + self.lmP = np.zeros((n_landmark * LM_SIZE, LM_SIZE)) def fast_slam2(particles, u, z): @@ -73,18 +73,18 @@ def normalize_weight(particles): def calc_final_state(particles): - xEst = np.zeros((STATE_SIZE, 1)) + x_est = np.zeros((STATE_SIZE, 1)) particles = normalize_weight(particles) for i in range(N_PARTICLE): - xEst[0, 0] += particles[i].w * particles[i].x - xEst[1, 0] += particles[i].w * particles[i].y - xEst[2, 0] += particles[i].w * particles[i].yaw + x_est[0, 0] += particles[i].w * particles[i].x + x_est[1, 0] += particles[i].w * particles[i].y + x_est[2, 0] += particles[i].w * particles[i].yaw - xEst[2, 0] = pi_2_pi(xEst[2, 0]) + x_est[2, 0] = pi_2_pi(x_est[2, 0]) - return xEst + return x_est def predict_particles(particles, u): @@ -266,21 +266,21 @@ def resampling(particles): base = np.cumsum(pw * 0.0 + 1 / N_PARTICLE) - 1 / N_PARTICLE resample_id = base + np.random.rand(base.shape[0]) / N_PARTICLE - inds = [] - ind = 0 + indexes = [] + index = 0 for ip in range(N_PARTICLE): - while (ind < w_cum.shape[0] - 1) \ - and (resample_id[ip] > w_cum[ind]): - ind += 1 - inds.append(ind) + while (index < w_cum.shape[0] - 1) \ + and (resample_id[ip] > w_cum[index]): + index += 1 + indexes.append(index) tmp_particles = particles[:] - for i in range(len(inds)): - particles[i].x = tmp_particles[inds[i]].x - particles[i].y = tmp_particles[inds[i]].y - particles[i].yaw = tmp_particles[inds[i]].yaw - particles[i].lm = tmp_particles[inds[i]].lm[:, :] - particles[i].lmP = tmp_particles[inds[i]].lmP[:, :] + for i in range(len(indexes)): + particles[i].x = tmp_particles[indexes[i]].x + particles[i].y = tmp_particles[indexes[i]].y + particles[i].yaw = tmp_particles[indexes[i]].yaw + particles[i].lm = tmp_particles[indexes[i]].lm[:, :] + particles[i].lmP = tmp_particles[indexes[i]].lmP[:, :] particles[i].w = 1.0 / N_PARTICLE return particles @@ -299,35 +299,35 @@ def calc_input(time): return u -def observation(xTrue, xd, u, RFID): +def observation(x_true, xd, u, rfid): # calc true state - xTrue = motion_model(xTrue, u) + x_true = motion_model(x_true, u) # add noise to range observation z = np.zeros((3, 0)) - for i in range(len(RFID[:, 0])): + for i in range(len(rfid[:, 0])): - dx = RFID[i, 0] - xTrue[0, 0] - dy = RFID[i, 1] - xTrue[1, 0] + dx = rfid[i, 0] - x_true[0, 0] + dy = rfid[i, 1] - x_true[1, 0] d = math.hypot(dx, dy) - angle = pi_2_pi(math.atan2(dy, dx) - xTrue[2, 0]) + angle = pi_2_pi(math.atan2(dy, dx) - x_true[2, 0]) if d <= MAX_RANGE: - dn = d + np.random.randn() * Q_sim[0, 0] ** 0.5 # add noise - angle_noise = np.random.randn() * Q_sim[1, 1] ** 0.5 + dn = d + np.random.randn() * Q_SIM[0, 0] ** 0.5 # add noise + angle_noise = np.random.randn() * Q_SIM[1, 1] ** 0.5 angle_with_noise = angle + angle_noise # add noise zi = np.array([dn, pi_2_pi(angle_with_noise), i]).reshape(3, 1) z = np.hstack((z, zi)) # add noise to input - ud1 = u[0, 0] + np.random.randn() * R_sim[0, 0] ** 0.5 - ud2 = u[1, 0] + np.random.randn() * R_sim[ + ud1 = u[0, 0] + np.random.randn() * R_SIM[0, 0] ** 0.5 + ud2 = u[1, 0] + np.random.randn() * R_SIM[ 1, 1] ** 0.5 + OFFSET_YAW_RATE_NOISE ud = np.array([ud1, ud2]).reshape(2, 1) xd = motion_model(xd, ud) - return xTrue, z, xd, ud + return x_true, z, xd, ud def motion_model(x, u): @@ -356,7 +356,7 @@ def main(): time = 0.0 # RFID positions [x, y] - RFID = np.array([[10.0, -2.0], + rfid = np.array([[10.0, -2.0], [15.0, 10.0], [15.0, 15.0], [10.0, 20.0], @@ -365,17 +365,17 @@ def main(): [-5.0, 5.0], [-10.0, 15.0] ]) - n_landmark = RFID.shape[0] + n_landmark = rfid.shape[0] # State Vector [x y yaw v]' - xEst = np.zeros((STATE_SIZE, 1)) # SLAM estimation - xTrue = np.zeros((STATE_SIZE, 1)) # True state - xDR = np.zeros((STATE_SIZE, 1)) # Dead reckoning + x_est = np.zeros((STATE_SIZE, 1)) # SLAM estimation + x_true = np.zeros((STATE_SIZE, 1)) # True state + x_dr = np.zeros((STATE_SIZE, 1)) # Dead reckoning # history - hxEst = xEst - hxTrue = xTrue - hxDR = xTrue + hist_x_est = x_est + hist_x_true = x_true + hist_x_dr = x_dr particles = [Particle(n_landmark) for _ in range(N_PARTICLE)] @@ -383,18 +383,18 @@ def main(): time += DT u = calc_input(time) - xTrue, z, xDR, ud = observation(xTrue, xDR, u, RFID) + x_true, z, x_dr, ud = observation(x_true, x_dr, u, rfid) particles = fast_slam2(particles, ud, z) - xEst = calc_final_state(particles) + x_est = calc_final_state(particles) - x_state = xEst[0: STATE_SIZE] + x_state = x_est[0: STATE_SIZE] # store data history - hxEst = np.hstack((hxEst, x_state)) - hxDR = np.hstack((hxDR, xDR)) - hxTrue = np.hstack((hxTrue, xTrue)) + hist_x_est = np.hstack((hist_x_est, x_state)) + hist_x_dr = np.hstack((hist_x_dr, x_dr)) + hist_x_true = np.hstack((hist_x_true, x_true)) if show_animation: # pragma: no cover plt.cla() @@ -402,21 +402,21 @@ def main(): plt.gcf().canvas.mpl_connect( 'key_release_event', lambda event: [exit(0) if event.key == 'escape' else None]) - plt.plot(RFID[:, 0], RFID[:, 1], "*k") + plt.plot(rfid[:, 0], rfid[:, 1], "*k") for iz in range(len(z[:, 0])): landmark_id = int(z[2, iz]) - plt.plot([xEst[0][0], RFID[landmark_id, 0]], [ - xEst[1][0], RFID[landmark_id, 1]], "-k") + plt.plot([x_est[0][0], rfid[landmark_id, 0]], [ + x_est[1][0], rfid[landmark_id, 1]], "-k") for i in range(N_PARTICLE): plt.plot(particles[i].x, particles[i].y, ".r") plt.plot(particles[i].lm[:, 0], particles[i].lm[:, 1], "xb") - plt.plot(hxTrue[0, :], hxTrue[1, :], "-b") - plt.plot(hxDR[0, :], hxDR[1, :], "-k") - plt.plot(hxEst[0, :], hxEst[1, :], "-r") - plt.plot(xEst[0], xEst[1], "xk") + plt.plot(hist_x_true[0, :], hist_x_true[1, :], "-b") + plt.plot(hist_x_dr[0, :], hist_x_dr[1, :], "-k") + plt.plot(hist_x_est[0, :], hist_x_est[1, :], "-r") + plt.plot(x_est[0], x_est[1], "xk") plt.axis("equal") plt.grid(True) plt.pause(0.001)