Reinforcement Learning Agent for CARLA Autonomous Self-Driving Car Using Python

In the last tutorial of our autonomous self-driving car using CARLA and Python, we developed the class needed for our reinforcement learning environment.
We also set up all the nitty-gritty needed for our reinforcement learning agent to thrive and interact with the reinforcement learning environment.
In this tutorial, we are going to be working on our reinforcement learning agent.
It is worth knowing that every step taken by a reinforcement learning agent comes with a plausible prediction.
This is the ability of the reinforcement learning agent to choose between exploration and exploitation when making decisions.
Let me explain this concept in the simplest terms.
Daily, most people are confronted with the same dilemma: should I keep doing what I do, or should I try something else?
For instance, should I go to my favorite eatery or try a new one? Should I keep visiting my preferred spa or find a new one? And so on.
In reinforcement learning, this type of decision-making is called exploitation when you keep doing what you were already doing, and exploration when you try something new.
For this reason, the question of how much to exploit and how much to explore naturally arises.
This means that our reinforcement learning agent will be training and predicting at the same time, and it still needs to get as many frames per second as possible.
This poses a problem that is inherent in reinforcement learning.
We will be receiving and processing a large amount of data from the camera images, and with all the calculations and weight updates involved, our reinforcement learning model will be large.
At the same time, we want the program to run in real time.
We would also like our training process to go as quickly as possible. To achieve this, we can use either multiprocessing or threading, two common programming concepts you should already know.
To keep things simple, we will stick with threading.
Introducing Keras and Tensorflow
To create our reinforcement learning agent, we have to think about the model to be used. For that reason, we will be using Keras and Tensorflow.
If you already have Keras and Tensorflow installed, please skip this part of the tutorial. Otherwise, use the commands below to install them.
How to Install and Verify Tensorflow on macOS
Use the command below to install Tensorflow on macOS, or visit the official Tensorflow site for instructions for other OS platforms.
pip install --upgrade tensorflow
Run the code below to verify your Tensorflow installation.
python -c "import tensorflow as tf;print(tf.reduce_sum(tf.random.normal([1000, 1000])))"
How to Install and Verify Keras on macOS
Use the commands below to install Keras on macOS, or visit the official Keras site for instructions for other OS platforms.
First, let’s install a few Python dependencies:
pip install numpy scipy
pip install scikit-learn
pip install pillow
pip install h5py
Followed by installing Keras itself:
pip install keras
That’s it! Keras and Tensorflow are now installed on your system!
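You can verify the Keras installation in a similar way; this simply prints the installed version (assuming the standalone keras package installed above):
python -c "import keras; print(keras.__version__)"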
Set Up the Reinforcement Learning Agent
To explain the concept: we will have the main network, which is constantly evolving, and a target network, which we update every n steps or episodes, where n is whatever interval you choose.
We will begin by adding some important CONSTANTS to the autonomous-car-tutorial2.py file from the last tutorial.
We will not go into the details of how they all work right now; you will learn about each CONSTANT as we use it.
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
epsilon = 1
EPSILON_DECAY = 0.95
MIN_EPSILON = 0.001
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4 # The double forward slash is integer (floor) division, so the result keeps no remainder
UPDATE_TARGET_EVERY = 5 # Number of episodes before the prediction model is updated
MODEL_NAME = "Xception" # The name of the prebuilt model we will use from Keras
MEMORY_FRACTION = 0.8 # Fraction of GPU memory to allocate, so the program does not take more GPU memory than we can spare
MIN_REWARD = -200
EPISODES = 200
DISCOUNT = 0.99
AGGREGATE_STATS_EVERY = 10
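The epsilon, EPSILON_DECAY, and MIN_EPSILON constants drive the exploration-versus-exploitation trade-off discussed at the start of this tutorial. They are not used inside the agent class itself; the sketch below is only a hypothetical illustration of how they typically appear in the outer training loop (we will wire this up properly in the next tutorial), using the get_qs() method we add later in this tutorial.
- Python Code
# Hypothetical sketch of epsilon-greedy action selection (not part of the agent class)
if np.random.random() > epsilon:
    # Exploitation: pick the action with the highest predicted Q value
    action = np.argmax(agent.get_qs(current_state))
else:
    # Exploration: pick a random action (0 = left, 1 = straight, 2 = right)
    action = np.random.randint(0, 3)

# After each episode, decay epsilon but never let it fall below MIN_EPSILON
if epsilon > MIN_EPSILON:
    epsilon *= EPSILON_DECAY
    epsilon = max(MIN_EPSILON, epsilon)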
Next, we will create a class named CarAgent in the autonomous-car-tutorial2.py file from the last tutorial. This class will hold our code for the reinforcement learning agent.
class CarAgent:
Next, we will add an __init__ method to the class along with some code.
- Python Code
def __init__(self):
self.model = self.create_model()
self.target_model = self.create_model()
self.target_model.set_weights(self.model.get_weights())
What we are essentially doing in the code above is creating two models. One is the training model and the other is the prediction model, which will be used to compare against the results of our training.
One way to use the two models would be to constantly update the prediction model as we train, but doing that tends to produce volatile results.
Hence, what we will do instead is constantly train the training model while holding the prediction model back from updating.
After a certain number of episodes or training turns, we will update the prediction model.
Next, still in the __init__ method, we will write some code that gives us a memory of previous actions as we train. This will help stabilize the training process between both models.
First, we add the necessary import.
from collections import deque
Next, we add the code below to the __init__ method.
self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
The constant REPLAY_MEMORY_SIZE specified above is used here. In that constant, the underscore between the 5 and the three zeros serves as a visual separator, like a comma, so 5_000 is read as 5,000.
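A deque with a maxlen behaves like a sliding window: once it is full, appending a new item silently drops the oldest one. A tiny standalone example:
- Python Code
from collections import deque

d = deque(maxlen=3)
for i in range(5):
    d.append(i)
print(d)  # deque([2, 3, 4], maxlen=3) -- the oldest items were dropped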
Next, we will modify TensorBoard for speed and storage purposes, and wrap up our __init__ method by setting some final values.
self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
self.target_update_counter = 0
self.graph = tf.get_default_graph()
self.terminate = False
self.last_logged_episode = 0
self.training_initialized = False
The first line keeps TensorBoard from getting out of control; we use a modified callback so that speed and storage usage stay in check.
The second line is a counter that is updated after each episode runs. The third line grabs the default TensorFlow graph, which will be useful since we carry out prediction and training in different threads. The fourth line is one of the flags we will use for the threads.
The fifth line helps us keep track of TensorBoard logging, since some of what we do will be asynchronous. We will use the last line, self.training_initialized, to track our progress once we start running simulations.
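Note that ModifiedTensorBoard is a custom callback rather than something that ships with Keras. If you do not already have it, the sketch below shows one common way to implement it for Keras 2.x with TensorFlow 1.x (the same API family as the tf.get_default_graph() call above); treat it as an assumption rather than the exact class used here. The idea is to keep one log writer and our own step counter instead of letting Keras create a new log file on every .fit() call.
- Python Code
from keras.callbacks import TensorBoard
import tensorflow as tf

class ModifiedTensorBoard(TensorBoard):
    # Keep our own step counter and a single writer shared by all .fit() calls
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    # Stop Keras from creating a default writer for every model
    def set_model(self, model):
        pass

    # Log stats against our own step counter instead of the epoch number
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**(logs or {}))

    # We train one batch at a time, so skip per-batch logging
    def on_batch_end(self, batch, logs=None):
        pass

    # Keep the writer open across .fit() calls
    def on_train_end(self, _):
        pass

    def update_stats(self, **stats):
        self._write_logs(stats, self.step)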
Here is the full code for the CarAgent class up to this point.
- Python Code
class CarAgent:
def __init__(self):
self.model = self.create_model()
self.target_model = self.create_model()
self.target_model.set_weights(self.model.get_weights())
self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
self.target_update_counter = 0
self.graph = tf.get_default_graph()
self.terminate = False
self.last_logged_episode = 0
self.training_initialized = False
Creating a Model for the Reinforcement Learning Agent Using Keras
Here, we are just going to make use of the prebuilt Xception model from Keras, but you could use a different prebuilt model or import your own.
We are also adding a GlobalAveragePooling2D layer on top of the base model's output, as well as a three-neuron output layer that represents each possible action the reinforcement learning agent can take.
Let's start by making the required imports for this section. Add the code below at the top of the file where we have our other imports.
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
Now let's create our model:
- Python Code
def create_model(self):
base_model = Xception(weights=None, include_top=False, input_shape=(IMG_HEIGHT, IMG_WIDTH,3))
x = base_model.output
x = GlobalAveragePooling2D()(x)
predictions = Dense(3, activation="linear")(x)
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
return model
The base_model is the prebuilt Xception model from Keras, used as our starting point. Whenever we use a prebuilt model, we have to adapt its input and output layers, which is done by specifying what to use as the input and output.
We take the output of the base_model and add the GlobalAveragePooling2D and Dense layers to it.
The "3" specified in the predictions layer indicates the three possible actions (turn left, go straight, turn right). Finally, we build the Model with the base model's input as its inputs and predictions as its outputs.
Next, we will add a quick method in our CarAgent for updating the replay memory.
- Python Code
def update_replay_memory(self, transition):
self.replay_memory.append(transition)
The transition contains all the information needed to train the model: the current state, the action taken, the reward received, the new state, and the done flag.
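As a hypothetical example from the outer training loop we will build in the next tutorial, a transition is appended after every environment step:
- Python Code
# Hypothetical usage from the outer training loop
new_state, reward, done, _ = env.step(action)
agent.update_replay_memory((current_state, action, reward, new_state, done))
current_state = new_state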
Creating the train Method for Our Reinforcement Learning Agent
Next, we will work on the train() method.
To begin, it is important to know that we only want to train if we have a bare minimum of samples in replay memory.
- Python Code
def train(self):
if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
return
Earlier on, we specified the MIN_REPLAY_MEMORY_SIZE constant as 1_000 (1,000).
The if statement above ends the training early if we do not have at least that many samples in replay memory, because there is not enough data to train on yet.
However, if we do have enough replay memory samples (1,000 or more), we begin our training.
To do that, we first need to grab a random minibatch.
minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)
As soon as we have our minibatch, we want to calculate our current and future states so we can do the training operation.
The current states are predicted with self.model, while the future states are predicted with the target_model.
current_states = np.array([transition[0] for transition in minibatch])/255
with self.graph.as_default():
current_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)
future_states = np.array([transition[3] for transition in minibatch])/255
with self.graph.as_default():
future_list = self.target_model.predict(future_states, PREDICTION_BATCH_SIZE)
Next, we want to enumerate over the minibatch and do our training.
We create our inputs (X) and outputs (y):
X = []
y = []
for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
if not done:
max_future_q = np.max(future_list[index])
new_q = reward + DISCOUNT * max_future_q
else:
new_q = reward
current_qs = current_list[index]
current_qs[action] = new_q
X.append(current_state)
y.append(current_qs)
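The new_q calculation is the standard Q-learning update: the value of an action is the immediate reward plus the discounted best value we expect from the next state. A quick worked example using the constants above:
- Python Code
# Worked example of the Q update with DISCOUNT = 0.99
reward = 1            # e.g. the car kept moving above 50 km/h
max_future_q = 2.5    # hypothetical best Q value predicted for the next state
new_q = reward + 0.99 * max_future_q
print(new_q)          # 3.475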
Next, we will need to set up a way to keep track of our training logs.
log_this_step = False
if self.tensorboard.step > self.last_logged_episode:
log_this_step = True
self.last_logged_episode = self.tensorboard.step
Next, we carry out the fit. We set the TensorBoard callback only if log_this_step is True. If it is False, we still fit the model, but we do not log to TensorBoard.
with self.graph.as_default():
self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)
Next, we want to continue tracking for logging:
if log_this_step:
self.target_update_counter += 1
Finally, we'll check to see if it's time to update our target_model:
if self.target_update_counter > UPDATE_TARGET_EVERY:
self.target_model.set_weights(self.model.get_weights())
self.target_update_counter = 0
That concludes the train method.
Now, we need a method to get q values (basically to make a prediction).
- Python Code
def get_qs(self, state):
return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]
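The reshape(-1, *state.shape) call turns a single camera frame into a batch of one, which is the shape Keras expects, and the [0] at the end strips that batch dimension off again. A small shape check (hypothetical) makes this clearer:
- Python Code
# Shape check for get_qs (hypothetical values)
state = np.zeros((IMG_HEIGHT, IMG_WIDTH, 3))
batch = np.array(state).reshape(-1, *state.shape) / 255
print(batch.shape)                 # (1, 480, 640, 3)
print(agent.get_qs(state).shape)   # (3,) -- one Q value per action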
Finally, we create a method that handles the training and prediction parts of the reinforcement learning agent.
To begin, we fit the model once on some random data to initialize it, since the very first fit or predict call is slow, and then we start an infinite loop. Keeping the training inside one long-running loop prevents us from reinitializing everything each time we want to train, which speeds things up a bit.
- Python Code
def train_in_loop(self):
X = np.random.uniform(size=(1, IMG_HEIGHT, IMG_WIDTH, 3)).astype(np.float32)
y = np.random.uniform(size=(1, 3)).astype(np.float32)
with self.graph.as_default():
self.model.fit(X,y, verbose=False, batch_size=1)
self.training_initialized = True
while True:
if self.terminate:
return
self.train()
time.sleep(0.01)
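The train_in_loop() method is designed to run in its own thread, which is the threading approach we settled on earlier. The sketch below is a hypothetical example of how it could be started from the main program; we will do this properly in the next tutorial.
- Python Code
from threading import Thread

# Hypothetical usage: train in a background thread while the main
# thread interacts with the CARLA environment
agent = CarAgent()
trainer_thread = Thread(target=agent.train_in_loop, daemon=True)
trainer_thread.start()

# Wait until the warm-up fit has finished before we start driving
while not agent.training_initialized:
    time.sleep(0.01)

# ... main simulation loop goes here ...

# When we are done, ask the training thread to stop and wait for it
agent.terminate = True
trainer_thread.join()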
With that, we are done with the reinforcement learning agent of our autonomous self-driving car using CARLA.
Full Code of the Reinforcement Learning Agent
- Python Code
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math
from collections import deque
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
import tensorflow as tf
# NOTE: the ModifiedTensorBoard callback used in CarAgent.__init__ must also be defined in (or imported into) this file; see the sketch earlier in this tutorial.
try:
sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
sys.version_info.major,
sys.version_info.minor,
'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
pass
import carla
SHOW_PREVIEW = False
IMG_WIDTH = 640
IMG_HEIGHT = 480
SECONDS_PER_EPISODE = 10
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
epsilon = 1
EPSILON_DECAY = 0.95
MIN_EPSILON = 0.001
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"
MEMORY_FRACTION = 0.8
MIN_REWARD = -200
EPISODES = 200
DISCOUNT = 0.99
AGGREGATE_STATS_EVERY = 10
class CarEnvironment:
SHOW_CAM = SHOW_PREVIEW
STEER_AMT = 1.0
img_width = IMG_WIDTH
img_height = IMG_HEIGHT
front_camera = None
def __init__(self):
self.client = carla.Client('localhost', 2000)
self.client.set_timeout(2.0)
self.world = self.client.get_world()
self.blueprint_library = self.world.get_blueprint_library()
self.tesla_model3 = self.blueprint_library.filter('model3')[0]
def reset(self):
self.collision_hist = []
self.actor_list = []
#Vehicle spawn and location set up
self.transform = random.choice(self.world.get_map().get_spawn_points())
self.vehicle = self.world.spawn_actor(self.tesla_model3, self.transform)
self.actor_list.append(self.vehicle)
#RGB Camera set up
self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
self.rgb_cam.set_attribute('image_size_x', f'{self.img_width}')
self.rgb_cam.set_attribute('image_size_y', f'{self.img_height}')
self.rgb_cam.set_attribute('fov', '110')
#Vehicle sensor set up
transform = carla.Transform(carla.Location(x=2.5, z=0.7))
self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
self.actor_list.append(self.sensor)
self.sensor.listen(lambda data: self.process_img(data))
self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
#Collision sensor set up
time.sleep(4)
collision_sensor = self.world.get_blueprint_library().find('sensor.other.collision')
self.collision_sensor = self.world.spawn_actor(collision_sensor, transform, attach_to=self.vehicle)
self.actor_list.append(self.collision_sensor)
self.collision_sensor.listen(lambda event: self.collision_data(event))
#episodes set up
self.episode_start = time.time()
self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))
#The value returned
return self.front_camera
def process_img(self, image):
i = np.array(image.raw_data)
#np.save("iout.npy", i)
i2 = i.reshape((self.img_height, self.img_width, 4))
i3 = i2[:, :, :3]
if self.SHOW_CAM:
cv2.imshow("",i3)
cv2.waitKey(1)
self.front_camera = i3
def collision_data(self, event):
self.collision_hist.append(event)
def step(self, action):
if action == 0:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
elif action == 1:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer= 0))
elif action == 2:
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))
v = self.vehicle.get_velocity()
kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))
if len(self.collision_hist) != 0:
done = True
reward = -200
elif kmh < 50:
done = False
reward = -1
else:
done = False
reward = 1
if self.episode_start + SECONDS_PER_EPISODE < time.time():
done = True
return self.front_camera, reward, done, None
class CarAgent:
def __init__(self):
self.model = self.create_model()
self.target_model = self.create_model()
self.target_model.set_weights(self.model.get_weights())
self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
self.target_update_counter = 0
self.graph = tf.get_default_graph()
self.terminate = False
self.last_logged_episode = 0
self.training_initialized = False
def create_model(self):
base_model = Xception(weights=None, include_top=False, input_shape=(IMG_HEIGHT, IMG_WIDTH,3))
x = base_model.output
x = GlobalAveragePooling2D()(x)
predictions = Dense(3, activation="linear")(x)
model = Model(inputs=base_model.input, outputs=predictions)
model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
return model
def update_replay_memory(self, transition):
self.replay_memory.append(transition)
def train(self):
if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
return
minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)
current_states = np.array([transition[0] for transition in minibatch])/255
with self.graph.as_default():
current_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)
future_states = np.array([transition[3] for transition in minibatch])/255
with self.graph.as_default():
future_list = self.target_model.predict(future_states, PREDICTION_BATCH_SIZE)
X = []
y = []
for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
if not done:
max_future_q = np.max(future_list[index])
new_q = reward + DISCOUNT * max_future_q
else:
new_q = reward
current_qs = current_list[index]
current_qs[action] = new_q
X.append(current_state)
y.append(current_qs)
log_this_step = False
if self.tensorboard.step > self.last_logged_episode:
log_this_step = True
self.last_logged_episode = self.tensorboard.step
with self.graph.as_default():
self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)
if log_this_step:
self.target_update_counter += 1
if self.target_update_counter > UPDATE_TARGET_EVERY:
self.target_model.set_weights(self.model.get_weights())
self.target_update_counter = 0
def get_qs(self, state):
return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]
def train_in_loop(self):
X = np.random.uniform(size=(1, IMG_HEIGHT, IMG_WIDTH, 3)).astype(np.float32)
y = np.random.uniform(size=(1, 3)).astype(np.float32)
with self.graph.as_default():
self.model.fit(X,y, verbose=False, batch_size=1)
self.training_initialized = True
while True:
if self.terminate:
return
self.train()
time.sleep(0.01)
Wrap Off
If you run into errors or are unable to complete this tutorial, feel free to contact us anytime, and we will resolve it right away. You can also request clarification, download this tutorial as a PDF, or report bugs using the buttons below.