Module subsync.synchro
Subtitle Synchronizer
Expand source code
"""Subtitle Synchronizer"""
from subsync.synchro.synchronizer import Synchronizer
from subsync.synchro.controller import SyncController, SyncJobResult, SyncStatus
from subsync.synchro.task import SyncTask, SyncTaskList
from subsync.synchro.input import InputFile, SubFile, RefFile
from subsync.synchro.output import OutputFile
from subsync.synchro.channels import ChannelsMap
# Public API of the package: the names exported by `from subsync.synchro import *`.
# NOTE(review): SyncJobResult and SyncStatus are imported above and are referenced
# in the SyncController callback documentation, but are not listed here - confirm
# whether that omission is intentional.
__all__ = [
    'Synchronizer',
    'SyncController',
    'SyncTask',
    'SyncTaskList',
    'InputFile',
    'SubFile',
    'RefFile',
    'OutputFile',
    'ChannelsMap',
]

# pdoc overrides: mapping a name to False excludes that sub-module/class
# from the generated API documentation.
__pdoc__ = dict.fromkeys(
    ('dictionary', 'encdetect', 'pipeline', 'speech', 'synchronizer',
     'wordsdump', 'SyncTaskList'),
    False,
)
Sub-modules
subsync.synchro.channels
subsync.synchro.controller
subsync.synchro.input
subsync.synchro.output
subsync.synchro.task
Classes
class ChannelsMap
-
Expand source code
class ChannelsMap(object):
    """Base class of audio channel selections (auto / all / custom).

    The factory and helper functions below are static methods, so they can
    be called both on the class (``ChannelsMap.auto()``) and on instances.
    """

    @staticmethod
    def auto():
        """Return an `AutoChannelsMap` (automatic channel selection)."""
        return AutoChannelsMap()

    @staticmethod
    def all():
        """Return an `AllChannelsMap` (listen to all channels)."""
        return AllChannelsMap()

    @staticmethod
    def custom(cm):
        """Return a `CustomChannelsMap` built from description `cm`."""
        return CustomChannelsMap(cm)

    @staticmethod
    def deserialize(cm):
        """Build a channels map from its serialized form.

        'auto' or None gives an `AutoChannelsMap`, 'all' an `AllChannelsMap`;
        any other value is passed to `CustomChannelsMap`.
        """
        if cm is None or cm == 'auto':
            return AutoChannelsMap()
        if cm == 'all':
            return AllChannelsMap()
        return CustomChannelsMap(cm)

    @staticmethod
    def getChannelDescription(ch):
        """Return 'description (name)' for channel `ch`, or 'channel <ch>'
        when gizmo provides no name or no description."""
        name = gizmo.AudioFormat.getChannelName(ch)
        desc = gizmo.AudioFormat.getChannelDescription(ch)
        if name and desc:
            return '{} ({})'.format(desc, name)
        return 'channel {}'.format(ch)

    @staticmethod
    def getChannelId(ch):
        """Return the numeric id of channel `ch` given by name or number.

        The name is looked up through gizmo first; otherwise `ch` is parsed
        with `int`. Returns None if neither succeeds.
        """
        try:
            return gizmo.AudioFormat.getChannelIdByName(ch) or int(ch)
        except Exception:
            # Best effort: unresolvable input maps to None. Catching
            # Exception (not a bare except) lets KeyboardInterrupt and
            # SystemExit propagate.
            return None

    @staticmethod
    def getChannelName(ch):
        """Return the channel name reported by gizmo, or `str(ch)` if none."""
        return gizmo.AudioFormat.getChannelName(ch) or str(ch)

    @staticmethod
    def layoutToIds(layout):
        """Split channel layout bitmask `layout` into its set bits.

        Returns the set bits as powers of two, lowest first; returns an empty
        list for layouts below 1.
        """
        i = 1
        res = []
        while i <= layout:
            if i & layout:
                res.append(i)
            i <<= 1
        return res

    def __repr__(self):
        # NOTE(review): `serialize` is not defined on this base class;
        # presumably every subclass provides it - confirm.
        return self.serialize()
Subclasses
Methods
def all()
-
Expand source code
def all():
    """Create a channels map that listens to every channel."""
    everything = AllChannelsMap()
    return everything
def auto()
-
Expand source code
def auto():
    """Create a channels map with automatic channel selection."""
    automatic = AutoChannelsMap()
    return automatic
def custom(cm)
-
Expand source code
def custom(cm):
    """Create a custom channels map from the description `cm`."""
    selection = CustomChannelsMap(cm)
    return selection
def deserialize(cm)
-
Expand source code
def deserialize(cm):
    """Build a channels map from its serialized form.

    'auto' or None gives an `AutoChannelsMap`, 'all' an `AllChannelsMap`;
    any other value is passed to `CustomChannelsMap`.
    """
    # Identity check for None (PEP 8) rather than `== None`.
    if cm is None or cm == 'auto':
        return AutoChannelsMap()
    if cm == 'all':
        return AllChannelsMap()
    return CustomChannelsMap(cm)
def getChannelDescription(ch)
-
Expand source code
def getChannelDescription(ch):
    """Describe channel `ch` as 'description (name)'.

    Falls back to 'channel <ch>' when gizmo reports no name or no description.
    """
    name = gizmo.AudioFormat.getChannelName(ch)
    desc = gizmo.AudioFormat.getChannelDescription(ch)
    if not (name and desc):
        return 'channel {}'.format(ch)
    return '{} ({})'.format(desc, name)
def getChannelId(ch)
-
Expand source code
def getChannelId(ch):
    """Return the numeric id of channel `ch` given by name or number.

    The name is looked up through gizmo first; otherwise `ch` is parsed with
    `int`. Returns None if neither succeeds.
    """
    try:
        return gizmo.AudioFormat.getChannelIdByName(ch) or int(ch)
    except Exception:
        # Best effort: unresolvable input maps to None. Catching Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit propagate.
        return None
def getChannelName(ch)
-
Expand source code
def getChannelName(ch):
    """Return the channel name reported by gizmo, or `ch` as a string if none."""
    name = gizmo.AudioFormat.getChannelName(ch)
    return name if name else str(ch)
def layoutToIds(layout)
-
Expand source code
def layoutToIds(layout):
    """Split channel layout bitmask `layout` into its set bits.

    Returns the set bits as powers of two, lowest first; returns an empty
    list for layouts below 1.
    """
    def candidate_masks():
        # Every power of two not exceeding the layout value.
        mask = 1
        while mask <= layout:
            yield mask
            mask <<= 1

    return [mask for mask in candidate_masks() if mask & layout]
class InputFile (path=None, stream=None, *, streamByType=None, streamByLang=None, lang=None, enc=None, fps=None, channels=None)
-
Subtitle/reference input file.
You should use either `SubFile` or `RefFile` instead of this base class.
Parameters
path
:str
- Input file path.
stream
:int
, optional- Stream number, 1-based.
streamByType
:str
, optional- Select stream by type ('sub' or 'audio').
streamByLang
:str
, optional- Select stream by 3-letter language code; relies on file metadata.
lang
:str
, optional- Stream language as 2- or 3-letter ISO code.
enc
:str
, optional- Character encoding, `None` for auto-detection based on `lang`.
fps
:float
, optional- Framerate.
channels
:str or ChannelsMap
, optional- Audio channels to listen to: 'auto' or `None` for automatic selection, 'all' to listen to all channels, or comma-separated channel abbreviations, e.g. 'FL,FR'.
Notes
Here the stream number is 1-based (the first stream in a file has number 1), but other methods expect 0-based numbers.
Expand source code
class InputFile(object):
    """Subtitle/reference input file.

    You should use either `SubFile` or `RefFile` instead of this base class.
    """

    def __init__(self, path=None, stream=None, *, streamByType=None,
                 streamByLang=None, lang=None, enc=None, fps=None,
                 channels=None):
        """
        Parameters
        ----------
        path: str
            Input file path.
        stream: int, optional
            Stream number, 1-based.
        streamByType: str, optional
            Select stream by type ('sub' or 'audio').
        streamByLang: str, optional
            Select stream by 3-letter language code (case-insensitive),
            relies on file metadata.
        lang: str, optional
            Stream language as 2- or 3-letter ISO code.
        enc: str, optional
            Character encoding, `None` for auto detection based on `lang`.
        fps: float, optional
            Framerate.
        channels: str or subsync.synchro.channels.ChannelsMap, optional
            Audio channels to listen to, could be 'auto' or `None` for auto
            selection, 'all' to listen to all channels, or comma separated
            channels abbreviation, e.g. 'FL,FR'.

        Notes
        -----
        Here stream number is 1-based (first stream in file has number 1)
        but other methods expect 0-based numbers.
        """
        self.path = path
        self.no = None
        self.type = None
        self.streams = {}
        self.fps = None
        self.lang = None
        self.enc = None
        self.channels = ChannelsMap.auto()
        self.filetype = None
        # Subclasses provide `types` (class attribute / set before calling
        # this constructor); the base class accepts any stream type.
        if not hasattr(self, 'types'):
            self.types = None

        if path is not None:
            self.open(path)
        if stream is not None:
            self.select(stream - 1)
        if streamByType or streamByLang:
            self.selectBy(type=streamByType, lang=streamByLang)

        # Explicit arguments override values derived from the file.
        self.lang = lang or self.lang
        self.enc = enc or self.enc
        self.fps = fps or self.fps

        if isinstance(channels, str):
            self.channels = ChannelsMap.deserialize(channels)
        elif channels is not None:
            self.channels = channels

    def __lt__(self, other):
        # Order by path, then by selected stream number.
        if self.path != other.path:
            return self.path < other.path
        return self.no < other.no

    def stream(self):
        """Return selected stream (None if no stream is selected)."""
        if self.no is not None:
            return self.streams[self.no]
        return None

    def open(self, path):
        """Open input file and select its first matching stream, if any.

        Replaces the stream list and resets the selection and every
        stream-derived property (type, lang, enc, channels, fps, filetype).
        """
        ss = gizmo.Demux(path).getStreamsInfo()
        streams = {s.no: s for s in ss}

        # File type: the first of these kinds present in the file, else None
        # (reset so a previously opened file's type does not linger).
        present = {s.type for s in ss}
        self.filetype = next(
            (t for t in ('video', 'audio', 'subtitle/text') if t in present),
            None)

        self.path = path
        self.streams = streams
        self.no = None
        self.type = None
        self.lang = None
        self.enc = None
        self.channels = ChannelsMap.auto()
        # Framerate of the first stream that reports one.
        self.fps = next((s.frameRate for s in ss if s.frameRate), None)

        if streams:
            self.selectFirstMatchingStream()

    def select(self, no):
        """Select stream by number (0-based) and return it.

        When the stream language is missing or 'und', the language is taken
        from `getLangFromPath(self.path)` instead.
        """
        stream = self.streams[no]
        self.no = no
        self.type = stream.type
        self.lang = stream.lang
        if not self.lang or self.lang.lower() == 'und':
            self.lang = getLangFromPath(self.path)
        return stream

    def selectBy(self, type=None, lang=None):
        """Select stream by type and language (or only one of them).

        Parameters
        ----------
        type: str, optional
            Stream type prefix, e.g. 'sub' or 'audio'.
        lang: str, optional
            Stream language code, compared case-insensitively.

        Returns
        -------
        The selected stream.

        Raises
        ------
        Error
            No stream matches.
        """
        wantedLang = lang.lower() if lang else None
        for s in self.streams.values():
            if self.types and s.type not in self.types:
                continue
            if type and not s.type.startswith(type):
                continue
            # Stream language may be missing from file metadata (None).
            if wantedLang and wantedLang != (s.lang or '').lower():
                continue
            return self.select(s.no)
        raise Error(_('There is no matching stream in {}').format(self.path)) \
            .addn('path', self.path) \
            .addn('type', type) \
            .addn('language', lang)

    def selectFirstMatchingStream(self, types=None):
        """Select the first stream whose type is accepted.

        If `types` is given it replaces `self.types` permanently. Types are
        tried in order; within one type the lowest stream number wins. With
        no type restriction the lowest-numbered stream is selected.

        Returns the selected stream, or None if nothing matches.
        """
        if types is not None:
            self.types = types
        if not self.streams:
            return None
        if self.types is None:
            return self.select(min(self.streams))
        for t in self.types:
            for no in sorted(self.streams):
                if self.streams[no].type == t:
                    return self.select(no)
        return None

    def hasMatchingStream(self, types=None):
        """Check whether any stream has one of `types` (default `self.types`).

        With no type restriction, checks whether the file has any stream.
        """
        types = types or self.types
        if types is None:
            return len(self.streams) > 0
        return any(s.type in types for s in self.streams.values())

    def isOpen(self):
        """Check whether file is opened."""
        return self.path is not None

    def isSelect(self):
        """Check whether stream is selected."""
        return self.path is not None and self.no is not None

    def serialize(self):
        """Return a dict of the set properties (stream number is 1-based;
        channels only when not 'auto')."""
        res = {}
        if self.path:
            res['path'] = self.path
        if self.no is not None:
            res['stream'] = self.no + 1
        if self.lang:
            res['lang'] = self.lang
        if self.enc:
            res['enc'] = self.enc
        if self.fps:
            res['fps'] = self.fps
        if self.channels and self.channels.type != 'auto':
            res['channels'] = self.channels.serialize()
        return res

    def __repr__(self):
        return utils.fmtobj(self.__class__.__name__,
                path = '{}:{}/{}'.format(self.path, self.no, len(self.streams)),
                type = self.type,
                lang = self.lang,
                enc = self.enc,
                fps = self.fps,
                channels = self.channels if self.channels.type != 'auto' else None)

    def __str__(self):
        return utils.fmtstr(
                '{}:{}/{}'.format(self.path, self.no, len(self.streams)),
                type = self.type,
                lang = self.lang,
                enc = self.enc,
                fps = self.fps,
                channels = self.channels if self.channels.type != 'auto' else None)
Subclasses
Methods
def hasMatchingStream(self, types=None)
-
Expand source code
def hasMatchingStream(self, types=None):
    """Tell whether any stream has a type from `types` (default `self.types`).

    Without any type restriction, simply tells whether there are streams.
    """
    wanted = types or self.types
    if wanted is not None:
        return any(s.type in wanted for s in self.streams.values())
    return len(self.streams) > 0
def isOpen(self)
-
Check whether file is opened.
Expand source code
def isOpen(self):
    """Check whether file is opened (i.e. a path is set)."""
    # Identity comparison with None, per PEP 8.
    return self.path is not None
def isSelect(self)
-
Check whether stream is selected.
Expand source code
def isSelect(self):
    """Check whether stream is selected (path set and stream number chosen)."""
    # Identity comparisons with None, per PEP 8.
    return self.path is not None and self.no is not None
def open(self, path)
-
Open input file.
Expand source code
def open(self, path):
    """Open input file and select its first matching stream, if any.

    Replaces the stream list and resets the selection and every
    stream-derived property (type, lang, enc, channels, fps, filetype).
    """
    ss = gizmo.Demux(path).getStreamsInfo()
    streams = {s.no: s for s in ss}

    # File type: the first of these kinds present in the file, else None
    # (reset so a previously opened file's type does not linger).
    present = {s.type for s in ss}
    self.filetype = next(
        (t for t in ('video', 'audio', 'subtitle/text') if t in present),
        None)

    self.path = path
    self.streams = streams
    self.no = None
    self.type = None
    self.lang = None
    self.enc = None
    self.channels = ChannelsMap.auto()
    # Framerate of the first stream that reports one.
    self.fps = next((s.frameRate for s in ss if s.frameRate), None)

    if streams:
        self.selectFirstMatchingStream()
def select(self, no)
-
Select stream by number.
Expand source code
def select(self, no):
    """Make stream `no` (0-based) current and return it.

    The stream's type and language are copied onto this object; a missing or
    'und' language is replaced by `getLangFromPath(self.path)`.
    """
    chosen = self.streams[no]
    self.no, self.type, self.lang = no, chosen.type, chosen.lang
    undefined = not self.lang or self.lang.lower() == 'und'
    if undefined:
        self.lang = getLangFromPath(self.path)
    return chosen
def selectBy(self, type=None, lang=None)
-
Select stream by type and language (or only one of them).
Expand source code
def selectBy(self, type=None, lang=None):
    """Select stream by type and language (or only one of them).

    Parameters
    ----------
    type: str, optional
        Stream type prefix, e.g. 'sub' or 'audio'.
    lang: str, optional
        Stream language code, compared case-insensitively.

    Returns
    -------
    The selected stream.

    Raises
    ------
    Error
        No stream matches.
    """
    wantedLang = lang.lower() if lang else None
    for s in self.streams.values():
        if self.types and s.type not in self.types:
            continue
        if type and not s.type.startswith(type):
            continue
        # Stream language may be missing from file metadata (None); the
        # previous `s.lang.lower()` crashed on such streams.
        if wantedLang and wantedLang != (s.lang or '').lower():
            continue
        return self.select(s.no)
    raise Error(_('There is no matching stream in {}').format(self.path)) \
        .addn('path', self.path) \
        .addn('type', type) \
        .addn('language', lang)
def selectFirstMatchingStream(self, types=None)
-
Expand source code
def selectFirstMatchingStream(self, types=None):
    """Select the first stream whose type is accepted.

    If `types` is given it replaces `self.types` permanently. Types are tried
    in order; within one type the lowest stream number wins. With no type
    restriction the lowest-numbered stream is selected.

    Returns the selected stream, or None if there are no streams or none
    matches.
    """
    if types is not None:
        self.types = types
    if not self.streams:
        return None
    if self.types is None:
        return self.select(min(self.streams))
    for t in self.types:
        for no in sorted(self.streams):
            if self.streams[no].type == t:
                return self.select(no)
    return None
def serialize(self)
-
Expand source code
def serialize(self):
    """Describe this input as a dict of its set properties.

    The stream number is stored 1-based. Channels are included only when the
    map is not of type 'auto'.
    """
    out = {}
    if self.path:
        out['path'] = self.path
    if self.no is not None:
        out['stream'] = self.no + 1
    optional = (('lang', self.lang), ('enc', self.enc), ('fps', self.fps))
    out.update({key: value for key, value in optional if value})
    channels = self.channels
    if channels and channels.type != 'auto':
        out['channels'] = channels.serialize()
    return out
def stream(self)
-
Return selected stream.
Expand source code
def stream(self):
    """Return the currently selected stream, or None if none is selected."""
    return None if self.no is None else self.streams[self.no]
class OutputFile (path=None, *, enc=None, fps=None)
-
Subtitle target description - specifies where output should be saved.
Parameters
path
:str
-
Output path or pattern. The path may be literal or may contain the following variables:
{sub_path}/{ref_path} - subtitle/reference full path;
{sub_no}/{ref_no} - stream number;
{sub_lang}/{ref_lang} - 3-letter language code;
{sub_name}/{ref_name} - file name (without path and extension);
{sub_dir}/{ref_dir} - directory path;
{if:<field>:<value>} - if field is set, append value;
{if_not:<field>:<value>} - if field is not set, append value.
enc
:str
, optional- Character encoding, default is 'UTF-8'.
fps
:float
, optional- Framerate, applies only for frame-based subtitles.
Notes
Subtitle format is derived from file extension, thus path must end with one of the supported extensions.
Examples
{ref_dir}/{ref_name}{if:sub_lang:.}{sub_lang}.srt
{sub_dir}/{sub_name}-out.ssa
Expand source code
class OutputFile(object):
    """Subtitle target description - specifies where output should be saved."""

    def __init__(self, path=None, *, enc=None, fps=None):
        """
        Parameters
        ----------
        path: str
            Output path or pattern. Path could be literal or may contain
            following variables:
            - `{sub_path}`/`{ref_path}` - subtitle/reference full path;
            - `{sub_no}`/`{ref_no}` - stream number;
            - `{sub_lang}`/`{ref_lang}` - 3-letter language code;
            - `{sub_name}`/`{ref_name}` - file name (without path and extension);
            - `{sub_dir}`/`{ref_dir}` - directory path;
            - `{if:<field>:<value>}` - if field is set, append value;
            - `{if_not:<field>:<value>}` - if field is not set, append value.
        enc: str, optional
            Character encoding, default is 'UTF-8'.
        fps: float, optional
            Framerate, applies only for frame-based subtitles.

        Notes
        -----
        Subtitle format is derived from file extension, thus path must end
        with one of the supported extensions.

        Examples
        --------
        `{ref_dir}/{ref_name}{if:sub_lang:.}{sub_lang}.srt`

        `{sub_dir}/{sub_name}-out.ssa`
        """
        self.path = path
        self.enc = 'UTF-8' if not enc else enc
        self.fps = fps
        # Created on first use by getPath.
        self.pathFormatter = None

    def getPath(self, sub, ref):
        """Compile path pattern for given `sub` and `ref`."""
        formatter = self.pathFormatter
        if formatter is None:
            formatter = self.pathFormatter = PathFormatter()
        return formatter.format(self.path, sub, ref)

    def validateOutputPattern(self):
        """Raise exception for invalid path pattern."""
        validatePattern(self.path)

    def serialize(self):
        """Return a dict of the set (truthy) properties: path, enc, fps."""
        candidates = (('path', self.path), ('enc', self.enc), ('fps', self.fps))
        return {key: value for key, value in candidates if value}

    def __repr__(self):
        return utils.fmtobj(self.__class__.__name__,
                path = self.path,
                enc = self.enc,
                fps = self.fps)

    def __str__(self):
        return utils.fmtstr(self.path,
                enc = self.enc,
                fps = self.fps)
Methods
def getPath(self, sub, ref)
-
Compile path pattern for given `sub` and `ref`.
Expand source code
def getPath(self, sub, ref):
    """Compile the output path pattern for the given `sub` and `ref` inputs.

    The PathFormatter is created lazily on first call and cached.
    """
    formatter = self.pathFormatter
    if formatter is None:
        formatter = self.pathFormatter = PathFormatter()
    return formatter.format(self.path, sub, ref)
def serialize(self)
-
Expand source code
def serialize(self):
    """Return a dict holding the set (truthy) properties: path, enc and fps."""
    candidates = (('path', self.path), ('enc', self.enc), ('fps', self.fps))
    return {key: value for key, value in candidates if value}
def validateOutputPattern(self)
-
Raise exception for invalid path pattern.
Expand source code
def validateOutputPattern(self):
    """Check the output path pattern; raises if it is invalid."""
    pattern = self.path
    validatePattern(pattern)
class RefFile (*args, **kw)
-
Reference file.
Parameters
path
:str
- Input file path.
stream
:int
, optional- Stream number, 1-based.
streamByType
:str
, optional- Select stream by type ('sub' or 'audio').
streamByLang
:str
, optional- Select stream by 3-letter language code; relies on file metadata.
lang
:str
, optional- Stream language as 2- or 3-letter ISO code.
enc
:str
, optional- Character encoding, `None` for auto-detection based on `lang`.
fps
:float
, optional- Framerate.
channels
:str or ChannelsMap
, optional- Audio channels to listen to: 'auto' or `None` for automatic selection, 'all' to listen to all channels, or comma-separated channel abbreviations, e.g. 'FL,FR'.
Notes
Here the stream number is 1-based (the first stream in a file has number 1), but other methods expect 0-based numbers.
Expand source code
class RefFile(InputFile):
    """Reference file."""

    types = ('subtitle/text', 'audio')
    """Supported stream types."""

    def __init__(self, *args, **kw):
        # Restrict accepted stream types on the instance before the base
        # constructor opens the file and auto-selects a stream.
        self.types = RefFile.types
        super(RefFile, self).__init__(*args, **kw)
Ancestors
Class variables
var types
-
Supported stream types.
Inherited members
class SubFile (*args, **kw)
-
Input subtitle file.
Parameters
path
:str
- Input file path.
stream
:int
, optional- Stream number, 1-based.
streamByType
:str
, optional- Select stream by type ('sub' or 'audio').
streamByLang
:str
, optional- Select stream by 3-letter language code; relies on file metadata.
lang
:str
, optional- Stream language as 2- or 3-letter ISO code.
enc
:str
, optional- Character encoding, `None` for auto-detection based on `lang`.
fps
:float
, optional- Framerate.
channels
:str or ChannelsMap
, optional- Audio channels to listen to: 'auto' or `None` for automatic selection, 'all' to listen to all channels, or comma-separated channel abbreviations, e.g. 'FL,FR'.
Notes
Here the stream number is 1-based (the first stream in a file has number 1), but other methods expect 0-based numbers.
Expand source code
class SubFile(InputFile):
    """Input subtitle file."""

    types = ('subtitle/text',)
    """Supported stream types."""

    def __init__(self, *args, **kw):
        # Restrict accepted stream types on the instance before the base
        # constructor opens the file and auto-selects a stream.
        self.types = SubFile.types
        super(SubFile, self).__init__(*args, **kw)
Ancestors
Class variables
var types
-
Supported stream types.
Inherited members
class SyncController (listener=None, **kw)
-
Synchronization controller.
Controls subtitle synchronization process. Synchronization is performed asynchronously in separate threads controlled internally. Status is reported by optional callbacks.
Parameters
listener
:object
, optional- Object with methods named as following callback parameters.
onJobStart
:callable(task)
, optional- Called when the task synchronization starts.
onJobEnd
:callable(task, status, result)
, optional- Called when the task synchronization ends.
onJobUpdate
:callable(task, status)
, optional- Called periodically during synchronization to report current state.
onFinish
:callable(terminated)
, optional- Called when synchronizer ends its work - either all tasks are processed or synchronization was terminated;
onError
:callable(task, source, error)
, optional- Called to report non-terminal errors. Non-terminal errors are errors not causing synchronizer to stop, but could impact results, e.g. audio decoding errors.
Notes
Callback arguments:
- task : SyncTask - object passed to SyncController.synchronize();
- status : SyncStatus - object reflecting the current synchronization state;
- result : SyncJobResult;
- terminated : bool - whether synchronization was terminated;
- source : str - error source, could be 'sub' or 'ref' for subtitle/reference extractor or 'core';
- error - exception instance.
Expand source code
class SyncController(object):
    """Synchronization controller.

    Controls subtitle synchronization process. Synchronization is performed
    asynchronously in separate threads controlled internally. Status is
    reported by optional callbacks.
    """

    def __init__(self, listener=None, **kw):
        """
        Parameters
        ----------
        listener: object, optional
            Object with methods named as following callback parameters.
        onJobStart: callable(task), optional
            Called when the task synchronization starts.
        onJobInit: callable(task), optional
            Called after the task synchronizer has been initialized (and
            started, unless synchronization was terminated meanwhile).
        onJobEnd: callable(task, status, result), optional
            Called when the task synchronization ends.
        onJobUpdate: callable(task, status), optional
            Called periodically during synchronization to report current state.
        onFinish: callable(terminated), optional
            Called when synchronizer ends its work - either all tasks are
            processed or synchronization was terminated.
        onError: callable(task, source, error), optional
            Called to report non-terminal errors. Non-terminal errors are
            errors not causing synchronizer to stop, but could impact results,
            e.g. audio decoding errors.

        Notes
        -----
        Keyword callbacks take precedence over `listener` methods.

        Callbacks arguments:
        - task : `subsync.synchro.SyncTask` - object passed to
          `SyncController.synchronize`; `None` for errors not tied to a task;
        - status : `SyncStatus` - object reflecting current synchronization
          state; may be `None` if the synchronizer could not be created;
        - result : `SyncJobResult`;
        - terminated : `bool` - whether synchronization was terminated;
        - source : `str` - error source, could be 'sub' or 'ref' for
          subtitle/reference extractor or 'core';
        - error - exception instance.
        """
        self._onJobStart = kw.get('onJobStart', getattr(listener, 'onJobStart', lambda task: None))
        self._onJobInit = kw.get('onJobInit', getattr(listener, 'onJobInit', lambda task: None))
        self._onJobEnd = kw.get('onJobEnd', getattr(listener, 'onJobEnd', lambda task, status, result: None))
        self._onJobUpdate = kw.get('onJobUpdate', getattr(listener, 'onJobUpdate', lambda task, status: None))
        self._onFinish = kw.get('onFinish', getattr(listener, 'onFinish', lambda terminated: None))
        self._onError = kw.get('onError', getattr(listener, 'onError', lambda task, source, error: None))

        self._options = settings().getSynchronizationOptions()
        self._thread = None
        # Released by the synchronizer's onUpdate callback and by terminate();
        # the worker waits on it between status polls.
        self._semaphore = threading.Semaphore()
        self._sync = None
        self._terminated = False

    def configure(self, **kw):
        """Override default synchronization options.

        Parameters
        ----------
        maxPointDist: float, optional
            Maximum acceptable synchronization error, in seconds (default 2).
        minPointsNo: int, optional
            Minimum synchronization points no (default 20).
        outputCharEnc: str or None, optional
            Output character encoding, `None` for the same encoding as input
            subtitles (default 'UTF-8').
        windowSize: float, optional
            Synchronization window, in seconds. Timestamps will be corrected
            no more than this value (default 1800).
        minWordProb: float, optional
            Minimum speech recognition score, between 0 and 1 (default 0.3).
        jobsNo: int or None, optional
            Number of concurrent synchronization threads, `None` for auto
            (default `None`).
        minWordLen: int, optional
            Minimum word length, in letters. Shorter words will not be used as
            synchronization points (default 5).
        minCorrelation: float, optional
            Minimum correlation factor, between 0 and 1 (default 0.9999).
        minWordsSim: float, optional
            Minimum words similarity to be used as synchronization points,
            between 0 and 1 (default 0.6).
        minEffort: float, optional
            Controls when synchronization should be stopped, between 0 and 1
            (default 0.5).
        outTimeOffset: float, optional
            Add constant offset to output subtitles, in seconds (default 0).
        overwrite: bool, optional
            Whether to overwrite existing output files. If not set, output
            file name will be suffixed with number if needed, to avoid
            overwriting (default `False`).

        Raises
        ------
        TypeError
            Unknown option name; in that case no option is changed.
        """
        # Validate everything first so a bad key does not leave the options
        # half-updated.
        unknown = [key for key in kw if key not in self._options]
        if unknown:
            raise TypeError("Unexpected keyword argument '{}'".format(unknown[0]))
        self._options.update(kw)

    def synchronize(self, tasks, *, timeout=None, interactive=False):
        """Start task[s] synchronization.

        Parameters
        ----------
        tasks: SyncTask or iterable of SyncTask
            Single or several synchronization tasks.
        timeout: float or None, optional
            How often to call `onJobUpdate` callback, in seconds (if
            registered). `None` to not call it at all.
        interactive: bool, optional
            In interactive mode, task subtitles are not saved automatically
            (`minEffort` and task `out` field are ignored) and user is
            expected to save it manually either with
            `saveSynchronizedSubtitles` or `getSynchronizedSubtitles`.

        Raises
        ------
        RuntimeError
            Another synchronization is in progress.

        Notes
        -----
        Synchronization status changes are notified with callbacks registered
        in constructor. `onFinish` is called only when `tasks` is iterable (as
        opposed to single `SyncTask` object).
        """
        if self.isRunning():
            raise RuntimeError('Another synchronization in progress')

        logger.debug('synchronization options: %s', self._options)
        self._terminated = False

        if isinstance(tasks, Iterable):
            # Materialize once: validation below and the worker thread both
            # iterate `tasks`, and `_run` needs its length. A one-shot
            # iterator (e.g. a generator) would otherwise be exhausted here.
            tasks = list(tasks)
            for task in tasks:
                self.validateTask(task, interactive=interactive)
            self._thread = threading.Thread(
                    target=self._run,
                    args=(tasks, timeout, interactive),
                    name='Synchronizer')
        else:
            self.validateTask(tasks, interactive=interactive)
            self._thread = threading.Thread(
                    target=self._runTask,
                    args=(tasks, timeout, interactive),
                    name='Synchronizer')

        self._thread.start()

    def terminate(self):
        """Terminate running synchronization.

        Does nothing if synchronization is not running.
        """
        if self.isRunning():
            self._terminated = True
            # Wake the worker if it is waiting on the semaphore.
            self._semaphore.release()

    def isRunning(self):
        """Check if synchronization is running.

        Returns
        -------
        bool
        """
        return self._thread is not None and self._thread.is_alive()

    def wait(self):
        """Block until synchronization ends.

        Returns
        -------
        bool
            `True` if synchronization finished (either successfully or not),
            `False` if it was terminated with `SyncController.terminate`.
        """
        if self._thread is not None:
            self._thread.join()
        return not self._terminated

    def getStatus(self):
        """Return current synchronization status.

        Returns
        -------
        SyncStatus or None
        """
        return self._sync and self._sync.getStatus()

    def getProgress(self):
        """Return synchronization progress.

        Returns
        -------
        float or None
            Progress of currently processed `SyncTask`, between 0 and 1.
            `None` if no task was run.
        """
        return self._sync and self._sync.getProgress()

    def getSynchronizedSubtitles(self):
        """Return subtitles from the most recent synchronizer.

        Raises
        ------
        RuntimeError
            No synchronization was run.
        """
        if self._sync is None:
            raise RuntimeError('Subtitles not synchronized')
        return self._sync.getSynchronizedSubtitles()

    def saveSynchronizedSubtitles(self, path=None, task=None):
        """Save synchronized subtitles.

        Parameters
        ----------
        path: str, optional
            Path to output subtitles.
        task: SyncTask, optional
            `out` field of task will be used as output description.

        Returns
        -------
        Whatever the subtitles' `save` method returns.

        Notes
        -----
        At least one parameter must be set. Subtitles will be saved in `path`
        or in `task`.out.path if `path` is not set. `task`.out character
        encoding and framerate will be used, if set.
        """
        if not path and not task:
            raise RuntimeError('At least one of the following arguments must be set: path or task')

        subs = self.getSynchronizedSubtitles()

        offset = self._options.get('outTimeOffset')
        if offset:
            # NOTE(review): shift() is applied to the object returned by
            # getSynchronizedSubtitles; if that object is shared, saving twice
            # may shift twice - confirm.
            logger.info('adjusting timestamps by offset %.3f', offset)
            subs.shift(s=offset)

        # Encoding precedence: task output, global option, input subtitles.
        enc = (task and task.out and task.out.enc) \
                or self._options.get('outputCharEnc') \
                or (task and task.sub and task.sub.enc) or 'UTF-8'

        return subs.save(path=path or task.getOutputPath(),
                encoding=enc,
                fps=task and task.out and task.out.fps,
                overwrite=self._options.get('overwrite'))

    def validateTask(self, task, *, interactive=False):
        """Check if task is properly defined.

        Parameters
        ----------
        task: SyncTask
            Task to validate.
        interactive: bool, optional
            For interactive synchronization `out` will not be validated.

        Raises
        ------
        Error
            Invalid task.
        KeyError or Exception
            Invalid `task`.out.path pattern.
        """
        sub, ref, out = task.sub, task.ref, task.out
        if sub is None or not sub.path or sub.no is None:
            raise Error('subtitles not set', task=task)
        if ref is None or not ref.path or ref.no is None:
            raise Error('reference file not set', task=task)
        if not interactive and (out is None or not out.path):
            raise Error('output path not set', task=task)
        if not interactive:
            out.validateOutputPattern()

    def _run(self, tasks, timeout, interactive):
        # Worker-thread entry point for a list of tasks (see synchronize).
        try:
            total = len(tasks)
            for no, task in enumerate(tasks, 1):
                if self._terminated:
                    break
                logger.info('running task %i/%i: %r', no, total, task)
                self._runTask(task, timeout, interactive)

        except Exception as err:
            logger.warning('%r', err, exc_info=True)
            self._onError(None, 'core', err)

        finally:
            logger.info('synchronization finished')
            self._onFinish(self._terminated)

    def _runTask(self, task, timeout, interactive):
        # Run one task to completion; errors are reported via onError and
        # never escape, even if the synchronizer could not be created.
        sync = None
        status = None
        try:
            from .synchronizer import Synchronizer
            self._onJobStart(task)

            self._sync = sync = Synchronizer(task.sub, task.ref)
            sync.onUpdate = self._semaphore.release
            sync.onError = lambda src, err: self._onError(task, src, err)
            sync.init(self._options, runCb=lambda: not self._terminated)

            if not self._terminated:
                sync.start()
            # NOTE(review): placement relative to the `if` above was
            # reconstructed from a flattened listing - confirm.
            self._onJobInit(task)

            status = sync.getStatus()
            minEffort = self._options.get('minEffort', 1.0)
            if timeout is not None:
                lastTime = time.monotonic() - timeout

            while not self._terminated and sync.isRunning() \
                    and (interactive or minEffort >= 1.0 or status.effort < minEffort):
                # Woken by sync.onUpdate / terminate(), or after `timeout`
                # seconds (waits without limit when timeout is None).
                self._semaphore.acquire(timeout=timeout)
                status = sync.getStatus()

                if timeout is not None:
                    now = time.monotonic()
                    if now - lastTime >= timeout:
                        lastTime = now
                        self._onJobUpdate(task, status)

        except Exception as err:
            logger.warning('%r', err, exc_info=True)
            self._onError(task, 'core', err)

        try:
            # `sync` is None when construction (or the import) failed above;
            # previously this raised UnboundLocalError out of `finally`.
            if sync is not None:
                sync.stop(force=True)
                status = sync.getStatus()
            logger.info('result: %r', status)

            succeeded = not self._terminated and status and status.correlated
            path = None

            if not interactive and succeeded and task.out:
                try:
                    path = self.saveSynchronizedSubtitles(task=task)
                except Exception as err:
                    logger.warning('subtitle save failed: %r', err, exc_info=True)
                    self._onError(task, 'core', err)
                    succeeded = False

            res = SyncJobResult(succeeded, self._terminated, path)
            self._onJobEnd(task, status, res)

        except Exception as err:
            logger.warning('%r', err, exc_info=True)
            self._onError(task, 'core', err)

        finally:
            if sync is not None:
                sync.destroy()
            logger.info('task finished %r', task)
Methods
def configure(self, **kw)
-
Override default synchronization options.
Parameters
maxPointDist
:float
, optional- Maximum acceptable synchronization error, in seconds (default 2).
minPointsNo
:int
, optional- Minimum synchronization points no (default 20).
outputCharEnc
:str or None
, optional- Output character encoding, `None` for the same encoding as input subtitles (default 'UTF-8').
windowSize
:float
, optional- Synchronization window, in seconds. Timestamps will be corrected no more than this value (default 1800).
minWordProb
:float
, optional- Minimum speech recognition score, between 0 and 1 (default 0.3).
jobsNo
:int or None
, optional- Number of concurrent synchronization threads, `None` for auto (default `None`).
minWordLen
:int
, optional- Minimum word length, in letters. Shorter words will not be used as synchronization points (default 5).
minCorrelation
:float
, optional- Minimum correlation factor, between 0 and 1 (default 0.9999).
minWordsSim
:float
, optional- Minimum words similarity to be used as synchronization points, between 0 and 1 (default 0.6).
minEffort
:float
, optional- Controls when synchronization should be stopped, between 0 and 1 (default 0.5).
outTimeOffset
:float
, optional- Add constant offset to output subtitles, in seconds (default 0).
overwrite
:bool
, optional- Whether to overwrite existing output files. If not set, the output file name will be suffixed with a number if needed, to avoid overwriting (default `False`).
Expand source code
def configure(self, **kw):
    """Override default synchronization options.

    Parameters
    ----------
    maxPointDist: float, optional
        Maximum acceptable synchronization error, in seconds (default 2).
    minPointsNo: int, optional
        Minimum synchronization points no (default 20).
    outputCharEnc: str or None, optional
        Output character encoding, `None` for the same encoding as input
        subtitles (default 'UTF-8').
    windowSize: float, optional
        Synchronization window, in seconds. Timestamps will be corrected no
        more than this value (default 1800).
    minWordProb: float, optional
        Minimum speech recognition score, between 0 and 1 (default 0.3).
    jobsNo: int or None, optional
        Number of concurrent synchronization threads, `None` for auto
        (default `None`).
    minWordLen: int, optional
        Minimum word length, in letters. Shorter words will not be used as
        synchronization points (default 5).
    minCorrelation: float, optional
        Minimum correlation factor, between 0 and 1 (default 0.9999).
    minWordsSim: float, optional
        Minimum words similarity to be used as synchronization points,
        between 0 and 1 (default 0.6).
    minEffort: float, optional
        Controls when synchronization should be stopped, between 0 and 1
        (default 0.5).
    outTimeOffset: float, optional
        Add constant offset to output subtitles, in seconds (default 0).
    overwrite: bool, optional
        Whether to overwrite existing output files. If not set, output file
        name will be suffixed with number if needed, to avoid overwriting
        (default `False`).

    Raises
    ------
    TypeError
        Unknown option name; in that case no option is changed.
    """
    # Validate everything first so a bad key does not leave the options
    # half-updated (previously earlier keys were already applied).
    unknown = [key for key in kw if key not in self._options]
    if unknown:
        raise TypeError("Unexpected keyword argument '{}'".format(unknown[0]))
    self._options.update(kw)
def getProgress(self)
-
Return synchronization progress.
Returns
float or None
- Progress of the currently processed `SyncTask`, between 0 and 1; `None` if no task was run.
Expand source code
def getProgress(self):
    """Report progress of the task being synchronized.

    Returns
    -------
    float or None
        Progress of currently processed `SyncTask`, between 0 and 1.
        `None` if no task was run.
    """
    sync = self._sync
    return sync.getProgress() if sync else sync
def getStatus(self)
-
Return current synchronization status.
Returns
SyncStatus or None
Expand source code
def getStatus(self):
    """Return current synchronization status.

    Returns
    -------
    SyncStatus or None
        Status reported by the underlying synchronizer, or `None` when no
        synchronizer exists yet.
    """
    sync = self._sync
    return sync.getStatus() if sync else sync
def getSynchronizedSubtitles(self)
-
Expand source code
def getSynchronizedSubtitles(self):
    """Return subtitles with the current synchronization applied.

    Raises
    ------
    RuntimeError
        There is no synchronizer (`self._sync` is `None`).
    """
    sync = self._sync
    if sync is not None:
        return sync.getSynchronizedSubtitles()
    raise RuntimeError('Subtitles not synchronized')
def isRunning(self)
-
Check if synchronization is running.
Returns
bool
Expand source code
def isRunning(self):
    """Check if synchronization is running.

    Returns
    -------
    bool
        `True` while the synchronization thread is alive, `False` otherwise
        (including when no thread has been created yet).
    """
    thread = self._thread
    # Compare against None explicitly so the result is always a bool instead
    # of leaking `None` when no thread has been created.
    return thread is not None and thread.is_alive()
def saveSynchronizedSubtitles(self, path=None, task=None)
-
Save synchronized subtitles.
Parameters
path : str, optional
- Path to output subtitles.
task : SyncTask, optional
- The out field of task will be used as output description.
Notes
At least one parameter must be set. Subtitles will be saved in path, or in task.out.path if path is not set. The character encoding and framerate of task.out will be used, if set.
Expand source code
def saveSynchronizedSubtitles(self, path=None, task=None):
    """Save synchronized subtitles.

    Parameters
    ----------
    path: str, optional
        Path to output subtitles.
    task: SyncTask, optional
        `out` field of task will be used as output description.

    Returns
    -------
    Whatever the subtitles' `save` method returns.

    Raises
    ------
    RuntimeError
        Neither `path` nor `task` was given.

    Notes
    -----
    At least one parameter must be set. Subtitles will be saved in `path`
    or in `task`.out.path if `path` is not set. `task`.out character
    encoding and framerate will be used, if set. Encoding falls back to the
    `outputCharEnc` option, then to the input subtitle encoding, then to
    'UTF-8'. A non-zero `outTimeOffset` option shifts the subtitles first.
    """
    if not (path or task):
        raise RuntimeError('At least one of the following arguments must be set: path or task')

    subs = self.getSynchronizedSubtitles()

    offset = self._options.get('outTimeOffset')
    if offset:
        logger.info('adjusting timestamps by offset %.3f', offset)
        subs.shift(s=offset)

    out = task and task.out
    encoding = ((out and out.enc)
                or self._options.get('outputCharEnc')
                or (task and task.sub and task.sub.enc)
                or 'UTF-8')
    target = path or task.getOutputPath()

    return subs.save(path=target,
                     encoding=encoding,
                     fps=out and out.fps,
                     overwrite=self._options.get('overwrite'))
def synchronize(self, tasks, *, timeout=None, interactive=False)
-
Start task[s] synchronization.
Parameters
tasks : SyncTask or iterable of SyncTask
- Single or several synchronization tasks.
timeout : float or None, optional
- How often to call the onJobUpdate callback, in seconds (if registered). None to not call it at all.
interactive : bool, optional
- In interactive mode, task subtitles are not saved automatically (minEffort and the task out field are ignored), and the user is expected to save them manually with either saveSynchronizedSubtitles or getSynchronizedSubtitles.
Notes
Synchronization status changes are notified with callbacks registered in the constructor. onFinish is called only when tasks is iterable (as opposed to a single SyncTask object).
Expand source code
def synchronize(self, tasks, *, timeout=None, interactive=False):
    """Start task[s] synchronization.

    Parameters
    ----------
    tasks: SyncTask or iterable of SyncTask
        Single or several synchronization tasks. One-shot iterators (e.g.
        generators) are accepted; they are materialized into a list first.
    timeout: float or None, optional
        How often to call `onJobUpdate` callback, in seconds (if registered).
        `None` to not call it at all.
    interactive: bool, optional
        In interactive mode, task subtitles are not saved automatically
        (`minEffort` and task `out` field are ignored) and user is expected to
        save them manually either with `saveSynchronizedSubtitles` or
        `getSynchronizedSubtitles`.

    Raises
    ------
    RuntimeError
        Another synchronization is already running.

    Notes
    -----
    Synchronization status changes are notified with callbacks registered in
    constructor. `onFinish` is called only when `tasks` is iterable (as
    opposed to single `SyncTask` object). Every task is validated with
    `validateTask` before the worker thread starts, so validation errors are
    raised to the caller.
    """
    if self.isRunning():
        raise RuntimeError('Another synchronization in progress')

    logger.debug('synchronization options: %s', self._options)
    self._terminated = False

    if isinstance(tasks, Iterable):
        # A one-shot iterator returns itself from iter(). Validating it below
        # would consume it and hand an exhausted iterator to the worker
        # thread, so materialize it first.
        if iter(tasks) is tasks:
            tasks = list(tasks)
        for task in tasks:
            self.validateTask(task, interactive=interactive)
        target = self._run
    else:
        self.validateTask(tasks, interactive=interactive)
        target = self._runTask

    self._thread = threading.Thread(
            target=target,
            args=(tasks, timeout, interactive),
            name='Synchronizer')
    self._thread.start()
def terminate(self)
-
Terminate running synchronization.
Does nothing if synchronization is not running.
Expand source code
def terminate(self):
    """Terminate running synchronization.

    Marks the controller as terminated and releases `self._semaphore`.
    Does nothing if synchronization is not running.
    """
    if not self.isRunning():
        return
    self._terminated = True
    self._semaphore.release()
def validateTask(self, task, *, interactive=False)
-
Check if task is properly defined.
Parameters
task : SyncTask
- Task to validate.
interactive : bool, optional
- For interactive synchronization, out will not be validated.
Raises
Error
- Invalid task.
KeyError or Exception
- Invalid task.out.path pattern.
Expand source code
def validateTask(self, task, *, interactive=False):
    """Check if task is properly defined.

    Parameters
    ----------
    task: SyncTask
        Task to validate.
    interactive: bool, optional
        For interactive synchronization `out` will not be validated.

    Raises
    ------
    Error
        Invalid task: missing subtitles or reference (no object, empty path
        or `no` unset), or missing output path in non-interactive mode.
    KeyError or Exception
        Invalid `task`.out.path pattern.
    """
    sub, ref, out = task.sub, task.ref, task.out

    def isMissing(desc):
        # An input is usable only with a path and a selected stream number.
        return desc is None or not desc.path or desc.no is None

    if isMissing(sub):
        raise Error('subtitles not set', task=task)
    if isMissing(ref):
        raise Error('reference file not set', task=task)
    if interactive:
        return

    if out is None or not out.path:
        raise Error('output path not set', task=task)
    out.validateOutputPattern()
def wait(self)
-
Block until synchronization ends.
Returns
bool
True if synchronization finished (either successfully or not), False if it was terminated with SyncController.terminate().
Expand source code
def wait(self):
    """Block until synchronization ends.

    Returns
    -------
    bool
        `True` if synchronization finished (either successfully or not),
        `False` if it was terminated with `SyncController.terminate`.
    """
    thread = self._thread
    if thread:
        thread.join()
    return not self._terminated
class SyncTask (sub=None, ref=None, out=None, data=None)
-
Synchronization task.
Parameters
sub : SubFile or InputFile or dict
- Input subtitle file description. Proper object instance or dict with fields same as arguments to InputFile.
ref : RefFile or InputFile or dict
- Reference file description. Proper object instance or dict with fields same as arguments to InputFile.
out : OutputFile or dict
- Output subtitle file description. Proper object instance or dict with fields same as arguments to OutputFile.
data : any, optional
- Any user-defined data, useful for passing extra information via SyncController callbacks.
Expand source code
class SyncTask(object):
    """Synchronization task."""

    def __init__(self, sub=None, ref=None, out=None, data=None):
        """
        Parameters
        ----------
        sub: subsync.SubFile or subsync.InputFile or dict
            Input subtitle file description. Proper object instance or `dict`
            with fields same as arguments to `subsync.InputFile`.
        ref: subsync.RefFile or subsync.InputFile or dict
            Reference file description. Proper object instance or `dict` with
            fields same as arguments to `subsync.InputFile`.
        out: subsync.OutputFile or dict
            Output subtitle file description. Proper object instance or
            `dict` with fields same as arguments to `subsync.OutputFile`.
        data: any, optional
            Any user defined data, useful for passing extra information via
            `subsync.SyncController` callbacks.
        """
        # dicts are expanded into keyword arguments of the matching class;
        # anything else is stored as given.
        self.sub = SubFile(**sub) if isinstance(sub, dict) else sub
        self.ref = RefFile(**ref) if isinstance(ref, dict) else ref
        self.out = OutputFile(**out) if isinstance(out, dict) else out
        self.data = data

    def getOutputPath(self):
        """Return the path computed by `out.getPath(sub, ref)`, or `out`
        itself when it is falsy (e.g. `None`)."""
        out = self.out
        if not out:
            return out
        return out.getPath(self.sub, self.ref)

    def serialize(self):
        """Return a dict with serialized `sub`, `ref` and `out` fields.

        Falsy fields are omitted; `data` is never included.
        """
        fields = (('sub', self.sub), ('ref', self.ref), ('out', self.out))
        return {name: item.serialize() for name, item in fields if item}

    def __repr__(self):
        return utils.fmtobj(self.__class__.__name__,
                sub=repr(self.sub),
                ref=repr(self.ref),
                out=repr(self.out))
Methods
def getOutputPath(self)
-
Expand source code
def getOutputPath(self):
    """Return the path computed by `out.getPath(sub, ref)`, or `out` itself
    when it is falsy (e.g. `None`)."""
    out = self.out
    if not out:
        return out
    return out.getPath(self.sub, self.ref)
def serialize(self)
-
Expand source code
def serialize(self):
    """Return a dict with serialized `sub`, `ref` and `out` fields.

    Falsy fields are omitted.
    """
    fields = (('sub', self.sub), ('ref', self.ref), ('out', self.out))
    return {name: item.serialize() for name, item in fields if item}
class Synchronizer (sub, ref)
-
Expand source code
class Synchronizer(object):
    """Run a single subtitle-vs-reference synchronization job.

    Builds a producer pipeline for the subtitles (`sub`) and producer
    pipelines for the reference (`ref`). Both feed a `gizmo.Correlator`.
    A `gizmo.Translator` is inserted on the reference side when both
    languages are set and differ.

    Owners may replace two callbacks:

    - `onUpdate()` - called after statistics updates and once all pipelines
      have finished;
    - `onError(src, err)` - called with `src` equal to 'sub' or 'ref' when the
      corresponding pipeline reports an error.
    """

    def __init__(self, sub, ref):
        self.sub = sub
        self.ref = ref

        self.onUpdate = lambda: None
        self.onError = lambda src, err: None

        self.stats = gizmo.CorrelationStats()
        # Progress value at the moment correlation was first reached; the
        # reported "effort" is measured from here.
        self.effortBegin = None
        self.statsLock = threading.Lock()

        self.correlator = None
        self.translator = None
        # Assigned in _initInternal(); defined here so the attributes exist
        # even if initialization is terminated early.
        self.dictionary = None
        self.refWordsSink = None
        self.subtitlesCollector = None
        self.subPipeline = None
        self.refPipelines = []
        self.pipelines = []
        self.wordsDumpers = []

    def destroy(self):
        """Stop everything and release pipelines, translator and dumpers.

        Word dumpers are flushed before being dropped.
        """
        logger.info('releasing synchronizer resources')
        if self.correlator:
            self.correlator.stop(force=True)

        for p in self.pipelines:
            p.stop()

        for p in self.pipelines:
            p.destroy()

        if self.correlator:
            self.correlator.wait()
            self.correlator.connectStatsCallback(None)

        self.subPipeline = None
        self.refPipelines = []
        self.pipelines = []
        self.translator = None
        self.dictionary = None
        self.refWordsSink = None

        for wd in self.wordsDumpers:
            wd.flush()
        self.wordsDumpers = []

    def init(self, options, runCb=None):
        """Build the correlator and pipelines from `options`.

        A `gizmo.ErrorTerminated` raised during initialization is logged and
        swallowed, which may leave the object partially initialized.
        """
        try:
            self._initInternal(options, runCb)
        except gizmo.ErrorTerminated:
            logger.info('initialization terminated')

    def _initInternal(self, options, runCb=None):
        """Create and wire correlator, pipelines, translator and word dumpers."""
        logger.info('initializing synchronization jobs')
        # Detect encoding of single-stream text subtitles when none was given.
        for stream in (self.sub, self.ref):
            if stream.type == 'subtitle/text' and not stream.enc and len(stream.streams) == 1:
                stream.enc = encdetect.detectEncoding(stream.path, stream.lang)

        self.correlator = gizmo.Correlator(
                options['windowSize'],
                options['minCorrelation'],
                options['maxPointDist'],
                options['minPointsNo'],
                options['minWordsSim'])
        self.correlator.connectStatsCallback(self.onStatsUpdate)
        # Reference words go straight to the correlator unless a translator
        # is inserted below.
        self.refWordsSink = self.correlator.pushRefWord

        self.subtitlesCollector = subtitle.SubtitlesCollector()

        self.subPipeline = pipeline.createProducerPipeline(self.sub)
        self.subPipeline.connectEosCallback(self.onSubEos)
        self.subPipeline.connectErrorCallback(self.onSubError)
        self.subPipeline.addSubsListener(self.subtitlesCollector.addSubtitle)
        self.subPipeline.addSubsListener(self.correlator.pushSubtitle)
        self.subPipeline.addWordsListener(self.correlator.pushSubWord)

        if self.sub.lang and self.ref.lang and self.sub.lang != self.ref.lang:
            # Languages differ: route reference words through a translator
            # built from a dictionary loaded for (ref.lang, sub.lang).
            self.dictionary = dictionary.loadDictionary(self.ref.lang, self.sub.lang, options['minWordLen'])
            self.translator = gizmo.Translator(self.dictionary)
            self.translator.setMinWordsSim(options['minWordsSim'])
            self.translator.addWordsListener(self.correlator.pushRefWord)
            self.refWordsSink = self.translator.pushWord

        self.refPipelines = pipeline.createProducerPipelines(self.ref, no=options['jobsNo'], runCb=runCb)
        for p in self.refPipelines:
            p.connectEosCallback(self.onRefEos)
            p.connectErrorCallback(self.onRefError)
            p.addWordsListener(self.refWordsSink)

        self.pipelines = [ self.subPipeline ] + self.refPipelines

        for p in self.pipelines:
            p.configure(
                    minWordLen=options['minWordLen'],
                    minWordProb=options['minWordProb'])

        # Optional word dumps (debugging aid); keys select which word stream
        # is dumped, an empty path dumps to stdout.
        dumpSources = {
                'sub':     [ self.subPipeline ],
                'subPipe': [ self.subPipeline ],
                'subRaw':  [ self.subPipeline.getRawWordsSource() ],
                'ref':     [ self.translator ] if self.translator else self.refPipelines,
                'refPipe': self.refPipelines,
                'refRaw':  [ p.getRawWordsSource() for p in self.refPipelines ],
                }

        for srcId, path in options.get('dumpWords', []):
            sources = dumpSources.get(srcId)
            if sources:
                logger.debug('dumping %s to %s (from %i sources)', srcId, path, len(sources))
                if path:
                    wd = wordsdump.WordsFileDump(path, overwrite=True)
                else:
                    wd = wordsdump.WordsStdoutDump(srcId)
                self.wordsDumpers.append(wd)
                for source in sources:
                    source.addWordsListener(wd.pushWord)

    def start(self):
        """Start the correlator and all pipelines."""
        logger.info('starting synchronization')
        self.correlator.start('Correlator')
        for idx, p in enumerate(self.pipelines):
            p.start('Pipeline{}'.format(idx))

    def stop(self, force=True):
        """Stop the correlator (if created) and all pipelines.

        Safe to call after an initialization that was terminated before the
        correlator was created.
        """
        logger.info('stopping synchronizer')
        if self.correlator:
            self.correlator.stop(force=force)
        for p in self.pipelines:
            p.stop()

    def isRunning(self):
        """Return `True` if the correlator or any pipeline is still running."""
        if self.correlator and self.correlator.isRunning():
            return True
        for p in self.pipelines:
            if p.isRunning():
                return True
        return False

    def getProgress(self):
        """Return overall progress in [0, 1].

        Mean progress of the pipelines that report one (`None` readings are
        skipped), multiplied by the square of the correlator progress.
        Without a correlator the result is 0.
        """
        psum = 0.0
        plen = 0
        for pr in [ p.getProgress() for p in self.pipelines ]:
            if pr is not None:
                psum += pr
                plen += 1

        if self.correlator:
            cp = self.correlator.getProgress()
            res = cp * cp
        else:
            res = 0.0

        if plen > 0:
            res *= psum / plen

        if res < 0:
            res = 0
        if res > 1:
            res = 1
        return res

    def getStatus(self):
        """Return a `controller.SyncStatus` snapshot.

        `effort` is -1 until `effortBegin` is known (and below 1). After that
        it is the progress rescaled from [effortBegin, 1] to [0, 1], with 0.0
        for progress <= 0 and 1.0 for progress >= 1.
        """
        with self.statsLock:
            stats = self.stats
            progress = self.getProgress()
            begin = self.effortBegin

        effort = -1
        if begin is not None and begin < 1.0:
            if progress <= 0.0:
                effort = 0.0
            elif progress >= 1.0:
                effort = 1.0
            else:
                effort = (progress - begin) / (1.0 - begin)

        return controller.SyncStatus(
                correlated = stats.correlated and self.subPipeline and not self.subPipeline.isRunning(),
                maxChange = self.subtitlesCollector.getMaxSubtitleDiff(stats.formula),
                progress = progress,
                factor = stats.factor,
                points = stats.points,
                formula = stats.formula,
                effort = effort)

    def getSynchronizedSubtitles(self):
        """Return collected subtitles transformed by the current formula."""
        with self.statsLock:
            formula = self.stats.formula
        return self.subtitlesCollector.getSynchronizedSubtitles(formula)

    def onStatsUpdate(self, stats):
        """Correlator statistics callback: store stats, then notify `onUpdate`."""
        logger.debug(stats)

        with self.statsLock:
            self.stats = stats

            if self.effortBegin is None and stats.correlated and self.subPipeline and not self.subPipeline.isRunning():
                self.effortBegin = self.getProgress()

        # Notify outside the lock: statsLock is a non-reentrant
        # threading.Lock and listeners may call getStatus(), which takes it.
        self.onUpdate()

    def onSubEos(self):
        """Subtitle pipeline end-of-stream callback."""
        logger.info('subtitles read done')
        self.checkIfAllPipelinesDone()
        with self.statsLock:
            if self.effortBegin is None and self.stats.correlated:
                self.effortBegin = self.getProgress()

    def onRefEos(self):
        """Reference pipeline end-of-stream callback."""
        logger.info('references read done')
        self.checkIfAllPipelinesDone()

    def checkIfAllPipelinesDone(self):
        """Stop the correlator (non-forced) once no pipeline is running."""
        for p in self.pipelines:
            if p.isRunning():
                return

        logger.info('stopping correlator')
        self.correlator.stop(force=False)
        self.onUpdate()

    def onSubError(self, err):
        """Subtitle pipeline error callback; forwards to `onError('sub', err)`."""
        logger.warning('SUB: %r', str(err).replace('\n', '; '))
        self.onError('sub', err)

    def onRefError(self, err):
        """Reference pipeline error callback; forwards to `onError('ref', err)`."""
        logger.warning('REF: %r', str(err).replace('\n', '; '))
        self.onError('ref', err)
Methods
def checkIfAllPipelinesDone(self)
-
Expand source code
def checkIfAllPipelinesDone(self):
    """Stop the correlator (non-forced) and notify `onUpdate` once no
    pipeline is running; otherwise do nothing."""
    if any(p.isRunning() for p in self.pipelines):
        return
    logger.info('stopping correlator')
    self.correlator.stop(force=False)
    self.onUpdate()
def destroy(self)
-
Expand source code
def destroy(self):
    """Stop the correlator and pipelines, then drop references to them.

    Pipelines are all stopped before any is destroyed. Word dumpers are
    flushed before being dropped.
    """
    logger.info('releasing synchronizer resources')
    correlator = self.correlator
    if correlator:
        correlator.stop(force=True)

    for pipe in self.pipelines:
        pipe.stop()
    for pipe in self.pipelines:
        pipe.destroy()

    if correlator:
        correlator.wait()
        correlator.connectStatsCallback(None)

    self.subPipeline = None
    self.refPipelines = []
    self.pipelines = []
    self.translator = None
    self.dictionary = None
    self.refWordsSink = None

    for dumper in self.wordsDumpers:
        dumper.flush()
    self.wordsDumpers = []
def getProgress(self)
-
Expand source code
def getProgress(self):
    """Return overall progress in [0, 1].

    Mean progress of the pipelines that report one (`None` readings are
    skipped), multiplied by the square of the correlator progress. Without
    a correlator the result is 0.
    """
    readings = [p.getProgress() for p in self.pipelines]
    known = [r for r in readings if r is not None]

    correlator = self.correlator
    if correlator:
        cp = correlator.getProgress()
        res = cp * cp
    else:
        res = 0.0

    if known:
        res *= sum(known, 0.0) / len(known)

    return min(max(res, 0), 1)
def getStatus(self)
-
Expand source code
def getStatus(self):
    """Return a `controller.SyncStatus` snapshot.

    `effort` is -1 until `effortBegin` is known and below 1. After that it
    is the progress rescaled from [effortBegin, 1] to [0, 1], with 0.0 for
    progress <= 0 and 1.0 for progress >= 1.
    """
    with self.statsLock:
        stats = self.stats
        progress = self.getProgress()
        begin = self.effortBegin

    if begin is None or not (begin < 1.0):
        effort = -1
    elif progress <= 0.0:
        effort = 0.0
    elif progress >= 1.0:
        effort = 1.0
    else:
        effort = (progress - begin) / (1.0 - begin)

    subPipe = self.subPipeline
    correlated = stats.correlated and subPipe and not subPipe.isRunning()

    return controller.SyncStatus(
            correlated=correlated,
            maxChange=self.subtitlesCollector.getMaxSubtitleDiff(stats.formula),
            progress=progress,
            factor=stats.factor,
            points=stats.points,
            formula=stats.formula,
            effort=effort)
def getSynchronizedSubtitles(self)
-
Expand source code
def getSynchronizedSubtitles(self):
    """Return collected subtitles transformed by the current formula.

    Only the formula read is guarded by `statsLock`.
    """
    with self.statsLock:
        currentFormula = self.stats.formula
    collector = self.subtitlesCollector
    return collector.getSynchronizedSubtitles(currentFormula)
def init(self, options, runCb=None)
-
Expand source code
def init(self, options, runCb=None):
    """Initialize synchronization from `options`.

    Delegates to `_initInternal`. A `gizmo.ErrorTerminated` raised there is
    logged and swallowed rather than propagated.
    """
    try:
        self._initInternal(options, runCb)
    except gizmo.ErrorTerminated:
        logger.info('initialization terminated')
def isRunning(self)
-
Expand source code
def isRunning(self):
    """Return `True` if the correlator or any pipeline is still running."""
    correlator = self.correlator
    if correlator and correlator.isRunning():
        return True
    return any(p.isRunning() for p in self.pipelines)
def onRefEos(self)
-
Expand source code
def onRefEos(self):
    """Reference pipeline end-of-stream callback.

    Logs the event and checks whether all pipelines are done.
    """
    logger.info('references read done')
    self.checkIfAllPipelinesDone()
def onRefError(self, err)
-
Expand source code
def onRefError(self, err):
    """Reference pipeline error callback.

    Logs the error on a single line and forwards it to `onError('ref', err)`.
    """
    oneLine = str(err).replace('\n', '; ')
    logger.warning('REF: %r', oneLine)
    self.onError('ref', err)
def onStatsUpdate(self, stats)
-
Expand source code
def onStatsUpdate(self, stats):
    """Correlator statistics callback.

    Stores `stats` and, the first time correlation is reached after the
    subtitle pipeline finished, records the current progress as
    `effortBegin`. Then calls `onUpdate`.
    """
    logger.debug(stats)

    with self.statsLock:
        self.stats = stats
        subPipe = self.subPipeline
        subtitlesDone = subPipe and not subPipe.isRunning()
        if self.effortBegin == None and stats.correlated and subtitlesDone:
            self.effortBegin = self.getProgress()

    # Called outside the lock: statsLock is a non-reentrant threading.Lock
    # and listeners may query getStatus(), which acquires it.
    self.onUpdate()
def onSubEos(self)
-
Expand source code
def onSubEos(self):
    """Subtitle pipeline end-of-stream callback.

    Checks whether all pipelines are done, then records `effortBegin` if
    correlation is already reached and it was not recorded before.
    """
    logger.info('subtitles read done')
    self.checkIfAllPipelinesDone()

    with self.statsLock:
        alreadyMarked = self.effortBegin != None
        if not alreadyMarked and self.stats.correlated:
            self.effortBegin = self.getProgress()
def onSubError(self, err)
-
Expand source code
def onSubError(self, err):
    """Subtitle pipeline error callback.

    Logs the error on a single line and forwards it to `onError('sub', err)`.
    """
    oneLine = str(err).replace('\n', '; ')
    logger.warning('SUB: %r', oneLine)
    self.onError('sub', err)
def start(self)
-
Expand source code
def start(self):
    """Start the correlator thread, then every pipeline.

    Pipelines are named 'Pipeline0', 'Pipeline1', ... in list order.
    """
    logger.info('starting synchronization')
    self.correlator.start('Correlator')
    for idx, pipe in enumerate(self.pipelines):
        pipe.start('Pipeline{}'.format(idx))
def stop(self, force=True)
-
Expand source code
def stop(self, force=True):
    """Stop the correlator (if created) and all pipelines.

    Safe to call after an initialization that was terminated before the
    correlator was created (`self.correlator` still `None`), matching the
    guards used by `destroy` and `isRunning`.
    """
    logger.info('stopping synchronizer')
    if self.correlator:
        self.correlator.stop(force=force)
    for p in self.pipelines:
        p.stop()