Предыдущая статья — Python AI в StarCraft II. Часть XV: увеличиваем вариативность.
В этой части серии статей про использование AI в игре StarCraft II мы изменим визуальное представление игровых данных, которое получает на вход наша нейронная сеть. Вероятно, цвета нам не нужны, а вот отображать мы хотим больше юнитов.
За исключением строк, отображающих различные ресурсы, мы собираемся полностью переписать наш метод intel. И еще: вместо доли военных юнитов мы теперь будем отслеживать долю рабочих.
Начнем, как и раньше:
async def intel(self):
    # Start from an all-black 3-channel uint8 canvas: rows come from
    # map_size[1] and columns from map_size[0].
    game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)
Теперь переберем наши текущие юниты:
# Draw every ready friendly unit as a white circle centred on its position.
# The circle radius is unit.radius * 8 (truncated to int). The thickness is
# int(unit.radius * 0.5); int() truncates first, so math.ceil() has no effect.
for unit in self.units().ready:
    pos = unit.position
    cv2.circle(game_data, (int(pos[0]), int(pos[1])), int(unit.radius*8), (255, 255, 255), math.ceil(int(unit.radius*0.5)))
Затем юниты врага:
# Draw every known enemy unit the same way as friendly units, but in grey
# (125, 125, 125) so the two sides stay distinguishable.
for unit in self.known_enemy_units:
    pos = unit.position
    cv2.circle(game_data, (int(pos[0]), int(pos[1])), int(unit.radius*8), (125, 125, 125), math.ceil(int(unit.radius*0.5)))
Обратите внимание, что теперь мы просто рисуем круги, радиус которых пропорционален размеру юнита (unit.radius). Наши юниты белые, а юниты противника — серые.
А теперь нарисуем линии:
# Draw five horizontal status bars on the canvas. Each bar starts at x=0 and
# its length is line_max multiplied by a ratio computed below.
try:
    # Bar length in pixels for a ratio of 1.0.
    line_max = 50
    # Minerals and gas are scaled against 1500 and capped at 1.0.
    mineral_ratio = self.minerals / 1500
    if mineral_ratio > 1.0:
        mineral_ratio = 1.0
    vespene_ratio = self.vespene / 1500
    if vespene_ratio > 1.0:
        vespene_ratio = 1.0
    # Free supply as a fraction of the current supply cap.
    population_ratio = self.supply_left / self.supply_cap
    if population_ratio > 1.0:
        population_ratio = 1.0
    # Current supply cap as a fraction of 200.
    plausible_supply = self.supply_cap / 200.0
    # Number of probes divided by used supply (cap minus free supply).
    worker_weight = len(self.units(PROBE)) / (self.supply_cap-self.supply_left)
    if worker_weight > 1.0:
        worker_weight = 1.0
    cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3)  # worker/supply ratio
    cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3)  # plausible supply (supply/200.0)
    cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3)  # population ratio (supply_left/supply)
    cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3)  # gas / 1500
    cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3)  # minerals minerals/1500
except Exception as e:
    # Any error above (for example a zero divisor in one of the ratios) is
    # printed; since all ratios are computed before the first cv2.line call,
    # a failure while computing them means no bar is drawn.
    print(str(e))
Обратите внимание, что мы заменили переменную military_weight на worker_weight и, соответственно, теперь отображаем долю рабочих в занятом лимите снабжения (supply).
А закончим мы этот метод вот так:
# flip horizontally to make our final fix in visual representation: grayed = cv2.cvtColor(game_data, cv2.COLOR_BGR2GRAY) self.flipped = cv2.flip(grayed, 0) resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2) if not HEADLESS: cv2.imshow(str(self.title), resized) cv2.waitKey(1)
Теперь наше визуальное представление имеет примерно такой вид:
Наконец, мы немного изменили веса, с которыми случайно выбираются действия, чтобы улучшить набор данных для обучения.
Внутри нашего метода do_something мы будем работать вот с этими весами:
async def do_something(self):
    """Pick one action, run it, and record (one-hot action, frame) for training.

    Nothing happens until ``self.time`` has passed ``self.do_something_after``.
    With ``self.use_model`` set, the action is the argmax of the model's
    prediction for the current frame; otherwise it is drawn at random from a
    weighted list of action indices. Any exception raised by the chosen
    action is printed and does not stop the bot.
    """
    if self.time <= self.do_something_after:
        return

    if self.use_model:
        # The frame stored by intel() is single-channel grayscale (2-D), so
        # the former fixed reshape to [-1, 176, 200, 3] raised ValueError.
        # Build the batch from the frame's own shape instead; the loaded
        # model must have been trained on frames with this channel count.
        frame = self.flipped
        if frame.ndim == 2:
            frame = frame[..., np.newaxis]
        prediction = self.model.predict([frame[np.newaxis, ...]])
        choice = np.argmax(prediction[0])
    else:
        # Relative weights per action index; indices not listed get weight 1.
        weights = {
            1: 3,   # zealot
            2: 3,   # gateway
            3: 20,  # voidray
            4: 8,   # stalker
            5: 8,   # worker
            7: 5,   # stargate
            8: 5,   # pylon
        }
        # Expand into a flat list in ascending index order (index i appears
        # weights[i] times) and sample one entry uniformly.
        choice_weights = [
            index
            for index in range(14)
            for _ in range(weights.get(index, 1))
        ]
        choice = random.choice(choice_weights)

    try:
        await self.choices[choice]()
    except Exception as e:
        # Best effort: a failing action must not abort the game step.
        print(str(e))

    # One-hot label sized by the number of available actions.
    y = np.zeros(len(self.choices))
    y[choice] = 1
    self.train_data.append([y, self.flipped])
Вот полный код для создания обучающих данных:
import sc2
from sc2 import run_game, maps, Race, Difficulty, Result
from sc2.player import Bot, Computer
from sc2 import position
from sc2.constants import NEXUS, PROBE, PYLON, ASSIMILATOR, GATEWAY, \
    CYBERNETICSCORE, STARGATE, VOIDRAY, SCV, DRONE, ROBOTICSFACILITY, OBSERVER, \
    ZEALOT, STALKER
import random
import cv2
import numpy as np
import os
import time
import math

os.environ["SC2PATH"] = '/starcraftstuff/StarCraftII/'

# When True, no OpenCV preview window is opened.
HEADLESS = True


class SentdeBot(sc2.BotAI):
    """Protoss bot that takes random (or model-chosen) actions and records
    (one-hot action, grayscale map frame) pairs as training data."""

    def __init__(self, use_model=False, title=1):
        """Set up the bot.

        use_model: when True, actions come from a saved Keras model instead
            of the weighted random choice.
        title: used as the title of the OpenCV preview window.
        """
        self.MAX_WORKERS = 50
        self.do_something_after = 0
        self.use_model = use_model
        self.title = title

        # {unit tag: location the scout was sent to}; entries for units that
        # no longer exist are dropped on every scout() call.
        self.scouts_and_spots = {}

        # Action index -> coroutine method; the index is also the position
        # of the action in the one-hot training label.
        self.choices = {
            0: self.build_scout,
            1: self.build_zealot,
            2: self.build_gateway,
            3: self.build_voidray,
            4: self.build_stalker,
            5: self.build_worker,
            6: self.build_assimilator,
            7: self.build_stargate,
            8: self.build_pylon,
            9: self.defend_nexus,
            10: self.attack_known_enemy_unit,
            11: self.attack_known_enemy_structure,
            12: self.expand,
            13: self.do_nothing,
        }

        self.train_data = []

        if self.use_model:
            # keras was referenced without ever being imported; import it
            # here so that data-generation runs do not need it installed.
            import keras
            print("USING MODEL!")
            self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2")

    def on_end(self, game_result):
        """Save the recorded training data, but only for a won game."""
        print('--- on_end called ---')
        print(game_result, self.use_model)

        if game_result == Result.Victory:
            # np.save does not create missing directories.
            os.makedirs("train_data", exist_ok=True)
            # Each sample pairs a 1-D label with a 2-D frame, so the list is
            # ragged; dtype=object is required by current NumPy and matches
            # what older versions produced implicitly.
            np.save("train_data/{}.npy".format(str(int(time.time()))),
                    np.array(self.train_data, dtype=object))

    async def on_step(self, iteration):
        """Per-step hook: update the clock, scout, render, then act."""
        # game_loop / 22.4 / 60 — stored as self.time and used for pacing.
        self.time = (self.state.game_loop/22.4) / 60
        print('Time:',self.time)

        if iteration % 5 == 0:
            await self.distribute_workers()
        await self.scout()
        await self.intel()
        await self.do_something()

    def random_location_variance(self, location):
        """Return `location` shifted by a random offset in [-5, 4] on each
        axis, clamped to the map bounds, as a Point2."""
        x = location[0] + random.randrange(-5, 5)
        y = location[1] + random.randrange(-5, 5)

        max_x = self.game_info.map_size[0]
        max_y = self.game_info.map_size[1]

        if x < 0:
            print("x below")
            x = 0
        if y < 0:
            print("y below")
            y = 0
        if x > max_x:
            print("x above")
            x = max_x
        if y > max_y:
            print("y above")
            y = max_y

        return position.Point2(position.Pointlike((x, y)))

    async def scout(self):
        """Send idle scouts to expansion locations, nearest to the enemy
        start first, and keep probe scouts moving around their spot."""
        # Map distance-to-enemy-start -> expansion location.
        self.expand_dis_dir = {}
        for el in self.expansion_locations:
            distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0])
            self.expand_dis_dir[distance_to_enemy_start] = el

        self.ordered_exp_distances = sorted(self.expand_dis_dir)

        # Forget scouts whose unit tag no longer exists.
        existing_ids = {unit.tag for unit in self.units}
        for dead_scout in [tag for tag in self.scouts_and_spots if tag not in existing_ids]:
            del self.scouts_and_spots[dead_scout]

        # Without a ready robotics facility a single probe scouts; with one,
        # up to 15 observers are used.
        if len(self.units(ROBOTICSFACILITY).ready) == 0:
            unit_type = PROBE
            unit_limit = 1
        else:
            unit_type = OBSERVER
            unit_limit = 15

        # Only one probe may scout at a time.
        assign_scout = True
        if unit_type == PROBE:
            if any(unit.tag in self.scouts_and_spots for unit in self.units(PROBE)):
                assign_scout = False

        if assign_scout:
            for obs in self.units(unit_type).idle[:unit_limit]:
                if obs.tag in self.scouts_and_spots:
                    continue
                for dist in self.ordered_exp_distances:
                    try:
                        location = self.expand_dis_dir[dist]
                        # Skip locations already assigned to another scout.
                        if location not in self.scouts_and_spots.values():
                            await self.do(obs.move(location))
                            self.scouts_and_spots[obs.tag] = location
                            break
                    except Exception as e:
                        # Best effort, as elsewhere in this bot: report the
                        # failure instead of silently hiding it, then try
                        # the next location.
                        print(str(e))

        # Probe scouts are nudged around their assigned spot every step.
        probes = list(self.units(PROBE))
        for obs in self.units(unit_type):
            if obs.tag in self.scouts_and_spots and obs in probes:
                await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))

    @staticmethod
    def _draw_units(canvas, units, color):
        """Draw each unit as a circle of radius unit.radius * 8 on canvas."""
        for unit in units:
            pos = unit.position
            # NOTE(review): the thickness was math.ceil(int(radius * 0.5));
            # int() truncates first, so that equals int(radius * 0.5), which
            # is 0 for radius < 2 — ceil before int may have been intended.
            # Kept as is so newly generated frames match earlier ones.
            cv2.circle(canvas, (int(pos[0]), int(pos[1])), int(unit.radius * 8),
                       color, int(unit.radius * 0.5))

    async def intel(self):
        """Render the current game state into self.flipped (grayscale)."""
        game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8)

        # Own ready units in white, known enemy units in grey.
        self._draw_units(game_data, self.units().ready, (255, 255, 255))
        self._draw_units(game_data, self.known_enemy_units, (125, 125, 125))

        try:
            line_max = 50
            mineral_ratio = min(self.minerals / 1500, 1.0)
            vespene_ratio = min(self.vespene / 1500, 1.0)
            population_ratio = min(self.supply_left / self.supply_cap, 1.0)
            plausible_supply = self.supply_cap / 200.0
            worker_weight = min(len(self.units(PROBE)) / (self.supply_cap - self.supply_left), 1.0)

            cv2.line(game_data, (0, 19), (int(line_max*worker_weight), 19), (250, 250, 200), 3)  # worker/supply ratio
            cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3)  # plausible supply (supply/200.0)
            cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3)  # population ratio (supply_left/supply)
            cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3)  # gas / 1500
            cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3)  # minerals minerals/1500
        except Exception as e:
            # E.g. a zero divisor in one of the ratios: the bars are skipped.
            print(str(e))

        # Grayscale, then flip around the x-axis (flipCode 0).
        grayed = cv2.cvtColor(game_data, cv2.COLOR_BGR2GRAY)
        self.flipped = cv2.flip(grayed, 0)

        if not HEADLESS:
            # Both branches on use_model were identical, so they are merged.
            resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)
            cv2.imshow(str(self.title), resized)
            cv2.waitKey(1)

    def find_target(self, state):
        """Return a random known enemy unit, else a random known enemy
        structure, else the first enemy start location. `state` is unused."""
        if len(self.known_enemy_units) > 0:
            return random.choice(self.known_enemy_units)
        elif len(self.known_enemy_structures) > 0:
            return random.choice(self.known_enemy_structures)
        else:
            return self.enemy_start_locations[0]

    async def build_scout(self):
        """Train an observer, or build a robotics facility if none exists."""
        for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
            print(len(self.units(OBSERVER)), self.time/3)
            if self.can_afford(OBSERVER) and self.supply_left > 0:
                await self.do(rf.train(OBSERVER))
                break

        if len(self.units(ROBOTICSFACILITY)) == 0:
            pylon = self.units(PYLON).ready.noqueue.random
            if self.units(CYBERNETICSCORE).ready.exists:
                if self.can_afford(ROBOTICSFACILITY) and not self.already_pending(ROBOTICSFACILITY):
                    await self.build(ROBOTICSFACILITY, near=pylon)

    async def build_worker(self):
        """Train a probe at a random ready nexus with an empty queue."""
        nexuses = self.units(NEXUS).ready.noqueue
        if nexuses.exists and self.can_afford(PROBE):
            await self.do(random.choice(nexuses).train(PROBE))

    async def build_zealot(self):
        """Train a zealot at a random ready gateway with an empty queue."""
        gateways = self.units(GATEWAY).ready.noqueue
        if gateways.exists and self.can_afford(ZEALOT):
            await self.do(random.choice(gateways).train(ZEALOT))

    async def build_gateway(self):
        """Build a gateway near a random pylon, towards the map centre."""
        pylon = self.units(PYLON).ready.noqueue.random
        if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
            await self.build(GATEWAY, near=pylon.position.towards(self.game_info.map_center, 5))

    async def build_voidray(self):
        """Train a void ray at a random ready stargate with an empty queue."""
        stargates = self.units(STARGATE).ready.noqueue
        if stargates.exists and self.can_afford(VOIDRAY):
            await self.do(random.choice(stargates).train(VOIDRAY))

    async def build_stalker(self):
        """Train a stalker, or build the cybernetics core it requires."""
        pylon = self.units(PYLON).ready.noqueue.random
        gateways = self.units(GATEWAY).ready
        cybernetics_cores = self.units(CYBERNETICSCORE).ready

        if gateways.exists and cybernetics_cores.exists:
            if self.can_afford(STALKER):
                await self.do(random.choice(gateways).train(STALKER))

        if not cybernetics_cores.exists:
            if self.units(GATEWAY).ready.exists:
                if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE):
                    await self.build(CYBERNETICSCORE, near=pylon.position.towards(self.game_info.map_center, 5))

    async def build_assimilator(self):
        """Build assimilators on free geysers within 15.0 of a ready nexus."""
        for nexus in self.units(NEXUS).ready:
            vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
            for vaspene in vaspenes:
                if not self.can_afford(ASSIMILATOR):
                    break
                worker = self.select_build_worker(vaspene.position)
                if worker is None:
                    break
                if not self.units(ASSIMILATOR).closer_than(1.0, vaspene).exists:
                    await self.do(worker.build(ASSIMILATOR, vaspene))

    async def build_stargate(self):
        """Build a stargate near a random ready pylon once a core is ready."""
        if self.units(PYLON).ready.exists:
            pylon = self.units(PYLON).ready.random
            if self.units(CYBERNETICSCORE).ready.exists:
                if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
                    await self.build(STARGATE, near=pylon.position.towards(self.game_info.map_center, 5))

    async def build_pylon(self):
        """Build a pylon near the first nexus, towards the map centre."""
        nexuses = self.units(NEXUS).ready
        if nexuses.exists:
            if self.can_afford(PYLON) and not self.already_pending(PYLON):
                await self.build(PYLON, near=self.units(NEXUS).first.position.towards(self.game_info.map_center, 5))

    async def expand(self):
        """Expand to a new base while fewer than 3 nexuses exist."""
        try:
            if self.can_afford(NEXUS) and len(self.units(NEXUS)) < 3:
                await self.expand_now()
        except Exception as e:
            print(str(e))

    async def do_nothing(self):
        """Postpone the next action by a random 0.07–0.99 of self.time's unit."""
        wait = random.randrange(7, 100)/100
        self.do_something_after = self.time + wait

    async def _attack_with_idle_army(self, target):
        """Order all idle void rays, stalkers and zealots to attack target."""
        for unit_type in (VOIDRAY, STALKER, ZEALOT):
            for u in self.units(unit_type).idle:
                await self.do(u.attack(target))

    async def defend_nexus(self):
        """Attack the known enemy unit closest to a randomly chosen nexus."""
        if len(self.known_enemy_units) > 0:
            target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))
            await self._attack_with_idle_army(target)

    async def attack_known_enemy_structure(self):
        """Attack a randomly chosen known enemy structure."""
        if len(self.known_enemy_structures) > 0:
            target = random.choice(self.known_enemy_structures)
            await self._attack_with_idle_army(target)

    async def attack_known_enemy_unit(self):
        """Attack the known enemy unit closest to a randomly chosen nexus."""
        if len(self.known_enemy_units) > 0:
            target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))
            await self._attack_with_idle_army(target)

    async def do_something(self):
        """Pick one action, run it, and record (one-hot action, frame).

        Nothing happens until self.time has passed self.do_something_after.
        Exceptions raised by the chosen action are printed, not propagated.
        """
        if self.time <= self.do_something_after:
            return

        if self.use_model:
            # intel() stores a single-channel grayscale frame (2-D), so the
            # former fixed reshape to [-1, 176, 200, 3] raised ValueError.
            # Build the batch from the frame's own shape instead; the loaded
            # model must have been trained on frames with this channel count.
            frame = self.flipped
            if frame.ndim == 2:
                frame = frame[..., np.newaxis]
            prediction = self.model.predict([frame[np.newaxis, ...]])
            choice = np.argmax(prediction[0])
        else:
            # Relative weights per action index; unlisted indices weigh 1.
            weights = {
                1: 3,   # zealot
                2: 3,   # gateway
                3: 20,  # voidray
                4: 8,   # stalker
                5: 8,   # worker
                7: 5,   # stargate
                8: 5,   # pylon
            }
            # Flat list in ascending index order, sampled uniformly.
            choice_weights = [
                index
                for index in range(14)
                for _ in range(weights.get(index, 1))
            ]
            choice = random.choice(choice_weights)

        try:
            await self.choices[choice]()
        except Exception as e:
            # Best effort: a failing action must not abort the game step.
            print(str(e))

        # One-hot label sized by the number of available actions.
        y = np.zeros(len(self.choices))
        y[choice] = 1
        self.train_data.append([y, self.flipped])


# Play game after game indefinitely to accumulate training data.
while True:
    run_game(maps.get("AbyssalReefLE"), [
        Bot(Race.Protoss, SentdeBot()),
        Computer(Race.Protoss, Difficulty.Easy)
        ], realtime=False)
Следующая статья — Python AI в StarCraft II. Часть XVII: продолжаем обучение.