- #!/usr/bin/env python
-
- import time
- import os
- import threading
- import optparse
- import datetime
- import sys
- import select
- import signal
- from kazoo.client import KazooClient
-
- debug = False
-
- def _dump_stat( stat ):
- if stat is None:
- print("\tStat is None")
- else:
- createTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stat.ctime / 1000.0))
- modTime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(stat.mtime / 1000.0))
- print("\tStat is:")
- print("\t\tacl_version: %d" % stat.aversion )
- print("\t\tversion: %s" % stat.version )
- print("\t\tcreated: %s" % createTime )
- print("\t\tcreation_transaction_id: %d" % stat.czxid )
- print("\t\tlast_modified: %s" % modTime )
- print("\t\tlast_modified_transaction_id: %d" % stat.mzxid)
- print("\t\tdata_length: %d" % stat.dataLength )
- print("\t\tchildren_count: %d" % stat.numChildren )
-
- def _dump_event( event ):
- if event is None:
- print("\tEvent is None")
- else:
- print( event )
- print('Event Type %s' % event.type )
- print('Event State %s' % event.state )
- if event.path is None:
- print("\tNode Path is None")
- else:
- print("\tNode Path is: %s" % event.path)
-
- def dump_child_event( parent_node, children, event ):
- global debug
- if debug == False:
- return;
- print("CHILDREN EVENT OCCURS: parent is %s." % parent_node )
- if event is None:
- print("\tEvent is None")
- else:
- _dump_event( event )
- if children is not None:
- i = 0;
- for child in children:
- i += 1
- print( "\tChild node %d is %s" % (i, child) )
-
- def dump_node_event( node, data, stat, event ):
- global debug
- if debug == False:
- return;
- print("NODE EVENT OCCURS: node is %s." % node )
- _dump_event( event )
- _dump_stat( stat )
- print("\tNodeData is: \"%s\"" % data )
-
- parser = optparse.OptionParser( description='Snoop on Zookeeper events for a DCS cluster.' )
- parser.add_option( '--debug', dest='debugOn', action='store_true', default=False,
- help='print out verbose debugging information' )
- parser.add_option( '--zkport', dest='zkport', default="localhost:2181",
- help='CSV string containing ZK host:port pairs' )
-
- default_user = os.path.basename(os.environ['HOME'])
- parser.add_option( '--cluster', dest='cluster', default=default_user + "_cluster",
- help='cluster id of the cluster to monitor' )
- opts, args = parser.parse_args()
-
- if opts.zkport and opts.cluster:
- cluster = opts.cluster
- zkportpairs = opts.zkport
- else:
- print("Must either specify a config file or both zkport and cluster")
- print( parser.print_help() )
- sys.exit(0)
-
- memberNode = '/perforce/cluster/' + cluster + '/members'
- clusterRootNode = '/perforce/cluster/' + cluster
- electionNode = clusterRootNode + '/election'
- workspaceNode = clusterRootNode + '/workspace'
- masterNode = clusterRootNode + '/master'
- routerNode = clusterRootNode + '/router'
-
- # Global Locked Access Variables
- list_lock = threading.RLock()
- member_children = []
- member_set = set(member_children)
- election_children = []
- election_set = set(election_children)
- router_children = []
- router_set = set(router_children)
- workspace_children = []
- workspace_set = set(workspace_children)
- masterData = ''
-
- zk = KazooClient(hosts=zkportpairs, read_only=True )
- zk.start()
- zk.ensure_path(electionNode)
- zk.ensure_path(workspaceNode)
- zk.ensure_path(routerNode)
- zk.ensure_path(memberNode)
-
- # connection watch
- def my_listener(state):
- if state == KazooState.LOST:
- print("ERROR - SESSION WAS LOST!!!")
- elif state == KazooState.SUSPENDED:
- print("ERROR - DISCONNECTED!!!")
- else:
- print("ERROR - DIS & RE-CONNECTED!!!")
-
- zk.add_listener( my_listener )
-
- # utility routine
- def show_status():
- print("CURRENT ZOOKEEPER STATUS")
- print("----------------------------------------------------------------------------------")
- if zk.exists( masterNode ):
- (data, stat) = zk.get( masterNode )
- print("MASTER is UP: %s" % data)
- else:
- print("MASTER is DOWN")
- print("ELECTION NODES")
- children = zk.get_children(electionNode)
- for child in children:
- childNode = electionNode + '/' + child
- (data, stat) = zk.get(childNode)
- print("\tNode %s has data: %s" % (child, data))
- print("WORKSPACE NODES")
- children = zk.get_children(workspaceNode)
- for child in children:
- childNode = workspaceNode + '/' + child
- (data, stat) = zk.get(childNode)
- print("\tNode %s has data: %s" % (child, data))
- print("ROUTER NODES")
- children = zk.get_children(routerNode)
- for child in children:
- childNode = routerNode + '/' + child
- (data, stat) = zk.get(childNode)
- print("\tNode %s has data: %s" % (child, data))
- print("MEMBER NODES")
- children = zk.get_children(memberNode)
- for child in children:
- childNode = memberNode + '/' + child
- (data, stat) = zk.get(childNode)
- print("\tNode %s has data: %s" % (child, data))
- print("----------------------------------------------------------------------------------")
- print("")
-
-
-
-
- # data watches
- def election_data_changed(data, stat, event):
- dump_node_event( 'Election node', data, stat, event )
- with list_lock:
- if event is None:
- if data:
- print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" %
- (electionNode, stat.version, data.decode("utf-8")))
- else:
- print("+++++++++++++++ %s/%s Event type: %s (state %)" %
- (electionNode, event.path, event.type, event.state))
- if data is not None and data:
- print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" %
- ( electionNode, event.path, stat.version, data.decode("utf-8")))
-
- def wksp_data_changed(data, stat, event):
- dump_node_event( 'Workspace node', data, stat, event )
- with list_lock:
- if event is None:
- if data:
- print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" %
- (workspaceNode, stat.version, data.decode("utf-8")))
- else:
- print("+++++++++++++++ %s/%s Event type: %s (state %)" %
- (workspaceNode, event.path, event.type, event.state))
- if data is not None and data:
- print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" %
- ( workspaceNode, event.path, stat.version, data.decode("utf-8")))
-
- def router_data_changed(data, stat, event):
- dump_node_event( 'Router node', data, stat, event )
- with list_lock:
- if event is None:
- if data:
- print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" %
- (routerNode, stat.version, data.decode("utf-8")))
- else:
- print("+++++++++++++++ %s/%s Event type: %s (state %)" %
- (routerNode, event.path, event.type, event.state))
- if data is not None and data:
- print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" %
- ( routerNode, event.path, stat.version, data.decode("utf-8")))
-
- def member_data_changed(data, stat, event):
- dump_node_event( 'Member node', data, stat, event )
- with list_lock:
- if event is None:
- if data:
- print("+++++++++++++++ Child of %s data is now at version: %s, data: %s" %
- (memberNode, stat.version, data.decode("utf-8")))
- else:
- print("+++++++++++++++ %s/%s Event type: %s (state %)" %
- (memberNode, event.path, event.type, event.state))
- if data is not None and data:
- print("+++++++++++++++ %s/%s data is now at version: %s, data: %s" %
- ( memberNode, event.path, stat.version, data.decode("utf-8")))
-
-
- # setup watches on master node
- def print_master_status( masterExists ):
- if masterExists:
- @zk.DataWatch( masterNode )
- def master_data_changed(data, stat):
- with list_lock:
- if data:
- print("+++++++++++++++ %s data is now at version: %s, data: %s" %
- (masterNode, stat.version, data.decode("utf-8")))
- else:
- print("!!!!!!! MASTER DOES NOT EXIST")
-
- def master_changed( event ):
- print("MASTER NODE STATUS")
- if event is not None:
- if(debug == False):
- print("-------- %s Event type: %s (state: %s)"
- % (masterNode, event.type, event.state))
- else:
- print("-------- %s Event type: Zookeeper Monitor Startup" % masterNode )
- masterExists = zk.exists( masterNode, watch=master_changed )
- print_master_status( masterExists )
-
- masterExists = zk.exists( masterNode, watch=master_changed )
- print("MASTER NODE STATUS")
- print("-------- %s Event type: Zookeeper Monitor Startup" % masterNode )
- print_master_status( masterExists )
-
- @zk.ChildrenWatch(electionNode, send_event=True)
- def elect_child_changed(children, event):
- global election_children
- print(' ')
- print("ELECTION CHILDREN are now: %s" % children)
- with list_lock:
- dump_child_event( 'election children', children, event )
- if event is not None:
- if(debug == False):
- print("-------- %s Event type: %s (state %)"
- % (event.path, event.type, event.state))
- children_set = set(children)
- for child in children:
- if child not in election_set:
- childNode = electionNode + '/' + child
- election_children.append(child)
- print("-------- Election Node %s Created" % childNode )
- zk.DataWatch( childNode, func=election_data_changed, send_event=True )
- for child in election_children:
- if child not in children_set:
- childNode = electionNode + '/' + child
- election_children.remove(child)
- print("-------- Election Node %s Deleted" % childNode )
- return True
-
- @zk.ChildrenWatch(workspaceNode, send_event=True)
- def wksp_child_changed(children, event):
- global workspace_children
- print(' ')
- print("WORKSPACE CHILDREN are now: %s" % children)
- with list_lock:
- dump_child_event( 'workspace children', children, event )
- if event is not None:
- if(debug == False):
- print("-------- %s Event type: %s (state %)"
- % (event.path, event.type, event.state))
- children_set = set(children)
- for child in children:
- if child not in workspace_set:
- childNode = workspaceNode + '/' + child
- workspace_children.append(child)
- print("-------- Workspace Node %s Created" % childNode )
- zk.DataWatch( childNode, func=wksp_data_changed, send_event=True )
- for child in workspace_children:
- if child not in children_set:
- childNode = workspaceNode + '/' + child
- workspace_children.remove(child)
- print("-------- Workspace Node %s Deleted" % childNode )
-
- return True
-
- @zk.ChildrenWatch(routerNode, send_event=True)
- def router_child_changed(children, event):
- global router_children
- print(' ')
- print("ROUTER CHILDREN are now: %s" % children)
- with list_lock:
- dump_child_event( 'router children', children, event )
- if event is not None:
- if(debug == False):
- print("-------- %s Event type: %s (state %)"
- % (event.path, event.type, event.state))
- children_set = set(children)
- for child in children:
- if child not in router_set:
- childNode = routerNode + '/' + child
- router_children.append(child)
- print("-------- Router Node %s Created" % childNode )
- zk.DataWatch( childNode, func=router_data_changed, send_event=True )
- for child in router_children:
- if child not in children_set:
- childNode = routerNode + '/' + child
- router_children.remove(child)
- print("-------- Router Node %s Deleted" % childNode )
- return True
-
- @zk.ChildrenWatch(memberNode, send_event=True)
- def member_child_changed(children, event):
- global member_children
- print(' ')
- print("MEMBER CHILDREN are now: %s" % children)
- with list_lock:
- dump_child_event( 'member children', children, event )
- if event is not None:
- if(debug == False):
- print("-------- %s Event type: %s (state %)"
- % (event.path, event.type, event.state))
- children_set = set(children)
- for child in children:
- if child not in member_set:
- childNode = memberNode + '/' + child
- member_children.append(child)
- print("-------- member Node %s Created" % childNode )
- zk.DataWatch( childNode, func=member_data_changed, send_event=True )
- for child in member_children:
- if child not in children_set:
- childNode = memberNode + '/' + child
- member_children.remove(child)
- print("-------- member Node %s Deleted" % childNode )
- return True
-
- def handler(signum, frame):
- print ('Stopping zkMonitor with signal %d' % signum)
- zk.stop()
- sys.exit(0)
-
- # Set the signal handlers
- signal.signal(signal.SIGHUP, handler)
- signal.signal(signal.SIGINT, handler)
-
- while True:
- sys.stdin in select.select([sys.stdin], [], [])[0]
- input = sys.stdin.readline()
- print("What do you want to do: quit [q], show status[s], nothing[n] or any other input.")
- input = raw_input('Your choice [q,s,N]: ').strip().lower()
- if input == 'q':
- zk.stop()
- sys.exit(0)
- elif input == 's':
- show_status()