Appearance
tts_service
Description
tex
文本转语音服务接收文本输入并将其转换为语音输出,支持多个语音合成引擎(包括腾讯云、讯飞、百度等)。支持中文、英文等常见语言,默认语言为中文。语速范围为0.5-2.0(默认1.0),音量范围为0-100(默认50)。文本长度建议不超过500字,过长文本可能会分段播放ROS Type: Service
Data Type: audio/TTS.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call tts_service "text:
- 'hi'
isPlay: true"
# isPlay: false 只生成文件不播放
# isPlay: true 生成文件并播放Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - TTS Service
"""
import rospy
import yaml
import sys
from audio.srv import TTS, TTSRequest
def main():
# Initialize ROS node
rospy.init_node('tts_service_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service tts_service...")
rospy.wait_for_service('tts_service')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('tts_service', TTS)
# Create request
req = TTSRequest()
req.text = ["hello world"] # 文本数组
req.isPlay = True # 是否播放,true=播放,false=仅生成文件
rospy.loginfo(f"Calling TTS service with text: {req.text}")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}, file_path={response.file_path}")
print(f"\nService call successful!")
print(f"Generated audio file: {response.file_path}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passLLM_chat
Description
tex
智能对话服务用于接收用户文本输入,通过大语言模型生成回复文本,并可联动文本转语音服务自动播放回复。输入文本需为自然语言问句或对话内容,支持上下文关联对话。可通过参数控制是否自动播放语音回复(默认自动播放)。回复生成耗时与网络状态、文本复杂度相关,建议预留1-3秒响应时间ROS Type: Service
Data Type: audio/LLMChat.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call LLM_chat "raw_input: '语音模块的版本号是多少'
enable_context: true
enable_save: true
context_id: 'test_session'"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - LLM Chat Service
"""
import rospy
import sys
from audio.srv import LLMChat, LLMChatRequest
def main():
# Initialize ROS node
rospy.init_node('llm_chat_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service LLM_chat...")
rospy.wait_for_service('LLM_chat')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('LLM_chat', LLMChat)
# Create request
req = LLMChatRequest()
req.raw_input = "语音模块的版本号是多少" # 用户输入的问题
req.enable_context = True # 启用上下文理解
req.enable_save = True # 是否记录对话
req.context_id = "test_session" # 会话上下文ID
rospy.loginfo(f"Calling LLM chat service with input: {req.raw_input}")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: {response.response}")
print(f"\nService call successful!")
print(f"LLM Response: {response.response}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passmedia_play
Description
tex
音频文件播放服务用于播放本地或指定路径的音频文件,支持暂停、继续、停止等控制操作。支持的音频格式包括MP3、WAV、FLAC,其他格式可能无法正常播放。文件路径需为绝对路径(在docker容器中),网络文件需先下载至本地再播放ROS Type: Service
Data Type: audio/PcmPlay.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call media_play "file_path: '/path/to/xxx.wav'"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Media Play Service
"""
import rospy
import sys
from audio.srv import MediaPlay, MediaPlayRequest
def main():
# Initialize ROS node
rospy.init_node('media_play_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service media_play...")
rospy.wait_for_service('media_play')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('media_play', MediaPlay)
# Create request
req = MediaPlayRequest()
req.file_path = "/path/to/company_intro.wav" # 音频文件路径,需将文件放置在共享目录下
req.delete_after_play = False # 播放后是否删除文件
rospy.loginfo(f"Playing audio file: {req.file_path}")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}, message={response.message}")
print(f"\nService call successful!")
print(f"Playback status: {response.message}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passspeaker/get_devices_list
Description
tex
用于获取系统中已识别的所有扬声器设备信息,包括设备ID、名称、型号等。返回结果为设备列表数组,每个设备包含唯一ID,用于后续选择播放设备。若未识别到扬声器设备,返回空列表,需检查硬件连接或驱动ROS Type: Service
Data Type: audio/GetDeviceList.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call speaker/get_devices_list "{}"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Get Speaker Devices List
"""
import rospy
import sys
from audio.srv import GetDeviceList, GetDeviceListRequest
def main():
# Initialize ROS node
rospy.init_node('get_speaker_devices_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service speaker/get_devices_list...")
rospy.wait_for_service('speaker/get_devices_list')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('speaker/get_devices_list', GetDeviceList)
# Create empty request (this service has no request parameters)
req = GetDeviceListRequest()
rospy.loginfo("Calling service to get speaker devices list...")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}")
print(f"\nService call successful!")
print(f"Available speaker devices: {len(response.devicelist)}")
for idx, device in enumerate(response.devicelist):
print(f" {idx + 1}. {device}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passspeaker/select_device
Description
tex
用于指定当前生效的扬声器设备,通过设备ID选择对应的播放设备。设备ID需从"获取扬声器设备列表"接口的返回结果中获取。设置成功后,后续所有音频播放操作均通过该设备输出。若指定设备ID不存在,返回设置失败,保持当前生效设备不变ROS Type: Service
Data Type: audio/SetDevice.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call speaker/select_device "name: 'YUNJI31993Ultra'"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Select Speaker Device
"""
import rospy
import sys
from audio.srv import SetDevice, SetDeviceRequest
def main():
# Initialize ROS node
rospy.init_node('select_speaker_device_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service speaker/select_device...")
rospy.wait_for_service('speaker/select_device')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('speaker/select_device', SetDevice)
# Create request
req = SetDeviceRequest()
req.name = "YUNJI31993Ultra" # 设备名称,需要先通过get_devices_list获取
rospy.loginfo(f"Selecting speaker device: {req.name}")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}, message={response.message}")
print(f"\nService call successful!")
print(f"Device selection status: {response.message}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passspeaker/get_volume
Description
tex
用于查询当前生效扬声器的音量大小,返回值为0-100的整数。音量值范围为0(静音)-100(最大音量),默认初始音量为50。若未设置生效扬声器,返回默认设备的音量值ROS Type: Service
Data Type: audio/GetVolume.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call speaker/get_volume "{}"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Get Speaker Volume
"""
import rospy
import sys
from audio.srv import GetVolume, GetVolumeRequest
def main():
# Initialize ROS node
rospy.init_node('get_volume_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service speaker/get_volume...")
rospy.wait_for_service('speaker/get_volume')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('speaker/get_volume', GetVolume)
# Create empty request (this service has no request parameters)
req = GetVolumeRequest()
rospy.loginfo("Getting current speaker volume...")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: volume={response.volume}%")
print(f"\nService call successful!")
print(f"Current volume: {response.volume}%")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passspeaker/set_volume
Description
tex
用于调整当前生效扬声器的音量,支持手动输入0-100的音量值。音量值超出0-100范围时,自动截断为最近的有效数值(如输入110则设为100)。设置成功后即时生效,后续播放的音频均使用新音量。支持通过该接口设置静音(输入音量值0)ROS Type: Service
Data Type: audio/SetVolume.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call speaker/set_volume "volume: 50"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Set Speaker Volume
"""
import rospy
import sys
from audio.srv import SetVolume, SetVolumeRequest
def main():
# Initialize ROS node
rospy.init_node('set_volume_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service speaker/set_volume...")
rospy.wait_for_service('speaker/set_volume')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('speaker/set_volume', SetVolume)
# Create request
req = SetVolumeRequest()
req.volume = 50 # 音量大小,范围0-100
rospy.loginfo(f"Setting speaker volume to: {req.volume}%")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}, message={response.message}")
print(f"\nService call successful!")
print(f"Volume set status: {response.message}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passmicrophone/get_devices_list
Description
tex
用于获取系统中已识别的所有麦克风设备信息,包括设备ID、名称、采样率等。返回结果包含每个设备的唯一ID和支持的采样率,供后续选择和配置使用。未识别到麦克风设备时返回空列表,需检查硬件连接或驱动安装ROS Type: Service
Data Type: audio/GetDeviceList.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call microphone/get_devices_list "{}"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Get Microphone Devices List
"""
import rospy
import sys
from audio.srv import GetDeviceList, GetDeviceListRequest
def main():
# Initialize ROS node
rospy.init_node('get_microphone_devices_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service microphone/get_devices_list...")
rospy.wait_for_service('microphone/get_devices_list')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('microphone/get_devices_list', GetDeviceList)
# Create empty request (this service has no request parameters)
req = GetDeviceListRequest()
rospy.loginfo("Calling service to get microphone devices list...")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}")
print(f"\nService call successful!")
print(f"Available microphone devices: {len(response.devicelist)}")
for idx, device in enumerate(response.devicelist):
print(f" {idx + 1}. {device}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passmicrophone/select_device
Description
tex
用于指定当前用于音频采集的麦克风设备,通过设备ID进行选择。设备ID需从"获取麦克风设备列表"接口的返回结果中获取。设置成功后,后续音频采集、语音识别等操作均通过该设备进行。若指定设备不存在,返回设置失败,保持当前生效设备不变ROS Type: Service
Data Type: audio/SetDevice.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call microphone/select_device "name: 'YUNJI31993Ultra'"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Select Microphone Device
"""
import rospy
import sys
from audio.srv import SetDevice, SetDeviceRequest
def main():
# Initialize ROS node
rospy.init_node('select_microphone_device_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service microphone/select_device...")
rospy.wait_for_service('microphone/select_device')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('microphone/select_device', SetDevice)
# Create request
req = SetDeviceRequest()
req.name = "YUNJI31993Ultra" # 设备名称,需要先通过get_devices_list获取
rospy.loginfo(f"Selecting microphone device: {req.name}")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}, message={response.message}")
print(f"\nService call successful!")
print(f"Device selection status: {response.message}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passmicrophone/audio_data
Description
tex
用于实时发布唤醒后的麦克风音频数据流,供语音识别、音频存储等上层应用使用。音频流格式为PCM,采样率默认16000Hz,位深16bit,单声道。仅在麦克风被唤醒后开始发布数据,未唤醒时无消息输出。数据用于后续语音转文字(ASR)或音频分析处理ROS Type: Topic
Data Type: audio/AudioData.msg
Version:
- 1.0.0 : added
CLI:
shell
rostopic echo microphone/audio_dataExample:
python
#!/usr/bin/env python3
"""
ROS Topic Subscriber - Audio Data
"""
import rospy
from audio.msg import AudioData
def audio_data_callback(msg):
"""
处理音频数据流
msg.channel: 声道数
msg.vad_status: VAD状态(0=无语音,>0=有语音)
msg.is_final: 是否是最终一帧
msg.audio_data: 音频数据(uint8数组)
"""
if msg.vad_status > 0:
rospy.loginfo(f"Voice detected: channel={msg.channel}, is_final={msg.is_final}, data_size={len(msg.audio_data)}")
def main():
# Initialize ROS node
rospy.init_node('audio_data_subscriber', anonymous=True)
# Subscribe to audio data topic
rospy.Subscriber('microphone/audio_data', AudioData, audio_data_callback)
rospy.loginfo("Listening to audio data stream...")
rospy.spin()
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passlisten
Description
tex
唤醒倾听服务为订阅型接口,接收外部唤醒指令,触发麦克风开始音频采集和倾听。订阅该话题后,接收"唤醒"指令(trigger: true)时,麦克风启动采集;接收"停止"指令(trigger: false)时,麦克风停止采集。指令需携带唤醒关键词标识,用于区分不同唤醒场景ROS Type: Topic
Data Type: std_msgs/Bool.msg
Version:
- 1.0.0 : added
CLI:
shell
rostopic pub listen std_msgs/Bool "data: true"Example:
python
#!/usr/bin/env python3
"""
ROS Topic Publisher - Wake Up Control
"""
import rospy
from std_msgs.msg import Bool
def main():
# Initialize ROS node
rospy.init_node('wake_up_control', anonymous=True)
# Create publisher
pub = rospy.Publisher('listen', Bool, queue_size=10)
# Wait for subscribers
rospy.sleep(1.0)
# Send wake up command
msg = Bool()
msg.data = True # True=唤醒,False=休眠
rospy.loginfo(f"Sending wake up command: {msg.data}")
pub.publish(msg)
rospy.loginfo("Wake up command sent successfully!")
rospy.sleep(1.0)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passlisten_state
Description
tex
实时发布麦克风的唤醒倾听状态,告知上层应用当前是否处于音频采集状态。状态包括"未唤醒""唤醒中""倾听超时"三种,便于上层应用判断是否进行后续交互。倾听超时时间默认30秒,超时后自动切换为"未唤醒"状态ROS Type: Topic
Data Type: std_msgs/Bool.msg
Version:
- 1.0.0 : added
CLI:
shell
rostopic echo listen_stateExample:
python
#!/usr/bin/env python3
"""
ROS Topic Subscriber - Listen State
"""
import rospy
from std_msgs.msg import Bool
def listen_state_callback(msg):
"""
处理倾听状态
msg.data: True=正在倾听,False=未倾听
"""
if msg.data:
rospy.loginfo("Robot is listening...")
else:
rospy.loginfo("Robot is not listening")
def main():
# Initialize ROS node
rospy.init_node('listen_state_subscriber', anonymous=True)
# Subscribe to listen state topic
rospy.Subscriber('listen_state', Bool, listen_state_callback)
rospy.loginfo("Monitoring listen state...")
rospy.spin()
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passasr_text
Description
tex
实时将麦克风采集的音频流转换为文本,通过话题发布转换结果,支持连续语音识别。识别语言默认中文,支持识别标点符号,准确率与语音清晰度、环境噪音相关。连续语音识别时,每100ms发布一次当前识别结果,句子结束后发布最终完整文本。未识别到有效语音时,发布空文本字符串ROS Type: Topic
Data Type: std_msgs/String.msg
Version:
- 1.0.0 : added
CLI:
shell
rostopic echo asr_textExample:
python
#!/usr/bin/env python3
"""
ROS Topic Subscriber - ASR Text
"""
import rospy
from std_msgs.msg import String
def asr_text_callback(msg):
"""
处理语音识别结果
msg.data: 识别到的文本
"""
if msg.data:
rospy.loginfo(f"Recognized text: {msg.data}")
else:
rospy.loginfo("No speech detected")
def main():
# Initialize ROS node
rospy.init_node('asr_text_subscriber', anonymous=True)
# Subscribe to ASR text topic
rospy.Subscriber('asr_text', String, asr_text_callback)
rospy.loginfo("Listening to ASR text output...")
rospy.spin()
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
passversion
Description
tex
用于查询音频模块的当前软件版本号,包括基础功能版本和各子服务版本信息。返回结果包含主版本号、子版本号、修订号,以及TTS、ASR、LLM等子服务的版本信息。版本号格式遵循语义化版本规范(X.Y.Z)ROS Type: Service
Data Type: std_srvs/Trigger.srv
Version:
- 1.0.0 : added
CLI:
shell
rosservice call version "{}"Example:
python
#!/usr/bin/env python3
"""
ROS Service Test Script - Audio Module Version
"""
import rospy
import sys
from std_srvs.srv import Trigger, TriggerRequest
def main():
# Initialize ROS node
rospy.init_node('audio_version_test', anonymous=True)
# Wait for service
rospy.loginfo("Waiting for service version...")
rospy.wait_for_service('version')
try:
# Create service proxy
service_proxy = rospy.ServiceProxy('version', Trigger)
# Create empty request (Trigger service has no request parameters)
req = TriggerRequest()
rospy.loginfo("Calling service to get audio module version...")
# Call service
response = service_proxy(req)
rospy.loginfo(f"Response: success={response.success}")
print(f"\nService call successful!")
print(f"Audio Module Version: {response.message}")
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
sys.exit(1)
if __name__ == '__main__':
try:
main()
except rospy.ROSInterruptException:
pass