From 56141f5d9064ef2119d0f68b87567475fbce5cd7 Mon Sep 17 00:00:00 2001 From: pingbowen Date: Mon, 11 Nov 2024 03:10:47 +0000 Subject: [PATCH 1/6] support step-wise dpo --- llama3_8b_instruct_dpo.py | 247 ++++++++++++++++++ .../collate_fns/preference_collate_fn.py | 2 +- xtuner/dataset/preference_dataset.py | 123 +++++++-- xtuner/model/dpo.py | 6 + xtuner/utils/templates.py | 5 +- 5 files changed, 365 insertions(+), 18 deletions(-) create mode 100644 llama3_8b_instruct_dpo.py diff --git a/llama3_8b_instruct_dpo.py b/llama3_8b_instruct_dpo.py new file mode 100644 index 000000000..0f0b50534 --- /dev/null +++ b/llama3_8b_instruct_dpo.py @@ -0,0 +1,247 @@ +# Copyright (c) OpenMMLab. All rights reserved. +import torch +from datasets import load_dataset +from mmengine.dataset import DefaultSampler +from mmengine.hooks import (CheckpointHook, DistSamplerSeedHook, IterTimerHook, + LoggerHook, ParamSchedulerHook) +from mmengine.optim import AmpOptimWrapper, CosineAnnealingLR, LinearLR +from peft import LoraConfig +from torch.optim import AdamW +from transformers import (AutoModelForCausalLM, AutoTokenizer, + BitsAndBytesConfig) + +from xtuner.dataset.collate_fns.preference_collate_fn import \ + preference_collate_fn +from xtuner.dataset.preference_dataset import (build_preference_dataset, + orpo_dpo_mix_40k_map_fn,ultrafeedback_dpo_map_fn,load_jsonl_dataset) +from xtuner.engine.hooks import (DatasetInfoHook, EvaluateChatHook, + VarlenAttnArgsToMessageHubHook) +from xtuner.engine.runner import TrainLoop +from xtuner.model.dpo import DPO +from xtuner.parallel.sequence import SequenceParallelSampler +from xtuner.utils import PROMPT_TEMPLATE, SYSTEM_TEMPLATE + +####################################################################### +# PART 1 Settings # +####################################################################### +# Model +pretrained_model_name_or_path = '/home/pingbowen/models/Longwriter-8b' +use_varlen_attn = True +dpo_loss_type = 'sigmoid' # One of ['sigmoid', 'hinge', 'ipo', 'kto_pair', 'sppo_hard', 'nca_pair', 'robust'] # noqa: E501 +loss_beta = 0.1 +label_smoothing = 0.0 + +# Data +# prompt_template = PROMPT_TEMPLATE.llama3_chat +prompt_template = PROMPT_TEMPLATE.llama2_chat # llama2_chat +max_length = 32768 +max_packed_length = max_length + +# parallel +sequence_parallel_size = 2 + +# Scheduler & Optimizer +batch_size = 1 # per_device +accumulative_counts = 1 +accumulative_counts *= sequence_parallel_size +dataloader_num_workers = 0 +max_epochs = 2 +optim_type = AdamW +lr = 1e-6 # refer to alignment handbook +betas = (0.9, 0.999) +weight_decay = 0 +max_norm = 1 # grad clip +warmup_ratio = 0.1 # 0.03 + +# Save +save_steps = 250 +save_total_limit = -1 # Maximum checkpoints to keep (-1 means unlimited) + +# Evaluate the generation performance during the training +evaluation_freq = 500 +SYSTEM = SYSTEM_TEMPLATE.alpaca +evaluation_inputs = [ + 'What famous British author, known for his tales of mystery and the macabre, shares his initials with a common abbreviation for "rest in peace"?', # noqa: E501 + 'Please tell me five scenic spots in Shanghai', + '890729 - 425663? Only respond with math and no words.' 
+] + +####################################################################### +# PART 2 Model & Tokenizer # +####################################################################### +tokenizer = dict( + type=AutoTokenizer.from_pretrained, + pretrained_model_name_or_path=pretrained_model_name_or_path, + trust_remote_code=True, + padding_side='right') + +model = dict( + type=DPO, + loss_type=dpo_loss_type, + use_varlen_attn=use_varlen_attn, + beta=loss_beta, + label_smoothing=label_smoothing, + llm=dict( + type=AutoModelForCausalLM.from_pretrained, + pretrained_model_name_or_path=pretrained_model_name_or_path, + trust_remote_code=True, + torch_dtype=torch.bfloat16, + ), + ref_llm=dict( ##### initialization of ref_llm ####### + type=AutoModelForCausalLM.from_pretrained, + pretrained_model_name_or_path=pretrained_model_name_or_path, + trust_remote_code=True, + torch_dtype=torch.bfloat16, + ), +) + # llm=dict( + # type=AutoModelForCausalLM.from_pretrained, + # pretrained_model_name_or_path=pretrained_model_name_or_path, + # trust_remote_code=True, + # torch_dtype=torch.float16, + # quantization_config=dict( + # type=BitsAndBytesConfig, + # load_in_4bit=True, + # load_in_8bit=False, + # llm_int8_threshold=6.0, + # llm_int8_has_fp16_weight=False, + # bnb_4bit_compute_dtype=torch.float16, + # bnb_4bit_use_double_quant=True, + # bnb_4bit_quant_type='nf4')), + # lora=dict( + # type=LoraConfig, + # r=64, + # lora_alpha=16, + # lora_dropout=0.1, + # bias='none', + # task_type='CAUSAL_LM')) +####################################################################### +# PART 3 Dataset & Dataloader # +####################################################################### +sampler = SequenceParallelSampler \ + if sequence_parallel_size > 1 else DefaultSampler + +train_dataset = dict( + type=build_preference_dataset, + # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_16_32k_res_filter.jsonl"]), + dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/test_new.jsonl"]), + # dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k + tokenizer=tokenizer, + max_length=max_length, + dataset_map_fn=ultrafeedback_dpo_map_fn, + is_dpo=True, + is_reward=False, + reward_token_id=-1, + num_proc=32, + use_varlen_attn=use_varlen_attn, + max_packed_length=max_packed_length, + shuffle_before_pack=True, +) + +train_dataloader = dict( + batch_size=batch_size, + num_workers=dataloader_num_workers, + dataset=train_dataset, + sampler=dict(type=sampler, shuffle=True), + collate_fn=dict( + type=preference_collate_fn, use_varlen_attn=use_varlen_attn)) + +####################################################################### +# PART 4 Scheduler & Optimizer # +####################################################################### +# optimizer +optim_wrapper = dict( + type=AmpOptimWrapper, + optimizer=dict( + type=optim_type, lr=lr, betas=betas, weight_decay=weight_decay), + clip_grad=dict(max_norm=max_norm, error_if_nonfinite=False), + accumulative_counts=accumulative_counts, + loss_scale='dynamic', + dtype='bfloat16') + +# learning policy +# More information: 
https://github.com/open-mmlab/mmengine/blob/main/docs/en/tutorials/param_scheduler.md # noqa: E501 +param_scheduler = [ + dict( + type=LinearLR, + start_factor=1e-5, + by_epoch=True, + begin=0, + end=warmup_ratio * max_epochs, + convert_to_iter_based=True), + dict( + type=CosineAnnealingLR, + eta_min=0.0, + by_epoch=True, + begin=warmup_ratio * max_epochs, + end=max_epochs, + convert_to_iter_based=True) +] + +# train, val, test setting +train_cfg = dict(type=TrainLoop, max_epochs=max_epochs) + +####################################################################### +# PART 5 Runtime # +####################################################################### +# Log the dialogue periodically during the training process, optional + +custom_hooks = [ + dict(type=DatasetInfoHook, tokenizer=tokenizer), + # dict( + # type=EvaluateChatHook, + # tokenizer=tokenizer, + # every_n_iters=evaluation_freq, + # evaluation_inputs=evaluation_inputs, + # system=SYSTEM, + # prompt_template=prompt_template) +] + +if use_varlen_attn: + custom_hooks += [dict(type=VarlenAttnArgsToMessageHubHook)] + +# configure default hooks +default_hooks = dict( + # record the time of every iteration. + timer=dict(type=IterTimerHook), + # print log every 10 iterations. + logger=dict(type=LoggerHook, log_metric_by_epoch=False, interval=10), + # enable the parameter scheduler. + param_scheduler=dict(type=ParamSchedulerHook), + # save checkpoint per `save_steps`. + checkpoint=dict( + type=CheckpointHook, + by_epoch=False, + interval=save_steps, + max_keep_ckpts=save_total_limit), + # set sampler seed in distributed evrionment. + sampler_seed=dict(type=DistSamplerSeedHook), +) + +# configure environment +env_cfg = dict( + # whether to enable cudnn benchmark + cudnn_benchmark=False, + # set multi process parameters + mp_cfg=dict(mp_start_method='fork', opencv_num_threads=0), + # set distributed parameters + dist_cfg=dict(backend='nccl'), +) + +# set visualizer +visualizer = None + +# set log level +log_level = 'INFO' + +# load from which checkpoint +load_from = None + +# whether to resume training from the loaded checkpoint +resume = False + +# Defaults to use random seed and disable `deterministic` +randomness = dict(seed=42, deterministic=False) + +# set log processor +log_processor = dict(by_epoch=False) diff --git a/xtuner/dataset/collate_fns/preference_collate_fn.py b/xtuner/dataset/collate_fns/preference_collate_fn.py index 4b6a7f5c3..d9a10dcd8 100644 --- a/xtuner/dataset/collate_fns/preference_collate_fn.py +++ b/xtuner/dataset/collate_fns/preference_collate_fn.py @@ -69,7 +69,7 @@ def preference_collate_fn(instances: Sequence[Dict], bs, seq_len = input_ids.shape position_ids = torch.arange(seq_len).unsqueeze(0).long().repeat(bs, 1) - + if seq_parallel_world_size > 1: input_ids = pad_for_sequence_parallel(input_ids, pad_index) labels = pad_for_sequence_parallel(labels, IGNORE_INDEX) diff --git a/xtuner/dataset/preference_dataset.py b/xtuner/dataset/preference_dataset.py index 371ef8290..2cd9fd3a3 100644 --- a/xtuner/dataset/preference_dataset.py +++ b/xtuner/dataset/preference_dataset.py @@ -16,7 +16,6 @@ from mmengine.utils.misc import get_object_from_string from torch.utils.data import Dataset from transformers import AutoTokenizer - from xtuner.registry import BUILDER, MAP_FUNC from .huggingface import build_origin_dataset @@ -112,20 +111,58 @@ def tokenize(pair: str, max_length: int, is_reward: bool = False, reward_token_id: int = -1): - prompt = tokenizer.apply_chat_template( - pair['prompt'], tokenize=False, 
add_generation_prompt=True) - chosen = tokenizer.apply_chat_template( - pair['prompt'] + pair['chosen'], - tokenize=False, - add_generation_prompt=False) - rejected = tokenizer.apply_chat_template( - pair['prompt'] + pair['rejected'], - tokenize=False, - add_generation_prompt=False) + + # if tokenizer.chat_template is None: + # tokenizer.chat_template = ''' + # {%- for message in messages %} + # {%- if message['role'] == 'user' %} + # {{- '[INST]' + message['content'] + '[/INST]' }} + # {%- elif message['role'] == 'added_user' %} + # {{- message['content'] }} + # {%- elif message['role'] == 'system' %} + # {{- '<>\\n' + message['content'] + '\\n<>\\n\\n' }} + # {%- elif message['role'] == 'added_assistant' %} + # {{- " " + message['content']}} + # {%- elif message['role'] == 'assistant' %} + # {{- ' ' + message['content'] + ' ' + eos_token }} + # {%- endif %} + # {%- endfor %} + # ''' + + # prompt = tokenizer.apply_chat_template( + # pair['prompt'], tokenize=False, add_generation_prompt=True) + # chosen = tokenizer.apply_chat_template( + # pair['prompt'] + pair['chosen'], + # tokenize=False, + # add_generation_prompt=False) + # rejected = tokenizer.apply_chat_template( + # pair['prompt'] + pair['rejected'], + # tokenize=False, + # add_generation_prompt=False) + + def process_message(messages): + prompt = '' + for message in messages: + if message['role'] == 'user': + prompt += '[INST]' + message['content'] + '[/INST]' + elif message['role'] == 'added_user': + prompt += message['content'] + elif message['role'] =='system': + prompt += '<>\\n' + message['content'] + '\\n<>\\n\\n' + elif message['role'] == 'added_assistant': + prompt += " " + message['content'] + elif message['role'] == 'assistant': + prompt += ' ' + message['content'] + tokenizer.eos_token + return prompt + + prompt = pair['prompt'][0]['content'] + chosen = process_message(pair['prompt'] + pair['chosen']) + rejected = process_message(pair['prompt'] + pair['rejected']) + prompt_ids = tokenizer.encode(prompt, add_special_tokens=False) chosen_ids = tokenizer.encode(chosen, add_special_tokens=False) rejected_ids = tokenizer.encode(rejected, add_special_tokens=False) - + if len(chosen_ids) > max_length: chosen_ids = chosen_ids[:max_length] if len(rejected_ids) > max_length: @@ -150,6 +187,8 @@ def tokenize(pair: str, 'rejected_ids': rejected_ids, 'chosen_labels': chosen_labels, 'rejected_labels': rejected_labels, + 'group_id': pair.get('group_id', 0), + "seq_num": pair.get("seq_num", None) } @@ -176,7 +215,7 @@ def __init__( self.is_reward = is_reward self.reward_token_id = reward_token_id self.tokenized_pairs = [] - + for tokenized_pair in _multi_progress( partial( tokenize, @@ -209,13 +248,15 @@ def __init__(self, self.lengths = [] self.data = [] + dataset = self.post_process(dataset) + indices = np.arange(len(dataset)) if shuffle_before_pack: np.random.shuffle(indices) - data_bin = [] bin_seq_len = 0 removed = 0 + for idx in indices: data = dataset[int(idx)] cur_len = len(data['chosen_ids']) + len(data['rejected_ids']) @@ -249,13 +290,52 @@ def __init__(self, ' using var len attention.', logger='current') + def post_process(self, dataset): + + def merge_data(indices, dataset): + keys = ['chosen_ids', 'rejected_ids', 'chosen_labels', 'rejected_labels'] + merged_data = {key: [] for key in keys} + + for index in indices: + data = dataset[index] + for key in keys: + merged_data[key].extend(data[key]) + + return merged_data + + from collections import defaultdict + grouped_indices = defaultdict(list) + + # 遍历数据集并按 group_id 分组索引 + 
for index, item in enumerate(dataset): + grouped_indices[item["group_id"]].append((item["seq_num"], index)) + # 将 grouped_indices 转换为普通字典 + + for group_id in grouped_indices: + # 排序仅按 seq_num 排列,并提取 index + grouped_indices[group_id] = [index for seq_num, index in sorted(grouped_indices[group_id])] + + grouped_indices = dict(grouped_indices) + + selected_indices = {index for indices in grouped_indices.values() for index in indices} + merged_data = [merge_data(v, dataset) for k,v in grouped_indices.items()] + filtered_dataset = [item for index, item in enumerate(dataset) if index not in selected_indices] + + # if dist.get_rank() == 0: + # import pdb; pdb.set_trace() + + return merged_data + filtered_dataset + + # for k,v in grouped_indices.items(): + # data = merge_data(v, dataset) + + def __len__(self): return len(self.data) def __getitem__(self, index): pairs = self.data[index] input_ids, cu_seqlens, position_ids, labels = [], [0], [], [] - for pair in pairs: input_ids.extend(pair['chosen_ids']) input_ids.extend(pair['rejected_ids']) @@ -346,6 +426,9 @@ def build_preference_dataset( dataset = map_dataset( dataset, dataset_map_fn, map_num_proc=num_proc) + # if dist.get_rank() == 0: + # import pdb; pdb.set_trace() + tokenized_ds = PreferenceDataset( dataset=dataset, tokenizer=tokenizer, @@ -360,7 +443,7 @@ def build_preference_dataset( dataset=tokenized_ds, max_packed_length=max_packed_length, shuffle_before_pack=shuffle_before_pack, - ) + ) tokenized_ds = broad_cast_dataset(tokenized_ds) return tokenized_ds @@ -377,6 +460,14 @@ def intel_orca_dpo_map_fn(example): rejected = [{'role': 'assistant', 'content': example['rejected']}] return {'prompt': prompt, 'chosen': chosen, 'rejected': rejected} +def ultrafeedback_dpo_map_fn(example): + prompt = [{ + 'role': example['prompt_role'], + 'content': example['instruction'] + }] + chosen = [{'role': example['answer_role'], 'content': example['chosen']}] + rejected = [{'role': example['answer_role'], 'content': example['rejected']}] + return {'prompt': prompt, 'chosen': chosen, 'rejected': rejected} def orpo_dpo_mix_40k_map_fn(example): assert len(example['chosen']) == len(example['rejected']) diff --git a/xtuner/model/dpo.py b/xtuner/model/dpo.py index faaa43402..c6ccb3c50 100644 --- a/xtuner/model/dpo.py +++ b/xtuner/model/dpo.py @@ -162,6 +162,12 @@ def compute_loss(self, data, data_samples=None): data = self._split_for_sequence_parallel(data) all_logits = self.llm(**data).logits + + # if torch.distributed.get_rank() == 0: + # from transformers import AutoTokenizer + # tokenizer = AutoTokenizer.from_pretrained('/home/pingbowen/models/Longwriter-8b') + # print(tokenizer.batch_decode(data['input_ids'][:,:1000])) + with torch.no_grad(): if self.ref_llm is None: with self.llm.disable_adapter(): diff --git a/xtuner/utils/templates.py b/xtuner/utils/templates.py index 59b472731..74e5e5433 100644 --- a/xtuner/utils/templates.py +++ b/xtuner/utils/templates.py @@ -43,7 +43,10 @@ 'ensure that your responses are socially unbiased and positive in ' 'nature.\n{system}\n<>\n [/INST] '), INSTRUCTION='[INST] {input} [/INST]', - SEP='\n'), + # SEP='\n', + SUFFIX='<|eot_id|>', + SUFFIX_AS_EOS=True, + STOP_WORDS=['<|eot_id|>']), code_llama_chat=dict( SYSTEM='{system}\n', INSTRUCTION='[INST] {input} [/INST]'), chatglm2=dict( From 8478d653cf0ad8df1df02897ac9b48850936430f Mon Sep 17 00:00:00 2001 From: pingbowen Date: Mon, 11 Nov 2024 09:02:40 +0000 Subject: [PATCH 2/6] fix position_ids and cum_seq --- llama3_8b_instruct_dpo.py | 2 +- 
xtuner/dataset/preference_dataset.py | 63 ++++++++++++++++++---------- 2 files changed, 41 insertions(+), 24 deletions(-) diff --git a/llama3_8b_instruct_dpo.py b/llama3_8b_instruct_dpo.py index 0f0b50534..049c5074f 100644 --- a/llama3_8b_instruct_dpo.py +++ b/llama3_8b_instruct_dpo.py @@ -34,7 +34,7 @@ # Data # prompt_template = PROMPT_TEMPLATE.llama3_chat prompt_template = PROMPT_TEMPLATE.llama2_chat # llama2_chat -max_length = 32768 +max_length = 8192 max_packed_length = max_length # parallel diff --git a/xtuner/dataset/preference_dataset.py b/xtuner/dataset/preference_dataset.py index 2cd9fd3a3..e32a0738b 100644 --- a/xtuner/dataset/preference_dataset.py +++ b/xtuner/dataset/preference_dataset.py @@ -293,14 +293,22 @@ def __init__(self, def post_process(self, dataset): def merge_data(indices, dataset): - keys = ['chosen_ids', 'rejected_ids', 'chosen_labels', 'rejected_labels'] - merged_data = {key: [] for key in keys} + keys = ['chosen_ids', 'rejected_ids', 'chosen_labels', 'rejected_labels'] # 'position_ids','cumulative_len','concated' + merged_data = {key: (True if key == 'concated' else []) for key in keys} + position_ids , seq_len = [], [] for index in indices: data = dataset[index] for key in keys: merged_data[key].extend(data[key]) + + if "ids" in key: + position_ids.extend(list(range(len(data[key])))) + seq_len.append(len(list(range(len(data[key]))))) + merged_data["position_ids"] = position_ids + merged_data["seq_len"] = seq_len + merged_data["concated"] = True return merged_data from collections import defaultdict @@ -308,8 +316,8 @@ def merge_data(indices, dataset): # 遍历数据集并按 group_id 分组索引 for index, item in enumerate(dataset): - grouped_indices[item["group_id"]].append((item["seq_num"], index)) - # 将 grouped_indices 转换为普通字典 + if item["group_id"] is not None: + grouped_indices[item["group_id"]].append((item["seq_num"], index)) for group_id in grouped_indices: # 排序仅按 seq_num 排列,并提取 index @@ -320,15 +328,8 @@ def merge_data(indices, dataset): selected_indices = {index for indices in grouped_indices.values() for index in indices} merged_data = [merge_data(v, dataset) for k,v in grouped_indices.items()] filtered_dataset = [item for index, item in enumerate(dataset) if index not in selected_indices] - - # if dist.get_rank() == 0: - # import pdb; pdb.set_trace() - return merged_data + filtered_dataset - # for k,v in grouped_indices.items(): - # data = merge_data(v, dataset) - def __len__(self): return len(self.data) @@ -337,18 +338,34 @@ def __getitem__(self, index): pairs = self.data[index] input_ids, cu_seqlens, position_ids, labels = [], [0], [], [] for pair in pairs: - input_ids.extend(pair['chosen_ids']) - input_ids.extend(pair['rejected_ids']) - - position_ids.extend(list(range(len(pair['chosen_ids'])))) - position_ids.extend(list(range(len(pair['rejected_ids'])))) - - labels.extend(pair['chosen_labels']) - labels.extend(pair['rejected_labels']) - - cu_seqlens.append(cu_seqlens[-1] + len(pair['chosen_ids'])) - cu_seqlens.append(cu_seqlens[-1] + len(pair['rejected_ids'])) - + if not pair.get('concated', False): + input_ids.extend(pair['chosen_ids']) + input_ids.extend(pair['rejected_ids']) + + position_ids.extend(list(range(len(pair['chosen_ids'])))) + position_ids.extend(list(range(len(pair['rejected_ids'])))) + + labels.extend(pair['chosen_labels']) + labels.extend(pair['rejected_labels']) + + cu_seqlens.append(cu_seqlens[-1] + len(pair['chosen_ids'])) + cu_seqlens.append(cu_seqlens[-1] + len(pair['rejected_ids'])) + else: + input_ids.extend(pair['chosen_ids']) + 
input_ids.extend(pair['rejected_ids']) + + labels.extend(pair['chosen_labels']) + labels.extend(pair['rejected_labels']) + + position_ids.extend(pair.get('position_ids',None)) + seq_lens = pair.get('seq_len',None) + + for seq_len in seq_lens: + cu_seqlens.append(cu_seqlens[-1] + seq_len) + + if dist.get_rank() == 0: + import pdb;pdb.set_trace() + return { 'input_ids': input_ids, 'labels': labels, From 85a5506c292a6e0c7eb0437a45c4081530399678 Mon Sep 17 00:00:00 2001 From: pingbowen Date: Mon, 18 Nov 2024 03:46:03 +0000 Subject: [PATCH 3/6] fix training bug --- llama3_8b_instruct_dpo.py | 4 +- xtuner/dataset/preference_dataset.py | 116 +++++++++++++++++++++++---- 2 files changed, 103 insertions(+), 17 deletions(-) diff --git a/llama3_8b_instruct_dpo.py b/llama3_8b_instruct_dpo.py index 049c5074f..77898efde 100644 --- a/llama3_8b_instruct_dpo.py +++ b/llama3_8b_instruct_dpo.py @@ -34,7 +34,7 @@ # Data # prompt_template = PROMPT_TEMPLATE.llama3_chat prompt_template = PROMPT_TEMPLATE.llama2_chat # llama2_chat -max_length = 8192 +max_length = 32768 max_packed_length = max_length # parallel @@ -124,7 +124,7 @@ train_dataset = dict( type=build_preference_dataset, # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_16_32k_res_filter.jsonl"]), - dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/test_new.jsonl"]), + dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl"]), # "/home/pingbowen/workspace/DDPO/openr/data/range_4_16k_res_new.jsonl","/home/pingbowen/workspace/DDPO/openr/data/range_16_32k_res_new.jsonl", /home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/test_new.jsonl # dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k tokenizer=tokenizer, max_length=max_length, diff --git a/xtuner/dataset/preference_dataset.py b/xtuner/dataset/preference_dataset.py index e32a0738b..94b8cee7c 100644 --- a/xtuner/dataset/preference_dataset.py +++ b/xtuner/dataset/preference_dataset.py @@ -155,7 +155,7 @@ def process_message(messages): prompt += ' ' + message['content'] + tokenizer.eos_token return prompt - prompt = pair['prompt'][0]['content'] + prompt = pair['prompt'][0]['content'] if pair['prompt'][0]['role'] != "user" else process_message(pair['prompt']) chosen = process_message(pair['prompt'] + pair['chosen']) rejected = process_message(pair['prompt'] + pair['rejected']) @@ -187,8 +187,8 @@ def process_message(messages): 'rejected_ids': rejected_ids, 'chosen_labels': chosen_labels, 'rejected_labels': rejected_labels, - 'group_id': pair.get('group_id', 0), - "seq_num": pair.get("seq_num", None) + 'group_id': pair.get('group_id', None), + "seq_num": pair.get("depth", None) } @@ -229,7 +229,7 @@ def __init__( chunksize=num_proc, description='Tokenizing dataset'): self.tokenized_pairs.append(tokenized_pair) - + def __len__(self): return len(self.tokenized_pairs) @@ -247,8 +247,12 @@ def __init__(self, self.max_packed_length = max_packed_length self.lengths = [] self.data = [] - - dataset = self.post_process(dataset) + + # 
dataset = self.post_process(dataset) + # dataset_group_none , dataset_group_sorted = self.split_data(dataset) + # packed_dataset = self.pack_dataset(dataset_group_sorted, dataset_group_none, max_packed_length=self.max_packed_length) + # if dist.get_rank() == 0: + # import pdb; pdb.set_trace() indices = np.arange(len(dataset)) if shuffle_before_pack: @@ -256,7 +260,7 @@ def __init__(self, data_bin = [] bin_seq_len = 0 removed = 0 - + for idx in indices: data = dataset[int(idx)] cur_len = len(data['chosen_ids']) + len(data['rejected_ids']) @@ -290,6 +294,89 @@ def __init__(self, ' using var len attention.', logger='current') + def split_data(self, dataset): + # 转换为 Pandas DataFrame + import pandas as pd + df = pd.DataFrame(dataset[:]) + + # 拆分为两部分 + df_group_none = df[df['group_id'].isna()] # group_id 为 None 的部分 + df_group_sorted = df[df['group_id'].notna()] # group_id 不为 None 的部分 + + # 对 group_id 不为 None 的部分进行排序 + df_group_sorted = df_group_sorted.sort_values(by=['group_id', 'seq_num'], ascending=[True, True]) + + # 将两部分转换回列表形式 + dataset_group_none = df_group_none.to_dict(orient='records') + dataset_group_sorted = df_group_sorted.to_dict(orient='records') + return dataset_group_none , dataset_group_sorted + + def pack_dataset(self,dataset_group_sorted, dataset_group_none, max_packed_length=32768): + + def process_group(group_id): + data_bin = [] + bin_seq_len = 0 + removed = 0 + + for data in dataset_group_sorted: + if data['group_id'] == group_id: + cur_len = len(data['chosen_ids']) + len(data['rejected_ids']) + if cur_len > max_length: + removed += 1 + continue + + if (bin_seq_len + + cur_len) > max_packed_length and len(data_bin) > 0: + self.data.append(data_bin) + self.lengths.append(bin_seq_len) + data_bin = [] + bin_seq_len = 0 + data_bin.append(data) + bin_seq_len += cur_len + + return data_bin , bin_seq_len + + packed = [] # 保存每次打包的结果 + remaining_length = max_length # 当前剩余的可用长度 + + # 将 dataset_group_sorted 按 group_id 分组 + groups = {} + for item in dataset_group_sorted: + group_id = item['group_id'] + if group_id not in groups: + groups[group_id] = [] + groups[group_id].append(item) + + # 转换为按 group_id 的列表,确保顺序一致 + sorted_groups = list(groups.values()) + + # 打包过程 + groups = [0,1,2,3] + + for data in dataset_group_sorted: + ''' + cur_len = len(data['chosen_ids']) + len(data['rejected_ids']) + if cur_len > max_packed_length: + removed += 1 + continue + + if (bin_seq_len + + cur_len) > max_packed_length and len(data_bin) > 0: + self.data.append(data_bin) + self.lengths.append(bin_seq_len) + data_bin = [] + bin_seq_len = 0 + data_bin.append(data) + bin_seq_len += cur_len + ''' + + if len(data_bin) > 0: + self.data.append(data_bin) + self.lengths.append(bin_seq_len) + + return packed + + def post_process(self, dataset): def merge_data(indices, dataset): @@ -310,7 +397,7 @@ def merge_data(indices, dataset): merged_data["seq_len"] = seq_len merged_data["concated"] = True return merged_data - + from collections import defaultdict grouped_indices = defaultdict(list) @@ -324,7 +411,6 @@ def merge_data(indices, dataset): grouped_indices[group_id] = [index for seq_num, index in sorted(grouped_indices[group_id])] grouped_indices = dict(grouped_indices) - selected_indices = {index for indices in grouped_indices.values() for index in indices} merged_data = [merge_data(v, dataset) for k,v in grouped_indices.items()] filtered_dataset = [item for index, item in enumerate(dataset) if index not in selected_indices] @@ -363,8 +449,6 @@ def __getitem__(self, index): for seq_len in seq_lens: 
cu_seqlens.append(cu_seqlens[-1] + seq_len) - if dist.get_rank() == 0: - import pdb;pdb.set_trace() return { 'input_ids': input_ids, @@ -413,7 +497,6 @@ def map_dataset(dataset, dataset_map_fn, map_num_proc): raise TypeError('dataset_map_fn must be a function or a ' "registered function's string in MAP_FUNC, " f"but got a string of '{dataset_map_fn}'") - dataset = dataset.map(dataset_map_fn, num_proc=map_num_proc) return dataset @@ -478,12 +561,15 @@ def intel_orca_dpo_map_fn(example): return {'prompt': prompt, 'chosen': chosen, 'rejected': rejected} def ultrafeedback_dpo_map_fn(example): + prompt_role = example.get('prompt_role') if example.get('prompt_role') is not None else "user" + answer_role = example.get('answer_role') if example.get('answer_role') is not None else "assistant" + prompt = [{ - 'role': example['prompt_role'], + 'role': prompt_role, 'content': example['instruction'] }] - chosen = [{'role': example['answer_role'], 'content': example['chosen']}] - rejected = [{'role': example['answer_role'], 'content': example['rejected']}] + chosen = [{'role': answer_role, 'content': example['chosen']}] + rejected = [{'role': answer_role, 'content': example['rejected']}] return {'prompt': prompt, 'chosen': chosen, 'rejected': rejected} def orpo_dpo_mix_40k_map_fn(example): From 3863c302d16de04bf749c1fa23c0bbc2fdf96047 Mon Sep 17 00:00:00 2001 From: pingbowen Date: Thu, 28 Nov 2024 06:31:05 +0000 Subject: [PATCH 4/6] little modification --- llama3_8b_instruct_dpo.py | 2 +- xtuner/dataset/preference_dataset.py | 97 +--------------------------- 2 files changed, 4 insertions(+), 95 deletions(-) diff --git a/llama3_8b_instruct_dpo.py b/llama3_8b_instruct_dpo.py index 77898efde..e0a593734 100644 --- a/llama3_8b_instruct_dpo.py +++ b/llama3_8b_instruct_dpo.py @@ -124,7 +124,7 @@ train_dataset = dict( type=build_preference_dataset, # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_16_32k_res_filter.jsonl"]), - dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl"]), # "/home/pingbowen/workspace/DDPO/openr/data/range_4_16k_res_new.jsonl","/home/pingbowen/workspace/DDPO/openr/data/range_16_32k_res_new.jsonl", /home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/test_new.jsonl + dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_0.7.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_0.7.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]), # dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k tokenizer=tokenizer, max_length=max_length, diff --git a/xtuner/dataset/preference_dataset.py b/xtuner/dataset/preference_dataset.py index 94b8cee7c..933a5e6e6 100644 --- a/xtuner/dataset/preference_dataset.py +++ b/xtuner/dataset/preference_dataset.py @@ -150,9 +150,9 @@ def process_message(messages): elif message['role'] =='system': prompt += '<>\\n' + message['content'] + 
'\\n<>\\n\\n' elif message['role'] == 'added_assistant': - prompt += " " + message['content'] + prompt += message['content'] elif message['role'] == 'assistant': - prompt += ' ' + message['content'] + tokenizer.eos_token + prompt += message['content'] + tokenizer.eos_token return prompt prompt = pair['prompt'][0]['content'] if pair['prompt'][0]['role'] != "user" else process_message(pair['prompt']) @@ -186,9 +186,7 @@ def process_message(messages): 'chosen_ids': chosen_ids, 'rejected_ids': rejected_ids, 'chosen_labels': chosen_labels, - 'rejected_labels': rejected_labels, - 'group_id': pair.get('group_id', None), - "seq_num": pair.get("depth", None) + 'rejected_labels': rejected_labels } @@ -248,12 +246,6 @@ def __init__(self, self.lengths = [] self.data = [] - # dataset = self.post_process(dataset) - # dataset_group_none , dataset_group_sorted = self.split_data(dataset) - # packed_dataset = self.pack_dataset(dataset_group_sorted, dataset_group_none, max_packed_length=self.max_packed_length) - # if dist.get_rank() == 0: - # import pdb; pdb.set_trace() - indices = np.arange(len(dataset)) if shuffle_before_pack: np.random.shuffle(indices) @@ -293,89 +285,6 @@ def __init__(self, f'from {len(dataset)} to {len(self)} after' ' using var len attention.', logger='current') - - def split_data(self, dataset): - # 转换为 Pandas DataFrame - import pandas as pd - df = pd.DataFrame(dataset[:]) - - # 拆分为两部分 - df_group_none = df[df['group_id'].isna()] # group_id 为 None 的部分 - df_group_sorted = df[df['group_id'].notna()] # group_id 不为 None 的部分 - - # 对 group_id 不为 None 的部分进行排序 - df_group_sorted = df_group_sorted.sort_values(by=['group_id', 'seq_num'], ascending=[True, True]) - - # 将两部分转换回列表形式 - dataset_group_none = df_group_none.to_dict(orient='records') - dataset_group_sorted = df_group_sorted.to_dict(orient='records') - return dataset_group_none , dataset_group_sorted - - def pack_dataset(self,dataset_group_sorted, dataset_group_none, max_packed_length=32768): - - def process_group(group_id): - data_bin = [] - bin_seq_len = 0 - removed = 0 - - for data in dataset_group_sorted: - if data['group_id'] == group_id: - cur_len = len(data['chosen_ids']) + len(data['rejected_ids']) - if cur_len > max_length: - removed += 1 - continue - - if (bin_seq_len + - cur_len) > max_packed_length and len(data_bin) > 0: - self.data.append(data_bin) - self.lengths.append(bin_seq_len) - data_bin = [] - bin_seq_len = 0 - data_bin.append(data) - bin_seq_len += cur_len - - return data_bin , bin_seq_len - - packed = [] # 保存每次打包的结果 - remaining_length = max_length # 当前剩余的可用长度 - - # 将 dataset_group_sorted 按 group_id 分组 - groups = {} - for item in dataset_group_sorted: - group_id = item['group_id'] - if group_id not in groups: - groups[group_id] = [] - groups[group_id].append(item) - - # 转换为按 group_id 的列表,确保顺序一致 - sorted_groups = list(groups.values()) - - # 打包过程 - groups = [0,1,2,3] - - for data in dataset_group_sorted: - ''' - cur_len = len(data['chosen_ids']) + len(data['rejected_ids']) - if cur_len > max_packed_length: - removed += 1 - continue - - if (bin_seq_len + - cur_len) > max_packed_length and len(data_bin) > 0: - self.data.append(data_bin) - self.lengths.append(bin_seq_len) - data_bin = [] - bin_seq_len = 0 - data_bin.append(data) - bin_seq_len += cur_len - ''' - - if len(data_bin) > 0: - self.data.append(data_bin) - self.lengths.append(bin_seq_len) - - return packed - def post_process(self, dataset): From 15af16c0a9cbcadb016d280e06d1a0383bf50896 Mon Sep 17 00:00:00 2001 From: pingbowen Date: Fri, 6 Dec 2024 12:30:23 +0000 
Subject: [PATCH 5/6] update --- llama3_8b_instruct_dpo.py | 1 + qwen2_instruct_dpo.py | 246 +++++++++++++++++++++++++++ run_copy.sh | 58 +++++++ xtuner/dataset/preference_dataset.py | 49 +++--- 4 files changed, 335 insertions(+), 19 deletions(-) create mode 100644 qwen2_instruct_dpo.py create mode 100644 run_copy.sh diff --git a/llama3_8b_instruct_dpo.py b/llama3_8b_instruct_dpo.py index e0a593734..6d5bdf908 100644 --- a/llama3_8b_instruct_dpo.py +++ b/llama3_8b_instruct_dpo.py @@ -125,6 +125,7 @@ type=build_preference_dataset, # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_16_32k_res_filter.jsonl"]), dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_0.7.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_0.7.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]), + # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]) # dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k tokenizer=tokenizer, max_length=max_length, diff --git a/qwen2_instruct_dpo.py b/qwen2_instruct_dpo.py new file mode 100644 index 000000000..483c5e126 --- /dev/null +++ b/qwen2_instruct_dpo.py @@ -0,0 +1,246 @@ +# Copyright (c) OpenMMLab. All rights reserved. 
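+# DPO training config for LongWriter-Qwen2.5-7B-Instruct: qwen_chat prompt template, packed preference pairs with variable-length attention, sequence parallel size 2.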
+import torch +from datasets import load_dataset +from mmengine.dataset import DefaultSampler +from mmengine.hooks import (CheckpointHook, DistSamplerSeedHook, IterTimerHook, + LoggerHook, ParamSchedulerHook) +from mmengine.optim import AmpOptimWrapper, CosineAnnealingLR, LinearLR +from peft import LoraConfig +from torch.optim import AdamW +from transformers import (AutoModelForCausalLM, AutoTokenizer, + BitsAndBytesConfig) + +from xtuner.dataset.collate_fns.preference_collate_fn import \ + preference_collate_fn +from xtuner.dataset.preference_dataset import (build_preference_dataset, + orpo_dpo_mix_40k_map_fn,ultrafeedback_dpo_map_fn,load_jsonl_dataset) +from xtuner.engine.hooks import (DatasetInfoHook, EvaluateChatHook, + VarlenAttnArgsToMessageHubHook) +from xtuner.engine.runner import TrainLoop +from xtuner.model.dpo import DPO +from xtuner.parallel.sequence import SequenceParallelSampler +from xtuner.utils import PROMPT_TEMPLATE, SYSTEM_TEMPLATE + +####################################################################### +# PART 1 Settings # +####################################################################### +# Model +pretrained_model_name_or_path = '/home/pingbowen/models/LongWriter-Qwen2.5-7B-Instruct' +use_varlen_attn = True +dpo_loss_type = 'sigmoid' # One of ['sigmoid', 'hinge', 'ipo', 'kto_pair', 'sppo_hard', 'nca_pair', 'robust'] # noqa: E501 +loss_beta = 0.1 +label_smoothing = 0.0 + +# Data +# prompt_template = PROMPT_TEMPLATE.llama3_chat +prompt_template = PROMPT_TEMPLATE.qwen_chat # llama2_chat +max_length = 32768 +max_packed_length = max_length + +# parallel +sequence_parallel_size = 2 + +# Scheduler & Optimizer +batch_size = 1 # per_device +accumulative_counts = 1 +accumulative_counts *= sequence_parallel_size +dataloader_num_workers = 0 +max_epochs = 2 +optim_type = AdamW +lr = 1e-6 # refer to alignment handbook +betas = (0.9, 0.999) +weight_decay = 0 +max_norm = 1 # grad clip +warmup_ratio = 0.1 # 0.03 + +# Save +save_steps = 250 +save_total_limit = -1 # Maximum checkpoints to keep (-1 means unlimited) + +# Evaluate the generation performance during the training +evaluation_freq = 500 +SYSTEM = SYSTEM_TEMPLATE.alpaca +evaluation_inputs = [ + 'What famous British author, known for his tales of mystery and the macabre, shares his initials with a common abbreviation for "rest in peace"?', # noqa: E501 + 'Please tell me five scenic spots in Shanghai', + '890729 - 425663? Only respond with math and no words.' 
+] + +####################################################################### +# PART 2 Model & Tokenizer # +####################################################################### +tokenizer = dict( + type=AutoTokenizer.from_pretrained, + pretrained_model_name_or_path=pretrained_model_name_or_path, + trust_remote_code=True, + padding_side='right') + +model = dict( + type=DPO, + loss_type=dpo_loss_type, + use_varlen_attn=use_varlen_attn, + beta=loss_beta, + label_smoothing=label_smoothing, + llm=dict( + type=AutoModelForCausalLM.from_pretrained, + pretrained_model_name_or_path=pretrained_model_name_or_path, + trust_remote_code=True, + torch_dtype=torch.bfloat16, + ), + ref_llm=dict( ##### initialization of ref_llm ####### + type=AutoModelForCausalLM.from_pretrained, + pretrained_model_name_or_path=pretrained_model_name_or_path, + trust_remote_code=True, + torch_dtype=torch.bfloat16, + ), +) + # llm=dict( + # type=AutoModelForCausalLM.from_pretrained, + # pretrained_model_name_or_path=pretrained_model_name_or_path, + # trust_remote_code=True, + # torch_dtype=torch.float16, + # quantization_config=dict( + # type=BitsAndBytesConfig, + # load_in_4bit=True, + # load_in_8bit=False, + # llm_int8_threshold=6.0, + # llm_int8_has_fp16_weight=False, + # bnb_4bit_compute_dtype=torch.float16, + # bnb_4bit_use_double_quant=True, + # bnb_4bit_quant_type='nf4')), + # lora=dict( + # type=LoraConfig, + # r=64, + # lora_alpha=16, + # lora_dropout=0.1, + # bias='none', + # task_type='CAUSAL_LM')) +####################################################################### +# PART 3 Dataset & Dataloader # +####################################################################### +sampler = SequenceParallelSampler \ + if sequence_parallel_size > 1 else DefaultSampler + +train_dataset = dict( + type=build_preference_dataset, + # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]), # , /home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/test_new.jsonl + dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k + tokenizer=tokenizer, + max_length=max_length, + dataset_map_fn=ultrafeedback_dpo_map_fn, + is_dpo=True, + is_reward=False, + reward_token_id=-1, + num_proc=32, + use_varlen_attn=use_varlen_attn, + max_packed_length=max_packed_length, + shuffle_before_pack=True, +) + +train_dataloader = dict( + batch_size=batch_size, + num_workers=dataloader_num_workers, + dataset=train_dataset, + sampler=dict(type=sampler, shuffle=True), + collate_fn=dict( + type=preference_collate_fn, use_varlen_attn=use_varlen_attn)) + +####################################################################### +# PART 4 Scheduler & Optimizer # +####################################################################### +# optimizer +optim_wrapper = dict( + type=AmpOptimWrapper, + optimizer=dict( + type=optim_type, lr=lr, betas=betas, weight_decay=weight_decay), + clip_grad=dict(max_norm=max_norm, error_if_nonfinite=False), + accumulative_counts=accumulative_counts, + loss_scale='dynamic', + dtype='bfloat16') + +# learning policy +# More information: https://github.com/open-mmlab/mmengine/blob/main/docs/en/tutorials/param_scheduler.md # noqa: 
E501 +param_scheduler = [ + dict( + type=LinearLR, + start_factor=1e-5, + by_epoch=True, + begin=0, + end=warmup_ratio * max_epochs, + convert_to_iter_based=True), + dict( + type=CosineAnnealingLR, + eta_min=0.0, + by_epoch=True, + begin=warmup_ratio * max_epochs, + end=max_epochs, + convert_to_iter_based=True) +] + +# train, val, test setting +train_cfg = dict(type=TrainLoop, max_epochs=max_epochs) + +####################################################################### +# PART 5 Runtime # +####################################################################### +# Log the dialogue periodically during the training process, optional + +custom_hooks = [ + dict(type=DatasetInfoHook, tokenizer=tokenizer), + # dict( + # type=EvaluateChatHook, + # tokenizer=tokenizer, + # every_n_iters=evaluation_freq, + # evaluation_inputs=evaluation_inputs, + # system=SYSTEM, + # prompt_template=prompt_template) +] + +if use_varlen_attn: + custom_hooks += [dict(type=VarlenAttnArgsToMessageHubHook)] + +# configure default hooks +default_hooks = dict( + # record the time of every iteration. + timer=dict(type=IterTimerHook), + # print log every 10 iterations. + logger=dict(type=LoggerHook, log_metric_by_epoch=False, interval=10), + # enable the parameter scheduler. + param_scheduler=dict(type=ParamSchedulerHook), + # save checkpoint per `save_steps`. + checkpoint=dict( + type=CheckpointHook, + by_epoch=False, + interval=save_steps, + max_keep_ckpts=save_total_limit), + # set sampler seed in distributed evrionment. + sampler_seed=dict(type=DistSamplerSeedHook), +) + +# configure environment +env_cfg = dict( + # whether to enable cudnn benchmark + cudnn_benchmark=False, + # set multi process parameters + mp_cfg=dict(mp_start_method='fork', opencv_num_threads=0), + # set distributed parameters + dist_cfg=dict(backend='nccl'), +) + +# set visualizer +visualizer = None + +# set log level +log_level = 'INFO' + +# load from which checkpoint +load_from = None + +# whether to resume training from the loaded checkpoint +resume = False + +# Defaults to use random seed and disable `deterministic` +randomness = dict(seed=42, deterministic=False) + +# set log processor +log_processor = dict(by_epoch=False) diff --git a/run_copy.sh b/run_copy.sh new file mode 100644 index 000000000..def1931cb --- /dev/null +++ b/run_copy.sh @@ -0,0 +1,58 @@ +model_type=$1 # 或者设置为 "qwen" + +# 根据 model_type 设置 x 的值 +MAX_TIME=18000 + +if [ "$model_type" == "llama" ]; then + save_dir=./work_dirs/saved_model/llama3.1_ultrafeedback_wildchat_refine_0.75_merge + file=llama3_8b_instruct_dpo.py +elif [ "$model_type" == "qwen" ]; then + save_dir=./work_dirs/saved_model/qwen2.5_ultrafeedback + file=qwen2_instruct_dpo.py +else + echo "未知的 model_type" + exit 1 +fi + +mkdir -p $save_dir + +NPROC_PER_NODE=8 xtuner train $file --deepspeed deepspeed_zero3_offload --seed 42 --work-dir $save_dir + +PID=$! +START_TIME=$(date +%s) + +while true; do + # 获取当前时间 + CURRENT_TIME=$(date +%s) + + # 计算运行的时间差(单位:秒) + ELAPSED_TIME=$((CURRENT_TIME - START_TIME)) + + # 如果运行超过 5 小时,杀死程序 + if [ $ELAPSED_TIME -ge $MAX_TIME ]; then + echo "程序运行超过 5 小时,正在杀死进程 $PID ..." + kill -9 $PID + break + fi + + # 检查程序是否仍在运行 + if ! 
ps -p $PID > /dev/null; then + echo "程序已正常结束。" + break + fi + + # 每隔 60 秒检查一次 + sleep 60 +done + +save_iters=(iter_250 iter_500 iter_750) +for i in {0..2}; do + pth=$save_dir/${save_iters[$i]}.pth + SAVE_PATH=$save_dir/${save_iters[$i]} + + mkdir -p ${SAVE_PATH} + + xtuner convert pth_to_hf $file \ + ${pth} \ + ${SAVE_PATH} +done \ No newline at end of file diff --git a/xtuner/dataset/preference_dataset.py b/xtuner/dataset/preference_dataset.py index 933a5e6e6..a56411b10 100644 --- a/xtuner/dataset/preference_dataset.py +++ b/xtuner/dataset/preference_dataset.py @@ -139,25 +139,36 @@ def tokenize(pair: str, # pair['prompt'] + pair['rejected'], # tokenize=False, # add_generation_prompt=False) - - def process_message(messages): - prompt = '' - for message in messages: - if message['role'] == 'user': - prompt += '[INST]' + message['content'] + '[/INST]' - elif message['role'] == 'added_user': - prompt += message['content'] - elif message['role'] =='system': - prompt += '<>\\n' + message['content'] + '\\n<>\\n\\n' - elif message['role'] == 'added_assistant': - prompt += message['content'] - elif message['role'] == 'assistant': - prompt += message['content'] + tokenizer.eos_token - return prompt - - prompt = pair['prompt'][0]['content'] if pair['prompt'][0]['role'] != "user" else process_message(pair['prompt']) - chosen = process_message(pair['prompt'] + pair['chosen']) - rejected = process_message(pair['prompt'] + pair['rejected']) + if tokenizer.chat_template is None: + def process_message(messages): + prompt = '' + for message in messages: + if message['role'] == 'user': + prompt += '[INST]' + message['content'] + '[/INST]' + elif message['role'] == 'added_user': + prompt += message['content'] + elif message['role'] =='system': + prompt += '<>\\n' + message['content'] + '\\n<>\\n\\n' + elif message['role'] == 'added_assistant': + prompt += message['content'] + elif message['role'] == 'assistant': + prompt += message['content'] + tokenizer.eos_token + return prompt + + prompt = pair['prompt'][0]['content'] if pair['prompt'][0]['role'] != "user" else process_message(pair['prompt']) + chosen = process_message(pair['prompt'] + pair['chosen']) + rejected = process_message(pair['prompt'] + pair['rejected']) + else: + prompt = tokenizer.apply_chat_template( + pair['prompt'], tokenize=False, add_generation_prompt=True) + chosen = tokenizer.apply_chat_template( + pair['prompt'] + pair['chosen'], + tokenize=False, + add_generation_prompt=False) + rejected = tokenizer.apply_chat_template( + pair['prompt'] + pair['rejected'], + tokenize=False, + add_generation_prompt=False) prompt_ids = tokenizer.encode(prompt, add_special_tokens=False) chosen_ids = tokenizer.encode(chosen, add_special_tokens=False) From 6bcb6f58fcda8da25a98b05da03d33e5b1ad4620 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 10 Dec 2024 20:25:21 +0800 Subject: [PATCH 6/6] fix bug for train Qwen --- llama3_8b_instruct_dpo.py | 4 +- qwen2_instruct_dpo.py | 7 ++- requirements/runtime.txt | 2 +- run_copy.sh | 54 +++++++++++----------- xtuner/dataset/preference_dataset.py | 67 +++++++++++++++------------- xtuner/model/dpo.py | 5 --- 6 files changed, 68 insertions(+), 71 deletions(-) diff --git a/llama3_8b_instruct_dpo.py b/llama3_8b_instruct_dpo.py index 6d5bdf908..e35c66619 100644 --- a/llama3_8b_instruct_dpo.py +++ b/llama3_8b_instruct_dpo.py @@ -25,7 +25,7 @@ # PART 1 Settings # ####################################################################### # Model -pretrained_model_name_or_path = '/home/pingbowen/models/Longwriter-8b' 
+pretrained_model_name_or_path = 'THUDM/LongWriter-llama3.1-8b' use_varlen_attn = True dpo_loss_type = 'sigmoid' # One of ['sigmoid', 'hinge', 'ipo', 'kto_pair', 'sppo_hard', 'nca_pair', 'robust'] # noqa: E501 loss_beta = 0.1 @@ -124,7 +124,7 @@ train_dataset = dict( type=build_preference_dataset, # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_2_4k_res.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/longwriter_len_calssify/range_16_32k_res_filter.jsonl"]), - dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_0.7.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_0.7.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]), + dataset=dict(type=load_jsonl_dataset,data_files=["/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/step_wise/range_2_4k_res_new_refined.jsonl","/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/ultrafeedback_binarized.jsonl","/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/step_wise/range_4_16k_res_new.jsonl","/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/step_wise/range_16_32k_res_new.jsonl"]), # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]) # dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k tokenizer=tokenizer, diff --git a/qwen2_instruct_dpo.py b/qwen2_instruct_dpo.py index 483c5e126..96bab6cfc 100644 --- a/qwen2_instruct_dpo.py +++ b/qwen2_instruct_dpo.py @@ -20,12 +20,11 @@ from xtuner.model.dpo import DPO from xtuner.parallel.sequence import SequenceParallelSampler from xtuner.utils import PROMPT_TEMPLATE, SYSTEM_TEMPLATE - ####################################################################### # PART 1 Settings # ####################################################################### # Model -pretrained_model_name_or_path = '/home/pingbowen/models/LongWriter-Qwen2.5-7B-Instruct' +pretrained_model_name_or_path = '/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/models/LongWriter-Qwen2.5-7B-Instruct/' use_varlen_attn = True dpo_loss_type = 'sigmoid' # One of ['sigmoid', 'hinge', 'ipo', 'kto_pair', 'sppo_hard', 'nca_pair', 'robust'] # noqa: E501 loss_beta = 0.1 @@ -123,8 +122,8 @@ train_dataset = dict( type=build_preference_dataset, - # dataset=dict(type=load_jsonl_dataset,data_files=["/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_2_4k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/ultrafeedback_binarized.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_4_16k_res_new_refined.jsonl","/home/pingbowen/workspace/DDPO/openr/data/step_wise/range_16_32k_res_new.jsonl"]), # , 
/home/pingbowen/workspace/DDPO/DDPO-generate_long/data/dpo_data/test_new.jsonl - dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k + dataset=dict(type=load_jsonl_dataset,data_files=["/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/qwen_step_wise_fix_bug/range_2_4k_res_filtered.jsonl","/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/ultrafeedback_binarized.jsonl","/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/qwen_step_wise_fix_bug/range_4_16k_res_filtered.jsonl","/mnt/gemininjceph2/geminicephfs/pr-others-prctrans/pingbowen/workspace/MCTS_DPO/MCTS-dpo/data/qwen_step_wise_fix_bug/range_16_32k_res.jsonl"]), + # dataset=dict(type=load_dataset, path='llamafactory/ultrafeedback_binarized'), # mlabonne/orpo-dpo-mix-40k tokenizer=tokenizer, max_length=max_length, dataset_map_fn=ultrafeedback_dpo_map_fn, diff --git a/requirements/runtime.txt b/requirements/runtime.txt index 3a4d2f84e..380bf843c 100644 --- a/requirements/runtime.txt +++ b/requirements/runtime.txt @@ -15,7 +15,7 @@ scikit-image scipy SentencePiece tiktoken -torch +torch==2.3.1 torchvision # Minimum 4.36.0 to support `Cache` data structure used by KV Cache # Registering a causal mask in `LlamaModel` is not friendly for very large diff --git a/run_copy.sh b/run_copy.sh index def1931cb..571db5085 100644 --- a/run_copy.sh +++ b/run_copy.sh @@ -1,49 +1,49 @@ model_type=$1 # 或者设置为 "qwen" # 根据 model_type 设置 x 的值 -MAX_TIME=18000 +# MAX_TIME=18000 if [ "$model_type" == "llama" ]; then save_dir=./work_dirs/saved_model/llama3.1_ultrafeedback_wildchat_refine_0.75_merge file=llama3_8b_instruct_dpo.py elif [ "$model_type" == "qwen" ]; then - save_dir=./work_dirs/saved_model/qwen2.5_ultrafeedback + save_dir=./work_dirs/saved_model/qwen2.5_ultrafeedback_longwriter_step_wise file=qwen2_instruct_dpo.py else echo "未知的 model_type" exit 1 fi +save_dir=./work_dirs/saved_model/qwen2.5_ultrafeedback_longwriter_step_wise mkdir -p $save_dir +NPROC_PER_NODE=8 xtuner train $file --deepspeed deepspeed_zero3_offload --seed 42 --work-dir $save_dir -NPROC_PER_NODE=8 xtuner train $file --deepspeed deepspeed_zero3_offload --seed 42 --work-dir $save_dir +# PID=$! +# START_TIME=$(date +%s) -PID=$! -START_TIME=$(date +%s) - -while true; do - # 获取当前时间 - CURRENT_TIME=$(date +%s) +# while true; do +# # 获取当前时间 +# CURRENT_TIME=$(date +%s) - # 计算运行的时间差(单位:秒) - ELAPSED_TIME=$((CURRENT_TIME - START_TIME)) - - # 如果运行超过 5 小时,杀死程序 - if [ $ELAPSED_TIME -ge $MAX_TIME ]; then - echo "程序运行超过 5 小时,正在杀死进程 $PID ..." - kill -9 $PID - break - fi +# # 计算运行的时间差(单位:秒) +# ELAPSED_TIME=$((CURRENT_TIME - START_TIME)) + +# # 如果运行超过 5 小时,杀死程序 +# if [ $ELAPSED_TIME -ge $MAX_TIME ]; then +# echo "程序运行超过 5 小时,正在杀死进程 $PID ..." +# kill -9 $PID +# break +# fi - # 检查程序是否仍在运行 - if ! ps -p $PID > /dev/null; then - echo "程序已正常结束。" - break - fi - - # 每隔 60 秒检查一次 - sleep 60 -done +# # 检查程序是否仍在运行 +# if ! 
ps -p $PID > /dev/null; then +# echo "程序已正常结束。" +# break +# fi + +# # 每隔 60 秒检查一次 +# sleep 60 +# done save_iters=(iter_250 iter_500 iter_750) for i in {0..2}; do diff --git a/xtuner/dataset/preference_dataset.py b/xtuner/dataset/preference_dataset.py index a56411b10..8ec83b9e9 100644 --- a/xtuner/dataset/preference_dataset.py +++ b/xtuner/dataset/preference_dataset.py @@ -159,16 +159,32 @@ def process_message(messages): chosen = process_message(pair['prompt'] + pair['chosen']) rejected = process_message(pair['prompt'] + pair['rejected']) else: - prompt = tokenizer.apply_chat_template( - pair['prompt'], tokenize=False, add_generation_prompt=True) - chosen = tokenizer.apply_chat_template( - pair['prompt'] + pair['chosen'], - tokenize=False, - add_generation_prompt=False) - rejected = tokenizer.apply_chat_template( - pair['prompt'] + pair['rejected'], - tokenize=False, - add_generation_prompt=False) + def process_message(messages): + prompt = '' + for message in messages: + if message['role'] == 'user': + prompt += '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n<|im_start|>user\n' + message['content'] + '<|im_end|>\n' + elif message['role'] == 'added_user': + prompt += message['content'] + elif message['role'] == 'added_assistant': + prompt += message['content'] + elif message['role'] == 'assistant': + prompt += message['content'] + tokenizer.eos_token + return prompt + + prompt = '<|im_start|>system\nYou are a helpful assistant.<|im_end|>\n' + pair['prompt'][0]['content'] if pair['prompt'][0]['role'] != "user" else process_message(pair['prompt']) + chosen = process_message(pair['prompt'] + pair['chosen']) + rejected = process_message(pair['prompt'] + pair['rejected']) + # prompt = tokenizer.apply_chat_template( + # pair['prompt'], tokenize=False, add_generation_prompt=True) + # chosen = tokenizer.apply_chat_template( + # pair['prompt'] + pair['chosen'], + # tokenize=False, + # add_generation_prompt=False) + # rejected = tokenizer.apply_chat_template( + # pair['prompt'] + pair['rejected'], + # tokenize=False, + # add_generation_prompt=False) prompt_ids = tokenizer.encode(prompt, add_special_tokens=False) chosen_ids = tokenizer.encode(chosen, add_special_tokens=False) @@ -192,7 +208,7 @@ def process_message(messages): chosen_ids[prompt_len:]) rejected_labels = [-100] * prompt_len + copy.deepcopy( rejected_ids[prompt_len:]) - + return { 'chosen_ids': chosen_ids, 'rejected_ids': rejected_ids, @@ -344,30 +360,17 @@ def __getitem__(self, index): pairs = self.data[index] input_ids, cu_seqlens, position_ids, labels = [], [0], [], [] for pair in pairs: - if not pair.get('concated', False): - input_ids.extend(pair['chosen_ids']) - input_ids.extend(pair['rejected_ids']) - - position_ids.extend(list(range(len(pair['chosen_ids'])))) - position_ids.extend(list(range(len(pair['rejected_ids'])))) + input_ids.extend(pair['chosen_ids']) + input_ids.extend(pair['rejected_ids']) - labels.extend(pair['chosen_labels']) - labels.extend(pair['rejected_labels']) + position_ids.extend(list(range(len(pair['chosen_ids'])))) + position_ids.extend(list(range(len(pair['rejected_ids'])))) - cu_seqlens.append(cu_seqlens[-1] + len(pair['chosen_ids'])) - cu_seqlens.append(cu_seqlens[-1] + len(pair['rejected_ids'])) - else: - input_ids.extend(pair['chosen_ids']) - input_ids.extend(pair['rejected_ids']) + labels.extend(pair['chosen_labels']) + labels.extend(pair['rejected_labels']) - labels.extend(pair['chosen_labels']) - labels.extend(pair['rejected_labels']) - - position_ids.extend(pair.get('position_ids',None)) 
- seq_lens = pair.get('seq_len',None) - - for seq_len in seq_lens: - cu_seqlens.append(cu_seqlens[-1] + seq_len) + cu_seqlens.append(cu_seqlens[-1] + len(pair['chosen_ids'])) + cu_seqlens.append(cu_seqlens[-1] + len(pair['rejected_ids'])) return { diff --git a/xtuner/model/dpo.py b/xtuner/model/dpo.py index c6ccb3c50..a0bc4a7f1 100644 --- a/xtuner/model/dpo.py +++ b/xtuner/model/dpo.py @@ -162,11 +162,6 @@ def compute_loss(self, data, data_samples=None): data = self._split_for_sequence_parallel(data) all_logits = self.llm(**data).logits - - # if torch.distributed.get_rank() == 0: - # from transformers import AutoTokenizer - # tokenizer = AutoTokenizer.from_pretrained('/home/pingbowen/models/Longwriter-8b') - # print(tokenizer.batch_decode(data['input_ids'][:,:1000])) with torch.no_grad(): if self.ref_llm is None:
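The packed __getitem__ restored in PATCH 6/6 concatenates each chosen/rejected pair back to back, restarts position_ids for every segment, and records the segment boundaries in cu_seqlens so variable-length attention treats the packed pairs as independent sequences. Below is a minimal, self-contained sketch of that packing step (plain Python over already-tokenized pairs; the function name and return keys are illustrative, not the exact xtuner API):

def pack_preference_pairs(pairs):
    # Each pair holds tokenized chosen/rejected ids and labels,
    # with prompt positions already masked to -100 in the labels.
    input_ids, labels, position_ids, cu_seqlens = [], [], [], [0]
    for pair in pairs:
        for ids_key, labels_key in (('chosen_ids', 'chosen_labels'),
                                    ('rejected_ids', 'rejected_labels')):
            segment = pair[ids_key]
            input_ids.extend(segment)
            labels.extend(pair[labels_key])
            # position ids restart at 0 for every packed segment
            position_ids.extend(range(len(segment)))
            # cumulative lengths mark where each segment ends
            cu_seqlens.append(cu_seqlens[-1] + len(segment))
    return {'input_ids': input_ids, 'labels': labels,
            'position_ids': position_ids, 'cumulative_len': cu_seqlens}

Example: two pairs with (chosen, rejected) lengths (3, 2) and (4, 1) give cu_seqlens [0, 3, 5, 9, 10], i.e. four independent attention segments inside one packed sample.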