-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection_manager.py
122 lines (100 loc) · 3.41 KB
/
connection_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import argparse
import json
import subprocess
import time
import keyring
import loguru
import requests
from passwork.password_crud import get_password, search_password
from passwork.passwork_api import PassworkAPI
from pathlib2 import Path
from typing import Callable
class Connection:
address: str
port: str
login: str
password: str
def __init__(self, address: str, port: str, login: str, password: str):
self.address = address
self.port = port
self.login = login
self.password = password
connect: Callable[[self], None | str]
class RDPConnection(Connection):
def __init__(self, address: str, port: str, login: str, password: str):
super().__init__(address, port, login, password)
def connect(self) -> None | str:
subprocess.run(
[
"cmdkey",
f"/generic:{self.address}",
f"/user:{self.login}",
f"/pass:{self.password}",
]
)
subprocess.run(["mstsc", f"/v:{self.address}:{self.port}"])
subprocess.run(["cmdkey", f"/delete:{self.address}"])
class SSHConnection(Connection):
def __init__(self, address: str, port: str, login: str, password: str):
super().__init__(address, port, login, password)
def connect(self):
subprocess.run(
[
"plink",
self.address,
"-l",
self.login,
"-P",
self.port,
"-pw",
self.password,
]
)
def get_session(file: Path) -> PassworkAPI:
with open(file, "r") as f:
config = json.load(f)
if "master" not in config or config["master"] == "":
return PassworkAPI(config["host"], config["key"])
return PassworkAPI(config["host"], config["key"], config["master"])
def main(session: PassworkAPI, file: Path) -> None:
with open(file, "r") as f:
connection = json.load(f)
try:
credentials = search_password(
session,
{
"query": connection["id"],
"tags": [],
"colors": [],
"vaultId": None,
"includeShared": True,
"includeShortcuts": False,
},
)[0]
password = get_password(session, credentials["id"])["passwordPlainText"]
except requests.exceptions.ConnectionError:
print("Can not connect to server")
return
if (protocol := connection["protocol"]) == "rdp":
RDPConnection(
connection["address"], connection["port"], credentials["login"], password
).connect()
elif protocol == "ssh":
SSHConnection(
connection["address"], connection["port"], credentials["login"], password
).connect()
class _Namespace:
connection: Path
config: Path
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("connection", type=Path)
parser.add_argument("-c", "--config", type=Path, default=Path.cwd() / "config.json")
args = parser.parse_args(namespace=_Namespace)
loguru.logger.disable("passwork")
if not args.config.exists():
print(f'Config file "{args.config}" does not exist, exiting.')
if not args.config.is_file():
print(f'"{args.config}" is not a file, exiting.')
session = get_session(args.config)
main(session, args.connection)