Last fixes
TJ-Solergibert committed Aug 7, 2024
1 parent 2d91154 commit ce068fd
Showing 10 changed files with 156 additions and 56 deletions.
39 changes: 19 additions & 20 deletions examples/config_multilingual_nanoset.yaml
@@ -8,17 +8,17 @@ data_stages:
- data:
dataset:
training_folder:
datasets/c4-es/train: 0.85
datasets/c4-en/train: 0.05
datasets/c4-fr/train: 0.1
- datasets/c4-es/train
- datasets/c4-en/train
- datasets/c4-fr/train
validation_folder:
- datasets/c4-es/validation
- datasets/c4-en/validation
- datasets/c4-fr/validation
lang_to_ids:
es: 128002
en: 128003
fr: 128004
languages:
- es
- en
- fr
num_loading_workers: 1
seed: 42
name: General purpose training (Blended dataset)
@@ -29,12 +29,12 @@ data_stages:
- datasets/c4-es/train
validation_folder:
- datasets/c4-es/validation
lang_to_ids:
es: 128002
languages:
- es
num_loading_workers: 1
seed: 42
name: Second purpose training (Single dataset)
start_training_step: 100
start_training_step: 1000
- data:
dataset:
training_folder:
@@ -45,20 +45,19 @@
- datasets/c4-es/validation
- datasets/c4-en/validation
- datasets/c4-fr/validation
lang_to_ids:
es: 128002
en: 128003
fr: 128004

languages:
- es
- en
- fr
num_loading_workers: 1
seed: 42
name: Third purpose training (>1 dataset)
start_training_step: 200
start_training_step: 2000
general:
benchmark_csv_path: null
consumed_train_samples: null
ignore_sanity_checks: true
project: Multilingual
project: MultilingualV2
run: llama
seed: 42
step: null
@@ -114,7 +113,7 @@ optimizer:
weight_decay: 0.01
zero_stage: 0
parallelism:
dp: 1
dp: 2
expert_parallel_size: 1
pp: 1
pp_engine: 1f1b
@@ -132,5 +131,5 @@ tokens:
limit_val_batches: 10
micro_batch_size: 3
sequence_length: 4096
train_steps: 800
val_check_interval: 50
train_steps: 500
val_check_interval: 100
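
For reference, a small sketch (not part of this commit) that pairs each stage's training folders with its languages entry, mirroring the new config layout above. It assumes PyYAML is installed and that the example config exists at its repository path.

    import yaml

    with open("examples/config_multilingual_nanoset.yaml") as f:
        config = yaml.safe_load(f)

    for stage in config["data_stages"]:
        dataset = stage["data"]["dataset"]
        folders = dataset["training_folder"]
        # training_folder is either a dict (folder -> sampling weight) or a plain list of folders
        folder_names = list(folders) if isinstance(folders, dict) else folders
        languages = dataset["languages"]
        assert len(folder_names) == len(languages) == len(dataset["validation_folder"])
        print(stage["name"], dict(zip(languages, folder_names)))
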
4 changes: 2 additions & 2 deletions run_train.py
@@ -194,7 +194,6 @@ def get_dataloader_from_data_stage(
sequence_length=trainer.sequence_length,
token_size=token_size,
train_split_num_samples=trainer.config.tokens.train_steps * trainer.global_batch_size,
dataset_tokens=data.dataset.dataset_tokens,
random_seed=data.seed,
)

@@ -209,6 +208,7 @@ def get_dataloader_from_data_stage(
consumed_train_samples=consumed_train_samples,
dataloader_num_workers=data.num_loading_workers,
dataloader_drop_last=True,
is_multilingual=True,
)

return train_dataloader
@@ -241,7 +241,6 @@ def get_valid_dataloader_from_data_stage(
dataset_folders=data.dataset.validation_folder,
sequence_length=trainer.sequence_length,
token_size=token_size,
dataset_tokens=data.dataset.dataset_tokens,
is_valid=True,
random_seed=data.seed,
)
@@ -257,6 +256,7 @@ def get_valid_dataloader_from_data_stage(
dataloader_num_workers=data.num_loading_workers,
dataloader_drop_last=True,
shuffle=True,
is_multilingual=True,
)

return valid_dataloader
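
For illustration only, a sketch of building a training MultilingualNanoset after this change: the dataset_tokens argument is gone, each sample instead carries a lang_code, and passing is_multilingual=True to build_nanoset_dataloader (as in the diff above) swaps in the multilingual collator. Paths, token_size and the sample count are placeholders, and it assumes the tokenized folders from the example config exist on disk.

    from nanotron.data.multilingual_nanoset import MultilingualNanoset

    train_dataset = MultilingualNanoset(
        dataset_folders=["datasets/c4-es/train", "datasets/c4-en/train", "datasets/c4-fr/train"],
        dataset_weights=[0.85, 0.05, 0.1],
        sequence_length=4096,
        token_size=4,                     # bytes per stored token id (placeholder)
        train_split_num_samples=500 * 6,  # train_steps * global batch size (placeholder)
        random_seed=42,
    )

    sample = train_dataset[0]
    print(sample["input_ids"].shape, sample["lang_code"])  # lang_code indexes dataset_folders
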
11 changes: 5 additions & 6 deletions src/nanotron/config/config.py
@@ -111,7 +111,7 @@ def __post_init__(self):
class MultilingualNanosetDatasetsArgs:
training_folder: Union[str, dict, List[str]]
validation_folder: Union[str, List[str]]
lang_to_ids: dict # Mapping from the previously defined folders to tokens. Respect the order
languages: List[str]  # NOTE(tj.solergibert) Required for 1. aggregating the results per language 2. reporting to WANDB

def __post_init__(self):
if isinstance(self.training_folder, str): # Case 1: 1 Dataset folder
@@ -125,14 +125,13 @@ def __post_init__(self):
self.training_folder = list(tmp_training_folder.keys())
self.dataset_weights = list(tmp_training_folder.values())

self.ids_to_lang = {v: k for k, v in self.lang_to_ids.items()}
self.dataset_tokens = list(self.lang_to_ids.values())
assert len(self.training_folder) == len(
self.languages
), f"The sizes of training_folder and languages mismatch ({len(self.training_folder)} vs {len(self.languages)})"

assert len(self.training_folder) == len(
self.validation_folder
), f"The sizes of training_folder and validation_folder mismatch ({len(self.training_folder)} vs {len(self.validation_folder)})"
assert len(self.training_folder) == len(
self.dataset_tokens
), f"The sizes of training_folder and lang_to_ids mismatch ({len(self.training_folder)} vs {len(self.dataset_tokens)})"


@dataclass
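
A minimal usage sketch of the updated dataclass (not from the commit; the import path is assumed from the file path above): the weighted-dict form of training_folder is split into a folder list plus dataset_weights, and the new assertion checks that training folders, validation folders and languages all have the same length.

    from nanotron.config.config import MultilingualNanosetDatasetsArgs

    args = MultilingualNanosetDatasetsArgs(
        training_folder={"datasets/c4-es/train": 0.85, "datasets/c4-en/train": 0.05, "datasets/c4-fr/train": 0.1},
        validation_folder=["datasets/c4-es/validation", "datasets/c4-en/validation", "datasets/c4-fr/validation"],
        languages=["es", "en", "fr"],
    )
    print(args.training_folder)  # ['datasets/c4-es/train', 'datasets/c4-en/train', 'datasets/c4-fr/train']
    print(args.dataset_weights)  # [0.85, 0.05, 0.1]
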
73 changes: 73 additions & 0 deletions src/nanotron/data/collator.py
@@ -78,3 +78,76 @@ def __call__(self, examples: List[Dict[str, List[np.ndarray]]]) -> Dict[str, Uni
)

return result


@dataclasses.dataclass
class MultilingualNanosetDataCollatorForCLM:
"""
Data collator used for causal language modeling with Nanosets dataset.
- input_pp_rank: Discards last input id token
- output_pp_rank: Discards first label id token
- other pp ranks: Don't have data. Instead, we use `TensorPointer` to point to the rank having the data.
"""

sequence_length: int
input_pp_rank: int
output_pp_rank: int
parallel_context: ParallelContext

def __call__(self, examples: List[Dict[str, List[np.ndarray]]]) -> Dict[str, Union[torch.Tensor, TensorPointer]]:
# Process the case when current rank doesn't require data. We return `TensorPointer` that points to ranks having the data.
current_pp_rank = dist.get_rank(self.parallel_context.pp_pg)
if current_pp_rank not in [
self.input_pp_rank,
self.output_pp_rank,
]:
assert all(len(example) == 0 for example in examples)
return {
"input_ids": TensorPointer(group_rank=self.input_pp_rank),
"input_mask": TensorPointer(group_rank=self.input_pp_rank),
"lang_code": TensorPointer(group_rank=self.input_pp_rank),
"label_ids": TensorPointer(group_rank=self.output_pp_rank),
"label_mask": TensorPointer(group_rank=self.output_pp_rank),
}

# TODO @nouamanetazi: Is it better to have examples as np.array or torch.Tensor?
input_ids = torch.vstack([examples[i]["input_ids"] for i in range(len(examples))]) # (b, s)
lang_code = torch.vstack([examples[i]["lang_code"] for i in range(len(examples))]) # (b, 1)
batch_size, expanded_input_length = input_ids.shape

result: Dict[str, Union[torch.LongTensor, TensorPointer]] = {}

result["input_ids"] = TensorPointer(group_rank=self.input_pp_rank)
result["input_mask"] = TensorPointer(group_rank=self.input_pp_rank)
result["lang_code"] = TensorPointer(group_rank=self.input_pp_rank)
result["label_ids"] = TensorPointer(group_rank=self.output_pp_rank)
result["label_mask"] = TensorPointer(group_rank=self.output_pp_rank)

assert (
expanded_input_length == self.sequence_length + 1
), f"Samples should be of length {self.sequence_length + 1} (seq_len+1), but got {expanded_input_length}"

# Process inputs: last token is the label
if current_pp_rank == self.input_pp_rank:
result["input_ids"] = input_ids[:, :-1]
result["input_mask"] = torch.ones((batch_size, self.sequence_length), dtype=torch.bool)
result["lang_code"] = lang_code

# Process labels: shift them to the left
if current_pp_rank == self.output_pp_rank:
result["label_ids"] = input_ids[:, 1:]
result["label_mask"] = torch.ones((batch_size, self.sequence_length), dtype=torch.bool)

if isinstance(result["input_ids"], torch.Tensor) and result["input_ids"].shape[-1] != self.sequence_length:
raise ValueError(
f"`labels` are incorrectly preprocessed. `labels` length is {result['input_ids'].shape[-1]}, but should be"
f" {self.sequence_length}."
)
if isinstance(result["label_ids"], torch.Tensor) and result["label_ids"].shape[-1] != self.sequence_length:
raise ValueError(
f"`labels` are incorrectly preprocessed. `labels` length is {result['label_ids'].shape[-1]}, but should be"
f" {self.sequence_length}."
)

return result
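
To make the collation concrete, a toy, self-contained reproduction of the tensor handling (instantiating the collator itself needs a ParallelContext and pipeline ranks, so only the shapes are mimicked here): each sample arrives as seq_len + 1 token ids plus a scalar lang_code, inputs drop the last token, labels drop the first, and lang_code is stacked into a (batch, 1) tensor.

    import torch

    sequence_length = 8
    examples = [
        {"input_ids": torch.randint(0, 32000, (sequence_length + 1,)), "lang_code": torch.tensor(lang)}
        for lang in (0, 1, 2)
    ]

    input_ids = torch.vstack([ex["input_ids"] for ex in examples])  # (b, s + 1)
    lang_code = torch.vstack([ex["lang_code"] for ex in examples])  # (b, 1)

    batch = {
        "input_ids": input_ids[:, :-1],  # inputs: drop the last token
        "label_ids": input_ids[:, 1:],   # labels: shift left by one
        "lang_code": lang_code,
    }
    print({k: tuple(v.shape) for k, v in batch.items()})
    # {'input_ids': (3, 8), 'label_ids': (3, 8), 'lang_code': (3, 1)}
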
11 changes: 10 additions & 1 deletion src/nanotron/data/dataloader_builder.py
@@ -1,6 +1,6 @@
import nanotron.distributed as dist
from nanotron import logging
from nanotron.data.collator import NanosetDataCollatorForCLM
from nanotron.data.collator import MultilingualNanosetDataCollatorForCLM, NanosetDataCollatorForCLM
from nanotron.dataloader import (
EmptyInfiniteDataset,
get_dataloader_worker_init,
@@ -20,6 +20,7 @@ def build_nanoset_dataloader(
output_pp_rank: int,
micro_batch_size: int,
dataloader_num_workers: int,
is_multilingual: bool = False,
consumed_train_samples: int = 0,
dataloader_drop_last: bool = True,
dataloader_pin_memory: bool = True,
@@ -40,6 +41,14 @@
parallel_context=parallel_context,
)

if is_multilingual:
data_collator = MultilingualNanosetDataCollatorForCLM(
sequence_length=sequence_length,
input_pp_rank=input_pp_rank,
output_pp_rank=output_pp_rank,
parallel_context=parallel_context,
)

# Compute size and rank of dataloader workers
dp_ranks_size = parallel_context.dp_pg.size()
dp_rank = parallel_context.dp_pg.rank()
4 changes: 1 addition & 3 deletions src/nanotron/data/multilingual_nanoset.py
@@ -30,7 +30,6 @@ def __init__(
dataset_folders: List[str],
sequence_length: int,
token_size: int,
dataset_tokens: List[int],
train_split_num_samples: int = None,
is_valid: bool = False,
dataset_weights: Union[List[float], None] = None,
@@ -47,7 +46,6 @@ def __init__(
self.sequence_length = sequence_length
self.token_size = token_size
self.train_split_num_samples = train_split_num_samples
self.dataset_tokens = dataset_tokens
self.is_valid = is_valid
self.random_seed = random_seed
self.datatrove_datasets = []
@@ -107,7 +105,7 @@ def __getitem__(self, idx: int) -> Dict[str, np.ndarray]:
dataset_sample = self.dataset_sample_index[idx]

tokens = self.datatrove_datasets[dataset][dataset_sample]
tokens["input_ids"][0] = self.dataset_tokens[dataset] # Prepend language token
tokens["lang_code"] = torch.tensor(dataset, dtype=torch.long)

return tokens
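
In other words (toy example, not from the commit): instead of overwriting the first token with a language id, each sample now carries the index of the dataset folder it came from, which maps straight back onto the config's languages list.

    import torch

    languages = ["es", "en", "fr"]  # same order as dataset_folders in the config
    tokens = {"input_ids": torch.arange(4097), "lang_code": torch.tensor(2, dtype=torch.long)}
    print(languages[tokens["lang_code"].item()])  # -> fr
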

4 changes: 0 additions & 4 deletions src/nanotron/distributed.py
@@ -52,10 +52,6 @@ def all_gather_into_tensor( # pylint: disable=function-redefined
if group is None:
group = dist.torch_dist.distributed_c10d._get_default_group()

assert (
group.size() > 1
), "You should probably not call `all_gather_into_tensor` with a single rank, as it copies data over"

if torch_version_above_1_13:
return dist.all_gather_into_tensor(
output_tensor=output_tensor, input_tensor=input_tensor, group=group, async_op=async_op
10 changes: 9 additions & 1 deletion src/nanotron/models/llama.py
@@ -733,14 +733,20 @@ def forward(
self,
input_ids: Union[torch.Tensor, TensorPointer], # [batch_size, seq_length]
input_mask: Union[torch.Tensor, TensorPointer], # [batch_size, seq_length]
lang_code: Union[torch.Tensor, TensorPointer], # [batch_size, 1]
):
return self.forward_with_hidden_states(input_ids=input_ids, input_mask=input_mask)[0]
return self.forward_with_hidden_states(input_ids=input_ids, input_mask=input_mask, lang_code=lang_code)[0]

def forward_with_hidden_states(
self,
input_ids: Union[torch.Tensor, TensorPointer], # [batch_size, seq_length]
input_mask: Union[torch.Tensor, TensorPointer], # [batch_size, seq_length]
lang_code: Union[torch.Tensor, TensorPointer], # [batch_size, 1]
):
# NOTE(tj.solergibert) `lang_code` is carried all the way into LlamaModel's forward. Remember that
# to use it inside the different pipeline blocks you also need to add it to the module_input_keys & module_output_keys
# of the relevant `PipelineBlock`s defined in the LlamaModel init!

# all tensors are optional as most ranks don't need anything from the dataloader.

output = self.token_position_embeddings(input_ids=input_ids, input_mask=input_mask)
@@ -863,12 +869,14 @@ def forward(
self,
input_ids: Union[torch.Tensor, TensorPointer],
input_mask: Union[torch.Tensor, TensorPointer],
lang_code: Union[torch.Tensor, TensorPointer],
label_ids: Union[torch.Tensor, TensorPointer],
label_mask: Union[torch.Tensor, TensorPointer],
) -> Dict[str, Union[torch.Tensor, TensorPointer]]:
sharded_logits = self.model(
input_ids=input_ids,
input_mask=input_mask,
lang_code=lang_code,
)
outputs = self.loss(
sharded_logits=sharded_logits,
6 changes: 3 additions & 3 deletions src/nanotron/parallel/pipeline_parallel/engine.py
@@ -140,7 +140,7 @@ def validate_batch_iter(
self.nb_microbatches = nb_microbatches

outputs = []
lang_ids = []
lang_codes = []

with attach_pipeline_state_to_model(model=model, pipeline_state=state):
# All forward
@@ -166,9 +166,9 @@
outputs.extend(
list(output["sample_loss"])
) # NOTE(tj.solergibert) Yes, it might look useless to do list + extend but it's necessary to split the output["sample_loss"] tensor into multiple tensors
lang_ids.extend(micro_batch["input_ids"][:, 0].tolist())
lang_codes.extend(micro_batch["lang_code"].flatten().tolist())

return outputs, lang_ids
return outputs, lang_codes


class AllForwardAllBackwardPipelineEngine(PipelineEngine):
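
Downstream, the lang_codes returned by validate_batch_iter can be reduced into per-language validation losses for logging. A rough, self-contained sketch with made-up numbers (the actual reporting lives outside this diff):

    import torch

    languages = ["es", "en", "fr"]  # from the config's `languages` field
    outputs = [torch.tensor(1.2), torch.tensor(0.9), torch.tensor(1.5), torch.tensor(1.1)]  # sample losses
    lang_codes = [0, 2, 0, 1]  # from micro_batch["lang_code"]

    per_language_loss = {
        lang: torch.stack([loss for loss, code in zip(outputs, lang_codes) if code == idx]).mean().item()
        for idx, lang in enumerate(languages)
    }
    print(per_language_loss)  # approximately {'es': 1.35, 'en': 1.1, 'fr': 0.9}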