# Predicting NYC Taxi Fares with RAPIDS

RAPIDS is a suite of GPU accelerated data science libraries with APIs that should be familiar to users of Pandas, Dask, and Scikitlearn.

This notebook focuses on showing how to use cuDF with Dask & XGBoost to scale GPU DataFrame ETL-style operations & model training out to multiple GPUs on mutliple nodes as part of Google Cloud Dataproc.

Anaconda has graciously made some of the NYC Taxi dataset available in a public Google Cloud Storage bucket. We'll use our Dataproc Cluster of GPUs to process it and train a model that predicts the fare amount.

For EDA we show the examples using [Holoviews](http://holoviews.org/) and [hvplot](https://hvplot.holoviz.org/). Best way to install Holoviews is to from `conda-forge` channel `conda install -c conda-forge holoviews`
and for hvplot `pyviz` channel. `conda install -c pyviz hvplot`

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import cupy
import numba, socket, time
import cudf
import dask, dask_cudf
import xgboost as xgb
import cuspatial
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster

#To install Holoviews and hvplot
#conda install -c conda-forge holoviews
#conda install -c pyviz hvplot
import holoviews as hv
from holoviews import opts
import numpy as np
import hvplot.cudf
import hvplot.dask
hv.extension('bokeh')

# connect to the Dask cluster created at Dataproc startup time
# client = Client(socket.gethostname()+':8786')

# if you're not using Dataproc, you can use dask_cuda to create a local CUDA cluster as well
# as shown in the commented code below. Check https://github.com/rapidsai/dask-cuda for help

cluster = LocalCUDACluster()
client = Client(cluster)

# forces workers to restart. useful to ensure GPU memory is clear
# client.restart()

client

# Inspecting the Data

Now that we have a cluster of GPU workers, we'll use [dask-cudf](https://github.com/rapidsai/dask-cudf/) to load and parse a bunch of CSV files into a distributed DataFrame. 

In [None]:
'''if you get 'ModuleNotFoundError: No module named 'gcsfs', run `!pip install gcsfs` 
'''
base_path = '/yellow-taxi/'

df_2014 = dask_cudf.read_csv(base_path+'2014/yellow_*.csv')
df_2014.head()

# Data Cleanup

As usual, the data needs to be massaged a bit before we can start adding features that are useful to an ML model.

For example, in the 2014 taxi CSV files, there are `pickup_datetime` and `dropoff_datetime` columns. The 2015 CSVs have `tpep_pickup_datetime` and `tpep_dropoff_datetime`, which are the same columns. One year has `rate_code`, and another `RateCodeID`.

Also, some CSV files have column names with extraneous spaces in them.

Worst of all, starting in the July 2016 CSVs, pickup & dropoff latitude and longitude data were replaced by location IDs, making the second half of the year useless to us.

We'll do a little string manipulation, column renaming, and concatenating of DataFrames to sidestep the problems.

In [None]:
#Dictionary of required columns and their datatypes
must_haves = {
     'pickup_datetime': 'datetime64[s]',
     'dropoff_datetime': 'datetime64[s]',
     'passenger_count': 'int32',
     'trip_distance': 'float32',
     'pickup_longitude': 'float32',
     'pickup_latitude': 'float32',
     'rate_code': 'int32',
     'dropoff_longitude': 'float32',
     'dropoff_latitude': 'float32',
     'fare_amount': 'float32'
    }

In [None]:
def clean(ddf, must_haves):
    # replace the extraneous spaces in column names and lower the font type
    tmp = {col:col.strip().lower() for col in list(ddf.columns)}
    ddf = ddf.rename(columns=tmp)

    ddf = ddf.rename(columns={
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'ratecodeid': 'rate_code'
    })
    
    ddf['pickup_datetime'] = ddf['pickup_datetime'].astype('datetime64[ms]')
    ddf['dropoff_datetime'] = ddf['dropoff_datetime'].astype('datetime64[ms]')

    for col in ddf.columns:
        if col not in must_haves:
            ddf = ddf.drop(columns=col)
            continue
        # if column was read as a string, recast as float
        if ddf[col].dtype == 'object':
            ddf[col] = ddf[col].str.fillna('-1')
            ddf[col] = ddf[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            # Tesla T4 are faster on 32bit ops
            if 'int' in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype('int32')
            if 'float' in str(ddf[col].dtype):
                ddf[col] = ddf[col].astype('float32')
            ddf[col] = ddf[col].fillna(-1)
    
    return ddf

<b> NOTE: </b>We will realize that some of 2015 data has column name as `RateCodeID` and others have `RatecodeID`. When we rename the columns in the clean function, it internally doesn't pass meta while calling map_partitions(). This leads to the error of column name mismatch in the returned data. For this reason, we will call the clean function with map_partition and pass the meta to it. Here is the link to the bug created for that: https://github.com/rapidsai/cudf/issues/5413 

In [None]:
df_2014 = df_2014.map_partitions(clean, must_haves, meta=must_haves)

We still have 2015 and the first half of 2016's data to read and clean. Let's increase our dataset.

In [None]:
df_2015 = dask_cudf.read_csv(base_path+'2015/yellow_*.csv')

In [None]:
df_2015 = df_2015.map_partitions(clean, must_haves, meta=must_haves)

# Handling 2016's Mid-Year Schema Change

In 2016, only January - June CSVs have the columns we need. If we try to read base_path+2016/yellow_*.csv, Dask will not appreciate having differing schemas in the same DataFrame.

Instead, we'll need to create a list of the valid months and read them independently.

In [None]:
months = [str(x).rjust(2, '0') for x in range(1, 7)]
valid_files = [base_path+'2016/yellow_tripdata_2016-'+month+'.csv' for month in months]

In [None]:
#read & clean 2016 data and concat all DFs
df_2016 = dask_cudf.read_csv(valid_files).map_partitions(clean, must_haves, meta=must_haves)

#concatenate multiple DataFrames into one bigger one
taxi_df = dask.dataframe.multi.concat([df_2014, df_2015, df_2016])

In [None]:
taxi_df = taxi_df.persist()

## Exploratory Data Analysis (EDA)

Here, we are checking out if there are any non-sensical records and outliers, and in such case, we need to remove them from the dataset.

In [None]:
# check out if there is any negative total trip time
taxi_df[taxi_df.dropoff_datetime <= taxi_df.pickup_datetime].head()

In [None]:
# check out if there is any abnormal data where trip distance is short, but the fare is very high.
taxi_df[(taxi_df.trip_distance < 10) & (taxi_df.fare_amount > 300)].head()

In [None]:
# check out if there is any abnormal data where trip distance is long, but the fare is very low.
taxi_df[(taxi_df.trip_distance > 50) & (taxi_df.fare_amount < 50)].head()

In [None]:
#Using only 2016-01 data for visuals.
#taxi_df_cdf = clean(cudf.read_csv(valid_files[0]),must_haves)

#Using entire 2016 data for visualization
taxi_df_cdf = dask_cudf.read_csv(valid_files,npartitions=8).map_partitions(clean,must_haves,meta=must_haves).compute()

The plot below visualizes the histogram of trip_distance and we can see some abnormal trip_distance values for some records. Taking this and also the NYC map coordinates into consideration, we will only select records where tripdistance < 500 miles.

In [None]:
%%time
#Histogram using cupy and Holoviews
# frequencies, edges = cupy.histogram(x=cupy.array(taxi_df_cdf["trip_distance"]) , bins=20)
# hist = hv.Histogram((np.array(edges.tolist()), np.array(frequencies.tolist())))

#Histogram using hvplot
hist = taxi_df_cdf.hvplot.hist("trip_distance", bins=20)

#Customizing the plot
hist.opts(xlabel="trip distance (miles)",ylabel="count",color="green",width=900, height=400)

Similarly, the plot below visualizes the histogram of fare_amount and we can see some abnormal fare_amount values for some records. Taking this and also the NYC map coordinates into consideration, we will only select records where fare_amount < 500$.

In [None]:
%%time
#Histogram using cupy and Holoviews
# frequencies, edges = cupy.histogram(x=cupy.array(taxi_df_cdf["fare_amount"]) , bins=20)
# hist = hv.Histogram((np.array(edges.tolist()), np.array(frequencies.tolist())))

#Histogram using hvplot
hist = taxi_df_cdf.hvplot.hist("fare_amount", bins=20)

#Customizing the plot
hist.opts(xlabel="fare amount ($)",ylabel="count",color="green",width=900, height=400)

In [None]:
%%time
# Plot the number of passengers per trip. We'll remove the records where passenger_count > 5.
# Plotting using Holoviews
#bar = hv.Bars(taxi_df_cdf.groupby("passenger_count").size().to_frame().rename(columns={0:"count"}))

# Plotting using hvplot
df_bar = taxi_df_cdf.groupby("passenger_count").size().to_frame().rename(columns={0:"count"}).reset_index()
bar = df_bar.hvplot.bar(x="passenger_count",y="count")

#Customizing the plot
bar.opts(color="green",width=900, height=400)

EDA visuals and additional analysis yield the filter logic below.

In [None]:
# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 1 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42',
    'trip_distance > 0 and trip_distance < 500',
    'not (trip_distance > 50 and fare_amount < 50)',
    'not (trip_distance < 10 and fare_amount > 300)',
    'not dropoff_datetime <= pickup_datetime'
]
taxi_df = taxi_df.query(' and '.join(query_frags))

In [None]:
# reset_index and drop index column
taxi_df = taxi_df.reset_index(drop=True)
taxi_df.head()

# Adding Interesting Features

Dask & cuDF provide standard DataFrame operations, but also let you run "user defined functions" on the underlying data. Here we use [dask.dataframe's map_partitions](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.map_partitions) to apply user defined python function on each DataFrame partition.

We'll use a Haversine Distance calculation to find total trip distance, and extract additional useful variables from the datetime fields.

In [None]:
## add features

taxi_df['hour'] = taxi_df['pickup_datetime'].dt.hour
taxi_df['year'] = taxi_df['pickup_datetime'].dt.year
taxi_df['month'] = taxi_df['pickup_datetime'].dt.month
taxi_df['day'] = taxi_df['pickup_datetime'].dt.day
taxi_df['day_of_week'] = taxi_df['pickup_datetime'].dt.weekday
taxi_df['is_weekend'] = (taxi_df['day_of_week']>=5).astype('int32')

#calculate the time difference between dropoff and pickup.
taxi_df['diff'] = taxi_df['dropoff_datetime'].astype('int64') - taxi_df['pickup_datetime'].astype('int64')
taxi_df['diff']=(taxi_df['diff']/1000).astype('int64')

taxi_df['pickup_latitude_r'] = taxi_df['pickup_latitude']//.01*.01
taxi_df['pickup_longitude_r'] = taxi_df['pickup_longitude']//.01*.01
taxi_df['dropoff_latitude_r'] = taxi_df['dropoff_latitude']//.01*.01
taxi_df['dropoff_longitude_r'] = taxi_df['dropoff_longitude']//.01*.01

taxi_df = taxi_df.drop('pickup_datetime', axis=1)
taxi_df = taxi_df.drop('dropoff_datetime', axis=1)

In [None]:
def haversine_dist(df):
    h_distance = cuspatial.haversine_distance(df['pickup_longitude'], df['pickup_latitude'], df['dropoff_longitude'], df['dropoff_latitude'])
    df['h_distance']= h_distance
    df['h_distance']= df['h_distance'].astype('float32')
    return df

taxi_df = taxi_df.map_partitions(haversine_dist)
taxi_df.head()

In [None]:
taxi_df = taxi_df.persist()

# Pick a Training Set

Let's imagine you're making a trip to New York on the 25th and want to build a model to predict what fare prices will be like the last few days of the month based on the first part of the month. We'll use a query expression to identify the `day` of the month to use to divide the data into train and test sets.

The wall-time below represents how long it takes your GPU cluster to load data from the Google Cloud Storage bucket and the ETL portion of the workflow.

In [None]:
#since we calculated the h_distance let's drop the trip_distance column, and then do model training with XGB.
taxi_df = taxi_df.drop('trip_distance', axis=1)

In [None]:
# this is the original data partition for train and test sets.
X_train = taxi_df.query('day < 25').persist()

# create a Y_train ddf with just the target variable
Y_train = X_train[['fare_amount']].persist()
# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]

# this wont return until all data is in GPU memory
done = wait([X_train, Y_train])

# Train the XGBoost Regression Model

The wall time output below indicates how long it took your GPU cluster to train an XGBoost model over the training set.

In [None]:
dtrain = xgb.dask.DaskDMatrix(client, X_train, Y_train)

In [None]:
%%time

trained_model = xgb.dask.train(client,
                        {
                         'learning_rate': 0.3,
                          'max_depth': 8,
                          'objective': 'reg:squarederror',
                          'subsample': 0.6,
                          'gamma': 1,
                          'silent': True,
                          'verbose_eval': True,
                          'tree_method':'gpu_hist'
                        },
                        dtrain,
                        num_boost_round=100, evals=[(dtrain, 'train')])

In [None]:
ax = xgb.plot_importance(trained_model['booster'], height=0.8, max_num_features=10, importance_type="gain")
ax.grid(False, axis="y")
ax.set_title('Estimated feature importance')
ax.set_xlabel('importance')
plt.show()

# How Good is Our Model?

Now that we have a trained model, we need to test it with the 25% of records we held out.

Based on the filtering conditions applied to this dataset, many of the DataFrame partitions will wind up having 0 rows. This is a problem for XGBoost which doesn't know what to do with 0 length arrays. We'll repartition the data. 

In [None]:
def drop_empty_partitions(df):
    lengths = df.map_partitions(len).compute()
    nonempty = [length > 0 for length in lengths]
    return df.partitions[nonempty]

In [None]:
X_test = taxi_df.query('day >= 25').persist()
X_test = drop_empty_partitions(X_test)

# Create Y_test with just the fare amount
Y_test = X_test[['fare_amount']].persist()

# Drop the fare amount from X_test
X_test = X_test[X_test.columns.difference(['fare_amount'])]

# this wont return until all data is in GPU memory
done = wait([X_test, Y_test])

# display test set size
len(X_test)

## Calculate Prediction

In [None]:
# generate predictions on the test set
'''feed X_test as a dask.dataframe'''

booster = trained_model["booster"] # "Booster" is the trained model
history = trained_model['history'] # "History" is a dictionary containing evaluation results 

booster.set_param({'predictor': 'gpu_predictor'})

prediction = xgb.dask.predict(client, booster, X_test).persist()

wait(prediction)

prediction.head()

In [None]:
prediction = prediction.map_partitions(lambda part: cudf.Series(part)).reset_index(drop=True)
actual = Y_test['fare_amount'].reset_index(drop=True)

NOTE: We mapped each partition of the result from `xgb.dask.predict` into `cudf.Series` to be able to substract it from `actual` data. Here is the issue asking XGBoost to solve that before returning data from `xgb.dask.predict` https://github.com/dmlc/xgboost/issues/5823#issuecomment-648526888

In [None]:
prediction.head()

In [None]:
actual.head()

In [None]:
# Calculate RMSE
squared_error = ((prediction-actual)**2)

# compute the actual RMSE over the full test set
cupy.sqrt(squared_error.mean().compute())

## Save Trained Model for Later UseÂ¶

To make a model maximally useful, you need to be able to save it for later use. We'll use Google Cloud Storage to persist the trained model in a dill file.

In [None]:
import gcsfs, dill

fs = gcsfs.GCSFileSystem()
# replace with a bucket you own
bucket = 'rapidsai-test-1/'

with fs.open(bucket+'trained_model.dill', 'wb') as file:  
    dill.dump(trained_model, file)

As an alternative, you can save the trained model on your system as below.

In [None]:
# Save the model to file
booster.save_model('xgboost-model')
print('Training evaluation history:', history)

## Reload a Saved Model from Disk

You can also read the saved model back out of Google Cloud Storage and into a normal XGBoost model object.

In [None]:
with fs.open(bucket+'trained_model.dill', 'rb') as file:  
    model_from_disk = dill.load(file)

# Generate predictions on the test set again, but this time using the reloaded model
prediction = xgb.dask.predict(client, model_from_disk, X_test).persist()
wait(prediction)

# Verify that the predictions result in the same RMSE error
prediction = prediction.map_partitions(lambda part: cudf.Series(part)).reset_index(drop=True)
actual = Y_test['fare_amount'].reset_index(drop=True)

squared_error = ((prediction-actual)**2)

# compute the actual RMSE over the full test set
cupy.sqrt(squared_error.mean().compute())

# cuML part

## Normalize data

In [None]:
from sklearn.preprocessing import MinMaxScaler, StandardScaler
scaler_x = MinMaxScaler()
scaler_y = StandardScaler()

In [None]:
scaler_x.fit(X_train)
X_train = scaler_x.transform(X_train)
X_test = scaler_x.transform(X_test)

In [None]:
scaler_y.fit(Y_train.to_numpy().reshape(-1, 1))
Y_train = scaler_y.transform(Y_train.to_numpy().reshape(-1, 1)).ravel()
Y_test = scaler_y.transform(Y_test.to_numpy().reshape(-1, 1)).ravel()

## Train Ridge regression model

In [None]:
from cuml import Ridge
from cuml.metrics.regression import mean_squared_error

In [None]:
%%time
model_r = Ridge()
model_r.fit(X_train, Y_train)

## Let's look at RMSE metric of models

In [None]:
y_pred = model_r_p.predict(X_test)

mse_r = mean_squared_error(Y_test, y_pred)

print(f'RMSE of Ridge: {np.sqrt(mse_r_p)}')