-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasync_client.py
More file actions
79 lines (65 loc) · 3.16 KB
/
async_client.py
File metadata and controls
79 lines (65 loc) · 3.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
from .s7comm import AsyncS7Comm, enums
from .s7comm.packets.variable_address import VariableAddress
from .s7comm.szl import (
CPUStateDataTree,
ModuleIdentificationDataTree,
ModuleIdentificationIndex,
SZLResponseData,
)
class AsyncClient:
def __init__(
self,
tpdu_size: int = 1024, # cotp packet length
pdu_length: int = 480, # s7comm packet length
source_tsap: int = 0x0100,
dest_tsap: int = 0x0101,
) -> None:
self.s7comm = AsyncS7Comm(
tpdu_size=tpdu_size,
pdu_length=pdu_length,
source_tsap=source_tsap,
dest_tsap=dest_tsap,
)
self.is_connected = False
async def connect(self, address: str, rack: int, slot: int, port: int = 102) -> None:
await self.s7comm.connect(address=address, rack=rack, slot=slot, port=port)
self.is_connected = True
async def disconnect(self) -> None:
await self.close()
self.is_connected = False
def get_pdu_length(self) -> int:
return self.s7comm.pdu_length
async def close(self) -> None:
await self.s7comm.close()
async def get_cpu_state(self) -> enums.CPUStatus:
response = await self.s7comm.read_szl(szl_id=0x0424, szl_index=0x0000)
response_data = SZLResponseData.parse(response.data.data)
cpu_state = CPUStateDataTree.parse(response_data.szl_data_tree_list[0])
return cpu_state.requested_mode
async def read_area(self, address: str) -> bytes:
return await self.s7comm.read_area(VariableAddress.from_string(address))
async def write_area(self, address: str, data: bytes) -> None:
await self.s7comm.write_area(address=VariableAddress.from_string(address), data=data)
async def get_order_code(self) -> str | None:
response = await self.s7comm.read_szl(szl_id=0x0011, szl_index=0x0000)
szl_data = SZLResponseData.parse(response.data.data)
for date_tree in szl_data.szl_data_tree_list:
module_identification = ModuleIdentificationDataTree.parse(date_tree)
if module_identification.index == ModuleIdentificationIndex.MODULE_IDENTIFICATION:
return module_identification.order_number
return None
async def read_szl(self, szl_id: int, szl_index: int = 0x0000) -> SZLResponseData:
response_data = await self.s7comm.read_szl(szl_id=szl_id, szl_index=szl_index)
return SZLResponseData.parse(response_data.data.data)
async def read_szl_list(self) -> SZLResponseData:
response_data = await self.s7comm.read_szl(szl_id=0x0000, szl_index=0x0000)
return SZLResponseData.parse(response_data.data.data)
async def read_multi_vars(self, items: list[str]) -> list[bytes]:
vars_ = [VariableAddress.from_string(item) for item in items]
response = await self.s7comm.read_multi_vars(items=vars_)
return response.values()
async def write_multi_vars(self, items: list[tuple[str, bytes]]) -> None:
vars_ = [(VariableAddress.from_string(address), data) for address, data in items]
response = await self.s7comm.write_multi_vars(items=vars_)
response.check_result()
return None