# rom ..config.ldap import DEFAULT_PORT_LDAP, DEFAULT_PORT_LDAPS
from ..config.ldap import DEFAULT_TIMEOUT
-__version__ = '0.6.2'
+__version__ = '0.7.1'
LOG = logging.getLogger(__name__)
_ = XLATOR.gettext
def __call__(self, parser, namespace, given_path, option_string=None):
path = Path(given_path)
- if must_absolute:
+ if self.must_absolute:
if not path.is_absolute():
msg = _("The path {!r} must be an absolute path.").format(given_path)
raise argparse.ArgumentError(self, msg)
Public available method to initiate the argument parser.
"""
- super(BaseLdapApplication, self).init_arg_parser()
-
group_title = _('LDAP options')
if self.use_default_ldap_connection:
group_title = _('Options for the default LDAP connection')
"Default: {}").format(DEFAULT_TIMEOUT),
)
+ super(BaseLdapApplication, self).init_arg_parser()
+
# -------------------------------------------------------------------------
def post_init(self):
"""
connect_info = self.cfg.ldap_connection[inst]
- if self.verbose > 1:
- LOG.debug(_("Connecting to LDAP server {} ...").format(connect_info.url))
+ ldap_server = self.get_ldap_server_obj(inst)
+ self.ldap_server[inst] = ldap_server
+
+ ldap_connection = self.connect_to_ldap_server(ldap_server, inst)
+ self.ldap_connection[inst] = ldap_connection
+
+ if self.verbose > 2:
+ msg = _("Info about LDAP server {}:").format(connect_info.url)
+ msg += ' ' + repr(ldap_connection)
+ LOG.debug(msg)
+
+ # -------------------------------------------------------------------------
+ def get_ldap_server_obj(self, inst):
+
+ connect_info = self.cfg.ldap_connection[inst]
+
+ if self.verbose > 2:
+ msg = _("Trying to get LDAP server object for {} ...").format(connect_info.url)
+ LOG.debug(msg)
server_opts = {}
if connect_info.use_ldaps:
server_opts['get_info'] = DSA
server_opts['mode'] = IP_V4_PREFERRED
server_opts['connect_timeout'] = self.cfg.ldap_timeout
- if self.verbose > 1:
+ if self.verbose > 2:
msg = _("Connect options to server {!r}:").format(connect_info.url)
msg += ' ' + pp(server_opts)
LOG.debug(msg)
ldap_server = Server(connect_info.host, **server_opts)
- self.ldap_server[inst] = ldap_server
if self.verbose > 2:
LOG.debug(_("LDAP server {s}: {re}").format(s=ldap_server, re=repr(ldap_server)))
- ldap_connection = Connection(
- ldap_server, connect_info.bind_dn, connect_info.bind_pw,
- client_strategy=SAFE_SYNC, auto_bind=True)
- self.ldap_connection[inst] = ldap_connection
+ return ldap_server
- if self.verbose > 2:
- msg = _("Info about LDAP server {}:").format(connect_info.url)
- msg += ' ' + repr(ldap_connection)
+ # -------------------------------------------------------------------------
+ def connect_to_ldap_server(self, ldap_server, inst, bind_dn=None, bind_pw=None):
+
+ connect_info = self.cfg.ldap_connection[inst]
+ if not bind_dn:
+ bind_dn = connect_info.bind_dn
+ if not bind_pw:
+ bind_pw = connect_info.bind_pw
+
+ if self.verbose > 1:
+ msg = _("Connecting to LDAP server {url} as {dn!r} ...").format(
+ url=connect_info.url, dn=bind_dn)
LOG.debug(msg)
+ ldap_connection = Connection(
+ ldap_server, bind_dn, bind_pw, client_strategy=SAFE_SYNC, auto_bind=True)
+
+ return ldap_connection
+
# -------------------------------------------------------------------------
def post_run(self):
return line.strip()
return None
+
# =============================================================================
if __name__ == "__main__":