2
2
3
3
import copy
4
4
import random
5
+ from networkx .algorithms .shortest_paths .generic import shortest_path
6
+ import numpy as np
5
7
from collections import defaultdict
6
8
from typing import (
7
9
Dict ,
38
40
from env .tasks import HomeServiceSimplePickAndPlaceTask , HomeServiceBaseTask
39
41
40
42
AgentLocKeyType = Tuple [float , float , int , int ]
43
+ PositionKeyType = Tuple [float , float , float ]
41
44
42
45
43
46
# class ExplorerTHOR:
@@ -93,6 +96,7 @@ def __init__(
93
96
self .controller = controller
94
97
95
98
self ._include_move_left_right = include_move_left_right
99
+ self ._position_to_object_id : Dict [PositionKeyType , str ] = {}
96
100
97
101
@lazy_property
98
102
def nav_actions_set (self ) -> frozenset :
@@ -127,6 +131,7 @@ def on_reset(self):
127
131
"""Function that must be called whenever the AI2-THOR controller is
128
132
reset."""
129
133
self ._current_scene = None
134
+ self ._position_to_object_id = {}
130
135
131
136
@property
132
137
def graph (self ) -> nx .DiGraph :
@@ -860,7 +865,7 @@ def expert_action(self) -> int:
860
865
return self .expert_action_list [- 1 ]
861
866
862
867
@property
863
- def goto_action (self ) -> str :
868
+ def goto_action (self ) -> str :
864
869
if len (self .goto_action_list ) > 0 :
865
870
return self .goto_action_list .pop ()
866
871
@@ -933,6 +938,34 @@ def _get_interactable_positions(
933
938
#TODO
934
939
pass
935
940
941
+ def _expert_goto_action_to_scene_type (
942
+ self ,
943
+ scene_type : str
944
+ ) -> Optional [str ]:
945
+ if len (self .goto_action_list ) == 0 :
946
+ if not self .task ._1st_check :
947
+ for _ in range (4 ):
948
+ self .goto_action_list .append ("RotateRight" )
949
+ self .task ._1st_check = True
950
+ else :
951
+ if not self .task ._took_goto_action :
952
+ self .goto_action_list .append (f"Goto{ scene_type } " )
953
+ elif not self .task ._2nd_check :
954
+ for _ in range (4 ):
955
+ self .goto_action_list .append ("RotateRight" )
956
+ self .task ._2nd_check = True
957
+
958
+ goto_action = self .goto_action
959
+ if len (self .goto_action_list ) == 0 :
960
+ if self .task ._2nd_check :
961
+ self .task ._check_goto_done = True
962
+ elif self .task ._1st_check and (
963
+ SCENE_TO_SCENE_TYPE [self .task .env .scene ] == scene_type
964
+ ):
965
+ self .task ._check_goto_done = True
966
+
967
+ return goto_action
968
+
936
969
def _expert_nav_action_to_obj (
937
970
self ,
938
971
obj : Dict [str , Any ]
@@ -948,6 +981,8 @@ def _expert_nav_action_to_obj(
948
981
]
949
982
950
983
if len (target_keys ) == 0 :
984
+ print (f'No target keys' )
985
+ import pdb ; pdb .set_trace ()
951
986
return None
952
987
953
988
source_state_key = shortest_path_navigator .get_key (env .get_agent_location ())
@@ -959,6 +994,7 @@ def _expert_nav_action_to_obj(
959
994
source_state_key = source_state_key , goal_state_keys = target_keys ,
960
995
)
961
996
except nx .NetworkXNoPath as _ :
997
+ print ('h?' )
962
998
return None
963
999
964
1000
if action != "Pass" :
@@ -977,6 +1013,63 @@ def _expert_nav_action_to_obj(
977
1013
978
1014
return None
979
1015
1016
+ def _expert_nav_action_to_position (self , position ) -> Optional [str ]:
1017
+ """Get the shortest path navigational action towards the certain position
1018
+
1019
+ """
1020
+ env : HomeServiceTHOREnvironment = self .task .env
1021
+ shortest_path_navigator = self .shortest_path_navigator
1022
+
1023
+ if isinstance (position , np .ndarray ):
1024
+ position_key = (round (position [0 ], 2 ), round (position [1 ], 2 ), round (position [2 ], 2 ))
1025
+ position = dict (
1026
+ x = position [0 ],
1027
+ y = position [1 ],
1028
+ z = position [2 ],
1029
+ )
1030
+ elif isinstance (position , dict ):
1031
+ position_key = (round (position ['x' ], 2 ), round (position ['y' ], 2 ), round (position ['z' ], 2 ))
1032
+
1033
+ if position_key not in shortest_path_navigator ._position_to_object_id :
1034
+ # Spawn the TargetCircle and place it on the position
1035
+ event = env .controller .step ("SpawnTargetCircle" , anywhere = True )
1036
+ assert event .metadata ["lastActionSuccess" ]
1037
+ id_target_circle = event .metadata ["actionReturn" ]
1038
+
1039
+
1040
+ event = env .controller .step (
1041
+ "TeleportObject" ,
1042
+ objectId = id_target_circle ,
1043
+ position = position ,
1044
+ rotation = 0 ,
1045
+ forceAction = True
1046
+ )
1047
+ assert event .metadata ["lastActionSuccess" ]
1048
+
1049
+ # To change objectId for former target circle
1050
+ event = env .controller .step ("SpawnTargetCircle" , anywhere = True )
1051
+ assert event .metadata ["lastActionSuccess" ]
1052
+ id_target_circle = event .metadata ["actionReturn" ]
1053
+
1054
+ event = env .controller .step ("RemoveFromScene" , objectId = id_target_circle )
1055
+ assert event .metadata ["lastActionSuccess" ]
1056
+
1057
+ # check
1058
+ target_circle_after_teleport = next (
1059
+ obj for obj in env .last_event .metadata ['objects' ]
1060
+ if obj ['objectType' ] == "TargetCircle" and obj ["position" ] == position
1061
+
1062
+ )
1063
+ assert target_circle_after_teleport is not None
1064
+ shortest_path_navigator ._position_to_object_id [position_key ] = target_circle_after_teleport ['objectId' ]
1065
+
1066
+ object_id = shortest_path_navigator ._position_to_object_id [position_key ]
1067
+ obj = next (
1068
+ obj for obj in env .last_event .metadata ['objects' ]
1069
+ if obj ['objectId' ] == object_id
1070
+ )
1071
+ return self ._expert_nav_action_to_obj (obj = obj )
1072
+
980
1073
def _invalidate_interactable_loc_for_pose (
981
1074
self ,
982
1075
location : Dict [str , Any ],
@@ -1051,12 +1144,22 @@ def _generate_expert_action_dict(self) -> Dict[str, Any]:
1051
1144
return dict (action = "Done" )
1052
1145
1053
1146
elif subtask_action == "Goto" :
1054
- return dict (action = "Goto" , sceneType = subtask_target )
1147
+ expert_goto_action = self ._expert_goto_action_to_scene_type (
1148
+ scene_type = subtask_target
1149
+ )
1150
+ if expert_goto_action is None :
1151
+ raise RuntimeError
1152
+
1153
+ return dict (action = expert_goto_action )
1055
1154
1056
1155
elif subtask_action == "Scan" :
1057
- pass
1156
+ # TODO
1157
+
1158
+ # return dict(action="HEHEE")
1159
+ return dict (action = "Pass" )
1058
1160
1059
1161
elif subtask_action == "Navigate" :
1162
+ # from metadata
1060
1163
with include_object_data (env .controller ):
1061
1164
current_objects = env .last_event .metadata ["objects" ]
1062
1165
@@ -1068,8 +1171,11 @@ def _generate_expert_action_dict(self) -> Dict[str, Any]:
1068
1171
)
1069
1172
assert target_obj is not None
1070
1173
self ._last_to_interact_object_pose = target_obj
1071
- expert_nav_action = self ._expert_nav_action_to_obj (
1072
- obj = target_obj
1174
+ # expert_nav_action = self._expert_nav_action_to_obj(
1175
+ # obj=target_obj
1176
+ # )
1177
+ expert_nav_action = self ._expert_nav_action_to_position (
1178
+ position = self .task .target_positions [subtask_target ]
1073
1179
)
1074
1180
1075
1181
if expert_nav_action is None :
0 commit comments