Source code for omxplayer.bus_finder
import os.path
import time
from glob import glob
import logging
logger = logging.getLogger(__name__)
# Library module: attach a no-op handler so records are discarded quietly
# when the importing application has not configured logging itself.
logger.addHandler(logging.NullHandler())


class BusFinder(object):
    """Locate the file holding OMXPlayer's DBus address and read it.

    When constructed with a ``path`` the finder waits for that file to
    appear. Without one it polls for files matching ``ADDRESS_FILE_GLOB``
    (ignoring ``*.pid`` files) and uses the most recently modified match.

    All waits poll the filesystem every ``POLL_INTERVAL`` seconds and block
    until their condition holds; there is no timeout.
    """

    # Seconds to sleep between two filesystem polls.
    POLL_INTERVAL = 0.05
    # Glob pattern searched when no explicit path was supplied.
    ADDRESS_FILE_GLOB = '/tmp/omxplayerdbus.*'

    def __init__(self, path=None):
        """
        :param path: location of the address file, or ``None`` to have it
                     discovered via ``ADDRESS_FILE_GLOB``.
        """
        self.path = path
        # Filled in by get_address(); defined here so the attribute always
        # exists, even before the first lookup.
        self.address = None
        logger.debug('BusFinder initialised with path: %s', path)

    def get_address(self):
        """
        Wait for the address file to be ready, then read it.

        The stripped file content is also stored on ``self.address``.

        :return: the content of the address file with surrounding
                 whitespace removed.
        """
        self.wait_for_file()
        logger.debug('Opening file at %s', self.path)
        with open(self.path, 'r') as f:
            logger.debug('Opened file at %s', self.path)
            self.address = f.read().strip()
            logger.debug("Address '%s' parsed from file", self.address)
        return self.address

    def find_address_file(self):
        """
        Finds the OMXPlayer DBus connection
        Assumes there is an alive OMXPlayer process.

        Polls until at least one file matching ``ADDRESS_FILE_GLOB`` (and
        not ending in ``.pid``) exists, then stores the most recently
        modified match in ``self.path``.

        :return: None
        """
        while True:
            # A comprehension is used here as glob doesn't support regexp,
            # so the '.pid' companions have to be filtered out by hand.
            candidates = [path for path in glob(self.ADDRESS_FILE_GLOB)
                          if not path.endswith('.pid')]
            try:
                candidates.sort(key=os.path.getmtime)
            except OSError:
                # A match was removed between glob() and the stat call;
                # throw this snapshot away and scan again.
                candidates = []
            if candidates:
                break
            # Only sleep when nothing was found, so a file that is already
            # present is picked up without delay.
            time.sleep(self.POLL_INTERVAL)
        self.path = candidates[-1]

    def wait_for_path_to_exist(self):
        """Block until ``self.path`` refers to an existing regular file."""
        while not os.path.isfile(self.path):
            time.sleep(self.POLL_INTERVAL)

    def wait_for_dbus_address_to_be_written_to_file(self):
        """
        Block until the file at ``self.path`` is non-empty.

        :raises OSError: from ``os.path.getsize`` if the file does not
                         exist or cannot be accessed.
        """
        while not os.path.getsize(self.path):
            time.sleep(self.POLL_INTERVAL)

    def wait_for_file(self):
        """
        Make sure ``self.path`` points at a non-empty address file.

        Waits for the configured path to exist when one was given,
        otherwise discovers the file; in both cases it then waits for the
        file to contain data.
        """
        if self.path:
            self.wait_for_path_to_exist()
        else:
            self.find_address_file()
        self.wait_for_dbus_address_to_be_written_to_file()