python-12306

介绍

使用 python2.7 开发,登录网站,获取数据,模拟数据,邮箱通知!

用于 python 爬虫学习

涉及相关 python 库

  • PIL
  • bs4
  • prettytable
  • json
  • re
  • urllib
  • urllib2
  • smtplib

工具

  • Python2.7
  • Fiddler 4
  • VS Code

开源

配置文件

  1. 浏览器请求头

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # Browser-like request headers sent to kyfw.12306.cn.
    # `headers` uses the passport redirect page as Referer.
    headers = {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/66.0.3359.139 Safari/537.36'
        ),
        'Host': 'kyfw.12306.cn',
        'Referer': 'https://kyfw.12306.cn/otn/passport?redirect=/otn/',
    }

    # `headers2` uses the login init page as Referer (and a different
    # Chrome version in the User-Agent).
    headers2 = {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 '
            '(KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36'
        ),
        'Host': 'kyfw.12306.cn',
        'Referer': 'https://kyfw.12306.cn/otn/login/init',
    }
  2. 网站功能地址

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    # Endpoint table for kyfw.12306.cn, grouped by the stage that uses it.

    # ---- Login ----
    init_url = 'https://kyfw.12306.cn/otn/login/init'
    # The '{}' slot receives the URL-encoded query string.
    captcha_url = 'https://kyfw.12306.cn/passport/captcha/captcha-image?{}'
    login_url = 'https://kyfw.12306.cn/passport/web/login'
    check_captcha_url = 'https://kyfw.12306.cn/passport/captcha/captcha-check'
    uamtk_url = 'https://kyfw.12306.cn/passport/web/auth/uamtk'
    auth_url = 'https://kyfw.12306.cn/otn/uamauthclient'
    initmy12306_url = 'https://kyfw.12306.cn/otn/index/initMy12306'

    # ---- Ticket query ----
    left_tickets_sel_init = 'https://kyfw.12306.cn/otn/leftTicket/init'
    # Ends with '?' so the query parameters can be appended directly.
    left_tickets_url = 'https://kyfw.12306.cn/otn/leftTicket/queryZ?'

    # ---- Order placement ----
    checkuser_url = 'https://kyfw.12306.cn/otn/login/checkUser'
    submit_order_url = 'https://kyfw.12306.cn/otn/leftTicket/submitOrderRequest'
    confirm_passenger_url = 'https://kyfw.12306.cn/otn/confirmPassenger/initDc'
    get_passenger_url = 'https://kyfw.12306.cn/otn/confirmPassenger/getPassengerDTOs'
    check_order_info_url = 'https://kyfw.12306.cn/otn/confirmPassenger/checkOrderInfo'
    get_queue_count_url = 'https://kyfw.12306.cn/otn/confirmPassenger/getQueueCount'
    get_pass_code_new_url = 'https://kyfw.12306.cn/otn/passcodeNew/getPassCodeNew'
    confirm_order_url = 'https://kyfw.12306.cn/otn/confirmPassenger/confirmSingleForQueue'
    # Correctly spelled name for the wait-time endpoint.
    wait_order_url = 'https://kyfw.12306.cn/otn/confirmPassenger/queryOrderWaitTime'
    # Backward-compatible alias: the original (misspelled) name is kept so
    # code that refers to `wait_oder_url` keeps working.
    wait_oder_url = wait_order_url
    get_result_order_url = 'https://kyfw.12306.cn/otn/confirmPassenger/resultOrderForDcQueue'
    # Same endpoint as get_result_order_url, kept under its original name;
    # defined by reference so the two cannot drift apart.
    resultOrderForDcQueue = get_result_order_url
  3. 手动验证码坐标

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    # Lookup table from a captcha tile key ('1'..'8') to its 'x,y' coordinate
    # string. The coordinates form a grid of 4 columns (x) by 2 rows (y);
    # keys '1'-'4' are the first row and '5'-'8' the second.
    captcha_point = {
        str(row * 4 + col + 1): '{},{}'.format(x, y)
        for row, y in enumerate((43, 117))
        for col, x in enumerate((40, 110, 180, 260))
    }
  4. 其他配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    ##############################################
    # Settings that must be edited before running
    ##############################################

    # 12306 account credentials
    logininfo = {
        'username': 'tosobo',  # edit: account name
        'password': '',  # edit: account password
        'appid': 'otn',
    }
    loginname = u'沙'  # edit: name shown for the account

    # Ticket to buy
    From_Station = '武汉'  # edit: departure station
    To_Station = '深圳'  # edit: destination station
    Train_Date = '2019-02-12'  # edit: travel date
    # edit: train numbers; per the original note, empty means scan all trains
    TrainNumber = [
        'G1007', 'G1009', 'G1013', 'G73', 'G71', 'G1015', 'G79', 'G1017', '',
    ]
    SeatType = '二等座'  # edit: seat class
    TicketName = ['沙', '娟']  # edit: passenger names

    # Mailbox that receives the notification
    Email = 'tosobright@qq.com'  # edit

    流程

登录

  • 打开登录页面

    1
    2
    3
    4
    5
    6
    # Request the login page (config.init_url) with the default headers.
    response = urllib2.urlopen(urllib2.Request(
    config.init_url, headers=config.headers))
    if response.getcode() == 200:
    html = response.read()
    soup = BeautifulSoup(html, 'html.parser')
    # Print the stripped text of the element whose id is "login_user",
    # encoded as GBK.
    print soup.find(id="login_user").text.strip().encode('gbk')
  • 发送验证码

    • 获取验证码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      # Query parameters for the captcha image request.
      captcha_data = {
      "login_site": "E",
      "module": "login",
      "rand": "sjrand",
      # NOTE(review): fixed numeric key with an empty value - presumably a
      # stand-in for a random cache-busting parameter; confirm.
      "0.17231872703389062": ""
      }
      # URL-encode the parameters and substitute them into the '{}' slot of
      # config.captcha_url.
      param = urlencode(captcha_data)
      url = config.captcha_url.format(param)
      response = urllib2.urlopen(urllib2.Request(
      url, headers=config.headers))
      if response.getcode() == 200:
      # Wrap the response body in an in-memory buffer, open it as an image
      # and display it.
      file = BytesIO(response.read())
      img = Image.open(file)
      img.show()
    • 获取验证码输入坐标

      1
      2
      3
      4
      5
      6
      7
      # Prompt (GBK-encoded) for the captcha answer: a comma-separated list of
      # keys into config.captcha_point.
      positions = raw_input("输入验证码(以','分割):".decode('utf-8').encode('gbk'))
      # Drop a trailing carriage return and surrounding whitespace, then split
      # on commas.
      pos = positions.rstrip('\r').strip().split(',')
      # Look up the coordinate string for each key and join them with commas;
      # a key missing from config.captcha_point raises KeyError.
      temp = ''
      for item in pos:
      temp += config.captcha_point[item] + ','
      pos_res = temp.rstrip(',')
      print pos_res
    • 核对验证码

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      # Form fields for the captcha check; "answer" carries the coordinate
      # string built from the user's input.
      captcha_postdata = {
      "answer": pos_res,
      "login_site": "E",
      "rand": "sjrand"
      }
      captcha_postdata = urlencode(captcha_postdata)
      # Send the form to config.check_captcha_url; this request uses
      # config.headers2 rather than config.headers.
      response = urllib2.urlopen(urllib2.Request(config.check_captcha_url,
      headers=config.headers2), data=captcha_postdata)
      if response.getcode() == 200:
      result = json.loads(response.read())
      # Echo the server's message, GBK-encoded.
      result_message = result.get("result_message")
      result_message = result_message.encode('gbk')
      print result_message
      # A result_code of the string "4" is treated as success.
      if result.get("result_code") == "4":
      print "captcha OK"
      return True
      else:
      return False
      return False
  • 发送登录信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    # Step 1: submit the account form (config.logininfo) to config.login_url.
    response = urllib2.urlopen(urllib2.Request(
    config.login_url, headers=config.headers), data=urlencode(config.logininfo))
    if response.getcode() == 200:
    result = json.loads(response.read())
    print(result.get('result_message').encode('gbk'))
    # Any result_code other than the integer 0 ends the login with False.
    if result.get('result_code') != 0:
    return False

    # Step 2: request config.uamtk_url with appid "otn" and read "newapptk"
    # from the JSON reply.
    response = urllib2.urlopen(urllib2.Request(
    config.uamtk_url, headers=config.headers), data=urlencode({"appid": "otn"}))
    if response.getcode() == 200:
    result = json.loads(response.read())
    print("uamtk:" + result.get("result_message").encode('gbk'))
    # NOTE(review): presumably assigned only inside the status check above;
    # confirm what step 3 sees when the status is not 200.
    newapptk = result.get("newapptk")

    # Step 3: send that value as "tk" to config.auth_url and print the
    # returned message and username.
    response = urllib2.urlopen(urllib2.Request(
    config.auth_url, headers=config.headers), data=urlencode({"tk": newapptk}))
    if response.getcode() == 200:
    result = json.loads(response.read())
    print("auth:" + result.get("result_message").encode('gbk'))
    print("username:" + result.get("username").encode('gbk'))

    # Step 4: load config.initmy12306_url and compare the text of the
    # "login_user" element with config.loginname; a match returns True.
    response = urllib2.urlopen(urllib2.Request(
    config.initmy12306_url, headers=config.headers))
    if response.getcode() == 200:
    html = response.read()
    soup = BeautifulSoup(html, 'html.parser')
    name = soup.find(id="login_user").text.strip()
    print name.encode('gbk')
    if name == config.loginname:
    return True
    return False

    获取站点代码

1
2
3
4
5
6
7
8
9
10
11
# Download the station-name JavaScript resource.
station_url = 'https://kyfw.12306.cn/otn/resources/js/framework/station_name.js?station_version=1.9069'
response = urllib2.urlopen(urllib2.Request(
station_url, headers=config.headers))
# Keep the text between the first pair of single quotes, then split it into
# entries on '@'.
jstxt = response.read().split('\'')[1]
station = jstxt.split('@')

# For every non-empty entry, split on '|' and store field 1 -> field 2.
# NOTE(review): `dict` is written to here, so it must be a mapping defined
# outside this excerpt (the name shadows the builtin); presumably it maps
# station name -> station code - confirm.
for item in station:
if item != '':
v = item.split('|')
dict[v[1]] = v[2]
print('GetStation Suc...')

查询余票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# Request the left-ticket init page first; this response is not read and the
# name is reassigned by the query below.
response = urllib2.urlopen(urllib2.Request(
config.left_tickets_sel_init, headers=config.headers))
# time.sleep(1000)
# Example query URL:
# https://kyfw.12306.cn/otn/leftTicket/queryA?leftTicketDTO.train_date=2018-10-01&leftTicketDTO.from_station=SZQ&leftTicketDTO.to_station=WHN&purpose_codes=ADULT
# Build the query URL from the configured date and the two stations, each
# passed through station.GetStationDict().
left_tickets_url = config.left_tickets_url + \
'leftTicketDTO.train_date=' + config.Train_Date + \
'&leftTicketDTO.from_station=' + station.GetStationDict(config.From_Station) + \
'&leftTicketDTO.to_station=' + station.GetStationDict(config.To_Station) + \
'&purpose_codes=ADULT'
print left_tickets_url
response = urllib2.urlopen(urllib2.Request(
left_tickets_url, headers=config.headers))
if response.getcode() == 200:
r = response.read()
jdict = json.loads(r)
# The train records are the strings under data -> result.
raw_trains = jdict['data']
raw_trains = raw_trains['result']
# Table used for display; its column titles come from the string below.
# NOTE(review): _set_field_names has a leading underscore, so it looks like
# a non-public PrettyTable method - confirm against the installed version.
pt = PrettyTable()
pt._set_field_names("车次,车站,时间,经历时,一等座,二等座,软卧,硬卧,硬座,无座".split(','))
# Despite its name, train_dict is a list holding one dict per train.
train_dict = []
for raw_train in raw_trains:
# split() turns the '|'-separated record into a list of fields
data_list = raw_train.split("|")
# NOTE: this local name shadows the builtin `dict`.
dict = {}

# Field 0 is URL-unquoted; the remaining values are copied from fixed
# positions of the record.
orderurlstr = unquote(data_list[0])
dict['orderurlstr'] = orderurlstr
stationTrainCode = data_list[2]
dict['stationTrainCode'] = stationTrainCode
train_no = data_list[3]
dict['train_no'] = train_no
from_station_code = data_list[6]
dict['from_station_code'] = from_station_code
to_station_code = data_list[7]
dict['to_station_code'] = to_station_code
from_station_name = station.GetStationName(from_station_code)
dict['from_station_name'] = from_station_name
to_station_name = station.GetStationName(to_station_code)
dict['to_station_name'] = to_station_name
start_time = data_list[8]
dict['start_time'] = start_time
arrive_time = data_list[9]
dict['arrive_time'] = arrive_time
time_duration = data_list[10]
dict['time_duration'] = time_duration
train_location = data_list[15]
dict['train_location'] = train_location
# Seat fields: an empty value is replaced by "--".
first_class_seat = data_list[31] or "--"
dict['first_class_seat'] = first_class_seat
second_class_seat = data_list[30] or "--"
dict['second_class_seat'] = second_class_seat
soft_sleep = data_list[23] or "--"
dict['soft_sleep'] = soft_sleep
hard_sleep = data_list[28] or "--"
dict['hard_sleep'] = hard_sleep
hard_seat = data_list[29] or "--"
dict['hard_seat'] = hard_seat
no_seat = data_list[33] or "--"
dict['no_seat'] = no_seat

pt.add_row([
# Original note: add color to specific text (no coloring is applied here)
train_no,
'\n'.join([station.GetStationName(from_station_code),
station.GetStationName(to_station_code)]),
'\n'.join([start_time, arrive_time]),
time_duration,
first_class_seat,
second_class_seat,
soft_sleep,
hard_sleep,
hard_seat,
no_seat
])
train_dict.append(dict)

# The table itself is not printed; the list of dicts is printed and returned.
# print(pt)
print train_dict
return train_dict

检查用户登录情况

1
2
3
4
5
6
7
8
9
10
# Ask config.checkuser_url whether the session is still logged in.
response = urllib2.urlopen(urllib2.Request(
config.checkuser_url, headers=config.headers), data=urlencode({"_json_att": ""}))
if response.getcode() == 200:
result = json.loads(response.read())
# data -> flag decides the outcome: truthy returns True, otherwise False.
# Both messages are printed GBK-encoded.
if result['data']['flag']:
print('用户在线验证成功'.decode('utf-8').encode('gbk'))
return True
else:
print('检查到用户不在线,请重新登陆'.decode('utf-8').encode('gbk'))
return False

初次提交订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# Form for the initial order submission. train_number_str is defined outside
# this excerpt; "back_train_date" is filled with today's date.
data = {"secretStr": train_number_str,
"train_date": config.Train_Date,
"back_train_date": datetime.datetime.now().strftime("%Y-%m-%d"),
"tour_flag": "dc",
"purpose_codes": "ADULT",
"query_from_station_name": config.From_Station,
"query_to_station_name": config.To_Station,
"undefined": ""
}
response = urllib2.urlopen(urllib2.Request(
config.submit_order_url, headers=config.headers), data=urlencode(data))
if response.getcode() == 200:
result = json.loads(response.read())
# A truthy "status" means the submission was accepted.
if result['status']:
print('初次提交订单成功'.decode('utf-8').encode('gbk'))
return True
elif result['messages'] != []:
# The first message is compared with the server's "ticket information has
# expired" text; a match returns the marker string "ticketInfoOutData".
if result['messages'][0] == "车票信息已过期,请重新查询最新车票信息":
print('车票信息已过期,请重新查询最新车票信息'.decode('utf-8').encode('gbk'))
return "ticketInfoOutData"
else:
print("提交失败".decode('utf-8').encode('gbk'))
return False

获取旅客信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# Form for the passenger list request; reSubmitTk is defined outside this
# excerpt.
data = {
"_json_att": "",
"REPEAT_SUBMIT_TOKEN": reSubmitTk
}
response = urllib2.urlopen(urllib2.Request(
config.get_passenger_url, headers=config.headers), data=urlencode(data))
if response.getcode() == 200:
result = json.loads(response.read())

# If the first message equals the server's "system busy" text, return the
# marker string 'systembusy'.
if result['messages'] != []:
if result['messages'][0] == '系统忙,请稍后重试':
return 'systembusy'
# Otherwise return the list found at data -> normal_passengers.
passengerAllInfoList = result['data']['normal_passengers']
print("获取联系人信息成功".decode('utf-8').encode('gbk'))
return passengerAllInfoList

核对订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# Build the two passenger strings, one segment per entry of passengerslist
# (defined outside this excerpt):
#   passengerTicketStr: "<seat code>,0,1,<name>,1,<id no>,<mobile no>,N_"
#   oldPassengerStr:    "<name>,1,<id no>,1_"
# The seat code comes from seat.GetSeatType(config.SeatType).
passengerTicketStr = ""
oldPassengerStr = ""
for item in passengerslist:
passengerTicketStr += seat.GetSeatType(config.SeatType) + ',0,1,' + \
item['passenger_name'] + ',1,' + \
item['passenger_id_no'] + ',' + item['mobile_no'] + ',N_'
oldPassengerStr += item['passenger_name'] + \
',1,' + item['passenger_id_no']+',1_'

# Encode both strings as UTF-8 before they are URL-encoded.
passengerTicketStr = passengerTicketStr.encode('utf-8')
oldPassengerStr = oldPassengerStr.encode('utf-8')
data = {
"cancel_flag": "2",
"bed_level_order_num": "000000000000000000000000000000",
"passengerTicketStr": passengerTicketStr,
"oldPassengerStr": oldPassengerStr,
# NOTE(review): "tour_flag" appears twice in this literal with the same
# value; the second entry simply overrides the first.
"tour_flag": "dc",
"tour_flag": "dc",
"randCode": "",
"whatsSelect": "1",
"_json_att": "",
"REPEAT_SUBMIT_TOKEN": reSubmitTk
}
response = urllib2.urlopen(urllib2.Request(
config.check_order_info_url, headers=config.headers), data=urlencode(data))
if response.getcode() == 200:
result = json.loads(response.read())
if result['data']['submitStatus']:
# ifShowPassCode 'N' returns True; 'Y' calls GetBuyImage() (defined outside
# this excerpt) and returns the marker string "Need Random Code".
if result['data']['ifShowPassCode'] == 'N':
print("checkOrder")
return True
if result['data']['ifShowPassCode'] == 'Y':
GetBuyImage()
return "Need Random Code"
else:
# Failure path: print the server's errMsg (GBK-encoded) and return False.
print("checkOrderFail")
print(result['data']['errMsg'].encode('gbk'))
return False

获取队列

  • 开始进入购票队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    # Format the configured travel date as
    # "<%a> <%b> <day> <year> 00:00:00 GMT+0800 (中国标准时间)"; day and year
    # are taken from the split 'YYYY-MM-DD' string.
    # NOTE(review): %a / %b output depends on the process locale - confirm the
    # server expects English abbreviations.
    thatdaydata = datetime.datetime.strptime(config.Train_Date, "%Y-%m-%d")
    train_date_f = "{} {} {} {} 00:00:00 GMT+0800 (中国标准时间)".format(thatdaydata.strftime('%a'),
    thatdaydata.strftime(
    '%b'), config.Train_Date.split('-')[2],
    config.Train_Date.split('-')[0])
    # Form for the queue-count request. trainInfo, leftTicketStr and
    # reSubmitTk are defined outside this excerpt.
    data = {
    "train_date": train_date_f,
    "train_no": trainInfo['train_no'],
    "stationTrainCode": trainInfo['stationTrainCode'],
    "seatType": seat.GetSeatType(config.SeatType),
    "fromStationTelecode": trainInfo['from_station_code'],
    "toStationTelecode": trainInfo['to_station_code'],
    "leftTicket": leftTicketStr,
    "purpose_codes": "00",
    "train_location": trainInfo['train_location'],
    "_json_att": "",
    "REPEAT_SUBMIT_TOKEN": reSubmitTk
    }
    response = urllib2.urlopen(urllib2.Request(
    config.get_queue_count_url, headers=config.headers), data=urlencode(data))
    if response.getcode() == 200:
    # Any exception while reading or parsing the body returns the marker
    # string "NetWorkError" (bare except).
    try:
    result = json.loads(response.read())
    except:
    return "NetWorkError"
    # A truthy "status" returns True, otherwise False.
    if result['status']:
    print("进入队列成功".decode('utf-8').encode('gbk'))
    return True
    else:
    print("进入队列失败".decode('utf-8').encode('gbk'))
    return False
  • 确认单人队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    # Build the two passenger strings, one segment per entry of passengerslist
    # (defined outside this excerpt):
    #   passengerTicketStr: "<seat code>,0,1,<name>,1,<id no>,<mobile no>,N_"
    #   oldPassengerStr:    "<name>,1,<id no>,1_"
    passengerTicketStr = ""
    oldPassengerStr = ""
    for item in passengerslist:
    passengerTicketStr += seat.GetSeatType(config.SeatType) + ',0,1,' + \
    item['passenger_name'] + ',1,' + \
    item['passenger_id_no'] + ',' + item['mobile_no']+',N_'
    oldPassengerStr += item['passenger_name'] + \
    ',1,' + item['passenger_id_no']+',1_'

    # Encode both strings as UTF-8 before they are URL-encoded.
    passengerTicketStr = passengerTicketStr.encode('utf-8')
    oldPassengerStr = oldPassengerStr.encode('utf-8')
    # Form for the confirmation request. keyIsChange, leftTicketStr, traininfo
    # and reSubmitTk are defined outside this excerpt.
    data = {
    "passengerTicketStr": passengerTicketStr,
    "oldPassengerStr": oldPassengerStr,
    "randCode": "",
    "purpose_codes": "00",
    "key_check_isChange": keyIsChange,
    "leftTicketStr": leftTicketStr,
    "train_location": traininfo['train_location'],
    "choose_seats": "",
    "seatDetailType": "000",
    "whatsSelect": "1",
    "roomType": "00",
    "dwAll": "N",
    "_json_att": "",
    "REPEAT_SUBMIT_TOKEN": reSubmitTk
    }
    response = urllib2.urlopen(urllib2.Request(
    config.confirm_order_url, headers=config.headers), data=urlencode(data))
    if response.getcode() == 200:
    # Any exception while reading or parsing the body returns the marker
    # string "NetWorkError" (bare except).
    try:
    result = json.loads(response.read())
    except:
    return "NetWorkError"

    if 'data' in result.keys():
    # submitStatus must be exactly True for success; an errMsg equal to the
    # server's "wrong captcha" text returns the marker string "wrongCode".
    if result['data']['submitStatus'] is True:
    print("确认提交订单成功".decode('utf-8').encode('gbk'))
    return True
    elif result['data']['errMsg'] == u"验证码输入错误!":
    return "wrongCode"

    else:
    print("提交订单失败".decode('utf-8').encode('gbk'))
    return False

    等待队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# Poll the wait-time endpoint once. The query string carries the current time
# in milliseconds as "random" and reSubmitTk (defined outside this excerpt)
# as REPEAT_SUBMIT_TOKEN.
orderId = ''
url = config.wait_oder_url + '?random={}&tourFlag=dc&_json_att=&REPEAT_SUBMIT_TOKEN={}'.format(
int(time.time()*1000), reSubmitTk)
response = urllib2.urlopen(urllib2.Request(url, headers=config.headers))
if response.getcode() == 200:
# Any exception while reading or parsing the body returns '' (bare except).
try:
result = json.loads(response.read())
except:
return ''
if result['status']:
if result['data']['queryOrderWaitTimeStatus']:
# waitTime > 0: print it and return ''.
if result['data']['waitTime'] > 0:
print result['data']['waitTime']
return ''
# waitTime == -1: read orderId from the reply, print it with a reminder to
# finish payment on 12306, and return it.
elif result['data']['waitTime'] == -1:
orderId = result['data']['orderId']
print orderId
print "请登录12306,完成后续支付"
return orderId
# waitTime == -2: print the server's msg (GBK-encoded) and return 'error'.
elif result['data']['waitTime'] == -2:
print result['data']['msg'].encode('gbk')
return 'error'
# The remaining branches return 'error' or ''.
else:
return 'error'
else:
return ''
else:
return ''

发送邮件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# SMTP service settings
mail_host = "smtp.126.com" # SMTP server host
mail_port = 25
mail_user = "" # login user name
mail_pass = "" # login password

# Sender and recipients
FromEmail = 'tipinfo@126.com'
ToEmails = [config.Email,] # recipients; can be your QQ mailbox or any other address

# Subject and body text of the notification.
EmailTitle = '12306有票了'
EmailContent = '快来付款!'

# Assemble a multipart message with a UTF-8 plain-text body.
msg = MIMEMultipart()
msg['From'] = "{}".format(FromEmail)
msg['To'] = ",".join(ToEmails)
msg['Subject'] = EmailTitle

msg.attach(MIMEText(EmailContent, 'plain', 'utf-8'))

# Optional attachment example (disabled):
#attach1 = MIMEText(open('f:\getweather.log', 'rb').read(), 'base64', 'utf-8')
#attach1["Content-Type"] = 'application/octet-stream'
#attach1["Content-Disposition"] = 'attachment;filename="getweather.txt"'
#msg.attach(attach1)

# Connect, log in, send and quit, with SMTP debug output enabled. Only
# smtplib.SMTPException is handled; it is reported by printing.
# NOTE(review): no starttls() call is made before login() - confirm whether
# the server requires or supports TLS.
try:
server = smtplib.SMTP(mail_host, mail_port)
server.set_debuglevel(1)
server.login(mail_user,mail_pass)
server.sendmail(FromEmail, ToEmails, msg.as_string())
server.quit()
print "邮件发送成功"
except smtplib.SMTPException as e:
print "Error: 无法发送邮件"
print e
坚持原创技术分享,您的支持将鼓励我继续创作!