#!/usr/bin/env python3 # The MIT License (MIT) # # Copyright (c) 2015 # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # File included from https://github.com/ejordangottlieb/pyswmap # Thanks to jordan ;) # - Pierre # # There is still a great deal of work required on this module. Please # use with caution. # -Jordan import sys from ipaddress import ( IPv6Address, IPv6Network, ip_network, ip_address, ) from math import ( log, ) class MapCalc(object): def __init__(self,**bmr): #rulev6,rulev4): self.portranges = False # Validate and set BMR and BMR derived values self._check_bmr_values(bmr) def _check_bmr_values(self,bmr): # Assume these values have not been supplied. Validate later. self.ealen = False self.ratio = False # Validate that a proper PSID Offset has been set if 'psidoffset' not in bmr: # Set Default PSID Offset of 6 if it is not set self.psidoffset = 6 else: self.psidoffset = self._psid_offset(bmr['psidoffset']) # Validate that a proper IPv4 rule prefix is defined if 'rulev4' not in bmr: print("The rule IPv4 prefix has not been set") sys.exit(1) else: self.rulev4 = self._ipv4_rule(bmr['rulev4']) # Validate that a proper IPv6 rule prefix is defined if 'rulev6' not in bmr: print("The rule IPv6 prefix has not been set") sys.exit(1) else: self.rulev6 = self._ipv6_rule(bmr['rulev6']) # Check if EA length was passed if 'ealen' not in bmr: self.ealen = False else: self.ealen = bmr['ealen'] self.ratio = self._calc_ratio(bmr['ealen']) # Check if sharing ratio was passed or calculated by _calc_ratio if 'ratio' not in bmr: # Skip if we have already calculated ratio if not (self.ratio): self.ratio = False else: if (self.ealen): # Check to see if supplied EA length contradicts supplied ratio if ( bmr['ratio'] != self.ratio ): eavalue = "EA value {}".format(self.ealen) sharingratio = "sharing ratio {}".format(bmr['ratio']) print("Supplied {} and {} are contradictory".format( eavalue, sharingratio) ) sys.exit(1) else: self.ratio = bmr['ratio'] self.ealen = self._calc_ea(bmr['ratio']) # EA length or sharing ratio must be set if not ( self.ealen or self.ratio): print("The BMR must include an EA length or sharing ratio") sys.exit(1) # Since we have not hit an exception we can calculate the port bits self.portbits = self._calc_port_bits() def _ipv4_rule(self,rulev4): try: self.rulev4mask = ip_network( rulev4, strict=False ).prefixlen except ValueError: print("Invalid IPv4 prefix {}".format(rulev4)) sys.exit(1) self.rulev4object = ip_network(rulev4) return rulev4 def _ipv6_rule(self,rulev6): try: self.rulev6mask = IPv6Network( rulev6, strict=False ).prefixlen except ValueError: print("Invalid IPv6 prefix {}".format(rulev6)) sys.exit(1) return rulev6 def _psid_offset(self,psidoffset): PSIDOFFSET_MAX = 6 if psidoffset in range(0,PSIDOFFSET_MAX+1): return psidoffset else: print("Invalid PSID Offset value: {}".format(psidoffset)) sys.exit(1) def _psid_range(self,x): rset = [] for i in range(0,x+1): rset.append(2**i) return rset def _calc_port_bits(self): portbits = 16 - self.psidoffset - self.psidbits return portbits def _calc_ea(self,ratio): if ratio not in ( self._psid_range(16) ): print("Invalid ratio {}".format(ratio)) print("Ratio between 2 to the power of 0 thru 16") sys.exit(1) if ( 1 == ratio): self.psidbits = 0 else: self.psidbits = int(log(ratio,2)) ealen = self.psidbits + ( 32 - self.rulev4mask ) return ealen def _calc_ratio(self,ealen): maskbits = 32 - self.rulev4mask if ( ealen < maskbits ): print("EA of {} incompatible with rule IPv4 prefix {}".format( ealen, self.rulev4, ) ) print("EA length must be at least {} bits".format( maskbits, ) ) sys.exit(1) self.psidbits = ealen - ( 32 - self.rulev4mask ) if ( self.psidbits > 16): print("EA length of {} is too large".format( ealen, ) ) print("EA should not exceed {} for rule IPv4 prefix {}".format( maskbits + 16, self.rulev4, ) ) sys.exit(1) ratio = 2**self.psidbits return ratio def gen_psid(self,portnum): if ( portnum < self.start_port() ): print("port value is less than allowed by PSID Offset") sys.exit(1) psid = (portnum & ((2**self.psidbits - 1) << self.portbits)) psid = psid >> self.portbits return psid def port_ranges(self): return 2**self.psidoffset - 1 def start_port(self): if self.psidoffset == 0: return 0 return 2**(16 - self.psidoffset) def port_list(self,psid): startrange = psid * (2**self.portbits) + self.start_port() increment = (2**self.psidbits) * (2**self.portbits) portlist = [ ] for port in range(startrange,startrange + 2**self.portbits): if port >= 65536: continue portlist.append(port) for x in range(1,self.port_ranges()): startrange += increment for port in range(startrange,startrange + 2**self.portbits): portlist.append(port) return portlist def ipv4_index(self,ipv4addr): if ip_address(ipv4addr) in ip_network(self.rulev4): x = ip_address(ipv4addr) y = ip_network(self.rulev4,strict=False).network_address self.ipv4addr = x return ( int(x) - int(y) ) else: print("Error: IPv4 address {} not in Rule IPv4 subnet {}".format( ipv4add, ip_network(self.rulev4,strict=False).network_address)) sys.exit(1) def _calc_ipv6bit_pos(self): addroffset = 128 - (self.rulev6mask + ( self.ealen - self.psidbits)) psidshift = 128 - ( self.rulev6mask + self.ealen ) return [addroffset,psidshift] def _append_map_eabits(self,ipv4index,addroffset,psidshift,psid): rulev6base = IPv6Network(self.rulev6,strict=False).network_address map_prefix = int(rulev6base) | ( ipv4index << addroffset ) map_fullprefix = map_prefix | ( psid << psidshift) return map_fullprefix def get_mapce_addr(self,ipv4addr,psid): ipv4index = self.ipv4_index(ipv4addr) (addroffset,psidshift) = self._calc_ipv6bit_pos() map_fullprefix = self._append_map_eabits(ipv4index, addroffset, psidshift, psid) mapv4iid = map_fullprefix | ( int(self.ipv4addr) << 16 ) map_full_address = mapv4iid | psid mapce_address = "{}".format(IPv6Address(map_full_address)) return mapce_address def get_mapce_prefix(self,ipv4addr,psid): ipv4index = self.ipv4_index(ipv4addr) (addroffset,psidshift) = self._calc_ipv6bit_pos() map_fullprefix = self._append_map_eabits(ipv4index, addroffset, psidshift, psid) mapce_prefix = "{}/{}".format( IPv6Address(map_fullprefix), self.rulev6mask + self.ealen ) return mapce_prefix def get_map_ipv4(self,mapce_address): ipv4 = (int(IPv6Address(mapce_address)) & ( 0xffffffff << 16 )) >> 16 return ip_address(ipv4) class DmrCalc(object): def __init__(self,dmr): # Validate and set BMR and BMR derived values self.dmrprefix = self._check_dmr_prefix(dmr) def embed_6052addr(self,ipv4addr): try: ipv4addrint = int(ip_address(ipv4addr)) except ValueError: print("Invalid IPv4 address {}".format(ipv4addr)) sys.exit(1) if ( self.dmrprefix.prefixlen == 64 ): ipv6int = ipv4addrint << 24 ipv6int += int(self.dmrprefix.network_address) return IPv6Address(ipv6int) if ( self.dmrprefix.prefixlen == 96 ): ipv6int = ipv4addrint ipv6int += int(self.dmrprefix.network_address) return IPv6Address(ipv6int) def _check_dmr_prefix(self,dmrprefix): try: self.dmrmask = IPv6Network( dmrprefix, strict=False ).prefixlen except ValueError: print("Invalid IPv6 prefix {}".format(prefix)) sys.exit(1) if self.dmrmask not in (32,40,48,56,64,96): print("Invalid prefix mask /{}".format(self.dmrmask)) sys.exit(1) return IPv6Network(dmrprefix) if __name__ == "__main__": m = DmrCalc('fd80::/48') print(m.dmrprefix)