Package subsync
Subtitle Speech Synchronizer https://subsync.online
This is an automatic movie subtitle synchronization library.
Expand source code
"""Subtitle Speech Synchronizer https://subsync.online
This is an automatic movie subtitle synchronization library.
"""
import gizmo
from .synchro import SyncController, SyncTask, InputFile, SubFile, RefFile, OutputFile
from .assets import assetManager, AssetList
# Names that make up the public API of the package.
__all__ = [
    'synchronize',
    'version',
    'subsync',
    'SyncController',
    'SyncTask',
    'InputFile',
    'SubFile',
    'RefFile',
    'OutputFile',
    'assetManager',
    'AssetList',
]

# Every name listed here is mapped to False, which tells pdoc to leave the
# corresponding submodule out of the generated documentation.
__pdoc__ = dict.fromkeys(
    [
        'cli',
        'cmdargs',
        'config',
        'data',
        'error',
        'gui',
        'img',
        'loggercfg',
        'pubkey',
        'settings',
        'subtitle',
        'thread',
        'translations',
        'utils',
        'validator',
    ],
    False,
)
def synchronize(sub, ref, out, *, onError=None, options={}, offline=False, updateAssets=True):
    """Synchronize single subtitle file.

    Simplified, blocking, high level synchronization API. For finer-grained
    control (or non-blocking operation) use `SyncController` directly.

    Parameters
    ----------
    sub: SubFile or InputFile or dict
        Input subtitle description, either a proper object instance or a
        `dict` with fields same as arguments to `InputFile` constructor.
    ref: RefFile or InputFile or dict
        Reference description, either a proper object instance or a `dict`
        with fields same as arguments to `InputFile` constructor.
    out: OutputFile or dict
        Output subtitle description, either a proper object instance or a
        `dict` with fields same as arguments to `OutputFile` constructor.
    onError: callable, optional
        Error callback for non-terminal errors. Terminal errors will raise
        exception. For details see `SyncController` constructor.
    options: dict, optional
        Override default synchronization options, see
        `SyncController.configure`. The dictionary is only unpacked, never
        modified.
    offline: bool, optional
        Prevent any communication with asset server. If required assets are
        missing, they will not be downloaded and synchronization will fail.
    updateAssets: bool, optional
        Whether to update existing assets if update is available online. Has
        no effect with `offline`=`True`.

    Returns
    -------
    tuple of (subsync.synchro.controller.SyncJobResult, subsync.synchro.controller.SyncStatus)
        Synchronization result, `(None, None)` if the job end callback was
        never invoked.

    Notes
    -----
    This function runs synchronously - it will block until synchronization
    finishes. For non-blocking synchronization use `SyncController` instead.

    Example
    -------
    >>> sub = { 'path': './sub.srt', 'lang': 'rus' }
    >>> ref = { 'path': './movie.avi', 'stream': 2, 'lang': 'eng' }
    >>> out = { 'path': './out.srt' }
    >>> subsync.synchronize(sub, ref, out)
    (SyncJobResult(success=True, terminated=False, path='out.srt'), SyncStatus(correlated=True, maxChange=0.010168457031248579, progress=0.5212266710947953, factor=0.9999998222916713, points=289, formula=1.0000x+0.010, effort=0.5004523633099243))
    """
    task = SyncTask(sub, ref, out)
    required = assetManager().getAssetsForTasks([task])
    online = not offline

    # Refresh the remote asset list only when it can matter: something is
    # missing, or the caller asked for updates.
    if online and (required.missing() or updateAssets):
        updater = assetManager().getAssetListUpdater()
        if not (updater.isRunning() or updater.isUpdated()):
            updater.run()
        updater.wait()

    # Every asset must be obtainable (locally or from the server).
    required.validate()

    if online:
        pending = required.hasUpdate() if updateAssets else required.notInstalled()
        for asset in pending:
            downloader = asset.downloader()
            try:
                downloader.run()
                downloader.wait(reraise=True)
            except BaseException:
                # Stop the download on any failure or interruption, then
                # let the original exception propagate.
                downloader.terminate()
                raise

    # From here on everything must be installed locally.
    required.validate(localOnly=True)

    # Single-slot holder filled in by the job end callback.
    outcome = [(None, None)]

    def onJobEnd(task, status, result):
        outcome[0] = (result, status)

    sync = SyncController(onJobEnd=onJobEnd, onError=onError)
    try:
        sync.configure(**options)
        sync.synchronize(task)
        sync.wait()
    except BaseException:
        sync.terminate()
        raise

    return outcome[0]
def version():
    """Get subsync version.

    Returns
    -------
    tuple of str
        Tuple of two strings (short, long):

        - short - numbers separated by dot in format `major.minor` or
          `major.minor.revision`;
        - long - descriptive string.

        When the `version` submodule (or one of its names) cannot be
        imported, `(None, 'UNDEFINED')` is returned instead.

    Example
    -------
    >>> subsync.version()
    ('0.15.34', '0.15-34-g5aa31da')
    """
    try:
        from .version import version as long_version, version_short
    except ImportError:
        # Only a missing/incomplete version module is treated as "unknown
        # version"; anything else (KeyboardInterrupt, SystemExit, genuine
        # bugs) is no longer silently swallowed.
        return None, 'UNDEFINED'
    return version_short, long_version
def subsync(argv):
    """Run subsync with list of command line arguments.

    The call is forwarded unchanged to `subsync` from the package's
    `__main__` module and its return value is passed back to the caller.

    .. deprecated:: 0.16.0
        Use either `synchronize` or `SyncController` instead.
    """
    # Imported lazily, only when the deprecated entry point is actually used.
    from .__main__ import subsync as run_cli
    return run_cli(argv)
Sub-modules
subsync.assets
-
Assets management functionality
subsync.synchro
-
Subtitle Synchronizer
Functions
def assetManager()
-
Get AssetManager instance.
Expand source code
def assetManager():
    """Get `subsync.assets.mgr.AssetManager` instance.

    Returns
    -------
    subsync.assets.mgr.AssetManager
        Whatever `AssetManager.instance()` returns; presumably a single
        shared manager object - confirm against `AssetManager`.
    """
    return AssetManager.instance()
def subsync(argv)
-
Run subsync with list of command line arguments.
Deprecated since version: 0.16.0
Use either synchronize() or SyncController instead.
Expand source code
def subsync(argv):
    """Run subsync with list of command line arguments.

    The call is forwarded unchanged to `subsync` from the package's
    `__main__` module and its return value is passed back to the caller.

    .. deprecated:: 0.16.0
        Use either `synchronize` or `SyncController` instead.
    """
    # Imported lazily, only when the deprecated entry point is actually used.
    from .__main__ import subsync as run_cli
    return run_cli(argv)
def synchronize(sub, ref, out, *, onError=None, options={}, offline=False, updateAssets=True)
-
Synchronize single subtitle file.
This is a simplified high-level synchronization API. For finer-grained control use the SyncController object.
Parameters
sub : SubFile or InputFile or dict
- Input subtitle description: a proper object instance or a dict with the same fields as the arguments to the InputFile constructor.
ref : RefFile or InputFile or dict
- Reference description: a proper object instance or a dict with the same fields as the arguments to the InputFile constructor.
out : OutputFile or dict
- Output subtitle description: a proper object instance or a dict with the same fields as the arguments to the OutputFile constructor.
onError : callable, optional
- Error callback for non-terminal errors. Terminal errors will raise an exception. For details see the SyncController constructor.
options : dict, optional
- Override default synchronization options, see SyncController.configure().
offline : bool, optional
- Prevent any communication with the asset server. If required assets are missing, they will not be downloaded and synchronization will fail.
updateAssets : bool, optional
- Whether to update existing assets if an update is available online. Has no effect with offline=True.
Returns
tuple of (SyncJobResult, SyncStatus)
- Synchronization result.
Notes
This function runs synchronously - it will block until synchronization finishes. For non-blocking synchronization use SyncController instead.
Example
>>> sub = { 'path': './sub.srt', 'lang': 'rus' } >>> ref = { 'path': './movie.avi', 'stream': 2, 'lang': 'eng' } >>> out = { 'path': './out.srt' } >>> subsync.synchronize(sub, ref, out) (SyncJobResult(success=True, terminated=False, path='out.srt'), SyncStatus(correlated=True, maxChange=0.010168457031248579, progress=0.5212266710947953, factor=0.9999998222916713, points=289, formula=1.0000x+0.010, effort=0.5004523633099243))
Expand source code
def synchronize(sub, ref, out, *, onError=None, options={}, offline=False, updateAssets=True):
    """Synchronize single subtitle file.

    Simplified, blocking, high level synchronization API. For finer-grained
    control (or non-blocking operation) use `SyncController` directly.

    Parameters
    ----------
    sub: SubFile or InputFile or dict
        Input subtitle description, either a proper object instance or a
        `dict` with fields same as arguments to `InputFile` constructor.
    ref: RefFile or InputFile or dict
        Reference description, either a proper object instance or a `dict`
        with fields same as arguments to `InputFile` constructor.
    out: OutputFile or dict
        Output subtitle description, either a proper object instance or a
        `dict` with fields same as arguments to `OutputFile` constructor.
    onError: callable, optional
        Error callback for non-terminal errors. Terminal errors will raise
        exception. For details see `SyncController` constructor.
    options: dict, optional
        Override default synchronization options, see
        `SyncController.configure`. The dictionary is only unpacked, never
        modified.
    offline: bool, optional
        Prevent any communication with asset server. If required assets are
        missing, they will not be downloaded and synchronization will fail.
    updateAssets: bool, optional
        Whether to update existing assets if update is available online. Has
        no effect with `offline`=`True`.

    Returns
    -------
    tuple of (subsync.synchro.controller.SyncJobResult, subsync.synchro.controller.SyncStatus)
        Synchronization result, `(None, None)` if the job end callback was
        never invoked.

    Notes
    -----
    This function runs synchronously - it will block until synchronization
    finishes. For non-blocking synchronization use `SyncController` instead.

    Example
    -------
    >>> sub = { 'path': './sub.srt', 'lang': 'rus' }
    >>> ref = { 'path': './movie.avi', 'stream': 2, 'lang': 'eng' }
    >>> out = { 'path': './out.srt' }
    >>> subsync.synchronize(sub, ref, out)
    (SyncJobResult(success=True, terminated=False, path='out.srt'), SyncStatus(correlated=True, maxChange=0.010168457031248579, progress=0.5212266710947953, factor=0.9999998222916713, points=289, formula=1.0000x+0.010, effort=0.5004523633099243))
    """
    task = SyncTask(sub, ref, out)
    required = assetManager().getAssetsForTasks([task])
    online = not offline

    # Refresh the remote asset list only when it can matter: something is
    # missing, or the caller asked for updates.
    if online and (required.missing() or updateAssets):
        updater = assetManager().getAssetListUpdater()
        if not (updater.isRunning() or updater.isUpdated()):
            updater.run()
        updater.wait()

    # Every asset must be obtainable (locally or from the server).
    required.validate()

    if online:
        pending = required.hasUpdate() if updateAssets else required.notInstalled()
        for asset in pending:
            downloader = asset.downloader()
            try:
                downloader.run()
                downloader.wait(reraise=True)
            except BaseException:
                # Stop the download on any failure or interruption, then
                # let the original exception propagate.
                downloader.terminate()
                raise

    # From here on everything must be installed locally.
    required.validate(localOnly=True)

    # Single-slot holder filled in by the job end callback.
    outcome = [(None, None)]

    def onJobEnd(task, status, result):
        outcome[0] = (result, status)

    sync = SyncController(onJobEnd=onJobEnd, onError=onError)
    try:
        sync.configure(**options)
        sync.synchronize(task)
        sync.wait()
    except BaseException:
        sync.terminate()
        raise

    return outcome[0]
def version()
-
Get subsync version.
Returns
tuple of str
- Tuple of two strings (short, long):
  - short - numbers separated by dots, in the format major.minor or major.minor.revision;
  - long - descriptive string.
Example
>>> subsync.version() ('0.15.34', '0.15-34-g5aa31da')
Expand source code
def version():
    """Get subsync version.

    Returns
    -------
    tuple of str
        Tuple of two strings (short, long):

        - short - numbers separated by dot in format `major.minor` or
          `major.minor.revision`;
        - long - descriptive string.

        When the `version` submodule (or one of its names) cannot be
        imported, `(None, 'UNDEFINED')` is returned instead.

    Example
    -------
    >>> subsync.version()
    ('0.15.34', '0.15-34-g5aa31da')
    """
    try:
        from .version import version as long_version, version_short
    except ImportError:
        # Only a missing/incomplete version module is treated as "unknown
        # version"; anything else (KeyboardInterrupt, SystemExit, genuine
        # bugs) is no longer silently swallowed.
        return None, 'UNDEFINED'
    return version_short, long_version
Classes
class AssetList (iterable=(), /)
-
List of assets.
Used mainly to select assets required for synchronization.
Expand source code
class AssetList(list):
    """List of assets.

    Used mainly to select assets required for synchronization. Every
    selector returns a new `AssetList` and leaves this one untouched.
    """

    def _where(self, keep):
        """Build a new `AssetList` from assets for which `keep(asset)` is truthy."""
        return AssetList([asset for asset in self if keep(asset)])

    def missing(self):
        """Get missing assets (as new `AssetList`).

        Missing assets are assets that are not available locally nor remotely
        on asset server.
        """
        return self._where(lambda asset: asset.isMissing())

    def hasUpdate(self):
        """Get assets that could be updated (as new `AssetList`).

        These assets are available on asset server and are not installed
        locally or remote version are newer than local (based on version
        number).
        """
        return self._where(lambda asset: asset.hasUpdate())

    def installed(self):
        """Get assets installed locally and ready to use (as new `AssetList`)."""
        return self._where(lambda asset: asset.localVersion())

    def notInstalled(self):
        """Get assets that are not installed locally (as new `AssetList`)."""
        return self._where(lambda asset: not asset.localVersion())

    def validate(self, localOnly=False):
        """Check if all assets on the list are available.

        Parameters
        ----------
        localOnly: bool, optional
            If `True` this method will check if all assets are installed
            locally, otherwise it will check if assets are available either
            locally or on asset server.

        Raises
        ------
        Error
            At least one asset is not available.
        """
        unavailable = self.notInstalled() if localOnly else self.missing()
        if not unavailable:
            return

        speechAssets = [asset for asset in unavailable if asset.type == 'speech']
        dictAssets = [asset for asset in unavailable if asset.type == 'dict']
        lines = []

        if speechAssets:
            names = ', '.join([languages.getName(a.params[0]) for a in speechAssets])
            lines.append(
                    _('Synchronization with {} audio is currently not supported.')
                    .format(names))

        if dictAssets:
            pairs = [' - '.join([languages.getName(p) for p in a.params])
                    for a in dictAssets]
            lines.append(
                    _('Synchronization between languages {} is currently not supported.')
                    .format(', '.join(pairs)))

        lines.append('')
        lines.append(_('missing assets:'))
        lines.extend([' - ' + asset.getPrettyName() for asset in unavailable])
        raise error.Error('\n'.join(lines))
Ancestors
- builtins.list
Methods
def hasUpdate(self)
-
Get assets that could be updated (as new AssetList).
These assets are available on the asset server and either are not installed locally or have a remote version newer than the local one (based on version number).
Expand source code
def hasUpdate(self):
    """Get assets that could be updated (as new `AssetList`).

    These assets are available on asset server and are not installed locally
    or remote version are newer than local (based on version number).
    """
    updatable = AssetList()
    for asset in self:
        if asset.hasUpdate():
            updatable.append(asset)
    return updatable
def installed(self)
-
Get assets installed locally and ready to use (as new AssetList).
Expand source code
def installed(self):
    """Get assets installed locally and ready to use (as new `AssetList`).

    An asset counts as installed when its `localVersion()` is truthy.
    """
    ready = AssetList()
    for asset in self:
        if asset.localVersion():
            ready.append(asset)
    return ready
def missing(self)
-
Get missing assets (as new AssetList).
Missing assets are assets that are available neither locally nor remotely on the asset server.
Expand source code
def missing(self):
    """Get missing assets (as new `AssetList`).

    Missing assets are assets that are not available locally nor remotely on
    asset server.
    """
    absent = AssetList()
    for asset in self:
        if asset.isMissing():
            absent.append(asset)
    return absent
def notInstalled(self)
-
Get assets that are not installed locally (as new AssetList).
Expand source code
def notInstalled(self):
    """Get assets that are not installed locally (as new `AssetList`).

    An asset counts as not installed when its `localVersion()` is falsy.
    """
    absent = AssetList()
    for asset in self:
        if not asset.localVersion():
            absent.append(asset)
    return absent
def validate(self, localOnly=False)
-
Check if all assets on the list are available.
Parameters
localOnly : bool, optional
- If True, this method will check whether all assets are installed locally; otherwise it will check whether assets are available either locally or on the asset server.
Raises
Error
- At least one asset is not available.
Expand source code
def validate(self, localOnly=False):
    """Check if all assets on the list are available.

    Parameters
    ----------
    localOnly: bool, optional
        If `True` this method will check if all assets are installed
        locally, otherwise it will check if assets are available either
        locally or on asset server.

    Raises
    ------
    Error
        At least one asset is not available.
    """
    unavailable = self.notInstalled() if localOnly else self.missing()
    if not unavailable:
        return

    speechAssets = [asset for asset in unavailable if asset.type == 'speech']
    dictAssets = [asset for asset in unavailable if asset.type == 'dict']
    lines = []

    if speechAssets:
        names = ', '.join([languages.getName(a.params[0]) for a in speechAssets])
        lines.append(
                _('Synchronization with {} audio is currently not supported.')
                .format(names))

    if dictAssets:
        pairs = [' - '.join([languages.getName(p) for p in a.params])
                for a in dictAssets]
        lines.append(
                _('Synchronization between languages {} is currently not supported.')
                .format(', '.join(pairs)))

    lines.append('')
    lines.append(_('missing assets:'))
    lines.extend([' - ' + asset.getPrettyName() for asset in unavailable])
    raise error.Error('\n'.join(lines))
class InputFile (path=None, stream=None, *, streamByType=None, streamByLang=None, lang=None, enc=None, fps=None, channels=None)
-
Subtitle/reference input file.
You should use either SubFile or RefFile instead of this base class.
Parameters
path : str
- Input file path.
stream : int, optional
- Stream number, 1-based.
streamByType : str, optional
- Select stream by type ('sub' or 'audio').
streamByLang : str, optional
- Select stream by 3-letter language code; relies on file metadata.
lang : str, optional
- Stream language as 2- or 3-letter ISO code.
enc : str, optional
- Character encoding, None for auto detection based on lang.
fps : float, optional
- Framerate.
channels : str or ChannelsMap, optional
- Audio channels to listen to, could be 'auto' or None for auto selection, 'all' to listen to all channels, or comma separated channel abbreviations, e.g. 'FL,FR'.
Notes
Here the stream number is 1-based (the first stream in a file has number 1), but other methods expect 0-based numbers.
Expand source code
class InputFile(object):
    """Subtitle/reference input file.

    You should use either `SubFile` or `RefFile` instead of this base class.
    """

    def __init__(self, path=None, stream=None, *, streamByType=None,
            streamByLang=None, lang=None, enc=None, fps=None, channels=None):
        """
        Parameters
        ----------
        path: str
            Input file path.
        stream: int, optional
            Stream number, 1-based.
        streamByType: str, optional
            Select stream by type ('sub' or 'audio').
        streamByLang: str, optional
            Select stream by 3-letter language code, relies on file metadata.
        lang: str, optional
            Stream language as 2- or 3-letter ISO code.
        enc: str, optional
            Character encoding, `None` for auto detection based on `lang`.
        fps: float, optional
            Framerate.
        channels: str or subsync.synchro.channels.ChannelsMap, optional
            Audio channels to listen to, could be 'auto' or `None` for auto
            selection, 'all' to listen to all channels, or comma separated
            channels abbreviation, e.g. 'FL,FR'.

        Notes
        -----
        Here stream number is 1-based (first stream in file has number 1) but
        other methods expects 0-based numbers.
        """
        self.path = path
        self.no = None
        self.type = None
        self.streams = {}
        self.fps = None
        self.lang = None
        self.enc = None
        self.channels = ChannelsMap.auto()
        self.filetype = None

        # Subclasses set `types` before delegating here; keep their value.
        if not hasattr(self, 'types'):
            self.types = None

        if path is not None:
            self.open(path)

        if stream is not None:
            # Public API is 1-based, internal stream numbers are 0-based.
            self.select(stream - 1)

        if streamByType or streamByLang:
            self.selectBy(type=streamByType, lang=streamByLang)

        # Explicit arguments win over values detected from the file.
        self.lang = lang or self.lang
        self.enc = enc or self.enc
        self.fps = fps or self.fps

        # isinstance() also accepts str subclasses, unlike `type(x) is str`.
        if isinstance(channels, str):
            self.channels = ChannelsMap.deserialize(channels)
        elif channels is not None:
            self.channels = channels

    def __lt__(self, other):
        # Order by path first, then by selected stream number.
        if self.path != other.path:
            return self.path < other.path
        return self.no < other.no

    def stream(self):
        """Return selected stream (`None` when no stream is selected)."""
        if self.no is not None:
            return self.streams[self.no]

    def open(self, path):
        """Open input file."""
        ss = gizmo.Demux(path).getStreamsInfo()
        streams = {s.no: s for s in ss}

        # File type is the first of these types present among the streams.
        for t in [ 'video', 'audio', 'subtitle/text' ]:
            if t in [ s.type for s in ss ]:
                self.filetype = t
                break

        self.path = path
        self.streams = streams
        self.no = None
        self.lang = None
        self.enc = None
        self.channels = ChannelsMap.auto()
        self.fps = None

        # Use the first non-empty frame rate reported by any stream.
        for stream in ss:
            if stream.frameRate:
                self.fps = stream.frameRate
                break

        if len(streams) > 0:
            self.selectFirstMatchingStream()

    def select(self, no):
        """Select stream by number (0-based) and return it."""
        stream = self.streams[no]
        self.no = no
        self.type = stream.type
        self.lang = stream.lang

        # No usable language in the stream: fall back to the file path.
        if not self.lang or self.lang.lower() == 'und':
            self.lang = getLangFromPath(self.path)
        return stream

    def selectBy(self, type=None, lang=None):
        """Select stream by type and language (or only one of them).

        Returns the selected stream; raises `Error` when no stream matches.
        """
        for s in self.streams.values():
            if self.types and s.type not in self.types:
                continue
            if type and not s.type.startswith(type):
                continue
            # A stream may carry no language at all (see `select`); such a
            # stream simply does not match instead of crashing on `.lower()`.
            if lang and lang != (s.lang or '').lower():
                continue
            return self.select(s.no)

        raise Error(_('There is no matching stream in {}').format(self.path)) \
                .addn('path', self.path) \
                .addn('type', type) \
                .addn('language', lang)

    def selectFirstMatchingStream(self, types=None):
        """Select first stream of a supported type.

        When `types` is given it replaces `self.types`. Types are tried in
        their listed order, streams in ascending number order. Returns the
        selected stream or `None` when nothing matches.
        """
        if types is not None:
            self.types = types
        if not self.streams:
            return None
        if self.types is None:
            return self.select(min(self.streams))
        for t in self.types:
            for no in sorted(self.streams):
                if self.streams[no].type == t:
                    return self.select(no)

    def hasMatchingStream(self, types=None):
        """Check whether any stream has one of `types` (default `self.types`)."""
        types = types or self.types
        if types is None:
            return len(self.streams) > 0
        for s in self.streams.values():
            if s.type in types:
                return True
        return False

    def isOpen(self):
        """Check whether file is opened."""
        return self.path is not None

    def isSelect(self):
        """Check whether stream is selected."""
        return self.path is not None and self.no is not None

    def serialize(self):
        """Return `dict` of the set fields, usable as constructor arguments."""
        res = {}
        if self.path:
            res['path'] = self.path
        if self.no is not None:
            # Back to the 1-based numbering used by the constructor.
            res['stream'] = self.no + 1
        if self.lang:
            res['lang'] = self.lang
        if self.enc:
            res['enc'] = self.enc
        if self.fps:
            res['fps'] = self.fps
        if self.channels and self.channels.type != 'auto':
            res['channels'] = self.channels.serialize()
        return res

    def __repr__(self):
        return utils.fmtobj(self.__class__.__name__,
                path = '{}:{}/{}'.format(self.path, self.no, len(self.streams)),
                type = self.type,
                lang = self.lang,
                enc = self.enc,
                fps = self.fps,
                channels = self.channels if self.channels.type != 'auto' else None)

    def __str__(self):
        return utils.fmtstr(
                '{}:{}/{}'.format(self.path, self.no, len(self.streams)),
                type = self.type,
                lang = self.lang,
                enc = self.enc,
                fps = self.fps,
                channels = self.channels if self.channels.type != 'auto' else None)
Subclasses
Methods
def hasMatchingStream(self, types=None)
-
Expand source code
def hasMatchingStream(self, types=None):
    """Check whether the file has a stream of one of the given types.

    `types` defaults to `self.types`; when neither is set, any stream at all
    counts as a match.
    """
    wanted = types or self.types
    if wanted is None:
        return len(self.streams) > 0
    return any(s.type in wanted for s in self.streams.values())
def isOpen(self)
-
Check whether file is opened.
Expand source code
def isOpen(self):
    """Check whether file is opened.

    A file counts as opened as soon as `self.path` is set (anything other
    than `None`, including an empty string).
    """
    # Identity test is the reliable way to compare against None.
    return self.path is not None
def isSelect(self)
-
Check whether stream is selected.
Expand source code
def isSelect(self):
    """Check whether stream is selected.

    True only when both a path is set and a stream number is chosen; stream
    number 0 is a valid selection.
    """
    # Identity tests: `no` may legitimately be 0, which must count as set.
    return self.path is not None and self.no is not None
def open(self, path)
-
Open input file.
Expand source code
def open(self, path):
    """Open input file.

    Reads stream information for `path`, resets the current selection and
    detected properties, then selects the first matching stream if the file
    has any streams.
    """
    infos = gizmo.Demux(path).getStreamsInfo()
    byNumber = {info.no: info for info in infos}

    # File type is the first of these types present among the streams.
    present = [info.type for info in infos]
    for kind in ('video', 'audio', 'subtitle/text'):
        if kind in present:
            self.filetype = kind
            break

    self.path = path
    self.streams = byNumber
    self.no = None
    self.lang = None
    self.enc = None
    self.channels = ChannelsMap.auto()
    self.fps = None

    # Use the first non-empty frame rate reported by any stream.
    for info in infos:
        if info.frameRate:
            self.fps = info.frameRate
            break

    if len(byNumber) > 0:
        self.selectFirstMatchingStream()
def select(self, no)
-
Select stream by number.
Expand source code
def select(self, no):
    """Select stream by number.

    `no` is a key of `self.streams`. Updates the current stream number, type
    and language, and returns the selected stream. When the stream has no
    language (or it is 'und'), the language is derived from the file path.
    """
    chosen = self.streams[no]
    self.no = no
    self.type = chosen.type
    self.lang = chosen.lang

    undefined = not self.lang or self.lang.lower() == 'und'
    if undefined:
        self.lang = getLangFromPath(self.path)
    return chosen
def selectBy(self, type=None, lang=None)
-
Select stream by type and language (or only one of them).
Expand source code
def selectBy(self, type=None, lang=None):
    """Select stream by type and language (or only one of them).

    Streams are checked in `self.streams` order and the first match is
    selected through `self.select`.

    Parameters
    ----------
    type: str, optional
        Prefix the stream type must start with.
    lang: str, optional
        Language code compared against the lower-cased stream language.

    Returns
    -------
    The stream returned by `self.select` for the first matching stream.

    Raises
    ------
    Error
        No stream matches the given criteria.
    """
    for s in self.streams.values():
        if self.types and s.type not in self.types:
            continue
        if type and not s.type.startswith(type):
            continue
        # A stream may carry no language at all; treat it as a non-match
        # instead of crashing on `None.lower()`.
        if lang and lang != (s.lang or '').lower():
            continue
        return self.select(s.no)

    raise Error(_('There is no matching stream in {}').format(self.path)) \
            .addn('path', self.path) \
            .addn('type', type) \
            .addn('language', lang)
def selectFirstMatchingStream(self, types=None)
-
Expand source code
def selectFirstMatchingStream(self, types=None):
    """Select the first stream of a supported type.

    Parameters
    ----------
    types: iterable of str, optional
        When given, replaces `self.types` before the search.

    Returns
    -------
    The result of `self.select` for the chosen stream, or `None` when there
    are no streams or none has a supported type. With no type restriction
    the stream with the lowest number is selected; otherwise types are tried
    in their listed order and streams in ascending number order.
    """
    if types is not None:
        self.types = types

    # Covers both "no stream table" (None) and an empty one.
    if not self.streams:
        return None

    if self.types is None:
        return self.select(min(self.streams))

    for wanted in self.types:
        for no in sorted(self.streams):
            if self.streams[no].type == wanted:
                return self.select(no)
    return None
def serialize(self)
-
Expand source code
def serialize(self):
    """Return a `dict` with the fields that are set.

    Keys are 'path', 'stream' (1-based), 'lang', 'enc', 'fps' and
    'channels'; unset/empty fields and 'auto' channels are left out.
    """
    data = {}
    if self.path:
        data['path'] = self.path
    if self.no is not None:
        # Stored 0-based, exposed 1-based.
        data['stream'] = self.no + 1
    for key in ('lang', 'enc', 'fps'):
        value = getattr(self, key)
        if value:
            data[key] = value
    if self.channels and self.channels.type != 'auto':
        data['channels'] = self.channels.serialize()
    return data
def stream(self)
-
Return selected stream.
Expand source code
def stream(self):
    """Return selected stream, or `None` when no stream is selected."""
    if self.no is None:
        return None
    return self.streams[self.no]
class OutputFile (path=None, *, enc=None, fps=None)
-
Subtitle target description - specifies where output should be saved.
Parameters
path : str
- Output path or pattern. Path could be literal or may contain the following variables:
  - {sub_path}/{ref_path} - subtitle/reference full path;
  - {sub_no}/{ref_no} - stream number;
  - {sub_lang}/{ref_lang} - 3-letter language code;
  - {sub_name}/{ref_name} - file name (without path and extension);
  - {sub_dir}/{ref_dir} - directory path;
  - {if:<field>:<value>} - if field is set, append value;
  - {if_not:<field>:<value>} - if field is not set, append value.
enc : str, optional
- Character encoding, default is 'UTF-8'.
fps : float, optional
- Framerate, applies only to frame-based subtitles.
Notes
Subtitle format is derived from file extension, thus path must end with one of the supported extensions.
Examples
{ref_dir}/{ref_name}{if:sub_lang:.}{sub_lang}.srt
{sub_dir}/{sub_name}-out.ssa
Expand source code
class OutputFile(object):
    """Subtitle target description - specifies where output should be saved."""

    def __init__(self, path=None, *, enc=None, fps=None):
        """
        Parameters
        ----------
        path: str
            Output path or pattern. Path could be literal or may contain
            following variables:

              - `{sub_path}`/`{ref_path}` - subtitle/reference full path;
              - `{sub_no}`/`{ref_no}` - stream number;
              - `{sub_lang}`/`{ref_lang}` - 3-letter language code;
              - `{sub_name}`/`{ref_name}` - file name (without path and extension);
              - `{sub_dir}`/`{ref_dir}` - directory path;
              - `{if:<field>:<value>}` - if field is set, append value;
              - `{if_not:<field>:<value>}` - if field is not set, append value.
        enc: str, optional
            Character encoding, default is 'UTF-8'.
        fps: float, optional
            Framerate, applies only for frame-based subtitles.

        Notes
        -----
        Subtitle format is derived from file extension, thus path must end
        with one of the supported extensions.

        Examples
        --------
        `{ref_dir}/{ref_name}{if:sub_lang:.}{sub_lang}.srt`

        `{sub_dir}/{sub_name}-out.ssa`
        """
        self.path = path
        self.enc = enc or 'UTF-8'
        self.fps = fps
        # Created on first use by `getPath`.
        self.pathFormatter = None

    def getPath(self, sub, ref):
        """Compile path pattern for given `sub` and `ref`."""
        formatter = self.pathFormatter
        if formatter is None:
            formatter = PathFormatter()
            self.pathFormatter = formatter
        return formatter.format(self.path, sub, ref)

    def validateOutputPattern(self):
        """Raise exception for invalid path pattern."""
        validatePattern(self.path)

    def serialize(self):
        """Return a `dict` with the set fields among 'path', 'enc' and 'fps'."""
        data = {}
        for key in ('path', 'enc', 'fps'):
            value = getattr(self, key)
            if value:
                data[key] = value
        return data

    def __repr__(self):
        return utils.fmtobj(
                self.__class__.__name__,
                path=self.path,
                enc=self.enc,
                fps=self.fps)

    def __str__(self):
        return utils.fmtstr(self.path, enc=self.enc, fps=self.fps)
Methods
def getPath(self, sub, ref)
-
Compile path pattern for given sub and ref.
Expand source code
def getPath(self, sub, ref):
    """Compile path pattern for given `sub` and `ref`.

    The `PathFormatter` is created on first use and kept in
    `self.pathFormatter` for subsequent calls.
    """
    formatter = self.pathFormatter
    if formatter is None:
        formatter = PathFormatter()
        self.pathFormatter = formatter
    return formatter.format(self.path, sub, ref)
def serialize(self)
-
Expand source code
def serialize(self):
    """Return a `dict` with the set fields among 'path', 'enc' and 'fps'.

    Fields whose value is falsy are left out.
    """
    data = {}
    for key in ('path', 'enc', 'fps'):
        value = getattr(self, key)
        if value:
            data[key] = value
    return data
def validateOutputPattern(self)
-
Raise exception for invalid path pattern.
Expand source code
def validateOutputPattern(self):
    """Raise exception for invalid path pattern.

    Delegates the check of `self.path` to `validatePattern`; the exception
    type is determined by that helper. Returns nothing.
    """
    validatePattern(self.path)
class RefFile (*args, **kw)
-
Reference file.
Parameters
path : str
- Input file path.
stream : int, optional
- Stream number, 1-based.
streamByType : str, optional
- Select stream by type ('sub' or 'audio').
streamByLang : str, optional
- Select stream by 3-letter language code; relies on file metadata.
lang : str, optional
- Stream language as 2- or 3-letter ISO code.
enc : str, optional
- Character encoding, None for auto detection based on lang.
fps : float, optional
- Framerate.
channels : str or ChannelsMap, optional
- Audio channels to listen to, could be 'auto' or None for auto selection, 'all' to listen to all channels, or comma separated channel abbreviations, e.g. 'FL,FR'.
Notes
Here the stream number is 1-based (the first stream in a file has number 1), but other methods expect 0-based numbers.
Expand source code
class RefFile(InputFile):
    """Reference file.

    An `InputFile` restricted to the stream types listed in `types`.
    Constructor arguments are forwarded unchanged to `InputFile`.
    """

    types = ('subtitle/text', 'audio')
    """Supported stream types."""

    def __init__(self, *args, **kw):
        # Set before the base constructor runs: InputFile.__init__ only
        # assigns a default `types` when the attribute is not present yet.
        self.types = RefFile.types
        super().__init__(*args, **kw)
Ancestors
Class variables
var types
-
Supported stream types.
Inherited members
class SubFile (*args, **kw)
-
Input subtitle file.
Parameters
path : str
- Input file path.
stream : int, optional
- Stream number, 1-based.
streamByType : str, optional
- Select stream by type ('sub' or 'audio').
streamByLang : str, optional
- Select stream by 3-letter language code; relies on file metadata.
lang : str, optional
- Stream language as 2- or 3-letter ISO code.
enc : str, optional
- Character encoding, None for auto detection based on lang.
fps : float, optional
- Framerate.
channels : str or ChannelsMap, optional
- Audio channels to listen to, could be 'auto' or None for auto selection, 'all' to listen to all channels, or comma separated channel abbreviations, e.g. 'FL,FR'.
Notes
Here the stream number is 1-based (the first stream in a file has number 1), but other methods expect 0-based numbers.
Expand source code
class SubFile(InputFile):
    """Input subtitle file.

    An `InputFile` restricted to the stream types listed in `types`.
    Constructor arguments are forwarded unchanged to `InputFile`.
    """

    types = ('subtitle/text',)
    """Supported stream types."""

    def __init__(self, *args, **kw):
        # Set before the base constructor runs: InputFile.__init__ only
        # assigns a default `types` when the attribute is not present yet.
        self.types = SubFile.types
        super().__init__(*args, **kw)
Ancestors
Class variables
var types
-
Supported stream types.
Inherited members
class SyncController (listener=None, **kw)
-
Synchronization controller.
Controls subtitle synchronization process. Synchronization is performed asynchronously in separate threads controlled internally. Status is reported by optional callbacks.
Parameters
listener : object, optional
- Object with methods named like the following callback parameters.
onJobStart : callable(task), optional
- Called when the task synchronization starts.
onJobEnd : callable(task, status, result), optional
- Called when the task synchronization ends.
onJobUpdate : callable(task, status), optional
- Called periodically during synchronization to report the current state.
onFinish : callable(terminated), optional
- Called when the synchronizer ends its work - either all tasks are processed or synchronization was terminated.
onError : callable(task, source, error), optional
- Called to report non-terminal errors. Non-terminal errors are errors that do not cause the synchronizer to stop but could impact results, e.g. audio decoding errors.
Notes
Callback arguments:
- task : SyncTask - object passed to SyncController.synchronize();
- status : SyncStatus - object reflecting the current synchronization state;
- result : SyncJobResult;
- terminated : bool - whether synchronization was terminated;
- source : str - error source, could be 'sub' or 'ref' for subtitle/reference extractor or 'core';
- error - exception instance.
Expand source code
class SyncController(object):
    """Synchronization controller.

    Controls subtitle synchronization process. Synchronization is performed
    asynchronously in separate threads controlled internally. Status is
    reported by optional callbacks.
    """

    def __init__(self, listener=None, **kw):
        """
        Parameters
        ----------
        listener: object, optional
            Object with methods named as following callback parameters.
        onJobStart: callable(task), optional
            Called when the task synchronization starts.
        onJobInit: callable(task), optional
            Called once the synchronizer of the task was initialized.
        onJobEnd: callable(task, status, result), optional
            Called when the task synchronization ends.
        onJobUpdate: callable(task, status), optional
            Called periodically during synchronization to report current
            state.
        onFinish: callable(terminated), optional
            Called when synchronizer ends its work - either all tasks are
            processed or synchronization was terminated;
        onError: callable(task, source, error), optional
            Called to report non-terminal errors. Non-terminal errors are
            errors not causing synchronizer to stop, but could impact results,
            e.g. audio decoding errors.

        Notes
        -----
        Callbacks arguments:

        - task : `subsync.synchro.SyncTask` - object passed to
          `SyncController.synchronize`;
        - status : `SyncStatus` - object reflecting current synchronization
          state;
        - result : `SyncJobResult`;
        - terminated : `bool` - whether synchronization was terminated;
        - source : `str` - error source, could be 'sub' or 'ref' for
          subtitle/reference extractor or 'core';
        - error - exception instance.
        """
        # Callback lookup order: explicit keyword argument, then a method of
        # the same name on `listener`, then a no-op.
        self._onJobStart = kw.get('onJobStart',
                getattr(listener, 'onJobStart', lambda task: None))
        self._onJobInit = kw.get('onJobInit',
                getattr(listener, 'onJobInit', lambda task: None))
        self._onJobEnd = kw.get('onJobEnd',
                getattr(listener, 'onJobEnd', lambda task, status, result: None))
        self._onJobUpdate = kw.get('onJobUpdate',
                getattr(listener, 'onJobUpdate', lambda task, status: None))
        self._onFinish = kw.get('onFinish',
                getattr(listener, 'onFinish', lambda terminated: None))
        self._onError = kw.get('onError',
                getattr(listener, 'onError', lambda task, source, error: None))

        self._options = settings().getSynchronizationOptions()
        self._thread = None
        self._semaphore = threading.Semaphore()
        self._sync = None
        self._terminated = False

    def configure(self, **kw):
        """Override default synchronization options.

        Parameters
        ----------
        maxPointDist: float, optional
            Maximum acceptable synchronization error, in seconds (default 2).
        minPointsNo: int, optional
            Minimum synchronization points no (default 20).
        outputCharEnc: str or None, optional
            Output character encoding, `None` for the same encoding as input
            subtitles (default 'UTF-8').
        windowSize: float, optional
            Synchronization window, in seconds. Timestamps will be corrected
            no more than this value (default 1800).
        minWordProb: float, optional
            Minimum speech recognition score, between 0 and 1 (default 0.3).
        jobsNo: int or None, optional
            Number of concurrent synchronization threads, `None` for auto
            (default `None`).
        minWordLen: int, optional
            Minimum word length, in letters. Shorter words will not be used
            as synchronization points (default 5).
        minCorrelation: float, optional
            Minimum correlation factor, between 0 and 1 (default 0.9999).
        minWordsSim: float, optional
            Minimum words similarity to be used as synchronization points,
            between 0 and 1 (default 0.6).
        minEffort: float, optional
            Controls when synchronization should be stopped, between 0 and 1
            (default 0.5).
        outTimeOffset: float, optional
            Add constant offset to output subtitles, in seconds (default 0).
        overwrite: bool, optional
            Whether to overwrite existing output files. If not set, output
            file name will be suffixed with number if needed, to avoid
            overwriting (default `False`).

        Raises
        ------
        TypeError
            An option name is not one of the known synchronization options.
        """
        for key, val in kw.items():
            if key not in self._options:
                raise TypeError("Unexpected keyword argument '{}'".format(key))
            self._options[key] = val

    def synchronize(self, tasks, *, timeout=None, interactive=False):
        """Start task[s] synchronization.

        Parameters
        ----------
        tasks: SyncTask or iterable of SyncTask
            Single or several synchronization tasks.
        timeout: float or None, optional
            How often to call `onJobUpdate` callback, in seconds (if
            registered). `None` to not call it at all.
        interactive: bool, optional
            In interactive, tasks subtitles are not saved automatically
            (`minEffort` and task `out` field are ignored) and user is
            expected to save it manually either with
            `saveSynchronizedSubtitles` or `getSynchronizedSubtitles`.

        Raises
        ------
        RuntimeError
            Another synchronization is already in progress.

        Notes
        -----
        Synchronization status changes are notified with callbacks registered
        in constructor. `onFinish` is called only when `tasks` is iterable (as
        opposed to single `SyncTask` object).
        """
        if self.isRunning():
            raise RuntimeError('Another synchronization in progress')

        logger.debug('synchronization options: %s', self._options)
        self._terminated = False

        if isinstance(tasks, Iterable):
            # Materialize first: validation below iterates the tasks, which
            # would leave a one-shot iterable (e.g. a generator) empty for the
            # worker thread, and the worker also needs the task count.
            tasks = list(tasks)
            for task in tasks:
                self.validateTask(task, interactive=interactive)

            self._thread = threading.Thread(
                    target=self._run,
                    args=(tasks, timeout, interactive),
                    name='Synchronizer')
        else:
            self.validateTask(tasks, interactive=interactive)
            self._thread = threading.Thread(
                    target=self._runTask,
                    args=(tasks, timeout, interactive),
                    name='Synchronizer')

        self._thread.start()

    def terminate(self):
        """Terminate running synchronization.

        Does nothing if synchronization is not running.
        """
        if self.isRunning():
            self._terminated = True
            # Wake the worker loop waiting on the semaphore so it notices the
            # termination flag without waiting for the next update.
            self._semaphore.release()

    def isRunning(self):
        """Check if synchronization is running.

        Returns
        -------
        bool
        """
        # Explicit comparison keeps the documented `bool` result even when no
        # synchronization was ever started (thread is `None`).
        return self._thread is not None and self._thread.is_alive()

    def wait(self):
        """Block until synchronization ends.

        Returns
        -------
        bool
            `True` if synchronization finished (either successfully or not),
            `False` if it was terminated with `SyncController.terminate`.
        """
        if self._thread:
            self._thread.join()
        return not self._terminated

    def getStatus(self):
        """Return current synchronization status.

        Returns
        -------
        SyncStatus or None
        """
        return self._sync and self._sync.getStatus()

    def getProgress(self):
        """Return synchronization progress.

        Returns
        -------
        float or None
            Progress of currently processed `SyncTask`, between 0 and 1.
            `None` if no task was run.
        """
        return self._sync and self._sync.getProgress()

    def getSynchronizedSubtitles(self):
        """Return subtitles produced by the most recently run task.

        Raises
        ------
        RuntimeError
            No task was run yet.
        """
        if self._sync is None:
            raise RuntimeError('Subtitles not synchronized')
        return self._sync.getSynchronizedSubtitles()

    def saveSynchronizedSubtitles(self, path=None, task=None):
        """Save synchronized subtitles.

        Parameters
        ----------
        path: str, optional
            Path to output subtitles.
        task: SyncTask, optional
            `out` field of task will be used as output description.

        Raises
        ------
        RuntimeError
            Neither `path` nor `task` is set, or no task was run yet.

        Notes
        -----
        At least one parameter must be set. Subtitles will be saved in `path`
        or in `task`.out.path if `path` is not set. `task`.out character
        encoding and framerate will be used, if set.
        """
        if not path and not task:
            raise RuntimeError('At least one of the following arguments must be set: path or task')

        subs = self.getSynchronizedSubtitles()
        offset = self._options.get('outTimeOffset')
        if offset:
            logger.info('adjusting timestamps by offset %.3f', offset)
            subs.shift(s=offset)

        # Encoding precedence: task output, configured option, input subtitle
        # encoding, and finally UTF-8.
        enc = (task and task.out and task.out.enc) \
                or self._options.get('outputCharEnc') \
                or (task and task.sub and task.sub.enc) or 'UTF-8'

        return subs.save(
                path=path or task.getOutputPath(),
                encoding=enc,
                fps=task and task.out and task.out.fps,
                overwrite=self._options.get('overwrite'))

    def validateTask(self, task, *, interactive=False):
        """Check if task is properly defined.

        Parameters
        ----------
        task: SyncTask
            Task to validate.
        interactive: bool, optional
            For interactive synchronization `out` will not be validated.

        Raises
        ------
        Error
            Invalid task.
        KeyError or Exception
            Invalid `task`.out.path pattern.
        """
        sub, ref, out = task.sub, task.ref, task.out
        if sub is None or not sub.path or sub.no is None:
            raise Error('subtitles not set', task=task)
        if ref is None or not ref.path or ref.no is None:
            raise Error('reference file not set', task=task)
        if not interactive and (out is None or not out.path):
            raise Error('output path not set', task=task)
        if not interactive:
            out.validateOutputPattern()

    def _run(self, tasks, timeout, interactive):
        """Worker thread body: run tasks one by one, then call `onFinish`."""
        try:
            # Count from 1 so the log reads "1/N" .. "N/N".
            for no, task in enumerate(tasks, start=1):
                if not self._terminated:
                    logger.info('running task %i/%i: %r', no, len(tasks), task)
                    self._runTask(task, timeout, interactive)
                else:
                    break

        except Exception as err:
            logger.warning('%r', err, exc_info=True)
            self._onError(None, 'core', err)

        finally:
            logger.info('synchronization finished')
            self._onFinish(self._terminated)

    def _runTask(self, task, timeout, interactive):
        """Synchronize single task and report its result with `onJobEnd`."""
        # Stays `None` when the synchronizer could not be imported or
        # constructed, so the cleanup below does not touch an unbound name.
        sync = None
        try:
            from .synchronizer import Synchronizer
            self._onJobStart(task)
            self._sync = sync = Synchronizer(task.sub, task.ref)
            sync.onUpdate = self._semaphore.release
            sync.onError = lambda src, err: self._onError(task, src, err)

            sync.init(self._options, runCb=lambda: not self._terminated)
            if not self._terminated:
                sync.start()

            self._onJobInit(task)

            status = sync.getStatus()
            minEffort = self._options.get('minEffort', 1.0)
            if timeout is not None:
                # Back-date the timestamp so the first update fires at once.
                lastTime = time.monotonic() - timeout

            while not self._terminated and sync.isRunning() \
                    and (interactive or minEffort >= 1.0 or status.effort < minEffort):
                self._semaphore.acquire(timeout=timeout)
                status = sync.getStatus()

                if timeout is not None:
                    now = time.monotonic()
                    if now - lastTime >= timeout:
                        lastTime = now
                        self._onJobUpdate(task, status)

        except Exception as err:
            logger.warning('%r', err, exc_info=True)
            self._onError(task, 'core', err)

        try:
            status = None
            if sync is not None:
                sync.stop(force=True)
                status = sync.getStatus()
            logger.info('result: %r', status)
            succeeded = not self._terminated and status and status.correlated
            path = None

            if not interactive and succeeded and task.out:
                try:
                    path = self.saveSynchronizedSubtitles(task=task)

                except Exception as err:
                    logger.warning('subtitle save failed: %r', err, exc_info=True)
                    self._onError(task, 'core', err)
                    succeeded = False

            res = SyncJobResult(succeeded, self._terminated, path)
            self._onJobEnd(task, status, res)

        except Exception as err:
            logger.warning('%r', err, exc_info=True)
            self._onError(task, 'core', err)

        finally:
            if sync is not None:
                sync.destroy()
            logger.info('task finished %r', task)
Methods
def configure(self, **kw)
-
Override default synchronization options.
Parameters
maxPointDist
:float
, optional- Maximum acceptable synchronization error, in seconds (default 2).
minPointsNo
:int
, optional- Minimum synchronization points no (default 20).
outputCharEnc
:str
orNone
, optional- Output character encoding,
None
for the same encoding as input subtitles (default 'UTF-8'). windowSize
:float
, optional- Synchronization window, in seconds. Timestamps will be corrected no more than this value (default 1800).
minWordProb
:float
, optional- Minimum speech recognition score, between 0 and 1 (default 0.3).
jobsNo
:int
orNone
, optional- Number of concurrent synchronization threads,
None
for auto (defaultNone
). minWordLen
:int
, optional- Minimum word length, in letters. Shorter words will not be used as synchronization points (default 5).
minCorrelation
:float
, optional- Minimum correlation factor, between 0 and 1 (default 0.9999).
minWordsSim
:float
, optional- Minimum words similarity to be used as synchronization points, between 0 and 1 (default 0.6).
minEffort
:float
, optional- Controls when synchronization should be stopped, between 0 and 1 (default 0.5).
outTimeOffset
:float
, optional- Add constant offset to output subtitles, in seconds (default 0).
overwrite
:bool
, optional- Whether to overwrite existing output files. If not set, output file
name will be suffixed with number if needed, to avoid overwriting
(default
False
).
Expand source code
def configure(self, **kw):
    """Override default synchronization options.

    Parameters
    ----------
    maxPointDist: float, optional
        Maximum acceptable synchronization error, in seconds (default 2).
    minPointsNo: int, optional
        Minimum synchronization points no (default 20).
    outputCharEnc: str or None, optional
        Output character encoding, `None` for the same encoding as input
        subtitles (default 'UTF-8').
    windowSize: float, optional
        Synchronization window, in seconds. Timestamps will be corrected no
        more than this value (default 1800).
    minWordProb: float, optional
        Minimum speech recognition score, between 0 and 1 (default 0.3).
    jobsNo: int or None, optional
        Number of concurrent synchronization threads, `None` for auto
        (default `None`).
    minWordLen: int, optional
        Minimum word length, in letters. Shorter words will not be used as
        synchronization points (default 5).
    minCorrelation: float, optional
        Minimum correlation factor, between 0 and 1 (default 0.9999).
    minWordsSim: float, optional
        Minimum words similarity to be used as synchronization points,
        between 0 and 1 (default 0.6).
    minEffort: float, optional
        Controls when synchronization should be stopped, between 0 and 1
        (default 0.5).
    outTimeOffset: float, optional
        Add constant offset to output subtitles, in seconds (default 0).
    overwrite: bool, optional
        Whether to overwrite existing output files. If not set, output file
        name will be suffixed with number if needed, to avoid overwriting
        (default `False`).

    Raises
    ------
    TypeError
        An option name is not present in the current options.
    """
    options = self._options
    for name in kw:
        if name in options:
            options[name] = kw[name]
            continue
        # Options given before the unknown one have already been applied.
        raise TypeError("Unexpected keyword argument '{}'".format(name))
def getProgress(self)
-
Return synchronization progress.
Returns
float
orNone
- Progress of currently processed
SyncTask
, between 0 and 1.None
if no task was run.
Expand source code
def getProgress(self):
    """Return synchronization progress.

    Returns
    -------
    float or None
        Progress of currently processed `SyncTask`, between 0 and 1. `None`
        if no task was run.
    """
    sync = self._sync
    if not sync:
        # No synchronizer yet: hand back the empty value itself.
        return sync
    return sync.getProgress()
def getStatus(self)
-
Return current synchronization status.
Returns
SyncStatus
orNone
Expand source code
def getStatus(self):
    """Return current synchronization status.

    Returns
    -------
    SyncStatus or None
    """
    sync = self._sync
    if not sync:
        # No synchronizer yet: hand back the empty value itself.
        return sync
    return sync.getStatus()
def getSynchronizedSubtitles(self)
-
Expand source code
def getSynchronizedSubtitles(self):
    """Return subtitles produced by the current synchronizer.

    Raises
    ------
    RuntimeError
        No synchronizer exists, i.e. no task was run yet.
    """
    sync = self._sync
    if sync is not None:
        return sync.getSynchronizedSubtitles()
    raise RuntimeError('Subtitles not synchronized')
def isRunning(self)
-
Check if synchronization is running.
Returns
bool
Expand source code
def isRunning(self):
    """Check if synchronization is running.

    Returns
    -------
    bool
        `True` while the worker thread is alive, `False` otherwise, including
        when no synchronization was ever started.
    """
    # `self._thread and ...` would leak `None` to callers when no thread
    # exists; compare explicitly to always return the documented `bool`.
    return self._thread is not None and self._thread.is_alive()
def saveSynchronizedSubtitles(self, path=None, task=None)
-
Save synchronized subtitles.
Parameters
path
:str
, optional- Path to output subtitles.
task
:SyncTask
, optionalout
field of task will be used as output description.
Notes
At least one parameter must be set. Subtitles will be saved in
path
or intask
.out.path ifpath
is not set.task
.out character encoding and framerate will be used, if set.Expand source code
def saveSynchronizedSubtitles(self, path=None, task=None):
    """Save synchronized subtitles.

    Parameters
    ----------
    path: str, optional
        Path to output subtitles.
    task: SyncTask, optional
        `out` field of task will be used as output description.

    Notes
    -----
    At least one parameter must be set. Subtitles will be saved in `path` or
    in `task`.out.path if `path` is not set. `task`.out character encoding
    and framerate will be used, if set.
    """
    if not (path or task):
        raise RuntimeError('At least one of the following arguments must be set: path or task')

    subtitles = self.getSynchronizedSubtitles()

    shift = self._options.get('outTimeOffset')
    if shift:
        logger.info('adjusting timestamps by offset %.3f', shift)
        subtitles.shift(s=shift)

    # Output description of the task, or the falsy value standing in for it.
    out = task and task.out

    # Encoding precedence: task output, configured option, input subtitle
    # encoding, and finally UTF-8.
    encoding = (
        (out and out.enc)
        or self._options.get('outputCharEnc')
        or (task and task.sub and task.sub.enc)
        or 'UTF-8'
    )

    target = path or task.getOutputPath()
    return subtitles.save(
        path=target,
        encoding=encoding,
        fps=out and out.fps,
        overwrite=self._options.get('overwrite'),
    )
def synchronize(self, tasks, *, timeout=None, interactive=False)
-
Start task[s] synchronization.
Parameters
tasks
:SyncTask
oriterable
ofSyncTask
- Single or several synchronization tasks.
timeout
:float
orNone
, optional- How often to call
onJobUpdate
callback, in seconds (if registered).None
to not call it at all. interactive
:bool
, optional- In interactive, tasks subtitles are not saved automatically
(
minEffort
and taskout
field are ignored) and user is expected to save it manually either withsaveSynchronizedSubtitles
orgetSynchronizedSubtitles
.
Notes
Synchronization status changes are notified with callbacks registered in constructor.
onFinish
is called only whentasks
is iterable (as opposed to a single SyncTask
object).Expand source code
def synchronize(self, tasks, *, timeout=None, interactive=False):
    """Start task[s] synchronization.

    Parameters
    ----------
    tasks: SyncTask or iterable of SyncTask
        Single or several synchronization tasks.
    timeout: float or None, optional
        How often to call `onJobUpdate` callback, in seconds (if registered).
        `None` to not call it at all.
    interactive: bool, optional
        In interactive, tasks subtitles are not saved automatically
        (`minEffort` and task `out` field are ignored) and user is expected to
        save it manually either with `saveSynchronizedSubtitles` or
        `getSynchronizedSubtitles`.

    Raises
    ------
    RuntimeError
        Another synchronization is already in progress.

    Notes
    -----
    Synchronization status changes are notified with callbacks registered in
    constructor. `onFinish` is called only when `tasks` is iterable (as
    opposed to single `SyncTask` object).
    """
    if self.isRunning():
        raise RuntimeError('Another synchronization in progress')

    logger.debug('synchronization options: %s', self._options)
    self._terminated = False

    if isinstance(tasks, Iterable):
        # Materialize first: validation below iterates the tasks, which would
        # leave a one-shot iterable (e.g. a generator) empty for the worker
        # thread; the worker is also handed a sequence with a known length.
        tasks = list(tasks)
        for task in tasks:
            self.validateTask(task, interactive=interactive)

        self._thread = threading.Thread(
                target=self._run,
                args=(tasks, timeout, interactive),
                name='Synchronizer')
    else:
        self.validateTask(tasks, interactive=interactive)
        self._thread = threading.Thread(
                target=self._runTask,
                args=(tasks, timeout, interactive),
                name='Synchronizer')

    self._thread.start()
def terminate(self)
-
Terminate running synchronization.
Does nothing if synchronization is not running.
Expand source code
def terminate(self):
    """Terminate running synchronization.

    Does nothing if synchronization is not running.
    """
    if not self.isRunning():
        return

    self._terminated = True
    # Release the semaphore after raising the termination flag.
    self._semaphore.release()
def validateTask(self, task, *, interactive=False)
-
Check if task is properly defined.
Parameters
task
:SyncTask
- Task to validate.
interactive
:bool
, optional- For interactive synchronization
out
will not be validated.
Raises
Error
- Invalid task.
KeyError
orException
- Invalid
task
.out.path pattern.
Expand source code
def validateTask(self, task, *, interactive=False):
    """Check if task is properly defined.

    Parameters
    ----------
    task: SyncTask
        Task to validate.
    interactive: bool, optional
        For interactive synchronization `out` will not be validated.

    Raises
    ------
    Error
        Invalid task.
    KeyError or Exception
        Invalid `task`.out.path pattern.
    """
    sub, ref, out = task.sub, task.ref, task.out

    # Both inputs need a path and a selected stream number.
    required = (
        (sub, 'subtitles not set'),
        (ref, 'reference file not set'),
    )
    for stream, message in required:
        if stream is None or not stream.path or stream.no is None:
            raise Error(message, task=task)

    if interactive:
        # Output is saved manually by the user, nothing more to check.
        return

    if out is None or not out.path:
        raise Error('output path not set', task=task)
    out.validateOutputPattern()
def wait(self)
-
Block until synchronization ends.
Returns
bool
True
if synchronization finished (either successfully or not),False
if it was terminated withSyncController.terminate()
.
Expand source code
def wait(self):
    """Block until synchronization ends.

    Returns
    -------
    bool
        `True` if synchronization finished (either successfully or not),
        `False` if it was terminated with `SyncController.terminate`.
    """
    worker = self._thread
    if worker:
        worker.join()
    # Read the flag only after the join so it reflects the final state.
    return not self._terminated
class SyncTask (sub=None, ref=None, out=None, data=None)
-
Synchronization task.
Parameters
sub
:SubFile
orInputFile
ordict
- Input subtitle file description. Proper object instance or
dict
with fields same as arguments toInputFile
. ref
:RefFile
orInputFile
ordict
- Reference file description. Proper object instance or
dict
with fields same as arguments toInputFile
. out
:OutputFile
ordict
- Output subtitle file description. Proper object instance or
dict
with fields same as arguments toOutputFile
. data
:any
, optional- Any user defined data, useful for passing extra information via
SyncController
callbacks.
Expand source code
class SyncTask(object):
    """Synchronization task."""

    def __init__(self, sub=None, ref=None, out=None, data=None):
        """
        Parameters
        ----------
        sub: subsync.SubFile or subsync.InputFile or dict
            Input subtitle file description. Proper object instance or `dict`
            with fields same as arguments to `subsync.InputFile`.
        ref: subsync.RefFile or subsync.InputFile or dict
            Reference file description. Proper object instance or `dict` with
            fields same as arguments to `subsync.InputFile`.
        out: subsync.OutputFile or dict
            Output subtitle file description. Proper object instance or `dict`
            with fields same as arguments to `subsync.OutputFile`.
        data: any, optional
            Any user defined data, useful for passing extra information via
            `subsync.SyncController` callbacks.
        """
        # Dict descriptions are expanded into the matching file objects; any
        # other value is stored as given.
        subFile = SubFile(**sub) if isinstance(sub, dict) else sub
        refFile = RefFile(**ref) if isinstance(ref, dict) else ref
        outFile = OutputFile(**out) if isinstance(out, dict) else out

        self.sub, self.ref, self.out = subFile, refFile, outFile
        self.data = data

    def getOutputPath(self):
        """Return the output path computed by `out` for `sub` and `ref`.

        When `out` is not set, that empty value is returned instead.
        """
        out = self.out
        if not out:
            return out
        return out.getPath(self.sub, self.ref)

    def serialize(self):
        """Return a dict with serialized `sub`, `ref` and `out`.

        Fields which are not set are left out.
        """
        return {
            name: getattr(self, name).serialize()
            for name in ('sub', 'ref', 'out')
            if getattr(self, name)
        }

    def __repr__(self):
        fields = {
            'sub': repr(self.sub),
            'ref': repr(self.ref),
            'out': repr(self.out),
        }
        return utils.fmtobj(self.__class__.__name__, **fields)
Methods
def getOutputPath(self)
-
Expand source code
def getOutputPath(self):
    """Return the output path computed by `out` for `sub` and `ref`.

    When `out` is not set, that empty value is returned instead.
    """
    out = self.out
    if not out:
        return out
    return out.getPath(self.sub, self.ref)
def serialize(self)
-
Expand source code
def serialize(self):
    """Return a dict with serialized `sub`, `ref` and `out`.

    Fields which are not set are left out.
    """
    return {
        name: getattr(self, name).serialize()
        for name in ('sub', 'ref', 'out')
        if getattr(self, name)
    }