Smart Home/MATTER

MATTER/chip-device-ctrl (Python controller) 분석

감자최고 2022. 10. 10. 00:45

./out/python_env/bin/chip-device-ctrl

def main():
    """Create the device-controller shell and run its interactive command loop.

    Exits the process with status 1 when the shell cannot be constructed, and
    with status 0 once the command loop returns or is interrupted with Ctrl-C.
    """
    # NOTE(review): `options` and `adapterId` are not assigned anywhere in this
    # function; presumably they come from command-line parsing that is not
    # shown here -- confirm before relying on this entry point.
    try:
        devMgrCmd = DeviceMgrCmd(rendezvousAddr=options.rendezvousAddr,
                                 controllerNodeId=options.controllerNodeId, bluetoothAdapter=adapterId)
    except Exception as ex:
        # Top-level boundary: report whatever went wrong and stop.
        print(ex)
        print("Failed to bringup CHIPDeviceController CLI")
        sys.exit(1)

    print("Chip Device Controller Shell")

    try:
        devMgrCmd.cmdloop()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave the shell, so exit cleanly.
        print("\nQuitting")

    sys.exit(0)

# Start the interactive controller shell only when this file is run as a script.
if __name__ == "__main__":
    main()
import random
import shlex
import string
import sys
import traceback
from cmd import Cmd

from chip import ChipCommissionableNodeCtrl
from chip import ChipDeviceCtrl
from chip import exceptions

class DeviceMgrCmd(Cmd):
    """Interactive shell whose ``do_<name>`` methods drive a CHIP device controller.

    Commands may be typed with dashes (``ble-scan``, ``get-fabricid``);
    ``parseline`` removes the dashes before the standard ``cmd.Cmd`` dispatch
    looks up the matching ``do_<name>`` handler.
    """

    # Accept "-" inside a command word so that e.g. "get-fabricid" is read as a
    # single command. This is set on the subclass so the stdlib ``Cmd`` class
    # itself is left untouched.
    identchars = string.ascii_letters + string.digits + "-"

    command_names = [
        "ble-scan",
        "connect",
        "resolve",
        "zcl",
        "discover",
        "get-fabricid",
    ]

    def __init__(self, rendezvousAddr=None, controllerNodeId=0, bluetoothAdapter=None):
        """Set up the shell state and create the controller objects.

        ``controllerNodeId`` and ``bluetoothAdapter`` are forwarded to
        ``ChipDeviceController``. ``rendezvousAddr`` is accepted but not used
        by this method.
        """
        self.lastNetworkId = None
        # Created lazily by do_blescan on the first scan.
        self.bleMgr = None

        Cmd.__init__(self)

        DeviceMgrCmd.command_names.sort()
        self.devCtrl = ChipDeviceCtrl.ChipDeviceController(
            controllerNodeId=controllerNodeId, bluetoothAdapter=bluetoothAdapter)

        self.commissionableNodeCtrl = ChipCommissionableNodeCtrl.ChipCommissionableNodeController()

    def parseline(self, line):
        """Parse *line* like ``Cmd.parseline`` but drop dashes from the command word.

        This maps the advertised names (``ble-scan``, ``get-fabricid``) onto
        the handler names (``do_blescan``, ``do_getfabricid``). Lines whose
        command word has no dash are returned exactly as ``Cmd`` parsed them.
        """
        cmd, arg, line = Cmd.parseline(self, line)
        if cmd and "-" in cmd:
            short = cmd.replace("-", "")
            # A word made only of dashes is left alone so it is reported as
            # unknown syntax instead of being treated as an empty line.
            if short:
                cmd = short
                line = "{} {}".format(cmd, arg) if arg else cmd
        return cmd, arg, line

    def do_blescan(self, line):
        """
        ble-scan

        Run a BLE scan; the rest of the command line is passed to the scan.
        """
        # NOTE(review): BleManager is not defined or imported in the visible
        # part of this file -- confirm where it comes from.
        if not self.bleMgr:
            self.bleMgr = BleManager(self.devCtrl)
        self.bleMgr.scan(line)

    def do_connect(self, line):
        """
        connect -ip <ip address> <setup pin code> [<nodeid>]
        connect -ble <discriminator> <setup pin code> [<nodeid>]

        Connect to a device over IP or BLE using its setup pin code.
        When <nodeid> is omitted, a random node id is assigned.
        """
        transport = None
        try:
            args = shlex.split(line)
            if len(args) >= 3 and args[0] in ("-ip", "-ble"):
                setup_pin_code = int(args[2])
                discriminator = int(args[1]) if args[0] == "-ble" else None
                if len(args) > 3:
                    nodeid = int(args[3])
                else:
                    nodeid = random.randint(1, 1000000)  # just a random number
                # Only mark the input as valid once every field has parsed.
                transport = args[0]
        except ValueError:
            # Unbalanced quotes or a non-integer field: fall through to usage.
            transport = None

        if transport is None:
            print("Usage:")
            self.do_help("connect")
            return

        print("Device is assigned with nodeid = {}".format(nodeid))
        try:
            if transport == "-ip":
                self.devCtrl.ConnectIP(
                    args[1].encode("utf-8"), setup_pin_code, nodeid)
            else:
                self.devCtrl.ConnectBLE(discriminator, setup_pin_code, nodeid)
            print(
                "Device temporary node id (**this does not match spec**): {}".format(nodeid))
        except exceptions.ChipStackException as ex:
            print(str(ex))
            return

    def do_resolve(self, line):
        """
        resolve <nodeid>

        Resolve DNS-SD name corresponding with the given node ID and
        update address of the node in the device controller.
        """
        try:
            args = shlex.split(line)
            nodeid = int(args[0]) if len(args) == 1 else None
        except ValueError:
            # Unbalanced quotes or a non-integer node id: show the usage text.
            nodeid = None

        if nodeid is None:
            self.do_help("resolve")
            return

        try:
            err = self.devCtrl.ResolveNode(nodeid)
            if err == 0:
                address = self.devCtrl.GetAddressAndPort(nodeid)
                address = "{}:{}".format(
                    *address) if address else "unknown"
                print("Current address: " + address)
        except exceptions.ChipStackException as ex:
            print(str(ex))
            return

    def do_discover(self, line):
        """
        discover -all

        Discover commissioners and all commissionable devices, then print
        what was found.
        """
        try:
            args = shlex.split(line)
        except ValueError:
            args = []

        if "-all" not in args:
            self.do_help("discover")
            return

        try:
            # NOTE(review): wait_for_many_discovered_devices is not defined in
            # the visible part of this class -- confirm it exists; until then
            # the generic handler below reports the failure.
            self.commissionableNodeCtrl.DiscoverCommissioners()
            self.wait_for_many_discovered_devices()
            self.commissionableNodeCtrl.PrintDiscoveredCommissioners()
            self.devCtrl.DiscoverAllCommissioning()
            self.wait_for_many_discovered_devices()
            self.devCtrl.PrintDiscoveredDevices()
        except exceptions.ChipStackException as ex:
            print('exception')
            print(str(ex))
            return
        except Exception as ex:
            # Keep the shell alive, but say what failed before showing help.
            # KeyboardInterrupt is deliberately not caught here.
            print(str(ex))
            self.do_help("discover")
            return

    def do_zcl(self, line):
        """
        zcl ?
        zcl ? <cluster>
        zcl <cluster> <command> <int> <int> <int> [arguments...]

        "zcl ?" lists the known clusters; "zcl ? <cluster>" lists that
        cluster's commands and their arguments. The last form sends a
        command: the three integers are passed on in order (presumably
        node id, endpoint and group id -- confirm against ZCLSend) and the
        remaining words are the command's arguments.
        """
        try:
            args = shlex.split(line)
            all_commands = self.devCtrl.ZCLCommandList()
            if len(args) == 1 and args[0] == '?':
                print('\n'.join(all_commands.keys()))
            elif len(args) == 2 and args[0] == '?':
                if args[1] not in all_commands:
                    raise exceptions.UnknownCluster(args[1])
                for command_name, params in all_commands.get(args[1]).items():
                    print(command_name)
                    if params:
                        print("  ", ", ".join(
                            "{}: {}".format(argName, argType)
                            for argName, argType in params.items()))
                    else:
                        print("  <no arguments>")
            elif len(args) > 4:
                if args[0] not in all_commands:
                    raise exceptions.UnknownCluster(args[0])
                command = all_commands.get(args[0]).get(args[1], None)
                # When command takes no arguments, (not command) is True,
                # so test against None rather than truthiness.
                if command is None:
                    raise exceptions.UnknownCommand(args[0], args[1])
                # NOTE(review): FormatZCLArguments is not defined in the
                # visible part of this file -- confirm where it comes from.
                err, res = self.devCtrl.ZCLSend(args[0], args[1], int(
                    args[2]), int(args[3]), int(args[4]), FormatZCLArguments(args[5:], command), blocking=True)
                if err != 0:
                    print("Failed to receive command response: {}".format(res))
                elif res is not None:
                    print("Received command status response:")
                    print(res)
                else:
                    print("Success, no status code is attached with response.")
            else:
                self.do_help("zcl")
        except exceptions.ChipStackException as ex:
            print("An exception occurred during process ZCL command:")
            print(str(ex))
        except Exception as ex:
            print("An exception occurred during processing input:")
            traceback.print_exc()
            print(str(ex))

    def do_getfabricid(self, line):
        """
        get-fabricid

        Read the raw and compressed fabric IDs from the device controller
        and print each in hexadecimal and decimal.
        """
        try:
            compressed_fabricid = self.devCtrl.GetCompressedFabricId()
            raw_fabricid = self.devCtrl.GetFabricId()
        except exceptions.ChipStackException as ex:
            print("An exception occurred during reading FabricID:")
            print(str(ex))
            return

        print("Get fabric ID complete")
        print("Raw Fabric ID: 0x{:016x}".format(raw_fabricid)
              + " (" + str(raw_fabricid) + ")")
        print("Compressed Fabric ID: 0x{:016x}".format(compressed_fabricid)
              + " (" + str(compressed_fabricid) + ")")