Source code for jtyoui.neuralNetwork.paddle.ernie.run_msra

#!/usr/bin/python3.7
# -*- coding: utf-8 -*-
# @Time  : 2019/4/11 11:44
# @Author: Jtyoui@qq.com
from jtyoui.neuralNetwork.paddle.ernie.transformer_encoder import encoder, pre_process_layer
from jtyoui.neuralNetwork.paddle.ernie.vocab import vocal
import os
import numpy as np
from paddle import fluid  # pip install paddlepaddle==1.6.1

ERNIE_MODEL_PARAMETER = {
    "attention_probs_dropout_prob": 0.1,
    "hidden_act": "relu",
    "hidden_dropout_prob": 0.1,
    "hidden_size": 768,
    "initializer_range": 0.02,
    "max_position_embeddings": 513,
    "num_attention_heads": 12,
    "num_hidden_layers": 12,
    "type_vocab_size": 2,
    "vocab_size": 18000
}
ERNIE_LABEL_MAP = {
    "B-PER": 0,  # 人名
    "I-PER": 1,
    "B-ORG": 2,  # 机构名
    "I-ORG": 3,
    "B-LOC": 4,  # 地名
    "I-LOC": 5,
    "O": 6
}

# Change these to suit your own model (or pass new values to st() below)
model_path, config, label_map_config = None, ERNIE_MODEL_PARAMETER, ERNIE_LABEL_MAP
examples = ''


def pad_batch_data(insts, pad_idx=0, input_mask=False):
    """Pad every sequence in a batch to the length of the longest one.

    :param insts: list of token-id sequences
    :param pad_idx: id used to fill the padded positions
    :param input_mask: whether to also return a float mask (1 for real tokens, 0 for padding)
    """
    return_list = []
    max_len = max(len(inst) for inst in insts)
    inst_data = np.array([inst + [pad_idx] * (max_len - len(inst)) for inst in insts])
    return_list += [inst_data.astype("int64").reshape([-1, max_len, 1])]
    if input_mask:
        input_mask_data = np.array([[1] * len(inst) + [0] * (max_len - len(inst)) for inst in insts])
        input_mask_data = np.expand_dims(input_mask_data, axis=-1)
        return_list += [input_mask_data.astype("float32")]
    return return_list if len(return_list) > 1 else return_list[0]
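# Illustrative only (not part of the original module): for a toy batch of two
# sequences, pad_batch_data pads to the longest sequence and, when asked, also
# returns a float mask with 0.0 at the padded positions.
#
#   >>> ids, mask = pad_batch_data([[5, 6, 7], [8]], pad_idx=0, input_mask=True)
#   >>> ids.shape, ids.dtype
#   ((2, 3, 1), dtype('int64'))
#   >>> mask.shape, mask[1, :, 0].tolist()
#   ((2, 3, 1), [1.0, 0.0, 0.0])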
def prepare_batch_data(example):
    """Convert a raw text string into the five padded arrays the model reader expects."""
    examples = ''
    words = [1]  # leading special-token id
    for word in example:
        if word in vocal:
            words.append(vocal[word])
            examples += word
        else:
            words.append(2)  # fallback id for characters missing from the vocabulary
    padded_token_ids, input_mask = pad_batch_data([words], 0, True)
    padded_text_type_ids = pad_batch_data([[0] * len(words)])
    padded_position_ids = pad_batch_data([list(range(len(words)))])
    # Placeholder labels: inference does not read real labels, the array only has to exist.
    padded_label_ids = pad_batch_data([[8] * len(words)], len(label_map_config) - 1)
    return_list = [padded_token_ids, padded_text_type_ids, padded_position_ids, input_mask, padded_label_ids]
    return return_list, examples
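# Illustrative only: for an input of n characters that all appear in the vocabulary,
# `words` holds n + 1 ids (one leading special id), so every array in return_list
# comes back with shape (1, n + 1, 1).
#
#   >>> batch, text = prepare_batch_data('今天')   # assumes both characters are in vocal
#   >>> [a.shape for a in batch]
#   [(1, 3, 1), (1, 3, 1), (1, 3, 1), (1, 3, 1), (1, 3, 1)]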
def data_generator(input_str):
    """Wrap a single input string as a generator suitable for the py_reader."""

    def wrapper():
        global examples
        return_list, examples = prepare_batch_data(input_str)
        yield return_list

    return wrapper
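# Sketch (illustrative names only): the wrapper yields a single batch, which is what
# py_reader.decorate_tensor_provider expects in match() below.
#
#   >>> provider = data_generator('今天天气不错')
#   >>> batch = next(provider())    # the five padded arrays from prepare_batch_data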
def init_checkpoint(exe, init_checkpoint_path, main_program):
    """Load persistable variables from a checkpoint directory, skipping any variable
    that has no corresponding file on disk."""

    def existed(var):
        if not fluid.io.is_persistable(var):
            return False
        return os.path.exists(os.path.join(init_checkpoint_path, var.name))

    fluid.io.load_vars(exe, init_checkpoint_path, main_program=main_program, predicate=existed)
def evaluate(exe, program, reader, graph_vars):
    """Run the test program over the reader and collect the predicted label ids."""
    fetch_list = [graph_vars["labels"].name, graph_vars["infers"].name]
    total_number = None
    while True:
        reader.start()
        try:
            _, np_infers = exe.run(program=program, fetch_list=fetch_list)
            # Drop the first and last positions before collecting the predictions.
            total_number = [ls[0] for ls in np_infers[1:-1]]
        except Exception as e:
            assert 'There is no next data' in e.__str__(), Exception('Stopped abnormally.')
            reader.reset()
            break
    return total_number
def create_model():
    """Build the ERNIE sequence-labelling network and return its reader and output variables."""
    reader = fluid.layers.py_reader(capacity=50, shapes=[[-1, 256, 1]] * 5, lod_levels=[0] * 5,
                                    use_double_buffer=True, dtypes=['int64'] * 3 + ['float32', 'int64'])
    src_ids, sent_ids, pos_ids, input_mask, labels = fluid.layers.read_file(reader)
    # Turn the 0/1 padding mask into an additive attention bias: 0 for real tokens,
    # -10000 for padded positions, replicated once per attention head.
    self_attn_mask = fluid.layers.matmul(x=input_mask, y=input_mask, transpose_y=True)
    self_attn_mask = fluid.layers.scale(x=self_attn_mask, scale=10000.0, bias=-1.0, bias_after_scale=False)
    n_head_self_attn_mask = fluid.layers.stack(x=[self_attn_mask] * config['num_attention_heads'], axis=1)
    n_head_self_attn_mask.stop_gradient = True
    param_initializer = fluid.initializer.TruncatedNormal(scale=config['initializer_range'])
    # Word, position and sentence-type embeddings are summed before entering the encoder.
    emb_out = fluid.layers.embedding(
        input=src_ids,
        size=[config['vocab_size'], config['hidden_size']],
        dtype="float32",
        param_attr=fluid.ParamAttr(name="word_embedding", initializer=param_initializer),
        is_sparse=False)
    position_emb_out = fluid.layers.embedding(
        input=pos_ids,
        size=[config['max_position_embeddings'], config['hidden_size']],
        dtype="float32",
        param_attr=fluid.ParamAttr(name="pos_embedding", initializer=param_initializer))
    sent_emb_out = fluid.layers.embedding(
        sent_ids,
        size=[config['type_vocab_size'], config['hidden_size']],
        dtype="float32",
        param_attr=fluid.ParamAttr(name="sent_embedding", initializer=param_initializer))
    emb_out += position_emb_out + sent_emb_out
    emb_out = pre_process_layer(emb_out, 'nd', config['hidden_dropout_prob'], name='pre_encoder')
    enc_out = encoder(
        n_layer=config['num_hidden_layers'],
        enc_input=emb_out,
        attn_bias=n_head_self_attn_mask,
        n_head=config['num_attention_heads'],
        d_key=config['hidden_size'] // config['num_attention_heads'],
        d_value=config['hidden_size'] // config['num_attention_heads'],
        d_model=config['hidden_size'],
        d_inner_hid=config['hidden_size'] * 4,
        prepostprocess_dropout=config['hidden_dropout_prob'],
        attention_dropout=config['attention_probs_dropout_prob'],
        relu_dropout=0,
        hidden_act=config['hidden_act'],
        preprocess_cmd="",
        postprocess_cmd="dan",
        param_initializer=param_initializer,
        name='encoder')
    # Token-level classifier over the encoder output.
    log = fluid.layers.fc(
        input=enc_out,
        size=len(label_map_config),
        num_flatten_dims=2,
        param_attr=fluid.ParamAttr(name="cls_seq_label_out_w",
                                   initializer=fluid.initializer.TruncatedNormal(scale=0.02)),
        bias_attr=fluid.ParamAttr(name="cls_seq_label_out_b", initializer=fluid.initializer.Constant(0.)))
    ret_labels = fluid.layers.reshape(x=labels, shape=[-1, 1])
    ret_infers = fluid.layers.reshape(x=fluid.layers.argmax(log, axis=2), shape=[-1, 1])
    graph_vars = {"labels": ret_labels, "infers": ret_infers}
    for v in graph_vars.values():
        v.persistable = True
    return reader, graph_vars
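# Shape walk-through (illustrative, batch size b and sequence length s): src_ids,
# sent_ids and pos_ids enter as (b, s, 1) int64 tensors; each embedding lookup maps
# them to (b, s, hidden_size); the encoder keeps (b, s, hidden_size); the final fc
# projects to (b, s, len(label_map_config)); argmax plus reshape flattens the
# predictions to (b * s, 1), which is what evaluate() iterates over.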
def match(words, init_st: list):
    """Extract entities from text.

    :param words: the text to run entity extraction on
    :param init_st: the initialised state returned by st()
    :return: a list of label ids as configured in label_map_config
    """
    global examples
    examples = ''
    data = data_generator(words)
    init_st[2].decorate_tensor_provider(data)
    number = evaluate(*init_st)
    return number, examples
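# A hedged usage sketch (names are illustrative, not from the original module): the
# ids returned by match() can be mapped back to BIO tags by inverting the label map
# configured in st().
#
#   >>> state = st('D://model')                    # assumes a checkpoint at this path
#   >>> ids, text = match('我叫刘万光', state)
#   >>> id_to_tag = {v: k for k, v in ERNIE_LABEL_MAP.items()}
#   >>> [id_to_tag[int(i)] for i in ids]           # e.g. ['B-PER', 'I-PER', ...]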
def st(new_model_path=None, new_config=None, new_label_map_config=None) -> list:
    """Initialise the model; it only needs to be loaded once.

    :param new_model_path: path to the model checkpoint
    :param new_config: model configuration parameters
    :param new_label_map_config: entity label map
    """
    global model_path, config, label_map_config
    if new_model_path:
        model_path = new_model_path
    if new_config:
        config = new_config
    if new_label_map_config:
        label_map_config = new_label_map_config
    exe = fluid.Executor(fluid.CPUPlace())
    startup_program = fluid.Program()
    test_program = fluid.Program()
    with fluid.program_guard(test_program, startup_program):
        with fluid.unique_name.guard():
            test_reader, graph_vars = create_model()
    test_program = test_program.clone(for_test=True)
    exe.run(startup_program)
    init_checkpoint(exe, model_path, main_program=startup_program)
    return [exe, test_program, test_reader, graph_vars]
if __name__ == '__main__':
    # Default model parameters and label map; only the checkpoint path is supplied.
    ERNIE_MODEL_PATH = 'D://model'
    s = st(ERNIE_MODEL_PATH)
    print(match('我叫刘万光我是贵阳 市南明叇村永乐乡水塘村的村民', s))