# coding=utf-8
# Copyright (c) 2020 Alibaba PAI team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import time
import pdb
import torch
from torch.utils.data import DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
try:
from torch.utils.tensorboard import SummaryWriter
except:
from tensorboardX import SummaryWriter
from .evaluator import Evaluator
from .optimizers import get_optimizer
from ..utils import exporter, io, get_dir_name, get_pretrain_model_path, get_cnn_vocab
from ..utils.logger import logger
from ..utils.statistics import Statistics
from ..utils import get_args
class Trainer(object):
    """Generic training driver.

    Wires together the model, optimizer, data loaders, evaluator,
    checkpoint resume logic and tensorboard, then runs the training loop
    via :meth:`train`.

    Args:
        model: the ``torch.nn.Module`` to train.
        train_dataset: training dataset; must expose ``batch_fn`` for collation.
        valid_dataset: validation dataset; must expose ``batch_fn`` and
            ``eval_metrics``.
        evaluator: optional ``Evaluator``; when ``None`` one is built from
            ``valid_dataset.eval_metrics``.
    """

    def __init__(self, model, train_dataset, valid_dataset, evaluator=None):
        self.cfg = get_args()
        self._model = None
        self._optimizer = None
        self._train_loader = None
        # Fixed typo: was "_valied_loader", a dead attribute never read;
        # set_data_loader() assigns the correctly spelled "_valid_loader".
        self._valid_loader = None
        self._start_epoch = 0
        self._start_global_step = 0
        self._start_time = time.time()
        self._current_loss = 0.
        self._eval_scores = None
        self._best_valid_score = float('-inf')
        self._current_epoch = self._start_epoch
        self.set_evaluator(evaluator, valid_dataset.eval_metrics)
        self.set_data_loader(train_dataset, valid_dataset, self.cfg)
        self.set_model_and_optimizer(model, self.cfg)
        self.resume_from_ckpt(self.model_module, self.cfg)
        self.set_tensorboard()
        # When resuming, fast-forward the step counter to the start of the
        # resumed epoch; train() skips the remaining already-seen batches.
        self._global_step = self._start_epoch * len(self._train_loader)
@property
def model_module(self):
if self._model is None:
return self._model
# space left for apex/deepspeed
return self._model.module if hasattr(self._model, 'module') else self._model
@property
def learning_rate(self):
return self._optimizer.get_current_lr()
[docs] def set_model_and_optimizer(self, model, cfg):
self._model = model.to(self.cfg.local_rank)
if self.cfg.n_gpu > 1:
self._model = torch.nn.parallel.DistributedDataParallel(
self._model, device_ids=[self.cfg.local_rank],
output_device=self.cfg.local_rank,
find_unused_parameters=True)
# Build Optimizer
self._optimizer = get_optimizer(optimizer_type="adam",
learning_rate=cfg.learning_rate,
warmup_proportion=cfg.warmup_proportion,
max_grad_norm=cfg.max_grad_norm,
named_parameters=list(self.model_module.named_parameters()),
gradient_accumulation_steps=cfg.gradient_accumulation_steps,
num_steps_per_epoch=len(self._train_loader),
epoch_num=cfg.epoch_num)
# space left for apex/deepspeed
[docs] def resume_from_ckpt(self, model_module, cfg):
if cfg.resume_from_checkpoint is None:
return
meta_file = cfg.resume_from_checkpoint + ".meta.bin"
model_file = cfg.resume_from_checkpoint + ".bin"
if "oss::" in cfg.resume_from_checkpoint:
local_file = "easytexminer_resume_pytorch_model.meta.bin"
io.download(model_file, local_file)
meta_file = local_file
local_file = "easytexminer_resume_pytorch_model.bin"
io.download(model_file, local_file)
model_file = local_file
with io.open(meta_file, "rb") as f:
meta_data = torch.load(f, map_location='cpu')
self._start_epoch = meta_data["epoch"]
self._start_global_step = meta_data["global_step"] + 1
self._optimizer.load_state_dict(meta_data['optimizer'])
logger.info("Resume from checkpoint {}".format(cfg.resume_from_checkpoint))
logger.info("Start epoch {}".format(self._start_epoch))
logger.info("Start step {}".format(self._start_global_step))
logger.info("Start learning rate {:.6f}".format(self._optimizer.get_current_lr()))
with io.open(model_file, "rb") as f:
model_module.load_state_dict(torch.load(f, map_location='cpu'))
logger.info("Resume checkpoint Done".format(cfg.resume_from_checkpoint))
[docs] def set_tensorboard(self):
cfg = self.cfg
if not cfg.is_master_node:
return
logger.info("=" * 10 + " Initializing Tensorboard " + "=" * 10)
if "oss://" in cfg.checkpoint_dir:
self.tensorboard = SummaryWriter(log_dir=os.path.join("./easytexminer_tensorboard"))
else:
self.tensorboard = SummaryWriter(log_dir=os.path.join(cfg.checkpoint_dir, "log"))
self.tensorboard.add_text(tag="config/training", text_string=str(self.cfg), global_step=0)
self.tensorboard.add_text(tag="config/model_arch",
text_string=self.model_module.arch, global_step=0)
[docs] def set_evaluator(self, evaluator=None, eval_metrics=None):
if evaluator is None:
self.evaluator = Evaluator(metrics=eval_metrics)
else:
self.evaluator = evaluator
[docs] def set_data_loader(self, train_dataset, valid_dataset, cfg):
if cfg.read_odps:
train_sampler = None
else:
train_sampler = RandomSampler if cfg.n_gpu <= 1 else DistributedSampler
self._train_loader = DataLoader(train_dataset,
sampler=train_sampler(train_dataset) if train_sampler else None,
batch_size=cfg.micro_batch_size,
collate_fn=train_dataset.batch_fn)
self._valid_loader = DataLoader(valid_dataset,
batch_size=cfg.micro_batch_size,
shuffle=False,
collate_fn=valid_dataset.batch_fn)
[docs] def log_train_infos(self):
cfg = self.cfg
logger.info("=" * 10 + " Training Start " + "=" * 10 + "\n")
logger.info(" Num of GPUs = %d", cfg.n_gpu)
n_tr_samples = len(self._train_loader.dataset) * cfg.n_gpu if cfg.read_odps else len(self._train_loader.dataset)
logger.info(" Num training examples = %d", n_tr_samples)
logger.info(" Num validation examples = %d", len(self._valid_loader.dataset))
logger.info(" Training batch size = %d",
cfg.micro_batch_size * cfg.n_gpu * cfg.gradient_accumulation_steps)
logger.info(" Evaluation batch size = %d", cfg.micro_batch_size)
total_training_steps = self._optimizer.total_training_steps
logger.info(" Total training steps = %d", total_training_steps)
logger.info(" Saving steps = %s", str(cfg.save_checkpoint_steps))
model_num_params = sum([p.nelement() for n, p in self.model_module.named_parameters()])
trainable_num_params = sum([p.nelement() for n, p in self.model_module.named_parameters() if p.requires_grad])
logger.info(" num model parameters = %s" % format(model_num_params, ","))
logger.info(" num trainable parameters = %s" % format(trainable_num_params, ","))
logger.info("\n")
logger.info("=" * 10 + " Model Arch " + "=" * 10)
logger.info(self.model_module.arch)
[docs] def before_epoch(self, _epoch):
cfg = self.cfg
self._current_epoch = _epoch
if cfg.n_gpu > 1:
torch.distributed.barrier()
self._model.train()
self._epoch_tr_loss = 0.0
self._epoch_n_tr_steps = 0.0
if cfg.is_master_node:
self._epoch_stats = Statistics(epoch_num=int(cfg.epoch_num),
total_training_steps=self._optimizer.total_training_steps)
[docs] def after_epoch(self):
pass
[docs] def before_iter(self):
pass
[docs] def optimizer_step(self):
self._optimizer.step()
self._optimizer.zero_grad()
[docs] def after_iter(self, _step, _epoch, loss_dict):
cfg = self.cfg
self.pred_loss = loss_dict["loss"].item()
self._epoch_tr_loss += self.pred_loss
self._epoch_n_tr_steps += 1
if (_step + 1) % cfg.gradient_accumulation_steps == 0:
self.optimizer_step()
self._global_step += 1
if not cfg.is_master_node:
return
self._epoch_stats.update(loss_dict)
if self._global_step == 0 or (self._global_step + 1) % cfg.logging_steps == 0:
self._epoch_stats.output(self._global_step + 1, _epoch, self.learning_rate)
self._epoch_stats.log_tensorboard(writer=self.tensorboard,
learning_rate=self.learning_rate,
current_loss=self.pred_loss,
global_step=self._global_step,
output_dir=os.path.join(cfg.checkpoint_dir, "log"))
if cfg.save_checkpoint_steps and (self._global_step + 1) % cfg.save_checkpoint_steps == 0:
if cfg.save_all_checkpoints:
self.save_checkpoint()
self._eval_scores = self.evaluator.evaluate(
model=self.model_module, valid_loader=self._valid_loader)
if self._eval_scores[0][1] > self._best_valid_score:
logger.info("Saving best model to %s..." % os.path.join(cfg.checkpoint_dir, "pytorch_model.bin"))
self.save_checkpoint(save_best=True)
self._best_valid_score = self._eval_scores[0][1]
logger.info("Best score: {}".format(self._best_valid_score))
logger.info("Learning rate: {:.8f}".format(self._optimizer.get_current_lr()))
logger.info("")
self._epoch_stats.log_tensorboard(writer=self.tensorboard,
learning_rate=self.learning_rate,
eval_scores=self._eval_scores,
global_step=self._global_step,
is_training=False,
output_dir=os.path.join(cfg.checkpoint_dir, "log"))
[docs] def after_train(self):
cfg = self.cfg
# Save last checkpoint if needed
if not cfg.is_master_node:
return
if cfg.save_checkpoint_steps is None:
logger.info("Saving best model to %s..." % os.path.join(cfg.checkpoint_dir, "pytorch_model.bin"))
self.save_checkpoint(save_best=True)
else:
self._eval_scores = self.evaluator.evaluate(
model=self.model_module, valid_loader=self._valid_loader)
if self._eval_scores[0][1] > self._best_valid_score:
logger.info("Saving best model to %s..." % os.path.join(cfg.checkpoint_dir, "pytorch_model.bin"))
self.save_checkpoint(save_best=True)
self._best_valid_score = self._eval_scores[0][1]
logger.info("Best score: {}".format(self._best_valid_score))
self.tensorboard.close()
logger.info("Training Time: {}".format(time.time() - self._start_time))
[docs] def save_checkpoint(self, save_best=False):
if not self.cfg.is_master_node:
return
exporter.export_train_config(
saved_path=os.path.join(self.cfg.checkpoint_dir, "train_config.json"),
vocab_dir=get_dir_name(self.cfg.checkpoint_dir),
label_enumerate_values=self._valid_loader.dataset.label_enumerate_values,
model_name=self.model_module.model_name,
model_config=self.model_module.config,
cfg=self.cfg)
exporter.export_label_mapping(
saved_path=os.path.join(self.cfg.checkpoint_dir, "label_mapping.json"),
label_enumerate_values=self._valid_loader.dataset.label_enumerate_values)
# Save config.json
output_config_file = os.path.join(self.cfg.checkpoint_dir, "config.json")
with io.open(output_config_file, "w") as f:
f.write(self.model_module.arch)
# Save vocab.txt
if self.cfg.model_name.endswith("cnn"):
io.copy(get_cnn_vocab(self.cfg),
os.path.join(get_dir_name(self.cfg.checkpoint_dir), "vocab.txt"))
elif self.cfg.pretrained_model_name_or_path is not None:
io.copy(os.path.join(
get_dir_name(get_pretrain_model_path(
self.cfg.pretrained_model_name_or_path, disable_auto_download=True)), "vocab.txt"),
os.path.join(get_dir_name(self.cfg.checkpoint_dir), "vocab.txt"))
# Save the model
model_to_save_prefix = "pytorch_model" if save_best else "pytorch_model_step_%d" % (self._global_step + 1)
with io.open(os.path.join(self.cfg.checkpoint_dir, model_to_save_prefix + ".bin"), "wb") \
as output_model_file:
torch.save(self.model_module.state_dict(), output_model_file)
meta_data = {
"epoch": self._current_epoch,
"global_step": self._global_step,
"optimizer": self._optimizer.state_dict()
}
with io.open(os.path.join(self.cfg.checkpoint_dir, model_to_save_prefix + ".meta.bin"), "wb") \
as output_model_file:
torch.save(meta_data, output_model_file)
if not save_best:
return
if self.cfg.export_tf_checkpoint_type != "none" and hasattr(self.model_module, "model_name"):
# If the student is pre-defined EasyTransfer AppZoo model
# Save train_config.json, model.ckpt.* for EasyTransfer
logger.info("Export tensorflow checkpoint (%s format) to %s" % (
self.cfg.export_tf_checkpoint_type,
os.path.join(get_dir_name(self.cfg.checkpoint_dir), "model.ckpt")))
if self.cfg.export_tf_checkpoint_type == "easytransfer":
exporter.export_pytorch_checkpoint_to_tf(
model=self.model_module,
ckpt_dir=get_dir_name(self.cfg.checkpoint_dir),
bert_output_prefix="bert_pre_trained_model",
appended_val_map=(("classifier", "app/ez_dense"),),
appended_tensors_to_transpose=("classifier.weight",))
elif self.cfg.export_tf_checkpoint_type == "google":
exporter.export_pytorch_checkpoint_to_tf(
model=self.model_module,
ckpt_dir=get_dir_name(self.cfg.checkpoint_dir),
bert_output_prefix="",
appended_val_map=(("classifier.weight", "output_weights"),
("classifier.bias", "output_bias")),
appended_tensors_to_transpose=())
else:
raise RuntimeError("Invalid export_tf_checkpoint_type %s" % self.cfg.export_tf_checkpoint_type)
# This is a hack
torch.cuda.set_device(self.cfg.local_rank)
[docs] def train(self):
self.log_train_infos()
cfg = self.cfg
for _epoch in range(self._start_epoch, int(cfg.epoch_num)):
self.before_epoch(_epoch)
for _step, batch in enumerate(self._train_loader):
if self._global_step + 1 < self._start_global_step:
if (_step + 1) % cfg.gradient_accumulation_steps == 0:
self._global_step += 1
continue
self.before_iter()
batch = {key: val.to(cfg.local_rank) if isinstance(val, torch.Tensor) else val
for key, val in batch.items()}
model_outputs = self._model(batch)
loss_dict = self.model_module.compute_loss(model_outputs, batch)
_loss = loss_dict["loss"]
if cfg.n_gpu > 1:
_loss = _loss.mean()
if cfg.gradient_accumulation_steps > 1:
_loss = _loss / cfg.gradient_accumulation_steps
_loss.backward()
self.after_iter(_step, _epoch, loss_dict)
self.after_epoch()
self.after_train()