Предыдущая статья — Python AI в StarCraft II. Часть XIV: совершенствуем разведку.
Добро пожаловать в пятнадцатую часть серии статей про использование искусственного интеллекта в игре Starcraft II. Здесь мы увеличим возможности выбора для нашего AI.
Начнем с того, что добавим в метод __init__ следующий код:
self.choices = {0: self.build_scout,
1: self.build_zealot,
2: self.build_gateway,
3: self.build_voidray,
4: self.build_stalker,
5: self.build_worker,
6: self.build_assimilator,
7: self.build_stargate,
8: self.build_pylon,
9: self.defend_nexus,
10: self.attack_known_enemy_unit,
11: self.attack_known_enemy_structure,
12: self.expand,
13: self.do_nothing,
}
Теперь изменим метод on_step:
async def on_step(self, iteration):
self.time = (self.state.game_loop/22.4) / 60
#print('Time:',self.time)
await self.distribute_workers()
await self.scout()
await self.intel()
await self.do_something()
Далее нам нужно создать метод self.do_something():
async def do_something(self):
if self.time > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
choice = random.randrange(0, 14)
try:
await self.choices[choice]()
except Exception as e:
print(str(e))
y = np.zeros(14)
y[choice] = 1
self.train_data.append([y, self.flipped])
Легче легкого!
Что ж, теперь убедимся, что у нас есть все необходимые методы.
[machinelearning_ad_block]У нас уже должен быть метод для создания разведчика:
async def build_scout(self):
for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(OBSERVER)), self.time/3)
if self.can_afford(OBSERVER) and self.supply_left > 0:
await self.do(rf.train(OBSERVER))
break
Далее создадим Зелот:
async def build_zealot(self):
gateways = self.units(GATEWAY).ready
if gateways.exists:
if self.can_afford(ZEALOT):
await self.do(random.choice(gateways).train(ZEALOT))
Затем Врата:
async def build_gateway(self):
pylon = self.units(PYLON).ready.random
if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
await self.build(GATEWAY, near=pylon)
Луч Бездны:
async def build_voidray(self):
stargates = self.units(STARGATE).ready
if stargates.exists:
if self.can_afford(VOIDRAY):
await self.do(random.choice(stargates).train(VOIDRAY))
Сталкер:
async def build_stalker(self):
pylon = self.units(PYLON).ready.random
gateways = self.units(GATEWAY).ready
cybernetics_cores = self.units(CYBERNETICSCORE).ready
if gateways.exists and cybernetics_cores.exists:
if self.can_afford(STALKER):
await self.do(random.choice(gateways).train(STALKER))
if not cybernetics_cores.exists:
if self.units(GATEWAY).ready.exists:
if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE):
await self.build(CYBERNETICSCORE, near=pylon)
Рабочий:
async def build_worker(self):
nexuses = self.units(NEXUS).ready
if nexuses.exists:
if self.can_afford(PROBE):
await self.do(random.choice(nexuses).train(PROBE))
Ассимилятор:
async def build_assimilator(self):
for nexus in self.units(NEXUS).ready:
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(ASSIMILATOR, vaspene))
Звездные Врата:
async def build_stargate(self):
if self.units(PYLON).ready.exists:
pylon = self.units(PYLON).ready.random
if self.units(CYBERNETICSCORE).ready.exists:
if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
await self.build(STARGATE, near=pylon)
Пилоны:
async def build_pylon(self):
nexuses = self.units(NEXUS).ready
if nexuses.exists:
if self.can_afford(PYLON):
await self.build(PYLON, near=self.units(NEXUS).first.position.towards(self.game_info.map_center, 5))
Обратите внимание, что код для Пилонов мы немного изменили, чтобы избежать засорения ресурсных областей.
Расширение:
async def expand(self):
try:
if self.can_afford(NEXUS):
await self.expand_now()
except Exception as e:
print(str(e))
Наши четыре варианта атаки:
async def do_nothing(self):
wait = random.randrange(7, 100)/100
self.do_something_after = self.time + wait
async def defend_nexus(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))
for u in self.units(VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(STALKER).idle:
await self.do(u.attack(target))
for u in self.units(ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_structure(self):
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
for u in self.units(VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(STALKER).idle:
await self.do(u.attack(target))
for u in self.units(ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_unit(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))
for u in self.units(VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(STALKER).idle:
await self.do(u.attack(target))
for u in self.units(ZEALOT).idle:
await self.do(u.attack(target))
Мы также внесли изменения, чтобы AI было легче соревноваться с самим собой:
def __init__(self, use_model=False, title=1):
self.MAX_WORKERS = 50
self.do_something_after = 0
self.use_model = use_model
self.title = title
...
Добавим self.title в метод intel:
if not HEADLESS:
cv2.imshow(str(self.title), resized)
cv2.waitKey(1)
Весь код теперь принял следующий вид:
import sc2
from sc2 import run_game, maps, Race, Difficulty, Result
from sc2.player import Bot, Computer
from sc2 import position
from sc2.constants import NEXUS, PROBE, PYLON, ASSIMILATOR, GATEWAY, \
CYBERNETICSCORE, STARGATE, VOIDRAY, SCV, DRONE, ROBOTICSFACILITY, OBSERVER, \
ZEALOT, STALKER
import random
import cv2
import numpy as np
import os
import time
import math
#import keras
#os.environ["SC2PATH"] = '/starcraftstuff/StarCraftII/'
HEADLESS = False
class SentdeBot(sc2.BotAI):
def __init__(self, use_model=False, title=1):
self.MAX_WORKERS = 50
self.do_something_after = 0
self.use_model = use_model
self.title = title
self.scouts_and_spots = {}
# ADDED THE CHOICES #
self.choices = {0: self.build_scout,
1: self.build_zealot,
2: self.build_gateway,
3: self.build_voidray,
4: self.build_stalker,
5: self.build_worker,
6: self.build_assimilator,
7: self.build_stargate,
8: self.build_pylon,
9: self.defend_nexus,
10: self.attack_known_enemy_unit,
11: self.attack_known_enemy_structure,
12: self.expand,
13: self.do_nothing,
}
self.train_data = []
if self.use_model:
print("USING MODEL!")
self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")
def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)
with open("gameout-random-vs-medium.txt","a") as f:
if self.use_model:
f.write("Model {} - {}\n".format(game_result, int(time.time())))
else:
f.write("Random {} - {}\n".format(game_result, int(time.time())))
async def on_step(self, iteration):
self.time = (self.state.game_loop/22.4) / 60
#print('Time:',self.time)
await self.distribute_workers()
await self.scout()
await self.intel()
await self.do_something()
def random_location_variance(self, location):
x = location[0]
y = location[1]
# FIXED THIS
x += random.randrange(-5,5)
y += random.randrange(-5,5)
if x < 0:
print("x below")
x = 0
if y < 0:
print("y below")
y = 0
if x > self.game_info.map_size[0]:
print("x above")
x = self.game_info.map_size[0]
if y > self.game_info.map_size[1]:
print("y above")
y = self.game_info.map_size[1]
go_to = position.Point2(position.Pointlike((x,y)))
return go_to
async def scout(self):
'''
['__call__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_game_data', '_proto', '_type_data', 'add_on_tag', 'alliance', 'assigned_harvesters', 'attack', 'build', 'build_progress', 'cloak', 'detect_range', 'distance_to', 'energy', 'facing', 'gather', 'has_add_on', 'has_buff', 'health', 'health_max', 'hold_position', 'ideal_harvesters', 'is_blip', 'is_burrowed', 'is_enemy', 'is_flying', 'is_idle', 'is_mine', 'is_mineral_field', 'is_powered', 'is_ready', 'is_selected', 'is_snapshot', 'is_structure', 'is_vespene_geyser', 'is_visible', 'mineral_contents', 'move', 'name', 'noqueue', 'orders', 'owner_id', 'position', 'radar_range', 'radius', 'return_resource', 'shield', 'shield_max', 'stop', 'tag', 'train', 'type_id', 'vespene_contents', 'warp_in']
'''
self.expand_dis_dir = {}
for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0])
#print(distance_to_enemy_start)
self.expand_dis_dir[distance_to_enemy_start] = el
self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)
existing_ids = [unit.tag for unit in self.units]
# removing of scouts that are actually dead now.
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout not in existing_ids:
to_be_removed.append(noted_scout)
for scout in to_be_removed:
del self.scouts_and_spots[scout]
if len(self.units(ROBOTICSFACILITY).ready) == 0:
unit_type = PROBE
unit_limit = 1
else:
unit_type = OBSERVER
unit_limit = 15
assign_scout = True
if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False
if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots]
if location not in active_locations:
if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
continue
await self.do(obs.move(location))
self.scouts_and_spots[obs.tag] = location
break
except Exception as e:
pass
for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))
async def intel(self):
game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
draw_dict = {
NEXUS: [15, (0, 255, 0)],
PYLON: [3, (20, 235, 0)],
PROBE: [1, (55, 200, 0)],
ASSIMILATOR: [2, (55, 200, 0)],
GATEWAY: [3, (200, 100, 0)],
CYBERNETICSCORE: [3, (150, 150, 0)],
STARGATE: [5, (255, 0, 0)],
ROBOTICSFACILITY: [5, (215, 155, 0)],
#VOIDRAY: [3, (255, 100, 0)],
}
for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1)
# from Александр Тимофеев via YT
main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1)
for enemy_unit in self.known_enemy_units:
if not enemy_unit.is_structure:
worker_names = ["probe",
"scv",
"drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1)
for obs in self.units(OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1)
for vr in self.units(VOIDRAY).ready:
pos = vr.position
cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (255, 100, 0), -1)
line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0
vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0
population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0
plausible_supply = self.supply_cap / 200.0
worker_weight = len(self.units(PROBE)) / (self.supply_cap-self.supply_left)
if worker_weight > 1.0:
worker_weight = 1.0
cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3) # worker/supply ratio
cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3) # plausible supply (supply/200.0)
cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3) # population ratio (supply_left/supply)
cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3) # gas / 1500
cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3) # minerals minerals/1500
# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
if not HEADLESS:
cv2.imshow(str(self.title), resized)
cv2.waitKey(1)
def find_target(self, state):
if len(self.known_enemy_units) > 0:
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_structures) > 0:
return random.choice(self.known_enemy_structures)
else:
return self.enemy_start_locations[0]
async def build_scout(self):
for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(OBSERVER)), self.time/3)
if self.can_afford(OBSERVER) and self.supply_left > 0:
await self.do(rf.train(OBSERVER))
break
async def build_worker(self):
nexuses = self.units(NEXUS).ready
if nexuses.exists:
if self.can_afford(PROBE):
await self.do(random.choice(nexuses).train(PROBE))
async def build_zealot(self):
gateways = self.units(GATEWAY).ready
if gateways.exists:
if self.can_afford(ZEALOT):
await self.do(random.choice(gateways).train(ZEALOT))
async def build_gateway(self):
pylon = self.units(PYLON).ready.random
if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
await self.build(GATEWAY, near=pylon)
async def build_voidray(self):
stargates = self.units(STARGATE).ready
if stargates.exists:
if self.can_afford(VOIDRAY):
await self.do(random.choice(stargates).train(VOIDRAY))
async def build_stalker(self):
pylon = self.units(PYLON).ready.random
gateways = self.units(GATEWAY).ready
cybernetics_cores = self.units(CYBERNETICSCORE).ready
if gateways.exists and cybernetics_cores.exists:
if self.can_afford(STALKER):
await self.do(random.choice(gateways).train(STALKER))
if not cybernetics_cores.exists:
if self.units(GATEWAY).ready.exists:
if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE):
await self.build(CYBERNETICSCORE, near=pylon)
async def build_assimilator(self):
for nexus in self.units(NEXUS).ready:
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(ASSIMILATOR, vaspene))
async def build_stargate(self):
if self.units(PYLON).ready.exists:
pylon = self.units(PYLON).ready.random
if self.units(CYBERNETICSCORE).ready.exists:
if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
await self.build(STARGATE, near=pylon)
async def build_pylon(self):
nexuses = self.units(NEXUS).ready
if nexuses.exists:
if self.can_afford(PYLON):
await self.build(PYLON, near=self.units(NEXUS).first.position.towards(self.game_info.map_center, 5))
async def expand(self):
try:
if self.can_afford(NEXUS):
await self.expand_now()
except Exception as e:
print(str(e))
async def do_nothing(self):
wait = random.randrange(7, 100)/100
self.do_something_after = self.time + wait
async def defend_nexus(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))
for u in self.units(VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(STALKER).idle:
await self.do(u.attack(target))
for u in self.units(ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_structure(self):
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)
for u in self.units(VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(STALKER).idle:
await self.do(u.attack(target))
for u in self.units(ZEALOT).idle:
await self.do(u.attack(target))
async def attack_known_enemy_unit(self):
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))
for u in self.units(VOIDRAY).idle:
await self.do(u.attack(target))
for u in self.units(STALKER).idle:
await self.do(u.attack(target))
for u in self.units(ZEALOT).idle:
await self.do(u.attack(target))
async def do_something(self):
if self.time > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction[0])
else:
choice = random.randrange(0, 14)
try:
await self.choices[choice]()
except Exception as e:
print(str(e))
###### NEW CHOICE HANDLING HERE #########
###### NEW CHOICE HANDLING HERE #########
y = np.zeros(14)
y[choice] = 1
self.train_data.append([y, self.flipped])
if True:
run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=False, title=1)),
#Bot(Race.Protoss, SentdeBot(use_model=False, title=2)),
Computer(Race.Protoss, Difficulty.Medium),
], realtime=False)
Следующая статья — Python AI в StarCraft II. Часть XVI: изменение визуализации.

