tinyengine/code_generator/TfliteConvertor.py
Wei-Ming Chen 9696f47c2a
Refactor code gen(#55)
* refactor conv2d

* upsample

* upsample

* nchw

* minor

* nhwc

* add

* fix hwc

* avgpool

* avgpool

* maxpool

* fc layer

* mean1d

* SE element

* fix mean1d

* fix fc

* fix utils

* clean up

* minor

* minor
2023-02-15 22:33:43 -05:00

188 lines
7.3 KiB
Python

# ----------------------------------------------------------------------
# Project: TinyEngine
# Title: TfliteConvertor.py
#
# Reference papers:
# - MCUNet: Tiny Deep Learning on IoT Device, NeurIPS 2020
# - MCUNetV2: Memory-Efficient Patch-based Inference for Tiny Deep Learning, NeurIPS 2021
# - MCUNetV3: On-Device Training Under 256KB Memory, NeurIPS 2022
# Contact authors:
# - Wei-Ming Chen, wmchen@mit.edu
# - Wei-Chen Wang, wweichen@mit.edu
# - Ji Lin, jilin@mit.edu
# - Ligeng Zhu, ligeng@mit.edu
# - Song Han, songhan@mit.edu
#
# Target ISA: ARMv7E-M
# ----------------------------------------------------------------------
import logging
import code_generator.converters.tflite_parser as TF_Parser
from code_generator.converters.tflite_parser.mean1dto2d import MEAN2D
from code_generator.converters.tflite_parser.utils import get_input_tensors, get_output_tensors, getOpCodeStr
from .constant import SKIP_OPs
from .tflite import Model
# Parse tflite model into TinyEngine IR format
class TfliteConvertor(object):
def __init__(self, filepath):
# path to the tflite file
self.filepath = filepath
self.model = self.loadTFmodel(filepath)
self.subgraph = self.model.Subgraphs(0)
self.layer = []
self.tmpPADIndice = None
self.skip_transpose = None
self.average_1D_to_2D_holder = MEAN2D() # For merging 1D to 2D
# public functions
def loadTFmodel(self, filepath):
buf = open(filepath, "rb").read()
return Model.Model.GetRootAsModel(buf, 0)
def dumpModelInfo(self):
version = self.model.Version()
print("Model version:", version)
description = self.model.Description().decode("utf-8")
print("Description:", description)
subgraph_len = self.model.SubgraphsLength()
print("Subgraph length:", subgraph_len)
self.dumpLayerInfo()
def dumpLayerInfo(self):
print("Layer length:", len(self.layer))
# print brief info about each layer
for i, layer in enumerate(self.layer):
if self.layer[i]["op"] == "ADD":
print(
"op:",
layer["op"],
",input_idx:",
layer["input_idx"],
",input2_idx:",
layer["input2_idx"],
"output_idx:",
layer["output_idx"],
)
else:
print(
"op:",
layer["op"],
",input_idx:",
layer["input_idx"],
"output_idx:",
layer["output_idx"],
)
def parseOperatorInfo(self):
operators_len = self.subgraph.OperatorsLength()
skip_next_ops = 0
for i in range(operators_len):
if skip_next_ops > 0:
skip_next_ops -= 1
continue
op = self.subgraph.Operators(i)
if i + 2 < operators_len - 2:
next_op = self.subgraph.Operators(i + 1)
next_next_op = self.subgraph.Operators(i + 2)
three_op_sequence = [op, next_op, next_next_op]
if self.checkIfRequireSEelementmult(three_op_sequence):
logging.info("found SE block")
skip_next_ops = 2
# -> MEAN -> MEAN -> PWCONV -> PWCONV -> | ADD -> MUL -> |
# DWCONV | -> MUL |
# | SEelementmult |
SEelementmult_op = TF_Parser.parse_SEelement(three_op_sequence, self.model, self.layer)
self.layer.append(SEelementmult_op)
continue
# parse the op
self._handleOperator(op)
# handle one op and parse it into layers[] for supported operators
def _handleOperator(self, op):
op_code_str = getOpCodeStr(op, self.model)
if op_code_str == "CONV_2D":
self.layer.append(TF_Parser.parse_conv2d(op, self.model, self.tmpPADIndice))
self.tmpPADIndice = None
elif op_code_str == "ADD":
self.layer.append(TF_Parser.parse_add(op, self.model))
elif op_code_str == "AVERAGE_POOL_2D":
self.layer.append(TF_Parser.parse_avgpool(op, self.model))
elif op_code_str == "DEPTHWISE_CONV_2D":
self.layer.append(TF_Parser.parse_conv2d(op, self.model, self.tmpPADIndice))
self.tmpPADIndice = None
elif op_code_str == "PAD":
self._convert_PAD(op)
elif op_code_str == "RESIZE_NEAREST_NEIGHBOR":
self.layer.append(TF_Parser.parse_upsample(op, self.model))
elif op_code_str == "MAX_POOL_2D":
self.layer.append(TF_Parser.parse_maxpool(op, self.model))
elif op_code_str in "MEAN":
ret_op = TF_Parser.parse_mead1dto2d(op, self.model, self.average_1D_to_2D_holder)
if ret_op is not None:
# TODO: This only handle a specific graph: TRANSPOSE -> MEAN -> MEANS
if self.skip_transpose is not None:
ret_op.params["input_idx"] = self.skip_transpose.input_idx
ret_op.input_tensors[0].graph_idx = self.skip_transpose.input_idx
self.layer.append(ret_op)
elif op_code_str == "TRANSPOSE":
self._convert_TRANSPOSE(op)
elif op_code_str in "FULLY_CONNECTED":
self.layer.append(TF_Parser.parse_fc(op, self.model))
elif op_code_str in SKIP_OPs:
pass
else:
raise NotImplementedError(f"Unsupported {op_code_str}")
# -> MEAN -> MEAN -> PWCONV -> PWCONV -> | ADD -> MUL -> |
# DWCONV | -> MUL |
# | Fuse Target |
def checkIfRequireSEelementmult(self, three_op_sequence):
if (
getOpCodeStr(three_op_sequence[0], self.model) == "ADD"
and getOpCodeStr(three_op_sequence[1], self.model) == "MUL"
and getOpCodeStr(three_op_sequence[2], self.model) == "MUL"
):
return True
return False
def _convert_PAD(self, op):
# get input, weight, and output tensors
input_tensors = get_input_tensors(op, self.model)
input_tensor = input_tensors[0]
output_tensors = get_output_tensors(op, self.model)
assert len(output_tensors) == 1, "output tensors length should be 1"
output_tensor = output_tensors[0]
# fuse pad into conv
self.tmpPADIndice = PAD_tensorIndice(input_tensor.tensor_idx, output_tensor.tensor_idx)
def _convert_TRANSPOSE(self, op):
# get input, weight, and output tensors
input_tensors = get_input_tensors(op, self.model)
input_tensor = input_tensors[0]
output_tensors = get_output_tensors(op, self.model)
assert len(output_tensors) == 1, "output tensors length should be 1"
output_tensor = output_tensors[0]
# fuse pad into conv
self.skip_transpose = PAD_tensorIndice(input_tensor.tensor_idx, output_tensor.tensor_idx)
class PAD_tensorIndice(object):
def __init__(self, input_idx, output_idx):
self.input_idx = input_idx
self.output_idx = output_idx