1. Overview
This article covers the ROI network. The proposals generated by the RPN are fed into the ROI network, which classifies each proposal and further refines its bounding box.
The ROI network has three parts: ROI Align, the fully connected layers, and the prediction layer. ROI Align is an improved version of ROI pooling.
2. ROI Network Structure
2.1. ROI Align
The original Faster RCNN paper uses an ROI pooling layer to process the prediction feature map, but ROI pooling involves two quantization (rounding) steps: first, the proposal coordinates generated by the RPN are rounded to integers when projected onto the feature map; second, the rounded region is divided evenly into k×k bins and each bin's boundaries are rounded to integers again. After these two roundings, the pooled region no longer matches the originally regressed location, which introduces a noticeable loss in localization accuracy.
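As a concrete illustration of the first rounding (numbers made up for the example): with a feature-map stride of 16, a proposal edge at x1 = 10.4 on the input image maps to 10.4 / 16 = 0.65 on the feature map; ROI pooling rounds this to 0, an offset of 0.65 cells, i.e. roughly 10.4 pixels once projected back onto the input image. The second rounding, when the region is split into bins, adds a similar error on top, so small objects suffer the most.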
ROI Align removes the quantization entirely: it keeps floating-point coordinates and computes the pixel value at a fractional coordinate by bilinear interpolation.
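To make the bilinear interpolation step concrete, here is a minimal sketch (my own illustration, not torchvision's actual kernel, which lives in C++/CUDA) of sampling one floating-point coordinate from a 2-D feature map:

import torch

def bilinear_sample(feature, y, x):
    # feature: 2-D tensor (H, W); (y, x) is a float coordinate
    y0, x0 = int(y), int(x)  # top-left integer neighbor
    y1 = min(y0 + 1, feature.shape[0] - 1)
    x1 = min(x0 + 1, feature.shape[1] - 1)
    ly, lx = y - y0, x - x0  # fractional offsets
    # weighted sum of the four surrounding pixels
    return (feature[y0, x0] * (1 - ly) * (1 - lx)
            + feature[y0, x1] * (1 - ly) * lx
            + feature[y1, x0] * ly * (1 - lx)
            + feature[y1, x1] * ly * lx)

feat = torch.arange(16.0).reshape(4, 4)
print(bilinear_sample(feat, 1.5, 2.5))  # tensor(8.5000), the average of 6, 7, 10, 11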
One more issue remains: the original Faster RCNN predicts on a single feature map, but with an FPN backbone prediction happens on multiple feature maps, so the ROI Align layer must first work out which feature level each proposal should be pooled from. Let's look at the code first.
ROI Align is implemented in the class MultiScaleRoIAlign.
- self.featmap_names: the feature maps on which ROI Align is performed
- self.sampling_ratio: the number of sampling points, default 2; each of the 7×7 bins is sampled at a 2×2 grid of regularly spaced points, each point's value is computed by bilinear interpolation, and the sampled values are aggregated (torchvision's implementation averages them) to give the bin's output
- self.output_size: the grid size each proposal is divided into, default 7×7
- self.scales: the downsampling factor scale of each feature_map relative to the network input image
- self.map_levels: the feature_map each box is assigned to
class MultiScaleRoIAlign(nn.Module):
    __annotations__ = {
        'scales': Optional[List[float]],
        'map_levels': Optional[LevelMapper]
    }

    def __init__(self, featmap_names, output_size, sampling_ratio):
        super(MultiScaleRoIAlign, self).__init__()
        if isinstance(output_size, int):
            output_size = (output_size, output_size)
        self.featmap_names = featmap_names
        self.sampling_ratio = sampling_ratio
        self.output_size = tuple(output_size)
        self.scales = None
        self.map_levels = None
The forward function is shown below.
- x_filtered: the feature maps that participate in prediction
- num_levels: the number of participating feature maps; if it is 1, prediction happens directly on that single feature map
- self.convert_to_roi_format: concatenates the proposals of all images in the batch, prepending a column to each image's proposals that marks the image index
- self.setup_scales: computes self.scales, the downsampling factor scale of each feature_map relative to the network input image
- if the network uses only one feature map, roi_align is called on it directly
- levels: the feature level assigned to each proposal
- num_rois: the number of rois
- num_channels: the number of channels of the feature maps
- result: a tensor of shape (num_rois, num_channels, 7, 7), initialized to 0
- iterating over each feature level and its scale, idx_in_level holds the indices of the proposals assigned to that level, and rois_per_level their coordinates
- roi_align: performs roi align; the return value is the pooled output of the current feature level for its assigned proposals
- result: the per-level roi align outputs are written back into result
def forward(self, x, boxes, image_shapes):
    # type: (Dict[str, Tensor], List[Tensor], List[Tuple[int, int]])
    x_filtered = []
    for k, v in x.items():
        if k in self.featmap_names:
            x_filtered.append(v)
    num_levels = len(x_filtered)
    rois = self.convert_to_roi_format(boxes)
    if self.scales is None:
        self.setup_scales(x_filtered, image_shapes)
    scales = self.scales
    assert scales is not None
    if num_levels == 1:
        return roi_align(
            x_filtered[0], rois,
            output_size=self.output_size,
            spatial_scale=scales[0],
            sampling_ratio=self.sampling_ratio
        )
    mapper = self.map_levels
    assert mapper is not None
    levels = mapper(boxes)
    num_rois = len(rois)
    num_channels = x_filtered[0].shape[1]
    dtype, device = x_filtered[0].dtype, x_filtered[0].device
    result = torch.zeros(
        (num_rois, num_channels,) + self.output_size,
        dtype=dtype,
        device=device,
    )
    tracing_results = []
    for level, (per_level_feature, scale) in enumerate(zip(x_filtered, scales)):
        idx_in_level = torch.nonzero(levels == level).squeeze(1)
        rois_per_level = rois[idx_in_level]
        result_idx_in_level = roi_align(
            per_level_feature, rois_per_level,
            output_size=self.output_size,
            spatial_scale=scale,
            sampling_ratio=self.sampling_ratio)
        if torchvision._is_tracing():
            tracing_results.append(result_idx_in_level.to(dtype))
        else:
            result[idx_in_level] = result_idx_in_level
    if torchvision._is_tracing():
        result = _onnx_merge_levels(levels, tracing_results)
    return result
Next, look at the convert_to_roi_format function, which merges the proposals generated for a whole batch of images into one tensor and prepends a column holding the image index.
def convert_to_roi_format(self, boxes):
# type: (List[Tensor])
concat_boxes = torch.cat(boxes, dim=0)
device, dtype = concat_boxes.device, concat_boxes.dtype
ids = torch.cat(
[
torch.full_like(b[:, :1], i, dtype=dtype, layout=torch.strided, device=device)
for i, b in enumerate(boxes)
],
dim=0,
)
rois = torch.cat([ids, concat_boxes], dim=1)
return rois
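A quick hypothetical example of what this produces, with two images contributing two and one proposals respectively (the numbers are made up):

import torch

boxes = [torch.tensor([[10., 10., 100., 100.],
                       [20., 30.,  80.,  90.]]),  # image 0: two proposals
         torch.tensor([[ 5.,  5.,  50.,  60.]])]  # image 1: one proposal

concat_boxes = torch.cat(boxes, dim=0)
ids = torch.cat([torch.full_like(b[:, :1], i) for i, b in enumerate(boxes)], dim=0)
rois = torch.cat([ids, concat_boxes], dim=1)
print(rois)
# tensor([[  0.,  10.,  10., 100., 100.],
#         [  0.,  20.,  30.,  80.,  90.],
#         [  1.,   5.,   5.,  50.,  60.]])

The first column is exactly what roi_align uses to know which image in the batch each roi belongs to.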
Next, the function that produces the downsampling factor scale, infer_scale, which is called from setup_scales.
- For the four FPN levels, 2 ** round(log2(feature_size / original_size)) evaluates to [1/4, 1/8, 1/16, 1/32]. The round trip through log2 and round is not redundant: because of padding, feature_size / original_size is usually not an exact power of two, and rounding the exponent snaps the approximate ratio to the nearest 2 ** (-k), which is exactly the assumption stated in the code comment (see the small demo after the function).
def infer_scale(self, feature, original_size):
    # type: (Tensor, List[int])
    # assumption: the scale is of the form 2 ** (-k), with k integer
    size = feature.shape[-2:]
    possible_scales = torch.jit.annotate(List[float], [])
    for s1, s2 in zip(size, original_size):
        approx_scale = float(s1) / float(s2)
        scale = 2 ** float(torch.tensor(approx_scale).log2().round())
        possible_scales.append(scale)
    assert possible_scales[0] == possible_scales[1]
    return possible_scales[0]
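A small demo of the snapping. The input sizes below are made up: with padding, the feature size is rarely an exact division of the input size, and the rounding brings the ratio back to an exact power of two:

import torch

# feature map of height 200 produced from a padded input of height 803
approx_scale = 200.0 / 803.0  # 0.2490..., not exactly 1/4
scale = 2 ** float(torch.tensor(approx_scale).log2().round())
print(scale)  # 0.25, snapped to the nearest 2 ** (-k)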
Besides producing the downsampling factors, setup_scales also initializes self.map_levels, which determines the feature level each proposal is predicted on.
- scales: the downsampling factors
- lvl_min: derived from the scale of the highest-resolution feature map (scales[0])
- lvl_max: derived from the scale of the lowest-resolution feature map (scales[-1])
- self.map_levels: initialized by calling initLevelMapper with lvl_min and lvl_max; for the FPN scales above, lvl_min=2 and lvl_max=5
def setup_scales(self, features, image_shapes):
    # type: (List[Tensor], List[Tuple[int, int]])
    assert len(image_shapes) != 0
    max_x = 0
    max_y = 0
    for shape in image_shapes:
        max_x = max(shape[0], max_x)
        max_y = max(shape[1], max_y)
    original_input_shape = (max_x, max_y)
    scales = [self.infer_scale(feat, original_input_shape) for feat in features]
    # get the levels in the feature map by leveraging the fact that the network always
    # downsamples by a factor of 2 at each level.
    lvl_min = -torch.log2(torch.tensor(scales[0], dtype=torch.float32)).item()
    lvl_max = -torch.log2(torch.tensor(scales[-1], dtype=torch.float32)).item()
    self.scales = scales
    self.map_levels = initLevelMapper(int(lvl_min), int(lvl_max))
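A quick check of the level bounds, assuming the usual four FPN scales:

import torch

scales = [1 / 4, 1 / 8, 1 / 16, 1 / 32]  # P2..P5
lvl_min = -torch.log2(torch.tensor(scales[0], dtype=torch.float32)).item()
lvl_max = -torch.log2(torch.tensor(scales[-1], dtype=torch.float32)).item()
print(int(lvl_min), int(lvl_max))  # 2 5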
Next, let's see how a proposal is mapped to a particular feature level:
def initLevelMapper(k_min, k_max, canonical_scale=224, canonical_level=4, eps=1e-6):
# type: (int, int, int, int, float)
return LevelMapper(k_min, k_max, canonical_scale, canonical_level, eps)
The LevelMapper class is defined as follows:
- self.k_min: a constant derived from the scale of the highest-resolution level
- self.k_max: a constant derived from the scale of the lowest-resolution level
- self.s0: a constant, default 224
- self.lvl0: a constant, default 4
- self.eps: a small constant for numerical stability
class LevelMapper(object):
    def __init__(self, k_min, k_max, canonical_scale=224, canonical_level=4, eps=1e-6):
        # type: (int, int, int, int, float)
        self.k_min = k_min
        self.k_max = k_max
        self.s0 = canonical_scale
        self.lvl0 = canonical_level
        self.eps = eps
The forward (call) function of LevelMapper is defined as follows:
- a proposal is mapped to a feature level using Eqn. (1) from the FPN paper: k = floor(k0 + log2(sqrt(w*h) / 224))
- the resulting target_lvls are clamped between self.k_min and self.k_max with the clamp function
- finally target_lvls - self.k_min is returned, so the result indexes directly into the list of pooled feature levels
def __call__(self, boxlists):
    # type: (List[Tensor])
    """
    Arguments:
        boxlists (list[BoxList])
    """
    # Compute level ids
    s = torch.sqrt(torch.cat([box_area(boxlist) for boxlist in boxlists]))
    # Eqn.(1) in FPN paper
    target_lvls = torch.floor(self.lvl0 + torch.log2(s / self.s0) + torch.tensor(self.eps, dtype=s.dtype))
    target_lvls = torch.clamp(target_lvls, min=self.k_min, max=self.k_max)
    return (target_lvls.to(torch.int64) - self.k_min).to(torch.int64)
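A quick sanity check of the mapping with made-up box sizes (k_min=2, k_max=5 as derived above); the helper below just restates Eqn. (1):

import math

def level_for(w, h, k0=4, s0=224, k_min=2, k_max=5):
    k = math.floor(k0 + math.log2(math.sqrt(w * h) / s0))
    return max(k_min, min(k_max, k))  # clamp, as in the code above

print(level_for(224, 224))  # 4: the canonical box lands on the stride-16 level
print(level_for(112, 112))  # 3: a smaller box uses a higher-resolution level
print(level_for(448, 448))  # 5: a larger box uses a coarser level

The actual __call__ then subtracts k_min, so a return value of 0 indexes the first pooled feature map.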
That wraps up the MultiScaleRoIAlign part.
2.2. Fully Connected Layers
After the ROI Align layer, the pooled result is fed through two fully connected layers.
The first fully connected layer has input size num_channels × 7 × 7 and output size 1024; the second has input size 1024 and output size 1024 as well.
In the forward pass, each roi's features are first flattened along dimension 1 (every roi is classified and regressed independently), then passed through the two fully connected layers, yielding a 1024-dimensional feature vector.
import torch.nn as nn
import torch.nn.functional as F

class TwoMLPHead(nn.Module):
def __init__(self, in_channels, representation_size):
super(TwoMLPHead, self).__init__()
self.fc6 = nn.Linear(in_channels, representation_size)
self.fc7 = nn.Linear(representation_size, representation_size)
def forward(self, x):
x = x.flatten(start_dim=1)
x = F.relu(self.fc6(x))
x = F.relu(self.fc7(x))
return x
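A shape check with made-up sizes (256 channels, as in torchvision's FPN backbone):

import torch

head = TwoMLPHead(in_channels=256 * 7 * 7, representation_size=1024)
rois = torch.randn(512, 256, 7, 7)  # 512 sampled rois after ROI Align
out = head(rois)
print(out.shape)  # torch.Size([512, 1024])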
2.3. Prediction Layer
After the two fully connected layers produce the 1024-dimensional feature vector, it is fed into the prediction layer for class prediction and bounding box regression.
The classification head outputs num_classes values; the box regression head outputs num_classes × 4 values.
class FastRCNNPredictor(nn.Module):
def __init__(self, in_channels, num_classes):
super(FastRCNNPredictor, self).__init__()
self.cls_score = nn.Linear(in_channels, num_classes)
self.bbox_pred = nn.Linear(in_channels, num_classes * 4)
def forward(self, x):
if x.dim() == 4:
assert list(x.shape[2:]) == [1, 1]
x = x.flatten(start_dim=1)
scores = self.cls_score(x)
bbox_deltas = self.bbox_pred(x)
return scores, bbox_deltas
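And the corresponding output shapes, again with made-up numbers (91 classes, i.e. COCO plus background):

import torch

predictor = FastRCNNPredictor(in_channels=1024, num_classes=91)
features = torch.randn(512, 1024)  # output of TwoMLPHead
scores, bbox_deltas = predictor(features)
print(scores.shape)       # torch.Size([512, 91])
print(bbox_deltas.shape)  # torch.Size([512, 364]), i.e. 91 * 4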
3. ROI Network Implementation
The ROI network is assembled from the pieces above as follows.
Initialize the ROI Align module:
box_roi_pool = MultiScaleRoIAlign(
    featmap_names=['0', '1', '2', '3'],  # the feature maps to pool from
    output_size=[7, 7],
    sampling_ratio=2)
Initialize the fully connected layers:
resolution = box_roi_pool.output_size[0]  # defaults to 7
representation_size = 1024
box_head = TwoMLPHead(
out_channels * resolution ** 2,
representation_size
)
Initialize the prediction network:
representation_size = 1024
box_predictor = FastRCNNPredictor(
representation_size,
num_classes)
Create the full ROI network:
roi_heads = RoIHeads(
# box
box_roi_pool, box_head, box_predictor,
box_fg_iou_thresh, box_bg_iou_thresh,
box_batch_size_per_image, box_positive_fraction,
bbox_reg_weights,
box_score_thresh, box_nms_thresh, box_detections_per_img)
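For reference, these are the values that torchvision's FasterRCNN wrapper passes in by default (taken from the torchvision source; double-check against your version):

box_fg_iou_thresh = 0.5          # IoU above which a proposal counts as foreground
box_bg_iou_thresh = 0.5          # IoU below which a proposal counts as background
box_batch_size_per_image = 512   # rois sampled per image during training
box_positive_fraction = 0.25     # at most a quarter of the samples are positive
bbox_reg_weights = None          # RoIHeads falls back to (10., 10., 5., 5.)
box_score_thresh = 0.05          # drop predictions scoring below this
box_nms_thresh = 0.5             # per-class nms IoU threshold
box_detections_per_img = 100     # keep at most 100 detections per image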
Let's first look at the RoIHeads definition.
- fg_iou_thresh and bg_iou_thresh: the IoU thresholds used when collecting positive and negative samples
- batch_size_per_image and positive_fraction: the number of samples drawn per image, and the fraction of them that should be positive
- score_thresh: predictions whose score falls below this value are removed
- nms_thresh: the IoU threshold used for nms
- detection_per_img: at most this many predictions (sorted by score, default 100) are kept per image
class RoIHeads(torch.nn.Module):
    __annotations__ = {
        'box_coder': det_utils.BoxCoder,
        'proposal_matcher': det_utils.Matcher,
        'fg_bg_sampler': det_utils.BalancedPositiveNegativeSampler,
    }

    def __init__(self,
                 box_roi_pool, box_head, box_predictor,
                 # Faster R-CNN training
                 fg_iou_thresh, bg_iou_thresh,
                 batch_size_per_image, positive_fraction,
                 bbox_reg_weights,
                 # Faster R-CNN inference
                 score_thresh, nms_thresh, detection_per_img):
        super(RoIHeads, self).__init__()
        self.box_similarity = box_ops.box_iou
        # assign ground-truth boxes for each proposal
        self.proposal_matcher = det_utils.Matcher(
            fg_iou_thresh,  # 0.5
            bg_iou_thresh,  # 0.5
            allow_low_quality_matches=False)
        self.fg_bg_sampler = det_utils.BalancedPositiveNegativeSampler(
            batch_size_per_image,  # 512
            positive_fraction)     # 0.25
        if bbox_reg_weights is None:
            bbox_reg_weights = (10., 10., 5., 5.)
        self.box_coder = det_utils.BoxCoder(bbox_reg_weights)
        self.box_roi_pool = box_roi_pool
        self.box_head = box_head
        self.box_predictor = box_predictor
        self.score_thresh = score_thresh
        self.nms_thresh = nms_thresh
        self.detection_per_img = detection_per_img
The forward function is as follows:
- checks that targets have the correct types; in training mode, self.select_training_samples splits the proposals into positive and negative samples
- self.box_roi_pool performs ROI Align
- self.box_head applies the fully connected layers
- self.box_predictor produces the predictions
- fastrcnn_loss: in training mode, computes the losses
- self.postprocess_detections: in inference mode, maps the predictions back onto the original image to obtain the predicted classes and box locations
def forward(self, features, proposals, image_shapes, targets=None):
    # type: (Dict[str, Tensor], List[Tensor], List[Tuple[int, int]], Optional[List[Dict[str, Tensor]]])
    # check that the targets have the correct dtypes
    if targets is not None:
        for t in targets:
            floating_point_types = (torch.float, torch.double, torch.half)
            assert t["boxes"].dtype in floating_point_types, "target boxes must of float type"
            assert t["labels"].dtype == torch.int64, "target labels must of int64 type"
    if self.training:
        # sample positive/negative proposals and collect the matched gt labels
        # and box regression targets
        proposals, matched_idxs, labels, regression_targets = self.select_training_samples(proposals, targets)
    else:
        labels = None
        regression_targets = None
        matched_idxs = None
    # pass the sampled proposals through the roi pooling layer
    box_features = self.box_roi_pool(features, proposals, image_shapes)
    # two fully connected layers after roi pooling
    box_features = self.box_head(box_features)
    # predict class scores and box regression parameters
    class_logits, box_regression = self.box_predictor(box_features)
    result = torch.jit.annotate(List[Dict[str, torch.Tensor]], [])
    losses = {}
    if self.training:
        assert labels is not None and regression_targets is not None
        loss_classifier, loss_box_reg = fastrcnn_loss(
            class_logits, box_regression, labels, regression_targets)
        losses = {
            "loss_classifier": loss_classifier,
            "loss_box_reg": loss_box_reg
        }
    else:
        boxes, scores, labels = self.postprocess_detections(class_logits, box_regression, proposals, image_shapes)
        num_images = len(boxes)
        for i in range(num_images):
            result.append(
                {
                    "boxes": boxes[i],
                    "labels": labels[i],
                    "scores": scores[i],
                }
            )
    return result, losses
Next, look at the sampling function self.select_training_samples. The positive/negative sampling here is essentially the same as in the rpn network, only with different parameter values.
- self.add_gt_proposals: concatenates the gt boxes to the proposals
- self.assign_targets_to_proposals: for each image, matches every proposal to a gt box and labels the proposal positive or negative
- self.subsample: samples positives and negatives according to the configured count and fraction
- then, for each image, gathers the sampled proposals together with their class labels and matched gt boxes
- returns the processed proposals, the matched gt indices matched_idxs, the class labels labels of the sampled proposals, and their box regression targets regression_targets
def select_training_samples(self, proposals, targets):
    # type: (List[Tensor], Optional[List[Dict[str, Tensor]]])
    # check that the target data is not empty
    self.check_targets(targets)
    assert targets is not None
    dtype = proposals[0].dtype
    device = proposals[0].device
    gt_boxes = [t["boxes"].to(dtype) for t in targets]
    gt_labels = [t["labels"] for t in targets]
    # append ground-truth bboxes to proposals
    proposals = self.add_gt_proposals(proposals, gt_boxes)
    # get matching gt indices for each proposal,
    # labeling each proposal as positive or negative
    matched_idxs, labels = self.assign_targets_to_proposals(proposals, gt_boxes, gt_labels)
    # sample a fixed proportion of positive-negative proposals
    sampled_inds = self.subsample(labels)
    matched_gt_boxes = []
    num_images = len(proposals)
    # iterate over every image
    for img_id in range(num_images):
        # indices of the sampled positives/negatives for this image
        img_sampled_inds = sampled_inds[img_id]
        # the corresponding proposals
        proposals[img_id] = proposals[img_id][img_sampled_inds]
        # the corresponding class labels
        labels[img_id] = labels[img_id][img_sampled_inds]
        # the corresponding matched gt indices
        matched_idxs[img_id] = matched_idxs[img_id][img_sampled_inds]
        gt_boxes_in_image = gt_boxes[img_id]
        if gt_boxes_in_image.numel() == 0:
            gt_boxes_in_image = torch.zeros((1, 4), dtype=dtype, device=device)
        # the gt boxes matched to the sampled proposals
        matched_gt_boxes.append(gt_boxes_in_image[matched_idxs[img_id]])
    # compute the box regression targets from the gt boxes and proposals
    regression_targets = self.box_coder.encode(matched_gt_boxes, proposals)
    return proposals, matched_idxs, labels, regression_targets
Next, the loss function fastrcnn_loss.
- the classification loss is the multi-class cross-entropy
- the box regression loss is smooth_l1
def fastrcnn_loss(class_logits, box_regression, labels, regression_targets):
    # type: (Tensor, Tensor, List[Tensor], List[Tensor])
    labels = torch.cat(labels, dim=0)
    regression_targets = torch.cat(regression_targets, dim=0)
    # classification loss
    classification_loss = F.cross_entropy(class_logits, labels)
    # get indices that correspond to the regression targets for
    # the corresponding ground truth labels, to be used with
    # advanced indexing
    # indices of the samples whose label is greater than 0 (foreground)
    sampled_pos_inds_subset = torch.nonzero(labels > 0).squeeze(1)
    # class labels of those foreground samples
    labels_pos = labels[sampled_pos_inds_subset]
    # shape=[num_proposal, num_classes]
    N, num_classes = class_logits.shape
    box_regression = box_regression.reshape(N, -1, 4)
    # box regression loss
    box_loss = det_utils.smooth_l1_loss(
        # box regression parameters of each positive sample for its gt class
        box_regression[sampled_pos_inds_subset, labels_pos],
        regression_targets[sampled_pos_inds_subset],
        beta=1 / 9,
        size_average=False,
    ) / labels.numel()
    return classification_loss, box_loss
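The indexing box_regression[sampled_pos_inds_subset, labels_pos] is worth unpacking: after the reshape to (N, num_classes, 4) it selects, for each positive sample, the 4 regression parameters belonging to that sample's ground-truth class. A toy example with made-up numbers:

import torch

N, num_classes = 3, 5
box_regression = torch.arange(N * num_classes * 4, dtype=torch.float32).reshape(N, num_classes, 4)

sampled_pos_inds_subset = torch.tensor([0, 2])  # samples 0 and 2 are positive
labels_pos = torch.tensor([1, 4])               # their gt classes are 1 and 4

picked = box_regression[sampled_pos_inds_subset, labels_pos]
print(picked.shape)  # torch.Size([2, 4]): one 4-vector per positive sample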
In inference mode, postprocess_detections is called to post-process the predicted classes and boxes.
- self.box_coder.decode: computes the final bbox coordinates from the proposals and the predicted regression parameters
- F.softmax: applies softmax to the class predictions
- box_ops.clip_boxes_to_image: clips the predicted boxes, moving out-of-bounds coordinates onto the image border
- boxes = boxes[:, 1:], scores = scores[:, 1:], labels = labels[:, 1:]: removes all background predictions
- torch.nonzero(scores > self.score_thresh).squeeze(1): removes low-scoring predictions
- box_ops.remove_small_boxes: removes tiny boxes
- box_ops.batched_nms: applies nms, returning the survivors sorted by score
- keep[:self.detection_per_img]: keeps the top detection_per_img predictions by score
def postprocess_detections(self, class_logits, box_regression, proposals, image_shapes):
    # type: (Tensor, Tensor, List[Tensor], List[Tuple[int, int]])
    device = class_logits.device
    # number of predicted classes
    num_classes = class_logits.shape[-1]
    # number of predicted bboxes per image
    boxes_per_image = [boxes_in_image.shape[0] for boxes_in_image in proposals]
    # compute the final bbox coordinates from the proposals and regression parameters
    pred_boxes = self.box_coder.decode(box_regression, proposals)
    # softmax over the class predictions
    pred_scores = F.softmax(class_logits, -1)
    # split boxes and scores per image
    pred_boxes_list = pred_boxes.split(boxes_per_image, 0)
    pred_scores_list = pred_scores.split(boxes_per_image, 0)
    all_boxes = []
    all_scores = []
    all_labels = []
    # iterate over the predictions of every image
    for boxes, scores, image_shape in zip(pred_boxes_list, pred_scores_list, image_shapes):
        # clip the boxes so out-of-bounds coordinates lie on the image border
        boxes = box_ops.clip_boxes_to_image(boxes, image_shape)
        # create labels for each prediction
        labels = torch.arange(num_classes, device=device)
        labels = labels.view(1, -1).expand_as(scores)
        # remove predictions with the background label (index 0)
        boxes = boxes[:, 1:]
        scores = scores[:, 1:]
        labels = labels[:, 1:]
        # batch everything, by making every class prediction be a separate instance
        boxes = boxes.reshape(-1, 4)
        scores = scores.reshape(-1)
        labels = labels.reshape(-1)
        # remove low scoring boxes, self.score_thresh=0.05
        inds = torch.nonzero(scores > self.score_thresh).squeeze(1)
        boxes, scores, labels = boxes[inds], scores[inds], labels[inds]
        # remove empty (tiny) boxes
        keep = box_ops.remove_small_boxes(boxes, min_size=1e-2)
        boxes, scores, labels = boxes[keep], scores[keep], labels[keep]
        # non-maximum suppression, done independently per class;
        # the result is sorted by score in descending order
        keep = box_ops.batched_nms(boxes, scores, labels, self.nms_thresh)
        # keep only the topk scoring predictions
        keep = keep[:self.detection_per_img]
        boxes, scores, labels = boxes[keep], scores[keep], labels[keep]
        all_boxes.append(boxes)
        all_scores.append(scores)
        all_labels.append(labels)
    return all_boxes, all_scores, all_labels
And that concludes the ROI network.