ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Bertcrf实体识别

2022-07-21 16:34:45  阅读:201  来源: 互联网

标签:Bertcrf self 实体 batch len test import model 识别


作者:昆特Alex
链接:https://www.zhihu.com/question/455063660/answer/2570541435
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

import torch
import torch.nn as nn
from transformers import BertModel, BertConfig
from torchcrf import CRF
import os
class Bert_CRF(nn.Module):
    """BERT encoder followed by a linear projection and a CRF layer.

    Author's note (translated): adding a BiLSTM on top brought little benefit
    and made things slower, so the LSTM layer was left out.
    """

    def __init__(self, tag_to_ix, embedding_dim=768, hidden_dim=256):
        """Build the layers.

        Args:
            tag_to_ix: mapping from tag string to index; its length sets the
                number of CRF states.
            embedding_dim: input width of the linear projection (must match
                the encoder's hidden size).
            hidden_dim: stored on the instance but not used by any layer here.
        """
        super(Bert_CRF, self).__init__()
        self.tag_to_ix = tag_to_ix
        self.tagset_size = len(tag_to_ix)
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim
        self.bert = BertModel.from_pretrained("hfl/chinese-roberta-wwm-ext")
        self.dropout = nn.Dropout(p=0.1)
        self.linear = nn.Linear(embedding_dim, self.tagset_size)
        self.crf = CRF(self.tagset_size, batch_first=True)

    def _get_features(self, sentence, mask=None):
        """Return per-token emission scores for the CRF.

        Args:
            sentence: batch of token-id sequences fed to the encoder.
            mask: optional padding mask forwarded to the encoder as
                ``attention_mask`` so padded positions are not attended to.
                ``None`` keeps the encoder's default (attend to everything).
        """
        # The encoder runs under no_grad, so no gradients flow into it; only
        # the linear projection and the CRF receive gradients.
        with torch.no_grad():
            outputs = self.bert(sentence, attention_mask=mask)
        enc = outputs.last_hidden_state
        enc = self.dropout(enc)
        feats = self.linear(enc)
        return feats

    def forward(self, sentence, tags, mask, is_test=False):
        """Compute the training loss or decode tag sequences.

        Args:
            sentence: batch of token-id sequences.
            tags: gold tag indices; only used when ``is_test`` is False.
            mask: padding mask, used for both the encoder and the CRF.
            is_test: False returns the loss, True returns the decoded tags.

        Returns:
            The negated value of ``self.crf.forward(..., reduction='mean')``
            when ``is_test`` is False, otherwise the result of
            ``self.crf.decode``.
        """
        emissions = self._get_features(sentence, mask)
        if not is_test:  # Training / validation: return the loss
            loss = -self.crf.forward(emissions, tags, mask, reduction='mean')
            return loss
        # Testing: return the decoded sequences
        decode = self.crf.decode(emissions, mask)
        return decode


#工具类
作者:昆特Alex
链接:https://www.zhihu.com/question/455063660/answer/2570541435
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

import torch
from torch.utils.data import Dataset
from transformers import BertTokenizer
import pandas as pd
# Shared tokenizer; loaded once when the module is imported.
tokenizer = BertTokenizer.from_pretrained("hfl/chinese-roberta-wwm-ext")

# Text file listing every NER category (parsed as CSV, "label" column).
ner_type = pd.read_csv("model_data/bio_type.txt")
ners = ner_type["label"].tolist()

# Tag vocabulary: a B-/I- pair per category, then the special tags.
VOCAB = [prefix + n for n in ners for prefix in ("B-", "I-")]
VOCAB += ['<PAD>', '[CLS]', '[SEP]', "O"]

# Lookup tables in both directions; indices follow the order of VOCAB.
tag2idx = {tag: idx for idx, tag in enumerate(VOCAB)}
idx2tag = dict(enumerate(VOCAB))

# Upper bound on sentence length used when building NerDataset.
MAX_LEN = 256
class NerDataset(Dataset):
    """Sentence-level NER dataset built from a per-character table.

    The input table has one row per character, with a ``word`` column and a
    ``label`` column. Rows are grouped into sentences that end at the
    character '。'; each sentence is wrapped in '[CLS]' ... '[SEP]' and the
    tag sequence gets the same two markers.
    """

    def __init__(self, f_path, inference_df=None, max_len=None):
        """Read the table and split it into sentences.

        Args:
            f_path: CSV path, read only when ``inference_df`` is None.
            inference_df: optional ready-made DataFrame used instead of the file.
            max_len: maximum sentence length including '[CLS]', the closing
                '。' (if any) and '[SEP]'. Defaults to the module-level MAX_LEN.

        Raises:
            ValueError: if ``max_len`` is smaller than 3.
        """
        if max_len is None:
            max_len = MAX_LEN
        if max_len < 3:
            raise ValueError("max_len must be at least 3, got {}".format(max_len))
        self.max_len = max_len
        self.sents = []
        self.tags_li = []

        data = inference_df if inference_df is not None else pd.read_csv(f_path)
        tags = data["label"].to_list()
        words = data["word"].to_list()
        print("f_path is {} len_word is {}  len tag is {}".format(f_path, len(words), len(tags)))

        word, tag = [], []
        for char, t in zip(words, tags):
            if char != '。':
                word.append(char)
                tag.append(t)
                continue
            # Sentence boundary: keep the '。' and its tag as the last token.
            self._add_sentence(word, tag, [char], [t])
            word, tag = [], []

        # Trailing characters that were never closed by a '。'.
        if word:
            self._add_sentence(word, tag, [], [])

    def _add_sentence(self, word, tag, end_words, end_tags):
        """Store one sentence, truncating its body to fit in ``max_len``."""
        # Room left for body tokens after '[CLS]', '[SEP]' and the optional
        # terminator. The previous code sliced to MAX_LEN itself, which let
        # sentences grow to MAX_LEN + 3 tokens.
        budget = self.max_len - 2 - len(end_words)
        self.sents.append(['[CLS]'] + word[:budget] + end_words + ['[SEP]'])
        self.tags_li.append(['[CLS]'] + tag[:budget] + end_tags + ['[SEP]'])

    def __getitem__(self, idx):
        """Return ``(token_ids, label_ids, seqlen)`` for sentence ``idx``."""
        words, tags = self.sents[idx], self.tags_li[idx]
        token_ids = tokenizer.convert_tokens_to_ids(words)
        label_ids = [tag2idx[tag] for tag in tags]
        seqlen = len(label_ids)
        return token_ids, label_ids, seqlen

    def __len__(self):
        """Return the number of sentences."""
        return len(self.sents)

def PadBatch(batch):
    """Collate ``(token_ids, label_ids, seqlen)`` samples into padded tensors.

    Every sequence is right-padded with 0 up to the largest ``seqlen`` in the
    batch. The returned mask is True wherever the padded token id is > 0.

    Returns:
        ``(token_tensors, label_tensors, mask)``.
    """
    longest = max(sample[2] for sample in batch)

    padded_tokens = []
    padded_labels = []
    for sample in batch:
        token_ids, label_ids = sample[0], sample[1]
        padded_tokens.append(token_ids + [0] * (longest - len(token_ids)))
        padded_labels.append(label_ids + [0] * (longest - len(label_ids)))

    token_tensors = torch.LongTensor(padded_tokens)
    label_tensors = torch.LongTensor(padded_labels)
    mask = token_tensors.gt(0)
    return token_tensors, label_tensors, mask



#训练
作者:昆特Alex
链接:https://www.zhihu.com/question/455063660/answer/2570541435
来源:知乎
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils import data
import os
import warnings
import argparse
import numpy as np
from sklearn import metrics
from transformers import AdamW, get_linear_schedule_with_warmup
import pandas as pd
from models import Bert_CRF
from utils import NerDataset, PadBatch, VOCAB, tokenizer, tag2idx, idx2tag

def train(e, model, iterator, optimizer, scheduler, criterion, device):
    """Run one training epoch and print the mean loss over its batches.

    Args:
        e: epoch number, used only in the printed message.
        model: called as ``model(x, y, z)`` and expected to return the loss.
        iterator: yields ``(x, y, z)`` batches of tensors.
        optimizer: stepped once per batch, then its gradients are cleared.
        scheduler: stepped once per batch, right after the optimizer.
        criterion: accepted but not used by this function.
        device: target passed to ``.to()`` for every tensor of the batch.
    """
    model.train()
    running_loss = 0.0
    n_batches = 0
    for n_batches, batch in enumerate(iterator, start=1):
        x, y, z = (tensor.to(device) for tensor in batch)

        loss = model(x, y, z)
        running_loss += loss.item()

        loss.backward()
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

    print("Epoch: {}, Loss:{:.4f}".format(e, running_loss / n_batches))

def validate(e, model, iterator, device):
    """Evaluate the model on a validation set and print loss and accuracy.

    For each batch the model is called twice: once with ``is_test=True`` to
    get decoded tag sequences and once without it to get the loss. Accuracy
    is the percentage of positions (where the mask equals 1) whose decoded
    tag matches the gold tag.

    Returns:
        ``(model, mean_loss, accuracy_percent)``.
    """
    model.eval()
    gold_chunks = []
    predictions = []
    total_loss = 0
    n_batches = 0

    with torch.no_grad():
        for n_batches, batch in enumerate(iterator, start=1):
            x, y, z = (tensor.to(device) for tensor in batch)

            decoded = model(x, y, z, is_test=True)
            total_loss += model(x, y, z).item()

            # Flatten the decoded sequences into one running list.
            predictions.extend(tag for seq in decoded for tag in seq)
            # Keep only the gold labels at positions the mask marks as real.
            gold_chunks.append(torch.masked_select(y, z == 1).cpu())

    gold = torch.cat(gold_chunks, dim=0).numpy()
    predicted = np.array(predictions)
    acc = (predicted == gold).mean() * 100
    mean_loss = total_loss / n_batches
    print("Epoch: {}, Val Loss:{:.4f}, Val Acc:{:.3f}%".format(e, mean_loss, acc))
    return model, mean_loss, acc

def test(model, iterator, device):
    """Decode a test set and return gold and predicted tags as strings.

    Only ``x`` and the mask ``z`` are moved to ``device``; the labels ``y``
    stay where they are and are filtered with a CPU copy of the mask.

    Returns:
        ``(y_true, y_pred)``: two flat lists of tag strings looked up in
        ``idx2tag``, covering the positions where the mask equals 1.
    """
    model.eval()
    gold_chunks = []
    predicted = []

    with torch.no_grad():
        for batch in iterator:
            x, y, z = batch
            x, z = x.to(device), z.to(device)

            decoded = model(x, y, z, is_test=True)
            # Flatten the decoded sequences into one running list.
            predicted.extend(tag for seq in decoded for tag in seq)
            # Gold labels at the positions the mask marks as real.
            keep = (z == 1).cpu()
            gold_chunks.append(torch.masked_select(y, keep))

    gold = torch.cat(gold_chunks, dim=0).numpy()
    y_true = [idx2tag[i] for i in gold]
    y_pred = [idx2tag[i] for i in predicted]
    return y_true, y_pred

if __name__ == "__main__":
    # Entity categories used for the classification report; each category
    # contributes a B- and an I- tag.
    ner_type = pd.read_csv("model_data/type.txt")
    ners = ner_type["label"].tolist()
    labels = []
    for n in ners:
        labels.extend(["B-" + n, "I-" + n])
    print("all type len is {}".format(len(labels)))
    best_model = None
    best_state = None
    _best_val_loss = np.inf
    _best_val_acc = -np.inf

    parser = argparse.ArgumentParser()
    parser.add_argument("--batch_size", type=int, default=256)
    parser.add_argument("--lr", type=float, default=0.0005)
    parser.add_argument("--n_epochs", type=int, default=40)
    parser.add_argument("--trainset", type=str, default="model_data/train.csv")
    parser.add_argument("--validset", type=str, default="model_data/valid.csv")
    parser.add_argument("--testset", type=str, default="model_data/test.csv")
    ner = parser.parse_args()

    # `device` was previously used below without ever being defined. Use the
    # GPU when there is one and fall back to the CPU otherwise.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = Bert_CRF(tag2idx).to(device)
    print('Initial model Done.')

    train_dataset = NerDataset(ner.trainset)
    print("train data len is {}".format(len(train_dataset)))
    eval_dataset = NerDataset(ner.validset)
    print("validset data len is {}".format(len(eval_dataset)))
    test_dataset = NerDataset(ner.testset)
    print("test_dataset len is {}".format(len(test_dataset)))
    print('Load Data Done.')

    train_iter = data.DataLoader(dataset=train_dataset,
                                 batch_size=ner.batch_size,
                                 shuffle=True,
                                 num_workers=4,
                                 collate_fn=PadBatch)

    eval_iter = data.DataLoader(dataset=eval_dataset,
                                batch_size=ner.batch_size,
                                shuffle=False,
                                num_workers=4,
                                collate_fn=PadBatch)

    test_iter = data.DataLoader(dataset=test_dataset,
                                batch_size=ner.batch_size,
                                shuffle=False,
                                num_workers=4,
                                collate_fn=PadBatch)

    optimizer = AdamW(model.parameters(), lr=ner.lr, eps=1e-6)
    # One scheduler step per batch: ceil(len(dataset) / batch_size) batches
    # per epoch, which is what len(train_iter) reports.
    total_steps = len(train_iter) * ner.n_epochs
    warm_up_ratio = 0.1  # warm up over the first 10% of the steps
    scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=warm_up_ratio * total_steps, num_training_steps=total_steps)
    criterion = nn.CrossEntropyLoss(ignore_index=0)  # passed to train(), which does not use it

    for epoch in range(1, ner.n_epochs + 1):
        train(epoch, model, train_iter, optimizer, scheduler, criterion, device)
        candidate_model, loss, acc = validate(epoch, model, eval_iter, device)
        if loss < _best_val_loss and acc > _best_val_acc:
            # validate() returns the live model object, so keeping a reference
            # would always end up pointing at the last epoch. Snapshot the
            # weights (on the CPU) instead.
            best_state = {name: tensor.detach().cpu().clone()
                          for name, tensor in candidate_model.state_dict().items()}
            _best_val_loss = loss
            _best_val_acc = acc

    # Restore the best snapshot; if none was taken (e.g. n_epochs == 0) the
    # model is evaluated as it is.
    if best_state is not None:
        model.load_state_dict(best_state)
    best_model = model

    y_test, y_pred = test(best_model, test_iter, device)
    print(metrics.classification_report(y_test, y_pred, labels=labels, digits=3))
    os.makedirs("checkpoint", exist_ok=True)
    torch.save(best_model.state_dict(), "checkpoint/0704_ner.pt")

    # NOTE(review): this file is hard-coded and is not tied to --testset; the
    # column assignments below need one prediction per row of it — confirm
    # that both refer to the same data.
    test_data = pd.read_csv("model_data/0704_bio_test.csv")
    y_test_useful = []
    y_pred_useful = []
    for a, b in zip(y_test, y_pred):
        # Drop the positions of the sentence markers added by NerDataset.
        if a not in ['[CLS]', '[SEP]']:
            y_test_useful.append(a)
            y_pred_useful.append(b)
    test_data["labeled"] = y_test_useful
    test_data["pred"] = y_pred_useful
    os.makedirs("result_files", exist_ok=True)
    test_data.to_csv("result_files/bio_test_result.csv", index=False)

 

标签:Bertcrf,self,实体,batch,len,test,import,model,识别
来源: https://www.cnblogs.com/qiaoqifa/p/16502373.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有