import os
import logging
import copy
+import re
+import datetime
+import json
+
+from json import JSONDecodeError
# Third party modules
# Own modules
-from .common import pp, to_bytes, to_utf8
+from .common import pp, compare_fqdn, to_bytes, to_utf8
from .errors import PpError
from .obj import PpBaseObjectError, PpBaseObject
-__version__ = '0.2.1'
+__version__ = '0.3.1'
LOG = logging.getLogger(__name__)
+
+TYPE_ORDER = {
+ 'SOA': 0,
+ 'NS': 1,
+ 'MX': 2,
+ 'A': 3,
+ 'AAAA': 4,
+ 'CNAME': 5,
+ 'SRV': 6,
+ 'TXT': 7,
+ 'SPF': 8,
+ 'PTR': 9,
+}
+
# =============================================================================
class PdnsApiRrsetError(PpBaseObjectError):
pass
+# =============================================================================
+def compare_rrsets(x, y):
+
+ if not isinstance(x, PdnsApiRrset):
+ raise TypeError("Argument x {!r} must be a PdnsApiRrset object.".format(x))
+
+ if not isinstance(y, PdnsApiRrset):
+ raise TypeError("Argument y {!r} must be a PdnsApiRrset object.".format(y))
+
+ ret = compare_fqdn(x.name, y.name)
+ if ret:
+ return ret
+
+ xt = 99
+ if x.type.upper() in TYPE_ORDER:
+ xt = TYPE_ORDER[x.type.upper()]
+ if y.type.upper() in TYPE_ORDER:
+ yt = TYPE_ORDER[y.type.upper()]
+
+ if xt < yt:
+ return -1
+ if xt > yt:
+ return 1
+ return 0
+
# =============================================================================
class PdnsApiRecord(PpBaseObject):
"The name of this record set."
return self._name
+ # -----------------------------------------------------------
+ @property
+ def name_unicode(self):
+ """The name of the resource record set in unicode, if it is an IDNA encoded zone."""
+ n = getattr(self, '_name', None)
+ if n is None:
+ return None
+ if 'xn--' in n:
+ return to_utf8(n).decode('idna')
+ return n
+
# -----------------------------------------------------------
@property
def type(self):
def ttl(self, value):
self._ttl = int(value)
- # -----------------------------------------------------------
+ # -------------------------------------------------------------------------
@classmethod
def init_from_dict(
cls, data, appname=None, verbose=0, version=__version__, base_dir=None, initialized=None):
return rrset
+ # -------------------------------------------------------------------------
+ def name_relative(self, reference):
+
+ # current name must be an absolute name
+ if not self.name.endswith('.'):
+ return self.name
+
+ # reference name must be an absolute name
+ if not reference.endswith('.'):
+ return self.name
+
+ ref_escaped = r'\.' + re.escape(reference) + r'$'
+ rel_name = re.sub(ref_escaped, '', self.name)
+ return rel_name
+
# -------------------------------------------------------------------------
def as_dict(self, short=True):
"""
res['name'] = self.name
res['type'] = self.type
res['ttl'] = self.ttl
+ res['name_unicode'] = self.name_unicode
res['comments'] = copy.copy(self.comments)
res['records'] = []
rrset.comments = copy.copy(self.comments)
rrset.records = copy.copy(self.records)
+ # -------------------------------------------------------------------------
+ def get_zone_lines(self, rrname_len=12, reference=None, default_ttl=None):
+
+ lines = ''
+ for comment in self.comments:
+
+ if self.verbose > 3:
+ LOG.debug("Formatting comment: {}".format(comment))
+
+ try:
+ cmt = eval(comment)
+ mtime = datetime.datetime.utcfromtimestamp(cmt['modified_at'])
+ if cmt['content']:
+ line = "; {} {}: {}\n".format(
+ mtime.isoformat(' '), cmt['account'], cmt['content'])
+ else:
+ line = "; {} {}\n".format(mtime.isoformat(' '), cmt['account'])
+ except JSONDecodeError as e:
+ LOG.warn("Could not decode comment {!r}: {}".format(comment, e))
+ line = '; {}\n'.format(comment)
+
+ lines += line
+
+ i = 0
+ for record in self.records:
+ show_name = ''
+ if not i:
+ if reference:
+ show_name = self.name_relative(reference)
+ else:
+ show_name = self.name
+ i += 1
+ if record.disabled:
+ show_name = '; ' + show_name
+ ttl = self.ttl
+ if default_ttl and default_ttl == self.ttl:
+ ttl = ''
+ tpl = "{name:<{name_len}} {ttl:>8} {type:<6} {content}\n"
+ line = tpl.format(
+ name=show_name, name_len=rrname_len, ttl=ttl,
+ type=self.type, content=record.content)
+ lines += line
+
+ return lines
+
# =============================================================================
if __name__ == "__main__":
from .pdns_app import PpPDNSAppError, PpPDNSApplication, PDNSApiNotFoundError, PDNSApiValidationError
from .pdns_zone import PdnsApiZone
+from .pdns_record import compare_rrsets
-__version__ = '0.3.1'
+__version__ = '0.4.1'
LOG = logging.getLogger(__name__)
if self.verbose > 2:
LOG.debug("Zone object:\n{}".format(pp(zone.as_dict())))
+ msg = "All information about zone {}:".format(zout)
+ print("\n{}".format(msg))
+ print('-' * len(msg))
+
+ params = {
+ 'name': zone.name,
+ 'name_unicode': zone.name_unicode,
+ 'kind': zone.kind,
+ 'serial': zone.serial,
+ 'soa_edit': zone.soa_edit,
+ 'dnssec': 'no',
+ 'account': zone.account,
+ 'default_ttl': self.default_ttl,
+ }
+ if zone.dnssec:
+ params['dnssec'] = 'yes'
+
+ msg = textwrap.dedent("""\
+ Name (Punicode): {name}
+ Name (UTF-8): {name_unicode}
+ Kind: {kind}
+ Serial: {serial}
+ SOA edit: {soa_edit}
+ DNSSEC enabled: {dnssec}
+ Default TTL: {default_ttl}
+ Account info: {account}
+ """).strip().format(**params)
+
+ if zone.masters:
+ i = 0
+ for master in masters:
+ if i:
+ msg += " {!r}".format(master)
+ else:
+ msg += "Masters: {!r}".format(master)
+ i += 1
+ print(msg)
+
+ enabled = 0
+ disabled = 0
+
+ msg = "All Resource Records:"
+ print("\n{}".format(msg))
+ print('-' * len(msg))
+
+ rrname_len = 1
+ for rrset in zone.rrsets:
+ name = rrset.name_relative(zone.name)
+ if len(name) > rrname_len:
+ rrname_len = len(name)
+ for record in rrset.records:
+ if record.disabled:
+ disabled += 1
+ else:
+ enabled += 1
+ rrname_len += 2
+ if self.verbose > 2:
+ LOG.debug("Length of longest rrset name: {}".format(rrname_len))
+
+ for rrset in sorted(zone.rrsets, key=lambda x: cmp_to_key(compare_rrsets)(x)):
+ msg = rrset.get_zone_lines(
+ rrname_len=rrname_len, reference=zone.name,
+ default_ttl=self.default_ttl).rstrip()
+ print(msg)
+
+ msg = "\nFound {} enabled and {} disabled records.".format(
+ enabled, disabled)
+ print(msg)
+
return True
# =============================================================================