Implement the forked espeak-ng phonemizer in Python #138

Open · wants to merge 1 commit into master
191 changes: 146 additions & 45 deletions src/python_run/piper/__init__.py
@@ -1,3 +1,4 @@
import ctypes
import io
import json
import logging
@@ -8,14 +9,24 @@

import numpy as np
import onnxruntime
from espeak_phonemizer import Phonemizer

_LOGGER = logging.getLogger(__name__)

_BOS = "^"
_EOS = "$"
_PAD = "_"

EE_OK = 0
AUDIO_OUTPUT_SYNCHRONOUS = 0x02
espeakPHONEMES_IPA = 0x02
espeakCHARS_AUTO = 0

CLAUSE_INTONATION_FULL_STOP = 0x00000000
CLAUSE_INTONATION_COMMA = 0x00001000
CLAUSE_INTONATION_QUESTION = 0x00002000
CLAUSE_INTONATION_EXCLAMATION = 0x00003000

CLAUSE_TYPE_SENTENCE = 0x00080000

@dataclass
class PiperConfig:
@@ -29,6 +40,64 @@ class PiperConfig:
phoneme_id_map: Mapping[str, Sequence[int]]


class CustomPhonemizer(object):
""" A modified Phonemizer that keeps the punctuation.
Needs a patched libespeak-ng.so from https://github.com/rhasspy/espeak-ng """
def __init__(self, voice):
# Set voice
ret = forked_lib.espeak_SetVoiceByName(voice.encode("utf-8"))
assert ret == EE_OK, ret

def phonemize(self, text):
text_pointer = ctypes.c_char_p(text.encode("utf-8"))

phoneme_flags = espeakPHONEMES_IPA
text_flags = espeakCHARS_AUTO

phonemes = ""
while text_pointer:
terminator = ctypes.c_int(0)
clause_phonemes = forked_lib.espeak_TextToPhonemesWithTerminator(
ctypes.pointer(text_pointer),
text_flags,
phoneme_flags,
ctypes.pointer(terminator),
)
if isinstance(clause_phonemes, bytes):
phonemes += clause_phonemes.decode()

# Check for punctuation.
# The testing order here is critical.
if (terminator.value & CLAUSE_INTONATION_EXCLAMATION) == CLAUSE_INTONATION_EXCLAMATION:
phonemes += "!"
elif (terminator.value & CLAUSE_INTONATION_QUESTION) == CLAUSE_INTONATION_QUESTION:
phonemes += "?"
elif (terminator.value & CLAUSE_INTONATION_COMMA) == CLAUSE_INTONATION_COMMA:
phonemes += ","
elif (terminator.value & CLAUSE_INTONATION_FULL_STOP) == CLAUSE_INTONATION_FULL_STOP:
phonemes += "."

# Check for end of sentence
if (terminator.value & CLAUSE_TYPE_SENTENCE) == CLAUSE_TYPE_SENTENCE:
phonemes += "\n"
else:
phonemes += " "
return phonemes


# Check if we have the patched lib needed to use the CustomPhonemizer
forked_lib_available = False
try:
forked_lib = ctypes.cdll.LoadLibrary("libespeak-ng.so")
# Will fail if custom function is missing
forked_lib.espeak_TextToPhonemesWithTerminator.restype = ctypes.c_char_p
# Initialize
forked_lib_available = forked_lib.espeak_Initialize(AUDIO_OUTPUT_SYNCHRONOUS, 0, None, 0) > 0
Phonemizer = CustomPhonemizer
except ValueError:
from espeak_phonemizer import Phonemizer


class Piper:
def __init__(
self,
@@ -49,14 +118,14 @@ def __init__(
else ["CUDAExecutionProvider"],
)

def synthesize(
def synthesize_partial(
self,
text: str,
speaker_id: Optional[int] = None,
length_scale: Optional[float] = None,
noise_scale: Optional[float] = None,
noise_w: Optional[float] = None,
) -> bytes:
) -> List[np.ndarray]:
"""Synthesize WAV audio from text."""
if length_scale is None:
length_scale = self.config.length_scale
@@ -68,54 +137,80 @@ def synthesize(
noise_w = self.config.noise_w

phonemes_str = self.phonemizer.phonemize(text)
phonemes = [_BOS] + list(phonemes_str)
phoneme_ids: List[int] = []

for phoneme in phonemes:
if phoneme in self.config.phoneme_id_map:
phoneme_ids.extend(self.config.phoneme_id_map[phoneme])
phoneme_ids.extend(self.config.phoneme_id_map[_PAD])
else:
_LOGGER.warning("No id for phoneme: %s", phoneme)

phoneme_ids.extend(self.config.phoneme_id_map[_EOS])
total_audio = []
for s in phonemes_str.splitlines():
# print(s)
phonemes = [_BOS] + list(s)
phoneme_ids: List[int] = []

for phoneme in phonemes:
if phoneme in self.config.phoneme_id_map:
phoneme_ids.extend(self.config.phoneme_id_map[phoneme])
phoneme_ids.extend(self.config.phoneme_id_map[_PAD])
else:
_LOGGER.warning("No id for phoneme: `%s`", phoneme)

phoneme_ids.extend(self.config.phoneme_id_map[_EOS])

phoneme_ids_array = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
phoneme_ids_lengths = np.array([phoneme_ids_array.shape[1]], dtype=np.int64)
scales = np.array(
[noise_scale, length_scale, noise_w],
dtype=np.float32,
)

if (self.config.num_speakers > 1) and (speaker_id is None):
# Default speaker
speaker_id = 0

sid = None

if speaker_id is not None:
sid = np.array([speaker_id], dtype=np.int64)

# Synthesize through Onnx
audio = self.model.run(
None,
{
"input": phoneme_ids_array,
"input_lengths": phoneme_ids_lengths,
"scales": scales,
"sid": sid,
},
)[0].squeeze((0, 1))
total_audio.append(audio.squeeze())

return total_audio

phoneme_ids_array = np.expand_dims(np.array(phoneme_ids, dtype=np.int64), 0)
phoneme_ids_lengths = np.array([phoneme_ids_array.shape[1]], dtype=np.int64)
scales = np.array(
[noise_scale, length_scale, noise_w],
dtype=np.float32,
def synthesize(
self,
text: str,
speaker_id: Optional[int] = None,
length_scale: Optional[float] = None,
noise_scale: Optional[float] = None,
noise_w: Optional[float] = None,
) -> bytes:
audios = self.synthesize_partial(
text,
speaker_id,
length_scale,
noise_scale,
noise_w,
)
return self.audios_to_wav(audios)

if (self.config.num_speakers > 1) and (speaker_id is None):
# Default speaker
speaker_id = 0

sid = None

if speaker_id is not None:
sid = np.array([speaker_id], dtype=np.int64)

# Synthesize through Onnx
audio = self.model.run(
None,
{
"input": phoneme_ids_array,
"input_lengths": phoneme_ids_lengths,
"scales": scales,
"sid": sid,
},
)[0].squeeze((0, 1))
audio = audio_float_to_int16(audio.squeeze())

def audios_to_wav(
self, audios: List[np.ndarray]
) -> bytes:
# Convert to WAV
with io.BytesIO() as wav_io:
wav_file: wave.Wave_write = wave.open(wav_io, "wb")
with wav_file:
wav_file.setframerate(self.config.sample_rate)
wav_file.setsampwidth(2)
wav_file.setnchannels(1)
wav_file.writeframes(audio.tobytes())
wav_file.writeframes(audio_float_to_int16(audios))

return wav_io.getvalue()

@@ -138,10 +233,16 @@ def load_config(config_path: Union[str, Path]) -> PiperConfig:


def audio_float_to_int16(
audio: np.ndarray, max_wav_value: float = 32767.0
audios: List[np.ndarray], max_wav_value: float = 32767.0
) -> np.ndarray:
"""Normalize audio and convert to int16 range"""
audio_norm = audio * (max_wav_value / max(0.01, np.max(np.abs(audio))))
audio_norm = np.clip(audio_norm, -max_wav_value, max_wav_value)
audio_norm = audio_norm.astype("int16")
return audio_norm
mx = 0.01
for audio in audios:
mx = max(mx, np.max(np.abs(audio)))
total_audio = b''
for audio in audios:
audio_norm = audio * (max_wav_value / mx)
audio_norm = np.clip(audio_norm, -max_wav_value, max_wav_value)
audio_norm = audio_norm.astype("int16")
total_audio += audio_norm.tobytes()
return total_audio
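
For reference, a minimal usage sketch of the reworked API (not part of the diff): synthesize() keeps its original contract and returns a complete WAV file as bytes, while the new synthesize_partial() exposes one audio array per line of phonemizer output (one per sentence when the patched libespeak-ng is available). The import path, constructor argument, and model filename below are assumed from the unmodified package and are illustrative only.

from piper import Piper

# Hypothetical model path; Piper is assumed to accept a model path as in the
# unmodified package.
voice = Piper("en-us-voice.onnx")

# synthesize() still returns WAV bytes for the whole input text.
wav_bytes = voice.synthesize("Hello world. How are you?")
with open("output.wav", "wb") as wav_out:
    wav_out.write(wav_bytes)

# synthesize_partial() returns a list of float32 numpy arrays, which a caller
# could stream or post-process before encoding to WAV.
sentence_audios = voice.synthesize_partial("Hello world. How are you?")
print(f"synthesized {len(sentence_audios)} chunk(s)")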
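
One behavioral detail of the change: audio_float_to_int16 now normalizes every chunk against a single shared peak, so the relative loudness between sentences is preserved instead of each sentence being normalized independently. A quick sanity check of that behavior (a sketch using the function as defined above, not part of the diff):

import numpy as np

quiet = np.array([0.1, -0.1], dtype=np.float32)
loud = np.array([0.5, -0.5], dtype=np.float32)

pcm = audio_float_to_int16([quiet, loud])
samples = np.frombuffer(pcm, dtype=np.int16)

# The loud chunk reaches full scale; the quiet chunk stays at about 1/5 of it,
# because both are scaled by the same global maximum (0.5).
print(samples)  # roughly [ 6553 -6553 32767 -32767]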