diff --git a/third_party/fatfsgen/README.md b/third_party/fatfsgen/README.md new file mode 100644 index 0000000000000000000000000000000000000000..70ed6b1db6ab673f1798fcc668a40c7cc4a0e95f --- /dev/null +++ b/third_party/fatfsgen/README.md @@ -0,0 +1,3 @@ +Vendored from esp-idf/components/fastfs at 213504238f77e01073f668e5e8f87e3b3cc02a8f . + +Can be removed once we update to ESP-IDF 5.x. diff --git a/third_party/fatfsgen/fatfs_utils/__init__.py b/third_party/fatfsgen/fatfs_utils/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/third_party/fatfsgen/fatfs_utils/boot_sector.py b/third_party/fatfsgen/fatfs_utils/boot_sector.py new file mode 100644 index 0000000000000000000000000000000000000000..615dd065112d1b9ceaad1ec14df68e45ea7c9d6d --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/boot_sector.py @@ -0,0 +1,168 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 +from inspect import getmembers, isroutine +from typing import Optional + +from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct, core + +from .exceptions import InconsistentFATAttributes, NotInitialized +from .fatfs_state import BootSectorState +from .utils import (ALLOWED_SECTOR_SIZES, ALLOWED_SECTORS_PER_CLUSTER, EMPTY_BYTE, FAT32, FULL_BYTE, + SHORT_NAMES_ENCODING, FATDefaults, generate_4bytes_random, pad_string) + + +class BootSector: + """ + This class describes the first sector of the volume in the Reserved Region. + It contains data from BPB (BIOS Parameter Block) and BS (Boot sector). The fields of the BPB and BS are mixed in + the header of the physical boot sector. Fields with prefix BPB belongs to BPB block and with prefix BS + belongs to the actual boot sector. + + Please beware, that the name of class BootSector refer to data both from the boot sector and BPB. + ESP32 ignores fields with prefix "BS_"! Fields with prefix BPB_ are essential to read the filesystem. + """ + MAX_VOL_LAB_SIZE = 11 + MAX_OEM_NAME_SIZE = 8 + MAX_FS_TYPE_SIZE = 8 + + # the FAT specification defines 512 bytes for the boot sector header + BOOT_HEADER_SIZE = 512 + + BOOT_SECTOR_HEADER = Struct( + # this value reflects BS_jmpBoot used for ESP32 boot sector (any other accepted) + 'BS_jmpBoot' / Const(b'\xeb\xfe\x90'), + 'BS_OEMName' / PaddedString(MAX_OEM_NAME_SIZE, SHORT_NAMES_ENCODING), + 'BPB_BytsPerSec' / Int16ul, + 'BPB_SecPerClus' / Int8ul, + 'BPB_RsvdSecCnt' / Int16ul, + 'BPB_NumFATs' / Int8ul, + 'BPB_RootEntCnt' / Int16ul, + 'BPB_TotSec16' / Int16ul, # zero if the FAT type is 32, otherwise number of sectors + 'BPB_Media' / Int8ul, + 'BPB_FATSz16' / Int16ul, # for FAT32 always zero, for FAT12/FAT16 number of sectors per FAT + 'BPB_SecPerTrk' / Int16ul, + 'BPB_NumHeads' / Int16ul, + 'BPB_HiddSec' / Int32ul, + 'BPB_TotSec32' / Int32ul, # zero if the FAT type is 12/16, otherwise number of sectors + 'BS_DrvNum' / Const(b'\x80'), + 'BS_Reserved1' / Const(EMPTY_BYTE), + 'BS_BootSig' / Const(b'\x29'), + 'BS_VolID' / Int32ul, + 'BS_VolLab' / PaddedString(MAX_VOL_LAB_SIZE, SHORT_NAMES_ENCODING), + 'BS_FilSysType' / PaddedString(MAX_FS_TYPE_SIZE, SHORT_NAMES_ENCODING), + 'BS_EMPTY' / Const(448 * EMPTY_BYTE), + 'Signature_word' / Const(FATDefaults.SIGNATURE_WORD) + ) + assert BOOT_SECTOR_HEADER.sizeof() == BOOT_HEADER_SIZE + + def __init__(self, boot_sector_state: Optional[BootSectorState] = None) -> None: + self._parsed_header: dict = {} + self.boot_sector_state: BootSectorState = boot_sector_state + + def generate_boot_sector(self) -> None: + boot_sector_state: BootSectorState = self.boot_sector_state + if boot_sector_state is None: + raise NotInitialized('The BootSectorState instance is not initialized!') + volume_uuid = generate_4bytes_random() + pad_header: bytes = (boot_sector_state.sector_size - BootSector.BOOT_HEADER_SIZE) * EMPTY_BYTE + data_content: bytes = boot_sector_state.data_sectors * boot_sector_state.sector_size * FULL_BYTE + root_dir_content: bytes = boot_sector_state.root_dir_sectors_cnt * boot_sector_state.sector_size * EMPTY_BYTE + fat_tables_content: bytes = (boot_sector_state.sectors_per_fat_cnt + * boot_sector_state.fat_tables_cnt + * boot_sector_state.sector_size + * EMPTY_BYTE) + self.boot_sector_state.binary_image = ( + BootSector.BOOT_SECTOR_HEADER.build( + dict(BS_OEMName=pad_string(boot_sector_state.oem_name, size=BootSector.MAX_OEM_NAME_SIZE), + BPB_BytsPerSec=boot_sector_state.sector_size, + BPB_SecPerClus=boot_sector_state.sectors_per_cluster, + BPB_RsvdSecCnt=boot_sector_state.reserved_sectors_cnt, + BPB_NumFATs=boot_sector_state.fat_tables_cnt, + BPB_RootEntCnt=boot_sector_state.entries_root_count, + # if fat type is 12 or 16 BPB_TotSec16 is filled and BPB_TotSec32 is 0x00 and vice versa + BPB_TotSec16=0x00 if boot_sector_state.fatfs_type == FAT32 else boot_sector_state.sectors_count, + BPB_Media=boot_sector_state.media_type, + BPB_FATSz16=boot_sector_state.sectors_per_fat_cnt, + BPB_SecPerTrk=boot_sector_state.sec_per_track, + BPB_NumHeads=boot_sector_state.num_heads, + BPB_HiddSec=boot_sector_state.hidden_sectors, + BPB_TotSec32=boot_sector_state.sectors_count if boot_sector_state.fatfs_type == FAT32 else 0x00, + BS_VolID=volume_uuid, + BS_VolLab=pad_string(boot_sector_state.volume_label, + size=BootSector.MAX_VOL_LAB_SIZE), + BS_FilSysType=pad_string(boot_sector_state.file_sys_type, + size=BootSector.MAX_FS_TYPE_SIZE) + ) + ) + pad_header + fat_tables_content + root_dir_content + data_content + ) + + def parse_boot_sector(self, binary_data: bytes) -> None: + """ + Checks the validity of the boot sector and derives the metadata from boot sector to the structured shape. + """ + try: + self._parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(binary_data) + except core.StreamError: + raise NotInitialized('The boot sector header is not parsed successfully!') + + if self._parsed_header['BPB_TotSec16'] != 0x00: + sectors_count_: int = self._parsed_header['BPB_TotSec16'] + elif self._parsed_header['BPB_TotSec32'] != 0x00: + # uncomment for FAT32 implementation + # sectors_count_ = self._parsed_header['BPB_TotSec32'] + # possible_fat_types = [FAT32] + assert self._parsed_header['BPB_TotSec16'] == 0 + raise NotImplementedError('FAT32 not implemented!') + else: + raise InconsistentFATAttributes('The number of FS sectors cannot be zero!') + + if self._parsed_header['BPB_BytsPerSec'] not in ALLOWED_SECTOR_SIZES: + raise InconsistentFATAttributes(f'The number of bytes ' + f"per sector is {self._parsed_header['BPB_BytsPerSec']}! " + f'The accepted values are {ALLOWED_SECTOR_SIZES}') + if self._parsed_header['BPB_SecPerClus'] not in ALLOWED_SECTORS_PER_CLUSTER: + raise InconsistentFATAttributes(f'The number of sectors per cluster ' + f"is {self._parsed_header['BPB_SecPerClus']}" + f'The accepted values are {ALLOWED_SECTORS_PER_CLUSTER}') + + total_root_bytes: int = self._parsed_header['BPB_RootEntCnt'] * FATDefaults.ENTRY_SIZE + root_dir_sectors_cnt_: int = total_root_bytes // self._parsed_header['BPB_BytsPerSec'] + self.boot_sector_state = BootSectorState(oem_name=self._parsed_header['BS_OEMName'], + sector_size=self._parsed_header['BPB_BytsPerSec'], + sectors_per_cluster=self._parsed_header['BPB_SecPerClus'], + reserved_sectors_cnt=self._parsed_header['BPB_RsvdSecCnt'], + fat_tables_cnt=self._parsed_header['BPB_NumFATs'], + root_dir_sectors_cnt=root_dir_sectors_cnt_, + sectors_count=sectors_count_, + media_type=self._parsed_header['BPB_Media'], + sec_per_track=self._parsed_header['BPB_SecPerTrk'], + num_heads=self._parsed_header['BPB_NumHeads'], + hidden_sectors=self._parsed_header['BPB_HiddSec'], + volume_label=self._parsed_header['BS_VolLab'], + file_sys_type=self._parsed_header['BS_FilSysType'], + volume_uuid=self._parsed_header['BS_VolID']) + self.boot_sector_state.binary_image = binary_data + assert self.boot_sector_state.file_sys_type in (f'FAT{self.boot_sector_state.fatfs_type} ', 'FAT ') + + def __str__(self) -> str: + """ + FATFS properties parser (internal helper tool for fatfsgen.py/fatfsparse.py) + Provides all the properties of given FATFS instance by parsing its boot sector (returns formatted string) + """ + + if self._parsed_header == {}: + return 'Boot sector is not initialized!' + res: str = 'FATFS properties:\n' + for member in getmembers(self.boot_sector_state, lambda a: not (isroutine(a))): + prop_ = getattr(self.boot_sector_state, member[0]) + if isinstance(prop_, int) or isinstance(prop_, str) and not member[0].startswith('_'): + res += f'{member[0]}: {prop_}\n' + return res + + @property + def binary_image(self) -> bytes: + # when BootSector is not instantiated, self.boot_sector_state might be None + if self.boot_sector_state is None or len(self.boot_sector_state.binary_image) == 0: + raise NotInitialized('Boot sector is not initialized!') + bin_image_: bytes = self.boot_sector_state.binary_image + return bin_image_ diff --git a/third_party/fatfsgen/fatfs_utils/cluster.py b/third_party/fatfsgen/fatfs_utils/cluster.py new file mode 100644 index 0000000000000000000000000000000000000000..ced9b1f5c3b1b5c9a2d70358d7895e700eab73cc --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/cluster.py @@ -0,0 +1,213 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from typing import Dict, Optional + +from construct import Int16ul + +from .fatfs_state import BootSectorState +from .utils import (EMPTY_BYTE, FAT12, FAT16, build_byte, merge_by_half_byte_12_bit_little_endian, + split_by_half_byte_12_bit_little_endian) + + +def get_dir_size(is_root: bool, boot_sector: BootSectorState) -> int: + dir_size_: int = boot_sector.root_dir_sectors_cnt * boot_sector.sector_size if is_root else boot_sector.sector_size + return dir_size_ + + +class Cluster: + """ + class Cluster handles values in FAT table and allocates sectors in data region. + """ + RESERVED_BLOCK_ID: int = 0 + ROOT_BLOCK_ID: int = 1 + ALLOCATED_BLOCK_FAT12: int = 0xFFF + ALLOCATED_BLOCK_FAT16: int = 0xFFFF + ALLOCATED_BLOCK_SWITCH = {FAT12: ALLOCATED_BLOCK_FAT12, FAT16: ALLOCATED_BLOCK_FAT16} + INITIAL_BLOCK_SWITCH: Dict[int, int] = {FAT12: 0xFF8, FAT16: 0xFFF8} + + def __init__(self, + cluster_id: int, + boot_sector_state: BootSectorState, + init_: bool) -> None: + """ + Initially, if init_ is False, the cluster is virtual and is not allocated (doesn't do changes in the FAT). + :param cluster_id: the cluster ID - a key value linking the file's cluster, + the corresponding physical cluster (data region) and the FAT table cluster. + :param boot_sector_state: auxiliary structure holding the file-system's metadata + :param init_: True for allocation the cluster on instantiation, otherwise False. + :returns: None + """ + self.id: int = cluster_id + self.boot_sector_state: BootSectorState = boot_sector_state + + self._next_cluster = None # type: Optional[Cluster] + # First cluster in FAT is reserved, low 8 bits contains BPB_Media and the rest is filled with 1 + # e.g. the esp32 media type is 0xF8 thus the FAT[0] = 0xFF8 for FAT12, 0xFFF8 for FAT16 + if self.id == Cluster.RESERVED_BLOCK_ID and init_: + self.set_in_fat(self.INITIAL_BLOCK_SWITCH[self.boot_sector_state.fatfs_type]) + return + self.cluster_data_address: int = self._compute_cluster_data_address() + assert self.cluster_data_address + + @property + def next_cluster(self): # type: () -> Optional[Cluster] + return self._next_cluster + + @next_cluster.setter + def next_cluster(self, value): # type: (Optional[Cluster]) -> None + self._next_cluster = value + + def _cluster_id_to_fat_position_in_bits(self, _id: int) -> int: + """ + This private method calculates the position of the memory block (cluster) in the FAT table. + + :param _id: the cluster ID - a key value linking the file's cluster, + the corresponding physical cluster (data region) and the FAT table cluster. + :returns: bit offset of the cluster in FAT + e.g.: + 00003000: 42 65 00 2E 00 74 00 78 00 74 00 0F 00 43 FF FF + + For FAT12 the third cluster has value = 0x02E and ID = 2. + Its bit-address is 24 (24 bits preceding, 0-indexed), because 0x2E starts at the bit-offset 24. + """ + logical_position_: int = self.boot_sector_state.fatfs_type * _id + return logical_position_ + + @staticmethod + def compute_cluster_data_address(boot_sector_state: BootSectorState, id_: int) -> int: + """ + This method translates the id of the cluster to the address in data region. + + :param boot_sector_state: the class with FS shared data + :param id_: id of the cluster + :returns: integer denoting the address of the cluster in the data region + """ + data_address_: int = boot_sector_state.root_directory_start + if not id_ == Cluster.ROOT_BLOCK_ID: + # the first data cluster id is 2 (we have to subtract reserved cluster and cluster for root) + data_address_ = boot_sector_state.sector_size * (id_ - 2) + boot_sector_state.data_region_start + return data_address_ + + def _compute_cluster_data_address(self) -> int: + return self.compute_cluster_data_address(self.boot_sector_state, self.id) + + @property + def fat_cluster_address(self) -> int: + """Determines how many bits precede the first bit of the cluster in FAT""" + return self._cluster_id_to_fat_position_in_bits(self.id) + + @property + def real_cluster_address(self) -> int: + """ + The property method computes the real address of the cluster in the FAT region. Result is simply + address of the cluster in fat + fat table address. + """ + cluster_address: int = self.boot_sector_state.fat_table_start_address + self.fat_cluster_address // 8 + return cluster_address + + def get_from_fat(self) -> int: + """ + Calculating the value in the FAT block, that denotes if the block is full, empty, or chained to other block. + + For FAT12 is the block stored in one and half byte. If the order of the block is even the first byte and second + half of the second byte belongs to the block. First half of the second byte and the third byte belongs to + the second block. + + e.g. b'\xff\x0f\x00' stores two blocks. First of them is evenly ordered (index 0) and is set to 0xfff, + that means full block that is final in chain of blocks + and second block is set to 0x000 that means empty block. + + three bytes - AB XC YZ - stores two blocks - CAB YZX + """ + address_: int = self.real_cluster_address + bin_img_: bytearray = self.boot_sector_state.binary_image + if self.boot_sector_state.fatfs_type == FAT12: + if self.fat_cluster_address % 8 == 0: + # even block + return bin_img_[self.real_cluster_address] | ((bin_img_[self.real_cluster_address + 1] & 0x0F) << 8) + # odd block + return ((bin_img_[self.real_cluster_address] & 0xF0) >> 4) | (bin_img_[self.real_cluster_address + 1] << 4) + if self.boot_sector_state.fatfs_type == FAT16: + return int.from_bytes(bin_img_[address_:address_ + 2], byteorder='little') + raise NotImplementedError('Only valid fatfs types are FAT12 and FAT16.') + + @property + def is_empty(self) -> bool: + """ + The property method takes a look into the binary array and checks if the bytes ordered by little endian + and relates to the current cluster are all zeros (which denotes they are empty). + """ + return self.get_from_fat() == 0x00 + + def set_in_fat(self, value: int) -> None: + """ + Sets cluster in FAT to certain value. + Firstly, we split the target value into 3 half bytes (max value is 0xfff). + Then we could encounter two situations: + 1. if the cluster index (indexed from zero) is even, we set the full byte computed by + self.cluster_id_to_logical_position_in_bits and the second half of the consequent byte. + Order of half bytes is 2, 1, 3. + + 2. if the cluster index is odd, we set the first half of the computed byte and the full consequent byte. + Order of half bytes is 1, 3, 2. + """ + + def _set_msb_half_byte(address: int, value_: int) -> None: + """ + Sets 4 most significant bits (msb half-byte) of 'boot_sector_state.binary_image' at given + 'address' to 'value_' (size of variable 'value_' is half byte) + + If a byte contents is 0b11110000, the msb half-byte would be 0b1111 + """ + self.boot_sector_state.binary_image[address] &= 0x0f + self.boot_sector_state.binary_image[address] |= value_ << 4 + + def _set_lsb_half_byte(address: int, value_: int) -> None: + """ + Sets 4 least significant bits (lsb half-byte) of 'boot_sector_state.binary_image' at given + 'address' to 'value_' (size of variable 'value_' is half byte) + + If a byte contents is 0b11110000, the lsb half-byte would be 0b0000 + """ + self.boot_sector_state.binary_image[address] &= 0xf0 + self.boot_sector_state.binary_image[address] |= value_ + + # value must fit into number of bits of the fat (12, 16 or 32) + assert value <= (1 << self.boot_sector_state.fatfs_type) - 1 + half_bytes = split_by_half_byte_12_bit_little_endian(value) + bin_img_: bytearray = self.boot_sector_state.binary_image + + if self.boot_sector_state.fatfs_type == FAT12: + assert merge_by_half_byte_12_bit_little_endian(*half_bytes) == value + if self.fat_cluster_address % 8 == 0: + # even block + bin_img_[self.real_cluster_address] = build_byte(half_bytes[1], half_bytes[0]) + _set_lsb_half_byte(self.real_cluster_address + 1, half_bytes[2]) + elif self.fat_cluster_address % 8 != 0: + # odd block + _set_msb_half_byte(self.real_cluster_address, half_bytes[0]) + bin_img_[self.real_cluster_address + 1] = build_byte(half_bytes[2], half_bytes[1]) + elif self.boot_sector_state.fatfs_type == FAT16: + bin_img_[self.real_cluster_address:self.real_cluster_address + 2] = Int16ul.build(value) + assert self.get_from_fat() == value + + @property + def is_root(self) -> bool: + """ + The FAT12/FAT16 contains only one root directory, + the root directory allocates the first cluster with the ID `ROOT_BLOCK_ID`. + The method checks if the cluster belongs to the root directory. + """ + return self.id == Cluster.ROOT_BLOCK_ID + + def allocate_cluster(self) -> None: + """ + This method sets bits in FAT table to `allocated` and clean the corresponding sector(s) + """ + self.set_in_fat(self.ALLOCATED_BLOCK_SWITCH[self.boot_sector_state.fatfs_type]) + + cluster_start = self.cluster_data_address + dir_size = get_dir_size(self.is_root, self.boot_sector_state) + cluster_end = cluster_start + dir_size + self.boot_sector_state.binary_image[cluster_start:cluster_end] = dir_size * EMPTY_BYTE diff --git a/third_party/fatfsgen/fatfs_utils/entry.py b/third_party/fatfsgen/fatfs_utils/entry.py new file mode 100644 index 0000000000000000000000000000000000000000..bb5d9f8f7a108b70d40571af1a9dd61985a31b6f --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/entry.py @@ -0,0 +1,253 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from typing import List, Optional, Union + +from construct import Const, Int8ul, Int16ul, Int32ul, PaddedString, Struct + +from .exceptions import LowerCaseException, TooLongNameException +from .fatfs_state import FATFSState +from .utils import (DATETIME, EMPTY_BYTE, FATFS_INCEPTION, MAX_EXT_SIZE, MAX_NAME_SIZE, SHORT_NAMES_ENCODING, + FATDefaults, build_date_entry, build_time_entry, is_valid_fatfs_name, pad_string) + + +class Entry: + """ + The class Entry represents entry of the directory. + """ + ATTR_READ_ONLY: int = 0x01 + ATTR_HIDDEN: int = 0x02 + ATTR_SYSTEM: int = 0x04 + ATTR_VOLUME_ID: int = 0x08 + ATTR_DIRECTORY: int = 0x10 # directory + ATTR_ARCHIVE: int = 0x20 # file + ATTR_LONG_NAME: int = ATTR_READ_ONLY | ATTR_HIDDEN | ATTR_SYSTEM | ATTR_VOLUME_ID + + # indexes in the entry structure and sizes in bytes, not in characters (encoded using 2 bytes for lfn) + LDIR_Name1_IDX: int = 1 + LDIR_Name1_SIZE: int = 5 + LDIR_Name2_IDX: int = 14 + LDIR_Name2_SIZE: int = 6 + LDIR_Name3_IDX: int = 28 + LDIR_Name3_SIZE: int = 2 + + # short entry in long file names + LDIR_DIR_NTRES: int = 0x18 + # one entry can hold 13 characters with size 2 bytes distributed in three regions of the 32 bytes entry + CHARS_PER_ENTRY: int = LDIR_Name1_SIZE + LDIR_Name2_SIZE + LDIR_Name3_SIZE + + # the last 16 bytes record in the LFN entry has first byte masked with the following value + LAST_RECORD_LFN_ENTRY: int = 0x40 + SHORT_ENTRY: int = -1 + # this value is used for short-like entry but with accepted lower case + SHORT_ENTRY_LN: int = 0 + + # The 1st January 1980 00:00:00 + DEFAULT_DATE: DATETIME = (FATFS_INCEPTION.year, FATFS_INCEPTION.month, FATFS_INCEPTION.day) + DEFAULT_TIME: DATETIME = (FATFS_INCEPTION.hour, FATFS_INCEPTION.minute, FATFS_INCEPTION.second) + + ENTRY_FORMAT_SHORT_NAME = Struct( + 'DIR_Name' / PaddedString(MAX_NAME_SIZE, SHORT_NAMES_ENCODING), + 'DIR_Name_ext' / PaddedString(MAX_EXT_SIZE, SHORT_NAMES_ENCODING), + 'DIR_Attr' / Int8ul, + 'DIR_NTRes' / Int8ul, # this tagged for lfn (0x00 for short entry in lfn, 0x18 for short name) + 'DIR_CrtTimeTenth' / Const(EMPTY_BYTE), # ignored by esp-idf fatfs library + 'DIR_CrtTime' / Int16ul, # ignored by esp-idf fatfs library + 'DIR_CrtDate' / Int16ul, # ignored by esp-idf fatfs library + 'DIR_LstAccDate' / Int16ul, # must be same as DIR_WrtDate + 'DIR_FstClusHI' / Const(2 * EMPTY_BYTE), + 'DIR_WrtTime' / Int16ul, + 'DIR_WrtDate' / Int16ul, + 'DIR_FstClusLO' / Int16ul, + 'DIR_FileSize' / Int32ul, + ) + + def __init__(self, + entry_id: int, + parent_dir_entries_address: int, + fatfs_state: FATFSState) -> None: + self.fatfs_state: FATFSState = fatfs_state + self.id: int = entry_id + self.entry_address: int = parent_dir_entries_address + self.id * FATDefaults.ENTRY_SIZE + self._is_alias: bool = False + self._is_empty: bool = True + + @staticmethod + def get_cluster_id(obj_: dict) -> int: + cluster_id_: int = obj_['DIR_FstClusLO'] + return cluster_id_ + + @property + def is_empty(self) -> bool: + return self._is_empty + + @staticmethod + def _parse_entry(entry_bytearray: Union[bytearray, bytes]) -> dict: + entry_: dict = Entry.ENTRY_FORMAT_SHORT_NAME.parse(entry_bytearray) + return entry_ + + @staticmethod + def _build_entry(**kwargs) -> bytes: # type: ignore + entry_: bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(dict(**kwargs)) + return entry_ + + @staticmethod + def _build_entry_long(names: List[bytes], checksum: int, order: int, is_last: bool) -> bytes: + """ + Long entry starts with 1 bytes of the order, if the entry is the last in the chain it is or-masked with 0x40, + otherwise is without change (or masked with 0x00). The following example shows 3 entries: + first two (0x2000-0x2040) are long in the reverse order and the last one (0x2040-0x2060) is short. + The entries define file name "thisisverylongfilenama.txt". + + 00002000: 42 67 00 66 00 69 00 6C 00 65 00 0F 00 43 6E 00 Bg.f.i.l.e...Cn. + 00002010: 61 00 6D 00 61 00 2E 00 74 00 00 00 78 00 74 00 a.m.a...t...x.t. + 00002020: 01 74 00 68 00 69 00 73 00 69 00 0F 00 43 73 00 .t.h.i.s.i...Cs. + 00002030: 76 00 65 00 72 00 79 00 6C 00 00 00 6F 00 6E 00 v.e.r.y.l...o.n. + 00002040: 54 48 49 53 49 53 7E 31 54 58 54 20 00 00 00 00 THISIS~1TXT..... + 00002050: 21 00 00 00 00 00 00 00 21 00 02 00 15 00 00 00 !.......!....... + """ + order |= (Entry.LAST_RECORD_LFN_ENTRY if is_last else 0x00) + long_entry: bytes = (Int8ul.build(order) + # order of the long name entry (possibly masked with 0x40) + names[0] + # first 5 characters (10 bytes) of the name part + Int8ul.build(Entry.ATTR_LONG_NAME) + # one byte entity type ATTR_LONG_NAME + Int8ul.build(0) + # one byte of zeros + Int8ul.build(checksum) + # lfn_checksum defined in utils.py + names[1] + # next 6 characters (12 bytes) of the name part + Int16ul.build(0) + # 2 bytes of zeros + names[2]) # last 2 characters (4 bytes) of the name part + return long_entry + + @staticmethod + def parse_entry_long(entry_bytes_: bytes, my_check: int) -> dict: + order_ = Int8ul.parse(entry_bytes_[0:1]) + names0 = entry_bytes_[1:11] + if Int8ul.parse(entry_bytes_[12:13]) != 0 or Int16ul.parse(entry_bytes_[26:28]) != 0 or Int8ul.parse(entry_bytes_[11:12]) != 15: + return {} + if Int8ul.parse(entry_bytes_[13:14]) != my_check: + return {} + names1 = entry_bytes_[14:26] + names2 = entry_bytes_[28:32] + return { + 'order': order_, + 'name1': names0, + 'name2': names1, + 'name3': names2, + 'is_last': bool(order_ & Entry.LAST_RECORD_LFN_ENTRY == Entry.LAST_RECORD_LFN_ENTRY) + } + + @property + def entry_bytes(self) -> bytes: + """ + :returns: Bytes defining the entry belonging to the given instance. + """ + start_: int = self.entry_address + entry_: bytes = self.fatfs_state.binary_image[start_: start_ + FATDefaults.ENTRY_SIZE] + return entry_ + + @entry_bytes.setter + def entry_bytes(self, value: bytes) -> None: + """ + :param value: new content of the entry + :returns: None + + The setter sets the content of the entry in bytes. + """ + self.fatfs_state.binary_image[self.entry_address: self.entry_address + FATDefaults.ENTRY_SIZE] = value + + def _clean_entry(self) -> None: + self.entry_bytes: bytes = FATDefaults.ENTRY_SIZE * EMPTY_BYTE + + def allocate_entry(self, + first_cluster_id: int, + entity_name: str, + entity_type: int, + entity_extension: str = '', + size: int = 0, + date: DATETIME = DEFAULT_DATE, + time: DATETIME = DEFAULT_TIME, + lfn_order: int = SHORT_ENTRY, + lfn_names: Optional[List[bytes]] = None, + lfn_checksum_: int = 0, + fits_short: bool = False, + lfn_is_last: bool = False) -> None: + """ + :param first_cluster_id: id of the first data cluster for given entry + :param entity_name: name recorded in the entry + :param entity_extension: extension recorded in the entry + :param size: size of the content of the file + :param date: denotes year (actual year minus 1980), month number day of the month (minimal valid is (0, 1, 1)) + :param time: denotes hour, minute and second with granularity 2 seconds (sec // 2) + :param entity_type: type of the entity (file [0x20] or directory [0x10]) + :param lfn_order: if long names support is enabled, defines order in long names entries sequence (-1 for short) + :param lfn_names: if the entry is dedicated for long names the lfn_names contains + LDIR_Name1, LDIR_Name2 and LDIR_Name3 in this order + :param lfn_checksum_: use only for long file names, checksum calculated lfn_checksum function + :param fits_short: determines if the name fits in 8.3 filename + :param lfn_is_last: determines if the long file name entry is holds last part of the name, + thus its address is first in the physical order + :returns: None + + :raises LowerCaseException: In case when long_names_enabled is set to False and filename exceeds 8 chars + for name or 3 chars for extension the exception is raised + :raises TooLongNameException: When long_names_enabled is set to False and name doesn't fit to 8.3 filename + an exception is raised + """ + valid_full_name: bool = is_valid_fatfs_name(entity_name) and is_valid_fatfs_name(entity_extension) + if not (valid_full_name or lfn_order >= 0): + raise LowerCaseException('Lower case is not supported in short name entry, use upper case.') + + if self.fatfs_state.use_default_datetime: + date = self.DEFAULT_DATE + time = self.DEFAULT_TIME + + # clean entry before allocation + self._clean_entry() + self._is_empty = False + + object_name = entity_name.upper() if not self.fatfs_state.long_names_enabled else entity_name + object_extension = entity_extension.upper() if not self.fatfs_state.long_names_enabled else entity_extension + + exceeds_short_name: bool = len(object_name) > MAX_NAME_SIZE or len(object_extension) > MAX_EXT_SIZE + if not self.fatfs_state.long_names_enabled and exceeds_short_name: + raise TooLongNameException( + 'Maximal length of the object name is {} characters and {} characters for extension!'.format( + MAX_NAME_SIZE, MAX_EXT_SIZE + ) + ) + + start_address = self.entry_address + end_address = start_address + FATDefaults.ENTRY_SIZE + if lfn_order in (self.SHORT_ENTRY, self.SHORT_ENTRY_LN): + date_entry_: int = build_date_entry(*date) + time_entry: int = build_time_entry(*time) + self.fatfs_state.binary_image[start_address: end_address] = self._build_entry( + DIR_Name=pad_string(object_name, size=MAX_NAME_SIZE), + DIR_Name_ext=pad_string(object_extension, size=MAX_EXT_SIZE), + DIR_Attr=entity_type, + DIR_NTRes=0x00 if (not self.fatfs_state.long_names_enabled) or (not fits_short) else 0x18, + DIR_FstClusLO=first_cluster_id, + DIR_FileSize=size, + DIR_CrtDate=date_entry_, # ignored by esp-idf fatfs library + DIR_LstAccDate=date_entry_, # must be same as DIR_WrtDate + DIR_WrtDate=date_entry_, + DIR_CrtTime=time_entry, # ignored by esp-idf fatfs library + DIR_WrtTime=time_entry + ) + else: + assert lfn_names is not None + self.fatfs_state.binary_image[start_address: end_address] = self._build_entry_long(lfn_names, + lfn_checksum_, + lfn_order, + lfn_is_last) + + def update_content_size(self, content_size: int) -> None: + """ + :param content_size: the new size of the file content in bytes + :returns: None + + This method parses the binary entry to the construct structure, updates the content size of the file + and builds new binary entry. + """ + parsed_entry = self._parse_entry(self.entry_bytes) + parsed_entry.DIR_FileSize = content_size # type: ignore + self.entry_bytes = Entry.ENTRY_FORMAT_SHORT_NAME.build(parsed_entry) diff --git a/third_party/fatfsgen/fatfs_utils/exceptions.py b/third_party/fatfsgen/fatfs_utils/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..a3a27df5d6eeb60b3034dfdea8a822c165db252f --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/exceptions.py @@ -0,0 +1,54 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +class WriteDirectoryException(Exception): + """ + Exception is raised when the user tries to write the content into the directory instead of file + """ + pass + + +class NoFreeClusterException(Exception): + """ + Exception is raised when the user tries allocate cluster but no free one is available + """ + pass + + +class LowerCaseException(Exception): + """ + Exception is raised when the user tries to write file or directory with lower case + """ + pass + + +class TooLongNameException(Exception): + """ + Exception is raised when long name support is not enabled and user tries to write file longer then allowed + """ + pass + + +class NotInitialized(Exception): + """ + Exception is raised when the user tries to access not initialized property + """ + pass + + +class WLNotInitialized(Exception): + """ + Exception is raised when the user tries to write fatfs not initialized with wear levelling + """ + pass + + +class FatalError(Exception): + pass + + +class InconsistentFATAttributes(Exception): + """ + Caused by e.g. wrong number of clusters for given FAT type + """ + pass diff --git a/third_party/fatfsgen/fatfs_utils/fat.py b/third_party/fatfsgen/fatfs_utils/fat.py new file mode 100644 index 0000000000000000000000000000000000000000..396075dd08773c1b3271f9bee344803a59678307 --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/fat.py @@ -0,0 +1,100 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from typing import List, Optional + +from .cluster import Cluster +from .exceptions import NoFreeClusterException +from .fatfs_state import BootSectorState + + +class FAT: + """ + The FAT represents the FAT region in file system. It is responsible for storing clusters + and chaining them in case we need to extend file or directory to more clusters. + """ + + def allocate_root_dir(self) -> None: + """ + The root directory is implicitly created with the FatFS, + its block is on the index 1 (second index) and is allocated implicitly. + """ + self.clusters[Cluster.ROOT_BLOCK_ID].allocate_cluster() + + def __init__(self, boot_sector_state: BootSectorState, init_: bool) -> None: + self._first_free_cluster_id = 1 + self.boot_sector_state = boot_sector_state + self.clusters: List[Cluster] = [Cluster(cluster_id=i, + boot_sector_state=self.boot_sector_state, + init_=init_) for i in range(self.boot_sector_state.clusters)] + if init_: + self.allocate_root_dir() + + def get_cluster_value(self, cluster_id_: int) -> int: + """ + The method retrieves the values of the FAT memory block. + E.g. in case of FAT12: + 00000000: F8 FF FF 55 05 00 00 00 00 00 00 00 00 00 00 00 + + The reserved value is 0xFF8, the value of first cluster if 0xFFF, thus is last in chain, + and the value of the second cluster is 0x555, so refers to the cluster number 0x555. + """ + fat_cluster_value_: int = self.clusters[cluster_id_].get_from_fat() + return fat_cluster_value_ + + def is_cluster_last(self, cluster_id_: int) -> bool: + """ + Checks if the cluster is last in its cluster chain. If the value of the cluster is + 0xFFF for FAT12, 0xFFFF for FAT16 or 0xFFFFFFFF for FAT32, the cluster is the last. + """ + value_ = self.get_cluster_value(cluster_id_) + is_cluster_last_: bool = value_ == (1 << self.boot_sector_state.fatfs_type) - 1 + return is_cluster_last_ + + def get_chained_content(self, cluster_id_: int, size: Optional[int] = None) -> bytearray: + """ + The purpose of the method is retrieving the content from chain of clusters when the FAT FS partition + is analyzed. The file entry provides the reference to the first cluster, this method + traverses linked list of clusters and append partial results to the content. + """ + binary_image: bytearray = self.boot_sector_state.binary_image + + data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) + content_ = binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size] + + while not self.is_cluster_last(cluster_id_): + cluster_id_ = self.get_cluster_value(cluster_id_) + data_address_ = Cluster.compute_cluster_data_address(self.boot_sector_state, cluster_id_) + content_ += binary_image[data_address_: data_address_ + self.boot_sector_state.sector_size] + # the size is None if the object is directory + if size is None: + return content_ + return content_[:size] + + def find_free_cluster(self) -> Cluster: + """ + Returns the first free cluster and increments value of `self._first_free_cluster_id`. + The method works only in context of creating a partition from scratch. + In situations where the clusters are allocated and freed during the run of the program, + might the method cause `Out of space` error despite there would be free clusters. + """ + + if self._first_free_cluster_id + 1 >= len(self.clusters): + raise NoFreeClusterException('No free cluster available!') + cluster = self.clusters[self._first_free_cluster_id + 1] + if not cluster.is_empty: + raise NoFreeClusterException('No free cluster available!') + cluster.allocate_cluster() + self._first_free_cluster_id += 1 + return cluster + + def allocate_chain(self, first_cluster: Cluster, size: int) -> None: + """ + Allocates the linked list of clusters needed for the given file or directory. + """ + current = first_cluster + for _ in range(size - 1): + free_cluster = self.find_free_cluster() + current.next_cluster = free_cluster + current.set_in_fat(free_cluster.id) + current = free_cluster diff --git a/third_party/fatfsgen/fatfs_utils/fatfs_parser.py b/third_party/fatfsgen/fatfs_utils/fatfs_parser.py new file mode 100644 index 0000000000000000000000000000000000000000..66aea11dd3788190b2a3aac22712cdb9518a014f --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/fatfs_parser.py @@ -0,0 +1,17 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from .boot_sector import BootSector +from .utils import read_filesystem + + +class FATFSParser: + + def __init__(self, image_file_path: str, wl_support: bool = False) -> None: + if wl_support: + raise NotImplementedError('Parser is not implemented for WL yet.') + self.fatfs = read_filesystem(image_file_path) + + # when wl is not supported we expect boot sector to be the first + self.parsed_header = BootSector.BOOT_SECTOR_HEADER.parse(self.fatfs[:BootSector.BOOT_HEADER_SIZE]) + print(BootSector) diff --git a/third_party/fatfsgen/fatfs_utils/fatfs_state.py b/third_party/fatfsgen/fatfs_utils/fatfs_state.py new file mode 100644 index 0000000000000000000000000000000000000000..22af7bfb0de2f352549ed8338a8f15129a9eafa5 --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/fatfs_state.py @@ -0,0 +1,170 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from textwrap import dedent +from typing import Optional + +from .exceptions import InconsistentFATAttributes +from .utils import (ALLOWED_SECTOR_SIZES, FAT12, FAT12_MAX_CLUSTERS, FAT16, FAT16_MAX_CLUSTERS, + RESERVED_CLUSTERS_COUNT, FATDefaults, get_fat_sectors_count, get_fatfs_type, + get_non_data_sectors_cnt, number_of_clusters) + + +class FATFSState: + """ + The class represents the state and the configuration of the FATFS. + """ + + def __init__(self, + sector_size: int, + reserved_sectors_cnt: int, + root_dir_sectors_cnt: int, + size: int, + media_type: int, + sectors_per_cluster: int, + volume_label: str, + oem_name: str, + fat_tables_cnt: int, + sec_per_track: int, + num_heads: int, + hidden_sectors: int, + file_sys_type: str, + use_default_datetime: bool, + explicit_fat_type: Optional[int] = None, + long_names_enabled: bool = False): + self.boot_sector_state = BootSectorState(oem_name=oem_name, + sector_size=sector_size, + sectors_per_cluster=sectors_per_cluster, + reserved_sectors_cnt=reserved_sectors_cnt, + fat_tables_cnt=fat_tables_cnt, + root_dir_sectors_cnt=root_dir_sectors_cnt, + sectors_count=size // sector_size, + media_type=media_type, + sec_per_track=sec_per_track, + num_heads=num_heads, + hidden_sectors=hidden_sectors, + volume_label=volume_label, + file_sys_type=file_sys_type, + volume_uuid=-1) + + self._explicit_fat_type: Optional[int] = explicit_fat_type + self.long_names_enabled: bool = long_names_enabled + self.use_default_datetime: bool = use_default_datetime + + if (size // sector_size) * sectors_per_cluster in (FAT12_MAX_CLUSTERS, FAT16_MAX_CLUSTERS): + print('WARNING: It is not recommended to create FATFS with bounding ' + f'count of clusters: {FAT12_MAX_CLUSTERS} or {FAT16_MAX_CLUSTERS}') + self.check_fat_type() + + @property + def binary_image(self) -> bytearray: + return self.boot_sector_state.binary_image + + @binary_image.setter + def binary_image(self, value: bytearray) -> None: + self.boot_sector_state.binary_image = value + + def check_fat_type(self) -> None: + _type = self.boot_sector_state.fatfs_type + if self._explicit_fat_type is not None and self._explicit_fat_type != _type: + raise InconsistentFATAttributes(dedent( + f"""FAT type you specified is inconsistent with other attributes of the system. + The specified FATFS type: FAT{self._explicit_fat_type} + The actual FATFS type: FAT{_type}""")) + if _type not in (FAT12, FAT16): + raise NotImplementedError('FAT32 is currently not supported.') + + +class BootSectorState: + # pylint: disable=too-many-instance-attributes + def __init__(self, + oem_name: str, + sector_size: int, + sectors_per_cluster: int, + reserved_sectors_cnt: int, + fat_tables_cnt: int, + root_dir_sectors_cnt: int, + sectors_count: int, + media_type: int, + sec_per_track: int, + num_heads: int, + hidden_sectors: int, + volume_label: str, + file_sys_type: str, + volume_uuid: int = -1) -> None: + self.oem_name: str = oem_name + self.sector_size: int = sector_size + assert self.sector_size in ALLOWED_SECTOR_SIZES + self.sectors_per_cluster: int = sectors_per_cluster + self.reserved_sectors_cnt: int = reserved_sectors_cnt + self.fat_tables_cnt: int = fat_tables_cnt + self.root_dir_sectors_cnt: int = root_dir_sectors_cnt + self.sectors_count: int = sectors_count + self.media_type: int = media_type + self.sectors_per_fat_cnt = get_fat_sectors_count(self.size // self.sector_size, self.sector_size) + self.sec_per_track: int = sec_per_track + self.num_heads: int = num_heads + self.hidden_sectors: int = hidden_sectors + self.volume_label: str = volume_label + self.file_sys_type: str = file_sys_type + self.volume_uuid: int = volume_uuid + self._binary_image: bytearray = bytearray(b'') + + @property + def binary_image(self) -> bytearray: + return self._binary_image + + @binary_image.setter + def binary_image(self, value: bytearray) -> None: + self._binary_image = value + + @property + def size(self) -> int: + return self.sector_size * self.sectors_count + + @property + def data_region_start(self) -> int: + return self.non_data_sectors * self.sector_size + + @property + def fatfs_type(self) -> int: + # variable typed_fatfs_type must be explicitly typed to avoid mypy error + typed_fatfs_type: int = get_fatfs_type(self.clusters) + return typed_fatfs_type + + @property + def clusters(self) -> int: + """ + The actual number of clusters is calculated by `number_of_clusters`, + however, the initial two blocks of FAT are reserved (device type and root directory), + despite they don't refer to the data region. + Since that, two clusters are added to use the full potential of the FAT file system partition. + """ + clusters_cnt_: int = number_of_clusters(self.data_sectors, self.sectors_per_cluster) + RESERVED_CLUSTERS_COUNT + return clusters_cnt_ + + @property + def data_sectors(self) -> int: + # self.sector_size is checked in constructor if has one of allowed values (ALLOWED_SECTOR_SIZES) + return (self.size // self.sector_size) - self.non_data_sectors + + @property + def non_data_sectors(self) -> int: + non_data_sectors_: int = get_non_data_sectors_cnt(self.reserved_sectors_cnt, + self.sectors_per_fat_cnt, + self.root_dir_sectors_cnt) + return non_data_sectors_ + + @property + def fat_table_start_address(self) -> int: + return self.sector_size * self.reserved_sectors_cnt + + @property + def entries_root_count(self) -> int: + entries_root_count_: int = (self.root_dir_sectors_cnt * self.sector_size) // FATDefaults.ENTRY_SIZE + return entries_root_count_ + + @property + def root_directory_start(self) -> int: + root_dir_start: int = (self.reserved_sectors_cnt + self.sectors_per_fat_cnt) * self.sector_size + return root_dir_start diff --git a/third_party/fatfsgen/fatfs_utils/fs_object.py b/third_party/fatfsgen/fatfs_utils/fs_object.py new file mode 100644 index 0000000000000000000000000000000000000000..307087cfb3d8dc04fb3f509a3c73fc57e03ee534 --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/fs_object.py @@ -0,0 +1,343 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +from datetime import datetime +from typing import List, Optional, Tuple, Union + +from .entry import Entry +from .exceptions import FatalError, WriteDirectoryException +from .fat import FAT, Cluster +from .fatfs_state import FATFSState +from .long_filename_utils import (build_lfn_full_name, build_lfn_unique_entry_name_order, + get_required_lfn_entries_count, split_name_to_lfn_entries, + split_name_to_lfn_entry_blocks) +from .utils import (DATETIME, INVALID_SFN_CHARS_PATTERN, MAX_EXT_SIZE, MAX_NAME_SIZE, FATDefaults, + build_lfn_short_entry_name, build_name, lfn_checksum, required_clusters_count, + split_content_into_sectors, split_to_name_and_extension) + + +class File: + """ + The class File provides API to write into the files. It represents file in the FS. + """ + ATTR_ARCHIVE: int = 0x20 + ENTITY_TYPE: int = ATTR_ARCHIVE + + def __init__(self, name: str, fat: FAT, fatfs_state: FATFSState, entry: Entry, extension: str = '') -> None: + self.name: str = name + self.extension: str = extension + self.fatfs_state: FATFSState = fatfs_state + self.fat: FAT = fat + self.size: int = 0 + self._first_cluster: Optional[Cluster] = None + self._entry: Entry = entry + + @property + def entry(self) -> Entry: + return self._entry + + @property + def first_cluster(self) -> Optional[Cluster]: + return self._first_cluster + + @first_cluster.setter + def first_cluster(self, value: Cluster) -> None: + self._first_cluster = value + + def name_equals(self, name: str, extension: str) -> bool: + equals_: bool = build_name(name, extension) == build_name(self.name, self.extension) + return equals_ + + def write(self, content: bytes) -> None: + self.entry.update_content_size(len(content)) + # we assume that the correct amount of clusters is allocated + current_cluster = self._first_cluster + for content_part in split_content_into_sectors(content, self.fatfs_state.boot_sector_state.sector_size): + content_as_list = content_part + if current_cluster is None: + raise FatalError('No free space left!') + + address: int = current_cluster.cluster_data_address + self.fatfs_state.binary_image[address: address + len(content_part)] = content_as_list + current_cluster = current_cluster.next_cluster + + +class Directory: + """ + The Directory class provides API to add files and directories into the directory + and to find the file according to path and write it. + """ + ATTR_DIRECTORY: int = 0x10 + ATTR_ARCHIVE: int = 0x20 + ENTITY_TYPE: int = ATTR_DIRECTORY + + CURRENT_DIRECTORY = '.' + PARENT_DIRECTORY = '..' + + def __init__(self, + name, + fat, + fatfs_state, + entry=None, + cluster=None, + size=None, + extension='', + parent=None): + # type: (str, FAT, FATFSState, Optional[Entry], Cluster, Optional[int], str, Directory) -> None + self.name: str = name + self.fatfs_state: FATFSState = fatfs_state + self.extension: str = extension + + self.fat: FAT = fat + self.size: int = size or self.fatfs_state.boot_sector_state.sector_size + + # if directory is root its parent is itself + self.parent: Directory = parent or self + self._first_cluster: Cluster = cluster + + # entries will be initialized after the cluster allocation + self.entries: List[Entry] = [] + self.entities: List[Union[File, Directory]] = [] # type: ignore + self._entry = entry # currently not in use (will use later for e.g. modification time, etc.) + + @property + def is_root(self) -> bool: + return self.parent is self + + @property + def first_cluster(self) -> Cluster: + return self._first_cluster + + @first_cluster.setter + def first_cluster(self, value: Cluster) -> None: + self._first_cluster = value + + def name_equals(self, name: str, extension: str) -> bool: + equals_: bool = build_name(name, extension) == build_name(self.name, self.extension) + return equals_ + + @property + def entries_count(self) -> int: + entries_count_: int = self.size // FATDefaults.ENTRY_SIZE + return entries_count_ + + def create_entries(self, cluster: Cluster) -> List[Entry]: + return [Entry(entry_id=i, + parent_dir_entries_address=cluster.cluster_data_address, + fatfs_state=self.fatfs_state) + for i in range(self.entries_count)] + + def init_directory(self) -> None: + self.entries = self.create_entries(self._first_cluster) + + # the root directory doesn't contain link to itself nor the parent + if self.is_root: + return + # if the directory is not root we initialize the reference to itself and to the parent directory + for dir_id, name_ in ((self, self.CURRENT_DIRECTORY), (self.parent, self.PARENT_DIRECTORY)): + new_dir_: Entry = self.find_free_entry() or self.chain_directory() + new_dir_.allocate_entry(first_cluster_id=dir_id.first_cluster.id, + entity_name=name_, + entity_extension='', + entity_type=dir_id.ENTITY_TYPE) + + def lookup_entity(self, object_name: str, extension: str): # type: ignore + for entity in self.entities: + if build_name(entity.name, entity.extension) == build_name(object_name, extension): + return entity + return None + + @staticmethod + def _is_end_of_path(path_as_list: List[str]) -> bool: + """ + :param path_as_list: path split into the list + + :returns: True if the file is the leaf of the directory tree, False otherwise + The method is part of the base of recursion, + determines if the path is target file or directory in the tree folder structure. + """ + return len(path_as_list) == 1 + + def recursive_search(self, path_as_list, current_dir): # type: ignore + name, extension = split_to_name_and_extension(path_as_list[0]) + next_obj = current_dir.lookup_entity(name, extension) + if next_obj is None: + raise FileNotFoundError('No such file or directory!') + if self._is_end_of_path(path_as_list) and next_obj.name_equals(name, extension): + return next_obj + return self.recursive_search(path_as_list[1:], next_obj) + + def find_free_entry(self) -> Optional[Entry]: + for entry in self.entries: + if entry.is_empty: + return entry + return None + + def _extend_directory(self) -> None: + current: Cluster = self.first_cluster + while current.next_cluster is not None: + current = current.next_cluster + new_cluster: Cluster = self.fat.find_free_cluster() + current.set_in_fat(new_cluster.id) + assert current is not new_cluster + current.next_cluster = new_cluster + self.entries += self.create_entries(new_cluster) + + def chain_directory(self) -> Entry: + """ + :returns: First free entry + + The method adds new Cluster to the Directory and returns first free entry. + """ + self._extend_directory() + free_entry: Entry = self.find_free_entry() + if free_entry is None: + raise FatalError('No more space left!') + return free_entry + + @staticmethod + def allocate_long_name_object(free_entry, + name, + extension, + target_dir, + free_cluster_id, + entity_type, + date, + time): + # type: (Entry, str, str, Directory, int, int, DATETIME, DATETIME) -> Entry + lfn_full_name: str = build_lfn_full_name(name, extension) + lfn_unique_entry_order: int = build_lfn_unique_entry_name_order(target_dir.entities, name) + lfn_short_entry_name: str = build_lfn_short_entry_name(name, extension, lfn_unique_entry_order) + checksum: int = lfn_checksum(lfn_short_entry_name) + entries_count: int = get_required_lfn_entries_count(lfn_full_name) + + # entries in long file name entries chain starts with the last entry + split_names_reversed = list(reversed(list(enumerate(split_name_to_lfn_entries(lfn_full_name, entries_count))))) + for i, name_split_to_entry in split_names_reversed: + order: int = i + 1 + blocks_: List[bytes] = split_name_to_lfn_entry_blocks(name_split_to_entry) + lfn_names: List[bytes] = list(map(lambda x: x.lower(), blocks_)) + free_entry.allocate_entry(first_cluster_id=free_cluster_id, + entity_name=name, + entity_extension=extension, + entity_type=entity_type, + lfn_order=order, + lfn_names=lfn_names, + lfn_checksum_=checksum, + lfn_is_last=order == entries_count) + free_entry = target_dir.find_free_entry() or target_dir.chain_directory() + free_entry.allocate_entry(first_cluster_id=free_cluster_id, + entity_name=lfn_short_entry_name[:MAX_NAME_SIZE], + entity_extension=lfn_short_entry_name[MAX_NAME_SIZE:], + entity_type=entity_type, + lfn_order=Entry.SHORT_ENTRY_LN, + date=date, + time=time) + return free_entry + + @staticmethod + def _is_valid_sfn(name: str, extension: str) -> bool: + if INVALID_SFN_CHARS_PATTERN.search(name) or INVALID_SFN_CHARS_PATTERN.search(name): + return False + ret: bool = len(name) <= MAX_NAME_SIZE and len(extension) <= MAX_EXT_SIZE + return ret + + def allocate_object(self, + name, + entity_type, + object_timestamp_, + path_from_root=None, + extension='', + is_empty=False): + # type: (str, int, datetime, Optional[List[str]], str, bool) -> Tuple[Cluster, Entry, Directory] + """ + Method finds the target directory in the path + and allocates cluster (both the record in FAT and cluster in the data region) + and entry in the specified directory + """ + + free_cluster: Optional[Cluster] = None + free_cluster_id = 0x00 + if not is_empty: + free_cluster = self.fat.find_free_cluster() + free_cluster_id = free_cluster.id + + target_dir: Directory = self if not path_from_root else self.recursive_search(path_from_root, self) + free_entry: Entry = target_dir.find_free_entry() or target_dir.chain_directory() + + fatfs_date_ = (object_timestamp_.year, object_timestamp_.month, object_timestamp_.day) + fatfs_time_ = (object_timestamp_.hour, object_timestamp_.minute, object_timestamp_.second) + + if not self.fatfs_state.long_names_enabled or self._is_valid_sfn(name, extension): + free_entry.allocate_entry(first_cluster_id=free_cluster_id, + entity_name=name, + entity_extension=extension, + date=fatfs_date_, + time=fatfs_time_, + fits_short=True, + entity_type=entity_type) + return free_cluster, free_entry, target_dir + return free_cluster, self.allocate_long_name_object(free_entry=free_entry, + name=name, + extension=extension, + target_dir=target_dir, + free_cluster_id=free_cluster_id, + entity_type=entity_type, + date=fatfs_date_, + time=fatfs_time_), target_dir + + def new_file(self, + name: str, + extension: str, + path_from_root: Optional[List[str]], + object_timestamp_: datetime, + is_empty: bool) -> None: + free_cluster, free_entry, target_dir = self.allocate_object(name=name, + extension=extension, + entity_type=Directory.ATTR_ARCHIVE, + path_from_root=path_from_root, + object_timestamp_=object_timestamp_, + is_empty=is_empty) + + file: File = File(name=name, + fat=self.fat, + extension=extension, + fatfs_state=self.fatfs_state, + entry=free_entry) + file.first_cluster = free_cluster + target_dir.entities.append(file) + + def new_directory(self, name, parent, path_from_root, object_timestamp_): + # type: (str, Directory, Optional[List[str]], datetime) -> None + free_cluster, free_entry, target_dir = self.allocate_object(name=name, + entity_type=Directory.ATTR_DIRECTORY, + path_from_root=path_from_root, + object_timestamp_=object_timestamp_) + + directory: Directory = Directory(name=name, + fat=self.fat, + parent=parent, + fatfs_state=self.fatfs_state, + entry=free_entry) + directory.first_cluster = free_cluster + directory.init_directory() + target_dir.entities.append(directory) + + def write_to_file(self, path: List[str], content: bytes) -> None: + """ + Writes to file existing in the directory structure. + + :param path: path split into the list + :param content: content as a string to be written into a file + :returns: None + :raises WriteDirectoryException: raised is the target object for writing is a directory + """ + entity_to_write: Entry = self.recursive_search(path, self) + if isinstance(entity_to_write, File): + clusters_cnt: int = required_clusters_count(cluster_size=self.fatfs_state.boot_sector_state.sector_size, + content=content) + self.fat.allocate_chain(entity_to_write.first_cluster, clusters_cnt) + entity_to_write.write(content) + else: + raise WriteDirectoryException(f'`{os.path.join(*path)}` is a directory!') diff --git a/third_party/fatfsgen/fatfs_utils/long_filename_utils.py b/third_party/fatfsgen/fatfs_utils/long_filename_utils.py new file mode 100644 index 0000000000000000000000000000000000000000..649312aeada0b17bf54ea2c7c8bcd1370ba29e40 --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/long_filename_utils.py @@ -0,0 +1,98 @@ +# SPDX-FileCopyrightText: 2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 +from typing import List + +from .entry import Entry +from .exceptions import NoFreeClusterException +from .utils import build_name, convert_to_utf16_and_pad + +# File name with long filenames support can be as long as memory allows. It is split into entries +# holding 13 characters of the filename, thus the number of required entries is ceil(len(long_name) / 13). +# This is computed using `get_required_lfn_entries_count`. +# For creating long name entries we need to split the name by 13 characters using `split_name_to_lfn_entries` +# and in every entry into three blocks with sizes 5, 6 and 2 characters using `split_name_to_lfn_entry`. + +MAXIMAL_FILES_SAME_PREFIX: int = 127 + + +def get_required_lfn_entries_count(lfn_full_name: str) -> int: + """ + Compute the number of entries required to store the long name. + One long filename entry can hold 13 characters with size 2 bytes. + + E.g. "thisisverylongfilenama.txt" with length of 26 needs 2 lfn entries, + but "thisisverylongfilenamax.txt" with 27 characters needs 3 lfn entries. + """ + entries_count: int = (len(lfn_full_name) + Entry.CHARS_PER_ENTRY - 1) // Entry.CHARS_PER_ENTRY + return entries_count + + +def split_name_to_lfn_entries(name: str, entries: int) -> List[str]: + """ + If the filename is longer than 8 (name) + 3 (extension) characters, + generator uses long name structure and splits the name into suitable amount of blocks. + + E.g. 'thisisverylongfilenama.txt' would be split to ['THISISVERYLON', 'GFILENAMA.TXT'], + in case of 'thisisverylongfilenamax.txt' - ['THISISVERYLON', 'GFILENAMAX.TX', 'T'] + """ + return [name[i * Entry.CHARS_PER_ENTRY:(i + 1) * Entry.CHARS_PER_ENTRY] for i in range(entries)] + + +def split_name_to_lfn_entry_blocks(name: str) -> List[bytes]: + """ + Filename is divided into three blocks in every long file name entry. Sizes of the blocks are defined + by LDIR_Name1_SIZE, LDIR_Name2_SIZE and LDIR_Name3_SIZE, thus every block contains LDIR_Name{X}_SIZE * 2 bytes. + + If the filename ends in one of the blocks, it is terminated by zero encoded to two bytes (0x0000). Other unused + characters are set to 0xFFFF. + E.g.: + 'GFILENAMA.TXT' -> [b'G\x00F\x00I\x00L\x00E\x00', b'N\x00A\x00M\x00A\x00.\x00T\x00', b'X\x00T\x00']; + 'T' -> [b'T\x00\x00\x00\xff\xff\xff\xff\xff\xff', b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff', + b'\xff\xff\xff\xff'] + + Notice that since every character is coded using 2 bytes be must add 0x00 to ASCII symbols ('G' -> 'G\x00', etc.), + since character 'T' ends in the first block, we must add '\x00\x00' after 'T\x00'. + """ + max_entry_size: int = Entry.LDIR_Name1_SIZE + Entry.LDIR_Name2_SIZE + Entry.LDIR_Name2_SIZE + assert len(name) <= max_entry_size + blocks_: List[bytes] = [ + convert_to_utf16_and_pad(content=name[:Entry.LDIR_Name1_SIZE], + expected_size=Entry.LDIR_Name1_SIZE), + convert_to_utf16_and_pad(content=name[Entry.LDIR_Name1_SIZE:Entry.LDIR_Name1_SIZE + Entry.LDIR_Name2_SIZE], + expected_size=Entry.LDIR_Name2_SIZE), + convert_to_utf16_and_pad(content=name[Entry.LDIR_Name1_SIZE + Entry.LDIR_Name2_SIZE:], + expected_size=Entry.LDIR_Name3_SIZE) + ] + return blocks_ + + +def build_lfn_unique_entry_name_order(entities: list, lfn_entry_name: str) -> int: + """ + The short entry contains only the first 6 characters of the file name, + and we have to distinguish it from other names within the directory starting with the same 6 characters. + To make it unique, we add its order in relation to other names such that lfn_entry_name[:6] == other[:6]. + The order is specified by the character, starting with chr(1). + + E.g. the file in directory 'thisisverylongfilenama.txt' will be named 'THISIS~1TXT' in its short entry. + If we add another file 'thisisverylongfilenamax.txt' its name in the short entry will be 'THISIS~2TXT'. + """ + preceding_entries: int = 1 + for entity in entities: + if entity.name[:6] == lfn_entry_name[:6]: + preceding_entries += 1 + if preceding_entries > MAXIMAL_FILES_SAME_PREFIX: + raise NoFreeClusterException('Maximal number of files with the same prefix is 127') + return preceding_entries + + +def build_lfn_full_name(name: str, extension: str) -> str: + """ + The extension is optional, and the long filename entry explicitly specifies it, + on the opposite as for short file names. + """ + lfn_record: str = build_name(name, extension) + # the name must be terminated with NULL terminator + # if it doesn't fit into the set of long name directory entries + if len(lfn_record) % Entry.CHARS_PER_ENTRY != 0: + return lfn_record + chr(0) + return lfn_record diff --git a/third_party/fatfsgen/fatfs_utils/utils.py b/third_party/fatfsgen/fatfs_utils/utils.py new file mode 100644 index 0000000000000000000000000000000000000000..d2180c76bdaa986e05e12bc719975d24f705b4ec --- /dev/null +++ b/third_party/fatfsgen/fatfs_utils/utils.py @@ -0,0 +1,299 @@ +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import binascii +import os +import re +import uuid +from datetime import datetime +from typing import List, Optional, Tuple + +from construct import BitsInteger, BitStruct, Int16ul + +# the regex pattern defines symbols that are allowed by long file names but not by short file names +INVALID_SFN_CHARS_PATTERN = re.compile(r'[.+,;=\[\]]') + +FATFS_MIN_ALLOC_UNIT: int = 128 +FAT12_MAX_CLUSTERS: int = 4085 +FAT16_MAX_CLUSTERS: int = 65525 +RESERVED_CLUSTERS_COUNT: int = 2 +PAD_CHAR: int = 0x20 +FAT12: int = 12 +FAT16: int = 16 +FAT32: int = 32 +FULL_BYTE: bytes = b'\xff' +EMPTY_BYTE: bytes = b'\x00' +# redundant +BYTES_PER_DIRECTORY_ENTRY: int = 32 +UINT32_MAX: int = (1 << 32) - 1 +MAX_NAME_SIZE: int = 8 +MAX_EXT_SIZE: int = 3 +DATETIME = Tuple[int, int, int] +FATFS_INCEPTION_YEAR: int = 1980 + +FATFS_INCEPTION: datetime = datetime(FATFS_INCEPTION_YEAR, 1, 1, 0, 0, 0, 0) + +FATFS_MAX_HOURS = 24 +FATFS_MAX_MINUTES = 60 +FATFS_MAX_SECONDS = 60 + +FATFS_MAX_DAYS = 31 +FATFS_MAX_MONTHS = 12 +FATFS_MAX_YEARS = 127 + +FATFS_SECONDS_GRANULARITY: int = 2 + +# long names are encoded to two bytes in utf-16 +LONG_NAMES_ENCODING: str = 'utf-16' +SHORT_NAMES_ENCODING: str = 'utf-8' + +# compatible with WL_SECTOR_SIZE +# choices for WL are WL_SECTOR_SIZE_512 and WL_SECTOR_SIZE_4096 +ALLOWED_WL_SECTOR_SIZES: List[int] = [512, 4096] +ALLOWED_SECTOR_SIZES: List[int] = [512, 1024, 2048, 4096] + +ALLOWED_SECTORS_PER_CLUSTER: List[int] = [1, 2, 4, 8, 16, 32, 64, 128] + + +def crc32(input_values: List[int], crc: int) -> int: + """ + Name Polynomial Reversed? Init-value XOR-out + crc32 0x104C11DB7 True 4294967295 (UINT32_MAX) 0xFFFFFFFF + """ + return binascii.crc32(bytearray(input_values), crc) + + +def number_of_clusters(number_of_sectors: int, sectors_per_cluster: int) -> int: + return number_of_sectors // sectors_per_cluster + + +def get_non_data_sectors_cnt(reserved_sectors_cnt: int, sectors_per_fat_cnt: int, root_dir_sectors_cnt: int) -> int: + return reserved_sectors_cnt + sectors_per_fat_cnt + root_dir_sectors_cnt + + +def get_fatfs_type(clusters_count: int) -> int: + if clusters_count < FAT12_MAX_CLUSTERS: + return FAT12 + if clusters_count <= FAT16_MAX_CLUSTERS: + return FAT16 + return FAT32 + + +def get_fat_sectors_count(clusters_count: int, sector_size: int) -> int: + fatfs_type_ = get_fatfs_type(clusters_count) + if fatfs_type_ == FAT32: + raise NotImplementedError('FAT32 is not supported!') + # number of byte halves + cluster_s: int = fatfs_type_ // 4 + fat_size_bytes: int = ( + clusters_count * 2 + cluster_s) if fatfs_type_ == FAT16 else (clusters_count * 3 + 1) // 2 + cluster_s + return (fat_size_bytes + sector_size - 1) // sector_size + + +def required_clusters_count(cluster_size: int, content: bytes) -> int: + # compute number of required clusters for file text + return (len(content) + cluster_size - 1) // cluster_size + + +def generate_4bytes_random() -> int: + return uuid.uuid4().int & 0xFFFFFFFF + + +def pad_string(content: str, size: Optional[int] = None, pad: int = PAD_CHAR) -> str: + # cut string if longer and fill with pad character if shorter than size + return content.ljust(size or len(content), chr(pad))[:size] + + +def right_strip_string(content: str, pad: int = PAD_CHAR) -> str: + return content.rstrip(chr(pad)) + + +def build_lfn_short_entry_name(name: str, extension: str, order: int) -> str: + return '{}{}'.format(pad_string(content=name[:MAX_NAME_SIZE - 2] + '~' + chr(order), size=MAX_NAME_SIZE), + pad_string(extension[:MAX_EXT_SIZE], size=MAX_EXT_SIZE)) + + +def lfn_checksum(short_entry_name: str) -> int: + """ + Function defined by FAT specification. Computes checksum out of name in the short file name entry. + """ + checksum_result = 0 + for i in range(MAX_NAME_SIZE + MAX_EXT_SIZE): + # operation is a right rotation on 8 bits (Python equivalent for unsigned char in C) + checksum_result = (0x80 if checksum_result & 1 else 0x00) + (checksum_result >> 1) + ord(short_entry_name[i]) + checksum_result &= 0xff + return checksum_result + + +def convert_to_utf16_and_pad(content: str, + expected_size: int, + pad: bytes = FULL_BYTE) -> bytes: + # we need to get rid of the Byte order mark 0xfeff or 0xfffe, fatfs does not use it + bom_utf16: bytes = b'\xfe\xff' + encoded_content_utf16: bytes = content.encode(LONG_NAMES_ENCODING)[len(bom_utf16):] + return encoded_content_utf16.ljust(2 * expected_size, pad) + + +def split_to_name_and_extension(full_name: str) -> Tuple[str, str]: + name, extension = os.path.splitext(full_name) + return name, extension.replace('.', '') + + +def is_valid_fatfs_name(string: str) -> bool: + return string == string.upper() + + +def split_by_half_byte_12_bit_little_endian(value: int) -> Tuple[int, int, int]: + value_as_bytes: bytes = Int16ul.build(value) + return value_as_bytes[0] & 0x0f, value_as_bytes[0] >> 4, value_as_bytes[1] & 0x0f + + +def merge_by_half_byte_12_bit_little_endian(v1: int, v2: int, v3: int) -> int: + return v1 | v2 << 4 | v3 << 8 + + +def build_byte(first_half: int, second_half: int) -> int: + return (first_half << 4) | second_half + + +def split_content_into_sectors(content: bytes, sector_size: int) -> List[bytes]: + result = [] + clusters_cnt: int = required_clusters_count(cluster_size=sector_size, content=content) + + for i in range(clusters_cnt): + result.append(content[sector_size * i:(i + 1) * sector_size]) + return result + + +def get_args_for_partition_generator(desc: str, wl: bool) -> argparse.Namespace: + parser: argparse.ArgumentParser = argparse.ArgumentParser(description=desc) + parser.add_argument('input_directory', + help='Path to the directory that will be encoded into fatfs image') + parser.add_argument('--output_file', + default='fatfs_image.img', + help='Filename of the generated fatfs image') + parser.add_argument('--partition_size', + default=FATDefaults.SIZE, + help='Size of the partition in bytes.' + + ('' if wl else ' Use `--partition_size detect` for detecting the minimal partition size.') + ) + parser.add_argument('--sector_size', + default=FATDefaults.SECTOR_SIZE, + type=int, + choices=ALLOWED_WL_SECTOR_SIZES if wl else ALLOWED_SECTOR_SIZES, + help='Size of the partition in bytes') + parser.add_argument('--sectors_per_cluster', + default=1, + type=int, + choices=ALLOWED_SECTORS_PER_CLUSTER, + help='Number of sectors per cluster') + parser.add_argument('--root_entry_count', + default=FATDefaults.ROOT_ENTRIES_COUNT, + help='Number of entries in the root directory') + parser.add_argument('--long_name_support', + action='store_true', + help='Set flag to enable long names support.') + parser.add_argument('--use_default_datetime', + action='store_true', + help='For test purposes. If the flag is set the files are created with ' + 'the default timestamp that is the 1st of January 1980') + parser.add_argument('--fat_type', + default=0, + type=int, + choices=[FAT12, FAT16, 0], + help=""" + Type of fat. Select 12 for fat12, 16 for fat16. Don't set, or set to 0 for automatic + calculation using cluster size and partition size. + """) + + args = parser.parse_args() + if args.fat_type == 0: + args.fat_type = None + if args.partition_size == 'detect' and not wl: + args.partition_size = -1 + args.partition_size = int(str(args.partition_size), 0) + if not os.path.isdir(args.input_directory): + raise NotADirectoryError(f'The target directory `{args.input_directory}` does not exist!') + return args + + +def read_filesystem(path: str) -> bytearray: + with open(path, 'rb') as fs_file: + return bytearray(fs_file.read()) + + +DATE_ENTRY = BitStruct( + 'year' / BitsInteger(7), + 'month' / BitsInteger(4), + 'day' / BitsInteger(5)) + +TIME_ENTRY = BitStruct( + 'hour' / BitsInteger(5), + 'minute' / BitsInteger(6), + 'second' / BitsInteger(5), +) + + +def build_name(name: str, extension: str) -> str: + return f'{name}.{extension}' if len(extension) > 0 else name + + +def build_date_entry(year: int, mon: int, mday: int) -> int: + """ + :param year: denotes year starting from 1980 (0 ~ 1980, 1 ~ 1981, etc), valid values are 1980 + 0..127 inclusive + thus theoretically 1980 - 2107 + :param mon: denotes number of month of year in common order (1 ~ January, 2 ~ February, etc.), + valid values: 1..12 inclusive + :param mday: denotes number of day in month, valid values are 1..31 inclusive + + :returns: 16 bit integer number (7 bits for year, 4 bits for month and 5 bits for day of the month) + """ + assert year in range(FATFS_INCEPTION_YEAR, FATFS_INCEPTION_YEAR + FATFS_MAX_YEARS) + assert mon in range(1, FATFS_MAX_MONTHS + 1) + assert mday in range(1, FATFS_MAX_DAYS + 1) + return int.from_bytes(DATE_ENTRY.build(dict(year=year - FATFS_INCEPTION_YEAR, month=mon, day=mday)), 'big') + + +def build_time_entry(hour: int, minute: int, sec: int) -> int: + """ + :param hour: denotes number of hour, valid values are 0..23 inclusive + :param minute: denotes minutes, valid range 0..59 inclusive + :param sec: denotes seconds with granularity 2 seconds (e.g. 1 ~ 2, 29 ~ 58), valid range 0..29 inclusive + + :returns: 16 bit integer number (5 bits for hour, 6 bits for minute and 5 bits for second) + """ + assert hour in range(FATFS_MAX_HOURS) + assert minute in range(FATFS_MAX_MINUTES) + assert sec in range(FATFS_MAX_SECONDS) + return int.from_bytes(TIME_ENTRY.build( + dict(hour=hour, minute=minute, second=sec // FATFS_SECONDS_GRANULARITY)), + byteorder='big' + ) + + +class FATDefaults: + # FATFS defaults + SIZE: int = 1024 * 1024 + RESERVED_SECTORS_COUNT: int = 1 + FAT_TABLES_COUNT: int = 1 + SECTORS_PER_CLUSTER: int = 1 + SECTOR_SIZE: int = 0x1000 + HIDDEN_SECTORS: int = 0 + ENTRY_SIZE: int = 32 + NUM_HEADS: int = 0xff + OEM_NAME: str = 'MSDOS5.0' + SEC_PER_TRACK: int = 0x3f + VOLUME_LABEL: str = 'Espressif' + FILE_SYS_TYPE: str = 'FAT' + ROOT_ENTRIES_COUNT: int = 512 # number of entries in the root directory, recommended 512 + MEDIA_TYPE: int = 0xf8 + SIGNATURE_WORD: bytes = b'\x55\xAA' + + # wear levelling defaults + VERSION: int = 2 + TEMP_BUFFER_SIZE: int = 32 + UPDATE_RATE: int = 16 + WR_SIZE: int = 16 + # wear leveling metadata (config sector) contains always sector size 4096 + WL_SECTOR_SIZE: int = 4096 diff --git a/third_party/fatfsgen/fatfsgen.py b/third_party/fatfsgen/fatfsgen.py new file mode 100755 index 0000000000000000000000000000000000000000..199916ef38e89b9e1021e23fddf24866c3fa9e36 --- /dev/null +++ b/third_party/fatfsgen/fatfsgen.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +import os +from datetime import datetime +from typing import Any, List, Optional + +from fatfs_utils.boot_sector import BootSector +from fatfs_utils.exceptions import NoFreeClusterException +from fatfs_utils.fat import FAT +from fatfs_utils.fatfs_state import FATFSState +from fatfs_utils.fs_object import Directory +from fatfs_utils.long_filename_utils import get_required_lfn_entries_count +from fatfs_utils.utils import (BYTES_PER_DIRECTORY_ENTRY, FATFS_INCEPTION, FATFS_MIN_ALLOC_UNIT, + RESERVED_CLUSTERS_COUNT, FATDefaults, get_args_for_partition_generator, + get_fat_sectors_count, get_non_data_sectors_cnt, read_filesystem, + required_clusters_count) + + +class FATFS: + """ + The class FATFS provides API for generating FAT file system. + It contains reference to the FAT table and to the root directory. + """ + + def __init__(self, + binary_image_path: Optional[str] = None, + size: int = FATDefaults.SIZE, + reserved_sectors_cnt: int = FATDefaults.RESERVED_SECTORS_COUNT, + fat_tables_cnt: int = FATDefaults.FAT_TABLES_COUNT, + sectors_per_cluster: int = FATDefaults.SECTORS_PER_CLUSTER, + sector_size: int = FATDefaults.SECTOR_SIZE, + hidden_sectors: int = FATDefaults.HIDDEN_SECTORS, + long_names_enabled: bool = False, + use_default_datetime: bool = True, + num_heads: int = FATDefaults.NUM_HEADS, + oem_name: str = FATDefaults.OEM_NAME, + sec_per_track: int = FATDefaults.SEC_PER_TRACK, + volume_label: str = FATDefaults.VOLUME_LABEL, + file_sys_type: str = FATDefaults.FILE_SYS_TYPE, + root_entry_count: int = FATDefaults.ROOT_ENTRIES_COUNT, + explicit_fat_type: int = None, + media_type: int = FATDefaults.MEDIA_TYPE) -> None: + # root directory bytes should be aligned by sector size + assert (root_entry_count * BYTES_PER_DIRECTORY_ENTRY) % sector_size == 0 + # number of bytes in the root dir must be even multiple of BPB_BytsPerSec + assert ((root_entry_count * BYTES_PER_DIRECTORY_ENTRY) // sector_size) % 2 == 0 + + root_dir_sectors_cnt: int = (root_entry_count * BYTES_PER_DIRECTORY_ENTRY) // sector_size + + self.state: FATFSState = FATFSState(sector_size=sector_size, + explicit_fat_type=explicit_fat_type, + reserved_sectors_cnt=reserved_sectors_cnt, + root_dir_sectors_cnt=root_dir_sectors_cnt, + size=size, + file_sys_type=file_sys_type, + num_heads=num_heads, + fat_tables_cnt=fat_tables_cnt, + sectors_per_cluster=sectors_per_cluster, + media_type=media_type, + hidden_sectors=hidden_sectors, + sec_per_track=sec_per_track, + long_names_enabled=long_names_enabled, + volume_label=volume_label, + oem_name=oem_name, + use_default_datetime=use_default_datetime) + binary_image: bytes = bytearray( + read_filesystem(binary_image_path) if binary_image_path else self.create_empty_fatfs()) + self.state.binary_image = binary_image + + self.fat: FAT = FAT(boot_sector_state=self.state.boot_sector_state, init_=True) + + root_dir_size = self.state.boot_sector_state.root_dir_sectors_cnt * self.state.boot_sector_state.sector_size + self.root_directory: Directory = Directory(name='A', # the name is not important, must be string + size=root_dir_size, + fat=self.fat, + cluster=self.fat.clusters[1], + fatfs_state=self.state) + self.root_directory.init_directory() + + def create_file(self, name: str, + extension: str = '', + path_from_root: Optional[List[str]] = None, + object_timestamp_: datetime = FATFS_INCEPTION, + is_empty: bool = False) -> None: + """ + This method allocates necessary clusters and creates a new file record in the directory required. + The directory must exists. + + When path_from_root is None the dir is root. + + :param name: The name of the file. + :param extension: The extension of the file. + :param path_from_root: List of strings containing names of the ancestor directories in the given order. + :param object_timestamp_: is not None, this will be propagated to the file's entry + :param is_empty: True if there is no need to allocate any cluster, otherwise False + """ + self.root_directory.new_file(name=name, + extension=extension, + path_from_root=path_from_root, + object_timestamp_=object_timestamp_, + is_empty=is_empty) + + def create_directory(self, name: str, + path_from_root: Optional[List[str]] = None, + object_timestamp_: datetime = FATFS_INCEPTION) -> None: + """ + Initially recursively finds a parent of the new directory + and then create a new directory inside the parent. + + When path_from_root is None the parent dir is root. + + :param name: The full name of the directory (excluding its path) + :param path_from_root: List of strings containing names of the ancestor directories in the given order. + :param object_timestamp_: in case the user preserves the timestamps, this will be propagated to the + metadata of the directory (to the corresponding entry) + :returns: None + """ + parent_dir = self.root_directory + if path_from_root: + parent_dir = self.root_directory.recursive_search(path_from_root, self.root_directory) + + self.root_directory.new_directory(name=name, + parent=parent_dir, + path_from_root=path_from_root, + object_timestamp_=object_timestamp_) + + def write_content(self, path_from_root: List[str], content: bytes) -> None: + """ + fat fs invokes root directory to recursively find the required file and writes the content + """ + self.root_directory.write_to_file(path_from_root, content) + + def create_empty_fatfs(self) -> Any: + boot_sector_ = BootSector(boot_sector_state=self.state.boot_sector_state) + boot_sector_.generate_boot_sector() + return boot_sector_.binary_image + + def write_filesystem(self, output_path: str) -> None: + with open(output_path, 'wb') as output: + output.write(bytearray(self.state.binary_image)) + + def _generate_partition_from_folder(self, + folder_relative_path: str, + folder_path: str = '', + is_dir: bool = False) -> None: + """ + Given path to folder and folder name recursively encodes folder into binary image. + Used by method generate. + """ + real_path: str = os.path.join(folder_path, folder_relative_path) + lower_path: str = folder_relative_path + + folder_relative_path = folder_relative_path.upper() + + normal_path = os.path.normpath(folder_relative_path) + split_path = normal_path.split(os.sep) + object_timestamp = datetime.fromtimestamp(os.path.getctime(real_path)) + + if os.path.isfile(real_path): + with open(real_path, 'rb') as file: + content = file.read() + file_name, extension = os.path.splitext(split_path[-1]) + extension = extension[1:] # remove the dot from the extension + self.create_file(name=file_name, + extension=extension, + path_from_root=split_path[1:-1] or None, + object_timestamp_=object_timestamp, + is_empty=len(content) == 0) + self.write_content(split_path[1:], content) + elif os.path.isdir(real_path): + if not is_dir: + self.create_directory(name=split_path[-1], + path_from_root=split_path[1:-1], + object_timestamp_=object_timestamp) + + # sorting files for better testability + dir_content = list(sorted(os.listdir(real_path))) + for path in dir_content: + self._generate_partition_from_folder(os.path.join(lower_path, path), folder_path=folder_path) + + def generate(self, input_directory: str) -> None: + """ + Normalize path to folder and recursively encode folder to binary image + """ + path_to_folder, folder_name = os.path.split(input_directory) + self._generate_partition_from_folder(folder_name, folder_path=path_to_folder, is_dir=True) + + +def calculate_min_space(path: List[str], + fs_entity: str, + sector_size: int = 0x1000, + long_file_names: bool = False, + is_root: bool = False) -> int: + if os.path.isfile(os.path.join(*path, fs_entity)): + with open(os.path.join(*path, fs_entity), 'rb') as file_: + content = file_.read() + res: int = required_clusters_count(sector_size, content) + return res + buff: int = 0 + dir_size = 2 * FATDefaults.ENTRY_SIZE # record for symlinks "." and ".." + for file in sorted(os.listdir(os.path.join(*path, fs_entity))): + if long_file_names and True: + # LFN entries + one short entry + dir_size += (get_required_lfn_entries_count(fs_entity) + 1) * FATDefaults.ENTRY_SIZE + else: + dir_size += FATDefaults.ENTRY_SIZE + buff += calculate_min_space(path + [fs_entity], file, sector_size, long_file_names, is_root=False) + if is_root and dir_size // FATDefaults.ENTRY_SIZE > FATDefaults.ROOT_ENTRIES_COUNT: + raise NoFreeClusterException('Not enough space in root!') + + # roundup sectors, at least one is required + buff += (dir_size + sector_size - 1) // sector_size + return buff + + +def main() -> None: + args = get_args_for_partition_generator('Create a FAT filesystem and populate it with directory content', wl=False) + + if args.partition_size == -1: + clusters = calculate_min_space([], args.input_directory, args.sector_size, long_file_names=True, is_root=True) + fats = get_fat_sectors_count(clusters, args.sector_size) + root_dir_sectors = (FATDefaults.ROOT_ENTRIES_COUNT * FATDefaults.ENTRY_SIZE) // args.sector_size + args.partition_size = max(FATFS_MIN_ALLOC_UNIT * args.sector_size, + (clusters + fats + get_non_data_sectors_cnt(RESERVED_CLUSTERS_COUNT, + fats, + root_dir_sectors) + ) * args.sector_size + ) + + fatfs = FATFS(sector_size=args.sector_size, + sectors_per_cluster=args.sectors_per_cluster, + size=args.partition_size, + root_entry_count=args.root_entry_count, + explicit_fat_type=args.fat_type, + long_names_enabled=args.long_name_support, + use_default_datetime=args.use_default_datetime) + + fatfs.generate(args.input_directory) + fatfs.write_filesystem(args.output_file) + + +if __name__ == '__main__': + main() diff --git a/third_party/fatfsgen/project_include.cmake b/third_party/fatfsgen/project_include.cmake new file mode 100644 index 0000000000000000000000000000000000000000..d945c0e96eddd519a45794bc27e1e70296897625 --- /dev/null +++ b/third_party/fatfsgen/project_include.cmake @@ -0,0 +1,119 @@ +# fatfs_create_partition_image +# +# Create a fatfs image of the specified directory on the host during build and optionally +# have the created image flashed using `idf.py flash` +function(fatfs_create_partition_image partition base_dir) + set(options FLASH_IN_PROJECT WL_INIT PRESERVE_TIME) + cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}") + + + idf_build_get_property(idf_path IDF_PATH) + idf_build_get_property(python PYTHON) + + if(arg_WL_INIT) + set(fatfsgen_py ${python} ${idf_path}/components/fatfs/wl_fatfsgen.py) + else() + set(fatfsgen_py ${python} ${idf_path}/components/fatfs/fatfsgen.py) + endif() + + if(arg_PRESERVE_TIME) + set(default_datetime_option) + else() + set(default_datetime_option --use_default_datetime) + endif() + + if("${CONFIG_FATFS_SECTOR_512}") + set(fatfs_sector_size 512) + elseif("${CONFIG_FATFS_SECTOR_1024}") + set(fatfs_sector_size 1024) + elseif("${CONFIG_FATFS_SECTOR_2048}") + set(fatfs_sector_size 2048) + else() + set(fatfs_sector_size 4096) + endif() + + if("${CONFIG_FATFS_LFN_NONE}") + set(fatfs_long_names_option) + elseif("${CONFIG_FATFS_LFN_HEAP}") + set(fatfs_long_names_option --long_name_support) + elseif("${CONFIG_FATFS_LFN_STACK}") + set(fatfs_long_names_option --long_name_support) + endif() + + get_filename_component(base_dir_full_path ${base_dir} ABSOLUTE) + partition_table_get_partition_info(size "--partition-name ${partition}" "size") + partition_table_get_partition_info(offset "--partition-name ${partition}" "offset") + + if("${size}" AND "${offset}") + set(image_file ${CMAKE_BINARY_DIR}/${partition}.bin) + # Execute FATFS image generation; this always executes as there is no way to specify for CMake to watch for + # contents of the base dir changing. + add_custom_target(fatfs_${partition}_bin ALL + COMMAND ${fatfsgen_py} ${base_dir_full_path} + ${fatfs_long_names_option} + ${default_datetime_option} + --partition_size ${size} + --output_file ${image_file} + --sector_size "${fatfs_sector_size}" + ) + set_property(DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}" APPEND PROPERTY + ADDITIONAL_CLEAN_FILES + ${image_file}) + + idf_component_get_property(main_args esptool_py FLASH_ARGS) + idf_component_get_property(sub_args esptool_py FLASH_SUB_ARGS) + # Last (optional) parameter is the encryption for the target. In our + # case, fatfs is not encrypt so pass FALSE to the function. + esptool_py_flash_target(${partition}-flash "${main_args}" "${sub_args}" ALWAYS_PLAINTEXT) + esptool_py_flash_to_partition(${partition}-flash "${partition}" "${image_file}") + + add_dependencies(${partition}-flash fatfs_${partition}_bin) + if(arg_FLASH_IN_PROJECT) + esptool_py_flash_to_partition(flash "${partition}" "${image_file}") + add_dependencies(flash fatfs_${partition}_bin) + endif() + else() + set(message "Failed to create FATFS image for partition '${partition}'. " + "Check project configuration if using the correct partition table file.") + fail_at_build_time(fatfs_${partition}_bin "${message}") + endif() +endfunction() + + +function(fatfs_create_rawflash_image partition base_dir) + set(options FLASH_IN_PROJECT PRESERVE_TIME) + cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}") + + if(arg_FLASH_IN_PROJECT) + if(arg_PRESERVE_TIME) + fatfs_create_partition_image(${partition} ${base_dir} FLASH_IN_PROJECT PRESERVE_TIME) + else() + fatfs_create_partition_image(${partition} ${base_dir} FLASH_IN_PROJECT) + endif() + else() + if(arg_PRESERVE_TIME) + fatfs_create_partition_image(${partition} ${base_dir} PRESERVE_TIME) + else() + fatfs_create_partition_image(${partition} ${base_dir}) + endif() + endif() +endfunction() + +function(fatfs_create_spiflash_image partition base_dir) + set(options FLASH_IN_PROJECT PRESERVE_TIME) + cmake_parse_arguments(arg "${options}" "" "${multi}" "${ARGN}") + + if(arg_FLASH_IN_PROJECT) + if(arg_PRESERVE_TIME) + fatfs_create_partition_image(${partition} ${base_dir} FLASH_IN_PROJECT WL_INIT PRESERVE_TIME) + else() + fatfs_create_partition_image(${partition} ${base_dir} FLASH_IN_PROJECT WL_INIT) + endif() + else() + if(arg_PRESERVE_TIME) + fatfs_create_partition_image(${partition} ${base_dir} WL_INIT PRESERVE_TIME) + else() + fatfs_create_partition_image(${partition} ${base_dir} WL_INIT) + endif() + endif() +endfunction() diff --git a/third_party/fatfsgen/wl_fatfsgen.py b/third_party/fatfsgen/wl_fatfsgen.py new file mode 100755 index 0000000000000000000000000000000000000000..4a685434a0752413da22256e701dd68c6ecbd1fc --- /dev/null +++ b/third_party/fatfsgen/wl_fatfsgen.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python +# SPDX-FileCopyrightText: 2021-2022 Espressif Systems (Shanghai) CO LTD +# SPDX-License-Identifier: Apache-2.0 + +from construct import Const, Int32ul, Struct +from fatfs_utils.exceptions import WLNotInitialized +from fatfs_utils.utils import (FULL_BYTE, UINT32_MAX, FATDefaults, crc32, generate_4bytes_random, + get_args_for_partition_generator) +from fatfsgen import FATFS + + +def remove_wl(binary_image: bytes) -> bytes: + partition_size: int = len(binary_image) + total_sectors: int = partition_size // FATDefaults.WL_SECTOR_SIZE + wl_state_size: int = WLFATFS.WL_STATE_HEADER_SIZE + WLFATFS.WL_STATE_RECORD_SIZE * total_sectors + wl_state_sectors_cnt: int = (wl_state_size + FATDefaults.WL_SECTOR_SIZE - 1) // FATDefaults.WL_SECTOR_SIZE + wl_state_total_size: int = wl_state_sectors_cnt * FATDefaults.WL_SECTOR_SIZE + wl_sectors_size: int = (wl_state_sectors_cnt + * FATDefaults.WL_SECTOR_SIZE + * WLFATFS.WL_STATE_COPY_COUNT + + FATDefaults.WL_SECTOR_SIZE) + + correct_wl_configuration = binary_image[-wl_sectors_size:] + + data_ = WLFATFS.WL_STATE_T_DATA.parse(correct_wl_configuration[:WLFATFS.WL_STATE_HEADER_SIZE]) + + total_records = 0 + # iterating over records field of the first copy of the state sector + for i in range(WLFATFS.WL_STATE_HEADER_SIZE, wl_state_total_size, WLFATFS.WL_STATE_RECORD_SIZE): + if correct_wl_configuration[i:i + WLFATFS.WL_STATE_RECORD_SIZE] != WLFATFS.WL_STATE_RECORD_SIZE * b'\xff': + total_records += 1 + else: + break + before_dummy = binary_image[:total_records * FATDefaults.WL_SECTOR_SIZE] + after_dummy = binary_image[total_records * FATDefaults.WL_SECTOR_SIZE + FATDefaults.WL_SECTOR_SIZE:] + new_image: bytes = before_dummy + after_dummy + + # remove wl sectors + new_image = new_image[:len(new_image) - (FATDefaults.WL_SECTOR_SIZE + 2 * wl_state_total_size)] + + # reorder to preserve original order + new_image = (new_image[-data_['move_count'] * FATDefaults.WL_SECTOR_SIZE:] + + new_image[:-data_['move_count'] * FATDefaults.WL_SECTOR_SIZE]) + return new_image + + +class WLFATFS: + # pylint: disable=too-many-instance-attributes + WL_CFG_SECTORS_COUNT = 1 + WL_DUMMY_SECTORS_COUNT = 1 + WL_CONFIG_HEADER_SIZE = 48 + WL_STATE_RECORD_SIZE = 16 + WL_STATE_HEADER_SIZE = 64 + WL_STATE_COPY_COUNT = 2 # always 2 copies for power failure safety + WL_SECTOR_SIZE = 0x1000 + + WL_STATE_T_DATA = Struct( + 'pos' / Int32ul, + 'max_pos' / Int32ul, + 'move_count' / Int32ul, + 'access_count' / Int32ul, + 'max_count' / Int32ul, + 'block_size' / Int32ul, + 'version' / Int32ul, + 'device_id' / Int32ul, + 'reserved' / Const(28 * b'\x00') + ) + + WL_CONFIG_T_DATA = Struct( + 'start_addr' / Int32ul, + 'full_mem_size' / Int32ul, + 'page_size' / Int32ul, + 'sector_size' / Int32ul, # always 4096 for the types of NOR flash supported by ESP-IDF! + 'updaterate' / Int32ul, + 'wr_size' / Int32ul, + 'version' / Int32ul, + 'temp_buff_size' / Int32ul + ) + WL_CONFIG_T_HEADER_SIZE = 48 + + def __init__(self, + size: int = FATDefaults.SIZE, + sector_size: int = FATDefaults.SECTOR_SIZE, + reserved_sectors_cnt: int = FATDefaults.RESERVED_SECTORS_COUNT, + fat_tables_cnt: int = FATDefaults.FAT_TABLES_COUNT, + sectors_per_cluster: int = FATDefaults.SECTORS_PER_CLUSTER, + explicit_fat_type: int = None, + hidden_sectors: int = FATDefaults.HIDDEN_SECTORS, + long_names_enabled: bool = False, + num_heads: int = FATDefaults.NUM_HEADS, + oem_name: str = FATDefaults.OEM_NAME, + sec_per_track: int = FATDefaults.SEC_PER_TRACK, + volume_label: str = FATDefaults.VOLUME_LABEL, + file_sys_type: str = FATDefaults.FILE_SYS_TYPE, + use_default_datetime: bool = True, + version: int = FATDefaults.VERSION, + temp_buff_size: int = FATDefaults.TEMP_BUFFER_SIZE, + device_id: int = None, + root_entry_count: int = FATDefaults.ROOT_ENTRIES_COUNT, + media_type: int = FATDefaults.MEDIA_TYPE) -> None: + self._initialized = False + self._version = version + self._temp_buff_size = temp_buff_size + self._device_id = device_id + self.partition_size = size + self.total_sectors = self.partition_size // FATDefaults.WL_SECTOR_SIZE + self.wl_state_size = WLFATFS.WL_STATE_HEADER_SIZE + WLFATFS.WL_STATE_RECORD_SIZE * self.total_sectors + + # determine the number of required sectors (roundup to sector size) + self.wl_state_sectors = (self.wl_state_size + FATDefaults.WL_SECTOR_SIZE - 1) // FATDefaults.WL_SECTOR_SIZE + + self.boot_sector_start = FATDefaults.WL_SECTOR_SIZE # shift by one "dummy" sector + self.fat_table_start = self.boot_sector_start + reserved_sectors_cnt * FATDefaults.WL_SECTOR_SIZE + + wl_sectors = (WLFATFS.WL_DUMMY_SECTORS_COUNT + WLFATFS.WL_CFG_SECTORS_COUNT + + self.wl_state_sectors * WLFATFS.WL_STATE_COPY_COUNT) + self.plain_fat_sectors = self.total_sectors - wl_sectors + self.plain_fatfs = FATFS( + explicit_fat_type=explicit_fat_type, + size=self.plain_fat_sectors * FATDefaults.WL_SECTOR_SIZE, + reserved_sectors_cnt=reserved_sectors_cnt, + fat_tables_cnt=fat_tables_cnt, + sectors_per_cluster=sectors_per_cluster, + sector_size=sector_size, + root_entry_count=root_entry_count, + hidden_sectors=hidden_sectors, + long_names_enabled=long_names_enabled, + num_heads=num_heads, + use_default_datetime=use_default_datetime, + oem_name=oem_name, + sec_per_track=sec_per_track, + volume_label=volume_label, + file_sys_type=file_sys_type, + media_type=media_type + ) + + self.fatfs_binary_image = self.plain_fatfs.state.binary_image + + def init_wl(self) -> None: + self.fatfs_binary_image = self.plain_fatfs.state.binary_image + self._add_dummy_sector() + # config must be added after state, do not change the order of these two calls! + self._add_state_sectors() + self._add_config_sector() + self._initialized = True + + def _add_dummy_sector(self) -> None: + self.fatfs_binary_image = FATDefaults.WL_SECTOR_SIZE * FULL_BYTE + self.fatfs_binary_image + + def _add_config_sector(self) -> None: + wl_config_data = WLFATFS.WL_CONFIG_T_DATA.build( + dict( + start_addr=0, + full_mem_size=self.partition_size, + page_size=FATDefaults.WL_SECTOR_SIZE, # equal to sector size (always 4096) + sector_size=FATDefaults.WL_SECTOR_SIZE, + updaterate=FATDefaults.UPDATE_RATE, + wr_size=FATDefaults.WR_SIZE, + version=self._version, + temp_buff_size=self._temp_buff_size + ) + ) + + crc = crc32(list(wl_config_data), UINT32_MAX) + wl_config_crc = Int32ul.build(crc) + + # adding three 4 byte zeros to align the structure + wl_config = wl_config_data + wl_config_crc + Int32ul.build(0) + Int32ul.build(0) + Int32ul.build(0) + + self.fatfs_binary_image += ( + wl_config + (FATDefaults.WL_SECTOR_SIZE - WLFATFS.WL_CONFIG_HEADER_SIZE) * FULL_BYTE) + + def _add_state_sectors(self) -> None: + wl_state_data = WLFATFS.WL_STATE_T_DATA.build( + dict( + pos=0, + max_pos=self.plain_fat_sectors + WLFATFS.WL_DUMMY_SECTORS_COUNT, + move_count=0, + access_count=0, + max_count=FATDefaults.UPDATE_RATE, + block_size=FATDefaults.WL_SECTOR_SIZE, # equal to page size, thus equal to wl sector size (4096) + version=self._version, + device_id=self._device_id or generate_4bytes_random(), + ) + ) + crc = crc32(list(wl_state_data), UINT32_MAX) + wl_state_crc = Int32ul.build(crc) + wl_state = wl_state_data + wl_state_crc + wl_state_sector_padding: bytes = (FATDefaults.WL_SECTOR_SIZE - WLFATFS.WL_STATE_HEADER_SIZE) * FULL_BYTE + wl_state_sector: bytes = ( + wl_state + wl_state_sector_padding + (self.wl_state_sectors - 1) * FATDefaults.WL_SECTOR_SIZE * FULL_BYTE + ) + self.fatfs_binary_image += (WLFATFS.WL_STATE_COPY_COUNT * wl_state_sector) + + def wl_write_filesystem(self, output_path: str) -> None: + if not self._initialized: + raise WLNotInitialized('FATFS is not initialized with WL. First call method WLFATFS.init_wl!') + with open(output_path, 'wb') as output: + output.write(bytearray(self.fatfs_binary_image)) + + +if __name__ == '__main__': + desc = 'Create a FAT filesystem with support for wear levelling and populate it with directory content' + args = get_args_for_partition_generator(desc, wl=True) + wl_fatfs = WLFATFS(sectors_per_cluster=args.sectors_per_cluster, + size=args.partition_size, + sector_size=args.sector_size, + root_entry_count=args.root_entry_count, + explicit_fat_type=args.fat_type, + long_names_enabled=args.long_name_support, + use_default_datetime=args.use_default_datetime) + + wl_fatfs.plain_fatfs.generate(args.input_directory) + wl_fatfs.init_wl() + wl_fatfs.wl_write_filesystem(args.output_file)