FMTlab 2026-02-13¶
For my results¶
see the very bottom of this record.
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s"
)
from grab_with_python import GrabSound, _LOGGER
_LOGGER.setLevel(logging.DEBUG)
# Data from monitoring RWM:
TOTAL_DELAY_FMTLAB = 28.5e-3 # 28.5 ms ± 1.2 ms or so
# TRX frequency is too high by about -0.01402 ppm
FREQ_DEVIATION = -0.014 # PPM
FILENAME = "2026-03-06_205119"
DIAL_FREQUENCY = 3563400
RF_FREQUENCY = DIAL_FREQUENCY * (1 + FREQ_DEVIATION * 1e-6)
RF_FREQUENCY
3563399.9501123996
import numpy
import math
from datetime import datetime, timedelta, UTC
import multiprocessing
import scipy
import functools
from dataclasses import dataclass
import pandas
import matplotlib.pyplot as plt
grab = GrabSound(FILENAME)
rx_qrg = RF_FREQUENCY
print(f"RX oscillator assumed to be running at {rx_qrg:.3f} Hz")
2026-03-09 17:55:09,241 DEBUG grab_with_python: Time values read for most samples (78011392 total) at 47999.6763 ≈ 48000 samples per second 2026-03-09 17:55:10,318 DEBUG grab_with_python: Read 78011392 samples, spanning about 27.09 minutes
RX oscillator assumed to be running at 3563399.950 Hz
total_delay = TOTAL_DELAY_FMTLAB
First, some sanity checks¶
I use a somewhat dubious way to time our measurements: Each time a parcel of samples comes in from the sound card, I should be writing the total number of samples received thus far and a time stamp to an auxiliary file.
For optimization, I didn't do that, but only wrote every so often. Still, enough "time points" remain. From these, we can reconstruct the time (as a Python datetime or as a nanosecond since the epoch) for each particular sample.
The basis of everything is a regular PC's clock, nothing fancy. But it is disciplined via NTP.
I do some sanity checks on these times.
First check: Time difference for each time point¶
I do some smoothing in my calculation of the time of an individual sample.
Here, I calculate the difference between the raw data as written at the time of recording and the smoothed time value for the last sample written.
def plot_time_deviation(grab) -> pandas.DataFrame:
deltas = [(grab.time_raw(tp.written_samples) - tp.t_ns)*1e-9 for tp in grab.timed_points[:-1]]
deltas_without_outliers = [d if abs(d) < 0.5e-3 else None for d in deltas]
outlier_count = 0
for d in deltas_without_outliers:
if d is None:
outlier_count += 1
df_delta = pandas.DataFrame(
{
"delta": deltas,
f"delta without {outlier_count} outliers": deltas_without_outliers,
},
index = [tp.written_samples for tp in grab.timed_points[:-1]]
)
df_delta.plot(
subplots=True,
figsize=(12,6),
grid=True,
title="Precision of Sound Card",
# legend=False,
xlabel=f"Samples at {grab.samples_per_second} samples/second.",
ylabel="Difference / s"
# ylabel="How much ahead (+) or delayed (-) of average flow\ndid individual sample packages arrive, in seconds."
)
return df_delta
plot_time_deviation(grab)
| delta | delta without 40 outliers | |
|---|---|---|
| 1024 | -0.000080 | -0.000080 |
| 2048 | 0.000037 | 0.000037 |
| 3072 | -0.000124 | -0.000124 |
| 4096 | -0.000128 | -0.000128 |
| 5120 | -0.000043 | -0.000043 |
| ... | ... | ... |
| 78006272 | 0.000135 | 0.000135 |
| 78007296 | -0.000033 | -0.000033 |
| 78008320 | 0.000145 | 0.000145 |
| 78009344 | 0.000100 | 0.000100 |
| 78010368 | -0.000062 | -0.000062 |
76182 rows × 2 columns
Difference of when recording started¶
Given the sample frequency (of 48000 Hz) and the number of samples written thus far, I can calculate back when recording should have started.
Ideally, this should always be the same point in time.
I am not so much interested in that point in time, but more how it develops over time. For this calculations, I no longer use the raw value, but my smoothed version of it.
This should be a straight line ideally.
def plot_smooth(grab) -> pandas.DataFrame:
df_s_delta = pandas.DataFrame(
{
"smooth delta": [tpe.backprojected_sample_start*1e-9 for tpe in grab.timed_points_extended]
},
index=[tpe.written_samples for tpe in grab.timed_points_extended]
)
df_s_delta.plot(
figsize=(12,6),
grid=True,
title="Precision of Sound Card",
ylabel="Deviation / s",
xlabel=f"Samples at {grab.samples_per_second} samples/second.",
)
return df_s_delta
plot_smooth(grab), len(grab.timed_points)
( smooth delta 52224 -0.021413 53248 -0.021416 54272 -0.021417 55296 -0.021420 56320 -0.021423 ... ... 77956096 -0.010305 77957120 -0.010305 77958144 -0.010308 77959168 -0.010308 77960192 -0.010308 [76083 rows x 1 columns], 76183)
Delta from one smoothing point to the next¶
Here, I look into my internal smoothing data structure and check for changes that are more rapid than I would want. This could have happened if NTP had jumped the clock. But the computer had been running for some time when the recording was made, so NTP didn't do that.
def plot_delta_to_next(grab) -> pandas.DataFrame:
pns = list(zip(grab.timed_points_extended[:-1],grab.timed_points_extended[1:]))
df_delta_to_next = pandas.DataFrame(
{
"from_one_to_next": [
(pn[1].backprojected_sample_start-pn[0].backprojected_sample_start)*1e-9 for pn in pns
]
},
index=[pn[0].written_samples for pn in pns]
)
df_delta_to_next.plot(
figsize=(12,6),
grid=True,
title="From one smoothing point to next, time difference of when the two think recording started.",
ylabel="Difference / s",
xlabel=f"Samples at {grab.samples_per_second} samples/second.",
)
return df_delta_to_next
df_delta_to_next = plot_delta_to_next(grab)
# Hunt the problem
df_delta_to_next[0.0001 < df_delta_to_next["from_one_to_next"]]
| from_one_to_next | |
|---|---|
| 26665984 | 0.000132 |
| 26668032 | 0.000182 |
| 26671104 | 0.000129 |
| 26672128 | 0.000139 |
| 26674176 | 0.000112 |
| 26675200 | 0.000197 |
| 26677248 | 0.000128 |
| 26678272 | 0.000131 |
| 26681344 | 0.000140 |
| 26682368 | 0.000110 |
| 26683392 | 0.000120 |
grab.datetime(26665984), grab.datetime(26683392)
(datetime.datetime(2026, 3, 6, 21, 0, 35, 432267, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 0, 35, 796713, tzinfo=datetime.timezone.utc))
Oh, good. That's harmless.
There has been a problem with finding the signal, so I can use my recording only after 2026-03-06 21:02:58 UTC anyway, more than 2 minutes after the glitch.
From a timestamp to a sample and back¶
Go from a timestamp (in nanoseconds since epoch) to a sample index and back. This should not deviate more than half the time from one sample to the next. The latter is about 0.2 ms.
This doesn't say much about the quality of the measurement, but is more a test that my software is working as intended.
def sanity_ns_to_index_and_back(grab):
start_ns = grab.time_ns(0)
end_ns = grab.time_ns(grab.num_of_samples -1)
start_ns, end_ns, type(start_ns), type(end_ns)
ns_and_back_delta = []
t_nss = []
# Cannot use numpy.linspace, as internally it calculates with floats
# and then, rounding errors cause the initial value to be too low.
delta_per_step = (end_ns - start_ns) // 2999
t_ns = start_ns
while t_ns < end_ns:
indx = grab.sample_index_from_ns(t_ns)
t_ns_back = grab.time_ns(indx)
ns_and_back_delta.append((t_ns_back-t_ns)*1e-9)
t_nss.append(t_ns)
t_ns += delta_per_step
df = pandas.DataFrame({
"ns_and_back_delta": ns_and_back_delta
},
index = t_nss
)
df.plot(
title="Going from ns to index and back to ns, resulting delta.",
grid=True,
figsize=(12,6)
)
return df
print(f"Compare with {1/grab.samples_per_second} s per sample")
sanity_ns_to_index_and_back(grab)
Compare with 2.0833333333333333e-05 s per sample
| ns_and_back_delta | |
|---|---|
| 1772830279886130582 | 0.000000 |
| 1772830280428060697 | 0.000007 |
| 1772830280969990812 | -0.000006 |
| 1772830281511920927 | 0.000006 |
| 1772830282053851042 | -0.000003 |
| ... | ... |
| 1772831902966825007 | 0.000005 |
| 1772831903508755122 | -0.000009 |
| 1772831904050685237 | 0.000009 |
| 1772831904592615352 | -0.000005 |
| 1772831905134545467 | 0.000003 |
3000 rows × 1 columns
From a sample to a timestamp and back¶
This is the reverse: Going from a sample index to a time and back to the sample index.
No leeway here: It should be the very same index we started with.
This again has nothing to do with the quality of the recorded data, but is a software test.
def sanity_index_to_ns_and_back(grab):
indexs = numpy.linspace(0, grab.num_of_samples, 2999, endpoint=False, dtype=numpy.int64)
for indx in indexs:
t_ns = grab.time_ns(indx)
indx_back = grab.sample_index_from_ns(t_ns)
if indx != indx_back:
raise RuntimeError(f"{indx} → {t_ns} → {indx_back}")
for indx in indexs:
timestamp = grab.datetime(indx)
indx_back = grab.sample_index(timestamp)
if indx != indx_back:
raise RuntimeError(f"{indx} → {timestamp} → {indx_back}")
print("All is well in nanoseconds.")
def sanity_index_to_datetime_and_back(grab):
indexs = numpy.linspace(0, grab.num_of_samples, 499, endpoint=False, dtype=numpy.int64)
for indx in indexs:
time = grab.datetime(indx)
indx_back = grab.sample_index(time)
if indx != indx_back:
raise RuntimeError("f{indx} → {time} → {indx_back}")
print("All is well in timestamps")
sanity_index_to_ns_and_back(grab)
sanity_index_to_datetime_and_back(grab)
All is well in nanoseconds. All is well in timestamps
What have we got?¶
Let us have a first look at our recording.
When did we record?¶
start_time = grab.start_time()
end_time = grab.end_time()
signal_start_time = start_time.replace(minute=58, second=0, microsecond=0)
signal_hour_start = signal_start_time.replace(hour=signal_start_time.hour + 1, minute=0, second=0, microsecond=0)
print(f"Recording from {start_time} to {end_time},\n"
f"signal expected to start not before {signal_start_time}\n"
f"or actually at about {signal_hour_start}"
)
Recording from 2026-03-06 20:51:19.886131+00:00 to 2026-03-06 21:18:25.134548+00:00, signal expected to start not before 2026-03-06 20:58:00+00:00 or actually at about 2026-03-06 21:00:00+00:00
What signals do we see?¶
I want a waterfall for this. Mine flows from left to right.
def do_fft(grab, from_t, to_t, window_width=grab.samples_per_second):
samples = grab.samples[grab.sample_index(from_t):grab.sample_index(to_t)]
half_window_width = window_width / 2
def window_value(i:int) -> float:
# straight from https://en.wikipedia.org/wiki/Window_function#Welch_window
return 1.0 - ((i - half_window_width)/half_window_width)**2
# Slightly non-symmetrical: The leftmost value is 0, the rightmost not quite.
window = [window_value(i) for i in range(0, window_width)]
fft_engine = scipy.signal.ShortTimeFFT(numpy.array(window), int(window_width/2), grab.samples_per_second)
ffts = fft_engine.stft(numpy.array(samples))
fft_amplitudes = [
[
abs(ffts[row_index][col_index]) for col_index in range(0,len(ffts[row_index]))
]
for row_index in range(0, len(ffts))
]
return fft_engine, fft_amplitudes, ffts
fft_engine, fft_amplitudes, ffts = do_fft(grab, signal_start_time, end_time)
import heapq
# The graph has more information if vmax is used to remove the few brightest values.
def find_reasonably_large_amplitude(fraction:float, fft_amplitudes):
all_negative_amps = [-fft_amplitudes[f_index][t_index]
for t_index in range(0,len(fft_amplitudes[0]))
for f_index in range(0,len(fft_amplitudes))
]
heapq.heapify(all_negative_amps)
for disregard_count in range(0,int(len(all_negative_amps)*fraction)):
heapq.heappop(all_negative_amps)
return -all_negative_amps[0]
vmax = find_reasonably_large_amplitude(0.01, fft_amplitudes)
vmax
np.float64(499217.489584916)
def plot(fft_amplitudes, fft_engine, from_t,
vmax = vmax,
zoom_t_start = None,
zoom_t_end = None,
zoom_f_start = 0,
zoom_f_end = 3000
):
delta_t_per_x = fft_engine.delta_t
if zoom_t_start is None:
zoom_t_start = from_t
if zoom_t_end is None:
zoom_t_end = from_t + timedelta(seconds=(1+len(fft_amplitudes[0]))*delta_t_per_x)
time_index_and_time = []
for time_index in range(0, len(fft_amplitudes[0])):
time = from_t + timedelta(seconds=time_index*delta_t_per_x)
if zoom_t_start <= time < zoom_t_end:
time_index_and_time.append((time_index, time))
delta_f_per_y = fft_engine.delta_f
freq_index_and_freq = []
for freq_index in range(0, len(fft_amplitudes)):
freq = freq_index * delta_f_per_y
if zoom_f_start <= freq < zoom_f_end:
freq_index_and_freq.append((freq_index, freq))
fft_amplitudes_filtered = [
[
fft_amplitudes[freq_index][time_index]
for time_index, time in time_index_and_time
]
for freq_index, freq in freq_index_and_freq
]
fig, axes = plt.subplots(figsize=(12, 8))
cmap = plt.colormaps["plasma"]
x_values = [(time - signal_hour_start).total_seconds()/60 for time_index, time in time_index_and_time]
y_values = [freq for freq_index, freq in freq_index_and_freq]
axes.pcolormesh(x_values, y_values, fft_amplitudes_filtered,
vmax=vmax, cmap=cmap, norm="linear")
axes.set_xlabel("time into full hour / minutes")
axes.set_ylabel("Frequency / Hz")
plot(fft_amplitudes, fft_engine, signal_start_time)
Apparently, the signal at about 2300 Hz is the one that I want.
I suspect the steady tones of 60 s each exercise my receiver's AGC, so the background noise becomes less. This is how I interpret the six well-defined line segments with a darker background.
Let us look into the first one:
LOWER_AF = 2300
HIGHER_AF = 2325
ZOOM_VMAX = 4e7
# Finding first carrier
plot(fft_amplitudes, fft_engine, signal_start_time, ZOOM_VMAX,
signal_hour_start.replace(minute=4, second=0, microsecond=0),
signal_hour_start.replace(minute=5, second=10, microsecond=0),
LOWER_AF, HIGHER_AF)
print(f"{fft_engine.delta_f:.5f} Hz per bin")
1.00000 Hz per bin
audio_frequencies = []
into_the_carrier_minute_timestamps = []
# Finding first audio frequency
def analyze_for_frequencies(grab: GrabSound, carrier_detect_from, carrier_detect_to, plot=False):
frequencies, densities = scipy.signal.periodogram(
grab.samples[grab.sample_index(carrier_detect_from):grab.sample_index(carrier_detect_to)],
fs=grab.samples_per_second
)
af_measured, density = functools.reduce(lambda b,n: b if n[1] < b[1] else n,
zip(frequencies, densities))
if plot:
fds = [(f,d) for f,d in zip(frequencies, densities) if af_measured - 0.5 < f < af_measured + 0.5]
df = pandas.DataFrame(
{"relative Amplitude": [fd[1] for fd in fds]},
index=[fd[0] for fd in fds]
)
df.plot(grid=True, xlabel="Frequency / Hz", figsize=(12,6))
return frequencies, densities, af_measured
itcm = signal_hour_start.replace(minute=4, second=6)
into_the_carrier_minute_timestamps.append(itcm)
freq, densities, af_measured = \
analyze_for_frequencies(grab,
itcm,
itcm + timedelta(seconds=54), True)
print(f"Audio frequency measured in first minute: {af_measured:.2f}, "
f"TX QRG: {rx_qrg + af_measured:.2f}")
audio_frequencies.append(af_measured)
Audio frequency measured in first minute: 2312.60, TX QRG: 3565712.55
# Finding second carrier
plot(fft_amplitudes, fft_engine, signal_start_time, ZOOM_VMAX,
signal_hour_start.replace(minute=6, second=0, microsecond=0),
signal_hour_start.replace(minute=7, second=30, microsecond=0),
LOWER_AF, HIGHER_AF)
# Finding second audio frequency
into_the_carrier_minute_timestamps.append(signal_hour_start.replace(minute=6, second=21))
freq, densities, af_measured = \
analyze_for_frequencies(grab,
into_the_carrier_minute_timestamps[-1],
into_the_carrier_minute_timestamps[-1] + timedelta(seconds=54),
True)
print(f"Audio frequency measured in minute after {into_the_carrier_minute_timestamps[-1]}: {af_measured:.2f}\n"
f"TX QRG: {rx_qrg + af_measured:.2f}")
audio_frequencies.append(af_measured)
Audio frequency measured in minute after 2026-03-06 21:06:21+00:00: 2312.60 TX QRG: 3565712.55
# Finding third carrier
plot(fft_amplitudes, fft_engine, signal_start_time, ZOOM_VMAX,
signal_hour_start.replace(minute=8, second=30, microsecond=0),
signal_hour_start.replace(minute=9, second=42, microsecond=0),
LOWER_AF, HIGHER_AF)
# Finding third audio frequency
into_the_carrier_minute_timestamps.append(signal_hour_start.replace(minute=8, second=42))
freq, densities, af_measured = \
analyze_for_frequencies(grab,
into_the_carrier_minute_timestamps[-1],
into_the_carrier_minute_timestamps[-1] + timedelta(seconds=54),
True)
print(f"Audio frequency measured in minute after {into_the_carrier_minute_timestamps[-1]}: {af_measured:.2f}\n"
f"TX QRG: {rx_qrg + af_measured:.2f}")
audio_frequencies.append(af_measured)
Audio frequency measured in minute after 2026-03-06 21:08:42+00:00: 2312.60 TX QRG: 3565712.55
# Finding forth carrier
plot(fft_amplitudes, fft_engine, signal_start_time, ZOOM_VMAX,
signal_hour_start.replace(minute=10, second=48, microsecond=0),
signal_hour_start.replace(minute=12, second=0, microsecond=0),
LOWER_AF, HIGHER_AF)
# Finding forth audio frequency
into_the_carrier_minute_timestamps.append(signal_hour_start.replace(minute=10, second=57))
freq, densities, af_measured = \
analyze_for_frequencies(grab,
into_the_carrier_minute_timestamps[-1],
into_the_carrier_minute_timestamps[-1] + timedelta(seconds=54),
True)
print(f"Audio frequency measured in minute after {into_the_carrier_minute_timestamps[-1]}: {af_measured:.2f}\n"
f"TX QRG: {rx_qrg + af_measured:.2f}")
audio_frequencies.append(af_measured)
Audio frequency measured in minute after 2026-03-06 21:10:57+00:00: 2312.53 TX QRG: 3565712.48
# Finding fifth carrier
plot(fft_amplitudes, fft_engine, signal_start_time, ZOOM_VMAX,
signal_hour_start.replace(minute=13, second=12, microsecond=0),
signal_hour_start.replace(minute=14, second=24, microsecond=0),
LOWER_AF, HIGHER_AF)
# Finding fifth audio frequency
into_the_carrier_minute_timestamps.append(signal_hour_start.replace(minute=13, second=15))
freq, densities, af_measured = \
analyze_for_frequencies(grab,
into_the_carrier_minute_timestamps[-1],
into_the_carrier_minute_timestamps[-1] + timedelta(seconds=54),
True)
print(f"Audio frequency measured in minute after {into_the_carrier_minute_timestamps[-1]}: {af_measured:.2f}\n"
f"TX QRG: {rx_qrg + af_measured:.2f}")
audio_frequencies.append(af_measured)
Audio frequency measured in minute after 2026-03-06 21:13:15+00:00: 2312.60 TX QRG: 3565712.55
# Finding sixth carrier
plot(fft_amplitudes, fft_engine, signal_start_time, ZOOM_VMAX,
signal_hour_start.replace(minute=15, second=24, microsecond=0),
signal_hour_start.replace(minute=16, second=42, microsecond=0),
LOWER_AF, HIGHER_AF)
# Finding sixth audio frequency
into_the_carrier_minute_timestamps.append(signal_hour_start.replace(minute=15, second=33))
freq, densities, af_measured = \
analyze_for_frequencies(grab,
into_the_carrier_minute_timestamps[-1],
into_the_carrier_minute_timestamps[-1] + timedelta(seconds=54),
True)
print(f"Audio frequency measured in minute after {into_the_carrier_minute_timestamps[-1]}: {af_measured:.2f}\n"
f"TX QRG: {rx_qrg + af_measured:.2f}")
audio_frequencies.append(af_measured)
Audio frequency measured in minute after 2026-03-06 21:15:33+00:00: 2312.58 TX QRG: 3565712.53
audio_frequencies, into_the_carrier_minute_timestamps
([np.float64(2312.602406877498), np.float64(2312.602406877498), np.float64(2312.599730246081), np.float64(2312.5274403064827), np.float64(2312.6015146663376), np.float64(2312.5803194684554)], [datetime.datetime(2026, 3, 6, 21, 4, 6, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 6, 21, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 8, 42, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 10, 57, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 13, 15, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 15, 33, tzinfo=datetime.timezone.utc)])
avg_audio_frequency = sum(audio_frequencies)/len(audio_frequencies)
avg_audio_frequency, avg_audio_frequency + rx_qrg
(np.float64(2312.585636407059), np.float64(3565712.535748807))
class FindSignalEdge:
# num_of_samples 128 experimentally arrived at as yielding reasonable values
def __init__(self, frequency, num_of_samples=128, samples_per_second=48000):
self.num_of_samples = num_of_samples
self.index_adjustment = round(num_of_samples * 1.66) # Experimentally arrived
ts = numpy.array([n/samples_per_second for n in range(0, num_of_samples)])
ωts = numpy.float64(2*math.pi*frequency) * ts
self.cos_samples = numpy.cos(ωts)
self.sin_samples = numpy.sin(ωts)
self.edge_rise = numpy.array([1 if n < num_of_samples else -1 for n in range(0, 2*num_of_samples)])
self.edge_fall = numpy.array([-1 if n < num_of_samples else 1 for n in range(0, 2*num_of_samples)])
def convolve(self, samples):
sin_cor = scipy.signal.convolve(self.sin_samples, samples, mode="full")
cos_cor = scipy.signal.convolve(self.cos_samples, samples, mode="full")
ampl2 = sin_cor**2 + cos_cor**2
return ampl2 / self.num_of_samples
def look_for_edge(self, samples, rise):
if rise:
return scipy.signal.convolve(self.edge_rise, self.convolve(samples)) / self.num_of_samples
else:
return scipy.signal.convolve(self.edge_fall, self.convolve(samples)) / self.num_of_samples
def find_edge_raw(self, samples, rise):
lfe = self.look_for_edge(samples, rise)
# Homebrew peak finder (scipy has something better, should figure out how to use it).
value, lfe_index = functools.reduce(lambda b,n: b if n[0] <= b[0] else n,
zip(lfe, range(0, len(lfe))))
return lfe_index
def find_edge(self, grab, from_t: datetime, to_t: datetime, rise=True) -> datetime:
from_t_index = grab.sample_index(from_t)
to_t_index = grab.sample_index(to_t)
edge_index_raw = self.find_edge_raw(grab.samples[from_t_index:to_t_index], rise)
edge_index_adjusted = edge_index_raw - self.index_adjustment
return grab.datetime(from_t_index + edge_index_adjusted)
fse = FindSignalEdge(avg_audio_frequency)
real_starts = [
fse.find_edge(grab, icmt+timedelta(seconds=50), icmt+timedelta(seconds=60), rise=True) \
- timedelta(seconds=60)
for icmt in into_the_carrier_minute_timestamps
]
real_starts
[datetime.datetime(2026, 3, 6, 21, 3, 56, 309, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 6, 11, 6, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 8, 40, 677243, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 10, 47, 262, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 13, 14, 345583, tzinfo=datetime.timezone.utc), datetime.datetime(2026, 3, 6, 21, 15, 32, 974671, tzinfo=datetime.timezone.utc)]
def oszi_data(from_t, to_t):
from_i = grab.sample_index(from_t)
to_i = grab.sample_index(to_t)
frames = grab.samples[from_i:to_i]
times = [grab.datetime(i) for i in range(from_i, to_i)]
df = pandas.DataFrame(
{
"samples": frames,
},
index= [(t.second-60 if 30 < t.second else t.second) + t.microsecond/1e6 for t in times]
)
df.plot(grid=True, figsize=(12,6), xlabel="seconds into minute")
return df
# Manually fished out of the data using the two lines below and experimentation.
impulse_minutes_starts = [
datetime(2026, 3, 6, 21, 2, 58, 0, tzinfo=UTC),
datetime(2026, 3, 6, 21, 5, 17, 0, tzinfo=UTC),
datetime(2026, 3, 6, 21, 7, 35, 0, tzinfo=UTC),
datetime(2026, 3, 6, 21, 9, 53, 0, tzinfo=UTC),
datetime(2026, 3, 6, 21, 12, 11, 0, tzinfo=UTC),
datetime(2026, 3, 6, 21, 14, 29, 0, tzinfo=UTC),
]
start = impulse_minutes_starts[0]
oszi_data(start+timedelta(seconds=0), start+timedelta(seconds=0.6))
pass
# There was something wrong with at least the second impulse batch. Document the problem:
start = impulse_minutes_starts[2] # + timedelta(seconds=1)
oszi_data(start-timedelta(seconds=1), start-timedelta(seconds=0.4))
oszi_data(start, start+timedelta(seconds=0.6))
oszi_data(start+timedelta(seconds=58), start + timedelta(seconds=58.6))
oszi_data(start+timedelta(seconds=59), start + timedelta(seconds=59.6))
# oszi_data(start - delta, start + timedelta(seconds=0.5) - delta)
pass
-1.947 +
[(t1-t0).total_seconds() for t0, t1 in zip(impulse_minutes_starts[:-1],impulse_minutes_starts[1:])]
[139.0, 138.0, 138.0, 138.0, 138.0]
# Find the PPS delay for the minute i, for i in 0 to 5
def establish_pps_delay(i: int):
# First step: Start investigation to find the impulse minute:
start_investigation = impulse_minutes_starts[i]
# They tend to goof on the last impulse train:
def find_intervals(start_t, expected_num_of_impulsetrains=59):
result = []
for i in range(0, expected_num_of_impulsetrains):
now_t = (start_t + timedelta(seconds=i))
result.append(fse.find_edge(grab, now_t, now_t + timedelta(seconds=0.2)))
return result
impulse_starts = find_intervals(start_investigation)
median_µs = sorted([i_start.microsecond for i_start in impulse_starts])[len(impulse_starts)//2]
filtered_impulse_starts = [
i_start for i_start in impulse_starts if median_µs-2000 < i_start.microsecond < median_µs+2000
]
filtered_impulse_starts_df = pandas.DataFrame(
{
"ms": [i_start.microsecond / 1e3 for i_start in filtered_impulse_starts]
},
index = filtered_impulse_starts
)
avg_µs = sum((i_start.microsecond for i_start in filtered_impulse_starts)) / len(filtered_impulse_starts)
filtered_impulse_starts_df.plot(kind="kde", grid=True, figsize=(12,6), xlabel="delay / ms")
return median_µs, avg_µs
pps_median_avgs = [establish_pps_delay(i) for i in range(0,6)]
pps_median_avgs
[(40495, 40519.017857142855), (40686, 40686.96078431373), (40742, 40803.61818181818), (40960, 40865.104166666664), (40557, 40584.769230769234), (40492, 40495.54385964912)]
impulse_delay_into_second = sum((ma[1] for ma in pps_median_avgs)) / len(pps_median_avgs)
impulse_delay_into_second * 1e-3 # ms
40.659169013393296
# Establish length of impulse
def find_rising_and_falling_edges(i, from_delta: timedelta, to_delta: timedelta):
# First step: Start investigation to find the impulse minute:
start_investigation = impulse_minutes_starts[i]
# They tend to goof on the last impulse train:
def find_intervals(start_t, expected_num_of_impulsetrains=59):
result = []
for i in range(0, expected_num_of_impulsetrains):
now_t = (start_t + timedelta(seconds=i))
rising_edge = fse.find_edge(grab, now_t + from_delta, now_t + to_delta, True)
falling_edge = fse.find_edge(grab, now_t + from_delta, now_t + to_delta, False)
result.append((rising_edge, falling_edge))
return result
edges = find_intervals(start_investigation)
median_rising_edge_µs = sorted((edge[0].microsecond for edge in edges))[len(edges)//2]
median_falling_edge_µs = sorted((edge[1].microsecond for edge in edges))[len(edges)//2]
filtered_edges = [
edge for edge in edges
if median_rising_edge_µs - 2000 < edge[0].microsecond < median_rising_edge_µs + 2000 and \
median_falling_edge_µs - 2000 < edge[1].microsecond < median_falling_edge_µs + 2000
]
filtered_edges_df = pandas.DataFrame(
{
# "rising ms": [edge[0].microsecond * 1e-3 for edge in filtered_edges],
# "falling ms": [edge[1].microsecond * 1e-3 for edge in filtered_edges],
"duration ms" : [(edge[1].microsecond-edge[0].microsecond) * 1e-3 for edge in filtered_edges],
},
index = [edge[0] + 0.5 * (edge[1]-edge[0]) for edge in filtered_edges]
)
filtered_edges_df.plot(kind="kde", grid=True, figsize=(12,6), xlabel="delay / ms")
avg_rising_edge = sum((e[0].microsecond for e in filtered_edges)) / len(filtered_edges)
avg_falling_edge = sum((e[1].microsecond for e in filtered_edges)) / len(filtered_edges)
return (avg_rising_edge, avg_falling_edge, len(filtered_edges))
ar_af_ls = [
find_rising_and_falling_edges(i, timedelta(seconds=0), timedelta(seconds=0.2))
for i in range(0,6)
]
lengths = [(c[1]-c[0])*1e-3 for c in ar_af_ls]
sum(lengths)/len(lengths), lengths, ar_af_ls
(99.18496864184648, [99.20410416666665, 99.18851162790698, 99.13083333333333, 99.26254838709679, 99.20416216216216, 99.11965217391305], [(40520.3125, 139724.41666666666, 48), (40699.51162790698, 139888.02325581395, 43), (40944.95238095238, 140075.7857142857, 42), (40865.22580645161, 140127.7741935484, 31), (40620.43243243243, 139824.5945945946, 37), (40585.30434782609, 139704.95652173914, 46)])
ar_af_ls = [
find_rising_and_falling_edges(i, timedelta(seconds=0.24), timedelta(seconds=0.35))
for i in range(0,6)
]
lengths = [(c[1]-c[0])*1e-3 for c in ar_af_ls]
sum(lengths)/len(lengths), lengths, ar_af_ls
(48.99869847934679, [49.038224489795915, 49.10063414634144, 49.09465116279072, 48.817156250000004, 48.94359459459456, 48.99793023255814], [(240690.02040816325, 289728.2448979592, 49), (240746.9024390244, 289847.53658536583, 41), (241021.0, 290115.6511627907, 43), (241229.5, 290046.65625, 32), (240844.35135135136, 289787.9459459459, 37), (240720.39534883722, 289718.32558139536, 43)])
ar_af_ls = [
find_rising_and_falling_edges(i, timedelta(seconds=0.35), timedelta(seconds=0.5))
for i in range(0,6)
]
lengths = [(c[1]-c[0])*1e-3 for c in ar_af_ls]
sum(lengths)/len(lengths), lengths, ar_af_ls, sum((c[2] for c in ar_af_ls))
(43.239344477706794, [43.18882978723408, 43.30153488372092, 43.2755581395349, 43.190290322580665, 43.248342105263205, 43.23151162790699], [(378558.08510638296, 421746.91489361704, 47), (378669.6046511628, 421971.1395348837, 43), (378758.9534883721, 422034.511627907, 43), (378904.93548387097, 422095.22580645164, 31), (378650.9736842105, 421899.3157894737, 38), (378534.4418604651, 421765.9534883721, 43)], 245)
avg_audio_frequency + RF_FREQUENCY, 43.239+1.1, impulse_delay_into_second * 1e-3 - TOTAL_DELAY_FMTLAB * 1e3
(np.float64(3565712.535748807), 44.339, 12.159169013393296)
# This is the only change after the hand-in of my results:
fmtlab_official_frequency = 3565712.72
print(f"Frequency error: {3565712.536 - fmtlab_official_frequency:.3f} Hz")
fmtlab_official_pps_offset = 11.4
print(f"PPS offset error: {12.159 - fmtlab_official_pps_offset:.3f} ms")
fmtlab_official_pulse_length = 44.3
print(f"Pulse length error: {44.239 - fmtlab_official_pulse_length:.3f} ms")
Frequency error: -0.184 Hz PPS offset error: 0.759 ms Pulse length error: -0.061 ms