# matrix-spam-ml/model_v2.py
import csv
import os
import time
from datetime import datetime
import keras_tuner as kt
import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub
from nltk.corpus import stopwords
from tensorflow import keras

logdir = "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S")
tensorboard_callback = keras.callbacks.TensorBoard(log_dir=logdir)
hypermodel_logdir = (
    "logs/scalars/" + datetime.now().strftime("%Y%m%d-%H%M%S") + "_hypermodel"
)
hypermodel_tensorboard_callback = keras.callbacks.TensorBoard(log_dir=hypermodel_logdir)
hypertuner_logdir = "hypertuner_logs/scalars/" + datetime.now().strftime(
    "%Y%m%d-%H%M%S"
)
hypertuner_tensorboard_callback = keras.callbacks.TensorBoard(log_dir=hypertuner_logdir)

# Define the checkpoint directory to store the checkpoints.
checkpoint_dir = "./training_checkpoints"
# Define the name of the checkpoint files.
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")
progress_bar = tf.keras.callbacks.ProgbarLogger()
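
# The scalar logs written by the callbacks above can be inspected with the
# TensorBoard CLI, e.g.: tensorboard --logdir logs/scalars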

class SpamDectionModel(tf.keras.Model):
    """Binary spam classifier: Universal Sentence Encoder embeddings fed
    through a dropout-regularized dense layer into a sigmoid output."""

    def __init__(self, hp_dense1, hp_dropout, hp_l2):
        super(SpamDectionModel, self).__init__()
        self.dropout = tf.keras.layers.Dropout(hp_dropout)
        self.dense1 = tf.keras.layers.Dense(
            hp_dense1,
            activation="relu",
            # kernel_regularizer=tf.keras.regularizers.l2(hp_l2),
        )
        self.dense2 = tf.keras.layers.Dense(
            1, activation="sigmoid", name="score_output"
        )
        # self.glob_average_pooling_1d = tf.keras.layers.GlobalAveragePooling1D()
        self.use_layer = hub.KerasLayer(
            "https://tfhub.dev/google/universal-sentence-encoder/4",
            trainable=False,
            input_shape=[],
            dtype=tf.string,
            name="USE",
        )

    @tf.function
    def call(self, x, training=False):
        x = self.use_layer(x)
        if training:
            x = self.dropout(x, training=training)
        x = self.dense1(x)
        # if training:
        #     x = self.dropout(x, training=training)
        return self.dense2(x)
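
# A minimal, hedged usage sketch (not called anywhere in this module): build
# the model with hand-picked hyperparameters and score a couple of raw
# strings. Assumes network access to tfhub.dev to download the encoder; the
# untrained scores are meaningless and only illustrate the input/output shape.
def _example_score_messages():
    model = SpamDectionModel(hp_dense1=64, hp_dropout=0.2, hp_l2=0.0)
    model.compile(
        loss=tf.keras.losses.BinaryCrossentropy(),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=["accuracy"],
    )
    # predict() accepts raw strings; the USE layer handles tokenization.
    scores = model.predict(np.array(["you won a prize", "see you at lunch"]))
    return scores  # shape (2, 1), each value in [0, 1]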

class SpamDectionHyperModel(kt.HyperModel):
    def __init__(self):
        super(SpamDectionHyperModel, self).__init__()

    def build(self, hp):
        # Tune the number of units in the first Dense layer.
        # Choose an optimal value between 128 and 1024.
        hp_dense1 = hp.Int("dense1", min_value=128, max_value=1024, step=16)
        hp_dropout = hp.Float("dropout", min_value=0.1, max_value=0.9, step=0.1)
        hp_l2 = hp.Float("l2", min_value=0.0001, max_value=0.001, step=0.0001)
        model = SpamDectionModel(
            hp_dense1,
            hp_dropout,
            hp_l2,
        )
        # Adam was best so far.
        # tf.keras.optimizers.Nadam() has similar results to Adam but a bit worse; second best.
        hp_learning_rate = hp.Choice(
            "learning_rate",
            values=[
                1e-2,
                1e-3,
                1e-4,
                1e-5,
            ],
        )
        model.compile(
            loss=tf.keras.losses.BinaryCrossentropy(),
            optimizer=tf.keras.optimizers.Adam(learning_rate=hp_learning_rate),
            metrics=["accuracy"],
        )
        return model
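
# A minimal sketch of building the hypermodel outside a tuner search, assuming
# keras_tuner's HyperParameters API: a fresh HyperParameters object makes
# hp.Int/hp.Float/hp.Choice fall back to their defaults (the minimum or first
# choice). Illustrative only; not called anywhere in this module.
def _example_build_with_defaults():
    hp = kt.HyperParameters()
    return SpamDectionHyperModel().build(hp)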

# Code for text lowercasing
def lower_casing_text(text):
    """
    Convert text to lower case.
    Arguments:
        text: input of type "String".
    Returns:
        The text in lowercase.
    Example:
        Input : The World is Full of Surprises!
        Output : the world is full of surprises!
    """
    # lower() converts all uppercase letters of the given string to lowercase.
    text = text.lower()
    return text

def remove_stopwords(input_text):
    """
    Remove English stopwords from a string.
    Parameters:
        input_text : text to clean
    Returns:
        The cleaned string.
    """
    stopwords_list = stopwords.words("english")
    # Some words which might indicate a certain sentiment are kept via a whitelist.
    whitelist = ["n't", "not", "no"]
    words = input_text.split()
    clean_words = [
        word
        for word in words
        if (word not in stopwords_list or word in whitelist) and len(word) > 1
    ]
    return " ".join(clean_words)
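
# Note: stopwords.words("english") requires the NLTK stopword corpus, which
# can be fetched once with nltk.download("stopwords").
# Example (the whitelisted "not" survives, single characters are dropped):
#   remove_stopwords("this is not a drill")  ->  "not drill"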

def change_labels(x):
    # Map the string labels to integers: 1 for "spam", 0 for "ham".
    return 1 if x == "spam" else 0

def load_data():
    data = pd.read_csv(
        "./input/MatrixData.tsv", sep="\t", quoting=csv.QUOTE_NONE, encoding="utf-8"
    )
    # Enforce a minimum length for ham messages.
    data.query(
        '(message.str.split().str.len() >= 14 & label == "ham") | label == "spam"',
        inplace=True,
    )
    # Remove rows with missing values.
    data.dropna(inplace=True)
    # Remove duplicates.
    data.drop_duplicates(inplace=True)
    data.reset_index(drop=True, inplace=True)
    # Change labels from strings to integers.
    data["label"] = data["label"].apply(change_labels)
    # Remove stopwords and lowercase.
    data["message"] = data["message"].apply(remove_stopwords).apply(lower_casing_text)
    # Remove duplicates created by the cleaning step.
    data.drop_duplicates(inplace=True)
    data.reset_index(drop=True, inplace=True)
    # Shuffle the data.
    data = data.sample(frac=1).reset_index(drop=True)
    # Split data into message and label lists.
    sentences = data["message"].tolist()
    labels = data["label"].tolist()
    # Separate the sentences and labels into training and test sets (70/30 split).
    # training_size = int(len(sentences) * 0.8)
    training_size = int(len(sentences) * 0.7)
    training_sentences = sentences[0:training_size]
    testing_sentences = sentences[training_size:]
    training_labels = labels[0:training_size]
    testing_labels = labels[training_size:]
    # Convert everything to numpy arrays for use with the network later.
    testing_labels_final = np.array(testing_labels)
    training_labels_final = np.array(training_labels)
    training_sentences_final = np.array(training_sentences)
    testing_sentences_final = np.array(testing_sentences)
    return (
        training_sentences_final,
        testing_sentences_final,
        training_labels_final,
        testing_labels_final,
    )
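
# The expected layout of ./input/MatrixData.tsv is a tab-separated file with
# (at least) the "label" and "message" columns used above. The column order
# here is an assumption for illustration:
#
#   label<TAB>message
#   ham<TAB>Are you coming to the party tonight?
#   spam<TAB>You have won a million dollars! ...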

def train_hyperparameters(
    training_sentences_final,
    testing_sentences_final,
    training_labels_final,
    testing_labels_final,
    tuner,
):
    stop_early = tf.keras.callbacks.EarlyStopping(monitor="val_loss", patience=3)
    tuner.search(
        training_sentences_final,
        training_labels_final,
        epochs=5,
        verbose=1,
        validation_data=(testing_sentences_final, testing_labels_final),
        callbacks=[hypertuner_tensorboard_callback, stop_early, progress_bar],
    )
    # Get the optimal hyperparameters.
    best_hps = tuner.get_best_hyperparameters(num_trials=1)[0]
    print(
        f"""
The hyperparameter search is complete. The optimal number of units in the first densely-connected
layer is {best_hps.get('dense1')} and the optimal learning rate for the optimizer is {best_hps.get('learning_rate')}.
The optimal dropout rate is {best_hps.get('dropout')} and the optimal l2 rate is {best_hps.get('l2')}.
"""
    )
    return best_hps

def train_model(
    training_sentences_final,
    testing_sentences_final,
    training_labels_final,
    testing_labels_final,
    best_hps,
    tuner,
):
    num_epochs = 500
    # Build a model with the best hyperparameters and train it long enough to
    # find the epoch with the highest validation accuracy.
    model = tuner.hypermodel.build(best_hps)
    history = model.fit(
        training_sentences_final,
        training_labels_final,
        epochs=num_epochs,
        verbose=1,
        validation_data=(testing_sentences_final, testing_labels_final),
        callbacks=[tensorboard_callback, progress_bar],
    )
    val_acc_per_epoch = history.history["val_accuracy"]
    best_epoch = val_acc_per_epoch.index(max(val_acc_per_epoch)) + 1
    print("Best epoch: %d" % (best_epoch,))
    print("Average train loss: ", np.average(history.history["loss"]))
    print("Average test loss: ", np.average(history.history["val_loss"]))
    # Retrain a fresh hypermodel from scratch for exactly the best number of epochs.
    hypermodel = tuner.hypermodel.build(best_hps)
    hypermodel_history = hypermodel.fit(
        training_sentences_final,
        training_labels_final,
        verbose=1,
        epochs=best_epoch,
        validation_data=(testing_sentences_final, testing_labels_final),
        callbacks=[
            hypermodel_tensorboard_callback,
            # tf.keras.callbacks.ModelCheckpoint(
            #     filepath=checkpoint_prefix, save_weights_only=True
            # ),
            progress_bar,
        ],
    )
    print(
        "Average train loss (hypermodel_history): ",
        np.average(hypermodel_history.history["loss"]),
    )
    print(
        "Average test loss (hypermodel_history): ",
        np.average(hypermodel_history.history["val_loss"]),
    )
    return hypermodel
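
# Design note: the two-phase scheme above trains once to locate the epoch
# with the best validation accuracy, then retrains a fresh model for exactly
# that many epochs. A hedged alternative sketch would be a single run with
# tf.keras.callbacks.EarlyStopping(monitor="val_accuracy", patience=10,
# restore_best_weights=True), which keeps the weights from the best epoch
# without a second training run.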

def test_model(model):
    # Use the model to predict whether a message is spam.
    text_messages = [
        "Greg, can you call me back once you get this?",
        "Congrats on your new iPhone! Click here to claim your prize...",
        "Really like that new photo of you",
        "Did you hear the news today? Terrible what has happened...",
        "Attend this free COVID webinar today: Book your session now...",
        "Are you coming to the party tonight?",
        "Your parcel has gone missing",
        "Do not forget to bring friends!",
        "You have won a million dollars! Fill out your bank details here...",
        "Looking forward to seeing you again",
        "oh wow https://github.com/MGCodesandStats/tensorflow-nlp/blob/master/spam%20detection%20tensorflow%20v2.ipynb works really good on spam detection. Guess I go with that as the base model then lol :D",
        "ayo",
        "Almost all my spam is coming to my non-gmail address actually",
        "Oh neat I think I found the sizing sweetspot for my data :D",
        "would never click on buttons in gmail :D always expecting there to be a bug in gmail that allows js to grab your google credentials :D XSS via email lol. I am too scared for touching spam in gmail",
        "back to cacophony ",
        "Room version 11 when",
        "skip 11 and go straight to 12",
        "100 events should clear out any events that might be causing a request to fail lol",
        "I'll help anyone interested on how to invest and earn $30k, $50k, $100k, $200k or more in just 72hours from the crypto market.But you will have to pay me my commission! when you receive your profit! if interested send me a direct message let's get started or via WhatsApp +1 (605) 9536801",
        "R-66Y/ircEtro: https://crt.sh/?id=8329576543 - crtshamindaeu: [Precertificate] Issued by R3; Valid from 2022-12-31 to 2023-03-31; Serial number 03fa470b78814f539d7b8074c4b8e0efd3a4",
    ]
    text_messages = list(map(str.lower, text_messages))
    # Expected labels for the messages above: True = spam, False = not spam.
    spam_no_spam = [
        False,
        True,
        False,
        False,
        True,
        False,
        False,
        False,
        True,
        False,
        False,
        False,
        False,
        False,
        False,
        False,
        False,
        False,
        False,
        True,
        False,
    ]
    # print(text_messages)
    # The closer the score is to 1, the more likely the message is spam.
    classes = model.predict(np.array(text_messages))
    correct = 0
    expected = len(spam_no_spam)
    for x in range(len(text_messages)):
        print(f'Message: "{text_messages[x]}"')
        print(f"Spam score (0 = ham, 1 = spam): {classes[x][0]:.5f}")
        spam = classes[x][0] >= 0.8
        if spam:
            print("Vote by AI: Spam")
        else:
            print("Vote by AI: Not Spam")
        if spam_no_spam[x] != spam:
            print("Model failed to predict correctly")
        else:
            correct = correct + 1
            print("Model predicted correctly")
        print("\n")
    print(f"{correct} out of {expected} are detected correctly\n")
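
# A hedged sketch of reloading the exported model for inference, assuming the
# TensorFlow SavedModel format that model.save() writes by default for
# subclassed models. Illustrative only; not called anywhere in this module.
def _example_load_and_predict(export_path):
    reloaded = tf.keras.models.load_model(export_path)
    return reloaded.predict(np.array(["click here to claim your prize"]))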

def main():
    tf.get_logger().setLevel("ERROR")
    print("TensorFlow version:", tf.__version__)
    print("[Step 1/6] Loading data")
    (
        training_sentences_final,
        testing_sentences_final,
        training_labels_final,
        testing_labels_final,
    ) = load_data()
    print("[Step 2/6] Building hypermodel and tuner")
    model = SpamDectionHyperModel()
    tuner = kt.Hyperband(
        model,
        objective="val_accuracy",
        max_epochs=100,
        directory="hyper_tuning",
        project_name="spam-keras",
    )
    print("[Step 3/6] Tuning hypervalues")
    best_hps = train_hyperparameters(
        training_sentences_final,
        testing_sentences_final,
        training_labels_final,
        testing_labels_final,
        tuner,
    )
    print("[Step 4/6] Training model")
    model = train_model(
        training_sentences_final,
        testing_sentences_final,
        training_labels_final,
        testing_labels_final,
        best_hps,
        tuner,
    )
    print("[Step 5/6] Saving model")
    export_path = f"./models/spam_keras_{time.time()}"
    print("Exporting trained model to", export_path)
    model.save(export_path)
    print("[Step 6/6] Testing model")
    test_model(model)


if __name__ == "__main__":
    main()