1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
|
#!/router/bin/python
from interfaces_e import IFType
import CustomLogger
import misc_methods
import telnetlib
import socket
import time
from collections import OrderedDict
class CCommandCache(object):
    """Collects platform commands before they are pushed to the device.

    Commands are kept in ``self.cache`` under three keys:

    * ``"IF"``   - an ``OrderedDict`` mapping an interface name to the list of
      commands to run under that interface (insertion order is preserved),
    * ``"CONF"`` - a list of global configuration commands,
    * ``"EXEC"`` - a list of exec-mode commands.
    """

    def __init__(self):
        self.__gen_clean_data_structure()

    def __gen_clean_data_structure(self):
        """Bind ``self.cache`` to a brand-new, empty cache structure."""
        self.cache = {"IF": OrderedDict(), "CONF": [], "EXEC": []}

    def __list_append(self, dest_list, cmd):
        """Add ``cmd`` to ``dest_list``; a list of commands is flattened in."""
        store = dest_list.extend if isinstance(cmd, list) else dest_list.append
        store(cmd)

    def add(self, cmd_type, cmd, interface=None):
        """Cache a command (or a list of commands).

        :param cmd_type: bucket name, matched case-insensitively against the
            cache keys; only consulted when ``interface`` is None.
        :param cmd: a single command string or a list of command strings.
        :param interface: when given, the command is stored under this
            interface name and ``cmd_type`` is ignored.
        """
        if interface is None:
            # either a CONF or an EXEC command
            self.__list_append(self.cache[cmd_type.upper()], cmd)
            return
        # interface ("IF") command: create the per-interface list on first use
        per_if_cmds = self.cache['IF'].setdefault(interface, [])
        self.__list_append(per_if_cmds, cmd)

    def dump_config(self):
        """Print the cached commands in the order they would be applied."""
        if_cache = self.cache['IF']
        print("configure terminal")
        for if_name in if_cache:
            print("interface {if_name}".format(if_name=if_name))
            print('\n'.join(if_cache[if_name]))
        if if_cache:
            # leave interface mode only if interface config was emitted
            print('exit')
        global_cmds = self.cache['CONF']
        if global_cmds:
            print('\n'.join(global_cmds))
        # leave configuration mode
        print("exit")
        exec_cmds = self.cache['EXEC']
        if exec_cmds:
            print('\n'.join(exec_cmds))

    def get_config_list(self):
        """Return the cached commands as one flat, ordered list of strings."""
        lines = ["configure terminal"]
        for if_name, if_cmds in self.cache['IF'].items():
            lines.append("interface {if_name}".format(if_name=if_name))
            lines += if_cmds
        if len(lines) > 1:
            # something was added for at least one interface: leave interface mode
            lines.append("exit")
        lines += self.cache['CONF']
        lines.append("exit")
        lines += self.cache['EXEC']
        return lines

    def clear_cache(self):
        """Drop every cached command and start over with an empty cache."""
        # empty the old structure in place, then rebind to a fresh one
        self.cache.clear()
        self.__gen_clean_data_structure()
class CCommandLink(object):
    """Pushes commands towards the platform and records everything sent.

    The link starts in *virtual mode*: commands are recorded (and echoed
    unless ``silent_mode`` is set) but nothing is transmitted. Virtual mode
    is switched off once ``launch_platform_connectivity`` has created the
    telnet connection.
    """

    def __init__(self, silent_mode = False, debug_mode = False):
        self.history = []
        self.virtual_mode = True
        self.silent_mode = silent_mode
        self.telnet_con = None
        self.debug_mode = debug_mode

    def __transmit(self, cmd_list, **kwargs):
        """Record ``cmd_list`` and, outside virtual mode, send it over telnet.

        Returns the telnet response, or None while in virtual mode.
        """
        self.history.extend(cmd_list)
        if not self.silent_mode:
            # echo the commands being pushed to the platform
            print('\n'.join(cmd_list))
        if self.virtual_mode:
            return None
        return self.telnet_con.write_ios_cmd(cmd_list, debug_mode = self.debug_mode, **kwargs)

    def run_command(self, cmd_list, **kwargs):
        """Run every item of ``cmd_list`` and return the concatenated response.

        Each item is either a plain command or a ``CCommandCache``, which is
        expanded through its ``get_config_list()``. In virtual mode the
        returned string is empty.
        """
        collected = ''
        for item in cmd_list:
            if isinstance(item, CCommandCache):
                batch = item.get_config_list()
            else:
                batch = [item]
            reply = self.__transmit(batch, **kwargs)
            if not self.virtual_mode:
                collected += reply
        return collected

    def run_single_command(self, cmd, **kwargs):
        """Convenience wrapper: run exactly one command (or command cache)."""
        return self.run_command([cmd], **kwargs)

    def get_history(self, as_string = False):
        """Return the recorded commands, as a list or newline-joined string."""
        return '\n'.join(self.history) if as_string else self.history

    def clear_history(self):
        """Forget all recorded commands."""
        # empty the current list in place, then rebind to a fresh one
        del self.history[:]
        self.history = []

    def launch_platform_connectivity(self, device_config_obj):
        """Open the telnet connection described by ``device_config_obj``."""
        conn_params = device_config_obj.get_platform_connection_data()
        self.telnet_con = CIosTelnet( **conn_params )
        # reached only if the connection was created: leave virtual mode
        self.virtual_mode = False

    def close_platform_connection(self):
        """Close the telnet connection if one was opened."""
        if self.telnet_con is None:
            return
        self.telnet_con.close()
class CDeviceCfg(object):
    """Holds the platform (router) and TFTP configuration of a setup.

    The configuration is either loaded from a YAML file at construction time
    or injected later through ``set_platform_config`` / ``set_tftp_config``.
    Until one of those happens the configuration attributes are None.
    """

    def __init__(self, cfg_yaml_path = None):
        """
        :param cfg_yaml_path: optional path handed to
            ``misc_methods.load_complete_config_file``; elements 1 and 2 of
            its result are used as the platform and TFTP configuration.
        """
        # Always define the attributes so the getters never fail with
        # AttributeError on an instance that was built without a file.
        self.platform_cfg = None
        self.tftp_cfg = None
        self.interfaces_cfg = None
        if cfg_yaml_path is not None:
            (self.platform_cfg, self.tftp_cfg) = misc_methods.load_complete_config_file(cfg_yaml_path)[1:3]
            # keep a direct handle on the router interface configuration
            self.interfaces_cfg = self.platform_cfg['interfaces']

    def set_platform_config(self, config_dict):
        """Replace the platform configuration; it must hold an 'interfaces' key."""
        self.platform_cfg = config_dict
        self.interfaces_cfg = self.platform_cfg['interfaces']

    def set_tftp_config(self, tftp_cfg):
        """Replace the TFTP configuration."""
        self.tftp_cfg = tftp_cfg

    def get_interfaces_cfg(self):
        """Return the 'interfaces' section of the platform configuration."""
        return self.interfaces_cfg

    def get_ip_address(self):
        return self.__get_attr('ip_address')

    def get_line_password(self):
        return self.__get_attr('line_pswd')

    def get_en_password(self):
        return self.__get_attr('en_pswd')

    def get_mgmt_interface(self):
        return self.__get_attr('mgmt_interface')

    def get_platform_connection_data(self):
        """Return the keyword arguments needed to open a ``CIosTelnet``."""
        return {
            'host': self.get_ip_address(),
            'line_pass': self.get_line_password(),
            'en_pass': self.get_en_password(),
        }

    def get_tftp_info(self):
        """Return the TFTP configuration (None if none was loaded or set)."""
        return self.tftp_cfg

    def get_image_name(self):
        return self.__get_attr('image')

    def __get_attr(self, attr):
        """Look up ``attr`` in the platform configuration.

        Raises KeyError if the key is absent, and TypeError if no platform
        configuration has been loaded or set yet.
        """
        return self.platform_cfg[attr]

    def dump_config(self):
        """Print the interfaces configuration as block-style YAML."""
        # imported lazily so that yaml is only required when dumping
        import yaml
        print(yaml.dump(self.interfaces_cfg, default_flow_style=False))
class CIfObj(object):
    """A single router interface together with its addressing information.

    Every instance receives a running id taken from the class-level counter
    ``CIfObj._obj_id``.
    """
    _obj_id = 0

    def __init__(self, if_name, ipv4_addr, ipv6_addr, src_mac_addr, dest_mac_addr, dest_ipv6_mac_addr, if_type):
        self.__get_and_increment_id()
        self.if_name = if_name
        self.if_type = if_type
        self.src_mac_addr = src_mac_addr
        self.dest_mac_addr = dest_mac_addr
        self.dest_ipv6_mac_addr = dest_ipv6_mac_addr
        self.ipv4_addr = ipv4_addr
        self.ipv6_addr = ipv6_addr
        # back-reference to the CDualIfObj holding this interface and its peer
        self.pair_parent = None

    def __get_and_increment_id(self):
        """Take the next id from the class counter and advance the counter."""
        current = CIfObj._obj_id
        CIfObj._obj_id = current + 1
        # instance attribute shadows the class-level counter from here on
        self._obj_id = current

    def get_name(self):
        return self.if_name

    def get_src_mac_addr(self):
        return self.src_mac_addr

    def get_dest_mac(self):
        return self.dest_mac_addr

    def get_ipv6_dest_mac(self):
        """Return the destination MAC for IPv6 traffic.

        The generic destination MAC is used unless it equals 0, in which
        case the IPv6-specific destination MAC is returned.
        """
        if self.dest_mac_addr == 0:
            return self.dest_ipv6_mac_addr
        return self.dest_mac_addr

    def get_id(self):
        return self._obj_id

    def get_if_type(self):
        return self.if_type

    def get_ipv4_addr(self):
        return self.ipv4_addr

    def get_ipv6_addr(self):
        return self.ipv6_addr

    def set_ipv4_addr(self, addr):
        self.ipv4_addr = addr

    def set_ipv6_addr(self, addr):
        self.ipv6_addr = addr

    def set_pair_parent(self, dual_if_obj):
        self.pair_parent = dual_if_obj

    def get_pair_parent(self):
        return self.pair_parent

    def is_client(self):
        """True when this interface is of type ``IFType.Client``."""
        return self.if_type == IFType.Client

    def is_server(self):
        """True when this interface is of type ``IFType.Server``."""
        return self.if_type == IFType.Server
class CDualIfObj(object):
    """A client/server interface pair, optionally bound to a VRF name.

    Every instance receives a running id taken from the class-level counter
    ``CDualIfObj._obj_id``.
    """
    _obj_id = 0

    def __init__(self, vrf_name, client_if_obj, server_if_obj):
        """
        :param vrf_name: VRF name of the pair, or None when there is none.
        :param client_if_obj: client-side interface object.
        :param server_if_obj: server-side interface object.

        Both interface objects get this pair registered as their parent via
        their ``set_pair_parent`` method.
        """
        self.__get_and_increment_id()
        self.vrf_name = vrf_name
        self.client_if = client_if_obj
        self.server_if = server_if_obj
        # link the interface objects back to this pair
        self.client_if.set_pair_parent(self)
        self.server_if.set_pair_parent(self)

    def __get_and_increment_id(self):
        """Take the next id from the class counter and advance the counter."""
        self._obj_id = CDualIfObj._obj_id
        CDualIfObj._obj_id += 1

    def get_id(self):
        return self._obj_id

    def get_vrf_name(self):
        return self.vrf_name

    def is_duplicated(self):
        """Return True when the pair has a VRF name (i.e. it is not None)."""
        # identity test: immune to custom __eq__/__ne__ on the name object
        return self.vrf_name is not None
class CIfManager(object):
    """Builds and indexes the interface objects described by a device config.

    ``interfarces`` (sic - the attribute name is kept as is because code
    outside this class may read it) maps an interface name to its ``CIfObj``
    in insertion order; ``dual_intf`` lists the ``CDualIfObj`` pairs.

    The two address generators are class attributes, so they are created once
    and shared by every manager instance.
    """
    _ipv4_gen = misc_methods.get_network_addr()
    _ipv6_gen = misc_methods.get_network_addr(ip_type = 'ipv6')

    def __init__(self):
        self.interfarces = OrderedDict()
        self.dual_intf = []
        self.full_device_cfg = None

    def __add_if_to_manager(self, if_obj):
        self.interfarces[if_obj.get_name()] = if_obj

    def __add_dual_if_to_manager(self, dual_if_obj):
        self.dual_intf.append(dual_if_obj)

    def __get_ipv4_net_client_addr(self, ipv4_addr):
        return misc_methods.get_single_net_client_addr (ipv4_addr)

    def __get_ipv6_net_client_addr(self, ipv6_addr):
        # NOTE(review): the meaning of {'7' : 1} is defined by misc_methods
        # and is not visible from this file - confirm before changing it.
        return misc_methods.get_single_net_client_addr (ipv6_addr, {'7' : 1}, ip_type = 'ipv6')

    def __build_if_obj(self, if_cfg, if_type):
        """Create one ``CIfObj`` from a per-side interface config section.

        Draws the next IPv4 and IPv6 network from the shared generators (in
        that order) before reading ``if_cfg``. Missing destination MAC entries
        default to 0.
        """
        ipv4_addr = self.__get_ipv4_net_client_addr(next(CIfManager._ipv4_gen)[0])
        ipv6_addr = self.__get_ipv6_net_client_addr(next(CIfManager._ipv6_gen))
        return CIfObj(if_name = if_cfg['name'],
                      ipv4_addr = ipv4_addr,
                      ipv6_addr = ipv6_addr,
                      src_mac_addr = if_cfg['src_mac_addr'],
                      dest_mac_addr = if_cfg.get('dest_mac_addr', 0),
                      dest_ipv6_mac_addr = if_cfg.get('dest_ipv6_mac_addr', 0),
                      if_type = if_type)

    def load_config(self, device_config_obj):
        """Discard the current state and rebuild it from ``device_config_obj``.

        Each entry of ``device_config_obj.get_interfaces_cfg()`` must provide
        the keys 'client', 'server' and 'vrf_name'; the 'client' and 'server'
        sections must provide 'name' and 'src_mac_addr'.
        """
        self.full_device_cfg = device_config_obj
        # first, erase all current config
        self.interfarces.clear()
        del self.dual_intf[:]
        # then parse every configured pair into interface objects
        for intf_pair in device_config_obj.get_interfaces_cfg():
            # the client side is built first so it draws its addresses first
            client_obj = self.__build_if_obj(intf_pair['client'], IFType.Client)
            server_obj = self.__build_if_obj(intf_pair['server'], IFType.Server)
            # CDualIfObj registers itself as pair parent of both interfaces
            dual_intf_obj = CDualIfObj(vrf_name = intf_pair['vrf_name'],
                                       client_if_obj = client_obj,
                                       server_if_obj = server_obj)
            self.__add_if_to_manager(client_obj)
            self.__add_if_to_manager(server_obj)
            self.__add_dual_if_to_manager(dual_intf_obj)

    def get_if_list(self, if_type = IFType.All, is_duplicated = None):
        """Return the interfaces matching both filters, in insertion order.

        :param if_type: ``IFType.All`` matches every interface; any other
            value must equal the interface's own type.
        :param is_duplicated: None disables the filter; otherwise the value
            must equal ``is_duplicated()`` of the interface's pair parent.
        """
        result = []
        for if_obj in self.interfarces.values():
            if if_type != IFType.All and if_obj.get_if_type() != if_type:
                continue
            if is_duplicated is not None and if_obj.get_pair_parent().is_duplicated() != is_duplicated:
                continue
            result.append(if_obj)
        return result

    def get_duplicated_if(self):
        """Return client and server interfaces of every pair with a VRF name."""
        result = []
        for dual_if_obj in self.dual_intf:
            if dual_if_obj.get_vrf_name() is not None:
                result.extend( (dual_if_obj.client_if, dual_if_obj.server_if) )
        return result

    def get_dual_if_list(self, is_duplicated = None):
        """Return the interface pairs, optionally filtered by duplication state."""
        return [dual_if for dual_if in self.dual_intf
                if is_duplicated is None or dual_if.is_duplicated() == is_duplicated]

    def dump_if_config(self):
        """Print the loaded device configuration, or a hint if none is loaded."""
        if self.full_device_cfg is None:
            print("Device configuration isn't loaded.\nPlease load config and try again.")
        else:
            self.full_device_cfg.dump_config()
class AuthError(Exception):
    """Raised when the platform rejects the line or the enable password."""
class CIosTelnet(telnetlib.Telnet):
    """Telnet session that logs in to the platform and runs IOS commands.

    ``read_until``, ``write`` and ``expect`` are overridden so that callers
    can work with text strings on both Python 2 and Python 3: arguments are
    encoded to ASCII bytes on the way in, bytes results are decoded on the
    way out.
    """
    AuthError = AuthError

    def str_to_bytes_wrapper(self, func, text, *args, **kwargs):
        """Call ``func(self, text, ...)`` with ``text`` converted to bytes.

        :param func: unbound ``telnetlib.Telnet`` method to invoke.
        :param text: a string, or a list/tuple of patterns; string items are
            ASCII-encoded, anything else is passed through untouched.
        :return: the result of ``func``, decoded if it is a bytes object.
        """
        if isinstance(text, (list, tuple)):
            text = [elem.encode('ascii') if isinstance(elem, str) else elem for elem in text]
        elif isinstance(text, str):
            text = text.encode('ascii')
        res = func(self, text, *args, **kwargs)
        return res.decode() if isinstance(res, bytes) else res

    def read_until(self, *args, **kwargs):
        return self.str_to_bytes_wrapper(telnetlib.Telnet.read_until, *args, **kwargs)

    def write(self, *args, **kwargs):
        return self.str_to_bytes_wrapper(telnetlib.Telnet.write, *args, **kwargs)

    def expect(self, *args, **kwargs):
        """Like ``Telnet.expect`` but returns a list with bytes items decoded."""
        res = self.str_to_bytes_wrapper(telnetlib.Telnet.expect, *args, **kwargs)
        return [elem.decode() if isinstance(elem, bytes) else elem for elem in res]

    def __init__(self, host, line_pass, en_pass, port = 23, str_wait = "#"):
        """Connect to ``host`` and log in up to the enabled prompt.

        :param host: address of the platform.
        :param line_pass: line (login) password.
        :param en_pass: enable password.
        :param port: telnet port.
        :param str_wait: prompt string awaited after each command.
        :raises AuthError: if a password prompt shows up again after a
            password was sent.
        :raises socket.timeout: if the connection attempt times out.

        On any failure the connection is closed before the error propagates,
        so a failed login does not leave an open socket behind.
        """
        telnetlib.Telnet.__init__(self)
        self.host = host
        self.port = port
        self.line_passwd = line_pass
        self.enable_passwd = en_pass
        self.pr = str_wait
#       self.set_debuglevel (1)
        try:
            self.open(self.host, self.port, timeout = 5)
            self.read_until("word:", 1)
            self.write("{line_pass}\n".format(line_pass = self.line_passwd) )
            res = self.read_until(">", 1)
            if 'Password' in res:
                raise AuthError('Invalid line password was provided')
            self.write("enable 15\n")
            self.read_until("d:", 1)
            self.write("{en_pass}\n".format(en_pass = self.enable_passwd) )
            res = self.read_until(self.pr, 1)
            if 'Password' in res:
                raise AuthError('Invalid en password was provided')
            self.write_ios_cmd(['terminal length 0'])
        except socket.timeout:
            self.close()
            raise socket.timeout('A timeout error has occured.\nCheck platform connectivity or the hostname defined in the config file')
        except Exception:
            # release the socket, then let the original error propagate
            self.close()
            raise

    def write_ios_cmd(self, cmd_list, result_from = 0, timeout = 60, **kwargs):
        """Send the commands one by one and return the collected output.

        :param cmd_list: list of command strings.
        :param result_from: index of the first command whose output is
            included in the returned string.
        :param timeout: per-command wait limit in seconds.
        :param kwargs: ``read_until`` - string (or list of patterns) to wait
            for instead of the prompt; ``debug_mode`` - when true, print each
            command and its output with the elapsed time.
        :return: the concatenated output; lines are separated by '\\r\\n'.
        :raises Exception: if a command took longer than ``timeout - 1``
            seconds.
        """
        assert isinstance(cmd_list, list)
        # consume whatever is pending up to the prompt (waits at most 1s)
        self.read_until(self.pr, timeout = 1)

        wait_for = kwargs.get('read_until', self.pr)
        debug_mode = kwargs.get('debug_mode')
        collected = []
        for idx, cmd in enumerate(cmd_list):
            start_time = time.time()
            self.write(cmd + '\r\n')
            if debug_mode:
                print('-->\n%s' % cmd)
            if isinstance(wait_for, list):
                # expect() returns (index, match, text); keep the text
                output = self.expect(wait_for, timeout)[2]
            else:
                output = self.read_until(wait_for, timeout)
            if idx >= result_from:
                collected.append(output)
            if debug_mode:
                print('<-- (%ss)\n%s' % (round(time.time() - start_time, 2), output))
            if time.time() - start_time > timeout - 1:
                raise Exception('Timeout while performing telnet command: %s' % cmd)
        res = ''.join(collected)
        if 'Invalid' in res:
            print('Warning: telnet command probably failed.\nCommand: %s\nResponse: %s' % (cmd_list, res))
        return res
if __name__ == "__main__":
    # Manual smoke test, currently disabled. The snippet below is kept only
    # as a usage sketch; nothing runs when the module is executed directly.
    # NOTE(review): the sketch calls if_mng.dump_config(), while CIfManager
    # in this file defines dump_if_config() - adjust before re-enabling.
    # dev_cfg = CDeviceCfg('config/config.yaml')
    # print dev_cfg.get_platform_connection_data()
    # telnet = CIosTelnet( **(dev_cfg.get_platform_connection_data() ) )
    # if_mng = CIfManager()
    # if_mng.load_config(dev_cfg)
    # if_mng.dump_config()
    pass
|