Skip to content
Draft
33 changes: 31 additions & 2 deletions sigmf/convert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
"""Convert non-SigMF recordings to SigMF format"""

from pathlib import Path
from typing import Optional

from ..error import SigMFConversionError


def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes:
def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0, magic_bytes: Optional[bytes] = None) -> bytes:
"""
Get magic bytes from a file to help identify file type.

Expand All @@ -23,6 +24,8 @@ def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes:
Number of bytes to read. Default is 4.
offset : int, optional
Byte offset to start reading from. Default is 0.
magic_bytes : str, optional
If provided, search the entire file for this byte sequence.

Returns
-------
Expand All @@ -36,11 +39,25 @@ def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes:
"""
try:
with open(file_path, "rb") as handle:
# If magic_bytes is provided, search anywhere in the file
if magic_bytes is not None:
data = handle.read()
idx = data.find(magic_bytes)
if idx != -1:
return magic_bytes
raise SigMFConversionError(
f"Magic bytes {magic_bytes} not found anywhere in {file_path}"
)
Comment on lines 41 to +50

# Otherwise: read bytes at the given offset
handle.seek(offset)
magic_bytes = handle.read(count)
if len(magic_bytes) < count:
raise SigMFConversionError(f"File {file_path} too small to read {count} magic bytes at offset {offset}")
raise SigMFConversionError(
f"File {file_path} too small to read {count} bytes at offset {offset}"
)
return magic_bytes

except (IOError, OSError) as err:
raise SigMFConversionError(f"Cannot read magic bytes from {file_path}: {err}") from err

Expand Down Expand Up @@ -84,6 +101,18 @@ def detect_converter(file_path: Path):
f"Expected SignalHoundIQFile for Signal Hound Spike files."
)

elif file_path.suffix in [".tar"]:
# iq.tar file extensions are used by Rohde & Schwarz for their IQ data, but the .tar extension is also used by other formats.
# So parse the tar file to determine if it is a Rohde & Schwarz file or not.
rohde_schwarz_expanded_magic_bytes = get_magic_bytes(file_path, count=20, offset=0,magic_bytes=b"RS_IQ_TAR_FileFormat") # <RS_IQ_TAR_FileFormat>
if rohde_schwarz_expanded_magic_bytes == b"RS_IQ_TAR_FileFormat":
return "rohdeschwarz"
else:
raise SigMFConversionError(
f"Unsupported XML file format. Root element: {rohde_schwarz_expanded_magic_bytes}. "
f"Expected RS_IQ_TAR_FileFormat for IQ.TAR files."
)

Comment on lines +104 to +115
else:
raise SigMFConversionError(
f"Unsupported file format. Magic bytes: {magic_bytes}. "
Expand Down
16 changes: 10 additions & 6 deletions sigmf/convert/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@
from .. import __version__ as toolversion
from ..error import SigMFConversionError
from . import detect_converter
from .wav import wav_to_sigmf
from .blue import blue_to_sigmf
from .signalhound import signalhound_to_sigmf
from .wav import wav_to_sigmf

from .rohdeschwarz import rohdeschwarz_to_sigmf
Comment on lines 16 to +20

def main() -> None:
"""
Unified entry-point for SigMF conversion of non-SigMF recordings.

This command-line interface converts various non-SigMF file formats into SigMF-compliant datasets.
It currently supports WAV and BLUE/Platinum file formats.
It currently supports WAV and BLUE/Platinum, Signal Hound Spike and Rohde and Schwarz IQ.TAR file formats.
The converter detects the file type based on magic bytes and invokes the appropriate conversion function.

By default it will output a SigMF pair (.sigmf-meta and .sigmf-data).
Expand Down Expand Up @@ -94,10 +94,14 @@ def main() -> None:
elif converter_type == "blue":
_ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
elif converter_type == "signalhound":
_ = signalhound_to_sigmf(
signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd
)
_ = signalhound_to_sigmf(signalhound_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
elif converter_type == "rohdeschwarz":
_ = rohdeschwarz_to_sigmf(rohdeschwarz_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)

else:
raise SigMFConversionError(
f"Supported formats for conversion are WAV, BLUE/Platinum, Signal Hound Spike and Rohde and Schwarz IQ.TAR."
)
Comment on lines +101 to +104

if __name__ == "__main__":
main()
Loading
Loading