基于libnmap的多线程端口扫描

发布时间:March 29, 2016 // 分类:开发笔记,linux,python,windows,生活琐事 // No Comments

1.采用的是threading, Queue结合的模式

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import nmap
import logging
import threading, Queue, time
import sys


SHARE_Q = Queue.Queue()  # unbounded task queue shared by all worker threads
_WORKER_THREAD_NUM = 5  # size of the fixed worker-thread pool

# Log everything at DEBUG and above with a timestamped, levelled prefix.
logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] %(levelname)s: %(message)s")

class MyThread(threading.Thread):
    """Thread subclass that runs a caller-supplied callable.

    Attributes:
        func: zero-argument callable executed on the new thread.
    """

    def __init__(self, func):
        # Initialise the base Thread before storing our own state.
        threading.Thread.__init__(self)
        self.func = func

    def run(self):
        """Override Thread.run to invoke the stored callable."""
        self.func()

def nmapScan(targetHosts):
    """Scan targetHosts with nmap and log every open TCP port.

    Runs a connect scan (-sT, no ping) with service/version detection against
    all ports, then logs "<target>\t<port>\t<name>\t<product><version>" for
    each open TCP port on every host reported as up.

    Args:
        targetHosts: host spec accepted by nmap (a single IP string here).
    """
    try:
        scanner = nmap.PortScanner()
        scanner.scan(targetHosts, arguments='-Pn -sT -sV --allports --version-trace')
        for targetHost in scanner.all_hosts():
            # Skip hosts that are down or have no TCP results instead of
            # aborting the whole loop: the original 'break' dropped any
            # remaining hosts, and the 'continue' after it was unreachable.
            if scanner[targetHost].state() != 'up' or not scanner[targetHost]['tcp']:
                continue
            for targetport in scanner[targetHost]['tcp']:
                info = scanner[targetHost]['tcp'][int(targetport)]
                if info['state'] != 'open':
                    continue
                logging.debug('%s\t%s\t%s\t%s%s' % (
                    targetHosts, targetport, info['name'],
                    info['product'], info['version']))
    except Exception as e:
        # Log and swallow: one unreachable target must not kill the worker.
        logging.info('%s\t%s' % (targetHosts, str(e)))
        return

def worker():
    """Worker loop: pull IPs from SHARE_Q and scan each, forever.

    Queue.get() blocks while the queue is empty, so no polling is needed
    (the original empty()-check loop busy-waited at 100% CPU once the queue
    drained). Queue does its own locking, so no extra locks are required
    around get/put.
    """
    global SHARE_Q
    while True:
        item = SHARE_Q.get()  # blocks until a task is available
        try:
            nmapScan(item)
            time.sleep(1)  # throttle between consecutive scans
        finally:
            SHARE_Q.task_done()  # always balance get() so SHARE_Q.join() works


def main(ip_prefix):
    """Queue hosts <ip_prefix>.1 .. <ip_prefix>.255 and scan them with a thread pool.

    Worker threads loop forever, so they are marked daemon and completion is
    tracked with SHARE_Q.join(). (The original joined the non-daemon worker
    threads themselves; since workers never return, the program could never
    exit.)

    Args:
        ip_prefix: first three octets of the /24 to scan, e.g. '10.0.0'.
    """
    global SHARE_Q
    # Enqueue every host of the /24; in real use tasks could be fed continuously.
    for i in range(1, 256):
        SHARE_Q.put('%s.%s' % (ip_prefix, i))
    # Start the fixed-size worker pool.
    for _ in range(_WORKER_THREAD_NUM):
        thread = MyThread(worker)
        thread.daemon = True  # let the process exit once all tasks are done
        thread.start()
    # Block until every queued task has been marked done.
    SHARE_Q.join()

if __name__ == "__main__":
    try:
        data= '115.238.55'
        print main(data)
        #for data in rsult:
        #    logging.debug(data)
    except KeyboardInterrupt, e:
        print '\nBreak out.'
        sys.exit()

2.采用的是map引入并发执行

#!/usr/bin/env python
#coding=utf8
from multiprocessing import Pool
import logging
import nmap
import time
import sys

# Log everything at DEBUG and above with a timestamped, levelled prefix.
logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] %(levelname)s: %(message)s")

def do_add(targetHosts):
    """Scan targetHosts with nmap and log every open TCP port (pool worker).

    Runs a connect scan (-sT, no ping) with service/version detection against
    all ports, then logs "<target>\t<port>\t<name>\t<product><version>" for
    each open TCP port on every host reported as up.

    Args:
        targetHosts: host spec accepted by nmap (a single IP string here).
    """
    try:
        scanner = nmap.PortScanner()
        scanner.scan(targetHosts, arguments='-Pn -sT -sV --allports --version-trace')
        for targetHost in scanner.all_hosts():
            # Skip hosts that are down or have no TCP results instead of
            # aborting the whole loop: the original 'break' dropped any
            # remaining hosts, and the 'continue' after it was unreachable.
            if scanner[targetHost].state() != 'up' or not scanner[targetHost]['tcp']:
                continue
            for targetport in scanner[targetHost]['tcp']:
                info = scanner[targetHost]['tcp'][int(targetport)]
                if info['state'] != 'open':
                    continue
                logging.debug('%s\t%s\t%s\t%s%s' % (
                    targetHosts, targetport, info['name'],
                    info['product'], info['version']))
    except Exception as e:
        # Log and swallow: one unreachable target must not kill the pool worker.
        logging.info('%s\t%s' % (targetHosts, str(e)))
        return


def main(ip_prefix):
    """Scan <ip_prefix>.1 .. <ip_prefix>.255 concurrently with a 5-process pool.

    The original called pool.map(do_add, [ip]) once per address inside the
    loop; each single-item map blocks until done, which serialised the scans
    and defeated the pool entirely. Building the full address list first and
    mapping once lets all five workers run in parallel.

    Args:
        ip_prefix: first three octets of the /24 to scan, e.g. '10.0.0'.
    """
    pool = Pool(5)
    # In real use tasks could be fed continuously instead of precomputed.
    targets = ['%s.%s' % (ip_prefix, i) for i in range(1, 256)]
    pool.map(do_add, targets)  # do_add logs results and returns None
    pool.close()
    pool.join()

if __name__ == "__main__":
    try:
        commandargs = sys.argv[1:]
        args = "".join(commandargs)
        data = '.'.join(args.split('.')[:-1])
        print main(data)
        #for data in rsult:
        #    logging.debug(data)
    except KeyboardInterrupt, e:
        print '\nBreak out.'
        sys.exit()

不得不提踩到的坑.

传入的function,只能接收一个传入参数,传入的function必须处理必要的异常,有网友提供了解决办法,使用functools的partial可以解决,详见 爆栈

第三点是为什么要在子进程里用死循环让其长期执行。窃以为作者的直接把上千个任务暴力丢给进程池的做法并不是最高效的方式,即便是正在执行的进程数和CPU数能匹配得切到好处,但是一大堆的进程切换的开销也会有相当的负担。但是创建几个长期运行的工作进程,每个工作进程处理多个任务,省略掉了大量开启关闭进程的开销,原理上来说会效率高一些。不过这个问题我没有实测过。再不过其实从原理上来说这个开销虽然有但是并不是有多么大,很多时候完全可以忽略,比如作者用的例子。 所以其实更确切一点的需求反而是用于实现生产者消费者模式。因为在作者的例子里,任务数是固定的,不可控的,更多的时候我们反而是需要用生产者创建任务,由worker进程去执行任务。举个例子,生产者监听一个redis的队列,有新url放进去的时候就通知worker进程去取。

代码如下:

#coding=utf8
from multiprocessing import Pool, Queue
import redis
import requests

# Queue shared between the consumer (redis listener) and the workers; bounded
# at 20 so the consumer blocks instead of buffering URLs without limit.
queue = Queue(20)

def consumer():
    """Producer role: block on the redis list 'pool' and feed each URL into the shared queue."""
    conn = redis.Redis(host='127.0.0.1', port=6379, db=1)
    while True:
        _key, url = conn.blpop(['pool',])
        queue.put(url)

def worker():
    """Consume queued items and scan them (illustrative pseudo-code).

    NOTE(review): 'host' and 'port' are never defined, and 'nmap' here is a
    module, not a callable — as written this raises NameError. The original
    inline comment says the item should be split on ':' (presumably
    host, port = data.split(':')) before performing the actual scan; confirm
    intended behavior before using this.
    """
    while True:
        data = queue.get()# split on ':' (per the original note)
        print nmap(host,port)

def process(ptype):
    """Pool entry point: run the consumer role when ptype is truthy, else a worker.

    Both roles loop forever; any failure simply ends this pool process. The
    original bare 'except:' also swallowed SystemExit/KeyboardInterrupt, which
    made the pool hard to interrupt — narrowing to Exception keeps the
    best-effort behaviour while letting shutdown signals propagate.

    Args:
        ptype: truthy -> consumer (redis listener), falsy -> worker.
    """
    try:
        if ptype:
            consumer()
        else:
            worker()
    except Exception:
        # Best-effort: a crashed role just terminates quietly.
        pass

# Five pool processes: the single 1 makes one consumer, the four 0s make
# workers. NOTE(review): both roles loop forever, so pool.map() blocks
# indefinitely and the close()/join() lines below are never reached.
pool = Pool(5)
print pool.map(process, [1,0,0,0,0])
pool.close()
pool.join()

比起经典的方式来说简单很多:设置好端口范围,然后跟ip一起丢入队列,再从队列里面取出调用nmap识别。这样效率高、易懂,而且没什么死锁的陷阱。

Refer:

(1)英文原文:https://medium.com/p/40e9b2b36148

(2)原文代码:https://github.com/chriskiehl/Blog/tree/master/40e9b2b36148

(3)关于Python并行任务技巧的几点补充 http://liming.me/2014/01/12/python-multitask-fixed/

(4)在单核 CPU、Python GIL 限制下,多线程需要加锁吗?

https://github.com/onlytiancai/codesnip/blob/master/python/sprace.py

(5)gevent程序员指南  http://xlambda.com/gevent-tutorial/#_8

(6)进程、线程和协程的理解

http://blog.leiqin.name/2012/12/02/%E8%BF%9B%E7%A8%8B%E3%80%81%E7%BA%BF%E7%A8%8B%E5%92%8C%E5%8D%8F%E7%A8%8B%E7%9A%84%E7%90%86%E8%A7%A3.html

(7)python 多进程: from multiprocessing.pool import ThreadPool
http://hi.baidu.com/0xcea4/item/ddd133c187a6277089ad9e4b

http://outofmemory.cn/code-snippet/6723/Python-many-process-bingfa-multiprocessing

(8)python的threading和multiprocessing模块初探

http://blog.csdn.net/zhaozhi406/article/details/8137670

(9)使用Python进行并发编程

http://python.jobbole.com/81255/

标签:libnmap, Queue, threading, multiprocessing

添加新评论 »