多线程显存与内存管理

多线程显存与内存管理

问题背景:

​ 需求是要制作一个文本模态和语音模态对齐的数据集,我需要做的仅仅是选一个效果很好的 TTS 模型将文本转语音就行了,我们需要从 1000k 条 conversation 里面挑选 40k 条数据进行转换(格式类似于 llava_instruct_150k.json 文件,是 human 和 gpt 文本对话的格式),由于单个模型一条一条地转化太耗时了,而且模型也不大,因此我们将在单卡上进行多进程推理(文件记为 parallel_cosyvoice.py),在多卡上重复多进程(再套一次多进程)来实现最大的并行化,因此要做一个 parallel.py 来多线程调用 parallel_cosyvoice.py

遇到的问题

并不能直接指定模型的设备

​ 由于主流 TTS 框架并不会做一个支持多卡并行的 TTS 模型,cosyvoice 也是这样,甚至 cosyvoice 都不能指定设置在哪一块显卡上,只能默认是 cuda0。对于这个问题,第一种尝试的解决思路是,通过修改它的源代码,把整个模型的传入的参数都加上 device_id 这个参数,再在创建模型之后就使用 torch.cuda.set_device() 或者 torch.device() 来设置每一个模型的设备。但是后来发现了一个错误怎么都解决不了:

1
2
x = torch.mul(y, CONST_TENSOR):
Expected all tensors to be on the same device, but found at least two devices, cuda:1 and cuda:0!

这个 CONST_TENSOR 按命名规则来说,他是一个常数张量,并不是我们手动创建的,他创建的时候默认在 cuda:0 上,而且这是内部库,我根本不能修改。但是我需要整个模型从头到尾都是在 cuda:1 这个设备上。

​ 最终的解决办法是使用直接设置环境变量的方式,解决了默认创建张量位置不对的问题:

1
2
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'

总结一下指定设备的方式:

  • torch.device 用于创建表示计算设备的对象,可以指定CPU或特定编号的GPU
  • torch.cuda.set_device 用于设置当前默认的CUDA设备,影响后续操作的默认行为。适用于单卡调试,简单快捷。最佳实践:
1
2
with torch.cuda.device(1):
pass
  • os.environ['CUDA_VISIBLE_DEVICES'] 是一个环境变量,用于限定程序可以看到的GPU,是一种更高级别的控制手段

multiprocessing不能管理显存

​ 我们一开始使用 python 原生的的 multiprocessing 模块来实现在单张卡上的多进程,但是我发现程序总是会跑到一半就卡住,后来在终端使用:

1
watch -n 1 nvidia-smi

发现显存只增不减,后果就是,多个进程之间进行显存的竞争,最后导致死锁什么都跑不了。但是这很奇怪,因为程序运行到后面的话显存应该是稳定下来的。说明显存并没有被正确的管理

​ 如果我机灵一点的话早就应该想到这个问题了,multiprocessing 模块是 python 的原生模块,它最多只能管理到内存,怎么可能管理到显存?后来查阅资料发现 pytorch 有一个专门管理多进程的模块:torch.multiprocessing(属于我学艺不精了,用了这么久 pytorch 都不知道这个模块)

  • multiprocessing:通用的多进程模块,适用于任何Python程序,只能管理内存
  • torch.multiprocessing:专门为PyTorch优化的多进程模块,适用于需要在多进程中使用PyTorch的Tensor和CUDA功能的场景,可以管理内存和显存

单卡多进程的最终实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import torch.multiprocessing as mp

class MyProcess(mp.Process):
def __init__(self, questions, output_dir, id):
super(MyProcess, self).__init__()
self.questions = questions
self.output_dir = output_dir
self.id = id

def run(self):
try:
logging.debug(f"new CosyVoice")
cosyvoice = CosyVoice(MODEL_PATH)
for i, question in enumerate(self.questions):
style = random.choice(style_list)
single_inference(cosyvoice, *question, style, self.output_dir)

if i % MAX_ROUNDS_PER_COSYVOICE == MAX_ROUNDS_PER_COSYVOICE-1:
logging.debug(f"进程 {self.name} 已经处理了 {i+1} 个问题, 重建 cosyvoice。")
print(f"进程 {self.name} 已经处理了 {i+1} 个问题, 重建 cosyvoice。")

cosyvoice.destroy()
del cosyvoice
gc.collect()
torch.cuda.empty_cache()
cosyvoice = CosyVoice(MODEL_PATH)

except Exception as e:
print(f"进程 {self.name} 中发生错误: {e}")
logging.error(f"发生错误: {e}", exc_info=True)
raise e


def single_inference(cosyvoice, id, text, style, output_dir):
for i, j in enumerate(cosyvoice.inference_sft(text, style, stream=False)):
torchaudio.save( os.path.join(output_dir, 'output', 'sft_{}_{}.wav'.format(id, i)), j['tts_speech'], 22050)

def restart_process(questions, output_dir):
process = MyProcess(questions, output_dir)
process.start()
process.join()

##########################################################################


def parallel_inference(questions, max_workers, output_dir):
processes = []
for i in range(max_workers):
process = MyProcess(questions[i::max_workers], output_dir, i)
process.start()
processes.append(process)

for process in processes:
process.join()

subprocess 启动子进程受到操作系统内存限制

​ 我需要在多卡上都调用多线程 parallel_cosyvoice.py 文件,为了不每次跑的时候有几张卡就启动几个进程,我就想着使用 subprocess 模块,由此出现了一个问题:我在两张卡上进行实验,每次跑着跑着就会有一张卡挂掉(跑不动了),而另一张卡非常正常的运行。这时候就猜到了是内存的竞争造成的原因(其实对于单个来说内存够了,但是多张卡加起来内存就不够了)

​ 内存受限的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os, subprocess

device_num = 1 # GPU 数量 (=线程数量)
task_num = 8 # 每个 GPU 上运行的模型数量

def exist_dir(dirpath):
if not os.path.exists(dirpath):
os.makedirs(dirpath)
print(f'目录 "{dirpath}" 已创建。')
else:
print(f'目录 "{dirpath}" 已经存在。')

if __name__ == "__main__":
exist_dir("output")
exist_dir("log")

os.environ["PYTHONPATH"] = 'third_party/Matcha-TTS'

# 要运行的外部程序命令列表
commands = [
["python", "parrel_cosyvoice.py", str(i), str(device_num), str(task_num)] for i in range(device_num)
]

processes = []

for command in commands:
print(f"Starting Command {str(command)}")
processes.append(subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE))

# 等待所有进程完成,并获取它们的输出
for process in processes:
stdout, stderr = process.communicate()
print(f"STDOUT: {stdout.decode('utf-8')}")
if stderr:
print(f"STDERR: {stderr.decode('utf-8')}")

后来为了避免多个子进程在一起内存最大上限不够,于是写了一个 bash:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
device_num=2   # GPU 数量 (=线程数量)
task_num=4 # 每个 GPU 上运行的模型数量
alpha=0.04 # 选择的数据比例, 0.04 表示选择 4% 的数据
input_file="open-llava-next_instruct_mix1M.json" # 要转化为语音的数据文件
output_dir="/root/autodl-tmp/result" # 输出目录
max_words_per_sentence=40 # 每个问/答中允许的最大单词数

exist_dir() {
if [ -d "$1" ]; then
rm -rf "$1"
echo "目录 \"$1\" 中原有数据已经被删除。"
fi
mkdir -p "$1"
}

main() {
# 删除之前产生的输出数据
exist_dir "$output_dir"
mkdir -p "$output_dir/output"
mkdir -p "$output_dir/log"

python preprocess.py $input_file $alpha $device_num

commands=()
for ((i=0; i<device_num; i++)); do
new_file="${input_file%.json}_$i.json"
commands+=("python parallel_cosyvoice.py $new_file $output_dir $i $task_num $max_words_per_sentence")
done

export PYTHONPATH='third_party/Matcha-TTS'
for command in "${commands[@]}"; do
echo "Starting Command $command"
$command &
done

wait
}

# 执行主程序
main
echo "All Done!"

至此,问题全部解决

总结:

​ 操作系统的知识还是挺重要的,我要是知道内存管理的重要性早就应该意识到 multiprocessing 不能管理显存了,要是会监视内存和显存早就可以解决操作系统对内存进行限制的问题了。在这里立志把CSAPP从头到尾看完