import torch
import polars as pl
from transformers import AutoTokenizer, AutoModelForTokenClassification
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.nn import CrossEntropyLoss  # used below to compute per-sample losses

import bio
from bio import entity_types
from data_preparation import columns, sample_file_name, clean_raw_text
from env import env
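# Build the BIO tag-to-id map from the configured entity types.
# "O" (outside any entity) is always id 0; each entity type gets a
# "B-" (begin) and an "I-" (inside) tag with consecutive ids.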
label_map = {"O": 0}
index = 1
for tag in set(entity_types.values()):
    label_map[f'B-{tag}'] = index
    index += 1
    label_map[f'I-{tag}'] = index
    index += 1
label_index = {v: k for k, v in label_map.items()}
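# For example (assuming entity_types maps to the single tag "BUILDING"):
# label_map == {"O": 0, "B-BUILDING": 1, "I-BUILDING": 2}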
# model_name = "distilbert-base-multilingual-cased"
# model_name = "hfl/chinese-roberta-wwm-ext"
# tokenizer = AutoTokenizer.from_pretrained(model_name)
# model = AutoModelForTokenClassification.from_pretrained(model_name, num_labels=len(label_map))  # "O" is already in label_map

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class PatentNERDataset(Dataset):
    """Character-level NER dataset: each sample pairs a text string with
    one BIO label per character."""

    def __init__(self, index, texts, labels, tokenizer, max_length=512):
        self.index = index
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        labels = self.labels[idx]
        encoding = self.tokenizer(text, truncation=True, padding="max_length",
                                  max_length=self.max_length, return_tensors="pt")
        input_ids = encoding["input_ids"].squeeze()
        attention_mask = encoding["attention_mask"].squeeze()
        # Convert BIO tags to ids, aligned with the tokenizer output:
        # position 0 is [CLS], so the character labels start at position 1.
        # Truncate to leave room for [CLS]/[SEP], and pad with -100 so that
        # special tokens and padding are ignored by the loss.
        label_ids = [label_map[label] for label in labels][: self.max_length - 2]
        label_ids = [-100] + label_ids + [-100]
        label_ids += [-100] * (self.max_length - len(label_ids))
        return {"input_ids": input_ids,
                "attention_mask": attention_mask,
                "labels": torch.tensor(label_ids),
                "text": text,
                "index": self.index[idx]}
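# Example label row (assuming a BERT-style tokenizer that emits one token per
# Chinese character): a two-character text with max_length=6 becomes
# [-100, B-x, I-x, -100, -100, -100], i.e. [CLS], the two characters, [SEP],
# then padding.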
def parse_entity(text, predict):
    """Decode per-character label ids into entity strings and their categories."""
    entities = []
    bios = []
    entity = []
    current_categ = None
    for word, tensor in zip(text, predict):
        label_id = tensor.item()
        # Ignored positions (-100) and any unknown ids are treated as 'O'
        label = label_index.get(label_id, 'O')
        # Close the running entity on 'O', on a fresh 'B-' tag, or on a category change
        if entity and (label == 'O' or label.startswith('B-') or label[2:] != current_categ):
            entities.append(''.join(entity))
            bios.append(current_categ)
            entity = []
            current_categ = None
        if label == 'O':
            continue
        current_categ = label[2:]
        entity.append(word)
    if entity:  # flush the trailing entity
        entities.append(''.join(entity))
        bios.append(current_categ)
    return entities, bios
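# Illustration (the tag name is hypothetical): if the ids decode to
# [B-BUILDING, I-BUILDING, O], parse_entity("高楼层", ids) returns
# (["高楼"], ["BUILDING"]).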
def train(model_name):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(model_name,
                                                            num_labels=len(label_map))  # "O" is already in label_map
    # Resume from a previously saved checkpoint instead:
    # tokenizer = AutoTokenizer.from_pretrained("building_ner_model_bert_wwm")
    # model = AutoModelForTokenClassification.from_pretrained("building_ner_model_bert_wwm",
    #                                                         num_labels=len(label_map))

    # Training data
    train_index, train_texts, train_labels = bio.get_bio()
    # Build the dataset
    dataset = PatentNERDataset(train_index, train_texts, train_labels, tokenizer)

    # Training setup
    model.to(device)
    optimizer = AdamW(model.parameters(), lr=5e-5)
    train_loader = DataLoader(dataset, batch_size=2, shuffle=True)

    # A separate loss instance to compute per-sample losses:
    # reduction='none' would return one loss per element without aggregation;
    # reduction='mean' is used here to average over all tokens of one sample,
    # and ignore_index=-100 skips special tokens and padding.
    loss_fct = CrossEntropyLoss(ignore_index=-100, reduction='mean')
    # Training loop
    for epoch in range(4):
        for batch in train_loader:
            optimizer.zero_grad()
            input_ids, attention_mask, labels = (batch["input_ids"].to(device),
                                                 batch["attention_mask"].to(device),
                                                 batch["labels"].to(device))
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            print(f"Epoch {epoch}, Loss: {loss.item()}")

            original_texts = batch["text"]
            original_bios = batch["labels"]
            # Model logits (prediction scores)
            logits = outputs.logits  # Shape: (batch_size, sequence_length, num_labels)

            # --- Compute each sample's loss individually ---
            sample_losses = []
            for i in range(logits.size(0)):  # iterate over the samples in the batch
                # Extract the logits and labels of a single sample
                sample_logits = logits[i]  # Shape: (sequence_length, num_labels)
                sample_labels = labels[i]  # Shape: (sequence_length,)
                # Guard against samples with no valid labels (avoids division by zero / NaN)
                if (sample_labels != -100).sum() > 0:
                    # CrossEntropyLoss expects (N, C) and (N,); here N=sequence_length, C=num_labels
                    individual_loss = loss_fct(sample_logits, sample_labels)
                    sample_losses.append((individual_loss.item(), i))
                else:
                    # No valid labels (all padding/special tokens): report zero loss
                    sample_losses.append((0.0, i))

            # --- Sort by loss value, highest first ---
            sample_losses.sort(key=lambda x: x[0], reverse=True)

            # --- Print the highest-loss samples ---
            for individual_loss_value, sample_idx in sample_losses:
                # Optionally print only samples above a threshold, or just the top N
                # if individual_loss_value > 0.1:
                print(f"  - Sample Index in Batch: {batch['index'][sample_idx]}")
                print(f"    Individual Avg Loss: {individual_loss_value:.4f}")
                print(f"    Original Text: {original_texts[sample_idx]}")
                # print(f"    Original BIO : {' '.join(original_bios[sample_idx][:20])}...")
                # Optional: decode predictions vs. ground truth to inspect errors.
                # Drop position 0 ([CLS]) so token i lines up with text character i.
                with torch.no_grad():
                    pred_labels_ids = torch.argmax(logits[sample_idx], dim=-1)[1:]
                    pred_entities, _ = parse_entity(original_texts[sample_idx], pred_labels_ids)
                    true_entities, _ = parse_entity(original_texts[sample_idx], labels[sample_idx][1:])
                print(f"    Predicted entities: {' '.join(pred_entities)}")
                print(f"    True entities     : {' '.join(true_entities)}")
    # Save the fine-tuned model and tokenizer to the same output directory
    # model.save_pretrained("building_ner_model")
    # tokenizer.save_pretrained("building_ner_model")
    # model.save_pretrained("building_ner_model_bert_wwm")
    # tokenizer.save_pretrained("building_ner_model_bert_wwm")
    model.save_pretrained(env.resolve_output(f"{model_name}-building"))
    tokenizer.save_pretrained(env.resolve_output(f"{model_name}-building"))
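# Example call (mirrors the commented invocations in __main__ below):
# train('hfl/chinese-roberta-wwm-ext-large')
# The fine-tuned weights then live under env.resolve_output(f"{model_name}-building").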
def predict_test(model_name):
    # Load the fine-tuned model
    # model = AutoModelForTokenClassification.from_pretrained("building_ner_model")
    # tokenizer = AutoTokenizer.from_pretrained("building_ner_model")
    # model = AutoModelForTokenClassification.from_pretrained("building_ner_model_bert_wwm")
    # tokenizer = AutoTokenizer.from_pretrained("building_ner_model_bert_wwm")
    model = AutoModelForTokenClassification.from_pretrained(model_name)
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model.to(device)
    model.eval()
    # id2label = model.config.id2label

    # Inference function
    def predict_distilbert(text):
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=-1).squeeze().tolist()
        # Drop the [CLS] prediction so position i lines up with text character i
        predictions = predictions[1:]
        # Decode the predictions character by character
        # tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"].squeeze().tolist())
        tokens = list(text)
        results = []
        for token, pred in zip(tokens, predictions):
            label = label_index[pred]
            if label != "O":  # skip non-entity "O" labels
                results.append((label, token))
        # Print the recognized entities, separating them at each "B-" tag
        print(text)
        print("Recognized entity types and words:")
        for entity_type, entity in results:
            if entity_type.startswith("B-"):
                print(f" {entity}", end='')
            else:
                print(f"{entity}", end='')
        print("\n")
        return predictions
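    # Score every abstract in the sample file ('摘要' is the abstract column)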
    df = pl.read_csv(str(env.resolve_data(sample_file_name)), columns=columns, encoding="utf-8")
    for description in df['摘要']:
        description = clean_raw_text(description)
        predict_distilbert(description)
if __name__ == '__main__':
    # train('hfl/chinese-roberta-wwm-ext-large-building')
    # train('hfl/chinese-roberta-wwm-ext-large')
    predict_test('hfl/chinese-roberta-wwm-ext-large-building-building')
    # test()