| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | import os |
| | import os.path as osp |
| |
|
| | from huggingface_hub import repo_exists, snapshot_download |
| | from huggingface_hub.utils import HFValidationError, validate_repo_id |
| | from transformers import AutoConfig, AutoTokenizer, PretrainedConfig |
| |
|
| | from .configuration_vila import VILAConfig |
| | from .constants import MEDIA_TOKENS |
| | from .tokenizer_utils import infer_stop_tokens |
| |
|
| |
|
| | def load_tokenizer_then_handle_media_tokens_and_chat_template( |
| | model_name_or_path, config: VILAConfig, model_max_length=None |
| | ): |
| | tokenizer = AutoTokenizer.from_pretrained( |
| | osp.join(model_name_or_path, "llm"), padding_side="right", use_fast=True, legacy=False |
| | ) |
| | if model_max_length is not None: |
| | tokenizer.model_max_length = model_max_length |
| |
|
| | |
| | if getattr(config, "chat_template", None) is not None: |
| | print(f"Using chat template: {config.chat_template}") |
| | fpath = os.path.join(os.path.dirname(__file__), "chat_templates", f"{config.chat_template}.jinja") |
| | if not os.path.exists(fpath): |
| | fpath = os.path.join(model_name_or_path, f"{config.chat_template}.jinja") |
| | with open(fpath) as fd: |
| | chat_template = fd.read() |
| | tokenizer.chat_template = chat_template.replace(" ", "").replace("\n", "") |
| |
|
| | |
| | tokenizer.stop_tokens = infer_stop_tokens(tokenizer) |
| | tokenizer.stop_token_ids = tokenizer.convert_tokens_to_ids(tokenizer.stop_tokens) |
| |
|
| | |
| | tokenizer.media_tokens = MEDIA_TOKENS |
| | tokenizer.media_token_ids = {} |
| | for name, token in MEDIA_TOKENS.items(): |
| | tokenizer.add_tokens([token], special_tokens=True) |
| | tokenizer.media_token_ids[name] = tokenizer.convert_tokens_to_ids(token) |
| |
|
| | return tokenizer |
| |
|
| |
|
| | def get_model_config(config): |
| | default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"] |
| |
|
| | if hasattr(config, "_name_or_path") and len(config._name_or_path) >= 2: |
| | root_path = config._name_or_path |
| | else: |
| | root_path = config.resume_path |
| |
|
| | |
| | if root_path is not None and not osp.exists(root_path): |
| | try: |
| | valid_hf_repo = repo_exists(root_path) |
| | except HFValidationError as e: |
| | valid_hf_repo = False |
| | if valid_hf_repo: |
| | root_path = snapshot_download(root_path) |
| |
|
| | return_list = [] |
| | for key in default_keys: |
| | cfg = getattr(config, key, None) |
| | if isinstance(cfg, dict): |
| | try: |
| | return_list.append(os.path.join(root_path, key[:-4])) |
| | except: |
| | raise ValueError(f"Cannot find resume path in config for {key}!") |
| | elif isinstance(cfg, PretrainedConfig): |
| | return_list.append(os.path.join(root_path, key[:-4])) |
| | elif isinstance(cfg, str): |
| | return_list.append(cfg) |
| |
|
| | return return_list |
| |
|
| |
|
| | def get_model_config_fp8(config): |
| | default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"] |
| |
|
| | if hasattr(config, "_name_or_path") and len(config._name_or_path) >= 2: |
| | root_path = config._name_or_path |
| | else: |
| | root_path = config.resume_path |
| |
|
| | |
| | if root_path is not None and not osp.exists(root_path): |
| | try: |
| | valid_hf_repo = repo_exists(root_path) |
| | except HFValidationError as e: |
| | valid_hf_repo = False |
| | if valid_hf_repo: |
| | root_path = snapshot_download(root_path) |
| |
|
| | return_list = [] |
| | for key in default_keys: |
| | cfg = getattr(config, key, None) |
| | if isinstance(cfg, dict): |
| | try: |
| | return_list.append(os.path.join(root_path, key[:-4])) |
| | except: |
| | raise ValueError(f"Cannot find resume path in config for {key}!") |
| | elif isinstance(cfg, PretrainedConfig): |
| | return_list.append(os.path.join(root_path, key[:-4])) |
| | elif isinstance(cfg, str): |
| | return_list.append(cfg) |
| |
|
| | |
| | key = "fp8_llm_cfg" |
| | directory_path = os.path.join(root_path, key[:-4]) |
| | assert os.path.isdir(directory_path) and os.listdir( |
| | directory_path |
| | ), "You need to first convert the model weights to FP8 explicitly." |
| | return_list.append(directory_path) |
| |
|
| | return return_list |
| |
|
| |
|
| | def get_model_config_fp8(config): |
| | default_keys = ["llm_cfg", "vision_tower_cfg", "mm_projector_cfg"] |
| |
|
| | if hasattr(config, "_name_or_path") and len(config._name_or_path) >= 2: |
| | root_path = config._name_or_path |
| | else: |
| | root_path = config.resume_path |
| |
|
| | |
| | if root_path is not None and not osp.exists(root_path): |
| | try: |
| | valid_hf_repo = repo_exists(root_path) |
| | except HFValidationError as e: |
| | valid_hf_repo = False |
| | if valid_hf_repo: |
| | root_path = snapshot_download(root_path) |
| |
|
| | return_list = [] |
| | for key in default_keys: |
| | cfg = getattr(config, key, None) |
| | if isinstance(cfg, dict): |
| | try: |
| | return_list.append(os.path.join(root_path, key[:-4])) |
| | except: |
| | raise ValueError(f"Cannot find resume path in config for {key}!") |
| | elif isinstance(cfg, PretrainedConfig): |
| | return_list.append(os.path.join(root_path, key[:-4])) |
| | elif isinstance(cfg, str): |
| | return_list.append(cfg) |
| |
|
| | |
| | key = "fp8_llm_cfg" |
| | directory_path = os.path.join(root_path, key[:-4]) |
| | assert os.path.isdir(directory_path) and os.listdir( |
| | directory_path |
| | ), "You need to first convert the model weights to FP8 explicitly." |
| | return_list.append(directory_path) |
| |
|
| | return return_list |
| |
|
| |
|
| | def is_mm_model(model_path): |
| | """ |
| | Check if the model at the given path is a visual language model. |
| | |
| | Args: |
| | model_path (str): The path to the model. |
| | |
| | Returns: |
| | bool: True if the model is an MM model, False otherwise. |
| | """ |
| | config = AutoConfig.from_pretrained(model_path) |
| | architectures = config.architectures |
| | for architecture in architectures: |
| | if "llava" in architecture.lower(): |
| | return True |
| | return False |
| |
|
| |
|
| | def auto_upgrade(config): |
| | cfg = AutoConfig.from_pretrained(config) |
| | if "llava" in config and "llava" not in cfg.model_type: |
| | assert cfg.model_type == "llama" |
| | print("You are using newer LLaVA code base, while the checkpoint of v0 is from older code base.") |
| | print("You must upgrade the checkpoint to the new code base (this can be done automatically).") |
| | confirm = input("Please confirm that you want to upgrade the checkpoint. [Y/N]") |
| | if confirm.lower() in ["y", "yes"]: |
| | print("Upgrading checkpoint...") |
| | assert len(cfg.architectures) == 1 |
| | setattr(cfg.__class__, "model_type", "llava") |
| | cfg.architectures[0] = "LlavaLlamaForCausalLM" |
| | cfg.save_pretrained(config) |
| | print("Checkpoint upgraded.") |
| | else: |
| | print("Checkpoint upgrade aborted.") |
| | exit(1) |
| |
|