|
29 | 29 | vol.Optional("config_file", default="databricks.json.j2"): str,
|
30 | 30 | vol.Optional("name", default=""): str,
|
31 | 31 | vol.Optional("lang", default="python"): vol.All(str, vol.In(["python", "scala"])),
|
32 |
| - vol.Optional("run_immediately", default=True): bool, |
| 32 | + vol.Optional("run_stream_job_immediately", default=True): bool, |
33 | 33 | vol.Optional("arguments", default=[{}]): [{}],
|
34 | 34 | vol.Optional("schedule"): {
|
35 | 35 | vol.Required("quartz_cron_expression"): str,
|
@@ -88,14 +88,14 @@ def deploy_to_databricks(self):
|
88 | 88 | job_name = f"{app_name}-{self.env.artifact_tag}"
|
89 | 89 | job_config = self.create_config(job_name, job)
|
90 | 90 | is_streaming = self._job_is_streaming(job_config)
|
91 |
| - run_immediately = job["run_immediately"] |
| 91 | + run_stream_job_immediately = job["run_stream_job_immediately"] |
92 | 92 |
|
93 | 93 | logger.info("Removing old job")
|
94 | 94 | self.remove_job(self.env.artifact_tag, job_config=job, is_streaming=is_streaming)
|
95 | 95 |
|
96 | 96 | logger.info("Submitting new job with configuration:")
|
97 | 97 | logger.info(pprint.pformat(job_config))
|
98 |
| - self._submit_job(job_config, is_streaming, run_immediately) |
| 98 | + self._submit_job(job_config, is_streaming, run_stream_job_immediately) |
99 | 99 |
|
100 | 100 | def create_config(self, job_name: str, job_config: dict):
|
101 | 101 | common_arguments = dict(
|
@@ -192,11 +192,11 @@ def _kill_it_with_fire(self, job_id):
|
192 | 192 | logger.info(f"Canceling active runs {active_run_ids}")
|
193 | 193 | [self.runs_api.cancel_run(_) for _ in active_run_ids]
|
194 | 194 |
|
195 |
| - def _submit_job(self, job_config: dict, is_streaming: bool, run_immediately: bool): |
| 195 | + def _submit_job(self, job_config: dict, is_streaming: bool, run_stream_job_immediately: bool): |
196 | 196 | job_resp = self.jobs_api.create_job(job_config)
|
197 | 197 | logger.info(f"Created Job with ID {job_resp['job_id']}")
|
198 | 198 |
|
199 |
| - if is_streaming or run_immediately: |
| 199 | + if is_streaming and run_stream_job_immediately: |
200 | 200 | resp = self.jobs_api.run_now(
|
201 | 201 | job_id=job_resp["job_id"],
|
202 | 202 | jar_params=None,
|
|
0 commit comments