File indexing completed on 2024-05-12 04:52:38

0001 #!/usr/bin/env python3
0002 
0003 # correlation.py
0004 import subprocess
0005 import numpy
0006 import sys
0007 
# Tunable parameters for fingerprinting and cross correlation.

# seconds to sample audio file for (passed to fpcalc as -length)
sample_time = 500
# number of points to scan cross correlation over
# (offsets from -span to span are tried)
span = 150
# step size (in points) of cross correlation
step = 1
# minimum number of points that must overlap in cross correlation;
# cross_correlation() returns None for an offset that cannot meet this
min_overlap = 20
# report match when cross correlation has a peak exceeding threshold
threshold = 0.8
0015 
0016 
0017 # calculate fingerprint
def calculate_fingerprints(filename):
    """Return the raw fpcalc fingerprint of a media file as a list of ints.

    The input is first converted to WAV with ``ffmpeg`` (``-vn`` drops any
    video stream), then ``fpcalc -raw`` is run on the WAV, limited to
    ``sample_time`` seconds via ``-length``.

    Args:
        filename: Path of the media file to fingerprint.

    Returns:
        The integers listed after ``FINGERPRINT=`` in fpcalc's output.

    Raises:
        ValueError: If the tools cannot be started or fpcalc's output holds
            no usable ``FINGERPRINT=`` line (e.g. because the conversion
            failed).
    """
    import os
    import tempfile

    marker = "FINGERPRINT="

    # A private temporary directory avoids clobbering (or reading a stale)
    # shared /tmp/out1.wav and is removed automatically afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        wav_path = os.path.join(tmp_dir, "out1.wav")
        try:
            # Argument lists (no shell) keep filenames containing spaces or
            # shell metacharacters from being split or interpreted.
            subprocess.run(
                ["ffmpeg", "-y", "-i", filename, "-vn", wav_path],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            fpcalc = subprocess.run(
                ["fpcalc", "-raw", "-length", "%i" % sample_time, wav_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True,
                errors="replace",
            )
        except OSError as err:
            raise ValueError("could not run ffmpeg/fpcalc: %s" % err) from err

    # Parse line by line so text around the fingerprint line cannot leak
    # into the list of integers.
    values = ""
    for line in fpcalc.stdout.splitlines():
        if line.startswith(marker):
            values = line[len(marker):].strip()
            break
    if not values:
        raise ValueError(
            "no fingerprint in fpcalc output for %r: %s"
            % (filename, fpcalc.stderr.strip() or fpcalc.stdout.strip())
        )

    # convert fingerprint to list of integers
    return [int(value) for value in values.split(",")]


# returns the correlation between two fingerprint lists
def correlation(listx, listy):
    """Return the fraction of matching bits between two fingerprint lists.

    Elements are compared pairwise as 32-bit words; the longer list is
    truncated to the length of the shorter one. The result is 1.0 when every
    compared word is identical and 0.0 when every bit differs.

    Args:
        listx: Sequence of integer fingerprint words.
        listy: Sequence of integer fingerprint words.

    Returns:
        float: Mean per-word bit agreement, in the range [0, 1].

    Raises:
        Exception: If either list is empty.
    """
    if len(listx) == 0 or len(listy) == 0:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception("Empty lists cannot be correlated.")

    # zip() stops at the shorter list, which is the same truncation the
    # comparison needs.
    overlap = min(len(listx), len(listy))

    # Mask the XOR to 32 bits before counting: bin() of a negative number is
    # "-0b..." of its magnitude, so signed inputs would otherwise yield a
    # wrong bit count.
    matching_bits = sum(
        32 - bin((x ^ y) & 0xFFFFFFFF).count("1") for x, y in zip(listx, listy)
    )
    covariance = matching_bits / float(overlap)
    return covariance / 32


# returns the cross correlation, with listy offset from listx
def cross_correlation(listx, listy, offset):
    """Correlate listx against listy with the two shifted by ``offset``.

    A positive offset drops the first ``offset`` items of listx; a negative
    offset drops the first ``-offset`` items of listy. The other list is then
    cut to at most the same length before the two are correlated.

    Returns:
        The value of correlation() for the overlapping parts, or None when
        fewer than ``min_overlap`` points overlap (no exception is raised).
    """
    if offset > 0:
        shifted_x = listx[offset:]
        shifted_y = listy[: len(shifted_x)]
    elif offset < 0:
        shifted_y = listy[-offset:]
        shifted_x = listx[: len(shifted_y)]
    else:
        shifted_x, shifted_y = listx, listy

    overlap = min(len(shifted_x), len(shifted_y))
    if overlap >= min_overlap:
        return correlation(shifted_x, shifted_y)

    # Error checking in main program should prevent us from ever being
    # able to get here; too small an overlap yields None.
    return None


# cross correlates listx and listy with offsets from -span to span
def compare(listx, listy, span, step):
    """Cross correlate listx and listy at every offset from -span to span.

    Args:
        listx: First fingerprint list.
        listy: Second fingerprint list.
        span: Largest offset (in points) tried in either direction.
        step: Distance (in points) between consecutive offsets.

    Returns:
        list: One cross_correlation() result per offset, ordered from
        -span upwards.

    Raises:
        Exception: If span is larger than the shorter of the two lists.
    """
    shortest = min(len(listx), len(listy))
    if span > shortest:
        # Error checking in main program should prevent us from ever being
        # able to get here.
        raise Exception(
            "span >= sample size: %i >= %i\n" % (span, shortest)
            + "Reduce span, reduce crop or increase sample_time."
        )

    offsets = numpy.arange(-span, span + 1, step)
    return [cross_correlation(listx, listy, offset) for offset in offsets]


# returns the index of the maximum value in a list
def max_index(listx):
    """Return the index of the largest value in listx.

    Entries that are None are skipped, since they cannot be ordered against
    numbers. On ties the first occurrence wins. If every entry is None,
    index 0 is returned.

    Args:
        listx: Non-empty sequence of comparable values and/or None.

    Returns:
        int: Position of the maximum value.

    Raises:
        IndexError: If listx is empty.
    """
    if len(listx) == 0:
        raise IndexError("max_index() requires a non-empty list")

    best_index = 0
    best_value = None
    for i, value in enumerate(listx):
        if value is None:
            continue
        # Strict '>' keeps the earliest index when several values tie.
        if best_value is None or value > best_value:
            best_value = value
            best_index = i
    return best_index
0093 
0094 
def get_max_corr(corr, source, target):
    """Print whether the strongest correlation peak is a match; return its offset.

    Entry ``i`` of ``corr`` is taken to belong to offset ``-span + i * step``,
    using the module-level ``span`` and ``step``. A peak strictly greater
    than ``threshold`` is reported as a match.

    Args:
        corr: List of correlation values, one per offset.
        source: Not used by this function; kept so existing callers work.
        target: Not used by this function; kept so existing callers work.

    Returns:
        The offset at which the peak correlation occurs.

    Raises:
        Exception: If the selected peak entry is None, i.e. there is no
            numeric correlation value to report.
    """
    max_corr_index = max_index(corr)
    max_corr_value = corr[max_corr_index]
    max_corr_offset = -span + max_corr_index * step

    if max_corr_value is None:
        # Fail with a clear message instead of a TypeError from comparing
        # None against the threshold.
        raise Exception(
            "No usable correlation value at offset %i; "
            "reduce span or increase sample_time." % max_corr_offset
        )

    # report matches
    if max_corr_value > threshold:
        print(
            "MATCH with correlation of %.4f at offset %i"
            % (max_corr_value, max_corr_offset)
        )
    else:
        print(
            "NO MATCH (Correlation: %.4f at offset %i)"
            % (max_corr_value, max_corr_offset)
        )
    return max_corr_offset
0114 
0115 
def correlate(source, target):
    """Fingerprint two media files and report how well they correlate.

    Both files are fingerprinted with calculate_fingerprints(), the
    fingerprints are cross correlated over the module-level ``span`` and
    ``step``, and the outcome is reported through get_max_corr().

    Args:
        source: Path of the first media file.
        target: Path of the second media file.

    Returns:
        Whatever get_max_corr() returns for the computed correlations.
    """
    fingerprint_source = calculate_fingerprints(source)
    fingerprint_target = calculate_fingerprints(target)
    corr = compare(fingerprint_source, fingerprint_target, span, step)
    # Hand the result back instead of parking it in an unused local.
    return get_max_corr(corr, source, target)
0121 
0122 
if __name__ == "__main__":
    # Exit with a usage hint instead of an IndexError traceback when the two
    # file arguments are missing; extra arguments are still ignored.
    if len(sys.argv) < 3:
        sys.exit("usage: %s SOURCE_FILE TARGET_FILE" % sys.argv[0])
    correlate(sys.argv[1], sys.argv[2])