i'm building an ml service that detects emotions in text. my motivation for this project is to learn about distributed ways of training and serving ml models. i'm going to use ray to distribute workloads across nodes. ray is used by top tech companies to train large language models, and it abstracts away a lot of the complexity for us.
let's create a cluster of machines so we can scale workloads effortlessly. the cluster has a head node that manages the cluster and several worker nodes that execute the workloads. we can then implement auto-scaling based on our application's compute needs.
i'm going to create the cluster by defining a compute configuration and an environment.
i'm using a macbook air for this project, but you can use any os, including a cloud platform. i'm using pyenv to create virtual environments and switch between python versions easily. to create a cluster on the cloud, you'll need a yaml file with all the configuration: a base image, environment variables, etc.
pyenv install 3.10.11 # install
pyenv global 3.10.11 # set default
once pyenv is installed, create a virtual environment and install the dependencies.
mkdir detect-emotions
cd detect-emotions
python3 -m venv venv # create virtual environment
source venv/bin/activate
python3 -m pip install --upgrade pip setuptools wheel
now define the compute configuration in cluster_compute.yaml, which specifies the hardware dependencies for workload execution. if you're using a cloud platform like aws, define configurations such as region, instance_type, min_workers, max_workers, etc.
i'm doing this on my laptop, so one cpu acts as the head node and some of the remaining cpus act as worker nodes.
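for reference, a cloud compute config might look something like this. a hypothetical sketch for aws; the exact keys depend on the platform you're using, so treat the names below as placeholders.

```yaml
# cluster_compute.yaml -- hypothetical aws sketch, adjust to your platform
region: us-east-1
head_node:
  instance_type: m5.2xlarge
worker_nodes:
  - instance_type: m5.2xlarge
    min_workers: 0
    max_workers: 4   # auto-scale up to 4 workers as load grows
```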
create a github repo and clone it.
export GITHUB_USERNAME="aniket-mish"
git clone https://github.com/aniket-mish/detect-emotions.git .
git remote set-url origin https://github.com/$GITHUB_USERNAME/detect-emotions.git
git checkout -b dev
export PYTHONPATH=$PYTHONPATH:$PWD
next, install the necessary packages from the requirements.txt file.
python3 -m pip install -r requirements.txt
the recommendation is to use pre-commit, which keeps your syntax, jsons, yamls, and credentials in check.
pre-commit install
pre-commit autoupdate
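a minimal .pre-commit-config.yaml might look like the following. a sketch using the standard pre-commit-hooks repo; the rev gets pinned for you by `pre-commit autoupdate`.

```yaml
# .pre-commit-config.yaml -- a minimal sketch
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.4.0  # pinned by `pre-commit autoupdate`
    hooks:
      - id: check-json
      - id: check-yaml
      - id: detect-private-key   # keeps credentials out of commits
      - id: trailing-whitespace
```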
start experimenting in a jupyter notebook.
jupyter lab notebooks/emotions.ipynb
check that ray is installed and initialize it.
import ray
# initialize Ray
if ray.is_initialized():
ray.shutdown()
ray.init()
view cluster resources.
ray.cluster_resources()
this is a typical setup for pipelines with mature mlops practices.
i'm downloading the dataset from huggingface.
from datasets import load_dataset
hf_dataset = load_dataset("dair-ai/emotion")
the dataset already has train, validation and test sets.
DatasetDict({
train: Dataset({
features: ['text', 'label'],
num_rows: 16000
})
validation: Dataset({
features: ['text', 'label'],
num_rows: 2000
})
test: Dataset({
features: ['text', 'label'],
num_rows: 2000
})
})
data exploration is kinda the first step of every ml project. so convert the dataset to a pandas dataframe.
hf_dataset.set_format("pandas")
train_df = hf_dataset["train"][:]
count the number of data points for each emotion.
from collections import Counter
all_labels = Counter(train_df.label)
all_labels.most_common()
[('joy', 5362),
('sadness', 4666),
('anger', 2159),
('fear', 1937),
('love', 1304),
('surprise', 572)]
next, encode the text labels into indices and vice versa. i'm using SentenceTransformers to tokenize the text.
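the label/index mapping can be built with two small dicts. a minimal sketch using the six labels above:

```python
# map text labels to indices and back
classes = ["sadness", "joy", "love", "anger", "fear", "surprise"]
class_to_index = {label: i for i, label in enumerate(classes)}
index_to_class = {i: label for label, i in class_to_index.items()}

print(class_to_index["joy"])  # -> 1
print(index_to_class[1])      # -> "joy"
```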
from sentence_transformers import SentenceTransformer
model = SentenceTransformer("all-MiniLM-L6-v2")
embeddings = model.encode(train_df["text"].tolist())
we can wrap these steps into a preprocess function.
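a minimal sketch of such a preprocess function. the encode_fn parameter is a placeholder for the encoder's encode method (e.g. model.encode from above), and class_to_index is a hypothetical label mapping, not something the dataset provides directly:

```python
import numpy as np

# hypothetical label mapping; in practice derive it from the dataset's labels
class_to_index = {"sadness": 0, "joy": 1, "love": 2, "anger": 3, "fear": 4, "surprise": 5}

def preprocess(batch, encode_fn):
    """turn a batch (dict of lists) into model-ready arrays.

    encode_fn is the text encoder, e.g. SentenceTransformer("all-MiniLM-L6-v2").encode
    """
    return {
        "embeddings": np.asarray(encode_fn(list(batch["text"]))),
        "label": np.array([class_to_index[label] for label in batch["label"]]),
    }
```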
this dataset can fit on a single machine, but the datasets used to train llms are huge and can't fit on a single machine, so we need to distribute our data processing ops across machines. we use ray data and load the data in a streaming fashion. you can read the data from various sources.
import ray.data
from datasets import load_dataset
hf_ds = load_dataset("dair-ai/emotion")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)
data processing in ray can be done using map_batches(), which helps preprocess data in batches.
train_ds = ray_ds.map_batches(preprocess)
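to build intuition for what map_batches() does, here's a simplified pure-python sketch. ray actually runs the batches in parallel across the cluster and streams them; this only shows the batching semantics:

```python
def map_batches_sketch(rows, fn, batch_size=2):
    """apply fn to fixed-size batches of rows and concatenate the results."""
    out = []
    for i in range(0, len(rows), batch_size):
        out.extend(fn(rows[i:i + batch_size]))
    return out

# example: uppercase texts two at a time
result = map_batches_sketch(["joy", "fear", "love"], lambda batch: [t.upper() for t in batch])
print(result)  # -> ['JOY', 'FEAR', 'LOVE']
```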