Select Git revision
schema.sqlite.sql
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
hanabi_smc.py 36.28 KiB
#--->
# Workaround so that the 'src' package can be imported when this file lives in the 'scripts' folder.
# Alternatively, this file can be copied next to the 'src' folder
# and launched with 'python3 -O XXX.py'.
import os, sys
p = os.path.abspath('scripts')
sys.path.insert(1, p)
#<-- End of the trick
from src.model.SMCPDEL.pySMCDEL import *
from src.model.SMCPDEL.pySMCPDEL import ProbaStructure, ProbaTransformer
from src.model.datastructure.add.add_real_function import ADDManager
from src.model.epistemiclogic.formula.formula import *
from src.model.epistemiclogic.examples.example import Example
from src.io.colors import *
def earn_red_atom(c, v, i):
    """Return the atom name ``<c><v>_<i>_red`` built from the three given components."""
    return "{}{}_{}_red".format(c, v, i)
def get_hanabi_symbolic_events(nbA, nbC, nbCH, manager, prof, V, logtime=None, cache=None):
    """
    Build the list of symbolic event transformers of a Hanabi game.

    An explicit ``HanabiExampleExplicit`` is created (with ``tokens=True``). Its
    "Announce" events are converted into ``KnowledgeTransformer`` objects, then
    play, discard and draw transformers are generated for every agent and every
    card of ``ex.pull_cards``.

    NOTE(review): the indentation of this function was reconstructed; in
    particular the nesting of the ``Timer`` contexts was chosen from their
    ``incr`` levels and should be confirmed against the original file.

    :param nbA: forwarded to ``HanabiExampleExplicit`` (presumably the number of agents -- confirm)
    :param nbC: forwarded to ``HanabiExampleExplicit`` (presumably the number of cards -- confirm)
    :param nbCH: forwarded to ``HanabiExampleExplicit`` (presumably the number of cards in hand -- confirm)
    :param manager: passed to every ``FormulaPrecondition`` and ``KnowledgeTransformer``
    :param prof: not used inside this function
    :param V: vocabulary passed to every transformer (draw events use ``V + qs`` for their precondition)
    :param logtime: forwarded to every ``KnowledgeTransformer``
    :param cache: dict given to ``@memoize`` and forwarded to the transformers; a new dict is used when None
    :return: list of ``KnowledgeTransformer`` (announcements, plays, discards, draws, in that order)
    """
    ex = HanabiExampleExplicit(nbA, nbC, nbCH, tokens=True)
    turns = tuple(ex.turns_variables)
    with Timer("Create Events", incr=1, show=False):
        events = ex.getEventModels(addArc=False)
    blues = tuple(ex.blue_tokens_variables)
    reds = tuple(ex.red_tokens_variables)
    cache = {} if cache is None else cache

    @memoize(cache)
    def get_use_token(vars):  # RETURN DICT POST
        """
        Postconditions for the given token variables: ``vars[0]`` is always flipped,
        and each later variable is flipped when all the variables before it are false
        (this reads like a binary decrement with ``vars[0]`` as lowest bit).
        """
        post = {vars[0]: Not(Atom(vars[0]))}
        for i, b in enumerate(vars[1:]):
            f_and = MAnd(*list(map(Not, list(map(Atom, vars[:i+1])))))
            post[b] = ite(f_and, Not(Atom(b)), Atom(b))
        return post

    @memoize(cache)
    def get_earn_token(vars):  # RETURN DICT POST
        """
        Postconditions for the given token variables: ``vars[0]`` is always flipped,
        and each later variable is flipped when all the variables before it are true
        (this reads like a binary increment with ``vars[0]`` as lowest bit).
        """
        post = {vars[0]: Not(Atom(vars[0]))}
        for i, b in enumerate(vars[1:]):
            f_and = MAnd(*list(map(Atom, vars[:i+1])))
            post[b] = ite(f_and, Not(Atom(b)), Atom(b))
        return post

    @memoize(cache)
    def get_change_turn():  # RETURN DICT POST
        """Postconditions giving each turn variable the value of the previous one (cyclically)."""
        post = {}
        for i, turn in enumerate(turns):
            post[turn] = ite(Atom(turns[(i-1)%len(turns)]), Top(), Bot())
        return post

    @memoize(cache)
    def possible(ex, color, value):  # RETURN FORMULA
        """
        Formula stating that, for every lower value of ``color``, exactly one copy is
        on the table, and that no copy of (``color``, ``value``) is on the table.
        """
        needed = [(color2, value2, id2) for color2, value2, id2 in ex.pull_cards if
                  color == color2 and value2 < value]
        formula = Top()
        for v in range(1, value):
            formula = And(formula,
                          Exactly(1, [getCardVariable(c, v2, i, TABLE_NAME) for c, v2, i in needed if v2 == v]))
        return And(formula, Not(OrOfList(
            [Atom(getCardVariable(color2, value2, id2, TABLE_NAME)) for color2, value2, id2 in ex.pull_cards if
             color == color2 and value2 == value])))

    @memoize(cache)
    def has_card(ex, a, pos):  # RETURN FORMULA
        """Formula stating that some card of ``ex.pull_cards`` is at position ``pos`` of agent ``a``."""
        return MOr(*[Atom(getCardVariable(color, value, i, a, pos)) for color, value, i in ex.pull_cards])

    # Plays, discards and announcements add no observed variable for any agent.
    no_obs = {a: [] for a in ex.getAgents()}
    kts = []
    with Timer("Events copy", show=False):
        for i, e in enumerate(events):
            with Timer(f"in loop {e.getName()}", incr=1, show=False):
                # keep announcement as Explicit
                if "Announce" in str(e):
                    annonce = e[0]
                    pointed = annonce.getPointedEvent()
                    # An announcement uses one blue token and passes the turn.
                    theta_ = {}
                    for k, f in get_use_token(blues).items():
                        theta_[k] = f
                    for k, f in get_change_turn().items():
                        theta_[k] = f
                    kt = KnowledgeTransformer(
                        V,
                        [],
                        FormulaPrecondition(pointed.pre, manager, V, cache=cache),
                        no_obs,
                        [],
                        manager,
                        v_=list(list(theta_.keys())),
                        theta_=theta_, logtime=logtime,
                        name=str(annonce.getName()), precondition=pointed.pre, cache=cache)
                    kts.append(kt)
        with Timer("PLAYS", show=False, incr=1):
            incr_red = get_earn_token(reds)
            for agent in ex.getAgents():
                for p in range(0, ex.nb_cards_in_hands):
                    with Timer("Theta plays", show=False):
                        for c, v, i in ex.pull_cards:
                            # One event for a successful play (poss=True), one for a failed play.
                            for poss in [True, False]:
                                theta_ = {}
                                possible_formula = possible(ex, c, v) if poss else Not(possible(ex, c, v))
                                precond = MAnd(Atom(getTurnVariable(agent)), Atom(getCardVariable(c, v, i, agent, p)),
                                               possible_formula)
                                name = get_plays_name(agent, c, v, i, p, poss)
                                theta = precond
                                if not poss:
                                    # failed play: the card goes to the discard pile
                                    # print("-", getCardVariable(c, v, i, DISCARD_NAME))
                                    theta_[getCardVariable(c, v, i, DISCARD_NAME)] = Atom(getCardVariable(c, v, i, agent, p))
                                    # ... and the red-token counter is incremented
                                    for red_bit, f in incr_red.items():
                                        # print("-", red_bit)
                                        theta_[red_bit] = f
                                else:
                                    # successful play: the card goes on the table
                                    theta_[getCardVariable(c, v, i, TABLE_NAME)] = Atom(
                                        getCardVariable(c, v, i, agent, p))
                                ### shift cards
                                # a0-cvi = Bot
                                theta_[getCardVariable(c, v, i, agent, 0)] = Bot()
                                # a(i+1)-cvi = ai-cvi
                                for ip in range(int(p), 0, -1):
                                    # print("-", getCardVariable(c, v, i, agent, ip))
                                    theta_[getCardVariable(c, v, i, agent, ip)] = Atom(
                                        getCardVariable(c, v, i, agent, ip - 1))
                                # print("Obs : ", obs)
                                with Timer("to KT plays", show=False, incr=2):
                                    kt = KnowledgeTransformer(V, [], FormulaPrecondition(theta, manager, V, cache=cache),
                                                              no_obs, [], manager, v_=list(theta_.keys()), theta_=theta_,
                                                              logtime=logtime, name=name, precondition=precond, cache=cache)
                                kts.append(kt)
        with Timer("DISCARD", show=False, incr=1):
            incr_blue = get_earn_token(blues)
            for agent in ex.getAgents():
                for p in range(0, ex.nb_cards_in_hands):
                    with Timer("Theta plays", show=False):
                        for c, v, i in ex.pull_cards:
                            theta_ = {}
                            precond = MAnd(Atom(getTurnVariable(agent)), Atom(getCardVariable(c, v, i, agent, p)))
                            name = get_discards_name(agent, c, v, i, p)
                            theta = precond
                            # the discarded card goes to the discard pile
                            # print("-", getCardVariable(c, v, i, DISCARD_NAME))
                            theta_[getCardVariable(c, v, i, DISCARD_NAME)] = Atom(getCardVariable(c, v, i, agent, p))
                            # ... and the blue-token counter is incremented
                            for blue_bit, f in incr_blue.items():
                                # print("-", blue_bit)
                                theta_[blue_bit] = f
                            ### shift cards
                            # a0-cvi = Bot
                            theta_[getCardVariable(c, v, i, agent, 0)] = Bot()
                            # a(i+1)-cvi = ai-cvi
                            for ip in range(int(p), 0, -1):
                                # print("-", getCardVariable(c, v, i, agent, ip))
                                theta_[getCardVariable(c, v, i, agent, ip)] = Atom(
                                    getCardVariable(c, v, i, agent, ip - 1))
                            # print("Obs : ", obs)
                            with Timer("to KT discard", show=False, incr=2):
                                kt = KnowledgeTransformer(V, [], FormulaPrecondition(theta, manager, V, cache=cache),
                                                          no_obs, [], manager, v_=list(theta_.keys()), theta_=theta_,
                                                          logtime=logtime, name=name, precondition=precond, cache=cache)
                            kts.append(kt)
        # DRAWING
        # CLIQUE FOR ATOMIC EVENT : a draws D-c-v-i
        with Timer("DRAW", show=False, incr=1):
            for a in ex.getAgents():
                qs = []
                theta = Bot()
                theta_ = {}
                preconds = {}
                liste = []
                # One event variable q per card the agent may draw.
                for c, v, i in ex.pull_cards:
                    q = get_draw_name(a, c, v, i)
                    qs.append(q)
                    liste.append((q, (c, v, i)))
                for q, (c, v, i) in liste:
                    # Drawing requires position 0 of the agent to be empty and the card to be in the draw pile.
                    precond = And(Not(has_card(ex, a, 0)), Atom(getCardVariable(c, v, i, DRAW_NAME)))
                    preconds[q] = precond
                    # theta is the disjunction over all q of: precondition of q and only q is true among qs.
                    theta = Or(theta, And(precond, AndOfVarList({q2: q2 == q for q2 in qs})))
                    # When q holds, the draw-pile variable and the position-0 variable of the card are flipped.
                    theta_[getCardVariable(c, v, i, DRAW_NAME)] = ite(Atom(q),
                                                                      Not(Atom(getCardVariable(c, v, i, DRAW_NAME))),
                                                                      Atom(getCardVariable(c, v, i, DRAW_NAME)))
                    theta_[getCardVariable(c, v, i, a, 0)] = ite(Atom(q),
                                                                 Not(Atom(getCardVariable(c, v, i, a, 0))),
                                                                 Atom(getCardVariable(c, v, i, a, 0)))
                for k, f in get_change_turn().items():
                    theta_[k] = f
                # The drawing agent gets no event variable in its observation list; every other agent gets all of qs.
                obs = {a2: [] if a == a2 else qs for a2 in ex.getAgents()}
                # print("Obs : ", obs)
                with Timer("to KT draw", show=False, incr=2):
                    # One transformer per drawable card, all sharing theta/theta_ but pointing at a different q.
                    for i, q in enumerate(qs):
                        kt = KnowledgeTransformer(V, qs, FormulaPrecondition(theta, manager, V + qs, cache=cache), obs, [q],
                                                  manager, v_=list(theta_.keys()), theta_=theta_, name=q,
                                                  logtime=logtime, precondition=preconds[q], cache=cache)
                        kt.all_preds = preconds
                        kts.append(kt)
    return kts
def get_draw_name(a, c, v, i):
    """
    Return the name of the event "agent ``a`` draws the card (``c``, ``v``, ``i``)".

    NOTE(review): the pile letter 'D' is hard-coded here, while other code in this
    file builds such names with DRAW_NAME -- presumably both are equal; confirm.
    """
    return "{}-draws-D-{}{}{}".format(a, c, v, i)
def get_plays_name(a, c, v, i, p, possible):
    """
    Return the name of the event "agent ``a`` plays the card (``c``, ``v``, ``i``)
    held at position ``p``"; the suffix is 'T' when ``possible`` is truthy, '⊥' otherwise.
    """
    outcome = "T" if possible else "⊥"
    return "{}-plays-pos{}-{}{}{}:{}".format(a, p, c, v, i, outcome)
def get_discards_name(a, c, v, i, p):
    """Return the name of the event "agent ``a`` discards the card (``c``, ``v``, ``i``) held at position ``p``"."""
    return "{}-discards-pos{}-{}{}{}".format(a, p, c, v, i)
class HanabiSMC(Example):
    """
    Symbolic model of a Hanabi game (structure + transformers).

    ``getEpistemicModel()`` must be called before ``getEventModels()``, which in
    turn must be called before ``get_actions_to_apply()`` / ``get_draws()``.

    NOTE(review): the indentation of this class was reconstructed from a
    flattened source; nesting choices that could not be deduced from the code
    itself are flagged with a NOTE(review) comment.
    """

    def __init__(self, s5, nbA, nbC, nbCH, random, order=None, cleaning=False, prof=0, logtime=None, proba=False,
                 manager=None, inter=False, tokens=True, normalize=True):
        """
        Store the configuration; nothing is computed here.

        :param s5: True for S5, otherwise KD45
        :param nbA: forwarded to the explicit example (presumably the number of agents -- confirm)
        :param nbC: forwarded to the explicit example (presumably the number of cards -- confirm)
        :param nbCH: number of cards in hand
        :param random: forwarded as ``random_pointed`` to the explicit example
        :param order: variable-ordering priorities; defaults to
            ["cards", "prime", "position", "agents", "after"]; an empty list
            selects the order produced by ``generate_order``
        :param cleaning: if True, ``apply`` asks for id/after variables to be cleaned
        :param prof: depth given to ``generate_order`` (history of events)
        :param logtime: dict forwarded to timed operations; a new dict when None
        :param proba: if True, build a ``ProbaStructure`` / ``ProbaTransformer`` (requires ``s5`` False)
        :param manager: manager class exposing ``create``; ``ADDManager`` when None
        :param inter: if True, primed variables are interleaved in the variable order
        :param tokens: whether token variables are used
        :param normalize: whether the probability functions are normalized
        """
        # A fresh list per instance: a list literal as default would be shared by all calls.
        if order is None:
            order = ["cards", "prime", "position", "agents", "after"]
        print(f"HanabiSMC : intercale={inter}, s5={s5}, nbA={nbA}, nbC={nbC}, nbH={nbCH},"
              f"random={random}, order={order}, cleaning={cleaning}, prof={prof}, proba={proba}, debug={__debug__}")
        self.s5 = s5  # True: S5, else: KD45
        self.nbA = nbA
        self.nbC = nbC
        self.nbCH = nbCH
        self.random_generation = random
        self.cleaning = cleaning
        self.prof = prof
        self.order = order
        self.proba = proba
        self.inter = inter
        self.tokens = tokens
        self.normalize = normalize
        self.manager_type = ADDManager if manager is None else manager
        self.ks = None
        self.kts = None
        self.logtime = {} if logtime is None else logtime

    def init_agents_relations(self):
        """
        Build, for each agent, a formula relating unprimed and primed variables.

        For an agent, the formula is the conjunction of ``rel <-> rel'`` for: every
        card atom of the other agents, every table atom, every discard atom, and
        every ``Exactly(n, own hand variables)`` for n in 0..nb_cards_in_hands.

        :return: dict agent -> formula (also stored in ``self.symb_rels``)
        """
        symb_rels = {}
        for agent_current in self.agents:
            other_agents = [a for a in self.agents if a != agent_current]
            # Reciprocity of cards: the agent does not see its own cards nor the draw pile.
            liste_rel = [
                Atom(getCardVariable(color, value, card_id, agent, position))
                for color, value, card_id in self.pull_cards
                for position in self.positions
                for agent in other_agents
            ]
            liste_rel += [Atom(getCardVariable(color, value, card_id, TABLE_NAME))
                          for color, value, card_id in self.pull_cards]
            liste_rel += [Atom(getCardVariable(color, value, card_id, DISCARD_NAME))
                          for color, value, card_id in self.pull_cards]
            liste_rel += [
                Exactly(n, [getCardVariable(color, value, card_id, agent_current, position)
                            for color, value, card_id in self.pull_cards
                            for position in self.positions])
                for n in range(0, self.nb_cards_in_hands + 1)
            ]
            form = Top()
            for rel in liste_rel:
                form = And(form, Equiv(rel, rel.rename(self.rename_to_prime)))
            symb_rels[agent_current] = form
        self.symb_rels = symb_rels
        return self.symb_rels

    def init_rules(self, prime=True):
        """
        Build the state law of the game as one conjunction.

        :param prime: if True, every constraint is also added on the primed
            variables, and turn/token variables are made equivalent to their primed version
        :return: the formula (also stored in ``self.rules``)
        """
        formulas = []
        # Unicity of cards: each card is at exactly one place (a hand position, table, draw pile or discard).
        for color, value, card_id in self.ex.pull_cards:
            liste = [getCardVariable(color, value, card_id, agent, position)
                     for agent in self.ex.agents
                     for position in self.positions]
            liste += [getCardVariable(color, value, card_id, TABLE_NAME),
                      getCardVariable(color, value, card_id, DRAW_NAME),
                      getCardVariable(color, value, card_id, DISCARD_NAME)]
            formulas.append(Exactly(1, liste))
            if prime:
                formulas.append(Exactly(1, [getPrimedVariable(var) for var in liste]))
        # Each hand position of each agent holds exactly one card.
        for agent in self.agents:
            for position in self.positions:
                liste = [getCardVariable(color, value, card_id, agent, position)
                         for color, value, card_id in self.pull_cards]
                formulas.append(Exactly(1, liste))
                if prime:
                    formulas.append(Exactly(1, [getPrimedVariable(var) for var in liste]))
        if prime:
            for var in self.turns_variables:
                formulas.append(Equiv(Atom(var), Atom(getPrimedVariable(var))))
        if self.tokens and prime:
            for var in self.red_tokens_variables + self.blue_tokens_variables:
                formulas.append(Equiv(Atom(var), Atom(getPrimedVariable(var))))
        formulas.append(Exactly(1, self.turns_variables))
        self.rules = AndOfList(formulas)
        return self.rules

    def getEpistemicModel(self):
        """
        Build (once) and return the initial structure.

        Creates the explicit example, copies its vocabulary/agents/cards on ``self``,
        computes the variable order of the manager, then builds a
        ``KnowledgeStructure``; it is converted with ``to_KD45`` when ``s5`` is False
        and wrapped in a ``ProbaStructure`` when ``proba`` is True.

        :return: the structure (cached in ``self.ks``)
        """
        if self.ks is not None:
            # Already calculated
            return self.ks
        self.ex = HanabiExampleExplicitProba(self.nbA, self.nbC, nbCardsInHand=self.nbCH, tokens=self.tokens,
                                             random_pointed=self.random_generation)
        self.V = self.ex.simple_varnames
        self.agents = self.ex.agents
        self.pull_cards = self.ex.pull_cards
        self.nb_cards_in_hands = self.nbCH
        self.positions = [i for i in range(0, self.nb_cards_in_hands)]
        self.turns_variables = self.ex.turns_variables
        if self.ex.tokens:
            self.red_tokens_variables = self.ex.red_tokens_variables
            self.blue_tokens_variables = self.ex.blue_tokens_variables
        self.primes = [prime(v) for v in self.V]
        self.rename_to_prime = {v: getPrimedVariable(v) for v in self.ex.simple_varnames}
        manager_voc = generate_order(self.ex.simple_varnames, self.prof, getAfterSymbol())
        # id for draw cards
        if self.order != []:
            new_order = self.ex.get_order_with_priority(self.order)
            event_ids = []
            if self.prof > 0:  # Should not be used without history
                for draw_atom in self.ex.draw_variables:
                    # INSERT DRAW_ID_EVENT NEAR TO DRAW_VARIABLE
                    d, c, v, i = read_atome(draw_atom)
                    for a in self.agents[::-1]:
                        a_draws_cvi = get_draw_name(a, c, v, i)
                        event_ids.append(a_draws_cvi)
                        new_order.insert(new_order.index(draw_atom) + 1, a_draws_cvi)
            else:
                new_order = [el for el in new_order if not getAfterSymbol() in el]
            # Remove primes to put them later: cut the order into blocks ended by a primed variable.
            # NOTE(review): variables located after the last primed variable are never added to
            # `blocks` -- confirm this is intended.
            blocks = []
            block = []
            for var in new_order:
                if getPrimeSymbol() in var:
                    blocks.append(block)
                    block = []
                else:
                    block.append(var)
            copy_histo = []
            for current_block in blocks:
                # add historic variants
                new_block = []
                for v in current_block:
                    if getAfterSymbol() in v or getPrimeSymbol() in v:
                        # the variable is already modified
                        copy_histo.append(v)
                        new_block.append(v)
                    else:
                        symbol = get_id_symbol() if v in event_ids else getAfterSymbol()
                        # variants in the history of events
                        variants = list(generate_order([v], self.prof, symbol, getprime=False))
                        for var in variants:
                            copy_histo.append(var)
                            new_block.append(var)
                            if self.inter:
                                copy_histo.append(prime(var))
                # NOTE(review): reconstructed as unconditional; with inter=True the primed
                # variants are then appended a second time -- confirm against the original.
                for v in new_block:
                    copy_histo.append(prime(v))
        else:
            copy_histo = manager_voc
        self.manager = self.manager_type.create(order=copy_histo, tmp=False)
        pointed = [k for k, v in self.ex.init_pointed_world(getdict=True).items() if v]
        # Each agent observes every variable except its own cards and the draw pile.
        Obs = {a: [] for a in self.agents}
        deck = {getCardVariable(c, v, i, DRAW_NAME) for c, v, i in self.pull_cards}
        for a in self.agents:
            for v in self.V:
                if v not in self.ex.player_cards_variables_dict[a] and v not in deck:
                    Obs[a].append(v)
        rules = self.init_rules(prime=False)
        ks = KnowledgeStructure(self.V, rules, Obs, manager=self.manager, pointed=pointed)
        if not self.s5:
            with Timer("TO KD45 KS", show=False):
                ks = KnowledgeStructure.to_KD45(ks)
        if self.proba:
            with Timer("create PI", show=False, incr=1) as tpi:
                assert not self.s5, "to get proba, we need KD45"
                # The rules are moved onto the primed variables before being combined with omega.
                toprime_varnames = {p: prime(p) for p in rules.getVariables()}
                rules = rules.rename(toprime_varnames)
                real_functions = {}
                for a in self.agents:
                    with Timer("real_functions", show=False, father=tpi):
                        real_functions[a] = ks.omega[a].apply("x", self.manager.from_formula(Top()))
                if self.normalize:
                    with Timer("probabilisation", incr=2, show=False, father=tpi):
                        pi = {}
                        for a, real_function in real_functions.items():
                            prime_varnames = [p for p in real_function.getVariables() if is_primed(p)]
                            res = ks.omega[a].apply("x", self.manager.from_formula(rules))
                            pi[a] = res.marginalize_and_normalize(prime_varnames, cache=ks.cache)
                else:
                    pi = real_functions
                with Timer("Creation ProbaStructure", show=False):
                    ks = ProbaStructure(self.V, ks.state_law, ks.omega, pi, pi_in_theta=True, manager=self.manager,
                                        pointed=pointed, isnormalized=self.normalize, cache=ks.cache)
        self.ks = ks
        return ks

    def getEventModels(self):
        """
        Build (once) and return the list of event transformers.

        :return: list of transformers (cached in ``self.kts``)
        :raises Exception: if ``getEpistemicModel()`` has not been called yet
        """
        if self.kts is not None:
            return self.kts
        if self.ks is None:
            raise Exception("Please, call .getEpistemicModel() before call .getEventModels().")
        kts = get_hanabi_symbolic_events(self.nbA, self.nbC, self.nbCH, self.ks.manager, self.prof, self.V,
                                         cache=self.ks.cache)
        # Put id of events in ADDManager variables to get a good order
        for kt in kts:
            for v in kt.vocabulary:
                for variant in generate_order([v], self.prof, get_id_symbol(), getprime=False):
                    self.ks.manager.add_var(variant)
                    self.ks.manager.add_var(prime(variant))
        if not self.s5:
            with Timer("TO KD45 events", show=False):
                kts = [Transformer.to_KD45(kt, cache=self.ks.cache) for kt in kts]
        if self.proba:
            assert not self.s5, "to get proba, we need KD45"
            with Timer("TO PDEL events", show=False):
                kts = [ProbaTransformer.toPDEL(kt) for kt in kts]
        self.kts = kts
        return kts

    def apply(self, kt):
        """
        Apply the transformer ``kt`` to the stored structure and return the result.

        When ``self.cleaning`` is set, the manager variables containing the id or
        after symbol are passed as ``cleaning``; ``self.ks`` itself is not replaced.
        """
        if self.cleaning:
            clean = [e for e in self.ks.manager.global_variables_names if
                     get_id_symbol() in e or getAfterSymbol() in e]
        else:
            clean = []
        return self.ks.apply(kt, logtime=self.logtime, cleaning=clean)

    def getAgents(self):
        """Return the agents, i.e. the keys of ``self.ks.omega``."""
        return self.ks.omega.keys()

    def getWorldName(self, valuation):
        """Return the comma-separated concatenation of the strings in ``valuation``."""
        return ",".join(valuation)

    def get_formulas_to_check(self, proba=False):
        """
        Return the (name, formula) pairs of the explicit example; when ``s5`` is
        False every ``K`` operator is replaced by ``Box_a``.
        """
        res = self.ex.get_formulas_to_check(proba=proba)
        if self.s5:
            return res
        return [(fname, f.replace_operator(K, Box_a)) for fname, f in res]

    def get_actions_to_apply(self, actions):
        """
        Return, for each given name in order, every event whose name contains it.

        :param actions: iterable of (sub)strings of event names
        :return: list of events (an event is a Transformer, or a sequence whose first element carries the name)
        :raises Exception: if ``getEventModels()`` has not been called yet
        """
        if self.kts is None:
            raise Exception("Call .getEventModels() before to call .get_actions_to_apply() to create Events.")
        liste = []
        for action in actions:
            for event in self.kts:
                e_name = event.name if isinstance(event, Transformer) else event[0].name
                # A disabled variant used to chain a draw event (see get_draws) after plays/discards.
                if action in e_name:
                    liste.append(event)
        return liste

    def get_draws(self, a):
        """
        Return the draw events of agent ``a`` for the draw-pile atoms of the pointed world.

        :raises Exception: if ``getEventModels()`` has not been called yet
        """
        if self.kts is None:
            raise Exception("Call .getEventModels() before to call .get_draws() to create Events.")
        draw_atomes = {f"{a}-draws-{atome}" for atome in self.ks.pointed if DRAW_NAME in atome}
        liste = []
        for event in self.getEventModels():
            e_name2 = event.name if isinstance(event, Transformer) else event[0].name
            if e_name2 in draw_atomes:
                liste.append(event)
        return liste

    def play(self, ks, action):
        """Apply to ``ks``, in order, every event whose name contains ``action`` and return the result."""
        for event in self.get_actions_to_apply([action]):
            ks = ks.apply(event, cleaning=[])
        return ks

    def check(self, ks: Structure, logtime=None, seuil=None):
        """
        Model-check and print a set of probabilistic formulas on ``ks``.

        :param ks: structure to check
        :param logtime: dict forwarded to ``modelCheck``; a new dict when None
        :param seuil: threshold. NOTE(review): it is only computed when it is None and
            ``ks`` is a ProbaStructure; in every other case it is reset to 0.0, so a
            value given by the caller is ignored. Kept as is because a caller in
            this file passes a non-float value here -- confirm the intent.
        :return: number of entries in the formula list (blank separators included)
        """
        # A fresh dict per call: a dict literal as default would be shared by all calls.
        if logtime is None:
            logtime = {}
        print(Color.get("\n>> MODEL CHECKING", Color.CVIOLET, Color.CBOLD))
        print("Pointed:", sorted(ks.pointed))
        if seuil is None and isinstance(ks, ProbaStructure):
            n_pointed = [p for p in ks.pointed if get_id_symbol() not in p and getAfterSymbol() not in p]
            nbD = len([v for v in n_pointed if DRAW_NAME in v])
            seuil = 1 / ((nbD + self.nb_cards_in_hands)) - 0.001
            print("THRESHOLD=", seuil)
        else:
            seuil = 0.0
        assert isinstance(seuil, float)
        n_pointed = []
        formulas = []
        for a in ks.omega.keys():
            for p in ks.pointed:
                if get_id_symbol() not in p and getAfterSymbol() not in p:
                    n_pointed.append(p)
                    # NOTE(review): nesting of this test and of the "" separator was
                    # reconstructed; confirm against the original file.
                    # Only atoms whose name starts with the agent's name are tested.
                    if isinstance(ks, ProbaStructure) and p[0] == a:
                        formulas.append(Pr(a, Atom(p), ">=", seuil))
                        formulas.append(Pr(a, Atom(p), ">=", seuil + 0.002))
                        formulas.append(Pr("b", Pr(a, Atom(p), ">=", seuil), ">=", 1))
                        formulas.append(Pr("b", Pr(a, Atom(p), ">=", seuil + 0.002), ">=", 1))
                        formulas.append("")  # printed as a blank line
        for test in formulas:
            if "Pr[b](Pr[a]'a1-R12'>=0.24)>=1 " in str(test):
                # Verbose trace for one specific formula.
                print(Color.get("\n>>" + str(test), Color.CVIOLET, Color.CBOLD))
                print(">>", str(test),
                      Color.get(ks.modelCheck(test, logtime=logtime, show=True, cache=None), Color.CBOLD))
                print(Color.get(f"\n<< {test}", Color.CVIOLET, Color.CBOLD))
            elif test == "":
                print()
            else:
                with Timer("Model check", show=False, incr=1):
                    res = ks.modelCheck(test, logtime=logtime, show=False)
                print(">>", str(test), Color.get(res, Color.CBOLD))
        print("Pointed (flat):", sorted(list(set(n_pointed))))
        print(Color.get("<< MODEL CHECKING \n", Color.CVIOLET, Color.CBOLD))
        return len(formulas)

    def playable_actions(self, ks, ts, show=False):
        """
        Permit to list all playable actions
        :param ks: Structure
        :param ts: list of Transformer (or of sequences whose first element is a Transformer)
        :param show: if show list all actions in console.
        :return: the elements of ``ts`` whose precondition holds in ``ks``
        """
        liste = []
        if show:
            print(Color.get("\nPLAYABLE ACTIONS :", Color.CUNDERLINE, Color.CBOLD))
        for i, kt in enumerate(ts):
            if isinstance(kt, Transformer):
                if kt.precondition is None:
                    if show:
                        print("1 ERROR", kt.name)
                elif ks.modelCheck(kt.precondition):
                    if show:
                        print(i, Color.get(kt.name, Color.CGREEN), kt.precondition)
                    liste.append(kt)
            else:
                if kt[0].precondition is None:
                    if show:
                        print("2 ERROR ", kt[0].name)
                elif ks.modelCheck(kt[0].precondition):
                    if show:
                        print(i, Color.get("-".join(t.name if isinstance(t, Transformer) else t for t in kt),
                                           Color.CGREEN))
                    liste.append(kt)
        if show:
            print()
        return liste
def print_datas(current, S5, proba):
    """
    Debug helper printing the content of the structure ``current``.

    Prints the structure, its pointed world, then (when ``S5`` is false) the
    relation of agent "a", and (when ``proba`` is true) several views of the
    probability functions of agents "a" and "b".

    NOTE(review): the two ``if`` blocks were reconstructed as siblings; confirm.
    """
    pointed_valuation = {v: v in current.pointed for v in current.vocabulary}
    pointed_symb = current.manager.from_formula(AndOfVarList(pointed_valuation))
    print(current)
    print("Pointed")
    pointed_symb.print(trueAtoms=True)

    if not S5:
        print("Rel a")
        current.omega["a"].print(limit=40, trueAtoms=True)
        print("Rel a x pointed")
        rel_on_pointed = current.omega["a"].apply("x", pointed_symb)
        rel_on_pointed.print(limit=20, trueAtoms=False)

    if proba:
        print("pi a")
        current.pi["a"].print(limit=100, trueAtoms=True)
        print("pi conditionned")
        conditioned = current.pi["a"].conditionning({"a0-R11": True})
        conditioned.print(limit=100, trueAtoms=True)
        print("pi x pointed a")
        res = current.pi["a"].apply("x", pointed_symb)
        res.print(limit=20, trueAtoms=True)
        print("pi x pointed b")
        res = current.pi["b"].apply("x", pointed_symb)
        res.print(limit=20, trueAtoms=True)
        notprimes = []
        for p in current.pi["a"].getVariables():
            if not is_primed(p):
                notprimes.append(p)
        print("pi x pointed marginalized")
        # `res` is still the product computed for agent "b" at this point.
        marginal = res.marginalization(notprimes)
        marginal.print(limit=20, trueAtoms=True)
    print("POINTED is : ", current.pointed)
def store_datas(ks, i, name, tps_mc, nbf, tps_pu):
    """
    Record the statistics of step ``i`` in the module-level ``DATAS`` dict and print them.

    :param ks: structure exposing ``state_law``, ``omega`` and ``pi``; each diagram must
        provide ``get_nodes_informations()`` returning a mapping with the keys
        "Terminals" and "NoTerminals"
    :param i: key of the record in ``DATAS``
    :param name: name of the last applied action
    :param tps_mc: model-checking time
    :param nbf: number of checked formulas
    :param tps_pu: product-update time
    """
    global DATAS
    # Each diagram is queried once instead of once per stored field.
    law_infos = ks.state_law.get_nodes_informations()
    record = {
        "Dernière action": name,
        "SIZE-Law-terminals": law_infos["Terminals"],
        "SIZE-Law-noterminals": law_infos["NoTerminals"],
        "Time-MC": tps_mc,
        "Nb_formulas": nbf,
        "Time-PU": tps_pu,
    }
    DATAS[i] = record
    for agent in ks.omega.keys():
        omega_infos = ks.omega[agent].get_nodes_informations()
        record[f"SIZE-OM-{agent}-terminals"] = omega_infos["Terminals"]
        record[f"SIZE-OM-{agent}-noterminals"] = omega_infos["NoTerminals"]
        pi_infos = ks.pi[agent].get_nodes_informations()
        record[f"SIZE-PI-{agent}-terminals"] = pi_infos["Terminals"]
        record[f"SIZE-PI-{agent}-noterminals"] = pi_infos["NoTerminals"]
    print(DATAS[i])
if __name__ == "__main__":
    # Script entry point: builds the Hanabi structure, optionally model-checks it,
    # applies a fixed sequence of events, then pickles the collected statistics.
    # NOTE(review): indentation reconstructed from a flattened source.
    PARSER = False
    if PARSER:
        print(">> WITH PARSER")
        from my_args import arg_parser
        args = arg_parser()
        nbCards = args.nbCards
        nbA = args.agents
        nbCH = args.chands
        normalize = args.norm
        S5 = args.s5
        proba = args.probability
        prof = args.prof
        random = args.random
        modelcheck = False if args.modelcheck == -1 else True
        product_update = False if args.productupdate == -1 else True
    else:
        print(">> WITHOUT PARSER")
        nbCards = 12
        nbA = 2
        nbCH = 2
        normalize = False
        S5 = False
        proba = True
        prof = 10
        random = False
        modelcheck = True
        product_update = True
    import sys
    sys.setrecursionlimit(2000)
    tokens = True
    Structure.filter_transformer = True
    print(nbCards,nbA,nbCH,normalize,S5,proba,prof,random,modelcheck,product_update)
    assert (not proba) or (proba and not S5), "to get 'ProbaStructure', we need KD45 form (not S5)."
    assert not (
        product_update and prof == 0), "If you want to use product_update, we certainly need to has historic and prof > 0."
    # Statistics filled by store_datas(); created up front so the final report
    # also works when modelcheck is disabled.
    DATAS = {}
    with Timer("ALL TIME", show=True) as timer_all_time:
        # Other scenarios that were tried:
        #   ["a-discard-pos0-R11"], ["a-plays-pos0-R11:T"], ["Announce for b : 1 at [0]"]
        action_names = [
            "Announce for b : 1 at [0]",
            "b-plays-pos0-R13:T",
            f"b-draws-{DRAW_NAME}-R22",
            "a-discards-pos0-R11",
            f"a-draws-{DRAW_NAME}-R31",
            "Announce for a : 1 at [1]"
        ]
        cleaning = False
        # Measured alternative: ['cards', 'prime', 'position', 'agents', 'after']  # 24,64s
        order = ["cards", "agents", "position", "prime", "after"]  # 17.38
        logtime = {}
        with Timer("get HanabiSMC Structures", show=False):
            hanabi_smc = HanabiSMC(S5, nbA, nbCards, nbCH, random, manager=ADDManager, prof=prof,
                                   logtime=logtime, order=order, cleaning=False,
                                   proba=proba, tokens=tokens, normalize=normalize)
            with Timer("getEpistemicModel", show=False):
                ks = hanabi_smc.getEpistemicModel()
            if prof > 0 or product_update:
                with Timer("getEventModels", show=False):
                    ts = hanabi_smc.getEventModels()
        current = ks
        # print_datas(current, S5, proba)
        if modelcheck:
            with Timer("mci", show=False) as timer_mc:
                # NOTE(review): the third positional parameter of HanabiSMC.check is `seuil`;
                # passing `s5` here looks unintended -- confirm.
                nbf = hanabi_smc.check(current, logtime, hanabi_smc.s5)
            store_datas(ks, 0, "Begin", timer_mc.t, nbf, 0)
        if product_update:
            with Timer("Product_time", show=False):
                print("POINTED is : ", current.pointed)
                print(action_names)
                actions = hanabi_smc.get_actions_to_apply(action_names)
                print([a.name if isinstance(a, Transformer) else a[0].name for a in actions])
                for i, a in enumerate(actions):
                    # hanabi_smc.playable_actions(current, ts, show=True)
                    subname = a.name if isinstance(a, Transformer) else a[0].name
                    print(Color.get(f"STEP {i} APPLY : {subname}", Color.CBLUE, Color.CBOLD))
                    if cleaning:
                        clean = [e for e in current.manager.global_variables_names if
                                 get_id_symbol() in e or getAfterSymbol() in e]
                        print("Clean", clean)
                    else:
                        clean = []
                    with Timer("Product Update in", show=False) as timer_pu:
                        current = current.apply(a, cleaning=clean)
                    # print_datas(new, S5, proba)
                    n_pointed = [p for p in current.pointed
                                 if get_id_symbol() not in p and getAfterSymbol() not in p]
                    print("New pointed (flat) :", n_pointed)
                    if modelcheck:
                        with Timer("mci", show=False) as timer_mc:
                            nb_formulas = hanabi_smc.check(current, logtime, hanabi_smc.s5)
                        store_datas(current, i+1, subname, timer_mc.t, nb_formulas, timer_pu.t)
    Timer.memory.print(threshold=0)
    from pprint import pprint
    DATAS["All_time"] = timer_all_time.t
    pprint(DATAS)
    import pickle
    # The context manager guarantees the file is flushed and closed.
    with open(f"nbA{nbA}-nbH{nbCH}-nbC{nbCards}-norm={normalize}.pkl", 'wb') as file_pi:
        pickle.dump(DATAS, file_pi)