基本信息
源码名称:高性能分发gearman worker管理
源码大小:0.01M
文件格式:.zip
开发语言:Python
更新时间:2019-05-04
   友情提示:(无需注册或充值,赞助后即可获取资源下载链接)

     嘿,亲!知识可是无价之宝呢,但咱这精心整理的资料也耗费了不少心血呀。小小地破费一下,绝对物超所值哦!如有下载和支付问题,请联系我们QQ(微信同号):813200300

本次赞助数额为: 2 元 
   源码介绍

1.redis结构设置
  jcsworker_num: 5
  jcsworker_status:{pid:busy,pid:free}
  jcsworkerstop_flag:True/False


2.运行流程
  worker 数量(jcsworker_num)从 redis 中读取,主进程据此增减 worker 进程。

  worker 执行过程中上报自身状态,存入 redis 的 jcsworker_status:{pid:busy,pid:free}。

  停止标志(jcsworkerstop_flag)被置位后,主进程跳出循环并逐个停止 worker。
 

# -*- coding: utf-8 -*-
# The star import stays first so the explicit imports below take precedence
# over any same-named symbols it exports.
from worker import *

import errno
import logging
import multiprocessing
import os
import signal
import time
from multiprocessing import process,Event

import schedule

from redisdb import  Database
class Mworker(object):
    """Manage a pool of ``Worker`` objects, each running in its own process.

    ``self.workers`` is a list of ``(worker, process)`` pairs, where
    ``process`` is a daemon ``multiprocessing.Process`` whose target is
    ``worker.do``.
    """

    # plan_restart() recycles a worker once its ``times`` counter reaches this.
    RESTART_THRESHOLD = 600

    def __init__(self, num_workers=1,name="jcs"):
        """Start ``num_workers`` worker processes, one per second.

        Args:
            num_workers: number of worker processes to start immediately.
            name: stored on the instance as ``self.name``.
                NOTE(review): workers are always built as ``Worker("jcs")``
                regardless of this value -- confirm whether it should be used.
        """
        self.name=name
        self.workers = list()
        for _ in range(num_workers):
            self._start_worker()
            print("process start")
            time.sleep(1)

    def _new_worker(self):
        """Build an unstarted ``(worker, process)`` pair."""
        worker = Worker("jcs")
        proc = multiprocessing.Process(target=worker.do)
        proc.daemon = True
        return worker, proc

    def _start_worker(self, index=None):
        """Create a worker, register it in ``self.workers`` and start it.

        Args:
            index: slot in ``self.workers`` to overwrite; ``None`` appends.
        """
        worker, proc = self._new_worker()
        # Register the pair before start() so it is already tracked by the
        # time the child process begins running.
        if index is None:
            self.workers.append((worker, proc))
        else:
            self.workers[index] = (worker, proc)
        proc.start()

    def createWorkers(self, num_workers):
        """Add ``num_workers`` new worker processes to the pool."""
        for _ in range(num_workers):
            self._start_worker()
            # sleep 1s to avoid that worker start too fast
            time.sleep(1)

    def dismissWorkers(self, num_workers):
        """Stop up to ``num_workers`` workers, newest first, and join them.

        Asking for more workers than the pool holds stops all of them instead
        of raising ``IndexError``; a non-positive count does nothing.
        """
        print("num of dismiss_workers %s"%(num_workers))
        for _ in range(min(num_workers, len(self.workers))):
            # The pair is removed from the pool before it is dismissed, so
            # wait_child() finds no matching entry to respawn when it exits.
            w, p = self.workers.pop()
            w.dismiss()
            print("dismiss start")
            p.join()

    def wait_child(self,signum, frame):
        """SIGCHLD handler: reap exited children and respawn tracked workers.

        Args:
            signum: signal number supplied by the ``signal`` module (unused).
            frame: interrupted stack frame supplied by ``signal`` (unused).

        Raises:
            OSError: any ``os.waitpid`` failure other than ``ECHILD``.
        """
        logging.info('receive SIGCHLD')
        try:
            while True:
                # -1 means "any child process"; os.WNOHANG makes the call
                # return immediately when no exited child is waiting.
                cpid, status = os.waitpid(-1, os.WNOHANG)
                if cpid == 0:
                    logging.info('no child process was immediately available')
                    break
                # The exit code is the high byte of the wait status.
                exitcode = status >> 8
                logging.info('child process %s exit with exitcode %s', cpid, exitcode)
                for i, (worker, _proc) in enumerate(self.workers):
                    logging.info("pid files")
                    logging.info(worker.pidnum.value)
                    if cpid == worker.pidnum.value:
                        # Start the replacement in the slot of the dead worker.
                        self._start_worker(i)
                        break
        except OSError as e:
            if e.errno == errno.ECHILD:
                logging.warning('current process has no existing unwaited-for child processes.')
            else:
                raise
        logging.info('handle SIGCHLD end')

    def plan_restart(self):
        """Recycle every worker whose ``times`` counter reached the threshold.

        Makes a single pass over the pool: each worker with
        ``times.value >= RESTART_THRESHOLD`` is dismissed and joined, then a
        fresh worker is started in the same slot.

        NOTE(review): ``Process.join()`` is only valid in the process that
        started the worker, and the replacement is stored in this process's
        ``self.workers`` -- call this from the process that owns the pool.
        NOTE(review): if wait_child() is installed as the SIGCHLD handler in
        the calling process, it may also respawn the dismissed worker while
        it is still listed in the pool -- confirm before relying on both.
        """
        logging.info('it is to restart')
        for i, (worker, proc) in enumerate(self.workers):
            if worker.times.value >= self.RESTART_THRESHOLD:
                worker.dismiss()
                proc.join()
                # Store the replacement in the same slot and start it.
                self._start_worker(i)
                logging.info("has restarted the process")


# TODO: decide how to schedule the periodic restart

#########################
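"""
1. Redis layout
  jcsworker_num: 1
  jcsworker_name: {pid: 0, pid: 1}   (0 = free, 1 = busy)
  jcsworkerstop_flag: True/False

NOTE: schedule1() below reads the keys "jcsnum" and "stopjcs" and parses both
with int(), so the names and value types listed above may be out of date.
"""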
#########################

def schedule1(jc,st):
    """Poll Redis every two seconds and publish the settings to shared values.

    Never returns; the caller runs it as the target of a dedicated process.

    Args:
        jc: shared object whose ``value`` receives the integer stored under
            the ``"jcsnum"`` key (the caller passes a
            ``multiprocessing.Value("i", ...)``).
        st: shared object whose ``value`` receives the integer stored under
            the ``"stopjcs"`` key.
    """
    r = Database()

    def refresh(shared, key):
        # A missing or non-numeric entry must not kill the poller: keep the
        # last good value and try again on the next tick.
        # NOTE(review): assumes r.read() returns None or a non-numeric string
        # in that case -- confirm against redisdb.Database.
        try:
            shared.value = int(r.read(key))
        except (TypeError, ValueError):
            logging.warning('ignoring invalid redis value for %s', key)
        print(shared.value)

    def job():
        refresh(jc, "jcsnum")
        refresh(st, "stopjcs")

    schedule.every(2).seconds.do(job)
    while True:
        schedule.run_pending()
        time.sleep(1)

if __name__ == '__main__':
    # Entry point: keep the worker pool at the size requested through Redis
    # until the stop flag is raised, then shut the workers down one by one.
    print("start")
    mw = Mworker()
    # Shared integers refreshed by the schedule1 process:
    # jc = desired number of workers, st = stop flag (> 0 means stop).
    jc = multiprocessing.Value("i", 1)
    st = multiprocessing.Value("i", 0)
    p = multiprocessing.Process(target=schedule1, args=(jc, st))
    p.daemon = True
    p.start()
    # this process is to monitor the execute times of each worker,and if the execute times of the worker is greater than 600, it will restart
    # NOTE(review): plan_restart() makes a single pass and here runs in a
    # separate process, so it cannot join() the workers started by this
    # process and its replacements never reach this process's mw.workers --
    # confirm the intended design.
    p_restart = multiprocessing.Process(target=mw.plan_restart)
    p_restart.daemon = True
    p_restart.start()
    # seize the SIGCHLD
    signal.signal(signal.SIGCHLD, mw.wait_child)
    while True:
        # A negative target is treated as "no workers".
        worknum = max(int(jc.value), 0)
        current = len(mw.workers)
        print(worknum)
        print(current)
        if worknum > current:
            mw.createWorkers(worknum - current)
        elif worknum < current:
            mw.dismissWorkers(current - worknum)
        stop = st.value
        time.sleep(1)
        if stop > 0:
            while mw.workers:
                # Wait until the last worker's flag drops to 0 before
                # dismissing it; sleep between checks instead of spinning.
                # NOTE(review): flag == 0 presumably means "idle" -- confirm
                # against worker.Worker.
                while mw.workers and mw.workers[-1][0].flag.value != 0:
                    time.sleep(0.1)
                mw.dismissWorkers(1)
            break
    print(len(mw.workers))
    print("worker stopped")