# 3rd party modules
+import six
import pytz
from tzlocal import get_localzone
# Own modules
from fb_tools.colored import ColoredFormatter
from fb_tools.errors import IoTimeoutError
-from fb_tools.common import pp, is_sequence, human2mbytes, to_bool
+from fb_tools.common import pp, is_sequence, human2mbytes, to_bool, compare_ldap_values
from fb_tools.app import BaseApplication, DirectoryOptionAction
from fb_tools.config import CfgFileOptionAction
from fb_tools.errors import FbAppError
from .idict import CaseInsensitiveDict
from .istringset import CaseInsensitiveStringSet
-__version__ = '0.8.11'
+__version__ = '0.8.12'
LOG = logging.getLogger(__name__)
CFG_BASENAME = 'ldap-migration.ini'
self.migrated_entries = CaseInsensitiveDict()
self.integer_attribute_types = CaseInsensitiveStringSet([])
self.src_items_not_found = CaseInsensitiveStringSet([])
+ self.unknown_attributetypes = CaseInsensitiveStringSet([])
+ self.unknown_objectclasses = CaseInsensitiveStringSet([])
+ self.dn_attr_types = CaseInsensitiveStringSet([])
+ self.pure_binary_attr_types = CaseInsensitiveStringSet([])
+
super(LDAPMigrationApplication, self).__init__(
appname=appname, verbose=verbose, version=version, base_dir=base_dir,
res['struct_dns'] = self.struct_dns.as_dict(short=short)
res['group_entries'] = self.group_entries.as_dict(short=short)
res['integer_attribute_types'] = self.integer_attribute_types.as_list()
+ res['pure_binary_attr_types'] = self.pure_binary_attr_types.as_list()
+ res['dn_attr_types'] = self.dn_attr_types.as_list()
return res
"Attribute {attr!r} of item {dn!r} not found in the list of available "
"attribute types.").format(attr=attribute, dn=src_dn)
LOG.warn(msg)
+ self.unknown_attributetypes.add(attribute)
continue
src_val = src_entry['attributes'][attribute]
if attribute in self.integer_attribute_types:
val = self._get_integer_value(src_val)
- if self.verbose > 1:
+ if self.verbose > 2:
msg = "Migrated integer value: {old!r} => {new!r}.".format(
old=src_val, new=val)
- LOG.warn(msg)
+ LOG.debug(msg)
+
+ elif attribute in self.dn_attr_types:
+ val = []
+ if is_sequence(src_val):
+ for old_val in src_val:
+ new_val = self.mangle_dn(old_val)
+ val.append(new_val)
+ else:
+ val = self.mangle_dn(src_val)
else:
val = copy.copy(src_val)
self.discover_target_object_classes()
self.discover_target_attribute_types()
self.discover_integer_attribute_types()
+ self.discover_dn_attribute_types()
+ self.discover_pure_binary_attribute_types()
# -------------------------------------------------------------------------
def discover_target_object_classes(self):
LOG.debug("Discovered Integer AttributeTypes:\n" + pp(
self.integer_attribute_types.as_list()))
+ # -------------------------------------------------------------------------
+ def discover_dn_attribute_types(self):
+
+ for key in self.attribute_types.keys():
+ attribute_type = self.attribute_types[key]
+ if not attribute_type['syntax']:
+ continue
+ if attribute_type['syntax'] == '1.3.6.1.4.1.1466.115.121.1.12':
+ self.dn_attr_types.add(key)
+
+ if self.verbose > 2:
+ LOG.debug("Discovered DN AttributeTypes:\n" + pp(
+ self.dn_attr_types.as_list()))
+
+ # -------------------------------------------------------------------------
+ def discover_pure_binary_attribute_types(self):
+
+ for key in self.attribute_types.keys():
+ attribute_type = self.attribute_types[key]
+ if not attribute_type['syntax']:
+ continue
+ if attribute_type['syntax'] in (
+ '1.3.6.1.4.1.1466.115.121.1.5',
+ '1.3.6.1.4.1.1466.115.121.1.28'):
+ self.pure_binary_attr_types.add(key)
+
+ if self.verbose > 2:
+ LOG.debug("Discovered Pure Binary AttributeTypes:\n" + pp(
+ self.pure_binary_attr_types.as_list()))
+
# -------------------------------------------------------------------------
def check_log_dir(self):
"""Checking existence of logging-dir and creating it, if necessary."""
msg += "on target LDAP server."
msg = msg.format(oc=src_oc_name, dn=src_dn)
LOG.debug(msg)
+ self.unknown_objectclasses.add(src_oc_name)
continue
tgt_oc_name = self.object_classes.get_key(src_oc_name)
used_classes.add(tgt_oc_name)
do_replace = False
if tgt_at_name in tgt_attributes:
cur_tgt_value = tgt_attributes[tgt_at_name]
- if self.compare_values(src_value, cur_tgt_value):
+ both_equal = False
+ if tgt_at_name in self.pure_binary_attr_types:
+ both_equal = self.compare_binary_values(src_value, cur_tgt_value)
+ else:
+ both_equal = compare_ldap_values(src_value, cur_tgt_value)
+ if both_equal:
if self.verbose > 3:
msg = (
"Attribute {atr!r} of source DN {sdn!r} is equal to "
if tgt_entry:
- changes = self.generate_modify_data(src_entry, tgt_entry, src_dn, tgt_dn)
+ try:
+ changes = self.generate_modify_data(src_entry, tgt_entry, src_dn, tgt_dn)
+ except UnicodeDecodeError as e:
+ msg = "Source attributes:\n{}".format(pp(src_entry['attributes']))
+ LOG.debug(msg)
+ msg = "Current target attributes:\n{}".format(pp(tgt_entry['attributes']))
+ LOG.debug(msg)
+ msg = "UnicodeDecodeError on generating changes for source DN {s!r}: {e}".format(
+ s=src_dn, e=e)
+ raise WriteLDAPItemError(msg)
if changes:
if self.verbose:
LOG.info("Updating target entry {!r} ...".format(tgt_dn))
if self.verbose > 2:
+ msg = "Source attributes:\n{}".format(pp(src_entry['attributes']))
+ LOG.debug(msg)
+ msg = "Current target attributes:\n{}".format(pp(tgt_entry['attributes']))
+ LOG.debug(msg)
msg = "Changes on target entry {tdn!r}:\n{ch}".format(
tdn=tgt_dn, ch=pp(changes))
LOG.debug(msg)
print(line, file=fh, flush=True)
# -------------------------------------------------------------------------
- def compare_values(self, first, second):
+ def compare_binary_values(self, first, second):
if is_sequence(first) and not is_sequence(second):
if len(first) == 1:
- value = first[0]
- if str(value).lower() == str(second).lower():
+ if first[0] == second:
return True
return False
if is_sequence(second) and not is_sequence(first):
if len(second) == 1:
- value = second[0]
- if str(value).lower() == str(first).lower():
+ if first == second[0]:
return True
return False
if not is_sequence(first):
# second is also not an array at this point
- if str(first).lower() == str(second).lower():
+ if first == second:
return True
return False
# Both parameters are arays
first_array = []
- for val in first:
- first_array.append(str(val).lower())
- first_array.sort()
+ for val in sorted(first):
+ first_array.append(val)
+
second_array = []
- for val in second:
- second_array.append(str(val).lower())
- second_array.sort()
+ for val in sorted(second):
+ second_array.append(val)
if first_array == second_array:
return True
+
return False
# -------------------------------------------------------------------------
print(self.colored('###################', 'AQUA'))
for dn in self.src_items_not_found:
print(self.colored(' * {!r}'.format(dn), 'AQUA'))
+ if self.verbose:
+ if len(self.unknown_attributetypes):
+ print()
+ print(self.colored('Unknown and not migrated Attribute Types:', 'AQUA'))
+ print(self.colored('#########################################', 'AQUA'))
+ for at in self.unknown_attributetypes:
+ print(self.colored(' * {!r}'.format(at), 'AQUA'))
+ if len(self.unknown_objectclasses):
+ print()
+ print(self.colored('Unknown and not migrated Object Classes:', 'AQUA'))
+ print(self.colored('########################################', 'AQUA'))
+ for oc in self.unknown_objectclasses:
+ print(self.colored(' * {!r}'.format(oc), 'AQUA'))
finally:
self.disconnect()