多线程模块threading的简单使用.md 8.1 KB


title: 多线程模块threading的简单使用 tags:

  • threading
  • 选课 id: '154' categories:
    • Python练习

      date: 2020-06-19 12:36:19

      from aip import AipOcr
      import requests, time, re, random, threading
      
      space = re.compile(r'\s+')
      htag = re.compile(r'<[^>]+>')
      
      """ 你的 APPID AK SK """
      APP_ID = ''
      API_KEY = ''
      SECRET_KEY = ''
      
      client = AipOcr(APP_ID, API_KEY, SECRET_KEY)
      options = {}
      options["language_type"] = "CHN_ENG"
      options["detect_direction"] = "true"
      options["detect_language"] = "true"
      
      def BaiduOCR(image, options=options):
      if isinstance(image, str):
      results = client.basicGeneralUrl(image, options)
      else:
      results = client.basicGeneral(image, options)
      if 'error_code' in results:
      print (f'BaiduOCR error {results["error_code"]} {results["error_msg"]}')
      return ''
      return ''.join(line['words'] for line in results['words_result'] if 'words' in line)
          
      def jsonfy(s:str)->object:
      #此函数将不带双引号的json的key标准化
      if s[0] not in ('{','['): print(s + '\n' + 'jsonfy 登录失效,请重新获取Cookie!')
      try:
      obj = eval(s, type('js', (dict,), dict(__getitem__=lambda s, n: n))())
      return obj
      except Exception:
      return {}
          
      
      def getScLc(d):
      return (int(d['sc']), int(d['lc']))
      
      def getXkList(r):
      a = r.text.find('[')
      b = r.text.find(r';/*sc 当前人数, lc 人数上限*/')
      c = r.text.find('{', b)
      print(f'getXkList {a} {b} {c}')
      if b==-1 or c==-1: print(r.text); time.sleep(0.5); return
      j = jsonfy(r.text[a:b])
      j2 = jsonfy(r.text[c:])
      return [(one['id'], one['no'], one['name'], getScLc(j2[str(one['id'])])) for one in j]
      
      def getData(Form = None):
      if Form:
      r = requests.post('', data = Form, headers=headers)
      else:
      r = requests.get(r''
      print(f'getData status_code {r.status_code}')
      if '请不要过快点击' in r.text: time.sleep(random.randint(3,9)/3)
      return r
      
      def getTimestamp(size = 1000):
      t = time.time()
      return int(round(t * size))
      
      def getCaptcha():
      r = requests.get(f'', headers=headers)
      print(f'getCaptcha status_code {r.status_code}')
      img = r.content
      text = BaiduOCR(img)
      print(f'getCaptcha results {text}')
      return text.replace(' ','')
      
      def Xk(CourseId, lock):
      print(f'Xk 怎么会死锁? {lock.locked()}')
      with lock:
      data = {
      "optype": "true",
      "operator0": f"{CourseId}:true:0",
      "captcha_response": getCaptcha()
      }
      r = requests.post(r'', data = data, headers=headers)
      print(f'Xk status_code {r.status_code}')
      return htag.sub(' ', space.sub('', r.text))
      
      def Xk2S(CourseId, lock):
      i = 3
      ord = 1
      while i >= 0:
      st = Xk(CourseId, lock)
      if '选课成功' in st:
          print(f'Xk2S 第{ord}次选课成功!')
          return True
      else:
          print(f'Xk2S 第{ord}次选课:')
          print(st)
          assert '选课失败:公选人数已满' not in st, 'Xk2S 公选人数已满 无法继续'
          assert '选课失败:你已经选过' not in st, 'Xk2S 选课成功 无需继续'
          if '操作失败:验证码错误' not in st: i -= 1
      ord += 1
      else:
      return False
      #PHPM110047.01
      
      from headers import headers
      
      def getCourse(Form):
      Courses = getXkList(getData(Form))
      while not Courses:
      print(f'getCourse 想必是要死循环了 {Courses}')
      time.sleep(random.randint(5,16)/2)
      Courses = getXkList(getData(Form))
              
      print (Courses)
      for Course in Courses:
      if Course[1] in Form['lessonNo']:
          print (Course)
          return Course
      assert False, 'getCourse 出现未知错误,请重试!'
      
      def isSuccessful(Form):
      time.sleep(5)
      Courses = getXkList(getData())
      print (Courses)
      for Course in Courses:
      if Course[1] == Form['lessonNo']:
          print (Course)
          return True
      return False
      
      #Form['lessonNo'] = 'PEDU110091.19'
      
      def start(lessonNo, lock, event):
      Form = {
      "lessonNo": lessonNo,
      "courseCode": "",
      "courseName": ""
      }
      time.sleep(random.randint(0,50)/10)
      print(f'start {lessonNo} {id(lock)} {id(event)}')
      while True:
      try :
          Course = getCourse(Form)
          if Course[3][0] < Course[3][1]:
              if Xk2S(Course[0], lock) and isSuccessful(Form):
                  print(f'主循环 选课成功 {time.ctime(time.time())}')
                  break
              else:
                  print(f'主循环 选课失败 继续尝试 {time.ctime(time.time())}')
          else:
              print(f'主循环 人数已达上限 继续等待 {time.ctime(time.time())}')
          event.set()
          time.sleep(random.randint(20,50)/5)
      except requests.ConnectionError:
          print('网络超时, 延时等待1分钟')
          for i in range(6):
              event.set()
              time.sleep(10)
      
      printf = print
      def wrapprint(*args, **kw):
      printf(threading.current_thread().name, ":",end='')
      printf(*args, **kw)
      print = wrapprint
      
      
      
      
      
      
      import threading, time, ctypes, inspect
      
      def _async_raise(tid, exctype):
      """raises the exception, performs cleanup if needed"""
      tid = ctypes.c_long(tid)
      if not inspect.isclass(exctype):
      exctype = type(exctype)
      res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
      if res == 0:
      #raise ValueError("invalid thread id")
      return "ValueError: invalid thread id"
      elif res != 1:
      # """if it returns a number greater than one, you're in trouble,
      # and you should call it again with exc=NULL to revert the effect"""
      ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
      #raise SystemError("PyThreadState_SetAsyncExc failed")
      return "SystemError: PyThreadState_SetAsyncExc failed"
      return "success!"
      def startThread(tasks, timeout=35):
      lock = threading.Lock()
      tasks = [(target, args, threading.Event()) for target,args in tasks]
      #for task in tasks: task['args'] = task.get('args',()) + (lock, watchdog)
      threads = [[threading.Thread(target=target, args=args+(lock, event)), event] for target,args,event in tasks]
      #if threads: timeout = timeout // len(threads) + 1
      for t,event in threads: t.setDaemon(True)
      for t,event in threads: t.start()
      print('startThread 守护线程开始执行!')
      counter = 1
      while any(t.is_alive() for t,event in threads):
      print(f'startThread 守护线程第{counter}轮 开始 {time.ctime(time.time())}')
      time.sleep(timeout)
      for t,event in threads:
          if not event.isSet() and t.is_alive(): break
          time.sleep(0.1)
      else: print(f'startThread 守护线程第{counter}轮 正常 {time.ctime(time.time())}'); counter += 1;continue
      print(f'startThread 守护线程第{counter}轮 发生异常,准备重启... {time.ctime(time.time())}')
      lock = threading.Lock()
      restart = []
      for i,one in enumerate(threads):
          t, event = one
          if not t.is_alive(): print(f'startThread 线程自然退出,忽略 {t.name}'); continue
          if event.isSet(): event.clear()
          else:
              print(f'startThread 正在结束线程 {t.name} {_async_raise(t.ident, SystemExit)}')
              target,args,event = tasks[i]
              t = threads[i][0] = threading.Thread(target=target, args=args+(lock, event))
              t.setDaemon(True)
              restart.append(t) 
      for t in restart: t.start(); print(f'startThread 启动替代线程 {t.name}')
      del restart
      print('startThread 守护线程准备退出中...')
      for t,event in threads:
      if t.is_alive(): print(f'startThread 正在结束线程 {t.name} {_async_raise(t.ident, SystemExit)}')
      print('startThread 守护线程已退出!')
      
      from FDUxk1 import start
      
      cl = [] 
      #cl.append('PEDU110091.19') #乒乓球
      cl.append('PHPM110047.01') #医院经营管理
      #cl.append('TOUR110001.01') #旅游与经济管理
      cl.append('PTSS110016.01') #当代世界经济与政治
      cl.append('PHIL119056.01') #哲学视野中的人工智能