| #!/usr/bin/env python3 |
| |
| import argparse |
| import cmd |
| import logging |
| import sys |
| import zmq |
| |
| HELP = ''' |
| Provide a shell used to send interactive commands to a zmq filter. |
| |
| The command assumes there is a running zmq or azmq filter acting as a |
| ZMQ server. |
| |
| You can send a command to it, follwing the syntax: |
| TARGET COMMAND [COMMAND_ARGS] |
| |
| * TARGET is the target filter identifier to send the command to |
| * COMMAND is the name of the command sent to the filter |
| * COMMAND_ARGS is the optional specification of command arguments |
| |
| See the zmq/azmq filters documentation for more details, and the |
| zeromq documentation at: |
| https://zeromq.org/ |
| ''' |
| |
| logging.basicConfig(format='zmqshell|%(levelname)s> %(message)s', level=logging.INFO) |
| log = logging.getLogger() |
| |
| |
| class LavfiCmd(cmd.Cmd): |
| prompt = 'lavfi> ' |
| |
| def __init__(self, bind_address): |
| context = zmq.Context() |
| self.requester = context.socket(zmq.REQ) |
| self.requester.connect(bind_address) |
| cmd.Cmd.__init__(self) |
| |
| def onecmd(self, cmd): |
| if cmd == 'EOF': |
| sys.exit(0) |
| log.info(f"Sending command: {cmd}") |
| self.requester.send_string(cmd) |
| response = self.requester.recv_string() |
| log.info(f"Received response: {response}") |
| |
| |
| class Formatter( |
| argparse.ArgumentDefaultsHelpFormatter, argparse.RawDescriptionHelpFormatter |
| ): |
| pass |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser(description=HELP, formatter_class=Formatter) |
| parser.add_argument('--bind-address', '-b', default='tcp://localhost:5555', help='specify bind address used to communicate with ZMQ') |
| |
| args = parser.parse_args() |
| try: |
| LavfiCmd(args.bind_address).cmdloop('FFmpeg libavfilter interactive shell') |
| except KeyboardInterrupt: |
| pass |
| |
| |
| if __name__ == '__main__': |
| main() |