Smart Home/MATTER
MATTER/chip-device-ctrl(python controller) 분석
감자최고
2022. 10. 10. 00:45
./out/python_env/bin/chip-device-ctrl
def main():
    """Start the interactive CHIP device controller shell.

    Builds a ``DeviceMgrCmd`` shell, prints a banner and runs its command
    loop. A failure while constructing the shell is printed and ends the
    process with exit status 1; Ctrl-C inside the loop ends it with status 0.

    NOTE(review): ``options`` and ``adapterId`` are read here but are not
    assigned anywhere in this listing - presumably the command-line parsing
    that produces them was left out of the excerpt.
    """
    try:
        shell = DeviceMgrCmd(
            rendezvousAddr=options.rendezvousAddr,
            controllerNodeId=options.controllerNodeId,
            bluetoothAdapter=adapterId,
        )
    except Exception as startup_error:
        # Any start-up failure is reported to the user and is fatal.
        print(startup_error)
        print("Failed to bringup CHIPDeviceController CLI")
        sys.exit(1)

    print("Chip Device Controller Shell")

    try:
        shell.cmdloop()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to leave the shell, not an error.
        print("\nQuitting")

    sys.exit(0)
# Run the interactive shell only when this file is executed as a script,
# not when it is imported as a module.
# NOTE(review): in this listing the guard sits before the imports and the
# DeviceMgrCmd class that main() needs; in a runnable module it has to come
# after them.
if __name__ == "__main__":
    main()
import random
import shlex
import string
import sys
import traceback
from cmd import Cmd

from chip import ChipCommissionableNodeCtrl
from chip import ChipDeviceCtrl
from chip import exceptions
class DeviceMgrCmd(Cmd):
    """Interactive command shell that drives a CHIP (Matter) device controller.

    Every ``do_<name>`` method implements one shell command, and its
    docstring is the text printed by ``help <name>``. Commands are advertised
    with hyphens (see ``command_names``) while the handler methods carry no
    hyphen, so ``parseline`` removes hyphens from the command word before
    ``Cmd`` looks the handler up.

    NOTE(review): this class is an excerpt. ``BleManager``,
    ``FormatZCLArguments`` and ``wait_for_many_discovered_devices`` are used
    below but are neither defined nor imported in this listing.
    """

    # User-facing (hyphenated) command names; sorted in place by __init__.
    command_names = [
        "ble-scan",
        "connect",
        "resolve",
        "zcl",
        "discover",
        "get-fabricid",
    ]

    def __init__(self, rendezvousAddr=None, controllerNodeId=0, bluetoothAdapter=None):
        """Create the shell and the controller objects it talks to.

        Args:
            rendezvousAddr: Accepted for interface compatibility; it is not
                used by the code shown in this listing.
            controllerNodeId: Node id passed to ``ChipDeviceController``.
            bluetoothAdapter: Bluetooth adapter passed to
                ``ChipDeviceController``.
        """
        self.lastNetworkId = None
        # Created lazily by do_blescan. It has to exist up front, otherwise
        # the ``if not self.bleMgr`` check there raises AttributeError.
        self.bleMgr = None

        Cmd.__init__(self)
        # Allow "-" inside a command word so "ble-scan" is parsed as a single
        # command. This is set on the Cmd base class, as the original did.
        Cmd.identchars = string.ascii_letters + string.digits + "-"
        DeviceMgrCmd.command_names.sort()

        self.devCtrl = ChipDeviceCtrl.ChipDeviceController(
            controllerNodeId=controllerNodeId, bluetoothAdapter=bluetoothAdapter)
        self.commissionableNodeCtrl = (
            ChipCommissionableNodeCtrl.ChipCommissionableNodeController())

    def parseline(self, line):
        """Parse a command line, mapping hyphenated names to their handlers.

        ``Cmd`` dispatches to ``do_<cmd>``; a method name cannot contain a
        hyphen, so "ble-scan" must become "blescan" before the lookup.

        Returns:
            The ``(cmd, arg, line)`` tuple of ``Cmd.parseline`` with hyphens
            removed from ``cmd`` (and from the command word inside ``line``).
        """
        cmd, arg, line = Cmd.parseline(self, line)
        if cmd:
            cmd = cmd.replace("-", "")
            line = "{} {}".format(cmd, arg) if arg else cmd
        return cmd, arg, line

    def do_blescan(self, line):
        """
        ble-scan [arguments]

        Scan for BLE devices. The rest of the line is handed to the BLE
        manager's scan routine unchanged.
        """
        # NOTE(review): BleManager is not imported in this listing.
        if not self.bleMgr:
            self.bleMgr = BleManager(self.devCtrl)
        self.bleMgr.scan(line)

    def do_connect(self, line):
        """
        connect -ip <ip address> <setup pin code> [<nodeid>]
        connect -ble <discriminator> <setup pin code> [<nodeid>]

        Connect to a device over IP or BLE. When <nodeid> is omitted a
        random temporary node id is assigned to the device.
        """
        try:
            # NOTE(review): the listing used `args` and `nodeid` without ever
            # assigning them. Parsing follows the sibling commands
            # (shlex.split); the optional trailing <nodeid> with a random
            # fallback should be confirmed against the upstream tool.
            args = shlex.split(line)
            if len(args) < 3 or args[0] not in ("-ip", "-ble"):
                print("Usage:")
                self.do_help("connect")
                return

            if len(args) >= 4:
                nodeid = int(args[3])
            else:
                nodeid = random.randint(1, 1000000)
            print("Device is assigned with nodeid = {}".format(nodeid))

            if args[0] == "-ip":
                self.devCtrl.ConnectIP(
                    args[1].encode("utf-8"), int(args[2]), nodeid)
            else:
                self.devCtrl.ConnectBLE(int(args[1]), int(args[2]), nodeid)

            print(
                "Device temporary node id (**this does not match spec**): {}".format(nodeid))
        except exceptions.ChipStackException as ex:
            print(str(ex))
        except ValueError as ex:
            # Non-numeric pin code / discriminator / node id, or unbalanced
            # quotes: report it instead of letting it kill the shell.
            print(str(ex))
            print("Usage:")
            self.do_help("connect")

    def do_resolve(self, line):
        """
        resolve <nodeid>

        Resolve DNS-SD name corresponding with the given node ID and
        update address of the node in the device controller.
        """
        try:
            args = shlex.split(line)
            if len(args) != 1:
                self.do_help("resolve")
                return

            nodeid = int(args[0])
            err = self.devCtrl.ResolveNode(nodeid)
            if err == 0:
                address = self.devCtrl.GetAddressAndPort(nodeid)
                address = "{}:{}".format(*address) if address else "unknown"
                print("Current address: " + address)
        except exceptions.ChipStackException as ex:
            print(str(ex))
        except ValueError as ex:
            # A node id that is not an integer must not crash the shell.
            print(str(ex))
            self.do_help("resolve")

    def do_discover(self, line):
        """
        discover <options>

        Look for commissioners and commissionable devices and print what
        was found.
        """
        try:
            # NOTE(review): `args` is never assigned in this listing - the
            # argument-parsing step appears to have been elided. Until it is
            # restored the NameError raised here is swallowed by the bare
            # except below and the command only prints its usage text.
            # `wait_for_many_discovered_devices` is likewise not defined in
            # the class as shown.
            if not args.all:
                self.do_help("discover")
                return

            self.commissionableNodeCtrl.DiscoverCommissioners()
            self.wait_for_many_discovered_devices()
            self.commissionableNodeCtrl.PrintDiscoveredCommissioners()
            self.devCtrl.DiscoverAllCommissioning()
            self.wait_for_many_discovered_devices()

            self.devCtrl.PrintDiscoveredDevices()
        except exceptions.ChipStackException as ex:
            print('exception')
            print(str(ex))
            return
        except:  # noqa: E722
            # Kept as a bare except so behaviour is unchanged (it also traps
            # SystemExit and KeyboardInterrupt). Narrow it once the elided
            # parsing code and its failure modes are known.
            self.do_help("discover")
            return

    def do_zcl(self, line):
        """
        To send a ZCL command to a device:
        zcl <cluster> <command> <nodeid> <endpoint> <groupid> [arguments...]
        To list the known clusters:
        zcl ?
        To list the commands of one cluster:
        zcl ? <cluster>
        """
        try:
            args = shlex.split(line)
            all_commands = self.devCtrl.ZCLCommandList()

            if len(args) == 1 and args[0] == '?':
                print('\n'.join(all_commands.keys()))
            elif len(args) == 2 and args[0] == '?':
                cluster = args[1]
                if cluster not in all_commands:
                    raise exceptions.UnknownCluster(cluster)
                for command_name, params in all_commands[cluster].items():
                    print(command_name)
                    if params:
                        # Uses its own local name; the original rebound
                        # `args` here, shadowing the parsed command line.
                        param_text = ", ".join(
                            "{}: {}".format(argName, argType)
                            for argName, argType in params.items())
                        print(" ", param_text)
                    else:
                        print(" <no arguments>")
            elif len(args) > 4:
                cluster, command_name = args[0], args[1]
                if cluster not in all_commands:
                    raise exceptions.UnknownCluster(cluster)
                command = all_commands[cluster].get(command_name)
                # A command without arguments maps to an empty (falsy) value,
                # so "unknown" must be tested with `is None`, not truthiness.
                if command is None:
                    raise exceptions.UnknownCommand(cluster, command_name)
                err, res = self.devCtrl.ZCLSend(
                    cluster, command_name,
                    int(args[2]), int(args[3]), int(args[4]),
                    FormatZCLArguments(args[5:], command), blocking=True)
                if err != 0:
                    print("Failed to receive command response: {}".format(res))
                elif res is not None:
                    print("Received command status response:")
                    print(res)
                else:
                    print("Success, no status code is attached with response.")
            else:
                self.do_help("zcl")
        except exceptions.ChipStackException as ex:
            print("An exception occurred during process ZCL command:")
            print(str(ex))
        except Exception as ex:
            # Anything else is most likely malformed user input; show the
            # traceback but keep the shell alive.
            print("An exception occurred during processing input:")
            traceback.print_exc()
            print(str(ex))

    def do_getfabricid(self, line):
        """
        get-fabricid

        Read the raw and the compressed fabric id from the controller and
        print both in hexadecimal and decimal.
        """
        try:
            compressed_fabricid = self.devCtrl.GetCompressedFabricId()
            raw_fabricid = self.devCtrl.GetFabricId()
        except exceptions.ChipStackException as ex:
            print("An exception occurred during reading FabricID:")
            print(str(ex))
            return

        print("Get fabric ID complete")
        print("Raw Fabric ID: 0x{:016x}".format(raw_fabricid)
              + " (" + str(raw_fabricid) + ")")
        print("Compressed Fabric ID: 0x{:016x}".format(compressed_fabricid)
              + " (" + str(compressed_fabricid) + ")")