from ldap3 import BASE, LEVEL, SUBTREE, DEREF_NEVER, DEREF_SEARCH, DEREF_BASE, DEREF_ALWAYS
from ldap3 import ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES
from ldap3 import MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
-from ldap3.core.exceptions import LDAPInvalidDnError
-
+from ldap3.core.exceptions import LDAPInvalidDnError, LDAPInvalidValueError
from ldap3.core.exceptions import LDAPException
# Own modules
from .config import LDAPMigrationConfiguration
-__version__ = '0.9.2'
+__version__ = '0.9.3'
LOG = logging.getLogger(__name__)
CFG_BASENAME = 'ldap-migration.ini'
self.unknown_objectclasses = CIStringSet()
self.dn_attr_types = CIStringSet()
self.pure_binary_attr_types = CIStringSet()
-
+ self.boolean_attr_types = CIStringSet()
super(LDAPMigrationApplication, self).__init__(
appname=appname, verbose=verbose, version=version, base_dir=base_dir,
res['integer_attribute_types'] = self.integer_attribute_types.as_list()
res['pure_binary_attr_types'] = self.pure_binary_attr_types.as_list()
res['dn_attr_types'] = self.dn_attr_types.as_list()
+ res['boolean_attr_types'] = self.boolean_attr_types.as_list()
return res
self.discover_integer_attribute_types()
self.discover_dn_attribute_types()
self.discover_pure_binary_attribute_types()
+ self.discover_boolean_attribute_types()
# -------------------------------------------------------------------------
def discover_target_object_classes(self):
LOG.debug("Discovered DN AttributeTypes:\n" + pp(
self.dn_attr_types.as_list()))
+ # -------------------------------------------------------------------------
+ def discover_boolean_attribute_types(self):
+
+ for key in self.attribute_types.keys():
+ attribute_type = self.attribute_types[key]
+ if not attribute_type['syntax']:
+ continue
+ if attribute_type['syntax'] == '1.3.6.1.4.1.1466.115.121.1.7':
+ self.boolean_attr_types.add(key)
+
+ if self.verbose > 1:
+ LOG.debug("Discovered boolean AttributeTypes:\n" + pp(
+ self.boolean_attr_types.as_list()))
+
# -------------------------------------------------------------------------
def discover_pure_binary_attribute_types(self):
continue
if tgt_at_name not in target_entry:
target_entry[tgt_at_name] = []
+ if attribute_name in self.boolean_attr_types:
+ if to_bool(single_attr):
+ single_attr = 'TRUE'
+ else:
+ single_attr = 'FALSE'
target_entry[tgt_at_name].append(single_attr)
else:
if src_attr != '':
- target_entry[tgt_at_name] = src_attr
+ if attribute_name in self.boolean_attr_types:
+ if to_bool(src_attr):
+ target_entry[tgt_at_name] = 'TRUE'
+ else:
+ target_entry[tgt_at_name] = 'FALSE'
+ else:
+ target_entry[tgt_at_name] = src_attr
if ('sunservice' in used_classes) or ('sunServiceComponent' in used_classes):
if 'organizationalUnit' not in used_classes:
continue
if src_at_name not in src_attributes:
src_attributes[src_at_name] = []
+ if src_at_name in self.boolean_attr_types:
+ if to_bool(single_attr):
+ single_attr = 'TRUE'
+ else:
+ single_attr = 'FALSE'
src_attributes[src_at_name].append(single_attr)
else:
if src_attr != '':
- src_attributes[src_at_name] = src_attr
+ if src_at_name in self.boolean_attr_types:
+ if to_bool(src_attr):
+ src_attributes[tgt_at_name] = 'TRUE'
+ else:
+ src_attributes[tgt_at_name] = 'FALSE'
+ else:
+ src_attributes[src_at_name] = src_attr
for tgt_at_name in tgt_entry['attributes']:
LOG.debug(msg)
self.count_modified += 1
if not self.simulate:
- mod_status, mod_result, mod_response, _ = self.target.modify(tgt_dn, changes)
+ try:
+ mod_status, mod_result, mod_response, _ = self.target.modify(tgt_dn, changes)
+ except LDAPException as e:
+ msg = "Modifying NOT successfull - {c}: {e}\n"
+ msg += "Source attributes:\n{sattr}\n"
+ msg += "Changes:\n{ch}"
+ msg = msg.format(
+ c=e.__class__.__name__, e=e,
+ sattr=pp(src_entry['attributes']), ch=pp(changes))
+ raise WriteLDAPItemError(msg)
if mod_status:
LOG.debug("Modifying successfull.")
if self.verbose > 2:
LOG.debug(msg)
self.count_added += 1
if not self.simulate:
- cr_status, cr_result, cr_response, _ = self.target.add(
- tgt_dn, object_class=tgt_obj_classes, attributes=tgt_entry)
+ try:
+ cr_status, cr_result, cr_response, _ = self.target.add(
+ tgt_dn, object_class=tgt_obj_classes, attributes=tgt_entry)
+ except LDAPException as e:
+ msg = "Modifying NOT successfull - {c}: {e}\n"
+ msg += "Source attributes:\n{sattr}\n"
+ msg += "Target-DN: {dn!r}\n"
+ msg += "Target Object classes:\n{ocs}\n"
+ msg += "Target attributes:\n{tattr}"
+ msg = msg.format(
+ c=e.__class__.__name__, e=e, sattr=pp(src_entry['attributes']),
+ dn=tgt_dn, ocs=pp(tgt_obj_classes), tattr=pp(tgt_entry))
+ raise WriteLDAPItemError(msg)
if cr_status:
LOG.debug("Creation successfull.")
if self.verbose > 2: