Предыдущая статья — Python AI в StarCraft II. Часть XIV: совершенствуем разведку.
Добро пожаловать в пятнадцатую часть серии статей про использование искусственного интеллекта в игре StarCraft II. В этой части мы расширим набор действий, из которых может выбирать наш AI.
Начнем с того, что добавим в метод __init__
следующий код:
# Dispatch table: each key is an action id, each value the coroutine method
# that do_something() awaits when that id is chosen.
self.choices = {
    0: self.build_scout,
    1: self.build_zealot,
    2: self.build_gateway,
    3: self.build_voidray,
    4: self.build_stalker,
    5: self.build_worker,
    6: self.build_assimilator,
    7: self.build_stargate,
    8: self.build_pylon,
    9: self.defend_nexus,
    10: self.attack_known_enemy_unit,
    11: self.attack_known_enemy_structure,
    12: self.expand,
    13: self.do_nothing,
}
Теперь изменим метод on_step
:
async def on_step(self, iteration):
    """Run one bot step: refresh the clock, then manage, scout, observe and act.

    ``iteration`` is part of the callback signature and is not used here.
    """
    # NOTE(review): presumably 22.4 game loops make one second, which would
    # put self.time in minutes - confirm against the game speed in use.
    seconds = self.state.game_loop / 22.4
    self.time = seconds / 60

    await self.distribute_workers()
    await self.scout()
    await self.intel()
    await self.do_something()
Далее нам нужно создать метод self.do_something()
:
async def do_something(self):
    """Pick one action, run it, and store the choice as a training sample.

    Nothing happens while ``self.time`` has not passed
    ``self.do_something_after``. Otherwise an action id is taken from the
    model (when ``self.use_model`` is set) or drawn at random, the matching
    coroutine from ``self.choices`` is awaited, and ``[one_hot, frame]`` is
    appended to ``self.train_data``.
    """
    if self.time <= self.do_something_after:
        return

    # Derive the action count from the dispatch table so the random draw and
    # the one-hot label can never drift out of sync with self.choices.
    n_choices = len(self.choices)

    if self.use_model:
        # The frame is reshaped into a batch of (176, 200, 3) images.
        prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
        choice = np.argmax(prediction[0])
    else:
        choice = random.randrange(n_choices)

    try:
        await self.choices[choice]()
    except Exception as e:
        # Best effort: a failed action must not stop the bot.
        print(str(e))

    # One-hot label for the action that was attempted.
    y = np.zeros(n_choices)
    y[choice] = 1
    self.train_data.append([y, self.flipped])
Легче легкого!
Что ж, теперь убедимся, что у нас есть все необходимые методы.
У нас уже должен быть метод для создания разведчика:
async def build_scout(self):
    """Train one Observer at a ready Robotics Facility that has no queue.

    Stops after the first training order has been issued.
    """
    free_facilities = self.units(ROBOTICSFACILITY).ready.noqueue
    for facility in free_facilities:
        print(len(self.units(OBSERVER)), self.time/3)
        if not self.can_afford(OBSERVER) or self.supply_left <= 0:
            continue
        await self.do(facility.train(OBSERVER))
        break
Далее — метод для создания Зелота:
async def build_zealot(self):
    """Train a Zealot at a randomly chosen ready Gateway, if affordable."""
    ready_gateways = self.units(GATEWAY).ready
    if not ready_gateways.exists:
        return
    if not self.can_afford(ZEALOT):
        return
    producer = random.choice(ready_gateways)
    await self.do(producer.train(ZEALOT))
Затем Врата:
async def build_gateway(self):
    """Build a Gateway near a random ready Pylon.

    Does nothing when there is no ready Pylon, when a Gateway is not
    affordable, or when one is already pending.
    """
    ready_pylons = self.units(PYLON).ready
    # Guard first: `.random` cannot pick from an empty selection
    # (build_stargate performs the same check).
    if not ready_pylons.exists:
        return
    if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
        await self.build(GATEWAY, near=ready_pylons.random)
Луч Бездны:
async def build_voidray(self):
    """Train a Void Ray at a randomly chosen ready Stargate, if affordable."""
    ready_stargates = self.units(STARGATE).ready
    if not ready_stargates.exists:
        return
    if not self.can_afford(VOIDRAY):
        return
    producer = random.choice(ready_stargates)
    await self.do(producer.train(VOIDRAY))
Сталкер:
async def build_stalker(self):
    """Train a Stalker, or build the Cybernetics Core that is still missing.

    With a ready Gateway and a ready Cybernetics Core, a Stalker is trained
    at a random Gateway when affordable. With a ready Gateway but no ready
    Cybernetics Core, a core is built near a random ready Pylon when
    affordable and not already pending.
    """
    ready_gateways = self.units(GATEWAY).ready
    # Both branches below require a ready Gateway.
    if not ready_gateways.exists:
        return

    if self.units(CYBERNETICSCORE).ready.exists:
        if self.can_afford(STALKER):
            await self.do(random.choice(ready_gateways).train(STALKER))
        return

    # The Pylon is only needed on this path, and `.random` cannot pick from
    # an empty selection, so look it up here and guard it.
    ready_pylons = self.units(PYLON).ready
    if not ready_pylons.exists:
        return
    if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE):
        await self.build(CYBERNETICSCORE, near=ready_pylons.random)
Рабочий:
async def build_worker(self):
    """Train a Probe at a randomly chosen ready Nexus, if affordable."""
    ready_nexuses = self.units(NEXUS).ready
    if not ready_nexuses.exists:
        return
    if not self.can_afford(PROBE):
        return
    producer = random.choice(ready_nexuses)
    await self.do(producer.train(PROBE))
Ассимилятор:
async def build_assimilator(self):
    """Build Assimilators on free geysers within 15.0 of each ready Nexus.

    For each Nexus the geyser loop stops as soon as an Assimilator is not
    affordable or no build worker can be selected.
    """
    for nexus in self.units(NEXUS).ready:
        nearby_geysers = self.state.vespene_geyser.closer_than(15.0, nexus)
        for geyser in nearby_geysers:
            if not self.can_afford(ASSIMILATOR):
                break
            builder = self.select_build_worker(geyser.position)
            if builder is None:
                break
            # Skip geysers that already have an Assimilator on them.
            occupied = self.units(ASSIMILATOR).closer_than(1.0, geyser).exists
            if occupied:
                continue
            await self.do(builder.build(ASSIMILATOR, geyser))
Звездные Врата:
async def build_stargate(self):
    """Build a Stargate near a random ready Pylon.

    Requires a ready Pylon and a ready Cybernetics Core; does nothing when a
    Stargate is unaffordable or already pending.
    """
    ready_pylons = self.units(PYLON).ready
    if not ready_pylons.exists:
        return
    anchor = ready_pylons.random

    if not self.units(CYBERNETICSCORE).ready.exists:
        return
    if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
        await self.build(STARGATE, near=anchor)
Пилоны:
async def build_pylon(self):
    """Build a Pylon 5 units from the first Nexus, towards the map centre."""
    if not self.units(NEXUS).ready.exists:
        return
    if not self.can_afford(PYLON):
        return
    first_nexus = self.units(NEXUS).first
    spot = first_nexus.position.towards(self.game_info.map_center, 5)
    await self.build(PYLON, near=spot)
Обратите внимание, что код для Пилонов мы немного изменили, чтобы избежать засорения ресурсных областей.
Расширение:
async def expand(self):
    """Start a new Nexus when affordable; any error is printed, not raised."""
    try:
        affordable = self.can_afford(NEXUS)
        if affordable:
            await self.expand_now()
    except Exception as e:
        print(str(e))
Последние четыре варианта — пауза (do_nothing), защита Нексуса и две атаки:
async def do_nothing(self):
    """Postpone the next decision by a random 0.07-0.99 units of self.time."""
    wait = random.randrange(7, 100) / 100
    self.do_something_after = self.time + wait


async def _attack_with_idle_army(self, target):
    """Order every idle Void Ray, Stalker and Zealot to attack ``target``."""
    for unit_type in (VOIDRAY, STALKER, ZEALOT):
        for u in self.units(unit_type).idle:
            await self.do(u.attack(target))


async def defend_nexus(self):
    """Send the idle army at the known enemy unit closest to a random Nexus.

    Does nothing when no enemy unit is known or when no Nexus is left.
    """
    if len(self.known_enemy_units) == 0:
        return
    nexuses = self.units(NEXUS)
    # random.choice raises on an empty selection; without a Nexus there is
    # nothing to defend.
    if not nexuses.exists:
        return
    target = self.known_enemy_units.closest_to(random.choice(nexuses))
    await self._attack_with_idle_army(target)


async def attack_known_enemy_structure(self):
    """Send the idle army at a random known enemy structure, if any."""
    if len(self.known_enemy_structures) == 0:
        return
    target = random.choice(self.known_enemy_structures)
    await self._attack_with_idle_army(target)


async def attack_known_enemy_unit(self):
    """Send the idle army at the known enemy unit closest to a random Nexus.

    When no Nexus is left, a random known enemy unit is attacked instead.
    Does nothing when no enemy unit is known.
    """
    if len(self.known_enemy_units) == 0:
        return
    nexuses = self.units(NEXUS)
    if nexuses.exists:
        target = self.known_enemy_units.closest_to(random.choice(nexuses))
    else:
        # random.choice(nexuses) would raise here; fall back to any enemy.
        target = random.choice(self.known_enemy_units)
    await self._attack_with_idle_army(target)
Мы также внесли изменения, чтобы AI было легче соревноваться с самим собой:
def __init__(self, use_model=False, title=1):
    # New constructor arguments: whether to act from a model, and a title
    # that is later used as the name of this bot's preview window.
    self.use_model = use_model
    self.title = title

    self.MAX_WORKERS = 50
    self.do_something_after = 0
    ...  # the remainder of __init__ is not shown in this excerpt
Добавим self.title
в метод intel
:
# Name the preview window after self.title so each bot instance gets its own.
if not HEADLESS:
    window_name = str(self.title)
    cv2.imshow(window_name, resized)
    cv2.waitKey(1)
Весь код теперь принял следующий вид:
import sc2
from sc2 import run_game, maps, Race, Difficulty, Result
from sc2.player import Bot, Computer
from sc2 import position
from sc2.constants import NEXUS, PROBE, PYLON, ASSIMILATOR, GATEWAY, \
    CYBERNETICSCORE, STARGATE, VOIDRAY, SCV, DRONE, ROBOTICSFACILITY, OBSERVER, \
    ZEALOT, STALKER
import random
import cv2
import numpy as np
import os
import time
import math
# keras is imported lazily inside SentdeBot.__init__, only when a model is used.

#os.environ["SC2PATH"] = '/starcraftstuff/StarCraftII/'
HEADLESS = False

# Unit types that the attack/defend actions send into battle, in order.
ARMY_UNIT_TYPES = (VOIDRAY, STALKER, ZEALOT)


def _capped_ratio(numerator, denominator):
    """Return numerator / denominator capped at 1.0, or 0.0 for a zero denominator."""
    if denominator == 0:
        return 0.0
    return min(numerator / denominator, 1.0)


def _draw_unit(canvas, unit, radius, color):
    """Draw a filled circle for ``unit`` at its integer map position."""
    pos = unit.position
    cv2.circle(canvas, (int(pos[0]), int(pos[1])), radius, color, -1)


class SentdeBot(sc2.BotAI):
    """Protoss bot that performs one action from ``self.choices`` per decision.

    The action is drawn at random, or predicted by a Keras model when
    ``use_model`` is true. Every decision is stored in ``self.train_data`` as
    ``[one_hot_choice, frame]``, where the frame is the image built by
    ``intel``.
    """

    def __init__(self, use_model=False, title=1):
        """Set up bot state.

        Args:
            use_model: when true, load the Keras model and let it choose
                actions; otherwise actions are random.
            title: name of the OpenCV preview window, so that two bots
                running side by side get separate windows.
        """
        # Harmless if the base class defines no __init__ of its own.
        super().__init__()

        self.MAX_WORKERS = 50
        self.do_something_after = 0
        self.use_model = use_model
        self.title = title

        # {scout unit tag: location it was sent to}
        self.scouts_and_spots = {}

        # Dispatch table: action id -> coroutine method awaited by do_something().
        self.choices = {
            0: self.build_scout,
            1: self.build_zealot,
            2: self.build_gateway,
            3: self.build_voidray,
            4: self.build_stalker,
            5: self.build_worker,
            6: self.build_assimilator,
            7: self.build_stargate,
            8: self.build_pylon,
            9: self.defend_nexus,
            10: self.attack_known_enemy_unit,
            11: self.attack_known_enemy_structure,
            12: self.expand,
            13: self.do_nothing,
        }

        self.train_data = []
        if self.use_model:
            print("USING MODEL!")
            # Imported here because the module-level import is disabled:
            # random-play runs work without keras installed, and model runs
            # no longer fail with a NameError.
            import keras
            self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")

    def on_end(self, game_result):
        """Print the game result and append it to gameout-random-vs-medium.txt."""
        print('--- on_end called ---')
        print(game_result, self.use_model)

        label = "Model" if self.use_model else "Random"
        with open("gameout-random-vs-medium.txt", "a") as f:
            f.write("{} {} - {}\n".format(label, game_result, int(time.time())))

    async def on_step(self, iteration):
        """Run one bot step: refresh the clock, then manage, scout, observe and act."""
        # NOTE(review): presumably 22.4 game loops make one second, which
        # would put self.time in minutes - confirm against the game speed.
        self.time = (self.state.game_loop/22.4) / 60
        await self.distribute_workers()
        await self.scout()
        await self.intel()
        await self.do_something()

    def random_location_variance(self, location):
        """Return ``location`` shifted by a random -5..4 on each axis.

        The result is clamped to the map rectangle and returned as a Point2.
        """
        x = location[0] + random.randrange(-5, 5)
        y = location[1] + random.randrange(-5, 5)
        max_x = self.game_info.map_size[0]
        max_y = self.game_info.map_size[1]

        if x < 0:
            print("x below")
            x = 0
        if y < 0:
            print("y below")
            y = 0
        if x > max_x:
            print("x above")
            x = max_x
        if y > max_y:
            print("y above")
            y = max_y

        return position.Point2(position.Pointlike((x, y)))

    async def scout(self):
        """Assign scouts to expansion locations and keep a probe scout moving.

        Without a ready Robotics Facility a single Probe scouts; otherwise up
        to 15 idle Observers are assigned. Each scout gets the unassigned
        expansion location nearest to the enemy start. An assigned Probe is
        re-ordered every step to a jittered point around its location.
        """
        enemy_start = self.enemy_start_locations[0]

        # {distance to enemy start: expansion location}
        self.expand_dis_dir = {}
        for el in self.expansion_locations:
            self.expand_dis_dir[el.distance_to(enemy_start)] = el
        self.ordered_exp_distances = sorted(self.expand_dis_dir)

        # Forget scouts that no longer exist.
        alive_tags = {unit.tag for unit in self.units}
        dead_scouts = [tag for tag in self.scouts_and_spots if tag not in alive_tags]
        for tag in dead_scouts:
            del self.scouts_and_spots[tag]

        if len(self.units(ROBOTICSFACILITY).ready) == 0:
            unit_type = PROBE
            unit_limit = 1
        else:
            unit_type = OBSERVER
            unit_limit = 15

        # Only one Probe may scout at a time.
        assign_scout = True
        if unit_type == PROBE:
            assign_scout = not any(
                probe.tag in self.scouts_and_spots for probe in self.units(PROBE)
            )

        if assign_scout:
            for obs in self.units(unit_type).idle[:unit_limit]:
                if obs.tag in self.scouts_and_spots:
                    continue
                # Locations that already have a scout assigned.
                active_locations = list(self.scouts_and_spots.values())
                for dist in self.ordered_exp_distances:
                    location = self.expand_dis_dir[dist]
                    if location in active_locations:
                        continue
                    try:
                        await self.do(obs.move(location))
                    except Exception as e:
                        # Best effort: report the failure and try the next
                        # location instead of hiding the error.
                        print(str(e))
                        continue
                    self.scouts_and_spots[obs.tag] = location
                    break

        # Keep the scouting Probe moving around its assigned spot; Observers
        # are left where they were sent.
        if unit_type == PROBE:
            for probe in self.units(PROBE):
                if probe.tag in self.scouts_and_spots:
                    spot = self.scouts_and_spots[probe.tag]
                    await self.do(probe.move(self.random_location_variance(spot)))

    async def intel(self):
        """Draw the current game state into an image and store it in self.flipped.

        Own structures and units, known enemy structures and units, and five
        resource/supply bars are drawn; the image is shown in a window unless
        HEADLESS is set.
        """
        game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)

        # unit type -> [circle radius, color]
        draw_dict = {
            NEXUS: [15, (0, 255, 0)],
            PYLON: [3, (20, 235, 0)],
            PROBE: [1, (55, 200, 0)],
            ASSIMILATOR: [2, (55, 200, 0)],
            GATEWAY: [3, (200, 100, 0)],
            CYBERNETICSCORE: [3, (150, 150, 0)],
            STARGATE: [5, (255, 0, 0)],
            ROBOTICSFACILITY: [5, (215, 155, 0)],
            #VOIDRAY: [3, (255, 100, 0)],
        }

        for unit_type, (radius, color) in draw_dict.items():
            for unit in self.units(unit_type).ready:
                _draw_unit(game_data, unit, radius, color)

        # Credit: Aleksandr Timofeev (via YouTube)
        main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']
        # Two passes so that main bases are drawn on top of other structures.
        for enemy_building in self.known_enemy_structures:
            if enemy_building.name.lower() not in main_base_names:
                _draw_unit(game_data, enemy_building, 5, (200, 50, 212))
        for enemy_building in self.known_enemy_structures:
            if enemy_building.name.lower() in main_base_names:
                _draw_unit(game_data, enemy_building, 15, (0, 0, 255))

        # Enemy workers get a smaller dot than other enemy units.
        worker_names = ["probe", "scv", "drone"]
        for enemy_unit in self.known_enemy_units:
            if enemy_unit.is_structure:
                continue
            if enemy_unit.name.lower() in worker_names:
                _draw_unit(game_data, enemy_unit, 1, (55, 0, 155))
            else:
                _draw_unit(game_data, enemy_unit, 3, (50, 0, 215))

        for obs in self.units(OBSERVER).ready:
            _draw_unit(game_data, obs, 1, (255, 255, 255))

        for vr in self.units(VOIDRAY).ready:
            _draw_unit(game_data, vr, 3, (255, 100, 0))

        line_max = 50
        mineral_ratio = _capped_ratio(self.minerals, 1500)
        vespene_ratio = _capped_ratio(self.vespene, 1500)
        # supply_cap and the used supply can both be 0, which used to raise
        # ZeroDivisionError; such a bar is now drawn empty.
        population_ratio = _capped_ratio(self.supply_left, self.supply_cap)
        plausible_supply = self.supply_cap / 200.0
        worker_weight = _capped_ratio(len(self.units(PROBE)), self.supply_cap - self.supply_left)

        cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3)  # worker/supply ratio
        cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3)  # plausible supply (supply/200.0)
        cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3)  # population ratio (supply_left/supply)
        cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3)  # gas / 1500
        cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3)  # minerals / 1500

        # Flip code 0 flips around the x-axis, i.e. a vertical flip.
        self.flipped = cv2.flip(game_data, 0)
        resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)

        if not HEADLESS:
            cv2.imshow(str(self.title), resized)
            cv2.waitKey(1)

    def find_target(self, state):
        """Return a random known enemy unit, else structure, else the enemy start.

        ``state`` is accepted for compatibility and is not used.
        """
        if len(self.known_enemy_units) > 0:
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_structures) > 0:
            return random.choice(self.known_enemy_structures)
        else:
            return self.enemy_start_locations[0]

    async def build_scout(self):
        """Train one Observer at a ready Robotics Facility that has no queue."""
        for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
            print(len(self.units(OBSERVER)), self.time/3)
            if self.can_afford(OBSERVER) and self.supply_left > 0:
                await self.do(rf.train(OBSERVER))
                break

    async def build_worker(self):
        """Train a Probe at a randomly chosen ready Nexus, if affordable."""
        nexuses = self.units(NEXUS).ready
        if nexuses.exists and self.can_afford(PROBE):
            await self.do(random.choice(nexuses).train(PROBE))

    async def build_zealot(self):
        """Train a Zealot at a randomly chosen ready Gateway, if affordable."""
        gateways = self.units(GATEWAY).ready
        if gateways.exists and self.can_afford(ZEALOT):
            await self.do(random.choice(gateways).train(ZEALOT))

    async def build_gateway(self):
        """Build a Gateway near a random ready Pylon, if affordable and none is pending."""
        pylons = self.units(PYLON).ready
        # `.random` cannot pick from an empty selection (same guard as build_stargate).
        if not pylons.exists:
            return
        if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
            await self.build(GATEWAY, near=pylons.random)

    async def build_voidray(self):
        """Train a Void Ray at a randomly chosen ready Stargate, if affordable."""
        stargates = self.units(STARGATE).ready
        if stargates.exists and self.can_afford(VOIDRAY):
            await self.do(random.choice(stargates).train(VOIDRAY))

    async def build_stalker(self):
        """Train a Stalker, or build the Cybernetics Core that is still missing."""
        gateways = self.units(GATEWAY).ready
        # Both branches below require a ready Gateway.
        if not gateways.exists:
            return

        if self.units(CYBERNETICSCORE).ready.exists:
            if self.can_afford(STALKER):
                await self.do(random.choice(gateways).train(STALKER))
            return

        # The Pylon is only needed to place the core; guard the empty case.
        pylons = self.units(PYLON).ready
        if not pylons.exists:
            return
        if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE):
            await self.build(CYBERNETICSCORE, near=pylons.random)

    async def build_assimilator(self):
        """Build Assimilators on free geysers within 15.0 of each ready Nexus."""
        for nexus in self.units(NEXUS).ready:
            vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
            for vaspene in vaspenes:
                if not self.can_afford(ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                # Skip geysers that already have an Assimilator on them.
                if not self.units(ASSIMILATOR).closer_than(1.0, vaspene).exists:
                    await self.do(worker.build(ASSIMILATOR, vaspene))

    async def build_stargate(self):
        """Build a Stargate near a random ready Pylon; needs a ready Cybernetics Core."""
        if self.units(PYLON).ready.exists:
            pylon = self.units(PYLON).ready.random
            if self.units(CYBERNETICSCORE).ready.exists:
                if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
                    await self.build(STARGATE, near=pylon)

    async def build_pylon(self):
        """Build a Pylon 5 units from the first Nexus, towards the map centre."""
        nexuses = self.units(NEXUS).ready
        if nexuses.exists and self.can_afford(PYLON):
            spot = self.units(NEXUS).first.position.towards(self.game_info.map_center, 5)
            await self.build(PYLON, near=spot)

    async def expand(self):
        """Start a new Nexus when affordable; any error is printed, not raised."""
        try:
            if self.can_afford(NEXUS):
                await self.expand_now()
        except Exception as e:
            print(str(e))

    async def do_nothing(self):
        """Postpone the next decision by a random 0.07-0.99 units of self.time."""
        wait = random.randrange(7, 100)/100
        self.do_something_after = self.time + wait

    async def _attack_with_idle_army(self, target):
        """Order every idle Void Ray, Stalker and Zealot to attack ``target``."""
        for unit_type in ARMY_UNIT_TYPES:
            for u in self.units(unit_type).idle:
                await self.do(u.attack(target))

    async def defend_nexus(self):
        """Send the idle army at the known enemy unit closest to a random Nexus."""
        if len(self.known_enemy_units) == 0:
            return
        nexuses = self.units(NEXUS)
        # random.choice raises on an empty selection; nothing to defend then.
        if not nexuses.exists:
            return
        target = self.known_enemy_units.closest_to(random.choice(nexuses))
        await self._attack_with_idle_army(target)

    async def attack_known_enemy_structure(self):
        """Send the idle army at a random known enemy structure, if any."""
        if len(self.known_enemy_structures) == 0:
            return
        target = random.choice(self.known_enemy_structures)
        await self._attack_with_idle_army(target)

    async def attack_known_enemy_unit(self):
        """Send the idle army at the known enemy unit closest to a random Nexus.

        When no Nexus is left, a random known enemy unit is attacked instead.
        """
        if len(self.known_enemy_units) == 0:
            return
        nexuses = self.units(NEXUS)
        if nexuses.exists:
            target = self.known_enemy_units.closest_to(random.choice(nexuses))
        else:
            target = random.choice(self.known_enemy_units)
        await self._attack_with_idle_army(target)

    async def do_something(self):
        """Pick one action, run it, and store the choice as a training sample."""
        if self.time <= self.do_something_after:
            return

        # Derived from the dispatch table so the random draw and the one-hot
        # label cannot drift out of sync with self.choices.
        n_choices = len(self.choices)

        if self.use_model:
            # The frame is reshaped into a batch of (176, 200, 3) images.
            prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
            choice = np.argmax(prediction[0])
        else:
            choice = random.randrange(n_choices)

        try:
            await self.choices[choice]()
        except Exception as e:
            # Best effort: a failed action must not stop the bot.
            print(str(e))

        # New choice handling: one-hot label for the attempted action.
        y = np.zeros(n_choices)
        y[choice] = 1
        self.train_data.append([y, self.flipped])


# Kept as an unconditional toggle, so the game still starts whenever this
# module is executed.
if True:
    run_game(maps.get("AbyssalReefLE"), [
        Bot(Race.Protoss, SentdeBot(use_model=False, title=1)),
        #Bot(Race.Protoss, SentdeBot(use_model=False, title=2)),
        Computer(Race.Protoss, Difficulty.Medium),
    ], realtime=False)
Следующая статья — Python AI в StarCraft II. Часть XVI: изменение визуализации.