Skip to content

Commit 080a551

Browse files
committed
revised IPv6-Opts schema based on changes in Schema and fields
1 parent 0a22081 commit 080a551

File tree

2 files changed

+50
-55
lines changed

2 files changed

+50
-55
lines changed

pcapkit/foundation/registry/protocols.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from pcapkit.protocols.schema.internet.hip import Parameter as Schema_HIP_Parameter
2828
from pcapkit.protocols.schema.internet.hopopt import Option as Schema_HOPOPT_Option
2929
from pcapkit.protocols.schema.internet.ipv4 import Option as Schema_IPv4_Option
30-
from pcapkit.protocols.schema.internet.ipv6_opts import IPv6_Opts as Schema_IPv6_Opts
30+
from pcapkit.protocols.schema.internet.ipv6_opts import Option as Schema_IPv6_Opts_Option
3131
from pcapkit.protocols.schema.internet.ipv6_route import MAP_IPV6_ROUTE_DATA
3232
from pcapkit.protocols.schema.internet.mh import MAP_MH_DATA
3333
from pcapkit.protocols.schema.internet.mh import CGAParameter as Schema_MH_CGAParameter
@@ -94,7 +94,6 @@
9494
from pcapkit.protocols.misc.pcapng import RecordParser as PCAPNG_RecordParser
9595
from pcapkit.protocols.misc.pcapng import SecretsConstructor as PCAPNG_SecretsConstructor
9696
from pcapkit.protocols.misc.pcapng import SecretsParser as PCAPNG_SecretsParser
97-
from pcapkit.protocols.schema.internet.ipv6_opts import Option as Schema_IPv6_Opts_Option
9897
from pcapkit.protocols.schema.internet.ipv6_route import \
9998
RoutingType as Schema_IPv6_Route_RoutingType
10099
from pcapkit.protocols.schema.internet.mh import CGAExtension as Schema_MH_CGAExtension
@@ -408,7 +407,7 @@ def register_ipv6_opts_option(code: 'IPv6_Option', meth: 'str | tuple[IPv6_Opts_
408407

409408
IPv6_Opts.register_option(code, meth)
410409
if schema is not None:
411-
cast('OptionField[Schema_IPv6_Opts_Option]', Schema_IPv6_Opts.options).registry[code] = schema
410+
Schema_IPv6_Opts_Option.register(code, schema)
412411
logger.info('registered IPv6-Opts option parser: %s', code.name)
413412

414413

pcapkit/protocols/schema/internet/ipv6_opts.py

Lines changed: 48 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from pcapkit.corekit.fields.numbers import (EnumField, NumberField, UInt8Field, UInt16Field,
2121
UInt32Field)
2222
from pcapkit.corekit.fields.strings import BitField, BytesField, PaddingField
23-
from pcapkit.protocols.schema.schema import Schema, schema_final
23+
from pcapkit.protocols.schema.schema import EnumSchema, Schema, schema_final
2424
from pcapkit.utilities.exceptions import FieldValueError
2525
from pcapkit.utilities.logging import SPHINX_TYPE_CHECKING
2626

@@ -38,7 +38,7 @@
3838

3939
if TYPE_CHECKING:
4040
from ipaddress import IPv4Address, IPv6Address
41-
from typing import Any, Optional
41+
from typing import Any, DefaultDict, Optional, Type
4242

4343
from pcapkit.corekit.fields.field import _Field as Field
4444
from pcapkit.protocols.protocol import Protocol
@@ -149,19 +149,18 @@ def smf_dpd_data_selector(pkt: 'dict[str, Any]') -> 'Field':
149149
150150
Returns:
151151
* If ``mode`` is ``0``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField`
152-
wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.SMFIdentificationBasedDPDOption`
152+
wrapped :class:`~pcapkit.protocols.schema.internet.ipv6_opts.SMFIdentificationBasedDPDOption`
153153
instance.
154154
* If ``mode`` is ``1``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField`
155-
wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.SMFHashBasedDPDOption`
155+
wrapped :class:`~pcapkit.protocols.schema.internet.ipv6_opts.SMFHashBasedDPDOption`
156156
instance.
157157
158158
"""
159159
mode = Enum_SMFDPDMode.get(pkt['test']['mode'])
160-
if mode == Enum_SMFDPDMode.I_DPD:
161-
return SchemaField(length=pkt['test']['len'], schema=SMFIdentificationBasedDPDOption)
162-
if mode == Enum_SMFDPDMode.H_DPD:
163-
return SchemaField(length=pkt['test']['len'], schema=SMFHashBasedDPDOption)
164-
raise FieldValueError(f'IPv6-Opts: invalid SMF DPD mode: {mode}')
160+
schema = SMFDPDOption.registry[mode]
161+
if schema is None:
162+
raise FieldValueError(f'IPv6-Opts: invalid SMF DPD mode: {mode}')
163+
return SchemaField(length=pkt['test']['len'], schema=schema)
165164

166165

167166
def smf_i_dpd_tid_selector(pkt: 'dict[str, Any]') -> 'Field':
@@ -209,26 +208,27 @@ def quick_start_data_selector(pkt: 'dict[str, Any]') -> 'Field':
209208
210209
Returns:
211210
* If ``func`` is ``0``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField`
212-
wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.QuickStartRequestOption`
211+
wrapped :class:`~pcapkit.protocols.schema.internet.ipv6_opts.QuickStartRequestOption`
213212
instance.
214213
* If ``func`` is ``8``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField`
215-
wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.QuickStartReportOption`
214+
wrapped :class:`~pcapkit.protocols.schema.internet.ipv6_opts.QuickStartReportOption`
216215
instance.
217216
218217
"""
219218
func = Enum_QSFunction.get(pkt['flags']['func'])
220219
pkt['flags']['func'] = func
221220

222-
if func == Enum_QSFunction.Quick_Start_Request:
223-
return SchemaField(length=5, schema=QuickStartRequestOption)
224-
if func == Enum_QSFunction.Report_of_Approved_Rate:
225-
return SchemaField(length=5, schema=QuickStartReportOption)
226-
raise FieldValueError(f'IPv6-Opts: invalid QS function: {func}')
221+
schema = QuickStartOption.registry[func]
222+
if schema is None:
223+
raise FieldValueError(f'IPv6-Opts: invalid QS function: {func}')
224+
return SchemaField(length=5, schema=schema)
227225

228226

229-
class Option(Schema):
227+
class Option(EnumSchema[Enum_Option]):
230228
"""Header schema for IPv6-Opts options."""
231229

230+
__default__ = lambda: UnassignedOption
231+
232232
#: Option type.
233233
type: 'Enum_Option' = EnumField(length=1, namespace=Enum_Option)
234234
#: Option length (conditional in case of ``Pad1`` option).
@@ -265,7 +265,8 @@ def __init__(self, type: 'Enum_Option', len: 'int', data: 'bytes') -> 'None': ..
265265

266266

267267
@schema_final
268-
class PadOption(Option):
268+
class PadOption(Option, code=[Enum_Option.Pad1,
269+
Enum_Option.PadN]):
269270
"""Header schema for IPv6-Opts padding options."""
270271

271272
#: Padding.
@@ -276,7 +277,7 @@ def __init__(self, type: 'Enum_Option', len: 'int') -> 'None': ...
276277

277278

278279
@schema_final
279-
class TunnelEncapsulationLimitOption(Option):
280+
class TunnelEncapsulationLimitOption(Option, code=Enum_Option.Tunnel_Encapsulation_Limit):
280281
"""Header schema for IPv6-Opts tunnel encapsulation limit options."""
281282

282283
#: Tunnel encapsulation limit.
@@ -287,7 +288,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', limit: 'int') -> 'None': ...
287288

288289

289290
@schema_final
290-
class RouterAlertOption(Option):
291+
class RouterAlertOption(Option, code=Enum_Option.Router_Alert):
291292
"""Header schema for IPv6-Opts router alert options."""
292293

293294
#: Router alert.
@@ -298,7 +299,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', alert: 'Enum_RouterAlert') -
298299

299300

300301
@schema_final
301-
class CALIPSOOption(Option):
302+
class CALIPSOOption(Option, code=Enum_Option.CALIPSO):
302303
"""Header schema for IPv6-Opts common architecture label IPv6 security options."""
303304

304305
#: CALIPSO domain of interpretation.
@@ -350,16 +351,22 @@ def post_process(self, packet: 'dict[str, Any]') -> 'SMFDPDOption':
350351
return ret
351352

352353

353-
class SMFDPDOption(Option):
354+
# register ``_SMFDPDOption`` as ``SMF_DPD`` option
355+
Option.register(Enum_Option.SMF_DPD, _SMFDPDOption)
356+
357+
358+
class SMFDPDOption(Option, EnumSchema[Enum_SMFDPDMode]):
354359
"""Header schema for IPv6-Opts simplified multicast forwarding duplicate packet
355360
detection (``SMF_DPD``) options."""
356361

362+
__enum__: 'DefaultDict[Enum_SMFDPDMode, Type[SMFDPDOption]]' = collections.defaultdict(lambda: None) # type: ignore[arg-type,return-value]
363+
357364
if TYPE_CHECKING:
358365
mode: 'Enum_SMFDPDMode'
359366

360367

361368
@schema_final
362-
class SMFIdentificationBasedDPDOption(SMFDPDOption):
369+
class SMFIdentificationBasedDPDOption(SMFDPDOption, code=Enum_SMFDPDMode.I_DPD):
363370
"""Header schema for IPv6-Opts SMF identification-based DPD options."""
364371

365372
test: 'SMFDPDTestFlag' = ForwardMatchField(BitField(length=1, namespace={
@@ -401,7 +408,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', info: 'TaggerIDInfo',
401408

402409

403410
@schema_final
404-
class SMFHashBasedDPDOption(SMFDPDOption):
411+
class SMFHashBasedDPDOption(SMFDPDOption, code=Enum_SMFDPDMode.H_DPD):
405412
"""Header schema for IPv6-Opts SMF hash-based DPD options."""
406413

407414
#: Hash assist value (HAV).
@@ -426,7 +433,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', hav: 'bytes') -> 'None': ...
426433

427434

428435
@schema_final
429-
class PDMOption(Option):
436+
class PDMOption(Option, code=Enum_Option.PDM):
430437
"""Header schema for IPv6-Opts performance and diagnostic metrics (PDM) options."""
431438

432439
#: Scale delta time last received (DTLR).
@@ -475,7 +482,11 @@ def post_process(self, packet: 'dict[str, Any]') -> 'QuickStartOption':
475482
return ret
476483

477484

478-
class QuickStartOption(Option):
485+
# register ``_QuickStartOption`` as ``Quick_Start`` option
486+
Option.register(Enum_Option.Quick_Start, _QuickStartOption)
487+
488+
489+
class QuickStartOption(Option, EnumSchema[Enum_QSFunction]):
479490
"""Header schema for IPv6-Opts quick start options."""
480491

481492
#: Flags.
@@ -489,7 +500,7 @@ class QuickStartOption(Option):
489500

490501

491502
@schema_final
492-
class QuickStartRequestOption(QuickStartOption):
503+
class QuickStartRequestOption(QuickStartOption, code=Enum_QSFunction.Quick_Start_Request):
493504
"""Header schema for IPv6-Opts quick start request options."""
494505

495506
#: QS time-to-live (TTL).
@@ -505,7 +516,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', flags: 'QuickStartFlags',
505516

506517

507518
@schema_final
508-
class QuickStartReportOption(QuickStartOption):
519+
class QuickStartReportOption(QuickStartOption, code=Enum_QSFunction.Report_of_Approved_Rate):
509520
"""Header schema for IPv6-Opts quick start report of approved rate options."""
510521

511522
#: Reserved.
@@ -521,7 +532,8 @@ def __init__(self, type: 'Enum_Option', len: 'int', flags: 'QuickStartFlags',
521532

522533

523534
@schema_final
524-
class RPLOption(Option):
535+
class RPLOption(Option, code=[Enum_Option.RPL_Option_0x23,
536+
Enum_Option.RPL_Option_0x63]):
525537
"""Header schema for IPv6-Opts routing protocol for low-power and lossy networks (RPL) options."""
526538

527539
#: Flags.
@@ -541,7 +553,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', flags: 'RPLFlags', id: 'int'
541553

542554

543555
@schema_final
544-
class MPLOption(Option):
556+
class MPLOption(Option, code=Enum_Option.MPL_Option):
545557
"""Header schema for IPv6-Opts multicast protocol for low-power and lossy networks (MPL) options."""
546558

547559
#: Flags.
@@ -582,7 +594,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', flags: 'MPLFlags', seq: 'int
582594

583595

584596
@schema_final
585-
class ILNPOption(Option):
597+
class ILNPOption(Option, code=Enum_Option.ILNP_Nonce):
586598
"""Header schema for IPv6-Opts identifier-locator network protocol (ILNP) options."""
587599

588600
#: Nonce value.
@@ -593,7 +605,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', nonce: 'int') -> 'None': ...
593605

594606

595607
@schema_final
596-
class LineIdentificationOption(Option):
608+
class LineIdentificationOption(Option, code=Enum_Option.Line_Identification_Option):
597609
"""Header schema for IPv6-Opts line-identification options."""
598610

599611
#: Line ID length.
@@ -606,7 +618,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', id_len: 'int', id: 'bytes')
606618

607619

608620
@schema_final
609-
class JumboPayloadOption(Option):
621+
class JumboPayloadOption(Option, code=Enum_Option.Jumbo_Payload):
610622
"""Header schema for IPv6-Opts jumbo payload options."""
611623

612624
#: Jumbo payload length.
@@ -617,7 +629,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', jumbo_len: 'int') -> 'None':
617629

618630

619631
@schema_final
620-
class HomeAddressOption(Option):
632+
class HomeAddressOption(Option, code=Enum_Option.Home_Address):
621633
"""Header schema for IPv6-Opts home address options."""
622634

623635
#: Home address.
@@ -628,7 +640,7 @@ def __init__(self, type: 'Enum_Option', len: 'int', addr: 'IPv6Address | int | s
628640

629641

630642
@schema_final
631-
class IPDFFOption(Option):
643+
class IPDFFOption(Option, code=Enum_Option.IP_DFF):
632644
"""Header schema for IPv6-Opts depth-first forwarding (``IP_DFF``) options."""
633645

634646
#: Flags.
@@ -657,23 +669,7 @@ class IPv6_Opts(Schema):
657669
length=lambda pkt: pkt['len'] * 8 + 6,
658670
base_schema=Option,
659671
type_name='type',
660-
registry=collections.defaultdict(lambda: UnassignedOption, {
661-
Enum_Option.Pad1: PadOption,
662-
Enum_Option.PadN: PadOption,
663-
Enum_Option.Tunnel_Encapsulation_Limit: TunnelEncapsulationLimitOption,
664-
Enum_Option.Router_Alert: RouterAlertOption,
665-
Enum_Option.CALIPSO: CALIPSOOption,
666-
Enum_Option.SMF_DPD: _SMFDPDOption,
667-
Enum_Option.PDM: PDMOption,
668-
Enum_Option.Quick_Start: _QuickStartOption,
669-
Enum_Option.RPL_Option_0x63: RPLOption,
670-
Enum_Option.MPL_Option: MPLOption,
671-
Enum_Option.ILNP_Nonce: ILNPOption,
672-
Enum_Option.Line_Identification_Option: LineIdentificationOption,
673-
Enum_Option.Jumbo_Payload: JumboPayloadOption,
674-
Enum_Option.Home_Address: HomeAddressOption,
675-
Enum_Option.IP_DFF: IPDFFOption,
676-
})
672+
registry=Option.registry,
677673
)
678674
#: Payload.
679675
payload: 'bytes' = PayloadField()

0 commit comments

Comments
 (0)