rtems-tools/tester/rt/tftpy/TftpPacketFactory.py
Chris Johns f24e11688e tester: Update the Python TFTP server to fix Python3 issues.
Updated to af2f2fe89a3bf45748b78703820efb0986a8207a.
Repo is https://github.com/msoulier/tftpy.git
2018-11-08 18:13:55 +11:00

48 lines
1.5 KiB
Python

# vim: ts=4 sw=4 et ai:
# -*- coding: utf8 -*-
"""This module implements the TftpPacketFactory class, which can take a binary
buffer, and return the appropriate TftpPacket object to represent it, via the
parse() method."""
from .TftpShared import *
from .TftpPacketTypes import *
import logging
log = logging.getLogger('tftpy.TftpPacketFactory')
class TftpPacketFactory(object):
"""This class generates TftpPacket objects. It is responsible for parsing
raw buffers off of the wire and returning objects representing them, via
the parse() method."""
def __init__(self):
self.classes = {
1: TftpPacketRRQ,
2: TftpPacketWRQ,
3: TftpPacketDAT,
4: TftpPacketACK,
5: TftpPacketERR,
6: TftpPacketOACK
}
def parse(self, buffer):
"""This method is used to parse an existing datagram into its
corresponding TftpPacket object. The buffer is the raw bytes off of
the network."""
log.debug("parsing a %d byte packet" % len(buffer))
(opcode,) = struct.unpack(str("!H"), buffer[:2])
log.debug("opcode is %d" % opcode)
packet = self.__create(opcode)
packet.buffer = buffer
return packet.decode()
def __create(self, opcode):
"""This method returns the appropriate class object corresponding to
the passed opcode."""
tftpassert(opcode in self.classes,
"Unsupported opcode: %d" % opcode)
packet = self.classes[opcode]()
return packet