#!/usr/bin/env python

# irrp.py
# 2015-12-21
# Public Domain

"""
A utility to record and then playback IR remote control codes.

To record use

./irrp.py -r -g4 1 2 3 4 5 6

where

-r record
-g the GPIO connected to the IR receiver

and 1 2 3 4 5 6 is a list of codes to record.  The recorded codes are
printed to standard output as a JSON object mapping each id to its list
of pulse lengths.

To playback use

./irrp.py -p -g17 --irdata '<json>' 2 3 4

where

-p playback
-g the GPIO connected to the IR transmitter
--irdata the recorded codes, as the JSON text produced by a recording

and 2 3 4 is a list of codes to transmit.

OPTIONS

-r record
-p playback
-g GPIO (receiver for record, transmitter for playback)
-v verbose
--irdata JSON text holding the recorded codes (playback only)

id1 id2 id3 list of ids to record or transmit

RECORD

--glitch     ignore edges shorter than glitch microseconds, default 100 us
--post       expect post milliseconds of silence after code, default 15 ms
--pre        expect pre milliseconds of silence before code, default 200 ms
--short      reject codes with less than short pulses, default 10
--tolerance  consider pulses the same if within tolerance percent, default 15
--no-confirm don't require a code to be repeated during record

TRANSMIT

--freq       IR carrier frequency, default 38 kHz
--gap        gap in milliseconds between transmitted codes, default 100 ms
"""

import time
import json
import os
import argparse

import pigpio  # http://abyz.co.uk/rpi/pigpio/python.html

p = argparse.ArgumentParser()

g = p.add_mutually_exclusive_group(required=True)
g.add_argument("-p", "--play", help="play keys", action="store_true")
g.add_argument("-r", "--record", help="record keys", action="store_true")

p.add_argument("-g", "--gpio", help="GPIO for RX/TX", required=True, type=int)
# The former -f/--file option is disabled: codes are exchanged as JSON text
# (--irdata for playback, standard output for record) instead of a file.
p.add_argument("--irdata", help="IR Data")

p.add_argument('id', nargs='+', type=str, help='IR codes')

p.add_argument("--freq", help="frequency kHz", type=float, default=38.0)

p.add_argument("--gap", help="key gap ms", type=int, default=100)
p.add_argument("--glitch", help="glitch us", type=int, default=100)
p.add_argument("--post", help="postamble ms", type=int, default=15)
p.add_argument("--pre", help="preamble ms", type=int, default=200)
p.add_argument("--short", help="short code length", type=int, default=10)
p.add_argument("--tolerance", help="tolerance percent", type=int, default=15)

p.add_argument("-v", "--verbose", help="Be verbose", action="store_true")
p.add_argument("--no-confirm", help="No confirm needed", action="store_true")

args = p.parse_args()

# Playback parses --irdata as JSON; without it the script would otherwise
# die later with a TypeError traceback, so reject the combination up front.
if args.play and args.irdata is None:
    p.error("--irdata is required with -p/--play")

GPIO = args.gpio
IRDATA = args.irdata
GLITCH = args.glitch
PRE_MS = args.pre
POST_MS = args.post
FREQ = args.freq
VERBOSE = args.verbose
SHORT = args.short
GAP_MS = args.gap
NO_CONFIRM = args.no_confirm
TOLERANCE = args.tolerance

# Derived values in the units the rest of the script works in.
POST_US = POST_MS * 1000
PRE_US = PRE_MS * 1000
GAP_S = GAP_MS / 1000.0
CONFIRM = not NO_CONFIRM
TOLER_MIN = (100 - TOLERANCE) / 100.0
TOLER_MAX = (100 + TOLERANCE) / 100.0

# State shared between the main script and the GPIO edge callback.
last_tick = 0
in_code = False
code = []
fetching_code = False


def backup(f):
    """
    Rotate backups of file f:  f -> f.bak -> f.bak1 -> f.bak2

    Each rename is best effort: a missing source file (or any other
    OS-level failure) is ignored so the remaining renames still run.
    """
    path = os.path.realpath(f)
    # Oldest first, so each rename frees the name needed by the next one.
    rotation = (
        (path + ".bak1", path + ".bak2"),
        (path + ".bak", path + ".bak1"),
        (path, path + ".bak"),
    )
    for src, dst in rotation:
        try:
            os.rename(src, dst)
        except OSError:
            pass


def carrier(gpio, frequency, micros):
    """
    Generate carrier square wave.

    gpio      -- GPIO to toggle
    frequency -- carrier frequency in kHz
    micros    -- length of the burst in microseconds

    Returns a list of pigpio.pulse objects, two per carrier cycle.
    """
    wf = []
    cycle = 1000.0 / frequency  # Carrier period in microseconds.
    cycles = int(round(micros / cycle))
    on = int(round(cycle / 2.0))
    sofar = 0
    for c in range(cycles):
        # Derive each off time from the ideal end of the cycle so that
        # rounding errors do not accumulate over the burst.
        target = int(round((c + 1) * cycle))
        sofar += on
        off = target - sofar
        sofar += off
        # NOTE(review): the source text was garbled from the middle of the
        # next statement up to the middle of compare().  The two pulse
        # appends and the return are reconstructed -- confirm against the
        # pristine file.
        wf.append(pigpio.pulse(1 << gpio, 0, on))
        wf.append(pigpio.pulse(0, 1 << gpio, off))
    return wf


def normalise(c):
    """
    Replace similar pulse lengths in c (in place) by their average.

    NOTE(review): this whole function fell inside the garbled span of the
    source; it is reconstructed from its call site (end_of_code calls
    normalise(code) and ignores the result) -- confirm against the
    pristine file.

    Entries at even indexes are compared only with other even entries and
    odd with odd.  Pulses within the configured tolerance of the first
    unprocessed pulse are averaged together.

    Example (tolerance 15%)

    Input   9000 4500 600 540 620 560 590 1660 620 1690 615
    Output  9000 4500 609 550 609 550 609 1675 609 1675 609
    """
    if VERBOSE:
        print("before normalise", c, flush=True)

    entries = len(c)
    processed = [0] * entries  # 0 = not yet folded into a group.

    for i in range(entries):
        if processed[i]:
            continue

        v = c[i]
        tot = v
        similar = 1.0

        # Find all later pulses of the same kind with a similar length.
        for j in range(i + 2, entries, 2):
            if not processed[j]:
                if (c[j] * TOLER_MIN) < v < (c[j] * TOLER_MAX):
                    tot = tot + c[j]
                    similar += 1.0

        # Average length of the group.
        newv = round(tot / similar, 2)
        c[i] = newv

        # Set every member of the group to the average.
        for j in range(i + 2, entries, 2):
            if not processed[j]:
                if (c[j] * TOLER_MIN) < v < (c[j] * TOLER_MAX):
                    c[j] = newv
                    processed[j] = 1

    if VERBOSE:
        print("after normalise", c, flush=True)


def compare(p1, p2):
    """
    Check that two recordings agree and, if so, average them into p1.

    Returns False if the recordings differ in length or any pair of
    pulses differs by more than the tolerance; otherwise each entry of
    p1 is replaced by the rounded mean of the pair and True is returned.

    NOTE(review): the start of this function (the length check and the
    lower-bound half of the ratio test) fell inside the garbled span of
    the source and is reconstructed -- confirm against the pristine file.
    """
    if len(p1) != len(p2):
        return False

    for i in range(len(p1)):
        v = p1[i] / p2[i]
        if (v < TOLER_MIN) or (v > TOLER_MAX):
            return False

    for i in range(len(p1)):
        p1[i] = int(round((p1[i] + p2[i]) / 2.0))

    if VERBOSE:
        print("after compare", p1, flush=True)

    return True


def tidy_mark_space(records, base):
    """
    Collapse similar pulse lengths across all records to one value.

    records -- dict mapping id to a list of pulse lengths; modified in place
    base    -- 0 to process the even-indexed entries (marks), 1 for the
               odd-indexed entries (spaces)
    """
    ms = {}

    # Find all the unique marks (base=0) or spaces (base=1)
    # and count the number of times they appear.
    for rec in records:
        pulses = records[rec]
        for i in range(base, len(pulses), 2):
            ms[pulses[i]] = ms.get(pulses[i], 0) + 1

    if VERBOSE:
        print("t_m_s A", ms, flush=True)

    if not ms:
        # Nothing recorded for this kind of pulse (e.g. every key was
        # given up on); without this guard the final averaging below
        # would reference variables that were never assigned.
        return

    v = None

    for plen in sorted(ms):

        # Now go through in order, shortest first, and collapse
        # pulses which are the same within a tolerance to the
        # same value.  The value is the weighted average of the
        # occurrences.
        #
        # E.g. (illustrative; actual grouping depends on the tolerance)
        #
        # 500x20 550x30 600x30  1000x10 1100x10  1700x5 1750x5
        #
        # becomes 556(x80) 1050(x20) 1725(x10)
        #
        if v is None:
            e = [plen]
            v = plen
            tot = plen * ms[plen]
            similar = ms[plen]

        elif plen < (v * TOLER_MAX):
            e.append(plen)
            tot += (plen * ms[plen])
            similar += ms[plen]

        else:
            v = int(round(tot / float(similar)))
            # Set all previous to v.  From here on ms maps these
            # lengths to their replacement rather than to a count.
            for i in e:
                ms[i] = v
            e = [plen]
            v = plen
            tot = plen * ms[plen]
            similar = ms[plen]

    # Close the final group.
    v = int(round(tot / float(similar)))
    for i in e:
        ms[i] = v

    if VERBOSE:
        print("t_m_s B", ms, flush=True)

    for rec in records:
        pulses = records[rec]
        for i in range(base, len(pulses), 2):
            pulses[i] = ms[pulses[i]]


def tidy(records):
    """Tidy the marks and then the spaces of every record, in place."""
    tidy_mark_space(records, 0)  # Marks.
    tidy_mark_space(records, 1)  # Spaces.
def end_of_code():
    """
    Finish the capture of one code.

    A capture with more than SHORT edges is normalised and ends the
    current fetch; anything shorter is discarded and the fetch continues.
    """
    global code, fetching_code

    if len(code) <= SHORT:
        code = []
        print("Short code, probably a repeat, try again", flush=True)
        return

    normalise(code)
    fetching_code = False


def cbf(gpio, level, tick):
    """
    GPIO callback: assemble edge timings into a code while fetching.

    gpio  -- GPIO the event occurred on (unused)
    level -- new level, or pigpio.TIMEOUT when the watchdog fired
    tick  -- tick value supplied with the event
    """
    global last_tick, in_code

    if level == pigpio.TIMEOUT:
        # Watchdog fired: no edge for POST_MS, so any code in progress
        # has ended.
        pi.set_watchdog(GPIO, 0)  # Cancel watchdog.
        if in_code:
            in_code = False
            end_of_code()
        return

    edge = pigpio.tickDiff(last_tick, tick)
    last_tick = tick

    if not fetching_code:
        return

    if in_code:
        if edge > POST_US:
            # End of a code.
            in_code = False
            pi.set_watchdog(GPIO, 0)  # Cancel watchdog.
            end_of_code()
        else:
            code.append(edge)
    elif edge > PRE_US:
        # Start of a code.
        in_code = True
        pi.set_watchdog(GPIO, POST_MS)  # Start watchdog.


def _await_code():
    """Clear the capture buffer and block until a complete code arrives."""
    global code, fetching_code

    code = []
    fetching_code = True
    while fetching_code:
        time.sleep(0.1)


pi = pigpio.pi()  # Connect to Pi.

if not pi.connected:
    exit(0)

if args.record:
    # Record.  Codes are collected in memory and printed as JSON at the
    # end rather than being written to a file.
    records = {}

    pi.set_mode(GPIO, pigpio.INPUT)  # IR RX connected to this GPIO.
    pi.set_glitch_filter(GPIO, GLITCH)  # Ignore glitches.

    cb = pi.callback(GPIO, pigpio.EITHER_EDGE, cbf)

    print("読み取り開始:", flush=True)

    # Process each id in turn.
    for arg in args.id:
        print("リモコンのボタンを押してください。", flush=True)
        _await_code()
        print("OK", flush=True)
        time.sleep(0.5)

        if not CONFIRM:
            # No confirm: accept the first capture as is.
            records[arg] = code[:]
            continue

        press_1 = code[:]
        tries = 0

        while True:
            print("確認のためもう一度同じボタンを押してください。", flush=True)
            _await_code()
            press_2 = code[:]

            if compare(press_1, press_2):
                records[arg] = press_1[:]
                print("OK", flush=True)
                time.sleep(0.5)
                break

            tries += 1
            giving_up = tries > 3
            if giving_up:
                print("信号が一致しませんでした。終了します。", flush=True)
            else:
                print("信号が一致しません。", flush=True)
            # NOTE(review): the source had lost its indentation, so whether
            # this pause follows every mismatch or only the final one is
            # inferred -- confirm against the pristine file.
            time.sleep(0.5)
            if giving_up:
                break

    pi.set_glitch_filter(GPIO, 0)  # Cancel glitch filter.
    pi.set_watchdog(GPIO, 0)  # Cancel watchdog.

    tidy(records)

    print(json.dumps(records), flush=True)

else:
    # Playback.  The codes arrive as JSON text on the command line.
    records = json.loads(IRDATA)

    pi.set_mode(GPIO, pigpio.OUTPUT)  # IR TX connected to this GPIO.

    pi.wave_add_new()

    emit_time = time.time()

    if VERBOSE:
        print("送信中", flush=True)

    for arg in args.id:
        if arg not in records:
            print("Id {} not found".format(arg), flush=True)
            continue

        code = records[arg]

        # Create one wave per distinct mark length and per distinct space
        # length, then chain the wave ids in pulse order.
        mark_waves = {}
        space_waves = {}
        wave = []

        for idx, length in enumerate(code):
            if idx % 2:
                # Space: carrier off for the whole pulse.
                if length not in space_waves:
                    pi.wave_add_generic([pigpio.pulse(0, 0, length)])
                    space_waves[length] = pi.wave_create()
                wave.append(space_waves[length])
            else:
                # Mark: carrier burst.
                if length not in mark_waves:
                    pi.wave_add_generic(carrier(GPIO, FREQ, length))
                    mark_waves[length] = pi.wave_create()
                wave.append(mark_waves[length])

        # Honour the inter-code gap measured from the previous send.
        delay = emit_time - time.time()
        if delay > 0.0:
            time.sleep(delay)

        pi.wave_chain(wave)

        if VERBOSE:
            print("key " + arg, flush=True)

        while pi.wave_tx_busy():
            time.sleep(0.002)

        emit_time = time.time() + GAP_S

        # Release the waves created for this code.
        for wid in mark_waves.values():
            pi.wave_delete(wid)
        for wid in space_waves.values():
            pi.wave_delete(wid)

        print("Id: {}".format(arg), flush=True)

pi.stop()  # Disconnect from Pi.