#pylint:disable=C0114, C0115, C0116, R1705, R0902, R0904, R0911, R0912, R0915, W0703, E1101, W1203, W0719
#pylint:disable=R1710, W0603
import importlib
import importlib.machinery
import importlib.util
import logging
import os
import copy
import sys
import time
import traceback
import winreg
import configparser
from pathlib import Path
from types import ModuleType
from typing import List, Dict, Set, Iterable, Any, Tuple, Callable
from pydebugstring import outputDebugString,OutputDebugStringHandler
import debugpy
import natlink
from natlinkcore.config import LogLevel, NatlinkConfig, expand_path
from natlinkcore.readwritefile import ReadWriteFile
from natlinkcore.callbackhandler import CallbackHandler
from natlinkcore.singleton import Singleton
# the possible languages (for get_user_language) (runs at start and on_change_callback, user)
# default is "enx", being one of the English dialects...
from natlinkcore import getThisDir
thisDir = getThisDir(__file__) # return a Path instance
UserLanguages = {
"Nederlands": "nld",
"Fran\xe7ais": "fra",
"Deutsch": "deu",
"Italiano": "ita",
"Espa\xf1ol": "esp",
"Dutch": "nld",
"French": "fra",
"German": "deu",
"Italian": "ita",
"Spanish": "esp",}
python_exec= "python.exe" #for DAP
[docs]
class NatlinkMain(metaclass=Singleton):
"""main class of Natlink, make it a "singleton"
"""
def __init__(self, logger: Any=None, config: Any = None):
if logger is None:
raise ValueError(f'loader.NatlinkMain, first instance should be called with a logging.Logger instance, not {logger}')
if config is None:
raise ValueError(f'loader.NatlinkMain, first instance should be called with a NatlinkConfig instance, not {config}')
self.logger = logger
self.config = config
self.loaded_modules: Dict[Path, ModuleType] = {}
self.prog_names_visited: Set[str] = set() # to enable loading program specific grammars
self.bad_modules: Set[Path] = set()
self.load_attempt_times: Dict[Path, float] = {}
self.__user: str = '' #
self.__profile: str = '' # at start and on_change_callback user
self.__language: str = '' #
self.__load_on_begin_utterance = None
self.load_on_begin_utterance = self.config.load_on_begin_utterance # set the property load_on_begin_utterance
# callback instances:
self._pre_load_callback = CallbackHandler('pre_load')
self._post_load_callback = CallbackHandler('post_load')
self._on_mic_on_callback = CallbackHandler('on_mic_on')
self._on_mic_off_callback = CallbackHandler('on_mic_off')
self._on_begin_utterance_callback = CallbackHandler('on_begin_utterance')
self.seen: Set[Path] = set() # start empty in trigger_load
self.bom = self.encoding = self.config_text = '' # getconfigsetting and writeconfigsetting
self.dap_started=False
# for shorter logger.debug messages
self.prev_module_info = None
def set_on_begin_utterance_callback(self, func: Callable[[], None]) -> None:
self._on_begin_utterance_callback.set(func)
def set_on_mic_on_callback(self, func: Callable[[], None]) -> None:
self._on_mic_on_callback.set(func)
def set_on_mic_off_callback(self, func: Callable[[], None]) -> None:
self._on_mic_off_callback.set(func)
def set_pre_load_callback(self, func: Callable[[], None]) -> None:
self._pre_load_callback.set(func)
def set_post_load_callback(self, func: Callable[[], None]) -> None:
self._post_load_callback.set(func)
def delete_on_begin_utterance_callback(self, func: Callable[[], None]) -> None:
self._on_begin_utterance_callback.delete(func)
def delete_on_mic_on_callback(self, func: Callable[[], None]) -> None:
self._on_mic_on_callback.delete(func)
def delete_on_mic_off_callback(self, func: Callable[[], None]) -> None:
self._on_mic_off_callback.delete(func)
def delete_pre_load_callback(self, func: Callable[[], None]) -> None:
self._pre_load_callback.delete(func)
def delete_post_load_callback(self, func: Callable[[], None]) -> None:
self._post_load_callback.delete(func)
@property
def module_paths_for_user(self) -> List[Path]:
return self._module_paths_in_dirs(self.config.directories_for_user(self.user))
# @property
# def module_paths_for_directory(self) -> List[Path]:
# return self._module_paths_in_dir(self.config.directories_for_user(self.user))
# three properties, which are set at start or at on_change_callback:
@property
def language(self) -> str:
"""holds the language of the current profile (default 'enx')
"""
if self.__language == '':
self.set_user_language()
return self.__language or 'enx'
@language.setter
def language(self, value: str):
if value and len(value) == 3:
self.__language = value
else:
self.__language = 'enx'
self.logger.warning(f'set language property: invalid value ("{value}"), set "enx"')
@property
def profile(self) -> str:
"""holds the directory profile of current user profile
"""
return self.__profile or ''
@profile.setter
def profile(self, value: str):
self.__profile = value or ''
@property
def user(self) -> str:
"""holds the name of the current user profile
"""
return self.__user or ''
@user.setter
def user(self, value: str):
self.__user = value or ''
# QH added, for _control grammar of Unimacro:
[docs]
def get_loaded_modules(self) -> Dict:
"""return a copy of the loaded modules
"""
return copy.copy(self.loaded_modules)
# load_on_begin_utterance is a property...
[docs]
def get_load_on_begin_utterance(self) -> Any:
"""this value is most often True or False, taken from the config file
It can also be (set to) a positive int, with which it does
the load_on_begin_utterance so many times. After these utterances,
the value falls back to False.
With Vocola, this value is set to 1, for a one time load_on_begin_utterance, wihtout the
need to toggle the microphone.
"""
return self.__load_on_begin_utterance
[docs]
def set_load_on_begin_utterance(self, value: Any):
"""set the value for loading at each utterance to True, False or positive int
For Vocola, setting this value to 1 did not work, setting to 2 does, so
you need one extra utterance for a new vocola command to come through.
"""
if isinstance(value, bool):
self.__load_on_begin_utterance = value
return
if isinstance(value, int):
if value > 0:
self.logger.info(f'set_load_on_begin_utterance to {value}')
self.__load_on_begin_utterance = value
else:
self.logger.info('set_load_on_begin_utterance to False')
self.__load_on_begin_utterance = False
return
raise TypeError(f'set_load_on_begin_utterance, invalid type for value: {value} (type: {type(value)})')
load_on_begin_utterance = property(get_load_on_begin_utterance, set_load_on_begin_utterance)
# def _module_paths_in_dir(self, directory: str) -> List[Path]:
# """give modules in directory
# """
#
# def is_script(f: Path) -> bool:
# if not f.is_file():
# return False
# if not f.suffix == '.py':
# return False
#
# if f.stem.startswith('_'):
# return True
# for prog_name in self.prog_names_visited:
# if f.stem == prog_name or f.stem.startswith( prog_name + '_'):
# return True
# return False
#
# init = '__init__.py'
#
# mod_paths: List[Path] = []
# dir_path = Path(directory)
# scripts = sorted(filter(is_script, dir_path.iterdir()))
# init_path = dir_path.joinpath(init)
# if init_path in scripts:
# scripts.remove(init_path)
# scripts.insert(0, init_path)
# mod_paths.extend(scripts)
#
# return mod_paths
def _module_paths_in_dirs(self, directories: Iterable[str]) -> List[Path]:
def is_script(f: Path) -> bool:
if not f.is_file():
return False
if not f.suffix == '.py':
return False
if f.stem.startswith('_'):
return True
for prog_name in self.prog_names_visited:
if f.stem == prog_name or f.stem.startswith( prog_name + '_'):
return True
return False
init = '__init__.py'
mod_paths: List[Path] = []
for d in directories:
dir_path = Path(d)
scripts = sorted(filter(is_script, dir_path.iterdir()))
init_path = dir_path.joinpath(init)
if init_path in scripts:
scripts.remove(init_path)
scripts.insert(0, init_path)
mod_paths.extend(scripts)
return mod_paths
@staticmethod
def _add_dirs_to_path(directories: Iterable[str]) -> None:
for d in directories:
d_expanded = expand_path(d)
if d_expanded not in sys.path:
sys.path.insert(0, d_expanded)
def _call_and_catch_all_exceptions(self, fn: Callable[[], None]) -> None:
try:
fn()
except Exception:
self.logger.exception(traceback.format_exc())
def unload_module(self, module: ModuleType) -> None:
unload = getattr(module, 'unload', None)
if unload is None:
self.logger.info(f'cannot unload module {module.__name__}')
return
self.logger.debug(f'unloading module: {module.__name__}')
self._call_and_catch_all_exceptions(unload)
@staticmethod
def _import_module_from_path(mod_path: Path) -> ModuleType:
mod_name = mod_path.stem
spec = importlib.util.spec_from_file_location(mod_name, mod_path)
if spec is None:
raise FileNotFoundError(f'Could not find spec for: {mod_name}')
loader = spec.loader
if loader is None:
raise FileNotFoundError(f'Could not find loader for: {mod_name}')
if not isinstance(loader, importlib.machinery.SourceFileLoader):
raise ValueError(f'module {mod_name} does not have a SourceFileLoader loader')
module = importlib.util.module_from_spec(spec)
loader.exec_module(module)
return module
def load_or_reload_module(self, mod_path: Path, force_load: bool = False) -> None:
mod_name = mod_path.stem
if mod_path in self.seen:
self.logger.warning(f'Attempting to load duplicate module: {mod_path})')
return
if not mod_path.is_file():
# this can only happen if a file (_vocola_vcl.py for example) is present when scanning all files
# but is removed when loading _vocola_main in between (compiling the new state of all .vcl files)
self.logger.debug(f'load_or_reload_module: not a file, so cannot load:\n\t"{mod_path}')
if mod_path in self.bad_modules:
self.bad_modules.remove(mod_path)
return
# if not self.load_attempt_times:
# self.logger.warning(f'======== load_attempt_times is empty: {self.load_attempt_times}')
last_attempt_time = self.load_attempt_times.get(mod_path, 0.0)
self.load_attempt_times[mod_path] = time.time()
try:
if mod_path in self.bad_modules:
self.logger.debug(f'mod_path: {mod_path}, in self.bad_modules...')
last_modified_time = mod_path.stat().st_mtime
if force_load or last_attempt_time < last_modified_time:
self.logger.info(f'loading previously bad module: {mod_name}')
module = self._import_module_from_path(mod_path)
try:
self.bad_modules.remove(mod_path)
except KeyError:
# added QH, I think it should not come here:
self.logger.warning(f'load_or_reload_module, unexpected, cannot remove key {mod_path} from self.bad_modules:\n\t{self.bad_modules}\n\t====\n')
self.loaded_modules[mod_path] = module
return
else:
# self.logger.debug(f'skipping unchanged bad module: {mod_name}')
return
else:
maybe_module = self.loaded_modules.get(mod_path)
# remove force_load here, in favor of below:
if maybe_module is None:
self.logger.info(f'loading module: {mod_name}')
module = self._import_module_from_path(mod_path)
self.loaded_modules[mod_path] = module
return
module = maybe_module
last_modified_time = mod_path.stat().st_mtime
diff = last_modified_time - last_attempt_time # check for -0.1 instead of 0, a ???
# _pre_load_callback may need this..
if force_load or diff > 0:
if force_load:
self.logger.info(f'reloading module: {mod_name}, force_load: {force_load}')
else:
self.logger.info(f'reloading module: {mod_name}')
self.unload_module(module)
del module
module = self._import_module_from_path(mod_path)
self.loaded_modules[mod_path] = module
self.logger.debug(f'loaded module: {module.__name__}')
return
# self.logger.debug(f'skipping unchanged loaded module: {mod_name}')
return
except Exception:
self.logger.exception(traceback.format_exc())
self.logger.debug(f'load_or_reload_module, exception, add to self.bad_modules {mod_path}')
self.bad_modules.add(mod_path)
if mod_path in self.loaded_modules:
old_module = self.loaded_modules.pop(mod_path)
self.unload_module(old_module)
del old_module
importlib.invalidate_caches()
def load_or_reload_modules(self, mod_paths: Iterable[Path], force_load: bool = None) -> None:
for mod_path in mod_paths:
self.load_or_reload_module(mod_path, force_load=force_load)
self.seen.add(mod_path)
[docs]
def unload_all_loaded_modules(self):
"""unload the modules that are loaded, and empty the bad modules list
"""
for module in self.loaded_modules.values():
self.unload_module(module)
self.bad_modules.clear()
def remove_modules_that_no_longer_exist(self) -> None:
mod_paths = self.module_paths_for_user
for mod_path in set(self.loaded_modules).difference(mod_paths):
self.logger.info(f'unloading removed or not-for-this-user module {mod_path.stem}')
old_module = self.loaded_modules.pop(mod_path)
self.load_attempt_times.pop(mod_path)
self.unload_module(old_module)
del old_module
for mod_path in self.bad_modules.difference(mod_paths):
self.logger.debug(f'bad module was removed: {mod_path.stem}')
self.bad_modules.remove(mod_path)
self.load_attempt_times.pop(mod_path)
importlib.invalidate_caches()
def trigger_load(self, force_load: bool = None) -> None:
self.seen.clear()
if force_load:
self.logger.debug(f'triggering load/reload process (force_load: {force_load})')
else:
self.logger.debug('triggering load/reload process')
self.remove_modules_that_no_longer_exist()
mod_paths = self.module_paths_for_user
if not mod_paths:
fallback_directory = Path(get_natlinkcore_dirname())/"DefaultConfig"
if not fallback_directory.is_dir():
raise OSError(f'NatlinkMain.trigger_load: no directories specified, and fallback_directory is invalid: "{str(fallback_directory)}"')
mod_paths = self._module_paths_in_dirs([fallback_directory])
print(f'Warning, no directories specified for Natlink grammars,\n\tfalling back to default configuration "{str(fallback_directory)}"')
self._pre_load_callback.run()
self.load_or_reload_modules(mod_paths, force_load=force_load)
self._post_load_callback.run()
[docs]
def on_change_callback(self, change_type: str, args: Any) -> None:
"""on_change_callback, when another user profile is chosen, or when the mic state changes
"""
if change_type == 'user':
self.set_user_language(args)
self.logger.debug(f'on_change_callback, user "{self.user}", profile: "{self.profile}", language: "{self.language}"')
if self.config.load_on_user_changed:
# added line, QH, 2023-10-08
self.unload_all_loaded_modules()
self.trigger_load(force_load=True)
elif change_type == 'mic' and args == 'on':
self.logger.debug('on_change_callback called with: "mic", "on"')
self._on_mic_on_callback.run()
if self.config.load_on_mic_on:
self.trigger_load()
elif change_type == 'mic' and args == 'off':
self.logger.debug('on_change_callback called with: "mic", "off"')
self._on_mic_off_callback.run()
else:
self.logger.debug(f'on_change_callback unhandled: change_type: "{change_type}", args: "{args}"')
def on_begin_callback(self, module_info: Tuple[str, str, int]) -> None:
if module_info != self.prev_module_info:
prog_name = Path(module_info[0]).stem
self.logger.debug(f'-on_begin_callback, new module info: ( (...){prog_name}, {module_info[1]}, {module_info[2]} )')
self.prev_module_info = module_info
else:
self.logger.debug('-on_begin_callback, same moduleInfo')
self._on_begin_utterance_callback.run()
prog_name = Path(module_info[0].lower()).stem
if prog_name not in self.prog_names_visited:
self.prog_names_visited.add(prog_name)
self.trigger_load()
elif self.load_on_begin_utterance:
# manipulate this setting:
value = self.load_on_begin_utterance
if isinstance(value, bool):
pass
elif isinstance(value, int):
value -= 1
self.load_on_begin_utterance = value
self.trigger_load()
[docs]
def get_user_language(self, DNSuserDirectory):
"""return the user language (default "enx") from Dragon inifiles
like "nld" for Dutch, etc.
"""
isfile, isdir, join = os.path.isfile, os.path.isdir, os.path.join
if not (DNSuserDirectory and isdir(DNSuserDirectory)):
self.logger.debug('get_user_language, no DNSuserDirectory passed, probably Dragon is not running, return "enx"')
return 'enx'
ns_options_ini = join(DNSuserDirectory, 'options.ini')
if not (ns_options_ini and isfile(ns_options_ini)):
self.logger.debug(f'get_user_language, warning no valid ini file: "{ns_options_ini}" found, return "enx"')
return "enx"
section = "Options"
keyname = "Last Used Acoustics"
keyToModel = self.getconfigsetting(option=keyname, section=section, filepath=ns_options_ini)
ns_acoustic_ini = join(DNSuserDirectory, 'acoustic.ini')
section = "Base Acoustic"
if not (ns_acoustic_ini and isfile(ns_acoustic_ini)):
self.logger.debug(f'get_user_language: warning: user language cannot be found from Dragon Inifile: "{ns_acoustic_ini}", return "enx"')
return 'enx'
# user_language_long = win32api.GetProfileVal(section, keyToModel, "", ns_acoustic_ini)
user_language_long = self.getconfigsetting(option=keyToModel, section=section, filepath=ns_acoustic_ini)
user_language_long = user_language_long.split("|")[0].strip()
if user_language_long in UserLanguages:
language = UserLanguages[user_language_long]
self.logger.debug(f'get_user_language, return "{language}", (long language: "{user_language_long}")')
else:
language = 'enx'
self.logger.debug(f'get_user_language, return userLanguage: "{language}", (long language: "{user_language_long}")')
return language
[docs]
def set_user_language(self, args: Any = None):
"""can be called from other module to explicitly set the user language to 'enx', 'nld', etc
"""
if not (args and len(args) == 2):
try:
args = natlink.getCurrentUser()
except natlink.NatError:
# when Dragon not running, for testing:
args = ()
if args:
self.user, self.profile = args
self.language = self.get_user_language(self.profile)
# self.logger.debug(f'set_user_language, user: "{self.user}", profile: "{self.profile}", language: "{self.language}"')
else:
self.user, self.profile = '', ''
# self.logger.warning('set_user_language, cannot get input for get_user_language, set to "enx",\n\tprobably Dragon is not running or you are preforming pytests')
self.language = 'enx'
def start(self) -> None:
self.logger.info(f'Starting natlink loader from config file:\n\t"{self.config.config_path}"')
nsd = os.getenv('natlink_settingsdir')
if nsd:
self.logger.info('\t(You set environment variable "NATLINK_SETTINGSDIR")')
natlink.active_loader = self
# checking for default config location (probably when no natlinkconfig_gui has been run)
if self.config.config_path.startswith(str(thisDir)):
self.logger.warning('\nOops, you are starting Natlink with the config file "natlink.ini" from the "fallback location".')
self.logger.warning('\nThis can be fixed most easily by running the program *** natlinkconfig_gui *** from the Windows command line.\n')
self.logger.warning('The Natlink startup process is stopped now.\nPlease fix your configuration,and then restart Dragon.\n\n')
return
# checking for absence of directories, can also occur when natlinkconfig_gui has been run, but nothing done
if not self.config.directories:
self.logger.warning('\nStarting Natlink, but no directories to load are specified.\n\nPlease add one or more directories in your config file:\n')
self.logger.warning('This is most easily done by running the program *** natlinkconfig_gui *** from the Windows command line.')
self.logger.warning('\nBut, you can also edit this "natlink.ini" file with Notepad, or your favourite text editor...')
self.logger.warning('\nThe Natlink startup process is stopped now.\nPlease fix your configuration, and then restart Dragon.\n\n')
return
# self.logger.debug(f'directories: {self.config.directories}')
self._add_dirs_to_path(self.config.directories)
if self.config.load_on_startup:
# set language property:
self.set_user_language()
self.trigger_load()
natlink.setBeginCallback(self.on_begin_callback)
natlink.setChangeCallback(self.on_change_callback)
def setup_logger(self) -> None:
for handler in list(self.logger.handlers):
self.logger.removeHandler(handler)
self.logger.addHandler(logging.StreamHandler(sys.stdout))
self.logger.propagate = False
log_level = self.config.log_level
if log_level is not LogLevel.NOTSET:
self.logger.setLevel(log_level.value)
self.logger.debug(f'set log level to: {log_level.name}')
[docs]
def getconfigsetting(self, section: str, option: Any = None, filepath: Any = None, func: Any = None) -> str:
"""get a setting from possibly an inifile other than natlink.ini
Take a string as input, which is obtained from readwritefile.py, handling
different encodings and possible BOM marks.
When no "option" is passed, the contents of the section are returned (a list of options)
func can be configparser.getint or configparser.getboolean if needed, otherwise configparser.get (str) is taken.
pass: func='getboolean' or func='getint'.
"""
isfile = os.path.isfile
filepath = filepath or config_locations()[0]
if not isfile(filepath):
raise OSError(f'getconfigsetting, no valid filepath: "{filepath}"')
rwfile = ReadWriteFile()
self.config_text = rwfile.readAnything(filepath)
Config = configparser.ConfigParser()
Config.read_string(self.config_text)
if option is None:
return Config.options(section)
if isinstance(func, str):
func = getattr(Config, func)
else:
func = func or Config.get
if func.__name__ == 'get':
fallback = ''
elif func.__name__ == 'getint':
fallback = 0
elif func.__name__ == 'getboolean':
fallback = False
else:
raise TypeError(f'getconfigsetting, no fallback for "{func.__name__}"')
func = func or Config.get
return func(section=section, option=option, fallback=fallback)
# def get_natlink_system_config_filename() -> str:
# return get_config_info_from_registry('installPath')
def get_natlinkcore_dirname() -> str:
return thisDir
# return get_config_info_from_registry('installPath')
def get_config_info_from_registry(key_name: str) -> str:
hive, key, flags = (winreg.HKEY_LOCAL_MACHINE, r'Software\Natlink', winreg.KEY_WOW64_32KEY)
with winreg.OpenKeyEx(hive, key, access=winreg.KEY_READ | flags) as natlink_key:
result, _ = winreg.QueryValueEx(natlink_key, key_name)
return result
had_msg_error = False
had_msg_warning = False
[docs]
def config_locations() -> Iterable[str]:
"""give two possible locations, the wanted and the "fallback" location
wanted: in the '.natlink' subdirectory of `home` or in "NATLINK_USERDIR", this variable is
going to be replaced by "NATLINK_SETTINGSDIR".
name is always 'natlink.ini'
the fallback location is in the installed files, and provides the frame for the config file.
with the configurenatlink (natlinkconfigfunction.py or configfurenatlink.pyw) the fallback version
of the config file is copied into the wanted location.
"""
global had_msg_warning, had_msg_error
join, expanduser, getenv, isfile = os.path.join, os.path.expanduser, os.getenv, os.path.isfile
home = expanduser('~')
config_sub_dir = '.natlink'
natlink_inifile = 'natlink.ini'
fallback_config_file = join(get_natlinkcore_dirname(), "DefaultConfig", natlink_inifile)
if not isfile(fallback_config_file):
raise OSError(f'fallback_config_file does not exist: "{fallback_config_file}"')
# try NATLINK_USERDIR setting (obsolete) and NATLINK_SETTINGSDIR (new):
natlink_settingsdir_from_env = getenv("NATLINK_SETTINGSDIR")
natlink_userdir_from_env_obsolete = getenv("NATLINK_USERDIR")
## issue warnings if old setting is still there and conflicts with new setting:
if natlink_userdir_from_env_obsolete:
if natlink_settingsdir_from_env and natlink_userdir_from_env_obsolete:
pass
elif natlink_settingsdir_from_env:
if not had_msg_error:
logging.warning('You defined env variable "NATLINK_SETTINGSDIR", but different from the obsolete env variable "NATLINK_USERDIR"...')
logging.warning('"NATLINK_SETTINGSDIR (valid): "%s"', natlink_settingsdir_from_env)
logging.warning('"NATLINK_USERDIR (obsolete): "%s"', natlink_userdir_from_env_obsolete)
had_msg_error = True
else:
## natlink_settingsdir_from_env is not set, but natlink_userdir_from_env_obsolete IS
if not had_msg_warning:
logging.warning('You have set env variable "NATLINK_USERDIR", but this variable is obsolete.')
logging.warning('Please specify the env variable "NATLINK_SETTINGSDIR" to "%s", and restart Dragon', natlink_userdir_from_env_obsolete)
had_msg_warning = True
if natlink_settingsdir_from_env:
nl_settings_dir = expand_path(natlink_settingsdir_from_env)
nl_settings_file = join(nl_settings_dir, natlink_inifile)
return [nl_settings_file, fallback_config_file]
# choose between .natlink/natlink.ini in home or the fallback_directory:
return [join(home, config_sub_dir, natlink_inifile), fallback_config_file]
[docs]
def startDap(config : NatlinkConfig) -> bool:
"""
Starts DAP (Debug Adapter Protocol) if there a DAP port specified in the config object.
returns True if the dap was started.
Natlink will startDap automatically if configured in the run method below.
If you need to start the DAP sooner, edit your code to make a call to startDap.
Similarly, if you want to start the DAP later, call startDap. You can call it from your grammar or
anywhere else.
"""
dap_started=False
logging.debug(f"testing dap , enabled {config.dap_enabled} port {config.dap_port}")
try:
logging.debug("Debugpy.configure ...")
debugpy.configure(python=f"{python_exec}")
logging.debug("Debugpy.listen ...")
debugpy.listen(config.dap_port)
dap_started=True
logging.debug(f"DAP Started on Port {config.dap_port} in {__file__}")
if config.dap_wait_for_debugger_attach_on_startup:
#use info level logging, the user will need to know as natlink and dragon will hang here.
#unti debuger is attached.
logging.info(f"waiting for debugger to attach using DAP in {__file__} ")
debugpy.wait_for_client()
return dap_started
except Exception as ee:
logging.info(f"""
Exception {ee} while starting DAP in {__file__}. Possible cause is incorrect python executable specified {python_exec}
""" )
def run() -> None:
default_logger=logging.getLogger("natlink")
dh = OutputDebugStringHandler()
sh=logging.StreamHandler(sys.stdout)
for h in [sh,dh]:
default_logger.addHandler(h)
default_logger.setLevel(logging.DEBUG)
logging.debug(f"{__file__} run()")
try:
# # TODO: remove this hack. As of October 2021, win32api does not load properly, except if
# # the package pywin32_system32 is explictly put on new dll_directory white-list
# pywin32_dir = os.path.join(sysconfig.get_path('platlib'), "pywin32_system32")
# if os.path.isdir(pywin32_dir):
# os.add_dll_directory(pywin32_dir)
#create a temporary logging handler, so we can log the startup of DAP
config = NatlinkConfig.from_first_found_file(config_locations())
dap_started = config.dap_enabled and startDap(config)
logger=logging.getLogger("natlinkcore")
logger.setLevel(logging.DEBUG)
main = NatlinkMain(logger, config)
main.setup_logger()
main.dap_started=dap_started
for h in [sh,dh]:
default_logger.removeHandler(h)
logging.debug(f"Dap enabled: {config.dap_enabled} port: {config.dap_port} ")
main.start()
except Exception as exc:
logging.info(f'Exception: "{exc}" in loader.run', file=sys.stderr)
logging.info(traceback.format_exc())
raise Exception from exc
if __name__ == "__main__":
natlink.natConnect()
run()
natlink.natDisconnect()