WIP: Created a wrapper for Keras.
* Predict is working with the default VGG16 and ResNet50 models.
* TODO: Test and add all the other models supported by Keras.
* TODO: Implement training functionality.
* TODO: Implement predicting from trained models provided by the user or trained in Gramex.
radheyakale committed Jun 24, 2022
1 parent d97c32e commit fe2c85e
Showing 2 changed files with 89 additions and 76 deletions.
94 changes: 26 additions & 68 deletions gramex/handlers/mlhandler.py
@@ -58,9 +58,6 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, **kwargs):
         config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler',
                              slugify(cls.name))
         cls.store = ml.ModelStore(config_dir)
-        cls.is_cv_request = False
-        if 'cv_model' in config_dir:
-            cls.is_cv_request = True
 
         cls.template = template
         super(MLHandler, cls).setup(**kwargs)
@@ -98,15 +95,9 @@ def setup(cls, data=None, model={}, config_dir='', template=DEFAULT_TEMPLATE, **kwargs):
         model_params = model.get('params', {})
         cls.store.dump('class', mclass)
         cls.store.dump('params', model_params)
-        if cls.is_cv_request:
-            pass
-        elif hasattr(cls.store, 'model_path') and op.exists(cls.store.model_path):
-            # If the pkl exists, load it
-            if op.isdir(cls.store.model_path):
-                mclass, wrapper = ml.search_modelclass(mclass)
-                cls.model = locate(wrapper).from_disk(mclass, cls.store.model_path)
-            else:
-                cls.model = get_model(cls.store.model_path, {})
+        # If a saved model directory exists, rebuild the model from it
+        if op.isdir(cls.store.model_path):
+            cls.model = get_model(mclass, model_params)
         elif data is not None:
             data = cls._filtercols(data)
             data = cls._filterrows(data)
@@ -190,38 +181,9 @@ def _transform(self, data, **kwargs):
         return data
 
     def _predict(self, data=None, score_col=''):
-        if self.is_cv_request:
-            from tensorflow.keras.applications.resnet50 import ResNet50
-            from tensorflow.keras.preprocessing import image
-            from tensorflow.keras.models import load_model
-            from tensorflow.keras.applications.resnet50 import preprocess_input, decode_predictions
-
-            config_dir = op.join(gramex.config.variables['GRAMEXDATA'], 'apps', 'mlhandler',
-                                 slugify(self.name))
-            if op.exists(config_dir) and 'keras_metadata.pb' in os.listdir(config_dir):
-                model = load_model(config_dir)
-            else:
-                model = ResNet50(include_top=True,
-                                 weights="imagenet",
-                                 input_tensor=None,
-                                 input_shape=None,
-                                 pooling=None,
-                                 classes=1000)
-            x = image.img_to_array(data)
-            x = np.expand_dims(x, axis=0)
-            x = preprocess_input(x)
-
-            preds = model.predict(x)
-            # decode the results into a list of tuples (class, description, probability)
-            # (one such list for each sample in the batch)
-            try:
-                results = decode_predictions(preds)
-            except Exception:
-                class_names = []
-                class_names = json.load(open(op.join(config_dir, 'class_names.json')))
-                results = dict(zip(class_names, preds[0]))
-            return results
-
+        if isinstance(data, np.ndarray):
+            data = self.model.predict(data=data, mclass=self.store.load('class'))
+            return data
         metric = self.get_argument('_metric', False)
         if metric:
             scorer = get_scorer(metric)
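With the Keras-specific code gone, `_predict` now only branches on the input type. A sketch of that dispatch in isolation (handler plumbing replaced by plain arguments; `'class'` holds the model-class name that `setup()` dumped into the store — the tabular fall-through shown here is an assumption):

```python
import numpy as np

def dispatch_predict(model, store, data):
    """Sketch of the ndarray branch above, outside the handler."""
    if isinstance(data, np.ndarray):
        # Image input: hand off to the KerasApplications wrapper,
        # passing the class name (e.g. 'ResNet50') saved at setup time.
        return model.predict(data=data, mclass=store.load('class'))
    # Tabular input continues into the _metric / scoring path.
    return model.predict(data)
```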
@@ -245,7 +207,7 @@ def _predict(self, data=None, score_col=''):
     def _check_model_path(self):
         try:
             klass, wrapper = ml.search_modelclass(self.store.load('class'))
-            if hasattr(self.store, 'model_path'):
+            if hasattr(self.store, 'model_path') and not op.isdir(self.store.model_path):
                 self.model = locate(wrapper).from_disk(self.store.model_path, klass=klass)
         except FileNotFoundError:
             raise HTTPError(NOT_FOUND, f'No model found at {self.store.model_path}')
@@ -277,8 +239,7 @@ def get(self, *path_args, **path_kwargs):
         elif '_cache' in self.args:
             self.write(self.store.load_data().to_json(orient='records'))
         else:
-            if not self.is_cv_request:
-                self._check_model_path()
+            self._check_model_path()
             if '_download' in self.args:
                 self.set_header('Content-Type', 'application/octet-stream')
                 self.set_header('Content-Disposition',
@@ -384,28 +345,25 @@ def _train_keras(self, data):
         return class_names
 
     def _train(self, data=None):
-        if self.is_cv_request:
-            result = self._train_keras(data)
-        else:
-            target_col = self.get_argument('target_col', self.store.load('target_col'))
-            index_col = self.get_argument('index_col', self.store.load('index_col'))
-            self.store.dump('target_col', target_col)
-            data = self._parse_data(False) if data is None else data
-            data = self._filtercols(data)
-            data = self._filterrows(data)
-            self.model = get_model(
-                self.store.load('class'), self.store.load('params'),
-                data=data, target_col=target_col,
-                nums=self.store.load('nums'), cats=self.store.load('cats')
-            )
-            if not isinstance(self.model, ml.SklearnTransformer):
-                target = data[target_col]
-                train = data[[c for c in data if c not in (target_col, index_col)]]
-                self.model.fit(train, target, self.store.model_path)
-                result = {'score': self.model.score(train, target)}
-            else:
-                self.model.fit(data, None, self.store.model_path)
-                result = self.model.get_attributes()
+        target_col = self.get_argument('target_col', self.store.load('target_col'))
+        index_col = self.get_argument('index_col', self.store.load('index_col'))
+        self.store.dump('target_col', target_col)
+        data = self._parse_data(False) if data is None else data
+        data = self._filtercols(data)
+        data = self._filterrows(data)
+        self.model = get_model(
+            self.store.load('class'), self.store.load('params'),
+            data=data, target_col=target_col,
+            nums=self.store.load('nums'), cats=self.store.load('cats')
+        )
+        if not isinstance(self.model, ml.SklearnTransformer):
+            target = data[target_col]
+            train = data[[c for c in data if c not in (target_col, index_col)]]
+            self.model.fit(train, target, self.store.model_path)
+            result = {'score': self.model.score(train, target)}
+        else:
+            self.model.fit(data, None, self.store.model_path)
+            result = self.model.get_attributes()
         return result
 
     def _retrain(self):
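The `_train` path above is the pre-existing tabular flow, now un-nested since the Keras branch moved out. A toy run under assumed `gramex.ml_api` semantics: `get_model` builds a wrapped estimator, `fit` persists to the given path, and the class name `LogisticRegression` is assumed to be registered in the elided part of `SEARCH_MODULES` (the import path of `get_model` is also an assumption):

```python
import pandas as pd
from gramex.ml_api import get_model

# Tiny separable dataset; 'y' is the target column.
df = pd.DataFrame({
    'x1': [0, 1, 0, 1],
    'x2': [1, 1, 0, 0],
    'y':  [0, 1, 0, 1],
})
model = get_model('LogisticRegression', {},
                  data=df, target_col='y', nums=['x1', 'x2'], cats=[])
train = df[['x1', 'x2']]
model.fit(train, df['y'], '/tmp/toy-model.pkl')   # third arg: save path, as in _train
print(model.score(train, df['y']))                # accuracy-style score, e.g. 1.0
```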
71 changes: 63 additions & 8 deletions gramex/ml_api.py
@@ -46,6 +46,10 @@
"statsmodels.tsa.statespace.sarimax",
],
"gramex.ml_api.HFTransformer": ["gramex.transformers"],
"gramex.ml_api.KerasApplications": [
"tensorflow.keras.applications.vgg16",
"tensorflow.keras.applications.resnet50"
]
}


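Presumably `search_modelclass` walks this registry to map a bare class name to both the concrete class and its wrapper's dotted path. A sketch of the resolution the two new entries enable (expected values inferred from the mapping, not captured output):

```python
from gramex.ml_api import search_modelclass

# 'ResNet50' lives in tensorflow.keras.applications.resnet50, which is now
# listed under the KerasApplications wrapper above.
mclass, wrapper = search_modelclass('ResNet50')
print(mclass.__module__)  # expected: the keras/tensorflow resnet50 module
print(wrapper)            # expected: 'gramex.ml_api.KerasApplications'
```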
@@ -203,8 +207,12 @@ class ModelStore(cache.JSONStore):

     def __init__(self, path, *args, **kwargs):
         _mkdir(path)
-        self.data_store = op.join(path, "data.h5")
-        self.model_path = op.join(path, op.basename(path) + ".pkl")
+        if op.exists(op.join(path, "data.h5")):
+            self.data_store = op.join(path, "data.h5")
+            self.model_path = op.join(path, op.basename(path) + ".pkl")
+        else:
+            self.data_store = path
+            self.model_path = path
         self.path = path
         super(ModelStore, self).__init__(op.join(path, "config.json"), *args, **kwargs)
 
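The branch above lets the store serve both the existing tabular layout (a `data.h5` plus a sibling `.pkl`) and a bare directory such as a Keras SavedModel, which is why callers now check `op.isdir(store.model_path)`. A small sketch of the resulting paths (a temporary directory stands in for `$GRAMEXDATA/apps/mlhandler/<name>`):

```python
import os.path as op
import tempfile

from gramex.ml_api import ModelStore

root = tempfile.mkdtemp()            # stand-in for $GRAMEXDATA/apps/mlhandler/<name>
store = ModelStore(root)             # no data.h5 yet: directory-backed store
assert store.model_path == root and op.isdir(store.model_path)

open(op.join(root, 'data.h5'), 'w').close()   # simulate an existing tabular store
store = ModelStore(root)
assert store.data_store == op.join(root, 'data.h5')
assert store.model_path == op.join(root, op.basename(root) + '.pkl')
```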
@@ -397,19 +405,20 @@ def _predict(self, X, **kwargs):


 class HFTransformer(SklearnModel):
-    @classmethod
-    def from_disk(cls, path, klass):
-        # Load model from disk
-        model = op.join(path, "model")
-        tokenizer = op.join(path, "tokenizer")
-        return cls(klass(model, tokenizer))
-
     def __init__(self, model, params=None, data=None, **kwargs):
         self.model = model
         if params is None:
             params = {"text_col": "text", "target_col": "label"}
         self.params = params
         self.kwargs = kwargs
 
+    @classmethod
+    def from_disk(cls, path, klass):
+        # Load model from disk
+        model = op.join(path, "model")
+        tokenizer = op.join(path, "tokenizer")
+        return cls(klass(model, tokenizer))
+
     def fit(
         self,
         X: Union[pd.DataFrame, np.ndarray],
@@ -426,3 +435,49 @@ def _predict(
     ):
         text = X["text"]
         return self.model.predict(text)
+
+
+class KerasApplications(AbstractModel):
+    def __init__(self, model, params=None, data=None, **kwargs):
+        self.model = model
+        if params is None:
+            params = {}
+        self.params = params
+        self.kwargs = kwargs
+
+    @classmethod
+    def from_disk(cls, path, klass):
+        # Load model from disk
+        # TODO (WIP): deserialize a saved Keras model from `path`;
+        # this stub returns the class itself, not an instance.
+        return cls
+
+    def predict(self, data=None, **kwargs):
+        from tensorflow.keras.preprocessing import image
+
+        mclass, wrapper = search_modelclass(kwargs['mclass'])
+        module_imp = __import__(mclass.__module__, fromlist=SEARCH_MODULES[wrapper])
+        model = mclass(include_top=True,
+                       weights="imagenet",
+                       input_tensor=None,
+                       input_shape=None,
+                       pooling=None,
+                       classes=1000)
+        x = image.img_to_array(data)
+        x = np.expand_dims(x, axis=0)
+        x = module_imp.preprocess_input(x)
+
+        preds = model.predict(x)
+        # decode the results into a list of tuples (class, description, probability)
+        results = module_imp.decode_predictions(preds)
+        return results
+
+    def fit(self, *args, **kwargs):
+        super().fit(*args, **kwargs)
+
+    def get_params(self, **kwargs):
+        super().get_params(**kwargs)
+
+    def score(self, X, y_true, **kwargs):
+        super().score(X, y_true, **kwargs)
+
+    def get_attributes(self):
+        super().get_attributes()

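Taken together, a minimal smoke test of the new wrapper as `_predict` would invoke it. This is a sketch, not part of the commit: it assumes a local `cat.jpg`, the standard 224×224 ResNet50 input size, and network access to download the ImageNet weights on first use.

```python
from tensorflow.keras.preprocessing import image
from gramex.ml_api import KerasApplications

img = image.load_img('cat.jpg', target_size=(224, 224))  # ResNet50 input size
wrapper = KerasApplications(model=None)   # the Keras model is built inside predict()
results = wrapper.predict(data=img, mclass='ResNet50')
print(results[0][:3])  # top ImageNet (class_id, description, probability) tuples
```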