Configure your robot hardware for use with Repoch
Import the module
from repoch.client_api.streaming import VideoStream
Create a streaming instance
stream = VideoStream(robot_name="robot_arm", stream_name="wrist_cam")
Stream data
stream.add_frame(image)
Close the stream safely
stream.close()
# Timeseries example: one stream that carries the named series "joint_1" and "joint_2".
from repoch.client_api.streaming import TimeseriesStream
# NOTE(review): stream_name "wrist_cam" is the same name the video example uses — confirm this is intended.
stream = TimeseriesStream(robot_name="robot_arm", stream_name="wrist_cam", series_names=["joint_1", "joint_2"])
# The dict keys here match the names listed in series_names above.
stream.add_data({"joint_1": 0.5, "joint_2": 1.5})
# Audio example: rebinds `stream` to an audio stream named "mic_1".
# NOTE(review): the timeseries stream above is not closed in this snippet before `stream` is rebound.
from repoch.client_api.streaming import AudioStream
stream = AudioStream(robot_name="robot_arm", stream_name="mic_1")
# Complete example: open a video, an audio, and a timeseries stream, then push
# camera frames and joint positions in a loop until the loop is interrupted.
# The streams are closed in the `finally` clause however the loop exits.
from repoch.client_api.streaming import TimeseriesStream, VideoStream, AudioStream

# Create streaming instances with robot and stream names
video_stream = VideoStream(robot_name="robot_arm", stream_name="wrist_cam")
# NOTE(review): audio_stream is opened and closed here, but this example never
# sends any audio data through it.
audio_stream = AudioStream(robot_name="robot_arm", stream_name="mic_1")
# NOTE(review): this reuses the video stream's name "wrist_cam" — confirm that
# sharing a stream name between a video and a timeseries stream is intended.
ts_stream = TimeseriesStream(
    robot_name="robot_arm",
    stream_name="wrist_cam",
    series_names=["joint_1", "joint_2"],
)

# Your camera and robot driver implementations
camera = YourCameraObject()
robot_driver = YourRobotDriver()

try:
    # Stream data during operation
    while True:
        image = camera.get_frame()
        # Compare against None explicitly: a plain truthiness test raises
        # ValueError for multi-element array-like frames (e.g. NumPy arrays).
        if image is not None:
            video_stream.add_frame(image)

        joints = robot_driver.get_pos()
        joint_series = {"joint_1": joints[0], "joint_2": joints[1]}
        ts_stream.add_data(joint_series)
finally:
    # Close streams safely when done
    ts_stream.close()
    video_stream.close()
    audio_stream.close()