请求示例
脚本代码的样例,这里提供了python的代码示例。
1、打开单个环境
import requests
import sys
env_id = [2628] # 环境id
data = {"env_id": env_id}
res = requests.post("http://127.0.0.1:10086/api/open_env", json=data)
if res.status_code != 200:
print('环境打开失败:{}'.format(res))
sys.exit()
open_res_json = res.json()
print('返回响应数据:{}'.format(open_res_json))
2、连续打开多个环境
import requests
import sys
import multiprocessing as mp
from time import sleep
def open_env(env_id):
data = {"env_id": env_id}
res = requests.post("http://127.0.0.1:10086/api/open_env", json=data)
if res.status_code != 200:
print('环境打开失败:{}'.format(res))
sys.exit()
open_res_json = res.json()
print('返回响应数据:{}'.format(open_res_json))
if __name__ == '__main__':
env_ids = [2628, 2958] #多个环境id
procs = []
for env_id in env_ids:
env_proc = mp.Process(target=open_env, args=(env_id,))
procs.append(env_proc)
env_proc.start()
sleep(1)
for proc in procs:
proc.join()
3、打开单个环境并且操作浏览器打开新的网页
import time
import sys
import requests
from selenium import webdriver
# 获取webdriver
def get_driver_by_path(webdriver_path, port):
options = webdriver.ChromeOptions()
options.add_experimental_option("debuggerAddress", '127.0.0.1:' + str(port))
return webdriver.Chrome(webdriver_path, options=options)
def open_baidu(driver):
driver.get("https://www.baidu.com/") #打开新的网页
time.sleep(1)
if __name__ == '__main__':
url = r'http://127.0.0.1:10086/api/open_env' # 打开环境
open_data = {"env_id": 2587} # 填写参数,环境id可从创建环境或环境列表接口获取
open_res = requests.post(url, json=open_data).json()
if open_res['code'] != 200:
print('环境打开失败:%s' % open_res)
sys.exit()
webdriver_path = open_res['data']['webdriver'] # 获取webdriver路径
debugging_port = open_res['data']['debuggingport'] # 获取调试端口
# 获取webdriver
driver = get_driver_by_path(webdriver_path, debugging_port) # 填写打开环境返回的webdriver和debuggingPort参数
# 运行脚本
open_baidu(driver)
4、打开账号下所有环境,并且让所有环境打开一个网页
import os
from multiprocessing import Process, current_process
import requests
import sys
from selenium import webdriver
from time import sleep
HOST = 'http://127.0.0.1:10086'
OPEN_ENV = HOST + '/api/open_env'
CLOSR_ENV = HOST + '/api/close_env'
ENV_LIST = HOST + '/api/env_list'
ENV_STATUS = HOST + '/api/env_status'
class MultiEnv(Process):
def __init__(self, env_id):
# 必须调用父类的init方法
super(MultiEnv, self).__init__()
self.env_id = env_id
def run(self):
data = {"env_id": self.env_id}
try:
# self.pid
proc_id = os.getpid()
# 获取子进程名称
# self.name
proc_name = current_process().name
print('proc_id:{0} proc_name:{1}'.format(proc_id, proc_name))
# 请求/api/env_list 得到所有环境信息
open_res_json = MultiEnv.get_res(OPEN_ENV, dat a_json=data)
# 获取webdriver路径
webdriver_path = open_res_json['data']['webdriver']
# 获取调试端口
debugging_port = open_res_json['data']['debuggingport']
options = webdriver.ChromeOptions()
options.add_experimental_option("debuggerAddress", '127.0.0.1:' + str(debugging_port))
# 得到driver对象
driver_yangtao_env = webdriver.Chrome(webdriver_path, options=options)
# 这里就可以写业务代码
sleep(1)
driver_yangtao_env.get("https://www.baidu.com")
sleep(500)
finally:
requests.post(CLOSR_ENV, json=data)
# 请求接口的工具方法
@classmethod
def get_res(cls, url, data_json):
res = requests.post(url, json=data_json)
if res.status_code != 200:
print('环境打开失败:{}'.format(res))
sys.exit()
open_res_json = res.json()
print('返回响应数据:{}'.format(open_res_json))
return open_res_json
# 请求/api/env_list 得到所有环境信息
@classmethod
def get_env_ids(cls):
env_ids = []
data = {
}
env_res = MultiEnv.get_res(ENV_LIST, data_json=data)
for env_data in env_res['data']:
env_ids.append(env_data['id'])
return env_ids
if __name__ == '__main__':
"""
1、调用接口/api/env_list获得所有可用的环境id
2、使用进程分别打开环境
"""
procs = []
# 得到账号内所有可用环境id
for env_id in MultiEnv.get_env_ids():
# 根据每个环境id,分别创建子进程
proc = MultiEnv(env_id)
sleep(5)
procs.append(proc)
# 启动子进程,启动一个新进程实际就是执行本进程对应的run方法
proc.start()
# join方法会让父进程等待子进程结束后再执行
for proc in procs:
proc.join()
print("Done.")